剛研究完Callable和Future,各位隨便問??!
在Java的多線程編程中,除了Thread類和Runnable接口外,不得不說的就是Callable接口Future接口了。使用繼承Thread類或者實現(xiàn)Runnable接口的線程,無法返回最終的執(zhí)行結(jié)果數(shù)據(jù),只能等待線程執(zhí)行完成。此時,如果想要獲取線程執(zhí)行后的返回結(jié)果,那么,Callable和Future就派上用場了。
Callable接口
1.Callable接口介紹
Callable接口是JDK1.5新增的泛型接口,在JDK1.8中,被聲明為函數(shù)式接口,如下所示。
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
在JDK 1.8中只聲明有一個方法的接口為函數(shù)式接口,函數(shù)式接口可以使用@FunctionalInterface注解修飾,也可以不使用@FunctionalInterface注解修飾。只要一個接口中只包含有一個方法,那么,這個接口就是函數(shù)式接口。
在JDK中,實現(xiàn)Callable接口的子類如下圖所示。

默認的子類層級關(guān)系圖看不清,這里,可以通過IDEA右鍵Callable接口,選擇“Layout”來指定Callable接口的實現(xiàn)類圖的不同結(jié)構(gòu),如下所示。

這里,可以選擇“Organic Layout”選項,選擇后的Callable接口的子類的結(jié)構(gòu)如下圖所示。

在實現(xiàn)Callable接口的子類中,有幾個比較重要的類,如下圖所示。

