Flink CDC 2.2.0同步Mysql數(shù)據(jù)到Hudi數(shù)據(jù)湖表實(shí)踐
目錄
- 介紹
- Deserialization序列化和反序列化
-
添加Flink CDC依賴(lài)
3.1 sql-client
3.2 Java/Scala API -
使用SQL方式同步Mysql數(shù)據(jù)到Hudi數(shù)據(jù)湖
4.1 Mysql表結(jié)構(gòu)和數(shù)據(jù)
4.2 Flink開(kāi)啟checkpoint
4.3 在Flink中創(chuàng)建Mysql的映射表
4.4 在Flink中創(chuàng)建Hudi Sink的映射表
4.5 流式寫(xiě)入Hudi
1. 介紹
Flink CDC底層是使用Debezium來(lái)進(jìn)行data changes的capture
特色:
-
snapshot能并行讀取。根據(jù)表定義的primary key中的第一列劃分成chunk。如果表沒(méi)有primary key,需要通過(guò)參數(shù)
scan.incremental.snapshot.enabled關(guān)閉snapshot增量讀取 - snapshot讀取時(shí)的checkpoint粒度為chunk
- snapshot讀取不需要global read lock(FLUSH TABLES WITH READ LOCK)
- reader讀取snapshot和binlog的一致性過(guò)程:
- 標(biāo)記當(dāng)前的binlog position為low
- 多個(gè)reader讀取各自的chunk
- 標(biāo)記當(dāng)前的binlog position為high
- 一個(gè)reader讀取low ~ high之間的binlog
- 一個(gè)reader讀取high之后的binlog
2. Deserialization序列化和反序列化
下面用json格式,展示了change event
{
??"before":?{
????"id":?111,
????"name":?"scooter",
????"description":?"Big?2-wheel?scooter",
????"weight":?5.18
??},
??"after":?{
????"id":?111,
????"name":?"scooter",
????"description":?"Big?2-wheel?scooter",
????"weight":?5.15
??},
??"source":?{...},
??"op":?"u",??//?operation?type,?u表示這是一個(gè)update?event?
??"ts_ms":?1589362330904,??//?connector處理event的時(shí)間
??"transaction":?null
}
在DataStrea API中,用戶可以使用Constructor:JsonDebeziumDeserializationSchema(true),在message中包含schema。但是不推薦使用
JsonDebeziumDeserializationSchema也可以接收J(rèn)sonConverter的自定義配置。如下示例在output中包含小數(shù)的數(shù)據(jù)
Map<String,?Object>?customConverterConfigs?=?new?HashMap<>();
?customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG,?"numeric");
?JsonDebeziumDeserializationSchema?schema?=?
??????new?JsonDebeziumDeserializationSchema(true,?customConverterConfigs);
3. 添加Flink CDC依賴(lài)
3.1 sql-client
集成步驟如下:
- 從github flink cdc下載flink-sql-connector-mysql-cdc-2.2.0.jar包
- 將jar包放到Flink集群所有服務(wù)器的lib目錄下
- 重啟Flink集群
- 啟動(dòng)sql-client.sh
3.2 Java/Scala API
添加如下依賴(lài)到pom.xml中
<dependency>
????<groupId>com.ververica</groupId>
????<artifactId>flink-connector-mysql-cdc</artifactId>
????<version>2.2.0</version>
</dependency>
4. 使用SQL方式同步Mysql數(shù)據(jù)到Hudi數(shù)據(jù)湖
4.1 Mysql表結(jié)構(gòu)和數(shù)據(jù)
建表語(yǔ)句如下:
CREATE?TABLE?`info_message`?(
??`id`?bigint(20)?NOT?NULL?AUTO_INCREMENT?COMMENT?'主鍵',
??`msg_title`?varchar(100)?DEFAULT?NULL?COMMENT?'消息名稱(chēng)',
??`msg_ctx`?varchar(2048)?DEFAULT?NULL?COMMENT?'消息內(nèi)容',
??`msg_time`?datetime?DEFAULT?NULL?COMMENT?'消息發(fā)送時(shí)間',
??PRIMARY?KEY?(`id`)
)
部分?jǐn)?shù)據(jù)內(nèi)容如下:
mysql>?
mysql>?select?*?from?d_general.info_message?limit?3;
+--------------------+-----------+-------------------------------------------------------+---------------------+
|?id?????????????????|?msg_title?|?msg_ctx???????????????????????????????????????????????|?msg_time????????????|
+--------------------+-----------+-------------------------------------------------------+---------------------+
|?????????1??????????|???title1??|?????????????????????????content1??????????????????????|?2019-03-29?15:27:21?|
|?????????2??????????|???title2??|?????????????????????????content2??????????????????????|?2019-03-29?15:38:36?|
|?????????3??????????|???title3??|?????????????????????????content3??????????????????????|?2019-03-29?15:38:36?|
+--------------------+-----------+-------------------------------------------------------+---------------------+
3?rows?in?set?(0.00?sec)
mysql>
4.2 Flink開(kāi)啟checkpoint
- Checkpoint默認(rèn)是不開(kāi)啟的,開(kāi)啟Checkpoint讓Hudi可以提交事務(wù)
- 并且mysql-cdc在binlog讀取階段開(kāi)始前,需要等待一個(gè)完整的checkpoint來(lái)避免binlog記錄亂序的情況
- binlog讀取的并行度為1,checkpoint的粒度為數(shù)據(jù)行級(jí)別
- 可以在任務(wù)失敗的情況下,達(dá)到Exactly-once語(yǔ)義
Flink?SQL>?set?'execution.checkpointing.interval'?=?'10s';
[INFO]?Session?property?has?been?set.
Flink?SQL>
4.3 在Flink中創(chuàng)建Mysql的映射表
Flink?SQL>?create?table?mysql_source(
>?database_name?string?metadata?from?'database_name'?virtual,
>?table_name?string?metadata?from?'table_name'?virtual,
>?id?decimal(20,0)?not?null,
>?msg_title?string,
>?msg_ctx?string,
>?msg_time?timestamp(9),
>?primary?key?(id)?not?enforced
>?)?with?(
>?????'connector'?=?'mysql-cdc',
>?????'hostname'?=?'192.168.8.124',
>?????'port'?=?'3306',
>?????'username'?=?'hnmqet',
>?????'password'?=?'hnmq123456',
>?'server-time-zone'?=?'Asia/Shanghai',
>?'scan.startup.mode'?=?'initial',
>?????'database-name'?=?'d_general',
>?????'table-name'?=?'info_message'
>?);
[INFO]?Execute?statement?succeed.
Flink?SQL>
說(shuō)明如下:
- Flink的table中添加了兩個(gè)metadata列。還可以定義op_ts列,類(lèi)型為T(mén)IMESTAMP_LTZ(3),表示binlog在數(shù)據(jù)庫(kù)創(chuàng)建的時(shí)間,如果是snapshot,則值為0
- 如果Mysql中有很多個(gè)列,這里只獲取Flink Table中定義的列
- Mysql的用戶需要的權(quán)限:SELECT、SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT
- server-time-zone: Mysql數(shù)據(jù)庫(kù)的session time zone,用來(lái)控制如何將Mysql的timestamp類(lèi)型轉(zhuǎn)換成string類(lèi)型
- scan.startup.mode:mysql-cdc啟動(dòng)時(shí)消費(fèi)的模式,initial表示同步snapshot和binlog,latest-offset表示同步最新的binlog
- database-name和table-name可以使用正則表達(dá)式匹配多個(gè)數(shù)據(jù)庫(kù)和多個(gè)表,例如"d_general[0-9]+"可以匹配d_general0、d_general999等
4.4 在Flink中創(chuàng)建Hudi Sink的映射表
Flink?SQL>?create?table?hudi_sink(
>?database_name?string,
>?table_name?string,
>?id?decimal(20,0)?not?null,
>?msg_title?string,
>?msg_ctx?string,
>?msg_time?timestamp(6),
>?primary?key?(database_name,?table_name,?id)?not?enforced
>?)?with?(
>?????'connector'?=?'hudi',
>?'path'?=?'hdfs://nnha/user/hudi/warehouse/hudi_db/info_message',
>?'table.type'?=?'MERGE_ON_READ',
>?'hoodie.datasource.write.recordkey.field'?=?'database_name.table_name.id',
>?'write.precombine.field'?=?'msg_time',
>?'write.rate.limit'?=?'2000',
>?'write.tasks'?=?'2',
>?'write.operation'?=?'upsert',
>?'compaction.tasks'?=?'2',
>?'compaction.async.enabled'?=?'true',
>?'compaction.trigger.strategy'?=?'num_commits',
>?'compaction.delta_commits'?=?'5',
>?'read.tasks'?=?'2',
>?'changelog.enabled'?=?'true'
>?);
[INFO]?Execute?statement?succeed.
Flink?SQL>
說(shuō)明如下:
- 不同數(shù)據(jù)庫(kù)和表的id字段可能會(huì)相同,定義復(fù)合主鍵
- hoodie.datasource.write.recordkey.field:默指定表的主鍵,多個(gè)字段用.分隔。認(rèn)為uuid字段
- 如果upstream不能保證數(shù)據(jù)的order,則需要顯式指定write.precombine.field,且選取的字段不能包含null。默認(rèn)為ts字段。作用是如果在一個(gè)批次中,有兩條key相同的數(shù)據(jù),取較大的precombine數(shù)據(jù),插入到Hudi中
- write.rate.limit:每秒寫(xiě)入數(shù)據(jù)的條數(shù),默認(rèn)為0表示不限制
- 默認(rèn)write的并行度為4
- write.operation:默認(rèn)是upsert
- 默認(rèn)compaction的并行度為4
- compaction.async.enabled:是否開(kāi)啟online compaction,默認(rèn)為true
- compaction.trigger.strategy:compaction觸發(fā)的策略,可選值:num_commits、time_elapsed、num_and_time、num_or_time,默認(rèn)值為num_commits
- compaction.delta_commits:每多少次commit進(jìn)行一次compaction,默認(rèn)值為5
- MOR類(lèi)型的表,還不能處理delete,所以會(huì)導(dǎo)致數(shù)據(jù)不一致??梢酝ㄟ^(guò)changelog.enabled轉(zhuǎn)換到change log模式
4.5 流式寫(xiě)入Hudi
先同步snapshot,再同步事務(wù)日志
Flink?SQL>?insert?into?hudi_sink?select?database_name,?table_name,?id,?msg_title,?msg_ctx,?msg_time?from?mysql_source?/*+?OPTIONS('server-id'='5401')?*/?where?msg_time?is?not?null;
[INFO]?Submitting?SQL?update?statement?to?the?cluster...
[INFO]?SQL?update?statement?has?been?successfully?submitted?to?the?cluster:
Job?ID:?afa575f5451af65d1ee7d225d77888ac
Flink?SQL>
- 注意:這里如果where條件如果添加了"msg_time > timestamp ‘2021-04-14 09:49:00’",任務(wù)會(huì)一直卡在write_stream這一步,write_stream的狀態(tài)一直是bush(max): 100%,并且checkpoint也會(huì)一直卡住,查看HDFS上的表是沒(méi)有數(shù)據(jù)
-
默認(rèn)查詢(xún)的并行度是1。如果并行度大于1,需要為每個(gè)slot設(shè)置server-id,4個(gè)slot的設(shè)置方法為:
'server-id'='5401-5404'。這樣Mysql server就能正確維護(hù)network connection和binlog position
2022年全網(wǎng)首發(fā)|大數(shù)據(jù)專(zhuān)家級(jí)技能模型與學(xué)習(xí)指南(勝天半子篇)
互聯(lián)網(wǎng)最壞的時(shí)代可能真的來(lái)了
我在B站讀大學(xué),大數(shù)據(jù)專(zhuān)業(yè)
我們?cè)趯W(xué)習(xí)Flink的時(shí)候,到底在學(xué)習(xí)什么? 193篇文章暴揍Flink,這個(gè)合集你需要關(guān)注一下 Flink生產(chǎn)環(huán)境TOP難題與優(yōu)化,阿里巴巴藏經(jīng)閣YYDS Flink CDC我吃定了耶穌也留不住他!| Flink CDC線上問(wèn)題小盤(pán)點(diǎn) 我們?cè)趯W(xué)習(xí)Spark的時(shí)候,到底在學(xué)習(xí)什么? 在所有Spark模塊中,我愿稱(chēng)SparkSQL為最強(qiáng)! 硬剛Hive | 4萬(wàn)字基礎(chǔ)調(diào)優(yōu)面試小總結(jié) 數(shù)據(jù)治理方法論和實(shí)踐小百科全書(shū)
標(biāo)簽體系下的用戶畫(huà)像建設(shè)小指南
4萬(wàn)字長(zhǎng)文 | ClickHouse基礎(chǔ)&實(shí)踐&調(diào)優(yōu)全視角解析
【面試&個(gè)人成長(zhǎng)】2021年過(guò)半,社招和校招的經(jīng)驗(yàn)之談 大數(shù)據(jù)方向另一個(gè)十年開(kāi)啟 |《硬剛系列》第一版完結(jié) 我寫(xiě)過(guò)的關(guān)于成長(zhǎng)/面試/職場(chǎng)進(jìn)階的文章 當(dāng)我們?cè)趯W(xué)習(xí)Hive的時(shí)候在學(xué)習(xí)什么?「硬剛Hive續(xù)集」
評(píng)論
圖片
表情

