Flink 實踐 | Flink CDC + Hudi + Hive + Presto 構(gòu)建實時數(shù)據(jù)湖最佳實踐

1. 測試過程環(huán)境版本說明
Flink1.13.1
Scala2.11
CDH6.2.0
Hadoop3.0.0
Hive2.1.1
Hudi0.10(master)
PrestoDB0.256
Mysql5.7
2. 集群服務(wù)器基礎(chǔ)環(huán)境
2.1 Maven和JDK環(huán)境版本

2.2 Hadoop 集群環(huán)境版本

2.3 HADOOP環(huán)境變量配置
export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoopexport HADOOP_CALSSPATH=`$HADOOP_HOME/bin/hadoop classpath`
3. Hudi編譯環(huán)境配置
3.1 Maven Home settings.xml配置修改
說明:指定aliyun maven地址(支持CDH cloudera依賴) mirror庫

alimaven central,!cloudera aliyun maven http://maven.aliyun.com/nexus/content/groups/public/
3.2 下載Hudi源碼包
git clone https://github.com/apache/hudi.git
Hudi社區(qū)建議版本適配
hudi0.9 適配 flink1.12.2
hudi0.10(master) 適配 flink1.13.X ( 說明master分支上版本還未release)
3.3 Hudi 客戶端命令行

3.4 修改Hudi集成flink和Hive編譯依賴版本配置
hudi-master/packaging/hudi-flink-bundle

pom.xml文件 ( 筆者環(huán)境CDH6.2.0 hive2.1.1)

flink-bundle-shade-hive2 2.1.1-cdh6.2.0 compile ${hive.groupid} hive-service-rpc ${hive.version} ${flink.bundle.hive.scope}
3.5 編譯Hudi 指定Hadoop和Hive版本信息
mvn clean install -DskipTests-Drat.skip=true-Dscala-2.11-Dhadoop.version=3.0.0-Pflink-bundle-shade-hive2(可加 –e –X 參數(shù)查看編譯ERROR異常和DEBUG信息)
說明:默認(rèn)scala2.11、默認(rèn)不包含hive依賴

首次編譯耗時較長 筆者首次編譯大概花費 50min+(也和服務(wù)器網(wǎng)絡(luò)有關(guān))
后續(xù)編譯會快一些 大約15min左右
3.6 Hudi編譯異常


修改Hudi master pom.xml 增加 CDH repository地址

3.7 Hudi重新編譯

3.8 Hudi編譯結(jié)果說明
hudi-master/packaging/hudi-flink-bundle/target

hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar
說明:hudi-flink-bundle jar 是 flink 用來寫入和讀取數(shù)據(jù)
hudi-master/packaging/hudi-hadoop-mr-bundle/target

hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar
說明:hudi-mr-bundle jar 是 hive 需要用來讀hudi數(shù)據(jù)
4. Flink環(huán)境配置
版本說明:Flink 1.13.1 scala2.11版本
4.1 FLINK_HOME 下 sql-client-defaults.yaml 配置

4.2 flink-conf.yaml配置修改


# state.backend: filesystemstate.backend: rocksdb# 開啟增量checkpointstate.backend.incremental: true# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpointsstate.checkpoints.dir: hdfs://nameservice/flink/flink-checkpointsclassloader.check-leaked-classloader: falseclassloader.resolve-order: parent-first
4.3 FLINK_HOME lib下添加依賴

flink-sql-connector-mysql-cdc-1.4.0.jarflink-sql-connector-oracle-cdc-2.1-SNAPSHOT.jar.BAK – oracle cdc 依賴flink-format-changelog-json-1.4.0.jarflink-sql-connector-kafka_2.11-1.13.1.jar--- Hadoop home lib下copy過來hadoop-mapreduce-client-common-3.0.0-cdh6.2.0.jarhadoop-mapreduce-client-core-3.0.0-cdh6.2.0.jarhadoop-mapreduce-client-jobclient-3.0.0-cdh6.2.0.jar--- hudi編譯jar copy過來hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar
說明:目前oracle cdc jar和mysql cdc jar一起在lib下發(fā)現(xiàn)有沖突異常
5 啟動flink yarn session服務(wù)
5.1 FLINK_HOME shell 命令
$FLINK_HOME/bin/yarn-session.sh -s 2-jm 2048-tm 2048-nm ys-hudi01 -d
5.2 Yarn Web UI

