<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Apache Hudi:說出來你可能不信,你的ETL任務(wù)慢如狗

          共 9571字,需瀏覽 20分鐘

           ·

          2020-08-24 18:36

          點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)

          回復(fù)”資源“獲取更多資源

          大數(shù)據(jù)技術(shù)與架構(gòu)
          點(diǎn)擊右側(cè)關(guān)注,大數(shù)據(jù)開發(fā)領(lǐng)域最強(qiáng)公眾號(hào)!

          暴走大數(shù)據(jù)
          點(diǎn)擊右側(cè)關(guān)注,暴走大數(shù)據(jù)!




          1.簡介

          Apache Hudi(簡稱:Hudi)使得您能在hadoop兼容的存儲(chǔ)之上存儲(chǔ)大量數(shù)據(jù),同時(shí)它還提供兩種原語,使得除了經(jīng)典的批處理之外,還可以在數(shù)據(jù)湖上進(jìn)行流處理。這兩種原語分別是:
          • Update/Delete記錄:Hudi使用細(xì)粒度的文件/記錄級(jí)別索引來支持Update/Delete記錄,同時(shí)還提供寫操作的事務(wù)保證。查詢會(huì)處理最后一個(gè)提交的快照,并基于此輸出結(jié)果。

          • 變更流:Hudi對獲取數(shù)據(jù)變更提供了一流的支持:可以從給定的時(shí)間點(diǎn)獲取給定表中已updated/inserted/deleted的所有記錄的增量流,并解鎖新的查詢姿勢(類別)。


          如果你希望將數(shù)據(jù)快速提取到HDFS或云存儲(chǔ)中,Hudi可以提供幫助。另外,如果你的ETL /hive/spark作業(yè)很慢或占用大量資源,那么Hudi可以通過提供一種增量式讀取和寫入數(shù)據(jù)的方法來提供幫助。

          2. 基本概念

          存儲(chǔ)類型
          我們看一下 Hudi 的兩種存儲(chǔ)類型:
          • 寫時(shí)復(fù)制(copy on write):僅使用列式文件(parquet)存儲(chǔ)數(shù)據(jù)。在寫入/更新數(shù)據(jù)時(shí),直接同步合并原文件,生成新版本的基文件(需要重寫整個(gè)列數(shù)據(jù)文件,即使只有一個(gè)字節(jié)的新數(shù)據(jù)被提交)。此存儲(chǔ)類型下,寫入數(shù)據(jù)非常昂貴,而讀取的成本沒有增加,所以適合頻繁讀的工作負(fù)載,因?yàn)閿?shù)據(jù)集的最新版本在列式文件中始終可用,以進(jìn)行高效的查詢。

          • 讀時(shí)合并(merge on read):使用列式(parquet)與行式(avro)文件組合,進(jìn)行數(shù)據(jù)存儲(chǔ)。在更新記錄時(shí),更新到增量文件中(avro),然后進(jìn)行異步(或同步)的compaction,創(chuàng)建列式文件(parquet)的新版本。此存儲(chǔ)類型適合頻繁寫的工作負(fù)載,因?yàn)樾掠涗浭且詀ppending 的模式寫入增量文件中。但是在讀取數(shù)據(jù)集時(shí),需要將增量文件與舊文件進(jìn)行合并,生成列式文件。

          ?
          視圖
          在了解這兩種存儲(chǔ)類型后,我們再看一下Hudi支持的存儲(chǔ)數(shù)據(jù)的視圖(也就是查詢模式):
          1. 讀優(yōu)化視圖(Read Optimized view):直接query 基文件(數(shù)據(jù)集的最新快照),也就是列式文件(如parquet)。相較于非Hudi列式數(shù)據(jù)集,有相同的列式查詢性能

          2. 增量視圖(Incremental View):僅query新寫入數(shù)據(jù)集的文件,也就是指定一個(gè)commit/compaction,query此之后的新數(shù)據(jù)。

          3. 實(shí)時(shí)視圖(Real-time View):query最新基文件與增量文件。此視圖通過將最新的基文件(parquet)與增量文件(avro)進(jìn)行動(dòng)態(tài)合并,然后進(jìn)行query??梢蕴峁┙鼘?shí)時(shí)的數(shù)據(jù)(會(huì)有幾分鐘的延遲)

          在以上3種視圖中,“讀優(yōu)化視圖”與“增量視圖”均可在“寫時(shí)復(fù)制”與“讀時(shí)合并”的存儲(chǔ)類型下使用。而“實(shí)時(shí)視圖“僅能在”讀時(shí)合并“模式下使用。

          時(shí)間軸
          最后介紹一下 Hudi 的核心 —— 時(shí)間軸。Hudi 會(huì)維護(hù)一個(gè)時(shí)間軸,在每次執(zhí)行操作時(shí)(如寫入、刪除、合并等),均會(huì)帶有一個(gè)時(shí)間戳。通過時(shí)間軸,可以實(shí)現(xiàn)在僅查詢某個(gè)時(shí)間點(diǎn)之后成功提交的數(shù)據(jù),或是僅查詢某個(gè)時(shí)間點(diǎn)之前的數(shù)據(jù)。這樣可以避免掃描更大的時(shí)間范圍,并非常高效地只消費(fèi)更改過的文件(例如在某個(gè)時(shí)間點(diǎn)提交了更改操作后,僅query某個(gè)時(shí)間點(diǎn)之前的數(shù)據(jù),則仍可以query修改前的數(shù)據(jù))。
          3.典型應(yīng)用場景

          1.近實(shí)時(shí)攝取

          將數(shù)據(jù)從外部源如事件日志、數(shù)據(jù)庫提取到Hadoop數(shù)據(jù)湖中是一個(gè)很常見的問題。在大多數(shù)Hadoop部署中,一般使用混合提取工具并以零散的方式解決該問題,盡管這些數(shù)據(jù)對組織是非常有價(jià)值的。
          對于RDBMS攝取,Hudi通過Upserts提供了更快的負(fù)載,而非昂貴且低效的批量負(fù)載。例如你可以讀取MySQL binlog日志或Sqoop增量導(dǎo)入,并將它們應(yīng)用在DFS上的Hudi表,這比批量合并作業(yè)或復(fù)雜的手工合并工作流更快/更高效。
          對于像Cassandra / Voldemort / HBase這樣的NoSQL數(shù)據(jù)庫,即使規(guī)模集群不大也可以存儲(chǔ)數(shù)十億行數(shù)據(jù),此時(shí)進(jìn)行批量加載則完全不可行,需要采用更有效的方法使得攝取速度與較頻繁的更新數(shù)據(jù)量相匹配。
          即使對于像Kafka這樣的不可變數(shù)據(jù)源,Hudi也會(huì)強(qiáng)制在DFS上保持最小文件大小,從而解決Hadoop領(lǐng)域中的古老問題以便改善NameNode的運(yùn)行狀況。這對于事件流尤為重要,因?yàn)槭录鳎ɡ鐔螕袅鳎┩ǔ]^大,如果管理不善,可能會(huì)嚴(yán)重?fù)p害Hadoop集群性能。
          對于所有數(shù)據(jù)源,Hudi都提供了通過提交將新數(shù)據(jù)原子化地發(fā)布給消費(fèi)者,從而避免部分提取失敗。

          2. 近實(shí)時(shí)分析

          通常實(shí)時(shí)數(shù)據(jù)集市由專門的分析存儲(chǔ),如Druid、Memsql甚至OpenTSDB提供支持。這對于需要亞秒級(jí)查詢響應(yīng)(例如系統(tǒng)監(jiān)視或交互式實(shí)時(shí)分析)的較小規(guī)模(相對于安裝Hadoop)數(shù)據(jù)而言是非常完美的選擇。但由于Hadoop上的數(shù)據(jù)令人難以忍受,因此這些系統(tǒng)通常最終會(huì)被較少的交互查詢所濫用,從而導(dǎo)致利用率不足和硬件/許可證成本的浪費(fèi)。
          另一方面,Hadoop上的交互式SQL解決方案(如Presto和SparkSQL),能在幾秒鐘內(nèi)完成的查詢。通過將數(shù)據(jù)的更新時(shí)間縮短至幾分鐘,Hudi提供了一種高效的替代方案,并且還可以對存儲(chǔ)在DFS上多個(gè)更大的表進(jìn)行實(shí)時(shí)分析。此外,Hudi沒有外部依賴項(xiàng)(例如專用于實(shí)時(shí)分析的專用HBase群集),因此可以在不增加運(yùn)營成本的情況下,對更實(shí)時(shí)的數(shù)據(jù)進(jìn)行更快的分析。

          3. 增量處理管道

          Hadoop提供的一項(xiàng)基本功能是構(gòu)建基于表的派生鏈,并通過DAG表示整個(gè)工作流。工作流通常取決于多個(gè)上游工作流輸出的新數(shù)據(jù),傳統(tǒng)上新生成的DFS文件夾/Hive分區(qū)表示新數(shù)據(jù)可用。例如上游工作流?U可以每小時(shí)創(chuàng)建一個(gè)Hive分區(qū),并在每小時(shí)的末尾(?processing_time)包含該小時(shí)(?event_time)的數(shù)據(jù),從而提供1小時(shí)的數(shù)據(jù)新鮮度。然后下游工作流?D在?U完成后立即開始,并在接下來的一個(gè)小時(shí)進(jìn)行處理,從而將延遲增加到2個(gè)小時(shí)。
          上述示例忽略了延遲到達(dá)的數(shù)據(jù),即?processing_time和?event_time分開的情況。不幸的是在后移動(dòng)和物聯(lián)網(wǎng)前的時(shí)代,數(shù)據(jù)延遲到達(dá)是非常常見的情況。在這種情況下,保證正確性的唯一方法是每小時(shí)重復(fù)處理最后幾個(gè)小時(shí)的數(shù)據(jù),這會(huì)嚴(yán)重?fù)p害整個(gè)生態(tài)系統(tǒng)的效率。想象下在數(shù)百個(gè)工作流中每小時(shí)重新處理TB級(jí)別的數(shù)據(jù)。
          Hudi可以很好的解決上述問題,其通過記錄粒度(而非文件夾或分區(qū))來消費(fèi)上游Hudi表?HU中的新數(shù)據(jù),下游的Hudi表?HD應(yīng)用處理邏輯并更新/協(xié)調(diào)延遲數(shù)據(jù),這里?HU和?HD可以以更頻繁的時(shí)間(例如15分鐘)連續(xù)進(jìn)行調(diào)度,并在?HD上提供30分鐘的端到端延遲。
          為了實(shí)現(xiàn)這一目標(biāo),Hudi從流處理框架如Spark Streaming、發(fā)布/訂閱系統(tǒng)如Kafka或數(shù)據(jù)庫復(fù)制技術(shù)如Oracle XStream中引入了類似概念。若感興趣可以在此處找到有關(guān)增量處理(與流處理和批處理相比)更多優(yōu)勢的更詳細(xì)說明。

          4. DFS上數(shù)據(jù)分發(fā)

          Hadoop的經(jīng)典應(yīng)用是處理數(shù)據(jù),然后將其分發(fā)到在線存儲(chǔ)以供應(yīng)用程序使用。例如使用Spark Pipeline將Hadoop的數(shù)據(jù)導(dǎo)入到ElasticSearch供Uber應(yīng)用程序使用。一種典型的架構(gòu)是在Hadoop和服務(wù)存儲(chǔ)之間使用隊(duì)列進(jìn)行解耦,以防止壓垮目標(biāo)服務(wù)存儲(chǔ),一般會(huì)選擇Kafka作為隊(duì)列,該架構(gòu)會(huì)導(dǎo)致相同數(shù)據(jù)冗余存儲(chǔ)在DFS(用于對計(jì)算結(jié)果進(jìn)行離線分析)和Kafka(用于分發(fā))上。
          Hudi可以通過以下方式再次有效地解決此問題:將Spark Pipeline 插入更新輸出到Hudi表,然后對表進(jìn)行增量讀?。ň拖馣afka主題一樣)以獲取新數(shù)據(jù)并寫入服務(wù)存儲(chǔ)中,即使用Hudi統(tǒng)一存儲(chǔ)。

          4.入門案例


          1、編譯

          github地址:https://github.com/apache/incubator-hudi
          cd incubator-hudi-hoodie-0.4.7mvn?clean?install?-DskipITs?-DskipTests?-Dhadoop.version=2.6.0-cdh5.13.0?-Dhive.version=1.1.0-cdh5.13.0

          2、快速開始

          1、新建項(xiàng)目

          新建maven項(xiàng)目,并加入scala框架,然后依次加入spark、hudi依賴
          <properties>   <scala.version>2.11scala.version>    <spark.version>2.4.0spark.version>    <parquet.version>1.10.1parquet.version>    <parquet-format-structures.version>1.10.1-palantir.3-2-gda7f810parquet-format-structures.version>    <hudi.version>0.4.7hudi.version>properties>
          <repositories> <repository> <id>clouderaid> <url>https://repository.cloudera.com/artifactory/cloudera-repos/url> repository>repositories>
          <dependencies> <dependency> <groupId>org.apache.sparkgroupId> <artifactId>spark-core_${scala.version}artifactId> <version>${spark.version}version> dependency> <dependency> <groupId>org.apache.sparkgroupId> <artifactId>spark-sql_${scala.version}artifactId> <version>${spark.version}version> <exclusions> <exclusion> <artifactId>parquet-columnartifactId> <groupId>org.apache.parquetgroupId> exclusion> exclusions> dependency> <dependency> <groupId>org.apache.sparkgroupId> <artifactId>spark-hive_${scala.version}artifactId> <version>${spark.version}version> dependency> <dependency> <groupId>com.databricksgroupId> <artifactId>spark-avro_${scala.version}artifactId> <version>4.0.0version> dependency> <dependency> <groupId>com.uber.hoodiegroupId> <artifactId>hoodie-commonartifactId> <version>${hudi.version}version> dependency>
          <dependency> <groupId>com.uber.hoodiegroupId> <artifactId>hoodie-hadoop-mrartifactId> <version>${hudi.version}version> dependency> <dependency> <groupId>com.uber.hoodiegroupId> <artifactId>hoodie-sparkartifactId> <version>${hudi.version}version> dependency> <dependency> <groupId>com.uber.hoodiegroupId> <artifactId>hoodie-hiveartifactId> <version>${hudi.version}version> dependency>
          <dependency> <groupId>com.uber.hoodiegroupId> <artifactId>hoodie-clientartifactId> <version>${hudi.version}version> dependency>
          <dependency> <groupId>org.apache.avrogroupId> <artifactId>avroartifactId> <version>1.7.7version> dependency> <dependency> <groupId>org.apache.parquetgroupId> <artifactId>parquet-avroartifactId> <version>${parquet.version}version> <exclusions> <exclusion> <artifactId>parquet-columnartifactId> <groupId>org.apache.parquetgroupId> exclusion> exclusions> dependency> <dependency> <groupId>org.apache.parquetgroupId> <artifactId>parquet-hadoopartifactId> <version>${parquet.version}version> <exclusions> <exclusion> <artifactId>parquet-columnartifactId> <groupId>org.apache.parquetgroupId> exclusion> exclusions> dependency> <dependency> <groupId>org.apache.sparkgroupId> <artifactId>spark-streaming_${scala.version}artifactId> <version>${spark.version}version> dependency> <dependency> <groupId>org.apache.sparkgroupId> <artifactId>spark-streaming-kafka-0-10_${scala.version}artifactId> <version>${spark.version}version> dependency> <dependency> <groupId>org.apache.sparkgroupId> <artifactId>spark-sql-kafka-0-10_${scala.version}artifactId> <version>${spark.version}version> dependency> <dependency> <groupId>com.alibabagroupId> <artifactId>fastjsonartifactId> <version>1.2.62version> dependency> <dependency> <groupId>org.apache.hivegroupId> <artifactId>hive-jdbcartifactId> <version>1.1.0-cdh5.13.0version> dependency>dependencies>

          2、插入數(shù)據(jù)

          準(zhǔn)備數(shù)據(jù),在本地文件目錄下新建如下文件:
          {"id":1,"name": "aaa","age": 10}{"id":2,"name": "bbb","age": 11}{"id":3,"name": "ccc","age": 12}{"id":4,"name": "ddd","age": 13}{"id":5,"name": "eee","age": 14}{"id":6,"name":?"fff","age":?15}
          構(gòu)建sparksession:
          val spark = SparkSession.builder .master("local") .appName("Demo2") .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .enableHiveSupport()??????.getOrCreate
          hudi強(qiáng)制要求使用Kryo的序列化方式,所以初始化的時(shí)候需要添加該配置。
          讀取剛才保存的json文件:
          val?jsonData?=?spark.read.json("file:///Users/apple/Documents/project/study/hudi-study/source_data/insert.json")
          然后通過write方法寫入數(shù)據(jù):
          import com.uber.hoodie.config.HoodieWriteConfig._
          val tableName = "test_data"val basePath = "file:///Users/apple/Documents/project/study/hudi-study/hudi_data/" + tableName
          jsonData.write.format("com.uber.hoodie") .option("hoodie.upsert.shuffle.parallelism", "1") .option(PRECOMBINE_FIELD_OPT_KEY, "id") .option(RECORDKEY_FIELD_OPT_KEY, "id") .option(KEYGENERATOR_CLASS_OPT_KEY, "com.mbp.study.DayKeyGenerator") .option(TABLE_NAME, tableName) .mode(SaveMode.Overwrite)??????.save(basePath)

          3、查詢數(shù)據(jù)

          val jsonDataDf = spark.read.format("com.uber.hoodie").load(basePath + "/*/*")jsonDataDf.show(false)

          4、更新數(shù)據(jù)

          先創(chuàng)建需要更新的json數(shù)據(jù)集,數(shù)據(jù)如下:
          {"id":1,"name": "aaa","age": 20,"address": "a1"}{"id":2,"name": "bbb","age": 21,"address": "a1"}{"id":3,"name":?"ccc","age":?22,"address":?"a1"}
          然后讀取要更新的數(shù)據(jù),并執(zhí)行寫入操作:
          val updateJsonf = spark.read.json("/Users/apple/Documents/project/study/hudi-study/source_data/update.json")updateJsonf.write.format("com.uber.hoodie")  .option("hoodie.insert.shuffle.parallelism", "2")  .option("hoodie.upsert.shuffle.parallelism", "2")  .option(PRECOMBINE_FIELD_OPT_KEY, "id")  .option(RECORDKEY_FIELD_OPT_KEY, "id")  .option(TABLE_NAME, tableName)  .mode(SaveMode.Append)  .save(basePath)
          保存模式需要改為追加,每個(gè)寫操作都會(huì)生成一個(gè)新的由時(shí)間戳表示的commit。

          5、增量查詢

          Hudi還提供了獲取給定提交時(shí)間戳以來已更改的記錄流的功能。這可以通過使用Hudi的增量視圖并提供所需更改的開始時(shí)間來實(shí)現(xiàn)。如果我們需要給定提交之后的所有更改(這是常見的情況),則無需指定結(jié)束時(shí)間。
          val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_ro_table order by commitTime").map(k => k.getString(0)).take(50)val beginTime = commits(commits.length - 2) // commit time we are interested in
          // 增量查詢數(shù)據(jù)val incViewDF = spark. read. format("org.apache.hudi"). option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). load(basePath);incViewDF.registerTempTable("hudi_incr_table")spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show()
          這將提供在開始時(shí)間提交之后發(fā)生的所有更改,其中包含票價(jià)大于20.0的過濾器。關(guān)于此功能的獨(dú)特之處在于,它現(xiàn)在使您可以在批量數(shù)據(jù)上創(chuàng)作流式管道。

          6、特定時(shí)間點(diǎn)查詢

          可以通過將結(jié)束時(shí)間指向特定的提交時(shí)間,將開始時(shí)間指向”000”(表示最早的提交時(shí)間)來表示特定時(shí)間。
          val beginTime = "000" // Represents all commits > this time.val endTime = commits(commits.length - 2) // commit time we are interested in// 增量查詢數(shù)據(jù)val incViewDF = spark.read.format("org.apache.hudi").    option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL).    option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).    option(END_INSTANTTIME_OPT_KEY, endTime).    load(basePath);incViewDF.registerTempTable("hudi_incr_table")spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_incr_table where fare > 20.0").show()

          7、同步到Hive

          要想hive可以查詢到該表,需要以下兩部分操作:
          1、寫數(shù)據(jù)的時(shí)候設(shè)置同步hive
          jsonData.write.format("com.uber.hoodie")      .option("hoodie.upsert.shuffle.parallelism", "1")      .option(HIVE_PARTITION_FIELDS_OPT_KEY, "etl_tx_dt")      .option(HIVE_URL_OPT_KEY, "jdbc:hive2://xxx.xxx.xxx.xxx:10000")      .option(HIVE_USER_OPT_KEY, "hive")      .option(HIVE_PASS_OPT_KEY, "123")      .option(HIVE_DATABASE_OPT_KEY, "test")      .option(HIVE_SYNC_ENABLED_OPT_KEY, true)      .option(HIVE_TABLE_OPT_KEY, tableName)      .option(PRECOMBINE_FIELD_OPT_KEY, "id")      .option(RECORDKEY_FIELD_OPT_KEY, "id")      .option(TABLE_NAME, tableName)      .mode(SaveMode.Append)      .save(basePath)

          2、在hive集群中上傳hudi所需的jar包
          hoodie-hadoop-mr-0.4.7.jarhoodie-common-0.4.7.jar
          然后再hive中直接執(zhí)行查詢即可。

          版權(quán)聲明:

          本文為大數(shù)據(jù)技術(shù)與架構(gòu)整理,原作者獨(dú)家授權(quán)。未經(jīng)原作者允許轉(zhuǎn)載追究侵權(quán)責(zé)任。
          編輯|冷眼丶
          微信公眾號(hào)|import_bigdata


          歡迎點(diǎn)贊+收藏+轉(zhuǎn)發(fā)朋友圈素質(zhì)三連


          文章不錯(cuò)?點(diǎn)個(gè)【在看】吧!??

          瀏覽 66
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  中文字幕日韩成人 | 日本插逼视频 | 亚洲成人在线观看视频 | 一级特黄录像免费看 | 人人摸天天干 |