摘要:本文由社區(qū)志愿者陳政羽整理,內(nèi)容來源自阿里巴巴高級開發(fā)工程師徐榜江 (雪盡) 7 月 10 日在北京站 Flink Meetup 分享的《詳解 Flink-CDC》。深入講解了最新發(fā)布的 Flink CDC 2.0.0 版本帶來的核心特性,包括:全量數(shù)據(jù)的并發(fā)讀取、checkpoint、無鎖讀取等重大改進。
GitHub 項目地址:
https://github.com/ververica/flink-cdc-connectors
Tips:點擊「閱讀原文」了解 Flink CDC 2.0.0 更多特性~
GitHub 地址 
CDC 的全稱是 Change Data Capture ,在廣義的概念上,只要是能捕獲數(shù)據(jù)變更的技術,我們都可以稱之為 CDC 。目前通常描述的 CDC 技術主要面向數(shù)據(jù)庫的變更,是一種用于捕獲數(shù)據(jù)庫中數(shù)據(jù)變更的技術。CDC 技術的應用場景非常廣泛:
- 數(shù)據(jù)分發(fā):一個數(shù)據(jù)源分發(fā)給多個下游系統(tǒng);
- 數(shù)據(jù)采集:面向數(shù)據(jù)倉庫 / 數(shù)據(jù)湖的 ETL 數(shù)據(jù)集成,是非常重要的數(shù)據(jù)源。
CDC 的技術方案非常多,目前業(yè)界主流的實現(xiàn)機制可以分為兩種:- 離線調(diào)度查詢作業(yè),批處理。把一張表同步到其他系統(tǒng),每次通過查詢?nèi)カ@取表中最新的數(shù)據(jù);
- 無法保障數(shù)據(jù)一致性,查的過程中有可能數(shù)據(jù)已經(jīng)發(fā)生了多次變更;
- 不保障實時性,基于離線調(diào)度存在天然的延遲。
- 實時消費日志,流處理,例如 MySQL 的 binlog 日志完整記錄了數(shù)據(jù)庫中的變更,可以把 binlog 文件當作流的數(shù)據(jù)源;
- 保障數(shù)據(jù)一致性,因為 binlog 文件包含了所有歷史變更明細;
- 保障實時性,因為類似 binlog 的日志文件是可以流式消費的,提供的是實時數(shù)據(jù)。
對比常見的開源 CDC 方案,我們可以發(fā)現(xiàn):
- 對比全量同步能力,基于查詢或者日志的 CDC 方案基本都支持,除了 Canal。
- 而對比全量 + 增量同步的能力,只有 Flink CDC、Debezium、Oracle Goldengate 支持較好。
- 從架構角度去看,該表將架構分為單機和分布式,這里的分布式架構不單純體現(xiàn)在數(shù)據(jù)讀取能力的水平擴展上,更重要的是在大數(shù)據(jù)場景下分布式系統(tǒng)接入能力。例如 Flink CDC 的數(shù)據(jù)入湖或者入倉的時候,下游通常是分布式的系統(tǒng),如 Hive、HDFS、Iceberg、Hudi 等,那么從對接入分布式系統(tǒng)能力上看,F(xiàn)link CDC 的架構能夠很好地接入此類系統(tǒng)。
- 在數(shù)據(jù)轉換 / 數(shù)據(jù)清洗能力上,當數(shù)據(jù)進入到 CDC 工具的時候是否能較方便的對數(shù)據(jù)做一些過濾或者清洗,甚至聚合?
- 在 Flink CDC 上操作相當簡單,可以通過 Flink SQL 去操作這些數(shù)據(jù);
- 但是像 DataX、Debezium 等則需要通過腳本或者模板去做,所以用戶的使用門檻會比較高。
- 另外,在生態(tài)方面,這里指的是下游的一些數(shù)據(jù)庫或者數(shù)據(jù)源的支持。Flink CDC 下游有豐富的 Connector,例如寫入到 TiDB、MySQL、Pg、HBase、Kafka、ClickHouse 等常見的一些系統(tǒng),也支持各種自定義 connector。
講到這里,先帶大家回顧下開發(fā) Flink CDC 項目的動機。1. Dynamic Table & ChangeLog Stream
大家都知道 Flink 有兩個基礎概念:Dynamic Table 和 Changelog Stream。
- Dynamic Table 就是 Flink SQL 定義的動態(tài)表,動態(tài)表和流的概念是對等的。參照上圖,流可以轉換成動態(tài)表,動態(tài)表也可以轉換成流。
- 在 Flink SQL中,數(shù)據(jù)在從一個算子流向另外一個算子時都是以 Changelog Stream 的形式,任意時刻的 Changelog Stream 可以翻譯為一個表,也可以翻譯為一個流。
聯(lián)想下 MySQL 中的表和 binlog 日志,就會發(fā)現(xiàn):MySQL 數(shù)據(jù)庫的一張表所有的變更都記錄在 binlog 日志中,如果一直對表進行更新,binlog 日志流也一直會追加,數(shù)據(jù)庫中的表就相當于 binlog 日志流在某個時刻點物化的結果;日志流就是將表的變更數(shù)據(jù)持續(xù)捕獲的結果。這說明 Flink SQL 的 Dynamic Table 是可以非常自然地表示一張不斷變化的 MySQL 數(shù)據(jù)庫表。
在此基礎上,我們調(diào)研了一些 CDC 技術,最終選擇了 Debezium 作為 Flink CDC 的底層采集工具。Debezium 支持全量同步,也支持增量同步,也支持全量 + 增量的同步,非常靈活,同時基于日志的 CDC 技術使得提供 Exactly-Once 成為可能。將 Flink SQL 的內(nèi)部數(shù)據(jù)結構 RowData 和 Debezium 的數(shù)據(jù)結構進行對比,可以發(fā)現(xiàn)兩者是非常相似的。- 每條 RowData 都有一個元數(shù)據(jù) RowKind,包括 4 種類型, 分別是插入 (INSERT)、更新前鏡像 (UPDATE_BEFORE)、更新后鏡像 (UPDATE_AFTER)、刪除 (DELETE),這四種類型和數(shù)據(jù)庫里面的 binlog 概念保持一致。
- 而 Debezium 的數(shù)據(jù)結構,也有一個類似的元數(shù)據(jù) op 字段, op 字段的取值也有四種,分別是 c、u、d、r,各自對應 create、update、delete、read。對于代表更新操作的 u,其數(shù)據(jù)部分同時包含了前鏡像 (before) 和后鏡像 (after)。
通過分析兩種數(shù)據(jù)結構,F(xiàn)link 和 Debezium 兩者的底層數(shù)據(jù)是可以非常方便地對接起來的,大家可以發(fā)現(xiàn) Flink 做 CDC 從技術上是非常合適的。2. 傳統(tǒng) CDC ETL 分析
我們來看下傳統(tǒng) CDC 的 ETL 分析鏈路,如下圖所示:
傳統(tǒng)的基于 CDC 的 ETL 分析中,數(shù)據(jù)采集工具是必須的,國外用戶常用 Debezium,國內(nèi)用戶常用阿里開源的 Canal,采集工具負責采集數(shù)據(jù)庫的增量數(shù)據(jù),一些采集工具也支持同步全量數(shù)據(jù)。采集到的數(shù)據(jù)一般輸出到消息中間件如 Kafka,然后 Flink 計算引擎再去消費這一部分數(shù)據(jù)寫入到目的端,目的端可以是各種 DB,數(shù)據(jù)湖,實時數(shù)倉和離線數(shù)倉。注意,F(xiàn)link 提供了 changelog-json format,可以將 changelog 數(shù)據(jù)寫入離線數(shù)倉如 Hive / HDFS;對于實時數(shù)倉,F(xiàn)link 支持將 changelog 通過 upsert-kafka connector 直接寫入 Kafka。
我們一直在思考是否可以使用 Flink CDC 去替換上圖中虛線框內(nèi)的采集組件和消息隊列,從而簡化分析鏈路,降低維護成本。同時更少的組件也意味著數(shù)據(jù)時效性能夠進一步提高。答案是可以的,于是就有了我們基于 Flink CDC 的 ETL 分析流程。3. 基于 Flink CDC 的 ETL 分析
在使用了 Flink CDC 之后,除了組件更少,維護更方便外,另一個優(yōu)勢是通過 Flink SQL 極大地降低了用戶使用門檻,可以看下面的例子:
該例子是通過 Flink CDC 去同步數(shù)據(jù)庫數(shù)據(jù)并寫入到 TiDB,用戶直接使用 Flink SQL 創(chuàng)建了產(chǎn)品和訂單的 MySQL-CDC 表,然后對數(shù)據(jù)流進行 JOIN 加工,加工后直接寫入到下游數(shù)據(jù)庫。通過一個 Flink SQL 作業(yè)就完成了 CDC 的數(shù)據(jù)分析,加工和同步。大家會發(fā)現(xiàn)這是一個純 SQL 作業(yè),這意味著只要會 SQL 的 BI,業(yè)務線同學都可以完成此類工作。與此同時,用戶也可以利用 Flink SQL 提供的豐富語法進行數(shù)據(jù)清洗、分析、聚合。
而這些能力,對于現(xiàn)有的 CDC 方案來說,進行數(shù)據(jù)的清洗,分析和聚合是非常困難的。此外,利用 Flink SQL 雙流 JOIN、維表 JOIN、UDTF 語法可以非常容易地完成數(shù)據(jù)打寬,以及各種業(yè)務邏輯加工。
4. Flink CDC 項目發(fā)展
- 2020 年 7 月由云邪提交了第一個 commit,這是基于個人興趣孵化的項目;
- 2020 年 7 中旬支持了 MySQL-CDC;
- 2020 年 7 月末支持了 Postgres-CDC;
- 一年的時間,該項目在 GitHub 上的 star 數(shù)已經(jīng)超過 800。

