
          Building a Streaming Data Lake with Flink and Hudi


          2021-09-01 15:17


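          Abstract: This article describes how Flink and Hudi use stream computing to keep optimizing and evolving the original mini-batch incremental computation model. Users can write CDC data into Hudi storage in real time with Flink SQL, and in the upcoming 0.9 release Hudi natively supports the CDC format. The article covers: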

          1. Background
          2. Incremental ETL
          3. Demo

          I. Background



          Near real-time



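          Since 2016, the Apache Hudi community has been exploring near-real-time use cases built on Hudi's UPSERT capability [1]. With the MR/Spark batch model, users can ingest data into HDFS/OSS at hourly granularity. For purely real-time scenarios, an architecture combining the Flink stream engine with KV/OLAP storage delivers end-to-end real-time analytics within seconds (or up to about five minutes). Between that range and hourly latency, however, lies a large body of use cases, which we call NEAR-REAL-TIME.
          In practice, many use cases fall into the near-real-time category: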


          1. minute-level dashboards;

          2. BI analytics of all kinds (OLAP);

          3. minute-level feature extraction for machine learning.




          Incremental computation



          There is not yet a settled approach to near-real-time workloads:


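          • stream processing has low latency, but its SQL patterns are fairly fixed and it lacks query-side capabilities (indexes, ad hoc queries);

          • batch processing offers rich data-warehouse capabilities but high data latency.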



          The Hudi community therefore proposed an incremental computation model based on mini-batches:

          incremental dataset => incremental result merged with the stored result => external storage

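          The model pulls the incremental dataset (the data written between two commits) from the lake storage's snapshots, computes the incremental result (for example, a simple count) with a batch framework such as Spark or Hive, and then merges it into the stored result.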
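One round of this model can be sketched in Python as a conceptual illustration, not as Hudi's API. The commit timeline is assumed to be a plain list of (commit instant, records) pairs, and `incremental_pull` and `mini_batch_count` are hypothetical helpers. The merge is a simple addition, which only holds if every pulled record is a new insert.

```python
from typing import List, Tuple

# Hypothetical commit timeline: (commit instant, records written by that commit)
Timeline = List[Tuple[int, List[str]]]


def incremental_pull(timeline: Timeline, begin: int, end: int) -> List[str]:
    """Return the records written by commits with begin < instant <= end."""
    return [r for instant, records in timeline if begin < instant <= end for r in records]


def mini_batch_count(stored: int, timeline: Timeline, begin: int, end: int) -> int:
    """One mini-batch round: count the delta and merge it into the stored result.

    Assumes every pulled record is a new insert; an update to an existing
    record would be counted a second time.
    """
    delta = incremental_pull(timeline, begin, end)
    return stored + len(delta)


timeline = [(1, ["a", "b"]), (2, ["c"]), (3, ["d", "e"])]
stored = mini_batch_count(0, timeline, begin=0, end=1)       # first round sees commit 1
stored = mini_batch_count(stored, timeline, begin=1, end=3)  # next round sees commits 2 and 3
print(stored)  # 5
```

Each round needs only the stored result and the records committed since the previous round, so the batch job does not rescan the full table.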


          Core problems



          The incremental model has to solve two core problems:


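          • UPSERT: like Kudu and Hive ACID, Hudi provides minute-level update capability;

          • incremental consumption: Hudi supports incremental pulls across the multiple snapshots kept in lake storage.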
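A minimal sketch of keyed UPSERT semantics, assuming records carry a key column `id` and an ordering column `ts` (both borrowed from the demo table later in this article). The in-batch deduplication rule, larger `ts` wins, is an illustrative assumption rather than Hudi's exact merge logic, and rows already stored under the same key are simply overwritten.

```python
from typing import Dict, List

Row = Dict[str, object]


def upsert(table: Dict[object, Row], batch: List[Row],
           key: str = "id", order: str = "ts") -> Dict[object, Row]:
    """Toy keyed upsert: dedupe the batch by key, then insert-or-replace.

    Within one batch the record with the larger `order` value wins
    (an illustrative rule); stored rows with the same key are overwritten.
    """
    deduped: Dict[object, Row] = {}
    for rec in batch:
        k = rec[key]
        if k not in deduped or rec[order] >= deduped[k][order]:
            deduped[k] = rec
    table.update(deduped)
    return table


table: Dict[object, Row] = {}
upsert(table, [{"id": 1, "ts": 1, "v": "a"}, {"id": 2, "ts": 2, "v": "b"}])
upsert(table, [{"id": 1, "ts": 5, "v": "new"}, {"id": 1, "ts": 3, "v": "stale"}])
print(table[1]["v"], len(table))  # new 2
```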



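          The mini-batch incremental model can reduce latency and save compute in some scenarios, but it has one major limitation: it constrains the SQL patterns. Because the computation runs as a batch job, and a batch job keeps no state between runs, the metrics must be easy to merge. A simple count or sum works, but avg and count distinct still require pulling the full data and recomputing.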
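Why count and sum merge cleanly while avg and count distinct do not can be shown with a small Python sketch. All function names are hypothetical; the "naive" merges model a job that keeps only the previously stored metric value and no other state.

```python
from typing import List


def merge_count(stored: int, delta: List[float]) -> int:
    return stored + len(delta)            # counts of disjoint batches add up


def merge_sum(stored: float, delta: List[float]) -> float:
    return stored + sum(delta)            # so do sums


def naive_merge_avg(stored_avg: float, delta: List[float]) -> float:
    # Only the stored average is kept, so the number of rows behind it is
    # unknown; averaging the two averages weights both batches equally.
    return (stored_avg + sum(delta) / len(delta)) / 2


def naive_merge_count_distinct(stored_distinct: int, delta_keys: List[str]) -> int:
    # Without the set of keys seen so far, keys repeated across batches
    # are counted again.
    return stored_distinct + len(set(delta_keys))


old, new = [1.0, 2.0, 3.0], [10.0]
print(merge_count(len(old), new), merge_sum(sum(old), new))  # 4 16.0
print(naive_merge_avg(sum(old) / len(old), new))             # 6.0, but the true average is 4.0
print(naive_merge_count_distinct(2, ["b", "c"]))             # 4 for keys {a, b} then {b, c}; the true answer is 3
```

Keeping auxiliary values such as (sum, count) would make the average mergeable, but an exact count distinct needs the full set of keys seen so far, which is state that a stateless batch job does not carry from one round to the next.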

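          As stream computing and real-time data warehouses have spread, the Hudi community has actively embraced the change, using stream computing to keep evolving the original mini-batch incremental model: release 0.7 introduced streaming ingestion into the lake, and release 0.9 adds native support for the CDC format.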

          II. Incremental ETL



          Ingesting database data into the lake



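          As CDC technology has matured and CDC tools such as Debezium have become popular, the Hudi community has added streaming write and then streaming read support. Users can write CDC data into Hudi storage in real time with Flink SQL, in one of two ways: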



          • ingest database data directly into Hudi through the Flink CDC connector;

          • or first land the CDC data in Kafka, then load it into Hudi through the Kafka connector.



          The second option offers somewhat better fault tolerance and scalability.


          Data lake CDC



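          In the upcoming 0.9 release, Hudi natively supports the CDC format: every change to a record can be retained. Building on this, Hudi integrates more fully with stream processing systems and can read CDC data as a stream [2].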




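          Every change message in the source CDC stream is preserved after it lands in the lake and can be consumed as a stream. Flink's stateful computation accumulates results (state) in real time and, through streaming writes, syncs the changes to those results into Hudi lake storage; another Flink job then consumes the changelog stored in Hudi as a stream to perform the next layer of stateful computation. Together these layers form a near-real-time, end-to-end ETL pipeline.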
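The layering can be illustrated with a simplified model of a retract-style streaming aggregation. This is not Flink's operator code: `group_count` is a hypothetical helper, a Python list stands in for the changelog stored in the intermediate Hudi table, and the row kinds follow Flink's notation (+I, -U, +U, -D). The first layer counts products per name; the second consumes that changelog and counts distinct names, a metric the mini-batch model had to recompute from full data.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Tuple

Change = Tuple[str, dict]   # (row kind, row)
ACCUMULATE = ("+I", "+U")   # "-U" and "-D" retract


def group_count(changelog: List[Change],
                key_fn: Callable[[dict], str]) -> Tuple[List[Change], Dict[str, int]]:
    """Simplified COUNT(*) ... GROUP BY over a changelog.

    The per-group count is the operator's state; every state change is
    emitted as a changelog row so a downstream layer can consume it.
    Assumes the input never retracts a row that was not accumulated first.
    """
    state: Dict[str, int] = defaultdict(int)
    out: List[Change] = []
    for kind, row in changelog:
        group = key_fn(row)
        prev = state[group]
        new = prev + 1 if kind in ACCUMULATE else prev - 1
        if prev == 0:
            out.append(("+I", {"group": group, "cnt": new}))
        elif new == 0:
            out.append(("-D", {"group": group, "cnt": prev}))
        else:
            out.append(("-U", {"group": group, "cnt": prev}))
            out.append(("+U", {"group": group, "cnt": new}))
        if new == 0:
            del state[group]
        else:
            state[group] = new
    return out, dict(state)


source = [
    ("+I", {"id": 1, "name": "hammer"}),
    ("+I", {"id": 2, "name": "hammer"}),
    ("+I", {"id": 3, "name": "rocks"}),
    ("-D", {"id": 3, "name": "rocks"}),
]
layer1, per_name = group_count(source, lambda r: r["name"])  # stands in for the intermediate table
layer2, distinct = group_count(layer1, lambda r: "all")      # next layer reads that changelog
print(per_name, distinct)  # {'hammer': 2} {'all': 1}
```

The second layer never sees the source rows, only the first layer's changelog, mirroring how each layer of the pipeline consumes the changelog stored by the layer before it.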




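          This architecture cuts end-to-end ETL latency to minutes, and the storage of each layer can be compacted into a columnar format (Parquet, ORC) for OLAP analysis. Because the data lake is open, the compacted files can be queried by many engines: Flink, Spark, Presto, Hive, and others.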

          A Hudi data lake table therefore has two forms:


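          • table form: queries return the latest snapshot, stored in an efficient columnar format;

          • stream form: changes are consumed as a stream, and the changelog can be read starting from any specified position.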
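The two forms can be pictured with a toy table that stores each commit's changelog. `ToyChangelogTable` is hypothetical and says nothing about Hudi's file layout; it only shows that one stored changelog can serve both a folded snapshot and a stream read from a chosen commit.

```python
from typing import Dict, List, Tuple

Change = Tuple[str, int, str]  # (row kind, record key, row value)


class ToyChangelogTable:
    """Hypothetical stand-in for a table that keeps each commit's changelog."""

    def __init__(self) -> None:
        self.commits: List[Tuple[int, List[Change]]] = []

    def commit(self, instant: int, changes: List[Change]) -> None:
        self.commits.append((instant, changes))

    def snapshot(self) -> Dict[int, str]:
        """Table form: fold all changes into the latest value per key."""
        latest: Dict[int, str] = {}
        for _, changes in self.commits:
            for kind, key, value in changes:
                if kind in ("+I", "+U"):
                    latest[key] = value
                elif kind == "-D":
                    latest.pop(key, None)
                # "-U" carries the old image; the "+U" that follows replaces it
        return latest

    def changes_since(self, instant: int) -> List[Change]:
        """Stream form: every change committed after the given instant."""
        return [c for t, changes in self.commits if t > instant for c in changes]


t = ToyChangelogTable()
t.commit(1, [("+I", 1, "a"), ("+I", 2, "b")])
t.commit(2, [("-U", 1, "a"), ("+U", 1, "a2")])
t.commit(3, [("-D", 2, "b")])
print(t.snapshot())        # {1: 'a2'}
print(t.changes_since(1))  # [('-U', 1, 'a'), ('+U', 1, 'a2'), ('-D', 2, 'b')]
```

In the toy, both forms read the same stored changes; they differ only in whether the changes are folded into a snapshot or handed out in commit order.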



          III. Demo




          The following demo walks through both forms of a Hudi table.


          Environment setup


          • Flink SQL Client


          • the hudi-flink-bundle jar, built from Hudi master


          • Flink 1.13.1



          First, prepare a file of CDC data in debezium-json format:

          {"before":null,"after":{"id":101,"ts":1000,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}
          {"before":null,"after":{"id":102,"ts":2000,"name":"car battery","description":"12V car battery","weight":8.100000381469727},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
          {"before":null,"after":{"id":103,"ts":3000,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.800000011920929},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
          {"before":null,"after":{"id":104,"ts":4000,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
          {"before":null,"after":{"id":105,"ts":5000,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
          {"before":null,"after":{"id":106,"ts":6000,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
          {"before":null,"after":{"id":107,"ts":7000,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
          {"before":null,"after":{"id":108,"ts":8000,"name":"jacket","description":"water resistent black wind breaker","weight":0.10000000149011612},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
          {"before":null,"after":{"id":109,"ts":9000,"name":"spare tire","description":"24 inch spare tire","weight":22.200000762939453},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
          {"before":{"id":106,"ts":6000,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"after":{"id":106,"ts":10000,"name":"hammer","description":"18oz carpenter hammer","weight":1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589361987000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":362,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589361987936,"transaction":null}
          {"before":{"id":107,"ts":7000,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"after":{"id":107,"ts":11000,"name":"rocks","description":"box of assorted rocks","weight":5.099999904632568},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362099000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362099505,"transaction":null}
          {"before":null,"after":{"id":110,"ts":12000,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362210000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1068,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362210230,"transaction":null}
          {"before":null,"after":{"id":111,"ts":13000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362243000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1394,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362243428,"transaction":null}
          {"before":{"id":110,"ts":12000,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"after":{"id":110,"ts":14000,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362293000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1707,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362293539,"transaction":null}
          {"before":{"id":111,"ts":13000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"after":{"id":111,"ts":15000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362330000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2090,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362330904,"transaction":null}
          {"before":{"id":111,"ts":16000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"after":null,"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362344000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2443,"row":0,"thread":2,"query":null},"op":"d","ts_ms":1589362344455,"transaction":null}

          In the Flink SQL Client, create a table that reads the CDC data file:

          Flink SQL> CREATE TABLE debezium_source(
          >   id INT NOT NULL,
          >   ts BIGINT,
          >   name STRING,
          >   description STRING,
          >   weight DOUBLE
          > ) WITH (
          >   'connector' = 'filesystem',
          >   'path' = '/Users/chenyuzhao/workspace/hudi-demo/source.data',
          >   'format' = 'debezium-json'
          > );
          [INFO] Execute statement succeed.

          Run a SELECT to inspect the data. The 16 messages in the file yield 20 rows in total, because each UPDATE is split into a -U/+U pair, and the last message is a DELETE:


          Flink SQL> select * from debezium_source;
          +----+-------------+----------------------+--------------------------------+--------------------------------+--------------------------------+
          | op |          id |                   ts |                           name |                    description |                         weight |
          +----+-------------+----------------------+--------------------------------+--------------------------------+--------------------------------+
          | +I |         101 |                 1000 |                        scooter |          Small 2-wheel scooter |              3.140000104904175 |
          | +I |         102 |                 2000 |                    car battery |                12V car battery |              8.100000381469727 |
          | +I |         103 |                 3000 |             12-pack drill bits | 12-pack of drill bits with ... |              0.800000011920929 |
          | +I |         104 |                 4000 |                         hammer |        12oz carpenter's hammer |                           0.75 |
          | +I |         105 |                 5000 |                         hammer |        14oz carpenter's hammer |                          0.875 |
          | +I |         106 |                 6000 |                         hammer |        16oz carpenter's hammer |                            1.0 |
          | +I |         107 |                 7000 |                          rocks |          box of assorted rocks |              5.300000190734863 |
          | +I |         108 |                 8000 |                         jacket | water resistent black wind ... |            0.10000000149011612 |
          | +I |         109 |                 9000 |                     spare tire |             24 inch spare tire |             22.200000762939453 |
          | -U |         106 |                 6000 |                         hammer |        16oz carpenter's hammer |                            1.0 |
          | +U |         106 |                10000 |                         hammer |          18oz carpenter hammer |                            1.0 |
          | -U |         107 |                 7000 |                          rocks |          box of assorted rocks |              5.300000190734863 |
          | +U |         107 |                11000 |                          rocks |          box of assorted rocks |              5.099999904632568 |
          | +I |         110 |                12000 |                         jacket | water resistent white wind ... |            0.20000000298023224 |
          | +I |         111 |                13000 |                        scooter |           Big 2-wheel scooter  |              5.179999828338623 |
          | -U |         110 |                12000 |                         jacket | water resistent white wind ... |            0.20000000298023224 |
          | +U |         110 |                14000 |                         jacket | new water resistent white w... |                            0.5 |
          | -U |         111 |                13000 |                        scooter |           Big 2-wheel scooter  |              5.179999828338623 |
          | +U |         111 |                15000 |                        scooter |           Big 2-wheel scooter  |              5.170000076293945 |
          | -D |         111 |                16000 |                        scooter |           Big 2-wheel scooter  |              5.170000076293945 |
          +----+-------------+----------------------+--------------------------------+--------------------------------+--------------------------------+
          Received a total of 20 rows
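How 16 Debezium messages become 20 changelog rows can be sketched in Python. The mapping below (c or r to +I, u to a -U/+U pair, d to -D) mirrors the output above; `to_changelog` is a hypothetical helper, not Flink's debezium-json deserializer, and the messages are trimmed copies of the demo data with the `source` block dropped.

```python
import json
from typing import List, Tuple


def to_changelog(message: str) -> List[Tuple[str, dict]]:
    """Expand one Debezium JSON envelope into changelog rows (Flink row-kind notation)."""
    msg = json.loads(message)
    op, before, after = msg["op"], msg["before"], msg["after"]
    if op in ("c", "r"):   # insert, or a row read during the initial snapshot
        return [("+I", after)]
    if op == "u":          # an update carries both the old and the new image
        return [("-U", before), ("+U", after)]
    if op == "d":          # a delete carries only the old image
        return [("-D", before)]
    raise ValueError(f"unsupported op: {op!r}")


update = ('{"before":{"id":106,"ts":6000,"name":"hammer"},'
          '"after":{"id":106,"ts":10000,"name":"hammer"},"op":"u"}')
print([kind for kind, _ in to_changelog(update)])  # ['-U', '+U']

# Op sequence of the 16 messages in the demo file, with placeholder images.
ops = ["c"] * 9 + ["u", "u", "c", "c", "u", "u", "d"]
messages = [json.dumps({"op": op,
                        "before": None if op == "c" else {},
                        "after": None if op == "d" else {}}) for op in ops]
print(sum(len(to_changelog(m)) for m in messages))  # 20
```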

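          Create the Hudi table, setting the table type to MERGE_ON_READ and turning on changelog mode with the changelog.enabled property (async compaction is disabled with compaction.async.enabled):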


          Flink SQL> CREATE TABLE hoodie_table(
          >   id INT NOT NULL PRIMARY KEY NOT ENFORCED,
          >   ts BIGINT,
          >   name STRING,
          >   description STRING,
          >   weight DOUBLE
          > ) WITH (
          >   'connector' = 'hudi',
          >   'path' = '/Users/chenyuzhao/workspace/hudi-demo/t1',
          >   'table.type' = 'MERGE_ON_READ',
          >   'changelog.enabled' = 'true',
          >   'compaction.async.enabled' = 'false'
          > );
          [INFO] Execute statement succeed.

          Query



          Load the data into Hudi with an INSERT statement, then enable streaming read and run a query to observe the results:



          Flink SQL> select * from hoodie_table/*+ OPTIONS('read.streaming.enabled'='true')*/;
          +----+-------------+----------------------+--------------------------------+--------------------------------+--------------------------------+
          | op |          id |                   ts |                           name |                    description |                         weight |
          +----+-------------+----------------------+--------------------------------+--------------------------------+--------------------------------+
          | +I |         101 |                 1000 |                        scooter |          Small 2-wheel scooter |              3.140000104904175 |
          | +I |         102 |                 2000 |                    car battery |                12V car battery |              8.100000381469727 |
          | +I |         103 |                 3000 |             12-pack drill bits | 12-pack of drill bits with ... |              0.800000011920929 |
          | +I |         104 |                 4000 |                         hammer |        12oz carpenter's hammer |                           0.75 |
          | +I |         105 |                 5000 |                         hammer |        14oz carpenter's hammer |                          0.875 |
          | +I |         106 |                 6000 |                         hammer |        16oz carpenter's hammer |                            1.0 |
          | +I |         107 |                 7000 |                          rocks |          box of assorted rocks |              5.300000190734863 |
          | +I |         108 |                 8000 |                         jacket | water resistent black wind ... |            0.10000000149011612 |
          | +I |         109 |                 9000 |                     spare tire |             24 inch spare tire |             22.200000762939453 |
          | -U |         106 |                 6000 |                         hammer |        16oz carpenter's hammer |                            1.0 |
          | +U |         106 |                10000 |                         hammer |          18oz carpenter hammer |                            1.0 |
          | -U |         107 |                 7000 |                          rocks |          box of assorted rocks |              5.300000190734863 |
          | +U |         107 |                11000 |                          rocks |          box of assorted rocks |              5.099999904632568 |
          | +I |         110 |                12000 |                         jacket | water resistent white wind ... |            0.20000000298023224 |
          | +I |         111 |                13000 |                        scooter |           Big 2-wheel scooter  |              5.179999828338623 |
          | -U |         110 |                12000 |                         jacket | water resistent white wind ... |            0.20000000298023224 |
          | +U |         110 |                14000 |                         jacket | new water resistent white w... |                            0.5 |
          | -U |         111 |                13000 |                        scooter |           Big 2-wheel scooter  |              5.179999828338623 |
          | +U |         111 |                15000 |                        scooter |           Big 2-wheel scooter  |              5.170000076293945 |
          | -D |         111 |                16000 |                        scooter |           Big 2-wheel scooter  |              5.170000076293945 |


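          As shown, Hudi retains every change to each row, including the changelog operation type (+I, -U, +U, -D). Here we use table hints (the /*+ OPTIONS(...) */ syntax) so that table options can be set dynamically per query.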

          Next, query the table in batch read mode (without the hint). This time the intermediate changes have been merged:


          Flink SQL> select * from hoodie_table;
          2021-08-20 20:51:25,052 INFO  org.apache.hadoop.conf.Configuration.deprecation             [] - mapred.job.map.memory.mb is deprecated. Instead, use mapreduce.map.memory.mb
          +----+-------------+----------------------+--------------------------------+--------------------------------+--------------------------------+
          | op |          id |                   ts |                           name |                    description |                         weight |
          +----+-------------+----------------------+--------------------------------+--------------------------------+--------------------------------+
          | +U |         110 |                14000 |                         jacket | new water resistent white w... |                            0.5 |
          | +I |         101 |                 1000 |                        scooter |          Small 2-wheel scooter |              3.140000104904175 |
          | +I |         102 |                 2000 |                    car battery |                12V car battery |              8.100000381469727 |
          | +I |         103 |                 3000 |             12-pack drill bits | 12-pack of drill bits with ... |              0.800000011920929 |
          | +I |         104 |                 4000 |                         hammer |        12oz carpenter's hammer |                           0.75 |
          | +I |         105 |                 5000 |                         hammer |        14oz carpenter's hammer |                          0.875 |
          | +U |         106 |                10000 |                         hammer |          18oz carpenter hammer |                            1.0 |
          | +U |         107 |                11000 |                          rocks |          box of assorted rocks |              5.099999904632568 |
          | +I |         108 |                 8000 |                         jacket | water resistent black wind ... |            0.10000000149011612 |
          | +I |         109 |                 9000 |                     spare tire |             24 inch spare tire |             22.200000762939453 |
          +----+-------------+----------------------+--------------------------------+--------------------------------+--------------------------------+
          Received a total of 10 rows

          Aggregation


          Computing count(*) with a bounded (batch) source read



          Flink SQL> select count (*) from hoodie_table;
          +----+----------------------+
          | op |               EXPR$0 |
          +----+----------------------+
          | +I |                    1 |
          | -U |                    1 |
          | +U |                    2 |
          | -U |                    2 |
          | +U |                    3 |
          | -U |                    3 |
          | +U |                    4 |
          | -U |                    4 |
          | +U |                    5 |
          | -U |                    5 |
          | +U |                    6 |
          | -U |                    6 |
          | +U |                    7 |
          | -U |                    7 |
          | +U |                    8 |
          | -U |                    8 |
          | +U |                    9 |
          | -U |                    9 |
          | +U |                   10 |
          +----+----------------------+
          Received a total of 19 rows



          Computing count(*) in streaming read mode



          Flink SQL> select count (*) from hoodie_table/*+OPTIONS('read.streaming.enabled'='true')*/;
          +----+----------------------+
          | op |               EXPR$0 |
          +----+----------------------+
          | +I |                    1 |
          | -U |                    1 |
          | +U |                    2 |
          | -U |                    2 |
          | +U |                    3 |
          | -U |                    3 |
          | +U |                    4 |
          | -U |                    4 |
          | +U |                    5 |
          | -U |                    5 |
          | +U |                    6 |
          | -U |                    6 |
          | +U |                    7 |
          | -U |                    7 |
          | +U |                    8 |
          | -U |                    8 |
          | +U |                    9 |
          | -U |                    9 |
          | +U |                    8 |
          | -U |                    8 |
          | +U |                    9 |
          | -U |                    9 |
          | +U |                    8 |
          | -U |                    8 |
          | +U |                    9 |
          | -U |                    9 |
          | +U |                   10 |
          | -U |                   10 |
          | +U |                   11 |
          | -U |                   11 |
          | +U |                   10 |
          | -U |                   10 |
          | +U |                   11 |
          | -U |                   11 |
          | +U |                   10 |
          | -U |                   10 |
          | +U |                   11 |
          | -U |                   11 |
          | +U |                   10 |

          Although the streaming read emits many more intermediate updates, the final result is the same in batch and streaming modes: a count of 10.
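The consistency can be checked with a simplified model of retract-style COUNT(*). It is not Flink's aggregate operator: `global_count` is a hypothetical helper, and the two input lists are the row kinds copied from the batch and streaming SELECT outputs above. Replaying them reproduces both count(*) outputs row for row (19 and 39 rows), and both end at 10.

```python
from typing import List, Tuple


def global_count(kinds: List[str]) -> List[Tuple[str, int]]:
    """Simplified retract-style COUNT(*) without GROUP BY.

    +I/+U rows add one, -U/-D rows subtract one. The first result is emitted
    as +I; every later change is emitted as a -U/+U pair. The case where the
    count drops back to zero is ignored, since it does not occur here.
    """
    out: List[Tuple[str, int]] = []
    count = 0
    for kind in kinds:
        new = count + 1 if kind in ("+I", "+U") else count - 1
        if not out:
            out.append(("+I", new))
        else:
            out.append(("-U", count))
            out.append(("+U", new))
        count = new
    return out


# Row kinds of the batch (snapshot) read and of the streaming read shown above.
batch = ["+U"] + ["+I"] * 5 + ["+U", "+U", "+I", "+I"]
stream = ["+I"] * 9 + ["-U", "+U", "-U", "+U", "+I", "+I", "-U", "+U", "-U", "+U", "-D"]

batch_out, stream_out = global_count(batch), global_count(stream)
print(len(batch_out), batch_out[-1])    # 19 ('+U', 10)
print(len(stream_out), stream_out[-1])  # 39 ('+U', 10)
```

The two changelogs differ because the streaming read also replays the updates and the delete that the snapshot has already merged; only the final values agree.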