37 手游基于 Flink CDC + Hudi 湖倉一體方案實(shí)踐
一、Flink-CDC 2.0
Flink CDC Connectors 是 Apache Flink 的一個 source 端的連接器,目前 2.0 版本支持從 MySQL 以及 Postgres 兩種數(shù)據(jù)源中獲取數(shù)據(jù),2.1 版本社區(qū)確定會支持 Oracle,MongoDB 數(shù)據(jù)源。
Fink CDC 2.0 的核心 feature,主要表現(xiàn)為實(shí)現(xiàn)了以下三個非常重要的功能:
全程無鎖,不會對數(shù)據(jù)庫產(chǎn)生需要加鎖所帶來的風(fēng)險;
多并行度,全量數(shù)據(jù)的讀取階段支持水平擴(kuò)展,使億級別的大表可以通過加大并行度來加快讀取速度; 斷點(diǎn)續(xù)傳,全量階段支持 checkpoint,即使任務(wù)因某種原因退出了,也可通過保存的 checkpoint 對任務(wù)進(jìn)行恢復(fù)實(shí)現(xiàn)數(shù)據(jù)的斷點(diǎn)續(xù)傳。
Flink CDC Connectors: https://ververica.github.io/flink-cdc-connectors/master/index.html
二、Hudi
Apache Hudi 目前被業(yè)內(nèi)描述為圍繞數(shù)據(jù)庫內(nèi)核構(gòu)建的流式數(shù)據(jù)湖平臺 (Streaming Data Lake Platform)。
由于 Hudi 擁有良好的 Upsert 能力,并且 0.10 Master 對 Flink 版本支持至 1.13.x,因此我們選擇通過 Flink + Hudi 的方式為 37 手游的業(yè)務(wù)場景提供分鐘級 Upsert 數(shù)據(jù)的分析查詢能力。
Apache Hudi: https://hudi.apache.org/docs/overview/
三、37 手游的業(yè)務(wù)痛點(diǎn)和技術(shù)方案選型

1. 舊架構(gòu)與業(yè)務(wù)痛點(diǎn)
■ 1.1 數(shù)據(jù)實(shí)時性不夠
日志類數(shù)據(jù)通過 sqoop 每 30min 同步前 60min 數(shù)據(jù)到 Hive;
數(shù)據(jù)庫類數(shù)據(jù)通過 sqoop 每 60min 同步當(dāng)天全量數(shù)據(jù)到 Hive;
數(shù)據(jù)庫類數(shù)據(jù)通過 sqoop 每天同步前 60 天數(shù)據(jù)到 Hive。
■ 1.2 業(yè)務(wù)代碼邏輯復(fù)雜且難維護(hù)
目前 37 手游還有很多的業(yè)務(wù)開發(fā)沿用 MySQL + PHP 的開發(fā)模式,代碼邏輯復(fù)雜且很難維護(hù); 相同的代碼邏輯,往往流處理需要開發(fā)一份代碼,批處理則需要另開發(fā)一份代碼,不能復(fù)用。
■ 1.3 頻繁重刷歷史數(shù)據(jù)
頻繁地重刷歷史數(shù)據(jù)來保證數(shù)據(jù)一致。
■ 1.4 Schema 變更頻繁
由于業(yè)務(wù)需求,經(jīng)常需要添加表字段。
■ 1.5 Hive 版本低
目前 Hive 使用版本為 1.x 版本,并且升級版本比較困難; 不支持 Upsert; 不支持行級別的 delete。
2. 技術(shù)選型
四、新架構(gòu)與湖倉一體

