<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          還在使用Future輪詢獲取結(jié)果嗎?CompletionService快來(lái)了解下。

          共 16922字,需瀏覽 34分鐘

           ·

          2021-09-07 05:14

          ????關(guān)注后回復(fù) “進(jìn)群” ,拉你進(jìn)程序員交流群????
          作者丨java金融
          來(lái)源丨java金融

          同步入庫(kù)

          二胖拿到任務(wù),三下五除二就把任務(wù)完成了。

           public static void main(String[] args) throws InterruptedException {
                  String mouZhuFlightPrice = getMouZhuFlightPrice();
                  String mouXieFlightPrice = getMouXieFlightPrice();
                  String mouTuanFlightPrice = getMouTuanFlightPrice();
                  saveDb(mouZhuFlightPrice);
                  saveDb(mouXieFlightPrice);
                  saveDb(mouTuanFlightPrice);
              }


              /**
               * 模擬請(qǐng)求某豬網(wǎng)站 爬取機(jī)票信息
               *
               *
               * @return
               * @throws InterruptedException
               */

              public static String getMouZhuFlightPrice() throws InterruptedException {
                  // 模擬請(qǐng)求某豬網(wǎng)站 爬取機(jī)票信息
                  Thread.sleep(10000);
                  return "獲取到某豬網(wǎng)站的機(jī)票信息了";
              }

              /**
               * 模擬請(qǐng)求某攜網(wǎng)站 爬取機(jī)票信息
               *
               * @return
               * @throws InterruptedException
               */

              public static String getMouXieFlightPrice() throws InterruptedException {
                  // 模擬請(qǐng)求某攜網(wǎng)站 爬取機(jī)票信息
                  Thread.sleep(5000);
                  return "獲取到某攜網(wǎng)站的機(jī)票信息了";
              }


              /**
               * 模擬請(qǐng)求團(tuán)網(wǎng)站 爬取機(jī)票信息
               *
               * @return
               * @throws InterruptedException
               */

              public static String getMouTuanFlightPrice() throws InterruptedException {
                  // 模擬請(qǐng)求某團(tuán)網(wǎng)站 爬取機(jī)票信息
                  Thread.sleep(3000);
                  return "獲取到某團(tuán)網(wǎng)站的機(jī)票信息了";
              }

              /**
               * 保存DB
               *
               * @param flightPriceList
               */

              public static void saveDb(String flightPriceList) {
                      // 解析字符串 進(jìn)行異步入庫(kù)
              }

          這次二胖學(xué)乖了,任務(wù)完成了先去找下坐他對(duì)面的技術(shù)大拿(看他那發(fā)際線就知道了)同事“二狗”讓二狗大拿幫忙指點(diǎn)一二,看看代碼是否還能有優(yōu)化的地方。畢竟領(lǐng)導(dǎo)對(duì)代碼的性能、以及代碼的優(yōu)雅是有要求的。領(lǐng)導(dǎo)多次在部門的周會(huì)上提到讓我們多看看“二狗”寫的代碼,學(xué)習(xí)下人家寫代碼的優(yōu)雅、抽象、封裝等等。二狗大概的瞄了下二胖寫的代碼,提出了個(gè)小小的建議“這個(gè)代碼可以采用多線程來(lái)優(yōu)化下哦,你看某豬這個(gè)網(wǎng)站耗時(shí)是拿到結(jié)果需要10s,其他的耗時(shí)都比它短,先有結(jié)果的我們可以先處理的,不需要等到大家都返回了再來(lái)處理的”。

          輪循futureList獲取結(jié)果

          幸好二胖對(duì)多線程了解一點(diǎn)點(diǎn),于是乎采用future的方式來(lái)實(shí)現(xiàn)。二胖使用一個(gè)List來(lái)保存每個(gè)任務(wù)返回的Future,然后去輪詢這些Future,直到每個(gè)Future都已完成。由于需要先完成的任務(wù)需要先執(zhí)行,且不希望出現(xiàn)因?yàn)榕旁谇懊娴娜蝿?wù)阻塞導(dǎo)致后面先完成的任務(wù)的結(jié)果沒(méi)有及時(shí)獲取的情況,所以在調(diào)用get方式時(shí),需要將超時(shí)時(shí)間設(shè)置為0

            public static void main(String[] args) {
                  int taskSize = 3;
                  Future<String> mouZhuFlightPriceFuture = executor.submit(() -> getMouZhuFlightPrice());
                  Future<String> mouXieFlightPriceFuture = executor.submit(() -> getMouXieFlightPrice());
                  Future<String> mouTuanFlightPriceFuture = executor.submit(() -> getMouTuanFlightPrice());
                  List<Future<String>> futureList = new ArrayList<>();
                  futureList.add(mouZhuFlightPriceFuture);
                  futureList.add(mouXieFlightPriceFuture);
                  futureList.add(mouTuanFlightPriceFuture);
                  // 輪詢,獲取完成任務(wù)的返回結(jié)果
                  while (taskSize > 0) {
                      for (Future<String> future : futureList) {
                          String result = null;
                          try {
                              result = future.get(0, TimeUnit.SECONDS);
                          } catch (InterruptedException e) {
                              taskSize--;
                              e.printStackTrace();
                          } catch (ExecutionException e) {
                              taskSize--;
                              e.printStackTrace();
                          } catch (TimeoutException e) {
                              // 超時(shí)異常需要忽略,因?yàn)槲覀冊(cè)O(shè)置了等待時(shí)間為0,只要任務(wù)沒(méi)有完成,就會(huì)報(bào)該異常
                          }
                          // 任務(wù)已經(jīng)完成
                          if (result != null) {
                              System.out.println("result=" + result);
                              // 從future列表中刪除已經(jīng)完成的任務(wù)
                              futureList.remove(future);
                              taskSize--;
                              // 此處必須break,否則會(huì)拋出并發(fā)修改異常。(也可以通過(guò)將futureList聲明為CopyOnWriteArrayList類型解決)
                              break// 進(jìn)行下一次while循環(huán)
                          }
                      }
                  }
              }

          上述代碼有兩個(gè)小細(xì)節(jié)需要注意下:

          • 如采用ArrayList的話futureList刪除之后需要break進(jìn)行下一次while循環(huán),否則會(huì)產(chǎn)生我們意想不到的ConcurrentModificationException異常。具體原因可看下《ArrayList的刪除姿勢(shì)你都掌握了嗎》這個(gè)文章,里面有詳細(xì)的介紹。

          • 在捕獲了InterruptedExceptionExecutionException異常后記得 taskSize--否則就會(huì)發(fā)生死循環(huán)。如果生產(chǎn)發(fā)生了死循環(huán)你懂的,cpu被你打滿,程序假死等。你離被開(kāi)除也不遠(yuǎn)了。

          • 上面輪詢future列表非常的復(fù)雜,而且還有很多異常需要處理,還有很多細(xì)節(jié)需要考慮,還有被開(kāi)除的風(fēng)險(xiǎn)。所以這種方案也被pass了。

          自定義BlockingQueue實(shí)現(xiàn)

          • 上述方案被pass之后,二胖就在思考可以借用哪種數(shù)據(jù)來(lái)實(shí)現(xiàn)下先進(jìn)先出的功能,貌似隊(duì)列可以實(shí)現(xiàn)下這個(gè)功能。所以二胖又寫了一版采用隊(duì)列來(lái)實(shí)現(xiàn)的功能。
            final static ExecutorService executor = new ThreadPoolExecutor(66,
                      0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

              public static void main(String[] args) throws InterruptedException, ExecutionException {
                  Future<String> mouZhuFlightPriceFuture = executor.submit(() -> getMouZhuFlightPrice());
                  Future<String> mouXieFlightPriceFuture = executor.submit(() -> getMouXieFlightPrice());
                  Future<String> mouTuanFlightPriceFuture = executor.submit(() -> getMouTuanFlightPrice());

                  // 創(chuàng)建阻塞隊(duì)列
                  BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(3);
                  executor.execute(() -> run(mouZhuFlightPriceFuture, blockingQueue));
                  executor.execute(() -> run(mouXieFlightPriceFuture, blockingQueue));
                  executor.execute(() -> run(mouTuanFlightPriceFuture, blockingQueue));
                  // 異步保存所有機(jī)票價(jià)格
                  for (int i = 0; i < 3; i++) {
                      String result = blockingQueue.take();
                      System.out.println(result);
                      saveDb(result);
                  }
              }

              private static void run(Future<String> flightPriceFuture, BlockingQueue<String> blockingQueue) {
                  try {
                      blockingQueue.put(flightPriceFuture.get());
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  } catch (ExecutionException e) {
                      e.printStackTrace();
                  }
              }
          • 這次比上個(gè)版本好多了,代碼也簡(jiǎn)潔多了。不過(guò)按理說(shuō)這種需求應(yīng)該是大家經(jīng)常遇到的,應(yīng)該不需要自己來(lái)實(shí)現(xiàn)把,JAVA這么貼心的語(yǔ)言應(yīng)該會(huì)有api可以直接拿來(lái)用吧。

          CompletionService實(shí)現(xiàn)

          • 二胖現(xiàn)在畢竟也是對(duì)代碼的簡(jiǎn)潔性有追求的人了。于是乎二胖去翻翻自己躺在書柜里吃灰的并發(fā)相關(guān)的書籍,看看是否有解決方案。終于皇天不負(fù)有心人在二胖快要放棄的時(shí)候突然發(fā)現(xiàn)了新大陸。 《Java并發(fā)編程實(shí)戰(zhàn)》一書6.3.5節(jié)CompletionService:ExecutorBlockingQueue,有這樣一段話:

          如果向Executor提交了一組計(jì)算任務(wù),并且希望在計(jì)算完成后獲得結(jié)果,那么可以保留與每個(gè)任務(wù)關(guān)聯(lián)的Future,然后反復(fù)使用get方法,同時(shí)將參數(shù)timeout指定為0,從而通過(guò)輪詢來(lái)判斷任務(wù)是否完成。這種方法雖然可行,但卻有些繁瑣。幸運(yùn)的是,還有一種更好的方法:完成服務(wù)CompletionService。

            final static ExecutorService executor = new ThreadPoolExecutor(66,
                      0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

              public static void main(String[] args) throws ExecutionException, InterruptedException {
                  CompletionService completionService = new ExecutorCompletionService(executor);
                  completionService.submit(() -> getMouZhuFlightPrice());
                  completionService.submit(() -> getMouXieFlightPrice());
                  completionService.submit(() -> getMouTuanFlightPrice());
                  for (int i = 0; i < 3; i++) {
                      String result = (String)completionService.take().get();
                      System.out.println(result);
                      saveDb(result);
                  }
              }

          當(dāng)我們使用了CompletionService不用遍歷future列表,也不需要去自定義隊(duì)列了,代碼變得簡(jiǎn)潔了。下面我們就來(lái)分析下CompletionService實(shí)現(xiàn)的原理吧。

          CompletionService 介紹

          • 我們可以先看下JDK源碼中CompletionServicejavadoc說(shuō)明吧
          /**
           * A service that decouples the production of new asynchronous tasks
           * from the consumption of the results of completed tasks.  Producers
           * {@code submit} tasks for execution. Consumers {@code take}
           * completed tasks and process their results in the order they
           * complete.

          大概意思是CompletionService實(shí)現(xiàn)了生產(chǎn)者提交任務(wù)和消費(fèi)者獲取結(jié)果的解耦,生產(chǎn)者和消費(fèi)者都不用關(guān)心任務(wù)的完成順序,由CompletionService來(lái)保證,消費(fèi)者一定是按照任務(wù)完成的先后順序來(lái)獲取執(zhí)行結(jié)果。

          成員變量

          既然需要按照任務(wù)的完成順序獲取結(jié)果,那內(nèi)部應(yīng)該也是通過(guò)隊(duì)列來(lái)實(shí)現(xiàn)的吧。打開(kāi)源碼我們可以看到,里面有三個(gè)成員變量

          public class ExecutorCompletionService<Vimplements CompletionService<V{
           // 執(zhí)行task的線程池,創(chuàng)建CompletionService必須指定;
              private final Executor executor;
              //主要用于創(chuàng)建待執(zhí)行task;
              private final AbstractExecutorService aes;
              //存儲(chǔ)已完成狀態(tài)的task,默認(rèn)是基于鏈表結(jié)構(gòu)的阻塞隊(duì)列LinkedBlockingQueue。     
              private final BlockingQueue<Future<V>> completionQueue;

          任務(wù)提交

          ExecutorCompletionService任務(wù)的提交和執(zhí)行都是委托給Executor來(lái)完成。當(dāng)提交某個(gè)任務(wù)時(shí),該任務(wù)首先將被包裝為一個(gè)QueueingFuture

          public Future<V> submit(Callable<V> task) {
                  if (task == nullthrow new NullPointerException();
                  RunnableFuture<V> f = newTaskFor(task);
                  executor.execute(new QueueingFuture(f));
                  return f;
              }

          任務(wù)完成后何時(shí)進(jìn)入隊(duì)列

          從源碼可以看出,QueueingFutureFutureTask的子類,實(shí)現(xiàn)了done方法,在task執(zhí)行完成之后將當(dāng)前task添加到completionQueue,將返回結(jié)果加入到阻塞隊(duì)列中,加入的順序就是任務(wù)完成的先后順序。done方法的具體調(diào)用在FutureTaskfinishCompletion方法。

          獲取已完成任務(wù)

           public Future<V> take() throws InterruptedException {
                  return completionQueue.take();
              }

              public Future<V> poll() {
                  return completionQueue.poll();
              }

              public Future<V> poll(long timeout, TimeUnit unit)
                      throws InterruptedException 
          {
                  return completionQueue.poll(timeout, unit);
              }

          takepoll都是調(diào)用BlockingQueue提供的方法。

          • take() 獲取任務(wù)阻塞,直到可以拿到任務(wù)為止。
          • poll() 獲取任務(wù)不阻塞,如果沒(méi)有獲取到任務(wù)直接返回null
          • poll(long timeout, TimeUnit unit) 帶超時(shí)時(shí)間等待的獲取任務(wù)方法(一般推薦使用這種

          總結(jié)

          • CompletionService 把線程池 Executor 和阻塞隊(duì)列 BlockingQueue融合在一起,能夠讓批異步任務(wù)的管理更簡(jiǎn)單,將生產(chǎn)者提交任務(wù)和消費(fèi)者獲取結(jié)果的解耦。
          • CompletionService 能夠讓異步任務(wù)的執(zhí)行結(jié)果有序化,先執(zhí)行完的先進(jìn)入阻塞隊(duì)列,利用這個(gè)特性,我們可以輕松實(shí)現(xiàn)后續(xù)處理的有序性,避免無(wú)謂的等待。

          結(jié)束

          • 由于自己才疏學(xué)淺,難免會(huì)有紕漏,假如你發(fā)現(xiàn)了錯(cuò)誤的地方,還望留言給我指出來(lái),我會(huì)對(duì)其加以修正。
          • 如果你覺(jué)得文章還不錯(cuò),你的轉(zhuǎn)發(fā)、分享、贊賞、點(diǎn)贊、留言就是對(duì)我最大的鼓勵(lì)。
          • 感謝您的閱讀,十分歡迎并感謝您的關(guān)注。


          • 參考
            《java并發(fā)編程實(shí)戰(zhàn)》
            https://www.jianshu.com/p/19093422dd57 https://blog.csdn.net/cbjcry/article/details/84222853 https://www.jianshu.com/p/493ae1b107e4

          -End-

          最近有一些小伙伴,讓我?guī)兔φ乙恍?nbsp;面試題 資料,于是我翻遍了收藏的 5T 資料后,匯總整理出來(lái),可以說(shuō)是程序員面試必備!所有資料都整理到網(wǎng)盤了,歡迎下載!

          點(diǎn)擊??卡片,關(guān)注后回復(fù)【面試題】即可獲取

          在看點(diǎn)這里好文分享給更多人↓↓

          瀏覽 33
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  狼友在线观看视频 | a√天堂中文8 | 天天干天天爽天天玩 | 青青干视频 | 亚洲在线三级片 |