<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          多線程進(jìn)階——JUC并發(fā)編程之CyclicBarrier源碼一探究竟

          共 6123字,需瀏覽 13分鐘

           ·

          2021-01-23 06:12


          點(diǎn)擊上方?藍(lán)字?關(guān)注我們!



          Java,Python,C/C++,Linux,PHP,Go,C#,QT,大數(shù)據(jù),算法,軟件教程,前端,簡(jiǎn)歷,畢業(yè)設(shè)計(jì)等分類,資源在不斷更新中... 點(diǎn)擊領(lǐng)取

          1、學(xué)習(xí)切入點(diǎn)

          百度翻譯大概意思就是:

          一種同步輔助程序,允許一組線程相互等待到達(dá)一個(gè)公共的屏障點(diǎn)。CyclicBarrier在涉及固定大小的線程方的程序中非常有用,這些線程方有時(shí)必須相互等待。這個(gè)屏障被稱為循環(huán)屏障,因?yàn)樗梢栽诘却木€程被釋放后重新使用。

          CyclicBarrier支持可選的Runnable命令,該命令在參與方中的最后一個(gè)線程到達(dá)后,但在釋放任何線程之前,每個(gè)屏障點(diǎn)運(yùn)行一次。此屏障操作有助于在任何參與方繼續(xù)之前更新共享狀態(tài)。

          動(dòng)圖演示:

          在上文中我們分析完了 CountDownLatch源碼,可以理解為減法計(jì)數(shù)器,是基于AQS的共享模式使用,而CyclicBarrier相比于CountDownLatch 來說,要簡(jiǎn)單很多,它類似于加法計(jì)數(shù)器,在源碼中使用?ReentrantLock 和 Condition 的組合來使用。

          2、案例演示 CyclicBarrier

          //加法計(jì)數(shù)器
          public?class?CyclicBarrierDemo?{
          ????public?static?void?main(String[]?args)?{
          ????????/**
          ?????????*?集齊5名隊(duì)員,開始游戲
          ?????????*/

          ????????//?開始戰(zhàn)斗的線程
          ????????CyclicBarrier?cyclicBarrier?=?new?CyclicBarrier(5,()->{
          ????????????System.out.println("歡迎來到王者榮耀,敵軍還有五秒到達(dá)戰(zhàn)場(chǎng)!全軍出擊!");
          ????????});
          ????????for?(int?i?=?1;?i?<=5?;?i++)?{
          ????????????final?int?temp?=?i;
          ????????????//?lambda能操作到?i?嗎
          ????????????new?Thread(()->{
          ????????????????System.out.println(Thread.currentThread().getName()+"第"+temp+"個(gè)進(jìn)入游戲!");
          ????????????????try?{
          ????????????????????cyclicBarrier.await();?//?等待
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}?catch?(BrokenBarrierException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}
          ????????????}).start();
          ????????}
          ????}
          }

          3、入手構(gòu)造器

          //構(gòu)造器1
          /**?創(chuàng)建一個(gè)新的CyclicBarrier,它將在給定數(shù)量的參與方(線程)等待時(shí)觸發(fā),并在觸發(fā)屏障時(shí)執(zhí)行給定的屏障操作,由最后一個(gè)進(jìn)入屏障的線程執(zhí)行?*/???
          public?CyclicBarrier(int?parties,?Runnable?barrierAction)?{
          ????????if?(parties?<=?0)?throw?new?IllegalArgumentException();
          ????????this.parties?=?parties;
          ????????this.count?=?parties;
          ????????this.barrierCommand?=?barrierAction;
          ????}

          //構(gòu)造器2
          /**?創(chuàng)建一個(gè)新的CyclicBarrier,當(dāng)給定數(shù)量的參與方(線程)在等待它時(shí),它將跳閘,并且在屏障跳閘時(shí)不執(zhí)行預(yù)定義的操作?*/
          public?CyclicBarrier(int?parties)?{
          ????????this(parties,?null);
          ????}

          其中構(gòu)造器1為核心構(gòu)造器,在這里你可以指定?parties?本局游戲的參與者的數(shù)量(要攔截的線程數(shù))以及?barrierAction?本局游戲結(jié)束時(shí)要執(zhí)行的任務(wù)。

          3、入手成員變量

          ???/**?同步操作鎖?*/
          ????private?final?ReentrantLock?lock?=?new?ReentrantLock();
          ????/**?線程攔截器?Condition維護(hù)了一個(gè)阻塞隊(duì)列*/
          ????private?final?Condition?trip?=?lock.newCondition();
          ????/**?每次攔截的線程數(shù)?*/
          ????private?final?int?parties;
          ????/*?換代前執(zhí)行的任務(wù)?*/
          ????private?final?Runnable?barrierCommand;
          ????/**?表示柵欄的當(dāng)前代?類似代表本局游戲*/
          ????private?Generation?generation?=?new?Generation();
          ????/**?計(jì)數(shù)器?*/
          ????private?int?count;
          ????/**?靜態(tài)內(nèi)部類Generation??*/
          ????private?static?class?Generation?{
          ????????boolean?broken?=?false;
          ????}

          3、入手核心方法

          3.1、【await】方法源碼分析

          下面分析這兩個(gè)方法,分別為【非定時(shí)等待】和【定時(shí)等待】!

          //非定時(shí)等待
          public?int?await()?throws?InterruptedException,?BrokenBarrierException?{
          ????????try?{
          ????????????return?dowait(false,?0L);
          ????????}?catch?(TimeoutException?toe)?{
          ????????????throw?new?Error(toe);?//?cannot?happen
          ????????}
          ????}
          //定時(shí)等待
          public?int?await(long?timeout,?TimeUnit?unit)throws?InterruptedException,
          ??????????????BrokenBarrierException,
          ??????????????TimeoutException?
          {
          ???????return?dowait(true,?unit.toNanos(timeout));
          ???}

          可以看到,最終兩個(gè)方法都走【dowait】 方法,只不過參數(shù)不同。下面我們重點(diǎn)看看這個(gè)方法到底做了哪些事情。

          //核心等待方法
          ?private?int?dowait(boolean?timed,?long?nanos)
          ????????throws?InterruptedException,?BrokenBarrierException,
          ???????????????TimeoutException?
          {
          ????????final?ReentrantLock?lock?=?this.lock;
          ????????lock.lock();//加鎖操作
          ????????try?{
          ????????????final?Generation?g?=?generation;
          ????????????//檢查當(dāng)前柵欄是否被打翻
          ????????????if?(g.broken)
          ????????????????throw?new?BrokenBarrierException();
          ????????????//檢查當(dāng)前線程是否被中斷
          ????????????if?(Thread.interrupted())?{
          ????????????????breakBarrier();
          ????????????????throw?new?InterruptedException();
          ????????????}
          ????????????//每次都將計(jì)數(shù)器的值-1
          ????????????int?index?=?--count;
          ????????????//計(jì)數(shù)器的值減為0,則需要喚醒所有線程并轉(zhuǎn)換到下一代
          ????????????if?(index?==?0)?{??//?tripped
          ????????????????boolean?ranAction?=?false;
          ????????????????try?{
          ????????????????????//喚醒所有線程前先執(zhí)行指定的任務(wù)
          ????????????????????final?Runnable?command?=?barrierCommand;
          ????????????????????if?(command?!=?null)
          ????????????????????????command.run();
          ????????????????????ranAction?=?true;
          ????????????????????//喚醒所有線程并轉(zhuǎn)換到下一代
          ????????????????????nextGeneration();
          ????????????????????return?0;
          ????????????????}?finally?{
          ????????????????????//確保在任務(wù)未成功執(zhí)行時(shí)能將所有線程喚醒
          ????????????????????if?(!ranAction)
          ????????????????????????breakBarrier();
          ????????????????}
          ????????????}
          ????????????//如果計(jì)數(shù)器不為0?則執(zhí)行此循環(huán)
          ????????????//?loop?until?tripped,?broken,?interrupted,?or?timed?out
          ????????????for?(;;)?{
          ????????????????try?{
          ????????????????????//根據(jù)傳入的參數(shù)來覺得是定時(shí)等待還是非定時(shí)等待
          ????????????????????if?(!timed)
          ????????????????????????//如果沒有時(shí)間限制,則直接等待,直到被喚醒
          ????????????????????????trip.await();
          ????????????????????else?if?(nanos?>?0L)
          ????????????????????????//如果有時(shí)間限制,則等待指定時(shí)間
          ????????????????????????nanos?=?trip.awaitNanos(nanos);
          ????????????????}?catch?(InterruptedException?ie)?{
          ????????????????????//若當(dāng)前線程在等待期間被中斷則打翻柵欄喚醒其它線程
          ????????????????????if?(g?==?generation?&&?!?g.broken)?{
          ????????????????????????breakBarrier();
          ????????????????????????throw?ie;
          ????????????????????}?else?{
          ????????????????????????//?若在捕獲中斷異常前已經(jīng)完成在柵欄上的等待,則直接調(diào)用中斷操作
          ????????????????????????Thread.currentThread().interrupt();
          ????????????????????}
          ????????????????}
          ????????????????//如果線程因?yàn)榇蚍瓥艡诓僮鞫粏拘褎t拋出異常
          ????????????????if?(g.broken)
          ????????????????????throw?new?BrokenBarrierException();
          ????????????????//如果線程因?yàn)閾Q代操作而被喚醒則返回計(jì)數(shù)器的值
          ????????????????if?(g?!=?generation)
          ????????????????????return?index;
          ????????????????//如果線程因?yàn)闀r(shí)間到了而被喚醒則打翻柵欄并拋出異常
          ????????????????if?(timed?&&?nanos?<=?0L)?{
          ????????????????????breakBarrier();
          ????????????????????throw?new?TimeoutException();
          ????????????????}
          ????????????}
          ????????}?finally?{
          ????????????lock.unlock();//最終解鎖
          ????????}
          ????}

          分兩步分析,首先計(jì)數(shù)器的值減為0的情況,和計(jì)數(shù)器不為0的情況,首先第一種情況下:

          img

          第二種情況,計(jì)數(shù)器不為0,則進(jìn)入自旋for(;;):

          多線程同時(shí)并發(fā)訪問,如何阻塞當(dāng)前線程?

          我們翻看源碼,這里就看一下沒有時(shí)間限制的【trip.await】方法:

          整個(gè)await的過程:

          1、將當(dāng)前線程加入到Condition鎖隊(duì)列中。特別主要要區(qū)分AQS的等待隊(duì)列,這里進(jìn)入的是Condition的FIFO隊(duì)列

          2、釋放鎖。這里可以看到【fullyRelease】將鎖釋放了,否則【acquireQueued(node, savedState)】別的線程就無法拿到鎖而發(fā)生死鎖。

          3、自旋(while)掛起,直到被喚醒或者超時(shí)或者CACELLED等。

          4、獲取鎖【acquireQueued】方法,并將自己從Condition的FIFO隊(duì)列中釋放,表面自己不再需要鎖(我已經(jīng)有鎖了)

          3.2、Condition 隊(duì)列與AQS等待隊(duì)列 補(bǔ)充

          AQS等待隊(duì)列與Condition隊(duì)列是兩個(gè)相互獨(dú)立的隊(duì)列,【await】就是在當(dāng)前線程持有鎖的基礎(chǔ)上釋放鎖資源,并新建Condition節(jié)點(diǎn)加入到Condition隊(duì)列尾部,阻塞當(dāng)前線程。【signal】就是將當(dāng)前Condition的頭結(jié)點(diǎn)移動(dòng)到AQS等待隊(duì)列節(jié)點(diǎn)尾部,讓其等待再次獲取鎖。下面畫圖演示區(qū)別:

          節(jié)點(diǎn)1執(zhí)行Condition.await()->(1)將head后移 ->(2)釋放節(jié)點(diǎn)1的鎖并從AQS等待隊(duì)列中移除->(3)將節(jié)點(diǎn)1加入到Condition的等待隊(duì)列中->(4)更新lastWrite為節(jié)點(diǎn)1

          節(jié)點(diǎn)2執(zhí)行signal()操作->(1)將firstWrite后移->(2)將節(jié)點(diǎn)4移出Condition隊(duì)列->(3)將節(jié)點(diǎn)4加入到AQS的等待隊(duì)列中去->(4)更新AQS等待隊(duì)列的tail

          3.3、總結(jié):

          一、Condition的數(shù)據(jù)結(jié)構(gòu):

          我們知道一個(gè)Condition可以在多個(gè)地方被await(),那么就需要一個(gè)FIFO的結(jié)構(gòu)將這些Condition串聯(lián)起來,然后根據(jù)需要喚醒一個(gè)或者多個(gè)(通常是所有)。所以在Condition內(nèi)部就需要一個(gè)FIFO的隊(duì)列。private transient Node firstWaiter; private transient Node lastWaiter;上面的兩個(gè)節(jié)點(diǎn)就是描述一個(gè)FIFO的隊(duì)列。我們?cè)俳Y(jié)合前面提到的節(jié)點(diǎn)(Node)數(shù)據(jù)結(jié)構(gòu)。我們就發(fā)現(xiàn)Node.nextWaiter就派上用場(chǎng)了!nextWaiter就是將一系列的Condition.await 串聯(lián)起來組成一個(gè)FIFO的隊(duì)列。

          二、線程何時(shí)阻塞和釋放

          阻塞:await()方法中,在線程釋放鎖資源之后,如果節(jié)點(diǎn)不在AQS等待隊(duì)列,則阻塞當(dāng)前線程,如果在等待隊(duì)列,則自旋等待嘗試獲取鎖 釋放:signal()后,節(jié)點(diǎn)會(huì)從condition隊(duì)列移動(dòng)到AQS等待隊(duì)列,則進(jìn)入正常鎖的獲取流程。

          3.4、【signalAll】signalAll源碼分析

          signalAll】方法,喚醒所有在Condition阻塞隊(duì)列中的線程

          private?void?breakBarrier()?{
          ????????generation.broken?=?true;
          ????????count?=?parties;
          ????????trip.signalAll();//喚醒Condition中等待的線程
          ????}

          public?final?void?signalAll()?{
          ????????????if?(!isHeldExclusively())
          ????????????????throw?new?IllegalMonitorStateException();
          ????????????Node?first?=?firstWaiter;
          ????????????if?(first?!=?null)
          ????????????????doSignalAll(first);
          ?????}
          /**?這個(gè)方法相當(dāng)于把Condition隊(duì)列中的所有Node全部取出插入到等待隊(duì)列中去?*/
          private?void?doSignalAll(Node?first)?{
          ????????????lastWaiter?=?firstWaiter?=?null;
          ????????????do?{
          ????????????????Node?next?=?first.nextWaiter;
          ????????????????first.nextWaiter?=?null;
          ????????????????transferForSignal(first);
          ????????????????first?=?next;
          ????????????}?while?(first?!=?null);
          ???????}
          /**?將節(jié)點(diǎn)從條件隊(duì)列傳輸?shù)酵疥?duì)列AQS的等待隊(duì)列中?*/
          final?boolean?transferForSignal(Node?node)?{
          ????????//核心添加節(jié)點(diǎn)到AQS隊(duì)列方法
          ????????Node?p?=?enq(node);
          ????????int?ws?=?p.waitStatus;
          ????????if?(ws?>?0?||?!compareAndSetWaitStatus(p,?ws,?Node.SIGNAL))
          ????????????LockSupport.unpark(node.thread);
          ????????return?true;
          ????}
          /**?使用CAS+自旋方式插入節(jié)點(diǎn)到等待隊(duì)列,如果隊(duì)列為空,則初始化隊(duì)列?*/
          private?Node?enq(final?Node?node)?{
          ????????for?(;;)?{
          ????????????Node?t?=?tail;
          ????????????if?(t?==?null)?{?//?Must?initialize
          ????????????????if?(compareAndSetHead(new?Node()))
          ????????????????????tail?=?head;
          ????????????}?else?{
          ????????????????node.prev?=?t;
          ????????????????if?(compareAndSetTail(t,?node))?{
          ????????????????????t.next?=?node;
          ????????????????????return?t;
          ????????????????}
          ????????????}
          ????????}

          3.5、【reset】方法源碼分析

          最后,我們來看看怎么重置一個(gè)柵欄:

          將屏障重置為初始狀態(tài)。如果任何一方目前在隔離墻等候,他們將帶著BrokenBarrierException返回。請(qǐng)注意,由于其他原因發(fā)生中斷后的重置可能很復(fù)雜;線程需要以其他方式重新同步,并選擇一種方式執(zhí)行重置。最好是創(chuàng)建一個(gè)新的屏障供以后使用

          ????public?void?reset()?{
          ????????final?ReentrantLock?lock?=?this.lock;
          ????????lock.lock();
          ????????try?{
          ????????????breakBarrier();???//?break?the?current?generation
          ????????????nextGeneration();?//?start?a?new?generation
          ????????}?finally?{
          ????????????lock.unlock();
          ????????}
          ????}

          測(cè)試reset代碼:

          首先,打破柵欄,那意味著所有等待的線程(5個(gè)等待的線程)會(huì)喚醒,【await 】方法會(huì)通過拋出【BrokenBarrierException】異常返回。然后開啟新一代,重置了 count 和 generation,相當(dāng)于一切歸0了。

          4、CyclicBarrier 與 CountDownLatch 的區(qū)別

          相同點(diǎn):

          1、都可以實(shí)現(xiàn)一組線程在到達(dá)某個(gè)條件之前進(jìn)行等待

          2、它們內(nèi)部都有一個(gè)計(jì)數(shù)器,當(dāng)計(jì)數(shù)器的值不斷減為0的時(shí)候,所有阻塞的線程都會(huì)被喚醒!

          不同點(diǎn):

          1、CyclicBarrier 的計(jì)數(shù)器是由它自己來控制,而CountDownLatch 的計(jì)數(shù)器則是由使用則來控制

          2、在CyclicBarrier 中線程調(diào)用 await方法不僅會(huì)將自己阻塞,還會(huì)將計(jì)數(shù)器減1,而在CountDownLatch中線程調(diào)用 await方法只是將自己阻塞而不會(huì)減少計(jì)數(shù)器的值。

          3、另外,CountDownLatch 只能攔截一輪,而CyclicBarrier 可以實(shí)現(xiàn)循環(huán)攔截。一般來說CyclicBarrier 可以實(shí)現(xiàn) CountDownLatch的功能,而反之不能。

          5、總結(jié):

          當(dāng)調(diào)用【cyclicBarrier.await】方法時(shí),最終都會(huì)執(zhí)行【dowait】方法,使用了ReentrantLock去上鎖,每次講計(jì)數(shù)器count值-1,當(dāng)計(jì)數(shù)器值-1為0的時(shí)候,會(huì)先執(zhí)行指定任務(wù),調(diào)用Condition的【trip.signalAll()】喚醒所有線程并進(jìn)入下一代

          如果當(dāng)前計(jì)數(shù)器值-1不為0的時(shí)候,進(jìn)入自旋,執(zhí)行Condition的【await()】方法,將當(dāng)前線程添加到Condition的條件隊(duì)列中等待,執(zhí)行【fullyRelease】調(diào)用【tryRelease】將count值-1,再判斷count值是否為0,為0 則會(huì)先執(zhí)行指定任務(wù),調(diào)用Condition的【trip.signalAll()】喚醒所有線程并進(jìn)入下一代,再判斷是否在AQS等待隊(duì)列中,如果不在的話就park當(dāng)前線程進(jìn)入AQS等待隊(duì)列中,否則自旋直到被喚醒在Condition中的等待隊(duì)列被signalAll進(jìn)入AQS等待隊(duì)列中獲取鎖


          往期推薦

          HarmonyOS 手機(jī)應(yīng)用開發(fā)者 Beta 版到來,對(duì)開發(fā)者意味著什么

          JVM 內(nèi)存工具匯總,建議收藏!

          9 個(gè) JSON 工具,能提升你多少效率?

          GET 和 POST請(qǐng)求的本質(zhì)區(qū)別是什么?原來我一直理解錯(cuò)了

          END



          若覺得文章對(duì)你有幫助,隨手轉(zhuǎn)發(fā)分享,也是我們繼續(xù)更新的動(dòng)力。


          長(zhǎng)按二維碼,掃掃關(guān)注哦

          ?「C語言中文網(wǎng)」官方公眾號(hào),關(guān)注手機(jī)閱讀教程??


          必備編程學(xué)習(xí)資料


          目前收集的資料包括:?Java,Python,C/C++,Linux,PHP,go,C#,QT,git/svn,人工智能,大數(shù)據(jù),單片機(jī),算法,小程序,易語言,安卓,ios,PPT,軟件教程,前端,軟件測(cè)試,簡(jiǎn)歷,畢業(yè)設(shè)計(jì),公開課?等分類,資源在不斷更新中...


          點(diǎn)擊“閱讀原文”,立即免費(fèi)領(lǐng)取最新資料!
          ??????
          瀏覽 30
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  天天日天天爱 | 一道无码| 中文无码免费一区二区三区 | 午夜婷婷福利 | 嫩草影院成人 |