<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Flink CDC 原理及生產(chǎn)實(shí)踐

          共 5077字,需瀏覽 11分鐘

           ·

          2020-12-31 09:51

          點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)

          回復(fù)”資源“獲取更多資源

          大數(shù)據(jù)技術(shù)與架構(gòu)
          點(diǎn)擊右側(cè)關(guān)注,大數(shù)據(jù)開發(fā)領(lǐng)域最強(qiáng)公眾號(hào)!

          大數(shù)據(jù)真好玩
          點(diǎn)擊右側(cè)關(guān)注,大數(shù)據(jù)真好玩!



          MySQL CDC連接器允許從MySQL數(shù)據(jù)庫(kù)讀取快照數(shù)據(jù)和增量數(shù)據(jù)。本文檔根據(jù)官網(wǎng)翻譯了如何設(shè)置MySQL CDC連接器以對(duì)MySQL數(shù)據(jù)庫(kù)運(yùn)行SQL查詢。

          依賴關(guān)系

          為了設(shè)置MySQL CDC連接器,下表提供了使用構(gòu)建自動(dòng)化工具(例如Maven或SBT)和帶有SQL JAR捆綁包的SQL Client的兩個(gè)項(xiàng)目的依賴項(xiàng)信息。

          1、Maven依賴


          com.alibaba.ververica
          flink-connector-mysql-cdc
          1.1.0

          2、SQL客戶端JAR

           下載flink-sql-connector-mysql-cdc-1.1.0.jar并將其放在下  /lib/。

          設(shè)置MySQL服務(wù)器

          您必須定義一個(gè)對(duì)Debezium MySQL連接器監(jiān)視的所有數(shù)據(jù)庫(kù)具有適當(dāng)權(quán)限的MySQL用戶。

          1、創(chuàng)建MySQL用戶

          mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';

          2、向用戶授予所需的權(quán)限

          mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';

          3、最終確定用戶的權(quán)限

          mysql> FLUSH PRIVILEGES;

          注意

          1、MySQL CDC源代碼如何工作

          啟動(dòng)MySQL CDC源時(shí),它將獲取一個(gè)全局讀取鎖(FLUSH TABLES WITH READ LOCK),該鎖將阻止其他數(shù)據(jù)庫(kù)的寫入。然后,它讀取當(dāng)前binlog位置以及數(shù)據(jù)庫(kù)和表的schema。之后,將釋放 全局讀取鎖。然后,它掃描數(shù)據(jù)庫(kù)表并從先前記錄的位置讀取binlog。Flink將定期執(zhí)行checkpoints以記錄binlog位置。如果發(fā)生故障,作業(yè)將重新啟動(dòng)并從checkpoint完成的binlog位置恢復(fù)。因此,它保證了僅一次的語(yǔ)義。

          2、向MySQL用戶授予RELOAD權(quán)限

          如果未授予MySQL用戶RELOAD權(quán)限,則MySQL CDC源將改為使用表級(jí)鎖,并使用此方法執(zhí)行快照。這會(huì)阻止寫入更長(zhǎng)的時(shí)間。

          3、全局讀取鎖(FLUSH TABLES WITH READ LOCK)

          全局讀取鎖 在讀取binlog位置和schema期間保持。這可能需要幾秒鐘,具體取決于表的數(shù)量。全局讀取鎖定會(huì)阻止寫入,因此它仍然可能影響在線業(yè)務(wù)。如果要跳過讀取鎖,并且可以容忍至少一次語(yǔ)義,則可以添加'debezium.snapshot.locking.mode' = 'none'選項(xiàng)以跳過鎖。

          4、為每個(gè)作業(yè)設(shè)置一個(gè)differnet SERVER ID 每個(gè)用于讀取binlog的MySQL數(shù)據(jù)庫(kù)客戶端都應(yīng)具有唯一的ID,稱為server id。MySQL服務(wù)器將使用此ID維護(hù)網(wǎng)絡(luò)連接和binlog位置。如果不同的作業(yè)共享相同的server id,則可能導(dǎo)致從錯(cuò)誤的binlog位置進(jìn)行讀取。提示:默認(rèn)情況下,啟動(dòng)TaskManager時(shí),server id是隨機(jī)的。如果TaskManager失敗,則再次啟動(dòng)時(shí),它可能具有不同的server id。但這不應(yīng)該經(jīng)常發(fā)生(作業(yè)異常不會(huì)重新啟動(dòng)TaskManager),也不會(huì)對(duì)MySQL服務(wù)器造成太大影響。

          因此,建議為每個(gè)作業(yè)設(shè)置不同的server id ,例如:

          通過SQL Hints:SELECT * FROM source_table /+ OPTIONS('server-id'='123456') / ;

          通過Stream ApI的 創(chuàng)建source時(shí)設(shè)置:MySQLSource.builder().xxxxxx.serverId(123456);

          重點(diǎn):Mysq的binlog 可以說是針對(duì)庫(kù)級(jí)別,所以相同的server id去拉一個(gè)庫(kù)里的不同表或者相同表可能會(huì)造成數(shù)據(jù)丟失。所以建議設(shè)置server id。

          5、掃描數(shù)據(jù)庫(kù)表期間無(wú)法執(zhí)行檢查點(diǎn)

          在掃描表期間,由于沒有可恢復(fù)的位置,因此我們無(wú)法執(zhí)行checkpoints。為了不執(zhí)行檢查點(diǎn),MySQL CDC源將保持檢查點(diǎn)等待超時(shí)。超時(shí)檢查點(diǎn)將被識(shí)別為失敗的檢查點(diǎn),默認(rèn)情況下,這將觸發(fā)Flink作業(yè)的故障轉(zhuǎn)移。因此,如果數(shù)據(jù)庫(kù)表很大,則建議添加以下Flink配置,以避免由于超時(shí)檢查點(diǎn)而導(dǎo)致故障轉(zhuǎn)移:

          execution.checkpointing.interval: 10min
          execution.checkpointing.tolerable-failed-checkpoints: 100
          restart-strategy: fixed-delay
          restart-strategy.fixed-delay.attempts: 2147483647

          6、設(shè)置MySQL會(huì)話超時(shí)

          為大型數(shù)據(jù)庫(kù)創(chuàng)建初始一致的快照時(shí),在讀取表時(shí),您建立的連接可能會(huì)超時(shí)。您可以通過在MySQL配置文件中配置Interactive_timeout和wait_timeout來(lái)防止此行為。

          • interactive_timeout:服務(wù)器在關(guān)閉交互式連接之前等待活動(dòng)的秒數(shù)。

          • wait_timeout:服務(wù)器在關(guān)閉非交互式連接之前等待其活動(dòng)的秒數(shù)。

          如何創(chuàng)建MySQL CDC表

          1、Sql的方式:(1)定義表如下:

          -- register a MySQL table 'orders' in Flink SQL
          CREATE TABLE orders (
          order_id INT,
          order_date TIMESTAMP(0),
          customer_name STRING,
          price DECIMAL(10, 5),
          product_id INT,
          order_status BOOLEAN
          ) WITH (
          'connector' = 'mysql-cdc',
          'hostname' = 'localhost',
          'port' = '3306',
          'username' = 'root',
          'password' = '123456',
          'database-name' = 'mydb',
          'table-name' = 'orders'
          );

          -- read snapshot and binlogs from orders table
          SELECT * FROM orders;

          2、Stream API

          MySQL CDC連接器也可以是DataStream源。您可以創(chuàng)建SourceFunction,如下所示:

          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import org.apache.flink.streaming.api.functions.source.SourceFunction;
          import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
          import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;

          public class MySqlBinlogSourceExample {
          public static void main(String[] args) throws Exception {
          SourceFunction sourceFunction = MySQLSource.builder()
          .hostname("localhost")
          .port(3306)
          .databaseList("inventory") // monitor all tables under inventory database
          .username("flinkuser")
          .password("flinkpw")
          .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
          .build();

          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

          env
          .addSource(sourceFunction)
          .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

          env.execute();
          }
          }

          特征

          1、Exactly-Once Processing 一次處理 MySQL CDC連接器是Flink Source連接器,它將首先讀取數(shù)據(jù)庫(kù)快照,然后即使發(fā)生故障,也將以完全一次的處理繼續(xù)讀取二進(jìn)制日志。請(qǐng)閱讀連接器如何執(zhí)行數(shù)據(jù)庫(kù)快照。

          2、Single Thread Reading 單線程閱讀 MySQL CDC源無(wú)法并行讀取,因?yàn)橹挥幸粋€(gè)任務(wù)可以接收Binlog事件。

          常見問題

          1、如何跳過快照并僅從binlog中讀取?可以通過選項(xiàng)進(jìn)行控制debezium.snapshot.mode,您可以將其設(shè)置為:

          • never:指定連接永遠(yuǎn)不要使用快照,并且在第一次使用邏輯服務(wù)器名稱啟動(dòng)時(shí),連接器應(yīng)該從binlog的開頭讀取;請(qǐng)謹(jǐn)慎使用,因?yàn)橹挥性赽inlog保證包含數(shù)據(jù)庫(kù)的整個(gè)歷史記錄時(shí)才有效。

          • schema_only:如果自連接器啟動(dòng)以來(lái)不需要數(shù)據(jù)的連續(xù)快照,而只需要它們進(jìn)行更改,則可以使用該schema_only選項(xiàng),其中連接器僅對(duì)模式(而不是數(shù)據(jù))進(jìn)行快照。

          2、如何讀取包含多個(gè)表(例如user_00,user_01,...,user99)的共享數(shù)據(jù)庫(kù)?該table-name選項(xiàng)支持正則表達(dá)式以監(jiān)視多個(gè)與正則表達(dá)式匹配的表。因此,您可以設(shè)置table-name為user.*監(jiān)視所有user_前綴表。database-name選項(xiàng)相同。請(qǐng)注意,共享表應(yīng)該在相同的架構(gòu)中。

          3、ConnectException:收到用于處理的DML'...',binlog可能包含使用語(yǔ)句或基于混合的復(fù)制格式生成的事件 如果有上述異常,請(qǐng)檢查是否binlog_format為ROW,您可以通過show variables like '%binlog_format%'在MySQL客戶端中運(yùn)行來(lái)進(jìn)行檢查。請(qǐng)注意,即使binlog_format您的數(shù)據(jù)庫(kù)配置為ROW,也可以通過其他會(huì)話更改此配置,例如SET SESSION binlog_format='MIXED'; SET SESSION tx_isolation='REPEATABLE-READ'; COMMIT;。還請(qǐng)確保沒有其他會(huì)話正在更改此配置

          實(shí)踐中遇到的問題

          1、不同的kafka版本依賴沖突會(huì)造成cdc報(bào)錯(cuò):http://apache-flink.147419.n8.nabble.com/cdc-td8357.html#a8393

          2、超時(shí)問題:根據(jù)上面提到設(shè)置wait_timeout解決。

          版權(quán)聲明:

          本文為大數(shù)據(jù)技術(shù)與架構(gòu)整理,原作者獨(dú)家授權(quán)。未經(jīng)原作者允許轉(zhuǎn)載追究侵權(quán)責(zé)任。
          微信公眾號(hào)|import_bigdata

          編輯?《大數(shù)據(jù)技術(shù)與架構(gòu)》

          插畫?《大數(shù)據(jù)技術(shù)與架構(gòu)》

          文章鏈接?https://www.jianshu.com/p/439b1d1247b2



          歡迎點(diǎn)贊+收藏+轉(zhuǎn)發(fā)朋友圈素質(zhì)三連


          文章不錯(cuò)?點(diǎn)個(gè)【在看】吧!??

          瀏覽 106
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  学生妹内射 | re久久6热 | 影音先锋中文字幕夜夜操 | 中文字幕一区二区三区润滑油 | 黑人操逼黑 |