<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Caffeine !你簡直太騷了!

          共 12314字,需瀏覽 25分鐘

           ·

          2021-01-15 08:11

          概要

          Caffeine[1]是一個高性能,高命中率,低內(nèi)存占用,near optimal 的本地緩存,簡單來說它是 Guava Cache 的優(yōu)化加強(qiáng)版,有些文章把 Caffeine 稱為“新一代的緩存”、“現(xiàn)代緩存之王”。

          本文將重點(diǎn)講解 Caffeine 的高性能設(shè)計(jì),以及對應(yīng)部分的源碼分析。

          與 Guava Cache 比較

          如果你對 Guava Cache 還不理解的話,可以點(diǎn)擊這里[2]來看一下我之前寫過關(guān)于 Guava Cache 的文章。

          大家都知道,Spring5 即將放棄掉 Guava Cache 作為緩存機(jī)制,而改用 Caffeine 作為新的本地 Cache 的組件,這對于 Caffeine 來說是一個很大的肯定。為什么 Spring 會這樣做呢?其實(shí)在 Caffeine 的Benchmarks[3]里給出了好靚仔的數(shù)據(jù),對讀和寫的場景,還有跟其他幾個緩存工具進(jìn)行了比較,Caffeine 的性能都表現(xiàn)很突出。

          使用 Caffeine

          Caffeine 為了方便大家使用以及從 Guava Cache 切換過來(很有針對性啊~),借鑒了 Guava Cache 大部分的概念(諸如核心概念Cache、LoadingCache、CacheLoader、CacheBuilder等等),對于 Caffeine 的理解只要把它當(dāng)作 Guava Cache 就可以了。

          使用上,大家只要把 Caffeine 的包引進(jìn)來,然后換一下 cache 的實(shí)現(xiàn)類,基本應(yīng)該就沒問題了。這對與已經(jīng)使用過 Guava Cache 的同學(xué)來說沒有任何難度,甚至還有一點(diǎn)熟悉的味道,如果你之前沒有使用過 Guava Cache,可以查看 Caffeine 的官方 API 說明文檔[4],其中Population,Eviction,Removal,RefreshStatistics,Cleanup,Policy等等這些特性都是跟 Guava Cache 基本一樣的。

          下面給出一個例子說明怎樣創(chuàng)建一個 Cache:

          private?static?LoadingCache?cache?=?Caffeine.newBuilder()
          ????????????//最大個數(shù)限制
          ????????????.maximumSize(256L)
          ????????????//初始化容量
          ????????????.initialCapacity(1)
          ????????????//訪問后過期(包括讀和寫)
          ????????????.expireAfterAccess(2,?TimeUnit.DAYS)
          ????????????//寫后過期
          ????????????.expireAfterWrite(2,?TimeUnit.HOURS)
          ????????????//寫后自動異步刷新
          ????????????.refreshAfterWrite(1,?TimeUnit.HOURS)
          ????????????//記錄下緩存的一些統(tǒng)計(jì)數(shù)據(jù),例如命中率等
          ????????????.recordStats()
          ????????????//cache對緩存寫的通知回調(diào)
          ????????????.writer(new?CacheWriter()?{
          ????????????????@Override
          ????????????????public?void?write(@NonNull?Object?key,?@NonNull?Object?value)?{
          ????????????????????log.info("key={},?CacheWriter?write",?key);
          ????????????????}

          ????????????????@Override
          ????????????????public?void?delete(@NonNull?Object?key,?@Nullable?Object?value,?@NonNull?RemovalCause?cause)?{
          ????????????????????log.info("key={},?cause={},?CacheWriter?delete",?key,?cause);
          ????????????????}
          ????????????})
          ????????????//使用CacheLoader創(chuàng)建一個LoadingCache
          ????????????.build(new?CacheLoader()?{
          ????????????????//同步加載數(shù)據(jù)
          ????????????????@Nullable
          ????????????????@Override
          ????????????????public?String?load(@NonNull?String?key)?throws?Exception?{
          ????????????????????return?"value_"?+?key;
          ????????????????}

          ????????????????//異步加載數(shù)據(jù)
          ????????????????@Nullable
          ????????????????@Override
          ????????????????public?String?reload(@NonNull?String?key,?@NonNull?String?oldValue)?throws?Exception?{
          ????????????????????return?"value_"?+?key;
          ????????????????}
          ????????????});

          更多從 Guava Cache 遷移過來的使用說明,請看這里[5]

          Caffeine 的高性能設(shè)計(jì)

          判斷一個緩存的好壞最核心的指標(biāo)就是命中率,影響緩存命中率有很多因素,包括業(yè)務(wù)場景、淘汰策略、清理策略、緩存容量等等。如果作為本地緩存, 它的性能的情況,資源的占用也都是一個很重要的指標(biāo)。下面

          我們來看看 Caffeine 在這幾個方面是怎么著手的,如何做優(yōu)化的。

          (注:本文不會分析 Caffeine 全部源碼,只會對核心設(shè)計(jì)的實(shí)現(xiàn)進(jìn)行分析,但我建議讀者把 Caffeine 的源碼都涉獵一下,有個 overview 才能更好理解本文。如果你看過 Guava Cache 的源碼也行,代碼的數(shù)據(jù)結(jié)構(gòu)和處理邏輯很類似的。

          源碼基于:caffeine-2.8.0.jar)

          W-TinyLFU 整體設(shè)計(jì)

          上面說到淘汰策略是影響緩存命中率的因素之一,一般比較簡單的緩存就會直接用到 LFU(Least Frequently Used,即最不經(jīng)常使用) 或者LRU(Least Recently Used,即最近最少使用) ,而 Caffeine 就是使用了 W-TinyLFU 算法。

          W-TinyLFU 看名字就能大概猜出來,它是 LFU 的變種,也是一種緩存淘汰算法。那為什么要使用 W-TinyLFU 呢?

          LRU 和 LFU 的缺點(diǎn)

          • LRU 實(shí)現(xiàn)簡單,在一般情況下能夠表現(xiàn)出很好的命中率,是一個“性價比”很高的算法,平時也很常用。雖然 LRU 對突發(fā)性的稀疏流量(sparse bursts)表現(xiàn)很好,但同時也會產(chǎn)生緩存污染,舉例來說,如果偶然性的要對全量數(shù)據(jù)進(jìn)行遍歷,那么“歷史訪問記錄”就會被刷走,造成污染。
          • 如果數(shù)據(jù)的分布在一段時間內(nèi)是固定的話,那么 LFU 可以達(dá)到最高的命中率。但是 LFU 有兩個缺點(diǎn),第一,它需要給每個記錄項(xiàng)維護(hù)頻率信息,每次訪問都需要更新,這是個巨大的開銷;第二,對突發(fā)性的稀疏流量無力,因?yàn)榍捌诮?jīng)常訪問的記錄已經(jīng)占用了緩存,偶然的流量不太可能會被保留下來,而且過去的一些大量被訪問的記錄在將來也不一定會使用上,這樣就一直把“坑”占著了。

          無論 LRU 還是 LFU 都有其各自的缺點(diǎn),不過,現(xiàn)在已經(jīng)有很多針對其缺點(diǎn)而改良、優(yōu)化出來的變種算法。

          TinyLFU

          TinyLFU 就是其中一個優(yōu)化算法,它是專門為了解決 LFU 上述提到的兩個問題而被設(shè)計(jì)出來的。

          解決第一個問題是采用了 Count–Min Sketch 算法。

          解決第二個問題是讓記錄盡量保持相對的“新鮮”(Freshness Mechanism),并且當(dāng)有新的記錄插入時,可以讓它跟老的記錄進(jìn)行“PK”,輸者就會被淘汰,這樣一些老的、不再需要的記錄就會被剔除。

          下圖是 TinyLFU 設(shè)計(jì)圖(來自官方)

          統(tǒng)計(jì)頻率 Count–Min Sketch 算法

          如何對一個 key 進(jìn)行統(tǒng)計(jì),但又可以節(jié)省空間呢?(不是簡單的使用HashMap,這太消耗內(nèi)存了),注意哦,不需要精確的統(tǒng)計(jì),只需要一個近似值就可以了,怎么樣,這樣場景是不是很熟悉,如果你是老司機(jī),或許已經(jīng)聯(lián)想到布隆過濾器(Bloom Filter)的應(yīng)用了。

          沒錯,將要介紹的 Count–Min Sketch 的原理跟 Bloom Filter 一樣,只不過 Bloom Filter 只有 0 和 1 的值,那么你可以把 Count–Min Sketch 看作是“數(shù)值”版的 Bloom Filter。

          更多關(guān)于 Count–Min Sketch 的介紹請自行搜索。

          在 TinyLFU 中,近似頻率的統(tǒng)計(jì)如下圖所示:

          對一個 key 進(jìn)行多次 hash 函數(shù)后,index 到多個數(shù)組位置后進(jìn)行累加,查詢時取多個值中的最小值即可。

          Caffeine 對這個算法的實(shí)現(xiàn)在FrequencySketch類。但 Caffeine 對此有進(jìn)一步的優(yōu)化,例如 Count–Min Sketch 使用了二維數(shù)組,Caffeine 只是用了一個一維的數(shù)組;再者,如果是數(shù)值類型的話,這個數(shù)需要用 int 或 long 來存儲,但是 Caffeine 認(rèn)為緩存的訪問頻率不需要用到那么大,只需要 15 就足夠,一般認(rèn)為達(dá)到 15 次的頻率算是很高的了,而且 Caffeine 還有另外一個機(jī)制來使得這個頻率進(jìn)行衰退減半(下面就會講到)。如果最大是 15 的話,那么只需要 4 個 bit 就可以滿足了,一個 long 有 64bit,可以存儲 16 個這樣的統(tǒng)計(jì)數(shù),Caffeine 就是這樣的設(shè)計(jì),使得存儲效率提高了 16 倍。

          Caffeine 對緩存的讀寫(afterReadafterWrite方法)都會調(diào)用onAccesss 方法,而onAccess方法里有一句:

          frequencySketch().increment(key);

          這句就是追加記錄的頻率,下面我們看看具體實(shí)現(xiàn)

          //FrequencySketch的一些屬性

          //種子數(shù)
          static?final?long[]?SEED?=?{?//?A?mixture?of?seeds?from?FNV-1a,?CityHash,?and?Murmur3
          ????0xc3a5c85c97cb3127L,?0xb492b66fbe98f273L,?0x9ae16a3b2f90404fL,?0xcbf29ce484222325L};
          static?final?long?RESET_MASK?=?0x7777777777777777L;
          static?final?long?ONE_MASK?=?0x1111111111111111L;

          int?sampleSize;
          //為了快速根據(jù)hash值得到table的index值的掩碼
          //table的長度size一般為2的n次方,而tableMask為size-1,這樣就可以通過&操作來模擬取余操作,速度快很多,老司機(jī)都知道
          int?tableMask;
          //存儲數(shù)據(jù)的一維long數(shù)組
          long[]?table;
          int?size;

          /**
          ?*?Increments?the?popularity?of?the?element?if?it?does?not?exceed?the?maximum?(15).?The?popularity
          ?*?of?all?elements?will?be?periodically?down?sampled?when?the?observed?events?exceeds?a?threshold.
          ?*?This?process?provides?a?frequency?aging?to?allow?expired?long?term?entries?to?fade?away.
          ?*
          ?*?@param?e?the?element?to?add
          ?*/

          public?void?increment(@NonNull?E?e)?{
          ??if?(isNotInitialized())?{
          ????return;
          ??}

          ??//根據(jù)key的hashCode通過一個哈希函數(shù)得到一個hash值
          ??//本來就是hashCode了,為什么還要再做一次hash?怕原來的hashCode不夠均勻分散,再打散一下。
          ??int?hash?=?spread(e.hashCode());
          ??//這句光看有點(diǎn)難理解
          ??//就如我剛才說的,Caffeine把一個long的64bit劃分成16個等分,每一等分4個bit。
          ??//這個start就是用來定位到是哪一個等分的,用hash值低兩位作為隨機(jī)數(shù),再左移2位,得到一個小于16的值
          ??int?start?=?(hash?&?3)?<2;

          ??//indexOf方法的意思就是,根據(jù)hash值和不同種子得到table的下標(biāo)index
          ??//這里通過四個不同的種子,得到四個不同的下標(biāo)index
          ??int?index0?=?indexOf(hash,?0);
          ??int?index1?=?indexOf(hash,?1);
          ??int?index2?=?indexOf(hash,?2);
          ??int?index3?=?indexOf(hash,?3);

          ??//根據(jù)index和start(+1,?+2,?+3)的值,把table[index]對應(yīng)的等分追加1
          ??//這個incrementAt方法有點(diǎn)難理解,看我下面的解釋
          ??boolean?added?=?incrementAt(index0,?start);
          ??added?|=?incrementAt(index1,?start?+?1);
          ??added?|=?incrementAt(index2,?start?+?2);
          ??added?|=?incrementAt(index3,?start?+?3);

          ??//這個reset等下說
          ??if?(added?&&?(++size?==?sampleSize))?{
          ????reset();
          ??}
          }

          /**
          ?*?Increments?the?specified?counter?by?1?if?it?is?not?already?at?the?maximum?value?(15).
          ?*
          ?*?@param?i?the?table?index?(16?counters)
          ?*?@param?j?the?counter?to?increment
          ?*?@return?if?incremented
          ?*/

          boolean?incrementAt(int?i,?int?j)?{
          ??//這個j表示16個等分的下標(biāo),那么offset就是相當(dāng)于在64位中的下標(biāo)(這個自己想想)
          ??int?offset?=?j?<2;
          ??//上面提到Caffeine把頻率統(tǒng)計(jì)最大定為15,即0xfL
          ??//mask就是在64位中的掩碼,即1111后面跟很多個0
          ??long?mask?=?(0xfL?<??//如果&的結(jié)果不等于15,那么就追加1。等于15就不會再加了
          ??if?((table[i]?&?mask)?!=?mask)?{
          ????table[i]?+=?(1L?<????return?true;
          ??}
          ??return?false;
          }

          /**
          ?*?Returns?the?table?index?for?the?counter?at?the?specified?depth.
          ?*
          ?*?@param?item?the?element's?hash
          ?*?@param?i?the?counter?depth
          ?*?@return?the?table?index
          ?*/

          int?indexOf(int?item,?int?i)?{
          ??long?hash?=?SEED[i]?*?item;
          ??hash?+=?hash?>>>?32;
          ??return?((int)?hash)?&?tableMask;
          }

          /**
          ?*?Applies?a?supplemental?hash?function?to?a?given?hashCode,?which?defends?against?poor?quality
          ?*?hash?functions.
          ?*/

          int?spread(int?x)?{
          ??x?=?((x?>>>?16)?^?x)?*?0x45d9f3b;
          ??x?=?((x?>>>?16)?^?x)?*?0x45d9f3b;
          ??return?(x?>>>?16)?^?x;
          }

          知道了追加方法,那么讀取方法frequency就很容易理解了。

          /**
          ?*?Returns?the?estimated?number?of?occurrences?of?an?element,?up?to?the?maximum?(15).
          ?*
          ?*?@param?e?the?element?to?count?occurrences?of
          ?*?@return?the?estimated?number?of?occurrences?of?the?element;?possibly?zero?but?never?negative
          ?*/

          @NonNegative
          public?int?frequency(@NonNull?E?e)?{
          ??if?(isNotInitialized())?{
          ????return?0;
          ??}

          ??//得到hash值,跟上面一樣
          ??int?hash?=?spread(e.hashCode());
          ??//得到等分的下標(biāo),跟上面一樣
          ??int?start?=?(hash?&?3)?<2;
          ??int?frequency?=?Integer.MAX_VALUE;
          ??//循環(huán)四次,分別獲取在table數(shù)組中不同的下標(biāo)位置
          ??for?(int?i?=?0;?i?4;?i++)?{
          ????int?index?=?indexOf(hash,?i);
          ????//這個操作就不多說了,其實(shí)跟上面incrementAt是一樣的,定位到table[index]?+?等分的位置,再根據(jù)mask取出計(jì)數(shù)值
          ????int?count?=?(int)?((table[index]?>>>?((start?+?i)?<2))?&?0xfL);
          ????//取四個中的較小值
          ????frequency?=?Math.min(frequency,?count);
          ??}
          ??return?frequency;
          }

          通過代碼和注釋或者讀者可能難以理解,下圖是我畫出來幫助大家理解的結(jié)構(gòu)圖。

          注意紫色虛線框,其中藍(lán)色小格就是需要計(jì)算的位置:


          保新機(jī)制

          為了讓緩存保持“新鮮”,剔除掉過往頻率很高但之后不經(jīng)常的緩存,Caffeine 有一個 Freshness Mechanism。做法很簡答,就是當(dāng)整體的統(tǒng)計(jì)計(jì)數(shù)(當(dāng)前所有記錄的頻率統(tǒng)計(jì)之和,這個數(shù)值內(nèi)部維護(hù))達(dá)到某一個值時,那么所有記錄的頻率統(tǒng)計(jì)除以 2。

          從上面的代碼

          //size變量就是所有記錄的頻率統(tǒng)計(jì)之,即每個記錄加1,這個size都會加1
          //sampleSize一個閾值,從FrequencySketch初始化可以看到它的值為maximumSize的10倍
          if?(added?&&?(++size?==?sampleSize))?{
          ??????reset();
          }

          看到reset方法就是做這個事情

          /**?Reduces?every?counter?by?half?of?its?original?value.?*/
          void?reset()?{
          ??int?count?=?0;
          ??for?(int?i?=?0;?i?????count?+=?Long.bitCount(table[i]?&?ONE_MASK);
          ????table[i]?=?(table[i]?>>>?1)?&?RESET_MASK;
          ??}
          ??size?=?(size?>>>?1)?-?(count?>>>?2);
          }

          關(guān)于這個 reset 方法,為什么是除以 2,而不是其他,及其正確性,在最下面的參考資料的 TinyLFU 論文中 3.3 章節(jié)給出了數(shù)學(xué)證明,大家有興趣可以看看。

          增加一個 Window?

          Caffeine 通過測試發(fā)現(xiàn) TinyLFU 在面對突發(fā)性的稀疏流量(sparse bursts)時表現(xiàn)很差,因?yàn)樾碌挠涗洠╪ew items)還沒來得及建立足夠的頻率就被剔除出去了,這就使得命中率下降。

          于是 Caffeine 設(shè)計(jì)出一種新的 policy,即 Window Tiny LFU(W-TinyLFU),并通過實(shí)驗(yàn)和實(shí)踐發(fā)現(xiàn) W-TinyLFU 比 TinyLFU 表現(xiàn)的更好。

          W-TinyLFU 的設(shè)計(jì)如下所示(兩圖等價):

          它主要包括兩個緩存模塊,主緩存是 SLRU(Segmented LRU,即分段 LRU),SLRU 包括一個名為 protected 和一個名為 probation 的緩存區(qū)。通過增加一個緩存區(qū)(即 Window Cache),當(dāng)有新的記錄插入時,會先在 window 區(qū)呆一下,就可以避免上述說的 sparse bursts 問題。

          淘汰策略(eviction policy)

          當(dāng) window 區(qū)滿了,就會根據(jù) LRU 把 candidate(即淘汰出來的元素)放到 probation 區(qū),如果 probation 區(qū)也滿了,就把 candidate 和 probation 將要淘汰的元素 victim,兩個進(jìn)行“PK”,勝者留在 probation,輸者就要被淘汰了。

          而且經(jīng)過實(shí)驗(yàn)發(fā)現(xiàn)當(dāng) window 區(qū)配置為總?cè)萘康?1%,剩余的 99%當(dāng)中的 80%分給 protected 區(qū),20%分給 probation 區(qū)時,這時整體性能和命中率表現(xiàn)得最好,所以 Caffeine 默認(rèn)的比例設(shè)置就是這個。

          不過這個比例 Caffeine 會在運(yùn)行時根據(jù)統(tǒng)計(jì)數(shù)據(jù)(statistics)去動態(tài)調(diào)整,如果你的應(yīng)用程序的緩存隨著時間變化比較快的話,那么增加 window 區(qū)的比例可以提高命中率,相反緩存都是比較固定不變的話,增加 Main Cache 區(qū)(protected 區(qū) +probation 區(qū))的比例會有較好的效果。

          下面我們看看上面說到的淘汰策略是怎么實(shí)現(xiàn)的:

          一般緩存對讀寫操作后都有后續(xù)的一系列“維護(hù)”操作,Caffeine 也不例外,這些操作都在maintenance方法,我們將要說到的淘汰策略也在里面。

          這方法比較重要,下面也會提到,所以這里只先說跟“淘汰策略”有關(guān)的evictEntriesclimb。

          /**
          ???*?Performs?the?pending?maintenance?work?and?sets?the?state?flags?during?processing?to?avoid
          ???*?excess?scheduling?attempts.?The?read?buffer,?write?buffer,?and?reference?queues?are
          ???*?drained,?followed?by?expiration,?and?size-based?eviction.
          ???*
          ???*?@param?task?an?additional?pending?task?to?run,?or?{@code?null}?if?not?present
          ???*/

          ??@GuardedBy("evictionLock")
          ??void?maintenance(@Nullable?Runnable?task)?{
          ????lazySetDrainStatus(PROCESSING_TO_IDLE);

          ????try?{
          ??????drainReadBuffer();

          ??????drainWriteBuffer();
          ??????if?(task?!=?null)?{
          ????????task.run();
          ??????}

          ??????drainKeyReferences();
          ??????drainValueReferences();

          ??????expireEntries();
          ??????//把符合條件的記錄淘汰掉
          ??????evictEntries();
          ??????//動態(tài)調(diào)整window區(qū)和protected區(qū)的大小
          ??????climb();
          ????}?finally?{
          ??????if?((drainStatus()?!=?PROCESSING_TO_IDLE)?||?!casDrainStatus(PROCESSING_TO_IDLE,?IDLE))?{
          ????????lazySetDrainStatus(REQUIRED);
          ??????}
          ????}
          ??}
          先說一下 Caffeine 對上面說到的 W-TinyLFU 策略的實(shí)現(xiàn)用到的數(shù)據(jù)結(jié)構(gòu):
          //最大的個數(shù)限制
          long?maximum;
          //當(dāng)前的個數(shù)
          long?weightedSize;
          //window區(qū)的最大限制
          long?windowMaximum;
          //window區(qū)當(dāng)前的個數(shù)
          long?windowWeightedSize;
          //protected區(qū)的最大限制
          long?mainProtectedMaximum;
          //protected區(qū)當(dāng)前的個數(shù)
          long?mainProtectedWeightedSize;
          //下一次需要調(diào)整的大?。ㄟ€需要進(jìn)一步計(jì)算)
          double?stepSize;
          //window區(qū)需要調(diào)整的大小
          long?adjustment;
          //命中計(jì)數(shù)
          int?hitsInSample;
          //不命中的計(jì)數(shù)
          int?missesInSample;
          //上一次的緩存命中率
          double?previousSampleHitRate;

          final?FrequencySketch?sketch;
          //window區(qū)的LRU?queue(FIFO)
          final?AccessOrderDeque>?accessOrderWindowDeque;
          //probation區(qū)的LRU?queue(FIFO)
          final?AccessOrderDeque>?accessOrderProbationDeque;
          //protected區(qū)的LRU?queue(FIFO)
          final?AccessOrderDeque>?accessOrderProtectedDeque;

          以及默認(rèn)比例設(shè)置(意思看注釋)

          /**?The?initial?percent?of?the?maximum?weighted?capacity?dedicated?to?the?main?space.?*/
          static?final?double?PERCENT_MAIN?=?0.99d;
          /**?The?percent?of?the?maximum?weighted?capacity?dedicated?to?the?main's?protected?space.?*/
          static?final?double?PERCENT_MAIN_PROTECTED?=?0.80d;
          /**?The?difference?in?hit?rates?that?restarts?the?climber.?*/
          static?final?double?HILL_CLIMBER_RESTART_THRESHOLD?=?0.05d;
          /**?The?percent?of?the?total?size?to?adapt?the?window?by.?*/
          static?final?double?HILL_CLIMBER_STEP_PERCENT?=?0.0625d;
          /**?The?rate?to?decrease?the?step?size?to?adapt?by.?*/
          static?final?double?HILL_CLIMBER_STEP_DECAY_RATE?=?0.98d;
          /**?The?maximum?number?of?entries?that?can?be?transfered?between?queues.?*/

          重點(diǎn)來了,evictEntriesclimb方法:

          /**?Evicts?entries?if?the?cache?exceeds?the?maximum.?*/
          @GuardedBy("evictionLock")
          void?evictEntries()?{
          ??if?(!evicts())?{
          ????return;
          ??}
          ??//淘汰window區(qū)的記錄
          ??int?candidates?=?evictFromWindow();
          ??//淘汰Main區(qū)的記錄
          ??evictFromMain(candidates);
          }

          /**
          ?*?Evicts?entries?from?the?window?space?into?the?main?space?while?the?window?size?exceeds?a
          ?*?maximum.
          ?*
          ?*?@return?the?number?of?candidate?entries?evicted?from?the?window?space
          ?*/

          //根據(jù)W-TinyLFU,新的數(shù)據(jù)都會無條件的加到admission?window
          //但是window是有大小限制,所以要“定期”做一下“維護(hù)”
          @GuardedBy("evictionLock")
          int?evictFromWindow()?{
          ??int?candidates?=?0;
          ??//查看window?queue的頭部節(jié)點(diǎn)
          ??Node?node?=?accessOrderWindowDeque().peek();
          ??//如果window區(qū)超過了最大的限制,那么就要把“多出來”的記錄做處理
          ??while?(windowWeightedSize()?>?windowMaximum())?{
          ????//?The?pending?operations?will?adjust?the?size?to?reflect?the?correct?weight
          ????if?(node?==?null)?{
          ??????break;
          ????}
          ????//下一個節(jié)點(diǎn)
          ????Node?next?=?node.getNextInAccessOrder();
          ????if?(node.getWeight()?!=?0)?{
          ??????//把node定位在probation區(qū)
          ??????node.makeMainProbation();
          ??????//從window區(qū)去掉
          ??????accessOrderWindowDeque().remove(node);
          ??????//加入到probation?queue,相當(dāng)于把節(jié)點(diǎn)移動到probation區(qū)(晉升了)
          ??????accessOrderProbationDeque().add(node);
          ??????candidates++;
          ??????//因?yàn)橐瞥艘粋€節(jié)點(diǎn),所以需要調(diào)整window的size
          ??????setWindowWeightedSize(windowWeightedSize()?-?node.getPolicyWeight());
          ????}
          ????//處理下一個節(jié)點(diǎn)
          ????node?=?next;
          ??}

          ??return?candidates;
          }

          evictFromMain方法:

          /**
          ?*?Evicts?entries?from?the?main?space?if?the?cache?exceeds?the?maximum?capacity.?The?main?space
          ?*?determines?whether?admitting?an?entry?(coming?from?the?window?space)?is?preferable?to?retaining
          ?*?the?eviction?policy's?victim.?This?is?decision?is?made?using?a?frequency?filter?so?that?the
          ?*?least?frequently?used?entry?is?removed.
          ?*
          ?*?The?window?space?candidates?were?previously?placed?in?the?MRU?position?and?the?eviction
          ?*?policy's?victim?is?at?the?LRU?position.?The?two?ends?of?the?queue?are?evaluated?while?an
          ?*?eviction?is?required.?The?number?of?remaining?candidates?is?provided?and?decremented?on
          ?*?eviction,?so?that?when?there?are?no?more?candidates?the?victim?is?evicted.
          ?*
          ?*?@param?candidates?the?number?of?candidate?entries?evicted?from?the?window?space
          ?*/

          //根據(jù)W-TinyLFU,從window晉升過來的要跟probation區(qū)的進(jìn)行“PK”,勝者才能留下
          @GuardedBy("evictionLock")
          void?evictFromMain(int?candidates)?{
          ??int?victimQueue?=?PROBATION;
          ??//victim是probation?queue的頭部
          ??Node?victim?=?accessOrderProbationDeque().peekFirst();
          ??//candidate是probation?queue的尾部,也就是剛從window晉升來的
          ??Node?candidate?=?accessOrderProbationDeque().peekLast();
          ??//當(dāng)cache不夠容量時才做處理
          ??while?(weightedSize()?>?maximum())?{
          ????//?Stop?trying?to?evict?candidates?and?always?prefer?the?victim
          ????if?(candidates?==?0)?{
          ??????candidate?=?null;
          ????}

          ????//對candidate為null且victim為bull的處理
          ????if?((candidate?==?null)?&&?(victim?==?null))?{
          ??????if?(victimQueue?==?PROBATION)?{
          ????????victim?=?accessOrderProtectedDeque().peekFirst();
          ????????victimQueue?=?PROTECTED;
          ????????continue;
          ??????}?else?if?(victimQueue?==?PROTECTED)?{
          ????????victim?=?accessOrderWindowDeque().peekFirst();
          ????????victimQueue?=?WINDOW;
          ????????continue;
          ??????}

          ??????//?The?pending?operations?will?adjust?the?size?to?reflect?the?correct?weight
          ??????break;
          ????}

          ????//對節(jié)點(diǎn)的weight為0的處理
          ????if?((victim?!=?null)?&&?(victim.getPolicyWeight()?==?0))?{
          ??????victim?=?victim.getNextInAccessOrder();
          ??????continue;
          ????}?else?if?((candidate?!=?null)?&&?(candidate.getPolicyWeight()?==?0))?{
          ??????candidate?=?candidate.getPreviousInAccessOrder();
          ??????candidates--;
          ??????continue;
          ????}

          ????//?Evict?immediately?if?only?one?of?the?entries?is?present
          ????if?(victim?==?null)?{
          ??????@SuppressWarnings("NullAway")
          ??????Node?previous?=?candidate.getPreviousInAccessOrder();
          ??????Node?evict?=?candidate;
          ??????candidate?=?previous;
          ??????candidates--;
          ??????evictEntry(evict,?RemovalCause.SIZE,?0L);
          ??????continue;
          ????}?else?if?(candidate?==?null)?{
          ??????Node?evict?=?victim;
          ??????victim?=?victim.getNextInAccessOrder();
          ??????evictEntry(evict,?RemovalCause.SIZE,?0L);
          ??????continue;
          ????}

          ????//?Evict?immediately?if?an?entry?was?collected
          ????K?victimKey?=?victim.getKey();
          ????K?candidateKey?=?candidate.getKey();
          ????if?(victimKey?==?null)?{
          ??????@NonNull?Node?evict?=?victim;
          ??????victim?=?victim.getNextInAccessOrder();
          ??????evictEntry(evict,?RemovalCause.COLLECTED,?0L);
          ??????continue;
          ????}?else?if?(candidateKey?==?null)?{
          ??????candidates--;
          ??????@NonNull?Node?evict?=?candidate;
          ??????candidate?=?candidate.getPreviousInAccessOrder();
          ??????evictEntry(evict,?RemovalCause.COLLECTED,?0L);
          ??????continue;
          ????}

          ????//放不下的節(jié)點(diǎn)直接處理掉
          ????if?(candidate.getPolicyWeight()?>?maximum())?{
          ??????candidates--;
          ??????Node?evict?=?candidate;
          ??????candidate?=?candidate.getPreviousInAccessOrder();
          ??????evictEntry(evict,?RemovalCause.SIZE,?0L);
          ??????continue;
          ????}

          ????//根據(jù)節(jié)點(diǎn)的統(tǒng)計(jì)頻率frequency來做比較,看看要處理掉victim還是candidate
          ????//admit是具體的比較規(guī)則,看下面
          ????candidates--;
          ????//如果candidate勝出則淘汰victim
          ????if?(admit(candidateKey,?victimKey))?{
          ??????Node?evict?=?victim;
          ??????victim?=?victim.getNextInAccessOrder();
          ??????evictEntry(evict,?RemovalCause.SIZE,?0L);
          ??????candidate?=?candidate.getPreviousInAccessOrder();
          ????}?else?{
          ??????//如果是victim勝出,則淘汰candidate
          ??????Node?evict?=?candidate;
          ??????candidate?=?candidate.getPreviousInAccessOrder();
          ??????evictEntry(evict,?RemovalCause.SIZE,?0L);
          ????}
          ??}
          }

          /**
          ?*?Determines?if?the?candidate?should?be?accepted?into?the?main?space,?as?determined?by?its
          ?*?frequency?relative?to?the?victim.?A?small?amount?of?randomness?is?used?to?protect?against?hash
          ?*?collision?attacks,?where?the?victim's?frequency?is?artificially?raised?so?that?no?new?entries
          ?*?are?admitted.
          ?*
          ?*?@param?candidateKey?the?key?for?the?entry?being?proposed?for?long?term?retention
          ?*?@param?victimKey?the?key?for?the?entry?chosen?by?the?eviction?policy?for?replacement
          ?*?@return?if?the?candidate?should?be?admitted?and?the?victim?ejected
          ?*/

          @GuardedBy("evictionLock")
          boolean?admit(K?candidateKey,?K?victimKey)?{
          ??//分別獲取victim和candidate的統(tǒng)計(jì)頻率
          ??//frequency這個方法的原理和實(shí)現(xiàn)上面已經(jīng)解釋了
          ??int?victimFreq?=?frequencySketch().frequency(victimKey);
          ??int?candidateFreq?=?frequencySketch().frequency(candidateKey);
          ??//誰大誰贏
          ??if?(candidateFreq?>?victimFreq)?{
          ????return?true;

          ????//如果相等,candidate小于5都當(dāng)輸了
          ??}?else?if?(candidateFreq?<=?5)?{
          ????//?The?maximum?frequency?is?15?and?halved?to?7?after?a?reset?to?age?the?history.?An?attack
          ????//?exploits?that?a?hot?candidate?is?rejected?in?favor?of?a?hot?victim.?The?threshold?of?a?warm
          ????//?candidate?reduces?the?number?of?random?acceptances?to?minimize?the?impact?on?the?hit?rate.
          ????return?false;
          ??}
          ??//如果相等且candidate大于5,則隨機(jī)淘汰一個
          ??int?random?=?ThreadLocalRandom.current().nextInt();
          ??return?((random?&?127)?==?0);
          }

          climb方法主要是用來調(diào)整 window size 的,使得 Caffeine 可以適應(yīng)你的應(yīng)用類型(如 OLAP 或 OLTP)表現(xiàn)出最佳的命中率。

          下圖是官方測試的數(shù)據(jù):

          我們看看 window size 的調(diào)整是怎么實(shí)現(xiàn)的。

          調(diào)整時用到的默認(rèn)比例數(shù)據(jù):

          //與上次命中率之差的閾值
          static?final?double?HILL_CLIMBER_RESTART_THRESHOLD?=?0.05d;
          //步長(調(diào)整)的大?。ǜ畲笾祄aximum的比例)
          static?final?double?HILL_CLIMBER_STEP_PERCENT?=?0.0625d;
          //步長的衰減比例
          static?final?double?HILL_CLIMBER_STEP_DECAY_RATE?=?0.98d;
          ??/**?Adapts?the?eviction?policy?to?towards?the?optimal?recency?/?frequency?configuration.?*/
          //climb方法的主要作用就是動態(tài)調(diào)整window區(qū)的大?。ㄏ鄳?yīng)的,main區(qū)的大小也會發(fā)生變化,兩個之和為100%)。
          //因?yàn)閰^(qū)域的大小發(fā)生了變化,那么區(qū)域內(nèi)的數(shù)據(jù)也可能需要發(fā)生相應(yīng)的移動。
          @GuardedBy("evictionLock")
          void?climb()?{
          ??if?(!evicts())?{
          ????return;
          ??}
          ??//確定window需要調(diào)整的大小
          ??determineAdjustment();
          ??//如果protected區(qū)有溢出,把溢出部分移動到probation區(qū)。因?yàn)橄旅娴牟僮饔锌赡苄枰{(diào)整到protected區(qū)。
          ??demoteFromMainProtected();
          ??long?amount?=?adjustment();
          ??if?(amount?==?0)?{
          ????return;
          ??}?else?if?(amount?>?0)?{
          ????//增加window的大小
          ????increaseWindow();
          ??}?else?{
          ????//減少window的大小
          ????decreaseWindow();
          ??}
          }

          下面分別展開每個方法來解釋:

          /**?Calculates?the?amount?to?adapt?the?window?by?and?sets?{@link?#adjustment()}?accordingly.?*/
          @GuardedBy("evictionLock")
          void?determineAdjustment()?{
          ??//如果frequencySketch還沒初始化,則返回
          ??if?(frequencySketch().isNotInitialized())?{
          ????setPreviousSampleHitRate(0.0);
          ????setMissesInSample(0);
          ????setHitsInSample(0);
          ????return;
          ??}
          ??//總請求量?=?命中?+?miss
          ??int?requestCount?=?hitsInSample()?+?missesInSample();
          ??//沒達(dá)到sampleSize則返回
          ??//默認(rèn)下sampleSize = 10?* maximum。用sampleSize來判斷緩存是否足夠”熱“。
          ??if?(requestCount?????return;
          ??}

          ??//命中率的公式?=?命中?/?總請求
          ??double?hitRate?=?(double)?hitsInSample()?/?requestCount;
          ??//命中率的差值
          ??double?hitRateChange?=?hitRate?-?previousSampleHitRate();
          ??//本次調(diào)整的大小,是由命中率的差值和上次的stepSize決定的
          ??double?amount?=?(hitRateChange?>=?0)???stepSize()?:?-stepSize();
          ??//下次的調(diào)整大?。喝绻新实闹畲笥?.05,則重置為0.065 * maximum,否則按照0.98來進(jìn)行衰減
          ??double?nextStepSize?=?(Math.abs(hitRateChange)?>=?HILL_CLIMBER_RESTART_THRESHOLD)
          ????????HILL_CLIMBER_STEP_PERCENT?*?maximum()?*?(amount?>=?0???1?:?-1)
          ??????:?HILL_CLIMBER_STEP_DECAY_RATE?*?amount;
          ??setPreviousSampleHitRate(hitRate);
          ??setAdjustment((long)?amount);
          ??setStepSize(nextStepSize);
          ??setMissesInSample(0);
          ??setHitsInSample(0);
          }

          /**?Transfers?the?nodes?from?the?protected?to?the?probation?region?if?it?exceeds?the?maximum.?*/

          //這個方法比較簡單,減少protected區(qū)溢出的部分
          @GuardedBy("evictionLock")
          void?demoteFromMainProtected()?{
          ??long?mainProtectedMaximum?=?mainProtectedMaximum();
          ??long?mainProtectedWeightedSize?=?mainProtectedWeightedSize();
          ??if?(mainProtectedWeightedSize?<=?mainProtectedMaximum)?{
          ????return;
          ??}

          ??for?(int?i?=?0;?i?????if?(mainProtectedWeightedSize?<=?mainProtectedMaximum)?{
          ??????break;
          ????}

          ????Node?demoted?=?accessOrderProtectedDeque().poll();
          ????if?(demoted?==?null)?{
          ??????break;
          ????}
          ????demoted.makeMainProbation();
          ????accessOrderProbationDeque().add(demoted);
          ????mainProtectedWeightedSize?-=?demoted.getPolicyWeight();
          ??}
          ??setMainProtectedWeightedSize(mainProtectedWeightedSize);
          }

          /**
          ?*?Increases?the?size?of?the?admission?window?by?shrinking?the?portion?allocated?to?the?main
          ?*?space.?As?the?main?space?is?partitioned?into?probation?and?protected?regions?(80%?/?20%),?for
          ?*?simplicity?only?the?protected?is?reduced.?If?the?regions?exceed?their?maximums,?this?may?cause
          ?*?protected?items?to?be?demoted?to?the?probation?region?and?probation?items?to?be?demoted?to?the
          ?*?admission?window.
          ?*/


          //增加window區(qū)的大小,這個方法比較簡單,思路就像我上面說的
          @GuardedBy("evictionLock")
          void?increaseWindow()?{
          ??if?(mainProtectedMaximum()?==?0)?{
          ????return;
          ??}

          ??long?quota?=?Math.min(adjustment(),?mainProtectedMaximum());
          ??setMainProtectedMaximum(mainProtectedMaximum()?-?quota);
          ??setWindowMaximum(windowMaximum()?+?quota);
          ??demoteFromMainProtected();

          ??for?(int?i?=?0;?i?????Node?candidate?=?accessOrderProbationDeque().peek();
          ????boolean?probation?=?true;
          ????if?((candidate?==?null)?||?(quota???????candidate?=?accessOrderProtectedDeque().peek();
          ??????probation?=?false;
          ????}
          ????if?(candidate?==?null)?{
          ??????break;
          ????}

          ????int?weight?=?candidate.getPolicyWeight();
          ????if?(quota???????break;
          ????}

          ????quota?-=?weight;
          ????if?(probation)?{
          ??????accessOrderProbationDeque().remove(candidate);
          ????}?else?{
          ??????setMainProtectedWeightedSize(mainProtectedWeightedSize()?-?weight);
          ??????accessOrderProtectedDeque().remove(candidate);
          ????}
          ????setWindowWeightedSize(windowWeightedSize()?+?weight);
          ????accessOrderWindowDeque().add(candidate);
          ????candidate.makeWindow();
          ??}

          ??setMainProtectedMaximum(mainProtectedMaximum()?+?quota);
          ??setWindowMaximum(windowMaximum()?-?quota);
          ??setAdjustment(quota);
          }

          /**?Decreases?the?size?of?the?admission?window?and?increases?the?main's?protected?region.?*/
          //同上increaseWindow差不多,反操作
          @GuardedBy("evictionLock")
          void?decreaseWindow()?{
          ??if?(windowMaximum()?<=?1)?{
          ????return;
          ??}

          ??long?quota?=?Math.min(-adjustment(),?Math.max(0,?windowMaximum()?-?1));
          ??setMainProtectedMaximum(mainProtectedMaximum()?+?quota);
          ??setWindowMaximum(windowMaximum()?-?quota);

          ??for?(int?i?=?0;?i?????Node?candidate?=?accessOrderWindowDeque().peek();
          ????if?(candidate?==?null)?{
          ??????break;
          ????}

          ????int?weight?=?candidate.getPolicyWeight();
          ????if?(quota???????break;
          ????}

          ????quota?-=?weight;
          ????setMainProtectedWeightedSize(mainProtectedWeightedSize()?+?weight);
          ????setWindowWeightedSize(windowWeightedSize()?-?weight);
          ????accessOrderWindowDeque().remove(candidate);
          ????accessOrderProbationDeque().add(candidate);
          ????candidate.makeMainProbation();
          ??}

          ??setMainProtectedMaximum(mainProtectedMaximum()?-?quota);
          ??setWindowMaximum(windowMaximum()?+?quota);
          ??setAdjustment(-quota);
          }

          以上,是 Caffeine 的 W-TinyLFU 策略的設(shè)計(jì)原理及代碼實(shí)現(xiàn)解析。

          異步的高性能讀寫

          一般的緩存每次對數(shù)據(jù)處理完之后(讀的話,已經(jīng)存在則直接返回,不存在則 load 數(shù)據(jù),保存,再返回;寫的話,則直接插入或更新),但是因?yàn)橐S護(hù)一些淘汰策略,則需要一些額外的操作,諸如:

          • 計(jì)算和比較數(shù)據(jù)的是否過期
          • 統(tǒng)計(jì)頻率(像 LFU 或其變種)
          • 維護(hù) read queue 和 write queue
          • 淘汰符合條件的數(shù)據(jù)
          • 等等。。。

          這種數(shù)據(jù)的讀寫伴隨著緩存狀態(tài)的變更,Guava Cache 的做法是把這些操作和讀寫操作放在一起,在一個同步加鎖的操作中完成,雖然 Guava Cache 巧妙地利用了 JDK 的 ConcurrentHashMap(分段鎖或者無鎖 CAS)來降低鎖的密度,達(dá)到提高并發(fā)度的目的。但是,對于一些熱點(diǎn)數(shù)據(jù),這種做法還是避免不了頻繁的鎖競爭。Caffeine 借鑒了數(shù)據(jù)庫系統(tǒng)的 WAL(Write-Ahead Logging)思想,即先寫日志再執(zhí)行操作,這種思想同樣適合緩存的,執(zhí)行讀寫操作時,先把操作記錄在緩沖區(qū),然后在合適的時機(jī)異步、批量地執(zhí)行緩沖區(qū)中的內(nèi)容。但在執(zhí)行緩沖區(qū)的內(nèi)容時,也是需要在緩沖區(qū)加上同步鎖的,不然存在并發(fā)問題,只不過這樣就可以把對鎖的競爭從緩存數(shù)據(jù)轉(zhuǎn)移到對緩沖區(qū)上。

          ReadBuffer

          在 Caffeine 的內(nèi)部實(shí)現(xiàn)中,為了很好的支持不同的 Features(如 Eviction,Removal,Refresh,Statistics,Cleanup,Policy 等等),擴(kuò)展了很多子類,它們共同的父類是BoundedLocalCache,而readBuffer就是作為它們共有的屬性,即都是用一樣的 readBuffer,看定義:

          final?Buffer>?readBuffer;

          readBuffer?=?evicts()?||?collectKeys()?||?collectValues()?||?expiresAfterAccess()
          ??????????new?BoundedBuffer<>()
          ????????:?Buffer.disabled();

          上面提到 Caffeine 對每次緩存的讀操作都會觸發(fā)afterRead

          /**
          ?*?Performs?the?post-processing?work?required?after?a?read.
          ?*
          ?*?@param?node?the?entry?in?the?page?replacement?policy
          ?*?@param?now?the?current?time,?in?nanoseconds
          ?*?@param?recordHit?if?the?hit?count?should?be?incremented
          ?*/

          void?afterRead(Node?node,?long?now,?boolean?recordHit)?{
          ??if?(recordHit)?{
          ????statsCounter().recordHits(1);
          ??}
          ??//把記錄加入到readBuffer
          ??//判斷是否需要立即處理readBuffer
          ??//注意這里無論offer是否成功都可以走下去的,即允許寫入readBuffer丟失,因?yàn)檫@個
          ??boolean?delayable?=?skipReadBuffer()?||?(readBuffer.offer(node)?!=?Buffer.FULL);
          ??if?(shouldDrainBuffers(delayable))?{
          ????scheduleDrainBuffers();
          ??}
          ??refreshIfNeeded(node,?now);
          }

          ?/**
          ???*?Returns?whether?maintenance?work?is?needed.
          ???*
          ???*?@param?delayable?if?draining?the?read?buffer?can?be?delayed
          ???*/


          ??//caffeine用了一組狀態(tài)來定義和管理“維護(hù)”的過程
          ??boolean?shouldDrainBuffers(boolean?delayable)?{
          ????switch?(drainStatus())?{
          ??????case?IDLE:
          ????????return?!delayable;
          ??????case?REQUIRED:
          ????????return?true;
          ??????case?PROCESSING_TO_IDLE:
          ??????case?PROCESSING_TO_REQUIRED:
          ????????return?false;
          ??????default:
          ????????throw?new?IllegalStateException();
          ????}
          ??}

          重點(diǎn)看BoundedBuffer

          /**
          ?*?A?striped,?non-blocking,?bounded?buffer.
          ?*
          ?*?@author[email protected]?(Ben?Manes)
          ?*?@param??the?type?of?elements?maintained?by?this?buffer
          ?*/

          final?class?BoundedBuffer<E>?extends?StripedBuffer<E>

          它是一個 striped、非阻塞、有界限的 buffer,繼承于StripedBuffer類。下面看看StripedBuffer的實(shí)現(xiàn):

          /**
          ?*?A?base?class?providing?the?mechanics?for?supporting?dynamic?striping?of?bounded?buffers.?This
          ?*?implementation?is?an?adaption?of?the?numeric?64-bit?{@link?java.util.concurrent.atomic.Striped64}
          ?*?class,?which?is?used?by?atomic?counters.?The?approach?was?modified?to?lazily?grow?an?array?of
          ?*?buffers?in?order?to?minimize?memory?usage?for?caches?that?are?not?heavily?contended?on.
          ?*
          ?*?@author[email protected]?(Doug?Lea)
          ?*?@author[email protected]?(Ben?Manes)
          ?*/


          abstract?class?StripedBuffer<E>?implements?Buffer<E>

          這個StripedBuffer設(shè)計(jì)的思想是跟Striped64類似的,通過擴(kuò)展結(jié)構(gòu)把競爭熱點(diǎn)分離。

          具體實(shí)現(xiàn)是這樣的,StripedBuffer維護(hù)一個Buffer[]數(shù)組,每個元素就是一個RingBuffer,每個線程用自己threadLocalRandomProbe屬性作為 hash 值,這樣就相當(dāng)于每個線程都有自己“專屬”的RingBuffer,就不會產(chǎn)生競爭啦,而不是用 key 的hashCode作為 hash 值,因?yàn)闀a(chǎn)生熱點(diǎn)數(shù)據(jù)問題。

          看看StripedBuffer的屬性

          /**?Table?of?buffers.?When?non-null,?size?is?a?power?of?2.?*/
          //RingBuffer數(shù)組
          transient?volatile?Buffer?@Nullable[]?table;

          //當(dāng)進(jìn)行resize時,需要整個table鎖住。tableBusy作為CAS的標(biāo)記。
          static?final?long?TABLE_BUSY?=?UnsafeAccess.objectFieldOffset(StripedBuffer.class,?"tableBusy");
          static?final?long?PROBE?=?UnsafeAccess.objectFieldOffset(Thread.class,?"threadLocalRandomProbe");

          /**?Number?of?CPUS.?*/
          static?final?int?NCPU?=?Runtime.getRuntime().availableProcessors();

          /**?The?bound?on?the?table?size.?*/
          //table最大size
          static?final?int?MAXIMUM_TABLE_SIZE?=?4?*?ceilingNextPowerOfTwo(NCPU);

          /**?The?maximum?number?of?attempts?when?trying?to?expand?the?table.?*/
          //如果發(fā)生競爭時(CAS失?。┑膰L試次數(shù)
          static?final?int?ATTEMPTS?=?3;

          /**?Table?of?buffers.?When?non-null,?size?is?a?power?of?2.?*/
          //核心數(shù)據(jù)結(jié)構(gòu)
          transient?volatile?Buffer?@Nullable[]?table;

          /**?Spinlock?(locked?via?CAS)?used?when?resizing?and/or?creating?Buffers.?*/
          transient?volatile?int?tableBusy;

          /**?CASes?the?tableBusy?field?from?0?to?1?to?acquire?lock.?*/
          final?boolean?casTableBusy()?{
          ??return?UnsafeAccess.UNSAFE.compareAndSwapInt(this,?TABLE_BUSY,?0,?1);
          }

          /**
          ?*?Returns?the?probe?value?for?the?current?thread.?Duplicated?from?ThreadLocalRandom?because?of
          ?*?packaging?restrictions.
          ?*/

          static?final?int?getProbe()?{
          ??return?UnsafeAccess.UNSAFE.getInt(Thread.currentThread(),?PROBE);
          }

          offer方法,當(dāng)沒初始化或存在競爭時,則擴(kuò)容為 2 倍。

          實(shí)際是調(diào)用RingBuffer的 offer 方法,把數(shù)據(jù)追加到RingBuffer后面。

          @Override
          public?int?offer(E?e)?{
          ??int?mask;
          ??int?result?=?0;
          ??Buffer?buffer;
          ??//是否不存在競爭
          ??boolean?uncontended?=?true;
          ??Buffer[]?buffers?=?table
          ??//是否已經(jīng)初始化
          ??if?((buffers?==?null)
          ??????||?(mask?=?buffers.length?-?1)?0
          ??????//用thread的隨機(jī)值作為hash值,得到對應(yīng)位置的RingBuffer
          ??????||?(buffer?=?buffers[getProbe()?&?mask])?==?null
          ??????//檢查追加到RingBuffer是否成功
          ??????||?!(uncontended?=?((result?=?buffer.offer(e))?!=?Buffer.FAILED)))?{
          ????//其中一個符合條件則進(jìn)行擴(kuò)容
          ????expandOrRetry(e,?uncontended);
          ??}
          ??return?result;
          }

          /**
          ?*?Handles?cases?of?updates?involving?initialization,?resizing,?creating?new?Buffers,?and/or
          ?*?contention.?See?above?for?explanation.?This?method?suffers?the?usual?non-modularity?problems?of
          ?*?optimistic?retry?code,?relying?on?rechecked?sets?of?reads.
          ?*
          ?*?@param?e?the?element?to?add
          ?*?@param?wasUncontended?false?if?CAS?failed?before?call
          ?*/


          //這個方法比較長,但思路還是相對清晰的。
          @SuppressWarnings("PMD.ConfusingTernary")
          final?void?expandOrRetry(E?e,?boolean?wasUncontended)?{
          ??int?h;
          ??if?((h?=?getProbe())?==?0)?{
          ????ThreadLocalRandom.current();?//?force?initialization
          ????h?=?getProbe();
          ????wasUncontended?=?true;
          ??}
          ??boolean?collide?=?false;?//?True?if?last?slot?nonempty
          ??for?(int?attempt?=?0;?attempt?????Buffer[]?buffers;
          ????Buffer?buffer;
          ????int?n;
          ????if?(((buffers?=?table)?!=?null)?&&?((n?=?buffers.length)?>?0))?{
          ??????if?((buffer?=?buffers[(n?-?1)?&?h])?==?null)?{
          ????????if?((tableBusy?==?0)?&&?casTableBusy())?{?//?Try?to?attach?new?Buffer
          ??????????boolean?created?=?false;
          ??????????try?{?//?Recheck?under?lock
          ????????????Buffer[]?rs;
          ????????????int?mask,?j;
          ????????????if?(((rs?=?table)?!=?null)?&&?((mask?=?rs.length)?>?0)
          ????????????????&&?(rs[j?=?(mask?-?1)?&?h]?==?null))?{
          ??????????????rs[j]?=?create(e);
          ??????????????created?=?true;
          ????????????}
          ??????????}?finally?{
          ????????????tableBusy?=?0;
          ??????????}
          ??????????if?(created)?{
          ????????????break;
          ??????????}
          ??????????continue;?//?Slot?is?now?non-empty
          ????????}
          ????????collide?=?false;
          ??????}?else?if?(!wasUncontended)?{?//?CAS?already?known?to?fail
          ????????wasUncontended?=?true;??????//?Continue?after?rehash
          ??????}?else?if?(buffer.offer(e)?!=?Buffer.FAILED)?{
          ????????break;
          ??????}?else?if?(n?>=?MAXIMUM_TABLE_SIZE?||?table?!=?buffers)?{
          ????????collide?=?false;?//?At?max?size?or?stale
          ??????}?else?if?(!collide)?{
          ????????collide?=?true;
          ??????}?else?if?(tableBusy?==?0?&&?casTableBusy())?{
          ????????try?{
          ??????????if?(table?==?buffers)?{?//?Expand?table?unless?stale
          ????????????table?=?Arrays.copyOf(buffers,?n?<1);
          ??????????}
          ????????}?finally?{
          ??????????tableBusy?=?0;
          ????????}
          ????????collide?=?false;
          ????????continue;?//?Retry?with?expanded?table
          ??????}
          ??????h?=?advanceProbe(h);
          ????}?else?if?((tableBusy?==?0)?&&?(table?==?buffers)?&&?casTableBusy())?{
          ??????boolean?init?=?false;
          ??????try?{?//?Initialize?table
          ????????if?(table?==?buffers)?{
          ??????????@SuppressWarnings({"unchecked",?"rawtypes"})
          ??????????Buffer[]?rs?=?new?Buffer[1];
          ??????????rs[0]?=?create(e);
          ??????????table?=?rs;
          ??????????init?=?true;
          ????????}
          ??????}?finally?{
          ????????tableBusy?=?0;
          ??????}
          ??????if?(init)?{
          ????????break;
          ??????}
          ????}
          ??}
          }

          最后看看RingBuffer,注意RingBufferBoundedBuffer的內(nèi)部類。

          /**?The?maximum?number?of?elements?per?buffer.?*/
          static?final?int?BUFFER_SIZE?=?16;

          //?Assume?4-byte?references?and?64-byte?cache?line?(16?elements?per?line)
          //256長度,但是是以16為單位,所以最多存放16個元素
          static?final?int?SPACED_SIZE?=?BUFFER_SIZE?<4;
          static?final?int?SPACED_MASK?=?SPACED_SIZE?-?1;
          static?final?int?OFFSET?=?16;
          //RingBuffer數(shù)組
          final?AtomicReferenceArray?buffer;

          ?//插入方法
          ?@Override
          ?public?int?offer(E?e)?{
          ???long?head?=?readCounter;
          ???long?tail?=?relaxedWriteCounter();
          ???//用head和tail來限制個數(shù)
          ???long?size?=?(tail?-?head);
          ???if?(size?>=?SPACED_SIZE)?{
          ?????return?Buffer.FULL;
          ???}
          ???//tail追加16
          ???if?(casWriteCounter(tail,?tail?+?OFFSET))?{
          ?????//用tail“取余”得到下標(biāo)
          ?????int?index?=?(int)?(tail?&?SPACED_MASK);
          ?????//用unsafe.putOrderedObject設(shè)值
          ?????buffer.lazySet(index,?e);
          ?????return?Buffer.SUCCESS;
          ???}
          ???//如果CAS失敗則返回失敗
          ???return?Buffer.FAILED;
          ?}

          ?//用consumer來處理buffer的數(shù)據(jù)
          ?@Override
          ?public?void?drainTo(Consumer?consumer)?{
          ???long?head?=?readCounter;
          ???long?tail?=?relaxedWriteCounter();
          ???//判斷數(shù)據(jù)多少
          ???long?size?=?(tail?-?head);
          ???if?(size?==?0)?{
          ?????return;
          ???}
          ???do?{
          ?????int?index?=?(int)?(head?&?SPACED_MASK);
          ?????E?e?=?buffer.get(index);
          ?????if?(e?==?null)?{
          ???????//?not?published?yet
          ???????break;
          ?????}
          ?????buffer.lazySet(index,?null);
          ?????consumer.accept(e);
          ?????//head也跟tail一樣,每次遞增16
          ?????head?+=?OFFSET;
          ???}?while?(head?!=?tail);
          ???lazySetReadCounter(head);
          ?}

          注意,ring buffer 的 size(固定是 16 個)是不變的,變的是 head 和 tail 而已。

          總的來說ReadBuffer有如下特點(diǎn):

          • 使用 Striped-RingBuffer來提升對 buffer 的讀寫
          • 用 thread 的 hash 來避開熱點(diǎn) key 的競爭
          • 允許寫入的丟失

          WriteBuffer

          writeBufferreadBuffer不一樣,主要體現(xiàn)在使用場景的不一樣。本來緩存的一般場景是讀多寫少的,讀的并發(fā)會更高,且 afterRead 顯得沒那么重要,允許延遲甚至丟失。寫不一樣,寫afterWrite不允許丟失,且要求盡量馬上執(zhí)行。Caffeine 使用MPSC(Multiple Producer / Single Consumer)作為 buffer 數(shù)組,實(shí)現(xiàn)在MpscGrowableArrayQueue類,它是仿照JCToolsMpscGrowableArrayQueue來寫的。

          MPSC 允許無鎖的高并發(fā)寫入,但只允許一個消費(fèi)者,同時也犧牲了部分操作。

          MPSC 我打算另外分析,這里不展開了。

          TimerWheel

          除了支持expireAfterAccessexpireAfterWrite之外(Guava Cache 也支持這兩個特性),Caffeine 還支持expireAfter。因?yàn)?/span>expireAfterAccessexpireAfterWrite都只能是固定的過期時間,這可能滿足不了某些場景,譬如記錄的過期時間是需要根據(jù)某些條件而不一樣的,這就需要用戶自定義過期時間。

          先看看expireAfter的用法

          private?static?LoadingCache?cache?=?Caffeine.newBuilder()
          ????????.maximumSize(256L)
          ????????.initialCapacity(1)
          ????????//.expireAfterAccess(2,?TimeUnit.DAYS)
          ????????//.expireAfterWrite(2,?TimeUnit.HOURS)
          ????????.refreshAfterWrite(1,?TimeUnit.HOURS)
          ????????//自定義過期時間
          ????????.expireAfter(new?Expiry()?{
          ????????????//返回創(chuàng)建后的過期時間
          ????????????@Override
          ????????????public?long?expireAfterCreate(@NonNull?String?key,?@NonNull?String?value,?long?currentTime)?{
          ????????????????return?0;
          ????????????}

          ????????????//返回更新后的過期時間
          ????????????@Override
          ????????????public?long?expireAfterUpdate(@NonNull?String?key,?@NonNull?String?value,?long?currentTime,?@NonNegative?long?currentDuration)?{
          ????????????????return?0;
          ????????????}

          ????????????//返回讀取后的過期時間
          ????????????@Override
          ????????????public?long?expireAfterRead(@NonNull?String?key,?@NonNull?String?value,?long?currentTime,?@NonNegative?long?currentDuration)?{
          ????????????????return?0;
          ????????????}
          ????????})
          ????????.recordStats()
          ????????.build(new?CacheLoader()?{
          ????????????@Nullable
          ????????????@Override
          ????????????public?String?load(@NonNull?String?key)?throws?Exception?{
          ????????????????return?"value_"?+?key;
          ????????????}
          ????????});

          通過自定義過期時間,使得不同的 key 可以動態(tài)的得到不同的過期時間。

          注意,我把expireAfterAccessexpireAfterWrite注釋了,因?yàn)檫@兩個特性不能跟expireAfter一起使用。

          而當(dāng)使用了expireAfter特性后,Caffeine 會啟用一種叫“時間輪”的算法來實(shí)現(xiàn)這個功能。更多關(guān)于時間輪的介紹,可以看我的文章HashedWheelTimer 時間輪原理分析[6]。

          好,重點(diǎn)來了,為什么要用時間輪?

          expireAfterAccessexpireAfterWrite的實(shí)現(xiàn)是用一個AccessOrderDeque雙端隊(duì)列,它是 FIFO 的,因?yàn)樗鼈兊倪^期時間是固定的,所以在隊(duì)列頭的數(shù)據(jù)肯定是最早過期的,要處理過期數(shù)據(jù)時,只需要首先看看頭部是否過期,然后再挨個檢查就可以了。但是,如果過期時間不一樣的話,這需要對accessOrderQueue進(jìn)行排序&插入,這個代價太大了。于是,Caffeine 用了一種更加高效、優(yōu)雅的算法-時間輪。

          時間輪的結(jié)構(gòu):

          因?yàn)樵谖业膶r間輪分析的文章里已經(jīng)說了時間輪的原理和機(jī)制了,所以我就不展開 Caffeine 對時間輪的實(shí)現(xiàn)了。

          Caffeine 對時間輪的實(shí)現(xiàn)在TimerWheel,它是一種多層時間輪(hierarchical timing wheels )。

          看看元素加入到時間輪的schedule方法:

          /**
          ?*?Schedules?a?timer?event?for?the?node.
          ?*
          ?*?@param?node?the?entry?in?the?cache
          ?*/

          public?void?schedule(@NonNull?Node?node)?{
          ??Node?sentinel?=?findBucket(node.getVariableTime());
          ??link(sentinel,?node);
          }

          /**
          ?*?Determines?the?bucket?that?the?timer?event?should?be?added?to.
          ?*
          ?*?@param?time?the?time?when?the?event?fires
          ?*?@return?the?sentinel?at?the?head?of?the?bucket
          ?*/

          Node?findBucket(long?time)?{
          ??long?duration?=?time?-?nanos;
          ??int?length?=?wheel.length?-?1;
          ??for?(int?i?=?0;?i?????if?(duration?1])?{
          ??????long?ticks?=?(time?>>>?SHIFT[i]);
          ??????int?index?=?(int)?(ticks?&?(wheel[i].length?-?1));
          ??????return?wheel[i][index];
          ????}
          ??}
          ??return?wheel[length][0];
          }

          /**?Adds?the?entry?at?the?tail?of?the?bucket's?list.?*/
          void?link(Node?sentinel,?Node?node)?{
          ??node.setPreviousInVariableOrder(sentinel.getPreviousInVariableOrder());
          ??node.setNextInVariableOrder(sentinel);

          ??sentinel.getPreviousInVariableOrder().setNextInVariableOrder(node);
          ??sentinel.setPreviousInVariableOrder(node);
          }

          其他

          Caffeine 還有其他的優(yōu)化性能的手段,如使用軟引用和弱引用、消除偽共享、CompletableFuture異步等等。

          總結(jié)

          Caffeien 是一個優(yōu)秀的本地緩存,通過使用 W-TinyLFU 算法, 高性能的 readBuffer 和 WriteBuffer,時間輪算法等,使得它擁有高性能,高命中率(near optimal),低內(nèi)存占用等特點(diǎn)。

          推薦閱讀

          分享基于 Spring Cloud +OAuth2 的權(quán)限管理系統(tǒng)

          鏈家程序員刪公司9TB 數(shù)據(jù) 被判7年

          工作10年后,再看String s = new String("xyz") 創(chuàng)建了幾個對象?

          SpringBoot集成WebSocket,實(shí)現(xiàn)后臺向前端推送信息

          SpringBoot 配置 ELK 環(huán)境

          給代碼寫注釋時有哪些講究?

          程序員該如何把 Windows 系統(tǒng)打造的跟 Mac 一樣牛逼?

          基于 SpringBoot,來實(shí)現(xiàn)MySQL讀寫分離技術(shù)

          瀏覽 21
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  操片免费 | 2020天天日天天干 | 国产精品色在线 | 爱情岛论坛成人自拍 | 伊人婷婷色激情 |