<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Flink Table Store 入湖 | 基于 Apache Flink Table Store 的全增量一體實時入湖

          共 10303字,需瀏覽 21分鐘

           ·

          2022-12-17 18:51

          摘要:本文簡要回顧數據入湖(倉)的發(fā)展階段,針對在數據庫數據入湖中面臨的問題,提出了使用 Flink Table Store 作為全增量一體入湖的解決方案,并輔以開源 Demo 的測試結果作為展示。文章主要內容包括:


          1. 數據庫數據集成入湖(倉)的發(fā)展階段及面臨痛點

          2. 基于 Apache Flink Table Store 解決全增量一體入湖

          3. 總結與展望


          Tips:點擊「閱讀原文」預約 FFA 大會直播

          01

          數據入湖(倉)發(fā)展階段及面臨痛點


          基于數據庫的數據集成過程,簡要來說經歷了如下幾個階段。


          1.1 全量 + 定期增量的數據入倉


          Fig.1 數據入倉: 一次全量+周期增量

          全量數據通過 bulk load 一次性導入,定時調度增量同步任務從數據庫同步增量到臨時表,再與全量數據進行合并。這種方式雖然能滿足一定的業(yè)務需求,但是也存在以下問題。

          ?? 鏈路復雜,時效性差

          全量與增量需要定期的合并以獲取最新的數據快照,由于不支持記錄級別的更新,用戶需要額外的 SQL 任務去計算去重;數據新鮮度依賴于調度,若數據存在晚到,則還需要處理數據漂移情況,一種常見處理方式是在 T + N 時刻(processing time)調度產生 T (event time)的合并任務,同時取 T ~ T + N 個分區(qū)(processing time),再從中過濾出業(yè)務時間小于等于 T (event time)的數據進行合并,這會導致數據新鮮度進一步降低。


          ?? 明細查詢慢,排查問題難

          雖然下游會使用各種交互式分析引擎來加速查詢,但基于成本考慮,底表明細數據一般沒有這種待遇,這就導致在數據正確性排查時需要直接查詢明細,特別是需要查詢合并前后全量和增量的明細變化來定位問題。如果業(yè)務變更,導致一批訂單數據需要訂正并要求生成訂正之后的各類指標,則需要手工對原始表及其下游依賴表進行級聯訂正。


          1.2 全量 + 實時增量的數據入湖


          相對于傳統(tǒng)數據倉庫,數據湖的出現使得數據在以低成本存儲的同時,數據新鮮度有了極大的提升。以 Apache Hudi 為例,它支持先做一次全量 bootstrap 構建基礎表,然后基于新接入的 CDC 數據進行實時構建 [1],如 Fig.2 所示。


          Fig.2 數據入湖: 一次全量+實時增量


          由于支持記錄級別的更新及刪除,在存儲側就可以完成主鍵的去重,不再需要額外的合并任務。在數據新鮮度方面,由于流式作業(yè)會定期的觸發(fā) checkpoint 來產生全量與增量合并后的快照,故而數據新鮮度對比第一種方式(以天或小時調度產生合并快照)有了很大提升。


          但從另外一方面,我們也發(fā)現這種方式有以下這些問題。


          ?? Bootstrap Index 超時及 state 膨脹

          以流模式啟動 Flink 增量同步作業(yè)后,系統(tǒng)會先將全量表導入到 Flink state 來構建 Hoodie key(即主鍵 + 分區(qū)路徑)到寫入文件的 file group 的索引,此過程會阻塞 checkpoint 完成。而只有在 checkpoint 成功后,寫入的數據才可以變?yōu)榭勺x狀態(tài),故而當全量數據很大時,有可能會出現 checkpoint 一直超時的情況,導致下游讀不到數據。另外,由于索引一直保存在 state 內,在增量同步階段遇到了 insert 類型的記錄也會更新索引,需要合理評估 state TTL,配置太小可能會丟失數據,配置過大可能導致 state 膨脹。


          ?? 鏈路依然復雜,難以對齊增量點位,自動化運維成本高

          量 + 實時增量的方式并沒有簡化鏈路的復雜度,因為它額外引入了 Kafka 的運維,需要手工對齊增量消費的點位以防止數據丟失 [2]。在啟動增量 CDC 作業(yè)后,用戶需要等待和觀察作業(yè)的運行狀態(tài),在第一次 checkpoint 成功后,暫停作業(yè)(stop-with-savepoint)修改配置禁用 Bootstrap Index,然后從 savepoint 重啟作業(yè)( restore-from-savepoint)。整個過程操作復雜,實現自動化運維成本比較高。


          此外,我們回顧了一些使用 Hudi 的行業(yè)實踐,發(fā)現用戶需要格外注意各項配置來實現不同需求,這對易用性有一定的傷害。比如 [1] 中提到的需要在平臺層面監(jiān)控用戶的建表語句,防止在大規(guī)模寫入場景配置為 COW(Copy on Write) 模式;全增量切換時用戶必須格外注意 Kafka 消費點位來保證數據準確性,參數配置極大影響了作業(yè)的數據準確性及性能。


          02

          基于 Apache Flink Table Store 的全增量一體入湖


          隨著基于日志的 CDC 逐步取代基于查詢的 CDC,特別是 Flink SQL CDC 在 source 端已支持全增量一體同步后,全增量一體入湖(使用一個流作業(yè)完成全量同步、并持續(xù)監(jiān)聽增量 changelog)也成為一個新的探索方向。這種方式降低了鏈路復雜度,同時將全增量切換時需要手工對齊 offset 的繁瑣托管給了 Flink CDC 和 checkpoint 機制,讓框架層面去保障數據的最終一致性。但經過調研我們發(fā)現,在使用 Hudi 做這種嘗試時遇到了以下挑戰(zhàn)。


          ?? 全量同步階段數據亂序嚴重,寫入性能和穩(wěn)定性難以保障 

          在全量同步階段面臨的一個問題是多并發(fā)同時讀取 chunk 會遇到嚴重的數據亂序,出現同時寫多個分區(qū)的情況,大量的隨機寫入會導致性能回退,出現吞吐毛刺,每個分區(qū)對應的 writer 都要維護各自緩存,很容易發(fā)生 OOM 導致作業(yè)不穩(wěn)定。雖然 Hudi 支持通過 Rate Limit [3] 限制每分鐘的數據寫入來起到一定的平滑效果,但在作業(yè)穩(wěn)定性和性能吞吐之間取得平衡的調優(yōu)過程對于一般用戶來說門檻也較高。


          2.1 為什么選擇 Flink Table Store


          Apache Table Store [4] 作為 2022 年初開源的 Apache Flink 子項目,目標是打造一個支持更新的據湖存儲,用于實時流式 Changelog 攝取和高性能查詢。


          ?? 大吞吐量的更新數據攝取,支持全增量一體入湖,一個 Flink 作業(yè)搞定所有

          Fig.3 Flink Table Store: 全量 + 增量一體化同步

          回顧前文,我們知道全增量一體化同步入湖的主要挑戰(zhàn)在于全量同步階段產生了大量數據亂序引起的隨機寫入,導致性能回退、吞吐毛刺及不穩(wěn)定。而 Table Store 的存儲格式采用先分區(qū)(Partition)再分桶(Bucket),每個桶內各自維護一棵 LSM(Log-structured Merge Tree)的方式(參見 Fig.4、Fig.5),每條記錄通過主鍵哈希落入桶內時就確定寫入路徑(Directory),以 KV 方式寫入 MemTable 中(類似于 HashMap,Key 就是主鍵,Value 是記錄)。在 flush 到磁盤的過程中,以主鍵排序合并(去重),以追加方式寫入磁盤。Sort Merge 在 buffer 內進行,避免了需要點查索引來判斷一條記錄是 insert 還是 update 來獲取寫入文件的 file group 的 tagging [5] 。另外,觸發(fā) MemTable flush 發(fā)生在 buffer 充滿時,不需要額外通過 Auto-File Sizing [6](Auto-File Sizing 會影響寫入速度 [7])來避免小文件產生,整個寫入過程都是局部且順序的 [8],避免了隨機 IO 產生。

          使用 Table Store 作為湖存儲時,只需要一條 INSERT INTO 語句就可以完成全增量同步。以如下 SQL 為例,它展示了使用一個 Flink 流作業(yè)將 MySQL 數據庫中的訂單表通過 Streaming 方式寫入 Table Store 表,并持續(xù)消費增量數據。

              -- 創(chuàng)建并使用 table store catalog    CREATE CATALOG `table_store` WITH (        'type' = 'table-store',        'warehouse' = 'hdfs://foo/bar'    );
          USE CATALOG `table_store`;
          -- 定義 mysql-cdc source 表 CREATE TEMPORARY TABLE `orders_cdc` ( order_id BIGINT NOT NULL, gmt_modified TIMESTAMP(3) NOT NULL, ... PRIMARY KEY (`order_id`) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', ... );
          -- 定義 table store ods 表,按日期作分區(qū) CREATE TABLE IF NOT EXISTS `orders` ( ... PRIMARY KEY (`dt`, `order_id`) NOT ENFORCED ) PARTITIONED BY (`dt`);
          -- streaming 模式下提交作業(yè) SET 'execution.runtime-mode' = 'streaming';
          -- 設置 1min 的 cp interval,對應 1min 的數據新鮮度 SET 'execution.checkpointing.interval' = '1min';
          -- 一條 SQL 同步全量 + 增量,動態(tài)分區(qū)寫入 INSERT INTO `orders` SELECT ..., DATE_FORMAT(gmt_modified, 'yyyy-MM-dd') AS dt    FROM `orders_cdc`;

          Fig.4 展示了以 dt 作為分區(qū) orders 表的存儲結構,在用戶指定總 bucket 數 N 后,每個分區(qū)下會生成相應的 bucket-${n} 目錄,每個目錄下以列存格式(orc 或 parquet)存放 hash_func(pk) % N == ${n} 的記錄文件。


           Fig.4 Flink Table Store 表的文件目錄


          元數據與數據存儲在表的同一級目錄下,包括 manifest 目錄和 snapshot 目錄。


          • manifest 目錄下中記錄每次經 checkpoint 觸發(fā)而提交的數據文件變更,包含新增和刪除的數據文件

          • snapshot 目錄下記錄每次提交產生的 snapshot 文件,內容包括為上一次提交產生的 manifest,加上本次提交產生的 manifest 作為增量


          Fig.5 展示了 Fig.4 中每個 bucket 內 LSM 實現過程。我們以 Flink 流作業(yè)為例,在每次 checkpoint 時,Flink Table Store 會產生一次提交(commit),包含以下信息

          • 生成當前 table 的一個快照(snapshot)。系統(tǒng)會通過 snapshot pointer file(類似于指針)追蹤最早產生和當前最新的 snapshot 文件


          • snapshot 文件中包含本次 commit 新增哪些 manifest 文件、刪除哪些 manifest 文件


          • 每個 manifest 文件中記錄了產生了哪些 sst 文件、刪除了哪些 sst 文件,以及每個 sst 文件所包含記錄的主鍵范圍、每個字段的 min/max/null count 等統(tǒng)計信息


          • 每個 sst 文件則包含了按主鍵排好序的、列存格式的記錄。對于 Level 0 的文件,Table Store 會異步地觸發(fā) compact 合并線程來消除主鍵范圍重疊帶來的讀端 merge 開銷


          Fig.5 Flink Table Store 表的 LSM 實現


          在 Flink Table Store 0.2.0 發(fā)布時,我們測試了五億條數據在一億主鍵下的實時更新場景寫入(包含插入與更新)并與 Apache Hudi MOR 及 COW 進行對比 [9], Table Store 在大規(guī)模實時更新寫入場景擁有良好的寫入性能。

          ?? 高效 Data Skipping 支持過濾,提供高性能的點查和范圍查詢

          雖然沒有額外的索引,但是得益于 meta 的管理和列存格式,manifest 中保存了

          • 文件的主鍵的 min/max 及每個字段的統(tǒng)計信息,這可以在不訪問文件的情況下,進行一些 predicate 的過濾

          • orc/parquet 格式中,文件的尾部記錄了稀疏索引,每個 chunk 的統(tǒng)計信息和 offset,這可以通過文件的尾部信息,進行一些 predicate 的過濾

          數據在有 filter 讀取時,可以根據上述信息做如下過濾

          1. 讀取 manifest:根據文件的 min/max、分區(qū),執(zhí)行分區(qū)和字段的 predicate,淘汰多余的文件

          2. 讀取文件 footer:根據 chunk 的 min/max,過濾不需要讀取的 chunk

          3. 讀取剩下與文件以及其中的 chunks

          上述訂單表 orders 為例,當用戶想要查詢 dt = 2022-01-01 分區(qū)下所有 order_id 在 100 ~ 200 之間的訂單時

          SELECT * FROM orders WHERE dt = '2022-01-01' AND order_id >= 100 AND order_id <= 200;

          Flink Table Store 會先根據 LATEST-SNAPSHOT 文件讀到最近一次提交的 snapshot 文件(read committed),然后從 snapshot 中讀取到對應 manifest meta 文件, 根據分區(qū)條件 dt='2022-01-01',過濾出包含這些分區(qū)的統(tǒng)計信息,由于統(tǒng)計信息里包含了每個 sst 文件 key 的范圍,所以繼續(xù)執(zhí)行 order_id 在 [100, 200] 區(qū)間這個過濾條件,就能在 2022-01-01 這個目錄下只讀取對應的 sst 文件。

          我們同樣基于上述數據集測試了 Flink Table Store 的查詢性能[9],在點查和范圍查詢的場景下,Flink Table Store 表現出眾。從實現原理來說,MOR 的查詢性能低于 COW、COW 的寫入性能低于 MOR 是難以避免的。而在實踐層面,在大規(guī)模寫入場景下建立的 MOR 表也很難一鍵轉換為 COW 來讀取,所以在查詢寫入較多的表(MOR 表)這個前提下,Flink Table Store 的查詢表現還是不俗的。


          ?? 文件格式支持流讀

          Flink Table Store 實現 Incremental Scan,在流模式下,可以持續(xù)監(jiān)聽文件更新,數據新鮮度保持在分鐘級別,如下所示。

          -- 進入 SQL CLI,創(chuàng)建 catalog 和 tableCREATE CATALOG table_store WITH (  'type' = 'table-store',  'warehouse' = 'file://foo/bar/' --或 'hdfs://foo/bar');
          CREATE TABLE IF NOT EXIST my_table ( f0 INT, f1 STRING, PRIMARY KEY(f0) NOT ENFORCED);
          -- 切換到 batch 模式,寫入數據SET 'execution.runtime-mode' = 'batch';INSERT INTO my_table VALUES(1, 'Hello');
          -- 新打開一個 SQL CLI 中,切換到 streaming 模式,提交流式查詢SET 'execution.runtime-mode' = 'streaming';SET 'sql-client.execution.result-mode' = 'tableau';SELECT * FROM my_table; -- 可以讀到結果如下+----+-------------+--------------------------------+| op | f0 | f1 |+----+-------------+--------------------------------+| +I | 1 | Hello |
          -- 在第一個 SQL CLI 中,繼續(xù)寫入數據INSERT INTO my_table VALUES(1, 'Bye'), (2, '你好');
          -- 可以在第二個 SQL CLI 中,觀察到新增輸出 (-U, 1, Hello),(+U, 1, Bye) 和 (+I, 2, 你好) +----+-------------+--------------------------------+| op | f0 | f1 |+----+-------------+--------------------------------+| +I | 1 | Hello || -U | 1 | Hello || +U | 1 | Bye || +I | 2 | 你好 |


          2.2 基于 TPC-H 數據集的全增量一體入湖 Demo


          前文對 Flink Table Store 解決全增量一體入湖進行了簡要分析,下面一個實例演示了如何在本地單機環(huán)境下,將近六千萬條訂單記錄作為全量數據從 MySQL 同步到 Flink Table Store,并持續(xù)消費增量更新(由 TPC-H RF1 和 RF2 產生),下游接實時聚合及查詢的過程。



          數據源由 TPC-H 自動生成并導入 MySQL ,運行在 Docker 容器內,本地只需要下載 Flink release 包和 Flink Table Store 依賴即可完成。

          Demo 使用 lineitem 表中發(fā)貨日期 l_shipdate 作為業(yè)務字段定義了二級分區(qū) l_year 和 l_month,時間跨度從 1992.1 ~ 1998.12,即動態(tài)寫入 84 個分區(qū)。 經測試,在本地單機并發(fā)為 2,checkpoint interval 為 1 min 的配置下(每分鐘更新可見)46 min 內寫入 59.9 million 全量數據,每 10 min 的寫入性能如下表所示,平均寫入性能在 1.3 million/min。


          詳細配置如下所示



          詳細步驟可查看 Apache Flink Table Store 全增量一體 CDC 實時入湖https://github.com/LadyForest/flink-table-store-101/blob/master/real-time-update/README.zh.md


          03

          總結與展望


          本文簡要回顧了數據入湖(倉)的發(fā)展階段,針對在數據庫數據入湖中面臨的問題,我們提出了使用 Flink Table Store 作為全增量一體入湖的解決方案,并輔以開源 Demo 的測試結果作為展示。

          我們期待用戶的一線反饋及同行深入交流,歡迎大家掃碼加入釘釘群一起探索。


          參考資料

          [1] 基于 Hudi 的湖倉一體技術在 Shopee 的實踐

          https://mp.weixin.qq.com/s/3nsYTVu9nZCIFaaXP09hiQ

          [2] Change Data Capture with Debezium and Apache Hudi
          https://hudi.apache.org/blog/2022/01/14/change-data-capture-with-debezium-and-apache-hudi/#bootstrapping-existing-tables
          [3] Apache Hudi DeltaStreamer#Rate Limit
          https://hudi.apache.org/docs/next/hoodie_deltastreamer#rate-limit
          [4] https://github.com/apache/flink-table-store
          [5] Apache Hudi Technical Specification#Writer Expectations
          https://hudi.apache.org/tech-specs/#writer-expectations
          [6] Apache Hudi Write Operations#Writing path
          https://hudi.apache.org/docs/next/write_operations/#writing-path
          [7] Apache Hudi File Sizing#Auto-Size During ingestion
          https://hudi.apache.org/docs/next/file_sizing/#auto-size-during-ingestion
          [8] On Disk IO, Part 3: LSM Trees
          https://medium.com/databasss/on-disk-io-part-3-lsm-trees-8b2da218496f
          [9] Apache Flink Table Store 0.2.0 發(fā)布
          https://mp.weixin.qq.com/s/Xy9HkWlzrq63mRUMuQluQg
          [10] Apache Flink Table Store 全增量一體 CDC 實時入湖
          https://github.com/LadyForest/flink-table-store-101/blob/master/real-time-update/README.zh.md




          ▼ 關注「Apache Flink」,獲取更多技術干貨 



          瀏覽 39
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产亚洲视频免费观看 | 天天干天天射天天日天天操 | 欧美黄在线 | 成人性生活免费视频 | 91探花国产视频 |