萬字長文,從5個方面說清楚AQS 隊列同步器
你知道的越多,不知道的就越多,業(yè)余的像一棵小草!
你來,我們一起精進!你不來,我和你的競爭對手一起精進!
編輯:業(yè)余草
http://yli11.cn/XBmj9
推薦:https://www.xttblog.com/?p=5165
寫在前面
提到并發(fā)編程,當然離不開ReentrantLock。但祥說清楚ReentrantLock,又必須先搞明白AQS,所以,今天我們就來詳細的聊聊這個AQS。
請耐心看,慢慢看,文章比較長~
AbstractQueuedSynchronizer 簡稱 「AQS」,可能我們幾乎不會直接去使用它,但它卻是 JUC 的核心基礎(chǔ)組件,支撐著 java 鎖和同步器的實現(xiàn),例如 ReentrantLock、ReentrantReadWriteLock、CountDownLatch,以及 Semaphore 等。大神 「Doug Lea」 在設(shè)計 JUC 包時希望能夠抽象一個基礎(chǔ)且通用的組件以支撐上層模塊的實現(xiàn),AQS 應(yīng)運而生。
「AQS」 本質(zhì)上是一個 FIFO 的雙向隊列,線程被包裝成結(jié)點的形式,基于自旋機制在隊列中等待獲取資源(這里的資源可以簡單理解為對象鎖)。AQS 在設(shè)計上實現(xiàn)了兩類隊列,即 同步隊列 和 條件隊列 ,其中同步隊列服務(wù)于線程阻塞等待獲取資源,而條件隊列則服務(wù)于線程因某個條件不滿足而進入等待狀態(tài)。條件隊列中的線程實際上已經(jīng)獲取到了資源,但是沒有能夠繼續(xù)執(zhí)行下去的條件,所以被打入條件隊列并釋放持有的資源,以讓渡其它線程執(zhí)行,如果未來某個時刻條件得以滿足,則該線程會被從條件隊列轉(zhuǎn)移到同步隊列,繼續(xù)參與競爭資源,以繼續(xù)向下執(zhí)行。
本文我們主要分析 AQS 的設(shè)計與實現(xiàn),包括 LockSupport 工具類、同步隊列、條件隊列,以及 AQS 資源獲取和釋放的通用過程。
「AQS」 采用模板方法設(shè)計模式,具體獲取資源和釋放資源的過程都交由子類實現(xiàn),對于這些方法的分析將留到后面分析具體子類的文章中再展開。

