Apache Hudi: You May Not Believe It, but Your ETL Jobs Are Dog-Slow


1. Introduction
Update/Delete records: Hudi uses fine-grained file/record-level indexes to support updating and deleting records, and provides transactional guarantees for write operations. Queries run against the latest committed snapshot and produce results based on it.
Change streams: Hudi has first-class support for capturing data changes: from a given point in time you can obtain an incremental stream of all records that were updated/inserted/deleted in a given table, which unlocks new kinds of queries.
2. Basic Concepts
Copy on write: stores data in columnar files (Parquet) only. On write/update, the original file is synchronously merged and a new version of the base file is produced (the entire columnar data file is rewritten even if only one byte of new data is committed). With this storage type writes are expensive while reads cost no more than usual, so it suits read-heavy workloads: the latest version of the dataset is always available in columnar files for efficient querying.
Merge on read: stores data in a combination of columnar (Parquet) and row-based (Avro) files. Updates are written to incremental delta files (Avro), and an asynchronous (or synchronous) compaction later produces a new version of the columnar Parquet files. This storage type suits write-heavy workloads, because new records are simply appended to the delta files; reads, however, must merge the delta files with the older base files to produce columnar output.
Read Optimized view: queries only the base files (the latest snapshot of the dataset), i.e. columnar files such as Parquet. It offers the same columnar query performance as a non-Hudi columnar dataset.
Incremental view: queries only the files newly written to the dataset, i.e. given a commit/compaction, only the data written after it.
Real-time view: queries the latest base files together with the delta files. This view merges the latest base files (Parquet) with the delta files (Avro) on the fly at query time, so it can serve near real-time data (with a delay of a few minutes). A minimal sketch of how the storage type and view type are selected follows this list.
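To make this concrete, here is a minimal sketch of selecting the storage type at write time and the view at read time through the Spark datasource. It assumes the com.uber.hoodie 0.4.7 option constants (STORAGE_TYPE_OPT_KEY, MOR_STORAGE_TYPE_OPT_VAL, PARTITIONPATH_FIELD_OPT_KEY, VIEW_TYPE_READ_OPTIMIZED_OPT_VAL), and the input path, output path and table name are placeholders; double-check the constants against the version you build.

import com.uber.hoodie.DataSourceReadOptions._
import com.uber.hoodie.DataSourceWriteOptions._
import com.uber.hoodie.config.HoodieWriteConfig._
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder.master("local").appName("StorageAndViews")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()

val df = spark.read.json("file:///tmp/some_input.json")   // placeholder input with an "id" column
val demoPath = "file:///tmp/hudi_view_demo"

// The storage type is fixed at write time: COPY_ON_WRITE rewrites the base Parquet files on every
// commit, MERGE_ON_READ appends updates to Avro delta files and compacts them later.
// (STORAGE_TYPE_OPT_KEY / MOR_STORAGE_TYPE_OPT_VAL are assumed to exist in 0.4.7.)
df.write.format("com.uber.hoodie")
  .option(STORAGE_TYPE_OPT_KEY, MOR_STORAGE_TYPE_OPT_VAL)  // or COW_STORAGE_TYPE_OPT_VAL
  .option(RECORDKEY_FIELD_OPT_KEY, "id")
  .option(PARTITIONPATH_FIELD_OPT_KEY, "id")               // partition by an existing column
  .option(PRECOMBINE_FIELD_OPT_KEY, "id")
  .option(TABLE_NAME, "view_demo")
  .mode(SaveMode.Overwrite)
  .save(demoPath)

// The view is picked at read time: read_optimized scans only the base files; the incremental
// view (used later in this article) returns only data written after a given commit.
val roView = spark.read.format("com.uber.hoodie")
  .option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_READ_OPTIMIZED_OPT_VAL)
  .load(demoPath + "/*/*")
roView.show(false)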

Typical use cases:
1. Near real-time ingestion
2. Near real-time analytics
3. Incremental processing pipelines
4. Data dispersal on DFS
1. Build Hudi
cd incubator-hudi-hoodie-0.4.7
mvn clean install -DskipITs -DskipTests -Dhadoop.version=2.6.0-cdh5.13.0 -Dhive.version=1.1.0-cdh5.13.0
2. Quick Start
1. Create a new project and add the following to its pom.xml:
<properties>
    <scala.version>2.11</scala.version>
    <spark.version>2.4.0</spark.version>
    <parquet.version>1.10.1</parquet.version>
    <parquet-format-structures.version>1.10.1-palantir.3-2-gda7f810</parquet-format-structures.version>
    <hudi.version>0.4.7</hudi.version>
</properties>

<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.version}</artifactId>
        <version>${spark.version}</version>
        <exclusions>
            <exclusion>
                <artifactId>parquet-column</artifactId>
                <groupId>org.apache.parquet</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-avro_${scala.version}</artifactId>
        <version>4.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.uber.hoodie</groupId>
        <artifactId>hoodie-common</artifactId>
        <version>${hudi.version}</version>
    </dependency>
    <dependency>
        <groupId>com.uber.hoodie</groupId>
        <artifactId>hoodie-hadoop-mr</artifactId>
        <version>${hudi.version}</version>
    </dependency>
    <dependency>
        <groupId>com.uber.hoodie</groupId>
        <artifactId>hoodie-spark</artifactId>
        <version>${hudi.version}</version>
    </dependency>
    <dependency>
        <groupId>com.uber.hoodie</groupId>
        <artifactId>hoodie-hive</artifactId>
        <version>${hudi.version}</version>
    </dependency>
    <dependency>
        <groupId>com.uber.hoodie</groupId>
        <artifactId>hoodie-client</artifactId>
        <version>${hudi.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.7.7</version>
    </dependency>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>${parquet.version}</version>
        <exclusions>
            <exclusion>
                <artifactId>parquet-column</artifactId>
                <groupId>org.apache.parquet</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-hadoop</artifactId>
        <version>${parquet.version}</version>
        <exclusions>
            <exclusion>
                <artifactId>parquet-column</artifactId>
                <groupId>org.apache.parquet</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-jdbc</artifactId>
        <version>1.1.0-cdh5.13.0</version>
    </dependency>
</dependencies>
2. Insert data
Prepare a few sample records and save them as insert.json:
{"id":1,"name": "aaa","age": 10}
{"id":2,"name": "bbb","age": 11}
{"id":3,"name": "ccc","age": 12}
{"id":4,"name": "ddd","age": 13}
{"id":5,"name": "eee","age": 14}
{"id":6,"name": "fff","age": 15}
import org.apache.spark.sql.SparkSession

// Hudi requires Kryo serialization
val spark = SparkSession.builder
  .master("local")
  .appName("Demo2")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .enableHiveSupport()
  .getOrCreate
Read the JSON file we just saved:
val jsonData = spark.read.json("file:///Users/apple/Documents/project/study/hudi-study/source_data/insert.json")
import com.uber.hoodie.DataSourceWriteOptions._   // PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY, KEYGENERATOR_CLASS_OPT_KEY
import com.uber.hoodie.config.HoodieWriteConfig._ // TABLE_NAME
import org.apache.spark.sql.SaveMode

val tableName = "test_data"
val basePath = "file:///Users/apple/Documents/project/study/hudi-study/hudi_data/" + tableName

jsonData.write.format("com.uber.hoodie")
  .option("hoodie.upsert.shuffle.parallelism", "1")
  .option(PRECOMBINE_FIELD_OPT_KEY, "id")
  .option(RECORDKEY_FIELD_OPT_KEY, "id")
  .option(KEYGENERATOR_CLASS_OPT_KEY, "com.mbp.study.DayKeyGenerator") // custom key generator, sketched below
  .option(TABLE_NAME, tableName)
  .mode(SaveMode.Overwrite)
  .save(basePath)
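The write above points KEYGENERATOR_CLASS_OPT_KEY at com.mbp.study.DayKeyGenerator, a custom class the article never shows. Purely as a hypothetical sketch of what such a key generator might look like (it assumes the com.uber.hoodie KeyGenerator base class, the HoodieKey model and the TypedProperties config type from the 0.4.x line; verify the exact package paths in your build), it could use the id field as the record key and the current day as the partition path:

package com.mbp.study

import java.time.LocalDate
import java.time.format.DateTimeFormatter

import com.uber.hoodie.KeyGenerator                  // assumed base class from hoodie-spark 0.4.x
import com.uber.hoodie.common.model.HoodieKey
import com.uber.hoodie.common.util.TypedProperties   // assumed location of TypedProperties in 0.4.x
import org.apache.avro.generic.GenericRecord

// Hypothetical sketch: record key = "id" field, partition path = the day the record is written.
class DayKeyGenerator(props: TypedProperties) extends KeyGenerator(props) {
  override def getKey(record: GenericRecord): HoodieKey = {
    val recordKey = record.get("id").toString
    val partitionPath = LocalDate.now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))
    new HoodieKey(recordKey, partitionPath)
  }
}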
3. Query data
val jsonDataDf = spark.read.format("com.uber.hoodie").load(basePath + "/*/*")
jsonDataDf.show(false)
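Besides the original id/name/age columns, every row also carries Hudi's bookkeeping columns (_hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, ...), which the incremental queries below rely on. For example:

// Show Hudi's metadata columns next to the business columns
jsonDataDf
  .select("_hoodie_commit_time", "_hoodie_record_key", "_hoodie_partition_path", "id", "name", "age")
  .show(false)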
4. Update data
{"id":1,"name": "aaa","age": 20,"address": "a1"}
{"id":2,"name": "bbb","age": 21,"address": "a1"}
{"id":3,"name": "ccc","age": 22,"address": "a1"}
val updateJsonf = spark.read.json("/Users/apple/Documents/project/study/hudi-study/source_data/update.json")

updateJsonf.write.format("com.uber.hoodie")
  .option("hoodie.insert.shuffle.parallelism", "2")
  .option("hoodie.upsert.shuffle.parallelism", "2")
  .option(PRECOMBINE_FIELD_OPT_KEY, "id")
  .option(RECORDKEY_FIELD_OPT_KEY, "id")
  .option(TABLE_NAME, tableName)
  .mode(SaveMode.Append)
  .save(basePath)
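As a quick sanity check (not part of the original article), you can re-read the table after the upsert: records 1-3 should now carry the new values and a newer _hoodie_commit_time than the untouched records.

// Re-read the table after the upsert; ids 1-3 should show age 20/21/22 and a fresher commit time
spark.read.format("com.uber.hoodie")
  .load(basePath + "/*/*")
  .select("_hoodie_commit_time", "id", "name", "age")
  .orderBy("id")
  .show(false)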
5. Incremental query
import com.uber.hoodie.DataSourceReadOptions._
import spark.implicits._

// The original snippet assumes a "hudi_ro_table" view already exists; register one over the table first
spark.read.format("com.uber.hoodie").load(basePath + "/*/*").registerTempTable("hudi_ro_table")

// List the commit times and pick the second-to-last one as the starting point
val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_ro_table order by commitTime")
  .map(k => k.getString(0))
  .take(50)
val beginTime = commits(commits.length - 2) // the commit time we are interested in

// Incremental query: only records written after beginTime
val incViewDF = spark.read.format("com.uber.hoodie")
  .option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL)
  .option(BEGIN_INSTANTTIME_OPT_KEY, beginTime)
  .load(basePath)
incViewDF.registerTempTable("hudi_incr_table")
spark.sql("select `_hoodie_commit_time`, id, name, age from hudi_incr_table where age > 15").show()
6. Point-in-time query
val beginTime = "000" // represents all commits after this time
val endTime = commits(commits.length - 2) // the commit time we are interested in

// Incremental query bounded by both a begin and an end instant
val incViewDF = spark.read.format("com.uber.hoodie")
  .option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL)
  .option(BEGIN_INSTANTTIME_OPT_KEY, beginTime)
  .option(END_INSTANTTIME_OPT_KEY, endTime)
  .load(basePath)
incViewDF.registerTempTable("hudi_incr_table")
spark.sql("select `_hoodie_commit_time`, id, name, age from hudi_incr_table").show()
7. Sync to Hive
1. Enable Hive sync when writing data
jsonData.write.format("com.uber.hoodie")
  .option("hoodie.upsert.shuffle.parallelism", "1")
  .option(HIVE_PARTITION_FIELDS_OPT_KEY, "etl_tx_dt")
  .option(HIVE_URL_OPT_KEY, "jdbc:hive2://xxx.xxx.xxx.xxx:10000")
  .option(HIVE_USER_OPT_KEY, "hive")
  .option(HIVE_PASS_OPT_KEY, "123")
  .option(HIVE_DATABASE_OPT_KEY, "test")
  .option(HIVE_SYNC_ENABLED_OPT_KEY, true)
  .option(HIVE_TABLE_OPT_KEY, tableName)
  .option(PRECOMBINE_FIELD_OPT_KEY, "id")
  .option(RECORDKEY_FIELD_OPT_KEY, "id")
  .option(TABLE_NAME, tableName)
  .mode(SaveMode.Append)
  .save(basePath)

For Hive to read the synced table, the following Hudi jars also need to be on Hive's classpath (for example via Hive's auxiliary jar path):
hoodie-hadoop-mr-0.4.7.jar
hoodie-common-0.4.7.jar
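Once the sync has run, the table appears in the Hive metastore and can be queried like any other Hive table, for example from Spark, since the session was created with enableHiveSupport(). This is only a sketch: the test database, test_data table and etl_tx_dt partition column come from the options above, and the Hive-side read path should be verified in your environment.

// List the synced table and read it back through the metastore
spark.sql("show tables in test").show(false)
spark.sql("select _hoodie_commit_time, id, name, age, etl_tx_dt from test.test_data").show(false)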





