Java 隊(duì)列同步器 AQS
點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號(hào)”
優(yōu)質(zhì)文章,第一時(shí)間送達(dá)
作者 | 低吟不作語(yǔ)
來(lái)源 | urlify.cn/B3Qnqe
76套java從入門到精通實(shí)戰(zhàn)課程分享
1、概述
隊(duì)列同步器 AbstractQueuedSynchronize(以下簡(jiǎn)稱同步器),是用來(lái)構(gòu)建鎖(Lock)或者其他同步組件(JUC 并發(fā)包)的基礎(chǔ)框架,它使用了一個(gè) int 成員變量表示同步狀態(tài),通過(guò)內(nèi)置的 FIFO 隊(duì)列來(lái)完成資源獲取線程的排隊(duì)工作
同步器的主要使用方式是繼承,子類通過(guò)繼承同步器并實(shí)現(xiàn)它的抽象方法來(lái)管理同步狀態(tài),子類推薦被定義為自定義同步組件的靜態(tài)內(nèi)部類。同步器自身沒(méi)有實(shí)現(xiàn)任何同步接口,它僅僅是定義了若干同步狀態(tài)的獲取和釋放方法來(lái)供自定義組件使用
一言以蔽之,同步器是實(shí)現(xiàn)鎖(也可以是任意同步組件)的一種方式,它屏蔽了更加底層的一些機(jī)制,使開發(fā)者更易于理解和使用
2、隊(duì)列同步器的接口
同步器的設(shè)計(jì)是基于模板方法模式的,使用者需要繼承隊(duì)列同步器并重寫指定的方法,隨后將同步器組合在自定義同步組件的實(shí)現(xiàn)中,并調(diào)用同步器提供的模板方法,而這些模板方法將會(huì)調(diào)用使用者重寫的方法
1. 訪問(wèn)或修改同步狀態(tài)
重寫同步器指定的方法時(shí),需要使用同步器提供的如下三個(gè)方法來(lái)訪問(wèn)或修改同步狀態(tài):
getState()
獲取當(dāng)前同步狀態(tài)
setState(int newState)
設(shè)置當(dāng)前同步狀態(tài)
compareAndSetState(int expect, int update)
使用 CAS 設(shè)置當(dāng)前狀態(tài),該方法能保證狀態(tài)設(shè)置的原子性
2. 同步器可重寫的方法
| protected boolean tryAcquire(int arg) | 獨(dú)占式獲取同步狀態(tài),實(shí)現(xiàn)該方法需要查詢當(dāng)前狀態(tài),并判斷同步狀態(tài)是否符合預(yù)期,然后再進(jìn)行 CAS 設(shè)置同步狀態(tài) |
| protected boolean tryRelease(int arg) | 獨(dú)占式地釋放同步狀態(tài),等待獲取同步狀態(tài)的線程將有機(jī)會(huì)獲取同步狀態(tài) |
| protected int tryAcquireShared(int arg) | 共享式獲取同步狀態(tài),返回大于等于 0 的值,表示獲取成功,否則獲取失敗 |
| protected boolean tryReleaseShared(int arg) | 共享式釋放同步狀態(tài) |
| protected boolean isHeldExclusively() | 當(dāng)前同步器是否在獨(dú)占模式下被線程占有,一般該方法表示是否被當(dāng)前線程所獨(dú)占 |
3. 同步器提供的模板方法
| void acquire(int arg) | 獨(dú)占式獲取同步狀態(tài),如果當(dāng)前線程獲取同步狀態(tài)成功,則由該方法返回,否則,將會(huì)進(jìn)入同步隊(duì)列等待,該方法將會(huì)調(diào)用重寫的 tryAcquire(int arg) 方法 |
| void acquireInterruptibly(int arg) | 與 acquire(int arg) 相同,但該方法響應(yīng)中斷,當(dāng)前線程未獲取到同步狀態(tài)而進(jìn)入同步隊(duì)列中,如果當(dāng)前線程被中斷,則該方法會(huì)拋出 InterruptedException 并返回 |
| boolean tryAcquireNanos(int arg, long nanos) | 在 acquireInterruptibly(int arg) 的基礎(chǔ)上增加了超時(shí)限制 |
| void acquireShared(int arg) | 共享式的獲取同步狀態(tài),與獨(dú)占式獲取的主要區(qū)別是在同一時(shí)刻可以有多個(gè)線程獲取到同步狀態(tài) |
| void acquireSharedInterruptibly(int arg) | 與 acquireShared(int arg) 相同,該方法響應(yīng)中斷 |
| boolean tryAcquireSharedNanos(int arg, long nanos) | 在 acquireSharedInterruptibly 的基礎(chǔ)上增加了超時(shí)限制 |
| boolean release(int arg) | 獨(dú)占式的釋放同步狀態(tài),該方法會(huì)在釋放同步狀態(tài)之后,將同步隊(duì)列中第一個(gè)節(jié)點(diǎn)包含的線程喚醒 |
| boolean releaseShared(int arg) | 共享式的釋放同步狀態(tài) |
| Collection<Thread> getQueuedThreads() | 獲取等待在同步隊(duì)列上的線程集合 |
4. 示例
下面通過(guò)一個(gè)獨(dú)占鎖的示例來(lái)深入了解一下同步器的工作原理。顧名思義,獨(dú)占鎖就是在同一時(shí)刻只能有一個(gè)線程獲取到鎖,其他獲取鎖的線程只能處于同步隊(duì)列中等待,只有獲取鎖的線程釋放了鎖,后繼的線程才能獲取鎖
public class Mutex implements Lock {
/**
* 自定義同步器
*/
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean isHeldExclusively() {
// 是否處于占用狀態(tài)
return getState() == 1;
}
@Override
public boolean tryAcquire(int acquires) {
// 當(dāng)狀態(tài)為 0 時(shí)獲取鎖
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int releases) {
// 釋放鎖,將狀態(tài)設(shè)置為 0
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
/**
* 返回一個(gè) Condition, 每個(gè) condition 都包含一個(gè) condition 隊(duì)列
*/
Condition newCondition() {
return new ConditionObject();
}
}
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
Mutex 中定義了一個(gè)靜態(tài)內(nèi)部類,該內(nèi)部類繼承了同步器并實(shí)現(xiàn)了獨(dú)占式獲取和釋放同步狀態(tài)。用戶使用 Mutex 時(shí)并不會(huì)直接和內(nèi)部同步器實(shí)現(xiàn)打交道,而是調(diào)用 Mutex 提供的方法,大大降低了實(shí)現(xiàn)一個(gè)可靠自定義組件的門檻
3、隊(duì)列同步器的實(shí)現(xiàn)
1. 同步隊(duì)列
同步器依賴內(nèi)部的同步雙向隊(duì)列來(lái)完成同步狀態(tài)的管理,當(dāng)前線程獲取同步狀態(tài)失敗后,同步器會(huì)將當(dāng)前線程及其等待狀態(tài)等信息構(gòu)造成一個(gè)節(jié)點(diǎn),并加入同步隊(duì)列,同時(shí)阻塞當(dāng)前線程。當(dāng)同步狀態(tài)釋放后,會(huì)把首節(jié)點(diǎn)中的線程喚醒,使其再次嘗試獲取同步狀態(tài)
節(jié)點(diǎn)是構(gòu)成同步隊(duì)列的基礎(chǔ),同步器擁有首節(jié)點(diǎn)(head)和尾結(jié)點(diǎn)(tail),沒(méi)有成功獲取同步狀態(tài)的線程將會(huì)成為節(jié)點(diǎn)并加入該隊(duì)列的尾部
同步隊(duì)列的基本結(jié)構(gòu)如下:

同步器將節(jié)點(diǎn)加入到同步隊(duì)列的過(guò)程如圖所示:

首節(jié)點(diǎn)是獲取同步狀態(tài)成功的節(jié)點(diǎn),首節(jié)點(diǎn)線程在釋放同步狀態(tài)時(shí),會(huì)喚醒后繼節(jié)點(diǎn),而后繼節(jié)點(diǎn)將會(huì)在獲取同步狀態(tài)成功時(shí)將自己設(shè)置為首節(jié)點(diǎn),過(guò)程如下:

設(shè)置首節(jié)點(diǎn)是通過(guò)獲取同步狀態(tài)成功的線程來(lái)完成的,由于只有一個(gè)線程能夠成功獲取同步狀態(tài),因此設(shè)置頭節(jié)點(diǎn)的方法并不需要使用 CAS 來(lái)保證,只需要將首節(jié)點(diǎn)設(shè)置成原首節(jié)點(diǎn)的后繼節(jié)點(diǎn)并斷開原首節(jié)點(diǎn)的 next 引用即可
2. 獨(dú)占式同步狀態(tài)獲取與釋放
通過(guò)調(diào)用同步器的 acquire(int arg) 方法可以獲取同步狀態(tài),該方法對(duì)中斷不敏感,線程獲取同步狀態(tài)失敗則進(jìn)入同步隊(duì)列中,后續(xù)對(duì)線程進(jìn)行中斷操作,線程不會(huì)從同步隊(duì)列中移出
獨(dú)占式同步狀態(tài)獲取流程,也就是 acquire(int arg) 方法調(diào)用流程如圖所示:

如果當(dāng)前線程獲取同步狀態(tài)失敗,就會(huì)生成一個(gè)節(jié)點(diǎn)(獨(dú)占式 Node.EXCLUSIVE,同一時(shí)刻只能有一個(gè)線程成功獲取同步狀態(tài)),并加入到隊(duì)列尾部。一個(gè)隊(duì)列里有很多節(jié)點(diǎn),而只有前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn)的節(jié)點(diǎn)才能嘗試獲取同步狀態(tài),原因有兩個(gè):
頭節(jié)點(diǎn)是成功獲取到同步狀態(tài)的節(jié)點(diǎn),而頭節(jié)點(diǎn)的線程釋放了同步狀態(tài)之后,將會(huì)喚醒其后繼節(jié)點(diǎn),后繼節(jié)點(diǎn)的線程被喚醒后需要檢查自己的前驅(qū)節(jié)點(diǎn)是否是頭節(jié)點(diǎn)
維護(hù)同步隊(duì)列的 FIFO 原則
因此,如果隊(duì)列中的非頭節(jié)點(diǎn)線程的前驅(qū)節(jié)點(diǎn)出隊(duì)或者被中斷而從等待狀態(tài)返回,那么其隨后會(huì)檢查自己的前驅(qū)是否為頭節(jié)點(diǎn),如果是則嘗試獲取同步狀態(tài)
當(dāng)前線程獲取同步狀態(tài)并執(zhí)行了相應(yīng)邏輯之后,就需要釋放同步狀態(tài),使得后繼節(jié)點(diǎn)能夠繼續(xù)獲取同步狀態(tài)。通過(guò)調(diào)用同步器的 release(int arg) 方法可以釋放同步狀態(tài),該方法執(zhí)行時(shí),會(huì)喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)線程
3. 共享式同步狀態(tài)獲取與釋放
共享式獲取與獨(dú)占式獲取最主要的區(qū)別在于同一時(shí)刻能否有多個(gè)線程同時(shí)獲取到同步狀態(tài)。以文件的讀寫為例,若一個(gè)程序在對(duì)文件進(jìn)行讀操作,那么這一時(shí)刻對(duì)于該文件的寫操作均被阻塞,而讀操作能夠同時(shí)進(jìn)行。寫操作要求對(duì)資源的獨(dú)占式訪問(wèn),而讀操作可以是共享式訪問(wèn),兩種不同的訪問(wèn)模式在同一時(shí)刻對(duì)文件或資源的訪問(wèn)情況,如下圖所示:

通過(guò)調(diào)用同步器的 acquireShared(int arg) 方法可以共享式地獲取同步狀態(tài),其代碼核心邏輯和 acquire() 差不多,也是判斷當(dāng)前節(jié)點(diǎn)的前驅(qū)是否為頭節(jié)點(diǎn),如果是就嘗試獲取同步狀態(tài)。頭節(jié)點(diǎn)在釋放同步狀態(tài)之后,也會(huì)喚醒后續(xù)處于等待狀態(tài)的節(jié)點(diǎn)
問(wèn)題的關(guān)鍵在于如何做到多個(gè)線程訪問(wèn)同步狀態(tài),因?yàn)榘凑丈厦嫠v的過(guò)程,和獨(dú)占式幾乎沒(méi)有任何區(qū)別。獨(dú)占式與共享式在實(shí)現(xiàn)上的差別其實(shí)僅僅在于:每次頭節(jié)點(diǎn)釋放同步狀態(tài)之后,獨(dú)占式只是把其后繼節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn),而共享式還多了一個(gè)傳播的過(guò)程(筆者能力有限,這一塊沒(méi)搞明白,就不瞎寫了。。)
與獨(dú)占式一樣,共享式獲取也需要釋放同步狀態(tài),通過(guò)調(diào)用 releaseShared(int arg) 方法可以釋放同步狀態(tài),并喚醒后續(xù)處于等待狀態(tài)的節(jié)點(diǎn)
4. 獨(dú)占式超時(shí)獲取同步狀態(tài)
通過(guò)調(diào)用同步器的 doAcquireNanos(int arg, long nanosTimeout) 方法可以超時(shí)獲取同步狀態(tài),即在指定的時(shí)間段內(nèi)獲取同步狀態(tài)
在介紹這個(gè)方法之前,先介紹一下響應(yīng)中斷的同步狀態(tài)獲取過(guò)程。Java5 以后,同步器提供了 acquireInterruptibly(int arg) 方法,這個(gè)方法在等待獲取同步狀態(tài)時(shí),如果當(dāng)前線程被中斷,會(huì)立刻返回,并拋出 InterruptedException
超時(shí)獲取同步狀態(tài)可以視為響應(yīng)中斷獲取同步狀態(tài)的增強(qiáng)版。獨(dú)占式超時(shí)和非獨(dú)占式獲取在流程上非常相似,其主要區(qū)別在于未獲取到同步狀態(tài)時(shí)的處理邏輯。acquire(int arg) 在未獲取到同步狀態(tài)時(shí),會(huì)使當(dāng)前線程一致處于等待狀態(tài),而 doAcquireNanos(int arg, long nanosTimeout) 會(huì)使當(dāng)前線程等待 nanosTimeout 納秒,如果當(dāng)前線程在 nanosTimeout 納秒內(nèi)沒(méi)有獲取同步狀態(tài),將會(huì)從等待邏輯中自動(dòng)返回