LockSupport 工具類
LockSupport 工具類是 JUC 的基礎(chǔ)組件,主要作用是用來阻塞和喚醒線程,底層依賴于 Unsafe類實現(xiàn)。LockSupport 主要定義了兩類方法:park 和 unpark,其中 park 方法用于阻塞當前線程,而 unpark 方法用于喚醒處于阻塞狀態(tài)的指定線程。
下面的示例演示了 park和unpark方法的基本使用:
Thread thread = new Thread(() -> {
System.out.println("Thread start: " + Thread.currentThread().getName());
LockSupport.park(); // 阻塞自己
System.out.println("Thread end: " + Thread.currentThread().getName());
});
thread.setName("A");
thread.start();
System.out.println("Main thread sleep 3 second: " + Thread.currentThread().getId());
TimeUnit.SECONDS.sleep(3);
LockSupport.unpark(thread); // 喚醒線程 A
線程 A 在啟動之后調(diào)用了 LockSupport#park方法將自己阻塞,主線程在休息 3 秒之后調(diào)用 LockSupport#unpark方法線程 A 喚醒。運行結(jié)果:
Thread start: A
Main thread sleep 3 second: 1
Thread end: A
LockSupport 針對 park 方法提供了多種實現(xiàn),如下:
public static void park()
public static void park(Object blocker)
public static void parkNanos(long nanos)
public static void parkNanos(Object blocker, long nanos)
public static void parkUntil(long deadline)
public static void parkUntil(Object blocker, long deadline)
由方法命名不難看出,parkNanos 和 parkUntil 都屬于 park 方法的超時版本,區(qū)別在于 parkNanos 方法接收一個納秒單位的時間值,用于指定阻塞的時間長度,例如當設(shè)置 nanos=3000000000 時,線程將阻塞 3 秒后蘇醒,而 parkUntil 方法則接收一個時間戳,參數(shù) deadline 用于指定阻塞的到期時間。
所有的 park 方法都提供了包含 Object blocker 參數(shù)的重載版本,參數(shù) blocker 指代導(dǎo)致當前線程阻塞等待的鎖對象,方便問題排查和系統(tǒng)監(jiān)控,而在 LockSupport 最開始被設(shè)計時卻忽視了這一點,導(dǎo)致在線程 dump 時無法提供阻塞對象的相關(guān)信息,這一點在 java 6 中得以改進。實際開發(fā)中如果使用到了 LockSupport 工具類,推薦使用帶 blocker 參數(shù)的版本。
下面以 LockSupport#park(java.lang.Object) 方法為例來看一下具體的實現(xiàn),如下:
public static void park(Object blocker) {
// 獲取當前線程對象
Thread t = Thread.currentThread();
// 記錄當前線程阻塞等待的鎖對象(設(shè)置線程對象的 parkBlocker 為參數(shù)指定的 blocker 對象)
setBlocker(t, blocker);
// 阻塞線程
UNSAFE.park(false, 0L);
// 線程恢復(fù)運行,清除 parkBlocker 參數(shù)記錄的鎖對象
setBlocker(t, null);
}
具體實現(xiàn)比較簡單,阻塞線程的操作依賴于 Unsafe 類實現(xiàn)。上述方法會調(diào)用 LockSupport#setBlocker方法基于 Unsafe 類將參數(shù)指定的 blocker 對象記錄到當前線程對象的 Thread#parkBlocker字段中,然后進入阻塞狀態(tài),并在被喚醒之后清空對應(yīng)的 Thread#parkBlocker 字段。
當一個線程調(diào)用 park 方法進入阻塞狀態(tài)之后,會在滿足以下 3 個條件之一時從阻塞狀態(tài)中蘇醒:
其它線程調(diào)用 unpark 方法喚醒當前線程。 其它線程中斷了當前線程的阻塞狀態(tài)。 方法 park 因為一些不合邏輯的原因退出。
線程在從 park 方法中返回時并不會攜帶具體的返回原因,調(diào)用者需要自行檢測,例如再次檢查之前調(diào)用 park 方法的條件是否仍然滿足以予以推測。
方法 LockSupport#unpark的實現(xiàn)同樣基于 Unsafe 類實現(xiàn),不同于 park 的多版本實現(xiàn),LockSupport 針對 unpark 方法僅提供了單一實現(xiàn),如下:
public static void unpark(Thread thread) {
if (thread != null) {
UNSAFE.unpark(thread);
}
}
需要注意的一點是,如果事先針對某個線程調(diào)用了 unpark 方法,則該線程繼續(xù)調(diào)用 park 方法并不會進入阻塞狀態(tài),而是會立即返回,并且 park 方法是不可重入的。
同步隊列
同步隊列的作用在于管理競爭資源的線程,當一個線程競爭資源失敗會被記錄到同步隊列的末端,并以自旋的方式循環(huán)檢查能夠成功獲取到資源。AQS 的同步隊列基于 「CLH」(Craig, Landin, and Hagersten) 鎖思想進行設(shè)計和實現(xiàn)。CLH 鎖是一種基于鏈表的可擴展、高性能,且具備公平性的自旋鎖。線程以鏈表結(jié)點的形式進行組織,在等待期間相互獨立的執(zhí)行自旋,并不斷輪詢前驅(qū)結(jié)點的狀態(tài),如果發(fā)現(xiàn)前驅(qū)結(jié)點上的線程釋放了資源則嘗試獲取。
CLH 鎖是 AQS 隊列同步器實現(xiàn)的基礎(chǔ),AQS 以內(nèi)部類 Node 的形式定義了同步隊列結(jié)點,包括下一小節(jié)介紹的條件隊列,同樣以 Node 定義結(jié)點。Node 的字段定義如下:
static final class Node {
/** 模式定義 */
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
/** 線程狀態(tài) */
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
/** 線程等待狀態(tài) */
volatile int waitStatus;
/** 前驅(qū)結(jié)點 */
volatile Node prev;
/** 后置結(jié)點 */
volatile Node next;
/** 持有的線程對象 */
volatile Thread thread;
/** 對于獨占模式而言,指向下一個處于 CONDITION 等待狀態(tài)的結(jié)點;對于共享模式而言,則為 SHARED 結(jié)點 */
Node nextWaiter;
// ... 省略方法定義
}
由上述字段定義可以看出,位于 CLH 鏈表中的線程以 2 種模式在等待資源,即 SHARED 和 EXCLUSIVE,其中 SHARED 表示共享模式,而 EXCLUSIVE 表示獨占模式。共享模式與獨占模式的主要區(qū)別在于,同一時刻獨占模式只能有一個線程獲取到資源,而共享模式在同一時刻可以有多個線程獲取到資源。典型的場景就是讀寫鎖,讀操作可以有多個線程同時獲取到讀鎖資源,而寫操作同一時刻只能有一個線程獲取到寫鎖資源,其它線程在嘗試獲取資源時都會被阻塞。
AQS 的 CLH 鎖為處于 CLH 鏈表中的線程定義了 4 種狀態(tài),包括 CANCELLED、SIGNAL、CONDITION,以及 PROPAGATE,并以 Node#waitStatus字段進行記錄。這 4 種狀態(tài)的含義分別為:
CANCELLED:表示當前線程處于取消狀態(tài),一般是因為等待超時或者被中斷,處于取消狀態(tài)的線程不會再參與到競爭中,并一直保持該狀態(tài)。__SIGNAL__:表示當前結(jié)點后繼結(jié)點上的線程正在等待被喚醒,如果當前線程釋放了持有的資源或者被取消,需要喚醒后繼結(jié)點上的線程。CONDITION:表示當前線程正在等待某個條件,當某個線程在調(diào)用了 Condition#signal 方法后,當前結(jié)點將會被從條件隊列轉(zhuǎn)移到同步隊列中,參與競爭資源。PROPAGATE:處于該狀態(tài)的線程在釋放共享資源,或接收到釋放共享資源的信號時需要通知后繼結(jié)點,以防止通知丟失。
一個結(jié)點在被創(chuàng)建時,字段 Node#waitStatus 的初始值為 0,表示結(jié)點上的線程不位于上述任何狀態(tài)。
Node 類在方法定義上除了基本的構(gòu)造方法外,僅定義了 Node#isShared 和 Node#predecessor 兩個方法,其中前者用于返回當前結(jié)點是否以共享模式在等待,后者用于返回當前結(jié)點的前驅(qū)結(jié)點。
介紹完了隊列結(jié)點的定義,那么同步隊列具體如何實現(xiàn)呢?這還需要依賴于 AbstractQueuedSynchronizer 類中的兩個字段定義,即:
private transient volatile Node head;
private transient volatile Node tail;
其中 head 表示同步隊列的頭結(jié)點,而 tail 則表示同步隊列的尾結(jié)點,具體組織形式如下圖:

當調(diào)用 AQS 的 acquire 方法獲取資源時,如果資源不足則當前線程會被封裝成 Node 結(jié)點添加到同步隊列的末端,頭結(jié)點 head 用于記錄當前正在持有資源的線程結(jié)點,而 head 的后繼結(jié)點就是下一個將要被調(diào)度的線程結(jié)點,當 release 方法被調(diào)用時,該結(jié)點上的線程將被喚醒,繼續(xù)獲取資源。
關(guān)于同步隊列結(jié)點入隊列、出隊列的實現(xiàn)先不展開,留到后面分析 AQS 資源獲取與釋放的過程時一并分析。
條件隊列
除了上面介紹的同步隊列,在 AQS 中還定義了一個條件隊列。內(nèi)部類 ConditionObject 實現(xiàn)了條件隊列的組織形式,包含一個起始結(jié)點(firstWaiter)和一個末尾結(jié)點(lastWaiter),并同樣以上面介紹的 Node 類定義結(jié)點,如下:
public class ConditionObject implements Condition, Serializable {
/** 指向條件隊列中的起始結(jié)點 */
private transient Node firstWaiter;
/** 指向條件隊列的末尾結(jié)點 */
private transient Node lastWaiter;
// ... 省略方法定義
}
前面在分析 Node 內(nèi)部類的時候,可以看到 Node 類還定義了一個 Node#nextWaiter字段,用于指向條件隊列中的下一個等待結(jié)點。由此我們可以描繪出條件隊列的組織形式如下:

