聊聊線程池,ThreadPoolExecutor源碼詳解
一、線程池起步
1. 線程池的基本介紹
首先Java里的線程利用的線程模型是KLT,這帶來了許多好處,比如線程的阻塞不會帶來進程的阻塞,能更加高效地利用CPU的資源等。但這也意味著在Java里的線程的創(chuàng)建和銷毀是一個相對偏且消耗資源的操作,Java線程依賴于內(nèi)核線程,創(chuàng)建線程需要進行操作系統(tǒng)狀態(tài)切換,為避免資源過度消耗需要設(shè)法重用線程執(zhí)行多個任務(wù)。

線程池就起到這么一個重用,它負(fù)責(zé)了線程緩存,也負(fù)責(zé)對線程進行統(tǒng)一的分配、調(diào)優(yōu)與監(jiān)控。通過多個任務(wù)重用線程,實現(xiàn)線程復(fù)用的效果,最終減少系統(tǒng)開銷;
1.1 什么時候使用線程池?
單個任務(wù)處理時間比較短;
需要處理的任務(wù)數(shù)量很大;
1.2 線程池的優(yōu)勢
重用存在的線程,減少線程創(chuàng)建、消亡的開銷,提高性能;
提高響應(yīng)速度,當(dāng)任務(wù)到達時,任務(wù)可以不需要等待線程創(chuàng)建就能立即執(zhí)行;
提高線程的可管理性,可統(tǒng)一分配,調(diào)優(yōu)和監(jiān)控;
2. Java的線程池相關(guān)類

二、Executor框架接口
Executor框架是一個用戶級的調(diào)度器,用以執(zhí)行策略調(diào)用,執(zhí)行和控制,目的是提供一種將“任務(wù)執(zhí)行”與“任務(wù)運行”分離開來的機制;
Executor框架中最為相關(guān)的接口有以下三個:
Executor:接口類,它是Executor框架的基礎(chǔ),將任務(wù)的提交與任務(wù)的執(zhí)行分離開來;
ExecutorService:擴展了 Executor 接口,添加了一些用來管理執(zhí)行器生命周期和任務(wù)的聲明周期的相關(guān)方法;
ScheduledExecutorService:擴展了ExecutorService接口,支持在給定的延遲后運行命令,或者定期執(zhí)行命令;
1. Executor接口
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*/
void execute(Runnable command);
}
Executor接口中只有一個函數(shù),用來替代我們原先啟動線程的方式,我們可以簡單看下對比:
// ===:原先線程啟動方式
Thread t = new Thread();
t.start();
// ===:將任務(wù)交由Executor去處理
Thread t = new Thread();
executor.executor(t);
最終Executor會將任務(wù)交由具體的實現(xiàn)類處理,不同的實現(xiàn)類處理在不同情況下處理的方式不同,可能是立即創(chuàng)建一個新線程并啟動,也有可能是使用已有的工作線程來運行傳入的任務(wù),也可能是根據(jù)設(shè)置線程池的容量或者阻塞隊列的容量來決定是否要將傳入的線程放入阻塞隊列中或者拒絕接收傳入的線程。
2. ExecutorService接口
ExecutorService擴展了Executor接口,是一個比 Executor 使用更廣泛的子類接口,其提供了生命周期管理的方法;調(diào)用ExecutorService的shutdown()方法可以平滑地關(guān)閉 ExecutorService,調(diào)用該方法后,將導(dǎo)致 ExecutorService 停止接受任何新的任務(wù)且等待已經(jīng)提交的任務(wù)執(zhí)行完成(已經(jīng)提交的任務(wù)會分兩類:一類是已經(jīng)在執(zhí)行的,另一類是還沒有開始執(zhí)行的),當(dāng)所有已經(jīng)提交的任務(wù)執(zhí)行完畢后將會關(guān)閉 ExecutorService。如果需要支持即時關(guān)閉,也就是shutDownNow()方法,此時我們需要考慮任務(wù)中斷的處理。
3. ScheduledExecutorService接口
ScheduledExecutorService 擴展 ExecutorService 接口并增加了 schedule() 、scheduleAtFixedRate()等方法。調(diào)用 schedule() 方法可以在指定的延時后執(zhí)行一個 Runnable 或者Callable 任務(wù)。調(diào)用 scheduleAtFixedRate()方法和scheduleWithFixedDelay()方法可以按照指定時間間隔定期執(zhí)行任務(wù)。
三、ThreadPoolExecutor解析
ThreadPoolExecutor類實現(xiàn) Executor 框架中最為核心的一個類,它是線程池的實現(xiàn)類,接下來介紹下它的主要實現(xiàn)代碼:
1. ThreadPoolExecutor的參數(shù)
首先來看類中定義的幾個關(guān)鍵字段,如下:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 32 - 3
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
這里的ctl是用來保存線程池運行狀態(tài)(runState)和線程池內(nèi)有效線程數(shù)量(workerCount)的一個字段,聲明為一個 AtomicInteger 對象,主要包括了兩部分信息:高3位保存運行狀態(tài),低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位再 - 1,這個常量表示 workerCount 的最大值,即2的29次方。
接下來定義的幾個字段用來表示線程池的狀態(tài),一個有五種狀態(tài),這里做一個簡單的說明:
RUNNING:能接受新提交的任務(wù),以及對已添加的任務(wù)進行處理;
SHUTDOWN:處于shutdown狀態(tài)下的線程池不能接收新的任務(wù),但能處理已經(jīng)提交的任務(wù);
STOP:線程池處于此狀態(tài)下,不接收新的任務(wù),也不會處理已經(jīng)提交的任務(wù),會將已經(jīng)提交的任務(wù)中斷;
TIDYING:當(dāng)所有的任務(wù)已終止,
ctl記錄的有效線程數(shù)量為0時,線程池會變?yōu)門IDYING狀態(tài)。當(dāng)線程池變?yōu)門IDYING狀態(tài)時,會執(zhí)行鉤子函數(shù)terminated(),該函數(shù)在 ThreadPoolExecutor 類中是空的,若用戶想在線程池變?yōu)?TIDYING 時,進行相應(yīng)的處理,可以通過重載該函數(shù)來實現(xiàn);TERMINATED:
terminated()方法被執(zhí)行完后進入該狀態(tài),線程池徹底終止,變成TERMINATED狀態(tài)。

