<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Flink + Iceberg 在去哪兒的實(shí)時(shí)數(shù)倉(cāng)實(shí)踐

          共 6305字,需瀏覽 13分鐘

           ·

          2021-06-07 01:46


          摘要:本文介紹去哪兒數(shù)據(jù)平臺(tái)在使用 Flink + Iceberg 0.11 的一些實(shí)踐。內(nèi)容包括:

          1. 背景及痛點(diǎn)

          2. Iceberg 架構(gòu)

          3. 痛點(diǎn)一:Kafka 數(shù)據(jù)丟失

          4. 痛點(diǎn)二:近實(shí)時(shí) Hive 壓力大

          5. Iceberg 優(yōu)化實(shí)踐

          6. 總結(jié)


          Tips:點(diǎn)擊文末閱讀原文即可查看更多技術(shù)干貨

           GitHub 地址 
          https://github.com/apache/flink
          歡迎大家給 Flink 點(diǎn)贊送 star~

          一、背景及痛點(diǎn)



          1. 背景



          我們?cè)谑褂?Flink 做實(shí)時(shí)數(shù)倉(cāng)以及數(shù)據(jù)傳輸過(guò)程中,遇到了一些問(wèn)題:比如 Kafka 數(shù)據(jù)丟失,F(xiàn)link 結(jié)合 Hive 的近實(shí)時(shí)數(shù)倉(cāng)性能等。Iceberg 0.11 的新特性解決了這些業(yè)務(wù)場(chǎng)景碰到的問(wèn)題。對(duì)比 Kafka 來(lái)說(shuō),Iceberg 在某些特定場(chǎng)景有自己的優(yōu)勢(shì),在此我們做了一些基于 Iceberg 的實(shí)踐分享。


          2. 原架構(gòu)方案



          原先的架構(gòu)采用 Kafka 存儲(chǔ)實(shí)時(shí)數(shù)據(jù),其中包括日志、訂單、車票等數(shù)據(jù)。然后用 Flink SQL 或者 Flink datastream 消費(fèi)數(shù)據(jù)進(jìn)行流轉(zhuǎn)。內(nèi)部自研了提交 SQL 和 Datastream 的平臺(tái),通過(guò)該平臺(tái)提交實(shí)時(shí)作業(yè)。


          3. 痛點(diǎn)


          • Kafka 存儲(chǔ)成本高且數(shù)據(jù)量大。Kafka 由于壓力大將數(shù)據(jù)過(guò)期時(shí)間設(shè)置的比較短,當(dāng)數(shù)據(jù)產(chǎn)生反壓,積壓等情況時(shí),如果在一定的時(shí)間內(nèi)沒(méi)消費(fèi)數(shù)據(jù)導(dǎo)致數(shù)據(jù)過(guò)期,會(huì)造成數(shù)據(jù)丟失。




          • Flink 在 Hive 上做了近實(shí)時(shí)的讀寫支持。為了分擔(dān) Kafka 壓力,將一些實(shí)時(shí)性不太高的數(shù)據(jù)放入 Hive,讓 Hive 做分鐘級(jí)的分區(qū)。但是隨著元數(shù)據(jù)不斷增加,Hive metadata 的壓力日益顯著,查詢也變得更慢,且存儲(chǔ) Hive 元數(shù)據(jù)的數(shù)據(jù)庫(kù)壓力也變大。



          二、Iceberg 架構(gòu)



          1. Iceberg 架構(gòu)解析




          術(shù)語(yǔ)解析


          • 數(shù)據(jù)文件(data files)




          Iceberg 表真實(shí)存儲(chǔ)數(shù)據(jù)的文件,一般存儲(chǔ)在 data 目錄下,以 “.parquet” 結(jié)尾。


          • 清單文件(Manifest file)


          每行都是每個(gè)數(shù)據(jù)文件的詳細(xì)描述,包括數(shù)據(jù)文件的狀態(tài)、文件路徑、分區(qū)信息、列級(jí)別的統(tǒng)計(jì)信息(比如每列的最大最小值、空值數(shù)等)。通過(guò)該文件,可過(guò)濾掉無(wú)關(guān)數(shù)據(jù),提高檢索速度。


          • 快照(Snapshot)




          快照代表一張表在某個(gè)時(shí)刻的狀態(tài)。每個(gè)快照版本包含某個(gè)時(shí)刻的所有數(shù)據(jù)文件列表。Data files 存儲(chǔ)在不同的 manifest files 里面, manifest files 存儲(chǔ)在一個(gè) Manifest list 文件里面,而一個(gè) Manifest list 文件代表一個(gè)快照。


          2. Iceberg 查詢計(jì)劃



          查詢計(jì)劃是在表中查找 “查詢所需文件” 的過(guò)程。


          • 元數(shù)據(jù)過(guò)濾


          清單文件包括分區(qū)數(shù)據(jù)元組和每個(gè)數(shù)據(jù)文件的列級(jí)統(tǒng)計(jì)信息。在計(jì)劃期間,查詢謂詞會(huì)自動(dòng)轉(zhuǎn)換為分區(qū)數(shù)據(jù)上的謂詞,并首先應(yīng)用于過(guò)濾數(shù)據(jù)文件。接下來(lái),使用列級(jí)值計(jì)數(shù),空計(jì)數(shù),下限和上限來(lái)消除與查詢謂詞不匹配的文件。


          • Snapshot ID


          每個(gè) Snapshot ID 會(huì)關(guān)聯(lián)到一組 manifest files,而每一組 manifest files 包含很多 manifest file。


          • manifest files 文件列表


          每個(gè) manifest files 又記錄了當(dāng)前 data 數(shù)據(jù)塊的元數(shù)據(jù)信息,其中就包含了文件列的最大值和最小值,然后根據(jù)這個(gè)元數(shù)據(jù)信息,索引到具體的文件塊,從而更快的查詢到數(shù)據(jù)。



          三、痛點(diǎn)一:Kafka 數(shù)據(jù)丟失



          1. 痛點(diǎn)介紹



          通常我們會(huì)選擇 Kafka 做實(shí)時(shí)數(shù)倉(cāng),以及日志傳輸。Kafka 本身存儲(chǔ)成本很高,且數(shù)據(jù)保留時(shí)間有時(shí)效性,一旦消費(fèi)積壓,數(shù)據(jù)達(dá)到過(guò)期時(shí)間后,就會(huì)造成數(shù)據(jù)丟失且沒(méi)有消費(fèi)到。


          2. 解決方案



          將實(shí)時(shí)要求不高的業(yè)務(wù)數(shù)據(jù)入湖、比如說(shuō)能接受 1-10 分鐘的延遲。因?yàn)?Iceberg 0.11 也支持 SQL 實(shí)時(shí)讀取,而且還能保存歷史數(shù)據(jù)。這樣既可以減輕線上 Kafka 的壓力,還能確保數(shù)據(jù)不丟失的同時(shí)也能實(shí)時(shí)讀取。


          3 .為什么 Iceberg 只能做近實(shí)時(shí)入湖?



          1. Iceberg 提交 Transaction 時(shí)是以文件粒度來(lái)提交。這就沒(méi)法以秒為單位提交 Transaction,否則會(huì)造成文件數(shù)量膨脹;


          2. 沒(méi)有在線服務(wù)節(jié)點(diǎn)。對(duì)于實(shí)時(shí)的高吞吐低延遲寫入,無(wú)法得到純實(shí)時(shí)的響應(yīng);


          3. Flink 寫入以 checkpoint 為單位,物理數(shù)據(jù)寫入 Iceberg 后并不能直接查詢,當(dāng)觸發(fā)了 checkpoint 才會(huì)寫 metadata 文件,這時(shí)數(shù)據(jù)由不可見(jiàn)變?yōu)榭梢?jiàn)。checkpoint 每次執(zhí)行都會(huì)有一定時(shí)間。


          4. Flink 入湖分析




          組件介紹


          • IcebergStreamWriter




          主要用來(lái)寫入記錄到對(duì)應(yīng)的 avro、parquet、orc 文件,生成一個(gè)對(duì)應(yīng)的 Iceberg DataFile,并發(fā)送給下游算子。



          另外一個(gè)叫做 IcebergFilesCommitter,主要用來(lái)在 checkpoint 到來(lái)時(shí)把所有的 DataFile 文件收集起來(lái),并提交 Transaction 到 Apache Iceberg,完成本次 checkpoint 的數(shù)據(jù)寫入,生成 DataFile。


          • IcebergFilesCommitter


          為每個(gè) checkpointId 維護(hù)了一個(gè) DataFile 文件列表,即 map<Long, List<DataFile>>,這樣即使中間有某個(gè) checkpoint 的 transaction 提交失敗了,它的 DataFile 文件仍然維護(hù)在 State 中,依然可以通過(guò)后續(xù)的 checkpoint 來(lái)提交數(shù)據(jù)到 Iceberg 表中。


          5. Flink SQL Demo



          Flink Iceberg 實(shí)時(shí)入湖流程,消費(fèi) Kafka 數(shù)據(jù)寫入 Iceberg,并從 Iceberg 近實(shí)時(shí)讀取數(shù)據(jù)。





          ■ 5.1 前期工作


          • 開(kāi)啟實(shí)時(shí)讀寫功能



          set execution.type = streaming


          • 開(kāi)啟 table sql hint 功能來(lái)使用 OPTIONS 屬性



          set table.dynamic-table-options.enabled=true


          • 注冊(cè) Iceberg catalog 用于操作 Iceberg 表


          CREATE CATALOG Iceberg_catalog WITH (\n" +            "  'type'='Iceberg',\n" +            "  'catalog-type'='Hive'," +            "  'uri'='thrift://localhost:9083'" +            ");

          • Kafka 實(shí)時(shí)數(shù)據(jù)入湖


          insert into Iceberg_catalog.Iceberg_db.tbl1 \n             select * from Kafka_tbl;

          • 數(shù)據(jù)湖之間實(shí)時(shí)流轉(zhuǎn) tbl1 -> tbl2



           insert into Iceberg_catalog.Iceberg_db.tbl2      select * from Iceberg_catalog.Iceberg_db.tbl1     /*+ OPTIONS('streaming'='true', 'monitor-interval'='10s',snapshot-id'='3821550127947089987')*/

          ■ 5.2 參數(shù)解釋


          • monitor-interval



          連續(xù)監(jiān)視新提交的數(shù)據(jù)文件的時(shí)間間隔(默認(rèn)值:1s)。


          • start-snapshot-id



          從指定的快照 ID 開(kāi)始讀取數(shù)據(jù)、每個(gè)快照 ID 關(guān)聯(lián)的是一組 manifest file 元數(shù)據(jù)文件,每個(gè)元數(shù)據(jù)文件映射著自己的真實(shí)數(shù)據(jù)文件,通過(guò)快照 ID,從而讀取到某個(gè)版本的數(shù)據(jù)。


          6. 踩坑記錄



          我之前在 SQL Client 寫數(shù)據(jù)到 Iceberg,data 目錄數(shù)據(jù)一直在更新,但是 metadata 沒(méi)有數(shù)據(jù),導(dǎo)致查詢的時(shí)候沒(méi)有數(shù),因?yàn)?Iceberg 的查詢是需要元數(shù)據(jù)來(lái)索引真實(shí)數(shù)據(jù)的。SQL Client 默認(rèn)沒(méi)有開(kāi)啟 checkpoint,需要通過(guò)配置文件來(lái)開(kāi)啟狀態(tài)。所以會(huì)導(dǎo)致 data 目錄寫入數(shù)據(jù)而 metadata 目錄不寫入元數(shù)據(jù)。
          PS:無(wú)論通過(guò) SQL 還是 Datastream 入湖,都必須開(kāi)啟 Checkpoint。


          7. 數(shù)據(jù)樣例



          下面兩張圖展示的是實(shí)時(shí)查詢 Iceberg 的效果,一秒前和一秒后的數(shù)據(jù)變化情況。


          • 一秒前的數(shù)據(jù)





          • 一秒后刷新的數(shù)據(jù)




          四、痛點(diǎn)二:

          Flink 結(jié)合 Hive 的近實(shí)時(shí)越來(lái)越慢



          1. 痛點(diǎn)介紹


          選用 Flink + Hive 的近實(shí)時(shí)架構(gòu)雖然支持了實(shí)時(shí)讀寫,但是這種架構(gòu)帶來(lái)的問(wèn)題是隨著表和分區(qū)增多,將會(huì)面臨以下問(wèn)題:


          • 元數(shù)據(jù)過(guò)多


          Hive 將分區(qū)改為小時(shí) / 分鐘級(jí),雖然提高了數(shù)據(jù)的準(zhǔn)實(shí)時(shí)性,但是 metestore 的壓力也是顯而易見(jiàn)的,元數(shù)據(jù)過(guò)多導(dǎo)致生成查詢計(jì)劃變慢,而且還會(huì)影響線上其他業(yè)務(wù)穩(wěn)定。


          • 數(shù)據(jù)庫(kù)壓力變大


          隨著元數(shù)據(jù)增加,存儲(chǔ) Hive 元數(shù)據(jù)的數(shù)據(jù)庫(kù)壓力也會(huì)增加,一段時(shí)間后,還需要對(duì)該庫(kù)進(jìn)行擴(kuò)容,比如存儲(chǔ)空間。



          2. 解決方案


          將原先的 Hive 近實(shí)時(shí)遷移到 Iceberg。為什么 Iceberg 可以處理元數(shù)據(jù)量大的問(wèn)題,而 Hive 在元數(shù)據(jù)大的時(shí)候卻容易形成瓶頸?


          • Iceberg 是把 metadata 維護(hù)在可拓展的分布式文件系統(tǒng)上,不存在中心化的元數(shù)據(jù)系統(tǒng);


          • Hive 則是把 partition 之上的元數(shù)據(jù)維護(hù)在 metastore 里面(partition 過(guò)多則給 mysql 造成巨大壓力),而 partition 內(nèi)的元數(shù)據(jù)其實(shí)是維護(hù)在文件內(nèi)的(啟動(dòng)作業(yè)需要列舉大量文件才能確定文件是否需要被掃描,整個(gè)過(guò)程非常耗時(shí))。




          五、優(yōu)化實(shí)踐


          1. 小文件處理


          • Iceberg 0.11 以前,通過(guò)定時(shí)觸發(fā) batch api 進(jìn)行小文件合并,這樣雖然能合并,但是需要維護(hù)一套 Actions 代碼,而且也不是實(shí)時(shí)合并的。


          Table table = findTable(options, conf);Actions.forTable(table).rewriteDataFiles()        .targetSizeInBytes(10 * 1024) // 10KB        .execute();

          • Iceberg 0.11 新特性,支持了流式小文件合并。



          通過(guò)分區(qū)/存儲(chǔ)桶鍵使用哈希混洗方式寫數(shù)據(jù)、從源頭直接合并文件,這樣的好處在于,一個(gè) task 會(huì)處理某個(gè)分區(qū)的數(shù)據(jù),提交自己的 Datafile 文件,比如一個(gè) task 只處理對(duì)應(yīng)分區(qū)的數(shù)據(jù)。這樣避免了多個(gè) task 處理提交很多小文件的問(wèn)題,且不需要額外的維護(hù)代碼,只需在建表的時(shí)候指定屬性 write.distribution-mode,該參數(shù)與其它引擎是通用的,比如 Spark 等。


          CREATE TABLE city_table (      province BIGINT,     city STRING  ) PARTITIONED BY (province, city) WITH (    'write.distribution-mode'='hash'   );

          2. Iceberg 0.11 排序


          ■ 2.1 排序介紹



          在 Iceberg 0.11 之前,F(xiàn)link 是不支持 Iceberg 排序功能的,所以之前只能結(jié)合 Spark 以批模式來(lái)支持排序功能,0.11 新增了排序特性的支持,也意味著,我們?cè)趯?shí)時(shí)也可以體會(huì)到這個(gè)好處。

          排序的本質(zhì)是為了掃描更快,因?yàn)榘凑?sort key 做了聚合之后,所有的數(shù)據(jù)都按照從小到大排列,max-min 可以過(guò)濾掉大量的無(wú)效數(shù)據(jù)。





          ■ 2.2 排序 demo



          insert into Iceberg_table select days from Kafka_tbl order by days, province_id;



          3. Iceberg 排序后 manifest 詳解




          參數(shù)解釋


          • file_path:物理文件位置。

          • partition:文件所對(duì)應(yīng)的分區(qū)。

          • lower_bounds:該文件中,多個(gè)排序字段的最小值,下圖是我的 days 和 province_id 最小值。

          • upper_bounds:該文件中,多個(gè)排序字段的最大值,下圖是我的 days 和 province_id 最大值。



          通過(guò)分區(qū)、列的上下限信息來(lái)確定是否讀取 file_path 的文件,數(shù)據(jù)排序后,文件列的信息也會(huì)記錄在元數(shù)據(jù)中,查詢計(jì)劃從 manifest 去定位文件,不需要把信息記錄在 Hive metadata,從而減輕 Hive metadata 壓力,提升查詢效率。

          利用 Iceberg 0.11 的排序特性,將天作為分區(qū)。按天、小時(shí)、分鐘進(jìn)行排序,那么 manifest 文件就會(huì)記錄這個(gè)排序規(guī)則,從而在檢索數(shù)據(jù)的時(shí)候,提高查詢效率,既能實(shí)現(xiàn) Hive 分區(qū)的檢索優(yōu)點(diǎn),還能避免 Hive metadata 元數(shù)據(jù)過(guò)多帶來(lái)的壓力。



          六、總結(jié)



          相較于之前的版本來(lái)說(shuō),Iceberg 0.11 新增了許多實(shí)用的功能,對(duì)比了之前使用的舊版本,做以下總結(jié):


          • Flink + Iceberg 排序功能



          在 Iceberg 0.11 以前,排序功能集成了 Spark,但沒(méi)有集成 Flink,當(dāng)時(shí)用 Spark + Iceberg 0.10 批量遷移了一批 Hive 表。在 BI 上的收益是:原先 BI 為了提升 Hive 查詢速度建了多級(jí)分區(qū),導(dǎo)致小文件和元數(shù)據(jù)過(guò)多,入湖過(guò)程中,利用 Spark 排序 BI 經(jīng)常查詢的條件,結(jié)合隱式分區(qū),最終提升 BI 檢索速度的同時(shí),也沒(méi)有小文件的問(wèn)題,Iceberg 有自身的元數(shù)據(jù),也減少了 Hive metadata 的壓力。

          Icebeg 0.11 支持了 Flink 的排序,是一個(gè)很實(shí)用的功能點(diǎn)。我們可以把原先 Flink + Hive 的分區(qū)轉(zhuǎn)移到 Iceberg 排序中,既能達(dá)到 Hive 分區(qū)的效果,也能減少小文件和提升查詢效率。


          • 實(shí)時(shí)讀取數(shù)據(jù)



          通過(guò) SQL 的編程方式,即可實(shí)現(xiàn)數(shù)據(jù)的實(shí)時(shí)讀取。好處在于,可以把實(shí)時(shí)性要求不高的,比如業(yè)務(wù)可以接受 1-10 分鐘延遲的數(shù)據(jù)放入 Iceberg 中 ,在減少 Kafka 壓力的同時(shí),也能實(shí)現(xiàn)數(shù)據(jù)的近實(shí)時(shí)讀取,還能保存歷史數(shù)據(jù)。


          • 實(shí)時(shí)合并小文件



          在Iceberg 0.11以前,需要用 Iceberg 的合并 API 來(lái)維護(hù)小文件合并,該 API 需要傳入表信息,以及定時(shí)信息,且合并是按批次這樣進(jìn)行的,不是實(shí)時(shí)的。從代碼上來(lái)說(shuō),增加了維護(hù)和開(kāi)發(fā)成本;從時(shí)效性來(lái)說(shuō),不是實(shí)時(shí)的。0.11 用 Hash 的方式,從源頭對(duì)數(shù)據(jù)進(jìn)行實(shí)時(shí)合并,只需在 SQL 建表時(shí)指定 ('write.distribution-mode'='hash') 屬性即可,不需要手工維護(hù)。

          瀏覽 60
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  久久成年人精品視频 | 丁香六月婷婷综合 | 成人A级黄色毛片 | 日本精品网站在线观看 | 五月天婷婷色 |