肝完這篇線程池,我咳血了
點(diǎn)擊藍(lán)色“程序員cxuan?”關(guān)注我喲
加個(gè)“星標(biāo)”,及時(shí)接收最新文章

更多文章見?
https://github.com/crisxuan/bestJavaer
我們知道,線程需要的時(shí)候要進(jìn)行創(chuàng)建,不需要的時(shí)候需要進(jìn)行銷毀,但是線程的創(chuàng)建和銷毀都是一個(gè)開銷比較大的操作。
為什么開銷大呢?
雖然我們程序員創(chuàng)建一個(gè)線程很容易,直接使用 new Thread() 創(chuàng)建就可以了,但是操作系統(tǒng)做的工作會(huì)多很多,它需要發(fā)出 系統(tǒng)調(diào)用,陷入內(nèi)核,調(diào)用內(nèi)核 API 創(chuàng)建線程,為線程分配資源等,這一些操作有很大的開銷。
所以,在高并發(fā)大流量的情況下,頻繁的創(chuàng)建和銷毀線程會(huì)大大拖慢響應(yīng)速度,那么有什么能夠提高響應(yīng)速度的方式嗎?方式有很多,盡量避免線程的創(chuàng)建和銷毀是一種提升性能的方式,也就是把線程 復(fù)用 起來,因?yàn)樾阅苁俏覀內(nèi)粘W铌P(guān)注的因素。
本篇文章我們先來通過認(rèn)識(shí)一下 Executor 框架、然后通過描述線程池的基本概念入手、逐步認(rèn)識(shí)線程池的核心類,然后慢慢進(jìn)入線程池的原理中,帶你一步一步理解線程池。
在 Java 中可以通過線程池來達(dá)到這樣的效果。今天我們就來詳細(xì)講解一下 Java 的線程池。
Executor 框架
為什么要先說一下 Executor 呢?因?yàn)槲艺J(rèn)為 Executor 是線程池的一個(gè)驅(qū)動(dòng),我們平常創(chuàng)建并執(zhí)行線程用的一般都是 new Thread().start() 這個(gè)方法,這個(gè)方法更多強(qiáng)調(diào) 創(chuàng)建一個(gè)線程并開始運(yùn)行。而我們后面講到創(chuàng)建線程池更多體現(xiàn)在驅(qū)動(dòng)執(zhí)行上。
Executor 的總體框架如下,我們下面會(huì)對(duì) Executor 框架中的每個(gè)類進(jìn)行介紹。

我們首先來認(rèn)識(shí)一下 Executor
Executor 接口
Executor 是 java.util.concurrent 的頂級(jí)接口,這個(gè)接口只有一個(gè)方法,那就是 execute 方法。我們平常創(chuàng)建并啟動(dòng)線程會(huì)使用 new Thread().start() ,而 Executor 中的 execute 方法替代了顯示創(chuàng)建線程的方式。Executor 的設(shè)計(jì)初衷就是將任務(wù)提交和任務(wù)執(zhí)行細(xì)節(jié)進(jìn)行解藕。使用 Executor 框架,你可以使用如下的方式創(chuàng)建線程
Executor?executor?=?Executors.xxx?//?xxx?其實(shí)就是?Executor?的實(shí)現(xiàn)類,我們后面會(huì)說
executor.execute(new?RunnableTask1());
executor.execute(new?RunnableTask2());
execute方法接收一個(gè) Runnable 實(shí)例,它用來執(zhí)行一個(gè)任務(wù),而任務(wù)就是一個(gè)實(shí)現(xiàn)了 Runnable 接口的類,但是 execute 方法不能接收實(shí)現(xiàn)了 Callable 接口的類,也就是說,execute 方法不能接收具有返回值的任務(wù)。
execute 方法創(chuàng)建的線程是異步執(zhí)行的,也就是說,你不用等待每個(gè)任務(wù)執(zhí)行完畢后再執(zhí)行下一個(gè)任務(wù)。

