Java 8 ConcurrentHashMap源碼中竟然隱藏著兩個(gè)BUG
往期熱門(mén)文章:
1、《往期精選優(yōu)秀博文都在這里了!》 2、能掙錢(qián)的,開(kāi)源 SpringBoot 商城系統(tǒng),功能超全,超漂亮,真TMD香! 3、一套簡(jiǎn)單通用的Java后臺(tái)管理系統(tǒng),拿來(lái)即用,非常方便(附項(xiàng)目地址) 4、ConcurrentHashMap有十個(gè)提升性能的地方,你都知道嗎? 5、Java 中的 Switch 都支持 String 了,為什么不支持 long?
Java 7的ConcurrenHashMap的源碼我建議大家都看看,那個(gè)版本的源碼就是Java多線(xiàn)程編程的教科書(shū)。在Java 7的源碼中,作者對(duì)悲觀鎖的使用非常謹(jǐn)慎,大多都轉(zhuǎn)換為自旋鎖加volatile獲得相同的語(yǔ)義,即使最后迫不得已要用,作者也會(huì)通過(guò)各種技巧減少鎖的臨界區(qū)。在上一篇文章中我們也有講到,自旋鎖在臨界區(qū)比較小的時(shí)候是一個(gè)較優(yōu)的選擇是因?yàn)樗苊饬司€(xiàn)程由于阻塞而切換上下文,但本質(zhì)上它也是個(gè)鎖,在自旋等待期間只有一個(gè)線(xiàn)程能進(jìn)入臨界區(qū),其他線(xiàn)程只會(huì)自旋消耗CPU的時(shí)間片。Java 8中ConcurrentHashMap的實(shí)現(xiàn)通過(guò)一些巧妙的設(shè)計(jì)和技巧,避開(kāi)了自旋鎖的局限,提供了更高的并發(fā)性能。如果說(shuō)Java 7版本的源碼是在教我們?nèi)绾螌⒈^鎖轉(zhuǎn)換為自旋鎖,那么在Java 8中我們甚至可以看到如何將自旋鎖轉(zhuǎn)換為無(wú)鎖的方法和技巧。
把書(shū)讀薄

