<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          請按到場順序發(fā)言之CompletionService詳解!

          共 10095字,需瀏覽 21分鐘

           ·

          2022-06-15 14:33

          你知道的越多,不知道的就越多,業(yè)余的像一棵小草!

          你來,我們一起精進!你不來,我和你的競爭對手一起精進!

          編輯:業(yè)余草

          static.kancloud.cn/alex_wsc/java_concurrent_programming/1874829

          推薦:https://www.xttblog.com/?p=5347

          請按到場順序發(fā)言之CompletionService詳解!

          ?

          時間像海綿里的水,只要你愿意擠,總還是有的。

          ——魯迅

          ?
          CompletionService

          講解 CompletionService 之前,我們先回憶一下 ExcutorSevice。ExcutorService 實現(xiàn)了通過線程池來并發(fā)執(zhí)行任務(wù)。其中有一種方式是通過線程池執(zhí)行 Callable 任務(wù),然后通過 Future 獲取異步執(zhí)行的結(jié)果,如下面的代碼:

          public static void main(String[] args) throws ExecutionException, InterruptedException {
              ExecutorService executor = Executors.newFixedThreadPool(5);

              Callable callable1 = () -> {
                  Thread.sleep(10000);
                  return "任務(wù)1完成";
              };

              Callable callable2 = () -> {
                  Thread.sleep(5000);
                  return "任務(wù)2完成";
              };

              Future future1 = executor.submit(callable1);
              Future future2 = executor.submit(callable2);

              System.out.println(future1.get());
              System.out.println(future2.get());
          }

          任務(wù)一執(zhí)行需要 10 秒,任務(wù)二執(zhí)行只需要 5 秒。但是當執(zhí)行到 future1.get () 時,主線程會被阻塞。等待 10 秒后第一個任務(wù)執(zhí)行完才會去獲取第二個任務(wù)。然后執(zhí)行和第二個任務(wù)相關(guān)的打印操作。大家有沒有看出問題?任務(wù) 2 明明在 5 秒前就已經(jīng)執(zhí)行完成,卻不能立刻打印。主線程阻塞在任務(wù)一結(jié)果的獲取上。這樣程序執(zhí)行的效率并不高。如果任務(wù)完成后能夠立刻被取得執(zhí)行結(jié)果,然后執(zhí)行后面的邏輯,效率就會有顯著的提升。今天我們要講解的 CompletionService 就是用來做這個事情的。CompletionService 可以按照執(zhí)行完成結(jié)果的到場順序,被主線程獲取到,從而繼續(xù)執(zhí)行后面邏輯。

          了解 CompletionService

          了解一個類最好、最快的方法就是閱讀源代碼的注解。而大多數(shù)人通常的做法卻是去百度或者 google。這樣有兩個弊端,一是效率并不一定高,可能搜出來很多無用的內(nèi)容。二是看到的文章并不權(quán)威,甚至可能是錯的。有的同學可能覺得英文閱讀費勁,其實作為開發(fā)人員,英語閱讀已經(jīng)是必備技能。這就如同你要熟知 IDE 的快捷鍵一樣,所以如果覺得英文閱讀困難,可以刻意練習。其實多讀一些技術(shù)文檔,會發(fā)現(xiàn)用詞基本都是類似的。

          扯的有點遠,我們收回來,先看看源代碼中對 CompletionService 的解釋:

          ?

          對異步任務(wù)執(zhí)行和執(zhí)行結(jié)果消費解藕。生產(chǎn)者提交任務(wù)執(zhí)行。消費者則獲取完成的任務(wù),然后按照完成任務(wù)的順序?qū)θ蝿?wù)結(jié)果進行處理。

          ?

          官方的解釋是不是十分簡潔明了?

          使用 CompletionService

          下面我們使用 CompletionService 實現(xiàn)一個吃蘋果的程序。首先我聲明一個流,里面是一些水果,每個水果會對應(yīng)一個洗干凈的任務(wù)。然后主線程拿到洗干凈的水果再一個個吃掉。代碼如下:

          public static void main(String[] args) throws InterruptedException, ExecutionException {
              ExecutorService pool = Executors.newFixedThreadPool(5);
              CompletionService<String> service = new ExecutorCompletionService<String>(pool);
              
            Stream.of("蘋果""梨""葡萄""桃")
                      .forEach(fruit -> service.submit(() -> {
                                  if(fruit.equals("蘋果")){
                                      TimeUnit.SECONDS.sleep(6);
                                  }else if(fruit.equals("梨")){
                                      TimeUnit.SECONDS.sleep(1);
                                  }else if(fruit.equals("葡萄")){
                                      TimeUnit.SECONDS.sleep(10);
                                  }else if(fruit.equals("桃")){
                                      TimeUnit.SECONDS.sleep(3);
                                  }
                                  return "洗干凈的"+fruit;
                              })
                      );

              String result;
              while((result=service.take().get())!=null){
                  System.out.println("吃掉"+result);
              }
          }

          可以看到有四種水果。會為每個水果啟一個洗水果的任務(wù)。每種水果洗的時間不同,其中葡萄最不好洗要 10 秒,而梨最好洗,只需要 1 秒。等待水果洗好后,主線程通過 service.take () 取得執(zhí)行完成的 Future,然后從里面 get 出返回值,把洗干凈的水果吃掉。

          我們可以看到輸出如下:

          吃掉洗干凈的梨
          吃掉洗干凈的桃
          吃掉洗干凈的蘋果
          吃掉洗干凈的葡萄

          可以看到哪個水果先洗干凈就會先被吃掉。這也證明了 service.take () 的順序是任務(wù)的完成順序,而不是任務(wù)提交的順序。

          通過 CompletionService 我們就可以一端生產(chǎn),另一端按照完成的順序進行消費。這避免提交大量任務(wù)時,不知道哪個任務(wù)先完成,從而在調(diào)用 Future 的 get 方法時產(chǎn)生阻塞。使用 CompletionService,永遠都是完成一個返回一個,然后消費一個。這樣你的程序才更為高效。

          主線程收到返回后,可以再繼續(xù)使用 CompletionService 來異步執(zhí)行下一步的邏輯,這和非阻塞的編程方式異曲同工。

          CompletionService 源碼分析

          CompletionService 構(gòu)造方法

          我們先看如何初始化 CompletionService:

          ExecutorService pool = Executors.newFixedThreadPool(5);
          CompletionService<String> service = new ExecutorCompletionService<String>(pool);

          首先初始化 ExecutorService,在構(gòu)造 ExecutorCompletionService 時作為參數(shù)傳入。其實 CompletionService 對任務(wù)的執(zhí)行其實就是借助于 ExecutorService 來完成的。接下來我們進入它的構(gòu)造函數(shù):

          public ExecutorCompletionService(Executor executor) {
              if (executor == null)
                  throw new NullPointerException();
              this.executor = executor;
              this.aes = (executor instanceof AbstractExecutorService) ?
                  (AbstractExecutorService) executor : null;
              this.completionQueue = new LinkedBlockingQueue<Future<V>>();
          }

          構(gòu)造函數(shù)中構(gòu)造了三個屬性:

          private final Executor executor;
          private final AbstractExecutorService aes;
          private final BlockingQueue<Future<V>> completionQueue;

          executor 就是你傳入的 ExecutorService,用來執(zhí)行任務(wù)。

          aes 的作用是創(chuàng)建新的 task。它的初始化過程比較有意思,判斷了是否為 AbstractExecutorService 的實例。至于為什么這么做,我們后面再詳細講解。

          completionQueue 是一個存放 Future 的阻塞隊列,并且是無界的。這意味著如果源頭不斷的產(chǎn)生 Future,但是沒有去消費,就會造成內(nèi)存泄漏。

          executor 執(zhí)行完成的 Future 會被放入 completionQueue 中,take 方法將會從

          completionQueue 中取得最新的 future 對象(最近執(zhí)行完的 task 的結(jié)果)。

          CompletionService 的 submit 方法

          public Future<V> submit(Callable<V> task) {
              if (task == nullthrow new NullPointerException();
              RunnableFuture<V> f = newTaskFor(task);
              executor.execute(new QueueingFuture(f));
              return f;
          }

          首先將 Callable 類型的 task 轉(zhuǎn)為 RunnableFuture 類型。RunnableFuture 是個接口,F(xiàn)utureTask 是其一種實現(xiàn)。

          然后通過 new QueueingFuture (f),再將 RunnableFuture 包裝為 QueueingFuture 類型的對象。QueueingFuture 的作用就是在 Future 完成時,加入到 completionQueue 中。

          我們先看 newTaskFor 的源碼:

          private RunnableFuture<V> newTaskFor(Callable<V> task) {
              if (aes == null)
                  return new FutureTask<V>(task);
              else
                  return aes.newTaskFor(task);
          }

          如果 aes 為空,那么直接 new FutureTask。如果不為空則調(diào)用 aes 的 newTaskFor 方法。什么情況 aes 會為空呢?我們再看下 aes 初始化的代碼:

          this.aes = (executor instanceof AbstractExecutorService) ?
          (AbstractExecutorService) executor : null;

          當傳入的 executor 為 AbstractExecutorService 類型時,那么 aes 不為空。否則 aes 為空。這兩處邏輯處理是相關(guān)聯(lián)的,這么做的原因如下:

          1. 如果 executor 是 AbstractExecutorService 的子類,有可能會重寫 newTaskFor 方法,所以這里優(yōu)先使用 executo r 的方法來創(chuàng)建 Task,這樣后面通過 executor 執(zhí)行 task 才能正確。比如 ForkJoinPool 就對 newTaskFor 方法進行了重寫;
          2. 如果 executor 不是繼承自 AbstractExecutorService。那么它可能并沒有 newTaskFor 方法。所以需要 CompletionService 自己來創(chuàng)建 FutureTask。

          這樣看來 aes 的存在,只是為了盡量使用 executor 提供的 newTaskFor 方法來創(chuàng)建 task,以使后面 excute 方法能夠正常運行。

          接下來我們分析 QueueingFuture 方法:

          private class QueueingFuture extends FutureTask<Void{
              QueueingFuture(RunnableFuture<V> task) {
                  super(task, null);
                  this.task = task;
              }
              protected void done() { completionQueue.add(task); }
              private final Future<V> task;
          }

          QueueingFuture 是內(nèi)部靜態(tài)類,并且是 FutureTask 的子類。他只是重寫了 done 方法。大家回憶上一節(jié)對 Future 的分析,應(yīng)該還記得 done 方法在任務(wù)執(zhí)行結(jié)果返回后被調(diào)用,但是留給子類來實。這里就用上了這個特性。done 方法里面做的就是把 task 加入阻塞隊列中。這意味著,先完成的 task 會先把自己的 Future 放入隊列中。那么當然也會被 take 方法先取到。而由于是阻塞隊列,所以 take 方法取不到 task 時,就會阻塞。但由于能被 take 到的 task 肯定已經(jīng)有了返回值,所以調(diào)用 task 的 get 方法時就不會再次阻塞了。也就是說 client 代碼中的下面一行只會在 take 時發(fā)生阻塞:

          while((result=service.take().get())!=null){
                  System.out.println("吃掉"+result);
          }

          executor 執(zhí)行任務(wù)的代碼就不用再次分析了,這在之前學習 Executor 的時候已經(jīng)詳細分析過了。submit 方法分析完后我們再來看看 take 方法。

          CompletionService 的 take 方法

          相比較 submit 方法,take 方法就更為簡單了,如下:

          public Future<V> take() throws InterruptedException {
              return completionQueue.take();
          }

          只有一行代碼,就是從 completionQueue 中取得 Futrue 對象。由于 completionQueue 是阻塞隊列,當沒有 Future 時,就會阻塞在此。而 completionQueue 中保存 Future 的順序是完成順序。

          總結(jié)

          CompletionService 給我們提供了一種非阻塞的異步執(zhí)行方式。讓程序更為高效。他的實現(xiàn)非常的簡單和巧妙,值得我們借鑒。其實我們學習到這里,不知道你是否有這種體會,這些工具實際上就是我們之前學習內(nèi)容的組合運用,如果前面你掌握的很牢固,學習起來一點也不費勁。如果前面就似懂非懂,那么就會越看越糊涂。其實我們在學習上至少有一半的時間都是在打基礎(chǔ),但這個過程必不可少,并且受益更為深遠。

          瀏覽 40
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  羽月希在线播放 | 欧美A片在线免费观看 | 午夜一区 | 日本免费的黄色视频 | 爆操小姐姐 |