Flink CDC 系列 - Flink MongoDB CDC 在 XTransfer 的生產(chǎn)實(shí)踐
Flink CDC MongoDB 復(fù)制機(jī)制 Flink MongoDB CDC 生產(chǎn)實(shí)踐 后續(xù)規(guī)劃
前言
一、Flink CDC

在 Flink 內(nèi)部,changelog 記錄由 RowData 表示,RowData 包括 4 種類型:+I (INSERT), -U (UPDATE_BEFORE),+U (UPDATE_AFTER), -D (DELETE)。根據(jù) changelog 產(chǎn)生記錄類型的不同,又可以分為 3 種 changelog mode。
INSERT_ONLY:只包含 +I,適用于批處理和事件流。 ALL:包含 +I, -U, +U, -D 全部的 RowKind,如 MySQL binlog。 UPSERT:只包含 +I, +U, -D 三種類型的 RowKind,不包含 -U,但必須按唯一鍵的冪等更新 , 如 MongoDB Change Streams。
二、MongoDB 復(fù)制機(jī)制
2.1 副本集和分片集群
2.2 Replica Set Oplog
{"ts" : Timestamp(1640190995, 3),"t" : NumberLong(434),"h" : NumberLong(3953156019015894279),"v" : 2,"op" : "u","ns" : "db.firm","ui" : UUID("19c72da0-2fa0-40a4-b000-83e038cd2c01"),"o2" : {"_id" : ObjectId("61c35441418152715fc3fcbc")},"wall" : ISODate("2021-12-22T16:36:35.165Z"),"o" : {"$v" : 1,"$set" : {"address" : "Shanghai China"}}}
| 字段 | 是否可空 | 描述 |
| ts | N | 操作時(shí)間,BsonTimestamp |
| t | Y | 對(duì)應(yīng)raft協(xié)議里面的term,每次發(fā)生節(jié)點(diǎn)down掉,新節(jié)點(diǎn)加入,主從切換,term都會(huì)自增。 |
| h | Y | 操作的全局唯一id的hash結(jié)果 |
| v | N | oplog版本 |
| op | N | 操作類型:"i" insert, "u" update, "d" delete, "c" db cmd, "n" no op |
| ns | N | 命名空間,表示操作對(duì)應(yīng)的集合全稱 |
| ui | N | session id |
| o2 | Y | 在更新操作中記錄_id和sharding key |
wall | N | 操作時(shí)間,精確到毫秒 |
| o | N | 變更數(shù)據(jù)描述 |
2.3 Change Streams
■?2.3.1 使用條件
WiredTiger 存儲(chǔ)引擎 副本集 (測(cè)試環(huán)境下,也可以使用單節(jié)點(diǎn)的副本集) 或分片集群部署 副本集協(xié)議版本:pv1 (默認(rèn)) 4.0 版本之前允許 Majority Read Concern: replication.enableMajorityReadConcern = true (默認(rèn)允許) MongoDB 用戶擁有 find 和 changeStream 權(quán)限
■?2.3.2 Change Events
{_id : {Object > },"operationType" : "" ,"fullDocument" : { <document> },"ns" : {"db" : "" ,"coll" : "" },"to" : {"db" : "" ,"coll" : "" },"documentKey" : { "_id" :}, "updateDescription" : {"updatedFields" : { <document> },"removedFields" : [ "" , ... ],"truncatedArrays" : [{ "field" :, "newSize" : }, ...]},"clusterTime" :, "txnNumber" :, "lsid" : {"id" :, "uid" :}}
| 字段 | 類型 | 描述 |
| _id | document | 表示resumeToken |
| operationType | string | 操作類型,包括:insert, delete, replace, update, drop, rename, dropDatabase, invalidate |
| fullDocument | document | 完整文檔記錄,insert, replace默認(rèn)包含,update需要開啟updateLookup,delete和其他操作類型不包含 |
| ns | document | 操作記錄對(duì)應(yīng)集合的完全名稱 |
| to | document | 當(dāng)操作類型為rename時(shí),to表示重命名后的完全名稱 |
| documentKey | document | 包含變更文檔的主鍵 _id,如果該集合是一個(gè)分片集合,documentKey中也會(huì)包含分片建 |
| updateDescription | document | 當(dāng)操作類型為update時(shí),描述有變更的字段和值 |
| clusterTime | Timestamp | 操作時(shí)間 |
| txnNumber | NumberLong | 事務(wù)號(hào) |
| lsid | Document | session id |
■?2.3.3 Update Lookup
三、Flink MongoDB CDC

