<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          萬字長文爆肝線程池

          共 3932字,需瀏覽 8分鐘

           ·

          2021-02-28 19:40

          點擊藍色“程序員cxuan?”關(guān)注我喲

          加個“星標”,及時接收最新文章

          這是程序員cxuan 的第?59?篇原創(chuàng)文章

          更多文章見?

          https://github.com/crisxuan/bestJavaer



          我們知道,線程需要的時候要進行創(chuàng)建,不需要的時候需要進行銷毀,但是線程的創(chuàng)建和銷毀都是一個開銷比較大的操作。

          為什么開銷大呢?

          雖然我們程序員創(chuàng)建一個線程很容易,直接使用 new Thread() 創(chuàng)建就可以了,但是操作系統(tǒng)做的工作會多很多,它需要發(fā)出 系統(tǒng)調(diào)用,陷入內(nèi)核,調(diào)用內(nèi)核 API 創(chuàng)建線程,為線程分配資源等,這一些操作有很大的開銷。

          所以,在高并發(fā)大流量的情況下,頻繁的創(chuàng)建和銷毀線程會大大拖慢響應(yīng)速度,那么有什么能夠提高響應(yīng)速度的方式嗎?方式有很多,盡量避免線程的創(chuàng)建和銷毀是一種提升性能的方式,也就是把線程 復用 起來,因為性能是我們?nèi)粘W铌P(guān)注的因素。

          本篇文章我們先來通過認識一下 Executor 框架、然后通過描述線程池的基本概念入手、逐步認識線程池的核心類,然后慢慢進入線程池的原理中,帶你一步一步理解線程池。

          在 Java 中可以通過線程池來達到這樣的效果。今天我們就來詳細講解一下 Java 的線程池

          Executor 框架

          為什么要先說一下 Executor 呢?因為我認為 Executor 是線程池的一個驅(qū)動,我們平常創(chuàng)建并執(zhí)行線程用的一般都是 new Thread().start() 這個方法,這個方法更多強調(diào) 創(chuàng)建一個線程并開始運行。而我們后面講到創(chuàng)建線程池更多體現(xiàn)在驅(qū)動執(zhí)行上。

          Executor 的總體框架如下,我們下面會對 Executor 框架中的每個類進行介紹。

          我們首先來認識一下 Executor

          Executor 接口

          Executor 是 java.util.concurrent 的頂級接口,這個接口只有一個方法,那就是 execute 方法。我們平常創(chuàng)建并啟動線程會使用 new Thread().start() ,而 Executor 中的 execute 方法替代了顯示創(chuàng)建線程的方式。Executor 的設(shè)計初衷就是將任務(wù)提交和任務(wù)執(zhí)行細節(jié)進行解藕。使用 Executor 框架,你可以使用如下的方式創(chuàng)建線程

          Executor?executor?=?Executors.xxx?//?xxx?其實就是?Executor?的實現(xiàn)類,我們后面會說
          executor.execute(new?RunnableTask1());
          executor.execute(new?RunnableTask2());

          execute方法接收一個 Runnable 實例,它用來執(zhí)行一個任務(wù),而任務(wù)就是一個實現(xiàn)了 Runnable 接口的類,但是 execute 方法不能接收實現(xiàn)了 Callable 接口的類,也就是說,execute 方法不能接收具有返回值的任務(wù)。

          execute 方法創(chuàng)建的線程是異步執(zhí)行的,也就是說,你不用等待每個任務(wù)執(zhí)行完畢后再執(zhí)行下一個任務(wù)。

          比如下面就是一個簡單的使用 Executor 創(chuàng)建并執(zhí)行線程的示例

          public?class?RunnableTask?implements?Runnable{

          ????@Override
          ????public?void?run()?{
          ????????System.out.println("running");
          ????}

          ????public?static?void?main(String[]?args)?{
          ????????Executor?executor?=?Executors.newSingleThreadExecutor();?//?你可能不太理解這是什么意思,我們后面會說。
          ????????executor.execute(new?RunnableTask());
          ????}
          }

          Executor 就相當于是族長,大佬只發(fā)號令,族長讓你異步執(zhí)行你就得異步執(zhí)行,族長說不用匯報任務(wù)你就不用回報,但是這個族長管的事情有點少,所以除了 Executor 之外,我們還需要認識其他管家,比如說管你這個線程啥時候終止,啥時候暫停,判斷你這個線程當前的狀態(tài)等,ExecutorService 就是一位大管家。

          ExecutorService 接口

          ExecutorService 也是一個接口,它是 Executor 的拓展,提供了一些 Executor 中沒有的方法,下面我們來介紹一下這些方法

          void?shutdown();

          shutdown 方法調(diào)用后,ExecutorService 會有序關(guān)閉正在執(zhí)行的任務(wù),但是不接受新任務(wù)。如果任務(wù)已經(jīng)關(guān)閉,那么這個方法不會產(chǎn)生任何影響。

          ExecutorService 還有一個和 shutdown 方法類似的方法是

          List?shutdownNow();

          shutdownNow 會嘗試停止關(guān)閉所有正在執(zhí)行的任務(wù),停止正在等待的任務(wù),并返回正在等待執(zhí)行的任務(wù)列表。

          既然 shutdown 和 shutdownNow 這么相似,那么二者有啥區(qū)別呢?

          • shutdown 方法只是會將線程池的狀態(tài)設(shè)置為 SHUTWDOWN ,正在執(zhí)行的任務(wù)會繼續(xù)執(zhí)行下去,線程池會等待任務(wù)的執(zhí)行完畢,而沒有執(zhí)行的線程則會中斷。
          • shutdownNow 方法會將線程池的狀態(tài)設(shè)置為 STOP,正在執(zhí)行和等待的任務(wù)則被停止,返回等待執(zhí)行的任務(wù)列表

          ExecutorService 還有三個判斷線程狀態(tài)的方法,分別是

          boolean?isShutdown();
          boolean?isTerminated();
          boolean?awaitTermination(long?timeout,?TimeUnit?unit)
          ????????throws?InterruptedException
          ;
          • isShutdown 方法表示執(zhí)行器是否已經(jīng)關(guān)閉,如果已經(jīng)關(guān)閉,返回 true,否則返回 false。
          • isTerminated 方法表示判斷所有任務(wù)再關(guān)閉后是否已完成,如果完成返回 false,這個需要注意一點,除非首先調(diào)用 shutdown 或者 shutdownNow 方法,否則 isTerminated 方法永遠不會為 true。
          • awaitTermination 方法會阻塞,直到發(fā)出調(diào)用 shutdown 請求后所有的任務(wù)已經(jīng)完成執(zhí)行后才會解除。這個方法不是非常容易理解,下面通過一個小例子來看一下。
          public?static?ExecutorService?executorService?=?Executors.newFixedThreadPool(10);

          public?static?void?main(String[]?args)?throws?InterruptedException?{
          ??for?(int?i?=?0;?i?10;?i++)?{
          ????executorService.submit(()?->?{
          ??????System.out.println(Thread.currentThread().getName());
          ??????try?{
          ????????Thread.sleep(10);
          ??????}?catch?(InterruptedException?e)?{
          ????????e.printStackTrace();
          ??????}
          ????});

          ??}

          ??executorService.shutdown();
          ??System.out.println("Waiting...");
          ??boolean?isTermination?=?executorService.awaitTermination(3,?TimeUnit.SECONDS);
          ??System.out.println("Waiting...Done");
          ??if(isTermination){
          ????System.out.println("All?Thread?Done");
          ??}
          ??System.out.println(Thread.currentThread().getName());
          }

          如果在調(diào)用 executorService.shutdown() 之后,所有線程完成任務(wù),isTermination 返回 true,程序才會打印出 All Thread Done ,如果注釋掉 executorService.shutdown() 或者在任務(wù)沒有完成后 awaitTermination 就超時了,那么 isTermination 就會返回 false。

          ExecutorService 當大管家還有一個原因是因為它不僅能夠包容 Runnable 對象,還能夠接納 Callable 對象。在 ExecutorService 中,submit 方法扮演了這個角色。

          ?Future?submit(Callable?task);
          ?Future?submit(Runnable?task,?T?result);
          Future?submit(Runnable?task);

          submit 方法會返回一個 Future對象, 表示范型,它是對 Callable 產(chǎn)生的返回值來說的,submit 方法提交的任務(wù)中的 call 方法如果返回 Integer,那么 submit 方法就返回 Future,依此類推。

          ?List>?invokeAll(Collection>?tasks)
          ????????throws?InterruptedException;
          ?List>?invokeAll(Collection>?tasks,
          ??????????????????????????????????long?timeout,?TimeUnit?unit)
          ????????throws?InterruptedException;

          invokeAll 方法用于執(zhí)行給定的任務(wù)結(jié)合,執(zhí)行完成后會返回一個任務(wù)列表,任務(wù)列表每一項是一個任務(wù),每個任務(wù)會包括任務(wù)狀態(tài)和執(zhí)行結(jié)果,同樣 invokeAll 方法也會返回 Future 對象。

          ?T?invokeAny(Collection>?tasks)
          ????????throws?InterruptedException,?ExecutionException
          ;
          ?T?invokeAny(Collection>?tasks,
          ????????????????????long?timeout,?TimeUnit?unit)

          ????????throws?InterruptedException,?ExecutionException,?TimeoutException
          ;

          invokeAny 會獲得最先完成任務(wù)的結(jié)果,即Callable 接口中的 call 的返回值,在獲得結(jié)果時,會中斷其他正在執(zhí)行的任務(wù),具有阻塞性

          大管家的職責相對于組長來說標準更多,管的事情也比較寬,但是大管家畢竟也是家族的中流砥柱,他不會做具體的活,他的下面有各個干將,干將是一個家族的核心,他負責完成大管家的工作。

          AbstractExecutorService 抽象類

          AbstractExecutorService 是一個抽象類,它實現(xiàn)了 ExecutorService 中的部分方法,它相當一個干將,會分析大管家有哪些要做的工作,然后針對大管家的要求做一些具體的規(guī)劃,然后找他的得力助手 ThreadPoolExecutor 來完成目標。

          AbstractExecutorService 這個抽象類主要實現(xiàn)了 invokeAllinvokeAny 方法,關(guān)于這兩個方法的源碼分析我們會在后面進行解釋。

          ScheduledExecutorService 接口

          ScheduledExecutorService 也是一個接口,它擴展了 ExecutorService 接口,提供了 ExecutorService 接口所沒有的功能,ScheduledExecutorService 顧名思義就是一個定時執(zhí)行器,定時執(zhí)行器可以安排命令在一定延遲時間后運行或者定期執(zhí)行。

          它主要有三個接口方法,一個重載方法。下面我們先來看一下這兩個重載方法。

          public?ScheduledFuture?schedule(Runnable?command,
          ???????????????????????????????????????long?delay,?TimeUnit?unit);
          public??ScheduledFuture?schedule(Callable?callable,
          ???????????????????????????????????????????long?delay,?TimeUnit?unit)
          ;

          schedule 方法能夠延遲一定時間后執(zhí)行任務(wù),并且只能執(zhí)行一次。可以看到,schedule 方法也返回了一個 ScheduledFuture 對象,ScheduledFuture 對象擴展了 Future 和 Delayed 接口,它表示異步延遲計算的結(jié)果。schedule 方法支持零延遲和負延遲,這兩類值都被視為立即執(zhí)行任務(wù)。

          還有一點需要說明的是,schedule 方法能夠接收相對的時間和周期作為參數(shù),而不是固定的日期,你可以使用 date.getTime - System.currentTimeMillis() 來得到相對的時間間隔。

          public?ScheduledFuture?scheduleAtFixedRate(Runnable?command,
          ??????????????????????????????????????????????????long?initialDelay,
          ??????????????????????????????????????????????????long?period,
          ??????????????????????????????????????????????????TimeUnit?unit);

          scheduleAtFixedRate 表示任務(wù)會根據(jù)固定的速率在時間 initialDelay 后不斷地執(zhí)行。

          public?ScheduledFuture?scheduleWithFixedDelay(Runnable?command,
          ?????????????????????????????????????????????????????long?initialDelay,
          ?????????????????????????????????????????????????????long?delay,
          ?????????????????????????????????????????????????????TimeUnit?unit);

          這個方法和上面的方法很類似,它表示的是以固定延遲時間的方式來執(zhí)行任務(wù)。

          scheduleAtFixedRate 和 scheduleWithFixedDelay 這兩個方法容易混淆,下面我們通過一個示例來說明一下這兩個方法的區(qū)別。

          public?class?ScheduleTest?{

          ????public?static?void?main(String[]?args)?{
          ????????Runnable?command?=?()?->?{
          ????????????long?startTime?=?System.currentTimeMillis();
          ????????????System.out.println("current?timestamp?=?"?+?startTime);
          ????????????try?{
          ????????????????TimeUnit.MILLISECONDS.sleep(new?Random().nextInt(100));
          ????????????}?catch?(InterruptedException?e)?{
          ????????????????e.printStackTrace();
          ????????????}
          ????????????System.out.println("time?spend?=?"?+?(System.currentTimeMillis()?-?startTime));
          ????????};

          ????????ScheduledExecutorService?scheduledExecutorService?=?Executors.newScheduledThreadPool(10);
          ????????scheduledExecutorService.scheduleAtFixedRate(command,100,1000,TimeUnit.MILLISECONDS);
          ????}
          }

          輸出結(jié)果大致如下

          可以看到,每次打印出來 current timestamp 的時間間隔大約等于 1000 毫秒,所以可以斷定 scheduleAtFixedRate 是以恒定的速率來執(zhí)行任務(wù)的。

          然后我們再看一下 scheduleWithFixedDelay 方法,和上面測試類一樣,只不過我們把 scheduleAtFixedRate 換為了 scheduleWithFixedDelay 。

          scheduledExecutorService.scheduleWithFixedDelay(command,10,1000,TimeUnit.MILLISECONDS);

          然后觀察一下輸出結(jié)果

          可以看到,兩個 current timestamp 之間的間隔大約等于 1000(固定時間) + delay(time spend) 的總和,由此可以確定 scheduleWithFixedDelay 是以固定時延來執(zhí)行的。

          線程池的描述

          下面我們先來認識一下什么是線程池,線程池從概念上來看就是一個池子,什么池子呢?是指管理同一組工作線程的池子,也就是說,線程池會統(tǒng)一管理內(nèi)部的工作線程。

          wiki 上說,線程池其實就是一種軟件設(shè)計模式,這種設(shè)計模式用于實現(xiàn)計算機程序中的并發(fā)。

          比如下面就是一個簡單的線程池概念圖。

          注意:這個圖只是一個概念模型,不是真正的線程池實現(xiàn),希望讀者不要混淆。

          可以看到,這種其實也相當于是生產(chǎn)者-消費者模型,任務(wù)隊列中的線程會進入到線程池中,由線程池進行管理,線程池中的一個個線程就是工作線程,工作線程執(zhí)行完畢后會放入完成隊列中,代表已經(jīng)完成的任務(wù)。

          上圖有個缺點,那就是隊列中的線程執(zhí)行完畢后就會銷毀,銷毀就會產(chǎn)生性能損耗,降低響應(yīng)速度,而我們使用線程池的目的往往是需要把線程重用起來,提高程序性能。

          所以我們應(yīng)該把執(zhí)行完成后的工作線程重新利用起來,等待下一次使用。

          線程池創(chuàng)建

          我們上面大概聊了一下什么線程池的基本執(zhí)行機制,你知道了線程是如何復用的,那么任何事物不可能是憑空出現(xiàn)的,線程也一樣,那么它是如何創(chuàng)建出來的呢?下面就不得不提一個工具類,那就是 Executors

          Executors 也是java.util.concurrent 包下的成員,它是一個創(chuàng)建線程池的工廠,可以使用靜態(tài)工廠方法來創(chuàng)建線程池,下面就是 Executors 所能夠創(chuàng)建線程池的具體類型。

          • newFixedThreadPool:newFixedThreadPool 將會創(chuàng)建固定數(shù)量的線程池,這個數(shù)量可以由程序員通過創(chuàng)建 Executors.newFixedThreadPool(int nThreads)時手動指定,每次提交一個任務(wù)就會創(chuàng)建一個線程,在任何時候,nThreads 的值是最多允許活動的線程。如果在所有線程都處于活躍狀態(tài)時有額外的任務(wù)被創(chuàng)建,這些新創(chuàng)建的線程會進入等待隊列等待線程調(diào)度。如果有任何線程由于執(zhí)行期間出現(xiàn)意外導致線程終止,那么在執(zhí)行后續(xù)任務(wù)時會使用等待隊列中的線程進行替代。

          • newWorkStealingPool:newWorkStealingPool 是 JDK1.8 新增加的線程池,它是基于 fork-join 機制的一種線程池實現(xiàn),使用了 Work-Stealing 算法。newWorkStealingPool 會創(chuàng)建足夠的線程來支持并行度,會使用多個隊列來減少競爭。work-stealing pool 線程池不會保證提交任務(wù)的執(zhí)行順序。

          • newSingleThreadExecutor:newSingleThreadExecutor 是一個單線程的執(zhí)行器,它只會創(chuàng)建單個線程來執(zhí)行任務(wù),如果這個線程異常結(jié)束,則會創(chuàng)建另外一個線程來替代。newSingleThreadExecutor 會確保任務(wù)在任務(wù)隊列中的執(zhí)行次序,也就是說,任務(wù)的執(zhí)行是 有序的

          • newCachedThreadPool:newCachedThreadPool 會根據(jù)實際需要創(chuàng)建一個可緩存的線程池。如果線程池的線程數(shù)量超過實際需要處理的任務(wù),那么 newCachedThreadPool 將會回收多余的線程。如果實際需要處理的線程不能滿足任務(wù)的數(shù)量,則回你添加新的線程到線程池中,線程池中線程的數(shù)量不存在任何限制。

          • newSingleThreadScheduledExecutor:newSingleThreadScheduledExecutor 和 newSingleThreadExecutor 很類似,只不過帶有 scheduled 的這個執(zhí)行器哥們能夠在一定延遲后執(zhí)行或者定期執(zhí)行任務(wù)。

          • newScheduledThreadPool:這個線程池和上面的 scheduled 執(zhí)行器類似,只不過 newSingleThreadScheduledExecutor 比 newScheduledThreadPool 多加了一個 DelegatedScheduledExecutorService 代理,這其實包裝器設(shè)計模式的體現(xiàn)。

          上面這些線程池的底層實現(xiàn)都是由 ThreadPoolExecutor 來提供支持的,所以要理解這些線程池的工作原理,你就需要先把 ThreadPoolExecutor 搞明白,下面我們就來聊一聊 ThreadPoolExecutor。

          ThreadPoolExecutor 類

          ThreadPoolExecutor 位于 java.util.concurrent 工具類下,可以說它是線程池中最核心的一個類了。如果你要想把線程池理解透徹的話,就要首先了解一下這個類。

          如果我們再拿上面家族舉例子的話,ThreadPoolExecutor 就是一個家族的骨干人才,家族頂梁柱。ThreadPoolExecutor 做的工作真是太多太多了。

          首先,ThreadPoolExecutor 提供了四個構(gòu)造方法,然而前三個構(gòu)造方法最終都會調(diào)用最后一個構(gòu)造方法進行初始化

          public?class?ThreadPoolExecutor?extends?AbstractExecutorService?{
          ????.....
          ??????//?1
          ????public?ThreadPoolExecutor(int?corePoolSize,int?maximumPoolSize,long?keepAliveTime,TimeUnit?unit,
          ????????????BlockingQueue?workQueue)
          ;
          ????//?2
          ????public?ThreadPoolExecutor(int?corePoolSize,int?maximumPoolSize,long?keepAliveTime,TimeUnit?unit,
          ????????????BlockingQueue?workQueue,ThreadFactory?threadFactory)
          ;
          ????//?3
          ????public?ThreadPoolExecutor(int?corePoolSize,int?maximumPoolSize,long?keepAliveTime,TimeUnit?unit,
          ????????????BlockingQueue?workQueue,RejectedExecutionHandler?handler)
          ;
          ????//?4
          ????public?ThreadPoolExecutor(int?corePoolSize,int?maximumPoolSize,long?keepAliveTime,TimeUnit?unit,
          ????????BlockingQueue?workQueue,ThreadFactory?threadFactory,RejectedExecutionHandler?handler)
          ;
          ????...
          }

          所以我們直接就來看一波最后這個線程池,看看參數(shù)都有啥,如果我沒數(shù)錯的話,應(yīng)該是有 7 個參數(shù)(小學數(shù)學水平。。。。。。)

          • 首先,一個非常重要的參數(shù)就是 corePoolSize,核心線程池的容量/大小,你叫啥我覺得都沒毛病。只不過你得理解這個參數(shù)的意義,它和線程池的實現(xiàn)原理有非常密切的關(guān)系。你剛開始創(chuàng)建了一個線程池,此時是沒有任何線程的,這個很好理解,因為我現(xiàn)在沒有任務(wù)可以執(zhí)行啊,創(chuàng)建線程干啥啊?而且創(chuàng)建線程還有開銷啊,所以等到任務(wù)過來時再創(chuàng)建線程也不晚。但是!我要說但是了,如果調(diào)用了 prestartAllCoreThreads 或者 prestartCoreThread 方法,就會在沒有任務(wù)到來時創(chuàng)建線程,前者是創(chuàng)建 corePoolSize 個線程,后者是只創(chuàng)建一個線程。Lea 爺爺本來想讓我們程序員當個懶漢,等任務(wù)來了再干;可是你非要當個餓漢,提前完成任務(wù)。如果我們想當個懶漢的話,在創(chuàng)建了線程池后,線程池中的線程數(shù)為 0,當有任務(wù)來之后,就會創(chuàng)建一個線程去執(zhí)行任務(wù),當線程池中的線程數(shù)目達到 corePoolSize 后,就會把到達的任務(wù)放到緩存隊列當中。

          • maximumPoolSize :又來一個線程池的容量,只不過這個是線程池的最大容量,也就是線程池所能容納最大的線程,而上面的 corePoolSize 只是核心線程容量。

          我知道你此時會有疑問,那就是不知道如何核心線程的容量和線程最大容量的區(qū)別是吧?我們后面會解釋這點。

          • keepAliveTime:這個參數(shù)是線程池的保活機制,表示線程在沒有任務(wù)執(zhí)行的情況下保持多久會終止。在默認情況下,這個參數(shù)只在線程數(shù)量大于 corePoolSize 時才會生效。當線程數(shù)量大于 corePoolSize 時,如果任意一個空閑的線程的等待時間 > keepAliveTime 后,那么這個線程會被剔除,直到線程數(shù)量等于 corePoolSize 為止。如果調(diào)用了 allowCoreThreadTimeOut 方法,線程數(shù)量在 corePoolSize 范圍內(nèi)也會生效,直到線程減為 0。

          • unit :這個參數(shù)好說,它就是一個 TimeUnit 的變量,unit 表示的是 keepAliveTime 的時間單位。unit 的類型有下面這幾種

            TimeUnit.DAYS;???????????????//天
            TimeUnit.HOURS;?????????????//小時
            TimeUnit.MINUTES;???????????//分鐘
            TimeUnit.SECONDS;???????????//秒
            TimeUnit.MILLISECONDS;??????//毫秒
            TimeUnit.MICROSECONDS;??????//微妙
            TimeUnit.NANOSECONDS;???????//納秒
          • workQueue:這個參數(shù)表示的概念就是等待隊列,我們上面說過,如果核心線程 > corePoolSize 的話,就會把任務(wù)放入等待隊列,這個等待隊列的選擇也是一門學問。Lea 爺爺給我們展示了三種等待隊列的選擇

            • SynchronousQueue: 基于阻塞隊列(BlockingQueue)的實現(xiàn),它會直接將任務(wù)交給消費者,必須等隊列中的添加元素被消費后才能繼續(xù)添加新的元素。使用 SynchronousQueue 阻塞隊列一般要求maximumPoolSizes 為無界,也就是 Integer.MAX_VALUE,避免線程拒絕執(zhí)行操作。
            • LinkedBlockingQueue:LinkedBlockingQueue 是一個無界緩存等待隊列。當前執(zhí)行的線程數(shù)量達到 corePoolSize 的數(shù)量時,剩余的元素會在阻塞隊列里等待。
            • ArrayBlockingQueue:ArrayBlockingQueue 是一個有界緩存等待隊列,可以指定緩存隊列的大小,當正在執(zhí)行的線程數(shù)等于 corePoolSize 時,多余的元素緩存在 ArrayBlockingQueue 隊列中等待有空閑的線程時繼續(xù)執(zhí)行,當 ArrayBlockingQueue 已滿時,加入 ArrayBlockingQueue 失敗,會開啟新的線程去執(zhí)行,當線程數(shù)已經(jīng)達到最大的 maximumPoolSizes 時,再有新的元素嘗試加入 ArrayBlockingQueue時會報錯
          • threadFactory:線程工廠,這個參數(shù)主要用來創(chuàng)建線程;

          • handler :拒絕策略,拒絕策略主要有以下取值

            • AbortPolicy:丟棄任務(wù)并拋出 RejectedExecutionException 異常。
            • DiscardPolicy: 直接丟棄任務(wù),但是不拋出異常。
            • DiscardOldestPolicy:直接丟棄隊列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復此過程)。
            • CallerRunsPolicy:由調(diào)用線程處理該任務(wù)。

          深入理解線程池

          上面我和你簡單聊了一下線程池的基本構(gòu)造,線程池有幾個非常重要的參數(shù)可以細細品味,但是哥們醒醒,接下來才是刺激的地方。

          線程池狀態(tài)

          首先我們先來聊聊線程池狀態(tài),線程池狀態(tài)是一個非常有趣的設(shè)計點,ThreadPoolExecutor 使用 ctl 來存儲線程池狀態(tài),這些狀態(tài)也叫做線程池的生命周期。想想也是,線程池作為一個存儲管理線程的資源池,它自己也要有這些狀態(tài),以及狀態(tài)之間的變更才能更好的滿足我們的需求。ctl 其實就是一個 AtomicInteger 類型的變量,保證原子性

          ctl 除了存儲線程池狀態(tài)之外,它還存儲 workerCount 這個概念,workerCount 指示的是有效線程數(shù),workerCount 表示的是已經(jīng)被允許啟動但不允許停止的工作線程數(shù)量。workerCount 的值與實際活動線程的數(shù)量不同。

          ctl 高低位來判斷是線程池狀態(tài)還是工作線程數(shù)量,線程池狀態(tài)位于高位

          這里有個設(shè)計點,為什么使用 AtomicInteger 而不是存儲上線更大的 AtomicLong 之類的呢?

          Lea 并非沒有考慮過這個問題,為了表示 int 值,目前 workerCount 的大小是**(2 ^ 29)-1(約 5 億個線程),而不是(2 ^ 31)-1(20億個)可表示的線程**。如果將來有問題,可以將該變量更改為 AtomicLong。但是在需要之前,使用 int 可以使此代碼更快,更簡單,int 存儲占用存儲空間更小。

          runState 具有如下幾種狀態(tài)

          private?static?final?int?RUNNING????=?-1?<private?static?final?int?SHUTDOWN???=??0?<private?static?final?int?STOP???????=??1?<private?static?final?int?TIDYING????=??2?<private?static?final?int?TERMINATED?=??3?<

          我們先上狀態(tài)輪轉(zhuǎn)圖,然后根據(jù)狀態(tài)輪轉(zhuǎn)圖做詳細的解釋。

          這幾種狀態(tài)的解釋如下

          • RUNNING: 如果線程池處于 RUNNING 狀態(tài)下的話,能夠接收新任務(wù),也能處理正在運行的任務(wù)。可以從 ctl 的初始化得知,線程池一旦創(chuàng)建出來就會處于 RUNNING 狀態(tài),并且線程池中的有效線程數(shù)為 0。
          private?final?AtomicInteger?ctl?=?new?AtomicInteger(ctlOf(RUNNING,?0));
          • SHUTDOWN: 在調(diào)用 shutdown 方法后,線程池的狀態(tài)會由 RUNNING -> SHUTDOWN 狀態(tài),位于 SHUTDOWN 狀態(tài)的線程池能夠處理正在運行的任務(wù),但是不能接受新的任務(wù),這和我們上面說的對于 shutdown 的描述一致。
          • STOP: 和 shutdown 方法類似,在調(diào)用 shutdownNow 方法時,程序會從 RUNNING/SHUTDOWN -> STOP 狀態(tài),處于 STOP 狀態(tài)的線程池,不接收新任務(wù),不處理已添加的任務(wù),并且會中斷正在處理的任務(wù)。
          • TIDYING:TIDYING 狀態(tài)有個前置條件,分為兩種:一種是是當線程池位于 SHUTDOWN 狀態(tài)下,阻塞隊列和線程池中的線程數(shù)量為空時,會由 SHUTDOWN -> TIDYING;另一種是當線程池位于 STOP 狀態(tài)下時,線程池中的數(shù)量為空時,會由 STOP -> TIDYING 狀態(tài)。轉(zhuǎn)換為 TIDYING 的線程池會調(diào)用 terminated這個鉤子方法,terminated 在 ThreadPoolExecutor 類中是空實現(xiàn),若用戶想在線程池變?yōu)?TIDYING 時,進行相應(yīng)的處理,可以通過重載 terminated 函數(shù)來實現(xiàn)。
          • TERMINATED:TERMINATED 狀態(tài)是線程池的最后一個狀態(tài),線程池處在 TIDYING 狀態(tài)時,執(zhí)行完terminated 方法之后,就會由 TIDYING -> TERMINATED 狀態(tài)。此時表示線程池的徹底終止。

          重要變量

          下面我們一起來了解一下線程池中的重要變量。

          private?final?BlockingQueue?workQueue;

          阻塞隊列,這個和我們上面說的阻塞隊列的參數(shù)是一個意思,因為在構(gòu)造 ThreadPoolExecutor 時,會把參數(shù)的值賦給 this.workQueue。

          private?final?ReentrantLock?mainLock?=?new?ReentrantLock();?

          線程池的主要狀態(tài)鎖,對線程池的狀態(tài)(比如線程池大小、運行狀態(tài))的改變都需要使用到這個鎖

          private?final?HashSet?workers?=?new?HashSet();

          workers 持有線程池中所有線程的集合,只有持有上面 mainLock 的鎖才能夠訪問。

          private?final?Condition?termination?=?mainLock.newCondition();

          等待條件,用來支持 awaitTermination 方法。Condition 和 Lock 一起使用可以實現(xiàn)通知/等待機制。

          private?int?largestPoolSize;

          largestPoolSize 表示線程池中最大池的大小,只有持有 mainLock 才能訪問

          private?long?completedTaskCount;

          completedTaskCount 表示任務(wù)完成的計數(shù),它僅僅在任務(wù)終止時更新,需要持有 mainLock 才能訪問。

          private?volatile?ThreadFactory?threadFactory;

          threadFactory 是創(chuàng)建線程的工廠,所有的線程都會使用這個工廠,調(diào)用 addWorker 方法創(chuàng)建。

          private?volatile?RejectedExecutionHandler?handler;

          handler 表示拒絕策略,handler 會在線程飽和或者將要關(guān)閉的時候調(diào)用。

          private?volatile?long?keepAliveTime;

          保活時間,它指的是空閑線程等待工作的超時時間,當存在多個 corePoolSize 或 allowCoreThreadTimeOut 時,線程將使用這個超時時間。

          下面是一些其他變量,這些變量比較簡單,我就直接給出注釋了。

          private?volatile?boolean?allowCoreThreadTimeOut;???//是否允許為核心線程設(shè)置存活時間
          private?volatile?int???corePoolSize;?????//核心池的大小(即線程池中的線程數(shù)目大于這個參數(shù)時,提交的任務(wù)會被放進任務(wù)緩存隊列)
          private?volatile?int???maximumPoolSize;???//線程池最大能容忍的線程數(shù)
          private?static?final?RejectedExecutionHandler?defaultHandler?=
          ????????new?AbortPolicy();?//?默認的拒絕策略

          任務(wù)提交

          現(xiàn)在我們知道了 ThreadPoolExecutor 創(chuàng)建出來就會處于運行狀態(tài),此時線程數(shù)量為 0 ,等任務(wù)到來時,線程池就會創(chuàng)建線程來執(zhí)行任務(wù),而下面我們的關(guān)注點就會放在任務(wù)提交這個過程上。

          通常情況下,我們會使用

          executor.execute()?

          來執(zhí)行任務(wù),我在很多書和博客教程上都看到過這個執(zhí)行過程,下面是一些書和博客教程所畫的 ThreadPoolExecutor 的執(zhí)行示意圖和執(zhí)行流程圖

          執(zhí)行示意圖

          處理流程圖

          ThreadPoolExecutor 的執(zhí)行 execute 的方法分為下面四種情況

          1. 如果當前運行的工作線程少于 corePoolSize 的話,那么會創(chuàng)建新線程來執(zhí)行任務(wù) ,這一步需要獲取 mainLock 全局鎖
          2. 如果運行線程不小于 corePoolSize,則將任務(wù)加入 BlockingQueue 阻塞隊列。
          3. 如果無法將任務(wù)加入 BlockingQueue 中,此時的現(xiàn)象就是隊列已滿,此時需要創(chuàng)建新的線程來處理任務(wù),這一步同樣需要獲取 mainLock 全局鎖。
          4. 如果創(chuàng)建新線程會使當前運行的線程超過 maximumPoolSize 的話,任務(wù)將被拒絕,并且使用 RejectedExecutionHandler.rejectEExecution() 方法拒絕新的任務(wù)。

          ThreadPoolExecutor 采取上面的整體設(shè)計思路,是為了在執(zhí)行 execute 方法時,避免獲取全局鎖,因為頻繁獲取全局鎖會是一個嚴重的可伸縮瓶頸,所以,幾乎所有的 execute 方法調(diào)用都是通過執(zhí)行步驟2。

          上面指出了 execute 的運行過程,整體上來說這個執(zhí)行過程把非常重要的點講解出來了,但是不夠細致,我查閱 ThreadPoolExecute 和部分源碼分析文章后,發(fā)現(xiàn)這事其實沒這么簡單,先來看一下 execute 的源碼,我已經(jīng)給出了中文注釋

          public?void?execute(Runnable?command)?{
          ??if?(command?==?null)
          ????throw?new?NullPointerException();
          ??//?獲取?ctl?的值
          ??int?c?=?ctl.get();
          ??//?判斷?ctl?的值是否小于核心線程池的數(shù)量
          ??if?(workerCountOf(c)?????//?如果小于,增加工作隊列,command?就是一個個的任務(wù)
          ????if?(addWorker(command,?true))
          ??????//?線程創(chuàng)建成功,直接返回
          ??????return;
          ????//?線程添加不成功,需要再次判斷,每需要一次判斷都會獲取?ctl?的值
          ????c?=?ctl.get();
          ??}
          ??//?如果線程池處于運行狀態(tài)并且能夠成功的放入阻塞隊列
          ??if?(isRunning(c)?&&?workQueue.offer(command))?{
          ????//?再次進行檢查
          ????int?recheck?=?ctl.get();
          ????//?如果不是運行態(tài)并且成功的從阻塞隊列中刪除
          ????if?(!?isRunning(recheck)?&&?remove(command))
          ??????//?執(zhí)行拒絕策略
          ??????reject(command);
          ????//?worker?線程數(shù)量是否為?0
          ????else?if?(workerCountOf(recheck)?==?0)
          ??????//?增加工作線程
          ??????addWorker(null,?false);
          ??}
          ??//?如果不能增加工作線程的數(shù)量,就會直接執(zhí)行拒絕策略
          ??else?if?(!addWorker(command,?false))
          ????reject(command);
          }

          下面是我根據(jù)源碼畫出的執(zhí)行流程圖

          下面我們針對 execute 流程進行分析,可能有點啰嗦,因為幾個核心流程上面已經(jīng)提過了,不過為了流程的完整性,我們再在這里重新提一下。

          1. 如果線程池的核心數(shù)量少于 corePoolSize,那么就會使用 addWorker 創(chuàng)建新線程,addworker 的流程我們會在下面進行分析。如果創(chuàng)建成功,那么 execute 方法會直接返回。如果沒創(chuàng)建成功,可能是由于線程池已經(jīng) shutdown,可能是由于并發(fā)情況下 workerCountOf(c) < corePoolSize ,別的線程先創(chuàng)建了 worker 線程,導致 workerCoun t>= corePoolSize。
          2. 如果線程池還在 Running 狀態(tài),會將 task 加入阻塞隊列,加入成功后會進行 double-check 雙重校驗,繼續(xù)下面的步驟,如果加入失敗,可能是由于隊列線程已滿,此時會判斷是否能夠加入線程池中,如果線程池也滿了的話,就會直接執(zhí)行拒絕策略,如果線程池能加入,execute 方法結(jié)束。
          3. 步驟 2 中的 double-check 主要是為了判斷進入 workQueue 中的 task 是否能被執(zhí)行:如果線程池已經(jīng)不是 Running 狀態(tài),則應(yīng)該拒絕添加任務(wù),從 workQueue 隊列中刪除任務(wù)。如果線程池是 Running,但是從 workQueue 中刪除失敗了,此時的原因可能是由于其他線程執(zhí)行了這個任務(wù),此時會直接執(zhí)行拒絕策略。
          4. 如果線程是 Running 狀態(tài),并且不能把任務(wù)從隊列中移除,進而判斷工作線程是否為 0 ,如果不為 0 ,execute 執(zhí)行完畢,如果工作線程是 0 ,則會使用 addWorker 增加工作線程,execute 執(zhí)行完畢。

          添加 worker 線程

          從上面的執(zhí)行流程可以看出,添加一個 worker 涉及的工作也非常多,這也是一個比價難啃的點,我們一起來分析下,這是 worker 的源碼

          private?boolean?addWorker(Runnable?firstTask,?boolean?core)?{
          ??//?retry?的用法相當于?goto
          ??retry:
          ??for?(;;)?{
          ????int?c?=?ctl.get();
          ????int?rs?=?runStateOf(c);

          ????//?Check?if?queue?empty?only?if?necessary.
          ????//?僅在必要時檢查隊列是否為空。
          ????//?線程池狀態(tài)有五種,state?越小越是運行狀態(tài)
          ????//?rs?>=?SHUTDOWN,表示此時線程池狀態(tài)可能是?SHUTDOWN、STOP、TIDYING、TERMINATED
          ????//?默認?rs?>=?SHUTDOWN,如果?rs?=?SHUTDOWN,直接返回?false
          ????//?默認?rs?
          ????//?默認?RUNNING,任務(wù)是空,如果工作隊列為空,返回?false
          ????//
          ????if?(rs?>=?SHUTDOWN?&&
          ????????!?(rs?==?SHUTDOWN?&&
          ???????????firstTask?==?null?&&
          ???????????!?workQueue.isEmpty()))
          ??????return?false;


          ????//?執(zhí)行循環(huán)
          ????for?(;;)?{
          ??????//?統(tǒng)計工作線程數(shù)量
          ??????int?wc?=?workerCountOf(c);
          ??????//?如果?worker?數(shù)量>線程池最大上限?CAPACITY(即使用int低29位可以容納的最大值)
          ??????//?或者?worker數(shù)量?>?corePoolSize?或?worker數(shù)量>maximumPoolSize?),即已經(jīng)超過了給定的邊界
          ??????if?(wc?>=?CAPACITY?||
          ??????????wc?>=?(core???corePoolSize?:?maximumPoolSize))
          ????????return?false;

          ??????//?使用 CAS 增加 worker 數(shù)量,增加成功,跳出循環(huán)。
          ??????if?(compareAndIncrementWorkerCount(c))
          ????????break?retry;

          ??????//?檢查?ctl
          ??????c?=?ctl.get();??//?Re-read?ctl
          ??????//?如果狀態(tài)不等于之前獲取的?state,跳出內(nèi)層循環(huán),繼續(xù)去外層循環(huán)判斷
          ??????if?(runStateOf(c)?!=?rs)
          ????????continue?retry;
          ??????//?else?CAS?failed?due?to?workerCount?change;?retry?inner?loop
          ????}
          ??}

          ??/*
          ??????????worker數(shù)量+1成功的后續(xù)操作
          ????????*?添加到?workers?Set?集合,并啟動?worker?線程
          ?????????*/

          ??boolean?workerStarted?=?false;
          ??boolean?workerAdded?=?false;
          ??Worker?w?=?null;
          ??try?{
          ????//?包裝?Runnable?對象
          ????//?設(shè)置?firstTask?的值為?-1
          ????//?賦值給當前任務(wù)
          ????//?使用?worker?自身這個?runnable,調(diào)用?ThreadFactory?創(chuàng)建一個線程,并設(shè)置給worker的成員變量thread
          ????w?=?new?Worker(firstTask);
          ????final?Thread?t?=?w.thread;
          ????if?(t?!=?null)?{
          ??????final?ReentrantLock?mainLock?=?this.mainLock;
          ??????mainLock.lock();
          ??????try?{
          ????????//?在持有鎖的時候重新檢查
          ????????//?如果 ThreadFactory 失敗或在獲得鎖之前關(guān)閉,請回退。
          ????????int?rs?=?runStateOf(ctl.get());

          ????????//如果線程池在運行?running
          ????????//?(可能是?workQueue?中仍有未執(zhí)行完成的任務(wù),創(chuàng)建沒有初始任務(wù)的?worker?線程執(zhí)行)
          ????????//worker?數(shù)量?-1?的操作在?addWorkerFailed()
          ????????if?(rs?????????????(rs?==?SHUTDOWN?&&?firstTask?==?null))?{
          ??????????if?(t.isAlive())?//?precheck?that?t?is?startable
          ????????????throw?new?IllegalThreadStateException();

          ??????????//?workers?就是一個?HashSet?集合
          ??????????workers.add(w);

          ??????????//?設(shè)置最大的池大小?largestPoolSize,workerAdded?設(shè)置為true
          ??????????int?s?=?workers.size();
          ??????????if?(s?>?largestPoolSize)
          ????????????largestPoolSize?=?s;
          ??????????workerAdded?=?true;
          ????????}
          ??????}?finally?{
          ????????mainLock.unlock();
          ??????}
          ??????if?(workerAdded)?{
          ????????t.start();
          ????????workerStarted?=?true;
          ??????}
          ????}
          ????//如果啟動線程失敗
          ????//?worker?數(shù)量?-1
          ??}?finally?{
          ????if?(!?workerStarted)
          ??????addWorkerFailed(w);
          ??}
          ??return?workerStarted;
          }

          媽的真長的一個方法,有點想吐血,其實我肝到現(xiàn)在已經(jīng)肝不動了,但我一想到看這篇文章的讀者們能給我一個關(guān)注,就算咳出一口老血也值了。

          這個方法的執(zhí)行流程圖如下

          這里我們就不再文字描述了,但是上面流程圖中有一個對象引起了我的注意,那就是 worker 對象,這個對象就代表了線程池中的工作線程,那么這個 worker 對象到底是啥呢?

          worker 對象

          Worker 位于 ThreadPoolExecutor 內(nèi)部,它繼承了 AQS 類并且實現(xiàn)了 Runnable 接口。Worker 類主要維護了線程運行過程中的中斷控制狀態(tài)。它提供了鎖的獲取和釋放操作。在 worker 的實現(xiàn)中,我們使用了非重入的互斥鎖而不是使用重復鎖,因為 Lea 覺得我們不應(yīng)該在調(diào)用諸如 setCorePoolSize 之類的控制方法時能夠重新獲取鎖。

          worker 對象的源碼比較簡單和標準,這里我們只說一下 worker 對象的構(gòu)造方法,也就是

          Worker(Runnable?firstTask)?{
          ??setState(-1);?
          ??this.firstTask?=?firstTask;
          ??this.thread?=?getThreadFactory().newThread(this);
          }

          構(gòu)造一個 worker 對象需要做三步操作:

          • 初始 AQS 狀態(tài)為 -1,此時不允許中斷 interrupt(),只有在 worker 線程啟動了,執(zhí)行了 runWorker() 方法后,將 state 置為0,才能進行中斷。
          • 將 firstTask 賦值給為當前類的全局變量
          • 通過 ThreadFactory 創(chuàng)建一個新的線程。

          ###任務(wù)運行

          我們前面的流程主要分析了線程池的 execute 方法的執(zhí)行過程,這個執(zhí)行過程相當于是任務(wù)提交過程,而我們下面要說的是從隊列中獲取任務(wù)并運行的這個工作流程。

          一般情況下,我們會從初始任務(wù)開始運行,所以我們不需要獲取第一個任務(wù)。否則,只要線程池還處于 Running 狀態(tài),我們會調(diào)用 getTask 方法獲取任務(wù)。getTask 方法可能會返回 null,此時可能是由于線程池狀態(tài)改變或者是配置參數(shù)更改而導致的退出。還有一種情況可能是由于 異常 而引發(fā)的,這個我們后面會細說。

          下面來看一下 runWorker 方法的源碼:

          final?void?runWorker(Worker?w)?{
          ??Thread?wt?=?Thread.currentThread();
          ??Runnable?task?=?w.firstTask;
          ??w.firstTask?=?null;
          ??//?允許打斷
          ??//??new?Worker()?是?state==-1,此處是調(diào)用?Worker?類的?tryRelease()?方法,
          ??//??將?state?置為0
          ??w.unlock();
          ??boolean?completedAbruptly?=?true;
          ??try?{
          ????//?調(diào)用?getTask()?獲取任務(wù)
          ????while?(task?!=?null?||?(task?=?getTask())?!=?null)?{
          ??????//?獲取全局鎖
          ??????w.lock();
          ??????//?確保只有在線程 STOPING 時,才會被設(shè)置中斷標志,否則清除中斷標志。
          ??????//?如果一開始判斷線程池狀態(tài)?
          ??????//?即線程已經(jīng)被中斷,又清除了中斷標示,再次判斷線程池狀態(tài)是否?>=?stop
          ??????//?是,再次設(shè)置中斷標示,wt.interrupt()
          ??????//?否,不做操作,清除中斷標示后進行后續(xù)步驟
          ??????if?((runStateAtLeast(ctl.get(),?STOP)?||
          ???????????(Thread.interrupted()?&&
          ????????????runStateAtLeast(ctl.get(),?STOP)))?&&
          ??????????!wt.isInterrupted())
          ????????wt.interrupt();
          ??????try?{
          ????????//?執(zhí)行前需要調(diào)用的方法,交給程序員自己來實現(xiàn)
          ????????beforeExecute(wt,?task);
          ????????Throwable?thrown?=?null;
          ????????try?{
          ??????????task.run();
          ????????}?catch?(RuntimeException?x)?{
          ??????????thrown?=?x;?throw?x;
          ????????}?catch?(Error?x)?{
          ??????????thrown?=?x;?throw?x;
          ????????}?catch?(Throwable?x)?{
          ??????????thrown?=?x;?throw?new?Error(x);
          ????????}?finally?{
          ??????????//?執(zhí)行后需要調(diào)用的方法,交給程序員自己來實現(xiàn)
          ??????????afterExecute(task,?thrown);
          ????????}
          ??????}?finally?{
          ????????//?把?task?置為?null,完成任務(wù)數(shù)?+?1,并進行解鎖
          ????????task?=?null;
          ????????w.completedTasks++;
          ????????w.unlock();
          ??????}
          ????}
          ????completedAbruptly?=?false;
          ????//?最后處理?worker?的退出
          ??}?finally?{
          ????processWorkerExit(w,?completedAbruptly);
          ??}
          }

          下面是 runWorker 的執(zhí)行流程圖

          這里需要注意一下最后的 processWorkerExit 方法,這里面其實也做了很多事情,包括判斷 completedAbruptly 的布爾值來表示是否完成任務(wù),獲取鎖,嘗試從隊列中移除 worker,然后嘗試中斷,接下來會判斷一下中斷狀態(tài),在線程池當前狀態(tài)小于 STOP 的情況下會創(chuàng)建一個新的 worker 來替換被銷毀的 worker。

          任務(wù)獲取

          任務(wù)獲取就是 getTask 方法的執(zhí)行過程,這個環(huán)節(jié)主要用來獲取任務(wù)和剔除任務(wù)。下面進入源碼分析環(huán)節(jié)

          private?Runnable?getTask()?{
          ??//?判斷最后一個 poll 是否超時。
          ??boolean?timedOut?=?false;?//?Did?the?last?poll()?time?out?

          ??for?(;;)?{
          ????int?c?=?ctl.get();
          ????int?rs?=?runStateOf(c);

          ????//?Check?if?queue?empty?only?if?necessary.
          ????//?必要時檢查隊列是否為空
          ????//?對線程池狀態(tài)的判斷,兩種情況會?workerCount-1,并且返回?null
          ????//?線程池狀態(tài)為?shutdown,且?workQueue?為空(反映了?shutdown?狀態(tài)的線程池還是要執(zhí)行?workQueue?中剩余的任務(wù)的)
          ????//?線程池狀態(tài)為?stop(shutdownNow()?會導致變成?STOP)(此時不用考慮?workQueue?的情況)
          ????if?(rs?>=?SHUTDOWN?&&?(rs?>=?STOP?||?workQueue.isEmpty()))?{
          ??????decrementWorkerCount();
          ??????return?null;
          ????}

          ????int?wc?=?workerCountOf(c);

          ????//?Are?workers?subject?to?culling?
          ????//?是否需要定時從?workQueue?中獲取
          ????boolean?timed?=?allowCoreThreadTimeOut?||?wc?>?corePoolSize;

          ????//?如果工作線程的數(shù)量大于?maximumPoolSize?會進行線程剔除
          ????//?如果使用了?allowCoreThreadTimeOut?,并且工作線程不為0或者隊列有任務(wù)的話,會直接進行線程剔除
          ????if?((wc?>?maximumPoolSize?||?(timed?&&?timedOut))
          ????????&&?(wc?>?1?||?workQueue.isEmpty()))?{
          ??????if?(compareAndDecrementWorkerCount(c))
          ????????return?null;
          ??????continue;
          ????}
          ??
          ????try?{
          ??????Runnable?r?=?timed??
          ????????workQueue.poll(keepAliveTime,?TimeUnit.NANOSECONDS)?:
          ??????workQueue.take();
          ??????if?(r?!=?null)
          ????????return?r;
          ??????timedOut?=?true;
          ????}?catch?(InterruptedException?retry)?{
          ??????timedOut?=?false;
          ????}
          ??}
          }

          getTask 方法的執(zhí)行流程圖如下

          工作線程退出

          工作線程退出是 runWorker 的最后一步,這一步會判斷工作線程是否突然終止,并且會嘗試終止線程,以及是否需要增加線程來替換原工作線程。

          private?void?processWorkerExit(Worker?w,?boolean?completedAbruptly)?{
          ??//?worker數(shù)量?-1
          ??//?completedAbruptly?是?true,突然終止,說明是?task?執(zhí)行時異常情況導致,即run()方法執(zhí)行時發(fā)生了異常,那么正在工作的?worker?線程數(shù)量需要-1
          ??//?completedAbruptly?是?false?是突然終止,說明是?worker?線程沒有?task?可執(zhí)行了,不用-1,因為已經(jīng)在?getTask()?方法中-1了
          ??if?(completedAbruptly)?//?If?abrupt,?then?workerCount?wasn't?adjusted
          ????decrementWorkerCount();

          ??//?從?Workers?Set?中移除?worker
          ??final?ReentrantLock?mainLock?=?this.mainLock;
          ??mainLock.lock();
          ??try?{
          ????completedTaskCount?+=?w.completedTasks;
          ????workers.remove(w);
          ??}?finally?{
          ????mainLock.unlock();
          ??}

          ??//?嘗試終止線程,
          ??tryTerminate();

          ??//?是否需要增加?worker?線程
          ??//?線程池狀態(tài)是?running?或?shutdown
          ??//?如果當前線程是突然終止的,addWorker()
          ??//?如果當前線程不是突然終止的,但當前線程數(shù)量?
          ??//?故如果調(diào)用線程池?shutdown(),直到workQueue為空前,線程池都會維持?corePoolSize?個線程,
          ??//?然后再逐漸銷毀這?corePoolSize?個線程
          ??int?c?=?ctl.get();
          ??if?(runStateLessThan(c,?STOP))?{
          ????if?(!completedAbruptly)?{
          ??????int?min?=?allowCoreThreadTimeOut???0?:?corePoolSize;
          ??????if?(min?==?0?&&?!?workQueue.isEmpty())
          ????????min?=?1;
          ??????if?(workerCountOf(c)?>=?min)
          ????????return;?//?replacement?not?needed
          ????}
          ????addWorker(null,?false);
          ??}
          }

          源碼搞的有點頭大了,可能一時半會無法理解上面這些源碼,不過你可以先把注釋粘過去,等有時間了需要反復刺激,加深印象!

          其他線程池

          下面我們來了解一下其他線程池的構(gòu)造原理,主要涉及 FixedThreadPool、SingleThreadExecutor、CachedThreadPool

          newFixedThreadPool

          newFixedThreadPool 被稱為可重用固定線程數(shù)的線程池,下面是 newFixedThreadPool 的源碼

          public?static?ExecutorService?newFixedThreadPool(int?nThreads)?{
          ??return?new?ThreadPoolExecutor(nThreads,?nThreads,
          ????????????????????????????????0L,?TimeUnit.MILLISECONDS,
          ????????????????????????????????new?LinkedBlockingQueue());
          }

          可以看到,newFixedThreadPool 的 corePoolSize 和 maximumPoolSize 都被設(shè)置為創(chuàng)建 FixedThreadPool 時指定的參數(shù) nThreads,也就是說,在 newFiexedThreadPool 中,核心線程數(shù)就是最大線程數(shù)。

          下面是 newFixedThreadPool 的執(zhí)行示意圖

          newFixedThreadPool 的工作流程如下

          • 如果當前運行的線程數(shù)少于 corePoolSize,則會創(chuàng)建新線程 addworker 來執(zhí)行任務(wù)
          • 如果當前線程的線程數(shù)等于 corePoolSize,會將任務(wù)直接加入到 LinkedBlockingQueue 無界阻塞隊列中,LinkedBlockingQueue 的上限如果沒有制定,默認為 Integer.MAX_VALUE 大小。
          • 等到線程池中的任務(wù)執(zhí)行完畢后,newFixedThreadPool 會反復從 LinkedBlockingQueue 中獲取任務(wù)來執(zhí)行。

          相較于 ThreadPoolExecutor,newFixedThreadPool 主要做了以下改變

          • 核心線程數(shù)等于最大線程數(shù),因此 newFixedThreadPool 只有兩個最大容量,一個是線程池的線程容量,還有一個是 LinkedBlockingQueue 無界阻塞隊列的線程容量。

          • 這里可以看到還有一個變化是 0L,也就是 keepAliveTime = 0L,keepAliveTime 就是到達工作線程最大容量后的線程等待時間,0L 就意味著當線程池中的線程數(shù)大于 corePoolsize 時,空余的線程會被立即終止。

          • 由于使用無界隊列,運行中的 newFixedThreadPool 不會拒絕任務(wù),也就是不會調(diào)用 RejectedExecutionHandler.rejectedExecution 方法。

          newSingleThreadExecutor

          newSingleThreadExecutor 中只有單個工作線程,也就是說它是一個只有單個 worker 的 Executor。

          public?static?ExecutorService?newSingleThreadExecutor(ThreadFactory?threadFactory)?{
          ??return?new?FinalizableDelegatedExecutorService
          ????(new?ThreadPoolExecutor(1,?1,
          ????????????????????????????0L,?TimeUnit.MILLISECONDS,
          ????????????????????????????new?LinkedBlockingQueue(),
          ????????????????????????????threadFactory));
          }

          可以看到,在 newSingleThreadExecutor 中,corePoolSize 和 maximumPoolSize 都被設(shè)置為 1,也不存在超時情況,同樣使用了 LinkedBlockingQueue 無界阻塞隊列,除了 corePoolSize 和 maximumPoolSize 外,其他幾乎和 newFixedThreadPool 一模一樣。

          下面是 newSingleThreadExecutor ?的執(zhí)行示意圖

          newSingleThreadExecutor 的執(zhí)行過程和 newFixedThreadPool 相同,只是 newSingleThreadExecutor 的工作線程數(shù)為 1。

          newCachedThreadPool

          newCachedThreadPool 是一個根據(jù)需要創(chuàng)建工作線程的線程池,newCachedThreadPool 線程池最大數(shù)量是 Integer.MAX_VALUE,保活時間是 60 秒,使用的是SynchronousQueue 無緩沖阻塞隊列。

          public?static?ExecutorService?newCachedThreadPool(ThreadFactory?threadFactory)?{
          ??return?new?ThreadPoolExecutor(0,?Integer.MAX_VALUE,
          ????????????????????????????????60L,?TimeUnit.SECONDS,
          ????????????????????????????????new?SynchronousQueue(),
          ????????????????????????????????threadFactory);
          }

          它的執(zhí)行示意圖如下

          • 首先會先執(zhí)行 SynchronousQueue.offer 方法,如果當前 maximumPool 中有空閑線程正在執(zhí)行 SynchronousQueue.poll ,就會把任務(wù)交給空閑線程來執(zhí)行,execute 方法執(zhí)行完畢,否則的話,繼續(xù)向下執(zhí)行。
          • 如果 maximumPool 中沒有線程執(zhí)行 SynchronousQueue.poll 方法,這種情況下 newCachedThreadPool 會創(chuàng)建一個新線程執(zhí)行任務(wù),execute 方法執(zhí)行完成。
          • 執(zhí)行完成的線程將執(zhí)行 poll 操作,這個 poll 操作會讓空閑線程最多在 SynchronousQueue 中等待 60 秒鐘。如果 60 秒鐘內(nèi)提交了一個新任務(wù),那么空閑線程會執(zhí)行這個新提交的任務(wù),否則空閑線程將會終止。

          這里的關(guān)鍵點在于 SynchronousQueue 隊列,它是一個沒有容量的阻塞隊列。每個插入操作必須等待另一個線程對應(yīng)的移除操作。這其實就是一種任務(wù)傳遞,如下圖所示

          其實還有一個線程池 ScheduledThreadPoolExecutor ,就先不在此篇文章做詳細贅述了。

          線程池實踐考量因素

          下面介紹幾種在實踐過程中使用線程池需要考慮的幾個點

          • 避免任務(wù)堆積,比如我們上面提到的 newFixedThreadPool,它是創(chuàng)建指定數(shù)目的線程,但是工作隊列是無界的,這就導致如果工作隊列線程太少,導致處理速度跟不上入隊速度,這種情況下很可能會導致 OOM,診斷時可以使用 jmap 檢查是否有大量任務(wù)入隊。
          • 生產(chǎn)實踐中很可能由于邏輯不嚴謹或者工作線程不能及時釋放導致 線程泄漏,這個時候最好檢查一下線程棧
          • 避免死鎖等同步問題
          • 盡量避免在使用線程池時操作 ThreadLocal,因為工作線程的生命周期可能會超過任務(wù)的生命周期。

          線程池大小的設(shè)置

          線程池大小的設(shè)置也是面試官經(jīng)常會考到的一個點,一般需要根據(jù)任務(wù)類型來配置線程池大小

          • 如果是 CPU 密集型任務(wù),那么就意味著 CPU 是稀缺資源,這個時候我們通常不能通過增加線程數(shù)來提高計算能力,因為線程數(shù)量太多,會導致頻繁的上下文切換,一般這種情況下,建議合理的線程數(shù)值是 N(CPU)數(shù) + 1
          • 如果是 I/O 密集型任務(wù),就說明需要較多的等待,這個時候可以參考 Brain Goetz 的推薦方法 線程數(shù) = CPU核數(shù) × (1 + 平均等待時間/平均工作時間)。參考值可以是 N(CPU) 核數(shù) * 2。

          當然,這只是一個參考值,具體的設(shè)置還需要根據(jù)實際情況進行調(diào)整,比如可以先將線程池大小設(shè)置為參考值,再觀察任務(wù)運行情況和系統(tǒng)負載、資源利用率來進行適當調(diào)整。

          后記

          這篇文章真的寫了很久,因為之前對線程池認識不是很深,所以花了大力氣來研究,希望這篇文章對你有所幫助。

          這篇文章探討了對 Executor 框架的主要組成、線程池結(jié)構(gòu)與生命周期,線程池源碼和線程池實現(xiàn)的細節(jié)等方面進行了講解和分析,希望對你有所幫助。

          如果這篇文章寫的還不錯,希望讀者朋友們可以不吝給出四連:點贊、在看、留言、分享,記住這次一定哦!


          往期精選

          騰訊九年,再見嘍!

          面試,真的刺激!

          炸裂!MySQL 82 張圖帶你飛!

          就這樣,我被禁足了!!

          千萬別再瞎招人了 (干貨)

          為什么要有 AtomicReference ?

          動態(tài)代理竟然如此簡單!

          另外,cxuan 肝了六本 PDF,公號回復 cxuan ,領(lǐng)取作者全部 PDF 。


          瀏覽 38
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  操香港女人逼视频 | 大伊香蕉视频在线观看 | 看亚洲A级一级毛片 | 日韩18禁网站 | 精品成人Av一区二区三区 |