比如下面就是一個(gè)簡(jiǎn)單的使用 Executor 創(chuàng)建并執(zhí)行線程的示例
public?class?RunnableTask?implements?Runnable{
????@Override
????public?void?run()?{
????????System.out.println("running");
????}
????public?static?void?main(String[]?args)?{
????????Executor?executor?=?Executors.newSingleThreadExecutor();?//?你可能不太理解這是什么意思,我們后面會(huì)說。
????????executor.execute(new?RunnableTask());
????}
}
Executor 就相當(dāng)于是族長(zhǎng),大佬只發(fā)號(hào)令,族長(zhǎng)讓你異步執(zhí)行你就得異步執(zhí)行,族長(zhǎng)說不用匯報(bào)任務(wù)你就不用回報(bào),但是這個(gè)族長(zhǎng)管的事情有點(diǎn)少,所以除了 Executor 之外,我們還需要認(rèn)識(shí)其他管家,比如說管你這個(gè)線程啥時(shí)候終止,啥時(shí)候暫停,判斷你這個(gè)線程當(dāng)前的狀態(tài)等,ExecutorService 就是一位大管家。
ExecutorService 接口
ExecutorService 也是一個(gè)接口,它是 Executor 的拓展,提供了一些 Executor 中沒有的方法,下面我們來介紹一下這些方法
void?shutdown();
shutdown 方法調(diào)用后,ExecutorService 會(huì)有序關(guān)閉正在執(zhí)行的任務(wù),但是不接受新任務(wù)。如果任務(wù)已經(jīng)關(guān)閉,那么這個(gè)方法不會(huì)產(chǎn)生任何影響。
ExecutorService 還有一個(gè)和 shutdown 方法類似的方法是
List?shutdownNow() ;
shutdownNow 會(huì)嘗試停止關(guān)閉所有正在執(zhí)行的任務(wù),停止正在等待的任務(wù),并返回正在等待執(zhí)行的任務(wù)列表。
既然 shutdown 和 shutdownNow 這么相似,那么二者有啥區(qū)別呢?
shutdown 方法只是會(huì)將 線程池的狀態(tài)設(shè)置為SHUTWDOWN,正在執(zhí)行的任務(wù)會(huì)繼續(xù)執(zhí)行下去,線程池會(huì)等待任務(wù)的執(zhí)行完畢,而沒有執(zhí)行的線程則會(huì)中斷。shutdownNow 方法會(huì)將線程池的狀態(tài)設(shè)置為 STOP,正在執(zhí)行和等待的任務(wù)則被停止,返回等待執(zhí)行的任務(wù)列表
ExecutorService 還有三個(gè)判斷線程狀態(tài)的方法,分別是
boolean?isShutdown();
boolean?isTerminated();
boolean?awaitTermination(long?timeout,?TimeUnit?unit)
????????throws?InterruptedException;
isShutdown方法表示執(zhí)行器是否已經(jīng)關(guān)閉,如果已經(jīng)關(guān)閉,返回 true,否則返回 false。isTerminated方法表示判斷所有任務(wù)再關(guān)閉后是否已完成,如果完成返回 false,這個(gè)需要注意一點(diǎn),除非首先調(diào)用 shutdown 或者 shutdownNow 方法,否則 isTerminated 方法永遠(yuǎn)不會(huì)為 true。awaitTermination方法會(huì)阻塞,直到發(fā)出調(diào)用 shutdown 請(qǐng)求后所有的任務(wù)已經(jīng)完成執(zhí)行后才會(huì)解除。這個(gè)方法不是非常容易理解,下面通過一個(gè)小例子來看一下。
public?static?ExecutorService?executorService?=?Executors.newFixedThreadPool(10);
public?static?void?main(String[]?args)?throws?InterruptedException?{
??for?(int?i?=?0;?i?10;?i++)?{
????executorService.submit(()?->?{
??????System.out.println(Thread.currentThread().getName());
??????try?{
????????Thread.sleep(10);
??????}?catch?(InterruptedException?e)?{
????????e.printStackTrace();
??????}
????});
??}
??executorService.shutdown();
??System.out.println("Waiting...");
??boolean?isTermination?=?executorService.awaitTermination(3,?TimeUnit.SECONDS);
??System.out.println("Waiting...Done");
??if(isTermination){
????System.out.println("All?Thread?Done");
??}
??System.out.println(Thread.currentThread().getName());
}
如果在調(diào)用 executorService.shutdown() 之后,所有線程完成任務(wù),isTermination 返回 true,程序才會(huì)打印出 All Thread Done ,如果注釋掉 executorService.shutdown() 或者在任務(wù)沒有完成后 awaitTermination 就超時(shí)了,那么 isTermination 就會(huì)返回 false。
ExecutorService 當(dāng)大管家還有一個(gè)原因是因?yàn)樗粌H能夠包容 Runnable 對(duì)象,還能夠接納 Callable 對(duì)象。在 ExecutorService 中,submit 方法扮演了這個(gè)角色。
?Future?submit(Callable?task) ;
?Future?submit(Runnable?task,?T?result) ;
Future>?submit(Runnable?task);
submit 方法會(huì)返回一個(gè) Future對(duì)象, 表示范型,它是對(duì) Callable 產(chǎn)生的返回值來說的,submit 方法提交的任務(wù)中的 call 方法如果返回 Integer,那么 submit 方法就返回 Future,依此類推。
?List>?invokeAll(Collection?extends?Callable>?tasks)
????????throws?InterruptedException;
?List>?invokeAll(Collection?extends?Callable>?tasks,
??????????????????????????????????long?timeout,?TimeUnit?unit)
????????throws?InterruptedException;
invokeAll 方法用于執(zhí)行給定的任務(wù)結(jié)合,執(zhí)行完成后會(huì)返回一個(gè)任務(wù)列表,任務(wù)列表每一項(xiàng)是一個(gè)任務(wù),每個(gè)任務(wù)會(huì)包括任務(wù)狀態(tài)和執(zhí)行結(jié)果,同樣 invokeAll 方法也會(huì)返回 Future 對(duì)象。
?T?invokeAny(Collection?extends?Callable>?tasks)
????????throws?InterruptedException,?ExecutionException;
?T?invokeAny(Collection?extends?Callable>?tasks,
????????????????????long?timeout,?TimeUnit?unit)
????????throws?InterruptedException,?ExecutionException,?TimeoutException;
invokeAny 會(huì)獲得最先完成任務(wù)的結(jié)果,即Callable 接口中的 call 的返回值,在獲得結(jié)果時(shí),會(huì)中斷其他正在執(zhí)行的任務(wù),具有阻塞性。
大管家的職責(zé)相對(duì)于組長(zhǎng)來說標(biāo)準(zhǔn)更多,管的事情也比較寬,但是大管家畢竟也是家族的中流砥柱,他不會(huì)做具體的活,他的下面有各個(gè)干將,干將是一個(gè)家族的核心,他負(fù)責(zé)完成大管家的工作。
AbstractExecutorService 抽象類
AbstractExecutorService 是一個(gè)抽象類,它實(shí)現(xiàn)了 ExecutorService 中的部分方法,它相當(dāng)一個(gè)干將,會(huì)分析大管家有哪些要做的工作,然后針對(duì)大管家的要求做一些具體的規(guī)劃,然后找他的得力助手 ThreadPoolExecutor 來完成目標(biāo)。
AbstractExecutorService 這個(gè)抽象類主要實(shí)現(xiàn)了 invokeAll 和 invokeAny 方法,關(guān)于這兩個(gè)方法的源碼分析我們會(huì)在后面進(jìn)行解釋。
ScheduledExecutorService 接口
ScheduledExecutorService 也是一個(gè)接口,它擴(kuò)展了 ExecutorService 接口,提供了 ExecutorService 接口所沒有的功能,ScheduledExecutorService 顧名思義就是一個(gè)定時(shí)執(zhí)行器,定時(shí)執(zhí)行器可以安排命令在一定延遲時(shí)間后運(yùn)行或者定期執(zhí)行。
它主要有三個(gè)接口方法,一個(gè)重載方法。下面我們先來看一下這兩個(gè)重載方法。
public?ScheduledFuture>?schedule(Runnable?command,
???????????????????????????????????????long?delay,?TimeUnit?unit);
public??ScheduledFuture?schedule(Callable?callable,
???????????????????????????????????????????long?delay,?TimeUnit?unit) ;
schedule 方法能夠延遲一定時(shí)間后執(zhí)行任務(wù),并且只能執(zhí)行一次。可以看到,schedule 方法也返回了一個(gè) ScheduledFuture 對(duì)象,ScheduledFuture 對(duì)象擴(kuò)展了 Future 和 Delayed 接口,它表示異步延遲計(jì)算的結(jié)果。schedule 方法支持零延遲和負(fù)延遲,這兩類值都被視為立即執(zhí)行任務(wù)。
還有一點(diǎn)需要說明的是,schedule 方法能夠接收相對(duì)的時(shí)間和周期作為參數(shù),而不是固定的日期,你可以使用 date.getTime - System.currentTimeMillis() 來得到相對(duì)的時(shí)間間隔。
public?ScheduledFuture>?scheduleAtFixedRate(Runnable?command,
??????????????????????????????????????????????????long?initialDelay,
??????????????????????????????????????????????????long?period,
??????????????????????????????????????????????????TimeUnit?unit);
scheduleAtFixedRate 表示任務(wù)會(huì)根據(jù)固定的速率在時(shí)間 initialDelay 后不斷地執(zhí)行。
public?ScheduledFuture>?scheduleWithFixedDelay(Runnable?command,
?????????????????????????????????????????????????????long?initialDelay,
?????????????????????????????????????????????????????long?delay,
?????????????????????????????????????????????????????TimeUnit?unit);
這個(gè)方法和上面的方法很類似,它表示的是以固定延遲時(shí)間的方式來執(zhí)行任務(wù)。
scheduleAtFixedRate 和 scheduleWithFixedDelay 這兩個(gè)方法容易混淆,下面我們通過一個(gè)示例來說明一下這兩個(gè)方法的區(qū)別。
public?class?ScheduleTest?{
????public?static?void?main(String[]?args)?{
????????Runnable?command?=?()?->?{
????????????long?startTime?=?System.currentTimeMillis();
????????????System.out.println("current?timestamp?=?"?+?startTime);
????????????try?{
????????????????TimeUnit.MILLISECONDS.sleep(new?Random().nextInt(100));
????????????}?catch?(InterruptedException?e)?{
????????????????e.printStackTrace();
????????????}
????????????System.out.println("time?spend?=?"?+?(System.currentTimeMillis()?-?startTime));
????????};
????????ScheduledExecutorService?scheduledExecutorService?=?Executors.newScheduledThreadPool(10);
????????scheduledExecutorService.scheduleAtFixedRate(command,100,1000,TimeUnit.MILLISECONDS);
????}
}
輸出結(jié)果大致如下

可以看到,每次打印出來 current timestamp 的時(shí)間間隔大約等于 1000 毫秒,所以可以斷定 scheduleAtFixedRate 是以恒定的速率來執(zhí)行任務(wù)的。
然后我們?cè)倏匆幌?scheduleWithFixedDelay 方法,和上面測(cè)試類一樣,只不過我們把 scheduleAtFixedRate 換為了 scheduleWithFixedDelay 。
scheduledExecutorService.scheduleWithFixedDelay(command,10,1000,TimeUnit.MILLISECONDS);
然后觀察一下輸出結(jié)果

可以看到,兩個(gè) current timestamp 之間的間隔大約等于 1000(固定時(shí)間) + delay(time spend) 的總和,由此可以確定 scheduleWithFixedDelay 是以固定時(shí)延來執(zhí)行的。
線程池的描述
下面我們先來認(rèn)識(shí)一下什么是線程池,線程池從概念上來看就是一個(gè)池子,什么池子呢?是指管理同一組工作線程的池子,也就是說,線程池會(huì)統(tǒng)一管理內(nèi)部的工作線程。
wiki 上說,線程池其實(shí)就是一種軟件設(shè)計(jì)模式,這種設(shè)計(jì)模式用于實(shí)現(xiàn)計(jì)算機(jī)程序中的并發(fā)。
比如下面就是一個(gè)簡(jiǎn)單的線程池概念圖。

