ConcurrentHashMap源碼分析

預(yù)備知識
安全失?。╢ail safe)和快速失?。╢ail quick)
快速失敗
現(xiàn)象:在用迭代器遍歷一個集合對象時,如果遍歷過程中對集合對象的內(nèi)容進(jìn)行了增加、刪除、修改操作,則會拋出ConcurrentModificationException。
原理:迭代器在遍歷時直接訪問集合中的內(nèi)容,并且在遍歷過程中使用一個 modCount 變量。集合在被遍歷期間如果內(nèi)容發(fā)生變化,就會改變modCount的值。每當(dāng)?shù)魇褂胔ashNext()/next()遍歷下一個元素之前,都會檢測modCount變量是否為expectedmodCount值,是的話就返回遍歷;否則拋出ConcurrentModificationException異常,終止遍歷。
安全失敗
現(xiàn)象:采用失敗安全機(jī)制的集合容器,在遍歷時不是直接在集合內(nèi)容上訪問的,而是先復(fù)制原有集合內(nèi)容,在拷貝的集合上進(jìn)行遍歷。
原理:由于迭代時是對原集合的拷貝進(jìn)行遍歷,所以在遍歷過程中對原集合所作的修改并不能被迭代器檢測到,所以不會觸發(fā)ConcurrentModificationException。
正是由于安全失敗機(jī)制,使得HashTable無法防止null的元素,因為hashTable是線程安全的,當(dāng)遍歷元素的時候無法判斷當(dāng)前table中是否擁有元素,是不存在元素還是元素為null。
分析思路
hashmap實現(xiàn)線程安全的方式介紹
并發(fā)容器是如何實現(xiàn)線程安全的(底層的介紹)
hashmap和concurrenthashmap進(jìn)行對比
如何實現(xiàn)實現(xiàn)線程安全
hashtable
colletions.synchronizedMap()方法
ConcurrentHashMap
其中上面兩種是效率比較低,原因是使用粗粒度鎖,所以在高并發(fā)環(huán)境下效率沒有第三種高。
其中hashTable中使用了大量對Synchronized關(guān)鍵字保證線程安全。而Collections的方法則是將傳入的形參作為一個參數(shù),內(nèi)部擁有一個mutex對象作為加鎖對象。進(jìn)行synchronized關(guān)鍵字的加鎖。
ConcurrentHashmap實現(xiàn)線程安全的方法
主要看put和get擴(kuò)容以及size方法
put方法流程
如果數(shù)組為空,初始化,初始化完成之后,走 2;
計算當(dāng)前槽點(diǎn)有沒有值,沒有值的話,cas 創(chuàng)建,失敗繼續(xù)自旋(for 死循環(huán)),直到成功, 槽點(diǎn)有值的話,走 3;
如果槽點(diǎn)是轉(zhuǎn)移節(jié)點(diǎn)(正在擴(kuò)容),就會一直自旋等待擴(kuò)容完成之后再新增,不是轉(zhuǎn)移節(jié)點(diǎn)走 4;
槽點(diǎn)有值的,先鎖定當(dāng)前槽點(diǎn),保證其余線程不能操作,如果是鏈表,新增值到鏈表的尾 部,如果是紅黑樹,使用紅黑樹新增的方法新增;
新增完成之后 check 需不需要擴(kuò)容,需要的話去擴(kuò)容。
使用的流程主要是自旋+cas+鎖來保證put方法為安全的。
其中方法1中初始化方法保證多個線程中只有一個線程進(jìn)行初始化的主要操作是通過一個cas操作來保證只有一個線程能夠初始化 ,當(dāng)一個線程獲取到這個sizeCtl后就將sizeCtl值設(shè)置為-1。在不斷的while循環(huán)中若讀取到sizeCtl的值為-1則調(diào)用yield()方法后,釋放cpu再次進(jìn)入自旋的過程。
private?final Node[] initTable() {
????????Node[] tab; int?sc;
????????while?((tab = table) == null?|| tab.length == 0) {
????????????if?((sc = sizeCtl) < 0)
????????????????Thread.yield(); // lost initialization race; just spin
????????????else?if?(U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
????????????????try?{
????????????????????if?((tab = table) == null?|| tab.length == 0) {
????????????????????????int?n = (sc > 0) ? sc : DEFAULT_CAPACITY;
????????????????????????@SuppressWarnings("unchecked")
????????????????????????Node[] nt = (Node[])new?Node,?>[n];
????????????????????????table = tab = nt;
????????????????????????sc = n - (n >>> 2);
????????????????????}
????????????????} finally?{
????????????????????sizeCtl = sc;
????????????????}
????????????????break;
????????????}
????????}
????????return?tab;
????} 第三步:若當(dāng)前的槽點(diǎn)為空的時候為了安全會進(jìn)行cas進(jìn)行添加元素,而不是直接添加安全性能更好。
第四步:若當(dāng)前數(shù)組正在擴(kuò)容就進(jìn)行獲取到當(dāng)前最新的數(shù)組列表然后再次進(jìn)入自旋。
第五步:對當(dāng)前發(fā)生hash沖突的首節(jié)點(diǎn)進(jìn)行上鎖操作,這邊的鎖的粒度比jdk1.7中更加細(xì),所以并發(fā)效果更加好
get方法原理:
在一次qq聊天討論中,我們討論為什么hashtable的get方法是使用syn關(guān)鍵字進(jìn)行加鎖而cchm(concurrenthashMap)get方法卻沒有添加鎖等關(guān)鍵字 這個是為什么?是否會出現(xiàn)數(shù)據(jù)的臟讀等問題?
get操作源碼
首先計算hash值,定位到該table索引位置,如果是首節(jié)點(diǎn)符合就返回
如果遇到擴(kuò)容的時候,會調(diào)用標(biāo)志正在擴(kuò)容節(jié)點(diǎn)ForwardingNode的find方法,查找該節(jié)點(diǎn),匹配就返回
以上都不符合的話,就往下遍歷節(jié)點(diǎn),匹配就返回,否則最后就返回null
//會發(fā)現(xiàn)源碼中沒有一處加了鎖
public?V get(Object key) {
????Node[] tab; Node e, p; int?n, eh; K ek;
????int?h = spread(key.hashCode()); //計算hash
????if?((tab = table) != null?&& (n = tab.length) > 0?&&
????????(e = tabAt(tab, (n - 1) & h)) != null) {//讀取首節(jié)點(diǎn)的Node元素
????????if?((eh = e.hash) == h) { //如果該節(jié)點(diǎn)就是首節(jié)點(diǎn)就返回
????????????if?((ek = e.key) == key || (ek != null?&& key.equals(ek)))
????????????????return?e.val;
????????}
????????//hash值為負(fù)值表示正在擴(kuò)容,這個時候查的是ForwardingNode的find方法來定位到nextTable來
????????//eh=-1,說明該節(jié)點(diǎn)是一個ForwardingNode,正在遷移,此時調(diào)用ForwardingNode的find方法去nextTable里找。
????????//eh=-2,說明該節(jié)點(diǎn)是一個TreeBin,此時調(diào)用TreeBin的find方法遍歷紅黑樹,由于紅黑樹有可能正在旋轉(zhuǎn)變色,所以find里會有讀寫鎖。
????????//eh>=0,說明該節(jié)點(diǎn)下掛的是一個鏈表,直接遍歷該鏈表即可。
????????else?if?(eh < 0)
????????????return?(p = e.find(h, key)) != null?? p.val : null;
????????while?((e = e.next) != null) {//既不是首節(jié)點(diǎn)也不是ForwardingNode,那就往下遍歷
????????????if?(e.hash == h &&
????????????????((ek = e.key) == key || (ek != null?&& key.equals(ek))))
????????????????return?e.val;
????????}
????}
????return?null;
} get沒有加鎖的話,ConcurrentHashMap是如何保證讀到的數(shù)據(jù)不是臟數(shù)據(jù)的呢?
對于可見性,Java提供了volatile關(guān)鍵字來保證可見性、有序性。但不保證原子性。普通的共享變量不能保證可見性,因為普通共享變量被修改之后,什么時候被寫入主存是不確定的,當(dāng)其他線程去讀取時,此時內(nèi)存中可能還是原來的舊值,因此無法保證可見性。
volatile關(guān)鍵字對于基本類型的修改可以在隨后對多個線程的讀保持一致,但是對于引用類型如數(shù)組,實體bean,僅僅保證引用的可見性,但并不保證引用內(nèi)容的可見性。
禁止進(jìn)行指令重排序。
背景:為了提高處理速度,處理器不直接和內(nèi)存進(jìn)行通信,而是先將系統(tǒng)內(nèi)存的數(shù)據(jù)讀到內(nèi)部緩存(L1,L2或其他)后再進(jìn)行操作,但操作完不知道何時會寫到內(nèi)存。
如果對聲明了volatile的變量進(jìn)行寫操作,JVM就會向處理器發(fā)送一條指令,將這個變量所在緩存行的數(shù)據(jù)寫回到系統(tǒng)內(nèi)存。但是,就算寫回到內(nèi)存,如果其他處理器緩存的值還是舊的,再執(zhí)行計算操作就會有問題。
在多處理器下,為了保證各個處理器的緩存是一致的,就會實現(xiàn)緩存一致性協(xié)議,當(dāng)某個CPU在寫數(shù)據(jù)時,如果發(fā)現(xiàn)操作的變量是共享變量,則會通知其他CPU告知該變量的緩存行是無效的,因此其他CPU在讀取該變量時,發(fā)現(xiàn)其無效會重新從主存中加載數(shù)據(jù)。

