Flink+hudi 構(gòu)架滄湖一體化解決方案
本文是對(duì)前文《如何用Flink整合hudi,構(gòu)架滄湖一體化解決方案》的一些問(wèn)題更正,未添加新內(nèi)容,請(qǐng)讀者酌情閱讀。
更新說(shuō)明:
修正章節(jié) 《版本搭配》中的使用建議 修改章節(jié) 《Flink on hudi》 內(nèi)的配置依賴(lài)描述,現(xiàn)在使用Flink cdc 2.0 對(duì)章節(jié)《添加依賴(lài)》添加配圖 更新文章排版 最后,感謝社區(qū)大佬 玉兆 同學(xué)的細(xì)心指導(dǎo)
在《如何利用 Flink CDC 實(shí)現(xiàn)數(shù)據(jù)增量備份到 Clickhouse》里,我們介紹了如何cdc到ck,今天我們依舊使用前文的案例,來(lái)sink到hudi,那么我們開(kāi)始吧。
hudi
簡(jiǎn)介
Apache Hudi(發(fā)音為“Hoodie”)在DFS的數(shù)據(jù)集上提供以下流原語(yǔ)
插入更新 (如何改變數(shù)據(jù)集?) 增量拉取 (如何獲取變更的數(shù)據(jù)?)
Hudi維護(hù)在數(shù)據(jù)集上執(zhí)行的所有操作的時(shí)間軸(timeline),以提供數(shù)據(jù)集的即時(shí)視圖。Hudi將數(shù)據(jù)集組織到與Hive表非常相似的基本路徑下的目錄結(jié)構(gòu)中。數(shù)據(jù)集分為多個(gè)分區(qū),文件夾包含該分區(qū)的文件。每個(gè)分區(qū)均由相對(duì)于基本路徑的分區(qū)路徑唯一標(biāo)識(shí)。
分區(qū)記錄會(huì)被分配到多個(gè)文件。每個(gè)文件都有一個(gè)唯一的文件ID和生成該文件的提交(commit)。如果有更新,則多個(gè)文件共享相同的文件ID,但寫(xiě)入時(shí)的提交(commit)不同。
存儲(chǔ)類(lèi)型–處理數(shù)據(jù)的存儲(chǔ)方式
寫(xiě)時(shí)復(fù)制
純列式
創(chuàng)建新版本的文件
讀時(shí)合并
近實(shí)時(shí)
視圖–處理數(shù)據(jù)的讀取方式
讀取優(yōu)化視圖-輸入格式僅選擇壓縮的列式文件
parquet文件查詢(xún)性能
500 GB的延遲時(shí)間約為30分鐘
導(dǎo)入現(xiàn)有的Hive表
近實(shí)時(shí)視圖
混合、格式化數(shù)據(jù)
約1-5分鐘的延遲
提供近實(shí)時(shí)表
增量視圖
數(shù)據(jù)集的變更
啟用增量拉取
Hudi存儲(chǔ)層由三個(gè)不同的部分組成
元數(shù)據(jù)–它以時(shí)間軸的形式維護(hù)了在數(shù)據(jù)集上執(zhí)行的所有操作的元數(shù)據(jù),該時(shí)間軸允許將數(shù)據(jù)集的即時(shí)視圖存儲(chǔ)在基本路徑的元數(shù)據(jù)目錄下。時(shí)間軸上的操作類(lèi)型包括
提交(commit),一次提交表示將一批記錄原子寫(xiě)入數(shù)據(jù)集中的過(guò)程。單調(diào)遞增的時(shí)間戳,提交表示寫(xiě)操作的開(kāi)始。
清理(clean),清理數(shù)據(jù)集中不再被查詢(xún)中使用的文件的較舊版本。
壓縮(compaction),將行式文件轉(zhuǎn)化為列式文件的動(dòng)作。
索引,將傳入的記錄鍵快速映射到文件(如果已存在記錄鍵)。索引實(shí)現(xiàn)是可插拔的,Bloom過(guò)濾器-由于不依賴(lài)任何外部系統(tǒng),因此它是默認(rèn)配置,索引和數(shù)據(jù)始終保持一致。Apache HBase-對(duì)少量key更高效。在索引標(biāo)記過(guò)程中可能會(huì)節(jié)省幾秒鐘。
數(shù)據(jù),Hudi以?xún)煞N不同的存儲(chǔ)格式存儲(chǔ)數(shù)據(jù)。實(shí)際使用的格式是可插入的,但要求具有以下特征–讀優(yōu)化的列存儲(chǔ)格式(ROFormat),默認(rèn)值為Apache Parquet;寫(xiě)優(yōu)化的基于行的存儲(chǔ)格式(WOFormat),默認(rèn)值為Apache Avro。

