并發(fā)工具類Phaser、Exchanger使用
0x01:Phaser
Phaser?是一個更加復雜和強大的同步輔助類,對 CountDownLatch 與 CyclicBarrier 的全面升級,是一個 java 并發(fā) api 的一個重量級類。
常用api:
arriveAndAwaitAdvance()? ? 每湊齊指定人數就報團執(zhí)行一次,同一個線程可以執(zhí)行多次arriveAndAwaitAdvance(),表示不同階段的報團
arriveAndDeregister()? ? 退出當前團,且當前團規(guī)則人數減1(報完當前團后,不再報下階段的團)
getArrivedParties()? ? 當前團湊足了多少人
getRegisteredParties()? ? 獲取注冊的團規(guī)定人數
arrive()? ? 使getArrivedParties()數量加1,即用一個虛擬線程占據一個線程的位置, 此虛擬線程不阻塞
register()? ? 動態(tài)增加一個團的規(guī)定人數
bulkRegister(int parties)? ? 動態(tài)的增加規(guī)定報團人數,是register()的多次調用版
forceTermination()? ? 取消報團,線程執(zhí)行各自代碼,不再有Phaser阻塞等待情況
getUnarrivedParties()? ? 當前還差多少線程開團,是getArrivedParties()方法的補集
isTerminated()? ? 判斷Phaser對象是否已為銷毀狀態(tài)
使用案例
作CountDownLatch使用
import?java.util.Date;
import?java.util.concurrent.Phaser;
import?java.util.concurrent.TimeUnit;
import?static?java.util.concurrent.ThreadLocalRandom.current;
import?static?java.lang.Thread.currentThread;
/**
?*?將?Phaser當作?CountDownLatch來用
?*?
?**/
public?class?PhaserForCountDownLatch?{
????public?static?void?main(String[]?args)?throws?InterruptedException?{
????????//?定義一個?Phaser?,?并未指定“分片數量?parties”,此時在?Phaser?內部分片的數量?parties?默認為?0?,
????????//?后面可以通過?register()?方法來動態(tài)增加
????????final?Phaser?phaser?=?new?Phaser();
????????//?定義?5?個線程
????????for?(int?i?=?0;?i?5;?i++)?{
????????????new?Thread(()?->?{
????????????????//?調用?Phaser?的?register()?方法使得?phaser?內部的?parties?加一
????????????????phaser.register();
????????????????try?{
????????????????????//?采用隨機休眠的方式模擬線程的運行時間開銷
????????????????????TimeUnit.SECONDS.sleep(current().nextInt(20));
????????????????????//?線程任務結束,執(zhí)行?arrive()
????????????????????/**
?????????????????????*?補充:arrive()?方法類似于 CountDownLatch 的 countdown()
?????????????????????*?方法,代表著“當前線程已經到達屏障”,
?????????????????????*?但是它不需要等待其他的線程也到達屏障。因此該方法“不是阻塞的方法”,執(zhí)行之后會立即返回,
?????????????????????*?同時該方法會返回一個整數類型的數字,代表著已經到達的?Phase(階段)編號
?????????????????????*/
????????????????????phaser.arrive();
????????????????????System.out.println(new?Date()?+?":"?+?currentThread()?+?"?completed?the?work.");
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????},?"T-"?+?i).start();
????????}
????????/**
?????????*?這里讓線程休眠的目的:?為了保證在主線程 register()?之前,所有的子線程都能順利 register ,?否則就會出現
?????????* phaser 只注冊一個 parties ,?并且很快 arrive 的情況。
?????????*/
????????TimeUnit.SECONDS.sleep(current().nextInt(10));
????????//?主線程也調用注冊方法
????????phaser.register();
????????//?主線程也?arrive()?,?但是它要等待下一個階段,等待下一個階段的前提“所有的線程都?arrive?,
????????//?也就是?phaser?內部當前?phase?的?unarrived?數量為?0?”
????????phaser.arriveAndAwaitAdvance();
????????System.out.println(new?Date()?+?":?all?of?sub?task?completed?work.");
????}
}作CyclicBarrier使用
/**
?*?將?Phaser?當作?CyclicBarrier?來使用
?*
?*??phaser.arriveAndAwaitAdvance():?該方法會等待當前?Phaser?中所有的?part(子線程)都完成了
?*??????????????????????????????????任務才能使線程退出阻塞狀態(tài)
?**/
public?class?PhaserForCyclicBarrier
{
???public?static?void?main(String[]?args)?throws?InterruptedException
???{
???????//?定義一個分片?parties?為0?的?Phaser
??????final?Phaser?phaser?=?new?Phaser();
??????for?(int?i?=?0;?i?5;?i++)
??????{
?????????new?Thread(()->
?????????{
????????????//?子線程調用注冊方法
????????????phaser.register();
????????????try
????????????{
???????????????TimeUnit.SECONDS.sleep(current().nextInt(20));
???????????????//?調用?arriveAndAwaitAdvance()?等待所有線程?arrive?然后繼續(xù)前行
???????????????phaser.arriveAndAwaitAdvance();
???????????????System.out.println(new?Date()?+?":"?+?currentThread()?+?"?completed?the?work.");
????????????}
????????????catch?(InterruptedException?e)
????????????{
???????????????e.printStackTrace();
????????????}
?????????}?,?"T-"+i).start();
??????}
??????//?休眠以確保其他子線程順利調用?register()
??????TimeUnit.SECONDS.sleep(10);
??????//?主線程調用?register()
??????phaser.register();
??????phaser.arriveAndAwaitAdvance();
??????System.out.println(new?Date()?+?":?all?of?sub?task?completed?work.");
???}
}0x02:Exchanger
Exchanger是什么?
它提供一個同步點,在這個同步點兩個線程可以交換彼此的數據。這個兩個線程通過exchange方法交換數據,如果第一個線程先執(zhí)行exchange方法,它會一直等待第二個線程也執(zhí)行exchange,當兩個線程都到達同步點時,這兩個線程就可以交換數據,將本線程生產出來的數據傳遞給對方。因此使用Exchanger的中斷時成對的線程使用exchange()方法,當有一對線程到達了同步點,就會進行交換數據,因此該工具類的線程對象是成對的。
線程可以在成對內配對和交換元素的同步點。每個線程在輸入exchange方法時提供一些對象,與合作者線程匹配,并在返回時接收其合作伙伴的對象。交換器可以被視為一個的雙向形式的SynchroniuzedQueue。交換器在諸如遺傳算法和管道設計的應用中可能是有用的。
一個用于兩個工作線程之間交換數據的封裝工具類,簡單說就是一個線程在完成一定事物后想與另一個線程交換數據,則第一個先拿出數據的線程會一直等待第二個線程,直到第二個線程拿著數據到來時才能彼此交換對應數據。
常用方法:
Exchanger 泛型類型,其中V表示可交換的數據類型
V exchanger(V v):等待另一個線程到達此交換點(除非當前線程被中斷),然后將給定的對象傳送該線程,并接收該線程的對象。
V exchanger(V v, long timeout, TimeUnit unit):等待另一個線程到達此交換點(除非當前線程被中斷或超出類指定的等待時間),然后將給定的對象傳送給該線程,并接收該線程的對象。
import?java.util.concurrent.Exchanger;
public?class?ExechangerExample?{
????public?static?void?main(String[]?args)?{
????????Exchanger?exchanger?=?new?Exchanger<>();
????????new?Thread()?{
????????????@Override
????????????public?void?run()?{
????????????????String?data1?=?"data1";
????????????????try?{
????????????????????System.out.println(Thread.currentThread().getName()?+?"交換前的數據:"?+?data1);
????????????????????String?data2?=?exchanger.exchange(data1);
????????????????????System.out.println(Thread.currentThread().getName()?+?"交換后的數據:"?+?data2);
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}
????????}.start();
????????new?Thread()?{
????????????@Override
????????????public?void?run()?{
????????????????String?data2?=?"data2";
????????????????try?{
????????????????????System.out.println(Thread.currentThread().getName()?+?"交換前的數據:"?+?data2);
????????????????????String?data1?=?exchanger.exchange(data2);
????????????????????System.out.println(Thread.currentThread().getName()?+?"交換后的數據:"?+?data1);
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}
????????}.start();
????}
} 喜歡,在看
