淺入淺出 Java ConcurrentHashMap
HashMap 是 Java 中非常強大的數(shù)據(jù)結構,使用頻率非常高,幾乎所有的應用程序都會用到它。但 HashMap 不是線程安全的,不能在多線程環(huán)境下使用,該怎么辦呢?
1)Hashtable,一個老掉牙的同步哈希表,t 竟然還是小寫的,一看就非常不專業(yè):
public?class?Hashtable<K,V>
????????extends?Dictionary<K,V>
????????implements?Map<K,V>,?Cloneable,?java.io.Serializable?{
????public?synchronized?V?put(K?key,?V?value)?{}
????public?synchronized?int?size()?{}
????public?synchronized?V?get(Object?key)?{}
}
里面的方法全部是 synchronized,同步的力度非常大,對不對?這樣的話,性能就沒法保證了。pass。
2)Collections.synchronizedMap(new HashMap,可以把一個 HashMap 包裝成同步的 SynchronizedMap:
private?static?class?SynchronizedMap<K,V>
????????implements?Map<K,V>,?Serializable?{
????public?int?size()?{
????????synchronized?(mutex)?{return?m.size();}
????}
????public?V?get(Object?key)?{
????????synchronized?(mutex)?{return?m.get(key);}
????}
????public?V?put(K?key,?V?value)?{
????????synchronized?(mutex)?{return?m.put(key,?value);}
????}
}
可以看得出,SynchronizedMap 確實比 Hashtable 改進了,synchronized 不再放在方法上,而是放在方法內(nèi)部,作為同步塊出現(xiàn),但仍然是對象級別的同步鎖,讀和寫操作都需要獲取鎖,本質上,仍然只允許一個線程訪問,其他線程被排斥在外。
3)ConcurrentHashMap,本篇的主角,唯一正確的答案。Concurrent 這個單詞就是并發(fā)、并行的意思,所以 ConcurrentHashMap 就是一個可以在多線程環(huán)境下使用的 HashMap。
ConcurrentHashMap 一直在進化,Java 7 和 Java 8 就有很大的不同。Java 7 版本的 ConcurrentHashMap 是基于分段鎖的,就是將內(nèi)部分成不同的 Segment(段),每個段里面是 HashEntry 數(shù)組。

來看一下 Segment:
static?final?class?Segment<K,V>?extends?ReentrantLock?implements?Serializable?{
???????transient?volatile?HashEntry[]?table;
???????transient?int?count;
???????transient?int?modCount;
???????transient?int?threshold;
???????final?float?loadFactor;
}
再來看一下 HashEntry:
static?final?class?HashEntry<K,V>?{
????????final?K?key;???????????????????????//?聲明?key?為?final?型
????????final?int?hash;???????????????????//?聲明?hash?值為?final?型
????????volatile?V?value;?????????????????//?聲明?value?為?volatile?型
????????final?HashEntry?next;??????//?聲明?next?為?final?型
????????HashEntry(K?key,?int?hash,?HashEntry?next,?V?value)?{
????????????this.key?=?key;
????????????this.hash?=?hash;
????????????this.next?=?next;
????????????this.value?=?value;
????????}
?}
和 HashMap 非常相似,唯一的區(qū)別就是 value 是 volatile 的,保證 get 時候的可見性。
Segment 繼承自 ReentrantLock,所以不會像 Hashtable 那樣不管是 put 還是 get 都需要 synchronized,鎖的力度變小了,每個線程只鎖一個 Segment,對其他線程訪問的 Segment 沒有影響。
Java 8 和之后的版本在此基礎上做了很大的改進,不再采用分段鎖的機制了,而是利用 CAS(Compare and Swap,即比較并替換,實現(xiàn)并發(fā)算法時常用到的一種技術)和 synchronized 來保證并發(fā),雖然內(nèi)部仍然定義了 Segment,但僅僅是為了保證序列化時的兼容性,代碼注釋上就可以看得出來:
/**
?*?Stripped-down?version?of?helper?class?used?in?previous?version,
?*?declared?for?the?sake?of?serialization?compatibility.
?*/
static?class?Segment<K,V>?extends?ReentrantLock?implements?Serializable?{
????final?float?loadFactor;
????Segment(float?lf)?{?this.loadFactor?=?lf;?}
}
底層結構和 Java 7 也有所不同,更接近 HashMap(數(shù)組+雙向鏈表+紅黑樹):