注意:這個(gè)圖只是一個(gè)概念模型,不是真正的線程池實(shí)現(xiàn),希望讀者不要混淆。
可以看到,這種其實(shí)也相當(dāng)于是生產(chǎn)者-消費(fèi)者模型,任務(wù)隊(duì)列中的線程會(huì)進(jìn)入到線程池中,由線程池進(jìn)行管理,線程池中的一個(gè)個(gè)線程就是工作線程,工作線程執(zhí)行完畢后會(huì)放入完成隊(duì)列中,代表已經(jīng)完成的任務(wù)。
上圖有個(gè)缺點(diǎn),那就是隊(duì)列中的線程執(zhí)行完畢后就會(huì)銷毀,銷毀就會(huì)產(chǎn)生性能損耗,降低響應(yīng)速度,而我們使用線程池的目的往往是需要把線程重用起來,提高程序性能。
所以我們應(yīng)該把執(zhí)行完成后的工作線程重新利用起來,等待下一次使用。
線程池創(chuàng)建
我們上面大概聊了一下什么線程池的基本執(zhí)行機(jī)制,你知道了線程是如何復(fù)用的,那么任何事物不可能是憑空出現(xiàn)的,線程也一樣,那么它是如何創(chuàng)建出來的呢?下面就不得不提一個(gè)工具類,那就是 Executors。
Executors 也是java.util.concurrent 包下的成員,它是一個(gè)創(chuàng)建線程池的工廠,可以使用靜態(tài)工廠方法來創(chuàng)建線程池,下面就是 Executors 所能夠創(chuàng)建線程池的具體類型。

newFixedThreadPool:newFixedThreadPool 將會(huì)創(chuàng)建固定數(shù)量的線程池,這個(gè)數(shù)量可以由程序員通過創(chuàng)建Executors.newFixedThreadPool(int nThreads)時(shí)手動(dòng)指定,每次提交一個(gè)任務(wù)就會(huì)創(chuàng)建一個(gè)線程,在任何時(shí)候,nThreads 的值是最多允許活動(dòng)的線程。如果在所有線程都處于活躍狀態(tài)時(shí)有額外的任務(wù)被創(chuàng)建,這些新創(chuàng)建的線程會(huì)進(jìn)入等待隊(duì)列等待線程調(diào)度。如果有任何線程由于執(zhí)行期間出現(xiàn)意外導(dǎo)致線程終止,那么在執(zhí)行后續(xù)任務(wù)時(shí)會(huì)使用等待隊(duì)列中的線程進(jìn)行替代。newWorkStealingPool:newWorkStealingPool 是 JDK1.8 新增加的線程池,它是基于fork-join機(jī)制的一種線程池實(shí)現(xiàn),使用了Work-Stealing算法。newWorkStealingPool 會(huì)創(chuàng)建足夠的線程來支持并行度,會(huì)使用多個(gè)隊(duì)列來減少競(jìng)爭(zhēng)。work-stealing pool 線程池不會(huì)保證提交任務(wù)的執(zhí)行順序。newSingleThreadExecutor:newSingleThreadExecutor 是一個(gè)單線程的執(zhí)行器,它只會(huì)創(chuàng)建單個(gè)線程來執(zhí)行任務(wù),如果這個(gè)線程異常結(jié)束,則會(huì)創(chuàng)建另外一個(gè)線程來替代。newSingleThreadExecutor 會(huì)確保任務(wù)在任務(wù)隊(duì)列中的執(zhí)行次序,也就是說,任務(wù)的執(zhí)行是有序的。newCachedThreadPool:newCachedThreadPool 會(huì)根據(jù)實(shí)際需要?jiǎng)?chuàng)建一個(gè)可緩存的線程池。如果線程池的線程數(shù)量超過實(shí)際需要處理的任務(wù),那么 newCachedThreadPool 將會(huì)回收多余的線程。如果實(shí)際需要處理的線程不能滿足任務(wù)的數(shù)量,則回你添加新的線程到線程池中,線程池中線程的數(shù)量不存在任何限制。newSingleThreadScheduledExecutor:newSingleThreadScheduledExecutor 和 newSingleThreadExecutor 很類似,只不過帶有 scheduled 的這個(gè)執(zhí)行器哥們能夠在一定延遲后執(zhí)行或者定期執(zhí)行任務(wù)。newScheduledThreadPool:這個(gè)線程池和上面的 scheduled 執(zhí)行器類似,只不過 newSingleThreadScheduledExecutor 比 newScheduledThreadPool 多加了一個(gè)DelegatedScheduledExecutorService代理,這其實(shí)包裝器設(shè)計(jì)模式的體現(xiàn)。
上面這些線程池的底層實(shí)現(xiàn)都是由 ThreadPoolExecutor 來提供支持的,所以要理解這些線程池的工作原理,你就需要先把 ThreadPoolExecutor 搞明白,下面我們就來聊一聊 ThreadPoolExecutor。
ThreadPoolExecutor 類
ThreadPoolExecutor 位于 java.util.concurrent 工具類下,可以說它是線程池中最核心的一個(gè)類了。如果你要想把線程池理解透徹的話,就要首先了解一下這個(gè)類。
如果我們?cè)倌蒙厦婕易迮e例子的話,ThreadPoolExecutor 就是一個(gè)家族的骨干人才,家族頂梁柱。ThreadPoolExecutor 做的工作真是太多太多了。
首先,ThreadPoolExecutor 提供了四個(gè)構(gòu)造方法,然而前三個(gè)構(gòu)造方法最終都會(huì)調(diào)用最后一個(gè)構(gòu)造方法進(jìn)行初始化
public?class?ThreadPoolExecutor?extends?AbstractExecutorService?{
????.....
??????//?1
????public?ThreadPoolExecutor(int?corePoolSize,int?maximumPoolSize,long?keepAliveTime,TimeUnit?unit,
????????????BlockingQueue?workQueue) ;
????//?2
????public?ThreadPoolExecutor(int?corePoolSize,int?maximumPoolSize,long?keepAliveTime,TimeUnit?unit,
????????????BlockingQueue?workQueue,ThreadFactory?threadFactory) ;
????//?3
????public?ThreadPoolExecutor(int?corePoolSize,int?maximumPoolSize,long?keepAliveTime,TimeUnit?unit,
????????????BlockingQueue?workQueue,RejectedExecutionHandler?handler) ;
????//?4
????public?ThreadPoolExecutor(int?corePoolSize,int?maximumPoolSize,long?keepAliveTime,TimeUnit?unit,
????????BlockingQueue?workQueue,ThreadFactory?threadFactory,RejectedExecutionHandler?handler) ;
????...
}
所以我們直接就來看一波最后這個(gè)線程池,看看參數(shù)都有啥,如果我沒數(shù)錯(cuò)的話,應(yīng)該是有 7 個(gè)參數(shù)(小學(xué)數(shù)學(xué)水平。。。。。。)
首先,一個(gè)非常重要的參數(shù)就是 corePoolSize,核心線程池的容量/大小,你叫啥我覺得都沒毛病。只不過你得理解這個(gè)參數(shù)的意義,它和線程池的實(shí)現(xiàn)原理有非常密切的關(guān)系。你剛開始創(chuàng)建了一個(gè)線程池,此時(shí)是沒有任何線程的,這個(gè)很好理解,因?yàn)槲椰F(xiàn)在沒有任務(wù)可以執(zhí)行啊,創(chuàng)建線程干啥啊?而且創(chuàng)建線程還有開銷啊,所以等到任務(wù)過來時(shí)再創(chuàng)建線程也不晚。但是!我要說但是了,如果調(diào)用了 prestartAllCoreThreads 或者 prestartCoreThread 方法,就會(huì)在沒有任務(wù)到來時(shí)創(chuàng)建線程,前者是創(chuàng)建 corePoolSize 個(gè)線程,后者是只創(chuàng)建一個(gè)線程。Lea 爺爺本來想讓我們程序員當(dāng)個(gè)懶漢,等任務(wù)來了再干;可是你非要當(dāng)個(gè)餓漢,提前完成任務(wù)。如果我們想當(dāng)個(gè)懶漢的話,在創(chuàng)建了線程池后,線程池中的線程數(shù)為 0,當(dāng)有任務(wù)來之后,就會(huì)創(chuàng)建一個(gè)線程去執(zhí)行任務(wù),當(dāng)線程池中的線程數(shù)目達(dá)到 corePoolSize 后,就會(huì)把到達(dá)的任務(wù)放到緩存隊(duì)列當(dāng)中。

