          Separating Hot and Cold Data | Alluxio's Metadata Management Strategy

          2020-07-12 03:23

          Author: Lin Yiqun (林意群)

          Original article: http://suo.im/5Xcmci

          I. Alluxio Overview

          Alluxio (formerly Tachyon) describes itself as the world's first memory-centric virtual distributed storage system. It unifies how data is accessed and bridges upper-layer compute frameworks and underlying storage systems. The project originated at UC Berkeley's AMPLab as the data access layer of the Berkeley Data Analytics Stack (BDAS). Alluxio is one of the fastest-growing open-source projects, having attracted more than 1,000 contributors from over 300 organizations, including Alibaba, Alluxio, Baidu, CMU, Google, IBM, Intel, NJU, Red Hat, Tencent, UC Berkeley, and Yahoo. In this article we take a brief look at Alluxio's tiered metadata management.
          As the amount of metadata keeps growing, how can we push past the limit of what a single node can manage, for example from millions of files to billions? Loading the metadata of billions of files entirely into memory is no longer a realistic approach. This calls for a different model: tiered metadata management, which the official terminology calls tier layer metadata management.
          There are two main layers:
          • Recently accessed hot metadata is cached in memory; this is the cached layer.

          • Metadata that has not been accessed for a long time (cold data) is persisted; this is the persisted layer.

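          Depending on how users access it, metadata can move back and forth between hot and cold:
          [Figure: hot and cold metadata moving between the cached and persisted layers]
          Under this scheme the service caches only the currently active metadata, so memory usage is bounded by the working set rather than by the total number of files, and memory no longer becomes the bottleneck.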
          Rather than loading all metadata into memory so that every lookup is fast, Alluxio caches only the active metadata, which is a defining feature of its internal metadata management. Cold metadata that has not been accessed recently is kept in a local RocksDB instance.
          Alluxio has dedicated names for these two storage layers: the in-memory layer that caches active metadata is the cache store, and the RocksDB layer underneath it is the backing store.
          Alluxio builds on these two stores to serve metadata access. The architecture looks like this:
          [Figure: Alluxio metadata store architecture]
          What I want to focus on here is how the cache store exchanges data with the Rocks store (the backing store) beneath it.
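          Alluxio's custom cache, which supports asynchronous write-out, lives in the cache store layer. To keep metadata updated correctly, it has to do two things:
          • Promptly evict hot entries whose access frequency has dropped, writing them out to the backing store.

          • When new accesses arrive, read the requested entries from the backing store and load them into the cache.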


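          Of these two tasks, the first is clearly the one Alluxio has to implement itself. So what approach does it take? Could it use a mature existing cache library such as Guava Cache? Guava Cache's built-in expireAfterAccess seems to fit this scenario well.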
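          In the end, though, Alluxio did not use Guava Cache. In my view the main reason is that Guava Cache does not support asynchronous write-out of expired entries. Guava Cache does not start a separate thread to process expired entries; instead, it does this work piggybacked on subsequent cache operations. This carries a hidden risk: if the cache has not been accessed for a long time and the next access arrives after most entries have already passed their expiration time, that access may trigger a large burst of evictions and reloads. Guava Cache would then burn a lot of CPU on this work, which inevitably hurts its ability to serve requests. In addition, updating an entry in Guava Cache requires a lock, so a slow entry update blocks other threads that want to access the same entry.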
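To make that hidden risk concrete, here is a minimal, hypothetical sketch of a cache that does its expiration work only inside get() and put() on the caller's thread, in the spirit of the behavior described above. It is not Guava's implementation (Guava splits the cache into segments and bounds the work per call differently); the class name, the injectable clock, and the lastPurgeCount field are made up for illustration.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.function.LongSupplier;

// Hypothetical illustration, not Guava's code: expired entries are purged only
// inside get()/put() on the caller's thread; no background thread exists.
class LazyExpiringCache<K, V> {
  private static final class Timed<T> {
    final T value;
    long lastAccess;

    Timed(T value, long lastAccess) {
      this.value = value;
      this.lastAccess = lastAccess;
    }
  }

  // accessOrder = true: iteration runs from least to most recently accessed
  private final LinkedHashMap<K, Timed<V>> map = new LinkedHashMap<>(16, 0.75f, true);
  private final long ttl;
  private final LongSupplier clock;
  int lastPurgeCount; // how many entries the most recent get()/put() purged

  LazyExpiringCache(long ttl, LongSupplier clock) {
    this.ttl = ttl;
    this.clock = clock;
  }

  synchronized void put(K key, V value) {
    purgeExpired();
    map.put(key, new Timed<>(value, clock.getAsLong()));
  }

  synchronized V get(K key) {
    purgeExpired(); // maintenance piggybacks on the caller's request
    Timed<V> t = map.get(key); // also moves the entry to the most-recent end
    if (t == null) {
      return null;
    }
    t.lastAccess = clock.getAsLong();
    return t.value;
  }

  private void purgeExpired() {
    long now = clock.getAsLong();
    int purged = 0;
    Iterator<Timed<V>> it = map.values().iterator();
    while (it.hasNext()) {
      if (now - it.next().lastAccess < ttl) {
        break; // every later entry was accessed more recently, so none has expired
      }
      it.remove();
      purged++;
    }
    lastPurgeCount = purged;
  }

  synchronized int size() {
    return map.size();
  }
}
```

If 1,000 entries sit idle past their TTL, the first get() after the pause has to remove all 1,000 of them before it can answer, which is exactly the burst described above.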
          The takeaway: if we want cache entries to be removed and refreshed promptly, we can run our own thread to trigger that work. The following passage from Guava's GitHub wiki [1] explains this, including why Guava Cache does not start an internal thread to handle expiration:
          When Does Cleanup Happen?
          Caches built with CacheBuilder do not perform cleanup and evict values "automatically," or instantly after a value expires, or anything of the sort. Instead, it performs small amounts of maintenance during write operations, or during occasional read operations if writes are rare.
          The reason for this is as follows: if we wanted to perform Cache maintenance continuously, we would need to create a thread, and its operations would be competing with user operations for shared locks. Additionally, some environments restrict the creation of threads, which would make CacheBuilder unusable in that environment.
          Instead, we put the choice in your hands. If your cache is high-throughput, then you don't have to worry about performing cache maintenance to clean up expired entries and the like. If your cache does writes only rarely and you don't want cleanup to block cache reads, you may wish to create your own maintenance thread that calls Cache.cleanUp() at regular intervals.
          If you want to schedule regular cache maintenance for a cache which only rarely has writes, just schedule the maintenance using ScheduledExecutorService.
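Following the wiki's suggestion, the sketch below runs maintenance on a ScheduledExecutorService that periodically calls cleanUp(), so readers never pay for it. It is a hypothetical TTL cache written for illustration, not Guava's Cache; only ConcurrentHashMap, Executors and ScheduledExecutorService are real JDK APIs here, and the method names echo Guava's only loosely.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical TTL cache (not Guava): expired entries are removed by a
// scheduled maintenance task calling cleanUp(), never by reader threads.
class ScheduledCleanupCache<K, V> implements AutoCloseable {
  private static final class Timed<T> {
    final T value;
    final long writeNanos;

    Timed(T value, long writeNanos) {
      this.value = value;
      this.writeNanos = writeNanos;
    }
  }

  private final Map<K, Timed<V>> map = new ConcurrentHashMap<>();
  private final long ttlNanos;
  private final ScheduledExecutorService maintenance =
      Executors.newSingleThreadScheduledExecutor();

  ScheduledCleanupCache(long ttlMillis, long cleanupPeriodMillis) {
    this.ttlNanos = TimeUnit.MILLISECONDS.toNanos(ttlMillis);
    maintenance.scheduleAtFixedRate(
        this::cleanUp, cleanupPeriodMillis, cleanupPeriodMillis, TimeUnit.MILLISECONDS);
  }

  void put(K key, V value) {
    map.put(key, new Timed<>(value, System.nanoTime()));
  }

  V getIfPresent(K key) {
    Timed<V> t = map.get(key);
    // Reads treat expired entries as absent but never remove them themselves.
    if (t == null || System.nanoTime() - t.writeNanos >= ttlNanos) {
      return null;
    }
    return t.value;
  }

  // Runs on the maintenance thread; removes every entry past its TTL.
  void cleanUp() {
    long now = System.nanoTime();
    map.values().removeIf(t -> now - t.writeNanos >= ttlNanos);
  }

  int size() {
    return map.size();
  }

  @Override
  public void close() {
    maintenance.shutdownNow();
  }
}
```

The trade-off the wiki describes is visible here: the maintenance thread competes with callers for the map, but no single reader ever absorbs a backlog of cleanup work.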


          OK, now let's look at the cache Alluxio implements internally, which writes out outdated entries asynchronously. We'll walk through the code, starting with the definition of CachingInodeStore from the architecture diagram above:
          public final class CachingInodeStore implements InodeStore, Closeable {
            private static final Logger LOG = LoggerFactory.getLogger(CachingInodeStore.class);
            // Backing store: where user data is written out and persisted
            private final InodeStore mBackingStore;
            private final InodeLockManager mLockManager;

            // Cache recently-accessed inodes.
            @VisibleForTesting
            final InodeCache mInodeCache;

            // Cache recently-accessed inode tree edges.
            @VisibleForTesting
            final EdgeCache mEdgeCache;

            @VisibleForTesting
            final ListingCache mListingCache;

            // Starts true, but becomes permanently false if we ever need to spill metadata to the backing
            // store. When true, we can optimize lookups for non-existent inodes because we don't need to
            // check the backing store. We can also optimize getChildren by skipping the range query on the
            // backing store.
            private volatile boolean mBackingStoreEmpty;
            ...

            public CachingInodeStore(InodeStore backingStore, InodeLockManager lockManager) {
              mBackingStore = backingStore;
              mLockManager = lockManager;
              AlluxioConfiguration conf = ServerConfiguration.global();
              int maxSize = conf.getInt(PropertyKey.MASTER_METASTORE_INODE_CACHE_MAX_SIZE);
              Preconditions.checkState(maxSize > 0,
                  "Maximum cache size %s must be positive, but is set to %s",
                  PropertyKey.MASTER_METASTORE_INODE_CACHE_MAX_SIZE.getName(), maxSize);
              float highWaterMarkRatio = ConfigurationUtils.checkRatio(conf,
                  PropertyKey.MASTER_METASTORE_INODE_CACHE_HIGH_WATER_MARK_RATIO);
              // Compute the high watermark
              int highWaterMark = Math.round(maxSize * highWaterMarkRatio);
              float lowWaterMarkRatio = ConfigurationUtils.checkRatio(conf,
                  PropertyKey.MASTER_METASTORE_INODE_CACHE_LOW_WATER_MARK_RATIO);
              Preconditions.checkState(lowWaterMarkRatio <= highWaterMarkRatio,
                  "low water mark ratio (%s=%s) must not exceed high water mark ratio (%s=%s)",
                  PropertyKey.MASTER_METASTORE_INODE_CACHE_LOW_WATER_MARK_RATIO.getName(),
                  lowWaterMarkRatio,
                  PropertyKey.MASTER_METASTORE_INODE_CACHE_HIGH_WATER_MARK_RATIO, highWaterMarkRatio);
              // Compute the low watermark
              int lowWaterMark = Math.round(maxSize * lowWaterMarkRatio);

              mBackingStoreEmpty = true;
              CacheConfiguration cacheConf = CacheConfiguration.newBuilder().setMaxSize(maxSize)
                  .setHighWaterMark(highWaterMark).setLowWaterMark(lowWaterMark)
                  .setEvictBatchSize(conf.getInt(PropertyKey.MASTER_METASTORE_INODE_CACHE_EVICT_BATCH_SIZE))
                  .build();
              // Pass the cache settings above into each cache
              mInodeCache = new InodeCache(cacheConf);
              mEdgeCache = new EdgeCache(cacheConf);
              mListingCache = new ListingCache(cacheConf);
            }
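The watermark arithmetic in the constructor can be checked in isolation. The helper below is hypothetical (not an Alluxio class) and assumes, as the name ConfigurationUtils.checkRatio suggests, that both ratios must lie in [0, 1]. With maxSize = 10000 and ratios of 0.85 and 0.8, chosen only for illustration, the high and low watermarks come out to 8500 and 8000.

```java
// Hypothetical helper (not an Alluxio API) that repeats the constructor's
// arithmetic: validate the inputs, then round maxSize * ratio for each mark.
final class Watermarks {
  final int highWaterMark;
  final int lowWaterMark;

  Watermarks(int maxSize, float highRatio, float lowRatio) {
    if (maxSize <= 0) {
      throw new IllegalStateException("Maximum cache size must be positive, but is " + maxSize);
    }
    // Assumption: ratios are only valid within [0, 1]
    if (highRatio < 0 || highRatio > 1 || lowRatio < 0 || lowRatio > 1) {
      throw new IllegalArgumentException("watermark ratios must be within [0, 1]");
    }
    if (lowRatio > highRatio) {
      throw new IllegalStateException("low water mark ratio " + lowRatio
          + " must not exceed high water mark ratio " + highRatio);
    }
    // int * float yields a float; Math.round(float) returns an int
    highWaterMark = Math.round(maxSize * highRatio);
    lowWaterMark = Math.round(maxSize * lowRatio);
  }
}
```

The gap between the two marks is what makes eviction batchy: the eviction thread wakes only after the cache grows past the high mark, then frees room for about maxSize * (highRatio - lowRatio) further inserts before it has to run again.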
          Here we focus on mInodeCache, which holds recently accessed inodes.
          class InodeCache extends Cache<Long, MutableInode<?>> {
            public InodeCache(CacheConfiguration conf) {
              super(conf, "inode-cache", MetricKey.MASTER_INODE_CACHE_SIZE);
            }
            ...
          }
          InodeCache extends the Cache class, so let's look at how that class is implemented:
          public abstract class Cache<K, V> implements Closeable {
            private static final Logger LOG = LoggerFactory.getLogger(Cache.class);

            private final int mMaxSize;
            // High watermark: once the number of cache entries exceeds this value, entries are evicted and written out
            private final int mHighWaterMark;
            // Low watermark: the number of entries left after each eviction round
            private final int mLowWaterMark;
            // Number of entries evicted and written out per batch
            private final int mEvictBatchSize;
            private final String mName;
            // The cache map; a ConcurrentHashMap is used for thread safety
            @VisibleForTesting
            final ConcurrentHashMap<K, Entry> mMap;
            // TODO(andrew): Support using multiple threads to speed up backing store writes.
            // Thread for performing eviction to the backing store.
            @VisibleForTesting
            final EvictionThread mEvictionThread;
            ...
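          In short, Alluxio's Cache class boils down to a ConcurrentHashMap plus an EvictionThread. Because the map is accessed concurrently, it is a ConcurrentHashMap; the eviction thread then evicts and writes out entries according to the thresholds defined here (the high and low watermarks).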
          Let's go straight to the EvictionThread logic:
          class EvictionThread extends Thread {
            @VisibleForTesting
            volatile boolean mIsSleeping = true;

            // Cache entries selected for eviction
            private final List<Entry> mEvictionCandidates = new ArrayList<>(mEvictBatchSize);
            private final List<Entry> mDirtyEvictionCandidates = new ArrayList<>(mEvictBatchSize);
            private final Logger mCacheFullLogger = new SamplingLogger(LOG, 10L * Constants.SECOND_MS);

            ...

            @Override
            public void run() {
              while (!Thread.interrupted()) {
                // While the map is not over the high watermark, wait
                while (!overHighWaterMark()) {
                  synchronized (mEvictionThread) {
                    if (!overHighWaterMark()) {
                      try {
                        mIsSleeping = true;
                        mEvictionThread.wait();
                        mIsSleeping = false;
                      } catch (InterruptedException e) {
                        return;
                      }
                    }
                  }
                }
                if (cacheIsFull()) {
                  mCacheFullLogger.warn(
                      "Metastore {} cache is full. Consider increasing the cache size or lowering the "
                          + "high water mark. size:{} lowWaterMark:{} highWaterMark:{} maxSize:{}",
                      mName, mMap.size(), mLowWaterMark, mHighWaterMark, mMaxSize);
                }
                // Over the high watermark: evict and write out entries until the map is down to the low watermark
                evictToLowWaterMark();
              }
            }
          }
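Stripped of the backing store, reference bits and batching, the ConcurrentHashMap + EvictionThread pattern reduces to the sketch below: writers insert into the map and signal a monitor once the size exceeds the high watermark, while a background thread sleeps until then and evicts down to the low watermark. All names are hypothetical, and evicting in plain iteration order is a simplification; Alluxio picks its victims with fillBatch().

```java
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, stripped-down model of the Cache + EvictionThread pattern:
// no backing store, no reference bits, no batching.
class WatermarkCache<K, V> implements AutoCloseable {
  private final ConcurrentHashMap<K, V> map = new ConcurrentHashMap<>();
  private final int highWaterMark;
  private final int lowWaterMark;
  private final Object monitor = new Object();
  private final Thread evictionThread;

  WatermarkCache(int highWaterMark, int lowWaterMark) {
    this.highWaterMark = highWaterMark;
    this.lowWaterMark = lowWaterMark;
    evictionThread = new Thread(this::runEviction, "eviction-thread");
    evictionThread.setDaemon(true);
    evictionThread.start();
  }

  private boolean overHighWaterMark() {
    return map.size() > highWaterMark;
  }

  void put(K key, V value) {
    map.put(key, value);
    // Writers only signal; the eviction work itself runs on the background thread.
    if (overHighWaterMark()) {
      synchronized (monitor) {
        monitor.notifyAll();
      }
    }
  }

  V get(K key) {
    return map.get(key);
  }

  int size() {
    return map.size();
  }

  private void runEviction() {
    while (!Thread.currentThread().isInterrupted()) {
      synchronized (monitor) {
        // Re-check under the monitor so a wakeup between check and wait is not lost
        while (!overHighWaterMark()) {
          try {
            monitor.wait();
          } catch (InterruptedException e) {
            return;
          }
        }
      }
      evictToLowWaterMark();
    }
  }

  private void evictToLowWaterMark() {
    Iterator<K> it = map.keySet().iterator();
    while (map.size() > lowWaterMark && it.hasNext()) {
      it.next();
      it.remove();
    }
  }

  @Override
  public void close() {
    evictionThread.interrupt();
  }
}
```

With a high mark of 100 and a low mark of 80, the 101st put wakes the thread, which trims the map back to 80 entries while the writer has already returned.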
          Next, the evictToLowWaterMark method:
            private void evictToLowWaterMark() {
              long evictionStart = System.nanoTime();
              // Number of entries to evict in this round
              int toEvict = mMap.size() - mLowWaterMark;
              // Running count of entries evicted so far
              int evictionCount = 0;
              // Evict and write out entries
              while (evictionCount < toEvict) {
                if (!mEvictionHead.hasNext()) {
                  mEvictionHead = mMap.values().iterator();
                }
                // Scan map entries and collect the ones to evict
                fillBatch(toEvict - evictionCount);
                // Write out and remove the collected entries
                evictionCount += evictBatch();
              }
              if (evictionCount > 0) {
                LOG.debug("{}: Evicted {} entries in {}ms", mName, evictionCount,
                    (System.nanoTime() - evictionStart) / Constants.MS_NANO);
              }
            }
          The fillBatch step, which collects the entries to evict, looks like this:
            private void fillBatch(int count) {
              // Cap the number of entries collected in one batch
              int targetSize = Math.min(count, mEvictBatchSize);
              // Keep scanning the map for unreferenced entries until the batch reaches its target size
              while (mEvictionCandidates.size() < targetSize && mEvictionHead.hasNext()) {
                Entry candidate = mEvictionHead.next();
                // A referenced entry is spared this time: its flag is cleared, and if the scan reaches it
                // again without it having been accessed in the meantime, it will be collected.
                // The flag is set back to true whenever the entry is accessed.
                if (candidate.mReferenced) {
                  candidate.mReferenced = false;
                  continue;
                }
                // The entry is already marked unreferenced: add it to the eviction candidates
                mEvictionCandidates.add(candidate);
                if (candidate.mDirty) {
                  mDirtyEvictionCandidates.add(candidate);
                }
              }
            }
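The scan in fillBatch() behaves like a second-chance (CLOCK-style) policy: mEvictionHead acts as the clock hand, and a set mReferenced flag buys an entry one more sweep. The single-threaded model below reproduces that scan plus the wrap-around in evictToLowWaterMark(); the class and method names are hypothetical, and there is no backing store or dirty tracking.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of evictToLowWaterMark() + fillBatch(): a long-lived
// "eviction head" iterator sweeps the map like a clock hand and gives
// recently referenced entries a second chance.
class SecondChanceEvictor<K> {
  static final class Entry<T> {
    final T key;
    volatile boolean referenced;

    Entry(T key, boolean referenced) {
      this.key = key;
      this.referenced = referenced;
    }
  }

  final ConcurrentHashMap<K, Entry<K>> map = new ConcurrentHashMap<>();
  private final int evictBatchSize;
  private Iterator<Entry<K>> evictionHead = map.values().iterator();

  SecondChanceEvictor(int evictBatchSize) {
    this.evictBatchSize = evictBatchSize;
  }

  void put(K key, boolean referenced) {
    map.put(key, new Entry<>(key, referenced));
  }

  /** Evicts up to {@code toEvict} entries and returns their keys. */
  List<K> evict(int toEvict) {
    List<K> evicted = new ArrayList<>();
    while (evicted.size() < toEvict && !map.isEmpty()) {
      if (!evictionHead.hasNext()) {
        evictionHead = map.values().iterator(); // wrap around and start a new sweep
      }
      for (Entry<K> candidate : fillBatch(toEvict - evicted.size())) {
        if (map.remove(candidate.key, candidate)) { // skip stale candidates
          evicted.add(candidate.key);
        }
      }
    }
    return evicted;
  }

  private List<Entry<K>> fillBatch(int count) {
    int targetSize = Math.min(count, evictBatchSize);
    List<Entry<K>> candidates = new ArrayList<>();
    while (candidates.size() < targetSize && evictionHead.hasNext()) {
      Entry<K> candidate = evictionHead.next();
      if (candidate.referenced) {
        candidate.referenced = false; // second chance: skip it on this sweep
        continue;
      }
      candidates.add(candidate);
    }
    return candidates;
  }
}
```

When every entry is referenced, the first sweep only clears flags and collects nothing; the wrap-around then starts a second sweep that finds the now-unreferenced entries, so the loop still terminates.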
          Then comes the step that writes entries out:
            private int evictBatch() {
              int evicted = 0;
              if (mEvictionCandidates.isEmpty()) {
                return evicted;
              }
              // Entries fall into two groups:
              // - if an entry matches what the backing store holds, it is simply removed from the map;
              // - if it has been updated relative to the backing store, it must be flushed first, then removed.
              flushEntries(mDirtyEvictionCandidates);
              for (Entry entry : mEvictionCandidates) {
                if (evictIfClean(entry)) {
                  evicted++;
                }
              }
              mEvictionCandidates.clear();
              mDirtyEvictionCandidates.clear();
              return evicted;
            }
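          So evicting an entry falls into one of two cases, depending on whether its value matches the value persisted in the backing store:
          • Case 1: the entry only needs to be removed from the cache map.

          • Case 2: the entry must first be written out to the backing store and then removed from the cache map.

          Which case applies is determined by the Entry's dirty flag: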
            protected class Entry {
              protected K mKey;
              // null value means that the key has been removed from the cache, but still needs to be removed
              // from the backing store.
              @Nullable
              protected V mValue;

              // Whether the entry is out of sync with the backing store. If mDirty is true, the entry must be
              // flushed to the backing store before it can be evicted.
              protected volatile boolean mDirty = true;
              ...
          How flushEntries, called from evictBatch, writes entries to the backing store is left to each subclass:
            /**
             * Attempts to flush the given entries to the backing store.
             *
             * The subclass is responsible for setting each candidate's mDirty field to false on success.
             *
             * @param candidates the candidate entries to flush
             */
            protected abstract void flushEntries(List<Entry> candidates);
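To see how the dirty flag, flushEntries() and clean-only removal fit together, here is a hypothetical model. flushEntries and evictIfClean borrow their names from the article, but their bodies, the WriteBackCache and MapBackedCache classes, and the map standing in for RocksDB are all assumptions made for illustration. Both flushing and removal go through ConcurrentHashMap.computeIfPresent, so a put() that re-dirties an entry in the meantime is neither marked clean by mistake nor evicted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the dirty/clean split in evictBatch().
abstract class WriteBackCache<K, V> {
  protected final class Entry {
    final K key;
    volatile V value;
    volatile boolean dirty = true; // new or updated entries are out of sync with the backing store

    Entry(K key, V value) {
      this.key = key;
      this.value = value;
    }
  }

  final ConcurrentHashMap<K, Entry> map = new ConcurrentHashMap<>();

  void put(K key, V value) {
    map.compute(key, (k, entry) -> {
      if (entry == null) {
        return new Entry(k, value);
      }
      entry.value = value;
      entry.dirty = true;
      return entry;
    });
  }

  /** Persists the given entries; must clear {@code dirty} on each one it writes. */
  protected abstract void flushEntries(List<Entry> candidates);

  /** Flushes the dirty candidates, then removes every candidate that is clean. */
  int evictBatch(List<Entry> candidates) {
    List<Entry> dirty = new ArrayList<>();
    for (Entry e : candidates) {
      if (e.dirty) {
        dirty.add(e);
      }
    }
    flushEntries(dirty);
    int evicted = 0;
    for (Entry e : candidates) {
      if (evictIfClean(e)) {
        evicted++;
      }
    }
    return evicted;
  }

  private boolean evictIfClean(Entry e) {
    boolean[] removed = {false};
    // Atomic per key: an entry re-dirtied by a concurrent put() stays cached.
    map.computeIfPresent(e.key, (k, current) -> {
      if (current == e && !current.dirty) {
        removed[0] = true;
        return null;
      }
      return current;
    });
    return removed[0];
  }
}

// Hypothetical subclass; a ConcurrentHashMap stands in for RocksDB.
class MapBackedCache<K, V> extends WriteBackCache<K, V> {
  final Map<K, V> backingStore = new ConcurrentHashMap<>();

  @Override
  protected void flushEntries(List<Entry> candidates) {
    for (Entry e : candidates) {
      // Write and clear the flag under the map's per-key lock, serialized with put().
      map.computeIfPresent(e.key, (k, current) -> {
        if (current == e && current.dirty) {
          backingStore.put(k, current.value);
          current.dirty = false;
        }
        return current;
      });
    }
  }
}
```

A clean candidate is removed without touching the backing store at all; only dirty candidates cost a write.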
          That covers asynchronous eviction and write-out of entries from the map. Now let's look at the other side: the entry access operations get, put and delete. Take put as an example:
            /**
             * Writes a key/value pair to the cache.
             *
             * @param key the key
             * @param value the value
             */
            public void put(K key, V value) {
              mMap.compute(key, (k, entry) -> {
                // Callback hook for put operations
                onPut(key, value);
                // If the cache is full, write straight to the backing store
                if (entry == null && cacheIsFull()) {
                  writeToBackingStore(key, value);
                  return null;
                }
                if (entry == null || entry.mValue == null) {
                  onCacheUpdate(key, value);
                  return new Entry(key, value);
                }
                // Update the entry
                entry.mValue = value;
                // Mark the entry as referenced, i.e. accessed recently; get and remove also set this flag to true
                entry.mReferenced = true;
                // Mark the entry dirty: its value has changed since it was loaded from the backing store
                entry.mDirty = true;
                return entry;
              });
              // Then notify the eviction thread, which decides whether entries need to be evicted;
              // get and remove also call this at the end
              wakeEvictionThreadIfNecessary();
            }
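The branches inside put()'s compute() call can be exercised on their own. In the hypothetical model below, cacheIsFull() is simply map.size() >= maxSize (an assumption, since the article does not show Alluxio's definition), and a ConcurrentHashMap named backingStore stands in for writeToBackingStore(). There is no eviction thread, so a full cache stays full.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the branches inside put()'s compute() call.
class PutPathModel<K, V> {
  static final class Entry<T> {
    volatile T value;
    volatile boolean referenced;
    volatile boolean dirty = true;

    Entry(T value) {
      this.value = value;
    }
  }

  final ConcurrentHashMap<K, Entry<V>> map = new ConcurrentHashMap<>();
  final Map<K, V> backingStore = new ConcurrentHashMap<>();
  private final int maxSize;

  PutPathModel(int maxSize) {
    this.maxSize = maxSize;
  }

  // Assumed definition of "full"; only reads the map, as compute() requires.
  private boolean cacheIsFull() {
    return map.size() >= maxSize;
  }

  void put(K key, V value) {
    map.compute(key, (k, entry) -> {
      if (entry == null && cacheIsFull()) {
        backingStore.put(k, value); // full: write through and leave the key uncached
        return null;
      }
      if (entry == null || entry.value == null) {
        // New entry (per the Entry comment, a null value would mark a removed key)
        return new Entry<>(value);
      }
      entry.value = value;
      entry.referenced = true; // recently accessed: survives the next eviction sweep
      entry.dirty = true;      // now differs from the backing store copy
      return entry;
    });
  }
}
```

Note the asymmetry: once the cache is full, new keys bypass it and go straight to the backing store, while keys that are already cached are still updated in place and marked referenced and dirty.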
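          The last line of the method wakes the eviction thread as soon as eviction may be needed, so eviction runs promptly on a dedicated background thread instead of piling up and then being processed in one large burst, as described earlier. This is where Alluxio's approach differs from Guava Cache's expiration strategy.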
          That wraps up the main content of this article, most of which covered Alluxio's internal cache implementation. For the full logic, readers can study the source files linked in the references below.
          References
          [1] https://github.com/google/guava/wiki/CachesExplained#refresh
          [2] https://dzone.com/articles/scalable-metadata-service-in-alluxio-storing-billi
          [3] https://dzone.com/articles/store-1-billion-files-in-alluxio-20
          [4] https://github.com/Alluxio/alluxio/blob/master/core/server/master/src/main/java/alluxio/master/metastore/caching/CachingInodeStore.java
          [5] https://github.com/Alluxio/alluxio/blob/master/core/server/master/src/main/java/alluxio/master/metastore/caching/Cache.java

