<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Flink+hudi 構(gòu)架滄湖一體化解決方案

          共 10777字,需瀏覽 22分鐘

           ·

          2021-12-14 12:00

          簡介

          Apache Hudi(發(fā)音為"Hoodie")在DFS的數(shù)據(jù)集上提供以下流原語

          • 插入更新 (如何改變數(shù)據(jù)集?)
          • 增量拉取 (如何獲取變更的數(shù)據(jù)?)

          Hudi維護在數(shù)據(jù)集上執(zhí)行的所有操作的時間軸(timeline),以提供數(shù)據(jù)集的即時視圖。Hudi將數(shù)據(jù)集組織到與Hive表非常相似的基本路徑下的目錄結(jié)構(gòu)中。數(shù)據(jù)集分為多個分區(qū),文件夾包含該分區(qū)的文件。每個分區(qū)均由相對于基本路徑的分區(qū)路徑唯一標(biāo)識。

          分區(qū)記錄會被分配到多個文件。每個文件都有一個唯一的文件ID和生成該文件的提交(commit)。如果有更新,則多個文件共享相同的文件ID,但寫入時的提交(commit)不同。

          存儲類型–處理數(shù)據(jù)的存儲方式

          • 寫時復(fù)制

          • 純列式

          • 創(chuàng)建新版本的文件

          • 讀時合并

          • 近實時

          視圖–處理數(shù)據(jù)的讀取方式

          讀取優(yōu)化視圖-輸入格式僅選擇壓縮的列式文件

          • parquet文件查詢性能

          • 500 GB的延遲時間約為30分鐘

          • 導(dǎo)入現(xiàn)有的Hive表

          近實時視圖

          • 混合、格式化數(shù)據(jù)

          • 約1-5分鐘的延遲

          • 提供近實時表

          增量視圖

          • 數(shù)據(jù)集的變更

          • 啟用增量拉取

          Hudi存儲層由三個不同的部分組成

          元數(shù)據(jù)–它以時間軸的形式維護了在數(shù)據(jù)集上執(zhí)行的所有操作的元數(shù)據(jù),該時間軸允許將數(shù)據(jù)集的即時視圖存儲在基本路徑的元數(shù)據(jù)目錄下。時間軸上的操作類型包括

          • 提交(commit),一次提交表示將一批記錄原子寫入數(shù)據(jù)集中的過程。單調(diào)遞增的時間戳,提交表示寫操作的開始。

          • 清理(clean),清理數(shù)據(jù)集中不再被查詢中使用的文件的較舊版本。

          • 壓縮(compaction),將行式文件轉(zhuǎn)化為列式文件的動作。

          • 索引,將傳入的記錄鍵快速映射到文件(如果已存在記錄鍵)。索引實現(xiàn)是可插拔的,Bloom過濾器-由于不依賴任何外部系統(tǒng),因此它是默認配置,索引和數(shù)據(jù)始終保持一致。Apache HBase-對少量key更高效。在索引標(biāo)記過程中可能會節(jié)省幾秒鐘。

          • 數(shù)據(jù),Hudi以兩種不同的存儲格式存儲數(shù)據(jù)。實際使用的格式是可插入的,但要求具有以下特征–讀優(yōu)化的列存儲格式(ROFormat),默認值為Apache Parquet;寫優(yōu)化的基于行的存儲格式(WOFormat),默認值為Apache Avro。

          為什么Hudi對于大規(guī)模和近實時應(yīng)用很重要?

          Hudi解決了以下限制

          • HDFS的可伸縮性限制

          • 需要在Hadoop中更快地呈現(xiàn)數(shù)據(jù)

          • 沒有直接支持對現(xiàn)有數(shù)據(jù)的更新和刪除

          • 快速的ETL和建模

          • 要檢索所有更新的記錄,無論這些更新是添加到最近日期分區(qū)的新記錄還是對舊數(shù)據(jù)的更新,Hudi都允許用戶使用最后一個檢查點時間戳。此過程不用執(zhí)行掃描整個源表的查詢

          Hudi的優(yōu)勢

          • HDFS中的可伸縮性限制。
          • Hadoop中數(shù)據(jù)的快速呈現(xiàn)
          • 支持對于現(xiàn)有數(shù)據(jù)的更新和刪除
          • 快速的ETL和建模

          新架構(gòu)與湖倉一體

          通過湖倉一體、流批一體,準(zhǔn)實時場景下做到了:數(shù)據(jù)同源、同計算引擎、同存儲、同計算口徑。數(shù)據(jù)的時效性可以到分鐘級,能很好的滿足業(yè)務(wù)準(zhǔn)實時數(shù)倉的需求。下面是架構(gòu)圖:

          MySQL 數(shù)據(jù)通過 Flink CDC 進入到 Kafka。之所以數(shù)據(jù)先入 Kafka 而不是直接入 Hudi,是為了實現(xiàn)多個實時任務(wù)復(fù)用 MySQL 過來的數(shù)據(jù),避免多個任務(wù)通過 Flink CDC 接 MySQL 表以及 Binlog,對 MySQL 庫的性能造成影響。

          通過 CDC 進入到 Kafka 的數(shù)據(jù)除了落一份到離線數(shù)據(jù)倉庫的 ODS 層之外,會同時按照實時數(shù)據(jù)倉庫的鏈路,從 ODS->DWD->DWS->OLAP 數(shù)據(jù)庫,最后供報表等數(shù)據(jù)服務(wù)使用。實時數(shù)倉的每一層結(jié)果數(shù)據(jù)會準(zhǔn)實時的落一份到離線數(shù)倉,通過這種方式做到程序一次開發(fā)、指標(biāo)口徑統(tǒng)一,數(shù)據(jù)統(tǒng)一。

          從架構(gòu)圖上,可以看到有一步數(shù)據(jù)修正 (重跑歷史數(shù)據(jù)) 的動作,之所以有這一步是考慮到:有可能存在由于口徑調(diào)整或者前一天的實時任務(wù)計算結(jié)果錯誤,導(dǎo)致重跑歷史數(shù)據(jù)的情況。

          而存儲在 Kafka 的數(shù)據(jù)有失效時間,不會存太久的歷史數(shù)據(jù),重跑很久的歷史數(shù)據(jù)無法從 Kafka 中獲取歷史源數(shù)據(jù)。再者,如果把大量的歷史數(shù)據(jù)再一次推到 Kafka,走實時計算的鏈路來修正歷史數(shù)據(jù),可能會影響當(dāng)天的實時作業(yè)。所以針對重跑歷史數(shù)據(jù),會通過數(shù)據(jù)修正這一步來處理。

          總體上說,這個架構(gòu)屬于 Lambda 和 Kappa 混搭的架構(gòu)。流批一體數(shù)據(jù)倉庫的各個數(shù)據(jù)鏈路有數(shù)據(jù)質(zhì)量校驗的流程。第二天對前一天的數(shù)據(jù)進行對賬,如果前一天實時計算的數(shù)據(jù)無異常,則不需要修正數(shù)據(jù),Kappa 架構(gòu)已經(jīng)足夠。

          最佳實踐

          版本搭配

          版本選擇,這個問題可能會成為困擾大家的第一個絆腳石,下面是hudi中文社區(qū)推薦的版本適配:

          flinkhudi
          1.12.20.9.0
          1.13.10.10.0

          建議用 hudi master +flink 1.13 這樣可以和 cdc connector 更好地適配。

          下載hudi

          https://mvnrepository.com/artifact/org.apache.hudi/hudi-flink-bundle

          目前maven中央倉庫,最新版本是0.9.0 ,如果需要下載0.10.0版本 , 可以加入社區(qū)群,在共享文件中下載,也可以下載源碼自行編譯。

          執(zhí)行

          如果將?hudi-flink-bundle_2.11-0.10.0.jar?放到了?flink/lib?下,則只需要如下執(zhí)行即可,否則會出現(xiàn)各種找不到類的異常

          bin/sql-client.sh embedded

          Flink on hudi

          新建maven工程,修改pom如下


          <project?xmlns="http://maven.apache.org/POM/4.0.0"
          ?????????xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          ?????????xsi:schemaLocation="http://maven.apache.org/POM/4.0.0?http://maven.apache.org/xsd/maven-4.0.0.xsd">

          ????<modelVersion>4.0.0modelVersion>

          ????<groupId>org.examplegroupId>
          ????<artifactId>flink_hudi_testartifactId>
          ????<version>1.0-SNAPSHOTversion>

          ????<properties>
          ????????<maven.compiler.source>8maven.compiler.source>
          ????????<maven.compiler.target>8maven.compiler.target>
          ????????<flink.version>1.13.1flink.version>
          ????????<hudi.version>0.10.0hudi.version>
          ????????<hadoop.version>2.10.1hadoop.version>
          ????properties>

          ????<dependencies>
          ????????<dependency>
          ????????????<groupId>org.apache.hadoopgroupId>
          ????????????<artifactId>hadoop-clientartifactId>
          ????????????<version>${hadoop.version}version>
          ????????dependency>
          ????????<dependency>
          ????????????<groupId>org.apache.hadoopgroupId>
          ????????????<artifactId>hadoop-hdfsartifactId>
          ????????????<version>${hadoop.version}version>
          ????????dependency>
          ????????<dependency>
          ????????????<groupId>org.apache.hadoopgroupId>
          ????????????<artifactId>hadoop-commonartifactId>
          ????????????<version>${hadoop.version}version>
          ????????dependency>


          ????????<dependency>
          ????????????<groupId>org.apache.flinkgroupId>
          ????????????<artifactId>flink-coreartifactId>
          ????????????<version>${flink.version}version>
          ????????dependency>
          ????????<dependency>
          ????????????<groupId>org.apache.flinkgroupId>
          ????????????<artifactId>flink-streaming-java_2.11artifactId>
          ????????????<version>${flink.version}version>
          ????????dependency>

          ????????<dependency>
          ????????????<groupId>org.apache.flinkgroupId>
          ????????????<artifactId>flink-connector-jdbc_2.11artifactId>
          ????????????<version>${flink.version}version>
          ????????dependency>

          ????????<dependency>
          ????????????<groupId>org.apache.flinkgroupId>
          ????????????<artifactId>flink-javaartifactId>
          ????????????<version>${flink.version}version>
          ????????dependency>
          ????????<dependency>
          ????????????<groupId>org.apache.flinkgroupId>
          ????????????<artifactId>flink-clients_2.11artifactId>
          ????????????<version>${flink.version}version>
          ????????dependency>
          ????????<dependency>
          ????????????<groupId>org.apache.flinkgroupId>
          ????????????<artifactId>flink-table-api-java-bridge_2.11artifactId>
          ????????????<version>${flink.version}version>
          ????????dependency>
          ????????<dependency>
          ????????????<groupId>org.apache.flinkgroupId>
          ????????????<artifactId>flink-table-commonartifactId>
          ????????????<version>${flink.version}version>
          ????????dependency>

          ????????<dependency>
          ????????????<groupId>org.apache.flinkgroupId>
          ????????????<artifactId>flink-table-planner_2.11artifactId>
          ????????????<version>${flink.version}version>
          ????????dependency>

          ????????<dependency>
          ????????????<groupId>org.apache.flinkgroupId>
          ????????????<artifactId>flink-table-planner-blink_2.11artifactId>
          ????????????<version>${flink.version}version>
          ????????dependency>
          ????????<dependency>
          ????????????<groupId>org.apache.flinkgroupId>
          ????????????<artifactId>flink-table-planner-blink_2.11artifactId>
          ????????????<version>${flink.version}version>
          ????????????<type>test-jartype>
          ????????dependency>

          ????????<dependency>
          ????????????<groupId>com.ververicagroupId>
          ????????????<artifactId>flink-connector-mysql-cdcartifactId>
          ????????????<version>2.0.0version>
          ????????dependency>

          ????????<dependency>
          ????????????<groupId>org.apache.hudigroupId>
          ????????????<artifactId>hudi-flink-bundle_2.11artifactId>
          ????????????<version>${hudi.version}version>
          ????????????<scope>systemscope>
          ????????????<systemPath>${project.basedir}/libs/hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jarsystemPath>
          ????????dependency>

          ????????<dependency>
          ????????????<groupId>mysqlgroupId>
          ????????????<artifactId>mysql-connector-javaartifactId>
          ????????????<version>5.1.49version>
          ????????dependency>


          ????dependencies>
          project>

          我們通過構(gòu)建查詢insert into t2 select replace(uuid(),'-',''),id,name,description,now() from mysql_binlog?將創(chuàng)建的mysql表,插入到hudi里。

          package?name.lijiaqi;

          import?org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import?org.apache.flink.table.api.EnvironmentSettings;
          import?org.apache.flink.table.api.SqlDialect;
          import?org.apache.flink.table.api.TableResult;
          import?org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

          public?class?MysqlToHudiExample?{
          ????public?static?void?main(String[]?args)?throws?Exception?{
          ????????EnvironmentSettings?fsSettings?=?EnvironmentSettings.newInstance()
          ????????????????.useBlinkPlanner()
          ????????????????.inStreamingMode()
          ????????????????.build();
          ????????StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();
          ????????env.setParallelism(1);
          ????????StreamTableEnvironment?tableEnv?=?StreamTableEnvironment.create(env,?fsSettings);

          ????????tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

          ????????//?數(shù)據(jù)源表
          ????????String?sourceDDL?=
          ????????????????"CREATE?TABLE?mysql_binlog?(\n"?+
          ????????????????????????"?id?INT?NOT?NULL,\n"?+
          ????????????????????????"?name?STRING,\n"?+
          ????????????????????????"?description?STRING\n"?+
          ????????????????????????")?WITH?(\n"?+
          ????????????????????????"?'connector'?=?'jdbc',\n"?+
          ????????????????????????"?'url'?=?'jdbc:mysql://127.0.0.1:3306/test',?\n"+
          ????????????????????????"?'driver'?=?'com.mysql.jdbc.Driver',?\n"+
          ????????????????????????"?'username'?=?'root',\n"?+
          ????????????????????????"?'password'?=?'dafei1288',?\n"?+
          ????????????????????????"?'table-name'?=?'test_cdc'\n"?+
          ????????????????????????")";

          ????????//?輸出目標(biāo)表
          ????????String?sinkDDL?=
          ????????????????"CREATE?TABLE?t2(\n"?+
          ????????????????????????"\tuuid?VARCHAR(20),\n"+
          ????????????????????????"\tid?INT?NOT?NULL,\n"?+
          ????????????????????????"\tname?VARCHAR(40),\n"?+
          ????????????????????????"\tdescription?VARCHAR(40),\n"?+
          ????????????????????????"\tts?TIMESTAMP(3)\n"+
          //????????????????????????"\t`partition`?VARCHAR(20)\n"?+
          ????????????????????????")\n"?+
          //????????????????????????"PARTITIONED?BY?(`partition`)\n"?+
          ????????????????????????"WITH?(\n"?+
          ????????????????????????"\t'connector'?=?'hudi',\n"?+
          ????????????????????????"\t'path'?=?'hdfs://172.19.28.4:9000/hudi_t4/',\n"?+
          ????????????????????????"\t'table.type'?=?'MERGE_ON_READ'\n"?+
          ????????????????????????")"?;
          ????????//?簡單的聚合處理
          ????????String?transformSQL?=
          ????????????????"insert?into?t2?select?replace(uuid(),'-',''),id,name,description,now()??from?mysql_binlog";

          ????????tableEnv.executeSql(sourceDDL);
          ????????tableEnv.executeSql(sinkDDL);
          ????????TableResult?result?=?tableEnv.executeSql(transformSQL);
          ????????result.print();

          ????????env.execute("mysql-to-hudi");
          ????}
          }

          查詢hudi

          package?name.lijiaqi;

          import?org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import?org.apache.flink.table.api.EnvironmentSettings;
          import?org.apache.flink.table.api.SqlDialect;
          import?org.apache.flink.table.api.TableResult;
          import?org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

          public?class?ReadHudi?{
          ????public?static?void?main(String[]?args)?throws?Exception?{
          ????????EnvironmentSettings?fsSettings?=?EnvironmentSettings.newInstance()
          ????????????????.useBlinkPlanner()
          ????????????????.inStreamingMode()
          ????????????????.build();
          ????????StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();
          ????????env.setParallelism(1);
          ????????StreamTableEnvironment?tableEnv?=?StreamTableEnvironment.create(env,?fsSettings);

          ????????tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

          ????????String?sourceDDL?=
          ????????????????"CREATE?TABLE?t2(\n"?+
          ????????????????????????"\tuuid?VARCHAR(20),\n"+
          ????????????????????????"\tid?INT?NOT?NULL,\n"?+
          ????????????????????????"\tname?VARCHAR(40),\n"?+
          ????????????????????????"\tdescription?VARCHAR(40),\n"?+
          ????????????????????????"\tts?TIMESTAMP(3)\n"+
          //????????????????????????"\t`partition`?VARCHAR(20)\n"?+
          ????????????????????????")\n"?+
          //????????????????????????"PARTITIONED?BY?(`partition`)\n"?+
          ????????????????????????"WITH?(\n"?+
          ????????????????????????"\t'connector'?=?'hudi',\n"?+
          ????????????????????????"\t'path'?=?'hdfs://172.19.28.4:9000/hudi_t4/',\n"?+
          ????????????????????????"\t'table.type'?=?'MERGE_ON_READ'\n"?+
          ????????????????????????")"?;
          ????????tableEnv.executeSql(sourceDDL);
          ????????TableResult?result2?=?tableEnv.executeSql("select?*?from?t2");
          ????????result2.print();

          ????????env.execute("read_hudi");
          ????}
          }

          展示結(jié)果

          Flink CDC 2.0 on Hudi

          上一章節(jié),我們使用代碼形式構(gòu)建實驗,在本章節(jié)里,我們直接使用官網(wǎng)下載的flink包來構(gòu)建實驗環(huán)境。

          添加依賴

          添加如下依賴到$FLINK_HOME/lib下

          • hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar (修改 Master 分支的 Hudi Flink 版本為 1.13.2 然后構(gòu)建)
          • hadoop-mapreduce-client-core-2.7.3.jar (解決 Hudi ClassNotFoundException)
          • flink-sql-connector-mysql-cdc-2.0.0.jar
          • flink-format-changelog-json-2.0.0.jar
          • flink-sql-connector-kafka_2.11-1.13.2.jar

          注意,在尋找jar的時候,cdc 2.0?更新過group id?,不再試?com.alibaba.ververica?而是改成了?com.ververica?


          flink sql cdc on hudi

          創(chuàng)建mysql cdc表

          CREATE??TABLE?mysql_users?(
          ?id?BIGINT?PRIMARY?KEY?NOT?ENFORCED?,
          ?name?STRING,
          ?birthday?TIMESTAMP(3),
          ?ts?TIMESTAMP(3)
          )?WITH?(
          ?'connector'?=?'mysql-cdc',
          ?'hostname'?=?'localhost',
          ?'port'?=?'3306',
          ?'username'?=?'root',
          ?'password'?=?'dafei1288',
          ?'server-time-zone'?=?'Asia/Shanghai',
          ?'database-name'?=?'test',
          ?'table-name'?=?'users'???
          );

          創(chuàng)建hudi表

          CREATE?TABLE?hudi_users5(
          ?id?BIGINT?PRIMARY?KEY?NOT?ENFORCED,
          ????name?STRING,
          ????birthday?TIMESTAMP(3),
          ????ts?TIMESTAMP(3),
          ????`partition`?VARCHAR(20)
          )?PARTITIONED?BY?(`partition`)?WITH?(
          ????'connector'?=?'hudi',
          ????'table.type'?=?'MERGE_ON_READ',
          ????'path'?=?'hdfs://localhost:9009/hudi/hudi_users5'
          );

          修改配置,讓查詢模式輸出為表,設(shè)置checkpoint

          set execution.result-mode=tableau;

          set execution.checkpointing.interval=10sec;

          進行輸入導(dǎo)入

          INSERT INTO hudi_users5(id,name,birthday,ts,?partition) SELECT id,name,birthday,ts,DATE_FORMAT(birthday, 'yyyyMMdd') FROM mysql_users;

          查詢數(shù)據(jù)

          select * from hudi_users5;

          執(zhí)行結(jié)果

          卡執(zhí)行計劃

          這個問題坑了我好幾天,一度都打算放棄hudi了,表面上很正常,日志也沒有任何報錯,也可以看出來cdc起作用了,有數(shù)據(jù)寫入,但是就是卡在?hoodie_stream_write?上一動不動了,沒有數(shù)據(jù)下發(fā)了。感謝社區(qū)大佬?Danny Chan?的提點,可能是 checkpoint的問題,于是做了設(shè)置

          set execution.checkpointing.interval=10sec;

          終于正常了

          致此,F(xiàn)link + Hudi 倉湖一體化方案的原型構(gòu)建完成,感謝大家看到這里。

          Hi,我是王知無,一個大數(shù)據(jù)領(lǐng)域的原創(chuàng)作者。?

          放心關(guān)注我,獲取更多行業(yè)的一手消息。



          八千里路云和月 | 從零到大數(shù)據(jù)專家學(xué)習(xí)路徑指南

          互聯(lián)網(wǎng)最壞的時代可能真的來了

          我在B站讀大學(xué),大數(shù)據(jù)專業(yè)

          我們在學(xué)習(xí)Flink的時候,到底在學(xué)習(xí)什么?

          193篇文章暴揍Flink,這個合集你需要關(guān)注一下

          Flink生產(chǎn)環(huán)境TOP難題與優(yōu)化,阿里巴巴藏經(jīng)閣YYDS

          Flink CDC我吃定了耶穌也留不住他!| Flink CDC線上問題小盤點

          我們在學(xué)習(xí)Spark的時候,到底在學(xué)習(xí)什么?

          在所有Spark模塊中,我愿稱SparkSQL為最強!

          硬剛Hive | 4萬字基礎(chǔ)調(diào)優(yōu)面試小總結(jié)

          數(shù)據(jù)治理方法論和實踐小百科全書

          標(biāo)簽體系下的用戶畫像建設(shè)小指南

          4萬字長文 | ClickHouse基礎(chǔ)&實踐&調(diào)優(yōu)全視角解析

          【面試&個人成長】2021年過半,社招和校招的經(jīng)驗之談

          大數(shù)據(jù)方向另一個十年開啟 |《硬剛系列》第一版完結(jié)

          我寫過的關(guān)于成長/面試/職場進階的文章

          當(dāng)我們在學(xué)習(xí)Hive的時候在學(xué)習(xí)什么?「硬剛Hive續(xù)集」

          瀏覽 57
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  久久国产成人免费视频 | 成人污污污www免费网站 | 日韩精品人妻一区二区 | 青娱乐少妇在线免费视频 | 亚洲 国产 另类 无码 日韩 |