Hudi in Practice | KLOOK's Data Lake Practice Based on Apache Hudi
1. Business Background
KLOOK is an online travel platform focused on aggregating travel resources for overseas destinations, offering bookings for attraction tickets, day tours, unique experiences, local transportation, and food. It covers 100 countries and regions worldwide, supports 12 languages and payments in 41 currencies, works closely with more than 10,000 merchant partners, and offers travelers over 100,000 bookable travel experiences. Syncing RDS data into the KLOOK data warehouse is a typical ingestion-layer requirement for an internet e-commerce company: more than 60% of the warehouse data comes directly from business databases, a large share of which are managed AWS RDS for MySQL databases (more than 100 databases/instances in total). Data synced directly from RDS becomes the warehouse ODS layer after standardized cleansing. The company previously used a third-party commercial tool whose synchronization ran only every 8 hours, which could not meet the business requirements for data freshness. After research and a series of PoC validations, the data team settled on a Debezium + Kafka + Flink + Hudi pipeline for the ODS layer; data now lands in the lake within seconds, and the warehouse can build more business scenarios on top of a near-real-time ODS layer.
2. Architecture Improvement
2.1 Architecture Before the Migration

The old pipeline depended entirely on a third-party service: Alooma (Google) performed the full and incremental RDS synchronization, the raw tables were consolidated every 8 hours, and a downstream data flow loaded them into the warehouse ODS layer every 24 hours.
2.2 New Architecture

1. Use AWS DMS to migrate the full RDS MySQL data to S3;
2. Use a Flink SQL batch job to bulk-write the S3 data into Hudi tables (a sketch follows this list);
3. Set up a Debezium MySQL binlog subscription task that syncs binlog data to Kafka in real time;
4. Start two Flink SQL streaming jobs: one writes the data into Hudi in real time, the other appends the raw binlog to S3, where the files are kept for 30 days for backfill and replay;
5. Use the Hudi Hive metadata sync tool to sync the Hudi catalog to Hive, and serve OLAP queries through Hive/Trino.
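As a minimal sketch of step 2, the batch load can look like the Flink SQL below; the table names, columns, S3 paths, and file format are illustrative assumptions, not the production DDL:

SET 'execution.runtime-mode' = 'batch';

-- full export produced by AWS DMS on S3 (illustrative schema and path)
CREATE TABLE dms_full_export (
  id BIGINT,
  order_no STRING,
  update_time TIMESTAMP(3)
) WITH (
  'connector' = 'filesystem',
  'path' = 's3://bucket/dms/test_db/table_test/',
  'format' = 'parquet'
);

-- target Hudi table, later also used by the streaming upsert job
CREATE TABLE hudi_ods_table (
  id BIGINT,
  order_no STRING,
  update_time TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 's3://bucket/raw/test_db/table_test/',
  'table.type' = 'MERGE_ON_READ'
);

INSERT INTO hudi_ods_table SELECT * FROM dms_full_export;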
2.3 Benefits of the New Architecture
- Flexibility of data usage and development is much higher. The third-party sync service imposed obvious limitations, while the improved architecture is easy to extend and can expose real-time synced data to other business consumers;
- The data latency problem is solved. With real-time writes based on Flink on Hudi, RDS data reaches the warehouse within minutes or even seconds, so inventory, risk-control, and order data can be pulled and analyzed much sooner; overall, the old roughly 8-hour consolidation window shrinks to about 5 minutes;
- Cost becomes more controllable. With the storage-compute separation of Flink on Hudi, cost can be managed by tuning the compute resource quota of the sync jobs, the flush interval of the synced tables, and hot/cold archiving of the stored data; compared with the third-party service, the overall cost is expected to drop by roughly 40%.
3. Key Practices
3.1 Debezium Incremental Binlog Sync Configuration
Key configuration of the Kafka Connect worker:
bootstrap.servers=localhost:9092
# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
offset.storage.topic=connect-offsets
# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated,
# and compacted topic. Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
config.storage.topic=connect-configs
# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
status.storage.topic=connect-status

Query the most recent binlog file information on MySQL:
MySQL [(none)]> show binary logs;
| mysql-bin-changelog.094531 |    176317 |
| mysql-bin-changelog.094532 |    191443 |
| mysql-bin-changelog.094533 |   1102466 |
| mysql-bin-changelog.094534 |    273347 |
| mysql-bin-changelog.094535 |    141555 |
| mysql-bin-changelog.094536 |      4808 |
| mysql-bin-changelog.094537 |    146217 |
| mysql-bin-changelog.094538 |     29607 |
| mysql-bin-changelog.094539 |    141260 |
+----------------------------+-----------+
MySQL [(none)]> show binlog events in 'mysql-bin-changelog.094539' limit 10;
+----------------------------+-----+----------------+------------+-------------+--------------------------------------------------+
| Log_name                   | Pos | Event_type     | Server_id  | End_log_pos | Info                                             |
+----------------------------+-----+----------------+------------+-------------+--------------------------------------------------+
| mysql-bin-changelog.094539 |   4 | Format_desc    | 1399745413 |         123 | Server ver: 5.7.31-log, Binlog ver: 4            |
| mysql-bin-changelog.094539 | 123 | Previous_gtids | 1399745413 |         194 | 90710e1c-f699-11ea-85c0-0ec6a6bed381:1-108842347 |

Specify the server name as the message key and publish an offset record to offset.storage.topic:
$ ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic connect-offsets --property "parse.key=true" --property "key.separator=>"
>["test_servername",{"server":"test_servername"}]>{"ts_sec":1647845014,"file":"mysql-bin-changelog.007051","pos":74121553,"row":1,"server_id":1404217221,"event":2}

Compose the connector task API request and start the Debezium task:
{
    "name": "test_servername",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "snapshot.locking.mode": "none",
        "database.user": "db_user",
        "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
        "database.server.id": "1820615119",
        "database.history.kafka.bootstrap.servers": "localhost:9092",
        "database.history.kafka.topic": "history-topic",
        "inconsistent.schema.handling.mode": "skip",
        "transforms": "Reroute",  // route all binlog data into a single topic; by default each table gets its own topic
        "database.server.name": "test_servername",
        "transforms.Reroute.topic.regex": "test_servername(.*)",
        "database.port": "3306",
        "include.schema.changes": "true",
        "transforms.Reroute.topic.replacement": "binlog_data_topic",
        "table.exclude.list": "table_test",
        "database.hostname": "host",
        "database.password": "******",
        "name": "test_servername",
        "database.whitelist": "test_db",
        "database.include.list": "test_db",
        "snapshot.mode": "schema_only_recovery"  // recovery mode: resume syncing from the offset of the specified binlog file
    }
}
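As a hedged example of submitting this configuration, assuming the Kafka Connect REST endpoint listens on its default port 8083 and the JSON above is saved as test_servername.json (both are assumptions, adjust to your deployment):

# Register the Debezium MySQL connector through the Kafka Connect REST API
curl -s -X POST -H "Content-Type: application/json" \
     --data @test_servername.json \
     http://localhost:8083/connectors

# Check that the connector and its task are RUNNING
curl -s http://localhost:8083/connectors/test_servername/status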
3.2 Writing Incremental Data on Top of the Full Load in Hudi
When the full data already sits in the Hudi table, the binlog records subsequently consumed from Kafka must be incrementally upserted into that table. Debezium's binlog format carries the change information of every record and has to be parsed into rows that can be inserted directly.
Sample Python code that generates the parsing Flink SQL:
# Write the data into the ODS raw table
insert_hudi_raw_query = '''
INSERT INTO
{0}_ods_raw.{1}
SELECT
{2}
FROM
{0}_debezium_kafka.kafka_rds_{1}_log
WHERE
REGEXP(GET_JSON_OBJECT(payload, '$.source.table'), '^{3}$')
AND GET_JSON_OBJECT(payload, '$.source.db') = '{4}'
AND IF(GET_JSON_OBJECT(payload, \'$.op\') = \'d\', GET_JSON_OBJECT(payload, \'$.before.{5}\'), GET_JSON_OBJECT(payload, \'$.after.{5}\')) IS NOT NULL
AND GET_JSON_OBJECT(payload, '$.op') IN ('d', 'c', 'u')
'''.format(
    database_name,
    table_name,
    hudi_schema,
    mysql_table_name,
    mysql_database_name,
    primary_key
)

As shown above, we parse the three kinds of Debezium binlog events: for insert and update we take only the after image, while for delete we append a hard-delete flag field and insert the row, letting Hudi deduplicate automatically. To keep the incrementally updated Hudi data free of duplicates, the index bootstrap feature needs to be enabled.
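For illustration, here is a hypothetical sketch of how the hudi_schema placeholder ({2} above) could be assembled per column, including the hard-delete flag; the column list and the _hoodie_is_deleted field name are assumptions rather than the production code:

# Hypothetical sketch: build one expression per MySQL column, reading the
# "before" image for deletes and the "after" image otherwise, plus a
# hard-delete marker column (field name assumed, align it with your Hudi payload).
mysql_columns = ['id', 'order_no', 'update_time']  # in practice pulled from the MySQL schema

column_exprs = [
    "IF(GET_JSON_OBJECT(payload, '$.op') = 'd', "
    "GET_JSON_OBJECT(payload, '$.before.{0}'), "
    "GET_JSON_OBJECT(payload, '$.after.{0}')) AS {0}".format(col)
    for col in mysql_columns
]
# mark delete events so Hudi can drop the row on merge
column_exprs.append("GET_JSON_OBJECT(payload, '$.op') = 'd' AS _hoodie_is_deleted")

hudi_schema = ',\n'.join(column_exprs)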
Hudi configuration options:

| Option | Required | Default | Description |
| --- | --- | --- | --- |
| index.bootstrap.enabled | true | false | Enables index bootstrap, which loads the latest data of the existing table into state in one pass |
| index.partition.regex | false | * | Regular expression used to select the partitions to load; all partitions are loaded by default |
1. Use CREATE TABLE to declare a table matching the existing Hudi table, making sure the table type is correct (a sketch follows this list);
2. Set index.bootstrap.enabled = true to enable index bootstrap;
3. The index is loaded concurrently, and the time it takes depends on the data volume; search the logs for "finish loading the index under partition" and "Load records from file" to track the loading progress;
4. Once the bootstrap is done, restart the job with index.bootstrap.enabled turned off and the parallelism tuned to a suitable value; if RowDataToHoodieFunction and BootstrapFunction have different parallelism, restarting also avoids the extra shuffle.
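A minimal Flink SQL sketch of steps 1-2, reusing the illustrative Hudi table definition from section 2.2 and adding the bootstrap options (names, columns, and path remain illustrative):

CREATE TABLE hudi_ods_table (
  id BIGINT,
  order_no STRING,
  update_time TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 's3://bucket/raw/test_db/table_test/',
  'table.type' = 'MERGE_ON_READ',
  -- load the existing table's latest records into state before upserting increments
  'index.bootstrap.enabled' = 'true',
  -- optional: restrict which partitions are bootstrapped
  'index.partition.regex' = '.*'
);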
3.3 Custom Partition Format for Hudi Metastore Sync
Hudi provides the Hive Sync Tool (https://hudi.apache.org/docs/syncing_metastore) to sync Hudi metadata into Hive for querying, and PrestoDB/Trino can then serve second-level queries on Hudi tables simply by pointing at the Hive catalog. However, HiveSyncTool only ships with support for a few Hudi partition formats; the built-in extractors live in the org.apache.hudi.hive package of the source code:

If the Hudi table to sync is unpartitioned, or its partitions follow Hive's 'yyyy-MM-dd' / 'yyyy-MM-dd-HH' formats, you can simply point --partition-value-extractor at NonPartitionedExtractor, SlashEncodedDayPartitionValueExtractor, or SlashEncodedHourPartitionValueExtractor, for example:
sh run_sync_tool.sh --jdbc-url jdbc:hive2://xxxx:10000 --user hive --pass hive --partitioned-by partition --partition-value-extractor org.apache.hudi.hive.SlashEncodedHourPartitionValueExtractor --base-path s3://xxx/raw/order_business_db/ord_basics --auto-create-database --database order_business_db_ods_raw_hive_sync --table ord_basics

Some of our partitions, however, do not match these formats, and syncing them with the non-partitioned extractor makes queries return no data. In that case you have to implement your own extractor: under the package org.apache.hudi.hive, define a class (KlookEncodedDayPartitionValueExtractor in our case) that implements PartitionValueExtractor and overrides extractPartitionValuesInPath to handle the dd-MM-yy partition format. A code sketch follows.
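A minimal sketch of such an extractor, assuming the partition folder name is exactly the dd-MM-yy date and is normalized to Hive's yyyy-MM-dd form; the class body below is illustrative and may differ from the original implementation:

package org.apache.hudi.hive;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: turns a "dd-MM-yy" partition folder (e.g. "21-03-22")
// into a Hive-friendly "yyyy-MM-dd" partition value.
public class KlookEncodedDayPartitionValueExtractor implements PartitionValueExtractor {

  @Override
  public List<String> extractPartitionValuesInPath(String partitionPath) {
    try {
      SimpleDateFormat inputFormat = new SimpleDateFormat("dd-MM-yy");
      SimpleDateFormat outputFormat = new SimpleDateFormat("yyyy-MM-dd");
      return Collections.singletonList(outputFormat.format(inputFormat.parse(partitionPath)));
    } catch (ParseException e) {
      throw new IllegalArgumentException("Partition path " + partitionPath + " is not in dd-MM-yy format", e);
    }
  }
}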

Then rebuild the jar and run the command below; afterwards the table can be queried directly from PrestoDB/Hive/Trino.
sh run_sync_tool.sh --jdbc-url jdbc:hive2://xxxx:10000 --user hive --pass hive --partitioned-by partition --partition-value-extractor org.apache.hudi.hive.KlookEncodedDayPartitionValueExtractor --base-path s3://xxxx/raw/order_business_db/ord_basics --auto-create-database --database order_business_db_ods_raw_hive_sync --table ord_basics

Notes for AWS EMR:
- If log4j cannot be found, modify run_sync_tool.sh: HADOOP_HIVE_JARS=${HIVE_JARS}:${HADOOP_HOME}/*:${HADOOP_HOME}/lib/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/*:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:${GLUE_JARS}
- If libfb303 cannot be found, modify the launch command to: java -cp $HUDI_HIVE_UBER_JAR:${HADOOP_HIVE_JARS}:${HIVE_CONF_DIR}:${HADOOP_CONF_DIR}:${EMRFS_CONF_DIR}:/usr/lib/hudi/cli/lib/libfb303-0.9.3.jar org.apache.hudi.hive.HiveSyncTool "$@"
4. Lessons Learned
- The current RDS sync pipeline satisfies the business requirements for data freshness and flexible extension, but as described above, the long chain of components involves a lot of manual work. We therefore automated part of the process: Airflow schedules and triggers the DMS full sync to S3 and the Flink batch job that loads S3 into Hudi, so filling in a few database sync parameters is enough to bring a new pipeline's data into the lake (a hypothetical DAG sketch appears after this list). For incremental Debezium sync, we also wrote scripts that, when the Flink streaming SQL job is launched, pull the latest MySQL schema, generate the SQL that parses the binlog data, and submit the job automatically.
- For stability, the main concern is the incremental streaming job. We back up the raw binlog data from Kafka to S3, where it is retained for 30 days; if the streaming write to Hudi fails, we can quickly run a batch job to replay the data.
- The solution has been running for nearly a year, during which Hudi iterated rapidly and fixed many issues. For example, in the early days, when the index was enabled for incremental-on-full ingestion, the whole index had to be loaded into state in one pass; to speed up the index stage we configured a large parallelism and then had to babysit the job, waiting for one checkpoint cycle before scaling it back down. At first, after consulting the community, workarounds such as also streaming-reading the full data were suggested to avoid changing table parameters; later the community optimized this area, so the index is now bootstrapped asynchronously with concurrent loading, there is no need to wait for a checkpoint to complete, and index loading no longer blocks data writes or checkpoints.
- On the OLAP side, when we adopted Trino to query Hudi, the sync tool has to sync metadata for every Hudi partition, and we ran into partition-strategy compatibility issues; referring to the Hudi metastore sync tool, we wrote a converter class that supports our custom partition format.
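For illustration, a hypothetical Airflow DAG along the lines of the automation described in the first point above; the operators, ARN, and job commands are assumptions rather than our production code:

# Hypothetical sketch: trigger the DMS full load, then the Flink batch job
# that writes the S3 export into Hudi (a wait/sensor step for DMS completion
# is omitted for brevity).
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="rds_to_hudi_full_load",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,  # triggered on demand for each new table
    catchup=False,
) as dag:
    start_dms_full_load = BashOperator(
        task_id="start_dms_full_load",
        bash_command=(
            "aws dms start-replication-task "
            "--replication-task-arn {{ params.dms_task_arn }} "
            "--start-replication-task-type reload-target"
        ),
        params={"dms_task_arn": "arn:aws:dms:..."},  # placeholder ARN
    )

    load_s3_to_hudi = BashOperator(
        task_id="flink_batch_s3_to_hudi",
        bash_command="flink run -d /opt/jobs/s3_to_hudi_batch.jar --table table_test",
    )

    start_dms_full_load >> load_s3_to_hudi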
5. Future Work
While working with the Hudi open-source components, we learned how important it is to stay in close contact with the community, report issues promptly, and exchange ideas with engineers facing different business scenarios at other companies, sharing the problems we hit and how we approached them. As a next step, we plan to move off the third-party DMS service and try using Flink directly for the full data synchronization, reducing the number of components to maintain in the pipeline; likewise, we will keep following the development of Hudi and Flink and continue optimizing the efficiency of the overall pipeline.