ConditionObject類實現(xiàn)了 Condition 接口,該接口定義了與 Lock 鎖相關(guān)的線程通信方法,主要分為 await 和 signal 兩大類。
當線程調(diào)用 await 方法時,該線程會被包裝成結(jié)點添加到條件隊列的末端,并釋放持有的資源。當條件得以滿足時,方法 signal 可以將條件隊列中的一個或全部的線程結(jié)點從條件隊列轉(zhuǎn)移到同步隊列以參與競爭資源。應(yīng)用可以創(chuàng)建多個 ConditionObject 對象,每個對象都對應(yīng)一個條件隊列,對于同一個條件隊列而言,其中的線程所等待的條件是相同的。
Condition 接口的定義如下:
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
等待await
下面來分析一下 ConditionObject 類針對 Condition 接口方法的實現(xiàn),首先來看一下 ConditionObject#await方法,該方法用于將當前線程添加到條件隊列中進行等待,同時支持響應(yīng)中斷。方法實現(xiàn)如下:
public final void await() throws InterruptedException {
if (Thread.interrupted()) {
// 立即響應(yīng)中斷
throw new InterruptedException();
}
// 將當前線程添加到等待隊列末尾,等待狀態(tài)為 CONDITION
Node node = this.addConditionWaiter();
// 釋放當前線程持有的資源
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // 如果當前結(jié)點位于條件隊列中,則循環(huán)
// 阻塞當前線程
LockSupport.park(this);
// 如果線程在阻塞期間被中斷,則退出循環(huán)
if ((interruptMode = this.checkInterruptWhileWaiting(node)) != 0) {
break;
}
}
// 如果在同步隊列中等待期間被中斷,且之前的中斷狀態(tài)不為 THROW_IE
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
interruptMode = REINTERRUPT;
}
if (node.nextWaiter != null) {
// 清除條件隊列中所有狀態(tài)不為 CONDITION 的結(jié)點
this.unlinkCancelledWaiters();
}
// 如果等待期間被中斷,則響應(yīng)中斷
if (interruptMode != 0) {
this.reportInterruptAfterWait(interruptMode);
}
}
因為 ConditionObject#await方法支持響應(yīng)中斷,所以在方法一開始會先檢查一下當前線程是否被中斷,如果是則拋出 InterruptedException 異常,否則繼續(xù)將當前線程加入到條件隊列中進行等待。整體執(zhí)行流程可以概括為:
將當前線程加入到條件隊列末端,并設(shè)置等待狀態(tài)為 CONDITION; 釋放當前線程所持有的資源,避免饑餓或死鎖; 基于自旋機制在條件隊列中等待,直到被其它線程轉(zhuǎn)移到同步隊列,或者等待期間被中斷; 如果等待期間被中斷,則響應(yīng)中斷。
ConditionObject 定義了兩種中斷響應(yīng)方式,即:REINTERRUPT和 THROW_IE。如果是 REINTERRUPT,則線程會調(diào)用 Thread#interrupt 方法中斷自己;如果是 THROW_IE,則線程會直接拋出 InterruptedException 異常。
下面繼續(xù)分析一下支撐 ConditionObject#await 運行的其它幾個方法,包括 addConditionWaiter、fullyRelease、isOnSyncQueue,以及 unlinkCancelledWaiters。
方法 ConditionObject#addConditionWaiter 用于將當前線程包裝成 Node 結(jié)點對象添加到條件隊列的末端,期間會執(zhí)行清除條件隊列中處于取消狀態(tài)(等待狀態(tài)不為 CONDITION)的線程結(jié)點。方法實現(xiàn)如下:
private Node addConditionWaiter() {
// 獲取條件隊列的末尾結(jié)點
Node t = lastWaiter;
// 如果末尾結(jié)點狀態(tài)不為 CONDITION,表示對應(yīng)的線程已經(jīng)取消了等待,需要執(zhí)行清理操作
if (t != null && t.waitStatus != Node.CONDITION) {
// 清除條件隊列中所有狀態(tài)不為 CONDITION 的結(jié)點
this.unlinkCancelledWaiters();
t = lastWaiter;
}
// 構(gòu)建當前線程對應(yīng)的 Node 結(jié)點,等待狀態(tài)為 CONDITION,并添加到條件隊列末尾
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null) {
firstWaiter = node;
} else {
t.nextWaiter = node;
}
lastWaiter = node;
return node;
}
將當前線程對象添加到條件隊列中的過程本質(zhì)上是一個簡單的鏈表插入操作,在執(zhí)行插入操作之前,上述方法會先對條件隊列執(zhí)行一遍清理操作,清除那些狀態(tài)不為 CONDITION 的結(jié)點。具體實現(xiàn)位于 ConditionObject#unlinkCancelledWaiters方法中:
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null; // 記錄上一個不被刪除的結(jié)點
while (t != null) {
Node next = t.nextWaiter;
// 如果結(jié)點上的線程等待狀態(tài)不為 CONDITION,則刪除對應(yīng)結(jié)點
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null) {
firstWaiter = next;
} else {
trail.nextWaiter = next;
}
if (next == null) {
lastWaiter = trail;
}
} else {
trail = t;
}
t = next;
}
}
方法 AbstractQueuedSynchronizer#fullyRelease 用于釋放當前線程持有的資源,這也是非常容易理解的,畢竟當前線程即將進入等待狀態(tài),如果持有的資源不被釋放,將可能導(dǎo)致程序最終被餓死,或者死鎖。方法的實現(xiàn)如下:
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 獲取當前線程的同步狀態(tài),可以理解為持有的資源數(shù)量
int savedState = this.getState();
// 嘗試釋放當前線程持有的資源
if (this.release(savedState)) {
failed = false;
return savedState;
} else {
// 釋放資源失敗
throw new IllegalMonitorStateException();
}
} finally {
if (failed) {
// 如果釋放資源失敗,則取消當前線程
node.waitStatus = Node.CANCELLED;
}
}
}
如果資源釋放失敗,則上述方法會將當前線程的狀態(tài)設(shè)置為 CANCELLED,以退出等待狀態(tài)。
方法 AbstractQueuedSynchronizer#isOnSyncQueue 用于檢測當前結(jié)點是否位于同步隊列中,方法實現(xiàn)如下:
final boolean isOnSyncQueue(Node node) {
// 如果結(jié)點位于等待隊列,或是頭結(jié)點則返回 false
if (node.waitStatus == Node.CONDITION || node.prev == null) {
return false;
}
// If has successor, it must be on queue
if (node.next != null) {
return true;
}
/*
* node.prev can be non-null, but not yet on queue because the CAS to place it on queue can fail.
* So we have to traverse from tail to make sure it actually made it. It will always be near the tail in calls to this method,
* and unless the CAS failed (which is unlikely), it will be there, so we hardly ever traverse much.
*/
// 從后往前檢測目標結(jié)點是否位于同步隊列中
return this.findNodeFromTail(node);
}
如果一個線程所等待的條件被滿足,則觸發(fā)條件滿足的線程會將等待該條件的一個或全部線程結(jié)點從條件隊列轉(zhuǎn)移到同步隊列,此時,這些線程將從 ConditionObject#await 方法中退出,以參與競爭資源。
方法 ConditionObject#awaitNanos、ConditionObject#awaitUntil 和 ConditionObject#await(long, TimeUnit)在上面介紹的 ConditionObject#await 方法的基礎(chǔ)上引入了超時機制,當一個線程在條件隊列中等待的時間超過設(shè)定值時,線程結(jié)點將被從條件隊列轉(zhuǎn)移到同步隊列,參與競爭資源。其它執(zhí)行過程與 ConditionObject#await 方法相同,故不再展開。
下面來分析一下 ConditionObject#awaitUninterruptibly 方法,由方法命名可以看出該方法相對于 ConditionObject#await 方法的區(qū)別在于在等待期間不響應(yīng)中斷。方法實現(xiàn)如下:
public final void awaitUninterruptibly() {
// 將當前線程添加到等待隊列末尾,等待狀態(tài)為 CONDITION
Node node = this.addConditionWaiter();
// 釋放當前線程持有的資源
int savedState = fullyRelease(node);
boolean interrupted = false;
// 如果當前結(jié)點位于條件隊列中,則循環(huán)
while (!isOnSyncQueue(node)) {
// 阻塞當前線程
LockSupport.park(this);
if (Thread.interrupted()) {
// 標識線程等待期間被中斷,但不立即響應(yīng)
interrupted = true;
}
}
// 自旋獲取資源,返回 true 則說明等待期間被中斷過
if (acquireQueued(node, savedState) || interrupted) {
// 響應(yīng)中斷
selfInterrupt();
}
}
如果線程在等待期間被中斷,則上述方法會用一個字段進行記錄,并在最后集中處理,而不會因為中斷而退出等待狀態(tài)。
通知signal
調(diào)用 await 方法會將線程對象自身加入到條件隊列中進行等待,而 signal 通知方法則用于將一個或全部的等待線程從條件隊列轉(zhuǎn)移到同步隊列,以參與競爭資源。ConditionObject 定義了兩個通知方法:signal 和signalAll,前者用于將條件隊列的頭結(jié)點(也就是等待時間最長的結(jié)點)從條件隊列轉(zhuǎn)移到同步隊列,后者用于將條件隊列中所有處于等待狀態(tài)的結(jié)點從條件隊列轉(zhuǎn)移到同步隊列。下面分別來分析一下這兩個方法的實現(xiàn)。
方法ConditionObject#signal 的實現(xiàn)如下:
public final void signal() {
// 先檢測當前線程是否獲取到了鎖,否則不允許繼續(xù)執(zhí)行
if (!isHeldExclusively()) {
throw new IllegalMonitorStateException();
}
// 獲取條件隊列頭結(jié)點,即等待時間最長的結(jié)點
Node first = firstWaiter;
if (first != null) {
// 將頭結(jié)點從條件隊列轉(zhuǎn)移到同步隊列,參與競爭資源
this.doSignal(first);
}
}
調(diào)用ConditionObject#signal方法的線程必須位于臨界區(qū),也就是必須先持有獨占鎖,所以上述方法一開始會對這一條件進行校驗,方法 AbstractQueuedSynchronizer#isHeldExclusively是一個模板方法,交由子類來實現(xiàn)。如果滿足執(zhí)行條件,則上述方法會調(diào)用 ConditionObject#doSignal 方法將條件隊列的頭結(jié)點從條件隊列轉(zhuǎn)移到同步隊列。
private void doSignal(Node first) {
// 從前往后遍歷,直到遇到第一個不為 null 的結(jié)點,并將其從條件隊列轉(zhuǎn)移到同步隊列
do {
if ((firstWaiter = first.nextWaiter) == null) {
lastWaiter = null;
}
first.nextWaiter = null;
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
// AbstractQueuedSynchronizer#transferForSignal
final boolean transferForSignal(Node node) {
// 更新當前結(jié)點的等待狀態(tài):CONDITION -> 0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
// 更新失敗,說明對應(yīng)的結(jié)點上的線程已經(jīng)被取消
return false;
}
/*
* Splice onto queue and try to set waitStatus of predecessor to indicate that thread is (probably) waiting.
* If cancelled or attempt to set waitStatus fails, wake up to resync (in which case the waitStatus can be transiently and harmlessly wrong).
*/
// 將結(jié)點添加到同步隊列末端,并返回該結(jié)點的前驅(qū)結(jié)點
Node p = this.enq(node);
int ws = p.waitStatus;
// 如果前驅(qū)結(jié)點被取消,或者設(shè)置前驅(qū)結(jié)點的狀態(tài)為 SIGNAL 失敗,則喚醒當前結(jié)點上的線程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) {
LockSupport.unpark(node.thread);
}
return true;
}
方法 ConditionObject#doSignal 會從前往后遍歷條件隊列,尋找第一個不為 null 的結(jié)點,并應(yīng)用 AbstractQueuedSynchronizer#transferForSignal 方法嘗試將其從條件隊列轉(zhuǎn)移到同步隊列。
在入同步隊列之前,方法 AbstractQueuedSynchronizer#transferForSignal 會基于 CAS 機制清除結(jié)點的 CONDITION 狀態(tài),如果清除失敗則說明該結(jié)點上的線程已被取消,此時 ConditionObject#doSignal 方法會繼續(xù)尋找下一個可以被喚醒的結(jié)點。如果清除結(jié)點狀態(tài)成功,則接下來會將該結(jié)點添加到同步隊列的末端,同時依據(jù)前驅(qū)結(jié)點的狀態(tài)決定是否喚醒當前結(jié)點上的線程。
繼續(xù)來看 ConditionObject#signalAll 方法的實現(xiàn),相對于上面介紹的 ConditionObject#signal 方法,該方法的特點在于它會喚醒條件隊列中所有不為 null 的等待結(jié)點。方法實現(xiàn)如下:
public final void signalAll() {
if (!isHeldExclusively()) {
// 先檢測當前線程是否獲取到了鎖,否則不允許繼續(xù)執(zhí)行
throw new IllegalMonitorStateException();
}
// 獲取條件隊列頭結(jié)點
Node first = firstWaiter;
if (first != null) {
// 將所有結(jié)點從條件隊列轉(zhuǎn)移到同步隊列,參與競爭資源
this.doSignalAll(first);
}
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
實際上理解了 ConditionObject#doSignal 的運行機制,再理解 ConditionObject#signalAll 的運行機制也是水到渠成的事情。
資源的獲取與釋放
前面的小節(jié)我們分析了 LockSupport 工具類,以及 AQS 同步隊列和條件隊列的設(shè)計與實現(xiàn),這些都是支撐 AQS 運行的基礎(chǔ)組件,本小節(jié)我們將正式開始分析 AQS 的實現(xiàn)機制。
AQS 對應(yīng)的 AbstractQueuedSynchronizer實現(xiàn)類,在屬性定義上主要包含 4 個字段(如下),其中 exclusiveOwnerThread 由父類 AbstractOwnableSynchronizer 繼承而來,用于記錄當前持有獨占鎖的線程對象,而 head 和 tail 字段分別指向同步隊列的頭結(jié)點和尾結(jié)點:
private transient Thread exclusiveOwnerThread;
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
字段 state 用于描述同步狀態(tài),對于不同的實現(xiàn)類來說具備不同的用途:
對于 ReentrantLock 而言,表示當前線程獲取鎖的重入次數(shù)。 對于 ReentrantReadWriteLock 而言,高 16 位表示獲取讀鎖的重入次數(shù),低 16 位表示獲取寫鎖的重入次數(shù)。 對于 Semaphore 而言,表示當前可用的信號個數(shù)。 對于 CountDownLatch 而言,表示計數(shù)器當前的值。具體細節(jié)我們將在后面分析相應(yīng)組件實現(xiàn)機制的文章中再展開說明。
AbstractQueuedSynchronizer 是一個抽象類,在方法設(shè)計上引入了模板方法設(shè)計模式,下面的代碼塊中列出了所有需要子類依據(jù)自身運行機制針對性實現(xiàn)的模板方法:
protected boolean tryAcquire(int arg)
protected boolean tryRelease(int arg)
protected int tryAcquireShared(int arg)
protected boolean tryReleaseShared(int arg)
protected boolean isHeldExclusively()
這里先簡單說明一下各個方法的作用,具體實現(xiàn)留到后面分析各個基于 AQS 實現(xiàn)組件的文章中再進一步分析:
tryAcquire :嘗試以獨占模式獲取資源,如果獲取成功則返回 true,否則返回 false。 tryRelease :嘗試以獨占模式釋放資源,如果釋放成功則返回 true,否則返回 false。 tryAcquireShared :嘗試以共享模式獲取資源,如果返回正數(shù)則說明獲取成功,且還有可用的剩余資源;如果返回 0 則說明獲取成功,但是沒有可用的剩余資源;如果返回負數(shù)則說明獲取資源失敗。 tryReleaseShared :嘗試以共享模式釋放資源,如果釋放成功則返回 true,否則返回 false。 isHeldExclusively :判斷當前線程是否正在獨占資源,如果是則返回 true,否則返回 false。 AbstractQueuedSynchronizer 中的方法實現(xiàn)按照功能劃分可以分為兩大類,即獲取資源(acquire)和釋放資源(release),同時區(qū)分獨占模式和共享模式。下面的小節(jié)中主要對獲取和釋放資源的方法區(qū)分獨占模式和共享模式進行分析。
獨占獲取資源
針對獨占模式獲取資源,AbstractQueuedSynchronizer 定義了多個版本的 acquire 方法實現(xiàn),包括:acquire、acquireInterruptibly,以及 tryAcquireNanos,其中 acquireInterruptibly是 acquire 的中斷版本,在等待獲取資源期間支持響應(yīng)中斷請求,tryAcquireNanos 除了支持響應(yīng)中斷以外,還引入了超時等待機制。
下面主要分析一下 AbstractQueuedSynchronizer#acquire 的實現(xiàn),理解了該方法的實現(xiàn)機制,也就自然而然理解了另外兩個版本的實現(xiàn)機制。方法 AbstractQueuedSynchronizer#acquire的實現(xiàn)如下:
public final void acquire(int arg) {
if (!this.tryAcquire(arg) // 嘗試獲取資源
// 如果獲取資源失敗,則將當前線程加入到同步隊列的末端(獨占模式),并基于自旋機制等待獲取資源
&& this.acquireQueued(this.addWaiter(Node.EXCLUSIVE), arg)) {
// 等待獲取資源期間曾被中斷過,在獲取資源成功之后再響應(yīng)中斷
selfInterrupt();
}
}
方法 AbstractQueuedSynchronizer#tryAcquire 的功能在前面已經(jīng)簡單介紹過了,用于嘗試獲取資源,如果獲取資源失敗則會將當前線程添加到同步隊列中,基于自旋機制等待獲取資源。
方法AbstractQueuedSynchronizer#addWaiter 用于將當前線程對象封裝成結(jié)點添加到同步隊列末端,并最終返回線程結(jié)點對象:
private Node addWaiter(Node mode) {
// 為當前線程創(chuàng)建結(jié)點對象
Node node = new Node(Thread.currentThread(), mode);
// 基于 CAS 機制嘗試快速添加結(jié)點到同步隊列末端
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (this.compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 快速添加失敗,繼續(xù)嘗試將該結(jié)點添加到同步隊列末端,如果同步隊列未被初始化則執(zhí)行初始化
this.enq(node);
// 返回當前線程對應(yīng)的結(jié)點對象
return node;
}
上述方法在添加結(jié)點的時候,如果同步隊列已經(jīng)存在,則嘗試基于 CAS 操作快速將當前結(jié)點添加到同步隊列末端。如果添加失敗,或者隊列不存在,則需要再次調(diào)用 AbstractQueuedSynchronizer#enq方法執(zhí)行添加操作,該方法在判斷隊列不存在時會初始化同步隊列,然后基于 CAS 機制嘗試往同步隊列末端插入線程結(jié)點。方法實現(xiàn)如下:
private Node enq(final Node node) {
for (; ; ) {
// 獲取同步隊列末尾結(jié)點
Node t = tail;
// 如果結(jié)點不存在,則初始化
if (t == null) { // Must initialize
if (this.compareAndSetHead(new Node())) {
tail = head;
}
} else {
// 往末尾追加
node.prev = t;
if (this.compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
完成了結(jié)點的入同步隊列操作,接下來會調(diào)用 AbstractQueuedSynchronizer#acquireQueued方法基于自旋機制等待獲取資源,在等待期間并不會響應(yīng)中斷,而是記錄中斷標志,等待獲取資源成功后延遲響應(yīng)。方法實現(xiàn)如下:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false; // 標記自旋過程中是否被中斷
// 基于自旋機制等待獲取資源
for (; ; ) {
// 獲取前驅(qū)結(jié)點
final Node p = node.predecessor();
// 如果前驅(qū)結(jié)點為頭結(jié)點,說明當前結(jié)點是排在同步隊列最前面,可以嘗試獲取資源
if (p == head && this.tryAcquire(arg)) {
// 獲取資源成功,更新頭結(jié)點
this.setHead(node); // 頭結(jié)點一般記錄持有資源的線程結(jié)點
p.next = null; // help GC
failed = false;
return interrupted; // 自旋過程中是否被中斷
}
// 如果還未輪到當前結(jié)點,或者獲取資源失敗
if (shouldParkAfterFailedAcquire(p, node) // 判斷是否需要阻塞當前線程
&& this.parkAndCheckInterrupt()) { // 如果需要,則進入阻塞狀態(tài),并在蘇醒時檢查中斷狀態(tài)
// 標識等待期間被中斷
interrupted = true;
}
}
} finally {
// 嘗試獲取資源失敗,說明執(zhí)行異常,取消當前結(jié)點獲取資源的進程
if (failed) {
this.cancelAcquire(node);
}
}
}
上述方法會循環(huán)檢測當前結(jié)點是否已經(jīng)排在同步隊列的最前端,如果是則調(diào)用 AbstractQueuedSynchronizer#tryAcquire方法嘗試獲取資源,具體獲取資源的過程由子類實現(xiàn)。自旋期間如果還未輪到調(diào)度當前線程結(jié)點,或者嘗試獲取資源失敗,則會調(diào)用 AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire方法檢測是否需要阻塞當前線程,具體判定的過程依賴于前驅(qū)結(jié)點的等待狀態(tài),實現(xiàn)如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 獲取前驅(qū)結(jié)點狀態(tài)
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) {
// 前驅(qū)結(jié)點狀態(tài)為 SIGNAL,說明當前結(jié)點需要被阻塞
return true;
}
if (ws > 0) {
// 前驅(qū)結(jié)點處于取消狀態(tài),則一直往前尋找處于等待狀態(tài)的結(jié)點,并排在其后面
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 前驅(qū)結(jié)點的狀態(tài)為 0 或 PROPAGATE,但是當前結(jié)點需要一個被喚醒的信號,
* 所以基于 CAS 將前驅(qū)結(jié)點等待狀態(tài)設(shè)置為 SIGNAL,在阻塞之前,調(diào)用者需要重試以再次確認不能獲取到資源。
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
上述方法首先會獲取前驅(qū)結(jié)點的等待狀態(tài),并依據(jù)具體的狀態(tài)值進行決策:
如果前驅(qū)結(jié)點等待狀態(tài)為 SIGNAL,則說明當前結(jié)點需要被阻塞,所以直接返回 true;否則,如果前驅(qū)結(jié)點的等待狀態(tài)大于 0(即處于取消狀態(tài)),則一直往前尋找未被取消的結(jié)點,并將當前結(jié)點排在其后,這種情況下直接返回 false,再次嘗試獲取一次資源;否則,前驅(qū)結(jié)點的狀態(tài)為 0 或 PROPAGATE(不可能為 CONDITION 狀態(tài),因為當前處于同步隊列),因為當前結(jié)點需要一個喚醒信號,所以修改前驅(qū)結(jié)點的狀態(tài)為 SIGNAL,這種情況下同樣返回 false,以再次確認不能獲取到資源。如果上述檢查返回 true,則接下來會調(diào)用 AbstractQueuedSynchronizer#parkAndCheckInterrupt 方法,基于 LockSupport 工具阻塞當前線程,并在線程蘇醒時檢查中斷狀態(tài)。如果期間被中斷過則記錄中斷標記,而不立即響應(yīng),直到成功獲取到資源,或者期間發(fā)生異常退出自旋。方法 AbstractQueuedSynchronizer#acquireQueued最終會返回這一中斷標記,并在外圍進行響應(yīng)。
如果在自旋期間發(fā)生異常,則上述方法會執(zhí)行 AbstractQueuedSynchronizer#cancelAcquire 以取消當前結(jié)點等待獲取資源的進程,包括設(shè)置結(jié)點的等待狀態(tài)為 CANCELLED,喚醒后繼結(jié)點等。
獨占釋放資源
針對獨占模式釋放資源,AbstractQueuedSynchronizer 定義了單一實現(xiàn),即 AbstractQueuedSynchronizer#release方法,該方法本質(zhì)上是一個調(diào)度的過程,具體釋放資源的操作交由 tryRelease 方法完成,由子類實現(xiàn)。
方法AbstractQueuedSynchronizer#release實現(xiàn)如下:
public final boolean release(int arg) {
// 嘗試釋放資源
if (this.tryRelease(arg)) {
Node h = head;
// 如果釋放資源成功,則嘗試喚醒后繼結(jié)點
if (h != null && h.waitStatus != 0) {
this.unparkSuccessor(h);
}
return true;
}
return false;
}
如果 tryRelease 釋放資源成功,則上述方法會嘗試喚醒同步隊列中由后往前距離頭結(jié)點最近的一個結(jié)點上的線程。方法 AbstractQueuedSynchronizer#unparkSuccessor 的實現(xiàn)如下:
private void unparkSuccessor(Node node) {
// 獲取當前結(jié)點狀態(tài)
int ws = node.waitStatus;
if (ws < 0) {
// 如果當前結(jié)點未被取消,則基于 CAS 更新結(jié)點等待狀態(tài)為 0
compareAndSetWaitStatus(node, ws, 0);
}
/*
* Thread to unpark is held in successor, which is normally just the next node.
* But if cancelled or apparently null, traverse backwards from tail to find the actual non-cancelled successor.
*/
Node s = node.next; // 獲取后繼結(jié)點
// 如果后繼結(jié)點為 null,或者被取消
if (s == null || s.waitStatus > 0) {
s = null;
// 從后往前尋找距離當前結(jié)點最近的一個未被取消的線程結(jié)點
for (Node t = tail; t != null && t != node; t = t.prev) {
if (t.waitStatus <= 0) {
s = t;
}
}
}
// 喚醒結(jié)點上的線程
if (s != null) {
LockSupport.unpark(s.thread);
}
}
選舉待喚醒線程結(jié)點的過程被設(shè)計成從后往前遍歷,尋找距離當前結(jié)點最近的未被取消的結(jié)點,并調(diào)用 LockSupport 工具類喚醒結(jié)點上的線程。
那 為什么要設(shè)計成從后往前遍歷同步隊列呢 ?在 Doug Lea 大神的論文 The java.util.concurrent Synchronizer Framework 中給出了答案,摘錄如下:
An AbstractQueuedSynchronizer queue node contains a next link to its successor. But because there are no applicable techniques for lock-free atomic insertion of double-linked listnodes using compareAndSet, this link is not atomically set as part of insertion; it is simply assigned: pred.next = node; after the i
nsertion. This is reflected in all usages. The next link is treated only as an optimized path.
If a node’s successor does not appear to exist (or appears to be cancelled) via its next field, it is a
lways possible to start at the tail of the list and traverse backwards using the pred field to accurately check if therereally is one.
也就說對于雙向鏈表而言,沒有不加鎖的原子手段可以保證構(gòu)造雙向指針的線程安全性?;氐酱a中,我們回顧一下往同步隊列中添加結(jié)點的執(zhí)行過程,如下(其中 pred 是末尾結(jié)點,而 node 是待插入的結(jié)點):
node.prev = pred;
if (this.compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
上述方法會將 node 結(jié)點的 prev 指針指向 pred 結(jié)點,而將 pred 的 next 指針指向 node 結(jié)點的過程需要建立在基于 CAS 成功將 node 設(shè)置為末端結(jié)點的基礎(chǔ)之上,如果這一過程失敗則 next 指針將會斷掉,而選擇從后往前遍歷則始終能夠保證遍歷到頭結(jié)點。
共享獲取資源 針對共享模式獲取資源,AbstractQueuedSynchronizer 同樣定義了多個版本的 acquire 方法實現(xiàn),包括:acquireShared、acquireSharedInterruptibly,以及 tryAcquireSharedNanos,其中 acquireSharedInterruptibly 是 acquireShared 的中斷版本,在等待獲取資源期間支持響應(yīng)中斷請求,tryAcquireSharedNanos 除了支持響應(yīng)中斷以外,還引入了超時等待機制。下面同樣主要分析一下 AbstractQueuedSynchronizer#acquireShared 的實現(xiàn),理解了該方法的實現(xiàn)機制,也就自然而然理解了另外兩個版本的實現(xiàn)機制。
方法 AbstractQueuedSynchronizer#acquireShared 的實現(xiàn)如下:
public final void acquireShared(int arg) {
// 返回負數(shù)表示獲取資源失敗
if (this.tryAcquireShared(arg) < 0) {
// 將當前線程添加到條件隊列,基于自旋等待獲取資源
this.doAcquireShared(arg);
}
}
private void doAcquireShared(int arg) {
// 將當前線程加入條件隊列末端,并標記為共享模式
final Node node = this.addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false; // 標記自旋過程中是否被中斷
for (; ; ) {
// 獲取前驅(qū)結(jié)點
final Node p = node.predecessor();
// 如果前驅(qū)結(jié)點為頭結(jié)點,說明當前結(jié)點是排在同步隊列最前面,可以嘗試獲取資源
if (p == head) {
// 嘗試獲取資源
int r = this.tryAcquireShared(arg);
if (r >= 0) {
// 獲取資源成功,設(shè)置自己為頭結(jié)點,并嘗試喚醒后繼結(jié)點
this.setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted) {
selfInterrupt();
}
failed = false;
return;
}
}
// 如果還未輪到當前結(jié)點,或者獲取資源失敗
if (shouldParkAfterFailedAcquire(p, node) // 判斷是否需要阻塞當前線程
&& this.parkAndCheckInterrupt()) { // 如果需要,則進入阻塞狀態(tài),并在蘇醒時檢查中斷狀態(tài)
// 標識等待期間被中斷
interrupted = true;
}
}
} finally {
// 嘗試獲取資源失敗,說明執(zhí)行異常,取消當前結(jié)點獲取資源的進程
if (failed) {
this.cancelAcquire(node);
}
}
}
上述方法與 AbstractQueuedSynchronizer#acquire 的實現(xiàn)邏輯大同小異,區(qū)別在于線程在被封裝成結(jié)點之后,是以共享(SHARED)模式在同步隊列中進行等待。這里我們重點關(guān)注一下 AbstractQueuedSynchronizer#setHeadAndPropagate 方法的實現(xiàn),當結(jié)點上的線程成功獲取到資源會觸發(fā)執(zhí)行該方法,以嘗試喚醒后繼結(jié)點。實現(xiàn)如下:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 記錄之前的頭結(jié)點
this.setHead(node); // 頭結(jié)點一般記錄持有資源的線程結(jié)點
/*
* 如果滿足以下條件,嘗試喚醒后繼結(jié)點:
*
* 1. 存在剩余可用的資源;
* 2. 后繼結(jié)點處于等待狀態(tài),或后繼結(jié)點為空
*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause unnecessary wake-ups,
* but only when there are multiple racing acquires/releases, so most need signals now or soon anyway.
*/
if (propagate > 0 // 存在剩余可用的資源
|| h == null || h.waitStatus < 0 // 此時 h 是之前的頭結(jié)點
|| (h = head) == null || h.waitStatus < 0) { // 此時 h 已經(jīng)更新為當前頭結(jié)點
Node s = node.next;
// 如果后繼結(jié)點以共享模式在等待,或者后繼結(jié)點未知,則嘗試喚醒后繼結(jié)點
if (s == null || s.isShared()) {
this.doReleaseShared();
}
}
}
因為當前結(jié)點已經(jīng)獲取到資源,所以需要將當前結(jié)點記錄到頭結(jié)點中。此外,如果滿足以下 2 種情況之一,還需要喚醒后繼結(jié)點:
參數(shù) propagate > 0,即存在可用的剩余資源;前任頭結(jié)點或當前頭結(jié)點不存在,或指明后繼結(jié)點需要被喚醒。如果滿足上述條件之一,且后繼結(jié)點狀態(tài)未知或以共享模式在等待,則調(diào)用 AbstractQueuedSynchronizer#doReleaseShared 方法喚醒后繼結(jié)點,關(guān)于該方法的實現(xiàn)留到下一小節(jié)進行分析。
共享釋放資源
針對共享模式釋放資源,AbstractQueuedSynchronizer 同樣定義了單一實現(xiàn),即 AbstractQueuedSynchronizer#releaseShared 方法,該方法本質(zhì)上也是一個調(diào)度的過程,具體釋放資源的操作交由 tryReleaseShared 方法完成,由子類實現(xiàn)。方法 AbstractQueuedSynchronizer#releaseShared 實現(xiàn)如下:
public final boolean releaseShared(int arg) {
// 嘗試釋放資源
if (this.tryReleaseShared(arg)) {
// 釋放資源成功,喚醒后繼結(jié)點
this.doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other in-progress acquires/releases.
* This proceeds in the usual way of trying to unparkSuccessor of head if it needs signal.
* But if it does not, status is set to PROPAGATE to ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added while we are doing this.
* Also, unlike other uses of unparkSuccessor, we need to know if CAS to reset status fails, if so rechecking.
*/
for (; ; ) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果頭結(jié)點狀態(tài)為 SIGNAL,則在喚醒后繼結(jié)點之前嘗試清除當前結(jié)點的狀態(tài)
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
// loop to recheck cases
continue;
}
// 喚醒后繼結(jié)點
this.unparkSuccessor(h);
}
/*
* 如果后繼結(jié)點暫時不需要被喚醒,則基于 CAS 嘗試將目標結(jié)點的 waitStatus 由 0 修改為 PROPAGATE,
* 以保證后續(xù)由喚醒通知到來時,能夠?qū)⑼ㄖ獋鬟f下去
*/
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
// loop on failed CAS
continue;
}
}
// 如果頭結(jié)點未變更,則說明期間持有鎖的線程未發(fā)生變化,能夠走到這一步說明前面的操作已經(jīng)成功完成
if (h == head) {
break;
}
// 如果頭結(jié)點發(fā)生變更,則說明期間持有鎖的線程發(fā)生了變化,需要重試以保證喚醒動作的成功執(zhí)行
}
}
如果釋放資源成功,需要依據(jù)頭結(jié)點當下等待狀態(tài)分別處理:
如果頭結(jié)點的等待狀態(tài)為 SIGNAL,則表明后繼結(jié)點需要被喚醒,在執(zhí)行喚醒操作之前需要清除等待狀態(tài)。如果頭結(jié)點狀態(tài)為 0,則表示后繼結(jié)點不需要被喚醒,此時需要將結(jié)點狀態(tài)修改為 PROPAGATE,以保證后續(xù)接收到喚醒通知時能夠?qū)⑼ㄖ獋鬟f下去。
總結(jié)
本文我們分析了 AQS 的設(shè)計與實現(xiàn)。理解了 AQS 的運行機制也就理解了 java 的 Lock 鎖是如何實現(xiàn)線程的阻塞、喚醒、等待和通知機制的,所以理解 AQS 也是我們后面分析 Lock 鎖和同步器實現(xiàn)的基礎(chǔ)。
