該死的線程池,是時候安排一下了

前言
線程池可以說是 Java 進階必備的知識點了,也是面試中必備的考點,可能不少人看了這篇文章后能對線程池工作原理說上一二,但這還遠遠不夠,如果碰到比較有經(jīng)驗的面試官再繼續(xù)追問,很可能會被吊打,考慮如下問題:
Tomcat 的線程池和 JDK 的線程池實現(xiàn)有啥區(qū)別, Dubbo 中有類似 Tomcat 的線程池實現(xiàn)嗎? 我司網(wǎng)關 dubbo 調(diào)用線程池曾經(jīng)出現(xiàn)過這樣的一個問題:壓測時接口可以正常返回,但接口 RT 很高,假設設置的核心線程大小為 500,最大線程為 800,緩沖隊列為 5000,你能從這個設置中發(fā)現(xiàn)出一些問題并對這些參數(shù)進行調(diào)優(yōu)嗎? 線程池里的線程真的有核心線程和非核心線程之分? 線程池被 shutdown 后,還能產(chǎn)生新的線程? 線程把任務丟給線程池后肯定就馬上返回了? 線程池里的線程異常后會再次新增線程嗎,如何捕獲這些線程拋出的異常? 線程池的大小如何設置,如何動態(tài)設置線程池的參數(shù) 線程池的狀態(tài)機畫一下? 阿里 Java 代碼規(guī)范為什么不允許使用 Executors 快速創(chuàng)建線程池? 使用線程池應該避免哪些問題,能否簡單說下線程池的最佳實踐? 如何優(yōu)雅關閉線程池 如何對線程池進行監(jiān)控
相信不少人看了這些問題會有些懵逼
其實這些問題的答案大多數(shù)都藏在線程池的源碼里,所以深入了解線程池的源碼非常重要,本章我們將會來學習一下線程池的源碼,相信看完之后,以上的問題大部分都能回答,另外一些問題我們也會在文中與大家一起探討。
本文將會從以下幾個方面來介紹線程池的原理。
為什么要用線程池 線程池是如何工作的 線程池提交任務的兩種方式 ThreadPoolExecutor 源碼剖析 解答開篇的問題 線程池的最佳實踐 總結(jié)
相信大家看完對線程池的理解會更進一步,肝文不易,看完別完了三連哦。
為什么要用線程池
在上文也提到過,創(chuàng)建線程有三大開銷,如下:
1、其實 Java 中的線程模型是基于操作系統(tǒng)原生線程模型實現(xiàn)的,也就是說 Java 中的線程其實是基于內(nèi)核線程實現(xiàn)的,線程的創(chuàng)建,析構(gòu)與同步都需要進行系統(tǒng)調(diào)用,而系統(tǒng)調(diào)用需要在用戶態(tài)與內(nèi)核中來回切換,代價相對較高,線程的生命周期包括「線程創(chuàng)建時間」,「線程執(zhí)行任務時間」,「線程銷毀時間」,創(chuàng)建和銷毀都需要導致系統(tǒng)調(diào)用。2、每個 Thread 都需要有一個內(nèi)核線程的支持,也就意味著每個 Thread 都需要消耗一定的內(nèi)核資源(如內(nèi)核線程的??臻g),因此能創(chuàng)建的 Thread 是有限的,默認一個線程的線程棧大小是 1 M,有圖有真相

