A Complete Walkthrough of the Flink CDC 2.0 Data Processing Flow

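In August 2021, Flink CDC released version 2.0.0. Compared with 1.0, the full (snapshot) read phase now supports distributed reading and checkpoints. Data consistency is also guaranteed across the full + incremental read without locking tables.
The data reading logic of Flink CDC 2.0 is not complicated in itself. The difficulty lies in the design of FLIP-27: Refactor Source Interface and in unfamiliarity with the Debezium API. This article focuses on the processing logic of Flink CDC and does not go into detail on the FLIP-27 design or the Debezium API calls.
The article first uses a Flink SQL example to show how Flink CDC 2.0 is used. It then introduces the core design of CDC: chunk splitting, chunk reading, and incremental reading. Finally, it walks through the code of the flink-mysql-cdc interfaces that are called during data processing, together with their implementations.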
Example
Read a MySQL table in full + incremental mode and write it to Kafka in changelog-json format, then observe the RowKind values and the number of records affected.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
    env.setParallelism(3);
    // note: incremental sync requires checkpointing to be enabled
    env.enableCheckpointing(10000);
    StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, envSettings);

    tableEnvironment.executeSql("CREATE TABLE demoOrders (\n" +
            "  `order_id` INTEGER,\n" +
            "  `order_date` DATE,\n" +
            "  `order_time` TIMESTAMP(3),\n" +
            "  `quantity` INT,\n" +
            "  `product_id` INT,\n" +
            "  `purchaser` STRING,\n" +
            "  primary key (order_id) NOT ENFORCED\n" +
            ") WITH (\n" +
            "  'connector' = 'mysql-cdc',\n" +
            "  'hostname' = 'localhost',\n" +
            "  'port' = '3306',\n" +
            "  'username' = 'cdc',\n" +
            "  'password' = '123456',\n" +
            "  'database-name' = 'test',\n" +
            "  'table-name' = 'demo_orders',\n" +
            // full + incremental sync
            "  'scan.startup.mode' = 'initial'\n" +
            ")");
    tableEnvironment.executeSql("CREATE TABLE sink (\n" +
            "  `order_id` INTEGER,\n" +
            "  `order_date` DATE,\n" +
            "  `order_time` TIMESTAMP(3),\n" +
            "  `quantity` INT,\n" +
            "  `product_id` INT,\n" +
            "  `purchaser` STRING,\n" +
            "  primary key (order_id) NOT ENFORCED\n" +
            ") WITH (\n" +
            "  'connector' = 'kafka',\n" +
            "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
            "  'topic' = 'mqTest02',\n" +
            "  'format' = 'changelog-json'\n" +
            ")");
    tableEnvironment.executeSql("insert into sink select * from demoOrders");
}
Full (snapshot) data output:
{"data":{"order_id":1010,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:12.189","quantity":53,"product_id":502,"purchaser":"flink"},"op":"+I"}
{"data":{"order_id":1009,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:09.709","quantity":31,"product_id":500,"purchaser":"flink"},"op":"+I"}
{"data":{"order_id":1008,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:06.637","quantity":69,"product_id":503,"purchaser":"flink"},"op":"+I"}
{"data":{"order_id":1007,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:03.535","quantity":52,"product_id":502,"purchaser":"flink"},"op":"+I"}
{"data":{"order_id":1002,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:51.347","quantity":69,"product_id":503,"purchaser":"flink"},"op":"+I"}
{"data":{"order_id":1001,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:48.783","quantity":50,"product_id":502,"purchaser":"flink"},"op":"+I"}
{"data":{"order_id":1000,"order_date":"2021-09-17","order_time":"2021-09-17 17:40:32.354","quantity":30,"product_id":500,"purchaser":"flink"},"op":"+I"}
{"data":{"order_id":1006,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:01.249","quantity":31,"product_id":500,"purchaser":"flink"},"op":"+I"}
{"data":{"order_id":1005,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:58.813","quantity":69,"product_id":503,"purchaser":"flink"},"op":"+I"}
{"data":{"order_id":1004,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:56.153","quantity":50,"product_id":502,"purchaser":"flink"},"op":"+I"}
{"data":{"order_id":1003,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:53.727","quantity":30,"product_id":500,"purchaser":"flink"},"op":"+I"}
After modifying the table data, the incremental changes are captured:
## update the row with order_id 1005
{"data":{"order_id":1005,"order_date":"2021-09-17","order_time":"2021-09-22 02:51:58.813","quantity":69,"product_id":503,"purchaser":"flink"},"op":"-U"}
{"data":{"order_id":1005,"order_date":"2021-09-17","order_time":"2021-09-22 02:55:43.627","quantity":80,"product_id":503,"purchaser":"flink"},"op":"+U"}
## delete the row with order_id 1000
{"data":{"order_id":1000,"order_date":"2021-09-17","order_time":"2021-09-17 09:40:32.354","quantity":30,"product_id":500,"purchaser":"flink"},"op":"-D"}
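Core Design
Chunk Splitting
In the full phase, data is read in a distributed way. The table is first split into multiple chunks by primary key, and subtasks then read the data within each chunk's range. Depending on whether the primary key column is an auto-increment integer type, the table is split into either evenly distributed or unevenly distributed chunks.
Even distribution
This applies when the primary key column is auto-increment and of an integral numeric type (INT, BIGINT, DECIMAL). The minimum and maximum of the primary key column are queried, and the data is split evenly by chunkSize. Because the primary key is numeric, each chunk's end can be computed directly from its start plus chunkSize.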
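// compute the value range of the primary key column
select min(`order_id`), max(`order_id`) from demo_orders;
// split the data into chunks of chunkSize; the first chunk has no lower bound and the last has no upper bound
// n = floor((max - min) / chunkSize)
chunk-0: [null, min + chunkSize)
chunk-1: [min + chunkSize, min + 2 * chunkSize)
.......
chunk-last: [min + n * chunkSize, null)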
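The evenly sized splitting above can be sketched in plain Java. This is a minimal, self-contained illustration that assumes a `long` primary key. It mirrors the boundary rules of `splitEvenlySizedChunks` as shown in the code walkthrough. `EvenChunkSketch` and its `Range` type are hypothetical names, not part of flink-mysql-cdc.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: evenly sized chunks over an auto-increment long key. */
final class EvenChunkSketch {

    /** Half-open range [start, end); a null bound means "unbounded on that side". */
    static final class Range {
        final Long start;
        final Long end;

        Range(Long start, Long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    static List<Range> split(long min, long max, long chunkSize) {
        List<Range> chunks = new ArrayList<>();
        if (min + chunkSize > max) {
            // at most one chunk: scan the whole table
            chunks.add(new Range(null, null));
            return chunks;
        }
        Long chunkStart = null; // the first chunk has no lower bound
        long chunkEnd = min + chunkSize;
        while (chunkEnd <= max) {
            chunks.add(new Range(chunkStart, chunkEnd));
            chunkStart = chunkEnd;
            chunkEnd += chunkSize;
        }
        // the last chunk has no upper bound, so keys above max are still covered
        chunks.add(new Range(chunkStart, null));
        return chunks;
    }
}
```

For example, `split(1000, 1010, 4)` yields `[null, 1004)`, `[1004, 1008)` and `[1008, null)`.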
Uneven distribution
This applies when the primary key column is not auto-increment, or is not of an integer type. In this case, each split step sorts the not-yet-split data by primary key in ascending order. The maximum of the first chunkSize rows becomes the end of the current chunk.
// after sorting the unsplit data, take chunkSize rows and use their maximum as the chunk's end
chunkEnd = SELECT MAX(`order_id`) FROM (
        SELECT `order_id` FROM `demo_orders`
        WHERE `order_id` >= [end of the previous chunk]
        ORDER BY `order_id` ASC
        LIMIT [chunkSize]
    ) AS T
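The same idea can be illustrated without a database by letting a sorted in-memory key set stand in for the table. This sketch assumes unique `String` keys. It mirrors `splitUnevenlySizedChunks` and `nextChunkEnd` from the code walkthrough, including the rule that a chunk's start and end may not be equal. `UnevenChunkSketch` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;

/** Hypothetical sketch: unevenly sized chunks over a sorted, non-numeric key (e.g. VARCHAR). */
final class UnevenChunkSketch {

    /** In-memory stand-in for: SELECT MAX(k) FROM (SELECT k FROM t WHERE k >= ? ORDER BY k ASC LIMIT ?) AS T */
    static String queryNextChunkMax(NavigableSet<String> keys, String from, int chunkSize) {
        String last = null;
        int taken = 0;
        for (String key : keys.tailSet(from, true)) {
            if (taken == chunkSize) {
                break;
            }
            last = key;
            taken++;
        }
        return last;
    }

    static String nextChunkEnd(NavigableSet<String> keys, String previousEnd, String max, int chunkSize) {
        String chunkEnd = queryNextChunkMax(keys, previousEnd, chunkSize);
        if (previousEnd.equals(chunkEnd)) {
            // start and end of a chunk must differ: move to the next larger key
            chunkEnd = keys.higher(chunkEnd);
        }
        // reaching max means the remaining rows go into the final, open-ended chunk
        return (chunkEnd == null || chunkEnd.compareTo(max) >= 0) ? null : chunkEnd;
    }

    /** Returns chunks as {start, end} pairs; null means unbounded. */
    static List<String[]> split(NavigableSet<String> keys, int chunkSize) {
        List<String[]> chunks = new ArrayList<>();
        if (keys.size() <= 1) {
            chunks.add(new String[] {null, null}); // empty or single-row table: one full scan
            return chunks;
        }
        String max = keys.last();
        String chunkStart = null;
        String chunkEnd = nextChunkEnd(keys, keys.first(), max, chunkSize);
        while (chunkEnd != null) {
            chunks.add(new String[] {chunkStart, chunkEnd});
            chunkStart = chunkEnd;
            chunkEnd = nextChunkEnd(keys, chunkEnd, max, chunkSize);
        }
        chunks.add(new String[] {chunkStart, null});
        return chunks;
    }
}
```

With keys `a` to `e` and chunkSize = 3, this produces `[null, c)` and `[c, null)`. Because the chunk end itself belongs to the next chunk, each chunk in this sketch holds chunkSize - 1 keys.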
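Reading Snapshot Chunks
Flink splits the table into multiple chunks, and subtasks read the chunk data in parallel without taking locks. Because no lock is held at any point, other transactions may modify rows within a chunk's range while it is being read. The read alone therefore cannot guarantee consistency. In the full phase, Flink guarantees consistency by combining a snapshot read with Binlog-based correction.
Snapshot read
The records within the chunk's range are fetched by running a SQL query over JDBC. The predicate below is equivalent to the half-open range [chunkStart, chunkEnd).
## SQL for reading the snapshot records
SELECT * FROM `test`.`demo_orders`
WHERE order_id >= [chunkStart]
AND NOT (order_id = [chunkEnd])
AND order_id <= [chunkEnd]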
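A hypothetical helper that assembles this query for one chunk, including the open-ended first and last chunks, might look like this. The predicate layout for the first and last chunk is an assumption for illustration, since the article only shows the middle-chunk form. The bound values would be supplied through a JDBC `PreparedStatement`.

```java
/** Hypothetical helper: builds the per-chunk scan query with JDBC "?" placeholders. */
final class ChunkScanQuerySketch {

    static String buildScanQuery(String table, String key, boolean hasStart, boolean hasEnd) {
        StringBuilder sql = new StringBuilder("SELECT * FROM ").append(table);
        if (hasStart && hasEnd) {
            // key >= start AND key < end, written as in the article
            sql.append(" WHERE ").append(key).append(" >= ?")
                    .append(" AND NOT (").append(key).append(" = ?)")
                    .append(" AND ").append(key).append(" <= ?");
        } else if (hasStart) {
            // last chunk: no upper bound
            sql.append(" WHERE ").append(key).append(" >= ?");
        } else if (hasEnd) {
            // first chunk: no lower bound, end excluded
            sql.append(" WHERE ").append(key).append(" <= ?")
                    .append(" AND NOT (").append(key).append(" = ?)");
        }
        return sql.toString();
    }

    /** The same predicate evaluated in Java: equivalent to the half-open range [start, end). */
    static boolean inChunk(long key, Long start, Long end) {
        boolean aboveStart = start == null || key >= start;
        boolean belowEnd = end == null || (key <= end && key != end);
        return aboveStart && belowEnd;
    }
}
```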
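Data correction
SHOW MASTER STATUS is executed before and after the snapshot read to obtain the current binlog offset at each point (the low and high watermarks). After the snapshot read completes, the binlog events between these two offsets are read and used to correct the snapshot records.
Data layout during the snapshot read + Binlog read: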

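Rules for correcting SnapshotEvents with BinlogEvents:
- No binlog data was read, meaning no other transaction modified data while the SELECT ran: all snapshot records are emitted as-is.
- Binlog data was read, but the changed records do not belong to the current chunk: the snapshot records are emitted as-is.
- Binlog data was read and the changes belong to the current chunk: a delete removes the record from the in-memory snapshot, an insert adds the new record to it, and an update writes the changed record into it, ultimately producing both the before and after versions of the record downstream.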
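The three rules can be sketched as a small in-memory merge. The sketch makes three assumptions: a `long` primary key, each row represented as a `String`, and the in-memory snapshot modeled as a map keyed by primary key. Under that model, an update simply overwrites the entry with its after-image, which is a simplification. `ChunkCorrectionSketch` and `BinlogEvent` are hypothetical; the actual logic lives in `RecordUtils#normalizedSplitRecords`.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of the correction rules; not the flink-mysql-cdc RecordUtils implementation. */
final class ChunkCorrectionSketch {

    /** A binlog change using Debezium-style op codes: 'c' insert, 'u' update, 'd' delete. */
    static final class BinlogEvent {
        final char op;
        final long key;
        final String after; // row image after the change; null for deletes

        BinlogEvent(char op, long key, String after) {
            this.op = op;
            this.key = key;
            this.after = after;
        }
    }

    static boolean inChunk(long key, Long start, Long end) {
        return (start == null || key >= start) && (end == null || key < end);
    }

    /** Applies binlog events read between the low and high watermark to the snapshot rows of chunk [start, end). */
    static Map<Long, String> correct(Map<Long, String> snapshot, List<BinlogEvent> binlog, Long start, Long end) {
        Map<Long, String> rows = new TreeMap<>(snapshot);
        for (BinlogEvent event : binlog) {
            if (!inChunk(event.key, start, end)) {
                continue; // change belongs to another chunk: leave the snapshot untouched
            }
            switch (event.op) {
                case 'c':
                case 'u':
                    rows.put(event.key, event.after); // insert or overwrite with the new image
                    break;
                case 'd':
                    rows.remove(event.key);
                    break;
                default:
                    throw new IllegalArgumentException("unexpected op: " + event.op);
            }
        }
        return rows;
    }
}
```

Consider snapshot rows {1=a, 2=b, 3=c}, chunk [1, 11), and the events u(2), d(3), c(10), c(20). The result is {1=a, 2=b2, 10=j}. The insert for key 20 is ignored because it belongs to another chunk.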
Data layout after correction:

Take reading the chunk [1, 11) as an example of how a chunk's data is processed. Here c, d, and u denote the insert, delete, and update operations captured by Debezium.
Data and layout before correction:

Data and layout after correction:

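After a single chunk has been processed, its range (ChunkStart, ChunkEnd) and the maximum Binlog offset read for it (the high watermark) are sent to the SplitEnumerator. The SplitEnumerator uses them to determine the starting offset for incremental reading.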
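Reading the Incremental Split
Once all chunks in the full phase have been read, the SplitEnumerator issues a BinlogSplit for incremental reading. The most important property of a BinlogSplit is its starting offset. If the offset is too small, downstream may receive duplicate data. If it is too large, changes are skipped and downstream may be left with stale, dirty data. Flink CDC therefore starts incremental reading from the smallest Binlog offset (high watermark) among all completed snapshot chunks, and emits a record downstream only if it satisfies the condition below.
Emission condition:
offset of the captured Binlog record > maximum Binlog offset (high watermark) of the chunk that the record belongs to.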
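The starting offset and the emission condition above can be sketched as follows. For readability, the sketch collapses a binlog offset (file name plus position) into a single `long` and assumes a `long` primary key. The chunk values in the usage note are made up for illustration. `BinlogEmitFilterSketch` and `FinishedChunk` are hypothetical names.

```java
import java.util.List;

/** Hypothetical sketch of the incremental-phase start offset and emission filter. */
final class BinlogEmitFilterSketch {

    /** A finished snapshot chunk [start, end) with its high watermark; offsets simplified to one long. */
    static final class FinishedChunk {
        final Long start;
        final Long end;
        final long highWatermark;

        FinishedChunk(Long start, Long end, long highWatermark) {
            this.start = start;
            this.end = end;
            this.highWatermark = highWatermark;
        }

        boolean contains(long key) {
            return (start == null || key >= start) && (end == null || key < end);
        }
    }

    /** The binlog split starts at the smallest high watermark among all finished chunks. */
    static long startingOffset(List<FinishedChunk> chunks) {
        long min = Long.MAX_VALUE;
        for (FinishedChunk chunk : chunks) {
            min = Math.min(min, chunk.highWatermark);
        }
        return min;
    }

    /** Emit only if the event is newer than the high watermark of the chunk owning its key. */
    static boolean shouldEmit(long key, long eventOffset, List<FinishedChunk> chunks) {
        for (FinishedChunk chunk : chunks) {
            if (chunk.contains(key)) {
                return eventOffset > chunk.highWatermark;
            }
        }
        return true; // assumption: keys outside every finished chunk are always emitted
    }
}
```

Suppose two finished chunks have high watermarks 800 and 1000, so reading starts at 800. A change to key 1006 at offset 900 is dropped, because that chunk's snapshot already reflects everything up to 1000. The same change for key 1001 is emitted.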
For example, suppose the completed chunk information retained by the SplitEnumerator is as follows.

During incremental reading, Binlog data is read starting from offset 800; when a record is captured
Code Walkthrough
The design of FLIP-27: Refactor Source Interface is not covered in detail here. This part focuses on the flink-mysql-cdc interface calls and their implementation.
MySqlSourceEnumerator initialization
SourceCoordinator is the OperatorCoordinator implementation for sources and runs on the master node. On startup, it calls MySqlParallelSource#createEnumerator to create a MySqlSourceEnumerator, then calls its start method to perform initialization.

MySqlSourceEnumerator is created with two components. A MySqlHybridSplitAssigner splits the full + incremental data, and a MySqlValidator validates the MySQL version and configuration.
MySqlValidator checks:
- The MySQL version must be 5.7 or later.
- binlog_format must be ROW.
- binlog_row_image must be FULL.
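The three checks are simple enough to sketch directly. This hypothetical version takes values that are assumed to have been fetched beforehand, for example via `SELECT VERSION()` and `SHOW VARIABLES LIKE 'binlog_format'`. It is not the MySqlValidator source.

```java
import java.util.Map;

/** Hypothetical sketch of the three MySqlValidator checks, fed with already-fetched server values. */
final class MySqlCdcPreconditionsSketch {

    static void validate(String version, Map<String, String> variables) {
        // e.g. "5.7.35-log" or "8.0.26": compare major.minor only
        String[] parts = version.split("[.-]");
        int major = Integer.parseInt(parts[0]);
        int minor = Integer.parseInt(parts[1]);
        if (major < 5 || (major == 5 && minor < 7)) {
            throw new IllegalStateException("MySQL 5.7+ required, found " + version);
        }
        requireValue(variables, "binlog_format", "ROW");
        requireValue(variables, "binlog_row_image", "FULL");
    }

    private static void requireValue(Map<String, String> variables, String name, String expected) {
        String actual = variables.get(name);
        if (!expected.equalsIgnoreCase(actual)) {
            throw new IllegalStateException(name + " must be " + expected + ", found " + actual);
        }
    }
}
```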
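MySqlSplitAssigner initialization:
- Create a ChunkSplitter for splitting tables into chunks.
- Filter out the names of the tables to read.
A periodic scheduling thread is also started. It asks each SourceReader to report to the SourceEnumerator the splits it has finished but that have not yet been acknowledged (ACKed).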
private void syncWithReaders(int[] subtaskIds, Throwable t) {
    if (t != null) {
        throw new FlinkRuntimeException("Failed to list obtain registered readers due to:", t);
    }
    // when the SourceEnumerator restores or the communication failed between
    // SourceEnumerator and SourceReader, it may have missed some notification events.
    // tell all SourceReader(s) to report their finished but unacked splits.
    if (splitAssigner.waitingForFinishedSplits()) {
        for (int subtaskId : subtaskIds) {
            // note: send FinishedSnapshotSplitsRequestEvent
            context.sendEventToSourceReader(
                    subtaskId, new FinishedSnapshotSplitsRequestEvent());
        }
    }
}
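The article shows only the callback, not how the periodic trigger is registered. The sketch below substitutes a plain `ScheduledExecutorService` for whatever scheduling mechanism the enumerator actually uses. The reader list, the "waiting" flag, and the request sender are passed in as functions. All names here are hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.IntConsumer;
import java.util.function.Supplier;

/** Hypothetical sketch: periodically ask every registered reader to re-report finished splits. */
final class FinishedSplitSyncSketch implements AutoCloseable {
    private final Supplier<int[]> registeredReaders;
    private final BooleanSupplier waitingForFinishedSplits;
    private final IntConsumer sendRequest; // stands in for sending FinishedSnapshotSplitsRequestEvent
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    FinishedSplitSyncSketch(Supplier<int[]> registeredReaders,
                            BooleanSupplier waitingForFinishedSplits,
                            IntConsumer sendRequest) {
        this.registeredReaders = registeredReaders;
        this.waitingForFinishedSplits = waitingForFinishedSplits;
        this.sendRequest = sendRequest;
    }

    void start(long periodMillis) {
        scheduler.scheduleAtFixedRate(this::syncWithReaders, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    void syncWithReaders() {
        if (!waitingForFinishedSplits.getAsBoolean()) {
            return; // every finished split is already acknowledged
        }
        for (int subtaskId : registeredReaders.get()) {
            sendRequest.accept(subtaskId);
        }
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```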
MySqlSourceReader initialization
SourceOperator wraps the SourceReader and interacts with the SourceCoordinator through the OperatorEventGateway.

When SourceOperator is initialized, it creates a MySqlSourceReader through MySqlParallelSource. MySqlSourceReader creates a fetcher through SingleThreadFetcherManager to pull split data, and the data is written to elementsQueue as MySqlRecords.
MySqlParallelSource#createReader
public SourceReader<T, MySqlSplit> createReader(SourceReaderContext readerContext) throws Exception {
    // note: queue that buffers the fetched records
    FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue =
            new FutureCompletingBlockingQueue<>();
    final Configuration readerConfiguration = getReaderConfig(readerContext);
    // note: factory for the split reader
    Supplier<MySqlSplitReader> splitReaderSupplier =
            () -> new MySqlSplitReader(readerConfiguration, readerContext.getIndexOfSubtask());
    return new MySqlSourceReader<>(
            elementsQueue,
            splitReaderSupplier,
            new MySqlRecordEmitter<>(deserializationSchema),
            readerConfiguration,
            readerContext);
}
The created MySqlSourceReader is registered with the SourceCoordinator through an event. On receiving the registration event, the SourceCoordinator stores the reader's location and index.
SourceCoordinator#handleReaderRegistrationEvent
// note: SourceCoordinator handles the reader registration event
private void handleReaderRegistrationEvent(ReaderRegistrationEvent event) {
    context.registerSourceReader(new ReaderInfo(event.subtaskId(), event.location()));
    enumerator.addReader(event.subtaskId());
}
Once started, MySqlSourceReader sends a split request event to MySqlSourceEnumerator in order to receive its assigned splits.
After SourceOperator has been initialized, emitNext is called. SourceReaderBase then takes record batches from elementsQueue and hands them to MySqlRecordEmitter. Call-flow diagram:

MySqlSourceEnumerator handles split requests
On startup, MySqlSourceReader sends a RequestSplitEvent to MySqlSourceEnumerator and reads the data within the range of the split it gets back. In the full read phase, MySqlSourceEnumerator's split-request handling ultimately returns a MySqlSnapshotSplit.

To handle a split request event, the enumerator assigns a split to the requesting reader. It delivers the MySqlSplit by sending an AddSplitEvent: a MySqlSnapshotSplit in the full phase, or a MySqlBinlogSplit in the incremental phase.
MySqlSourceEnumerator#handleSplitRequest
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
    if (!context.registeredReaders().containsKey(subtaskId)) {
        // reader failed between sending the request and now. skip this request.
        return;
    }
    // note: store the reader's subtaskId in a TreeSet so that the binlog split is preferentially assigned to task-0
    readersAwaitingSplit.add(subtaskId);
    assignSplits();
}
// note: assign splits
private void assignSplits() {
    final Iterator<Integer> awaitingReader = readersAwaitingSplit.iterator();
    while (awaitingReader.hasNext()) {
        int nextAwaiting = awaitingReader.next();
        // if the reader that requested another split has failed in the meantime, remove
        // it from the list of waiting readers
        if (!context.registeredReaders().containsKey(nextAwaiting)) {
            awaitingReader.remove();
            continue;
        }
        // note: MySqlSplitAssigner picks the next split
        Optional<MySqlSplit> split = splitAssigner.getNext();
        if (split.isPresent()) {
            final MySqlSplit mySqlSplit = split.get();
            // note: send an AddSplitEvent that returns the split to the reader
            context.assignSplit(mySqlSplit, nextAwaiting);
            awaitingReader.remove();
            LOG.info("Assign split {} to subtask {}", mySqlSplit, nextAwaiting);
        } else {
            // there is no available splits by now, skip assigning
            break;
        }
    }
}
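To see why the TreeSet matters, here is a hypothetical, simplified model of `assignSplits()` that treats splits as plain strings. Waiting subtasks are iterated in ascending order, so a single pending split, such as the binlog split, goes to the lowest waiting subtask id.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of assignSplits(): waiting readers are served in ascending subtask order. */
final class SplitAssignmentSketch {
    final TreeSet<Integer> readersAwaitingSplit = new TreeSet<>();
    final Deque<String> pendingSplits = new ArrayDeque<>();

    Map<Integer, String> assignSplits(Set<Integer> registeredReaders) {
        Map<Integer, String> assigned = new LinkedHashMap<>();
        Iterator<Integer> awaiting = readersAwaitingSplit.iterator();
        while (awaiting.hasNext()) {
            int reader = awaiting.next();
            if (!registeredReaders.contains(reader)) {
                awaiting.remove(); // reader failed after asking for a split
                continue;
            }
            String split = pendingSplits.poll();
            if (split == null) {
                break; // nothing available yet; the reader keeps waiting
            }
            assigned.put(reader, split);
            awaiting.remove();
        }
        return assigned;
    }
}
```

With subtasks 2, 0 and 1 waiting and only the binlog split pending, subtask 0 receives it, and subtasks 1 and 2 stay in the waiting set.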
MySqlHybridSplitAssigner handles both snapshot splits and the binlog split.
When the job has just started, remainingTables is not empty, so noMoreSplits returns false and SnapshotSplits are created.
Once all snapshot splits have been assigned, noMoreSplits returns true. The BinlogSplit is created only after those snapshot splits have also finished (isFinished).
MySqlHybridSplitAssigner#getNext
@Override
public Optional<MySqlSplit> getNext() {
    if (snapshotSplitAssigner.noMoreSplits()) {
        // binlog split assigning
        if (isBinlogSplitAssigned) {
            // no more splits for the assigner
            return Optional.empty();
        } else if (snapshotSplitAssigner.isFinished()) {
            // we need to wait snapshot-assigner to be finished before
            // assigning the binlog split. Otherwise, records emitted from binlog split
            // might be out-of-order in terms of same primary key with snapshot splits.
            isBinlogSplitAssigned = true;
            // note: once the snapshot splits are finished, create the BinlogSplit
            return Optional.of(createBinlogSplit());
        } else {
            // binlog split is not ready by now
            return Optional.empty();
        }
    } else {
        // note: MySqlSnapshotSplitAssigner creates the SnapshotSplit
        // snapshot assigner still have remaining splits, assign split from it
        return snapshotSplitAssigner.getNext();
    }
}
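The ordering rule of the hybrid assigner can be reduced to a small state machine. In this hypothetical sketch, splits are strings, and a counter of unfinished snapshot splits stands in for `snapshotSplitAssigner.isFinished()`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch of the hybrid assigner's ordering rule: binlog split only after all snapshot splits finish. */
final class HybridAssignerSketch {
    private final Deque<String> remainingSnapshotSplits;
    private int unfinishedSnapshotSplits;
    private boolean binlogSplitAssigned;

    HybridAssignerSketch(List<String> snapshotSplits) {
        this.remainingSnapshotSplits = new ArrayDeque<>(snapshotSplits);
        this.unfinishedSnapshotSplits = snapshotSplits.size();
    }

    Optional<String> getNext() {
        if (!remainingSnapshotSplits.isEmpty()) {
            return Optional.of(remainingSnapshotSplits.poll()); // snapshot phase
        }
        if (binlogSplitAssigned) {
            return Optional.empty(); // nothing left to assign
        }
        if (unfinishedSnapshotSplits == 0) {
            binlogSplitAssigned = true;
            return Optional.of("binlog-split");
        }
        return Optional.empty(); // snapshot splits still running: binlog split not ready yet
    }

    void onSnapshotSplitFinished() {
        unfinishedSnapshotSplits--;
    }
}
```

Returning `Optional.empty()` while snapshot splits are still running corresponds to the "binlog split is not ready by now" branch above. The reader simply stays in the waiting set until a later request.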
MySqlSnapshotSplitAssigner handles snapshot splitting. It generates splits with ChunkSplitter, keeps them in remainingSplits, and hands them out one at a time.
@Override
public Optional<MySqlSplit> getNext() {
    if (!remainingSplits.isEmpty()) {
        // return remaining splits firstly
        Iterator<MySqlSnapshotSplit> iterator = remainingSplits.iterator();
        MySqlSnapshotSplit split = iterator.next();
        iterator.remove();

        // note: assigned splits are recorded in the assignedSplits map
        assignedSplits.put(split.splitId(), split);
        return Optional.of(split);
    } else {
        // note: during initialization, remainingTables holds the names of the tables to read
        TableId nextTable = remainingTables.pollFirst();
        if (nextTable != null) {
            // split the given table into chunks (snapshot splits)
            // note: ChunkSplitter was created during initialization; generateSplits splits the table
            Collection<MySqlSnapshotSplit> splits = chunkSplitter.generateSplits(nextTable);
            // note: keep all generated splits
            remainingSplits.addAll(splits);
            // note: tables that have already been split
            alreadyProcessedTables.add(nextTable);
            // note: call this method again to hand out the first new split
            return getNext();
        } else {
            return Optional.empty();
        }
    }
}
ChunkSplitter contains the logic for splitting a table into evenly or unevenly distributed chunks. The table being read must have a physical primary key.
public Collection<MySqlSnapshotSplit> generateSplits(TableId tableId) {
    Table schema = mySqlSchema.getTableSchema(tableId).getTable();
    List<Column> primaryKeys = schema.primaryKeyColumns();
    // note: a primary key is required
    if (primaryKeys.isEmpty()) {
        throw new ValidationException(
                String.format(
                        "Incremental snapshot for tables requires primary key,"
                                + " but table %s doesn't have primary key.",
                        tableId));
    }
    // use first field in primary key as the split key
    Column splitColumn = primaryKeys.get(0);
    final List<ChunkRange> chunks;
    try {
        // note: split the data into chunks by the primary key column
        chunks = splitTableIntoChunks(tableId, splitColumn);
    } catch (SQLException e) {
        throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e);
    }
    // note: convert the primary key data type and wrap each ChunkRange into a MySqlSnapshotSplit
    // convert chunks into splits
    List<MySqlSnapshotSplit> splits = new ArrayList<>();
    RowType splitType = splitType(splitColumn);

    for (int i = 0; i < chunks.size(); i++) {
        ChunkRange chunk = chunks.get(i);
        MySqlSnapshotSplit split =
                createSnapshotSplit(
                        tableId, i, splitType, chunk.getChunkStart(), chunk.getChunkEnd());
        splits.add(split);
    }
    return splits;
}
splitTableIntoChunks splits the table into chunks by its physical primary key.
private List<ChunkRange> splitTableIntoChunks(TableId tableId, Column splitColumn)
        throws SQLException {
    final String splitColumnName = splitColumn.name();
    // select min, max
    final Object[] minMaxOfSplitColumn = queryMinMax(jdbc, tableId, splitColumnName);
    final Object min = minMaxOfSplitColumn[0];
    final Object max = minMaxOfSplitColumn[1];
    if (min == null || max == null || min.equals(max)) {
        // empty table, or only one row, return full table scan as a chunk
        return Collections.singletonList(ChunkRange.all());
    }
    final List<ChunkRange> chunks;
    if (splitColumnEvenlyDistributed(splitColumn)) {
        // use evenly-sized chunks which is much more efficient
        // note: split evenly by primary key
        chunks = splitEvenlySizedChunks(min, max);
    } else {
        // note: split unevenly by primary key
        // use unevenly-sized chunks which will request many queries and is not efficient.
        chunks = splitUnevenlySizedChunks(tableId, splitColumnName, min, max);
    }
    return chunks;
}
/** Checks whether split column is evenly distributed across its range. */
private static boolean splitColumnEvenlyDistributed(Column splitColumn) {
    // only column is auto-incremental are recognized as evenly distributed.
    // TODO: we may use MAX,MIN,COUNT to calculate the distribution in the future.
    if (splitColumn.isAutoIncremented()) {
        DataType flinkType = MySqlTypeUtils.fromDbzColumn(splitColumn);
        LogicalTypeRoot typeRoot = flinkType.getLogicalType().getTypeRoot();
        // currently, we only support split column with type BIGINT, INT, DECIMAL
        return typeRoot == LogicalTypeRoot.BIGINT
                || typeRoot == LogicalTypeRoot.INTEGER
                || typeRoot == LogicalTypeRoot.DECIMAL;
    } else {
        return false;
    }
}
/**
 * Split table into evenly sized chunks based on the numeric min and max value of split column,
 * and tumble chunks in {@link #chunkSize} step size.
 */
private List<ChunkRange> splitEvenlySizedChunks(Object min, Object max) {
    if (ObjectUtils.compare(ObjectUtils.plus(min, chunkSize), max) > 0) {
        // there is no more than one chunk, return full table as a chunk
        return Collections.singletonList(ChunkRange.all());
    }
    final List<ChunkRange> splits = new ArrayList<>();
    Object chunkStart = null;
    Object chunkEnd = ObjectUtils.plus(min, chunkSize);
    // chunkEnd <= max
    while (ObjectUtils.compare(chunkEnd, max) <= 0) {
        splits.add(ChunkRange.of(chunkStart, chunkEnd));
        chunkStart = chunkEnd;
        chunkEnd = ObjectUtils.plus(chunkEnd, chunkSize);
    }
    // add the ending split
    splits.add(ChunkRange.of(chunkStart, null));
    return splits;
}
/** Split table into unevenly sized chunks by continuously calculating next chunk max value. */
private List<ChunkRange> splitUnevenlySizedChunks(
        TableId tableId, String splitColumnName, Object min, Object max) throws SQLException {
    final List<ChunkRange> splits = new ArrayList<>();
    Object chunkStart = null;
    Object chunkEnd = nextChunkEnd(min, tableId, splitColumnName, max);
    int count = 0;
    while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
        // we start from [null, min + chunk_size) and avoid [null, min)
        splits.add(ChunkRange.of(chunkStart, chunkEnd));
        // may sleep a while to avoid DDOS on MySQL server
        maySleep(count++);
        chunkStart = chunkEnd;
        chunkEnd = nextChunkEnd(chunkEnd, tableId, splitColumnName, max);
    }
    // add the ending split
    splits.add(ChunkRange.of(chunkStart, null));
    return splits;
}
private Object nextChunkEnd(
        Object previousChunkEnd, TableId tableId, String splitColumnName, Object max)
        throws SQLException {
    // chunk end might be null when max values are removed
    Object chunkEnd =
            queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd);
    if (Objects.equals(previousChunkEnd, chunkEnd)) {
        // we don't allow equal chunk start and end,
        // should query the next one larger than chunkEnd
        chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd);
    }
    if (ObjectUtils.compare(chunkEnd, max) >= 0) {
        return null;
    } else {
        return chunkEnd;
    }
}
MySqlSourceReader handles split assignments

After receiving a split assignment, MySqlSourceReader first creates a SplitFetcher thread and adds an AddSplitsTask to its taskQueue, which it runs to register the new splits. It then runs a FetchTask that reads data through the Debezium API. The records are stored in elementsQueue, from which SourceReaderBase takes them and passes them to MySqlRecordEmitter.
When the split assignment event is handled, a SplitFetcher is created and an AddSplitsTask is added to its taskQueue.
SingleThreadFetcherManager#addSplits
public void addSplits(List<SplitT> splitsToAdd) {
    SplitFetcher<E, SplitT> fetcher = getRunningFetcher();
    if (fetcher == null) {
        fetcher = createSplitFetcher();
        // Add the splits to the fetchers.
        fetcher.addSplits(splitsToAdd);
        startFetcher(fetcher);
    } else {
        fetcher.addSplits(splitsToAdd);
    }
}
// SplitFetcherManager#createSplitFetcher: create a SplitFetcher
protected synchronized SplitFetcher<E, SplitT> createSplitFetcher() {
    if (closed) {
        throw new IllegalStateException("The split fetcher manager has closed.");
    }
    // Create SplitReader.
    SplitReader<E, SplitT> splitReader = splitReaderFactory.get();
    int fetcherId = fetcherIdGenerator.getAndIncrement();
    SplitFetcher<E, SplitT> splitFetcher =
            new SplitFetcher<>(
                    fetcherId,
                    elementsQueue,
                    splitReader,
                    errorHandler,
                    () -> {
                        fetchers.remove(fetcherId);
                        elementsQueue.notifyAvailable();
                    });
    fetchers.put(fetcherId, splitFetcher);
    return splitFetcher;
}
// SplitFetcher#addSplits: enqueue an AddSplitsTask and wake up the fetcher thread
public void addSplits(List<SplitT> splitsToAdd) {
    enqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, assignedSplits));
    wakeUp(true);
}
The SplitFetcher thread then runs. The first time, it executes the AddSplitsTask to add the splits; afterwards, it executes the FetchTask to pull data.
SplitFetcher#runOnce
void runOnce() {
    try {
        if (shouldRunFetchTask()) {
            runningTask = fetchTask;
        } else {
            runningTask = taskQueue.take();
        }

        if (!wakeUp.get() && runningTask.run()) {
            LOG.debug("Finished running task {}", runningTask);
            runningTask = null;
            checkAndSetIdle();
        }
    } catch (Exception e) {
        throw new RuntimeException(
                String.format(
                        "SplitFetcher thread %d received unexpected exception while polling the records",
                        id),
                e);
    }
    maybeEnqueueTask(runningTask);
    synchronized (wakeUp) {
        // Set the running task to null. It is necessary for the shutdown method to avoid
        // unnecessarily interrupt the running task.
        runningTask = null;
        // Set the wakeUp flag to false.
        wakeUp.set(false);
        LOG.debug("Cleaned wakeup flag.");
    }
}
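The task-selection rule can be modeled in a few lines. This is a hypothetical, single-threaded simplification. The body of `shouldRunFetchTask()` is not shown in the article, so the condition used here (queue empty and at least one split assigned) is an assumption. "Fetching" only records a log entry.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical single-threaded model of SplitFetcher#runOnce task selection. */
final class FetcherLoopSketch {
    private final Deque<List<String>> taskQueue = new ArrayDeque<>(); // pending AddSplitsTasks
    private final List<String> assignedSplits = new ArrayList<>();
    final List<String> log = new ArrayList<>();

    void addSplits(List<String> splits) {
        taskQueue.add(splits);
    }

    // assumption: fetch only when no other task is queued and some split is assigned
    private boolean shouldRunFetchTask() {
        return taskQueue.isEmpty() && !assignedSplits.isEmpty();
    }

    /** Runs one step and returns false when idle (the real fetcher would block on taskQueue.take()). */
    boolean runOnce() {
        if (shouldRunFetchTask()) {
            log.add("fetch " + assignedSplits.get(0));
            return true;
        }
        List<String> addSplitsTask = taskQueue.poll();
        if (addSplitsTask == null) {
            return false;
        }
        assignedSplits.addAll(addSplitsTask);
        log.add("add " + addSplitsTask);
        return true;
    }
}
```

After `addSplits(["split-0"])`, two calls to `runOnce()` log `add [split-0]` followed by `fetch split-0`.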
AddSplitsTask calls MySqlSplitReader's handleSplitsChanges method, which adds the assigned splits to the split queue. On the next fetch() call, a split is taken from the queue and its data is read.
AddSplitsTask#run
public boolean run() {
    for (SplitT s : splitsToAdd) {
        assignedSplits.put(s.splitId(), s);
    }
    splitReader.handleSplitsChanges(new SplitsAddition<>(splitsToAdd));
    return true;
}
MySqlSplitReader#handleSplitsChanges
public void handleSplitsChanges(SplitsChange<MySqlSplit> splitsChanges) {
    if (!(splitsChanges instanceof SplitsAddition)) {
        throw new UnsupportedOperationException(
                String.format(
                        "The SplitChange type of %s is not supported.",
                        splitsChanges.getClass()));
    }
    // note: add the splits to the queue
    splits.addAll(splitsChanges.splits());
}
MySqlSplitReader executes fetch(). A DebeziumReader reads data into the event queue, and after correction the data is returned as MySqlRecords.
MySqlSplitReader#fetch
@Override
public RecordsWithSplitIds<SourceRecord> fetch() throws IOException {
    // note: create the reader and start reading data
    checkSplitOrStartNext();
    Iterator<SourceRecord> dataIt = null;
    try {
        // note: correct the data that was read
        dataIt = currentReader.pollSplitRecords();
    } catch (InterruptedException e) {
        LOG.warn("fetch data failed.", e);
        throw new IOException(e);
    }
    // note: the returned data is wrapped as MySqlRecords for transmission
    return dataIt == null
            ? finishedSnapshotSplit()
            : MySqlRecords.forRecords(currentSplitId, dataIt);
}
private void checkSplitOrStartNext() throws IOException {
    // the binlog reader should keep alive
    if (currentReader instanceof BinlogSplitReader) {
        return;
    }
    if (canAssignNextSplit()) {
        // note: take a MySqlSplit from the split queue
        final MySqlSplit nextSplit = splits.poll();
        if (nextSplit == null) {
            throw new IOException("Cannot fetch from another split - no split remaining");
        }
        currentSplitId = nextSplit.splitId();
        // note: distinguish snapshot split reading from binlog split reading
        if (nextSplit.isSnapshotSplit()) {
            if (currentReader == null) {
                final MySqlConnection jdbcConnection = getConnection(config);
                final BinaryLogClient binaryLogClient = getBinaryClient(config);
                final StatefulTaskContext statefulTaskContext =
                        new StatefulTaskContext(config, binaryLogClient, jdbcConnection);
                // note: create a SnapshotSplitReader, which uses the Debezium API to read the split data and the binlog in between
                currentReader = new SnapshotSplitReader(statefulTaskContext, subtaskId);
            }
        } else {
            // point from snapshot split to binlog split
            if (currentReader != null) {
                LOG.info("It's turn to read binlog split, close current snapshot reader");
                currentReader.close();
            }
            final MySqlConnection jdbcConnection = getConnection(config);
            final BinaryLogClient binaryLogClient = getBinaryClient(config);
            final StatefulTaskContext statefulTaskContext =
                    new StatefulTaskContext(config, binaryLogClient, jdbcConnection);
            LOG.info("Create binlog reader");
            // note: create a BinlogSplitReader, which uses the Debezium API for incremental reading
            currentReader = new BinlogSplitReader(statefulTaskContext, subtaskId);
        }
        // note: submit the split to the reader to start reading
        currentReader.submitSplit(nextSplit);
    }
}
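DebeziumReader data processing
DebeziumReader covers two phases: snapshot split reading and binlog split reading. The data read is stored in a ChangeEventQueue and corrected when pollSplitRecords is executed.
SnapshotSplitReader reads snapshot splits. In the full phase, a SELECT statement queries the table data within the split's range. SHOW MASTER STATUS is executed before and after the data is written to the queue, so the current binlog offset is recorded on both sides.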
public void submitSplit(MySqlSplit mySqlSplit) {
    // ......
    executor.submit(
            () -> {
                try {
                    currentTaskRunning = true;
                    // note: read the data, inserting the current Binlog offset before and after it
                    // 1. execute snapshot read task
                    final SnapshotSplitChangeEventSourceContextImpl sourceContext =
                            new SnapshotSplitChangeEventSourceContextImpl();
                    SnapshotResult snapshotResult =
                            splitSnapshotReadTask.execute(sourceContext);
                    // note: prepare the binlog read; the split carries the starting offset
                    final MySqlBinlogSplit appendBinlogSplit = createBinlogSplit(sourceContext);
                    final MySqlOffsetContext mySqlOffsetContext =
                            statefulTaskContext.getOffsetContext();
                    mySqlOffsetContext.setBinlogStartPoint(
                            appendBinlogSplit.getStartingOffset().getFilename(),
                            appendBinlogSplit.getStartingOffset().getPosition());
                    // note: read from the starting offset
                    // 2. execute binlog read task
                    if (snapshotResult.isCompletedOrSkipped()) {
                        // we should only capture events for the current table,
                        Configuration dezConf =
                                statefulTaskContext
                                        .getDezConf()
                                        .edit()
                                        .with(
                                                "table.whitelist",
                                                currentSnapshotSplit.getTableId())
                                        .build();
                        // task to read binlog for current split
                        MySqlBinlogSplitReadTask splitBinlogReadTask =
                                new MySqlBinlogSplitReadTask(
                                        new MySqlConnectorConfig(dezConf),
                                        mySqlOffsetContext,
                                        statefulTaskContext.getConnection(),
                                        statefulTaskContext.getDispatcher(),
                                        statefulTaskContext.getErrorHandler(),
                                        StatefulTaskContext.getClock(),
                                        statefulTaskContext.getTaskContext(),
                                        (MySqlStreamingChangeEventSourceMetrics)
                                                statefulTaskContext
                                                        .getStreamingChangeEventSourceMetrics(),
                                        statefulTaskContext
                                                .getTopicSelector()
                                                .getPrimaryTopic(),
                                        appendBinlogSplit);
                        splitBinlogReadTask.execute(
                                new SnapshotBinlogSplitChangeEventSourceContextImpl());
                    } else {
                        readException =
                                new IllegalStateException(
                                        String.format(
                                                "Read snapshot for mysql split %s fail",
                                                currentSnapshotSplit));
                    }
                } catch (Exception e) {
                    currentTaskRunning = false;
                    LOG.error(
                            String.format(
                                    "Execute snapshot read task for mysql split %s fail",
                                    currentSnapshotSplit),
                            e);
                    readException = e;
                }
            });
}
SnapshotSplitReader 增量切片讀取。增量階段切片讀取重點(diǎn)是判斷BinlogSplitReadTask什么時候停止,在讀取到分片階段的結(jié)束時的偏移量即終止。
MySqlBinlogSplitReadTask#handleEvent
protected void handleEvent(Event event) {
    // note: dispatch the event to the queue
    super.handleEvent(event);
    // note: during the snapshot phase the binlog read must be terminated
    // check do we need to stop for read binlog for snapshot split.
    if (isBoundedRead()) {
        final BinlogOffset currentBinlogOffset =
                new BinlogOffset(
                        offsetContext.getOffset().get(BINLOG_FILENAME_OFFSET_KEY).toString(),
                        Long.parseLong(
                                offsetContext
                                        .getOffset()
                                        .get(BINLOG_POSITION_OFFSET_KEY)
                                        .toString()));
        // note: stop reading once currentBinlogOffset has reached the HW (the ending offset)
        // reach the high watermark, the binlog reader should finished
        if (currentBinlogOffset.isAtOrBefore(binlogSplit.getEndingOffset())) {
            // send binlog end event
            try {
                signalEventDispatcher.dispatchWatermarkEvent(
                        binlogSplit,
                        currentBinlogOffset,
                        SignalEventDispatcher.WatermarkKind.BINLOG_END);
            } catch (InterruptedException e) {
                logger.error("Send signal event error.", e);
                errorHandler.setProducerThrowable(
                        new DebeziumException("Error processing binlog signal event", e));
            }
            // note: stop reading the binlog
            // tell reader the binlog task finished
            ((SnapshotBinlogSplitChangeEventSourceContextImpl) context).finished();
        }
    }
}
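The stop condition can be illustrated with a minimal, self-contained sketch. It assumes a binlog offset can be modeled as a (file name, position) pair, ordered first by file and then by position. `SimpleBinlogOffset` and `readBoundedBinlog` are hypothetical names and are not Flink CDC's BinlogOffset API. The real class may also carry other fields, such as a GTID set, that affect the comparison. As in handleEvent, each event is dispatched before the offset is checked, so the event that reaches the ending offset is still delivered.

```java
import java.util.ArrayList;
import java.util.List;

class BoundedBinlogRead {
    public static void main(String[] args) {
        SimpleBinlogOffset endingOffset = new SimpleBinlogOffset("mysql-bin.000003", 500);
        List<SimpleBinlogOffset> events = List.of(
                new SimpleBinlogOffset("mysql-bin.000003", 120),
                new SimpleBinlogOffset("mysql-bin.000003", 500),
                new SimpleBinlogOffset("mysql-bin.000003", 900));
        // prints [mysql-bin.000003:120, mysql-bin.000003:500]
        System.out.println(readBoundedBinlog(events, endingOffset));
    }

    /** Consumes events in order and stops after the first one at or past endingOffset (the HW). */
    static List<SimpleBinlogOffset> readBoundedBinlog(
            List<SimpleBinlogOffset> events, SimpleBinlogOffset endingOffset) {
        List<SimpleBinlogOffset> dispatched = new ArrayList<>();
        for (SimpleBinlogOffset current : events) {
            dispatched.add(current); // like super.handleEvent(event): dispatch first
            if (current.compareTo(endingOffset) >= 0) {
                // at this point Flink CDC sends the BINLOG_END watermark and finishes the task
                break;
            }
        }
        return dispatched;
    }
}

/** Hypothetical offset model: ordered by binlog file name, then by position within the file. */
final class SimpleBinlogOffset implements Comparable<SimpleBinlogOffset> {
    final String filename;
    final long position;

    SimpleBinlogOffset(String filename, long position) {
        this.filename = filename;
        this.position = position;
    }

    @Override
    public int compareTo(SimpleBinlogOffset that) {
        // assumes equally padded file names, so string order equals creation order
        int byFile = filename.compareTo(that.filename);
        return byFile != 0 ? byFile : Long.compare(position, that.position);
    }

    @Override
    public String toString() {
        return filename + ":" + position;
    }
}
```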
When SnapshotSplitReader executes pollSplitRecords, it normalizes the raw records in the queue. See RecordUtils#normalizedSplitRecords for the detailed logic.
public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
    if (hasNextElement.get()) {
        // data input: [low watermark event][snapshot events][high watermark event][binlog events][binlog-end event]
        // data output: [low watermark event][normalized events][high watermark event]
        boolean reachBinlogEnd = false;
        final List<SourceRecord> sourceRecords = new ArrayList<>();
        while (!reachBinlogEnd) {
            // note: drain the DataChangeEvents written to the queue
            List<DataChangeEvent> batch = queue.poll();
            for (DataChangeEvent event : batch) {
                sourceRecords.add(event.getRecord());
                if (RecordUtils.isEndWatermarkEvent(event.getRecord())) {
                    reachBinlogEnd = true;
                    break;
                }
            }
        }
        // snapshot split return its data once
        hasNextElement.set(false);
        // note: ************ normalize the data ************
        return normalizedSplitRecords(currentSnapshotSplit, sourceRecords, nameAdjuster)
                .iterator();
    }
    // the data has been polled, no more data
    reachEnd.compareAndSet(false, true);
    return null;
}
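The article does not reproduce RecordUtils#normalizedSplitRecords. Conceptually, it applies the binlog changes captured between the low and the high watermark to the snapshot rows of the same split, so that the split emits its rows as they were at the high watermark. The sketch below shows only that idea, under several simplifying assumptions:

- a single integer key
- a bounded key range [start, end)
- rows represented as strings
- no watermark records

`Change`, `Op` and `normalize` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class NormalizeSplitDemo {
    public static void main(String[] args) {
        // snapshot rows of one split whose key range is [1, 10)
        Map<Integer, String> snapshot = new LinkedHashMap<>();
        snapshot.put(1, "a");
        snapshot.put(2, "b");
        snapshot.put(3, "c");
        // binlog changes read between the low and the high watermark
        List<Change> binlog = List.of(
                new Change(Op.UPDATE, 2, "b2"),
                new Change(Op.DELETE, 3, null),
                new Change(Op.CREATE, 4, "d"),
                new Change(Op.CREATE, 42, "x")); // key outside [1, 10): belongs to another split
        System.out.println(normalize(snapshot, binlog, 1, 10)); // prints {1=a, 2=b2, 4=d}
    }

    /** Upserts or deletes the in-range changes on top of the snapshot rows of one split. */
    static Map<Integer, String> normalize(
            Map<Integer, String> snapshotRows, List<Change> changes, int splitStart, int splitEnd) {
        Map<Integer, String> rows = new LinkedHashMap<>(snapshotRows);
        for (Change change : changes) {
            if (change.key < splitStart || change.key >= splitEnd) {
                continue; // not part of this split
            }
            if (change.op == Op.DELETE) {
                rows.remove(change.key);
            } else {
                rows.put(change.key, change.after); // CREATE or UPDATE: keep the latest image
            }
        }
        return rows;
    }
}

enum Op { CREATE, UPDATE, DELETE }

/** Hypothetical binlog change: operation, row key and the row image after the change. */
final class Change {
    final Op op;
    final int key;
    final String after;

    Change(Op op, int key, String after) {
        this.op = op;
        this.key = key;
        this.after = after;
    }
}
```

Because the result reflects the table at the high watermark, the binlog reader later only needs to emit changes that come after each split's HW.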
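BinlogSplitReader: reading data. The read logic is fairly simple. The key point is the starting offset, which is the smallest high watermark (HW) among all finished snapshot splits.
When BinlogSplitReader executes pollSplitRecords, it filters the raw records in the queue to guarantee data consistency. Binlog reading in the incremental phase is unbounded, and every event is put into the event queue. BinlogSplitReader therefore uses shouldEmit() to decide whether each record is emitted.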
BinlogSplitReader#pollSplitRecords
public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
    checkReadException();
    final List<SourceRecord> sourceRecords = new ArrayList<>();
    if (currentTaskRunning) {
        List<DataChangeEvent> batch = queue.poll();
        for (DataChangeEvent event : batch) {
            if (shouldEmit(event.getRecord())) {
                sourceRecords.add(event.getRecord());
            }
        }
    }
    return sourceRecords.iterator();
}
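An event is emitted when either of the following holds:
the offset (pos) of the newly received event is past maxwm, the largest high watermark among all snapshot splits of its table;
the row belongs to one of the finished snapshot splits, and the event's offset is past that split's HWM.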
/**
 * Returns the record should emit or not.
 *
 * The watermark signal algorithm is the binlog split reader only sends the binlog event that
 * belongs to its finished snapshot splits. For each snapshot split, the binlog event is valid
 * since the offset is after its high watermark.
 *
 * E.g: the data input is :
 *    snapshot-split-0 info : [0,    1024) highWatermark0
 *    snapshot-split-1 info : [1024, 2048) highWatermark1
 *  the data output is:
 *  only the binlog event belong to [0,    1024) and offset is after highWatermark0 should send,
 *  only the binlog event belong to [1024, 2048) and offset is after highWatermark1 should send.
 */
private boolean shouldEmit(SourceRecord sourceRecord) {
    if (isDataChangeRecord(sourceRecord)) {
        TableId tableId = getTableId(sourceRecord);
        BinlogOffset position = getBinlogPosition(sourceRecord);
        // aligned, all snapshot splits of the table has reached max highWatermark
        // note: the new event's position is past maxwm, so emit it directly
        if (position.isAtOrBefore(maxSplitHighWatermarkMap.get(tableId))) {
            return true;
        }
        Object[] key =
                getSplitKey(
                        currentBinlogSplit.getSplitKeyType(),
                        sourceRecord,
                        statefulTaskContext.getSchemaNameAdjuster());
        for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
            // note: the row belongs to this snapshot split and its offset is past the
            // split's HWM, so emit it
            if (RecordUtils.splitKeyRangeContains(
                            key, splitInfo.getSplitStart(), splitInfo.getSplitEnd())
                    && position.isAtOrBefore(splitInfo.getHighWatermark())) {
                return true;
            }
        }
        // not in the monitored splits scope, do not emit
        return false;
    }
    // always send the schema change event and signal event
    // we need record them to state of Flink
    return true;
}
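The two emit conditions can be modeled in isolation. The sketch below is a hypothetical simplification of FinishedSnapshotSplitInfo and BinlogSplitReader#shouldEmit. It makes the following assumptions:

- binlog offsets are reduced to a single long
- split keys are integers
- all splits belong to one table
- "past a watermark" means offset >= watermark, following the notes in the code above

`FinishedSplit`, `maxHighWatermark` and this `shouldEmit` are illustrative names.

```java
import java.util.List;

class ShouldEmitDemo {
    public static void main(String[] args) {
        List<FinishedSplit> splits = List.of(
                new FinishedSplit(0, 1024, 100L),     // snapshot-split-0: [0, 1024), HW = 100
                new FinishedSplit(1024, 2048, 200L)); // snapshot-split-1: [1024, 2048), HW = 200
        System.out.println(shouldEmit(10, 150L, splits));   // true: past split-0's HW
        System.out.println(shouldEmit(1500, 150L, splits)); // false: split-1's snapshot covers it
        System.out.println(shouldEmit(1500, 250L, splits)); // true: past the table's max HW
    }

    /** Largest HW over all finished splits of the table ("maxwm"). */
    static long maxHighWatermark(List<FinishedSplit> splits) {
        long max = Long.MIN_VALUE;
        for (FinishedSplit split : splits) {
            max = Math.max(max, split.highWatermark);
        }
        return max;
    }

    static boolean shouldEmit(int key, long position, List<FinishedSplit> splits) {
        // condition 1: the event is past every split's HW, i.e. pure binlog data
        if (position >= maxHighWatermark(splits)) {
            return true;
        }
        // condition 2: the event is past the HW of the split that owns its key
        for (FinishedSplit split : splits) {
            if (key >= split.start && key < split.end && position >= split.highWatermark) {
                return true;
            }
        }
        // the normalized snapshot of the owning split already reflects this change
        return false;
    }
}

/** Hypothetical finished snapshot split: key range [start, end) and its high watermark. */
final class FinishedSplit {
    final int start;
    final int end;
    final long highWatermark;

    FinishedSplit(int start, int end, long highWatermark) {
        this.start = start;
        this.end = end;
        this.highWatermark = highWatermark;
    }
}
```

The second condition is what lets splits with different HWs share one unbounded binlog reader without duplicating or losing changes.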
MySqlRecordEmitter: emitting data
SourceReaderBase takes from the queue the batches of records read by the split readers (originally Debezium DataChangeEvents) and passes them to MySqlRecordEmitter, which converts them into Flink's RowData type.
How SourceReaderBase processes split data
org.apache.flink.connector.base.source.reader.SourceReaderBase#pollNext
public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
    // make sure we have a fetch we are working on, or move to the next
    RecordsWithSplitIds<E> recordsWithSplitId = this.currentFetch;
    if (recordsWithSplitId == null) {
        recordsWithSplitId = getNextFetch(output);
        if (recordsWithSplitId == null) {
            return trace(finishedOrAvailableLater());
        }
    }
    // we need to loop here, because we may have to go across splits
    while (true) {
        // Process one record.
        // note: read a single record from the iterator via MySqlRecords
        final E record = recordsWithSplitId.nextRecordFromSplit();
        if (record != null) {
            // emit the record.
            recordEmitter.emitRecord(record, currentSplitOutput, currentSplitContext.state);
            LOG.trace("Emitted record: {}", record);
            // We always emit MORE_AVAILABLE here, even though we do not strictly know whether
            // more is available. If nothing more is available, the next invocation will find
            // this out and return the correct status.
            // That means we emit the occasional 'false positive' for availability, but this
            // saves us doing checks for every record. Ultimately, this is cheaper.
            return trace(InputStatus.MORE_AVAILABLE);
        } else if (!moveToNextSplit(recordsWithSplitId, output)) {
            // The fetch is done and we just discovered that and have not emitted anything, yet.
            // We need to move to the next fetch. As a shortcut, we call pollNext() here again,
            // rather than emitting nothing and waiting for the caller to call us again.
            return pollNext(output);
        }
        // else fall through the loop
    }
}

private RecordsWithSplitIds<E> getNextFetch(final ReaderOutput<T> output) {
    splitFetcherManager.checkErrors();
    LOG.trace("Getting next source data batch from queue");
    // note: fetch data from elementsQueue
    final RecordsWithSplitIds<E> recordsWithSplitId = elementsQueue.poll();
    if (recordsWithSplitId == null || !moveToNextSplit(recordsWithSplitId, output)) {
        return null;
    }
    currentFetch = recordsWithSplitId;
    return recordsWithSplitId;
}
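The control flow of pollNext reduces to a small model. The queue holds fetches, each fetch groups records by split, and one call emits at most one record. The sketch is a hypothetical simplification. It has no ReaderOutput, no split-finished callbacks and no availability futures, and it uses its own `Status` enum rather than Flink's InputStatus. `PollNextDemo` and `Fetch` are illustrative names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

class PollNextDemo {
    public static void main(String[] args) {
        PollNextDemo reader = new PollNextDemo();
        // two fetches; the first one carries records of two splits
        reader.elementsQueue.add(new Fetch(List.of(List.of("r1", "r2"), List.of("r3"))));
        reader.elementsQueue.add(new Fetch(List.of(List.of("r4"))));
        List<String> output = new ArrayList<>();
        while (reader.pollNext(output) == Status.MORE_AVAILABLE) {
            // each call emits at most one record
        }
        System.out.println(output); // prints [r1, r2, r3, r4]
    }

    enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE }

    /** One fetch: records grouped per split, consumed split by split. */
    static final class Fetch {
        private final Iterator<List<String>> splits;
        private Iterator<String> currentSplit;

        Fetch(List<List<String>> recordsPerSplit) {
            this.splits = recordsPerSplit.iterator();
        }

        boolean moveToNextSplit() {
            if (!splits.hasNext()) {
                return false; // the whole fetch has been consumed
            }
            currentSplit = splits.next().iterator();
            return true;
        }

        String nextRecordFromSplit() {
            return currentSplit.hasNext() ? currentSplit.next() : null;
        }
    }

    final Queue<Fetch> elementsQueue = new ArrayDeque<>();
    private Fetch currentFetch;

    Status pollNext(List<String> output) {
        if (currentFetch == null) {
            Fetch next = elementsQueue.poll(); // plays the role of getNextFetch()
            if (next == null || !next.moveToNextSplit()) {
                return Status.NOTHING_AVAILABLE;
            }
            currentFetch = next;
        }
        while (true) {
            String record = currentFetch.nextRecordFromSplit();
            if (record != null) {
                output.add(record); // stands in for recordEmitter.emitRecord(...)
                return Status.MORE_AVAILABLE;
            } else if (!currentFetch.moveToNextSplit()) {
                currentFetch = null; // fetch finished: go straight to the next one
                return pollNext(output);
            }
            // otherwise we moved to the next split of the same fetch: keep looping
        }
    }
}
```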
MySqlRecords returns the records of the current split one at a time.
com.ververica.cdc.connectors.mysql.source.split.MySqlRecords#nextRecordFromSplit
public SourceRecord nextRecordFromSplit() {
    final Iterator<SourceRecord> recordsForSplit = this.recordsForCurrentSplit;
    if (recordsForSplit != null) {
        if (recordsForSplit.hasNext()) {
            return recordsForSplit.next();
        } else {
            return null;
        }
    } else {
        throw new IllegalStateException();
    }
}
MySqlRecordEmitter converts the data into RowData through RowDataDebeziumDeserializeSchema.
com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter#emitRecord
public void emitRecord(SourceRecord element, SourceOutput<T> output, MySqlSplitState splitState)
        throws Exception {
    if (isWatermarkEvent(element)) {
        BinlogOffset watermark = getWatermark(element);
        if (isHighWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
            splitState.asSnapshotSplitState().setHighWatermark(watermark);
        }
    } else if (isSchemaChangeEvent(element) && splitState.isBinlogSplitState()) {
        HistoryRecord historyRecord = getHistoryRecord(element);
        Array tableChanges =
                historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
        TableChanges changes = TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
        for (TableChanges.TableChange tableChange : changes) {
            splitState.asBinlogSplitState().recordSchema(tableChange.getId(), tableChange);
        }
    } else if (isDataChangeRecord(element)) {
        // note: handling of data change records
        if (splitState.isBinlogSplitState()) {
            BinlogOffset position = getBinlogPosition(element);
            splitState.asBinlogSplitState().setStartingOffset(position);
        }
        debeziumDeserializationSchema.deserialize(
                element,
                new Collector<T>() {
                    @Override
                    public void collect(final T t) {
                        output.collect(t);
                    }

                    @Override
                    public void close() {
                        // do nothing
                    }
                });
    } else {
        // unknown element
        LOG.info("Meet unknown element {}, just skip.", element);
    }
}
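The split-state bookkeeping in emitRecord can be modeled on its own:

- A high watermark event updates the state of a snapshot split.
- A data change record advances the starting offset of the binlog split, so a checkpoint taken afterwards records where binlog reading should resume.
- Other elements are skipped.

The sketch below is hypothetical. `SplitStateModel`, `Element` and `Kind` are illustrative names, offsets are longs, schema changes are not modeled, and adding a string to a list stands in for deserialization and `output.collect`.

```java
import java.util.ArrayList;
import java.util.List;

class EmitRecordDemo {
    public static void main(String[] args) {
        SplitStateModel snapshotSplit = new SplitStateModel(true);
        SplitStateModel binlogSplit = new SplitStateModel(false);
        List<String> output = new ArrayList<>();

        emit(new Element(Kind.HIGH_WATERMARK, 300L, null), snapshotSplit, output);
        emit(new Element(Kind.DATA_CHANGE, 310L, "+I(1)"), binlogSplit, output);
        emit(new Element(Kind.DATA_CHANGE, 320L, "-D(1)"), binlogSplit, output);
        emit(new Element(Kind.OTHER, 330L, "ignored"), binlogSplit, output);

        // prints 300 320 [+I(1), -D(1)]
        System.out.println(snapshotSplit.highWatermark + " " + binlogSplit.startingOffset + " " + output);
    }

    static void emit(Element element, SplitStateModel state, List<String> output) {
        switch (element.kind) {
            case HIGH_WATERMARK:
                if (state.isSnapshotSplit) {
                    state.highWatermark = element.offset; // later reported to the enumerator
                }
                break;
            case DATA_CHANGE:
                if (!state.isSnapshotSplit) {
                    state.startingOffset = element.offset; // position kept in the split state
                }
                output.add(element.payload); // stands in for deserialize(...) + output.collect(...)
                break;
            default:
                break; // unknown element: skipped
        }
    }
}

enum Kind { HIGH_WATERMARK, DATA_CHANGE, OTHER }

/** Hypothetical source element: its kind, binlog offset and an already-rendered row. */
final class Element {
    final Kind kind;
    final long offset;
    final String payload;

    Element(Kind kind, long offset, String payload) {
        this.kind = kind;
        this.offset = offset;
        this.payload = payload;
    }
}

/** Hypothetical split state; -1 means "not set yet". */
final class SplitStateModel {
    final boolean isSnapshotSplit;
    long highWatermark = -1L;  // only meaningful for snapshot splits
    long startingOffset = -1L; // only meaningful for the binlog split

    SplitStateModel(boolean isSnapshotSplit) {
        this.isSnapshotSplit = isSnapshotSplit;
    }
}
```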
RowDataDebeziumDeserializeSchema deserialization process.
com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema#deserialize
public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
    Envelope.Operation op = Envelope.operationFor(record);
    Struct value = (Struct) record.value();
    Schema valueSchema = record.valueSchema();
    if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
        GenericRowData insert = extractAfterRow(value, valueSchema);
        validator.validate(insert, RowKind.INSERT);
        insert.setRowKind(RowKind.INSERT);
        out.collect(insert);
    } else if (op == Envelope.Operation.DELETE) {
        GenericRowData delete = extractBeforeRow(value, valueSchema);
        validator.validate(delete, RowKind.DELETE);
        delete.setRowKind(RowKind.DELETE);
        out.collect(delete);
    } else {
        GenericRowData before = extractBeforeRow(value, valueSchema);
        validator.validate(before, RowKind.UPDATE_BEFORE);
        before.setRowKind(RowKind.UPDATE_BEFORE);
        out.collect(before);
        GenericRowData after = extractAfterRow(value, valueSchema);
        validator.validate(after, RowKind.UPDATE_AFTER);
        after.setRowKind(RowKind.UPDATE_AFTER);
        out.collect(after);
    }
}
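This operation-to-RowKind mapping is what the changelog-json output in the opening example exposes. The sketch below reproduces the mapping with local stand-in enums (`Op`, `Kind`) instead of Debezium's Envelope.Operation and Flink's RowKind, so it runs without either dependency. The short strings +I, -U, +U and -D follow the convention of Flink's RowKind#shortString(). Rows are plain strings here for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class RowKindMappingDemo {
    enum Op { READ, CREATE, UPDATE, DELETE }

    enum Kind {
        INSERT("+I"), UPDATE_BEFORE("-U"), UPDATE_AFTER("+U"), DELETE("-D");

        final String shortString;

        Kind(String shortString) {
            this.shortString = shortString;
        }
    }

    /** Returns one "kind + row" string per emitted row, mirroring deserialize(...). */
    static List<String> toChangelog(Op op, String before, String after) {
        List<String> rows = new ArrayList<>();
        switch (op) {
            case READ:   // row read during the snapshot phase
            case CREATE: // INSERT captured from the binlog
                rows.add(Kind.INSERT.shortString + after);
                break;
            case DELETE:
                rows.add(Kind.DELETE.shortString + before);
                break;
            default: // UPDATE: one change event becomes two rows
                rows.add(Kind.UPDATE_BEFORE.shortString + before);
                rows.add(Kind.UPDATE_AFTER.shortString + after);
        }
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(toChangelog(Op.READ, null, "(1,a)"));      // [+I(1,a)]
        System.out.println(toChangelog(Op.UPDATE, "(1,a)", "(1,b)")); // [-U(1,a), +U(1,b)]
        System.out.println(toChangelog(Op.DELETE, "(1,b)", null));    // [-D(1,b)]
    }
}
```

Snapshot rows therefore show up as inserts, and every binlog UPDATE produces two downstream records. This explains the record counts you observe in Kafka.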
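MySqlSourceReader reports split-finished events
After MySqlSourceReader finishes a snapshot split, it sends the finished split's information (the split ID and its HighWatermark) to MySqlSourceEnumerator, and then requests another split.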
com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader#onSplitFinished
protected void onSplitFinished(Map<String, MySqlSplitState> finishedSplitIds) {
    for (MySqlSplitState mySqlSplitState : finishedSplitIds.values()) {
        MySqlSplit mySqlSplit = mySqlSplitState.toMySqlSplit();
        finishedUnackedSplits.put(mySqlSplit.splitId(), mySqlSplit.asSnapshotSplit());
    }
    // note: send the split-finished event
    reportFinishedSnapshotSplitsIfNeed();
    // note: once the previous split is done, request the next one
    context.sendSplitRequest();
}

private void reportFinishedSnapshotSplitsIfNeed() {
    if (!finishedUnackedSplits.isEmpty()) {
        final Map<String, BinlogOffset> finishedOffsets = new HashMap<>();
        for (MySqlSnapshotSplit split : finishedUnackedSplits.values()) {
            // note: send the split ID and its high watermark
            finishedOffsets.put(split.splitId(), split.getHighWatermark());
        }
        FinishedSnapshotSplitsReportEvent reportEvent =
                new FinishedSnapshotSplitsReportEvent(finishedOffsets);
        context.sendSourceEventToCoordinator(reportEvent);
        LOG.debug(
                "The subtask {} reports offsets of finished snapshot splits {}.",
                subtaskId,
                finishedOffsets);
    }
}
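The reader-side bookkeeping can be sketched as follows. Finished snapshot splits are buffered, and each report maps split IDs to high watermarks. The acknowledgement step, which removes splits from the buffer so they are not reported again, is an assumption inferred from the name finishedUnackedSplits; the listing above does not show it. `FinishedSplitReporter`, `acknowledge` and the split ID format are hypothetical, and offsets are reduced to longs.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FinishedSplitReportDemo {
    public static void main(String[] args) {
        FinishedSplitReporter reporter = new FinishedSplitReporter();
        reporter.onSplitFinished("db.orders:0", 100L);
        reporter.onSplitFinished("db.orders:1", 180L);
        System.out.println(reporter.buildReport()); // {db.orders:0=100, db.orders:1=180}
        reporter.acknowledge(List.of("db.orders:0"));
        System.out.println(reporter.buildReport()); // {db.orders:1=180}
    }
}

/** Hypothetical reader-side bookkeeping of finished but unacknowledged snapshot splits. */
final class FinishedSplitReporter {
    // insertion-ordered so the printed report is deterministic
    private final Map<String, Long> finishedUnackedSplits = new LinkedHashMap<>();

    void onSplitFinished(String splitId, long highWatermark) {
        finishedUnackedSplits.put(splitId, highWatermark);
    }

    /** Payload of a report: split ID -> high watermark (empty if nothing is pending). */
    Map<String, Long> buildReport() {
        return new LinkedHashMap<>(finishedUnackedSplits);
    }

    /** Assumed acknowledgement: acknowledged splits are no longer re-reported. */
    void acknowledge(List<String> splitIds) {
        for (String splitId : splitIds) {
            finishedUnackedSplits.remove(splitId);
        }
    }
}
```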
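MySqlSourceEnumerator assigns the binlog split
After every split of the snapshot phase has been read, MySqlHybridSplitAssigner creates a BinlogSplit for the subsequent incremental read. While creating the BinlogSplit, it picks the smallest BinlogOffset among all finished snapshot splits. Note: in the 2.0.0 branch, the minimum offset computed in createBinlogSplit always starts from 0. The latest master branch has fixed this bug.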
private MySqlBinlogSplit createBinlogSplit() {
    final List<MySqlSnapshotSplit> assignedSnapshotSplit =
            snapshotSplitAssigner.getAssignedSplits().values().stream()
                    .sorted(Comparator.comparing(MySqlSplit::splitId))
                    .collect(Collectors.toList());
    Map<String, BinlogOffset> splitFinishedOffsets =
            snapshotSplitAssigner.getSplitFinishedOffsets();
    final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
    final Map<TableId, TableChange> tableSchemas = new HashMap<>();
    BinlogOffset minBinlogOffset = null;
    // note: pick the smallest offset among all assigned snapshot splits
    for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
        // find the min binlog offset
        BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
        if (minBinlogOffset == null || binlogOffset.compareTo(minBinlogOffset) < 0) {
            minBinlogOffset = binlogOffset;
        }
        finishedSnapshotSplitInfos.add(
                new FinishedSnapshotSplitInfo(
                        split.getTableId(),
                        split.splitId(),
                        split.getSplitStart(),
                        split.getSplitEnd(),
                        binlogOffset));
        tableSchemas.putAll(split.getTableSchemas());
    }
    final MySqlSnapshotSplit lastSnapshotSplit =
            assignedSnapshotSplit.get(assignedSnapshotSplit.size() - 1).asSnapshotSplit();

    return new MySqlBinlogSplit(
            BINLOG_SPLIT_ID,
            lastSnapshotSplit.getSplitKeyType(),
            minBinlogOffset == null ? BinlogOffset.INITIAL_OFFSET : minBinlogOffset,
            BinlogOffset.NO_STOPPING_OFFSET,
            finishedSnapshotSplitInfos,
            tableSchemas);
}
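A small model shows the consequence of the 2.0.0 bug mentioned above. The sketch reduces offsets to longs and compares two ways of finding the minimum:

- `minSeededWithInitial` is a hypothetical reconstruction of what "always starts from 0" means. The minimum search is seeded with the initial offset, so no HW can ever undercut it. This is not the actual 2.0.0 source.
- `minOfHighWatermarks` follows the null-seeded loop in createBinlogSplit.

```java
import java.util.List;

class MinBinlogOffsetDemo {
    static final long INITIAL_OFFSET = 0L; // stand-in for BinlogOffset.INITIAL_OFFSET

    /** Assumed buggy behaviour: the search starts at the initial offset, which always wins. */
    static long minSeededWithInitial(List<Long> highWatermarks) {
        long min = INITIAL_OFFSET;
        for (long hw : highWatermarks) {
            if (hw < min) {
                min = hw;
            }
        }
        return min;
    }

    /** Behaviour of the code above: seed with the first HW, fall back to the initial offset. */
    static long minOfHighWatermarks(List<Long> highWatermarks) {
        Long min = null;
        for (Long hw : highWatermarks) {
            if (min == null || hw.compareTo(min) < 0) {
                min = hw;
            }
        }
        return min == null ? INITIAL_OFFSET : min;
    }

    public static void main(String[] args) {
        List<Long> highWatermarks = List.of(180L, 100L, 250L);
        System.out.println(minSeededWithInitial(highWatermarks)); // 0
        System.out.println(minOfHighWatermarks(highWatermarks));  // 100
    }
}
```

Starting the binlog split at the smallest HW ensures that no split misses a change made after its own HW. Changes that a split's snapshot already covers are then dropped by shouldEmit, which compares each event with the HW of the split that owns its key.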