maximumPoolSize:又來一個(gè)線程池的容量,只不過這個(gè)是線程池的最大容量,也就是線程池所能容納最大的線程,而上面的 corePoolSize 只是核心線程容量。
我知道你此時(shí)會(huì)有疑問,那就是不知道如何核心線程的容量和線程最大容量的區(qū)別是吧?我們后面會(huì)解釋這點(diǎn)。
keepAliveTime:這個(gè)參數(shù)是線程池的保活機(jī)制,表示線程在沒有任務(wù)執(zhí)行的情況下保持多久會(huì)終止。在默認(rèn)情況下,這個(gè)參數(shù)只在線程數(shù)量大于 corePoolSize 時(shí)才會(huì)生效。當(dāng)線程數(shù)量大于 corePoolSize 時(shí),如果任意一個(gè)空閑的線程的等待時(shí)間 > keepAliveTime 后,那么這個(gè)線程會(huì)被剔除,直到線程數(shù)量等于 corePoolSize 為止。如果調(diào)用了 allowCoreThreadTimeOut 方法,線程數(shù)量在 corePoolSize 范圍內(nèi)也會(huì)生效,直到線程減為 0。unit:這個(gè)參數(shù)好說,它就是一個(gè)TimeUnit的變量,unit 表示的是 keepAliveTime 的時(shí)間單位。unit 的類型有下面這幾種TimeUnit.DAYS;???????????????//天
TimeUnit.HOURS;?????????????//小時(shí)
TimeUnit.MINUTES;???????????//分鐘
TimeUnit.SECONDS;???????????//秒
TimeUnit.MILLISECONDS;??????//毫秒
TimeUnit.MICROSECONDS;??????//微妙
TimeUnit.NANOSECONDS;???????//納秒workQueue:這個(gè)參數(shù)表示的概念就是等待隊(duì)列,我們上面說過,如果核心線程 > corePoolSize 的話,就會(huì)把任務(wù)放入等待隊(duì)列,這個(gè)等待隊(duì)列的選擇也是一門學(xué)問。Lea 爺爺給我們展示了三種等待隊(duì)列的選擇
SynchronousQueue: 基于阻塞隊(duì)列(BlockingQueue)的實(shí)現(xiàn),它會(huì)直接將任務(wù)交給消費(fèi)者,必須等隊(duì)列中的添加元素被消費(fèi)后才能繼續(xù)添加新的元素。使用 SynchronousQueue 阻塞隊(duì)列一般要求maximumPoolSizes 為無界,也就是 Integer.MAX_VALUE,避免線程拒絕執(zhí)行操作。LinkedBlockingQueue:LinkedBlockingQueue 是一個(gè)無界緩存等待隊(duì)列。當(dāng)前執(zhí)行的線程數(shù)量達(dá)到 corePoolSize 的數(shù)量時(shí),剩余的元素會(huì)在阻塞隊(duì)列里等待。ArrayBlockingQueue:ArrayBlockingQueue 是一個(gè)有界緩存等待隊(duì)列,可以指定緩存隊(duì)列的大小,當(dāng)正在執(zhí)行的線程數(shù)等于 corePoolSize 時(shí),多余的元素緩存在 ArrayBlockingQueue 隊(duì)列中等待有空閑的線程時(shí)繼續(xù)執(zhí)行,當(dāng) ArrayBlockingQueue 已滿時(shí),加入 ArrayBlockingQueue 失敗,會(huì)開啟新的線程去執(zhí)行,當(dāng)線程數(shù)已經(jīng)達(dá)到最大的 maximumPoolSizes 時(shí),再有新的元素嘗試加入 ArrayBlockingQueue時(shí)會(huì)報(bào)錯(cuò)threadFactory:線程工廠,這個(gè)參數(shù)主要用來創(chuàng)建線程;handler:拒絕策略,拒絕策略主要有以下取值
AbortPolicy:丟棄任務(wù)并拋出 RejectedExecutionException 異常。DiscardPolicy: 直接丟棄任務(wù),但是不拋出異常。DiscardOldestPolicy:直接丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程)。CallerRunsPolicy:由調(diào)用線程處理該任務(wù)。
深入理解線程池
上面我和你簡(jiǎn)單聊了一下線程池的基本構(gòu)造,線程池有幾個(gè)非常重要的參數(shù)可以細(xì)細(xì)品味,但是哥們醒醒,接下來才是刺激的地方。
線程池狀態(tài)
首先我們先來聊聊線程池狀態(tài),線程池狀態(tài)是一個(gè)非常有趣的設(shè)計(jì)點(diǎn),ThreadPoolExecutor 使用 ctl 來存儲(chǔ)線程池狀態(tài),這些狀態(tài)也叫做線程池的生命周期。想想也是,線程池作為一個(gè)存儲(chǔ)管理線程的資源池,它自己也要有這些狀態(tài),以及狀態(tài)之間的變更才能更好的滿足我們的需求。ctl 其實(shí)就是一個(gè) AtomicInteger 類型的變量,保證原子性。
ctl 除了存儲(chǔ)線程池狀態(tài)之外,它還存儲(chǔ) workerCount 這個(gè)概念,workerCount 指示的是有效線程數(shù),workerCount 表示的是已經(jīng)被允許啟動(dòng)但不允許停止的工作線程數(shù)量。workerCount 的值與實(shí)際活動(dòng)線程的數(shù)量不同。
ctl 高低位來判斷是線程池狀態(tài)還是工作線程數(shù)量,線程池狀態(tài)位于高位。
這里有個(gè)設(shè)計(jì)點(diǎn),為什么使用 AtomicInteger 而不是存儲(chǔ)上線更大的 AtomicLong 之類的呢?
Lea 并非沒有考慮過這個(gè)問題,為了表示 int 值,目前 workerCount 的大小是**(2 ^ 29)-1(約 5 億個(gè)線程),而不是(2 ^ 31)-1(20億個(gè))可表示的線程**。如果將來有問題,可以將該變量更改為 AtomicLong。但是在需要之前,使用 int 可以使此代碼更快,更簡(jiǎn)單,int 存儲(chǔ)占用存儲(chǔ)空間更小。
runState 具有如下幾種狀態(tài)
private?static?final?int?RUNNING????=?-1?<private?static?final?int?SHUTDOWN???=??0?<private?static?final?int?STOP???????=??1?<private?static?final?int?TIDYING????=??2?<private?static?final?int?TERMINATED?=??3?<我們先上狀態(tài)輪轉(zhuǎn)圖,然后根據(jù)狀態(tài)輪轉(zhuǎn)圖做詳細(xì)的解釋。

