解讀Java 8 中為并發(fā)而生的 ConcurrentHashMap

作者 |?Single_Yam
HashMap 是我們日常最常見的一種容器,它以鍵值對的形式完成對數(shù)據(jù)的存儲,但眾所周知,它在高并發(fā)的情境下是不安全的。尤其是在 jdk 1.8 之前,rehash 的過程中采用頭插法轉移結點,高并發(fā)下,多個線程同時操作一條鏈表將直接導致閉鏈,死循環(huán)并占滿 CPU。
當然,jdk 1.8 以來,對 HashMap 的內部進行了很大的改進,采用數(shù)組+鏈表+紅黑樹來進行數(shù)據(jù)的存儲。rehash 的過程也進行了改動,基于復制的算法思想,不直接操作原鏈,而是定義了兩條鏈表分別完成對原鏈的結點分離操作,即使是多線程的情況下也是安全的。當然,它畢竟不是并發(fā)容器,除非大改,否則依然是不能應對高并發(fā)場景的,或者說即使沒有因多線程訪問而造成什么問題,但是效率必然是受到影響的。例如在多線程同時添加元素的時候可能會丟失數(shù)據(jù),迭代 map 的時候發(fā)生 fail-fast 等。
本篇文章將要介紹的 ConcurrentHashMap 是 HashMap 的并發(fā)版本,它是線程安全的,并且在高并發(fā)的情境下,性能優(yōu)于 HashMap 很多。我們主要從以下幾個方面對其進行學習:
歷史版本的實現(xiàn)演變 重要成員屬性的介紹 put 方法實現(xiàn)并發(fā)添加 remove 方法實現(xiàn)并發(fā)刪除 其他的一些方法的簡單介紹
一、歷史版本的實現(xiàn)演變
jdk 1.7 采用分段鎖技術,整個 Hash 表被分成多個段,每個段中會對應一個 Segment 段鎖,段與段之間可以并發(fā)訪問,但是多線程想要操作同一個段是需要獲取鎖的。所有的 put,get,remove 等方法都是根據(jù)鍵的 hash 值對應到相應的段中,然后嘗試獲取鎖進行訪問。