為什么Hudi對(duì)于大規(guī)模和近實(shí)時(shí)應(yīng)用很重要?
Hudi解決了以下限制
HDFS的可伸縮性限制
需要在Hadoop中更快地呈現(xiàn)數(shù)據(jù)
沒(méi)有直接支持對(duì)現(xiàn)有數(shù)據(jù)的更新和刪除
快速的ETL和建模
要檢索所有更新的記錄,無(wú)論這些更新是添加到最近日期分區(qū)的新記錄還是對(duì)舊數(shù)據(jù)的更新,Hudi都允許用戶(hù)使用最后一個(gè)檢查點(diǎn)時(shí)間戳。此過(guò)程不用執(zhí)行掃描整個(gè)源表的查詢(xún)
Hudi的優(yōu)勢(shì)
HDFS中的可伸縮性限制。 Hadoop中數(shù)據(jù)的快速呈現(xiàn) 支持對(duì)于現(xiàn)有數(shù)據(jù)的更新和刪除 快速的ETL和建模
(以上內(nèi)容主要引用于:《Apache Hudi 詳解》)
新架構(gòu)與湖倉(cāng)一體
通過(guò)湖倉(cāng)一體、流批一體,準(zhǔn)實(shí)時(shí)場(chǎng)景下做到了:數(shù)據(jù)同源、同計(jì)算引擎、同存儲(chǔ)、同計(jì)算口徑。數(shù)據(jù)的時(shí)效性可以到分鐘級(jí),能很好的滿(mǎn)足業(yè)務(wù)準(zhǔn)實(shí)時(shí)數(shù)倉(cāng)的需求。下面是架構(gòu)圖:

MySQL 數(shù)據(jù)通過(guò) Flink CDC 進(jìn)入到 Kafka。之所以數(shù)據(jù)先入 Kafka 而不是直接入 Hudi,是為了實(shí)現(xiàn)多個(gè)實(shí)時(shí)任務(wù)復(fù)用 MySQL 過(guò)來(lái)的數(shù)據(jù),避免多個(gè)任務(wù)通過(guò) Flink CDC 接 MySQL 表以及 Binlog,對(duì) MySQL 庫(kù)的性能造成影響。
通過(guò) CDC 進(jìn)入到 Kafka 的數(shù)據(jù)除了落一份到離線(xiàn)數(shù)據(jù)倉(cāng)庫(kù)的 ODS 層之外,會(huì)同時(shí)按照實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù)的鏈路,從 ODS->DWD->DWS->OLAP 數(shù)據(jù)庫(kù),最后供報(bào)表等數(shù)據(jù)服務(wù)使用。實(shí)時(shí)數(shù)倉(cāng)的每一層結(jié)果數(shù)據(jù)會(huì)準(zhǔn)實(shí)時(shí)的落一份到離線(xiàn)數(shù)倉(cāng),通過(guò)這種方式做到程序一次開(kāi)發(fā)、指標(biāo)口徑統(tǒng)一,數(shù)據(jù)統(tǒng)一。
從架構(gòu)圖上,可以看到有一步數(shù)據(jù)修正 (重跑歷史數(shù)據(jù)) 的動(dòng)作,之所以有這一步是考慮到:有可能存在由于口徑調(diào)整或者前一天的實(shí)時(shí)任務(wù)計(jì)算結(jié)果錯(cuò)誤,導(dǎo)致重跑歷史數(shù)據(jù)的情況。
而存儲(chǔ)在 Kafka 的數(shù)據(jù)有失效時(shí)間,不會(huì)存太久的歷史數(shù)據(jù),重跑很久的歷史數(shù)據(jù)無(wú)法從 Kafka 中獲取歷史源數(shù)據(jù)。再者,如果把大量的歷史數(shù)據(jù)再一次推到 Kafka,走實(shí)時(shí)計(jì)算的鏈路來(lái)修正歷史數(shù)據(jù),可能會(huì)影響當(dāng)天的實(shí)時(shí)作業(yè)。所以針對(duì)重跑歷史數(shù)據(jù),會(huì)通過(guò)數(shù)據(jù)修正這一步來(lái)處理。
總體上說(shuō),這個(gè)架構(gòu)屬于 Lambda 和 Kappa 混搭的架構(gòu)。流批一體數(shù)據(jù)倉(cāng)庫(kù)的各個(gè)數(shù)據(jù)鏈路有數(shù)據(jù)質(zhì)量校驗(yàn)的流程。第二天對(duì)前一天的數(shù)據(jù)進(jìn)行對(duì)賬,如果前一天實(shí)時(shí)計(jì)算的數(shù)據(jù)無(wú)異常,則不需要修正數(shù)據(jù),Kappa 架構(gòu)已經(jīng)足夠。
(本節(jié)內(nèi)容,引用自:《37 手游基于 Flink CDC + Hudi 湖倉(cāng)一體方案實(shí)踐》)
最佳實(shí)踐
版本搭配
版本選擇,這個(gè)問(wèn)題可能會(huì)成為困擾大家的第一個(gè)絆腳石,下面是hudi中文社區(qū)推薦的版本適配:
| flink | hudi |
|---|---|
| 1.12.2 | 0.9.0 |
| 1.13.1 | 0.10.0 |
建議用 hudi master +flink 1.13 這樣可以和 cdc connector 更好地適配。
下載hudi
https://mvnrepository.com/artifact/org.apache.hudi/hudi-flink-bundle
目前maven中央倉(cāng)庫(kù),最新版本是0.9.0 ,如果需要下載0.10.0版本 , 可以加入社區(qū)群,在共享文件中下載,也可以下載源碼自行編譯。
執(zhí)行
如果將 hudi-flink-bundle_2.11-0.10.0.jar 放到了 flink/lib 下,則只需要如下執(zhí)行即可,否則會(huì)出現(xiàn)各種找不到類(lèi)的異常
bin/sql-client.sh embedded
Flink on hudi
新建maven工程,修改pom如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>flink_hudi_test</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.13.1</flink.version>
<hudi.version>0.10.0</hudi.version>
<hadoop.version>2.10.1</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-flink-bundle_2.11</artifactId>
<version>${hudi.version}</version>
<scope>system</scope>
<systemPath>${project.basedir}/libs/hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar</systemPath>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
</dependencies>
</project>
我們通過(guò)構(gòu)建查詢(xún)insert into t2 select replace(uuid(),'-',''),id,name,description,now() from mysql_binlog 將創(chuàng)建的mysql表,插入到hudi里。
package name.lijiaqi;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class MysqlToHudiExample {
public static void main(String[] args) throws Exception {
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
// 數(shù)據(jù)源表
String sourceDDL =
"CREATE TABLE mysql_binlog (\n" +
" id INT NOT NULL,\n" +
" name STRING,\n" +
" description STRING\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://127.0.0.1:3306/test', \n"+
" 'driver' = 'com.mysql.jdbc.Driver', \n"+
" 'username' = 'root',\n" +
" 'password' = 'dafei1288', \n" +
" 'table-name' = 'test_cdc'\n" +
")";
// 輸出目標(biāo)表
String sinkDDL =
"CREATE TABLE t2(\n" +
"\tuuid VARCHAR(20),\n"+
"\tid INT NOT NULL,\n" +
"\tname VARCHAR(40),\n" +
"\tdescription VARCHAR(40),\n" +
"\tts TIMESTAMP(3)\n"+
// "\t`partition` VARCHAR(20)\n" +
")\n" +
// "PARTITIONED BY (`partition`)\n" +
"WITH (\n" +
"\t'connector' = 'hudi',\n" +
"\t'path' = 'hdfs://172.19.28.4:9000/hudi_t4/',\n" +
"\t'table.type' = 'MERGE_ON_READ'\n" +
")" ;
// 簡(jiǎn)單的聚合處理
String transformSQL =
"insert into t2 select replace(uuid(),'-',''),id,name,description,now() from mysql_binlog";
tableEnv.executeSql(sourceDDL);
tableEnv.executeSql(sinkDDL);
TableResult result = tableEnv.executeSql(transformSQL);
result.print();
env.execute("mysql-to-hudi");
}
}
查詢(xún)hudi
package name.lijiaqi;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class ReadHudi {
public static void main(String[] args) throws Exception {
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
String sourceDDL =
"CREATE TABLE t2(\n" +
"\tuuid VARCHAR(20),\n"+
"\tid INT NOT NULL,\n" +
"\tname VARCHAR(40),\n" +
"\tdescription VARCHAR(40),\n" +
"\tts TIMESTAMP(3)\n"+
// "\t`partition` VARCHAR(20)\n" +
")\n" +
// "PARTITIONED BY (`partition`)\n" +
"WITH (\n" +
"\t'connector' = 'hudi',\n" +
"\t'path' = 'hdfs://172.19.28.4:9000/hudi_t4/',\n" +
"\t'table.type' = 'MERGE_ON_READ'\n" +
")" ;
tableEnv.executeSql(sourceDDL);
TableResult result2 = tableEnv.executeSql("select * from t2");
result2.print();
env.execute("read_hudi");
}
}
展示結(jié)果