這幾種狀態(tài)的解釋如下
RUNNING: 如果線程池處于 RUNNING 狀態(tài)下的話,能夠接收新任務(wù),也能處理正在運(yùn)行的任務(wù)。可以從 ctl 的初始化得知,線程池一旦創(chuàng)建出來就會(huì)處于 RUNNING 狀態(tài),并且線程池中的有效線程數(shù)為 0。
private?final?AtomicInteger?ctl?=?new?AtomicInteger(ctlOf(RUNNING,?0));
SHUTDOWN: 在調(diào)用 shutdown 方法后,線程池的狀態(tài)會(huì)由 RUNNING -> SHUTDOWN 狀態(tài),位于 SHUTDOWN 狀態(tài)的線程池能夠處理正在運(yùn)行的任務(wù),但是不能接受新的任務(wù),這和我們上面說的對(duì)于 shutdown 的描述一致。STOP: 和 shutdown 方法類似,在調(diào)用 shutdownNow 方法時(shí),程序會(huì)從 RUNNING/SHUTDOWN -> STOP 狀態(tài),處于 STOP 狀態(tài)的線程池,不接收新任務(wù),不處理已添加的任務(wù),并且會(huì)中斷正在處理的任務(wù)。TIDYING:TIDYING 狀態(tài)有個(gè)前置條件,分為兩種:一種是是當(dāng)線程池位于 SHUTDOWN 狀態(tài)下,阻塞隊(duì)列和線程池中的線程數(shù)量為空時(shí),會(huì)由 SHUTDOWN -> TIDYING;另一種是當(dāng)線程池位于 STOP 狀態(tài)下時(shí),線程池中的數(shù)量為空時(shí),會(huì)由 STOP -> TIDYING 狀態(tài)。轉(zhuǎn)換為 TIDYING 的線程池會(huì)調(diào)用terminated這個(gè)鉤子方法,terminated 在 ThreadPoolExecutor 類中是空實(shí)現(xiàn),若用戶想在線程池變?yōu)?TIDYING 時(shí),進(jìn)行相應(yīng)的處理,可以通過重載 terminated 函數(shù)來實(shí)現(xiàn)。TERMINATED:TERMINATED 狀態(tài)是線程池的最后一個(gè)狀態(tài),線程池處在 TIDYING 狀態(tài)時(shí),執(zhí)行完terminated 方法之后,就會(huì)由 TIDYING -> TERMINATED 狀態(tài)。此時(shí)表示線程池的徹底終止。
重要變量
下面我們一起來了解一下線程池中的重要變量。
private?final?BlockingQueue?workQueue;
阻塞隊(duì)列,這個(gè)和我們上面說的阻塞隊(duì)列的參數(shù)是一個(gè)意思,因?yàn)樵跇?gòu)造 ThreadPoolExecutor 時(shí),會(huì)把參數(shù)的值賦給 this.workQueue。
private?final?ReentrantLock?mainLock?=?new?ReentrantLock();?
線程池的主要狀態(tài)鎖,對(duì)線程池的狀態(tài)(比如線程池大小、運(yùn)行狀態(tài))的改變都需要使用到這個(gè)鎖
private?final?HashSet?workers?=?new?HashSet();
workers 持有線程池中所有線程的集合,只有持有上面 mainLock 的鎖才能夠訪問。
private?final?Condition?termination?=?mainLock.newCondition();
等待條件,用來支持 awaitTermination 方法。Condition 和 Lock 一起使用可以實(shí)現(xiàn)通知/等待機(jī)制。
private?int?largestPoolSize;
largestPoolSize 表示線程池中最大池的大小,只有持有 mainLock 才能訪問
private?long?completedTaskCount;
completedTaskCount 表示任務(wù)完成的計(jì)數(shù),它僅僅在任務(wù)終止時(shí)更新,需要持有 mainLock 才能訪問。
private?volatile?ThreadFactory?threadFactory;
threadFactory 是創(chuàng)建線程的工廠,所有的線程都會(huì)使用這個(gè)工廠,調(diào)用 addWorker 方法創(chuàng)建。
private?volatile?RejectedExecutionHandler?handler;
handler 表示拒絕策略,handler 會(huì)在線程飽和或者將要關(guān)閉的時(shí)候調(diào)用。
private?volatile?long?keepAliveTime;
保活時(shí)間,它指的是空閑線程等待工作的超時(shí)時(shí)間,當(dāng)存在多個(gè) corePoolSize 或 allowCoreThreadTimeOut 時(shí),線程將使用這個(gè)超時(shí)時(shí)間。
下面是一些其他變量,這些變量比較簡(jiǎn)單,我就直接給出注釋了。
private?volatile?boolean?allowCoreThreadTimeOut;???//是否允許為核心線程設(shè)置存活時(shí)間
private?volatile?int???corePoolSize;?????//核心池的大小(即線程池中的線程數(shù)目大于這個(gè)參數(shù)時(shí),提交的任務(wù)會(huì)被放進(jìn)任務(wù)緩存隊(duì)列)
private?volatile?int???maximumPoolSize;???//線程池最大能容忍的線程數(shù)
private?static?final?RejectedExecutionHandler?defaultHandler?=
????????new?AbortPolicy();?//?默認(rèn)的拒絕策略
任務(wù)提交
現(xiàn)在我們知道了 ThreadPoolExecutor 創(chuàng)建出來就會(huì)處于運(yùn)行狀態(tài),此時(shí)線程數(shù)量為 0 ,等任務(wù)到來時(shí),線程池就會(huì)創(chuàng)建線程來執(zhí)行任務(wù),而下面我們的關(guān)注點(diǎn)就會(huì)放在任務(wù)提交這個(gè)過程上。
通常情況下,我們會(huì)使用
executor.execute()?
來執(zhí)行任務(wù),我在很多書和博客教程上都看到過這個(gè)執(zhí)行過程,下面是一些書和博客教程所畫的 ThreadPoolExecutor 的執(zhí)行示意圖和執(zhí)行流程圖
執(zhí)行示意圖

處理流程圖

ThreadPoolExecutor 的執(zhí)行 execute 的方法分為下面四種情況
如果當(dāng)前運(yùn)行的工作線程少于 corePoolSize 的話,那么會(huì)創(chuàng)建新線程來執(zhí)行任務(wù) ,這一步需要獲取 mainLock 全局鎖。如果運(yùn)行線程不小于 corePoolSize,則將任務(wù)加入 BlockingQueue 阻塞隊(duì)列。 如果無法將任務(wù)加入 BlockingQueue 中,此時(shí)的現(xiàn)象就是隊(duì)列已滿,此時(shí)需要?jiǎng)?chuàng)建新的線程來處理任務(wù),這一步同樣需要獲取 mainLock 全局鎖。 如果創(chuàng)建新線程會(huì)使當(dāng)前運(yùn)行的線程超過 maximumPoolSize的話,任務(wù)將被拒絕,并且使用RejectedExecutionHandler.rejectEExecution()方法拒絕新的任務(wù)。
ThreadPoolExecutor 采取上面的整體設(shè)計(jì)思路,是為了在執(zhí)行 execute 方法時(shí),避免獲取全局鎖,因?yàn)轭l繁獲取全局鎖會(huì)是一個(gè)嚴(yán)重的可伸縮瓶頸,所以,幾乎所有的 execute 方法調(diào)用都是通過執(zhí)行步驟2。
上面指出了 execute 的運(yùn)行過程,整體上來說這個(gè)執(zhí)行過程把非常重要的點(diǎn)講解出來了,但是不夠細(xì)致,我查閱 ThreadPoolExecute 和部分源碼分析文章后,發(fā)現(xiàn)這事其實(shí)沒這么簡(jiǎn)單,先來看一下 execute 的源碼,我已經(jīng)給出了中文注釋
public?void?execute(Runnable?command)?{
??if?(command?==?null)
????throw?new?NullPointerException();
??//?獲取?ctl?的值
??int?c?=?ctl.get();
??//?判斷?ctl?的值是否小于核心線程池的數(shù)量
??if?(workerCountOf(c)?????//?如果小于,增加工作隊(duì)列,command?就是一個(gè)個(gè)的任務(wù)
????if?(addWorker(command,?true))
??????//?線程創(chuàng)建成功,直接返回
??????return;
????//?線程添加不成功,需要再次判斷,每需要一次判斷都會(huì)獲取?ctl?的值
????c?=?ctl.get();
??}
??//?如果線程池處于運(yùn)行狀態(tài)并且能夠成功的放入阻塞隊(duì)列
??if?(isRunning(c)?&&?workQueue.offer(command))?{
????//?再次進(jìn)行檢查
????int?recheck?=?ctl.get();
????//?如果不是運(yùn)行態(tài)并且成功的從阻塞隊(duì)列中刪除
????if?(!?isRunning(recheck)?&&?remove(command))
??????//?執(zhí)行拒絕策略
??????reject(command);
????//?worker?線程數(shù)量是否為?0
????else?if?(workerCountOf(recheck)?==?0)
??????//?增加工作線程
??????addWorker(null,?false);
??}
??//?如果不能增加工作線程的數(shù)量,就會(huì)直接執(zhí)行拒絕策略
??else?if?(!addWorker(command,?false))
????reject(command);
}
下面是我根據(jù)源碼畫出的執(zhí)行流程圖

