面試:談?wù)勀銓eentrantReadWriteLock的理解,推薦看看(圖解版)
轉(zhuǎn)自:Ccww技術(shù)博客
前言
在前面看了ReentrantLock面試分析,在面試問了多并發(fā)的就少不了ReentrantReadWriteLock讀寫鎖了,那具體是什么問題?我們來看看:
ReentrantReadWriteLock讀寫鎖是什么,有什么用?ReentrantReadWriteLock具有哪些特性?ReentrantReadWriteLock是怎么實現(xiàn)讀寫鎖的獲取?ReentrantReadWriteLock讀寫鎖的釋放
ReentrantReadWriteLock讀寫鎖是什么,有什么用?
平時存在很多場景,大部分線程提供讀服務(wù),只有少數(shù)的寫服務(wù)。然而讀服務(wù)不存在數(shù)據(jù)競爭問題,如果一個線程在讀時禁止其他線程讀勢必會導(dǎo)致性能降低,因此延伸了ReadWriteLock讀寫鎖。
ReentrantReadWriteLock讀寫鎖維護著一對鎖,一個讀鎖和一個寫鎖。
通過分離讀鎖和寫鎖,使得對共享資源的訪問分為讀訪問和寫訪問,多個線程可以同時對共享資源進行讀訪問,讀寫鎖可以極大地提高并發(fā)量。但是同一時間只能有一個線程對共享資源進行寫訪問,其他所有讀線程和寫線程都會被阻塞。
ReentrantReadWriteLock具有哪些特性?
讀寫鎖的主要特性:
公平性:支持公平性和非公平性。
重入性:支持重入。讀寫鎖最多支持65535個遞歸寫入鎖和65535個遞歸讀取鎖。
鎖降級:遵循獲取寫鎖、獲取讀鎖在釋放寫鎖的次序,寫鎖能夠降級成為讀鎖
讀寫鎖除了讀讀不互斥,讀寫、寫讀、寫寫都是互斥的
是否互斥 讀 寫 讀 否 是 寫 是 是
在初步了解了ReentrantReadWriteLock讀寫鎖的原理的情況,我們來講講其到底是怎么實現(xiàn)讀寫鎖的?
ReentrantReadWriteLock是怎么實現(xiàn)讀寫鎖的獲取?
ReentrantReadWriteLock實現(xiàn)讀寫鎖以及公平性是依賴:
// 讀鎖public static class ReadLock implements Lock, java.io.Serializable {}// 寫鎖public static class WriteLock implements Lock, java.io.Serializable {}// 同步器-公平性abstract static class Sync extends AbstractQueuedSynchronizer {}
接下來我們通過構(gòu)造函數(shù)創(chuàng)建一個非公平性的ReentrantReadWriteLock讀寫鎖的來看看其實現(xiàn)原理:
// 默認構(gòu)造方法*public ReentrantReadWriteLock() {this(false);}public ReentrantReadWriteLock(boolean fair) {//默認非公平鎖sync = fair ? new FairSync() : new NonfairSync();//創(chuàng)建讀鎖readerLock = new ReadLock(this);//創(chuàng)建寫鎖writerLock = new WriteLock(this);}
讀寫鎖ReadLock和WriteLock其都是使用同一個Sync,
//讀鎖上鎖protected ReadLock(ReentrantReadWriteLock lock) {sync = lock.sync;}//寫鎖上鎖protected WriteLock(ReentrantReadWriteLock lock) {sync = lock.sync;}
那我們來看看這個sync有什么奇特之處?
在Sync中32位的同步狀態(tài)state,劃分成2部分,高16位表示讀,低16位表示寫,假設(shè)當(dāng)前同步狀態(tài)為s:
讀狀態(tài)為 s >>> 16,即無符號補0右移16位,讀狀態(tài)增加1,等于 s + (1 << 16)
寫狀態(tài)為 s & 0000FFFF,即將高16位全部抹去,寫狀態(tài)增加1,等于 s + 1

abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 6317671515068378041L;static final int SHARED_SHIFT = 16;//讀狀態(tài)加+1使用變量static final int SHARED_UNIT = (1 << SHARED_SHIFT);static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;/** 獲取讀狀態(tài) */static int sharedCount(int c) { return c >>> SHARED_SHIFT; }/** 獲取寫狀態(tài) */static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }...}
readLock::lock()實現(xiàn)
調(diào)用readLock.lock()進行獲取讀鎖共享鎖:
public void lock() {sync.acquireShared(1);}public final void acquireShared(int arg) {//嘗試獲取同步狀態(tài)if (tryAcquireShared(arg) < 0)//獲取失敗則加入到同步隊列doAcquireShared(arg);}
嘗試獲取同步狀態(tài)tryAcquireShared(arg):
互斥鎖-寫狀態(tài)不為0(寫鎖已被獲取),且獲取寫鎖的不是當(dāng)前線程,那么直接返回失敗(已寫鎖狀態(tài)-->進入等待隊列
判斷
readerShouldBlock()同步隊列中第一個節(jié)點是否為寫線程的節(jié)點,如果是寫線程需要阻塞,如果是讀線程不應(yīng)該阻塞,需要更新讀狀態(tài)成功如果不存在,設(shè)置到當(dāng)前線程在最近獲取讀鎖的線程
CachedHoldCounter,且重入次數(shù)加+1存在,重入次數(shù)加+1
如果之前還沒有線程獲取讀鎖,記錄第一個讀者為當(dāng)前線程(無鎖狀態(tài)時-->獲取讀鎖)
如果有線程獲取了讀鎖且是當(dāng)前線程是隊列中第一個讀線程,則把其重入次數(shù)加1(已有讀鎖時-->線程讀鎖再重入)
如果有線程獲取了讀鎖且當(dāng)前線程不是隊列中第一個讀線程,先判斷
readHolds是否存在當(dāng)前線程(已有讀鎖時-->當(dāng)前線程非首個獲取讀鎖)
protected final int tryAcquireShared(int unused) {//獲取當(dāng)前線程Thread current = Thread.currentThread();//獲取同步狀態(tài)int c = getState();//如果寫狀態(tài)不為0(寫鎖已被獲取),且獲取寫鎖的不是當(dāng)前線程,那么直接返回失敗if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)return -1;//獲取讀狀態(tài)int r = sharedCount(c);//如果讀線程不應(yīng)該阻塞,且更新讀狀態(tài)成功,那么返回成功//讀線程是否需要排隊(是否是公平模式)if (!readerShouldBlock() &&r < MAX_COUNT &&compareAndSetState(c, c + SHARED_UNIT)) {//如果之前還沒有線程獲取讀鎖if (r == 0) {firstReader = current;firstReaderHoldCount = 1;//如果有線程獲取了讀鎖且是當(dāng)前線程是隊列中第一個讀線程} else if (firstReader == current) {firstReaderHoldCount++;} else {//如果有線程獲取了讀鎖當(dāng)前線程不是隊列中第一個讀線程HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))cachedHoldCounter = rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;}return 1;}//嘗試獲取讀狀態(tài)return fullTryAcquireShared(current);}
CAS更新讀狀態(tài)失敗,或者readerShouldBlock()判斷出第一個節(jié)點為寫線程的節(jié)點,調(diào)用此方法嘗試獲取讀狀態(tài)
final int fullTryAcquireShared(Thread current) {HoldCounter rh = null;for (;;) {//獲取同步狀態(tài)int c = getState();//如果寫線程已經(jīng)獲取到鎖,且獲取到鎖的線程不是當(dāng)前線程,那么直接返回獲取失敗if (exclusiveCount(c) != 0) {if (getExclusiveOwnerThread() != current)return -1;//如果當(dāng)前線程已經(jīng)獲取了寫鎖,現(xiàn)在正在嘗試獲取讀鎖,//那么此線程不應(yīng)該給同步隊列中的寫線程讓位(讓位是為了防止寫線程饑餓),//因為會造成死鎖。} else if (readerShouldBlock()) {...}if (sharedCount(c) == MAX_COUNT)throw new Error("Maximum lock count exceeded");//不給同步隊列中的寫線程讓位,而是直接嘗試獲取讀鎖if (compareAndSetState(c, c + SHARED_UNIT)) {...}}}
獲取讀鎖失敗了就可能要排隊,其原理跟ReentrantLock差不多。其中setHeadAndPropagate()喚醒下一個讀節(jié)點,是一個喚醒一個喚醒隊列中的讀鎖,而不是一次性喚醒后面所有的讀節(jié)點。
這樣不是很簡潔明了,那直接看看圖:
當(dāng)已寫鎖狀態(tài)-->進入等待隊列

當(dāng)為無鎖狀態(tài)時-->獲取讀鎖

當(dāng)已有讀鎖時-->讀鎖再重入

當(dāng)已有讀鎖時-->當(dāng)前線程非首個獲取讀鎖

還有其他復(fù)雜的情況,可以自己熟悉后再慢慢理解,現(xiàn)在看完了readLock::lock()實現(xiàn),我們來研究研究writeLock::lock()實現(xiàn),你會發(fā)現(xiàn)比較簡單。
writeLock::lock()實現(xiàn)
獲取寫鎖
public final void acquire(int arg) {//嘗試獲取同步狀態(tài),失敗則加入到同步隊列中if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
嘗試獲取同步狀態(tài)
獲取同步狀態(tài)
state,是否為0,是否已經(jīng)獲取過鎖了寫狀態(tài)-互斥
exclusiveCount(c)==0寫狀態(tài)為0(說明讀狀態(tài)不為0,已獲取讀鎖)或者當(dāng)前線程current != getExclusiveOwnerThread()(寫鎖已經(jīng)被搶占),都不允許再次獲取其他線程寫鎖當(dāng)前線程已經(jīng)獲取過寫鎖,這里是重入了,直接把
state加1;state不為0時(說明已獲取讀/寫鎖了)state為0時(說明還沒有讀/寫鎖了),就嘗試更新state的值(非公平模式writerShouldBlock()返回false),該線程設(shè)置為占有者
protected final boolean tryAcquire(int acquires) {//獲取當(dāng)前線程Thread current = Thread.currentThread();//獲取同步狀態(tài)int c = getState();//獲取寫狀態(tài)-互斥int w = exclusiveCount(c);//已獲取鎖了(讀/寫)if (c != 0) {// 如果同步狀態(tài)不為0,且寫狀態(tài)為0,那么說明if (w == 0 || current != getExclusiveOwnerThread())//如果讀狀態(tài)不為0,或者寫鎖已經(jīng)被搶占,那么返回獲取寫鎖失敗return false;if (w + exclusiveCount(acquires) > MAX_COUNT)throw new Error("Maximum lock count exceeded");//當(dāng)前線程已經(jīng)獲取過寫鎖,這里是重入了,直接把state加1即可setState(c + acquires);return true;}//還沒有鎖(讀/寫),更新同步狀態(tài)且設(shè)置該線程為占有者if (writerShouldBlock() ||!compareAndSetState(c, c + acquires))return false;setExclusiveOwnerThread(current);return true;}
失敗處理流程跟ReadLock模式一樣。也來具體看看流程圖示:
當(dāng)為無鎖狀態(tài)-->獲取寫鎖

當(dāng)已有寫鎖-->獲取重入寫鎖

當(dāng)已有寫鎖-->不同線程獲取寫鎖

那我們來總結(jié)一下獲取鎖的
小結(jié)-獲取鎖
在獲取讀鎖時:
如果寫鎖已經(jīng)被其他線程獲取,那么此線程將會加入到同步隊列,掛起等待喚醒。
如果寫鎖沒有被其他線程獲取,但是同步隊列的第一個節(jié)點是寫線程的節(jié)點,那么此線程讓位給寫線程,掛起等待喚醒
如果獲取讀鎖的線程,已經(jīng)持有了寫鎖,那么即使同步隊列的第一個節(jié)點是寫線程的節(jié)點,它也不會讓位給同步隊列中的寫線程,而是自旋去獲取讀鎖。因為此時讓位會造成死鎖
獲取寫鎖時:
如果讀鎖已經(jīng)被獲取,那么不允許獲取寫鎖。將此線程加入到同步隊列,掛起等待喚醒
如果讀鎖沒有被獲取,但是寫鎖已經(jīng)被其他線程搶占,那么還是將此線程加入到同步隊列,掛起等待喚醒
如果寫鎖已經(jīng)被此線程持有,那么重入,即寫狀態(tài)+1
如果讀鎖和寫鎖都沒有被獲取,那么CAS嘗試獲取寫鎖
獲取讀寫鎖的研究完,我們也需要看看鎖的釋放
ReentrantReadWriteLock讀寫鎖的釋放
readLock.unlock()
讀鎖釋放步驟
釋放鎖
如果第一個讀線程是當(dāng)前線程,將重入的次數(shù)減1,當(dāng)減到0了就把第一個讀者置為空
如果第一個讀線程不是當(dāng)前線程,也需要將重入的次數(shù)減1,
共享鎖獲取的次數(shù)減1, 如果減為0了說明完全釋放了,才返回true
釋放成功,喚醒下一個節(jié)點
// java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock.unlockpublic void unlock() {sync.releaseShared(1);}// java.util.concurrent.locks.AbstractQueuedSynchronizer.releaseSharedpublic final boolean releaseShared(int arg) {// 嘗試釋放鎖if (tryReleaseShared(arg)) {// 喚醒下一個節(jié)點doReleaseShared();return true;}return false;}// java.util.concurrent.locks.ReentrantReadWriteLock.Sync.tryReleaseSharedprotected final boolean tryReleaseShared(int unused) {Thread current = Thread.currentThread();if (firstReader == current) {// 第一個讀線程是當(dāng)前線程if (firstReaderHoldCount == 1)firstReader = null;elsefirstReaderHoldCount--;} else {// 如果第一個讀者不是當(dāng)前線程HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();int count = rh.count;if (count <= 1) {readHolds.remove();if (count <= 0)throw unmatchedUnlockException();}--rh.count;}for (;;) {// 共享鎖獲取的次數(shù)減1int c = getState();int nextc = c - SHARED_UNIT;if (compareAndSetState(c, nextc))return nextc == 0;}}private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;// 如果頭節(jié)點狀態(tài)為SIGNAL,說明要喚醒下一個節(jié)點if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck cases// 喚醒下一個節(jié)點unparkSuccessor(h);}else if (ws == 0 &&// 把頭節(jié)點的狀態(tài)改為PROPAGATE成功才會跳到下面的if!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}// 如果喚醒后head沒變,則跳出循環(huán)if (h == head) // loop if head changedbreak;}}
WriteLock.unlock()
寫鎖釋放步驟:
釋放鎖
state狀態(tài)變量的值減1,是否完全釋放鎖,如果完全釋放鎖則設(shè)置獨占為null,設(shè)置
state狀態(tài)變量的值釋放成功,喚醒下一個節(jié)點
public void unlock() {sync.release(1);}public final boolean release(int arg) {// 如果嘗試釋放鎖成功(完全釋放鎖)// 就嘗試喚醒下一個節(jié)點if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}// java.util.concurrent.locks.ReentrantReadWriteLock.Sync.tryRelease()protected final boolean tryRelease(int releases) {// 如果寫鎖不是當(dāng)前線程占有著,拋出異常if (!isHeldExclusively())throw new IllegalMonitorStateException();// 狀態(tài)變量的值減1int nextc = getState() - releases;// 是否完全釋放鎖boolean free = exclusiveCount(nextc) == 0;if (free)setExclusiveOwnerThread(null);// 設(shè)置狀態(tài)變量的值setState(nextc);// 如果完全釋放了寫鎖,返回truereturn free;
總結(jié)
ReentrantReadWriteLock在使用中也會,比如并發(fā)的讀多寫少情景,本地緩存也可以使用其實現(xiàn),有時間會寫一篇關(guān)于本地緩存設(shè)計方面的文章。
后臺回復(fù)?學(xué)習(xí)資料?領(lǐng)取學(xué)習(xí)視頻
如有收獲,點個在看,誠摯感謝