來看一下新版 ConcurrentHashMap 定義的關鍵字段:
public?class?ConcurrentHashMap<K,V>?extends?AbstractMap<K,V>
????????implements?ConcurrentMap<K,V>,?Serializable?{
????transient?volatile?Node[]?table;
????private?transient?volatile?Node[]?nextTable;
????private?transient?volatile?int?sizeCtl;
}
1)table,默認為 null,第一次 put 的時候初始化,默認大小為 16,用來存儲 Node 節(jié)點,擴容時大小總是 2 的冪次方。
順帶看一下 Node 的定義:
static?class?Node<K,V>?implements?Map.Entry<K,V>?{
????????final?int?hash;
????????final?K?key;
????????volatile?V?val;
????????volatile?Node?next;
????????//?…?
}
hash 和 key 是 final 的,和 HashMap 的 Node 一樣,因為 key 是不會發(fā)生變化的。val 和 next 是 volatile 的,保證多線程環(huán)境下的可見性。
2)nextTable,默認為 null,擴容時新生成的數(shù)組,大小為原數(shù)組的兩倍。
3)sizeCtl,默認為 0,用來控制 table 的初始化和擴容操作。-1 表示 table 正在初始化;-(1+線程數(shù)) 表示正在被多個線程擴容。
Map 最重要的方法就是 put,ConcurrentHashMap 也不例外:
public?V?put(K?key,?V?value)?{
????return?putVal(key,?value,?false);
}
final?V?putVal(K?key,?V?value,?boolean?onlyIfAbsent)?{
????if?(key?==?null?||?value?==?null)?throw?new?NullPointerException();
????int?hash?=?spread(key.hashCode());
????int?binCount?=?0;
????for?(Node[]?tab?=?table;;)?{
????????Node?f;?int?n,?i,?fh;
????????if?(tab?==?null?||?(n?=?tab.length)?==?0)
????????????tab?=?initTable();
????????else?if?((f?=?tabAt(tab,?i?=?(n?-?1)?&?hash))?==?null)?{
????????????if?(casTabAt(tab,?i,?null,?new?Node(hash,?key,?value,?null)))
????????????????break;???????????????????//?no?lock?when?adding?to?empty?bin
????????}
????????else?if?((fh?=?f.hash)?==?MOVED)
????????????tab?=?helpTransfer(tab,?f);
????????...省略部分代碼
????}
????addCount(1L,?binCount);
????return?null;
}
1)spread() 是一個哈希算法,和 HashMap 的 hash() 方法類似:
static?final?int?spread(int?h)?{
????return?(h?^?(h?>>>?16))?&?HASH_BITS;
}
2)如果是第一次 put 的話,會調(diào)用 initTable() 對 table 進行初始化。
private?final?ConcurrentHashMap.Node[]?initTable()?{
????ConcurrentHashMap.Node[]?tab;?int?sc;
????while?((tab?=?table)?==?null?||?tab.length?==?0)?{
????????if?((sc?=?sizeCtl)?0)
????????????Thread.yield();?//?lost?initialization?race;?just?spin
????????else?if?(U.compareAndSetInt(this,?SIZECTL,?sc,?-1))?{
????????????try?{
????????????????if?((tab?=?table)?==?null?||?tab.length?==?0)?{
????????????????????int?n?=?(sc?>?0)???sc?:?DEFAULT_CAPACITY;
????????????????????@SuppressWarnings("unchecked")
????????????????????ConcurrentHashMap.Node[]?nt?=?(ConcurrentHashMap.Node[])new?ConcurrentHashMap.Node,?>[n];
????????????????????table?=?tab?=?nt;
????????????????????sc?=?n?-?(n?>>>?2);
????????????????}
????????????}?finally?{
????????????????sizeCtl?=?sc;
????????????}
????????????break;
????????}
????}
????return?tab;
}
外層用了一個 while 循環(huán),如果發(fā)現(xiàn) sizeCtl 小于 0 的話,就意味著其他線程正在初始化,yield 讓出 CPU。
第一次 put 的時候會執(zhí)行 U.compareAndSetInt(this, SIZECTL, sc, -1),把 sizeCtl 賦值為 -1,表示當前線程正在初始化。
private?static?final?Unsafe?U?=?Unsafe.getUnsafe();
private?static?final?long?SIZECTL
????????=?U.objectFieldOffset(ConcurrentHashMap.class,?"sizeCtl");
U 是一個 Unsafe(可以提供硬件級別的原子操作,可以獲取某個屬性在內(nèi)存中的位置,也可以修改對象的字段值)對象,compareAndSetInt() 是 Unsafe 的一個本地(native)方法,它就負責把 ConcurrentHashMap 的 sizeCtl 修改為指定的值(-1)。
初始化后的 table 大小為 16(DEFAULT_CAPACITY)。
不是第一次 put 的話,會調(diào)用 tabAt() 取出 key 位置((n - 1) & hash)上的值(f):
static?final??ConcurrentHashMap.Node?tabAt(ConcurrentHashMap.Node[]?tab,?int?i) ? {
????return?(ConcurrentHashMap.Node)U.getReferenceAcquire(tab,?((long)i?<}
U.getReferenceAcquire() 會調(diào)用 Unsafe 的 本地方法 getReferenceVolatile() 獲取指定內(nèi)存中的數(shù)據(jù),保證每次拿到的數(shù)據(jù)都是最新的。
如果 f 為 null,說明 table 中這個位置上是第一次 put 元素,調(diào)用 casTabAt() 插入 Node。
static?final??boolean?casTabAt(ConcurrentHashMap.Node[]?tab,?int?i,
????????????????????????????????????ConcurrentHashMap.Node?c,?ConcurrentHashMap.Node?v) ?{
????return?U.compareAndSetReference(tab,?((long)i?<}
如果 CAS 成功,說明 Node 插入成功,執(zhí)行 addCount() 方法檢查是否需要擴容。
如果失敗,說明有其他線程提前插入了 Node,進行下一輪 for 循環(huán)繼續(xù)嘗試,俗稱自旋。
如果 f 的 hash 為 MOVED(-1),意味著有其他線程正在擴容,執(zhí)行 helpTransfer() 一起擴容。
否則,把 Node 按鏈表或者紅黑樹的方式插入到合適的位置,這個過程是通過 synchronized 塊實現(xiàn)的。
synchronized?(f)?{
????if?(tabAt(tab,?i)?==?f)?{
????????if?(fh?>=?0)?{
????????????binCount?=?1;
????????????for?(Node?e?=?f;;?++binCount)?{
????????????????K?ek;
????????????????if?(e.hash?==?hash?&&
????????????????????((ek?=?e.key)?==?key?||
?????????????????????(ek?!=?null?&&?key.equals(ek))))?{
????????????????????oldVal?=?e.val;
????????????????????if?(!onlyIfAbsent)
????????????????????????e.val?=?value;
????????????????????break;
????????????????}
????????????????Node?pred?=?e;
????????????????if?((e?=?e.next)?==?null)?{
????????????????????pred.next?=?new?Node(hash,?key,
??????????????????????????????????????????????value,?null);
????????????????????break;
????????????????}
????????????}
????????}
????????else?if?(f?instanceof?TreeBin)?{
????????????Node?p;
????????????binCount?=?2;
????????????if?((p?=?((TreeBin)f).putTreeVal(hash,?key,
???????????????????????????????????????????value))?!=?null)?{
????????????????oldVal?=?p.val;
????????????????if?(!onlyIfAbsent)
????????????????????p.val?=?value;
????????????}
????????}
????}
}
1)插入之前,再次調(diào)用 tabAt(tab, i) == f 來判斷 f 是否被其他線程修改。
2)如果 fh(f 的哈希值) >= 0,說明 f 是鏈表的頭節(jié)點,遍歷鏈表,找到對應的 Node,更新值,否則插入到末尾。
3)如果 f 是紅黑樹,則按照紅黑樹的方式插入或者更新節(jié)點。
分析完 put() 方法后,再來看 get() 方法:
public?V?get(Object?key)?{
????ConcurrentHashMap.Node[]?tab;?ConcurrentHashMap.Node?e,?p;?int?n,?eh;?K?ek;
????int?h?=?spread(key.hashCode());
????if?((tab?=?table)?!=?null?&&?(n?=?tab.length)?>?0?&&
????????????(e?=?tabAt(tab,?(n?-?1)?&?h))?!=?null)?{
????????if?((eh?=?e.hash)?==?h)?{
????????????if?((ek?=?e.key)?==?key?||?(ek?!=?null?&&?key.equals(ek)))
????????????????return?e.val;
????????}
????????else?if?(eh?0)
????????????return?(p?=?e.find(h,?key))?!=?null???p.val?:?null;
????????while?((e?=?e.next)?!=?null)?{
????????????if?(e.hash?==?h?&&
????????????????????((ek?=?e.key)?==?key?||?(ek?!=?null?&&?key.equals(ek))))
????????????????return?e.val;
????????}
????}
????return?null;
}
是不是簡單很多?
1)如果哈希值相等((eh = e.hash) == h),直接返回 table 數(shù)組中的元素。
2)如果是紅黑樹(eh < 0),按照紅黑樹的方式 find 返回。
3)如果是鏈表,進行遍歷,然后根據(jù) key 獲取 value。
最后,來寫一個 ConcurrentHashMap 的應用實例吧!
/**
?*?@author?沉默王二,一枚有趣的程序員
?*/
public?class?ConcurrentHashMapDemo?{
????public?final?static?int?THREAD_POOL_SIZE?=?5;
????public?static?void?main(String[]?args)?throws?InterruptedException?{
????????Map?map?=?new?ConcurrentHashMap<>();
????????long?startTime?=?System.nanoTime();
????????ExecutorService?crunchifyExServer?=?Executors.newFixedThreadPool(THREAD_POOL_SIZE);
????????for?(int?j?=?0;?j?????????????crunchifyExServer.execute(new?Runnable()?{
????????????????@SuppressWarnings("unused")
????????????????@Override
????????????????public?void?run()?{
????????????????????for?(int?i?=?0;?i?500000;?i++)?{
????????????????????????map.put("itwanger"+i,?"沉默王二");
????????????????????}
????????????????}
????????????});
????????}
????????crunchifyExServer.shutdown();
????????crunchifyExServer.awaitTermination(Long.MAX_VALUE,?TimeUnit.DAYS);
????????long?entTime?=?System.nanoTime();
????????long?totalTime?=?(entTime?-?startTime)?/?1000000L;
????????System.out.println(totalTime?+?"ms");
????}
}
給同學們留一道作業(yè)題,感興趣的話可以嘗試下,把 ConcurrentHashMap 換成 SynchronizedMap,比較一下兩者性能上的差異,差距還是挺明顯的。
公眾號:沉默王二(ID:cmower)
CSDN:沉默王二
bilibili:沉默王二
知乎:沉默王二
這是一個有顏值卻假裝靠才華茍且的程序員,你知道,他的文章風趣幽默,讀起來就好像花錢一樣爽快。
長按下圖二維碼關注,你將感受到一個有趣的靈魂,且每篇文章都有干貨。
------------------
原創(chuàng)不易,莫要白票,如果覺得有點用的話,請毫不留情地素質三連吧,分享、點贊、在看,我不挑,因為這將是我寫作更多優(yōu)質文章的最強動力。
