Flink+hudi 構(gòu)架滄湖一體化解決方案
簡介
Apache Hudi(發(fā)音為"Hoodie")在DFS的數(shù)據(jù)集上提供以下流原語
插入更新 (如何改變數(shù)據(jù)集?) 增量拉取 (如何獲取變更的數(shù)據(jù)?)
Hudi維護在數(shù)據(jù)集上執(zhí)行的所有操作的時間軸(timeline),以提供數(shù)據(jù)集的即時視圖。Hudi將數(shù)據(jù)集組織到與Hive表非常相似的基本路徑下的目錄結(jié)構(gòu)中。數(shù)據(jù)集分為多個分區(qū),文件夾包含該分區(qū)的文件。每個分區(qū)均由相對于基本路徑的分區(qū)路徑唯一標(biāo)識。
分區(qū)記錄會被分配到多個文件。每個文件都有一個唯一的文件ID和生成該文件的提交(commit)。如果有更新,則多個文件共享相同的文件ID,但寫入時的提交(commit)不同。
存儲類型–處理數(shù)據(jù)的存儲方式
寫時復(fù)制
純列式
創(chuàng)建新版本的文件
讀時合并
近實時
視圖–處理數(shù)據(jù)的讀取方式
讀取優(yōu)化視圖-輸入格式僅選擇壓縮的列式文件
parquet文件查詢性能
500 GB的延遲時間約為30分鐘
導(dǎo)入現(xiàn)有的Hive表
近實時視圖
混合、格式化數(shù)據(jù)
約1-5分鐘的延遲
提供近實時表
增量視圖
數(shù)據(jù)集的變更
啟用增量拉取
Hudi存儲層由三個不同的部分組成
元數(shù)據(jù)–它以時間軸的形式維護了在數(shù)據(jù)集上執(zhí)行的所有操作的元數(shù)據(jù),該時間軸允許將數(shù)據(jù)集的即時視圖存儲在基本路徑的元數(shù)據(jù)目錄下。時間軸上的操作類型包括
提交(commit),一次提交表示將一批記錄原子寫入數(shù)據(jù)集中的過程。單調(diào)遞增的時間戳,提交表示寫操作的開始。
清理(clean),清理數(shù)據(jù)集中不再被查詢中使用的文件的較舊版本。
壓縮(compaction),將行式文件轉(zhuǎn)化為列式文件的動作。
索引,將傳入的記錄鍵快速映射到文件(如果已存在記錄鍵)。索引實現(xiàn)是可插拔的,Bloom過濾器-由于不依賴任何外部系統(tǒng),因此它是默認配置,索引和數(shù)據(jù)始終保持一致。Apache HBase-對少量key更高效。在索引標(biāo)記過程中可能會節(jié)省幾秒鐘。
數(shù)據(jù),Hudi以兩種不同的存儲格式存儲數(shù)據(jù)。實際使用的格式是可插入的,但要求具有以下特征–讀優(yōu)化的列存儲格式(ROFormat),默認值為Apache Parquet;寫優(yōu)化的基于行的存儲格式(WOFormat),默認值為Apache Avro。

為什么Hudi對于大規(guī)模和近實時應(yīng)用很重要?
Hudi解決了以下限制
HDFS的可伸縮性限制
需要在Hadoop中更快地呈現(xiàn)數(shù)據(jù)
沒有直接支持對現(xiàn)有數(shù)據(jù)的更新和刪除
快速的ETL和建模
要檢索所有更新的記錄,無論這些更新是添加到最近日期分區(qū)的新記錄還是對舊數(shù)據(jù)的更新,Hudi都允許用戶使用最后一個檢查點時間戳。此過程不用執(zhí)行掃描整個源表的查詢
Hudi的優(yōu)勢
HDFS中的可伸縮性限制。 Hadoop中數(shù)據(jù)的快速呈現(xiàn) 支持對于現(xiàn)有數(shù)據(jù)的更新和刪除 快速的ETL和建模
新架構(gòu)與湖倉一體
通過湖倉一體、流批一體,準(zhǔn)實時場景下做到了:數(shù)據(jù)同源、同計算引擎、同存儲、同計算口徑。數(shù)據(jù)的時效性可以到分鐘級,能很好的滿足業(yè)務(wù)準(zhǔn)實時數(shù)倉的需求。下面是架構(gòu)圖:

MySQL 數(shù)據(jù)通過 Flink CDC 進入到 Kafka。之所以數(shù)據(jù)先入 Kafka 而不是直接入 Hudi,是為了實現(xiàn)多個實時任務(wù)復(fù)用 MySQL 過來的數(shù)據(jù),避免多個任務(wù)通過 Flink CDC 接 MySQL 表以及 Binlog,對 MySQL 庫的性能造成影響。
通過 CDC 進入到 Kafka 的數(shù)據(jù)除了落一份到離線數(shù)據(jù)倉庫的 ODS 層之外,會同時按照實時數(shù)據(jù)倉庫的鏈路,從 ODS->DWD->DWS->OLAP 數(shù)據(jù)庫,最后供報表等數(shù)據(jù)服務(wù)使用。實時數(shù)倉的每一層結(jié)果數(shù)據(jù)會準(zhǔn)實時的落一份到離線數(shù)倉,通過這種方式做到程序一次開發(fā)、指標(biāo)口徑統(tǒng)一,數(shù)據(jù)統(tǒng)一。
從架構(gòu)圖上,可以看到有一步數(shù)據(jù)修正 (重跑歷史數(shù)據(jù)) 的動作,之所以有這一步是考慮到:有可能存在由于口徑調(diào)整或者前一天的實時任務(wù)計算結(jié)果錯誤,導(dǎo)致重跑歷史數(shù)據(jù)的情況。
而存儲在 Kafka 的數(shù)據(jù)有失效時間,不會存太久的歷史數(shù)據(jù),重跑很久的歷史數(shù)據(jù)無法從 Kafka 中獲取歷史源數(shù)據(jù)。再者,如果把大量的歷史數(shù)據(jù)再一次推到 Kafka,走實時計算的鏈路來修正歷史數(shù)據(jù),可能會影響當(dāng)天的實時作業(yè)。所以針對重跑歷史數(shù)據(jù),會通過數(shù)據(jù)修正這一步來處理。
總體上說,這個架構(gòu)屬于 Lambda 和 Kappa 混搭的架構(gòu)。流批一體數(shù)據(jù)倉庫的各個數(shù)據(jù)鏈路有數(shù)據(jù)質(zhì)量校驗的流程。第二天對前一天的數(shù)據(jù)進行對賬,如果前一天實時計算的數(shù)據(jù)無異常,則不需要修正數(shù)據(jù),Kappa 架構(gòu)已經(jīng)足夠。
最佳實踐
版本搭配
版本選擇,這個問題可能會成為困擾大家的第一個絆腳石,下面是hudi中文社區(qū)推薦的版本適配:
| flink | hudi |
|---|---|
| 1.12.2 | 0.9.0 |
| 1.13.1 | 0.10.0 |
建議用 hudi master +flink 1.13 這樣可以和 cdc connector 更好地適配。
下載hudi
https://mvnrepository.com/artifact/org.apache.hudi/hudi-flink-bundle
目前maven中央倉庫,最新版本是0.9.0 ,如果需要下載0.10.0版本 , 可以加入社區(qū)群,在共享文件中下載,也可以下載源碼自行編譯。
執(zhí)行
如果將?hudi-flink-bundle_2.11-0.10.0.jar?放到了?flink/lib?下,則只需要如下執(zhí)行即可,否則會出現(xiàn)各種找不到類的異常
bin/sql-client.sh embedded
Flink on hudi
新建maven工程,修改pom如下
<project?xmlns="http://maven.apache.org/POM/4.0.0"
?????????xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
?????????xsi:schemaLocation="http://maven.apache.org/POM/4.0.0?http://maven.apache.org/xsd/maven-4.0.0.xsd">
????<modelVersion>4.0.0modelVersion>
????<groupId>org.examplegroupId>
????<artifactId>flink_hudi_testartifactId>
????<version>1.0-SNAPSHOTversion>
????<properties>
????????<maven.compiler.source>8maven.compiler.source>
????????<maven.compiler.target>8maven.compiler.target>
????????<flink.version>1.13.1flink.version>
????????<hudi.version>0.10.0hudi.version>
????????<hadoop.version>2.10.1hadoop.version>
????properties>
????<dependencies>
????????<dependency>
????????????<groupId>org.apache.hadoopgroupId>
????????????<artifactId>hadoop-clientartifactId>
????????????<version>${hadoop.version}version>
????????dependency>
????????<dependency>
????????????<groupId>org.apache.hadoopgroupId>
????????????<artifactId>hadoop-hdfsartifactId>
????????????<version>${hadoop.version}version>
????????dependency>
????????<dependency>
????????????<groupId>org.apache.hadoopgroupId>
????????????<artifactId>hadoop-commonartifactId>
????????????<version>${hadoop.version}version>
????????dependency>
????????<dependency>
????????????<groupId>org.apache.flinkgroupId>
????????????<artifactId>flink-coreartifactId>
????????????<version>${flink.version}version>
????????dependency>
????????<dependency>
????????????<groupId>org.apache.flinkgroupId>
????????????<artifactId>flink-streaming-java_2.11artifactId>
????????????<version>${flink.version}version>
????????dependency>
????????<dependency>
????????????<groupId>org.apache.flinkgroupId>
????????????<artifactId>flink-connector-jdbc_2.11artifactId>
????????????<version>${flink.version}version>
????????dependency>
????????<dependency>
????????????<groupId>org.apache.flinkgroupId>
????????????<artifactId>flink-javaartifactId>
????????????<version>${flink.version}version>
????????dependency>
????????<dependency>
????????????<groupId>org.apache.flinkgroupId>
????????????<artifactId>flink-clients_2.11artifactId>
????????????<version>${flink.version}version>
????????dependency>
????????<dependency>
????????????<groupId>org.apache.flinkgroupId>
????????????<artifactId>flink-table-api-java-bridge_2.11artifactId>
????????????<version>${flink.version}version>
????????dependency>
????????<dependency>
????????????<groupId>org.apache.flinkgroupId>
????????????<artifactId>flink-table-commonartifactId>
????????????<version>${flink.version}version>
????????dependency>
????????<dependency>
????????????<groupId>org.apache.flinkgroupId>
????????????<artifactId>flink-table-planner_2.11artifactId>
????????????<version>${flink.version}version>
????????dependency>
????????<dependency>
????????????<groupId>org.apache.flinkgroupId>
????????????<artifactId>flink-table-planner-blink_2.11artifactId>
????????????<version>${flink.version}version>
????????dependency>
????????<dependency>
????????????<groupId>org.apache.flinkgroupId>
????????????<artifactId>flink-table-planner-blink_2.11artifactId>
????????????<version>${flink.version}version>
????????????<type>test-jartype>
????????dependency>
????????<dependency>
????????????<groupId>com.ververicagroupId>
????????????<artifactId>flink-connector-mysql-cdcartifactId>
????????????<version>2.0.0version>
????????dependency>
????????<dependency>
????????????<groupId>org.apache.hudigroupId>
????????????<artifactId>hudi-flink-bundle_2.11artifactId>
????????????<version>${hudi.version}version>
????????????<scope>systemscope>
????????????<systemPath>${project.basedir}/libs/hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jarsystemPath>
????????dependency>
????????<dependency>
????????????<groupId>mysqlgroupId>
????????????<artifactId>mysql-connector-javaartifactId>
????????????<version>5.1.49version>
????????dependency>
????dependencies>
project>
我們通過構(gòu)建查詢insert into t2 select replace(uuid(),'-',''),id,name,description,now() from mysql_binlog?將創(chuàng)建的mysql表,插入到hudi里。
package?name.lijiaqi;
import?org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import?org.apache.flink.table.api.EnvironmentSettings;
import?org.apache.flink.table.api.SqlDialect;
import?org.apache.flink.table.api.TableResult;
import?org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public?class?MysqlToHudiExample?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????EnvironmentSettings?fsSettings?=?EnvironmentSettings.newInstance()
????????????????.useBlinkPlanner()
????????????????.inStreamingMode()
????????????????.build();
????????StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();
????????env.setParallelism(1);
????????StreamTableEnvironment?tableEnv?=?StreamTableEnvironment.create(env,?fsSettings);
????????tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
????????//?數(shù)據(jù)源表
????????String?sourceDDL?=
????????????????"CREATE?TABLE?mysql_binlog?(\n"?+
????????????????????????"?id?INT?NOT?NULL,\n"?+
????????????????????????"?name?STRING,\n"?+
????????????????????????"?description?STRING\n"?+
????????????????????????")?WITH?(\n"?+
????????????????????????"?'connector'?=?'jdbc',\n"?+
????????????????????????"?'url'?=?'jdbc:mysql://127.0.0.1:3306/test',?\n"+
????????????????????????"?'driver'?=?'com.mysql.jdbc.Driver',?\n"+
????????????????????????"?'username'?=?'root',\n"?+
????????????????????????"?'password'?=?'dafei1288',?\n"?+
????????????????????????"?'table-name'?=?'test_cdc'\n"?+
????????????????????????")";
????????//?輸出目標(biāo)表
????????String?sinkDDL?=
????????????????"CREATE?TABLE?t2(\n"?+
????????????????????????"\tuuid?VARCHAR(20),\n"+
????????????????????????"\tid?INT?NOT?NULL,\n"?+
????????????????????????"\tname?VARCHAR(40),\n"?+
????????????????????????"\tdescription?VARCHAR(40),\n"?+
????????????????????????"\tts?TIMESTAMP(3)\n"+
//????????????????????????"\t`partition`?VARCHAR(20)\n"?+
????????????????????????")\n"?+
//????????????????????????"PARTITIONED?BY?(`partition`)\n"?+
????????????????????????"WITH?(\n"?+
????????????????????????"\t'connector'?=?'hudi',\n"?+
????????????????????????"\t'path'?=?'hdfs://172.19.28.4:9000/hudi_t4/',\n"?+
????????????????????????"\t'table.type'?=?'MERGE_ON_READ'\n"?+
????????????????????????")"?;
????????//?簡單的聚合處理
????????String?transformSQL?=
????????????????"insert?into?t2?select?replace(uuid(),'-',''),id,name,description,now()??from?mysql_binlog";
????????tableEnv.executeSql(sourceDDL);
????????tableEnv.executeSql(sinkDDL);
????????TableResult?result?=?tableEnv.executeSql(transformSQL);
????????result.print();
????????env.execute("mysql-to-hudi");
????}
}
查詢hudi
package?name.lijiaqi;
import?org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import?org.apache.flink.table.api.EnvironmentSettings;
import?org.apache.flink.table.api.SqlDialect;
import?org.apache.flink.table.api.TableResult;
import?org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public?class?ReadHudi?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????EnvironmentSettings?fsSettings?=?EnvironmentSettings.newInstance()
????????????????.useBlinkPlanner()
????????????????.inStreamingMode()
????????????????.build();
????????StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();
????????env.setParallelism(1);
????????StreamTableEnvironment?tableEnv?=?StreamTableEnvironment.create(env,?fsSettings);
????????tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
????????String?sourceDDL?=
????????????????"CREATE?TABLE?t2(\n"?+
????????????????????????"\tuuid?VARCHAR(20),\n"+
????????????????????????"\tid?INT?NOT?NULL,\n"?+
????????????????????????"\tname?VARCHAR(40),\n"?+
????????????????????????"\tdescription?VARCHAR(40),\n"?+
????????????????????????"\tts?TIMESTAMP(3)\n"+
//????????????????????????"\t`partition`?VARCHAR(20)\n"?+
????????????????????????")\n"?+
//????????????????????????"PARTITIONED?BY?(`partition`)\n"?+
????????????????????????"WITH?(\n"?+
????????????????????????"\t'connector'?=?'hudi',\n"?+
????????????????????????"\t'path'?=?'hdfs://172.19.28.4:9000/hudi_t4/',\n"?+
????????????????????????"\t'table.type'?=?'MERGE_ON_READ'\n"?+
????????????????????????")"?;
????????tableEnv.executeSql(sourceDDL);
????????TableResult?result2?=?tableEnv.executeSql("select?*?from?t2");
????????result2.print();
????????env.execute("read_hudi");
????}
}
展示結(jié)果

