Java并發(fā)之AQS詳解
靚仔靚女們好,我們又見面了,我是java小杰要加油,現(xiàn)就職于京東,致力于分享java相關(guān)知識,包括但不限于并發(fā)、多線程、鎖、mysql以及京東面試真題
AQS介紹
AQS全稱是AbstractQueuedSynchronizer,是一個(gè)抽象隊(duì)列同步器,JUC并發(fā)包中的大部分的并發(fā)工具類,都是基于AQS實(shí)現(xiàn)的,所以理解了AQS就算是四舍五入掌握了JUC了(好一個(gè)四舍五入學(xué)習(xí)法)那么AQS到底有什么神奇之處呢?有什么特點(diǎn)呢?讓我們今天就來拔光它,一探究竟!
state:代表被搶占的鎖的狀態(tài) 隊(duì)列:沒有搶到鎖的線程會包裝成一個(gè)node節(jié)點(diǎn)存放到一個(gè)雙向鏈表中
AQS大概長這樣,如圖所示:

你說我隨便畫的,我可不是隨便畫的啊,我是有bear而來,來看下AQS基本屬性的代碼

那么這個(gè)Node節(jié)點(diǎn)又包含什么呢?來吧,展示。

那么我們就可以把這個(gè)隊(duì)列變的更具體一點(diǎn)

怎么突然出來個(gè)exclusiveOwnerThread?還是保存當(dāng)前獲得鎖的線程,哪里來的呢
還記得我們AQS一開始繼承了一個(gè)類嗎

這個(gè)exclusiveOwnerThread就是它里面的屬性

再次回顧總結(jié)一下,AQS屬性如下:
state:代表被搶占的鎖的狀態(tài) exclusiveOwnerThread:當(dāng)前獲得鎖的線程 隊(duì)列:沒有搶到鎖的線程會包裝成一個(gè)node節(jié)點(diǎn)存放到一個(gè)雙向鏈表中 thread: 當(dāng)前node節(jié)點(diǎn)包裝的線程 waitStatus:當(dāng)前節(jié)點(diǎn)的狀態(tài) pre: 當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn) next: 當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn) nextWaiter:表示當(dāng)前節(jié)點(diǎn)對鎖的模式,獨(dú)占鎖的話就是null,共享鎖為Node() Node節(jié)點(diǎn) :
好了,我們對AQS大概是什么東西什么結(jié)構(gòu)長什么樣子有了個(gè)清楚的認(rèn)知,下面我們直接上硬菜,從源碼角度分析下,AQS加鎖,它這個(gè)結(jié)構(gòu)到底是怎么變化的呢?
注:以下分析的都是獨(dú)占模式下的加鎖
獨(dú)占模式 : 鎖只允許一個(gè)線程獲得 NODE.EXCLUSIVE 共享模式 :鎖允許多個(gè)線程獲得 NODE.SHARED
AQS加鎖源碼——acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
乍一看這是什么啊,沒關(guān)系,我們可以把它畫成流程圖方便我們理解,流程圖如下

下面我們來一個(gè)一個(gè)分析,圖文并茂,來吧寶貝兒。
AQS加鎖源碼——tryAcquire
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
這是什么情況?怎么直接拋出了異常?其實(shí)這是由AQS子類重寫的方法,就類似lock鎖,由子類定義嘗試獲取鎖的具體邏輯
我們平常使用lock鎖時(shí)往往如下 (若不想看lock鎖怎么實(shí)現(xiàn)的可以直接跳轉(zhuǎn)到下一節(jié))
ReentrantLock lock = new ReentrantLock();
lock.lock();
try{
//todo
}finally {
lock.unlock();
}
我們看下lock.lock()源碼
public void lock() {
sync.lock();
}
這個(gè)sync又是什么呢,我們來看下lock類的總體屬性就好了