jdk 1.8 取消了基于 Segment 的分段鎖思想,改用 CAS + synchronized 控制并發(fā)操作,在某些方面提升了性能。并且追隨 1.8 版本的 HashMap 底層實現(xiàn),使用數(shù)組+鏈表+紅黑樹進行數(shù)據(jù)存儲。本篇主要介紹 1.8 版本的 ConcurrentHashMap 的具體實現(xiàn),有關其之前版本的實現(xiàn)情況,這里推薦幾篇文章:
談談ConcurrentHashMap1.7和1.8的不同實現(xiàn):
http://www.jianshu.com/p/e694f1e868ec
ConcurrentHashMap在jdk1.8中的改進?:
https://www.cnblogs.com/everSeeker/p/5601861.html
ConcurrentHashMap原理分析(1.7與1.8):
https://www.cnblogs.com/study-everyday/p/6430462.html
二、重要成員屬性的介紹
transient?volatile?Node[]?table;
和 HashMap 中的語義一樣,代表整個哈希表。
/**
*?The?next?table?to?use;?non-null?only?while?resizing.
*/
private?transient?volatile?Node[]?nextTable;
這是一個連接表,用于哈希表擴容,擴容完成后會被重置為 null。
private?transient?volatile?long?baseCount;
該屬性保存著整個哈希表中存儲的所有的結點的個數(shù)總和,有點類似于 HashMap 的 size 屬性。
private?transient?volatile?int?sizeCtl;
這是一個重要的屬性,無論是初始化哈希表,還是擴容 rehash 的過程,都是需要依賴這個關鍵屬性的。該屬性有以下幾種取值:
0:默認值 -1:代表哈希表正在進行初始化 大于0:相當于 HashMap 中的 threshold,表示閾值 小于-1:代表有多個線程正在進行擴容
該屬性的使用還是有點復雜的,在我們分析擴容源碼的時候再給予更加詳盡的描述,此處了解其可取的幾個值都分別代表著什么樣的含義即可。
構造函數(shù)的實現(xiàn)也和我們上篇介紹的 HashMap 的實現(xiàn)類似,此處不再贅述,貼出源碼供大家比較。
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
其他常用的方法我們將在文末進行簡單介紹,下面我們主要來分析下 ConcurrentHashMap 的一個核心方法 put,我們也會一并解決掉該方法中涉及到的擴容、輔助擴容,初始化哈希表等方法。
三、put 方法實現(xiàn)并發(fā)添加
對于 HashMap 來說,多線程并發(fā)添加元素會導致數(shù)據(jù)丟失等并發(fā)問題,那么 ConcurrentHashMap 又是如何做到并發(fā)添加的呢?
public?V?put(K?key,?V?value)?{
????return?putVal(key,?value,?false);
}
putVal 的方法比較多,我們分兩個部分進行分析。
//第一部分
final?V?putVal(K?key,?V?value,?boolean?onlyIfAbsent)?{
?//對傳入的參數(shù)進行合法性判斷
????if?(key?==?null?||?value?==?null)?throw?new?NullPointerException();
????//計算鍵所對應的?hash?值
????int?hash?=?spread(key.hashCode());
????int?binCount?=?0;
????for?(Node[]?tab?=?table;;)?{
????????Node?f;?int?n,?i,?fh;
????????//如果哈希表還未初始化,那么初始化它
????????if?(tab?==?null?||?(n?=?tab.length)?==?0)
????????????tab?=?initTable();
????????//根據(jù)鍵的?hash?值找到哈希數(shù)組相應的索引位置
????????//如果為空,那么以CAS無鎖式向該位置添加一個節(jié)點
????????else?if?((f?=?tabAt(tab,?i?=?(n?-?1)?&?hash))?==?null)?{
????????????if?(casTabAt(tab,?i,?null,
?????????????????????????new?Node(hash,?key,?value,?null)))
????????????????break;???????????????????
????????}
這里需要詳細說明的只有 initTable 方法,這是一個初始化哈希表的操作,它同時只允許一個線程進行初始化操作。
private?final?Node[]?initTable()?{
????Node[]?tab;?int?sc;
????//如果表為空才進行初始化操作
????while?((tab?=?table)?==?null?||?tab.length?==?0)?{
????????//sizeCtl?小于零說明已經有線程正在進行初始化操作
????????//當前線程應該放棄?CPU?的使用
????????if?((sc?=?sizeCtl)?0)
????????????Thread.yield();?//?lost?initialization?race;?just?spin
????????//否則說明還未有線程對表進行初始化,那么本線程就來做這個工作
????????else?if?(U.compareAndSwapInt(this,?SIZECTL,?sc,?-1))?{
??????//保險起見,再次判斷下表是否為空
????????????try?{
????????????????if?((tab?=?table)?==?null?||?tab.length?==?0)?{
?????????????????//sc?大于零說明容量已經初始化了,否則使用默認容量
????????????????????int?n?=?(sc?>?0)???sc?:?DEFAULT_CAPACITY;
????????????????????@SuppressWarnings("unchecked")
????????????????????//根據(jù)容量構建數(shù)組
????????????????????Node[]?nt?=?(Node[])new?Node,?>[n];
????????????????????table?=?tab?=?nt;
????????????????????//計算閾值,等效于?n*0.75
????????????????????sc?=?n?-?(n?>>>?2);
????????????????}
????????????}?finally?{
??????????//設置閾值
????????????????sizeCtl?=?sc;
????????????}
????????????break;
????????}
????}
????return?tab;
}
關于 initTable 方法的每一步實現(xiàn)都已經給出注釋,該方法的核心思想就是,只允許一個線程對表進行初始化,如果不巧有其他線程進來了,那么會讓其他線程交出 CPU 等待下次系統(tǒng)調度。這樣,保證了表同時只會被一個線程初始化。
接著,我們回到 putVal 方法,這樣的話,我們第一部分的 putVal 源碼就分析結束了,下面我們看后一部分的源碼:
//檢測到桶結點是?ForwardingNode?類型,協(xié)助擴容
else?if?((fh?=?f.hash)?==?MOVED)
?????tab?=?helpTransfer(tab,?f);
//桶結點是普通的結點,鎖住該桶頭結點并試圖在該鏈表的尾部添加一個節(jié)點
else?{
???????V?oldVal?=?null;
???????synchronized?(f)?{
???????????if?(tabAt(tab,?i)?==?f)?{
??????????????//向普通的鏈表中添加元素,無需贅述
??????????????if?(fh?>=?0)?{
?????????????????binCount?=?1;
?????????????????for?(Node?e?=?f;;?++binCount)?{
?????????????????????K?ek;
?????????????????????if?(e.hash?==?hash?&&((ek?=?e.key)?==?key?||(ek?!=?null?&&?key.equals(ek))))?{
?????????????????????????oldVal?=?e.val;
?????????????????????????if?(!onlyIfAbsent)
????????????????????????????e.val?=?value;
????????????????????????????break;
??????????????????????}
??????????????????????Node?pred?=?e;
??????????????????????if?((e?=?e.next)?==?null)?{
?????????????????????????pred.next?=?new?Node(hash,?key,value,?null);
?????????????????????????break;
??????????????????????}
?????????????????}
???????????}
???????????//向紅黑樹中添加元素,TreeBin?結點的hash值為TREEBIN(-2)
???????????else?if?(f?instanceof?TreeBin)?{
???????????????Node?p;
???????????????binCount?=?2;
?????????????????if?((p?=?((TreeBin)f).putTreeVal(hash,?key,??????????????????????????????????????value))?!=?null)?{
???????????????????oldVal?=?p.val;
???????????????????if?(!onlyIfAbsent)
??????????????????????p.val?=?value;
????????????????}
???????????}
???????}
???}
??//binCount?!=?0?說明向鏈表或者紅黑樹中添加或修改一個節(jié)點成功
??//binCount??==?0?說明?put?操作將一個新節(jié)點添加成為某個桶的首節(jié)點
??if?(binCount?!=?0)?{
???//鏈表深度超過?8?轉換為紅黑樹
?????????if?(binCount?>=?TREEIFY_THRESHOLD)
?????????????treeifyBin(tab,?i);
?????????//oldVal?!=?null?說明此次操作是修改操作
?????????//直接返回舊值即可,無需做下面的擴容邊界檢查
?????????if?(oldVal?!=?null)
?????????????return?oldVal;
???????????break;
????????}
????}
}
//CAS?式更新baseCount,并判斷是否需要擴容
addCount(1L,?binCount);
//程序走到這一步說明此次?put?操作是一個添加操作,否則早就?return?返回了
return?null;
這一部分的源碼大體上已如注釋所描述,至此整個 putVal 方法的大體邏輯實現(xiàn)相信你也已經清晰了,好好回味一下。下面我們對這部分中的某些方法的實現(xiàn)細節(jié)再做一些深入學習。
首先需要介紹一下,F(xiàn)orwardingNode 這個節(jié)點類型,
static?final?class?ForwardingNode<K,V>?extends?Node<K,V>?{
????????final?Node[]?nextTable;
????????ForwardingNode(Node[]?tab)?{
????????????//注意這里
????????????super(MOVED,?null,?null,?null);
????????????this.nextTable?=?tab;
????????}
?//省略其?find?方法
}
這個節(jié)點內部保存了一 nextTable 引用,它指向一張 hash 表。在擴容操作中,我們需要對每個桶中的結點進行分離和轉移,如果某個桶結點中所有節(jié)點都已經遷移完成了(已經被轉移到新表 nextTable 中了),那么會在原 table 表的該位置掛上一個 ForwardingNode 結點,說明此桶已經完成遷移。
ForwardingNode 繼承自 Node 結點,并且它唯一的構造函數(shù)將構建一個鍵,值,next 都為 null 的結點,反正它就是個標識,無需那些屬性。但是 hash 值卻為 MOVED。
所以,我們在 putVal 方法中遍歷整個 hash 表的桶結點,如果遇到 hash 值等于 MOVED,說明已經有線程正在擴容 rehash 操作,整體上還未完成,不過我們要插入的桶的位置已經完成了所有節(jié)點的遷移。
由于檢測到當前哈希表正在擴容,于是讓當前線程去協(xié)助擴容。
final Node[] helpTransfer(Node[] tab, Node f) {
Node[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode)f).nextTable) != null) {
//返回一個 16 位長度的擴容校驗標識
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
//sizeCtl 如果處于擴容狀態(tài)的話
//前 16 位是數(shù)據(jù)校驗標識,后 16 位是當前正在擴容的線程總數(shù)
//這里判斷校驗標識是否相等,如果校驗符不等或者擴容操作已經完成了,直接退出循環(huán),不用協(xié)助它們擴容了
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
//否則調用 transfer 幫助它們進行擴容
//sc + 1 標識增加了一個線程進行擴容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
下面我們看這個稍顯復雜的 transfer 方法,我們依然分幾個部分來細說。
//第一部分
private?final?void?transfer(Node[]?tab,?Node[]?nextTab) ?{
????????int?n?=?tab.length,?stride;
????????//計算單個線程允許處理的最少table桶首節(jié)點個數(shù),不能小于?16
????????if?((stride?=?(NCPU?>?1)???(n?>>>?3)?/?NCPU?:?n)?????????????stride?=?MIN_TRANSFER_STRIDE;?
????????//剛開始擴容,初始化?nextTab?
????????if?(nextTab?==?null)?{
????????????try?{
????????????????@SuppressWarnings("unchecked")
????????????????Node[]?nt?=?(Node[])new?Node,?>[n?<1];
????????????????nextTab?=?nt;
????????????}?catch?(Throwable?ex)?{
????????????????sizeCtl?=?Integer.MAX_VALUE;
????????????????return;
????????????}
????????????nextTable?=?nextTab;
????????????//transferIndex?指向最后一個桶,方便從后向前遍歷?
????????????transferIndex?=?n;
????????}
????????int?nextn?=?nextTab.length;
????????//定義?ForwardingNode?用于標記遷移完成的桶
????????ForwardingNode?fwd?=?new?ForwardingNode(nextTab);
這部分代碼還是比較簡單的,主要完成的是對單個線程能處理的最少桶結點個數(shù)的計算和一些屬性的初始化操作。
//第二部分,并發(fā)擴容控制的核心
boolean?advance?=?true;
boolean?finishing?=?false;
//i?指向當前桶,bound?指向當前線程需要處理的桶結點的區(qū)間下限
for?(int?i?=?0,?bound?=?0;;)?{
???????Node?f;?int?fh;
???????//這個?while?循環(huán)的目的就是通過?--i?遍歷當前線程所分配到的桶結點
???????//一個桶一個桶的處理
???????while?(advance)?{
???????????int?nextIndex,?nextBound;
???????????if?(--i?>=?bound?||?finishing)
???????????????advance?=?false;
???????????//transferIndex?<=?0?說明已經沒有需要遷移的桶了
???????????else?if?((nextIndex?=?transferIndex)?<=?0)?{
???????????????i?=?-1;
???????????????advance?=?false;
???????????}
???????????//更新?transferIndex
???????????//為當前線程分配任務,處理的桶結點區(qū)間為(nextBound,nextIndex)
???????????else?if?(U.compareAndSwapInt(this,?TRANSFERINDEX,?nextIndex,nextBound?=?(nextIndex?>?stride???nextIndex?-?stride?:?0)))?{
???????????????bound?=?nextBound;
???????????????i?=?nextIndex?-?1;
???????????????advance?=?false;
???????????}
???????}
???????//當前線程所有任務完成
???????if?(i?0?||?i?>=?n?||?i?+?n?>=?nextn)?{
???????????int?sc;
???????????if?(finishing)?{
???????????????nextTable?=?null;
???????????????table?=?nextTab;
???????????????sizeCtl?=?(n?<1)?-?(n?>>>?1);
???????????????return;
???????????}
???????????if?(U.compareAndSwapInt(this,?SIZECTL,?sc?=?sizeCtl,?sc?-?1))?{
???????????????if?((sc?-?2)?!=?resizeStamp(n)?<???????????????????return;
???????????????finishing?=?advance?=?true;
???????????????i?=?n;?
???????????}
???????}
???????//待遷移桶為空,那么在此位置?CAS?添加?ForwardingNode?結點標識該桶已經被處理過了
???????else?if?((f?=?tabAt(tab,?i))?==?null)
???????????advance?=?casTabAt(tab,?i,?null,?fwd);
???????//如果掃描到?ForwardingNode,說明此桶已經被處理過了,跳過即可
???????else?if?((fh?=?f.hash)?==?MOVED)
???????????advance?=?true;?
每個新參加進來擴容的線程必然先進 while 循環(huán)的最后一個判斷條件中去領取自己需要遷移的桶的區(qū)間。然后 i 指向區(qū)間的最后一個位置,表示遷移操作從后往前的做。接下來的幾個判斷就是實際的遷移結點操作了。等我們大致介紹完成第三部分的源碼再回來對各個判斷條件下的遷移過程進行詳細的敘述。
//第三部分
else?{
?//
????synchronized?(f)?{
????????if?(tabAt(tab,?i)?==?f)?{
????????????Node?ln,?hn;
????????????//鏈表的遷移操作
????????????if?(fh?>=?0)?{
????????????????int?runBit?=?fh?&?n;
????????????????Node?lastRun?=?f;
????????????????//整個?for?循環(huán)為了找到整個桶中最后連續(xù)的?fh?&?n?不變的結點
????????????????for?(Node?p?=?f.next;?p?!=?null;?p?=?p.next)?{
????????????????????int?b?=?p.hash?&?n;
????????????????????if?(b?!=?runBit)?{
????????????????????????runBit?=?b;
????????????????????????lastRun?=?p;
????????????????????}
????????????????}
????????????????if?(runBit?==?0)?{
????????????????????ln?=?lastRun;
????????????????????hn?=?null;
????????????????}
????????????????else?{
????????????????????hn?=?lastRun;
????????????????????ln?=?null;
????????????????}
????????????????//如果fh&n不變的鏈表的runbit都是0,則nextTab[i]內元素ln前逆序,ln及其之后順序
????????????????//否則,nextTab[i+n]內元素全部相對原table逆序
????????????????//這是通過一個節(jié)點一個節(jié)點的往nextTab添加
????????????????for?(Node?p?=?f;?p?!=?lastRun;?p?=?p.next)?{
????????????????????int?ph?=?p.hash;?K?pk?=?p.key;?V?pv?=?p.val;
????????????????????if?((ph?&?n)?==?0)
????????????????????????ln?=?new?Node(ph,?pk,?pv,?ln);
????????????????????else
????????????????????????hn?=?new?Node(ph,?pk,?pv,?hn);
????????????????}
????????????????//把兩條鏈表整體遷移到nextTab中
????????????????setTabAt(nextTab,?i,?ln);
????????????????setTabAt(nextTab,?i?+?n,?hn);
????????????????//將原桶標識位已經處理
????????????????setTabAt(tab,?i,?fwd);
????????????????advance?=?true;
????????????}
????????????//紅黑樹的復制算法,不再贅述
????????????else?if?(f?instanceof?TreeBin)?{
????????????????TreeBin?t?=?(TreeBin)f;
????????????????TreeNode?lo?=?null,?loTail?=?null;
????????????????TreeNode?hi?=?null,?hiTail?=?null;
????????????????int?lc?=?0,?hc?=?0;
????????????????for?(Node?e?=?t.first;?e?!=?null;?e?=?e.next)?{
????????????????????int?h?=?e.hash;
????????????????????TreeNode?p?=?new?TreeNode(h,?e.key,?e.val,?null,?null);
????????????????????if?((h?&?n)?==?0)?{
????????????????????????if?((p.prev?=?loTail)?==?null)
????????????????????????????lo?=?p;
????????????????????????else
????????????????????????????loTail.next?=?p;
????????????????????loTail?=?p;
????????????????????++lc;
????????????????????}
????????????????????else?{
????????????????????????if?((p.prev?=?hiTail)?==?null)
????????????????????????????hi?=?p;
????????????????????????else
????????????????????????????hiTail.next?=?p;
????????????????????hiTail?=?p;
????????????????????++hc;
????????????????????}
????????????????}
????????????????ln?=?(lc?<=?UNTREEIFY_THRESHOLD)???untreeify(lo)?:(hc?!=?0)???new?TreeBin(lo)?:?t;
????????????????hn?=?(hc?<=?UNTREEIFY_THRESHOLD)???untreeify(hi)?:(lc?!=?0)???new?TreeBin(hi)?:?t;
????????????????setTabAt(nextTab,?i,?ln);
????????????????setTabAt(nextTab,?i?+?n,?hn);
????????????????setTabAt(tab,?i,?fwd);
????????????????advance?=?true;
???????????}
那么至此,有關遷移的幾種情況已經介紹完成了,下面我們整體上把控一下整個擴容和遷移過程。
首先,每個線程進來會先領取自己的任務區(qū)間,然后開始 --i 來遍歷自己的任務區(qū)間,對每個桶進行處理。如果遇到桶的頭結點是空的,那么使用 ForwardingNode 標識該桶已經被處理完成了。如果遇到已經處理完成的桶,直接跳過進行下一個桶的處理。如果是正常的桶,對桶首節(jié)點加鎖,正常的遷移即可,遷移結束后依然會將原表的該位置標識位已經處理。
當 i < 0,說明本線程處理速度夠快的,整張表的最后一部分已經被它處理完了,現(xiàn)在需要看看是否還有其他線程在自己的區(qū)間段還在遷移中。這是退出的邏輯判斷部分:

finnish 是一個標志,如果為 true 則說明整張表的遷移操作已經全部完成了,我們只需要重置 table 的引用并將 nextTable 賦為空即可。否則,CAS 式的將 sizeCtl 減一,表示當前線程已經完成了任務,退出擴容操作。
如果退出成功,那么需要進一步判斷是否還有其他線程仍然在執(zhí)行任務。
if?((sc?-?2)?!=?resizeStamp(n)?<???return;
我們說過 resizeStamp(n) 返回的是對 n 的一個數(shù)據(jù)校驗標識,占 16 位。而 RESIZE_STAMP_SHIFT 的值為 16,那么位運算后,整個表達式必然在右邊空出 16 個零。也正如我們所說的,sizeCtl 的高 16 位為數(shù)據(jù)校驗標識,低 16 為表示正在進行擴容的線程數(shù)量。
(resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2 表示當前只有一個線程正在工作,相對應的,如果 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT,說明當前線程就是最后一個還在擴容的線程,那么會將 finishing 標識為 true,并在下一次循環(huán)中退出擴容方法。
這一塊的難點在于對 sizeCtl 的各個值的理解,關于它的深入理解,這里推薦一篇文章。
著重理解位操作:
http://wuzhaoyang.me/2016/09/05/java-collection-map-2.html
看到這里,真的為 Doug Lea 精妙的設計而折服,針對于多線程訪問問題,不但沒有拒絕式得將他們阻塞在門外,反而邀請他們來幫忙一起工作。
好了,我們一路往回走,回到我們最初分析的 putVal 方法。接著前文的分析,當我們根據(jù) hash 值,找到對應的桶結點,如果發(fā)現(xiàn)該結點為 ForwardingNode 結點,表明當前的哈希表正在擴容和 rehash,于是將本線程送進去幫忙擴容。否則如果是普通的桶結點,于是鎖住該桶,分鏈表和紅黑樹的插入一個節(jié)點,具體插入過程類似 HashMap,此處不再贅述。
當我們成功的添加完成一個結點,最后是需要判斷添加操作后是否會導致哈希表達到它的閾值,并針對不同情況決定是否需要進行擴容,還有 CAS 式更新哈希表實際存儲的鍵值對數(shù)量。這些操作都封裝在 addCount 這個方法中,當然 putVal 方法的最后必然會調用該方法進行處理。下面我們看看該方法的具體實現(xiàn),該方法主要做兩個事情。一是更新 baseCount,二是判斷是否需要擴容。
//第一部分,更新?baseCount
private?final?void?addCount(long?x,?int?check)?{
????CounterCell[]?as;?long?b,?s;
????//如果更新失敗才會進入的?if?的主體代碼中
????//s?=?b?+?x??其中?x?等于?1
????if?((as?=?counterCells)?!=?null?||
????????!U.compareAndSwapLong(this,?BASECOUNT,?b?=?baseCount,?s?=?b?+?x))?{
????????CounterCell?a;?long?v;?int?m;
????????boolean?uncontended?=?true;
????????//高并發(fā)下?CAS?失敗會執(zhí)行?fullAddCount?方法
????????if?(as?==?null?||?(m?=?as.length?-?1)?0?||?(a?=?as[ThreadLocalRandom.getProbe()?&?m])?==?null?||!(uncontended?=U.compareAndSwapLong(a,?CELLVALUE,?v?=?a.value,?v?+?x)))?{
????????????fullAddCount(x,?uncontended);
????????????return;
????????}
????????if?(check?<=?1)
????????????return;
????????s?=?sumCount();
????}
這一部分主要完成的是對 baseCount 的 CAS 更新。
//第二部分,判斷是否需要擴容
if?(check?>=?0)?{
?????Node[]?tab,?nt;?int?n,?sc;
?????while?(s?>=?(long)(sc?=?sizeCtl)?&&?(tab?=?table)?!=?null?&&(n?=?tab.length)???????????int?rs?=?resizeStamp(n);
??????????if?(sc?0)?{
?????????????if?((sc?>>>?RESIZE_STAMP_SHIFT)?!=?rs?||?sc?==?rs?+?1?||sc?==?rs?+?MAX_RESIZERS?||?(nt?=?nextTable)?==?null?||transferIndex?<=?0)
???????????????break;
???????????????if?(U.compareAndSwapInt(this,?SIZECTL,?sc,?sc?+?1))
??????????????????transfer(tab,?nt);
????????????}
????????else?if?(U.compareAndSwapInt(this,?SIZECTL,?sc,(rs?<2))
????????????????????transfer(tab,?null);
???????????????s?=?sumCount();
????????????}
????}
這部分代碼也是比較簡單的,不再贅述。
至此,對于 put 方法的源碼分析已經完全結束了,很復雜但也很讓人欽佩。下面我們簡單看看 remove 方法的實現(xiàn)。
四、remove 方法實現(xiàn)并發(fā)刪除
在我們分析完 put 方法的源碼之后,相信 remove 方法對你而言就比較輕松了,無非就是先定位再刪除的復合。
限于篇幅,我們這里簡單的描述下 remove 方法的并發(fā)刪除過程。
首先遍歷整張表的桶結點,如果表還未初始化或者無法根據(jù)參數(shù)的 hash 值定位到桶結點,那么將返回 null。
如果定位到的桶結點類型是 ForwardingNode 結點,調用 helpTransfer 協(xié)助擴容。
否則就老老實實的給桶加鎖,刪除一個節(jié)點。
最后會調用 addCount 方法 CAS 更新 baseCount 的值。
五、其他的一些常用方法的基本介紹
最后我們在補充一些 ConcurrentHashMap 中的小而常用的方法的介紹。
1、size?size 方法的作用是為我們返回哈希表中實際存在的鍵值對的總數(shù)。
public?int?size()?{
????long?n?=?sumCount();
????return?((n?0L)???0?:(n?>?(long)Integer.MAX_VALUE)???Integer.MAX_VALUE?:(int)n);
}
final?long?sumCount()?{
????CounterCell[]?as?=?counterCells;?CounterCell?a;
????long?sum?=?baseCount;
????if?(as?!=?null)?{
????????for?(int?i?=?0;?i?????????????if?((a?=?as[i])?!=?null)
????????????????sum?+=?a.value;
????????}
????}
????return?sum;
}
可能你會有所疑問,ConcurrentHashMap 中的 baseCount 屬性不就是記錄的所有鍵值對的總數(shù)嗎?直接返回它不就行了嗎?
之所以沒有這么做,是因為我們的 addCount 方法用于 CAS 更新 baseCount,但很有可能在高并發(fā)的情況下,更新失敗,那么這些節(jié)點雖然已經被添加到哈希表中了,但是數(shù)量卻沒有被統(tǒng)計。
還好,addCount 方法在更新 baseCount 失敗的時候,會調用 fullAddCount 將這些失敗的結點包裝成一個 CounterCell 對象,保存在 CounterCell 數(shù)組中。那么整張表實際的 size 其實是 baseCount 加上 CounterCell 數(shù)組中元素的個數(shù)。
2、get?get 方法可以根據(jù)指定的鍵,返回對應的鍵值對,由于是讀操作,所以不涉及到并發(fā)問題。源碼也是比較簡單的。
public?V?get(Object?key)?{
????????Node[]?tab;?Node?e,?p;?int?n,?eh;?K?ek;
????????int?h?=?spread(key.hashCode());
????????if?((tab?=?table)?!=?null?&&?(n?=?tab.length)?>?0?&&
????????????(e?=?tabAt(tab,?(n?-?1)?&?h))?!=?null)?{
????????????if?((eh?=?e.hash)?==?h)?{
????????????????if?((ek?=?e.key)?==?key?||?(ek?!=?null?&&?key.equals(ek)))
????????????????????return?e.val;
????????????}
????????????else?if?(eh?0)
????????????????return?(p?=?e.find(h,?key))?!=?null???p.val?:?null;
????????????while?((e?=?e.next)?!=?null)?{
????????????????if?(e.hash?==?h?&&
????????????????????((ek?=?e.key)?==?key?||?(ek?!=?null?&&?key.equals(ek))))
????????????????????return?e.val;
????????????}
????????}
????????return?null;
????}
3、clear?clear 方法將刪除整張哈希表中所有的鍵值對,刪除操作也是一個桶一個桶的進行刪除。
public?void?clear()?{
????long?delta?=?0L;?//?negative?number?of?deletions
????int?i?=?0;
????Node[]?tab?=?table;
????while?(tab?!=?null?&&?i?????????int?fh;
????????Node?f?=?tabAt(tab,?i);
????????if?(f?==?null)
????????????++i;
????????else?if?((fh?=?f.hash)?==?MOVED)?{
????????????tab?=?helpTransfer(tab,?f);
????????????i?=?0;?//?restart
????????}
????????else?{
????????????synchronized?(f)?{
????????????????if?(tabAt(tab,?i)?==?f)?{
????????????????????Node?p?=?(fh?>=?0???f?:(f?instanceof?TreeBin)??((TreeBin)f).first?:?null);
????????????????????????//循環(huán)到鏈表或者紅黑樹的尾部
????????????????????????while?(p?!=?null)?{
????????????????????????????--delta;
????????????????????????????p?=?p.next;
????????????????????????}
????????????????????????//首先刪除鏈、樹的末尾元素,避免產生大量垃圾??
????????????????????????//利用CAS無鎖置null??
????????????????????????setTabAt(tab,?i++,?null);
????????????????????}
????????????????}
????????????}
????????}
????????if?(delta?!=?0L)
????????????addCount(delta,?-1);
????}
到此為止,有關這個為并發(fā)而生的 ConcurrentHashMap 內部的核心的部分,我們已經通過源碼進行了分析。確實挺費腦,再次膜拜下 jdk 大神們的智慧。總結不到之處,望指出!
- 推薦閱讀 -
下方二維碼關注我

互聯(lián)網草根,堅持分享技術、創(chuàng)業(yè)、產品等心得和總結~

點擊“閱讀原文”,領取 2020 年最新免費技術資料大全
