<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Flink CDC 系列 - Flink MongoDB CDC 在 XTransfer 的生產(chǎn)實(shí)踐

          共 9711字,需瀏覽 20分鐘

           ·

          2021-12-31 17:50

          ▼ 關(guān)注「Apache Flink」,獲取更多技術(shù)干貨?
          摘要:本文作者孫家寶,分享如何在 Flink CDC 基礎(chǔ)上通過 MongoDB Change Streams 特性實(shí)現(xiàn)了 Flink MongoDB CDC Connector。主要內(nèi)容包括:

          1. Flink CDC
          2. MongoDB 復(fù)制機(jī)制
          3. Flink MongoDB CDC
          4. 生產(chǎn)實(shí)踐
          5. 后續(xù)規(guī)劃

          Tips:點(diǎn)擊「閱讀原文」預(yù)約 FFA 2021~

          前言


          XTransfer 專注為跨境 B2B 電商中小企業(yè)提供跨境金融和風(fēng)控服務(wù),通過建立數(shù)據(jù)化、自動(dòng)化、互聯(lián)網(wǎng)化和智能化的風(fēng)控基礎(chǔ)設(shè)施,搭建通達(dá)全球的財(cái)資管理平臺(tái),提供開立全球和本地收款賬戶、外匯兌換、海外外匯管制國(guó)家申報(bào)等多種跨境金融服務(wù)的綜合解決方案。

          在業(yè)務(wù)發(fā)展早期,我們選擇了傳統(tǒng)的離線數(shù)倉(cāng)架構(gòu),采用全量采集、批量處理、覆蓋寫入的數(shù)據(jù)集成方式,數(shù)據(jù)時(shí)效性較差。隨著業(yè)務(wù)的發(fā)展,離線數(shù)倉(cāng)越來越不能滿足對(duì)數(shù)據(jù)時(shí)效性的要求,我們決定從離線數(shù)倉(cāng)向?qū)崟r(shí)數(shù)倉(cāng)進(jìn)行演進(jìn)。而建設(shè)實(shí)時(shí)數(shù)倉(cāng)的關(guān)鍵點(diǎn)在于變更數(shù)據(jù)采集工具和實(shí)時(shí)計(jì)算引擎的選擇。

          經(jīng)過了一系列的調(diào)研,在 2021 年 2 月份,我們關(guān)注到了 Flink CDC 項(xiàng)目,F(xiàn)link CDC 內(nèi)嵌了 Debezium,使 Flink 本身具有了變更數(shù)據(jù)捕獲的能力,很大程度上降低了開發(fā)門檻,簡(jiǎn)化了部署復(fù)雜度。加上 Flink 強(qiáng)大的實(shí)時(shí)計(jì)算能力和豐富的外部系統(tǒng)接入能力,成為了我們構(gòu)建實(shí)時(shí)數(shù)倉(cāng)的關(guān)鍵工具。

          另外,我們?cè)谏a(chǎn)中也大量使用到了 MongoDB,所以我們?cè)?Flink CDC 基礎(chǔ)上通過 MongoDB Change Streams 特性實(shí)現(xiàn)了 Flink MongoDB CDC Connector,并貢獻(xiàn)給了 Flink CDC 社區(qū),目前已在 2.1 版本中發(fā)布。很榮幸在這里能夠在這里和大家分享一下實(shí)現(xiàn)細(xì)節(jié)和生產(chǎn)實(shí)踐。

          一、Flink CDC


          Dynamic Table (動(dòng)態(tài)表) 是 Flink 的支持流數(shù)據(jù)的 Table API 和 SQL 的核心概念。流和表具有對(duì)偶性,可以將表轉(zhuǎn)換成一個(gè)變更流 (changelog stream),也可以回放變更流還原成一張表。

          變更流有兩種形式:Append Mode 和 Update Mode。Append Mode 只會(huì)新增,不會(huì)變更和刪除,常見的如事件流。Update Mode 可能新增,也可能發(fā)生變更和刪除,常見的如數(shù)據(jù)庫(kù)操作日志。在 Flink 1.11之前,只支持在 Append Mode 上定義動(dòng)態(tài)表。

          Flink 1.11 在 FLIP-95 引入了新的 TableSource 和 TableSink,實(shí)現(xiàn)了對(duì) Update Mode changelog 的支持。并且在 FLIP-105 中,引入了對(duì) Debezium 和 Canal CDC format 的直接支持。通過實(shí)現(xiàn) ScanTableSource,接收外部系統(tǒng)變更日志 (如數(shù)據(jù)庫(kù)的變更日志),將其解釋為 Flink 的能夠識(shí)別的 changlog 并向下流轉(zhuǎn),便可以支持從變更日志定義動(dòng)態(tài)表。


          在 Flink 內(nèi)部,changelog 記錄由 RowData 表示,RowData 包括 4 種類型:+I (INSERT), -U (UPDATE_BEFORE),+U (UPDATE_AFTER), -D (DELETE)。根據(jù) changelog 產(chǎn)生記錄類型的不同,又可以分為 3 種 changelog mode。


          • INSERT_ONLY:只包含 +I,適用于批處理和事件流。

          • ALL:包含 +I, -U, +U, -D 全部的 RowKind,如 MySQL binlog。

          • UPSERT:只包含 +I, +U, -D 三種類型的 RowKind,不包含 -U,但必須按唯一鍵的冪等更新 , 如 MongoDB Change Streams。

          二、MongoDB 復(fù)制機(jī)制


          如上節(jié)所述,實(shí)現(xiàn) Flink CDC MongoDB 的關(guān)鍵點(diǎn)在于:如何將 MongoDB 的操作日志轉(zhuǎn)換為 Flink 支持的 changelog。要解決這個(gè)問題,首先需要了解一下 MongoDB 的集群部署和復(fù)制機(jī)制。

          2.1 副本集和分片集群


          副本集是 MongoDB 提供的一種高可用的部署模式,副本集成員之間通過 oplog (操作日志) 的復(fù)制,來完成副本集成員之間的數(shù)據(jù)同步。

          分片集群是 MongoDB 支持大規(guī)模數(shù)據(jù)集和高吞吐量操作的部署模式,每個(gè)分片由一個(gè)副本集組成。


          2.2 Replica Set Oplog


          操作日志 oplog,在 MongoDB 中是一個(gè)特殊的 capped collection (固定容量的集合),用來記錄數(shù)據(jù)的操作日志,用于副本集成員之間的同步。oplog 記錄的數(shù)據(jù)結(jié)構(gòu)如下所示。

          {    "ts" : Timestamp(1640190995, 3),    "t" : NumberLong(434),    "h" : NumberLong(3953156019015894279),    "v" : 2,    "op" : "u",    "ns" : "db.firm",    "ui" : UUID("19c72da0-2fa0-40a4-b000-83e038cd2c01"),    "o2" : {        "_id" : ObjectId("61c35441418152715fc3fcbc")    },    "wall" : ISODate("2021-12-22T16:36:35.165Z"),    "o" : {        "$v" : 1,        "$set" : {            "address" : "Shanghai China"        }    }}

          字段
          是否可空
          描述
          tsN操作時(shí)間,BsonTimestamp
          tY對(duì)應(yīng)raft協(xié)議里面的term,每次發(fā)生節(jié)點(diǎn)down掉,新節(jié)點(diǎn)加入,主從切換,term都會(huì)自增。
          hY操作的全局唯一id的hash結(jié)果
          vNoplog版本
          opN操作類型:"i" insert, "u" update, "d" delete, "c" db cmd, "n" no op
          nsN命名空間,表示操作對(duì)應(yīng)的集合全稱
          uiNsession id
          o2Y在更新操作中記錄_id和sharding key

          wall

          N操作時(shí)間,精確到毫秒
          oN變更數(shù)據(jù)描述

          從示例中可以看出,MongoDB oplog 的更新記錄即不包含更新前的信息,也不包含更新后的完整記錄,所以即不能轉(zhuǎn)換成 Flink 支持的 ALL 類型的 changelog,也難以轉(zhuǎn)換成 UPSERT 類型的 changelog。

          另外,在分片集群中,數(shù)據(jù)的寫入可能發(fā)生在不同的分片副本集中,因此每個(gè)分片的 oplog 中僅會(huì)記錄發(fā)生在該分片上的數(shù)據(jù)變更。因此需要獲取完整的數(shù)據(jù)變更,需要將每個(gè)分片的 oplog 按照操作時(shí)間排序合并到一起,加大了捕獲變更記錄的難度和風(fēng)險(xiǎn)。

          Debezium MongoDB Connector 在 1.7 版本之前是通過遍歷 oplog 來實(shí)現(xiàn)變更數(shù)據(jù)捕獲,由于上述原因,我們沒有采用 Debezium MongoDB Connector 而選擇了 MongoDB 官方的基于 Change Streams 的 MongoDB Kafka Connector。

          2.3 Change Streams


          Change Streams 是 MongoDB 3.6 推出的一個(gè)新特性,屏蔽了遍歷 oplog 的復(fù)雜度,使用戶通過簡(jiǎn)單的 API 就能訂閱集群、數(shù)據(jù)庫(kù)、集合級(jí)別的數(shù)據(jù)變更。

          ■?2.3.1 使用條件


          • WiredTiger 存儲(chǔ)引擎

          • 副本集 (測(cè)試環(huán)境下,也可以使用單節(jié)點(diǎn)的副本集) 或分片集群部署

          • 副本集協(xié)議版本:pv1 (默認(rèn))

          • 4.0 版本之前允許 Majority Read Concern: replication.enableMajorityReadConcern = true (默認(rèn)允許)

          • MongoDB 用戶擁有 find 和 changeStream 權(quán)限

          ■?2.3.2 Change Events


          Change Events 是 Change Streams 返回的變更記錄,其數(shù)據(jù)結(jié)構(gòu)如下所示:

          {   _id : { Object> },   "operationType" : "",   "fullDocument" : { <document> },   "ns" : {      "db" : "",      "coll" : ""   },   "to" : {      "db" : "",      "coll" : ""   },   "documentKey" : { "_id" :  },   "updateDescription" : {      "updatedFields" : { <document> },      "removedFields" : [ "", ... ],      "truncatedArrays" : [         { "field" : , "newSize" :  },         ...      ]   },   "clusterTime" : ,   "txnNumber" : ,   "lsid" : {      "id" : ,      "uid" :    }}

          字段類型描述
          _iddocument表示resumeToken
          operationTypestring操作類型,包括:insert, delete, replace, update, drop, rename, dropDatabase, invalidate
          fullDocumentdocument完整文檔記錄,insert, replace默認(rèn)包含,update需要開啟updateLookup,delete和其他操作類型不包含
          nsdocument操作記錄對(duì)應(yīng)集合的完全名稱
          todocument當(dāng)操作類型為rename時(shí),to表示重命名后的完全名稱
          documentKeydocument包含變更文檔的主鍵 _id,如果該集合是一個(gè)分片集合,documentKey中也會(huì)包含分片建
          updateDescriptiondocument當(dāng)操作類型為update時(shí),描述有變更的字段和值
          clusterTimeTimestamp操作時(shí)間
          txnNumberNumberLong事務(wù)號(hào)
          lsidDocumentsession id

          ■?2.3.3 Update Lookup


          由于 oplog 的更新操作僅包含了有變更后的字段,變更后完整的文檔無法從 oplog 直接獲取,但是在轉(zhuǎn)換為 UPSERT 模式的 changelog 時(shí),UPDATE_AFTER RowData 必須擁有完整行記錄。Change Streams 通過設(shè)置 fullDocument = updateLookup,可以在獲取變更記錄時(shí)返回該文檔的最新狀態(tài)。另外,Change Event 的每條記錄都包含 documentKey (_id 以及 shard key),標(biāo)識(shí)發(fā)生變更記錄的主鍵信息,即滿足冪等更新的條件。所以通過 Update Lookup 特性,可以將 MongoDB 的變更記錄轉(zhuǎn)換成 Flink 的 UPSERT changelog。

          三、Flink MongoDB CDC


          在具體實(shí)現(xiàn)上,我們集成了 MongoDB 官方基于 Change Streams 實(shí)現(xiàn)的 MongoDB Kafka Connector。通過 Debezium EmbeddedEngine,可以很容易地在 Flink 中驅(qū)動(dòng) MongoDB Kafka Connector 運(yùn)行。通過將 Change Stream 轉(zhuǎn)換成 Flink UPSERT changelog,實(shí)現(xiàn)了 MongoDB CDC TableSource。配合 Change Streams 的 resume 機(jī)制,實(shí)現(xiàn)了從 checkpoint、savepoint 恢復(fù)的功能。

          如 FLIP-149 所述,一些運(yùn)算 (如聚合) 在缺失 -U 消息時(shí)難以正確處理。對(duì)于 UPSERT 類型的 changelog,F(xiàn)link Planner 會(huì)引入額外的計(jì)算節(jié)點(diǎn) (Changelog Normalize) 來將其標(biāo)準(zhǔn)化為 ALL 類型的 changelog。


          支持特性


          • 支持 Exactly-Once 語(yǔ)義

          • 支持全量、增量訂閱

          • 支持 Snapshot 數(shù)據(jù)過濾

          • 支持從檢查點(diǎn)、保存點(diǎn)恢復(fù)

          • 支持元數(shù)據(jù)提取

          四、生產(chǎn)實(shí)踐


          4.1 使用 RocksDB State Backend


          Changelog Normalize 為了補(bǔ)齊 -U 的前置鏡像值,會(huì)帶來額外的狀態(tài)開銷,在生產(chǎn)環(huán)境中推薦使用 RocksDB State Backend。

          4.2 合適的 oplog 容量和過期時(shí)間


          MongoDB oplog.rs 是一個(gè)特殊的有容量集合,當(dāng) oplog.rs 容量達(dá)到最大值時(shí),會(huì)丟棄歷史的數(shù)據(jù)。Change Streams 通過 resume token 進(jìn)行恢復(fù),太小的 oplog 容量可能導(dǎo)致 resume token 對(duì)應(yīng)的 oplog 記錄不再存在,因而導(dǎo)致恢復(fù)失敗。

          在沒有顯示指定 oplog 容量時(shí),WiredTiger 引擎的 oplog 默認(rèn)容量為磁盤大小的 5%,下限為 990MB,上限為 50GB。在 MongoDB 4.4 之后,支持設(shè)置 oplog 最短保留時(shí)間,在 oplog 已滿并且 oplog 記錄超過最短保留時(shí)間時(shí),才會(huì)對(duì)該 oplog 記錄進(jìn)行回收。

          可以使用 replSetResizeOplog 命令重新設(shè)置 oplog 容量和最短保留時(shí)間。在生產(chǎn)環(huán)境下,建議設(shè)置 oplog 容量不小于 20GB,oplog 保留時(shí)間不少于 7 天。

          db.adminCommand(  {    replSetResizeOplog: 1, // 固定值1    size: 20480,           // 單位為MB,范圍在990MB到1PB    minRetentionHours: 168 // 可選項(xiàng),單位為小時(shí)  })

          4.3 變更慢的表開啟心跳事件


          Flink MongoDB CDC 會(huì)定期將 resume token 寫入 checkpoint 對(duì) Change Stream 進(jìn)行恢復(fù),MongoDB 變更事件或者心跳事件都能觸發(fā) resume token 的更新。如果訂閱的集合變更緩慢,可能造成最后一條變更記錄對(duì)應(yīng)的 resume token 過期,從而無法從 checkpoint 進(jìn)行恢復(fù)。因此對(duì)于變更緩慢的集合,建議開啟心跳事件 (設(shè)置 heartbeat.interval.ms > 0),來維持 resume token 的更新。

          WITH (    'connector' = 'mongodb-cdc',    'heartbeat.interval.ms' = '60000')

          4.4 自定義 MongoDB 連接參數(shù)


          當(dāng)默認(rèn)連接無法滿足使用要求時(shí),可以通過 connection.options 配置項(xiàng)傳遞 MongoDB 支持的連接參數(shù)。
          https://docs.mongodb.com/manual/reference/connection-string/#connection-string-options

          WITH (   'connector' = 'mongodb-cdc',   'connection.options' = 'authSource=authDB&maxPoolSize=3')

          4.5 Change Stream 參數(shù)調(diào)優(yōu)


          可以在 Flink DDL 中通過 poll.await.time.ms 和 poll.max.batch.size 精細(xì)化配置變更事件的拉取。


          • poll.await.time.ms


          變更事件拉取時(shí)間間隔,默認(rèn)為 1500ms。對(duì)于變更頻繁的集合,可以適當(dāng)調(diào)小拉取間隔,提升處理時(shí)效;對(duì)于變更緩慢的集合,可以適當(dāng)調(diào)大拉取時(shí)間間隔,減輕數(shù)據(jù)庫(kù)壓力。


          • poll.max.batch.size


          每一批次拉取變更事件的最大條數(shù),默認(rèn)為 1000 條。調(diào)大改參數(shù)會(huì)加快從 Cursor 中拉取變更事件的速度,但會(huì)提升內(nèi)存的開銷。


          4.6 訂閱整庫(kù)、集群變更


          database = "db",collection = "",可以訂閱 db 整庫(kù)的變更;database = "",collection = "",可以訂閱整個(gè)集群的變更。

          DataStream API 可以使用 pipeline 可以過濾需要訂閱的 db 和 collection,對(duì)于 Snapshot 集合的過濾目前還不支持。

          MongoDBSource.<String>builder()    .hosts("127.0.0.1:27017")    .database("")    .collection("")    .pipeline("[{'$match': {'ns.db': {'$regex': '/^(sandbox|firewall)$/'}}}]")    .deserializer(new JsonDebeziumDeserializationSchema())    .build();

          4.7 權(quán)限控制


          MongoDB 支持對(duì)用戶、角色、權(quán)限進(jìn)行細(xì)粒度的管控,開啟 Change Stream 的用戶需要擁有 find 和 changeStream 兩個(gè)權(quán)限。


          • 單集合


          { resource: { db: , collection:  }, actions: [ "find", "changeStream" ] }


          • 單庫(kù)


          { resource: { db: , collection: "" }, actions: [ "find", "changeStream" ] }


          • 集群


          { resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }

          在生產(chǎn)環(huán)境下,建議創(chuàng)建 Flink 用戶和角色,并對(duì)該角色進(jìn)行細(xì)粒度的授權(quán)。需要注意的是,MongoDB 可以在任何 database 下創(chuàng)建用戶和角色,如果用戶不是創(chuàng)建在 admin 下,需要在連接參數(shù)中指定 authSource =< 用戶所在的 database>。

          use admin;// 創(chuàng)建用戶db.createUser( {   user: "flink",   pwd: "flinkpw",   roles: [] });
          // 創(chuàng)建角色db.createRole( { role: "flink_role", privileges: [ { resource: { db: "inventory", collection: "products" }, actions: [ "find", "changeStream" ] } ], roles: [] });
          // 給用戶授予角色db.grantRolesToUser( "flink", [ // 注意:這里的db指角色創(chuàng)建時(shí)的db,在admin下創(chuàng)建的角色可以包含不同database的訪問權(quán)限 { role: "flink_role", db: "admin" } ]);
          // 給角色追加權(quán)限db.grantPrivilegesToRole( "flink_role", [ { resource: { db: "inventory", collection: "orders" }, actions: [ "find", "changeStream" ] } ]);

          在開發(fā)環(huán)境和測(cè)試環(huán)境下,可以授予 read 和 readAnyDatabase 兩個(gè)內(nèi)置角色給 Flink 用戶,即可對(duì)任意集合開啟 change stream。

          use admin;db.createUser({  user: "flink",  pwd: "flinkpw",  roles: [    { role: "read", db: "admin" },    { role: "readAnyDatabase", db: "admin" }  ]});

          五、后續(xù)規(guī)劃


          • 支持增量 Snapshot

            目前,MongoDB CDC Connector 還不支持增量 Snapshot,對(duì)于數(shù)據(jù)量較大的表還不能很好發(fā)揮 Flink 并行計(jì)算的優(yōu)勢(shì)。后續(xù)將實(shí)現(xiàn) MongoDB 的增量 Snapshot 功能,使其支持 Snapshot 階段的 checkpoint,和并發(fā)度設(shè)置。

          • 支持從指定時(shí)間進(jìn)行變更訂閱


            目前,MongoDB CDC Connector 僅支持從當(dāng)前時(shí)間開始 Change Stream 的訂閱,后續(xù)將提供從指定時(shí)間點(diǎn)的 Change Stream 訂閱。

          • 支持庫(kù)和集合的篩選


            目前,MongoDB CDC Connector 支持集群、整庫(kù)的變更訂閱和篩選,但對(duì)于是否需要進(jìn)行 Snapshot 的集合的篩選還不支持,后續(xù)將完善這個(gè)功能。

          參考文檔

          [1] Duality of Streams and Tables
          [2] FLIP-95: New TableSource and TableSink interfaces
          [3] FLIP-105: Support to Interpret Changelog in Flink SQL (Introducing Debezium and Canal Format)
          [4] FLIP-149: Introduce the upsert-kafka Connector
          [5] Apache Flink 1.11.0 Release Announcement
          [6] Introduction to SQL in Flink 1.11
          [7] MongoDB Manual
          [8] MongoDB Connection String Options
          [9] MongoDB Kafka Connector



          相關(guān)文章


          Flink Forward Asia?2021?

          2022 年 1?月 8-9 日,F(xiàn)FA 2021 重磅開啟,全球?40+?多行業(yè)一線廠商,80+?干貨議題,帶來專屬于開發(fā)者的技術(shù)盛宴。


          大會(huì)官網(wǎng):
          https://flink-forward.org.cn

          大會(huì)線上觀看地址 (記得預(yù)約哦):
          https://developer.aliyun.com/special/ffa2021/live



          更多 Flink 相關(guān)技術(shù)問題,可掃碼加入社區(qū)釘釘交流群~

          ???戳我,預(yù)約 FFA 2021~

          瀏覽 159
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  免费观看黄色视频网站在线观看 | 操小逼逼 | 爱草在线视频 | 18禁福利网站 | 亚洲成人中文娱乐网 |