HashMap比較熟悉,那這張圖也應(yīng)該不會(huì)陌生。事實(shí)上在整體的數(shù)據(jù)結(jié)構(gòu)的設(shè)計(jì)上Java 8的ConcurrentHashMap和HashMap基本上是一致的。Java 7中ConcurrentHashMap為了提升性能使用了很多的編程技巧,但是引入Segment的設(shè)計(jì)還是有很大的改進(jìn)空間的,Java 7中ConcurrrentHashMap的設(shè)計(jì)有下面這幾個(gè)可以改進(jìn)的點(diǎn):Segment在擴(kuò)容的時(shí)候非擴(kuò)容線(xiàn)程對(duì)本Segment的寫(xiě)操作時(shí)都要掛起等待的對(duì) ConcurrentHashMap的讀操作需要做兩次哈希尋址,在讀多寫(xiě)少的情況下其實(shí)是有額外的性能損失的盡管 size()方法的實(shí)現(xiàn)中先嘗試無(wú)鎖讀,但是如果在這個(gè)過(guò)程中有別的線(xiàn)程做寫(xiě)入操作,那調(diào)用size()的這個(gè)線(xiàn)程就會(huì)給整個(gè)ConcurrentHashMap加鎖,這是整個(gè)ConcurrrentHashMap唯一一個(gè)全局鎖,這點(diǎn)對(duì)底層的組件來(lái)說(shuō)還是有性能隱患的極端情況下(比如客戶(hù)端實(shí)現(xiàn)了一個(gè)性能很差的哈希函數(shù)) get()方法的復(fù)雜度會(huì)退化到O(n)。
Java 8的設(shè)計(jì)是廢棄了Segment的使用,將悲觀鎖的粒度降低至桶維度,因此調(diào)用get的時(shí)候也不需要再做兩次哈希了。size()的設(shè)計(jì)是Java 8版本中最大的亮點(diǎn),我們?cè)诤竺娴奈恼轮袝?huì)詳細(xì)說(shuō)明。至于紅黑樹(shù),這篇文章仍然不做過(guò)多闡述。接下來(lái)的篇幅會(huì)深挖細(xì)節(jié),把書(shū)讀厚,涉及到的模塊有:初始化,put方法, 擴(kuò)容方法transfer以及size()方法,而其他模塊,比如hash函數(shù)等改變較小,故不再深究。準(zhǔn)備知識(shí)
ForwardingNode
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
// MOVED = -1,F(xiàn)orwardingNode的哈希值為-1
super(MOVED, null, null, null);
this.nextTable = tab;
}
}
Node和TreeNode之外,ConcurrentHashMap還引入了一個(gè)新的數(shù)據(jù)類(lèi)型ForwardingNode,我們這里只展示他的構(gòu)造方法,ForwardingNode的作用有兩個(gè):在動(dòng)態(tài)擴(kuò)容的過(guò)程中標(biāo)志某個(gè)桶已經(jīng)被復(fù)制到了新的桶數(shù)組中 如果在動(dòng)態(tài)擴(kuò)容的時(shí)候有 get方法的調(diào)用,則ForwardingNode將會(huì)把請(qǐng)求轉(zhuǎn)發(fā)到新的桶數(shù)組中,以避免阻塞get方法的調(diào)用,ForwardingNode在構(gòu)造的時(shí)候會(huì)將擴(kuò)容后的桶數(shù)組nextTable保存下來(lái)。
UNSAFE.compareAndSwap***
Java 8版本的ConcurrentHashMap實(shí)現(xiàn)CAS的工具,以int類(lèi)型為例其方法定義如下:/**
* Atomically update Java variable to <tt>x</tt> if it is currently
* holding <tt>expected</tt>.
* @return <tt>true</tt> if successful
*/
public final native boolean compareAndSwapInt(Object o, long offset,
int expected,
int x);
如果對(duì)象 o起始地址偏移量為offset的值等于expected,則將該值設(shè)為x,并返回true表明更新成功,否則返回false,表明CAS失敗
初始化
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) // 檢查參數(shù)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel)
initialCapacity = concurrencyLevel;
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size); // tableSizeFor,求不小于size的 2^n的算法,jdk1.8的HashMap中說(shuō)過(guò)
this.sizeCtl = cap;
}
concurrencyLevel在Java 7中是Segment數(shù)組的長(zhǎng)度,由于在Java 8中已經(jīng)廢棄了Segment,因此concurrencyLevel只是一個(gè)保留字段,無(wú)實(shí)際意義sizeCtl這個(gè)值第一次出現(xiàn),這個(gè)值如果等于-1則表明系統(tǒng)正在初始化,如果是其他負(fù)數(shù)則表明系統(tǒng)正在擴(kuò)容,在擴(kuò)容時(shí)sizeCtl二進(jìn)制的低十六位等于擴(kuò)容的線(xiàn)程數(shù)加一,高十六位(除符號(hào)位之外)包含桶數(shù)組的大小信息
put方法
public V put(K key, V value) {
return putVal(key, value, false);
}
put方法將調(diào)用轉(zhuǎn)發(fā)到putVal方法:final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 【A】延遲初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 【B】當(dāng)前桶是空的,直接更新
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 【C】如果當(dāng)前的桶的第一個(gè)元素是一個(gè)ForwardingNode節(jié)點(diǎn),則該線(xiàn)程嘗試加入擴(kuò)容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// 【D】否則遍歷桶內(nèi)的鏈表或樹(shù),并插入
else {
// 暫時(shí)折疊起來(lái),后面詳細(xì)看
}
}
// 【F】流程走到此處,說(shuō)明已經(jīng)put成功,map的記錄總數(shù)加一
addCount(1L, binCount);
return null;
}
put方法依然牽扯出很多的知識(shí)點(diǎn)桶數(shù)組的初始化
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
// 說(shuō)明已經(jīng)有線(xiàn)程在初始化了,本線(xiàn)程開(kāi)始自旋
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
// CAS保證只有一個(gè)線(xiàn)程能走到這個(gè)分支
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// sc = n - n/4 = 0.75n
sc = n - (n >>> 2);
}
} finally {
// 恢復(fù)sizeCtl > 0相當(dāng)于釋放鎖
sizeCtl = sc;
}
break;
}
}
return tab;
}
initTable方法的時(shí)候,CAS可以保證只有一個(gè)線(xiàn)程能夠進(jìn)入到真正的初始化分支,其他線(xiàn)程都是自旋等待。這段代碼中我們關(guān)注三點(diǎn)即可:依照前文所述,當(dāng)有線(xiàn)程開(kāi)始初始化桶數(shù)組時(shí),會(huì)通過(guò) CAS將sizeCtl置為-1,其他線(xiàn)程以此為標(biāo)志開(kāi)始自旋等待當(dāng)桶數(shù)組初始化結(jié)束后將 sizeCtl的值恢復(fù)為正數(shù),其值等于0.75倍的桶數(shù)組長(zhǎng)度,這個(gè)值的含義和之前HashMap中的THRESHOLD一致,是系統(tǒng)觸發(fā)擴(kuò)容的臨界點(diǎn)在 finally語(yǔ)句中對(duì)sizeCtl的操作并沒(méi)有使用CAS是因?yàn)?code style="overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(30, 107, 184);background-color: rgba(27, 31, 35, 0.05);font-family: 'Operator Mono', Consolas, Monaco, Menlo, monospace;word-break: break-all;">CAS保證只有一個(gè)線(xiàn)程能夠執(zhí)行到這個(gè)地方
添加桶數(shù)組第一個(gè)元素
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
put方法的第二個(gè)分支會(huì)用tabAt判斷當(dāng)前桶是否是空的,如果是則會(huì)通過(guò)CAS寫(xiě)入,tabAt通過(guò)UNSAFE接口會(huì)拿到桶中的最新元素,casTabAt通過(guò)CAS保證不會(huì)有并發(fā)問(wèn)題,如果CAS失敗,則通過(guò)循環(huán)再進(jìn)入其他分支判斷是否需要新增線(xiàn)程擴(kuò)容
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// RESIZE_STAMP_SHIFT = 16
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// 這里將sizeCtl的值自增1,表明參與擴(kuò)容的線(xiàn)程數(shù)量+1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
sizeCtl這個(gè)標(biāo)志位了,臨時(shí)變量rs由resizeStamp這個(gè)方法返回static final int resizeStamp(int n) {
// RESIZE_STAMP_BITS = 16
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
int類(lèi)型的值,所有Integer.numberOfLeadingZeros(n)的返回值介于0到32之間,如果轉(zhuǎn)換成二進(jìn)制Integer.numberOfLeadingZeros(n)的最大值是:00000000 00000000 00000000 00100000Integer.numberOfLeadingZeros(n)的最小值是:00000000 00000000 00000000 00000000
resizeStampd的返回值也就介于00000000 00000000 10000000 00000000到00000000 00000000 10000000 00100000之間,從這個(gè)返回值的范圍可以看出來(lái)resizeStamp的返回值高16位全都是0,是不包含任何信息的。因此在ConcurrrentHashMap中,會(huì)把resizeStamp的返回值左移16位拼到sizeCtl中,這就是為什么sizeCtl的高16位包含整個(gè)Map大小的原理。有了這個(gè)分析,這段代碼中比較長(zhǎng)的if判斷也就能看懂了if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
(sc >>> RESIZE_STAMP_SHIFT) != rs保證所有線(xiàn)程要基于同一個(gè)舊的桶數(shù)組擴(kuò)容transferIndex <= 0已經(jīng)有線(xiàn)程完成擴(kuò)容任務(wù)了
sc == rs + 1 || sc == rs + MAX_RESIZERS這兩個(gè)判斷條件如果是細(xì)心的同學(xué)一定會(huì)覺(jué)得難以理解,這個(gè)地方確實(shí)是JDK的一個(gè)BUG,這個(gè)BUG已經(jīng)在JDK 12中修復(fù),詳細(xì)情況可以參考一下Oracle的官網(wǎng):https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8214427,這兩個(gè)判斷條件應(yīng)該寫(xiě)成這樣:sc == (rs << RESIZE_STAMP_SHIFT) + 1 || sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS,因?yàn)橹苯颖容^rs和sc是沒(méi)有意義的,必須要有移位操作。它表達(dá)的含義是sc == (rs << RESIZE_STAMP_SHIFT) + 1當(dāng)前擴(kuò)容的線(xiàn)程數(shù)為0,即已經(jīng)擴(kuò)容完成了,就不需要再新增線(xiàn)程擴(kuò)容sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS參與擴(kuò)容的線(xiàn)程數(shù)已經(jīng)到了最大,就不需要再新增線(xiàn)程擴(kuò)容
transfer方法中,我們后面會(huì)詳細(xì)看,不過(guò)有個(gè)小細(xì)節(jié)可以提前注意,如果nextTable已經(jīng)初始化了,transfer會(huì)返回nextTable的的引用,后續(xù)可以直接操作新的桶數(shù)組。插入新值
HashMap一樣了,唯一一點(diǎn)不同的就是,這時(shí)候要給當(dāng)前的桶加鎖,且看代碼:final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)// 折疊
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// 折疊}
else if ((fh = f.hash) == MOVED)// 折疊
else {
V oldVal = null;
synchronized (f) {
// 要注意這里這個(gè)不起眼的判斷條件
if (tabAt(tab, i) == f) {
if (fh >= 0) { // fh>=0的節(jié)點(diǎn)是鏈表,否則是樹(shù)節(jié)點(diǎn)或者ForwardingNode
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val; // 如果鏈表中有值了,直接更新
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
// 如果流程走到這里,則說(shuō)明鏈表中還沒(méi)值,直接連接到鏈表尾部
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
// 紅黑樹(shù)的操作先略過(guò)
}
}
}
}
// put成功,map的元素個(gè)數(shù)+1
addCount(1L, binCount);
return null;
}
tabAt(tab, i) == f,這個(gè)判斷的目的是為了處理調(diào)用put方法的線(xiàn)程和擴(kuò)容線(xiàn)程的競(jìng)爭(zhēng)。因?yàn)?code style="overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(30, 107, 184);background-color: rgba(27, 31, 35, 0.05);font-family: 'Operator Mono', Consolas, Monaco, Menlo, monospace;word-break: break-all;">synchronized是阻塞鎖,如果調(diào)用put方法的線(xiàn)程恰好和擴(kuò)容線(xiàn)程同時(shí)操作同一個(gè)桶,且調(diào)用put方法的線(xiàn)程競(jìng)爭(zhēng)鎖失敗,等到該線(xiàn)程重新獲取到鎖的時(shí)候,當(dāng)前桶中的元素就會(huì)變成一個(gè)ForwardingNode,那就會(huì)出現(xiàn)tabAt(tab, i) != f的情況。多線(xiàn)程動(dòng)態(tài)擴(kuò)容
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // 初始化新的桶數(shù)組
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 判斷是會(huì)否是最后一個(gè)擴(kuò)容線(xiàn)程
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED) // 只有最后一個(gè)擴(kuò)容線(xiàn)程才有機(jī)會(huì)執(zhí)行這個(gè)分支
advance = true; // already processed
else { // 復(fù)制過(guò)程與HashMap類(lèi)似,這里不再贅述
synchronized (f) {
// 折疊
}
}
}
}
Java 8中ConcurrentHashMap擴(kuò)容的幾個(gè)特點(diǎn):新的桶數(shù)組
nextTable是原先桶數(shù)組長(zhǎng)度的2倍,這與之前HashMap一致參與擴(kuò)容的線(xiàn)程也是分段將
table中的元素復(fù)制到新的桶數(shù)組nextTable中桶一個(gè)桶數(shù)組中的元素在新的桶數(shù)組中均勻的分布在兩個(gè)桶中,桶下標(biāo)相差n(舊的桶數(shù)組的長(zhǎng)度),這一點(diǎn)依然與
HashMap保持一致

