<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          SpringBoot2.x 官方推薦緩存框架-Caffeine高性能設(shè)計剖析

          共 12678字,需瀏覽 26分鐘

           ·

          2020-12-17 20:43

          概要

          Caffeine是一個高性能,高命中率,低內(nèi)存占用,near optimal 的本地緩存,簡單來說它是Guava Cache的優(yōu)化加強版,有些文章把Caffeine稱為“新一代的緩存”、“現(xiàn)代緩存之王”。本文將重點講解Caffeine的高性能設(shè)計,以及對應(yīng)部分的源碼分析。

          與Guava Cache比較

          如果你對Guava Cache還不理解的話,可以點擊這里來看一下我之前寫過關(guān)于Guava Cache的文章。

          大家都知道,Spring5即將放棄掉Guava Cache作為緩存機制,而改用Caffeine作為新的本地Cache的組件,這對于Caffeine來說是一個很大的肯定。為什么Spring會這樣做呢?其實在Caffeine的Benchmarks里給出了好靚仔的數(shù)據(jù),對讀和寫的場景,還有跟其他幾個緩存工具進行了比較,Caffeine的性能都表現(xiàn)很突出。

          圖片

          使用Caffeine

          Caffeine為了方便大家使用以及從Guava Cache切換過來(很有針對性啊~),借鑒了Guava Cache大部分的概念(諸如核心概念Cache、LoadingCache、CacheLoader、CacheBuilder等等),對于Caffeine的理解只要把它當作Guava Cache就可以了。

          使用上,大家只要把Caffeine的包引進來,然后換一下cache的實現(xiàn)類,基本應(yīng)該就沒問題了。這對與已經(jīng)使用過Guava Cache的同學來說沒有任何難度,甚至還有一點熟悉的味道,如果你之前沒有使用過Guava Cache,可以查看Caffeine的官方API說明文檔,其中Population,Eviction,Removal,Refresh,Statistics,Cleanup,Policy等等這些特性都是跟Guava Cache基本一樣的。

          下面給出一個例子說明怎樣創(chuàng)建一個Cache:

          private?static?LoadingCache?cache?=?Caffeine.newBuilder()??
          ????????????//最大個數(shù)限制??
          ????????????.maximumSize(256L)??
          ????????????//初始化容量??
          ????????????.initialCapacity(1)??
          ????????????//訪問后過期(包括讀和寫)??
          ????????????.expireAfterAccess(2,?TimeUnit.DAYS)??
          ????????????//寫后過期??
          ????????????.expireAfterWrite(2,?TimeUnit.HOURS)??
          ????????????//寫后自動異步刷新??
          ????????????.refreshAfterWrite(1,?TimeUnit.HOURS)??
          ????????????//記錄下緩存的一些統(tǒng)計數(shù)據(jù),例如命中率等??
          ????????????.recordStats()??
          ????????????//cache對緩存寫的通知回調(diào)??
          ????????????.writer(new?CacheWriter()?{??
          ????????????????@Override??
          ????????????????public?void?write(@NonNull?Object?key,?@NonNull?Object?value)?{??
          ????????????????????log.info("key={},?CacheWriter?write",?key);??
          ????????????????}??
          ??
          ????????????????@Override??
          ????????????????public?void?delete(@NonNull?Object?key,?@Nullable?Object?value,?@NonNull?RemovalCause?cause)?{??
          ????????????????????log.info("key={},?cause={},?CacheWriter?delete",?key,?cause);??
          ????????????????}??
          ????????????})??
          ????????????//使用CacheLoader創(chuàng)建一個LoadingCache??
          ????????????.build(new?CacheLoader()?{??
          ????????????????//同步加載數(shù)據(jù)??
          ????????????????@Nullable??
          ????????????????@Override??
          ????????????????public?String?load(@NonNull?String?key)?throws?Exception?{??
          ????????????????????return?"value_"?+?key;??
          ????????????????}??
          ??
          ????????????????//異步加載數(shù)據(jù)??
          ????????????????@Nullable??
          ????????????????@Override??
          ????????????????public?String?reload(@NonNull?String?key,?@NonNull?String?oldValue)?throws?Exception?{??
          ????????????????????return?"value_"?+?key;??
          ????????????????}??
          ????????????});??

          Caffeine的高性能設(shè)計

          判斷一個緩存的好壞最核心的指標就是命中率,影響緩存命中率有很多因素,包括業(yè)務(wù)場景、淘汰策略、清理策略、緩存容量等等。如果作為本地緩存, 它的性能的情況,資源的占用也都是一個很重要的指標。下面

          我們來看看Caffeine在這幾個方面是怎么著手的,如何做優(yōu)化的。

          (注:本文不會分析Caffeine全部源碼,只會對核心設(shè)計的實現(xiàn)進行分析,但我建議讀者把Caffeine的源碼都涉獵一下,有個overview才能更好理解本文。如果你看過Guava Cache的源碼也行,代碼的數(shù)據(jù)結(jié)構(gòu)和處理邏輯很類似的。源碼基于:caffeine-2.8.0.jar)

          W-TinyLFU整體設(shè)計

          上面說到淘汰策略是影響緩存命中率的因素之一,一般比較簡單的緩存就會直接用到LFU(Least Frequently Used,即最不經(jīng)常使用)或者LRU(Least Recently Used,即最近最少使用),而Caffeine就是使用了W-TinyLFU算法。

          W-TinyLFU看名字就能大概猜出來,它是LFU的變種,也是一種緩存淘汰算法。那為什么要使用W-TinyLFU呢?

          LRU和LFU的缺點

          • LRU實現(xiàn)簡單,在一般情況下能夠表現(xiàn)出很好的命中率,是一個“性價比”很高的算法,平時也很常用。雖然LRU對突發(fā)性的稀疏流量(sparse bursts)表現(xiàn)很好,但同時也會產(chǎn)生緩存污染,舉例來說,如果偶然性的要對全量數(shù)據(jù)進行遍歷,那么“歷史訪問記錄”就會被刷走,造成污染。

          • 如果數(shù)據(jù)的分布在一段時間內(nèi)是固定的話,那么LFU可以達到最高的命中率。但是LFU有兩個缺點,第一,它需要給每個記錄項維護頻率信息,每次訪問都需要更新,這是個巨大的開銷;第二,對突發(fā)性的稀疏流量無力,因為前期經(jīng)常訪問的記錄已經(jīng)占用了緩存,偶然的流量不太可能會被保留下來,而且過去的一些大量被訪問的記錄在將來也不一定會使用上,這樣就一直把“坑”占著了。

          無論LRU還是LFU都有其各自的缺點,不過,現(xiàn)在已經(jīng)有很多針對其缺點而改良、優(yōu)化出來的變種算法。

          TinyLFU

          TinyLFU就是其中一個優(yōu)化算法,它是專門為了解決LFU上述提到的兩個問題而被設(shè)計出來的。

          解決第一個問題是采用了Count–Min Sketch算法。

          解決第二個問題是讓記錄盡量保持相對的“新鮮”(Freshness Mechanism),并且當有新的記錄插入時,可以讓它跟老的記錄進行“PK”,輸者就會被淘汰,這樣一些老的、不再需要的記錄就會被剔除。

          下圖是TinyLFU設(shè)計圖(來自官方)

          圖片
          統(tǒng)計頻率Count–Min Sketch算法

          如何對一個key進行統(tǒng)計,但又可以節(jié)省空間呢?(不是簡單的使用HashMap,這太消耗內(nèi)存了),注意哦,不需要精確的統(tǒng)計,只需要一個近似值就可以了,怎么樣,這樣場景是不是很熟悉,如果你是老司機,或許已經(jīng)聯(lián)想到布隆過濾器(Bloom Filter)的應(yīng)用了。

          沒錯,將要介紹的Count–Min Sketch的原理跟Bloom Filter一樣,只不過Bloom Filter只有0和1的值,那么你可以把Count–Min Sketch看作是“數(shù)值”版的Bloom Filter。

          更多關(guān)于Count–Min Sketch的介紹請自行搜索。

          在TinyLFU中,近似頻率的統(tǒng)計如下圖所示:

          圖片

          對一個key進行多次hash函數(shù)后,index到多個數(shù)組位置后進行累加,查詢時取多個值中的最小值即可。

          Caffeine對這個算法的實現(xiàn)在FrequencySketch類。但Caffeine對此有進一步的優(yōu)化,例如Count–Min Sketch使用了二維數(shù)組,Caffeine只是用了一個一維的數(shù)組;再者,如果是數(shù)值類型的話,這個數(shù)需要用int或long來存儲,但是Caffeine認為緩存的訪問頻率不需要用到那么大,只需要15就足夠,一般認為達到15次的頻率算是很高的了,而且Caffeine還有另外一個機制來使得這個頻率進行衰退減半(下面就會講到)。如果最大是15的話,那么只需要4個bit就可以滿足了,一個long有64bit,可以存儲16個這樣的統(tǒng)計數(shù),Caffeine就是這樣的設(shè)計,使得存儲效率提高了16倍。

          Caffeine對緩存的讀寫(afterReadafterWrite方法)都會調(diào)用onAccesss方法,而onAccess方法里有一句:

          frequencySketch().increment(key);??

          這句就是追加記錄的頻率,下面我們看看具體實現(xiàn)

          //FrequencySketch的一些屬性??
          ??
          //種子數(shù)??
          static?final?long[]?SEED?=?{?//?A?mixture?of?seeds?from?FNV-1a,?CityHash,?and?Murmur3??
          ????0xc3a5c85c97cb3127L,?0xb492b66fbe98f273L,?0x9ae16a3b2f90404fL,?0xcbf29ce484222325L};??
          static?final?long?RESET_MASK?=?0x7777777777777777L;??
          static?final?long?ONE_MASK?=?0x1111111111111111L;??
          ??
          int?sampleSize;??
          //為了快速根據(jù)hash值得到table的index值的掩碼??
          //table的長度size一般為2的n次方,而tableMask為size-1,這樣就可以通過&操作來模擬取余操作,速度快很多,老司機都知道??
          int?tableMask;??
          //存儲數(shù)據(jù)的一維long數(shù)組??
          long[]?table;??
          int?size;??
          ??
          /**??
          ?*?Increments?the?popularity?of?the?element?if?it?does?not?exceed?the?maximum?(15).?The?popularity??
          ?*?of?all?elements?will?be?periodically?down?sampled?when?the?observed?events?exceeds?a?threshold.??
          ?*?This?process?provides?a?frequency?aging?to?allow?expired?long?term?entries?to?fade?away.??
          ?*??
          ?*?@param?e?the?element?to?add??
          ?*/
          ??
          public?void?increment(@NonNull?E?e)?{??
          ??if?(isNotInitialized())?{??
          ????return;??
          ??}??
          ??
          ??//根據(jù)key的hashCode通過一個哈希函數(shù)得到一個hash值??
          ??//本來就是hashCode了,為什么還要再做一次hash?怕原來的hashCode不夠均勻分散,再打散一下。??
          ??int?hash?=?spread(e.hashCode());??
          ??//這句光看有點難理解??
          ??//就如我剛才說的,Caffeine把一個long的64bit劃分成16個等分,每一等分4個bit。??
          ??//這個start就是用來定位到是哪一個等分的,用hash值低兩位作為隨機數(shù),再左移2位,得到一個小于16的值??
          ??int?start?=?(hash?&?3)?<2;??
          ??
          ??//indexOf方法的意思就是,根據(jù)hash值和不同種子得到table的下標index??
          ??//這里通過四個不同的種子,得到四個不同的下標index??
          ??int?index0?=?indexOf(hash,?0);??
          ??int?index1?=?indexOf(hash,?1);??
          ??int?index2?=?indexOf(hash,?2);??
          ??int?index3?=?indexOf(hash,?3);??
          ??
          ??//根據(jù)index和start(+1,?+2,?+3)的值,把table[index]對應(yīng)的等分追加1??
          ??//這個incrementAt方法有點難理解,看我下面的解釋??
          ??boolean?added?=?incrementAt(index0,?start);??
          ??added?|=?incrementAt(index1,?start?+?1);??
          ??added?|=?incrementAt(index2,?start?+?2);??
          ??added?|=?incrementAt(index3,?start?+?3);??
          ??
          ??//這個reset等下說??
          ??if?(added?&&?(++size?==?sampleSize))?{??
          ????reset();??
          ??}??
          }??
          ??
          /**??
          ?*?Increments?the?specified?counter?by?1?if?it?is?not?already?at?the?maximum?value?(15).??
          ?*??
          ?*?@param?i?the?table?index?(16?counters)??
          ?*?@param?j?the?counter?to?increment??
          ?*?@return?if?incremented??
          ?*/
          ??
          boolean?incrementAt(int?i,?int?j)?{??
          ??//這個j表示16個等分的下標,那么offset就是相當于在64位中的下標(這個自己想想)??
          ??int?offset?=?j?<2;??
          ??//上面提到Caffeine把頻率統(tǒng)計最大定為15,即0xfL??
          ??//mask就是在64位中的掩碼,即1111后面跟很多個0??
          ??long?mask?=?(0xfL?<??//如果&的結(jié)果不等于15,那么就追加1。等于15就不會再加了??
          ??if?((table[i]?&?mask)?!=?mask)?{??
          ????table[i]?+=?(1L?<????return?true;??
          ??}??
          ??return?false;??
          }??
          ??
          /**??
          ?*?Returns?the?table?index?for?the?counter?at?the?specified?depth.??
          ?*??
          ?*?@param?item?the?element's?hash??
          ?*?@param?i?the?counter?depth??
          ?*?@return?the?table?index??
          ?*/
          ??
          int?indexOf(int?item,?int?i)?{??
          ??long?hash?=?SEED[i]?*?item;??
          ??hash?+=?hash?>>>?32;??
          ??return?((int)?hash)?&?tableMask;??
          }??
          ??
          /**??
          ?*?Applies?a?supplemental?hash?function?to?a?given?hashCode,?which?defends?against?poor?quality??
          ?*?hash?functions.??
          ?*/
          ??
          int?spread(int?x)?{??
          ??x?=?((x?>>>?16)?^?x)?*?0x45d9f3b;??
          ??x?=?((x?>>>?16)?^?x)?*?0x45d9f3b;??
          ??return?(x?>>>?16)?^?x;??
          }??

          知道了追加方法,那么讀取方法frequency就很容易理解了。

          /**??
          ?*?Returns?the?estimated?number?of?occurrences?of?an?element,?up?to?the?maximum?(15).??
          ?*??
          ?*?@param?e?the?element?to?count?occurrences?of??
          ?*?@return?the?estimated?number?of?occurrences?of?the?element;?possibly?zero?but?never?negative??
          ?*/
          ??
          @NonNegative??
          public?int?frequency(@NonNull?E?e)?{??
          ??if?(isNotInitialized())?{??
          ????return?0;??
          ??}??
          ??
          ??//得到hash值,跟上面一樣??
          ??int?hash?=?spread(e.hashCode());??
          ??//得到等分的下標,跟上面一樣??
          ??int?start?=?(hash?&?3)?<2;??
          ??int?frequency?=?Integer.MAX_VALUE;??
          ??//循環(huán)四次,分別獲取在table數(shù)組中不同的下標位置??
          ??for?(int?i?=?0;?i?4;?i++)?{??
          ????int?index?=?indexOf(hash,?i);??
          ????//這個操作就不多說了,其實跟上面incrementAt是一樣的,定位到table[index]?+?等分的位置,再根據(jù)mask取出計數(shù)值??
          ????int?count?=?(int)?((table[index]?>>>?((start?+?i)?<2))?&?0xfL);??
          ????//取四個中的較小值??
          ????frequency?=?Math.min(frequency,?count);??
          ??}??
          ??return?frequency;??
          }??

          通過代碼和注釋或者讀者可能難以理解,下圖是我畫出來幫助大家理解的結(jié)構(gòu)圖。

          注意紫色虛線框,其中藍色小格就是需要計算的位置:

          圖片
          保新機制

          為了讓緩存保持“新鮮”,剔除掉過往頻率很高但之后不經(jīng)常的緩存,Caffeine有一個Freshness Mechanism。做法很簡答,就是當整體的統(tǒng)計計數(shù)(當前所有記錄的頻率統(tǒng)計之和,這個數(shù)值內(nèi)部維護)達到某一個值時,那么所有記錄的頻率統(tǒng)計除以2。

          從上面的代碼

          //size變量就是所有記錄的頻率統(tǒng)計之,即每個記錄加1,這個size都會加1??
          //sampleSize一個閾值,從FrequencySketch初始化可以看到它的值為maximumSize的10倍??
          if?(added?&&?(++size?==?sampleSize))?{??
          ??????reset();??
          }??

          看到reset方法就是做這個事情

          /**?Reduces?every?counter?by?half?of?its?original?value.?*/??
          void?reset()?{??
          ??int?count?=?0;??
          ??for?(int?i?=?0;?i?????count?+=?Long.bitCount(table[i]?&?ONE_MASK);??
          ????table[i]?=?(table[i]?>>>?1)?&?RESET_MASK;??
          ??}??
          ??size?=?(size?>>>?1)?-?(count?>>>?2);??
          }??

          關(guān)于這個reset方法,為什么是除以2,而不是其他,及其正確性,在最下面的參考資料的TinyLFU論文中3.3章節(jié)給出了數(shù)學證明,大家有興趣可以看看。

          增加一個Window?

          Caffeine通過測試發(fā)現(xiàn)TinyLFU在面對突發(fā)性的稀疏流量(sparse bursts)時表現(xiàn)很差,因為新的記錄(new items)還沒來得及建立足夠的頻率就被剔除出去了,這就使得命中率下降。

          于是Caffeine設(shè)計出一種新的policy,即Window Tiny LFU(W-TinyLFU),并通過實驗和實踐發(fā)現(xiàn)W-TinyLFU比TinyLFU表現(xiàn)的更好。

          W-TinyLFU的設(shè)計如下所示(兩圖等價):

          圖片
          圖片

          它主要包括兩個緩存模塊,主緩存是SLRU(Segmented LRU,即分段LRU),SLRU包括一個名為protected和一個名為probation的緩存區(qū)。通過增加一個緩存區(qū)(即Window Cache),當有新的記錄插入時,會先在window區(qū)呆一下,就可以避免上述說的sparse bursts問題。

          淘汰策略(eviction policy)

          當window區(qū)滿了,就會根據(jù)LRU把candidate(即淘汰出來的元素)放到probation區(qū),如果probation區(qū)也滿了,就把candidate和probation將要淘汰的元素victim,兩個進行“PK”,勝者留在probation,輸者就要被淘汰了。

          而且經(jīng)過實驗發(fā)現(xiàn)當window區(qū)配置為總?cè)萘康?%,剩余的99%當中的80%分給protected區(qū),20%分給probation區(qū)時,這時整體性能和命中率表現(xiàn)得最好,所以Caffeine默認的比例設(shè)置就是這個。

          不過這個比例Caffeine會在運行時根據(jù)統(tǒng)計數(shù)據(jù)(statistics)去動態(tài)調(diào)整,如果你的應(yīng)用程序的緩存隨著時間變化比較快的話,那么增加window區(qū)的比例可以提高命中率,相反緩存都是比較固定不變的話,增加Main Cache區(qū)(protected區(qū) +probation區(qū))的比例會有較好的效果。

          下面我們看看上面說到的淘汰策略是怎么實現(xiàn)的:

          一般緩存對讀寫操作后都有后續(xù)的一系列“維護”操作,Caffeine也不例外,這些操作都在maintenance方法,我們將要說到的淘汰策略也在里面。

          這方法比較重要,下面也會提到,所以這里只先說跟“淘汰策略”有關(guān)的evictEntriesclimb

          /**??
          ???*?Performs?the?pending?maintenance?work?and?sets?the?state?flags?during?processing?to?avoid??
          ???*?excess?scheduling?attempts.?The?read?buffer,?write?buffer,?and?reference?queues?are??
          ???*?drained,?followed?by?expiration,?and?size-based?eviction.??
          ???*??
          ???*?@param?task?an?additional?pending?task?to?run,?or?{@code?null}?if?not?present??
          ???*/
          ??
          ??@GuardedBy("evictionLock")??
          ??void?maintenance(@Nullable?Runnable?task)?{??
          ????lazySetDrainStatus(PROCESSING_TO_IDLE);??
          ??
          ????try?{??
          ??????drainReadBuffer();??
          ??
          ??????drainWriteBuffer();??
          ??????if?(task?!=?null)?{??
          ????????task.run();??
          ??????}??
          ??
          ??????drainKeyReferences();??
          ??????drainValueReferences();??
          ??
          ??????expireEntries();??
          ??????//把符合條件的記錄淘汰掉??
          ??????evictEntries();??
          ??????//動態(tài)調(diào)整window區(qū)和protected區(qū)的大小??
          ??????climb();??
          ????}?finally?{??
          ??????if?((drainStatus()?!=?PROCESSING_TO_IDLE)?||?!casDrainStatus(PROCESSING_TO_IDLE,?IDLE))?{??
          ????????lazySetDrainStatus(REQUIRED);??
          ??????}??
          ????}??
          ??}??

          先說一下Caffeine對上面說到的W-TinyLFU策略的實現(xiàn)用到的數(shù)據(jù)結(jié)構(gòu):

          //最大的個數(shù)限制??
          long?maximum;??
          //當前的個數(shù)??
          long?weightedSize;??
          //window區(qū)的最大限制??
          long?windowMaximum;??
          //window區(qū)當前的個數(shù)??
          long?windowWeightedSize;??
          //protected區(qū)的最大限制??
          long?mainProtectedMaximum;??
          //protected區(qū)當前的個數(shù)??
          long?mainProtectedWeightedSize;??
          //下一次需要調(diào)整的大小(還需要進一步計算)??
          double?stepSize;??
          //window區(qū)需要調(diào)整的大小??
          long?adjustment;??
          //命中計數(shù)??
          int?hitsInSample;??
          //不命中的計數(shù)??
          int?missesInSample;??
          //上一次的緩存命中率??
          double?previousSampleHitRate;??
          ??
          final?FrequencySketch?sketch;??
          //window區(qū)的LRU?queue(FIFO)??
          final?AccessOrderDeque>?accessOrderWindowDeque;??
          //probation區(qū)的LRU?queue(FIFO)??
          final?AccessOrderDeque>?accessOrderProbationDeque;??
          //protected區(qū)的LRU?queue(FIFO)??
          final?AccessOrderDeque>?accessOrderProtectedDeque;??

          以及默認比例設(shè)置(意思看注釋)

          /**?The?initial?percent?of?the?maximum?weighted?capacity?dedicated?to?the?main?space.?*/??
          static?final?double?PERCENT_MAIN?=?0.99d;??
          /**?The?percent?of?the?maximum?weighted?capacity?dedicated?to?the?main's?protected?space.?*/??
          static?final?double?PERCENT_MAIN_PROTECTED?=?0.80d;??
          /**?The?difference?in?hit?rates?that?restarts?the?climber.?*/??
          static?final?double?HILL_CLIMBER_RESTART_THRESHOLD?=?0.05d;??
          /**?The?percent?of?the?total?size?to?adapt?the?window?by.?*/??
          static?final?double?HILL_CLIMBER_STEP_PERCENT?=?0.0625d;??
          /**?The?rate?to?decrease?the?step?size?to?adapt?by.?*/??
          static?final?double?HILL_CLIMBER_STEP_DECAY_RATE?=?0.98d;??
          /**?The?maximum?number?of?entries?that?can?be?transfered?between?queues.?*/??

          重點來了,evictEntries和climb方法:

          /**?Evicts?entries?if?the?cache?exceeds?the?maximum.?*/??
          @GuardedBy("evictionLock")??
          void?evictEntries()?{??
          ??if?(!evicts())?{??
          ????return;??
          ??}??
          ??//淘汰window區(qū)的記錄??
          ??int?candidates?=?evictFromWindow();??
          ??//淘汰Main區(qū)的記錄??
          ??evictFromMain(candidates);??
          }??
          ??
          /**??
          ?*?Evicts?entries?from?the?window?space?into?the?main?space?while?the?window?size?exceeds?a??
          ?*?maximum.??
          ?*??
          ?*?@return?the?number?of?candidate?entries?evicted?from?the?window?space??
          ?*/
          ??
          //根據(jù)W-TinyLFU,新的數(shù)據(jù)都會無條件的加到admission?window??
          //但是window是有大小限制,所以要“定期”做一下“維護”??
          @GuardedBy("evictionLock")??
          int?evictFromWindow()?{??
          ??int?candidates?=?0;??
          ??//查看window?queue的頭部節(jié)點??
          ??Node?node?=?accessOrderWindowDeque().peek();??
          ??//如果window區(qū)超過了最大的限制,那么就要把“多出來”的記錄做處理??
          ??while?(windowWeightedSize()?>?windowMaximum())?{??
          ????//?The?pending?operations?will?adjust?the?size?to?reflect?the?correct?weight??
          ????if?(node?==?null)?{??
          ??????break;??
          ????}??
          ????//下一個節(jié)點??
          ????Node?next?=?node.getNextInAccessOrder();??
          ????if?(node.getWeight()?!=?0)?{??
          ??????//把node定位在probation區(qū)??
          ??????node.makeMainProbation();??
          ??????//從window區(qū)去掉??
          ??????accessOrderWindowDeque().remove(node);??
          ??????//加入到probation?queue,相當于把節(jié)點移動到probation區(qū)(晉升了)??
          ??????accessOrderProbationDeque().add(node);??
          ??????candidates++;??
          ??????//因為移除了一個節(jié)點,所以需要調(diào)整window的size??
          ??????setWindowWeightedSize(windowWeightedSize()?-?node.getPolicyWeight());??
          ????}??
          ????//處理下一個節(jié)點??
          ????node?=?next;??
          ??}??
          ??
          ??return?candidates;??
          }??

          evictFromMain方法:

          /**??
          ?*?Evicts?entries?from?the?main?space?if?the?cache?exceeds?the?maximum?capacity.?The?main?space??
          ?*?determines?whether?admitting?an?entry?(coming?from?the?window?space)?is?preferable?to?retaining??
          ?*?the?eviction?policy's?victim.?This?is?decision?is?made?using?a?frequency?filter?so?that?the??
          ?*?least?frequently?used?entry?is?removed.??
          ?*??
          ?*?The?window?space?candidates?were?previously?placed?in?the?MRU?position?and?the?eviction??
          ?*?policy's?victim?is?at?the?LRU?position.?The?two?ends?of?the?queue?are?evaluated?while?an??
          ?*?eviction?is?required.?The?number?of?remaining?candidates?is?provided?and?decremented?on??
          ?*?eviction,?so?that?when?there?are?no?more?candidates?the?victim?is?evicted.??
          ?*??
          ?*?@param?candidates?the?number?of?candidate?entries?evicted?from?the?window?space??
          ?*/
          ??
          //根據(jù)W-TinyLFU,從window晉升過來的要跟probation區(qū)的進行“PK”,勝者才能留下??
          @GuardedBy("evictionLock")??
          void?evictFromMain(int?candidates)?{??
          ??int?victimQueue?=?PROBATION;??
          ??//victim是probation?queue的頭部??
          ??Node?victim?=?accessOrderProbationDeque().peekFirst();??
          ??//candidate是probation?queue的尾部,也就是剛從window晉升來的??
          ??Node?candidate?=?accessOrderProbationDeque().peekLast();??
          ??//當cache不夠容量時才做處理??
          ??while?(weightedSize()?>?maximum())?{??
          ????//?Stop?trying?to?evict?candidates?and?always?prefer?the?victim??
          ????if?(candidates?==?0)?{??
          ??????candidate?=?null;??
          ????}??
          ??
          ????//對candidate為null且victim為bull的處理??
          ????if?((candidate?==?null)?&&?(victim?==?null))?{??
          ??????if?(victimQueue?==?PROBATION)?{??
          ????????victim?=?accessOrderProtectedDeque().peekFirst();??
          ????????victimQueue?=?PROTECTED;??
          ????????continue;??
          ??????}?else?if?(victimQueue?==?PROTECTED)?{??
          ????????victim?=?accessOrderWindowDeque().peekFirst();??
          ????????victimQueue?=?WINDOW;??
          ????????continue;??
          ??????}??
          ??
          ??????//?The?pending?operations?will?adjust?the?size?to?reflect?the?correct?weight??
          ??????break;??
          ????}??
          ??
          ????//對節(jié)點的weight為0的處理??
          ????if?((victim?!=?null)?&&?(victim.getPolicyWeight()?==?0))?{??
          ??????victim?=?victim.getNextInAccessOrder();??
          ??????continue;??
          ????}?else?if?((candidate?!=?null)?&&?(candidate.getPolicyWeight()?==?0))?{??
          ??????candidate?=?candidate.getPreviousInAccessOrder();??
          ??????candidates--;??
          ??????continue;??
          ????}??
          ??
          ????//?Evict?immediately?if?only?one?of?the?entries?is?present??
          ????if?(victim?==?null)?{??
          ??????@SuppressWarnings("NullAway")??
          ??????Node?previous?=?candidate.getPreviousInAccessOrder();??
          ??????Node?evict?=?candidate;??
          ??????candidate?=?previous;??
          ??????candidates--;??
          ??????evictEntry(evict,?RemovalCause.SIZE,?0L);??
          ??????continue;??
          ????}?else?if?(candidate?==?null)?{??
          ??????Node?evict?=?victim;??
          ??????victim?=?victim.getNextInAccessOrder();??
          ??????evictEntry(evict,?RemovalCause.SIZE,?0L);??
          ??????continue;??
          ????}??
          ??
          ????//?Evict?immediately?if?an?entry?was?collected??
          ????K?victimKey?=?victim.getKey();??
          ????K?candidateKey?=?candidate.getKey();??
          ????if?(victimKey?==?null)?{??
          ??????@NonNull?Node?evict?=?victim;??
          ??????victim?=?victim.getNextInAccessOrder();??
          ??????evictEntry(evict,?RemovalCause.COLLECTED,?0L);??
          ??????continue;??
          ????}?else?if?(candidateKey?==?null)?{??
          ??????candidates--;??
          ??????@NonNull?Node?evict?=?candidate;??
          ??????candidate?=?candidate.getPreviousInAccessOrder();??
          ??????evictEntry(evict,?RemovalCause.COLLECTED,?0L);??
          ??????continue;??
          ????}??
          ??
          ????//放不下的節(jié)點直接處理掉??
          ????if?(candidate.getPolicyWeight()?>?maximum())?{??
          ??????candidates--;??
          ??????Node?evict?=?candidate;??
          ??????candidate?=?candidate.getPreviousInAccessOrder();??
          ??????evictEntry(evict,?RemovalCause.SIZE,?0L);??
          ??????continue;??
          ????}??
          ??
          ????//根據(jù)節(jié)點的統(tǒng)計頻率frequency來做比較,看看要處理掉victim還是candidate??
          ????//admit是具體的比較規(guī)則,看下面??
          ????candidates--;??
          ????//如果candidate勝出則淘汰victim??
          ????if?(admit(candidateKey,?victimKey))?{??
          ??????Node?evict?=?victim;??
          ??????victim?=?victim.getNextInAccessOrder();??
          ??????evictEntry(evict,?RemovalCause.SIZE,?0L);??
          ??????candidate?=?candidate.getPreviousInAccessOrder();??
          ????}?else?{??
          ??????//如果是victim勝出,則淘汰candidate??
          ??????Node?evict?=?candidate;??
          ??????candidate?=?candidate.getPreviousInAccessOrder();??
          ??????evictEntry(evict,?RemovalCause.SIZE,?0L);??
          ????}??
          ??}??
          }??
          ??
          /**??
          ?*?Determines?if?the?candidate?should?be?accepted?into?the?main?space,?as?determined?by?its??
          ?*?frequency?relative?to?the?victim.?A?small?amount?of?randomness?is?used?to?protect?against?hash??
          ?*?collision?attacks,?where?the?victim's?frequency?is?artificially?raised?so?that?no?new?entries??
          ?*?are?admitted.??
          ?*??
          ?*?@param?candidateKey?the?key?for?the?entry?being?proposed?for?long?term?retention??
          ?*?@param?victimKey?the?key?for?the?entry?chosen?by?the?eviction?policy?for?replacement??
          ?*?@return?if?the?candidate?should?be?admitted?and?the?victim?ejected??
          ?*/
          ??
          @GuardedBy("evictionLock")??
          boolean?admit(K?candidateKey,?K?victimKey)?{??
          ??//分別獲取victim和candidate的統(tǒng)計頻率??
          ??//frequency這個方法的原理和實現(xiàn)上面已經(jīng)解釋了??
          ??int?victimFreq?=?frequencySketch().frequency(victimKey);??
          ??int?candidateFreq?=?frequencySketch().frequency(candidateKey);??
          ??//誰大誰贏??
          ??if?(candidateFreq?>?victimFreq)?{??
          ????return?true;??
          ??????
          ????//如果相等,candidate小于5都當輸了??
          ??}?else?if?(candidateFreq?<=?5)?{??
          ????//?The?maximum?frequency?is?15?and?halved?to?7?after?a?reset?to?age?the?history.?An?attack??
          ????//?exploits?that?a?hot?candidate?is?rejected?in?favor?of?a?hot?victim.?The?threshold?of?a?warm??
          ????//?candidate?reduces?the?number?of?random?acceptances?to?minimize?the?impact?on?the?hit?rate.??
          ????return?false;??
          ??}??
          ??//如果相等且candidate大于5,則隨機淘汰一個??
          ??int?random?=?ThreadLocalRandom.current().nextInt();??
          ??return?((random?&?127)?==?0);??
          }??

          climb方法主要是用來調(diào)整window size的,使得Caffeine可以適應(yīng)你的應(yīng)用類型(如OLAP或OLTP)表現(xiàn)出最佳的命中率。

          下圖是官方測試的數(shù)據(jù):

          圖片

          我們看看window size的調(diào)整是怎么實現(xiàn)的。

          調(diào)整時用到的默認比例數(shù)據(jù):

          //與上次命中率之差的閾值??
          static?final?double?HILL_CLIMBER_RESTART_THRESHOLD?=?0.05d;??
          //步長(調(diào)整)的大小(跟最大值maximum的比例)??
          static?final?double?HILL_CLIMBER_STEP_PERCENT?=?0.0625d;??
          //步長的衰減比例??
          static?final?double?HILL_CLIMBER_STEP_DECAY_RATE?=?0.98d;??
          ??/**?Adapts?the?eviction?policy?to?towards?the?optimal?recency?/?frequency?configuration.?*/??
          //climb方法的主要作用就是動態(tài)調(diào)整window區(qū)的大小(相應(yīng)的,main區(qū)的大小也會發(fā)生變化,兩個之和為100%)。??
          //因為區(qū)域的大小發(fā)生了變化,那么區(qū)域內(nèi)的數(shù)據(jù)也可能需要發(fā)生相應(yīng)的移動。??
          @GuardedBy("evictionLock")??
          void?climb()?{??
          ??if?(!evicts())?{??
          ????return;??
          ??}??
          ??//確定window需要調(diào)整的大小??
          ??determineAdjustment();??
          ??//如果protected區(qū)有溢出,把溢出部分移動到probation區(qū)。因為下面的操作有可能需要調(diào)整到protected區(qū)。??
          ??demoteFromMainProtected();??
          ??long?amount?=?adjustment();??
          ??if?(amount?==?0)?{??
          ????return;??
          ??}?else?if?(amount?>?0)?{??
          ????//增加window的大小??
          ????increaseWindow();??
          ??}?else?{??
          ????//減少window的大小??
          ????decreaseWindow();??
          ??}??
          }??

          下面分別展開每個方法來解釋:

          /**?Calculates?the?amount?to?adapt?the?window?by?and?sets?{@link?#adjustment()}?accordingly.?*/??
          @GuardedBy("evictionLock")??
          void?determineAdjustment()?{??
          ??//如果frequencySketch還沒初始化,則返回??
          ??if?(frequencySketch().isNotInitialized())?{??
          ????setPreviousSampleHitRate(0.0);??
          ????setMissesInSample(0);??
          ????setHitsInSample(0);??
          ????return;??
          ??}??
          ??//總請求量?=?命中?+?miss??
          ??int?requestCount?=?hitsInSample()?+?missesInSample();??
          ??//沒達到sampleSize則返回??
          ??//默認下sampleSize = 10?* maximum。用sampleSize來判斷緩存是否足夠”熱“。??
          ??if?(requestCount?????return;??
          ??}??
          ????
          ??//命中率的公式?=?命中?/?總請求??
          ??double?hitRate?=?(double)?hitsInSample()?/?requestCount;??
          ??//命中率的差值??
          ??double?hitRateChange?=?hitRate?-?previousSampleHitRate();??
          ??//本次調(diào)整的大小,是由命中率的差值和上次的stepSize決定的??
          ??double?amount?=?(hitRateChange?>=?0)???stepSize()?:?-stepSize();??
          ??//下次的調(diào)整大小:如果命中率的之差大于0.05,則重置為0.065 * maximum,否則按照0.98來進行衰減??
          ??double?nextStepSize?=?(Math.abs(hitRateChange)?>=?HILL_CLIMBER_RESTART_THRESHOLD)??
          ????????HILL_CLIMBER_STEP_PERCENT?*?maximum()?*?(amount?>=?0???1?:?-1)??
          ??????:?HILL_CLIMBER_STEP_DECAY_RATE?*?amount;??
          ??setPreviousSampleHitRate(hitRate);??
          ??setAdjustment((long)?amount);??
          ??setStepSize(nextStepSize);??
          ??setMissesInSample(0);??
          ??setHitsInSample(0);??
          }??
          ??
          /**?Transfers?the?nodes?from?the?protected?to?the?probation?region?if?it?exceeds?the?maximum.?*/??
          ??
          //這個方法比較簡單,減少protected區(qū)溢出的部分??
          @GuardedBy("evictionLock")??
          void?demoteFromMainProtected()?{??
          ??long?mainProtectedMaximum?=?mainProtectedMaximum();??
          ??long?mainProtectedWeightedSize?=?mainProtectedWeightedSize();??
          ??if?(mainProtectedWeightedSize?<=?mainProtectedMaximum)?{??
          ????return;??
          ??}??
          ??
          ??for?(int?i?=?0;?i?????if?(mainProtectedWeightedSize?<=?mainProtectedMaximum)?{??
          ??????break;??
          ????}??
          ??
          ????Node?demoted?=?accessOrderProtectedDeque().poll();??
          ????if?(demoted?==?null)?{??
          ??????break;??
          ????}??
          ????demoted.makeMainProbation();??
          ????accessOrderProbationDeque().add(demoted);??
          ????mainProtectedWeightedSize?-=?demoted.getPolicyWeight();??
          ??}??
          ??setMainProtectedWeightedSize(mainProtectedWeightedSize);??
          }??
          ??
          /**??
          ?*?Increases?the?size?of?the?admission?window?by?shrinking?the?portion?allocated?to?the?main??
          ?*?space.?As?the?main?space?is?partitioned?into?probation?and?protected?regions?(80%?/?20%),?for??
          ?*?simplicity?only?the?protected?is?reduced.?If?the?regions?exceed?their?maximums,?this?may?cause??
          ?*?protected?items?to?be?demoted?to?the?probation?region?and?probation?items?to?be?demoted?to?the??
          ?*?admission?window.??
          ?*/
          ??
          ??
          //增加window區(qū)的大小,這個方法比較簡單,思路就像我上面說的??
          @GuardedBy("evictionLock")??
          void?increaseWindow()?{??
          ??if?(mainProtectedMaximum()?==?0)?{??
          ????return;??
          ??}??
          ??
          ??long?quota?=?Math.min(adjustment(),?mainProtectedMaximum());??
          ??setMainProtectedMaximum(mainProtectedMaximum()?-?quota);??
          ??setWindowMaximum(windowMaximum()?+?quota);??
          ??demoteFromMainProtected();??
          ??
          ??for?(int?i?=?0;?i?????Node?candidate?=?accessOrderProbationDeque().peek();??
          ????boolean?probation?=?true;??
          ????if?((candidate?==?null)?||?(quota???????candidate?=?accessOrderProtectedDeque().peek();??
          ??????probation?=?false;??
          ????}??
          ????if?(candidate?==?null)?{??
          ??????break;??
          ????}??
          ??
          ????int?weight?=?candidate.getPolicyWeight();??
          ????if?(quota???????break;??
          ????}??
          ??
          ????quota?-=?weight;??
          ????if?(probation)?{??
          ??????accessOrderProbationDeque().remove(candidate);??
          ????}?else?{??
          ??????setMainProtectedWeightedSize(mainProtectedWeightedSize()?-?weight);??
          ??????accessOrderProtectedDeque().remove(candidate);??
          ????}??
          ????setWindowWeightedSize(windowWeightedSize()?+?weight);??
          ????accessOrderWindowDeque().add(candidate);??
          ????candidate.makeWindow();??
          ??}??
          ??
          ??setMainProtectedMaximum(mainProtectedMaximum()?+?quota);??
          ??setWindowMaximum(windowMaximum()?-?quota);??
          ??setAdjustment(quota);??
          }??
          ??
          /**?Decreases?the?size?of?the?admission?window?and?increases?the?main's?protected?region.?*/??
          //同上increaseWindow差不多,反操作??
          @GuardedBy("evictionLock")??
          void?decreaseWindow()?{??
          ??if?(windowMaximum()?<=?1)?{??
          ????return;??
          ??}??
          ??
          ??long?quota?=?Math.min(-adjustment(),?Math.max(0,?windowMaximum()?-?1));??
          ??setMainProtectedMaximum(mainProtectedMaximum()?+?quota);??
          ??setWindowMaximum(windowMaximum()?-?quota);??
          ??
          ??for?(int?i?=?0;?i?????Node?candidate?=?accessOrderWindowDeque().peek();??
          ????if?(candidate?==?null)?{??
          ??????break;??
          ????}??
          ??
          ????int?weight?=?candidate.getPolicyWeight();??
          ????if?(quota???????break;??
          ????}??
          ??
          ????quota?-=?weight;??
          ????setMainProtectedWeightedSize(mainProtectedWeightedSize()?+?weight);??
          ????setWindowWeightedSize(windowWeightedSize()?-?weight);??
          ????accessOrderWindowDeque().remove(candidate);??
          ????accessOrderProbationDeque().add(candidate);??
          ????candidate.makeMainProbation();??
          ??}??
          ??
          ??setMainProtectedMaximum(mainProtectedMaximum()?-?quota);??
          ??setWindowMaximum(windowMaximum()?+?quota);??
          ??setAdjustment(-quota);??
          }??

          以上,是Caffeine的W-TinyLFU策略的設(shè)計原理及代碼實現(xiàn)解析。

          異步的高性能讀寫

          一般的緩存每次對數(shù)據(jù)處理完之后(讀的話,已經(jīng)存在則直接返回,不存在則load數(shù)據(jù),保存,再返回;寫的話,則直接插入或更新),但是因為要維護一些淘汰策略,則需要一些額外的操作,諸如:

          • 計算和比較數(shù)據(jù)的是否過期

          • 統(tǒng)計頻率(像LFU或其變種)

          • 維護read queue和write queue

          • 淘汰符合條件的數(shù)據(jù)

          • 等等。。。

          這種數(shù)據(jù)的讀寫伴隨著緩存狀態(tài)的變更,Guava Cache的做法是把這些操作和讀寫操作放在一起,在一個同步加鎖的操作中完成,雖然Guava Cache巧妙地利用了JDK的ConcurrentHashMap(分段鎖或者無鎖CAS)來降低鎖的密度,達到提高并發(fā)度的目的。但是,對于一些熱點數(shù)據(jù),這種做法還是避免不了頻繁的鎖競爭。Caffeine借鑒了數(shù)據(jù)庫系統(tǒng)的WAL(Write-Ahead Logging)思想,即先寫日志再執(zhí)行操作,這種思想同樣適合緩存的,執(zhí)行讀寫操作時,先把操作記錄在緩沖區(qū),然后在合適的時機異步、批量地執(zhí)行緩沖區(qū)中的內(nèi)容。但在執(zhí)行緩沖區(qū)的內(nèi)容時,也是需要在緩沖區(qū)加上同步鎖的,不然存在并發(fā)問題,只不過這樣就可以把對鎖的競爭從緩存數(shù)據(jù)轉(zhuǎn)移到對緩沖區(qū)上。

          ReadBuffer

          在Caffeine的內(nèi)部實現(xiàn)中,為了很好的支持不同的Features(如Eviction,Removal,Refresh,Statistics,Cleanup,Policy等等),擴展了很多子類,它們共同的父類是BoundedLocalCache,而readBuffer就是作為它們共有的屬性,即都是用一樣的readBuffer,看定義:

          final?Buffer>?readBuffer;??
          ??
          readBuffer?=?evicts()?||?collectKeys()?||?collectValues()?||?expiresAfterAccess()??
          ??????????new?BoundedBuffer<>()??
          ????????:?Buffer.disabled();??

          上面提到Caffeine對每次緩存的讀操作都會觸發(fā)afterRead

          /**??
          ?*?Performs?the?post-processing?work?required?after?a?read.??
          ?*??
          ?*?@param?node?the?entry?in?the?page?replacement?policy??
          ?*?@param?now?the?current?time,?in?nanoseconds??
          ?*?@param?recordHit?if?the?hit?count?should?be?incremented??
          ?*/
          ??
          void?afterRead(Node?node,?long?now,?boolean?recordHit)?{??
          ??if?(recordHit)?{??
          ????statsCounter().recordHits(1);??
          ??}??
          ??//把記錄加入到readBuffer??
          ??//判斷是否需要立即處理readBuffer??
          ??//注意這里無論offer是否成功都可以走下去的,即允許寫入readBuffer丟失,因為這個??
          ??boolean?delayable?=?skipReadBuffer()?||?(readBuffer.offer(node)?!=?Buffer.FULL);??
          ??if?(shouldDrainBuffers(delayable))?{??
          ????scheduleDrainBuffers();??
          ??}??
          ??refreshIfNeeded(node,?now);??
          }??
          ??
          ?/**??
          ???*?Returns?whether?maintenance?work?is?needed.??
          ???*??
          ???*?@param?delayable?if?draining?the?read?buffer?can?be?delayed??
          ???*/
          ??
          ??
          ??//caffeine用了一組狀態(tài)來定義和管理“維護”的過程??
          ??boolean?shouldDrainBuffers(boolean?delayable)?{??
          ????switch?(drainStatus())?{??
          ??????case?IDLE:??
          ????????return?!delayable;??
          ??????case?REQUIRED:??
          ????????return?true;??
          ??????case?PROCESSING_TO_IDLE:??
          ??????case?PROCESSING_TO_REQUIRED:??
          ????????return?false;??
          ??????default:??
          ????????throw?new?IllegalStateException();??
          ????}??
          ??}??

          重點看BoundedBuffer

          /**??
          ?*?A?striped,?non-blocking,?bounded?buffer.??
          ?*??
          ?*?@author[email protected]?(Ben?Manes)??
          ?*?@param??the?type?of?elements?maintained?by?this?buffer??
          ?*/
          ??
          final?class?BoundedBuffer<E>?extends?StripedBuffer<E>??

          它是一個striped、非阻塞、有界限的buffer,繼承于StripedBuffer類。下面看看StripedBuffer的實現(xiàn):

          /**??
          ?*?A?base?class?providing?the?mechanics?for?supporting?dynamic?striping?of?bounded?buffers.?This??
          ?*?implementation?is?an?adaption?of?the?numeric?64-bit?{@link?java.util.concurrent.atomic.Striped64}??
          ?*?class,?which?is?used?by?atomic?counters.?The?approach?was?modified?to?lazily?grow?an?array?of??
          ?*?buffers?in?order?to?minimize?memory?usage?for?caches?that?are?not?heavily?contended?on.??
          ?*??
          ?*?@author[email protected]?(Doug?Lea)??
          ?*?@author[email protected]?(Ben?Manes)??
          ?*/
          ??
          ??
          abstract?class?StripedBuffer<E>?implements?Buffer<E>??

          這個StripedBuffer設(shè)計的思想是跟Striped64類似的,通過擴展結(jié)構(gòu)把競爭熱點分離。

          具體實現(xiàn)是這樣的,StripedBuffer維護一個Buffer[]數(shù)組,每個元素就是一個RingBuffer,每個線程用自己threadLocalRandomProbe屬性作為hash值,這樣就相當于每個線程都有自己“專屬”的RingBuffer,就不會產(chǎn)生競爭啦,而不是用key的hashCode作為hash值,因為會產(chǎn)生熱點數(shù)據(jù)問題。

          看看StripedBuffer的屬性

          /**?Table?of?buffers.?When?non-null,?size?is?a?power?of?2.?*/??
          //RingBuffer數(shù)組??
          transient?volatile?Buffer?@Nullable[]?table;??
          ??
          //當進行resize時,需要整個table鎖住。tableBusy作為CAS的標記。??
          static?final?long?TABLE_BUSY?=?UnsafeAccess.objectFieldOffset(StripedBuffer.class,?"tableBusy");??
          static?final?long?PROBE?=?UnsafeAccess.objectFieldOffset(Thread.class,?"threadLocalRandomProbe");??
          ??
          /**?Number?of?CPUS.?*/??
          static?final?int?NCPU?=?Runtime.getRuntime().availableProcessors();??
          ??
          /**?The?bound?on?the?table?size.?*/??
          //table最大size??
          static?final?int?MAXIMUM_TABLE_SIZE?=?4?*?ceilingNextPowerOfTwo(NCPU);??
          ??
          /**?The?maximum?number?of?attempts?when?trying?to?expand?the?table.?*/??
          //如果發(fā)生競爭時(CAS失敗)的嘗試次數(shù)??
          static?final?int?ATTEMPTS?=?3;??
          ??
          /**?Table?of?buffers.?When?non-null,?size?is?a?power?of?2.?*/??
          //核心數(shù)據(jù)結(jié)構(gòu)??
          transient?volatile?Buffer?@Nullable[]?table;??
          ??
          /**?Spinlock?(locked?via?CAS)?used?when?resizing?and/or?creating?Buffers.?*/??
          transient?volatile?int?tableBusy;??
          ??
          /**?CASes?the?tableBusy?field?from?0?to?1?to?acquire?lock.?*/??
          final?boolean?casTableBusy()?{??
          ??return?UnsafeAccess.UNSAFE.compareAndSwapInt(this,?TABLE_BUSY,?0,?1);??
          }??
          ??
          /**??
          ?*?Returns?the?probe?value?for?the?current?thread.?Duplicated?from?ThreadLocalRandom?because?of??
          ?*?packaging?restrictions.??
          ?*/
          ??
          static?final?int?getProbe()?{??
          ??return?UnsafeAccess.UNSAFE.getInt(Thread.currentThread(),?PROBE);??
          }??

          offer方法,當沒初始化或存在競爭時,則擴容為2倍。

          實際是調(diào)用RingBuffer的offer方法,把數(shù)據(jù)追加到RingBuffer后面。

          @Override??
          public?int?offer(E?e)?{??
          ??int?mask;??
          ??int?result?=?0;??
          ??Buffer?buffer;??
          ??//是否不存在競爭??
          ??boolean?uncontended?=?true;??
          ??Buffer[]?buffers?=?table??
          ??//是否已經(jīng)初始化??
          ??if?((buffers?==?null)??
          ??????||?(mask?=?buffers.length?-?1)?0??
          ??????//用thread的隨機值作為hash值,得到對應(yīng)位置的RingBuffer??
          ??????||?(buffer?=?buffers[getProbe()?&?mask])?==?null??
          ??????//檢查追加到RingBuffer是否成功??
          ??????||?!(uncontended?=?((result?=?buffer.offer(e))?!=?Buffer.FAILED)))?{??
          ????//其中一個符合條件則進行擴容??
          ????expandOrRetry(e,?uncontended);??
          ??}??
          ??return?result;??
          }??
          ??
          /**??
          ?*?Handles?cases?of?updates?involving?initialization,?resizing,?creating?new?Buffers,?and/or??
          ?*?contention.?See?above?for?explanation.?This?method?suffers?the?usual?non-modularity?problems?of??
          ?*?optimistic?retry?code,?relying?on?rechecked?sets?of?reads.??
          ?*??
          ?*?@param?e?the?element?to?add??
          ?*?@param?wasUncontended?false?if?CAS?failed?before?call??
          ?*/
          ??
          ??
          //這個方法比較長,但思路還是相對清晰的。??
          @SuppressWarnings("PMD.ConfusingTernary")??
          final?void?expandOrRetry(E?e,?boolean?wasUncontended)?{??
          ??int?h;??
          ??if?((h?=?getProbe())?==?0)?{??
          ????ThreadLocalRandom.current();?//?force?initialization??
          ????h?=?getProbe();??
          ????wasUncontended?=?true;??
          ??}??
          ??boolean?collide?=?false;?//?True?if?last?slot?nonempty??
          ??for?(int?attempt?=?0;?attempt?????Buffer[]?buffers;??
          ????Buffer?buffer;??
          ????int?n;??
          ????if?(((buffers?=?table)?!=?null)?&&?((n?=?buffers.length)?>?0))?{??
          ??????if?((buffer?=?buffers[(n?-?1)?&?h])?==?null)?{??
          ????????if?((tableBusy?==?0)?&&?casTableBusy())?{?//?Try?to?attach?new?Buffer??
          ??????????boolean?created?=?false;??
          ??????????try?{?//?Recheck?under?lock??
          ????????????Buffer[]?rs;??
          ????????????int?mask,?j;??
          ????????????if?(((rs?=?table)?!=?null)?&&?((mask?=?rs.length)?>?0)??
          ????????????????&&?(rs[j?=?(mask?-?1)?&?h]?==?null))?{??
          ??????????????rs[j]?=?create(e);??
          ??????????????created?=?true;??
          ????????????}??
          ??????????}?finally?{??
          ????????????tableBusy?=?0;??
          ??????????}??
          ??????????if?(created)?{??
          ????????????break;??
          ??????????}??
          ??????????continue;?//?Slot?is?now?non-empty??
          ????????}??
          ????????collide?=?false;??
          ??????}?else?if?(!wasUncontended)?{?//?CAS?already?known?to?fail??
          ????????wasUncontended?=?true;??????//?Continue?after?rehash??
          ??????}?else?if?(buffer.offer(e)?!=?Buffer.FAILED)?{??
          ????????break;??
          ??????}?else?if?(n?>=?MAXIMUM_TABLE_SIZE?||?table?!=?buffers)?{??
          ????????collide?=?false;?//?At?max?size?or?stale??
          ??????}?else?if?(!collide)?{??
          ????????collide?=?true;??
          ??????}?else?if?(tableBusy?==?0?&&?casTableBusy())?{??
          ????????try?{??
          ??????????if?(table?==?buffers)?{?//?Expand?table?unless?stale??
          ????????????table?=?Arrays.copyOf(buffers,?n?<1);??
          ??????????}??
          ????????}?finally?{??
          ??????????tableBusy?=?0;??
          ????????}??
          ????????collide?=?false;??
          ????????continue;?//?Retry?with?expanded?table??
          ??????}??
          ??????h?=?advanceProbe(h);??
          ????}?else?if?((tableBusy?==?0)?&&?(table?==?buffers)?&&?casTableBusy())?{??
          ??????boolean?init?=?false;??
          ??????try?{?//?Initialize?table??
          ????????if?(table?==?buffers)?{??
          ??????????@SuppressWarnings({"unchecked",?"rawtypes"})??
          ??????????Buffer[]?rs?=?new?Buffer[1];??
          ??????????rs[0]?=?create(e);??
          ??????????table?=?rs;??
          ??????????init?=?true;??
          ????????}??
          ??????}?finally?{??
          ????????tableBusy?=?0;??
          ??????}??
          ??????if?(init)?{??
          ????????break;??
          ??????}??
          ????}??
          ??}??
          }??

          最后看看RingBuffer,注意RingBuffer是BoundedBuffer的內(nèi)部類。

          /**?The?maximum?number?of?elements?per?buffer.?*/??
          static?final?int?BUFFER_SIZE?=?16;??
          ??
          //?Assume?4-byte?references?and?64-byte?cache?line?(16?elements?per?line)??
          //256長度,但是是以16為單位,所以最多存放16個元素??
          static?final?int?SPACED_SIZE?=?BUFFER_SIZE?<4;??
          static?final?int?SPACED_MASK?=?SPACED_SIZE?-?1;??
          static?final?int?OFFSET?=?16;???
          //RingBuffer數(shù)組??
          final?AtomicReferenceArray?buffer;??
          ??
          ?//插入方法??
          ?@Override??
          ?public?int?offer(E?e)?{??
          ???long?head?=?readCounter;??
          ???long?tail?=?relaxedWriteCounter();??
          ???//用head和tail來限制個數(shù)??
          ???long?size?=?(tail?-?head);??
          ???if?(size?>=?SPACED_SIZE)?{??
          ?????return?Buffer.FULL;??
          ???}??
          ???//tail追加16??
          ???if?(casWriteCounter(tail,?tail?+?OFFSET))?{??
          ?????//用tail“取余”得到下標??
          ?????int?index?=?(int)?(tail?&?SPACED_MASK);??
          ?????//用unsafe.putOrderedObject設(shè)值??
          ?????buffer.lazySet(index,?e);??
          ?????return?Buffer.SUCCESS;??
          ???}??
          ???//如果CAS失敗則返回失敗??
          ???return?Buffer.FAILED;??
          ?}??
          ??
          ?//用consumer來處理buffer的數(shù)據(jù)??
          ?@Override??
          ?public?void?drainTo(Consumer?consumer)?{??
          ???long?head?=?readCounter;??
          ???long?tail?=?relaxedWriteCounter();??
          ???//判斷數(shù)據(jù)多少??
          ???long?size?=?(tail?-?head);??
          ???if?(size?==?0)?{??
          ?????return;??
          ???}??
          ???do?{??
          ?????int?index?=?(int)?(head?&?SPACED_MASK);??
          ?????E?e?=?buffer.get(index);??
          ?????if?(e?==?null)?{??
          ???????//?not?published?yet??
          ???????break;??
          ?????}??
          ?????buffer.lazySet(index,?null);??
          ?????consumer.accept(e);??
          ?????//head也跟tail一樣,每次遞增16??
          ?????head?+=?OFFSET;??
          ???}?while?(head?!=?tail);??
          ???lazySetReadCounter(head);??
          ?}??

          注意,ring buffer的size(固定是16個)是不變的,變的是head和tail而已。

          總的來說ReadBuffer有如下特點:

          • 使用 Striped-RingBuffer來提升對buffer的讀寫

          • 用thread的hash來避開熱點key的競爭

          • 允許寫入的丟失

          WriteBuffer

          writeBuffer跟readBuffer不一樣,主要體現(xiàn)在使用場景的不一樣。本來緩存的一般場景是讀多寫少的,讀的并發(fā)會更高,且afterRead顯得沒那么重要,允許延遲甚至丟失。寫不一樣,寫afterWrite不允許丟失,且要求盡量馬上執(zhí)行。Caffeine使用MPSC(Multiple Producer / Single Consumer)作為buffer數(shù)組,實現(xiàn)在MpscGrowableArrayQueue類,它是仿照JCTools的MpscGrowableArrayQueue來寫的。

          MPSC允許無鎖的高并發(fā)寫入,但只允許一個消費者,同時也犧牲了部分操作。

          MPSC我打算另外分析,這里不展開了。

          TimerWheel

          除了支持expireAfterAccessexpireAfterWrite之外(Guava Cache也支持這兩個特性),Caffeine還支持expireAfter。因為expireAfterAccess和expireAfterWrite都只能是固定的過期時間,這可能滿足不了某些場景,譬如記錄的過期時間是需要根據(jù)某些條件而不一樣的,這就需要用戶自定義過期時間。

          先看看expireAfter的用法

          private?static?LoadingCache?cache?=?Caffeine.newBuilder()??
          ????????.maximumSize(256L)??
          ????????.initialCapacity(1)??
          ????????//.expireAfterAccess(2,?TimeUnit.DAYS)??
          ????????//.expireAfterWrite(2,?TimeUnit.HOURS)??
          ????????.refreshAfterWrite(1,?TimeUnit.HOURS)??
          ????????//自定義過期時間??
          ????????.expireAfter(new?Expiry()?{??
          ????????????//返回創(chuàng)建后的過期時間??
          ????????????@Override??
          ????????????public?long?expireAfterCreate(@NonNull?String?key,?@NonNull?String?value,?long?currentTime)?{??
          ????????????????return?0;??
          ????????????}??
          ??
          ????????????//返回更新后的過期時間??
          ????????????@Override??
          ????????????public?long?expireAfterUpdate(@NonNull?String?key,?@NonNull?String?value,?long?currentTime,?@NonNegative?long?currentDuration)?{??
          ????????????????return?0;??
          ????????????}??
          ??
          ????????????//返回讀取后的過期時間??
          ????????????@Override??
          ????????????public?long?expireAfterRead(@NonNull?String?key,?@NonNull?String?value,?long?currentTime,?@NonNegative?long?currentDuration)?{??
          ????????????????return?0;??
          ????????????}??
          ????????})??
          ????????.recordStats()??
          ????????.build(new?CacheLoader()?{??
          ????????????@Nullable??
          ????????????@Override??
          ????????????public?String?load(@NonNull?String?key)?throws?Exception?{??
          ????????????????return?"value_"?+?key;??
          ????????????}??
          ????????});??

          通過自定義過期時間,使得不同的key可以動態(tài)的得到不同的過期時間。

          注意,我把expireAfterAccess和expireAfterWrite注釋了,因為這兩個特性不能跟expireAfter一起使用。

          而當使用了expireAfter特性后,Caffeine會啟用一種叫“時間輪”的算法來實現(xiàn)這個功能。更多關(guān)于時間輪的介紹,可以看我的文章HashedWheelTimer時間輪原理分析。

          好,重點來了,為什么要用時間輪?

          對expireAfterAccess和expireAfterWrite的實現(xiàn)是用一個AccessOrderDeque雙端隊列,它是FIFO的,因為它們的過期時間是固定的,所以在隊列頭的數(shù)據(jù)肯定是最早過期的,要處理過期數(shù)據(jù)時,只需要首先看看頭部是否過期,然后再挨個檢查就可以了。但是,如果過期時間不一樣的話,這需要對accessOrderQueue進行排序&插入,這個代價太大了。于是,Caffeine用了一種更加高效、優(yōu)雅的算法-時間輪。

          時間輪的結(jié)構(gòu):

          圖片

          因為在我的對時間輪分析的文章里已經(jīng)說了時間輪的原理和機制了,所以我就不展開Caffeine對時間輪的實現(xiàn)了。

          Caffeine對時間輪的實現(xiàn)在TimerWheel,它是一種多層時間輪(hierarchical timing wheels )。

          看看元素加入到時間輪的schedule方法:

          /**??
          ?*?Schedules?a?timer?event?for?the?node.??
          ?*??
          ?*?@param?node?the?entry?in?the?cache??
          ?*/
          ??
          public?void?schedule(@NonNull?Node?node)?{??
          ??Node?sentinel?=?findBucket(node.getVariableTime());??
          ??link(sentinel,?node);??
          }??
          ??
          /**??
          ?*?Determines?the?bucket?that?the?timer?event?should?be?added?to.??
          ?*??
          ?*?@param?time?the?time?when?the?event?fires??
          ?*?@return?the?sentinel?at?the?head?of?the?bucket??
          ?*/
          ??
          Node?findBucket(long?time)?{??
          ??long?duration?=?time?-?nanos;??
          ??int?length?=?wheel.length?-?1;??
          ??for?(int?i?=?0;?i?????if?(duration?1])?{??
          ??????long?ticks?=?(time?>>>?SHIFT[i]);??
          ??????int?index?=?(int)?(ticks?&?(wheel[i].length?-?1));??
          ??????return?wheel[i][index];??
          ????}??
          ??}??
          ??return?wheel[length][0];??
          }??
          ??
          /**?Adds?the?entry?at?the?tail?of?the?bucket's?list.?*/??
          void?link(Node?sentinel,?Node?node)?{??
          ??node.setPreviousInVariableOrder(sentinel.getPreviousInVariableOrder());??
          ??node.setNextInVariableOrder(sentinel);??
          ??
          ??sentinel.getPreviousInVariableOrder().setNextInVariableOrder(node);??
          ??sentinel.setPreviousInVariableOrder(node);??
          }??

          其他

          Caffeine還有其他的優(yōu)化性能的手段,如使用軟引用和弱引用、消除偽共享、CompletableFuture異步等等。

          總結(jié)

          Caffeien是一個優(yōu)秀的本地緩存,通過使用W-TinyLFU算法, 高性能的readBuffer和WriteBuffer,時間輪算法等,使得它擁有高性能,高命中率(near optimal),低內(nèi)存占用等特點。

          參考資料

          TinyLFU論文

          Design Of A Modern Cache

          Design Of A Modern Cache—Part Deux

          Caffeine的github


          點個在看支持我吧,轉(zhuǎn)發(fā)就更好了
          瀏覽 56
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  伊人大香蕉在线观看视频 | 导航黄色一级片 | 日本欧美久久久久免费播放网 | 久热精品在线观看视频 | 精品久久ai |