ThreadPoolExecutor 深入解析
點(diǎn)擊下方“IT牧場(chǎng)”,選擇“設(shè)為星標(biāo)”

本文來(lái)源:http://rrd.me/g6P3V
Java中的線程池是運(yùn)用場(chǎng)景最多的并發(fā)框架,幾乎所有需要異步或并發(fā)執(zhí)行任務(wù)的程序 都可以使用線程池。合理地使用線程池能夠帶來(lái)3個(gè)好處:
降低資源消耗。通過(guò)重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。 提高響應(yīng)速度。當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行。 提高線程的可管理性。線程是稀缺資源,如果無(wú)限制地創(chuàng)建,不僅會(huì)消耗系統(tǒng)資源, 還會(huì)降低系統(tǒng)的穩(wěn)定性,使用線程池可以進(jìn)行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控。
線程池的主要處理流程

ThreadPoolExecutor 類圖

java中的線程池都是基于ThreadPoolExecutor 來(lái)實(shí)現(xiàn)的。
核心屬性
// 狀態(tài)控制屬性:高3位表示線程池的運(yùn)行狀態(tài),剩下的29位表示當(dāng)前有效的線程數(shù)量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 線程池的基本大小,當(dāng)提交一個(gè)任務(wù)到線程池時(shí),線程池會(huì)創(chuàng)建一個(gè)線程來(lái)執(zhí)行任務(wù),
// 即使其他空閑的基本線程能夠執(zhí)行新任務(wù)也會(huì)創(chuàng)建線程,等到需要執(zhí)行的任務(wù)數(shù)大于
// 線程池基本大小時(shí)就不再創(chuàng)建。如果調(diào)用了線程池的prestartAllCoreThreads()方法,
// 線程池會(huì)提前創(chuàng)建并啟動(dòng)所有基本線程。
private volatile int corePoolSize;
// 線程池線程最大數(shù)量,如果隊(duì)列滿了,并且已創(chuàng)建的線程數(shù)小于最大線程數(shù),
// 則線程池會(huì)再創(chuàng)建新的線程執(zhí)行任務(wù)。如果使用了無(wú)界的任務(wù)隊(duì)列這個(gè)參數(shù)就沒(méi)什么效果。
private volatile int maximumPoolSize;
// 用于設(shè)置創(chuàng)建線程的工廠,可以通過(guò)線程工廠給每個(gè)創(chuàng)建出來(lái)的線程設(shè) 置更有意義的名字。
private volatile ThreadFactory threadFactory;
// 飽和策略,默認(rèn)情況下是AbortPolicy。
private volatile RejectedExecutionHandler handler;
// 線程池的工作線程空閑后,保持存活的時(shí)間。如果任務(wù)很多,并且每個(gè)任務(wù)執(zhí)行的時(shí)間比較短,
// 可以調(diào)大時(shí)間,提高線程的利用率。
private volatile long keepAliveTime;
// 用于保存等待執(zhí)行的任務(wù)的阻塞隊(duì)列,具體可以參考[JAVA并發(fā)容器-阻塞隊(duì)列](https://www.jianshu.com/p/5646fb5faee1)
private final BlockingQueue<Runnable> workQueue;
// 存放工作線程的容器,必須獲取到鎖才能訪問(wèn)
private final HashSet<Worker> workers = new HashSet<Worker>();
// ctl的拆包和包裝
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
阻塞隊(duì)列可以參考JAVA并發(fā)容器-阻塞隊(duì)列。 ctl狀態(tài)控制屬性,高3位表示線程池的運(yùn)行狀態(tài)(runState),剩下的29位表示當(dāng)前有效的線程數(shù)量(workerCount)線程池最大線程數(shù)是 (1 << COUNT_BITS) - 1 = 536 870 911
線程池的運(yùn)行狀態(tài)runState
| 狀態(tài) | 解釋 |
|---|---|
| RUNNING | 運(yùn)行態(tài),可處理新任務(wù)并執(zhí)行隊(duì)列中的任務(wù) |
| SHUTDOW | 關(guān)閉態(tài),不接受新任務(wù),但處理隊(duì)列中的任務(wù) |
| STOP | 停止態(tài),不接受新任務(wù),不處理隊(duì)列中任務(wù),且打斷運(yùn)行中任務(wù) |
| TIDYING | 整理態(tài),所有任務(wù)已經(jīng)結(jié)束,workerCount = 0 ,將執(zhí)行terminated()方法 |
| TERMINATED | 結(jié)束態(tài),terminated() 方法已完成 |
![]() |
RejectedExecutionHandler(拒絕策略)
AbortPolicy:直接拋出異常。 CallerRunsPolicy:只用調(diào)用者所在線程來(lái)運(yùn)行任務(wù)。 DiscardOldestPolicy:丟棄隊(duì)列里最近的一個(gè)任務(wù),并執(zhí)行當(dāng)前任務(wù)。 DiscardPolicy:不處理,丟棄掉。
核心內(nèi)部類 Worker
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
// 正在執(zhí)行任務(wù)的線程
final Thread thread;
// 線程創(chuàng)建時(shí)初始化的任務(wù)
Runnable firstTask;
// 完成任務(wù)計(jì)數(shù)器
volatile long completedTasks;
Worker(Runnable firstTask) {
// 在runWorker方法運(yùn)行之前禁止中斷,要中斷線程必須先獲取worker內(nèi)部的互斥鎖
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** delegates main run loop to outer runworker */
// 直接委托給外部runworker方法
public void run() {
runWorker(this);
}
...
}
Worker 類他將執(zhí)行任務(wù)的線程封裝到了內(nèi)部,在初始化Worker 的時(shí)候,會(huì)調(diào)用ThreadFactory初始化新線程;Worker 繼承了AbstractQueuedSynchronizer,在內(nèi)部實(shí)現(xiàn)了一個(gè)互斥鎖,主要目的是控制工作線程的中斷狀態(tài)。
線程的中斷一般是由其他線程發(fā)起的,比如ThreadPoolExecutor#interruptIdleWorkers(boolean)方法,它在調(diào)用過(guò)程中會(huì)去中斷worker內(nèi)部的工作線程,Work的互斥鎖可以保證正在執(zhí)行的任務(wù)不被打斷。它是怎么保證的呢?在線程真正執(zhí)行任務(wù)的時(shí)候,也就是runWorker方法被調(diào)用時(shí),它會(huì)先獲取到Work的鎖,當(dāng)我們?cè)谄渌€程需要中斷當(dāng)前線程時(shí)也需要獲取到work的互斥鎖,否則不能中斷。
構(gòu)造函數(shù)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
通過(guò)構(gòu)造函數(shù)我們可以發(fā)現(xiàn),構(gòu)造函數(shù)就是在對(duì)線程池核心屬性進(jìn)行賦值,下面我們來(lái)介紹一下這些核心屬性:
corePoolSize:核心線程數(shù) maximumPoolSize:線程池最大數(shù)量 keepAliveTime:線程池的工作線程空閑后,保持存活的時(shí)間。 unit:線程活動(dòng)保持時(shí)間的單位。 workQueue:用于保存等待執(zhí)行的任務(wù)的阻塞隊(duì)列,具體可以參考JAVA并發(fā)容器-阻塞隊(duì)列 threadFactory:用于設(shè)置創(chuàng)建線程的工廠 handler:飽和策略,默認(rèn)情況下是AbortPolicy。
execute() 提交線程
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 獲取控制的值
int c = ctl.get();
// 判斷工作線程數(shù)是否小于corePoolSize
if (workerCountOf(c) < corePoolSize) {
// 新創(chuàng)建核心線程
if (addWorker(command, true))
return;
c = ctl.get();
}
// 工作線程數(shù)大于或等于corePoolSize
// 判斷線程池是否處于運(yùn)行狀態(tài),如果是將任務(wù)command入隊(duì)
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次檢查線程池的運(yùn)行狀態(tài),如果不在運(yùn)行中,那么將任務(wù)從隊(duì)列里面刪除,并嘗試結(jié)束線程池
if (! isRunning(recheck) && remove(command))
// 調(diào)用驅(qū)逐策略
reject(command);
// 檢查活躍線程總數(shù)是否為0
else if (workerCountOf(recheck) == 0)
// 新創(chuàng)建非核心線程
addWorker(null, false);
}
// 隊(duì)列滿了,新創(chuàng)建非核心線程
else if (!addWorker(command, false))
// 調(diào)用驅(qū)逐策略
reject(command);
}
該方法是沒(méi)有返回值的
addWorker() 新創(chuàng)建線程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 僅在必要的時(shí)候檢查隊(duì)列是否為NULL
// 檢查隊(duì)列是否處于非運(yùn)行狀態(tài)
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 獲取活躍線程數(shù)
int wc = workerCountOf(c);
// 判斷線程是否超過(guò)最大值,當(dāng)隊(duì)列滿了則驗(yàn)證線程數(shù)是否大于maximumPoolSize,
// 沒(méi)有滿則驗(yàn)證corePoolSize
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 增加活躍線程總數(shù),否則重試
if (compareAndIncrementWorkerCount(c))
// 如果成功跳出外層循環(huán)
break retry;
c = ctl.get(); // Re-read ctl
// 再次校驗(yàn)一下線程池運(yùn)行狀態(tài)
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 工作線程是否啟動(dòng)
boolean workerStarted = false;
// 工作線程是否創(chuàng)建
boolean workerAdded = false;
Worker w = null;
try {
// 新創(chuàng)建線程
w = new Worker(firstTask);
// 獲取新創(chuàng)建的線程
final Thread t = w.thread;
if (t != null) {
// 創(chuàng)建線程要獲得全局鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 檢查線程池的運(yùn)行狀態(tài)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 檢查線程的狀態(tài)
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 將新建工作線程存放到容器
workers.add(w);
int s = workers.size();
if (s > largestPoolSize) {
// 跟蹤線程池最大的工作線程總數(shù)
largestPoolSize = s;
}
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 啟動(dòng)工作線程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 啟動(dòng)新的工作線程失敗,
// 1. 將工作線程移除workers容器
// 2. 還原工作線程總數(shù)(workerCount)
// 3. 嘗試結(jié)束線程
addWorkerFailed(w);
}
return workerStarted;
}
如果啟動(dòng)新線程失敗那么addWorkerFailed()這個(gè)方法將做一下三件事:
將工作線程移除workers容器 還原工作線程總數(shù)(workerCount) 嘗試結(jié)束線程
execute() 執(zhí)行過(guò)程