1. Flink CDC 痛點
MySQL CDC 是 Flink CDC 中使用最多也是最重要的 Connector,本文下述章節(jié)描述 Flink CDC Connector 均為 MySQL CDC Connector。隨著 Flink CDC 項目的發(fā)展,得到了很多用戶在社區(qū)的反饋,主要歸納為三個:
- 全量 + 增量讀取的過程需要保證所有數(shù)據(jù)的一致性,因此需要通過加鎖保證,但是加鎖在數(shù)據(jù)庫層面上是一個十分高危的操作。底層 Debezium 在保證數(shù)據(jù)一致性時,需要對讀取的庫或表加鎖,全局鎖可能導致數(shù)據(jù)庫鎖住,表級鎖會鎖住表的讀,DBA 一般不給鎖權限。
- 不支持水平擴展,因為 Flink CDC 底層是基于 Debezium,起架構是單節(jié)點,所以Flink CDC 只支持單并發(fā)。在全量階段讀取階段,如果表非常大 (億級別),讀取時間在小時甚至天級別,用戶不能通過增加資源去提升作業(yè)速度。
- 全量讀取階段不支持 checkpoint:CDC 讀取分為兩個階段,全量讀取和增量讀取,目前全量讀取階段是不支持 checkpoint 的,因此會存在一個問題:當我們同步全量數(shù)據(jù)時,假設需要 5 個小時,當我們同步了 4 小時的時候作業(yè)失敗,這時候就需要重新開始,再讀取 5 個小時。
2. Debezium 鎖分析
Flink CDC 底層封裝了 Debezium, Debezium 同步一張表分為兩個階段:
- 增量階段:從 binlog 消費變更數(shù)據(jù)。
大部分用戶使用的場景都是全量 + 增量同步,加鎖是發(fā)生在全量階段,目的是為了確定全量階段的初始位點,保證增量 + 全量實現(xiàn)一條不多,一條不少,從而保證數(shù)據(jù)一致性。從下圖中我們可以分析全局鎖和表鎖的一些加鎖流程,左邊紅色線條是鎖的生命周期,右邊是 MySQL 開啟可重復讀事務的生命周期。
以全局鎖為例,首先是獲取一個鎖,然后再去開啟可重復讀的事務。這里鎖住操作是讀取 binlog 的起始位置和當前表的 schema。這樣做的目的是保證 binlog 的起始位置和讀取到的當前 schema 是可以對應上的,因為表的 schema 是會改變的,比如如刪除列或者增加列。在讀取這兩個信息后,SnapshotReader 會在可重復讀事務里讀取全量數(shù)據(jù),在全量數(shù)據(jù)讀取完成后,會啟動 BinlogReader 從讀取的 binlog 起始位置開始增量讀取,從而保證全量數(shù)據(jù) + 增量數(shù)據(jù)的無縫銜接。表鎖是全局鎖的退化版,因為全局鎖的權限會比較高,因此在某些場景,用戶只有表鎖。表鎖鎖的時間會更長,因為表鎖有個特征:鎖提前釋放了可重復讀的事務默認會提交,所以鎖需要等到全量數(shù)據(jù)讀完后才能釋放。經(jīng)過上面分析,接下來看看這些鎖到底會造成怎樣嚴重的后果:
Flink CDC 默認使用無鎖模式,能夠滿足大部分場景,但犧牲了一定的數(shù)據(jù)準確性。Flink CDC 也支持配置鎖模式,雖然能保證數(shù)據(jù)一致性,但存在上述 hang 住數(shù)據(jù)的風險。3. Flink CDC 2.0 設計 ( 以 MySQL 為例)
通過上面的分析,可以知道 2.0 的設計方案,核心要解決上述的三個問題,即支持無鎖、水平擴展、checkpoint。

