基于 Flink 實現(xiàn)解決數(shù)據(jù)庫分庫分表任務拆分
點擊上方“中間件興趣圈”,選擇“設為星標”
1、場景描述
例如訂單庫進行了分庫分表,其示例如下圖所示:

現(xiàn)在的需求是希望創(chuàng)建一個任務就將數(shù)據(jù)同步到MQ集群,而不是為每一個數(shù)據(jù)庫實例單獨創(chuàng)建一個任務,將其數(shù)據(jù)導入到MQ集群,因為同步任務除了庫不同之外,表的結構、數(shù)據(jù)映射規(guī)則都是一致的。
2、flinkx 的解決方案詳解
2.1 fink Stream API 開發(fā)基本流程
使用 Flink Stream API 編程的通用步驟如下圖所示:

溫馨提示:有關 Stream API 的詳細內容將在后續(xù)的文章中展開,本文主要是關注 InputFormatSourceFunction,重點關注數(shù)據(jù)源的拆分。
2.2 flinkx Reader(數(shù)據(jù)源)核心類圖

在 flinkx 中將不同的數(shù)據(jù)源封裝成一個個 Reader,其基類為 BaseDataReader,上圖中主要羅列了如下幾個關鍵的類體系:
InputFormat
flink 核心API,主要是對輸入源進行數(shù)據(jù)切分、讀取數(shù)據(jù)的抽象,其核心接口說明如下:void configure(Configuration parameters)
對輸入源進行額外的配置,該方法在 Input 的生命周期中只需調用一次。BaseStatistics getStatistics(BaseStatistics cachedStatistics)
返回 input 的統(tǒng)計數(shù)據(jù),如果不需要統(tǒng)計,在實現(xiàn)的時候可以直接返回 null。T[] createInputSplits(int minNumSplits)
對輸入數(shù)據(jù)進行數(shù)據(jù)切片,使之支持并行處理,數(shù)據(jù)切片相關類體系見:InputSplit。InputSplitAssigner getInputSplitAssigner(T[] inputSplits)
獲取 InputSplit 分配器,主要是在具體執(zhí)行任務時如何獲取下一個 InputSplit,其聲明如下圖所示:
void open(T split)
根據(jù)指定的數(shù)據(jù)分片 (InputSplit) 打開數(shù)據(jù)通道。為了加深對該方法的理解,下面看一下 Flinkx 關于 jdbc、es 的寫入示例:
boolean reachedEnd()
數(shù)據(jù)是否已結束,在 Flink 中通常 InputFormat 的數(shù)據(jù)源通常表示有界數(shù)據(jù) (DataSet)。OT nextRecord(OT reuse)
從通道中獲取下一條記錄。
void close()
關閉。InputSplit
數(shù)據(jù)分片根接口,只定義了如下方法:int getSplitNumber()
獲取當前分片所在所有分片中的序號。本文先簡單介紹一下其通用實現(xiàn)子類:GenericInputSplit。
int partitionNumber
當前 split 所在的序號int totalNumberOfPartitions
總分片數(shù)為了方便理解我們可以思考一下如下場景,對于一個數(shù)據(jù)量超過千萬級別的表,在進行數(shù)據(jù)切分時可以考慮使用10個線程,即切割成 10分,那每一個數(shù)據(jù)線程查詢數(shù)據(jù)時可以 id % totalNumberOfPartitions = partitionNumber,進行數(shù)據(jù)讀取。
SourceFunction
Flink 源的抽象定義。RichFunction
富函數(shù),定義了生命周期、可獲取運行時環(huán)境上下文。ParallelSourceFunction
支持并行的 source function。RichParallelSourceFunction
并行的富函數(shù)
InputFormatSourceFunction
Flink 默認提供的 RichParallelSourceFunction 實現(xiàn)類,可以當成是RichParallelSourceFunction 的通用寫法,其內部的數(shù)據(jù)讀取邏輯由 InputFormat 實現(xiàn)。
BaseDataReader
flinkx 數(shù)據(jù)讀取基類,在 flinkx 中將所有的數(shù)據(jù)讀取源封裝成 Reader 。
2.3 flinkx Reader構建 DataStream 流程
經(jīng)過了上面類圖的梳理,大家應該 flink 中提到的上述類的含義有了一個大概的理解,但如何運用呢?接下來將通過查閱 flinkx 的 DistributedJdbcDataReader(BaseDataReader的子類)的 readData 調用流程,體會一下其使用方法。

基本遵循創(chuàng)建 InputFormat、從而創(chuàng)建對應的 SourceFunction,然后通過 StreamExecutionEnvironment 的 addSource 方法將 SourceFunction 創(chuàng)建對應的 DataStreamSource。
2.4 flinkx 針對數(shù)據(jù)庫分庫分表任務拆分解決方案
正如本文開頭部分的場景描述那樣,某訂單系統(tǒng)被設計成4庫8表,每一個庫(Schema)中包含2個表,如何提高數(shù)據(jù)導出的性能呢,如何提高數(shù)據(jù)的抽取性能呢?通常的解決方案如下:
首先按庫按表進行拆分,即4庫8表,可以進行切分8份,每一個數(shù)據(jù)分配處理一個實例中的1個表。
單個表的數(shù)據(jù)抽取再進行拆分,例如按ID進行取模進一步分解。
flinkx 就是采取上面的策略,我們來看一下其具體做法。

Step1:首先先根據(jù)數(shù)據(jù)庫實例、表進行拆分,按表維度組織成一個 DataSource 列表,后續(xù)將基于這個原始數(shù)據(jù)執(zhí)行拆分算法。
接下來具體的任務拆分在 InputFormat 中實現(xiàn),本實例在 DistributedJdbcInputFormat 的 createInputSplitsInternal 中。

Step2:根據(jù)分區(qū)創(chuàng)建 inputSplit 數(shù)組,這里分區(qū)的概念就相當于上文提到方案中的第一條。

Step3:如果指定了 splitKey 的任務拆分算法,首先 DistributedJdbcInputSplit 繼承自 GenericInputSplit,總分區(qū)數(shù)為 numPartitions,然后生成數(shù)據(jù)庫的參數(shù),這里主要是生成 SQL Where 語句中的 splitKey mod totalNumberOfPartitions = partitionNumber,其中 splitKey 為分片鍵,例如 id,而 totalNumberOfPartitions 表示分區(qū)總數(shù),partitionNumber 表示當前分片的序號,通過 SQL 取模函數(shù)進行數(shù)據(jù)拆分。

Step4:如果未指定表級別的數(shù)據(jù)拆分鍵,則拆分策略是對 sourceList 進行拆分,即一些分區(qū)處理其中幾個表。
關于 flinkx 中關于任務切分的介紹就到這里了。
3、總結
本文主要是基于 flinkx 介紹 MySQL 分庫分表情況下如何基于 flink 進行任務切分,簡單介紹了 Flink 中關于基本的編程范式、InputFormat、SourceFunction 的基本類體系。
溫馨提示:本文并沒有太詳細對 Flink API 進行深入研究,后續(xù)會單獨對 Flink 內容進行逐一剖析,但 Flink 系列的文章組織,其文章的組織并不具備順序性,筆者會在不斷實踐 Flink 的過程中對 FLink 進行剖析。
本文就介紹到這來了,點贊、在看、轉發(fā)是對我最大的鼓勵。