2. execute函數(shù)
首先我們先來了解ThreadPoolExecutor執(zhí)行execute()方法的示意圖:

通過上圖我們能了解,線程池有三個梯度,分別是核心線程池、等待隊列、整個線程池,這三個梯度在代碼的執(zhí)行流程中發(fā)揮了重要的作用,接下來我們來了解它的源碼執(zhí)行過程,如下所示:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 獲取clt,即獲取runState和workerCount
int c = ctl.get();
// workerCountOf(c)取出低29位的值即當(dāng)前活動的線程數(shù),如果小于corePoolSize,
// 則新建一工作線程放入線程池中,并將該線程用來執(zhí)行當(dāng)前任務(wù)
if (workerCountOf(c) < corePoolSize) {
// addWorker中的第二個參數(shù)表示限制添加線程的數(shù)量是根據(jù) corePoolSize 來判斷還是 maximumPoolSize 來判斷;
// 如果為true,根據(jù)corePoolSize來判斷;如果為false,則根據(jù)maximumPoolSize來判斷
if (addWorker(command, true))
// 添加成功直接返回
return;
// 如果添加失敗,則重新獲取ctl值
c = ctl.get();
}
// 如果當(dāng)前線程池是運行狀態(tài)并且任務(wù)添加到隊列成功
if (isRunning(c) && workQueue.offer(command)) {
// 重新獲取ctl值
int recheck = ctl.get();
// 再次判斷線程池的運行狀態(tài),若不是運行狀態(tài),由于之前已經(jīng)把command添加到隊列中了,現(xiàn)在需要移除該command
if (!isRunning(recheck) && remove(command))
// 使用拒絕策略對該任務(wù)進行處理
reject(command);
// 獲取線程池中的有效線程數(shù),如果數(shù)量是0,則執(zhí)行addWorker方法
// 如果判斷workerCount大于0,則直接返回,因為任務(wù)已經(jīng)加入到隊列中了,之后會被調(diào)度執(zhí)行,這里不操心、。
else if (workerCountOf(recheck) == 0)
// 添加一個新的工作線程,任務(wù)已經(jīng)在工作隊列里了,所以第一個參數(shù)為null
addWorker(null, false);
}
/*
* 代碼如果走到這,那么有以下兩種情況:
* 1. 線程池已經(jīng)不是RUNNING狀態(tài)了;
* 2. 線程池是RUNNING狀態(tài),但現(xiàn)有線程>=核心線程池的數(shù)量,并且Blocking隊列已滿,
* 那么線程到第三個梯度,整個線程池了,這時,再次調(diào)用addWorker方法,
* 第二個參數(shù)傳入為false,將線程池的有限線程數(shù)量設(shè)置為maximumPoolSize;
*/
else if (!addWorker(command, false))
// 如果失敗則拒絕該任務(wù)
reject(command);
}
簡單來說,如果線程池一直處于 RUNNING 狀態(tài)的話,則函數(shù)的執(zhí)行流程如下:
如果當(dāng)前運行的線程少于 corePoolSize,則創(chuàng)建新線程來執(zhí)行任務(wù)(注意,執(zhí)行這一步驟需要獲取全局鎖,后面再介紹);
如果運行的線程等于或多于 corePoolSize,則將任務(wù)假如到 BlockingQueue;
如果無法將任務(wù)加入 BlockingQueue(隊列已滿),在創(chuàng)建新的線程來處理任務(wù)(注意,執(zhí)行這一步需要獲取全局鎖);
如果創(chuàng)建新線程將使用當(dāng)前運行的線程大于 maximumPoolSize,任務(wù)將被拒絕,并調(diào)用
RejectedExecutionHandler.reijectedExecution()方法;

