<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Flink CDC 2.2.0同步Mysql數(shù)據(jù)到Hudi數(shù)據(jù)湖表實(shí)踐

          共 9110字,需瀏覽 19分鐘

           ·

          2022-10-08 16:44

          4033ead4f7189a634f8807070dff4493.webp 全網(wǎng)最全大數(shù)據(jù)面試提升手冊(cè)!

          目錄

          1. 介紹
          2. Deserialization序列化和反序列化
          3. 添加Flink CDC依賴(lài)
            3.1 sql-client
            3.2 Java/Scala API
          4. 使用SQL方式同步Mysql數(shù)據(jù)到Hudi數(shù)據(jù)湖
            4.1 Mysql表結(jié)構(gòu)和數(shù)據(jù)
            4.2 Flink開(kāi)啟checkpoint
            4.3 在Flink中創(chuàng)建Mysql的映射表
            4.4 在Flink中創(chuàng)建Hudi Sink的映射表
            4.5 流式寫(xiě)入Hudi

          1. 介紹

          Flink CDC底層是使用Debezium來(lái)進(jìn)行data changes的capture

          特色:

          1. snapshot能并行讀取。根據(jù)表定義的primary key中的第一列劃分成chunk。如果表沒(méi)有primary key,需要通過(guò)參數(shù)scan.incremental.snapshot.enabled關(guān)閉snapshot增量讀取
          2. snapshot讀取時(shí)的checkpoint粒度為chunk
          3. snapshot讀取不需要global read lock(FLUSH TABLES WITH READ LOCK)
          4. reader讀取snapshot和binlog的一致性過(guò)程:
          • 標(biāo)記當(dāng)前的binlog position為low
          • 多個(gè)reader讀取各自的chunk
          • 標(biāo)記當(dāng)前的binlog position為high
          • 一個(gè)reader讀取low ~ high之間的binlog
          • 一個(gè)reader讀取high之后的binlog

          2. Deserialization序列化和反序列化

          下面用json格式,展示了change event

                
                {
          ??"before":?{
          ????"id":?111,
          ????"name":?"scooter",
          ????"description":?"Big?2-wheel?scooter",
          ????"weight":?5.18
          ??},
          ??"after":?{
          ????"id":?111,
          ????"name":?"scooter",
          ????"description":?"Big?2-wheel?scooter",
          ????"weight":?5.15
          ??},
          ??"source":?{...},
          ??"op":?"u",??//?operation?type,?u表示這是一個(gè)update?event?
          ??"ts_ms":?1589362330904,??//?connector處理event的時(shí)間
          ??"transaction":?null
          }

          在DataStrea API中,用戶可以使用Constructor:JsonDebeziumDeserializationSchema(true),在message中包含schema。但是不推薦使用

          JsonDebeziumDeserializationSchema也可以接收J(rèn)sonConverter的自定義配置。如下示例在output中包含小數(shù)的數(shù)據(jù)

                
                Map<String,?Object>?customConverterConfigs?=?new?HashMap<>();
          ?customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG,?"numeric");
          ?JsonDebeziumDeserializationSchema?schema?=?
          ??????new?JsonDebeziumDeserializationSchema(true,?customConverterConfigs);

          3. 添加Flink CDC依賴(lài)

          3.1 sql-client

          集成步驟如下:

          1. 從github flink cdc下載flink-sql-connector-mysql-cdc-2.2.0.jar包
          2. 將jar包放到Flink集群所有服務(wù)器的lib目錄下
          3. 重啟Flink集群
          4. 啟動(dòng)sql-client.sh
          3.2 Java/Scala API

          添加如下依賴(lài)到pom.xml中

                
                <dependency>
          ????<groupId>com.ververica</groupId>
          ????<artifactId>flink-connector-mysql-cdc</artifactId>
          ????<version>2.2.0</version>
          </dependency>

          4. 使用SQL方式同步Mysql數(shù)據(jù)到Hudi數(shù)據(jù)湖

          4.1 Mysql表結(jié)構(gòu)和數(shù)據(jù)

          建表語(yǔ)句如下:

                
                CREATE?TABLE?`info_message`?(
          ??`id`?bigint(20)?NOT?NULL?AUTO_INCREMENT?COMMENT?'主鍵',
          ??`msg_title`?varchar(100)?DEFAULT?NULL?COMMENT?'消息名稱(chēng)',
          ??`msg_ctx`?varchar(2048)?DEFAULT?NULL?COMMENT?'消息內(nèi)容',
          ??`msg_time`?datetime?DEFAULT?NULL?COMMENT?'消息發(fā)送時(shí)間',
          ??PRIMARY?KEY?(`id`)
          )

          部分?jǐn)?shù)據(jù)內(nèi)容如下:

                
                mysql>?
          mysql>?select?*?from?d_general.info_message?limit?3;
          +--------------------+-----------+-------------------------------------------------------+---------------------+
          |?id?????????????????|?msg_title?|?msg_ctx???????????????????????????????????????????????|?msg_time????????????|
          +--------------------+-----------+-------------------------------------------------------+---------------------+
          |?????????1??????????|???title1??|?????????????????????????content1??????????????????????|?2019-03-29?15:27:21?|
          |?????????2??????????|???title2??|?????????????????????????content2??????????????????????|?2019-03-29?15:38:36?|
          |?????????3??????????|???title3??|?????????????????????????content3??????????????????????|?2019-03-29?15:38:36?|
          +--------------------+-----------+-------------------------------------------------------+---------------------+
          3?rows?in?set?(0.00?sec)

          mysql>
          4.2 Flink開(kāi)啟checkpoint
          • Checkpoint默認(rèn)是不開(kāi)啟的,開(kāi)啟Checkpoint讓Hudi可以提交事務(wù)
          • 并且mysql-cdc在binlog讀取階段開(kāi)始前,需要等待一個(gè)完整的checkpoint來(lái)避免binlog記錄亂序的情況
          • binlog讀取的并行度為1,checkpoint的粒度為數(shù)據(jù)行級(jí)別
          • 可以在任務(wù)失敗的情況下,達(dá)到Exactly-once語(yǔ)義
                
                Flink?SQL>?set?'execution.checkpointing.interval'?=?'10s';
          [INFO]?Session?property?has?been?set.

          Flink?SQL>
          4.3 在Flink中創(chuàng)建Mysql的映射表
                
                Flink?SQL>?create?table?mysql_source(
          >?database_name?string?metadata?from?'database_name'?virtual,
          >?table_name?string?metadata?from?'table_name'?virtual,
          >?id?decimal(20,0)?not?null,
          >?msg_title?string,
          >?msg_ctx?string,
          >?msg_time?timestamp(9),
          >?primary?key?(id)?not?enforced
          >?)?with?(
          >?????'connector'?=?'mysql-cdc',
          >?????'hostname'?=?'192.168.8.124',
          >?????'port'?=?'3306',
          >?????'username'?=?'hnmqet',
          >?????'password'?=?'hnmq123456',
          >?'server-time-zone'?=?'Asia/Shanghai',
          >?'scan.startup.mode'?=?'initial',
          >?????'database-name'?=?'d_general',
          >?????'table-name'?=?'info_message'
          >?);
          [INFO]?Execute?statement?succeed.

          Flink?SQL>

          說(shuō)明如下:

          • Flink的table中添加了兩個(gè)metadata列。還可以定義op_ts列,類(lèi)型為T(mén)IMESTAMP_LTZ(3),表示binlog在數(shù)據(jù)庫(kù)創(chuàng)建的時(shí)間,如果是snapshot,則值為0
          • 如果Mysql中有很多個(gè)列,這里只獲取Flink Table中定義的列
          • Mysql的用戶需要的權(quán)限:SELECT、SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT
          • server-time-zone: Mysql數(shù)據(jù)庫(kù)的session time zone,用來(lái)控制如何將Mysql的timestamp類(lèi)型轉(zhuǎn)換成string類(lèi)型
          • scan.startup.mode:mysql-cdc啟動(dòng)時(shí)消費(fèi)的模式,initial表示同步snapshot和binlog,latest-offset表示同步最新的binlog
          • database-name和table-name可以使用正則表達(dá)式匹配多個(gè)數(shù)據(jù)庫(kù)和多個(gè)表,例如"d_general[0-9]+"可以匹配d_general0、d_general999等
          4.4 在Flink中創(chuàng)建Hudi Sink的映射表
                
                Flink?SQL>?create?table?hudi_sink(
          >?database_name?string,
          >?table_name?string,
          >?id?decimal(20,0)?not?null,
          >?msg_title?string,
          >?msg_ctx?string,
          >?msg_time?timestamp(6),
          >?primary?key?(database_name,?table_name,?id)?not?enforced
          >?)?with?(
          >?????'connector'?=?'hudi',
          >?'path'?=?'hdfs://nnha/user/hudi/warehouse/hudi_db/info_message',
          >?'table.type'?=?'MERGE_ON_READ',
          >?'hoodie.datasource.write.recordkey.field'?=?'database_name.table_name.id',
          >?'write.precombine.field'?=?'msg_time',
          >?'write.rate.limit'?=?'2000',
          >?'write.tasks'?=?'2',
          >?'write.operation'?=?'upsert',
          >?'compaction.tasks'?=?'2',
          >?'compaction.async.enabled'?=?'true',
          >?'compaction.trigger.strategy'?=?'num_commits',
          >?'compaction.delta_commits'?=?'5',
          >?'read.tasks'?=?'2',
          >?'changelog.enabled'?=?'true'
          >?);
          [INFO]?Execute?statement?succeed.

          Flink?SQL>

          說(shuō)明如下:

          • 不同數(shù)據(jù)庫(kù)和表的id字段可能會(huì)相同,定義復(fù)合主鍵
          • hoodie.datasource.write.recordkey.field:默指定表的主鍵,多個(gè)字段用.分隔。認(rèn)為uuid字段
          • 如果upstream不能保證數(shù)據(jù)的order,則需要顯式指定write.precombine.field,且選取的字段不能包含null。默認(rèn)為ts字段。作用是如果在一個(gè)批次中,有兩條key相同的數(shù)據(jù),取較大的precombine數(shù)據(jù),插入到Hudi中
          • write.rate.limit:每秒寫(xiě)入數(shù)據(jù)的條數(shù),默認(rèn)為0表示不限制
          • 默認(rèn)write的并行度為4
          • write.operation:默認(rèn)是upsert
          • 默認(rèn)compaction的并行度為4
          • compaction.async.enabled:是否開(kāi)啟online compaction,默認(rèn)為true
          • compaction.trigger.strategy:compaction觸發(fā)的策略,可選值:num_commits、time_elapsed、num_and_time、num_or_time,默認(rèn)值為num_commits
          • compaction.delta_commits:每多少次commit進(jìn)行一次compaction,默認(rèn)值為5
          • MOR類(lèi)型的表,還不能處理delete,所以會(huì)導(dǎo)致數(shù)據(jù)不一致??梢酝ㄟ^(guò)changelog.enabled轉(zhuǎn)換到change log模式
          4.5 流式寫(xiě)入Hudi

          先同步snapshot,再同步事務(wù)日志

                
                Flink?SQL>?insert?into?hudi_sink?select?database_name,?table_name,?id,?msg_title,?msg_ctx,?msg_time?from?mysql_source?/*+?OPTIONS('server-id'='5401')?*/?where?msg_time?is?not?null;
          [INFO]?Submitting?SQL?update?statement?to?the?cluster...
          [INFO]?SQL?update?statement?has?been?successfully?submitted?to?the?cluster:
          Job?ID:?afa575f5451af65d1ee7d225d77888ac


          Flink?SQL>
          • 注意:這里如果where條件如果添加了"msg_time > timestamp ‘2021-04-14 09:49:00’",任務(wù)會(huì)一直卡在write_stream這一步,write_stream的狀態(tài)一直是bush(max): 100%,并且checkpoint也會(huì)一直卡住,查看HDFS上的表是沒(méi)有數(shù)據(jù)
          • 默認(rèn)查詢(xún)的并行度是1。如果并行度大于1,需要為每個(gè)slot設(shè)置server-id,4個(gè)slot的設(shè)置方法為:'server-id'='5401-5404'。這樣Mysql server就能正確維護(hù)network connection和binlog position
          如果這個(gè)文章對(duì)你有幫助,不要忘記? 「在看」?「點(diǎn)贊」?「收藏」 ?三連啊喂!

          e0a3a4f88ca0e5582efd367f104d94b9.webp2022年全網(wǎng)首發(fā)|大數(shù)據(jù)專(zhuān)家級(jí)技能模型與學(xué)習(xí)指南(勝天半子篇) 互聯(lián)網(wǎng)最壞的時(shí)代可能真的來(lái)了
          我在B站讀大學(xué),大數(shù)據(jù)專(zhuān)業(yè)
          我們?cè)趯W(xué)習(xí)Flink的時(shí)候,到底在學(xué)習(xí)什么? 193篇文章暴揍Flink,這個(gè)合集你需要關(guān)注一下 Flink生產(chǎn)環(huán)境TOP難題與優(yōu)化,阿里巴巴藏經(jīng)閣YYDS Flink CDC我吃定了耶穌也留不住他!| Flink CDC線上問(wèn)題小盤(pán)點(diǎn) 我們?cè)趯W(xué)習(xí)Spark的時(shí)候,到底在學(xué)習(xí)什么? 在所有Spark模塊中,我愿稱(chēng)SparkSQL為最強(qiáng)! 硬剛Hive | 4萬(wàn)字基礎(chǔ)調(diào)優(yōu)面試小總結(jié) 數(shù)據(jù)治理方法論和實(shí)踐小百科全書(shū)
          標(biāo)簽體系下的用戶畫(huà)像建設(shè)小指南
          4萬(wàn)字長(zhǎng)文 | ClickHouse基礎(chǔ)&實(shí)踐&調(diào)優(yōu)全視角解析
          【面試&個(gè)人成長(zhǎng)】2021年過(guò)半,社招和校招的經(jīng)驗(yàn)之談 大數(shù)據(jù)方向另一個(gè)十年開(kāi)啟 |《硬剛系列》第一版完結(jié) 我寫(xiě)過(guò)的關(guān)于成長(zhǎng)/面試/職場(chǎng)進(jìn)階的文章 當(dāng)我們?cè)趯W(xué)習(xí)Hive的時(shí)候在學(xué)習(xí)什么?「硬剛Hive續(xù)集」
          瀏覽 88
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  久久新 | 求A片网址| 免费搞黄网站。 | 国产做爰视频免费播放 | a片一级富二代表兄妹淫乱新春 |