<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          ConcurrentHashMap 是如何保證線程安全的,你知道么?

          共 4992字,需瀏覽 10分鐘

           ·

          2022-05-10 00:24

          點擊下方“IT牧場”,選擇“設為星標”

          blog.csdn.net/qq_41737716/article/details/90549847

          • 01、前言
          • 02、相關概念
            • 03、Amdahl定律
          • 04、初始化數據結構時的線程安全
            • 05、總結
          • 06、put操作的線程安全
            • 07、總結
          • 08、擴容操作的線程安全
            • 09、擴容時的get操作
            • 10、多線程協助擴容
            • 11、在什么情況下會進行擴容操作?
            • 12、總結
          • 13、統計容器大小的線程安全
            • 14、假設當前線程為第一個put的線程
            • 15、出現了線程競爭導致CAS失敗
            • 16、計數桶擴容
            • 17、總結
          • 18、get操作的線程安全
          • 19、JDK1.7與1.8的不同實現
          • 20、總結

          01、前言

          閱讀此篇文章,你需要有以下知識基礎

          • Java內存模型,可見性問題
          • CAS
          • HashMap底層原理

          我們知道,在日常開發(fā)中使用的HashMap是線程不安全的,而線程安全類HashTable只是簡單的在方法上加鎖實現線程安全,效率低下,所以在線程安全的環(huán)境下我們通常會使用ConcurrentHashMap,但是又為何需要學習ConcurrentHashMap?用不就完事了?我認為學習其源碼有兩個好處:

          1. 更靈活的運用ConcurrentHashMap
          2. 欣賞并發(fā)編程大師Doug Lea的作品,源碼中有很多值得我們學習的并發(fā)思想,要意識到,線程安全不僅僅只是加鎖

          我拋出以下問題:

          • ConcurrentHashMap是怎么做到線程安全的?
            • get方法如何線程安全地獲取key、value?
            • put方法如何線程安全地設置key、value?
            • size方法如果線程安全地獲取容器容量?
            • 底層數據結構擴容時如果保證線程安全?
            • 初始化數據結構時如果保證線程安全?
          • ConcurrentHashMap并發(fā)效率是如何提高的?
            • 和加鎖相比較,為什么它比HashTable效率高?

          接下來,帶著問題來繼續(xù)看下去,欣賞并發(fā)大師精妙絕倫的并發(fā)藝術作品(以下討論基于JDK1.8)

          02、相關概念

          03、Amdahl定律

          此節(jié)定律描述均來自《Java并發(fā)編程實戰(zhàn)》一書

          假設F是必須被串行執(zhí)行的部分,N代表處理器數量,Speedup代表加速比,可以簡單理解為CPU使用率此公式告訴我們,當N趨近無限大,加速比最大趨近于1/F,假設我們的程序有50%的部分需要串行執(zhí)行,就算處理器數量無限多,最高的加速比只能是2(20%的使用率),如果程序中僅有10%的部分需要串行執(zhí)行,最高的加速比可以達到9.2(92%的使用率),但我們的程序或多或少都一定會有串行執(zhí)行的部分,所以F不可能為0,所以,就算有無限多的CPU,加速比也不可能達到10(100%的使用率),下面給一張圖來表示串行執(zhí)行部分占比不同對利用率的影響:由此我們可以看出,程序中的可伸縮性(提升外部資源即可提升并發(fā)性能的比率)是由程序中串行執(zhí)行部分所影響的,而常見的串行執(zhí)行有鎖競爭(上下文切換消耗、等待、串行)等等,這給了我們一個啟發(fā),可以通過減少鎖競爭來優(yōu)化并發(fā)性能,而ConcurrentHashMap則使用了鎖分段(減小鎖范圍)、CAS(樂觀鎖,減小上下文切換開銷,無阻塞)等等技術,下面來具體看看吧

          04、初始化數據結構時的線程安全

          HashMap的底層數據結構這里簡單帶過一下,不做過多贅述:大致是以一個Node對象數組來存放數據,Hash沖突時會形成Node鏈表,在鏈表長度超過8,Node數組超過64時會將鏈表結構轉換為紅黑樹,Node對象:

          static?class?Node<K,V>?implements?Map.Entry<K,V>?{
          ??final?int?hash;
          ??final?K?key;
          ??volatile?V?val;
          ??volatile?Node?next;
          ??...
          }

          值得注意的是,value和next指針使用了volatile來保證其可見性

          在JDK1.8中,初始化ConcurrentHashMap的時候這個Node[]數組是還未初始化的,會等到第一次put方法調用時才初始化:

          final?V?putVal(K?key,?V?value,?boolean?onlyIfAbsent)?{
          ????????if?(key?==?null?||?value?==?null)?throw?new?NullPointerException();
          ????????int?hash?=?spread(key.hashCode());
          ????????int?binCount?=?0;
          ????????for?(Node[]?tab?=?table;;)?{
          ????????????Node?f;?int?n,?i,?fh;
          ????????????//判斷Node數組為空
          ????????????if?(tab?==?null?||?(n?=?tab.length)?==?0)
          ????????????????//初始化Node數組
          ????????????????tab?=?initTable();
          ??????????...
          }

          此時是會有并發(fā)問題的,如果多個線程同時調用initTable初始化Node數組怎么辦?看看大師是如何處理的:

          private?final?Node[]?initTable()?{
          ??Node[]?tab;?int?sc;
          ??//每次循環(huán)都獲取最新的Node數組引用
          ??while?((tab?=?table)?==?null?||?tab.length?==?0)?{
          ????//sizeCtl是一個標記位,若為-1也就是小于0,代表有線程在進行初始化工作了
          ????if?((sc?=?sizeCtl)?0)
          ??????//讓出CPU時間片
          ??????Thread.yield();?//?lost?initialization?race;?just?spin
          ????//CAS操作,將本實例的sizeCtl變量設置為-1
          ????else?if?(U.compareAndSwapInt(this,?SIZECTL,?sc,?-1))?{
          ??????//如果CAS操作成功了,代表本線程將負責初始化工作
          ??????try?{
          ????????//再檢查一遍數組是否為空
          ????????if?((tab?=?table)?==?null?||?tab.length?==?0)?{
          ??????????//在初始化Map時,sizeCtl代表數組大小,默認16
          ??????????//所以此時n默認為16
          ??????????int?n?=?(sc?>?0)???sc?:?DEFAULT_CAPACITY;
          ??????????@SuppressWarnings("unchecked")
          ??????????//Node數組
          ??????????Node[]?nt?=?(Node[])new?Node[n];
          ??????????//將其賦值給table變量
          ??????????table?=?tab?=?nt;
          ??????????//通過位運算,n減去n二進制右移2位,相當于乘以0.75
          ??????????//例如16經過運算為12,與乘0.75一樣,只不過位運算更快
          ??????????sc?=?n?-?(n?>>>?2);
          ????????}
          ??????}?finally?{
          ????????//將計算后的sc(12)直接賦值給sizeCtl,表示達到12長度就擴容
          ????????//由于這里只會有一個線程在執(zhí)行,直接賦值即可,沒有線程安全問題
          ????????//只需要保證可見性
          ????????sizeCtl?=?sc;
          ??????}
          ??????break;
          ????}
          ??}
          ??return?tab;
          }

          table變量使用了volatile來保證每次獲取到的都是最新寫入的值

          transient?volatile?Node[]?table;

          05、總結

          就算有多個線程同時進行put操作,在初始化數組時使用了樂觀鎖CAS操作來決定到底是哪個線程有資格進行初始化,其他線程均只能等待。

          用到的并發(fā)技巧:

          • volatile變量(sizeCtl):它是一個標記位,用來告訴其他線程這個坑位有沒有人在,其線程間的可見性由volatile保證。
          • CAS操作:CAS操作保證了設置sizeCtl標記位的原子性,保證了只有一個線程能設置成功

          06、put操作的線程安全

          直接看代碼:

          final?V?putVal(K?key,?V?value,?boolean?onlyIfAbsent)?{
          ??if?(key?==?null?||?value?==?null)?throw?new?NullPointerException();
          ??//對key的hashCode進行散列
          ??int?hash?=?spread(key.hashCode());
          ??int?binCount?=?0;
          ??//一個無限循環(huán),直到put操作完成后退出循環(huán)
          ??for?(Node[]?tab?=?table;;)?{
          ????Node?f;?int?n,?i,?fh;
          ????//當Node數組為空時進行初始化
          ????if?(tab?==?null?||?(n?=?tab.length)?==?0)
          ??????tab?=?initTable();
          ????//Unsafe類volatile的方式取出hashCode散列后通過與運算得出的Node數組下標值對應的Node對象
          ????//此時的Node對象若為空,則代表還未有線程對此Node進行插入操作
          ????else?if?((f?=?tabAt(tab,?i?=?(n?-?1)?&?hash))?==?null)?{
          ??????//直接CAS方式插入數據
          ??????if?(casTabAt(tab,?i,?null,
          ???????????????????new?Node(hash,?key,?value,?null)))
          ????????//插入成功,退出循環(huán)
          ????????break;???????????????????//?no?lock?when?adding?to?empty?bin
          ????}
          ????//查看是否在擴容,先不看,擴容再介紹
          ????else?if?((fh?=?f.hash)?==?MOVED)
          ??????//幫助擴容
          ??????tab?=?helpTransfer(tab,?f);
          ????else?{
          ??????V?oldVal?=?null;
          ??????//對Node對象進行加鎖
          ??????synchronized?(f)?{
          ????????//二次確認此Node對象還是原來的那一個
          ????????if?(tabAt(tab,?i)?==?f)?{
          ??????????if?(fh?>=?0)?{
          ????????????binCount?=?1;
          ????????????//無限循環(huán),直到完成put
          ????????????for?(Node?e?=?f;;?++binCount)?{
          ??????????????K?ek;
          ??????????????//和HashMap一樣,先比較hash,再比較equals
          ??????????????if?(e.hash?==?hash?&&
          ??????????????????((ek?=?e.key)?==?key?||
          ???????????????????(ek?!=?null?&&?key.equals(ek))))?{
          ????????????????oldVal?=?e.val;
          ????????????????if?(!onlyIfAbsent)
          ??????????????????e.val?=?value;
          ????????????????break;
          ??????????????}
          ??????????????Node?pred?=?e;
          ??????????????if?((e?=?e.next)?==?null)?{
          ????????????????//和鏈表頭Node節(jié)點不沖突,就將其初始化為新Node作為上一個Node節(jié)點的next
          ????????????????//形成鏈表結構
          ????????????????pred.next?=?new?Node(hash,?key,
          ??????????????????????????????????????????value,?null);
          ????????????????break;
          ??????????????}
          ????????????}
          ??????????}
          ??????????...
          }

          值得關注的是tabAt(tab, i)方法,其使用Unsafe類volatile的操作volatile式地查看值,保證每次獲取到的值都是最新的:

          static?final??Node?tabAt(Node[]?tab,?int?i)?{
          ??return?(Node)U.getObjectVolatile(tab,?((long)i?<}

          雖然上面的table變量加了volatile,但也只能保證其引用的可見性,并不能確保其數組中的對象是否是最新的,所以需要Unsafe類volatile式地拿到最新的Node

          07、總結

          由于其減小了鎖的粒度,若Hash完美不沖突的情況下,可同時支持n個線程同時put操作,n為Node數組大小,在默認大小16下,可以支持最大同時16個線程無競爭同時操作且線程安全。當hash沖突嚴重時,Node鏈表越來越長,將導致嚴重的鎖競爭,此時會進行擴容,將Node進行再散列,下面會介紹擴容的線程安全性??偨Y一下用到的并發(fā)技巧:

          1. 減小鎖粒度:將Node鏈表的頭節(jié)點作為鎖,若在默認大小16情況下,將有16把鎖,大大減小了鎖競爭(上下文切換),就像開頭所說,將串行的部分最大化縮小,在理想情況下線程的put操作都為并行操作。同時直接鎖住頭節(jié)點,保證了線程安全
          2. Unsafe的getObjectVolatile方法:此方法確保獲取到的值為最新

          08、擴容操作的線程安全

          在擴容時,ConcurrentHashMap支持多線程并發(fā)擴容,在擴容過程中同時支持get查數據,若有線程put數據,還會幫助一起擴容,這種無阻塞算法,將并行最大化的設計,堪稱一絕。

          先來看看擴容代碼實現:

          private?final?void?transfer(Node[]?tab,?Node[]?nextTab)?{
          ??int?n?=?tab.length,?stride;
          ??//根據機器CPU核心數來計算,一條線程負責Node數組中多長的遷移量
          ??if?((stride?=?(NCPU?>?1)???(n?>>>?3)?/?NCPU?:?n)?????//本線程分到的遷移量
          ????//假設為16(默認也為16)
          ????stride?=?MIN_TRANSFER_STRIDE;?//?subdivide?range
          ??//nextTab若為空代表線程是第一個進行遷移的
          ??//初始化遷移后的新Node數組
          ??if?(nextTab?==?null)?{????????????//?initiating
          ????try?{
          ??????@SuppressWarnings("unchecked")
          ??????//這里n為舊數組長度,左移一位相當于乘以2
          ??????//例如原數組長度16,新數組長度則為32
          ??????Node[]?nt?=?(Node[])new?Node[n?<1];
          ??????nextTab?=?nt;
          ????}?catch?(Throwable?ex)?{??????//?try?to?cope?with?OOME
          ??????sizeCtl?=?Integer.MAX_VALUE;
          ??????return;
          ????}
          ????//設置nextTable變量為新數組
          ????nextTable?=?nextTab;
          ????//假設為16
          ????transferIndex?=?n;
          ??}
          ??//假設為32
          ??int?nextn?=?nextTab.length;
          ??//標示Node對象,此對象的hash變量為-1
          ??//在get或者put時若遇到此Node,則可以知道當前Node正在遷移
          ??//傳入nextTab對象
          ??ForwardingNode?fwd?=?new?ForwardingNode(nextTab);
          ??boolean?advance?=?true;
          ??boolean?finishing?=?false;?//?to?ensure?sweep?before?committing?nextTab
          ??for?(int?i?=?0,?bound?=?0;;)?{
          ????Node?f;?int?fh;
          ????while?(advance)?{
          ??????int?nextIndex,?nextBound;
          ??????//i為當前正在處理的Node數組下標,每次處理一個Node節(jié)點就會自減1
          ??????if?(--i?>=?bound?||?finishing)
          ????????advance?=?false;
          ??????//假設nextIndex=16
          ??????else?if?((nextIndex?=?transferIndex)?<=?0)?{
          ????????i?=?-1;
          ????????advance?=?false;
          ??????}
          ??????//由以上假設,nextBound就為0
          ??????//且將nextIndex設置為0
          ??????else?if?(U.compareAndSwapInt
          ???????????????(this,?TRANSFERINDEX,?nextIndex,
          ????????????????nextBound?=?(nextIndex?>?stride??
          ?????????????????????????????nextIndex?-?stride?:?0)))?{
          ????????//bound=0
          ????????bound?=?nextBound;
          ????????//i=16-1=15
          ????????i?=?nextIndex?-?1;
          ????????advance?=?false;
          ??????}
          ????}
          ????if?(i?0?||?i?>=?n?||?i?+?n?>=?nextn)?{
          ??????int?sc;
          ??????if?(finishing)?{
          ????????nextTable?=?null;
          ????????table?=?nextTab;
          ????????sizeCtl?=?(n?<1)?-?(n?>>>?1);
          ????????return;
          ??????}
          ??????if?(U.compareAndSwapInt(this,?SIZECTL,?sc?=?sizeCtl,?sc?-?1))?{
          ????????if?((sc?-?2)?!=?resizeStamp(n)?<??????????return;
          ????????finishing?=?advance?=?true;
          ????????i?=?n;?//?recheck?before?commit
          ??????}
          ????}
          ????//此時i=15,取出Node數組下標為15的那個Node,若為空則不需要遷移
          ????//直接設置占位標示,代表此Node已處理完成
          ????else?if?((f?=?tabAt(tab,?i))?==?null)
          ??????advance?=?casTabAt(tab,?i,?null,?fwd);
          ????//檢測此Node的hash是否為MOVED,MOVED是一個常量-1,也就是上面說的占位Node的hash
          ????//如果是占位Node,證明此節(jié)點已經處理過了,跳過i=15的處理,繼續(xù)循環(huán)
          ????else?if?((fh?=?f.hash)?==?MOVED)
          ??????advance?=?true;?//?already?processed
          ????else?{
          ??????//鎖住這個Node
          ??????synchronized?(f)?{
          ????????//確認Node是原先的Node
          ????????if?(tabAt(tab,?i)?==?f)?{
          ??????????//ln為lowNode,低位Node,hn為highNode,高位Node
          ??????????//這兩個概念下面以圖來說明
          ??????????Node?ln,?hn;
          ??????????if?(fh?>=?0)?{
          ????????????//此時fh與原來Node數組長度進行與運算
          ????????????//如果高X位為0,此時runBit=0
          ????????????//如果高X位為1,此時runBit=1
          ????????????int?runBit?=?fh?&?n;
          ????????????Node?lastRun?=?f;
          ????????????for?(Node?p?=?f.next;?p?!=?null;?p?=?p.next)?{
          ??????????????//這里的Node,都是同一Node鏈表中的Node對象
          ??????????????int?b?=?p.hash?&?n;
          ??????????????if?(b?!=?runBit)?{
          ????????????????runBit?=?b;
          ????????????????lastRun?=?p;
          ??????????????}
          ????????????}
          ????????????//正如上面所說,runBit=0,表示此Node為低位Node
          ????????????if?(runBit?==?0)?{
          ??????????????ln?=?lastRun;
          ??????????????hn?=?null;
          ????????????}
          ????????????else?{
          ??????????????//Node為高位Node
          ??????????????hn?=?lastRun;
          ??????????????ln?=?null;
          ????????????}
          ????????????for?(Node?p?=?f;?p?!=?lastRun;?p?=?p.next)?{
          ??????????????int?ph?=?p.hash;?K?pk?=?p.key;?V?pv?=?p.val;
          ??????????????//若hash和n與運算為0,證明為低位Node,原理同上
          ??????????????if?((ph?&?n)?==?0)
          ????????????????ln?=?new?Node(ph,?pk,?pv,?ln);
          ??????????????//這里將高位Node與地位Node都各自組成了兩個鏈表
          ??????????????else
          ????????????????hn?=?new?Node(ph,?pk,?pv,?hn);
          ????????????}
          ????????????//將低位Node設置到新Node數組中,下標為原來的位置
          ????????????setTabAt(nextTab,?i,?ln);
          ????????????//將高位Node設置到新Node數組中,下標為原來的位置加上原Node數組長度
          ????????????setTabAt(nextTab,?i?+?n,?hn);
          ????????????//將此Node設置為占位Node,代表處理完成
          ????????????setTabAt(tab,?i,?fwd);
          ????????????//繼續(xù)循環(huán)
          ????????????advance?=?true;
          ??????????}
          ??????????....
          ????????}
          ??????}
          ????}
          ??}
          }

          這里說一下遷移時為什么要分一個ln(低位Node)、hn(高位Node),首先說一個現象:

          我們知道,在put值的時候,首先會計算hash值,再散列到指定的Node數組下標中:

          //根據key的hashCode再散列
          int?hash?=?spread(key.hashCode());
          //使用(n?-?1)?&?hash?運算,定位Node數組中下標值
          (f?=?tabAt(tab,?i?=?(n?-?1)?&?hash);

          其中n為Node數組長度,這里假設為16。

          假設有一個key進來,它的散列之后的hash=9,那么它的下標值是多少呢?

          • (16 - 1)和 9 進行與運算 -> 0000 1111 和 0000 1001 結果還是 0000 1001 = 9

          假設Node數組需要擴容,我們知道,擴容是將數組長度增加兩倍,也就是32,那么下標值會是多少呢?

          • (32 - 1)和 9 進行與運算 -> 0001 1111 和 0000 1001 結果還是9

          此時,我們把散列之后的hash換成20,那么會有怎樣的變化呢?

          • (16 - 1)和 20 進行與運算 -> 0000 1111 和 0001 0100 結果是 0000 0100 = 4
          • (32 - 1)和 20 進行與運算 -> 0001 1111 和 0001 0100 結果是 0001 0100 = 20

          此時細心的讀者應該可以發(fā)現,如果hash在高X位為1,(X為數組長度的二進制-1的最高位),則擴容時是需要變換在Node數組中的索引值的,不然就hash不到,丟失數據,所以這里在遷移的時候將高X位為1的Node分類為hn,將高X位為0的Node分類為ln。

          回到代碼中:

          for?(Node?p?=?f;?p?!=?lastRun;?p?=?p.next)?{
          ??int?ph?=?p.hash;?
          ??K?pk?=?p.key;?
          ??V?pv?=?p.val;
          ??if?((ph?&?n)?==?0)
          ????ln?=?new?Node(ph,?pk,?pv,?ln);
          ??else
          ????hn?=?new?Node(ph,?pk,?pv,?hn);
          }

          這個操作將高低位組成了兩條鏈表結構,由下圖所示:然后將其CAS操作放入新的Node數組中:

          setTabAt(nextTab,?i,?ln);
          setTabAt(nextTab,?i?+?n,?hn);

          其中,低位鏈表放入原下標處,而高位鏈表則需要加上原Node數組長度,其中為什么不多贅述,上面已經舉例說明了,這樣就可以保證高位Node在遷移到新Node數組中依然可以使用hash算法散列到對應下標的數組中去了。

          最后將原Node數組中對應下標Node對象設置為fwd標記Node,表示該節(jié)點遷移完成,到這里,一個節(jié)點的遷移就完成了,將進行下一個節(jié)點的遷移,也就是i-1=14下標的Node節(jié)點。

          09、擴容時的get操作

          假設Node下標為16的Node節(jié)點正在遷移,突然有一個線程進來調用get方法,正好key又散列到下標為16的節(jié)點,此時怎么辦?

          public?V?get(Object?key)?{
          ??Node[]?tab;?Node?e,?p;?int?n,?eh;?K?ek;
          ??int?h?=?spread(key.hashCode());
          ??if?((tab?=?table)?!=?null?&&?(n?=?tab.length)?>?0?&&
          ??????(e?=?tabAt(tab,?(n?-?1)?&?h))?!=?null)?{
          ????if?((eh?=?e.hash)?==?h)?{
          ??????if?((ek?=?e.key)?==?key?||?(ek?!=?null?&&?key.equals(ek)))
          ????????return?e.val;
          ????}
          ????//假如Node節(jié)點的hash值小于0
          ????//則有可能是fwd節(jié)點
          ????else?if?(eh?0)
          ??????//調用節(jié)點對象的find方法查找值
          ??????return?(p?=?e.find(h,?key))?!=?null???p.val?:?null;
          ????while?((e?=?e.next)?!=?null)?{
          ??????if?(e.hash?==?h?&&
          ??????????((ek?=?e.key)?==?key?||?(ek?!=?null?&&?key.equals(ek))))
          ????????return?e.val;
          ????}
          ??}
          ??return?null;
          }

          重點看有注釋的那兩行,在get操作的源碼中,會判斷Node中的hash是否小于0,是否還記得我們的占位Node,其hash為MOVED,為常量值-1,所以此時判斷線程正在遷移,委托給fwd占位Node去查找值:

          //內部類?ForwardingNode中
          Node?find(int?h,?Object?k)?{
          ??//?loop?to?avoid?arbitrarily?deep?recursion?on?forwarding?nodes
          ??//?這里的查找,是去新Node數組中查找的
          ??//?下面的查找過程與HashMap查找無異,不多贅述
          ??outer:?for?(Node[]?tab?=?nextTable;;)?{
          ????Node?e;?int?n;
          ????if?(k?==?null?||?tab?==?null?||?(n?=?tab.length)?==?0?||
          ????????(e?=?tabAt(tab,?(n?-?1)?&?h))?==?null)
          ??????return?null;
          ????for?(;;)?{
          ??????int?eh;?K?ek;
          ??????if?((eh?=?e.hash)?==?h?&&
          ??????????((ek?=?e.key)?==?k?||?(ek?!=?null?&&?k.equals(ek))))
          ????????return?e;
          ??????if?(eh?0)?{
          ????????if?(e?instanceof?ForwardingNode)?{
          ??????????tab?=?((ForwardingNode)e).nextTable;
          ??????????continue?outer;
          ????????}
          ????????else
          ??????????return?e.find(h,?k);
          ??????}
          ??????if?((e?=?e.next)?==?null)
          ????????return?null;
          ????}
          ??}
          }

          到這里應該可以恍然大悟了,之所以占位Node需要保存新Node數組的引用也是因為這個,它可以支持在遷移的過程中照樣不阻塞地查找值,可謂是精妙絕倫的設計。

          10、多線程協助擴容

          在put操作時,假設正在遷移,正好有一個線程進來,想要put值到遷移的Node上,怎么辦?

          final?V?putVal(K?key,?V?value,?boolean?onlyIfAbsent)?{
          ??if?(key?==?null?||?value?==?null)?throw?new?NullPointerException();
          ??int?hash?=?spread(key.hashCode());
          ??int?binCount?=?0;
          ??for?(Node[]?tab?=?table;;)?{
          ????Node?f;?int?n,?i,?fh;
          ????if?(tab?==?null?||?(n?=?tab.length)?==?0)
          ??????tab?=?initTable();
          ????else?if?((f?=?tabAt(tab,?i?=?(n?-?1)?&?hash))?==?null)?{
          ??????if?(casTabAt(tab,?i,?null,
          ???????????????????new?Node(hash,?key,?value,?null)))
          ????????break;???????????????????//?no?lock?when?adding?to?empty?bin
          ????}
          ????//若此時發(fā)現了占位Node,證明此時HashMap正在遷移
          ????else?if?((fh?=?f.hash)?==?MOVED)
          ??????//進行協助遷移
          ??????tab?=?helpTransfer(tab,?f);
          ?????...
          }
          final?Node[]?helpTransfer(Node[]?tab,?Node?f)?{
          ??Node[]?nextTab;?int?sc;
          ??if?(tab?!=?null?&&?(f?instanceof?ForwardingNode)?&&
          ??????(nextTab?=?((ForwardingNode)f).nextTable)?!=?null)?{
          ????int?rs?=?resizeStamp(tab.length);
          ????while?(nextTab?==?nextTable?&&?table?==?tab?&&
          ???????????(sc?=?sizeCtl)?0)?{
          ??????if?((sc?>>>?RESIZE_STAMP_SHIFT)?!=?rs?||?sc?==?rs?+?1?||
          ??????????sc?==?rs?+?MAX_RESIZERS?||?transferIndex?<=?0)
          ????????break;
          ??????//sizeCtl加一,標示多一個線程進來協助擴容
          ??????if?(U.compareAndSwapInt(this,?SIZECTL,?sc,?sc?+?1))?{
          ????????//擴容
          ????????transfer(tab,?nextTab);
          ????????break;
          ??????}
          ????}
          ????return?nextTab;
          ??}
          ??return?table;
          }

          此方法涉及大量復雜的位運算,這里不多贅述,只是簡單的說幾句,此時sizeCtl變量用來標示HashMap正在擴容,當其準備擴容時,會將sizeCtl設置為一個負數,(例如數組長度為16時)其二進制表示為:

          1000?0000?0001?1011?0000?0000?0000?0010

          無符號位為1,表示負數。其中高16位代表數組長度的一個位算法標示(有點像epoch的作用,表示當前遷移朝代為數組長度X),低16位表示有幾個線程正在做遷移,剛開始為2,接下來自增1,線程遷移完會進行減1操作,也就是如果低十六位為2,代表有一個線程正在遷移,如果為3,代表2個線程正在遷移以此類推…

          只要數組長度足夠長,就可以同時容納足夠多的線程來一起擴容,最大化并行任務,提高性能。

          11、在什么情況下會進行擴容操作?

          • 在put值時,發(fā)現Node為占位Node(fwd)時,會協助擴容

          • 在新增節(jié)點后,檢測到鏈表長度大于8時

            final?V?putVal(K?key,?V?value,?boolean?onlyIfAbsent)?{
            ??...
            ?if?(binCount?!=?0)?{
            ????//TREEIFY_THRESHOLD=8,當鏈表長度大于8時
            ???if?(binCount?>=?TREEIFY_THRESHOLD)
            ??????//調用treeifyBin方法
            ?????treeifyBin(tab,?i);
            ???if?(oldVal?!=?null)
            ?????return?oldVal;
            ???break;
            ?}
            ??...
            }

            treeifyBin方法會將鏈表轉換為紅黑樹,增加查找效率,但在這之前,會檢查數組長度,若小于64,則會優(yōu)先做擴容操作:

            private?final?void?treeifyBin(Node[]?tab,?int?index)?{
            ??Node?b;?int?n,?sc;
            ??if?(tab?!=?null)?{
            ????//MIN_TREEIFY_CAPACITY=64
            ????//若數組長度小于64,則先擴容
            ????if?((n?=?tab.length)???????//擴容
            ??????tryPresize(n?<1);
            ????else?if?((b?=?tabAt(tab,?index))?!=?null?&&?b.hash?>=?0)?{
            ??????synchronized?(b)?{
            ????????//...轉換為紅黑樹的操作
            ??????}
            ????}
            ??}
            }
          • 在每次新增節(jié)點之后,都會調用addCount方法,檢測Node數組大小是否達到閾值:

            final?V?putVal(K?key,?V?value,?boolean?onlyIfAbsent)?{
            ??...
            ????//在下面一節(jié)會講到,此方法統計容器元素數量
            ????addCount(1L,?binCount);
            ??return?null;
            }
            private?final?void?addCount(long?x,?int?check)?{
            ??CounterCell[]?as;?long?b,?s;
            ??if?((as?=?counterCells)?!=?null?||
            ??????!U.compareAndSwapLong(this,?BASECOUNT,?b?=?baseCount,?s?=?b?+?x))?{
            ????//統計元素個數的操作...
            ??}
            ??if?(check?>=?0)?{
            ????Node[]?tab,?nt;?int?n,?sc;
            ????//元素個數達到閾值,進行擴容
            ????while?(s?>=?(long)(sc?=?sizeCtl)?&&?(tab?=?table)?!=?null?&&
            ???????????(n?=?tab.length)???????int?rs?=?resizeStamp(n);
            ??????//發(fā)現sizeCtl為負數,證明有線程正在遷移
            ??????if?(sc?0)?{
            ????????if?((sc?>>>?RESIZE_STAMP_SHIFT)?!=?rs?||?sc?==?rs?+?1?||
            ????????????sc?==?rs?+?MAX_RESIZERS?||?(nt?=?nextTable)?==?null?||
            ????????????transferIndex?<=?0)
            ??????????break;
            ????????if?(U.compareAndSwapInt(this,?SIZECTL,?sc,?sc?+?1))
            ??????????transfer(tab,?nt);
            ??????}
            ??????//不為負數,則為第一個遷移的線程
            ??????else?if?(U.compareAndSwapInt(this,?SIZECTL,?sc,
            ???????????????????????????????????(rs?<2))
            ????????transfer(tab,?null);
            ??????s?=?sumCount();
            ????}
            ??}
            }

          12、總結

          ConcurrentHashMap運用各類CAS操作,將擴容操作的并發(fā)性能實現最大化,在擴容過程中,就算有線程調用get查詢方法,也可以安全的查詢數據,若有線程進行put操作,還會協助擴容,利用sizeCtl標記位和各種volatile變量進行CAS操作達到多線程之間的通信、協助,在遷移過程中只鎖一個Node節(jié)點,即保證了線程安全,又提高了并發(fā)性能。

          13、統計容器大小的線程安全

          ConcurrentHashMap在每次put操作之后都會調用addCount方法,此方法用于統計容器大小且檢測容器大小是否達到閾值,若達到閾值需要進行擴容操作,這在上面也是有提到的。這一節(jié)重點討論容器大小的統計是如何做到線程安全且并發(fā)性能不低的。

          大部分的單機數據查詢優(yōu)化方案都會降低并發(fā)性能,就像緩存的存儲,在多線程環(huán)境下將有并發(fā)問題,所以會產生并行或者一系列并發(fā)沖突鎖競爭的問題,降低了并發(fā)性能。類似的,熱點數據也有這樣的問題,在多線程并發(fā)的過程中,熱點數據(頻繁被訪問的變量)是在每一個線程中幾乎或多或少都會訪問到的數據,這將增加程序中的串行部分,回憶一下開頭所描述的,程序中的串行部分將影響并發(fā)的可伸縮性,使并發(fā)性能下降,這通常會成為并發(fā)程序性能的瓶頸。而在ConcurrentHashMap中,如何快速的統計容器大小更是一個很重要的議題,因為容器內部需要依靠容器大小來考慮是否需要擴容,而在客戶端而言需要調用此方法來知道容器有多少個元素,如果處理不好這種熱點數據,并發(fā)性能將因為這個短板整體性能下降。

          試想一下,如果是你,你會如何設計這種熱點數據?是加鎖,還是進行CAS操作?進入ConcurrentHashMap中,看看大師是如何巧妙的運用了并發(fā)技巧,提高熱點數據的并發(fā)性能。

          先用圖的方式來看看大致的實現思路:

          @sun.misc.Contended?static?final?class?CounterCell?{
          ??volatile?long?value;
          ??CounterCell(long?x)?{?value?=?x;?}
          }

          這是一個粗略的實現,在設計中,使用了分而治之的思想,將每一個計數都分散到各個countCell對象里面(下面稱之為桶),使競爭最小化,又使用了CAS操作,就算有競爭,也可以對失敗了的線程進行其他的處理。樂觀鎖的實現方式與悲觀鎖不同之處就在于樂觀鎖可以對競爭失敗了的線程進行其他策略的處理,而悲觀鎖只能等待鎖釋放,所以這里使用CAS操作對競爭失敗的線程做了其他處理,很巧妙的運用了CAS樂觀鎖。

          下面看看具體的代碼實現吧:

          //計數,并檢查長度是否達到閾值
          private?final?void?addCount(long?x,?int?check)?{
          ??//計數桶
          ??CounterCell[]?as;?long?b,?s;
          ??//如果counterCells不為null,則代表已經初始化了,直接進入if語句塊
          ??//若競爭不嚴重,counterCells有可能還未初始化,為null,先嘗試CAS操作遞增baseCount值
          ??if?((as?=?counterCells)?!=?null?||
          ??????!U.compareAndSwapLong(this,?BASECOUNT,?b?=?baseCount,?s?=?b?+?x))?{
          ????//進入此語句塊有兩種可能
          ????//1.counterCells被初始化完成了,不為null
          ????//2.CAS操作遞增baseCount值失敗了,說明有競爭
          ????CounterCell?a;?long?v;?int?m;
          ????//標志是否存在競爭
          ????boolean?uncontended?=?true;
          ????//1.先判斷計數桶是否還沒初始化,則as=null,進入語句塊
          ????//2.判斷計數桶長度是否為空或,若是進入語句塊
          ????//3.這里做了一個線程變量隨機數,與上桶大小-1,若桶的這個位置為空,進入語句塊
          ????//4.到這里說明桶已經初始化了,且隨機的這個位置不為空,嘗試CAS操作使桶加1,失敗進入語句塊
          ????if?(as?==?null?||?(m?=?as.length?-?1)?0?||
          ????????(a?=?as[ThreadLocalRandom.getProbe()?&?m])?==?null?||
          ????????!(uncontended?=
          ??????????U.compareAndSwapLong(a,?CELLVALUE,?v?=?a.value,?v?+?x)))?{
          ??????fullAddCount(x,?uncontended);
          ??????return;
          ????}
          ????if?(check?<=?1)
          ??????return;
          ????//統計容器大小
          ????s?=?sumCount();
          ??}
          ??...
          }

          14、假設當前線程為第一個put的線程

          先假設當前Map還未被put數據,則addCount一定沒有被調用過,當前線程第一個調用addCount方法,則此時countCell一定沒有被初始化,為null,則進行如下判斷:

          if?((as?=?counterCells)?!=?null?||
          ??????!U.compareAndSwapLong(this,?BASECOUNT,?b?=?baseCount,?s?=?b?+?x))?

          這里的if判斷一定會走第二個判斷,先CAS增加變量baseCount的值:

          private?transient?volatile?long?baseCount;

          這個值有什么用呢?我們看看統計容器大小的方法sumCount:

          final?long?sumCount()?{
          ??//獲取計數桶
          ??CounterCell[]?as?=?counterCells;?CounterCell?a;
          ??//獲取baseCount,賦值給sum總數
          ??long?sum?=?baseCount;
          ??//若計數桶不為空,統計計數桶內的值
          ??if?(as?!=?null)?{
          ????for?(int?i?=?0;?i???????//遍歷計數桶,將value值相加
          ??????if?((a?=?as[i])?!=?null)
          ????????sum?+=?a.value;
          ????}
          ??}
          ??return?sum;
          }

          這個方法的大體思路與我們開頭那張圖差不多,容器的大小其實是分為兩部分,開頭只說了計數桶的那部分,其實還有一個baseCount,在線程沒有競爭的情況下的統計值,換句話說,在增加容量的時候其實是先去做CAS遞增baseCount的。

          由此可見,統計容器大小其實是用了兩種思路:

          1. CAS方式直接遞增:在線程競爭不大的時候,直接使用CAS操作遞增baseCount值即可,這里說的競爭不大指的是CAS操作不會失敗的情況
          2. 分而治之桶計數:若出現了CAS操作失敗的情況,則證明此時有線程競爭了,計數方式從CAS方式轉變?yōu)榉侄沃耐坝嫈捣绞?/section>

          15、出現了線程競爭導致CAS失敗

          此時出現了競爭,則不會再用CAS方式來計數了,直接使用桶方式,從上面的addCount方法可以看出來,此時的countCell是為空的,最終一定會進入fullAddCount方法來進行初始化桶:

          ???private?final?void?fullAddCount(long?x,?boolean?wasUncontended)?{
          ????????int?h;
          ????????if?((h?=?ThreadLocalRandom.getProbe())?==?0)?{
          ????????????ThreadLocalRandom.localInit();??????//?force?initialization
          ????????????h?=?ThreadLocalRandom.getProbe();
          ????????????wasUncontended?=?true;
          ????????}
          ????????boolean?collide?=?false;????????????????//?True?if?last?slot?nonempty
          ????????for?(;;)?{
          ????????????CounterCell[]?as;?CounterCell?a;?int?n;?long?v;
          ????????????...
          ????????????//如果計數桶!=null,證明已經初始化,此時不走此語句塊
          ????????????if?((as?=?counterCells)?!=?null?&&?(n?=?as.length)?>?0)?{
          ??????????????...
          ????????????}
          ????????????//進入此語句塊進行計數桶的初始化
          ????????????//CAS設置cellsBusy=1,表示現在計數桶Busy中...
          ????????????else?if?(cellsBusy?==?0?&&?counterCells?==?as?&&
          ?????????????????????U.compareAndSwapInt(this,?CELLSBUSY,?0,?1))?{
          ????????????????//若有線程同時初始化計數桶,由于CAS操作只有一個線程進入這里
          ????????????????boolean?init?=?false;
          ????????????????try?{???????????????????????????//?Initialize?table
          ????????????????????//再次確認計數桶為空
          ????????????????????if?(counterCells?==?as)?{
          ????????????????????????//初始化一個長度為2的計數桶
          ????????????????????????CounterCell[]?rs?=?new?CounterCell[2];
          ????????????????????????//h為一個隨機數,與上1則代表結果為0、1中隨機的一個
          ????????????????????????//也就是在0、1下標中隨便選一個計數桶,x=1,放入1的值代表增加1個容量
          ????????????????????????rs[h?&?1]?=?new?CounterCell(x);
          ????????????????????????//將初始化好的計數桶賦值給ConcurrentHashMap
          ????????????????????????counterCells?=?rs;
          ????????????????????????init?=?true;
          ????????????????????}
          ????????????????}?finally?{
          ????????????????????//最后將busy標識設置為0,表示不busy了
          ????????????????????cellsBusy?=?0;
          ????????????????}
          ????????????????if?(init)
          ????????????????????break;
          ????????????}
          ????????????//若有線程同時來初始化計數桶,則沒有搶到busy資格的線程就先來CAS遞增baseCount
          ????????????else?if?(U.compareAndSwapLong(this,?BASECOUNT,?v?=?baseCount,?v?+?x))
          ????????????????break;??????????????????????????//?Fall?back?on?using?base
          ????????}
          ????}

          到這里就完成了計數桶的初始化工作,在之后的計數都將會使用計數桶方式來統計總數

          16、計數桶擴容

          從上面的分析中我們知道,計數桶初始化之后長度為2,在競爭大的時候肯定是不夠用的,所以一定有計數桶的擴容操作,所以現在就有兩個問題了:

          1. 什么條件下會進行計數桶的擴容?
          2. 擴容操作是怎么樣的?

          假設此時是用計數桶方式進行計數:

          private?final?void?addCount(long?x,?int?check)?{
          ??CounterCell[]?as;?long?b,?s;
          ??if?((as?=?counterCells)?!=?null?||
          ??????!U.compareAndSwapLong(this,?BASECOUNT,?b?=?baseCount,?s?=?b?+?x))?{
          ????CounterCell?a;?long?v;?int?m;
          ????boolean?uncontended?=?true;
          ????//此時顯然會在計數桶數組中隨機選一個計數桶
          ????//然后使用CAS操作將此計數桶中的value+1
          ????if?(as?==?null?||?(m?=?as.length?-?1)?0?||
          ????????(a?=?as[ThreadLocalRandom.getProbe()?&?m])?==?null?||
          ????????!(uncontended?=
          ??????????U.compareAndSwapLong(a,?CELLVALUE,?v?=?a.value,?v?+?x)))?{
          ??????//若CAS操作失敗,證明有競爭,進入fullAddCount方法
          ??????fullAddCount(x,?uncontended);
          ??????return;
          ????}
          ????if?(check?<=?1)
          ??????return;
          ????s?=?sumCount();
          ??}
          ??...
          }

          進入fullAddCount方法:

          private?final?void?fullAddCount(long?x,?boolean?wasUncontended)?{
          ??int?h;
          ??if?((h?=?ThreadLocalRandom.getProbe())?==?0)?{
          ????ThreadLocalRandom.localInit();??????//?force?initialization
          ????h?=?ThreadLocalRandom.getProbe();
          ????wasUncontended?=?true;
          ??}
          ??boolean?collide?=?false;????????????????//?True?if?last?slot?nonempty
          ??for?(;;)?{
          ????CounterCell[]?as;?CounterCell?a;?int?n;?long?v;
          ????//計數桶初始化好了,一定是走這個if語句塊
          ????if?((as?=?counterCells)?!=?null?&&?(n?=?as.length)?>?0)?{
          ??????//從計數桶數組隨機選一個計數桶,若為null表示該桶位還沒線程遞增過
          ??????if?((a?=?as[(n?-?1)?&?h])?==?null)?{
          ????????//查看計數桶busy狀態(tài)是否被標識
          ????????if?(cellsBusy?==?0)?{????????????//?Try?to?attach?new?Cell
          ??????????//若不busy,直接new一個計數桶
          ??????????CounterCell?r?=?new?CounterCell(x);?//?Optimistic?create
          ??????????//CAS操作,標示計數桶busy中
          ??????????if?(cellsBusy?==?0?&&
          ??????????????U.compareAndSwapInt(this,?CELLSBUSY,?0,?1))?{
          ????????????boolean?created?=?false;
          ????????????try?{???????????????//?Recheck?under?lock
          ??????????????CounterCell[]?rs;?int?m,?j;
          ??????????????//在鎖下再檢查一次計數桶為null
          ??????????????if?((rs?=?counterCells)?!=?null?&&
          ??????????????????(m?=?rs.length)?>?0?&&
          ??????????????????rs[j?=?(m?-?1)?&?h]?==?null)?{
          ????????????????//將剛剛創(chuàng)建的計數桶賦值給對應位置
          ????????????????rs[j]?=?r;
          ????????????????created?=?true;
          ??????????????}
          ????????????}?finally?{
          ??????????????//標示不busy了
          ??????????????cellsBusy?=?0;
          ????????????}
          ????????????if?(created)
          ??????????????break;
          ????????????continue;???????????//?Slot?is?now?non-empty
          ??????????}
          ????????}
          ????????collide?=?false;
          ??????}
          ??????else?if?(!wasUncontended)???????//?CAS?already?known?to?fail
          ????????wasUncontended?=?true;??????//?Continue?after?rehash
          ??????//走到這里代表計數桶不為null,嘗試遞增計數桶
          ??????else?if?(U.compareAndSwapLong(a,?CELLVALUE,?v?=?a.value,?v?+?x))
          ????????break;
          ??????else?if?(counterCells?!=?as?||?n?>=?NCPU)
          ????????collide?=?false;????????????//?At?max?size?or?stale
          ??????//若CAS操作失敗了,到了這里,會先進入一次,然后再走一次剛剛的for循環(huán)
          ??????//若是第二次for循環(huán),collide=true,則不會走進去
          ??????else?if?(!collide)
          ????????collide?=?true;
          ??????//計數桶擴容,一個線程若走了兩次for循環(huán),也就是進行了多次CAS操作遞增計數桶失敗了
          ??????//則進行計數桶擴容,CAS標示計數桶busy中
          ??????else?if?(cellsBusy?==?0?&&
          ???????????????U.compareAndSwapInt(this,?CELLSBUSY,?0,?1))?{
          ????????try?{
          ??????????//確認計數桶還是同一個
          ??????????if?(counterCells?==?as)?{//?Expand?table?unless?stale
          ????????????//將長度擴大到2倍
          ????????????CounterCell[]?rs?=?new?CounterCell[n?<1];
          ????????????//遍歷舊計數桶,將引用直接搬過來
          ????????????for?(int?i?=?0;?i???????????????rs[i]?=?as[i];
          ????????????//賦值
          ????????????counterCells?=?rs;
          ??????????}
          ????????}?finally?{
          ??????????//取消busy狀態(tài)
          ??????????cellsBusy?=?0;
          ????????}
          ????????collide?=?false;
          ????????continue;???????????????????//?Retry?with?expanded?table
          ??????}
          ??????h?=?ThreadLocalRandom.advanceProbe(h);
          ????}
          ????else?if?(cellsBusy?==?0?&&?counterCells?==?as?&&
          ?????????????U.compareAndSwapInt(this,?CELLSBUSY,?0,?1))?{
          ??????//初始化計數桶...
          ????}
          ????else?if?(U.compareAndSwapLong(this,?BASECOUNT,?v?=?baseCount,?v?+?x))
          ??????break;??????????????????????????//?Fall?back?on?using?base
          ??}
          }

          看到這里,想必已經可以解決上面兩個問題了:

          1. 什么條件下會進行計數桶的擴容?

            答:在CAS操作遞增計數桶失敗了3次之后,會進行擴容計數桶操作,注意此時同時進行了兩次隨機定位計數桶來進行CAS遞增的,所以此時可以保證大概率是因為計數桶不夠用了,才會進行計數桶擴容

          2. 擴容操作是怎么樣的?

            答:計數桶長度增加到兩倍長度,數據直接遍歷遷移過來,由于計數桶不像HashMap數據結構那么復雜,有hash算法的影響,加上計數桶只是存放一個long類型的計數值而已,所以直接賦值引用即可。

          17、總結

          個人感覺,統計容器大小的線程安全與擴容ConcurrentHashMap這兩個算得上ConcurrentHashMap中最聰明的兩個并發(fā)設計了,閱讀此源碼的我看到這兩個操作的設計,都忍不住拍手叫絕,我想,這或許也是一個看源碼的樂趣吧,站在巨人的肩膀看巨人的思想。

          總結一下計數中用到的并發(fā)技巧:

          1. 利用CAS遞增baseCount值來感知是否存在線程競爭,若競爭不大直接CAS遞增baseCount值即可,性能與直接baseCount++差別不大
          2. 若存在線程競爭,則初始化計數桶,若此時初始化計數桶的過程中也存在競爭,多個線程同時初始化計數桶,則沒有搶到初始化資格的線程直接嘗試CAS遞增baseCount值的方式完成計數,最大化利用了線程的并行。此時使用計數桶計數,分而治之的方式來計數,此時兩個計數桶最大可提供兩個線程同時計數,同時使用CAS操作來感知線程競爭,若兩個桶情況下CAS操作還是頻繁失?。ㄊ?次),則直接擴容計數桶,變?yōu)?個計數桶,支持最大同時4個線程并發(fā)計數,以此類推…同時使用位運算和隨機數的方式"負載均衡"一樣的將線程計數請求接近均勻的落在各個計數桶中。

          18、get操作的線程安全

          對于get操作,其實沒有線程安全的問題,只有可見性的問題,只需要確保get的數據是線程之間可見的即可:

          public?V?get(Object?key)?{
          ??Node[]?tab;?Node?e,?p;?int?n,?eh;?K?ek;
          ??int?h?=?spread(key.hashCode());
          ??//此過程與HashMap的get操作無異,不多贅述
          ??if?((tab?=?table)?!=?null?&&?(n?=?tab.length)?>?0?&&
          ??????(e?=?tabAt(tab,?(n?-?1)?&?h))?!=?null)?{
          ????if?((eh?=?e.hash)?==?h)?{
          ??????if?((ek?=?e.key)?==?key?||?(ek?!=?null?&&?key.equals(ek)))
          ????????return?e.val;
          ????}
          ????//當hash<0,有可能是在遷移,使用fwd占位Node去查找新table中的數據
          ????else?if?(eh?0)
          ??????return?(p?=?e.find(h,?key))?!=?null???p.val?:?null;
          ????while?((e?=?e.next)?!=?null)?{
          ??????if?(e.hash?==?h?&&
          ??????????((ek?=?e.key)?==?key?||?(ek?!=?null?&&?key.equals(ek))))
          ????????return?e.val;
          ????}
          ??}
          ??return?null;
          }

          在get操作中除了增加了遷移的判斷以外,基本與HashMap的get操作無異,這里不多贅述,值得一提的是這里使用了tabAt方法Unsafe類volatile的方式去獲取Node數組中的Node,保證獲得到的Node是最新的

          static?final??Node?tabAt(Node[]?tab,?int?i)?{
          ??return?(Node)U.getObjectVolatile(tab,?((long)i?<}

          19、JDK1.7與1.8的不同實現

          JDK1.7的ConcurrentHashMap底層數據結構:其中1.7的實現也同樣采用了分段鎖的技術,只不過多個一個segment,一個segment里對應一個小HashMap,其中segment繼承了ReentrantLock,充當了鎖的角色,一把鎖鎖一個小HashMap(相當于多個Node),從1.8的實現來看, 鎖的粒度從多個Node級別又減小到一個Node級別,再度減小鎖競爭,減小程序同步的部分。

          20、總結

          不得不說,大師將CAS操作運用的淋漓盡致,相信理解了以上源碼的讀者也可以學習到大師所運用的并發(fā)技巧,不僅僅是在ConcurrentHashMap中,其實在大部分JUC的源碼中很多并發(fā)思想很值得我們去閱讀、學習。

          干貨分享

          最近將個人學習筆記整理成冊,使用PDF分享。關注我,回復如下代碼,即可獲得百度盤地址,無套路領??!

          ?001:《Java并發(fā)與高并發(fā)解決方案》學習筆記;?002:《深入JVM內核——原理、診斷與優(yōu)化》學習筆記;?003:《Java面試寶典》?004:《Docker開源書》?005:《Kubernetes開源書》?006:《DDD速成(領域驅動設計速成)》?007:全部?008:加技術群討論

          加個關注不迷路

          喜歡就點個"在看"唄^_^

          瀏覽 54
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  黄色国产视频 | 水蜜桃视频在线免费观看 | 99免费热视频 | 亚洲日本在线观看 | 人妻巨大乳HD免费看 |