左邊是 Chunk 的切分算法描述,Chunk 的切分算法其實和很多數(shù)據(jù)庫的分庫分表原理類似,通過表的主鍵對表中的數(shù)據(jù)進行分片。假設每個 Chunk 的步長為 10,按照這個規(guī)則進行切分,只需要把這些 Chunk 的區(qū)間做成左開右閉或者左閉右開的區(qū)間,保證銜接后的區(qū)間能夠等于表的主鍵區(qū)間即可。右邊是每個 Chunk 的無鎖讀算法描述,該算法的核心思想是在劃分了 Chunk 后,對于每個 Chunk 的全量讀取和增量讀取,在不用鎖的條件下完成一致性的合并。Chunk 的切分如下圖所示:
因為每個 chunk 只負責自己主鍵范圍內(nèi)的數(shù)據(jù),不難推導,只要能夠保證每個 Chunk 讀取的一致性,就能保證整張表讀取的一致性,這便是無鎖算法的基本原理。Netflix 的 DBLog 論文中 Chunk 讀取算法是通過在 DB 維護一張信號表,再通過信號表在 binlog 文件中打點,記錄每個 chunk 讀取前的 Low Position (低位點) 和讀取結束之后 High Position (高位點) ,在低位點和高位點之間去查詢該 Chunk 的全量數(shù)據(jù)。在讀取出這一部分 Chunk 的數(shù)據(jù)之后,再將這 2 個位點之間的 binlog 增量數(shù)據(jù)合并到 chunk 所屬的全量數(shù)據(jù),從而得到高位點時刻,該 chunk 對應的全量數(shù)據(jù)。Flink CDC 結合自身的情況,在 Chunk 讀取算法上做了去信號表的改進,不需要額外維護信號表,通過直接讀取 binlog 位點替代在 binog 中做標記的功能,整體的 chunk 讀算法描述如下圖所示:
比如正在讀取 Chunk-1,Chunk 的區(qū)間是 [K1, K10],首先直接將該區(qū)間內(nèi)的數(shù)據(jù) select 出來并把它存在 buffer 中,在 select 之前記錄 binlog 的一個位點 (低位點),select 完成后記錄 binlog 的一個位點 (高位點)。然后開始增量部分,消費從低位點到高位點的 binlog。- 圖中的 - ( k2,100 ) + ( k2,108 ) 記錄表示這條數(shù)據(jù)的值從 100 更新到 108;
- 第四條記錄是 k5 的數(shù)據(jù)由原來的 77 變更為 100。
觀察圖片中右下角最終的輸出,會發(fā)現(xiàn)在消費該 chunk 的 binlog 時,出現(xiàn)的 key 是k2、k3、k5,我們前往 buffer 將這些 key 做標記。
- 對于 k1、k4、k6、k7 來說,在高位點讀取完畢之后,這些記錄沒有變化過,所以這些數(shù)據(jù)是可以直接輸出的;
- 對于改變過的數(shù)據(jù),則需要將增量的數(shù)據(jù)合并到全量的數(shù)據(jù)中,只保留合并后的最終數(shù)據(jù)。例如,k2 最終的結果是 119 ,那么只需要輸出 +(k2,119),而不需要中間發(fā)生過改變的數(shù)據(jù)。
通過這種方式,Chunk 最終的輸出就是在高位點是 chunk 中最新的數(shù)據(jù)。上圖描述的是單個 Chunk 的一致性讀,但是如果有多個表分了很多不同的 Chunk,且這些 Chunk 分發(fā)到了不同的 task 中,那么如何分發(fā) Chunk 并保證全局一致性讀呢?這個就是基于 FLIP-27 來優(yōu)雅地實現(xiàn)的,通過下圖可以看到有 SourceEnumerator 的組件,這個組件主要用于 Chunk 的劃分,劃分好的 Chunk 會提供給下游的 SourceReader 去讀取,通過把 chunk 分發(fā)給不同的 SourceReader 便實現(xiàn)了并發(fā)讀取 Snapshot Chunk 的過程,同時基于 FLIP-27 我們能較為方便地做到 chunk 粒度的 checkpoint。
當 Snapshot Chunk 讀取完成之后,需要有一個匯報的流程,如下圖中橘色的匯報信息,將 Snapshot Chunk 完成信息匯報給 SourceEnumerator。
匯報的主要目的是為了后續(xù)分發(fā) binlog chunk (如下圖)。因為 Flink CDC 支持全量 + 增量同步,所以當所有 Snapshot Chunk 讀取完成之后,還需要消費增量的 binlog,這是通過下發(fā)一個 binlog chunk 給任意一個 Source Reader 進行單并發(fā)讀取實現(xiàn)的。
對于大部分用戶來講,其實無需過于關注如何無鎖算法和分片的細節(jié),了解整體的流程就好。整體流程可以概括為,首先通過主鍵對表進行 Snapshot Chunk 劃分,再將 Snapshot Chunk 分發(fā)給多個 SourceReader,每個 Snapshot Chunk 讀取時通過算法實現(xiàn)無鎖條件下的一致性讀,SourceReader 讀取時支持 chunk 粒度的 checkpoint,在所有 Snapshot Chunk 讀取完成后,下發(fā)一個 binlog chunk 進行增量部分的 binlog 讀取,這便是 Flink CDC 2.0 的整體流程,如下圖所示:
Flink CDC 是一個完全開源的項目,項目所有設計和源碼目前都已貢獻到開源社區(qū),Flink CDC 2.0 也已經(jīng)正式發(fā)布,此次的核心改進和提升包括:
- 提供 MySQL CDC 2.0,核心 feature 包括
- 并發(fā)讀取,全量數(shù)據(jù)的讀取性能可以水平擴展;
- 全程無鎖,不對線上業(yè)務產(chǎn)生鎖的風險;
- 斷點續(xù)傳,支持全量階段的 checkpoint。
- 搭建文檔網(wǎng)站,提供多版本文檔支持,文檔支持關鍵詞搜索
筆者用 TPC-DS 數(shù)據(jù)集中的 customer 表進行了測試,F(xiàn)link 版本是 1.13.1,customer 表的數(shù)據(jù)量是 6500 萬條,Source 并發(fā)為 8,全量讀取階段:
為了提供更好的文檔支持,F(xiàn)link CDC 社區(qū)搭建了文檔網(wǎng)站,網(wǎng)站支持對文檔的版本管理:
文檔網(wǎng)站支持關鍵字搜索功能,非常實用:

