<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Hudi 實踐 | 順豐科技 Hudi on Flink 實時數(shù)倉實踐

          共 5597字,需瀏覽 12分鐘

           ·

          2021-10-01 02:11

          ▼ 關(guān)注「Flink 中文社區(qū)」,獲取更多技術(shù)干貨 

          摘要:本文作者劉杰,介紹了順豐科技數(shù)倉的架構(gòu),趟過的一些問題、使用 Hudi 來優(yōu)化整個 job 狀態(tài)的實踐細節(jié),以及未來的一些規(guī)劃。主要內(nèi)容為:


          1. 數(shù)倉架構(gòu)
          2. Hudi 代碼躺過的坑
          3. 狀態(tài)優(yōu)化
          4. 未來規(guī)劃

          Tips:「閱讀原文」即可查看更多技術(shù)文章

           GitHub 地址 
          歡迎大家給 Flink 點贊送 star~


          順豐科技早在 2019 年引入 Hudi ,當(dāng)時是基于 Spark 批處理,2020 年對數(shù)據(jù)的實時性要求更高公司對架構(gòu)進行了升級,在社區(qū) Hudi on Flink 的半成品上持續(xù)優(yōu)化實現(xiàn) Binlog 數(shù)據(jù) CDC 入湖。在 Hudi 社區(qū)飛速發(fā)展的同時公司今年對數(shù)倉也提出了新的要求,最終采用 Flink + Hudi 的方式來寬表的實時化。過程中遇到了很多問題主要有兩點:


          1. Hudi Master 代碼當(dāng)時存在一些漏洞;


          2. 寬表涉及到多個 Join,Top One 等操作使得狀態(tài)很大。


          慶幸的是社區(qū)的修復(fù)速度很給力加上 Hudi 強大 upsert 能力使這兩個問題得到以有效的解決。


          一、數(shù)倉架構(gòu)


          感興趣的同學(xué)可以參考之前順豐分享的《Hudi on Flink 在順豐的實踐應(yīng)用》

          ▼ 文章點擊 ▼

          Hudi on Flink 在順豐的實踐應(yīng)用

          二、Hudi 代碼趟過的坑


          在去年我們是基于 Hudi  0.6 左右進行的 Hudi on Flink 的實踐,代碼較老。為了擁抱社區(qū)我們使用最新 master 代碼進行實踐,在大數(shù)據(jù)量寫入場景中,發(fā)現(xiàn)了一個比較隱秘的丟數(shù)問題,這個問題花了將近兩周的時間才定位到。


          1. Hudi StreamWriteFunction 算子核心流程梳理



          StreamWriteFunction算子收數(shù)據(jù)的時候會先把數(shù)據(jù)按照 fileld 分組緩存好,數(shù)據(jù)的持續(xù)流會使得緩存數(shù)據(jù)越來越大,當(dāng)達到一定閾值時便會執(zhí)行 flush。閾值由 2 個核心參數(shù)控制write.batch.size 默認 64Mwrite.task.max.size 默認 1G 。當(dāng)單個分組數(shù)據(jù)達到 64M 或者總緩存數(shù)據(jù)達到 800M ~ 1G 就會觸發(fā) flush 。


          flush 會調(diào)用 client 的 api 去創(chuàng)建一個 WriteHandle,然后把 WriteHandle 放入 Map 進行緩存,一個 handle 可以理解為對應(yīng)一個文件的 cow。


          如果一個 fileld 在同一 checkpoint 期間被多次寫入,則后一次是基于前一次的 cow, 它的 handle 是一個FlinkMergeAndReplaceHandle,判斷一個 fileld 是否之前被寫入過就是根據(jù)上面 Map 緩存得來的。


          StreamWriteFunction執(zhí)行 snapshotState 時會把內(nèi)存的所有分組數(shù)據(jù)一次進行 flush, 之后對 client 的 handle 進行清空。


          2. 場景還原


          Hudi 本身是具備 upsert 能力的,所以我們開始認為 Hudi Sink 在 At Least Once 模式下是沒問題的,并且 At Least Once 模式下 Flink 算子不需要等待 Barrier 對齊,能夠處理先到的數(shù)據(jù)使得處理速度更快,于是我們在 Copy On Write 場景中對 Flink CheckpointingMode 設(shè)置了 AT_LEAST_ONCE。


          writeFunction 的上游是文件 BucketAssignFunction fileld 分配算子,假如有一批 insert 數(shù)據(jù) A、B、C、D 屬于同一個分區(qū)并且分配到同一個BucketAssignFunction 的 subtask ,但是 A、B 和 C、D 是相鄰兩個不同的 checkpoint。


          當(dāng) A 進入BucketAssignFunction 時如果發(fā)現(xiàn)沒有新的小文件可以使用,就會創(chuàng)建一個新的 fileld f0,當(dāng) B 流入時也會給他分配到 f0 上。同時因為是 AT_LEAST_ONCE 模式,C、D 數(shù)據(jù)都有可能被處理到也被分配到了 f0 上。也就是說 在 AT_LEAST_ONCE 模式下由于 C、D 數(shù)據(jù)被提前處理,導(dǎo)致 A、B、C、D 4 條屬于兩個 checkpoint 的 insert 數(shù)據(jù)被分配到了同一個 fileld。


          writeFunction 有可能當(dāng)接收到 A、B、C 后這個算子的 barrier 就對齊了,會把 A、B、C 進行 flush,而 D 將被遺留到下一個 checkpoint 才處理。A、B、C 是 insert 數(shù)據(jù)所以就會直接創(chuàng)建一個文件寫入,D 屬于下一個 checkpoint ,A、B、C 寫入時創(chuàng)建的 handle 已被清理了,等到下一個 checkpoint 執(zhí)行 flush。因為 D 也是 insert 數(shù)據(jù)所以也會直接創(chuàng)建一個文件寫數(shù)據(jù),但是 A、B、C、D 的 fileld 是一樣的,導(dǎo)致最終 D 創(chuàng)建的文件覆蓋了 A、B、C 寫入的文件最終導(dǎo)致 A、B、C 數(shù)據(jù)丟失。



          3. 問題定位


          這個問題之所以難定位是因為具有一定隨機性,每次丟失的數(shù)據(jù)都不太一樣,而且小數(shù)據(jù)量不易出現(xiàn)。最終通過開啟 Flink 的 Queryable State 進行查詢, 查找丟失數(shù)據(jù)的定位到 fileld, 發(fā)現(xiàn) ABCD state  的 instant 都是 I,然后解析對應(yīng) fileld 的所有版本進行跟蹤還原。

          三、狀態(tài)優(yōu)化


          我們對線上最大的離線寬邊進行了實時化的,寬表字段較多,涉及到多個表對主表的 left join 還包括一些 Top One 的計算,這些算子都會占用 state.  而我們的數(shù)據(jù)周期較長需要保存 180 天數(shù)據(jù)。估算下來狀態(tài)大小將會達到上百 T,這無疑會對狀態(tài)的持久化帶來很大的壓力。但是這些操作放入 Hudi 來做就顯得輕而易舉。

          1. Top One 下沉 Hudi


          在 Hudi 中有一個write.precombine.field 配置項用來指定使用某個字段對 flush 的數(shù)據(jù)去重,當(dāng)出現(xiàn)多條數(shù)據(jù)需要去重時就會按照整個字段進行比較,保留最大的那條記錄,這其實和 Top One 很像。


          我們在 SQL 上將 Top One 的排序邏輯組合成了一個字段設(shè)置為 Hudi 的 write.precombine.field,同時把這個字段寫入 state,同一 key 的數(shù)據(jù)多次進來時都會和 state 的 write.precombine.field 進行比較更新。


          Flink Top One 的 state 默認是保存整記錄的所有字段,但是我們只保存了一個字段,大大節(jié)省了 state 的大小。


          2. 多表 Left Join 下沉 Hudi


          ■ 2.1 Flink SQL join


          我們把這個場景簡化成如下一個案例,假如有寬表 t_p 由三張表組成

          insert into t_p select     t0.id,t0.name,    t1.age,               t2.sex from t0     left join t1 on t0.id = t1.id     left join t2 on t0.id = t2.id

          在 Flink SQL join 算子內(nèi)部會維護一個左表和右表的 state,這都是每個 table 的全字段,且多一次 join 就會多出一個 state. 最終導(dǎo)致 state 大小膨脹,如果 join 算子上游是一個 append 流,state 大小膨脹的效果更明顯。

          ■ 2.2 把 Join 改寫成 Union All


          對于上面案例每次 left join 只是補充了幾個字段,我們想到用 union all 的方式進行 SQL 改寫,union all 需要補齊所有字段,缺的字段用 null 補。我們認為 null 補充的字段不是有效字段。改成從 union all 之后要求 Hudi 具備局部更新的能力才能達到 join 的效果。


          • 當(dāng)收到的數(shù)據(jù)是來自 t0 的時候就只更新 id 和 name 字段;


          • 同理 ,數(shù)據(jù)是來自 t1 的時候就只更新 age 字段;


          • t2 只更新 sex 字段。


          不幸的是 Hudi 的默認實現(xiàn)是全字段覆蓋,也就是說當(dāng)收到 t0 的數(shù)據(jù)時會把 age sex 覆蓋成 null, 收到 t1 數(shù)據(jù)時會把 name sex 覆蓋成 null。這顯然是不可接受的。這就要求我們對 Hudi sink 進行改造。

          ■ 2.3 Hudi  Union All 實現(xiàn)


          Hudi 在 cow 模式每條記錄的更新寫入都是對舊數(shù)據(jù)進行 copy 覆蓋寫入,似乎只要知道這條記錄來自哪個表,哪幾個字段是有效的字段就選擇性的對 copy 出來的字段進行覆蓋即可。但是在分區(qū)變更的場景中就不是那么好使了。在分區(qū)變更的場景中,數(shù)據(jù)從一個分區(qū)變到另一個分區(qū)的邏輯是把舊分區(qū)數(shù)據(jù)刪掉,往新分區(qū)新增數(shù)據(jù)。這可能會把一些之前局部更新的字段信息丟失掉。細聊下來 Hudi on Flink 涉及到由幾個核心算子組成 pipeline。


          • RowDataToHoodieFunction:這是對收入的數(shù)據(jù)進行轉(zhuǎn)化成一個 HudiRecord,收到數(shù)據(jù)是包含全字段的,我們在轉(zhuǎn)化 HudiRecord 的時候只選擇了有效字段進行轉(zhuǎn)化。

          • BoostrapFunction:在任務(wù)恢復(fù)的時候會讀取文件加載索引數(shù)據(jù),當(dāng)任務(wù)恢復(fù)后次算子不做數(shù)據(jù)轉(zhuǎn)化處理。

          • BucketAssignFunction:這個算子用來對記錄分配 location,loaction 包含兩部分信息。一是分區(qū)目錄,另一個是 fileld。fileld 用來標(biāo)識記錄將寫入哪個文件,一旦記錄被確定寫入哪個文件,就會發(fā)記錄按照 fileld 分組發(fā)送到 StreamWriteFunction,StreamWriteFunction 再按文件進行批量寫入。

          原生的 BucketAssignFunction 的算子邏輯如下圖,當(dāng)收到一條記錄時會先從 state 里面進行查找是否之前有寫過這條記錄,如果有就會找對應(yīng)的 location。如果分區(qū)沒有發(fā)生變更,就把當(dāng)前這條記錄也分配給這個location,如果在 state 中沒有找到 location 就會新創(chuàng)建一個 location,把這個新的location 分配給當(dāng)前記錄,并更新到 state。

          總之這個 state 存儲的 location 就是告訴當(dāng)前記錄應(yīng)該從哪個文件進行更新或者寫入。遇到分區(qū)變更的場景會復(fù)雜一點。假如一條記錄從 2020 分區(qū)變更成了 2021,就會創(chuàng)建一條刪除的記錄,它的 loaction 是 state 中的 location。這條記錄讓下游進行實際的刪除操作,然后再創(chuàng)建一個新的 location (分區(qū)是 2021) 發(fā)送到下游進行 insert。


          為了在 Hudi 中實現(xiàn) top one,我們對 state 信息進行了擴展,用來做 Top One 時間字段。


          對于 StreamWriteFunction 在 Insert 場景中,假如收到了如下 3 條數(shù)據(jù) {id:1,name:zs},{id:1,age:20},{id:1,sex:man},在執(zhí)行 flush 時會創(chuàng)建一個全字段的空記錄 {id:null,name:null,age:null,sex:null},然后依次和 3 條記錄進行合并。注意,這個合并過程只會選擇有效字段的合并。如下圖:


          在 Update 場景中的更新邏輯類似 insert 場景,假如老數(shù)據(jù)是 {id:1,name:zs,age:20,sex:man} ,新收到了{id:1,name:ls},{id:1,age:30} 這 2 條數(shù)據(jù),就會先從文件中把老的數(shù)據(jù)讀出來,然后依次和新收到的數(shù)據(jù)進行合并,合并步驟同 insert。如下圖:



          這樣通過 union all 的方式達到了 left join 的效果,大大節(jié)省了 state 的大小。


          四、未來規(guī)劃


          parquet 元數(shù)據(jù)信息收集,parquet 文件可以從 footer 里面得到每個行列的最大最小等信息,我們計劃在寫入文件的后把這些信息收集起來,并且基于上一次的 commit  的元數(shù)據(jù)信息進行合并,生成一個包含所有文件的元數(shù)據(jù)文件,這樣可以在讀取數(shù)據(jù)時進行謂詞下推進行文件的過濾。


          公司致力于打造基于 Hudi 作為底層存儲,F(xiàn)link 作為流批一體化的 SQL 計算引擎,F(xiàn)link 的批處理 Hudi 這塊還涉足不深,未來可能會計劃用 Flink 對 Hudi 實現(xiàn) clustering 等功能,在 Flink 引擎上完善 Hudi 的批處理功能。




          熱點推薦




          更多 Flink 相關(guān)技術(shù)問題,可掃碼加入社區(qū)釘釘交流群~

             戳我,查看更多技術(shù)文章!

          瀏覽 37
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  九九九精品在线 | 天天操妞网 | 婷婷五月天综合导航 | 骚资源| 亚洲,日韩,aⅴ在线欧美 |