<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          深入源碼分析,緩存之王 Caffeine 為何這么猛?

          共 12386字,需瀏覽 25分鐘

           ·

          2020-12-27 22:27


          點擊上方?泥瓦匠 關(guān)注我!

          老家浙江東海邊,靠海吃海,目前經(jīng)營一個小品牌,讓普通人吃到最新鮮的海鮮。有興趣可以點擊了解:《浙里有漁,鮮人一步!》???

          Caffeine[1]是一個高性能,高命中率,低內(nèi)存占用,near optimal 的本地緩存,簡單來說它是 Guava Cache 的優(yōu)化加強版,有些文章把 Caffeine 稱為“新一代的緩存”、“現(xiàn)代緩存之王”。

          本文將重點講解 Caffeine 的高性能設(shè)計,以及對應(yīng)部分的源碼分析。

          與 Guava Cache 比較

          如果你對 Guava Cache 還不理解的話,可以點擊這里[2]來看一下我之前寫過關(guān)于 Guava Cache 的文章。

          大家都知道,Spring5 即將放棄掉 Guava Cache 作為緩存機制,而改用 Caffeine 作為新的本地 Cache 的組件,這對于 Caffeine 來說是一個很大的肯定。為什么 Spring 會這樣做呢?其實在 Caffeine 的Benchmarks[3]里給出了好靚仔的數(shù)據(jù),對讀和寫的場景,還有跟其他幾個緩存工具進行了比較,Caffeine 的性能都表現(xiàn)很突出。

          使用 Caffeine

          Caffeine 為了方便大家使用以及從 Guava Cache 切換過來(很有針對性啊~),借鑒了 Guava Cache 大部分的概念(諸如核心概念Cache、LoadingCache、CacheLoader、CacheBuilder等等),對于 Caffeine 的理解只要把它當(dāng)作 Guava Cache 就可以了。

          使用上,大家只要把 Caffeine 的包引進來,然后換一下 cache 的實現(xiàn)類,基本應(yīng)該就沒問題了。這對與已經(jīng)使用過 Guava Cache 的同學(xué)來說沒有任何難度,甚至還有一點熟悉的味道,如果你之前沒有使用過 Guava Cache,可以查看 Caffeine 的官方 API 說明文檔[4],其中Population,EvictionRemoval,RefreshStatistics,Cleanup,Policy等等這些特性都是跟 Guava Cache 基本一樣的。

          下面給出一個例子說明怎樣創(chuàng)建一個 Cache:

          private?static?LoadingCache?cache?=?Caffeine.newBuilder()
          ????????????//最大個數(shù)限制
          ????????????.maximumSize(256L)
          ????????????//初始化容量
          ????????????.initialCapacity(1)
          ????????????//訪問后過期(包括讀和寫)
          ????????????.expireAfterAccess(2,?TimeUnit.DAYS)
          ????????????//寫后過期
          ????????????.expireAfterWrite(2,?TimeUnit.HOURS)
          ????????????//寫后自動異步刷新
          ????????????.refreshAfterWrite(1,?TimeUnit.HOURS)
          ????????????//記錄下緩存的一些統(tǒng)計數(shù)據(jù),例如命中率等
          ????????????.recordStats()
          ????????????//cache對緩存寫的通知回調(diào)
          ????????????.writer(new?CacheWriter()?{
          ????????????????@Override
          ????????????????public?void?write(@NonNull?Object?key,?@NonNull?Object?value)?{
          ????????????????????log.info("key={},?CacheWriter?write",?key);
          ????????????????}

          ????????????????@Override
          ????????????????public?void?delete(@NonNull?Object?key,?@Nullable?Object?value,?@NonNull?RemovalCause?cause)?{
          ????????????????????log.info("key={},?cause={},?CacheWriter?delete",?key,?cause);
          ????????????????}
          ????????????})
          ????????????//使用CacheLoader創(chuàng)建一個LoadingCache
          ????????????.build(new?CacheLoader()?{
          ????????????????//同步加載數(shù)據(jù)
          ????????????????@Nullable
          ????????????????@Override
          ????????????????public?String?load(@NonNull?String?key)?throws?Exception?{
          ????????????????????return?"value_"?+?key;
          ????????????????}

          ????????????????//異步加載數(shù)據(jù)
          ????????????????@Nullable
          ????????????????@Override
          ????????????????public?String?reload(@NonNull?String?key,?@NonNull?String?oldValue)?throws?Exception?{
          ????????????????????return?"value_"?+?key;
          ????????????????}
          ????????????});

          更多從 Guava Cache 遷移過來的使用說明,請看這里[5]

          Caffeine 的高性能設(shè)計

          判斷一個緩存的好壞最核心的指標(biāo)就是命中率,影響緩存命中率有很多因素,包括業(yè)務(wù)場景、淘汰策略、清理策略、緩存容量等等。如果作為本地緩存, 它的性能的情況,資源的占用也都是一個很重要的指標(biāo)。下面

          我們來看看 Caffeine 在這幾個方面是怎么著手的,如何做優(yōu)化的。

          (注:本文不會分析 Caffeine 全部源碼,只會對核心設(shè)計的實現(xiàn)進行分析,但我建議讀者把 Caffeine 的源碼都涉獵一下,有個 overview 才能更好理解本文。如果你看過 Guava Cache 的源碼也行,代碼的數(shù)據(jù)結(jié)構(gòu)和處理邏輯很類似的。

          源碼基于:caffeine-2.8.0.jar)

          W-TinyLFU 整體設(shè)計

          上面說到淘汰策略是影響緩存命中率的因素之一,一般比較簡單的緩存就會直接用到?LFU(Least Frequently Used,即最不經(jīng)常使用)?或者LRU(Least Recently Used,即最近最少使用)?,而 Caffeine 就是使用了?W-TinyLFU?算法。

          W-TinyLFU 看名字就能大概猜出來,它是 LFU 的變種,也是一種緩存淘汰算法。那為什么要使用 W-TinyLFU 呢?

          LRU 和 LFU 的缺點

          • LRU 實現(xiàn)簡單,在一般情況下能夠表現(xiàn)出很好的命中率,是一個“性價比”很高的算法,平時也很常用。雖然 LRU 對突發(fā)性的稀疏流量(sparse bursts)表現(xiàn)很好,但同時也會產(chǎn)生緩存污染,舉例來說,如果偶然性的要對全量數(shù)據(jù)進行遍歷,那么“歷史訪問記錄”就會被刷走,造成污染。
          • 如果數(shù)據(jù)的分布在一段時間內(nèi)是固定的話,那么 LFU 可以達到最高的命中率。但是 LFU 有兩個缺點,第一,它需要給每個記錄項維護頻率信息,每次訪問都需要更新,這是個巨大的開銷;第二,對突發(fā)性的稀疏流量無力,因為前期經(jīng)常訪問的記錄已經(jīng)占用了緩存,偶然的流量不太可能會被保留下來,而且過去的一些大量被訪問的記錄在將來也不一定會使用上,這樣就一直把“坑”占著了。

          無論 LRU 還是 LFU 都有其各自的缺點,不過,現(xiàn)在已經(jīng)有很多針對其缺點而改良、優(yōu)化出來的變種算法。

          TinyLFU

          TinyLFU 就是其中一個優(yōu)化算法,它是專門為了解決 LFU 上述提到的兩個問題而被設(shè)計出來的。

          解決第一個問題是采用了 Count–Min Sketch 算法。

          解決第二個問題是讓記錄盡量保持相對的“新鮮”(Freshness Mechanism),并且當(dāng)有新的記錄插入時,可以讓它跟老的記錄進行“PK”,輸者就會被淘汰,這樣一些老的、不再需要的記錄就會被剔除。

          下圖是 TinyLFU 設(shè)計圖(來自官方)

          統(tǒng)計頻率 Count–Min Sketch 算法

          如何對一個 key 進行統(tǒng)計,但又可以節(jié)省空間呢?(不是簡單的使用HashMap,這太消耗內(nèi)存了),注意哦,不需要精確的統(tǒng)計,只需要一個近似值就可以了,怎么樣,這樣場景是不是很熟悉,如果你是老司機,或許已經(jīng)聯(lián)想到布隆過濾器(Bloom Filter)的應(yīng)用了。

          沒錯,將要介紹的 Count–Min Sketch 的原理跟 Bloom Filter 一樣,只不過 Bloom Filter 只有 0 和 1 的值,那么你可以把 Count–Min Sketch 看作是“數(shù)值”版的 Bloom Filter。

          更多關(guān)于 Count–Min Sketch 的介紹請自行搜索。

          在 TinyLFU 中,近似頻率的統(tǒng)計如下圖所示:

          對一個 key 進行多次 hash 函數(shù)后,index 到多個數(shù)組位置后進行累加,查詢時取多個值中的最小值即可。

          Caffeine 對這個算法的實現(xiàn)在FrequencySketch類。但 Caffeine 對此有進一步的優(yōu)化,例如 Count–Min Sketch 使用了二維數(shù)組,Caffeine 只是用了一個一維的數(shù)組;再者,如果是數(shù)值類型的話,這個數(shù)需要用 int 或 long 來存儲,但是 Caffeine 認為緩存的訪問頻率不需要用到那么大,只需要 15 就足夠,一般認為達到 15 次的頻率算是很高的了,而且 Caffeine 還有另外一個機制來使得這個頻率進行衰退減半(下面就會講到)。如果最大是 15 的話,那么只需要 4 個 bit 就可以滿足了,一個 long 有 64bit,可以存儲 16 個這樣的統(tǒng)計數(shù),Caffeine 就是這樣的設(shè)計,使得存儲效率提高了 16 倍。

          Caffeine 對緩存的讀寫(afterReadafterWrite方法)都會調(diào)用onAccesss 方法,而onAccess方法里有一句:

          frequencySketch().increment(key);

          這句就是追加記錄的頻率,下面我們看看具體實現(xiàn)

          //FrequencySketch的一些屬性

          //種子數(shù)
          static?final?long[]?SEED?=?{?//?A?mixture?of?seeds?from?FNV-1a,?CityHash,?and?Murmur3
          ????0xc3a5c85c97cb3127L,?0xb492b66fbe98f273L,?0x9ae16a3b2f90404fL,?0xcbf29ce484222325L};
          static?final?long?RESET_MASK?=?0x7777777777777777L;
          static?final?long?ONE_MASK?=?0x1111111111111111L;

          int?sampleSize;
          //為了快速根據(jù)hash值得到table的index值的掩碼
          //table的長度size一般為2的n次方,而tableMask為size-1,這樣就可以通過&操作來模擬取余操作,速度快很多,老司機都知道
          int?tableMask;
          //存儲數(shù)據(jù)的一維long數(shù)組
          long[]?table;
          int?size;

          /**
          ?*?Increments?the?popularity?of?the?element?if?it?does?not?exceed?the?maximum?(15).?The?popularity
          ?*?of?all?elements?will?be?periodically?down?sampled?when?the?observed?events?exceeds?a?threshold.
          ?*?This?process?provides?a?frequency?aging?to?allow?expired?long?term?entries?to?fade?away.
          ?*
          ?*?@param?e?the?element?to?add
          ?*/

          public?void?increment(@NonNull?E?e)?{
          ??if?(isNotInitialized())?{
          ????return;
          ??}

          ??//根據(jù)key的hashCode通過一個哈希函數(shù)得到一個hash值
          ??//本來就是hashCode了,為什么還要再做一次hash?怕原來的hashCode不夠均勻分散,再打散一下。
          ??int?hash?=?spread(e.hashCode());
          ??//這句光看有點難理解
          ??//就如我剛才說的,Caffeine把一個long的64bit劃分成16個等分,每一等分4個bit。
          ??//這個start就是用來定位到是哪一個等分的,用hash值低兩位作為隨機數(shù),再左移2位,得到一個小于16的值
          ??int?start?=?(hash?&?3)?<2;

          ??//indexOf方法的意思就是,根據(jù)hash值和不同種子得到table的下標(biāo)index
          ??//這里通過四個不同的種子,得到四個不同的下標(biāo)index
          ??int?index0?=?indexOf(hash,?0);
          ??int?index1?=?indexOf(hash,?1);
          ??int?index2?=?indexOf(hash,?2);
          ??int?index3?=?indexOf(hash,?3);

          ??//根據(jù)index和start(+1,?+2,?+3)的值,把table[index]對應(yīng)的等分追加1
          ??//這個incrementAt方法有點難理解,看我下面的解釋
          ??boolean?added?=?incrementAt(index0,?start);
          ??added?|=?incrementAt(index1,?start?+?1);
          ??added?|=?incrementAt(index2,?start?+?2);
          ??added?|=?incrementAt(index3,?start?+?3);

          ??//這個reset等下說
          ??if?(added?&&?(++size?==?sampleSize))?{
          ????reset();
          ??}
          }

          /**
          ?*?Increments?the?specified?counter?by?1?if?it?is?not?already?at?the?maximum?value?(15).
          ?*
          ?*?@param?i?the?table?index?(16?counters)
          ?*?@param?j?the?counter?to?increment
          ?*?@return?if?incremented
          ?*/

          boolean?incrementAt(int?i,?int?j)?{
          ??//這個j表示16個等分的下標(biāo),那么offset就是相當(dāng)于在64位中的下標(biāo)(這個自己想想)
          ??int?offset?=?j?<2;
          ??//上面提到Caffeine把頻率統(tǒng)計最大定為15,即0xfL
          ??//mask就是在64位中的掩碼,即1111后面跟很多個0
          ??long?mask?=?(0xfL?<??//如果&的結(jié)果不等于15,那么就追加1。等于15就不會再加了
          ??if?((table[i]?&?mask)?!=?mask)?{
          ????table[i]?+=?(1L?<????return?true;
          ??}
          ??return?false;
          }

          /**
          ?*?Returns?the?table?index?for?the?counter?at?the?specified?depth.
          ?*
          ?*?@param?item?the?element's?hash
          ?*?@param?i?the?counter?depth
          ?*?@return?the?table?index
          ?*/

          int?indexOf(int?item,?int?i)?{
          ??long?hash?=?SEED[i]?*?item;
          ??hash?+=?hash?>>>?32;
          ??return?((int)?hash)?&?tableMask;
          }

          /**
          ?*?Applies?a?supplemental?hash?function?to?a?given?hashCode,?which?defends?against?poor?quality
          ?*?hash?functions.
          ?*/

          int?spread(int?x)?{
          ??x?=?((x?>>>?16)?^?x)?*?0x45d9f3b;
          ??x?=?((x?>>>?16)?^?x)?*?0x45d9f3b;
          ??return?(x?>>>?16)?^?x;
          }

          知道了追加方法,那么讀取方法frequency就很容易理解了。

          /**
          ?*?Returns?the?estimated?number?of?occurrences?of?an?element,?up?to?the?maximum?(15).
          ?*
          ?*?@param?e?the?element?to?count?occurrences?of
          ?*?@return?the?estimated?number?of?occurrences?of?the?element;?possibly?zero?but?never?negative
          ?*/

          @NonNegative
          public?int?frequency(@NonNull?E?e)?{
          ??if?(isNotInitialized())?{
          ????return?0;
          ??}

          ??//得到hash值,跟上面一樣
          ??int?hash?=?spread(e.hashCode());
          ??//得到等分的下標(biāo),跟上面一樣
          ??int?start?=?(hash?&?3)?<2;
          ??int?frequency?=?Integer.MAX_VALUE;
          ??//循環(huán)四次,分別獲取在table數(shù)組中不同的下標(biāo)位置
          ??for?(int?i?=?0;?i?4;?i++)?{
          ????int?index?=?indexOf(hash,?i);
          ????//這個操作就不多說了,其實跟上面incrementAt是一樣的,定位到table[index]?+?等分的位置,再根據(jù)mask取出計數(shù)值
          ????int?count?=?(int)?((table[index]?>>>?((start?+?i)?<2))?&?0xfL);
          ????//取四個中的較小值
          ????frequency?=?Math.min(frequency,?count);
          ??}
          ??return?frequency;
          }

          通過代碼和注釋或者讀者可能難以理解,下圖是我畫出來幫助大家理解的結(jié)構(gòu)圖。

          注意紫色虛線框,其中藍色小格就是需要計算的位置:


          保新機制

          為了讓緩存保持“新鮮”,剔除掉過往頻率很高但之后不經(jīng)常的緩存,Caffeine 有一個 Freshness Mechanism。做法很簡答,就是當(dāng)整體的統(tǒng)計計數(shù)(當(dāng)前所有記錄的頻率統(tǒng)計之和,這個數(shù)值內(nèi)部維護)達到某一個值時,那么所有記錄的頻率統(tǒng)計除以 2。

          從上面的代碼

          //size變量就是所有記錄的頻率統(tǒng)計之,即每個記錄加1,這個size都會加1
          //sampleSize一個閾值,從FrequencySketch初始化可以看到它的值為maximumSize的10倍
          if?(added?&&?(++size?==?sampleSize))?{
          ??????reset();
          }

          看到reset方法就是做這個事情

          /**?Reduces?every?counter?by?half?of?its?original?value.?*/
          void?reset()?{
          ??int?count?=?0;
          ??for?(int?i?=?0;?i?????count?+=?Long.bitCount(table[i]?&?ONE_MASK);
          ????table[i]?=?(table[i]?>>>?1)?&?RESET_MASK;
          ??}
          ??size?=?(size?>>>?1)?-?(count?>>>?2);
          }

          關(guān)于這個 reset 方法,為什么是除以 2,而不是其他,及其正確性,在最下面的參考資料的 TinyLFU 論文中 3.3 章節(jié)給出了數(shù)學(xué)證明,大家有興趣可以看看。

          增加一個 Window?

          Caffeine 通過測試發(fā)現(xiàn) TinyLFU 在面對突發(fā)性的稀疏流量(sparse bursts)時表現(xiàn)很差,因為新的記錄(new items)還沒來得及建立足夠的頻率就被剔除出去了,這就使得命中率下降。

          于是 Caffeine 設(shè)計出一種新的 policy,即?Window Tiny LFU(W-TinyLFU),并通過實驗和實踐發(fā)現(xiàn) W-TinyLFU 比 TinyLFU 表現(xiàn)的更好。

          W-TinyLFU 的設(shè)計如下所示(兩圖等價):

          它主要包括兩個緩存模塊,主緩存是 SLRU(Segmented LRU,即分段 LRU),SLRU 包括一個名為 protected 和一個名為 probation 的緩存區(qū)。通過增加一個緩存區(qū)(即 Window Cache),當(dāng)有新的記錄插入時,會先在 window 區(qū)呆一下,就可以避免上述說的 sparse bursts 問題。

          淘汰策略(eviction policy)

          當(dāng) window 區(qū)滿了,就會根據(jù) LRU 把 candidate(即淘汰出來的元素)放到 probation 區(qū),如果 probation 區(qū)也滿了,就把 candidate 和 probation 將要淘汰的元素 victim,兩個進行“PK”,勝者留在 probation,輸者就要被淘汰了。

          而且經(jīng)過實驗發(fā)現(xiàn)當(dāng) window 區(qū)配置為總?cè)萘康?1%,剩余的 99%當(dāng)中的 80%分給 protected 區(qū),20%分給 probation 區(qū)時,這時整體性能和命中率表現(xiàn)得最好,所以 Caffeine 默認的比例設(shè)置就是這個。

          不過這個比例 Caffeine 會在運行時根據(jù)統(tǒng)計數(shù)據(jù)(statistics)去動態(tài)調(diào)整,如果你的應(yīng)用程序的緩存隨著時間變化比較快的話,那么增加 window 區(qū)的比例可以提高命中率,相反緩存都是比較固定不變的話,增加 Main Cache 區(qū)(protected 區(qū) +probation 區(qū))的比例會有較好的效果。

          下面我們看看上面說到的淘汰策略是怎么實現(xiàn)的:

          一般緩存對讀寫操作后都有后續(xù)的一系列“維護”操作,Caffeine 也不例外,這些操作都在maintenance方法,我們將要說到的淘汰策略也在里面。

          這方法比較重要,下面也會提到,所以這里只先說跟“淘汰策略”有關(guān)的evictEntriesclimb。

          /**
          ???*?Performs?the?pending?maintenance?work?and?sets?the?state?flags?during?processing?to?avoid
          ???*?excess?scheduling?attempts.?The?read?buffer,?write?buffer,?and?reference?queues?are
          ???*?drained,?followed?by?expiration,?and?size-based?eviction.
          ???*
          ???*?@param?task?an?additional?pending?task?to?run,?or?{@code?null}?if?not?present
          ???*/

          ??@GuardedBy("evictionLock")
          ??void?maintenance(@Nullable?Runnable?task)?{
          ????lazySetDrainStatus(PROCESSING_TO_IDLE);

          ????try?{
          ??????drainReadBuffer();

          ??????drainWriteBuffer();
          ??????if?(task?!=?null)?{
          ????????task.run();
          ??????}

          ??????drainKeyReferences();
          ??????drainValueReferences();

          ??????expireEntries();
          ??????//把符合條件的記錄淘汰掉
          ??????evictEntries();
          ??????//動態(tài)調(diào)整window區(qū)和protected區(qū)的大小
          ??????climb();
          ????}?finally?{
          ??????if?((drainStatus()?!=?PROCESSING_TO_IDLE)?||?!casDrainStatus(PROCESSING_TO_IDLE,?IDLE))?{
          ????????lazySetDrainStatus(REQUIRED);
          ??????}
          ????}
          ??}
          先說一下 Caffeine 對上面說到的 W-TinyLFU 策略的實現(xiàn)用到的數(shù)據(jù)結(jié)構(gòu):
          //最大的個數(shù)限制
          long?maximum;
          //當(dāng)前的個數(shù)
          long?weightedSize;
          //window區(qū)的最大限制
          long?windowMaximum;
          //window區(qū)當(dāng)前的個數(shù)
          long?windowWeightedSize;
          //protected區(qū)的最大限制
          long?mainProtectedMaximum;
          //protected區(qū)當(dāng)前的個數(shù)
          long?mainProtectedWeightedSize;
          //下一次需要調(diào)整的大?。ㄟ€需要進一步計算)
          double?stepSize;
          //window區(qū)需要調(diào)整的大小
          long?adjustment;
          //命中計數(shù)
          int?hitsInSample;
          //不命中的計數(shù)
          int?missesInSample;
          //上一次的緩存命中率
          double?previousSampleHitRate;

          final?FrequencySketch?sketch;
          //window區(qū)的LRU?queue(FIFO)
          final?AccessOrderDeque>?accessOrderWindowDeque;
          //probation區(qū)的LRU?queue(FIFO)
          final?AccessOrderDeque>?accessOrderProbationDeque;
          //protected區(qū)的LRU?queue(FIFO)
          final?AccessOrderDeque>?accessOrderProtectedDeque;

          以及默認比例設(shè)置(意思看注釋)

          /**?The?initial?percent?of?the?maximum?weighted?capacity?dedicated?to?the?main?space.?*/
          static?final?double?PERCENT_MAIN?=?0.99d;
          /**?The?percent?of?the?maximum?weighted?capacity?dedicated?to?the?main's?protected?space.?*/
          static?final?double?PERCENT_MAIN_PROTECTED?=?0.80d;
          /**?The?difference?in?hit?rates?that?restarts?the?climber.?*/
          static?final?double?HILL_CLIMBER_RESTART_THRESHOLD?=?0.05d;
          /**?The?percent?of?the?total?size?to?adapt?the?window?by.?*/
          static?final?double?HILL_CLIMBER_STEP_PERCENT?=?0.0625d;
          /**?The?rate?to?decrease?the?step?size?to?adapt?by.?*/
          static?final?double?HILL_CLIMBER_STEP_DECAY_RATE?=?0.98d;
          /**?The?maximum?number?of?entries?that?can?be?transfered?between?queues.?*/

          重點來了,evictEntriesclimb方法:

          /**?Evicts?entries?if?the?cache?exceeds?the?maximum.?*/
          @GuardedBy("evictionLock")
          void?evictEntries()?{
          ??if?(!evicts())?{
          ????return;
          ??}
          ??//淘汰window區(qū)的記錄
          ??int?candidates?=?evictFromWindow();
          ??//淘汰Main區(qū)的記錄
          ??evictFromMain(candidates);
          }

          /**
          ?*?Evicts?entries?from?the?window?space?into?the?main?space?while?the?window?size?exceeds?a
          ?*?maximum.
          ?*
          ?*?@return?the?number?of?candidate?entries?evicted?from?the?window?space
          ?*/

          //根據(jù)W-TinyLFU,新的數(shù)據(jù)都會無條件的加到admission?window
          //但是window是有大小限制,所以要“定期”做一下“維護”
          @GuardedBy("evictionLock")
          int?evictFromWindow()?{
          ??int?candidates?=?0;
          ??//查看window?queue的頭部節(jié)點
          ??Node?node?=?accessOrderWindowDeque().peek();
          ??//如果window區(qū)超過了最大的限制,那么就要把“多出來”的記錄做處理
          ??while?(windowWeightedSize()?>?windowMaximum())?{
          ????//?The?pending?operations?will?adjust?the?size?to?reflect?the?correct?weight
          ????if?(node?==?null)?{
          ??????break;
          ????}
          ????//下一個節(jié)點
          ????Node?next?=?node.getNextInAccessOrder();
          ????if?(node.getWeight()?!=?0)?{
          ??????//把node定位在probation區(qū)
          ??????node.makeMainProbation();
          ??????//從window區(qū)去掉
          ??????accessOrderWindowDeque().remove(node);
          ??????//加入到probation?queue,相當(dāng)于把節(jié)點移動到probation區(qū)(晉升了)
          ??????accessOrderProbationDeque().add(node);
          ??????candidates++;
          ??????//因為移除了一個節(jié)點,所以需要調(diào)整window的size
          ??????setWindowWeightedSize(windowWeightedSize()?-?node.getPolicyWeight());
          ????}
          ????//處理下一個節(jié)點
          ????node?=?next;
          ??}

          ??return?candidates;
          }

          evictFromMain方法:

          /**
          ?*?Evicts?entries?from?the?main?space?if?the?cache?exceeds?the?maximum?capacity.?The?main?space
          ?*?determines?whether?admitting?an?entry?(coming?from?the?window?space)?is?preferable?to?retaining
          ?*?the?eviction?policy's?victim.?This?is?decision?is?made?using?a?frequency?filter?so?that?the
          ?*?least?frequently?used?entry?is?removed.
          ?*
          ?*?The?window?space?candidates?were?previously?placed?in?the?MRU?position?and?the?eviction
          ?*?policy's?victim?is?at?the?LRU?position.?The?two?ends?of?the?queue?are?evaluated?while?an
          ?*?eviction?is?required.?The?number?of?remaining?candidates?is?provided?and?decremented?on
          ?*?eviction,?so?that?when?there?are?no?more?candidates?the?victim?is?evicted.
          ?*
          ?*?@param?candidates?the?number?of?candidate?entries?evicted?from?the?window?space
          ?*/

          //根據(jù)W-TinyLFU,從window晉升過來的要跟probation區(qū)的進行“PK”,勝者才能留下
          @GuardedBy("evictionLock")
          void?evictFromMain(int?candidates)?{
          ??int?victimQueue?=?PROBATION;
          ??//victim是probation?queue的頭部
          ??Node?victim?=?accessOrderProbationDeque().peekFirst();
          ??//candidate是probation?queue的尾部,也就是剛從window晉升來的
          ??Node?candidate?=?accessOrderProbationDeque().peekLast();
          ??//當(dāng)cache不夠容量時才做處理
          ??while?(weightedSize()?>?maximum())?{
          ????//?Stop?trying?to?evict?candidates?and?always?prefer?the?victim
          ????if?(candidates?==?0)?{
          ??????candidate?=?null;
          ????}

          ????//對candidate為null且victim為bull的處理
          ????if?((candidate?==?null)?&&?(victim?==?null))?{
          ??????if?(victimQueue?==?PROBATION)?{
          ????????victim?=?accessOrderProtectedDeque().peekFirst();
          ????????victimQueue?=?PROTECTED;
          ????????continue;
          ??????}?else?if?(victimQueue?==?PROTECTED)?{
          ????????victim?=?accessOrderWindowDeque().peekFirst();
          ????????victimQueue?=?WINDOW;
          ????????continue;
          ??????}

          ??????//?The?pending?operations?will?adjust?the?size?to?reflect?the?correct?weight
          ??????break;
          ????}

          ????//對節(jié)點的weight為0的處理
          ????if?((victim?!=?null)?&&?(victim.getPolicyWeight()?==?0))?{
          ??????victim?=?victim.getNextInAccessOrder();
          ??????continue;
          ????}?else?if?((candidate?!=?null)?&&?(candidate.getPolicyWeight()?==?0))?{
          ??????candidate?=?candidate.getPreviousInAccessOrder();
          ??????candidates--;
          ??????continue;
          ????}

          ????//?Evict?immediately?if?only?one?of?the?entries?is?present
          ????if?(victim?==?null)?{
          ??????@SuppressWarnings("NullAway")
          ??????Node?previous?=?candidate.getPreviousInAccessOrder();
          ??????Node?evict?=?candidate;
          ??????candidate?=?previous;
          ??????candidates--;
          ??????evictEntry(evict,?RemovalCause.SIZE,?0L);
          ??????continue;
          ????}?else?if?(candidate?==?null)?{
          ??????Node?evict?=?victim;
          ??????victim?=?victim.getNextInAccessOrder();
          ??????evictEntry(evict,?RemovalCause.SIZE,?0L);
          ??????continue;
          ????}

          ????//?Evict?immediately?if?an?entry?was?collected
          ????K?victimKey?=?victim.getKey();
          ????K?candidateKey?=?candidate.getKey();
          ????if?(victimKey?==?null)?{
          ??????@NonNull?Node?evict?=?victim;
          ??????victim?=?victim.getNextInAccessOrder();
          ??????evictEntry(evict,?RemovalCause.COLLECTED,?0L);
          ??????continue;
          ????}?else?if?(candidateKey?==?null)?{
          ??????candidates--;
          ??????@NonNull?Node?evict?=?candidate;
          ??????candidate?=?candidate.getPreviousInAccessOrder();
          ??????evictEntry(evict,?RemovalCause.COLLECTED,?0L);
          ??????continue;
          ????}

          ????//放不下的節(jié)點直接處理掉
          ????if?(candidate.getPolicyWeight()?>?maximum())?{
          ??????candidates--;
          ??????Node?evict?=?candidate;
          ??????candidate?=?candidate.getPreviousInAccessOrder();
          ??????evictEntry(evict,?RemovalCause.SIZE,?0L);
          ??????continue;
          ????}

          ????//根據(jù)節(jié)點的統(tǒng)計頻率frequency來做比較,看看要處理掉victim還是candidate
          ????//admit是具體的比較規(guī)則,看下面
          ????candidates--;
          ????//如果candidate勝出則淘汰victim
          ????if?(admit(candidateKey,?victimKey))?{
          ??????Node?evict?=?victim;
          ??????victim?=?victim.getNextInAccessOrder();
          ??????evictEntry(evict,?RemovalCause.SIZE,?0L);
          ??????candidate?=?candidate.getPreviousInAccessOrder();
          ????}?else?{
          ??????//如果是victim勝出,則淘汰candidate
          ??????Node?evict?=?candidate;
          ??????candidate?=?candidate.getPreviousInAccessOrder();
          ??????evictEntry(evict,?RemovalCause.SIZE,?0L);
          ????}
          ??}
          }

          /**
          ?*?Determines?if?the?candidate?should?be?accepted?into?the?main?space,?as?determined?by?its
          ?*?frequency?relative?to?the?victim.?A?small?amount?of?randomness?is?used?to?protect?against?hash
          ?*?collision?attacks,?where?the?victim's?frequency?is?artificially?raised?so?that?no?new?entries
          ?*?are?admitted.
          ?*
          ?*?@param?candidateKey?the?key?for?the?entry?being?proposed?for?long?term?retention
          ?*?@param?victimKey?the?key?for?the?entry?chosen?by?the?eviction?policy?for?replacement
          ?*?@return?if?the?candidate?should?be?admitted?and?the?victim?ejected
          ?*/

          @GuardedBy("evictionLock")
          boolean?admit(K?candidateKey,?K?victimKey)?{
          ??//分別獲取victim和candidate的統(tǒng)計頻率
          ??//frequency這個方法的原理和實現(xiàn)上面已經(jīng)解釋了
          ??int?victimFreq?=?frequencySketch().frequency(victimKey);
          ??int?candidateFreq?=?frequencySketch().frequency(candidateKey);
          ??//誰大誰贏
          ??if?(candidateFreq?>?victimFreq)?{
          ????return?true;

          ????//如果相等,candidate小于5都當(dāng)輸了
          ??}?else?if?(candidateFreq?<=?5)?{
          ????//?The?maximum?frequency?is?15?and?halved?to?7?after?a?reset?to?age?the?history.?An?attack
          ????//?exploits?that?a?hot?candidate?is?rejected?in?favor?of?a?hot?victim.?The?threshold?of?a?warm
          ????//?candidate?reduces?the?number?of?random?acceptances?to?minimize?the?impact?on?the?hit?rate.
          ????return?false;
          ??}
          ??//如果相等且candidate大于5,則隨機淘汰一個
          ??int?random?=?ThreadLocalRandom.current().nextInt();
          ??return?((random?&?127)?==?0);
          }

          climb方法主要是用來調(diào)整 window size 的,使得 Caffeine 可以適應(yīng)你的應(yīng)用類型(如 OLAP 或 OLTP)表現(xiàn)出最佳的命中率。

          下圖是官方測試的數(shù)據(jù):

          我們看看 window size 的調(diào)整是怎么實現(xiàn)的。

          調(diào)整時用到的默認比例數(shù)據(jù):

          //與上次命中率之差的閾值
          static?final?double?HILL_CLIMBER_RESTART_THRESHOLD?=?0.05d;
          //步長(調(diào)整)的大?。ǜ畲笾祄aximum的比例)
          static?final?double?HILL_CLIMBER_STEP_PERCENT?=?0.0625d;
          //步長的衰減比例
          static?final?double?HILL_CLIMBER_STEP_DECAY_RATE?=?0.98d;
          ??/**?Adapts?the?eviction?policy?to?towards?the?optimal?recency?/?frequency?configuration.?*/
          //climb方法的主要作用就是動態(tài)調(diào)整window區(qū)的大小(相應(yīng)的,main區(qū)的大小也會發(fā)生變化,兩個之和為100%)。
          //因為區(qū)域的大小發(fā)生了變化,那么區(qū)域內(nèi)的數(shù)據(jù)也可能需要發(fā)生相應(yīng)的移動。
          @GuardedBy("evictionLock")
          void?climb()?{
          ??if?(!evicts())?{
          ????return;
          ??}
          ??//確定window需要調(diào)整的大小
          ??determineAdjustment();
          ??//如果protected區(qū)有溢出,把溢出部分移動到probation區(qū)。因為下面的操作有可能需要調(diào)整到protected區(qū)。
          ??demoteFromMainProtected();
          ??long?amount?=?adjustment();
          ??if?(amount?==?0)?{
          ????return;
          ??}?else?if?(amount?>?0)?{
          ????//增加window的大小
          ????increaseWindow();
          ??}?else?{
          ????//減少window的大小
          ????decreaseWindow();
          ??}
          }

          下面分別展開每個方法來解釋:

          /**?Calculates?the?amount?to?adapt?the?window?by?and?sets?{@link?#adjustment()}?accordingly.?*/
          @GuardedBy("evictionLock")
          void?determineAdjustment()?{
          ??//如果frequencySketch還沒初始化,則返回
          ??if?(frequencySketch().isNotInitialized())?{
          ????setPreviousSampleHitRate(0.0);
          ????setMissesInSample(0);
          ????setHitsInSample(0);
          ????return;
          ??}
          ??//總請求量?=?命中?+?miss
          ??int?requestCount?=?hitsInSample()?+?missesInSample();
          ??//沒達到sampleSize則返回
          ??//默認下sampleSize = 10?* maximum。用sampleSize來判斷緩存是否足夠”熱“。
          ??if?(requestCount?????return;
          ??}

          ??//命中率的公式?=?命中?/?總請求
          ??double?hitRate?=?(double)?hitsInSample()?/?requestCount;
          ??//命中率的差值
          ??double?hitRateChange?=?hitRate?-?previousSampleHitRate();
          ??//本次調(diào)整的大小,是由命中率的差值和上次的stepSize決定的
          ??double?amount?=?(hitRateChange?>=?0)???stepSize()?:?-stepSize();
          ??//下次的調(diào)整大?。喝绻新实闹畲笥?.05,則重置為0.065 * maximum,否則按照0.98來進行衰減
          ??double?nextStepSize?=?(Math.abs(hitRateChange)?>=?HILL_CLIMBER_RESTART_THRESHOLD)
          ????????HILL_CLIMBER_STEP_PERCENT?*?maximum()?*?(amount?>=?0???1?:?-1)
          ??????:?HILL_CLIMBER_STEP_DECAY_RATE?*?amount;
          ??setPreviousSampleHitRate(hitRate);
          ??setAdjustment((long)?amount);
          ??setStepSize(nextStepSize);
          ??setMissesInSample(0);
          ??setHitsInSample(0);
          }

          /**?Transfers?the?nodes?from?the?protected?to?the?probation?region?if?it?exceeds?the?maximum.?*/

          //這個方法比較簡單,減少protected區(qū)溢出的部分
          @GuardedBy("evictionLock")
          void?demoteFromMainProtected()?{
          ??long?mainProtectedMaximum?=?mainProtectedMaximum();
          ??long?mainProtectedWeightedSize?=?mainProtectedWeightedSize();
          ??if?(mainProtectedWeightedSize?<=?mainProtectedMaximum)?{
          ????return;
          ??}

          ??for?(int?i?=?0;?i?????if?(mainProtectedWeightedSize?<=?mainProtectedMaximum)?{
          ??????break;
          ????}

          ????Node?demoted?=?accessOrderProtectedDeque().poll();
          ????if?(demoted?==?null)?{
          ??????break;
          ????}
          ????demoted.makeMainProbation();
          ????accessOrderProbationDeque().add(demoted);
          ????mainProtectedWeightedSize?-=?demoted.getPolicyWeight();
          ??}
          ??setMainProtectedWeightedSize(mainProtectedWeightedSize);
          }

          /**
          ?*?Increases?the?size?of?the?admission?window?by?shrinking?the?portion?allocated?to?the?main
          ?*?space.?As?the?main?space?is?partitioned?into?probation?and?protected?regions?(80%?/?20%),?for
          ?*?simplicity?only?the?protected?is?reduced.?If?the?regions?exceed?their?maximums,?this?may?cause
          ?*?protected?items?to?be?demoted?to?the?probation?region?and?probation?items?to?be?demoted?to?the
          ?*?admission?window.
          ?*/


          //增加window區(qū)的大小,這個方法比較簡單,思路就像我上面說的
          @GuardedBy("evictionLock")
          void?increaseWindow()?{
          ??if?(mainProtectedMaximum()?==?0)?{
          ????return;
          ??}

          ??long?quota?=?Math.min(adjustment(),?mainProtectedMaximum());
          ??setMainProtectedMaximum(mainProtectedMaximum()?-?quota);
          ??setWindowMaximum(windowMaximum()?+?quota);
          ??demoteFromMainProtected();

          ??for?(int?i?=?0;?i?????Node?candidate?=?accessOrderProbationDeque().peek();
          ????boolean?probation?=?true;
          ????if?((candidate?==?null)?||?(quota???????candidate?=?accessOrderProtectedDeque().peek();
          ??????probation?=?false;
          ????}
          ????if?(candidate?==?null)?{
          ??????break;
          ????}

          ????int?weight?=?candidate.getPolicyWeight();
          ????if?(quota???????break;
          ????}

          ????quota?-=?weight;
          ????if?(probation)?{
          ??????accessOrderProbationDeque().remove(candidate);
          ????}?else?{
          ??????setMainProtectedWeightedSize(mainProtectedWeightedSize()?-?weight);
          ??????accessOrderProtectedDeque().remove(candidate);
          ????}
          ????setWindowWeightedSize(windowWeightedSize()?+?weight);
          ????accessOrderWindowDeque().add(candidate);
          ????candidate.makeWindow();
          ??}

          ??setMainProtectedMaximum(mainProtectedMaximum()?+?quota);
          ??setWindowMaximum(windowMaximum()?-?quota);
          ??setAdjustment(quota);
          }

          /**?Decreases?the?size?of?the?admission?window?and?increases?the?main's?protected?region.?*/
          //同上increaseWindow差不多,反操作
          @GuardedBy("evictionLock")
          void?decreaseWindow()?{
          ??if?(windowMaximum()?<=?1)?{
          ????return;
          ??}

          ??long?quota?=?Math.min(-adjustment(),?Math.max(0,?windowMaximum()?-?1));
          ??setMainProtectedMaximum(mainProtectedMaximum()?+?quota);
          ??setWindowMaximum(windowMaximum()?-?quota);

          ??for?(int?i?=?0;?i?????Node?candidate?=?accessOrderWindowDeque().peek();
          ????if?(candidate?==?null)?{
          ??????break;
          ????}

          ????int?weight?=?candidate.getPolicyWeight();
          ????if?(quota???????break;
          ????}

          ????quota?-=?weight;
          ????setMainProtectedWeightedSize(mainProtectedWeightedSize()?+?weight);
          ????setWindowWeightedSize(windowWeightedSize()?-?weight);
          ????accessOrderWindowDeque().remove(candidate);
          ????accessOrderProbationDeque().add(candidate);
          ????candidate.makeMainProbation();
          ??}

          ??setMainProtectedMaximum(mainProtectedMaximum()?-?quota);
          ??setWindowMaximum(windowMaximum()?+?quota);
          ??setAdjustment(-quota);
          }

          以上,是 Caffeine 的 W-TinyLFU 策略的設(shè)計原理及代碼實現(xiàn)解析。

          異步的高性能讀寫

          一般的緩存每次對數(shù)據(jù)處理完之后(讀的話,已經(jīng)存在則直接返回,不存在則 load 數(shù)據(jù),保存,再返回;寫的話,則直接插入或更新),但是因為要維護一些淘汰策略,則需要一些額外的操作,諸如:

          • 計算和比較數(shù)據(jù)的是否過期
          • 統(tǒng)計頻率(像 LFU 或其變種)
          • 維護 read queue 和 write queue
          • 淘汰符合條件的數(shù)據(jù)
          • 等等。。。

          這種數(shù)據(jù)的讀寫伴隨著緩存狀態(tài)的變更,Guava Cache 的做法是把這些操作和讀寫操作放在一起,在一個同步加鎖的操作中完成,雖然 Guava Cache 巧妙地利用了 JDK 的 ConcurrentHashMap(分段鎖或者無鎖 CAS)來降低鎖的密度,達到提高并發(fā)度的目的。但是,對于一些熱點數(shù)據(jù),這種做法還是避免不了頻繁的鎖競爭。Caffeine 借鑒了數(shù)據(jù)庫系統(tǒng)的 WAL(Write-Ahead Logging)思想,即先寫日志再執(zhí)行操作,這種思想同樣適合緩存的,執(zhí)行讀寫操作時,先把操作記錄在緩沖區(qū),然后在合適的時機異步、批量地執(zhí)行緩沖區(qū)中的內(nèi)容。但在執(zhí)行緩沖區(qū)的內(nèi)容時,也是需要在緩沖區(qū)加上同步鎖的,不然存在并發(fā)問題,只不過這樣就可以把對鎖的競爭從緩存數(shù)據(jù)轉(zhuǎn)移到對緩沖區(qū)上。

          ReadBuffer

          在 Caffeine 的內(nèi)部實現(xiàn)中,為了很好的支持不同的 Features(如 Eviction,Removal,Refresh,Statistics,Cleanup,Policy 等等),擴展了很多子類,它們共同的父類是BoundedLocalCache,而readBuffer就是作為它們共有的屬性,即都是用一樣的 readBuffer,看定義:

          final?Buffer>?readBuffer;

          readBuffer?=?evicts()?||?collectKeys()?||?collectValues()?||?expiresAfterAccess()
          ??????????new?BoundedBuffer<>()
          ????????:?Buffer.disabled();

          上面提到 Caffeine 對每次緩存的讀操作都會觸發(fā)afterRead

          /**
          ?*?Performs?the?post-processing?work?required?after?a?read.
          ?*
          ?*?@param?node?the?entry?in?the?page?replacement?policy
          ?*?@param?now?the?current?time,?in?nanoseconds
          ?*?@param?recordHit?if?the?hit?count?should?be?incremented
          ?*/

          void?afterRead(Node?node,?long?now,?boolean?recordHit)?{
          ??if?(recordHit)?{
          ????statsCounter().recordHits(1);
          ??}
          ??//把記錄加入到readBuffer
          ??//判斷是否需要立即處理readBuffer
          ??//注意這里無論offer是否成功都可以走下去的,即允許寫入readBuffer丟失,因為這個
          ??boolean?delayable?=?skipReadBuffer()?||?(readBuffer.offer(node)?!=?Buffer.FULL);
          ??if?(shouldDrainBuffers(delayable))?{
          ????scheduleDrainBuffers();
          ??}
          ??refreshIfNeeded(node,?now);
          }

          ?/**
          ???*?Returns?whether?maintenance?work?is?needed.
          ???*
          ???*?@param?delayable?if?draining?the?read?buffer?can?be?delayed
          ???*/


          ??//caffeine用了一組狀態(tài)來定義和管理“維護”的過程
          ??boolean?shouldDrainBuffers(boolean?delayable)?{
          ????switch?(drainStatus())?{
          ??????case?IDLE:
          ????????return?!delayable;
          ??????case?REQUIRED:
          ????????return?true;
          ??????case?PROCESSING_TO_IDLE:
          ??????case?PROCESSING_TO_REQUIRED:
          ????????return?false;
          ??????default:
          ????????throw?new?IllegalStateException();
          ????}
          ??}

          重點看BoundedBuffer

          /**
          ?*?A?striped,?non-blocking,?bounded?buffer.
          ?*
          ?*?@author[email protected]?(Ben?Manes)
          ?*?@param??the?type?of?elements?maintained?by?this?buffer
          ?*/

          final?class?BoundedBuffer<E>?extends?StripedBuffer<E>

          它是一個 striped、非阻塞、有界限的 buffer,繼承于StripedBuffer類。下面看看StripedBuffer的實現(xiàn):

          /**
          ?*?A?base?class?providing?the?mechanics?for?supporting?dynamic?striping?of?bounded?buffers.?This
          ?*?implementation?is?an?adaption?of?the?numeric?64-bit?{@link?java.util.concurrent.atomic.Striped64}
          ?*?class,?which?is?used?by?atomic?counters.?The?approach?was?modified?to?lazily?grow?an?array?of
          ?*?buffers?in?order?to?minimize?memory?usage?for?caches?that?are?not?heavily?contended?on.
          ?*
          ?*?@author[email protected]?(Doug?Lea)
          ?*?@author[email protected]?(Ben?Manes)
          ?*/


          abstract?class?StripedBuffer<E>?implements?Buffer<E>

          這個StripedBuffer設(shè)計的思想是跟Striped64類似的,通過擴展結(jié)構(gòu)把競爭熱點分離。

          具體實現(xiàn)是這樣的,StripedBuffer維護一個Buffer[]數(shù)組,每個元素就是一個RingBuffer,每個線程用自己threadLocalRandomProbe屬性作為 hash 值,這樣就相當(dāng)于每個線程都有自己“專屬”的RingBuffer,就不會產(chǎn)生競爭啦,而不是用 key 的hashCode作為 hash 值,因為會產(chǎn)生熱點數(shù)據(jù)問題。

          看看StripedBuffer的屬性

          /**?Table?of?buffers.?When?non-null,?size?is?a?power?of?2.?*/
          //RingBuffer數(shù)組
          transient?volatile?Buffer?@Nullable[]?table;

          //當(dāng)進行resize時,需要整個table鎖住。tableBusy作為CAS的標(biāo)記。
          static?final?long?TABLE_BUSY?=?UnsafeAccess.objectFieldOffset(StripedBuffer.class,?"tableBusy");
          static?final?long?PROBE?=?UnsafeAccess.objectFieldOffset(Thread.class,?"threadLocalRandomProbe");

          /**?Number?of?CPUS.?*/
          static?final?int?NCPU?=?Runtime.getRuntime().availableProcessors();

          /**?The?bound?on?the?table?size.?*/
          //table最大size
          static?final?int?MAXIMUM_TABLE_SIZE?=?4?*?ceilingNextPowerOfTwo(NCPU);

          /**?The?maximum?number?of?attempts?when?trying?to?expand?the?table.?*/
          //如果發(fā)生競爭時(CAS失?。┑膰L試次數(shù)
          static?final?int?ATTEMPTS?=?3;

          /**?Table?of?buffers.?When?non-null,?size?is?a?power?of?2.?*/
          //核心數(shù)據(jù)結(jié)構(gòu)
          transient?volatile?Buffer?@Nullable[]?table;

          /**?Spinlock?(locked?via?CAS)?used?when?resizing?and/or?creating?Buffers.?*/
          transient?volatile?int?tableBusy;

          /**?CASes?the?tableBusy?field?from?0?to?1?to?acquire?lock.?*/
          final?boolean?casTableBusy()?{
          ??return?UnsafeAccess.UNSAFE.compareAndSwapInt(this,?TABLE_BUSY,?0,?1);
          }

          /**
          ?*?Returns?the?probe?value?for?the?current?thread.?Duplicated?from?ThreadLocalRandom?because?of
          ?*?packaging?restrictions.
          ?*/

          static?final?int?getProbe()?{
          ??return?UnsafeAccess.UNSAFE.getInt(Thread.currentThread(),?PROBE);
          }

          offer方法,當(dāng)沒初始化或存在競爭時,則擴容為 2 倍。

          實際是調(diào)用RingBuffer的 offer 方法,把數(shù)據(jù)追加到RingBuffer后面。

          @Override
          public?int?offer(E?e)?{
          ??int?mask;
          ??int?result?=?0;
          ??Buffer?buffer;
          ??//是否不存在競爭
          ??boolean?uncontended?=?true;
          ??Buffer[]?buffers?=?table
          ??//是否已經(jīng)初始化
          ??if?((buffers?==?null)
          ??????||?(mask?=?buffers.length?-?1)?0
          ??????//用thread的隨機值作為hash值,得到對應(yīng)位置的RingBuffer
          ??????||?(buffer?=?buffers[getProbe()?&?mask])?==?null
          ??????//檢查追加到RingBuffer是否成功
          ??????||?!(uncontended?=?((result?=?buffer.offer(e))?!=?Buffer.FAILED)))?{
          ????//其中一個符合條件則進行擴容
          ????expandOrRetry(e,?uncontended);
          ??}
          ??return?result;
          }

          /**
          ?*?Handles?cases?of?updates?involving?initialization,?resizing,?creating?new?Buffers,?and/or
          ?*?contention.?See?above?for?explanation.?This?method?suffers?the?usual?non-modularity?problems?of
          ?*?optimistic?retry?code,?relying?on?rechecked?sets?of?reads.
          ?*
          ?*?@param?e?the?element?to?add
          ?*?@param?wasUncontended?false?if?CAS?failed?before?call
          ?*/


          //這個方法比較長,但思路還是相對清晰的。
          @SuppressWarnings("PMD.ConfusingTernary")
          final?void?expandOrRetry(E?e,?boolean?wasUncontended)?{
          ??int?h;
          ??if?((h?=?getProbe())?==?0)?{
          ????ThreadLocalRandom.current();?//?force?initialization
          ????h?=?getProbe();
          ????wasUncontended?=?true;
          ??}
          ??boolean?collide?=?false;?//?True?if?last?slot?nonempty
          ??for?(int?attempt?=?0;?attempt?????Buffer[]?buffers;
          ????Buffer?buffer;
          ????int?n;
          ????if?(((buffers?=?table)?!=?null)?&&?((n?=?buffers.length)?>?0))?{
          ??????if?((buffer?=?buffers[(n?-?1)?&?h])?==?null)?{
          ????????if?((tableBusy?==?0)?&&?casTableBusy())?{?//?Try?to?attach?new?Buffer
          ??????????boolean?created?=?false;
          ??????????try?{?//?Recheck?under?lock
          ????????????Buffer[]?rs;
          ????????????int?mask,?j;
          ????????????if?(((rs?=?table)?!=?null)?&&?((mask?=?rs.length)?>?0)
          ????????????????&&?(rs[j?=?(mask?-?1)?&?h]?==?null))?{
          ??????????????rs[j]?=?create(e);
          ??????????????created?=?true;
          ????????????}
          ??????????}?finally?{
          ????????????tableBusy?=?0;
          ??????????}
          ??????????if?(created)?{
          ????????????break;
          ??????????}
          ??????????continue;?//?Slot?is?now?non-empty
          ????????}
          ????????collide?=?false;
          ??????}?else?if?(!wasUncontended)?{?//?CAS?already?known?to?fail
          ????????wasUncontended?=?true;??????//?Continue?after?rehash
          ??????}?else?if?(buffer.offer(e)?!=?Buffer.FAILED)?{
          ????????break;
          ??????}?else?if?(n?>=?MAXIMUM_TABLE_SIZE?||?table?!=?buffers)?{
          ????????collide?=?false;?//?At?max?size?or?stale
          ??????}?else?if?(!collide)?{
          ????????collide?=?true;
          ??????}?else?if?(tableBusy?==?0?&&?casTableBusy())?{
          ????????try?{
          ??????????if?(table?==?buffers)?{?//?Expand?table?unless?stale
          ????????????table?=?Arrays.copyOf(buffers,?n?<1);
          ??????????}
          ????????}?finally?{
          ??????????tableBusy?=?0;
          ????????}
          ????????collide?=?false;
          ????????continue;?//?Retry?with?expanded?table
          ??????}
          ??????h?=?advanceProbe(h);
          ????}?else?if?((tableBusy?==?0)?&&?(table?==?buffers)?&&?casTableBusy())?{
          ??????boolean?init?=?false;
          ??????try?{?//?Initialize?table
          ????????if?(table?==?buffers)?{
          ??????????@SuppressWarnings({"unchecked",?"rawtypes"})
          ??????????Buffer[]?rs?=?new?Buffer[1];
          ??????????rs[0]?=?create(e);
          ??????????table?=?rs;
          ??????????init?=?true;
          ????????}
          ??????}?finally?{
          ????????tableBusy?=?0;
          ??????}
          ??????if?(init)?{
          ????????break;
          ??????}
          ????}
          ??}
          }

          最后看看RingBuffer,注意RingBufferBoundedBuffer的內(nèi)部類。

          /**?The?maximum?number?of?elements?per?buffer.?*/
          static?final?int?BUFFER_SIZE?=?16;

          //?Assume?4-byte?references?and?64-byte?cache?line?(16?elements?per?line)
          //256長度,但是是以16為單位,所以最多存放16個元素
          static?final?int?SPACED_SIZE?=?BUFFER_SIZE?<4;
          static?final?int?SPACED_MASK?=?SPACED_SIZE?-?1;
          static?final?int?OFFSET?=?16;
          //RingBuffer數(shù)組
          final?AtomicReferenceArray?buffer;

          ?//插入方法
          ?@Override
          ?public?int?offer(E?e)?{
          ???long?head?=?readCounter;
          ???long?tail?=?relaxedWriteCounter();
          ???//用head和tail來限制個數(shù)
          ???long?size?=?(tail?-?head);
          ???if?(size?>=?SPACED_SIZE)?{
          ?????return?Buffer.FULL;
          ???}
          ???//tail追加16
          ???if?(casWriteCounter(tail,?tail?+?OFFSET))?{
          ?????//用tail“取余”得到下標(biāo)
          ?????int?index?=?(int)?(tail?&?SPACED_MASK);
          ?????//用unsafe.putOrderedObject設(shè)值
          ?????buffer.lazySet(index,?e);
          ?????return?Buffer.SUCCESS;
          ???}
          ???//如果CAS失敗則返回失敗
          ???return?Buffer.FAILED;
          ?}

          ?//用consumer來處理buffer的數(shù)據(jù)
          ?@Override
          ?public?void?drainTo(Consumer?consumer)?{
          ???long?head?=?readCounter;
          ???long?tail?=?relaxedWriteCounter();
          ???//判斷數(shù)據(jù)多少
          ???long?size?=?(tail?-?head);
          ???if?(size?==?0)?{
          ?????return;
          ???}
          ???do?{
          ?????int?index?=?(int)?(head?&?SPACED_MASK);
          ?????E?e?=?buffer.get(index);
          ?????if?(e?==?null)?{
          ???????//?not?published?yet
          ???????break;
          ?????}
          ?????buffer.lazySet(index,?null);
          ?????consumer.accept(e);
          ?????//head也跟tail一樣,每次遞增16
          ?????head?+=?OFFSET;
          ???}?while?(head?!=?tail);
          ???lazySetReadCounter(head);
          ?}

          注意,ring buffer 的 size(固定是 16 個)是不變的,變的是 head 和 tail 而已。

          總的來說ReadBuffer有如下特點:

          • 使用?Striped-RingBuffer來提升對 buffer 的讀寫
          • 用 thread 的 hash 來避開熱點 key 的競爭
          • 允許寫入的丟失

          WriteBuffer

          writeBufferreadBuffer不一樣,主要體現(xiàn)在使用場景的不一樣。本來緩存的一般場景是讀多寫少的,讀的并發(fā)會更高,且 afterRead 顯得沒那么重要,允許延遲甚至丟失。寫不一樣,寫afterWrite不允許丟失,且要求盡量馬上執(zhí)行。Caffeine 使用MPSC(Multiple Producer / Single Consumer)作為 buffer 數(shù)組,實現(xiàn)在MpscGrowableArrayQueue類,它是仿照JCToolsMpscGrowableArrayQueue來寫的。

          MPSC 允許無鎖的高并發(fā)寫入,但只允許一個消費者,同時也犧牲了部分操作。

          MPSC 我打算另外分析,這里不展開了。

          TimerWheel

          除了支持expireAfterAccessexpireAfterWrite之外(Guava Cache 也支持這兩個特性),Caffeine 還支持expireAfter。因為expireAfterAccessexpireAfterWrite都只能是固定的過期時間,這可能滿足不了某些場景,譬如記錄的過期時間是需要根據(jù)某些條件而不一樣的,這就需要用戶自定義過期時間。

          先看看expireAfter的用法

          private?static?LoadingCache?cache?=?Caffeine.newBuilder()
          ????????.maximumSize(256L)
          ????????.initialCapacity(1)
          ????????//.expireAfterAccess(2,?TimeUnit.DAYS)
          ????????//.expireAfterWrite(2,?TimeUnit.HOURS)
          ????????.refreshAfterWrite(1,?TimeUnit.HOURS)
          ????????//自定義過期時間
          ????????.expireAfter(new?Expiry()?{
          ????????????//返回創(chuàng)建后的過期時間
          ????????????@Override
          ????????????public?long?expireAfterCreate(@NonNull?String?key,?@NonNull?String?value,?long?currentTime)?{
          ????????????????return?0;
          ????????????}

          ????????????//返回更新后的過期時間
          ????????????@Override
          ????????????public?long?expireAfterUpdate(@NonNull?String?key,?@NonNull?String?value,?long?currentTime,?@NonNegative?long?currentDuration)?{
          ????????????????return?0;
          ????????????}

          ????????????//返回讀取后的過期時間
          ????????????@Override
          ????????????public?long?expireAfterRead(@NonNull?String?key,?@NonNull?String?value,?long?currentTime,?@NonNegative?long?currentDuration)?{
          ????????????????return?0;
          ????????????}
          ????????})
          ????????.recordStats()
          ????????.build(new?CacheLoader()?{
          ????????????@Nullable
          ????????????@Override
          ????????????public?String?load(@NonNull?String?key)?throws?Exception?{
          ????????????????return?"value_"?+?key;
          ????????????}
          ????????});

          通過自定義過期時間,使得不同的 key 可以動態(tài)的得到不同的過期時間。

          注意,我把expireAfterAccessexpireAfterWrite注釋了,因為這兩個特性不能跟expireAfter一起使用。

          而當(dāng)使用了expireAfter特性后,Caffeine 會啟用一種叫“時間輪”的算法來實現(xiàn)這個功能。更多關(guān)于時間輪的介紹,可以看我的文章HashedWheelTimer 時間輪原理分析[6]

          好,重點來了,為什么要用時間輪?

          expireAfterAccessexpireAfterWrite的實現(xiàn)是用一個AccessOrderDeque雙端隊列,它是 FIFO 的,因為它們的過期時間是固定的,所以在隊列頭的數(shù)據(jù)肯定是最早過期的,要處理過期數(shù)據(jù)時,只需要首先看看頭部是否過期,然后再挨個檢查就可以了。但是,如果過期時間不一樣的話,這需要對accessOrderQueue進行排序&插入,這個代價太大了。于是,Caffeine 用了一種更加高效、優(yōu)雅的算法-時間輪。

          時間輪的結(jié)構(gòu):

          因為在我的對時間輪分析的文章里已經(jīng)說了時間輪的原理和機制了,所以我就不展開 Caffeine 對時間輪的實現(xiàn)了。

          Caffeine 對時間輪的實現(xiàn)在TimerWheel,它是一種多層時間輪(hierarchical timing wheels )。

          看看元素加入到時間輪的schedule方法:

          /**
          ?*?Schedules?a?timer?event?for?the?node.
          ?*
          ?*?@param?node?the?entry?in?the?cache
          ?*/

          public?void?schedule(@NonNull?Node?node)?{
          ??Node?sentinel?=?findBucket(node.getVariableTime());
          ??link(sentinel,?node);
          }

          /**
          ?*?Determines?the?bucket?that?the?timer?event?should?be?added?to.
          ?*
          ?*?@param?time?the?time?when?the?event?fires
          ?*?@return?the?sentinel?at?the?head?of?the?bucket
          ?*/

          Node?findBucket(long?time)?{
          ??long?duration?=?time?-?nanos;
          ??int?length?=?wheel.length?-?1;
          ??for?(int?i?=?0;?i?????if?(duration?1])?{
          ??????long?ticks?=?(time?>>>?SHIFT[i]);
          ??????int?index?=?(int)?(ticks?&?(wheel[i].length?-?1));
          ??????return?wheel[i][index];
          ????}
          ??}
          ??return?wheel[length][0];
          }

          /**?Adds?the?entry?at?the?tail?of?the?bucket's?list.?*/
          void?link(Node?sentinel,?Node?node)?{
          ??node.setPreviousInVariableOrder(sentinel.getPreviousInVariableOrder());
          ??node.setNextInVariableOrder(sentinel);

          ??sentinel.getPreviousInVariableOrder().setNextInVariableOrder(node);
          ??sentinel.setPreviousInVariableOrder(node);
          }

          其他

          Caffeine 還有其他的優(yōu)化性能的手段,如使用軟引用和弱引用、消除偽共享、CompletableFuture異步等等。

          總結(jié)

          Caffeien 是一個優(yōu)秀的本地緩存,通過使用 W-TinyLFU 算法, 高性能的 readBuffer 和 WriteBuffer,時間輪算法等,使得它擁有高性能,高命中率(near optimal),低內(nèi)存占用等特點。

          參考資料

          TinyLFU 論文[7]

          Design Of A Modern Cache[8]

          Design Of A Modern Cache—Part Deux[9]

          Caffeine 的 github[10]

          參考資料

          [1]

          Caffeine:?https://github.com/ben-manes/caffeine

          [2]

          這里:?https://albenw.github.io/posts/df42dc84/

          [3]

          Benchmarks:?https://github.com/ben-manes/caffeine/wiki/Benchmarks

          [4]

          官方API說明文檔:?https://github.com/ben-manes/caffeine/wiki

          [5]

          這里:?https://github.com/ben-manes/caffeine/wiki/Guava

          [6]

          HashedWheelTimer時間輪原理分析:?https://albenw.github.io/posts/ec8df8c/

          [7]

          TinyLFU論文:?https://arxiv.org/abs/1512.00727

          [8]

          Design Of A Modern Cache:?http://highscalability.com/blog/2016/1/25/design-of-a-modern-cache.html

          [9]

          Design Of A Modern Cache—Part Deux:?http://highscalability.com/blog/2019/2/25/design-of-a-modern-cachepart-deux.html

          [10]

          Caffeine的github:?https://github.com/ben-manes/caffeine

          老家浙江東海邊,靠海吃海,目前經(jīng)營一個小品牌,讓普通人吃到最新鮮的海鮮。有興趣可以點擊了解:《浙里有漁,鮮人一步!》???

          點擊領(lǐng)?。撼绦騿T最新學(xué)習(xí)資料!

          往期推薦

          消息隊列(MQ)詳細總結(jié),程序員必看!

          配置中心 Nacos 不同環(huán)境的配置管理方案

          如何手動獲取 Spring 容器中的 Bean?

          Spring Boot 項目優(yōu)化和 JVM 調(diào)優(yōu)(樓主親測,真實有效)


          下方二維碼關(guān)注我

          技術(shù)草根堅持分享?編程,算法,架構(gòu)

          朋友助力下!點個在看!
          瀏覽 188
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  开心五月深深爱婷婷和丁香 | 天天操鸡巴 | 国产无码激情视频 | 成人AV导航 | 做爱成人视频 |