Hudi 源碼 | Hudi 索引:Tag 和 TagLocation
共 53695字,需瀏覽 108分鐘
·
2024-07-25 14:16
前言
接上篇文章和之前的總結(jié)的源碼文章,本文總結(jié)源碼 tag/tagLocation ,對(duì)應(yīng)功能:根據(jù)索引信息判斷記錄是否存在,如果不存在,代表是新增數(shù)據(jù),如果記錄存在則代表是更新數(shù)據(jù),需要找到并設(shè)置 currentLocation。
tag
AbstractWriteHelper.tag
private I tag(
I dedupedRecords, HoodieEngineContext context, HoodieTable<T, I, K, O> table) {
// perform index loop up to get existing location of records
// 執(zhí)行索引循環(huán)以獲取記錄的現(xiàn)有位置
// 對(duì)于 Java Client 這里 table 為 HoodieJavaCopyOnWriteTable
return table.getIndex().tagLocation(dedupedRecords, context, table);
}
table.getIndex()
對(duì)于 Java Client 這里 table 為 HoodieJavaCopyOnWriteTable , HoodieJavaCopyOnWriteTable.getIndex() -> HoodieJavaTable.getIndex
protected HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext context) {
return JavaHoodieIndex.createIndex(config);
}
public static HoodieIndex createIndex(HoodieWriteConfig config) {
// first use index class config to create index.
if (!StringUtils.isNullOrEmpty(config.getIndexClass())) {
Object instance = ReflectionUtils.loadClass(config.getIndexClass(), config);
if (!(instance instanceof HoodieIndex)) {
throw new HoodieIndexException(config.getIndexClass() + " is not a subclass of HoodieIndex");
}
return (JavaHoodieIndex) instance;
}
// TODO more indexes to be added
// 從這里看出,當(dāng)前版本(0.9.0),Java Client 只支持兩種索引類型:INMEMORY 和 BLOOM
switch (config.getIndexType()) {
case INMEMORY:
return new JavaInMemoryHashIndex(config);
case BLOOM:
return new JavaHoodieBloomIndex(config);
default:
throw new HoodieIndexException("Unsupported index type " + config.getIndexType());
}
}
因?yàn)橹付怂饕愋蜑?BLOOM , 所以這里返回 JavaHoodieBloomIndex 。
JavaHoodieBloomIndex
JavaHoodieBloomIndex 的父類為 HoodieBaseBloomIndex ,其 tagLocation 在父類 HoodieBaseBloomIndex
public class JavaHoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieBaseBloomIndex<T> {
public JavaHoodieBloomIndex(HoodieWriteConfig config) {
super(config);
}
}
tagLocation
提取映射:
Map(partitionPath, List) lookupIndex :根據(jù)索引查找每個(gè) recordKey 的 location,返回 每個(gè) recordKey 和 location 的對(duì)應(yīng)關(guān)系:
Map<HoodieKey, HoodieRecordLocation>tagLocationBacktoRecords :根據(jù) lookupIndex 返回的 recordKey 和 location 的對(duì)應(yīng)關(guān)系對(duì)應(yīng)關(guān)系
keyFilenamePair,為每個(gè) HoodieRecord 設(shè)置 currentLocation 。
@Override
public List<HoodieRecord<T>> tagLocation(List<HoodieRecord<T>> records, HoodieEngineContext context,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
// Step 1: Extract out thinner Map of (partitionPath, recordKey)
// 第一步:提取更薄的映射(partitionPath,recordKey)
// (partitionPath, List(recordKey))
Map<String, List<String>> partitionRecordKeyMap = new HashMap<>();
records.forEach(record -> {
// 如果包含記錄對(duì)應(yīng)的分區(qū)路徑
if (partitionRecordKeyMap.containsKey(record.getPartitionPath())) {
// 現(xiàn)有分區(qū)中對(duì)應(yīng)的 List 添加 recordKey
partitionRecordKeyMap.get(record.getPartitionPath()).add(record.getRecordKey());
} else {
// 將 recordKey 添加到 recordKeys 中
List<String> recordKeys = Lists.newArrayList();
recordKeys.add(record.getRecordKey());
// 添加新的分區(qū)路徑和對(duì)應(yīng)的 List(recordKey)
partitionRecordKeyMap.put(record.getPartitionPath(), recordKeys);
}
});
// Step 2: Lookup indexes for all the partition/recordkey pair
// 第二步:根據(jù)索引查找每個(gè) recordKey 的 location,返回 每個(gè) recordKey 和 location 的對(duì)應(yīng)關(guān)系
// (HoodieKey, HoodieRecordLocation)
Map<HoodieKey, HoodieRecordLocation> keyFilenamePairMap =
lookupIndex(partitionRecordKeyMap, context, hoodieTable);
if (LOG.isDebugEnabled()) {
long totalTaggedRecords = keyFilenamePairMap.values().size();
LOG.debug("Number of update records (ones tagged with a fileID): " + totalTaggedRecords);
}
// Step 3: Tag the incoming records, as inserts or updates, by joining with existing record keys
// 第三步:通過與現(xiàn)有recordKey連接,將傳入記錄標(biāo)記為插入或更新
List<HoodieRecord<T>> taggedRecords = tagLocationBacktoRecords(keyFilenamePairMap, records);
return taggedRecords;
}
lookupIndex
查找每個(gè) recordKey 的 location,并返回已存在的所有 recordKey 和 location 的映射: Map<HoodieKey, HoodieRecordLocation>,如果不存在,則刪除記錄鍵。
在傳入記錄中獲取每個(gè)分區(qū)的記錄數(shù) :recordsPerPartition
loadInvolvedFiles :將所有涉及的文件加載為<Partition, filename>對(duì)
explodeRecordsWithFileComparisons :利用區(qū)間樹根據(jù)最大值最小值查找每個(gè) HoodieKey 可能存在于哪個(gè)文件,返回:
List<Pair<fileId, HoodieKey>>,這里多個(gè) fileId 對(duì)應(yīng)一個(gè) HoodieKey ,一個(gè) fileId 對(duì)應(yīng)多個(gè)HoodieKey,類似于笛卡爾積,多對(duì)多的關(guān)系,按照 fileId 排序findMatchingFilesForRecordKeys :遍歷 explodeRecordsWithFileComparisons 返回的
List<Pair<fileId, HoodieKey>>,以 fileId 為維度,利用布隆索引判斷有哪些 HoodieKey 可能存在于該文件中,并添加到候選:candidateRecordKeys ,最后遍歷 candidateRecordKeys ,去 parquet 數(shù)據(jù)文件中確認(rèn)該 key 是否確實(shí)存在于該文件,最后返回確切的 recordKey 和 location 的映射關(guān)系 :Map<HoodieKey, HoodieRecordLocation>
/**
* Lookup the location for each record key and return the pair<record_key,location> for all record keys already
* present and drop the record keys if not present.
*
* 查找每個(gè) recordKey 的 location,并返回已存在的所有 recordKey 的 pair<record_key,location>,如果不存在,則刪除記錄鍵。
*/
private Map<HoodieKey, HoodieRecordLocation> lookupIndex(
Map<String, List<String>> partitionRecordKeyMap, final HoodieEngineContext context,
final HoodieTable hoodieTable) {
// Obtain records per partition, in the incoming records
// 在傳入記錄中獲取每個(gè)分區(qū)的記錄數(shù)
Map<String, Long> recordsPerPartition = new HashMap<>();
// (分區(qū)路徑,每個(gè)分區(qū)路徑對(duì)應(yīng)的記錄數(shù))
partitionRecordKeyMap.keySet().forEach(k -> recordsPerPartition.put(k, Long.valueOf(partitionRecordKeyMap.get(k).size())));
// 所有的分區(qū)路徑
List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());
// Step 2: Load all involved files as <Partition, filename> pairs
// 第二步:將所有涉及的文件加載為<Partition, filename>對(duì)
// List(Partition, BloomIndexFileInfo) BloomIndexFileInfo 包含 fileID,minRecordKey,maxRecordKey
List<Pair<String, BloomIndexFileInfo>> fileInfoList =
loadInvolvedFiles(affectedPartitionPathList, context, hoodieTable);
// Map (Partition, List(BloomIndexFileInfo))
final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
fileInfoList.stream().collect(groupingBy(Pair::getLeft, mapping(Pair::getRight, toList())));
// Step 3: Obtain a List, for each incoming record, that already exists, with the file id,
// that contains it.
// 第三步:為每個(gè)已存在的傳入記錄獲取一個(gè)列表,其中包含該列表的文件id。
// List(fileId, HoodieKey) ,這里多個(gè) fileId 對(duì)應(yīng)一個(gè) HoodieKey ,一個(gè) fileId 對(duì)應(yīng)多個(gè)HoodieKey
// 類似于笛卡爾積,多對(duì)多的關(guān)系,按照 fileId 排序
List<Pair<String, HoodieKey>> fileComparisons =
explodeRecordsWithFileComparisons(partitionToFileInfo, partitionRecordKeyMap);
return findMatchingFilesForRecordKeys(fileComparisons, hoodieTable);
}
loadInvolvedFiles
將所有涉及的文件加載為 List<Pair<partitionPath, BloomIndexFileInfo>>
/**
* Load all involved files as <Partition, filename> pair List.
*
* 將所有涉及的文件加載為<Partition,BloomIndexFileInfo>對(duì)列表。
*/
//TODO duplicate code with spark, we can optimize this method later
List<Pair<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final HoodieEngineContext context,
final HoodieTable hoodieTable) {
// Obtain the latest data files from all the partitions.
// 從所有分區(qū)中獲取最新的數(shù)據(jù)文件。
// List (partitionPath,FileId)
List<Pair<String, String>> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
.map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
.collect(toList());
// 是否需要根據(jù)最大值最小值進(jìn)行第一階段過濾
if (config.getBloomIndexPruneByRanges()) {// 默認(rèn)true
// also obtain file ranges, if range pruning is enabled
context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)");
return context.map(partitionPathFileIDList, pf -> {
try {
HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf);
// 讀取最大值最小值,具體為 parquet 文件元數(shù)據(jù)中的 hoodie_min_record_key 、hoodie_max_record_key
String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
// 返回 (partitionPath, (fileId, hoodie_min_record_key, hoodie_max_record_key))
return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
} catch (MetadataNotFoundException me) {
LOG.warn("Unable to find range metadata in file :" + pf);
return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
}
}, Math.max(partitionPathFileIDList.size(), 1));
} else {
// 返回 (partitionPath, (fileId, null, null))
return partitionPathFileIDList.stream()
.map(pf -> Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()))).collect(toList());
}
Interval Tree
Interval Tree :區(qū)間樹 ,翻譯軟件一般翻譯為間隔樹。
百度百科:區(qū)間樹是在平衡樹基礎(chǔ)上進(jìn)行擴(kuò)展得到的支持以區(qū)間為元素的動(dòng)態(tài)集合的操作,其中每個(gè)節(jié)點(diǎn)的關(guān)鍵值是區(qū)間的左端點(diǎn)。
博客:區(qū)間樹是在紅黑樹基礎(chǔ)上進(jìn)行擴(kuò)展得到的支持以區(qū)間為元素的動(dòng)態(tài)集合的操作,其中每個(gè)節(jié)點(diǎn)的關(guān)鍵值是區(qū)間的左端點(diǎn)。通過建立這種特定的結(jié)構(gòu),可是使區(qū)間的元素的查找和插入都可以在O(lgn)的時(shí)間內(nèi)完成。相比于基礎(chǔ)的紅黑樹數(shù)據(jù)結(jié)構(gòu),增加了一個(gè)max[x],即以x為根的子樹中所有區(qū)間的斷點(diǎn)的最大值
請(qǐng)注意:區(qū)間樹和線段樹不一樣,線段樹是一種特殊的區(qū)間樹。區(qū)間樹:Interval Tree , 線段樹:Segment Tree 。網(wǎng)上有很多博客將區(qū)間樹和線段樹歸為一種。
線段樹:Segment Tree ,線段樹是一種二叉搜索樹,與區(qū)間樹相似,它將一個(gè)區(qū)間劃分成一些單元區(qū)間,每個(gè)單元區(qū)間對(duì)應(yīng)線段樹中的一個(gè)葉結(jié)點(diǎn)。使用線段樹可以快速的查找某一個(gè)節(jié)點(diǎn)在若干條線段中出現(xiàn)的次數(shù),時(shí)間復(fù)雜度為O(logN)。而未優(yōu)化的空間復(fù)雜度為2N,實(shí)際應(yīng)用時(shí)一般還要開4N的數(shù)組以免越界,因此有時(shí)需要離散化讓空間壓縮。
explodeRecordsWithFileComparisons
首先判斷是否需要使用區(qū)間樹基于最小和最大記錄鍵值進(jìn)行過濾,默認(rèn)為true,則創(chuàng)建 IntervalTreeBasedIndexFileFilter 。
IntervalTreeBasedIndexFileFilter :基于區(qū)間樹的索引查找。為每個(gè)分區(qū)構(gòu)建一個(gè){@link KeyRangeLookupTree},并使用它來搜索需要查找的任何給定recordKey的匹配索引文件。
主要邏輯:利用區(qū)間樹根據(jù)最大值最小值,返回可能包含 recordKey 的文件列表。利用區(qū)間樹的原因主要是可以降低查詢時(shí)間。
查詢邏輯:
對(duì)于有最大值和最小值的文件,如果該 recordKey 在最大值最小之區(qū)間內(nèi),則認(rèn)為該文件可能包含 recordKey
對(duì)于沒有最大值和最小值的文件,則認(rèn)為該文件可能包含 recordKey
返回值:List(fileId, HoodieKey) , 多個(gè) fileId 對(duì)應(yīng)一個(gè) HoodieKey
/**
* For each incoming record, produce N output records, 1 each for each file against which the record's key needs to be
* checked. For tables, where the keys have a definite insert order (e.g: timestamp as prefix), the number of files
* to be compared gets cut down a lot from range pruning.
*
* 對(duì)于每個(gè)傳入的記錄,生成N個(gè)輸出記錄,每個(gè)文件1個(gè),需要對(duì)照該記錄的密鑰進(jìn)行檢查。
* 對(duì)于鍵有明確插入順序的表(例如:時(shí)間戳作為前綴),要比較的文件數(shù)量會(huì)因范圍修剪而大大減少。
* <p>
* Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on
* recordKey ranges in the index info.
*
* 子分區(qū),以確保可以根據(jù)文件查找記錄,還可以根據(jù)索引信息中的recordKey范圍修剪文件<=>記錄比較。
*
* 主要邏輯:利用區(qū)間樹根據(jù)最大值最小值,返回可能包含 recordKey 的文件列表。利用區(qū)間樹的原因主要是可以降低查詢時(shí)間。
* 查詢邏輯:1、對(duì)于有最大值和最小值的文件,如果該 recordKey 在最大值最小之區(qū)間內(nèi),則認(rèn)為該文件可能包含 recordKey
* 2、對(duì)于沒有最大值和最小值的文件,則認(rèn)為該文件可能包含 recordKey
*
* 返回值:List(fileId, HoodieKey)
*/
List<Pair<String, HoodieKey>> explodeRecordsWithFileComparisons(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
Map<String, List<String>> partitionRecordKeyMap) {
// 是否使用區(qū)間樹基于最小和最大記錄鍵值進(jìn)行過濾,默認(rèn)為true
IndexFileFilter indexFileFilter =
config.useBloomIndexTreebasedFilter() ? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo)
: new ListBasedIndexFileFilter(partitionToFileIndexInfo);
// List(fileId, HoodieKey)
List<Pair<String, HoodieKey>> fileRecordPairs = new ArrayList<>();
partitionRecordKeyMap.keySet().forEach(partitionPath -> {
List<String> hoodieRecordKeys = partitionRecordKeyMap.get(partitionPath);
hoodieRecordKeys.forEach(hoodieRecordKey -> {
indexFileFilter.getMatchingFilesAndPartition(partitionPath, hoodieRecordKey).forEach(partitionFileIdPair -> {
// (fileId, HoodieKey)
fileRecordPairs.add(Pair.of(partitionFileIdPair.getRight(),
new HoodieKey(hoodieRecordKey, partitionPath)));
});
});
});
return fileRecordPairs;
}
IntervalTreeBasedIndexFileFilter
基于區(qū)間樹的索引查找。為每個(gè)分區(qū)構(gòu)建一個(gè){@link KeyRangeLookupTree},并使用它來搜索需要查找的任何給定recordKey的匹配索引文件。
KeyRangeLookupTree: 基于區(qū)間樹實(shí)現(xiàn)的查找樹,查詢?nèi)我饨o定 Key 的時(shí)間復(fù)雜度為 (N logN)
對(duì)于有有最大值最小值的文件,構(gòu)造為區(qū)間樹:KeyRangeLookupTree
對(duì)于沒有最大值最小值的文件,將 fileId 添加到 partitionToFilesWithNoRanges
IntervalTreeBasedIndexFileFilter(final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo) {
partitionToFileIndexInfo.forEach((partition, bloomIndexFiles) -> {
// Note that the interval tree implementation doesn't have auto-balancing to ensure logN search time.
// 請(qǐng)注意,區(qū)間樹實(shí)現(xiàn)沒有自動(dòng)平衡來確保logN搜索時(shí)間。
// So, we are shuffling the input here hoping the tree will not have any skewness. If not, the tree could be
// skewed which could result in N search time instead of logN.
// 所以,我們?cè)谶@里打亂輸入,希望樹不會(huì)有任何傾斜。否則,樹可能會(huì)傾斜,這可能導(dǎo)致搜索時(shí)間是N而不是logN。
Collections.shuffle(bloomIndexFiles);
KeyRangeLookupTree lookUpTree = new KeyRangeLookupTree();
bloomIndexFiles.forEach(indexFileInfo -> {
if (indexFileInfo.hasKeyRanges()) { // 如果有最大值最小值
// 將 最大值,最小值,fileId 插入到 lookUpTree
// 構(gòu)造間隔數(shù)
lookUpTree.insert(new KeyRangeNode(indexFileInfo.getMinRecordKey(), indexFileInfo.getMaxRecordKey(),
indexFileInfo.getFileId()));
} else {
if (!partitionToFilesWithNoRanges.containsKey(partition)) {
partitionToFilesWithNoRanges.put(partition, new HashSet<>());
}
// 將沒有最大值最小值的 fileId 添加到 partitionToFilesWithNoRanges
partitionToFilesWithNoRanges.get(partition).add(indexFileInfo.getFileId());
}
});
partitionToFileIndexLookUpTree.put(partition, lookUpTree);
});
}
lookUpTree.insert
區(qū)間樹的具體構(gòu)造邏輯。
void insert(KeyRangeNode newNode) {
root = insert(getRoot(), newNode);
}
/**
* Inserts a new {@link KeyRangeNode} to this look up tree.
*
* If no root exists, make {@code newNode} as the root and return the new root.
*
* If current root and newNode matches with min record key and max record key, merge two nodes. In other words, add
* files from {@code newNode} to current root. Return current root.
*
* If current root is < newNode if current root has no right sub tree update current root's right sub tree max and min
* set newNode as right sub tree else update root's right sub tree min and max with newNode's min and max record key
* as applicable recursively call insert() with root's right subtree as new root
*
* else // current root is >= newNode if current root has no left sub tree update current root's left sub tree max and
* min set newNode as left sub tree else update root's left sub tree min and max with newNode's min and max record key
* as applicable recursively call insert() with root's left subtree as new root
*
* @param root refers to the current root of the look up tree
* @param newNode newNode the new {@link KeyRangeNode} to be inserted
*/
private KeyRangeNode insert(KeyRangeNode root, KeyRangeNode newNode) {
if (root == null) {
root = newNode;
return root;
}
if (root.compareTo(newNode) == 0) {
root.addFiles(newNode.getFileNameList());
return root;
}
if (root.compareTo(newNode) < 0) {
if (root.getRight() == null) {
root.setRightSubTreeMax(newNode.getMaxRecordKey());
root.setRightSubTreeMin(newNode.getMinRecordKey());
root.setRight(newNode);
} else {
if (root.getRightSubTreeMax().compareTo(newNode.getMaxRecordKey()) < 0) {
root.setRightSubTreeMax(newNode.getMaxRecordKey());
}
if (root.getRightSubTreeMin().compareTo(newNode.getMinRecordKey()) > 0) {
root.setRightSubTreeMin(newNode.getMinRecordKey());
}
insert(root.getRight(), newNode);
}
} else {
if (root.getLeft() == null) {
root.setLeftSubTreeMax(newNode.getMaxRecordKey());
root.setLeftSubTreeMin(newNode.getMinRecordKey());
root.setLeft(newNode);
} else {
if (root.getLeftSubTreeMax().compareTo(newNode.getMaxRecordKey()) < 0) {
root.setLeftSubTreeMax(newNode.getMaxRecordKey());
}
if (root.getLeftSubTreeMin().compareTo(newNode.getMinRecordKey()) > 0) {
root.setLeftSubTreeMin(newNode.getMinRecordKey());
}
insert(root.getLeft(), newNode);
}
}
return root;
}
getMatchingFilesAndPartition
對(duì)于有最大值最小值的文件,利用區(qū)間樹 KeyRangeLookupTree 查找可能包含該 recordKey 的 fileId 列表 。
根據(jù)文件的最大最小值判斷,如果 recordKey 在最大值最小值區(qū)間,則可能存在該文件中
如果不在最大值最小值區(qū)間,則不存在該文件中
利用區(qū)間樹的原因主要是可以降低查詢時(shí)間。
對(duì)于沒有最大值最小值的文件,則認(rèn)為都可能存在該 recordKey ,所以全部返回
返回值 Set(Pair(partitionPath, fileId))
@Override
public Set<Pair<String, String>> getMatchingFilesAndPartition(String partitionPath, String recordKey) {
// (partitionPath, fileId)
Set<Pair<String, String>> toReturn = new HashSet<>();
// could be null, if there are no files in a given partition yet or if all index files have no ranges
// 如果給定分區(qū)中還沒有文件,或者所有索引文件都沒有范圍,則可能為null
if (partitionToFileIndexLookUpTree.containsKey(partitionPath)) {
// 利用 KeyRangeLookupTree 查找該分區(qū)下有最大值最小值的文件中可能包含該 recordKey 的 fileId 列表
// 查找邏輯:根據(jù)文件的最大最小值判斷,如果 recordKey 在最大值最小值區(qū)間,則可能存在該文件中
// 如果不在最大值最小值區(qū)間,則不存在該文件中
partitionToFileIndexLookUpTree.get(partitionPath).getMatchingIndexFiles(recordKey).forEach(file ->
toReturn.add(Pair.of(partitionPath, file)));
}
if (partitionToFilesWithNoRanges.containsKey(partitionPath)) {
// 對(duì)于沒有最大值最小值的文件,則認(rèn)為都是可能存在該 recordKey ,所以全部返回
partitionToFilesWithNoRanges.get(partitionPath).forEach(file ->
toReturn.add(Pair.of(partitionPath, file)));
}
return toReturn;
}
findMatchingFilesForRecordKeys
找出<RowKey,filename>對(duì)。
/**
* Find out <RowKey, filename> pair.
* 找出<RowKey,filename>對(duì)。
*/
Map<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
List<Pair<String, HoodieKey>> fileComparisons,
HoodieTable hoodieTable) {
// 按照 fileId 排序
fileComparisons = fileComparisons.stream().sorted((o1, o2) -> o1.getLeft().compareTo(o2.getLeft())).collect(toList());
List<HoodieKeyLookupHandle.KeyLookupResult> keyLookupResults = new ArrayList<>();
// 這里實(shí)際返回 LazyKeyCheckIterator,其父類的 LazyIterableIterator 的 next 方法會(huì)調(diào)用 computeNext
Iterator<List<HoodieKeyLookupHandle.KeyLookupResult>> iterator = new HoodieBaseBloomIndexCheckFunction(hoodieTable, config).apply(fileComparisons.iterator());
while (iterator.hasNext()) {
// 這里實(shí)際調(diào)用 LazyKeyCheckIterator.computeNext
// 這里涉及讀取保存在 Parquet文件中的布隆過濾器 BloomFilter
keyLookupResults.addAll(iterator.next());
}
Map<HoodieKey, HoodieRecordLocation> hoodieRecordLocationMap = new HashMap<>();
// 過濾掉 matchingRecordKeys 為空的,matchingRecordKeys 為空代表,沒有一個(gè) recordKey 存在于該文件中
keyLookupResults = keyLookupResults.stream().filter(lr -> lr.getMatchingRecordKeys().size() > 0).collect(toList());
keyLookupResults.forEach(lookupResult -> {
lookupResult.getMatchingRecordKeys().forEach(r -> {
// (HoodieKey, HoodieRecordLocation) ,將 HoodieKey 和 HoodieRecordLocation 關(guān)聯(lián)
hoodieRecordLocationMap.put(new HoodieKey(r, lookupResult.getPartitionPath()), new HoodieRecordLocation(lookupResult.getBaseInstantTime(), lookupResult.getFileId()));
});
});
return hoodieRecordLocationMap;
}
LazyKeyCheckIterator.computeNext
protected List<KeyLookupResult> computeNext() {
List<KeyLookupResult> ret = new ArrayList<>();
try {
// process one file in each go.
// 遍歷 (fileId, HoodieKey)
while (inputItr.hasNext()) {
Pair<String, HoodieKey> currentTuple = inputItr.next();
String fileId = currentTuple.getLeft();
String partitionPath = currentTuple.getRight().getPartitionPath();
String recordKey = currentTuple.getRight().getRecordKey();
// (partitionPath, fileId)
Pair<String, String> partitionPathFilePair = Pair.of(partitionPath, fileId);
// lazily init state
// 延遲初始化狀態(tài)
if (keyLookupHandle == null) {
// 在 HoodieKeyLookupHandle 的構(gòu)造方法中會(huì)讀取保存在Parquet文件中的布隆過濾器信息
// 將其反序列化為 BloomFilter
keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair);
}
// if continue on current file
// 如果繼續(xù)當(dāng)前文件
// (partitionPath, fileId) 確定一個(gè)文件,一個(gè) fileId 對(duì)應(yīng)多個(gè)HoodieKey,
// 所以可能在一個(gè)文件上可能遍歷多次
// 前面已經(jīng)按照 fileId 排序,所以可以保證一個(gè) fileId 對(duì)應(yīng)的記錄是連續(xù)的。
if (keyLookupHandle.getPartitionPathFilePair().equals(partitionPathFilePair)) {
// 添加 recordKey
// 這里利用布隆過濾器進(jìn)行二次過濾,將命中(可能存在于該文件中)的 recordKey 添加到 candidateRecordKeys (候選RecordKeys)
// bloomFilter.mightContain(recordKey) 判斷該recordKey 是否可能存在于該文件
keyLookupHandle.addKey(recordKey);
} else { // 如果上一個(gè)文件結(jié)束
// do the actual checking of file & break out
// 進(jìn)行文件的實(shí)際檢查和分解
// 將 keyLookupHandle.getLookupResult 查詢結(jié)果添加到返回值 ret 中
ret.add(keyLookupHandle.getLookupResult());
// 新文件的 HoodieKeyLookupHandle
keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair);
// 添加 recordKey
// 這里利用布隆過濾器進(jìn)行二次過濾,將命中(可能存在于該文件中)的 recordKey 添加到 candidateRecordKeys (候選RecordKeys)
// bloomFilter.mightContain(recordKey) 判斷該recordKey 是否可能存在于該文件
keyLookupHandle.addKey(recordKey);
break;
}
}
// handle case, where we ran out of input, close pending work, update return val
// 處理輸入不足的情況,關(guān)閉待處理的工作,更新返回值
if (!inputItr.hasNext()) {
// 遍歷結(jié)束,將 getLookupResult 的返回值添加到 ret 中
ret.add(keyLookupHandle.getLookupResult());
}
} catch (Throwable e) {
if (e instanceof HoodieException) {
throw e;
}
throw new HoodieIndexException("Error checking bloom filter index. ", e);
}
return ret;
}
createNewFileReader().readBloomFilter()
this.bloomFilter = createNewFileReader().readBloomFilter();
// HoodieParquetReader.readBloomFilter
public BloomFilter readBloomFilter() {
return parquetUtils.readBloomFilterFromMetadata(conf, path);
}
// BaseFileUtils.readBloomFilterFromMetadata
/**
* Read the bloom filter from the metadata of the given data file.
* 從給定數(shù)據(jù)文件的元數(shù)據(jù)中讀取布隆過濾器。
* @param configuration Configuration
* @param filePath The data file path
* @return a BloomFilter object
*/
public BloomFilter readBloomFilterFromMetadata(Configuration configuration, Path filePath) {
Map<String, String> footerVals =
readFooter(configuration, false, filePath,
HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY,
HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY,
HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE);
String footerVal = footerVals.get(HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY);
if (null == footerVal) {
// We use old style key "com.uber.hoodie.bloomfilter"
footerVal = footerVals.get(HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY);
}
BloomFilter toReturn = null;
if (footerVal != null) {
if (footerVals.containsKey(HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE)) {
toReturn = BloomFilterFactory.fromString(footerVal,
footerVals.get(HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE));
} else {
toReturn = BloomFilterFactory.fromString(footerVal, BloomFilterTypeCode.SIMPLE.name());
}
}
return toReturn;
}
keyLookupHandle.addKey
public void addKey(String recordKey) {
// check record key against bloom filter of current file & add to possible keys if needed
// 根據(jù)當(dāng)前文件的布隆過濾器檢查記錄鍵,并在需要時(shí)添加可能的鍵
if (bloomFilter.mightContain(recordKey)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Record key " + recordKey + " matches bloom filter in " + partitionPathFilePair);
}
// 如果命中的話,添加到候選
candidateRecordKeys.add(recordKey);
}
totalKeysChecked++;
}
getLookupResult
在所有添加的鍵中,返回在文件組中實(shí)際找到的鍵的列表。
/**
* Of all the keys, that were added, return a list of keys that were actually found in the file group.
*
* 在所有添加的鍵中,返回在文件組中實(shí)際找到的鍵的列表。
*/
public KeyLookupResult getLookupResult() {
if (LOG.isDebugEnabled()) {
LOG.debug("#The candidate row keys for " + partitionPathFilePair + " => " + candidateRecordKeys);
}
HoodieBaseFile dataFile = getLatestDataFile();
// 調(diào)用 checkCandidatesAgainstFile 返回在文件組中實(shí)際找到的 RecordKeys。
List<String> matchingKeys =
checkCandidatesAgainstFile(hoodieTable.getHadoopConf(), candidateRecordKeys, new Path(dataFile.getPath()));
LOG.info(
String.format("Total records (%d), bloom filter candidates (%d)/fp(%d), actual matches (%d)", totalKeysChecked,
candidateRecordKeys.size(), candidateRecordKeys.size() - matchingKeys.size(), matchingKeys.size()));
return new KeyLookupResult(partitionPathFilePair.getRight(), partitionPathFilePair.getLeft(),
dataFile.getCommitTime(), matchingKeys);
}
/**
* Given a list of row keys and one file, return only row keys existing in that file.
*
* 給定一個(gè)行鍵列表和一個(gè)文件,只返回該文件中存在的行鍵。
* 這里拿候選的 RecordKeys 去實(shí)際的 parquet文件中做一一比對(duì),看是否確實(shí)存在于該 parquet文件中
* 返回過濾后的實(shí)際存在于該 parquet文件中的 RecordKeys
*/
public List<String> checkCandidatesAgainstFile(Configuration configuration, List<String> candidateRecordKeys,
Path filePath) throws HoodieIndexException {
List<String> foundRecordKeys = new ArrayList<>();
try {
// Load all rowKeys from the file, to double-confirm
if (!candidateRecordKeys.isEmpty()) {
HoodieTimer timer = new HoodieTimer().startTimer();
// 這里拿候選的 RecordKeys 去實(shí)際的 parquet文件中做一一比對(duì),看是否確實(shí)存在于該 parquet文件中
// 返回過濾后的實(shí)際存在于該 parquet文件中的 RecordKeys
Set<String> fileRowKeys = createNewFileReader().filterRowKeys(new HashSet<>(candidateRecordKeys));
foundRecordKeys.addAll(fileRowKeys);
LOG.info(String.format("Checked keys against file %s, in %d ms. #candidates (%d) #found (%d)", filePath,
timer.endTimer(), candidateRecordKeys.size(), foundRecordKeys.size()));
if (LOG.isDebugEnabled()) {
LOG.debug("Keys matching for file " + filePath + " => " + foundRecordKeys);
}
}
} catch (Exception e) {
throw new HoodieIndexException("Error checking candidate keys against file.", e);
}
return foundRecordKeys;
}
tagLocationBacktoRecords
在第二步已經(jīng)通過 lookupIndex 獲取的 HoodieKey 和 HoodieRecordLocation 的對(duì)應(yīng)關(guān)系,tagLocationBacktoRecords 就是根據(jù) lookupIndex 返回的對(duì)應(yīng)關(guān)系 keyFilenamePair ,為每個(gè) HoodieRecord 設(shè)置 currentLocation 。
有的 HoodieRecord 可能沒有對(duì)應(yīng)的 fileId,所以也就不會(huì)設(shè)置 currentLocation 。
/**
* Tag the <rowKey, filename> back to the original HoodieRecord List.
*
* 將<rowKey,filename>標(biāo)記回原始的 HoodieRecord 列表。
* 其實(shí)就是設(shè)置 HoodieRecord 的 currentLocation
*/
protected List<HoodieRecord<T>> tagLocationBacktoRecords(
Map<HoodieKey, HoodieRecordLocation> keyFilenamePair, List<HoodieRecord<T>> records) {
// (HoodieKey, HoodieRecord)
Map<HoodieKey, HoodieRecord<T>> keyRecordPairMap = new HashMap<>();
records.forEach(r -> keyRecordPairMap.put(r.getKey(), r));
// Here as the record might have more data than rowKey (some rowKeys' fileId is null),
// so we do left outer join.
// 在這里,由于記錄可能比rowKey有更多的數(shù)據(jù)(一些rowKeys的fileId為空),因此我們進(jìn)行了左外連接。
List<Pair<HoodieRecord<T>, HoodieRecordLocation>> newList = new ArrayList<>();
keyRecordPairMap.keySet().forEach(k -> {
if (keyFilenamePair.containsKey(k)) { // 如果存在,代表該 key 已經(jīng)存在于文件中
//(HoodieRecord, HoodieRecordLocation) 根據(jù)該 key 獲取對(duì)應(yīng)的 HoodieRecordLocation
newList.add(Pair.of(keyRecordPairMap.get(k), keyFilenamePair.get(k)));
} else {
// 否則,沒有對(duì)應(yīng)的文件,添加為 null
newList.add(Pair.of(keyRecordPairMap.get(k), null));
}
});
List<HoodieRecord<T>> res = Lists.newArrayList();
for (Pair<HoodieRecord<T>, HoodieRecordLocation> v : newList) {
// 通過 HoodieIndexUtils.getTaggedRecord 設(shè)置每個(gè) HoodieRecord 的 currentLocation
res.add(HoodieIndexUtils.getTaggedRecord(v.getLeft(), Option.ofNullable(v.getRight())));
}
return res;
}
總結(jié)
tag/tagLocation :根據(jù)索引信息判斷記錄是否存在,如果不存在,代表是新增數(shù)據(jù),如果記錄存在則代表是更新數(shù)據(jù),需要找到并設(shè)置 currentLocation。
tag : table.getIndex().tagLocation -> JavaHoodieBloomIndex.tagLocation -> HoodieBaseBloomIndex.tagLocation
tagLocation 會(huì)利用上篇文章講的寫到 parquet 文件中的 最大值最小值和布隆過濾器
最大值最小值用在第一階段的過濾:構(gòu)造區(qū)間樹 (Interval Tree),利用區(qū)間樹查找 每個(gè) recordKey 可能存在于哪些文件中,利用區(qū)間樹的有點(diǎn)在于可以加速查找,時(shí)間復(fù)雜度為 O(logN)。
對(duì)于有最大值和最小值的文件,如果該 recordKey 在最大值最小之區(qū)間內(nèi),則認(rèn)為該文件可能包含 recordKey
對(duì)于沒有最大值和最小值的文件,則認(rèn)為該文件可能包含 recordKey
所以這里返回的是多對(duì)多的關(guān)系,類似于笛卡爾積。即一個(gè)文件可能保存多個(gè) recordKey ,一個(gè) recordKey 可能存在于多個(gè)文件中。
所以對(duì)于 recordKey 有明確的順序關(guān)系的(例如:時(shí)間戳作為前綴),要比較的文件數(shù)量會(huì)因范圍修剪而大大減少。這樣不僅可以加速查找時(shí)間,還會(huì)提高查詢精確度,也就是返回的 recordKey 和 location 的關(guān)系數(shù)量會(huì)少許多。
對(duì)于像字符串這種沒有順序關(guān)系的(hash值) recordKey ,會(huì)導(dǎo)致每個(gè)文件的最大值最小值區(qū)間范圍都會(huì)比較大,這樣 recordKey 就會(huì)可能存在于多個(gè)文件中,導(dǎo)致返回的對(duì)應(yīng)關(guān)系特別大,不僅影響區(qū)間樹的查詢效率,還會(huì)影響后面的遍歷性能。而且這種對(duì)應(yīng)關(guān)系也會(huì)占比較大的內(nèi)存,比如
本批次recordKey 的數(shù)量有 10萬 ,文件數(shù)有 1000個(gè),那個(gè)最后返回的 List(fileId, HoodieKey) 的數(shù)量可能會(huì)達(dá)到 10w * 1000 = 1億個(gè),這是最壞的情況,但是一般情況下也會(huì)達(dá)到幾千萬個(gè)。所以對(duì)于沒有順序關(guān)系的 recordKey ,我們可能禁用第一階段的利用區(qū)間樹過濾,效率可能會(huì)更好一些。相關(guān)參數(shù) :hoodie.bloom.index.use.treebased.filter = false , hoodie.bloom.index.prune.by.ranges = false
布隆過濾器用在第二階段的過濾,遍歷第一階段返回的 List(fileId, HoodieKey) ,利用從parquet文件中反序列化的來的布隆過濾器進(jìn)行二次過濾,判斷哪些 HoodieKey 有可能存在于該 fileId 中,如果可能存在則添加到候選:candidateRecordKeys
布隆過濾器的優(yōu)點(diǎn)是空間效率和查詢時(shí)間都比一般的算法要好的多,缺點(diǎn)是有一定的誤識(shí)別率和刪除困難。所以只能判斷有可能存在,還需要去和實(shí)際的數(shù)據(jù)文件去對(duì)比,進(jìn)一步確認(rèn)是否確實(shí)存在于該文件中。
然后遍歷 candidateRecordKeys ,去遍歷每個(gè)parquet數(shù)據(jù)文件,和 parquet 文件中的 key 進(jìn)行實(shí)際的比較,對(duì)于確實(shí)存在于該文件中的,返回實(shí)際的 HoodieKey 和 Location 的對(duì)應(yīng)關(guān)系:
Map<HoodieKey, HoodieRecordLocation>最后根據(jù)上一步返回的 HoodieKey 和 Location 的對(duì)應(yīng)關(guān)系,為每個(gè) HoodieRecord 設(shè)置 currentLocation ,有的 HoodieRecord 沒有對(duì)應(yīng)的 fileId ,所以不需要設(shè)置 currentLocation。
無論是區(qū)間樹查詢最大值最小值,還是反序列化布隆過濾,都僅涉及讀取文件頁腳,所以讀取成本較低
隨著表數(shù)據(jù)量的增加、數(shù)據(jù)文件數(shù)的增加,會(huì)導(dǎo)致兩個(gè)問題,從而使索引性能越來越差。
占用內(nèi)存增加:前面提到有笛卡爾積,文件數(shù)越多,笛卡爾積越大,從而占用內(nèi)存也會(huì)增加,遍歷耗時(shí)也會(huì)增加。另外布隆過濾器 BitSet 也會(huì)隨著每個(gè)文件的 recordKey 的數(shù)量的增加越來越大,從而導(dǎo)致布隆過濾器占用的內(nèi)存也越來越大。
我們?cè)诶脜^(qū)間樹和布隆過濾器過濾完每個(gè)key可能存在于哪些文件中之后,會(huì)使用篩選后的鍵和關(guān)聯(lián)的base文件(這里為parquet) 執(zhí)行實(shí)際的文件查找 。因?yàn)檫@里要加載所有涉及的整個(gè)parquet文件內(nèi)容,隨著文件數(shù)量的增大和文件大小的增加,都會(huì)導(dǎo)致遍歷查詢這些parquet文件的時(shí)間越來越長,從而導(dǎo)致索引性能越來越差。
?? 分享、點(diǎn)贊、在看,給個(gè)3連擊唄!??