4、自定義同步組件
設(shè)計(jì)一個(gè)同步工具:同一時(shí)刻,只能允許至多兩個(gè)線程同時(shí)訪問(wèn),超過(guò)兩個(gè)線程的訪問(wèn)將被阻塞。顯然這是共享式訪問(wèn),主要設(shè)計(jì)思路如下:
重寫 tryAcquireShared(int args) 方法和 tryReleaseShared(int args) 方法
定義初始狀態(tài) status 為 2,當(dāng)一個(gè)線程進(jìn)行獲取,status 減 1,該線程釋放,status 加 1,為 0 時(shí)再有其他線程進(jìn)行獲取,則阻塞
示例代碼如下:
public class TwinsLock implements Lock {
private final Sync sync = new Sync(2);
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
if (count <= 0) {
throw new IllegalArgumentException("count must large than zero");
}
setState(count);
}
@Override
public int tryAcquireShared(int reduceCount) {
while (true) {
int current = getState();
int newCount = current - reduceCount;
if (newCount < 0 || compareAndSetState(current, newCount)) {
return newCount;
}
}
}
@Override
protected boolean tryReleaseShared(int reduceCount) {
while (true) {
int current = getState();
int newCount = current + reduceCount;
if (compareAndSetState(current, newCount)) {
return true;
}
}
}
Condition newCondition() {
return new ConditionObject();
}
}
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquireShared(1) > 0;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.releaseShared(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
再編寫一個(gè)測(cè)試來(lái)驗(yàn)證 TwinsLock 是否按預(yù)期工作
public class TwinsLockTest {
public static void main(String[] args) {
final Lock lock = new TwinsLock();
class Worker extends Thread {
@Override
public void run() {
while (true) {
lock.lock();
try {
SleepUtils.second(1);
System.out.println(Thread.currentThread().getName());
SleepUtils.second(1);
} finally {
lock.unlock();
}
}
}
}
for (int i = 0; i < 10; i++) {
Worker worker = new Worker();
worker.setDaemon(true);
worker.start();
}
for (int i = 0; i < 10; i++) {
SleepUtils.second(1);
System.out.println();
}
}
}
運(yùn)行該測(cè)試用例,發(fā)現(xiàn)線程名稱成對(duì)輸出,說(shuō)明同一時(shí)刻只有兩個(gè)線程能夠獲取到鎖
粉絲福利:Java從入門到入土學(xué)習(xí)路線圖
??????

??長(zhǎng)按上方微信二維碼 2 秒
感謝點(diǎn)贊支持下哈 