ThreadPoolExecutor采取上述步驟的總體設(shè)計思路,是為了在執(zhí)行execute()方法時,盡可能避免記獲取全局鎖,那將會是一個很嚴(yán)重的可伸縮瓶頸)。在ThreadPoolExecutor完成預(yù)熱之后(當(dāng)前運行的線程數(shù)大于大于corePoolSize),幾乎所有的 execute()方法調(diào)用都是通過執(zhí)行步驟2,而步驟2不需要獲取全局鎖。
3. 構(gòu)造函數(shù)
接下來我們來了解如何創(chuàng)建一個新的線程池,它的構(gòu)造函數(shù)的源代碼如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 驗證參數(shù)合法
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
簡單介紹下構(gòu)造函數(shù)中幾個變量的定義:
corePoolSize:定義核心線程的數(shù)量,當(dāng)提交一個任務(wù)到線程池時,線程池會創(chuàng)建一個線程來執(zhí)行任務(wù),即使其他空閑線程的基本線程能夠執(zhí)行新任務(wù);
maximumPoolSize:定義線程池最大線程的數(shù)量;
keepAliveTime:線程池維護線程所允許的空閑時間。當(dāng)線程池中的線程數(shù)量大于
corePoolSize的時候,如果這時沒有新的任務(wù)提交,核心線程外的線程不會立即銷毀,而是會等待,直到等待的時間超過了keepAliveTime;unit:時間單位,為
keepAliveTime指定時間單位;workQueue:等待隊列,當(dāng)任務(wù)提交時,如果線程池中的線程數(shù)量大于等于
corePoolSize的時候,把該任務(wù)封裝成一個Worker對象放入等待隊列,它有以下幾個實現(xiàn)類可以選擇:ArrayBlockingQueue:是一個基于數(shù)組結(jié)構(gòu)的有界阻塞隊列,此隊列按FIFO原則對元素進行排序;
LinkedBlockingQueue:一個基于鏈表結(jié)構(gòu)的阻塞隊列,此隊列按FIFO排序元素,吞吐量要高于 ArrayBlockingQueue;
SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài);
PriorityBlockingQueue:一個具有優(yōu)先級的無限阻塞隊列;
threadFactory:它是
ThreadFactory類型的變量,用來創(chuàng)建新線程。默認(rèn)使用Executors.defaultThreadFactory()來創(chuàng)建線程。使用默認(rèn)的ThreadFactory來創(chuàng)建線程時,會使新創(chuàng)建的線程具有相同的NORM_PRIORITY優(yōu)先級并且是非守護線程,同時也設(shè)置了線程的名稱。handler:它是
RejectedExecutionHandler類型的變量,表示線程池的飽和策略。如果阻塞隊列滿了并且沒有空閑的線程,這時如果繼續(xù)提交任務(wù),就需要采取一種策略處理該任務(wù)。線程池提供了4種策略:AbortPolicy:直接拋出異常,這是默認(rèn)策略;
CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù);
DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù);
DiscardPolicy:直接丟棄任務(wù);
4. addWorker函數(shù)
上面我們講execute()函數(shù)時,提到了addWorker()函數(shù),該函數(shù)的作用主要就是在線程池中新建一個新的線程并執(zhí)行,firstTask參數(shù)表示指定新增的線程執(zhí)行的第一個任務(wù),core參數(shù)為 true 表示在此時線程數(shù)量的上限為corePoolSize,當(dāng)這個參數(shù)為 false 時,表示此時代碼判斷的線程數(shù)量上限為maximunPoolSize,這一點在前面代碼已經(jīng)有所提及。源代碼如下所示:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 獲取運行狀態(tài)
int rs = runStateOf(c);
// 檢查隊列是否在必要的時候為空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 這一循環(huán)內(nèi)主要就是通過CAS增加線程個數(shù)
for (;;) {
int wc = workerCountOf(c);
// 如果線程數(shù)量超出限制,返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS增加workerCount,如果成功,則跳出第一個for循環(huán)
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果增加workerCount失敗,則重新獲取ctl的值
c = ctl.get();
// 如果當(dāng)前的運行狀態(tài)不等于rs,說明狀態(tài)已被改變,返回第一個for循環(huán)繼續(xù)執(zhí)行
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 執(zhí)行到這里說明CAS增加新線程個數(shù)成功了,下面開始要創(chuàng)建新的工作線程Worker
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 根據(jù)firstTask來創(chuàng)建 Worker對象
w = new Worker(firstTask);
// Worker對象內(nèi)部維護著一個線程對象
final Thread t = w.thread;
if (t != null) {
// 加鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 重新檢查線程池狀態(tài),以免在獲取鎖之前調(diào)用shutdown方法改變線程池狀態(tài)
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN表示是 RUNNING 狀態(tài);
// 如果rs是RUNNING狀態(tài)或者rs是SHUTDOWN狀態(tài)并且 firstTask 為null,向線程池中添加線程。
// 因為在 SHUTDOWN 時不會在添加新的任務(wù),但還是會執(zhí)行workQueue中的任務(wù)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
// workers是一個HashSet
workers.add(w);
int s = workers.size();
// largestPoolSize記錄著線程池中出現(xiàn)過的最大線程數(shù)量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
// 釋放鎖
mainLock.unlock();
}
if (workerAdded) {
// 啟動線程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
上面代碼的最后幾行,會判斷添加工作線程是否成功,如果失敗,會執(zhí)行addWorkerFailed()函數(shù),將任務(wù)從workers中移除,并且做workerCount-1操作,簡單看一下該函數(shù)的源代碼:
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
// 加鎖
mainLock.lock();
try {
// 如果worker不為null
if (w != null)
// workers移除worker
workers.remove(w);
// 通過CAS操作使 workerCount-1
decrementWorkerCount();
tryTerminate();
} finally {
// 釋放鎖
mainLock.unlock();
}
}
5. Worker對象
回顧上面代碼,我們發(fā)現(xiàn)線程池創(chuàng)建新的工作線程都是去創(chuàng)建一個新的 Worker 對象,事實上線程池中的每一個工作線程都被封裝為Worker對象,ThreadPool 其實就是在維護著一組 Worker 對象,接下來我們來了解Worker類的源代碼,如下:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
// 給出初始firstTask,由線程創(chuàng)建工廠創(chuàng)建新的線程
Worker(Runnable firstTask) {
// 將狀態(tài)設(shè)置為-1,防止被調(diào)用前就被中斷
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
// 將run方法的運行委托給外部runWorker()函數(shù)
public void run() {
runWorker(this);
}
// 關(guān)于同步狀態(tài)(鎖)
//
// 同步狀態(tài)state=0表示鎖未被獲取
// 同步狀態(tài)state=1表示鎖被獲取
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Worker類繼承了AQS,并實現(xiàn)了Runnable接口,其中 firstTask 屬性用來保存?zhèn)魅氲娜蝿?wù),而 thread 屬性是在調(diào)用構(gòu)造方法時傳入的 ThreadFactory 創(chuàng)建出來的線程,是用來處理任務(wù)的線程。
在調(diào)用構(gòu)造方法時,需要把任務(wù)傳入,這里通過getThreadFactory().newThread(this);來新建一個線程,newThread() 方法傳入的參數(shù)是 this;因為 Worker 本身繼承了 Runnable 接口,也就是一個線程,所以一個 Worker 對象在啟動的時候會調(diào)用 Worker 類中的 run 方法。而run方法中又調(diào)用了runWorker()方法。
6. runWorker函數(shù)
通過上面源碼的了解,我們知道任務(wù)最終的執(zhí)行是在runWorker()函數(shù)中,接下來我們就通過源碼來了解這一過程:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 調(diào)用unlock(),將state設(shè)置為0,允許中斷
w.unlock();
// 是否因為異常退出循環(huán)
boolean completedAbruptly = true;
try {
// 如果task為空,則通過getTask()從隊列中獲取任務(wù)
while (task != null || (task = getTask()) != null) {
// 加鎖
w.lock();
// 如果線程池已被停止(STOP)(至少大于STOP狀態(tài)),要確保線程都被中斷
// 如果狀態(tài)不對,檢查當(dāng)前線程是否中斷并清除中斷狀態(tài),并且再次檢查線程池狀態(tài)是否大于STOP
// 如果上述滿足,檢查該對象是否處于中斷狀態(tài),不清除中斷標(biāo)記
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
// 中斷該對象
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 執(zhí)行任務(wù)
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
// 更新當(dāng)前已經(jīng)完成任務(wù)數(shù)量
w.completedTasks++;
// 釋放鎖
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 執(zhí)行清理工作,處理并退出當(dāng)前 worker
processWorkerExit(w, completedAbruptly);
}
}
看完上面代碼,我們可以大致地了解runWorker()函數(shù)的執(zhí)行流程:
首先執(zhí)行
unlock()方法,將Worker的state置為0這樣工作線程就可以被中斷了(因為后續(xù)線程池關(guān)閉操作需要去中斷線程);判斷當(dāng)前工作的任務(wù)(當(dāng)前工作線程中的task,或者從任務(wù)隊列中取出的task)是否為null,如果不為null就往下執(zhí)行,為null就執(zhí)行
processWorkerExit()方法;獲取工作線程內(nèi)部持有的獨占鎖(避免在執(zhí)行任務(wù)期間,其他線程調(diào)用shutdown后正在執(zhí)行的任務(wù)被中斷,shutdown只會中斷當(dāng)前被阻塞掛起的沒有執(zhí)行任務(wù)的線程);
之后執(zhí)行
beforeExecute()函數(shù),該方法為擴展接口代碼,表示在具體執(zhí)行任務(wù)之前出一些處理,然后就開始執(zhí)行task.run()函數(shù)去真正地執(zhí)行具體任務(wù),執(zhí)行完之后會調(diào)用afterExecute()方法,用以處理任務(wù)執(zhí)行完畢之后的工作,也是一個擴展接口代碼;更新當(dāng)前線程池完成的任務(wù)數(shù),并釋放鎖;
7. getTask函數(shù)
上面runWorker()函數(shù)中我們稍微提及了getTask()函數(shù),該函數(shù)表示從阻塞隊列中獲取任務(wù),接下來我們就來進行了解,其源代碼如下:
private Runnable getTask() {
// timeOut變量的值表示上次從阻塞隊列中取任務(wù)時是否超時
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果線程池處于非RUNNING狀態(tài)或者處于STOP以上狀態(tài)并且等待隊列為空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 將workCount-1并返回null
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// timed 變量用于判斷是否需要進行超時控制。
// allowCoreThreadTimeOut默認(rèn)是false,也就是核心線程不允許進行超時;
// wc > corePoolSize,表示當(dāng)前線程池中的線程數(shù)量大于核心線程數(shù)量;
// 對于超過核心線程數(shù)量的這些線程,需要進行超時控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/*
* wc > maximumPoolSize的情況是因為可能在此方法執(zhí)行階段同時執(zhí)行了setMaximumPoolSize方法;
* timed && timedOut 如果為true,表示當(dāng)前操作需要進行超時控制,并且上次從阻塞隊列中獲取任務(wù)發(fā)生了超時
* 接下來判斷,如果有效線程數(shù)量大于1,或者阻塞隊列是空的,那么嘗試將workerCount減1;
* 如果減1失敗,則返回重試。
* 如果wc == 1時,也就說明當(dāng)前線程是線程池中唯一的一個線程了。
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/*
* 根據(jù)timed來判斷,如果為true,則通過阻塞隊列的poll方法進行超時控制,如果在keepAliveTime時間內(nèi)沒有獲取到任務(wù),則返回null;
* 否則通過take方法,如果這時隊列為空,則take方法會阻塞直到隊列不為空。
*/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 如果 r == null,說明已經(jīng)超時,timedOut設(shè)置為true
timedOut = true;
} catch (InterruptedException retry) {
// 如果獲取任務(wù)時當(dāng)前線程發(fā)生了中斷,則設(shè)置timedOut為false并返回循環(huán)重試
timedOut = false;
}
}
}
這里主要講一下第二個 if 判斷,這段代碼的目的是為了控制線程池的有效線程數(shù)量:
由上文中的分析可以知道,在執(zhí)行execute方法時,如果當(dāng)前線程池的線程數(shù)量超過了corePoolSize 且小于 maximumPoolSize,并且 workQueue 已滿時,則可以增加工作線程,但這時如果超時沒有獲取到任務(wù),也就是 timedOut 為true的情況,說明 workQueue 已經(jīng)為空了,也就說明了當(dāng)前線程池中不需要那么多線程來執(zhí)行任務(wù)了,可以把多于corePoolSize數(shù)量的線程銷毀掉,保持線程數(shù)量在corePoolSize即可。
什么時候會銷毀?當(dāng)然是runWorker()方法執(zhí)行完之后,也就是Worker中的run方法執(zhí)行完,由 JVM 自動回收。getTask()函數(shù)返回null時,在runWorker()方法中會跳出while循環(huán),最后會執(zhí)行processWorkerExit()方法。
8. processWorkerExit函數(shù)
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果completedAbruptly值為true,則說明線程執(zhí)行時出現(xiàn)了異常,需要將workerCount減1;
if (completedAbruptly)
decrementWorkerCount();
// 獲取全局鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 統(tǒng)計完成的任務(wù)數(shù)
completedTaskCount += w.completedTasks;
// 從 workers 中移除一個工作線程
workers.remove(w);
} finally {
// 釋放鎖
mainLock.unlock();
}
// 根據(jù)線程池狀態(tài)判斷是否結(jié)束線程池
tryTerminate();
int c = ctl.get();
// 判斷當(dāng)前線程池中的數(shù)量是否少于核心線程數(shù)
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
// allowCoreThreadTimeOut表示是否允許核心線程超時,默認(rèn)為false
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果min等于0并且隊列不為空
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 現(xiàn)有工作線程大于min,直接返回
if (workerCountOf(c) >= min)
return;
}
addWorker(null, false);
}
}
我們大致來縷清下processWorkerExit()函數(shù)的工作流程:
首先獲取全局鎖,之后對線程池完成的任務(wù)個數(shù)進行統(tǒng)計,之后再從工作線程的集合中移除當(dāng)前工作線程,完成清理工作;
調(diào)用
tryTerminate()函數(shù),根據(jù)線程池狀態(tài)判斷是否結(jié)束線程池,下面詳細(xì)講該函數(shù)實現(xiàn);判斷當(dāng)前線程池中的線程個數(shù)是否小于核心線程數(shù),如果是就新增一個線程保證有足夠的線程可以執(zhí)行任務(wù)隊列中的任務(wù)或者提交的新任務(wù);
9. tryTerminate方法
tryTerminate()方法的主要工作就是根據(jù)線程池狀態(tài)進行判斷是否結(jié)束線程池,代碼如下:
final void tryTerminate() {
for (;;) {
int c = ctl.get();
/*
* 當(dāng)線程池的狀態(tài)為以下幾種情況時,直接返回,不調(diào)用terminated():
* RUNNING:線程池還在運行中,不能停止去terminated;
* TIDYING或TERMINATED:因為線程池中已經(jīng)沒有正在運行的線程了;
* SHUTDOWN并且等待隊列非空:需要先執(zhí)行完workQueue中的task;
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 如果線程數(shù)量不為0,則中斷一個空閑的工作線程,并返回
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
// 加鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 通過CAS嘗試設(shè)置狀態(tài)為TIDYING,如果設(shè)置成功,則調(diào)用terminated方法
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 調(diào)用terminated()方法
terminated();
} finally {
// 設(shè)置狀態(tài)為TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
// 解鎖
mainLock.unlock();
}
// else retry on failed CAS
}
}
這里需要注意的一點是,terminated()默認(rèn)什么都不實現(xiàn),需要繼承類根據(jù)業(yè)務(wù)場景去完成該方法;

騰訊、阿里、滴滴后臺面試題匯總總結(jié) — (含答案)
面試:史上最全多線程面試題 !
最新阿里內(nèi)推Java后端面試題
JVM難學(xué)?那是因為你沒認(rèn)真看完這篇文章

關(guān)注作者微信公眾號 —《JAVA爛豬皮》
了解更多java后端架構(gòu)知識以及最新面試寶典


看完本文記得給作者點贊+在看哦~~~大家的支持,是作者源源不斷出文的動力
作者:周二鴨
出處:https://www.cnblogs.com/jojop/p/14118479.html