5.3 Flinksql Client 啟動命令
$FLINK_HOME/bin/sql-client.sh embedded -j ./lib/hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar shell說明:-j指定hudi-flink 依賴jar

Show table /show catalogs

6. MySQL binlog 開啟配置
6.1 創(chuàng)建binlog日志存儲路徑
mkdir logs6.2 修改目錄屬主和group
chown -R mysql:mysql /mysqldata/logs6.3 修改mysql配置信息
vim /etc/my.cnfserver-id=2log-bin= /mysqldata/logs/mysql-binbinlog_format=rowexpire_logs_days=15binlog_row_image=full
6.4 修改完,重啟mysql server
service mysqld restart6.5 客戶端查看binlog日志情況
show master logs;

Mysql 版本:5.7.30

5.6 創(chuàng)建mysql sources 表 DDL
create table users_cdc(id bigint auto_increment primary key,name varchar(20) null,birthday timestamp default CURRENT_TIMESTAMP notnull,ts timestamp default CURRENT_TIMESTAMP notnull);

7. FlinkCDC sink Hudi測試代碼過程
7.1 Flink sql cdc DDL 語句:(具體參數(shù)說明可參考flink官網(wǎng))
CREATE TABLE mysql_users (id BIGINT PRIMARY KEY NOT ENFORCED ,name STRING,birthday TIMESTAMP(3),ts TIMESTAMP(3)) WITH ('connector'= 'mysql-cdc','hostname'= '127.0.0.1','port'= '3306','username'= '','password'=’’,'server-time-zone'= 'Asia/Shanghai','debezium.snapshot.mode'='initial','database-name'= 'luo','table-name'= 'users_cdc');

7.2 查詢mysql cdc 表
Flink SQL> select * from mysql_users;

由于目前MySQL users_cdc表是空,所以flinksql 查詢沒有數(shù)據(jù) 只有表結(jié)構(gòu);

Flink web UI:


7.3 創(chuàng)建一個臨時視圖,增加分區(qū)列 方便后續(xù)同步hive分區(qū)表
Flink SQL> create view mycdc_v AS SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') as?partition?FROM mysql_users;
說明:partition 關(guān)鍵字需要 `` 引起來

查詢視圖數(shù)據(jù)也是空結(jié)構(gòu),但增加了分區(qū)字段:
Flink SQL> select * from mycdc_v;


Flink web UI:

7.4 設(shè)置checkpoint間隔時間,存儲路徑已在flink-conf配置設(shè)置全局路徑
建議:測試環(huán)境 可設(shè)置秒級別(不能太小),生產(chǎn)環(huán)境可設(shè)置分鐘級別。
Flink SQL> set execution.checkpointing.interval=30sec;

