我靠(call) ,我的未來(Future)在哪里???
點擊藍色“Java建設者?”關注我喲
加個“星標”,及時閱讀最新技術文章

大家好,我是 cxuan,之前一直在分享操作系統(tǒng)相關的文章,兜兜轉轉回到了 Java 文章分享,本篇文章是讀者投稿,來和你一起聊一聊 Future?~
我們大家都知道,在 Java 中創(chuàng)建線程主要有三種方式:
繼承 Thread 類; 實現(xiàn) Runnable 接口; 實現(xiàn) Callable 接口。
而后兩者的區(qū)別在于 Callable 接口中的 call() 方法可以異步地返回一個計算結果 Future,并且一般需要配合 ExecutorService 來執(zhí)行。這一套操作在代碼實現(xiàn)上似乎也并不難,可是對于call()方法具體怎么(被ExecutorService)執(zhí)行的,以及 Future 這個結果是怎么獲取的,卻又不是很清楚了。
那么本篇文章,我們就一起來學習下 Callable 接口以及 Future 的使用,主要面向兩個問題:
承載著具體任務的 call() 方法如何被執(zhí)行的? 任務的執(zhí)行結果如何得到?
你可能會說,這兩個難道不是一個問題嗎?任務執(zhí)行了就會有返回結果,而返回結果也一定是任務執(zhí)行了才返回的,難道還能返回一個其他任務的結果么??不要著急,耐心的看下去,你就會發(fā)現(xiàn),這兩個還真的就是一個問題。
本文將分為兩個部分,第一部分分別介紹 任務、執(zhí)行、以及結果這三個概念在 Java API 中的實體和各自的繼承關系,第二部分通過一個簡單的例子回顧他們的用法,再理解下這兩個問題的答案。
Callable、Executor 與 Future
既然是一個任務被執(zhí)行并返回結果,那么我們先來看看具體的任務,也就是 Callable 接口。
任務:Callable
非常簡單,只包含一個有泛型「返回值」的 call() 方法,需要在最后返回定義類型的結果。如果任務沒有需要返回的結果,那么將泛型 V 設為 void 并return null;就可以了。對比的是 Runnable,另一個明顯的區(qū)別則是 Callable可以拋出異常。
public?interface?Callable<V>?{
????V?call()?throws?Exception;
}
public?interface?Runnable?{
????public?abstract?void?run();
}
執(zhí)行:ExecutorService
說到線程就少不了線程池,而說到線程池肯定離不開 Executor 接口。下面這幅圖是 Executor 的框架,我們常用的是其中的兩個具體實現(xiàn)類 ThreadPoolExecutor 以及 ScheduledThreadPoolExecutor,在 Executors 類中通過靜態(tài)方法獲取。Executors 中包含了線程池以及線程工廠的構造,與 Executor 接口的關系類似于 Collection 接口和 Collections 類的關系。

那么我們自頂向下,從源碼上了解一下 Executor 框架,學習學習任務是如何被執(zhí)行的。首先是 Executor 接口,其中只定義了 execute() 方法。
public?interface?Executor?{
????void?execute(Runnable?command);
}
ExecutorService 接口繼承了 Executor 接口,主要擴展了一系列的 submit() 方法以及對 executor 的終止和判斷狀態(tài)。以第一個為例,其中 task 為用戶定義的執(zhí)行的異步任務,F(xiàn)uture 表示了任務的執(zhí)行結果,泛型 T 代表任務結果的類型。
public?interface?ExecutorService?extends?Executor?{
????void?shutdown();????????????????//?現(xiàn)有任務完成后停止線程池
?
????List?shutdownNow() ;???//?立即停止線程池
????boolean?isShutdown();???????????//?判斷是否已停止
????boolean?isTerminated();
?????Future?submit(Callable?task) ;????????//?提交Callale任務
?????Future?submit(Runnable?task,?T?result) ;
????Future>?submit(Runnable?task);
????//?針對Callable集合的invokeAll()等方法
}
抽象類AbstractExecutorService 是 ThreadPoolExecutor 的基類,在下面的代碼中,它實現(xiàn)了ExecutorService 接口中的 submit() 方法。注釋中是對應的 newTaskFor() 方法的代碼,非常簡單,就是將傳入的Callable 或 Runnable 參數(shù)封裝成一個 FutureTask 對象。
//?1.第一個重載方法,參數(shù)為Callable
public??Future?submit(Callable?task) ? {
??if?(task?==?null)?throw?new?NullPointerException();
??RunnableFuture?ftask?=?newTaskFor(task);
??//?return?new?FutureTask(callable);
??execute(ftask);
??return?ftask;
}
//?2.第二個重載方法,參數(shù)為Runnable
public?Future>?submit(Runnable?task)?{
??if?(task?==?null)?throw?new?NullPointerException();
??RunnableFuture?ftask?=?newTaskFor(task,?null);
??//?return?new?FutureTask(task,?null);
??execute(ftask);
??return?ftask;
}
//?3.第三個重載方法,參數(shù)為Runnable?+?返回對象
public??Future?submit(Runnable?task,?T?result)? {
??if?(task?==?null)?throw?new?NullPointerException();
??RunnableFuture?ftask?=?newTaskFor(task,?result);
??//?return?new?FutureTask(task,?result);
??execute(ftask);
??return?ftask;
}
那么也就是說,無論傳入的是 Callable 還是 Runnable,submit() 方法其實就做了三件事

