<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          “既生 ExecutorService, 何生 CompletionService?”

          共 2435字,需瀏覽 5分鐘

           ·

          2020-08-21 22:20


          ExecutorService VS CompletionService

          假設我們有 4 個任務(A, B, C, D)用來執(zhí)行復雜的計算,每個任務的執(zhí)行時間隨著輸入?yún)?shù)的不同而不同,如果將任務提交到 ExecutorService, 相信你已經(jīng)可以“信手拈來”
          ExecutorService?executorService?=?Executors.newFixedThreadPool(4);
          List?futures?=?new?ArrayList>();
          futures.add(executorService.submit(A));
          futures.add(executorService.submit(B));
          futures.add(executorService.submit(C));
          futures.add(executorService.submit(D));

          //?遍歷?Future?list,通過?get()?方法獲取每個?future?結果
          for?(Future?future:futures)?{
          ????Integer?result?=?future.get();
          ????//?其他業(yè)務邏輯
          }
          先直入主題,用 CompletionService 實現(xiàn)同樣的場景
          ExecutorService?executorService?=?Executors.newFixedThreadPool(4);

          //?ExecutorCompletionService?是?CompletionService?唯一實現(xiàn)類
          CompletionService?executorCompletionService=?new?ExecutorCompletionService<>(executorService?);

          List?futures?=?new?ArrayList>();
          futures.add(executorCompletionService.submit(A));
          futures.add(executorCompletionService.submit(B));
          futures.add(executorCompletionService.submit(C));
          futures.add(executorCompletionService.submit(D));

          //?遍歷?Future?list,通過?get()?方法獲取每個?future?結果
          for?(int?i=0;?i????Integer?result?=?executorCompletionService.take().get();
          ????//?其他業(yè)務邏輯
          }
          兩種方式在代碼實現(xiàn)上幾乎一毛一樣,我們曾經(jīng)說過 JDK 中不會重復造輪子,如果要造一個新輪子,必定是原有的輪子在某些場景的使用上有致命缺陷
          既然新輪子出來了,二者到底有啥不同呢?
          如果 Future 結果沒有完成,調(diào)用 get() 方法,程序會阻塞在那里,直至獲取返回結果
          先來看第一種實現(xiàn)方式,假設任務 A 由于參數(shù)原因,執(zhí)行時間相對任務 B,C,D 都要長很多,但是按照程序的執(zhí)行順序,程序在 get() 任務 A 的執(zhí)行結果會阻塞在那里,導致任務 B,C,D 的后續(xù)任務沒辦法執(zhí)行。又因為每個任務執(zhí)行時間是不固定的,所以無論怎樣調(diào)整將任務放到 List 的順序,都不合適,這就是致命弊端
          新輪子自然要解決這個問題,它的設計理念就是哪個任務先執(zhí)行完成,get() 方法就會獲取到相應的任務結果,這么做的好處是什么呢?來看個圖你就瞬間理解了
          兩張圖一對比,執(zhí)行時長高下立判了,在當今高并發(fā)的時代,這點時間差,在吞吐量上起到的效果可能不是一點半點了
          那 CompletionService 是怎么做到獲取最先執(zhí)行完的任務結果的呢?

          ?

          遠看CompletionService 輪廓

          如果你使用過消息隊列,你應該秒懂我要說什么了,CompletionService 實現(xiàn)原理很簡單
          就是一個將異步任務的生產(chǎn)和任務完成結果的消費解耦的服務
          用人話解釋一下上面的抽象概念我只能再畫一張圖了
          說白了,哪個任務執(zhí)行的完,就直接將執(zhí)行結果放到隊列中,這樣消費者拿到的結果自然就是最早拿到的那個了
          從上圖中看到,有任務,有結果隊列,那 CompletionService 自然也要圍繞著幾個關鍵字做文章了
          • 既然是異步任務,那自然可能用到 Runnable 或 Callable
          • 既然能獲取到結果,自然也會用到 Future 了
          帶著這些線索,我們走進 CompletionService 源碼看一看

          ?

          近看 CompletionService 源碼

          CompletionService ?是一個接口,它簡單的只有 5 個方法:
          Future?submit(Callable?task);
          Future?submit(Runnable?task,?V?result);
          Future?take()?throws?InterruptedException;
          Future?poll();
          Future?poll(long?timeout,?TimeUnit?unit)?throws?InterruptedException;
          關于 2 個 submit 方法, 我在 不會用Java Future,我懷疑你泡茶沒我快 文章中做了非常詳細的分析以及案例使用說明,這里不再過多贅述
          另外 3 個方法都是從阻塞隊列中獲取并移除阻塞隊列第一個元素,只不過他們的功能略有不同
          • Take: 如果隊列為空,那么調(diào)用 take() 方法的線程會被阻塞
          • Poll: 如果隊列為空,那么調(diào)用 poll() 方法的線程會返回 null
          • Poll-timeout: 以超時的方式獲取并移除阻塞隊列中的第一個元素,如果超時時間到,隊列還是空,那么該方法會返回 null
          所以說,按大類劃分上面5個方法,其實就是兩個功能
          • 提交異步任務 (submit)
          • 從隊列中拿取并移除第一個元素 (take/poll)
          CompletionService 只是接口,ExecutorCompletionService 是該接口的唯一實現(xiàn)類

          ExecutorCompletionService 源碼分析

          先來看一下類結構, 實現(xiàn)類里面并沒有多少內(nèi)容
          ExecutorCompletionService 有兩種構造函數(shù):
          private?final?Executor?executor;
          private?final?AbstractExecutorService?aes;
          private?final?BlockingQueue>?completionQueue;

          public?ExecutorCompletionService(Executor?executor)?{
          ????if?(executor?==?null)
          ????????throw?new?NullPointerException();
          ????this.executor?=?executor;
          ????this.aes?=?(executor?instanceof?AbstractExecutorService)??
          ????????(AbstractExecutorService)?executor?:?null;
          ????this.completionQueue?=?new?LinkedBlockingQueue>();
          }
          public?ExecutorCompletionService(Executor?executor,
          ?????????????????????????????????BlockingQueue>?completionQueue)
          ?
          {
          ????if?(executor?==?null?||?completionQueue?==?null)
          ????????throw?new?NullPointerException();
          ????this.executor?=?executor;
          ????this.aes?=?(executor?instanceof?AbstractExecutorService)??
          ????????(AbstractExecutorService)?executor?:?null;
          ????this.completionQueue?=?completionQueue;
          }
          兩個構造函數(shù)都需要傳入一個 Executor 線程池,因為是處理異步任務的,我們是不被允許手動創(chuàng)建線程的,所以這里要使用線程池也就很好理解了
          另外一個參數(shù)是 BlockingQueue,如果不傳該參數(shù),就會默認隊列為 LinkedBlockingQueue,任務執(zhí)行結果就是加入到這個阻塞隊列中的
          所以要徹底理解 ExecutorCompletionService ,我們只需要知道一個問題的答案就可以了:
          它是如何將異步任務結果放到這個阻塞隊列中的?
          想知道這個問題的答案,那只需要看它提交任務之后都做了些什么?
          public?Future?submit(Callable?task)?{
          ????if?(task?==?null)?throw?new?NullPointerException();
          ????RunnableFuture?f?=?newTaskFor(task);
          ????executor.execute(new?QueueingFuture(f));
          ????return?f;
          }
          我們前面也分析過,execute 是提交 Runnable 類型的任務,本身得不到返回值,但又可以將執(zhí)行結果放到阻塞隊列里面,所以肯定是在 QueueingFuture 里面做了文章
          從上圖中看一看出,QueueingFuture 實現(xiàn)的接口非常多,所以說也就具備了相應的接口能力。
          重中之重是,它繼承了 FutureTask ,F(xiàn)utureTask 重寫了 Runnable 的 run() 方法 (方法細節(jié)分析可以查看FutureTask源碼分析 ) 文中詳細說明了,無論是set() 正常結果,還是setException() 結果,都會調(diào)用 finishCompletion() 方法:
          private?void?finishCompletion()?{
          ????//?assert?state?>?COMPLETING;
          ????for?(WaitNode?q;?(q?=?waiters)?!=?null;)?{
          ????????if?(UNSAFE.compareAndSwapObject(this,?waitersOffset,?q,?null))?{
          ????????????for?(;;)?{
          ????????????????Thread?t?=?q.thread;
          ????????????????if?(t?!=?null)?{
          ????????????????????q.thread?=?null;
          ????????????????????LockSupport.unpark(t);
          ????????????????}
          ????????????????WaitNode?next?=?q.next;
          ????????????????if?(next?==?null)
          ????????????????????break;
          ????????????????q.next?=?null;?//?unlink?to?help?gc
          ????????????????q?=?next;
          ????????????}
          ????????????break;
          ????????}
          ????}

          ???//?重點?重點?重點
          ????done();

          ????callable?=?null;????????//?to?reduce?footprint
          }
          上述方法會執(zhí)行 done() 方法,而 QueueingFuture 恰巧重寫了 FutureTask 的 done() 方法:
          方法實現(xiàn)很簡單,就是將 task 放到阻塞隊列中
          protected?void?done()?{?
          ??completionQueue.add(task);?
          }
          執(zhí)行到此的 task 已經(jīng)是前序步驟 set 過結果的 task,所以就可以通過消費阻塞隊列獲取相應的結果了
          相信到這里,CompletionService 在你面前應該沒什么秘密可言了

          ?

          CompletionService 的主要用途

          在 JDK docs 上明確給了兩個例子來說明 CompletionService 的用途:
          假設你有一組針對某個問題的solvers,每個都返回一個類型為Result的值,并且想要并發(fā)地運行它們,處理每個返回一個非空值的結果,在某些方法使用(Result r)
          其實就是文中開頭的使用方式
          ?void?solve(Executor?e,
          ????????????Collection>?solvers)

          ?????throws?InterruptedException,?ExecutionException?
          {
          ?????CompletionService?ecs
          ?????????=?new?ExecutorCompletionService(e);
          ?????for?(Callable?s?:?solvers)
          ?????????ecs.submit(s);
          ?????int?n?=?solvers.size();
          ?????for?(int?i?=?0;?i??????????Result?r?=?ecs.take().get();
          ?????????if?(r?!=?null)
          ?????????????use(r);
          ?????}
          ?}
          假設你想使用任務集的第一個非空結果,忽略任何遇到異常的任務,并在第一個任務準備好時取消所有其他任務
          void?solve(Executor?e,
          ????????????Collection>?solvers)

          ?????throws?InterruptedException?
          {
          ?????CompletionService?ecs
          ?????????=?new?ExecutorCompletionService(e);
          ?????int?n?=?solvers.size();
          ?????List>?futures
          ?????????=?new?ArrayList>(n);
          ?????Result?result?=?null;
          ?????try?{
          ?????????for?(Callable?s?:?solvers)
          ?????????????futures.add(ecs.submit(s));
          ?????????for?(int?i?=?0;?i??????????????try?{
          ?????????????????Result?r?=?ecs.take().get();
          ?????????????????if?(r?!=?null)?{
          ?????????????????????result?=?r;
          ?????????????????????break;
          ?????????????????}
          ?????????????}?catch?(ExecutionException?ignore)?{}
          ?????????}
          ?????}
          ?????finally?{
          ?????????for?(Future?f?:?futures)
          ????????????//?注意這里的參數(shù)給的是?true,詳解同樣在前序?Future?源碼分析文章中
          ?????????????f.cancel(true);
          ?????}

          ?????if?(result?!=?null)
          ?????????use(result);
          ?}
          這兩種方式都是非常經(jīng)典的 CompletionService 使用 范式 ,請大家仔細品味每一行代碼的用意
          范式?jīng)]有說明 Executor 的使用,使用 ExecutorCompletionService,需要自己創(chuàng)建線程池,看上去雖然有些麻煩,但好處是你可以讓多個 ExecutorCompletionService 的線程池隔離,這種隔離性能避免幾個特別耗時的任務拖垮整個應用的風險 (這也是我們反復說過多次的,不要所有業(yè)務共用一個線程池

          ?

          總結

          CompletionService 的應用場景還是非常多的,比如
          • Dubbo 中的 Forking Cluster
          • 多倉庫文件/鏡像下載(從最近的服務中心下載后終止其他下載過程)
          • 多服務調(diào)用(天氣預報服務,最先獲取到的結果)
          CompletionService 不但能滿足獲取最快結果,還能起到一定 "load balancer" 作用,獲取可用服務的結果,使用也非常簡單, 只需要遵循范式即可
          并發(fā)系列 講了這么多,分析源碼的過程也碰到各種隊列,接下來我們就看看那些讓人眼花繚亂的隊列?

          ?

          靈魂追問

          1. 通常處理結果還會用異步方式進行處理,如果采用這種方式,有哪些注意事項?
          2. 如果是你,你會選擇使用無界隊列嗎?為什么?

          有道無術,術可成;有術無道,止于術

          歡迎大家關注Java之道公眾號


          好文章,我在看??

          瀏覽 212
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日本偷拍自拍大香蕉 | 欧美啊国产| 欧美va在线 | 看毛片网站 | 亚洲综合免费观看高清完整版在线观 |