《蹲坑也能進(jìn)大廠》多線程系列-線程池源碼終結(jié)篇

前言
多線程系列我們前面已經(jīng)更新過(guò)七個(gè)章節(jié)了,強(qiáng)烈建議小伙伴按照順序?qū)W習(xí):
《蹲坑也能進(jìn)大廠》多線程系列文章目錄
無(wú)論是Java小白還是高級(jí)大佬,線程池基本是面試中的必考題,這個(gè)知識(shí)點(diǎn)雖然涉及的東西較多,但在理解上卻不是很難,非常適合在面試上拿分。在上一篇文章《蹲坑也能進(jìn)大廠》多線程系列-線程池精講(必看)中,小伙伴們對(duì)線程池應(yīng)該有了一個(gè)基本的認(rèn)識(shí),對(duì)線程池不是很了解的同學(xué),建議先看上一篇。
本篇花Gie對(duì)如何用好線程池以及它的原理進(jìn)行探討,在面試中想要拿到更理想的工資,以及在日常工作中的問題排查,了解它的內(nèi)部結(jié)構(gòu)是必不可少的。
ps:本文使用JDK8環(huán)境講解
正文
我:狗哥狗哥,學(xué)完了上一章,可以幫我總結(jié)一下線程池的有哪幾個(gè)重要部分組成嗎?
問題不大,線程池的組成部分主要有四個(gè):
線程池管理器:用于管理線程池,如停止線程池、創(chuàng)建線程池等;
工作線程:用于從隊(duì)列中讀取并執(zhí)行任務(wù);
任務(wù)隊(duì)列:存放來(lái)不及執(zhí)行的任務(wù);
任務(wù)接口:一個(gè)一個(gè)被用來(lái)執(zhí)行的任務(wù),未執(zhí)行時(shí)存放在任務(wù)隊(duì)列中。
我:線程池的家族史可以介紹一下嗎?那么多類我都快亂死了
線程池涉及的類是比較多,但區(qū)分下來(lái)還是不難理解的,我們先來(lái)看這個(gè)結(jié)構(gòu)圖,這幾種是我們經(jīng)常看到的:
Executor:是一個(gè)頂級(jí)接口,內(nèi)部只包含一個(gè)
execute()方法;ExecutorService:也是一個(gè)接口,它繼承了Executor接口,并新增了
shutdown()、submit()等方法;Executors:是一個(gè)工具類,它提供了我們常用的創(chuàng)建線程方法,例如:
newSingleThreadExecutor、newFixedThreadPool等。ThreadPoolExecutor:是真正意義的線程池。

我:搜得思耐,也不是很難嘛,那如何向線程池中提交任務(wù)呢?
花Gie,你居然在我面前裝X,看我教你做人。
提交任務(wù)方式有兩種,其實(shí)本質(zhì)上還是一種,因?yàn)?code>submit最終調(diào)用的還是execute()方法:
execute():用于提交不需要返回值的任務(wù),所以也就意味著無(wú)法判斷是否執(zhí)行成功。
ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
service.execute(new Runnable() {
@Override
public void run() {}
});
復(fù)制代碼submit:線程池會(huì)返回一個(gè)future類型的對(duì)象,通過(guò)這個(gè)future對(duì)象可以判讀是否執(zhí)行成功,并且還可以通過(guò)get()方法來(lái)獲取返回值。
ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
Future<Object> future =(Future<Object>) service.submit(new Runnable() {
@Override
public void run() {
System.out.println(1);
}
});
try {
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}finally {
//關(guān)閉線程池
service.shutdown();
}
復(fù)制代碼我:so easy嘛,狗子,有沒有刺激點(diǎn)的
嘚瑟,你再給我嘚瑟,既然基礎(chǔ)知識(shí)點(diǎn)掌握完了,那我們就來(lái)深入了解一下源碼吧,用源碼的方式了解線程池的一生。
這么突然嗎,我甚至還不知道線程池的生命周期有哪幾種......

