<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Hudi 實踐 | 基于 Apache Hudi 構(gòu)建分析型數(shù)據(jù)湖

          共 7943字,需瀏覽 16分鐘

           ·

          2022-07-16 19:03



          為了更好地發(fā)展業(yè)務(wù),每個組織都在迅速采用分析。在分析過程的幫助下,產(chǎn)品團隊正在接收來自用戶的反饋,并能夠以更快的速度交付新功能。通過分析提供的對用戶的更深入了解,營銷團隊能夠調(diào)整他們的活動以針對特定受眾。只有當(dāng)我們能夠大規(guī)模提供分析時,這一切才有可能。

          對數(shù)據(jù)湖的需求

          在 NoBrokercom[1],出于操作目的,事務(wù)數(shù)據(jù)存儲在基于 SQL 的數(shù)據(jù)庫中,事件數(shù)據(jù)存儲在 No-SQL 數(shù)據(jù)庫中。這些應(yīng)用程序 dB 未針對分析工作負載進行調(diào)整。此外,為了更全面地了解客戶和業(yè)務(wù),通常需要跨交易和事件數(shù)據(jù)加入數(shù)據(jù)。這些限制大大減慢了分析過程。為了解決這些問題,我們開發(fā)了一個名為 STARSHIP 的數(shù)據(jù)平臺,它提供了所有 Nobroker 數(shù)據(jù)的集中存儲庫,并且可以通過 SQL 訪問。

          STARSHIP 正在為 40TB+ 快速發(fā)展的數(shù)據(jù)提供分析。在 Nobroker 上發(fā)生的任何事件或交易,都可以在 30 分鐘內(nèi)在 Starship 中進行分析。

          它的一個組成部分是構(gòu)建針對分析優(yōu)化的數(shù)據(jù)存儲層。Parquet 和 ORC 數(shù)據(jù)格式提供此功能,但它們?nèi)鄙俑潞蛣h除功能。

          Apache Hudi

          Apache Hudi 是一個開源數(shù)據(jù)管理框架,提供列數(shù)據(jù)格式的記錄級插入、更新和刪除功能。我們在將數(shù)據(jù)帶到 STARSHIP 的所有 ETL 管道中廣泛使用 Apache Hudi。我們使用 Apache Hudi 的 DeltaStreamer 實用程序采用增量數(shù)據(jù)攝取。我們已經(jīng)能夠增強 DeltaStreamer 以適應(yīng)我們的業(yè)務(wù)邏輯和數(shù)據(jù)特征。

          DeltaStreamer

          在到達分布式云存儲之前,數(shù)據(jù)通過 Apache Hudi 中的多個相互連接的模塊進行處理。這些模塊可以獨立工作,也可以通過 Delta-streamer 實用程序工作,從而簡化整個 ETL 流程。盡管提供的默認功能有限,但它允許使用可擴展的 Java 類進行定制。

          源讀取器

          源讀取器是 Hudi 數(shù)據(jù)處理中的第一個也是最重要的模塊,用于從上游讀取數(shù)據(jù)。Hudi 提供支持類,可以從本地文件(如 JSON、Avro 和 Kafka 流)讀取。在我們的數(shù)據(jù)管道中,CDC 事件以 Avro 格式生成到 Kafka。我們擴展了源類以添加來自 Kafka 的增量讀取,每次讀取一個特定的編號。來自存儲的檢查點的消息,我們添加了一項功能,將 Kafka 偏移量附加為數(shù)據(jù)列。

          # Reading data from Kafka from given Offset ranges
          baseConsumerRDD = KafkaUtils.createRDD(
                                                 sparkContext,          
                                                 KafkaParams, 
                                                 offsetRanges,
                                                 consistent_location_strategy,
                                                 )
                                      .filter(x -> x != null)
                                      .filter(x -> x.value() != null);
          # Adding Message offset to the data
          baseRDD = baseConsumerRDD.map(x ->"{ 
                                              \"starship_offset\":"+x.offset()
                                              +","
                                              +"\"starship_value\": " 
                                              + x.value().toString() + 
                                             "}"
                                        );
          # Reading into Spark data frame & Applying schema
          table_df = sparkSession.read()
                                 .schema(table.getIncomingSchema())
                                 .json(baseRDD)
                                 .select(
                                         "starship_value.*",
                                         "starship_offset"
                                         );

          在初始數(shù)據(jù)讀取之后,我們還強制執(zhí)行從 Kafka 模式注冊表或用戶提供的自定義模式獲取的模式。

          業(yè)務(wù)邏輯處理器

          從 Source reader 帶入 Spark 數(shù)據(jù)幀的數(shù)據(jù)將采用原始格式。為了使其可用于分析,我們需要對數(shù)據(jù)進行清理、標(biāo)準化和添加業(yè)務(wù)邏輯。STARSHIP 中的每個數(shù)據(jù)點都經(jīng)過以下轉(zhuǎn)換,以確保數(shù)據(jù)質(zhì)量。

          • ? case標(biāo)準化:下/上case。

          • ? 日期格式轉(zhuǎn)換:將各種字符串日期格式轉(zhuǎn)換為毫秒。

          • ? 時區(qū)標(biāo)準化:將所有時區(qū)的數(shù)據(jù)轉(zhuǎn)換為 UTC。

          • ? 電話號碼標(biāo)準化:將電話號碼格式化為“國家代碼 - 電話號碼”格式。

          • ? 數(shù)據(jù)類型轉(zhuǎn)換:將引用的數(shù)字轉(zhuǎn)換為 Int/Long,轉(zhuǎn)換為文本格式等。

          • ? 屏蔽和散列:使用散列算法屏蔽敏感信息。

          • ? 自定義 SQL 查詢處理:如果需要對特定列應(yīng)用自定義過濾器,它們可以作為 SQL 子句傳遞。

          • ? 地理點數(shù)據(jù)處理:將地理點數(shù)據(jù)處理為 Parquet 支持的格式。

          • ? 列標(biāo)準化:將所有列名轉(zhuǎn)換為蛇形大小寫并展平任何嵌套列。

          鍵生成器

          Hudi 中的每一行都使用一組鍵表示,以提供行級別的更新和刪除。Hudi 要求每個數(shù)據(jù)點都有一個主鍵、一個排序鍵以及在分區(qū)的情況下還需要一個分區(qū)鍵。

          • ? 主鍵:識別一行是更新還是新插入。

          • ? 排序鍵:識別當(dāng)前批次事件中每個主鍵的最新事件,以防同一批次中同一行出現(xiàn)多個事件。

          • ? 分區(qū)鍵:以分區(qū)格式寫入數(shù)據(jù)。

          對來自 CDC 管道的事件進行排序變得很棘手,尤其是在同一邏輯處理多種類型的流時。為此,我們編寫了一個鍵生成器類,它根據(jù)輸入數(shù)據(jù)流源處理排序邏輯,并提供對多個鍵作為主鍵的支持。

          Parquet寫入器

          一旦數(shù)據(jù)處于最終轉(zhuǎn)換格式,Hudi writer 將負責(zé)寫入過程。每個新的數(shù)據(jù)攝取周期稱為一次提交并與提交編號相關(guān)聯(lián)。

          • ? 提交開始:攝取從在云存儲中創(chuàng)建的“

            .commit_requested”文件開始。
          • ? 提交飛行:一旦處理完所有轉(zhuǎn)換后開始寫入過程,就會創(chuàng)建一個“

            .commit_inflight”文件。
          • ? 提交結(jié)束:一旦數(shù)據(jù)成功寫入磁盤,就會創(chuàng)建最終的“

            .commit”文件。

          只有當(dāng)最終的 .commit 文件被創(chuàng)建時,攝取過程才被稱為成功。萬一發(fā)生故障,Hudi writer 會回滾對 parquet 文件所做的任何更改,并從最新的可用 .commit 文件中獲取新的攝取。如果我們每次提交都編寫新的 Parquet 文件,我們最終會得到一個很大的數(shù)字。小文件會減慢分析過程。為此,每次有新插入時,Hudi writer 會識別是否有任何小文件并向它們添加新插入,而不是寫入新文件。

          在 Nobroker,我們確保每個 parquet 文件的大小至少為 100MB,以優(yōu)化分析的速度。

          數(shù)據(jù)索引

          除了寫入數(shù)據(jù),Hudi 還跟蹤特定行的存儲位置,以加快更新和刪除速度。此信息存儲在稱為索引的專用數(shù)據(jù)結(jié)構(gòu)中。Hudi 提供了多種索引實現(xiàn),例如布隆過濾器、簡單索引和 HBase 索引Hudi表。我們從布隆過濾器開始,但隨著數(shù)據(jù)的增加和用例的發(fā)展,我們轉(zhuǎn)向 HBase 索引,它提供了非常快速的行元數(shù)據(jù)檢索。

          HBase 索引將我們的 ETL 管道的資源需求減少了 30%。

          Schema寫入器

          一旦數(shù)據(jù)被寫入云存儲,我們應(yīng)該能夠在我們的平臺上自動發(fā)現(xiàn)它。為此,Hudi 提供了一個模式編寫器,它可以更新任何用戶指定的模式存儲庫,了解新數(shù)據(jù)庫、表和添加到數(shù)據(jù)湖的列。我們使用 Hive 作為我們的集中Schema存儲庫。默認情況下Hudi 將源數(shù)據(jù)中的所有列以及所有元數(shù)據(jù)字段添加到模式存儲庫中。由于我們的數(shù)據(jù)平臺面向業(yè)務(wù),我們確保在編寫Schema時跳過元數(shù)據(jù)字段。這對性能沒有影響,但為分析用戶提供了更好的體驗。

          在 Schema writer 的幫助下,業(yè)務(wù)可以在上游數(shù)據(jù)中添加一個新的特性,并且它可以在我們的數(shù)據(jù)平臺上使用,而無需任何人工干預(yù)。

          Cleaner

          在攝取過程中,會創(chuàng)建大量元數(shù)據(jù)文件和臨時文件。如果保持不變,它們會降低分析性能。Hudi 確保所有不必要的文件在需要時被歸檔和刪除。每次發(fā)生新的攝取時,一些現(xiàn)有的 Parquet 文件都會推出一個新版本。舊版本可用于跟蹤事件時間線和使查詢運行更長時間。他們慢慢地填滿了存儲空間。為此,Cleaner 提供了 2 種減少存儲空間的方法

          • ? KEEP_LATEST_FILE_VERSIONS :最新的文件版本被保留,而舊的被刪除。

          • ? KEEP_LATEST_COMMITS :僅保留 n 個最新提交寫入的文件版本。

          我們的數(shù)據(jù)平臺經(jīng)過調(diào)整,可在 1 分鐘內(nèi)提供交互式查詢/報告。同時,我們確保舊文件版本最多保留 1 小時,以支持長時間運行的數(shù)據(jù)科學(xué)工作負載。

          Apache Hudi 是 Starship Data 平臺最重要的部分之一。我們還有更多組件提供其他功能,例如可視化、交互式查詢引擎等。

          推薦閱讀
          B站基于Apache Hudi的增量數(shù)據(jù)湖探索與實踐
          Apache Hudi vs Delta Lake:透明TPC-DS Lakehouse性能基準
          詳解Apache Hudi Schema Evolution(模式演進)
          Apache Hudi數(shù)據(jù)跳過技術(shù)加速查詢高達50倍
          深入理解Apache Hudi異步索引機制

          引用鏈接

          [1] NoBrokercom: [https://medium.com/@NoBroker](https://medium.com/@NoBroker)

          瀏覽 81
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  91av影院在线观看 | 国产日产精品一区二区三区 | 狠狠狠操 | 国产l精品久久久久久久久久 | 国产精品 A片在线 |