下面我們針對(duì) execute 流程進(jìn)行分析,可能有點(diǎn)啰嗦,因?yàn)閹讉€(gè)核心流程上面已經(jīng)提過了,不過為了流程的完整性,我們?cè)僭谶@里重新提一下。
如果線程池的核心數(shù)量少于 corePoolSize,那么就會(huì)使用 addWorker 創(chuàng)建新線程,addworker 的流程我們會(huì)在下面進(jìn)行分析。如果創(chuàng)建成功,那么 execute 方法會(huì)直接返回。如果沒創(chuàng)建成功,可能是由于線程池已經(jīng) shutdown,可能是由于并發(fā)情況下 workerCountOf(c) < corePoolSize ,別的線程先創(chuàng)建了 worker 線程,導(dǎo)致 workerCoun t>= corePoolSize。如果線程池還在 Running 狀態(tài),會(huì)將 task 加入阻塞隊(duì)列,加入成功后會(huì)進(jìn)行 double-check雙重校驗(yàn),繼續(xù)下面的步驟,如果加入失敗,可能是由于隊(duì)列線程已滿,此時(shí)會(huì)判斷是否能夠加入線程池中,如果線程池也滿了的話,就會(huì)直接執(zhí)行拒絕策略,如果線程池能加入,execute 方法結(jié)束。步驟 2 中的 double-check 主要是為了判斷進(jìn)入 workQueue 中的 task 是否能被執(zhí)行:如果線程池已經(jīng)不是 Running 狀態(tài),則應(yīng)該拒絕添加任務(wù),從 workQueue 隊(duì)列中刪除任務(wù)。如果線程池是 Running,但是從 workQueue 中刪除失敗了,此時(shí)的原因可能是由于其他線程執(zhí)行了這個(gè)任務(wù),此時(shí)會(huì)直接執(zhí)行拒絕策略。 如果線程是 Running 狀態(tài),并且不能把任務(wù)從隊(duì)列中移除,進(jìn)而判斷工作線程是否為 0 ,如果不為 0 ,execute 執(zhí)行完畢,如果工作線程是 0 ,則會(huì)使用 addWorker 增加工作線程,execute 執(zhí)行完畢。
添加 worker 線程
從上面的執(zhí)行流程可以看出,添加一個(gè) worker 涉及的工作也非常多,這也是一個(gè)比價(jià)難啃的點(diǎn),我們一起來分析下,這是 worker 的源碼
private?boolean?addWorker(Runnable?firstTask,?boolean?core)?{
??//?retry?的用法相當(dāng)于?goto
??retry:
??for?(;;)?{
????int?c?=?ctl.get();
????int?rs?=?runStateOf(c);
????//?Check?if?queue?empty?only?if?necessary.
????//?僅在必要時(shí)檢查隊(duì)列是否為空。
????//?線程池狀態(tài)有五種,state?越小越是運(yùn)行狀態(tài)
????//?rs?>=?SHUTDOWN,表示此時(shí)線程池狀態(tài)可能是?SHUTDOWN、STOP、TIDYING、TERMINATED
????//?默認(rèn)?rs?>=?SHUTDOWN,如果?rs?=?SHUTDOWN,直接返回?false
????//?默認(rèn)?rs?
????//?默認(rèn)?RUNNING,任務(wù)是空,如果工作隊(duì)列為空,返回?false
????//
????if?(rs?>=?SHUTDOWN?&&
????????!?(rs?==?SHUTDOWN?&&
???????????firstTask?==?null?&&
???????????!?workQueue.isEmpty()))
??????return?false;
????//?執(zhí)行循環(huán)
????for?(;;)?{
??????//?統(tǒng)計(jì)工作線程數(shù)量
??????int?wc?=?workerCountOf(c);
??????//?如果?worker?數(shù)量>線程池最大上限?CAPACITY(即使用int低29位可以容納的最大值)
??????//?或者?worker數(shù)量?>?corePoolSize?或?worker數(shù)量>maximumPoolSize?),即已經(jīng)超過了給定的邊界
??????if?(wc?>=?CAPACITY?||
??????????wc?>=?(core???corePoolSize?:?maximumPoolSize))
????????return?false;
??????//?使用 CAS 增加 worker 數(shù)量,增加成功,跳出循環(huán)。
??????if?(compareAndIncrementWorkerCount(c))
????????break?retry;
??????//?檢查?ctl
??????c?=?ctl.get();??//?Re-read?ctl
??????//?如果狀態(tài)不等于之前獲取的?state,跳出內(nèi)層循環(huán),繼續(xù)去外層循環(huán)判斷
??????if?(runStateOf(c)?!=?rs)
????????continue?retry;
??????//?else?CAS?failed?due?to?workerCount?change;?retry?inner?loop
????}
??}
??/*
??????????worker數(shù)量+1成功的后續(xù)操作
????????*?添加到?workers?Set?集合,并啟動(dòng)?worker?線程
?????????*/
??boolean?workerStarted?=?false;
??boolean?workerAdded?=?false;
??Worker?w?=?null;
??try?{
????//?包裝?Runnable?對(duì)象
????//?設(shè)置?firstTask?的值為?-1
????//?賦值給當(dāng)前任務(wù)
????//?使用?worker?自身這個(gè)?runnable,調(diào)用?ThreadFactory?創(chuàng)建一個(gè)線程,并設(shè)置給worker的成員變量thread
????w?=?new?Worker(firstTask);
????final?Thread?t?=?w.thread;
????if?(t?!=?null)?{
??????final?ReentrantLock?mainLock?=?this.mainLock;
??????mainLock.lock();
??????try?{
????????//?在持有鎖的時(shí)候重新檢查
????????//?如果 ThreadFactory 失敗或在獲得鎖之前關(guān)閉,請(qǐng)回退。
????????int?rs?=?runStateOf(ctl.get());
????????//如果線程池在運(yùn)行?running
????????//?(可能是?workQueue?中仍有未執(zhí)行完成的任務(wù),創(chuàng)建沒有初始任務(wù)的?worker?線程執(zhí)行)
????????//worker?數(shù)量?-1?的操作在?addWorkerFailed()
????????if?(rs?????????????(rs?==?SHUTDOWN?&&?firstTask?==?null))?{
??????????if?(t.isAlive())?//?precheck?that?t?is?startable
????????????throw?new?IllegalThreadStateException();
??????????//?workers?就是一個(gè)?HashSet?集合
??????????workers.add(w);
??????????//?設(shè)置最大的池大小?largestPoolSize,workerAdded?設(shè)置為true
??????????int?s?=?workers.size();
??????????if?(s?>?largestPoolSize)
????????????largestPoolSize?=?s;
??????????workerAdded?=?true;
????????}
??????}?finally?{
????????mainLock.unlock();
??????}
??????if?(workerAdded)?{
????????t.start();
????????workerStarted?=?true;
??????}
????}
????//如果啟動(dòng)線程失敗
????//?worker?數(shù)量?-1
??}?finally?{
????if?(!?workerStarted)
??????addWorkerFailed(w);
??}
??return?workerStarted;
}
媽的真長(zhǎng)的一個(gè)方法,有點(diǎn)想吐血,其實(shí)我肝到現(xiàn)在已經(jīng)肝不動(dòng)了,但我一想到看這篇文章的讀者們能給我一個(gè)關(guān)注,就算咳出一口老血也值了。
這個(gè)方法的執(zhí)行流程圖如下

這里我們就不再文字描述了,但是上面流程圖中有一個(gè)對(duì)象引起了我的注意,那就是 worker 對(duì)象,這個(gè)對(duì)象就代表了線程池中的工作線程,那么這個(gè) worker 對(duì)象到底是啥呢?
worker 對(duì)象
Worker 位于 ThreadPoolExecutor 內(nèi)部,它繼承了 AQS 類并且實(shí)現(xiàn)了 Runnable 接口。Worker 類主要維護(hù)了線程運(yùn)行過程中的中斷控制狀態(tài)。它提供了鎖的獲取和釋放操作。在 worker 的實(shí)現(xiàn)中,我們使用了非重入的互斥鎖而不是使用重復(fù)鎖,因?yàn)?Lea 覺得我們不應(yīng)該在調(diào)用諸如 setCorePoolSize 之類的控制方法時(shí)能夠重新獲取鎖。
worker 對(duì)象的源碼比較簡(jiǎn)單和標(biāo)準(zhǔn),這里我們只說一下 worker 對(duì)象的構(gòu)造方法,也就是
Worker(Runnable?firstTask)?{
??setState(-1);?
??this.firstTask?=?firstTask;
??this.thread?=?getThreadFactory().newThread(this);
}
構(gòu)造一個(gè) worker 對(duì)象需要做三步操作:
初始 AQS 狀態(tài)為 -1,此時(shí)不允許中斷 interrupt(),只有在 worker 線程啟動(dòng)了,執(zhí)行了 runWorker() 方法后,將 state 置為0,才能進(jìn)行中斷。 將 firstTask 賦值給為當(dāng)前類的全局變量 通過 ThreadFactory創(chuàng)建一個(gè)新的線程。
###任務(wù)運(yùn)行
我們前面的流程主要分析了線程池的 execute 方法的執(zhí)行過程,這個(gè)執(zhí)行過程相當(dāng)于是任務(wù)提交過程,而我們下面要說的是從隊(duì)列中獲取任務(wù)并運(yùn)行的這個(gè)工作流程。
一般情況下,我們會(huì)從初始任務(wù)開始運(yùn)行,所以我們不需要獲取第一個(gè)任務(wù)。否則,只要線程池還處于 Running 狀態(tài),我們會(huì)調(diào)用 getTask 方法獲取任務(wù)。getTask 方法可能會(huì)返回 null,此時(shí)可能是由于線程池狀態(tài)改變或者是配置參數(shù)更改而導(dǎo)致的退出。還有一種情況可能是由于 異常 而引發(fā)的,這個(gè)我們后面會(huì)細(xì)說。
下面來看一下 runWorker 方法的源碼:
final?void?runWorker(Worker?w)?{
??Thread?wt?=?Thread.currentThread();
??Runnable?task?=?w.firstTask;
??w.firstTask?=?null;
??//?允許打斷
??//??new?Worker()?是?state==-1,此處是調(diào)用?Worker?類的?tryRelease()?方法,
??//??將?state?置為0
??w.unlock();
??boolean?completedAbruptly?=?true;
??try?{
????//?調(diào)用?getTask()?獲取任務(wù)
????while?(task?!=?null?||?(task?=?getTask())?!=?null)?{
??????//?獲取全局鎖
??????w.lock();
??????//?確保只有在線程 STOPING 時(shí),才會(huì)被設(shè)置中斷標(biāo)志,否則清除中斷標(biāo)志。
??????//?如果一開始判斷線程池狀態(tài)?
??????//?即線程已經(jīng)被中斷,又清除了中斷標(biāo)示,再次判斷線程池狀態(tài)是否?>=?stop
??????//?是,再次設(shè)置中斷標(biāo)示,wt.interrupt()
??????//?否,不做操作,清除中斷標(biāo)示后進(jìn)行后續(xù)步驟
??????if?((runStateAtLeast(ctl.get(),?STOP)?||
???????????(Thread.interrupted()?&&
????????????runStateAtLeast(ctl.get(),?STOP)))?&&
??????????!wt.isInterrupted())
????????wt.interrupt();
??????try?{
????????//?執(zhí)行前需要調(diào)用的方法,交給程序員自己來實(shí)現(xiàn)
????????beforeExecute(wt,?task);
????????Throwable?thrown?=?null;
????????try?{
??????????task.run();
????????}?catch?(RuntimeException?x)?{
??????????thrown?=?x;?throw?x;
????????}?catch?(Error?x)?{
??????????thrown?=?x;?throw?x;
????????}?catch?(Throwable?x)?{
??????????thrown?=?x;?throw?new?Error(x);
????????}?finally?{
??????????//?執(zhí)行后需要調(diào)用的方法,交給程序員自己來實(shí)現(xiàn)
??????????afterExecute(task,?thrown);
????????}
??????}?finally?{
????????//?把?task?置為?null,完成任務(wù)數(shù)?+?1,并進(jìn)行解鎖
????????task?=?null;
????????w.completedTasks++;
????????w.unlock();
??????}
????}
????completedAbruptly?=?false;
????//?最后處理?worker?的退出
??}?finally?{
????processWorkerExit(w,?completedAbruptly);
??}
}
下面是 runWorker 的執(zhí)行流程圖