所以我們來看下 默認(rèn)非公平鎖的加鎖實(shí)現(xiàn)
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
//將state狀態(tài)從0設(shè)為1 CAS方式
if (compareAndSetState(0, 1))
//如果設(shè)定成功的話,則將當(dāng)前線程(就是自己)設(shè)為占有鎖的線程
setExclusiveOwnerThread(Thread.currentThread());
else
//設(shè)置失敗的話,就當(dāng)前線程沒有搶到鎖,然后進(jìn)行AQS父類的這個(gè)方法
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
//調(diào)用非公平鎖的方法
return nonfairTryAcquire(acquires);
}
}
現(xiàn)在壓力又來到了nonfairTryAcquire(acquires)這里
final boolean nonfairTryAcquire(int acquires) {
//獲得當(dāng)前線程
final Thread current = Thread.currentThread();
//獲得當(dāng)前鎖的狀態(tài)
int c = getState();
//如果鎖的狀態(tài)是0的話,就表明還沒有線程獲取到這個(gè)鎖
if (c == 0) {
//進(jìn)行CAS操作,將鎖的狀態(tài)改為acquires,因?yàn)槭强芍厝腈i,所以這個(gè)數(shù)字可能是>0的數(shù)字
if (compareAndSetState(0, acquires)) {
//將當(dāng)前持有鎖的線程設(shè)為自己
setExclusiveOwnerThread(current);
//返回 獲取鎖成功
return true;
}
}// 如果當(dāng)前鎖的狀態(tài)不是0,判斷當(dāng)前獲取鎖的線程是不是自己,如果是的話
else if (current == getExclusiveOwnerThread()) {
//則重入數(shù)加acquires (這里acquires是1) 1->2 3->4 這樣
int nextc = c + acquires;
if (nextc < 0) // overflow 異常檢測
throw new Error("Maximum lock count exceeded");
//將鎖的狀態(tài)設(shè)為當(dāng)前值
setState(nextc);
//返回獲取鎖成功
return true;
}
//當(dāng)前獲取鎖的線程不是自己,獲取鎖失敗,返回
return false;
}
由此可見,回到剛才的問題,AQS中的tryAcquire是由子類實(shí)現(xiàn)具體邏輯的
AQS加鎖源碼——addWaiter
如果我們獲取鎖失敗的話,就要把當(dāng)前線程包裝成一個(gè)Node節(jié)點(diǎn),那么具體是怎么包裝的呢,也需要化妝師經(jīng)紀(jì)人嗎?我們來看下源碼就知道了addWaiter(Node.EXCLUSIVE), arg) 這就代表添加的是獨(dú)占模式的節(jié)點(diǎn)
private Node addWaiter(Node mode) {
//將當(dāng)前線程包裝成一個(gè)Node節(jié)點(diǎn)
Node node = new Node(Thread.currentThread(), mode);
// 聲明一個(gè)pred指針指向尾節(jié)點(diǎn)
Node pred = tail;
//尾節(jié)點(diǎn)不為空
if (pred != null) {
//將當(dāng)前節(jié)點(diǎn)的前置指針指向pred
node.prev = pred;
//CAS操作將當(dāng)前節(jié)點(diǎn)設(shè)為尾節(jié)點(diǎn),tail指向當(dāng)前節(jié)點(diǎn)
if (compareAndSetTail(pred, node)) {
//pred下一節(jié)點(diǎn)指針指向當(dāng)前節(jié)點(diǎn)
pred.next = node;
//返回當(dāng)前節(jié)點(diǎn) (此時(shí)當(dāng)前節(jié)點(diǎn)就已經(jīng)是尾節(jié)點(diǎn))
return node;
}
}
//如果尾節(jié)點(diǎn)為空或者CAS操作失敗
enq(node);
return node;
}
其中node的構(gòu)造函數(shù)是這樣的
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
我們可以通過圖解的方法來更直觀的來看下addWaiter做了什么


