
          Flink CDC 2.0 Data Processing Flow: A Complete Walkthrough


          2021-11-18 06:08

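          In August, Flink CDC released version 2.0.0. Compared with 1.0, the full (snapshot) read phase now supports distributed reading and checkpoints, and the combined full + incremental read guarantees data consistency without locking tables.

          The data-reading logic of Flink CDC 2.0 is not complicated in itself; the hard parts are the design of FLIP-27: Refactor Source Interface and unfamiliarity with the Debezium API. This article focuses on the processing logic of Flink CDC and does not go deep into the FLIP-27 design or the Debezium API calls.

          The article first uses a Flink SQL example to show how Flink CDC 2.0 is used, then introduces the core design of CDC (chunk splitting, chunk reading, and incremental reading), and finally walks through the code behind the flink-mysql-cdc interface calls and implementations involved in data processing.

          Example

          Read a MySQL table in full + incremental mode, write it to Kafka in changelog-json format, and observe the RowKind types and the number of affected rows.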

          public static void main(String[] args) {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
                      .useBlinkPlanner()
                      .inStreamingMode()
                      .build();
              env.setParallelism(3);
              // note: incremental sync requires checkpointing to be enabled
              env.enableCheckpointing(10000);
              StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, envSettings);

              tableEnvironment.executeSql(" CREATE TABLE demoOrders (\n" +
                      "          `order_id` INTEGER ,\n" +
                      "          `order_date` DATE ,\n" +
                      "          `order_time` TIMESTAMP(3),\n" +
                      "          `quantity` INT ,\n" +
                      "          `product_id` INT ,\n" +
                      "          `purchaser` STRING,\n" +
                      "           primary key(order_id)  NOT ENFORCED" +
                      "         ) WITH (\n" +
                      "          'connector' = 'mysql-cdc',\n" +
                      "          'hostname' = 'localhost',\n" +
                      "          'port' = '3306',\n" +
                      "          'username' = 'cdc',\n" +
                      "          'password' = '123456',\n" +
                      "          'database-name' = 'test',\n" +
                      "          'table-name' = 'demo_orders'," +
                      // full + incremental sync
                      "          'scan.startup.mode' = 'initial'      " +
                      " )");

              tableEnvironment.executeSql("CREATE TABLE sink (\n" +
                      "          `order_id` INTEGER ,\n" +
                      "          `order_date` DATE ,\n" +
                      "          `order_time` TIMESTAMP(3),\n" +
                      "          `quantity` INT ,\n" +
                      "          `product_id` INT ,\n" +
                      "          `purchaser` STRING,\n" +
                      "          primary key (order_id)  NOT ENFORCED " +
                      ") WITH (\n" +
                      "    'connector' = 'kafka',\n" +
                      "    'properties.bootstrap.servers' = 'localhost:9092',\n" +
                      "    'topic' = 'mqTest02',\n" +
                      "    'format' = 'changelog-json' " +
                      ")");

              tableEnvironment.executeSql("insert into sink select * from demoOrders");
          }

          Output of the full (snapshot) phase:

          {"data":{"order_id":1010,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:12.189","quantity":53,"product_id":502,"purchaser":"flink"},"op":"+I"}
          {"data":{"order_id":1009,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:09.709","quantity":31,"product_id":500,"purchaser":"flink"},"op":"+I"}
          {"data":{"order_id":1008,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:06.637","quantity":69,"product_id":503,"purchaser":"flink"},"op":"+I"}
          {"data":{"order_id":1007,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:03.535","quantity":52,"product_id":502,"purchaser":"flink"},"op":"+I"}
          {"data":{"order_id":1002,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:51.347","quantity":69,"product_id":503,"purchaser":"flink"},"op":"+I"}
          {"data":{"order_id":1001,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:48.783","quantity":50,"product_id":502,"purchaser":"flink"},"op":"+I"}
          {"data":{"order_id":1000,"order_date":"2021-09-17","order_time":"2021-09-17 17:40:32.354","quantity":30,"product_id":500,"purchaser":"flink"},"op":"+I"}
          {"data":{"order_id":1006,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:01.249","quantity":31,"product_id":500,"purchaser":"flink"},"op":"+I"}
          {"data":{"order_id":1005,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:58.813","quantity":69,"product_id":503,"purchaser":"flink"},"op":"+I"}
          {"data":{"order_id":1004,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:56.153","quantity":50,"product_id":502,"purchaser":"flink"},"op":"+I"}
          {"data":{"order_id":1003,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:53.727","quantity":30,"product_id":500,"purchaser":"flink"},"op":"+I"}

          Modify the table data and capture the incremental changes:

          ## Update the row with order_id 1005
          {"data":{"order_id":1005,"order_date":"2021-09-17","order_time":"2021-09-22 02:51:58.813","quantity":69,"product_id":503,"purchaser":"flink"},"op":"-U"}
          {"data":{"order_id":1005,"order_date":"2021-09-17","order_time":"2021-09-22 02:55:43.627","quantity":80,"product_id":503,"purchaser":"flink"},"op":"+U"}

          ## Delete the row with order_id 1000
          {"data":{"order_id":1000,"order_date":"2021-09-17","order_time":"2021-09-17 09:40:32.354","quantity":30,"product_id":500,"purchaser":"flink"},"op":"-D"}

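          Core Design

          Chunk Splitting

          In the full phase, data is read in a distributed way. The table is first split into multiple chunks by primary key, and subtasks then read the data inside each chunk range. Depending on whether the primary key column is an auto-increment integer type, the table is split into evenly distributed chunks or unevenly distributed chunks.

          Even distribution

          The primary key column is auto-incremented and has an integer type (int, bigint, decimal). The minimum and maximum of the primary key column are queried and the range is divided evenly by chunkSize. Because the key is an integer, the end of each chunk can be computed directly from the chunk start and chunkSize.

          //  compute the value range of the primary key column
          select min(`order_id`), max(`order_id`) from demo_orders;

          //  split the data into chunks of chunkSize (null marks an open bound)
          chunk-0: [null, min + chunkSize)
          chunk-1: [min + chunkSize, min + 2 * chunkSize)
          .......
          chunk-last: [min + n * chunkSize, null)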
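The even split can be tried out on plain numbers. The following is a minimal sketch, assuming a `long` key; the class `EvenChunkSketch` and its `split` method are hypothetical names for illustration and are not part of flink-mysql-cdc. It follows the same shape as the `splitEvenlySizedChunks` method quoted later in this article: the first chunk is open at the lower end and the last chunk is open at the upper end.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of flink-mysql-cdc: even chunking over a long key. */
final class EvenChunkSketch {

    /** Returns chunk boundaries as {start, end} pairs; null means "unbounded". */
    static List<Long[]> split(long min, long max, long chunkSize) {
        List<Long[]> chunks = new ArrayList<>();
        if (min + chunkSize > max) {
            // The whole key range fits into a single chunk.
            chunks.add(new Long[] {null, null});
            return chunks;
        }
        Long chunkStart = null;
        long chunkEnd = min + chunkSize;
        while (chunkEnd <= max) {
            chunks.add(new Long[] {chunkStart, chunkEnd});
            chunkStart = chunkEnd;
            chunkEnd += chunkSize;
        }
        // The last chunk has no upper bound, so keys above the sampled max are still covered.
        chunks.add(new Long[] {chunkStart, null});
        return chunks;
    }
}
```

With min = 1000, max = 1010 and chunkSize = 4, `EvenChunkSketch.split(1000, 1010, 4)` yields [null, 1004), [1004, 1008) and [1008, null).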

          Uneven distribution

          The primary key column is not auto-incremented, or its type is not an integer type. In this case each split step sorts the not-yet-split data by primary key in ascending order and takes the maximum of the first chunkSize rows as the end position of the current chunk.

          // Sort the not-yet-split data, take chunkSize rows, and use their maximum as the end of the chunk.
          chunkend = SELECT MAX(`order_id`) FROM (
                  SELECT `order_id`  FROM `demo_orders`
                  WHERE `order_id` >= [end of the previous chunk]
                  ORDER BY `order_id` ASC
                  LIMIT   [chunkSize]
              ) AS T
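To see how the repeated "sort, take chunkSize rows, use the maximum" step produces chunk boundaries, here is a small in-memory sketch. It is an illustration under assumptions: a `TreeSet<String>` stands in for the table's key column, the set is non-empty, and `UnevenChunkSketch`, `nextChunkMax` and `split` are hypothetical names, not the connector's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical in-memory stand-in for the repeated "ORDER BY ... LIMIT chunkSize" query. */
final class UnevenChunkSketch {

    /** Max of the first chunkSize keys that are >= from, or null if there are none. */
    static String nextChunkMax(TreeSet<String> keys, String from, int chunkSize) {
        String last = null;
        int seen = 0;
        for (String key : keys.tailSet(from, true)) {
            last = key;
            if (++seen == chunkSize) {
                break;
            }
        }
        return last;
    }

    /** Returns {start, end} pairs for a non-empty key set; null marks an open bound. */
    static List<String[]> split(TreeSet<String> keys, int chunkSize) {
        List<String[]> chunks = new ArrayList<>();
        String max = keys.last();
        String chunkStart = null;
        String chunkEnd = nextChunkMax(keys, keys.first(), chunkSize);
        while (chunkEnd != null && chunkEnd.compareTo(max) < 0) {
            chunks.add(new String[] {chunkStart, chunkEnd});
            chunkStart = chunkEnd;
            String next = nextChunkMax(keys, chunkEnd, chunkSize);
            // Guard against an end equal to the start: move to the next larger key.
            chunkEnd = chunkEnd.equals(next) ? keys.higher(chunkEnd) : next;
        }
        // The final chunk is open-ended.
        chunks.add(new String[] {chunkStart, null});
        return chunks;
    }
}
```

For the keys a to g and chunkSize = 3, the sketch produces [null, c), [c, e) and [e, null). Each boundary costs one lookup, which is why this path is slower than the arithmetic used for evenly distributed keys.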

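          Reading Snapshot Chunks

          Flink splits the table into multiple chunks, and subtasks read the chunk data in parallel without taking locks. Because no lock is held at any point, other transactions may modify rows inside a chunk's range while it is being read, so the snapshot alone cannot guarantee consistency. In the full phase Flink therefore combines a snapshot read with a binlog-based correction to keep the data consistent.

          Snapshot read

          The rows in the chunk range are queried with SQL over JDBC.

          ## SQL used to read the snapshot rows
          SELECT * FROM `test`.`demo_orders`
          WHERE order_id >= [chunkStart]
          AND NOT (order_id = [chunkEnd])
          AND order_id <= [chunkEnd]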
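The query above covers a chunk with both bounds set. Since the first and last chunks have an open bound, the WHERE clause has to adapt. The sketch below shows one way to assemble the statement text with JDBC `?` placeholders; `SplitScanSqlSketch` and `build` are hypothetical names, and the handling of the open-bound cases is an assumption derived from the chunk ranges described earlier, not a copy of the connector's code.

```java
/** Hypothetical helper that mirrors the chunk-range query shown above. */
final class SplitScanSqlSketch {

    /** chunkStart/chunkEnd may be null for the first/last chunk (open bound). */
    static String build(String table, String keyColumn, Object chunkStart, Object chunkEnd) {
        StringBuilder sql = new StringBuilder("SELECT * FROM ").append(table);
        if (chunkStart == null && chunkEnd == null) {
            // A single chunk covers the whole table: plain full scan.
            return sql.toString();
        }
        sql.append(" WHERE ");
        if (chunkStart != null) {
            sql.append(keyColumn).append(" >= ?");
        }
        if (chunkEnd != null) {
            if (chunkStart != null) {
                sql.append(" AND ");
            }
            // "NOT (= end) AND <= end" expresses the half-open upper bound [start, end).
            sql.append("NOT (").append(keyColumn).append(" = ?) AND ")
               .append(keyColumn).append(" <= ?");
        }
        return sql.toString();
    }
}
```

Only the null-ness of the two bounds matters for the statement text; the actual values would be bound to the placeholders when the statement is executed.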

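          Data correction

          SHOW MASTER STATUS is executed before and after the snapshot read to get the current binlog offset. Once the snapshot read is finished, the binlog events between those two offsets are read and used to correct the snapshot rows.

          [Figure: data layout during the snapshot read + binlog read]

          Rules for correcting SnapshotEvents with BinlogEvents:

          • No binlog data was read, meaning no other transaction touched the data while the select was running: emit all snapshot rows as they are.
          • Binlog data was read, but the changed rows do not belong to the current chunk: emit the snapshot rows.
          • Binlog data was read and the changes belong to the current chunk: a delete removes the row from the in-memory snapshot, an insert adds a new row to it, and an update adds the change record to it, so that both the before and after images of the update are eventually emitted downstream.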
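The three rules can be modelled with a map keyed by primary key. This is a simplified sketch under assumptions: rows are plain strings, keys are `long` values, and `SnapshotCorrectionSketch`, `BinlogEvent` and `correct` are hypothetical names. It only keeps the latest row image per key; how the before and after images of an update are emitted downstream is outside its scope.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of correcting one chunk's snapshot rows with binlog events. */
final class SnapshotCorrectionSketch {

    /** op is "c" (insert), "u" (update) or "d" (delete); row is ignored for "d". */
    static final class BinlogEvent {
        final String op;
        final long key;
        final String row;

        BinlogEvent(String op, long key, String row) {
            this.op = op;
            this.key = key;
            this.row = row;
        }
    }

    /** Applies the events that fall into [chunkStart, chunkEnd) to a copy of the snapshot. */
    static Map<Long, String> correct(
            Map<Long, String> snapshot, List<BinlogEvent> events, long chunkStart, long chunkEnd) {
        Map<Long, String> result = new LinkedHashMap<>(snapshot);
        for (BinlogEvent e : events) {
            if (e.key < chunkStart || e.key >= chunkEnd) {
                continue; // the change belongs to another chunk
            }
            if ("d".equals(e.op)) {
                result.remove(e.key);
            } else {
                result.put(e.key, e.row); // "c" adds the row, "u" overwrites it
            }
        }
        return result;
    }
}
```

If the event list is empty, the result is simply a copy of the snapshot, which matches the first rule.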

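          [Figure: data layout after correction]

          Take reading the chunk [1,11) as an example of how chunk data is processed. c, d, and u stand for the insert, delete, and update operations captured by Debezium.

          [Figure: data and layout before correction]

          [Figure: data and layout after correction]

          After a single chunk has been processed, the start and end positions of the finished chunk (ChunkStart, ChunkEnd) and the maximum binlog offset (high watermark) are sent to the SplitEnumerator. They are used to determine the starting offset for incremental reading.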


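          Reading the Incremental (Binlog) Split

          Once all snapshot chunks have been read, the SplitEnumerator issues a single BinlogSplit for incremental reading. The most important property of the BinlogSplit is its starting offset: if it is set too low, downstream may receive duplicate data; if it is set too high, changes are skipped and downstream may be left with stale, dirty data. Flink CDC starts incremental reading from the smallest binlog offset among all finished snapshot chunks, and only records that satisfy the condition below are emitted downstream.

          Emission condition:

          • The binlog offset of the captured record > the maximum binlog offset (high watermark) of the chunk the record belongs to.

          For example, the SplitEnumerator holds the following finished-chunk information. [Figure: finished chunk information]

          During incremental reading, binlog data is read starting from offset 800. When a record with key 123 is captured, the reader first finds the snapshot chunk that 123 belongs to and looks up that chunk's maximum binlog offset, 800. If the record's offset is greater than the chunk's high watermark, the record is emitted; otherwise it is discarded.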
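Both rules, the starting offset and the emission condition, fit into a few lines. The sketch below is an assumption-laden model: offsets are plain `long` values, chunk ranges are numeric and half-open, and `BinlogEmitSketch`, `FinishedSplit`, `startOffset` and `shouldEmit` are hypothetical names. The chunk ranges and offsets used in the usage note are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the finished-chunk bookkeeping; offsets are plain longs. */
final class BinlogEmitSketch {

    static final class FinishedSplit {
        final long chunkStart;    // inclusive
        final long chunkEnd;      // exclusive
        final long highWatermark; // max binlog offset seen while reading the chunk

        FinishedSplit(long chunkStart, long chunkEnd, long highWatermark) {
            this.chunkStart = chunkStart;
            this.chunkEnd = chunkEnd;
            this.highWatermark = highWatermark;
        }
    }

    private final List<FinishedSplit> finished = new ArrayList<>();

    void add(FinishedSplit split) {
        finished.add(split);
    }

    /** Binlog reading starts at the smallest high watermark of all finished chunks. */
    long startOffset() {
        long min = Long.MAX_VALUE;
        for (FinishedSplit s : finished) {
            min = Math.min(min, s.highWatermark);
        }
        return min;
    }

    /** Emit only if the event is newer than what the owning chunk already contains. */
    boolean shouldEmit(long key, long binlogOffset) {
        for (FinishedSplit s : finished) {
            if (key >= s.chunkStart && key < s.chunkEnd) {
                return binlogOffset > s.highWatermark;
            }
        }
        // Assumption of this sketch: keys outside every known chunk are dropped.
        return false;
    }
}
```

With chunks [0, 100) at high watermark 1000 and [100, 200) at high watermark 800, reading starts at 800. A change to key 123 at offset 800 is dropped because the snapshot already reflects it, while the same key at offset 801 is emitted.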

          Code Walkthrough

          The design of FLIP-27: Refactor Source Interface is not covered in detail; this part focuses on the flink-mysql-cdc interface calls and their implementation.

          MySqlSourceEnumerator initialization

          SourceCoordinator is the OperatorCoordinator implementation for Source and runs on the master node. On startup it calls MySqlParallelSource#createEnumerator to create the MySqlSourceEnumerator and then calls its start method to do some initialization work.

          1. Create the MySqlSourceEnumerator, using MySqlHybridSplitAssigner to split the full + incremental data and MySqlValidator to validate the MySQL version and configuration.

          2. MySqlValidator checks:

          • The MySQL version must be 5.7 or later.
          • binlog_format must be ROW.
          • binlog_row_image must be FULL.
          3. MySqlSplitAssigner initialization:
          • Create a ChunkSplitter to split tables into chunks.
          • Select the names of the tables to read.
          4. Start a periodic scheduling thread that asks each SourceReader to send the SourceEnumerator the splits that are finished but not yet acknowledged.
          private void syncWithReaders(int[] subtaskIds, Throwable t) {
              if (t != null) {
                  throw new FlinkRuntimeException("Failed to list obtain registered readers due to:", t);
              }
              // when the SourceEnumerator restores or the communication failed between
              // SourceEnumerator and SourceReader, it may missed some notification event.
              // tell all SourceReader(s) to report there finished but unacked splits.
              if (splitAssigner.waitingForFinishedSplits()) {
                  for (int subtaskId : subtaskIds) {
                      // note: send a FinishedSnapshotSplitsRequestEvent
                      context.sendEventToSourceReader(
                              subtaskId, new FinishedSnapshotSplitsRequestEvent());
                  }
              }
          }
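The three MySqlValidator checks listed in the steps above are simple enough to sketch without a database. The code below is a hedged illustration, not the connector's validator: `MySqlSettingsCheckSketch` and `validate` are hypothetical names, the server version is passed in as a string, and the server variables are assumed to have been read into a plain map beforehand.

```java
import java.util.Map;

/** Hypothetical stand-in for the three checks; variables come from a plain map. */
final class MySqlSettingsCheckSketch {

    /** Throws IllegalStateException if the version or a binlog setting is unsupported. */
    static void validate(String version, Map<String, String> variables) {
        String[] parts = version.split("\\.");
        int major = Integer.parseInt(parts[0]);
        int minor = Integer.parseInt(parts[1]);
        if (major < 5 || (major == 5 && minor < 7)) {
            throw new IllegalStateException("MySQL version must be >= 5.7, but was " + version);
        }
        requireValue(variables, "binlog_format", "ROW");
        requireValue(variables, "binlog_row_image", "FULL");
    }

    private static void requireValue(Map<String, String> vars, String name, String expected) {
        String actual = vars.get(name);
        // equalsIgnoreCase(null) is false, so a missing variable is rejected as well.
        if (!expected.equalsIgnoreCase(actual)) {
            throw new IllegalStateException(name + " must be " + expected + ", but was " + actual);
        }
    }
}
```

Failing fast at enumerator start-up is the point of these checks: a non-ROW binlog or a partial row image would only surface later as missing or incomplete change records.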

          MySqlSourceReader initialization

          SourceOperator integrates the SourceReader and interacts with the SourceCoordinator through the OperatorEventGateway.

          1. During initialization, SourceOperator creates a MySqlSourceReader through MySqlParallelSource. MySqlSourceReader uses SingleThreadFetcherManager to create a fetcher that pulls split data, and the data is written to elementsQueue in MySqlRecords format.
          MySqlParallelSource#createReader

          public SourceReader<T, MySqlSplit> createReader(SourceReaderContext readerContext) throws Exception {
              // note: queue that stores the fetched data
              FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue =
                      new FutureCompletingBlockingQueue<>();
              final Configuration readerConfiguration = getReaderConfig(readerContext);

              // note: factory for the split reader
              Supplier<MySqlSplitReader> splitReaderSupplier =
                      () -> new MySqlSplitReader(readerConfiguration, readerContext.getIndexOfSubtask());

              return new MySqlSourceReader<>(
                      elementsQueue,
                      splitReaderSupplier,
                      new MySqlRecordEmitter<>(deserializationSchema),
                      readerConfiguration,
                      readerContext);
          }
          2. The created MySqlSourceReader is registered with the SourceCoordinator through an event. When the SourceCoordinator receives the registration event, it stores the reader's location and index.
          SourceCoordinator#handleReaderRegistrationEvent
          // note: SourceCoordinator handles the reader registration event
          private void handleReaderRegistrationEvent(ReaderRegistrationEvent event) {
              context.registerSourceReader(new ReaderInfo(event.subtaskId(), event.location()));
              enumerator.addReader(event.subtaskId());
          }
          3. After starting, MySqlSourceReader sends a split request event to MySqlSourceEnumerator in order to receive its assigned splits.

          4. Once SourceOperator is initialized, it calls emitNext; SourceReaderBase takes the record batch from elementsQueue and hands it to MySqlRecordEmitter. [Figure: interface call diagram]

          MySqlSourceEnumerator handling split requests

          On startup, MySqlSourceReader sends a RequestSplitEvent to MySqlSourceEnumerator and reads the data in the chunk range it gets back. The steps below show how MySqlSourceEnumerator handles split requests in the full read phase; the result is a MySqlSnapshotSplit.

          1. Handle the split request event and assign a split to the requesting reader. The MySqlSplit is delivered by sending an AddSplitEvent (a MySqlSnapshotSplit in the full phase, a MySqlBinlogSplit in the incremental phase).
          MySqlSourceEnumerator#handleSplitRequest
          public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
              if (!context.registeredReaders().containsKey(subtaskId)) {
                  // reader failed between sending the request and now. skip this request.
                  return;
              }
              // note: store the reader's subtaskId in a TreeSet, so that task-0 is served first
              // when the binlog split is assigned
              readersAwaitingSplit.add(subtaskId);

              assignSplits();
          }

          // note: assign splits
          private void assignSplits() {
              final Iterator<Integer> awaitingReader = readersAwaitingSplit.iterator();
              while (awaitingReader.hasNext()) {
                  int nextAwaiting = awaitingReader.next();
                  // if the reader that requested another split has failed in the meantime, remove
                  // it from the list of waiting readers
                  if (!context.registeredReaders().containsKey(nextAwaiting)) {
                      awaitingReader.remove();
                      continue;
                  }

                  // note: the split is provided by MySqlSplitAssigner
                  Optional<MySqlSplit> split = splitAssigner.getNext();
                  if (split.isPresent()) {
                      final MySqlSplit mySqlSplit = split.get();
                      // note: send an AddSplitEvent to return the split to the reader
                      context.assignSplit(mySqlSplit, nextAwaiting);
                      awaitingReader.remove();

                      LOG.info("Assign split {} to subtask {}", mySqlSplit, nextAwaiting);
                  } else {
                      // there is no available splits by now, skip assigning
                      break;
                  }
              }
          }
          2. MySqlHybridSplitAssigner handles the logic for both snapshot splits and the binlog split.
          • When the job has just started, remainingTables is not empty, noMoreSplits returns false, and a SnapshotSplit is created.

          • After the snapshot chunks have been read, noMoreSplits returns true and a BinlogSplit is created.

          MySqlHybridSplitAssigner#getNext
          @Override
          public Optional<MySqlSplit> getNext() {
              if (snapshotSplitAssigner.noMoreSplits()) {
                  // binlog split assigning
                  if (isBinlogSplitAssigned) {
                      // no more splits for the assigner
                      return Optional.empty();
                  } else if (snapshotSplitAssigner.isFinished()) {
                      // we need to wait snapshot-assigner to be finished before
                      // assigning the binlog split. Otherwise, records emitted from binlog split
                      // might be out-of-order in terms of same primary key with snapshot splits.
                      isBinlogSplitAssigned = true;

                      // note: once all snapshot splits are finished, create the BinlogSplit
                      return Optional.of(createBinlogSplit());
                  } else {
                      // binlog split is not ready by now
                      return Optional.empty();
                  }
              } else {
                  // note: the SnapshotSplit is created by MySqlSnapshotSplitAssigner
                  // snapshot assigner still have remaining splits, assign split from it
                  return snapshotSplitAssigner.getNext();
              }
          }

          3. MySqlSnapshotSplitAssigner handles the snapshot split logic: it generates the splits through ChunkSplitter, keeps them in remainingSplits, and hands them out one at a time through an Iterator.
          @Override
          public Optional<MySqlSplit> getNext() {
              if (!remainingSplits.isEmpty()) {
                  // return remaining splits firstly
                  Iterator<MySqlSnapshotSplit> iterator = remainingSplits.iterator();
                  MySqlSnapshotSplit split = iterator.next();
                  iterator.remove();

                  // note: assigned splits are stored in the assignedSplits collection
                  assignedSplits.put(split.splitId(), split);

                  return Optional.of(split);
              } else {
                  // note: remainingTables was filled with the tables to read during initialization
                  TableId nextTable = remainingTables.pollFirst();
                  if (nextTable != null) {
                      // split the given table into chunks (snapshot splits)
                      // note: the ChunkSplitter was created during initialization; generateSplits does the splitting
                      Collection<MySqlSnapshotSplit> splits = chunkSplitter.generateSplits(nextTable);
                      // note: keep all split information
                      remainingSplits.addAll(splits);
                      // note: tables that have already been split
                      alreadyProcessedTables.add(nextTable);
                      // note: recursive call to this method
                      return getNext();
                  } else {
                      return Optional.empty();
                  }
              }
          }
          4. ChunkSplitter splits the table into evenly or unevenly distributed chunks. The table being read must have a physical primary key.
          public Collection<MySqlSnapshotSplit> generateSplits(TableId tableId) {

              Table schema = mySqlSchema.getTableSchema(tableId).getTable();
              List<Column> primaryKeys = schema.primaryKeyColumns();
              // note: a primary key is required
              if (primaryKeys.isEmpty()) {
                  throw new ValidationException(
                          String.format(
                                  "Incremental snapshot for tables requires primary key,"
                                          + " but table %s doesn't have primary key.",
                                  tableId));
              }
              // use first field in primary key as the split key
              Column splitColumn = primaryKeys.get(0);

              final List<ChunkRange> chunks;
              try {
                  // note: split the data into chunks by the primary key column
                  chunks = splitTableIntoChunks(tableId, splitColumn);
              } catch (SQLException e) {
                  throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e);
              }
              // note: convert the primary key type and wrap each ChunkRange in a MySqlSnapshotSplit
              // convert chunks into splits
              List<MySqlSnapshotSplit> splits = new ArrayList<>();
              RowType splitType = splitType(splitColumn);

              for (int i = 0; i < chunks.size(); i++) {
                  ChunkRange chunk = chunks.get(i);
                  MySqlSnapshotSplit split =
                          createSnapshotSplit(
                                  tableId, i, splitType, chunk.getChunkStart(), chunk.getChunkEnd());
                  splits.add(split);
              }
              return splits;
          }
          5. splitTableIntoChunks splits the table into chunks by the physical primary key.
          private List<ChunkRange> splitTableIntoChunks(TableId tableId, Column splitColumn)
                  throws SQLException {
              final String splitColumnName = splitColumn.name();
              //  select min, max
              final Object[] minMaxOfSplitColumn = queryMinMax(jdbc, tableId, splitColumnName);
              final Object min = minMaxOfSplitColumn[0];
              final Object max = minMaxOfSplitColumn[1];
              if (min == null || max == null || min.equals(max)) {
                  // empty table, or only one row, return full table scan as a chunk
                  return Collections.singletonList(ChunkRange.all());
              }

              final List<ChunkRange> chunks;
              if (splitColumnEvenlyDistributed(splitColumn)) {
                  // use evenly-sized chunks which is much efficient
                  // note: even split by primary key
                  chunks = splitEvenlySizedChunks(min, max);
              } else {
                  // note: uneven split by primary key
                  // use unevenly-sized chunks which will request many queries and is not efficient.
                  chunks = splitUnevenlySizedChunks(tableId, splitColumnName, min, max);
              }

              return chunks;
          }

          /** Checks whether split column is evenly distributed across its range. */
          private static boolean splitColumnEvenlyDistributed(Column splitColumn) {
              // only column is auto-incremental are recognized as evenly distributed.
              // TODO: we may use MAX,MIN,COUNT to calculate the distribution in the future.
              if (splitColumn.isAutoIncremented()) {
                  DataType flinkType = MySqlTypeUtils.fromDbzColumn(splitColumn);
                  LogicalTypeRoot typeRoot = flinkType.getLogicalType().getTypeRoot();
                  // currently, we only support split column with type BIGINT, INT, DECIMAL
                  return typeRoot == LogicalTypeRoot.BIGINT
                          || typeRoot == LogicalTypeRoot.INTEGER
                          || typeRoot == LogicalTypeRoot.DECIMAL;
              } else {
                  return false;
              }
          }


          /**
           * Split table into evenly sized chunks based on the numeric min and max value of split column,
           * and tumble chunks in {@link #chunkSize} step size.
           */
          private List<ChunkRange> splitEvenlySizedChunks(Object min, Object max) {
              if (ObjectUtils.compare(ObjectUtils.plus(min, chunkSize), max) > 0) {
                  // there is no more than one chunk, return full table as a chunk
                  return Collections.singletonList(ChunkRange.all());
              }

              final List<ChunkRange> splits = new ArrayList<>();
              Object chunkStart = null;
              Object chunkEnd = ObjectUtils.plus(min, chunkSize);
              //  chunkEnd <= max
              while (ObjectUtils.compare(chunkEnd, max) <= 0) {
                  splits.add(ChunkRange.of(chunkStart, chunkEnd));
                  chunkStart = chunkEnd;
                  chunkEnd = ObjectUtils.plus(chunkEnd, chunkSize);
              }
              // add the ending split
              splits.add(ChunkRange.of(chunkStart, null));
              return splits;
          }

          /** Split table into unevenly sized chunks by continuously calculating next chunk max value. */
          private List<ChunkRange> splitUnevenlySizedChunks(
                  TableId tableId, String splitColumnName, Object min, Object max) throws SQLException {
              final List<ChunkRange> splits = new ArrayList<>();
              Object chunkStart = null;

              Object chunkEnd = nextChunkEnd(min, tableId, splitColumnName, max);
              int count = 0;
              while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
                  // we start from [null, min + chunk_size) and avoid [null, min)
                  splits.add(ChunkRange.of(chunkStart, chunkEnd));
                  // may sleep a while to avoid DDOS on MySQL server
                  maySleep(count++);
                  chunkStart = chunkEnd;
                  chunkEnd = nextChunkEnd(chunkEnd, tableId, splitColumnName, max);
              }
              // add the ending split
              splits.add(ChunkRange.of(chunkStart, null));
              return splits;
          }

          private Object nextChunkEnd(
                  Object previousChunkEnd, TableId tableId, String splitColumnName, Object max)
                  throws SQLException {
              // chunk end might be null when max values are removed
              Object chunkEnd =
                      queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd);
              if (Objects.equals(previousChunkEnd, chunkEnd)) {
                  // we don't allow equal chunk start and end,
                  // should query the next one larger than chunkEnd
                  chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd);
              }
              if (ObjectUtils.compare(chunkEnd, max) >= 0) {
                  return null;
              } else {
                  return chunkEnd;
              }
          }

          MySqlSourceReader handling split assignments

          After MySqlSourceReader receives a split assignment, it first creates a SplitFetcher thread, then adds an AddSplitsTask to taskQueue and runs it to register the splits. Next it runs FetchTask, which reads data through the Debezium API. The data read is stored in elementsQueue; SourceReaderBase takes data from that queue and hands it to MySqlRecordEmitter.

          1. When the split assignment event is handled, create a SplitFetcher and add an AddSplitsTask to taskQueue.
          SingleThreadFetcherManager#addSplits
          public void addSplits(List<SplitT> splitsToAdd) {
              SplitFetcher<E, SplitT> fetcher = getRunningFetcher();
              if (fetcher == null) {
                  fetcher = createSplitFetcher();
                  // Add the splits to the fetchers.
                  fetcher.addSplits(splitsToAdd);
                  startFetcher(fetcher);
              } else {
                  fetcher.addSplits(splitsToAdd);
              }
          }

          // note: create the SplitFetcher
          protected synchronized SplitFetcher<E, SplitT> createSplitFetcher() {
              if (closed) {
                  throw new IllegalStateException("The split fetcher manager has closed.");
              }
              // Create SplitReader.
              SplitReader<E, SplitT> splitReader = splitReaderFactory.get();

              int fetcherId = fetcherIdGenerator.getAndIncrement();
              SplitFetcher<E, SplitT> splitFetcher =
                      new SplitFetcher<>(
                              fetcherId,
                              elementsQueue,
                              splitReader,
                              errorHandler,
                              () -> {
                                  fetchers.remove(fetcherId);
                                  elementsQueue.notifyAvailable();
                              });
              fetchers.put(fetcherId, splitFetcher);
              return splitFetcher;
          }

          SplitFetcher#addSplits
          public void addSplits(List<SplitT> splitsToAdd) {
              enqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, assignedSplits));
              wakeUp(true);
          }

          2. Run the SplitFetcher thread: the first iteration executes the AddSplitsTask to add the splits, and later iterations execute the FetchTask to pull data.
          SplitFetcher#runOnce
          void runOnce() {
              try {
                  if (shouldRunFetchTask()) {
                      runningTask = fetchTask;
                  } else {
                      runningTask = taskQueue.take();
                  }

                  if (!wakeUp.get() && runningTask.run()) {
                      LOG.debug("Finished running task {}", runningTask);
                      runningTask = null;
                      checkAndSetIdle();
                  }
              } catch (Exception e) {
                  throw new RuntimeException(
                          String.format(
                                  "SplitFetcher thread %d received unexpected exception while polling the records",
                                  id),
                          e);
              }

              maybeEnqueueTask(runningTask);
              synchronized (wakeUp) {
                  // Set the running task to null. It is necessary for the shutdown method to avoid
                  // unnecessarily interrupt the running task.
                  runningTask = null;
                  // Set the wakeUp flag to false.
                  wakeUp.set(false);
                  LOG.debug("Cleaned wakeup flag.");
              }
          }
          3. AddSplitsTask calls handleSplitsChanges on MySqlSplitReader, which adds the assigned splits to the split queue. On the next fetch() call, a split is taken from the queue and its data is read.
          AddSplitsTask#run
          public boolean run() {
              for (SplitT s : splitsToAdd) {
                  assignedSplits.put(s.splitId(), s);
              }
              splitReader.handleSplitsChanges(new SplitsAddition<>(splitsToAdd));
              return true;
          }
          MySqlSplitReader#handleSplitsChanges
          public void handleSplitsChanges(SplitsChange<MySqlSplit> splitsChanges) {
              if (!(splitsChanges instanceof SplitsAddition)) {
                  throw new UnsupportedOperationException(
                          String.format(
                                  "The SplitChange type of %s is not supported.",
                                  splitsChanges.getClass()));
              }

              // note: add the splits to the queue
              splits.addAll(splitsChanges.splits());
          }
          4. MySqlSplitReader runs fetch(): the DebeziumReader reads data into the event queue, and after correction the data is returned in MySqlRecords format.
          MySqlSplitReader#fetch
          @Override
          public RecordsWithSplitIds<SourceRecord> fetch() throws IOException {
              // note: create the reader and start reading
              checkSplitOrStartNext();

              Iterator<SourceRecord> dataIt = null;
              try {
                  // note: correct the data that was read
                  dataIt = currentReader.pollSplitRecords();
              } catch (InterruptedException e) {
                  LOG.warn("fetch data failed.", e);
                  throw new IOException(e);
              }

              // note: the returned data is wrapped in MySqlRecords for transfer
              return dataIt == null
                      ? finishedSnapshotSplit()
                      : MySqlRecords.forRecords(currentSplitId, dataIt);
          }

          private void checkSplitOrStartNext() throws IOException {
              // the binlog reader should keep alive
              if (currentReader instanceof BinlogSplitReader) {
                  return;
              }

              if (canAssignNextSplit()) {
                  // note: take a MySqlSplit from the split queue
                  final MySqlSplit nextSplit = splits.poll();
                  if (nextSplit == null) {
                      throw new IOException("Cannot fetch from another split - no split remaining");
                  }

                  currentSplitId = nextSplit.splitId();
                  // note: distinguish between snapshot split reading and binlog split reading
                  if (nextSplit.isSnapshotSplit()) {
                      if (currentReader == null) {
                          final MySqlConnection jdbcConnection = getConnection(config);
                          final BinaryLogClient binaryLogClient = getBinaryClient(config);

                          final StatefulTaskContext statefulTaskContext =
                                  new StatefulTaskContext(config, binaryLogClient, jdbcConnection);
                          // note: create the SnapshotSplitReader, which uses the Debezium API to read
                          // the chunk data and the binlog events in the chunk's offset range
                          currentReader = new SnapshotSplitReader(statefulTaskContext, subtaskId);
                      }

                  } else {
                      // point from snapshot split to binlog split
                      if (currentReader != null) {
                          LOG.info("It's turn to read binlog split, close current snapshot reader");
                          currentReader.close();
                      }

                      final MySqlConnection jdbcConnection = getConnection(config);
                      final BinaryLogClient binaryLogClient = getBinaryClient(config);
                      final StatefulTaskContext statefulTaskContext =
                              new StatefulTaskContext(config, binaryLogClient, jdbcConnection);
                      LOG.info("Create binlog reader");
                      // note: create the BinlogSplitReader, which uses the Debezium API for incremental reading
                      currentReader = new BinlogSplitReader(statefulTaskContext, subtaskId);
                  }
                  // note: submit the split to the reader to start reading
                  currentReader.submitSplit(nextSplit);
              }
          }

          DebeziumReader data processing

          DebeziumReader covers two phases: snapshot split reading and binlog (incremental) split reading. Data that has been read is stored in the ChangeEventQueue and is normalized when pollSplitRecords is executed.

          1. SnapshotSplitReader snapshot split reading. In the snapshot phase, a SELECT statement queries the table rows that fall within the split's range. SHOW MASTER STATUS is executed before and after the rows are written to the queue, and the current binlog offset is written each time.
          public void submitSplit(MySqlSplit mySqlSplit) {
              ......
              executor.submit(
                      () -> {
                          try {
                              currentTaskRunning = true;
                              // note: read the data, inserting the current binlog offset before and after it
                              // 1. execute snapshot read task.
                              final SnapshotSplitChangeEventSourceContextImpl sourceContext =
                                      new SnapshotSplitChangeEventSourceContextImpl();
                              SnapshotResult snapshotResult =
                                      splitSnapshotReadTask.execute(sourceContext);

                              // note: prepare for the binlog read; the split carries the starting offset
                              final MySqlBinlogSplit appendBinlogSplit = createBinlogSplit(sourceContext);
                              final MySqlOffsetContext mySqlOffsetContext =
                                      statefulTaskContext.getOffsetContext();
                              mySqlOffsetContext.setBinlogStartPoint(
                                      appendBinlogSplit.getStartingOffset().getFilename(),
                                      appendBinlogSplit.getStartingOffset().getPosition());

                              // note: start reading from the starting offset
                              // 2. execute binlog read task
                              if (snapshotResult.isCompletedOrSkipped()) {
                                  // we should only capture events for the current table,
                                  Configuration dezConf =
                                          statefulTaskContext
                                                  .getDezConf()
                                                  .edit()
                                                  .with(
                                                          "table.whitelist",
                                                          currentSnapshotSplit.getTableId())
                                                  .build();

                                  // task to read binlog for current split
                                  MySqlBinlogSplitReadTask splitBinlogReadTask =
                                          new MySqlBinlogSplitReadTask(
                                                  new MySqlConnectorConfig(dezConf),
                                                  mySqlOffsetContext,
                                                  statefulTaskContext.getConnection(),
                                                  statefulTaskContext.getDispatcher(),
                                                  statefulTaskContext.getErrorHandler(),
                                                  StatefulTaskContext.getClock(),
                                                  statefulTaskContext.getTaskContext(),
                                                  (MySqlStreamingChangeEventSourceMetrics)
                                                          statefulTaskContext
                                                                  .getStreamingChangeEventSourceMetrics(),
                                                  statefulTaskContext
                                                          .getTopicSelector()
                                                          .getPrimaryTopic(),
                                                  appendBinlogSplit);

                                  splitBinlogReadTask.execute(
                                          new SnapshotBinlogSplitChangeEventSourceContextImpl());
                              } else {
                                  readException =
                                          new IllegalStateException(
                                                  String.format(
                                                          "Read snapshot for mysql split %s fail",
                                                          currentSnapshotSplit));
                              }
                          } catch (Exception e) {
                              currentTaskRunning = false;
                              LOG.error(
                                      String.format(
                                              "Execute snapshot read task for mysql split %s fail",
                                              currentSnapshotSplit),
                                      e);
                              readException = e;
                          }
                      });
          }
          2. SnapshotSplitReader binlog read for a snapshot split. The key point here is deciding when the BinlogSplitReadTask stops: it terminates as soon as it has read up to the ending offset recorded for the split.
          MySqlBinlogSplitReadTask#handleEvent
          protected void handleEvent(Event event) {
              // note: dispatch the event to the queue
              super.handleEvent(event);
              // note: during the snapshot phase the binlog read must be terminated
              // check do we need to stop for read binlog for snapshot split.
              if (isBoundedRead()) {
                  final BinlogOffset currentBinlogOffset =
                          new BinlogOffset(
                                  offsetContext.getOffset().get(BINLOG_FILENAME_OFFSET_KEY).toString(),
                                  Long.parseLong(
                                          offsetContext
                                                  .getOffset()
                                                  .get(BINLOG_POSITION_OFFSET_KEY)
                                                  .toString()));
                  // note: stop reading once currentBinlogOffset has reached the HW (the ending offset);
                  // despite its name, isAtOrBefore serves here as the "reached the high watermark" check
                  // reach the high watermark, the binlog reader should finished
                  if (currentBinlogOffset.isAtOrBefore(binlogSplit.getEndingOffset())) {
                      // send binlog end event
                      try {
                          signalEventDispatcher.dispatchWatermarkEvent(
                                  binlogSplit,
                                  currentBinlogOffset,
                                  SignalEventDispatcher.WatermarkKind.BINLOG_END);
                      } catch (InterruptedException e) {
                          logger.error("Send signal event error.", e);
                          errorHandler.setProducerThrowable(
                                  new DebeziumException("Error processing binlog signal event", e));
                      }
                      // note: terminate the binlog read
                      // tell reader the binlog task finished
                      ((SnapshotBinlogSplitChangeEventSourceContextImpl) context).finished();
                  }
              }
          }
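
The stop condition of the bounded read can be illustrated with a small self-contained sketch. It is only a model under stated assumptions: offsets are plain `long` values, and the names `BoundedBinlogReadSketch` and `readUntil` are hypothetical, not part of Flink CDC. The real task compares `BinlogOffset` objects and emits a BINLOG_END signal event.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of a bounded binlog read; not Flink CDC code.
class BoundedBinlogReadSketch {

    /**
     * Consumes event offsets in order and stops right after the first event whose
     * offset is at or after endingOffset (the split's high watermark).
     */
    static List<Long> readUntil(List<Long> eventOffsets, long endingOffset) {
        List<Long> dispatched = new ArrayList<>();
        for (long offset : eventOffsets) {
            // the event is dispatched first, mirroring super.handleEvent(event)
            dispatched.add(offset);
            if (offset >= endingOffset) {
                // at this point the real task sends BINLOG_END and calls finished()
                break;
            }
        }
        return dispatched;
    }

    public static void main(String[] args) {
        List<Long> offsets = Arrays.asList(100L, 150L, 200L, 250L, 300L);
        // prints [100, 150, 200]
        System.out.println(readUntil(offsets, 200L));
    }
}
```

Events after the ending offset are never dispatched for this split; they are left to the later unbounded binlog phase.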
          3. SnapshotSplitReader normalizes the raw data in the queue when pollSplitRecords is executed. See RecordUtils#normalizedSplitRecords for the detailed logic.
          public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
              if (hasNextElement.get()) {
                  // data input: [low watermark event][snapshot events][high watermark event][binlog events][binlog-end event]
                  // data output: [low watermark event][normalized events][high watermark event]
                  boolean reachBinlogEnd = false;
                  final List<SourceRecord> sourceRecords = new ArrayList<>();
                  while (!reachBinlogEnd) {
                      // note: process the DataChangeEvent events written to the queue
                      List<DataChangeEvent> batch = queue.poll();
                      for (DataChangeEvent event : batch) {
                          sourceRecords.add(event.getRecord());
                          if (RecordUtils.isEndWatermarkEvent(event.getRecord())) {
                              reachBinlogEnd = true;
                              break;
                          }
                      }
                  }
                  // snapshot split return its data once
                  hasNextElement.set(false);
                  // note: ************ normalize the data ************
                  return normalizedSplitRecords(currentSnapshotSplit, sourceRecords, nameAdjuster)
                          .iterator();
              }
              // the data has been polled, no more data
              reachEnd.compareAndSet(false, true);
              return null;
          }
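
RecordUtils#normalizedSplitRecords itself is not listed here. The following is a simplified sketch of the idea suggested by the input/output comment above: binlog changes captured for the split are replayed on top of the snapshot rows, keyed by primary key. Everything in it is an assumption for illustration (rows as a `Map<Integer, String>`, the `Change` and `Op` types, the class name `SplitNormalizeSketch`); the real method works on SourceRecord objects and also handles the watermark events.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of split normalization; not the RecordUtils code.
class SplitNormalizeSketch {

    enum Op { CREATE, UPDATE, DELETE }

    // A binlog change captured between the low and high watermark of a split.
    static final class Change {
        final Op op;
        final int key;
        final String value; // ignored for DELETE

        Change(Op op, int key, String value) {
            this.op = op;
            this.key = key;
            this.value = value;
        }
    }

    /** Replays the binlog changes on top of the snapshot rows, keyed by primary key. */
    static Map<Integer, String> normalize(
            Map<Integer, String> snapshotRows, List<Change> changes) {
        Map<Integer, String> result = new LinkedHashMap<>(snapshotRows);
        for (Change c : changes) {
            if (c.op == Op.DELETE) {
                result.remove(c.key);
            } else {
                // CREATE and UPDATE both overwrite (upsert) the row for this key
                result.put(c.key, c.value);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, String> snapshot = new LinkedHashMap<>();
        snapshot.put(1, "a");
        snapshot.put(2, "b");
        List<Change> changes = Arrays.asList(
                new Change(Op.UPDATE, 1, "a2"),
                new Change(Op.DELETE, 2, null),
                new Change(Op.CREATE, 3, "c"));
        // prints {1=a2, 3=c}
        System.out.println(normalize(snapshot, changes));
    }
}
```

The result is the state of the split's rows as of the high watermark, which is why each snapshot split can be read without locking the table.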
          4. BinlogSplitReader data reading. The read logic is fairly simple; the key point is the starting offset, which is set to the smallest high watermark (HW) among all snapshot splits.

          5. BinlogSplitReader filters the raw data in the queue when pollSplitRecords is executed, which guarantees data consistency. The binlog read in the incremental phase is unbounded, so all data is dispatched to the event queue, and BinlogSplitReader uses shouldEmit() to decide whether each record is emitted.

          BinlogSplitReader#pollSplitRecords
          public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
              checkReadException();
              final List<SourceRecord> sourceRecords = new ArrayList<>();
              if (currentTaskRunning) {
                  List<DataChangeEvent> batch = queue.poll();
                  for (DataChangeEvent event : batch) {
                      if (shouldEmit(event.getRecord())) {
                          sourceRecords.add(event.getRecord());
                      }
                  }
              }
              return sourceRecords.iterator();
          }

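          An event is emitted when either of the following conditions holds:

          1. The position of the newly received event has reached the maximum high watermark (maxwm) of the table's snapshot splits.

          2. The key of the record falls within some snapshot split and the record's offset has reached that split's high watermark (HWM).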

          /**
           * Returns the record should emit or not.
           *
           * The watermark signal algorithm is the binlog split reader only sends the binlog event that
           * belongs to its finished snapshot splits. For each snapshot split, the binlog event is valid
           * since the offset is after its high watermark.
           *
           * E.g: the data input is :
           *    snapshot-split-0 info : [0,    1024) highWatermark0
           *    snapshot-split-1 info : [1024, 2048) highWatermark1
           *  the data output is:
           *  only the binlog event belong to [0,    1024) and offset is after highWatermark0 should send,
           *  only the binlog event belong to [1024, 2048) and offset is after highWatermark1 should send.
           */
          private boolean shouldEmit(SourceRecord sourceRecord) {
              if (isDataChangeRecord(sourceRecord)) {
                  TableId tableId = getTableId(sourceRecord);
                  BinlogOffset position = getBinlogPosition(sourceRecord);
                  // aligned, all snapshot splits of the table has reached max highWatermark

                  // note: the new event's position has reached maxwm, emit it directly
                  if (position.isAtOrBefore(maxSplitHighWatermarkMap.get(tableId))) {
                      return true;
                  }
                  Object[] key =
                          getSplitKey(
                                  currentBinlogSplit.getSplitKeyType(),
                                  sourceRecord,
                                  statefulTaskContext.getSchemaNameAdjuster());

                  for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
                      // note: the record's key belongs to this snapshot split and its offset
                      // has reached the split's HWM, so emit the record
                      if (RecordUtils.splitKeyRangeContains(
                                      key, splitInfo.getSplitStart(), splitInfo.getSplitEnd())
                              && position.isAtOrBefore(splitInfo.getHighWatermark())) {
                          return true;
                      }
                  }
                  // not in the monitored splits scope, do not emit
                  return false;
              }

              // always send the schema change event and signal event
              // we need record them to state of Flink
              return true;
          }
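
The two emit conditions can be tried out with a small self-contained model. This is a sketch under stated assumptions, not the Flink CDC implementation: keys and offsets are plain `long` values, "reached the high watermark" is read as `>=`, and `ShouldEmitSketch` and `SplitInfo` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the emit decision; offsets and keys are plain numbers.
class ShouldEmitSketch {

    // Key range [start, end) of a finished snapshot split plus its high watermark.
    static final class SplitInfo {
        final long start;
        final long end;
        final long highWatermark;

        SplitInfo(long start, long end, long highWatermark) {
            this.start = start;
            this.end = end;
            this.highWatermark = highWatermark;
        }
    }

    static boolean shouldEmit(long key, long offset, List<SplitInfo> splits) {
        long maxHighWatermark = Long.MIN_VALUE;
        for (SplitInfo s : splits) {
            maxHighWatermark = Math.max(maxHighWatermark, s.highWatermark);
        }
        // condition 1: past the largest high watermark, every split is aligned
        if (offset >= maxHighWatermark) {
            return true;
        }
        // condition 2: the split owning this key has already passed its high watermark
        for (SplitInfo s : splits) {
            if (key >= s.start && key < s.end && offset >= s.highWatermark) {
                return true;
            }
        }
        // the change is already reflected in the snapshot data of its split
        return false;
    }

    public static void main(String[] args) {
        List<SplitInfo> splits = Arrays.asList(
                new SplitInfo(0, 1024, 500),
                new SplitInfo(1024, 2048, 800));
        System.out.println(shouldEmit(10, 600, splits));   // true: split 0, offset past 500
        System.out.println(shouldEmit(1500, 600, splits)); // false: split 1, offset before 800
        System.out.println(shouldEmit(1500, 900, splits)); // true: past the max high watermark
    }
}
```

Dropping the second event is what prevents duplicates: a change at offset 600 to a key in split 1 was already folded into that split's snapshot result, whose high watermark is 800.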

          MySqlRecordEmitter data emission

          SourceReaderBase takes the collection of DataChangeEvent records read for a split from the queue and converts them from Debezium's DataChangeEvent type to Flink's RowData type.

          1. How SourceReaderBase processes split data
          org.apache.flink.connector.base.source.reader.SourceReaderBase#pollNext
          public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
              // make sure we have a fetch we are working on, or move to the next
              RecordsWithSplitIds<E> recordsWithSplitId = this.currentFetch;
              if (recordsWithSplitId == null) {
                  recordsWithSplitId = getNextFetch(output);
                  if (recordsWithSplitId == null) {
                      return trace(finishedOrAvailableLater());
                  }
              }

              // we need to loop here, because we may have to go across splits
              while (true) {
                  // Process one record.
                  // note: read a single record from the iterator held by MySqlRecords
                  final E record = recordsWithSplitId.nextRecordFromSplit();
                  if (record != null) {
                      // emit the record.
                      recordEmitter.emitRecord(record, currentSplitOutput, currentSplitContext.state);
                      LOG.trace("Emitted record: {}", record);

                      // We always emit MORE_AVAILABLE here, even though we do not strictly know whether
                      // more is available. If nothing more is available, the next invocation will find
                      // this out and return the correct status.
                      // That means we emit the occasional 'false positive' for availability, but this
                      // saves us doing checks for every record. Ultimately, this is cheaper.
                      return trace(InputStatus.MORE_AVAILABLE);
                  } else if (!moveToNextSplit(recordsWithSplitId, output)) {
                      // The fetch is done and we just discovered that and have not emitted anything, yet.
                      // We need to move to the next fetch. As a shortcut, we call pollNext() here again,
                      // rather than emitting nothing and waiting for the caller to call us again.
                      return pollNext(output);
                  }
                  // else fall through the loop
              }
          }

          private RecordsWithSplitIds<E> getNextFetch(final ReaderOutput<T> output) {
              splitFetcherManager.checkErrors();

              LOG.trace("Getting next source data batch from queue");
              // note: fetch data from elementsQueue
              final RecordsWithSplitIds<E> recordsWithSplitId = elementsQueue.poll();
              if (recordsWithSplitId == null || !moveToNextSplit(recordsWithSplitId, output)) {
                  return null;
              }

              currentFetch = recordsWithSplitId;
              return recordsWithSplitId;
          }
          2. MySqlRecords returns the records of the collection one at a time.
          com.ververica.cdc.connectors.mysql.source.split.MySqlRecords#nextRecordFromSplit

          public SourceRecord nextRecordFromSplit() {
              final Iterator<SourceRecord> recordsForSplit = this.recordsForCurrentSplit;
              if (recordsForSplit != null) {
                  if (recordsForSplit.hasNext()) {
                      return recordsForSplit.next();
                  } else {
                      return null;
                  }
              } else {
                  throw new IllegalStateException();
              }
          }
          3. MySqlRecordEmitter converts the data to RowData via RowDataDebeziumDeserializeSchema.
          com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter#emitRecord
          public void emitRecord(SourceRecord element, SourceOutput<T> output, MySqlSplitState splitState)
                  throws Exception {
              if (isWatermarkEvent(element)) {
                  BinlogOffset watermark = getWatermark(element);
                  if (isHighWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
                      splitState.asSnapshotSplitState().setHighWatermark(watermark);
                  }
              } else if (isSchemaChangeEvent(element) && splitState.isBinlogSplitState()) {
                  HistoryRecord historyRecord = getHistoryRecord(element);
                  Array tableChanges =
                          historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
                  TableChanges changes = TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
                  for (TableChanges.TableChange tableChange : changes) {
                      splitState.asBinlogSplitState().recordSchema(tableChange.getId(), tableChange);
                  }
              } else if (isDataChangeRecord(element)) {
                  // note: handle data change records
                  if (splitState.isBinlogSplitState()) {
                      BinlogOffset position = getBinlogPosition(element);
                      splitState.asBinlogSplitState().setStartingOffset(position);
                  }
                  debeziumDeserializationSchema.deserialize(
                          element,
                          new Collector<T>() {
                              @Override
                              public void collect(final T t) {
                                  output.collect(t);
                              }

                              @Override
                              public void close() {
                                  // do nothing
                              }
                          });
              } else {
                  // unknown element
                  LOG.info("Meet unknown element {}, just skip.", element);
              }
          }

          RowDataDebeziumDeserializeSchema deserialization process.

          com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema#deserialize
          public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
              Envelope.Operation op = Envelope.operationFor(record);
              Struct value = (Struct) record.value();
              Schema valueSchema = record.valueSchema();
              if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
                  GenericRowData insert = extractAfterRow(value, valueSchema);
                  validator.validate(insert, RowKind.INSERT);
                  insert.setRowKind(RowKind.INSERT);
                  out.collect(insert);
              } else if (op == Envelope.Operation.DELETE) {
                  GenericRowData delete = extractBeforeRow(value, valueSchema);
                  validator.validate(delete, RowKind.DELETE);
                  delete.setRowKind(RowKind.DELETE);
                  out.collect(delete);
              } else {
                  GenericRowData before = extractBeforeRow(value, valueSchema);
                  validator.validate(before, RowKind.UPDATE_BEFORE);
                  before.setRowKind(RowKind.UPDATE_BEFORE);
                  out.collect(before);

                  GenericRowData after = extractAfterRow(value, valueSchema);
                  validator.validate(after, RowKind.UPDATE_AFTER);
                  after.setRowKind(RowKind.UPDATE_AFTER);
                  out.collect(after);
              }
          }
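
The branch structure above determines how many rows each change produces downstream. As a minimal sketch, the mapping can be reproduced without any Flink or Debezium dependency; the enums `Operation` and `Kind` below are hypothetical stand-ins for `Envelope.Operation` and `RowKind`, and `RowKindMappingSketch` is an illustrative name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for Debezium's Envelope.Operation and Flink's RowKind.
class RowKindMappingSketch {

    enum Operation { CREATE, READ, UPDATE, DELETE }

    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Returns the row kinds emitted for one change event, in emission order. */
    static List<Kind> toKinds(Operation op) {
        List<Kind> kinds = new ArrayList<>();
        if (op == Operation.CREATE || op == Operation.READ) {
            // snapshot reads and inserts both become a single INSERT row
            kinds.add(Kind.INSERT);
        } else if (op == Operation.DELETE) {
            kinds.add(Kind.DELETE);
        } else {
            // an update is split into two rows: the old image, then the new image
            kinds.add(Kind.UPDATE_BEFORE);
            kinds.add(Kind.UPDATE_AFTER);
        }
        return kinds;
    }

    public static void main(String[] args) {
        for (Operation op : Operation.values()) {
            System.out.println(op + " -> " + toKinds(op));
        }
    }
}
```

This is why a single UPDATE in MySQL shows up as two changelog rows, while rows read during the snapshot phase arrive as INSERT.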

          MySqlSourceReader reports split-finished events

          After MySqlSourceReader finishes processing a snapshot split, it sends the finished split's information, including the split ID and the HighWatermark, to MySqlSourceEnumerator, and then goes on to send another split request.

          com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader#onSplitFinished
          protected void onSplitFinished(Map<String, MySqlSplitState> finishedSplitIds) {
              for (MySqlSplitState mySqlSplitState : finishedSplitIds.values()) {
                  MySqlSplit mySqlSplit = mySqlSplitState.toMySqlSplit();

                  finishedUnackedSplits.put(mySqlSplit.splitId(), mySqlSplit.asSnapshotSplit());
              }
              // note: send the split-finished event
              reportFinishedSnapshotSplitsIfNeed();

              // note: once the previous split is finished, request the next split
              context.sendSplitRequest();
          }

          private void reportFinishedSnapshotSplitsIfNeed() {
              if (!finishedUnackedSplits.isEmpty()) {
                  final Map<String, BinlogOffset> finishedOffsets = new HashMap<>();
                  for (MySqlSnapshotSplit split : finishedUnackedSplits.values()) {
                      // note: send the split ID together with its high watermark offset
                      finishedOffsets.put(split.splitId(), split.getHighWatermark());
                  }
                  FinishedSnapshotSplitsReportEvent reportEvent =
                          new FinishedSnapshotSplitsReportEvent(finishedOffsets);

                  context.sendSourceEventToCoordinator(reportEvent);
                  LOG.debug(
                          "The subtask {} reports offsets of finished snapshot splits {}.",
                          subtaskId,
                          finishedOffsets);
              }
          }

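          MySqlSourceEnumerator assigns the binlog split

          Once all splits of the snapshot phase have been read, MySqlHybridSplitAssigner creates a BinlogSplit for the subsequent incremental read. When creating the BinlogSplit, it selects the smallest BinlogOffset among all finished snapshot splits. Note: on the 2.0.0 branch, the minimum offset in createBinlogSplit always starts from 0; the latest master branch has fixed this bug.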

          private MySqlBinlogSplit createBinlogSplit() {
              final List<MySqlSnapshotSplit> assignedSnapshotSplit =
                      snapshotSplitAssigner.getAssignedSplits().values().stream()
                              .sorted(Comparator.comparing(MySqlSplit::splitId))
                              .collect(Collectors.toList());

              Map<String, BinlogOffset> splitFinishedOffsets =
                      snapshotSplitAssigner.getSplitFinishedOffsets();
              final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
              final Map<TableId, TableChange> tableSchemas = new HashMap<>();

              BinlogOffset minBinlogOffset = null;
              // note: select the minimum offset among all assigned snapshot splits
              for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
                  // find the min binlog offset
                  BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
                  if (minBinlogOffset == null || binlogOffset.compareTo(minBinlogOffset) < 0) {
                      minBinlogOffset = binlogOffset;
                  }
                  finishedSnapshotSplitInfos.add(
                          new FinishedSnapshotSplitInfo(
                                  split.getTableId(),
                                  split.splitId(),
                                  split.getSplitStart(),
                                  split.getSplitEnd(),
                                  binlogOffset));
                  tableSchemas.putAll(split.getTableSchemas());
              }

              final MySqlSnapshotSplit lastSnapshotSplit =
                      assignedSnapshotSplit.get(assignedSnapshotSplit.size() - 1).asSnapshotSplit();

              return new MySqlBinlogSplit(
                      BINLOG_SPLIT_ID,
                      lastSnapshotSplit.getSplitKeyType(),
                      minBinlogOffset == null ? BinlogOffset.INITIAL_OFFSET : minBinlogOffset,
                      BinlogOffset.NO_STOPPING_OFFSET,
                      finishedSnapshotSplitInfos,
                      tableSchemas);
          }
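
Why the seed value of the minimum matters can be shown with a short sketch. The 2.0.0 code is not listed in this article, so the first method below is only an assumption about one way a "minimum always starts from 0" bug can arise; offsets are modeled as `long` values and `MinOffsetSketch` and its methods are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of how the seed of a running minimum changes the result.
class MinOffsetSketch {

    // stand-in for BinlogOffset.INITIAL_OFFSET
    static final long INITIAL_OFFSET = 0L;

    /** Seeds the minimum with the initial offset: no real offset is ever smaller. */
    static long minSeededWithInitial(List<Long> finishedOffsets) {
        long min = INITIAL_OFFSET;
        for (long offset : finishedOffsets) {
            if (offset < min) {
                min = offset;
            }
        }
        return min;
    }

    /** Seeds the minimum with null, matching the shape of the listing above. */
    static long minSeededWithNull(List<Long> finishedOffsets) {
        Long min = null;
        for (Long offset : finishedOffsets) {
            if (min == null || offset < min) {
                min = offset;
            }
        }
        // fall back to the initial offset only when there are no finished splits
        return min == null ? INITIAL_OFFSET : min;
    }

    public static void main(String[] args) {
        List<Long> highWatermarks = Arrays.asList(300L, 120L, 450L);
        System.out.println(minSeededWithInitial(highWatermarks)); // prints 0
        System.out.println(minSeededWithNull(highWatermarks));    // prints 120
    }
}
```

Starting the binlog split from offset 0 would still produce correct output, because shouldEmit() filters records below each split's high watermark, but the reader would scan far more binlog than necessary.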

