線程同步輔助工具類
前言
Java 并發(fā)編程是整個 Java 開發(fā)體系中最難以理解但也是最重要的知識點,也是各類開源分布式框架(如 ZooKeeper、Kafka、Spring Cloud、Netty 等)中各個并發(fā)組件實現(xiàn)的基礎。J.U.C 并發(fā)包,即 java.util.concurrent 包,大大提高了并發(fā)性能,是 JDK 的核心工具包,是 JDK 1.5 之后,由 Doug Lea 實現(xiàn)并引入。而 AQS 被認為是 J.U.C 的核心。
AQS 是一個抽象類,并沒有對并發(fā)類提供了一個統(tǒng)一的接口定義,而是由子類根據自身的情況實現(xiàn)相應的方法,AQS 中一般包含兩個方法 acquire(int)、release(int),獲取同步狀態(tài)和釋放同步狀態(tài),AQS 根據其狀態(tài)是否獨占分為獨占模式和共享模式。
獨占模式:同一時刻最多只有一個線程獲取同步狀態(tài),處于該模式下,其他線程試圖獲取該鎖將無法獲取成功。
共享模式:同一時刻會有多個線程獲取共享同步狀態(tài),處于該模式下,其他線程試圖獲取該鎖可能會獲取成功。
同步器根據同步狀態(tài)分為獨占模式和共享模式,獨占模式包括類:ReentrantLock、ReentrantReadWriteLock.WriteLock,共享模式包括:Semaphore、CountDownLatch、ReentrantReadWriteLock.ReadLock,本文將著重介紹一下 java.util.concurrent 包下一些輔助同步器類:CountDownLatch、CyclicBarrier、Semaphore、Exchanger、Phaser。
0x01: CountDownLatch - 閉鎖
簡介
CountDownLatch 是一個同步輔助工具類,通過它可以完成類似于阻塞當前線程的功能,也就是一個或多個線程一直等待直到其他線程執(zhí)行完成。即允許一個或多個線程一直等待,直到其他線程執(zhí)行完后再執(zhí)行。例如,應用程序的主線程希望在負責啟動框架服務的線程已經啟動所有框架服務之后執(zhí)行。
CountDownLatch 用了一個給定的計數器 cnt 來進行初始化,該計數器的操作是原子操作,即同時只能有一個線程操作該計數器,調用該類 await 方法的線程會一直處于阻塞狀態(tài),直到其他線程調用 countDown 方法時計數器的值變成 0,每次調用 countDown 時計數器的值會減 1,當計數器的值為 0 時所有因 await 方法而處于等待狀態(tài)的線程就會繼續(xù)執(zhí)行。計數器 cnt 是閉鎖需要等待的線程數量,只能被設置一次,且 CountDownLatch 沒有提供任何機制去重新設置計數器 count,如果需要重置,可以考慮使用 CyclicBarrier。

