<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Flink 實踐 | Flink CDC + Hudi + Hive + Presto 構(gòu)建實時數(shù)據(jù)湖最佳實踐

          共 10394字,需瀏覽 21分鐘

           ·

          2021-11-24 02:19



          1. 測試過程環(huán)境版本說明

          Flink1.13.1

          Scala2.11

          CDH6.2.0

          Hadoop3.0.0

          Hive2.1.1

          Hudi0.10(master)

          PrestoDB0.256

          Mysql5.7

          2. 集群服務(wù)器基礎(chǔ)環(huán)境

          2.1 Maven和JDK環(huán)境版本

          2.2 Hadoop 集群環(huán)境版本

          2.3 HADOOP環(huán)境變量配置

          export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoopexport HADOOP_CALSSPATH=`$HADOOP_HOME/bin/hadoop classpath`  

          3. Hudi編譯環(huán)境配置

          3.1 Maven Home settings.xml配置修改

          說明:指定aliyun maven地址(支持CDH cloudera依賴) mirror庫

          alimavencentral,!clouderaaliyun mavenhttp://maven.aliyun.com/nexus/content/groups/public/

          3.2 下載Hudi源碼包

          git clone https://github.com/apache/hudi.git

          Hudi社區(qū)建議版本適配

          hudi0.9 適配 flink1.12.2

          hudi0.10(master) 適配 flink1.13.X ( 說明master分支上版本還未release)

          3.3 Hudi 客戶端命令行

          3.4 修改Hudi集成flink和Hive編譯依賴版本配置

          hudi-master/packaging/hudi-flink-bundle

          pom.xml文件 ( 筆者環(huán)境CDH6.2.0 hive2.1.1)

          flink-bundle-shade-hive22.1.1-cdh6.2.0compile${hive.groupid}hive-service-rpc${hive.version}${flink.bundle.hive.scope}

          3.5 編譯Hudi 指定Hadoop和Hive版本信息

          mvn clean install -DskipTests-Drat.skip=true-Dscala-2.11-Dhadoop.version=3.0.0-Pflink-bundle-shade-hive2

          (可加 –e –X 參數(shù)查看編譯ERROR異常和DEBUG信息)

          說明:默認(rèn)scala2.11、默認(rèn)不包含hive依賴

          首次編譯耗時較長 筆者首次編譯大概花費 50min+(也和服務(wù)器網(wǎng)絡(luò)有關(guān))

          后續(xù)編譯會快一些 大約15min左右

          3.6 Hudi編譯異常

          修改Hudi master pom.xml 增加 CDH repository地址

          3.7 Hudi重新編譯

          3.8 Hudi編譯結(jié)果說明

          hudi-master/packaging/hudi-flink-bundle/target

          hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar

          說明:hudi-flink-bundle jar 是 flink 用來寫入和讀取數(shù)據(jù)

          hudi-master/packaging/hudi-hadoop-mr-bundle/target

          hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar

          說明:hudi-mr-bundle jar 是 hive 需要用來讀hudi數(shù)據(jù)

          4. Flink環(huán)境配置

          版本說明:Flink 1.13.1 scala2.11版本

          4.1 FLINK_HOME 下 sql-client-defaults.yaml 配置

          4.2 flink-conf.yaml配置修改

          # state.backend: filesystemstate.backend: rocksdb# 開啟增量checkpointstate.backend.incremental: true# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpointsstate.checkpoints.dir: hdfs://nameservice/flink/flink-checkpointsclassloader.check-leaked-classloader: falseclassloader.resolve-order: parent-first

          4.3 FLINK_HOME lib下添加依賴

          flink-sql-connector-mysql-cdc-1.4.0.jarflink-sql-connector-oracle-cdc-2.1-SNAPSHOT.jar.BAK – oracle cdc 依賴 flink-format-changelog-json-1.4.0.jarflink-sql-connector-kafka_2.11-1.13.1.jar--- Hadoop home lib下copy過來hadoop-mapreduce-client-common-3.0.0-cdh6.2.0.jarhadoop-mapreduce-client-core-3.0.0-cdh6.2.0.jarhadoop-mapreduce-client-jobclient-3.0.0-cdh6.2.0.jar--- hudi編譯jar copy過來hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar

          說明:目前oracle cdc jar和mysql cdc jar一起在lib下發(fā)現(xiàn)有沖突異常

          5 啟動flink yarn session服務(wù)

          5.1 FLINK_HOME shell 命令

          $FLINK_HOME/bin/yarn-session.sh -s 2-jm 2048-tm 2048-nm ys-hudi01 -d

          5.2 Yarn Web UI

          5.3 Flinksql Client 啟動命令

          $FLINK_HOME/bin/sql-client.sh embedded -j ./lib/hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar shell

          說明:-j指定hudi-flink 依賴jar

          Show table /show catalogs

          6. MySQL binlog 開啟配置

          6.1 創(chuàng)建binlog日志存儲路徑

          mkdir logs

          6.2 修改目錄屬主和group

          chown -R mysql:mysql /mysqldata/logs

          6.3 修改mysql配置信息

          vim /etc/my.cnfserver-id=2log-bin= /mysqldata/logs/mysql-binbinlog_format=rowexpire_logs_days=15binlog_row_image=full

          6.4 修改完,重啟mysql server

          service mysqld restart

          6.5 客戶端查看binlog日志情況

          show master logs;

          Mysql 版本:5.7.30

          5.6 創(chuàng)建mysql sources 表 DDL

          create table users_cdc(   id bigint auto_increment primary key,   name varchar(20) null,   birthday timestamp default CURRENT_TIMESTAMP notnull,   ts timestamp default CURRENT_TIMESTAMP notnull);

          7. FlinkCDC sink Hudi測試代碼過程

          7.1 Flink sql cdc DDL 語句:(具體參數(shù)說明可參考flink官網(wǎng))

          CREATE TABLE mysql_users (    id BIGINT PRIMARY KEY NOT ENFORCED ,    name STRING,    birthday TIMESTAMP(3),    ts TIMESTAMP(3)) WITH ('connector'= 'mysql-cdc','hostname'= '127.0.0.1','port'= '3306','username'= '','password'=’’,'server-time-zone'= 'Asia/Shanghai','debezium.snapshot.mode'='initial','database-name'= 'luo','table-name'= 'users_cdc');

          7.2 查詢mysql cdc 表

          Flink SQL> select * from mysql_users;

          由于目前MySQL users_cdc表是空,所以flinksql 查詢沒有數(shù)據(jù) 只有表結(jié)構(gòu);

          Flink web UI:

          7.3 創(chuàng)建一個臨時視圖,增加分區(qū)列 方便后續(xù)同步hive分區(qū)表

          Flink SQL> create view mycdc_v AS SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') as?partition?FROM mysql_users;

          說明:partition 關(guān)鍵字需要 `` 引起來

          查詢視圖數(shù)據(jù)也是空結(jié)構(gòu),但增加了分區(qū)字段:

          Flink SQL> select * from mycdc_v;

          Flink web UI:

          7.4 設(shè)置checkpoint間隔時間,存儲路徑已在flink-conf配置設(shè)置全局路徑

          建議:測試環(huán)境 可設(shè)置秒級別(不能太小),生產(chǎn)環(huán)境可設(shè)置分鐘級別。

          Flink SQL> set execution.checkpointing.interval=30sec;

          7.5 Flinksql 創(chuàng)建 cdc sink hudi文件,并自動同步hive分區(qū)表DDL 語句

          CREATE TABLE mysqlcdc_sync_hive01(id bigint ,name string,birthday TIMESTAMP(3),ts TIMESTAMP(3),`partition` VARCHAR(20),primary key(id) not enforced --必須指定uuid 主鍵)PARTITIONED BY (`partition`)with('connector'='hudi','path'= 'hdfs://nameservice /luo/hudi/mysqlcdc_sync_hive01', 'hoodie.datasource.write.recordkey.field'= 'id'-- 主鍵, 'write.precombine.field'= 'ts'-- 自動precombine的字段, 'write.tasks'= '1', 'compaction.tasks'= '1', 'write.rate.limit'= '2000'-- 限速, 'table.type'= 'MERGE_ON_READ'-- 默認(rèn)COPY_ON_WRITE,可選MERGE_ON_READ , 'compaction.async.enabled'= 'true'-- 是否開啟異步壓縮, 'compaction.trigger.strategy'= 'num_commits'-- 按次數(shù)壓縮, 'compaction.delta_commits'= '1'-- 默認(rèn)為5, 'changelog.enabled'= 'true'-- 開啟changelog變更, 'read.streaming.enabled'= 'true'-- 開啟流讀, 'read.streaming.check-interval'= '3'-- 檢查間隔,默認(rèn)60s, 'hive_sync.enable'= 'true'-- 開啟自動同步hive, 'hive_sync.mode'= 'hms'-- 自動同步hive模式,默認(rèn)jdbc模式, 'hive_sync.metastore.uris'= 'thrift://hadoop:9083'-- hive metastore地址-- , 'hive_sync.jdbc_url'= 'jdbc:hive2://hadoop:10000'-- hiveServer地址, 'hive_sync.table'= 'mysqlcdc_sync_hive01'-- hive 新建表名, 'hive_sync.db'= 'luo'-- hive 新建數(shù)據(jù)庫名, 'hive_sync.username'= ''-- HMS 用戶名, 'hive_sync.password'= ''-- HMS 密碼, 'hive_sync.support_timestamp'= 'true'-- 兼容hive timestamp類型);

          說明:Hudi目前支持MOR和COW兩種模式

          (1) Copy on Write:使用列式存儲來存儲數(shù)據(jù)(例如:parquet),通過在寫入期間執(zhí)行同步合并來簡單地更新和重現(xiàn)文件

          (2) Merge on Read:使用列式存儲(parquet)+行式文件(arvo)組合存儲數(shù)據(jù)。更新記錄到增量文件中,然后進行同步或異步壓縮來生成新版本的列式文件。

          COW:Copy on Write (寫時復(fù)制),快照查詢+增量查詢

          MOR:Merge on Read (讀時合并),快照查詢+增量查詢+讀取優(yōu)化查詢(近實時)

          使用場景上:

          (1)COW適用寫少讀多的場景 ,MOR 適用寫多讀少的場景;

          (2)MOR適合CDC場景,更新延遲要求較低,COW目前不支持 changelog mode 不適合處理cdc場景;

          Flink web UI

          7.6 Flink sql mysql cdc數(shù)據(jù)寫入hudi文件數(shù)據(jù)

          Flink SQL> insert into mysqlcdc_sync_hive01 select id,name,birthday,ts,partition?from mycdc_v;

          Flink web UI DAG圖:

          7.7 HDFS上Hudi文件目錄情況

          說明:目前還沒寫入測試數(shù)據(jù),hudi目錄只生成一些狀態(tài)標(biāo)記文件,還未生成分區(qū)目錄以及.log 和.parquet數(shù)據(jù)文件,具體含義可見hudi官方文檔。

          7.8 Mysql數(shù)據(jù)源寫入測試數(shù)據(jù)

          insert into users_cdc (name) values ('cdc01');

          7.9 Flinksql 查詢mysql cdc insert數(shù)據(jù):

          Flink SQL> set execution.result-mode=tableau;

          [WARNING] The specified key 'execution.result-mode' is deprecated. Please use 'sql-client.execution.result-mode' instead.

          [INFO] Session property has been set.

          Flink SQL> select * from mysql_users; -- 查詢到一條insert數(shù)據(jù)

          7.10 Flink web UI頁面可以看到DAG 各個環(huán)節(jié)產(chǎn)生一條測試數(shù)據(jù)

          7.11 Flinksql 查詢 sink的hudi表數(shù)據(jù)

          Flink SQL> select * from mysqlcdc_sync_hive01; --已查詢到一條insert數(shù)據(jù)

          7.12 Hdfs上Hudi文件目錄變化情況

          7.13 Hive分區(qū)表和數(shù)據(jù)自動同步情況

          7.14 查看自動創(chuàng)建hive表結(jié)構(gòu)

          hive> show create table mysqlcdc_sync_hive01_ro;

          hive> show create table mysqlcdc_sync_hive01_rt;

          7.15 查看自動生成的表分區(qū)信息

          hive> show partitions mysqlcdc_sync_hive01_ro;

          hive> show partitions mysqlcdc_sync_hive01_rt;

          說明:已自動生產(chǎn)hudi MOR模式的

          mysqlcdc_sync_hive01_ro

          mysqlcdc_sync_hive01_rt

          ro表和rt表區(qū)別:

          ro 表全稱 read oprimized table,對于 MOR 表同步的 xxx_ro 表,只暴露壓縮后的 parquet。其查詢方式和COW表類似。設(shè)置完 hiveInputFormat 之后 和普通的 Hive 表一樣查詢即可;

          rt表示增量視圖,主要針對增量查詢的rt表;

          ro表只能查parquet文件數(shù)據(jù), rt表 parquet文件數(shù)據(jù)和log文件數(shù)據(jù)都可查;

          7.16 Hive訪問Hudi數(shù)據(jù)

          說明:需要引入hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar

          引入Hudi依賴jar方式:

          (1) 引入到 $HIVE_HOME/lib下;

          (2) 引入到$HIVE_HOME/auxlib 自定義第三方依賴 修改 hive-site.xml配置文件;

          (3) Hive shell命令行引入 Session級別有效;

          其中(1)和(3)配置完后需要重啟 hive-server服務(wù);

          查詢Hive 分區(qū)表數(shù)據(jù):

          hive> select * from mysqlcdc_sync_hive01_ro; --已查詢到mysq insert的一條數(shù)據(jù)

          hive> select * from mysqlcdc_sync_hive01_rt; --已查詢到mysq insert的一條數(shù)據(jù)

          Hive 條件查詢:

          hive> select name,ts from mysqlcdc_sync_hive01_ro where?partition='20211109';

          Hive ro表 count查詢

          hive> select count(1) from mysqlcdc_sync_hive01_ro;


          Hive Count異常解決:

          引入hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar依賴

          hive> add jar hdfs://nameservice /luo/hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar;

          hive> set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;


          hive> select count(1) from mysqlcdc_sync_hive01_ro; --可正常count


          Hive rt表 count查詢

          hive> select count(1) from mysqlcdc_sync_hive01_rt;

          說明:rt 表count 還是異常,和Hudi社區(qū)人員溝通hudi master目前還沒release這塊存在bug正在修復(fù)中

          具體見:https://issues.apache.org/jira/browse/HUDI-2649

          7.17 Mysql 數(shù)據(jù)源寫入多條測試數(shù)據(jù)

          insert into users_cdc (name) values ('cdc02');insert into users_cdc (name) values ('cdc03');insert into users_cdc (name) values ('cdc04');insert into users_cdc (name) values ('cdc05');insert into users_cdc (name) values ('cdc06');


          Flink web UI DAG中數(shù)據(jù)鏈路情況:

          7.18 Flinksql中新寫入數(shù)據(jù)查詢情況

          Yarn web UI?application_1626256835287_40351[1]資源使用情況


          Hdfs上Hudi文件目錄變化情況


          Hudi狀態(tài)文件說明:

          (1)requested:表示一個動作已被安排,但尚未啟動

          (2)inflight:表示當(dāng)前正在執(zhí)行操作

          (3)completed:表示在時間線上完成了操作

          Flink jobmanager log sync hive過程詳細(xì)日志




          7.19 Mysql 數(shù)據(jù)源更新數(shù)據(jù)

          update users_cdc set name = 'cdc05-bj'where id = 5;


          7.20 Flinksql 查詢cdc update數(shù)據(jù) 產(chǎn)生兩條binlog數(shù)據(jù)


          說明:flinksql 查詢最終只有一條+I有效數(shù)據(jù),且數(shù)據(jù)已更新

          Flink web UI DAG接受到兩條binlog數(shù)據(jù),但最終compact和sink只有一條有效數(shù)據(jù)


          7.21 MySQL 數(shù)據(jù)源 delete 一條數(shù)據(jù):

          deletefrom users_cdc where id = 3;


          Flink Web UI job DAG中捕獲一條新數(shù)據(jù):


          Flinksql changlog delete數(shù)據(jù)變化查詢

          HDFS上Hudi數(shù)據(jù)文件生成情況



          Hudi文件類型說明:

          (1)commits: 表示將一批數(shù)據(jù)原子性寫入表中

          (2)cleans: 清除表中不在需要的舊版本文件的后臺活動

          (3)delta_commit:增量提交是指將一批數(shù)據(jù)原子性寫入MergeOnRead類型的表中,其中部分或者所有數(shù)據(jù)可以寫入增量日志中

          (4)compaction: 協(xié)調(diào)hudi中差異數(shù)據(jù)結(jié)構(gòu)的后臺活動,例如:將更新從基于行的日志文件變成列格式。在內(nèi)部,壓縮的表現(xiàn)為時間軸上的特殊提交

          (5)rollback:表示提交操作不成功且已經(jīng)回滾,會刪除在寫入過程中產(chǎn)生的數(shù)據(jù)


          說明:hudi分區(qū)文件以及.log和.parquet文件都已生成

          兩種文件區(qū)別:Hudi會在DFS分布式文件系統(tǒng)上的basepath基本路徑下組織成目錄結(jié)構(gòu)。每張對應(yīng)的表都會成多個分區(qū),這些分區(qū)是包含該分區(qū)的數(shù)據(jù)文件的文件夾,與hive的目錄結(jié)構(gòu)非常相似。在每個分區(qū)內(nèi),文件被組織成文件組,文件id為唯一標(biāo)識。每個文件組包含多個切片,其中每個切片包含在某個提交/壓縮即時時間生成的基本列文件(parquet文件),以及自生成基本文件以來對基本文件的插入/更新的一組日志文件(*.log)。Hudi采用MVCC設(shè)計,其中壓縮操作會將日志和基本文件合并成新的文件片,清理操作會將未使用/較舊的文件片刪除來回收DFS上的空間。

          Flink 任務(wù)checkpoint 情況:

          設(shè)置30s 一次



          7.22 Hive shell查詢數(shù)據(jù)update和delete變化情況:

          hive> select * from mysqlcdc_sync_hive01_ro;


          hive> select * from mysqlcdc_sync_hive01_rt;


          7.23 Hudi Client端操作Hudi表

          進入Hudi客戶端命令行:

          hudi-master/hudi-cli/hudi-cli.sh

          連接Hudi表,查看表信息

          hudi->connect --path hdfs://nameservice1/tmp/luo/hudi/mysqlcdc_sync_hive01


          查看Hudi commit信息

          hudi:mysqlcdc_sync_hive01->commits show --sortBy "CommitTime"

          查看Hudi compactions 計劃

          hudi:mysqlcdc_sync_hive01->compactions show all

          7.24 PrestoDB 查詢Hive表Hudi數(shù)據(jù)

          版本說明:PrestoDB 0.256 DBeaver7.0.4

          PrestoDB 集群配置和hive集成參考PrestoDB官網(wǎng)

          presto-server-***/etc/catalog/hive.properties 配置hive catalog

          可通過 presto-cli 連接 hive metastore 開啟查詢,presto-cli 的設(shè)置參考 presto官方配置;

          DBeaver客戶端查詢Hive ro表數(shù)據(jù):


          Hive ro表count 正常:


          查詢Hive rt表數(shù)據(jù)查詢異常:


          Hive rt表count異常:


          Presto Web ui:



          推薦閱讀


          超詳細(xì)步驟!整合Apache Hudi + Flink + CDH

          Apache Kyuubi + Hudi在 T3 出行的深度實踐

          Apache Hudi PMC暢談Hudi未來演進之路

          順豐科技 Hudi on Flink 實時數(shù)倉實踐

          一文徹底弄懂Apache Hudi不同表類型



          瀏覽 127
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  激情成人综合网 | 五月丁香在线观看 | 黑人大屌轮奸视频播放免费成人 | wWW香焦yeyeLu | 女人精品视频 |