上一章節(jié),我們使用代碼形式構(gòu)建實驗,在本章節(jié)里,我們直接使用官網(wǎng)下載的flink包來構(gòu)建實驗環(huán)境。
添加依賴
添加如下依賴到$FLINK_HOME/lib下
hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar (修改 Master 分支的 Hudi Flink 版本為 1.13.2 然后構(gòu)建) hadoop-mapreduce-client-core-2.7.3.jar (解決 Hudi ClassNotFoundException) flink-sql-connector-mysql-cdc-2.0.0.jar flink-format-changelog-json-2.0.0.jar flink-sql-connector-kafka_2.11-1.13.2.jar
注意,在尋找jar的時候,cdc 2.0?更新過group id?,不再試?com.alibaba.ververica?而是改成了?com.ververica?

flink sql cdc on hudi
創(chuàng)建mysql cdc表
CREATE??TABLE?mysql_users?(
?id?BIGINT?PRIMARY?KEY?NOT?ENFORCED?,
?name?STRING,
?birthday?TIMESTAMP(3),
?ts?TIMESTAMP(3)
)?WITH?(
?'connector'?=?'mysql-cdc',
?'hostname'?=?'localhost',
?'port'?=?'3306',
?'username'?=?'root',
?'password'?=?'dafei1288',
?'server-time-zone'?=?'Asia/Shanghai',
?'database-name'?=?'test',
?'table-name'?=?'users'???
);
創(chuàng)建hudi表
CREATE?TABLE?hudi_users5(
?id?BIGINT?PRIMARY?KEY?NOT?ENFORCED,
????name?STRING,
????birthday?TIMESTAMP(3),
????ts?TIMESTAMP(3),
????`partition`?VARCHAR(20)
)?PARTITIONED?BY?(`partition`)?WITH?(
????'connector'?=?'hudi',
????'table.type'?=?'MERGE_ON_READ',
????'path'?=?'hdfs://localhost:9009/hudi/hudi_users5'
);
修改配置,讓查詢模式輸出為表,設(shè)置checkpoint
set execution.result-mode=tableau;
set execution.checkpointing.interval=10sec;
進行輸入導(dǎo)入
INSERT INTO hudi_users5(id,name,birthday,ts,?
partition) SELECT id,name,birthday,ts,DATE_FORMAT(birthday, 'yyyyMMdd') FROM mysql_users;
查詢數(shù)據(jù)
select * from hudi_users5;
執(zhí)行結(jié)果

