Hot and Cold Data Separation | Alluxio's Metadata Management Strategy
Author: 林意群 (Lin Yiqun)
Original article: http://suo.im/5Xcmci
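1. Alluxio Overview
Alluxio (formerly Tachyon) is the world's first memory-centric virtual distributed storage system. It unifies data access and bridges upper-layer compute frameworks and underlying storage systems. The Alluxio project originated at UC Berkeley's AMPLab, where it served as the data access layer of the Berkeley Data Analytics Stack (BDAS). Alluxio is one of the fastest-growing open source projects, with more than 1,000 contributors from over 300 organizations, including Alibaba, Alluxio, Baidu, CMU, Google, IBM, Intel, NJU, Red Hat, Tencent, UC Berkeley, and Yahoo. In this article we take a brief look at Alluxio's tiered ("tier layer") metadata management.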
As the volume of metadata keeps growing, how can we push the limit of what a single node can manage, for example from millions of files to billions? Loading the metadata of billions of files entirely into one machine's memory is no longer realistic. This calls for a different model: tiered metadata management, which the official terminology calls "tier layer" metadata management. It has two layers:
Recently accessed hot metadata is cached in memory; this is the cached layer.
Data that has not been accessed for a long time (cold data) is persisted; this is the persisted layer.
Under this model the service caches only the currently active data, so memory stops being the bottleneck.
2. Alluxio's Internal Metadata Management Architecture
Many systems load all metadata into memory to make access fast. Alluxio improves on this by caching only the active data, which is a defining feature of its internal metadata management. Cold data that has not been accessed recently is kept in a local RocksDB instance.
Alluxio has dedicated names for these storage layers: the in-memory layer that caches active data is called the cache store, and the underlying RocksDB layer is called the backing store.
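To make the relationship between the two layers concrete, here is a minimal sketch of a read path that checks the cache store first and falls back to the backing store. It is not Alluxio's code: the class `TieredStoreSketch`, its methods, and the use of a plain `HashMap` in place of RocksDB are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical two-tier metadata store: an in-memory cache in front of a backing store. */
class TieredStoreSketch {
  // Stand-in for the in-memory cache store that holds active entries.
  private final Map<Long, String> cacheStore = new HashMap<>();
  // Stand-in for the RocksDB backing store; a plain map keeps the sketch self-contained.
  private final Map<Long, String> backingStore = new HashMap<>();

  /** Writes land in the cache first; moving data to the backing store is a separate step. */
  void put(long id, String name) {
    cacheStore.put(id, name);
  }

  /** Moves one entry from the cache to the backing store, as eviction of cold data would. */
  void spill(long id) {
    String name = cacheStore.remove(id);
    if (name != null) {
      backingStore.put(id, name);
    }
  }

  /** Reads check the cache first, then fall back to the backing store and re-cache a hit. */
  Optional<String> get(long id) {
    String name = cacheStore.get(id);
    if (name == null) {
      name = backingStore.get(id);
      if (name != null) {
        cacheStore.put(id, name);
      }
    }
    return Optional.ofNullable(name);
  }

  boolean isCached(long id) {
    return cacheStore.containsKey(id);
  }

  public static void main(String[] args) {
    TieredStoreSketch store = new TieredStoreSketch();
    store.put(1L, "/data/a");
    store.spill(1L);
    System.out.println(store.isCached(1L));  // false
    System.out.println(store.get(1L).get()); // /data/a
    System.out.println(store.isCached(1L));  // true
  }
}
```

The sketch is single-threaded on purpose; the concurrency and eviction policy that a real cache store needs are what the rest of the article is about.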
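Alluxio manages its metadata on top of these two stores and exposes data access to clients through them, as the architecture diagram shows:
[Figure: architecture diagram of the cache store layered over the RocksDB backing store]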
What I want to focus on in this article is how the cache store exchanges data with the RocksDB store (the backing store).
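3. Alluxio's Custom Cache with Asynchronous Write-Out
The cache store layer has to do two things to keep metadata correctly up to date:
Promptly remove hot data whose access frequency has dropped and write it out to the backing store.
When a request arrives for data that is not cached, read it from the backing store and load it into the cache.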
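Of these two points, the first is clearly the one Alluxio has to implement itself. So what approach does it take? Could it use an existing, mature cache library such as Guava Cache? Guava Cache's built-in expireAfterAccess fits this use case well.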
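In the end, though, Alluxio did not adopt Guava Cache. In my view the main reason is that Guava Cache does not support asynchronous write-out of expired entries. Guava Cache does not start an extra thread to process expired entries; it handles them as a side task during later cache access operations. This carries a hidden risk: if the cache has not been accessed for a long time and the next access arrives after most entries have passed their expiration time, that access may trigger a large amount of expiry cleanup and reloading. Guava Cache then spends a lot of CPU on this work, which inevitably hurts the cache's ability to serve requests. In addition, updating an entry in Guava Cache takes a lock, so a slow entry update blocks other threads that want to access the same entry.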
The conclusion: if we want cache entries to be removed and updated promptly, we can run our own thread to trigger that work. Below is the explanation from the Guava wiki on GitHub, which also gives the reason why Guava Cache does not start an internal thread for expiry maintenance:
When Does Cleanup Happen?
Caches built with CacheBuilder do not perform cleanup and evict values "automatically," or instantly after a value expires, or anything of the sort. Instead, it performs small amounts of maintenance during write operations, or during occasional read operations if writes are rare.
The reason for this is as follows: if we wanted to perform Cache maintenance continuously, we would need to create a thread, and its operations would be competing with user operations for shared locks. Additionally, some environments restrict the creation of threads, which would make CacheBuilder unusable in that environment.
Instead, we put the choice in your hands. If your cache is high-throughput, then you don't have to worry about performing cache maintenance to clean up expired entries and the like. If your cache does writes only rarely and you don't want cleanup to block cache reads, you may wish to create your own maintenance thread that calls Cache.cleanUp() at regular intervals.
If you want to schedule regular cache maintenance for a cache which only rarely has writes, just schedule the maintenance using ScheduledExecutorService.
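The advice in the quote, running maintenance on your own schedule, can be sketched with the JDK alone. The class below is a hypothetical stand-in for a cache that expires entries only when `cleanUp()` is called; it is not Guava's API, and the TTL, the interval and the injectable clock are assumptions chosen for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical expiring cache whose cleanup runs only when cleanUp() is called. */
class ScheduledCleanupSketch {
  // Key -> time of last access in milliseconds.
  private final Map<String, Long> lastAccessMs = new ConcurrentHashMap<>();
  private final long ttlMs;
  private final LongSupplier clock;

  ScheduledCleanupSketch(long ttlMs, LongSupplier clock) {
    this.ttlMs = ttlMs;
    this.clock = clock;
  }

  /** Records an access to the key. */
  void touch(String key) {
    lastAccessMs.put(key, clock.getAsLong());
  }

  int size() {
    return lastAccessMs.size();
  }

  /** Removes every entry that has not been accessed within the TTL. */
  void cleanUp() {
    long now = clock.getAsLong();
    lastAccessMs.values().removeIf(last -> now - last >= ttlMs);
  }

  public static void main(String[] args) throws InterruptedException {
    ScheduledCleanupSketch cache = new ScheduledCleanupSketch(50, System::currentTimeMillis);
    cache.touch("a");
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    // Periodic maintenance on a dedicated thread, so no reader pays for a large backlog.
    scheduler.scheduleAtFixedRate(cache::cleanUp, 20, 20, TimeUnit.MILLISECONDS);
    Thread.sleep(300);
    scheduler.shutdownNow();
    System.out.println("entries left: " + cache.size());
  }
}
```

With a real Guava cache, the scheduled task would call `Cache.cleanUp()` as the quote describes. Alluxio goes a step further and builds the background thread into the cache itself.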
Here we mainly look at mInodeCache, the cache that holds recently accessed inodes.

```java
public final class CachingInodeStore implements InodeStore, Closeable {
  private static final Logger LOG = LoggerFactory.getLogger(CachingInodeStore.class);
  // Backing store: the store that data is written out to for persistence
  private final InodeStore mBackingStore;
  private final InodeLockManager mLockManager;

  // Cache recently-accessed inodes.
  @VisibleForTesting
  final InodeCache mInodeCache;

  // Cache recently-accessed inode tree edges.
  @VisibleForTesting
  final EdgeCache mEdgeCache;

  @VisibleForTesting
  final ListingCache mListingCache;

  // Starts true, but becomes permanently false if we ever need to spill metadata to the backing
  // store. When true, we can optimize lookups for non-existent inodes because we don't need to
  // check the backing store. We can also optimize getChildren by skipping the range query on the
  // backing store.
  private volatile boolean mBackingStoreEmpty;
  ...
  public CachingInodeStore(InodeStore backingStore, InodeLockManager lockManager) {
    mBackingStore = backingStore;
    mLockManager = lockManager;
    AlluxioConfiguration conf = ServerConfiguration.global();
    int maxSize = conf.getInt(PropertyKey.MASTER_METASTORE_INODE_CACHE_MAX_SIZE);
    Preconditions.checkState(maxSize > 0,
        "Maximum cache size %s must be positive, but is set to %s",
        PropertyKey.MASTER_METASTORE_INODE_CACHE_MAX_SIZE.getName(), maxSize);
    float highWaterMarkRatio = ConfigurationUtils.checkRatio(conf,
        PropertyKey.MASTER_METASTORE_INODE_CACHE_HIGH_WATER_MARK_RATIO);
    // Compute the high watermark
    int highWaterMark = Math.round(maxSize * highWaterMarkRatio);
    float lowWaterMarkRatio = ConfigurationUtils.checkRatio(conf,
        PropertyKey.MASTER_METASTORE_INODE_CACHE_LOW_WATER_MARK_RATIO);
    Preconditions.checkState(lowWaterMarkRatio <= highWaterMarkRatio,
        "low water mark ratio (%s=%s) must not exceed high water mark ratio (%s=%s)",
        PropertyKey.MASTER_METASTORE_INODE_CACHE_LOW_WATER_MARK_RATIO.getName(), lowWaterMarkRatio,
        PropertyKey.MASTER_METASTORE_INODE_CACHE_HIGH_WATER_MARK_RATIO, highWaterMarkRatio);
    // Compute the low watermark
    int lowWaterMark = Math.round(maxSize * lowWaterMarkRatio);

    mBackingStoreEmpty = true;
    CacheConfiguration cacheConf = CacheConfiguration.newBuilder()
        .setMaxSize(maxSize)
        .setHighWaterMark(highWaterMark)
        .setLowWaterMark(lowWaterMark)
        .setEvictBatchSize(conf.getInt(PropertyKey.MASTER_METASTORE_INODE_CACHE_EVICT_BATCH_SIZE))
        .build();
    // Pass the cache configuration above into each cache
    mInodeCache = new InodeCache(cacheConf);
    mEdgeCache = new EdgeCache(cacheConf);
    mListingCache = new ListingCache(cacheConf);
  }
```
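The watermark arithmetic in the constructor is easy to check by hand. The sketch below repeats the same `Math.round(maxSize * ratio)` calculation with illustrative numbers; the values 1000, 0.85 and 0.80 are assumptions picked for readability, not a statement about Alluxio's defaults, and `WatermarkSketch` is a hypothetical helper.

```java
/** Watermark arithmetic with illustrative values; the numbers are assumptions. */
class WatermarkSketch {
  /** Same calculation as in the constructor above: round(maxSize * ratio). */
  static int waterMark(int maxSize, float ratio) {
    return Math.round(maxSize * ratio);
  }

  public static void main(String[] args) {
    int maxSize = 1000;
    int high = waterMark(maxSize, 0.85f);
    int low = waterMark(maxSize, 0.80f);
    System.out.println("high watermark: " + high); // high watermark: 850
    System.out.println("low watermark: " + low);   // low watermark: 800
  }
}
```

The gap between the two watermarks sets how much work one eviction pass does: a wider gap means fewer but larger passes.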
InodeCache, as we can see, extends Cache:

```java
class InodeCache extends Cache<Long, MutableInode<?>> {
  public InodeCache(CacheConfiguration conf) {
    super(conf, "inode-cache", MetricKey.MASTER_INODE_CACHE_SIZE);
  }
  ...
}
```
Put simply, Alluxio's Cache class works as a ConcurrentHashMap plus an EvictionThread. The map is accessed concurrently, hence the ConcurrentHashMap. Entries are then written out according to the thresholds defined here (the high and low watermark values).

```java
public abstract class Cache<K, V> implements Closeable {
  private static final Logger LOG = LoggerFactory.getLogger(Cache.class);

  private final int mMaxSize;
  // High watermark: once the number of cache entries exceeds it, entries start being written out
  private final int mHighWaterMark;
  // Low watermark: the entry count that each eviction pass brings the cache down to
  private final int mLowWaterMark;
  // Batch size for each round of entry eviction
  private final int mEvictBatchSize;
  private final String mName;
  // The cache map; a ConcurrentHashMap is used for thread safety
  @VisibleForTesting
  final ConcurrentHashMap<K, Entry> mMap;
  // TODO(andrew): Support using multiple threads to speed up backing store writes.
  // Thread for performing eviction to the backing store.
  @VisibleForTesting
  final EvictionThread mEvictionThread;
  ...
```
Now let's go straight to the logic of EvictionThread:

```java
class EvictionThread extends Thread {
  @VisibleForTesting
  volatile boolean mIsSleeping = true;

  // Cache entries that are due to be evicted
  private final List<Entry> mEvictionCandidates = new ArrayList<>(mEvictBatchSize);
  private final List<Entry> mDirtyEvictionCandidates = new ArrayList<>(mEvictBatchSize);
  private final Logger mCacheFullLogger = new SamplingLogger(LOG, 10L * Constants.SECOND_MS);
  ...
  @Override
  public void run() {
    while (!Thread.interrupted()) {
      // While the map's entry count has not passed the high watermark, the thread waits
      while (!overHighWaterMark()) {
        synchronized (mEvictionThread) {
          if (!overHighWaterMark()) {
            try {
              mIsSleeping = true;
              mEvictionThread.wait();
              mIsSleeping = false;
            } catch (InterruptedException e) {
              return;
            }
          }
        }
      }
      if (cacheIsFull()) {
        mCacheFullLogger.warn(
            "Metastore {} cache is full. Consider increasing the cache size or lowering the "
                + "high water mark. size:{} lowWaterMark:{} highWaterMark:{} maxSize:{}",
            mName, mMap.size(), mLowWaterMark, mHighWaterMark, mMaxSize);
      }
      // The entry count has passed the high watermark: write entries out and remove them
      // until the map is back down to the low watermark
      evictToLowWaterMark();
    }
  }
}
```

Next, step into the evictToLowWaterMark method:

```java
private void evictToLowWaterMark() {
  long evictionStart = System.nanoTime();
  // Number of entries this pass needs to evict
  int toEvict = mMap.size() - mLowWaterMark;
  // Running count of entries evicted so far
  int evictionCount = 0;
  // Write out and remove entries
  while (evictionCount < toEvict) {
    if (!mEvictionHead.hasNext()) {
      mEvictionHead = mMap.values().iterator();
    }
    // Walk the map entries and collect the ones to evict
    fillBatch(toEvict - evictionCount);
    // Write out and remove the collected entries
    evictionCount += evictBatch();
  }
  if (evictionCount > 0) {
    LOG.debug("{}: Evicted {} entries in {}ms", mName, evictionCount,
        (System.nanoTime() - evictionStart) / Constants.MS_NANO);
  }
}
```

The fillBatch call above collects the entries to evict as follows:

```java
private void fillBatch(int count) {
  // Cap on the number of entries evicted in a single batch
  int targetSize = Math.min(count, mEvictBatchSize);
  // Until the batch reaches the target size, keep walking the map for unreferenced entries
  while (mEvictionCandidates.size() < targetSize && mEvictionHead.hasNext()) {
    Entry candidate = mEvictionHead.next();
    // If the entry has been referenced, clear its reference flag; if the scan reaches it
    // again and the flag is still false, the entry is collected for eviction.
    // Whenever an entry is accessed, its reference flag is set back to true.
    if (candidate.mReferenced) {
      candidate.mReferenced = false;
      continue;
    }
    // The entry is already marked as unreferenced: add it to the eviction candidates
    mEvictionCandidates.add(candidate);
    if (candidate.mDirty) {
      mDirtyEvictionCandidates.add(candidate);
    }
  }
}
```

Then comes the entry write-out:

```java
private int evictBatch() {
  int evicted = 0;
  if (mEvictionCandidates.isEmpty()) {
    return evicted;
  }
  // Write out the entries, which fall into two groups:
  // if the entry's value matches what the backing store holds, just remove it from the map;
  // if the value has changed relative to the backing store, flush it first, then remove it
  flushEntries(mDirtyEvictionCandidates);
  for (Entry entry : mEvictionCandidates) {
    if (evictIfClean(entry)) {
      evicted++;
    }
  }
  mEvictionCandidates.clear();
  mDirtyEvictionCandidates.clear();
  return evicted;
}
```

As we can see, eviction splits into two cases, depending on whether the entry's value matches the value persisted in the backing store:
Case 1: the entry only needs to be removed from the cache map.
Case 2: the entry is removed from the cache map and also has to be written out to the backing store.
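The two cases, together with the reference-flag scan from fillBatch, can be sketched in a few lines. This is a single-threaded illustration under simplifying assumptions, not Alluxio's implementation: `SecondChanceSketch`, its `Entry` fields and the `HashMap` standing in for the backing store are hypothetical, and the real code batches its work and runs on a separate thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Single-threaded sketch of second-chance eviction that handles clean and dirty entries. */
class SecondChanceSketch {
  static final class Entry {
    final String key;
    final String value;
    boolean referenced; // set on access, cleared by the eviction scan
    boolean dirty;      // true while the value has not been written to the backing store

    Entry(String key, String value, boolean referenced, boolean dirty) {
      this.key = key;
      this.value = value;
      this.referenced = referenced;
      this.dirty = dirty;
    }
  }

  final Map<String, Entry> cache = new LinkedHashMap<>();
  final Map<String, String> backingStore = new HashMap<>();

  void add(String key, String value, boolean referenced, boolean dirty) {
    cache.put(key, new Entry(key, value, referenced, dirty));
  }

  /** Scans the cache once and evicts up to count entries whose reference flag is clear. */
  int evict(int count) {
    List<Entry> candidates = new ArrayList<>();
    for (Entry e : cache.values()) {
      if (candidates.size() >= count) {
        break;
      }
      if (e.referenced) {
        e.referenced = false; // second chance: spared now, evictable on the next scan
        continue;
      }
      candidates.add(e);
    }
    for (Entry e : candidates) {
      if (e.dirty) {
        backingStore.put(e.key, e.value); // case 2: flush before removal
        e.dirty = false;
      }
      cache.remove(e.key); // both cases: drop the entry from the cache map
    }
    return candidates.size();
  }

  public static void main(String[] args) {
    SecondChanceSketch s = new SecondChanceSketch();
    s.add("a", "1", false, false); // clean and cold: case 1
    s.add("b", "2", false, true);  // dirty and cold: case 2
    s.add("c", "3", true, true);   // recently referenced: survives the first scan
    System.out.println(s.evict(10));      // 2
    System.out.println(s.backingStore);   // {b=2}
    System.out.println(s.cache.keySet()); // [c]
    System.out.println(s.evict(10));      // 1
  }
}
```

The reference flag gives recently used entries one extra pass before they become eviction candidates, which approximates LRU without keeping an ordered list.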
The mDirty flag on Entry is what tells the two cases apart:

```java
protected class Entry {
  protected K mKey;
  // null value means that the key has been removed from the cache, but still needs to be removed
  // from the backing store.
  @Nullable
  protected V mValue;
  // Whether the entry is out of sync with the backing store. If mDirty is true, the entry must be
  // flushed to the backing store before it can be evicted.
  protected volatile boolean mDirty = true;
  ...
```

How the flushEntries method called by evictBatch writes to the backing store is up to the subclass:

```java
/**
 * Attempts to flush the given entries to the backing store.
 *
 * The subclass is responsible for setting each candidate's mDirty field to false on success.
 *
 * @param candidates the candidate entries to flush
 */
protected abstract void flushEntries(List<Entry> candidates);
```

That covers the asynchronous write-out of map entries. Now for the other part: the entry access operations get, put and delete. We take put as the example:

```java
/**
 * Writes a key/value pair to the cache.
 *
 * @param key the key
 * @param value the value
 */
public void put(K key, V value) {
  mMap.compute(key, (k, entry) -> {
    // Callback hook for the put operation
    onPut(key, value);
    // If the cache is already full, write straight to the backing store
    if (entry == null && cacheIsFull()) {
      writeToBackingStore(key, value);
      return null;
    }
    if (entry == null || entry.mValue == null) {
      onCacheUpdate(key, value);
      return new Entry(key, value);
    }
    // Update the entry
    entry.mValue = value;
    // Mark the entry as referenced, meaning it was accessed recently;
    // get and remove set this flag to true as well
    entry.mReferenced = true;
    // Mark the entry as dirty, meaning its value has changed since it was loaded
    // from the backing store
    entry.mDirty = true;
    return entry;
  });
  // Finally, notify the eviction thread so it can decide whether entries need to be evicted;
  // get and remove also call this method at the end
  wakeEvictionThreadIfNecessary();
}
```

The last line of the method above wakes the eviction thread as soon as eviction is needed, so the burst of write-out and removal work described earlier never builds up. This is where the approach differs from Guava Cache's expiry policy.
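The interplay between a writer and the eviction thread can be sketched with a plain wait/notify handshake. The sketch below is an assumption-laden simplification rather than Alluxio's Cache class: `WakeOnPutSketch` and its members are hypothetical, the watermarks 8 and 4 are arbitrary, it evicts whatever entries the iterator yields instead of using reference flags, and it treats every entry as dirty.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch of a map-backed cache whose writers wake a background eviction thread. */
class WakeOnPutSketch implements AutoCloseable {
  private final Map<Integer, String> cache = new ConcurrentHashMap<>();
  private final Map<Integer, String> backingStore = new ConcurrentHashMap<>();
  private final int highWaterMark;
  private final int lowWaterMark;
  private final Object lock = new Object();
  private final Thread evictionThread = new Thread(this::evictLoop, "eviction-sketch");

  WakeOnPutSketch(int highWaterMark, int lowWaterMark) {
    this.highWaterMark = highWaterMark;
    this.lowWaterMark = lowWaterMark;
    evictionThread.setDaemon(true);
    evictionThread.start();
  }

  void put(int key, String value) {
    cache.put(key, value);
    // Last step of every write: wake the eviction thread if the cache grew too large.
    if (cache.size() > highWaterMark) {
      synchronized (lock) {
        lock.notifyAll();
      }
    }
  }

  private void evictLoop() {
    try {
      while (true) {
        synchronized (lock) {
          // Sleep until a writer pushes the cache over the high watermark.
          while (cache.size() <= highWaterMark) {
            lock.wait();
          }
        }
        // Move entries to the backing store until the cache is down to the low watermark.
        Iterator<Map.Entry<Integer, String>> it = cache.entrySet().iterator();
        while (cache.size() > lowWaterMark && it.hasNext()) {
          Map.Entry<Integer, String> e = it.next();
          backingStore.put(e.getKey(), e.getValue());
          it.remove();
        }
      }
    } catch (InterruptedException e) {
      // close() interrupts the thread to stop it.
    }
  }

  int cachedCount() {
    return cache.size();
  }

  int persistedCount() {
    return backingStore.size();
  }

  @Override
  public void close() {
    evictionThread.interrupt();
  }

  public static void main(String[] args) throws InterruptedException {
    try (WakeOnPutSketch c = new WakeOnPutSketch(8, 4)) {
      for (int i = 0; i < 20; i++) {
        c.put(i, "inode-" + i);
      }
      Thread.sleep(200); // give the background thread time to drain
      System.out.println("cached=" + c.cachedCount() + " persisted=" + c.persistedCount());
    }
  }
}
```

Because eviction starts as soon as a write crosses the high watermark, the backlog handled in one pass stays bounded by the gap between the watermarks plus whatever is written while the pass runs.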
That is the main content of this article, much of which covered how Alluxio implements its internal cache. For the detailed logic, readers can follow the links to the relevant classes in the references below.
References
[1] https://github.com/google/guava/wiki/CachesExplained#refresh
[2] https://dzone.com/articles/scalable-metadata-service-in-alluxio-storing-billi
[3] https://dzone.com/articles/store-1-billion-files-in-alluxio-20
[4] https://github.com/Alluxio/alluxio/blob/master/core/server/master/src/main/java/alluxio/master/metastore/caching/CachingInodeStore.java
[5] https://github.com/Alluxio/alluxio/blob/master/core/server/master/src/main/java/alluxio/master/metastore/caching/Cache.java