圖中所示,在 Java 8 下,創(chuàng)建 19 個線程(thread #19)需要創(chuàng)建 19535 KB,即 1 M 左右,reserved 代表如果創(chuàng)建 19 個線程,操作系統(tǒng)保證會為其分配這么多空間(實際上并不一定分配),committed 則表示實際已分配的空間大小。
畫外音:注意,這是在 Java 8 下的線程占用空間情況,但在 Java 11 中,對線程作了很大的優(yōu)化,創(chuàng)建一個線程大概只需要 40 KB,空間消耗大大減少
3、線程多了,導致不可忽視的上下文切換開銷。
由此可見,線程的創(chuàng)建是昂貴的,所以必須以線程池的形式來管理這些線程,在線程池中合理設置線程大小和管理線程,以達到以合理的創(chuàng)建線程大小以達到最大化收益,最小化風險的目的,對于開發(fā)人員來說,要完成任務不用關心線程如何創(chuàng)建,如何銷毀,如何協(xié)作,只需要關心提交的任務何時完成即可,對線程的調(diào)優(yōu),監(jiān)控等這些細枝末節(jié)的工作通通交給線程池來實現(xiàn),所以也讓開發(fā)人員得到極大的解脫!
類似線程池的這種池化思想應用在很多地方,比如數(shù)據(jù)庫連接池,Http 連接池等,避免了昂貴資源的創(chuàng)建,提升了性能,也解放了開發(fā)人員。
ThreadPoolExecutor 設計架構(gòu)圖
首先我們來看看 Executor 框架的設計圖
Executor: 最頂層的 Executor 接口只提供了一個 execute 接口,實現(xiàn)了提交任務與執(zhí)行任務的解藕,這個方法是最核心的,也是我們源碼剖析的重點,此方法最終是由 ThreadPoolExecutor 實現(xiàn)的, ExecutorService 擴展了 Executor 接口,實現(xiàn)了終止執(zhí)行器,單個/批量提交任務等方法 AbstractExecutorService 實現(xiàn)了 ExecutorService 接口,實現(xiàn)了除 execute 以外的所有方法,只將一個最重要的 execute 方法交給 ThreadPoolExecutor 實現(xiàn)。
這樣的分層設計雖然層次看起來挺多,但每一層每司其職,邏輯清晰,值得借鑒。
線程池是如何工作的
首先我們來看下如何創(chuàng)建一個線程池
ThreadPoolExecutor?threadPool?=?new?ThreadPoolExecutor(10,?20,?600L,
????????????????????TimeUnit.SECONDS,?new?LinkedBlockingQueue<>(4096),
????????????????????new?NamedThreadFactory("common-work-thread"));
//?設置拒絕策略,默認為?AbortPolicy
threadPool.setRejectedExecutionHandler(new?ThreadPoolExecutor.AbortPolicy());
看下其構(gòu)造方法簽名如下
public?ThreadPoolExecutor(int?corePoolSize,
??????????????????????????????int?maximumPoolSize,
??????????????????????????????long?keepAliveTime,
??????????????????????????????TimeUnit?unit,
??????????????????????????????BlockingQueue?workQueue,
??????????????????????????????ThreadFactory?threadFactory,
??????????????????????????????RejectedExecutionHandler?handler) ?{
????????????//?省略代碼若干
}
要理解這些參數(shù)具體代表的意義,必須清楚線程池提交任務與執(zhí)行任務流程,如下
圖片來自美團技術團隊
步驟如下
1、corePoolSize:如果提交任務后線程還在運行,當線程數(shù)小于 corePoolSize 值時,無論線程池中的線程是否忙碌,都會創(chuàng)建線程,并把任務交給此新創(chuàng)建的線程進行處理,如果線程數(shù)少于等于 corePoolSize,那么這些線程不會回收,除非將 allowCoreThreadTimeOut 設置為 true,但一般不這么干,因為頻繁地創(chuàng)建銷毀線程會極大地增加系統(tǒng)調(diào)用的開銷。
2、workQueue:如果線程數(shù)大于核心數(shù)(corePoolSize)且小于最大線程數(shù)(maximumPoolSize),則會將任務先丟到阻塞隊列里,然后線程自己去阻塞隊列中拉取任務執(zhí)行。
3、maximumPoolSize: 線程池中最大可創(chuàng)建的線程數(shù),如果提交任務時隊列滿了且線程數(shù)未到達這個設定值,則會創(chuàng)建線程并執(zhí)行此次提交的任務,如果提交任務時隊列滿了但線池數(shù)已經(jīng)到達了這個值,此時說明已經(jīng)超出了線池程的負載能力,就會執(zhí)行拒絕策略,這也好理解,總不能讓源源不斷地任務進來把線程池給壓垮了吧,我們首先要保證線程池能正常工作。
4、RejectedExecutionHandler:一共有以下四種拒絕策略
AbortPolicy:丟棄任務并拋出異常,這也是默認策略; CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務,所以開頭的問題「線程把任務丟給線程池后肯定就馬上返回了?」我們可以回答了,如果用的是 CallerRunsPolicy 策略,提交任務的線程(比如主線程)提交任務后并不能保證馬上就返回,當觸發(fā)了這個 reject 策略不得不親自來處理這個任務。 DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,并執(zhí)行當前任務。 DiscardPolicy:直接丟棄任務,不拋出任何異常,這種策略只適用于不重要的任務。
5、keepAliveTime: 線程存活時間,如果在此時間內(nèi)超出 corePoolSize 大小的線程處于 idle 狀態(tài),這些線程會被回收
6、threadFactory:可以用此參數(shù)設置線程池的命名,指定 defaultUncaughtExceptionHandler(有啥用,后文闡述),甚至可以設定線程為守護線程。
現(xiàn)在問題來了,該如何合理設置這些參數(shù)呢。
首先來看線程大小設置
<
針對 CPU 密集型的任務,在有 Ncpu個處理器的系統(tǒng)上,當線程池的大小為 Ncpu + 1 時,通常能實現(xiàn)最優(yōu)的利用率,+1 是因為當計算密集型線程偶爾由于缺頁故障或其他原因而暫停工作時,這個"額外"的線程也能確保 CPU 的時鐘周期不會被浪費,所謂 CPU 密集,就是線程一直在忙碌,這樣將線程池的大小設置為 Ncpu + 1 避免了線程的上下文切換,讓線程時刻處于忙碌狀態(tài),將 CPU 的利用率最大化。 針對 IO 密集型的任務,它也給出了如下計算公式 
這些公式看看就好,實際的業(yè)務場景中基本用不上,這些公式太過理論化了,脫離業(yè)務場景,僅可作個理論參考,舉個例子,你說 CPU 密集型任務設置線程池大小為 N + 1個,但實際上在業(yè)務中往往不只設置一個線程池,這種情況套用的公式就懵逼了

再來看 workQueue 的大小設置
由上文可知,如果最大線程大于核心線程數(shù),當且僅當核心線程滿了且 workQueue 也滿的情況下,才會新增新的線程,也就是說如果 workQueue 是無界隊列,那么當線程數(shù)增加到 corePoolSize 后,永遠不會再新增新的線程了,也就是說此時 maximumPoolSize 的設置就無效了,也無法觸發(fā) RejectedExecutionHandler 拒絕策略,任務只會源源不斷地填充到 workQueue,直到 OOM。

所以 workQueue 應該為有界隊列,至少保證在任務過載的情況下線程池還能正常工作,那么哪些是有有界隊列,哪些是無界隊列呢。
有界隊列我們常用的以下兩個
LinkedBlockingQueue: 鏈表構(gòu)成的有界隊列,按先進先出(FIFO)的順序?qū)υ剡M行排列,但注意在創(chuàng)建時需指定其大小,否則其大小默認為 Integer.MAX_VALUE,相當于無界隊列了 ArrayBlockingQueue: 數(shù)組實現(xiàn)的有界隊列,按先進先出(FIFO)的順序?qū)υ剡M行排列。
無界隊列我們常用 PriorityBlockingQueue 這個優(yōu)先級隊列,任務插入的時候可以指定其權(quán)重以讓這些任務優(yōu)先執(zhí)行,但這個隊列很少用,原因很簡單,線程池里的任務執(zhí)行順序一般是平等的,如果真有必須某些類型的任務需要優(yōu)先執(zhí)行,大不了再開個線程池好了,將不同的任務類型用不同的線程池隔離開來,也是合理利用線程池的一種實踐。
說到這我相信大家應該能回答開頭的問題「阿里 Java 代碼規(guī)范為什么不允許使用 Executors 快速創(chuàng)建線程池?」,最常見的是以下兩種創(chuàng)建方式