由圖可知,如果曾經(jīng)尾節(jié)點(diǎn)不為空的時(shí)候,node節(jié)點(diǎn)會加入到隊(duì)列末尾,那么如果曾經(jīng)尾節(jié)點(diǎn)為空或者CAS失敗調(diào)用enq(node);會怎么樣呢?
AQS加鎖源碼——enq
private Node enq(final Node node) {
//死循環(huán),直到有返回值
for (;;) {
//聲明一個(gè)t的指針指向tail
Node t = tail;
//如果尾巴節(jié)點(diǎn)為空
if (t == null) { // Must initialize
//則CAS設(shè)置一個(gè)節(jié)點(diǎn)為頭節(jié)點(diǎn)(頭節(jié)點(diǎn)并沒有包裝線程!)這也是延遲初始化頭節(jié)點(diǎn)
if (compareAndSetHead(new Node()))
//將尾指針指向頭節(jié)點(diǎn)
tail = head;
} else { //如果尾節(jié)點(diǎn)不為空,則說明這是CAS失敗
// 將node節(jié)點(diǎn)前驅(qū)節(jié)點(diǎn)指向t
node.prev = t;
//繼續(xù)CAS操作將自己設(shè)為尾節(jié)點(diǎn)
if (compareAndSetTail(t, node)) {
//將t的next指針指向自己 (此時(shí)自己真的是尾節(jié)點(diǎn)了)
t.next = node;
//返回自己節(jié)點(diǎn)的前置節(jié)點(diǎn),隊(duì)列的倒數(shù)第二個(gè)
return t;
}
}
}
}
隊(duì)列中的頭節(jié)點(diǎn),是延遲初始化的,加鎖時(shí)用到的時(shí)候才去輸出話,并不是一開始就有這個(gè)頭節(jié)點(diǎn)的 頭節(jié)點(diǎn)并不保存任何線程
end 尾分叉
// 將node節(jié)點(diǎn)前驅(qū)節(jié)點(diǎn)指向t
node.prev = t; 1
//繼續(xù)CAS操作將自己設(shè)為尾節(jié)點(diǎn)
if (compareAndSetTail(t, node)) { 2
//將t的next指針指向自己 (此時(shí)自己真的是尾節(jié)點(diǎn)了)
t.next = node; 3
//返回自己節(jié)點(diǎn)的前置節(jié)點(diǎn),隊(duì)列的倒數(shù)第二個(gè)
return t;
}
我們注意到,enq函數(shù)有上面三行代碼,3是在2執(zhí)行成功后才會執(zhí)行的,由于我們這個(gè)代碼無時(shí)無刻都在并發(fā)執(zhí)行,存在一種可能就是
1執(zhí)行成功,2執(zhí)行失敗(cas并發(fā)操作),3沒有執(zhí)行,所以就只有一個(gè)線程1,2,3都執(zhí)行成功,其他線程1執(zhí)行成功,2,3沒有執(zhí)行成功,出現(xiàn)尾分叉情況,如圖所示

