JDK1.8-ConcurrentHashMap詳解
ava JDK升級到1.8后有些集合類的實現(xiàn)有了變化,其中ConcurrentHashMap就有進行結構上的大調整。jdk1.6、1.7實現(xiàn)的共同點主要是通過采用分段鎖Segment減少熱點域來提高并發(fā)效率,1.8版本的實現(xiàn)有哪些變化呢?
重要概念
在正式研究前,我們需要先知道幾個重要參數(shù),提前說明其值所代表的意義以便更好的講解源碼實現(xiàn)。
table
所有數(shù)據(jù)都存在table中,table的容量會根據(jù)實際情況進行擴容,table[i]存放的數(shù)據(jù)類型有以下3種:
- TreeBin 用于包裝紅黑樹結構的結點類型
- ForwardingNode 擴容時存放的結點類型,并發(fā)擴容的實現(xiàn)關鍵之一
- Node 普通結點類型,表示鏈表頭結點
nextTable
擴容時用于存放數(shù)據(jù)的變量,擴容完成后會置為null。
sizeCtl
以volatile修飾的sizeCtl用于數(shù)組初始化與擴容控制,它有以下幾個值:
當前未初始化:
?=?0??//未指定初始容量
?>?0??//由指定的初始容量計算而來,再找最近的2的冪次方。
??//比如傳入6,計算公式為6+6/2+1=10,最近的2的冪次方為16,所以sizeCtl就為16。
初始化中:
?=?-1?//table正在初始化
?=?-N?//N是int類型,分為兩部分,高15位是指定容量標識,低16位表示
??????//并行擴容線程數(shù)+1,具體在resizeStamp函數(shù)介紹。
初始化完成:
?=table.length?*?0.75??//擴容閾值調為table容量大小的0.75倍
其它的分析相應源碼時再細說。
put()
public?V?put(K?key,?V?value)?{
????return?putVal(key,?value,?false);
}
final?V?putVal(K?key,?V?value,?boolean?onlyIfAbsent)?{
????if?(key?==?null?||?value?==?null)?throw?new?NullPointerException();
????int?hash?=?spread(key.hashCode());???//@1,講解見下面小標題。
?????//i處結點數(shù)量,2: TreeBin或鏈表結點數(shù), 其它:鏈表結點數(shù)。主要用于每次加入結點后查看是否要由鏈表轉為紅黑樹
????int?binCount?=?0;?
????for?(Node[]?tab?=?table;;)?{???//CAS經(jīng)典寫法,不成功無限重試,讓再次進行循環(huán)進行相應操作。
????????Node?f;?int?n,?i,?fh;
????????//除非構造時指定初始化集合,否則默認構造不初始化table,所以需要在添加時元素檢查是否需要初始化。
????????if?(tab?==?null?||?(n?=?tab.length)?==?0)
????????????tab?=?initTable();??//@2
????????//CAS操作得到對應table中元素
????????else?if?((f?=?tabAt(tab,?i?=?(n?-?1)?&?hash))?==?null)?{?//@3
????????????if?(casTabAt(tab,?i,?null,
?????????????????????????new?Node(hash,?key,?value,?null)))
????????????????break;???????????????????//null創(chuàng)建Node對象做為鏈表首結點
????????}
????????else?if?((fh?=?f.hash)?==?MOVED)??//當前結點正在擴容
?????????//讓當前線程調用helpTransfer也參與到擴容過程中來,擴容完畢后tab指向新table。
????????????tab?=?helpTransfer(tab,?f);?
????????else?{
????????????V?oldVal?=?null;
????????????synchronized?(f)?{
????????????????if?(tabAt(tab,?i)?==?f)?{??//雙重檢查i處結點未變化
????????????????????if?(fh?>=?0)?{??//表明是鏈表結點類型,hash值是大于0的,即spread()方法計算而來
????????????????????????binCount?=?1;
????????????????????????for?(Node?e?=?f;;?++binCount)?{
????????????????????????????K?ek;
????????????????????????????if?(e.hash?==?hash?&&
????????????????????????????????((ek?=?e.key)?==?key?||
?????????????????????????????????(ek?!=?null?&&?key.equals(ek))))?{
????????????????????????????????oldVal?=?e.val;
????????????????????????????????//onlyIfAbsent表示是新元素才加入,舊值不替換,默認為fase。
????????????????????????????????if?(!onlyIfAbsent)??
????????????????????????????????????e.val?=?value;
????????????????????????????????break;
????????????????????????????}
????????????????????????????Node?pred?=?e;
????????????????????????????if?((e?=?e.next)?==?null)?{
????????????????????????????????//jdk1.8版本是把新結點加入鏈表尾部,next由volatile修飾
????????????????????????????????pred.next?=?new?Node(hash,?key,
??????????????????????????????????????????????????????????value,?null);
????????????????????????????????break;
????????????????????????????}
????????????????????????}
????????????????????}
????????????????????else?if?(f?instanceof?TreeBin)?{??//紅黑樹結點類型
????????????????????????Node?p;
????????????????????????binCount?=?2;
????????????????????????if?((p?=?((TreeBin)f).putTreeVal(hash,?key,
???????????????????????????????????????????????????????value))?!=?null)?{???//@4
????????????????????????????oldVal?=?p.val;
????????????????????????????if?(!onlyIfAbsent)
????????????????????????????????p.val?=?value;
????????????????????????}
????????????????????}
????????????????}
????????????}
????????????if?(binCount?!=?0)?{
????????????????if?(binCount?>=?TREEIFY_THRESHOLD)??//默認桶中結點數(shù)超過8個數(shù)據(jù)結構會轉為紅黑樹
????????????????????treeifyBin(tab,?i);???//@5
????????????????if?(oldVal?!=?null)
????????????????????return?oldVal;
????????????????break;
????????????}
????????}
????}
????addCount(1L,?binCount);??//更新size,檢測擴容
????return?null;
}
注釋已說的比較明白,上面的代碼中的數(shù)字注釋再單獨細說下:
spread()
jdk1.8的hash策略,與以往版本一樣都是為了減少hash沖突:
static?final?int?HASH_BITS?=?0x7fffffff;?//?usable?bits?of?normal?node?hash???//01111111_11111111_11111111_11111111
static?final?int?spread(int?h)?{
????//無符號右移加入高位影響,與HASH_BITS做與操作保留對hash有用的比特位,有讓hash>0的意思
????return?(h?^?(h?>>>?16))?&?HASH_BITS;
}
initTable()
initTable()用于里面table數(shù)組的初始化,值得一提的是table初始化是沒有加鎖的,那么如何處理并發(fā)呢?
由下面代碼可以看到,當要初始化時會通過CAS操作將sizeCtl置為-1,而sizeCtl由volatile修飾,保證修改對后面線程可見。
這之后如果再有線程執(zhí)行到此方法時檢測到sizeCtl為負數(shù),說明已經(jīng)有線程在給擴容了,這個線程就會調用Thread.yield()讓出一次CPU執(zhí)行時間。
private?final?Node[]?initTable()?{
????????Node[]?tab;?int?sc;
????????while?((tab?=?table)?==?null?||?tab.length?==?0)?{
????????????if?((sc?=?sizeCtl)?0)
????????????????Thread.yield();?
????????????????//正在初始化時將sizeCtl設為-1
????????????else?if?(U.compareAndSwapInt(this,?SIZECTL,?sc,?-1))?{
????????????????try?{
????????????????????if?((tab?=?table)?==?null?||?tab.length?==?0)?{
????????????????????????int?n?=?(sc?>?0)???sc?:?DEFAULT_CAPACITY;??//DEFAULT_CAPACITY為16
????????????????????????@SuppressWarnings("unchecked")
????????????????????????Node[]?nt?=?(Node[])new?Node,?>[n];
????????????????????????table?=?tab?=?nt;
????????????????????????sc?=?n?-?(n?>>>?2);???//擴容閾值為新容量的0.75倍
????????????????????}
????????????????}?finally?{
????????????????????sizeCtl?=?sc;???//擴容保護
????????????????}
????????????????break;
????????????}
????????}
????????return?tab;
????}
tabAt()/casTabAt()/setTabAt()
ABASE表示table中首個元素的內存偏移地址,所以(long)i << ASHIFT) + ABASE得到table[i]的內存偏移地址:
static?final??Node?tabAt(Node[]?tab,?int?i) ? {
????return?(Node)U.getObjectVolatile(tab,?((long)i?<}
static?final??boolean?casTabAt(Node[]?tab,?int?i,
????????????????????????????????????Node?c,?Node?v) ?{
????return?U.compareAndSwapObject(tab,?((long)i?<}
static?final??void?setTabAt(Node[]?tab,?int?i,?Node?v) ?{
????U.putObjectVolatile(tab,?((long)i?<}
對i位置結點的寫操作有兩個方法,casTabAt()與setTabAt()。源碼中有這樣一段注釋:
*?Note?that?calls?to?setTabAt?always?occur?within?locked?regions,
*?and?so?in?principle?require?only?release?ordering,?not
*?full?volatile?semantics,?but?are?currently?coded?as?volatile
*?writes?to?be?conservative.
所以要原子語義的寫操作需要使用casTabAt(),setTabAt()是在鎖定桶的狀態(tài)下才會被調用,之所以實現(xiàn)成這樣只是帶保守性的一種寫法而已。放松一下繼續(xù)~

TreeBin
注釋4、5都是有關TreeBin的操作,為進一步提升性能,ConcurrentHashMap引入了紅黑樹。引入紅黑樹是因為鏈表查詢的時間復雜度為O(n),紅黑樹查詢的時間復雜度為O(log(n)),所以在結點比較多的情況下使用紅黑樹可以大大提升性能。
紅黑樹是一種自平衡二叉查找樹,有如下性質:
- 每個節(jié)點要么是紅色,要么是黑色。
- 根節(jié)點永遠是黑色的。
- 所有的葉節(jié)點都是空節(jié)點(即 null),并且是黑色的。
- 每個紅色節(jié)點的兩個子節(jié)點都是黑色。(從每個葉子到- 根的路徑上不會有兩個連續(xù)的紅色節(jié)點)
- 從任一節(jié)點到其每個葉子的所有簡單路徑都包含相同數(shù)目的黑色節(jié)點。
圖例:

treeifyBin()
桶內元素超時8個時會調用到此方法。
private?final?void?treeifyBin(Node[]?tab,?int?index) ?{
????????Node?b;?int?n,?sc;
????????if?(tab?!=?null)?{
????????????if?((n?=?tab.length)?????????????????tryPresize(n?<1);??//如果數(shù)組整體容量太小則去擴容,放棄轉紅黑樹結構
????????????else?if?((b?=?tabAt(tab,?index))?!=?null?&&?b.hash?>=?0)?{
????????????????synchronized?(b)?{
????????????????????if?(tabAt(tab,?index)?==?b)?{
????????????????????????TreeNode?hd?=?null,?tl?=?null;
????????????????????????for?(Node?e?=?b;?e?!=?null;?e?=?e.next)?{
????????????????????????????TreeNode?p?=
????????????????????????????????new?TreeNode(e.hash,?e.key,?e.val,
??????????????????????????????????????????????????null,?null);
????????????????????????????if?((p.prev?=?tl)?==?null)
????????????????????????????????hd?=?p;
????????????????????????????else
????????????????????????????????tl.next?=?p;
????????????????????????????tl?=?p;
????????????????????????}
????????????????????????setTabAt(tab,?index,?new?TreeBin(hd));
可以看出按原Node鏈表順序轉為了TreeNode鏈表,每個TreeNode的prev、next已完備,傳入頭結點hd構造紅黑樹。
TreeBin構造函數(shù)
TreeBin(TreeNode?b)?{
???//Node(int?hash,?K?key,?V?val,?Node?next)
????????????super(TREEBIN,?null,?null,?null);
????????????this.first?=?b;
????????????TreeNode?r?=?null;
????????????for?(TreeNode?x?=?b,?next;?x?!=?null;?x?=?next)?{??//依次處理每個結點
????????????????next?=?(TreeNode)x.next;
????????????????x.left?=?x.right?=?null;
????????????????if?(r?==?null)?{
????????????????????x.parent?=?null;
????????????????????x.red?=?false;??//根結點為黑色
????????????????????r?=?x;
????????????????}
????????????????else?{
????????????????????K?k?=?x.key;
????????????????????int?h?=?x.hash;
????????????????????Class>?kc?=?null;
????????????????????for?(TreeNode?p?=?r;;)?{?//遍歷查找新結點存放位置
????????????????????????int?dir,?ph;
????????????????????????K?pk?=?p.key;
????????????????????????if?((ph?=?p.hash)?>?h)
????????????????????????????dir?=?-1;
????????????????????????else?if?(ph?????????????????????????????dir?=?1;
????????????????????????//key有實現(xiàn)Comparable接口則使用compareTo()進行比較,否則采用tieBreakOrder中默認的比較方式,即比較hashCode。
????????????????????????else?if?((kc?==?null?&&
??????????????????????????????????(kc?=?comparableClassFor(k))?==?null)?||
?????????????????????????????????(dir?=?compareComparables(kc,?k,?pk))?==?0)
????????????????????????????dir?=?tieBreakOrder(k,?pk);
????????????????????????????TreeNode?xp?=?p;
????????????????????????if?((p?=?(dir?<=?0)???p.left?:?p.right)?==?null)?{??//左子節(jié)點或右子節(jié)點為空則在p下添加新結點,否則p的值更新為子節(jié)點繼續(xù)查找。紅黑樹中結點p.left <= p <= p.right
????????????????????????????x.parent?=?xp;??//保存新結點的父結點
????????????????????????????if?(dir?<=?0)
????????????????????????????????xp.left?=?x;?//排序小的放左邊
????????????????????????????else
????????????????????????????????xp.right?=?x;??//排序大的放右邊
????????????????????????????r?=?balanceInsertion(r,?x);??//平衡紅黑樹
????????????????????????????break;
...
????????????this.root?=?r;
...
????????}
putTreeVal()與此方法遍歷方式類似不再介紹。
擴容實現(xiàn)
寫這篇文章主要就是想講講擴容,Let’s go!
什么時候會擴容?
使用put()添加元素時會調用addCount(),內部檢查sizeCtl看是否需要擴容。
tryPresize()被調用,此方法被調用有兩個調用點:
- 鏈表轉紅黑樹(put()時檢查)時如果table容量小于64(MIN_TREEIFY_CAPACITY),則會觸發(fā)擴容。
- 調用putAll()之類一次性加入大量元素,會觸發(fā)擴容。
addCount()
addCount()與tryPresize()實現(xiàn)很相似,我們先以addCount()分析下擴容邏輯:
private?final?void?addCount(long?x,?int?check)?{
????...
????//check就是結點數(shù)量,有新元素加入成功才檢查是否要擴容。
????if?(check?>=?0)?{
????????Node[]?tab,?nt;?int?n,?sc;
????????//s表示加入新元素后容量大小,計算已省略。
????????//新容量大于當前擴容閾值并且小于最大擴容值才擴容,如果tab=null說明正在初始化,死循環(huán)等待初始化完成。
????????while?(s?>=?(long)(sc?=?sizeCtl)?&&?(tab?=?table)?!=?null?&&
???????????????(n?=?tab.length)?????????????int?rs?=?resizeStamp(n);??//@1
????????????//sc<0表示已經(jīng)有線程在進行擴容工作
????????????if?(sc?0)?{
????????????????//條件1:檢查是對容量n的擴容,保證sizeCtl與n是一塊修改好的
????????????????//條件2與條件3:應該是進行sc的最小值或最大值判斷。
????????????????//條件4與條件5:?確保tranfer()中的nextTable相關初始化邏輯已走完。
????????????????if?((sc?>>>?RESIZE_STAMP_SHIFT)?!=?rs?||?sc?==?rs?+?1?||
????????????????????sc?==?rs?+?MAX_RESIZERS?||?(nt?=?nextTable)?==?null?||
????????????????????transferIndex?<=?0)
????????????????????break;
????????????????if?(U.compareAndSwapInt(this,?SIZECTL,?sc,?sc?+?1))??//有新線程參與擴容則sizeCtl加1
????????????????????transfer(tab,?nt);
????????????}
????????????//沒有線程在進行擴容,將sizeCtl的值改為(rs << RESIZE_STAMP_SHIFT)?+ 2),原因見下面sizeCtl值的計算分析。
????????????else?if?(U.compareAndSwapInt(this,?SIZECTL,?sc,
?????????????????????????????????????????(rs?<2))
????????????????transfer(tab,?null);
????????????s?=?sumCount();
????????}
????}
}
resizeStamp()
在上面的代碼中首先有調用到這樣的一個方法。
/**
?*?The?number?of?bits?used?for?generation?stamp?in?sizeCtl.
?*?Must?be?at?least?6?for?32bit?arrays.
*/
private?static?int?RESIZE_STAMP_BITS?=?16;
/**
?*?The?bit?shift?for?recording?size?stamp?in?sizeCtl.
?*/
private?static?final?int?RESIZE_STAMP_SHIFT?=?32?-?RESIZE_STAMP_BITS;
static?final?int?resizeStamp(int?n)?{
????????return?Integer.numberOfLeadingZeros(n)?|?(1?<(RESIZE_STAMP_BITS?-?1));
}
Integer.numberOfLeadingZeros(n)用于計算n轉換成二進制后前面有幾個0。這個有什么作用呢?
首先ConcurrentHashMap的容量必定是2的冪次方,所以不同的容量n前面0的個數(shù)必然不同,這樣可以保證是在原容量為n的情況下進行擴容。(搜索公眾號Java知音,回復“2021”,送你一份Java面試題寶典)
(1 << (RESIZE_STAMP_BITS - 1)即是1<<15,表示為二進制即是高16位為0,第16位為1:
0000?0000?0000?0000?1000?0000?0000?0000
所以resizeStamp()的返回值(簡稱為rs) 高16位置0,第16位為1,低15位存放當前容量n擴容標識,用于表示是對n的擴容。rs與RESIZE_STAMP_SHIFT配合可以求出新的sizeCtl的值,分情況如下:
sc >= 0表示沒有線程在擴容,使用CAS將sizeCtl的值改為(rs << RESIZE_STAMP_SHIFT) + 2)。sc < 0已經(jīng)有線程在擴容,將sizeCtl+1并調用transfer()讓當前線程參與擴容。
rs即resizeStamp(n),如當前容量為8時sc(sizeCtl)的計算過程如下:
//容量n=8
0000?0000?0000?0000?0000?0000?0000?1000
//Integer.numberOfLeadingZeros(8)=28,二進制表示如下:
0000?0000?0000?0000?0000?0000?0001?1100
//rs
0000?0000?0000?0000?1000?0000?0001?1100
//temp = rs << RESIZE_STAMP_SHIFT,即 temp = rs << 16,左移16后temp最高位為1,所以temp成了一個負數(shù)。
1000?0000?0001?1100?0000?0000?0000?0000
//第一個線程要擴容時,sc?=?(rs?<
1000?0000?0001?1100?0000?0000?0000?0010
那么在擴容時sizeCtl值的意義便如下圖所示:

tryPresize()
private?final?void?tryPresize(int?size)?{
????????//根據(jù)傳入的size計算出真正的新容量,新容量需要是2的冪次方。
????????int?c?=?(size?>=?(MAXIMUM_CAPACITY?>>>?1))???MAXIMUM_CAPACITY?:
????????????tableSizeFor(size?+?(size?>>>?1)?+?1);
????????int?sc;
????????while?((sc?=?sizeCtl)?>=?0)?{
????????????Node[]?tab?=?table;?int?n;
????????????if?(tab?==?null?||?(n?=?tab.length)?==?0)?{
????????????????n?=?(sc?>?c)???sc?:?c;???//table未初始化則給一個初始容量
????????????????//后面相似代碼不再講解
????????????????if?(U.compareAndSwapInt(this,?SIZECTL,?sc,?-1))?{
????????????????????try?{
????????????????????????if?(table?==?tab)?{
????????????????????????????@SuppressWarnings("unchecked")
????????????????????????????Node[]?nt?=?(Node[])new?Node,?>[n];
????????????????????????????table?=?nt;
????????????????????????????sc?=?n?-?(n?>>>?2);
????????????????????????}
????????????????????}?finally?{
????????????????????????sizeCtl?=?sc;
????????????????????}
????????????????}
????????????}
????????????else?if?(c?<=?sc?||?n?>=?MAXIMUM_CAPACITY)
????????????????break;
????????????else?if?(tab?==?table)?{
????????????????int?rs?=?resizeStamp(n);
????????????????if?(sc?0)?{
????????????????????Node[]?nt;
????????????????????if?((sc?>>>?RESIZE_STAMP_SHIFT)?!=?rs?||?sc?==?rs?+?1?||
????????????????????????sc?==?rs?+?MAX_RESIZERS?||?(nt?=?nextTable)?==?null?||
????????????????????????transferIndex?<=?0)
????????????????????????break;
????????????????????if?(U.compareAndSwapInt(this,?SIZECTL,?sc,?sc?+?1))
????????????????????????//傳入指定容量
????????????????????????transfer(tab,?nt);
????????????????}
????????????????else?if?(U.compareAndSwapInt(this,?SIZECTL,?sc,
?????????????????????????????????????????????(rs?<2))
????????????????????transfer(tab,?null);
????????????}
????????}
????}
transfer()
jdk1.8版本的ConcurrentHashMap支持并發(fā)擴容,上面已經(jīng)分析了一小部分,下面這個方法是真正進行并行擴容的地方。
private?final?void?transfer(Node[]?tab,?Node[]?nextTab) ?{
????????int?n?=?tab.length,?stride;
????????if?((stride?=?(NCPU?>?1)???(n?>>>?3)?/?NCPU?:?n)?//每個線程處理桶的最小數(shù)目,可以看出核數(shù)越高步長越小,最小16個。
????????????stride?=?MIN_TRANSFER_STRIDE;?//?subdivide?range
????????if?(nextTab?==?null)?{
????????????try?{
????????????????@SuppressWarnings("unchecked")
????????????????Node[]?nt?=?(Node[])new?Node,?>[n?<1];??//擴容到2倍
????????????????nextTab?=?nt;
????????????}?catch?(Throwable?ex)?{??????//?try?to?cope?with?OOME
????????????????sizeCtl?=?Integer.MAX_VALUE;??//擴容保護
????????????????return;
????????????}
????????????nextTable?=?nextTab;
????????????transferIndex?=?n;??//擴容總進度,>=transferIndex的桶都已分配出去。
????????}
????????int?nextn?=?nextTab.length;
??????????//擴容時的特殊節(jié)點,標明此節(jié)點正在進行遷移,擴容期間的元素查找要調用其find()方法在nextTable中查找元素。
????????ForwardingNode?fwd?=?new?ForwardingNode(nextTab);?
????????//當前線程是否需要繼續(xù)尋找下一個可處理的節(jié)點
????????boolean?advance?=?true;
????????boolean?finishing?=?false;?//所有桶是否都已遷移完成。
????????for?(int?i?=?0,?bound?=?0;;)?{
????????????Node?f;?int?fh;
????????????//此循環(huán)的作用是確定當前線程要遷移的桶的范圍或通過更新i的值確定當前范圍內下一個要處理的節(jié)點。
????????????while?(advance)?{
????????????????int?nextIndex,?nextBound;
????????????????if?(--i?>=?bound?||?finishing)??//每次循環(huán)都檢查結束條件
????????????????????advance?=?false;
????????????????//遷移總進度<=0,表示所有桶都已遷移完成。
????????????????else?if?((nextIndex?=?transferIndex)?<=?0)?{??
????????????????????i?=?-1;
????????????????????advance?=?false;
????????????????}
????????????????else?if?(U.compareAndSwapInt
?????????????????????????(this,?TRANSFERINDEX,?nextIndex,
??????????????????????????nextBound?=?(nextIndex?>?stride??
???????????????????????????????????????nextIndex?-?stride?:?0)))?{??//transferIndex減去已分配出去的桶。
????????????????????//確定當前線程每次分配的待遷移桶的范圍為[bound,?nextIndex)
????????????????????bound?=?nextBound;
????????????????????i?=?nextIndex?-?1;
????????????????????advance?=?false;
????????????????}
????????????}
????????????//當前線程自己的活已經(jīng)做完或所有線程的活都已做完,第二與第三個條件應該是下面讓"i = n"后,再次進入循環(huán)時要做的邊界檢查。
????????????if?(i?0?||?i?>=?n?||?i?+?n?>=?nextn)?{
????????????????int?sc;
????????????????if?(finishing)?{??//所有線程已干完活,最后才走這里。
????????????????????nextTable?=?null;
????????????????????table?=?nextTab;??//替換新table
????????????????????sizeCtl?=?(n?<1)?-?(n?>>>?1);?//調sizeCtl為新容量0.75倍。
????????????????????return;
????????????????}
????????????????//當前線程已結束擴容,sizeCtl-1表示參與擴容線程數(shù)-1。
????????????????if?(U.compareAndSwapInt(this,?SIZECTL,?sc?=?sizeCtl,?sc?-?1))?{
?????????????????//還記得addCount()處給sizeCtl賦的初值嗎?相等時說明沒有線程在參與擴容了,置finishing=advance=true,為保險讓i=n再檢查一次。
????????????????????if?((sc?-?2)?!=?resizeStamp(n)?<????????????????????????return;
????????????????????finishing?=?advance?=?true;
????????????????????i?=?n;?//?recheck?before?commit
????????????????}
????????????}
????????????else?if?((f?=?tabAt(tab,?i))?==?null)
????????????????advance?=?casTabAt(tab,?i,?null,?fwd);??//如果i處是ForwardingNode表示第i個桶已經(jīng)有線程在負責遷移了。
????????????else?if?((fh?=?f.hash)?==?MOVED)
????????????????advance?=?true;?//?already?processed
????????????else?{
????????????????synchronized?(f)?{??//桶內元素遷移需要加鎖。
????????????????????if?(tabAt(tab,?i)?==?f)?{
????????????????????????Node?ln,?hn;
????????????????????????if?(fh?>=?0)?{??//>=0表示是鏈表結點
????????????????????????????//由于n是2的冪次方(所有二進制位中只有一個1),如n=16(0001 0000),第4位為1,那么hash&n后的值第4位只能為0或1。所以可以根據(jù)hash&n的結果將所有結點分為兩部分。
????????????????????????????int?runBit?=?fh?&?n;
????????????????????????????Node?lastRun?=?f;
????????????????????????????//找出最后一段完整的fh&n不變的鏈表,這樣最后這一段鏈表就不用重新創(chuàng)建新結點了。
????????????????????????????for?(Node?p?=?f.next;?p?!=?null;?p?=?p.next)?{
????????????????????????????????int?b?=?p.hash?&?n;
????????????????????????????????if?(b?!=?runBit)?{
????????????????????????????????????runBit?=?b;
????????????????????????????????????lastRun?=?p;
????????????????????????????????}
????????????????????????????}
????????????????????????????if?(runBit?==?0)?{
????????????????????????????????ln?=?lastRun;
????????????????????????????????hn?=?null;
????????????????????????????}
????????????????????????????else?{
????????????????????????????????hn?=?lastRun;
????????????????????????????????ln?=?null;
????????????????????????????}
????????????????????????????//lastRun之前的結點因為fh&n不確定,所以全部需要重新遷移。
????????????????????????????for?(Node?p?=?f;?p?!=?lastRun;?p?=?p.next)?{
????????????????????????????????int?ph?=?p.hash;?K?pk?=?p.key;?V?pv?=?p.val;
????????????????????????????????if?((ph?&?n)?==?0)
????????????????????????????????????ln?=?new?Node(ph,?pk,?pv,?ln);
????????????????????????????????else
????????????????????????????????????hn?=?new?Node(ph,?pk,?pv,?hn);
????????????????????????????}
????????????????????????????//低位鏈表放在i處
????????????????????????????setTabAt(nextTab,?i,?ln);
????????????????????????????//高位鏈表放在i+n處
????????????????????????????setTabAt(nextTab,?i?+?n,?hn);
????????????????????????????setTabAt(tab,?i,?fwd);??//在原table中設置ForwardingNode節(jié)點以提示該桶擴容完成。
????????????????????????????advance?=?true;
????????????????????????}
????????????????????????else?if?(f?instanceof?TreeBin)?{?//紅黑樹處理。
????????????????????????????...
helpTransfer()
添加、刪除節(jié)點之處都會檢測到table的第i個桶是ForwardingNode的話會調用helpTransfer()方法。
final?Node[]?helpTransfer(Node[]?tab,?Node?f)?{
????????Node[]?nextTab;?int?sc;
????????if?(tab?!=?null?&&?(f?instanceof?ForwardingNode)?&&
????????????(nextTab?=?((ForwardingNode)f).nextTable)?!=?null)?{
????????????int?rs?=?resizeStamp(tab.length);
????????????while?(nextTab?==?nextTable?&&?table?==?tab?&&
???????????????????(sc?=?sizeCtl)?0)?{
????????????????if?((sc?>>>?RESIZE_STAMP_SHIFT)?!=?rs?||?sc?==?rs?+?1?||
????????????????????sc?==?rs?+?MAX_RESIZERS?||?transferIndex?<=?0)
????????????????????break;
????????????????if?(U.compareAndSwapInt(this,?SIZECTL,?sc,?sc?+?1))?{
????????????????????transfer(tab,?nextTab);
????????????????????break;
????????????????}
????????????}
????????????return?nextTab;
????????}
????????return?table;
????}
并發(fā)擴容總結
- 單線程新建nextTable,新容量一般為原table容量的兩倍。
- 每個線程想增/刪元素時,如果訪問的桶是ForwardingNode節(jié)點,則表明當前正處于擴容狀態(tài),協(xié)助一起擴容完成后再完成相應的數(shù)據(jù)更改操作。
- 擴容時將原table的所有桶倒序分配,每個線程每次最小分配16個桶,防止資源競爭導致的效率下降。單個桶內元素的遷移是加鎖的,但桶范圍處理分配可以多線程,在沒有遷移完成所有桶之前每個線程需要重復獲取遷移桶范圍,直至所有桶遷移完成。
- 一個舊桶內的數(shù)據(jù)遷移完成但不是所有桶都遷移完成時,查詢數(shù)據(jù)委托給ForwardingNode結點查詢nextTable完成(這個后面看find()分析)。
- 遷移過程中sizeCtl用于記錄參與擴容線程的數(shù)量,全部遷移完成后sizeCtl更新為新table容量的0.75倍。
擴容節(jié)結束!其它常用操作再說下。(搜索公眾號Java知音,回復“2021”,送你一份Java面試題寶典)
get()
public?V?get(Object?key)?{
????????Node[]?tab;?Node?e,?p;?int?n,?eh;?K?ek;
????????int?h?=?spread(key.hashCode());
????????if?((tab?=?table)?!=?null?&&?(n?=?tab.length)?>?0?&&
????????????(e?=?tabAt(tab,?(n?-?1)?&?h))?!=?null)?{
????????????if?((eh?=?e.hash)?==?h)?{??//總是檢查頭結點
????????????????if?((ek?=?e.key)?==?key?||?(ek?!=?null?&&?key.equals(ek)))
????????????????????return?e.val;
????????????}
????????????else?if?(eh?0)??//在遷移或都是TreeBin
????????????????return?(p?=?e.find(h,?key))?!=?null???p.val?:?null;
????????????while?((e?=?e.next)?!=?null)?{??//鏈表則直接遍歷查詢
????????????????if?(e.hash?==?h?&&
????????????????????((ek?=?e.key)?==?key?||?(ek?!=?null?&&?key.equals(ek))))
????????????????????return?e.val;
????????????}
????????}
????????return?null;
????}
remove()
public?V?remove(Object?key)?{
????return?replaceNode(key,?null,?null);
}
final?V?replaceNode(Object?key,?V?value,?Object?cv)?{
????int?hash?=?spread(key.hashCode());
????for?(Node[]?tab?=?table;;)?{
????????Node?f;?int?n,?i,?fh;
????????if?(tab?==?null?||?(n?=?tab.length)?==?0?||
????????????(f?=?tabAt(tab,?i?=?(n?-?1)?&?hash))?==?null)
????????????break;
????????else?if?((fh?=?f.hash)?==?MOVED)??//刪除時也需要確實擴容完成后才可以操作。
????????????tab?=?helpTransfer(tab,?f);
????????else?{
????????????V?oldVal?=?null;
????????????boolean?validated?=?false;
????????????synchronized?(f)?{
????????????????if?(tabAt(tab,?i)?==?f)?{
????????????????????if?(fh?>=?0)?{
????????????????????????validated?=?true;
????????????????????????for?(Node?e?=?f,?pred?=?null;;)?{
????????????????????????????K?ek;
????????????????????????????if?(e.hash?==?hash?&&
????????????????????????????????((ek?=?e.key)?==?key?||
?????????????????????????????????(ek?!=?null?&&?key.equals(ek))))?{
????????????????????????????????V?ev?=?e.val;
????????????????????????????????if?(cv?==?null?||?cv?==?ev?||
????????????????????????????????????(ev?!=?null?&&?cv.equals(ev)))?{??//cv不為null則替換,否則是刪除。
????????????????????????????????????oldVal?=?ev;
????????????????????????????????????if?(value?!=?null)
????????????????????????????????????????e.val?=?value;
????????????????????????????????????else?if?(pred?!=?null)
????????????????????????????????????????pred.next?=?e.next;
????????????????????????????????????else
????????????????????????????????????????//沒前置節(jié)點就是頭節(jié)點
????????????????????????????????????????setTabAt(tab,?i,?e.next);
????????????????????????????????}
????????????????????????????????break;
????????????????????????????}
????????????????????????????pred?=?e;
????????????????????????????if?((e?=?e.next)?==?null)
????????????????????????????????break;
????????????????????????}
????????????????????}
????????????????????else?if?(f?instanceof?TreeBin)?{
????????????????????????//...
????????????????????}
????????????????}
????????????}
????????????if?(validated)?{
????????????????if?(oldVal?!=?null)?{
????????????????????if?(value?==?null)
????????????????????????addCount(-1L,?-1);
????????????????????return?oldVal;
????????????????}
????????????????break;
????????????}
????????}
????}
????return?null;
}
主要改進
CAS無鎖算法與synchronized保證并發(fā)安全,支持并發(fā)擴容,數(shù)據(jù)結構變更為數(shù)組+鏈表+紅黑樹,提高性能。
jdk1.8版棄用變量
Segment只有序列化時會用到。loadFactor僅用于構造函數(shù)中設定初始容量,已不能影響擴容閾值,jdk1.8版本中閾值計算基本恒定為0.75。concurrencyLevel只能影響初始容量,table容量大小與它無關。
ava JDK升級到1.8后有些集合類的實現(xiàn)有了變化,其中ConcurrentHashMap就有進行結構上的大調整。jdk1.6、1.7實現(xiàn)的共同點主要是通過采用分段鎖Segment減少熱點域來提高并發(fā)效率,1.8版本的實現(xiàn)有哪些變化呢?
重要概念
在正式研究前,我們需要先知道幾個重要參數(shù),提前說明其值所代表的意義以便更好的講解源碼實現(xiàn)。
table
所有數(shù)據(jù)都存在table中,table的容量會根據(jù)實際情況進行擴容,table[i]存放的數(shù)據(jù)類型有以下3種:
- TreeBin 用于包裝紅黑樹結構的結點類型
- ForwardingNode 擴容時存放的結點類型,并發(fā)擴容的實現(xiàn)關鍵之一
- Node 普通結點類型,表示鏈表頭結點
nextTable
擴容時用于存放數(shù)據(jù)的變量,擴容完成后會置為null。
sizeCtl
以volatile修飾的sizeCtl用于數(shù)組初始化與擴容控制,它有以下幾個值:
當前未初始化:
?=?0??//未指定初始容量
?>?0??//由指定的初始容量計算而來,再找最近的2的冪次方。
??//比如傳入6,計算公式為6+6/2+1=10,最近的2的冪次方為16,所以sizeCtl就為16。
初始化中:
?=?-1?//table正在初始化
?=?-N?//N是int類型,分為兩部分,高15位是指定容量標識,低16位表示
??????//并行擴容線程數(shù)+1,具體在resizeStamp函數(shù)介紹。
初始化完成:
?=table.length?*?0.75??//擴容閾值調為table容量大小的0.75倍
其它的分析相應源碼時再細說。
put()
public?V?put(K?key,?V?value)?{
????return?putVal(key,?value,?false);
}
final?V?putVal(K?key,?V?value,?boolean?onlyIfAbsent)?{
????if?(key?==?null?||?value?==?null)?throw?new?NullPointerException();
????int?hash?=?spread(key.hashCode());???//@1,講解見下面小標題。
?????//i處結點數(shù)量,2: TreeBin或鏈表結點數(shù), 其它:鏈表結點數(shù)。主要用于每次加入結點后查看是否要由鏈表轉為紅黑樹
????int?binCount?=?0;?
????for?(Node[]?tab?=?table;;)?{???//CAS經(jīng)典寫法,不成功無限重試,讓再次進行循環(huán)進行相應操作。
????????Node?f;?int?n,?i,?fh;
????????//除非構造時指定初始化集合,否則默認構造不初始化table,所以需要在添加時元素檢查是否需要初始化。
????????if?(tab?==?null?||?(n?=?tab.length)?==?0)
????????????tab?=?initTable();??//@2
????????//CAS操作得到對應table中元素
????????else?if?((f?=?tabAt(tab,?i?=?(n?-?1)?&?hash))?==?null)?{?//@3
????????????if?(casTabAt(tab,?i,?null,
?????????????????????????new?Node(hash,?key,?value,?null)))
????????????????break;???????????????????//null創(chuàng)建Node對象做為鏈表首結點
????????}
????????else?if?((fh?=?f.hash)?==?MOVED)??//當前結點正在擴容
?????????//讓當前線程調用helpTransfer也參與到擴容過程中來,擴容完畢后tab指向新table。
????????????tab?=?helpTransfer(tab,?f);?
????????else?{
????????????V?oldVal?=?null;
????????????synchronized?(f)?{
????????????????if?(tabAt(tab,?i)?==?f)?{??//雙重檢查i處結點未變化
????????????????????if?(fh?>=?0)?{??//表明是鏈表結點類型,hash值是大于0的,即spread()方法計算而來
????????????????????????binCount?=?1;
????????????????????????for?(Node?e?=?f;;?++binCount)?{
????????????????????????????K?ek;
????????????????????????????if?(e.hash?==?hash?&&
????????????????????????????????((ek?=?e.key)?==?key?||
?????????????????????????????????(ek?!=?null?&&?key.equals(ek))))?{
????????????????????????????????oldVal?=?e.val;
????????????????????????????????//onlyIfAbsent表示是新元素才加入,舊值不替換,默認為fase。
????????????????????????????????if?(!onlyIfAbsent)??
????????????????????????????????????e.val?=?value;
????????????????????????????????break;
????????????????????????????}
????????????????????????????Node?pred?=?e;
????????????????????????????if?((e?=?e.next)?==?null)?{
????????????????????????????????//jdk1.8版本是把新結點加入鏈表尾部,next由volatile修飾
????????????????????????????????pred.next?=?new?Node(hash,?key,
??????????????????????????????????????????????????????????value,?null);
????????????????????????????????break;
????????????????????????????}
????????????????????????}
????????????????????}
????????????????????else?if?(f?instanceof?TreeBin)?{??//紅黑樹結點類型
????????????????????????Node?p;
????????????????????????binCount?=?2;
????????????????????????if?((p?=?((TreeBin)f).putTreeVal(hash,?key,
???????????????????????????????????????????????????????value))?!=?null)?{???//@4
????????????????????????????oldVal?=?p.val;
????????????????????????????if?(!onlyIfAbsent)
????????????????????????????????p.val?=?value;
????????????????????????}
????????????????????}
????????????????}
????????????}
????????????if?(binCount?!=?0)?{
????????????????if?(binCount?>=?TREEIFY_THRESHOLD)??//默認桶中結點數(shù)超過8個數(shù)據(jù)結構會轉為紅黑樹
????????????????????treeifyBin(tab,?i);???//@5
????????????????if?(oldVal?!=?null)
????????????????????return?oldVal;
????????????????break;
????????????}
????????}
????}
????addCount(1L,?binCount);??//更新size,檢測擴容
????return?null;
}
注釋已說的比較明白,上面的代碼中的數(shù)字注釋再單獨細說下:
spread()
jdk1.8的hash策略,與以往版本一樣都是為了減少hash沖突:
static?final?int?HASH_BITS?=?0x7fffffff;?//?usable?bits?of?normal?node?hash???//01111111_11111111_11111111_11111111
static?final?int?spread(int?h)?{
????//無符號右移加入高位影響,與HASH_BITS做與操作保留對hash有用的比特位,有讓hash>0的意思
????return?(h?^?(h?>>>?16))?&?HASH_BITS;
}
initTable()
initTable()用于里面table數(shù)組的初始化,值得一提的是table初始化是沒有加鎖的,那么如何處理并發(fā)呢?
由下面代碼可以看到,當要初始化時會通過CAS操作將sizeCtl置為-1,而sizeCtl由volatile修飾,保證修改對后面線程可見。
這之后如果再有線程執(zhí)行到此方法時檢測到sizeCtl為負數(shù),說明已經(jīng)有線程在給擴容了,這個線程就會調用Thread.yield()讓出一次CPU執(zhí)行時間。
private?final?Node[]?initTable()?{
????????Node[]?tab;?int?sc;
????????while?((tab?=?table)?==?null?||?tab.length?==?0)?{
????????????if?((sc?=?sizeCtl)?0)
????????????????Thread.yield();?
????????????????//正在初始化時將sizeCtl設為-1
????????????else?if?(U.compareAndSwapInt(this,?SIZECTL,?sc,?-1))?{
????????????????try?{
????????????????????if?((tab?=?table)?==?null?||?tab.length?==?0)?{
????????????????????????int?n?=?(sc?>?0)???sc?:?DEFAULT_CAPACITY;??//DEFAULT_CAPACITY為16
????????????????????????@SuppressWarnings("unchecked")
????????????????????????Node[]?nt?=?(Node[])new?Node,?>[n];
????????????????????????table?=?tab?=?nt;
????????????????????????sc?=?n?-?(n?>>>?2);???//擴容閾值為新容量的0.75倍
????????????????????}
????????????????}?finally?{
????????????????????sizeCtl?=?sc;???//擴容保護
????????????????}
????????????????break;
????????????}
????????}
????????return?tab;
????}
tabAt()/casTabAt()/setTabAt()
ABASE表示table中首個元素的內存偏移地址,所以(long)i << ASHIFT) + ABASE得到table[i]的內存偏移地址:
static?final??Node?tabAt(Node[]?tab,?int?i) ? {
????return?(Node)U.getObjectVolatile(tab,?((long)i?<}
static?final??boolean?casTabAt(Node[]?tab,?int?i,
????????????????????????????????????Node?c,?Node?v) ?{
????return?U.compareAndSwapObject(tab,?((long)i?<}
static?final??void?setTabAt(Node[]?tab,?int?i,?Node?v) ?{
????U.putObjectVolatile(tab,?((long)i?<}
對i位置結點的寫操作有兩個方法,casTabAt()與setTabAt()。源碼中有這樣一段注釋:
*?Note?that?calls?to?setTabAt?always?occur?within?locked?regions,
*?and?so?in?principle?require?only?release?ordering,?not
*?full?volatile?semantics,?but?are?currently?coded?as?volatile
*?writes?to?be?conservative.
所以要原子語義的寫操作需要使用casTabAt(),setTabAt()是在鎖定桶的狀態(tài)下才會被調用,之所以實現(xiàn)成這樣只是帶保守性的一種寫法而已。放松一下繼續(xù)~

TreeBin
注釋4、5都是有關TreeBin的操作,為進一步提升性能,ConcurrentHashMap引入了紅黑樹。引入紅黑樹是因為鏈表查詢的時間復雜度為O(n),紅黑樹查詢的時間復雜度為O(log(n)),所以在結點比較多的情況下使用紅黑樹可以大大提升性能。
紅黑樹是一種自平衡二叉查找樹,有如下性質:
- 每個節(jié)點要么是紅色,要么是黑色。
- 根節(jié)點永遠是黑色的。
- 所有的葉節(jié)點都是空節(jié)點(即 null),并且是黑色的。
- 每個紅色節(jié)點的兩個子節(jié)點都是黑色。(從每個葉子到- 根的路徑上不會有兩個連續(xù)的紅色節(jié)點)
- 從任一節(jié)點到其每個葉子的所有簡單路徑都包含相同數(shù)目的黑色節(jié)點。
圖例:

treeifyBin()
桶內元素超時8個時會調用到此方法。
private?final?void?treeifyBin(Node[]?tab,?int?index) ?{
????????Node?b;?int?n,?sc;
????????if?(tab?!=?null)?{
????????????if?((n?=?tab.length)?????????????????tryPresize(n?<1);??//如果數(shù)組整體容量太小則去擴容,放棄轉紅黑樹結構
????????????else?if?((b?=?tabAt(tab,?index))?!=?null?&&?b.hash?>=?0)?{
????????????????synchronized?(b)?{
????????????????????if?(tabAt(tab,?index)?==?b)?{
????????????????????????TreeNode?hd?=?null,?tl?=?null;
????????????????????????for?(Node?e?=?b;?e?!=?null;?e?=?e.next)?{
????????????????????????????TreeNode?p?=
????????????????????????????????new?TreeNode(e.hash,?e.key,?e.val,
??????????????????????????????????????????????????null,?null);
????????????????????????????if?((p.prev?=?tl)?==?null)
????????????????????????????????hd?=?p;
????????????????????????????else
????????????????????????????????tl.next?=?p;
????????????????????????????tl?=?p;
????????????????????????}
????????????????????????setTabAt(tab,?index,?new?TreeBin(hd));
可以看出按原Node鏈表順序轉為了TreeNode鏈表,每個TreeNode的prev、next已完備,傳入頭結點hd構造紅黑樹。
TreeBin構造函數(shù)
TreeBin(TreeNode?b)?{
???//Node(int?hash,?K?key,?V?val,?Node?next)
????????????super(TREEBIN,?null,?null,?null);
????????????this.first?=?b;
????????????TreeNode?r?=?null;
????????????for?(TreeNode?x?=?b,?next;?x?!=?null;?x?=?next)?{??//依次處理每個結點
????????????????next?=?(TreeNode)x.next;
????????????????x.left?=?x.right?=?null;
????????????????if?(r?==?null)?{
????????????????????x.parent?=?null;
????????????????????x.red?=?false;??//根結點為黑色
????????????????????r?=?x;
????????????????}
????????????????else?{
????????????????????K?k?=?x.key;
????????????????????int?h?=?x.hash;
????????????????????Class>?kc?=?null;
????????????????????for?(TreeNode?p?=?r;;)?{?//遍歷查找新結點存放位置
????????????????????????int?dir,?ph;
????????????????????????K?pk?=?p.key;
????????????????????????if?((ph?=?p.hash)?>?h)
????????????????????????????dir?=?-1;
????????????????????????else?if?(ph?????????????????????????????dir?=?1;
????????????????????????//key有實現(xiàn)Comparable接口則使用compareTo()進行比較,否則采用tieBreakOrder中默認的比較方式,即比較hashCode。
????????????????????????else?if?((kc?==?null?&&
??????????????????????????????????(kc?=?comparableClassFor(k))?==?null)?||
?????????????????????????????????(dir?=?compareComparables(kc,?k,?pk))?==?0)
????????????????????????????dir?=?tieBreakOrder(k,?pk);
????????????????????????????TreeNode?xp?=?p;
????????????????????????if?((p?=?(dir?<=?0)???p.left?:?p.right)?==?null)?{??//左子節(jié)點或右子節(jié)點為空則在p下添加新結點,否則p的值更新為子節(jié)點繼續(xù)查找。紅黑樹中結點p.left <= p <= p.right
????????????????????????????x.parent?=?xp;??//保存新結點的父結點
????????????????????????????if?(dir?<=?0)
????????????????????????????????xp.left?=?x;?//排序小的放左邊
????????????????????????????else
????????????????????????????????xp.right?=?x;??//排序大的放右邊
????????????????????????????r?=?balanceInsertion(r,?x);??//平衡紅黑樹
????????????????????????????break;
...
????????????this.root?=?r;
...
????????}
putTreeVal()與此方法遍歷方式類似不再介紹。
擴容實現(xiàn)
寫這篇文章主要就是想講講擴容,Let’s go!
什么時候會擴容?
使用put()添加元素時會調用addCount(),內部檢查sizeCtl看是否需要擴容。
tryPresize()被調用,此方法被調用有兩個調用點:
- 鏈表轉紅黑樹(put()時檢查)時如果table容量小于64(MIN_TREEIFY_CAPACITY),則會觸發(fā)擴容。
- 調用putAll()之類一次性加入大量元素,會觸發(fā)擴容。
addCount()
addCount()與tryPresize()實現(xiàn)很相似,我們先以addCount()分析下擴容邏輯:
private?final?void?addCount(long?x,?int?check)?{
????...
????//check就是結點數(shù)量,有新元素加入成功才檢查是否要擴容。
????if?(check?>=?0)?{
????????Node[]?tab,?nt;?int?n,?sc;
????????//s表示加入新元素后容量大小,計算已省略。
????????//新容量大于當前擴容閾值并且小于最大擴容值才擴容,如果tab=null說明正在初始化,死循環(huán)等待初始化完成。
????????while?(s?>=?(long)(sc?=?sizeCtl)?&&?(tab?=?table)?!=?null?&&
???????????????(n?=?tab.length)?????????????int?rs?=?resizeStamp(n);??//@1
????????????//sc<0表示已經(jīng)有線程在進行擴容工作
????????????if?(sc?0)?{
????????????????//條件1:檢查是對容量n的擴容,保證sizeCtl與n是一塊修改好的
????????????????//條件2與條件3:應該是進行sc的最小值或最大值判斷。
????????????????//條件4與條件5:?確保tranfer()中的nextTable相關初始化邏輯已走完。
????????????????if?((sc?>>>?RESIZE_STAMP_SHIFT)?!=?rs?||?sc?==?rs?+?1?||
????????????????????sc?==?rs?+?MAX_RESIZERS?||?(nt?=?nextTable)?==?null?||
????????????????????transferIndex?<=?0)
????????????????????break;
????????????????if?(U.compareAndSwapInt(this,?SIZECTL,?sc,?sc?+?1))??//有新線程參與擴容則sizeCtl加1
????????????????????transfer(tab,?nt);
????????????}
????????????//沒有線程在進行擴容,將sizeCtl的值改為(rs << RESIZE_STAMP_SHIFT)?+ 2),原因見下面sizeCtl值的計算分析。
????????????else?if?(U.compareAndSwapInt(this,?SIZECTL,?sc,
?????????????????????????????????????????(rs?<2))
????????????????transfer(tab,?null);
????????????s?=?sumCount();
????????}
????}
}
resizeStamp()
在上面的代碼中首先有調用到這樣的一個方法。
/**
?*?The?number?of?bits?used?for?generation?stamp?in?sizeCtl.
?*?Must?be?at?least?6?for?32bit?arrays.
*/
private?static?int?RESIZE_STAMP_BITS?=?16;
/**
?*?The?bit?shift?for?recording?size?stamp?in?sizeCtl.
?*/
private?static?final?int?RESIZE_STAMP_SHIFT?=?32?-?RESIZE_STAMP_BITS;
static?final?int?resizeStamp(int?n)?{
????????return?Integer.numberOfLeadingZeros(n)?|?(1?<(RESIZE_STAMP_BITS?-?1));
}
Integer.numberOfLeadingZeros(n)用于計算n轉換成二進制后前面有幾個0。這個有什么作用呢?
首先ConcurrentHashMap的容量必定是2的冪次方,所以不同的容量n前面0的個數(shù)必然不同,這樣可以保證是在原容量為n的情況下進行擴容。(搜索公眾號Java知音,回復“2021”,送你一份Java面試題寶典)
(1 << (RESIZE_STAMP_BITS - 1)即是1<<15,表示為二進制即是高16位為0,第16位為1:
0000?0000?0000?0000?1000?0000?0000?0000
所以resizeStamp()的返回值(簡稱為rs) 高16位置0,第16位為1,低15位存放當前容量n擴容標識,用于表示是對n的擴容。rs與RESIZE_STAMP_SHIFT配合可以求出新的sizeCtl的值,分情況如下:
sc >= 0表示沒有線程在擴容,使用CAS將sizeCtl的值改為(rs << RESIZE_STAMP_SHIFT) + 2)。sc < 0已經(jīng)有線程在擴容,將sizeCtl+1并調用transfer()讓當前線程參與擴容。
rs即resizeStamp(n),如當前容量為8時sc(sizeCtl)的計算過程如下:
//容量n=8
0000?0000?0000?0000?0000?0000?0000?1000
//Integer.numberOfLeadingZeros(8)=28,二進制表示如下:
0000?0000?0000?0000?0000?0000?0001?1100
//rs
0000?0000?0000?0000?1000?0000?0001?1100
//temp = rs << RESIZE_STAMP_SHIFT,即 temp = rs << 16,左移16后temp最高位為1,所以temp成了一個負數(shù)。
1000?0000?0001?1100?0000?0000?0000?0000
//第一個線程要擴容時,sc?=?(rs?<
1000?0000?0001?1100?0000?0000?0000?0010
那么在擴容時sizeCtl值的意義便如下圖所示:

tryPresize()
private?final?void?tryPresize(int?size)?{
????????//根據(jù)傳入的size計算出真正的新容量,新容量需要是2的冪次方。
????????int?c?=?(size?>=?(MAXIMUM_CAPACITY?>>>?1))???MAXIMUM_CAPACITY?:
????????????tableSizeFor(size?+?(size?>>>?1)?+?1);
????????int?sc;
????????while?((sc?=?sizeCtl)?>=?0)?{
????????????Node[]?tab?=?table;?int?n;
????????????if?(tab?==?null?||?(n?=?tab.length)?==?0)?{
????????????????n?=?(sc?>?c)???sc?:?c;???//table未初始化則給一個初始容量
????????????????//后面相似代碼不再講解
????????????????if?(U.compareAndSwapInt(this,?SIZECTL,?sc,?-1))?{
????????????????????try?{
????????????????????????if?(table?==?tab)?{
????????????????????????????@SuppressWarnings("unchecked")
????????????????????????????Node[]?nt?=?(Node[])new?Node,?>[n];
????????????????????????????table?=?nt;
????????????????????????????sc?=?n?-?(n?>>>?2);
????????????????????????}
????????????????????}?finally?{
????????????????????????sizeCtl?=?sc;
????????????????????}
????????????????}
????????????}
????????????else?if?(c?<=?sc?||?n?>=?MAXIMUM_CAPACITY)
????????????????break;
????????????else?if?(tab?==?table)?{
????????????????int?rs?=?resizeStamp(n);
????????????????if?(sc?0)?{
????????????????????Node[]?nt;
????????????????????if?((sc?>>>?RESIZE_STAMP_SHIFT)?!=?rs?||?sc?==?rs?+?1?||
????????????????????????sc?==?rs?+?MAX_RESIZERS?||?(nt?=?nextTable)?==?null?||
????????????????????????transferIndex?<=?0)
????????????????????????break;
????????????????????if?(U.compareAndSwapInt(this,?SIZECTL,?sc,?sc?+?1))
????????????????????????//傳入指定容量
????????????????????????transfer(tab,?nt);
????????????????}
????????????????else?if?(U.compareAndSwapInt(this,?SIZECTL,?sc,
?????????????????????????????????????????????(rs?<2))
????????????????????transfer(tab,?null);
????????????}
????????}
????}
transfer()
jdk1.8版本的ConcurrentHashMap支持并發(fā)擴容,上面已經(jīng)分析了一小部分,下面這個方法是真正進行并行擴容的地方。
private?final?void?transfer(Node[]?tab,?Node[]?nextTab) ?{
????????int?n?=?tab.length,?stride;
????????if?((stride?=?(NCPU?>?1)???(n?>>>?3)?/?NCPU?:?n)?//每個線程處理桶的最小數(shù)目,可以看出核數(shù)越高步長越小,最小16個。
????????????stride?=?MIN_TRANSFER_STRIDE;?//?subdivide?range
????????if?(nextTab?==?null)?{
????????????try?{
????????????????@SuppressWarnings("unchecked")
????????????????Node[]?nt?=?(Node[])new?Node,?>[n?<1];??//擴容到2倍
????????????????nextTab?=?nt;
????????????}?catch?(Throwable?ex)?{??????//?try?to?cope?with?OOME
????????????????sizeCtl?=?Integer.MAX_VALUE;??//擴容保護
????????????????return;
????????????}
????????????nextTable?=?nextTab;
????????????transferIndex?=?n;??//擴容總進度,>=transferIndex的桶都已分配出去。
????????}
????????int?nextn?=?nextTab.length;
??????????//擴容時的特殊節(jié)點,標明此節(jié)點正在進行遷移,擴容期間的元素查找要調用其find()方法在nextTable中查找元素。
????????ForwardingNode?fwd?=?new?ForwardingNode(nextTab);?
????????//當前線程是否需要繼續(xù)尋找下一個可處理的節(jié)點
????????boolean?advance?=?true;
????????boolean?finishing?=?false;?//所有桶是否都已遷移完成。
????????for?(int?i?=?0,?bound?=?0;;)?{
????????????Node?f;?int?fh;
????????????//此循環(huán)的作用是確定當前線程要遷移的桶的范圍或通過更新i的值確定當前范圍內下一個要處理的節(jié)點。
????????????while?(advance)?{
????????????????int?nextIndex,?nextBound;
????????????????if?(--i?>=?bound?||?finishing)??//每次循環(huán)都檢查結束條件
????????????????????advance?=?false;
????????????????//遷移總進度<=0,表示所有桶都已遷移完成。
????????????????else?if?((nextIndex?=?transferIndex)?<=?0)?{??
????????????????????i?=?-1;
????????????????????advance?=?false;
????????????????}
????????????????else?if?(U.compareAndSwapInt
?????????????????????????(this,?TRANSFERINDEX,?nextIndex,
??????????????????????????nextBound?=?(nextIndex?>?stride??
???????????????????????????????????????nextIndex?-?stride?:?0)))?{??//transferIndex減去已分配出去的桶。
????????????????????//確定當前線程每次分配的待遷移桶的范圍為[bound,?nextIndex)
????????????????????bound?=?nextBound;
????????????????????i?=?nextIndex?-?1;
????????????????????advance?=?false;
????????????????}
????????????}
????????????//當前線程自己的活已經(jīng)做完或所有線程的活都已做完,第二與第三個條件應該是下面讓"i = n"后,再次進入循環(huán)時要做的邊界檢查。
????????????if?(i?0?||?i?>=?n?||?i?+?n?>=?nextn)?{
????????????????int?sc;
????????????????if?(finishing)?{??//所有線程已干完活,最后才走這里。
????????????????????nextTable?=?null;
????????????????????table?=?nextTab;??//替換新table
????????????????????sizeCtl?=?(n?<1)?-?(n?>>>?1);?//調sizeCtl為新容量0.75倍。
????????????????????return;
????????????????}
????????????????//當前線程已結束擴容,sizeCtl-1表示參與擴容線程數(shù)-1。
????????????????if?(U.compareAndSwapInt(this,?SIZECTL,?sc?=?sizeCtl,?sc?-?1))?{
?????????????????//還記得addCount()處給sizeCtl賦的初值嗎?相等時說明沒有線程在參與擴容了,置finishing=advance=true,為保險讓i=n再檢查一次。
????????????????????if?((sc?-?2)?!=?resizeStamp(n)?<????????????????????????return;
????????????????????finishing?=?advance?=?true;
????????????????????i?=?n;?//?recheck?before?commit
????????????????}
????????????}
????????????else?if?((f?=?tabAt(tab,?i))?==?null)
????????????????advance?=?casTabAt(tab,?i,?null,?fwd);??//如果i處是ForwardingNode表示第i個桶已經(jīng)有線程在負責遷移了。
????????????else?if?((fh?=?f.hash)?==?MOVED)
????????????????advance?=?true;?//?already?processed
????????????else?{
????????????????synchronized?(f)?{??//桶內元素遷移需要加鎖。
????????????????????if?(tabAt(tab,?i)?==?f)?{
????????????????????????Node?ln,?hn;
????????????????????????if?(fh?>=?0)?{??//>=0表示是鏈表結點
????????????????????????????//由于n是2的冪次方(所有二進制位中只有一個1),如n=16(0001 0000),第4位為1,那么hash&n后的值第4位只能為0或1。所以可以根據(jù)hash&n的結果將所有結點分為兩部分。
????????????????????????????int?runBit?=?fh?&?n;
????????????????????????????Node?lastRun?=?f;
????????????????????????????//找出最后一段完整的fh&n不變的鏈表,這樣最后這一段鏈表就不用重新創(chuàng)建新結點了。
????????????????????????????for?(Node?p?=?f.next;?p?!=?null;?p?=?p.next)?{
????????????????????????????????int?b?=?p.hash?&?n;
????????????????????????????????if?(b?!=?runBit)?{
????????????????????????????????????runBit?=?b;
????????????????????????????????????lastRun?=?p;
????????????????????????????????}
????????????????????????????}
????????????????????????????if?(runBit?==?0)?{
????????????????????????????????ln?=?lastRun;
????????????????????????????????hn?=?null;
????????????????????????????}
????????????????????????????else?{
????????????????????????????????hn?=?lastRun;
????????????????????????????????ln?=?null;
????????????????????????????}
????????????????????????????//lastRun之前的結點因為fh&n不確定,所以全部需要重新遷移。
????????????????????????????for?(Node?p?=?f;?p?!=?lastRun;?p?=?p.next)?{
????????????????????????????????int?ph?=?p.hash;?K?pk?=?p.key;?V?pv?=?p.val;
????????????????????????????????if?((ph?&?n)?==?0)
????????????????????????????????????ln?=?new?Node(ph,?pk,?pv,?ln);
????????????????????????????????else
????????????????????????????????????hn?=?new?Node(ph,?pk,?pv,?hn);
????????????????????????????}
????????????????????????????//低位鏈表放在i處
????????????????????????????setTabAt(nextTab,?i,?ln);
????????????????????????????//高位鏈表放在i+n處
????????????????????????????setTabAt(nextTab,?i?+?n,?hn);
????????????????????????????setTabAt(tab,?i,?fwd);??//在原table中設置ForwardingNode節(jié)點以提示該桶擴容完成。
????????????????????????????advance?=?true;
????????????????????????}
????????????????????????else?if?(f?instanceof?TreeBin)?{?//紅黑樹處理。
????????????????????????????...
helpTransfer()
添加、刪除節(jié)點之處都會檢測到table的第i個桶是ForwardingNode的話會調用helpTransfer()方法。
final?Node[]?helpTransfer(Node[]?tab,?Node?f)?{
????????Node[]?nextTab;?int?sc;
????????if?(tab?!=?null?&&?(f?instanceof?ForwardingNode)?&&
????????????(nextTab?=?((ForwardingNode)f).nextTable)?!=?null)?{
????????????int?rs?=?resizeStamp(tab.length);
????????????while?(nextTab?==?nextTable?&&?table?==?tab?&&
???????????????????(sc?=?sizeCtl)?0)?{
????????????????if?((sc?>>>?RESIZE_STAMP_SHIFT)?!=?rs?||?sc?==?rs?+?1?||
????????????????????sc?==?rs?+?MAX_RESIZERS?||?transferIndex?<=?0)
????????????????????break;
????????????????if?(U.compareAndSwapInt(this,?SIZECTL,?sc,?sc?+?1))?{
????????????????????transfer(tab,?nextTab);
????????????????????break;
????????????????}
????????????}
????????????return?nextTab;
????????}
????????return?table;
????}
并發(fā)擴容總結
- 單線程新建nextTable,新容量一般為原table容量的兩倍。
- 每個線程想增/刪元素時,如果訪問的桶是ForwardingNode節(jié)點,則表明當前正處于擴容狀態(tài),協(xié)助一起擴容完成后再完成相應的數(shù)據(jù)更改操作。
- 擴容時將原table的所有桶倒序分配,每個線程每次最小分配16個桶,防止資源競爭導致的效率下降。單個桶內元素的遷移是加鎖的,但桶范圍處理分配可以多線程,在沒有遷移完成所有桶之前每個線程需要重復獲取遷移桶范圍,直至所有桶遷移完成。
- 一個舊桶內的數(shù)據(jù)遷移完成但不是所有桶都遷移完成時,查詢數(shù)據(jù)委托給ForwardingNode結點查詢nextTable完成(這個后面看find()分析)。
- 遷移過程中sizeCtl用于記錄參與擴容線程的數(shù)量,全部遷移完成后sizeCtl更新為新table容量的0.75倍。
擴容節(jié)結束!其它常用操作再說下。(搜索公眾號Java知音,回復“2021”,送你一份Java面試題寶典)
get()
public?V?get(Object?key)?{
????????Node[]?tab;?Node?e,?p;?int?n,?eh;?K?ek;
????????int?h?=?spread(key.hashCode());
????????if?((tab?=?table)?!=?null?&&?(n?=?tab.length)?>?0?&&
????????????(e?=?tabAt(tab,?(n?-?1)?&?h))?!=?null)?{
????????????if?((eh?=?e.hash)?==?h)?{??//總是檢查頭結點
????????????????if?((ek?=?e.key)?==?key?||?(ek?!=?null?&&?key.equals(ek)))
????????????????????return?e.val;
????????????}
????????????else?if?(eh?0)??//在遷移或都是TreeBin
????????????????return?(p?=?e.find(h,?key))?!=?null???p.val?:?null;
????????????while?((e?=?e.next)?!=?null)?{??//鏈表則直接遍歷查詢
????????????????if?(e.hash?==?h?&&
????????????????????((ek?=?e.key)?==?key?||?(ek?!=?null?&&?key.equals(ek))))
????????????????????return?e.val;
????????????}
????????}
????????return?null;
????}
remove()
public?V?remove(Object?key)?{
????return?replaceNode(key,?null,?null);
}
final?V?replaceNode(Object?key,?V?value,?Object?cv)?{
????int?hash?=?spread(key.hashCode());
????for?(Node[]?tab?=?table;;)?{
????????Node?f;?int?n,?i,?fh;
????????if?(tab?==?null?||?(n?=?tab.length)?==?0?||
????????????(f?=?tabAt(tab,?i?=?(n?-?1)?&?hash))?==?null)
????????????break;
????????else?if?((fh?=?f.hash)?==?MOVED)??//刪除時也需要確實擴容完成后才可以操作。
????????????tab?=?helpTransfer(tab,?f);
????????else?{
????????????V?oldVal?=?null;
????????????boolean?validated?=?false;
????????????synchronized?(f)?{
????????????????if?(tabAt(tab,?i)?==?f)?{
????????????????????if?(fh?>=?0)?{
????????????????????????validated?=?true;
????????????????????????for?(Node?e?=?f,?pred?=?null;;)?{
????????????????????????????K?ek;
????????????????????????????if?(e.hash?==?hash?&&
????????????????????????????????((ek?=?e.key)?==?key?||
?????????????????????????????????(ek?!=?null?&&?key.equals(ek))))?{
????????????????????????????????V?ev?=?e.val;
????????????????????????????????if?(cv?==?null?||?cv?==?ev?||
????????????????????????????????????(ev?!=?null?&&?cv.equals(ev)))?{??//cv不為null則替換,否則是刪除。
????????????????????????????????????oldVal?=?ev;
????????????????????????????????????if?(value?!=?null)
????????????????????????????????????????e.val?=?value;
????????????????????????????????????else?if?(pred?!=?null)
????????????????????????????????????????pred.next?=?e.next;
????????????????????????????????????else
????????????????????????????????????????//沒前置節(jié)點就是頭節(jié)點
????????????????????????????????????????setTabAt(tab,?i,?e.next);
????????????????????????????????}
????????????????????????????????break;
????????????????????????????}
????????????????????????????pred?=?e;
????????????????????????????if?((e?=?e.next)?==?null)
????????????????????????????????break;
????????????????????????}
????????????????????}
????????????????????else?if?(f?instanceof?TreeBin)?{
????????????????????????//...
????????????????????}
????????????????}
????????????}
????????????if?(validated)?{
????????????????if?(oldVal?!=?null)?{
????????????????????if?(value?==?null)
????????????????????????addCount(-1L,?-1);
????????????????????return?oldVal;
????????????????}
????????????????break;
????????????}
????????}
????}
????return?null;
}
主要改進
CAS無鎖算法與synchronized保證并發(fā)安全,支持并發(fā)擴容,數(shù)據(jù)結構變更為數(shù)組+鏈表+紅黑樹,提高性能。
jdk1.8版棄用變量
Segment只有序列化時會用到。loadFactor僅用于構造函數(shù)中設定初始容量,已不能影響擴容閾值,jdk1.8版本中閾值計算基本恒定為0.75。concurrencyLevel只能影響初始容量,table容量大小與它無關。
