StampedLock——比讀寫鎖更快的鎖
“?寫給自己看,說給別人聽。你好,這是think123的第84篇原創(chuàng)文章”
簡介
ReentrantReadWriteLock支持讀寫鎖,StampedLock支持寫鎖、悲觀鎖讀和樂觀讀(無鎖)。
其中寫鎖、悲觀讀鎖的語義和ReentrantReadWriteLock中的寫鎖、讀鎖語義一樣,都是允許多個線程同時獲取悲觀鎖讀,但是只允許一個線程獲取寫鎖,寫鎖和悲觀讀鎖是互斥的。
不同的是:StampedLock 里的寫鎖和悲觀讀鎖加鎖成功之后,都會返回一個 stamp;然后解鎖的時候,需要傳入這個 stamp。
以下為官方使用例子
public?class?Point?{
??private?final?StampedLock?sl?=?new?StampedLock();
??private?double?x,?y;
??void?move(double?deltaX,?double?deltaY)?{
????long?stamp?=?sl.writeLock();
????try?{
??????x?+=?deltaX;
??????y?+=?deltaY;
????}?finally?{
??????sl.unlockWrite(stamp);
????}
??}
??double?distanceFromOrigin()?{
????long?stamp?=?sl.tryOptimisticRead();
????double?currentX?=?x,?currentY?=?y;
????if?(!sl.validate(stamp))?{
????????stamp?=?sl.readLock();
????????try?{
??????????currentX?=?x;
??????????currentY?=?y;
????????}?finally?{
??????????sl.unlockRead(stamp);
????????}
????}
????//?如果處理業(yè)務需要保持互斥,那么就用互斥鎖,如果不需要保持互斥才可以
????//?用讀寫鎖。一般來講緩存是不需要保持互斥性的,能接受瞬間的不一致
????return?Math.sqrt(currentX?*?currentX?+?currentY?*?currentY);
??}
??//?StampedLock?支持鎖的降級(通過?tryConvertToReadLock()方法)
??//?升級(通過tryConvertToWriteLock()方法)
??void?moveIfAtOrigin(double?newX,?double?newY)?{
????long?stamp?=?sl.readLock();
????try?{
??????while?(x?==?0.0?&&?y?==?0.0)?{
????????long?ws?=?sl.tryConvertToWriteLock(stamp);
????????if?(ws?!=?0L)?{
??????????stamp?=?ws;
??????????x?=?newX;
??????????y?=?newY;
??????????break;
????????}?else?{
??????????sl.unlockRead(stamp);
??????????stamp?=?sl.writeLock();
????????}
??????}
????}?finally?{
??????sl.unlock(stamp);
????}
??}
}
主要數(shù)據結構
雖然StampedLock沒有使用AQS,不過它的數(shù)據結構中還是用到了CLH隊列。
WNode是CLH隊列的節(jié)點,其源碼如下
??static?final?class?WNode?{
????//?前驅節(jié)點
????volatile?WNode?prev;
????//?后繼節(jié)點
????volatile?WNode?next;
????//?獲取讀鎖的列表
????volatile?WNode?cowait;
????//?線程
????volatile?Thread?thread;
????//?節(jié)點狀態(tài)。0,?WAITING,?or?CANCELLED
????volatile?int?status;
????
????//?讀模式或者寫模式。RMODE?or?WMODE
????final?int?mode;??
????WNode(int?m,?WNode?p)?{?mode?=?m;?prev?=?p;?}
??}
StampedLock中其中重要屬性如下
?//CLH隊列頭結點
??private?transient?volatile?WNode?whead;
??//?CLH隊列尾節(jié)點
??private?transient?volatile?WNode?wtail;
??//?鎖狀態(tài)
??private?transient?volatile?long?state;
??
??//?讀鎖次數(shù)的額外計數(shù)器
??private?transient?int?readerOverflow;
??//?讀鎖的位數(shù)
??private?static?final?int?LG_READERS?=?7;
??//?計算state值常量
??private?static?final?long?RUNIT?=?1L;
??//?寫鎖標志位,十進制:128?二進制:1000?0000
??private?static?final?long?WBIT??=?1L?<
??//?讀狀態(tài)標志位。?十進制:127??二進制:?0111?1111
??private?static?final?long?RBITS?=?WBIT?-?1L;
??//?state狀態(tài)中記錄讀鎖快滿了的值,126
??private?static?final?long?RFULL?=?RBITS?-?1L;
??//?用來獲取讀寫狀態(tài)。?十進制:255?二進制:1111?1111
??private?static?final?long?ABITS?=?RBITS?|?WBIT;
??//?-128?(1....1?1000?0000)
??private?static?final?long?SBITS?=?~RBITS;
??//?狀態(tài)state的初始值?256?二進制:?00001?0000?0000
??private?static?final?long?ORIGIN?=?WBIT?<1;
??//?鎖的狀態(tài)。使用state來控制當前是讀鎖,還是寫鎖
??private?transient?volatile?long?state;
??
??//?額外的讀鎖計數(shù)器
??private?transient?int?readerOverflow;
??//?state初始值為256
??public?StampedLock()?{
??????state?=?ORIGIN;
??}
看過我之前分析AQS源碼的應該對這個CLH很熟悉了。同樣的StampedLock也是通過這個stae變量來進行讀寫鎖判斷的。這個state承載了三個部分的內容