Flink CDC 2.0 on Hudi
上一章節(jié),我們使用代碼形式構(gòu)建實(shí)驗(yàn),在本章節(jié)里,我們直接使用官網(wǎng)下載的flink包來(lái)構(gòu)建實(shí)驗(yàn)環(huán)境。
添加依賴(lài)
添加如下依賴(lài)到$FLINK_HOME/lib下
hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar (修改 Master 分支的 Hudi Flink 版本為 1.13.2 然后構(gòu)建) hadoop-mapreduce-client-core-2.7.3.jar (解決 Hudi ClassNotFoundException) flink-sql-connector-mysql-cdc-2.0.0.jar flink-format-changelog-json-2.0.0.jar flink-sql-connector-kafka_2.11-1.13.2.jar
注意,在尋找jar的時(shí)候,cdc 2.0 更新過(guò)group id ,不再試 com.alibaba.ververica 而是改成了 com.ververica

flink sql cdc on hudi
創(chuàng)建mysql cdc表
CREATE TABLE mysql_users (
id BIGINT PRIMARY KEY NOT ENFORCED ,
name STRING,
birthday TIMESTAMP(3),
ts TIMESTAMP(3)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = 'dafei1288',
'server-time-zone' = 'Asia/Shanghai',
'database-name' = 'test',
'table-name' = 'users'
);
創(chuàng)建hudi表
CREATE TABLE hudi_users5(
id BIGINT PRIMARY KEY NOT ENFORCED,
name STRING,
birthday TIMESTAMP(3),
ts TIMESTAMP(3),
`partition` VARCHAR(20)
) PARTITIONED BY (`partition`) WITH (
'connector' = 'hudi',
'table.type' = 'MERGE_ON_READ',
'path' = 'hdfs://localhost:9009/hudi/hudi_users5'
);
修改配置,讓查詢(xún)模式輸出為表,設(shè)置checkpoint
set execution.result-mode=tableau;
set execution.checkpointing.interval=10sec;
進(jìn)行輸入導(dǎo)入
INSERT INTO hudi_users5(id,name,birthday,ts,
partition) SELECT id,name,birthday,ts,DATE_FORMAT(birthday, 'yyyyMMdd') FROM mysql_users;
查詢(xún)數(shù)據(jù)
select * from hudi_users5;
執(zhí)行結(jié)果

卡執(zhí)行計(jì)劃

這個(gè)問(wèn)題坑了我好幾天,一度都打算放棄hudi了,表面上很正常,日志也沒(méi)有任何報(bào)錯(cuò),也可以看出來(lái)cdc起作用了,有數(shù)據(jù)寫(xiě)入,但是就是卡在 hoodie_stream_write 上一動(dòng)不動(dòng)了,沒(méi)有數(shù)據(jù)下發(fā)了。感謝社區(qū)大佬 Danny Chan 的提點(diǎn),可能是 checkpoint的問(wèn)題,于是做了設(shè)置
set execution.checkpointing.interval=10sec;
終于正常了

致此,flink + hudi 倉(cāng)湖一體化方案的原型構(gòu)建完成,感謝大家看到這里,如果對(duì)你有點(diǎn)點(diǎn)幫助的話(huà),希望點(diǎn)個(gè)關(guān)注,轉(zhuǎn)發(fā)。您的舉手之勞,會(huì)對(duì)我非常有幫助,非常感謝。
參考鏈接
https://blog.csdn.net/weixin_49218925/article/details/115511022
https://blog.csdn.net/qq_37095882/article/details/103714548
https://mp.weixin.qq.com/s/xoucbJxzO2Zkq_b2_WDUbA
