Hudi 源碼 | Hudi Insert 源碼剖析(整體流程)
前言
Apache Hudi Insert源碼分析總結(jié),以Java Client為例,不了解Hudi Java Client的可以參考:Hudi Java Client總結(jié)|讀取Hive寫Hudi代碼示例。
以Java Client為例的原因:1、自己生產(chǎn)上用的Java Client,相比于Spark客戶端更熟悉一點(diǎn)。
2、Java Client和Spark、Flink客戶端核心邏輯是一樣的。不同的是比如Spark的入口是DF和SQL,多了一層API封裝。
3、Java Client更貼近源碼,可以直接分析核心邏輯。不用剖析Spark、Flink源碼。對(duì)Sprk、Flink源碼不熟悉的更容易上手。
4、等分析完Java Client源碼后,有時(shí)間的話我會(huì)再總結(jié)一下Spark客戶端的源碼,這樣大家會(huì)更容易理解。
版本
Hudi 0.9.0
備注:其實(shí)每個(gè)版本核心代碼都差不多,之所以使用0.9.0,一個(gè)是因?yàn)閷?duì)于Java Client,我用0.9.0用的比較多,相比于使用最新版可以節(jié)省不少時(shí)間,另一個(gè)原因是,之前總結(jié)的Java Client的源碼也是基于0.9.0。比如Hudi Clean Policy 清理策略實(shí)現(xiàn)分析和Hudi Clean 清理文件實(shí)現(xiàn)分析
initTable
首先是通過initTable初始化Hudi表,可以看出來主要就是根據(jù)我們配置的一些參數(shù),創(chuàng)建.hoodie元數(shù)據(jù)目錄,然后將這些參數(shù)持久化到hoodier.properties文件中,具體的細(xì)節(jié)可以自己研究。
public HoodieTableMetaClient initTable(Configuration configuration, String basePath)
throws IOException {
return HoodieTableMetaClient.initTableAndGetMetaClient(configuration, basePath, build());
}
/**
* Helper method to initialize a given path as a hoodie table with configs passed in as Properties.
*
* @return Instance of HoodieTableMetaClient
*/
public static HoodieTableMetaClient initTableAndGetMetaClient(Configuration hadoopConf, String basePath,
Properties props) throws IOException {
LOG.info("Initializing " + basePath + " as hoodie table " + basePath);
Path basePathDir = new Path(basePath);
final FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
if (!fs.exists(basePathDir)) {
fs.mkdirs(basePathDir);
}
Path metaPathDir = new Path(basePath, METAFOLDER_NAME);
if (!fs.exists(metaPathDir)) {
fs.mkdirs(metaPathDir);
}
// if anything other than default archive log folder is specified, create that too
String archiveLogPropVal = new HoodieConfig(props).getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER);
if (!StringUtils.isNullOrEmpty(archiveLogPropVal)) {
Path archiveLogDir = new Path(metaPathDir, archiveLogPropVal);
if (!fs.exists(archiveLogDir)) {
fs.mkdirs(archiveLogDir);
}
}
// Always create temporaryFolder which is needed for finalizeWrite for Hoodie tables
final Path temporaryFolder = new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME);
if (!fs.exists(temporaryFolder)) {
fs.mkdirs(temporaryFolder);
}
// Always create auxiliary folder which is needed to track compaction workloads (stats and any metadata in future)
final Path auxiliaryFolder = new Path(basePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME);
if (!fs.exists(auxiliaryFolder)) {
fs.mkdirs(auxiliaryFolder);
}
initializeBootstrapDirsIfNotExists(hadoopConf, basePath, fs);
HoodieTableConfig.createHoodieProperties(fs, metaPathDir, props);
// We should not use fs.getConf as this might be different from the original configuration
// used to create the fs in unit tests
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
LOG.info("Finished initializing Table of type " + metaClient.getTableConfig().getTableType() + " from " + basePath);
return metaClient;
}
HoodieWriteConfig
這里的配置是寫數(shù)據(jù)時(shí)使用的配置,上面initTable的配置是持久化文件的配置,當(dāng)然這倆配置要保持一致(實(shí)際上Spark客戶端就是保持一致的)。可以看到有Schema、表名、payload、索引、clean、文件大小等一些參數(shù)。熟悉這些參數(shù)后就可以進(jìn)行調(diào)優(yōu)了。
Properties indexProperties = new Properties();
indexProperties.put(BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.key(), 150000); // 1000萬總體時(shí)間提升1分鐘
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withSchema(writeSchema.toString())
.withParallelism(parallelism, parallelism).withDeleteParallelism(parallelism)
.forTable(tableName)
.withWritePayLoad(payloadClassName)
.withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(orderingField).build())
.withIndexConfig(HoodieIndexConfig.newBuilder()
.withIndexType(HoodieIndex.IndexType.BLOOM)
// .bloomIndexPruneByRanges(false) // 1000萬總體時(shí)間提升1分鐘
.bloomFilterFPP(0.000001) // 1000萬總體時(shí)間提升3分鐘
.fromProperties(indexProperties)
.build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(150, 200)
.compactionSmallFileSize(Long.parseLong(smallFileLimit))
.approxRecordSize(Integer.parseInt(recordSizeEstimate))
.retainCommits(100).build())
.withStorageConfig(HoodieStorageConfig.newBuilder()
.parquetMaxFileSize(Long.parseLong(maxFileSize)).build())
.build();
HoodieJavaWriteClient
創(chuàng)建writeClient
HoodieJavaWriteClient writeClient = new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg)
startCommit
String newCommitTime = writeClient.startCommit();
具體的實(shí)現(xiàn)在其父類AbstractHoodieWriteClient中。首先調(diào)用rollbackFailedWrites執(zhí)行rollback操作,關(guān)于rollback分析本文先不講。然后通過HoodieActiveTimeline.createNewInstantTime()創(chuàng)建一個(gè)新的instantTime。最后創(chuàng)建metaClient,通過metaClient.getActiveTimeline().createNewInstant生成.commit.request文件
public String startCommit() {
// 首先調(diào)用rollbackFailedWrites執(zhí)行rollback操作
CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
HoodieTimeline.COMMIT_ACTION, () -> rollbackFailedWrites());
// 生成新的instantTime
String instantTime = HoodieActiveTimeline.createNewInstantTime();
// 創(chuàng)建metaClient
HoodieTableMetaClient metaClient = createMetaClient(true);
startCommit(instantTime, metaClient.getCommitActionType(), metaClient);
return instantTime;
}
private void startCommit(String instantTime, String actionType, HoodieTableMetaClient metaClient) {
LOG.info("Generate a new instant time: " + instantTime + " action: " + actionType);
// if there are pending compactions, their instantTime must not be greater than that of this instant time
metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending ->
ValidationUtils.checkArgument(
HoodieTimeline.compareTimestamps(latestPending.getTimestamp(), HoodieTimeline.LESSER_THAN, instantTime),
"Latest pending compaction instant time must be earlier than this instant time. Latest Compaction :"
+ latestPending + ", Ingesting at " + instantTime));
if (config.getFailedWritesCleanPolicy().isLazy()) {
this.heartbeatClient.start(instantTime);
}
// 創(chuàng)建.commit.request
metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, actionType,
instantTime));
}
generateRecord
主要是構(gòu)造writeClient寫數(shù)據(jù)所需的數(shù)據(jù)結(jié)構(gòu)writeRecords:List<hoodierecord
client.insert(writeRecords, newCommitTime)
首先獲取table,這里返回HoodieJavaCopyOnWriteTable,接著驗(yàn)證一下schema和歷史數(shù)據(jù)的兼容性。然后通過preWrite執(zhí)行寫之前的一些步驟,比如設(shè)置操作類型,接著調(diào)用table.insert執(zhí)行完整的寫數(shù)據(jù)操作,返回result。最后調(diào)用postWrite執(zhí)行archive、clean等操作返回WriteStatuses。
public List<WriteStatus> insert(List<HoodieRecord<T>> records, String instantTime) {
// 首先獲取table,這里的table為HoodieJavaCopyOnWriteTable
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
// 驗(yàn)證schema
table.validateUpsertSchema();
// 寫之前的一些步驟,比如設(shè)置操作類型
preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient());
// 調(diào)用table.insert執(zhí)行寫數(shù)據(jù)操作,返回result
HoodieWriteMetadata<List<WriteStatus>> result = table.insert(context, instantTime, records);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
}
// 調(diào)用postWrite返回WriteStatuses
return postWrite(result, instantTime, table);
}
postWrite
我們先看一下postWrite的邏輯,首先判斷是否已經(jīng)commit生成了.commit文件,如果是的話,則執(zhí)行archive、clean,也就是archive、clean等操作是在寫操作完成、生成.commit文件之后進(jìn)行的。
/**
* 判斷是否已經(jīng)commit生成了.commit文件,如果是的話,則執(zhí)行archive、clean
*/
protected List<WriteStatus> postWrite(HoodieWriteMetadata<List<WriteStatus>> result,
String instantTime,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(getOperationType().name(), result.getIndexUpdateDuration().get().toMillis());
}
// commit是否已經(jīng)提交,這里主要考慮是否設(shè)置了自動(dòng)提交,hoodie.auto.commit默認(rèn)true
// 如果不是自動(dòng)提交的話,那么我們需要手動(dòng)執(zhí)行clean等操作,然后手動(dòng)commit
// 所以這里默認(rèn)為true
// isCommitted代表著已經(jīng)生成了.commit文件,也就是寫操作成功了,也就是通過table.insert已經(jīng)完成了整個(gè)的寫操作
if (result.isCommitted()) {
// Perform post commit operations.
if (result.getFinalizeDuration().isPresent()) {
metrics.updateFinalizeWriteMetrics(result.getFinalizeDuration().get().toMillis(),
result.getWriteStats().get().size());
}
// postCommit主要是執(zhí)行archive、clean等操作。也就是archive、clean等操作是在寫操作完成,生成.commit文件之后進(jìn)行的。
postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty());
emitCommitMetrics(instantTime, result.getCommitMetadata().get(), hoodieTable.getMetaClient().getCommitActionType());
}
return result.getWriteStatuses();
}
table.insert
先調(diào)用JavaInsertCommitActionExecutor.execute接著調(diào)用JavaWriteHelper.newInstance().write
public HoodieWriteMetadata<List<WriteStatus>> insert(HoodieEngineContext context,
String instantTime,
List<HoodieRecord<T>> records) {
return new JavaInsertCommitActionExecutor<>(context, config,
this, instantTime, records).execute();
}
public HoodieWriteMetadata<List<WriteStatus>> execute() {
return JavaWriteHelper.newInstance().write(instantTime, inputRecords, context, table,
config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false);
}
JavaWriteHelper.write
它的write方法是在其父類AbstractWriteHelper中實(shí)現(xiàn)的,首先首先判斷是否需要去重(通過配置項(xiàng)hoodie.combine.before.insert配置是否需要去重),insert默認(rèn)不需要去重(upsert/delete默認(rèn)需要)。如果需要去重的話調(diào)用方法combineOnCondition先進(jìn)行去重。
然后判斷是否需要tag, tag的作用主要是利用文件中保存的索引信息(默認(rèn)布隆索引),判斷records中的數(shù)據(jù)哪些是新增數(shù)據(jù),哪些是更新數(shù)據(jù),對(duì)于更新的數(shù)據(jù),還要添加上對(duì)應(yīng)的文件位置信息,方便后面更新時(shí)查找對(duì)應(yīng)的parquet文件。由于這里為insert所以不需要tag,這也是insert和upsert一個(gè)比較大的區(qū)別。
我們后面分析upsert源碼時(shí),會(huì)專門分析tag怎么實(shí)現(xiàn)的,本文先略過。然后通過調(diào)用executor.execute執(zhí)行寫操作,返回result,這里的executor為JavaInsertCommitActionExecutor。
public HoodieWriteMetadata<O> write(String instantTime,
I inputRecords,
HoodieEngineContext context,
HoodieTable<T, I, K, O> table,
boolean shouldCombine,
int shuffleParallelism,
BaseCommitActionExecutor<T, I, K, O, R> executor,
boolean performTagging) {
try {
// De-dupe/merge if needed
// 如果開啟了去重,則先去重,insert默認(rèn)不去重
// 配置項(xiàng)hoodie.combine.before.insert
I dedupedRecords =
combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table);
Instant lookupBegin = Instant.now();
I taggedRecords = dedupedRecords;
if (performTagging) { // 是否需要tag,insert為false
// perform index loop up to get existing location of records
// tag的作用主要是利用文件中保存的索引信息(默認(rèn)布隆索引),判斷records中的數(shù)據(jù)哪些是新增數(shù)據(jù),哪些是更新數(shù)據(jù)
// 對(duì)于更新的數(shù)據(jù),還要添加上對(duì)應(yīng)的文件位置信息,方便后面更新時(shí)查找對(duì)應(yīng)的parquet文件
taggedRecords = tag(dedupedRecords, context, table);
}
Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now());
// 通過調(diào)用executor.execute執(zhí)行寫操作,返回result。這里的executor為JavaInsertCommitActionExecutor
HoodieWriteMetadata<O> result = executor.execute(taggedRecords);
result.setIndexLookupDuration(indexLookupDuration);
return result;
} catch (Throwable e) {
if (e instanceof HoodieUpsertException) {
throw (HoodieUpsertException) e;
}
throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e);
}
}
public boolean shouldCombineBeforeInsert() {
return getBoolean(COMBINE_BEFORE_INSERT);
}
public static final ConfigProperty<String> COMBINE_BEFORE_INSERT = ConfigProperty
.key("hoodie.combine.before.insert")
.defaultValue("false")
.withDocumentation("When inserted records share same key, controls whether they should be first combined (i.e de-duplicated) before"
+ " writing to storage.");
JavaInsertCommitActionExecutor.execute
JavaInsertCommitActionExecutor.execute實(shí)際上調(diào)用的父類BaseJavaCommitActionExecutor的execute
首先通過buildProfile構(gòu)建WorkloadProfile,構(gòu)建WorkloadProfile的目的主要是為給getPartitioner使用。WorkloadProfile包含了分區(qū)路徑對(duì)應(yīng)的insert/upsert數(shù)量以及upsert數(shù)據(jù)對(duì)應(yīng)的文件位置信息。數(shù)量信息是為了分桶,或者說是為了分幾個(gè)文件,這里涉及了小文件合并、文件大小等原理,位置信息是為了獲取要更新的文件,也就是對(duì)應(yīng)的fileId。對(duì)于upsert數(shù)據(jù),我們復(fù)用原來的fileId。對(duì)于insert數(shù)據(jù),我們生成新的fileId,如果record數(shù)比較多,則分多個(gè)文件寫。然后將WorkloadProfile元數(shù)據(jù)信息持久化到.inflight文件中,.commit.request->.commit.inflight。這一步主要是為了mor表的rollback,rollback時(shí)可以從.inflight文件中讀取對(duì)應(yīng)的元數(shù)據(jù)信息。然后通過getPartitioner根據(jù)WorkloadProfile獲取partitioner,接著調(diào)用partition方法返回partitionedRecords(<桶號(hào),對(duì)應(yīng)的HoodieRecord>),一個(gè)桶對(duì)應(yīng)一個(gè)文件 fileId。最后再遍歷partitionedRecords,也就是每個(gè)桶執(zhí)行一次寫操作handleInsertPartition/handleUpsertPartition,最后調(diào)用BoundedInMemoryExecutor.execute,利用生產(chǎn)者消費(fèi)者模式寫數(shù)據(jù),關(guān)于如何通過生產(chǎn)者消費(fèi)者模式寫數(shù)據(jù),我已經(jīng)在Hudi源碼|bootstrap源碼分析總結(jié)(寫Hudi)分析過bootstrap的源碼了,原理一樣,不同的是實(shí)現(xiàn)類不一樣,感興趣的可以看看。
關(guān)于tag(索引相關(guān))、WorkloadProfile、getPartitioner、handleInsertPartition/handleUpsertPartition本文講個(gè)大概,可能有不準(zhǔn)確的地方,大家可以先結(jié)合17張圖帶你徹底理解Hudi Upsert原理進(jìn)行學(xué)習(xí),至于具體的源碼分析,限于篇幅及個(gè)人精力,本文先不涉及,會(huì)放在后面的文章單獨(dú)講解,對(duì)于本文可能不準(zhǔn)確的地方,也會(huì)在后面的文章中更新。
public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
WorkloadProfile profile = null;
if (isWorkloadProfileNeeded()) { // 始終為true
// 構(gòu)建WorkloadProfile,構(gòu)建WorkloadProfile的目的主要是為給getPartitioner使用
// WorkloadProfile包含了分區(qū)路徑對(duì)應(yīng)的insert/upsert數(shù)量以及upsert數(shù)據(jù)對(duì)應(yīng)的文件位置信息
// 數(shù)量信息是為了分桶,或者說是為了分幾個(gè)文件,這里涉及了小文件合并、文件大小等原理
// 位置信息是為了獲取要更新的文件
// 對(duì)于upsert數(shù)據(jù),我們復(fù)用原來的fileId
// 對(duì)于insert數(shù)據(jù),我們生成新的fileId,如果record數(shù)比較多,則分多個(gè)文件寫
profile = new WorkloadProfile(buildProfile(inputRecords));
LOG.info("Workload profile :" + profile);
try {
// 將WorkloadProfile元數(shù)據(jù)信息持久化到.inflight文件中,.commit.request->.commit.inflight.
// 這一步主要是為了mor表的rollback,rollback時(shí)可以從.inflight文件中讀取對(duì)應(yīng)的元數(shù)據(jù)信息
saveWorkloadProfileMetadataToInflight(profile, instantTime);
} catch (Exception e) {
HoodieTableMetaClient metaClient = table.getMetaClient();
HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
try {
if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
}
} catch (IOException ex) {
LOG.error("Check file exists failed");
throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex);
}
}
}
// 根據(jù)WorkloadProfile獲取partitioner
final Partitioner partitioner = getPartitioner(profile);
// <桶號(hào),對(duì)應(yīng)的HoodieRecord>,一個(gè)桶對(duì)應(yīng)一個(gè)文件 fileId
Map<Integer, List<HoodieRecord<T>>> partitionedRecords = partition(inputRecords, partitioner);
List<WriteStatus> writeStatuses = new LinkedList<>();
// forEach,每個(gè)桶執(zhí)行一次寫操作handleInsertPartition/handleUpsertPartition
// 最終通過BoundedInMemoryExecutor.execute 生產(chǎn)者消費(fèi)者模式寫數(shù)據(jù)
partitionedRecords.forEach((partition, records) -> {
// 是否更新、刪除
if (WriteOperationType.isChangingRecords(operationType)) {
handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
} else {
handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
}
});
updateIndex(writeStatuses, result);
// commit生成.commit文件,.commit文件的生成標(biāo)記著寫數(shù)據(jù)的完成
updateIndexAndCommitIfNeeded(writeStatuses, result);
return result;
}
commit
上面通過handleInsertPartition/handleUpsertPartition實(shí)際上已經(jīng)完成了數(shù)據(jù)的寫入。但是最后還要生成.commit元數(shù)據(jù)文件,代表一次commit的完成,否則如果沒有生成commit的話,比如只有.commit.request或者commit.inflight,這樣在查詢時(shí)候不會(huì)查到本地寫數(shù)據(jù)生成的文件,而且下次寫數(shù)據(jù)時(shí)會(huì)觸發(fā)rollback來處理。
這里索引相關(guān)的先不看,commit調(diào)用鏈updateIndexAndCommitIfNeeded->commitOnAutoCommit->autoCommit->commit->commit,最終在BaseJavaCommitActionExecutor的commit方法中通過activeTimeline.saveAsComplete生成.commit文件。
前面講過了,在生成.commit文件后,會(huì)調(diào)用postWrite方法觸發(fā)archive、clean等操作。實(shí)際上archive、clean等操作的失敗,不影響本次寫數(shù)據(jù)的成功。比如clean失敗了,可以下次再clean就可以了。所以當(dāng)commit完成后,如果clean失敗了,這樣對(duì)于有失敗機(jī)制的集成工具,比如我們使用的Apache NIFI,是不能將本批次數(shù)據(jù)放進(jìn)失敗隊(duì)列的。PS:當(dāng)本次commit不成功時(shí),我們需要放進(jìn)失敗隊(duì)列,目的是防止數(shù)據(jù)丟失。其實(shí)我們可以自己寫代碼繼承JavaClient類,將postWrite方法和table.insert分開。這樣便于判斷是寫數(shù)據(jù)失敗還是clean失敗,以后我會(huì)分享相關(guān)代碼實(shí)現(xiàn)。
public void updateIndexAndCommitIfNeeded(List<WriteStatus> writeStatuses, HoodieWriteMetadata result) {
Instant indexStartTime = Instant.now();
// Update the index back
List<WriteStatus> statuses = table.getIndex().updateLocation(writeStatuses, context, table);
result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
result.setWriteStatuses(statuses);
result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(result));
commitOnAutoCommit(result);
}
protected void commitOnAutoCommit(HoodieWriteMetadata result) {
// validate commit action before committing result
runPrecommitValidators(result);
// validate commit action before committing result
if (config.shouldAutoCommit()) {
LOG.info("Auto commit enabled: Committing " + instantTime);
autoCommit(extraMetadata, result);
} else {
LOG.info("Auto commit disabled for " + instantTime);
}
}
protected void autoCommit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<O> result) {
this.txnManager.beginTransaction(Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, instantTime)),
lastCompletedTxn.isPresent() ? Option.of(lastCompletedTxn.get().getLeft()) : Option.empty());
try {
TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
result.getCommitMetadata(), config, this.txnManager.getLastCompletedTransactionOwner());
commit(extraMetadata, result);
} finally {
this.txnManager.endTransaction();
}
}
// BaseJavaCommitActionExecutor
protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<List<WriteStatus>> result) {
commit(extraMetadata, result, result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList()));
}
protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<List<WriteStatus>> result, List<HoodieWriteStat> writeStats) {
String actionType = getCommitActionType();
LOG.info("Committing " + instantTime + ", action Type " + actionType);
result.setCommitted(true);
result.setWriteStats(writeStats);
// Finalize write
finalizeWrite(instantTime, writeStats, result);
try {
LOG.info("Committing " + instantTime + ", action Type " + getCommitActionType());
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats, result.getPartitionToReplaceFileIds(),
extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());
// 通過activeTimeline.saveAsComplete生成.commit文件
activeTimeline.saveAsComplete(new HoodieInstant(true, getCommitActionType(), instantTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
LOG.info("Committed " + instantTime);
result.setCommitMetadata(Option.of(metadata));
} catch (IOException e) {
throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
e);
}
}
handleInsertPartition
附handleInsertPartition到生產(chǎn)者消費(fèi)者模式調(diào)用鏈,handleInsertPartition->handleUpsertPartition->handleInsert->JavaLazyInsertIterable.computeNext->BoundedInMemoryExecutor.execute
protected Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr,
Partitioner partitioner) {
return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
}
protected Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr,
Partitioner partitioner) {
JavaUpsertPartitioner javaUpsertPartitioner = (JavaUpsertPartitioner) partitioner;
BucketInfo binfo = javaUpsertPartitioner.getBucketInfo(partition);
BucketType btype = binfo.bucketType;
try {
if (btype.equals(BucketType.INSERT)) {
return handleInsert(binfo.fileIdPrefix, recordItr);
} else if (btype.equals(BucketType.UPDATE)) {
return handleUpdate(binfo.partitionPath, binfo.fileIdPrefix, recordItr);
} else {
throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition);
}
} catch (Throwable t) {
String msg = "Error upserting bucketType " + btype + " for partition :" + partition;
LOG.error(msg, t);
throw new HoodieUpsertException(msg, t);
}
}
public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr) {
// This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
if (!recordItr.hasNext()) {
LOG.info("Empty partition");
return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
}
return new JavaLazyInsertIterable<>(recordItr, true, config, instantTime, table, idPfx,
taskContextSupplier, new CreateHandleFactory<>());
}
// JavaLazyInsertIterable
protected List<WriteStatus> computeNext() {
// Executor service used for launching writer thread.
BoundedInMemoryExecutor<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor =
null;
try {
final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
bufferedIteratorExecutor =
new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr), Option.of(getInsertHandler()), getTransformFunction(schema));
// bufferedIteratorExecutor.execute通過生產(chǎn)者消費(fèi)者模型實(shí)現(xiàn)寫數(shù)據(jù)
final List<WriteStatus> result = bufferedIteratorExecutor.execute();
assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
return result;
} catch (Exception e) {
throw new HoodieException(e);
} finally {
if (null != bufferedIteratorExecutor) {
bufferedIteratorExecutor.shutdownNow();
}
}
}
總結(jié)
本文以Java Client為例,對(duì)Apache Hudi insert源碼進(jìn)行了整體邏輯的分析總結(jié),希望能對(duì)大家有所幫助。由于精力有限,對(duì)于文中提到的WorkloadProfile、getPartitioner、handleInsertPartition/handleUpsertPartition等,我會(huì)在后面的文章再進(jìn)行總結(jié)。并且等insert相關(guān)源碼分析完后,會(huì)再進(jìn)行upsert的源碼分析。可能有些地方不夠準(zhǔn)確,還請(qǐng)大家多多指正,讓我們共同進(jìn)步。
注釋代碼
github: https://github.com/dongkelun/hudi/tree/0.9.0-learning-comments
gitee: https://gitee.com/dongkelun/hudi/tree/0.9.0-learning-comments
