<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Hudi 源碼 | Hudi 索引:Tag 和 TagLocation

          共 53695字,需瀏覽 108分鐘

           ·

          2024-07-25 14:16

          前言

          上篇文章和之前的總結(jié)的源碼文章,本文總結(jié)源碼 tag/tagLocation ,對(duì)應(yīng)功能:根據(jù)索引信息判斷記錄是否存在,如果不存在,代表是新增數(shù)據(jù),如果記錄存在則代表是更新數(shù)據(jù),需要找到并設(shè)置 currentLocation。

          tag

          AbstractWriteHelper.tag

            private I tag(
                I dedupedRecords, HoodieEngineContext context, HoodieTable<T, I, K, O> table)
           
          {
              // perform index loop up to get existing location of records
              // 執(zhí)行索引循環(huán)以獲取記錄的現(xiàn)有位置
              // 對(duì)于 Java Client 這里 table 為 HoodieJavaCopyOnWriteTable
              return table.getIndex().tagLocation(dedupedRecords, context, table);
            }

          table.getIndex()

          對(duì)于 Java Client 這里 table 為 HoodieJavaCopyOnWriteTable , HoodieJavaCopyOnWriteTable.getIndex() -> HoodieJavaTable.getIndex

            protected HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext context) {
              return JavaHoodieIndex.createIndex(config);
            }

            public static HoodieIndex createIndex(HoodieWriteConfig config) {
              // first use index class config to create index.
              if (!StringUtils.isNullOrEmpty(config.getIndexClass())) {
                Object instance = ReflectionUtils.loadClass(config.getIndexClass(), config);
                if (!(instance instanceof HoodieIndex)) {
                  throw new HoodieIndexException(config.getIndexClass() + " is not a subclass of HoodieIndex");
                }
                return (JavaHoodieIndex) instance;
              }

              // TODO more indexes to be added
              // 從這里看出,當(dāng)前版本(0.9.0),Java Client 只支持兩種索引類型:INMEMORY 和 BLOOM
              switch (config.getIndexType()) {
                case INMEMORY:
                  return new JavaInMemoryHashIndex(config);
                case BLOOM:
                  return new JavaHoodieBloomIndex(config);
                default:
                  throw new HoodieIndexException("Unsupported index type " + config.getIndexType());
              }
            } 

          因?yàn)橹付怂饕愋蜑?BLOOM , 所以這里返回 JavaHoodieBloomIndex 。

          JavaHoodieBloomIndex

          JavaHoodieBloomIndex 的父類為 HoodieBaseBloomIndex ,其 tagLocation 在父類 HoodieBaseBloomIndex

          public class JavaHoodieBloomIndex<T extends HoodieRecordPayloadextends HoodieBaseBloomIndex<T{
            public JavaHoodieBloomIndex(HoodieWriteConfig config) {
              super(config);
            }
          }

          tagLocation

          1. 提取映射:Map(partitionPath, List )

          2. lookupIndex :根據(jù)索引查找每個(gè) recordKey 的 location,返回 每個(gè) recordKey 和 location 的對(duì)應(yīng)關(guān)系: Map<HoodieKey, HoodieRecordLocation>

          3. tagLocationBacktoRecords :根據(jù) lookupIndex 返回的 recordKey 和 location 的對(duì)應(yīng)關(guān)系對(duì)應(yīng)關(guān)系 keyFilenamePair ,為每個(gè) HoodieRecord 設(shè)置 currentLocation 。

            @Override
            public List<HoodieRecord<T>> tagLocation(List<HoodieRecord<T>> records, HoodieEngineContext context,
                                                     HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
              // Step 1: Extract out thinner Map of (partitionPath, recordKey)
              // 第一步:提取更薄的映射(partitionPath,recordKey)
              // (partitionPath, List(recordKey))
              Map<String, List<String>> partitionRecordKeyMap = new HashMap<>();
              records.forEach(record -> {
                // 如果包含記錄對(duì)應(yīng)的分區(qū)路徑
                if (partitionRecordKeyMap.containsKey(record.getPartitionPath())) {
                  // 現(xiàn)有分區(qū)中對(duì)應(yīng)的 List 添加 recordKey
                  partitionRecordKeyMap.get(record.getPartitionPath()).add(record.getRecordKey());
                } else {
                  // 將 recordKey 添加到 recordKeys 中
                  List<String> recordKeys = Lists.newArrayList();
                  recordKeys.add(record.getRecordKey());
                  // 添加新的分區(qū)路徑和對(duì)應(yīng)的  List(recordKey)
                  partitionRecordKeyMap.put(record.getPartitionPath(), recordKeys);
                }
              });

              // Step 2: Lookup indexes for all the partition/recordkey pair
              // 第二步:根據(jù)索引查找每個(gè) recordKey 的 location,返回 每個(gè) recordKey 和 location 的對(duì)應(yīng)關(guān)系
              // (HoodieKey, HoodieRecordLocation)
              Map<HoodieKey, HoodieRecordLocation> keyFilenamePairMap =
                  lookupIndex(partitionRecordKeyMap, context, hoodieTable);

              if (LOG.isDebugEnabled()) {
                long totalTaggedRecords = keyFilenamePairMap.values().size();
                LOG.debug("Number of update records (ones tagged with a fileID): " + totalTaggedRecords);
              }

              // Step 3: Tag the incoming records, as inserts or updates, by joining with existing record keys
              // 第三步:通過與現(xiàn)有recordKey連接,將傳入記錄標(biāo)記為插入或更新
              List<HoodieRecord<T>> taggedRecords = tagLocationBacktoRecords(keyFilenamePairMap, records);

              return taggedRecords;
            }

          lookupIndex

          查找每個(gè) recordKey 的 location,并返回已存在的所有 recordKey 和 location 的映射: Map<HoodieKey, HoodieRecordLocation>,如果不存在,則刪除記錄鍵。

          1. 在傳入記錄中獲取每個(gè)分區(qū)的記錄數(shù) :recordsPerPartition

          2. loadInvolvedFiles :將所有涉及的文件加載為<Partition, filename>對(duì)

          3. explodeRecordsWithFileComparisons :利用區(qū)間樹根據(jù)最大值最小值查找每個(gè) HoodieKey 可能存在于哪個(gè)文件,返回:List<Pair<fileId, HoodieKey>> ,這里多個(gè) fileId 對(duì)應(yīng)一個(gè) HoodieKey ,一個(gè) fileId 對(duì)應(yīng)多個(gè)HoodieKey,類似于笛卡爾積,多對(duì)多的關(guān)系,按照 fileId 排序

          4. findMatchingFilesForRecordKeys :遍歷 explodeRecordsWithFileComparisons 返回的 List<Pair<fileId, HoodieKey>> ,以 fileId 為維度,利用布隆索引判斷有哪些 HoodieKey 可能存在于該文件中,并添加到候選:candidateRecordKeys ,最后遍歷 candidateRecordKeys ,去 parquet 數(shù)據(jù)文件中確認(rèn)該 key 是否確實(shí)存在于該文件,最后返回確切的 recordKey 和 location 的映射關(guān)系 :Map<HoodieKey, HoodieRecordLocation>

            /**
             * Lookup the location for each record key and return the pair<record_key,location> for all record keys already
             * present and drop the record keys if not present.
             *
             * 查找每個(gè) recordKey 的 location,并返回已存在的所有 recordKey 的 pair<record_key,location>,如果不存在,則刪除記錄鍵。
             */

            private Map<HoodieKey, HoodieRecordLocation> lookupIndex(
                Map<String, List<String>> partitionRecordKeyMap, final HoodieEngineContext context,
                final HoodieTable hoodieTable)
           
          {
              // Obtain records per partition, in the incoming records
              // 在傳入記錄中獲取每個(gè)分區(qū)的記錄數(shù)
              Map<String, Long> recordsPerPartition = new HashMap<>();
              // (分區(qū)路徑,每個(gè)分區(qū)路徑對(duì)應(yīng)的記錄數(shù))
              partitionRecordKeyMap.keySet().forEach(k -> recordsPerPartition.put(k, Long.valueOf(partitionRecordKeyMap.get(k).size())));
              // 所有的分區(qū)路徑
              List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());

              // Step 2: Load all involved files as <Partition, filename> pairs
              // 第二步:將所有涉及的文件加載為<Partition, filename>對(duì)
              // List(Partition, BloomIndexFileInfo) BloomIndexFileInfo 包含 fileID,minRecordKey,maxRecordKey
              List<Pair<String, BloomIndexFileInfo>> fileInfoList =
                  loadInvolvedFiles(affectedPartitionPathList, context, hoodieTable);
              // Map (Partition, List(BloomIndexFileInfo))
              final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
                  fileInfoList.stream().collect(groupingBy(Pair::getLeft, mapping(Pair::getRight, toList())));

              // Step 3: Obtain a List, for each incoming record, that already exists, with the file id,
              // that contains it.
              // 第三步:為每個(gè)已存在的傳入記錄獲取一個(gè)列表,其中包含該列表的文件id。
              // List(fileId, HoodieKey) ,這里多個(gè) fileId 對(duì)應(yīng)一個(gè) HoodieKey ,一個(gè) fileId 對(duì)應(yīng)多個(gè)HoodieKey
              // 類似于笛卡爾積,多對(duì)多的關(guān)系,按照 fileId 排序
              List<Pair<String, HoodieKey>> fileComparisons =
                  explodeRecordsWithFileComparisons(partitionToFileInfo, partitionRecordKeyMap);
              return findMatchingFilesForRecordKeys(fileComparisons, hoodieTable);
            }

          loadInvolvedFiles

          將所有涉及的文件加載為 List<Pair<partitionPath, BloomIndexFileInfo>>

            /**
             * Load all involved files as <Partition, filename> pair List.
             *
             * 將所有涉及的文件加載為<Partition,BloomIndexFileInfo>對(duì)列表。
             */

            //TODO duplicate code with spark, we can optimize this method later
            List<Pair<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final HoodieEngineContext context,
                                                                     final HoodieTable hoodieTable) {
              // Obtain the latest data files from all the partitions.
              // 從所有分區(qū)中獲取最新的數(shù)據(jù)文件。
              // List (partitionPath,FileId)
              List<Pair<String, String>> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
                  .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
                  .collect(toList());

              // 是否需要根據(jù)最大值最小值進(jìn)行第一階段過濾
              if (config.getBloomIndexPruneByRanges()) {// 默認(rèn)true
                // also obtain file ranges, if range pruning is enabled
                context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)");
                return context.map(partitionPathFileIDList, pf -> {
                  try {
                    HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf);
                    // 讀取最大值最小值,具體為 parquet 文件元數(shù)據(jù)中的 hoodie_min_record_key 、hoodie_max_record_key
                    String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
                    // 返回 (partitionPath, (fileId, hoodie_min_record_key, hoodie_max_record_key))
                    return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
                  } catch (MetadataNotFoundException me) {
                    LOG.warn("Unable to find range metadata in file :" + pf);
                    return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
                  }
                }, Math.max(partitionPathFileIDList.size(), 1));
              } else {
                // 返回 (partitionPath, (fileId, null, null))
                return partitionPathFileIDList.stream()
                    .map(pf -> Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()))).collect(toList());
              }

          Interval Tree

          Interval Tree :區(qū)間樹 ,翻譯軟件一般翻譯為間隔樹。

          • 百度百科:區(qū)間樹是在平衡樹基礎(chǔ)上進(jìn)行擴(kuò)展得到的支持以區(qū)間為元素的動(dòng)態(tài)集合的操作,其中每個(gè)節(jié)點(diǎn)的關(guān)鍵值是區(qū)間的左端點(diǎn)。

          • 博客:區(qū)間樹是在紅黑樹基礎(chǔ)上進(jìn)行擴(kuò)展得到的支持以區(qū)間為元素的動(dòng)態(tài)集合的操作,其中每個(gè)節(jié)點(diǎn)的關(guān)鍵值是區(qū)間的左端點(diǎn)。通過建立這種特定的結(jié)構(gòu),可是使區(qū)間的元素的查找和插入都可以在O(lgn)的時(shí)間內(nèi)完成。相比于基礎(chǔ)的紅黑樹數(shù)據(jù)結(jié)構(gòu),增加了一個(gè)max[x],即以x為根的子樹中所有區(qū)間的斷點(diǎn)的最大值

          • 請(qǐng)注意:區(qū)間樹和線段樹不一樣,線段樹是一種特殊的區(qū)間樹。區(qū)間樹:Interval Tree , 線段樹:Segment Tree 。網(wǎng)上有很多博客將區(qū)間樹和線段樹歸為一種。

          • 線段樹:Segment Tree ,線段樹是一種二叉搜索樹,與區(qū)間樹相似,它將一個(gè)區(qū)間劃分成一些單元區(qū)間,每個(gè)單元區(qū)間對(duì)應(yīng)線段樹中的一個(gè)葉結(jié)點(diǎn)。使用線段樹可以快速的查找某一個(gè)節(jié)點(diǎn)在若干條線段中出現(xiàn)的次數(shù),時(shí)間復(fù)雜度為O(logN)。而未優(yōu)化的空間復(fù)雜度為2N,實(shí)際應(yīng)用時(shí)一般還要開4N的數(shù)組以免越界,因此有時(shí)需要離散化讓空間壓縮。

          explodeRecordsWithFileComparisons

          • 首先判斷是否需要使用區(qū)間樹基于最小和最大記錄鍵值進(jìn)行過濾,默認(rèn)為true,則創(chuàng)建 IntervalTreeBasedIndexFileFilter 。

          • IntervalTreeBasedIndexFileFilter :基于區(qū)間樹的索引查找。為每個(gè)分區(qū)構(gòu)建一個(gè){@link KeyRangeLookupTree},并使用它來搜索需要查找的任何給定recordKey的匹配索引文件。

          • 主要邏輯:利用區(qū)間樹根據(jù)最大值最小值,返回可能包含 recordKey 的文件列表。利用區(qū)間樹的原因主要是可以降低查詢時(shí)間。

          • 查詢邏輯:

            • 對(duì)于有最大值和最小值的文件,如果該 recordKey 在最大值最小之區(qū)間內(nèi),則認(rèn)為該文件可能包含 recordKey

            • 對(duì)于沒有最大值和最小值的文件,則認(rèn)為該文件可能包含 recordKey

          • 返回值:List(fileId, HoodieKey) , 多個(gè) fileId 對(duì)應(yīng)一個(gè) HoodieKey

            /**
             * For each incoming record, produce N output records, 1 each for each file against which the record's key needs to be
             * checked. For tables, where the keys have a definite insert order (e.g: timestamp as prefix), the number of files
             * to be compared gets cut down a lot from range pruning.
             *
             * 對(duì)于每個(gè)傳入的記錄,生成N個(gè)輸出記錄,每個(gè)文件1個(gè),需要對(duì)照該記錄的密鑰進(jìn)行檢查。
             * 對(duì)于鍵有明確插入順序的表(例如:時(shí)間戳作為前綴),要比較的文件數(shù)量會(huì)因范圍修剪而大大減少。
             * <p>
             * Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on
             * recordKey ranges in the index info.
             *
             * 子分區(qū),以確保可以根據(jù)文件查找記錄,還可以根據(jù)索引信息中的recordKey范圍修剪文件<=>記錄比較。
             *
             * 主要邏輯:利用區(qū)間樹根據(jù)最大值最小值,返回可能包含 recordKey 的文件列表。利用區(qū)間樹的原因主要是可以降低查詢時(shí)間。
             * 查詢邏輯:1、對(duì)于有最大值和最小值的文件,如果該 recordKey 在最大值最小之區(qū)間內(nèi),則認(rèn)為該文件可能包含 recordKey
             *         2、對(duì)于沒有最大值和最小值的文件,則認(rèn)為該文件可能包含 recordKey
             *
             * 返回值:List(fileId, HoodieKey)
             */

            List<Pair<String, HoodieKey>> explodeRecordsWithFileComparisons(
                final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
                Map<String, List<String>> partitionRecordKeyMap) {
              // 是否使用區(qū)間樹基于最小和最大記錄鍵值進(jìn)行過濾,默認(rèn)為true
              IndexFileFilter indexFileFilter =
                  config.useBloomIndexTreebasedFilter() ? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo)
                      : new ListBasedIndexFileFilter(partitionToFileIndexInfo);
              // List(fileId, HoodieKey)
              List<Pair<String, HoodieKey>> fileRecordPairs = new ArrayList<>();
              partitionRecordKeyMap.keySet().forEach(partitionPath ->  {
                List<String> hoodieRecordKeys = partitionRecordKeyMap.get(partitionPath);
                hoodieRecordKeys.forEach(hoodieRecordKey -> {
                  indexFileFilter.getMatchingFilesAndPartition(partitionPath, hoodieRecordKey).forEach(partitionFileIdPair -> {
                    // (fileId, HoodieKey)
                    fileRecordPairs.add(Pair.of(partitionFileIdPair.getRight(),
                        new HoodieKey(hoodieRecordKey, partitionPath)));
                  });
                });
              });
              return fileRecordPairs;
            }

          IntervalTreeBasedIndexFileFilter

          基于區(qū)間樹的索引查找。為每個(gè)分區(qū)構(gòu)建一個(gè){@link KeyRangeLookupTree},并使用它來搜索需要查找的任何給定recordKey的匹配索引文件。

          • KeyRangeLookupTree: 基于區(qū)間樹實(shí)現(xiàn)的查找樹,查詢?nèi)我饨o定 Key 的時(shí)間復(fù)雜度為 (N logN)

          • 對(duì)于有有最大值最小值的文件,構(gòu)造為區(qū)間樹:KeyRangeLookupTree

          • 對(duì)于沒有最大值最小值的文件,將 fileId 添加到 partitionToFilesWithNoRanges

            IntervalTreeBasedIndexFileFilter(final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo) {
              partitionToFileIndexInfo.forEach((partition, bloomIndexFiles) -> {
                // Note that the interval tree implementation doesn't have auto-balancing to ensure logN search time.
                // 請(qǐng)注意,區(qū)間樹實(shí)現(xiàn)沒有自動(dòng)平衡來確保logN搜索時(shí)間。
                // So, we are shuffling the input here hoping the tree will not have any skewness. If not, the tree could be
                // skewed which could result in N search time instead of logN.
                // 所以,我們?cè)谶@里打亂輸入,希望樹不會(huì)有任何傾斜。否則,樹可能會(huì)傾斜,這可能導(dǎo)致搜索時(shí)間是N而不是logN。
                Collections.shuffle(bloomIndexFiles);
                KeyRangeLookupTree lookUpTree = new KeyRangeLookupTree();
                bloomIndexFiles.forEach(indexFileInfo -> {
                  if (indexFileInfo.hasKeyRanges()) { // 如果有最大值最小值
                    // 將 最大值,最小值,fileId 插入到 lookUpTree
                    // 構(gòu)造間隔數(shù)
                    lookUpTree.insert(new KeyRangeNode(indexFileInfo.getMinRecordKey(), indexFileInfo.getMaxRecordKey(),
                        indexFileInfo.getFileId()));
                  } else {
                    if (!partitionToFilesWithNoRanges.containsKey(partition)) {
                      partitionToFilesWithNoRanges.put(partition, new HashSet<>());
                    }
                    // 將沒有最大值最小值的 fileId 添加到 partitionToFilesWithNoRanges
                    partitionToFilesWithNoRanges.get(partition).add(indexFileInfo.getFileId());
                  }
                });
                partitionToFileIndexLookUpTree.put(partition, lookUpTree);
              });
            }

          lookUpTree.insert

          區(qū)間樹的具體構(gòu)造邏輯。

            void insert(KeyRangeNode newNode) {
              root = insert(getRoot(), newNode);
            }

            /**
             * Inserts a new {@link KeyRangeNode} to this look up tree.
             *
             * If no root exists, make {@code newNode} as the root and return the new root.
             *
             * If current root and newNode matches with min record key and max record key, merge two nodes. In other words, add
             * files from {@code newNode} to current root. Return current root.
             *
             * If current root is < newNode if current root has no right sub tree update current root's right sub tree max and min
             * set newNode as right sub tree else update root's right sub tree min and max with newNode's min and max record key
             * as applicable recursively call insert() with root's right subtree as new root
             *
             * else // current root is >= newNode if current root has no left sub tree update current root's left sub tree max and
             * min set newNode as left sub tree else update root's left sub tree min and max with newNode's min and max record key
             * as applicable recursively call insert() with root's left subtree as new root
             *
             * @param root refers to the current root of the look up tree
             * @param newNode newNode the new {@link KeyRangeNode} to be inserted
             */

            private KeyRangeNode insert(KeyRangeNode root, KeyRangeNode newNode) {
              if (root == null) {
                root = newNode;
                return root;
              }

              if (root.compareTo(newNode) == 0) {
                root.addFiles(newNode.getFileNameList());
                return root;
              }

              if (root.compareTo(newNode) < 0) {
                if (root.getRight() == null) {
                  root.setRightSubTreeMax(newNode.getMaxRecordKey());
                  root.setRightSubTreeMin(newNode.getMinRecordKey());
                  root.setRight(newNode);
                } else {
                  if (root.getRightSubTreeMax().compareTo(newNode.getMaxRecordKey()) < 0) {
                    root.setRightSubTreeMax(newNode.getMaxRecordKey());
                  }
                  if (root.getRightSubTreeMin().compareTo(newNode.getMinRecordKey()) > 0) {
                    root.setRightSubTreeMin(newNode.getMinRecordKey());
                  }
                  insert(root.getRight(), newNode);
                }
              } else {
                if (root.getLeft() == null) {
                  root.setLeftSubTreeMax(newNode.getMaxRecordKey());
                  root.setLeftSubTreeMin(newNode.getMinRecordKey());
                  root.setLeft(newNode);
                } else {
                  if (root.getLeftSubTreeMax().compareTo(newNode.getMaxRecordKey()) < 0) {
                    root.setLeftSubTreeMax(newNode.getMaxRecordKey());
                  }
                  if (root.getLeftSubTreeMin().compareTo(newNode.getMinRecordKey()) > 0) {
                    root.setLeftSubTreeMin(newNode.getMinRecordKey());
                  }
                  insert(root.getLeft(), newNode);
                }
              }
              return root;
            }

          getMatchingFilesAndPartition

          • 對(duì)于有最大值最小值的文件,利用區(qū)間樹 KeyRangeLookupTree 查找可能包含該 recordKey 的 fileId 列表 。

            • 根據(jù)文件的最大最小值判斷,如果 recordKey 在最大值最小值區(qū)間,則可能存在該文件中

            • 如果不在最大值最小值區(qū)間,則不存在該文件中

            • 利用區(qū)間樹的原因主要是可以降低查詢時(shí)間。

          • 對(duì)于沒有最大值最小值的文件,則認(rèn)為都可能存在該 recordKey ,所以全部返回    

          • 返回值 Set(Pair(partitionPath, fileId))

            @Override
            public Set<Pair<String, String>> getMatchingFilesAndPartition(String partitionPath, String recordKey) {
              // (partitionPath, fileId)
              Set<Pair<String, String>> toReturn = new HashSet<>();
              // could be null, if there are no files in a given partition yet or if all index files have no ranges
              // 如果給定分區(qū)中還沒有文件,或者所有索引文件都沒有范圍,則可能為null
              if (partitionToFileIndexLookUpTree.containsKey(partitionPath)) {
                // 利用 KeyRangeLookupTree 查找該分區(qū)下有最大值最小值的文件中可能包含該 recordKey 的 fileId 列表
                // 查找邏輯:根據(jù)文件的最大最小值判斷,如果 recordKey 在最大值最小值區(qū)間,則可能存在該文件中
                // 如果不在最大值最小值區(qū)間,則不存在該文件中
                partitionToFileIndexLookUpTree.get(partitionPath).getMatchingIndexFiles(recordKey).forEach(file ->
                    toReturn.add(Pair.of(partitionPath, file)));
              }
              if (partitionToFilesWithNoRanges.containsKey(partitionPath)) {
                // 對(duì)于沒有最大值最小值的文件,則認(rèn)為都是可能存在該 recordKey ,所以全部返回
                partitionToFilesWithNoRanges.get(partitionPath).forEach(file ->
                    toReturn.add(Pair.of(partitionPath, file)));
              }
              return toReturn;
            }

          findMatchingFilesForRecordKeys

          找出<RowKey,filename>對(duì)。

            /**
             * Find out <RowKey, filename> pair.
             * 找出<RowKey,filename>對(duì)。
             */

            Map<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
                List<Pair<String, HoodieKey>> fileComparisons,
                HoodieTable hoodieTable)
           
          {
              // 按照 fileId 排序
              fileComparisons = fileComparisons.stream().sorted((o1, o2) -> o1.getLeft().compareTo(o2.getLeft())).collect(toList());

              List<HoodieKeyLookupHandle.KeyLookupResult> keyLookupResults = new ArrayList<>();

              // 這里實(shí)際返回 LazyKeyCheckIterator,其父類的 LazyIterableIterator 的 next 方法會(huì)調(diào)用 computeNext
              Iterator<List<HoodieKeyLookupHandle.KeyLookupResult>> iterator = new HoodieBaseBloomIndexCheckFunction(hoodieTable, config).apply(fileComparisons.iterator());
              while (iterator.hasNext()) {
                // 這里實(shí)際調(diào)用 LazyKeyCheckIterator.computeNext
                // 這里涉及讀取保存在 Parquet文件中的布隆過濾器 BloomFilter
                keyLookupResults.addAll(iterator.next());
              }

              Map<HoodieKey, HoodieRecordLocation> hoodieRecordLocationMap = new HashMap<>();

              // 過濾掉 matchingRecordKeys 為空的,matchingRecordKeys 為空代表,沒有一個(gè) recordKey 存在于該文件中
              keyLookupResults = keyLookupResults.stream().filter(lr -> lr.getMatchingRecordKeys().size() > 0).collect(toList());
              keyLookupResults.forEach(lookupResult -> {
                lookupResult.getMatchingRecordKeys().forEach(r -> {
                  // (HoodieKey, HoodieRecordLocation) ,將 HoodieKey 和 HoodieRecordLocation 關(guān)聯(lián)
                  hoodieRecordLocationMap.put(new HoodieKey(r, lookupResult.getPartitionPath()), new HoodieRecordLocation(lookupResult.getBaseInstantTime(), lookupResult.getFileId()));
                });
              });

              return hoodieRecordLocationMap;
            }

          LazyKeyCheckIterator.computeNext

              protected List<KeyLookupResult> computeNext() {
                List<KeyLookupResult> ret = new ArrayList<>();
                try {
                  // process one file in each go.
                  // 遍歷 (fileId, HoodieKey)
                  while (inputItr.hasNext()) {
                    Pair<String, HoodieKey> currentTuple = inputItr.next();
                    String fileId = currentTuple.getLeft();
                    String partitionPath = currentTuple.getRight().getPartitionPath();
                    String recordKey = currentTuple.getRight().getRecordKey();
                    // (partitionPath, fileId)
                    Pair<String, String> partitionPathFilePair = Pair.of(partitionPath, fileId);

                    // lazily init state
                    // 延遲初始化狀態(tài)
                    if (keyLookupHandle == null) {
                      // 在 HoodieKeyLookupHandle 的構(gòu)造方法中會(huì)讀取保存在Parquet文件中的布隆過濾器信息
                      // 將其反序列化為 BloomFilter
                      keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair);
                    }

                    // if continue on current file
                    // 如果繼續(xù)當(dāng)前文件
                    // (partitionPath, fileId) 確定一個(gè)文件,一個(gè) fileId 對(duì)應(yīng)多個(gè)HoodieKey,
                    // 所以可能在一個(gè)文件上可能遍歷多次
                    // 前面已經(jīng)按照 fileId 排序,所以可以保證一個(gè) fileId 對(duì)應(yīng)的記錄是連續(xù)的。
                    if (keyLookupHandle.getPartitionPathFilePair().equals(partitionPathFilePair)) {
                      // 添加 recordKey
                      // 這里利用布隆過濾器進(jìn)行二次過濾,將命中(可能存在于該文件中)的 recordKey 添加到 candidateRecordKeys (候選RecordKeys)
                      // bloomFilter.mightContain(recordKey) 判斷該recordKey 是否可能存在于該文件
                      keyLookupHandle.addKey(recordKey);
                    } else { // 如果上一個(gè)文件結(jié)束
                      // do the actual checking of file & break out
                      // 進(jìn)行文件的實(shí)際檢查和分解
                      // 將 keyLookupHandle.getLookupResult 查詢結(jié)果添加到返回值 ret 中
                      ret.add(keyLookupHandle.getLookupResult());
                      // 新文件的 HoodieKeyLookupHandle
                      keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair);
                      // 添加 recordKey
                      // 這里利用布隆過濾器進(jìn)行二次過濾,將命中(可能存在于該文件中)的 recordKey 添加到 candidateRecordKeys (候選RecordKeys)
                      // bloomFilter.mightContain(recordKey) 判斷該recordKey 是否可能存在于該文件
                      keyLookupHandle.addKey(recordKey);
                      break;
                    }
                  }

                  // handle case, where we ran out of input, close pending work, update return val
                  // 處理輸入不足的情況,關(guān)閉待處理的工作,更新返回值
                  if (!inputItr.hasNext()) {
                    // 遍歷結(jié)束,將 getLookupResult 的返回值添加到 ret 中
                    ret.add(keyLookupHandle.getLookupResult());
                  }
                } catch (Throwable e) {
                  if (e instanceof HoodieException) {
                    throw e;
                  }
                  throw new HoodieIndexException("Error checking bloom filter index. ", e);
                }
                return ret;
              }

          createNewFileReader().readBloomFilter()

            this.bloomFilter = createNewFileReader().readBloomFilter();

            // HoodieParquetReader.readBloomFilter
            public BloomFilter readBloomFilter() {
              return parquetUtils.readBloomFilterFromMetadata(conf, path);
            }

            // BaseFileUtils.readBloomFilterFromMetadata
              /**
             * Read the bloom filter from the metadata of the given data file.
             * 從給定數(shù)據(jù)文件的元數(shù)據(jù)中讀取布隆過濾器。
             * @param configuration Configuration
             * @param filePath The data file path
             * @return a BloomFilter object
             */

            public BloomFilter readBloomFilterFromMetadata(Configuration configuration, Path filePath) {
              Map<String, String> footerVals =
                  readFooter(configuration, false, filePath,
                      HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY,
                      HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY,
                      HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE);
              String footerVal = footerVals.get(HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY);
              if (null == footerVal) {
                // We use old style key "com.uber.hoodie.bloomfilter"
                footerVal = footerVals.get(HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY);
              }
              BloomFilter toReturn = null;
              if (footerVal != null) {
                if (footerVals.containsKey(HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE)) {
                  toReturn = BloomFilterFactory.fromString(footerVal,
                      footerVals.get(HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE));
                } else {
                  toReturn = BloomFilterFactory.fromString(footerVal, BloomFilterTypeCode.SIMPLE.name());
                }
              }
              return toReturn;
            }

          keyLookupHandle.addKey

            public void addKey(String recordKey) {
              // check record key against bloom filter of current file & add to possible keys if needed
              // 根據(jù)當(dāng)前文件的布隆過濾器檢查記錄鍵,并在需要時(shí)添加可能的鍵
              if (bloomFilter.mightContain(recordKey)) {
                if (LOG.isDebugEnabled()) {
                  LOG.debug("Record key " + recordKey + " matches bloom filter in  " + partitionPathFilePair);
                }
                // 如果命中的話,添加到候選
                candidateRecordKeys.add(recordKey);
              }
              totalKeysChecked++;
            }

          getLookupResult

          在所有添加的鍵中,返回在文件組中實(shí)際找到的鍵的列表。

            /**
             * Of all the keys, that were added, return a list of keys that were actually found in the file group.
             *
             * 在所有添加的鍵中,返回在文件組中實(shí)際找到的鍵的列表。
             */

            public KeyLookupResult getLookupResult() {
              if (LOG.isDebugEnabled()) {
                LOG.debug("#The candidate row keys for " + partitionPathFilePair + " => " + candidateRecordKeys);
              }

              HoodieBaseFile dataFile = getLatestDataFile();
              // 調(diào)用 checkCandidatesAgainstFile 返回在文件組中實(shí)際找到的 RecordKeys。
              List<String> matchingKeys =
                  checkCandidatesAgainstFile(hoodieTable.getHadoopConf(), candidateRecordKeys, new Path(dataFile.getPath()));
              LOG.info(
                  String.format("Total records (%d), bloom filter candidates (%d)/fp(%d), actual matches (%d)", totalKeysChecked,
                      candidateRecordKeys.size(), candidateRecordKeys.size() - matchingKeys.size(), matchingKeys.size()));
              return new KeyLookupResult(partitionPathFilePair.getRight(), partitionPathFilePair.getLeft(),
                  dataFile.getCommitTime(), matchingKeys);
            }

          /**
             * Given a list of row keys and one file, return only row keys existing in that file.
             *
             * 給定一個(gè)行鍵列表和一個(gè)文件,只返回該文件中存在的行鍵。
             * 這里拿候選的 RecordKeys 去實(shí)際的 parquet文件中做一一比對(duì),看是否確實(shí)存在于該 parquet文件中
             * 返回過濾后的實(shí)際存在于該 parquet文件中的 RecordKeys
             */

            public List<String> checkCandidatesAgainstFile(Configuration configuration, List<String> candidateRecordKeys,
                                                           Path filePath)
           throws HoodieIndexException 
          {
              List<String> foundRecordKeys = new ArrayList<>();
              try {
                // Load all rowKeys from the file, to double-confirm
                if (!candidateRecordKeys.isEmpty()) {
                  HoodieTimer timer = new HoodieTimer().startTimer();
                  // 這里拿候選的 RecordKeys 去實(shí)際的 parquet文件中做一一比對(duì),看是否確實(shí)存在于該 parquet文件中
                  // 返回過濾后的實(shí)際存在于該 parquet文件中的 RecordKeys
                  Set<String> fileRowKeys = createNewFileReader().filterRowKeys(new HashSet<>(candidateRecordKeys));
                  foundRecordKeys.addAll(fileRowKeys);
                  LOG.info(String.format("Checked keys against file %s, in %d ms. #candidates (%d) #found (%d)", filePath,
                      timer.endTimer(), candidateRecordKeys.size(), foundRecordKeys.size()));
                  if (LOG.isDebugEnabled()) {
                    LOG.debug("Keys matching for file " + filePath + " => " + foundRecordKeys);
                  }
                }
              } catch (Exception e) {
                throw new HoodieIndexException("Error checking candidate keys against file.", e);
              }
              return foundRecordKeys;
            }

          tagLocationBacktoRecords

          在第二步已經(jīng)通過 lookupIndex 獲取的 HoodieKey 和 HoodieRecordLocation 的對(duì)應(yīng)關(guān)系,tagLocationBacktoRecords 就是根據(jù) lookupIndex 返回的對(duì)應(yīng)關(guān)系 keyFilenamePair ,為每個(gè) HoodieRecord 設(shè)置 currentLocation 。
          有的 HoodieRecord 可能沒有對(duì)應(yīng)的 fileId,所以也就不會(huì)設(shè)置 currentLocation 。

            /**
             * Tag the <rowKey, filename> back to the original HoodieRecord List.
             *
             * 將<rowKey,filename>標(biāo)記回原始的 HoodieRecord 列表。
             * 其實(shí)就是設(shè)置 HoodieRecord 的 currentLocation
             */

            protected List<HoodieRecord<T>> tagLocationBacktoRecords(
                Map<HoodieKey, HoodieRecordLocation> keyFilenamePair, List<HoodieRecord<T>> records) {
              // (HoodieKey, HoodieRecord)
              Map<HoodieKey, HoodieRecord<T>> keyRecordPairMap = new HashMap<>();
              records.forEach(r -> keyRecordPairMap.put(r.getKey(), r));
              // Here as the record might have more data than rowKey (some rowKeys' fileId is null),
              // so we do left outer join.
              // 在這里,由于記錄可能比rowKey有更多的數(shù)據(jù)(一些rowKeys的fileId為空),因此我們進(jìn)行了左外連接。
              List<Pair<HoodieRecord<T>, HoodieRecordLocation>> newList = new ArrayList<>();
              keyRecordPairMap.keySet().forEach(k -> {
                if (keyFilenamePair.containsKey(k)) { // 如果存在,代表該 key 已經(jīng)存在于文件中
                  //(HoodieRecord, HoodieRecordLocation) 根據(jù)該 key 獲取對(duì)應(yīng)的 HoodieRecordLocation
                  newList.add(Pair.of(keyRecordPairMap.get(k), keyFilenamePair.get(k)));
                } else {
                  // 否則,沒有對(duì)應(yīng)的文件,添加為 null
                  newList.add(Pair.of(keyRecordPairMap.get(k), null));
                }
              });
              List<HoodieRecord<T>> res = Lists.newArrayList();
              for (Pair<HoodieRecord<T>, HoodieRecordLocation> v : newList) {
                // 通過 HoodieIndexUtils.getTaggedRecord 設(shè)置每個(gè) HoodieRecord 的 currentLocation
                res.add(HoodieIndexUtils.getTaggedRecord(v.getLeft(), Option.ofNullable(v.getRight())));
              }
              return res;
            }

          總結(jié)

          • tag/tagLocation :根據(jù)索引信息判斷記錄是否存在,如果不存在,代表是新增數(shù)據(jù),如果記錄存在則代表是更新數(shù)據(jù),需要找到并設(shè)置 currentLocation。

          • tag : table.getIndex().tagLocation -> JavaHoodieBloomIndex.tagLocation -> HoodieBaseBloomIndex.tagLocation

          • tagLocation 會(huì)利用上篇文章講的寫到 parquet 文件中的 最大值最小值和布隆過濾器

          • 最大值最小值用在第一階段的過濾:構(gòu)造區(qū)間樹 (Interval Tree),利用區(qū)間樹查找 每個(gè) recordKey 可能存在于哪些文件中,利用區(qū)間樹的有點(diǎn)在于可以加速查找,時(shí)間復(fù)雜度為 O(logN)。

            • 對(duì)于有最大值和最小值的文件,如果該 recordKey 在最大值最小之區(qū)間內(nèi),則認(rèn)為該文件可能包含 recordKey

            • 對(duì)于沒有最大值和最小值的文件,則認(rèn)為該文件可能包含 recordKey

            • 所以這里返回的是多對(duì)多的關(guān)系,類似于笛卡爾積。即一個(gè)文件可能保存多個(gè) recordKey ,一個(gè) recordKey 可能存在于多個(gè)文件中。

            • 所以對(duì)于 recordKey 有明確的順序關(guān)系的(例如:時(shí)間戳作為前綴),要比較的文件數(shù)量會(huì)因范圍修剪而大大減少。這樣不僅可以加速查找時(shí)間,還會(huì)提高查詢精確度,也就是返回的 recordKey 和 location 的關(guān)系數(shù)量會(huì)少許多。

            • 對(duì)于像字符串這種沒有順序關(guān)系的(hash值) recordKey ,會(huì)導(dǎo)致每個(gè)文件的最大值最小值區(qū)間范圍都會(huì)比較大,這樣 recordKey 就會(huì)可能存在于多個(gè)文件中,導(dǎo)致返回的對(duì)應(yīng)關(guān)系特別大,不僅影響區(qū)間樹的查詢效率,還會(huì)影響后面的遍歷性能。而且這種對(duì)應(yīng)關(guān)系也會(huì)占比較大的內(nèi)存,比如
              本批次recordKey 的數(shù)量有 10萬 ,文件數(shù)有 1000個(gè),那個(gè)最后返回的 List(fileId, HoodieKey) 的數(shù)量可能會(huì)達(dá)到 10w * 1000 = 1億個(gè),這是最壞的情況,但是一般情況下也會(huì)達(dá)到幾千萬個(gè)。

            • 所以對(duì)于沒有順序關(guān)系的 recordKey ,我們可能禁用第一階段的利用區(qū)間樹過濾,效率可能會(huì)更好一些。相關(guān)參數(shù) :hoodie.bloom.index.use.treebased.filter = false , hoodie.bloom.index.prune.by.ranges = false

          • 布隆過濾器用在第二階段的過濾,遍歷第一階段返回的 List(fileId, HoodieKey) ,利用從parquet文件中反序列化的來的布隆過濾器進(jìn)行二次過濾,判斷哪些 HoodieKey 有可能存在于該 fileId 中,如果可能存在則添加到候選:candidateRecordKeys

          • 布隆過濾器的優(yōu)點(diǎn)是空間效率和查詢時(shí)間都比一般的算法要好的多,缺點(diǎn)是有一定的誤識(shí)別率和刪除困難。所以只能判斷有可能存在,還需要去和實(shí)際的數(shù)據(jù)文件去對(duì)比,進(jìn)一步確認(rèn)是否確實(shí)存在于該文件中。

          • 然后遍歷 candidateRecordKeys ,去遍歷每個(gè)parquet數(shù)據(jù)文件,和 parquet 文件中的 key 進(jìn)行實(shí)際的比較,對(duì)于確實(shí)存在于該文件中的,返回實(shí)際的 HoodieKey 和 Location 的對(duì)應(yīng)關(guān)系:Map<HoodieKey, HoodieRecordLocation>

          • 最后根據(jù)上一步返回的 HoodieKey 和 Location 的對(duì)應(yīng)關(guān)系,為每個(gè) HoodieRecord 設(shè)置 currentLocation ,有的 HoodieRecord 沒有對(duì)應(yīng)的 fileId ,所以不需要設(shè)置 currentLocation。

          • 無論是區(qū)間樹查詢最大值最小值,還是反序列化布隆過濾,都僅涉及讀取文件頁腳,所以讀取成本較低

          • 隨著表數(shù)據(jù)量的增加、數(shù)據(jù)文件數(shù)的增加,會(huì)導(dǎo)致兩個(gè)問題,從而使索引性能越來越差。

            • 占用內(nèi)存增加:前面提到有笛卡爾積,文件數(shù)越多,笛卡爾積越大,從而占用內(nèi)存也會(huì)增加,遍歷耗時(shí)也會(huì)增加。另外布隆過濾器 BitSet 也會(huì)隨著每個(gè)文件的 recordKey 的數(shù)量的增加越來越大,從而導(dǎo)致布隆過濾器占用的內(nèi)存也越來越大。

            • 我們?cè)诶脜^(qū)間樹和布隆過濾器過濾完每個(gè)key可能存在于哪些文件中之后,會(huì)使用篩選后的鍵和關(guān)聯(lián)的base文件(這里為parquet) 執(zhí)行實(shí)際的文件查找 。因?yàn)檫@里要加載所有涉及的整個(gè)parquet文件內(nèi)容,隨著文件數(shù)量的增大和文件大小的增加,都會(huì)導(dǎo)致遍歷查詢這些parquet文件的時(shí)間越來越長,從而導(dǎo)致索引性能越來越差。

          ?? 分享、點(diǎn)贊、在看,給個(gè)3連擊??

          瀏覽 35
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产黑人大屌 | 爱爱无码视频 | 99在线观看精品视频 | 天天射天天搞天天干 | 琪琪五月色|