多線程進階-CyclicBarrier 源碼超詳細解析,學(xué)到就賺到
點擊上方?泥瓦匠 關(guān)注我!
老家浙江東海邊,靠海吃海,目前經(jīng)營一個小品牌,讓普通人吃到最新鮮的海鮮。有興趣可以點擊了解:《浙里有漁,鮮人一步!》???
1、學(xué)習(xí)切入點

百度翻譯大概意思就是:
一種同步輔助程序,允許一組線程相互等待到達一個公共的屏障點。CyclicBarrier在涉及固定大小的線程方的程序中非常有用,這些線程方有時必須相互等待。這個屏障被稱為循環(huán)屏障,因為它可以在等待的線程被釋放后重新使用。
CyclicBarrier支持可選的Runnable命令,該命令在參與方中的最后一個線程到達后,但在釋放任何線程之前,每個屏障點運行一次。此屏障操作有助于在任何參與方繼續(xù)之前更新共享狀態(tài)。
動圖演示:

在上文中我們分析完了 CountDownLatch源碼,可以理解為減法計數(shù)器,是基于AQS的共享模式使用,而CyclicBarrier相比于CountDownLatch 來說,要簡單很多,它類似于加法計數(shù)器,在源碼中使用?ReentrantLock 和 Condition 的組合來使用。
2、案例演示 CyclicBarrier
//加法計數(shù)器
public?class?CyclicBarrierDemo?{
????public?static?void?main(String[]?args)?{
????????/**
?????????*?集齊5名隊員,開始游戲
?????????*/
????????//?開始戰(zhàn)斗的線程
????????CyclicBarrier?cyclicBarrier?=?new?CyclicBarrier(5,()->{
????????????System.out.println("歡迎來到王者榮耀,敵軍還有五秒到達戰(zhàn)場!全軍出擊!");
????????});
????????for?(int?i?=?1;?i?<=5?;?i++)?{
????????????final?int?temp?=?i;
????????????//?lambda能操作到?i?嗎
????????????new?Thread(()->{
????????????????System.out.println(Thread.currentThread().getName()+"第"+temp+"個進入游戲!");
????????????????try?{
????????????????????cyclicBarrier.await();?//?等待
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}?catch?(BrokenBarrierException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}).start();
????????}
????}
}

3、入手構(gòu)造器
//構(gòu)造器1
/**?創(chuàng)建一個新的CyclicBarrier,它將在給定數(shù)量的參與方(線程)等待時觸發(fā),并在觸發(fā)屏障時執(zhí)行給定的屏障操作,由最后一個進入屏障的線程執(zhí)行?*/???
public?CyclicBarrier(int?parties,?Runnable?barrierAction)?{
????????if?(parties?<=?0)?throw?new?IllegalArgumentException();
????????this.parties?=?parties;
????????this.count?=?parties;
????????this.barrierCommand?=?barrierAction;
????}
//構(gòu)造器2
/**?創(chuàng)建一個新的CyclicBarrier,當給定數(shù)量的參與方(線程)在等待它時,它將跳閘,并且在屏障跳閘時不執(zhí)行預(yù)定義的操作?*/
public?CyclicBarrier(int?parties)?{
????????this(parties,?null);
????}
其中構(gòu)造器1為核心構(gòu)造器,在這里你可以指定?parties?本局游戲的參與者的數(shù)量(要攔截的線程數(shù))以及?barrierAction?本局游戲結(jié)束時要執(zhí)行的任務(wù)。
3、入手成員變量
???/**?同步操作鎖?*/
????private?final?ReentrantLock?lock?=?new?ReentrantLock();
????/**?線程攔截器?Condition維護了一個阻塞隊列*/
????private?final?Condition?trip?=?lock.newCondition();
????/**?每次攔截的線程數(shù)?*/
????private?final?int?parties;
????/*?換代前執(zhí)行的任務(wù)?*/
????private?final?Runnable?barrierCommand;
????/**?表示柵欄的當前代?類似代表本局游戲*/
????private?Generation?generation?=?new?Generation();
????/**?計數(shù)器?*/
????private?int?count;
????/**?靜態(tài)內(nèi)部類Generation??*/
????private?static?class?Generation?{
????????boolean?broken?=?false;
????}
3、入手核心方法

3.1、【await】方法源碼分析
下面分析這兩個方法,分別為【非定時等待】和【定時等待】!
//非定時等待
public?int?await()?throws?InterruptedException,?BrokenBarrierException?{
????????try?{
????????????return?dowait(false,?0L);
????????}?catch?(TimeoutException?toe)?{
????????????throw?new?Error(toe);?//?cannot?happen
????????}
????}
//定時等待
public?int?await(long?timeout,?TimeUnit?unit)throws?InterruptedException,
??????????????BrokenBarrierException,
??????????????TimeoutException?{
???????return?dowait(true,?unit.toNanos(timeout));
???}
可以看到,最終兩個方法都走【dowait】 方法,只不過參數(shù)不同。下面我們重點看看這個方法到底做了哪些事情。
//核心等待方法
?private?int?dowait(boolean?timed,?long?nanos)
????????throws?InterruptedException,?BrokenBarrierException,
???????????????TimeoutException?{
????????final?ReentrantLock?lock?=?this.lock;
????????lock.lock();//加鎖操作
????????try?{
????????????final?Generation?g?=?generation;
????????????//檢查當前柵欄是否被打翻
????????????if?(g.broken)
????????????????throw?new?BrokenBarrierException();
????????????//檢查當前線程是否被中斷
????????????if?(Thread.interrupted())?{
????????????????breakBarrier();
????????????????throw?new?InterruptedException();
????????????}
????????????//每次都將計數(shù)器的值-1
????????????int?index?=?--count;
????????????//計數(shù)器的值減為0,則需要喚醒所有線程并轉(zhuǎn)換到下一代
????????????if?(index?==?0)?{??//?tripped
????????????????boolean?ranAction?=?false;
????????????????try?{
????????????????????//喚醒所有線程前先執(zhí)行指定的任務(wù)
????????????????????final?Runnable?command?=?barrierCommand;
????????????????????if?(command?!=?null)
????????????????????????command.run();
????????????????????ranAction?=?true;
????????????????????//喚醒所有線程并轉(zhuǎn)換到下一代
????????????????????nextGeneration();
????????????????????return?0;
????????????????}?finally?{
????????????????????//確保在任務(wù)未成功執(zhí)行時能將所有線程喚醒
????????????????????if?(!ranAction)
????????????????????????breakBarrier();
????????????????}
????????????}
????????????//如果計數(shù)器不為0?則執(zhí)行此循環(huán)
????????????//?loop?until?tripped,?broken,?interrupted,?or?timed?out
????????????for?(;;)?{
????????????????try?{
????????????????????//根據(jù)傳入的參數(shù)來覺得是定時等待還是非定時等待
????????????????????if?(!timed)
????????????????????????//如果沒有時間限制,則直接等待,直到被喚醒
????????????????????????trip.await();
????????????????????else?if?(nanos?>?0L)
????????????????????????//如果有時間限制,則等待指定時間
????????????????????????nanos?=?trip.awaitNanos(nanos);
????????????????}?catch?(InterruptedException?ie)?{
????????????????????//若當前線程在等待期間被中斷則打翻柵欄喚醒其它線程
????????????????????if?(g?==?generation?&&?!?g.broken)?{
????????????????????????breakBarrier();
????????????????????????throw?ie;
????????????????????}?else?{
????????????????????????//?若在捕獲中斷異常前已經(jīng)完成在柵欄上的等待,則直接調(diào)用中斷操作
????????????????????????Thread.currentThread().interrupt();
????????????????????}
????????????????}
????????????????//如果線程因為打翻柵欄操作而被喚醒則拋出異常
????????????????if?(g.broken)
????????????????????throw?new?BrokenBarrierException();
????????????????//如果線程因為換代操作而被喚醒則返回計數(shù)器的值
????????????????if?(g?!=?generation)
????????????????????return?index;
????????????????//如果線程因為時間到了而被喚醒則打翻柵欄并拋出異常
????????????????if?(timed?&&?nanos?<=?0L)?{
????????????????????breakBarrier();
????????????????????throw?new?TimeoutException();
????????????????}
????????????}
????????}?finally?{
????????????lock.unlock();//最終解鎖
????????}
????}
分兩步分析,首先計數(shù)器的值減為0的情況,和計數(shù)器不為0的情況,首先第一種情況下:

第二種情況,計數(shù)器不為0,則進入自旋for(;;):

多線程同時并發(fā)訪問,如何阻塞當前線程?

我們翻看源碼,這里就看一下沒有時間限制的【trip.await】方法:

整個await的過程:
1、將當前線程加入到Condition鎖隊列中。特別主要要區(qū)分AQS的等待隊列,這里進入的是Condition的FIFO隊列
2、釋放鎖。這里可以看到【fullyRelease】將鎖釋放了,否則【acquireQueued(node, savedState)】別的線程就無法拿到鎖而發(fā)生死鎖。
3、自旋(while)掛起,直到被喚醒或者超時或者CACELLED等。
4、獲取鎖【acquireQueued】方法,并將自己從Condition的FIFO隊列中釋放,表面自己不再需要鎖(我已經(jīng)有鎖了)
3.2、Condition 隊列與AQS等待隊列 補充
AQS等待隊列與Condition隊列是兩個相互獨立的隊列,【await】就是在當前線程持有鎖的基礎(chǔ)上釋放鎖資源,并新建Condition節(jié)點加入到Condition隊列尾部,阻塞當前線程。【signal】就是將當前Condition的頭結(jié)點移動到AQS等待隊列節(jié)點尾部,讓其等待再次獲取鎖。下面畫圖演示區(qū)別:

節(jié)點1執(zhí)行Condition.await()->(1)將head后移 ->(2)釋放節(jié)點1的鎖并從AQS等待隊列中移除->(3)將節(jié)點1加入到Condition的等待隊列中->(4)更新lastWrite為節(jié)點1

節(jié)點2執(zhí)行signal()操作->(1)將firstWrite后移->(2)將節(jié)點4移出Condition隊列->(3)將節(jié)點4加入到AQS的等待隊列中去->(4)更新AQS等待隊列的tail

3.3、總結(jié):
一、Condition的數(shù)據(jù)結(jié)構(gòu):

我們知道一個Condition可以在多個地方被await(),那么就需要一個FIFO的結(jié)構(gòu)將這些Condition串聯(lián)起來,然后根據(jù)需要喚醒一個或者多個(通常是所有)。所以在Condition內(nèi)部就需要一個FIFO的隊列。private transient Node firstWaiter; private transient Node lastWaiter;上面的兩個節(jié)點就是描述一個FIFO的隊列。我們再結(jié)合前面提到的節(jié)點(Node)數(shù)據(jù)結(jié)構(gòu)。我們就發(fā)現(xiàn)Node.nextWaiter就派上用場了!nextWaiter就是將一系列的Condition.await 串聯(lián)起來組成一個FIFO的隊列。
二、線程何時阻塞和釋放
阻塞:await()方法中,在線程釋放鎖資源之后,如果節(jié)點不在AQS等待隊列,則阻塞當前線程,如果在等待隊列,則自旋等待嘗試獲取鎖 釋放:signal()后,節(jié)點會從condition隊列移動到AQS等待隊列,則進入正常鎖的獲取流程。
3.4、【signalAll】signalAll源碼分析
【signalAll】方法,喚醒所有在Condition阻塞隊列中的線程
private?void?breakBarrier()?{
????????generation.broken?=?true;
????????count?=?parties;
????????trip.signalAll();//喚醒Condition中等待的線程
????}
public?final?void?signalAll()?{
????????????if?(!isHeldExclusively())
????????????????throw?new?IllegalMonitorStateException();
????????????Node?first?=?firstWaiter;
????????????if?(first?!=?null)
????????????????doSignalAll(first);
?????}
/**?這個方法相當于把Condition隊列中的所有Node全部取出插入到等待隊列中去?*/
private?void?doSignalAll(Node?first)?{
????????????lastWaiter?=?firstWaiter?=?null;
????????????do?{
????????????????Node?next?=?first.nextWaiter;
????????????????first.nextWaiter?=?null;
????????????????transferForSignal(first);
????????????????first?=?next;
????????????}?while?(first?!=?null);
???????}
/**?將節(jié)點從條件隊列傳輸?shù)酵疥犃蠥QS的等待隊列中?*/
final?boolean?transferForSignal(Node?node)?{
????????//核心添加節(jié)點到AQS隊列方法
????????Node?p?=?enq(node);
????????int?ws?=?p.waitStatus;
????????if?(ws?>?0?||?!compareAndSetWaitStatus(p,?ws,?Node.SIGNAL))
????????????LockSupport.unpark(node.thread);
????????return?true;
????}
/**?使用CAS+自旋方式插入節(jié)點到等待隊列,如果隊列為空,則初始化隊列?*/
private?Node?enq(final?Node?node)?{
????????for?(;;)?{
????????????Node?t?=?tail;
????????????if?(t?==?null)?{?//?Must?initialize
????????????????if?(compareAndSetHead(new?Node()))
????????????????????tail?=?head;
????????????}?else?{
????????????????node.prev?=?t;
????????????????if?(compareAndSetTail(t,?node))?{
????????????????????t.next?=?node;
????????????????????return?t;
????????????????}
????????????}
????????}
3.5、【reset】方法源碼分析
最后,我們來看看怎么重置一個柵欄:

將屏障重置為初始狀態(tài)。如果任何一方目前在隔離墻等候,他們將帶著BrokenBarrierException返回。請注意,由于其他原因發(fā)生中斷后的重置可能很復(fù)雜;線程需要以其他方式重新同步,并選擇一種方式執(zhí)行重置。最好是創(chuàng)建一個新的屏障供以后使用
????public?void?reset()?{
????????final?ReentrantLock?lock?=?this.lock;
????????lock.lock();
????????try?{
????????????breakBarrier();???//?break?the?current?generation
????????????nextGeneration();?//?start?a?new?generation
????????}?finally?{
????????????lock.unlock();
????????}
????}
測試reset代碼:


首先,打破柵欄,那意味著所有等待的線程(5個等待的線程)會喚醒,【await 】方法會通過拋出【BrokenBarrierException】異常返回。然后開啟新一代,重置了 count 和 generation,相當于一切歸0了。
4、CyclicBarrier 與 CountDownLatch 的區(qū)別
相同點:
1、都可以實現(xiàn)一組線程在到達某個條件之前進行等待
2、它們內(nèi)部都有一個計數(shù)器,當計數(shù)器的值不斷減為0的時候,所有阻塞的線程都會被喚醒!
不同點:
1、CyclicBarrier 的計數(shù)器是由它自己來控制,而CountDownLatch 的計數(shù)器則是由使用則來控制
2、在CyclicBarrier 中線程調(diào)用 await方法不僅會將自己阻塞,還會將計數(shù)器減1,而在CountDownLatch中線程調(diào)用 await方法只是將自己阻塞而不會減少計數(shù)器的值。
3、另外,CountDownLatch 只能攔截一輪,而CyclicBarrier 可以實現(xiàn)循環(huán)攔截。一般來說CyclicBarrier 可以實現(xiàn) CountDownLatch的功能,而反之不能。
5、總結(jié):
當調(diào)用【cyclicBarrier.await】方法時,最終都會執(zhí)行【dowait】方法,使用了ReentrantLock去上鎖,每次講計數(shù)器count值-1,當計數(shù)器值-1為0的時候,會先執(zhí)行指定任務(wù),調(diào)用Condition的【trip.signalAll()】喚醒所有線程并進入下一代
如果當前計數(shù)器值-1不為0的時候,進入自旋,執(zhí)行Condition的【await()】方法,將當前線程添加到Condition的條件隊列中等待,執(zhí)行【fullyRelease】調(diào)用【tryRelease】將count值-1,再判斷count值是否為0,為0 則會先執(zhí)行指定任務(wù),調(diào)用Condition的【trip.signalAll()】喚醒所有線程并進入下一代,再判斷是否在AQS等待隊列中,如果不在的話就park當前線程進入AQS等待隊列中,否則自旋直到被喚醒在Condition中的等待隊列被signalAll進入AQS等待隊列中獲取鎖
往期推薦

老家浙江東海邊,靠海吃海,目前經(jīng)營一個小品牌,讓普通人吃到最新鮮的海鮮。有興趣可以點擊了解:《浙里有漁,鮮人一步!》???
下方二維碼關(guān)注我

技術(shù)草根,堅持分享?編程,算法,架構(gòu)
