Java多線程之CountDownLatch
這里就JUC包中的CountDownLatch類做相關(guān)介紹

概述
JUC包中的CountDownLatch類是一個同步工具類,可實現(xiàn)線程間的通信。其典型方法如下所示
//?創(chuàng)建一個指定計數(shù)器值的CountDownLatch實例
public?CountDownLatch(int?count);
//?當(dāng)前線程阻塞等待CountDownLatch實例的計數(shù)器值為0
public?void?await()?throws?InterruptedException;
//?支持超時的阻塞等待;?返回true:?CountDownLatch實例的計數(shù)器值為0;?返回false:?超時
public?boolean?await(long?timeout,?TimeUnit?unit);
//?CountDownLatch實例的計數(shù)器值減1
public?void?countDown();
基本使用方法也很簡單。首先創(chuàng)建一個指定計數(shù)器值的CountDownLatch實例,每當(dāng)其他線程完成任務(wù)時就通過countDown方法將計數(shù)器值減1。這樣當(dāng)計數(shù)器的值為0時,之前由于調(diào)用await方法而被阻塞的線程就會結(jié)束等待,恢復(fù)執(zhí)行
實踐
CountDownLatch的典型應(yīng)用場景,大體可分為兩類:結(jié)束信號、開始信號
結(jié)束信號
主線程創(chuàng)建、啟動N個異步任務(wù),我們期望當(dāng)這N個任務(wù)全部執(zhí)行完畢結(jié)束后,主線程才可以繼續(xù)往下執(zhí)行。即將CountDownLatch作為任務(wù)的結(jié)束信號來使用。示例代碼如下所示
public?class?CountDownLatchTest1?{
????@Test
????public?void?test1()?throws?InterruptedException?{
????????ExecutorService?threadPool?=?Executors.newFixedThreadPool(5);
????????CountDownLatch?doneSignal?=?new?CountDownLatch(3);
????????Arrays.asList("Task?1","Task?2","Task?3")
????????????????.stream()
????????????????.map(?name?->?new?Task(name,?doneSignal)?)
????????????????.forEach(?task?->?threadPool.execute(task)?);
????????//?阻塞等待, 直到計數(shù)器變?yōu)?。即所有任務(wù)均完成
????????doneSignal.await();
????????System.out.println("所有任務(wù)均完成");
????}
????@AllArgsConstructor
????private?static?class?Task?implements?Runnable{
????????private?String?taskName;
????????private?CountDownLatch?doneSignal;
????????@Override
????????public?void?run()?{
????????????System.out.println(taskName?+?"?開始");
????????????//?模擬業(yè)務(wù)耗時
????????????try{
????????????????Thread.sleep(?RandomUtils.nextInt(5,9)?*?1000?);
????????????}catch?(Exception?e)?{
????????????????System.out.println(?"Happen?Exception:?"?+?e.getMessage());
????????????}
????????????System.out.println(taskName?+?"?完成");
????????????//?當(dāng)前任務(wù)完成,?則計數(shù)器減一
????????????doneSignal.countDown();
????????}
????}
}
測試結(jié)果如下所示,符合預(yù)期

開始信號
主線程創(chuàng)建N個異步任務(wù),但這N個任務(wù)不能立即開始執(zhí)行。而需要等待某個共同的前置任務(wù)(比如初始化任務(wù))完成后,才允許這N個任務(wù)開始執(zhí)行。即將CountDownLatch作為任務(wù)的開始信號來使用。示例代碼如下所示
public?class?CountDownLatchTest2?{
????@Test
????public?void?test1()?throws?InterruptedException?{
????????ExecutorService?threadPool?=?Executors.newFixedThreadPool(10);
????????CountDownLatch?startSignal?=?new?CountDownLatch(1);
????????Arrays.asList("Task?1","Task?2","Task?3")
????????????????.stream()
????????????????.map(?name?->?new?Task(name,?startSignal)?)
????????????????.forEach(?task?->?threadPool.execute(task)?);
????????//?執(zhí)行初始化準(zhǔn)備工作
????????System.out.println("初始化準(zhǔn)備工作開始");
????????//?模擬業(yè)務(wù)耗時
????????try{
????????????Thread.sleep(?RandomUtils.nextInt(5,9)?*?1000?);
????????}catch?(Exception?e)?{
????????????System.out.println(?"Happen?Exception:?"?+?e.getMessage());
????????}
????????System.out.println("初始化準(zhǔn)備工作結(jié)束");
????????//?初始化準(zhǔn)備工作完成,?則計數(shù)器減一
????????startSignal.countDown();
????????//?主線程等待所有任務(wù)執(zhí)行完畢
????????try{?Thread.sleep(?20*1000?);?}?catch?(Exception?e)?{}
????????System.out.println("Game?Over");
????}
????@AllArgsConstructor
????private?static?class?Task?implements?Runnable{
????????private?String?taskName;
????????private?CountDownLatch?startSignal;
????????@Override
????????public?void?run()?{
????????????try{
????????????????//?阻塞等待, 直到計數(shù)器變?yōu)?。?即前置任務(wù)完成
????????????????startSignal.await();
????????????}catch?(InterruptedException?e)?{
????????????????System.out.println(?"Happen?Exception:?"?+?e.getMessage());
????????????}
????????????System.out.println(taskName?+?"?開始");
????????????//?模擬業(yè)務(wù)耗時
????????????try{
????????????????Thread.sleep(?RandomUtils.nextInt(5,9)?*?1000?);
????????????}catch?(Exception?e)?{
????????????????System.out.println(?"Happen?Exception:?"?+?e.getMessage());
????????????}
????????????System.out.println(taskName?+?"?完成");
????????}
????}
}
測試結(jié)果如下所示,符合預(yù)期