具體來說,submit() 中首先生成了一個 RunnableFuture 引用的 FutureTask 實例,然后調用 execute() 方法來執(zhí)行它,那么我們可以推測 FutureTask 繼承自 RunnableFuture,而 RunnableFuture 又實現(xiàn)了 Runnable,因為execute() 的參數(shù)應為 Runnable 類型。上面還涉及到了 FutureTask 的構造函數(shù),也來看一下。
public?FutureTask(Callable?callable) ?{
??this.callable?=?callable;
??this.state?=?NEW;
}
public?FutureTask(Runnable?runnable,?V?result)?{
??this.callable?=?Executors.callable(runnable,?result);?//?通過適配器將runnable在call()中執(zhí)行并返回result
??this.state?=?NEW;
}
FutureTask 共有兩個構造方法。第一個構造方法比較簡單,對應上面的第一個 submit(),采用組合的方式封裝Callable 并將狀態(tài)設為NEW;而第二個構造方法對應上面的后兩個 submit() 重載,不同之處是首先使用了Executors.callable來將 Runnable 和 result 組合成 Callable,這里采用了適配器RunnableAdapter implements Callable,巧妙地在 call() 中執(zhí)行 Runnable 并返回結果。
static?final?class?RunnableAdapter<T>?implements?Callable<T>?{
??final?Runnable?task;
??final?T?result;????????????????//?返回的結果;顯然:需要在run()中賦值
??RunnableAdapter(Runnable?task,?T?result)?{
????this.task?=?task;
????this.result?=?result;
??}
??public?T?call()?{
????task.run();
????return?result;
??}
}
在適配器設計模式中,通常包含目標接口 Target、適配器 Adapter 和被適配者 Adaptee 三類角色,其中目標接口代表客戶端(當前業(yè)務系統(tǒng))所需要的功能,通常為借口或抽象類;被適配者為現(xiàn)存的不能滿足使用需求的類;適配器是一個轉換器,也稱 wrapper,用于給被適配者添加目標功能,使得客戶端可以按照目標接口的格式正確訪問。對于 RunnableAdapter 來說,Callable 是其目標接口,而 Runnable 則是被適配者。RunnableAdapter 通過覆蓋 call() 方法使其可按照 Callable 的要求來使用,同時其構造方法中接收被適配者和目標對象,滿足了 call() 方法有返回值的要求。