總結(jié)下來:
第一:使用volatile關(guān)鍵字會強(qiáng)制將修改的值立即寫入主存;
第二:使用volatile關(guān)鍵字的話,當(dāng)線程2進(jìn)行修改時,會導(dǎo)致線程1的工作內(nèi)存中緩存變量的緩存行無效(反映到硬件層的話,就是CPU的L1或者L2緩存中對應(yīng)的緩存行無效);
第三:由于線程1的工作內(nèi)存中緩存變量的緩存行無效,所以線程1再次讀取變量的值時會去主存讀取。
是加在數(shù)組上的volatile嗎?
/**
?????* The array of bins. Lazily initialized upon first insertion.
?????* Size is always a power of two. Accessed directly by iterators.
?????*/
????transient volatile Node[]?table; 我們知道volatile可以修飾數(shù)組的,只是意思和它表面上看起來的樣子不同。舉個栗子,volatile int array[10]是指array的地址是volatile的而不是數(shù)組元素的值是volatile的.
用volatile修飾node
get操作可以無鎖是由于Node的元素val和指針next是用volatile修飾的,在多線程環(huán)境下線程A修改結(jié)點(diǎn)的val或者新增節(jié)點(diǎn)的時候是對線程B可見的。
static?class?Node<K,V> implements?Map.Entry<K,V> {
????final?int?hash;
????final?K key;
????//可以看到這些都用了volatile修飾
????volatile?V val;
????volatile?Node next;
????Node(int?hash, K key, V val, Node next) {
????????this.hash = hash;
????????this.key = key;
????????this.val = val;
????????this.next = next;
????}
????public?final?K getKey()???????{ return?key; }
????public?final?V getValue()?????{ return?val; }
????public?final?int?hashCode()???{ return?key.hashCode() ^ val.hashCode(); }
????public?final?String toString(){ return?key + "="?+ val; }
????public?final?V setValue(V value)?{
????????throw?new?UnsupportedOperationException();
????}
????public?final?boolean?equals(Object o)?{
????????Object k, v, u; Map.Entry,?> e;
????????return?((o instanceof?Map.Entry) &&
????????????????(k = (e = (Map.Entry,?>)o).getKey()) != null?&&
????????????????(v = e.getValue()) != null?&&
????????????????(k == key || k.equals(key)) &&
????????????????(v == (u = val) || v.equals(u)));
????}
????/**
?????* Virtualized support for map.get(); overridden in subclasses.
?????*/
????Node find(int?h, Object k)? {
????????Node e = this;
????????if?(k != null) {
????????????do?{
????????????????K ek;
????????????????if?(e.hash == h &&
????????????????????((ek = e.key) == k || (ek != null?&& k.equals(ek))))
????????????????????return?e;
????????????} while?((e = e.next) != null);
????????}
????????return?null;
????}
} 既然volatile修飾數(shù)組對get操作沒有效果那加在數(shù)組上的volatile的目的是什么呢?
其實就是為了使得Node數(shù)組在擴(kuò)容的時候?qū)ζ渌€程具有可見性而加的volatile
ConcurrentHashMap 1.7和1.8進(jìn)行比較
| 區(qū)別 | 1.7 | 1.8 |
|---|---|---|
| 底層實現(xiàn) | Segment數(shù)組+hashEntry數(shù)組實現(xiàn) | 數(shù)組+鏈表+紅黑樹實現(xiàn) |
| 如何實現(xiàn)線程安全 | Segment保證 | cas+Synchronized |
| 重要方法 | put size | put size |
底層數(shù)據(jù)結(jié)構(gòu)
jdk1.7