7.5 Flinksql 創(chuàng)建 cdc sink hudi文件,并自動同步hive分區(qū)表DDL 語句
CREATE TABLE mysqlcdc_sync_hive01(id bigint ,name string,birthday TIMESTAMP(3),ts TIMESTAMP(3),`partition` VARCHAR(20),primary key(id) not enforced --必須指定uuid 主鍵)PARTITIONED BY (`partition`)with('connector'='hudi','path'= 'hdfs://nameservice /luo/hudi/mysqlcdc_sync_hive01', 'hoodie.datasource.write.recordkey.field'= 'id'-- 主鍵, 'write.precombine.field'= 'ts'-- 自動precombine的字段, 'write.tasks'= '1', 'compaction.tasks'= '1', 'write.rate.limit'= '2000'-- 限速, 'table.type'= 'MERGE_ON_READ'-- 默認(rèn)COPY_ON_WRITE,可選MERGE_ON_READ, 'compaction.async.enabled'= 'true'-- 是否開啟異步壓縮, 'compaction.trigger.strategy'= 'num_commits'-- 按次數(shù)壓縮, 'compaction.delta_commits'= '1'-- 默認(rèn)為5, 'changelog.enabled'= 'true'-- 開啟changelog變更, 'read.streaming.enabled'= 'true'-- 開啟流讀, 'read.streaming.check-interval'= '3'-- 檢查間隔,默認(rèn)60s, 'hive_sync.enable'= 'true'-- 開啟自動同步hive, 'hive_sync.mode'= 'hms'-- 自動同步hive模式,默認(rèn)jdbc模式, 'hive_sync.metastore.uris'= 'thrift://hadoop:9083'-- hive metastore地址-- , 'hive_sync.jdbc_url'= 'jdbc:hive2://hadoop:10000'-- hiveServer地址, 'hive_sync.table'= 'mysqlcdc_sync_hive01'-- hive 新建表名, 'hive_sync.db'= 'luo'-- hive 新建數(shù)據(jù)庫名, 'hive_sync.username'= ''-- HMS 用戶名, 'hive_sync.password'= ''-- HMS 密碼, 'hive_sync.support_timestamp'= 'true'-- 兼容hive timestamp類型);
說明:Hudi目前支持MOR和COW兩種模式
(1) Copy on Write:使用列式存儲來存儲數(shù)據(jù)(例如:parquet),通過在寫入期間執(zhí)行同步合并來簡單地更新和重現(xiàn)文件
(2) Merge on Read:使用列式存儲(parquet)+行式文件(arvo)組合存儲數(shù)據(jù)。更新記錄到增量文件中,然后進行同步或異步壓縮來生成新版本的列式文件。
COW:Copy on Write (寫時復(fù)制),快照查詢+增量查詢
MOR:Merge on Read (讀時合并),快照查詢+增量查詢+讀取優(yōu)化查詢(近實時)
使用場景上:
(1)COW適用寫少讀多的場景 ,MOR 適用寫多讀少的場景;
(2)MOR適合CDC場景,更新延遲要求較低,COW目前不支持 changelog mode 不適合處理cdc場景;


Flink web UI

7.6 Flink sql mysql cdc數(shù)據(jù)寫入hudi文件數(shù)據(jù)
Flink SQL> insert into mysqlcdc_sync_hive01 select id,name,birthday,ts,partition?from mycdc_v;

Flink web UI DAG圖:

7.7 HDFS上Hudi文件目錄情況


說明:目前還沒寫入測試數(shù)據(jù),hudi目錄只生成一些狀態(tài)標(biāo)記文件,還未生成分區(qū)目錄以及.log 和.parquet數(shù)據(jù)文件,具體含義可見hudi官方文檔。
7.8 Mysql數(shù)據(jù)源寫入測試數(shù)據(jù)
insert into users_cdc (name) values ('cdc01');
7.9 Flinksql 查詢mysql cdc insert數(shù)據(jù):
Flink SQL> set execution.result-mode=tableau;
[WARNING] The specified key 'execution.result-mode' is deprecated. Please use 'sql-client.execution.result-mode' instead.
[INFO] Session property has been set.
Flink SQL> select * from mysql_users; -- 查詢到一條insert數(shù)據(jù)

7.10 Flink web UI頁面可以看到DAG 各個環(huán)節(jié)產(chǎn)生一條測試數(shù)據(jù)

7.11 Flinksql 查詢 sink的hudi表數(shù)據(jù)
Flink SQL> select * from mysqlcdc_sync_hive01; --已查詢到一條insert數(shù)據(jù)

7.12 Hdfs上Hudi文件目錄變化情況