這些分叉失敗的節(jié)點(diǎn),在以后的循環(huán)中他們還會執(zhí)行1,直總會指向新的尾節(jié)點(diǎn),1,2,3這么執(zhí)行,早晚會入隊(duì)
AQS加鎖源碼——acquireQueued
final boolean acquireQueued(final Node node, int arg) {
// 是否有異常發(fā)生
boolean failed = true;
try {
//中斷標(biāo)志
boolean interrupted = false;
//開始自旋,要么當(dāng)前線程嘗試去拿到鎖,要么拋出異常
for (;;) {
//獲得當(dāng)前節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn)
final Node p = node.predecessor();
//判斷這個(gè)節(jié)點(diǎn)是不是頭節(jié)點(diǎn),如果是頭節(jié)點(diǎn)則獲取鎖
if (p == head && tryAcquire(arg)) {
//將此節(jié)點(diǎn)設(shè)為頭節(jié)點(diǎn)
setHead(node);
//原來的頭節(jié)點(diǎn)出隊(duì)
p.next = null; // help GC
failed = false;
//返回是否中斷
return interrupted;
}
// 說明p不是頭節(jié)點(diǎn)
// 或者
// p是頭節(jié)點(diǎn)但是獲取鎖失敗
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 中斷標(biāo)志設(shè)為true
interrupted = true;
}
} finally {
//如果有異常發(fā)生的話
if (failed)
//取消當(dāng)前線程競爭鎖,將當(dāng)前node節(jié)點(diǎn)狀態(tài)設(shè)置為cancel
cancelAcquire(node);
}
}
其中有一行代碼是setHead(node);
private void setHead(Node node) {
head = node;
node.thread = null; //將head節(jié)點(diǎn)的線程置為空
node.prev = null;
}
為什么要將頭節(jié)點(diǎn)的線程置為空呢,是因?yàn)樵? tryAcquire(arg)中就已經(jīng)記錄了當(dāng)前獲取鎖的線程了,在記錄就多此一舉了,我們看前文中提到的nonfairTryAcquire(acquires)其中有一段代碼
if (compareAndSetState(0, acquires)) {
//將當(dāng)前持有鎖的線程設(shè)為自己
setExclusiveOwnerThread(current);
//返回 獲取鎖成功
return true;
}
可見setExclusiveOwnerThread(current);就已經(jīng)記錄了獲得鎖的線程了
我們acquireQueued返回值是中斷標(biāo)志,true表示中斷過,false表示沒有中斷過,還記得我們一開始嗎,回到最初的起點(diǎn)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
如果返回了true,代表此線程有中斷過,那么調(diào)用selfInterrupt();方法,將當(dāng)前線程中斷一下
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
AQS加鎖源碼——shouldParkAfterFailedAcquire
程序運(yùn)行到這里就說明
// 說明p不是頭節(jié)點(diǎn)
// 或者
// p是頭節(jié)點(diǎn)但是獲取鎖失敗
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 中斷標(biāo)志設(shè)為true
interrupted = true;
}
我們來分析下shouldParkAfterFailedAcquire(p, node)的源碼里面到底做了什么?
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//獲取當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn)的狀態(tài)
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//如果是SIGNAL(-1)狀態(tài)直接返回true,代表此節(jié)點(diǎn)可以掛起
//因?yàn)榍爸霉?jié)點(diǎn)狀態(tài)為SIGNAL在適當(dāng)狀態(tài) 會喚醒后繼節(jié)點(diǎn)
return true;
if (ws > 0) {
//如果是cancelled
do {
//則從后往前依此跳過cancelled狀態(tài)的節(jié)點(diǎn)
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
//將找到的符合標(biāo)準(zhǔn)的節(jié)點(diǎn)的后置節(jié)點(diǎn)指向當(dāng)前節(jié)點(diǎn)
pred.next = node;
} else {
//否則將前置節(jié)點(diǎn)等待狀態(tài)設(shè)置為SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
其中的node.prev = pred = pred.prev;可以看成
pred = pred.prev;
node.prev = pred;

可見一頓操作后,隊(duì)列中跳過了節(jié)點(diǎn)狀態(tài)為cancelled的節(jié)點(diǎn)
AQS加鎖源碼——parkAndCheckInterrupt
當(dāng)shouldParkAfterFailedAcquire返回true時(shí)就代表允許當(dāng)前線程掛起然后就執(zhí)行 parkAndCheckInterrupt()這個(gè)函數(shù)
private final boolean parkAndCheckInterrupt() {
// 掛起當(dāng)前線程 線程卡在這里不再下執(zhí)行,直到unpark喚醒
LockSupport.park(this);
return Thread.interrupted();
}
所以當(dāng)前線程就被掛起啦
AQS加鎖源碼——cancelAcquire
我們還記得前文中提到acquireQueued中的一段代碼
try {
} finally {
if (failed)
cancelAcquire(node);
}
這是拋出異常時(shí)處理節(jié)點(diǎn)的代碼,下面來看下源代碼
private void cancelAcquire(Node node) {
//過濾掉無效節(jié)點(diǎn)
if (node == null)
return;
//當(dāng)前節(jié)點(diǎn)線程置為空
node.thread = null;
//獲取當(dāng)前節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)
Node pred = node.prev;
//跳過取消的節(jié)點(diǎn)
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
//記錄過濾后的節(jié)點(diǎn)的后置節(jié)點(diǎn)
Node predNext = pred.next;
//將當(dāng)前節(jié)點(diǎn)狀態(tài)改為CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果當(dāng)前節(jié)點(diǎn)是tail尾節(jié)點(diǎn) 則將從后往前找到第一個(gè)非取消狀態(tài)的節(jié)點(diǎn)設(shè)為tail尾節(jié)點(diǎn)
if (node == tail && compareAndSetTail(node, pred)) {
//如果設(shè)置成功,則tail節(jié)點(diǎn)后面的節(jié)點(diǎn)會被設(shè)置為null
compareAndSetNext(pred, predNext, null);
} else {
int ws;
//如果當(dāng)前節(jié)點(diǎn)不是首節(jié)點(diǎn)的后置節(jié)點(diǎn)
if (pred != head && //并且
//如果前置節(jié)點(diǎn)的狀態(tài)是SIGNAL
((ws = pred.waitStatus) == Node.SIGNAL || //或者
//狀態(tài)小于0 并且設(shè)置狀態(tài)為SIGNAL成功
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
//并且前置節(jié)點(diǎn)線程不為null時(shí)
pred.thread != null) {
//記錄下當(dāng)前節(jié)點(diǎn)的后置節(jié)點(diǎn)
Node next = node.next;
//如果后置節(jié)點(diǎn)不為空 并且后置節(jié)點(diǎn)的狀態(tài)小于0
if (next != null && next.waitStatus <= 0)
//把當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)的后繼指針指向當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)
compareAndSetNext(pred, predNext, next);
} else {
//喚醒當(dāng)前節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)
unparkSuccessor(node);
}
//將當(dāng)前節(jié)點(diǎn)下一節(jié)點(diǎn)指向自己
node.next = node; // help GC
}
}
看起來太復(fù)雜了,不過沒關(guān)系,我們可以拆開看,其中有這一段代碼
//當(dāng)前節(jié)點(diǎn)線程置為空
node.thread = null;
//獲取當(dāng)前節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)
Node pred = node.prev;
//跳過取消的節(jié)點(diǎn)
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
//記錄過濾后的節(jié)點(diǎn)的后置節(jié)點(diǎn)
Node predNext = pred.next;
//將當(dāng)前節(jié)點(diǎn)狀態(tài)改為CANCELLED
node.waitStatus = Node.CANCELLED;
如圖所示

通過while循環(huán)從后往前找到signal狀態(tài)的節(jié)點(diǎn),跳過中間cancelled狀態(tài)的節(jié)點(diǎn),同時(shí)將當(dāng)前節(jié)點(diǎn)狀態(tài)改為CANCELLED
我們可以把這復(fù)雜的判斷條件轉(zhuǎn)換成圖來直觀的看一下
當(dāng)前節(jié)點(diǎn)是尾節(jié)點(diǎn)時(shí),隊(duì)列變成這樣

當(dāng)前節(jié)點(diǎn)是head后繼節(jié)點(diǎn)

當(dāng)前節(jié)點(diǎn)不是尾節(jié)點(diǎn)也不是頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)(隊(duì)列中的某個(gè)普通節(jié)點(diǎn))

總結(jié)
太不容易了家人們,終于到了這里,我們再來總結(jié)一下整體的流程
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
1.基于AQS實(shí)現(xiàn)的子類去實(shí)現(xiàn)tryAcquire嘗試獲取鎖
2.如果獲取鎖失敗,則把當(dāng)前節(jié)點(diǎn)通過addWaiter方法包裝成node節(jié)點(diǎn)插入隊(duì)列
如果尾節(jié)點(diǎn)為空或者CAS操作失敗則調(diào)用 enq方法保證成功插入到隊(duì)列,若節(jié)點(diǎn)為空則初始化頭節(jié)點(diǎn)
3.acquireQueued方法,入隊(duì)后的節(jié)點(diǎn)繼續(xù)獲取鎖(此節(jié)點(diǎn)的前置節(jié)點(diǎn)是頭節(jié)點(diǎn))或者掛起
shouldParkAfterFailedAcquire判斷節(jié)點(diǎn)是否應(yīng)該掛起如果當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn)是signal狀態(tài),則返回true,可以掛起 如果當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn)是cancelled,則隊(duì)列會從當(dāng)前節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)開始從后向前遍歷跳過cacelled狀態(tài)的節(jié)點(diǎn),將當(dāng)前節(jié)點(diǎn)和非cacelled狀態(tài)的節(jié)點(diǎn)連接起來,返回false,不可以掛起 否則將前置節(jié)點(diǎn)等待狀態(tài)設(shè)置為SIGNAL,返回false,不可以掛起 parkAndCheckInterrupt掛起當(dāng)前線程cancelAcquire將當(dāng)前節(jié)點(diǎn)狀態(tài)改為cancelld
4.selfInterrupt(); 設(shè)置中斷標(biāo)志,將中斷補(bǔ)上
