你真的了解 ConcurrentHashMap 嗎?
作者:Amaranth007
blog.csdn.net/qq_32828253/article/details/110450249
HashMap是線程不安,而hashTable雖然是線程安全,但是性能很差。java提供了ConcurrentHashMap來(lái)替代hashTable。
我們先來(lái)看一下JDK1.7的currentMap的結(jié)構(gòu):

在ConcurrentHashMap中有個(gè)重要的概念就是Segment。我們知道HashMap的結(jié)構(gòu)是數(shù)組+鏈表形式,從圖中我們可以看出其實(shí)每個(gè)segment就類似于一個(gè)HashMap。Segment包含一個(gè)HashEntry數(shù)組,數(shù)組中的每一個(gè)HashEntry既是一個(gè)鍵值對(duì),也是一個(gè)鏈表的頭節(jié)點(diǎn)。
在ConcurrentHashMap中有2的N次方個(gè)Segment,共同保存在一個(gè)名為segments的數(shù)組當(dāng)中。可以說(shuō),ConcurrentHashMap是一個(gè)二級(jí)哈希表。在一個(gè)總的哈希表下面,有若干個(gè)子哈希表。
為什么說(shuō)ConcurrentHashMap的性能要比HashTable好,HashTables是用全局同步鎖,而CconurrentHashMap采用的是鎖分段,每一個(gè)Segment就好比一個(gè)自治區(qū),讀寫操作高度自治,Segment之間互不干擾。
先來(lái)看看幾個(gè)并發(fā)的場(chǎng)景:
Case1:不同Segment的并發(fā)寫入

不同Segment的寫入是可以并發(fā)執(zhí)行的。
Case2:同一Segment的一寫一讀

同一Segment的寫和讀是可以并發(fā)執(zhí)行的。
Case3:同一Segment的并發(fā)寫入
Segment的寫入是需要上鎖的,因此對(duì)同一Segment的并發(fā)寫入會(huì)被阻塞。
由此可見(jiàn),ConcurrentHashMap當(dāng)中每個(gè)Segment各自持有一把鎖。在保證線程安全的同時(shí)降低了鎖的粒度,讓并發(fā)操作效率更高。
上面我們說(shuō)過(guò),每個(gè)Segment就好比一個(gè)HashMap,其實(shí)里面的操作原理都差不多,只是Segment里面加了鎖。
Get方法:
為輸入的Key做Hash運(yùn)算,得到hash值。 通過(guò)hash值,定位到對(duì)應(yīng)的Segment對(duì)象 再次通過(guò)hash值,定位到Segment當(dāng)中數(shù)組的具體位置。
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
Put方法:
為輸入的Key做Hash運(yùn)算,得到hash值。 通過(guò)hash值,定位到對(duì)應(yīng)的Segment對(duì)象 獲取可重入鎖 再次通過(guò)hash值,定位到Segment當(dāng)中數(shù)組的具體位置。 插入或覆蓋HashEntry對(duì)象。 釋放鎖。
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
可以看出get方法并沒(méi)有加鎖,那怎么保證讀取的正確性呢,這是應(yīng)為value變量用了volatile來(lái)修飾,后面再詳細(xì)講解volatile。(搜索公眾號(hào)Java知音,回復(fù)“2021”,送你一份Java面試題寶典)
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
.....
size方法
既然每一個(gè)Segment都各自加鎖,那么我們?cè)诮y(tǒng)計(jì)size()的時(shí)候怎么保證解決一直性的問(wèn)題,比如我們?cè)谟?jì)算size時(shí),有其他線程在添加或刪除元素。
/**
* Returns the number of key-value mappings in this map. If the
* map contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
* <tt>Integer.MAX_VALUE</tt>.
*
* @return the number of key-value mappings in this map
*/
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
ConcurrentHashMap的Size方法是一個(gè)嵌套循環(huán),大體邏輯如下:
遍歷所有的Segment。 把Segment的元素?cái)?shù)量累加起來(lái)。 把Segment的修改次數(shù)累加起來(lái)。 判斷所有Segment的總修改次數(shù)是否大于上一次的總修改次數(shù)。如果大于,說(shuō)明統(tǒng)計(jì)過(guò)程中有修改,重新統(tǒng)計(jì),嘗試次數(shù)+1;如果不是。說(shuō)明沒(méi)有修改,統(tǒng)計(jì)結(jié)束。 如果嘗試次數(shù)超過(guò)閾值,則對(duì)每一個(gè)Segment加鎖,再重新統(tǒng)計(jì)。 再次判斷所有Segment的總修改次數(shù)是否大于上一次的總修改次數(shù)。由于已經(jīng)加鎖,次數(shù)一定和上次相等。 釋放鎖,統(tǒng)計(jì)結(jié)束。
可以看出JDK1.7的size計(jì)算方式有點(diǎn)像樂(lè)觀鎖和悲觀鎖的方式,為了盡量不鎖住所有Segment,首先樂(lè)觀地假設(shè)Size過(guò)程中不會(huì)有修改。當(dāng)嘗試一定次數(shù),才無(wú)奈轉(zhuǎn)為悲觀鎖,鎖住所有Segment保證強(qiáng)一致性。
JDK1.7和JDK1.8中ConcurrentHashMap的區(qū)別
1、在JDK1.8中ConcurrentHashMap的實(shí)現(xiàn)方式有了很大的改變,在JDK1.7中采用的是Segment + HashEntry,而Sement繼承了ReentrantLock,所以自帶鎖功能,而在JDK1.8中則取消了Segment,作者認(rèn)為Segment太過(guò)臃腫,采用Node + CAS + Synchronized
ps:Synchronized一直以來(lái)被各種吐槽性能差,但java一直沒(méi)有放棄Synchronized,也一直在改進(jìn),既然作者在這里采用了Synchronized,可見(jiàn)Synchronized的性能應(yīng)該是有所提升的,當(dāng)然只是猜想哈哈哈。。。
2、在上篇HashMap中我們知道,在JDK1.8中當(dāng)HashMap的鏈表個(gè)數(shù)超過(guò)8時(shí),會(huì)轉(zhuǎn)換為紅黑樹(shù),在ConcurrentHashMap中也不例外。這也是新能的一個(gè)小小提升。
3、在JDK1.8版本中,對(duì)于size的計(jì)算,在擴(kuò)容和addCount()時(shí)已經(jīng)在處理了。JDK1.7是在調(diào)用時(shí)才去計(jì)算。
為了幫助統(tǒng)計(jì)size,ConcurrentHashMap提供了baseCount和counterCells兩個(gè)輔助變量和CounterCell輔助類,1.8中使用一個(gè)volatile類型的變量baseCount記錄元素的個(gè)數(shù),當(dāng)插入新數(shù)據(jù)或則刪除數(shù)據(jù)時(shí),會(huì)通過(guò)addCount()方法更新baseCount。
//ConcurrentHashMap中元素個(gè)數(shù),但返回的不一定是當(dāng)前Map的真實(shí)元素個(gè)數(shù)。基于CAS無(wú)鎖更新
private transient volatile long baseCount;
private transient volatile CounterCell[] counterCells; // 部分元素變化的個(gè)數(shù)保存在此數(shù)組中
/**
* {@inheritDoc}
*/
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
END
推薦資源
歡迎添加程序汪個(gè)人微信 itwang007 進(jìn)粉絲群或圍觀朋友圈
往期資源 需要請(qǐng)自取
喜歡就"在看"唄^_^
