<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          多圖剖析緩存之王 Caffeine 高性能設(shè)計(jì)

          共 67691字,需瀏覽 136分鐘

           ·

          2021-06-19 15:07

          以下文章來(lái)源于武培軒,回復(fù)資源獲取資料

          概要

          Caffeine[1]是一個(gè)高性能,高命中率,低內(nèi)存占用,near optimal 的本地緩存,簡(jiǎn)單來(lái)說(shuō)它是 Guava Cache 的優(yōu)化加強(qiáng)版,有些文章把 Caffeine 稱(chēng)為“新一代的緩存”、“現(xiàn)代緩存之王”。

          本文將重點(diǎn)講解 Caffeine 的高性能設(shè)計(jì),以及對(duì)應(yīng)部分的源碼分析。

          與 Guava Cache 比較

          如果你對(duì) Guava Cache 還不理解的話,可以點(diǎn)擊這里[2]來(lái)看一下我之前寫(xiě)過(guò)關(guān)于 Guava Cache 的文章。

          大家都知道,Spring5 即將放棄掉 Guava Cache 作為緩存機(jī)制,而改用 Caffeine 作為新的本地 Cache 的組件,這對(duì)于 Caffeine 來(lái)說(shuō)是一個(gè)很大的肯定。為什么 Spring 會(huì)這樣做呢?其實(shí)在 Caffeine 的Benchmarks[3]里給出了好靚仔的數(shù)據(jù),對(duì)讀和寫(xiě)的場(chǎng)景,還有跟其他幾個(gè)緩存工具進(jìn)行了比較,Caffeine 的性能都表現(xiàn)很突出。

          使用 Caffeine

          Caffeine 為了方便大家使用以及從 Guava Cache 切換過(guò)來(lái)(很有針對(duì)性啊~),借鑒了 Guava Cache 大部分的概念(諸如核心概念CacheLoadingCacheCacheLoaderCacheBuilder等等),對(duì)于 Caffeine 的理解只要把它當(dāng)作 Guava Cache 就可以了。

          使用上,大家只要把 Caffeine 的包引進(jìn)來(lái),然后換一下 cache 的實(shí)現(xiàn)類(lèi),基本應(yīng)該就沒(méi)問(wèn)題了。這對(duì)與已經(jīng)使用過(guò) Guava Cache 的同學(xué)來(lái)說(shuō)沒(méi)有任何難度,甚至還有一點(diǎn)熟悉的味道,如果你之前沒(méi)有使用過(guò) Guava Cache,可以查看 Caffeine 的官方 API 說(shuō)明文檔[4],其中PopulationEvictionRemovalRefreshStatisticsCleanupPolicy等等這些特性都是跟 Guava Cache 基本一樣的。

          下面給出一個(gè)例子說(shuō)明怎樣創(chuàng)建一個(gè) Cache:

          private static LoadingCache<String, String> cache = Caffeine.newBuilder()
                      //最大個(gè)數(shù)限制
                      .maximumSize(256L)
                      //初始化容量
                      .initialCapacity(1)
                      //訪問(wèn)后過(guò)期(包括讀和寫(xiě))
                      .expireAfterAccess(2, TimeUnit.DAYS)
                      //寫(xiě)后過(guò)期
                      .expireAfterWrite(2, TimeUnit.HOURS)
                      //寫(xiě)后自動(dòng)異步刷新
                      .refreshAfterWrite(1, TimeUnit.HOURS)
                      //記錄下緩存的一些統(tǒng)計(jì)數(shù)據(jù),例如命中率等
                      .recordStats()
                      //cache對(duì)緩存寫(xiě)的通知回調(diào)
                      .writer(new CacheWriter<Object, Object>() {
                          @Override
                          public void write(@NonNull Object key, @NonNull Object value) {
                              log.info("key={}, CacheWriter write", key);
                          }

                          @Override
                          public void delete(@NonNull Object key, @Nullable Object value, @NonNull RemovalCause cause) {
                              log.info("key={}, cause={}, CacheWriter delete", key, cause);
                          }
                      })
                      //使用CacheLoader創(chuàng)建一個(gè)LoadingCache
                      .build(new CacheLoader<String, String>() {
                          //同步加載數(shù)據(jù)
                          @Nullable
                          @Override
                          public String load(@NonNull String key) throws Exception {
                              return "value_" + key;
                          }

                          //異步加載數(shù)據(jù)
                          @Nullable
                          @Override
                          public String reload(@NonNull String key, @NonNull String oldValue) throws Exception {
                              return "value_" + key;
                          }
                      });

          更多從 Guava Cache 遷移過(guò)來(lái)的使用說(shuō)明,請(qǐng)看這里[5]

          Caffeine 的高性能設(shè)計(jì)

          判斷一個(gè)緩存的好壞最核心的指標(biāo)就是命中率,影響緩存命中率有很多因素,包括業(yè)務(wù)場(chǎng)景、淘汰策略、清理策略、緩存容量等等。如果作為本地緩存, 它的性能的情況,資源的占用也都是一個(gè)很重要的指標(biāo)。下面

          我們來(lái)看看 Caffeine 在這幾個(gè)方面是怎么著手的,如何做優(yōu)化的。

          (注:本文不會(huì)分析 Caffeine 全部源碼,只會(huì)對(duì)核心設(shè)計(jì)的實(shí)現(xiàn)進(jìn)行分析,但我建議讀者把 Caffeine 的源碼都涉獵一下,有個(gè) overview 才能更好理解本文。如果你看過(guò) Guava Cache 的源碼也行,代碼的數(shù)據(jù)結(jié)構(gòu)和處理邏輯很類(lèi)似的。

          源碼基于:caffeine-2.8.0.jar)

          W-TinyLFU 整體設(shè)計(jì)

          上面說(shuō)到淘汰策略是影響緩存命中率的因素之一,一般比較簡(jiǎn)單的緩存就會(huì)直接用到 LFU(Least Frequently Used,即最不經(jīng)常使用) 或者LRU(Least Recently Used,即最近最少使用) ,而 Caffeine 就是使用了 W-TinyLFU 算法。

          W-TinyLFU 看名字就能大概猜出來(lái),它是 LFU 的變種,也是一種緩存淘汰算法。那為什么要使用 W-TinyLFU 呢?

          LRU 和 LFU 的缺點(diǎn)

          • LRU 實(shí)現(xiàn)簡(jiǎn)單,在一般情況下能夠表現(xiàn)出很好的命中率,是一個(gè)“性?xún)r(jià)比”很高的算法,平時(shí)也很常用。雖然 LRU 對(duì)突發(fā)性的稀疏流量(sparse bursts)表現(xiàn)很好,但同時(shí)也會(huì)產(chǎn)生緩存污染,舉例來(lái)說(shuō),如果偶然性的要對(duì)全量數(shù)據(jù)進(jìn)行遍歷,那么“歷史訪問(wèn)記錄”就會(huì)被刷走,造成污染。
          • 如果數(shù)據(jù)的分布在一段時(shí)間內(nèi)是固定的話,那么 LFU 可以達(dá)到最高的命中率。但是 LFU 有兩個(gè)缺點(diǎn),第一,它需要給每個(gè)記錄項(xiàng)維護(hù)頻率信息,每次訪問(wèn)都需要更新,這是個(gè)巨大的開(kāi)銷(xiāo);第二,對(duì)突發(fā)性的稀疏流量無(wú)力,因?yàn)榍捌诮?jīng)常訪問(wèn)的記錄已經(jīng)占用了緩存,偶然的流量不太可能會(huì)被保留下來(lái),而且過(guò)去的一些大量被訪問(wèn)的記錄在將來(lái)也不一定會(huì)使用上,這樣就一直把“坑”占著了。

          無(wú)論 LRU 還是 LFU 都有其各自的缺點(diǎn),不過(guò),現(xiàn)在已經(jīng)有很多針對(duì)其缺點(diǎn)而改良、優(yōu)化出來(lái)的變種算法。

          TinyLFU

          TinyLFU 就是其中一個(gè)優(yōu)化算法,它是專(zhuān)門(mén)為了解決 LFU 上述提到的兩個(gè)問(wèn)題而被設(shè)計(jì)出來(lái)的。

          解決第一個(gè)問(wèn)題是采用了 Count–Min Sketch 算法。

          解決第二個(gè)問(wèn)題是讓記錄盡量保持相對(duì)的“新鮮”(Freshness Mechanism),并且當(dāng)有新的記錄插入時(shí),可以讓它跟老的記錄進(jìn)行“PK”,輸者就會(huì)被淘汰,這樣一些老的、不再需要的記錄就會(huì)被剔除。

          下圖是 TinyLFU 設(shè)計(jì)圖(來(lái)自官方)

          統(tǒng)計(jì)頻率 Count–Min Sketch 算法

          如何對(duì)一個(gè) key 進(jìn)行統(tǒng)計(jì),但又可以節(jié)省空間呢?(不是簡(jiǎn)單的使用HashMap,這太消耗內(nèi)存了),注意哦,不需要精確的統(tǒng)計(jì),只需要一個(gè)近似值就可以了,怎么樣,這樣場(chǎng)景是不是很熟悉,如果你是老司機(jī),或許已經(jīng)聯(lián)想到布隆過(guò)濾器(Bloom Filter)的應(yīng)用了。

          沒(méi)錯(cuò),將要介紹的 Count–Min Sketch 的原理跟 Bloom Filter 一樣,只不過(guò) Bloom Filter 只有 0 和 1 的值,那么你可以把 Count–Min Sketch 看作是“數(shù)值”版的 Bloom Filter。

          更多關(guān)于 Count–Min Sketch 的介紹請(qǐng)自行搜索。

          在 TinyLFU 中,近似頻率的統(tǒng)計(jì)如下圖所示:

          對(duì)一個(gè) key 進(jìn)行多次 hash 函數(shù)后,index 到多個(gè)數(shù)組位置后進(jìn)行累加,查詢(xún)時(shí)取多個(gè)值中的最小值即可。

          Caffeine 對(duì)這個(gè)算法的實(shí)現(xiàn)在FrequencySketch類(lèi)。但 Caffeine 對(duì)此有進(jìn)一步的優(yōu)化,例如 Count–Min Sketch 使用了二維數(shù)組,Caffeine 只是用了一個(gè)一維的數(shù)組;再者,如果是數(shù)值類(lèi)型的話,這個(gè)數(shù)需要用 int 或 long 來(lái)存儲(chǔ),但是 Caffeine 認(rèn)為緩存的訪問(wèn)頻率不需要用到那么大,只需要 15 就足夠,一般認(rèn)為達(dá)到 15 次的頻率算是很高的了,而且 Caffeine 還有另外一個(gè)機(jī)制來(lái)使得這個(gè)頻率進(jìn)行衰退減半(下面就會(huì)講到)。如果最大是 15 的話,那么只需要 4 個(gè) bit 就可以滿(mǎn)足了,一個(gè) long 有 64bit,可以存儲(chǔ) 16 個(gè)這樣的統(tǒng)計(jì)數(shù),Caffeine 就是這樣的設(shè)計(jì),使得存儲(chǔ)效率提高了 16 倍。

          Caffeine 對(duì)緩存的讀寫(xiě)(afterReadafterWrite方法)都會(huì)調(diào)用onAccesss 方法,而onAccess方法里有一句:

          frequencySketch().increment(key);

          這句就是追加記錄的頻率,下面我們看看具體實(shí)現(xiàn)

          //FrequencySketch的一些屬性

          //種子數(shù)
          static final long[] SEED = { // A mixture of seeds from FNV-1a, CityHash, and Murmur3
              0xc3a5c85c97cb3127L0xb492b66fbe98f273L0x9ae16a3b2f90404fL0xcbf29ce484222325L};
          static final long RESET_MASK = 0x7777777777777777L;
          static final long ONE_MASK = 0x1111111111111111L;

          int sampleSize;
          //為了快速根據(jù)hash值得到table的index值的掩碼
          //table的長(zhǎng)度size一般為2的n次方,而tableMask為size-1,這樣就可以通過(guò)&操作來(lái)模擬取余操作,速度快很多,老司機(jī)都知道
          int tableMask;
          //存儲(chǔ)數(shù)據(jù)的一維long數(shù)組
          long[] table;
          int size;

          /**
           * Increments the popularity of the element if it does not exceed the maximum (15). The popularity
           * of all elements will be periodically down sampled when the observed events exceeds a threshold.
           * This process provides a frequency aging to allow expired long term entries to fade away.
           *
           * @param e the element to add
           */

          public void increment(@NonNull E e) {
            if (isNotInitialized()) {
              return;
            }

            //根據(jù)key的hashCode通過(guò)一個(gè)哈希函數(shù)得到一個(gè)hash值
            //本來(lái)就是hashCode了,為什么還要再做一次hash?怕原來(lái)的hashCode不夠均勻分散,再打散一下。
            int hash = spread(e.hashCode());
            //這句光看有點(diǎn)難理解
            //就如我剛才說(shuō)的,Caffeine把一個(gè)long的64bit劃分成16個(gè)等分,每一等分4個(gè)bit。
            //這個(gè)start就是用來(lái)定位到是哪一個(gè)等分的,用hash值低兩位作為隨機(jī)數(shù),再左移2位,得到一個(gè)小于16的值
            int start = (hash & 3) << 2;

            //indexOf方法的意思就是,根據(jù)hash值和不同種子得到table的下標(biāo)index
            //這里通過(guò)四個(gè)不同的種子,得到四個(gè)不同的下標(biāo)index
            int index0 = indexOf(hash, 0);
            int index1 = indexOf(hash, 1);
            int index2 = indexOf(hash, 2);
            int index3 = indexOf(hash, 3);

            //根據(jù)index和start(+1, +2, +3)的值,把table[index]對(duì)應(yīng)的等分追加1
            //這個(gè)incrementAt方法有點(diǎn)難理解,看我下面的解釋
            boolean added = incrementAt(index0, start);
            added |= incrementAt(index1, start + 1);
            added |= incrementAt(index2, start + 2);
            added |= incrementAt(index3, start + 3);

            //這個(gè)reset等下說(shuō)
            if (added && (++size == sampleSize)) {
              reset();
            }
          }

          /**
           * Increments the specified counter by 1 if it is not already at the maximum value (15).
           *
           * @param i the table index (16 counters)
           * @param j the counter to increment
           * @return if incremented
           */

          boolean incrementAt(int i, int j) {
            //這個(gè)j表示16個(gè)等分的下標(biāo),那么offset就是相當(dāng)于在64位中的下標(biāo)(這個(gè)自己想想)
            int offset = j << 2;
            //上面提到Caffeine把頻率統(tǒng)計(jì)最大定為15,即0xfL
            //mask就是在64位中的掩碼,即1111后面跟很多個(gè)0
            long mask = (0xfL << offset);
            //如果&的結(jié)果不等于15,那么就追加1。等于15就不會(huì)再加了
            if ((table[i] & mask) != mask) {
              table[i] += (1L << offset);
              return true;
            }
            return false;
          }

          /**
           * Returns the table index for the counter at the specified depth.
           *
           * @param item the element's hash
           * @param i the counter depth
           * @return the table index
           */

          int indexOf(int item, int i) {
            long hash = SEED[i] * item;
            hash += hash >>> 32;
            return ((int) hash) & tableMask;
          }

          /**
           * Applies a supplemental hash function to a given hashCode, which defends against poor quality
           * hash functions.
           */

          int spread(int x) {
            x = ((x >>> 16) ^ x) * 0x45d9f3b;
            x = ((x >>> 16) ^ x) * 0x45d9f3b;
            return (x >>> 16) ^ x;
          }

          知道了追加方法,那么讀取方法frequency就很容易理解了。

          /**
           * Returns the estimated number of occurrences of an element, up to the maximum (15).
           *
           * @param e the element to count occurrences of
           * @return the estimated number of occurrences of the element; possibly zero but never negative
           */

          @NonNegative
          public int frequency(@NonNull E e) {
            if (isNotInitialized()) {
              return 0;
            }

            //得到hash值,跟上面一樣
            int hash = spread(e.hashCode());
            //得到等分的下標(biāo),跟上面一樣
            int start = (hash & 3) << 2;
            int frequency = Integer.MAX_VALUE;
            //循環(huán)四次,分別獲取在table數(shù)組中不同的下標(biāo)位置
            for (int i = 0; i < 4; i++) {
              int index = indexOf(hash, i);
              //這個(gè)操作就不多說(shuō)了,其實(shí)跟上面incrementAt是一樣的,定位到table[index] + 等分的位置,再根據(jù)mask取出計(jì)數(shù)值
              int count = (int) ((table[index] >>> ((start + i) << 2)) & 0xfL);
              //取四個(gè)中的較小值
              frequency = Math.min(frequency, count);
            }
            return frequency;
          }

          通過(guò)代碼和注釋或者讀者可能難以理解,下圖是我畫(huà)出來(lái)幫助大家理解的結(jié)構(gòu)圖。

          注意紫色虛線框,其中藍(lán)色小格就是需要計(jì)算的位置:


          保新機(jī)制

          為了讓緩存保持“新鮮”,剔除掉過(guò)往頻率很高但之后不經(jīng)常的緩存,Caffeine 有一個(gè) Freshness Mechanism。做法很簡(jiǎn)答,就是當(dāng)整體的統(tǒng)計(jì)計(jì)數(shù)(當(dāng)前所有記錄的頻率統(tǒng)計(jì)之和,這個(gè)數(shù)值內(nèi)部維護(hù))達(dá)到某一個(gè)值時(shí),那么所有記錄的頻率統(tǒng)計(jì)除以 2。

          從上面的代碼

          //size變量就是所有記錄的頻率統(tǒng)計(jì)之,即每個(gè)記錄加1,這個(gè)size都會(huì)加1
          //sampleSize一個(gè)閾值,從FrequencySketch初始化可以看到它的值為maximumSize的10倍
          if (added && (++size == sampleSize)) {
                reset();
          }

          看到reset方法就是做這個(gè)事情

          /** Reduces every counter by half of its original value. */
          void reset() {
            int count = 0;
            for (int i = 0; i < table.length; i++) {
              count += Long.bitCount(table[i] & ONE_MASK);
              table[i] = (table[i] >>> 1) & RESET_MASK;
            }
            size = (size >>> 1) - (count >>> 2);
          }

          關(guān)于這個(gè) reset 方法,為什么是除以 2,而不是其他,及其正確性,在最下面的參考資料的 TinyLFU 論文中 3.3 章節(jié)給出了數(shù)學(xué)證明,大家有興趣可以看看。

          增加一個(gè) Window?

          Caffeine 通過(guò)測(cè)試發(fā)現(xiàn) TinyLFU 在面對(duì)突發(fā)性的稀疏流量(sparse bursts)時(shí)表現(xiàn)很差,因?yàn)樾碌挠涗洠╪ew items)還沒(méi)來(lái)得及建立足夠的頻率就被剔除出去了,這就使得命中率下降。

          于是 Caffeine 設(shè)計(jì)出一種新的 policy,即 Window Tiny LFU(W-TinyLFU),并通過(guò)實(shí)驗(yàn)和實(shí)踐發(fā)現(xiàn) W-TinyLFU 比 TinyLFU 表現(xiàn)的更好。

          W-TinyLFU 的設(shè)計(jì)如下所示(兩圖等價(jià)):

          它主要包括兩個(gè)緩存模塊,主緩存是 SLRU(Segmented LRU,即分段 LRU),SLRU 包括一個(gè)名為 protected 和一個(gè)名為 probation 的緩存區(qū)。通過(guò)增加一個(gè)緩存區(qū)(即 Window Cache),當(dāng)有新的記錄插入時(shí),會(huì)先在 window 區(qū)呆一下,就可以避免上述說(shuō)的 sparse bursts 問(wèn)題。

          淘汰策略(eviction policy)

          當(dāng) window 區(qū)滿(mǎn)了,就會(huì)根據(jù) LRU 把 candidate(即淘汰出來(lái)的元素)放到 probation 區(qū),如果 probation 區(qū)也滿(mǎn)了,就把 candidate 和 probation 將要淘汰的元素 victim,兩個(gè)進(jìn)行“PK”,勝者留在 probation,輸者就要被淘汰了。

          而且經(jīng)過(guò)實(shí)驗(yàn)發(fā)現(xiàn)當(dāng) window 區(qū)配置為總?cè)萘康?1%,剩余的 99%當(dāng)中的 80%分給 protected 區(qū),20%分給 probation 區(qū)時(shí),這時(shí)整體性能和命中率表現(xiàn)得最好,所以 Caffeine 默認(rèn)的比例設(shè)置就是這個(gè)。

          不過(guò)這個(gè)比例 Caffeine 會(huì)在運(yùn)行時(shí)根據(jù)統(tǒng)計(jì)數(shù)據(jù)(statistics)去動(dòng)態(tài)調(diào)整,如果你的應(yīng)用程序的緩存隨著時(shí)間變化比較快的話,那么增加 window 區(qū)的比例可以提高命中率,相反緩存都是比較固定不變的話,增加 Main Cache 區(qū)(protected 區(qū) +probation 區(qū))的比例會(huì)有較好的效果。

          下面我們看看上面說(shuō)到的淘汰策略是怎么實(shí)現(xiàn)的:

          一般緩存對(duì)讀寫(xiě)操作后都有后續(xù)的一系列“維護(hù)”操作,Caffeine 也不例外,這些操作都在maintenance方法,我們將要說(shuō)到的淘汰策略也在里面。

          這方法比較重要,下面也會(huì)提到,所以這里只先說(shuō)跟“淘汰策略”有關(guān)的evictEntriesclimb

          /**
             * Performs the pending maintenance work and sets the state flags during processing to avoid
             * excess scheduling attempts. The read buffer, write buffer, and reference queues are
             * drained, followed by expiration, and size-based eviction.
             *
             * @param task an additional pending task to run, or {@code null} if not present
             */

            @GuardedBy("evictionLock")
            void maintenance(@Nullable Runnable task) {
              lazySetDrainStatus(PROCESSING_TO_IDLE);

              try {
                drainReadBuffer();

                drainWriteBuffer();
                if (task != null) {
                  task.run();
                }

                drainKeyReferences();
                drainValueReferences();

                expireEntries();
                //把符合條件的記錄淘汰掉
                evictEntries();
                //動(dòng)態(tài)調(diào)整window區(qū)和protected區(qū)的大小
                climb();
              } finally {
                if ((drainStatus() != PROCESSING_TO_IDLE) || !casDrainStatus(PROCESSING_TO_IDLE, IDLE)) {
                  lazySetDrainStatus(REQUIRED);
                }
              }
            }

          先說(shuō)一下 Caffeine 對(duì)上面說(shuō)到的 W-TinyLFU 策略的實(shí)現(xiàn)用到的數(shù)據(jù)結(jié)構(gòu):

          //最大的個(gè)數(shù)限制
          long maximum;
          //當(dāng)前的個(gè)數(shù)
          long weightedSize;
          //window區(qū)的最大限制
          long windowMaximum;
          //window區(qū)當(dāng)前的個(gè)數(shù)
          long windowWeightedSize;
          //protected區(qū)的最大限制
          long mainProtectedMaximum;
          //protected區(qū)當(dāng)前的個(gè)數(shù)
          long mainProtectedWeightedSize;
          //下一次需要調(diào)整的大小(還需要進(jìn)一步計(jì)算)
          double stepSize;
          //window區(qū)需要調(diào)整的大小
          long adjustment;
          //命中計(jì)數(shù)
          int hitsInSample;
          //不命中的計(jì)數(shù)
          int missesInSample;
          //上一次的緩存命中率
          double previousSampleHitRate;

          final FrequencySketch<K> sketch;
          //window區(qū)的LRU queue(FIFO)
          final AccessOrderDeque<Node<K, V>> accessOrderWindowDeque;
          //probation區(qū)的LRU queue(FIFO)
          final AccessOrderDeque<Node<K, V>> accessOrderProbationDeque;
          //protected區(qū)的LRU queue(FIFO)
          final AccessOrderDeque<Node<K, V>> accessOrderProtectedDeque;

          以及默認(rèn)比例設(shè)置(意思看注釋?zhuān)?/p>

          /** The initial percent of the maximum weighted capacity dedicated to the main space. */
          static final double PERCENT_MAIN = 0.99d;
          /** The percent of the maximum weighted capacity dedicated to the main's protected space. */
          static final double PERCENT_MAIN_PROTECTED = 0.80d;
          /** The difference in hit rates that restarts the climber. */
          static final double HILL_CLIMBER_RESTART_THRESHOLD = 0.05d;
          /** The percent of the total size to adapt the window by. */
          static final double HILL_CLIMBER_STEP_PERCENT = 0.0625d;
          /** The rate to decrease the step size to adapt by. */
          static final double HILL_CLIMBER_STEP_DECAY_RATE = 0.98d;
          /** The maximum number of entries that can be transfered between queues. */

          重點(diǎn)來(lái)了,evictEntriesclimb方法:

          /** Evicts entries if the cache exceeds the maximum. */
          @GuardedBy("evictionLock")
          void evictEntries() {
            if (!evicts()) {
              return;
            }
            //淘汰window區(qū)的記錄
            int candidates = evictFromWindow();
            //淘汰Main區(qū)的記錄
            evictFromMain(candidates);
          }

          /**
           * Evicts entries from the window space into the main space while the window size exceeds a
           * maximum.
           *
           * @return the number of candidate entries evicted from the window space
           */

          //根據(jù)W-TinyLFU,新的數(shù)據(jù)都會(huì)無(wú)條件的加到admission window
          //但是window是有大小限制,所以要“定期”做一下“維護(hù)”
          @GuardedBy("evictionLock")
          int evictFromWindow() {
            int candidates = 0;
            //查看window queue的頭部節(jié)點(diǎn)
            Node<K, V> node = accessOrderWindowDeque().peek();
            //如果window區(qū)超過(guò)了最大的限制,那么就要把“多出來(lái)”的記錄做處理
            while (windowWeightedSize() > windowMaximum()) {
              // The pending operations will adjust the size to reflect the correct weight
              if (node == null) {
                break;
              }
              //下一個(gè)節(jié)點(diǎn)
              Node<K, V> next = node.getNextInAccessOrder();
              if (node.getWeight() != 0) {
                //把node定位在probation區(qū)
                node.makeMainProbation();
                //從window區(qū)去掉
                accessOrderWindowDeque().remove(node);
                //加入到probation queue,相當(dāng)于把節(jié)點(diǎn)移動(dòng)到probation區(qū)(晉升了)
                accessOrderProbationDeque().add(node);
                candidates++;
                //因?yàn)橐瞥艘粋€(gè)節(jié)點(diǎn),所以需要調(diào)整window的size
                setWindowWeightedSize(windowWeightedSize() - node.getPolicyWeight());
              }
              //處理下一個(gè)節(jié)點(diǎn)
              node = next;
            }

            return candidates;
          }

          evictFromMain方法:

          /**
           * Evicts entries from the main space if the cache exceeds the maximum capacity. The main space
           * determines whether admitting an entry (coming from the window space) is preferable to retaining
           * the eviction policy's victim. This is decision is made using a frequency filter so that the
           * least frequently used entry is removed.
           *
           * The window space candidates were previously placed in the MRU position and the eviction
           * policy's victim is at the LRU position. The two ends of the queue are evaluated while an
           * eviction is required. The number of remaining candidates is provided and decremented on
           * eviction, so that when there are no more candidates the victim is evicted.
           *
           * @param candidates the number of candidate entries evicted from the window space
           */

          //根據(jù)W-TinyLFU,從window晉升過(guò)來(lái)的要跟probation區(qū)的進(jìn)行“PK”,勝者才能留下
          @GuardedBy("evictionLock")
          void evictFromMain(int candidates) {
            int victimQueue = PROBATION;
            //victim是probation queue的頭部
            Node<K, V> victim = accessOrderProbationDeque().peekFirst();
            //candidate是probation queue的尾部,也就是剛從window晉升來(lái)的
            Node<K, V> candidate = accessOrderProbationDeque().peekLast();
            //當(dāng)cache不夠容量時(shí)才做處理
            while (weightedSize() > maximum()) {
              // Stop trying to evict candidates and always prefer the victim
              if (candidates == 0) {
                candidate = null;
              }

              //對(duì)candidate為null且victim為bull的處理
              if ((candidate == null) && (victim == null)) {
                if (victimQueue == PROBATION) {
                  victim = accessOrderProtectedDeque().peekFirst();
                  victimQueue = PROTECTED;
                  continue;
                } else if (victimQueue == PROTECTED) {
                  victim = accessOrderWindowDeque().peekFirst();
                  victimQueue = WINDOW;
                  continue;
                }

                // The pending operations will adjust the size to reflect the correct weight
                break;
              }

              //對(duì)節(jié)點(diǎn)的weight為0的處理
              if ((victim != null) && (victim.getPolicyWeight() == 0)) {
                victim = victim.getNextInAccessOrder();
                continue;
              } else if ((candidate != null) && (candidate.getPolicyWeight() == 0)) {
                candidate = candidate.getPreviousInAccessOrder();
                candidates--;
                continue;
              }

              // Evict immediately if only one of the entries is present
              if (victim == null) {
                @SuppressWarnings("NullAway")
                Node<K, V> previous = candidate.getPreviousInAccessOrder();
                Node<K, V> evict = candidate;
                candidate = previous;
                candidates--;
                evictEntry(evict, RemovalCause.SIZE, 0L);
                continue;
              } else if (candidate == null) {
                Node<K, V> evict = victim;
                victim = victim.getNextInAccessOrder();
                evictEntry(evict, RemovalCause.SIZE, 0L);
                continue;
              }

              // Evict immediately if an entry was collected
              K victimKey = victim.getKey();
              K candidateKey = candidate.getKey();
              if (victimKey == null) {
                @NonNull Node<K, V> evict = victim;
                victim = victim.getNextInAccessOrder();
                evictEntry(evict, RemovalCause.COLLECTED, 0L);
                continue;
              } else if (candidateKey == null) {
                candidates--;
                @NonNull Node<K, V> evict = candidate;
                candidate = candidate.getPreviousInAccessOrder();
                evictEntry(evict, RemovalCause.COLLECTED, 0L);
                continue;
              }

              //放不下的節(jié)點(diǎn)直接處理掉
              if (candidate.getPolicyWeight() > maximum()) {
                candidates--;
                Node<K, V> evict = candidate;
                candidate = candidate.getPreviousInAccessOrder();
                evictEntry(evict, RemovalCause.SIZE, 0L);
                continue;
              }

              //根據(jù)節(jié)點(diǎn)的統(tǒng)計(jì)頻率frequency來(lái)做比較,看看要處理掉victim還是candidate
              //admit是具體的比較規(guī)則,看下面
              candidates--;
              //如果candidate勝出則淘汰victim
              if (admit(candidateKey, victimKey)) {
                Node<K, V> evict = victim;
                victim = victim.getNextInAccessOrder();
                evictEntry(evict, RemovalCause.SIZE, 0L);
                candidate = candidate.getPreviousInAccessOrder();
              } else {
                //如果是victim勝出,則淘汰candidate
                Node<K, V> evict = candidate;
                candidate = candidate.getPreviousInAccessOrder();
                evictEntry(evict, RemovalCause.SIZE, 0L);
              }
            }
          }

          /**
           * Determines if the candidate should be accepted into the main space, as determined by its
           * frequency relative to the victim. A small amount of randomness is used to protect against hash
           * collision attacks, where the victim's frequency is artificially raised so that no new entries
           * are admitted.
           *
           * @param candidateKey the key for the entry being proposed for long term retention
           * @param victimKey the key for the entry chosen by the eviction policy for replacement
           * @return if the candidate should be admitted and the victim ejected
           */

          @GuardedBy("evictionLock")
          boolean admit(K candidateKey, K victimKey) {
            //分別獲取victim和candidate的統(tǒng)計(jì)頻率
            //frequency這個(gè)方法的原理和實(shí)現(xiàn)上面已經(jīng)解釋了
            int victimFreq = frequencySketch().frequency(victimKey);
            int candidateFreq = frequencySketch().frequency(candidateKey);
            //誰(shuí)大誰(shuí)贏
            if (candidateFreq > victimFreq) {
              return true;

              //如果相等,candidate小于5都當(dāng)輸了
            } else if (candidateFreq <= 5) {
              // The maximum frequency is 15 and halved to 7 after a reset to age the history. An attack
              // exploits that a hot candidate is rejected in favor of a hot victim. The threshold of a warm
              // candidate reduces the number of random acceptances to minimize the impact on the hit rate.
              return false;
            }
            //如果相等且candidate大于5,則隨機(jī)淘汰一個(gè)
            int random = ThreadLocalRandom.current().nextInt();
            return ((random & 127) == 0);
          }

          climb方法主要是用來(lái)調(diào)整 window size 的,使得 Caffeine 可以適應(yīng)你的應(yīng)用類(lèi)型(如 OLAP 或 OLTP)表現(xiàn)出最佳的命中率。

          下圖是官方測(cè)試的數(shù)據(jù):

          我們看看 window size 的調(diào)整是怎么實(shí)現(xiàn)的。

          調(diào)整時(shí)用到的默認(rèn)比例數(shù)據(jù):

          //與上次命中率之差的閾值
          static final double HILL_CLIMBER_RESTART_THRESHOLD = 0.05d;
          //步長(zhǎng)(調(diào)整)的大小(跟最大值maximum的比例)
          static final double HILL_CLIMBER_STEP_PERCENT = 0.0625d;
          //步長(zhǎng)的衰減比例
          static final double HILL_CLIMBER_STEP_DECAY_RATE = 0.98d;
            /** Adapts the eviction policy to towards the optimal recency / frequency configuration. */
          //climb方法的主要作用就是動(dòng)態(tài)調(diào)整window區(qū)的大小(相應(yīng)的,main區(qū)的大小也會(huì)發(fā)生變化,兩個(gè)之和為100%)。
          //因?yàn)閰^(qū)域的大小發(fā)生了變化,那么區(qū)域內(nèi)的數(shù)據(jù)也可能需要發(fā)生相應(yīng)的移動(dòng)。
          @GuardedBy("evictionLock")
          void climb() {
            if (!evicts()) {
              return;
            }
            //確定window需要調(diào)整的大小
            determineAdjustment();
            //如果protected區(qū)有溢出,把溢出部分移動(dòng)到probation區(qū)。因?yàn)橄旅娴牟僮饔锌赡苄枰{(diào)整到protected區(qū)。
            demoteFromMainProtected();
            long amount = adjustment();
            if (amount == 0) {
              return;
            } else if (amount > 0) {
              //增加window的大小
              increaseWindow();
            } else {
              //減少window的大小
              decreaseWindow();
            }
          }

          下面分別展開(kāi)每個(gè)方法來(lái)解釋?zhuān)?/p>

          /** Calculates the amount to adapt the window by and sets {@link #adjustment()} accordingly. */
          @GuardedBy("evictionLock")
          void determineAdjustment() {
            //如果frequencySketch還沒(méi)初始化,則返回
            if (frequencySketch().isNotInitialized()) {
              setPreviousSampleHitRate(0.0);
              setMissesInSample(0);
              setHitsInSample(0);
              return;
            }
            //總請(qǐng)求量 = 命中 + miss
            int requestCount = hitsInSample() + missesInSample();
            //沒(méi)達(dá)到sampleSize則返回
            //默認(rèn)下sampleSize = 10 * maximum。用sampleSize來(lái)判斷緩存是否足夠”熱“。
            if (requestCount < frequencySketch().sampleSize) {
              return;
            }

            //命中率的公式 = 命中 / 總請(qǐng)求
            double hitRate = (double) hitsInSample() / requestCount;
            //命中率的差值
            double hitRateChange = hitRate - previousSampleHitRate();
            //本次調(diào)整的大小,是由命中率的差值和上次的stepSize決定的
            double amount = (hitRateChange >= 0) ? stepSize() : -stepSize();
            //下次的調(diào)整大小:如果命中率的之差大于0.05,則重置為0.065 * maximum,否則按照0.98來(lái)進(jìn)行衰減
            double nextStepSize = (Math.abs(hitRateChange) >= HILL_CLIMBER_RESTART_THRESHOLD)
                ? HILL_CLIMBER_STEP_PERCENT * maximum() * (amount >= 0 ? 1 : -1)
                : HILL_CLIMBER_STEP_DECAY_RATE * amount;
            setPreviousSampleHitRate(hitRate);
            setAdjustment((long) amount);
            setStepSize(nextStepSize);
            setMissesInSample(0);
            setHitsInSample(0);
          }

          /** Transfers the nodes from the protected to the probation region if it exceeds the maximum. */

          //這個(gè)方法比較簡(jiǎn)單,減少protected區(qū)溢出的部分
          @GuardedBy("evictionLock")
          void demoteFromMainProtected() {
            long mainProtectedMaximum = mainProtectedMaximum();
            long mainProtectedWeightedSize = mainProtectedWeightedSize();
            if (mainProtectedWeightedSize <= mainProtectedMaximum) {
              return;
            }

            for (int i = 0; i < QUEUE_TRANSFER_THRESHOLD; i++) {
              if (mainProtectedWeightedSize <= mainProtectedMaximum) {
                break;
              }

              Node<K, V> demoted = accessOrderProtectedDeque().poll();
              if (demoted == null) {
                break;
              }
              demoted.makeMainProbation();
              accessOrderProbationDeque().add(demoted);
              mainProtectedWeightedSize -= demoted.getPolicyWeight();
            }
            setMainProtectedWeightedSize(mainProtectedWeightedSize);
          }

          /**
           * Increases the size of the admission window by shrinking the portion allocated to the main
           * space. As the main space is partitioned into probation and protected regions (80% / 20%), for
           * simplicity only the protected is reduced. If the regions exceed their maximums, this may cause
           * protected items to be demoted to the probation region and probation items to be demoted to the
           * admission window.
           */


          //增加window區(qū)的大小,這個(gè)方法比較簡(jiǎn)單,思路就像我上面說(shuō)的
          @GuardedBy("evictionLock")
          void increaseWindow() {
            if (mainProtectedMaximum() == 0) {
              return;
            }

            long quota = Math.min(adjustment(), mainProtectedMaximum());
            setMainProtectedMaximum(mainProtectedMaximum() - quota);
            setWindowMaximum(windowMaximum() + quota);
            demoteFromMainProtected();

            for (int i = 0; i < QUEUE_TRANSFER_THRESHOLD; i++) {
              Node<K, V> candidate = accessOrderProbationDeque().peek();
              boolean probation = true;
              if ((candidate == null) || (quota < candidate.getPolicyWeight())) {
                candidate = accessOrderProtectedDeque().peek();
                probation = false;
              }
              if (candidate == null) {
                break;
              }

              int weight = candidate.getPolicyWeight();
              if (quota < weight) {
                break;
              }

              quota -= weight;
              if (probation) {
                accessOrderProbationDeque().remove(candidate);
              } else {
                setMainProtectedWeightedSize(mainProtectedWeightedSize() - weight);
                accessOrderProtectedDeque().remove(candidate);
              }
              setWindowWeightedSize(windowWeightedSize() + weight);
              accessOrderWindowDeque().add(candidate);
              candidate.makeWindow();
            }

            setMainProtectedMaximum(mainProtectedMaximum() + quota);
            setWindowMaximum(windowMaximum() - quota);
            setAdjustment(quota);
          }

          /** Decreases the size of the admission window and increases the main's protected region. */
          //同上increaseWindow差不多,反操作
          @GuardedBy("evictionLock")
          void decreaseWindow() {
            if (windowMaximum() <= 1) {
              return;
            }

            long quota = Math.min(-adjustment(), Math.max(0, windowMaximum() - 1));
            setMainProtectedMaximum(mainProtectedMaximum() + quota);
            setWindowMaximum(windowMaximum() - quota);

            for (int i = 0; i < QUEUE_TRANSFER_THRESHOLD; i++) {
              Node<K, V> candidate = accessOrderWindowDeque().peek();
              if (candidate == null) {
                break;
              }

              int weight = candidate.getPolicyWeight();
              if (quota < weight) {
                break;
              }

              quota -= weight;
              setMainProtectedWeightedSize(mainProtectedWeightedSize() + weight);
              setWindowWeightedSize(windowWeightedSize() - weight);
              accessOrderWindowDeque().remove(candidate);
              accessOrderProbationDeque().add(candidate);
              candidate.makeMainProbation();
            }

            setMainProtectedMaximum(mainProtectedMaximum() - quota);
            setWindowMaximum(windowMaximum() + quota);
            setAdjustment(-quota);
          }

          以上,是 Caffeine 的 W-TinyLFU 策略的設(shè)計(jì)原理及代碼實(shí)現(xiàn)解析。

          異步的高性能讀寫(xiě)

          一般的緩存每次對(duì)數(shù)據(jù)處理完之后(讀的話,已經(jīng)存在則直接返回,不存在則 load 數(shù)據(jù),保存,再返回;寫(xiě)的話,則直接插入或更新),但是因?yàn)橐S護(hù)一些淘汰策略,則需要一些額外的操作,諸如:

          • 計(jì)算和比較數(shù)據(jù)的是否過(guò)期
          • 統(tǒng)計(jì)頻率(像 LFU 或其變種)
          • 維護(hù) read queue 和 write queue
          • 淘汰符合條件的數(shù)據(jù)
          • 等等。。。

          這種數(shù)據(jù)的讀寫(xiě)伴隨著緩存狀態(tài)的變更,Guava Cache 的做法是把這些操作和讀寫(xiě)操作放在一起,在一個(gè)同步加鎖的操作中完成,雖然 Guava Cache 巧妙地利用了 JDK 的 ConcurrentHashMap(分段鎖或者無(wú)鎖 CAS)來(lái)降低鎖的密度,達(dá)到提高并發(fā)度的目的。但是,對(duì)于一些熱點(diǎn)數(shù)據(jù),這種做法還是避免不了頻繁的鎖競(jìng)爭(zhēng)。Caffeine 借鑒了數(shù)據(jù)庫(kù)系統(tǒng)的 WAL(Write-Ahead Logging)思想,即先寫(xiě)日志再執(zhí)行操作,這種思想同樣適合緩存的,執(zhí)行讀寫(xiě)操作時(shí),先把操作記錄在緩沖區(qū),然后在合適的時(shí)機(jī)異步、批量地執(zhí)行緩沖區(qū)中的內(nèi)容。但在執(zhí)行緩沖區(qū)的內(nèi)容時(shí),也是需要在緩沖區(qū)加上同步鎖的,不然存在并發(fā)問(wèn)題,只不過(guò)這樣就可以把對(duì)鎖的競(jìng)爭(zhēng)從緩存數(shù)據(jù)轉(zhuǎn)移到對(duì)緩沖區(qū)上。

          ReadBuffer

          在 Caffeine 的內(nèi)部實(shí)現(xiàn)中,為了很好的支持不同的 Features(如 Eviction,Removal,Refresh,Statistics,Cleanup,Policy 等等),擴(kuò)展了很多子類(lèi),它們共同的父類(lèi)是BoundedLocalCache,而readBuffer就是作為它們共有的屬性,即都是用一樣的 readBuffer,看定義:

          final Buffer<Node<K, V>> readBuffer;

          readBuffer = evicts() || collectKeys() || collectValues() || expiresAfterAccess()
                  ? new BoundedBuffer<>()
                  : Buffer.disabled();

          上面提到 Caffeine 對(duì)每次緩存的讀操作都會(huì)觸發(fā)afterRead

          /**
           * Performs the post-processing work required after a read.
           *
           * @param node the entry in the page replacement policy
           * @param now the current time, in nanoseconds
           * @param recordHit if the hit count should be incremented
           */

          void afterRead(Node<K, V> node, long now, boolean recordHit) {
            if (recordHit) {
              statsCounter().recordHits(1);
            }
            //把記錄加入到readBuffer
            //判斷是否需要立即處理readBuffer
            //注意這里無(wú)論offer是否成功都可以走下去的,即允許寫(xiě)入readBuffer丟失,因?yàn)檫@個(gè)
            boolean delayable = skipReadBuffer() || (readBuffer.offer(node) != Buffer.FULL);
            if (shouldDrainBuffers(delayable)) {
              scheduleDrainBuffers();
            }
            refreshIfNeeded(node, now);
          }

           /**
             * Returns whether maintenance work is needed.
             *
             * @param delayable if draining the read buffer can be delayed
             */


            //caffeine用了一組狀態(tài)來(lái)定義和管理“維護(hù)”的過(guò)程
            boolean shouldDrainBuffers(boolean delayable) {
              switch (drainStatus()) {
                case IDLE:
                  return !delayable;
                case REQUIRED:
                  return true;
                case PROCESSING_TO_IDLE:
                case PROCESSING_TO_REQUIRED:
                  return false;
                default:
                  throw new IllegalStateException();
              }
            }

          重點(diǎn)看BoundedBuffer

          /**
           * A striped, non-blocking, bounded buffer.
           *
           * @author [email protected] (Ben Manes)
           * @param <E> the type of elements maintained by this buffer
           */

          final class BoundedBuffer<Eextends StripedBuffer<E>

          它是一個(gè) striped、非阻塞、有界限的 buffer,繼承于StripedBuffer類(lèi)。下面看看StripedBuffer的實(shí)現(xiàn):

          /**
           * A base class providing the mechanics for supporting dynamic striping of bounded buffers. This
           * implementation is an adaption of the numeric 64-bit {@link java.util.concurrent.atomic.Striped64}
           * class, which is used by atomic counters. The approach was modified to lazily grow an array of
           * buffers in order to minimize memory usage for caches that are not heavily contended on.
           *
           * @author [email protected] (Doug Lea)
           * @author [email protected] (Ben Manes)
           */


          abstract class StripedBuffer<Eimplements Buffer<E>

          這個(gè)StripedBuffer設(shè)計(jì)的思想是跟Striped64類(lèi)似的,通過(guò)擴(kuò)展結(jié)構(gòu)把競(jìng)爭(zhēng)熱點(diǎn)分離。

          具體實(shí)現(xiàn)是這樣的,StripedBuffer維護(hù)一個(gè)Buffer[]數(shù)組,每個(gè)元素就是一個(gè)RingBuffer,每個(gè)線程用自己threadLocalRandomProbe屬性作為 hash 值,這樣就相當(dāng)于每個(gè)線程都有自己“專(zhuān)屬”的RingBuffer,就不會(huì)產(chǎn)生競(jìng)爭(zhēng)啦,而不是用 key 的hashCode作為 hash 值,因?yàn)闀?huì)產(chǎn)生熱點(diǎn)數(shù)據(jù)問(wèn)題。

          看看StripedBuffer的屬性

          /** Table of buffers. When non-null, size is a power of 2. */
          //RingBuffer數(shù)組
          transient volatile Buffer<E> @Nullable[] table;

          //當(dāng)進(jìn)行resize時(shí),需要整個(gè)table鎖住。tableBusy作為CAS的標(biāo)記。
          static final long TABLE_BUSY = UnsafeAccess.objectFieldOffset(StripedBuffer.class, "tableBusy");
          static final long PROBE = UnsafeAccess.objectFieldOffset(Thread.class, "threadLocalRandomProbe");

          /** Number of CPUS. */
          static final int NCPU = Runtime.getRuntime().availableProcessors();

          /** The bound on the table size. */
          //table最大size
          static final int MAXIMUM_TABLE_SIZE = 4 * ceilingNextPowerOfTwo(NCPU);

          /** The maximum number of attempts when trying to expand the table. */
          //如果發(fā)生競(jìng)爭(zhēng)時(shí)(CAS失敗)的嘗試次數(shù)
          static final int ATTEMPTS = 3;

          /** Table of buffers. When non-null, size is a power of 2. */
          //核心數(shù)據(jù)結(jié)構(gòu)
          transient volatile Buffer<E> @Nullable[] table;

          /** Spinlock (locked via CAS) used when resizing and/or creating Buffers. */
          transient volatile int tableBusy;

          /** CASes the tableBusy field from 0 to 1 to acquire lock. */
          final boolean casTableBusy() {
            return UnsafeAccess.UNSAFE.compareAndSwapInt(this, TABLE_BUSY, 01);
          }

          /**
           * Returns the probe value for the current thread. Duplicated from ThreadLocalRandom because of
           * packaging restrictions.
           */

          static final int getProbe() {
            return UnsafeAccess.UNSAFE.getInt(Thread.currentThread(), PROBE);
          }

          offer方法,當(dāng)沒(méi)初始化或存在競(jìng)爭(zhēng)時(shí),則擴(kuò)容為 2 倍。

          實(shí)際是調(diào)用RingBuffer的 offer 方法,把數(shù)據(jù)追加到RingBuffer后面。

          @Override
          public int offer(E e) {
            int mask;
            int result = 0;
            Buffer<E> buffer;
            //是否不存在競(jìng)爭(zhēng)
            boolean uncontended = true;
            Buffer<E>[] buffers = table
            //是否已經(jīng)初始化
            if ((buffers == null)
                || (mask = buffers.length - 1) < 0
                //用thread的隨機(jī)值作為hash值,得到對(duì)應(yīng)位置的RingBuffer
                || (buffer = buffers[getProbe() & mask]) == null
                //檢查追加到RingBuffer是否成功
                || !(uncontended = ((result = buffer.offer(e)) != Buffer.FAILED))) {
              //其中一個(gè)符合條件則進(jìn)行擴(kuò)容
              expandOrRetry(e, uncontended);
            }
            return result;
          }

          /**
           * Handles cases of updates involving initialization, resizing, creating new Buffers, and/or
           * contention. See above for explanation. This method suffers the usual non-modularity problems of
           * optimistic retry code, relying on rechecked sets of reads.
           *
           * @param e the element to add
           * @param wasUncontended false if CAS failed before call
           */


          //這個(gè)方法比較長(zhǎng),但思路還是相對(duì)清晰的。
          @SuppressWarnings("PMD.ConfusingTernary")
          final void expandOrRetry(E e, boolean wasUncontended) {
            int h;
            if ((h = getProbe()) == 0) {
              ThreadLocalRandom.current(); // force initialization
              h = getProbe();
              wasUncontended = true;
            }
            boolean collide = false// True if last slot nonempty
            for (int attempt = 0; attempt < ATTEMPTS; attempt++) {
              Buffer<E>[] buffers;
              Buffer<E> buffer;
              int n;
              if (((buffers = table) != null) && ((n = buffers.length) > 0)) {
                if ((buffer = buffers[(n - 1) & h]) == null) {
                  if ((tableBusy == 0) && casTableBusy()) { // Try to attach new Buffer
                    boolean created = false;
                    try { // Recheck under lock
                      Buffer<E>[] rs;
                      int mask, j;
                      if (((rs = table) != null) && ((mask = rs.length) > 0)
                          && (rs[j = (mask - 1) & h] == null)) {
                        rs[j] = create(e);
                        created = true;
                      }
                    } finally {
                      tableBusy = 0;
                    }
                    if (created) {
                      break;
                    }
                    continue// Slot is now non-empty
                  }
                  collide = false;
                } else if (!wasUncontended) { // CAS already known to fail
                  wasUncontended = true;      // Continue after rehash
                } else if (buffer.offer(e) != Buffer.FAILED) {
                  break;
                } else if (n >= MAXIMUM_TABLE_SIZE || table != buffers) {
                  collide = false// At max size or stale
                } else if (!collide) {
                  collide = true;
                } else if (tableBusy == 0 && casTableBusy()) {
                  try {
                    if (table == buffers) { // Expand table unless stale
                      table = Arrays.copyOf(buffers, n << 1);
                    }
                  } finally {
                    tableBusy = 0;
                  }
                  collide = false;
                  continue// Retry with expanded table
                }
                h = advanceProbe(h);
              } else if ((tableBusy == 0) && (table == buffers) && casTableBusy()) {
                boolean init = false;
                try { // Initialize table
                  if (table == buffers) {
                    @SuppressWarnings({"unchecked""rawtypes"})
                    Buffer<E>[] rs = new Buffer[1];
                    rs[0] = create(e);
                    table = rs;
                    init = true;
                  }
                } finally {
                  tableBusy = 0;
                }
                if (init) {
                  break;
                }
              }
            }
          }

          最后看看RingBuffer,注意RingBufferBoundedBuffer的內(nèi)部類(lèi)。

          /** The maximum number of elements per buffer. */
          static final int BUFFER_SIZE = 16;

          // Assume 4-byte references and 64-byte cache line (16 elements per line)
          //256長(zhǎng)度,但是是以16為單位,所以最多存放16個(gè)元素
          static final int SPACED_SIZE = BUFFER_SIZE << 4;
          static final int SPACED_MASK = SPACED_SIZE - 1;
          static final int OFFSET = 16;
          //RingBuffer數(shù)組
          final AtomicReferenceArray<E> buffer;

           //插入方法
           @Override
           public int offer(E e) {
             long head = readCounter;
             long tail = relaxedWriteCounter();
             //用head和tail來(lái)限制個(gè)數(shù)
             long size = (tail - head);
             if (size >= SPACED_SIZE) {
               return Buffer.FULL;
             }
             //tail追加16
             if (casWriteCounter(tail, tail + OFFSET)) {
               //用tail“取余”得到下標(biāo)
               int index = (int) (tail & SPACED_MASK);
               //用unsafe.putOrderedObject設(shè)值
               buffer.lazySet(index, e);
               return Buffer.SUCCESS;
             }
             //如果CAS失敗則返回失敗
             return Buffer.FAILED;
           }

           //用consumer來(lái)處理buffer的數(shù)據(jù)
           @Override
           public void drainTo(Consumer<E> consumer) {
             long head = readCounter;
             long tail = relaxedWriteCounter();
             //判斷數(shù)據(jù)多少
             long size = (tail - head);
             if (size == 0) {
               return;
             }
             do {
               int index = (int) (head & SPACED_MASK);
               E e = buffer.get(index);
               if (e == null) {
                 // not published yet
                 break;
               }
               buffer.lazySet(index, null);
               consumer.accept(e);
               //head也跟tail一樣,每次遞增16
               head += OFFSET;
             } while (head != tail);
             lazySetReadCounter(head);
           }

          注意,ring buffer 的 size(固定是 16 個(gè))是不變的,變的是 head 和 tail 而已。

          總的來(lái)說(shuō)ReadBuffer有如下特點(diǎn):

          • 使用 Striped-RingBuffer來(lái)提升對(duì) buffer 的讀寫(xiě)
          • 用 thread 的 hash 來(lái)避開(kāi)熱點(diǎn) key 的競(jìng)爭(zhēng)
          • 允許寫(xiě)入的丟失

          WriteBuffer

          writeBufferreadBuffer不一樣,主要體現(xiàn)在使用場(chǎng)景的不一樣。本來(lái)緩存的一般場(chǎng)景是讀多寫(xiě)少的,讀的并發(fā)會(huì)更高,且 afterRead 顯得沒(méi)那么重要,允許延遲甚至丟失。寫(xiě)不一樣,寫(xiě)afterWrite不允許丟失,且要求盡量馬上執(zhí)行。Caffeine 使用MPSC(Multiple Producer / Single Consumer)作為 buffer 數(shù)組,實(shí)現(xiàn)在MpscGrowableArrayQueue類(lèi),它是仿照JCToolsMpscGrowableArrayQueue來(lái)寫(xiě)的。

          MPSC 允許無(wú)鎖的高并發(fā)寫(xiě)入,但只允許一個(gè)消費(fèi)者,同時(shí)也犧牲了部分操作。

          MPSC 我打算另外分析,這里不展開(kāi)了。

          TimerWheel

          除了支持expireAfterAccessexpireAfterWrite之外(Guava Cache 也支持這兩個(gè)特性),Caffeine 還支持expireAfter。因?yàn)?code style="font-size: 14px;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;background-color: rgba(27, 31, 35, 0.05);font-family: 'Operator Mono', Consolas, Monaco, Menlo, monospace;word-break: break-all;color: rgb(150, 84, 181);">expireAfterAccess和expireAfterWrite都只能是固定的過(guò)期時(shí)間,這可能滿(mǎn)足不了某些場(chǎng)景,譬如記錄的過(guò)期時(shí)間是需要根據(jù)某些條件而不一樣的,這就需要用戶(hù)自定義過(guò)期時(shí)間。

          先看看expireAfter的用法

          private static LoadingCache<String, String> cache = Caffeine.newBuilder()
                  .maximumSize(256L)
                  .initialCapacity(1)
                  //.expireAfterAccess(2, TimeUnit.DAYS)
                  //.expireAfterWrite(2, TimeUnit.HOURS)
                  .refreshAfterWrite(1, TimeUnit.HOURS)
                  //自定義過(guò)期時(shí)間
                  .expireAfter(new Expiry<String, String>() {
                      //返回創(chuàng)建后的過(guò)期時(shí)間
                      @Override
                      public long expireAfterCreate(@NonNull String key, @NonNull String value, long currentTime) {
                          return 0;
                      }

                      //返回更新后的過(guò)期時(shí)間
                      @Override
                      public long expireAfterUpdate(@NonNull String key, @NonNull String value, long currentTime, @NonNegative long currentDuration) {
                          return 0;
                      }

                      //返回讀取后的過(guò)期時(shí)間
                      @Override
                      public long expireAfterRead(@NonNull String key, @NonNull String value, long currentTime, @NonNegative long currentDuration) {
                          return 0;
                      }
                  })
                  .recordStats()
                  .build(new CacheLoader<String, String>() {
                      @Nullable
                      @Override
                      public String load(@NonNull String key) throws Exception {
                          return "value_" + key;
                      }
                  });

          通過(guò)自定義過(guò)期時(shí)間,使得不同的 key 可以動(dòng)態(tài)的得到不同的過(guò)期時(shí)間。

          注意,我把expireAfterAccessexpireAfterWrite注釋了,因?yàn)檫@兩個(gè)特性不能跟expireAfter一起使用。

          而當(dāng)使用了expireAfter特性后,Caffeine 會(huì)啟用一種叫“時(shí)間輪”的算法來(lái)實(shí)現(xiàn)這個(gè)功能。更多關(guān)于時(shí)間輪的介紹,可以看我的文章HashedWheelTimer 時(shí)間輪原理分析[6]

          好,重點(diǎn)來(lái)了,為什么要用時(shí)間輪?

          對(duì)expireAfterAccessexpireAfterWrite的實(shí)現(xiàn)是用一個(gè)AccessOrderDeque雙端隊(duì)列,它是 FIFO 的,因?yàn)樗鼈兊倪^(guò)期時(shí)間是固定的,所以在隊(duì)列頭的數(shù)據(jù)肯定是最早過(guò)期的,要處理過(guò)期數(shù)據(jù)時(shí),只需要首先看看頭部是否過(guò)期,然后再挨個(gè)檢查就可以了。但是,如果過(guò)期時(shí)間不一樣的話,這需要對(duì)accessOrderQueue進(jìn)行排序&插入,這個(gè)代價(jià)太大了。于是,Caffeine 用了一種更加高效、優(yōu)雅的算法-時(shí)間輪。

          時(shí)間輪的結(jié)構(gòu):

          因?yàn)樵谖业膶?duì)時(shí)間輪分析的文章里已經(jīng)說(shuō)了時(shí)間輪的原理和機(jī)制了,所以我就不展開(kāi) Caffeine 對(duì)時(shí)間輪的實(shí)現(xiàn)了。

          Caffeine 對(duì)時(shí)間輪的實(shí)現(xiàn)在TimerWheel,它是一種多層時(shí)間輪(hierarchical timing wheels )。

          看看元素加入到時(shí)間輪的schedule方法:

          /**
           * Schedules a timer event for the node.
           *
           * @param node the entry in the cache
           */

          public void schedule(@NonNull Node<K, V> node) {
            Node<K, V> sentinel = findBucket(node.getVariableTime());
            link(sentinel, node);
          }

          /**
           * Determines the bucket that the timer event should be added to.
           *
           * @param time the time when the event fires
           * @return the sentinel at the head of the bucket
           */

          Node<K, V> findBucket(long time) {
            long duration = time - nanos;
            int length = wheel.length - 1;
            for (int i = 0; i < length; i++) {
              if (duration < SPANS[i + 1]) {
                long ticks = (time >>> SHIFT[i]);
                int index = (int) (ticks & (wheel[i].length - 1));
                return wheel[i][index];
              }
            }
            return wheel[length][0];
          }

          /** Adds the entry at the tail of the bucket's list. */
          void link(Node<K, V> sentinel, Node<K, V> node) {
            node.setPreviousInVariableOrder(sentinel.getPreviousInVariableOrder());
            node.setNextInVariableOrder(sentinel);

            sentinel.getPreviousInVariableOrder().setNextInVariableOrder(node);
            sentinel.setPreviousInVariableOrder(node);
          }

          其他

          Caffeine 還有其他的優(yōu)化性能的手段,如使用軟引用和弱引用、消除偽共享、CompletableFuture異步等等。

          總結(jié)

          Caffeien 是一個(gè)優(yōu)秀的本地緩存,通過(guò)使用 W-TinyLFU 算法, 高性能的 readBuffer 和 WriteBuffer,時(shí)間輪算法等,使得它擁有高性能,高命中率(near optimal),低內(nèi)存占用等特點(diǎn)。

          參考資料

          TinyLFU 論文[7]

          Design Of A Modern Cache[8]

          Design Of A Modern Cache—Part Deux[9]

          Caffeine 的 github[10]

          參考資料

          [1]

          Caffeine: https://github.com/ben-manes/caffeine

          [2]

          這里: https://albenw.github.io/posts/df42dc84/

          [3]

          Benchmarks: https://github.com/ben-manes/caffeine/wiki/Benchmarks

          [4]

          官方API說(shuō)明文檔: https://github.com/ben-manes/caffeine/wiki

          [5]

          這里: https://github.com/ben-manes/caffeine/wiki/Guava

          [6]

          HashedWheelTimer時(shí)間輪原理分析: https://albenw.github.io/posts/ec8df8c/

          [7]

          TinyLFU論文: https://arxiv.org/abs/1512.00727

          [8]

          Design Of A Modern Cache: http://highscalability.com/blog/2016/1/25/design-of-a-modern-cache.html

          [9]

          Design Of A Modern Cache—Part Deux: http://highscalability.com/blog/2019/2/25/design-of-a-modern-cachepart-deux.html

          [10]

          Caffeine的github: https://github.com/ben-manes/caffeine

          來(lái)源albenw.github.io/posts/a4ae1aa2

          覺(jué)得不錯(cuò),請(qǐng)點(diǎn)個(gè)在看

          瀏覽 56
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日逼黄色视频 | 大香蕉欧美视频 | 国产一级a毛一级做a爱 | 豆花视频网站在线观看 | av在线官网 |