<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          并發(fā)工具類Phaser、Exchanger使用

          共 5657字,需瀏覽 12分鐘

           ·

          2021-02-03 07:31


          0x01:Phaser

          Phaser?是一個更加復雜和強大的同步輔助類,對 CountDownLatch 與 CyclicBarrier 的全面升級,是一個 java 并發(fā) api 的一個重量級類。

          常用api:

          • arriveAndAwaitAdvance()? ? 每湊齊指定人數就報團執(zhí)行一次,同一個線程可以執(zhí)行多次arriveAndAwaitAdvance(),表示不同階段的報團

          • arriveAndDeregister()? ? 退出當前團,且當前團規(guī)則人數減1(報完當前團后,不再報下階段的團)

          • getArrivedParties()? ? 當前團湊足了多少人

          • getRegisteredParties()? ? 獲取注冊的團規(guī)定人數

          • arrive()? ? 使getArrivedParties()數量加1,即用一個虛擬線程占據一個線程的位置, 此虛擬線程不阻塞

          • register()? ? 動態(tài)增加一個團的規(guī)定人數

          • bulkRegister(int parties)? ? 動態(tài)的增加規(guī)定報團人數,是register()的多次調用版

          • forceTermination()? ? 取消報團,線程執(zhí)行各自代碼,不再有Phaser阻塞等待情況

          • getUnarrivedParties()? ? 當前還差多少線程開團,是getArrivedParties()方法的補集

          • isTerminated()? ? 判斷Phaser對象是否已為銷毀狀態(tài)

          使用案例

          • 作CountDownLatch使用

          import?java.util.Date;
          import?java.util.concurrent.Phaser;
          import?java.util.concurrent.TimeUnit;

          import?static?java.util.concurrent.ThreadLocalRandom.current;
          import?static?java.lang.Thread.currentThread;

          /**
          ?*?將?Phaser當作?CountDownLatch來用
          ?*?
          ?**/

          public?class?PhaserForCountDownLatch?{
          ????public?static?void?main(String[]?args)?throws?InterruptedException?{
          ????????//?定義一個?Phaser?,?并未指定“分片數量?parties”,此時在?Phaser?內部分片的數量?parties?默認為?0?,
          ????????//?后面可以通過?register()?方法來動態(tài)增加
          ????????final?Phaser?phaser?=?new?Phaser();
          ????????//?定義?5?個線程
          ????????for?(int?i?=?0;?i?5
          ;?i++)?{
          ????????????new?Thread(()?->?{
          ????????????????//?調用?Phaser?的?register()?方法使得?phaser?內部的?parties?加一
          ????????????????phaser.register();
          ????????????????try?{
          ????????????????????//?采用隨機休眠的方式模擬線程的運行時間開銷
          ????????????????????TimeUnit.SECONDS.sleep(current().nextInt(20));

          ????????????????????//?線程任務結束,執(zhí)行?arrive()
          ????????????????????/**
          ?????????????????????*?補充:arrive()?方法類似于 CountDownLatch 的 countdown()
          ?????????????????????*?方法,代表著“當前線程已經到達屏障”,
          ?????????????????????*?但是它不需要等待其他的線程也到達屏障。因此該方法“不是阻塞的方法”,執(zhí)行之后會立即返回,
          ?????????????????????*?同時該方法會返回一個整數類型的數字,代表著已經到達的?Phase(階段)編號
          ?????????????????????*/

          ????????????????????phaser.arrive();
          ????????????????????System.out.println(new?Date()?+?":"?+?currentThread()?+?"?completed?the?work.");
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}
          ????????????},?"T-"?+?i).start();
          ????????}
          ????????/**
          ?????????*?這里讓線程休眠的目的:?為了保證在主線程 register()?之前,所有的子線程都能順利 register ,?否則就會出現
          ?????????* phaser 只注冊一個 parties ,?并且很快 arrive 的情況。
          ?????????*/

          ????????TimeUnit.SECONDS.sleep(current().nextInt(10));
          ????????//?主線程也調用注冊方法
          ????????phaser.register();

          ????????//?主線程也?arrive()?,?但是它要等待下一個階段,等待下一個階段的前提“所有的線程都?arrive?,
          ????????//?也就是?phaser?內部當前?phase?的?unarrived?數量為?0?”
          ????????phaser.arriveAndAwaitAdvance();
          ????????System.out.println(new?Date()?+?":?all?of?sub?task?completed?work.");
          ????}
          }
          • 作CyclicBarrier使用

          /**
          ?*?將?Phaser?當作?CyclicBarrier?來使用
          ?*
          ?*??phaser.arriveAndAwaitAdvance():?該方法會等待當前?Phaser?中所有的?part(子線程)都完成了
          ?*??????????????????????????????????任務才能使線程退出阻塞狀態(tài)

          ?**/

          public?class?PhaserForCyclicBarrier
          {

          ???public?static?void?main(String[]?args)?throws?InterruptedException
          ???
          {
          ???????//?定義一個分片?parties?為0?的?Phaser
          ??????final?Phaser?phaser?=?new?Phaser();
          ??????for?(int?i?=?0;?i?5
          ;?i++)
          ??????{
          ?????????new?Thread(()->
          ?????????{
          ????????????//?子線程調用注冊方法
          ????????????phaser.register();
          ????????????try
          ????????????{
          ???????????????TimeUnit.SECONDS.sleep(current().nextInt(20));
          ???????????????//?調用?arriveAndAwaitAdvance()?等待所有線程?arrive?然后繼續(xù)前行
          ???????????????phaser.arriveAndAwaitAdvance();
          ???????????????System.out.println(new?Date()?+?":"?+?currentThread()?+?"?completed?the?work.");

          ????????????}
          ????????????catch?(InterruptedException?e)
          ????????????{
          ???????????????e.printStackTrace();
          ????????????}
          ?????????}?,?"T-"+i).start();
          ??????}
          ??????//?休眠以確保其他子線程順利調用?register()
          ??????TimeUnit.SECONDS.sleep(10);
          ??????//?主線程調用?register()
          ??????phaser.register();
          ??????phaser.arriveAndAwaitAdvance();
          ??????System.out.println(new?Date()?+?":?all?of?sub?task?completed?work.");
          ???}
          }


          0x02:Exchanger

          • Exchanger是什么?

          它提供一個同步點,在這個同步點兩個線程可以交換彼此的數據。這個兩個線程通過exchange方法交換數據,如果第一個線程先執(zhí)行exchange方法,它會一直等待第二個線程也執(zhí)行exchange,當兩個線程都到達同步點時,這兩個線程就可以交換數據,將本線程生產出來的數據傳遞給對方。因此使用Exchanger的中斷時成對的線程使用exchange()方法,當有一對線程到達了同步點,就會進行交換數據,因此該工具類的線程對象是成對的。

          線程可以在成對內配對和交換元素的同步點。每個線程在輸入exchange方法時提供一些對象,與合作者線程匹配,并在返回時接收其合作伙伴的對象。交換器可以被視為一個的雙向形式的SynchroniuzedQueue。交換器在諸如遺傳算法和管道設計的應用中可能是有用的。

          一個用于兩個工作線程之間交換數據的封裝工具類,簡單說就是一個線程在完成一定事物后想與另一個線程交換數據,則第一個先拿出數據的線程會一直等待第二個線程,直到第二個線程拿著數據到來時才能彼此交換對應數據。

          常用方法:

          • Exchanger 泛型類型,其中V表示可交換的數據類型

          • V exchanger(V v):等待另一個線程到達此交換點(除非當前線程被中斷),然后將給定的對象傳送該線程,并接收該線程的對象。

          • V exchanger(V v, long timeout, TimeUnit unit):等待另一個線程到達此交換點(除非當前線程被中斷或超出類指定的等待時間),然后將給定的對象傳送給該線程,并接收該線程的對象。


          import?java.util.concurrent.Exchanger;

          public?class?ExechangerExample?{

          ????public?static?void?main(String[]?args)?{

          ????????Exchanger?exchanger?=?new?Exchanger<>();
          ????????new?Thread()?{
          ????????????@Override
          ????????????public?void?run()?
          {
          ????????????????String?data1?=?"data1";
          ????????????????try?{
          ????????????????????System.out.println(Thread.currentThread().getName()?+?"交換前的數據:"?+?data1);
          ????????????????????String?data2?=?exchanger.exchange(data1);
          ????????????????????System.out.println(Thread.currentThread().getName()?+?"交換后的數據:"?+?data2);
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}
          ????????????}
          ????????}.start();
          ????????new?Thread()?{
          ????????????@Override
          ????????????public?void?run()?
          {
          ????????????????String?data2?=?"data2";
          ????????????????try?{
          ????????????????????System.out.println(Thread.currentThread().getName()?+?"交換前的數據:"?+?data2);
          ????????????????????String?data1?=?exchanger.exchange(data2);
          ????????????????????System.out.println(Thread.currentThread().getName()?+?"交換后的數據:"?+?data1);
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}
          ????????????}
          ????????}.start();
          ????}
          }

          喜歡,在看


          瀏覽 44
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  欧美大黑吊插老女人肛门 | 欧美婬乱片A片AAA毛片地址 | 韩国免费一区二区三区 | 青娱乐亚洲领先精品 | 亚洲日本香蕉 |