<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          JDK 源碼解析:深入淺出異步任務(wù) FutureTask

          共 7403字,需瀏覽 15分鐘

           ·

          2021-05-07 04:41

          作者:Sumkor

          來源:SegmentFault 思否社區(qū)


          前言



          在 Java 中,Runnable 接口表示一個沒有返回結(jié)果的任務(wù),而 Callable 接口表示具有返回結(jié)果的任務(wù)。


          在并發(fā)編程中,異步執(zhí)行任務(wù),再獲取任務(wù)結(jié)果,可以提高系統(tǒng)的吞吐量。Future 接口應運而生,它表示異步任務(wù)的執(zhí)行結(jié)果,并提供了檢查任務(wù)是否執(zhí)行完、取消任務(wù)、獲取任務(wù)執(zhí)行結(jié)果等功能。FutureTask 是 Future 接口的基本實現(xiàn),常與線程池實現(xiàn)類 ThreadPoolExecutor 配合使用。

          | 本文基于 jdk1.8.0_91


          1. 繼承體系




          RunnableFuture 接口同時實現(xiàn)了 Runnable 接口和 Future 接口,是一種冗余設(shè)計。

          java.util.concurrent.RunnableFuture

          /** * A {@link Future} that is {@link Runnable}. Successful execution of * the {@code run} method causes completion of the {@code Future} * and allows access to its results. *  * @see FutureTask * @see Executor * @since 1.6 * @author Doug Lea * @param <V> The result type returned by this Future's {@code get} method */public interface RunnableFuture<V> extends Runnable, Future<V> {    /**     * Sets this Future to the result of its computation     * unless it has been cancelled.     */    void run();}


          FutureTask 是一個可取消的異步任務(wù),是對 Future 接口的基本實現(xiàn),具有以下功能:

          • 啟動或中斷的任務(wù)的執(zhí)行;

          • 判斷任務(wù)是否執(zhí)行完成;

          • 獲取任務(wù)執(zhí)行完成后的結(jié)果。


          同時,F(xiàn)utureTask 可以用于包裝 Callable 或 Runnable 對象。
          由于它實現(xiàn)了 Runnable 接口,可以提交給 Executor 執(zhí)行。

          /** * A cancellable asynchronous computation.  * * @since 1.5 * @author Doug Lea * @param <V> The result type returned by this FutureTask's {@code get} methods */public class FutureTask<V> implements RunnableFuture<V>

          java.util.concurrent.Executor

          /** * An object that executes submitted {@link Runnable} tasks. * * @since 1.5 * @author Doug Lea */public interface Executor {

          void execute(Runnable command);}

          2. 屬性



          java.util.concurrent.FutureTask

          // The run state of this task, initially NEW.// 任務(wù)的執(zhí)行狀態(tài),初始為 NEW。private volatile int state;

          /** The underlying callable; nulled out after running */// 需要執(zhí)行的任務(wù),任務(wù)執(zhí)行完后為空private Callable<V> callable;

          /** The result to return or exception to throw from get() */// 任務(wù)的執(zhí)行結(jié)果,或者任務(wù)拋出的異常private Object outcome; // non-volatile, protected by state reads/writes

          /** The thread running the callable; CASed during run() */// 執(zhí)行任務(wù)的線程private volatile Thread runner;

          /** Treiber stack of waiting threads */// 指向棧頂?shù)闹羔槪瑮=Y(jié)構(gòu)用于存儲等待任務(wù)執(zhí)行結(jié)果的線程private volatile WaitNode waiters;

          其中 state、runner、waiters 三個屬性在并發(fā)時存在爭用,采用 CAS 維護其準確性。

          // Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;private static final long stateOffset;private static final long runnerOffset;private static final long waitersOffset;static {    try {        UNSAFE = sun.misc.Unsafe.getUnsafe();        Class<?> k = FutureTask.class;        stateOffset = UNSAFE.objectFieldOffset            (k.getDeclaredField("state"));        runnerOffset = UNSAFE.objectFieldOffset            (k.getDeclaredField("runner"));        waitersOffset = UNSAFE.objectFieldOffset            (k.getDeclaredField("waiters"));    } catch (Exception e) {        throw new Error(e);    }}


          2.1 狀態(tài)定義



          /** * The run state of this task, initially NEW.  The run state * transitions to a terminal state only in methods set, * setException, and cancel.  During completion, state may take on * transient values of COMPLETING (while outcome is being set) or * INTERRUPTING (only while interrupting the runner to satisfy a * cancel(true)). Transitions from these intermediate to final * states use cheaper ordered/lazy writes because values are unique * and cannot be further modified. * * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */private volatile int state;private static final int NEW          = 0;private static final int COMPLETING   = 1;private static final int NORMAL       = 2;private static final int EXCEPTIONAL  = 3;private static final int CANCELLED    = 4;private static final int INTERRUPTING = 5;private static final int INTERRUPTED  = 6;

          FutureTask 中使用 state 代表任務(wù)在運行過程中的狀態(tài)。隨著任務(wù)的執(zhí)行,狀態(tài)將不斷地進行轉(zhuǎn)變。


          狀態(tài)的說明:

          • NEW:新建狀態(tài),任務(wù)都從該狀態(tài)開始。

          • COMPLETING:任務(wù)正在執(zhí)行中。

          • NORMAL:任務(wù)正常執(zhí)行完成。

          • EXCEPTIONAL:任務(wù)執(zhí)行過程中拋出了異常。

          • CANCELLED:任務(wù)被取消(不響應中斷)。

          • INTERRUPTING:任務(wù)正在被中斷。

          • INTERRUPTED:任務(wù)已經(jīng)中斷。


          狀態(tài)轉(zhuǎn)移過程:

          NEW -> COMPLETING -> NORMALNEW -> COMPLETING -> EXCEPTIONALNEW -> CANCELLEDNEW -> INTERRUPTING -> INTERRUPTED


          狀態(tài)的分類:

          • 任務(wù)的初始狀態(tài):NEW

          • 任務(wù)的中間狀態(tài):COMPLETING、INTERRUPTING

          • 任務(wù)的終止狀態(tài):NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTED


          2.1 狀態(tài)使用



          FutureTask 中判斷任務(wù)是否已取消、是否已完成,是根據(jù) state 來判斷的。

          public boolean isCancelled() {    return state >= CANCELLED; // CANCELLED、INTERRUPTING、INTERRUPTED}

          public boolean isDone() { return state != NEW;}

          可以看到:

          • 被取消或被中斷的任務(wù)(CANCELLED、INTERRUPTING、INTERRUPTED),都視為已取消。

          • 當任務(wù)離開了初始狀態(tài) NEW,就視為任務(wù)已結(jié)束。任務(wù)的中間態(tài)很短暫,并不代表任務(wù)正在執(zhí)行,而是任務(wù)已經(jīng)執(zhí)行完了,正在設(shè)置最終的返回結(jié)果。


          根據(jù)狀態(tài)值,F(xiàn)utureTask 可以保證已經(jīng)完成的任務(wù)不會被再次運行或者被取消。

          中間狀態(tài)雖然是一個瞬時狀態(tài),在 FutureTask 中用于線程間的通訊。


          例如:

          • 在 FutureTask#run 中檢測到狀態(tài) >= INTERRUPTING,說明其他線程發(fā)起了取消操作,當前線程需等待對方完成中斷。

          • 在 FutureTask#get 中檢測到狀態(tài) <= COMPLETING,說明執(zhí)行任務(wù)的線程尚未處理完,當前線程需等待對方完成任務(wù)。


          2.2 棧(Treiber stack)



          /** Treiber stack of waiting threads */private volatile WaitNode waiters; // 棧頂指針

          /** * Simple linked list nodes to record waiting threads in a Treiber * stack. See other classes such as Phaser and SynchronousQueue * for more detailed explanation. */static final class WaitNode { volatile Thread thread; // 等待任務(wù)執(zhí)行結(jié)果的線程 volatile WaitNode next; // 棧的下一個節(jié)點 WaitNode() { thread = Thread.currentThread(); }}

          FutureTask 使用鏈表來構(gòu)造棧(Treiber stack,使用 CAS 保證棧操作的線程安全,參考 java.util.concurrent.SynchronousQueue.TransferStack)。
          其中 waiters 是鏈表的頭節(jié)點,代表棧頂?shù)闹羔槨?/span>


          棧的作用:
          FutureTask 實現(xiàn)了 Future 接口,如果獲取結(jié)果時,任務(wù)還沒有執(zhí)行完畢,那么獲取結(jié)果的線程就在棧中掛起,直到任務(wù)執(zhí)行完畢被喚醒。


          3. 構(gòu)造函數(shù)



          賦值任務(wù),設(shè)置任務(wù)的初始狀態(tài)。

          /** * Creates a {@code FutureTask} that will, upon running, execute the * given {@code Callable}. * * @param  callable the callable task * @throws NullPointerException if the callable is null */public FutureTask(Callable<V> callable) {    if (callable == null)        throw new NullPointerException();    this.callable = callable;    this.state = NEW;       // ensure visibility of callable}

          /** * Creates a {@code FutureTask} that will, upon running, execute the * given {@code Runnable}, and arrange that {@code get} will return the * given result on successful completion. * * @param runnable the runnable task * @param result the result to return on successful completion. If * you don't need a particular result, consider using * constructions of the form: * {@code Future<?> f = new FutureTask<Void>(runnable, null)} * @throws NullPointerException if the runnable is null */public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable}

          值得注意的兩個地方:

          • FutureTask 創(chuàng)建的時候,狀態(tài)為 NEW。

          • 由于 FutureTask 使用 Callable 表示任務(wù),需用 Executors#callable 方法將 Runnable 轉(zhuǎn)換為 Callable。

          測試:

          @Testpublic void executors() throws Exception {    Callable<String> callable = Executors.callable(new Runnable() {        @Override        public void run() {            System.out.println("run!");        }    }, "haha");    String call = callable.call();    System.out.println("call = " + call);}

          執(zhí)行結(jié)果:

          run!call = haha

          更多原文內(nèi)容提前看:

          • Runnable 實現(xiàn)

            • FutureTask#run

            • FutureTask#set

            • FutureTask#setException

            • FutureTask#finishCompletion

            • FutureTask#handlePossibleCancellationInterrupt

            • FutureTask#runAndReset

          • Future 實現(xiàn)

            • Future#get

            • FutureTask#awaitDone

            • FutureTask#report

            • Future#get(timeout, unit)

            • Future#cancel

          • 實例

          • 總結(jié)




          點擊左下角閱讀原文,到 SegmentFault 思否社區(qū) 和文章作者展開更多互動和交流,掃描下方”二維碼“或在“公眾號后臺回復“ 入群 ”即可加入我們的技術(shù)交流群,收獲更多的技術(shù)文章~

          - END -

          瀏覽 15
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  毛片公开视频 | 成人性交免费看 | 熟女操逼视频 | 日逼黄片视频 | 久久伊人精品 |