<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          37 手游基于 Flink CDC + Hudi 湖倉一體方案實(shí)踐

          共 6587字,需瀏覽 14分鐘

           ·

          2021-09-21 20:26


          一、Flink-CDC 2.0



          Flink CDC Connectors 是 Apache Flink 的一個 source 端的連接器,目前 2.0 版本支持從 MySQL 以及 Postgres 兩種數(shù)據(jù)源中獲取數(shù)據(jù),2.1 版本社區(qū)確定會支持 Oracle,MongoDB 數(shù)據(jù)源。




          Fink CDC 2.0 的核心 feature,主要表現(xiàn)為實(shí)現(xiàn)了以下三個非常重要的功能:


          • 全程無鎖,不會對數(shù)據(jù)庫產(chǎn)生需要加鎖所帶來的風(fēng)險;


          • 多并行度,全量數(shù)據(jù)的讀取階段支持水平擴(kuò)展,使億級別的大表可以通過加大并行度來加快讀取速度;

          • 斷點(diǎn)續(xù)傳,全量階段支持 checkpoint,即使任務(wù)因某種原因退出了,也可通過保存的 checkpoint 對任務(wù)進(jìn)行恢復(fù)實(shí)現(xiàn)數(shù)據(jù)的斷點(diǎn)續(xù)傳。



          Flink CDC Connectors:
          https://ververica.github.io/flink-cdc-connectors/master/index.html

          二、Hudi



          Apache Hudi 目前被業(yè)內(nèi)描述為圍繞數(shù)據(jù)庫內(nèi)核構(gòu)建的流式數(shù)據(jù)湖平臺 (Streaming Data Lake Platform)。


          由于 Hudi 擁有良好的 Upsert 能力,并且 0.10 Master 對 Flink 版本支持至 1.13.x,因此我們選擇通過 Flink + Hudi 的方式為 37 手游的業(yè)務(wù)場景提供分鐘級 Upsert 數(shù)據(jù)的分析查詢能力。


          Apache Hudi:
          https://hudi.apache.org/docs/overview/



          三、37 手游的業(yè)務(wù)痛點(diǎn)和技術(shù)方案選型




          1. 舊架構(gòu)與業(yè)務(wù)痛點(diǎn)


          ■ 1.1 數(shù)據(jù)實(shí)時性不夠


          • 日志類數(shù)據(jù)通過 sqoop 每 30min 同步前 60min 數(shù)據(jù)到 Hive;


          • 數(shù)據(jù)庫類數(shù)據(jù)通過 sqoop 每 60min 同步當(dāng)天全量數(shù)據(jù)到 Hive;


          • 數(shù)據(jù)庫類數(shù)據(jù)通過 sqoop 每天同步前 60 天數(shù)據(jù)到 Hive。




          ■ 1.2 業(yè)務(wù)代碼邏輯復(fù)雜且難維護(hù)


          • 目前 37 手游還有很多的業(yè)務(wù)開發(fā)沿用 MySQL + PHP 的開發(fā)模式,代碼邏輯復(fù)雜且很難維護(hù);

          • 相同的代碼邏輯,往往流處理需要開發(fā)一份代碼,批處理則需要另開發(fā)一份代碼,不能復(fù)用。




          ■ 1.3 頻繁重刷歷史數(shù)據(jù)




          • 頻繁地重刷歷史數(shù)據(jù)來保證數(shù)據(jù)一致。




          ■ 1.4 Schema 變更頻繁


          • 由于業(yè)務(wù)需求,經(jīng)常需要添加表字段。




          ■ 1.5 Hive 版本低




          • 目前 Hive 使用版本為 1.x 版本,并且升級版本比較困難;

          • 不支持 Upsert;

          • 不支持行級別的 delete。



          由于 37 手游的業(yè)務(wù)場景,數(shù)據(jù) upsert、delete 是個很常見的需求。所以基于 Hive 數(shù)倉的架構(gòu)對業(yè)務(wù)需求的滿足度不夠。



          2. 技術(shù)選型



          在同步工具的選型上考慮過 Canal 和 Maxwell。但 Canal 只適合增量數(shù)據(jù)的同步并且需要部署,維護(hù)起來相對較重。而 Maxwell 雖然比較輕量,但與 Canal 一樣需要配合 Kafka 等消息隊(duì)列使用。對比之下,F(xiàn)link CDC 可以通過配置 Flink connector 的方式基于 Flink-SQL 進(jìn)行使用,十分輕巧,并且完美契合基于 Flink-SQL 的流批一體架構(gòu)。

          在存儲引擎的選型上,目前最熱門的數(shù)據(jù)湖產(chǎn)品當(dāng)屬:Apache Hudi,Apache Iceberg 和 DeltaLake,這些在我們的場景下各有優(yōu)劣。最終,基于 Hudi 對上下游生態(tài)的開放、對全局索引的支持、對 Flink 1.13 版本的支持,以及對 Hive 版本的兼容性 (Iceberg 不支持 Hive1.x 的版本) 等原因,選擇了 Hudi 作為湖倉一體和流批一體的存儲引擎。

          針對上述存在的業(yè)務(wù)痛點(diǎn)以及選型對比,我們的最終方案為:以 Flink1.13.2 作為計算引擎,依靠 Flink 提供的流批統(tǒng)一的 API,基于 Flink-SQL 實(shí)現(xiàn)流批一體,F(xiàn)link-CDC 2.0 作為 ODS 層的數(shù)據(jù)同步工具以及 Hudi-0.10 Master 作為存儲引擎的湖倉一體,解決維護(hù)兩套代碼的業(yè)務(wù)痛點(diǎn)。




          四、新架構(gòu)與湖倉一體


          37 手游的湖倉一體方案,是 37 手游流批一體架構(gòu)的一部分。通過湖倉一體、流批一體,準(zhǔn)實(shí)時場景下做到了:數(shù)據(jù)同源、同計算引擎、同存儲、同計算口徑。數(shù)據(jù)的時效性可以到分鐘級,能很好的滿足業(yè)務(wù)準(zhǔn)實(shí)時數(shù)倉的需求。下面是架構(gòu)圖:





          MySQL 數(shù)據(jù)通過 Flink CDC 進(jìn)入到 Kafka。之所以數(shù)據(jù)先入 Kafka 而不是直接入 Hudi,是為了實(shí)現(xiàn)多個實(shí)時任務(wù)復(fù)用 MySQL 過來的數(shù)據(jù),避免多個任務(wù)通過 Flink CDC 接 MySQL 表以及 Binlog,對 MySQL 庫的性能造成影響。

          通過 CDC 進(jìn)入到 Kafka 的數(shù)據(jù)除了落一份到離線數(shù)據(jù)倉庫的 ODS 層之外,會同時按照實(shí)時數(shù)據(jù)倉庫的鏈路,從 ODS->DWD->DWS->OLAP 數(shù)據(jù)庫,最后供報表等數(shù)據(jù)服務(wù)使用。實(shí)時數(shù)倉的每一層結(jié)果數(shù)據(jù)會準(zhǔn)實(shí)時的落一份到離線數(shù)倉,通過這種方式做到程序一次開發(fā)、指標(biāo)口徑統(tǒng)一,數(shù)據(jù)統(tǒng)一。

          從架構(gòu)圖上,可以看到有一步數(shù)據(jù)修正 (重跑歷史數(shù)據(jù)) 的動作,之所以有這一步是考慮到:有可能存在由于口徑調(diào)整或者前一天的實(shí)時任務(wù)計算結(jié)果錯誤,導(dǎo)致重跑歷史數(shù)據(jù)的情況。

          而存儲在 Kafka 的數(shù)據(jù)有失效時間,不會存太久的歷史數(shù)據(jù),重跑很久的歷史數(shù)據(jù)無法從 Kafka 中獲取歷史源數(shù)據(jù)。再者,如果把大量的歷史數(shù)據(jù)再一次推到 Kafka,走實(shí)時計算的鏈路來修正歷史數(shù)據(jù),可能會影響當(dāng)天的實(shí)時作業(yè)。所以針對重跑歷史數(shù)據(jù),會通過數(shù)據(jù)修正這一步來處理。

          總體上說,37 手游的數(shù)據(jù)倉庫屬于 Lambda 和 Kappa 混搭的架構(gòu)。流批一體數(shù)據(jù)倉庫的各個數(shù)據(jù)鏈路有數(shù)據(jù)質(zhì)量校驗(yàn)的流程。第二天對前一天的數(shù)據(jù)進(jìn)行對賬,如果前一天實(shí)時計算的數(shù)據(jù)無異常,則不需要修正數(shù)據(jù),Kappa 架構(gòu)已經(jīng)足夠。




          五、Flink CDC 2.0 + Kafka + Hudi 0.10 

          實(shí)踐


          1. 環(huán)境準(zhǔn)備


          • Flink 1.13.2

          • .../lib/hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar (修改 Master 分支的 Hudi Flink 版本為 1.13.2 然后構(gòu)建)

          • .../lib/hadoop-mapreduce-client-core-2.7.3.jar (解決 Hudi ClassNotFoundException)

          • ../lib/flink-sql-connector-mysql-cdc-2.0.0.jar

          • ../lib/flink-format-changelog-json-2.0.0.jar

          • ../lib/flink-sql-connector-kafka_2.11-1.13.2.jar



          source 端 MySQL-CDC 表定義:


          create table sy_payment_cdc (  ID BIGINT,  ...  PRIMARY KEY(ID) NOT ENFORCED) with(  'connector' = 'mysql-cdc',  'hostname' = '',  'port' = '',  'username' = '',  'password' = '',  'database-name' = '',  'table-name' = '',  'connect.timeout' = '60s',  'scan.incremental.snapshot.chunk.size' = '100000',  'server-id'='5401-5416');



          值得注意的是:scan.incremental.snapshot.chunk.size 參數(shù)需要根據(jù)實(shí)際情況來配置,如果表數(shù)據(jù)量不大,使用默認(rèn)值即可。

          Sink 端 Kafka+Hudi COW 表定義:


          create table sy_payment_cdc2kafka (  ID BIGINT,  ...  PRIMARY KEY(ID) NOT ENFORCED) with (  'connector' = 'kafka',  'topic' = '',  'scan.startup.mode' = 'latest-offset',  'properties.bootstrap.servers' = '',  'properties.group.id' = '',  'key.format' = '',  'key.fields' = '',  'format' = 'changelog-json');
          create table sy_payment2hudi ( ID BIGINT, ... PRIMARY KEY(ID) NOT ENFORCED)PARTITIONED BY (YMD)WITH ( 'connector' = 'hudi', 'path' = 'hdfs:///data/hudi/m37_mpay_tj/sy_payment', 'table.type' = 'COPY_ON_WRITE', 'partition.default_name' = 'YMD', 'write.insert.drop.duplicates' = 'true', 'write.bulk_insert.shuffle_by_partition' = 'false', 'write.bulk_insert.sort_by_partition' = 'false', 'write.precombine.field' = 'MTIME', 'write.tasks' = '16', 'write.bucket_assign.tasks' = '16', 'write.task.max.size' = '', 'write.merge.max_memory' = '');



          針對歷史數(shù)據(jù)入 Hudi,可以選擇離線 bulk_insert 的方式入湖,再通過 Load Index Bootstrap 加載數(shù)據(jù)后接回增量數(shù)據(jù)。bulk_insert 方式入湖數(shù)據(jù)的唯一性依靠源端的數(shù)據(jù)本身,在接回增量數(shù)據(jù)時也需要做到保證數(shù)據(jù)不丟失。

          這里我們選擇更為簡單的調(diào)整任務(wù)資源的方式將歷史數(shù)據(jù)入湖。依靠 Flink 的 checkpoint 機(jī)制,不管是 CDC 2.0 入 Kafka 期間還是 Kafka 入 Hudi 期間,都可以通過指定 checkpoint 的方式對任務(wù)進(jìn)行重啟并且數(shù)據(jù)不會丟失。

          我們可以在配置 CDC 2.0 入 Kafka,Kafka 入 Hudi 任務(wù)時調(diào)大內(nèi)存并配置多個并行度,加快歷史數(shù)據(jù)入湖,等到所有歷史數(shù)據(jù)入湖后,再相應(yīng)的調(diào)小入湖任務(wù)的內(nèi)存配置并且將 CDC 入 Kafka 的并行度設(shè)置為 1,因?yàn)樵隽侩A段 CDC 是單并行度,然后指定 checkpoint 重啟任務(wù)。

          按照上面表定義的參數(shù)配置,配置 16 個并行度,F(xiàn)link TaskManager 內(nèi)存大小為 50G 的情況下,單表 15 億歷史數(shù)據(jù)入至 Hudi COW 表實(shí)際用時 10 小時,單表 9 億數(shù)據(jù)入至 Hudi COW 表實(shí)際用時 6 小時。當(dāng)然這個耗時很大一部分是 COW 寫放大的特性,在大數(shù)據(jù)量的 upsert 模式下耗時較多。

          目前我們的集群由 200 多臺機(jī)器組成,在線的流計算任務(wù)總數(shù)有 200 多,總數(shù)據(jù)量接近 2PB。

          如果集群資源很有限的情況下,可以根據(jù)實(shí)際情況調(diào)整 Hudi 表以及 Flink 任務(wù)的內(nèi)存配置,還可以通過配置 Hudi 的限流參數(shù) write.rate.limit 讓歷史數(shù)據(jù)緩慢入湖。




          之前 Flink CDC 1.x 版本由于全量 snapshot 階段單并行度讀取的原因,當(dāng)時億級以上的表在全量 snapshot 讀取階段就需要耗費(fèi)很長時間,并且 checkpoint 會失敗無法保證數(shù)據(jù)的斷點(diǎn)續(xù)傳。


          所以當(dāng)時入 Hudi 是采用先啟動一個 CDC 1.x 的程序?qū)⒋丝涕_始的增量數(shù)據(jù)寫入 Kafka,之后再啟動另外一個 sqoop 程序拉取當(dāng)前的所有數(shù)據(jù)至 Hive 后,通過 Flink 讀取 Hive 的數(shù)據(jù)寫 Hudi,最后再把 Kafka 的增量數(shù)據(jù)從頭消費(fèi)接回 Hudi。由于 Kafka 與 Hive 的數(shù)據(jù)存在交集,因此數(shù)據(jù)不會丟失,加上 Hudi 的 upsert 能力保證了數(shù)據(jù)唯一。


          但是,這種方式的鏈路太長操作困難,如今通過 CDC 2.0 在全量 snapshot 階段支持多并行度以及 checkpoint 的能力,確實(shí)大大降低了架構(gòu)的復(fù)雜度。


          2. 數(shù)據(jù)比對


          • 由于生產(chǎn)環(huán)境用的是 Hive1.x,Hudi 對于 1.x 還不支持?jǐn)?shù)據(jù)同步,所以通過創(chuàng)建 Hive 外部表的方式進(jìn)行查詢,如果是 Hive2.x 以上版本,可參考 Hive 同步章節(jié);

          • 創(chuàng)建 Hive 外部表 + 預(yù)創(chuàng)建分區(qū);

          • auxlib 文件夾添加 Hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar。


          CREATE EXTERNAL TABLE m37_mpay_tj.`ods_sy_payment_f_d_b_ext`(  `_hoodie_commit_time` string,  `_hoodie_commit_seqno` string,  `_hoodie_record_key` string,  `_hoodie_partition_path` string,  `_hoodie_file_name` string,  `ID` bigint,  ...  )PARTITIONED BY (  `dt` string)ROW FORMAT SERDE  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'STORED AS INPUTFORMAT  'org.apache.hudi.hadoop.HoodieParquetInputFormat'OUTPUTFORMAT  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'LOCATION  'hdfs:///data/hudi/m37_mpay_tj/sy_payment'



          最終查詢 Hudi 數(shù)據(jù) (Hive 外部表的形式) 與原來 sqoop 同步的 Hive 數(shù)據(jù)做比對得到:


          • 總數(shù)一致;

          • 按天分組統(tǒng)計數(shù)量一致;

          • 按天分組統(tǒng)計金額一致。


          Hive 同步章節(jié):

          https://www.yuque.com/docs/share/01c98494-a980-414c-9c45-152023bf3c17?#IsoNU



          六、總結(jié)




          湖倉一體以及流批一體架構(gòu)對比傳統(tǒng)數(shù)倉架構(gòu)主要有以下幾點(diǎn)好處:


          • Hudi 提供了 Upsert 能力,解決頻繁 Upsert/Delete 的痛點(diǎn);

          • 提供分鐘級的數(shù)據(jù),比傳統(tǒng)數(shù)倉有更高的時效性;

          • 基于 Flink-SQL 實(shí)現(xiàn)了流批一體,代碼維護(hù)成本低;

          • 數(shù)據(jù)同源、同計算引擎、同存儲、同計算口徑;

          • 選用 Flink CDC 作為數(shù)據(jù)同步工具,省掉 sqoop 的維護(hù)成本。



          最后針對頻繁增加表字段的痛點(diǎn)需求,并且希望后續(xù)同步下游系統(tǒng)的時候能夠自動加入這個字段,目前還沒有完美的解決方案,希望 Flink CDC 社區(qū)能在后續(xù)的版本提供 Schema Evolution 的支持。
          瀏覽 70
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  免费直接观看的黄色 | 人人艹在线观看 | 91丨牛牛丨国产人妻 | 亚洲sv视频| 欧美激情亚洲色图 |