
          Hudi Source Code | Hudi Insert Source Code Analysis (Overall Flow)


          2022-11-18 18:17

          Preface

          This article is a summary and analysis of the Apache Hudi insert source code, using the Java Client as the example. If you are not familiar with the Hudi Java Client, see: Hudi Java Client Summary | Example Code for Reading Hive and Writing Hudi.

          Why the Java Client:
          1. It is what I use in production, so I am more familiar with it than with the Spark client.
          2. The core logic of the Java Client is the same as that of the Spark and Flink clients; the difference is that, for example, Spark's entry points are DataFrames and SQL, which add an extra layer of API wrapping.
          3. The Java Client is closer to the core source code, so the core logic can be analyzed directly without digging into Spark or Flink internals, which makes it easier to get started for readers unfamiliar with the Spark or Flink code base.
          4. Once the Java Client analysis is finished, I plan to summarize the Spark client source code as well, which should make it easier to understand.

          Version

          Hudi 0.9.0

          Note: the core code is largely the same in every version. I chose 0.9.0 because it is the version of the Java Client I have used the most, which saves quite a bit of time compared with the latest release, and because my earlier Java Client source-code posts, such as the analyses of the Hudi clean policy and of the Hudi clean file-removal implementation, were also based on 0.9.0.

          initTable

          The Hudi table is first initialized through initTable. As the code shows, it mainly takes the parameters we configured, creates the .hoodie metadata directory, and persists those parameters into the hoodie.properties file; you can dig into the details yourself.

              public HoodieTableMetaClient initTable(Configuration configuration, String basePath) throws IOException {
                return HoodieTableMetaClient.initTableAndGetMetaClient(configuration, basePath, build());
              }

            /**
             * Helper method to initialize a given path as a hoodie table with configs passed in as Properties.
             *
             * @return Instance of HoodieTableMetaClient
             */

            public static HoodieTableMetaClient initTableAndGetMetaClient(Configuration hadoopConf, String basePath,
                Properties props) throws IOException {
              LOG.info("Initializing " + basePath + " as hoodie table " + basePath);
              Path basePathDir = new Path(basePath);
              final FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
              if (!fs.exists(basePathDir)) {
                fs.mkdirs(basePathDir);
              }
              Path metaPathDir = new Path(basePath, METAFOLDER_NAME);
              if (!fs.exists(metaPathDir)) {
                fs.mkdirs(metaPathDir);
              }

              // if anything other than default archive log folder is specified, create that too
              String archiveLogPropVal = new HoodieConfig(props).getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER);
              if (!StringUtils.isNullOrEmpty(archiveLogPropVal)) {
                Path archiveLogDir = new Path(metaPathDir, archiveLogPropVal);
                if (!fs.exists(archiveLogDir)) {
                  fs.mkdirs(archiveLogDir);
                }
              }

              // Always create temporaryFolder which is needed for finalizeWrite for Hoodie tables
              final Path temporaryFolder = new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME);
              if (!fs.exists(temporaryFolder)) {
                fs.mkdirs(temporaryFolder);
              }

              // Always create auxiliary folder which is needed to track compaction workloads (stats and any metadata in future)
              final Path auxiliaryFolder = new Path(basePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME);
              if (!fs.exists(auxiliaryFolder)) {
                fs.mkdirs(auxiliaryFolder);
              }

              initializeBootstrapDirsIfNotExists(hadoopConf, basePath, fs);
              HoodieTableConfig.createHoodieProperties(fs, metaPathDir, props);
              // We should not use fs.getConf as this might be different from the original configuration
              // used to create the fs in unit tests
              HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
              LOG.info("Finished initializing Table of type " + metaClient.getTableConfig().getTableType() + " from " + basePath);
              return metaClient;
            }    
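
          For context, this is roughly how a Java Client application triggers the code above before the first write, through HoodieTableMetaClient.withPropertyBuilder() (a minimal sketch modeled on Hudi's own Java client example; tablePath and tableName are placeholders):

            // Initialize the table only if it has not been created yet
            Configuration hadoopConf = new Configuration();
            Path path = new Path(tablePath);
            FileSystem fs = FSUtils.getFs(tablePath, hadoopConf);
            if (!fs.exists(path)) {
              HoodieTableMetaClient.withPropertyBuilder()
                  .setTableType(HoodieTableType.COPY_ON_WRITE)
                  .setTableName(tableName)
                  .setPayloadClassName(HoodieAvroPayload.class.getName())
                  .initTable(hadoopConf, tablePath); // ends up in initTableAndGetMetaClient above
            }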

          HoodieWriteConfig

          The configuration here is the one used when writing data, while the configuration passed to initTable above is the one persisted to the table; naturally the two should be kept consistent (the Spark client does keep them consistent). You can see parameters for the schema, table name, payload, index, clean, file sizes, and so on. Once you are familiar with these parameters you can start tuning.

                      Properties indexProperties = new Properties();
                      indexProperties.put(BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.key(), 150000); // with 10 million records, saves about 1 minute overall
                      HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
                              .withSchema(writeSchema.toString())
                              .withParallelism(parallelism, parallelism).withDeleteParallelism(parallelism)
                              .forTable(tableName)
                              .withWritePayLoad(payloadClassName)
                              .withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(orderingField).build())
                              .withIndexConfig(HoodieIndexConfig.newBuilder()
                                      .withIndexType(HoodieIndex.IndexType.BLOOM)
          //                            .bloomIndexPruneByRanges(false) // with 10 million records, saves about 1 minute overall
                                      .bloomFilterFPP(0.000001)   // with 10 million records, saves about 3 minutes overall
                                      .fromProperties(indexProperties)
                                      .build())
                              .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(150, 200)
                                      .compactionSmallFileSize(Long.parseLong(smallFileLimit))
                                      .approxRecordSize(Integer.parseInt(recordSizeEstimate))
                                      .retainCommits(100).build())
                              .withStorageConfig(HoodieStorageConfig.newBuilder()
                                      .parquetMaxFileSize(Long.parseLong(maxFileSize)).build())
                              .build();

          HoodieJavaWriteClient

          創(chuàng)建writeClient

          HoodieJavaWriteClient writeClient = new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);
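
          The line above assumes a Hadoop Configuration (hadoopConf) and the cfg built in the previous section. The writeClient also holds resources (for example the embedded timeline service, when enabled), so, as in Hudi's own Java client example, it should be closed once all writes are done, e.g.:

            try {
              // startCommit / insert calls go here (see the following sections)
            } finally {
              writeClient.close(); // release the embedded timeline service and other resources
            }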

          startCommit

          String newCommitTime = writeClient.startCommit();

          具體的實(shí)現(xiàn)在其父類AbstractHoodieWriteClient中。首先調(diào)用rollbackFailedWrites執(zhí)行rollback操作,關(guān)于rollback分析本文先不講。然后通過HoodieActiveTimeline.createNewInstantTime()創(chuàng)建一個(gè)新的instantTime。最后創(chuàng)建metaClient,通過metaClient.getActiveTimeline().createNewInstant生成.commit.request文件

            public String startCommit() {
              // 首先調(diào)用rollbackFailedWrites執(zhí)行rollback操作
              CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
                  HoodieTimeline.COMMIT_ACTION, () -> rollbackFailedWrites());
              // Generate a new instantTime
              String instantTime = HoodieActiveTimeline.createNewInstantTime();
              // 創(chuàng)建metaClient
              HoodieTableMetaClient metaClient = createMetaClient(true);
              startCommit(instantTime, metaClient.getCommitActionType(), metaClient);
              return instantTime;
            }
            private void startCommit(String instantTime, String actionType, HoodieTableMetaClient metaClient) {
              LOG.info("Generate a new instant time: " + instantTime + " action: " + actionType);
              // if there are pending compactions, their instantTime must not be greater than that of this instant time
              metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending ->
                  ValidationUtils.checkArgument(
                      HoodieTimeline.compareTimestamps(latestPending.getTimestamp(), HoodieTimeline.LESSER_THAN, instantTime),
                  "Latest pending compaction instant time must be earlier than this instant time. Latest Compaction :"
                      + latestPending + ",  Ingesting at " + instantTime));
              if (config.getFailedWritesCleanPolicy().isLazy()) {
                this.heartbeatClient.start(instantTime);
              }
              // 創(chuàng)建.commit.request
              metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, actionType,
                  instantTime));
            }  

          generateRecord

          主要是構(gòu)造writeClient寫數(shù)據(jù)所需的數(shù)據(jù)結(jié)構(gòu)writeRecords:List<hoodierecord>,具體的可以參考我之前分享的文章。</hoodierecord

          client.insert(writeRecords, newCommitTime)

          It first gets the table, here a HoodieJavaCopyOnWriteTable, and validates that the schema is compatible with the existing data. preWrite then runs a few pre-write steps, such as setting the operation type, table.insert performs the complete write and returns the result, and finally postWrite runs archive, clean, etc. and returns the WriteStatuses.

            public List<WriteStatus> insert(List<HoodieRecord<T>> records, String instantTime) {
              // First get the table; here it is a HoodieJavaCopyOnWriteTable
              HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
                  getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
              // 驗(yàn)證schema
              table.validateUpsertSchema();
              // Pre-write steps, e.g. setting the operation type
              preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient());
              // 調(diào)用table.insert執(zhí)行寫數(shù)據(jù)操作,返回result
              HoodieWriteMetadata<List<WriteStatus>> result = table.insert(context, instantTime, records);
              if (result.getIndexLookupDuration().isPresent()) {
                metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
              }
              // 調(diào)用postWrite返回WriteStatuses
              return postWrite(result, instantTime, table);
            }

          postWrite

          Let's look at the logic of postWrite first. It checks whether the commit has already completed, i.e. the .commit file has been generated; if so, it runs archive, clean, etc. In other words, archive, clean and friends run after the write has finished and the .commit file exists.

            /**
             * If the commit has already completed (i.e. the .commit file was generated), run archive, clean, etc.
             */

            protected List<WriteStatus> postWrite(HoodieWriteMetadata<List<WriteStatus>> result,
                                                  String instantTime,
                                                  HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
              if (result.getIndexLookupDuration().isPresent()) {
                metrics.updateIndexMetrics(getOperationType().name(), result.getIndexUpdateDuration().get().toMillis());
              }
              // commit是否已經(jīng)提交,這里主要考慮是否設(shè)置了自動(dòng)提交,hoodie.auto.commit默認(rèn)true
              // 如果不是自動(dòng)提交的話,那么我們需要手動(dòng)執(zhí)行clean等操作,然后手動(dòng)commit
              // 所以這里默認(rèn)為true
              // isCommitted代表著已經(jīng)生成了.commit文件,也就是寫操作成功了,也就是通過table.insert已經(jīng)完成了整個(gè)的寫操作
              if (result.isCommitted()) {
                // Perform post commit operations.
                if (result.getFinalizeDuration().isPresent()) {
                  metrics.updateFinalizeWriteMetrics(result.getFinalizeDuration().get().toMillis(),
                      result.getWriteStats().get().size());
                }
                // postCommit mainly runs archive, clean, etc.; i.e. they happen after the write has finished and the .commit file has been generated.
                postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty());

                emitCommitMetrics(instantTime, result.getCommitMetadata().get(), hoodieTable.getMetaClient().getCommitActionType());
              }
              return result.getWriteStatuses();
            }

          table.insert

          先調(diào)用JavaInsertCommitActionExecutor.execute接著調(diào)用JavaWriteHelper.newInstance().write

            public HoodieWriteMetadata<List<WriteStatus>> insert(HoodieEngineContext context,
                                                                 String instantTime,
                                                                 List<HoodieRecord<T>> records) {
              return new JavaInsertCommitActionExecutor<>(context, config,
                  this, instantTime, records).execute();
            }

            public HoodieWriteMetadata<List<WriteStatus>> execute() {
              return JavaWriteHelper.newInstance().write(instantTime, inputRecords, context, table,
                config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false);
            }

          JavaWriteHelper.write

          Its write method is implemented in the parent class AbstractWriteHelper. It first checks whether deduplication is needed (controlled by hoodie.combine.before.insert); insert does not deduplicate by default (upsert/delete do). If deduplication is enabled, combineOnCondition deduplicates the records first.
          It then checks whether tagging is needed. Tagging uses the index information stored in the files (bloom index by default) to determine which records are new and which are updates; updated records also get the location of the file they live in, so the corresponding parquet file can be found when updating. Since this is an insert, no tagging is needed, which is one of the bigger differences between insert and upsert.
          We will look at how tagging is implemented when analyzing the upsert source code; this article skips it. Finally, executor.execute performs the write and returns the result; the executor here is JavaInsertCommitActionExecutor.

            public HoodieWriteMetadata<O> write(String instantTime,
                                                I inputRecords,
                                                HoodieEngineContext context,
                                                HoodieTable<T, I, K, O> table,
                                                boolean shouldCombine,
                                                int shuffleParallelism,
                                                BaseCommitActionExecutor<T, I, K, O, R> executor,
                                                boolean performTagging) {
              try {
                // De-dupe/merge if needed
                // If dedup is enabled, de-duplicate first; insert does not de-duplicate by default
                // controlled by hoodie.combine.before.insert
                I dedupedRecords =
                    combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table);

                Instant lookupBegin = Instant.now();
                I taggedRecords = dedupedRecords;
                if (performTagging) { // whether tagging is needed; false for insert
                  // perform index loop up to get existing location of records
                  // tag的作用主要是利用文件中保存的索引信息(默認(rèn)布隆索引),判斷records中的數(shù)據(jù)哪些是新增數(shù)據(jù),哪些是更新數(shù)據(jù)
                  // 對(duì)于更新的數(shù)據(jù),還要添加上對(duì)應(yīng)的文件位置信息,方便后面更新時(shí)查找對(duì)應(yīng)的parquet文件
                  taggedRecords = tag(dedupedRecords, context, table);
                }
                Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now());

                // 通過調(diào)用executor.execute執(zhí)行寫操作,返回result。這里的executor為JavaInsertCommitActionExecutor
                HoodieWriteMetadata<O> result = executor.execute(taggedRecords);
                result.setIndexLookupDuration(indexLookupDuration);
                return result;
              } catch (Throwable e) {
                if (e instanceof HoodieUpsertException) {
                  throw (HoodieUpsertException) e;
                }
                throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e);
              }
            }

            public boolean shouldCombineBeforeInsert() {
              return getBoolean(COMBINE_BEFORE_INSERT);
            }

            public static final ConfigProperty<String> COMBINE_BEFORE_INSERT = ConfigProperty
                .key("hoodie.combine.before.insert")
                .defaultValue("false")
                .withDocumentation("When inserted records share same key, controls whether they should be first combined (i.e de-duplicated) before"
                    + " writing to storage.");

          JavaInsertCommitActionExecutor.execute

          JavaInsertCommitActionExecutor.execute實(shí)際上調(diào)用的父類BaseJavaCommitActionExecutorexecute
          首先通過buildProfile構(gòu)建WorkloadProfile,構(gòu)建WorkloadProfile的目的主要是為給getPartitioner使用。WorkloadProfile包含了分區(qū)路徑對(duì)應(yīng)的insert/upsert數(shù)量以及upsert數(shù)據(jù)對(duì)應(yīng)的文件位置信息。數(shù)量信息是為了分桶,或者說是為了分幾個(gè)文件,這里涉及了小文件合并、文件大小等原理,位置信息是為了獲取要更新的文件,也就是對(duì)應(yīng)的fileId。對(duì)于upsert數(shù)據(jù),我們復(fù)用原來的fileId。對(duì)于insert數(shù)據(jù),我們生成新的fileId,如果record數(shù)比較多,則分多個(gè)文件寫。然后將WorkloadProfile元數(shù)據(jù)信息持久化到.inflight文件中,.commit.request->.commit.inflight。這一步主要是為了mor表的rollback,rollback時(shí)可以從.inflight文件中讀取對(duì)應(yīng)的元數(shù)據(jù)信息。然后通過getPartitioner根據(jù)WorkloadProfile獲取partitioner,接著調(diào)用partition方法返回partitionedRecords(<桶號(hào),對(duì)應(yīng)的HoodieRecord>),一個(gè)桶對(duì)應(yīng)一個(gè)文件 fileId。最后再遍歷partitionedRecords,也就是每個(gè)桶執(zhí)行一次寫操作handleInsertPartition/handleUpsertPartition,最后調(diào)用BoundedInMemoryExecutor.execute,利用生產(chǎn)者消費(fèi)者模式寫數(shù)據(jù),關(guān)于如何通過生產(chǎn)者消費(fèi)者模式寫數(shù)據(jù),我已經(jīng)在Hudi源碼|bootstrap源碼分析總結(jié)(寫Hudi)分析過bootstrap的源碼了,原理一樣,不同的是實(shí)現(xiàn)類不一樣,感興趣的可以看看。

          關(guān)于tag(索引相關(guān))、WorkloadProfile、getPartitioner、handleInsertPartition/handleUpsertPartition本文講個(gè)大概,可能有不準(zhǔn)確的地方,大家可以先結(jié)合17張圖帶你徹底理解Hudi Upsert原理進(jìn)行學(xué)習(xí),至于具體的源碼分析,限于篇幅及個(gè)人精力,本文先不涉及,會(huì)放在后面的文章單獨(dú)講解,對(duì)于本文可能不準(zhǔn)確的地方,也會(huì)在后面的文章中更新。

            public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
              HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();

              WorkloadProfile profile = null;
              if (isWorkloadProfileNeeded()) { // always true
                // Build the WorkloadProfile, mainly for use by getPartitioner
                // The WorkloadProfile holds, per partition path, the insert/upsert counts and the file locations of the upsert records
                // The counts drive bucketing, i.e. how many files to write; this is where small-file merging and target file size come in
                // The locations identify the files to be updated
                // Upsert records reuse their existing fileId
                // Insert records get new fileIds; if there are many records, they are split across multiple files
                profile = new WorkloadProfile(buildProfile(inputRecords));
                LOG.info("Workload profile :" + profile);
                try {
                  // Persist the WorkloadProfile metadata into the .inflight file (.commit.requested -> .commit.inflight)
                  // This is mainly for MOR-table rollback, which reads the metadata back from the .inflight file
                  saveWorkloadProfileMetadataToInflight(profile, instantTime);
                } catch (Exception e) {
                  HoodieTableMetaClient metaClient = table.getMetaClient();
                  HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
                  try {
                    if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
                      throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
                    }
                  } catch (IOException ex) {
                    LOG.error("Check file exists failed");
                    throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex);
                  }
                }
              }


              // 根據(jù)WorkloadProfile獲取partitioner
              final Partitioner partitioner = getPartitioner(profile);
              // <bucket number, HoodieRecords for that bucket>; one bucket corresponds to one file (fileId)
              Map<Integer, List<HoodieRecord<T>>> partitionedRecords = partition(inputRecords, partitioner);

              List<WriteStatus> writeStatuses = new LinkedList<>();
              // forEach: each bucket runs one write via handleInsertPartition/handleUpsertPartition
              // and the data is finally written through BoundedInMemoryExecutor.execute with a producer-consumer model
              partitionedRecords.forEach((partition, records) -> {
                // is this operation changing (updating/deleting) existing records?
                if (WriteOperationType.isChangingRecords(operationType)) {
                  handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
                } else {
                  handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
                }
              });
              updateIndex(writeStatuses, result);
              // Commit: generating the .commit file marks the completion of the write
              updateIndexAndCommitIfNeeded(writeStatuses, result);
              return result;
            }
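
          To make the bucketing idea above more concrete, here is a simplified illustration (not the actual JavaUpsertPartitioner code) of how the number of new files for a partition's insert records relates to the configured target file size and the estimated record size:

            // Simplified illustration only (not Hudi's implementation)
            public class InsertBucketEstimate {
              // roughly how many new files (insert buckets) the insert records of one partition need
              static int estimateInsertBuckets(long insertRecordCount, long approxRecordSizeBytes, long maxFileSizeBytes) {
                long totalBytes = insertRecordCount * approxRecordSizeBytes;
                // ceiling division, at least one bucket
                return (int) Math.max(1, (totalBytes + maxFileSizeBytes - 1) / maxFileSizeBytes);
              }

              public static void main(String[] args) {
                // 10 million records, ~100 bytes each, 120 MB target file size -> about 8 new files
                System.out.println(estimateInsertBuckets(10_000_000L, 100L, 120L * 1024 * 1024));
              }
            }

          The real partitioner additionally assigns part of the inserts to existing under-sized files first, which is the small-file handling mentioned above.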

          commit

          handleInsertPartition/handleUpsertPartition above already complete the actual data write, but the .commit metadata file still has to be generated to mark the commit as complete. Without it, for example when only .commit.requested or .commit.inflight exists, the files written by this batch will not show up in queries, and the next write will trigger a rollback to handle them.

          這里索引相關(guān)的先不看,commit調(diào)用鏈updateIndexAndCommitIfNeeded->commitOnAutoCommit->autoCommit->commit->commit,最終在BaseJavaCommitActionExecutor的commit方法中通過activeTimeline.saveAsComplete生成.commit文件。
          前面講過了,在生成.commit文件后,會(huì)調(diào)用postWrite方法觸發(fā)archive、clean等操作。實(shí)際上archive、clean等操作的失敗,不影響本次寫數(shù)據(jù)的成功。比如clean失敗了,可以下次再clean就可以了。所以當(dāng)commit完成后,如果clean失敗了,這樣對(duì)于有失敗機(jī)制的集成工具,比如我們使用的Apache NIFI,是不能將本批次數(shù)據(jù)放進(jìn)失敗隊(duì)列的。PS:當(dāng)本次commit不成功時(shí),我們需要放進(jìn)失敗隊(duì)列,目的是防止數(shù)據(jù)丟失。其實(shí)我們可以自己寫代碼繼承JavaClient類,將postWrite方法和table.insert分開。這樣便于判斷是寫數(shù)據(jù)失敗還是clean失敗,以后我會(huì)分享相關(guān)代碼實(shí)現(xiàn)。

            public void updateIndexAndCommitIfNeeded(List<WriteStatus> writeStatuses, HoodieWriteMetadata result) {
              Instant indexStartTime = Instant.now();
              // Update the index back
              List<WriteStatus> statuses = table.getIndex().updateLocation(writeStatuses, context, table);
              result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
              result.setWriteStatuses(statuses);
              result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(result));
              commitOnAutoCommit(result);
            }

            protected void commitOnAutoCommit(HoodieWriteMetadata result) {
              // validate commit action before committing result
              runPrecommitValidators(result);
              // validate commit action before committing result
              if (config.shouldAutoCommit()) {
                LOG.info("Auto commit enabled: Committing " + instantTime);
                autoCommit(extraMetadata, result);
              } else {
                LOG.info("Auto commit disabled for " + instantTime);
              }
            }

            protected void autoCommit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<O> result) {
              this.txnManager.beginTransaction(Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, instantTime)),
                  lastCompletedTxn.isPresent() ? Option.of(lastCompletedTxn.get().getLeft()) : Option.empty());
              try {
                TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
                    result.getCommitMetadata(), config, this.txnManager.getLastCompletedTransactionOwner());
                commit(extraMetadata, result);
              } finally {
                this.txnManager.endTransaction();
              }
            }

            // BaseJavaCommitActionExecutor
            protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<List<WriteStatus>> result) {
              commit(extraMetadata, result, result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList()));
            }

            protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<List<WriteStatus>> result, List<HoodieWriteStat> writeStats) {
              String actionType = getCommitActionType();
              LOG.info("Committing " + instantTime + ", action Type " + actionType);
              result.setCommitted(true);
              result.setWriteStats(writeStats);
              // Finalize write
              finalizeWrite(instantTime, writeStats, result);
              try {
                LOG.info("Committing " + instantTime + ", action Type " + getCommitActionType());
                HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
                HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats, result.getPartitionToReplaceFileIds(),
                    extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());

                // Generate the .commit file via activeTimeline.saveAsComplete
                activeTimeline.saveAsComplete(new HoodieInstant(true, getCommitActionType(), instantTime),
                    Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
                LOG.info("Committed " + instantTime);
                result.setCommitMetadata(Option.of(metadata));
              } catch (IOException e) {
                throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
                    e);
              }
            } 
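
          Related to the auto-commit discussion above: when hoodie.auto.commit is false (typically set through the write config's withAutoCommit(false)), commitOnAutoCommit skips the commit and the caller has to commit explicitly through the write client. A rough sketch, assuming the insert/commit API of HoodieJavaWriteClient shown earlier:

            // With auto-commit disabled, table.insert only writes the data files;
            // the caller decides when to turn the inflight instant into a completed .commit
            List<WriteStatus> statuses = writeClient.insert(writeRecords, newCommitTime);
            boolean committed = writeClient.commit(newCommitTime, statuses);
            if (!committed) {
              // e.g. route this batch to the failure queue so it can be retried
            }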

          handleInsertPartition

          The call chain from handleInsertPartition down to the producer-consumer write is: handleInsertPartition -> handleUpsertPartition -> handleInsert -> JavaLazyInsertIterable.computeNext -> BoundedInMemoryExecutor.execute.

            protected Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr,
                                                                        Partitioner partitioner) {
              return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
            }

            protected Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr,
                                                                        Partitioner partitioner) {
              JavaUpsertPartitioner javaUpsertPartitioner = (JavaUpsertPartitioner) partitioner;
              BucketInfo binfo = javaUpsertPartitioner.getBucketInfo(partition);
              BucketType btype = binfo.bucketType;
              try {
                if (btype.equals(BucketType.INSERT)) {
                  return handleInsert(binfo.fileIdPrefix, recordItr);
                } else if (btype.equals(BucketType.UPDATE)) {
                  return handleUpdate(binfo.partitionPath, binfo.fileIdPrefix, recordItr);
                } else {
                  throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition);
                }
              } catch (Throwable t) {
                String msg = "Error upserting bucketType " + btype + " for partition :" + partition;
                LOG.error(msg, t);
                throw new HoodieUpsertException(msg, t);
              }
            }

            public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr) {
              // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
              if (!recordItr.hasNext()) {
                LOG.info("Empty partition");
                return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
              }
              return new JavaLazyInsertIterable<>(recordItr, true, config, instantTime, table, idPfx,
                  taskContextSupplier, new CreateHandleFactory<>());
            }

            // JavaLazyInsertIterable
            protected List<WriteStatus> computeNext() {
              // Executor service used for launching writer thread.
              BoundedInMemoryExecutor<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor =
                  null;
              try {
                final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
                bufferedIteratorExecutor =
                    new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr), Option.of(getInsertHandler()), getTransformFunction(schema));
                // bufferedIteratorExecutor.execute writes the data through a producer-consumer model
                final List<WriteStatus> result = bufferedIteratorExecutor.execute();
                assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
                return result;
              } catch (Exception e) {
                throw new HoodieException(e);
              } finally {
                if (null != bufferedIteratorExecutor) {
                  bufferedIteratorExecutor.shutdownNow();
                }
              }
            }  
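
          For readers who have not seen the bounded producer-consumer pattern before, here is a generic, self-contained illustration of the idea behind BoundedInMemoryExecutor (plain JDK classes only, not Hudi's implementation): one thread produces records into a bounded queue while another consumes and "writes" them, so reading input and writing files can overlap without unbounded memory use.

            import java.util.concurrent.ArrayBlockingQueue;
            import java.util.concurrent.BlockingQueue;

            public class BoundedProducerConsumerDemo {
              public static void main(String[] args) throws InterruptedException {
                BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024); // bounded buffer
                final String poison = "__EOF__";

                Thread producer = new Thread(() -> {
                  try {
                    for (int i = 0; i < 10_000; i++) {
                      queue.put("record-" + i); // blocks when the buffer is full
                    }
                    queue.put(poison); // signal end of input
                  } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                  }
                });

                Thread consumer = new Thread(() -> {
                  try {
                    String record;
                    while (!(record = queue.take()).equals(poison)) {
                      // in Hudi, this is where the insert handler appends the record to a data file
                    }
                  } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                  }
                });

                producer.start();
                consumer.start();
                producer.join();
                consumer.join();
              }
            }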

          Summary

          This article used the Java Client as an example to analyze and summarize the overall logic of the Apache Hudi insert source code; I hope it is helpful. Due to limited time, WorkloadProfile, getPartitioner, handleInsertPartition/handleUpsertPartition and the rest mentioned above will be summarized in later articles, and once the insert-related source code is done I will move on to the upsert source code. Some parts may not be entirely accurate; corrections are very welcome so we can improve together.

          Annotated source code

          github: https://github.com/dongkelun/hudi/tree/0.9.0-learning-comments
          gitee: https://gitee.com/dongkelun/hudi/tree/0.9.0-learning-comments

          相關(guān)閱讀

          瀏覽 132
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  天天夜夜av | 美女尿口无遮挡 | 动漫操逼网站 | www.97色色 | 超碰人人操在线 |