各個(gè)線(xiàn)程之間如何通力協(xié)作
transferIndex,這是一個(gè)被volatile修飾的變量,這一點(diǎn)可以保證所有線(xiàn)程讀到的一定是最新的值。private transient volatile int transferIndex;
nextTab == nullif (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
transferIndex屬性的基礎(chǔ)上,上面的這個(gè)循環(huán)就好理解了while (advance) {
int nextIndex, nextBound;
// 當(dāng)bound <= i <= transferIndex的時(shí)候i自減跳出這個(gè)循環(huán)繼續(xù)干活
if (--i >= bound || finishing)
advance = false;
// 擴(kuò)容的所有任務(wù)已經(jīng)被認(rèn)領(lǐng)完畢,本線(xiàn)程結(jié)束干活
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 否則認(rèn)領(lǐng)新的一段復(fù)制任務(wù),并通過(guò)`CAS`更新transferIndex的值
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
transferIndex就像是一個(gè)游標(biāo),每個(gè)線(xiàn)程認(rèn)領(lǐng)一段復(fù)制任務(wù)的時(shí)候都會(huì)通過(guò)CAS將其更新為transferIndex - stride, CAS可以保證transferIndex可以按照stride這個(gè)步長(zhǎng)降到0。最后一個(gè)擴(kuò)容線(xiàn)程需要二次確認(rèn)?
for循環(huán)的變量i代表要復(fù)制的桶的在桶數(shù)組中的下標(biāo),這個(gè)值的上限和下限通過(guò)游標(biāo)transferIndex和步長(zhǎng)stride計(jì)算得來(lái),當(dāng)i減小為負(fù)數(shù),則說(shuō)明當(dāng)前擴(kuò)容線(xiàn)程完成了擴(kuò)容任務(wù),這時(shí)候流程會(huì)走到這個(gè)分支:// i >= n || i + n >= nextn現(xiàn)在看來(lái)取不到
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) { // 【A】完成整個(gè)擴(kuò)容過(guò)程
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 【B】判斷是否是最后一個(gè)擴(kuò)容線(xiàn)程,如果是,則需要重新掃描一遍桶數(shù)組,做二次確認(rèn)
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT 說(shuō)明是最后一個(gè)擴(kuò)容線(xiàn)程
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// 重新掃描一遍桶數(shù)組,做二次確認(rèn)
finishing = advance = true;
i = n; // recheck before commit
}
}
false,所以當(dāng)線(xiàn)程第一次進(jìn)入這個(gè)if分支的話(huà),會(huì)先執(zhí)行注釋為【B】的這個(gè)分支,同時(shí)因?yàn)?code style="overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(30, 107, 184);background-color: rgba(27, 31, 35, 0.05);font-family: 'Operator Mono', Consolas, Monaco, Menlo, monospace;word-break: break-all;">sizeCtl的低16位被初始化為參與擴(kuò)容的線(xiàn)程數(shù)加一,因此,當(dāng)條件(sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT滿(mǎn)足時(shí),就能證明當(dāng)前線(xiàn)程就是最后一個(gè)擴(kuò)容線(xiàn)程了,這這時(shí)候?qū)?code style="overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(30, 107, 184);background-color: rgba(27, 31, 35, 0.05);font-family: 'Operator Mono', Consolas, Monaco, Menlo, monospace;word-break: break-all;">i置為n重新掃描一遍桶數(shù)組,并且將finishing置為true保證當(dāng)桶數(shù)組被掃描結(jié)束后能夠進(jìn)入注釋為【A】的分支結(jié)束擴(kuò)容。concurrency-interest這個(gè)郵件列表中也關(guān)于這件事咨詢(xún)了Doug Lea(地址:http://cs.oswego.edu/pipermail/concurrency-interest/2020-July/017171.html),他給出的回復(fù)是:Yes, this is a valid point; thanks. The post-scan was needed in a previous version, and could be removed. It does not trigger often enough to matter though, so is for now another minor tweak that might be included next time CHM is updated.
Doug在郵件中的措辭用了could be, not often enough等,但也確認(rèn)了最后一個(gè)擴(kuò)容線(xiàn)程的二次檢查是沒(méi)有必要的。具體的復(fù)制過(guò)程與HashMap類(lèi)似,感興趣的讀者可以翻一下高端的面試從來(lái)不會(huì)在HashMap的紅黑樹(shù)上糾纏太多這篇文章。size()方法
addCount()方法
// 記錄map元素總數(shù)的成員變量
private transient volatile long baseCount;
put方法的最后,有一個(gè)addCount方法,因?yàn)?code style="overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(30, 107, 184);background-color: rgba(27, 31, 35, 0.05);font-family: 'Operator Mono', Consolas, Monaco, Menlo, monospace;word-break: break-all;">putVal執(zhí)行到此處說(shuō)明已經(jīng)成功新增了一個(gè)元素,所以addCount方法的作用就是維護(hù)當(dāng)前ConcurrentHashMap的元素總數(shù),在ConcurrentHashMap中有一個(gè)變量baseCount用來(lái)記錄map中元素的個(gè)數(shù),如下圖所示,如果同一時(shí)刻有n個(gè)線(xiàn)程通過(guò)CAS同時(shí)操作baseCount變量,有且僅有一個(gè)線(xiàn)程會(huì)成功,其他線(xiàn)程都會(huì)陷入無(wú)休止的自旋當(dāng)中,那一定會(huì)帶來(lái)性能瓶頸。
baseCount,ConcurrentHashMap引入了一個(gè)輔助隊(duì)列,如下圖所示,現(xiàn)在操作baseCount的線(xiàn)程可以分散到這個(gè)輔助隊(duì)列中去了,調(diào)用size()的時(shí)候只需要將baseCount和輔助隊(duì)列中的數(shù)值相加即可,這樣就實(shí)現(xiàn)了調(diào)用size()無(wú)需加鎖。
CounterCell的數(shù)組:@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
long型的變量value,還需要解決一個(gè)問(wèn)題是,對(duì)于某個(gè)具體的線(xiàn)程它是如何知道操作輔助隊(duì)列中的哪個(gè)值呢?答案是下面的這個(gè)方法:static final int getProbe() {
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
getProbe方法會(huì)返回當(dāng)前線(xiàn)程的一個(gè)唯一身份碼,這個(gè)值是不會(huì)變的,因此可以將getProbe的返回值與輔助隊(duì)列的長(zhǎng)度作求余運(yùn)算得到具體的下標(biāo),它的返回值可能是0,如果返回0則需要調(diào)用ThreadLocalRandom.localInit()初始化。addCount方法中有兩個(gè)細(xì)節(jié)需要注意private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 注意這里的判斷條件,是有技巧的
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
// 變量uncontended記錄著這個(gè)CAS操作是否成功
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
// 檢查是否需要擴(kuò)容,后面再詳細(xì)看
}
}
if判斷條件:if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
}
(as = counterCells) != null則后面的CAS是不會(huì)執(zhí)行的,為什么要這么設(shè)置呢?作者有兩點(diǎn)考慮:原因在于如果 (as = counterCells) != null,則說(shuō)明輔助隊(duì)列已經(jīng)初始化好了,相比于所有的線(xiàn)程都自旋等待baseCount這一個(gè)變量,讓線(xiàn)程通過(guò)CAS去操作隊(duì)列中的值有更大的可能性成功,因?yàn)檩o助隊(duì)列的最大長(zhǎng)度為大于當(dāng)前處理器個(gè)數(shù)的2的正整數(shù)冪,可以支持更大的并發(fā)如果輔助隊(duì)列還沒(méi)有初始化好,直到有必要的時(shí)候再去創(chuàng)建隊(duì)列,如何判斷“必要性”呢?就看對(duì) baseCount的CAS操作能否成功,如果失敗,就說(shuō)明當(dāng)前系統(tǒng)的并發(fā)已經(jīng)比較高了,需要隊(duì)列的輔助,否則直接操作baseCount
ThreadLocalRandom.getProbe()在輔助隊(duì)列中確定的位置不為null時(shí),才對(duì)其做CAS操作,這本來(lái)是一個(gè)正常的防御性判斷,但是uncontended記錄了CAS是否成功,如果失敗,則會(huì)在fullAddCount中調(diào)用ThreadLocalRandom.advanceProbe換一個(gè)身份碼調(diào)整下當(dāng)前線(xiàn)程在輔助隊(duì)列的位置,避免所有線(xiàn)程都在輔助隊(duì)列的同一個(gè)坑位自旋等待。fullAddCount()方法
// See LongAdder version for explanation
// wasUncontended 記錄著調(diào)用方CAS是否成功,如果失敗則換一個(gè)輔助隊(duì)列的元素繼續(xù)CAS
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
// 【A】如果輔助隊(duì)列已經(jīng)創(chuàng)建,則直接操作輔助隊(duì)列
if ((as = counterCells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
CounterCell r = new CounterCell(x); // Optimistic create
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // 如果調(diào)用方CAS失敗了,本輪空跑,下一個(gè)循環(huán)換下標(biāo)繼續(xù)操作
wasUncontended = true; // Continue after rehash
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
else if (counterCells != as || n >= NCPU)
// 如果輔助隊(duì)列長(zhǎng)度已經(jīng)超過(guò)了CPU個(gè)數(shù),本輪空跑,下一個(gè)循環(huán)換下標(biāo)繼續(xù)操作
collide = false; // At max size or stale
else if (!collide) // 如果上一次操作失敗了(CAS失敗或者新建CounterCell失敗),本輪空跑,下一個(gè)循環(huán)換下標(biāo)繼續(xù)操作
collide = true;
else if (cellsBusy == 0 && // 如果連續(xù)兩次操作輔助隊(duì)列失敗,則考慮擴(kuò)容
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
// 如果上一次操作失敗或者調(diào)用方CAS失敗,都會(huì)走到這里,變換要操作的輔助隊(duì)列下標(biāo)
h = ThreadLocalRandom.advanceProbe(h);
}
// 【B】如果輔助隊(duì)列還未創(chuàng)建,則加鎖創(chuàng)建
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
// 【C】如果輔助隊(duì)列創(chuàng)建失敗(拿鎖失敗),則嘗試直接操作`baseCount`
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}
cellsBusy的自旋鎖。先看最外層的三個(gè)分支:【B】如果輔助隊(duì)列還沒(méi)有創(chuàng)建,則加鎖創(chuàng)建 【C】如果因?yàn)槟面i失敗導(dǎo)致輔助隊(duì)列創(chuàng)建失敗,則嘗試自旋寫(xiě)入變量 baseCount,萬(wàn)一真的成功了呢【A】如果輔助隊(duì)列已經(jīng)創(chuàng)建了,則直接去操作輔助隊(duì)列相應(yīng)的元素
CAS或者加鎖操作輔助隊(duì)列中的某個(gè)元素失敗,則首先通過(guò)調(diào)用ThreadLocalRandom.advanceProbe(h)換一個(gè)隊(duì)列中的元素繼續(xù)操作,這次操作是否成功會(huì)記錄在臨時(shí)變量collide中。如果下一次操作還是失敗,則說(shuō)明此時(shí)的并發(fā)量比較大需要擴(kuò)容了。如果輔助隊(duì)列的長(zhǎng)度已經(jīng)超過(guò)了CPU的個(gè)數(shù),那就不再擴(kuò)容,繼續(xù)換一個(gè)元素操作,因?yàn)橥粫r(shí)間能運(yùn)行的線(xiàn)程數(shù)最大不會(huì)超過(guò)計(jì)算機(jī)的CPU個(gè)數(shù)。counterCells只是一個(gè)普通的數(shù)組,因此并不是線(xiàn)程安全的,所以對(duì)其寫(xiě)操作需要加鎖保證并發(fā)安全double-check的動(dòng)作,我看有的文章將其解讀為“類(lèi)似于單例模式的double-check”,這個(gè)是不對(duì)的,作者這樣做的原因我們?cè)谏弦黄恼轮杏兄v過(guò),首先第一個(gè)檢查cellsBusy == 0是流程往下走的基礎(chǔ),如果cellsBusy == 1則直接拿鎖失敗退出,調(diào)用h = ThreadLocalRandom.advanceProbe(h);更新h后重試,如果cellsBusy == 0校驗(yàn)通過(guò),則調(diào)用CounterCell r = new CounterCell(x);初始化一個(gè)CounterCell,這樣做是為了減少自旋鎖的臨界區(qū)的大小,以此來(lái)提升并發(fā)性能cellsBusy是否為0,如果為1那直接宣告拿鎖失敗,為什么這么做呢?因?yàn)橄啾扔谡{(diào)用UNSAFE的CAS操作,直接讀取volatile的消耗更少,如果直接讀取cellsBusy已經(jīng)能判斷出拿鎖失敗,那就沒(méi)必要再調(diào)用耗時(shí)更多的CAS了cellsBusy從0到1的更改調(diào)用了CAS但是從1置為0卻只用了賦值操作,這是因?yàn)?code style="overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(30, 107, 184);background-color: rgba(27, 31, 35, 0.05);font-family: 'Operator Mono', Consolas, Monaco, Menlo, monospace;word-break: break-all;">CAS可以保證能走到這條語(yǔ)句的只有一個(gè)線(xiàn)程,因此可以用賦值操作來(lái)更改cellsBusy的值。sumCount
ConcurrentHashMap中的元素個(gè)數(shù)分散的記錄到baseCount和輔助隊(duì)列中,調(diào)用size()方法的時(shí)候只需要把這些值相加即可。public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
后記
Java 8對(duì)ConcurrentHashMap的優(yōu)化都在點(diǎn)子上,理論上性能會(huì)比Java 7的ConcurrentHashMap提升不少,但是我暫時(shí)還沒(méi)有找到有對(duì)比的兩個(gè)版本的benchmark,如果有讀者朋友做過(guò)相關(guān)的驗(yàn)證或者看到過(guò)有關(guān)的benchmark,也歡迎和我溝通。往期熱門(mén)文章:
1、《歷史文章分類(lèi)導(dǎo)讀列表!精選優(yōu)秀博文都在這里了!》
2、程序員離職事件始末 3、別總寫(xiě)代碼,這130個(gè)網(wǎng)站比漲工資都重要 4、程序員養(yǎng)生指北 5、如何解決MySQL order by limit語(yǔ)句的分頁(yè)數(shù)據(jù)重復(fù)問(wèn)題? 6、Java中八個(gè)潛在的內(nèi)存泄露風(fēng)險(xiǎn),你知道幾個(gè)? 7、一個(gè)牛逼的 多級(jí)緩存 實(shí)現(xiàn)方案! 8、阿里一面:如何保障消息100%投遞成功、消息冪等性? 9、GitHub 熱榜:被網(wǎng)友瘋狂惡搞的「螞蟻呀嘿」項(xiàng)目終于開(kāi)源了! 10、記住!看小電影前一定要檢查一下域名是不是 HTTPS 的,不然....
