<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          萬字解讀Java線程池設(shè)計思想及源碼實現(xiàn)

          共 92917字,需瀏覽 186分鐘

           ·

          2021-06-13 04:21

          前言

          相信大家都看過很多的關(guān)于線程池的文章,基本上也是面試的時候必問的,如果你在看過很多文章以后,還是一知半解的,那希望這篇文章能讓你真正的掌握好Java線程池。

          本文一大重點是源碼解析,同時會有少量篇幅介紹線程池設(shè)計思想以及作者Doug Lea實現(xiàn)過程中的一些巧妙用法。本文還是會一行行關(guān)鍵代碼進(jìn)行分析,目的是為了讓看源碼不是很理解的同學(xué)可以得到參考。

          線程池是非常重要的工具,如果你要成為一個好的工程師,還是得比較好地掌握這個知識,很多線上問題都是因為沒有用好線程池導(dǎo)致的。即使你為了謀生,也要知道,這基本上是面試必問的題目,而且面試官很容易從被面試者的回答中捕捉到被面試者的技術(shù)水平。

          本文略長,建議在pc上閱讀,邊看文章邊翻源碼(Java7和Java8都一樣),建議想好好看的讀者抽出至少30分鐘的整塊時間來閱讀。當(dāng)然,如果讀者僅為面試準(zhǔn)備,可以直接滑到最后的總結(jié)部分。

          總覽

          開篇來一些廢話。下圖是 java 線程池幾個相關(guān)類的繼承結(jié)構(gòu):

          1

          先簡單說說這個繼承結(jié)構(gòu),Executor位于最頂層,也是最簡單的,就一個execute(Runnable runnable) 接口方法定義。

          ExecutorService也是接口,在Executor接口的基礎(chǔ)上添加了很多的接口方法,所以一般來說我們會使用這個接口

          再下來一層是AbstractExecutorService,從名字就知道是抽象類,這里實現(xiàn)了非常有用的一些方法供子類直接使用,之后再細(xì)說。

          然后才到重點部分ThreadPoolExecutor類,這個類提供了關(guān)于線程池所需的非常豐富的功能。

          另外,我們還涉及到下圖中的這些類:

          others

          同在并發(fā)包中的Executors類,類名中帶字母s,我們猜到這個是工具類,里面的方法都是靜態(tài)方法,如以下我們最常用的用于生成ThreadPoolExecutor的實例的一些方法:

          public static ExecutorService newCachedThreadPool() {
              return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                            60L, TimeUnit.SECONDS,
                                            new SynchronousQueue<Runnable>());
          }
          public static ExecutorService newFixedThreadPool(int nThreads) {
              return new ThreadPoolExecutor(nThreads, nThreads,
                                            0L, TimeUnit.MILLISECONDS,
                                            new LinkedBlockingQueue<Runnable>());
          }

          另外,由于線程池支持獲取線程執(zhí)行的結(jié)果,所以,引入了Future接口,RunnableFuture繼承自此接口,然后我們最需要關(guān)心的就是它的實現(xiàn)類FutureTask。到這里,記住這個概念,在線程池的使用過程中,我們是往線程池提交任務(wù)(task),使用過線程池的都知道,我們提交的每個任務(wù)是實現(xiàn)了Runnable接口的,其實就是先將Runnable的任務(wù)包裝成FutureTask,然后再提交到線程池。這樣,讀者才能比較容易記住FutureTask這個類名:它首先是一個任務(wù)(Task),然后具有Future接口的語義,即可以在將來(Future)得到執(zhí)行的結(jié)果。

          當(dāng)然,線程池中的BlockingQueue也是非常重要的概念,如果線程數(shù)達(dá)到corePoolSize,我們的每個任務(wù)會提交到等待隊列中,等待線程池中的線程來取任務(wù)并執(zhí)行。這里的BlockingQueue通常我們使用其實現(xiàn)類LinkedBlockingQueue、ArrayBlockingQueue和SynchronousQueue,每個實現(xiàn)類都有不同的特征,使用場景之后會慢慢分析。想要詳細(xì)了解各個BlockingQueue的讀者,可以參考我的前面的一篇對BlockingQueue的各個實現(xiàn)類進(jìn)行詳細(xì)分析的文章。

          把事情說完整:除了上面說的這些類外,還有一個很重要的類,就是定時任務(wù)實現(xiàn)類ScheduledThreadPoolExecutor,它繼承自本文要重點講解的ThreadPoolExecutor,用于實現(xiàn)定時執(zhí)行。不過本文不會介紹它的實現(xiàn),我相信讀者看完本文后可以比較容易地看懂它的源碼。

          以上就是本文要介紹的知識,廢話不多說,開始進(jìn)入正文。

          Executor 接口

          /* 
           * @since 1.5
           * @author Doug Lea
           */

          public interface Executor {
              void execute(Runnable command);
          }

          我們可以看到Executor接口非常簡單,就一個void execute(Runnable command) 方法,代表提交一個任務(wù)。為了讓大家理解 java 線程池的整個設(shè)計方案,我會按照Doug Lea的設(shè)計思路來多說一些相關(guān)的東西。

          我們經(jīng)常這樣啟動一個線程:

          new Thread(new Runnable(){
            // do something
          }).start();

          用了線程池Executor后就可以像下面這么使用:

          Executor executor = anExecutor;
          executor.execute(new RunnableTask1());
          executor.execute(new RunnableTask2());

          如果我們希望線程池同步執(zhí)行每一個任務(wù),我們可以這么實現(xiàn)這個接口:

          class DirectExecutor implements Executor {
              public void execute(Runnable r) {
                  r.run();// 這里不是用的new Thread(r).start(),也就是說沒有啟動任何一個新的線程。
              }
          }

          我們希望每個任務(wù)提交進(jìn)來后,直接啟動一個新的線程來執(zhí)行這個任務(wù),我們可以這么實現(xiàn):

          class ThreadPerTaskExecutor implements Executor {
              public void execute(Runnable r) {
                  new Thread(r).start();  // 每個任務(wù)都用一個新的線程來執(zhí)行
              }
          }

          我們再來看下怎么組合兩個Executor來使用,下面這個實現(xiàn)是將所有的任務(wù)都加到一個queue中,然后從queue中取任務(wù),交給真正的執(zhí)行器執(zhí)行,這里采用synchronized進(jìn)行并發(fā)控制:

          class SerialExecutor implements Executor {
              // 任務(wù)隊列
              final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
              // 這個才是真正的執(zhí)行器
              final Executor executor;
              // 當(dāng)前正在執(zhí)行的任務(wù)
              Runnable active;
            
              // 初始化的時候,指定執(zhí)行器
              SerialExecutor(Executor executor) {
                  this.executor = executor;
              }
           
              // 添加任務(wù)到線程池: 將任務(wù)添加到任務(wù)隊列,scheduleNext 觸發(fā)執(zhí)行器去任務(wù)隊列取任務(wù)
              public synchronized void execute(final Runnable r) {
                  tasks.offer(new Runnable() {
                      public void run() {
                          try {
                              r.run();
                          } finally {
                              scheduleNext();
                          }
                      }
                  });
                  if (active == null) {
                      scheduleNext();
                  }
              }

              protected synchronized void scheduleNext() {
                  if ((active = tasks.poll()) != null) {
                      // 具體的執(zhí)行轉(zhuǎn)給真正的執(zhí)行器 executor
                      executor.execute(active);
                  }
              }
          }

          當(dāng)然了,Executor這個接口只有提交任務(wù)的功能,太簡單了,我們想要更豐富的功能,比如我們想知道執(zhí)行結(jié)果、我們想知道當(dāng)前線程池有多少個線程活著、已經(jīng)完成了多少任務(wù)等等,這些都是這個接口的不足的地方。接下來我們要介紹的是繼承自Executor接口的ExecutorService接口,這個接口提供了比較豐富的功能,也是我們最常使用到的接口。

          ExecutorService

          一般我們定義一個線程池的時候,往往都是使用這個接口:

          ExecutorService executor = Executors.newFixedThreadPool(args...);
          ExecutorService executor = Executors.newCachedThreadPool(args...);

          因為這個接口中定義的一系列方法大部分情況下已經(jīng)可以滿足我們的需要了。

          那么我們簡單初略地來看一下這個接口中都有哪些方法:

          public interface ExecutorService extends Executor {

              // 關(guān)閉線程池,已提交的任務(wù)繼續(xù)執(zhí)行,不接受繼續(xù)提交新任務(wù)
              void shutdown();

              // 關(guān)閉線程池,嘗試停止正在執(zhí)行的所有任務(wù),不接受繼續(xù)提交新任務(wù)
              // 它和前面的方法相比,加了一個單詞“now”,區(qū)別在于它會去停止當(dāng)前正在進(jìn)行的任務(wù)
              List<Runnable> shutdownNow();

              // 線程池是否已關(guān)閉
              boolean isShutdown();

              // 如果調(diào)用了 shutdown() 或 shutdownNow() 方法后,所有任務(wù)結(jié)束了,那么返回true
              // 這個方法必須在調(diào)用shutdown或shutdownNow方法之后調(diào)用才會返回true
              boolean isTerminated();

              // 等待所有任務(wù)完成,并設(shè)置超時時間
              // 我們這么理解,實際應(yīng)用中是,先調(diào)用 shutdown 或 shutdownNow,
              // 然后再調(diào)這個方法等待所有的線程真正地完成,返回值意味著有沒有超時
              boolean awaitTermination(long timeout, TimeUnit unit)
                      throws InterruptedException
          ;

              // 提交一個 Callable 任務(wù)
              <T> Future<T> submit(Callable<T> task);

              // 提交一個 Runnable 任務(wù),第二個參數(shù)將會放到 Future 中,作為返回值,
              // 因為 Runnable 的 run 方法本身并不返回任何東西
              <T> Future<T> submit(Runnable task, T result);

              // 提交一個 Runnable 任務(wù)
              Future<?> submit(Runnable task);

              // 執(zhí)行所有任務(wù),返回 Future 類型的一個 list
              <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
                      throws InterruptedException;

              // 也是執(zhí)行所有任務(wù),但是這里設(shè)置了超時時間
              <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                            long timeout, TimeUnit unit)
                      throws InterruptedException;

              // 只有其中的一個任務(wù)結(jié)束了,就可以返回,返回執(zhí)行完的那個任務(wù)的結(jié)果
              <T> invokeAny(Collection<? extends Callable<T>> tasks)
                      throws InterruptedException, ExecutionException
          ;
            
              // 同上一個方法,只有其中的一個任務(wù)結(jié)束了,就可以返回,返回執(zhí)行完的那個任務(wù)的結(jié)果,
              // 不過這個帶超時,超過指定的時間,拋出 TimeoutException 異常
              <T> invokeAny(Collection<? extends Callable<T>> tasks,
                              long timeout, TimeUnit unit)

                      throws InterruptedException, ExecutionException, TimeoutException
          ;
          }

          這些方法都很好理解,一個簡單的線程池主要就是這些功能,能提交任務(wù),能獲取結(jié)果,能關(guān)閉線程池,這也是為什么我們經(jīng)常用這個接口的原因。

          FutureTask

          在繼續(xù)往下層介紹ExecutorService的實現(xiàn)類之前,我們先來說說相關(guān)的類FutureTask。

          Future      Runnable
             \           /
              \         /
             RunnableFuture
                    |
                    |
                FutureTask
                
          FutureTask 通過 RunnableFuture 間接實現(xiàn)了 Runnable 接口,
          所以每個 Runnable 通常都先包裝成 FutureTask,
          然后調(diào)用 executor.execute(Runnable command) 將其提交給線程池

          我們知道,Runnable的void run() 方法是沒有返回值的,所以,通常,如果我們需要的話,會在submit中指定第二個參數(shù)作為返回值:

          <T> Future<T> submit(Runnable task, T result);

          其實到時候會通過這兩個參數(shù),將其包裝成Callable。它和Runnable的區(qū)別在于run()沒有返回值,而Callable的call()方法有返回值,同時,如果運行出現(xiàn)異常,call()方法會拋出異常。

          public interface Callable<V{
             
              call() throws Exception;
          }

          在這里,就不展開說FutureTask類了,因為本文篇幅本來就夠大了,這里我們需要知道怎么用就行了。

          下面,我們來看看ExecutorService的抽象實現(xiàn)AbstractExecutorService

          AbstractExecutorService

          AbstractExecutorService抽象類派生自ExecutorService接口,然后在其基礎(chǔ)上實現(xiàn)了幾個實用的方法,這些方法提供給子類進(jìn)行調(diào)用。

          這個抽象類實現(xiàn)了invokeAny方法和invokeAll方法,這里的兩個newTaskFor方法也比較有用,用于將任務(wù)包裝成FutureTask。定義于最上層接口Executor中的void execute(Runnable command)由于不需要獲取結(jié)果,不會進(jìn)行FutureTask的包裝。

          需要獲取結(jié)果(FutureTask),用submit方法,不需要獲取結(jié)果,可以用execute方法。

          下面,我將一行一行源碼地來分析這個類,跟著源碼來看看其實現(xiàn)吧:

          Tips: invokeAny和invokeAll方法占了這整個類的絕大多數(shù)篇幅,讀者可以選擇適當(dāng)跳過,因為它們可能在你的實踐中使用的頻次比較低,而且它們不帶有承前啟后的作用,不用擔(dān)心會漏掉什么導(dǎo)致看不懂后面的代碼。

          public abstract class AbstractExecutorService implements ExecutorService {

              // RunnableFuture 是用于獲取執(zhí)行結(jié)果的,我們常用它的子類 FutureTask
              // 下面兩個 newTaskFor 方法用于將我們的任務(wù)包裝成 FutureTask 提交到線程池中執(zhí)行
              protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
                  return new FutureTask<T>(runnable, value);
              }

              protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
                  return new FutureTask<T>(callable);
              }

              // 提交任務(wù)
              public Future<?> submit(Runnable task) {
                  if (task == nullthrow new NullPointerException();
                  // 1. 將任務(wù)包裝成 FutureTask
                  RunnableFuture<Void> ftask = newTaskFor(task, null);
                  // 2. 交給執(zhí)行器執(zhí)行,execute 方法由具體的子類來實現(xiàn)
                  // 前面也說了,F(xiàn)utureTask 間接實現(xiàn)了Runnable 接口。
                  execute(ftask);
                  return ftask;
              }

              public <T> Future<T> submit(Runnable task, T result) {
                  if (task == nullthrow new NullPointerException();
                  // 1. 將任務(wù)包裝成 FutureTask
                  RunnableFuture<T> ftask = newTaskFor(task, result);
                  // 2. 交給執(zhí)行器執(zhí)行
                  execute(ftask);
                  return ftask;
              }
            
              public <T> Future<T> submit(Callable<T> task) {
                  if (task == nullthrow new NullPointerException();
                  // 1. 將任務(wù)包裝成 FutureTask
                  RunnableFuture<T> ftask = newTaskFor(task);
                  // 2. 交給執(zhí)行器執(zhí)行
                  execute(ftask);
                  return ftask;
              }

              // 此方法目的:將 tasks 集合中的任務(wù)提交到線程池執(zhí)行,任意一個線程執(zhí)行完后就可以結(jié)束了
              // 第二個參數(shù) timed 代表是否設(shè)置超時機制,超時時間為第三個參數(shù),
              // 如果 timed 為 true,同時超時了還沒有一個線程返回結(jié)果,那么拋出 TimeoutException 異常
              private <T> doInvokeAny(Collection<? extends Callable<T>> tasks,
                                      boolean timed, long nanos)

                  throws InterruptedException, ExecutionException, TimeoutException 
          {
                  if (tasks == null)
                      throw new NullPointerException();
                  // 任務(wù)數(shù)
                  int ntasks = tasks.size();
                  if (ntasks == 0)
                      throw new IllegalArgumentException();
                  // 
                  List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
                
                  // ExecutorCompletionService 不是一個真正的執(zhí)行器,參數(shù) this 才是真正的執(zhí)行器
                  // 它對執(zhí)行器進(jìn)行了包裝,每個任務(wù)結(jié)束后,將結(jié)果保存到內(nèi)部的一個 completionQueue 隊列中
                  // 這也是為什么這個類的名字里面有個 Completion 的原因吧。
                  ExecutorCompletionService<T> ecs =
                      new ExecutorCompletionService<T>(this);
                  try {
                      // 用于保存異常信息,此方法如果沒有得到任何有效的結(jié)果,那么我們可以拋出最后得到的一個異常
                      ExecutionException ee = null;
                      long lastTime = timed ? System.nanoTime() : 0;
                      Iterator<? extends Callable<T>> it = tasks.iterator();

                      // 首先先提交一個任務(wù),后面的任務(wù)到下面的 for 循環(huán)一個個提交
                      futures.add(ecs.submit(it.next()));
                      // 提交了一個任務(wù),所以任務(wù)數(shù)量減 1
                      --ntasks;
                      // 正在執(zhí)行的任務(wù)數(shù)(提交的時候 +1,任務(wù)結(jié)束的時候 -1)
                      int active = 1;

                      for (;;) {
                          // ecs 上面說了,其內(nèi)部有一個 completionQueue 用于保存執(zhí)行完成的結(jié)果
                          // BlockingQueue 的 poll 方法不阻塞,返回 null 代表隊列為空
                          Future<T> f = ecs.poll();
                          // 為 null,說明剛剛提交的第一個線程還沒有執(zhí)行完成
                          // 在前面先提交一個任務(wù),加上這里做一次檢查,也是為了提高性能
                          if (f == null) {
                              if (ntasks > 0) {
                                  --ntasks;
                                  futures.add(ecs.submit(it.next()));
                                  ++active;
                              }
                              // 這里是 else if,不是 if。這里說明,沒有任務(wù)了,同時 active 為 0 說明
                              // 任務(wù)都執(zhí)行完成了。其實我也沒理解為什么這里做一次 break?
                              // 因為我認(rèn)為 active 為 0 的情況,必然從下面的 f.get() 返回了
                              
                              // 2018-02-23 感謝讀者 newmicro 的 comment,
                              //  這里的 active == 0,說明所有的任務(wù)都執(zhí)行失敗,那么這里是 for 循環(huán)出口
                              else if (active == 0)
                                  break;
                              // 這里也是 else if。這里說的是,沒有任務(wù)了,但是設(shè)置了超時時間,這里檢測是否超時
                              else if (timed) {
                                  // 帶等待的 poll 方法
                                  f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                                  // 如果已經(jīng)超時,拋出 TimeoutException 異常,這整個方法就結(jié)束了
                                  if (f == null)
                                      throw new TimeoutException();
                                  long now = System.nanoTime();
                                  nanos -= now - lastTime;
                                  lastTime = now;
                              }
                              // 這里是 else。說明,沒有任務(wù)需要提交,但是池中的任務(wù)沒有完成,還沒有超時(如果設(shè)置了超時)
                              // take() 方法會阻塞,直到有元素返回,說明有任務(wù)結(jié)束了
                              else
                                  f = ecs.take();
                          }
                          /*
                           * 我感覺上面這一段并不是很好理解,這里簡單說下。
                           * 1. 首先,這在一個 for 循環(huán)中,我們設(shè)想每一個任務(wù)都沒那么快結(jié)束,
                           *     那么,每一次都會進(jìn)到第一個分支,進(jìn)行提交任務(wù),直到將所有的任務(wù)都提交了
                           * 2. 任務(wù)都提交完成后,如果設(shè)置了超時,那么 for 循環(huán)其實進(jìn)入了“一直檢測是否超時”
                                 這件事情上
                           * 3. 如果沒有設(shè)置超時機制,那么不必要檢測超時,那就會阻塞在 ecs.take() 方法上,
                                 等待獲取第一個執(zhí)行結(jié)果
                           * 4. 如果所有的任務(wù)都執(zhí)行失敗,也就是說 future 都返回了,
                                 但是 f.get() 拋出異常,那么從 active == 0 分支出去(感謝 newmicro 提出)
                                   // 當(dāng)然,這個需要看下面的 if 分支。
                           */

                        
                        
                        
                          // 有任務(wù)結(jié)束了
                          if (f != null) {
                              --active;
                              try {
                                  // 返回執(zhí)行結(jié)果,如果有異常,都包裝成 ExecutionException
                                  return f.get();
                              } catch (ExecutionException eex) {
                                  ee = eex;
                              } catch (RuntimeException rex) {
                                  ee = new ExecutionException(rex);
                              }
                          }
                      }// 注意看 for 循環(huán)的范圍,一直到這里
                    
                      if (ee == null)
                          ee = new ExecutionException();
                      throw ee;
            
                  } finally {
                      // 方法退出之前,取消其他的任務(wù)
                      for (Future<T> f : futures)
                          f.cancel(true);
                  }
              }

              public <T> invokeAny(Collection<? extends Callable<T>> tasks)
                  throws InterruptedException, ExecutionException 
          {
                  try {
                      return doInvokeAny(tasks, false0);
                  } catch (TimeoutException cannotHappen) {
                      assert false;
                      return null;
                  }
              }

              public <T> invokeAny(Collection<? extends Callable<T>> tasks,
                                     long timeout, TimeUnit unit)

                  throws InterruptedException, ExecutionException, TimeoutException 
          {
                  return doInvokeAny(tasks, true, unit.toNanos(timeout));
              }

              // 執(zhí)行所有的任務(wù),返回任務(wù)結(jié)果。
              // 先不要看這個方法,我們先想想,其實我們自己提交任務(wù)到線程池,也是想要線程池執(zhí)行所有的任務(wù)
              // 只不過,我們是每次 submit 一個任務(wù),這里以一個集合作為參數(shù)提交
              public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
                  throws InterruptedException {
                  if (tasks == null)
                      throw new NullPointerException();
                  List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
                  boolean done = false;
                  try {
                      // 這個很簡單
                      for (Callable<T> t : tasks) {
                          // 包裝成 FutureTask
                          RunnableFuture<T> f = newTaskFor(t);
                          futures.add(f);
                          // 提交任務(wù)
                          execute(f);
                      }
                      for (Future<T> f : futures) {
                          if (!f.isDone()) {
                              try {
                                  // 這是一個阻塞方法,直到獲取到值,或拋出了異常
                                  // 這里有個小細(xì)節(jié),其實 get 方法簽名上是會拋出 InterruptedException 的
                                  // 可是這里沒有進(jìn)行處理,而是拋給外層去了。此異常發(fā)生于還沒執(zhí)行完的任務(wù)被取消了
                                  f.get();
                              } catch (CancellationException ignore) {
                              } catch (ExecutionException ignore) {
                              }
                          }
                      }
                      done = true;
                      // 這個方法返回,不像其他的場景,返回 List<Future>,其實執(zhí)行結(jié)果還沒出來
                      // 這個方法返回是真正的返回,任務(wù)都結(jié)束了
                      return futures;
                  } finally {
                      // 為什么要這個?就是上面說的有異常的情況
                      if (!done)
                          for (Future<T> f : futures)
                              f.cancel(true);
                  }
              }

              // 帶超時的 invokeAll,我們找不同吧
              public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                                   long timeout, TimeUnit unit)
                  throws InterruptedException {
                  if (tasks == null || unit == null)
                      throw new NullPointerException();
                  long nanos = unit.toNanos(timeout);
                  List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
                  boolean done = false;
                  try {
                      for (Callable<T> t : tasks)
                          futures.add(newTaskFor(t));

                      long lastTime = System.nanoTime();

                      Iterator<Future<T>> it = futures.iterator();
                      // 每提交一個任務(wù),檢測一次是否超時
                      while (it.hasNext()) {
                          execute((Runnable)(it.next()));
                          long now = System.nanoTime();
                          nanos -= now - lastTime;
                          lastTime = now;
                          // 超時
                          if (nanos <= 0)
                              return futures;
                      }

                      for (Future<T> f : futures) {
                          if (!f.isDone()) {
                              if (nanos <= 0)
                                  return futures;
                              try {
                                  // 調(diào)用帶超時的 get 方法,這里的參數(shù) nanos 是剩余的時間,
                                  // 因為上面其實已經(jīng)用掉了一些時間了
                                  f.get(nanos, TimeUnit.NANOSECONDS);
                              } catch (CancellationException ignore) {
                              } catch (ExecutionException ignore) {
                              } catch (TimeoutException toe) {
                                  return futures;
                              }
                              long now = System.nanoTime();
                              nanos -= now - lastTime;
                              lastTime = now;
                          }
                      }
                      done = true;
                      return futures;
                  } finally {
                      if (!done)
                          for (Future<T> f : futures)
                              f.cancel(true);
                  }
              }

          }

          到這里,我們發(fā)現(xiàn),這個抽象類包裝了一些基本的方法,可是像submit、invokeAny、invokeAll等方法,它們都沒有真正開啟線程來執(zhí)行任務(wù),它們都只是在方法內(nèi)部調(diào)用了execute方法,所以最重要的 execute(Runnable runnable)方法還沒出現(xiàn),需要等具體執(zhí)行器來實現(xiàn)這個最重要的部分,這里我們要說的就是ThreadPoolExecutor 類了。

          鑒于本文的篇幅,我覺得看到這里的讀者應(yīng)該已經(jīng)不多了,大家都習(xí)慣了快餐文化。我寫的每篇文章都力求讓讀者可以通過我的一篇文章而對相關(guān)內(nèi)容有全面的了解,所以篇幅不免長了些。

          ThreadPoolExecutor

          ThreadPoolExecutor是JDK中的線程池實現(xiàn),這個類實現(xiàn)了一個線程池需要的各個方法,它實現(xiàn)了任務(wù)提交、線程管理、監(jiān)控等等方法。

          我們可以基于它來進(jìn)行業(yè)務(wù)上的擴展,以實現(xiàn)我們需要的其他功能,比如實現(xiàn)定時任務(wù)的類ScheduledThreadPoolExecutor就繼承自ThreadPoolExecutor。當(dāng)然,這不是本文關(guān)注的重點,下面,還是趕緊進(jìn)行源碼分析吧。

          首先,我們來看看線程池實現(xiàn)中的幾個概念和處理流程。

          我們先回顧下提交任務(wù)的幾個方法:

          public Future<?> submit(Runnable task) {
              if (task == nullthrow new NullPointerException();
              RunnableFuture<Void> ftask = newTaskFor(task, null);
              execute(ftask);
              return ftask;
          }
          public <T> Future<T> submit(Runnable task, T result) {
              if (task == nullthrow new NullPointerException();
              RunnableFuture<T> ftask = newTaskFor(task, result);
              execute(ftask);
              return ftask;
          }
          public <T> Future<T> submit(Callable<T> task) {
              if (task == nullthrow new NullPointerException();
              RunnableFuture<T> ftask = newTaskFor(task);
              execute(ftask);
              return ftask;
          }

          一個最基本的概念是,submit方法中,參數(shù)是Runnable類型(也有Callable類型),這個參數(shù)不是用于 new Thread(runnable).start()中的,此處的這個參數(shù)不是用于啟動線程的,這里指的是任務(wù),任務(wù)要做的事情是run()方法里面定義的或Callable中的call()方法里面定義的。

          初學(xué)者往往會搞混這個,因為Runnable總是在各個地方出現(xiàn),經(jīng)常把一個Runnable包到另一個Runnable中。請把它想象成有個Task接口,這個接口里面有一個run()方法。

          我們回過神來繼續(xù)往下看,我畫了一個簡單的示意圖來描述線程池中的一些主要的構(gòu)件:

          pool-1

          當(dāng)然,上圖沒有考慮隊列是否有界,提交任務(wù)時隊列滿了怎么辦?什么情況下會創(chuàng)建新的線程?提交任務(wù)時線程池滿了怎么辦?空閑線程怎么關(guān)掉?這些問題下面我們會一一解決。

          我們經(jīng)常會使用Executors這個工具類來快速構(gòu)造一個線程池,對于初學(xué)者而言,這種工具類是很有用的,開發(fā)者不需要關(guān)注太多的細(xì)節(jié),只要知道自己需要一個線程池,僅僅提供必需的參數(shù)就可以了,其他參數(shù)都采用作者提供的默認(rèn)值。

          public static ExecutorService newFixedThreadPool(int nThreads) {
              return new ThreadPoolExecutor(nThreads, nThreads,
                                            0L, TimeUnit.MILLISECONDS,
                                            new LinkedBlockingQueue<Runnable>());
          }
          public static ExecutorService newCachedThreadPool() {
              return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                            60L, TimeUnit.SECONDS,
                                            new SynchronousQueue<Runnable>());
          }

          這里先不說有什么區(qū)別,它們最終都會導(dǎo)向這個構(gòu)造方法:

              public ThreadPoolExecutor(int corePoolSize,
                                        int maximumPoolSize,
                                        long keepAliveTime,
                                        TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue,
                                        ThreadFactory threadFactory,
                                        RejectedExecutionHandler handler)
           
          {
                  if (corePoolSize < 0 ||
                      maximumPoolSize <= 0 ||
                      maximumPoolSize < corePoolSize ||
                      keepAliveTime < 0)
                      throw new IllegalArgumentException();
                  // 這幾個參數(shù)都是必須要有的
                  if (workQueue == null || threadFactory == null || handler == null)
                      throw new NullPointerException();
                
                  this.corePoolSize = corePoolSize;
                  this.maximumPoolSize = maximumPoolSize;
                  this.workQueue = workQueue;
                  this.keepAliveTime = unit.toNanos(keepAliveTime);
                  this.threadFactory = threadFactory;
                  this.handler = handler;
              }

          基本上,上面的構(gòu)造方法中列出了我們最需要關(guān)心的幾個屬性了,下面逐個介紹下構(gòu)造方法中出現(xiàn)的這幾個屬性:

          • corePoolSize

            核心線程數(shù),不要摳字眼,反正先記著有這么個屬性就可以了。

          • maximumPoolSize

            最大線程數(shù),線程池允許創(chuàng)建的最大線程數(shù)。

          • workQueue

            任務(wù)隊列,BlockingQueue接口的某個實現(xiàn)(常使用ArrayBlockingQueue和LinkedBlockingQueue)。

          • keepAliveTime

            空閑線程的保活時間,如果某線程的空閑時間超過這個值都沒有任務(wù)給它做,那么可以被關(guān)閉了。注意這個值并不會對所有線程起作用,如果線程池中的線程數(shù)少于等于核心線程數(shù)corePoolSize,那么這些線程不會因為空閑太長時間而被關(guān)閉,當(dāng)然,也可以通過調(diào)用allowCoreThreadTimeOut(true)使核心線程數(shù)內(nèi)的線程也可以被回收。

          • threadFactory

            用于生成線程,一般我們可以用默認(rèn)的就可以了。通常,我們可以通過它將我們的線程的名字設(shè)置得比較可讀一些,如Message-Thread-1, Message-Thread-2類似這樣。

          • handler:

            當(dāng)線程池已經(jīng)滿了,但是又有新的任務(wù)提交的時候,該采取什么策略由這個來指定。有幾種方式可供選擇,像拋出異常、直接拒絕然后返回等,也可以自己實現(xiàn)相應(yīng)的接口實現(xiàn)自己的邏輯,這個之后再說。

          除了上面幾個屬性外,我們再看看其他重要的屬性。

          Doug Lea采用一個32位的整數(shù)來存放線程池的狀態(tài)和當(dāng)前池中的線程數(shù),其中高3位用于存放線程池狀態(tài),低29位表示線程數(shù)(即使只有29位,也已經(jīng)不小了,大概5億多,現(xiàn)在還沒有哪個機器能起這么多線程的吧)。我們知道,java語言在整數(shù)編碼上是統(tǒng)一的,都是采用補碼的形式,下面是簡單的移位操作和布爾操作,都是挺簡單的。


          private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

          // 這里 COUNT_BITS 設(shè)置為 29(32-3),意味著前三位用于存放線程狀態(tài),后29位用于存放線程數(shù)
          // 很多初學(xué)者很喜歡在自己的代碼中寫很多 29 這種數(shù)字,或者某個特殊的字符串,然后分布在各個地方,這是非常糟糕的
          private static final int COUNT_BITS = Integer.SIZE - 3;

          // 000 11111111111111111111111111111
          // 這里得到的是 29 個 1,也就是說線程池的最大線程數(shù)是 2^29-1=536870911
          // 以我們現(xiàn)在計算機的實際情況,這個數(shù)量還是夠用的
          private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

          // 我們說了,線程池的狀態(tài)存放在高 3 位中
          // 運算結(jié)果為 111跟29個0:111 00000000000000000000000000000
          private static final int RUNNING    = -1 << COUNT_BITS;
          // 000 00000000000000000000000000000
          private static final int SHUTDOWN   =  0 << COUNT_BITS;
          // 001 00000000000000000000000000000
          private static final int STOP       =  1 << COUNT_BITS;
          // 010 00000000000000000000000000000
          private static final int TIDYING    =  2 << COUNT_BITS;
          // 011 00000000000000000000000000000
          private static final int TERMINATED =  3 << COUNT_BITS;

          // 將整數(shù) c 的低 29 位修改為 0,就得到了線程池的狀態(tài)
          private static int runStateOf(int c)     return c & ~CAPACITY; }
          // 將整數(shù) c 的高 3 為修改為 0,就得到了線程池中的線程數(shù)
          private static int workerCountOf(int c)  return c & CAPACITY; }

          private static int ctlOf(int rs, int wc) return rs | wc; }

          /*
           * Bit field accessors that don't require unpacking ctl.
           * These depend on the bit layout and on workerCount being never negative.
           */


          private static boolean runStateLessThan(int c, int s) {
              return c < s;
          }

          private static boolean runStateAtLeast(int c, int s) {
              return c >= s;
          }

          private static boolean isRunning(int c) {
              return c < SHUTDOWN;
          }

          上面就是對一個整數(shù)的簡單的位操作,幾個操作方法將會在后面的源碼中一直出現(xiàn),所以讀者最好把方法名字和其代表的功能記住,看源碼的時候也就不需要來來回回翻了。

          在這里,介紹下線程池中的各個狀態(tài)和狀態(tài)變化的轉(zhuǎn)換過程:

          • RUNNING:這個沒什么好說的,這是最正常的狀態(tài):接受新的任務(wù),處理等待隊列中的任務(wù)
          • SHUTDOWN:不接受新的任務(wù)提交,但是會繼續(xù)處理等待隊列中的任務(wù)
          • STOP:不接受新的任務(wù)提交,不再處理等待隊列中的任務(wù),中斷正在執(zhí)行任務(wù)的線程
          • TIDYING:所有的任務(wù)都銷毀了,workCount為0。線程池的狀態(tài)在轉(zhuǎn)換為TIDYING狀態(tài)時,會執(zhí)行鉤子方法 terminated()
          • TERMINATED:terminated() 方法結(jié)束后,線程池的狀態(tài)就會變成這個

          RUNNING 定義為-1,SHUTDOWN定義為0,其他的都比0大,所以等于0的時候不能提交任務(wù),大于0的話,連正在執(zhí)行的任務(wù)也需要中斷。


          看了這幾種狀態(tài)的介紹,讀者大體也可以猜到十之八九的狀態(tài)轉(zhuǎn)換了,各個狀態(tài)的轉(zhuǎn)換過程有以下幾種:

          • RUNNING -> SHUTDOWN:當(dāng)調(diào)用了shutdown()后,會發(fā)生這個狀態(tài)轉(zhuǎn)換,這也是最重要的
          • (RUNNING or SHUTDOWN) -> STOP:當(dāng)調(diào)用shutdownNow()后,會發(fā)生這個狀態(tài)轉(zhuǎn)換,這下要清楚shutDown()和shutDownNow()的區(qū)別了
          • SHUTDOWN -> TIDYING:當(dāng)任務(wù)隊列和線程池都清空后,會由SHUTDOWN轉(zhuǎn)換為 TIDYING
          • STOP -> TIDYING:當(dāng)任務(wù)隊列清空后,發(fā)生這個轉(zhuǎn)換
          • TIDYING -> TERMINATED:這個前面說了,當(dāng)terminated()方法結(jié)束后

          上面的幾個記住核心的就可以了,尤其第一個和第二個。

          另外,我們還要看看一個內(nèi)部類Worker,因為Doug Lea把線程池中的線程包裝成了一個個Worker,翻譯成工人,就是線程池中做任務(wù)的線程。所以到這里,我們知道任務(wù)是Runnable(內(nèi)部變量名叫task或command),線程是Worker

          Worker這里又用到了抽象類AbstractQueuedSynchronizer。題外話,AQS在并發(fā)中真的是到處出現(xiàn),而且非常容易使用,寫少量的代碼就能實現(xiàn)自己需要的同步方式(對AQS源碼感興趣的讀者請參看我之前寫的幾篇文章)。

          private final class Worker
              extends AbstractQueuedSynchronizer
              implements Runnable
          {
              private static final long serialVersionUID = 6138294804551838833L;

              // 這個是真正的線程,任務(wù)靠你啦
              final Thread thread;
              
              // 前面說了,這里的 Runnable 是任務(wù)。為什么叫 firstTask?因為在創(chuàng)建線程的時候,如果同時指定了
              // 這個線程起來以后需要執(zhí)行的第一個任務(wù),那么第一個任務(wù)就是存放在這里的(線程可不止執(zhí)行這一個任務(wù))
              // 當(dāng)然了,也可以為 null,這樣線程起來了,自己到任務(wù)隊列(BlockingQueue)中取任務(wù)(getTask 方法)就行了
              Runnable firstTask;
              
              // 用于存放此線程完成的任務(wù)數(shù),注意了,這里用了 volatile,保證可見性
              volatile long completedTasks;

           // Worker 只有這一個構(gòu)造方法,傳入 firstTask,也可以傳 null
              Worker(Runnable firstTask) {
                  setState(-1); // inhibit interrupts until runWorker
                  this.firstTask = firstTask;
                  // 調(diào)用 ThreadFactory 來創(chuàng)建一個新的線程
                  this.thread = getThreadFactory().newThread(this);
              }

              // 這里調(diào)用了外部類的 runWorker 方法
              public void run() {
                  runWorker(this);
              }

           ...// 其他幾個方法沒什么好看的,就是用 AQS 操作,來獲取這個線程的執(zhí)行權(quán),用了獨占鎖
          }

          前面雖然啰嗦,但是簡單。有了上面的這些基礎(chǔ)后,我們終于可以看看ThreadPoolExecutor的execute方法了,前面源碼分析的時候也說了,各種方法都最終依賴于execute方法:

          public void execute(Runnable command) {
              if (command == null)
                  throw new NullPointerException();
            
              // 前面說的那個表示 “線程池狀態(tài)” 和 “線程數(shù)” 的整數(shù)
              int c = ctl.get();
            
              // 如果當(dāng)前線程數(shù)少于核心線程數(shù),那么直接添加一個 worker 來執(zhí)行任務(wù),
              // 創(chuàng)建一個新的線程,并把當(dāng)前任務(wù) command 作為這個線程的第一個任務(wù)(firstTask)
              if (workerCountOf(c) < corePoolSize) {
                  // 添加任務(wù)成功,那么就結(jié)束了。提交任務(wù)嘛,線程池已經(jīng)接受了這個任務(wù),這個方法也就可以返回了
                  // 至于執(zhí)行的結(jié)果,到時候會包裝到 FutureTask 中。
                  // 返回 false 代表線程池不允許提交任務(wù)
                  if (addWorker(command, true))
                      return;
                  c = ctl.get();
              }
              // 到這里說明,要么當(dāng)前線程數(shù)大于等于核心線程數(shù),要么剛剛 addWorker 失敗了
            
              // 如果線程池處于 RUNNING 狀態(tài),把這個任務(wù)添加到任務(wù)隊列 workQueue 中
              if (isRunning(c) && workQueue.offer(command)) {
                  /* 這里面說的是,如果任務(wù)進(jìn)入了 workQueue,我們是否需要開啟新的線程
                   * 因為線程數(shù)在 [0, corePoolSize) 是無條件開啟新的線程
                   * 如果線程數(shù)已經(jīng)大于等于 corePoolSize,那么將任務(wù)添加到隊列中,然后進(jìn)到這里
                   */

                  int recheck = ctl.get();
                  // 如果線程池已不處于 RUNNING 狀態(tài),那么移除已經(jīng)入隊的這個任務(wù),并且執(zhí)行拒絕策略
                  if (! isRunning(recheck) && remove(command))
                      reject(command);
                  // 如果線程池還是 RUNNING 的,并且線程數(shù)為 0,那么開啟新的線程
                  // 到這里,我們知道了,這塊代碼的真正意圖是:擔(dān)心任務(wù)提交到隊列中了,但是線程都關(guān)閉了
                  else if (workerCountOf(recheck) == 0)
                      addWorker(nullfalse);
              }
              // 如果 workQueue 隊列滿了,那么進(jìn)入到這個分支
              // 以 maximumPoolSize 為界創(chuàng)建新的 worker,
              // 如果失敗,說明當(dāng)前線程數(shù)已經(jīng)達(dá)到 maximumPoolSize,執(zhí)行拒絕策略
              else if (!addWorker(command, false))
                  reject(command);
          }

          對創(chuàng)建線程的錯誤理解:如果線程數(shù)少于corePoolSize,創(chuàng)建一個線程,如果線程數(shù)在 [corePoolSize, maximumPoolSize] 之間那么可以創(chuàng)建線程或復(fù)用空閑線程,keepAliveTime對這個區(qū)間的線程有效。

          從上面的幾個分支,我們就可以看出,上面的這段話是錯誤的。

          上面這些一時半會也不可能全部消化搞定,我們先繼續(xù)往下吧,到時候再回頭看幾遍。

          這個方法非常重要addWorker(Runnable firstTask, boolean core) 方法,我們看看它是怎么創(chuàng)建新的線程的:

          // 第一個參數(shù)是準(zhǔn)備提交給這個線程執(zhí)行的任務(wù),之前說了,可以為 null
          // 第二個參數(shù)為 true 代表使用核心線程數(shù) corePoolSize 作為創(chuàng)建線程的界限,也就說創(chuàng)建這個線程的時候,
          //   如果線程池中的線程總數(shù)已經(jīng)達(dá)到 corePoolSize,那么不能響應(yīng)這次創(chuàng)建線程的請求
          //   如果是 false,代表使用最大線程數(shù) maximumPoolSize 作為界限
          private boolean addWorker(Runnable firstTask, boolean core) {
              retry:
              for (;;) {
                  int c = ctl.get();
                  int rs = runStateOf(c);

                  // 這個非常不好理解
                  // 如果線程池已關(guān)閉,并滿足以下條件之一,那么不創(chuàng)建新的 worker:
                  // 1. 線程池狀態(tài)大于 SHUTDOWN,其實也就是 STOP, TIDYING, 或 TERMINATED
                  // 2. firstTask != null
                  // 3. workQueue.isEmpty()
                  // 簡單分析下:
                  // 還是狀態(tài)控制的問題,當(dāng)線程池處于 SHUTDOWN 的時候,不允許提交任務(wù),但是已有的任務(wù)繼續(xù)執(zhí)行
                  // 當(dāng)狀態(tài)大于 SHUTDOWN 時,不允許提交任務(wù),且中斷正在執(zhí)行的任務(wù)
                  // 多說一句:如果線程池處于 SHUTDOWN,但是 firstTask 為 null,且 workQueue 非空,那么是允許創(chuàng)建 worker 的
                  // 這是因為 SHUTDOWN 的語義:不允許提交新的任務(wù),但是要把已經(jīng)進(jìn)入到 workQueue 的任務(wù)執(zhí)行完,所以在滿足條件的基礎(chǔ)上,是允許創(chuàng)建新的 Worker 的
                  if (rs >= SHUTDOWN &&
                      ! (rs == SHUTDOWN &&
                         firstTask == null &&
                         ! workQueue.isEmpty()))
                      return false;

                  for (;;) {
                      int wc = workerCountOf(c);
                      if (wc >= CAPACITY ||
                          wc >= (core ? corePoolSize : maximumPoolSize))
                          return false;
                      // 如果成功,那么就是所有創(chuàng)建線程前的條件校驗都滿足了,準(zhǔn)備創(chuàng)建線程執(zhí)行任務(wù)了
                      // 這里失敗的話,說明有其他線程也在嘗試往線程池中創(chuàng)建線程
                      if (compareAndIncrementWorkerCount(c))
                          break retry;
                      // 由于有并發(fā),重新再讀取一下 ctl
                      c = ctl.get();
                      // 正常如果是 CAS 失敗的話,進(jìn)到下一個里層的for循環(huán)就可以了
                      // 可是如果是因為其他線程的操作,導(dǎo)致線程池的狀態(tài)發(fā)生了變更,如有其他線程關(guān)閉了這個線程池
                      // 那么需要回到外層的for循環(huán)
                      if (runStateOf(c) != rs)
                          continue retry;
                      // else CAS failed due to workerCount change; retry inner loop
                  }
              }

              /* 
               * 到這里,我們認(rèn)為在當(dāng)前這個時刻,可以開始創(chuàng)建線程來執(zhí)行任務(wù)了,
               * 因為該校驗的都校驗了,至于以后會發(fā)生什么,那是以后的事,至少當(dāng)前是滿足條件的
               */

            
              // worker 是否已經(jīng)啟動
              boolean workerStarted = false;
              // 是否已將這個 worker 添加到 workers 這個 HashSet 中
              boolean workerAdded = false;
              Worker w = null;
              try {
                  final ReentrantLock mainLock = this.mainLock;
                  // 把 firstTask 傳給 worker 的構(gòu)造方法
                  w = new Worker(firstTask);
                  // 取 worker 中的線程對象,之前說了,Worker的構(gòu)造方法會調(diào)用 ThreadFactory 來創(chuàng)建一個新的線程
                  final Thread t = w.thread;
                  if (t != null) {
                      // 這個是整個線程池的全局鎖,持有這個鎖才能讓下面的操作“順理成章”,
                      // 因為關(guān)閉一個線程池需要這個鎖,至少我持有鎖的期間,線程池不會被關(guān)閉
                      mainLock.lock();
                      try {

                          int c = ctl.get();
                          int rs = runStateOf(c);

                          // 小于 SHUTTDOWN 那就是 RUNNING,這個自不必說,是最正常的情況
                          // 如果等于 SHUTDOWN,前面說了,不接受新的任務(wù),但是會繼續(xù)執(zhí)行等待隊列中的任務(wù)
                          if (rs < SHUTDOWN ||
                              (rs == SHUTDOWN && firstTask == null)) {
                              // worker 里面的 thread 可不能是已經(jīng)啟動的
                              if (t.isAlive())
                                  throw new IllegalThreadStateException();
                              // 加到 workers 這個 HashSet 中
                              workers.add(w);
                              int s = workers.size();
                              // largestPoolSize 用于記錄 workers 中的個數(shù)的最大值
                              // 因為 workers 是不斷增加減少的,通過這個值可以知道線程池的大小曾經(jīng)達(dá)到的最大值
                              if (s > largestPoolSize)
                                  largestPoolSize = s;
                              workerAdded = true;
                          }
                      } finally {
                          mainLock.unlock();
                      }
                      // 添加成功的話,啟動這個線程
                      if (workerAdded) {
                          // 啟動線程
                          t.start();
                          workerStarted = true;
                      }
                  }
              } finally {
                  // 如果線程沒有啟動,需要做一些清理工作,如前面 workCount 加了 1,將其減掉
                  if (! workerStarted)
                      addWorkerFailed(w);
              }
              // 返回線程是否啟動成功
              return workerStarted;
          }

          簡單看下addWorkFailed的處理:

          // workers 中刪除掉相應(yīng)的 worker
          // workCount 減 1
          private void addWorkerFailed(Worker w) {
              final ReentrantLock mainLock = this.mainLock;
              mainLock.lock();
              try {
                  if (w != null)
                      workers.remove(w);
                  decrementWorkerCount();
                  // rechecks for termination, in case the existence of this worker was holding up termination
                  tryTerminate();
              } finally {
                  mainLock.unlock();
              }
          }

          回過頭來,繼續(xù)往下走。我們知道,worker中的線程start后,其run方法會調(diào)用runWorker方法:

          // Worker 類的 run() 方法
          public void run() {
              runWorker(this);
          }

          繼續(xù)往下看runWorker方法:

          // 此方法由 worker 線程啟動后調(diào)用,這里用一個 while 循環(huán)來不斷地從等待隊列中獲取任務(wù)并執(zhí)行
          // 前面說了,worker 在初始化的時候,可以指定 firstTask,那么第一個任務(wù)也就可以不需要從隊列中獲取
          final void runWorker(Worker w) {
              // 
              Thread wt = Thread.currentThread();
              // 該線程的第一個任務(wù)(如果有的話)
              Runnable task = w.firstTask;
              w.firstTask = null;
              w.unlock(); // allow interrupts
              boolean completedAbruptly = true;
              try {
                  // 循環(huán)調(diào)用 getTask 獲取任務(wù)
                  while (task != null || (task = getTask()) != null) {
                      w.lock();          
                      // 如果線程池狀態(tài)大于等于 STOP,那么意味著該線程也要中斷
                      if ((runStateAtLeast(ctl.get(), STOP) ||
                           (Thread.interrupted() &&
                            runStateAtLeast(ctl.get(), STOP))) &&
                          !wt.isInterrupted())
                          wt.interrupt();
                      try {
                          // 這是一個鉤子方法,留給需要的子類實現(xiàn)
                          beforeExecute(wt, task);
                          Throwable thrown = null;
                          try {
                              // 到這里終于可以執(zhí)行任務(wù)了
                              task.run();
                          } catch (RuntimeException x) {
                              thrown = x; throw x;
                          } catch (Error x) {
                              thrown = x; throw x;
                          } catch (Throwable x) {
                              // 這里不允許拋出 Throwable,所以轉(zhuǎn)換為 Error
                              thrown = x; throw new Error(x);
                          } finally {
                              // 也是一個鉤子方法,將 task 和異常作為參數(shù),留給需要的子類實現(xiàn)
                              afterExecute(task, thrown);
                          }
                      } finally {
                          // 置空 task,準(zhǔn)備 getTask 獲取下一個任務(wù)
                          task = null;
                          // 累加完成的任務(wù)數(shù)
                          w.completedTasks++;
                          // 釋放掉 worker 的獨占鎖
                          w.unlock();
                      }
                  }
                  completedAbruptly = false;
              } finally {
                  // 如果到這里,需要執(zhí)行線程關(guān)閉:
                  // 1. 說明 getTask 返回 null,也就是說,隊列中已經(jīng)沒有任務(wù)需要執(zhí)行了,執(zhí)行關(guān)閉
                  // 2. 任務(wù)執(zhí)行過程中發(fā)生了異常
                  // 第一種情況,已經(jīng)在代碼處理了將 workCount 減 1,這個在 getTask 方法分析中會說
                  // 第二種情況,workCount 沒有進(jìn)行處理,所以需要在 processWorkerExit 中處理
                  // 限于篇幅,我不準(zhǔn)備分析這個方法了,感興趣的讀者請自行分析源碼
                  processWorkerExit(w, completedAbruptly);
              }
          }

          我們看看getTask()是怎么獲取任務(wù)的,這個方法寫得真的很好,每一行都很簡單,組合起來卻所有的情況都想好了:

          // 此方法有三種可能:
          // 1. 阻塞直到獲取到任務(wù)返回。我們知道,默認(rèn) corePoolSize 之內(nèi)的線程是不會被回收的,
          //      它們會一直等待任務(wù)
          // 2. 超時退出。keepAliveTime 起作用的時候,也就是如果這么多時間內(nèi)都沒有任務(wù),那么應(yīng)該執(zhí)行關(guān)閉
          // 3. 如果發(fā)生了以下條件,此方法必須返回 null:
          //    - 池中有大于 maximumPoolSize 個 workers 存在(通過調(diào)用 setMaximumPoolSize 進(jìn)行設(shè)置)
          //    - 線程池處于 SHUTDOWN,而且 workQueue 是空的,前面說了,這種不再接受新的任務(wù)
          //    - 線程池處于 STOP,不僅不接受新的線程,連 workQueue 中的線程也不再執(zhí)行
          private Runnable getTask() {
              boolean timedOut = false// Did the last poll() time out?
            
              retry:
              for (;;) {
                  int c = ctl.get();
                  int rs = runStateOf(c);
                  // 兩種可能
                  // 1. rs == SHUTDOWN && workQueue.isEmpty()
                  // 2. rs >= STOP
                  if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                      // CAS 操作,減少工作線程數(shù)
                      decrementWorkerCount();
                      return null;
                  }

                  boolean timed;      // Are workers subject to culling?
                  for (;;) {
                      int wc = workerCountOf(c);
                      // 允許核心線程數(shù)內(nèi)的線程回收,或當(dāng)前線程數(shù)超過了核心線程數(shù),那么有可能發(fā)生超時關(guān)閉
                      timed = allowCoreThreadTimeOut || wc > corePoolSize;

                      // 這里 break,是為了不往下執(zhí)行后一個 if (compareAndDecrementWorkerCount(c))
                      // 兩個 if 一起看:如果當(dāng)前線程數(shù) wc > maximumPoolSize,或者超時,都返回 null
                      // 那這里的問題來了,wc > maximumPoolSize 的情況,為什么要返回 null?
                      //    換句話說,返回 null 意味著關(guān)閉線程。
                      // 那是因為有可能開發(fā)者調(diào)用了 setMaximumPoolSize() 將線程池的 maximumPoolSize 調(diào)小了,那么多余的 Worker 就需要被關(guān)閉
                      if (wc <= maximumPoolSize && ! (timedOut && timed))
                          break;
                      if (compareAndDecrementWorkerCount(c))
                          return null;
                      c = ctl.get();  // Re-read ctl
                      // compareAndDecrementWorkerCount(c) 失敗,線程池中的線程數(shù)發(fā)生了改變
                      if (runStateOf(c) != rs)
                          continue retry;
                      // else CAS failed due to workerCount change; retry inner loop
                  }
                  // wc <= maximumPoolSize 同時沒有超時
                  try {
                      // 到 workQueue 中獲取任務(wù)
                      Runnable r = timed ?
                          workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                          workQueue.take();
                      if (r != null)
                          return r;
                      timedOut = true;
                  } catch (InterruptedException retry) {
                      // 如果此 worker 發(fā)生了中斷,采取的方案是重試
                      // 解釋下為什么會發(fā)生中斷,這個讀者要去看 setMaximumPoolSize 方法。
                    
                      // 如果開發(fā)者將 maximumPoolSize 調(diào)小了,導(dǎo)致其小于當(dāng)前的 workers 數(shù)量,
                      // 那么意味著超出的部分線程要被關(guān)閉。重新進(jìn)入 for 循環(huán),自然會有部分線程會返回 null
                      timedOut = false;
                  }
              }
          }

          到這里,基本上也說完了整個流程,讀者這個時候應(yīng)該回到execute(Runnable command)方法,看看各個分支,我把代碼貼過來一下:

          public void execute(Runnable command) {
              if (command == null)
                  throw new NullPointerException();
            
              // 前面說的那個表示 “線程池狀態(tài)” 和 “線程數(shù)” 的整數(shù)
              int c = ctl.get();
            
              // 如果當(dāng)前線程數(shù)少于核心線程數(shù),那么直接添加一個 worker 來執(zhí)行任務(wù),
              // 創(chuàng)建一個新的線程,并把當(dāng)前任務(wù) command 作為這個線程的第一個任務(wù)(firstTask)
              if (workerCountOf(c) < corePoolSize) {
                  // 添加任務(wù)成功,那么就結(jié)束了。提交任務(wù)嘛,線程池已經(jīng)接受了這個任務(wù),這個方法也就可以返回了
                  // 至于執(zhí)行的結(jié)果,到時候會包裝到 FutureTask 中。
                  // 返回 false 代表線程池不允許提交任務(wù)
                  if (addWorker(command, true))
                      return;
                  c = ctl.get();
              }
              // 到這里說明,要么當(dāng)前線程數(shù)大于等于核心線程數(shù),要么剛剛 addWorker 失敗了
            
              // 如果線程池處于 RUNNING 狀態(tài),把這個任務(wù)添加到任務(wù)隊列 workQueue 中
              if (isRunning(c) && workQueue.offer(command)) {
                  /* 這里面說的是,如果任務(wù)進(jìn)入了 workQueue,我們是否需要開啟新的線程
                   * 因為線程數(shù)在 [0, corePoolSize) 是無條件開啟新的線程
                   * 如果線程數(shù)已經(jīng)大于等于 corePoolSize,那么將任務(wù)添加到隊列中,然后進(jìn)到這里
                   */

                  int recheck = ctl.get();
                  // 如果線程池已不處于 RUNNING 狀態(tài),那么移除已經(jīng)入隊的這個任務(wù),并且執(zhí)行拒絕策略
                  if (! isRunning(recheck) && remove(command))
                      reject(command);
                  // 如果線程池還是 RUNNING 的,并且線程數(shù)為 0,那么開啟新的線程
                  // 到這里,我們知道了,這塊代碼的真正意圖是:擔(dān)心任務(wù)提交到隊列中了,但是線程都關(guān)閉了
                  else if (workerCountOf(recheck) == 0)
                      addWorker(nullfalse);
              }
              // 如果 workQueue 隊列滿了,那么進(jìn)入到這個分支
              // 以 maximumPoolSize 為界創(chuàng)建新的 worker,
              // 如果失敗,說明當(dāng)前線程數(shù)已經(jīng)達(dá)到 maximumPoolSize,執(zhí)行拒絕策略
              else if (!addWorker(command, false))
                  reject(command);
          }

          上面各個分支中,有兩種情況會調(diào)用reject(command)來處理任務(wù),因為按照正常的流程,線程池此時不能接受這個任務(wù),所以需要執(zhí)行我們的拒絕策略。接下來,我們說一說ThreadPoolExecutor中的拒絕策略。

          final void reject(Runnable command) {
              // 執(zhí)行拒絕策略
              handler.rejectedExecution(command, this);
          }

          此處的handler我們需要在構(gòu)造線程池的時候就傳入這個參數(shù),它是RejectedExecutionHandler的實例。

          RejectedExecutionHandler在ThreadPoolExecutor中有四個已經(jīng)定義好的實現(xiàn)類可供我們直接使用,當(dāng)然,我們也可以實現(xiàn)自己的策略,不過一般也沒有必要。

          // 只要線程池沒有被關(guān)閉,那么由提交任務(wù)的線程自己來執(zhí)行這個任務(wù)。
          public static class CallerRunsPolicy implements RejectedExecutionHandler {
              public CallerRunsPolicy() { }
              public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                  if (!e.isShutdown()) {
                      r.run();
                  }
              }
          }

          // 不管怎樣,直接拋出 RejectedExecutionException 異常
          // 這個是默認(rèn)的策略,如果我們構(gòu)造線程池的時候不傳相應(yīng)的 handler 的話,那就會指定使用這個
          public static class AbortPolicy implements RejectedExecutionHandler {
              public AbortPolicy() { }
              public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                  throw new RejectedExecutionException("Task " + r.toString() +
                                                       " rejected from " +
                                                       e.toString());
              }
          }

          // 不做任何處理,直接忽略掉這個任務(wù)
          public static class DiscardPolicy implements RejectedExecutionHandler {
              public DiscardPolicy() { }
              public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
              }
          }

          // 這個相對霸道一點,如果線程池沒有被關(guān)閉的話,
          // 把隊列隊頭的任務(wù)(也就是等待了最長時間的)直接扔掉,然后提交這個任務(wù)到等待隊列中
          public static class DiscardOldestPolicy implements RejectedExecutionHandler {
              public DiscardOldestPolicy() { }
              public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                  if (!e.isShutdown()) {
                      e.getQueue().poll();
                      e.execute(r);
                  }
              }
          }

          到這里,ThreadPoolExecutor的源碼算是分析結(jié)束了。單純從源碼的難易程度來說,ThreadPoolExecutor的源碼還算是比較簡單的,只是需要我們靜下心來好好看看罷了。

          Executors

          這節(jié)其實也不是分析Executors這個類,因為它僅僅是工具類,它的所有方法都是static的。

          • 生成一個固定大小的線程池:
          public static ExecutorService newFixedThreadPool(int nThreads) {
              return new ThreadPoolExecutor(nThreads, nThreads,
                                            0L, TimeUnit.MILLISECONDS,
                                            new LinkedBlockingQueue<Runnable>());
          }

          最大線程數(shù)設(shè)置為與核心線程數(shù)相等,此時keepAliveTime設(shè)置為0(因為這里它是沒用的,即使不為0,線程池默認(rèn)也不會回收corePoolSize內(nèi)的線程),任務(wù)隊列采用LinkedBlockingQueue,無界隊列。

          過程分析:剛開始,每提交一個任務(wù)都創(chuàng)建一個worker,當(dāng)worker的數(shù)量達(dá)到nThreads后,不再創(chuàng)建新的線程,而是把任務(wù)提交到LinkedBlockingQueue中,而且之后線程數(shù)始終為nThreads。

          • 生成只有一個線程的固定線程池,這個更簡單,和上面的一樣,只要設(shè)置線程數(shù)為1就可以了:
          public static ExecutorService newSingleThreadExecutor() {
              return new FinalizableDelegatedExecutorService
                  (new ThreadPoolExecutor(11,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>()));
          }
          • 生成一個需要的時候就創(chuàng)建新的線程,同時可以復(fù)用之前創(chuàng)建的線程(如果這個線程當(dāng)前沒有任務(wù))的線程池:
          public static ExecutorService newCachedThreadPool() {
              return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                            60L, TimeUnit.SECONDS,
                                            new SynchronousQueue<Runnable>());
          }

          核心線程數(shù)為0,最大線程數(shù)為Integer.MAX_VALUE,keepAliveTime為60秒,任務(wù)隊列采用SynchronousQueue。

          這種線程池對于任務(wù)可以比較快速地完成的情況有比較好的性能。如果線程空閑了60秒都沒有任務(wù),那么將關(guān)閉此線程并從線程池中移除。所以如果線程池空閑了很長時間也不會有問題,因為隨著所有的線程都會被關(guān)閉,整個線程池不會占用任何的系統(tǒng)資源。

          過程分析:把execute方法的主體黏貼過來,讓大家看得明白些。鑒于corePoolSize是0,那么提交任務(wù)的時候,直接將任務(wù)提交到隊列中,由于采用了SynchronousQueue,所以如果是第一個任務(wù)提交的時候,offer方法肯定會返回false,因為此時沒有任何worker對這個任務(wù)進(jìn)行接收,那么將進(jìn)入到最后一個分支來創(chuàng)建第一個worker。

          之后再提交任務(wù)的話,取決于是否有空閑下來的線程對任務(wù)進(jìn)行接收,如果有,會進(jìn)入到第二個if語句塊中,否則就是和第一個任務(wù)一樣,進(jìn)到最后的else if分支創(chuàng)建新線程。

          int c = ctl.get();
          // corePoolSize 為 0,所以不會進(jìn)到這個 if 分支
          if (workerCountOf(c) < corePoolSize) {
              if (addWorker(command, true))
                  return;
              c = ctl.get();
          }
          // offer 如果有空閑線程剛好可以接收此任務(wù),那么返回 true,否則返回 false
          if (isRunning(c) && workQueue.offer(command)) {
              int recheck = ctl.get();
              if (! isRunning(recheck) && remove(command))
                  reject(command);
              else if (workerCountOf(recheck) == 0)
                  addWorker(nullfalse);
          }
          else if (!addWorker(command, false))
              reject(command);

          SynchronousQueue是一個比較特殊的BlockingQueue,其本身不儲存任何元素,它有一個虛擬隊列(或虛擬棧),不管讀操作還是寫操作,如果當(dāng)前隊列中存儲的是與當(dāng)前操作相同模式的線程,那么當(dāng)前操作也進(jìn)入隊列中等待;如果是相反模式,則配對成功,從當(dāng)前隊列中取隊頭節(jié)點。具體的信息,可以看我的另一篇關(guān)于BlockingQueue的文章。


          總結(jié)

          本文的總結(jié)部分為準(zhǔn)備面試的讀者而寫,希望能幫到面試者或者沒有足夠的時間看完全文的讀者。

          1. java線程池有哪些關(guān)鍵屬性?

            corePoolSize,maximumPoolSize,workQueue,keepAliveTime,rejectedExecutionHandler

            corePoolSize到 maximumPoolSize 之間的線程會被回收,當(dāng)然corePoolSize 的線程也可以通過設(shè)置而得到回收(allowCoreThreadTimeOut(true))。

            workQueue用于存放任務(wù),添加任務(wù)的時候,如果當(dāng)前線程數(shù)超過了corePoolSize,那么往該隊列中插入任務(wù),線程池中的線程會負(fù)責(zé)到隊列中拉取任務(wù)。

            keepAliveTime用于設(shè)置空閑時間,如果線程數(shù)超出了corePoolSize,并且有些線程的空閑時間超過了這個值,會執(zhí)行關(guān)閉這些線程的操作

            rejectedExecutionHandler用于處理當(dāng)線程池不能執(zhí)行此任務(wù)時的情況,默認(rèn)有拋出RejectedExecutionException 異常忽略任務(wù)使用提交任務(wù)的線程來執(zhí)行此任務(wù)將隊列中等待最久的任務(wù)刪除,然后提交此任務(wù)這四種策略,默認(rèn)為拋出異常。

            說說線程池中的線程創(chuàng)建時機?

            a、如果當(dāng)前線程數(shù)少于 corePoolSize,那么提交任務(wù)的時候創(chuàng)建一個新的線程,并由這個線程執(zhí)行這個任務(wù);b、如果當(dāng)前線程數(shù)已經(jīng)達(dá)到 corePoolSize,那么將提交的任務(wù)添加到隊列中,等待線程池中的線程去隊列中取任務(wù)。c、如果隊列已滿,那么創(chuàng)建新的線程來執(zhí)行任務(wù),需要保證池中的線程數(shù)不會超過 maximumPoolSize,如果此時線程數(shù)超過了 maximumPoolSize,那么執(zhí)行拒絕策略

            * 注意:如果將隊列設(shè)置為無界隊列,那么線程數(shù)達(dá)到corePoolSize后,其實線程數(shù)就不會再增長了。因為后面的任務(wù)直接往隊列塞就行了,此時maximumPoolSize參數(shù)就沒有什么意義。

          2. Executors.newFixedThreadPool(…)和Executors.newCachedThreadPool()構(gòu)造出來的線程池有什么差別?

            細(xì)說太長,往上滑一點點,在Executors的小節(jié)進(jìn)行了詳盡的描述。

          3. 任務(wù)執(zhí)行過程中發(fā)生異常怎么處理?

            如果某個任務(wù)執(zhí)行出現(xiàn)異常,那么執(zhí)行任務(wù)的線程會被關(guān)閉,而不是繼續(xù)接收其他任務(wù)。然后會啟動一個新的線程來代替它。

          4. 什么時候會執(zhí)行拒絕策略?

            a、workers的數(shù)量達(dá)到了corePoolSize(任務(wù)此時需要進(jìn)入任務(wù)隊列),任務(wù)入隊成功,與此同時線程池被關(guān)閉了,而且關(guān)閉線程池并沒有將這個任務(wù)出隊,那么執(zhí)行拒絕策略。這里說的是非常邊界的問題,入隊和關(guān)閉線程池并發(fā)執(zhí)行,讀者仔細(xì)看看execute方法是怎么進(jìn)到第一個reject(command)里面的;b、workers 的數(shù)量大于等于 corePoolSize,將任務(wù)加入到任務(wù)隊列,可是隊列滿了,任務(wù)入隊失敗,那么準(zhǔn)備開啟新的線程,可是線程數(shù)已經(jīng)達(dá)到 maximumPoolSize,那么執(zhí)行拒絕策略

          因為本文實在太長了,所以就沒有說執(zhí)行結(jié)果是怎么獲取的,也沒有說關(guān)閉線程池相關(guān)的部分,這個就留給讀者吧。

          本文篇幅是有點長,如果讀者發(fā)現(xiàn)什么不對的地方,或者有需要補充的地方,請不吝提出,謝謝。

          往期推薦

          監(jiān)聽器模式和觀察者模式的關(guān)系,寫點你不知道的

          直觀講解一下RPC調(diào)用和HTTP調(diào)用的區(qū)別!

          Github代碼fork之后,如何與原倉庫進(jìn)行同步?

          @PostConstruct注解是Spring提供的?今天講點不一樣的

          再聊面試,這次關(guān)于錢,關(guān)于培訓(xùn),關(guān)于內(nèi)卷



          如果你覺得這篇文章不錯,那么,下篇通常會更好。添加微信好友,可備注“加群”(微信號:zhuan2quan)

          一篇文章就看透技術(shù)本質(zhì)的人,
            和花一輩子都看不清的人,
            注定是截然不同的搬磚生涯。
          ▲ 按關(guān)注”程序新視界“,洞察技術(shù)內(nèi)幕
          瀏覽 71
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲不卡无码影院 | 综合 欧美 亚洲 | 麻豆国产精品视频 | 懂色av无码任你操久久久久蜜桃av | 特级西西高清4Www电影 |