其中Segment數(shù)組容量初始化為16,hashEntry數(shù)組的大小為2,兩者擴(kuò)容大小都為2的冪次。
put方法
1.7中put方法主要流程為:
1、線程A執(zhí)行tryLock()方法成功獲取鎖,則把HashEntry對象插入到相應(yīng)的位置;
2、線程B獲取鎖失敗,則執(zhí)行scanAndLockForPut()方法,在scanAndLockForPut方法中,會通過重復(fù)執(zhí)行tryLock()方法嘗試獲取鎖,在多處理器環(huán)境下,重復(fù)次數(shù)為64,單處理器重復(fù)次數(shù)為1,當(dāng)執(zhí)行tryLock()方法的次數(shù)超過上限時,則執(zhí)行l(wèi)ock()方法掛起線程B;
3、當(dāng)線程A執(zhí)行完插入操作時,會通過unlock()方法釋放鎖,接著喚醒線程B繼續(xù)執(zhí)行;
多次獲取鎖的過程有點(diǎn)類似自旋。
size方法主要流程為:
先采用不加鎖的方式,連續(xù)計算元素的個數(shù),最多計算3次:
1、如果前后兩次計算結(jié)果相同,則說明計算出來的元素個數(shù)是準(zhǔn)確的;
2、如果前后兩次計算結(jié)果都不同,則給每個Segment進(jìn)行加鎖,再計算一次元素的個數(shù);
jdk1.8

