<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Hudi 原理 | Apache Hudi 核心概念總覽

          共 6100字,需瀏覽 13分鐘

           ·

          2021-03-26 05:40


          1. 場景

          https://hudi.apache.org/docs/use_cases.html

          ?近實時寫入?減少碎片化工具的使用?CDC 增量導(dǎo)入 RDBMS 數(shù)據(jù)?限制小文件的大小和數(shù)量?近實時分析?相對于秒級存儲 (Druid, OpenTSDB) ,節(jié)省資源?提供分鐘級別時效性,支撐更高效的查詢?Hudi 作為 lib,非常輕量?增量 pipeline?區(qū)分 arrivetime 和 event time 處理延遲數(shù)據(jù)?更短的調(diào)度 interval 減少端到端延遲 (小時 -> 分鐘) => Incremental Processing?增量導(dǎo)出?替代部分 Kafka 的場景,數(shù)據(jù)導(dǎo)出到在線服務(wù)存儲 e.g. ES

          2. 概念/術(shù)語

          https://hudi.apache.org/docs/concepts.html

          2.1 Timeline

          Timeline 是 HUDI 用來管理提交(commit)的抽象,每個 commit 都綁定一個固定時間戳,分散到時間線上。在 Timeline 上,每個 commit 被抽象為一個 HoodieInstant,一個 instant 記錄了一次提交 (commit) 的行為、時間戳、和狀態(tài)。HUDI 的讀寫 API 通過 Timeline 的接口可以方便的在 commits 上進(jìn)行條件篩選,對 history 和 on-going 的 commits 應(yīng)用各種策略,快速篩選出需要操作的目標(biāo) commit。

          2.2 Time

          Arrival time: 數(shù)據(jù)到達(dá) Hudi 的時間,commit time

          Event time: record 中記錄的時間

          上圖中采用時間(小時)作為分區(qū)字段,從 10:00 開始陸續(xù)產(chǎn)生各種 commits,10:20 來了一條 9:00 的數(shù)據(jù),該數(shù)據(jù)仍然可以落到 9:00 對應(yīng)的分區(qū),通過 timeline 直接消費(fèi) 10:00 之后的增量更新(只消費(fèi)有新 commits 的 group),那么這條延遲的數(shù)據(jù)仍然可以被消費(fèi)到。

          2.3 文件管理

          2.3.1 文件版本

          一個新的 base commit time 對應(yīng)一個新的 FileSlice,實際就是一個新的數(shù)據(jù)版本。HUDI 通過 TableFileSystemView 抽象來管理 table 對應(yīng)的文件,比如找到所有最新版本 FileSlice 中的 base file (Copy On Write Snapshot 讀)或者 base + log files(Merge On Read 讀)。通過 Timeline 和 TableFileSystemView 抽象,HUDI 實現(xiàn)了非常便捷和高效的表文件查找。

          2.3.3 文件格式

          Hoodie 的每個 FileSlice 中包含一個 base file (merge on read 模式可能沒有)和多個 log file (copy on write 模式?jīng)]有)。

          每個文件的文件名都帶有其歸屬的 FileID(即 FileGroup Identifier)和 base commit time(即 InstanceTime)。通過文件名的 group id 組織 FileGroup 的 logical 關(guān)系;通過文件名的 base commit time 組織 FileSlice 的邏輯關(guān)系。

          HUDI 的 base file (parquet 文件) 在 footer 的 meta 去記錄了 record key 組成的 BloomFilter,用于在 file based index 的實現(xiàn)中實現(xiàn)高效率的 key contains 檢測。只有不在 BloomFilter 的 key 才需要掃描整個文件消滅假陽。

          HUDI 的 log (avro 文件)是自己編碼的,通過積攢數(shù)據(jù) buffer 以 LogBlock 為單位寫出,每個 LogBlock 包含 magic number、size、content、footer 等信息,用于數(shù)據(jù)讀、校驗和過濾。

          2.4 Index

          Hoodie key (record key + partition path) 和 file id (FileGroup) 之間的映射關(guān)系,數(shù)據(jù)第一次寫入文件后保持不變,所以,一個 FileGroup 包含了一批 record 的所有版本記錄。Index 用于區(qū)分消息是 INSERT 還是 UPDATE。

          2.4.1 Index的創(chuàng)建過程

          1. BloomFilter Index

          ?新增 records 找到映射關(guān)系:record key => target partition?當(dāng)前最新的數(shù)據(jù) 找到映射關(guān)系:partition => (fileID, minRecordKey, maxRecordKey) LIST (如果是 base files 可加速)?新增 records 找到需要搜索的映射關(guān)系:fileID => HoodieKey(record key + partition path) LIST,key 是候選的 fileID?通過 HoodieKeyLookupHandle 查找目標(biāo)文件(通過 BloomFilter 加速)

          2. Flink State-based Index

          HUDI 在 0.8.0 版本中實現(xiàn)的 Flink witer,采用了 Flink 的 state 作為底層的 index 存儲,每個 records 在寫入之前都會先計算目標(biāo) bucket ID,不同于 BloomFilter Index,避免了每次重復(fù)的文件 index 查找。

          2.5 Table 類型

          2.5.1 Copy On Write

          Copy On Write 類型表每次寫入都會生成一個新的持有 base file(對應(yīng)寫入的 instant time ) 的 FileSlice。

          用戶在 snapshot 讀取的時候會掃描所有最新的 FileSlice 下的 base file。

          2.5.2 Merge On Read

          Merge On Read 表的寫入行為,依據(jù) index 的不同會有細(xì)微的差別:

          ?對于 BloomFilter 這種無法對 log file 生成 index 的索引方案,對于 INSERT 消息仍然會寫 base file (parquet format),只有 UPDATE 消息會 append log 文件(因為 base file 已經(jīng)記錄了該 UPDATE 消息的 FileGroup ID)。?對于可以對 log file 生成 index 的索引方案,例如 Flink writer 中基于 state 的索引,每次寫入都是 log format,并且會不斷追加和 roll over。

          Merge On Read 表的讀在 READ OPTIMIZED 模式下,只會讀最近的經(jīng)過 compaction 的 commit。

          3. 數(shù)據(jù)寫

          3.1 寫操作

          ?UPSERT:默認(rèn)行為,數(shù)據(jù)先通過 index 打標(biāo)(INSERT/UPDATE),有一些啟發(fā)式算法決定消息的組織以優(yōu)化文件的大小 => CDC 導(dǎo)入?INSERT:跳過 index,寫入效率更高 => Log Deduplication?BULK_INSERT:寫排序,對大數(shù)據(jù)量的 Hudi 表初始化友好,對文件大小的限制 best effort(寫 HFile)

          3.1.1 寫流程(UPSERT)

          1. Copy On Write

          ?先對 records 按照 record key 去重?首先對這批數(shù)據(jù)創(chuàng)建索引 (HoodieKey => HoodieRecordLocation);通過索引區(qū)分哪些 records 是 update,哪些 records 是 insert(key 第一次寫入)?對于 update 消息,會直接找到對應(yīng) key 所在的最新 FileSlice 的 base 文件,并做 merge 后寫新的 base file (新的 FileSlice)?對于 insert 消息,會掃描當(dāng)前 partition 的所有 SmallFile(小于一定大小的 base file),然后 merge 寫新的 FileSlice;如果沒有 SmallFile,直接寫新的 FileGroup + FileSlice

          2. Merge On Read

          ?先對 records 按照 record key 去重(可選)?首先對這批數(shù)據(jù)創(chuàng)建索引 (HoodieKey => HoodieRecordLocation);通過索引區(qū)分哪些 records 是 update,哪些 records 是 insert(key 第一次寫入)?如果是 insert 消息,如果 log file 不可建索引(默認(rèn)),會嘗試 merge 分區(qū)內(nèi)最小的 base file (不包含 log file 的 FileSlice),生成新的 FileSlice;如果沒有 base file 就新寫一個 FileGroup + FileSlice + base file;如果 log file 可建索引,嘗試 append 小的 log file,如果沒有就新寫一個 FileGroup + FileSlice + base file?如果是 update 消息,寫對應(yīng)的 file group + file slice,直接 append 最新的 log file(如果碰巧是當(dāng)前最小的小文件,會 merge base file,生成新的 file slice)log file 大小達(dá)到閾值會 roll over 一個新的

          3.1.2 寫流程(INSERT)

          1. Copy On Write

          ?先對 records 按照 record key 去重(可選)?不會創(chuàng)建 Index?如果有小的 base file 文件,merge base file,生成新的 FileSlice + base file,否則直接寫新的 FileSlice + base file

          2. Merge On Read

          ?先對 records 按照 record key 去重(可選)?不會創(chuàng)建 Index?如果 log file 可索引,并且有小的 FileSlice,嘗試追加或?qū)懽钚碌?log file;如果 log file 不可索引,寫一個新的 FileSlice + base file

          3.1.3 工具

          ?DeltaStreamer?Datasource Writer?Flink SQL API

          3.1.4 Key 生成策略

          用來生成 HoodieKey(record key + partition path),目前支持以下策略:

          ?支持多個字段組合 record keys?支持多個字段組合的 parition path (可定制時間格式,Hive style path name)?非分區(qū)表

          3.1.5 刪除策略

          ?邏輯刪:將 value 字段全部標(biāo)記為 null?物理刪:?通過 OPERATION_OPT_KEY 刪除所有的輸入記錄?配置 PAYLOAD_CLASS_OPT_KEY = org.apache.hudi.EmptyHoodieRecordPayload 刪除所有的輸入記錄?在輸入記錄添加字段:_hoodie_is_deleted

          4. 數(shù)據(jù)讀

          4.1 Snapshot 讀

          讀取所有 partiiton 下每個 FileGroup 最新的 FileSlice 中的文件,Copy On Write 表讀 parquet 文件,Merge On Read 表讀 parquet + log 文件

          4.2 Incremantal 讀

          https://hudi.apache.org/docs/querying_data.html#spark-incr-query,當(dāng)前的 Spark data source 可以指定消費(fèi)的起始和結(jié)束 commit 時間,讀取 commit 增量的數(shù)據(jù)集。但是內(nèi)部的實現(xiàn)不夠高效:拉取每個 commit 的全部目標(biāo)文件再按照系統(tǒng)字段 hoodie_commit_time apply 過濾條件。

          4.3 Streaming 讀

          0.8.0 版本的 HUDI Flink writer 支持實時的增量訂閱,可用于同步 CDC 數(shù)據(jù),日常的數(shù)據(jù)同步 ETL pipeline。Flink 的 streaming 讀做到了真正的流式讀取,source 定期監(jiān)控新增的改動文件,將讀取任務(wù)下派給讀 task。

          5. Compaction

          ?沒有 base file:走 copy on write insert 流程,直接 merge 所有的 log file 并寫 base file?有 base file:走 copy on write upsert 流程,先讀 log file 建 index,再讀 base file,最后讀 log file 寫新的 base file

          Flink 和 Spark streaming 的 writer 都可以 apply 異步的 compaction 策略,按照間隔 commits 數(shù)或者時間來觸發(fā) compaction 任務(wù),在獨(dú)立的 pipeline 中執(zhí)行。

          6. 總結(jié)

          通過對寫流程的梳理我們了解到 HUDI 相對于其他數(shù)據(jù)湖方案的核心優(yōu)勢:

          ?寫入過程充分優(yōu)化了文件存儲的小文件問題,Copy On Write 寫會一直將一個 bucket (FileGroup)的 base 文件寫到設(shè)定的閾值大小才會劃分新的 bucket;Merge On Read 寫在同一個 bucket 中,log file 也是一直 append 直到大小超過設(shè)定的閾值 roll over。?對 UPDATE 和 DELETE 的支持非常高效,一條 record 的整個生命周期操作都發(fā)生在同一個 bucket,不僅減少小文件數(shù)量,也提升了數(shù)據(jù)讀取的效率(不必要的 join 和 merge)。

          0.8.0 的 HUDI Flink 支持了 streaming 消費(fèi) HUDI 表,在后續(xù)版本還會支持 watermark 機(jī)制,讓 HUDI Flink 承擔(dān) streaming ETL pipeline 的中間層,成為數(shù)據(jù)湖/倉建設(shè)中流批一體的中間計算層。

          瀏覽 183
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲精品乱码久久久久久蜜桃欧美 | 国产精品麻豆 | 被操高潮视频 | 黄色片在线观看视频 | 麻豆影院91免费版 |