Java并發(fā)之AQS原理剖析
點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號(hào)”
優(yōu)質(zhì)文章,第一時(shí)間送達(dá)
作者 | Yanci丶
來(lái)源 | urlify.cn/IFJ3Mb
概述:
AbstractQueuedSynchronizer,可以稱為抽象隊(duì)列同步器。
AQS有獨(dú)占模式和共享模式兩種:
獨(dú)占模式:
公平鎖:
非公平鎖:
共享模式:
數(shù)據(jù)結(jié)構(gòu):
基本屬性:
/**
* 同步等待隊(duì)列的頭結(jié)點(diǎn)
*/
private transient volatile Node head;
/**
* 同步等待隊(duì)列的尾結(jié)點(diǎn)
*/
private transient volatile Node tail;
/**
* 同步資源狀態(tài)
*/
private volatile int state;
內(nèi)部類:
static final class Node {
/**
* 標(biāo)記節(jié)點(diǎn)為共享模式
*/
static final Node SHARED = new Node();
/**
* 標(biāo)記節(jié)點(diǎn)為獨(dú)占模式
*/
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
/**
* CANCELLED: 值為1,表示當(dāng)前的線程被取消
* SIGNAL: 值為-1,表示當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)包含的線程需要運(yùn)行,也就是unpark;
* CONDITION: 值為-2,表示當(dāng)前節(jié)點(diǎn)在等待condition,也就是在condition隊(duì)列中;
* PROPAGATE: 值為-3,表示當(dāng)前場(chǎng)景下后續(xù)的acquireShared能夠得以執(zhí)行;
* 0: 表示當(dāng)前節(jié)點(diǎn)在sync隊(duì)列中,等待著獲取鎖。
* 表示當(dāng)前節(jié)點(diǎn)的狀態(tài)值
*/
volatile int waitStatus;
/**
* 前置節(jié)點(diǎn)
*/
volatile Node prev;
/**
* 后繼節(jié)點(diǎn)
*/
volatile Node next;
/**
* 節(jié)點(diǎn)同步狀態(tài)的線程
*/
volatile Thread thread;
/**
* 存儲(chǔ)condition隊(duì)列中的后繼節(jié)點(diǎn)
*/
Node nextWaiter;
/**
* 是否為共享模式
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 獲取前驅(qū)結(jié)點(diǎn)
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
主要方法解析:

tryAcquire/tryAcquireShared(int arg)
獨(dú)占/共享模式獲取鎖;由子類實(shí)現(xiàn),僅僅獲取鎖,獲取鎖失敗時(shí)不進(jìn)行阻塞排隊(duì)。
tryRelease/tryReleaseShared(int arg)
獨(dú)占/共享模式釋放鎖;由子類實(shí)現(xiàn),僅僅釋放鎖,釋放鎖成功不對(duì)后繼節(jié)點(diǎn)進(jìn)行喚醒操作。
acquire/acquireShared(int arg)
獨(dú)占/共享模式獲取鎖,如果線程被中斷喚醒,會(huì)返回線程中斷狀態(tài),不會(huì)拋異常中止執(zhí)行操作(忽略中斷)。
acquireInterruptibly/acquireSharedInterruptibly(int arg)
獨(dú)占/共享模式獲取鎖,線程如果被中斷喚醒,則拋出InterruptedException異常(中斷即中止)。
tryAcquireNanos/tryAcquireSharedNanos(int arg, long nanosTimeout)
獨(dú)占/共享時(shí)間中斷模式獲取鎖,線程如果被中斷喚醒,則拋出InterruptedException異常(中斷即中止);如果超出等待時(shí)間則返回加鎖失敗。
release/releaseShared(int arg)
獨(dú)占/共享模式釋放鎖。
addWaiter(Node mode)
將給定模式節(jié)點(diǎn)進(jìn)行入隊(duì)操作。
private Node addWaiter(Node mode) {
// 根據(jù)指定模式,新建一個(gè)當(dāng)前節(jié)點(diǎn)的對(duì)象
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
// 將當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn)指向之前的尾結(jié)點(diǎn)
node.prev = pred;
// 將當(dāng)前等待的節(jié)點(diǎn)設(shè)置為尾結(jié)點(diǎn)(原子操作)
if (compareAndSetTail(pred, node)) {
// 之前尾結(jié)點(diǎn)的后繼節(jié)點(diǎn)設(shè)置為當(dāng)前等待的節(jié)點(diǎn)
pred.next = node;
return node;
}
}
enq(node);
return node;
}
enq(final Node node)
將節(jié)點(diǎn)設(shè)置為尾結(jié)點(diǎn)。注意這里會(huì)進(jìn)行自旋操作,確保節(jié)點(diǎn)設(shè)置成功。因?yàn)榈却木€程需要被喚醒操作;如果操作失敗,當(dāng)前節(jié)點(diǎn)沒(méi)有與其他節(jié)點(diǎn)沒(méi)有引用指向關(guān)系,一直就不會(huì)被喚醒(除非程序代碼中斷線程)。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 判斷尾結(jié)點(diǎn)是否為空,尾結(jié)點(diǎn)初始值是為空
if (t == null) { // Must initialize
// 尾結(jié)點(diǎn)為空,需要初始化
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 設(shè)置當(dāng)前節(jié)點(diǎn)設(shè)置為尾結(jié)點(diǎn)
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued(final Node node, int arg)
已經(jīng)在隊(duì)列當(dāng)中的節(jié)點(diǎn),準(zhǔn)備阻塞獲取鎖。在阻塞前會(huì)判斷前置節(jié)點(diǎn)是否為頭結(jié)點(diǎn),如果為頭結(jié)點(diǎn);這時(shí)會(huì)嘗試獲取下鎖(因?yàn)檫@時(shí)頭結(jié)點(diǎn)有可能會(huì)釋放鎖)。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn)
final Node p = node.predecessor();
// 入隊(duì)前會(huì)先判斷下該節(jié)點(diǎn)的前置節(jié)點(diǎn)是否是頭節(jié)點(diǎn)(此時(shí)頭結(jié)點(diǎn)有可能會(huì)釋放鎖);然后嘗試去搶鎖
// 在非公平鎖場(chǎng)景下有可能會(huì)搶鎖失敗,這時(shí)候會(huì)繼續(xù)往下執(zhí)行 阻塞線程
if (p == head && tryAcquire(arg)) {
//如果搶到鎖,將頭節(jié)點(diǎn)后移(也就是將該節(jié)點(diǎn)設(shè)置為頭結(jié)點(diǎn))
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果前置節(jié)點(diǎn)不是頭結(jié)點(diǎn),或者當(dāng)前節(jié)點(diǎn)搶鎖失敗;通過(guò)shouldParkAfterFailedAcquire判斷是否應(yīng)該阻塞
// 當(dāng)前置節(jié)點(diǎn)的狀態(tài)為SIGNAL=-1,才可以安全被parkAndCheckInterrupt阻塞線程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 該線程已被中斷
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire(Node pred, Node node)
檢查和更新未能獲取鎖節(jié)點(diǎn)的狀態(tài),返回是否可以被安全阻塞。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 獲取前置節(jié)點(diǎn)的狀態(tài)
if (ws == Node.SIGNAL)
/*
* 前置節(jié)點(diǎn)的狀態(tài)waitStatus為SIGNAL=-1,當(dāng)前線程可以安全的阻塞
*/
return true;
if (ws > 0) {
/*
* 如果前置節(jié)點(diǎn)的狀態(tài)waitStatus>0,即waitStatus為CANCELLED=1(無(wú)效節(jié)點(diǎn)),需要從同步狀態(tài)隊(duì)列中取消等待(移除隊(duì)列)
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 將前置狀態(tài)的waitStatus修改為SIGNAL=-1,然后當(dāng)前節(jié)點(diǎn)才可以被安全的阻塞
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt()
阻塞當(dāng)前節(jié)點(diǎn),返回當(dāng)前線程的中斷狀態(tài)。
1 private final boolean parkAndCheckInterrupt() {
2 LockSupport.park(this); //阻塞
3 return Thread.interrupted();
4 }
cancelAcquire(Node node)
取消進(jìn)行的獲取鎖操作,在非忽略中斷模式下,線程被中斷喚醒拋異常時(shí)會(huì)調(diào)用該方法。
// 將當(dāng)前節(jié)點(diǎn)的狀態(tài)設(shè)置為CANCELLED,無(wú)效的節(jié)點(diǎn),同時(shí)移除隊(duì)列
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
hasQueuedPredecessors()
判斷當(dāng)前線程是否應(yīng)該排隊(duì)。
1.第一種結(jié)果——返回true:(1.1和1.2同時(shí)存在,1.2.1和1.2.2有一個(gè)存在)
1.1 h != t為true,說(shuō)明頭結(jié)點(diǎn)和尾結(jié)點(diǎn)不相等,表示隊(duì)列中至少有兩個(gè)不同節(jié)點(diǎn)存在,至少有一點(diǎn)不為null。
1.2 ((s = h.next) == null || s.thread != Thread.currentThread())為true
1.2.1 (s = h.next) == null為true,表示頭結(jié)點(diǎn)之后沒(méi)有后續(xù)節(jié)點(diǎn)。
1.2.2 (s = h.next) == null為false,s.thread != Thread.currentThread()為true
表示頭結(jié)點(diǎn)之后有后續(xù)節(jié)點(diǎn),但是頭節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)不是當(dāng)前線程
2.第二種結(jié)果——返回false,無(wú)需排隊(duì)。(2.1和2.2有一個(gè)存在)
2.1 h != t為false,即h == t;表示h和t同時(shí)為null或者h(yuǎn)和t是同一個(gè)節(jié)點(diǎn),無(wú)后續(xù)節(jié)點(diǎn)。
2.2 h != t為true,((s = h.next) == null || s.thread != Thread.currentThread())為false
表示隊(duì)列中至少有兩個(gè)不同節(jié)點(diǎn)存在,同時(shí)持有鎖的線程為當(dāng)前線程。
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
我是【程序員二胡】,熱愛(ài)技術(shù)分享,信仰終身學(xué)習(xí),愛(ài)運(yùn)動(dòng)旅游,也是一個(gè)萌新up主,我們下期再見(jiàn)!
