JDK 源碼解析:深入淺出異步任務(wù) FutureTask
作者:Sumkor
來源:SegmentFault 思否社區(qū)
前言
在 Java 中,Runnable 接口表示一個沒有返回結(jié)果的任務(wù),而 Callable 接口表示具有返回結(jié)果的任務(wù)。
在并發(fā)編程中,異步執(zhí)行任務(wù),再獲取任務(wù)結(jié)果,可以提高系統(tǒng)的吞吐量。Future 接口應運而生,它表示異步任務(wù)的執(zhí)行結(jié)果,并提供了檢查任務(wù)是否執(zhí)行完、取消任務(wù)、獲取任務(wù)執(zhí)行結(jié)果等功能。FutureTask 是 Future 接口的基本實現(xiàn),常與線程池實現(xiàn)類 ThreadPoolExecutor 配合使用。
| 本文基于 jdk1.8.0_91
1. 繼承體系

RunnableFuture 接口同時實現(xiàn)了 Runnable 接口和 Future 接口,是一種冗余設(shè)計。
java.util.concurrent.RunnableFuture
/*** A {@link Future} that is {@link Runnable}. Successful execution of* the {@code run} method causes completion of the {@code Future}* and allows access to its results.** @see FutureTask* @see Executor* @since 1.6* @author Doug Lea* @param <V> The result type returned by this Future's {@code get} method*/public interface RunnableFuture<V> extends Runnable, Future<V> {/*** Sets this Future to the result of its computation* unless it has been cancelled.*/void run();}
FutureTask 是一個可取消的異步任務(wù),是對 Future 接口的基本實現(xiàn),具有以下功能:
啟動或中斷的任務(wù)的執(zhí)行;
判斷任務(wù)是否執(zhí)行完成;
獲取任務(wù)執(zhí)行完成后的結(jié)果。
同時,F(xiàn)utureTask 可以用于包裝 Callable 或 Runnable 對象。
由于它實現(xiàn)了 Runnable 接口,可以提交給 Executor 執(zhí)行。
/*** A cancellable asynchronous computation.** @since 1.5* @author Doug Lea* @param <V> The result type returned by this FutureTask's {@code get} methods*/public class FutureTask<V> implements RunnableFuture<V>
java.util.concurrent.Executor
/*** An object that executes submitted {@link Runnable} tasks.** @since 1.5* @author Doug Lea*/public interface Executor {void execute(Runnable command);}
2. 屬性
java.util.concurrent.FutureTask
// The run state of this task, initially NEW.// 任務(wù)的執(zhí)行狀態(tài),初始為 NEW。private volatile int state;/** The underlying callable; nulled out after running */// 需要執(zhí)行的任務(wù),任務(wù)執(zhí)行完后為空private Callable<V> callable;/** The result to return or exception to throw from get() */// 任務(wù)的執(zhí)行結(jié)果,或者任務(wù)拋出的異常private Object outcome; // non-volatile, protected by state reads/writes/** The thread running the callable; CASed during run() */// 執(zhí)行任務(wù)的線程private volatile Thread runner;/** Treiber stack of waiting threads */// 指向棧頂?shù)闹羔槪瑮=Y(jié)構(gòu)用于存儲等待任務(wù)執(zhí)行結(jié)果的線程private volatile WaitNode waiters;
其中 state、runner、waiters 三個屬性在并發(fā)時存在爭用,采用 CAS 維護其準確性。
// Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;private static final long stateOffset;private static final long runnerOffset;private static final long waitersOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = FutureTask.class;stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("state"));runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner"));waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));} catch (Exception e) {throw new Error(e);}}
2.1 狀態(tài)定義
/*** The run state of this task, initially NEW. The run state* transitions to a terminal state only in methods set,* setException, and cancel. During completion, state may take on* transient values of COMPLETING (while outcome is being set) or* INTERRUPTING (only while interrupting the runner to satisfy a* cancel(true)). Transitions from these intermediate to final* states use cheaper ordered/lazy writes because values are unique* and cannot be further modified.** Possible state transitions:* NEW -> COMPLETING -> NORMAL* NEW -> COMPLETING -> EXCEPTIONAL* NEW -> CANCELLED* NEW -> INTERRUPTING -> INTERRUPTED*/private volatile int state;private static final int NEW = 0;private static final int COMPLETING = 1;private static final int NORMAL = 2;private static final int EXCEPTIONAL = 3;private static final int CANCELLED = 4;private static final int INTERRUPTING = 5;private static final int INTERRUPTED = 6;
FutureTask 中使用 state 代表任務(wù)在運行過程中的狀態(tài)。隨著任務(wù)的執(zhí)行,狀態(tài)將不斷地進行轉(zhuǎn)變。
狀態(tài)的說明:
NEW:新建狀態(tài),任務(wù)都從該狀態(tài)開始。
COMPLETING:任務(wù)正在執(zhí)行中。
NORMAL:任務(wù)正常執(zhí)行完成。
EXCEPTIONAL:任務(wù)執(zhí)行過程中拋出了異常。
CANCELLED:任務(wù)被取消(不響應中斷)。
INTERRUPTING:任務(wù)正在被中斷。
INTERRUPTED:任務(wù)已經(jīng)中斷。
狀態(tài)轉(zhuǎn)移過程:
NEW -> COMPLETING -> NORMALNEW -> COMPLETING -> EXCEPTIONALNEW -> CANCELLEDNEW -> INTERRUPTING -> INTERRUPTED
狀態(tài)的分類:
任務(wù)的初始狀態(tài):NEW
任務(wù)的中間狀態(tài):COMPLETING、INTERRUPTING
任務(wù)的終止狀態(tài):NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTED
2.1 狀態(tài)使用
FutureTask 中判斷任務(wù)是否已取消、是否已完成,是根據(jù) state 來判斷的。
public boolean isCancelled() {return state >= CANCELLED; // CANCELLED、INTERRUPTING、INTERRUPTED}public boolean isDone() {return state != NEW;}
可以看到:
被取消或被中斷的任務(wù)(CANCELLED、INTERRUPTING、INTERRUPTED),都視為已取消。
當任務(wù)離開了初始狀態(tài) NEW,就視為任務(wù)已結(jié)束。任務(wù)的中間態(tài)很短暫,并不代表任務(wù)正在執(zhí)行,而是任務(wù)已經(jīng)執(zhí)行完了,正在設(shè)置最終的返回結(jié)果。
根據(jù)狀態(tài)值,F(xiàn)utureTask 可以保證已經(jīng)完成的任務(wù)不會被再次運行或者被取消。
中間狀態(tài)雖然是一個瞬時狀態(tài),在 FutureTask 中用于線程間的通訊。
例如:
在 FutureTask#run 中檢測到狀態(tài) >= INTERRUPTING,說明其他線程發(fā)起了取消操作,當前線程需等待對方完成中斷。
在 FutureTask#get 中檢測到狀態(tài) <= COMPLETING,說明執(zhí)行任務(wù)的線程尚未處理完,當前線程需等待對方完成任務(wù)。
2.2 棧(Treiber stack)
/** Treiber stack of waiting threads */private volatile WaitNode waiters; // 棧頂指針/*** Simple linked list nodes to record waiting threads in a Treiber* stack. See other classes such as Phaser and SynchronousQueue* for more detailed explanation.*/static final class WaitNode {volatile Thread thread; // 等待任務(wù)執(zhí)行結(jié)果的線程volatile WaitNode next; // 棧的下一個節(jié)點WaitNode() { thread = Thread.currentThread(); }}
FutureTask 使用鏈表來構(gòu)造棧(Treiber stack,使用 CAS 保證棧操作的線程安全,參考 java.util.concurrent.SynchronousQueue.TransferStack)。
其中 waiters 是鏈表的頭節(jié)點,代表棧頂?shù)闹羔槨?/span>
棧的作用:
FutureTask 實現(xiàn)了 Future 接口,如果獲取結(jié)果時,任務(wù)還沒有執(zhí)行完畢,那么獲取結(jié)果的線程就在棧中掛起,直到任務(wù)執(zhí)行完畢被喚醒。
3. 構(gòu)造函數(shù)
賦值任務(wù),設(shè)置任務(wù)的初始狀態(tài)。
/*** Creates a {@code FutureTask} that will, upon running, execute the* given {@code Callable}.** @param callable the callable task* @throws NullPointerException if the callable is null*/public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW; // ensure visibility of callable}/*** Creates a {@code FutureTask} that will, upon running, execute the* given {@code Runnable}, and arrange that {@code get} will return the* given result on successful completion.** @param runnable the runnable task* @param result the result to return on successful completion. If* you don't need a particular result, consider using* constructions of the form:* {@code Future<?> f = new FutureTask<Void>(runnable, null)}* @throws NullPointerException if the runnable is null*/public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW; // ensure visibility of callable}
值得注意的兩個地方:
FutureTask 創(chuàng)建的時候,狀態(tài)為 NEW。
由于 FutureTask 使用 Callable 表示任務(wù),需用 Executors#callable 方法將 Runnable 轉(zhuǎn)換為 Callable。
測試:
public void executors() throws Exception {Callable<String> callable = Executors.callable(new Runnable() {public void run() {System.out.println("run!");}}, "haha");String call = callable.call();System.out.println("call = " + call);}
執(zhí)行結(jié)果:
run!call = haha
更多原文內(nèi)容提前看:
Runnable 實現(xiàn)
FutureTask#run
FutureTask#set
FutureTask#setException
FutureTask#finishCompletion
FutureTask#handlePossibleCancellationInterrupt
FutureTask#runAndReset
Future 實現(xiàn)
Future#get
FutureTask#awaitDone
FutureTask#report
Future#get(timeout, unit)
Future#cancel
實例
總結(jié)

