<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          CyclicBarrier 源碼解析

          共 6108字,需瀏覽 13分鐘

           ·

          2021-01-11 17:30


          點擊上方?藍字?關注我們!



          Java,Python,C/C++,Linux,PHP,Go,C#,QT,大數(shù)據(jù),算法,軟件教程,前端,簡歷,畢業(yè)設計等分類,資源在不斷更新中... 點擊領取


          1、學習切入點


          百度翻譯大概意思就是:

          一種同步輔助程序,允許一組線程相互等待到達一個公共的屏障點。CyclicBarrier在涉及固定大小的線程方的程序中非常有用,這些線程方有時必須相互等待。這個屏障被稱為循環(huán)屏障,因為它可以在等待的線程被釋放后重新使用。

          CyclicBarrier支持可選的Runnable命令,該命令在參與方中的最后一個線程到達后,但在釋放任何線程之前,每個屏障點運行一次。此屏障操作有助于在任何參與方繼續(xù)之前更新共享狀態(tài)。

          動圖演示:

          在上文中我們分析完了 CountDownLatch源碼,可以理解為減法計數(shù)器,是基于AQS的共享模式使用,而CyclicBarrier相比于CountDownLatch 來說,要簡單很多,它類似于加法計數(shù)器,在源碼中使用?ReentrantLock 和 Condition 的組合來使用。

          2、案例演示 CyclicBarrier

          //加法計數(shù)器
          public?class?CyclicBarrierDemo?{
          ????public?static?void?main(String[]?args)?{
          ????????/**
          ?????????*?集齊5名隊員,開始游戲
          ?????????*/

          ????????//?開始戰(zhàn)斗的線程
          ????????CyclicBarrier?cyclicBarrier?=?new?CyclicBarrier(5,()->{
          ????????????System.out.println("歡迎來到王者榮耀,敵軍還有五秒到達戰(zhàn)場!全軍出擊!");
          ????????});
          ????????for?(int?i?=?1;?i?<=5?;?i++)?{
          ????????????final?int?temp?=?i;
          ????????????//?lambda能操作到?i?嗎
          ????????????new?Thread(()->{
          ????????????????System.out.println(Thread.currentThread().getName()+"第"+temp+"個進入游戲!");
          ????????????????try?{
          ????????????????????cyclicBarrier.await();?//?等待
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}?catch?(BrokenBarrierException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}
          ????????????}).start();
          ????????}
          ????}
          }

          3、入手構造器

          //構造器1
          /**?創(chuàng)建一個新的CyclicBarrier,它將在給定數(shù)量的參與方(線程)等待時觸發(fā),并在觸發(fā)屏障時執(zhí)行給定的屏障操作,由最后一個進入屏障的線程執(zhí)行?*/???
          public?CyclicBarrier(int?parties,?Runnable?barrierAction)?{
          ????????if?(parties?<=?0)?throw?new?IllegalArgumentException();
          ????????this.parties?=?parties;
          ????????this.count?=?parties;
          ????????this.barrierCommand?=?barrierAction;
          ????}

          //構造器2
          /**?創(chuàng)建一個新的CyclicBarrier,當給定數(shù)量的參與方(線程)在等待它時,它將跳閘,并且在屏障跳閘時不執(zhí)行預定義的操作?*/
          public?CyclicBarrier(int?parties)?{
          ????????this(parties,?null);
          ????}

          其中構造器1為核心構造器,在這里你可以指定?parties?本局游戲的參與者的數(shù)量(要攔截的線程數(shù))以及?barrierAction?本局游戲結束時要執(zhí)行的任務。

          3、入手成員變量

          ???/**?同步操作鎖?*/
          ????private?final?ReentrantLock?lock?=?new?ReentrantLock();
          ????/**?線程攔截器?Condition維護了一個阻塞隊列*/
          ????private?final?Condition?trip?=?lock.newCondition();
          ????/**?每次攔截的線程數(shù)?*/
          ????private?final?int?parties;
          ????/*?換代前執(zhí)行的任務?*/
          ????private?final?Runnable?barrierCommand;
          ????/**?表示柵欄的當前代?類似代表本局游戲*/
          ????private?Generation?generation?=?new?Generation();
          ????/**?計數(shù)器?*/
          ????private?int?count;
          ????/**?靜態(tài)內(nèi)部類Generation??*/
          ????private?static?class?Generation?{
          ????????boolean?broken?=?false;
          ????}

          3、入手核心方法

          3.1、【await】方法源碼分析

          下面分析這兩個方法,分別為【非定時等待】和【定時等待】!

          //非定時等待
          public?int?await()?throws?InterruptedException,?BrokenBarrierException?{
          ????????try?{
          ????????????return?dowait(false,?0L);
          ????????}?catch?(TimeoutException?toe)?{
          ????????????throw?new?Error(toe);?//?cannot?happen
          ????????}
          ????}
          //定時等待
          public?int?await(long?timeout,?TimeUnit?unit)throws?InterruptedException,
          ??????????????BrokenBarrierException,
          ??????????????TimeoutException?
          {
          ???????return?dowait(true,?unit.toNanos(timeout));
          ???}

          可以看到,最終兩個方法都走【dowait】 方法,只不過參數(shù)不同。下面我們重點看看這個方法到底做了哪些事情。

          //核心等待方法
          ?private?int?dowait(boolean?timed,?long?nanos)
          ????????throws?InterruptedException,?BrokenBarrierException,
          ???????????????TimeoutException?
          {
          ????????final?ReentrantLock?lock?=?this.lock;
          ????????lock.lock();//加鎖操作
          ????????try?{
          ????????????final?Generation?g?=?generation;
          ????????????//檢查當前柵欄是否被打翻
          ????????????if?(g.broken)
          ????????????????throw?new?BrokenBarrierException();
          ????????????//檢查當前線程是否被中斷
          ????????????if?(Thread.interrupted())?{
          ????????????????breakBarrier();
          ????????????????throw?new?InterruptedException();
          ????????????}
          ????????????//每次都將計數(shù)器的值-1
          ????????????int?index?=?--count;
          ????????????//計數(shù)器的值減為0,則需要喚醒所有線程并轉換到下一代
          ????????????if?(index?==?0)?{??//?tripped
          ????????????????boolean?ranAction?=?false;
          ????????????????try?{
          ????????????????????//喚醒所有線程前先執(zhí)行指定的任務
          ????????????????????final?Runnable?command?=?barrierCommand;
          ????????????????????if?(command?!=?null)
          ????????????????????????command.run();
          ????????????????????ranAction?=?true;
          ????????????????????//喚醒所有線程并轉換到下一代
          ????????????????????nextGeneration();
          ????????????????????return?0;
          ????????????????}?finally?{
          ????????????????????//確保在任務未成功執(zhí)行時能將所有線程喚醒
          ????????????????????if?(!ranAction)
          ????????????????????????breakBarrier();
          ????????????????}
          ????????????}
          ????????????//如果計數(shù)器不為0?則執(zhí)行此循環(huán)
          ????????????//?loop?until?tripped,?broken,?interrupted,?or?timed?out
          ????????????for?(;;)?{
          ????????????????try?{
          ????????????????????//根據(jù)傳入的參數(shù)來覺得是定時等待還是非定時等待
          ????????????????????if?(!timed)
          ????????????????????????//如果沒有時間限制,則直接等待,直到被喚醒
          ????????????????????????trip.await();
          ????????????????????else?if?(nanos?>?0L)
          ????????????????????????//如果有時間限制,則等待指定時間
          ????????????????????????nanos?=?trip.awaitNanos(nanos);
          ????????????????}?catch?(InterruptedException?ie)?{
          ????????????????????//若當前線程在等待期間被中斷則打翻柵欄喚醒其它線程
          ????????????????????if?(g?==?generation?&&?!?g.broken)?{
          ????????????????????????breakBarrier();
          ????????????????????????throw?ie;
          ????????????????????}?else?{
          ????????????????????????//?若在捕獲中斷異常前已經(jīng)完成在柵欄上的等待,則直接調(diào)用中斷操作
          ????????????????????????Thread.currentThread().interrupt();
          ????????????????????}
          ????????????????}
          ????????????????//如果線程因為打翻柵欄操作而被喚醒則拋出異常
          ????????????????if?(g.broken)
          ????????????????????throw?new?BrokenBarrierException();
          ????????????????//如果線程因為換代操作而被喚醒則返回計數(shù)器的值
          ????????????????if?(g?!=?generation)
          ????????????????????return?index;
          ????????????????//如果線程因為時間到了而被喚醒則打翻柵欄并拋出異常
          ????????????????if?(timed?&&?nanos?<=?0L)?{
          ????????????????????breakBarrier();
          ????????????????????throw?new?TimeoutException();
          ????????????????}
          ????????????}
          ????????}?finally?{
          ????????????lock.unlock();//最終解鎖
          ????????}
          ????}

          分兩步分析,首先計數(shù)器的值減為0的情況,和計數(shù)器不為0的情況,首先第一種情況下:

          img

          第二種情況,計數(shù)器不為0,則進入自旋for(;;):

          多線程同時并發(fā)訪問,如何阻塞當前線程?

          我們翻看源碼,這里就看一下沒有時間限制的【trip.await】方法:

          整個await的過程:

          1、將當前線程加入到Condition鎖隊列中。特別主要要區(qū)分AQS的等待隊列,這里進入的是Condition的FIFO隊列

          2、釋放鎖。這里可以看到【fullyRelease】將鎖釋放了,否則【acquireQueued(node, savedState)】別的線程就無法拿到鎖而發(fā)生死鎖。

          3、自旋(while)掛起,直到被喚醒或者超時或者CACELLED等。

          4、獲取鎖【acquireQueued】方法,并將自己從Condition的FIFO隊列中釋放,表面自己不再需要鎖(我已經(jīng)有鎖了)

          3.2、Condition 隊列與AQS等待隊列 補充

          AQS等待隊列與Condition隊列是兩個相互獨立的隊列,【await】就是在當前線程持有鎖的基礎上釋放鎖資源,并新建Condition節(jié)點加入到Condition隊列尾部,阻塞當前線程。【signal】就是將當前Condition的頭結點移動到AQS等待隊列節(jié)點尾部,讓其等待再次獲取鎖。下面畫圖演示區(qū)別:

          節(jié)點1執(zhí)行Condition.await()->(1)將head后移 ->(2)釋放節(jié)點1的鎖并從AQS等待隊列中移除->(3)將節(jié)點1加入到Condition的等待隊列中->(4)更新lastWrite為節(jié)點1

          節(jié)點2執(zhí)行signal()操作->(1)將firstWrite后移->(2)將節(jié)點4移出Condition隊列->(3)將節(jié)點4加入到AQS的等待隊列中去->(4)更新AQS等待隊列的tail

          3.3、總結:

          一、Condition的數(shù)據(jù)結構:

          我們知道一個Condition可以在多個地方被await(),那么就需要一個FIFO的結構將這些Condition串聯(lián)起來,然后根據(jù)需要喚醒一個或者多個(通常是所有)。所以在Condition內(nèi)部就需要一個FIFO的隊列。private transient Node firstWaiter; private transient Node lastWaiter;上面的兩個節(jié)點就是描述一個FIFO的隊列。我們再結合前面提到的節(jié)點(Node)數(shù)據(jù)結構。我們就發(fā)現(xiàn)Node.nextWaiter就派上用場了!nextWaiter就是將一系列的Condition.await 串聯(lián)起來組成一個FIFO的隊列。

          二、線程何時阻塞和釋放

          阻塞:await()方法中,在線程釋放鎖資源之后,如果節(jié)點不在AQS等待隊列,則阻塞當前線程,如果在等待隊列,則自旋等待嘗試獲取鎖 釋放:signal()后,節(jié)點會從condition隊列移動到AQS等待隊列,則進入正常鎖的獲取流程。

          3.4、【signalAll】signalAll源碼分析

          signalAll】方法,喚醒所有在Condition阻塞隊列中的線程

          private?void?breakBarrier()?{
          ????????generation.broken?=?true;
          ????????count?=?parties;
          ????????trip.signalAll();//喚醒Condition中等待的線程
          ????}

          public?final?void?signalAll()?{
          ????????????if?(!isHeldExclusively())
          ????????????????throw?new?IllegalMonitorStateException();
          ????????????Node?first?=?firstWaiter;
          ????????????if?(first?!=?null)
          ????????????????doSignalAll(first);
          ?????}
          /**?這個方法相當于把Condition隊列中的所有Node全部取出插入到等待隊列中去?*/
          private?void?doSignalAll(Node?first)?{
          ????????????lastWaiter?=?firstWaiter?=?null;
          ????????????do?{
          ????????????????Node?next?=?first.nextWaiter;
          ????????????????first.nextWaiter?=?null;
          ????????????????transferForSignal(first);
          ????????????????first?=?next;
          ????????????}?while?(first?!=?null);
          ???????}
          /**?將節(jié)點從條件隊列傳輸?shù)酵疥犃蠥QS的等待隊列中?*/
          final?boolean?transferForSignal(Node?node)?{
          ????????//核心添加節(jié)點到AQS隊列方法
          ????????Node?p?=?enq(node);
          ????????int?ws?=?p.waitStatus;
          ????????if?(ws?>?0?||?!compareAndSetWaitStatus(p,?ws,?Node.SIGNAL))
          ????????????LockSupport.unpark(node.thread);
          ????????return?true;
          ????}
          /**?使用CAS+自旋方式插入節(jié)點到等待隊列,如果隊列為空,則初始化隊列?*/
          private?Node?enq(final?Node?node)?{
          ????????for?(;;)?{
          ????????????Node?t?=?tail;
          ????????????if?(t?==?null)?{?//?Must?initialize
          ????????????????if?(compareAndSetHead(new?Node()))
          ????????????????????tail?=?head;
          ????????????}?else?{
          ????????????????node.prev?=?t;
          ????????????????if?(compareAndSetTail(t,?node))?{
          ????????????????????t.next?=?node;
          ????????????????????return?t;
          ????????????????}
          ????????????}
          ????????}

          3.5、【reset】方法源碼分析

          最后,我們來看看怎么重置一個柵欄:

          將屏障重置為初始狀態(tài)。如果任何一方目前在隔離墻等候,他們將帶著BrokenBarrierException返回。請注意,由于其他原因發(fā)生中斷后的重置可能很復雜;線程需要以其他方式重新同步,并選擇一種方式執(zhí)行重置。最好是創(chuàng)建一個新的屏障供以后使用

          ????public?void?reset()?{
          ????????final?ReentrantLock?lock?=?this.lock;
          ????????lock.lock();
          ????????try?{
          ????????????breakBarrier();???//?break?the?current?generation
          ????????????nextGeneration();?//?start?a?new?generation
          ????????}?finally?{
          ????????????lock.unlock();
          ????????}
          ????}

          測試reset代碼:

          首先,打破柵欄,那意味著所有等待的線程(5個等待的線程)會喚醒,【await 】方法會通過拋出【BrokenBarrierException】異常返回。然后開啟新一代,重置了 count 和 generation,相當于一切歸0了。

          4、CyclicBarrier 與 CountDownLatch 的區(qū)別

          相同點:

          1、都可以實現(xiàn)一組線程在到達某個條件之前進行等待

          2、它們內(nèi)部都有一個計數(shù)器,當計數(shù)器的值不斷減為0的時候,所有阻塞的線程都會被喚醒!

          不同點:

          1、CyclicBarrier 的計數(shù)器是由它自己來控制,而CountDownLatch 的計數(shù)器則是由使用則來控制

          2、在CyclicBarrier 中線程調(diào)用 await方法不僅會將自己阻塞,還會將計數(shù)器減1,而在CountDownLatch中線程調(diào)用 await方法只是將自己阻塞而不會減少計數(shù)器的值。

          3、另外,CountDownLatch 只能攔截一輪,而CyclicBarrier 可以實現(xiàn)循環(huán)攔截。一般來說CyclicBarrier 可以實現(xiàn) CountDownLatch的功能,而反之不能。

          5、總結:

          當調(diào)用【cyclicBarrier.await】方法時,最終都會執(zhí)行【dowait】方法,使用了ReentrantLock去上鎖,每次講計數(shù)器count值-1,當計數(shù)器值-1為0的時候,會先執(zhí)行指定任務,調(diào)用Condition的【trip.signalAll()】喚醒所有線程并進入下一代

          如果當前計數(shù)器值-1不為0的時候,進入自旋,執(zhí)行Condition的【await()】方法,將當前線程添加到Condition的條件隊列中等待,執(zhí)行【fullyRelease】調(diào)用【tryRelease】將count值-1,再判斷count值是否為0,為0 則會先執(zhí)行指定任務,調(diào)用Condition的【trip.signalAll()】喚醒所有線程并進入下一代,再判斷是否在AQS等待隊列中,如果不在的話就park當前線程進入AQS等待隊列中,否則自旋直到被喚醒在Condition中的等待隊列被signalAll進入AQS等待隊列中獲取鎖


          往期推薦

          一個 Java 對象占據(jù)多少內(nèi)存?如何避免內(nèi)存泄露?

          Java真的要涼了嗎?

          這滿屏的 if/ else,不解決就會被逼瘋!

          手動獲取 Spring 容器中的 Bean的 3 種方法

          END



          若覺得文章對你有幫助,隨手轉發(fā)分享,也是我們繼續(xù)更新的動力。


          長按二維碼,掃掃關注哦

          ?「C語言中文網(wǎng)」官方公眾號,關注手機閱讀教程??


          必備編程學習資料


          目前收集的資料包括:?Java,Python,C/C++,Linux,PHP,go,C#,QT,git/svn,人工智能,大數(shù)據(jù),單片機,算法,小程序,易語言,安卓,ios,PPT,軟件教程,前端,軟件測試,簡歷,畢業(yè)設計,公開課?等分類,資源在不斷更新中...


          點擊“閱讀原文”,立即免費領取最新資料!
          ??????
          瀏覽 53
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  波多野结衣中文字幕一区二区 | 人人操人人操人摸 | WWW高清视频一区在线观看 | 日韩午夜毛片 | 国产欧美在线免费观看 |