RRW源碼分析以及應(yīng)用
“?寫給自己看,說(shuō)給別人聽。你好,這是think123的第83篇原創(chuàng)文章”
針對(duì)讀多寫少的場(chǎng)景,Java提供了另外一個(gè)實(shí)現(xiàn)Lock接口的讀寫鎖ReentrantReadWriteLock(RRW),之前分析過ReentrantLock是一個(gè)獨(dú)占鎖,同一時(shí)間只允許一個(gè)線程訪問。
而 RRW 允許多個(gè)讀線程同時(shí)訪問,但不允許寫線程和讀線程、寫線程和寫線程同時(shí)訪問。讀寫鎖內(nèi)部維護(hù)了兩個(gè)鎖,一個(gè)是用于讀操作的ReadLock,一個(gè)是用于寫操作的 WriteLock。
讀寫鎖遵守以下三條基本原則
允許多個(gè)線程同時(shí)讀共享變量; 只允許一個(gè)線程寫共享變量; 如果一個(gè)寫線程正在執(zhí)行寫操作,此時(shí)禁止讀線程讀共享變量。
讀寫鎖如何實(shí)現(xiàn)
RRW也是基于AQS實(shí)現(xiàn)的,它的自定義同步器(繼承自AQS)需要在同步狀態(tài)state上維護(hù)多個(gè)讀線程和一個(gè)寫線程的狀態(tài)。RRW的做法是使用高低位來(lái)實(shí)現(xiàn)一個(gè)整形控制兩種狀態(tài),一個(gè)int占4個(gè)字節(jié),一個(gè)字節(jié)8位。所以高16位表示讀,低16位表示寫。
abstract?static?class?Sync?extends?AbstractQueuedSynchronizer?{
??static?final?int?SHARED_SHIFT???=?16;
??//?10000000000000000(65536)
??static?final?int?SHARED_UNIT????=?(1?<
??//?65535
??static?final?int?MAX_COUNT??????=?(1?<1;
??//1111111111111111
??static?final?int?EXCLUSIVE_MASK?=?(1?<1;
??//?讀鎖(共享鎖)的數(shù)量,只計(jì)算高16位的值
??static?int?sharedCount(int?c)????{?return?c?>>>?SHARED_SHIFT;?}
??//?寫鎖(獨(dú)占鎖)的數(shù)量
??static?int?exclusiveCount(int?c)?{?return?c?&?EXCLUSIVE_MASK;?}
?}
獲取讀鎖
當(dāng)線程獲取讀鎖時(shí),首先判斷同步狀態(tài)低16位,如果存在寫鎖,則獲取鎖失敗,進(jìn)入CLH隊(duì)列阻塞,反之,判斷當(dāng)前線程是否應(yīng)該被阻塞,如果不應(yīng)該阻塞則嘗試 CAS 同步狀態(tài),獲取成功更新同步鎖為讀狀態(tài)。
?protected?final?int?tryAcquireShared(int?unused)?{
???????????
??Thread?current?=?Thread.currentThread();
??int?c?=?getState();
??//?如果當(dāng)前已經(jīng)有寫鎖了,則獲取失敗
??if?(exclusiveCount(c)?!=?0?&&
??????getExclusiveOwnerThread()?!=?current)
??????return?-1;
??//?獲取讀鎖數(shù)量
??int?r?=?sharedCount(c);
??//?非公平鎖實(shí)現(xiàn)中readerShouldBlock()返回true表示CLH隊(duì)列中有正在排隊(duì)的寫鎖
??//?CAS設(shè)置讀鎖的狀態(tài)值
??if?(!readerShouldBlock()?&&
??????r???????compareAndSetState(c,?c?+?SHARED_UNIT))?{
??????//?省略記錄獲取readLock次數(shù)的代碼
??????return?1;
??}
??//?針對(duì)上面失敗的條件進(jìn)行再次處理
??return?fullTryAcquireShared(current);
}
final?int?fullTryAcquireShared(Thread?current)?{
??
??//?無(wú)線循環(huán)
??for?(;;)?{
????int?c?=?getState();
????if?(exclusiveCount(c)?!=?0)?{
??????//?如果不是當(dāng)前線程持有寫鎖,則進(jìn)入CLH隊(duì)列阻塞
??????if?(getExclusiveOwnerThread()?!=?current)
????????return?-1;
????}?
????//?如果reader應(yīng)該被阻塞
????else?if?(readerShouldBlock())?{
????????//?Make?sure?we're?not?acquiring?read?lock?reentrantly
????????if?(firstReader?==?current)?{
????????????//?assert?firstReaderHoldCount?>?0;
????????}?else?{
????????????if?(rh?==?null)?{
????????????????rh?=?cachedHoldCounter;
????????????????if?(rh?==?null?||?rh.tid?!=?getThreadId(current))?{
????????????????????rh?=?readHolds.get();
????????????????????if?(rh.count?==?0)
????????????????????????readHolds.remove();
????????????????}
????????????}
????????????//?當(dāng)前線程沒有持有讀鎖,即不存在鎖重入情況。則進(jìn)入CLH隊(duì)列阻塞
????????????if?(rh.count?==?0)
????????????????return?-1;
????????}
????}
????//?共享鎖的如果超出了限制
????if?(sharedCount(c)?==?MAX_COUNT)
????????throw?new?Error("Maximum?lock?count?exceeded");
????//?CAS設(shè)置狀態(tài)值
????if?(compareAndSetState(c,?c?+?SHARED_UNIT))?{
??????
??????//?省略記錄readLock次數(shù)的代碼
??????return?1;
????}
??}
}
SHARED_UNIT的值是65536,也就是說(shuō),當(dāng)?shù)谝淮潍@取讀鎖的后,state的值就變成了65536。在公平鎖的實(shí)現(xiàn)中當(dāng)CLH隊(duì)列中有排隊(duì)的線程,readerShouldBlock()方法就會(huì)返回為true。非公平鎖的實(shí)現(xiàn)中則是當(dāng)CLH隊(duì)列中存在等待獲取寫鎖的線程就返回true
還需要注意的是獲取讀鎖的時(shí)候,如果當(dāng)前線程已經(jīng)持有寫鎖,是仍然能獲取讀鎖成功的。后面會(huì)提到鎖的降級(jí),如果你對(duì)那里的代碼有疑問,可以在回過頭來(lái)看看這里申請(qǐng)鎖的代碼
釋放讀鎖
protected?final?boolean?tryReleaseShared(int?unused)?{
???????????
??for?(;;)?{
????int?c?=?getState();
????//?減去65536
????int?nextc?=?c?-?SHARED_UNIT;
????//?只有當(dāng)state的值變成0才會(huì)真正的釋放鎖
????if?(compareAndSetState(c,?nextc))
????????return?nextc?==?0;
}
}
釋放鎖時(shí),state的值需要減去65536,因?yàn)楫?dāng)?shù)谝淮潍@取讀鎖后,state值變成了65536。
任何一個(gè)線程釋放讀鎖的時(shí)候只有在state==0的時(shí)候才真正釋放了鎖,比如有100個(gè)線程獲取了讀鎖,只有最后一個(gè)線程執(zhí)行tryReleaseShared方法時(shí)才真正釋放了鎖,此時(shí)會(huì)喚醒CLH隊(duì)列中的排隊(duì)線程。
獲取寫鎖
一個(gè)線程嘗試獲取寫鎖時(shí),會(huì)先判斷同步狀態(tài) state 是否為0。如果 state 等于 0,說(shuō)明暫時(shí)沒有其它線程獲取鎖;如果 state 不等于 0,則說(shuō)明有其它線程獲取了鎖。
此時(shí)再判斷state的低16位(w)是否為0,如果w為0,表示其他線程獲取了讀鎖,此時(shí)進(jìn)入CLH隊(duì)列進(jìn)行阻塞等待。
如果w不為0,則說(shuō)明其他線程獲取了寫鎖,此時(shí)需要判斷獲取了寫鎖的是不是當(dāng)前線程,如果不是則進(jìn)入CLH隊(duì)列進(jìn)行阻塞等待,如果獲取了寫鎖的是當(dāng)前線程,則判斷當(dāng)前線程獲取寫鎖是否超過了最大次數(shù),若超過,拋出異常。反之則更新同步狀態(tài)。
//?獲取寫鎖
protected?final?boolean?tryAcquire(int?acquires)?{
???????????
??Thread?current?=?Thread.currentThread();
??int?c?=?getState();
??int?w?=?exclusiveCount(c);
??//?判斷state是否為0
??if?(c?!=?0)?{
??????//?獲取鎖失敗
??????if?(w?==?0?||?current?!=?getExclusiveOwnerThread())
??????????return?false;
??????//?判斷當(dāng)前線程獲取寫鎖是否超出了最大次數(shù)65535
??????if?(w?+?exclusiveCount(acquires)?>?MAX_COUNT)
??????????throw?new?Error("Maximum?lock?count?exceeded");
??????
??????//?鎖重入
??????setState(c?+?acquires);
??????return?true;
??}
??//?非公平鎖實(shí)現(xiàn)中writerShouldBlock()永遠(yuǎn)返回為false
??//?CAS修改state的值
??if?(writerShouldBlock()?||
??????!compareAndSetState(c,?c?+?acquires))
??????return?false;
??//?CAS成功后,設(shè)置當(dāng)前線程為擁有獨(dú)占鎖的線程
??setExclusiveOwnerThread(current);
??return?true;
}
在公平鎖的實(shí)現(xiàn)中當(dāng)CLH隊(duì)列中存在排隊(duì)的線程,那么writerShouldBlock()方法就會(huì)返回為true,此時(shí)獲取寫鎖的線程就會(huì)被阻塞。
釋放寫鎖
釋放寫鎖的邏輯比較簡(jiǎn)單
?protected?final?boolean?tryRelease(int?releases)?{
??//?寫鎖是否被當(dāng)前線程持有
??if?(!isHeldExclusively())
??????throw?new?IllegalMonitorStateException();
??
??int?nextc?=?getState()?-?releases;
??boolean?free?=?exclusiveCount(nextc)?==?0;
??//?沒有其他線程持有寫鎖
??if?(free)
??????setExclusiveOwnerThread(null);
??setState(nextc);
??return?free;
}
鎖的升級(jí)?
//?準(zhǔn)備讀緩存
readLock.lock();
try?{
??v?=?map.get(key);
??if(v?==?null)?{
????writeLock.lock();
????try?{
??????if(map.get(key)?!=?null)?{
????????return?map.get(key);
??????}
??????//?更新緩存代碼,省略
????}?finally?{
??????writeLock.unlock();
????}
??}
}?finally?{
??readLock.unlock();
}
對(duì)于上面獲取緩存數(shù)據(jù)(這也是RRW的應(yīng)用場(chǎng)景)的代碼,先是獲取讀鎖,然后再升級(jí)為寫鎖,這樣的行為叫做鎖的升級(jí)。可惜RRW不支持,這樣會(huì)導(dǎo)致寫鎖永久等待,最終導(dǎo)致線程被永久阻塞。所以鎖的升級(jí)是不允許的。
鎖的降級(jí)
雖然鎖的升級(jí)不允許,但是鎖的降級(jí)卻是可以的。
ReentrantReadWriteLock?lock?=?new?ReentrantReadWriteLock();
ReadLock?readLock?=?lock.readLock();
WriteLock?writeLock?=?lock.writeLock();
Map?dataMap?=?new?HashMap();
public?void?processCacheData()?{
??readLock.lock();
??if(!cacheValid())?{
????//?釋放讀鎖,因?yàn)椴辉试S
????readLock.unlock();
????writeLock.lock();
????try?{
??????if(!cacheValid())?{
??????????dataMap.put("key",?"think123");
??????}
??????//?降級(jí)為讀鎖
??????readLock.lock();
????}?finally?{
????????writeLock.unlock();
????}
??}
??try?{
????//?仍然持有讀鎖
????System.out.println(dataMap);
??}?finally?{
??????readLock.unlock();
??}
}
public?boolean?cacheValid()?{
????return?!dataMap.isEmpty();
}
RRW需要注意的問題
在讀取很多、寫入很少的情況下,RRW 會(huì)使寫入線程遭遇饑餓(Starvation)問題,也就是說(shuō)寫入線程會(huì)因遲遲無(wú)法競(jìng)爭(zhēng)到鎖而一直處于等待狀態(tài)。
寫鎖支持條件變量,讀鎖不支持。讀鎖調(diào)用newCondition() 會(huì)拋出UnsupportedOperationException 異常
推薦閱讀
之前有寫過AQS的實(shí)現(xiàn),ReentrantLock的實(shí)現(xiàn),可以參考我下面的文章
關(guān)注我,不迷路
老哥,能給我個(gè)點(diǎn)贊,和關(guān)注嗎?