關于 CDC 項目的未來規(guī)劃,我們希望圍繞穩(wěn)定性,進階 feature 和生態(tài)集成三個方面展開。
- 通過社區(qū)的方式吸引更多的開發(fā)者,公司的開源力量提升 Flink CDC 的成熟度;
- 支持 Lazy Assigning。Lazy Assigning 的思路是將 chunk 先劃分一批,而不是一次性進行全部劃分。當前 Source Reader 對數(shù)據(jù)讀取進行分片是一次性全部劃分好所有 chunk,例如有 1 萬個 chunk,可以先劃分 1 千個 chunk,而不是一次性全部劃分,在 SourceReader 讀取完 1 千 chunk 后再繼續(xù)劃分,節(jié)約劃分 chunk 的時間。
- 支持 Schema Evolution。這個場景是:當同步數(shù)據(jù)庫的過程中,突然在表中添加了一個字段,并且希望后續(xù)同步下游系統(tǒng)的時候能夠自動加入這個字段;
- 支持 Watermark Pushdown 通過 CDC 的 binlog 獲取到一些心跳信息,這些心跳的信息可以作為一個 Watermark,通過這個心跳信息可以知道到這個流當前消費的一些進度;
- 支持 META 數(shù)據(jù),分庫分表的場景下,有可能需要元數(shù)據(jù)知道這條數(shù)據(jù)來源哪個庫哪個表,在下游系統(tǒng)入湖入倉可以有更多的靈活操作;
- 整庫同步:用戶要同步整個數(shù)據(jù)庫只需一行 SQL 語法即可完成,而不用每張表定義一個 DDL 和 query。
- 集成更多上游數(shù)據(jù)庫,如 Oracle,MS SqlServer。Cloudera 目前正在積極貢獻 oracle-cdc connector;
- 在入湖層面,Hudi 和 Iceberg 寫入上有一定的優(yōu)化空間,例如在高 QPS 入湖的時候,數(shù)據(jù)分布有比較大的性能影響,這一點可以通過與生態(tài)打通和集成繼續(xù)優(yōu)化。
最后,歡迎大家加入 Flink CDC 用戶群一起交流。
附錄
https://github.com/ververica/flink-cdc-connectors[2] Flink-CDC 文檔網(wǎng)站:https://ververica.github.io/flink-cdc-connectors/master/[3] Percona - MySQL 全局鎖時間分析:https://www.percona.com/blog/2014/03/11/introducing-backup-locks-percona-server-2/https://arxiv.org/pdf/2010.12597v1.pdfhttps://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
實時數(shù)倉 Meetup 議題征集:
8 月 29 日左右 (時間暫定),F(xiàn)link 社區(qū)計劃舉辦 Meetup 實時數(shù)倉專場,現(xiàn)征集議題中!
關于實時數(shù)倉,大家的關注度一直很高,目前業(yè)界也有許多落地的公司。在 Meetup 實時數(shù)倉專場, 我們將更加注重 “交流”,希望將大家聚集在一起相互探討關于實時數(shù)倉的話題,重點在踩過的坑、碰到的痛點都是怎樣解決的~
現(xiàn)征集實時數(shù)倉 Meetup 的議題,圍繞 “實時數(shù)倉踩坑痛點和避坑經(jīng)驗”,歡迎各位老師和同學帶上貴公司的介紹,以及議題的初步大綱來找小松鼠。
公司不議大小,經(jīng)驗才論足缺。我們會選取其中最具代表性的議題,邀請您參加實時數(shù)倉 Meetup 專場~ 你們的經(jīng)驗對于其他技術開發(fā)者和 Flink 社區(qū)都很重要!
▼ 掃碼添加小松鼠微信 ▼
更多 Flink 相關技術問題,可掃碼加入社區(qū)釘釘交流群~
▼ 關注「Flink 中文社區(qū)」,獲取更多技術干貨 ▼
戳我,立即參加活動 ~