<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          剛研究完Callable和Future,各位隨便問??!

          共 44765字,需瀏覽 90分鐘

           ·

          2021-04-29 03:59


          在Java的多線程編程中,除了Thread類和Runnable接口外,不得不說的就是Callable接口Future接口了。使用繼承Thread類或者實現(xiàn)Runnable接口的線程,無法返回最終的執(zhí)行結(jié)果數(shù)據(jù),只能等待線程執(zhí)行完成。此時,如果想要獲取線程執(zhí)行后的返回結(jié)果,那么,Callable和Future就派上用場了。

          Callable接口

          1.Callable接口介紹

          Callable接口是JDK1.5新增的泛型接口,在JDK1.8中,被聲明為函數(shù)式接口,如下所示。

          @FunctionalInterface
          public interface Callable<V{
              call() throws Exception;
          }

          在JDK 1.8中只聲明有一個方法的接口為函數(shù)式接口,函數(shù)式接口可以使用@FunctionalInterface注解修飾,也可以不使用@FunctionalInterface注解修飾。只要一個接口中只包含有一個方法,那么,這個接口就是函數(shù)式接口。

          在JDK中,實現(xiàn)Callable接口的子類如下圖所示。

          默認的子類層級關(guān)系圖看不清,這里,可以通過IDEA右鍵Callable接口,選擇“Layout”來指定Callable接口的實現(xiàn)類圖的不同結(jié)構(gòu),如下所示。

          這里,可以選擇“Organic Layout”選項,選擇后的Callable接口的子類的結(jié)構(gòu)如下圖所示。

          在實現(xiàn)Callable接口的子類中,有幾個比較重要的類,如下圖所示。

          分別是:Executors類中的靜態(tài)內(nèi)部類:PrivilegedCallable、PrivilegedCallableUsingCurrentClassLoader、RunnableAdapter和Task類下的TaskCallable。

          2.實現(xiàn)Callable接口的重要類分析

          接下來,分析的類主要有:PrivilegedCallable、PrivilegedCallableUsingCurrentClassLoader、RunnableAdapter和Task類下的TaskCallable。雖然這些類在實際工作中很少被直接用到,但是作為一名合格的開發(fā)工程師,設(shè)置是禿頂?shù)馁Y深專家來說,了解并掌握這些類的實現(xiàn)有助你進一步理解Callable接口,并提高專業(yè)技能(頭發(fā)再掉一批,哇哈哈哈。。。)。

          • PrivilegedCallable

          PrivilegedCallable類是Callable接口的一個特殊實現(xiàn)類,它表明Callable對象有某種特權(quán)來訪問系統(tǒng)的某種資源,PrivilegedCallable類的源代碼如下所示。

          /**
           * A callable that runs under established access control settings
           */

          static final class PrivilegedCallable<Timplements Callable<T{
           private final Callable<T> task;
           private final AccessControlContext acc;

           PrivilegedCallable(Callable<T> task) {
            this.task = task;
            this.acc = AccessController.getContext();
           }

           public T call() throws Exception {
            try {
             return AccessController.doPrivileged(
              new PrivilegedExceptionAction<T>() {
               public T run() throws Exception {
                return task.call();
               }
              }, acc);
            } catch (PrivilegedActionException e) {
             throw e.getException();
            }
           }
          }

          從PrivilegedCallable類的源代碼來看,可以將PrivilegedCallable看成是對Callable接口的封裝,并且這個類也繼承了Callable接口。

          在PrivilegedCallable類中有兩個成員變量,分別是Callable接口的實例對象和AccessControlContext類的實例對象,如下所示。

          private final Callable<T> task;
          private final AccessControlContext acc;

          其中,AccessControlContext類可以理解為一個具有系統(tǒng)資源訪問決策的上下文類,通過這個類可以訪問系統(tǒng)的特定資源。通過類的構(gòu)造方法可以看出,在實例化AccessControlContext類的對象時,只需要傳遞Callable接口子類的對象即可,如下所示。

          PrivilegedCallable(Callable<T> task) {
           this.task = task;
           this.acc = AccessController.getContext();
          }

          AccessControlContext類的對象是通過AccessController類的getContext()方法獲取的,這里,查看AccessController類的getContext()方法,如下所示。

          public static AccessControlContext getContext(){
           AccessControlContext acc = getStackAccessControlContext();
           if (acc == null) {
            return new AccessControlContext(nulltrue);
           } else {
            return acc.optimize();
           }
          }

          通過AccessController的getContext()方法可以看出,首先通過getStackAccessControlContext()方法來獲取AccessControlContext對象實例。如果獲取的AccessControlContext對象實例為空,則通過調(diào)用AccessControlContext類的構(gòu)造方法實例化,否則,調(diào)用AccessControlContext對象實例的optimize()方法返回AccessControlContext對象實例。

          這里,我們先看下getStackAccessControlContext()方法是個什么鬼。

          private static native AccessControlContext getStackAccessControlContext();

          原來是個本地方法,方法的字面意思就是獲取能夠訪問系統(tǒng)棧的決策上下文對象。

          接下來,我們回到PrivilegedCallable類的call()方法,如下所示。

          public T call() throws Exception {
           try {
            return AccessController.doPrivileged(
             new PrivilegedExceptionAction<T>() {
              public T run() throws Exception {
               return task.call();
              }
             }, acc);
           } catch (PrivilegedActionException e) {
            throw e.getException();
           }
          }

          通過調(diào)用AccessController.doPrivileged()方法,傳遞PrivilegedExceptionAction。接口對象和AccessControlContext對象,并最終返回泛型的實例對象。

          首先,看下AccessController.doPrivileged()方法,如下所示。

          @CallerSensitive
          public static native <T> T
              doPrivileged(PrivilegedExceptionAction<T> action,
                           AccessControlContext context)

              throws PrivilegedActionException
          ;

          可以看到,又是一個本地方法。也就是說,最終的執(zhí)行情況是將PrivilegedExceptionAction接口對象和AccessControlContext對象實例傳遞給這個本地方法執(zhí)行。并且在PrivilegedExceptionAction接口對象的run()方法中調(diào)用Callable接口的call()方法來執(zhí)行最終的業(yè)務(wù)邏輯,并且返回泛型對象。

          • PrivilegedCallableUsingCurrentClassLoader

          此類表示為在已經(jīng)建立的特定訪問控制和當(dāng)前的類加載器下運行的Callable類,源代碼如下所示。

          /**
           * A callable that runs under established access control settings and
           * current ClassLoader
           */

          static final class PrivilegedCallableUsingCurrentClassLoader<Timplements Callable<T{
           private final Callable<T> task;
           private final AccessControlContext acc;
           private final ClassLoader ccl;

           PrivilegedCallableUsingCurrentClassLoader(Callable<T> task) {
            SecurityManager sm = System.getSecurityManager();
            if (sm != null) {
             sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);
             sm.checkPermission(new RuntimePermission("setContextClassLoader"));
            }
            this.task = task;
            this.acc = AccessController.getContext();
            this.ccl = Thread.currentThread().getContextClassLoader();
           }

           public T call() throws Exception {
            try {
             return AccessController.doPrivileged(
              new PrivilegedExceptionAction<T>() {
               public T run() throws Exception {
                Thread t = Thread.currentThread();
                ClassLoader cl = t.getContextClassLoader();
                if (ccl == cl) {
                 return task.call();
                } else {
                 t.setContextClassLoader(ccl);
                 try {
                  return task.call();
                 } finally {
                  t.setContextClassLoader(cl);
                 }
                }
               }
              }, acc);
            } catch (PrivilegedActionException e) {
             throw e.getException();
            }
           }
          }

          這個類理解起來比較簡單,首先,在類中定義了三個成員變量,如下所示。

          private final Callable<T> task;
          private final AccessControlContext acc;
          private final ClassLoader ccl;

          接下來,通過構(gòu)造方法注入Callable對象,在構(gòu)造方法中,首先獲取系統(tǒng)安全管理器對象實例,通過系統(tǒng)安全管理器對象實例檢查是否具有獲取ClassLoader和設(shè)置ContextClassLoader的權(quán)限。并在構(gòu)造方法中為三個成員變量賦值,如下所示。

          PrivilegedCallableUsingCurrentClassLoader(Callable<T> task) {
           SecurityManager sm = System.getSecurityManager();
           if (sm != null) {
            sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);
            sm.checkPermission(new RuntimePermission("setContextClassLoader"));
           }
           this.task = task;
           this.acc = AccessController.getContext();
           this.ccl = Thread.currentThread().getContextClassLoader();
          }

          接下來,通過調(diào)用call()方法來執(zhí)行具體的業(yè)務(wù)邏輯,如下所示。

          public T call() throws Exception {
           try {
            return AccessController.doPrivileged(
             new PrivilegedExceptionAction<T>() {
              public T run() throws Exception {
               Thread t = Thread.currentThread();
               ClassLoader cl = t.getContextClassLoader();
               if (ccl == cl) {
                return task.call();
               } else {
                t.setContextClassLoader(ccl);
                try {
                 return task.call();
                } finally {
                 t.setContextClassLoader(cl);
                }
               }
              }
             }, acc);
           } catch (PrivilegedActionException e) {
            throw e.getException();
           }
          }

          在call()方法中同樣是通過調(diào)用AccessController類的本地方法doPrivileged,傳遞PrivilegedExceptionAction接口的實例對象和AccessControlContext類的對象實例。

          具體執(zhí)行邏輯為:在PrivilegedExceptionAction對象的run()方法中獲取當(dāng)前線程的ContextClassLoader對象,如果在構(gòu)造方法中獲取的ClassLoader對象與此處的ContextClassLoader對象是同一個對象(不止對象實例相同,而且內(nèi)存地址也相同),則直接調(diào)用Callable對象的call()方法返回結(jié)果。否則,將PrivilegedExceptionAction對象的run()方法中的當(dāng)前線程的ContextClassLoader設(shè)置為在構(gòu)造方法中獲取的類加載器對象,接下來,再調(diào)用Callable對象的call()方法返回結(jié)果。最終將當(dāng)前線程的ContextClassLoader重置為之前的ContextClassLoader。

          • RunnableAdapter

          RunnableAdapter類比較簡單,給定運行的任務(wù)和結(jié)果,運行給定的任務(wù)并返回給定的結(jié)果,源代碼如下所示。

          /**
           * A callable that runs given task and returns given result
           */

          static final class RunnableAdapter<Timplements Callable<T{
           final Runnable task;
           final T result;
           RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
           }
           public T call() {
            task.run();
            return result;
           }
          }
          • TaskCallable

          TaskCallable類是javafx.concurrent.Task類的靜態(tài)內(nèi)部類,TaskCallable類主要是實現(xiàn)了Callable接口并且被定義為FutureTask的類,并且在這個類中允許我們攔截call()方法來更新task任務(wù)的狀態(tài)。源代碼如下所示。

          private static final class TaskCallable<Vimplements Callable<V{

           private Task<V> task;
           private TaskCallable() { }

           @Override 
           public V call() throws Exception {
            task.started = true;
            task.runLater(() -> {
             task.setState(State.SCHEDULED);
             task.setState(State.RUNNING);
            });
            try {
             final V result = task.call();
             if (!task.isCancelled()) {
              task.runLater(() -> {
               task.updateValue(result);
               task.setState(State.SUCCEEDED);
              });
              return result;
             } else {
              return null;
             }
            } catch (final Throwable th) {
             task.runLater(() -> {
              task._setException(th);
              task.setState(State.FAILED);
             });
             if (th instanceof Exception) {
              throw (Exception) th;
             } else {
              throw new Exception(th);
             }
            }
           }
          }

          從TaskCallable類的源代碼可以看出,只定義了一個Task類型的成員變量。下面主要分析TaskCallable類的call()方法。

          當(dāng)程序的執(zhí)行進入到call()方法時,首先將task對象的started屬性設(shè)置為true,表示任務(wù)已經(jīng)開始,并且將任務(wù)的狀態(tài)依次設(shè)置為State.SCHEDULED和State.RUNNING,依次觸發(fā)任務(wù)的調(diào)度事件和運行事件。如下所示。

          task.started = true;
          task.runLater(() -> {
           task.setState(State.SCHEDULED);
           task.setState(State.RUNNING);
          });

          接下來,在try代碼塊中執(zhí)行Task對象的call()方法,返回泛型對象。如果任務(wù)沒有被取消,則更新任務(wù)的緩存,將調(diào)用call()方法返回的泛型對象綁定到Task對象中的ObjectProperty對象中,其中,ObjectProperty在Task類中的定義如下。

          private final ObjectProperty<V> value = new SimpleObjectProperty<>(this"value");

          接下來,將任務(wù)的狀態(tài)設(shè)置為成功狀態(tài)。如下所示。

          try {
           final V result = task.call();
           if (!task.isCancelled()) {
            task.runLater(() -> {
             task.updateValue(result);
             task.setState(State.SUCCEEDED);
            });
            return result;
           } else {
            return null;
           }
          }

          如果程序拋出了異?;蛘咤e誤,會進入catch()代碼塊,設(shè)置Task對象的Exception信息并將狀態(tài)設(shè)置為State.FAILED,也就是將任務(wù)標(biāo)記為失敗。接下來,判斷異?;蝈e誤的類型,如果是Exception類型的異常,則直接強轉(zhuǎn)為Exception類型的異常并拋出。否則,將異?;蛘咤e誤封裝為Exception對象并拋出,如下所示。

          catch (final Throwable th) {
           task.runLater(() -> {
            task._setException(th);
            task.setState(State.FAILED);
           });
           if (th instanceof Exception) {
            throw (Exception) th;
           } else {
            throw new Exception(th);
           }
          }

          兩種異步模型與深度解析Future接口

          兩種異步模型

          在Java的并發(fā)編程中,大體上會分為兩種異步編程模型,一類是直接以異步的形式來并行運行其他的任務(wù),不需要返回任務(wù)的結(jié)果數(shù)據(jù)。一類是以異步的形式運行其他任務(wù),需要返回結(jié)果。

          1.無返回結(jié)果的異步模型

          無返回結(jié)果的異步任務(wù),可以直接將任務(wù)丟進線程或線程池中運行,此時,無法直接獲得任務(wù)的執(zhí)行結(jié)果數(shù)據(jù),一種方式是可以使用回調(diào)方法來獲取任務(wù)的運行結(jié)果。

          具體的方案是:定義一個回調(diào)接口,并在接口中定義接收任務(wù)結(jié)果數(shù)據(jù)的方法,具體邏輯在回調(diào)接口的實現(xiàn)類中完成。將回調(diào)接口與任務(wù)參數(shù)一同放進線程或線程池中運行,任務(wù)運行后調(diào)用接口方法,執(zhí)行回調(diào)接口實現(xiàn)類中的邏輯來處理結(jié)果數(shù)據(jù)。這里,給出一個簡單的示例供參考。

          • 定義回調(diào)接口
          package io.binghe.concurrent.lab04;

          /**
           * @author binghe
           * @version 1.0.0
           * @description 定義回調(diào)接口
           */

          public interface TaskCallable<T{
              callable(T t);
          }

          便于接口的通用型,這里為回調(diào)接口定義了泛型。

          • 定義任務(wù)結(jié)果數(shù)據(jù)的封裝類
          package io.binghe.concurrent.lab04;

          import java.io.Serializable;

          /**
           * @author binghe
           * @version 1.0.0
           * @description 任務(wù)執(zhí)行結(jié)果
           */

          public class TaskResult implements Serializable {
              private static final long serialVersionUID = 8678277072402730062L;
              /**
               * 任務(wù)狀態(tài)
               */

              private Integer taskStatus;

              /**
               * 任務(wù)消息
               */

              private String taskMessage;

              /**
               * 任務(wù)結(jié)果數(shù)據(jù)
               */

              private String taskResult;
           
           //省略getter和setter方法
           @Override
              public String toString() {
                  return "TaskResult{" +
                          "taskStatus=" + taskStatus +
                          ", taskMessage='" + taskMessage + '\'' +
                          ", taskResult='" + taskResult + '\'' +
                          '}';
              }
          }
          • 創(chuàng)建回調(diào)接口的實現(xiàn)類

          回調(diào)接口的實現(xiàn)類主要用來對任務(wù)的返回結(jié)果進行相應(yīng)的業(yè)務(wù)處理,這里,為了方便演示,只是將結(jié)果數(shù)據(jù)返回。大家需要根據(jù)具體的業(yè)務(wù)場景來做相應(yīng)的分析和處理。

          package io.binghe.concurrent.lab04;

          /**
           * @author binghe
           * @version 1.0.0
           * @description 回調(diào)函數(shù)的實現(xiàn)類
           */

          public class TaskHandler implements TaskCallable<TaskResult{
              @Override
          public TaskResult callable(TaskResult taskResult) {
          //TODO 拿到結(jié)果數(shù)據(jù)后進一步處理
              System.out.println(taskResult.toString());
                  return taskResult;
              }
          }
          • 創(chuàng)建任務(wù)的執(zhí)行類

          任務(wù)的執(zhí)行類是具體執(zhí)行任務(wù)的類,實現(xiàn)Runnable接口,在此類中定義一個回調(diào)接口類型的成員變量和一個String類型的任務(wù)參數(shù)(模擬任務(wù)的參數(shù)),并在構(gòu)造方法中注入回調(diào)接口和任務(wù)參數(shù)。在run方法中執(zhí)行任務(wù),任務(wù)完成后將任務(wù)的結(jié)果數(shù)據(jù)封裝成TaskResult對象,調(diào)用回調(diào)接口的方法將TaskResult對象傳遞到回調(diào)方法中。

          package io.binghe.concurrent.lab04;
          /**
           * @author binghe
           * @version 1.0.0
           * @description 任務(wù)執(zhí)行類
           */

          public class TaskExecutor implements Runnable{
              private TaskCallable<TaskResult> taskCallable;
              private String taskParameter;

              public TaskExecutor(TaskCallable<TaskResult> taskCallable, String taskParameter){
                  this.taskCallable = taskCallable;
                  this.taskParameter = taskParameter;
              }

              @Override
              public void run() {
                  //TODO 一系列業(yè)務(wù)邏輯,將結(jié)果數(shù)據(jù)封裝成TaskResult對象并返回
                  TaskResult result = new TaskResult();
                  result.setTaskStatus(1);
                  result.setTaskMessage(this.taskParameter);
                  result.setTaskResult("異步回調(diào)成功");
                  taskCallable.callable(result);
              }
          }

          到這里,整個大的框架算是完成了,接下來,就是測試看能否獲取到異步任務(wù)的結(jié)果了。

          • 異步任務(wù)測試類
          package io.binghe.concurrent.lab04;

          /**
           * @author binghe
           * @version 1.0.0
           * @description 測試回調(diào)
           */

          public class TaskCallableTest {
              public static void main(String[] args){
                  TaskCallable<TaskResult> taskCallable = new TaskHandler();
                  TaskExecutor taskExecutor = new TaskExecutor(taskCallable, "測試回調(diào)任務(wù)");
                  new Thread(taskExecutor).start();
              }
          }

          在測試類中,使用Thread類創(chuàng)建一個新的線程,并啟動線程運行任務(wù)。運行程序最終的接口數(shù)據(jù)如下所示。

          TaskResult{taskStatus=1, taskMessage='測試回調(diào)任務(wù)', taskResult='異步回調(diào)成功'}

          大家可以細細品味下這種獲取異步結(jié)果的方式。這里,只是簡單的使用了Thread類來創(chuàng)建并啟動線程,也可以使用線程池的方式實現(xiàn)。大家可自行實現(xiàn)以線程池的方式通過回調(diào)接口獲取異步結(jié)果。

          2.有返回結(jié)果的異步模型

          盡管使用回調(diào)接口能夠獲取異步任務(wù)的結(jié)果,但是這種方式使用起來略顯復(fù)雜。在JDK中提供了可以直接返回異步結(jié)果的處理方案。最常用的就是使用Future接口或者其實現(xiàn)類FutureTask來接收任務(wù)的返回結(jié)果。

          • 使用Future接口獲取異步結(jié)果

          使用Future接口往往配合線程池來獲取異步執(zhí)行結(jié)果,如下所示。

          package io.binghe.concurrent.lab04;

          import java.util.concurrent.*;

          /**
           * @author binghe
           * @version 1.0.0
           * @description 測試Future獲取異步結(jié)果
           */

          public class FutureTest {

              public static void main(String[] args) throws ExecutionException, InterruptedException {
                  ExecutorService executorService = Executors.newSingleThreadExecutor();
                  Future<String> future = executorService.submit(new Callable<String>() {
                      @Override
                      public String call() throws Exception {
                          return "測試Future獲取異步結(jié)果";
                      }
                  });
                  System.out.println(future.get());
                  executorService.shutdown();
              }
          }

          運行結(jié)果如下所示。

          測試Future獲取異步結(jié)果
          • 使用FutureTask類獲取異步結(jié)果

          FutureTask類既可以結(jié)合Thread類使用也可以結(jié)合線程池使用,接下來,就看下這兩種使用方式。

          結(jié)合Thread類的使用示例如下所示。

          package io.binghe.concurrent.lab04;

          import java.util.concurrent.*;

          /**
           * @author binghe
           * @version 1.0.0
           * @description 測試FutureTask獲取異步結(jié)果
           */

          public class FutureTaskTest {

              public static void main(String[] args)throws ExecutionException, InterruptedException{
                  FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
                      @Override
                      public String call() throws Exception {
                          return "測試FutureTask獲取異步結(jié)果";
                      }
                  });
                  new Thread(futureTask).start();
                  System.out.println(futureTask.get());
              }
          }

          運行結(jié)果如下所示。

          測試FutureTask獲取異步結(jié)果

          結(jié)合線程池的使用示例如下。

          package io.binghe.concurrent.lab04;

          import java.util.concurrent.*;

          /**
           * @author binghe
           * @version 1.0.0
           * @description 測試FutureTask獲取異步結(jié)果
           */

          public class FutureTaskTest {

              public static void main(String[] args) throws ExecutionException, InterruptedException {
                  ExecutorService executorService = Executors.newSingleThreadExecutor();
                  FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
                      @Override
                      public String call() throws Exception {
                          return "測試FutureTask獲取異步結(jié)果";
                      }
                  });
                  executorService.execute(futureTask);
                  System.out.println(futureTask.get());
                  executorService.shutdown();
              }
          }

          運行結(jié)果如下所示。

          測試FutureTask獲取異步結(jié)果

          可以看到使用Future接口或者FutureTask類來獲取異步結(jié)果比使用回調(diào)接口獲取異步結(jié)果簡單多了。注意:實現(xiàn)異步的方式很多,這里只是用多線程舉例。

          接下來,就深入分析下Future接口。

          深度解析Future接口

          1.Future接口

          Future是JDK1.5新增的異步編程接口,其源代碼如下所示。

          package java.util.concurrent;

          public interface Future<V{

              boolean cancel(boolean mayInterruptIfRunning);

              boolean isCancelled();

              boolean isDone();

              get() throws InterruptedException, ExecutionException;

              get(long timeout, TimeUnit unit)
                  throws InterruptedException, ExecutionException, TimeoutException
          ;
          }

          可以看到,在Future接口中,總共定義了5個抽象方法。接下來,就分別介紹下這5個方法的含義。

          • cancel(boolean)

          取消任務(wù)的執(zhí)行,接收一個boolean類型的參數(shù),成功取消任務(wù),則返回true,否則返回false。當(dāng)任務(wù)已經(jīng)完成,已經(jīng)結(jié)束或者因其他原因不能取消時,方法會返回false,表示任務(wù)取消失敗。當(dāng)任務(wù)未啟動調(diào)用了此方法,并且結(jié)果返回true(取消成功),則當(dāng)前任務(wù)不再運行。如果任務(wù)已經(jīng)啟動,會根據(jù)當(dāng)前傳遞的boolean類型的參數(shù)來決定是否中斷當(dāng)前運行的線程來取消當(dāng)前運行的任務(wù)。

          • isCancelled()

          判斷任務(wù)在完成之前是否被取消,如果在任務(wù)完成之前被取消,則返回true;否則,返回false。

          這里需要注意一個細節(jié):只有任務(wù)未啟動,或者在完成之前被取消,才會返回true,表示任務(wù)已經(jīng)被成功取消。其他情況都會返回false。

          • isDone()

          判斷任務(wù)是否已經(jīng)完成,如果任務(wù)正常結(jié)束、拋出異常退出、被取消,都會返回true,表示任務(wù)已經(jīng)完成。

          • get()

          當(dāng)任務(wù)完成時,直接返回任務(wù)的結(jié)果數(shù)據(jù);當(dāng)任務(wù)未完成時,等待任務(wù)完成并返回任務(wù)的結(jié)果數(shù)據(jù)。

          • get(long, TimeUnit)

          當(dāng)任務(wù)完成時,直接返回任務(wù)的結(jié)果數(shù)據(jù);當(dāng)任務(wù)未完成時,等待任務(wù)完成,并設(shè)置了超時等待時間。在超時時間內(nèi)任務(wù)完成,則返回結(jié)果;否則,拋出TimeoutException異常。

          2.RunnableFuture接口

          Future接口有一個重要的子接口,那就是RunnableFuture接口,RunnableFuture接口不但繼承了Future接口,而且繼承了java.lang.Runnable接口,其源代碼如下所示。

          package java.util.concurrent;

          public interface RunnableFuture<Vextends RunnableFuture<V{
              void run();
          }

          這里,問一下,RunnableFuture接口中有幾個抽象方法?想好了再說!哈哈哈。。。

          這個接口比較簡單run()方法就是運行任務(wù)時調(diào)用的方法。

          3.FutureTask類

          FutureTask類是RunnableFuture接口的一個非常重要的實現(xiàn)類,它實現(xiàn)了RunnableFuture接口、Future接口和Runnable接口的所有方法。FutureTask類的源代碼比較多,這個就不粘貼了,大家自行到j(luò)ava.util.concurrent下查看。

          (1)FutureTask類中的變量與常量

          在FutureTask類中首先定義了一個狀態(tài)變量state,這個變量使用了volatile關(guān)鍵字修飾,這里,大家只需要知道volatile關(guān)鍵字通過內(nèi)存屏障和禁止重排序優(yōu)化來實現(xiàn)線程安全,后續(xù)會單獨深度分析volatile關(guān)鍵字是如何保證線程安全的。緊接著,定義了幾個任務(wù)運行時的狀態(tài)常量,如下所示。

          private volatile int state;
          private static final int NEW          = 0;
          private static final int COMPLETING   = 1;
          private static final int NORMAL       = 2;
          private static final int EXCEPTIONAL  = 3;
          private static final int CANCELLED    = 4;
          private static final int INTERRUPTING = 5;
          private static final int INTERRUPTED  = 6;

          其中,代碼注釋中給出了幾個可能的狀態(tài)變更流程,如下所示。

          NEW -> COMPLETING -> NORMAL
          NEW -> COMPLETING -> EXCEPTIONAL
          NEW -> CANCELLED
          NEW -> INTERRUPTING -> INTERRUPTED

          接下來,定義了其他幾個成員變量,如下所示。

          private Callable<V> callable;
          private Object outcome; 
          private volatile Thread runner;
          private volatile WaitNode waiters;

          又看到我們所熟悉的Callable接口了,Callable接口那肯定就是用來調(diào)用call()方法執(zhí)行具體任務(wù)了。

          • outcome:Object類型,表示通過get()方法獲取到的結(jié)果數(shù)據(jù)或者異常信息。

          • runner:運行Callable的線程,運行期間會使用CAS保證線程安全,這里大家只需要知道CAS是Java保證線程安全的一種方式,后續(xù)文章中會深度分析CAS如何保證線程安全。

          • waiters:WaitNode類型的變量,表示等待線程的堆棧,在FutureTask的實現(xiàn)中,會通過CAS結(jié)合此堆棧交換任務(wù)的運行狀態(tài)。

          看一下WaitNode類的定義,如下所示。

          static final class WaitNode {
           volatile Thread thread;
           volatile WaitNode next;
           WaitNode() { thread = Thread.currentThread(); }
          }

          可以看到,WaitNode類是FutureTask類的靜態(tài)內(nèi)部類,類中定義了一個Thread成員變量和指向下一個WaitNode節(jié)點的引用。其中通過構(gòu)造方法將thread變量設(shè)置為當(dāng)前線程。

          (2)構(gòu)造方法

          接下來,是FutureTask的兩個構(gòu)造方法,比較簡單,如下所示。

          public FutureTask(Callable<V> callable) {
           if (callable == null)
            throw new NullPointerException();
           this.callable = callable;
           this.state = NEW;
          }

          public FutureTask(Runnable runnable, V result) {
           this.callable = Executors.callable(runnable, result);
           this.state = NEW;
          }

          (3)是否取消與完成方法

          繼續(xù)向下看源碼,看到一個任務(wù)是否取消的方法,和一個任務(wù)是否完成的方法,如下所示。

          public boolean isCancelled() {
           return state >= CANCELLED;
          }

          public boolean isDone() {
           return state != NEW;
          }

          這兩方法中,都是通過判斷任務(wù)的狀態(tài)來判定任務(wù)是否已取消和已完成的。為啥會這樣判斷呢?再次查看FutureTask類中定義的狀態(tài)常量發(fā)現(xiàn),其常量的定義是有規(guī)律的,并不是隨意定義的。其中,大于或者等于CANCELLED的常量為CANCELLED、INTERRUPTING和INTERRUPTED,這三個狀態(tài)均可以表示線程已經(jīng)被取消。當(dāng)狀態(tài)不等于NEW時,可以表示任務(wù)已經(jīng)完成。

          通過這里,大家可以學(xué)到一點:以后在編碼過程中,要按照規(guī)律來定義自己使用的狀態(tài),尤其是涉及到業(yè)務(wù)中有頻繁的狀態(tài)變更的操作,有規(guī)律的狀態(tài)可使業(yè)務(wù)處理變得事半功倍,這也是通過看別人的源碼設(shè)計能夠?qū)W到的,這里,建議大家還是多看別人寫的優(yōu)秀的開源框架的源碼。

          (4)取消方法

          我們繼續(xù)向下看源碼,接下來,看到的是cancel(boolean)方法,如下所示。

          public boolean cancel(boolean mayInterruptIfRunning) {
           if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
               mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
           try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
             try {
              Thread t = runner;
              if (t != null)
               t.interrupt();
             } finally { // final state
              UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
             }
            }
           } finally {
            finishCompletion();
           }
           return true;
          }

          接下來,拆解cancel(boolean)方法。在cancel(boolean)方法中,首先判斷任務(wù)的狀態(tài)和CAS的操作結(jié)果,如果任務(wù)的狀態(tài)不等于NEW或者CAS的操作返回false,則直接返回false,表示任務(wù)取消失敗。如下所示。

          if (!(state == NEW &&
             UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
           return false;

          接下來,在try代碼塊中,首先判斷是否可以中斷當(dāng)前任務(wù)所在的線程來取消任務(wù)的運行。如果可以中斷當(dāng)前任務(wù)所在的線程,則以一個Thread臨時變量來指向運行任務(wù)的線程,當(dāng)指向的變量不為空時,調(diào)用線程對象的interrupt()方法來中斷線程的運行,最后將線程標(biāo)記為被中斷的狀態(tài)。如下所示。

          try {
           if (mayInterruptIfRunning) {
            try {
             Thread t = runner;
             if (t != null)
              t.interrupt();
            } finally { // final state
             UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
           }
          }

          這里,發(fā)現(xiàn)變更任務(wù)狀態(tài)使用的是UNSAFE.putOrderedInt()方法,這個方法是個什么鬼呢?點進去看一下,如下所示。

          public native void putOrderedInt(Object var1, long var2, int var4);

          可以看到,又是一個本地方法,嘿嘿,這里先不管它,后續(xù)文章會詳解這些方法的作用。

          接下來,cancel(boolean)方法會進入finally代碼塊,如下所示。

          finally {
           finishCompletion();
          }

          可以看到在finallly代碼塊中調(diào)用了finishCompletion()方法,顧名思義,finishCompletion()方法表示結(jié)束任務(wù)的運行,接下來看看它是如何實現(xiàn)的。點到finishCompletion()方法中看一下,如下所示。

          private void finishCompletion() {
           // assert state > COMPLETING;
           for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
             for (;;) {
              Thread t = q.thread;
              if (t != null) {
               q.thread = null;
               LockSupport.unpark(t);
              }
              WaitNode next = q.next;
              if (next == null)
               break;
              q.next = null// unlink to help gc
              q = next;
             }
             break;
            }
           }
           done();
           callable = null;        // to reduce footprint
          }

          在finishCompletion()方法中,首先定義一個for循環(huán),循環(huán)終止因子為waiters為null,在循環(huán)中,判斷CAS操作是否成功,如果成功進行if條件中的邏輯。首先,定義一個for自旋循環(huán),在自旋循環(huán)體中,喚醒WaitNode堆棧中的線程,使其運行完成。當(dāng)WaitNode堆棧中的線程運行完成后,通過break退出外層for循環(huán)。接下來調(diào)用done()方法。done()方法又是個什么鬼呢?點進去看一下,如下所示。

          protected void done() { }

          可以看到,done()方法是一個空的方法體,交由子類來實現(xiàn)具體的業(yè)務(wù)邏輯。

          當(dāng)我們的具體業(yè)務(wù)中,需要在取消任務(wù)時,執(zhí)行一些額外的業(yè)務(wù)邏輯,可以在子類中覆寫done()方法的實現(xiàn)。

          (5)get()方法

          繼續(xù)向下看FutureTask類的代碼,F(xiàn)utureTask類中實現(xiàn)了兩個get()方法,如下所示。

          public V get() throws InterruptedException, ExecutionException {
           int s = state;
           if (s <= COMPLETING)
            s = awaitDone(false0L);
           return report(s);
          }

          public V get(long timeout, TimeUnit unit)
           throws InterruptedException, ExecutionException, TimeoutException 
          {
           if (unit == null)
            throw new NullPointerException();
           int s = state;
           if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
           return report(s);
          }

          沒參數(shù)的get()方法為當(dāng)任務(wù)未運行完成時,會阻塞,直到返回任務(wù)結(jié)果。有參數(shù)的get()方法為當(dāng)任務(wù)未運行完成,并且等待時間超出了超時時間,會TimeoutException異常。

          兩個get()方法的主要邏輯差不多,一個沒有超時設(shè)置,一個有超時設(shè)置,這里說一下主要邏輯。判斷任務(wù)的當(dāng)前狀態(tài)是否小于或者等于COMPLETING,也就是說,任務(wù)是NEW狀態(tài)或者COMPLETING,調(diào)用awaitDone()方法,看下awaitDone()方法的實現(xiàn),如下所示。

          private int awaitDone(boolean timed, long nanos)
           throws InterruptedException 
          {
           final long deadline = timed ? System.nanoTime() + nanos : 0L;
           WaitNode q = null;
           boolean queued = false;
           for (;;) {
            if (Thread.interrupted()) {
             removeWaiter(q);
             throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
             if (q != null)
              q.thread = null;
             return s;
            }
            else if (s == COMPLETING) // cannot time out yet
             Thread.yield();
            else if (q == null)
             q = new WaitNode();
            else if (!queued)
             queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                       q.next = waiters, q);
            else if (timed) {
             nanos = deadline - System.nanoTime();
             if (nanos <= 0L) {
              removeWaiter(q);
              return state;
             }
             LockSupport.parkNanos(this, nanos);
            }
            else
             LockSupport.park(this);
           }
          }

          接下來,拆解awaitDone()方法。在awaitDone()方法中,最重要的就是for自旋循環(huán),在循環(huán)中首先判斷當(dāng)前線程是否被中斷,如果已經(jīng)被中斷,則調(diào)用removeWaiter()將當(dāng)前線程從堆棧中移除,并且拋出InterruptedException異常,如下所示。

          if (Thread.interrupted()) {
           removeWaiter(q);
           throw new InterruptedException();
          }

          接下來,判斷任務(wù)的當(dāng)前狀態(tài)是否完成,如果完成,并且堆棧句柄不為空,則將堆棧中的當(dāng)前線程設(shè)置為空,返回當(dāng)前任務(wù)的狀態(tài),如下所示。

          int s = state;
          if (s > COMPLETING) {
           if (q != null)
            q.thread = null;
           return s;
          }

          當(dāng)任務(wù)的狀態(tài)為COMPLETING時,使當(dāng)前線程讓出CPU資源,如下所示。

          else if (s == COMPLETING)
           Thread.yield();

          如果堆棧為空,則創(chuàng)建堆棧對象,如下所示。

          else if (q == null)
           q = new WaitNode();

          如果queued變量為false,通過CAS操作為queued賦值,如果awaitDone()方法傳遞的timed參數(shù)為true,則計算超時時間,當(dāng)時間已超時,則在堆棧中移除當(dāng)前線程并返回任務(wù)狀態(tài),如下所示。如果未超時,則重置超時時間,如下所示。

          else if (!queued)
           queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                     q.next = waiters, q);
          else if (timed) {
           nanos = deadline - System.nanoTime();
           if (nanos <= 0L) {
            removeWaiter(q);
            return state;
           }
           LockSupport.parkNanos(this, nanos);
          }

          如果不滿足上述的所有條件,則將當(dāng)前線程設(shè)置為等待狀態(tài),如下所示。

          else
           LockSupport.park(this);

          接下來,回到get()方法中,當(dāng)awaitDone()方法返回結(jié)果,或者任務(wù)的狀態(tài)不滿足條件時,都會調(diào)用report()方法,并將當(dāng)前任務(wù)的狀態(tài)傳遞到report()方法中,并返回結(jié)果,如下所示。

          return report(s);

          看來,這里還要看下report()方法啊,點進去看下report()方法的實現(xiàn),如下所示。

          private V report(int s) throws ExecutionException {
           Object x = outcome;
           if (s == NORMAL)
            return (V)x;
           if (s >= CANCELLED)
            throw new CancellationException();
           throw new ExecutionException((Throwable)x);
          }

          可以看到,report()方法的實現(xiàn)比較簡單,首先,將outcome數(shù)據(jù)賦值給x變量,接下來,主要是判斷接收到的任務(wù)狀態(tài),如果狀態(tài)為NORMAL,則將x強轉(zhuǎn)為泛型類型返回;當(dāng)任務(wù)的狀態(tài)大于或者等于CANCELLED,也就是任務(wù)已經(jīng)取消,則拋出CancellationException異常,其他情況則拋出ExecutionException異常。

          至此,get()方法分析完成。注意:一定要理解get()方法的實現(xiàn),因為get()方法是我們使用Future接口和FutureTask類時,使用的比較頻繁的一個方法。

          (6)set()方法與setException()方法

          繼續(xù)看FutureTask類的代碼,接下來看到的是set()方法與setException()方法,如下所示。

          protected void set(V v) {
           if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
           }
          }

          protected void setException(Throwable t) {
           if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
           }
          }

          通過源碼可以看出,set()方法與setException()方法整體邏輯幾乎一樣,只是在設(shè)置任務(wù)狀態(tài)時一個將狀態(tài)設(shè)置為NORMAL,一個將狀態(tài)設(shè)置為EXCEPTIONAL。

          至于finishCompletion()方法,前面已經(jīng)分析過。

          (7)run()方法與runAndReset()方法

          接下來,就是run()方法了,run()方法的源代碼如下所示。

          public void run() {
           if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                    null, Thread.currentThread()))
            return;
           try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
             V result;
             boolean ran;
             try {
              result = c.call();
              ran = true;
             } catch (Throwable ex) {
              result = null;
              ran = false;
              setException(ex);
             }
             if (ran)
              set(result);
            }
           } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
             handlePossibleCancellationInterrupt(s);
           }
          }

          可以這么說,只要使用了Future和FutureTask,就必然會調(diào)用run()方法來運行任務(wù),掌握run()方法的流程是非常有必要的。在run()方法中,如果當(dāng)前狀態(tài)不是NEW,或者CAS操作返回的結(jié)果為false,則直接返回,不再執(zhí)行后續(xù)邏輯,如下所示。

          if (state != NEW ||
           !UNSAFE.compareAndSwapObject(this, runnerOffset,
                   null, Thread.currentThread()))
           return;

          接下來,在try代碼塊中,將成員變量callable賦值給一個臨時變量c,判斷臨時變量不等于null,并且任務(wù)狀態(tài)為NEW,則調(diào)用Callable接口的call()方法,并接收結(jié)果數(shù)據(jù)。并將ran變量設(shè)置為true。當(dāng)程序拋出異常時,將接收結(jié)果的變量設(shè)置為null,ran變量設(shè)置為false,并且調(diào)用setException()方法將任務(wù)的狀態(tài)設(shè)置為EXCEPTIONA。接下來,如果ran變量為true,則調(diào)用set()方法,如下所示。

          try {
           Callable<V> c = callable;
           if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
             result = c.call();
             ran = true;
            } catch (Throwable ex) {
             result = null;
             ran = false;
             setException(ex);
            }
            if (ran)
             set(result);
           }
          }

          接下來,程序會進入finally代碼塊中,如下所示。

          finally {
           // runner must be non-null until state is settled to
           // prevent concurrent calls to run()
           runner = null;
           // state must be re-read after nulling runner to prevent
           // leaked interrupts
           int s = state;
           if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
          }

          這里,將runner設(shè)置為null,如果任務(wù)的當(dāng)前狀態(tài)大于或者等于INTERRUPTING,也就是線程被中斷了。則調(diào)用handlePossibleCancellationInterrupt()方法,接下來,看下handlePossibleCancellationInterrupt()方法的實現(xiàn)。

          private void handlePossibleCancellationInterrupt(int s) {
           if (s == INTERRUPTING)
            while (state == INTERRUPTING)
             Thread.yield();
          }

          可以看到,handlePossibleCancellationInterrupt()方法的實現(xiàn)比較簡單,當(dāng)任務(wù)的狀態(tài)為INTERRUPTING時,使用while()循環(huán),條件為當(dāng)前任務(wù)狀態(tài)為INTERRUPTING,將當(dāng)前線程占用的CPU資源釋放,也就是說,當(dāng)任務(wù)運行完成后,釋放線程所占用的資源。

          runAndReset()方法的邏輯與run()差不多,只是runAndReset()方法會在finally代碼塊中將任務(wù)狀態(tài)重置為NEW。runAndReset()方法的源代碼如下所示,就不重復(fù)說了。

          protected boolean runAndReset() {
           if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                    null, Thread.currentThread()))
            return false;
           boolean ran = false;
           int s = state;
           try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
             try {
              c.call(); // don't set result
              ran = true;
             } catch (Throwable ex) {
              setException(ex);
             }
            }
           } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
             handlePossibleCancellationInterrupt(s);
           }
           return ran && s == NEW;
          }

          (8)removeWaiter()方法

          removeWaiter()方法中主要是使用自旋循環(huán)的方式來移除WaitNode中的線程,比較簡單,如下所示。

          private void removeWaiter(WaitNode node) {
           if (node != null) {
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
             for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
              s = q.next;
              if (q.thread != null)
               pred = q;
              else if (pred != null) {
               pred.next = s;
               if (pred.thread == null// check for race
                continue retry;
              }
              else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                         q, s))
               continue retry;
             }
             break;
            }
           }
          }

          最后,在FutureTask類的最后,有如下代碼。

          // Unsafe mechanics
          private static final sun.misc.Unsafe UNSAFE;
          private static final long stateOffset;
          private static final long runnerOffset;
          private static final long waitersOffset;
          static {
           try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = FutureTask.class;
            stateOffset = UNSAFE.objectFieldOffset
             (k.getDeclaredField("state"));
            runnerOffset = UNSAFE.objectFieldOffset
             (k.getDeclaredField("runner"));
            waitersOffset = UNSAFE.objectFieldOffset
             (k.getDeclaredField("waiters"));
           } catch (Exception e) {
            throw new Error(e);
           }
          }

          關(guān)于這些代碼的作用,會在后續(xù)深度解析CAS文章中詳細說明,這里就不再探討。

          至此,關(guān)于Future接口和FutureTask類的源碼就分析完了。

          瀏覽 52
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  香蕉人妻AV久久久久天天 | 亚洲日韩日韩人兽在线 | 99热99在线观看 | 亚洲一区福利在线 | 大香蕉太香蕉成人现现 |