7.13 Hive分區(qū)表和數(shù)據(jù)自動同步情況

7.14 查看自動創(chuàng)建hive表結(jié)構(gòu)
hive> show create table mysqlcdc_sync_hive01_ro;

hive> show create table mysqlcdc_sync_hive01_rt;

7.15 查看自動生成的表分區(qū)信息
hive> show partitions mysqlcdc_sync_hive01_ro;
hive> show partitions mysqlcdc_sync_hive01_rt;

說明:已自動生產(chǎn)hudi MOR模式的
mysqlcdc_sync_hive01_ro
mysqlcdc_sync_hive01_rt
ro表和rt表區(qū)別:
ro 表全稱 read oprimized table,對于 MOR 表同步的 xxx_ro 表,只暴露壓縮后的 parquet。其查詢方式和COW表類似。設(shè)置完 hiveInputFormat 之后 和普通的 Hive 表一樣查詢即可;
rt表示增量視圖,主要針對增量查詢的rt表;
ro表只能查parquet文件數(shù)據(jù), rt表 parquet文件數(shù)據(jù)和log文件數(shù)據(jù)都可查;
7.16 Hive訪問Hudi數(shù)據(jù)
說明:需要引入hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar
引入Hudi依賴jar方式:
(1) 引入到 $HIVE_HOME/lib下;
(2) 引入到$HIVE_HOME/auxlib 自定義第三方依賴 修改 hive-site.xml配置文件;
(3) Hive shell命令行引入 Session級別有效;
其中(1)和(3)配置完后需要重啟 hive-server服務(wù);
查詢Hive 分區(qū)表數(shù)據(jù):
hive> select * from mysqlcdc_sync_hive01_ro; --已查詢到mysq insert的一條數(shù)據(jù)

hive> select * from mysqlcdc_sync_hive01_rt; --已查詢到mysq insert的一條數(shù)據(jù)

Hive 條件查詢:
hive> select name,ts from mysqlcdc_sync_hive01_ro where?partition='20211109';

Hive ro表 count查詢
hive> select count(1) from mysqlcdc_sync_hive01_ro;

Hive Count異常解決:
引入hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar依賴
hive> add jar hdfs://nameservice /luo/hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar;
hive> set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;

hive> select count(1) from mysqlcdc_sync_hive01_ro; --可正常count

Hive rt表 count查詢
hive> select count(1) from mysqlcdc_sync_hive01_rt;

說明:rt 表count 還是異常,和Hudi社區(qū)人員溝通hudi master目前還沒release這塊存在bug正在修復(fù)中
具體見:https://issues.apache.org/jira/browse/HUDI-2649
7.17 Mysql 數(shù)據(jù)源寫入多條測試數(shù)據(jù)
insert into users_cdc (name) values ('cdc02');insert into users_cdc (name) values ('cdc03');insert into users_cdc (name) values ('cdc04');insert into users_cdc (name) values ('cdc05');insert into users_cdc (name) values ('cdc06');

Flink web UI DAG中數(shù)據(jù)鏈路情況:

7.18 Flinksql中新寫入數(shù)據(jù)查詢情況

Yarn web UI?application_1626256835287_40351[1]資源使用情況

Hdfs上Hudi文件目錄變化情況

Hudi狀態(tài)文件說明:
(1)requested:表示一個動作已被安排,但尚未啟動
(2)inflight:表示當(dāng)前正在執(zhí)行操作
(3)completed:表示在時間線上完成了操作
Flink jobmanager log sync hive過程詳細(xì)日志



7.19 Mysql 數(shù)據(jù)源更新數(shù)據(jù)
update users_cdc set name = 'cdc05-bj'where id = 5;
7.20 Flinksql 查詢cdc update數(shù)據(jù) 產(chǎn)生兩條binlog數(shù)據(jù)

