有比 ReadWriteLock更快的鎖?
點(diǎn)擊上方藍(lán)色“程序猿DD”,選擇“設(shè)為星標(biāo)”
回復(fù)“資源”獲取獨(dú)家整理的學(xué)習(xí)資料!

作者 |?Ressmix
點(diǎn)擊贈(zèng)書:聊聊「分布式架構(gòu)」那些事兒
一、StampedLock類簡(jiǎn)介
在?搞定ReentrantReadWriteLock 幾道小小數(shù)學(xué)題就夠了,我們?cè)敿?xì)的介紹了RWL,但 Doug Lea 覺(jué)得不夠好。StampedLock類,在JDK1.8時(shí)引入,是對(duì)讀寫鎖ReentrantReadWriteLock的增強(qiáng),該類提供了一些功能,優(yōu)化了讀鎖、寫鎖的訪問(wèn),同時(shí)使讀寫鎖之間可以互相轉(zhuǎn)換,更細(xì)粒度控制并發(fā)。
首先明確下,該類的設(shè)計(jì)初衷是作為一個(gè)內(nèi)部工具類,用于輔助開(kāi)發(fā)其它線程安全組件,用得好,該類可以提升系統(tǒng)性能,用不好,容易產(chǎn)生死鎖和其它莫名其妙的問(wèn)題,算是一把“雙刃劍”
1.1 StampedLock的引入
先來(lái)看下,為什么有了ReentrantReadWriteLock,還要引入StampedLock?
ReentrantReadWriteLock使得多個(gè)讀線程同時(shí)持有讀鎖(只要寫鎖未被占用),而寫鎖是獨(dú)占的。
但是,讀寫鎖如果使用不當(dāng),很容易產(chǎn)生“饑餓”問(wèn)題:
比如在讀線程非常多,寫線程很少的情況下,很容易導(dǎo)致寫線程“饑餓”,雖然使用“公平”策略可以一定程度上緩解這個(gè)問(wèn)題,但是“公平”策略是以犧牲系統(tǒng)吞吐量為代價(jià)的。(在ReentrantLock類的介紹章節(jié)中,介紹過(guò)這種情況)
1.2 StampedLock的特點(diǎn)
StampedLock的主要特點(diǎn)概括一下,有以下幾點(diǎn):
所有獲取鎖的方法,都返回一個(gè)郵戳(Stamp),Stamp為0表示獲取失敗,其余都表示成功; 所有釋放鎖的方法,都需要一個(gè)郵戳(Stamp),這個(gè)Stamp必須是和成功獲取鎖時(shí)得到的Stamp一致; StampedLock是不可重入的;(如果一個(gè)線程已經(jīng)持有了寫鎖,再去獲取寫鎖的話就會(huì)造成死鎖) StampedLock有三種訪問(wèn)模式:①Reading(讀模式):功能和ReentrantReadWriteLock的讀鎖類似;②Writing(寫模式):功能和ReentrantReadWriteLock的寫鎖類似;③Optimistic reading(樂(lè)觀讀模式):這是一種優(yōu)化的讀模式。 StampedLock支持讀鎖和寫鎖的相互轉(zhuǎn)換;我們知道RRW中,當(dāng)線程獲取到寫鎖后,可以降級(jí)為讀鎖,但是讀鎖是不能直接升級(jí)為寫鎖的。StampedLock提供了讀鎖和寫鎖相互轉(zhuǎn)換的功能,使得該類支持更多的應(yīng)用場(chǎng)景。 無(wú)論寫鎖還是讀鎖,都不支持Conditon等待。
我們知道,在ReentrantReadWriteLock中,當(dāng)讀鎖被使用時(shí),如果有線程嘗試獲取寫鎖,該寫線程會(huì)阻塞。但是,在Optimistic reading中,即使讀線程獲取到了讀鎖,寫線程嘗試獲取寫鎖也不會(huì)阻塞,這相當(dāng)于對(duì)讀模式的優(yōu)化,但是可能會(huì)導(dǎo)致數(shù)據(jù)不一致的問(wèn)題。所以,當(dāng)使用Optimistic reading獲取到讀鎖時(shí),必須對(duì)獲取結(jié)果進(jìn)行校驗(yàn)。
二、StampedLock使用示例
先來(lái)看一個(gè)Oracle官方的例子:
class?Point?{
????private?double?x,?y;
????private?final?StampedLock?sl?=?new?StampedLock();
????void?move(double?deltaX,?double?deltaY)?{
????????long?stamp?=?sl.writeLock();????//涉及對(duì)共享資源的修改,使用寫鎖-獨(dú)占操作
????????try?{
????????????x?+=?deltaX;
????????????y?+=?deltaY;
????????}?finally?{
????????????sl.unlockWrite(stamp);
????????}
????}
????/**
?????*?使用樂(lè)觀讀鎖訪問(wèn)共享資源
?????*?注意:樂(lè)觀讀鎖在保證數(shù)據(jù)一致性上需要拷貝一份要操作的變量到方法棧,并且在操作數(shù)據(jù)時(shí)候可能其他寫線程已經(jīng)修改了數(shù)據(jù),
?????*?而我們操作的是方法棧里面的數(shù)據(jù),也就是一個(gè)快照,所以最多返回的不是最新的數(shù)據(jù),但是一致性還是得到保障的。
?????*
?????*?@return
?????*/
????double?distanceFromOrigin()?{
????????long?stamp?=?sl.tryOptimisticRead();????//?使用樂(lè)觀讀鎖
????????double?currentX?=?x,?currentY?=?y;??????//?拷貝共享資源到本地方法棧中
????????if?(!sl.validate(stamp))?{??????????????//?如果有寫鎖被占用,可能造成數(shù)據(jù)不一致,所以要切換到普通讀鎖模式
????????????stamp?=?sl.readLock();?????????????
????????????try?{
????????????????currentX?=?x;
????????????????currentY?=?y;
????????????}?finally?{
????????????????sl.unlockRead(stamp);
????????????}
????????}
????????return?Math.sqrt(currentX?*?currentX?+?currentY?*?currentY);
????}
????void?moveIfAtOrigin(double?newX,?double?newY)?{?//?upgrade
????????//?Could?instead?start?with?optimistic,?not?read?mode
????????long?stamp?=?sl.readLock();
????????try?{
????????????while?(x?==?0.0?&&?y?==?0.0)?{
????????????????long?ws?=?sl.tryConvertToWriteLock(stamp);??//讀鎖轉(zhuǎn)換為寫鎖
????????????????if?(ws?!=?0L)?{
????????????????????stamp?=?ws;
????????????????????x?=?newX;
????????????????????y?=?newY;
????????????????????break;
????????????????}?else?{
????????????????????sl.unlockRead(stamp);
????????????????????stamp?=?sl.writeLock();
????????????????}
????????????}
????????}?finally?{
????????????sl.unlock(stamp);
????????}
????}
}
可以看到,上述示例最特殊的其實(shí)是distanceFromOrigin方法,這個(gè)方法中使用了“Optimistic reading”樂(lè)觀讀鎖,使得讀寫可以并發(fā)執(zhí)行,但是“Optimistic reading”的使用必須遵循以下模式:
long?stamp?=?lock.tryOptimisticRead();??//?非阻塞獲取版本信息
copyVaraibale2ThreadMemory();???????????//?拷貝變量到線程本地堆棧
if(!lock.validate(stamp)){??????????????//?校驗(yàn)
????long?stamp?=?lock.readLock();???????//?獲取讀鎖
????try?{
????????copyVaraibale2ThreadMemory();???//?拷貝變量到線程本地堆棧
?????}?finally?{
???????lock.unlock(stamp);??????????????//?釋放悲觀鎖
????}
}
useThreadMemoryVarables();??????????????//?使用線程本地堆棧里面的數(shù)據(jù)進(jìn)行操作
三、StampedLock原理
3.1 StampedLock的內(nèi)部常量
StampedLock雖然不像其它鎖一樣定義了內(nèi)部類來(lái)實(shí)現(xiàn)AQS框架,但是StampedLock的基本實(shí)現(xiàn)思路還是利用CLH隊(duì)列進(jìn)行線程的管理,通過(guò)同步狀態(tài)值來(lái)表示鎖的狀態(tài)和類型。
StampedLock內(nèi)部定義了很多常量,定義這些常量的根本目的還是和ReentrantReadWriteLock一樣,對(duì)同步狀態(tài)值按位切分,以通過(guò)位運(yùn)算對(duì)State進(jìn)行操作:
對(duì)于StampedLock來(lái)說(shuō),寫鎖被占用的標(biāo)志是第8位為1,讀鎖使用0-7位,正常情況下讀鎖數(shù)目為1-126,超過(guò)126時(shí),使用一個(gè)名為
readerOverflow的int整型保存超出數(shù)。
//?用于計(jì)算state值的位常量
private?static?final?int?LG_READERS?=?7;
private?static?final?long?RUNIT?=?1L;?//?一單位讀鎖???????0000?0001
private?static?final?long?WBIT??=?1L?<//?寫鎖標(biāo)志位??1000?0000
private?static?final?long?RBITS?=?WBIT?-?1L;?//?讀狀態(tài)標(biāo)志?0111?1111
private?static?final?long?RFULL?=?RBITS?-?1L;?//?讀鎖的最大數(shù)量?0111?1110
private?static?final?long?ABITS?=?RBITS?|?WBIT;?//?用于獲取讀寫狀態(tài)?1111?1111
private?static?final?long?SBITS?=?~RBITS;?//?1111...1000?0000
/**?
*?初始state值
*/
private?static?final?long?ORIGIN?=?WBIT?<1;
/**?
*?同步狀態(tài)state,處于寫鎖使用第8位(為1表示占用),讀鎖使用前7位(為1~126,附加的readerOverflow用于當(dāng)讀鎖超過(guò)126時(shí))
*/
private?transient?volatile?long?state;
/**?
*?因?yàn)樽x鎖只使用了前7位,所以當(dāng)超過(guò)對(duì)應(yīng)數(shù)值之后需要使用一個(gè)int型保存?
*/
private?transient?int?readerOverflow;
部分常量的比特位表示如下:
另外,StampedLock相比ReentrantReadWriteLock,對(duì)多核CPU進(jìn)行了優(yōu)化,可以看到,當(dāng)CPU核數(shù)超過(guò)1時(shí),會(huì)有一些自旋操作:
/**
*?CPU核數(shù),用于控制自旋次數(shù)
*/
private?static?final?int?NCPU?=?Runtime.getRuntime().availableProcessors();
/**?
*?嘗試獲取鎖時(shí),如果超過(guò)該值仍未獲取到鎖,則進(jìn)入等待隊(duì)列
*/
private?static?final?int?SPINS?=?(NCPU?>?1)???1?<6?:?0;
/**?
*?等待隊(duì)列的首節(jié)點(diǎn),自旋獲取鎖失敗時(shí)會(huì),會(huì)繼續(xù)阻塞
*/
private?static?final?int?HEAD_SPINS?=?(NCPU?>?1)???1?<10?:?0;
/**?
*?再次進(jìn)入阻塞之前的最大重試次數(shù)
*/
private?static?final?int?MAX_HEAD_SPINS?=?(NCPU?>?1)???1?<16?:?0;
3.2 示例分析
假設(shè)現(xiàn)在有三個(gè)線程:ThreadA、ThreadB、ThreadC、ThreadD。操作如下:
//?ThreadA調(diào)用writeLock,?獲取寫鎖
//?ThreadB調(diào)用readLock,?獲取讀鎖
//?ThreadC調(diào)用readLock,?獲取讀鎖
//?ThreadD調(diào)用writeLock,?獲取寫鎖
//?ThreadE調(diào)用readLock,?獲取讀鎖
1. StampedLock對(duì)象的創(chuàng)建
StampedLock的構(gòu)造器很簡(jiǎn)單,構(gòu)造時(shí)設(shè)置下同步狀態(tài)值:
/**
*?Creates?a?new?lock,?initially?in?unlocked?state.
*/
public?StampedLock()?{
????state?=?ORIGIN;
}
另外,StamedLock提供了三類視圖:
//?views
transient?ReadLockView?readLockView;
transient?WriteLockView?writeLockView;
transient?ReadWriteLockView?readWriteLockView;
這些視圖其實(shí)是對(duì)StampedLock方法的封裝,便于習(xí)慣了ReentrantReadWriteLock的用戶使用:例如,ReadLockView其實(shí)相當(dāng)于ReentrantReadWriteLock.readLock()返回的讀鎖;
final?class?ReadLockView?implements?Lock?{
????public?void?lock()?{?readLock();?}
????public?void?lockInterruptibly()?throws?InterruptedException?{
????????readLockInterruptibly();
????}
????public?boolean?tryLock()?{?return?tryReadLock()?!=?0L;?}
????public?boolean?tryLock(long?time,?TimeUnit?unit)
????????throws?InterruptedException?{
????????return?tryReadLock(time,?unit)?!=?0L;
????}
????public?void?unlock()?{?unstampedUnlockRead();?}
????public?Condition?newCondition()?{
????????throw?new?UnsupportedOperationException();
????}
}
2. ThreadA調(diào)用writeLock獲取寫鎖
來(lái)看下writeLock方法:
/**
?*?獲取寫鎖,如果獲取失敗則進(jìn)入阻塞
?*?注意該方法不響應(yīng)中斷
?*
?*?@return?返回一個(gè)非0的值表示成功,用于解鎖或者轉(zhuǎn)換鎖模式
?*/
public?long?writeLock()?{
????long?s,?next;??
????return?((((s?=?state)?&?ABITS)?==?0L?&&?//?((s?=?state)?&?ABITS)?==?0L表示讀鎖和寫鎖都未被使用
?????????????U.compareAndSwapLong(this,?STATE,?s,?next?=?s?+?WBIT))???//?CAS將第8位置為1,表示寫鎖被占用
????????????next?:?acquireWrite(false,?0L));?//?獲取失敗則調(diào)用acquireWrite,加入等待隊(duì)列
}
StampedLock中大量運(yùn)用了位運(yùn)算,這里(s = state) & ABITS == 0L?表示讀鎖和寫鎖都未被使用,這里寫鎖可以立即獲取成功,然后CAS操作更新同步狀態(tài)值State。
操作完成后,等待隊(duì)列的結(jié)構(gòu)如下:

注意:StampedLock中,等待隊(duì)列的結(jié)點(diǎn)要比AQS中簡(jiǎn)單些,僅僅三種狀態(tài)。0:初始狀態(tài)
-1:等待中
1:取消
另外,結(jié)點(diǎn)的定義中有個(gè)cowait字段,該字段指向一個(gè)棧,用于保存讀線程,這個(gè)后續(xù)會(huì)講到。
//?節(jié)點(diǎn)狀態(tài)
private?static?final?int?WAITING???=?-1;
private?static?final?int?CANCELLED?=??1;
//?節(jié)點(diǎn)類型
private?static?final?int?RMODE?=?0;
private?static?final?int?WMODE?=?1;
/**?
?*?等待隊(duì)列的節(jié)點(diǎn)定義?
?*/
static?final?class?WNode?{
????volatile?WNode?prev;
????volatile?WNode?next;
????volatile?WNode?cowait;????//?該模式使用該節(jié)點(diǎn)形成棧
????volatile?Thread?thread;???//?non-null?while?possibly?parked
????volatile?int?status;??????//?0,?WAITING,?or?CANCELLED
????final?int?mode;???????????//?RMODE?or?WMODE
????WNode(int?m,?WNode?p)?{?mode?=?m;?prev?=?p;?}
?}
/**?等待隊(duì)列頭結(jié)點(diǎn)指針?*/
private?transient?volatile?WNode?whead;
/**?等待隊(duì)列尾結(jié)點(diǎn)指針?*/
private?transient?volatile?WNode?wtail;
3. ThreadB調(diào)用readLock獲取讀鎖
來(lái)看下readLock方法:由于ThreadA此時(shí)持有寫鎖,所以ThreadB獲取讀鎖失敗,將調(diào)用acquireRead方法,加入等待隊(duì)列:
/**
?*?獲取讀鎖,如果寫鎖被占用,則阻塞
?*?注意該方法不響應(yīng)中斷
?*?@return?返回非0表示成功
?*/
public?long?readLock()?{
????long?s?=?state,?next;??
????//?隊(duì)列為空且讀鎖未超限
????return?((whead?==?wtail?&&?(s?&?ABITS)?//?(s?&?ABITS)?
???????????U.compareAndSwapLong(this,?STATE,?s,?next?=?s?+?RUNIT))?
???????????next?:?acquireRead(false,?0L));
}
acquireRead方法非常復(fù)雜,用到了大量自旋操作:
/**
?*?嘗試自旋的獲取讀鎖,?獲取不到則加入等待隊(duì)列,?并阻塞線程
?*
?*?@param?interruptible?true?表示檢測(cè)中斷,?如果線程被中斷過(guò),?則最終返回INTERRUPTED
?*?@param?deadline??????如果非0,?則表示限時(shí)獲取
?*?@return?非0表示獲取成功,?INTERRUPTED表示中途被中斷過(guò)
?*/
private?long?acquireRead(boolean?interruptible,?long?deadline)?{
????WNode?node?=?null,?p;???//?node指向入隊(duì)結(jié)點(diǎn),?p指向入隊(duì)前的隊(duì)尾結(jié)點(diǎn)
????/**
?????*?自旋入隊(duì)操作
?????*?如果寫鎖未被占用,?則立即嘗試獲取讀鎖,?獲取成功則返回.
?????*?如果寫鎖被占用,?則將當(dāng)前讀線程包裝成結(jié)點(diǎn),?并插入等待隊(duì)列(如果隊(duì)尾是寫結(jié)點(diǎn),直接鏈接到隊(duì)尾;否則,鏈接到隊(duì)尾讀結(jié)點(diǎn)的棧中)
?????*/
????for?(int?spins?=?-1;?;?)?{
????????WNode?h;
????????if?((h?=?whead)?==?(p?=?wtail))?{???//?如果隊(duì)列為空或只有頭結(jié)點(diǎn),?則會(huì)立即嘗試獲取讀鎖
????????????for?(long?m,?s,?ns;?;?)?{
????????????????if?((m?=?(s?=?state)?&?ABITS)?//?判斷寫鎖是否被占用
????????????????????U.compareAndSwapLong(this,?STATE,?s,?ns?=?s?+?RUNIT)?:??//寫鎖未占用,且讀鎖數(shù)量未超限,?則更新同步狀態(tài)
????????????????????(m?0L))????????//寫鎖未占用,但讀鎖數(shù)量超限,?超出部分放到readerOverflow字段中
????????????????????return?ns;??????????//?獲取成功后,?直接返回
????????????????else?if?(m?>=?WBIT)?{???//?寫鎖被占用,以隨機(jī)方式探測(cè)是否要退出自旋
????????????????????if?(spins?>?0)?{
????????????????????????if?(LockSupport.nextSecondarySeed()?>=?0)
????????????????????????????--spins;
????????????????????}?else?{
????????????????????????if?(spins?==?0)?{
????????????????????????????WNode?nh?=?whead,?np?=?wtail;
????????????????????????????if?((nh?==?h?&&?np?==?p)?||?(h?=?nh)?!=?(p?=?np))
????????????????????????????????break;
????????????????????????}
????????????????????????spins?=?SPINS;
????????????????????}
????????????????}
????????????}
????????}
????????if?(p?==?null)?{????????????????????????????//?p?==?null表示隊(duì)列為空,?則初始化隊(duì)列(構(gòu)造頭結(jié)點(diǎn))
????????????WNode?hd?=?new?WNode(WMODE,?null);
????????????if?(U.compareAndSwapObject(this,?WHEAD,?null,?hd))
????????????????wtail?=?hd;
????????}?else?if?(node?==?null)?{??????????????????//?將當(dāng)前線程包裝成讀結(jié)點(diǎn)
????????????node?=?new?WNode(RMODE,?p);
????????}?else?if?(h?==?p?||?p.mode?!=?RMODE)?{?????//?如果隊(duì)列只有一個(gè)頭結(jié)點(diǎn),?或隊(duì)尾結(jié)點(diǎn)不是讀結(jié)點(diǎn),?則直接將結(jié)點(diǎn)鏈接到隊(duì)尾,?鏈接完成后退出自旋
????????????if?(node.prev?!=?p)
????????????????node.prev?=?p;
????????????else?if?(U.compareAndSwapObject(this,?WTAIL,?p,?node))?{
????????????????p.next?=?node;
????????????????break;
????????????}
????????}
????????//?隊(duì)列不為空,?且隊(duì)尾是讀結(jié)點(diǎn),?則將添加當(dāng)前結(jié)點(diǎn)鏈接到隊(duì)尾結(jié)點(diǎn)的cowait鏈中(實(shí)際上構(gòu)成一個(gè)棧,?p是棧頂指針?)
????????else?if?(!U.compareAndSwapObject(p,?WCOWAIT,?node.cowait?=?p.cowait,?node))?{????//?CAS操作隊(duì)尾結(jié)點(diǎn)p的cowait字段,實(shí)際上就是頭插法插入結(jié)點(diǎn)
????????????node.cowait?=?null;
????????}?else?{
????????????for?(;?;?)?{
????????????????WNode?pp,?c;
????????????????Thread?w;
????????????????//?嘗試喚醒頭結(jié)點(diǎn)的cowait中的第一個(gè)元素,?假如是讀鎖會(huì)通過(guò)循環(huán)釋放cowait鏈
????????????????if?((h?=?whead)?!=?null?&&?(c?=?h.cowait)?!=?null?&&
????????????????????U.compareAndSwapObject(h,?WCOWAIT,?c,?c.cowait)?&&
????????????????????(w?=?c.thread)?!=?null)?//?help?release
????????????????????U.unpark(w);
????????????????if?(h?==?(pp?=?p.prev)?||?h?==?p?||?pp?==?null)?{
????????????????????long?m,?s,?ns;
????????????????????do?{
????????????????????????if?((m?=?(s?=?state)?&?ABITS)?????????????????????????????U.compareAndSwapLong(this,?STATE,?s,
????????????????????????????????ns?=?s?+?RUNIT)?:
????????????????????????????(m?????????????????????????????????(ns?=?tryIncReaderOverflow(s))?!=?0L))
????????????????????????????return?ns;
????????????????????}?while?(m?????????????????}
????????????????if?(whead?==?h?&&?p.prev?==?pp)?{
????????????????????long?time;
????????????????????if?(pp?==?null?||?h?==?p?||?p.status?>?0)?{
????????????????????????node?=?null;?//?throw?away
????????????????????????break;
????????????????????}
????????????????????if?(deadline?==?0L)
????????????????????????time?=?0L;
????????????????????else?if?((time?=?deadline?-?System.nanoTime())?<=?0L)
????????????????????????return?cancelWaiter(node,?p,?false);
????????????????????Thread?wt?=?Thread.currentThread();
????????????????????U.putObject(wt,?PARKBLOCKER,?this);
????????????????????node.thread?=?wt;
????????????????????if?((h?!=?pp?||?(state?&?ABITS)?==?WBIT)?&&?whead?==?h?&&?p.prev?==?pp)?{
????????????????????????//?寫鎖被占用,?且當(dāng)前結(jié)點(diǎn)不是隊(duì)首結(jié)點(diǎn),?則阻塞當(dāng)前線程
????????????????????????U.park(false,?time);
????????????????????}
????????????????????node.thread?=?null;
????????????????????U.putObject(wt,?PARKBLOCKER,?null);
????????????????????if?(interruptible?&&?Thread.interrupted())
????????????????????????return?cancelWaiter(node,?p,?true);
????????????????}
????????????}
????????}
????}
????for?(int?spins?=?-1;?;?)?{
????????WNode?h,?np,?pp;
????????int?ps;
????????if?((h?=?whead)?==?p)?{?????//?如果當(dāng)前線程是隊(duì)首結(jié)點(diǎn),?則嘗試獲取讀鎖
????????????if?(spins?0)
????????????????spins?=?HEAD_SPINS;
????????????else?if?(spins?????????????????spins?<<=?1;
????????????for?(int?k?=?spins;?;?)?{?//?spin?at?head
????????????????long?m,?s,?ns;
????????????????if?((m?=?(s?=?state)?&?ABITS)?//?判斷寫鎖是否被占用
????????????????????U.compareAndSwapLong(this,?STATE,?s,?ns?=?s?+?RUNIT)?:??//寫鎖未占用,且讀鎖數(shù)量未超限,?則更新同步狀態(tài)
????????????????????(m?0L))?{??????//寫鎖未占用,但讀鎖數(shù)量超限,?超出部分放到readerOverflow字段中
????????????????????//?獲取讀鎖成功,?釋放cowait鏈中的所有讀結(jié)點(diǎn)
????????????????????WNode?c;
????????????????????Thread?w;
????????????????????//?釋放頭結(jié)點(diǎn),?當(dāng)前隊(duì)首結(jié)點(diǎn)成為新的頭結(jié)點(diǎn)
????????????????????whead?=?node;
????????????????????node.prev?=?null;
????????????????????//?從棧頂開(kāi)始(node.cowait指向的結(jié)點(diǎn)),?依次喚醒所有讀結(jié)點(diǎn),?最終node.cowait==null,?node成為新的頭結(jié)點(diǎn)
????????????????????while?((c?=?node.cowait)?!=?null)?{
????????????????????????if?(U.compareAndSwapObject(node,?WCOWAIT,?c,?c.cowait)?&&?(w?=?c.thread)?!=?null)
????????????????????????????U.unpark(w);
????????????????????}
????????????????????return?ns;
????????????????}?else?if?(m?>=?WBIT?&&
????????????????????LockSupport.nextSecondarySeed()?>=?0?&&?--k?<=?0)
????????????????????break;
????????????}
????????}?else?if?(h?!=?null)?{?????//?如果頭結(jié)點(diǎn)存在cowait鏈,?則喚醒鏈中所有讀線程
????????????WNode?c;
????????????Thread?w;
????????????while?((c?=?h.cowait)?!=?null)?{
????????????????if?(U.compareAndSwapObject(h,?WCOWAIT,?c,?c.cowait)?&&
????????????????????(w?=?c.thread)?!=?null)
????????????????????U.unpark(w);
????????????}
????????}
????????if?(whead?==?h)?{
????????????if?((np?=?node.prev)?!=?p)?{
????????????????if?(np?!=?null)
????????????????????(p?=?np).next?=?node;???//?stale
????????????}?else?if?((ps?=?p.status)?==?0)????????//?將前驅(qū)結(jié)點(diǎn)的等待狀態(tài)置為WAITING,?表示之后將喚醒當(dāng)前結(jié)點(diǎn)
????????????????U.compareAndSwapInt(p,?WSTATUS,?0,?WAITING);
????????????else?if?(ps?==?CANCELLED)?{
????????????????if?((pp?=?p.prev)?!=?null)?{
????????????????????node.prev?=?pp;
????????????????????pp.next?=?node;
????????????????}
????????????}?else?{????????//?阻塞當(dāng)前讀線程
????????????????long?time;
????????????????if?(deadline?==?0L)
????????????????????time?=?0L;
????????????????else?if?((time?=?deadline?-?System.nanoTime())?<=?0L)???//限時(shí)等待超時(shí),?取消等待
????????????????????return?cancelWaiter(node,?node,?false);
????????????????Thread?wt?=?Thread.currentThread();
????????????????U.putObject(wt,?PARKBLOCKER,?this);
????????????????node.thread?=?wt;
????????????????if?(p.status?0?&&?(p?!=?h?||?(state?&?ABITS)?==?WBIT)?&&?whead?==?h?&&?node.prev?==?p)?{
????????????????????//?如果前驅(qū)的等待狀態(tài)為WAITING,?且寫鎖被占用,?則阻塞當(dāng)前調(diào)用線程
????????????????????U.park(false,?time);
????????????????}
????????????????node.thread?=?null;
????????????????U.putObject(wt,?PARKBLOCKER,?null);
????????????????if?(interruptible?&&?Thread.interrupted())
????????????????????return?cancelWaiter(node,?node,?true);
????????????}
????????}
????}
}
我們來(lái)分析下這個(gè)方法。
該方法會(huì)首先自旋的嘗試獲取讀鎖,獲取成功后,就直接返回;否則,會(huì)將當(dāng)前線程包裝成一個(gè)讀結(jié)點(diǎn),插入到等待隊(duì)列。
由于,目前等待隊(duì)列還是空,所以ThreadB會(huì)初始化隊(duì)列,然后將自身包裝成一個(gè)讀結(jié)點(diǎn),插入隊(duì)尾,然后在下面這個(gè)地方跳出自旋:

此時(shí),等待隊(duì)列的結(jié)構(gòu)如下:

跳出自旋后,ThreadB會(huì)繼續(xù)向下執(zhí)行,進(jìn)入下一個(gè)自旋,在下一個(gè)自旋中,依然會(huì)再次嘗試獲取讀鎖,如果這次再獲取不到,就會(huì)將前驅(qū)的等待狀態(tài)置為WAITING, 表示我(當(dāng)前線程)要去睡了(阻塞),到時(shí)記得叫醒我:


最終, ThreadB進(jìn)入阻塞狀態(tài):
最終,等待隊(duì)列的結(jié)構(gòu)如下:

4. ThreadC調(diào)用readLock獲取讀鎖
這個(gè)過(guò)程和ThreadB獲取讀鎖一樣,區(qū)別在于ThreadC被包裝成結(jié)點(diǎn)加入等待隊(duì)列后,是鏈接到ThreadB結(jié)點(diǎn)的棧指針中的。調(diào)用完下面這段代碼后,ThreadC會(huì)鏈接到以Thread B為棧頂指針的棧中:


注意:讀結(jié)點(diǎn)的cowait字段其實(shí)構(gòu)成了一個(gè)棧,入棧的過(guò)程其實(shí)是個(gè)“頭插法”插入單鏈表的過(guò)程。比如,再來(lái)個(gè)ThreadX讀結(jié)點(diǎn),則cowait鏈表結(jié)構(gòu)為:
ThreadB - > ThreadX -> ThreadC。最終喚醒讀結(jié)點(diǎn)時(shí),將從棧頂開(kāi)始。
然后會(huì)在下一次自旋中,阻塞當(dāng)前讀線程:
最終,等待隊(duì)列的結(jié)構(gòu)如下:

可以看到,此時(shí)ThreadC結(jié)點(diǎn)并沒(méi)有把它的前驅(qū)的等待狀態(tài)置為-1,因?yàn)門hreadC是鏈接到棧中的,當(dāng)寫鎖釋放的時(shí)候,會(huì)從棧底元素開(kāi)始,喚醒棧中所有讀結(jié)點(diǎn)。
5. ThreadD調(diào)用writeLock獲取寫鎖
ThreadD調(diào)用writeLock方法獲取寫鎖失敗后(ThreadA依然占用著寫鎖),會(huì)調(diào)用acquireWrite方法,該方法整體邏輯和acquireRead差不多,首先自旋的嘗試獲取寫鎖,獲取成功后,就直接返回;否則,會(huì)將當(dāng)前線程包裝成一個(gè)寫結(jié)點(diǎn),插入到等待隊(duì)列。
/**
?*?獲取寫鎖,如果獲取失敗則進(jìn)入阻塞
?*?注意該方法不響應(yīng)中斷
?*
?*?@return?返回一個(gè)非0的值表示成功,用于解鎖或者轉(zhuǎn)換鎖模式
?*/
public?long?writeLock()?{
????long?s,?next;??
????return?((((s?=?state)?&?ABITS)?==?0L?&&?//?((s?=?state)?&?ABITS)?==?0L表示讀鎖和寫鎖都未被使用
?????????????U.compareAndSwapLong(this,?STATE,?s,?next?=?s?+?WBIT))???//?CAS將第8位置為1,表示寫鎖被占用
????????????next?:?acquireWrite(false,?0L));?//?獲取失敗則調(diào)用acquireWrite,加入等待隊(duì)列
}
acquireWrite源碼:
/**
?*?嘗試自旋的獲取寫鎖,?獲取不到則阻塞線程
?*
?*?@param?interruptible?true?表示檢測(cè)中斷,?如果線程被中斷過(guò),?則最終返回INTERRUPTED
?*?@param?deadline??????如果非0,?則表示限時(shí)獲取
?*?@return?非0表示獲取成功,?INTERRUPTED表示中途被中斷過(guò)
?*/
private?long?acquireWrite(boolean?interruptible,?long?deadline)?{
????WNode?node?=?null,?p;
????/**
?????*?自旋入隊(duì)操作
?????*?如果沒(méi)有任何鎖被占用,?則立即嘗試獲取寫鎖,?獲取成功則返回.
?????*?如果存在鎖被使用,?則將當(dāng)前線程包裝成獨(dú)占結(jié)點(diǎn),?并插入等待隊(duì)列尾部
?????*/
????for?(int?spins?=?-1;?;?)?{
????????long?m,?s,?ns;
????????if?((m?=?(s?=?state)?&?ABITS)?==?0L)?{??????//?沒(méi)有任何鎖被占用
????????????if?(U.compareAndSwapLong(this,?STATE,?s,?ns?=?s?+?WBIT))????//?嘗試立即獲取寫鎖
????????????????return?ns;?????????????????????????????????????????????????//?獲取成功直接返回
????????}?else?if?(spins?0)
????????????spins?=?(m?==?WBIT?&&?wtail?==?whead)???SPINS?:?0;
????????else?if?(spins?>?0)?{
????????????if?(LockSupport.nextSecondarySeed()?>=?0)
????????????????--spins;
????????}?else?if?((p?=?wtail)?==?null)?{???????//?隊(duì)列為空,?則初始化隊(duì)列,?構(gòu)造隊(duì)列的頭結(jié)點(diǎn)
????????????WNode?hd?=?new?WNode(WMODE,?null);
????????????if?(U.compareAndSwapObject(this,?WHEAD,?null,?hd))
????????????????wtail?=?hd;
????????}?else?if?(node?==?null)???????????????//?將當(dāng)前線程包裝成寫結(jié)點(diǎn)
????????????node?=?new?WNode(WMODE,?p);
????????else?if?(node.prev?!=?p)
????????????node.prev?=?p;
????????else?if?(U.compareAndSwapObject(this,?WTAIL,?p,?node))?{????//?鏈接結(jié)點(diǎn)至隊(duì)尾
????????????p.next?=?node;
????????????break;
????????}
????}
????for?(int?spins?=?-1;?;?)?{
????????WNode?h,?np,?pp;
????????int?ps;
????????if?((h?=?whead)?==?p)?{?????//?如果當(dāng)前結(jié)點(diǎn)是隊(duì)首結(jié)點(diǎn),?則立即嘗試獲取寫鎖
????????????if?(spins?0)
????????????????spins?=?HEAD_SPINS;
????????????else?if?(spins?????????????????spins?<<=?1;
????????????for?(int?k?=?spins;?;?)?{?//?spin?at?head
????????????????long?s,?ns;
????????????????if?(((s?=?state)?&?ABITS)?==?0L)?{??????//?寫鎖未被占用
????????????????????if?(U.compareAndSwapLong(this,?STATE,?s,
????????????????????????ns?=?s?+?WBIT))?{???????????????//?CAS修改State:?占用寫鎖
????????????????????????//?將隊(duì)首結(jié)點(diǎn)從隊(duì)列移除
????????????????????????whead?=?node;
????????????????????????node.prev?=?null;
????????????????????????return?ns;
????????????????????}
????????????????}?else?if?(LockSupport.nextSecondarySeed()?>=?0?&&
????????????????????--k?<=?0)
????????????????????break;
????????????}
????????}?else?if?(h?!=?null)?{??//?喚醒頭結(jié)點(diǎn)的棧中的所有讀線程
????????????WNode?c;
????????????Thread?w;
????????????while?((c?=?h.cowait)?!=?null)?{
????????????????if?(U.compareAndSwapObject(h,?WCOWAIT,?c,?c.cowait)?&&?(w?=?c.thread)?!=?null)
????????????????????U.unpark(w);
????????????}
????????}
????????if?(whead?==?h)?{
????????????if?((np?=?node.prev)?!=?p)?{
????????????????if?(np?!=?null)
????????????????????(p?=?np).next?=?node;???//?stale
????????????}?else?if?((ps?=?p.status)?==?0)????????//?將當(dāng)前結(jié)點(diǎn)的前驅(qū)置為WAITING,?表示當(dāng)前結(jié)點(diǎn)會(huì)進(jìn)入阻塞,?前驅(qū)將來(lái)需要喚醒我
????????????????U.compareAndSwapInt(p,?WSTATUS,?0,?WAITING);
????????????else?if?(ps?==?CANCELLED)?{
????????????????if?((pp?=?p.prev)?!=?null)?{
????????????????????node.prev?=?pp;
????????????????????pp.next?=?node;
????????????????}
????????????}?else?{????????//?阻塞當(dāng)前調(diào)用線程
????????????????long?time;??//?0?argument?to?park?means?no?timeout
????????????????if?(deadline?==?0L)
????????????????????time?=?0L;
????????????????else?if?((time?=?deadline?-?System.nanoTime())?<=?0L)
????????????????????return?cancelWaiter(node,?node,?false);
????????????????Thread?wt?=?Thread.currentThread();
????????????????U.putObject(wt,?PARKBLOCKER,?this);
????????????????node.thread?=?wt;
????????????????if?(p.status?0?&&?(p?!=?h?||?(state?&?ABITS)?!=?0L)?&&?whead?==?h?&&?node.prev?==?p)
????????????????????U.park(false,?time);????//?emulate?LockSupport.park
????????????????node.thread?=?null;
????????????????U.putObject(wt,?PARKBLOCKER,?null);
????????????????if?(interruptible?&&?Thread.interrupted())
????????????????????return?cancelWaiter(node,?node,?true);
????????????}
????????}
????}
}
acquireWrite中的下面這個(gè)自旋操作,用于將線程包裝成寫結(jié)點(diǎn),插入隊(duì)尾:
插入完成后,隊(duì)列結(jié)構(gòu)如下:

然后,進(jìn)入下一個(gè)自旋,并在下一個(gè)自旋中阻塞ThreadD,最終隊(duì)列結(jié)構(gòu)如下:

6 . ThreadE調(diào)用readLock獲取讀鎖
同樣,由于寫鎖被ThreadA占用著,所以最終會(huì)調(diào)用acquireRead方法,在該方法的第一個(gè)自旋中,會(huì)將ThreadE加入等待隊(duì)列:

注意,由于隊(duì)尾結(jié)點(diǎn)是寫結(jié)點(diǎn),所以當(dāng)前讀結(jié)點(diǎn)會(huì)直接鏈接到隊(duì)尾;如果隊(duì)尾是讀結(jié)點(diǎn),則會(huì)鏈接到隊(duì)尾讀結(jié)點(diǎn)的cowait鏈中。
然后進(jìn)入第二個(gè)自旋,阻塞ThreadE,最終隊(duì)列結(jié)構(gòu)如下:

7. ThreadA調(diào)用unlockWrite釋放寫鎖
通過(guò)CAS操作,修改State成功后,會(huì)調(diào)用release方法喚醒等待隊(duì)列的隊(duì)首結(jié)點(diǎn):
/**
?*?如果鎖狀態(tài)傳遞的stamp匹配則釋放鎖
?*?
?*?@param?stamp?一個(gè)寫鎖操作返回的對(duì)應(yīng)stamp
?*?@throws?IllegalMonitorStateException?如果stamp和當(dāng)前狀態(tài)不匹配則拋出異常
?*/
public?void?unlockWrite(long?stamp)?{
????WNode?h;
????if?(state?!=?stamp?||?(stamp?&?WBIT)?==?0L)?//?如果stamp不匹配或者寫鎖未占用則拋出異常
????????throw?new?IllegalMonitorStateException();
????state?=?(stamp?+=?WBIT)?==?0L???ORIGIN?:?stamp;?//?正常情況下,stamp += WBIT后,第8位為0,表示寫鎖被釋放;但是溢出則置為ORIGIN
????if?((h?=?whead)?!=?null?&&?h.status?!=?0)
????????release(h);?//?喚醒等待隊(duì)列中的隊(duì)首節(jié)點(diǎn)
}
release方法非常簡(jiǎn)單,先將頭結(jié)點(diǎn)的等待狀態(tài)置為0,表示即將喚醒后繼結(jié)點(diǎn),然后立即喚醒隊(duì)首結(jié)點(diǎn):
/**
?*?喚醒等待隊(duì)列的首節(jié)點(diǎn)(即頭結(jié)點(diǎn)whead的后繼節(jié)點(diǎn))
?*?
?*?@param?h?頭結(jié)點(diǎn)
?*/
private?void?release(WNode?h)?{
????if?(h?!=?null)?{
????????WNode?q;?Thread?w;
????????U.compareAndSwapInt(h,?WSTATUS,?WAITING,?0);?//?將頭結(jié)點(diǎn)的狀態(tài)從-1置為0,表示要喚醒后繼節(jié)點(diǎn)
????????if?((q?=?h.next)?==?null?||?q.status?==?CANCELLED)?{
????????????for?(WNode?t?=?wtail;?t?!=?null?&&?t?!=?h;?t?=?t.prev)?//?從隊(duì)尾開(kāi)始查找距離頭結(jié)點(diǎn)最近的WAITING結(jié)點(diǎn)
????????????????if?(t.status?<=?0)
????????????????????q?=?t;
????????}
????????if?(q?!=?null?&&?(w?=?q.thread)?!=?null)
????????????U.unpark(w);?//?喚醒售結(jié)點(diǎn)
????}
}
此時(shí),等待隊(duì)列的結(jié)構(gòu)如下:

8. ThreadB被喚醒后繼續(xù)向下執(zhí)行
ThreadB被喚醒后,會(huì)從原阻塞處繼續(xù)向下執(zhí)行,然后開(kāi)始下一次自旋:

第二次自旋時(shí),ThreadB發(fā)現(xiàn)寫鎖未被占用,則成功獲取到讀鎖,然后從棧頂(ThreadB的cowait指針指向的結(jié)點(diǎn))開(kāi)始喚醒棧中所有線程, 最后返回:

最終,等待隊(duì)列的結(jié)構(gòu)如下:

9. ThreadC被喚醒后繼續(xù)向下執(zhí)行
ThreadC被喚醒后,繼續(xù)執(zhí)行,并進(jìn)入下一次自旋,下一次自旋時(shí),會(huì)成功獲取到讀鎖。
注意,此時(shí)ThreadB和ThreadC已經(jīng)拿到了讀鎖,ThreadD(寫線程)和ThreadE(讀線程)依然阻塞中,原來(lái)ThreadC對(duì)應(yīng)的結(jié)點(diǎn)是個(gè)孤立結(jié)點(diǎn),會(huì)被GC回收。
最終,等待隊(duì)列的結(jié)構(gòu)如下:

10. ThreadB和ThreadC釋放讀鎖
ThreadB和ThreadC調(diào)用unlockRead方法釋放讀鎖,CAS操作State將讀鎖數(shù)量減1:
/**
?*?如果stamp匹配,則釋放鎖
?????*
?*?@param?stamp?一次readLock返回的stamp
?*?@throws?IllegalMonitorStateException?如果stamp和鎖當(dāng)前狀態(tài)不匹配則拋出異常
?*/
public?void?unlockRead(long?stamp)?{
????long?s,?m;?WNode?h;
????for?(;;)?{
????????if?(((s?=?state)?&?SBITS)?!=?(stamp?&?SBITS)?||
????????????(stamp?&?ABITS)?==?0L?||?(m?=?s?&?ABITS)?==?0L?||?m?==?WBIT)
????????????//?stamp不匹配或沒(méi)有任何鎖被占用時(shí)均拋出異常
????????????throw?new?IllegalMonitorStateException();
????????if?(m?//?讀鎖數(shù)量未超限
????????????if?(U.compareAndSwapLong(this,?STATE,?s,?s?-?RUNIT))?{?//?讀鎖數(shù)量-1
????????????????if?(m?==?RUNIT?&&?(h?=?whead)?!=?null?&&?h.status?!=?0)
????????????????????release(h);
????????????????//?如果當(dāng)前讀鎖數(shù)量為0,則喚醒隊(duì)列首結(jié)點(diǎn)
????????????????break;
????????????}
????????}
????????else?if?(tryDecReaderOverflow(s)?!=?0L)?//?讀鎖數(shù)量超限,則溢出字段-1
????????????break;
????}
}
注意,當(dāng)讀鎖的數(shù)量變?yōu)?時(shí)才會(huì)調(diào)用release方法,喚醒隊(duì)首結(jié)點(diǎn):
/**
?*?喚醒等待隊(duì)列的首節(jié)點(diǎn)(即頭結(jié)點(diǎn)whead的后繼節(jié)點(diǎn))
?*?
?*?@param?h?頭結(jié)點(diǎn)
?*/
private?void?release(WNode?h)?{
????if?(h?!=?null)?{
????????WNode?q;?Thread?w;
????????U.compareAndSwapInt(h,?WSTATUS,?WAITING,?0);?//?將頭結(jié)點(diǎn)的狀態(tài)從-1置為0,表示要喚醒后繼節(jié)點(diǎn)
????????if?((q?=?h.next)?==?null?||?q.status?==?CANCELLED)?{
????????????for?(WNode?t?=?wtail;?t?!=?null?&&?t?!=?h;?t?=?t.prev)?//?從隊(duì)尾開(kāi)始查找距離頭結(jié)點(diǎn)最近的WAITING結(jié)點(diǎn)
????????????????if?(t.status?<=?0)
????????????????????q?=?t;
????????}
????????if?(q?!=?null?&&?(w?=?q.thread)?!=?null)
????????????U.unpark(w);?//?喚醒售結(jié)點(diǎn)
????}
}
隊(duì)首結(jié)點(diǎn)(ThreadD寫結(jié)點(diǎn)被喚醒),最終等待隊(duì)列的結(jié)構(gòu)如下:

11. ThreadD被喚醒后繼續(xù)向下執(zhí)行
ThreadD會(huì)從原阻塞處繼續(xù)向下執(zhí)行,并在下一次自旋中獲取到寫鎖,然后返回:

最終,等待隊(duì)列的結(jié)構(gòu)如下:

12. ThreadD調(diào)用unlockWrite釋放寫鎖
ThreadD釋放寫鎖的過(guò)程和步驟7完全相同,會(huì)調(diào)用unlockWrite喚醒隊(duì)首結(jié)點(diǎn)(ThreadE)。

ThreadE被喚醒后會(huì)從原阻塞處繼續(xù)向下執(zhí)行,但由于ThreadE是個(gè)讀結(jié)點(diǎn),所以同時(shí)會(huì)喚醒cowait棧中的所有讀結(jié)點(diǎn),過(guò)程和步驟8完全一樣。最終,等待隊(duì)列的結(jié)構(gòu)如下:

至此,全部執(zhí)行完成。
四、StampedLock類/方法聲明
參考Oracle官方文檔:https://docs.oracle.com/javase/8/docs/api/ *類聲明:*

*方法聲明:*
五、StampedLock總結(jié)
StampedLock的等待隊(duì)列與RRW的CLH隊(duì)列相比,有以下特點(diǎn):
當(dāng)入隊(duì)一個(gè)線程時(shí),如果隊(duì)尾是讀結(jié)點(diǎn),不會(huì)直接鏈接到隊(duì)尾,而是鏈接到該讀結(jié)點(diǎn)的cowait鏈中,cowait鏈本質(zhì)是一個(gè)棧; 當(dāng)入隊(duì)一個(gè)線程時(shí),如果隊(duì)尾是寫結(jié)點(diǎn),則直接鏈接到隊(duì)尾; 喚醒線程的規(guī)則和AQS類似,都是首先喚醒隊(duì)首結(jié)點(diǎn)。區(qū)別是StampedLock中,當(dāng)喚醒的結(jié)點(diǎn)是讀結(jié)點(diǎn)時(shí),會(huì)喚醒該讀結(jié)點(diǎn)的cowait鏈中的所有讀結(jié)點(diǎn)(順序和入棧順序相反,也就是后進(jìn)先出)。
另外,StampedLock使用時(shí)要特別小心,避免鎖重入的操作,在使用樂(lè)觀讀鎖時(shí)也需要遵循相應(yīng)的調(diào)用模板,防止出現(xiàn)數(shù)據(jù)不一致的問(wèn)題。
往期推薦