線程池的生命周期有五個(gè):
RUNNING:此時(shí)能夠接受新任務(wù),并處理排隊(duì)任務(wù);
SHUTDOWN:不再接受新任務(wù),但是會(huì)處理排隊(duì)任務(wù);
STOP:不接受新任務(wù),也不處理排隊(duì)任務(wù),并且會(huì)中斷正在執(zhí)行的任務(wù);
TIDYING:所有任務(wù)都已終止,workworkerCount為零時(shí),線程就會(huì)轉(zhuǎn)換到此狀態(tài),并且運(yùn)行
terminated()函數(shù);TERMINATED:terminated()函數(shù)運(yùn)行完成。
我:阿里嘎多歐卡桑,嚶嚶嚶~
花Gie,你這么浪真的好么。接下來(lái)正式介紹線程池的一些重要源碼吧,首先要看的是上面提到過(guò)的execute方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//步驟一
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//步驟二
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//步驟三
else if (!addWorker(command, false))
reject(command);
}
復(fù)制代碼我們可以根據(jù)條件分為三個(gè)大的步驟來(lái)分析:
步驟一分析
代碼第三局有一個(gè)ctl,它是用于記錄線程池狀態(tài)和運(yùn)行線程數(shù)。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
復(fù)制代碼這里會(huì)判斷正在運(yùn)行的線程是否達(dá)到核心線程數(shù),如果為true,就會(huì)調(diào)用addWorker新增一個(gè)工作線程,并運(yùn)行當(dāng)前任務(wù)(command),如果新增線程失敗,就會(huì)重新獲取ctl。
//運(yùn)行線程數(shù)是否小于核心線程數(shù)
if (workerCountOf(c) < corePoolSize) {
//新增線程到線程池,并將當(dāng)前任務(wù)添加到新增的線程中
if (addWorker(command, true))
return;
//創(chuàng)建線程失敗,重新獲取clt。
c = ctl.get();
}
復(fù)制代碼步驟二
isRunning:判斷線程池的是否為運(yùn)行狀態(tài)
如果運(yùn)行線程數(shù)不小于核心線程數(shù),就會(huì)執(zhí)行以下6個(gè)子步驟:
//1.線程池是運(yùn)行狀態(tài)并且運(yùn)行線程大于核心線程數(shù)時(shí),把任務(wù)放入隊(duì)列中。
if (isRunning(c) && workQueue.offer(command)) {
//2.獲取線程池狀態(tài)
int recheck = ctl.get();
//3.如果線程池不是運(yùn)行狀態(tài),把任務(wù)移除隊(duì)列
if (! isRunning(recheck) && remove(command))
//4.執(zhí)行拒絕策略
reject(command);
//5.判斷當(dāng)前運(yùn)行線程數(shù)是否為0
else if (workerCountOf(recheck) == 0)
//6.創(chuàng)建線程并加入到線程池
addWorker(null, false);
}
復(fù)制代碼//移除任務(wù)
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}
復(fù)制代碼步驟三
如果前幾個(gè)條件都不滿足,也就是運(yùn)行線程大于核心線程數(shù)時(shí)并且隊(duì)列已滿時(shí),就會(huì)調(diào)用addWorker新建線程執(zhí)行當(dāng)前任務(wù),如果新建失敗,則表示運(yùn)行線程已達(dá)到最大線程數(shù),不能再次創(chuàng)建新的線程,此時(shí)就會(huì)執(zhí)行拒絕策略。
//創(chuàng)建線程放入線程池中,并且運(yùn)行當(dāng)前任務(wù)。
else if (!addWorker(command, false))
//運(yùn)行線程大于最大線程數(shù)時(shí),失敗則拒絕該任務(wù)
reject(command);
復(fù)制代碼上面多次用到addWorker方法,簡(jiǎn)單看下它的實(shí)現(xiàn)邏輯。
這里做一個(gè)總結(jié)并附上部分源碼注釋,小伙伴們啃起來(lái),略長(zhǎng):
addWorker(command, true):當(dāng)線程數(shù)小于corePoolSize時(shí),創(chuàng)建核心線程并且運(yùn)行task。
addWorker(command, false):當(dāng)核心線程數(shù)已滿,阻塞隊(duì)列已滿,并且線程數(shù)小于
maximumPoolSize時(shí),創(chuàng)建非核心線程并且運(yùn)行task。addWorker(null, false):如果工作線程為0是,創(chuàng)建一個(gè)核心線程但是不運(yùn)行task。(主要是避免工作隊(duì)列中還有任務(wù),但是工作線程為0,導(dǎo)致工作隊(duì)列中的任務(wù)一直沒有執(zhí)行)
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//獲取線程池狀態(tài)和運(yùn)行線程數(shù)。
int c = ctl.get();
//獲取線程池的運(yùn)行狀態(tài)
int rs = runStateOf(c);
//線程池處于關(guān)閉狀態(tài)、當(dāng)前任務(wù)為null、隊(duì)列不為空,斗直接返回失敗
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//獲取線程池中的線程數(shù)
int wc = workerCountOf(c);
//線程數(shù)超過(guò)CAPACITY,直接返回false;
//如果core為true,則運(yùn)行線程數(shù)與核心線程數(shù)進(jìn)行比較,為false則與最大線程數(shù)進(jìn)行比較。
//并且運(yùn)行線程數(shù)大于等于core時(shí),返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//嘗試增加線程數(shù),如果成功,則跳出第一個(gè)for循環(huán)
if (compareAndIncrementWorkerCount(c))
break retry;
//如果增加線程數(shù)失敗,則重新獲取ctl
c = ctl.get();
//如果當(dāng)前的運(yùn)行狀態(tài)不等于rs,說(shuō)明狀態(tài)已被改變,
//返回第一個(gè)for循環(huán)繼續(xù)執(zhí)行
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//根據(jù)當(dāng)前任務(wù)來(lái)創(chuàng)建Worker對(duì)象
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//獲得鎖以后,重新檢查線程池狀態(tài)
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
//把剛剛創(chuàng)建的線程加入到線程池中
workers.add(w);
int s = workers.size();
//記錄線程池中出現(xiàn)過(guò)的最大線程數(shù)量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//啟動(dòng)線程,開始運(yùn)行任務(wù)
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
復(fù)制代碼我:這一波...我可能要啃一個(gè)周末了,那線程池最后應(yīng)該怎樣關(guān)閉呢?
有兩種方式可以關(guān)閉正在運(yùn)行的線程池:
shutdown: 將線程池的狀態(tài)設(shè)置成SHUTDOWN狀態(tài),然后將沒有執(zhí)行任務(wù)的所有線程停止。
shutdownNow: 通過(guò)遍歷線程池中的工作線程,并逐一調(diào)用線程的
interrupt方法來(lái)中斷線程,對(duì)于無(wú)法響應(yīng)中斷的任務(wù)可能會(huì)永遠(yuǎn)無(wú)法終止。shutdownNow會(huì)首先將線程池的狀態(tài)設(shè)置成STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表。
無(wú)論調(diào)用哪一種方式去停止線程,再次調(diào)用isShutdown方法都會(huì)返回true,當(dāng)所有的任務(wù)都已關(guān)閉后,才表示線程池關(guān)閉成功,這時(shí)調(diào)用isTerminaed方法會(huì)返回true。
至于我們應(yīng)該調(diào)用哪一種方法來(lái)關(guān)閉線程池,取決于我們添加到線程池中任務(wù)的特性來(lái)決定,如果任務(wù)不要求執(zhí)行完整,可以調(diào)用shutdownNow,但通常會(huì)使用shutdown來(lái)關(guān)閉線程池。
總結(jié)
以上就是線程池的全部?jī)?nèi)容了,確實(shí)有點(diǎn)長(zhǎng),建議小伙伴能夠靜下心慢慢吭,切不可囫圇吞棗,不然浪費(fèi)時(shí)間還沒有學(xué)到東西,有疑問的小伙伴可以在下方留言。
點(diǎn)關(guān)注,防走丟
以上就是本期全部?jī)?nèi)容,如有紕漏之處,請(qǐng)留言指教,非常感謝。我是花GieGie ,有問題大家隨時(shí)留言討論 ,我們下期見??。
文章持續(xù)更新,可以微信搜一搜 Java開發(fā)零到壹 第一時(shí)間閱讀,并且可以獲取面試資料學(xué)習(xí)視頻等,有興趣的小伙伴歡迎關(guān)注,一起學(xué)習(xí),一起哈????。
原創(chuàng)不易,你怎忍心白嫖,如果你覺得這篇文章對(duì)你有點(diǎn)用的話,感謝老鐵為本文點(diǎn)個(gè)贊、評(píng)論或轉(zhuǎn)發(fā)一下,因?yàn)檫@將是我輸出更多優(yōu)質(zhì)文章的動(dòng)力,感謝!
參考鏈接:
xujiajia.blog.csdn.net/article/det…
blog.csdn.net/heihaozi/ar…
blog.csdn.net/sihai12345/…
ifeve.com/java-thread…
作者:JavaGieGie
鏈接:https://juejin.cn/post/6977756041610002469
來(lái)源:掘金
著作權(quán)歸作者所有。商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請(qǐng)注明出處。