卡執(zhí)行計劃

這個問題坑了我好幾天,一度都打算放棄hudi了,表面上很正常,日志也沒有任何報錯,也可以看出來cdc起作用了,有數(shù)據(jù)寫入,但是就是卡在?hoodie_stream_write?上一動不動了,沒有數(shù)據(jù)下發(fā)了。感謝社區(qū)大佬?Danny Chan?的提點,可能是 checkpoint的問題,于是做了設(shè)置
set execution.checkpointing.interval=10sec;
終于正常了

致此,F(xiàn)link + Hudi 倉湖一體化方案的原型構(gòu)建完成,感謝大家看到這里。
Hi,我是王知無,一個大數(shù)據(jù)領(lǐng)域的原創(chuàng)作者。?
放心關(guān)注我,獲取更多行業(yè)的一手消息。

八千里路云和月 | 從零到大數(shù)據(jù)專家學(xué)習(xí)路徑指南
我在B站讀大學(xué),大數(shù)據(jù)專業(yè)
我們在學(xué)習(xí)Flink的時候,到底在學(xué)習(xí)什么?
193篇文章暴揍Flink,這個合集你需要關(guān)注一下
Flink生產(chǎn)環(huán)境TOP難題與優(yōu)化,阿里巴巴藏經(jīng)閣YYDS
Flink CDC我吃定了耶穌也留不住他!| Flink CDC線上問題小盤點
我們在學(xué)習(xí)Spark的時候,到底在學(xué)習(xí)什么?
硬剛Hive | 4萬字基礎(chǔ)調(diào)優(yōu)面試小總結(jié)
4萬字長文 | ClickHouse基礎(chǔ)&實踐&調(diào)優(yōu)全視角解析
【面試&個人成長】2021年過半,社招和校招的經(jīng)驗之談
大數(shù)據(jù)方向另一個十年開啟 |《硬剛系列》第一版完結(jié)
