解讀Java 8 中為并發(fā)而生的 ConcurrentHashMap
作者 |?Single_Yam
HashMap 是我們?nèi)粘W畛R?jiàn)的一種容器,它以鍵值對(duì)的形式完成對(duì)數(shù)據(jù)的存儲(chǔ),但眾所周知,它在高并發(fā)的情境下是不安全的。尤其是在 jdk 1.8 之前,rehash 的過(guò)程中采用頭插法轉(zhuǎn)移結(jié)點(diǎn),高并發(fā)下,多個(gè)線程同時(shí)操作一條鏈表將直接導(dǎo)致閉鏈,死循環(huán)并占滿 CPU。
當(dāng)然,jdk 1.8 以來(lái),對(duì) HashMap 的內(nèi)部進(jìn)行了很大的改進(jìn),采用數(shù)組+鏈表+紅黑樹(shù)來(lái)進(jìn)行數(shù)據(jù)的存儲(chǔ)。rehash 的過(guò)程也進(jìn)行了改動(dòng),基于復(fù)制的算法思想,不直接操作原鏈,而是定義了兩條鏈表分別完成對(duì)原鏈的結(jié)點(diǎn)分離操作,即使是多線程的情況下也是安全的。當(dāng)然,它畢竟不是并發(fā)容器,除非大改,否則依然是不能應(yīng)對(duì)高并發(fā)場(chǎng)景的,或者說(shuō)即使沒(méi)有因多線程訪問(wèn)而造成什么問(wèn)題,但是效率必然是受到影響的。例如在多線程同時(shí)添加元素的時(shí)候可能會(huì)丟失數(shù)據(jù),迭代 map 的時(shí)候發(fā)生 fail-fast 等。
本篇文章將要介紹的 ConcurrentHashMap 是 HashMap 的并發(fā)版本,它是線程安全的,并且在高并發(fā)的情境下,性能優(yōu)于 HashMap 很多。我們主要從以下幾個(gè)方面對(duì)其進(jìn)行學(xué)習(xí):
歷史版本的實(shí)現(xiàn)演變 重要成員屬性的介紹 put 方法實(shí)現(xiàn)并發(fā)添加 remove 方法實(shí)現(xiàn)并發(fā)刪除 其他的一些方法的簡(jiǎn)單介紹
一、歷史版本的實(shí)現(xiàn)演變
jdk 1.7 采用分段鎖技術(shù),整個(gè) Hash 表被分成多個(gè)段,每個(gè)段中會(huì)對(duì)應(yīng)一個(gè) Segment 段鎖,段與段之間可以并發(fā)訪問(wèn),但是多線程想要操作同一個(gè)段是需要獲取鎖的。所有的 put,get,remove 等方法都是根據(jù)鍵的 hash 值對(duì)應(yīng)到相應(yīng)的段中,然后嘗試獲取鎖進(jìn)行訪問(wèn)。

