Flink CDC 2.0: Principles and Production Practice

Hi, I'm Wang Zhiwu (王知無), an original author in the big data field. Follow me for more first-hand industry news.
Flink CDC Concepts
CDC stands for Change Data Capture. In the broad sense, any technique that can capture data changes counts as CDC. In practice, the term usually refers to capturing changes in databases: a technique for detecting and delivering every change made to database data.
Application Scenarios
- Data synchronization, for backup and disaster recovery
- Data distribution: one data source feeding multiple downstream systems
- Data collection (the "E" in ETL): data integration for data warehouses and data lakes
CDC Technologies
The mainstream implementations fall into two categories:
Query-based CDC
a. Offline, scheduled query jobs; batch processing.
b. Cannot guarantee data consistency.
c. No real-time guarantee.
Log-based CDC
a. Consumes the change log in real time; stream processing.
b. Guarantees data consistency.
c. Delivers data in real time.
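The consistency gap of query-based CDC can be illustrated with a small sketch (a hypothetical in-memory "table" standing in for a real database, not actual CDC code): a polling job only compares periodic snapshots, so any change that happens between two polls is flattened or lost, whereas a log-based reader sees every individual change.

```java
import java.util.HashMap;
import java.util.Map;

public class QueryBasedCdcSketch {
    // A polling job can only observe the table state at poll time.
    static Map<Integer, String> pollSnapshot(Map<Integer, String> table) {
        return new HashMap<>(table);
    }

    public static void main(String[] args) {
        Map<Integer, String> table = new HashMap<>();
        table.put(1, "a");
        Map<Integer, String> prev = pollSnapshot(table);

        // Two updates happen between two polls:
        table.put(1, "b"); // intermediate value
        table.put(1, "c"); // final value

        Map<Integer, String> curr = pollSnapshot(table);
        // Diffing snapshots only reveals a -> c; the intermediate "b" is
        // invisible, while a binlog reader would see both updates.
        System.out.println(prev.get(1) + " -> " + curr.get(1));
    }
}
```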
Common open-source CDC solutions
Flink CDC 2.0 Design in Detail
Source repository
https://github.com/ververica/flink-cdc-connectors
Supported connectors

Hands-on Practice
pom file
"1.0"?encoding="UTF-8"?>
"http://maven.apache.org/POM/4.0.0"
?????????xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
?????????xsi:schemaLocation="http://maven.apache.org/POM/4.0.0?http://maven.apache.org/xsd/maven-4.0.0.xsd">
????
????????Flink-learning
????????com.wudl.flink
????????1.0-SNAPSHOT
????
????4.0.0
????Flink-cdc2.0
????
????????1.13.0
????????????1.8
????????????1.8
????
????
????????
????????????org.projectlombok
????????????lombok
????????????1.18.2
????????????provided
????????
????????
????????????org.apache.flink
????????????flink-java
????????????${flink-version}
????????
????????
????????????org.apache.flink
????????????flink-streaming-java_2.12
????????????${flink-version}
????????
????????
????????????org.apache.flink
????????????flink-clients_2.12
????????????${flink-version}
????????
????????
????????????org.apache.hadoop
????????????hadoop-client
????????????3.1.3
????????
????????
????????????mysql
????????????mysql-connector-java
????????????5.1.49
????????
????????
????????????org.apache.flink
????????????flink-table-planner-blink_2.12
????????????${flink-version}
????????
????????
????????????com.ververica
????????????flink-connector-mysql-cdc
????????????2.0.2
????????
????????
????????????com.alibaba
????????????fastjson
????????????1.2.75
????????
????????
????????????org.apache.flink
????????????flink-connector-jdbc_2.12
????????????1.13.3
????????
????
????
????????
????????????
????????????????org.apache.maven.plugins
????????????????maven-assembly-plugin
????????????????3.0.0
????????????????
????????????????????
????????????????????????jar-with-dependencies
????????????????????
????????????????
????????????????
????????????????????
????????????????????????make-assembly
????????????????????????package
????????????????????????
????????????????????????????single
????????????????????????
????????????????????
????????????????
????????????
????????
????
Code
package com.wud.cdc2;

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(5000L);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Retain the last checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Use HDFS as the checkpoint state backend
        env.setStateBackend(new FsStateBackend("hdfs://192.168.1.161:8020/cdc2.0-test/ck"));
        // Set the user for HDFS access
        System.setProperty("HADOOP_USER_NAME", "hdfs");

        DebeziumSourceFunction<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("192.168.1.180")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("test")
                .tableList("test.Flink_iceberg")
                .deserializer(new StringDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();

        DataStreamSource<String> dataStreamSource = env.addSource(mySqlSource);
        dataStreamSource.print();
        env.execute();
    }
}
Execution output
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10011,name=flink-mysql,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585007,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585013}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10012,name=flink-mysqA,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585015,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585016}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10012,name=flink-mysqA,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585017,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585017}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10011,name=flink-mysql,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585017,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585017}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10011,name=flink-mysql,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585017,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585017}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10012,name=flink-mysqA,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585017,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585017}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10013,name=flink-mysqA3,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585017,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585017}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10014,name=flink-mysqA4,age=19,dt=2021-09-28},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585017,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585017}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10011,name=flink-mysql,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585017,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585017}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10012,name=flink-mysqA,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585017,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585018}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10012,name=flink-mysqA,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585018,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585018}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10011,name=flink-mysql,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585018,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585018}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10011,name=flink-mysql,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585018,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585018}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10012,name=flink-mysqA,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585018,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585018}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10013,name=flink-mysqA3,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585018,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585018}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10014,name=flink-mysqA4,age=19,dt=2021-09-28},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585018,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585018}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10050,name=flink-cdc-add,age=21,dt=2021-10-2},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585018,snapshot=last,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585018}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
Submitting to the cluster
Run the command:
[root@basenode flink-1.13.2]# bin/flink run -c com.wud.cdc2.FlinkCDC /opt/datas/Flink-cdc2.0-1.0-SNAPSHOT-jar-with-dependencies.jar
Job has been submitted with JobID 137b680a6bb934e43568f14f6583b62c
Triggering a savepoint manually
Create a savepoint for the running job:
[root@basenode flink-1.13.2]# bin/flink savepoint e8e918c2517a777e817c630cf1d6b932 hdfs://192.168.1.161:8020/cdc-test/savepoint
Triggering savepoint for job e8e918c2517a777e817c630cf1d6b932.
Waiting for response...
Savepoint completed. Path: hdfs://192.168.1.161:8020/cdc-test/savepoint/savepoint-e8e918-9ef094f349be
You can resume your program from this savepoint with the run command.
[root@basenode flink-1.13.2]#
Stop the Flink job from the web UI.
Insert some new rows into MySQL.
Restart the Flink job from the savepoint:
[root@basenode flink-1.13.2]# bin/flink run -s hdfs://192.168.1.161:8020/cdc-test/savepoint/savepoint-e8e918-9ef094f349be -c com.wud.cdc2.FlinkCDC /opt/datas/Flink-cdc2.0-1.0-SNAPSHOT-jar-with-dependencies.jar
Job has been submitted with JobID 474a0da99820aa6025203f9806b9fcad
Check the logs:

Custom deserialization in Flink CDC 2.0
The output above shows the raw record structure that Flink CDC produces:
SourceRecord{sourcePartition={server=mysql_binlog_source},
sourceOffset={file=mysql-bin.000063, pos=154}}
ConnectRecord{topic='mysql_binlog_source.wudl-gmall.user_info', kafkaPartition=null, key=Struct{id=4000}, keySchema=Schema{mysql_binlog_source.wudl_gmall.user_info.Key:STRUCT}, value=Struct{after=Struct{id=4000,login_name=i0v0k9,nick_name=xxx,name=xxx,phone_num=137xxxxx,[email protected],user_level=1,birthday=1969-12-04,gender=F,create_time=2020-12-04 23:28:45},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=last,db=wudl-gmall,table=user_info,server_id=0,file=mysql-bin.000063,pos=154,row=0},op=c,ts_ms=1636255826014}, valueSchema=Schema{mysql_binlog_source.wudl_gmall.user_info.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
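One useful detail in this record is the topic name, which encodes the server name, source database, and table. A quick sketch of splitting it apart (using the topic from the record above):

```java
public class TopicParts {
    public static void main(String[] args) {
        // Debezium topics have the form <server-name>.<database>.<table>
        String topic = "mysql_binlog_source.wudl-gmall.user_info";
        String[] parts = topic.split("\\.");
        // parts[1] is the database, parts[2] is the table
        System.out.println(parts[1] + "/" + parts[2]);
    }
}
```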
We can implement a custom deserialization schema:
package com.wud.cdc2;

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {

    /**
     * Input example:
     * SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1636269821, file=mysql-bin.000063, pos=6442}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=102,name=flinksql,age=25,dt=2021-11-08},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1636269821531,snapshot=last,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000063,pos=6442,row=0},op=r,ts_ms=1636269821531}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
     *
     * @param sourceRecord one change record produced by Debezium
     * @param collector    collector for the deserialized output
     * @throws Exception on deserialization failure
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        JSONObject result = new JSONObject();
        // The topic has the form <server-name>.<database>.<table>
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        result.put("db", fields[1]);
        result.put("tableName", fields[2]);
        // Extract the "before" row image
        Struct value = (Struct) sourceRecord.value();
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            // Read the column list from the schema
            Schema schema = before.schema();
            List<Field> fieldList = schema.fields();
            for (Field field : fieldList) {
                beforeJson.put(field.name(), before.get(field));
            }
        }
        result.put("before", beforeJson);
        // Extract the "after" row image
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            Schema schema = after.schema();
            List<Field> afterFields = schema.fields();
            for (Field field : afterFields) {
                afterJson.put(field.name(), after.get(field));
            }
        }
        result.put("after", afterJson);
        // Operation type: READ / CREATE / UPDATE / DELETE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        result.put("op", operation);
        // Emit the record as a JSON string
        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
Using the custom deserialization schema
package com.wud.cdc2;

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Checkpointing and the HDFS state backend are disabled here for local
        // testing; see the previous example for the full configuration.
        DebeziumSourceFunction<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("192.168.1.180")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("test")
                .tableList("test.Flink_iceberg")
                // Use the custom schema instead of StringDebeziumDeserializationSchema
                .deserializer(new CustomerDeserialization())
                .startupOptions(StartupOptions.initial())
                .build();

        DataStreamSource<String> dataStreamSource = env.addSource(mySqlSource);
        dataStreamSource.print();
        env.execute();
    }
}
After changing a row, the console prints:
{"op":"UPDATE","before":{"dt":"2021-11-07","name":"spark","id":104,"age":22},"after":{"dt":"2021-11-07","name":"spark02","id":104,"age":22},"db":"test","tableName":"Flink_iceberg"}
ETL with Flink CDC 2.0 SQL
Note that the table must have a primary key: without one, an update arrives at the sink as an extra row; with a primary key, the existing row is updated in place.
The MySQL table schema:
CREATE TABLE `Flink_iceberg` (
  `id` bigint(64) DEFAULT NULL,
  `name` varchar(64) DEFAULT NULL,
  `age` int(20) DEFAULT NULL,
  `dt` varchar(64) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1
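Note that this schema has no primary key, which is exactly the situation the warning above describes. A possible fix (a hypothetical statement, assuming `id` values are unique; the same applies to the physical sink table) is:

```sql
-- Declare a primary key so the JDBC sink can upsert by id
-- instead of appending a new row on every update.
ALTER TABLE `Flink_iceberg` ADD PRIMARY KEY (`id`);
```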
Implementation:
package com.wud.cdc2;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkCdc20MysqlToMysql {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        String sourceSql = "CREATE TABLE IF NOT EXISTS mySqlSource (" +
                "  id BIGINT PRIMARY KEY NOT ENFORCED, " +
                "  name STRING, " +
                "  age INT, " +
                "  dt STRING " +
                ") WITH (" +
                "  'connector' = 'mysql-cdc', " +
                "  'scan.startup.mode' = 'latest-offset', " +
                "  'hostname' = '192.168.1.180', " +
                "  'port' = '3306', " +
                "  'username' = 'root', " +
                "  'password' = '123456', " +
                "  'database-name' = 'test', " +
                "  'table-name' = 'Flink_iceberg' " +
                ")";
        String sinkSql = "CREATE TABLE IF NOT EXISTS mySqlSink (" +
                "  id BIGINT PRIMARY KEY NOT ENFORCED, " +
                "  name STRING, " +
                "  age INT, " +
                "  dt STRING " +
                ") WITH (" +
                "  'connector' = 'jdbc', " +
                "  'url' = 'jdbc:mysql://192.168.1.180:3306/test', " +
                "  'table-name' = 'Flink_iceberg-cdc', " +
                "  'username' = 'root', " +
                "  'password' = '123456' " +
                ")";
        tableEnv.executeSql(sourceSql);
        tableEnv.executeSql(sinkSql);
        // executeSql submits the INSERT job itself; env.execute() is not needed
        tableEnv.executeSql("INSERT INTO mySqlSink SELECT * FROM mySqlSource");
    }
}
Console output after the initial snapshot, an update, and an insert:
{"op":"READ","before":{},"after":{"dt":"2021-09-24","name":"flink-mysqA","id":10012,"age":19},"db":"test","tableName":"Flink_iceberg"}
{"op":"READ","before":{},"after":{"dt":"2021-09-24","name":"flink-mysqA","id":10012,"age":19},"db":"test","tableName":"Flink_iceberg"}
{"op":"READ","before":{},"after":{"dt":"2021-09-24","name":"flink-mysql","id":10011,"age":19},"db":"test","tableName":"Flink_iceberg"}
{"op":"READ","before":{},"after":{"dt":"2021-09-24","name":"flink-mysql","id":10011,"age":19},"db":"test","tableName":"Flink_iceberg"}
{"op":"READ","before":{},"after":{"dt":"2021-09-24","name":"flink-mysqA","id":10012,"age":19},"db":"test","tableName":"Flink_iceberg"}
{"op":"READ","before":{},"after":{"dt":"2021-09-24","name":"flink-mysqA3","id":10013,"age":19},"db":"test","tableName":"Flink_iceberg"}
{"op":"READ","before":{},"after":{"dt":"2021-09-28","name":"flink-mysqA4","id":10014,"age":19},"db":"test","tableName":"Flink_iceberg"}
{"op":"READ","before":{},"after":{"dt":"2021-09-24","name":"flink-mysql","id":10011,"age":19},"db":"test","tableName":"Flink_iceberg"}
{"op":"READ","before":{},"after":{"dt":"2021-11-07","name":"flink","id":101,"age":20},"db":"test","tableName":"Flink_iceberg"}
{"op":"READ","before":{},"after":{"dt":"2021-11-08","name":"flinksql","id":102,"age":25},"db":"test","tableName":"Flink_iceberg"}
{"op":"READ","before":{},"after":{"dt":"2021-11-09","name":"flink-table","id":103,"age":21},"db":"test","tableName":"Flink_iceberg"}
{"op":"READ","before":{},"after":{"dt":"2021-11-07","name":"spark","id":104,"age":22},"db":"test","tableName":"Flink_iceberg"}
{"op":"READ","before":{},"after":{"dt":"2021-11-07","name":"hbase","id":105,"age":25},"db":"test","tableName":"Flink_iceberg"}
{"op":"UPDATE","before":{"dt":"2021-11-07","name":"spark","id":104,"age":22},"after":{"dt":"2021-11-07","name":"spark02","id":104,"age":22},"db":"test","tableName":"Flink_iceberg"}
{"op":"CREATE","before":{},"after":{"dt":"2021-11-07","name":"flinkcdc","id":106,"age":22},"db":"test","tableName":"Flink_iceberg"}
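A downstream consumer of these JSON records typically switches on the op field to maintain a mirror of the source table. A minimal in-memory sketch (with event fields hardcoded for illustration rather than parsed from the JSON above):

```java
import java.util.HashMap;
import java.util.Map;

public class ApplyChangeEvents {
    // Apply one change event (already parsed into fields) to an in-memory
    // mirror of the source table, keyed by primary key.
    static void apply(Map<Long, Map<String, Object>> mirror,
                      String op, Long id, Map<String, Object> after) {
        switch (op) {
            case "READ":    // snapshot row
            case "CREATE":  // insert
            case "UPDATE":  // upsert by primary key
                mirror.put(id, after);
                break;
            case "DELETE":
                mirror.remove(id);
                break;
        }
    }

    public static void main(String[] args) {
        Map<Long, Map<String, Object>> mirror = new HashMap<>();
        apply(mirror, "READ", 104L, Map.of("name", "spark", "age", 22));
        apply(mirror, "UPDATE", 104L, Map.of("name", "spark02", "age", 22));
        // The UPDATE replaced the snapshot row for id 104
        System.out.println(mirror.get(104L).get("name"));
    }
}
```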