五、Flink CDC 2.0 + Kafka + Hudi 0.10
實(shí)踐
Flink 1.13.2 .../lib/hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar (修改 Master 分支的 Hudi Flink 版本為 1.13.2 然后構(gòu)建) .../lib/hadoop-mapreduce-client-core-2.7.3.jar (解決 Hudi ClassNotFoundException) ../lib/flink-sql-connector-mysql-cdc-2.0.0.jar ../lib/flink-format-changelog-json-2.0.0.jar ../lib/flink-sql-connector-kafka_2.11-1.13.2.jar
create table sy_payment_cdc (ID BIGINT,...PRIMARY KEY(ID) NOT ENFORCED) with('connector' = 'mysql-cdc','hostname' = '','port' = '','username' = '','password' = '','database-name' = '','table-name' = '','connect.timeout' = '60s','scan.incremental.snapshot.chunk.size' = '100000','server-id'='5401-5416');
create table sy_payment_cdc2kafka (ID BIGINT,...PRIMARY KEY(ID) NOT ENFORCED) with ('connector' = 'kafka','topic' = '','scan.startup.mode' = 'latest-offset','properties.bootstrap.servers' = '','properties.group.id' = '','key.format' = '','key.fields' = '','format' = 'changelog-json');create table sy_payment2hudi (ID BIGINT,...PRIMARY KEY(ID) NOT ENFORCED)PARTITIONED BY (YMD)WITH ('connector' = 'hudi','path' = 'hdfs:///data/hudi/m37_mpay_tj/sy_payment','table.type' = 'COPY_ON_WRITE','partition.default_name' = 'YMD','write.insert.drop.duplicates' = 'true','write.bulk_insert.shuffle_by_partition' = 'false','write.bulk_insert.sort_by_partition' = 'false','write.precombine.field' = 'MTIME','write.tasks' = '16','write.bucket_assign.tasks' = '16','write.task.max.size' = '','write.merge.max_memory' = '');

之前 Flink CDC 1.x 版本由于全量 snapshot 階段單并行度讀取的原因,當(dāng)時億級以上的表在全量 snapshot 讀取階段就需要耗費(fèi)很長時間,并且 checkpoint 會失敗無法保證數(shù)據(jù)的斷點(diǎn)續(xù)傳。
所以當(dāng)時入 Hudi 是采用先啟動一個 CDC 1.x 的程序?qū)⒋丝涕_始的增量數(shù)據(jù)寫入 Kafka,之后再啟動另外一個 sqoop 程序拉取當(dāng)前的所有數(shù)據(jù)至 Hive 后,通過 Flink 讀取 Hive 的數(shù)據(jù)寫 Hudi,最后再把 Kafka 的增量數(shù)據(jù)從頭消費(fèi)接回 Hudi。由于 Kafka 與 Hive 的數(shù)據(jù)存在交集,因此數(shù)據(jù)不會丟失,加上 Hudi 的 upsert 能力保證了數(shù)據(jù)唯一。
但是,這種方式的鏈路太長操作困難,如今通過 CDC 2.0 在全量 snapshot 階段支持多并行度以及 checkpoint 的能力,確實(shí)大大降低了架構(gòu)的復(fù)雜度。
2. 數(shù)據(jù)比對
由于生產(chǎn)環(huán)境用的是 Hive1.x,Hudi 對于 1.x 還不支持?jǐn)?shù)據(jù)同步,所以通過創(chuàng)建 Hive 外部表的方式進(jìn)行查詢,如果是 Hive2.x 以上版本,可參考 Hive 同步章節(jié); 創(chuàng)建 Hive 外部表 + 預(yù)創(chuàng)建分區(qū); auxlib 文件夾添加 Hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar。
CREATE EXTERNAL TABLE m37_mpay_tj.`ods_sy_payment_f_d_b_ext`(`_hoodie_commit_time` string,`_hoodie_commit_seqno` string,`_hoodie_record_key` string,`_hoodie_partition_path` string,`_hoodie_file_name` string,`ID` bigint,...)PARTITIONED BY (`dt` string)ROW FORMAT SERDE'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'STORED AS INPUTFORMAT'org.apache.hudi.hadoop.HoodieParquetInputFormat'OUTPUTFORMAT'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'LOCATION'hdfs:///data/hudi/m37_mpay_tj/sy_payment'
總數(shù)一致; 按天分組統(tǒng)計數(shù)量一致; 按天分組統(tǒng)計金額一致。
Hive 同步章節(jié):
https://www.yuque.com/docs/share/01c98494-a980-414c-9c45-152023bf3c17?#IsoNU
六、總結(jié)
Hudi 提供了 Upsert 能力,解決頻繁 Upsert/Delete 的痛點(diǎn); 提供分鐘級的數(shù)據(jù),比傳統(tǒng)數(shù)倉有更高的時效性; 基于 Flink-SQL 實(shí)現(xiàn)了流批一體,代碼維護(hù)成本低; 數(shù)據(jù)同源、同計算引擎、同存儲、同計算口徑; 選用 Flink CDC 作為數(shù)據(jù)同步工具,省掉 sqoop 的維護(hù)成本。
