
Flink CDC 2.0: Principles and Production Practice

2022-03-11

By 王知無 (Wang Zhiwu), an original author in the big-data field.

Flink CDC Concepts

CDC stands for Change Data Capture. In the broadest sense, any technique that can capture data changes qualifies as CDC. In practice, though, the term usually refers to capturing changes made to a database: a technology for observing inserts, updates, and deletes as they happen.

Application Scenarios

1. Data synchronization, for backup and disaster recovery
2. Data distribution, fanning one source out to multiple downstream consumers
3. Data acquisition (the "E" in ETL), feeding data-warehouse / data-lake integration

CDC Techniques

The mainstream implementations fall into two categories:

1. Query-based CDC
   a. Scheduled offline query jobs; batch processing.
   b. Cannot guarantee data consistency.
   c. No real-time guarantee.
2. Log-based CDC
   a. Consumes the database change log continuously; stream processing.
   b. Guarantees data consistency.
   c. Delivers changes in real time.
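The difference between the two mechanisms can be seen in a toy sketch (plain Java, no database involved; the `Row` class and the `updatedAt`-based polling loop are illustrative assumptions, not a real CDC API). A query-based poller that filters on an update timestamp only ever sees the latest state of each row, so changes that happen between two polls are collapsed into one; a log-based reader would receive every individual change event:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy illustration of query-based CDC: poll for rows whose
// updatedAt is newer than the last poll. Intermediate states
// that occur between two polls are lost.
public class QueryBasedCdcSketch {
    public static class Row {
        public final long id;
        public String name;
        public long updatedAt;
        public Row(long id, String name, long updatedAt) {
            this.id = id; this.name = name; this.updatedAt = updatedAt;
        }
    }

    // One polling pass: return rows changed since lastPollTs.
    public static List<Row> poll(Collection<Row> table, long lastPollTs) {
        List<Row> changed = new ArrayList<>();
        for (Row r : table) {
            if (r.updatedAt > lastPollTs) changed.add(r);
        }
        return changed;
    }

    public static void main(String[] args) {
        Map<Long, Row> table = new HashMap<>();
        table.put(1L, new Row(1, "a", 100));
        long lastPoll = 100; // state at t=100 has already been read

        // Two updates happen between polls ...
        table.get(1L).name = "b"; table.get(1L).updatedAt = 150;
        table.get(1L).name = "c"; table.get(1L).updatedAt = 160;

        // ... but the next poll sees only the final state "c";
        // the intermediate state "b" is never observed.
        List<Row> changes = poll(table.values(), lastPoll);
        System.out.println(changes.size() + " change(s), name=" + changes.get(0).name);
    }
}
```

This is exactly why the log-based approach is the one that can guarantee consistency: the binlog records both updates, not just the final row image.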

Common Open-Source CDC Solutions

Flink CDC 2.0 Design in Detail

Source repository

https://github.com/ververica/flink-cdc-connectors

Supported connectors

Hands-On Practice

pom file

          "1.0"?encoding="UTF-8"?>
          "http://maven.apache.org/POM/4.0.0"
          ?????????xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          ?????????xsi:schemaLocation="http://maven.apache.org/POM/4.0.0?http://maven.apache.org/xsd/maven-4.0.0.xsd">
          ????
          ????????Flink-learning
          ????????com.wudl.flink
          ????????1.0-SNAPSHOT
          ????

          ????4.0.0

          ????Flink-cdc2.0
          ????
          ????????1.13.0
          ????????????1.8
          ????????????1.8
          ????




          ????
          ????????
          ????????????org.projectlombok
          ????????????lombok
          ????????????1.18.2
          ????????????provided
          ????????

          ????????
          ????????????org.apache.flink
          ????????????flink-java
          ????????????${flink-version}
          ????????


          ????????
          ????????????org.apache.flink
          ????????????flink-streaming-java_2.12
          ????????????${flink-version}
          ????????


          ????????
          ????????????org.apache.flink
          ????????????flink-clients_2.12
          ????????????${flink-version}
          ????????


          ????????
          ????????????org.apache.hadoop
          ????????????hadoop-client
          ????????????3.1.3
          ????????


          ????????
          ????????????mysql
          ????????????mysql-connector-java
          ????????????5.1.49
          ????????


          ????????
          ????????????org.apache.flink
          ????????????flink-table-planner-blink_2.12
          ????????????${flink-version}
          ????????


          ????????
          ????????????com.ververica
          ????????????flink-connector-mysql-cdc
          ????????????2.0.2
          ????????


          ????????
          ????????????com.alibaba
          ????????????fastjson
          ????????????1.2.75
          ????????


          ????????
          ????????????org.apache.flink
          ????????????flink-connector-jdbc_2.12
          ????????????1.13.3
          ????????

          ????

          ????
          ????????
          ????????????
          ????????????????org.apache.maven.plugins
          ????????????????maven-assembly-plugin
          ????????????????3.0.0
          ????????????????
          ????????????????????
          ????????????????????????jar-with-dependencies
          ????????????????????

          ????????????????

          ????????????????
          ????????????????????
          ????????????????????????make-assembly
          ????????????????????????package
          ????????????????????????
          ????????????????????????????single
          ????????????????????????

          ????????????????????

          ????????????????

          ????????????

          ????????

          ????


Code

package com.wud.cdc2;

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(5000L);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Retain the last checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Store checkpoints in an HDFS-backed state backend
        env.setStateBackend(new FsStateBackend("hdfs://192.168.1.161:8020/cdc2.0-test/ck"));
        // Set the HDFS user name
        System.setProperty("HADOOP_USER_NAME", "hdfs");

        DebeziumSourceFunction<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("192.168.1.180")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("test")
                .tableList("test.Flink_iceberg")
                .deserializer(new StringDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> dataStreamSource = env.addSource(mySqlSource);
        dataStreamSource.print();
        env.execute();
    }
}

Execution result

SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10011,name=flink-mysql,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585007,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585013}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10012,name=flink-mysqA,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585015,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585016}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10012,name=flink-mysqA,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585017,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585017}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10011,name=flink-mysql,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585017,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585017}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10011,name=flink-mysql,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585017,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585017}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10012,name=flink-mysqA,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585017,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585017}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10013,name=flink-mysqA3,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585017,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585017}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10014,name=flink-mysqA4,age=19,dt=2021-09-28},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585017,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585017}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10011,name=flink-mysql,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585017,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585017}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10012,name=flink-mysqA,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585017,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585018}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10012,name=flink-mysqA,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585018,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585018}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10011,name=flink-mysql,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585018,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585018}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10011,name=flink-mysql,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585018,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585018}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10012,name=flink-mysqA,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585018,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585018}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10013,name=flink-mysqA3,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585018,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585018}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10014,name=flink-mysqA4,age=19,dt=2021-09-28},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585018,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585018}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10050,name=flink-cdc-add,age=21,dt=2021-10-2},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585018,snapshot=last,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585018}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}

Submitting to a Cluster

Run command

[root@basenode flink-1.13.2]# bin/flink run -c com.wud.cdc2.FlinkCDC /opt/datas/Flink-cdc2.0-1.0-SNAPSHOT-jar-with-dependencies.jar

Job has been submitted with JobID 137b680a6bb934e43568f14f6583b62c

Triggering a savepoint manually

Create a savepoint for the running job:

[root@basenode flink-1.13.2]# bin/flink savepoint e8e918c2517a777e817c630cf1d6b932 hdfs://192.168.1.161:8020/cdc-test/savepoint
Triggering savepoint for job e8e918c2517a777e817c630cf1d6b932.
Waiting for response...
Savepoint completed. Path: hdfs://192.168.1.161:8020/cdc-test/savepoint/savepoint-e8e918-9ef094f349be
You can resume your program from this savepoint with the run command.
[root@basenode flink-1.13.2]#

Stop the Flink job from the web UI

Then insert some data into MySQL

Restart the Flink job from the savepoint

[root@basenode flink-1.13.2]# bin/flink run -s hdfs://192.168.1.161:8020/cdc-test/savepoint/savepoint-e8e918-9ef094f349be -c com.wud.cdc2.FlinkCDC /opt/datas/Flink-cdc2.0-1.0-SNAPSHOT-jar-with-dependencies.jar
Job has been submitted with JobID 474a0da99820aa6025203f9806b9fcad

Check the logs:

Next, a custom deserialization schema for Flink CDC 2.0

The output above shows the raw structure of a Flink CDC record:

SourceRecord{sourcePartition={server=mysql_binlog_source}, 
sourceOffset={file=mysql-bin.000063, pos=154}}
ConnectRecord{topic='mysql_binlog_source.wudl-gmall.user_info', kafkaPartition=null, key=Struct{id=4000}, keySchema=Schema{mysql_binlog_source.wudl_gmall.user_info.Key:STRUCT}, value=Struct{after=Struct{id=4000,login_name=i0v0k9,nick_name=xxx,name=xxx,phone_num=137xxxxx,[email protected],user_level=1,birthday=1969-12-04,gender=F,create_time=2020-12-04 23:28:45},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=last,db=wudl-gmall,table=user_info,server_id=0,file=mysql-bin.000063,pos=154,row=0},op=c,ts_ms=1636255826014}, valueSchema=Schema{mysql_binlog_source.wudl_gmall.user_info.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}

We can define our own deserialization schema:

package com.wud.cdc2;

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {
    /**
     * Flattens a Debezium record such as:
     * SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1636269821, file=mysql-bin.000063, pos=6442}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=102,name=flinksql,age=25,dt=2021-11-08},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1636269821531,snapshot=last,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000063,pos=6442,row=0},op=r,ts_ms=1636269821531}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
     *
     * @param sourceRecord one change record from Debezium
     * @param collector    emits the flattened JSON string
     * @throws Exception
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

        JSONObject result = new JSONObject();
        // The topic has the form <server>.<database>.<table>
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        result.put("db", fields[1]);
        result.put("tableName", fields[2]);
        // Extract the "before" row image
        Struct value = (Struct) sourceRecord.value();
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            // Copy the column values
            Schema schema = before.schema();
            List<Field> fieldList = schema.fields();
            for (Field field : fieldList) {
                beforeJson.put(field.name(), before.get(field));
            }
        }
        result.put("before", beforeJson);
        // Extract the "after" row image
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            Schema schema = after.schema();
            List<Field> afterFields = schema.fields();
            for (Field field : afterFields) {
                afterJson.put(field.name(), after.get(field));
            }
        }
        result.put("after", afterJson);
        // Operation type: READ / CREATE / UPDATE / DELETE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        result.put("op", operation);
        // Emit the flattened record
        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
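The core logic of the deserializer above — splitting the topic into database/table names and copying the before/after field values — can be exercised without Flink, Debezium, or fastjson on the classpath. This sketch uses plain `Map`s as stand-ins for `JSONObject` and Kafka Connect's `Struct`; the class and method names here are illustrative assumptions, not part of any API:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in for the topic/struct flattening done in CustomerDeserialization:
// topic "server.db.table" -> {db, tableName}; before/after field maps are
// copied through, with null (absent image) becoming an empty object.
public class EnvelopeFlattener {
    public static Map<String, Object> flatten(String topic,
                                              Map<String, Object> before,
                                              Map<String, Object> after,
                                              String op) {
        String[] parts = topic.split("\\.");   // <server>.<database>.<table>
        Map<String, Object> result = new LinkedHashMap<>();
        result.put("db", parts[1]);
        result.put("tableName", parts[2]);
        // Debezium leaves "before" null for snapshot reads and inserts
        result.put("before", before == null ? new LinkedHashMap<String, Object>() : before);
        result.put("after", after == null ? new LinkedHashMap<String, Object>() : after);
        result.put("op", op);
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> after = new LinkedHashMap<>();
        after.put("id", 102);
        after.put("name", "flinksql");
        Map<String, Object> out =
                flatten("mysql_binlog_source.test.Flink_iceberg", null, after, "READ");
        System.out.println(out);
        // prints {db=test, tableName=Flink_iceberg, before={}, after={id=102, name=flinksql}, op=READ}
    }
}
```

This is why snapshot-phase records in the output later in this article all have `"before":{}` — the snapshot has no prior row image to report.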

Using the custom deserializer

package com.wud.cdc2;

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
//        env.enableCheckpointing(5000L);
//        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//        // Retain the last checkpoint when the job is cancelled
//        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//        // Store checkpoints in an HDFS-backed state backend
//        env.setStateBackend(new FsStateBackend("hdfs://192.168.1.161:8020/cdc2.0-test/ck"));
//        // Set the HDFS user name
//        System.setProperty("HADOOP_USER_NAME", "hdfs");

        DebeziumSourceFunction<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("192.168.1.180")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("test")
                .tableList("test.Flink_iceberg")
//                .deserializer(new StringDebeziumDeserializationSchema())
                .deserializer(new CustomerDeserialization())
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> dataStreamSource = env.addSource(mySqlSource);
        dataStreamSource.print();
        env.execute();
    }
}

After changing a row, the console now prints:

{"op":"UPDATE","before":{"dt":"2021-11-07","name":"spark","id":104,"age":22},"after":{"dt":"2021-11-07","name":"spark02","id":104,"age":22},"db":"test","tableName":"Flink_iceberg"}

A simple ETL with Flink CDC 2.0 SQL

Note that the table definitions must declare a primary key; without one, every update is written to the sink as a new row. With a primary key, updates overwrite the existing row instead.

Database table schema:

CREATE TABLE `Flink_iceberg` (
  `id` bigint(64) DEFAULT NULL,
  `name` varchar(64) DEFAULT NULL,
  `age` int(20) DEFAULT NULL,
  `dt` varchar(64) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1

Implementation:

package com.wud.cdc2;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkCdc20MysqlToMysql {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        String sourceSql = "CREATE TABLE IF NOT EXISTS mySqlSource (" +
                "id BIGINT, " +
                "name STRING, " +
                "age INT, " +
                "dt STRING, " +
                "PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH ( " +
                " 'connector' = 'mysql-cdc', " +
                " 'scan.startup.mode' = 'latest-offset', " +
                " 'hostname' = '192.168.1.180', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = '123456', " +
                " 'database-name' = 'test', " +
                " 'table-name' = 'Flink_iceberg' " +
                ")";

        String sinkSql = "CREATE TABLE IF NOT EXISTS mySqlSink (" +
                "id BIGINT, " +
                "name STRING, " +
                "age INT, " +
                "dt STRING, " +
                "PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                " 'connector' = 'jdbc'," +
                " 'url' = 'jdbc:mysql://192.168.1.180:3306/test'," +
                " 'table-name' = 'Flink_iceberg-cdc'," +
                " 'username' = 'root'," +
                " 'password' = '123456' " +
                ")";
        tableEnv.executeSql(sourceSql);
        tableEnv.executeSql(sinkSql);
        // executeSql submits the INSERT job itself; no env.execute() is needed
        tableEnv.executeSql("insert into mySqlSink select * from mySqlSource");
    }
}

After an insert and an update, the output shows:

{"op":"READ","before":{},"after":{"dt":"2021-09-24","name":"flink-mysqA","id":10012,"age":19},"db":"test","tableName":"Flink_iceberg"}
{"op":"READ","before":{},"after":{"dt":"2021-09-24","name":"flink-mysqA","id":10012,"age":19},"db":"test","tableName":"Flink_iceberg"}
{"op":"READ","before":{},"after":{"dt":"2021-09-24","name":"flink-mysql","id":10011,"age":19},"db":"test","tableName":"Flink_iceberg"}
{"op":"READ","before":{},"after":{"dt":"2021-09-24","name":"flink-mysql","id":10011,"age":19},"db":"test","tableName":"Flink_iceberg"}
{"op":"READ","before":{},"after":{"dt":"2021-09-24","name":"flink-mysqA","id":10012,"age":19},"db":"test","tableName":"Flink_iceberg"}
{"op":"READ","before":{},"after":{"dt":"2021-09-24","name":"flink-mysqA3","id":10013,"age":19},"db":"test","tableName":"Flink_iceberg"}
{"op":"READ","before":{},"after":{"dt":"2021-09-28","name":"flink-mysqA4","id":10014,"age":19},"db":"test","tableName":"Flink_iceberg"}
{"op":"READ","before":{},"after":{"dt":"2021-09-24","name":"flink-mysql","id":10011,"age":19},"db":"test","tableName":"Flink_iceberg"}
{"op":"READ","before":{},"after":{"dt":"2021-11-07","name":"flink","id":101,"age":20},"db":"test","tableName":"Flink_iceberg"}
{"op":"READ","before":{},"after":{"dt":"2021-11-08","name":"flinksql","id":102,"age":25},"db":"test","tableName":"Flink_iceberg"}
{"op":"READ","before":{},"after":{"dt":"2021-11-09","name":"flink-table","id":103,"age":21},"db":"test","tableName":"Flink_iceberg"}
{"op":"READ","before":{},"after":{"dt":"2021-11-07","name":"spark","id":104,"age":22},"db":"test","tableName":"Flink_iceberg"}
{"op":"READ","before":{},"after":{"dt":"2021-11-07","name":"hbase","id":105,"age":25},"db":"test","tableName":"Flink_iceberg"}
{"op":"UPDATE","before":{"dt":"2021-11-07","name":"spark","id":104,"age":22},"after":{"dt":"2021-11-07","name":"spark02","id":104,"age":22},"db":"test","tableName":"Flink_iceberg"}
{"op":"CREATE","before":{},"after":{"dt":"2021-11-07","name":"flinkcdc","id":106,"age":22},"db":"test","tableName":"Flink_iceberg"}