多線程進(jìn)階——JUC并發(fā)編程之CyclicBarrier源碼一探究竟
點(diǎn)擊上方?藍(lán)字?關(guān)注我們!
1、學(xué)習(xí)切入點(diǎn)

百度翻譯大概意思就是:
一種同步輔助程序,允許一組線程相互等待到達(dá)一個(gè)公共的屏障點(diǎn)。CyclicBarrier在涉及固定大小的線程方的程序中非常有用,這些線程方有時(shí)必須相互等待。這個(gè)屏障被稱為循環(huán)屏障,因?yàn)樗梢栽诘却木€程被釋放后重新使用。
CyclicBarrier支持可選的Runnable命令,該命令在參與方中的最后一個(gè)線程到達(dá)后,但在釋放任何線程之前,每個(gè)屏障點(diǎn)運(yùn)行一次。此屏障操作有助于在任何參與方繼續(xù)之前更新共享狀態(tài)。
動(dòng)圖演示:

在上文中我們分析完了 CountDownLatch源碼,可以理解為減法計(jì)數(shù)器,是基于AQS的共享模式使用,而CyclicBarrier相比于CountDownLatch 來說,要簡(jiǎn)單很多,它類似于加法計(jì)數(shù)器,在源碼中使用?ReentrantLock 和 Condition 的組合來使用。
2、案例演示 CyclicBarrier
//加法計(jì)數(shù)器
public?class?CyclicBarrierDemo?{
????public?static?void?main(String[]?args)?{
????????/**
?????????*?集齊5名隊(duì)員,開始游戲
?????????*/
????????//?開始戰(zhàn)斗的線程
????????CyclicBarrier?cyclicBarrier?=?new?CyclicBarrier(5,()->{
????????????System.out.println("歡迎來到王者榮耀,敵軍還有五秒到達(dá)戰(zhàn)場(chǎng)!全軍出擊!");
????????});
????????for?(int?i?=?1;?i?<=5?;?i++)?{
????????????final?int?temp?=?i;
????????????//?lambda能操作到?i?嗎
????????????new?Thread(()->{
????????????????System.out.println(Thread.currentThread().getName()+"第"+temp+"個(gè)進(jìn)入游戲!");
????????????????try?{
????????????????????cyclicBarrier.await();?//?等待
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}?catch?(BrokenBarrierException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}).start();
????????}
????}
}

3、入手構(gòu)造器
//構(gòu)造器1
/**?創(chuàng)建一個(gè)新的CyclicBarrier,它將在給定數(shù)量的參與方(線程)等待時(shí)觸發(fā),并在觸發(fā)屏障時(shí)執(zhí)行給定的屏障操作,由最后一個(gè)進(jìn)入屏障的線程執(zhí)行?*/???
public?CyclicBarrier(int?parties,?Runnable?barrierAction)?{
????????if?(parties?<=?0)?throw?new?IllegalArgumentException();
????????this.parties?=?parties;
????????this.count?=?parties;
????????this.barrierCommand?=?barrierAction;
????}
//構(gòu)造器2
/**?創(chuàng)建一個(gè)新的CyclicBarrier,當(dāng)給定數(shù)量的參與方(線程)在等待它時(shí),它將跳閘,并且在屏障跳閘時(shí)不執(zhí)行預(yù)定義的操作?*/
public?CyclicBarrier(int?parties)?{
????????this(parties,?null);
????}
其中構(gòu)造器1為核心構(gòu)造器,在這里你可以指定?parties?本局游戲的參與者的數(shù)量(要攔截的線程數(shù))以及?barrierAction?本局游戲結(jié)束時(shí)要執(zhí)行的任務(wù)。
3、入手成員變量
???/**?同步操作鎖?*/
????private?final?ReentrantLock?lock?=?new?ReentrantLock();
????/**?線程攔截器?Condition維護(hù)了一個(gè)阻塞隊(duì)列*/
????private?final?Condition?trip?=?lock.newCondition();
????/**?每次攔截的線程數(shù)?*/
????private?final?int?parties;
????/*?換代前執(zhí)行的任務(wù)?*/
????private?final?Runnable?barrierCommand;
????/**?表示柵欄的當(dāng)前代?類似代表本局游戲*/
????private?Generation?generation?=?new?Generation();
????/**?計(jì)數(shù)器?*/
????private?int?count;
????/**?靜態(tài)內(nèi)部類Generation??*/
????private?static?class?Generation?{
????????boolean?broken?=?false;
????}
3、入手核心方法

3.1、【await】方法源碼分析
下面分析這兩個(gè)方法,分別為【非定時(shí)等待】和【定時(shí)等待】!
//非定時(shí)等待
public?int?await()?throws?InterruptedException,?BrokenBarrierException?{
????????try?{
????????????return?dowait(false,?0L);
????????}?catch?(TimeoutException?toe)?{
????????????throw?new?Error(toe);?//?cannot?happen
????????}
????}
//定時(shí)等待
public?int?await(long?timeout,?TimeUnit?unit)throws?InterruptedException,
??????????????BrokenBarrierException,
??????????????TimeoutException?{
???????return?dowait(true,?unit.toNanos(timeout));
???}
可以看到,最終兩個(gè)方法都走【dowait】 方法,只不過參數(shù)不同。下面我們重點(diǎn)看看這個(gè)方法到底做了哪些事情。
//核心等待方法
?private?int?dowait(boolean?timed,?long?nanos)
????????throws?InterruptedException,?BrokenBarrierException,
???????????????TimeoutException?{
????????final?ReentrantLock?lock?=?this.lock;
????????lock.lock();//加鎖操作
????????try?{
????????????final?Generation?g?=?generation;
????????????//檢查當(dāng)前柵欄是否被打翻
????????????if?(g.broken)
????????????????throw?new?BrokenBarrierException();
????????????//檢查當(dāng)前線程是否被中斷
????????????if?(Thread.interrupted())?{
????????????????breakBarrier();
????????????????throw?new?InterruptedException();
????????????}
????????????//每次都將計(jì)數(shù)器的值-1
????????????int?index?=?--count;
????????????//計(jì)數(shù)器的值減為0,則需要喚醒所有線程并轉(zhuǎn)換到下一代
????????????if?(index?==?0)?{??//?tripped
????????????????boolean?ranAction?=?false;
????????????????try?{
????????????????????//喚醒所有線程前先執(zhí)行指定的任務(wù)
????????????????????final?Runnable?command?=?barrierCommand;
????????????????????if?(command?!=?null)
????????????????????????command.run();
????????????????????ranAction?=?true;
????????????????????//喚醒所有線程并轉(zhuǎn)換到下一代
????????????????????nextGeneration();
????????????????????return?0;
????????????????}?finally?{
????????????????????//確保在任務(wù)未成功執(zhí)行時(shí)能將所有線程喚醒
????????????????????if?(!ranAction)
????????????????????????breakBarrier();
????????????????}
????????????}
????????????//如果計(jì)數(shù)器不為0?則執(zhí)行此循環(huán)
????????????//?loop?until?tripped,?broken,?interrupted,?or?timed?out
????????????for?(;;)?{
????????????????try?{
????????????????????//根據(jù)傳入的參數(shù)來覺得是定時(shí)等待還是非定時(shí)等待
????????????????????if?(!timed)
????????????????????????//如果沒有時(shí)間限制,則直接等待,直到被喚醒
????????????????????????trip.await();
????????????????????else?if?(nanos?>?0L)
????????????????????????//如果有時(shí)間限制,則等待指定時(shí)間
????????????????????????nanos?=?trip.awaitNanos(nanos);
????????????????}?catch?(InterruptedException?ie)?{
????????????????????//若當(dāng)前線程在等待期間被中斷則打翻柵欄喚醒其它線程
????????????????????if?(g?==?generation?&&?!?g.broken)?{
????????????????????????breakBarrier();
????????????????????????throw?ie;
????????????????????}?else?{
????????????????????????//?若在捕獲中斷異常前已經(jīng)完成在柵欄上的等待,則直接調(diào)用中斷操作
????????????????????????Thread.currentThread().interrupt();
????????????????????}
????????????????}
????????????????//如果線程因?yàn)榇蚍瓥艡诓僮鞫粏拘褎t拋出異常
????????????????if?(g.broken)
????????????????????throw?new?BrokenBarrierException();
????????????????//如果線程因?yàn)閾Q代操作而被喚醒則返回計(jì)數(shù)器的值
????????????????if?(g?!=?generation)
????????????????????return?index;
????????????????//如果線程因?yàn)闀r(shí)間到了而被喚醒則打翻柵欄并拋出異常
????????????????if?(timed?&&?nanos?<=?0L)?{
????????????????????breakBarrier();
????????????????????throw?new?TimeoutException();
????????????????}
????????????}
????????}?finally?{
????????????lock.unlock();//最終解鎖
????????}
????}
分兩步分析,首先計(jì)數(shù)器的值減為0的情況,和計(jì)數(shù)器不為0的情況,首先第一種情況下:

第二種情況,計(jì)數(shù)器不為0,則進(jìn)入自旋for(;;):

多線程同時(shí)并發(fā)訪問,如何阻塞當(dāng)前線程?

我們翻看源碼,這里就看一下沒有時(shí)間限制的【trip.await】方法:

整個(gè)await的過程:
1、將當(dāng)前線程加入到Condition鎖隊(duì)列中。特別主要要區(qū)分AQS的等待隊(duì)列,這里進(jìn)入的是Condition的FIFO隊(duì)列
2、釋放鎖。這里可以看到【fullyRelease】將鎖釋放了,否則【acquireQueued(node, savedState)】別的線程就無法拿到鎖而發(fā)生死鎖。
3、自旋(while)掛起,直到被喚醒或者超時(shí)或者CACELLED等。
4、獲取鎖【acquireQueued】方法,并將自己從Condition的FIFO隊(duì)列中釋放,表面自己不再需要鎖(我已經(jīng)有鎖了)
3.2、Condition 隊(duì)列與AQS等待隊(duì)列 補(bǔ)充
AQS等待隊(duì)列與Condition隊(duì)列是兩個(gè)相互獨(dú)立的隊(duì)列,【await】就是在當(dāng)前線程持有鎖的基礎(chǔ)上釋放鎖資源,并新建Condition節(jié)點(diǎn)加入到Condition隊(duì)列尾部,阻塞當(dāng)前線程。【signal】就是將當(dāng)前Condition的頭結(jié)點(diǎn)移動(dòng)到AQS等待隊(duì)列節(jié)點(diǎn)尾部,讓其等待再次獲取鎖。下面畫圖演示區(qū)別:

節(jié)點(diǎn)1執(zhí)行Condition.await()->(1)將head后移 ->(2)釋放節(jié)點(diǎn)1的鎖并從AQS等待隊(duì)列中移除->(3)將節(jié)點(diǎn)1加入到Condition的等待隊(duì)列中->(4)更新lastWrite為節(jié)點(diǎn)1

節(jié)點(diǎn)2執(zhí)行signal()操作->(1)將firstWrite后移->(2)將節(jié)點(diǎn)4移出Condition隊(duì)列->(3)將節(jié)點(diǎn)4加入到AQS的等待隊(duì)列中去->(4)更新AQS等待隊(duì)列的tail

3.3、總結(jié):
一、Condition的數(shù)據(jù)結(jié)構(gòu):

我們知道一個(gè)Condition可以在多個(gè)地方被await(),那么就需要一個(gè)FIFO的結(jié)構(gòu)將這些Condition串聯(lián)起來,然后根據(jù)需要喚醒一個(gè)或者多個(gè)(通常是所有)。所以在Condition內(nèi)部就需要一個(gè)FIFO的隊(duì)列。private transient Node firstWaiter; private transient Node lastWaiter;上面的兩個(gè)節(jié)點(diǎn)就是描述一個(gè)FIFO的隊(duì)列。我們?cè)俳Y(jié)合前面提到的節(jié)點(diǎn)(Node)數(shù)據(jù)結(jié)構(gòu)。我們就發(fā)現(xiàn)Node.nextWaiter就派上用場(chǎng)了!nextWaiter就是將一系列的Condition.await 串聯(lián)起來組成一個(gè)FIFO的隊(duì)列。
二、線程何時(shí)阻塞和釋放
阻塞:await()方法中,在線程釋放鎖資源之后,如果節(jié)點(diǎn)不在AQS等待隊(duì)列,則阻塞當(dāng)前線程,如果在等待隊(duì)列,則自旋等待嘗試獲取鎖 釋放:signal()后,節(jié)點(diǎn)會(huì)從condition隊(duì)列移動(dòng)到AQS等待隊(duì)列,則進(jìn)入正常鎖的獲取流程。
3.4、【signalAll】signalAll源碼分析
【signalAll】方法,喚醒所有在Condition阻塞隊(duì)列中的線程
private?void?breakBarrier()?{
????????generation.broken?=?true;
????????count?=?parties;
????????trip.signalAll();//喚醒Condition中等待的線程
????}
public?final?void?signalAll()?{
????????????if?(!isHeldExclusively())
????????????????throw?new?IllegalMonitorStateException();
????????????Node?first?=?firstWaiter;
????????????if?(first?!=?null)
????????????????doSignalAll(first);
?????}
/**?這個(gè)方法相當(dāng)于把Condition隊(duì)列中的所有Node全部取出插入到等待隊(duì)列中去?*/
private?void?doSignalAll(Node?first)?{
????????????lastWaiter?=?firstWaiter?=?null;
????????????do?{
????????????????Node?next?=?first.nextWaiter;
????????????????first.nextWaiter?=?null;
????????????????transferForSignal(first);
????????????????first?=?next;
????????????}?while?(first?!=?null);
???????}
/**?將節(jié)點(diǎn)從條件隊(duì)列傳輸?shù)酵疥?duì)列AQS的等待隊(duì)列中?*/
final?boolean?transferForSignal(Node?node)?{
????????//核心添加節(jié)點(diǎn)到AQS隊(duì)列方法
????????Node?p?=?enq(node);
????????int?ws?=?p.waitStatus;
????????if?(ws?>?0?||?!compareAndSetWaitStatus(p,?ws,?Node.SIGNAL))
????????????LockSupport.unpark(node.thread);
????????return?true;
????}
/**?使用CAS+自旋方式插入節(jié)點(diǎn)到等待隊(duì)列,如果隊(duì)列為空,則初始化隊(duì)列?*/
private?Node?enq(final?Node?node)?{
????????for?(;;)?{
????????????Node?t?=?tail;
????????????if?(t?==?null)?{?//?Must?initialize
????????????????if?(compareAndSetHead(new?Node()))
????????????????????tail?=?head;
????????????}?else?{
????????????????node.prev?=?t;
????????????????if?(compareAndSetTail(t,?node))?{
????????????????????t.next?=?node;
????????????????????return?t;
????????????????}
????????????}
????????}
3.5、【reset】方法源碼分析
最后,我們來看看怎么重置一個(gè)柵欄:

將屏障重置為初始狀態(tài)。如果任何一方目前在隔離墻等候,他們將帶著BrokenBarrierException返回。請(qǐng)注意,由于其他原因發(fā)生中斷后的重置可能很復(fù)雜;線程需要以其他方式重新同步,并選擇一種方式執(zhí)行重置。最好是創(chuàng)建一個(gè)新的屏障供以后使用
????public?void?reset()?{
????????final?ReentrantLock?lock?=?this.lock;
????????lock.lock();
????????try?{
????????????breakBarrier();???//?break?the?current?generation
????????????nextGeneration();?//?start?a?new?generation
????????}?finally?{
????????????lock.unlock();
????????}
????}
測(cè)試reset代碼:


首先,打破柵欄,那意味著所有等待的線程(5個(gè)等待的線程)會(huì)喚醒,【await 】方法會(huì)通過拋出【BrokenBarrierException】異常返回。然后開啟新一代,重置了 count 和 generation,相當(dāng)于一切歸0了。
4、CyclicBarrier 與 CountDownLatch 的區(qū)別
相同點(diǎn):
1、都可以實(shí)現(xiàn)一組線程在到達(dá)某個(gè)條件之前進(jìn)行等待
2、它們內(nèi)部都有一個(gè)計(jì)數(shù)器,當(dāng)計(jì)數(shù)器的值不斷減為0的時(shí)候,所有阻塞的線程都會(huì)被喚醒!
不同點(diǎn):
1、CyclicBarrier 的計(jì)數(shù)器是由它自己來控制,而CountDownLatch 的計(jì)數(shù)器則是由使用則來控制
2、在CyclicBarrier 中線程調(diào)用 await方法不僅會(huì)將自己阻塞,還會(huì)將計(jì)數(shù)器減1,而在CountDownLatch中線程調(diào)用 await方法只是將自己阻塞而不會(huì)減少計(jì)數(shù)器的值。
3、另外,CountDownLatch 只能攔截一輪,而CyclicBarrier 可以實(shí)現(xiàn)循環(huán)攔截。一般來說CyclicBarrier 可以實(shí)現(xiàn) CountDownLatch的功能,而反之不能。
5、總結(jié):
當(dāng)調(diào)用【cyclicBarrier.await】方法時(shí),最終都會(huì)執(zhí)行【dowait】方法,使用了ReentrantLock去上鎖,每次講計(jì)數(shù)器count值-1,當(dāng)計(jì)數(shù)器值-1為0的時(shí)候,會(huì)先執(zhí)行指定任務(wù),調(diào)用Condition的【trip.signalAll()】喚醒所有線程并進(jìn)入下一代
如果當(dāng)前計(jì)數(shù)器值-1不為0的時(shí)候,進(jìn)入自旋,執(zhí)行Condition的【await()】方法,將當(dāng)前線程添加到Condition的條件隊(duì)列中等待,執(zhí)行【fullyRelease】調(diào)用【tryRelease】將count值-1,再判斷count值是否為0,為0 則會(huì)先執(zhí)行指定任務(wù),調(diào)用Condition的【trip.signalAll()】喚醒所有線程并進(jìn)入下一代,再判斷是否在AQS等待隊(duì)列中,如果不在的話就park當(dāng)前線程進(jìn)入AQS等待隊(duì)列中,否則自旋直到被喚醒在Condition中的等待隊(duì)列被signalAll進(jìn)入AQS等待隊(duì)列中獲取鎖
往期推薦
END
若覺得文章對(duì)你有幫助,隨手轉(zhuǎn)發(fā)分享,也是我們繼續(xù)更新的動(dòng)力。
長(zhǎng)按二維碼,掃掃關(guān)注哦
?「C語言中文網(wǎng)」官方公眾號(hào),關(guān)注手機(jī)閱讀教程??
目前收集的資料包括:?Java,Python,C/C++,Linux,PHP,go,C#,QT,git/svn,人工智能,大數(shù)據(jù),單片機(jī),算法,小程序,易語言,安卓,ios,PPT,軟件教程,前端,軟件測(cè)試,簡(jiǎn)歷,畢業(yè)設(shè)計(jì),公開課?等分類,資源在不斷更新中...

