CompletionService 介紹
介紹
java.util.concurrent.CompletionService是對(duì) ExecutorService的一個(gè)功能增強(qiáng)封裝,優(yōu)化了獲取異步操作結(jié)果的接口。
使用場(chǎng)景
假設(shè)我們要向線程池提交一批任務(wù),并獲取任務(wù)結(jié)果。一般的方式是提交任務(wù)后,從線程池得到一批 Future對(duì)象集合,然后依次調(diào)用其 get()方法。
這里有個(gè)問(wèn)題:因?yàn)槲覀儠?huì)要按固定的順序來(lái)遍歷 Future元素,而 get()方法又是阻塞的,因此如果某個(gè) Future對(duì)象執(zhí)行時(shí)間太長(zhǎng),會(huì)使得我們的遍歷過(guò)程阻塞在該元素上,無(wú)法及時(shí)從后面早已完成的 Future當(dāng)中取得結(jié)果。
CompletionService解決了這個(gè)問(wèn)題。它本身不包含線程池,創(chuàng)建一個(gè) CompletionService需要先創(chuàng)建一個(gè) Executor。下面是一個(gè)例子:
String result = completionService.take().get();這個(gè) take()方法返回的是最早完成的任務(wù)的結(jié)果,這個(gè)就解決了一個(gè)任務(wù)被另一個(gè)任務(wù)阻塞的問(wèn)題。下面是一個(gè)完整的例子:
示例
final SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); CompletionService<String> csv = new ExecutorCompletionService<String>(Executors.newFixedThreadPool(10)); // 此線程池運(yùn)行5個(gè)線程 for (int i = 0; i < 5; i++) { final int index = i; csv.submit(new Callable<String>() { @Override public String call() throws Exception { System.out.println("Thread-" + index + "-begin-" + sf.format(new Date())); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Thread-" + index + "-end-" + sf.format(new Date())); return "index-" + index; } }); } try { Future<String> f = csv.poll(); // poll方法 返回的Future可能為 null,因?yàn)閜oll 是非阻塞執(zhí)行的 if (f != null) { System.out.println(f.get()); } else { System.out.println("使用poll 獲取到的Future為 null"); } } catch (Exception e1) { e1.printStackTrace(); } for (int i = 0; i < 5; i++) { try { // csv.take() 返回的是 最先完成任務(wù)的 Future 對(duì)象,take 方法時(shí)阻塞執(zhí)行的 System.out.println(csv.take().get()); } catch (Exception e) { e.printStackTrace(); } } |
這個(gè)例子的執(zhí)行結(jié)果如下所示:
// 示例1:像使用 ExecutorService 一樣使用 CompletionService
10:22:32:271 - Task 4 started, duration=400
10:22:32:271 - Task 3 started, duration=600
10:22:32:271 - Task 2 started, duration=800
10:22:32:271 - Task 1 started, duration=1000
10:22:32:687 - Task 4 completed.
10:22:32:888 - Task 3 completed.
10:22:33:089 - Task 2 completed.
10:22:33:303 - Task 1 completed.
10:22:33:303 - Result of task 1
10:22:33:303 - Result of task 2
10:22:33:303 - Result of task 3
10:22:33:303 - Result of task 4
// 示例2:按標(biāo)準(zhǔn)方式使用 CompletionService
10:22:33:305 - Task 5 started, duration=1000
10:22:33:305 - Task 7 started, duration=600
10:22:33:305 - Task 6 started, duration=800
10:22:33:305 - Task 8 started, duration=400
10:22:33:718 - Task 8 completed.
10:22:33:718 - Result of task 8
10:22:33:918 - Task 7 completed.
10:22:33:918 - Result of task 7
10:22:34:119 - Task 6 completed.
10:22:34:119 - Result of task 6
10:22:34:320 - Task 5 completed.
10:22:34:320 - Result of task 5可以看出,在示例 1 中,雖然 Task 4 執(zhí)行時(shí)間只有 400ms,但因?yàn)槲覀兪前凑?1-2-3-4 的順序依次取結(jié)果,因此 Task 4 完成后并沒(méi)有馬上打印出結(jié)果來(lái)。而在示例 2 中,對(duì)每個(gè) Task 都是在完成時(shí)立刻就將結(jié)果打印出來(lái)了。這就是 CompletionService的優(yōu)勢(shì)所在。
原理解釋
CompletionService之所以能夠做到這點(diǎn),是因?yàn)樗鼪](méi)有采取依次遍歷 Future 的方式,而是在中間加上了一個(gè)結(jié)果隊(duì)列,任務(wù)完成后馬上將結(jié)果放入隊(duì)列,那么從隊(duì)列中取到的就是最早完成的結(jié)果。
如果隊(duì)列為空,那么 take()方法會(huì)阻塞直到隊(duì)列中出現(xiàn)結(jié)果為止。此外 CompletionService還提供一個(gè) poll()方法,返回值與 take()方法一樣,不同之處在于它不會(huì)阻塞,如果隊(duì)列為空則立刻返回 null。這算是給用戶多一種選擇。