那么總結一下 submit() 方法執(zhí)行的流程,就是:「Callable 被封裝在 Runnable 的子類中傳入 execute() 得以執(zhí)行」。
結果:Future
要說 Future 就是異步任務的執(zhí)行結果其實并不準確,因為它代表了一個任務的執(zhí)行過程,有狀態(tài)、可以被取消,而 get() 方法的返回值才是任務的結果。
public?interface?Future<V>?{
????boolean?cancel(boolean?mayInterruptIfRunning);
????boolean?isCancelled();
????boolean?isDone();
????V?get()?throws?InterruptedException,?ExecutionException;
????V?get(long?timeout,?TimeUnit?unit)
????????throws?InterruptedException,?ExecutionException,?TimeoutException;
}
我們在上面中還提到了 RuunableFuture 和 FutureTask。從官方的注釋來看,RuunableFuture 就是一個可以 run的 future,實現(xiàn)了 Runnable 和 Future 兩個接口,在 run() 方法中執(zhí)行完計算時應該將結果保存起來以便通過 get()獲取。
public?interface?RunnableFuture<V>?extends?Runnable,?Future<V>?{
????/**
?????*?Sets?this?Future?to?the?result?of?its?computation?unless?it?has?been?cancelled.
?????*/
????void?run();
}
FutureTask 直接實現(xiàn)了 RunnableFuture 接口,作為執(zhí)行過程,共有下面這幾種狀態(tài),其中 COMPLETING 為一個暫時狀態(tài),表示正在設置結果或異常,對應的,設置完成后狀態(tài)變?yōu)?NORMAL 或 EXCEPTIONAL;CANCELLED、INTERRUPTED 表示任務被取消或中斷。在上面的構造方法中,將 state 初始化為 NEW。
????private?volatile?int?state;
????private?static?final?int?NEW??????????=?0;
????private?static?final?int?COMPLETING???=?1;
????private?static?final?int?NORMAL???????=?2;
????private?static?final?int?EXCEPTIONAL??=?3;
????private?static?final?int?CANCELLED????=?4;
????private?static?final?int?INTERRUPTING?=?5;
????private?static?final?int?INTERRUPTED??=?6;
然后是 FutureTask 的主要內容,主要是 run() 和 get()。注意 outcome 的注釋,無論是否發(fā)生異常返回的都是這個 outcome,因為在執(zhí)行中如果執(zhí)行成功就將結果設置給了它(set()),而發(fā)生異常時將異常賦給了他(setException()),而在獲取結果時也都返回了 outcome(通過report())。
public?class?FutureTask<V>?implements?RunnableFuture<V>?{
????
????private?Callable?callable;?????????//?target,待執(zhí)行的任務
????
????/**?保存執(zhí)行結果或異常,在get()方法中返回/拋出?*/
????private?Object?outcome;?//?非volatile,通過CAS保證線程安全
????
????
????public?void?run()?{
????????......
????????Callable?c?=?callable;
????????if?(c?!=?null?&&?state?==?NEW)?{
????????????V?result;
????????????boolean?ran;
????????????try?{
????????????????result?=?c.call();????????????//?調用call()執(zhí)行用戶任務并獲取結果
????????????????ran?=?true;???????????????????//?執(zhí)行完成,ran置為true
????????????}?catch?(Throwable?ex)?{??????????//?調用call()出現(xiàn)異常,而run()方法繼續(xù)執(zhí)行
?????????????????result?=?null;
?????????????????ran?=?false;
?????????????????setException(ex);????????????
?????????????????//?setException(Throwable?t):?compareAndSwapInt(NEW,?COMPLETING);??outcome?=?t;??????
????????????}
????????????if?(ran)
????????????????set(result);??????????????????
?????????????//?set(V?v):?compareAndSwapInt(NEW,?COMPLETING);??outcome?=?v;
????????}
????}
????
????
????public?V?get()?throws?InterruptedException,?ExecutionException?{
????????int?s?=?state;
????????if?(s?<=?COMPLETING)
????????????s?=?awaitDone(false,?0L);?????????//?加入隊列等待COMPLETING完成,可響應超時、中斷
????????return?report(s);
????}
????public?V?get(long?timeout,?TimeUnit?unit)
????????throws?InterruptedException,?ExecutionException,?TimeoutException?{
????????//?超時等待
????}
????
????private?V?report(int?s)?throws?ExecutionException?{
????????Object?x?=?outcome;
????????if?(s?==?NORMAL)??????????????????????????????//?將outcome作為執(zhí)行結果返回
????????????return?(V)x;
????????if?(s?>=?CANCELLED)
????????????throw?new?CancellationException();
????????throw?new?ExecutionException((Throwable)x);???//?將outcome作為捕獲的返回
????}
}
FutureTask 實現(xiàn)了 RunnableFuture 接口,所以有兩方面的作用。
第一,作為 Runnable 傳入 execute() 方法來執(zhí)行,同時封裝 Callable 對象并在 run() 中調用其 call() 方法; 第二,作為 Future 管理任務的執(zhí)行狀態(tài),將 call() 的返回值保存在 outcome 中以通過 get() 獲取。這似乎就能回答開頭的兩個問題,并且渾然天成,就好像是一個問題,除非發(fā)生異常的時候返回的不是任務的結果而是異常對象。
總結一下繼承關系:

