<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          ThreadPoolExecutor 深入解析

          共 42235字,需瀏覽 85分鐘

           ·

          2021-03-14 13:45

          點(diǎn)擊下方“IT牧場(chǎng)”,選擇“設(shè)為星標(biāo)”


          本文來(lái)源:http://rrd.me/g6P3V


          Java中的線程池是運(yùn)用場(chǎng)景最多的并發(fā)框架,幾乎所有需要異步或并發(fā)執(zhí)行任務(wù)的程序 都可以使用線程池。合理地使用線程池能夠帶來(lái)3個(gè)好處:

          1. 降低資源消耗。通過(guò)重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。
          2. 提高響應(yīng)速度。當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行。
          3. 提高線程的可管理性。線程是稀缺資源,如果無(wú)限制地創(chuàng)建,不僅會(huì)消耗系統(tǒng)資源, 還會(huì)降低系統(tǒng)的穩(wěn)定性,使用線程池可以進(jìn)行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控。

          線程池的主要處理流程

          ThreadPoolExecutor 類圖

          java中的線程池都是基于ThreadPoolExecutor 來(lái)實(shí)現(xiàn)的。

          核心屬性

          // 狀態(tài)控制屬性:高3位表示線程池的運(yùn)行狀態(tài),剩下的29位表示當(dāng)前有效的線程數(shù)量
          private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

          // 線程池的基本大小,當(dāng)提交一個(gè)任務(wù)到線程池時(shí),線程池會(huì)創(chuàng)建一個(gè)線程來(lái)執(zhí)行任務(wù),
          // 即使其他空閑的基本線程能夠執(zhí)行新任務(wù)也會(huì)創(chuàng)建線程,等到需要執(zhí)行的任務(wù)數(shù)大于
          // 線程池基本大小時(shí)就不再創(chuàng)建。如果調(diào)用了線程池的prestartAllCoreThreads()方法,
          // 線程池會(huì)提前創(chuàng)建并啟動(dòng)所有基本線程。
          private volatile int corePoolSize;

          // 線程池線程最大數(shù)量,如果隊(duì)列滿了,并且已創(chuàng)建的線程數(shù)小于最大線程數(shù),
          // 則線程池會(huì)再創(chuàng)建新的線程執(zhí)行任務(wù)。如果使用了無(wú)界的任務(wù)隊(duì)列這個(gè)參數(shù)就沒(méi)什么效果。
          private volatile int maximumPoolSize;

          // 用于設(shè)置創(chuàng)建線程的工廠,可以通過(guò)線程工廠給每個(gè)創(chuàng)建出來(lái)的線程設(shè) 置更有意義的名字。
          private volatile ThreadFactory threadFactory;

          // 飽和策略,默認(rèn)情況下是AbortPolicy。
          private volatile RejectedExecutionHandler handler;

          // 線程池的工作線程空閑后,保持存活的時(shí)間。如果任務(wù)很多,并且每個(gè)任務(wù)執(zhí)行的時(shí)間比較短,
          // 可以調(diào)大時(shí)間,提高線程的利用率。
          private volatile long keepAliveTime;

          // 用于保存等待執(zhí)行的任務(wù)的阻塞隊(duì)列,具體可以參考[JAVA并發(fā)容器-阻塞隊(duì)列](https://www.jianshu.com/p/5646fb5faee1)
          private final BlockingQueue<Runnable> workQueue;

          // 存放工作線程的容器,必須獲取到鎖才能訪問(wèn)
          private final HashSet<Worker> workers = new HashSet<Worker>();

          // ctl的拆包和包裝
          private static int runStateOf(int c)     return c & ~CAPACITY; }
          private static int workerCountOf(int c)  return c & CAPACITY; }
          private static int ctlOf(int rs, int wc) return rs | wc; }
          • 阻塞隊(duì)列可以參考JAVA并發(fā)容器-阻塞隊(duì)列。
          • ctl狀態(tài)控制屬性,高3位表示線程池的運(yùn)行狀態(tài)(runState),剩下的29位表示當(dāng)前有效的線程數(shù)量(workerCount
          • 線程池最大線程數(shù)是(1 << COUNT_BITS) - 1 = 536 870 911

          線程池的運(yùn)行狀態(tài)runState

          狀態(tài)解釋
          RUNNING運(yùn)行態(tài),可處理新任務(wù)并執(zhí)行隊(duì)列中的任務(wù)
          SHUTDOW關(guān)閉態(tài),不接受新任務(wù),但處理隊(duì)列中的任務(wù)
          STOP停止態(tài),不接受新任務(wù),不處理隊(duì)列中任務(wù),且打斷運(yùn)行中任務(wù)
          TIDYING整理態(tài),所有任務(wù)已經(jīng)結(jié)束,workerCount = 0 ,將執(zhí)行terminated()方法
          TERMINATED結(jié)束態(tài),terminated() 方法已完成

          RejectedExecutionHandler(拒絕策略)

          • AbortPolicy:直接拋出異常。
          • CallerRunsPolicy:只用調(diào)用者所在線程來(lái)運(yùn)行任務(wù)。
          • DiscardOldestPolicy:丟棄隊(duì)列里最近的一個(gè)任務(wù),并執(zhí)行當(dāng)前任務(wù)。
          • DiscardPolicy:不處理,丟棄掉。

          核心內(nèi)部類 Worker

          private final class Worker  extends AbstractQueuedSynchronizer  implements Runnable {
              // 正在執(zhí)行任務(wù)的線程
              final Thread thread;
              // 線程創(chuàng)建時(shí)初始化的任務(wù)
              Runnable firstTask;
              // 完成任務(wù)計(jì)數(shù)器
              volatile long completedTasks;

              Worker(Runnable firstTask) {
                  // 在runWorker方法運(yùn)行之前禁止中斷,要中斷線程必須先獲取worker內(nèi)部的互斥鎖
                  setState(-1); // inhibit interrupts until runWorker
                  this.firstTask = firstTask;
                  this.thread = getThreadFactory().newThread(this);
              }

              /** delegates main run loop to outer runworker  */
              // 直接委托給外部runworker方法
              public void run() {
                  runWorker(this);
              }
          ...
          }

          Worker 類他將執(zhí)行任務(wù)的線程封裝到了內(nèi)部,在初始化Worker 的時(shí)候,會(huì)調(diào)用ThreadFactory初始化新線程;Worker 繼承了AbstractQueuedSynchronizer,在內(nèi)部實(shí)現(xiàn)了一個(gè)互斥鎖,主要目的是控制工作線程的中斷狀態(tài)。

          線程的中斷一般是由其他線程發(fā)起的,比如ThreadPoolExecutor#interruptIdleWorkers(boolean)方法,它在調(diào)用過(guò)程中會(huì)去中斷worker內(nèi)部的工作線程,Work的互斥鎖可以保證正在執(zhí)行的任務(wù)不被打斷。它是怎么保證的呢?在線程真正執(zhí)行任務(wù)的時(shí)候,也就是runWorker方法被調(diào)用時(shí),它會(huì)先獲取到Work的鎖,當(dāng)我們?cè)谄渌€程需要中斷當(dāng)前線程時(shí)也需要獲取到work的互斥鎖,否則不能中斷。

          構(gòu)造函數(shù)

              public ThreadPoolExecutor(int corePoolSize,
                                        int maximumPoolSize,
                                        long keepAliveTime,
                                        TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue,
                                        ThreadFactory threadFactory,
                                        RejectedExecutionHandler handler)
           
          {
                  if (corePoolSize < 0 ||
                      maximumPoolSize <= 0 ||
                      maximumPoolSize < corePoolSize ||
                      keepAliveTime < 0)
                      throw new IllegalArgumentException();
                  if (workQueue == null || threadFactory == null || handler == null)
                      throw new NullPointerException();
                  this.corePoolSize = corePoolSize;
                  this.maximumPoolSize = maximumPoolSize;
                  this.workQueue = workQueue;
                  this.keepAliveTime = unit.toNanos(keepAliveTime);
                  this.threadFactory = threadFactory;
                  this.handler = handler;
              }

          通過(guò)構(gòu)造函數(shù)我們可以發(fā)現(xiàn),構(gòu)造函數(shù)就是在對(duì)線程池核心屬性進(jìn)行賦值,下面我們來(lái)介紹一下這些核心屬性:

          • corePoolSize:核心線程數(shù)
          • maximumPoolSize:線程池最大數(shù)量
          • keepAliveTime:線程池的工作線程空閑后,保持存活的時(shí)間。
          • unit:線程活動(dòng)保持時(shí)間的單位。
          • workQueue:用于保存等待執(zhí)行的任務(wù)的阻塞隊(duì)列,具體可以參考JAVA并發(fā)容器-阻塞隊(duì)列
          • threadFactory:用于設(shè)置創(chuàng)建線程的工廠
          • handler:飽和策略,默認(rèn)情況下是AbortPolicy。

          execute() 提交線程

          public void execute(Runnable command) {
              if (command == null)
                  throw new NullPointerException();
              // 獲取控制的值
              int c = ctl.get();
              // 判斷工作線程數(shù)是否小于corePoolSize
              if (workerCountOf(c) < corePoolSize) {
                  // 新創(chuàng)建核心線程
                  if (addWorker(command, true))
                      return;
                  c = ctl.get();
              }
              // 工作線程數(shù)大于或等于corePoolSize
              // 判斷線程池是否處于運(yùn)行狀態(tài),如果是將任務(wù)command入隊(duì)
              if (isRunning(c) && workQueue.offer(command)) {
                  int recheck = ctl.get();
                  // 再次檢查線程池的運(yùn)行狀態(tài),如果不在運(yùn)行中,那么將任務(wù)從隊(duì)列里面刪除,并嘗試結(jié)束線程池
                  if (! isRunning(recheck) && remove(command))
                      // 調(diào)用驅(qū)逐策略
                      reject(command);
                  // 檢查活躍線程總數(shù)是否為0
                  else if (workerCountOf(recheck) == 0)
                      // 新創(chuàng)建非核心線程
                      addWorker(nullfalse);
              }
              // 隊(duì)列滿了,新創(chuàng)建非核心線程
              else if (!addWorker(command, false))
                  // 調(diào)用驅(qū)逐策略
                  reject(command);
          }

          該方法是沒(méi)有返回值的

          addWorker() 新創(chuàng)建線程

          private boolean addWorker(Runnable firstTask, boolean core) {
              retry:
              for (;;) {
                  int c = ctl.get();
                  int rs = runStateOf(c);

                  // 僅在必要的時(shí)候檢查隊(duì)列是否為NULL
                  // 檢查隊(duì)列是否處于非運(yùn)行狀態(tài)
                  if (rs >= SHUTDOWN &&
                      ! (rs == SHUTDOWN &&
                         firstTask == null &&
                         ! workQueue.isEmpty()))
                      return false;

                  for (;;) {
                      // 獲取活躍線程數(shù)
                      int wc = workerCountOf(c);
                      // 判斷線程是否超過(guò)最大值,當(dāng)隊(duì)列滿了則驗(yàn)證線程數(shù)是否大于maximumPoolSize,
                      // 沒(méi)有滿則驗(yàn)證corePoolSize
                      if (wc >= CAPACITY ||
                          wc >= (core ? corePoolSize : maximumPoolSize))
                          return false;
                      // 增加活躍線程總數(shù),否則重試
                      if (compareAndIncrementWorkerCount(c))
                          // 如果成功跳出外層循環(huán)
                          break retry;
                      c = ctl.get();  // Re-read ctl
                      // 再次校驗(yàn)一下線程池運(yùn)行狀態(tài)
                      if (runStateOf(c) != rs)
                          continue retry;
                      // else CAS failed due to workerCount change; retry inner loop
                  }
              }

              // 工作線程是否啟動(dòng)
              boolean workerStarted = false;
              // 工作線程是否創(chuàng)建
              boolean workerAdded = false;
              Worker w = null;
              try {
                  // 新創(chuàng)建線程
                  w = new Worker(firstTask);
                  // 獲取新創(chuàng)建的線程
                  final Thread t = w.thread;
                  if (t != null) {
                      // 創(chuàng)建線程要獲得全局鎖
                      final ReentrantLock mainLock = this.mainLock;
                      mainLock.lock();
                      try {
                          // Recheck while holding lock.
                          // Back out on ThreadFactory failure or if
                          // shut down before lock acquired.
                          int rs = runStateOf(ctl.get());
                          // 檢查線程池的運(yùn)行狀態(tài)
                          if (rs < SHUTDOWN ||
                              (rs == SHUTDOWN && firstTask == null)) {
                              // 檢查線程的狀態(tài)
                              if (t.isAlive()) // precheck that t is startable
                                  throw new IllegalThreadStateException();
                              // 將新建工作線程存放到容器
                              workers.add(w);
                              int s = workers.size();
                              if (s > largestPoolSize) {
                                  // 跟蹤線程池最大的工作線程總數(shù)
                                  largestPoolSize = s;
                              }
                              workerAdded = true;
                          }
                      } finally {
                          mainLock.unlock();
                      }
                      // 啟動(dòng)工作線程
                      if (workerAdded) {
                          t.start();
                          workerStarted = true;
                      }
                  }
              } finally {
                  if (! workerStarted)
                      // 啟動(dòng)新的工作線程失敗,
                      // 1. 將工作線程移除workers容器
                      // 2. 還原工作線程總數(shù)(workerCount)
                      // 3. 嘗試結(jié)束線程
                      addWorkerFailed(w);
              }
              return workerStarted;
          }

          如果啟動(dòng)新線程失敗那么addWorkerFailed()這個(gè)方法將做一下三件事:

          1. 將工作線程移除workers容器
          2. 還原工作線程總數(shù)(workerCount)
          3. 嘗試結(jié)束線程

          execute() 執(zhí)行過(guò)程

          1. 如果當(dāng)前運(yùn)行的線程少于corePoolSize,即使有空閑線程也會(huì)創(chuàng)建新線程來(lái)執(zhí)行任務(wù),(注意,執(zhí)行這一步驟 需要獲取全局鎖)。如果調(diào)用了線程池的restartAllCoreThreads()方法, 線程池會(huì)提前創(chuàng)建并啟動(dòng)所有基本線程。
          2. 如果運(yùn)行的線程等于或多于corePoolSize,則將任務(wù)加入BlockingQueue。
          3. 如果無(wú)法將任務(wù)加入BlockingQueue(隊(duì)列已滿),則創(chuàng)建新的線程來(lái)處理任務(wù)(注意,執(zhí) 行這一步驟需要獲取全局鎖)。
          4. 如果創(chuàng)建新線程將使當(dāng)前運(yùn)行的線程超出maximumPoolSize,任務(wù)將被拒絕,并調(diào)用 RejectedExecutionHandler.rejectedExecution()方法。

          線程任務(wù)的執(zhí)行

          線程的正在執(zhí)行是ThreadPoolExecutor.Worker#run()方法,但是這個(gè)方法直接委托給了外部的runWorker()方法,源碼如下:

          // 直接委托給外部runworker方法
          public void run() {
              runWorker(this);
          }

          runWorker() 執(zhí)行任務(wù)

          final void runWorker(Worker w) {
              // 當(dāng)前Work中的工作線程
              Thread wt = Thread.currentThread();
              // 獲取初始任務(wù)
              Runnable task = w.firstTask;
              // 初始任務(wù)置NULL(表示不是建線程)
              w.firstTask = null;
              // 修改鎖的狀態(tài),使需發(fā)起中斷的線程可以獲取到鎖(使工作線程可以響應(yīng)中斷)
              w.unlock(); // allow interrupts
              // 工作線程是否是異常結(jié)束
              boolean completedAbruptly = true;
              try {
                  // 循環(huán)的從隊(duì)列里面獲取任務(wù)
                  while (task != null || (task = getTask()) != null) {
                      // 每次執(zhí)行任務(wù)時(shí)需要獲取到內(nèi)置的互斥鎖
                      w.lock();
                      // 1. 當(dāng)前工作線程不是中斷狀態(tài),且線程池是STOP,TIDYING,TERMINATED狀態(tài),我們需要中斷當(dāng)前工作線程
                      // 2. 當(dāng)前工作線程是中斷狀態(tài),且線程池是STOP,TIDYING,TERMINATED狀態(tài),我們需要中斷當(dāng)前工作線程
                      if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
                              && !wt.isInterrupted())
                          // 中斷線程,中斷標(biāo)志位設(shè)置成true
                          wt.interrupt();
                      try {
                          // 執(zhí)行任務(wù)前置方法,擴(kuò)展用
                          beforeExecute(wt, task);
                          Throwable thrown = null;
                          try {
                              // 執(zhí)行任務(wù)
                              task.run();
                          } catch (RuntimeException x) {
                              thrown = x; throw x;
                          } catch (Error x) {
                              thrown = x; throw x;
                          } catch (Throwable x) {
                              thrown = x; throw new Error(x);
                          } finally {
                              // 執(zhí)行任務(wù)后置方法,擴(kuò)展用
                              afterExecute(task, thrown);
                          }
                      } finally {
                          // 任務(wù)NULL表示已經(jīng)處理了
                          task = null;
                          w.completedTasks++;
                          w.unlock();
                      }
                  }
                  completedAbruptly = false;
              } finally {
                  // 將工作線程從容器中剔除
                  processWorkerExit(w, completedAbruptly);
              }
          }

          正在執(zhí)行線程的方法,執(zhí)行流程:

          1. 獲取到當(dāng)前的工作線程
          2. 獲取初始化的線程任務(wù)
          3. 修改鎖的狀態(tài),使工作線程可以響應(yīng)中斷
          4. 獲取工作線程的鎖(保證在任務(wù)執(zhí)行過(guò)程中工作線程不被外部線程中斷),如果獲取到的任務(wù)是NULL,則結(jié)束當(dāng)前工作線程
          5. 判斷先測(cè)試狀態(tài),看是否需要中斷當(dāng)前工作線程
          6. 執(zhí)行任務(wù)前置方法beforeExecute(wt, task);
          7. 執(zhí)行任務(wù)(執(zhí)行提交到線程池的線程)task.run();
          8. 執(zhí)行任務(wù)后置方法afterExecute(task, thrown);,處理異常信息
          9. 修改完成任務(wù)的總數(shù)
          10. 解除當(dāng)前工作線程的鎖
          11. 獲取隊(duì)列里面的任務(wù),循環(huán)第4步
          12. 將工作線程從容器中剔除
          • wt.isInterrupted():獲取中斷狀態(tài),無(wú)副作用
          • Thread.interrupted():獲取中斷狀態(tài),并將中斷狀態(tài)恢重置成false(不中斷)
          • beforeExecute(wt, task);:執(zhí)行任務(wù)前置方法,擴(kuò)展用。如果這個(gè)方法在執(zhí)行過(guò)程中拋出異常,那么會(huì)導(dǎo)致當(dāng)前工作線程直接死亡而被回收,工作線程異常結(jié)束標(biāo)記位completedAbruptly被設(shè)置成true,任務(wù)線程不能被執(zhí)行
          • task.run();:執(zhí)行任務(wù)
          • afterExecute(task, thrown);:執(zhí)行任務(wù)后置方法,擴(kuò)展用。這個(gè)方法可以收集到任務(wù)運(yùn)行的異常信息,這個(gè)方法如果有異常拋出,也會(huì)導(dǎo)致當(dāng)前工作線程直接死亡而被回收,工作線程異常結(jié)束標(biāo)記位completedAbruptly被設(shè)置成true
          • 任務(wù)運(yùn)行過(guò)程中的異常信息除了RuntimeException以外,其他全部封裝成Error,然后被afterExecute方法收集
          • terminated()這也是一個(gè)擴(kuò)展方法,在線程池結(jié)束的時(shí)候調(diào)用

          getTask() 獲取任務(wù)

          private Runnable getTask() {
              // 記錄最后一次獲取任務(wù)是不是超時(shí)了
              boolean timedOut = false// Did the last poll() time out?

              for (;;) {
                  int c = ctl.get();
                  // 獲取線程池狀態(tài)
                  int rs = runStateOf(c);

                  // 線程池是停止?fàn)顟B(tài)或者狀態(tài)是關(guān)閉并且隊(duì)列為空
                  if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                      // 扣減工作線程總數(shù)
                      decrementWorkerCount();
                      return null;
                  }
                  // 獲取工作線程總數(shù)
                  int wc = workerCountOf(c);

                  // 工作線程是否需要剔除
                  boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

                  if ((wc > maximumPoolSize || (timed && timedOut))
                      && (wc > 1 || workQueue.isEmpty())) {
                      // 扣減工作線程總數(shù)
                      if (compareAndDecrementWorkerCount(c))
                          // 剔除工作線程,當(dāng)返回為NULL的時(shí)候,runWorker方法的while循環(huán)會(huì)結(jié)束
                          return null;
                      continue;
                  }

                  try {
                      Runnable r = timed ?
                          workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                          workQueue.take();
                      if (r != null)
                          return r;
                      timedOut = true;
                  } catch (InterruptedException retry) {
                      timedOut = false;
                  }
              }
          }

          getTask() 阻塞或定時(shí)獲取任務(wù)。當(dāng)該方法返回NULL時(shí),當(dāng)前工作線程會(huì)結(jié)束,最后被回收,下面是返回NULL的幾種情況:

          1. 當(dāng)前工作線程總數(shù)wc大于maximumPoolSize最大工作線程總數(shù)。maximumPoolSize可能被setMaximumPoolSize方法改變。
          2. 當(dāng)線程池處于停止?fàn)顟B(tài)時(shí)。
          3. 當(dāng)線程池處于關(guān)閉狀態(tài)且阻塞隊(duì)列為空。
          4. 當(dāng)前工作線程超時(shí)等待任務(wù),并且當(dāng)前工作線程總數(shù)wc大于corePoolSize或者allowCoreThreadTimeOut=true允許核心線程超時(shí)被回收,默認(rèn)是false。

          線程池在運(yùn)行過(guò)程中可以調(diào)用setMaximumPoolSize()方法來(lái)修改maximumPoolSize值,新的值必須大于corePoolSize,如果新的maximumPoolSize小于原來(lái)的值,那么在該方法會(huì)去中斷當(dāng)前的空閑線程(工作線程內(nèi)置鎖的是解鎖狀態(tài)的線程為空閑線程)。

          processWorkerExit() 工作線程結(jié)束

          private void processWorkerExit(Worker w, boolean completedAbruptly) {
              // 判斷是否是異常情況導(dǎo)致工作線程被回收
              if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
                  // 如果是扣減工作線程總數(shù),如果不是在getTask()方法就已經(jīng)扣減了
                  decrementWorkerCount();

              final ReentrantLock mainLock = this.mainLock;
              mainLock.lock();
              try {
                  // 將當(dāng)前工作線程完成任務(wù)的總數(shù)加到completedTaskCount標(biāo)志位上
                  completedTaskCount += w.completedTasks;
                  // 剔除當(dāng)前工作線程
                  workers.remove(w);
              } finally {
                  mainLock.unlock();
              }
              // 嘗試結(jié)束線程池
              tryTerminate();

              // 判刑是否需要新實(shí)例化工程線程
              int c = ctl.get();
              if (runStateLessThan(c, STOP)) {
                  if (!completedAbruptly) {
                      int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                      if (min == 0 && ! workQueue.isEmpty())
                          min = 1;
                      if (workerCountOf(c) >= min)
                          return// replacement not needed
                  }
                  addWorker(nullfalse);
              }
          }

          剔除線程流程:

          1. 判斷是否是異常情況導(dǎo)致工作線程被回收,如果是workerCount--
          2. 獲取到全局鎖
          3. 將當(dāng)前工作線程完成任務(wù)的總數(shù)加到completedTaskCount標(biāo)志位上
          4. 剔除工作線程
          5. 解鎖
          6. 嘗試結(jié)束線程池tryTerminate()
          7. 判刑是否需要重新實(shí)例化工程線程放到workers容器

          結(jié)束線程池

          shutdown() 關(guān)閉線程池

          public void shutdown() {
              final ReentrantLock mainLock = this.mainLock;
              mainLock.lock();
              try {
                  // 檢查權(quán)限
                  checkShutdownAccess();
                  // 設(shè)置線程池狀態(tài)為關(guān)閉
                  advanceRunState(SHUTDOWN);
                  // 中斷線程
                  interruptIdleWorkers();
                  // 擴(kuò)展方法
                  onShutdown(); // hook for ScheduledThreadPoolExecutor
              } finally {
                  mainLock.unlock();
              }
              // 嘗試結(jié)束線池
              tryTerminate();
          }
          • 通過(guò)遍歷工作線程容器workers,然后逐個(gè)中斷工作線程,如果無(wú)法響應(yīng)中斷的任務(wù)可能永遠(yuǎn)無(wú)法終止
          • shutdown只是將線程池的狀態(tài)設(shè)置成SHUTDOWN狀態(tài),然后中斷所有沒(méi)有正在執(zhí)行任務(wù)的線程。

          shutdown() 關(guān)閉線程池

          public List<Runnable> shutdownNow() {
              List<Runnable> tasks;
              final ReentrantLock mainLock = this.mainLock;
              mainLock.lock();
              try {
                  // 檢查權(quán)限
                  checkShutdownAccess();
                  // 設(shè)置線程池狀態(tài)為停止?fàn)顟B(tài)
                  advanceRunState(STOP);
                  // 中斷線程
                  interruptIdleWorkers();
                  // 將所有任務(wù)移動(dòng)到list容器
                  tasks = drainQueue();
              } finally {
                  mainLock.unlock();
              }
              // 嘗試結(jié)束線池
              tryTerminate();
              // 返回所有未執(zhí)行的任務(wù)
              return tasks;
          }
          • 通過(guò)遍歷工作線程容器workers,然后逐個(gè)中斷工作線程,如果無(wú)法響應(yīng)中斷的任務(wù)可能永遠(yuǎn)無(wú)法終止
          • shutdownNow首先將線程池的狀態(tài)設(shè)置成 STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表

          tryTerminate() 嘗試結(jié)束線程池

          final void tryTerminate() {
              for (;;) {
                  int c = ctl.get();
                  //  判斷是否在運(yùn)行中,如果是直接返回
                  if (isRunning(c) ||
                      // 判斷是否進(jìn)入整理狀態(tài),如果進(jìn)入了直接返回
                      runStateAtLeast(c, TIDYING) ||
                      // 如果是狀態(tài)是關(guān)閉并且隊(duì)列非空,也直接返回(關(guān)閉狀態(tài)需要等到隊(duì)列里面的線程處理完)
                      (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                      return;
                  // 判斷工作線程是否都關(guān)閉了
                  if (workerCountOf(c) != 0) { // Eligible to terminate
                      // 中斷空閑線程
                      interruptIdleWorkers(ONLY_ONE);
                      return;
                  }

                  final ReentrantLock mainLock = this.mainLock;
                  mainLock.lock();
                  try {
                      // 將狀態(tài)替換成整理狀態(tài)
                      if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                          try {
                              // 整理發(fā)放執(zhí)行
                              terminated();
                          } finally {
                              // 狀態(tài)替換成結(jié)束狀態(tài)
                              ctl.set(ctlOf(TERMINATED, 0));
                              termination.signalAll();
                          }
                          return;
                      }
                  } finally {
                      mainLock.unlock();
                  }
                  // else retry on failed CAS
              }
          }

          結(jié)束線程池大致流程為:

          1. 判斷是否在運(yùn)行中,如果是則不結(jié)束線程
          2. 判斷是否進(jìn)入整理狀態(tài),如果是也不用執(zhí)行后面內(nèi)容了
          3. 判斷如果線程池是關(guān)閉狀態(tài)并且隊(duì)列非空,則不結(jié)束線程池(關(guān)閉狀態(tài)需要等到隊(duì)列里面的線程處理完)
          4. 判斷工作線程是否都關(guān)閉了,如果沒(méi)有就發(fā)起中斷工作線程的請(qǐng)求
          5. 獲取全局鎖將線程池狀態(tài)替換成整理狀態(tài)
          6. 調(diào)用terminated();擴(kuò)展方法(這也是一個(gè)擴(kuò)展方法,在線程池結(jié)束的時(shí)候調(diào)用)
          7. 將線程池狀態(tài)替換成結(jié)束狀態(tài)
          8. 解除全局鎖
          • 注意:
          • 我們可以通過(guò)的shutdownshutdownNow方法來(lái)結(jié)束線程池。他們都是通過(guò)遍歷工作線程容器,然后逐個(gè)中斷工作線程,所以無(wú)法響應(yīng)中斷的任務(wù) 可能永遠(yuǎn)無(wú)法終止。
          • shutdownshutdownNow的區(qū)別在于:shutdownNow首先將線程池的狀態(tài)設(shè)置成 STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表;而 shutdown只是將線程池的狀態(tài)設(shè)置成SHUTDOWN狀態(tài),然后中斷所有沒(méi)有正在執(zhí)行任務(wù)的線 程。
          • 只要調(diào)用了shutdownshutdownNow那么isShutdown方法就會(huì)返回true
          • 當(dāng)所有的任務(wù)都已關(guān)閉后,才表示線程池關(guān)閉成功,這時(shí)調(diào)用isTerminaed方法會(huì)返回true

          線程池的監(jiān)控

          通過(guò)擴(kuò)展線程池進(jìn)行監(jiān)控。可以通過(guò)繼承線程池來(lái)自定義線程池,重寫(xiě)線程池的 beforeExecute、afterExecute和terminated方法,也可以在任務(wù)執(zhí)行前、執(zhí)行后和線程池關(guān)閉前執(zhí) 行一些代碼來(lái)進(jìn)行監(jiān)控。例如,監(jiān)控任務(wù)的平均執(zhí)行時(shí)間、最大執(zhí)行時(shí)間和最小執(zhí)行時(shí)間等。這幾個(gè)方法在線程池里是空方法。

          getTaskCount()

          public long getTaskCount() {
              final ReentrantLock mainLock = this.mainLock;
              mainLock.lock();
              try {
                  long n = completedTaskCount;
                  for (Worker w : workers) {
                      n += w.completedTasks;
                      if (w.isLocked())
                          ++n;
                  }
                  return n + workQueue.size();
              } finally {
                  mainLock.unlock();
              }
          }

          獲取線程池需要執(zhí)行的任務(wù)數(shù)量。總數(shù)=已經(jīng)結(jié)束線工作程完成的任務(wù)數(shù)(completedTaskCount) + 還未結(jié)束線程工作線程完成的任務(wù)數(shù)(w.completedTasks)+正在執(zhí)行的任務(wù)數(shù)(w.isLocked())+還未執(zhí)行的任務(wù)數(shù)(workQueue.size())

          getCompletedTaskCount()

          public long getCompletedTaskCount() {
              final ReentrantLock mainLock = this.mainLock;
              mainLock.lock();
              try {
                  long n = completedTaskCount;
                  for (Worker w : workers)
                      n += w.completedTasks;
                  return n;
              } finally {
                  mainLock.unlock();
              }
          }

          獲取線程池在運(yùn)行過(guò)程中已完成的任務(wù)數(shù)量。總數(shù)=已經(jīng)結(jié)束線工作程完成的任務(wù)數(shù)(completedTaskCount) + 還未結(jié)束線程工作線程完成的任務(wù)數(shù)(w.completedTasks)

          getLargestPoolSize()

          public int getLargestPoolSize() {
              final ReentrantLock mainLock = this.mainLock;
              mainLock.lock();
              try {
                  return largestPoolSize;
              } finally {
                  mainLock.unlock();
              }
          }

          獲取線程池里曾經(jīng)創(chuàng)建過(guò)的最大線程數(shù)量。通過(guò)這個(gè)數(shù)據(jù)可以知道線程池是 否曾經(jīng)滿過(guò)。如該數(shù)值等于線程池的最大大小,則表示線程池曾經(jīng)滿過(guò)。

          getPoolSize()

          public int getPoolSize() {
              final ReentrantLock mainLock = this.mainLock;
              mainLock.lock();
              try {
                  // Remove rare and surprising possibility of
                  // isTerminated() && getPoolSize() > 0
                  return runStateAtLeast(ctl.get(), TIDYING) ? 0
                      : workers.size();
              } finally {
                  mainLock.unlock();
              }
          }

          獲取線程池的線程數(shù)量。如果線程池不銷毀的話,線程池里的線程不會(huì)自動(dòng)銷 毀,所以這個(gè)大小只增不減。

          getActiveCount()

          public int getActiveCount() {
              final ReentrantLock mainLock = this.mainLock;
              mainLock.lock();
              try {
                  int n = 0;
                  for (Worker w : workers)
                      if (w.isLocked())
                          ++n;
                  return n;
              } finally {
                  mainLock.unlock();
              }
          }

          獲取活動(dòng)的線程數(shù)。

          合理地配置線程池

          要想合理地配置線程池,就必須首先分析任務(wù)特性,可以從以下幾個(gè)角度來(lái)分析。

          • 任務(wù)的性質(zhì):CPU密集型任務(wù)、IO密集型任務(wù)和混合型任務(wù)。
          • 任務(wù)的優(yōu)先級(jí):高、中和低。
          • 任務(wù)的執(zhí)行時(shí)間:長(zhǎng)、中和短。
          • 任務(wù)的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫(kù)連接。

          性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開(kāi)處理。CPU密集型任務(wù)應(yīng)配置盡可能小的 線程,如配置Ncpu+1個(gè)線程的線程池。由于IO密集型任務(wù)線程并不是一直在執(zhí)行任務(wù),則應(yīng)配 置盡可能多的線程,如2*Ncpu。混合型的任務(wù),如果可以拆分,將其拆分成一個(gè)CPU密集型任務(wù) 和一個(gè)IO密集型任務(wù),只要這兩個(gè)任務(wù)執(zhí)行的時(shí)間相差不是太大,那么分解后執(zhí)行的吞吐量 將高于串行執(zhí)行的吞吐量。如果這兩個(gè)任務(wù)執(zhí)行時(shí)間相差太大,則沒(méi)必要進(jìn)行分解。

          優(yōu)先級(jí)不同的任務(wù)可以使用優(yōu)先級(jí)隊(duì)列PriorityBlockingQueue來(lái)處理。它可以讓優(yōu)先級(jí)高 的任務(wù)先執(zhí)行。

          • 如果一直有優(yōu)先級(jí)高的任務(wù)提交到隊(duì)列里,那么優(yōu)先級(jí)低的任務(wù)可能永遠(yuǎn)不能 執(zhí)行。
          • 可以通過(guò) Runtime.getRuntime().availableProcessors()方法獲得當(dāng)前設(shè)備的CPU個(gè)數(shù)。
          • 建議使用有界隊(duì)列。有界隊(duì)列能增加系統(tǒng)的穩(wěn)定性和預(yù)警能力,可以根據(jù)需要設(shè)大一點(diǎn) 兒,比如幾千。無(wú)界隊(duì)列在某些異常情況下可能會(huì)撐爆內(nèi)存。

          N核服務(wù)器,通過(guò)執(zhí)行業(yè)務(wù)的單線程分析出本地計(jì)算時(shí)間為x,等待時(shí)間為y,則工作線程數(shù)(線程池線程數(shù))設(shè)置為 N*(x+y)/x,能讓CPU的利用率最大化,詳情可以參考線程數(shù)究竟設(shè)多少合理

          參考

          《java并發(fā)編程的藝術(shù)》

          源碼

          https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases

          spring-boot-student-concurrent 工程

          干貨分享

          最近將個(gè)人學(xué)習(xí)筆記整理成冊(cè),使用PDF分享。關(guān)注我,回復(fù)如下代碼,即可獲得百度盤(pán)地址,無(wú)套路領(lǐng)取!

          ?001:《Java并發(fā)與高并發(fā)解決方案》學(xué)習(xí)筆記;?002:《深入JVM內(nèi)核——原理、診斷與優(yōu)化》學(xué)習(xí)筆記;?003:《Java面試寶典》?004:《Docker開(kāi)源書(shū)》?005:《Kubernetes開(kāi)源書(shū)》?006:《DDD速成(領(lǐng)域驅(qū)動(dòng)設(shè)計(jì)速成)》?007:全部?008:加技術(shù)群討論

          近期熱文

          ?LinkedBlockingQueue vs ConcurrentLinkedQueue?解讀Java 8 中為并發(fā)而生的 ConcurrentHashMap?Redis性能監(jiān)控指標(biāo)匯總?最全的DevOps工具集合,再也不怕選型了!?微服務(wù)架構(gòu)下,解決數(shù)據(jù)庫(kù)跨庫(kù)查詢的一些思路?聊聊大廠面試官必問(wèn)的 MySQL 鎖機(jī)制

          關(guān)注我

          喜歡就點(diǎn)個(gè)"在看"唄^_^

          瀏覽 70
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  91AV片 | 通野未帆一区二区三区 | 国产射精视频 | 艹逼美女a级毛片 | 国产久久久久久 |