<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Iceberg 實(shí)踐 | 汽車之家:基于 Flink + Iceberg 的湖倉一體架構(gòu)實(shí)踐

          共 5994字,需瀏覽 12分鐘

           ·

          2021-06-12 01:02

          摘要:由汽車之家實(shí)時(shí)計(jì)算平臺(tái)負(fù)責(zé)人邸星星在 4 月 17 日上海站 Meetup 分享的,基于 Flink + Iceberg 的湖倉一體架構(gòu)實(shí)踐,內(nèi)容包括:


          1. 數(shù)據(jù)倉庫架構(gòu)升級(jí)的背景

          2. 基于 Iceberg 的湖倉一體架構(gòu)實(shí)踐

          3. 總結(jié)與收益

          4. 后續(xù)規(guī)劃


          Tips:點(diǎn)擊文閱讀原文即可查看原文視頻~

           GitHub 地址 
          https://github.com/apache/flink
          歡迎大家給 Flink 點(diǎn)贊送 star~

          一、數(shù)據(jù)倉庫架構(gòu)升級(jí)的背景


          1. 基于 Hive 的數(shù)據(jù)倉庫的痛點(diǎn)


          原有的數(shù)據(jù)倉庫完全基于 Hive 建造而成,主要存在三大痛點(diǎn):


          痛點(diǎn)一:不支持 ACID


          1)不支持 Upsert 場景;


          2)不支持 Row-level delete,數(shù)據(jù)修正成本高。


          痛點(diǎn)二:時(shí)效性難以提升


          1)數(shù)據(jù)難以做到準(zhǔn)實(shí)時(shí)可見;


          2)無法增量讀取,無法實(shí)現(xiàn)存儲(chǔ)層面的流批統(tǒng)一;


          3)無法支持分鐘級(jí)延遲的數(shù)據(jù)分析場景。


          痛點(diǎn)三:Table Evolution


          1)寫入型 Schema,對(duì) Schema 變更支持不好;


          2)Partition Spec 變更支持不友好。


          2. Iceberg 關(guān)鍵特性


          Iceberg 主要有四大關(guān)鍵特性:支持 ACID 語義、增量快照機(jī)制、開放的表格式和流批接口支持。


          • 支持 ACID 語義


            • 不會(huì)讀到不完整的 Commit;

            • 基于樂觀鎖支持并發(fā) Commit;

            • Row-level delete,支持 Upsert。


          • 增量快照機(jī)制


            • Commit 后數(shù)據(jù)即可見(分鐘級(jí));

            • 可回溯歷史快照。


          • 開放的表格式


            • 數(shù)據(jù)格式:parquet、orc、avro

            • 計(jì)算引擎:Spark、Flink、Hive、Trino/Presto


          • 流批接口支持


            • 支持流、批寫入;

            • 支持流、批讀取。


          二、基于 Iceberg 的湖倉一體架構(gòu)實(shí)踐


          湖倉一體的意義就是說我不需要看見湖和倉,數(shù)據(jù)有著打通的元數(shù)據(jù)的格式,它可以自由的流動(dòng),也可以對(duì)接上層多樣化的計(jì)算生態(tài)。

          ——賈揚(yáng)清(阿里云計(jì)算平臺(tái)高級(jí)研究員)


          1. Append 流入湖的鏈路



          上圖為日志類數(shù)據(jù)入湖的鏈路,日志類數(shù)據(jù)包含客戶端日志、用戶端日志以及服務(wù)端日志。這些日志數(shù)據(jù)會(huì)實(shí)時(shí)錄入到 Kafka,然后通過 Flink 任務(wù)寫到 Iceberg 里面,最終存儲(chǔ)到 HDFS。


          2. Flink SQL 入湖鏈路打通


          我們的 Flink SQL 入湖鏈路打通是基于 “Flink 1.11 + Iceberg 0.11” 完成的,對(duì)接 Iceberg Catalog 我們主要做了以下內(nèi)容:


          1)Meta Server 增加對(duì) Iceberg Catalog 的支持;


          2)SQL SDK 增加 Iceberg Catalog 支持。


          然后在這基礎(chǔ)上,平臺(tái)開放 Iceberg 表的管理功能,使得用戶可以自己在平臺(tái)上建 SQL 的表。


          3. 入湖 - 支持代理用戶


          第二步是內(nèi)部的實(shí)踐,對(duì)接現(xiàn)有預(yù)算體系、權(quán)限體系。


          因?yàn)橹捌脚_(tái)做實(shí)時(shí)作業(yè)的時(shí)候,平臺(tái)都是默認(rèn)為 Flink 用戶去運(yùn)行的,之前存儲(chǔ)不涉及 HDFS 存儲(chǔ),因此可能沒有什么問題,也就沒有思考預(yù)算劃分方面的問題。


          但是現(xiàn)在寫 Iceberg 的話,可能就會(huì)涉及一些問題。比如數(shù)倉團(tuán)隊(duì)有自己的集市,數(shù)據(jù)就應(yīng)該寫到他們的目錄下面,預(yù)算也是劃到他們的預(yù)算下,同時(shí)權(quán)限和離線團(tuán)隊(duì)賬號(hào)的體系打通。



          如上所示,這塊主要是在平臺(tái)上做了代理用戶的功能,用戶可以去指定用哪個(gè)賬號(hào)去把這個(gè)數(shù)據(jù)寫到 Iceberg 里面,實(shí)現(xiàn)過程主要有以下三個(gè)。


          • 增加 Table 級(jí)別配置:'iceberg.user.proxy' = 'targetUser’


          1)啟用 Superuser
          2)團(tuán)隊(duì)賬號(hào)鑒權(quán)


          • 訪問 HDFS 時(shí)啟用代理用戶:



          • 訪問 Hive Metastore 時(shí)指定代理用戶

            1)參考 Spark 的相關(guān)實(shí)現(xiàn):

            org.apache.spark.deploy.security.HiveDelegationTokenProvider

            2)動(dòng)態(tài)代理 HiveMetaStoreClient,使用代理用戶訪問 Hive metastore


          4. Flink SQL 入湖示例


          DDL + DML



          5. CDC 數(shù)據(jù)入湖鏈路



          如上所示,我們有一個(gè) AutoDTS 平臺(tái),負(fù)責(zé)業(yè)務(wù)庫數(shù)據(jù)的實(shí)時(shí)接入。我們會(huì)把這些業(yè)務(wù)庫的數(shù)據(jù)接入到 Kafka 里面,同時(shí)它還支持在平臺(tái)上配置分發(fā)任務(wù),相當(dāng)于把進(jìn) Kafka 的數(shù)據(jù)分發(fā)到不同的存儲(chǔ)引擎里,在這個(gè)場景下是分發(fā)到 Iceberg 里。


          6. Flink SQL CDC 入湖鏈路打通


          下面是我們基于 “Flink1.11 + Iceberg 0.11” 支持 CDC 入湖所做的改動(dòng):


          • 改進(jìn) Iceberg Sink:

            Flink 1.11 版本為 AppendStreamTableSink,無法處理 CDC 流,修改并適配。


          • 表管理

            1)支持 Primary key(PR1978)

            2)開啟 V2 版本:'iceberg.format.version' = '2'


          7. CDC 數(shù)據(jù)入湖


          ■ 1. 支持 Bucket


          Upsert 場景下,需要確保同一條數(shù)據(jù)寫入到同一 Bucket 下,這又如何實(shí)現(xiàn)?

          目前 Flink SQL 語法不支持聲明 bucket 分區(qū),通過配置的方式聲明 Bucket:

          'partition.bucket.source'='id', // 指定 bucket 字段

          'partition.bucket.num'='10',   // 指定 bucket 數(shù)量


          ■ 2. Copy-on-write sink


          做 Copy-on-Write 的原因是原本社區(qū)的 Merge-on-Read 不支持合并小文件,所以我們臨時(shí)去做了 Copy-on-write sink 的實(shí)現(xiàn)。目前業(yè)務(wù)一直在測試使用,效果良好。



          上方為 Copy-on-Write 的實(shí)現(xiàn),其實(shí)跟原來的 Merge-on-Read 比較類似,也是有 StreamWriter 多并行度寫入和 FileCommitter 單并行度順序提交


          在 Copy-on-Write 里面,需要根據(jù)表的數(shù)據(jù)量合理設(shè)置 Bucket 數(shù),無需額外做小文件合并。


          • StreamWriter 在 snapshotState 階段多并行度寫入

            1)增加 Buffer;

            2)寫入前需要判斷上次 checkpoint 已經(jīng) commit 成功;

            3)按 bucket 分組、合并,逐個(gè) Bucket 寫入。


          • FileCommitter 單并行度順序提交

            1)table.newOverwrite()

            2)Flink.last.committed.checkpoint.id



          8. 示例 - CDC 數(shù)據(jù)配置入湖



          如上圖所示,在實(shí)際使用中,業(yè)務(wù)方可以在 DTS 平臺(tái)上創(chuàng)建或配置分發(fā)任務(wù)即可。


          實(shí)例類型選擇 Iceberg 表,然后選擇目標(biāo)庫,表明要把哪個(gè)表的數(shù)據(jù)同步到 Iceberg 里,然后可以選原表和目標(biāo)表的字段的映射關(guān)系是什么樣的,配置之后就可以啟動(dòng)分發(fā)任務(wù)。啟動(dòng)之后,會(huì)在實(shí)時(shí)計(jì)算平臺(tái) Flink 里面提交一個(gè)實(shí)時(shí)任務(wù),接著用 Copy-on-write sink 去實(shí)時(shí)地把數(shù)據(jù)寫到 Iceberg 表里面。



          9. 入湖其他實(shí)踐


          實(shí)踐一:減少 empty commit


          • 問題描述:

            在上游 Kafka 長期沒有數(shù)據(jù)的情況下,每次 Checkpoint 依舊會(huì)生成新的 Snapshot,導(dǎo)致大量的空文件和不必要的 Snapshot。


          • 解決方案(PR - 2042):

            增加配置 Flink.max-continuousempty-commits,在連續(xù)指定次數(shù) Checkpoint 都沒有數(shù)據(jù)后才真正觸發(fā) Commit,生成 Snapshot。


          實(shí)踐二:記錄 watermark


          • 問題描述:

            目前 Iceberg 表本身無法直接反映數(shù)據(jù)寫入的進(jìn)度,離線調(diào)度難以精準(zhǔn)觸發(fā)下游任務(wù)。


          • 解決方案( PR - 2109 ):

            在 Commit 階段將 Flink 的 Watermark 記錄到 Iceberg 表的 Properties 中,可直觀的反映端到端的延遲情況,同時(shí)可以用來判斷分區(qū)數(shù)據(jù)完整性,用于調(diào)度觸發(fā)下游任務(wù)。


          實(shí)踐三:刪表優(yōu)化


          • 問題描述:

            刪除 Iceberg 可能會(huì)很慢,導(dǎo)致平臺(tái)接口相應(yīng)超時(shí)。因?yàn)?Iceberg 是面向?qū)ο蟠鎯?chǔ)來抽象 IO 層的,沒有快速清除目錄的方法。

          • 解決方案:

            擴(kuò)展 FileIO,增加 deleteDir 方法,在 HDFS 上快速刪除表數(shù)據(jù)。


          10. 小文件合并及數(shù)據(jù)清理


          定期為每個(gè)表執(zhí)行批處理任務(wù)(spark 3),分為以下三個(gè)步驟:


          1. 定期合并新增分區(qū)的小文件:


          rewriteDataFilesAction.execute(); 僅合并小文件,不會(huì)刪除舊文件。


          2. 刪除過期的 snapshot,清理元數(shù)據(jù)及數(shù)據(jù)文件:


          table.expireSnapshots().expireOld erThan(timestamp).commit();


          3. 清理 orphan 文件,默認(rèn)清理 3 天前,且無法觸及的文件:


          removeOrphanFilesAction.older Than(timestamp).execute();


          11. 計(jì)算引擎 – Flink


          Flink 是實(shí)時(shí)平臺(tái)的核心計(jì)算引擎,目前主要支持?jǐn)?shù)據(jù)入湖場景,主要有以下幾個(gè)方面的特點(diǎn)。


          • 數(shù)據(jù)準(zhǔn)實(shí)時(shí)入湖:

            Flink 和 Iceberg 在數(shù)據(jù)入湖方面集成度最高,F(xiàn)link 社區(qū)主動(dòng)擁抱數(shù)據(jù)湖技術(shù)。


          • 平臺(tái)集成:

            AutoStream 引入 IcebergCatalog,支持通過 SQL 建表、入湖 AutoDTS 支持將 MySQL、SQLServer、TiDB 表配置入湖。


          • 流批一體:

            在流批一體的理念下,F(xiàn)link 的優(yōu)勢會(huì)逐漸體現(xiàn)出來。


          12. 計(jì)算引擎 – Hive


          Hive 在 SQL 批處理層面 Iceberg 和 Spark 3 集成度更高,主要提供以下三個(gè)方面的功能。


          • 定期小文件合并及 meta 信息查詢:

            SELECT * FROM prod.db.table.history 還可查看 snapshots, files, manifests。


          • 離線數(shù)據(jù)寫入:

            1)Insert into

            2)Insert overwrite

            3)Merge into


          • 分析查詢:

            主要支持日常的準(zhǔn)實(shí)時(shí)分析查詢場景。


          13. 計(jì)算引擎 – Trino/Presto


          AutoBI 已經(jīng)和 Presto 集成,用于報(bào)表、分析型查詢場景。


          • Trino

            1)直接將 Iceberg 作為報(bào)表數(shù)據(jù)源

            2)需要增加元數(shù)據(jù)緩存機(jī)制:https://github.com/trinodb/trino/issues/7551


          • Presto

            社區(qū)集成中:https://github.com/prestodb/presto/pull/15836


          14. 踩過的坑


          ■ 1. 訪問 Hive Metastore 異常


          問題描述:HiveConf 的構(gòu)造方法的誤用,導(dǎo)致 Hive 客戶端中聲明的配置被覆蓋,導(dǎo)致訪問 Hive metastore 時(shí)異常。


          解決方案(PR-2075):修復(fù) HiveConf 的構(gòu)造,顯示調(diào)用 addResource 方法,確保配置不會(huì)被覆蓋:hiveConf.addResource(conf);


          ■ 2.Hive metastore 鎖未釋放


          問題描述:“CommitFailedException: Timed out after 181138 ms waiting for lock xxx.” 原因是 hiveMetastoreClient.lock 方法,在未獲得鎖的情況下,也需要顯示 unlock,否則會(huì)導(dǎo)致上面異常。


          解決方案(PR-2263):優(yōu)化 HiveTableOperations#acquireLock 方法,在獲取鎖失敗的情況下顯示調(diào)用 unlock 來釋放鎖。


          ■ 3. 元數(shù)據(jù)文件丟失


          問題描述:Iceberg 表無法訪問,報(bào) “NotFoundException Failed to open input stream for file : xxx.metadata.json”


          解決方案(PR-2328):當(dāng)調(diào)用 Hive metastore 更新 iceberg 表的 metadata_location 超時(shí)后,增加檢查機(jī)制,確認(rèn)元數(shù)據(jù)未保存成功后再刪除元數(shù)據(jù)文件。


          三、收益與總結(jié)


          1. 總結(jié)


          通過對(duì)湖倉一體、流批融合的探索,我們分別做了總結(jié)。


          • 湖倉一體

            1)Iceberg 支持 Hive Metastore;

            2)總體使用上與 Hive 表類似:相同數(shù)據(jù)格式、相同的計(jì)算引擎。


          • 流批融合

            準(zhǔn)實(shí)時(shí)場景下實(shí)現(xiàn)流批統(tǒng)一:同源、同計(jì)算、同存儲(chǔ)。


          2. 業(yè)務(wù)收益


          • 數(shù)據(jù)時(shí)效性提升:

            入倉延遲從 2 小時(shí)以上降低到 10 分鐘以內(nèi);算法核心任務(wù) SLA 提前 2 小時(shí)完成。


          • 準(zhǔn)實(shí)時(shí)的分析查詢:

            結(jié)合 Spark 3 和 Trino,支持準(zhǔn)實(shí)時(shí)的多維分析查詢。


          • 特征工程提效:

            提供準(zhǔn)實(shí)時(shí)的樣本數(shù)據(jù),提高模型訓(xùn)練時(shí)效性。


          • CDC 數(shù)據(jù)準(zhǔn)實(shí)時(shí)入倉:

            可以在數(shù)倉針對(duì)業(yè)務(wù)表做準(zhǔn)實(shí)時(shí)分析查詢。


          3. 架構(gòu)收益 - 準(zhǔn)實(shí)時(shí)數(shù)倉



          上方也提到了,我們支持準(zhǔn)實(shí)時(shí)的入倉和分析,相當(dāng)于是為后續(xù)的準(zhǔn)實(shí)時(shí)數(shù)倉建設(shè)提供了基礎(chǔ)的架構(gòu)驗(yàn)證。準(zhǔn)實(shí)時(shí)數(shù)倉的優(yōu)勢是一次開發(fā)、口徑統(tǒng)一、統(tǒng)一存儲(chǔ),是真正的批流一體。劣勢是實(shí)時(shí)性較差,原來可能是秒級(jí)、毫秒級(jí)的延遲,現(xiàn)在是分鐘級(jí)的數(shù)據(jù)可見性。


          但是在架構(gòu)層面上,這個(gè)意義還是很大的,后續(xù)我們能看到一些希望,可以把整個(gè)原來 “T + 1” 的數(shù)倉,做成準(zhǔn)實(shí)時(shí)的數(shù)倉,提升數(shù)倉整體的數(shù)據(jù)時(shí)效性,然后更好地支持上下游的業(yè)務(wù)。

           

          四、后續(xù)規(guī)劃


          ■ 1. 跟進(jìn) Iceberg 版本


          全面開放 V2 格式,支持 CDC 數(shù)據(jù)的 MOR 入湖。


          ■ 2. 建設(shè)準(zhǔn)實(shí)時(shí)數(shù)倉


          基于 Flink 通過 Data pipeline 模式對(duì)數(shù)倉各層表全面提速。


          ■ 3. 流批一體


          隨著 upsert 功能的逐步完善,持續(xù)探索存儲(chǔ)層面流批一體。


          ■ 4. 多維分析


          基于 Presto/Spark3 輸出準(zhǔn)實(shí)時(shí)多維分析。


          更多 Flink 相關(guān)技術(shù)問題,可掃碼加入社區(qū)釘釘交流群~



          ▼ 關(guān)注「Flink 中文社區(qū)」,獲取更多技術(shù)干貨 


             戳我,查看更多技術(shù)干貨~
          瀏覽 65
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产色婷婷免费视频 | 最新人妻在线 | 激情五月天视频 | 一区二区三区四区免费在线 | 国产黄色电影黄色视频一级片 |