newCachedThreadPool 方法的最大線程數(shù)設置成了 Integer.MAX_VALUE,而 newSingleThreadExecutor 方法創(chuàng)建 workQueue 時 LinkedBlockingQueue 未聲明大小,相當于創(chuàng)建了無界隊列,一不小心就會導致 OOM。
threadFactory 如何設置
一般業(yè)務中會有多個線程池,如果某個線程池出現(xiàn)了問題,定位是哪一個線程出問題很重要,所以為每個線程池取一個名字就很有必要了,我司用的 dubbo 的 NamedThreadFactory 來生成 threadFactory,創(chuàng)建很簡單
new?NamedThreadFactory("demo-work")
它的實現(xiàn)還是很巧妙的,有興趣地可以看看它的源碼,每調(diào)用一次,底層有個計數(shù)器會加一,會依次命名為 「demo-work-thread-1」, 「demo-work-thread-2」, 「demo-work-thread-3」這樣遞增的字符串。
在實際的業(yè)務場景中,一般很難確定 corePoolSize, workQueue,maximumPoolSize 的大小,如果出問題了,一般來說只能重新設置一下這些參數(shù)再發(fā)布,這樣往往需要耗費一些時間,美團的這篇文章給出了讓人眼前一亮的解決方案,當發(fā)現(xiàn)問題(線程池監(jiān)控告警)時,動態(tài)調(diào)整這些參數(shù),可以讓這些參數(shù)實時生效,能在發(fā)現(xiàn)問題時及時解決,確實是個很好的思路。
線程池提交任務的兩種方式
線程池創(chuàng)建好了,該怎么給它提交任務,有兩種方式,調(diào)用 execute 和 submit 方法,來看下這兩個方法的方法簽名
//?方式一:execute 方法
public?void?execute(Runnable?command)?{
}
//?方式二:ExecutorService 中 submit 的三個方法
?Future?submit(Callable?task) ;
?Future?submit(Runnable?task,?T?result) ;
Future>?submit(Runnable?task);
區(qū)別在于調(diào)用 execute 無返回值,而調(diào)用 ?submit 可以返回 Future,那么這個 Future 能到底能干啥呢,看它的接口
public?interface?Future<V>?{
????/**
?????*?取消正在執(zhí)行的任務,如果任務已執(zhí)行或已被取消,或者由于某些原因不能取消則返回?false
?????*?如果任務未開始或者任務已開始但可以中斷(mayInterruptIfRunning?為?true),則
?????*?可以取消/中斷此任務
?????*/
????boolean?cancel(boolean?mayInterruptIfRunning);
????/**
?????*?任務在完成前是否已被取消
?????*/
????boolean?isCancelled();
????/**
?????*?正常的執(zhí)行完流程流程,或拋出異常,或取消導致的任務完成都會返回?true
?????*/
????boolean?isDone();
????/**
?????*?阻塞等待任務的執(zhí)行結(jié)果
?????*/
????V?get()?throws?InterruptedException,?ExecutionException;
????/**
?????*?阻塞等待任務的執(zhí)行結(jié)果,不過這里指定了時間,如果在?timeout?時間內(nèi)任務還未執(zhí)行完成,
?????*?則拋出?TimeoutException?異常
?????*/
????V?get(long?timeout,?TimeUnit?unit)
????????throws?InterruptedException,?ExecutionException,?TimeoutException;
}
可以用 Future 取消任務,判斷任務是否已取消/完成,甚至可以阻塞等待結(jié)果。
submit 為啥能提交任務(Runnable)的同時也能返回任務(Future)的執(zhí)行結(jié)果呢

原來在最后執(zhí)行 execute 前用 newTaskFor 將 task 封裝成了 RunnableFuture,newTaskFor 返回了 FutureTask 這個類,結(jié)構(gòu)圖如下

可以看到 FutureTask 這個接口既實現(xiàn)了 Runnable 接口,也實現(xiàn) Future 接口,所以在提交任務的同時也能利用 Future 接口來執(zhí)行任務的取消,獲取任務的狀態(tài),等待執(zhí)行結(jié)果這些操作。
execute 與 submit 除了是否能返回執(zhí)行結(jié)果這一區(qū)別外,還有一個重要區(qū)別,那就是使用 execute 執(zhí)行如果發(fā)生了異常,是捕獲不到的,默認會執(zhí)行 ThreadGroup 的 uncaughtException 方法(下圖數(shù)字 2 對應的邏輯)
所以如果你想監(jiān)控執(zhí)行 execute 方法時發(fā)生的異常,需要通過 threadFactory 來指定一個 UncaughtExceptionHandler,這樣就會執(zhí)行上圖中的 1,進而執(zhí)行 UncaughtExceptionHandler 中的邏輯,如下所示:
//1.實現(xiàn)一個自己的線程池工廠
ThreadFactory?factory?=?(Runnable?r)?->?{
????//創(chuàng)建一個線程
????Thread?t?=?new?Thread(r);
????//給創(chuàng)建的線程設置UncaughtExceptionHandler對象?里面實現(xiàn)異常的默認邏輯
????t.setDefaultUncaughtExceptionHandler((Thread?thread1,?Throwable?e)?->?{
????????//?在此設置統(tǒng)計監(jiān)控邏輯
????????System.out.println("線程工廠設置的exceptionHandler"?+?e.getMessage());
????});
????return?t;
};
//?2.創(chuàng)建一個自己定義的線程池,使用自己定義的線程工廠
ExecutorService?service?=?new?ThreadPoolExecutor(1,?1,?0,?TimeUnit.MILLISECONDS,new?LinkedBlockingQueue(10),factory);
//3.提交任務
service.execute(()->{
????int?i=1/0;
});
執(zhí)行以上邏輯最終會輸出「線程工廠設置的exceptionHandler/ by zero」,通過這樣的方式就能通過設定的 defaultUncaughtExceptionHandler 來執(zhí)行我們的監(jiān)控邏輯了。
如果用 submit ,如何捕獲異常呢,當我們調(diào)用 future.get 就可以捕獲
Callable?testCallable?=?xxx;
Future?future?=?executor.submit(myCallable);
try?{
????future1.get(3));
}?catch?(InterruptedException?e)?{
????e.printStackTrace();
}?catch?(ExecutionException?e)?{
????e.printStackTrace();
}
那么 future 為啥在 get 的時候才捕獲異步呢,因為在執(zhí)行 submit 時拋出異常后此異常被保存了起來,而在 get 的時候才被拋出