說明:flinksql 查詢最終只有一條+I有效數(shù)據(jù),且數(shù)據(jù)已更新
Flink web UI DAG接受到兩條binlog數(shù)據(jù),但最終compact和sink只有一條有效數(shù)據(jù)

7.21 MySQL 數(shù)據(jù)源 delete 一條數(shù)據(jù):
deletefrom users_cdc where id = 3;
Flink Web UI job DAG中捕獲一條新數(shù)據(jù):

Flinksql changlog delete數(shù)據(jù)變化查詢

HDFS上Hudi數(shù)據(jù)文件生成情況


Hudi文件類型說明:
(1)commits: 表示將一批數(shù)據(jù)原子性寫入表中
(2)cleans: 清除表中不在需要的舊版本文件的后臺活動
(3)delta_commit:增量提交是指將一批數(shù)據(jù)原子性寫入MergeOnRead類型的表中,其中部分或者所有數(shù)據(jù)可以寫入增量日志中
(4)compaction: 協(xié)調(diào)hudi中差異數(shù)據(jù)結(jié)構(gòu)的后臺活動,例如:將更新從基于行的日志文件變成列格式。在內(nèi)部,壓縮的表現(xiàn)為時間軸上的特殊提交
(5)rollback:表示提交操作不成功且已經(jīng)回滾,會刪除在寫入過程中產(chǎn)生的數(shù)據(jù)

說明:hudi分區(qū)文件以及.log和.parquet文件都已生成
兩種文件區(qū)別:Hudi會在DFS分布式文件系統(tǒng)上的basepath基本路徑下組織成目錄結(jié)構(gòu)。每張對應(yīng)的表都會成多個分區(qū),這些分區(qū)是包含該分區(qū)的數(shù)據(jù)文件的文件夾,與hive的目錄結(jié)構(gòu)非常相似。在每個分區(qū)內(nèi),文件被組織成文件組,文件id為唯一標(biāo)識。每個文件組包含多個切片,其中每個切片包含在某個提交/壓縮即時時間生成的基本列文件(parquet文件),以及自生成基本文件以來對基本文件的插入/更新的一組日志文件(*.log)。Hudi采用MVCC設(shè)計,其中壓縮操作會將日志和基本文件合并成新的文件片,清理操作會將未使用/較舊的文件片刪除來回收DFS上的空間。
Flink 任務(wù)checkpoint 情況:
設(shè)置30s 一次


7.22 Hive shell查詢數(shù)據(jù)update和delete變化情況:
hive> select * from mysqlcdc_sync_hive01_ro;

hive> select * from mysqlcdc_sync_hive01_rt;

7.23 Hudi Client端操作Hudi表
進入Hudi客戶端命令行:
hudi-master/hudi-cli/hudi-cli.sh
連接Hudi表,查看表信息
hudi->connect --path hdfs://nameservice1/tmp/luo/hudi/mysqlcdc_sync_hive01

查看Hudi commit信息
hudi:mysqlcdc_sync_hive01->commits show --sortBy "CommitTime"

hudi:mysqlcdc_sync_hive01->compactions show all

7.24 PrestoDB 查詢Hive表Hudi數(shù)據(jù)
版本說明:PrestoDB 0.256 DBeaver7.0.4
PrestoDB 集群配置和hive集成參考PrestoDB官網(wǎng)
presto-server-***/etc/catalog/hive.properties 配置hive catalog
可通過 presto-cli 連接 hive metastore 開啟查詢,presto-cli 的設(shè)置參考 presto官方配置;
DBeaver客戶端查詢Hive ro表數(shù)據(jù):

Hive ro表count 正常:

查詢Hive rt表數(shù)據(jù)查詢異常:

Hive rt表count異常:

Presto Web ui:


推薦閱讀
超詳細(xì)步驟!整合Apache Hudi + Flink + CDH
Apache Kyuubi + Hudi在 T3 出行的深度實踐
順豐科技 Hudi on Flink 實時數(shù)倉實踐