jdk 1.8 取消了基于 Segment 的分段鎖思想,改用 CAS + synchronized 控制并發(fā)操作,在某些方面提升了性能。并且追隨 1.8 版本的 HashMap 底層實(shí)現(xiàn),使用數(shù)組+鏈表+紅黑樹(shù)進(jìn)行數(shù)據(jù)存儲(chǔ)。本篇主要介紹 1.8 版本的 ConcurrentHashMap 的具體實(shí)現(xiàn),有關(guān)其之前版本的實(shí)現(xiàn)情況,這里推薦幾篇文章:
談?wù)凜oncurrentHashMap1.7和1.8的不同實(shí)現(xiàn):
http://www.jianshu.com/p/e694f1e868ec
ConcurrentHashMap在jdk1.8中的改進(jìn)?:
https://www.cnblogs.com/everSeeker/p/5601861.html
ConcurrentHashMap原理分析(1.7與1.8):
https://www.cnblogs.com/study-everyday/p/6430462.html
二、重要成員屬性的介紹
transient?volatile?Node[]?table;
和 HashMap 中的語(yǔ)義一樣,代表整個(gè)哈希表。
/**
*?The?next?table?to?use;?non-null?only?while?resizing.
*/
private?transient?volatile?Node[]?nextTable;
這是一個(gè)連接表,用于哈希表擴(kuò)容,擴(kuò)容完成后會(huì)被重置為 null。
private?transient?volatile?long?baseCount;
該屬性保存著整個(gè)哈希表中存儲(chǔ)的所有的結(jié)點(diǎn)的個(gè)數(shù)總和,有點(diǎn)類似于 HashMap 的 size 屬性。
private?transient?volatile?int?sizeCtl;
這是一個(gè)重要的屬性,無(wú)論是初始化哈希表,還是擴(kuò)容 rehash 的過(guò)程,都是需要依賴這個(gè)關(guān)鍵屬性的。該屬性有以下幾種取值:
0:默認(rèn)值 -1:代表哈希表正在進(jìn)行初始化 大于0:相當(dāng)于 HashMap 中的 threshold,表示閾值 小于-1:代表有多個(gè)線程正在進(jìn)行擴(kuò)容
該屬性的使用還是有點(diǎn)復(fù)雜的,在我們分析擴(kuò)容源碼的時(shí)候再給予更加詳盡的描述,此處了解其可取的幾個(gè)值都分別代表著什么樣的含義即可。
構(gòu)造函數(shù)的實(shí)現(xiàn)也和我們上篇介紹的 HashMap 的實(shí)現(xiàn)類似,此處不再贅述,貼出源碼供大家比較。
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
其他常用的方法我們將在文末進(jìn)行簡(jiǎn)單介紹,下面我們主要來(lái)分析下 ConcurrentHashMap 的一個(gè)核心方法 put,我們也會(huì)一并解決掉該方法中涉及到的擴(kuò)容、輔助擴(kuò)容,初始化哈希表等方法。
三、put 方法實(shí)現(xiàn)并發(fā)添加
對(duì)于 HashMap 來(lái)說(shuō),多線程并發(fā)添加元素會(huì)導(dǎo)致數(shù)據(jù)丟失等并發(fā)問(wèn)題,那么 ConcurrentHashMap 又是如何做到并發(fā)添加的呢?
public?V?put(K?key,?V?value)?{
????return?putVal(key,?value,?false);
}
putVal 的方法比較多,我們分兩個(gè)部分進(jìn)行分析。
//第一部分
final?V?putVal(K?key,?V?value,?boolean?onlyIfAbsent)?{
?//對(duì)傳入的參數(shù)進(jìn)行合法性判斷
????if?(key?==?null?||?value?==?null)?throw?new?NullPointerException();
????//計(jì)算鍵所對(duì)應(yīng)的?hash?值
????int?hash?=?spread(key.hashCode());
????int?binCount?=?0;
????for?(Node[]?tab?=?table;;)?{
????????Node?f;?int?n,?i,?fh;
????????//如果哈希表還未初始化,那么初始化它
????????if?(tab?==?null?||?(n?=?tab.length)?==?0)
????????????tab?=?initTable();
????????//根據(jù)鍵的?hash?值找到哈希數(shù)組相應(yīng)的索引位置
????????//如果為空,那么以CAS無(wú)鎖式向該位置添加一個(gè)節(jié)點(diǎn)
????????else?if?((f?=?tabAt(tab,?i?=?(n?-?1)?&?hash))?==?null)?{
????????????if?(casTabAt(tab,?i,?null,
?????????????????????????new?Node(hash,?key,?value,?null)))
????????????????break;???????????????????
????????}
這里需要詳細(xì)說(shuō)明的只有 initTable 方法,這是一個(gè)初始化哈希表的操作,它同時(shí)只允許一個(gè)線程進(jìn)行初始化操作。
private?final?Node[]?initTable()?{
????Node[]?tab;?int?sc;
????//如果表為空才進(jìn)行初始化操作
????while?((tab?=?table)?==?null?||?tab.length?==?0)?{
????????//sizeCtl?小于零說(shuō)明已經(jīng)有線程正在進(jìn)行初始化操作
????????//當(dāng)前線程應(yīng)該放棄?CPU?的使用
????????if?((sc?=?sizeCtl)?0)
????????????Thread.yield();?//?lost?initialization?race;?just?spin
????????//否則說(shuō)明還未有線程對(duì)表進(jìn)行初始化,那么本線程就來(lái)做這個(gè)工作
????????else?if?(U.compareAndSwapInt(this,?SIZECTL,?sc,?-1))?{
??????//保險(xiǎn)起見(jiàn),再次判斷下表是否為空
????????????try?{
????????????????if?((tab?=?table)?==?null?||?tab.length?==?0)?{
?????????????????//sc?大于零說(shuō)明容量已經(jīng)初始化了,否則使用默認(rèn)容量
????????????????????int?n?=?(sc?>?0)???sc?:?DEFAULT_CAPACITY;
????????????????????@SuppressWarnings("unchecked")
????????????????????//根據(jù)容量構(gòu)建數(shù)組
????????????????????Node[]?nt?=?(Node[])new?Node,?>[n];
????????????????????table?=?tab?=?nt;
????????????????????//計(jì)算閾值,等效于?n*0.75
????????????????????sc?=?n?-?(n?>>>?2);
????????????????}
????????????}?finally?{
??????????//設(shè)置閾值
????????????????sizeCtl?=?sc;
????????????}
????????????break;
????????}
????}
????return?tab;
}
關(guān)于 initTable 方法的每一步實(shí)現(xiàn)都已經(jīng)給出注釋,該方法的核心思想就是,只允許一個(gè)線程對(duì)表進(jìn)行初始化,如果不巧有其他線程進(jìn)來(lái)了,那么會(huì)讓其他線程交出 CPU 等待下次系統(tǒng)調(diào)度。這樣,保證了表同時(shí)只會(huì)被一個(gè)線程初始化。
接著,我們回到 putVal 方法,這樣的話,我們第一部分的 putVal 源碼就分析結(jié)束了,下面我們看后一部分的源碼:
//檢測(cè)到桶結(jié)點(diǎn)是?ForwardingNode?類型,協(xié)助擴(kuò)容
else?if?((fh?=?f.hash)?==?MOVED)
?????tab?=?helpTransfer(tab,?f);
//桶結(jié)點(diǎn)是普通的結(jié)點(diǎn),鎖住該桶頭結(jié)點(diǎn)并試圖在該鏈表的尾部添加一個(gè)節(jié)點(diǎn)
else?{
???????V?oldVal?=?null;
???????synchronized?(f)?{
???????????if?(tabAt(tab,?i)?==?f)?{
??????????????//向普通的鏈表中添加元素,無(wú)需贅述
??????????????if?(fh?>=?0)?{
?????????????????binCount?=?1;
?????????????????for?(Node?e?=?f;;?++binCount)?{
?????????????????????K?ek;
?????????????????????if?(e.hash?==?hash?&&((ek?=?e.key)?==?key?||(ek?!=?null?&&?key.equals(ek))))?{
?????????????????????????oldVal?=?e.val;
?????????????????????????if?(!onlyIfAbsent)
????????????????????????????e.val?=?value;
????????????????????????????break;
??????????????????????}
??????????????????????Node?pred?=?e;
??????????????????????if?((e?=?e.next)?==?null)?{
?????????????????????????pred.next?=?new?Node(hash,?key,value,?null);
?????????????????????????break;
??????????????????????}
?????????????????}
???????????}
???????????//向紅黑樹(shù)中添加元素,TreeBin?結(jié)點(diǎn)的hash值為TREEBIN(-2)
???????????else?if?(f?instanceof?TreeBin)?{
???????????????Node?p;
???????????????binCount?=?2;
?????????????????if?((p?=?((TreeBin)f).putTreeVal(hash,?key,??????????????????????????????????????value))?!=?null)?{
???????????????????oldVal?=?p.val;
???????????????????if?(!onlyIfAbsent)
??????????????????????p.val?=?value;
????????????????}
???????????}
???????}
???}
??//binCount?!=?0?說(shuō)明向鏈表或者紅黑樹(shù)中添加或修改一個(gè)節(jié)點(diǎn)成功
??//binCount??==?0?說(shuō)明?put?操作將一個(gè)新節(jié)點(diǎn)添加成為某個(gè)桶的首節(jié)點(diǎn)
??if?(binCount?!=?0)?{
???//鏈表深度超過(guò)?8?轉(zhuǎn)換為紅黑樹(shù)
?????????if?(binCount?>=?TREEIFY_THRESHOLD)
?????????????treeifyBin(tab,?i);
?????????//oldVal?!=?null?說(shuō)明此次操作是修改操作
?????????//直接返回舊值即可,無(wú)需做下面的擴(kuò)容邊界檢查
?????????if?(oldVal?!=?null)
?????????????return?oldVal;
???????????break;
????????}
????}
}
//CAS?式更新baseCount,并判斷是否需要擴(kuò)容
addCount(1L,?binCount);
//程序走到這一步說(shuō)明此次?put?操作是一個(gè)添加操作,否則早就?return?返回了
return?null;
這一部分的源碼大體上已如注釋所描述,至此整個(gè) putVal 方法的大體邏輯實(shí)現(xiàn)相信你也已經(jīng)清晰了,好好回味一下。下面我們對(duì)這部分中的某些方法的實(shí)現(xiàn)細(xì)節(jié)再做一些深入學(xué)習(xí)。
首先需要介紹一下,F(xiàn)orwardingNode 這個(gè)節(jié)點(diǎn)類型,
static?final?class?ForwardingNode<K,V>?extends?Node<K,V>?{
????????final?Node[]?nextTable;
????????ForwardingNode(Node[]?tab)?{
????????????//注意這里
????????????super(MOVED,?null,?null,?null);
????????????this.nextTable?=?tab;
????????}
?//省略其?find?方法
}
這個(gè)節(jié)點(diǎn)內(nèi)部保存了一 nextTable 引用,它指向一張 hash 表。在擴(kuò)容操作中,我們需要對(duì)每個(gè)桶中的結(jié)點(diǎn)進(jìn)行分離和轉(zhuǎn)移,如果某個(gè)桶結(jié)點(diǎn)中所有節(jié)點(diǎn)都已經(jīng)遷移完成了(已經(jīng)被轉(zhuǎn)移到新表 nextTable 中了),那么會(huì)在原 table 表的該位置掛上一個(gè) ForwardingNode 結(jié)點(diǎn),說(shuō)明此桶已經(jīng)完成遷移。
ForwardingNode 繼承自 Node 結(jié)點(diǎn),并且它唯一的構(gòu)造函數(shù)將構(gòu)建一個(gè)鍵,值,next 都為 null 的結(jié)點(diǎn),反正它就是個(gè)標(biāo)識(shí),無(wú)需那些屬性。但是 hash 值卻為 MOVED。
所以,我們?cè)?putVal 方法中遍歷整個(gè) hash 表的桶結(jié)點(diǎn),如果遇到 hash 值等于 MOVED,說(shuō)明已經(jīng)有線程正在擴(kuò)容 rehash 操作,整體上還未完成,不過(guò)我們要插入的桶的位置已經(jīng)完成了所有節(jié)點(diǎn)的遷移。
由于檢測(cè)到當(dāng)前哈希表正在擴(kuò)容,于是讓當(dāng)前線程去協(xié)助擴(kuò)容。
final Node[] helpTransfer(Node[] tab, Node f) {
Node[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode)f).nextTable) != null) {
//返回一個(gè) 16 位長(zhǎng)度的擴(kuò)容校驗(yàn)標(biāo)識(shí)
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
//sizeCtl 如果處于擴(kuò)容狀態(tài)的話
//前 16 位是數(shù)據(jù)校驗(yàn)標(biāo)識(shí),后 16 位是當(dāng)前正在擴(kuò)容的線程總數(shù)
//這里判斷校驗(yàn)標(biāo)識(shí)是否相等,如果校驗(yàn)符不等或者擴(kuò)容操作已經(jīng)完成了,直接退出循環(huán),不用協(xié)助它們擴(kuò)容了
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
//否則調(diào)用 transfer 幫助它們進(jìn)行擴(kuò)容
//sc + 1 標(biāo)識(shí)增加了一個(gè)線程進(jìn)行擴(kuò)容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
下面我們看這個(gè)稍顯復(fù)雜的 transfer 方法,我們依然分幾個(gè)部分來(lái)細(xì)說(shuō)。
//第一部分
private?final?void?transfer(Node[]?tab,?Node[]?nextTab) ?{
????????int?n?=?tab.length,?stride;
????????//計(jì)算單個(gè)線程允許處理的最少table桶首節(jié)點(diǎn)個(gè)數(shù),不能小于?16
????????if?((stride?=?(NCPU?>?1)???(n?>>>?3)?/?NCPU?:?n)?????????????stride?=?MIN_TRANSFER_STRIDE;?
????????//剛開(kāi)始擴(kuò)容,初始化?nextTab?
????????if?(nextTab?==?null)?{
????????????try?{
????????????????@SuppressWarnings("unchecked")
????????????????Node[]?nt?=?(Node[])new?Node,?>[n?<1];
????????????????nextTab?=?nt;
????????????}?catch?(Throwable?ex)?{
????????????????sizeCtl?=?Integer.MAX_VALUE;
????????????????return;
????????????}
????????????nextTable?=?nextTab;
????????????//transferIndex?指向最后一個(gè)桶,方便從后向前遍歷?
????????????transferIndex?=?n;
????????}
????????int?nextn?=?nextTab.length;
????????//定義?ForwardingNode?用于標(biāo)記遷移完成的桶
????????ForwardingNode?fwd?=?new?ForwardingNode(nextTab);
這部分代碼還是比較簡(jiǎn)單的,主要完成的是對(duì)單個(gè)線程能處理的最少桶結(jié)點(diǎn)個(gè)數(shù)的計(jì)算和一些屬性的初始化操作。
//第二部分,并發(fā)擴(kuò)容控制的核心
boolean?advance?=?true;
boolean?finishing?=?false;
//i?指向當(dāng)前桶,bound?指向當(dāng)前線程需要處理的桶結(jié)點(diǎn)的區(qū)間下限
for?(int?i?=?0,?bound?=?0;;)?{
???????Node?f;?int?fh;
???????//這個(gè)?while?循環(huán)的目的就是通過(guò)?--i?遍歷當(dāng)前線程所分配到的桶結(jié)點(diǎn)
???????//一個(gè)桶一個(gè)桶的處理
???????while?(advance)?{
???????????int?nextIndex,?nextBound;
???????????if?(--i?>=?bound?||?finishing)
???????????????advance?=?false;
???????????//transferIndex?<=?0?說(shuō)明已經(jīng)沒(méi)有需要遷移的桶了
???????????else?if?((nextIndex?=?transferIndex)?<=?0)?{
???????????????i?=?-1;
???????????????advance?=?false;
???????????}
???????????//更新?transferIndex
???????????//為當(dāng)前線程分配任務(wù),處理的桶結(jié)點(diǎn)區(qū)間為(nextBound,nextIndex)
???????????else?if?(U.compareAndSwapInt(this,?TRANSFERINDEX,?nextIndex,nextBound?=?(nextIndex?>?stride???nextIndex?-?stride?:?0)))?{
???????????????bound?=?nextBound;
???????????????i?=?nextIndex?-?1;
???????????????advance?=?false;
???????????}
???????}
???????//當(dāng)前線程所有任務(wù)完成
???????if?(i?0?||?i?>=?n?||?i?+?n?>=?nextn)?{
???????????int?sc;
???????????if?(finishing)?{
???????????????nextTable?=?null;
???????????????table?=?nextTab;
???????????????sizeCtl?=?(n?<1)?-?(n?>>>?1);
???????????????return;
???????????}
???????????if?(U.compareAndSwapInt(this,?SIZECTL,?sc?=?sizeCtl,?sc?-?1))?{
???????????????if?((sc?-?2)?!=?resizeStamp(n)?<???????????????????return;
???????????????finishing?=?advance?=?true;
???????????????i?=?n;?
???????????}
???????}
???????//待遷移桶為空,那么在此位置?CAS?添加?ForwardingNode?結(jié)點(diǎn)標(biāo)識(shí)該桶已經(jīng)被處理過(guò)了
???????else?if?((f?=?tabAt(tab,?i))?==?null)
???????????advance?=?casTabAt(tab,?i,?null,?fwd);
???????//如果掃描到?ForwardingNode,說(shuō)明此桶已經(jīng)被處理過(guò)了,跳過(guò)即可
???????else?if?((fh?=?f.hash)?==?MOVED)
???????????advance?=?true;?
每個(gè)新參加進(jìn)來(lái)擴(kuò)容的線程必然先進(jìn) while 循環(huán)的最后一個(gè)判斷條件中去領(lǐng)取自己需要遷移的桶的區(qū)間。然后 i 指向區(qū)間的最后一個(gè)位置,表示遷移操作從后往前的做。接下來(lái)的幾個(gè)判斷就是實(shí)際的遷移結(jié)點(diǎn)操作了。等我們大致介紹完成第三部分的源碼再回來(lái)對(duì)各個(gè)判斷條件下的遷移過(guò)程進(jìn)行詳細(xì)的敘述。
//第三部分
else?{
?//
????synchronized?(f)?{
????????if?(tabAt(tab,?i)?==?f)?{
????????????Node?ln,?hn;
????????????//鏈表的遷移操作
????????????if?(fh?>=?0)?{
????????????????int?runBit?=?fh?&?n;
????????????????Node?lastRun?=?f;
????????????????//整個(gè)?for?循環(huán)為了找到整個(gè)桶中最后連續(xù)的?fh?&?n?不變的結(jié)點(diǎn)
????????????????for?(Node?p?=?f.next;?p?!=?null;?p?=?p.next)?{
????????????????????int?b?=?p.hash?&?n;
????????????????????if?(b?!=?runBit)?{
????????????????????????runBit?=?b;
????????????????????????lastRun?=?p;
????????????????????}
????????????????}
????????????????if?(runBit?==?0)?{
????????????????????ln?=?lastRun;
????????????????????hn?=?null;
????????????????}
????????????????else?{
????????????????????hn?=?lastRun;
????????????????????ln?=?null;
????????????????}
????????????????//如果fh&n不變的鏈表的runbit都是0,則nextTab[i]內(nèi)元素ln前逆序,ln及其之后順序
????????????????//否則,nextTab[i+n]內(nèi)元素全部相對(duì)原table逆序
????????????????//這是通過(guò)一個(gè)節(jié)點(diǎn)一個(gè)節(jié)點(diǎn)的往nextTab添加
????????????????for?(Node?p?=?f;?p?!=?lastRun;?p?=?p.next)?{
????????????????????int?ph?=?p.hash;?K?pk?=?p.key;?V?pv?=?p.val;
????????????????????if?((ph?&?n)?==?0)
????????????????????????ln?=?new?Node(ph,?pk,?pv,?ln);
????????????????????else
????????????????????????hn?=?new?Node(ph,?pk,?pv,?hn);
????????????????}
????????????????//把兩條鏈表整體遷移到nextTab中
????????????????setTabAt(nextTab,?i,?ln);
????????????????setTabAt(nextTab,?i?+?n,?hn);
????????????????//將原桶標(biāo)識(shí)位已經(jīng)處理
????????????????setTabAt(tab,?i,?fwd);
????????????????advance?=?true;
????????????}
????????????//紅黑樹(shù)的復(fù)制算法,不再贅述
????????????else?if?(f?instanceof?TreeBin)?{
????????????????TreeBin?t?=?(TreeBin)f;
????????????????TreeNode?lo?=?null,?loTail?=?null;
????????????????TreeNode?hi?=?null,?hiTail?=?null;
????????????????int?lc?=?0,?hc?=?0;
????????????????for?(Node?e?=?t.first;?e?!=?null;?e?=?e.next)?{
????????????????????int?h?=?e.hash;
????????????????????TreeNode?p?=?new?TreeNode(h,?e.key,?e.val,?null,?null);
????????????????????if?((h?&?n)?==?0)?{
????????????????????????if?((p.prev?=?loTail)?==?null)
????????????????????????????lo?=?p;
????????????????????????else
????????????????????????????loTail.next?=?p;
????????????????????loTail?=?p;
????????????????????++lc;
????????????????????}
????????????????????else?{
????????????????????????if?((p.prev?=?hiTail)?==?null)
????????????????????????????hi?=?p;
????????????????????????else
????????????????????????????hiTail.next?=?p;
????????????????????hiTail?=?p;
????????????????????++hc;
????????????????????}
????????????????}
????????????????ln?=?(lc?<=?UNTREEIFY_THRESHOLD)???untreeify(lo)?:(hc?!=?0)???new?TreeBin(lo)?:?t;
????????????????hn?=?(hc?<=?UNTREEIFY_THRESHOLD)???untreeify(hi)?:(lc?!=?0)???new?TreeBin(hi)?:?t;
????????????????setTabAt(nextTab,?i,?ln);
????????????????setTabAt(nextTab,?i?+?n,?hn);
????????????????setTabAt(tab,?i,?fwd);
????????????????advance?=?true;
???????????}
那么至此,有關(guān)遷移的幾種情況已經(jīng)介紹完成了,下面我們整體上把控一下整個(gè)擴(kuò)容和遷移過(guò)程。
首先,每個(gè)線程進(jìn)來(lái)會(huì)先領(lǐng)取自己的任務(wù)區(qū)間,然后開(kāi)始 --i 來(lái)遍歷自己的任務(wù)區(qū)間,對(duì)每個(gè)桶進(jìn)行處理。如果遇到桶的頭結(jié)點(diǎn)是空的,那么使用 ForwardingNode 標(biāo)識(shí)該桶已經(jīng)被處理完成了。如果遇到已經(jīng)處理完成的桶,直接跳過(guò)進(jìn)行下一個(gè)桶的處理。如果是正常的桶,對(duì)桶首節(jié)點(diǎn)加鎖,正常的遷移即可,遷移結(jié)束后依然會(huì)將原表的該位置標(biāo)識(shí)位已經(jīng)處理。
當(dāng) i < 0,說(shuō)明本線程處理速度夠快的,整張表的最后一部分已經(jīng)被它處理完了,現(xiàn)在需要看看是否還有其他線程在自己的區(qū)間段還在遷移中。這是退出的邏輯判斷部分:

finnish 是一個(gè)標(biāo)志,如果為 true 則說(shuō)明整張表的遷移操作已經(jīng)全部完成了,我們只需要重置 table 的引用并將 nextTable 賦為空即可。否則,CAS 式的將 sizeCtl 減一,表示當(dāng)前線程已經(jīng)完成了任務(wù),退出擴(kuò)容操作。
如果退出成功,那么需要進(jìn)一步判斷是否還有其他線程仍然在執(zhí)行任務(wù)。
if?((sc?-?2)?!=?resizeStamp(n)?<???return;
我們說(shuō)過(guò) resizeStamp(n) 返回的是對(duì) n 的一個(gè)數(shù)據(jù)校驗(yàn)標(biāo)識(shí),占 16 位。而 RESIZE_STAMP_SHIFT 的值為 16,那么位運(yùn)算后,整個(gè)表達(dá)式必然在右邊空出 16 個(gè)零。也正如我們所說(shuō)的,sizeCtl 的高 16 位為數(shù)據(jù)校驗(yàn)標(biāo)識(shí),低 16 為表示正在進(jìn)行擴(kuò)容的線程數(shù)量。
(resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2 表示當(dāng)前只有一個(gè)線程正在工作,相對(duì)應(yīng)的,如果 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT,說(shuō)明當(dāng)前線程就是最后一個(gè)還在擴(kuò)容的線程,那么會(huì)將 finishing 標(biāo)識(shí)為 true,并在下一次循環(huán)中退出擴(kuò)容方法。
這一塊的難點(diǎn)在于對(duì) sizeCtl 的各個(gè)值的理解,關(guān)于它的深入理解,這里推薦一篇文章。
著重理解位操作:
http://wuzhaoyang.me/2016/09/05/java-collection-map-2.html
看到這里,真的為 Doug Lea 精妙的設(shè)計(jì)而折服,針對(duì)于多線程訪問(wèn)問(wèn)題,不但沒(méi)有拒絕式得將他們阻塞在門外,反而邀請(qǐng)他們來(lái)幫忙一起工作。
好了,我們一路往回走,回到我們最初分析的 putVal 方法。接著前文的分析,當(dāng)我們根據(jù) hash 值,找到對(duì)應(yīng)的桶結(jié)點(diǎn),如果發(fā)現(xiàn)該結(jié)點(diǎn)為 ForwardingNode 結(jié)點(diǎn),表明當(dāng)前的哈希表正在擴(kuò)容和 rehash,于是將本線程送進(jìn)去幫忙擴(kuò)容。否則如果是普通的桶結(jié)點(diǎn),于是鎖住該桶,分鏈表和紅黑樹(shù)的插入一個(gè)節(jié)點(diǎn),具體插入過(guò)程類似 HashMap,此處不再贅述。
當(dāng)我們成功的添加完成一個(gè)結(jié)點(diǎn),最后是需要判斷添加操作后是否會(huì)導(dǎo)致哈希表達(dá)到它的閾值,并針對(duì)不同情況決定是否需要進(jìn)行擴(kuò)容,還有 CAS 式更新哈希表實(shí)際存儲(chǔ)的鍵值對(duì)數(shù)量。這些操作都封裝在 addCount 這個(gè)方法中,當(dāng)然 putVal 方法的最后必然會(huì)調(diào)用該方法進(jìn)行處理。下面我們看看該方法的具體實(shí)現(xiàn),該方法主要做兩個(gè)事情。一是更新 baseCount,二是判斷是否需要擴(kuò)容。
//第一部分,更新?baseCount
private?final?void?addCount(long?x,?int?check)?{
????CounterCell[]?as;?long?b,?s;
????//如果更新失敗才會(huì)進(jìn)入的?if?的主體代碼中
????//s?=?b?+?x??其中?x?等于?1
????if?((as?=?counterCells)?!=?null?||
????????!U.compareAndSwapLong(this,?BASECOUNT,?b?=?baseCount,?s?=?b?+?x))?{
????????CounterCell?a;?long?v;?int?m;
????????boolean?uncontended?=?true;
????????//高并發(fā)下?CAS?失敗會(huì)執(zhí)行?fullAddCount?方法
????????if?(as?==?null?||?(m?=?as.length?-?1)?0?||?(a?=?as[ThreadLocalRandom.getProbe()?&?m])?==?null?||!(uncontended?=U.compareAndSwapLong(a,?CELLVALUE,?v?=?a.value,?v?+?x)))?{
????????????fullAddCount(x,?uncontended);
????????????return;
????????}
????????if?(check?<=?1)
????????????return;
????????s?=?sumCount();
????}
這一部分主要完成的是對(duì) baseCount 的 CAS 更新。
//第二部分,判斷是否需要擴(kuò)容
if?(check?>=?0)?{
?????Node[]?tab,?nt;?int?n,?sc;
?????while?(s?>=?(long)(sc?=?sizeCtl)?&&?(tab?=?table)?!=?null?&&(n?=?tab.length)???????????int?rs?=?resizeStamp(n);
??????????if?(sc?0)?{
?????????????if?((sc?>>>?RESIZE_STAMP_SHIFT)?!=?rs?||?sc?==?rs?+?1?||sc?==?rs?+?MAX_RESIZERS?||?(nt?=?nextTable)?==?null?||transferIndex?<=?0)
???????????????break;
???????????????if?(U.compareAndSwapInt(this,?SIZECTL,?sc,?sc?+?1))
??????????????????transfer(tab,?nt);
????????????}
????????else?if?(U.compareAndSwapInt(this,?SIZECTL,?sc,(rs?<2))
????????????????????transfer(tab,?null);
???????????????s?=?sumCount();
????????????}
????}
這部分代碼也是比較簡(jiǎn)單的,不再贅述。
至此,對(duì)于 put 方法的源碼分析已經(jīng)完全結(jié)束了,很復(fù)雜但也很讓人欽佩。下面我們簡(jiǎn)單看看 remove 方法的實(shí)現(xiàn)。
四、remove 方法實(shí)現(xiàn)并發(fā)刪除
在我們分析完 put 方法的源碼之后,相信 remove 方法對(duì)你而言就比較輕松了,無(wú)非就是先定位再刪除的復(fù)合。
限于篇幅,我們這里簡(jiǎn)單的描述下 remove 方法的并發(fā)刪除過(guò)程。
首先遍歷整張表的桶結(jié)點(diǎn),如果表還未初始化或者無(wú)法根據(jù)參數(shù)的 hash 值定位到桶結(jié)點(diǎn),那么將返回 null。
如果定位到的桶結(jié)點(diǎn)類型是 ForwardingNode 結(jié)點(diǎn),調(diào)用 helpTransfer 協(xié)助擴(kuò)容。
否則就老老實(shí)實(shí)的給桶加鎖,刪除一個(gè)節(jié)點(diǎn)。
最后會(huì)調(diào)用 addCount 方法 CAS 更新 baseCount 的值。
五、其他的一些常用方法的基本介紹
最后我們?cè)谘a(bǔ)充一些 ConcurrentHashMap 中的小而常用的方法的介紹。
1、size?size 方法的作用是為我們返回哈希表中實(shí)際存在的鍵值對(duì)的總數(shù)。
public?int?size()?{
????long?n?=?sumCount();
????return?((n?0L)???0?:(n?>?(long)Integer.MAX_VALUE)???Integer.MAX_VALUE?:(int)n);
}
final?long?sumCount()?{
????CounterCell[]?as?=?counterCells;?CounterCell?a;
????long?sum?=?baseCount;
????if?(as?!=?null)?{
????????for?(int?i?=?0;?i?????????????if?((a?=?as[i])?!=?null)
????????????????sum?+=?a.value;
????????}
????}
????return?sum;
}
可能你會(huì)有所疑問(wèn),ConcurrentHashMap 中的 baseCount 屬性不就是記錄的所有鍵值對(duì)的總數(shù)嗎?直接返回它不就行了嗎?
之所以沒(méi)有這么做,是因?yàn)槲覀兊?addCount 方法用于 CAS 更新 baseCount,但很有可能在高并發(fā)的情況下,更新失敗,那么這些節(jié)點(diǎn)雖然已經(jīng)被添加到哈希表中了,但是數(shù)量卻沒(méi)有被統(tǒng)計(jì)。
還好,addCount 方法在更新 baseCount 失敗的時(shí)候,會(huì)調(diào)用 fullAddCount 將這些失敗的結(jié)點(diǎn)包裝成一個(gè) CounterCell 對(duì)象,保存在 CounterCell 數(shù)組中。那么整張表實(shí)際的 size 其實(shí)是 baseCount 加上 CounterCell 數(shù)組中元素的個(gè)數(shù)。
2、get?get 方法可以根據(jù)指定的鍵,返回對(duì)應(yīng)的鍵值對(duì),由于是讀操作,所以不涉及到并發(fā)問(wèn)題。源碼也是比較簡(jiǎn)單的。
public?V?get(Object?key)?{
????????Node[]?tab;?Node?e,?p;?int?n,?eh;?K?ek;
????????int?h?=?spread(key.hashCode());
????????if?((tab?=?table)?!=?null?&&?(n?=?tab.length)?>?0?&&
????????????(e?=?tabAt(tab,?(n?-?1)?&?h))?!=?null)?{
????????????if?((eh?=?e.hash)?==?h)?{
????????????????if?((ek?=?e.key)?==?key?||?(ek?!=?null?&&?key.equals(ek)))
????????????????????return?e.val;
????????????}
????????????else?if?(eh?0)
????????????????return?(p?=?e.find(h,?key))?!=?null???p.val?:?null;
????????????while?((e?=?e.next)?!=?null)?{
????????????????if?(e.hash?==?h?&&
????????????????????((ek?=?e.key)?==?key?||?(ek?!=?null?&&?key.equals(ek))))
????????????????????return?e.val;
????????????}
????????}
????????return?null;
????}
3、clear?clear 方法將刪除整張哈希表中所有的鍵值對(duì),刪除操作也是一個(gè)桶一個(gè)桶的進(jìn)行刪除。
public?void?clear()?{
????long?delta?=?0L;?//?negative?number?of?deletions
????int?i?=?0;
????Node[]?tab?=?table;
????while?(tab?!=?null?&&?i?????????int?fh;
????????Node?f?=?tabAt(tab,?i);
????????if?(f?==?null)
????????????++i;
????????else?if?((fh?=?f.hash)?==?MOVED)?{
????????????tab?=?helpTransfer(tab,?f);
????????????i?=?0;?//?restart
????????}
????????else?{
????????????synchronized?(f)?{
????????????????if?(tabAt(tab,?i)?==?f)?{
????????????????????Node?p?=?(fh?>=?0???f?:(f?instanceof?TreeBin)??((TreeBin)f).first?:?null);
????????????????????????//循環(huán)到鏈表或者紅黑樹(shù)的尾部
????????????????????????while?(p?!=?null)?{
????????????????????????????--delta;
????????????????????????????p?=?p.next;
????????????????????????}
????????????????????????//首先刪除鏈、樹(shù)的末尾元素,避免產(chǎn)生大量垃圾??
????????????????????????//利用CAS無(wú)鎖置null??
????????????????????????setTabAt(tab,?i++,?null);
????????????????????}
????????????????}
????????????}
????????}
????????if?(delta?!=?0L)
????????????addCount(delta,?-1);
????}
到此為止,有關(guān)這個(gè)為并發(fā)而生的 ConcurrentHashMap 內(nèi)部的核心的部分,我們已經(jīng)通過(guò)源碼進(jìn)行了分析。確實(shí)挺費(fèi)腦,再次膜拜下 jdk 大神們的智慧??偨Y(jié)不到之處,望指出!
END


