Hudi 實踐 | 順豐科技 Hudi on Flink 實時數(shù)倉實踐
摘要:本文作者劉杰,介紹了順豐科技數(shù)倉的架構(gòu),趟過的一些問題、使用 Hudi 來優(yōu)化整個 job 狀態(tài)的實踐細節(jié),以及未來的一些規(guī)劃。主要內(nèi)容為:
數(shù)倉架構(gòu) Hudi 代碼躺過的坑 狀態(tài)優(yōu)化 未來規(guī)劃
GitHub 地址 
順豐科技早在 2019 年引入 Hudi ,當(dāng)時是基于 Spark 批處理,2020 年對數(shù)據(jù)的實時性要求更高公司對架構(gòu)進行了升級,在社區(qū) Hudi on Flink 的半成品上持續(xù)優(yōu)化實現(xiàn) Binlog 數(shù)據(jù) CDC 入湖。在 Hudi 社區(qū)飛速發(fā)展的同時公司今年對數(shù)倉也提出了新的要求,最終采用 Flink + Hudi 的方式來寬表的實時化。過程中遇到了很多問題主要有兩點:
Hudi Master 代碼當(dāng)時存在一些漏洞;
寬表涉及到多個 Join,Top One 等操作使得狀態(tài)很大。
慶幸的是社區(qū)的修復(fù)速度很給力加上 Hudi 強大 upsert 能力使這兩個問題得到以有效的解決。
一、數(shù)倉架構(gòu)
二、Hudi 代碼趟過的坑
1. Hudi StreamWriteFunction 算子核心流程梳理

StreamWriteFunction算子收數(shù)據(jù)的時候會先把數(shù)據(jù)按照 fileld 分組緩存好,數(shù)據(jù)的持續(xù)流會使得緩存數(shù)據(jù)越來越大,當(dāng)達到一定閾值時便會執(zhí)行 flush。閾值由 2 個核心參數(shù)控制:write.batch.size 默認 64M ,write.task.max.size 默認 1G 。當(dāng)單個分組數(shù)據(jù)達到 64M 或者總緩存數(shù)據(jù)達到 800M ~ 1G 就會觸發(fā) flush 。
flush 會調(diào)用 client 的 api 去創(chuàng)建一個 WriteHandle,然后把 WriteHandle 放入 Map 進行緩存,一個 handle 可以理解為對應(yīng)一個文件的 cow。
如果一個 fileld 在同一 checkpoint 期間被多次寫入,則后一次是基于前一次的 cow, 它的 handle 是一個FlinkMergeAndReplaceHandle,判斷一個 fileld 是否之前被寫入過就是根據(jù)上面 Map 緩存得來的。
StreamWriteFunction執(zhí)行 snapshotState 時會把內(nèi)存的所有分組數(shù)據(jù)一次進行 flush, 之后對 client 的 handle 進行清空。
2. 場景還原
Hudi 本身是具備 upsert 能力的,所以我們開始認為 Hudi Sink 在 At Least Once 模式下是沒問題的,并且 At Least Once 模式下 Flink 算子不需要等待 Barrier 對齊,能夠處理先到的數(shù)據(jù)使得處理速度更快,于是我們在 Copy On Write 場景中對 Flink CheckpointingMode 設(shè)置了 AT_LEAST_ONCE。
writeFunction 的上游是文件 BucketAssignFunction fileld 分配算子,假如有一批 insert 數(shù)據(jù) A、B、C、D 屬于同一個分區(qū)并且分配到同一個BucketAssignFunction 的 subtask ,但是 A、B 和 C、D 是相鄰兩個不同的 checkpoint。
當(dāng) A 進入BucketAssignFunction 時如果發(fā)現(xiàn)沒有新的小文件可以使用,就會創(chuàng)建一個新的 fileld f0,當(dāng) B 流入時也會給他分配到 f0 上。同時因為是 AT_LEAST_ONCE 模式,C、D 數(shù)據(jù)都有可能被處理到也被分配到了 f0 上。也就是說 在 AT_LEAST_ONCE 模式下由于 C、D 數(shù)據(jù)被提前處理,導(dǎo)致 A、B、C、D 4 條屬于兩個 checkpoint 的 insert 數(shù)據(jù)被分配到了同一個 fileld。
writeFunction 有可能當(dāng)接收到 A、B、C 后這個算子的 barrier 就對齊了,會把 A、B、C 進行 flush,而 D 將被遺留到下一個 checkpoint 才處理。A、B、C 是 insert 數(shù)據(jù)所以就會直接創(chuàng)建一個文件寫入,D 屬于下一個 checkpoint ,A、B、C 寫入時創(chuàng)建的 handle 已被清理了,等到下一個 checkpoint 執(zhí)行 flush。因為 D 也是 insert 數(shù)據(jù)所以也會直接創(chuàng)建一個文件寫數(shù)據(jù),但是 A、B、C、D 的 fileld 是一樣的,導(dǎo)致最終 D 創(chuàng)建的文件覆蓋了 A、B、C 寫入的文件最終導(dǎo)致 A、B、C 數(shù)據(jù)丟失。

