<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Flink 實(shí)踐 | 使用 Flink Hudi 構(gòu)建流式數(shù)據(jù)湖平臺(tái)

          共 5143字,需瀏覽 11分鐘

           ·

          2022-02-20 14:15

          ▼ 關(guān)注「ApacheFlink」,看更多大咖?

          摘要本文整理自阿里巴巴技術(shù)專家陳玉兆 (玉兆)、阿里巴巴開發(fā)工程師劉大龍 (風(fēng)離) 在 Flink Forward Asia 2021 實(shí)時(shí)數(shù)據(jù)湖專場(chǎng)的演講。主要內(nèi)容包括:

          1. Apache Hudi 101
          2. Flink Hudi Integration
          3. Flink Hudi Use Case
          4. Apache Hudi Roadmap

          Tips:點(diǎn)擊「閱讀原文」查看原文視頻 & 演講PDF~

          一、Apache Hudi 101


          提到數(shù)據(jù)湖,大家都會(huì)有這樣的疑問(wèn),什么是數(shù)據(jù)湖?為什么數(shù)據(jù)湖近兩年熱度很高?數(shù)據(jù)湖其實(shí)不是一個(gè)新的概念,最早的數(shù)據(jù)湖概念在 80 年代就已經(jīng)提出,當(dāng)時(shí)對(duì)數(shù)據(jù)湖的定義是原始數(shù)據(jù)層,可以存放各種結(jié)構(gòu)化、半結(jié)構(gòu)化甚至非結(jié)構(gòu)化的數(shù)據(jù)。像機(jī)器學(xué)習(xí)、實(shí)時(shí)分析等很多場(chǎng)景都是在查詢的時(shí)候確定數(shù)據(jù)的 Schema。


          湖存儲(chǔ)成本低、靈活性高的特性,非常適用于做查詢場(chǎng)景的中心化存儲(chǔ)。伴隨著近年來(lái)云服務(wù)的興起,尤其是對(duì)象存儲(chǔ)的成熟,越來(lái)越多的企業(yè)選擇在云上構(gòu)建存儲(chǔ)服務(wù)。數(shù)據(jù)湖的存算分離架構(gòu)非常適合當(dāng)前的云服務(wù)架構(gòu),通過(guò)快照隔離的方式,提供基礎(chǔ)的 acid 事務(wù),同時(shí)支持對(duì)接多種分析引擎適配不同的查詢場(chǎng)景,可以說(shuō)湖存儲(chǔ)在成本和開放性上占了極大優(yōu)勢(shì)。


          當(dāng)前的湖存儲(chǔ)已經(jīng)開始承擔(dān)數(shù)倉(cāng)的功能,通過(guò)和計(jì)算引擎對(duì)接實(shí)現(xiàn)湖倉(cāng)一體的架構(gòu)。湖存儲(chǔ)是一種 table format,在原有的 data format 基礎(chǔ)上封裝了 table 的高級(jí)語(yǔ)義。Hudi 從 2016 年開始將數(shù)據(jù)湖投入實(shí)踐,當(dāng)時(shí)是為了解決大數(shù)據(jù)場(chǎng)景下文件系統(tǒng)上的數(shù)據(jù)更新問(wèn)題,Hudi 類 LSM 的 table format 當(dāng)前在湖格式中是獨(dú)樹一幟的,對(duì)近實(shí)時(shí)更新比較友好,語(yǔ)義也相對(duì)完善。

          Table format 是當(dāng)前流行的三種數(shù)據(jù)湖格式的基礎(chǔ)屬性,而 Hudi 從項(xiàng)目之初就一直朝著平臺(tái)方向去演化,擁有比較完善的數(shù)據(jù)治理和 table service,比如用戶在寫入的時(shí)候可以并發(fā)地優(yōu)化文件的布局,metadata table 可以大幅優(yōu)化寫入時(shí)查詢端的文件查找效率。

          下面介紹一些 Hudi 的基礎(chǔ)概念。


          Timeline service 是 Hudi 事務(wù)層的核心抽象,Hudi 所有數(shù)據(jù)操作都是圍繞著 timeline service 來(lái)展開的,每次操作通過(guò) instant 抽象綁定一個(gè)特定的時(shí)間戳,一連串的 instant 構(gòu)成了 timeline service,每一個(gè) instance 記錄了對(duì)應(yīng)的 action 和狀態(tài)。通過(guò) timeline service,Hudi 可以知道當(dāng)前表操作的狀態(tài),通過(guò)一套文件系統(tǒng)視圖的抽象結(jié)合 timeline service,可以對(duì) table 當(dāng)前的 reader 和 writer 暴露特定時(shí)間戳下的文件布局視圖。


          file group 是 Hudi 在文件布局層的核心抽象,每一個(gè) file group 相當(dāng)于一個(gè) bucket,通過(guò)文件大小來(lái)來(lái)劃分,它的每次寫入行為都會(huì)產(chǎn)生一個(gè)新的版本,一個(gè)版本被抽象為一個(gè) file slice,file slice 內(nèi)部維護(hù)了相應(yīng)版本的數(shù)據(jù)文件。當(dāng)一個(gè) file group 寫入到規(guī)定的文件大小的時(shí)候,就會(huì)切換一個(gè)新的 file group。

          Hudi 在 file slice 的寫入行為可以抽象成兩種語(yǔ)義, copy on write 和 merge on read。


          copy on write 每次都會(huì)寫全量數(shù)據(jù),新數(shù)據(jù)會(huì)和上一個(gè) file slice 的數(shù)據(jù) merge,然后再寫一個(gè)新的 file slice,產(chǎn)生一個(gè)新的 bucket 的文件。


          而 merge on read 則比較復(fù)雜一些,它的語(yǔ)義是追加寫入,即每次只寫增量數(shù)據(jù),所以不會(huì)寫新的 file slice。它首先會(huì)嘗試追加之前的 file slice,只有當(dāng)該寫入的 file slice 被納入壓縮計(jì)劃之后,才會(huì)切新的 file slice。

          二、Flink Hudi Integration



          Flink Hudi 的寫入 pipeline 由幾個(gè)算子構(gòu)成。第一個(gè)算子負(fù)責(zé)將 table 層的 rowdata 轉(zhuǎn)換成 Hudi 的消息格式 HudiRecord。接著經(jīng)過(guò)一個(gè) Bucket Assigner,它主要負(fù)責(zé)將已經(jīng)轉(zhuǎn)好的 HudiRecord 分配到特定的 file group 中,接著分好 file group 的 record 會(huì)流入 Writer 算子執(zhí)行真正的文件寫入。最后還有一個(gè) coordinator,負(fù)責(zé) Hudi table 層的 table service 調(diào)度以及新事務(wù)的發(fā)起和提交。此外,還有一些后臺(tái)的清理角色負(fù)責(zé)清理老版本的數(shù)據(jù)。


          當(dāng)前的設(shè)計(jì)中,每一個(gè) bucket assign task 都會(huì)持有一個(gè) bucket assigner,它獨(dú)立維護(hù)自己的一組 file group。在寫入新數(shù)據(jù)或非更新 insert 數(shù)據(jù)的時(shí)候,bucket assign task 會(huì)掃描文件視圖,優(yōu)先將這一批新的數(shù)據(jù)寫入到被判定為小 bucket 的 file group 里。

          比如上圖, file group 默認(rèn)大小是 120M,那么左圖的 task1 會(huì)優(yōu)先寫到 file group1和 file group2,注意這里不會(huì)寫到 file group3,這是因?yàn)?file group3 已經(jīng)有 100M 數(shù)據(jù),對(duì)于比較接近目標(biāo)閾值的 bucket 不再寫入可以避免過(guò)度寫放大。而右圖中的 task2 會(huì)直接寫一個(gè)新的 file group,不會(huì)去追加那些已經(jīng)寫的比較大的 file group 了。


          接下來(lái)介紹 Flink Hudi 寫流程的狀態(tài)切換機(jī)制。作業(yè)剛啟動(dòng)時(shí),coordinator 會(huì)先嘗試去文件系統(tǒng)上新建這張表,如果當(dāng)前表不存在,它就會(huì)去文件目錄上寫一些 meta 信息,也就是構(gòu)建一個(gè)表。收到所有 task 的初始化 meta 信息后,coordinator 會(huì)開啟一個(gè)新的 transaction,write task 看到 transaction 的發(fā)起后,就會(huì)解鎖當(dāng)前數(shù)據(jù)的 flush 行為。

          Write Task 會(huì)先積攢一批數(shù)據(jù),這里有兩種 flush 策略,一種是當(dāng)前的數(shù)據(jù) buffer 達(dá)到了指定的大小,就會(huì)把內(nèi)存中的數(shù)據(jù) flush 出去;另一種是當(dāng)上游的 checkpoint barrier 到達(dá)需要做快照的時(shí)候,會(huì)把所有內(nèi)存中的數(shù)據(jù) flush 到磁盤。每次 flush 數(shù)據(jù)之后都會(huì)把 meta 信息發(fā)送給 coordinator。coordinator 收到 checkpoint 的 success 事件后,會(huì)提交對(duì)應(yīng)的事務(wù),并且發(fā)起下一個(gè)新的事務(wù)。writer task 看到新事務(wù)后,又會(huì)解鎖下一輪事務(wù)的寫入。這樣,整個(gè)寫入流程就串起來(lái)了。


          Flink Hudi Write 提供了非常豐富的寫入場(chǎng)景。當(dāng)前支持對(duì) log 數(shù)據(jù)類型的寫入,即非更新的數(shù)據(jù)類型,同時(shí)支持小文件合并。另外對(duì)于 Hudi 的核心寫入場(chǎng)景比如更新流、CDC 數(shù)據(jù)也都是 Hudi 重點(diǎn)支持的。同時(shí),F(xiàn)link Hudi 還支持歷史數(shù)據(jù)的高效率批量導(dǎo)入,bucket insert 模式可以一次性將比如 Hive 中的離線數(shù)據(jù)或者數(shù)據(jù)庫(kù)中的離線數(shù)據(jù),通過(guò)批量查詢的方式,高效導(dǎo)入 Hudi 格式中。另外,F(xiàn)link Hudi 還提供了全量和增量的索引加載,用戶可以一次性將批量數(shù)據(jù)高效導(dǎo)入湖格式,再通過(guò)對(duì)接流的寫入程序,實(shí)現(xiàn)全量接增量的數(shù)據(jù)導(dǎo)入。


          Flink Hudi read 端也支持了非常豐富的查詢視圖,目前主要支持的有全量讀取、歷史時(shí)間 range 的增量讀取以及流式讀取。


          上圖是一段通過(guò) Flink sql 寫 Hudi 的例子,Hudi 支持的 use case 非常豐富,也盡量簡(jiǎn)化了用戶需要配置的參數(shù)。通過(guò)簡(jiǎn)單配置表 path、 并發(fā)以及 operation type,用戶可以非常方便地將上游的數(shù)據(jù)寫入到 Hudi 格式中。

          三、Flink Hudi Use Case


          下面介紹 Flink Hudi 的經(jīng)典應(yīng)用場(chǎng)景。


          第一個(gè)經(jīng)典場(chǎng)景是 DB 導(dǎo)入數(shù)據(jù)湖。目前 DB 數(shù)據(jù)導(dǎo)入數(shù)據(jù)湖有兩種方式:可以通過(guò) CDC connector 一次性將全量和增量數(shù)據(jù)導(dǎo)入到 Hudi 格式中;也可以通過(guò)消費(fèi) Kafka 上的 CDC changelog,通過(guò) Flink 的 CDC format 將數(shù)據(jù)導(dǎo)入到 Hudi 格式。


          第二個(gè)經(jīng)典場(chǎng)景是流計(jì)算的 ETL (近實(shí)時(shí)的 olap 分析)。通過(guò)對(duì)接上游流計(jì)算簡(jiǎn)單的一些 ETL,比如雙流 join 或雙流 join 接一個(gè) agg,直接將變更流寫入到 Hudi 格式中,然后下游的 read 端可以對(duì)接傳統(tǒng)經(jīng)典的 olap 引擎比如 presto、spark 來(lái)做端到端的近實(shí)時(shí)查詢。


          第三個(gè)經(jīng)典場(chǎng)景和第二個(gè)有些類似, Hudi 支持原生的 changelog,也就是支持保存 Flink 計(jì)算中行級(jí)別的變更。基于這個(gè)能力,通過(guò)流讀消費(fèi)變更的方式,可以實(shí)現(xiàn)端到端的近實(shí)時(shí)的 ETL 生產(chǎn)。


          未來(lái),社區(qū)兩個(gè)大版本主要的精力還是放在流讀和流寫方向,并且會(huì)加強(qiáng)流讀的語(yǔ)義;另外在 catalog 和 metadata 方面會(huì)做自管理;我們還會(huì)在近期推出一個(gè) trino 原生的 connector 支持,取代當(dāng)前讀 Hive 的方式,提高效率。

          四、Apache Hudi Roadmap


          下面是一個(gè) MySql 到 Hudi 千表入湖的演示。


          首先數(shù)據(jù)源這里我們準(zhǔn)備了兩個(gè)庫(kù),benchmark1 和 benchmark2,benchmark1 下面有 100 張表,benchmark2 下面有 1000 張表。因?yàn)榍П砣牒?qiáng)依賴于 catalog,所以我們首先要?jiǎng)?chuàng)建 catalog,對(duì)于數(shù)據(jù)源我們要?jiǎng)?chuàng)建 MySql catalog,對(duì)于目標(biāo)我們要?jiǎng)?chuàng)建 Hudi catalog。MySql catalog 用于獲取所有源表相關(guān)的信息,包括表結(jié)構(gòu)、表的數(shù)據(jù)等。Hudi catalog 用于創(chuàng)建目標(biāo)。


          執(zhí)行兩條 sql 語(yǔ)句以后,兩條 catalog 就創(chuàng)建成功了。


          接下來(lái)到作業(yè)開發(fā)頁(yè)面創(chuàng)建一個(gè)千表入湖的作業(yè)。只需要簡(jiǎn)單的 9 行 SQL,第一種語(yǔ)法是 create database as database,它的作用是把 MySql benchmark1 庫(kù)下所有的表結(jié)構(gòu)和表數(shù)據(jù)一鍵同步到 Hudi CDS demo 庫(kù),表的關(guān)系是一對(duì)一映射。第二條語(yǔ)法是 create table as table,它的作用是把 MySql benchmark2 庫(kù)下所有匹配 sbtest. 正則表達(dá)式的表同步到 Hudi 的 DB1 下的 ctas_dema 表里面,是多對(duì)一的映射關(guān)系,會(huì)做分庫(kù)分表的合并。

          接著我們運(yùn)行并上線,然后到作業(yè)運(yùn)維的頁(yè)面去啟動(dòng)作業(yè),可以看到配置信息已經(jīng)更新了,說(shuō)明已經(jīng)重新上線過(guò)。接著點(diǎn)擊啟動(dòng)按鈕,啟動(dòng)作業(yè)。然后就可以到作業(yè)總覽頁(yè)面查看作業(yè)相關(guān)的狀態(tài)信息。


          上圖是作業(yè)的拓?fù)洌浅?fù)雜,有 1100 張?jiān)幢砗?101 張目標(biāo)表。這里我們做了一些優(yōu)化 —— source merge,把所有的表合并到一個(gè)節(jié)點(diǎn)里,可以在增量 binlog 拉取階段只拉取一次,減輕對(duì) MySql 的壓力。


          接下來(lái)刷新 oss 頁(yè)面,可以看到已經(jīng)多了一個(gè) cdas_demo 路徑,進(jìn)入 subtest1 路徑,可以看到已經(jīng)有元數(shù)據(jù)在寫入,表明數(shù)據(jù)其實(shí)在寫入過(guò)程中。


          再到作業(yè)開發(fā)頁(yè)面寫一個(gè)簡(jiǎn)單的 SQL 查詢某張表,來(lái)驗(yàn)證一下數(shù)據(jù)是否真的在寫入。執(zhí)行上圖 SQL 語(yǔ)句,可以看到數(shù)據(jù)已經(jīng)可以查詢到,這些數(shù)據(jù)與插入的數(shù)據(jù)是一致的。

          我們利用 catalog 提供的元數(shù)據(jù)能力,結(jié)合 CDS 和 CTS 語(yǔ)法,通過(guò)幾行簡(jiǎn)單的 SQL,就能輕松實(shí)現(xiàn)幾千張表的數(shù)據(jù)入湖,極大簡(jiǎn)化了數(shù)據(jù)入湖的流程,降低了開發(fā)運(yùn)維的工作量。

          往期精選


          ▼ 關(guān)注「Apache Flink」,獲取更多技術(shù)干貨?
          更多 Flink 相關(guān)技術(shù)問(wèn)題,可掃碼加入社區(qū)釘釘交流群~

          ???戳我,查看原文視頻&演講PDF~

          瀏覽 132
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日韩一级黄色电影免费在线播放 | 中文无码短视频 | 欧美日韩A片 | 北条麻妃无码在线 | 国国产毛片 |