分別是:Executors類中的靜態(tài)內(nèi)部類:PrivilegedCallable、PrivilegedCallableUsingCurrentClassLoader、RunnableAdapter和Task類下的TaskCallable。
2.實現(xiàn)Callable接口的重要類分析
接下來,分析的類主要有:PrivilegedCallable、PrivilegedCallableUsingCurrentClassLoader、RunnableAdapter和Task類下的TaskCallable。雖然這些類在實際工作中很少被直接用到,但是作為一名合格的開發(fā)工程師,設(shè)置是禿頂?shù)馁Y深專家來說,了解并掌握這些類的實現(xiàn)有助你進一步理解Callable接口,并提高專業(yè)技能(頭發(fā)再掉一批,哇哈哈哈。。。)。
PrivilegedCallable
PrivilegedCallable類是Callable接口的一個特殊實現(xiàn)類,它表明Callable對象有某種特權(quán)來訪問系統(tǒng)的某種資源,PrivilegedCallable類的源代碼如下所示。
/**
* A callable that runs under established access control settings
*/
static final class PrivilegedCallable<T> implements Callable<T> {
private final Callable<T> task;
private final AccessControlContext acc;
PrivilegedCallable(Callable<T> task) {
this.task = task;
this.acc = AccessController.getContext();
}
public T call() throws Exception {
try {
return AccessController.doPrivileged(
new PrivilegedExceptionAction<T>() {
public T run() throws Exception {
return task.call();
}
}, acc);
} catch (PrivilegedActionException e) {
throw e.getException();
}
}
}
從PrivilegedCallable類的源代碼來看,可以將PrivilegedCallable看成是對Callable接口的封裝,并且這個類也繼承了Callable接口。
在PrivilegedCallable類中有兩個成員變量,分別是Callable接口的實例對象和AccessControlContext類的實例對象,如下所示。
private final Callable<T> task;
private final AccessControlContext acc;
其中,AccessControlContext類可以理解為一個具有系統(tǒng)資源訪問決策的上下文類,通過這個類可以訪問系統(tǒng)的特定資源。通過類的構(gòu)造方法可以看出,在實例化AccessControlContext類的對象時,只需要傳遞Callable接口子類的對象即可,如下所示。
PrivilegedCallable(Callable<T> task) {
this.task = task;
this.acc = AccessController.getContext();
}
AccessControlContext類的對象是通過AccessController類的getContext()方法獲取的,這里,查看AccessController類的getContext()方法,如下所示。
public static AccessControlContext getContext(){
AccessControlContext acc = getStackAccessControlContext();
if (acc == null) {
return new AccessControlContext(null, true);
} else {
return acc.optimize();
}
}
通過AccessController的getContext()方法可以看出,首先通過getStackAccessControlContext()方法來獲取AccessControlContext對象實例。如果獲取的AccessControlContext對象實例為空,則通過調(diào)用AccessControlContext類的構(gòu)造方法實例化,否則,調(diào)用AccessControlContext對象實例的optimize()方法返回AccessControlContext對象實例。
這里,我們先看下getStackAccessControlContext()方法是個什么鬼。
private static native AccessControlContext getStackAccessControlContext();
原來是個本地方法,方法的字面意思就是獲取能夠訪問系統(tǒng)棧的決策上下文對象。
接下來,我們回到PrivilegedCallable類的call()方法,如下所示。
public T call() throws Exception {
try {
return AccessController.doPrivileged(
new PrivilegedExceptionAction<T>() {
public T run() throws Exception {
return task.call();
}
}, acc);
} catch (PrivilegedActionException e) {
throw e.getException();
}
}
通過調(diào)用AccessController.doPrivileged()方法,傳遞PrivilegedExceptionAction。接口對象和AccessControlContext對象,并最終返回泛型的實例對象。
首先,看下AccessController.doPrivileged()方法,如下所示。
@CallerSensitive
public static native <T> T
doPrivileged(PrivilegedExceptionAction<T> action,
AccessControlContext context)
throws PrivilegedActionException;
可以看到,又是一個本地方法。也就是說,最終的執(zhí)行情況是將PrivilegedExceptionAction接口對象和AccessControlContext對象實例傳遞給這個本地方法執(zhí)行。并且在PrivilegedExceptionAction接口對象的run()方法中調(diào)用Callable接口的call()方法來執(zhí)行最終的業(yè)務(wù)邏輯,并且返回泛型對象。
PrivilegedCallableUsingCurrentClassLoader
此類表示為在已經(jīng)建立的特定訪問控制和當(dāng)前的類加載器下運行的Callable類,源代碼如下所示。
/**
* A callable that runs under established access control settings and
* current ClassLoader
*/
static final class PrivilegedCallableUsingCurrentClassLoader<T> implements Callable<T> {
private final Callable<T> task;
private final AccessControlContext acc;
private final ClassLoader ccl;
PrivilegedCallableUsingCurrentClassLoader(Callable<T> task) {
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);
sm.checkPermission(new RuntimePermission("setContextClassLoader"));
}
this.task = task;
this.acc = AccessController.getContext();
this.ccl = Thread.currentThread().getContextClassLoader();
}
public T call() throws Exception {
try {
return AccessController.doPrivileged(
new PrivilegedExceptionAction<T>() {
public T run() throws Exception {
Thread t = Thread.currentThread();
ClassLoader cl = t.getContextClassLoader();
if (ccl == cl) {
return task.call();
} else {
t.setContextClassLoader(ccl);
try {
return task.call();
} finally {
t.setContextClassLoader(cl);
}
}
}
}, acc);
} catch (PrivilegedActionException e) {
throw e.getException();
}
}
}
這個類理解起來比較簡單,首先,在類中定義了三個成員變量,如下所示。
private final Callable<T> task;
private final AccessControlContext acc;
private final ClassLoader ccl;
接下來,通過構(gòu)造方法注入Callable對象,在構(gòu)造方法中,首先獲取系統(tǒng)安全管理器對象實例,通過系統(tǒng)安全管理器對象實例檢查是否具有獲取ClassLoader和設(shè)置ContextClassLoader的權(quán)限。并在構(gòu)造方法中為三個成員變量賦值,如下所示。
PrivilegedCallableUsingCurrentClassLoader(Callable<T> task) {
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);
sm.checkPermission(new RuntimePermission("setContextClassLoader"));
}
this.task = task;
this.acc = AccessController.getContext();
this.ccl = Thread.currentThread().getContextClassLoader();
}
接下來,通過調(diào)用call()方法來執(zhí)行具體的業(yè)務(wù)邏輯,如下所示。
public T call() throws Exception {
try {
return AccessController.doPrivileged(
new PrivilegedExceptionAction<T>() {
public T run() throws Exception {
Thread t = Thread.currentThread();
ClassLoader cl = t.getContextClassLoader();
if (ccl == cl) {
return task.call();
} else {
t.setContextClassLoader(ccl);
try {
return task.call();
} finally {
t.setContextClassLoader(cl);
}
}
}
}, acc);
} catch (PrivilegedActionException e) {
throw e.getException();
}
}
在call()方法中同樣是通過調(diào)用AccessController類的本地方法doPrivileged,傳遞PrivilegedExceptionAction接口的實例對象和AccessControlContext類的對象實例。
具體執(zhí)行邏輯為:在PrivilegedExceptionAction對象的run()方法中獲取當(dāng)前線程的ContextClassLoader對象,如果在構(gòu)造方法中獲取的ClassLoader對象與此處的ContextClassLoader對象是同一個對象(不止對象實例相同,而且內(nèi)存地址也相同),則直接調(diào)用Callable對象的call()方法返回結(jié)果。否則,將PrivilegedExceptionAction對象的run()方法中的當(dāng)前線程的ContextClassLoader設(shè)置為在構(gòu)造方法中獲取的類加載器對象,接下來,再調(diào)用Callable對象的call()方法返回結(jié)果。最終將當(dāng)前線程的ContextClassLoader重置為之前的ContextClassLoader。
RunnableAdapter
RunnableAdapter類比較簡單,給定運行的任務(wù)和結(jié)果,運行給定的任務(wù)并返回給定的結(jié)果,源代碼如下所示。
/**
* A callable that runs given task and returns given result
*/
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
TaskCallable
TaskCallable類是javafx.concurrent.Task類的靜態(tài)內(nèi)部類,TaskCallable類主要是實現(xiàn)了Callable接口并且被定義為FutureTask的類,并且在這個類中允許我們攔截call()方法來更新task任務(wù)的狀態(tài)。源代碼如下所示。
private static final class TaskCallable<V> implements Callable<V> {
private Task<V> task;
private TaskCallable() { }
@Override
public V call() throws Exception {
task.started = true;
task.runLater(() -> {
task.setState(State.SCHEDULED);
task.setState(State.RUNNING);
});
try {
final V result = task.call();
if (!task.isCancelled()) {
task.runLater(() -> {
task.updateValue(result);
task.setState(State.SUCCEEDED);
});
return result;
} else {
return null;
}
} catch (final Throwable th) {
task.runLater(() -> {
task._setException(th);
task.setState(State.FAILED);
});
if (th instanceof Exception) {
throw (Exception) th;
} else {
throw new Exception(th);
}
}
}
}
從TaskCallable類的源代碼可以看出,只定義了一個Task類型的成員變量。下面主要分析TaskCallable類的call()方法。
當(dāng)程序的執(zhí)行進入到call()方法時,首先將task對象的started屬性設(shè)置為true,表示任務(wù)已經(jīng)開始,并且將任務(wù)的狀態(tài)依次設(shè)置為State.SCHEDULED和State.RUNNING,依次觸發(fā)任務(wù)的調(diào)度事件和運行事件。如下所示。
task.started = true;
task.runLater(() -> {
task.setState(State.SCHEDULED);
task.setState(State.RUNNING);
});
接下來,在try代碼塊中執(zhí)行Task對象的call()方法,返回泛型對象。如果任務(wù)沒有被取消,則更新任務(wù)的緩存,將調(diào)用call()方法返回的泛型對象綁定到Task對象中的ObjectProperty
private final ObjectProperty<V> value = new SimpleObjectProperty<>(this, "value");
接下來,將任務(wù)的狀態(tài)設(shè)置為成功狀態(tài)。如下所示。
try {
final V result = task.call();
if (!task.isCancelled()) {
task.runLater(() -> {
task.updateValue(result);
task.setState(State.SUCCEEDED);
});
return result;
} else {
return null;
}
}
如果程序拋出了異?;蛘咤e誤,會進入catch()代碼塊,設(shè)置Task對象的Exception信息并將狀態(tài)設(shè)置為State.FAILED,也就是將任務(wù)標(biāo)記為失敗。接下來,判斷異?;蝈e誤的類型,如果是Exception類型的異常,則直接強轉(zhuǎn)為Exception類型的異常并拋出。否則,將異?;蛘咤e誤封裝為Exception對象并拋出,如下所示。
catch (final Throwable th) {
task.runLater(() -> {
task._setException(th);
task.setState(State.FAILED);
});
if (th instanceof Exception) {
throw (Exception) th;
} else {
throw new Exception(th);
}
}
兩種異步模型與深度解析Future接口
兩種異步模型
在Java的并發(fā)編程中,大體上會分為兩種異步編程模型,一類是直接以異步的形式來并行運行其他的任務(wù),不需要返回任務(wù)的結(jié)果數(shù)據(jù)。一類是以異步的形式運行其他任務(wù),需要返回結(jié)果。
1.無返回結(jié)果的異步模型
無返回結(jié)果的異步任務(wù),可以直接將任務(wù)丟進線程或線程池中運行,此時,無法直接獲得任務(wù)的執(zhí)行結(jié)果數(shù)據(jù),一種方式是可以使用回調(diào)方法來獲取任務(wù)的運行結(jié)果。
具體的方案是:定義一個回調(diào)接口,并在接口中定義接收任務(wù)結(jié)果數(shù)據(jù)的方法,具體邏輯在回調(diào)接口的實現(xiàn)類中完成。將回調(diào)接口與任務(wù)參數(shù)一同放進線程或線程池中運行,任務(wù)運行后調(diào)用接口方法,執(zhí)行回調(diào)接口實現(xiàn)類中的邏輯來處理結(jié)果數(shù)據(jù)。這里,給出一個簡單的示例供參考。
定義回調(diào)接口
package io.binghe.concurrent.lab04;
/**
* @author binghe
* @version 1.0.0
* @description 定義回調(diào)接口
*/
public interface TaskCallable<T> {
T callable(T t);
}
便于接口的通用型,這里為回調(diào)接口定義了泛型。
定義任務(wù)結(jié)果數(shù)據(jù)的封裝類
package io.binghe.concurrent.lab04;
import java.io.Serializable;
/**
* @author binghe
* @version 1.0.0
* @description 任務(wù)執(zhí)行結(jié)果
*/
public class TaskResult implements Serializable {
private static final long serialVersionUID = 8678277072402730062L;
/**
* 任務(wù)狀態(tài)
*/
private Integer taskStatus;
/**
* 任務(wù)消息
*/
private String taskMessage;
/**
* 任務(wù)結(jié)果數(shù)據(jù)
*/
private String taskResult;
//省略getter和setter方法
@Override
public String toString() {
return "TaskResult{" +
"taskStatus=" + taskStatus +
", taskMessage='" + taskMessage + '\'' +
", taskResult='" + taskResult + '\'' +
'}';
}
}
創(chuàng)建回調(diào)接口的實現(xiàn)類
回調(diào)接口的實現(xiàn)類主要用來對任務(wù)的返回結(jié)果進行相應(yīng)的業(yè)務(wù)處理,這里,為了方便演示,只是將結(jié)果數(shù)據(jù)返回。大家需要根據(jù)具體的業(yè)務(wù)場景來做相應(yīng)的分析和處理。
package io.binghe.concurrent.lab04;
/**
* @author binghe
* @version 1.0.0
* @description 回調(diào)函數(shù)的實現(xiàn)類
*/
public class TaskHandler implements TaskCallable<TaskResult> {
@Override
public TaskResult callable(TaskResult taskResult) {
//TODO 拿到結(jié)果數(shù)據(jù)后進一步處理
System.out.println(taskResult.toString());
return taskResult;
}
}
創(chuàng)建任務(wù)的執(zhí)行類
任務(wù)的執(zhí)行類是具體執(zhí)行任務(wù)的類,實現(xiàn)Runnable接口,在此類中定義一個回調(diào)接口類型的成員變量和一個String類型的任務(wù)參數(shù)(模擬任務(wù)的參數(shù)),并在構(gòu)造方法中注入回調(diào)接口和任務(wù)參數(shù)。在run方法中執(zhí)行任務(wù),任務(wù)完成后將任務(wù)的結(jié)果數(shù)據(jù)封裝成TaskResult對象,調(diào)用回調(diào)接口的方法將TaskResult對象傳遞到回調(diào)方法中。
package io.binghe.concurrent.lab04;
/**
* @author binghe
* @version 1.0.0
* @description 任務(wù)執(zhí)行類
*/
public class TaskExecutor implements Runnable{
private TaskCallable<TaskResult> taskCallable;
private String taskParameter;
public TaskExecutor(TaskCallable<TaskResult> taskCallable, String taskParameter){
this.taskCallable = taskCallable;
this.taskParameter = taskParameter;
}
@Override
public void run() {
//TODO 一系列業(yè)務(wù)邏輯,將結(jié)果數(shù)據(jù)封裝成TaskResult對象并返回
TaskResult result = new TaskResult();
result.setTaskStatus(1);
result.setTaskMessage(this.taskParameter);
result.setTaskResult("異步回調(diào)成功");
taskCallable.callable(result);
}
}
到這里,整個大的框架算是完成了,接下來,就是測試看能否獲取到異步任務(wù)的結(jié)果了。
異步任務(wù)測試類
package io.binghe.concurrent.lab04;
/**
* @author binghe
* @version 1.0.0
* @description 測試回調(diào)
*/
public class TaskCallableTest {
public static void main(String[] args){
TaskCallable<TaskResult> taskCallable = new TaskHandler();
TaskExecutor taskExecutor = new TaskExecutor(taskCallable, "測試回調(diào)任務(wù)");
new Thread(taskExecutor).start();
}
}
在測試類中,使用Thread類創(chuàng)建一個新的線程,并啟動線程運行任務(wù)。運行程序最終的接口數(shù)據(jù)如下所示。
TaskResult{taskStatus=1, taskMessage='測試回調(diào)任務(wù)', taskResult='異步回調(diào)成功'}
大家可以細細品味下這種獲取異步結(jié)果的方式。這里,只是簡單的使用了Thread類來創(chuàng)建并啟動線程,也可以使用線程池的方式實現(xiàn)。大家可自行實現(xiàn)以線程池的方式通過回調(diào)接口獲取異步結(jié)果。
2.有返回結(jié)果的異步模型
盡管使用回調(diào)接口能夠獲取異步任務(wù)的結(jié)果,但是這種方式使用起來略顯復(fù)雜。在JDK中提供了可以直接返回異步結(jié)果的處理方案。最常用的就是使用Future接口或者其實現(xiàn)類FutureTask來接收任務(wù)的返回結(jié)果。
使用Future接口獲取異步結(jié)果
使用Future接口往往配合線程池來獲取異步執(zhí)行結(jié)果,如下所示。
package io.binghe.concurrent.lab04;
import java.util.concurrent.*;
/**
* @author binghe
* @version 1.0.0
* @description 測試Future獲取異步結(jié)果
*/
public class FutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "測試Future獲取異步結(jié)果";
}
});
System.out.println(future.get());
executorService.shutdown();
}
}
運行結(jié)果如下所示。
測試Future獲取異步結(jié)果
使用FutureTask類獲取異步結(jié)果
FutureTask類既可以結(jié)合Thread類使用也可以結(jié)合線程池使用,接下來,就看下這兩種使用方式。
結(jié)合Thread類的使用示例如下所示。
package io.binghe.concurrent.lab04;
import java.util.concurrent.*;
/**
* @author binghe
* @version 1.0.0
* @description 測試FutureTask獲取異步結(jié)果
*/
public class FutureTaskTest {
public static void main(String[] args)throws ExecutionException, InterruptedException{
FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
return "測試FutureTask獲取異步結(jié)果";
}
});
new Thread(futureTask).start();
System.out.println(futureTask.get());
}
}
運行結(jié)果如下所示。
測試FutureTask獲取異步結(jié)果
結(jié)合線程池的使用示例如下。
package io.binghe.concurrent.lab04;
import java.util.concurrent.*;
/**
* @author binghe
* @version 1.0.0
* @description 測試FutureTask獲取異步結(jié)果
*/
public class FutureTaskTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
return "測試FutureTask獲取異步結(jié)果";
}
});
executorService.execute(futureTask);
System.out.println(futureTask.get());
executorService.shutdown();
}
}
運行結(jié)果如下所示。
測試FutureTask獲取異步結(jié)果
可以看到使用Future接口或者FutureTask類來獲取異步結(jié)果比使用回調(diào)接口獲取異步結(jié)果簡單多了。注意:實現(xiàn)異步的方式很多,這里只是用多線程舉例。
接下來,就深入分析下Future接口。
深度解析Future接口
1.Future接口
Future是JDK1.5新增的異步編程接口,其源代碼如下所示。
package java.util.concurrent;
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
可以看到,在Future接口中,總共定義了5個抽象方法。接下來,就分別介紹下這5個方法的含義。
cancel(boolean)
取消任務(wù)的執(zhí)行,接收一個boolean類型的參數(shù),成功取消任務(wù),則返回true,否則返回false。當(dāng)任務(wù)已經(jīng)完成,已經(jīng)結(jié)束或者因其他原因不能取消時,方法會返回false,表示任務(wù)取消失敗。當(dāng)任務(wù)未啟動調(diào)用了此方法,并且結(jié)果返回true(取消成功),則當(dāng)前任務(wù)不再運行。如果任務(wù)已經(jīng)啟動,會根據(jù)當(dāng)前傳遞的boolean類型的參數(shù)來決定是否中斷當(dāng)前運行的線程來取消當(dāng)前運行的任務(wù)。
isCancelled()
判斷任務(wù)在完成之前是否被取消,如果在任務(wù)完成之前被取消,則返回true;否則,返回false。
這里需要注意一個細節(jié):只有任務(wù)未啟動,或者在完成之前被取消,才會返回true,表示任務(wù)已經(jīng)被成功取消。其他情況都會返回false。
isDone()
判斷任務(wù)是否已經(jīng)完成,如果任務(wù)正常結(jié)束、拋出異常退出、被取消,都會返回true,表示任務(wù)已經(jīng)完成。
get()
當(dāng)任務(wù)完成時,直接返回任務(wù)的結(jié)果數(shù)據(jù);當(dāng)任務(wù)未完成時,等待任務(wù)完成并返回任務(wù)的結(jié)果數(shù)據(jù)。
get(long, TimeUnit)
當(dāng)任務(wù)完成時,直接返回任務(wù)的結(jié)果數(shù)據(jù);當(dāng)任務(wù)未完成時,等待任務(wù)完成,并設(shè)置了超時等待時間。在超時時間內(nèi)任務(wù)完成,則返回結(jié)果;否則,拋出TimeoutException異常。
2.RunnableFuture接口
Future接口有一個重要的子接口,那就是RunnableFuture接口,RunnableFuture接口不但繼承了Future接口,而且繼承了java.lang.Runnable接口,其源代碼如下所示。
package java.util.concurrent;
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
這里,問一下,RunnableFuture接口中有幾個抽象方法?想好了再說!哈哈哈。。。
這個接口比較簡單run()方法就是運行任務(wù)時調(diào)用的方法。
3.FutureTask類
FutureTask類是RunnableFuture接口的一個非常重要的實現(xiàn)類,它實現(xiàn)了RunnableFuture接口、Future接口和Runnable接口的所有方法。FutureTask類的源代碼比較多,這個就不粘貼了,大家自行到j(luò)ava.util.concurrent下查看。
(1)FutureTask類中的變量與常量
在FutureTask類中首先定義了一個狀態(tài)變量state,這個變量使用了volatile關(guān)鍵字修飾,這里,大家只需要知道volatile關(guān)鍵字通過內(nèi)存屏障和禁止重排序優(yōu)化來實現(xiàn)線程安全,后續(xù)會單獨深度分析volatile關(guān)鍵字是如何保證線程安全的。緊接著,定義了幾個任務(wù)運行時的狀態(tài)常量,如下所示。
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
其中,代碼注釋中給出了幾個可能的狀態(tài)變更流程,如下所示。
NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED
接下來,定義了其他幾個成員變量,如下所示。
private Callable<V> callable;
private Object outcome;
private volatile Thread runner;
private volatile WaitNode waiters;
又看到我們所熟悉的Callable接口了,Callable接口那肯定就是用來調(diào)用call()方法執(zhí)行具體任務(wù)了。
outcome:Object類型,表示通過get()方法獲取到的結(jié)果數(shù)據(jù)或者異常信息。
runner:運行Callable的線程,運行期間會使用CAS保證線程安全,這里大家只需要知道CAS是Java保證線程安全的一種方式,后續(xù)文章中會深度分析CAS如何保證線程安全。
waiters:WaitNode類型的變量,表示等待線程的堆棧,在FutureTask的實現(xiàn)中,會通過CAS結(jié)合此堆棧交換任務(wù)的運行狀態(tài)。
看一下WaitNode類的定義,如下所示。
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
可以看到,WaitNode類是FutureTask類的靜態(tài)內(nèi)部類,類中定義了一個Thread成員變量和指向下一個WaitNode節(jié)點的引用。其中通過構(gòu)造方法將thread變量設(shè)置為當(dāng)前線程。
(2)構(gòu)造方法
接下來,是FutureTask的兩個構(gòu)造方法,比較簡單,如下所示。
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW;
}
(3)是否取消與完成方法
繼續(xù)向下看源碼,看到一個任務(wù)是否取消的方法,和一個任務(wù)是否完成的方法,如下所示。
public boolean isCancelled() {
return state >= CANCELLED;
}
public boolean isDone() {
return state != NEW;
}
這兩方法中,都是通過判斷任務(wù)的狀態(tài)來判定任務(wù)是否已取消和已完成的。為啥會這樣判斷呢?再次查看FutureTask類中定義的狀態(tài)常量發(fā)現(xiàn),其常量的定義是有規(guī)律的,并不是隨意定義的。其中,大于或者等于CANCELLED的常量為CANCELLED、INTERRUPTING和INTERRUPTED,這三個狀態(tài)均可以表示線程已經(jīng)被取消。當(dāng)狀態(tài)不等于NEW時,可以表示任務(wù)已經(jīng)完成。
通過這里,大家可以學(xué)到一點:以后在編碼過程中,要按照規(guī)律來定義自己使用的狀態(tài),尤其是涉及到業(yè)務(wù)中有頻繁的狀態(tài)變更的操作,有規(guī)律的狀態(tài)可使業(yè)務(wù)處理變得事半功倍,這也是通過看別人的源碼設(shè)計能夠?qū)W到的,這里,建議大家還是多看別人寫的優(yōu)秀的開源框架的源碼。
(4)取消方法
我們繼續(xù)向下看源碼,接下來,看到的是cancel(boolean)方法,如下所示。
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
接下來,拆解cancel(boolean)方法。在cancel(boolean)方法中,首先判斷任務(wù)的狀態(tài)和CAS的操作結(jié)果,如果任務(wù)的狀態(tài)不等于NEW或者CAS的操作返回false,則直接返回false,表示任務(wù)取消失敗。如下所示。
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
接下來,在try代碼塊中,首先判斷是否可以中斷當(dāng)前任務(wù)所在的線程來取消任務(wù)的運行。如果可以中斷當(dāng)前任務(wù)所在的線程,則以一個Thread臨時變量來指向運行任務(wù)的線程,當(dāng)指向的變量不為空時,調(diào)用線程對象的interrupt()方法來中斷線程的運行,最后將線程標(biāo)記為被中斷的狀態(tài)。如下所示。
try {
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
}
這里,發(fā)現(xiàn)變更任務(wù)狀態(tài)使用的是UNSAFE.putOrderedInt()方法,這個方法是個什么鬼呢?點進去看一下,如下所示。
public native void putOrderedInt(Object var1, long var2, int var4);
可以看到,又是一個本地方法,嘿嘿,這里先不管它,后續(xù)文章會詳解這些方法的作用。
接下來,cancel(boolean)方法會進入finally代碼塊,如下所示。
finally {
finishCompletion();
}
可以看到在finallly代碼塊中調(diào)用了finishCompletion()方法,顧名思義,finishCompletion()方法表示結(jié)束任務(wù)的運行,接下來看看它是如何實現(xiàn)的。點到finishCompletion()方法中看一下,如下所示。
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
在finishCompletion()方法中,首先定義一個for循環(huán),循環(huán)終止因子為waiters為null,在循環(huán)中,判斷CAS操作是否成功,如果成功進行if條件中的邏輯。首先,定義一個for自旋循環(huán),在自旋循環(huán)體中,喚醒WaitNode堆棧中的線程,使其運行完成。當(dāng)WaitNode堆棧中的線程運行完成后,通過break退出外層for循環(huán)。接下來調(diào)用done()方法。done()方法又是個什么鬼呢?點進去看一下,如下所示。
protected void done() { }
可以看到,done()方法是一個空的方法體,交由子類來實現(xiàn)具體的業(yè)務(wù)邏輯。
當(dāng)我們的具體業(yè)務(wù)中,需要在取消任務(wù)時,執(zhí)行一些額外的業(yè)務(wù)邏輯,可以在子類中覆寫done()方法的實現(xiàn)。
(5)get()方法
繼續(xù)向下看FutureTask類的代碼,F(xiàn)utureTask類中實現(xiàn)了兩個get()方法,如下所示。
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
沒參數(shù)的get()方法為當(dāng)任務(wù)未運行完成時,會阻塞,直到返回任務(wù)結(jié)果。有參數(shù)的get()方法為當(dāng)任務(wù)未運行完成,并且等待時間超出了超時時間,會TimeoutException異常。
兩個get()方法的主要邏輯差不多,一個沒有超時設(shè)置,一個有超時設(shè)置,這里說一下主要邏輯。判斷任務(wù)的當(dāng)前狀態(tài)是否小于或者等于COMPLETING,也就是說,任務(wù)是NEW狀態(tài)或者COMPLETING,調(diào)用awaitDone()方法,看下awaitDone()方法的實現(xiàn),如下所示。
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
接下來,拆解awaitDone()方法。在awaitDone()方法中,最重要的就是for自旋循環(huán),在循環(huán)中首先判斷當(dāng)前線程是否被中斷,如果已經(jīng)被中斷,則調(diào)用removeWaiter()將當(dāng)前線程從堆棧中移除,并且拋出InterruptedException異常,如下所示。
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
接下來,判斷任務(wù)的當(dāng)前狀態(tài)是否完成,如果完成,并且堆棧句柄不為空,則將堆棧中的當(dāng)前線程設(shè)置為空,返回當(dāng)前任務(wù)的狀態(tài),如下所示。
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
當(dāng)任務(wù)的狀態(tài)為COMPLETING時,使當(dāng)前線程讓出CPU資源,如下所示。
else if (s == COMPLETING)
Thread.yield();
如果堆棧為空,則創(chuàng)建堆棧對象,如下所示。
else if (q == null)
q = new WaitNode();
如果queued變量為false,通過CAS操作為queued賦值,如果awaitDone()方法傳遞的timed參數(shù)為true,則計算超時時間,當(dāng)時間已超時,則在堆棧中移除當(dāng)前線程并返回任務(wù)狀態(tài),如下所示。如果未超時,則重置超時時間,如下所示。
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
如果不滿足上述的所有條件,則將當(dāng)前線程設(shè)置為等待狀態(tài),如下所示。
else
LockSupport.park(this);
接下來,回到get()方法中,當(dāng)awaitDone()方法返回結(jié)果,或者任務(wù)的狀態(tài)不滿足條件時,都會調(diào)用report()方法,并將當(dāng)前任務(wù)的狀態(tài)傳遞到report()方法中,并返回結(jié)果,如下所示。
return report(s);
看來,這里還要看下report()方法啊,點進去看下report()方法的實現(xiàn),如下所示。
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
可以看到,report()方法的實現(xiàn)比較簡單,首先,將outcome數(shù)據(jù)賦值給x變量,接下來,主要是判斷接收到的任務(wù)狀態(tài),如果狀態(tài)為NORMAL,則將x強轉(zhuǎn)為泛型類型返回;當(dāng)任務(wù)的狀態(tài)大于或者等于CANCELLED,也就是任務(wù)已經(jīng)取消,則拋出CancellationException異常,其他情況則拋出ExecutionException異常。
至此,get()方法分析完成。注意:一定要理解get()方法的實現(xiàn),因為get()方法是我們使用Future接口和FutureTask類時,使用的比較頻繁的一個方法。
(6)set()方法與setException()方法
繼續(xù)看FutureTask類的代碼,接下來看到的是set()方法與setException()方法,如下所示。
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
通過源碼可以看出,set()方法與setException()方法整體邏輯幾乎一樣,只是在設(shè)置任務(wù)狀態(tài)時一個將狀態(tài)設(shè)置為NORMAL,一個將狀態(tài)設(shè)置為EXCEPTIONAL。
至于finishCompletion()方法,前面已經(jīng)分析過。
(7)run()方法與runAndReset()方法
接下來,就是run()方法了,run()方法的源代碼如下所示。
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
可以這么說,只要使用了Future和FutureTask,就必然會調(diào)用run()方法來運行任務(wù),掌握run()方法的流程是非常有必要的。在run()方法中,如果當(dāng)前狀態(tài)不是NEW,或者CAS操作返回的結(jié)果為false,則直接返回,不再執(zhí)行后續(xù)邏輯,如下所示。
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
接下來,在try代碼塊中,將成員變量callable賦值給一個臨時變量c,判斷臨時變量不等于null,并且任務(wù)狀態(tài)為NEW,則調(diào)用Callable接口的call()方法,并接收結(jié)果數(shù)據(jù)。并將ran變量設(shè)置為true。當(dāng)程序拋出異常時,將接收結(jié)果的變量設(shè)置為null,ran變量設(shè)置為false,并且調(diào)用setException()方法將任務(wù)的狀態(tài)設(shè)置為EXCEPTIONA。接下來,如果ran變量為true,則調(diào)用set()方法,如下所示。
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
}
接下來,程序會進入finally代碼塊中,如下所示。
finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
這里,將runner設(shè)置為null,如果任務(wù)的當(dāng)前狀態(tài)大于或者等于INTERRUPTING,也就是線程被中斷了。則調(diào)用handlePossibleCancellationInterrupt()方法,接下來,看下handlePossibleCancellationInterrupt()方法的實現(xiàn)。
private void handlePossibleCancellationInterrupt(int s) {
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield();
}
可以看到,handlePossibleCancellationInterrupt()方法的實現(xiàn)比較簡單,當(dāng)任務(wù)的狀態(tài)為INTERRUPTING時,使用while()循環(huán),條件為當(dāng)前任務(wù)狀態(tài)為INTERRUPTING,將當(dāng)前線程占用的CPU資源釋放,也就是說,當(dāng)任務(wù)運行完成后,釋放線程所占用的資源。
runAndReset()方法的邏輯與run()差不多,只是runAndReset()方法會在finally代碼塊中將任務(wù)狀態(tài)重置為NEW。runAndReset()方法的源代碼如下所示,就不重復(fù)說了。
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
(8)removeWaiter()方法
removeWaiter()方法中主要是使用自旋循環(huán)的方式來移除WaitNode中的線程,比較簡單,如下所示。
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}
最后,在FutureTask類的最后,有如下代碼。
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = FutureTask.class;
stateOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiters"));
} catch (Exception e) {
throw new Error(e);
}
}
關(guān)于這些代碼的作用,會在后續(xù)深度解析CAS文章中詳細說明,這里就不再探討。
至此,關(guān)于Future接口和FutureTask類的源碼就分析完了。