3. 問題定位
三、狀態(tài)優(yōu)化
1. Top One 下沉 Hudi
在 Hudi 中有一個write.precombine.field 配置項用來指定使用某個字段對 flush 的數(shù)據(jù)去重,當(dāng)出現(xiàn)多條數(shù)據(jù)需要去重時就會按照整個字段進行比較,保留最大的那條記錄,這其實和 Top One 很像。
我們在 SQL 上將 Top One 的排序邏輯組合成了一個字段設(shè)置為 Hudi 的 write.precombine.field,同時把這個字段寫入 state,同一 key 的數(shù)據(jù)多次進來時都會和 state 的 write.precombine.field 進行比較更新。
Flink Top One 的 state 默認是保存整記錄的所有字段,但是我們只保存了一個字段,大大節(jié)省了 state 的大小。
2. 多表 Left Join 下沉 Hudi
■ 2.1 Flink SQL join
insert into t_pselectt0.id,t0.name,t1.age,t2.sexfrom t0left join t1 on t0.id = t1.idleft join t2 on t0.id = t2.id
■ 2.2 把 Join 改寫成 Union All
對于上面案例每次 left join 只是補充了幾個字段,我們想到用 union all 的方式進行 SQL 改寫,union all 需要補齊所有字段,缺的字段用 null 補。我們認為 null 補充的字段不是有效字段。改成從 union all 之后要求 Hudi 具備局部更新的能力才能達到 join 的效果。
當(dāng)收到的數(shù)據(jù)是來自 t0 的時候就只更新 id 和 name 字段;
同理 ,數(shù)據(jù)是來自 t1 的時候就只更新 age 字段;
t2 只更新 sex 字段。
■ 2.3 Hudi Union All 實現(xiàn)

RowDataToHoodieFunction:這是對收入的數(shù)據(jù)進行轉(zhuǎn)化成一個 HudiRecord,收到數(shù)據(jù)是包含全字段的,我們在轉(zhuǎn)化 HudiRecord 的時候只選擇了有效字段進行轉(zhuǎn)化。
BoostrapFunction:在任務(wù)恢復(fù)的時候會讀取文件加載索引數(shù)據(jù),當(dāng)任務(wù)恢復(fù)后次算子不做數(shù)據(jù)轉(zhuǎn)化處理。
BucketAssignFunction:這個算子用來對記錄分配 location,loaction 包含兩部分信息。一是分區(qū)目錄,另一個是 fileld。fileld 用來標(biāo)識記錄將寫入哪個文件,一旦記錄被確定寫入哪個文件,就會發(fā)記錄按照 fileld 分組發(fā)送到 StreamWriteFunction,StreamWriteFunction 再按文件進行批量寫入。

為了在 Hudi 中實現(xiàn) top one,我們對 state 信息進行了擴展,用來做 Top One 時間字段。

在 Update 場景中的更新邏輯類似 insert 場景,假如老數(shù)據(jù)是 {id:1,name:zs,age:20,sex:man} ,新收到了{id:1,name:ls},{id:1,age:30} 這 2 條數(shù)據(jù),就會先從文件中把老的數(shù)據(jù)讀出來,然后依次和新收到的數(shù)據(jù)進行合并,合并步驟同 insert。如下圖:

這樣通過 union all 的方式達到了 left join 的效果,大大節(jié)省了 state 的大小。
四、未來規(guī)劃
parquet 元數(shù)據(jù)信息收集,parquet 文件可以從 footer 里面得到每個行列的最大最小等信息,我們計劃在寫入文件的后把這些信息收集起來,并且基于上一次的 commit 的元數(shù)據(jù)信息進行合并,生成一個包含所有文件的元數(shù)據(jù)文件,這樣可以在讀取數(shù)據(jù)時進行謂詞下推進行文件的過濾。
公司致力于打造基于 Hudi 作為底層存儲,F(xiàn)link 作為流批一體化的 SQL 計算引擎,F(xiàn)link 的批處理 Hudi 這塊還涉足不深,未來可能會計劃用 Flink 對 Hudi 實現(xiàn) clustering 等功能,在 Flink 引擎上完善 Hudi 的批處理功能。
戳我,查看更多技術(shù)文章!