put方法
若插入位置元素為null 則通過cas進(jìn)行插入元素
若插入元素不為空則對頭節(jié)點(diǎn)元素進(jìn)行加鎖操作,進(jìn)行插入元素操作
第二步需要判斷頭結(jié)點(diǎn)類型若為鏈表使用鏈表方式進(jìn)行插入否則使用樹節(jié)點(diǎn)形式進(jìn)行插入
size方法
實現(xiàn)原理是通過baseCount和cell數(shù)組來計數(shù)保證數(shù)組元素的大小不會改變
初始化時counterCells為空,在并發(fā)量很高時,如果存在兩個線程同時執(zhí)行CAS修改baseCount值,則失敗的線程會繼續(xù)執(zhí)行方法體中的邏輯,使用CounterCell記錄元素個數(shù)的變化;
如果CounterCell數(shù)組counterCells為空,調(diào)用fullAddCount()方法進(jìn)行初始化,并插入對應(yīng)的記錄數(shù),通過CAS設(shè)置cellsBusy字段,只有設(shè)置成功的線程才能初始化CounterCell數(shù)組。
如果通過CAS設(shè)置cellsBusy字段失敗的話,則繼續(xù)嘗試通過CAS修改baseCount字段,如果修改baseCount字段成功的話,就退出循環(huán),否則繼續(xù)循環(huán)插入CounterCell對象;
簡單總結(jié)就是如果多個線程如果同時要修改元素的數(shù)量,一個元素成功修改basecount的同時,另一個元素會進(jìn)行修改countCell數(shù)組內(nèi)元素的大小,若多個線程同時修改則啟動cas。
總結(jié)
在1.7中多線程的并發(fā)修改容量跟segment數(shù)組的容量大小有關(guān),而在1.8中進(jìn)行修改后,鎖的粒度更加小,使得多線程環(huán)境下效率更高。同時Synchronized鎖在之后也進(jìn)行了升級,由無鎖到偏向所再到輕量級鎖到最后的重量級鎖。這樣一個不同的過程。
作者:Deciscive
鏈接:juejin.im/post/6844904146462572557
