“既生 ExecutorService, 何生 CompletionService?”

ExecutorService VS CompletionService
ExecutorService?executorService?=?Executors.newFixedThreadPool(4);
List?futures?=?new?ArrayList >();
futures.add(executorService.submit(A));
futures.add(executorService.submit(B));
futures.add(executorService.submit(C));
futures.add(executorService.submit(D));
//?遍歷?Future?list,通過?get()?方法獲取每個?future?結果
for?(Future?future:futures)?{
????Integer?result?=?future.get();
????//?其他業(yè)務邏輯
}
ExecutorService?executorService?=?Executors.newFixedThreadPool(4);
//?ExecutorCompletionService?是?CompletionService?唯一實現(xiàn)類
CompletionService?executorCompletionService=?new?ExecutorCompletionService<>(executorService?);
List?futures?=?new?ArrayList >();
futures.add(executorCompletionService.submit(A));
futures.add(executorCompletionService.submit(B));
futures.add(executorCompletionService.submit(C));
futures.add(executorCompletionService.submit(D));
//?遍歷?Future?list,通過?get()?方法獲取每個?future?結果
for?(int?i=0;?i????Integer?result?=?executorCompletionService.take().get();
????//?其他業(yè)務邏輯
}

如果 Future 結果沒有完成,調(diào)用 get() 方法,程序會阻塞在那里,直至獲取返回結果


那 CompletionService 是怎么做到獲取最先執(zhí)行完的任務結果的呢?

?
遠看CompletionService 輪廓
就是一個將異步任務的生產(chǎn)和任務完成結果的消費解耦的服務

CompletionService 自然也要圍繞著幾個關鍵字做文章了既然是異步任務,那自然可能用到 Runnable 或 Callable 既然能獲取到結果,自然也會用到 Future 了
?
近看 CompletionService 源碼
CompletionService ?是一個接口,它簡單的只有 5 個方法:Future ?submit(Callable ;?task)
Future?submit(Runnable?task,?V?result) ;
Future?take()?throws?InterruptedException ;
Future?poll() ;
Future?poll(long?timeout,?TimeUnit?unit)?throws?InterruptedException ;

Take: 如果隊列為空,那么調(diào)用 take() 方法的線程會被阻塞 Poll: 如果隊列為空,那么調(diào)用 poll() 方法的線程會返回 null Poll-timeout: 以超時的方式獲取并移除阻塞隊列中的第一個元素,如果超時時間到,隊列還是空,那么該方法會返回 null
提交異步任務 (submit) 從隊列中拿取并移除第一個元素 (take/poll)
CompletionService 只是接口,ExecutorCompletionService 是該接口的唯一實現(xiàn)類ExecutorCompletionService 源碼分析

ExecutorCompletionService 有兩種構造函數(shù):private?final?Executor?executor;
private?final?AbstractExecutorService?aes;
private?final?BlockingQueue>?completionQueue;
public?ExecutorCompletionService(Executor?executor)?{
????if?(executor?==?null)
????????throw?new?NullPointerException();
????this.executor?=?executor;
????this.aes?=?(executor?instanceof?AbstractExecutorService)??
????????(AbstractExecutorService)?executor?:?null;
????this.completionQueue?=?new?LinkedBlockingQueue>();
}
public?ExecutorCompletionService(Executor?executor,
?????????????????????????????????BlockingQueue>?completionQueue) ?{
????if?(executor?==?null?||?completionQueue?==?null)
????????throw?new?NullPointerException();
????this.executor?=?executor;
????this.aes?=?(executor?instanceof?AbstractExecutorService)??
????????(AbstractExecutorService)?executor?:?null;
????this.completionQueue?=?completionQueue;
}
LinkedBlockingQueue,任務執(zhí)行結果就是加入到這個阻塞隊列中的ExecutorCompletionService ,我們只需要知道一個問題的答案就可以了:它是如何將異步任務結果放到這個阻塞隊列中的?
public?Future ?submit(Callable {?task) ?
????if?(task?==?null)?throw?new?NullPointerException();
????RunnableFuture?f?=?newTaskFor(task);
????executor.execute(new?QueueingFuture(f));
????return?f;
}

finishCompletion() 方法:private?void?finishCompletion()?{
????//?assert?state?>?COMPLETING;
????for?(WaitNode?q;?(q?=?waiters)?!=?null;)?{
????????if?(UNSAFE.compareAndSwapObject(this,?waitersOffset,?q,?null))?{
????????????for?(;;)?{
????????????????Thread?t?=?q.thread;
????????????????if?(t?!=?null)?{
????????????????????q.thread?=?null;
????????????????????LockSupport.unpark(t);
????????????????}
????????????????WaitNode?next?=?q.next;
????????????????if?(next?==?null)
????????????????????break;
????????????????q.next?=?null;?//?unlink?to?help?gc
????????????????q?=?next;
????????????}
????????????break;
????????}
????}
???//?重點?重點?重點
????done();
????callable?=?null;????????//?to?reduce?footprint
}

protected?void?done()?{?
??completionQueue.add(task);?
}
?
CompletionService 的主要用途
假設你有一組針對某個問題的solvers,每個都返回一個類型為Result的值,并且想要并發(fā)地運行它們,處理每個返回一個非空值的結果,在某些方法使用(Result r)
?void?solve(Executor?e,
????????????Collection>?solvers)
?????throws?InterruptedException,?ExecutionException?{
?????CompletionService?ecs
?????????=?new?ExecutorCompletionService(e);
?????for?(Callable?s?:?solvers)
?????????ecs.submit(s);
?????int?n?=?solvers.size();
?????for?(int?i?=?0;?i??????????Result?r?=?ecs.take().get();
?????????if?(r?!=?null)
?????????????use(r);
?????}
?}
假設你想使用任務集的第一個非空結果,忽略任何遇到異常的任務,并在第一個任務準備好時取消所有其他任務
void?solve(Executor?e,
????????????Collection>?solvers)
?????throws?InterruptedException?{
?????CompletionService?ecs
?????????=?new?ExecutorCompletionService(e);
?????int?n?=?solvers.size();
?????List>?futures
?????????=?new?ArrayList>(n);
?????Result?result?=?null;
?????try?{
?????????for?(Callable?s?:?solvers)
?????????????futures.add(ecs.submit(s));
?????????for?(int?i?=?0;?i??????????????try?{
?????????????????Result?r?=?ecs.take().get();
?????????????????if?(r?!=?null)?{
?????????????????????result?=?r;
?????????????????????break;
?????????????????}
?????????????}?catch?(ExecutionException?ignore)?{}
?????????}
?????}
?????finally?{
?????????for?(Future?f?:?futures)
????????????//?注意這里的參數(shù)給的是?true,詳解同樣在前序?Future?源碼分析文章中
?????????????f.cancel(true);
?????}
?????if?(result?!=?null)
?????????use(result);
?}
?
總結
Dubbo 中的 Forking Cluster 多倉庫文件/鏡像下載(從最近的服務中心下載后終止其他下載過程) 多服務調(diào)用(天氣預報服務,最先獲取到的結果)
?
靈魂追問
通常處理結果還會用異步方式進行處理,如果采用這種方式,有哪些注意事項? 如果是你,你會選擇使用無界隊列嗎?為什么?
有道無術,術可成;有術無道,止于術
歡迎大家關注Java之道公眾號
好文章,我在看??
評論
圖片
表情