支持特性
支持 Exactly-Once 語(yǔ)義 支持全量、增量訂閱 支持 Snapshot 數(shù)據(jù)過濾 支持從檢查點(diǎn)、保存點(diǎn)恢復(fù) 支持元數(shù)據(jù)提取
四、生產(chǎn)實(shí)踐
4.1 使用 RocksDB State Backend
4.2 合適的 oplog 容量和過期時(shí)間
db.adminCommand({replSetResizeOplog: 1, // 固定值1size: 20480, // 單位為MB,范圍在990MB到1PBminRetentionHours: 168 // 可選項(xiàng),單位為小時(shí)})
4.3 變更慢的表開啟心跳事件
WITH ('connector' = 'mongodb-cdc','heartbeat.interval.ms' = '60000')
4.4 自定義 MongoDB 連接參數(shù)
WITH ('connector' = 'mongodb-cdc','connection.options' = 'authSource=authDB&maxPoolSize=3')
4.5 Change Stream 參數(shù)調(diào)優(yōu)
可以在 Flink DDL 中通過 poll.await.time.ms 和 poll.max.batch.size 精細(xì)化配置變更事件的拉取。
poll.await.time.ms
變更事件拉取時(shí)間間隔,默認(rèn)為 1500ms。對(duì)于變更頻繁的集合,可以適當(dāng)調(diào)小拉取間隔,提升處理時(shí)效;對(duì)于變更緩慢的集合,可以適當(dāng)調(diào)大拉取時(shí)間間隔,減輕數(shù)據(jù)庫(kù)壓力。
poll.max.batch.size
每一批次拉取變更事件的最大條數(shù),默認(rèn)為 1000 條。調(diào)大改參數(shù)會(huì)加快從 Cursor 中拉取變更事件的速度,但會(huì)提升內(nèi)存的開銷。
4.6 訂閱整庫(kù)、集群變更
MongoDBSource.<String>builder().hosts("127.0.0.1:27017").database("").collection("").pipeline("[{'$match': {'ns.db': {'$regex': '/^(sandbox|firewall)$/'}}}]").deserializer(new JsonDebeziumDeserializationSchema()).build();
4.7 權(quán)限控制
MongoDB 支持對(duì)用戶、角色、權(quán)限進(jìn)行細(xì)粒度的管控,開啟 Change Stream 的用戶需要擁有 find 和 changeStream 兩個(gè)權(quán)限。
單集合
{ resource: { db:, collection: }, actions: [ "find", "changeStream" ] }
單庫(kù)
{ resource: { db:, collection: "" }, actions: [ "find", "changeStream" ] }
集群
{ resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }
use admin;// 創(chuàng)建用戶db.createUser({user: "flink",pwd: "flinkpw",roles: []});// 創(chuàng)建角色db.createRole({role: "flink_role",privileges: [{ resource: { db: "inventory", collection: "products" }, actions: [ "find", "changeStream" ] }],roles: []});// 給用戶授予角色db.grantRolesToUser("flink",[// 注意:這里的db指角色創(chuàng)建時(shí)的db,在admin下創(chuàng)建的角色可以包含不同database的訪問權(quán)限{ role: "flink_role", db: "admin" }]);// 給角色追加權(quán)限db.grantPrivilegesToRole("flink_role",[{ resource: { db: "inventory", collection: "orders" }, actions: [ "find", "changeStream" ] }]);
use admin;db.createUser({user: "flink",pwd: "flinkpw",roles: [{ role: "read", db: "admin" },{ role: "readAnyDatabase", db: "admin" }]});
五、后續(xù)規(guī)劃
支持增量 Snapshot 目前,MongoDB CDC Connector 還不支持增量 Snapshot,對(duì)于數(shù)據(jù)量較大的表還不能很好發(fā)揮 Flink 并行計(jì)算的優(yōu)勢(shì)。后續(xù)將實(shí)現(xiàn) MongoDB 的增量 Snapshot 功能,使其支持 Snapshot 階段的 checkpoint,和并發(fā)度設(shè)置。
支持從指定時(shí)間進(jìn)行變更訂閱 目前,MongoDB CDC Connector 僅支持從當(dāng)前時(shí)間開始 Change Stream 的訂閱,后續(xù)將提供從指定時(shí)間點(diǎn)的 Change Stream 訂閱。
支持庫(kù)和集合的篩選 目前,MongoDB CDC Connector 支持集群、整庫(kù)的變更訂閱和篩選,但對(duì)于是否需要進(jìn)行 Snapshot 的集合的篩選還不支持,后續(xù)將完善這個(gè)功能。
Flink CDC 系列 - 同步 MySQL 分庫(kù)分表,構(gòu)建 Iceberg 實(shí)時(shí)數(shù)據(jù)湖 Flink CDC 系列 - 實(shí)時(shí)抽取 Oracle 數(shù)據(jù),排雷和調(diào)優(yōu)實(shí)踐 Flink CDC 系列 - 構(gòu)建 MySQL 和 Postgres 上的 Streaming ETL
2022 年 1?月 8-9 日,F(xiàn)FA 2021 重磅開啟,全球?40+?多行業(yè)一線廠商,80+?干貨議題,帶來專屬于開發(fā)者的技術(shù)盛宴。

?
??戳我,預(yù)約 FFA 2021~
