<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          2W 字帶你,深入一下線程池

          共 11636字,需瀏覽 24分鐘

           ·

          2020-12-23 16:55

          前言

          線程池可以說是 Java 進(jìn)階必備的知識(shí)點(diǎn)了,也是面試中必備的考點(diǎn),可能不少人看了這篇文章后能對(duì)線程池工作原理說上一二,但這還遠(yuǎn)遠(yuǎn)不夠,如果碰到比較有經(jīng)驗(yàn)的面試官再繼續(xù)追問,很可能會(huì)被吊打,考慮如下問題:

          1. Tomcat 的線程池和 JDK 的線程池實(shí)現(xiàn)有啥區(qū)別, Dubbo 中有類似 Tomcat 的線程池實(shí)現(xiàn)嗎?
          2. 我司網(wǎng)關(guān) dubbo 調(diào)用線程池曾經(jīng)出現(xiàn)過這樣的一個(gè)問題:壓測(cè)時(shí)接口可以正常返回,但接口 RT 很高,假設(shè)設(shè)置的核心線程大小為 500,最大線程為 800,緩沖隊(duì)列為 5000,你能從這個(gè)設(shè)置中發(fā)現(xiàn)出一些問題并對(duì)這些參數(shù)進(jìn)行調(diào)優(yōu)嗎?
          3. 線程池里的線程真的有核心線程和非核心線程之分?
          4. 線程池被 shutdown 后,還能產(chǎn)生新的線程?
          5. 線程把任務(wù)丟給線程池后肯定就馬上返回了?
          6. 線程池里的線程異常后會(huì)再次新增線程嗎,如何捕獲這些線程拋出的異常?
          7. 線程池的大小如何設(shè)置,如何動(dòng)態(tài)設(shè)置線程池的參數(shù)
          8. 線程池的狀態(tài)機(jī)畫一下?
          9. 阿里 Java 代碼規(guī)范為什么不允許使用 Executors 快速創(chuàng)建線程池?
          10. 使用線程池應(yīng)該避免哪些問題,能否簡(jiǎn)單說下線程池的最佳實(shí)踐?
          11. 如何優(yōu)雅關(guān)閉線程池
          12. 如何對(duì)線程池進(jìn)行監(jiān)控

          相信不少人看了這些問題會(huì)有些懵逼

          其實(shí)這些問題的答案大多數(shù)都藏在線程池的源碼里,所以深入了解線程池的源碼非常重要,本章我們將會(huì)來學(xué)習(xí)一下線程池的源碼,相信看完之后,以上的問題大部分都能回答,另外一些問題我們也會(huì)在文中與大家一起探討。

          本文將會(huì)從以下幾個(gè)方面來介紹線程池的原理。

          1. 為什么要用線程池
          2. 線程池是如何工作的
          3. 線程池提交任務(wù)的兩種方式
          4. ThreadPoolExecutor 源碼剖析
          5. 解答開篇的問題
          6. 線程池的最佳實(shí)踐
          7. 總結(jié)

          相信大家看完對(duì)線程池的理解會(huì)更進(jìn)一步,肝文不易,看完別完了三連哦。

          為什么要用線程池

          上文也提到過,創(chuàng)建線程有三大開銷,如下:

          1、其實(shí) Java 中的線程模型是基于操作系統(tǒng)原生線程模型實(shí)現(xiàn)的,也就是說 Java 中的線程其實(shí)是基于內(nèi)核線程實(shí)現(xiàn)的,線程的創(chuàng)建,析構(gòu)與同步都需要進(jìn)行系統(tǒng)調(diào)用,而系統(tǒng)調(diào)用需要在用戶態(tài)與內(nèi)核中來回切換,代價(jià)相對(duì)較高,線程的生命周期包括「線程創(chuàng)建時(shí)間」,「線程執(zhí)行任務(wù)時(shí)間」,「線程銷毀時(shí)間」,創(chuàng)建和銷毀都需要導(dǎo)致系統(tǒng)調(diào)用。2、每個(gè) Thread 都需要有一個(gè)內(nèi)核線程的支持,也就意味著每個(gè) Thread 都需要消耗一定的內(nèi)核資源(如內(nèi)核線程的棧空間),因此能創(chuàng)建的 Thread 是有限的,默認(rèn)一個(gè)線程的線程棧大小是 1 M,有圖有真相

          圖中所示,在 Java 8 下,創(chuàng)建 19 個(gè)線程(thread #19)需要?jiǎng)?chuàng)建 19535 KB,即 1 M 左右,reserved 代表如果創(chuàng)建 19 個(gè)線程,操作系統(tǒng)保證會(huì)為其分配這么多空間(實(shí)際上并不一定分配),committed 則表示實(shí)際已分配的空間大小。

          畫外音:注意,這是在 Java 8 下的線程占用空間情況,但在 Java 11 中,對(duì)線程作了很大的優(yōu)化,創(chuàng)建一個(gè)線程大概只需要 40 KB,空間消耗大大減少

          3、線程多了,導(dǎo)致不可忽視的上下文切換開銷。

          由此可見,線程的創(chuàng)建是昂貴的,所以必須以線程池的形式來管理這些線程,在線程池中合理設(shè)置線程大小和管理線程,以達(dá)到以合理的創(chuàng)建線程大小以達(dá)到最大化收益,最小化風(fēng)險(xiǎn)的目的,對(duì)于開發(fā)人員來說,要完成任務(wù)不用關(guān)心線程如何創(chuàng)建,如何銷毀,如何協(xié)作,只需要關(guān)心提交的任務(wù)何時(shí)完成即可,對(duì)線程的調(diào)優(yōu),監(jiān)控等這些細(xì)枝末節(jié)的工作通通交給線程池來實(shí)現(xiàn),所以也讓開發(fā)人員得到極大的解脫!

          類似線程池的這種池化思想應(yīng)用在很多地方,比如數(shù)據(jù)庫(kù)連接池,Http 連接池等,避免了昂貴資源的創(chuàng)建,提升了性能,也解放了開發(fā)人員。

          ThreadPoolExecutor 設(shè)計(jì)架構(gòu)圖

          首先我們來看看 Executor 框架的設(shè)計(jì)圖

          • Executor: 最頂層的 Executor 接口只提供了一個(gè) execute 接口,實(shí)現(xiàn)了提交任務(wù)與執(zhí)行任務(wù)的解藕,這個(gè)方法是最核心的,也是我們?cè)创a剖析的重點(diǎn),此方法最終是由 ThreadPoolExecutor 實(shí)現(xiàn)的,
          • ExecutorService 擴(kuò)展了 Executor 接口,實(shí)現(xiàn)了終止執(zhí)行器,單個(gè)/批量提交任務(wù)等方法
          • AbstractExecutorService 實(shí)現(xiàn)了 ExecutorService 接口,實(shí)現(xiàn)了除 execute 以外的所有方法,只將一個(gè)最重要的 execute 方法交給 ThreadPoolExecutor 實(shí)現(xiàn)。

          這樣的分層設(shè)計(jì)雖然層次看起來挺多,但每一層每司其職,邏輯清晰,值得借鑒。

          線程池是如何工作的

          首先我們來看下如何創(chuàng)建一個(gè)線程池

          ThreadPoolExecutor?threadPool?=?new?ThreadPoolExecutor(10,?20,?600L,
          ????????????????????TimeUnit.SECONDS,?new?LinkedBlockingQueue<>(4096),
          ????????????????????new?NamedThreadFactory("common-work-thread"));
          //?設(shè)置拒絕策略,默認(rèn)為?AbortPolicy
          threadPool.setRejectedExecutionHandler(new?ThreadPoolExecutor.AbortPolicy());

          看下其構(gòu)造方法簽名如下

          public?ThreadPoolExecutor(int?corePoolSize,
          ??????????????????????????????int?maximumPoolSize,
          ??????????????????????????????long?keepAliveTime,
          ??????????????????????????????TimeUnit?unit,
          ??????????????????????????????BlockingQueue?workQueue,
          ??????????????????????????????ThreadFactory?threadFactory,
          ??????????????????????????????RejectedExecutionHandler?handler)
          ?
          {
          ????????????//?省略代碼若干
          }

          要理解這些參數(shù)具體代表的意義,必須清楚線程池提交任務(wù)與執(zhí)行任務(wù)流程,如下

          圖片來自美團(tuán)技術(shù)團(tuán)隊(duì)

          步驟如下

          1、corePoolSize:如果提交任務(wù)后線程還在運(yùn)行,當(dāng)線程數(shù)小于 corePoolSize 值時(shí),無論線程池中的線程是否忙碌,都會(huì)創(chuàng)建線程,并把任務(wù)交給此新創(chuàng)建的線程進(jìn)行處理,如果線程數(shù)少于等于 corePoolSize,那么這些線程不會(huì)回收,除非將 allowCoreThreadTimeOut 設(shè)置為 true,但一般不這么干,因?yàn)轭l繁地創(chuàng)建銷毀線程會(huì)極大地增加系統(tǒng)調(diào)用的開銷。

          2、workQueue:如果線程數(shù)大于核心數(shù)(corePoolSize)且小于最大線程數(shù)(maximumPoolSize),則會(huì)將任務(wù)先丟到阻塞隊(duì)列里,然后線程自己去阻塞隊(duì)列中拉取任務(wù)執(zhí)行。

          3、maximumPoolSize: 線程池中最大可創(chuàng)建的線程數(shù),如果提交任務(wù)時(shí)隊(duì)列滿了且線程數(shù)未到達(dá)這個(gè)設(shè)定值,則會(huì)創(chuàng)建線程并執(zhí)行此次提交的任務(wù),如果提交任務(wù)時(shí)隊(duì)列滿了但線池?cái)?shù)已經(jīng)到達(dá)了這個(gè)值,此時(shí)說明已經(jīng)超出了線池程的負(fù)載能力,就會(huì)執(zhí)行拒絕策略,這也好理解,總不能讓源源不斷地任務(wù)進(jìn)來把線程池給壓垮了吧,我們首先要保證線程池能正常工作。

          4、RejectedExecutionHandler:一共有以下四種拒絕策略

          • AbortPolicy:丟棄任務(wù)并拋出異常,這也是默認(rèn)策略;
          • CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù),所以開頭的問題「線程把任務(wù)丟給線程池后肯定就馬上返回了?」我們可以回答了,如果用的是 CallerRunsPolicy 策略,提交任務(wù)的線程(比如主線程)提交任務(wù)后并不能保證馬上就返回,當(dāng)觸發(fā)了這個(gè) reject 策略不得不親自來處理這個(gè)任務(wù)。
          • DiscardOldestPolicy:丟棄阻塞隊(duì)列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù)。
          • DiscardPolicy:直接丟棄任務(wù),不拋出任何異常,這種策略只適用于不重要的任務(wù)。

          5、keepAliveTime: 線程存活時(shí)間,如果在此時(shí)間內(nèi)超出 corePoolSize 大小的線程處于 idle 狀態(tài),這些線程會(huì)被回收

          6、threadFactory:可以用此參數(shù)設(shè)置線程池的命名,指定 defaultUncaughtExceptionHandler(有啥用,后文闡述),甚至可以設(shè)定線程為守護(hù)線程。

          現(xiàn)在問題來了,該如何合理設(shè)置這些參數(shù)呢。

          首先來看線程大小設(shè)置

          <>告訴我們應(yīng)該分兩種情況

          1. 針對(duì) CPU 密集型的任務(wù),在有 Ncpu個(gè)處理器的系統(tǒng)上,當(dāng)線程池的大小為 Ncpu + 1 時(shí),通常能實(shí)現(xiàn)最優(yōu)的利用率,+1 是因?yàn)楫?dāng)計(jì)算密集型線程偶爾由于缺頁(yè)故障或其他原因而暫停工作時(shí),這個(gè)"額外"的線程也能確保 CPU 的時(shí)鐘周期不會(huì)被浪費(fèi),所謂 CPU 密集,就是線程一直在忙碌,這樣將線程池的大小設(shè)置為 Ncpu + 1 避免了線程的上下文切換,讓線程時(shí)刻處于忙碌狀態(tài),將 CPU 的利用率最大化。
          2. 針對(duì) IO 密集型的任務(wù),它也給出了如下計(jì)算公式

          這些公式看看就好,實(shí)際的業(yè)務(wù)場(chǎng)景中基本用不上,這些公式太過理論化了,脫離業(yè)務(wù)場(chǎng)景,僅可作個(gè)理論參考,舉個(gè)例子,你說 CPU 密集型任務(wù)設(shè)置線程池大小為 N + 1個(gè),但實(shí)際上在業(yè)務(wù)中往往不只設(shè)置一個(gè)線程池,這種情況套用的公式就懵逼了

          再來看 workQueue 的大小設(shè)置

          由上文可知,如果最大線程大于核心線程數(shù),當(dāng)且僅當(dāng)核心線程滿了且 workQueue 也滿的情況下,才會(huì)新增新的線程,也就是說如果 workQueue 是無界隊(duì)列,那么當(dāng)線程數(shù)增加到 corePoolSize 后,永遠(yuǎn)不會(huì)再新增新的線程了,也就是說此時(shí) maximumPoolSize 的設(shè)置就無效了,也無法觸發(fā) RejectedExecutionHandler 拒絕策略,任務(wù)只會(huì)源源不斷地填充到 workQueue,直到 OOM。

          所以 workQueue 應(yīng)該為有界隊(duì)列,至少保證在任務(wù)過載的情況下線程池還能正常工作,那么哪些是有有界隊(duì)列,哪些是無界隊(duì)列呢。

          有界隊(duì)列我們常用的以下兩個(gè)

          • LinkedBlockingQueue: 鏈表構(gòu)成的有界隊(duì)列,按先進(jìn)先出(FIFO)的順序?qū)υ剡M(jìn)行排列,但注意在創(chuàng)建時(shí)需指定其大小,否則其大小默認(rèn)為 Integer.MAX_VALUE,相當(dāng)于無界隊(duì)列了
          • ArrayBlockingQueue: 數(shù)組實(shí)現(xiàn)的有界隊(duì)列,按先進(jìn)先出(FIFO)的順序?qū)υ剡M(jìn)行排列。

          無界隊(duì)列我們常用 PriorityBlockingQueue 這個(gè)優(yōu)先級(jí)隊(duì)列,任務(wù)插入的時(shí)候可以指定其權(quán)重以讓這些任務(wù)優(yōu)先執(zhí)行,但這個(gè)隊(duì)列很少用,原因很簡(jiǎn)單,線程池里的任務(wù)執(zhí)行順序一般是平等的,如果真有必須某些類型的任務(wù)需要優(yōu)先執(zhí)行,大不了再開個(gè)線程池好了,將不同的任務(wù)類型用不同的線程池隔離開來,也是合理利用線程池的一種實(shí)踐。

          說到這我相信大家應(yīng)該能回答開頭的問題「阿里 Java 代碼規(guī)范為什么不允許使用 Executors 快速創(chuàng)建線程池?」,最常見的是以下兩種創(chuàng)建方式

          image-20201109002227476

          newCachedThreadPool 方法的最大線程數(shù)設(shè)置成了 Integer.MAX_VALUE,而 newSingleThreadExecutor 方法創(chuàng)建 workQueue 時(shí) LinkedBlockingQueue 未聲明大小,相當(dāng)于創(chuàng)建了無界隊(duì)列,一不小心就會(huì)導(dǎo)致 OOM。

          threadFactory 如何設(shè)置

          一般業(yè)務(wù)中會(huì)有多個(gè)線程池,如果某個(gè)線程池出現(xiàn)了問題,定位是哪一個(gè)線程出問題很重要,所以為每個(gè)線程池取一個(gè)名字就很有必要了,我司用的 dubbo 的 NamedThreadFactory 來生成 threadFactory,創(chuàng)建很簡(jiǎn)單

          new?NamedThreadFactory("demo-work")

          它的實(shí)現(xiàn)還是很巧妙的,有興趣地可以看看它的源碼,每調(diào)用一次,底層有個(gè)計(jì)數(shù)器會(huì)加一,會(huì)依次命名為 「demo-work-thread-1」, 「demo-work-thread-2」, 「demo-work-thread-3」這樣遞增的字符串。

          在實(shí)際的業(yè)務(wù)場(chǎng)景中,一般很難確定 corePoolSize, workQueue,maximumPoolSize 的大小,如果出問題了,一般來說只能重新設(shè)置一下這些參數(shù)再發(fā)布,這樣往往需要耗費(fèi)一些時(shí)間,美團(tuán)的這篇文章給出了讓人眼前一亮的解決方案,當(dāng)發(fā)現(xiàn)問題(線程池監(jiān)控告警)時(shí),動(dòng)態(tài)調(diào)整這些參數(shù),可以讓這些參數(shù)實(shí)時(shí)生效,能在發(fā)現(xiàn)問題時(shí)及時(shí)解決,確實(shí)是個(gè)很好的思路。

          線程池提交任務(wù)的兩種方式

          線程池創(chuàng)建好了,該怎么給它提交任務(wù),有兩種方式,調(diào)用 execute 和 submit 方法,來看下這兩個(gè)方法的方法簽名

          //?方式一:execute 方法
          public?void?execute(Runnable?command)?{
          }

          //?方式二:ExecutorService 中 submit 的三個(gè)方法
          ?Future?submit(Callable?task);
          ?Future?submit(Runnable?task,?T?result);
          Future?submit(Runnable?task);

          區(qū)別在于調(diào)用 execute 無返回值,而調(diào)用 ?submit 可以返回 Future,那么這個(gè) Future 能到底能干啥呢,看它的接口

          public?interface?Future<V>?{

          ????/**
          ?????*?取消正在執(zhí)行的任務(wù),如果任務(wù)已執(zhí)行或已被取消,或者由于某些原因不能取消則返回?false
          ?????*?如果任務(wù)未開始或者任務(wù)已開始但可以中斷(mayInterruptIfRunning?為?true),則
          ?????*?可以取消/中斷此任務(wù)
          ?????*/

          ????boolean?cancel(boolean?mayInterruptIfRunning);

          ????/**
          ?????*?任務(wù)在完成前是否已被取消
          ?????*/

          ????boolean?isCancelled();

          ????/**
          ?????*?正常的執(zhí)行完流程流程,或拋出異常,或取消導(dǎo)致的任務(wù)完成都會(huì)返回?true
          ?????*/

          ????boolean?isDone();

          ????/**
          ?????*?阻塞等待任務(wù)的執(zhí)行結(jié)果
          ?????*/

          ????V?get()?throws?InterruptedException,?ExecutionException;

          ????/**
          ?????*?阻塞等待任務(wù)的執(zhí)行結(jié)果,不過這里指定了時(shí)間,如果在?timeout?時(shí)間內(nèi)任務(wù)還未執(zhí)行完成,
          ?????*?則拋出?TimeoutException?異常
          ?????*/

          ????V?get(long?timeout,?TimeUnit?unit)
          ????????throws?InterruptedException,?ExecutionException,?TimeoutException
          ;
          }

          可以用 Future 取消任務(wù),判斷任務(wù)是否已取消/完成,甚至可以阻塞等待結(jié)果。

          submit 為啥能提交任務(wù)(Runnable)的同時(shí)也能返回任務(wù)(Future)的執(zhí)行結(jié)果呢

          原來在最后執(zhí)行 execute 前用 newTaskFor 將 task 封裝成了 RunnableFuture,newTaskFor 返回了 FutureTask 這個(gè)類,結(jié)構(gòu)圖如下

          可以看到 FutureTask 這個(gè)接口既實(shí)現(xiàn)了 Runnable 接口,也實(shí)現(xiàn) Future 接口,所以在提交任務(wù)的同時(shí)也能利用 Future 接口來執(zhí)行任務(wù)的取消,獲取任務(wù)的狀態(tài),等待執(zhí)行結(jié)果這些操作。

          execute 與 submit 除了是否能返回執(zhí)行結(jié)果這一區(qū)別外,還有一個(gè)重要區(qū)別,那就是使用 execute 執(zhí)行如果發(fā)生了異常,是捕獲不到的,默認(rèn)會(huì)執(zhí)行 ThreadGroup 的 uncaughtException 方法(下圖數(shù)字 2 對(duì)應(yīng)的邏輯)

          所以如果你想監(jiān)控執(zhí)行 execute 方法時(shí)發(fā)生的異常,需要通過 threadFactory 來指定一個(gè) UncaughtExceptionHandler,這樣就會(huì)執(zhí)行上圖中的 1,進(jìn)而執(zhí)行 UncaughtExceptionHandler 中的邏輯,如下所示:

          //1.實(shí)現(xiàn)一個(gè)自己的線程池工廠
          ThreadFactory?factory?=?(Runnable?r)?->?{
          ????//創(chuàng)建一個(gè)線程
          ????Thread?t?=?new?Thread(r);
          ????//給創(chuàng)建的線程設(shè)置UncaughtExceptionHandler對(duì)象?里面實(shí)現(xiàn)異常的默認(rèn)邏輯
          ????t.setDefaultUncaughtExceptionHandler((Thread?thread1,?Throwable?e)?->?{
          ????????//?在此設(shè)置統(tǒng)計(jì)監(jiān)控邏輯
          ????????System.out.println("線程工廠設(shè)置的exceptionHandler"?+?e.getMessage());
          ????});
          ????return?t;
          };

          //?2.創(chuàng)建一個(gè)自己定義的線程池,使用自己定義的線程工廠
          ExecutorService?service?=?new?ThreadPoolExecutor(1,?1,?0,?TimeUnit.MILLISECONDS,new?LinkedBlockingQueue(10),factory);

          //3.提交任務(wù)
          service.execute(()->{
          ????int?i=1/0;
          });

          執(zhí)行以上邏輯最終會(huì)輸出「線程工廠設(shè)置的exceptionHandler/ by zero」,通過這樣的方式就能通過設(shè)定的 defaultUncaughtExceptionHandler 來執(zhí)行我們的監(jiān)控邏輯了。

          如果用 submit ,如何捕獲異常呢,當(dāng)我們調(diào)用 future.get 就可以捕獲

          Callable?testCallable?=?xxx;
          Future?future?=?executor.submit(myCallable);
          try?{
          ????future1.get(3));
          }?catch?(InterruptedException?e)?{
          ????e.printStackTrace();
          }?catch?(ExecutionException?e)?{
          ????e.printStackTrace();
          }

          那么 future 為啥在 get 的時(shí)候才捕獲異步呢,因?yàn)樵趫?zhí)行 submit 時(shí)拋出異常后此異常被保存了起來,而在 get 的時(shí)候才被拋出

          關(guān)于 execute 和 submit 的執(zhí)行流程 why 神的這篇文章寫得非常透徹,我就不拾人牙慧了,建議大家好好品品,收獲會(huì)很大!

          ThreadPoolExecutor 源碼剖析

          前面鋪墊了這么多,終于到了最核心的源碼剖析環(huán)節(jié)了。

          對(duì)于線程池來說,我們最關(guān)心的是它的「狀態(tài)」和「可運(yùn)行的線程數(shù)量」,一般來說我們可以選擇用兩個(gè)變量來記錄,不過 Doug Lea 只用了一個(gè)變量(ctl)就達(dá)成目的了,我們知道變量越多,代碼的可維護(hù)性就越差,也越容易出 bug, 所以只用一個(gè)變量就達(dá)成了兩個(gè)變量的效果,這讓代碼的可維護(hù)性大大提高,那么他是怎么設(shè)計(jì)的呢

          //?ThreadPoolExecutor.java
          public?class?ThreadPoolExecutor?extends?AbstractExecutorService?{
          ????private?final?AtomicInteger?ctl?=?new?AtomicInteger(ctlOf(RUNNING,?0));
          ????private?static?final?int?COUNT_BITS?=?Integer.SIZE?-?3;
          ????private?static?final?int?CAPACITY???=?(1?<1;

          ????//?結(jié)果:111 00000000000000000000000000000
          ????private?static?final?int?RUNNING????=?-1?<????//?結(jié)果:?000?00000000000000000000000000000
          ????private?static?final?int?SHUTDOWN???=??0?<????//?結(jié)果:?001 00000000000000000000000000000
          ????private?static?final?int?STOP???????=??1?<????//?結(jié)果:?010?00000000000000000000000000000
          ????private?static?final?int?TIDYING????=??2?<????//?結(jié)果:?011 00000000000000000000000000000
          ????private?static?final?int?TERMINATED?=??3?<
          ????//?獲取線程池的狀態(tài)
          ????private?static?int?runStateOf(int?c)?????{?return?c?&?~CAPACITY;?}
          ????//?獲取線程數(shù)量
          ????private?static?int?workerCountOf(int?c)??{?return?c?&?CAPACITY;?}
          }

          可以看到,ctl 是一個(gè) 原子類的 Integer 變量,有 32 位,低 29 位表示線程數(shù)量, 29 位最大可以表示 (2^29)-1 (大概 5 億多),足夠記錄線程大小了,如果未來還是不夠,可以把 ctl 聲明為 AtomicLong,高 3 位用來表示線程池的狀態(tài),3 位可以表示 8 個(gè)線程池的狀態(tài),由于線程池總共只有五個(gè)狀態(tài),所以 3 位也是足夠了,線程池的五個(gè)狀態(tài)如下

          • RUNNING: 接收新的任務(wù),并能繼續(xù)處理 workQueue 中的任務(wù)
          • SHUTDOWN: 不再接收新的任務(wù),不過能繼續(xù)處理 workQueue 中的任務(wù)
          • STOP: 不再接收新的任務(wù),也不再處理 workQueue 中的任務(wù),并且會(huì)中斷正在處理任務(wù)的線程
          • TIDYING: 所有的任務(wù)都完結(jié)了,并且線程數(shù)量(workCount)為 0 時(shí)即為此狀態(tài),進(jìn)入此狀態(tài)后會(huì)調(diào)用 terminated() 這個(gè)鉤子方法進(jìn)入 TERMINATED 狀態(tài)
          • TERMINATED: 調(diào)用 terminated() 方法后即為此狀態(tài)

          線程池的狀態(tài)流轉(zhuǎn)及觸發(fā)條件如下

          有了這些基礎(chǔ),我們來分析下 execute 的源碼


          public?void?execute(Runnable?command)?{
          ????if?(command?==?null)
          ????????throw?new?NullPointerException();
          ????int?c?=?ctl.get();
          ????//?如果當(dāng)前線程數(shù)少于核心線程數(shù)(corePoolSize),無論核心線程是否忙碌,都創(chuàng)建線程,直到達(dá)到?corePoolSize?為止
          ????if?(workerCountOf(c)?????????//?創(chuàng)建線程并將此任務(wù)交給?worker?處理(此時(shí)此任務(wù)即?worker?中的?firstTask)
          ????????if?(addWorker(command,?true))
          ????????????return;
          ????????c?=?ctl.get();
          ????}

          ????//?如果線程池處于?RUNNING?狀態(tài),并且線程數(shù)大于?corePoolSize?或者?
          ????//?線程數(shù)少于?corePoolSize?但創(chuàng)建線程失敗了,則將任務(wù)丟進(jìn)?workQueue?中
          ????if?(isRunning(c)?&&?workQueue.offer(command))?{
          ????????int?recheck?=?ctl.get();
          ????????//?這里需要再次檢查線程池是否處于?RUNNING?狀態(tài),因?yàn)樵谌蝿?wù)入隊(duì)后可能線程池狀態(tài)會(huì)發(fā)生變化,(比如調(diào)用了?shutdown?方法等),如果線程狀態(tài)發(fā)生變化了,則移除此任務(wù),執(zhí)行拒絕策略
          ????????if?(!?isRunning(recheck)?&&?remove(command))
          ????????????reject(command);
          ????????//?如果線程池在?RUNNING?狀態(tài)下,線程數(shù)為?0,則新建線程加速處理?workQueue?中的任務(wù)
          ????????else?if?(workerCountOf(recheck)?==?0)
          ????????????addWorker(null,?false);
          ????}
          ????//?這段邏輯說明線程數(shù)大于?corePoolSize?且任務(wù)入隊(duì)失敗了,此時(shí)會(huì)以最大線程數(shù)(maximumPoolSize)為界來創(chuàng)建線程,如果失敗,說明線程數(shù)超過了?maximumPoolSize,則執(zhí)行拒絕策略
          ????else?if?(!addWorker(command,?false))
          ????????reject(command);
          }

          從這段代碼中可以看到,創(chuàng)建線程是調(diào)用 addWorker 實(shí)現(xiàn)的,在分析 addWorker 之前,有必要簡(jiǎn)單提一下 Worker,線程池把每一個(gè)執(zhí)行任務(wù)的線程都封裝為 Worker 的形式,取名為 Worker 很形象,線程池的本質(zhì)是生產(chǎn)者-消費(fèi)者模型,生產(chǎn)者不斷地往 workQueue 中丟 task, workQueue 就像流水線一樣不斷地輸送著任務(wù),而 worker(工人) 不斷地取任務(wù)來執(zhí)行

          那么問題來了,為啥要把線程封裝到 worker 中呢,線程池拿到 task 后直接丟給線程處理或者讓線程自己去 workQueue 中處理不就完了?

          將線程封裝為 worker 主要是為了更好地管理線程的中斷

          來看下 Worker 的定義

          //?此處可以看出?worker?既是一個(gè)?Runnable?任務(wù),也實(shí)現(xiàn)了?AQS(實(shí)際上是用?AQS?實(shí)現(xiàn)了一個(gè)獨(dú)占鎖,這樣由于?worker?運(yùn)行時(shí)會(huì)上鎖,執(zhí)行?shutdown,setCorePoolSize,setMaximumPoolSize等方法時(shí)會(huì)試著中斷線程(interruptIdleWorkers)?,在這個(gè)方法中斷方法中會(huì)先嘗試獲取?worker?的鎖,如果不成功,說明?worker?在運(yùn)行中,此時(shí)會(huì)先讓?worker?執(zhí)行完任務(wù)再關(guān)閉?worker?的線程,實(shí)現(xiàn)優(yōu)雅關(guān)閉線程的目的)
          private?final?class?Worker
          ????extends?AbstractQueuedSynchronizer
          ????implements?Runnable
          ????
          {
          ????????private?static?final?long?serialVersionUID?=?6138294804551838833L;

          ????????//?實(shí)際執(zhí)行任務(wù)的線程
          ????????final?Thread?thread;
          ????????//?上文提到,如果當(dāng)前線程數(shù)少于核心線程數(shù),創(chuàng)建線程并將提交的任務(wù)交給?worker?處理處理,此時(shí)?firstTask?即為此提交的任務(wù),如果?worker?從?workQueue?中獲取任務(wù),則?firstTask?為空
          ????????Runnable?firstTask;
          ????????//?統(tǒng)計(jì)完成的任務(wù)數(shù)
          ????????volatile?long?completedTasks;

          ????????Worker(Runnable?firstTask)?{
          ????????????//?初始化為?-1,這樣在線程運(yùn)行前(調(diào)用runWorker)禁止中斷,在?interruptIfStarted()?方法中會(huì)判斷?getState()>=0
          ????????????setState(-1);?
          ????????????this.firstTask?=?firstTask;

          ????????????//?根據(jù)線程池的?threadFactory?創(chuàng)建一個(gè)線程,將?worker?本身傳給線程(因?yàn)?worker?實(shí)現(xiàn)了?Runnable?接口)
          ????????????this.thread?=?getThreadFactory().newThread(this);
          ????????}

          ????????public?void?run()?{
          ????????????//?thread?啟動(dòng)后會(huì)調(diào)用此方法
          ????????????runWorker(this);
          ????????}

          ???????
          ????????//?1?代表被鎖住了,0?代表未鎖
          ????????protected?boolean?isHeldExclusively()?{
          ????????????return?getState()?!=?0;
          ????????}

          ????????//?嘗試獲取鎖
          ????????protected?boolean?tryAcquire(int?unused)?{
          ????????????//?從這里可以看出它是一個(gè)獨(dú)占鎖,因?yàn)楫?dāng)獲取鎖后,cas?設(shè)置?state?不可能成功,這里我們也能明白上文中將?state?設(shè)置為?-1?的作用,這種情況下永遠(yuǎn)不可能獲取得鎖,而?worker?要被中斷首先必須獲取鎖
          ????????????if?(compareAndSetState(0,?1))?{
          ????????????????setExclusiveOwnerThread(Thread.currentThread());
          ????????????????return?true;
          ????????????}
          ????????????return?false;
          ????????}

          ????????//?嘗試釋放鎖
          ????????protected?boolean?tryRelease(int?unused)?{
          ????????????setExclusiveOwnerThread(null);
          ????????????setState(0);
          ????????????return?true;
          ????????}????

          ????????public?void?lock()????????{?acquire(1);?}
          ????????public?boolean?tryLock()??{?return?tryAcquire(1);?}
          ????????public?void?unlock()??????{?release(1);?}
          ????????public?boolean?isLocked()?{?return?isHeldExclusively();?}
          ????????????
          ????????//?中斷線程,這個(gè)方法會(huì)被?shutdowNow?調(diào)用,從中可以看出?shutdownNow?要中斷線程不需要獲取鎖,也就是說如果線程正在運(yùn)行,照樣會(huì)給你中斷掉,所以一般來說我們不用?shutdowNow?來中斷線程,太粗暴了,中斷時(shí)線程很可能在執(zhí)行任務(wù),影響任務(wù)執(zhí)行
          ????????void?interruptIfStarted()?{
          ????????????Thread?t;
          ????????????//?中斷也是有條件的,必須是?state?>=?0?且?t?!=?null?且線程未被中斷
          ????????????//?如果?state?==?-1?,不執(zhí)行中斷,再次明白了為啥上文中?setState(-1)?的意義
          ????????????if?(getState()?>=?0?&&?(t?=?thread)?!=?null?&&?!t.isInterrupted())?{
          ????????????????try?{
          ????????????????????t.interrupt();
          ????????????????}?catch?(SecurityException?ignore)?{
          ????????????????}
          ????????????}
          ????????}
          ????}

          通過上文對(duì) Worker 類的分析,相信大家不難理解 將線程封裝為 worker 主要是為了更好地管理線程的中斷 這句話。

          理解了 Worker 的意義,我們?cè)賮砜?addWorker 的方法

          private?boolean?addWorker(Runnable?firstTask,?boolean?core)?{
          ????retry:
          ????for?(;;)?{
          ????????int?c?=?ctl.get();

          ????????//?獲取線程池的狀態(tài)
          ????????int?rs?=?runStateOf(c);

          ????????//?如果線程池的狀態(tài)?>=?SHUTDOWN,即為?SHUTDOWN,STOP,TIDYING,TERMINATED?這四個(gè)狀態(tài),只有一種情況有可能創(chuàng)建線程,即線程狀態(tài)為?SHUTDOWN,?且隊(duì)列非空時(shí),firstTask?==?null?代表創(chuàng)建一個(gè)不接收新任務(wù)的線程(此線程會(huì)從?workQueue?中獲取任務(wù)再執(zhí)行),這種情況下創(chuàng)建線程是為了加速處理完?workQueue?中的任務(wù)
          ????????if?(rs?>=?SHUTDOWN?&&
          ????????????!?(rs?==?SHUTDOWN?&&
          ???????????????firstTask?==?null?&&
          ???????????????!?workQueue.isEmpty()))
          ????????????return?false;

          ????????for?(;;)?{
          ????????????//?獲取線程數(shù)
          ????????????int?wc?=?workerCountOf(c);
          ????????????//?如果超過了線程池的最大?CAPACITY(5?億多,基本不可能)
          ????????????//?或者?超過了?corePoolSize(core?為?true)?或者?maximumPoolSize(core?為?false)?時(shí)
          ????????????//?則返回?false
          ????????????if?(wc?>=?CAPACITY?||
          ????????????????wc?>=?(core???corePoolSize?:?maximumPoolSize))
          ????????????????return?false;
          ????????????//?否則?CAS?增加線程的數(shù)量,如果成功跳出雙重循環(huán)
          ????????????if?(compareAndIncrementWorkerCount(c))
          ????????????????break?retry;
          ????????????c?=?ctl.get();??//?Re-read?ctl

          ????????????//?如果線程運(yùn)行狀態(tài)發(fā)生變化,跳到外層循環(huán)繼續(xù)執(zhí)行
          ????????????if?(runStateOf(c)?!=?rs)
          ????????????????continue?retry;
          ????????????//?說明是因?yàn)?CAS?增加線程數(shù)量失敗所致,繼續(xù)執(zhí)行?retry?的內(nèi)層循環(huán)
          ????????}
          ????}

          ????boolean?workerStarted?=?false;
          ????boolean?workerAdded?=?false;
          ????Worker?w?=?null;
          ????try?{
          ????????//?能執(zhí)行到這里,說明滿足增加?worker?的條件了,所以創(chuàng)建?worker,準(zhǔn)備添加進(jìn)線程池中執(zhí)行任務(wù)
          ????????w?=?new?Worker(firstTask);
          ????????final?Thread?t?=?w.thread;
          ????????if?(t?!=?null)?{
          ????????????//?加鎖,是因?yàn)橄挛囊?w?添加進(jìn)?workers?中,?workers?是?HashSet,不是線程安全的,所以需要加鎖予以保證
          ????????????final?ReentrantLock?mainLock?=?this.mainLock;
          ????????????mainLock.lock();
          ????????????try?{
          ????????????????//??再次?check?線程池的狀態(tài)以防執(zhí)行到此步時(shí)發(fā)生中斷等
          ????????????????int?rs?=?runStateOf(ctl.get());
          ????????????????//?如果線程池狀態(tài)小于?SHUTDOWN(即為?RUNNING),
          ????????????????//?或者狀態(tài)為?SHUTDOWN?但?firstTask?==?null(代表不接收任務(wù),只是創(chuàng)建線程處理?workQueue?中的任務(wù)),則滿足添加?worker?的條件
          ????????????????if?(rs?????????????????????(rs?==?SHUTDOWN?&&?firstTask?==?null))?{
          ????????????????????????????????????????//?如果線程已啟動(dòng),顯然有問題(因?yàn)閯?chuàng)建?worker?后,還沒啟動(dòng)線程呢),拋出異常
          ????????????????????if?(t.isAlive())?
          ????????????????????????throw?new?IllegalThreadStateException();
          ????????????????????workers.add(w);
          ????????????????????int?s?=?workers.size();

          ????????????????????//?記錄最大的線程池大小以作監(jiān)控之用
          ????????????????????if?(s?>?largestPoolSize)
          ????????????????????????largestPoolSize?=?s;
          ????????????????????workerAdded?=?true;
          ????????????????}
          ????????????}?finally?{
          ????????????????mainLock.unlock();
          ????????????}

          ????????????//?說明往?workers?中添加?worker?成功,此時(shí)啟動(dòng)線程
          ????????????if?(workerAdded)?{
          ????????????????t.start();
          ????????????????workerStarted?=?true;
          ????????????}
          ????????}
          ????}?finally?{
          ????????//?添加線程失敗,執(zhí)行?addWorkerFailed?方法,主要做了將?worker?從?workers?中移除,減少線程數(shù),并嘗試著關(guān)閉線程池這樣的操作
          ????????if?(!?workerStarted)
          ????????????addWorkerFailed(w);
          ????}
          ????return?workerStarted;
          }

          從這段代碼我們可以看到多線程下情況的不可預(yù)料性,我們發(fā)現(xiàn)在滿足條件情況下,又對(duì)線程狀態(tài)重新進(jìn)行了 check,以防期間出現(xiàn)中斷等線程池狀態(tài)發(fā)生變更的操作,這也給我們以啟發(fā):多線程環(huán)境下的各種臨界條件一定要考慮到位。

          執(zhí)行 addWorker 創(chuàng)建 worker 成功后,線程開始執(zhí)行了(t.start()),由于在創(chuàng)建 Worker 時(shí),將 Worker ?自己傳給了此線程,所以啟動(dòng)線程后,會(huì)調(diào)用 ?Worker 的 run 方法

          public?void?run()?{
          ????runWorker(this);
          }

          可以看到最終會(huì)調(diào)用 ?runWorker 方法,接下來我們來分析下 runWorker 方法

          final?void?runWorker(Worker?w)?{
          ????Thread?wt?=?Thread.currentThread();
          ????Runnable?task?=?w.firstTask;
          ????w.firstTask?=?null;
          ????//?unlock?會(huì)調(diào)用?tryRelease?方法將?state?設(shè)置成?0,代表允許中斷,允許中斷的條件上文我們?cè)?interruptIfStarted()?中有提過,即?state?>=?0
          ????w.unlock();
          ????boolean?completedAbruptly?=?true;
          ????try?{
          ????????//?如果在提交任務(wù)時(shí)創(chuàng)建了線程,并把任務(wù)丟給此線程,則會(huì)先執(zhí)行此?task
          ????????//?否則從任務(wù)隊(duì)列中獲取?task?來執(zhí)行(即?getTask()?方法)
          ????????while?(task?!=?null?||?(task?=?getTask())?!=?null)?{
          ????????????w.lock();
          ????????????
          ????????????//?如果線程池狀態(tài)為?>=?STOP(即?STOP,TIDYING,TERMINATED?)時(shí),則線程應(yīng)該中斷
          ????????????//?如果線程池狀態(tài)?
          ????????????if?((runStateAtLeast(ctl.get(),?STOP)?||
          ?????????????????(Thread.interrupted()?&&
          ??????????????????runStateAtLeast(ctl.get(),?STOP)))?&&
          ????????????????!wt.isInterrupted())
          ????????????????wt.interrupt();
          ????????????try?{
          ????????????????//?執(zhí)行任務(wù)前,子類可實(shí)現(xiàn)此鉤子方法作為統(tǒng)計(jì)之用
          ????????????????beforeExecute(wt,?task);
          ????????????????Throwable?thrown?=?null;
          ????????????????try?{
          ????????????????????task.run();
          ????????????????}?catch?(RuntimeException?x)?{
          ????????????????????thrown?=?x;?throw?x;
          ????????????????}?catch?(Error?x)?{
          ????????????????????thrown?=?x;?throw?x;
          ????????????????}?catch?(Throwable?x)?{
          ????????????????????thrown?=?x;?throw?new?Error(x);
          ????????????????}?finally?{
          ????????????????????//?執(zhí)行任務(wù)后,子類可實(shí)現(xiàn)此鉤子方法作為統(tǒng)計(jì)之用
          ????????????????????afterExecute(task,?thrown);
          ????????????????}
          ????????????}?finally?{
          ????????????????task?=?null;
          ????????????????w.completedTasks++;
          ????????????????w.unlock();
          ????????????}
          ????????}
          ????????completedAbruptly?=?false;
          ????}?finally?{
          ????????//?如果執(zhí)行到這只有兩種可能,一種是執(zhí)行過程中異常中斷了,一種是隊(duì)列里沒有任務(wù)了,從這里可以看出線程沒有核心線程與非核心線程之分,哪個(gè)任務(wù)異常了或者正常退出了都會(huì)執(zhí)行此方法,此方法會(huì)根據(jù)情況將線程數(shù)-1
          ????????processWorkerExit(w,?completedAbruptly);
          ????}
          }

          來看看 processWorkerExit 方法是咋樣的

          private?void?processWorkerExit(Worker?w,?boolean?completedAbruptly)?{
          ????????//?如果異常退出,cas?執(zhí)行線程池減?1?操作
          ????if?(completedAbruptly)?
          ????????decrementWorkerCount();

          ????final?ReentrantLock?mainLock?=?this.mainLock;
          ????mainLock.lock();
          ????try?{
          ????????completedTaskCount?+=?w.completedTasks;
          ????????//?加鎖確保線程安全地移除?worker?
          ????????workers.remove(w);
          ????}?finally?{
          ????????mainLock.unlock();
          ????}

          ????//?woker?既然異常退出,可能線程池狀態(tài)變了(如執(zhí)行?shutdown?等),嘗試著關(guān)閉線程池
          ????tryTerminate();

          ????int?c?=?ctl.get();

          ????//??如果線程池處于?STOP?狀態(tài),則如果?woker?是異常退出的,重新新增一個(gè)?woker,如果是正常退出的,在?wokerQueue?為非空的條件下,確保至少有一個(gè)線程在運(yùn)行以執(zhí)行?wokerQueue?中的任務(wù)
          ????if?(runStateLessThan(c,?STOP))?{
          ????????if?(!completedAbruptly)?{
          ????????????int?min?=?allowCoreThreadTimeOut???0?:?corePoolSize;
          ????????????if?(min?==?0?&&?!?workQueue.isEmpty())
          ????????????????min?=?1;
          ????????????if?(workerCountOf(c)?>=?min)
          ????????????????return;?//?replacement?not?needed
          ????????}
          ????????addWorker(null,?false);
          ????}
          }

          接下來我們分析 woker 從 workQueue 中取任務(wù)的方法 getTask

          private?Runnable?getTask()?{
          ????boolean?timedOut?=?false;?//?Did?the?last?poll()?time?out?

          ????for?(;;)?{
          ????????int?c?=?ctl.get();
          ????????int?rs?=?runStateOf(c);

          ????????//?如果線程池狀態(tài)至少為?STOP?或者
          ????????//?線程池狀態(tài)?==?SHUTDOWN?并且任務(wù)隊(duì)列是空的
          ????????//?則減少線程數(shù)量,返回?null,這種情況下上文分析的?runWorker?會(huì)執(zhí)行?processWorkerExit?從而讓獲取此?Task?的?woker?退出
          ????????if?(rs?>=?SHUTDOWN?&&?(rs?>=?STOP?||?workQueue.isEmpty()))?{
          ????????????decrementWorkerCount();
          ????????????return?null;
          ????????}

          ????????int?wc?=?workerCountOf(c);

          ????????//?如果?allowCoreThreadTimeOut?為?true,代表任何線程在?keepAliveTime?時(shí)間內(nèi)處于?idle?狀態(tài)都會(huì)被回收,如果線程數(shù)大于?corePoolSize,本身在?keepAliveTime?時(shí)間內(nèi)處于?idle?狀態(tài)就會(huì)被回收
          ????????boolean?timed?=?allowCoreThreadTimeOut?||?wc?>?corePoolSize;

          ????????//?worker?應(yīng)該被回收的幾個(gè)條件,這個(gè)比較簡(jiǎn)單,就此略過
          ????????if?((wc?>?maximumPoolSize?||?(timed?&&?timedOut))
          ????????????&&?(wc?>?1?||?workQueue.isEmpty()))?{
          ????????????if?(compareAndDecrementWorkerCount(c))
          ????????????????return?null;
          ????????????continue;
          ????????}

          ????????try?{
          ???????????//?阻塞獲取?task,如果在?keepAliveTime?時(shí)間內(nèi)未獲取任務(wù),說明超時(shí)了,此時(shí)?timedOut?為?true
          ????????????Runnable?r?=?timed??
          ????????????????workQueue.poll(keepAliveTime,?TimeUnit.NANOSECONDS)?:
          ????????????????workQueue.take();
          ????????????if?(r?!=?null)
          ????????????????return?r;
          ????????????timedOut?=?true;
          ????????}?catch?(InterruptedException?retry)?{
          ????????????timedOut?=?false;
          ????????}
          ????}
          }

          經(jīng)過以上源碼剖析,相信我們對(duì)線程池的工作原理了解得八九不離十了,再來簡(jiǎn)單過一下其他一些比較有用的方法,開頭我們提到線程池的監(jiān)控問題,我們看一下可以監(jiān)控哪些指標(biāo)

          • int getCorePoolSize():獲取核心線程數(shù)。
          • int getLargestPoolSize():歷史峰值線程數(shù)。
          • int getMaximumPoolSize():最大線程數(shù)(線程池線程容量)。
          • int getActiveCount():當(dāng)前活躍線程數(shù)
          • int getPoolSize():當(dāng)前線程池中的線程總數(shù)
          • BlockingQueuegetQueue() 當(dāng)前線程池的任務(wù)隊(duì)列,據(jù)此可以獲取積壓任務(wù)的總數(shù),getQueue.size()

          監(jiān)控思路也很簡(jiǎn)單,開啟一個(gè)定時(shí)線程 ScheduledThreadPoolExecutor,定期對(duì)這些線程池指標(biāo)進(jìn)行采集,一般會(huì)采用一些開源工具如 Grafana + Prometheus + MicroMeter 來實(shí)現(xiàn)。

          如何實(shí)現(xiàn)核心線程池的預(yù)熱

          使用 ?prestartAllCoreThreads() 方法,這個(gè)方法會(huì)一次性創(chuàng)建 corePoolSize 個(gè)線程,無需等到提交任務(wù)時(shí)才創(chuàng)建,提交創(chuàng)建好線程的話,一有任務(wù)提交過來,這些線程就可以立即處理。

          如何實(shí)現(xiàn)動(dòng)態(tài)調(diào)整線程池參數(shù)

          • setCorePoolSize(int corePoolSize) 調(diào)整核心線程池大小
          • setMaximumPoolSize(int maximumPoolSize)
          • setKeepAliveTime() 設(shè)置線程的存活時(shí)間

          解答開篇的問題

          其它問題基本都在源碼剖析環(huán)節(jié)回答了,這里簡(jiǎn)單說下其他問題

          1、Tomcat 的線程池和 JDK 的線程池實(shí)現(xiàn)有啥區(qū)別, Dubbo 中有類似 Tomcat 的線程池實(shí)現(xiàn)嗎? Dubbo 中一個(gè)叫 EagerThreadPool 的東西,可以看看它的使用說明

          從注釋里可以看出,如果核心線程都處于 busy 狀態(tài),如果有新的請(qǐng)求進(jìn)來,EagerThreadPool 會(huì)選擇先創(chuàng)建線程,而不是將其放入任務(wù)隊(duì)列中,這樣可以更快地響應(yīng)這些請(qǐng)求。

          Tomcat 實(shí)現(xiàn)也是與此類似的,只不過稍微有所不同,當(dāng) Tomcat 啟動(dòng)時(shí),會(huì)先創(chuàng)建 minSpareThreads 個(gè)線程,如果經(jīng)過一段時(shí)間收到請(qǐng)求時(shí)這些線程都處于忙碌狀態(tài),每次都會(huì)以 minSpareThreads 的步長(zhǎng)創(chuàng)建線程,本質(zhì)上也是為了更快地響應(yīng)處理請(qǐng)求。具體的源碼可以看它的 ThreadPool 實(shí)現(xiàn),這里就不展開了。

          2、我司網(wǎng)關(guān) dubbo 調(diào)用線程池曾經(jīng)出現(xiàn)過這樣的一個(gè)問題:壓測(cè)時(shí)接口可以正常返回,但接口 RT 很高,假設(shè)設(shè)置的核心線程大小為 500,最大線程為 800,緩沖隊(duì)列為 5000,你能從這個(gè)設(shè)置中發(fā)現(xiàn)出一些問題并對(duì)這些參數(shù)進(jìn)行調(diào)優(yōu)嗎?這個(gè)參數(shù)明顯能看出問題來,首先任務(wù)隊(duì)列設(shè)置過大,任務(wù)達(dá)到核心線程后,如果再有請(qǐng)求進(jìn)來會(huì)先進(jìn)入任務(wù)隊(duì)列,隊(duì)列滿了之后才創(chuàng)建線程,創(chuàng)建線程也是需要不少開銷的,所以我們后來把核心線程設(shè)置成了與最大線程一樣,并且調(diào)用 prestartAllCoreThreads() 來預(yù)熱核心線程,就不用等請(qǐng)求來時(shí)再創(chuàng)建線程了。

          線程池的幾個(gè)最佳實(shí)踐

          1、線程池執(zhí)行的任務(wù)應(yīng)該是互相獨(dú)立的,如果互相依賴的話,可能導(dǎo)致死鎖,比如下面這樣的代碼

          ExecutorService?pool?=?Executors
          ??.newSingleThreadExecutor();
          pool.submit(()?->?{
          ??try?{
          ????String?qq=pool.submit(()->"QQ").get();
          ????System.out.println(qq);
          ??}?catch?(Exception?e)?{
          ??}
          });

          2、核心任務(wù)與非核心任務(wù)最好能用多個(gè)線程池隔離開來

          曾經(jīng)我們業(yè)務(wù)上就出現(xiàn)這樣的一個(gè)故障:突然很多用戶反饋短信收不到了,排查才發(fā)現(xiàn)發(fā)短信是在一個(gè)線程池里,而另外的定時(shí)腳本也是用的這個(gè)線程池來執(zhí)行任務(wù),這個(gè)腳本一分鐘可能產(chǎn)生幾百上千條任務(wù),導(dǎo)致發(fā)短信的方法在線程池里基本沒機(jī)會(huì)執(zhí)行,后來我們用了兩個(gè)線程池把發(fā)短信和執(zhí)行腳本隔離開來解決了問題。

          3、添加線程池監(jiān)控,動(dòng)態(tài)設(shè)置線程池

          如前文所述,線程池的各個(gè)參數(shù)很難一次性確定,既然難以確定,又要保證發(fā)現(xiàn)問題后及時(shí)解決,我們就需要為線程池增加監(jiān)控,監(jiān)控隊(duì)列大小,線程數(shù)量等,我們可以設(shè)置 3 分鐘內(nèi)比如隊(duì)列任務(wù)一直都是滿了的話,就觸發(fā)告警,這樣可以提前預(yù)警,如果線上因?yàn)榫€程池參數(shù)設(shè)置不合理而觸發(fā)了降級(jí)等操作,可以通過動(dòng)態(tài)設(shè)置線程池的方式來實(shí)時(shí)修改核心線程數(shù),最大線程數(shù)等,將問題及時(shí)修復(fù)。

          總結(jié)

          本文詳細(xì)剖析了線程池的工作原理,相信大家對(duì)其工作機(jī)制應(yīng)該有了較深入的了解,也對(duì)開頭的幾個(gè)問題有了較清楚的認(rèn)識(shí),本質(zhì)上設(shè)置線程池的目的是為了利用有效的資源最大化性能,最小化風(fēng)險(xiǎn),同時(shí)線程池的使用本質(zhì)上是為了更好地為用戶服務(wù),據(jù)此也不難明白 Tomcat, Dubbo 要另起爐灶來設(shè)置自己的線程池了。

          ? 巨人的肩膀

          • https://dzone.com/articles/how-much-memory-does-a-java-thread-take
          • https://segmentfault.com/a/1190000021047279
          • https://www.cnblogs.com/trust-freedom/p/6681948.html
          • 深入理解線程池 https://tinyurl.com/y675j928
          • 有的線程它死了,于是它變成一道面試題 https://mp.weixin.qq.com/s/wrTVGLDvhE-eb5lhygWEqQ
          • Java 并發(fā)編程實(shí)戰(zhàn)
          • Java線程池實(shí)現(xiàn)原理及其在美團(tuán)業(yè)務(wù)中的實(shí)踐: https://mp.weixin.qq.com/s/baYuX8aCwQ9PP6k7TDl2Ww
          • 線程池異常處理詳解,一文搞懂!https://www.cnblogs.com/ncy1/articles/11629933.html

          推薦閱讀
          點(diǎn)個(gè)外賣,我把「軟中斷」搞懂了
          讀者問:小林怎么學(xué)操作系統(tǒng)和計(jì)算機(jī)網(wǎng)絡(luò)呀?
          讀者問:小林你的 500 張圖是怎么畫的?
          讀者問:小林你能分享做公眾號(hào)的經(jīng)驗(yàn)嗎?
          瀏覽 61
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产亚洲一区二区三区 | 欧美精品久久久又大又粗 | 西西444WWW无码视频 | 一级黄色性爱免费网站 | 伊人免费成人视频 |