這里需要注意一下最后的 processWorkerExit 方法,這里面其實(shí)也做了很多事情,包括判斷 completedAbruptly 的布爾值來表示是否完成任務(wù),獲取鎖,嘗試從隊(duì)列中移除 worker,然后嘗試中斷,接下來會(huì)判斷一下中斷狀態(tài),在線程池當(dāng)前狀態(tài)小于 STOP 的情況下會(huì)創(chuàng)建一個(gè)新的 worker 來替換被銷毀的 worker。
任務(wù)獲取
任務(wù)獲取就是 getTask 方法的執(zhí)行過程,這個(gè)環(huán)節(jié)主要用來獲取任務(wù)和剔除任務(wù)。下面進(jìn)入源碼分析環(huán)節(jié)
private?Runnable?getTask()?{
??//?判斷最后一個(gè) poll 是否超時(shí)。
??boolean?timedOut?=?false;?//?Did?the?last?poll()?time?out?
??for?(;;)?{
????int?c?=?ctl.get();
????int?rs?=?runStateOf(c);
????//?Check?if?queue?empty?only?if?necessary.
????//?必要時(shí)檢查隊(duì)列是否為空
????//?對(duì)線程池狀態(tài)的判斷,兩種情況會(huì)?workerCount-1,并且返回?null
????//?線程池狀態(tài)為?shutdown,且?workQueue?為空(反映了?shutdown?狀態(tài)的線程池還是要執(zhí)行?workQueue?中剩余的任務(wù)的)
????//?線程池狀態(tài)為?stop(shutdownNow()?會(huì)導(dǎo)致變成?STOP)(此時(shí)不用考慮?workQueue?的情況)
????if?(rs?>=?SHUTDOWN?&&?(rs?>=?STOP?||?workQueue.isEmpty()))?{
??????decrementWorkerCount();
??????return?null;
????}
????int?wc?=?workerCountOf(c);
????//?Are?workers?subject?to?culling?
????//?是否需要定時(shí)從?workQueue?中獲取
????boolean?timed?=?allowCoreThreadTimeOut?||?wc?>?corePoolSize;
????//?如果工作線程的數(shù)量大于?maximumPoolSize?會(huì)進(jìn)行線程剔除
????//?如果使用了?allowCoreThreadTimeOut?,并且工作線程不為0或者隊(duì)列有任務(wù)的話,會(huì)直接進(jìn)行線程剔除
????if?((wc?>?maximumPoolSize?||?(timed?&&?timedOut))
????????&&?(wc?>?1?||?workQueue.isEmpty()))?{
??????if?(compareAndDecrementWorkerCount(c))
????????return?null;
??????continue;
????}
??
????try?{
??????Runnable?r?=?timed??
????????workQueue.poll(keepAliveTime,?TimeUnit.NANOSECONDS)?:
??????workQueue.take();
??????if?(r?!=?null)
????????return?r;
??????timedOut?=?true;
????}?catch?(InterruptedException?retry)?{
??????timedOut?=?false;
????}
??}
}
getTask 方法的執(zhí)行流程圖如下

工作線程退出
工作線程退出是 runWorker 的最后一步,這一步會(huì)判斷工作線程是否突然終止,并且會(huì)嘗試終止線程,以及是否需要增加線程來替換原工作線程。
private?void?processWorkerExit(Worker?w,?boolean?completedAbruptly)?{
??//?worker數(shù)量?-1
??//?completedAbruptly?是?true,突然終止,說明是?task?執(zhí)行時(shí)異常情況導(dǎo)致,即run()方法執(zhí)行時(shí)發(fā)生了異常,那么正在工作的?worker?線程數(shù)量需要-1
??//?completedAbruptly?是?false?是突然終止,說明是?worker?線程沒有?task?可執(zhí)行了,不用-1,因?yàn)橐呀?jīng)在?getTask()?方法中-1了
??if?(completedAbruptly)?//?If?abrupt,?then?workerCount?wasn't?adjusted
????decrementWorkerCount();
??//?從?Workers?Set?中移除?worker
??final?ReentrantLock?mainLock?=?this.mainLock;
??mainLock.lock();
??try?{
????completedTaskCount?+=?w.completedTasks;
????workers.remove(w);
??}?finally?{
????mainLock.unlock();
??}
??//?嘗試終止線程,
??tryTerminate();
??//?是否需要增加?worker?線程
??//?線程池狀態(tài)是?running?或?shutdown
??//?如果當(dāng)前線程是突然終止的,addWorker()
??//?如果當(dāng)前線程不是突然終止的,但當(dāng)前線程數(shù)量?
??//?故如果調(diào)用線程池?shutdown(),直到workQueue為空前,線程池都會(huì)維持?corePoolSize?個(gè)線程,
??//?然后再逐漸銷毀這?corePoolSize?個(gè)線程
??int?c?=?ctl.get();
??if?(runStateLessThan(c,?STOP))?{
????if?(!completedAbruptly)?{
??????int?min?=?allowCoreThreadTimeOut???0?:?corePoolSize;
??????if?(min?==?0?&&?!?workQueue.isEmpty())
????????min?=?1;
??????if?(workerCountOf(c)?>=?min)
????????return;?//?replacement?not?needed
????}
????addWorker(null,?false);
??}
}
源碼搞的有點(diǎn)頭大了,可能一時(shí)半會(huì)無法理解上面這些源碼,不過你可以先把注釋粘過去,等有時(shí)間了需要反復(fù)刺激,加深印象!
其他線程池
下面我們來了解一下其他線程池的構(gòu)造原理,主要涉及 FixedThreadPool、SingleThreadExecutor、CachedThreadPool。
newFixedThreadPool
newFixedThreadPool 被稱為可重用固定線程數(shù)的線程池,下面是 newFixedThreadPool 的源碼
public?static?ExecutorService?newFixedThreadPool(int?nThreads)?{
??return?new?ThreadPoolExecutor(nThreads,?nThreads,
????????????????????????????????0L,?TimeUnit.MILLISECONDS,
????????????????????????????????new?LinkedBlockingQueue());
}
可以看到,newFixedThreadPool 的 corePoolSize 和 maximumPoolSize 都被設(shè)置為創(chuàng)建 FixedThreadPool 時(shí)指定的參數(shù) nThreads,也就是說,在 newFiexedThreadPool 中,核心線程數(shù)就是最大線程數(shù)。
下面是 newFixedThreadPool 的執(zhí)行示意圖