關于 execute 和 submit 的執(zhí)行流程 why 神的這篇文章寫得非常透徹,我就不拾人牙慧了,建議大家好好品品,收獲會很大!
ThreadPoolExecutor 源碼剖析
前面鋪墊了這么多,終于到了最核心的源碼剖析環(huán)節(jié)了。
對于線程池來說,我們最關心的是它的「狀態(tài)」和「可運行的線程數(shù)量」,一般來說我們可以選擇用兩個變量來記錄,不過 Doug Lea 只用了一個變量(ctl)就達成目的了,我們知道變量越多,代碼的可維護性就越差,也越容易出 bug, 所以只用一個變量就達成了兩個變量的效果,這讓代碼的可維護性大大提高,那么他是怎么設計的呢
//?ThreadPoolExecutor.java
public?class?ThreadPoolExecutor?extends?AbstractExecutorService?{
????private?final?AtomicInteger?ctl?=?new?AtomicInteger(ctlOf(RUNNING,?0));
????private?static?final?int?COUNT_BITS?=?Integer.SIZE?-?3;
????private?static?final?int?CAPACITY???=?(1?<1;
????//?結(jié)果:111 00000000000000000000000000000
????private?static?final?int?RUNNING????=?-1?<????//?結(jié)果:?000?00000000000000000000000000000
????private?static?final?int?SHUTDOWN???=??0?<????//?結(jié)果:?001 00000000000000000000000000000
????private?static?final?int?STOP???????=??1?<????//?結(jié)果:?010?00000000000000000000000000000
????private?static?final?int?TIDYING????=??2?<????//?結(jié)果:?011 00000000000000000000000000000
????private?static?final?int?TERMINATED?=??3?<
????//?獲取線程池的狀態(tài)
????private?static?int?runStateOf(int?c)?????{?return?c?&?~CAPACITY;?}
????//?獲取線程數(shù)量
????private?static?int?workerCountOf(int?c)??{?return?c?&?CAPACITY;?}
}
可以看到,ctl 是一個 原子類的 Integer 變量,有 32 位,低 29 位表示線程數(shù)量, 29 位最大可以表示 (2^29)-1 (大概 5 億多),足夠記錄線程大小了,如果未來還是不夠,可以把 ctl 聲明為 AtomicLong,高 3 位用來表示線程池的狀態(tài),3 位可以表示 8 個線程池的狀態(tài),由于線程池總共只有五個狀態(tài),所以 3 位也是足夠了,線程池的五個狀態(tài)如下
RUNNING: 接收新的任務,并能繼續(xù)處理 workQueue 中的任務 SHUTDOWN: 不再接收新的任務,不過能繼續(xù)處理 workQueue 中的任務 STOP: 不再接收新的任務,也不再處理 workQueue 中的任務,并且會中斷正在處理任務的線程 TIDYING: 所有的任務都完結(jié)了,并且線程數(shù)量(workCount)為 0 時即為此狀態(tài),進入此狀態(tài)后會調(diào)用 terminated() 這個鉤子方法進入 TERMINATED 狀態(tài) TERMINATED: 調(diào)用 terminated() 方法后即為此狀態(tài)
線程池的狀態(tài)流轉(zhuǎn)及觸發(fā)條件如下
有了這些基礎,我們來分析下 execute 的源碼
public?void?execute(Runnable?command)?{
????if?(command?==?null)
????????throw?new?NullPointerException();
????int?c?=?ctl.get();
????//?如果當前線程數(shù)少于核心線程數(shù)(corePoolSize),無論核心線程是否忙碌,都創(chuàng)建線程,直到達到?corePoolSize?為止
????if?(workerCountOf(c)?????????//?創(chuàng)建線程并將此任務交給?worker?處理(此時此任務即?worker?中的?firstTask)
????????if?(addWorker(command,?true))
????????????return;
????????c?=?ctl.get();
????}
????//?如果線程池處于?RUNNING?狀態(tài),并且線程數(shù)大于?corePoolSize?或者?
????//?線程數(shù)少于?corePoolSize?但創(chuàng)建線程失敗了,則將任務丟進?workQueue?中
????if?(isRunning(c)?&&?workQueue.offer(command))?{
????????int?recheck?=?ctl.get();
????????//?這里需要再次檢查線程池是否處于?RUNNING?狀態(tài),因為在任務入隊后可能線程池狀態(tài)會發(fā)生變化,(比如調(diào)用了?shutdown?方法等),如果線程狀態(tài)發(fā)生變化了,則移除此任務,執(zhí)行拒絕策略
????????if?(!?isRunning(recheck)?&&?remove(command))
????????????reject(command);
????????//?如果線程池在?RUNNING?狀態(tài)下,線程數(shù)為?0,則新建線程加速處理?workQueue?中的任務
????????else?if?(workerCountOf(recheck)?==?0)
????????????addWorker(null,?false);
????}
????//?這段邏輯說明線程數(shù)大于?corePoolSize?且任務入隊失敗了,此時會以最大線程數(shù)(maximumPoolSize)為界來創(chuàng)建線程,如果失敗,說明線程數(shù)超過了?maximumPoolSize,則執(zhí)行拒絕策略
????else?if?(!addWorker(command,?false))
????????reject(command);
}
從這段代碼中可以看到,創(chuàng)建線程是調(diào)用 addWorker 實現(xiàn)的,在分析 addWorker 之前,有必要簡單提一下 Worker,線程池把每一個執(zhí)行任務的線程都封裝為 Worker 的形式,取名為 Worker 很形象,線程池的本質(zhì)是生產(chǎn)者-消費者模型,生產(chǎn)者不斷地往 workQueue 中丟 task, workQueue 就像流水線一樣不斷地輸送著任務,而 worker(工人) 不斷地取任務來執(zhí)行

將線程封裝為 worker 主要是為了更好地管理線程的中斷
來看下 Worker 的定義
//?此處可以看出?worker?既是一個?Runnable?任務,也實現(xiàn)了?AQS(實際上是用?AQS?實現(xiàn)了一個獨占鎖,這樣由于?worker?運行時會上鎖,執(zhí)行?shutdown,setCorePoolSize,setMaximumPoolSize等方法時會試著中斷線程(interruptIdleWorkers)?,在這個方法中斷方法中會先嘗試獲取?worker?的鎖,如果不成功,說明?worker?在運行中,此時會先讓?worker?執(zhí)行完任務再關閉?worker?的線程,實現(xiàn)優(yōu)雅關閉線程的目的)
private?final?class?Worker
????extends?AbstractQueuedSynchronizer
????implements?Runnable
????{
????????private?static?final?long?serialVersionUID?=?6138294804551838833L;
????????//?實際執(zhí)行任務的線程
????????final?Thread?thread;
????????//?上文提到,如果當前線程數(shù)少于核心線程數(shù),創(chuàng)建線程并將提交的任務交給?worker?處理處理,此時?firstTask?即為此提交的任務,如果?worker?從?workQueue?中獲取任務,則?firstTask?為空
????????Runnable?firstTask;
????????//?統(tǒng)計完成的任務數(shù)
????????volatile?long?completedTasks;
????????Worker(Runnable?firstTask)?{
????????????//?初始化為?-1,這樣在線程運行前(調(diào)用runWorker)禁止中斷,在?interruptIfStarted()?方法中會判斷?getState()>=0
????????????setState(-1);?
????????????this.firstTask?=?firstTask;
????????????//?根據(jù)線程池的?threadFactory?創(chuàng)建一個線程,將?worker?本身傳給線程(因為?worker?實現(xiàn)了?Runnable?接口)
????????????this.thread?=?getThreadFactory().newThread(this);
????????}
????????public?void?run()?{
????????????//?thread?啟動后會調(diào)用此方法
????????????runWorker(this);
????????}
???????
????????//?1?代表被鎖住了,0?代表未鎖
????????protected?boolean?isHeldExclusively()?{
????????????return?getState()?!=?0;
????????}
????????//?嘗試獲取鎖
????????protected?boolean?tryAcquire(int?unused)?{
????????????//?從這里可以看出它是一個獨占鎖,因為當獲取鎖后,cas?設置?state?不可能成功,這里我們也能明白上文中將?state?設置為?-1?的作用,這種情況下永遠不可能獲取得鎖,而?worker?要被中斷首先必須獲取鎖
????????????if?(compareAndSetState(0,?1))?{
????????????????setExclusiveOwnerThread(Thread.currentThread());
????????????????return?true;
????????????}
????????????return?false;
????????}
????????//?嘗試釋放鎖
????????protected?boolean?tryRelease(int?unused)?{
????????????setExclusiveOwnerThread(null);
????????????setState(0);
????????????return?true;
????????}????
????????public?void?lock()????????{?acquire(1);?}
????????public?boolean?tryLock()??{?return?tryAcquire(1);?}
????????public?void?unlock()??????{?release(1);?}
????????public?boolean?isLocked()?{?return?isHeldExclusively();?}
????????????
????????//?中斷線程,這個方法會被?shutdowNow?調(diào)用,從中可以看出?shutdownNow?要中斷線程不需要獲取鎖,也就是說如果線程正在運行,照樣會給你中斷掉,所以一般來說我們不用?shutdowNow?來中斷線程,太粗暴了,中斷時線程很可能在執(zhí)行任務,影響任務執(zhí)行
????????void?interruptIfStarted()?{
????????????Thread?t;
????????????//?中斷也是有條件的,必須是?state?>=?0?且?t?!=?null?且線程未被中斷
????????????//?如果?state?==?-1?,不執(zhí)行中斷,再次明白了為啥上文中?setState(-1)?的意義
????????????if?(getState()?>=?0?&&?(t?=?thread)?!=?null?&&?!t.isInterrupted())?{
????????????????try?{
????????????????????t.interrupt();
????????????????}?catch?(SecurityException?ignore)?{
????????????????}
????????????}
????????}
????}
通過上文對 Worker 類的分析,相信大家不難理解 將線程封裝為 worker 主要是為了更好地管理線程的中斷 這句話。
理解了 Worker 的意義,我們再來看 addWorker 的方法
private?boolean?addWorker(Runnable?firstTask,?boolean?core)?{
????retry:
????for?(;;)?{
????????int?c?=?ctl.get();
????????//?獲取線程池的狀態(tài)
????????int?rs?=?runStateOf(c);
????????//?如果線程池的狀態(tài)?>=?SHUTDOWN,即為?SHUTDOWN,STOP,TIDYING,TERMINATED?這四個狀態(tài),只有一種情況有可能創(chuàng)建線程,即線程狀態(tài)為?SHUTDOWN,?且隊列非空時,firstTask?==?null?代表創(chuàng)建一個不接收新任務的線程(此線程會從?workQueue?中獲取任務再執(zhí)行),這種情況下創(chuàng)建線程是為了加速處理完?workQueue?中的任務
????????if?(rs?>=?SHUTDOWN?&&
????????????!?(rs?==?SHUTDOWN?&&
???????????????firstTask?==?null?&&
???????????????!?workQueue.isEmpty()))
????????????return?false;
????????for?(;;)?{
????????????//?獲取線程數(shù)
????????????int?wc?=?workerCountOf(c);
????????????//?如果超過了線程池的最大?CAPACITY(5?億多,基本不可能)
????????????//?或者?超過了?corePoolSize(core?為?true)?或者?maximumPoolSize(core?為?false)?時
????????????//?則返回?false
????????????if?(wc?>=?CAPACITY?||
????????????????wc?>=?(core???corePoolSize?:?maximumPoolSize))
????????????????return?false;
????????????//?否則?CAS?增加線程的數(shù)量,如果成功跳出雙重循環(huán)
????????????if?(compareAndIncrementWorkerCount(c))
????????????????break?retry;
????????????c?=?ctl.get();??//?Re-read?ctl
????????????//?如果線程運行狀態(tài)發(fā)生變化,跳到外層循環(huán)繼續(xù)執(zhí)行
????????????if?(runStateOf(c)?!=?rs)
????????????????continue?retry;
????????????//?說明是因為?CAS?增加線程數(shù)量失敗所致,繼續(xù)執(zhí)行?retry?的內(nèi)層循環(huán)
????????}
????}
????boolean?workerStarted?=?false;
????boolean?workerAdded?=?false;
????Worker?w?=?null;
????try?{
????????//?能執(zhí)行到這里,說明滿足增加?worker?的條件了,所以創(chuàng)建?worker,準備添加進線程池中執(zhí)行任務
????????w?=?new?Worker(firstTask);
????????final?Thread?t?=?w.thread;
????????if?(t?!=?null)?{
????????????//?加鎖,是因為下文要把?w?添加進?workers?中,?workers?是?HashSet,不是線程安全的,所以需要加鎖予以保證
????????????final?ReentrantLock?mainLock?=?this.mainLock;
????????????mainLock.lock();
????????????try?{
????????????????//??再次?check?線程池的狀態(tài)以防執(zhí)行到此步時發(fā)生中斷等
????????????????int?rs?=?runStateOf(ctl.get());
????????????????//?如果線程池狀態(tài)小于?SHUTDOWN(即為?RUNNING),
????????????????//?或者狀態(tài)為?SHUTDOWN?但?firstTask?==?null(代表不接收任務,只是創(chuàng)建線程處理?workQueue?中的任務),則滿足添加?worker?的條件
????????????????if?(rs?????????????????????(rs?==?SHUTDOWN?&&?firstTask?==?null))?{
????????????????????????????????????????//?如果線程已啟動,顯然有問題(因為創(chuàng)建?worker?后,還沒啟動線程呢),拋出異常
????????????????????if?(t.isAlive())?
????????????????????????throw?new?IllegalThreadStateException();
????????????????????workers.add(w);
????????????????????int?s?=?workers.size();
????????????????????//?記錄最大的線程池大小以作監(jiān)控之用
????????????????????if?(s?>?largestPoolSize)
????????????????????????largestPoolSize?=?s;
????????????????????workerAdded?=?true;
????????????????}
????????????}?finally?{
????????????????mainLock.unlock();
????????????}
????????????//?說明往?workers?中添加?worker?成功,此時啟動線程
????????????if?(workerAdded)?{
????????????????t.start();
????????????????workerStarted?=?true;
????????????}
????????}
????}?finally?{
????????//?添加線程失敗,執(zhí)行?addWorkerFailed?方法,主要做了將?worker?從?workers?中移除,減少線程數(shù),并嘗試著關閉線程池這樣的操作
????????if?(!?workerStarted)
????????????addWorkerFailed(w);
????}
????return?workerStarted;
}
從這段代碼我們可以看到多線程下情況的不可預料性,我們發(fā)現(xiàn)在滿足條件情況下,又對線程狀態(tài)重新進行了 check,以防期間出現(xiàn)中斷等線程池狀態(tài)發(fā)生變更的操作,這也給我們以啟發(fā):多線程環(huán)境下的各種臨界條件一定要考慮到位。
執(zhí)行 addWorker 創(chuàng)建 worker 成功后,線程開始執(zhí)行了(t.start()),由于在創(chuàng)建 Worker 時,將 Worker ?自己傳給了此線程,所以啟動線程后,會調(diào)用 ?Worker 的 run 方法
public?void?run()?{
????runWorker(this);
}
可以看到最終會調(diào)用 ?runWorker 方法,接下來我們來分析下 runWorker 方法
final?void?runWorker(Worker?w)?{
????Thread?wt?=?Thread.currentThread();
????Runnable?task?=?w.firstTask;
????w.firstTask?=?null;
????//?unlock?會調(diào)用?tryRelease?方法將?state?設置成?0,代表允許中斷,允許中斷的條件上文我們在?interruptIfStarted()?中有提過,即?state?>=?0
????w.unlock();
????boolean?completedAbruptly?=?true;
????try?{
????????//?如果在提交任務時創(chuàng)建了線程,并把任務丟給此線程,則會先執(zhí)行此?task
????????//?否則從任務隊列中獲取?task?來執(zhí)行(即?getTask()?方法)
????????while?(task?!=?null?||?(task?=?getTask())?!=?null)?{
????????????w.lock();
????????????
????????????//?如果線程池狀態(tài)為?>=?STOP(即?STOP,TIDYING,TERMINATED?)時,則線程應該中斷
????????????//?如果線程池狀態(tài)?
????????????if?((runStateAtLeast(ctl.get(),?STOP)?||
?????????????????(Thread.interrupted()?&&
??????????????????runStateAtLeast(ctl.get(),?STOP)))?&&
????????????????!wt.isInterrupted())
????????????????wt.interrupt();
????????????try?{
????????????????//?執(zhí)行任務前,子類可實現(xiàn)此鉤子方法作為統(tǒng)計之用
????????????????beforeExecute(wt,?task);
????????????????Throwable?thrown?=?null;
????????????????try?{
????????????????????task.run();
????????????????}?catch?(RuntimeException?x)?{
????????????????????thrown?=?x;?throw?x;
????????????????}?catch?(Error?x)?{
????????????????????thrown?=?x;?throw?x;
????????????????}?catch?(Throwable?x)?{
????????????????????thrown?=?x;?throw?new?Error(x);
????????????????}?finally?{
????????????????????//?執(zhí)行任務后,子類可實現(xiàn)此鉤子方法作為統(tǒng)計之用
????????????????????afterExecute(task,?thrown);
????????????????}
????????????}?finally?{
????????????????task?=?null;
????????????????w.completedTasks++;
????????????????w.unlock();
????????????}
????????}
????????completedAbruptly?=?false;
????}?finally?{
????????//?如果執(zhí)行到這只有兩種可能,一種是執(zhí)行過程中異常中斷了,一種是隊列里沒有任務了,從這里可以看出線程沒有核心線程與非核心線程之分,哪個任務異常了或者正常退出了都會執(zhí)行此方法,此方法會根據(jù)情況將線程數(shù)-1
????????processWorkerExit(w,?completedAbruptly);
????}
}
來看看 processWorkerExit 方法是咋樣的
private?void?processWorkerExit(Worker?w,?boolean?completedAbruptly)?{
????????//?如果異常退出,cas?執(zhí)行線程池減?1?操作
????if?(completedAbruptly)?
????????decrementWorkerCount();
????final?ReentrantLock?mainLock?=?this.mainLock;
????mainLock.lock();
????try?{
????????completedTaskCount?+=?w.completedTasks;
????????//?加鎖確保線程安全地移除?worker?
????????workers.remove(w);
????}?finally?{
????????mainLock.unlock();
????}
????//?woker?既然異常退出,可能線程池狀態(tài)變了(如執(zhí)行?shutdown?等),嘗試著關閉線程池
????tryTerminate();
????int?c?=?ctl.get();
????//??如果線程池處于?STOP?狀態(tài),則如果?woker?是異常退出的,重新新增一個?woker,如果是正常退出的,在?wokerQueue?為非空的條件下,確保至少有一個線程在運行以執(zhí)行?wokerQueue?中的任務????if?(runStateLessThan(c,?STOP))?{
????????if?(!completedAbruptly)?{
????????????int?min?=?allowCoreThreadTimeOut???0?:?corePoolSize;
????????????if?(min?==?0?&&?!?workQueue.isEmpty())
????????????????min?=?1;
????????????if?(workerCountOf(c)?>=?min)
????????????????return;?//?replacement?not?needed
????????}
????????addWorker(null,?false);
????}
}
接下來我們分析 woker 從 workQueue 中取任務的方法 getTask
private?Runnable?getTask()?{
????boolean?timedOut?=?false;?//?Did?the?last?poll()?time?out?
????for?(;;)?{
????????int?c?=?ctl.get();
????????int?rs?=?runStateOf(c);
????????//?如果線程池狀態(tài)至少為?STOP?或者
????????//?線程池狀態(tài)?==?SHUTDOWN?并且任務隊列是空的
????????//?則減少線程數(shù)量,返回?null,這種情況下上文分析的?runWorker?會執(zhí)行?processWorkerExit?從而讓獲取此?Task?的?woker?退出
????????if?(rs?>=?SHUTDOWN?&&?(rs?>=?STOP?||?workQueue.isEmpty()))?{
????????????decrementWorkerCount();
????????????return?null;
????????}
????????int?wc?=?workerCountOf(c);
????????//?如果?allowCoreThreadTimeOut?為?true,代表任何線程在?keepAliveTime?時間內(nèi)處于?idle?狀態(tài)都會被回收,如果線程數(shù)大于?corePoolSize,本身在?keepAliveTime?時間內(nèi)處于?idle?狀態(tài)就會被回收
????????boolean?timed?=?allowCoreThreadTimeOut?||?wc?>?corePoolSize;
????????//?worker?應該被回收的幾個條件,這個比較簡單,就此略過
????????if?((wc?>?maximumPoolSize?||?(timed?&&?timedOut))
????????????&&?(wc?>?1?||?workQueue.isEmpty()))?{
????????????if?(compareAndDecrementWorkerCount(c))
????????????????return?null;
????????????continue;
????????}
????????try?{
???????????//?阻塞獲取?task,如果在?keepAliveTime?時間內(nèi)未獲取任務,說明超時了,此時?timedOut?為?true
????????????Runnable?r?=?timed??
????????????????workQueue.poll(keepAliveTime,?TimeUnit.NANOSECONDS)?:
????????????????workQueue.take();
????????????if?(r?!=?null)
????????????????return?r;
????????????timedOut?=?true;
????????}?catch?(InterruptedException?retry)?{
????????????timedOut?=?false;
????????}
????}
}
經(jīng)過以上源碼剖析,相信我們對線程池的工作原理了解得八九不離十了,再來簡單過一下其他一些比較有用的方法,開頭我們提到線程池的監(jiān)控問題,我們看一下可以監(jiān)控哪些指標
int getCorePoolSize():獲取核心線程數(shù)。 int getLargestPoolSize():歷史峰值線程數(shù)。 int getMaximumPoolSize():最大線程數(shù)(線程池線程容量)。 int getActiveCount():當前活躍線程數(shù) int getPoolSize():當前線程池中的線程總數(shù) BlockingQueue getQueue() 當前線程池的任務隊列,據(jù)此可以獲取積壓任務的總數(shù),getQueue.size()
監(jiān)控思路也很簡單,開啟一個定時線程 ScheduledThreadPoolExecutor,定期對這些線程池指標進行采集,一般會采用一些開源工具如 Grafana + Prometheus + MicroMeter 來實現(xiàn)。
如何實現(xiàn)核心線程池的預熱
使用 ?prestartAllCoreThreads() 方法,這個方法會一次性創(chuàng)建 corePoolSize 個線程,無需等到提交任務時才創(chuàng)建,提交創(chuàng)建好線程的話,一有任務提交過來,這些線程就可以立即處理。
如何實現(xiàn)動態(tài)調(diào)整線程池參數(shù)
setCorePoolSize(int corePoolSize) 調(diào)整核心線程池大小 setMaximumPoolSize(int maximumPoolSize) setKeepAliveTime() 設置線程的存活時間
解答開篇的問題
其它問題基本都在源碼剖析環(huán)節(jié)回答了,這里簡單說下其他問題
1、Tomcat 的線程池和 JDK 的線程池實現(xiàn)有啥區(qū)別, Dubbo 中有類似 Tomcat 的線程池實現(xiàn)嗎?
Dubbo 中一個叫 EagerThreadPool 的東西,可以看看它的使用說明
從注釋里可以看出,如果核心線程都處于 busy 狀態(tài),如果有新的請求進來,EagerThreadPool 會選擇先創(chuàng)建線程,而不是將其放入任務隊列中,這樣可以更快地響應這些請求。
Tomcat 實現(xiàn)也是與此類似的,只不過稍微有所不同,當 Tomcat 啟動時,會先創(chuàng)建 minSpareThreads 個線程,如果經(jīng)過一段時間收到請求時這些線程都處于忙碌狀態(tài),每次都會以 minSpareThreads 的步長創(chuàng)建線程,本質(zhì)上也是為了更快地響應處理請求。具體的源碼可以看它的 ThreadPool 實現(xiàn),這里就不展開了。
2、我司網(wǎng)關 dubbo 調(diào)用線程池曾經(jīng)出現(xiàn)過這樣的一個問題:壓測時接口可以正常返回,但接口 RT 很高,假設設置的核心線程大小為 500,最大線程為 800,緩沖隊列為 5000,你能從這個設置中發(fā)現(xiàn)出一些問題并對這些參數(shù)進行調(diào)優(yōu)嗎?這個參數(shù)明顯能看出問題來,首先任務隊列設置過大,任務達到核心線程后,如果再有請求進來會先進入任務隊列,隊列滿了之后才創(chuàng)建線程,創(chuàng)建線程也是需要不少開銷的,所以我們后來把核心線程設置成了與最大線程一樣,并且調(diào)用 prestartAllCoreThreads() 來預熱核心線程,就不用等請求來時再創(chuàng)建線程了。
線程池的幾個最佳實踐
1、線程池執(zhí)行的任務應該是互相獨立的,如果互相依賴的話,可能導致死鎖,比如下面這樣的代碼
ExecutorService?pool?=?Executors
??.newSingleThreadExecutor();
pool.submit(()?->?{
??try?{
????String?qq=pool.submit(()->"QQ").get();
????System.out.println(qq);
??}?catch?(Exception?e)?{
??}
});
2、核心任務與非核心任務最好能用多個線程池隔離開來
曾經(jīng)我們業(yè)務上就出現(xiàn)這樣的一個故障:突然很多用戶反饋短信收不到了,排查才發(fā)現(xiàn)發(fā)短信是在一個線程池里,而另外的定時腳本也是用的這個線程池來執(zhí)行任務,這個腳本一分鐘可能產(chǎn)生幾百上千條任務,導致發(fā)短信的方法在線程池里基本沒機會執(zhí)行,后來我們用了兩個線程池把發(fā)短信和執(zhí)行腳本隔離開來解決了問題。
3、添加線程池監(jiān)控,動態(tài)設置線程池
如前文所述,線程池的各個參數(shù)很難一次性確定,既然難以確定,又要保證發(fā)現(xiàn)問題后及時解決,我們就需要為線程池增加監(jiān)控,監(jiān)控隊列大小,線程數(shù)量等,我們可以設置 3 分鐘內(nèi)比如隊列任務一直都是滿了的話,就觸發(fā)告警,這樣可以提前預警,如果線上因為線程池參數(shù)設置不合理而觸發(fā)了降級等操作,可以通過動態(tài)設置線程池的方式來實時修改核心線程數(shù),最大線程數(shù)等,將問題及時修復。
總結(jié)
本文詳細剖析了線程池的工作原理,相信大家對其工作機制應該有了較深入的了解,也對開頭的幾個問題有了較清楚的認識,本質(zhì)上設置線程池的目的是為了利用有效的資源最大化性能,最小化風險,同時線程池的使用本質(zhì)上是為了更好地為用戶服務,據(jù)此也不難明白 Tomcat, Dubbo 要另起爐灶來設置自己的線程池了。
最后歡迎大家加我私人微信,一起討論,共同進步,拉你進讀者群,2020 難過,我們一起抱團取暖!
巨人的肩膀
https://dzone.com/articles/how-much-memory-does-a-java-thread-take https://segmentfault.com/a/1190000021047279 https://www.cnblogs.com/trust-freedom/p/6681948.html 深入理解線程池 https://tinyurl.com/y675j928 有的線程它死了,于是它變成一道面試題 https://mp.weixin.qq.com/s/wrTVGLDvhE-eb5lhygWEqQ Java 并發(fā)編程實戰(zhàn) Java線程池實現(xiàn)原理及其在美團業(yè)務中的實踐: https://mp.weixin.qq.com/s/baYuX8aCwQ9PP6k7TDl2Ww
最后歡迎大家加我好友,拉你進技術交流群,群里有很多 BAT 的大咖,可以提問,互相交流,內(nèi)推等,進群一起抱團取暖