CountdownLatch_example
使用場景
(1)開啟多個線程分塊下載一個大文件,每個線程只下載固定的一截,最后由另外一個線程來拼接所有的分段。
(2)應用程序的主線程希望在負責啟動框架服務的線程已經啟動所有的框架服務之后再執(zhí)行。
(3)確保一個計算不會執(zhí)行,直到所需要的資源被初始化。
(4)并行計算,處理量很大時可以將運算任務拆分成多個子任務,當所有子任務都完成之后,父任務再將所有子任務都結果進行匯總。
主要接口分析
CountDownLatch 內部依賴 Sync 實現(xiàn),而 Sync 繼承 AQS。CountDownLatch 關鍵接口如下:
countDown() 如果當前計數器的值大于 1,則將其減 1;若當前值為 1,則將其置為 0 并喚醒所有通過 await 等待的線程;若當前值為 0,則什么也不做直接返回。
await() 等待計數器的值為 0,若計數器的值為 0 則該方法返回;若等待期間該線程被中斷,則拋出 InterruptedException 并清除該線程的中斷狀態(tài)。
await(long timeout, TimeUnit unit) 在指定的時間內等待計數器的值為 0,若在指定時間內計數器的值變?yōu)?0,則該方法返回 true;若指定時間內計數器的值仍未變?yōu)?0,則返回 false;若指定時間內計數器的值變?yōu)?0 之前當前線程被中斷,則拋出 InterruptedException 并清除該線程的中斷狀態(tài)。
getCount() 讀取當前計數器的值,一般用于調試或者測試。
Coding 演示
(1)作為一個開關 / 入口
將初始計數值為 1 的 CountDownLatch 作為一個的開關或入口,在調用 countDown() 的線程打開入口前,所有調用 await 的線程都一直在入口處等待。
public class Driver {
private static final Integer WORK_COUNT = 10;
private static final ExecutorService executorService = Executors.newCachedThreadPool();
public static void main(String[] args) {
// 初始化計數器為 10 的 CountDownLatch
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < WORK_COUNT; i++) {
executorService.execute(new Worker(countDownLatch));
}
// 主線程執(zhí)行
doSomething();
// 主線程開啟開關
countDownLatch.countDown();
// 平滑地關閉 ExecutorService
executorService.shutdown();
}
private static void doSomething() {
// ...
System.out.print("start..");
}
}
class Worker implements Runnable {
private final CountDownLatch countDownLatch;
Worker(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
// 所有執(zhí)行線程在此處等待開關開啟 [多個子線程同時執(zhí)行]
countDownLatch.await();
// 子線程執(zhí)行
doWork();
} catch (InterruptedException ignored) {
}
}
private void doWork() {
// ...
System.out.print("run..");
}
}
// Output
// start..run..run..run..run..run..run..run..run..run..run..
(2)作為一個完成信號
將初始計數值為 N 的 CountDownLatch 作為一個完成信號點,使某個線程在其它 N 個線程完成某項操作之前一直等待。
public class Driver {
private static final Integer WORK_COUNT = 10;
private static final ExecutorService executorService = Executors.newCachedThreadPool();
public static void main(String[] args) throws InterruptedException {
// 初始化計數器為 10 的 CountDownLatch
CountDownLatch countDownLatch = new CountDownLatch(WORK_COUNT);
for (int i = 0; i < WORK_COUNT; i++) {
executorService.execute(new Worker(countDownLatch));
}
// 主線程等待其它 N 個線程完成
countDownLatch.await();
// 主線程執(zhí)行
doSomething();
// 平滑地關閉 ExecutorService
executorService.shutdown();
}
private static void doSomething() {
// ...
System.out.println("end");
}
}
class Worker implements Runnable {
private final CountDownLatch countDownLatch;
Worker(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
// 子線程執(zhí)行
doWork();
// 每個線程做完自己的事情后, 就將計數器減去 1
countDownLatch.countDown();
}
private void doWork() {
// ...
System.out.print("run..");
}
}
// Output
// run..run..run..run..run..run..run..run..run..run..end0x02: CyclicBarrier - 循環(huán)柵欄
簡介
CyclicBarrier 和 CountDownLatch 是非常類似的,CyclicBarrier 核心的概念是在于設置一個等待線程的數量邊界,到達了此邊界之后進行執(zhí)行。CyclicBarrier 也是一個同步輔助工具類,它允許一組線程相互等待直到到達某個公共的屏障點(Common Barrier Point),通過它可以完成多個線程之間相互等待時,只有當每個線程都準備就緒后才能各自繼續(xù)執(zhí)行后面的操作。
CyclicBarrier 也是通過計數器來實現(xiàn),當某個線程調用 await 方法后就進入等待狀態(tài),計數器執(zhí)行加一操作。當計數器的值達到了設置的初始值時等待狀態(tài)的線程會被喚醒繼續(xù)執(zhí)行。通過調用 CyclicBarrier 對象的 await() 方法,兩個線程可以實現(xiàn)互相等待。一旦 N 個線程在等待 CyclicBarrier 達成,所有線程將被釋放掉去繼續(xù)執(zhí)行。由于 CyclicBarrier 在釋放等待線程后可以重用,所以可以稱之為循環(huán)柵欄。