newFixedThreadPool 的工作流程如下
如果當(dāng)前運(yùn)行的線程數(shù)少于 corePoolSize,則會(huì)創(chuàng)建新線程 addworker 來執(zhí)行任務(wù) 如果當(dāng)前線程的線程數(shù)等于 corePoolSize,會(huì)將任務(wù)直接加入到 LinkedBlockingQueue無界阻塞隊(duì)列中,LinkedBlockingQueue 的上限如果沒有制定,默認(rèn)為 Integer.MAX_VALUE 大小。等到線程池中的任務(wù)執(zhí)行完畢后,newFixedThreadPool 會(huì)反復(fù)從 LinkedBlockingQueue 中獲取任務(wù)來執(zhí)行。
相較于 ThreadPoolExecutor,newFixedThreadPool 主要做了以下改變
核心線程數(shù)等于最大線程數(shù),因此 newFixedThreadPool 只有兩個(gè)最大容量,一個(gè)是線程池的線程容量,還有一個(gè)是 LinkedBlockingQueue 無界阻塞隊(duì)列的線程容量。
這里可以看到還有一個(gè)變化是 0L,也就是 keepAliveTime = 0L,keepAliveTime 就是到達(dá)工作線程最大容量后的線程等待時(shí)間,0L 就意味著當(dāng)線程池中的線程數(shù)大于 corePoolsize 時(shí),空余的線程會(huì)被立即終止。
由于使用無界隊(duì)列,運(yùn)行中的 newFixedThreadPool 不會(huì)拒絕任務(wù),也就是不會(huì)調(diào)用 RejectedExecutionHandler.rejectedExecution 方法。
newSingleThreadExecutor
newSingleThreadExecutor 中只有單個(gè)工作線程,也就是說它是一個(gè)只有單個(gè) worker 的 Executor。
public?static?ExecutorService?newSingleThreadExecutor(ThreadFactory?threadFactory)?{
??return?new?FinalizableDelegatedExecutorService
????(new?ThreadPoolExecutor(1,?1,
????????????????????????????0L,?TimeUnit.MILLISECONDS,
????????????????????????????new?LinkedBlockingQueue(),
????????????????????????????threadFactory));
}
可以看到,在 newSingleThreadExecutor 中,corePoolSize 和 maximumPoolSize 都被設(shè)置為 1,也不存在超時(shí)情況,同樣使用了 LinkedBlockingQueue 無界阻塞隊(duì)列,除了 corePoolSize 和 maximumPoolSize 外,其他幾乎和 newFixedThreadPool 一模一樣。
下面是 newSingleThreadExecutor ?的執(zhí)行示意圖

newSingleThreadExecutor 的執(zhí)行過程和 newFixedThreadPool 相同,只是 newSingleThreadExecutor 的工作線程數(shù)為 1。
newCachedThreadPool
newCachedThreadPool 是一個(gè)根據(jù)需要?jiǎng)?chuàng)建工作線程的線程池,newCachedThreadPool 線程池最大數(shù)量是 Integer.MAX_VALUE,保活時(shí)間是 60 秒,使用的是SynchronousQueue 無緩沖阻塞隊(duì)列。
public?static?ExecutorService?newCachedThreadPool(ThreadFactory?threadFactory)?{
??return?new?ThreadPoolExecutor(0,?Integer.MAX_VALUE,
????????????????????????????????60L,?TimeUnit.SECONDS,
????????????????????????????????new?SynchronousQueue(),
????????????????????????????????threadFactory);
}
它的執(zhí)行示意圖如下

首先會(huì)先執(zhí)行 SynchronousQueue.offer 方法,如果當(dāng)前 maximumPool 中有空閑線程正在執(zhí)行 SynchronousQueue.poll,就會(huì)把任務(wù)交給空閑線程來執(zhí)行,execute 方法執(zhí)行完畢,否則的話,繼續(xù)向下執(zhí)行。如果 maximumPool 中沒有線程執(zhí)行 SynchronousQueue.poll 方法,這種情況下 newCachedThreadPool 會(huì)創(chuàng)建一個(gè)新線程執(zhí)行任務(wù),execute 方法執(zhí)行完成。 執(zhí)行完成的線程將執(zhí)行 poll 操作,這個(gè) poll 操作會(huì)讓空閑線程最多在 SynchronousQueue 中等待 60 秒鐘。如果 60 秒鐘內(nèi)提交了一個(gè)新任務(wù),那么空閑線程會(huì)執(zhí)行這個(gè)新提交的任務(wù),否則空閑線程將會(huì)終止。
這里的關(guān)鍵點(diǎn)在于 SynchronousQueue 隊(duì)列,它是一個(gè)沒有容量的阻塞隊(duì)列。每個(gè)插入操作必須等待另一個(gè)線程對(duì)應(yīng)的移除操作。這其實(shí)就是一種任務(wù)傳遞,如下圖所示

其實(shí)還有一個(gè)線程池 ScheduledThreadPoolExecutor ,就先不在此篇文章做詳細(xì)贅述了。
線程池實(shí)踐考量因素
下面介紹幾種在實(shí)踐過程中使用線程池需要考慮的幾個(gè)點(diǎn)
避免任務(wù)堆積,比如我們上面提到的 newFixedThreadPool,它是創(chuàng)建指定數(shù)目的線程,但是工作隊(duì)列是無界的,這就導(dǎo)致如果工作隊(duì)列線程太少,導(dǎo)致處理速度跟不上入隊(duì)速度,這種情況下很可能會(huì)導(dǎo)致 OOM,診斷時(shí)可以使用 jmap檢查是否有大量任務(wù)入隊(duì)。生產(chǎn)實(shí)踐中很可能由于邏輯不嚴(yán)謹(jǐn)或者工作線程不能及時(shí)釋放導(dǎo)致 線程泄漏,這個(gè)時(shí)候最好檢查一下線程棧 避免死鎖等同步問題 盡量避免在使用線程池時(shí)操作 ThreadLocal,因?yàn)楣ぷ骶€程的生命周期可能會(huì)超過任務(wù)的生命周期。
線程池大小的設(shè)置
線程池大小的設(shè)置也是面試官經(jīng)常會(huì)考到的一個(gè)點(diǎn),一般需要根據(jù)任務(wù)類型來配置線程池大小
如果是 CPU 密集型任務(wù),那么就意味著 CPU 是稀缺資源,這個(gè)時(shí)候我們通常不能通過增加線程數(shù)來提高計(jì)算能力,因?yàn)榫€程數(shù)量太多,會(huì)導(dǎo)致頻繁的上下文切換,一般這種情況下,建議合理的線程數(shù)值是 N(CPU)數(shù) + 1。如果是 I/O 密集型任務(wù),就說明需要較多的等待,這個(gè)時(shí)候可以參考 Brain Goetz 的推薦方法 線程數(shù) = CPU核數(shù) × (1 + 平均等待時(shí)間/平均工作時(shí)間)。參考值可以是 N(CPU) 核數(shù) * 2。
當(dāng)然,這只是一個(gè)參考值,具體的設(shè)置還需要根據(jù)實(shí)際情況進(jìn)行調(diào)整,比如可以先將線程池大小設(shè)置為參考值,再觀察任務(wù)運(yùn)行情況和系統(tǒng)負(fù)載、資源利用率來進(jìn)行適當(dāng)調(diào)整。
后記
這篇文章真的寫了很久,因?yàn)橹皩?duì)線程池認(rèn)識(shí)不是很深,所以花了大力氣來研究,希望這篇文章對(duì)你有所幫助。
這篇文章探討了對(duì) Executor 框架的主要組成、線程池結(jié)構(gòu)與生命周期,線程池源碼和線程池實(shí)現(xiàn)的細(xì)節(jié)等方面進(jìn)行了講解和分析,希望對(duì)你有所幫助。
如果這篇文章寫的還不錯(cuò),希望讀者朋友們可以不吝給出四連:點(diǎn)贊、在看、留言、分享,記住這次一定哦!
我是 cxuan ,認(rèn)真寫好每篇文章的技術(shù)人,歡迎關(guān)注我的公眾號(hào)
完
?往期推薦?
??
ARP,這個(gè)隱匿在計(jì)網(wǎng)背后的男人
我畫了 40 張圖就是為了讓你搞懂計(jì)算機(jī)網(wǎng)絡(luò)層
閱片無數(shù)的 cxuan 給你推薦比某 hub 更爽的網(wǎng)站