StampedLock中state是long類型,占64位,它被劃為了三個部分來使用。低7位作為讀鎖標志位,可以由多個線程共享,每有一個線程申請讀鎖成功,低7位就加1。第8位是寫鎖位,由線程獨占。 其余位是stamp位,用來記錄寫鎖狀態(tài)的變化(版本號),每使用一次寫鎖,stamp位就會加1。
同時如果讀鎖數(shù)量超過了126之后,超出的次數(shù)使用readerOverflow來進行計數(shù)。
當出現(xiàn)并發(fā)的情況的時候,CLH隊列的排隊情況是怎樣的呢?
比如,線程w1獲取了寫鎖,一直未釋放。此時有4個線程分別獲取讀鎖(獲取順序是R0-->R1-->R2-->R3),又有線程W2獲取寫鎖,最后還有R4,R5,R6三個線程獲取讀鎖,那么此時隊列的排隊情況如下

因為讀鎖是可以被多個線程獲取的,如果同一時間有多個線程來獲取讀鎖卻獲取不到時,這個時候第一個獲取讀鎖的線程會被加入到鏈表中,而后面的的讀線程會被加入到cowait棧中, 可以認為cowait是一條副鏈。
這里的cowait可以理解為協(xié)同等待,表示將這些獲取讀鎖的線程作為一個整體來獲取鎖。
獲取樂觀讀鎖
public?long?tryOptimisticRead()?{
????long?s;
????return?(((s?=?state)?&?WBIT)?==?0L)???(s?&?SBITS)?:?0L;
}
樂觀讀鎖邏輯比較簡單,就是判斷寫鎖是否被占用(判斷state第8位的值是否為1),如果寫鎖被占用則返回0,否則返回stamp位。
state初始值為256(1?0000?0000)
??01000?0000(WBIT)
&?10000?0000(state)
==============
??00000?0000
?1?0000?0000(state)
?1?1000?0000(SBITS)
?==============
?1?0000?0000
檢測樂觀讀鎖
public?boolean?validate(long?stamp)?{
??//?通過UNSafe插入內存屏障,禁止重排序
??U.loadFence();
??return?(stamp?&?SBITS)?==?(state?&?SBITS);
}
返回true: 表示期間沒有寫鎖,不關心讀鎖。
返回false: 表示期間有寫鎖發(fā)生
SBITS為-128,用二進制表示是:1111 1111 1111 1000 0000
x?xxxx?xxxx(stamp)
1?1000?0000
1?yyyy?yyyy(state)
1?1000?0000
SBITS后7位都是0,也就是不關心讀鎖,我們只關心stamp位和寫位。
當我們獲取樂觀讀時,如果此時已經有了寫鎖,那么返回stamp值為0,此時進行驗證肯定為false。
獲取普通讀鎖
public?long?readLock()?{
??long?s?=?state,?next;
??//?whead?==?wtail時,隊列為空,表示當前沒有線程在排隊
??return?((whead?==?wtail?&&?(s?&?ABITS)????????????U.compareAndSwapLong(this,?STATE,?s,?next?=?s?+?RUNIT))??
???????????//?acquireRead的第一個參數(shù)為false,標識不處理線程中斷
??????????next?:?acquireRead(false,?0L));
}
當獲取讀鎖成功時,會將state值加1。當條件不滿足時(隊列不為空,CAS失敗,或者讀鎖的個數(shù)已經大于等于126),都會進入到acquireRead()方法,這個方法主要分為兩個大的for循環(huán),代碼比較長我就不貼出來了。
它的主要邏輯如下:
如果當前隊列沒有排隊的線程,該線程是第一個準備排隊的元素,就有很大機會獲取到鎖,所以會先自旋嘗試獲取到鎖(自旋次數(shù)為64),如果獲取到了將state值加1,如果讀鎖次數(shù)已經超出了126,則使用readerOverflow記錄超出的讀鎖數(shù)目。如果未獲取到鎖,則進入第2步
?private?long?tryIncReaderOverflow(long?s)?{
?
??//?如果讀鎖的記錄數(shù)等于126,
??if?((s?&?ABITS)?==?RFULL)?{
??????//?將state的值加1
??????//?CAS之后state的后七位的值是127,state的整個值是383(1?0111?1111)。
??????if?(U.compareAndSwapLong(this,?STATE,?s,?s?|?RBITS))?{
??????????//?將readOverflow的值加1
??????????++readerOverflow;
??????????state?=?s;
??????????return?s;
??????}
??}
??else?if?((LockSupport.nextSecondarySeed()?&
????????????OVERFLOW_YIELD_RATE)?==?0)
??????Thread.yield();
??return?0L;
}
初始化入隊排隊節(jié)點,形成鏈表關系。和AQS一樣的是head節(jié)點被當成哨兵節(jié)點或者正持有鎖正在運行的線程(虛擬)。所以head節(jié)點的thread為null,mode的值為WMODE。這樣會形成
head --> node <-- tail這樣的節(jié)點關系如果又有新的線程獲取讀鎖失敗,會將新的線程加入到cowait棧中
對于不是第一個獲取讀鎖的線程,它會先加入到cowait這個棧中,然后再通過CAS獲取鎖,如果獲取失敗就被阻塞,等待被喚醒或者由于超時中斷被取消。
會對第一個獲取讀鎖的線程進行優(yōu)待,因為它前面沒有在排隊的線程,所以這一層還是在自旋獲取鎖,只是自旋次數(shù)再增加,首先會自旋1024次獲得鎖,如果還未獲取到,在自旋2048次。 如果還未等到鎖釋放,就阻塞當前線程,等待被喚醒,直到獲得鎖或者超時中斷被取消。
第1-4步屬于一個大的循環(huán),第5步驟屬于另外一個大的循環(huán)。
同時當獲取讀鎖線程被喚醒獲取到鎖后,它同時也會喚醒掛在它身上的cowait棧中的線程。
從分析中可以看到StampedLock中通過大量的自旋操作可以一定程度避免線程阻塞,只要線程執(zhí)行操作夠快,釋放鎖比較及時,可以說幾乎不會存在阻塞。
釋放讀鎖
釋放讀鎖的代碼比較簡單,主要操作如下
首先判斷stamp是否合法,如果不合法則拋出 IllegalMonitorStateException異常如果讀鎖個數(shù)小于126,則通過CAS將state值減去1,如果釋放的是最后一個讀鎖,則需要喚醒隊列中的節(jié)點。 如果讀鎖個數(shù)已經溢出了,則將readerOverflow減去1
public?void?unlockRead(long?stamp)?{
??long?s,?m;?WNode?h;
??//?自旋
??for?(;;)?{
??????//?判斷stamp是否合法
??????if?(((s?=?state)?&?SBITS)?!=?(stamp?&?SBITS)?||
??????????(stamp?&?ABITS)?==?0L?||?(m?=?s?&?ABITS)?==?0L?||?m?==?WBIT)
??????????throw?new?IllegalMonitorStateException();
??????//?讀鎖個數(shù)小于126
??????if?(m???????????if?(U.compareAndSwapLong(this,?STATE,?s,?s?-?RUNIT))?{
??????????????//?釋放最后一個讀鎖時,需要喚醒下一個節(jié)點
??????????????if?(m?==?RUNIT?&&?(h?=?whead)?!=?null?&&?h.status?!=?0)
??????????????????release(h);
??????????????break;
??????????}
??????}
??????//?讀鎖個數(shù)飽和溢出,需要減少readerOverflow
??????else?if?(tryDecReaderOverflow(s)?!=?0L)
??????????break;
??}
}
獲取寫鎖
public?long?writeLock()?{
??long?s,?next;
??return?((((s?=?state)?&?ABITS)?==?0L?&&
???????????U.compareAndSwapLong(this,?STATE,?s,?next?=?s?+?WBIT))??
??????????next?:?acquireWrite(false,?0L));
}
如果state中的第8位等于0并且CAS設置state值成功,則獲取鎖成功,否則進入到acquireWrite方法。
acquireWrite的邏輯也分為兩部分,分別是兩個for循環(huán)。
第一個for循環(huán)邏輯如下
?for?(int?spins?=?-1;;)?{?//?spin?while?enqueuing
????long?m,?s,?ns;
????//?如果當前寫鎖標志位仍然為0,即沒有其他線程獲取到寫鎖
????if?((m?=?(s?=?state)?&?ABITS)?==?0L)?{
????????//?CAS將state的值加128,實質是將寫鎖狀態(tài)位加1
????????if?(U.compareAndSwapLong(this,?STATE,?s,?ns?=?s?+?WBIT))
????????????return?ns;
????}
????//?其他線程獲取到了寫鎖,則設定自旋次數(shù)為64次
????else?if?(spins?0)
????????spins?=?(m?==?WBIT?&&?wtail?==?whead)???SPINS?:?0;
????else?if?(spins?>?0)?{
????????//?隨機減少自旋次數(shù)
????????if?(LockSupport.nextSecondarySeed()?>=?0)
????????????--spins;
????}
????//?自旋仍未獲取到寫鎖則執(zhí)行下面的邏輯
????//?初始化CLH隊列(主要是whead,wtail節(jié)點)
????//?p指向了wtail節(jié)點
????else?if?((p?=?wtail)?==?null)?{
????????WNode?hd?=?new?WNode(WMODE,?null);
????????if?(U.compareAndSwapObject(this,?WHEAD,?null,?hd))
????????????wtail?=?hd;
????}
????//?初始化節(jié)點
????else?if?(node?==?null)
????????node?=?new?WNode(WMODE,?p);
????//?最終形成?whead?-->?node?<--?wtail
????else?if?(node.prev?!=?p)
????????node.prev?=?p;
????else?if?(U.compareAndSwapObject(this,?WTAIL,?p,?node))?{
????????p.next?=?node;
????????//?形成鏈表關系之后自旋結束
????????break;
????}
}
第二個for循環(huán)代碼也比較長,我就不貼出來了。它的核心邏輯如下
如果仍然只有當前線程在等待鎖,則會先自旋1024次去獲取寫鎖,如果獲取失敗,則在自旋2048次再次去獲取。如果都獲取失敗,則進入第二步。
阻塞當前線程等待被喚醒
釋放寫鎖
public?void?unlockWrite(long?stamp)?{
??WNode?h;
??//?驗證stamp是否合法
??if?(state?!=?stamp?||?(stamp?&?WBIT)?==?0L)
????throw?new?IllegalMonitorStateException();
??//?釋放寫鎖,stamp位(版本號)加1
??//?stamp+WBIT會將state的第8位置為0,就相當于釋放了寫鎖
??state?=?(stamp?+=?WBIT)?==?0L???ORIGIN?:?stamp;
??//?如果頭結點不為空,并且狀態(tài)不為0,,則調用release方法喚醒下一個節(jié)點
??if?((h?=?whead)?!=?null?&&?h.status?!=?0)
??????release(h);
}
private?void?release(WNode?h)?{
??if?(h?!=?null)?{
????WNode?q;?Thread?w;
????U.compareAndSwapInt(h,?WSTATUS,?WAITING,?0);
????//?如果頭結點的下一個節(jié)點為空或者其狀態(tài)為已取消
????if?((q?=?h.next)?==?null?||?q.status?==?CANCELLED)?{
????????//?從尾結點向前遍歷找到可用節(jié)點
????????for?(WNode?t?=?wtail;?t?!=?null?&&?t?!=?h;?t?=?t.prev)
????????????if?(t.status?<=?0)
????????????????q?=?t;
????}
????//?喚醒q節(jié)點所在的線程
????if?(q?!=?null?&&?(w?=?q.thread)?!=?null)
????????U.unpark(w);
??}
}
釋放寫鎖過程總結如下
將state的第8位置為0(釋放寫鎖),并將stamp位(version)加1 喚醒下一個節(jié)點
鎖的降級(tryConvertToReadLock)
tryConvertToReadLock方法可以用于鎖的降級。不過并不是只有再獲取讀鎖時才能調用該方法。
public?long?tryConvertToReadLock(long?stamp)?{
??//?a為鎖標識,m則是最新的鎖標識
??long?a?=?stamp?&?ABITS,?m,?s,?next;?WNode?h;
??//?state的寫鎖標志位和版本號一致(有可能寫鎖標志位是0,即可能是讀鎖)
??while?(((s?=?state)?&?SBITS)?==?(stamp?&?SBITS))?{
??????//?還未添加任何鎖標識
??????if?((m?=?s?&?ABITS)?==?0L)?{
??????????if?(a?!=?0L)
??????????????break;
??????????//?讀鎖次數(shù)小于126
??????????else?if?(m???????????????//?將state的值加1
??????????????if?(U.compareAndSwapLong(this,?STATE,?s,?next?=?s?+?RUNIT))
??????????????????return?next;
??????????}
??????????//?讀鎖次數(shù)已經超出了,使用額外字段記錄
??????????else?if?((next?=?tryIncReaderOverflow(s))?!=?0L)
??????????????return?next;
??????}
??????//?當前是寫鎖狀態(tài)
??????else?if?(m?==?WBIT)?{
??????????if?(a?!=?m)
??????????????break;
??????????//?將讀鎖次數(shù)加1,寫鎖標志置為0,stamp位加1。即加129
??????????state?=?next?=?s?+?(WBIT?+?RUNIT);
??????????//?釋放鎖
??????????if?((h?=?whead)?!=?null?&&?h.status?!=?0)
??????????????release(h);
??????????return?next;
??????}
??????//?已經是讀鎖了直接返回
??????else?if?(a?!=?0L?&&?a???????????return?stamp;
??????else
??????????break;
??}
??//?轉換失敗
??return?0L;
}
它的主要邏輯如下,如果當前線程還未加任何鎖,則加上寫鎖并返回最新的stamp值。如果當前線程已經是寫鎖,則釋放寫鎖,并更新state的值(讀鎖加1,寫鎖狀態(tài)為置為0,version加1)。 如果當前線程是讀鎖則直接返回stamp值。如果上面條件都不滿足,則轉換失敗。
鎖的升級(tryConvertToWriteLock)
public?long?tryConvertToWriteLock(long?stamp)?{
??long?a?=?stamp?&?ABITS,?m,?s,?next;
??//?state的寫鎖標志位和版本號一致(有可能寫鎖標志位是0,即可能是讀鎖)
??while?(((s?=?state)?&?SBITS)?==?(stamp?&?SBITS))?{
??????//?還未添加任何鎖標識
??????if?((m?=?s?&?ABITS)?==?0L)?{
??????????if?(a?!=?0L)
??????????????break;
??????????//?將寫鎖狀態(tài)位置為0
??????????if?(U.compareAndSwapLong(this,?STATE,?s,?next?=?s?+?WBIT))
??????????????return?next;
??????}
??????//?如果當前已經是寫鎖狀態(tài),則直接返回
??????else?if?(m?==?WBIT)?{
??????????if?(a?!=?m)
??????????????break;
??????????return?stamp;
??????}
??????//?如果當前是唯一讀鎖,則轉換為寫鎖
??????else?if?(m?==?RUNIT?&&?a?!=?0L)?{
??????????if?(U.compareAndSwapLong(this,?STATE,?s,
???????????????????????????????????next?=?s?-?RUNIT?+?WBIT))
??????????????return?next;
??????}
??????else
??????????break;
??}
??return?0L;
}
鎖的升級和鎖的降級的邏輯類似,這里就不再讀過介紹了。
總結以及注意
StampedLock沒有使用AQS,而是依靠自己實現(xiàn)的同步狀態(tài)(long state占64位)和變異的CLH隊列
StampedLock使用state的低7位標識讀鎖數(shù)量(超出126的使用readerOverflow字段記錄),第8位標識寫鎖,高56位記錄鎖的版本,每次釋放/獲取寫鎖版本號都會加1
StampedLock中讀鎖和讀鎖不阻塞,讀鎖寫鎖相互阻塞,寫鎖和寫鎖也相互阻塞
StampedLock的連續(xù)多個讀鎖線程,只有第一個是在CLH隊列中,后面的會掛在第一個線程的cowait棧中
StampedLock喚醒第一個讀線程后,讀線程會喚醒它cowait棧的所有讀線程(acquireRead()方法中)
StampedLock不支持公平鎖,也不支持Condition
StampedLock支持鎖的降級和鎖的升級
StampedLock中的樂觀讀操作是無鎖的
StampedLock中使用了大量的自旋+CAS操作,適合持有鎖時間比較短的任務,持有鎖時間長的話不僅會浪費CPU而且仍然會阻塞自己。
線程阻塞在 StampedLock 的 readLock() 或者 writeLock() 上時,此時調用該阻塞線程的 interrupt() 方法,會導致 CPU 飆升。所以,使用 StampedLock 一定不要調用中斷操作,如果需要支持中斷功能,一定使用可中斷的悲觀讀鎖 readLockInterruptibly() 和寫鎖 writeLockInterruptibly()
