一文看懂JUC之AQS機(jī)制
正文
為了解決原子性的問(wèn)題,Java加入了鎖機(jī)制,同時(shí)保證了可見(jiàn)性和順序性。JDK1.5的并發(fā)包中新增了Lock接口以及相關(guān)實(shí)現(xiàn)類(lèi)來(lái)實(shí)現(xiàn)鎖功能,比synchronized更加靈活,開(kāi)發(fā)者可根據(jù)實(shí)際的場(chǎng)景選擇相應(yīng)的實(shí)現(xiàn)類(lèi)。
本文注重講解其不同衍生類(lèi)的使用場(chǎng)景以及其內(nèi)部AQS的原理。并發(fā)問(wèn)題引入以及synchronized相關(guān)的知識(shí)請(qǐng)看上一篇文章一文看懂Java鎖機(jī)制。
Lock特性
可重入
像synchronized和ReentrantLock都是可重入鎖,可重入性表明了鎖的分配機(jī)制是基于線程的分配,而不是基于方法調(diào)用的分配。
舉個(gè)簡(jiǎn)單的例子,當(dāng)一個(gè)線程已經(jīng)獲取到鎖,當(dāng)后續(xù)再獲取同一個(gè)鎖,直接獲取成功。但獲取鎖和釋放鎖必須要成對(duì)出現(xiàn)。
可響應(yīng)中斷
當(dāng)線程因?yàn)楂@取鎖而進(jìn)入阻塞狀態(tài),外部是可以中斷該線程的,調(diào)用方通過(guò)捕獲InterruptedException可以捕獲中斷
可設(shè)置超時(shí)時(shí)間
獲取鎖時(shí),可以指定超時(shí)時(shí)間,可以通過(guò)返回值來(lái)判斷是否成功獲取鎖
公平性
提供公平性鎖和非公平鎖(默認(rèn))兩種選擇。
公平鎖,線程將按照他們發(fā)出請(qǐng)求的順序來(lái)獲取鎖,不允許插隊(duì);
非公平鎖,則允許插隊(duì):當(dāng)一個(gè)線程發(fā)生獲取鎖的請(qǐng)求的時(shí)刻,如果這個(gè)鎖是可用的,那這個(gè)線程將跳過(guò)所在隊(duì)列里等待線程并獲得鎖。
考慮這么一種情況:A線程持有鎖,B線程請(qǐng)求這個(gè)鎖,因此B線程被掛起;A線程釋放這個(gè)鎖時(shí),B線程將被喚醒,因此再次嘗試獲取鎖;與此同時(shí),C線程也請(qǐng)求獲取這個(gè)鎖,那么C線程很可能在B線程被完全喚醒之前獲得、使用以及釋放這個(gè)鎖。
這是種雙贏的局面,B獲取鎖的時(shí)刻(B被喚醒后才能獲取鎖)并沒(méi)有推遲,C更早地獲取了鎖,并且吞吐量也獲得了提高。在大多數(shù)情況下,非公平鎖的性能要高于公平鎖的性能。
另外,這個(gè)公平性是針對(duì)線程而言的,不能依賴此來(lái)實(shí)現(xiàn)業(yè)務(wù)上的公平性,應(yīng)該由開(kāi)發(fā)者自己控制,比如通過(guò)FIFO隊(duì)列來(lái)保證公布。
讀寫(xiě)鎖
允許讀鎖和寫(xiě)鎖分離,讀鎖與寫(xiě)鎖互斥,但是多個(gè)讀鎖可以共存,適用于讀頻次遠(yuǎn)大于寫(xiě)頻次的場(chǎng)景
豐富的API
提供了多個(gè)方法來(lái)獲取鎖相關(guān)的信息,可以幫助開(kāi)發(fā)者監(jiān)控和排查問(wèn)題
isFair():判斷鎖是否是公平鎖 isLocked():判斷鎖是否被任何線程獲取了 isHeldByCurrentThread():判斷鎖是否被當(dāng)前線程獲取了 hasQueuedThreads():判斷是否有線程在等待該鎖 getHoldCount():查詢當(dāng)前線程占有l(wèi)ock鎖的次數(shù) getQueueLength():獲取正在等待此鎖的線程數(shù)
鎖的使用
ReentrantLock
獨(dú)占鎖的實(shí)現(xiàn),擁有上面列舉的除讀寫(xiě)鎖之外的所有特性,使用比較簡(jiǎn)單
class X {// 創(chuàng)建獨(dú)占鎖實(shí)例private final ReentrantLock lock = new ReentrantLock();// ...public void m() {lock.lock(); // block until condition holdstry {// ... method body} finally {// 必須要釋放鎖,unlock與lock成對(duì)出現(xiàn)lock.unlock()}}}
ReentrantReadWriteLock
讀寫(xiě)鎖的實(shí)現(xiàn),擁有上面列舉的所有特性。并且寫(xiě)鎖可降級(jí)為讀鎖,反之不行。
class CachedData {Object data;volatile boolean cacheValid;final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();void processCachedData() {rwl.readLock().lock();if (!cacheValid) {// Must release read lock before acquiring write lockrwl.readLock().unlock();rwl.writeLock().lock();try {// Recheck state because another thread might have// acquired write lock and changed state before we did.if (!cacheValid) {data = ...cacheValid = true;}// Downgrade by acquiring read lock before releasing write lockrwl.readLock().lock();} finally {rwl.writeLock().unlock(); // Unlock write, still hold read}}try {use(data);} finally {rwl.readLock().unlock();}}}
StampedLock
StampedLock也是一種讀寫(xiě)鎖,提供兩種讀模式:樂(lè)觀讀和悲觀讀。樂(lè)觀讀允許讀的過(guò)程中也可以獲取寫(xiě)鎖后寫(xiě)入!這樣一來(lái),我們讀的數(shù)據(jù)就可能不一致,所以,需要一點(diǎn)額外的代碼來(lái)判斷讀的過(guò)程中是否有寫(xiě)入。
樂(lè)觀鎖的意思就是樂(lè)觀地估計(jì)讀的過(guò)程中大概率不會(huì)有寫(xiě)入,因此被稱為樂(lè)觀鎖。反過(guò)來(lái),悲觀鎖則是讀的過(guò)程中拒絕有寫(xiě)入,也就是寫(xiě)入必須等待。顯然樂(lè)觀鎖的并發(fā)效率更高,但一旦有小概率的寫(xiě)入導(dǎo)致讀取的數(shù)據(jù)不一致,需要能檢測(cè)出來(lái),再讀一遍就行。
public class Point {private final StampedLock stampedLock = new StampedLock();private double x;private double y;public void move(double deltaX, double deltaY) {long stamp = stampedLock.writeLock(); // 獲取寫(xiě)鎖try {x += deltaX;y += deltaY;} finally {stampedLock.unlockWrite(stamp); // 釋放寫(xiě)鎖}}public double distanceFromOrigin() {long stamp = stampedLock.tryOptimisticRead(); // 獲得一個(gè)樂(lè)觀讀鎖// 注意下面兩行代碼不是原子操作// 假設(shè)x,y = (100,200)double currentX = x;// 此處已讀取到x=100,但x,y可能被寫(xiě)線程修改為(300,400)double currentY = y;// 此處已讀取到y(tǒng),如果沒(méi)有寫(xiě)入,讀取是正確的(100,200)// 如果有寫(xiě)入,讀取是錯(cuò)誤的(100,400)if (!stampedLock.validate(stamp)) { // 檢查樂(lè)觀讀鎖后是否有其他寫(xiě)鎖發(fā)生stamp = stampedLock.readLock(); // 獲取一個(gè)悲觀讀鎖try {currentX = x;currentY = y;} finally {stampedLock.unlockRead(stamp); // 釋放悲觀讀鎖}}return Math.sqrt(currentX * currentX + currentY * currentY);}}
Condition
Condition成為條件隊(duì)列或條件變量,為一個(gè)線程掛起執(zhí)行(等待)提供了一種方法,直到另一線程通知某些狀態(tài)條件現(xiàn)在可能為真為止。由于對(duì)該共享狀態(tài)信息的訪問(wèn)發(fā)生在不同的線程中,因此必須由互斥鎖對(duì)其其進(jìn)行保護(hù)。
await方法:必須在獲取鎖之后的調(diào)用,表示釋放當(dāng)前鎖,阻塞當(dāng)前線程;等待其他線程調(diào)用鎖的signal或signalAll方法,線程喚醒重新獲取鎖。
Lock配合Condition,可以實(shí)現(xiàn)synchronized 與 對(duì)象(wait,notify)同樣的效果,來(lái)進(jìn)行線程間基于共享變量的通信。但優(yōu)勢(shì)在于同一個(gè)鎖可以由多個(gè)條件隊(duì)列,當(dāng)某個(gè)條件滿足時(shí),只需要喚醒對(duì)應(yīng)的條件隊(duì)列即可,避免無(wú)效的競(jìng)爭(zhēng)。
// 此類(lèi)實(shí)現(xiàn)類(lèi)似阻塞隊(duì)列(ArrayBlockingQueue)class BoundedBuffer {final Lock lock = new ReentrantLock();final Condition notFull = lock.newCondition();final Condition notEmpty = lock.newCondition();final Object[] items = new Object[100];int putptr, takeptr, count;public void put(Object x) throws InterruptedException {lock.lock();try {while (count == items.length)notFull.await();items[putptr] = x;if (++putptr == items.length) putptr = 0;++count;notEmpty.signal();} finally {lock.unlock();}}public Object take() throws InterruptedException {lock.lock();try {while (count == 0)notEmpty.await();Object x = items[takeptr];if (++takeptr == items.length) takeptr = 0;--count;notFull.signal();return x;} finally {lock.unlock();}}}
BlockingQueue
BlockingQueue阻塞隊(duì)列實(shí)際上是一個(gè)生產(chǎn)者/消費(fèi)者模型,當(dāng)隊(duì)列長(zhǎng)度大于指定的最大值,生產(chǎn)線程就會(huì)被阻塞;反之當(dāng)隊(duì)列元素為空時(shí),消費(fèi)線程就會(huì)被阻塞;同時(shí)當(dāng)消費(fèi)成功時(shí),就會(huì)喚醒阻塞的生產(chǎn)者線程;生產(chǎn)成功就會(huì)喚醒消費(fèi)者線程;
內(nèi)部使用就是ReentrantLock + Condition來(lái)實(shí)現(xiàn)的,可以參照上面的示例。
CountDownLatch
稱之為倒計(jì)時(shí)器鎖,初始化指定數(shù)值,調(diào)用countDown可以對(duì)數(shù)值減一,當(dāng)數(shù)值減為0時(shí),就會(huì)喚醒所有因?yàn)檎{(diào)用await方法而阻塞的線程。
可以達(dá)到一組線程等待另外一組線程都完成任務(wù)的效果。
class Driver { // ...void main() throws InterruptedException {CountDownLatch startSignal = new CountDownLatch(1);CountDownLatch doneSignal = new CountDownLatch(N);for (int i = 0; i < N; ++i) // create and start threadsnew Thread(new Worker(startSignal, doneSignal)).start();doSomethingElse(); // don't let run yetstartSignal.countDown(); // let all threads proceeddoSomethingElse();doneSignal.await(); // wait for all to finish}}class Worker implements Runnable {private final CountDownLatch startSignal;private final CountDownLatch doneSignal;Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {this.startSignal = startSignal;this.doneSignal = doneSignal;}public void run() {try {startSignal.await();doWork();doneSignal.countDown();} catch (InterruptedException ex) {} // return;}void doWork() { ... }}
CyclicBarrier
稱之為同步屏障,它使得一組線程互相等待,直到到達(dá)某個(gè)公共屏障點(diǎn)。
初始化指定數(shù)值,調(diào)用await方法會(huì)使得線程阻塞,直到指定數(shù)量的線程都調(diào)用await方法時(shí),所有被阻塞的線程會(huì)被喚醒,繼續(xù)執(zhí)行。
與CountDownLatch的區(qū)別是,CountDownLatch是一組線程等待另外一組線程,而CyclicBarrier是一組線程之間相互等待。
Semaphore
稱之為信號(hào)量,與互斥鎖ReentrantLock用法類(lèi)似,區(qū)別就是Semaphore共享的資源是多個(gè),允許多個(gè)線程同時(shí)競(jìng)爭(zhēng)成功。
AQS原理
AQS 是 AbstractQueuedSynchronizer的縮寫(xiě),中文 抽象隊(duì)列同步器,是構(gòu)建各類(lèi)鎖和同步器的基礎(chǔ)實(shí)現(xiàn)。內(nèi)部維護(hù)了共享變量state (int類(lèi)型) 和 雙向隊(duì)列 (包含頭指針和尾指針)

并發(fā)問(wèn)題解決
原子性
Unsafe.compareAndSwapXXX 實(shí)現(xiàn)CAS更改 state 和 隊(duì)列指針 內(nèi)部依賴CPU提供的原子指令
可見(jiàn)性與有序性
volatile 修飾 state 與 隊(duì)列指針 (prev/next/head/tail)
線程阻塞與喚醒
Unsafe.park Unsafe.parkNanos Unsafe.unpark
Unsafe類(lèi)是在sun.misc包下,不屬于Java標(biāo)準(zhǔn)。提供了內(nèi)存管理、對(duì)象實(shí)例化、數(shù)組操作、CAS操作、線程掛起與恢復(fù)等功能,Unsafe類(lèi)提升了Java運(yùn)行效率,增強(qiáng)了Java語(yǔ)言底層的操作能力。很多Java的基礎(chǔ)類(lèi)庫(kù),包括一些被廣泛使用的高性能開(kāi)發(fā)庫(kù)都是基于Unsafe類(lèi)開(kāi)發(fā)的,比如Netty、Cassandra、Hadoop、Kafka等
AQS內(nèi)部有兩種模式:獨(dú)占模式和共享模式
AQS 的設(shè)計(jì)是基于模板方法的,使用者需要繼承 AQS 并重寫(xiě)指定的方法。不同的自定義同步器爭(zhēng)用共享資源的方式不同,比如可重入、公平性等都是子類(lèi)來(lái)實(shí)現(xiàn)。
自定義同步器在實(shí)現(xiàn)時(shí)只需要實(shí)現(xiàn)共享資源state的獲取與釋放方式即可,至于具體線程等待隊(duì)列的維護(hù)(如獲取資源失敗入隊(duì)/喚醒出隊(duì)等),由AQS內(nèi)部處理。
獨(dú)占模式
只有一個(gè)線程都能夠獲取到鎖
鎖釋放后需要喚醒后繼節(jié)點(diǎn)
搜索公眾號(hào)后端架構(gòu)師后臺(tái)回復(fù)“架構(gòu)整潔”,獲取一份驚喜禮包。
AQS提供的獨(dú)占模式相關(guān)的方法
// 獲取獨(dú)占鎖(線程阻塞直至獲取成功)public final void acquire(int)// 獲取獨(dú)占鎖,可被中斷public final void acquireInterruptibly(int)// 獲取獨(dú)占鎖,可被中斷 和 指定超時(shí)時(shí)間public final boolean tryAcquireNanos(int, long)// 釋放獨(dú)占鎖(釋放鎖后,將等待隊(duì)列中第一個(gè)等待節(jié)點(diǎn)喚醒 )public final boolean release(int)
AQS子類(lèi)需要實(shí)現(xiàn)的獨(dú)占模式相關(guān)的方法
// 嘗試獲取獨(dú)占鎖protected boolean tryAcquire(int)// 嘗試釋放獨(dú)占鎖protected boolean tryRelease(int)
獲取獨(dú)占鎖的流程


調(diào)用子類(lèi)tryAcquire嘗試獲取鎖,獲取成功,直接返回 通過(guò)自旋CAS將當(dāng)前線程封裝成節(jié)點(diǎn)加入隊(duì)列末尾 循環(huán)等待或嘗試tryAcquire獲取鎖 判斷前置節(jié)點(diǎn)如果為head,則嘗試獲取鎖 根據(jù)隊(duì)列中節(jié)點(diǎn)狀態(tài),決定是否需要阻塞當(dāng)前線程 tryAcquire獲取鎖成功后,將當(dāng)前節(jié)點(diǎn)設(shè)置為head 并 返回 如果當(dāng)前線程中斷或超時(shí),則執(zhí)行cancelAcquire 將當(dāng)前節(jié)點(diǎn)狀態(tài)置為CANCELED,并從隊(duì)列刪除 如果前置節(jié)點(diǎn)為Head,則將后置節(jié)點(diǎn)喚醒
釋放獨(dú)占鎖的流程

共享模式
多個(gè)線程都能夠獲取到鎖
鎖釋放后需要喚醒后繼節(jié)點(diǎn)
鎖獲取后如果還有資源需要喚醒后繼共享節(jié)點(diǎn)
AQS提供的共享模式相關(guān)的方法
// 獲取共享鎖(線程阻塞直至獲取成功)public final void acquireShared(int)// 獲取共享鎖,可被中斷public final acquireSharedInterruptibly(int)// 獲取共享鎖,可被中斷 和 指定超時(shí)時(shí)間public final tryAcquireSharedNanos(int, long)// 獲取共享鎖public final boolean releaseShared(int)
AQS子類(lèi)需要實(shí)現(xiàn)的共享模式相關(guān)的方法
// 嘗試獲取共享鎖protected int tryAcquireShared(int)// 嘗試釋放共享鎖protected boolean tryReleaseShared(int)
獲取共享鎖的流程

1.調(diào)用子類(lèi)tryAcquireShared嘗試獲取鎖,獲取成功,直接返回
2.通過(guò)自旋CAS將當(dāng)前線程封裝成節(jié)點(diǎn)加入隊(duì)列末尾
3.循環(huán)等待或嘗試tryAcquireShared獲取鎖
判斷前置節(jié)點(diǎn)如果為head,則嘗試獲取鎖 根據(jù)隊(duì)列中節(jié)點(diǎn)狀態(tài),決定是否需要阻塞當(dāng)前線程 tryAcquireShared獲取鎖成功后,將當(dāng)前節(jié)點(diǎn)設(shè)置為head 如果資源有剩余或者原先的head節(jié)點(diǎn)狀態(tài)為SIGNAL/PROPAGATE,則調(diào)用doReleaseShared 如果當(dāng)前head節(jié)點(diǎn)狀態(tài)為SIGNAL,喚醒后繼節(jié)點(diǎn) 如果當(dāng)前head節(jié)點(diǎn)狀態(tài)為ZERO,將head節(jié)點(diǎn)狀態(tài)置為PROPAGATE 如果當(dāng)前線程中斷或超時(shí),則執(zhí)行cancelAcquire 將當(dāng)前節(jié)點(diǎn)狀態(tài)置為CANCELED,并從隊(duì)列刪除 如果前置節(jié)點(diǎn)為Head,則將后置節(jié)點(diǎn)喚醒
釋放共享鎖的流程

等待隊(duì)列中節(jié)點(diǎn)的狀態(tài)變化

ReentrantLock示例
tryAcquire邏輯

tryRelease邏輯

最近給大家找了 百萬(wàn)級(jí)電商
資源,怎么領(lǐng)???
掃二維碼,加我微信,回復(fù):百萬(wàn)級(jí)電商
注意,不要亂回復(fù) 沒(méi)錯(cuò),不是機(jī)器人 記得一定要等待,等待才有好東西