基本原理
構(gòu)造器
CountDownLatch類實現(xiàn)過程同樣依賴于AQS。在構(gòu)建CountDownLatch實例過程時,一方面,通過sync變量持有AQS的實現(xiàn)類Sync;另一方面,通過AQS的state字段來存儲計數(shù)器值
public?class?CountDownLatch?{
????private?final?Sync?sync;
????public?CountDownLatch(int?count)?{
????????if?(count?0)?throw?new?IllegalArgumentException("count?0");
????????this.sync?=?new?Sync(count);
????}
????private?static?final?class?Sync?extends?AbstractQueuedSynchronizer?{
????????Sync(int?count)?{
????????????setState(count);
????????}???
????}
}
...
public?abstract?class?AbstractQueuedSynchronizer?extends?AbstractOwnableSynchronizer?implements?java.io.Serializable?{
????private?volatile?int?state;
????protected?final?void?setState(int?newState)?{
????????state?=?newState;
????}
}
await方法
首先來看CountDownLatch的await方法。其委托sync調(diào)用AQS的acquireSharedInterruptibly方法,從方法名也可以看到其是對AQS中共享鎖的使用。并根據(jù)當(dāng)前計數(shù)器的值是否為0,來判斷該線程是繼續(xù)執(zhí)行還是應(yīng)該被阻塞。可以看到事實上AQS只是定義了是否需要阻塞線程的tryAcquireShared方法,具體的規(guī)則需要CountDownLatch類來進行實現(xiàn)
public?class?CountDownLatch?{
????public?void?await()?throws?InterruptedException?{
????????sync.acquireSharedInterruptibly(1);
????}
????private?static?final?class?Sync?extends?AbstractQueuedSynchronizer?{???
????????//?判斷當(dāng)前計數(shù)器值是否為0,?是則返回1;?否則返回-1
????????protected?int?tryAcquireShared(int?acquires)?{
????????????return?(getState()?==?0)???1?:?-1;
????????}???
????}
}
...
public?abstract?class?AbstractQueuedSynchronizer?extends?AbstractOwnableSynchronizer?implements?java.io.Serializable?{
????public?final?void?acquireSharedInterruptibly(int?arg)?throws?InterruptedException?{
????????//?線程被中斷則直接拋出異常
????????if?(Thread.interrupted())
????????????throw?new?InterruptedException();
????????
????????if?(tryAcquireShared(arg)?0)
????????????//?當(dāng)前計數(shù)器不為0,?需進入AQS的隊列準(zhǔn)備阻塞
????????????doAcquireSharedInterruptibly(arg);
????}
????
????//?需要子類去實現(xiàn)
????protected?int?tryAcquireShared(int?arg)?{
????????throw?new?UnsupportedOperationException();
????}
}
當(dāng)tryAcquireShared方法結(jié)果小于0時,即當(dāng)前計數(shù)器不為0時,AQS如何通過doAcquireSharedInterruptibly方法實現(xiàn)阻塞呢?結(jié)合相關(guān)源碼可以看到,首先通過addWaiter方法將當(dāng)前線程包裝為一個node實例,并將其加入AQS隊列。在入隊過程中需要注意,如果隊列為空則其并不是直接將該node實例加入隊列。而是先構(gòu)造一個哨兵節(jié)點來入隊,然后在enq方法下一輪for循環(huán)才將該node實例加入隊列
public?abstract?class?AbstractQueuedSynchronizer?extends?AbstractOwnableSynchronizer?implements?java.io.Serializable?{
????private?void?doAcquireSharedInterruptibly(int?arg)?throws?InterruptedException?{
????????//?將當(dāng)前線程包裝為node,加入AQS隊列,并返回該node實例
????????final?Node?node?=?addWaiter(Node.SHARED);
????????boolean?failed?=?true;
????????try?{
????????????for?(;;)?{
????????????????//?獲取?node?的前驅(qū)節(jié)點
????????????????final?Node?p?=?node.predecessor();
????????????????if?(p?==?head)?{
????????????????????int?r?=?tryAcquireShared(arg);
????????????????????if?(r?>=?0)?{
????????????????????????setHeadAndPropagate(node,?r);
????????????????????????p.next?=?null;?//?help?GC
????????????????????????failed?=?false;
????????????????????????return;
????????????????????}
????????????????}
????????????????if?(shouldParkAfterFailedAcquire(p,?node)?&&
????????????????????parkAndCheckInterrupt())
????????????????????throw?new?InterruptedException();
????????????}
????????}?finally?{
????????????if?(failed)
????????????????cancelAcquire(node);
????????}
????}
????private?Node?addWaiter(Node?mode)?{
????????//?將當(dāng)前線程包裝為一個node實例
????????Node?node?=?new?Node(Thread.currentThread(),?mode);
????????Node?pred?=?tail;
????????//?隊列的尾指針不為空,?說明隊列不為空,?則利用尾插法將node入隊
????????if?(pred?!=?null)?{
????????????node.prev?=?pred;
????????????if?(compareAndSetTail(pred,?node))?{
????????????????pred.next?=?node;
????????????????//?入隊完畢,?直接返回該node
????????????????return?node;
????????????}
????????}
????????//?隊列為空,?則先構(gòu)建一個哨兵節(jié)點、入隊,再將該node入隊
????????enq(node);
????????return?node;
????}
????private?Node?enq(final?Node?node)?{
????????for?(;;)?{
????????????Node?t?=?tail;
????????????//?隊尾指針為空,?則先進行隊列的初始化
????????????if?(t?==?null)?{?
????????????????//?構(gòu)建一個哨兵節(jié)點并入隊
????????????????if?(compareAndSetHead(new?Node()))
????????????????????tail?=?head;
????????????}?else?{
????????????????//?將node入隊?
????????????????node.prev?=?t;
????????????????if?(compareAndSetTail(t,?node))?{
????????????????????t.next?=?node;
????????????????????return?t;
????????????????}
????????????}
????????}
????}
}
然后通過shouldParkAfterFailedAcquire方法修改前驅(qū)節(jié)點的waitStatus。如果前驅(qū)節(jié)點的waitStatus字段是初始值0的話,需在第一輪for循環(huán)中進入shouldParkAfterFailedAcquire方法時,通過compareAndSetWaitStatus(pred, ws, Node.SIGNAL)方法將前驅(qū)節(jié)點的waitStatus字段修改為Node.SIGNAL(即-1)。這樣在開始下一輪for循環(huán)時,shouldParkAfterFailedAcquire方法即會返回true。進而執(zhí)行parkAndCheckInterrupt方法,利用LockSupport.park完成線程阻塞
private?static?boolean?shouldParkAfterFailedAcquire(Node?pred,?Node?node)?{
????//?獲取前驅(qū)節(jié)點的waitStatus字段值
????int?ws?=?pred.waitStatus;
????if?(ws?==?Node.SIGNAL)
????????return?true;
????if?(ws?>?0)?{
????????do?{
????????????node.prev?=?pred?=?pred.prev;
????????}?while?(pred.waitStatus?>?0);
????????pred.next?=?node;
????}?else?{
????????compareAndSetWaitStatus(pred,?ws,?Node.SIGNAL);
????}
????return?false;
}
private?final?boolean?parkAndCheckInterrupt()?{
????LockSupport.park(this);
????return?Thread.interrupted();
}
countDown方法
CountDownLatch的countDown方法類似。其同樣是委托sync調(diào)用AQS的releaseShared方法。然后AQS執(zhí)行tryReleaseShared方法,CountDownLatch類負(fù)責(zé)實現(xiàn)具體的規(guī)則邏輯。如果自減后當(dāng)前計數(shù)器為0,則說明需要喚醒之前通過await方法而被阻塞的線程。然后通過AQS的doReleaseShared方法實現(xiàn)喚醒。具體地,其是從頭節(jié)點的后繼節(jié)點開始喚醒。因為前面已經(jīng)說過,AQS隊列的第一個節(jié)點(即頭節(jié)點)只是一個哨兵節(jié)點
public?class?CountDownLatch?{
????public?void?countDown()?{
????????sync.releaseShared(1);
????}
????private?static?final?class?Sync?extends?AbstractQueuedSynchronizer?{
????????protected?boolean?tryReleaseShared(int?releases)?{
????????????for?(;;)?{
????????????????int?c?=?getState();
????????????????if?(c?==?0)
????????????????????return?false;
????????????????int?nextc?=?c-1;
????????????????if?(compareAndSetState(c,?nextc))
????????????????????return?nextc?==?0;
????????????}
????????}??
????}?
}
...
public?abstract?class?AbstractQueuedSynchronizer?extends?AbstractOwnableSynchronizer?implements?java.io.Serializable?{
????public?final?boolean?releaseShared(int?arg)?{
????????if?(tryReleaseShared(arg))?{
????????????doReleaseShared();
????????????return?true;
????????}
????????return?false;
????}
????
????//?需要子類去實現(xiàn)
????protected?boolean?tryReleaseShared(int?arg)?{
????????throw?new?UnsupportedOperationException();
????}
????private?void?doReleaseShared()?{
????????for?(;;)?{
????????????Node?h?=?head;
????????????if?(h?!=?null?&&?h?!=?tail)?{
????????????????int?ws?=?h.waitStatus;
????????????????if?(ws?==?Node.SIGNAL)?{
????????????????????if?(!compareAndSetWaitStatus(h,?Node.SIGNAL,?0))
????????????????????????continue;????????????//?loop?to?recheck?cases
????????????????????//?喚醒頭節(jié)點的后繼節(jié)點
????????????????????unparkSuccessor(h);
????????????????}
????????????????else?if?(ws?==?0?&&
?????????????????????????!compareAndSetWaitStatus(h,?0,?Node.PROPAGATE))
????????????????????continue;????????????????//?loop?on?failed?CAS
????????????}
????????????if?(h?==?head)???????????????????//?loop?if?head?changed
????????????????break;
????????}
????}
????private?void?unparkSuccessor(Node?node)?{
????????int?ws?=?node.waitStatus;
????????if?(ws?0)
????????????compareAndSetWaitStatus(node,?ws,?0);
????????Node?s?=?node.next;
????????if?(s?==?null?||?s.waitStatus?>?0)?{
????????????s?=?null;
????????????for?(Node?t?=?tail;?t?!=?null?&&?t?!=?node;?t?=?t.prev)
????????????????if?(t.waitStatus?<=?0)
????????????????????s?=?t;
????????}
????????if?(s?!=?null)
????????????LockSupport.unpark(s.thread);
????}
}
這里補充說明下,當(dāng)上文由于調(diào)用await方法而被阻塞的線程喚醒后,其會在doAcquireSharedInterruptibly方法的for循環(huán)中恢復(fù)執(zhí)行。此時由于tryAcquireShared方法的返回值r大于0滿足條件,故其進入setHeadAndPropagate方法。在該方法中,其將自身重新設(shè)置為AQS的頭節(jié)點。并通過doReleaseShared方法繼續(xù)喚醒它的后繼節(jié)點。從而實現(xiàn)將AQS隊列被阻塞的線程全部喚醒
private?void?setHeadAndPropagate(Node?node,?int?propagate)?{
????Node?h?=?head;?//?Record?old?head?for?check?below
????setHead(node);
????if?(propagate?>?0?||?h?==?null?||?h.waitStatus?0?||
????????(h?=?head)?==?null?||?h.waitStatus?0)?{
????????Node?s?=?node.next;
????????if?(s?==?null?||?s.isShared())
????????????doReleaseShared();
????}
}
Note
CountDownLatch的計數(shù)器值只能在創(chuàng)建實例時進行設(shè)置,之后不可以對其進行重新設(shè)置。換言之,CountDownLatch是一次性的,當(dāng)其使用完畢后將無法再次利用
參考文獻(xiàn)
Java并發(fā)編程之美 翟陸續(xù)、薛賓田著