二、使用舉例
文章的標題有點唬人,說到底還是講 Callable 的用法?,F(xiàn)在我們知道了 Future 代表了任務執(zhí)行的過程和結果,作為 call() 方法的返回值來獲取執(zhí)行結果;而 FutureTask 是一個 Runnable 的 Future,既是任務執(zhí)行的過程和結果,又是 call 方法最終執(zhí)行的載體。下面通過一個例子看看他們在使用上的區(qū)別。
首先創(chuàng)建一個任務,即定義一個任務類實現(xiàn) Callable 接口,在 call() 方法里添加我們的操作,這里用耗時三秒然后返回 100 模擬計算過程。
class?MyTask?implements?Callable<Integer>?{
????@Override
????public?Integer?call()?throws?Exception?{
????????System.out.println("子線程開始計算...");
????????for?(int?i=0;i<3;++i){
????????????Thread.sleep(1000);
????????????System.out.println("子線程計算中,用時?"+(i+1)+"?秒");
????????}
????????System.out.println("子線程計算完成,返回:100");
????????return?100;
????}
}
然后呢,創(chuàng)建一個線程池,并實例化一個 MyTask 備用。
ExecutorService?executor?=?Executors.newCachedThreadPool();
MyTask?task?=?new?MyTask();
現(xiàn)在,分別使用 Future 和 FutureTask 來獲取執(zhí)行結果,看看他們有什么區(qū)別。
使用Future
Future 一般作為 submit() 的返回值使用,并在主線程中以阻塞的方式獲取異步任務的執(zhí)行結果。
System.out.println("主線程啟動線程池");
Future?future?=?executor.submit(task);
System.out.println("主線程得到返回結果:"+future.get());
executor.shutdown();
看看輸出結果:
主線程啟動線程池
子線程開始計算...
子線程計算中,用時?1?秒
子線程計算中,用時?2?秒
子線程計算中,用時?3?秒
子線程計算完成,返回:100
主線程得到返回結果:100
由于 get() 方法阻塞獲取結果,所以輸出順序為子線程計算完成后主線程輸出結果。
使用FutureTask
由于 FutureTask 集「任務與結果」于一身,所以我們可以使用 FutureTask 自身而非返回值來管理任務,這需要首先利用 Callable 對象來構造 FutureTask,并調用不同的submit()重載方法。
System.out.println("主線程啟動線程池");
FutureTask?futureTask?=?new?FutureTask<>(task);
executor.submit(futureTask);?????????????????????????????????//?作為Ruunable傳入submit()中
System.out.println("主線程得到返回結果:"+futureTask.get());????//?作為Future獲取結果
executor.shutdown();
這段程序的輸出與上面中完全相同,其實兩者在實際執(zhí)行中的區(qū)別也不大,雖然前者調用了submit(Callable而后者調用了submit(Runnable task),但最終都通過execute(futuretask)來把任務加入線程池中。
總結
上面大費周章其實只是盡可能細致地講清楚了 Callable 中的任務是如何執(zhí)行的,總結起來就是:
線程池中,submit() 方法實際上將 Callable 封裝在 FutureTask 中,將其作為 Runnable 的子類傳給 execute()真正執(zhí)行; FutureTask 在 run() 中調用 Callable 對象的 call() 方法并接收返回值或捕獲異常保存在 Object outcome中,同時管理執(zhí)行過程中的狀態(tài)state;FutureTask 同時作為 Future 的子類,通過 get() 返回任務的執(zhí)行結果,若未執(zhí)行完成則通過等待隊列進行阻塞等待完成;
FutureTask 作為一個 Runnable 的 Future,其中最重要的兩個方法如下。



8. 深入理解 MySQL:快速學會分析SQL執(zhí)行效率

掃碼二維碼關注我
·end·
—如果本文有幫助,請分享到朋友圈吧—
我們一起愉快的玩耍!

