Hudi in Practice | Building a Streaming Data Lake with Flink and Hudi
Outline: Background; Incremental ETL; Demo
1. Background
Near real-time scenarios

minute-level real-time dashboards;
BI analytics of all kinds (OLAP);
minute-level feature extraction for machine learning.
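Incremental computing, which targets the gap between stream and batch processing:
Stream processing has low latency, but its SQL patterns are fairly fixed and it lacks query-side capabilities (indexes, ad hoc queries);
batch-processing data warehouses are feature-rich, but their data latency is high.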
Core problems
UPSERT: like Kudu and Hive ACID, Hudi provides minute-level update capability;
Incremental consumption: Hudi supports incremental pulls through the multiple snapshots it keeps in lake storage.
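The two capabilities can be illustrated with a toy Python model. It is not Hudi's API. The class `ToyTable`, its methods and the string commit instants are all hypothetical.

- `upsert` replaces a row by primary key unless the stored copy has a larger precombine value (here `ts`).
- `incremental_pull` returns only the rows written by commits after a given instant. This is the idea behind pulling increments from a table's history of snapshots.

Hudi's instant times are also timestamp strings, but the values used here are made up.

```python
class ToyTable:
    """Hypothetical toy model of a lake table with a commit timeline (not Hudi's API)."""

    def __init__(self, key="id", precombine="ts"):
        self.key, self.precombine = key, precombine
        self.rows = {}     # primary key -> latest record
        self.commits = []  # (instant, keys written by that commit), in commit order

    def upsert(self, instant, records):
        """UPSERT: insert new keys, update existing ones unless the stored copy is newer."""
        written = []
        for rec in records:
            k = rec[self.key]
            old = self.rows.get(k)
            # The precombine field decides which version wins for the same key.
            if old is None or rec[self.precombine] >= old[self.precombine]:
                self.rows[k] = rec
                written.append(k)
        self.commits.append((instant, written))

    def incremental_pull(self, since_instant):
        """Incremental consumption: latest versions of keys written after `since_instant`."""
        keys = []
        for instant, written in self.commits:
            if instant > since_instant:  # string instants compare lexicographically
                for k in written:
                    if k not in keys:
                        keys.append(k)
        return [self.rows[k] for k in keys]


table = ToyTable()
table.upsert("001", [{"id": 106, "ts": 6000}, {"id": 107, "ts": 7000}])
table.upsert("002", [{"id": 106, "ts": 10000}])  # update of key 106
print(len(table.rows))                # 2
print(table.incremental_pull("001"))  # [{'id': 106, 'ts': 10000}]
```

The update rewrites row 106 in place instead of appending a second copy. A consumer that has already processed commit `001` only needs the one changed row.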
2. Incremental ETL
Ingesting DB data into the lake

Users can either ingest DB data directly into Hudi with the Flink CDC connector,
or first write the CDC data into Kafka and then load it into Hudi with the Kafka connector.
Data lake CDC


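Table form: queries return the latest snapshot, stored in an efficient columnar format.
Stream form: changes are consumed as a stream; reading can start from any specified position and returns the changelog after that point.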
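Here is a minimal sketch of the two forms. It assumes a changelog of `(row_kind, key, value)` tuples that uses Flink's row kinds (+I, -U, +U, -D). The functions `table_view` and `stream_view` are hypothetical helpers, not Hudi code.

- The table form folds the changelog into one row per key.
- The stream form replays the changes from an arbitrary offset.

```python
# Toy changelog: (row_kind, key, value), using Flink's row kinds:
# +I insert, -U update-before, +U update-after, -D delete.
CHANGELOG = [
    ("+I", 101, "Small 2-wheel scooter"),
    ("+I", 106, "16oz carpenter's hammer"),
    ("-U", 106, "16oz carpenter's hammer"),
    ("+U", 106, "18oz carpenter hammer"),
    ("+I", 111, "Big 2-wheel scooter"),
    ("-D", 111, "Big 2-wheel scooter"),
]


def table_view(changelog):
    """Table form: fold the changelog into the latest snapshot (one row per key)."""
    snapshot = {}
    for kind, key, value in changelog:
        if kind in ("+I", "+U"):
            snapshot[key] = (kind, value)  # remember the kind of the last change
        elif kind == "-D":
            snapshot.pop(key, None)
        # "-U" carries the old image; the "+U" that follows replaces it.
    return snapshot


def stream_view(changelog, start_offset=0):
    """Stream form: replay every change from an arbitrary position onward."""
    yield from changelog[start_offset:]


print(table_view(CHANGELOG))
# {101: ('+I', 'Small 2-wheel scooter'), 106: ('+U', '18oz carpenter hammer')}
print(list(stream_view(CHANGELOG, start_offset=4)))
# [('+I', 111, 'Big 2-wheel scooter'), ('-D', 111, 'Big 2-wheel scooter')]
```

The model keeps the kind of each key's last change. This mirrors the +U flags that the demo's batch query shows for updated records.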
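3. Demo
Environment
Flink SQL Client
Hudi master, packaged into the hudi-flink-bundle jar
Flink 1.13.1
Source data: CDC records in debezium-json format, one event per line, saved as source.data: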
{"before":null,"after":{"id":101,"ts":1000,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}
{"before":null,"after":{"id":102,"ts":2000,"name":"car battery","description":"12V car battery","weight":8.100000381469727},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":103,"ts":3000,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.800000011920929},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":104,"ts":4000,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":105,"ts":5000,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":106,"ts":6000,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":107,"ts":7000,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":108,"ts":8000,"name":"jacket","description":"water resistent black wind breaker","weight":0.10000000149011612},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":109,"ts":9000,"name":"spare tire","description":"24 inch spare tire","weight":22.200000762939453},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":{"id":106,"ts":6000,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"after":{"id":106,"ts":10000,"name":"hammer","description":"18oz carpenter hammer","weight":1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589361987000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":362,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589361987936,"transaction":null}
{"before":{"id":107,"ts":7000,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"after":{"id":107,"ts":11000,"name":"rocks","description":"box of assorted rocks","weight":5.099999904632568},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362099000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362099505,"transaction":null}
{"before":null,"after":{"id":110,"ts":12000,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362210000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1068,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362210230,"transaction":null}
{"before":null,"after":{"id":111,"ts":13000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362243000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1394,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362243428,"transaction":null}
{"before":{"id":110,"ts":12000,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"after":{"id":110,"ts":14000,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362293000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1707,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362293539,"transaction":null}
{"before":{"id":111,"ts":13000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"after":{"id":111,"ts":15000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362330000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2090,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362330904,"transaction":null}
{"before":{"id":111,"ts":16000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"after":null,"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362344000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2443,"row":0,"thread":2,"query":null},"op":"d","ts_ms":1589362344455,"transaction":null}
Create a filesystem source table that reads source.data in the debezium-json format:
Flink SQL> CREATE TABLE debezium_source(
  id INT NOT NULL,
  ts BIGINT,
  name STRING,
  description STRING,
  weight DOUBLE
) WITH (
  'connector' = 'filesystem',
  'path' = '/Users/chenyuzhao/workspace/hudi-demo/source.data',
  'format' = 'debezium-json'
);
[INFO] Execute statement succeed.
Flink SQL> select * from debezium_source;
+----+-------------+----------------------+--------------------------------+--------------------------------+--------------------------------+
| op |          id |                   ts |                           name |                    description |                         weight |
+----+-------------+----------------------+--------------------------------+--------------------------------+--------------------------------+
| +I |         101 |                 1000 |                        scooter |          Small 2-wheel scooter |              3.140000104904175 |
| +I |         102 |                 2000 |                    car battery |                12V car battery |              8.100000381469727 |
| +I |         103 |                 3000 |             12-pack drill bits | 12-pack of drill bits with ... |              0.800000011920929 |
| +I |         104 |                 4000 |                         hammer |        12oz carpenter's hammer |                           0.75 |
| +I |         105 |                 5000 |                         hammer |        14oz carpenter's hammer |                          0.875 |
| +I |         106 |                 6000 |                         hammer |        16oz carpenter's hammer |                            1.0 |
| +I |         107 |                 7000 |                          rocks |          box of assorted rocks |              5.300000190734863 |
| +I |         108 |                 8000 |                         jacket | water resistent black wind ... |            0.10000000149011612 |
| +I |         109 |                 9000 |                     spare tire |             24 inch spare tire |             22.200000762939453 |
| -U |         106 |                 6000 |                         hammer |        16oz carpenter's hammer |                            1.0 |
| +U |         106 |                10000 |                         hammer |          18oz carpenter hammer |                            1.0 |
| -U |         107 |                 7000 |                          rocks |          box of assorted rocks |              5.300000190734863 |
| +U |         107 |                11000 |                          rocks |          box of assorted rocks |              5.099999904632568 |
| +I |         110 |                12000 |                         jacket | water resistent white wind ... |            0.20000000298023224 |
| +I |         111 |                13000 |                        scooter |            Big 2-wheel scooter |              5.179999828338623 |
| -U |         110 |                12000 |                         jacket | water resistent white wind ... |            0.20000000298023224 |
| +U |         110 |                14000 |                         jacket | new water resistent white w... |                            0.5 |
| -U |         111 |                13000 |                        scooter |            Big 2-wheel scooter |              5.179999828338623 |
| +U |         111 |                15000 |                        scooter |            Big 2-wheel scooter |              5.170000076293945 |
| -D |         111 |                16000 |                        scooter |            Big 2-wheel scooter |              5.170000076293945 |
+----+-------------+----------------------+--------------------------------+--------------------------------+--------------------------------+
Received a total of 20 rows
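The 20 rows above come from the 16 Debezium events in source.data. The sketch below is a hypothetical helper, not Flink's actual deserializer. It shows how the debezium-json format interprets the `op` field:

- `c` (and `r`, a snapshot read) becomes +I.
- `u` becomes a -U/+U pair.
- `d` becomes -D.

The sample events are trimmed copies of records from source.data, with the `source` metadata and most columns dropped.

```python
import json


def debezium_to_changelog(line):
    """Turn one Debezium JSON event into (row_kind, row_image) pairs.

    Hypothetical helper modelling how the debezium-json format reads "op":
    c (create) / r (snapshot read) -> +I, u -> -U then +U, d -> -D.
    """
    event = json.loads(line)
    op, before, after = event["op"], event["before"], event["after"]
    if op in ("c", "r"):
        return [("+I", after)]
    if op == "u":
        return [("-U", before), ("+U", after)]
    if op == "d":
        return [("-D", before)]
    raise ValueError(f"unsupported op: {op!r}")


# Trimmed events from source.data ("source" metadata and some columns dropped).
EVENTS = [
    '{"before": null, "after": {"id": 106, "ts": 6000}, "op": "c"}',
    '{"before": {"id": 106, "ts": 6000}, "after": {"id": 106, "ts": 10000}, "op": "u"}',
    '{"before": {"id": 111, "ts": 16000}, "after": null, "op": "d"}',
]

rows = [row for line in EVENTS for row in debezium_to_changelog(line)]
for kind, image in rows:
    print(kind, image["id"], image["ts"])
# +I 106 6000
# -U 106 6000
# +U 106 10000
# -D 111 16000
```

The file holds 11 `c`, 4 `u` and 1 `d` events. This mapping therefore yields 11 + 2 × 4 + 1 = 20 rows, which matches the row count reported by the query.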
Create the Hudi table. The table type is set to MERGE_ON_READ, changelog mode is turned on with the changelog.enabled property, and asynchronous compaction is disabled:
Flink SQL> CREATE TABLE hoodie_table(
  id INT NOT NULL PRIMARY KEY NOT ENFORCED,
  ts BIGINT,
  name STRING,
  description STRING,
  weight DOUBLE
) WITH (
  'connector' = 'hudi',
  'path' = '/Users/chenyuzhao/workspace/hudi-demo/t1',
  'table.type' = 'MERGE_ON_READ',
  'changelog.enabled' = 'true',
  'compaction.async.enabled' = 'false'
);
[INFO] Execute statement succeed.
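The deliberately simplified model below pictures the effect of `changelog.enabled`. According to the Hudi Flink option description:

- Enabling it makes the sink accept UPDATE_BEFORE messages.
- Enabling it also makes the source try to emit every change of a record. This is best-effort, because compaction eventually merges a record's changes into one.
- The default (false) gives UPSERT semantics.

The function `visible_changes` is hypothetical and ignores write batching and compaction timing. Turning off async compaction in the DDL above is presumably what keeps every change readable in this demo.

```python
def visible_changes(changelog, changelog_enabled):
    """Simplified model of the changes a streaming reader can see.

    changelog_enabled=True : keep every change, including -U (update-before) rows.
    changelog_enabled=False: UPSERT semantics; -U rows are dropped and only the
                             last change per key survives, as after merging.
    """
    if changelog_enabled:
        return list(changelog)
    last = {}
    for kind, key, value in changelog:
        if kind == "-U":
            continue  # UPSERT mode has no use for update-before images
        last[key] = (kind, key, value)  # later changes overwrite earlier ones
    return list(last.values())


CHANGELOG = [
    ("+I", 106, "16oz carpenter's hammer"),
    ("-U", 106, "16oz carpenter's hammer"),
    ("+U", 106, "18oz carpenter hammer"),
    ("+I", 111, "Big 2-wheel scooter"),
    ("-D", 111, "Big 2-wheel scooter"),
]

print(len(visible_changes(CHANGELOG, changelog_enabled=True)))  # 5
print(visible_changes(CHANGELOG, changelog_enabled=False))
# [('+U', 106, '18oz carpenter hammer'), ('-D', 111, 'Big 2-wheel scooter')]
```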
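Query
Once the source data has been written into the Hudi table (for example with INSERT INTO hoodie_table SELECT * FROM debezium_source), read it in streaming mode by setting 'read.streaming.enabled' to 'true' in an OPTIONS hint. All 20 changelog rows are returned, including the intermediate -U/+U changes: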
Flink SQL> select * from hoodie_table/*+ OPTIONS('read.streaming.enabled'='true')*/;
+----+-------------+----------------------+--------------------------------+--------------------------------+--------------------------------+
| op |          id |                   ts |                           name |                    description |                         weight |
+----+-------------+----------------------+--------------------------------+--------------------------------+--------------------------------+
| +I |         101 |                 1000 |                        scooter |          Small 2-wheel scooter |              3.140000104904175 |
| +I |         102 |                 2000 |                    car battery |                12V car battery |              8.100000381469727 |
| +I |         103 |                 3000 |             12-pack drill bits | 12-pack of drill bits with ... |              0.800000011920929 |
| +I |         104 |                 4000 |                         hammer |        12oz carpenter's hammer |                           0.75 |
| +I |         105 |                 5000 |                         hammer |        14oz carpenter's hammer |                          0.875 |
| +I |         106 |                 6000 |                         hammer |        16oz carpenter's hammer |                            1.0 |
| +I |         107 |                 7000 |                          rocks |          box of assorted rocks |              5.300000190734863 |
| +I |         108 |                 8000 |                         jacket | water resistent black wind ... |            0.10000000149011612 |
| +I |         109 |                 9000 |                     spare tire |             24 inch spare tire |             22.200000762939453 |
| -U |         106 |                 6000 |                         hammer |        16oz carpenter's hammer |                            1.0 |
| +U |         106 |                10000 |                         hammer |          18oz carpenter hammer |                            1.0 |
| -U |         107 |                 7000 |                          rocks |          box of assorted rocks |              5.300000190734863 |
| +U |         107 |                11000 |                          rocks |          box of assorted rocks |              5.099999904632568 |
| +I |         110 |                12000 |                         jacket | water resistent white wind ... |            0.20000000298023224 |
| +I |         111 |                13000 |                        scooter |            Big 2-wheel scooter |              5.179999828338623 |
| -U |         110 |                12000 |                         jacket | water resistent white wind ... |            0.20000000298023224 |
| +U |         110 |                14000 |                         jacket | new water resistent white w... |                            0.5 |
| -U |         111 |                13000 |                        scooter |            Big 2-wheel scooter |              5.179999828338623 |
| +U |         111 |                15000 |                        scooter |            Big 2-wheel scooter |              5.170000076293945 |
| -D |         111 |                16000 |                        scooter |            Big 2-wheel scooter |              5.170000076293945 |
Reading the same table in batch (bounded) mode returns only the latest version of each record. Record 111 was deleted, so 10 rows remain:
Flink SQL> select * from hoodie_table;
2021-08-20 20:51:25,052 INFO org.apache.hadoop.conf.Configuration.deprecation [] - mapred.job.map.memory.mb is deprecated. Instead, use mapreduce.map.memory.mb
+----+-------------+----------------------+--------------------------------+--------------------------------+--------------------------------+
| op |          id |                   ts |                           name |                    description |                         weight |
+----+-------------+----------------------+--------------------------------+--------------------------------+--------------------------------+
| +U |         110 |                14000 |                         jacket | new water resistent white w... |                            0.5 |
| +I |         101 |                 1000 |                        scooter |          Small 2-wheel scooter |              3.140000104904175 |
| +I |         102 |                 2000 |                    car battery |                12V car battery |              8.100000381469727 |
| +I |         103 |                 3000 |             12-pack drill bits | 12-pack of drill bits with ... |              0.800000011920929 |
| +I |         104 |                 4000 |                         hammer |        12oz carpenter's hammer |                           0.75 |
| +I |         105 |                 5000 |                         hammer |        14oz carpenter's hammer |                          0.875 |
| +U |         106 |                10000 |                         hammer |          18oz carpenter hammer |                            1.0 |
| +U |         107 |                11000 |                          rocks |          box of assorted rocks |              5.099999904632568 |
| +I |         108 |                 8000 |                         jacket | water resistent black wind ... |            0.10000000149011612 |
| +I |         109 |                 9000 |                     spare tire |             24 inch spare tire |             22.200000762939453 |
+----+-------------+----------------------+--------------------------------+--------------------------------+--------------------------------+
Received a total of 10 rows
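Aggregation
Computing count(*) in bounded-source read mode. The snapshot holds 10 rows, so the count rises from 1 to 10. Each new value retracts the previous one (-U) before it is emitted (+U), which gives 19 result rows: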
Flink SQL> select count (*) from hoodie_table;
+----+----------------------+
| op |               EXPR$0 |
+----+----------------------+
| +I |                    1 |
| -U |                    1 |
| +U |                    2 |
| -U |                    2 |
| +U |                    3 |
| -U |                    3 |
| +U |                    4 |
| -U |                    4 |
| +U |                    5 |
| -U |                    5 |
| +U |                    6 |
| -U |                    6 |
| +U |                    7 |
| -U |                    7 |
| +U |                    8 |
| -U |                    8 |
| +U |                    9 |
| -U |                    9 |
| +U |                   10 |
+----+----------------------+
Received a total of 19 rows
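Computing count(*) in streaming read mode. Here the updates and the delete are consumed as well, so the count moves back and forth before it settles at 10: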
Flink SQL> select count (*) from hoodie_table/*+OPTIONS('read.streaming.enabled'='true')*/;
+----+----------------------+
| op |               EXPR$0 |
+----+----------------------+
| +I |                    1 |
| -U |                    1 |
| +U |                    2 |
| -U |                    2 |
| +U |                    3 |
| -U |                    3 |
| +U |                    4 |
| -U |                    4 |
| +U |                    5 |
| -U |                    5 |
| +U |                    6 |
| -U |                    6 |
| +U |                    7 |
| -U |                    7 |
| +U |                    8 |
| -U |                    8 |
| +U |                    9 |
| -U |                    9 |
| +U |                    8 |
| -U |                    8 |
| +U |                    9 |
| -U |                    9 |
| +U |                    8 |
| -U |                    8 |
| +U |                    9 |
| -U |                    9 |
| +U |                   10 |
| -U |                   10 |
| +U |                   11 |
| -U |                   11 |
| +U |                   10 |
| -U |                   10 |
| +U |                   11 |
| -U |                   11 |
| +U |                   10 |
| -U |                   10 |
| +U |                   11 |
| -U |                   11 |
| +U |                   10 |
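Both outputs follow from how a non-windowed aggregate is maintained over a changelog:

- Accumulate messages (+I, +U) increase the count.
- Retract messages (-U, -D) decrease it.
- Each new result retracts the previous one.

The toy function `retract_count` below is a model, not Flink's operator. The input row kinds are copied from the two queries on hoodie_table. It reproduces both results:

- The bounded read: 10 accumulate rows give 19 result rows, ending at 10.
- The streaming read: 20 changelog rows give 39 result rows, which oscillate before settling at 10.

```python
def retract_count(row_kinds):
    """Changelog emitted by a global COUNT(*) fed with the given input row kinds.

    +I/+U input rows add one and -U/-D input rows subtract one. The first result
    is emitted as +I; after that each change retracts the old count (-U) and
    emits the new one (+U).
    """
    out, count = [], 0
    for kind in row_kinds:
        previous = count
        count += 1 if kind in ("+I", "+U") else -1
        if not out:
            out.append(("+I", count))
        else:
            out.append(("-U", previous))
            out.append(("+U", count))
    return out


# Bounded read: the snapshot's 10 rows (3 updated keys carry +U, 7 carry +I).
BOUNDED = ["+U"] + ["+I"] * 5 + ["+U", "+U"] + ["+I"] * 2
# Streaming read: the 20 changelog rows in the order shown by the demo.
STREAMING = ["+I"] * 9 + ["-U", "+U", "-U", "+U", "+I", "+I",
                          "-U", "+U", "-U", "+U", "-D"]

bounded, streaming = retract_count(BOUNDED), retract_count(STREAMING)
print(len(bounded), bounded[-1])      # 19 ('+U', 10)
print(len(streaming), streaming[-1])  # 39 ('+U', 10)
```

The bounded read only ever accumulates, so its count is monotonic. The streaming read also replays the -U and -D rows, which is why values such as 9 → 8 → 9 and 11 → 10 appear.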