<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Flink 實(shí)踐 | 37 手游基于 Flink CDC + Hudi 湖倉(cāng)一體方案實(shí)踐

          共 7375字,需瀏覽 15分鐘

           ·

          2021-09-21 23:23

          ▼ 關(guān)注「Flink 中文社區(qū)」,獲取更多技術(shù)干貨 
          摘要:本文作者是 37 手游大數(shù)據(jù)開發(fā)徐潤(rùn)柏,介紹了 37 手游為何選擇 Flink 作為計(jì)算引擎,并如何基于 Flink CDC + Hudi 構(gòu)建新的湖倉(cāng)一體方案,主要內(nèi)容包括:

          1. Flink CDC 基本知識(shí)介紹
          2. Hudi 基本知識(shí)介紹
          3. 37 手游的業(yè)務(wù)痛點(diǎn)和技術(shù)方案選型
          4. 37 手游湖倉(cāng)一體介紹
          5. Flink CDC + Hudi 實(shí)踐
          6. 總結(jié)

          Tips:點(diǎn)「閱讀原文」即可查看更多技術(shù)文章~ 

           GitHub 地址 
          歡迎大家給 Flink 點(diǎn)贊送 star~


          一、Flink-CDC 2.0


          Flink CDC Connectors 是 Apache Flink 的一個(gè) source 端的連接器,目前 2.0 版本支持從 MySQL 以及 Postgres 兩種數(shù)據(jù)源中獲取數(shù)據(jù),2.1 版本社區(qū)確定會(huì)支持 Oracle,MongoDB 數(shù)據(jù)源。


          Fink CDC 2.0 的核心 feature,主要表現(xiàn)為實(shí)現(xiàn)了以下三個(gè)非常重要的功能:


          • 全程無(wú)鎖,不會(huì)對(duì)數(shù)據(jù)庫(kù)產(chǎn)生需要加鎖所帶來(lái)的風(fēng)險(xiǎn);


          • 多并行度,全量數(shù)據(jù)的讀取階段支持水平擴(kuò)展,使億級(jí)別的大表可以通過(guò)加大并行度來(lái)加快讀取速度;

          • 斷點(diǎn)續(xù)傳,全量階段支持 checkpoint,即使任務(wù)因某種原因退出了,也可通過(guò)保存的 checkpoint 對(duì)任務(wù)進(jìn)行恢復(fù)實(shí)現(xiàn)數(shù)據(jù)的斷點(diǎn)續(xù)傳。

          Flink CDC Connectors:
          https://ververica.github.io/flink-cdc-connectors/master/index.html

          ▼ Flink CDC 2.0 更多詳情 

          Flink CDC 2.0 正式發(fā)布,詳解核心改進(jìn)


          二、Hudi


          Apache Hudi 目前被業(yè)內(nèi)描述為圍繞數(shù)據(jù)庫(kù)內(nèi)核構(gòu)建的流式數(shù)據(jù)湖平臺(tái) (Streaming Data Lake Platform)。


          由于 Hudi 擁有良好的 Upsert 能力,并且 0.10 Master 對(duì) Flink 版本支持至 1.13.x,因此我們選擇通過(guò) Flink + Hudi 的方式為 37 手游的業(yè)務(wù)場(chǎng)景提供分鐘級(jí) Upsert 數(shù)據(jù)的分析查詢能力。


          Apache Hudi:
          https://hudi.apache.org/docs/overview/


          三、37 手游的業(yè)務(wù)痛點(diǎn)和技術(shù)方案選型



          1. 舊架構(gòu)與業(yè)務(wù)痛點(diǎn)


          ■ 1.1 數(shù)據(jù)實(shí)時(shí)性不夠


          • 日志類數(shù)據(jù)通過(guò) sqoop 每 30min 同步前 60min 數(shù)據(jù)到 Hive;


          • 數(shù)據(jù)庫(kù)類數(shù)據(jù)通過(guò) sqoop 每 60min 同步當(dāng)天全量數(shù)據(jù)到 Hive;


          • 數(shù)據(jù)庫(kù)類數(shù)據(jù)通過(guò) sqoop 每天同步前 60 天數(shù)據(jù)到 Hive。


          ■ 1.2 業(yè)務(wù)代碼邏輯復(fù)雜且難維護(hù)


          • 目前 37 手游還有很多的業(yè)務(wù)開發(fā)沿用 MySQL + PHP 的開發(fā)模式,代碼邏輯復(fù)雜且很難維護(hù);

          • 相同的代碼邏輯,往往流處理需要開發(fā)一份代碼,批處理則需要另開發(fā)一份代碼,不能復(fù)用。

          ■ 1.3 頻繁重刷歷史數(shù)據(jù)


          • 頻繁地重刷歷史數(shù)據(jù)來(lái)保證數(shù)據(jù)一致。

          ■ 1.4 Schema 變更頻繁


          • 由于業(yè)務(wù)需求,經(jīng)常需要添加表字段。

          ■ 1.5 Hive 版本低


          • 目前 Hive 使用版本為 1.x 版本,并且升級(jí)版本比較困難;

          • 不支持 Upsert;

          • 不支持行級(jí)別的 delete。

          由于 37 手游的業(yè)務(wù)場(chǎng)景,數(shù)據(jù) upsert、delete 是個(gè)很常見的需求。所以基于 Hive 數(shù)倉(cāng)的架構(gòu)對(duì)業(yè)務(wù)需求的滿足度不夠。


          2. 技術(shù)選型


          在同步工具的選型上考慮過(guò) Canal 和 Maxwell。但 Canal 只適合增量數(shù)據(jù)的同步并且需要部署,維護(hù)起來(lái)相對(duì)較重。而 Maxwell 雖然比較輕量,但與 Canal 一樣需要配合 Kafka 等消息隊(duì)列使用。對(duì)比之下,F(xiàn)link CDC 可以通過(guò)配置 Flink connector 的方式基于 Flink-SQL 進(jìn)行使用,十分輕巧,并且完美契合基于 Flink-SQL 的流批一體架構(gòu)。

          在存儲(chǔ)引擎的選型上,目前最熱門的數(shù)據(jù)湖產(chǎn)品當(dāng)屬:Apache Hudi,Apache Iceberg 和 DeltaLake,這些在我們的場(chǎng)景下各有優(yōu)劣。最終,基于 Hudi 對(duì)上下游生態(tài)的開放、對(duì)全局索引的支持、對(duì) Flink 1.13 版本的支持,以及對(duì) Hive 版本的兼容性 (Iceberg 不支持 Hive1.x 的版本) 等原因,選擇了 Hudi 作為湖倉(cāng)一體和流批一體的存儲(chǔ)引擎。

          針對(duì)上述存在的業(yè)務(wù)痛點(diǎn)以及選型對(duì)比,我們的最終方案為:以 Flink1.13.2 作為計(jì)算引擎,依靠 Flink 提供的流批統(tǒng)一的 API,基于 Flink-SQL 實(shí)現(xiàn)流批一體,F(xiàn)link-CDC 2.0 作為 ODS 層的數(shù)據(jù)同步工具以及 Hudi-0.10 Master 作為存儲(chǔ)引擎的湖倉(cāng)一體,解決維護(hù)兩套代碼的業(yè)務(wù)痛點(diǎn)。


          四、新架構(gòu)與湖倉(cāng)一體


          37 手游的湖倉(cāng)一體方案,是 37 手游流批一體架構(gòu)的一部分。通過(guò)湖倉(cāng)一體、流批一體,準(zhǔn)實(shí)時(shí)場(chǎng)景下做到了:數(shù)據(jù)同源、同計(jì)算引擎、同存儲(chǔ)、同計(jì)算口徑。數(shù)據(jù)的時(shí)效性可以到分鐘級(jí),能很好的滿足業(yè)務(wù)準(zhǔn)實(shí)時(shí)數(shù)倉(cāng)的需求。下面是架構(gòu)圖:



          MySQL 數(shù)據(jù)通過(guò) Flink CDC 進(jìn)入到 Kafka。之所以數(shù)據(jù)先入 Kafka 而不是直接入 Hudi,是為了實(shí)現(xiàn)多個(gè)實(shí)時(shí)任務(wù)復(fù)用 MySQL 過(guò)來(lái)的數(shù)據(jù),避免多個(gè)任務(wù)通過(guò) Flink CDC 接 MySQL 表以及 Binlog,對(duì) MySQL 庫(kù)的性能造成影響。

          通過(guò) CDC 進(jìn)入到 Kafka 的數(shù)據(jù)除了落一份到離線數(shù)據(jù)倉(cāng)庫(kù)的 ODS 層之外,會(huì)同時(shí)按照實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù)的鏈路,從 ODS->DWD->DWS->OLAP 數(shù)據(jù)庫(kù),最后供報(bào)表等數(shù)據(jù)服務(wù)使用。實(shí)時(shí)數(shù)倉(cāng)的每一層結(jié)果數(shù)據(jù)會(huì)準(zhǔn)實(shí)時(shí)的落一份到離線數(shù)倉(cāng),通過(guò)這種方式做到程序一次開發(fā)、指標(biāo)口徑統(tǒng)一,數(shù)據(jù)統(tǒng)一。

          從架構(gòu)圖上,可以看到有一步數(shù)據(jù)修正 (重跑歷史數(shù)據(jù)) 的動(dòng)作,之所以有這一步是考慮到:有可能存在由于口徑調(diào)整或者前一天的實(shí)時(shí)任務(wù)計(jì)算結(jié)果錯(cuò)誤,導(dǎo)致重跑歷史數(shù)據(jù)的情況。

          而存儲(chǔ)在 Kafka 的數(shù)據(jù)有失效時(shí)間,不會(huì)存太久的歷史數(shù)據(jù),重跑很久的歷史數(shù)據(jù)無(wú)法從 Kafka 中獲取歷史源數(shù)據(jù)。再者,如果把大量的歷史數(shù)據(jù)再一次推到 Kafka,走實(shí)時(shí)計(jì)算的鏈路來(lái)修正歷史數(shù)據(jù),可能會(huì)影響當(dāng)天的實(shí)時(shí)作業(yè)。所以針對(duì)重跑歷史數(shù)據(jù),會(huì)通過(guò)數(shù)據(jù)修正這一步來(lái)處理。

          總體上說(shuō),37 手游的數(shù)據(jù)倉(cāng)庫(kù)屬于 Lambda 和 Kappa 混搭的架構(gòu)。流批一體數(shù)據(jù)倉(cāng)庫(kù)的各個(gè)數(shù)據(jù)鏈路有數(shù)據(jù)質(zhì)量校驗(yàn)的流程。第二天對(duì)前一天的數(shù)據(jù)進(jìn)行對(duì)賬,如果前一天實(shí)時(shí)計(jì)算的數(shù)據(jù)無(wú)異常,則不需要修正數(shù)據(jù),Kappa 架構(gòu)已經(jīng)足夠。


          五、Flink CDC 2.0 + Kafka + Hudi 0.10 

          實(shí)踐


          1. 環(huán)境準(zhǔn)備

          • Flink 1.13.2

          • .../lib/hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar (修改 Master 分支的 Hudi Flink 版本為 1.13.2 然后構(gòu)建)

          • .../lib/hadoop-mapreduce-client-core-2.7.3.jar (解決 Hudi ClassNotFoundException)

          • ../lib/flink-sql-connector-mysql-cdc-2.0.0.jar

          • ../lib/flink-format-changelog-json-2.0.0.jar

          • ../lib/flink-sql-connector-kafka_2.11-1.13.2.jar

          source 端 MySQL-CDC 表定義:

          create table sy_payment_cdc (  ID BIGINT,  ...  PRIMARY KEY(ID) NOT ENFORCED) with(  'connector' = 'mysql-cdc',  'hostname' = '',  'port' = '',  'username' = '',  'password' = '',  'database-name' = '',  'table-name' = '',  'connect.timeout' = '60s',  'scan.incremental.snapshot.chunk.size' = '100000',  'server-id'='5401-5416');


          值得注意的是:scan.incremental.snapshot.chunk.size 參數(shù)需要根據(jù)實(shí)際情況來(lái)配置,如果表數(shù)據(jù)量不大,使用默認(rèn)值即可。

          Sink 端 Kafka+Hudi COW 表定義:

          create table sy_payment_cdc2kafka (  ID BIGINT,  ...  PRIMARY KEY(ID) NOT ENFORCED) with (  'connector' = 'kafka',  'topic' = '',  'scan.startup.mode' = 'latest-offset',  'properties.bootstrap.servers' = '',  'properties.group.id' = '',  'key.format' = '',  'key.fields' = '',  'format' = 'changelog-json');
          create table sy_payment2hudi ( ID BIGINT, ... PRIMARY KEY(ID) NOT ENFORCED)PARTITIONED BY (YMD)WITH ( 'connector' = 'hudi', 'path' = 'hdfs:///data/hudi/m37_mpay_tj/sy_payment', 'table.type' = 'COPY_ON_WRITE', 'partition.default_name' = 'YMD', 'write.insert.drop.duplicates' = 'true', 'write.bulk_insert.shuffle_by_partition' = 'false', 'write.bulk_insert.sort_by_partition' = 'false', 'write.precombine.field' = 'MTIME', 'write.tasks' = '16', 'write.bucket_assign.tasks' = '16', 'write.task.max.size' = '', 'write.merge.max_memory' = '');


          針對(duì)歷史數(shù)據(jù)入 Hudi,可以選擇離線 bulk_insert 的方式入湖,再通過(guò) Load Index Bootstrap 加載數(shù)據(jù)后接回增量數(shù)據(jù)。bulk_insert 方式入湖數(shù)據(jù)的唯一性依靠源端的數(shù)據(jù)本身,在接回增量數(shù)據(jù)時(shí)也需要做到保證數(shù)據(jù)不丟失。

          這里我們選擇更為簡(jiǎn)單的調(diào)整任務(wù)資源的方式將歷史數(shù)據(jù)入湖。依靠 Flink 的 checkpoint 機(jī)制,不管是 CDC 2.0 入 Kafka 期間還是 Kafka 入 Hudi 期間,都可以通過(guò)指定 checkpoint 的方式對(duì)任務(wù)進(jìn)行重啟并且數(shù)據(jù)不會(huì)丟失。

          我們可以在配置 CDC 2.0 入 Kafka,Kafka 入 Hudi 任務(wù)時(shí)調(diào)大內(nèi)存并配置多個(gè)并行度,加快歷史數(shù)據(jù)入湖,等到所有歷史數(shù)據(jù)入湖后,再相應(yīng)的調(diào)小入湖任務(wù)的內(nèi)存配置并且將 CDC 入 Kafka 的并行度設(shè)置為 1,因?yàn)樵隽侩A段 CDC 是單并行度,然后指定 checkpoint 重啟任務(wù)。

          按照上面表定義的參數(shù)配置,配置 16 個(gè)并行度,F(xiàn)link TaskManager 內(nèi)存大小為 50G 的情況下,單表 15 億歷史數(shù)據(jù)入至 Hudi COW 表實(shí)際用時(shí) 10 小時(shí),單表 9 億數(shù)據(jù)入至 Hudi COW 表實(shí)際用時(shí) 6 小時(shí)。當(dāng)然這個(gè)耗時(shí)很大一部分是 COW 寫放大的特性,在大數(shù)據(jù)量的 upsert 模式下耗時(shí)較多。

          目前我們的集群由 200 多臺(tái)機(jī)器組成,在線的流計(jì)算任務(wù)總數(shù)有 200 多,總數(shù)據(jù)量接近 2PB。

          如果集群資源很有限的情況下,可以根據(jù)實(shí)際情況調(diào)整 Hudi 表以及 Flink 任務(wù)的內(nèi)存配置,還可以通過(guò)配置 Hudi 的限流參數(shù) write.rate.limit 讓歷史數(shù)據(jù)緩慢入湖。



          之前 Flink CDC 1.x 版本由于全量 snapshot 階段單并行度讀取的原因,當(dāng)時(shí)億級(jí)以上的表在全量 snapshot 讀取階段就需要耗費(fèi)很長(zhǎng)時(shí)間,并且 checkpoint 會(huì)失敗無(wú)法保證數(shù)據(jù)的斷點(diǎn)續(xù)傳。


          所以當(dāng)時(shí)入 Hudi 是采用先啟動(dòng)一個(gè) CDC 1.x 的程序?qū)⒋丝涕_始的增量數(shù)據(jù)寫入 Kafka,之后再啟動(dòng)另外一個(gè) sqoop 程序拉取當(dāng)前的所有數(shù)據(jù)至 Hive 后,通過(guò) Flink 讀取 Hive 的數(shù)據(jù)寫 Hudi,最后再把 Kafka 的增量數(shù)據(jù)從頭消費(fèi)接回 Hudi。由于 Kafka 與 Hive 的數(shù)據(jù)存在交集,因此數(shù)據(jù)不會(huì)丟失,加上 Hudi 的 upsert 能力保證了數(shù)據(jù)唯一。


          但是,這種方式的鏈路太長(zhǎng)操作困難,如今通過(guò) CDC 2.0 在全量 snapshot 階段支持多并行度以及 checkpoint 的能力,確實(shí)大大降低了架構(gòu)的復(fù)雜度。


          2. 數(shù)據(jù)比對(duì)


          • 由于生產(chǎn)環(huán)境用的是 Hive1.x,Hudi 對(duì)于 1.x 還不支持?jǐn)?shù)據(jù)同步,所以通過(guò)創(chuàng)建 Hive 外部表的方式進(jìn)行查詢,如果是 Hive2.x 以上版本,可參考 Hive 同步章節(jié);

          • 創(chuàng)建 Hive 外部表 + 預(yù)創(chuàng)建分區(qū);

          • auxlib 文件夾添加 Hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar。


          CREATE EXTERNAL TABLE m37_mpay_tj.`ods_sy_payment_f_d_b_ext`(  `_hoodie_commit_time` string,  `_hoodie_commit_seqno` string,  `_hoodie_record_key` string,  `_hoodie_partition_path` string,  `_hoodie_file_name` string,  `ID` bigint,  ...  )PARTITIONED BY (  `dt` string)ROW FORMAT SERDE  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'STORED AS INPUTFORMAT  'org.apache.hudi.hadoop.HoodieParquetInputFormat'OUTPUTFORMAT  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'LOCATION  'hdfs:///data/hudi/m37_mpay_tj/sy_payment'


          最終查詢 Hudi 數(shù)據(jù) (Hive 外部表的形式) 與原來(lái) sqoop 同步的 Hive 數(shù)據(jù)做比對(duì)得到:

          • 總數(shù)一致;

          • 按天分組統(tǒng)計(jì)數(shù)量一致;

          • 按天分組統(tǒng)計(jì)金額一致。


          Hive 同步章節(jié):

          https://www.yuque.com/docs/share/01c98494-a980-414c-9c45-152023bf3c17?#IsoNU


          六、總結(jié)


          湖倉(cāng)一體以及流批一體架構(gòu)對(duì)比傳統(tǒng)數(shù)倉(cāng)架構(gòu)主要有以下幾點(diǎn)好處:

          • Hudi 提供了 Upsert 能力,解決頻繁 Upsert/Delete 的痛點(diǎn);

          • 提供分鐘級(jí)的數(shù)據(jù),比傳統(tǒng)數(shù)倉(cāng)有更高的時(shí)效性;

          • 基于 Flink-SQL 實(shí)現(xiàn)了流批一體,代碼維護(hù)成本低;

          • 數(shù)據(jù)同源、同計(jì)算引擎、同存儲(chǔ)、同計(jì)算口徑;

          • 選用 Flink CDC 作為數(shù)據(jù)同步工具,省掉 sqoop 的維護(hù)成本。

          最后針對(duì)頻繁增加表字段的痛點(diǎn)需求,并且希望后續(xù)同步下游系統(tǒng)的時(shí)候能夠自動(dòng)加入這個(gè)字段,目前還沒(méi)有完美的解決方案,希望 Flink CDC 社區(qū)能在后續(xù)的版本提供 Schema Evolution 的支持。


          Reference


          [1] MySQL CDC 文檔:

          https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html


          [2] Hudi Flink 答疑解惑:

          https://www.yuque.com/docs/share/01c98494-a980-414c-9c45-152023bf3c17?#


          [3] Hudi 的一些設(shè)計(jì):

          https://www.yuque.com/docs/share/5d1c383d-c3fc-483a-ad7e-d8181d6295cd?#




          熱點(diǎn)推薦






          更多 Flink 相關(guān)技術(shù)問(wèn)題,可掃碼加入社區(qū)釘釘交流群~

             戳我,查看更多技術(shù)文章!

          瀏覽 86
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产免费一级特黄A片 | 三区在线视频观看 | 在线观看无码高清视频 | 午夜操逼逼 | www.玖玖在线 |