使用場景
CyclicBarrier 特別適用于并行迭代計算,每個線程負責一部分計算,然后在柵欄處等待其他線程完成,所有線程到齊后,交換數據和計算結果,再進行下一次迭代。
主要接口分析
CyclicBarrier 并沒有自己去實現(xiàn) AQS 框架的 API,而是利用了 ReentrantLock 和 Condition。
CyclicBarrier 提供的關鍵方法如下:
await() 等待其它參與方的到來(調用 await())。如果當前調用是最后一個調用,則喚醒所有其它的線程的等待并且如果在構造 CyclicBarrier 時指定了 action,當前線程會去執(zhí)行該 action,然后該方法返回該線程調用 await 的次序(getParties()-1 說明該線程是第一個調用 await 的,0 說明該線程是最后一個執(zhí)行 await 的),接著該線程繼續(xù)執(zhí)行 await 后的代碼;如果該調用不是最后一個調用,則阻塞等待;如果等待過程中,當前線程被中斷,則拋出 InterruptedException;如果等待過程中,其它等待的線程被中斷,或者其它線程等待超時,或者該 barrier 被 reset,或者當前線程在執(zhí)行 barrier 構造時注冊的 action 時因為拋出異常而失敗,則拋出 BrokenBarrierException。
await(long timeout, TimeUnit unit) 與 await() 唯一的不同點在于設置了等待超時時間,等待超時時會拋出 TimeoutException。
reset() 該方法會將該 barrier 重置為它的初始狀態(tài),并使得所有對該 barrier 的 await 調用拋出 BrokenBarrierException。
CyclicBarrier 提供的兩個構造函數:
CyclicBarrier(int parties):parties 表示攔截線程的數量。創(chuàng)建一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處于等待狀態(tài)時啟動,但它不會在啟動 barrier 時執(zhí)行預定義的操作。
CyclicBarrier(int parties, Runnable barrierAction) :barrierAction 為 CyclicBarrier 接收的 Runnable 命令,用于在線程到達屏障時,優(yōu)先執(zhí)行 barrierAction ,用于處理更加復雜的業(yè)務場景。創(chuàng)建一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處于等待狀態(tài)時啟動,并在啟動 barrier 時執(zhí)行給定的屏障操作,該操作由最后一個進入 barrier 的線程執(zhí)行。
Coding 演示
(1)簡單例子
public class Solver {
private static final Integer WORK_COUNT = 10;
private static final ExecutorService executorService = Executors.newCachedThreadPool();
public static void main(String[] args) {
// 初始化計數器為 10 的 CyclicBarrier
CyclicBarrier cyclicBarrier = new CyclicBarrier(WORK_COUNT);
for (int i = 0; i < WORK_COUNT; i++) {
executorService.execute(new Worker(cyclicBarrier));
}
// 平滑地關閉 ExecutorService
executorService.shutdown();
}
}
class Worker implements Runnable {
private final CyclicBarrier cyclicBarrier;
Worker(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.print("before..");
try {
// 多個線程之間相互等待時,只有當每個線程都準備就緒后才能各自繼續(xù)執(zhí)行后面的操作
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
// 子線程執(zhí)行
doWork();
}
private void doWork() {
// ...
System.out.print("after..");
}
}
// Output
// before..before..before..before..before..before..before..before..before..before..after..after..after..after..after..after..after..after..after..after..
(2)執(zhí)行 barrierAction
在 ready 狀態(tài)時日志是每秒輸出一條,當有 5 條 ready 時會一次性輸出 5 條 continue。這就是前面講的全部線程準備就緒后同時開始執(zhí)行。在初始化 CyclicBarrier 時還可以在等待線程數后指定一個 runnable,含義是當線程到達這個屏障時優(yōu)先執(zhí)行這里的 runnable。
public class Solver {
private static final Integer WORK_COUNT = 10;
private static final ExecutorService executorService = Executors.newCachedThreadPool();
public static void main(String[] args) throws InterruptedException {
// 初始化計數器為 5 的 CyclicBarrier
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println(String.format("%s call back is ready.", Thread.currentThread().getName())));
for (int i = 0; i < WORK_COUNT; i++) {
Thread.sleep(1000);
executorService.execute(new Worker(cyclicBarrier));
}
// 平滑地關閉 ExecutorService
executorService.shutdown();
}
}
class Worker implements Runnable {
private final CyclicBarrier cyclicBarrier;
Worker(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
System.out.println(String.format("%s is ready", Thread.currentThread().getName()));
cyclicBarrier.await();
System.out.println(String.format("%s continue", Thread.currentThread().getName()));
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
// Output
// pool-1-thread-1 is ready
// pool-1-thread-2 is ready
// pool-1-thread-3 is ready
// pool-1-thread-4 is ready
// pool-1-thread-5 is ready
// pool-1-thread-5 call back is ready.
// pool-1-thread-5 continue
// pool-1-thread-3 continue
// pool-1-thread-4 continue
// pool-1-thread-1 continue
// pool-1-thread-2 continue
// pool-1-thread-5 is ready
// pool-1-thread-1 is ready
// pool-1-thread-4 is ready
// pool-1-thread-2 is ready
// pool-1-thread-3 is ready
// pool-1-thread-3 call back is ready.
// pool-1-thread-3 continue
// pool-1-thread-5 continue
// pool-1-thread-4 continue
// pool-1-thread-2 continue
// pool-1-thread-1 continue
CyclicBarrier 與 CountDownLatch 區(qū)別
CyclicBarrier 與 CountDownLatch 可能容易混淆,我們強調下其區(qū)別:
CountDownLatch 的參與線程是有不同角色的,有的負責倒計時,有的在等待倒計時變?yōu)?0,負責倒計時和等待倒計時的線程都可以有多個,它用于不同角色線程間的同步。
CyclicBarrier 的參與線程角色是一樣的,用于同一角色線程間的協(xié)調一致。
CountDownLatch 是一次性的,而 CyclicBarrier 是可以重復利用的。
0x03: Semaphore - 信號量
簡介
Semaphore,又名信號量,這個類的作用有點類似于 “許可證”。信號量 Semaphore 是一個控制訪問多個共享資源的計數器,和 CountDownLatch 一樣,其本質上是一個 “共享鎖”。從源碼角度來看,Semaphore 的實現(xiàn)方式和 CountDownLatch 非常相似,基于 AQS 做了一些定制。通過維持 AQS 的鎖全局計數 state 字段來實現(xiàn)定量鎖的加鎖和解鎖操作。Semaphore 通常用于限制可以訪問某些資源(物理或邏輯的)的線程數目。
有時,我們因為一些原因需要控制同時訪問共享資源的最大線程數量,比如出于系統(tǒng)性能的考慮需要限流,或者共享資源是稀缺資源,我們需要有一種辦法能夠協(xié)調各個線程,以保證合理的使用公共資源。當有線程想要訪問共享資源時,需要先獲取 (acquire) 的許可;如果許可不夠了,線程需要一直等待,直到許可可用。當線程使用完共享資源后,可以歸還 (release) 許可,以供其它需要的線程使用;然而,實際上并沒有真實的許可證對象供線程使用,Semaphore 只是對可用的數量進行管理維護。

使用場景
Semaphore 可以用于做流量控制,特別公用資源有限的應用場景,比如數據庫連接。
主要接口分析
Semaphore 內部包含公平鎖(FairSync)和非公平鎖(NonfairSync),繼承內部類 Sync,其中 Sync 繼承 AQS,作為 Semaphore 的公平鎖和非公平鎖的基類。
CyclicBarrier 提供的關鍵方法如下:
isFair():是否公平模式 FIFO
availablePermits():獲取當前可用的許可證數量
acquire():當前線程嘗試去阻塞的獲取 1 個許可證。此過程是阻塞的,它會一直等待許可證,直到發(fā)生以下任意一件事:當前線程獲取了 1 個可用的許可證,則會停止等待,繼續(xù)執(zhí)行;當前線程被中斷,則會拋出 InterruptedException 異常,并停止等待,繼續(xù)執(zhí)行。
acquire(permits):當前線程嘗試去阻塞的獲取 permits 個許可證。此過程是阻塞的,它會一直等待許可證,直到發(fā)生以下任意一件事:當前線程獲取了 n 個可用的許可證,則會停止等待,繼續(xù)執(zhí)行;當前線程被中斷,則會拋出 InterruptedException 異常,并停止等待,繼續(xù)執(zhí)行。
acquierUninterruptibly():當前線程嘗試去阻塞的獲取 1 個許可證 (不可中斷的)。此過程是阻塞的,它會一直等待許可證,直到發(fā)生以下任意一件事:當前線程獲取了 1 個可用的許可證,則會停止等待,繼續(xù)執(zhí)行。
acquireUninterruptibly(permits):當前線程嘗試去阻塞的獲取 permits 個許可證。此過程是阻塞的,它會一直等待許可證,直到發(fā)生以下任意一件事:當前線程獲取了 n 個可用的許可證,則會停止等待,繼續(xù)執(zhí)行。
tryAcquire():當前線程嘗試去獲取 1 個許可證。此過程是非阻塞的,它只是在方法調用時進行一次嘗試。如果當前線程獲取了 1 個可用的許可證,則會停止等待,繼續(xù)執(zhí)行,并返回 true。如果當前線程沒有獲得這個許可證,也會停止等待,繼續(xù)執(zhí)行,并返回 false。
tryAcquire(permits):當前線程嘗試去獲取 permits 個許可證。此過程是非阻塞的,它只是在方法調用時進行一次嘗試。如果當前線程獲取了 permits 個可用的許可證,則會停止等待,繼續(xù)執(zhí)行,并返回 true。如果當前線程沒有獲得 permits 個許可證,也會停止等待,繼續(xù)執(zhí)行,并返回 false。
tryAcquire(timeout, TimeUnit):當前線程在限定時間內,阻塞的嘗試去獲取 1 個許可證。此過程是阻塞的,它會一直等待許可證,直到發(fā)生以下任意一件事:當前線程獲取了可用的許可證,則會停止等待,繼續(xù)執(zhí)行,并返回 true;當前線程等待時間 timeout 超時,則會停止等待,繼續(xù)執(zhí)行,并返回 false;當前線程在 timeout 時間內被中斷,則會拋出 InterruptedException 一次,并停止等待,繼續(xù)執(zhí)行。
tryAcquire(permits, timeout, TimeUnit):當前線程在限定時間內,阻塞的嘗試去獲取 permits 個許可證。此過程是阻塞的,它會一直等待許可證,直到發(fā)生以下任意一件事:當前線程獲取了可用的 permits 個許可證,則會停止等待,繼續(xù)執(zhí)行,并返回 true;當前線程等待時間 timeout 超時,則會停止等待,繼續(xù)執(zhí)行,并返回 false;當前線程在 timeout 時間內被中斷,則會拋出 InterruptedException 一次,并停止等待,繼續(xù)執(zhí)行。
release():當前線程釋放 1 個可用的許可證。
release(permits):當前線程釋放 permits 個可用的許可證。
drainPermits():當前線程獲得剩余的所有可用許可證。
hasQueuedThreads():判斷當前 Semaphore 對象上是否存在正在等待許可證的線程。
getQueueLength():獲取當前 Semaphore 對象上是正在等待許可證的線程數量。
Semaphore 提供了兩個構造函數:
Semaphore(int permits):創(chuàng)建具有給定的許可數和非公平的公平設置的 Semaphore,Semaphore 默認選擇非公平鎖。
Semaphore(int permits, boolean fair):創(chuàng)建具有給定的許可數和給定的公平設置的 Semaphore。Semaphore 有兩種模式,公平模式和非公平模式。公平模式就是調用 acquire 的順序就是獲取許可證的順序,遵循 FIFO;而非公平模式是搶占式的,也就是有可能一個新的獲取線程恰好在一個許可證釋放時得到了這個許可證,而前面還有等待的線程,簡單的說就是隨機選取新線程來運行。
Coding 演示
public class SemaphoreExample {
public static void main(String[] args) {
final int clientCount = 3;
final int totalRequestCount = 10;
Semaphore semaphore = new Semaphore(clientCount);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalRequestCount; i++) {
executorService.execute(()->{
try {
semaphore.acquire();
System.out.print(semaphore.availablePermits() + " ");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
});
}
executorService.shutdown();
}
}
// Output
// 2 1 2 2 2 2 2 1 2 20x04: Exchanger - 交換器
簡介
Exchanger(交換器)是一個用于線程間協(xié)作的工具類,是 JDK 1.5 開始提供的一個用于兩個工作線程之間交換數據的封裝工具類。Exchanger 有點類似于 CyclicBarrier,我們知道 CyclicBarrier 是一個柵欄,到達柵欄的線程需要等待其它一定數量的線程到達后,才能通過柵欄,Exchanger 可以看成是一個雙向柵欄。它提供一個同步點,在這個同步點兩個線程可以交換彼此的數據。
可簡單地將 Exchanger 對象理解為一個包含兩個格子的容器,通過 exchanger 方法可以向兩個格子中填充信息。當兩個格子中的均被填充時,該對象會自動將兩個格子的信息交換,然后返回給線程,從而實現(xiàn)兩個線程的信息交換。這兩個線程通過 exchange 方法交換數據,如果第一個線程先執(zhí)行 exchange 方法,它會一直等待第二個線程也執(zhí)行 exchange,當兩個線程都到達同步點時,這兩個線程就可以交換數據,將本線程生產出來的數據傳遞給對方。

主要接口分析
Exchanger 是最簡單的也是最復雜的,簡單在于 API 非常簡單,就一個構造方法和兩個 exchange() 方法,最復雜在于它的實現(xiàn)是最復雜的。
Exchanger 提供的關鍵方法如下:
exchange(V x) :當前線程跟另外一個線程交換數據 x,如果另外一個線程的數據準備好,那么當前線程會立刻返回,并獲得另外一個線程的數據;否則當前線程會進入等待狀態(tài)。
V exchange(V x, long timeout, TimeUnit unit):當前線程跟另外一個線程交換數據 x,有一個指定的超時時間,如果在等待時間超時了,而且還沒有收到對方的數據的話,則會拋出 TimeoutException 異常。
可以看出,當一個線程到達 exchange 調用點時,如果其他線程此前已經調用了此方法,則其他線程會被調度喚醒并與之進行對象交換,然后各自返回;如果其他線程還沒到達交換點,則當前線程會被掛起,直至其他線程到達才會完成交換并正常返回,或者當前線程被中斷或超時返回。
Coding 演示
public class ExchangerExample {
private static final Integer WORK_COUNT = 2;
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < WORK_COUNT; i++) {
executorService.execute(() -> {
String beforeObj = Thread.currentThread().getName();
try {
String afterObj = exchanger.exchange(Thread.currentThread().getName());
System.out.println(String.format("currentThread %s , before exchange %s , after exchange %s", Thread.currentThread().getName(), beforeObj, afterObj));
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
}
// Output
// currentThread pool-1-thread-1 , before exchange pool-1-thread-1 , after exchange pool-1-thread-2
// currentThread pool-1-thread-2 , before exchange pool-1-thread-2 , after exchange pool-1-thread-10x05: Phaser - 多階段柵欄
簡介
CountDownLatch 和 CyclicBarrier 都是 JDK 1.5 引入的,而 Phaser 是 JDK 1.7 引入的。Phaser 的功能與 CountDownLatch 和 CyclicBarrier 有部分重疊,它幾乎可以取代 CountDownLatch 和 CyclicBarrier, 其功能更靈活,更強大,支持動態(tài)調整需要控制的線程數。
CountDownLatch,閉鎖,在完成一組正在其他線程中執(zhí)行的操作之前,它允許一個或多個線程一直等待,它提供了 await()、countDown() 兩個方法來進行操作;CyclicBarrier,循環(huán)柵欄,允許一組線程互相等待,直到到達某個公共屏障點,它提供的 await() 可以實現(xiàn)讓所有參與者在臨界點到來之前一直處于等待狀態(tài);Phaser,多階段柵欄,它把多個線程協(xié)作執(zhí)行的任務劃分為多個階段,編程時需要明確各個階段的任務,每個階段都可以有任意個參與者,線程都可以隨時注冊并參與到某個階段,當到達的參與者數量滿足柵欄設定的數量后,會進行階段升級(advance)。
Phaser 顧名思義,與階段相關。Phaser 比較適合這樣一種場景,一種任務可以分為多個階段,現(xiàn)希望多個線程去處理該批任務,對于每個階段,多個線程可以并發(fā)進行,但是希望保證只有前面一個階段的任務完成之后才能開始后面的任務。這種場景可以使用多個 CyclicBarrier 來實現(xiàn),每個 CyclicBarrier 負責等待一個階段的任務全部完成。但是使用 CyclicBarrier 的缺點在于,需要明確知道總共有多少個階段,同時并行的任務數需要提前預定義好,且無法動態(tài)修改。而 Phaser 可同時解決這兩個問題。

使用場景
Phaser 主要接口如下:
主要接口分析
Phaser 提供的關鍵方法如下:
arriveAndAwaitAdvance():當前線程當前階段執(zhí)行完畢,等待其它線程完成當前階段。如果當前線程是該階段最后一個未到達的,則該方法直接返回下一個階段的序號(階段序號從 0 開始),同時其它線程的該方法也返回下一個階段的序號。arriveAndAwaitAdvance 方法是不響應中斷的,也就是說即使當前線程被中斷,arriveAndAwaitAdvance 方法也不會返回或拋出異常,而是繼續(xù)等待。如果希望能夠響應中斷,可以參考 awaitAdvanceInterruptibly 方法。
arriveAndDeregister():該方法立即返回下一階段的序號,并且其它線程需要等待的個數減一,并且把當前線程從之后需要等待的成員中移除。如果該 Phaser 是另外一個 Phaser 的子 Phaser,并且該操作導致當前 Phaser 的成員數為 0,則該操作也會將當前 Phaser 從其父 Phaser 中移除。
arrive():該方法不作任何等待,直接返回下一階段的序號。
awaitAdvance(int phase):該方法等待某一階段執(zhí)行完畢。如果當前階段不等于指定的階段或者該 Phaser 已經被終止,則立即返回。該階段數一般由 arrive() 方法或者 arriveAndDeregister() 方法返回。返回下一階段的序號,或者返回參數指定的值(如果該參數為負數),或者直接返回當前階段序號(如果當前 Phaser 已經被終止)。
awaitAdvanceInterruptibly(int phase):效果與 awaitAdvance(int phase) 相當,唯一的不同在于若該線程在該方法等待時被中斷,則該方法拋出 InterruptedException。
awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit):效果與 awaitAdvanceInterruptibly(int phase) 相當,區(qū)別在于如果超時則拋出 TimeoutException。
bulkRegister(int parties):注冊多個 party。如果當前 phaser 已經被終止,則該方法無效,并返回負數。如果調用該方法時,onAdvance 方法正在執(zhí)行,則該方法等待其執(zhí)行完畢。如果該 Phaser 有父 Phaser 則指定的 party 數大于 0,且之前該 Phaser 的 party 數為 0,那么該 Phaser 會被注冊到其父 Phaser 中。
forceTermination():強制讓該 Phaser 進入終止狀態(tài)。已經注冊的 party 數不受影響。如果該 Phaser 有子 Phaser,則其所有的子 Phaser 均進入終止狀態(tài)。如果該 Phaser 已經處于終止狀態(tài),該方法調用不造成任何影響。
Coding 演示
(1)通過 Phaser 實現(xiàn) CyclicBarrier 控制多個線程的執(zhí)行時機的功能
通過 Phaser 控制多個線程的執(zhí)行時機:有時候我們希望所有線程到達指定點后再同時開始執(zhí)行,我們可以利用 CyclicBarrier 來實現(xiàn),這里給出使用 Phaser 的版本。
public class PhaserExample {
public static void main(String[] args) {
final int totalRequestCount = 10;
Phaser phaser = new Phaser();
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalRequestCount; i++) {
phaser.register();
executorService.execute(() -> {
// 等待其它參與者線程到達 [arriveAndAwaitAdvance 方法是不響應中斷的,也就是說即使當前線程被中斷,arriveAndAwaitAdvance 方法也不會返回或拋出異常,而是繼續(xù)等待。如果希望能夠響應中斷,可以參考 awaitAdvanceInterruptibly 方法]
int j = phaser.arriveAndAwaitAdvance();
// do something
System.out.println(String.format("currentThread:%s, Executing the task, currentPhase:%s", Thread.currentThread().getName(), j));
});
}
executorService.shutdown();
}
}
// Output
// currentThread:pool-1-thread-1, Executing the task, currentPhase:1
// currentThread:pool-1-thread-2, Executing the task, currentPhase:1
// currentThread:pool-1-thread-7, Executing the task, currentPhase:1
// currentThread:pool-1-thread-4, Executing the task, currentPhase:1
// currentThread:pool-1-thread-6, Executing the task, currentPhase:1
// currentThread:pool-1-thread-3, Executing the task, currentPhase:1
// currentThread:pool-1-thread-10, Executing the task, currentPhase:1
// currentThread:pool-1-thread-9, Executing the task, currentPhase:1
// currentThread:pool-1-thread-5, Executing the task, currentPhase:1
// currentThread:pool-1-thread-8, Executing the task, currentPhase:1
(2)通過 Phaser 實現(xiàn) CyclicBarrier 執(zhí)行 barrierAction
CyclicBarrier 支持 barrier action, Phaser 同樣也支持。不同之處是 Phaser 的 barrier action 需要改寫 onAdvance 方法來進行定制。
public class PhaserExample {
public static void main(String[] args) {
final int totalRequestCount = 10;
Phaser phaser = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(String.format("%s call back is ready.", Thread.currentThread().getName()));
return super.onAdvance(phase, registeredParties);
}
};
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalRequestCount; i++) {
// 注冊各個參與者線程
phaser.register();
executorService.execute(() -> {
// 等待其它參與者線程到達 [arriveAndAwaitAdvance 方法是不響應中斷的,也就是說即使當前線程被中斷,arriveAndAwaitAdvance 方法也不會返回或拋出異常,而是繼續(xù)等待。如果希望能夠響應中斷,可以參考 awaitAdvanceInterruptibly 方法]
int j = phaser.arriveAndAwaitAdvance();
// do something
System.out.println(String.format("currentThread:%s, Executing the task, currentPhase:%s", Thread.currentThread().getName(), j));
});
}
executorService.shutdown();
}
}
// Output
// pool-1-thread-10 call back is ready.
// currentThread:pool-1-thread-10, Executing the task, currentPhase:1
// currentThread:pool-1-thread-9, Executing the task, currentPhase:1
// currentThread:pool-1-thread-7, Executing the task, currentPhase:1
// currentThread:pool-1-thread-8, Executing the task, currentPhase:1
// currentThread:pool-1-thread-5, Executing the task, currentPhase:1
// currentThread:pool-1-thread-3, Executing the task, currentPhase:1
// currentThread:pool-1-thread-1, Executing the task, currentPhase:1
// currentThread:pool-1-thread-6, Executing the task, currentPhase:1
// currentThread:pool-1-thread-2, Executing the task, currentPhase:1
// currentThread:pool-1-thread-4, Executing the task, currentPhase:1
(3)通過 Phaser 實現(xiàn) CountDownLatch 作為一個開關 / 入口功能
public class PhaserExample {
public static void main(String[] args) throws IOException {
final int totalRequestCount = 10;
// 注冊主線程, 當外部條件滿足時, 由主線程打開開關
Phaser phaser = new Phaser(1);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalRequestCount; i++) {
// 注冊各個參與者線程
phaser.register();
executorService.execute(() -> {
// 等待其它參與者線程到達 [arriveAndAwaitAdvance 方法是不響應中斷的,也就是說即使當前線程被中斷,arriveAndAwaitAdvance 方法也不會返回或拋出異常,而是繼續(xù)等待。如果希望能夠響應中斷,可以參考 awaitAdvanceInterruptibly 方法]
int j = phaser.arriveAndAwaitAdvance();
// do something
System.out.println(String.format("currentThread:%s, Executing the task, currentPhase:%s", Thread.currentThread().getName(), j));
});
}
// 打開開關 [parties 共 11 個, 主線程從之后需要等待的成員中移除, 即 parties 還剩 10]
phaser.arriveAndDeregister();
System.out.println("主線程打開了開關");
executorService.shutdown();
}
}
// Output
// 主線程打開了開關
// currentThread:pool-1-thread-6, Executing the task, currentPhase:1
// currentThread:pool-1-thread-7, Executing the task, currentPhase:1
// currentThread:pool-1-thread-1, Executing the task, currentPhase:1
// currentThread:pool-1-thread-8, Executing the task, currentPhase:1
// currentThread:pool-1-thread-3, Executing the task, currentPhase:1
// currentThread:pool-1-thread-9, Executing the task, currentPhase:1
// currentThread:pool-1-thread-4, Executing the task, currentPhase:1
// currentThread:pool-1-thread-2, Executing the task, currentPhase:1
// currentThread:pool-1-thread-5, Executing the task, currentPhase:1
(4)通過 Phaser 實現(xiàn)分層
public class PhaserExample {
public static void main(String[] args) {
final int parties = 3;
final int phases = 4;
Phaser phaser = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("====== Phase :" + phase + "======");
return super.onAdvance(phase, registeredParties);
}
};
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < parties; i++) {
// 注冊各個參與者線程
phaser.register();
executorService.execute(() -> {
for (int phase = 0; phase < phases; phase++) {
// 等待其它參與者線程到達 [arriveAndAwaitAdvance 方法是不響應中斷的,也就是說即使當前線程被中斷,arriveAndAwaitAdvance 方法也不會返回或拋出異常,而是繼續(xù)等待。如果希望能夠響應中斷,可以參考 awaitAdvanceInterruptibly 方法]
int j = phaser.arriveAndAwaitAdvance();
// do something
System.out.println(String.format("currentThread:%s, Executing the task, currentPhase:%s", Thread.currentThread().getName(), j));
}
});
}
executorService.shutdown();
}
}
// Output
// ====== Phase : 0 ======
// currentThread:pool-1-thread-1, Executing the task, currentPhase:1
// currentThread:pool-1-thread-2, Executing the task, currentPhase:1
// currentThread:pool-1-thread-3, Executing the task, currentPhase:1
// ====== Phase : 1 ======
// currentThread:pool-1-thread-3, Executing the task, currentPhase:2
// currentThread:pool-1-thread-1, Executing the task, currentPhase:2
// currentThread:pool-1-thread-2, Executing the task, currentPhase:2
// ====== Phase : 2 ======
// currentThread:pool-1-thread-2, Executing the task, currentPhase:3
// currentThread:pool-1-thread-1, Executing the task, currentPhase:3
// currentThread:pool-1-thread-3, Executing the task, currentPhase:3
// ====== Phase : 3 ======
// currentThread:pool-1-thread-3, Executing the task, currentPhase:4
// currentThread:pool-1-thread-1, Executing the task, currentPhase:4
// currentThread:pool-1-thread-2, Executing the task, currentPhase:4
參考博文
[1]. 【并發(fā)編程】J.U.C 之 AQS 介紹、實現(xiàn)及其子類使用演示
[2]. Java 進階(四)線程間通信剖析
[3]. 透徹理解 Java 并發(fā)編程
[4]. 死磕 Java 并發(fā)
source:https://morning-pro.github.io/archives/f4f43ede.html
喜歡,在看