如果當(dāng)前運(yùn)行的線程少于corePoolSize,即使有空閑線程也會(huì)創(chuàng)建新線程來(lái)執(zhí)行任務(wù),(注意,執(zhí)行這一步驟 需要獲取全局鎖)。如果調(diào)用了線程池的 restartAllCoreThreads()方法, 線程池會(huì)提前創(chuàng)建并啟動(dòng)所有基本線程。如果運(yùn)行的線程等于或多于corePoolSize,則將任務(wù)加入BlockingQueue。 如果無(wú)法將任務(wù)加入BlockingQueue(隊(duì)列已滿),則創(chuàng)建新的線程來(lái)處理任務(wù)(注意,執(zhí) 行這一步驟需要獲取全局鎖)。 如果創(chuàng)建新線程將使當(dāng)前運(yùn)行的線程超出maximumPoolSize,任務(wù)將被拒絕,并調(diào)用 RejectedExecutionHandler.rejectedExecution()方法。
線程任務(wù)的執(zhí)行
線程的正在執(zhí)行是ThreadPoolExecutor.Worker#run()方法,但是這個(gè)方法直接委托給了外部的runWorker()方法,源碼如下:
// 直接委托給外部runworker方法
public void run() {
runWorker(this);
}
runWorker() 執(zhí)行任務(wù)
final void runWorker(Worker w) {
// 當(dāng)前Work中的工作線程
Thread wt = Thread.currentThread();
// 獲取初始任務(wù)
Runnable task = w.firstTask;
// 初始任務(wù)置NULL(表示不是建線程)
w.firstTask = null;
// 修改鎖的狀態(tài),使需發(fā)起中斷的線程可以獲取到鎖(使工作線程可以響應(yīng)中斷)
w.unlock(); // allow interrupts
// 工作線程是否是異常結(jié)束
boolean completedAbruptly = true;
try {
// 循環(huán)的從隊(duì)列里面獲取任務(wù)
while (task != null || (task = getTask()) != null) {
// 每次執(zhí)行任務(wù)時(shí)需要獲取到內(nèi)置的互斥鎖
w.lock();
// 1. 當(dāng)前工作線程不是中斷狀態(tài),且線程池是STOP,TIDYING,TERMINATED狀態(tài),我們需要中斷當(dāng)前工作線程
// 2. 當(dāng)前工作線程是中斷狀態(tài),且線程池是STOP,TIDYING,TERMINATED狀態(tài),我們需要中斷當(dāng)前工作線程
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
&& !wt.isInterrupted())
// 中斷線程,中斷標(biāo)志位設(shè)置成true
wt.interrupt();
try {
// 執(zhí)行任務(wù)前置方法,擴(kuò)展用
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 執(zhí)行任務(wù)
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 執(zhí)行任務(wù)后置方法,擴(kuò)展用
afterExecute(task, thrown);
}
} finally {
// 任務(wù)NULL表示已經(jīng)處理了
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 將工作線程從容器中剔除
processWorkerExit(w, completedAbruptly);
}
}
正在執(zhí)行線程的方法,執(zhí)行流程:
獲取到當(dāng)前的工作線程 獲取初始化的線程任務(wù) 修改鎖的狀態(tài),使工作線程可以響應(yīng)中斷 獲取工作線程的鎖(保證在任務(wù)執(zhí)行過(guò)程中工作線程不被外部線程中斷),如果獲取到的任務(wù)是NULL,則結(jié)束當(dāng)前工作線程 判斷先測(cè)試狀態(tài),看是否需要中斷當(dāng)前工作線程 執(zhí)行任務(wù)前置方法 beforeExecute(wt, task);執(zhí)行任務(wù)(執(zhí)行提交到線程池的線程) task.run();執(zhí)行任務(wù)后置方法 afterExecute(task, thrown);,處理異常信息修改完成任務(wù)的總數(shù) 解除當(dāng)前工作線程的鎖 獲取隊(duì)列里面的任務(wù),循環(huán)第4步 將工作線程從容器中剔除
wt.isInterrupted():獲取中斷狀態(tài),無(wú)副作用Thread.interrupted():獲取中斷狀態(tài),并將中斷狀態(tài)恢重置成false(不中斷)beforeExecute(wt, task);:執(zhí)行任務(wù)前置方法,擴(kuò)展用。如果這個(gè)方法在執(zhí)行過(guò)程中拋出異常,那么會(huì)導(dǎo)致當(dāng)前工作線程直接死亡而被回收,工作線程異常結(jié)束標(biāo)記位completedAbruptly被設(shè)置成true,任務(wù)線程不能被執(zhí)行task.run();:執(zhí)行任務(wù)afterExecute(task, thrown);:執(zhí)行任務(wù)后置方法,擴(kuò)展用。這個(gè)方法可以收集到任務(wù)運(yùn)行的異常信息,這個(gè)方法如果有異常拋出,也會(huì)導(dǎo)致當(dāng)前工作線程直接死亡而被回收,工作線程異常結(jié)束標(biāo)記位completedAbruptly被設(shè)置成true任務(wù)運(yùn)行過(guò)程中的異常信息除了 RuntimeException以外,其他全部封裝成Error,然后被afterExecute方法收集terminated()這也是一個(gè)擴(kuò)展方法,在線程池結(jié)束的時(shí)候調(diào)用
getTask() 獲取任務(wù)
private Runnable getTask() {
// 記錄最后一次獲取任務(wù)是不是超時(shí)了
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
// 獲取線程池狀態(tài)
int rs = runStateOf(c);
// 線程池是停止?fàn)顟B(tài)或者狀態(tài)是關(guān)閉并且隊(duì)列為空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 扣減工作線程總數(shù)
decrementWorkerCount();
return null;
}
// 獲取工作線程總數(shù)
int wc = workerCountOf(c);
// 工作線程是否需要剔除
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 扣減工作線程總數(shù)
if (compareAndDecrementWorkerCount(c))
// 剔除工作線程,當(dāng)返回為NULL的時(shí)候,runWorker方法的while循環(huán)會(huì)結(jié)束
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
getTask() 阻塞或定時(shí)獲取任務(wù)。當(dāng)該方法返回NULL時(shí),當(dāng)前工作線程會(huì)結(jié)束,最后被回收,下面是返回NULL的幾種情況:
當(dāng)前工作線程總數(shù) wc大于maximumPoolSize最大工作線程總數(shù)。maximumPoolSize可能被setMaximumPoolSize方法改變。當(dāng)線程池處于停止?fàn)顟B(tài)時(shí)。 當(dāng)線程池處于關(guān)閉狀態(tài)且阻塞隊(duì)列為空。 當(dāng)前工作線程超時(shí)等待任務(wù),并且當(dāng)前工作線程總數(shù) wc大于corePoolSize或者allowCoreThreadTimeOut=true允許核心線程超時(shí)被回收,默認(rèn)是false。
線程池在運(yùn)行過(guò)程中可以調(diào)用
setMaximumPoolSize()方法來(lái)修改maximumPoolSize值,新的值必須大于corePoolSize,如果新的maximumPoolSize小于原來(lái)的值,那么在該方法會(huì)去中斷當(dāng)前的空閑線程(工作線程內(nèi)置鎖的是解鎖狀態(tài)的線程為空閑線程)。
processWorkerExit() 工作線程結(jié)束
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 判斷是否是異常情況導(dǎo)致工作線程被回收
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
// 如果是扣減工作線程總數(shù),如果不是在getTask()方法就已經(jīng)扣減了
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 將當(dāng)前工作線程完成任務(wù)的總數(shù)加到completedTaskCount標(biāo)志位上
completedTaskCount += w.completedTasks;
// 剔除當(dāng)前工作線程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 嘗試結(jié)束線程池
tryTerminate();
// 判刑是否需要新實(shí)例化工程線程
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
剔除線程流程:
判斷是否是異常情況導(dǎo)致工作線程被回收,如果是 workerCount--獲取到全局鎖 將當(dāng)前工作線程完成任務(wù)的總數(shù)加到 completedTaskCount標(biāo)志位上剔除工作線程 解鎖 嘗試結(jié)束線程池 tryTerminate()判刑是否需要重新實(shí)例化工程線程放到 workers容器
結(jié)束線程池
shutdown() 關(guān)閉線程池
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 檢查權(quán)限
checkShutdownAccess();
// 設(shè)置線程池狀態(tài)為關(guān)閉
advanceRunState(SHUTDOWN);
// 中斷線程
interruptIdleWorkers();
// 擴(kuò)展方法
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 嘗試結(jié)束線池
tryTerminate();
}
通過(guò)遍歷工作線程容器 workers,然后逐個(gè)中斷工作線程,如果無(wú)法響應(yīng)中斷的任務(wù)可能永遠(yuǎn)無(wú)法終止shutdown只是將線程池的狀態(tài)設(shè)置成SHUTDOWN狀態(tài),然后中斷所有沒(méi)有正在執(zhí)行任務(wù)的線程。
shutdown() 關(guān)閉線程池
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 檢查權(quán)限
checkShutdownAccess();
// 設(shè)置線程池狀態(tài)為停止?fàn)顟B(tài)
advanceRunState(STOP);
// 中斷線程
interruptIdleWorkers();
// 將所有任務(wù)移動(dòng)到list容器
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 嘗試結(jié)束線池
tryTerminate();
// 返回所有未執(zhí)行的任務(wù)
return tasks;
}
通過(guò)遍歷工作線程容器 workers,然后逐個(gè)中斷工作線程,如果無(wú)法響應(yīng)中斷的任務(wù)可能永遠(yuǎn)無(wú)法終止shutdownNow首先將線程池的狀態(tài)設(shè)置成STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表
tryTerminate() 嘗試結(jié)束線程池
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 判斷是否在運(yùn)行中,如果是直接返回
if (isRunning(c) ||
// 判斷是否進(jìn)入整理狀態(tài),如果進(jìn)入了直接返回
runStateAtLeast(c, TIDYING) ||
// 如果是狀態(tài)是關(guān)閉并且隊(duì)列非空,也直接返回(關(guān)閉狀態(tài)需要等到隊(duì)列里面的線程處理完)
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 判斷工作線程是否都關(guān)閉了
if (workerCountOf(c) != 0) { // Eligible to terminate
// 中斷空閑線程
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 將狀態(tài)替換成整理狀態(tài)
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 整理發(fā)放執(zhí)行
terminated();
} finally {
// 狀態(tài)替換成結(jié)束狀態(tài)
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
結(jié)束線程池大致流程為:
判斷是否在運(yùn)行中,如果是則不結(jié)束線程 判斷是否進(jìn)入整理狀態(tài),如果是也不用執(zhí)行后面內(nèi)容了 判斷如果線程池是關(guān)閉狀態(tài)并且隊(duì)列非空,則不結(jié)束線程池(關(guān)閉狀態(tài)需要等到隊(duì)列里面的線程處理完) 判斷工作線程是否都關(guān)閉了,如果沒(méi)有就發(fā)起中斷工作線程的請(qǐng)求 獲取全局鎖將線程池狀態(tài)替換成整理狀態(tài) 調(diào)用 terminated();擴(kuò)展方法(這也是一個(gè)擴(kuò)展方法,在線程池結(jié)束的時(shí)候調(diào)用)將線程池狀態(tài)替換成結(jié)束狀態(tài) 解除全局鎖
注意: 我們可以通過(guò)的 shutdown或shutdownNow方法來(lái)結(jié)束線程池。他們都是通過(guò)遍歷工作線程容器,然后逐個(gè)中斷工作線程,所以無(wú)法響應(yīng)中斷的任務(wù) 可能永遠(yuǎn)無(wú)法終止。shutdown和shutdownNow的區(qū)別在于:shutdownNow首先將線程池的狀態(tài)設(shè)置成STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表;而shutdown只是將線程池的狀態(tài)設(shè)置成SHUTDOWN狀態(tài),然后中斷所有沒(méi)有正在執(zhí)行任務(wù)的線 程。只要調(diào)用了 shutdown和shutdownNow那么isShutdown方法就會(huì)返回true當(dāng)所有的任務(wù)都已關(guān)閉后,才表示線程池關(guān)閉成功,這時(shí)調(diào)用 isTerminaed方法會(huì)返回true
線程池的監(jiān)控
通過(guò)擴(kuò)展線程池進(jìn)行監(jiān)控。可以通過(guò)繼承線程池來(lái)自定義線程池,重寫(xiě)線程池的 beforeExecute、afterExecute和terminated方法,也可以在任務(wù)執(zhí)行前、執(zhí)行后和線程池關(guān)閉前執(zhí) 行一些代碼來(lái)進(jìn)行監(jiān)控。例如,監(jiān)控任務(wù)的平均執(zhí)行時(shí)間、最大執(zhí)行時(shí)間和最小執(zhí)行時(shí)間等。這幾個(gè)方法在線程池里是空方法。
getTaskCount()
public long getTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers) {
n += w.completedTasks;
if (w.isLocked())
++n;
}
return n + workQueue.size();
} finally {
mainLock.unlock();
}
}
獲取線程池需要執(zhí)行的任務(wù)數(shù)量。總數(shù)=已經(jīng)結(jié)束線工作程完成的任務(wù)數(shù)(completedTaskCount) + 還未結(jié)束線程工作線程完成的任務(wù)數(shù)(w.completedTasks)+正在執(zhí)行的任務(wù)數(shù)(w.isLocked())+還未執(zhí)行的任務(wù)數(shù)(workQueue.size())
getCompletedTaskCount()
public long getCompletedTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers)
n += w.completedTasks;
return n;
} finally {
mainLock.unlock();
}
}
獲取線程池在運(yùn)行過(guò)程中已完成的任務(wù)數(shù)量。總數(shù)=已經(jīng)結(jié)束線工作程完成的任務(wù)數(shù)(completedTaskCount) + 還未結(jié)束線程工作線程完成的任務(wù)數(shù)(w.completedTasks)
getLargestPoolSize()
public int getLargestPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
return largestPoolSize;
} finally {
mainLock.unlock();
}
}
獲取線程池里曾經(jīng)創(chuàng)建過(guò)的最大線程數(shù)量。通過(guò)這個(gè)數(shù)據(jù)可以知道線程池是 否曾經(jīng)滿過(guò)。如該數(shù)值等于線程池的最大大小,則表示線程池曾經(jīng)滿過(guò)。
getPoolSize()
public int getPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Remove rare and surprising possibility of
// isTerminated() && getPoolSize() > 0
return runStateAtLeast(ctl.get(), TIDYING) ? 0
: workers.size();
} finally {
mainLock.unlock();
}
}
獲取線程池的線程數(shù)量。如果線程池不銷毀的話,線程池里的線程不會(huì)自動(dòng)銷 毀,所以這個(gè)大小只增不減。
getActiveCount()
public int getActiveCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int n = 0;
for (Worker w : workers)
if (w.isLocked())
++n;
return n;
} finally {
mainLock.unlock();
}
}
獲取活動(dòng)的線程數(shù)。
合理地配置線程池
要想合理地配置線程池,就必須首先分析任務(wù)特性,可以從以下幾個(gè)角度來(lái)分析。
任務(wù)的性質(zhì):CPU密集型任務(wù)、IO密集型任務(wù)和混合型任務(wù)。 任務(wù)的優(yōu)先級(jí):高、中和低。 任務(wù)的執(zhí)行時(shí)間:長(zhǎng)、中和短。 任務(wù)的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫(kù)連接。
性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開(kāi)處理。CPU密集型任務(wù)應(yīng)配置盡可能小的 線程,如配置Ncpu+1個(gè)線程的線程池。由于IO密集型任務(wù)線程并不是一直在執(zhí)行任務(wù),則應(yīng)配 置盡可能多的線程,如2*Ncpu。混合型的任務(wù),如果可以拆分,將其拆分成一個(gè)CPU密集型任務(wù) 和一個(gè)IO密集型任務(wù),只要這兩個(gè)任務(wù)執(zhí)行的時(shí)間相差不是太大,那么分解后執(zhí)行的吞吐量 將高于串行執(zhí)行的吞吐量。如果這兩個(gè)任務(wù)執(zhí)行時(shí)間相差太大,則沒(méi)必要進(jìn)行分解。
優(yōu)先級(jí)不同的任務(wù)可以使用優(yōu)先級(jí)隊(duì)列PriorityBlockingQueue來(lái)處理。它可以讓優(yōu)先級(jí)高 的任務(wù)先執(zhí)行。
如果一直有優(yōu)先級(jí)高的任務(wù)提交到隊(duì)列里,那么優(yōu)先級(jí)低的任務(wù)可能永遠(yuǎn)不能 執(zhí)行。 可以通過(guò) Runtime.getRuntime().availableProcessors()方法獲得當(dāng)前設(shè)備的CPU個(gè)數(shù)。 建議使用有界隊(duì)列。有界隊(duì)列能增加系統(tǒng)的穩(wěn)定性和預(yù)警能力,可以根據(jù)需要設(shè)大一點(diǎn) 兒,比如幾千。無(wú)界隊(duì)列在某些異常情況下可能會(huì)撐爆內(nèi)存。
N核服務(wù)器,通過(guò)執(zhí)行業(yè)務(wù)的單線程分析出本地計(jì)算時(shí)間為x,等待時(shí)間為y,則工作線程數(shù)(線程池線程數(shù))設(shè)置為 N*(x+y)/x,能讓CPU的利用率最大化,詳情可以參考線程數(shù)究竟設(shè)多少合理
參考
《java并發(fā)編程的藝術(shù)》
源碼
https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases
spring-boot-student-concurrent 工程
干貨分享
最近將個(gè)人學(xué)習(xí)筆記整理成冊(cè),使用PDF分享。關(guān)注我,回復(fù)如下代碼,即可獲得百度盤(pán)地址,無(wú)套路領(lǐng)取!
?001:《Java并發(fā)與高并發(fā)解決方案》學(xué)習(xí)筆記;?002:《深入JVM內(nèi)核——原理、診斷與優(yōu)化》學(xué)習(xí)筆記;?003:《Java面試寶典》?004:《Docker開(kāi)源書(shū)》?005:《Kubernetes開(kāi)源書(shū)》?006:《DDD速成(領(lǐng)域驅(qū)動(dòng)設(shè)計(jì)速成)》?007:全部?008:加技術(shù)群討論
近期熱文
?LinkedBlockingQueue vs ConcurrentLinkedQueue?解讀Java 8 中為并發(fā)而生的 ConcurrentHashMap?Redis性能監(jiān)控指標(biāo)匯總?最全的DevOps工具集合,再也不怕選型了!?微服務(wù)架構(gòu)下,解決數(shù)據(jù)庫(kù)跨庫(kù)查詢的一些思路?聊聊大廠面試官必問(wèn)的 MySQL 鎖機(jī)制
關(guān)注我
喜歡就點(diǎn)個(gè)"在看"唄^_^

