<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Java多線程之CountDownLatch

          共 6345字,需瀏覽 13分鐘

           ·

          2021-11-30 14:22

          這里就JUC包中的CountDownLatch類做相關(guān)介紹

          abstract.jpeg

          概述

          JUC包中的CountDownLatch類是一個同步工具類,可實現(xiàn)線程間的通信。其典型方法如下所示

          //?創(chuàng)建一個指定計數(shù)器值的CountDownLatch實例
          public?CountDownLatch(int?count);

          //?當(dāng)前線程阻塞等待CountDownLatch實例的計數(shù)器值為0
          public?void?await()?throws?InterruptedException;

          //?支持超時的阻塞等待;?返回true:?CountDownLatch實例的計數(shù)器值為0;?返回false:?超時
          public?boolean?await(long?timeout,?TimeUnit?unit);

          //?CountDownLatch實例的計數(shù)器值減1
          public?void?countDown();

          基本使用方法也很簡單。首先創(chuàng)建一個指定計數(shù)器值的CountDownLatch實例,每當(dāng)其他線程完成任務(wù)時就通過countDown方法將計數(shù)器值減1。這樣當(dāng)計數(shù)器的值為0時,之前由于調(diào)用await方法而被阻塞的線程就會結(jié)束等待,恢復(fù)執(zhí)行

          實踐

          CountDownLatch的典型應(yīng)用場景,大體可分為兩類:結(jié)束信號、開始信號

          結(jié)束信號

          主線程創(chuàng)建、啟動N個異步任務(wù),我們期望當(dāng)這N個任務(wù)全部執(zhí)行完畢結(jié)束后,主線程才可以繼續(xù)往下執(zhí)行。即將CountDownLatch作為任務(wù)的結(jié)束信號來使用。示例代碼如下所示

          public?class?CountDownLatchTest1?{

          ????@Test
          ????public?void?test1()?throws?InterruptedException?{
          ????????ExecutorService?threadPool?=?Executors.newFixedThreadPool(5);
          ????????CountDownLatch?doneSignal?=?new?CountDownLatch(3);

          ????????Arrays.asList("Task?1","Task?2","Task?3")
          ????????????????.stream()
          ????????????????.map(?name?->?new?Task(name,?doneSignal)?)
          ????????????????.forEach(?task?->?threadPool.execute(task)?);

          ????????//?阻塞等待, 直到計數(shù)器變?yōu)?。即所有任務(wù)均完成
          ????????doneSignal.await();
          ????????System.out.println("所有任務(wù)均完成");
          ????}

          ????@AllArgsConstructor
          ????private?static?class?Task?implements?Runnable{

          ????????private?String?taskName;

          ????????private?CountDownLatch?doneSignal;

          ????????@Override
          ????????public?void?run()?{
          ????????????System.out.println(taskName?+?"?開始");
          ????????????//?模擬業(yè)務(wù)耗時
          ????????????try{
          ????????????????Thread.sleep(?RandomUtils.nextInt(5,9)?*?1000?);
          ????????????}catch?(Exception?e)?{
          ????????????????System.out.println(?"Happen?Exception:?"?+?e.getMessage());
          ????????????}
          ????????????System.out.println(taskName?+?"?完成");
          ????????????//?當(dāng)前任務(wù)完成,?則計數(shù)器減一
          ????????????doneSignal.countDown();
          ????????}
          ????}
          }

          測試結(jié)果如下所示,符合預(yù)期

          figure 1.jpeg

          開始信號

          主線程創(chuàng)建N個異步任務(wù),但這N個任務(wù)不能立即開始執(zhí)行。而需要等待某個共同的前置任務(wù)(比如初始化任務(wù))完成后,才允許這N個任務(wù)開始執(zhí)行。即將CountDownLatch作為任務(wù)的開始信號來使用。示例代碼如下所示

          public?class?CountDownLatchTest2?{

          ????@Test
          ????public?void?test1()?throws?InterruptedException?{
          ????????ExecutorService?threadPool?=?Executors.newFixedThreadPool(10);
          ????????CountDownLatch?startSignal?=?new?CountDownLatch(1);

          ????????Arrays.asList("Task?1","Task?2","Task?3")
          ????????????????.stream()
          ????????????????.map(?name?->?new?Task(name,?startSignal)?)
          ????????????????.forEach(?task?->?threadPool.execute(task)?);

          ????????//?執(zhí)行初始化準(zhǔn)備工作
          ????????System.out.println("初始化準(zhǔn)備工作開始");
          ????????//?模擬業(yè)務(wù)耗時
          ????????try{
          ????????????Thread.sleep(?RandomUtils.nextInt(5,9)?*?1000?);
          ????????}catch?(Exception?e)?{
          ????????????System.out.println(?"Happen?Exception:?"?+?e.getMessage());
          ????????}
          ????????System.out.println("初始化準(zhǔn)備工作結(jié)束");

          ????????//?初始化準(zhǔn)備工作完成,?則計數(shù)器減一
          ????????startSignal.countDown();

          ????????//?主線程等待所有任務(wù)執(zhí)行完畢
          ????????try{?Thread.sleep(?20*1000?);?}?catch?(Exception?e)?{}
          ????????System.out.println("Game?Over");
          ????}

          ????@AllArgsConstructor
          ????private?static?class?Task?implements?Runnable{

          ????????private?String?taskName;

          ????????private?CountDownLatch?startSignal;

          ????????@Override
          ????????public?void?run()?{
          ????????????try{
          ????????????????//?阻塞等待, 直到計數(shù)器變?yōu)?。?即前置任務(wù)完成
          ????????????????startSignal.await();
          ????????????}catch?(InterruptedException?e)?{
          ????????????????System.out.println(?"Happen?Exception:?"?+?e.getMessage());
          ????????????}

          ????????????System.out.println(taskName?+?"?開始");
          ????????????//?模擬業(yè)務(wù)耗時
          ????????????try{
          ????????????????Thread.sleep(?RandomUtils.nextInt(5,9)?*?1000?);
          ????????????}catch?(Exception?e)?{
          ????????????????System.out.println(?"Happen?Exception:?"?+?e.getMessage());
          ????????????}
          ????????????System.out.println(taskName?+?"?完成");
          ????????}
          ????}
          }

          測試結(jié)果如下所示,符合預(yù)期

          figure 2.jpeg

          基本原理

          構(gòu)造器

          CountDownLatch類實現(xiàn)過程同樣依賴于AQS。在構(gòu)建CountDownLatch實例過程時,一方面,通過sync變量持有AQS的實現(xiàn)類Sync;另一方面,通過AQS的state字段來存儲計數(shù)器值

          public?class?CountDownLatch?{
          ????private?final?Sync?sync;

          ????public?CountDownLatch(int?count)?{
          ????????if?(count?0)?throw?new?IllegalArgumentException("count?);
          ????????this.sync?=?new?Sync(count);
          ????}

          ????private?static?final?class?Sync?extends?AbstractQueuedSynchronizer?{
          ????????Sync(int?count)?{
          ????????????setState(count);
          ????????}???
          ????}
          }

          ...

          public?abstract?class?AbstractQueuedSynchronizer?extends?AbstractOwnableSynchronizer?implements?java.io.Serializable?{
          ????private?volatile?int?state;

          ????protected?final?void?setState(int?newState)?{
          ????????state?=?newState;
          ????}
          }

          await方法

          首先來看CountDownLatch的await方法。其委托sync調(diào)用AQS的acquireSharedInterruptibly方法,從方法名也可以看到其是對AQS中共享鎖的使用。并根據(jù)當(dāng)前計數(shù)器的值是否為0,來判斷該線程是繼續(xù)執(zhí)行還是應(yīng)該被阻塞。可以看到事實上AQS只是定義了是否需要阻塞線程的tryAcquireShared方法,具體的規(guī)則需要CountDownLatch類來進行實現(xiàn)

          public?class?CountDownLatch?{
          ????public?void?await()?throws?InterruptedException?{
          ????????sync.acquireSharedInterruptibly(1);
          ????}

          ????private?static?final?class?Sync?extends?AbstractQueuedSynchronizer?{???
          ????????//?判斷當(dāng)前計數(shù)器值是否為0,?是則返回1;?否則返回-1
          ????????protected?int?tryAcquireShared(int?acquires)?{
          ????????????return?(getState()?==?0)???1?:?-1;
          ????????}???
          ????}
          }

          ...

          public?abstract?class?AbstractQueuedSynchronizer?extends?AbstractOwnableSynchronizer?implements?java.io.Serializable?{
          ????public?final?void?acquireSharedInterruptibly(int?arg)?throws?InterruptedException?{
          ????????//?線程被中斷則直接拋出異常
          ????????if?(Thread.interrupted())
          ????????????throw?new?InterruptedException();
          ????????
          ????????if?(tryAcquireShared(arg)?0)
          ????????????//?當(dāng)前計數(shù)器不為0,?需進入AQS的隊列準(zhǔn)備阻塞
          ????????????doAcquireSharedInterruptibly(arg);
          ????}
          ????
          ????//?需要子類去實現(xiàn)
          ????protected?int?tryAcquireShared(int?arg)?{
          ????????throw?new?UnsupportedOperationException();
          ????}
          }

          當(dāng)tryAcquireShared方法結(jié)果小于0時,即當(dāng)前計數(shù)器不為0時,AQS如何通過doAcquireSharedInterruptibly方法實現(xiàn)阻塞呢?結(jié)合相關(guān)源碼可以看到,首先通過addWaiter方法將當(dāng)前線程包裝為一個node實例,并將其加入AQS隊列。在入隊過程中需要注意,如果隊列為空則其并不是直接將該node實例加入隊列。而是先構(gòu)造一個哨兵節(jié)點來入隊,然后在enq方法下一輪for循環(huán)才將該node實例加入隊列

          public?abstract?class?AbstractQueuedSynchronizer?extends?AbstractOwnableSynchronizer?implements?java.io.Serializable?{
          ????private?void?doAcquireSharedInterruptibly(int?arg)?throws?InterruptedException?{
          ????????//?將當(dāng)前線程包裝為node,加入AQS隊列,并返回該node實例
          ????????final?Node?node?=?addWaiter(Node.SHARED);
          ????????boolean?failed?=?true;
          ????????try?{
          ????????????for?(;;)?{
          ????????????????//?獲取?node?的前驅(qū)節(jié)點
          ????????????????final?Node?p?=?node.predecessor();
          ????????????????if?(p?==?head)?{
          ????????????????????int?r?=?tryAcquireShared(arg);
          ????????????????????if?(r?>=?0)?{
          ????????????????????????setHeadAndPropagate(node,?r);
          ????????????????????????p.next?=?null;?//?help?GC
          ????????????????????????failed?=?false;
          ????????????????????????return;
          ????????????????????}
          ????????????????}
          ????????????????if?(shouldParkAfterFailedAcquire(p,?node)?&&
          ????????????????????parkAndCheckInterrupt())
          ????????????????????throw?new?InterruptedException();
          ????????????}
          ????????}?finally?{
          ????????????if?(failed)
          ????????????????cancelAcquire(node);
          ????????}
          ????}

          ????private?Node?addWaiter(Node?mode)?{
          ????????//?將當(dāng)前線程包裝為一個node實例
          ????????Node?node?=?new?Node(Thread.currentThread(),?mode);
          ????????Node?pred?=?tail;
          ????????//?隊列的尾指針不為空,?說明隊列不為空,?則利用尾插法將node入隊
          ????????if?(pred?!=?null)?{
          ????????????node.prev?=?pred;
          ????????????if?(compareAndSetTail(pred,?node))?{
          ????????????????pred.next?=?node;
          ????????????????//?入隊完畢,?直接返回該node
          ????????????????return?node;
          ????????????}
          ????????}
          ????????//?隊列為空,?則先構(gòu)建一個哨兵節(jié)點、入隊,再將該node入隊
          ????????enq(node);
          ????????return?node;
          ????}

          ????private?Node?enq(final?Node?node)?{
          ????????for?(;;)?{
          ????????????Node?t?=?tail;
          ????????????//?隊尾指針為空,?則先進行隊列的初始化
          ????????????if?(t?==?null)?{?
          ????????????????//?構(gòu)建一個哨兵節(jié)點并入隊
          ????????????????if?(compareAndSetHead(new?Node()))
          ????????????????????tail?=?head;
          ????????????}?else?{
          ????????????????//?將node入隊?
          ????????????????node.prev?=?t;
          ????????????????if?(compareAndSetTail(t,?node))?{
          ????????????????????t.next?=?node;
          ????????????????????return?t;
          ????????????????}
          ????????????}
          ????????}
          ????}
          }

          然后通過shouldParkAfterFailedAcquire方法修改前驅(qū)節(jié)點的waitStatus。如果前驅(qū)節(jié)點的waitStatus字段是初始值0的話,需在第一輪for循環(huán)中進入shouldParkAfterFailedAcquire方法時,通過compareAndSetWaitStatus(pred, ws, Node.SIGNAL)方法將前驅(qū)節(jié)點的waitStatus字段修改為Node.SIGNAL(即-1)。這樣在開始下一輪for循環(huán)時,shouldParkAfterFailedAcquire方法即會返回true。進而執(zhí)行parkAndCheckInterrupt方法,利用LockSupport.park完成線程阻塞

          private?static?boolean?shouldParkAfterFailedAcquire(Node?pred,?Node?node)?{
          ????//?獲取前驅(qū)節(jié)點的waitStatus字段值
          ????int?ws?=?pred.waitStatus;
          ????if?(ws?==?Node.SIGNAL)
          ????????return?true;
          ????if?(ws?>?0)?{
          ????????do?{
          ????????????node.prev?=?pred?=?pred.prev;
          ????????}?while?(pred.waitStatus?>?0);
          ????????pred.next?=?node;
          ????}?else?{
          ????????compareAndSetWaitStatus(pred,?ws,?Node.SIGNAL);
          ????}
          ????return?false;
          }

          private?final?boolean?parkAndCheckInterrupt()?{
          ????LockSupport.park(this);
          ????return?Thread.interrupted();
          }

          countDown方法

          CountDownLatch的countDown方法類似。其同樣是委托sync調(diào)用AQS的releaseShared方法。然后AQS執(zhí)行tryReleaseShared方法,CountDownLatch類負(fù)責(zé)實現(xiàn)具體的規(guī)則邏輯。如果自減后當(dāng)前計數(shù)器為0,則說明需要喚醒之前通過await方法而被阻塞的線程。然后通過AQS的doReleaseShared方法實現(xiàn)喚醒。具體地,其是從頭節(jié)點的后繼節(jié)點開始喚醒。因為前面已經(jīng)說過,AQS隊列的第一個節(jié)點(即頭節(jié)點)只是一個哨兵節(jié)點

          public?class?CountDownLatch?{
          ????public?void?countDown()?{
          ????????sync.releaseShared(1);
          ????}

          ????private?static?final?class?Sync?extends?AbstractQueuedSynchronizer?{
          ????????protected?boolean?tryReleaseShared(int?releases)?{
          ????????????for?(;;)?{
          ????????????????int?c?=?getState();
          ????????????????if?(c?==?0)
          ????????????????????return?false;
          ????????????????int?nextc?=?c-1;
          ????????????????if?(compareAndSetState(c,?nextc))
          ????????????????????return?nextc?==?0;
          ????????????}
          ????????}??
          ????}?
          }

          ...

          public?abstract?class?AbstractQueuedSynchronizer?extends?AbstractOwnableSynchronizer?implements?java.io.Serializable?{
          ????public?final?boolean?releaseShared(int?arg)?{
          ????????if?(tryReleaseShared(arg))?{
          ????????????doReleaseShared();
          ????????????return?true;
          ????????}
          ????????return?false;
          ????}
          ????
          ????//?需要子類去實現(xiàn)
          ????protected?boolean?tryReleaseShared(int?arg)?{
          ????????throw?new?UnsupportedOperationException();
          ????}

          ????private?void?doReleaseShared()?{
          ????????for?(;;)?{
          ????????????Node?h?=?head;
          ????????????if?(h?!=?null?&&?h?!=?tail)?{
          ????????????????int?ws?=?h.waitStatus;
          ????????????????if?(ws?==?Node.SIGNAL)?{
          ????????????????????if?(!compareAndSetWaitStatus(h,?Node.SIGNAL,?0))
          ????????????????????????continue;????????????//?loop?to?recheck?cases
          ????????????????????//?喚醒頭節(jié)點的后繼節(jié)點
          ????????????????????unparkSuccessor(h);
          ????????????????}
          ????????????????else?if?(ws?==?0?&&
          ?????????????????????????!compareAndSetWaitStatus(h,?0,?Node.PROPAGATE))
          ????????????????????continue;????????????????//?loop?on?failed?CAS
          ????????????}
          ????????????if?(h?==?head)???????????????????//?loop?if?head?changed
          ????????????????break;
          ????????}
          ????}

          ????private?void?unparkSuccessor(Node?node)?{
          ????????int?ws?=?node.waitStatus;
          ????????if?(ws?0)
          ????????????compareAndSetWaitStatus(node,?ws,?0);
          ????????Node?s?=?node.next;
          ????????if?(s?==?null?||?s.waitStatus?>?0)?{
          ????????????s?=?null;
          ????????????for?(Node?t?=?tail;?t?!=?null?&&?t?!=?node;?t?=?t.prev)
          ????????????????if?(t.waitStatus?<=?0)
          ????????????????????s?=?t;
          ????????}
          ????????if?(s?!=?null)
          ????????????LockSupport.unpark(s.thread);
          ????}
          }

          這里補充說明下,當(dāng)上文由于調(diào)用await方法而被阻塞的線程喚醒后,其會在doAcquireSharedInterruptibly方法的for循環(huán)中恢復(fù)執(zhí)行。此時由于tryAcquireShared方法的返回值r大于0滿足條件,故其進入setHeadAndPropagate方法。在該方法中,其將自身重新設(shè)置為AQS的頭節(jié)點。并通過doReleaseShared方法繼續(xù)喚醒它的后繼節(jié)點。從而實現(xiàn)將AQS隊列被阻塞的線程全部喚醒

          private?void?setHeadAndPropagate(Node?node,?int?propagate)?{
          ????Node?h?=?head;?//?Record?old?head?for?check?below
          ????setHead(node);

          ????if?(propagate?>?0?||?h?==?null?||?h.waitStatus?0?||
          ????????(h?=?head)?==?null?||?h.waitStatus?0)?{
          ????????Node?s?=?node.next;
          ????????if?(s?==?null?||?s.isShared())
          ????????????doReleaseShared();
          ????}
          }

          Note

          CountDownLatch的計數(shù)器值只能在創(chuàng)建實例時進行設(shè)置,之后不可以對其進行重新設(shè)置。換言之,CountDownLatch是一次性的,當(dāng)其使用完畢后將無法再次利用

          參考文獻(xiàn)

          1. Java并發(fā)編程之美 翟陸續(xù)、薛賓田著
          瀏覽 68
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  欧美在线一区二区 | 成人免费污污污视频 | 欧美三级在线 | 日本1234区在线观看 | a视频免费在线观看 |