<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          詳述Java線程池實(shí)現(xiàn)原理

          共 66713字,需瀏覽 134分鐘

           ·

          2021-06-09 06:48

          點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號(hào)”

          優(yōu)質(zhì)文章,第一時(shí)間送達(dá)

          一、寫(xiě)在前面

          1.1 線程池是什么

          線程池(Thread Pool) 是一種池化思想管理線程的工具,經(jīng)常出現(xiàn)在多線程服務(wù)器中,如MySQL。

          線程過(guò)多會(huì)帶來(lái)額外的開(kāi)銷,其中包括創(chuàng)建銷毀線程的開(kāi)銷,操作系統(tǒng)調(diào)度線程的開(kāi)銷等等,同時(shí)也降低了計(jì)算機(jī)的整體性能。線程池維護(hù)多個(gè)線程,等待監(jiān)督管理者分配可并發(fā)執(zhí)行的任務(wù)。這種做法,一方面避免了處理任務(wù)是創(chuàng)建銷毀線程開(kāi)銷代價(jià),另一方面避免了線程數(shù)量膨脹導(dǎo)致的過(guò)分調(diào)度問(wèn)題,保證了對(duì)操作系統(tǒng)內(nèi)核的充分利用。

          本文描述的線程池是JDK提供的ThreadPoolExecutor類

          使用線程池帶來(lái)的好處

          • 降低資源消耗:通過(guò)赤化技術(shù)重復(fù)利用已創(chuàng)建的線程,降低想成創(chuàng)建和 銷毀造成的消耗

          • 提高響應(yīng)速度:任務(wù)到達(dá)時(shí),無(wú)需等待線程創(chuàng)建即可立即執(zhí)行

          • 提高線程的可管理性:線程是稀缺資源,如果無(wú)限制創(chuàng)建,不僅會(huì)消耗系統(tǒng)資源,還會(huì)因?yàn)榫€程的不合理分配導(dǎo)致資源調(diào)度失衡,降低系統(tǒng)的穩(wěn)定性。使用線程池可以進(jìn)行統(tǒng)一的分配、調(diào)優(yōu)和監(jiān)控

          • 提供更多更強(qiáng)大的功能:線程池具備可拓展性,允許開(kāi)發(fā)人員向其中增加風(fēng)多的功能。比如延時(shí)定時(shí)線程池ScheduledThreadPoolExecutor,就允許任務(wù)延期執(zhí)行或定期執(zhí)行

          1.2 線程池解決的問(wèn)題是什么

          線程池解決的問(wèn)題就是資源管理的問(wèn)題。在并發(fā)環(huán)境下,系統(tǒng)不能夠確定在任意時(shí)刻有多少任務(wù)需要執(zhí)行,有多少資源需要投入。

          在這種不確定性下將會(huì)帶來(lái)以下若干的問(wèn)題

          • 頻繁申請(qǐng)/銷毀資源和調(diào)度資源,將帶來(lái)額外的開(kāi)銷,可能是非常巨大的

          • 對(duì)資源無(wú)限申請(qǐng)缺少抑制手段,容易引發(fā)系統(tǒng)資源耗盡問(wèn)題的風(fēng)險(xiǎn)

          • 系統(tǒng)無(wú)法合理管理內(nèi)部的資源分布,會(huì)減低系統(tǒng)的穩(wěn)定性

          為了解決資源分配的問(wèn)題,線程池采用“池化”(Pooling)思想。池化,顧名思義,是為了做大化收益并最小化風(fēng)險(xiǎn),而將資源統(tǒng)一在一起管理的一種思想。

          在計(jì)算機(jī)領(lǐng)域池化技術(shù)表現(xiàn)為:統(tǒng)一管理IT資源,包括服務(wù)器資源、存儲(chǔ)、網(wǎng)絡(luò)資源等。通過(guò)共享資源,使用戶在第投入中獲益。

          除去線程池其他比較典型的幾種使用策略包括

          • 內(nèi)存池(Memory Pooling):預(yù)先申請(qǐng)內(nèi)存,提升申請(qǐng)內(nèi)存的速度,減少內(nèi)存碎片

          • 連接池(Connection Pooling):預(yù)先申請(qǐng)數(shù)據(jù)庫(kù)連接,提升申請(qǐng)連接的速度,降低系統(tǒng)開(kāi)銷

          • 實(shí)例池(Object Pooling):循環(huán)使用對(duì)象,減資源在初始化和釋放時(shí)昂貴的損耗

          二、線程池和核心設(shè)計(jì)與實(shí)現(xiàn)

          2.1 總體設(shè)計(jì)

          Java中線程池核心實(shí)現(xiàn)類是ThreadPoolExecutor,本章基于JDK1.8的源碼來(lái)分析Java線程池的核心設(shè)計(jì)與實(shí)現(xiàn)。首先看一下ThreadPoolExecutor的UML圖,了解ThreadPoolExecutor的繼承關(guān)系

          ThreadPoolExecutor實(shí)現(xiàn)的頂層接口是Executor,頂層接口Executor提供了一種思想:將任務(wù)提交和任務(wù)執(zhí)行進(jìn)行解耦。用戶無(wú)需關(guān)注如何創(chuàng)建線程,如何調(diào)度線程來(lái)執(zhí)行任務(wù),用戶只需提供Runnable對(duì)象,將任務(wù)的運(yùn)行邏輯提交到執(zhí)行器Executor中,由Executor框架完成線程的調(diào)配和任務(wù)的執(zhí)行部分。

          ExecutorService

          • 擴(kuò)充執(zhí)行任務(wù)的能力,補(bǔ)充可以為一個(gè)或者一批異步任務(wù)生成Future的方法

          • 提供了管理線程池的方法,比如停止線程池的運(yùn)行

          AbstractExecutorService

          • 串聯(lián)任務(wù)流程,保證下層的實(shí)現(xiàn)只需要關(guān)注一個(gè)執(zhí)行任務(wù)的方法

          ThreadPoolExecutor

          • 維護(hù)自身的生命周期

          • 管理線程和任務(wù),使兩者良好的結(jié)合從而執(zhí)行并行任務(wù)

          ThreadPoolExecutor是如何運(yùn)行,如何同時(shí)維護(hù)線程和執(zhí)行任務(wù)的呢?其運(yùn)行機(jī)制如下圖所示

          ThreadPoolExecutor運(yùn)行流程


          線程池在內(nèi)部實(shí)際上構(gòu)造了一個(gè)生產(chǎn)者消費(fèi)者模型,將線程和任務(wù)兩者解耦,并不直接關(guān)聯(lián),從而良好的管理緩沖任務(wù),復(fù)用線程。線程池的運(yùn)行主要分成兩部分::任務(wù)管理、線程管理。任務(wù)管理充當(dāng)生產(chǎn)者角色,當(dāng)任務(wù)提交后,線程池會(huì)判斷該任務(wù)后續(xù)流轉(zhuǎn)

          • 任務(wù)申請(qǐng)線程執(zhí)行該任務(wù)

          • 緩沖到隊(duì)列中等待線程執(zhí)行

          • 拒絕該任務(wù)

          線程管理部分是消費(fèi)者,它們被統(tǒng)一維護(hù)在線程池內(nèi),根據(jù)任務(wù)請(qǐng)求進(jìn)行線程的分配,當(dāng)線程執(zhí)行完任務(wù)后會(huì)繼續(xù)獲取新的任務(wù)執(zhí)行,最終獲取不到任務(wù)的時(shí)候,線程會(huì)被回收。

          接下來(lái)按照如下三個(gè)方面講解線程池的運(yùn)行機(jī)制:

          • 線程池如何維護(hù)自身狀態(tài)

          • 線程池如何管理任務(wù)

          • 線程池如何管理線程

          2.2 生命周期管理

          線程池運(yùn)行的狀態(tài),并不是用戶顯式設(shè)置的,而是伴隨著線程池的運(yùn)行,由內(nèi)部來(lái)維 護(hù)。線程池內(nèi)部使用一個(gè)變量維護(hù)兩個(gè)值:運(yùn)行狀態(tài) (runState) 和線程數(shù)量 (workerCount)。在具體實(shí)現(xiàn)中,線程池將運(yùn)行狀態(tài) (runState)、線程數(shù)量 (workerCount)

          兩個(gè)關(guān)鍵參數(shù)的維護(hù)放在了一起,如下代碼所示:

          private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));


          ctl 這個(gè) AtomicInteger 類型,是對(duì)線程池的運(yùn)行狀態(tài)和線程池中有效線程的數(shù)量 進(jìn)行控制的一個(gè)字段,它同時(shí)包含兩部分的信息:線程池的運(yùn)行狀態(tài) (runState) 和 線程池內(nèi)有效線程的數(shù)量 (workerCount),高 3 位保存 runState,低 29 位保存 workerCount,兩個(gè)變量之間互不干擾。用一個(gè)變量去存儲(chǔ)兩個(gè)值,可避免在做相關(guān) 決策時(shí),出現(xiàn)不一致的情況,不必為了維護(hù)兩者的一致,而占用鎖資源。通過(guò)閱讀線 程池源代碼也可以發(fā)現(xiàn),經(jīng)常出現(xiàn)要同時(shí)判斷線程池運(yùn)行狀態(tài)和線程數(shù)量的情況。線程池也提供了若干方法去供用戶獲得線程池當(dāng)前的運(yùn)行狀態(tài)、線程個(gè)數(shù)。這里都使用的是位運(yùn)算的方式,相比于基本運(yùn)算,速度也會(huì)快很多

          關(guān)于內(nèi)部封裝的獲取生命周期狀態(tài)、獲取線程池線程數(shù)量的計(jì)算方法如以下代碼 所示:

          // Packing and unpacking ctl
          // 計(jì)算當(dāng)前運(yùn)行狀態(tài)
          private static int runStateOf(int c)     { return c & ~CAPACITY; } 
          // 計(jì)算當(dāng)前線程數(shù)據(jù)
          private static int workerCountOf(int c)  { return c & CAPACITY; }
          // 通過(guò)狀態(tài)和線程數(shù)生成ctl
          private static int ctlOf(int rs, int wc) { return rs | wc; }

          ThreadPoolExecutor 的運(yùn)行狀態(tài)有 5 種,分別為:

          // runState is stored in the high-order bits
          private static final int RUNNING    = -1 << COUNT_BITS;
          private static final int SHUTDOWN   =  0 << COUNT_BITS;
          private static final int STOP       =  1 << COUNT_BITS;
          private static final int TIDYING    =  2 << COUNT_BITS;
          private static final int TERMINATED =  3 << COUNT_BITS;

          運(yùn)行狀態(tài)狀態(tài)描述
          RUNNING能接受新提交的任務(wù),并且也能處理阻塞隊(duì)列中的任務(wù)
          SHUTDOWN狀態(tài)關(guān)閉,不在接受新提交的任務(wù),但是能繼續(xù)處理阻塞隊(duì)列已保存的讓任務(wù)
          STOP不接受新任務(wù),也不處理隊(duì)列中的任務(wù),會(huì)中斷正在處理任務(wù)的線程
          TIDYING所有讓任務(wù)都已終止,workerCount(有效處理讓任務(wù)線程)狀態(tài)為0
          TERMINATED在terminated()方法執(zhí)行結(jié)束后進(jìn)入該狀態(tài)

          其生命周期轉(zhuǎn)換如下圖所示

          線程池生命周期


          2.3 任務(wù)調(diào)度機(jī)制

          2.3.1 任務(wù)調(diào)度

          任務(wù)調(diào)度是線程池的主要入口,當(dāng)用戶提交了一個(gè)任務(wù),接下來(lái)這個(gè)任務(wù)將如何執(zhí)行 都是由這個(gè)階段決定的。了解這部分就相當(dāng)于了解了線程池的核心運(yùn)行機(jī)制。首先,所有任務(wù)的調(diào)度都是由 execute 方法完成的,這部分完成的工作是:檢查現(xiàn)在線程池的運(yùn)行狀態(tài)、運(yùn)行線程數(shù)、運(yùn)行策略,決定接下來(lái)執(zhí)行的流程,是直接申請(qǐng)線程執(zhí)行,或是緩沖到隊(duì)列中執(zhí)行,亦或是直接拒絕該任務(wù)。其執(zhí)行過(guò)程如下:

          • 首先檢測(cè)線程池運(yùn)行狀態(tài),如果不是 RUNNING,則直接拒絕,線程池要保證在 RUNNING 的狀態(tài)下執(zhí)行任務(wù)

          • 如果 workerCount < corePoolSize,則創(chuàng)建并啟動(dòng)一個(gè)線程來(lái)執(zhí)行新提交的任務(wù)

          • 如果 workerCount >= corePoolSize,且線程池內(nèi)的阻塞隊(duì)列未滿,則將任務(wù)添加到該阻塞隊(duì)列中。

          • 如 果 workerCount >= corePoolSize && workerCount < maximumPoolSize,且線程池內(nèi)的阻塞隊(duì)列已滿,則創(chuàng)建并啟動(dòng)一個(gè)線程來(lái)執(zhí)行新提交的任務(wù)。

          • 如果 workerCount >= maximumPoolSize,并且線程池內(nèi)的阻塞隊(duì)列已滿 , 則根據(jù)拒絕策略來(lái)處理該任務(wù) , 默認(rèn)的處理方式是直接拋異常。

          其執(zhí)行流程如下

          任務(wù)調(diào)度流程圖


          2.3.2 任務(wù)緩沖

          任務(wù)緩沖模塊是線程池能夠管理任務(wù)的核心部分。線程池的本質(zhì)是對(duì)任務(wù)和線程的管 理,而做到這一點(diǎn)最關(guān)鍵的思想就是將任務(wù)和線程兩者解耦,不讓兩者直接關(guān)聯(lián),才 可以做后續(xù)的分配工作。線程池中是以生產(chǎn)者消費(fèi)者模式,通過(guò)一個(gè)阻塞隊(duì)列來(lái)實(shí)現(xiàn) 的。阻塞隊(duì)列緩存任務(wù),工作線程從阻塞隊(duì)列中獲取任務(wù)。

          阻塞隊(duì)列 (BlockingQueue) 是一個(gè)支持兩個(gè)附加操作的隊(duì)列。這兩個(gè)附加的操作是:在隊(duì)列為空時(shí),獲取元素的線程會(huì)等待隊(duì)列變?yōu)榉强铡.?dāng)隊(duì)列滿時(shí),存儲(chǔ)元素的線程 會(huì)等待隊(duì)列可用。阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場(chǎng)景,生產(chǎn)者是往隊(duì)列里添加元 素的線程,消費(fèi)者是從隊(duì)列里拿元素的線程。阻塞隊(duì)列就是生產(chǎn)者存放元素的容器,而消費(fèi)者也只從容器里拿元素。

          阻塞隊(duì)列


          使用不同的隊(duì)列可以實(shí)現(xiàn)不一樣的任務(wù)存取策略。在這里,我們可以再介紹下阻塞隊(duì)列的成員:

          名稱描述
          ArrayBlockingQueue一個(gè)用數(shù)組實(shí)現(xiàn)的有界阻塞隊(duì)列,此隊(duì)列按照先進(jìn)先出(FIFO)的原則對(duì)元素進(jìn)行排序。支持公平鎖和非公平鎖
          LinkedBlockingDeque一個(gè)由鏈表結(jié)構(gòu)組成的有界隊(duì)列,此隊(duì)列按照先進(jìn)先出(FIFO)的原則對(duì)元素進(jìn)行排序。此隊(duì)列的默認(rèn)長(zhǎng)度為Integer.MAX_VALUE,所以默認(rèn)創(chuàng)建此隊(duì)列有容量危險(xiǎn)
          PriorityBlockingQueue一個(gè)支持線程優(yōu)先級(jí)排序的無(wú)界隊(duì)列,默認(rèn)自然進(jìn)行排序,也可以自定義實(shí)現(xiàn)compareTo()方法指定排序故障,不能保證同優(yōu)先級(jí)元素的順序。
          DelayQueue一個(gè)實(shí)現(xiàn)PriorityBlockingQueue實(shí)現(xiàn)延遲獲取的無(wú)界隊(duì)列,在創(chuàng)建元素時(shí),可以指定多久才能從隊(duì)列中獲取當(dāng)前元素。只有延遲期滿后才能從隊(duì)列中獲取元素。
          SynchronousQueue一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列,每個(gè)put操作必須等待take操作,否則不能添加元素。支持公平鎖和非公平鎖。SynchronousQueue的一個(gè)使用場(chǎng)景是在線程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,這個(gè)線程池根據(jù)需要(新任務(wù)來(lái))創(chuàng)建新的線程,如果有空閑的線程就使用空閑線程,線程空閑60秒會(huì)被回收。
          return new ThreadPoolExecutor(
          0, 
          Integer.MAX_VALUE, 
          60L, TimeUnit.SECONDS, 
          new SynchronousQueue());
          LinkedTransferQueue一個(gè)由鏈表結(jié)構(gòu)組成的無(wú)界阻塞隊(duì)列,相當(dāng)于其他隊(duì)列,LinkedTransferQueue多了transfer和tryTransfer方法
          LinkedBlockingQueue一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列,隊(duì)列的頭部和尾部都可以插入和刪除元素,多線程并發(fā)時(shí),可以將鎖的競(jìng)爭(zhēng)最多降到一半

          2.3.3 任務(wù)申請(qǐng)

          由上文的任務(wù)分配部分可知,任務(wù)的執(zhí)行有兩種可能:一種是任務(wù)直接由新創(chuàng)建的線 程執(zhí)行。另一種是線程從任務(wù)隊(duì)列中獲取任務(wù)然后執(zhí)行,執(zhí)行完任務(wù)的空閑線程會(huì)再 次去從隊(duì)列中申請(qǐng)任務(wù)再去執(zhí)行。第一種情況僅出現(xiàn)在線程初始創(chuàng)建的時(shí)候,第二種 是線程獲取任務(wù)絕大多數(shù)的情況。

          線程需要從任務(wù)緩存模塊中不斷地取任務(wù)執(zhí)行,幫助線程從阻塞隊(duì)列中獲取任務(wù),實(shí)現(xiàn)線程管理模塊和任務(wù)管理模塊之間的通信。這部分策略由 getTask 方法實(shí)現(xiàn),其 執(zhí)行流程如下圖所示:

          線程獲取任務(wù)的流程


          getTask 這部分進(jìn)行了多次判斷,為的是控制線程的數(shù)量,使其符合線程池的狀 態(tài)。如果線程池現(xiàn)在不應(yīng)該持有那么多線程,則會(huì)返回 null 值。工作線程 Worker 會(huì)不斷接收新任務(wù)去執(zhí)行,而當(dāng)工作線程 Worker 接收不到任務(wù)的時(shí)候,就會(huì)開(kāi)始 被回收。

          源碼分析

          private Runnable getTask() {
              boolean timedOut = false; // Did the last poll() time out?

              for (;;) {
                  int c = ctl.get();
                  int rs = runStateOf(c);

                  // Check if queue empty only if necessary.
                  // 判斷線程池是否已停止運(yùn)行
                  if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                      decrementWorkerCount();
                      return null;
                  }

                  int wc = workerCountOf(c);

                  // Are workers subject to culling?
                  boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            // 判斷線程現(xiàn)階段是否夠多
                  if ((wc > maximumPoolSize || (timed && timedOut))
                      && (wc > 1 || workQueue.isEmpty())) {
                      if (compareAndDecrementWorkerCount(c))
                          return null;
                      continue;
                  }
            // 限時(shí)任務(wù)獲取和阻塞獲取
                  try {
                      Runnable r = timed ?
                          workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                      workQueue.take();
                      if (r != null)
                          return r;
                      timedOut = true;
                  } catch (InterruptedException retry) {
                      timedOut = false;
                  }
              }
          }

          2.3.4 任務(wù)拒絕

          任務(wù)拒絕模塊是線程池的保護(hù)部分,線程池有一個(gè)最大的容量,當(dāng)線程池的任務(wù)緩存 隊(duì)列已滿,并且線程池中的線程數(shù)目達(dá)到 maximumPoolSize 時(shí),就需要拒絕掉該任務(wù),采取任務(wù)拒絕策略,保護(hù)線程池。

          拒絕策略是一個(gè)接口,其設(shè)計(jì)如下:

          public interface RejectedExecutionHandler {

              /**
               * Method that may be invoked by a {@link ThreadPoolExecutor} when
               * {@link ThreadPoolExecutor#execute execute} cannot accept a
               * task.  This may occur when no more threads or queue slots are
               * available because their bounds would be exceeded, or upon
               * shutdown of the Executor.
               *
               * <p>In the absence of other alternatives, the method may throw
               * an unchecked {@link RejectedExecutionException}, which will be
               * propagated to the caller of {@code execute}.
               *
               * @param r the runnable task requested to be executed
               * @param executor the executor attempting to execute this task
               * @throws RejectedExecutionException if there is no remedy
               */
              void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
          }

          用戶可以通過(guò)實(shí)現(xiàn)這個(gè)接口去定制拒絕策略,也可以選擇 JDK 提供的四種已有拒絕策略,其特點(diǎn)如下

          名稱描述
          ThreadPoolExecutor.AbortPolicy丟棄任務(wù)并拋出RejectedExecutionException異常。這是線程池默認(rèn)的拒絕策略,在任務(wù)不能在提交的時(shí)候,拋出異常,及時(shí)反饋程序運(yùn)行狀態(tài)。如果是比較關(guān)鍵的業(yè)務(wù),推薦使用該策略,這樣子在系統(tǒng)不能承載更大并發(fā)的時(shí)候,能過(guò)及時(shí)的通過(guò)異常發(fā)現(xiàn)。
          ThreadPoolExecutor.DiscardPolicy丟棄任務(wù),但是不拋出異常。使用該策略,可能會(huì)使我們無(wú)法發(fā)現(xiàn)系統(tǒng)的異常狀態(tài)。建議一些無(wú)關(guān)緊要的業(yè)務(wù)采用此策略。
          ThreadPoolExecutor.DiscardOldestPolicy丟棄隊(duì)列最前面的任務(wù),然后重新提交比拒接的任務(wù)。是否要采用此種策略,需要根據(jù)實(shí)際業(yè)務(wù)是否允許丟棄老任務(wù)來(lái)認(rèn)真衡量
          ThreadPoolExecutor.CallerRunsPolicy由調(diào)用線程(提交任務(wù)的線程)來(lái)處理任務(wù)。這種情況是需要讓所有的任務(wù)都執(zhí)行完畢,那么就適合大量計(jì)算的任務(wù)類型去執(zhí)行,多線程僅僅是增加大吞吐量的手段,最終必須要讓每個(gè)任務(wù)都執(zhí)行
          /**
               * A handler for rejected tasks that runs the rejected task
               * directly in the calling thread of the {@code execute} method,
               * unless the executor has been shut down, in which case the task
               * is discarded.
               */
          public static class CallerRunsPolicy implements RejectedExecutionHandler {
              /**
                   * Creates a {@code CallerRunsPolicy}.
                   */
              public CallerRunsPolicy() { }

              /**
                   * Executes task r in the caller's thread, unless the executor
                   * has been shut down, in which case the task is discarded.
                   *
                   * @param r the runnable task requested to be executed
                   * @param e the executor attempting to execute this task
                   */
              public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                  if (!e.isShutdown()) {
                      r.run();
                  }
              }
          }

          /**
               * A handler for rejected tasks that throws a
               * {@code RejectedExecutionException}.
               */
          public static class AbortPolicy implements RejectedExecutionHandler {
              /**
                   * Creates an {@code AbortPolicy}.
                   */
              public AbortPolicy() { }

              /**
                   * Always throws RejectedExecutionException.
                   *
                   * @param r the runnable task requested to be executed
                   * @param e the executor attempting to execute this task
                   * @throws RejectedExecutionException always
                   */
              public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                  throw new RejectedExecutionException("Task " + r.toString() +
                                                       " rejected from " +
                                                       e.toString());
              }
          }

          /**
               * A handler for rejected tasks that silently discards the
               * rejected task.
               */
          public static class DiscardPolicy implements RejectedExecutionHandler {
              /**
                   * Creates a {@code DiscardPolicy}.
                   */
              public DiscardPolicy() { }

              /**
                   * Does nothing, which has the effect of discarding task r.
                   *
                   * @param r the runnable task requested to be executed
                   * @param e the executor attempting to execute this task
                   */
              public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
              }
          }

          /**
               * A handler for rejected tasks that discards the oldest unhandled
               * request and then retries {@code execute}, unless the executor
               * is shut down, in which case the task is discarded.
               */
          public static class DiscardOldestPolicy implements RejectedExecutionHandler {
              /**
                   * Creates a {@code DiscardOldestPolicy} for the given executor.
                   */
              public DiscardOldestPolicy() { }

              /**
                   * Obtains and ignores the next task that the executor
                   * would otherwise execute, if one is immediately available,
                   * and then retries execution of task r, unless the executor
                   * is shut down, in which case task r is instead discarded.
                   *
                   * @param r the runnable task requested to be executed
                   * @param e the executor attempting to execute this task
                   */
              public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                  if (!e.isShutdown()) {
                      e.getQueue().poll();
                      e.execute(r);
                  }
              }
          }

          2.4 Worker線程管理

          2.4.1 Worker線程

          線程池為了掌握線程的狀態(tài)并維護(hù)線程的生命周期,設(shè)計(jì)了線程池內(nèi)的工作線程Worker。

          Java Worker源碼部分

          private final class Worker
                  extends AbstractQueuedSynchronizer
                  implements Runnable
              {
                  /** Thread this worker is running in.  Null if factory fails. */
                  // worker持有的線程
                  final Thread thread;
                  /** Initial task to run.  Possibly null. */
                  // 初始化的任務(wù),可以為null
                  Runnable firstTask;
              
                  ...
          }

          Worker 這個(gè)工作線程,實(shí)現(xiàn)了 Runnable 接口,并持有一個(gè)線程 thread,一個(gè)初始化的任務(wù) firstTask。thread 是在調(diào)用構(gòu)造方法時(shí)通過(guò) ThreadFactory 來(lái)創(chuàng)建的線程,可以用來(lái)執(zhí)行任務(wù);firstTask 用它來(lái)保存?zhèn)魅氲牡谝粋€(gè)任務(wù),這個(gè)任務(wù)可以有也可以為 null。如果這個(gè)值是非空的,那么線程就會(huì)在啟動(dòng)初期立即執(zhí)行這個(gè)任務(wù),也就對(duì)應(yīng)核心線程創(chuàng)建時(shí)的情況;如果這個(gè)值是 null,那么就需要?jiǎng)?chuàng)建一個(gè)線程去執(zhí)行任務(wù)列表(workQueue)中的任務(wù),也就是非核心線程的創(chuàng)建

          /**
               * The queue used for holding tasks and handing off to worker
               * threads.  We do not require that workQueue.poll() returning
               * null necessarily means that workQueue.isEmpty(), so rely
               * solely on isEmpty to see if the queue is empty (which we must
               * do for example when deciding whether to transition from
               * SHUTDOWN to TIDYING).  This accommodates special-purpose
               * queues such as DelayQueues for which poll() is allowed to
               * return null even if it may later return non-null when delays
               * expire.
               */
          # workerQueue 源碼定義
          private final BlockingQueue<Runnable> workQueue;

          worker執(zhí)行任務(wù)


          線程池需要管理線程的生命周期,需要在線程長(zhǎng)時(shí)間不運(yùn)行的時(shí)候進(jìn)行回收。線程池 使用一張 Hash 表去持有線程的引用,這樣可以通過(guò)添加引用、移除引用這樣的操作 來(lái)控制線程的生命周期。這個(gè)時(shí)候重要的就是如何判斷線程是否在運(yùn)行。

          /**
               * Set containing all worker threads in pool. Accessed only when
               * holding mainLock.
               */
          private final HashSet<Worker> workers = new HashSet<Worker>();

          Worker 是通過(guò)繼承 AQS,使用 AQS 來(lái)實(shí)現(xiàn)獨(dú)占鎖這個(gè)功能。沒(méi)有使用可重入鎖ReentrantLock,而是使用 AQS,為的就是實(shí)現(xiàn)不可重入的特性去反應(yīng)線程現(xiàn)在的執(zhí)行狀態(tài)。

          private final class Worker
                  extends AbstractQueuedSynchronizer
                  implements Runnable

          • lock方法一旦獲取了獨(dú)占鎖,表示當(dāng)前線程正在執(zhí)行任務(wù)中

          • 如果正在執(zhí)行任務(wù),則不應(yīng)該中斷線程

          • 如果該線程現(xiàn)在不是獨(dú)占鎖狀態(tài),也就是空閑狀態(tài),說(shuō)明它沒(méi)有正在處理任務(wù),這時(shí)可以對(duì)該線程進(jìn)行中斷

          • 線程池在執(zhí)行shutdown方法或tryTeriminate方法是或調(diào)用interruptIdleWorkers方法來(lái)中斷空閑線程,interruptIdleWorkers方法會(huì)使用tryLock方法來(lái)判斷線程池中的線程是否是空閑狀態(tài),如果是空閑狀態(tài)則可以安全回收

          shutdown方法源碼

           public void shutdown() {
                  final ReentrantLock mainLock = this.mainLock;
                  mainLock.lock();
                  try {
                      checkShutdownAccess();
                      advanceRunState(SHUTDOWN);
                      // 執(zhí)行interruptIdleWorkers方法
                      interruptIdleWorkers();
                      onShutdown(); // hook for ScheduledThreadPoolExecutor
                  } finally {
                      mainLock.unlock();
                  }
                  tryTerminate();
              }

          tryTerminate方法源碼

          final void tryTerminate() {
                  for (;;) {
                      int c = ctl.get();
                      if (isRunning(c) ||
                          runStateAtLeast(c, TIDYING) ||
                          (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                          return;
                      if (workerCountOf(c) != 0) { // Eligible to terminate
                          // 執(zhí)行interruptIdleWorkers
                          interruptIdleWorkers(ONLY_ONE);
                          return;
                      }

                      final ReentrantLock mainLock = this.mainLock;
                      mainLock.lock();
                      try {
                          if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                              try {
                                  terminated();
                              } finally {
                                  ctl.set(ctlOf(TERMINATED, 0));
                                  termination.signalAll();
                              }
                              return;
                          }
                      } finally {
                          mainLock.unlock();
                      }
                      // else retry on failed CAS
                  }
              }

          interruptIdleWorkers方法源碼

           private void interruptIdleWorkers(boolean onlyOne) {
                  final ReentrantLock mainLock = this.mainLock;
                  mainLock.lock();
                  try {
                      for (Worker w : workers) {
                          Thread t = w.thread;
                          if (!t.isInterrupted() && w.tryLock()) {
                              try {
                                  t.interrupt();
                              } catch (SecurityException ignore) {
                              } finally {
                                  w.unlock();
                              }
                          }
                          if (onlyOne)
                              break;
                      }
                  } finally {
                      mainLock.unlock();
                  }
              }

          在線程回收過(guò)程中就使用到了這種特性,回收過(guò)程如下圖所示:

          線程池回收過(guò)程

          2.4.2 worker線程增加

          增加線程是通過(guò)線程池中的 addWorker 方法,該方法的功能就是增加一個(gè)線程, 該方法不考慮線程池是在哪個(gè)階段增加的該線程,這個(gè)分配線程的策略是在上個(gè)步 驟完成的,該步驟僅僅完成增加線程,并使它運(yùn)行,最后返回是否成功這個(gè)結(jié)果。addWorker 方法有兩個(gè)參數(shù):firstTask、core。firstTask 參數(shù)用于指定新增的線程執(zhí)行的第一個(gè)任務(wù),該參數(shù)可以為空;core 參數(shù)為 true 表示在新增線程時(shí)會(huì)判斷當(dāng)前活動(dòng)線程數(shù)是否少于 corePoolSize,false 表示新增線程前需要判斷當(dāng)前活動(dòng)線程數(shù)是否少于 maximumPoolSize,其執(zhí)行流程如下圖所示:

          申請(qǐng)線程執(zhí)行流程圖


          源碼分析

           private boolean addWorker(Runnable firstTask, boolean core) {
                  retry:
                  for (;;) {
                      int c = ctl.get();
                      int rs = runStateOf(c);

                      // 判斷線程是否已經(jīng)停止
                      // 判斷線程是否正在停止 如果是則判斷線程是否用于執(zhí)行剩余任務(wù)firstTask
                      // workQueue是否為空
                      if (rs >= SHUTDOWN &&
                          ! (rs == SHUTDOWN &&
                             firstTask == null &&
                             ! workQueue.isEmpty()))
                          return false;

                      for (;;) {
                          // 獲取線程數(shù)量
                          int wc = workerCountOf(c);
                          // 判斷線程是否超過(guò)容量 
                          // 判斷線程是否超過(guò)對(duì)應(yīng)核心數(shù)  上面講了core 傳true/false區(qū)別
                          if (wc >= CAPACITY ||
                              wc >= (core ? corePoolSize : maximumPoolSize))
                              return false;
                         
                          if (compareAndIncrementWorkerCount(c))
                              break retry;
                          c = ctl.get();  // Re-read ctl
                          if (runStateOf(c) != rs)
                              continue retry;
                          // else CAS failed due to workerCount change; retry inner loop
                      }
                  }

                  // 嘗試登記線程
                  boolean workerStarted = false;
                  boolean workerAdded = false;
                  Worker w = null;
                  try {
                      w = new Worker(firstTask);
                      final Thread t = w.thread;
                      if (t != null) {
                          // 加鎖
                          final ReentrantLock mainLock = this.mainLock;
                          mainLock.lock();
                          try {
                              // Recheck while holding lock.
                              // Back out on ThreadFactory failure or if
                              // shut down before lock acquired.
                              int rs = runStateOf(ctl.get());
               // 判斷線程池狀態(tài)是否改變
                              if (rs < SHUTDOWN ||
                                  (rs == SHUTDOWN && firstTask == null)) {
                                  if (t.isAlive()) // precheck that t is startable
                                      throw new IllegalThreadStateException();
                                  // 增加線程
                                  workers.add(w);
                                  int s = workers.size();
                                  if (s > largestPoolSize)
                                      largestPoolSize = s;
                                  workerAdded = true;
                              }
                          } finally {
                              // 釋放鎖
                              mainLock.unlock();
                          }
                          // 增加成功啟動(dòng)線程
                          if (workerAdded) {
                              t.start();
                              workerStarted = true;
                          }
                      }
                  } finally {
                      if (! workerStarted)
                          addWorkerFailed(w);
                  }
                  return workerStarted;
              }

          2.4.3 worker線程回收

          線程池中線程的銷毀依賴 JVM 自動(dòng)的回收,線程池做的工作是根據(jù)當(dāng)前線程池的狀態(tài)維護(hù)一定數(shù)量的線程引用,防止這部分線程被 JVM 回收,當(dāng)線程池決定哪些線 程需要回收時(shí),只需要將其引用消除即可。Worker 被創(chuàng)建出來(lái)后,就會(huì)不斷地進(jìn)行輪詢,然后獲取任務(wù)去執(zhí)行,核心線程可以無(wú)限等待獲取任務(wù),非核心線程要限時(shí)獲取任務(wù)。當(dāng) Worker 無(wú)法獲取到任務(wù),也就是獲取的任務(wù)為空時(shí),循環(huán)會(huì)結(jié)束,Worker 會(huì)主動(dòng)消除自身在線程池內(nèi)的引用。

          final void runWorker(Worker w) {
                  Thread wt = Thread.currentThread();
                  Runnable task = w.firstTask;
                  w.firstTask = null;
                  w.unlock(); // allow interrupts
                  boolean completedAbruptly = true;
                  try {
                      while (task != null || (task = getTask()) != null) {
                          // 執(zhí)行任務(wù)
                  } finally {
                          // 獲取不到任務(wù),主動(dòng)回收自己
                      processWorkerExit(w, completedAbruptly);
                  }
              }
              
              
              
           private void processWorkerExit(Worker w, boolean completedAbruptly) {
                  if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
                      decrementWorkerCount();

                  final ReentrantLock mainLock = this.mainLock;
                  mainLock.lock();
                  try {
                      completedTaskCount += w.completedTasks;
                      // 回收
                      workers.remove(w);
                  } finally {
                      mainLock.unlock();
                  }
            ...
              }

          事實(shí)上,在這個(gè)方法中,將線程引用移出線程池就已經(jīng)結(jié)束了線程銷毀的部分。但由于引起線程銷毀的可能性有很多,線程池還要判斷是什么引發(fā)了這次銷毀,是否要改變線程池的現(xiàn)階段狀態(tài),是否要根據(jù)新?tīng)顟B(tài),重新分配線程。

          線程銷毀流程


          2.4.4 worker線程執(zhí)行任務(wù)

          在 Worker 類中的 run 方法調(diào)用了 runWorker 方法來(lái)執(zhí)行任務(wù),runWorker 方法的執(zhí)行過(guò)程如下:

          • while循環(huán)不斷獲取getTask()方法獲取任務(wù)

          • getTask()方法從阻塞隊(duì)列獲取任務(wù)

          • 如果線程池正在停止,那么保證當(dāng)前線程是中斷狀態(tài),否則要保證當(dāng)前線程不是中斷狀態(tài)

          • 執(zhí)行任務(wù)

          • 如果getTask結(jié)果為null則調(diào)出循環(huán),執(zhí)行processWorkerExit(),銷毀線程

            執(zhí)行任務(wù)流程



          2.4.5 worker如何保證核心線程不被回收

          源碼分析

          我們通常都是通過(guò)執(zhí)行execute(Runnable command)方法來(lái)向線程池提交一個(gè)不需要返回結(jié)果的任務(wù)的如果你需要返回結(jié)果那么就是 <T> Future<T> submit(Callable<T> task)方法)

          • 第一步:execute方法分析

          public void execute(Runnable command) {
               // 提交任務(wù)為null 拋出異常
                  if (command == null)
                      throw new NullPointerException();
              // 獲取線程池狀態(tài)\線程池線程數(shù)據(jù)
                  int c = ctl.get();
               // 小于核心線程數(shù) addWorker()
                  if (workerCountOf(c) < corePoolSize) {
                      if (addWorker(commandtrue))
                          return;
                      c = ctl.get();
                  }
               // 大于核心線程數(shù),當(dāng)前線程池是運(yùn)行狀態(tài),向阻塞隊(duì)列中添加任務(wù)
                  if (isRunning(c) && workQueue.offer(command)) {
                      int recheck = ctl.get();
                      if (! isRunning(recheck) && remove(command))
                          reject(command);
                      else if (workerCountOf(recheck) == 0)
                          addWorker(null, false);
                  }
               // 隊(duì)列添加失敗 拒絕策略處理
                  else if (!addWorker(commandfalse))
                      reject(command);
              }

          • 第二步:addWorker()方法分析

           private boolean addWorker(Runnable firstTask, boolean core) {
                  retry:
                 // 死循環(huán)
                  for (;;) {
                      int c = ctl.get();
                      int rs = runStateOf(c);
             // 如果當(dāng)前線程狀態(tài)是SHUTDOWN STOP TIDYING TERMINATED 并且SHUTDOWN狀態(tài)時(shí)任務(wù)隊(duì)列為空 返回false
                      // Check if queue empty only if necessary.
                      if (rs >= SHUTDOWN &&
                          ! (rs == SHUTDOWN &&
                             firstTask == null &&
                             ! workQueue.isEmpty()))
                          return false;
             // 死循環(huán)
                      for (;;) {
                          int wc = workerCountOf(c);
                          // core參數(shù) true corePoolSize核心線程數(shù) false maximumPoolSize最大線程數(shù)
                          // CAPACITY integer最大值 (1 << COUNT_BITS) - 1;
                          if (wc >= CAPACITY ||
                              wc >= (core ? corePoolSize : maximumPoolSize))
                              return false;
                          // 如果增加任務(wù)成功,退出該循環(huán)執(zhí)行下面代碼,否則繼續(xù)
                          if (compareAndIncrementWorkerCount(c))
                              break retry;
                          c = ctl.get();  // Re-read ctl
                          if (runStateOf(c) != rs)
                              continue retry;
                          // else CAS failed due to workerCount change; retry inner loop
                      }
                  }

                  boolean workerStarted = false;
                  boolean workerAdded = false;
                  Worker w = null;
                  try {
                      // 重點(diǎn)代碼 后續(xù)分析
                      w = new Worker(firstTask);
                      final Thread t = w.thread;
                      if (t != null) {
                          // 內(nèi)置鎖 加鎖
                          final ReentrantLock mainLock = this.mainLock;
                          mainLock.lock();
                          try {
                              // Recheck while holding lock.
                              // Back out on ThreadFactory failure or if
                              // shut down before lock acquired.
                              int rs = runStateOf(ctl.get());
               // 判斷線程池狀態(tài),防止使用過(guò)程中線程池被關(guān)閉
                              if (rs < SHUTDOWN ||
                                  (rs == SHUTDOWN && firstTask == null)) {
                                  if (t.isAlive()) // precheck that t is startable
                                      throw new IllegalThreadStateException();
                                  // 向正在被執(zhí)行的任務(wù)隊(duì)列workers中添加worker 
                                  // 注意區(qū)分 
                                  // HashSet<Worker> workers = new HashSet<Worker>()  線程池中線程
                                  // private final BlockingQueue<Runnable> workQueue 等待被執(zhí)行的任務(wù)
                                  workers.add(w);
                                  int s = workers.size();
                                  // 記錄任務(wù)最大數(shù)
                                  if (s > largestPoolSize)
                                      largestPoolSize = s;
                                  // 添加任務(wù)成功
                                  workerAdded = true;
                              }
                          } finally {
                              // 釋放鎖
                              mainLock.unlock();
                          }
                          // 添加任務(wù)成功,那么開(kāi)始執(zhí)行任務(wù)
                          if (workerAdded) {
                              // 重點(diǎn)代碼 -- 我們需要查看worker中的run()
                              t.start();
                              workerStarted = true;
                          }
                      }
                  } finally {
                      if (! workerStarted)
                          addWorkerFailed(w);
                  }
                  return workerStarted;
              }

          • 第三步:查看worker中的run()

           /** Delegates main run loop to outer runWorker  */
          public void run() {
              runWorker(this);
          }

          • 第四步:查看runWorker()

           final void runWorker(Worker w) {
                  Thread wt = Thread.currentThread();
                // 獲取worker對(duì)象中的任務(wù) 可以為null 
                  Runnable task = w.firstTask;
                  w.firstTask = null;
                  w.unlock(); // allow interrupts
                  boolean completedAbruptly = true;
                  try {
                      // 死循環(huán)
                      // 判斷任務(wù)是否為空,如果為空則getTask()獲取任務(wù)
                      while (task != null || (task = getTask()) != null) {
                          w.lock();
                          // If pool is stopping, ensure thread is interrupted;
                          // if not, ensure thread is not interrupted.  This
                          // requires a recheck in second case to deal with
                          // shutdownNow race while clearing interrupt
                          if ((runStateAtLeast(ctl.get(), STOP) ||
                               (Thread.interrupted() &&
                                runStateAtLeast(ctl.get(), STOP))) &&
                              !wt.isInterrupted())
                              wt.interrupt();
                          try {
                              // 任務(wù)執(zhí)行前調(diào)用
                              beforeExecute(wt, task);
                              Throwable thrown = null;
                              try {
                                  task.run();
                              } catch (RuntimeException x) {
                                  thrown = x; throw x;
                              } catch (Error x) {
                                  thrown = x; throw x;
                              } catch (Throwable x) {
                                  thrown = x; throw new Error(x);
                              } finally {
                                  // 任務(wù)執(zhí)行后調(diào)用
                                  afterExecute(task, thrown);
                              }
                          } finally {
                              // 重點(diǎn)代碼,執(zhí)行完任務(wù)將task設(shè)置為null 則會(huì)從getTask()重新獲取
                              task = null;
                              w.completedTasks++;
                              w.unlock();
                          }
                      }
                      completedAbruptly = false;
                  } finally {
                      // 回收worker 
                      processWorkerExit(w, completedAbruptly);
                  }
              }

          我們可以看到beforeExecute(Thread t, Runnable r)方法和afterExecute(Runnable r, Throwable t)會(huì)在任務(wù)的執(zhí)行前后執(zhí)行,我們可以通過(guò)繼承線程池的方式來(lái)重寫(xiě)這兩個(gè)方法,這樣就能夠?qū)θ蝿?wù)的執(zhí)行進(jìn)行監(jiān)控啦。

          • 第五步:查看getTask()

          private Runnable getTask() {
                  boolean timedOut = false; // Did the last poll() time out?
            // 死循環(huán)
                  for (;;) {
                      int c = ctl.get();
                      int rs = runStateOf(c);
             // 判斷線程池狀態(tài)
                      // Check if queue empty only if necessary.
                      if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                          decrementWorkerCount();
                          return null;
                      }
             // 統(tǒng)計(jì)worker
                      int wc = workerCountOf(c);

                      // 如果設(shè)置了allowCoreThreadTimeOut(true) 或者當(dāng)前運(yùn)行的統(tǒng)計(jì)worker數(shù)大于設(shè)置的核心線程數(shù),那么timed =true
                      // Are workers subject to culling?
                      boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

                      if ((wc > maximumPoolSize || (timed && timedOut))
                          && (wc > 1 || workQueue.isEmpty())) {
                          if (compareAndDecrementWorkerCount(c))
                              return null;
                          continue;
                      }

                      // 核心代碼
                      try {
                          // 看完這里就明白了
                          // 阻塞隊(duì)列獲取
                          // workQueue.poll() 規(guī)定時(shí)間獲取任務(wù)
                          // workQueue.take() 會(huì)一直等待,知道阻塞隊(duì)列中任務(wù)不為空
                          Runnable r = timed ?
                              workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                              workQueue.take();
                          // 獲取任務(wù)返回
                          if (r != null)
                              return r;
                          timedOut = true;
                      } catch (InterruptedException retry) {
                          timedOut = false;
                      }
                  }
              }


          版權(quán)聲明:本文為博主原創(chuàng)文章,遵循 CC 4.0 BY-SA 版權(quán)協(xié)議,轉(zhuǎn)載請(qǐng)附上原文出處鏈接和本聲明。

          本文鏈接:

          https://blog.csdn.net/qq_41125219/article/details/117535516







          瀏覽 59
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日韩精品福利 | 日逼123 | 亚洲欧美成人电影 | 人妻久久久久免费肉丝足交 | 青青草视频涩情 |