<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Hudi 原理 | 一文徹底掌握 Apache Hudi 異步 Clustering 部署

          共 8453字,需瀏覽 17分鐘

           ·

          2021-09-17 12:27

          1. 摘要

          在之前的一篇博客中,我們介紹了Clustering(聚簇)的表服務(wù)來(lái)重新組織數(shù)據(jù)來(lái)提供更好的查詢性能,而不用降低攝取速度,并且我們已經(jīng)知道如何部署同步Clustering,本篇博客中,我們將討論近期社區(qū)做的一些改進(jìn)以及如何通過(guò)HoodieClusteringJobDeltaStreamer工具來(lái)部署異步Clustering

          2. 介紹

          通常講,Clustering根據(jù)可配置的策略創(chuàng)建一個(gè)計(jì)劃,根據(jù)特定規(guī)則對(duì)符合條件的文件進(jìn)行分組,然后執(zhí)行該計(jì)劃。Hudi支持并發(fā)寫入[1],并在多個(gè)表服務(wù)之間提供快照隔離,從而允許寫入程序在后臺(tái)運(yùn)行Clustering時(shí)繼續(xù)攝取。有關(guān)Clustering的體系結(jié)構(gòu)的更詳細(xì)概述請(qǐng)查看上一篇博文。

          3. Clustering策略

          如前所述Clustering計(jì)劃和執(zhí)行取決于可插拔的配置策略。這些策略大致可分為三類:計(jì)劃策略執(zhí)行策略更新策略。

          3.1 計(jì)劃策略

          該策略在創(chuàng)建Clustering計(jì)劃時(shí)發(fā)揮作用。它有助于決定應(yīng)該對(duì)哪些文件組進(jìn)行Clustering。讓我們看一下Hudi提供的不同計(jì)劃策略。請(qǐng)注意,使用此配置[2]可以輕松地插拔這些策略。

          ?SparkSizeBasedClusteringPlanStrategy:根據(jù)基本文件的小文件限制[3]選擇文件切片并創(chuàng)建Clustering組,最大大小為每個(gè)組允許的最大文件大小??梢允褂?span style="-webkit-tap-highlight-color: transparent;color: rgb(87, 107, 149);line-height: 1.75;">此配置[4]指定最大大小。此策略對(duì)于將中等大小的文件合并成大文件非常有用,以減少跨冷分區(qū)分布的大量文件。?SparkRecentDaysClusteringPlanStrategy:根據(jù)以前的N天分區(qū)創(chuàng)建一個(gè)計(jì)劃,將這些分區(qū)中的小文件片進(jìn)行Clustering,這是默認(rèn)策略,當(dāng)工作負(fù)載是可預(yù)測(cè)的并且數(shù)據(jù)是按時(shí)間劃分時(shí),它可能很有用。?SparkSelectedPartitionsClusteringPlanStrategy:如果只想對(duì)某個(gè)范圍內(nèi)的特定分區(qū)進(jìn)行Clustering,那么無(wú)論這些分區(qū)是新分區(qū)還是舊分區(qū),此策略都很有用,要使用此策略,還需要在下面設(shè)置兩個(gè)配置(包括開(kāi)始和結(jié)束分區(qū)):

            hoodie.clustering.plan.strategy.cluster.begin.partition  hoodie.clustering.plan.strategy.cluster.end.partition

          注意:所有策略都是分區(qū)感知的,后兩種策略仍然受到第一種策略的大小限制的約束。

          3.2 執(zhí)行策略

          在計(jì)劃階段構(gòu)建Clustering組后,Hudi主要根據(jù)排序列和大小為每個(gè)組應(yīng)用執(zhí)行策略,可以使用此配置[5]指定策略。

          SparkSortAndSizeExecutionStrategy是默認(rèn)策略。使用此配置進(jìn)行Clustering時(shí),用戶可以指定數(shù)據(jù)排序列。除此之外我們還可以為Clustering產(chǎn)生的Parquet文件設(shè)置最大文件大小[6]。該策略使用bulk_insert將數(shù)據(jù)寫入新文件,在這種情況下,Hudi隱式使用一個(gè)分區(qū)器,該分區(qū)器根據(jù)指定列進(jìn)行排序。通過(guò)這種策略改變數(shù)據(jù)布局,不僅提高了查詢性能,而且自動(dòng)平衡了重寫開(kāi)銷。

          現(xiàn)在該策略可以作為單個(gè)Spark作業(yè)或多個(gè)作業(yè)執(zhí)行,具體取決于在計(jì)劃階段創(chuàng)建的Clustering組的數(shù)量。默認(rèn)情況下Hudi將提交多個(gè)Spark作業(yè)并合并結(jié)果。如果要強(qiáng)制Hudi使用單Spark作業(yè),請(qǐng)將執(zhí)行策略類配置設(shè)置為SingleSparkJobExecutionStrategy

          3.3 更新策略

          目前只能為未接收任何并發(fā)更新的表/分區(qū)調(diào)度Clustering。默認(rèn)情況下更新策略的配置設(shè)置為SparkRejectUpdateStrategy。如果某個(gè)文件組在Clustering期間有更新,則它將拒絕更新并引發(fā)異常。然而在某些用例中,更新是非常稀疏的,并且不涉及大多數(shù)文件組。簡(jiǎn)單拒絕更新的默認(rèn)策略似乎不公平。在這種用例中用戶可以將配置設(shè)置為SparkAllowUpdateStregy。

          我們討論了關(guān)鍵策略配置,下面列出了與Clustering相關(guān)的所有其他配置。在此列表中一些非常有用的配置包括:

          配置項(xiàng)解釋默認(rèn)值
          hoodie.clustering.async.enabled啟用在表上的異步運(yùn)行Clustering服務(wù)。false
          hoodie.clustering.async.max.commits通過(guò)指定應(yīng)觸發(fā)多少次提交來(lái)控制異步Clustering的頻率。4
          hoodie.clustering.preserve.commit.metadata重寫數(shù)據(jù)時(shí)保留現(xiàn)有的_hoodie_commit_time。這意味著用戶可以在Clustering數(shù)據(jù)上運(yùn)行增量查詢,而不會(huì)產(chǎn)生任何副作用。false

          4. 異步Clustering

          之前我們已經(jīng)了解了用戶如何設(shè)置同步Clustering[7]。此外用戶可以利用HoodiecClusteringJob[8]設(shè)置兩步異步Clustering。

          4.1 HoodieClusteringJob

          隨著Hudi版本0.9.0的發(fā)布,我們可以在同一步驟中調(diào)度和執(zhí)行Clustering。我們只需要指定-mode-m選項(xiàng)。有如下三種模式:

          ?schedule(調(diào)度):制定一個(gè)Clustering計(jì)劃。這提供了一個(gè)可以在執(zhí)行模式下傳遞的instant。?execute(執(zhí)行):在給定的instant執(zhí)行Clustering計(jì)劃,這意味著這里需要instant。?scheduleAndExecute(調(diào)度并執(zhí)行):首先制定Clustering計(jì)劃并立即執(zhí)行該計(jì)劃。

          請(qǐng)注意要在原始寫入程序仍在運(yùn)行時(shí)運(yùn)行作業(yè)請(qǐng)啟用多寫入:

          hoodie.write.concurrency.mode=optimistic_concurrency_controlhoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider

          使用spark submit命令提交HoodieClusteringJob示例如下:

          spark-submit \--class org.apache.hudi.utilities.HoodieClusteringJob \/path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.9.0-SNAPSHOT.jar \--props /path/to/config/clusteringjob.properties \--mode scheduleAndExecute \--base-path /path/to/hudi_table/basePath \--table-name hudi_table_schedule_clustering \--spark-memory 1g

          clusteringjob.properties配置文件示例如下

          hoodie.clustering.async.enabled=truehoodie.clustering.async.max.commits=4hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824hoodie.clustering.plan.strategy.small.file.limit=629145600hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategyhoodie.clustering.plan.strategy.sort.columns=column1,column2

          4.2 HoodieDeltaStreamer

          接著看下如何使用HudiDeltaStreamer?,F(xiàn)在我們可以使用DeltaStreamer觸發(fā)異步Clustering。只需將hoodie.clustering.async.enabledtrue,并在屬性文件中指定其他Clustering配置,在啟動(dòng)Deltastreamer時(shí)可以將其位置設(shè)為-props(與HoodieClusteringJob配置類似)。

          使用spark submit命令提交HoodieDeltaStreamer示例如下:

          spark-submit \--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \/path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.9.0-SNAPSHOT.jar \--props /path/to/config/clustering_kafka.properties \--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \--source-class org.apache.hudi.utilities.sources.AvroKafkaSource \--source-ordering-field impresssiontime \--table-type COPY_ON_WRITE \--target-base-path /path/to/hudi_table/basePath \--target-table impressions_cow_cluster \--op INSERT \--hoodie-conf hoodie.clustering.async.enabled=true \--continuous

          4.3 Spark Structured Streaming

          我們還可以使用Spark結(jié)構(gòu)化流啟用異步Clustering,如下所示。

          val commonOpts = Map(   "hoodie.insert.shuffle.parallelism" -> "4",   "hoodie.upsert.shuffle.parallelism" -> "4",   DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",   DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",   DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",   HoodieWriteConfig.TBL_NAME.key -> "hoodie_test")def getAsyncClusteringOpts(isAsyncClustering: String,                            clusteringNumCommit: String,                            executionStrategy: String):Map[String, String] = {   commonOpts + (DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE.key -> isAsyncClustering,           HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key -> clusteringNumCommit,           HoodieClusteringConfig.EXECUTION_STRATEGY_CLASS_NAME.key -> executionStrategy   )}def initStreamingWriteFuture(hudiOptions: Map[String, String]): Future[Unit] = {   val streamingInput = // define the source of streaming   Future {      println("streaming starting")      streamingInput              .writeStream              .format("org.apache.hudi")              .options(hudiOptions)              .option("checkpointLocation", basePath + "/checkpoint")              .mode(Append)              .start()              .awaitTermination(10000)      println("streaming ends")   }}def structuredStreamingWithClustering(): Unit = {   val df = //generate data frame   val hudiOptions = getClusteringOpts("true", "1", "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy")   val f1 = initStreamingWriteFuture(hudiOptions)   Await.result(f1, Duration.Inf)}

          5. 總結(jié)和未來(lái)工作

          在這篇文章中,我們討論了不同的Clustering策略以及如何設(shè)置異步Clustering。未來(lái)的工作包括:

          ?Clustering支持更新。?支持Clustering的CLI工具。

          另外Flink支持Clustering已經(jīng)有相應(yīng)Pull Request[9],有興趣的小伙伴可以關(guān)注該P(yáng)R。

          可以查看JIRA[10]了解更多關(guān)于此問(wèn)題的開(kāi)發(fā),我們期待社會(huì)各界的貢獻(xiàn),希望你喜歡這個(gè)博客!

          引用鏈接

          [1] 并發(fā)寫入: https://hudi.apache.org/docs/concurrency_control#enabling
          [2] 此配置: http://hudi.apache.org/docs/next/configurations#hoodieclusteringplanstrategyclass
          [3] 小文件限制: http://hudi.apache.org/docs/next/configurations/#hoodieclusteringplanstrategysmallfilelimit
          [4] 此配置: http://hudi.apache.org/docs/next/configurations/#hoodieclusteringplanstrategymaxbytespergroup
          [5] 此配置: http://hudi.apache.org/docs/next/configurations/#hoodieclusteringexecutionstrategyclass
          [6] 最大文件大小: http://hudi.apache.org/docs/next/configurations/#hoodieparquetmaxfilesize
          [7] 同步Clusteringhttp://hudi.apache.org/blog/2021/01/27/hudi-clustering-intro#setting-up-clustering
          [8] HoodiecClusteringJobhttps://cwiki.apache.org/confluence/display/HUDI/RFC+-+19+Clustering+data+for+freshness+and+query+performance#RFC19Clusteringdataforfreshnessandqueryperformance-SetupforAsyncclusteringJob
          [9] Pull Request: https://github.com/apache/hudi/pull/3599
          [10] JIRA: https://issues.apache.org/jira/browse/HUDI-1042


          推薦閱讀

          使用 Flink Hudi 構(gòu)建流式數(shù)據(jù)湖

          Apache Hudi內(nèi)核之文件標(biāo)記機(jī)制深入解析

          Apache Hudi助力Uber低成本構(gòu)建開(kāi)源大數(shù)據(jù)平臺(tái)

          Apache Hudi 0.9.0版本重磅發(fā)布!更強(qiáng)大的流式數(shù)據(jù)湖平臺(tái)

          更進(jìn)一步節(jié)省空間!Apache Hudi支持虛擬鍵

          引用鏈接

          [1] 并發(fā)寫入: https://hudi.apache.org/docs/concurrency_control#enabling
          [2] 此配置: http://hudi.apache.org/docs/next/configurations#hoodieclusteringplanstrategyclass
          [3] 小文件限制: http://hudi.apache.org/docs/next/configurations/#hoodieclusteringplanstrategysmallfilelimit
          [4] 此配置: http://hudi.apache.org/docs/next/configurations/#hoodieclusteringplanstrategymaxbytespergroup
          [5] 此配置: http://hudi.apache.org/docs/next/configurations/#hoodieclusteringexecutionstrategyclass
          [6] 最大文件大小: http://hudi.apache.org/docs/next/configurations/#hoodieparquetmaxfilesize
          [7] 同步Clusteringhttp://hudi.apache.org/blog/2021/01/27/hudi-clustering-intro#setting-up-clustering
          [8] HoodiecClusteringJobhttps://cwiki.apache.org/confluence/display/HUDI/RFC+-+19+Clustering+data+for+freshness+and+query+performance#RFC19Clusteringdataforfreshnessandqueryperformance-SetupforAsyncclusteringJob
          [9] Pull Request: https://github.com/apache/hudi/pull/3599
          [10] JIRA: https://issues.apache.org/jira/browse/HUDI-1042


          瀏覽 133
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  精品人人妻人人澡人人爽牛牛 | 人人妻人人草 | 精品免费囯产一区二区三区四区视频 | 国产免费又粗又大又硬又爽视频 | 黄色视频在线免费观看视频 |