JAVA線程池源碼全解析
線程池的具體使用方法和參數(shù)解析等我在之前已經(jīng)講解過(guò),如果對(duì)線程池基本用法和概念不清晰的可以先看下我之前的線程池的文章,這里就通過(guò)一張線程池運(yùn)行流程圖來(lái)幫助大家去簡(jiǎn)單了解下線程池的工作原理。

線程池源碼我們主要通過(guò)ThreadPoolExecutor進(jìn)行分析,一步一步剖析線程池源碼的核心內(nèi)容。
屬性解析
//高3位:表示當(dāng)前線程池運(yùn)行狀態(tài) 除去高3位之后的低位:
// 表示當(dāng)前線程池所擁有的線程數(shù)量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 表示在ctl中,低COUNT_BITS位 用于存放當(dāng)前線程數(shù)量的位
private static final int COUNT_BITS = Integer.SIZE - 3;
//低COUNT_BITS位 所能表達(dá)的最大數(shù)值
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//表示可接受新任務(wù),且可執(zhí)行隊(duì)列中的任務(wù);
private static final int RUNNING = -1 << COUNT_BITS;
//表示不接受新任務(wù),但可執(zhí)行隊(duì)列中的任務(wù);
private static final int SHUTDOWN = 0 << COUNT_BITS;
//表示不接受新任務(wù),且不再執(zhí)行隊(duì)列中的任務(wù),且中斷正在執(zhí)行的任務(wù);
private static final int STOP = 1 << COUNT_BITS;
// 所有任務(wù)已經(jīng)中止,且工作線程數(shù)量為0,最后變遷到這個(gè)狀態(tài)的線程將要執(zhí)行terminated()鉤子方法,只會(huì)有一個(gè)線程執(zhí)行這個(gè)方法;
private static final int TIDYING = 2 << COUNT_BITS;
//中止?fàn)顟B(tài),已經(jīng)執(zhí)行完terminated()鉤子方法;
private static final int TERMINATED = 3 << COUNT_BITS;
//任務(wù)隊(duì)列,當(dāng)線程池中的線程達(dá)到核心線程數(shù)量時(shí),再提交任務(wù) 就會(huì)直接提交到 workQueue
private final BlockingQueue<Runnable> workQueue;
//線程池全局鎖,增加worker 減少 worker 時(shí)需要持有mainLock , 修改線程池運(yùn)行狀態(tài)時(shí),也需要。
private final ReentrantLock mainLock = new ReentrantLock();
//線程池中真正存放 worker->thread 的地方。
private final HashSet<Worker> workers = new HashSet<Worker>();
private final Condition termination = mainLock.newCondition();
// 記錄線程池生命周期內(nèi) 線程數(shù)最大值
private int largestPoolSize;
// 記錄線程池所完成任務(wù)總數(shù)
private long completedTaskCount;
// 創(chuàng)建線程會(huì)使用線程工廠
private volatile ThreadFactory threadFactory;
/**
* 拒絕策略
*/
private volatile RejectedExecutionHandler handler;
//空閑線程存活時(shí)間,當(dāng)allowCoreThreadTimeOut == false 時(shí),會(huì)維護(hù)核心線程數(shù)量?jī)?nèi)的線程存活,超出部分會(huì)被超時(shí)。
//allowCoreThreadTimeOut == true 核心數(shù)量?jī)?nèi)的線程 空閑時(shí) 也會(huì)被回收。
private volatile long keepAliveTime;
//控制核心線程數(shù)量?jī)?nèi)的線程 是否可以被回收。true 可以,false不可以。
private volatile boolean allowCoreThreadTimeOut;
// 核心線程池?cái)?shù)量
private volatile int corePoolSize;
// 線程池最大數(shù)量
private volatile int maximumPoolSize;
描述線程池狀態(tài)的屬性是貫穿整個(gè)線程池源碼的核心,這里用一張圖來(lái)描述一下。

running狀態(tài):當(dāng)線程池是運(yùn)行狀態(tài)時(shí),可以接收任務(wù),也可以運(yùn)行任務(wù)。
shutdown狀態(tài):此狀態(tài)下,線程池不會(huì)再接收新的任務(wù),當(dāng)前的任務(wù)會(huì)繼續(xù)執(zhí)行完成
stop狀態(tài):當(dāng)調(diào)用shutdownNow方法時(shí),線程池會(huì)進(jìn)入stop狀態(tài),不會(huì)接受新的任務(wù),正在運(yùn)行的任務(wù)也會(huì)被立即終止
tidying狀態(tài):進(jìn)入該狀態(tài)下此時(shí)線程池中任務(wù)和線程數(shù)量都為空
線程池運(yùn)行任務(wù)可以通過(guò)submit方法和execute方法來(lái)完成
它量的區(qū)別在于submit會(huì)有返回值,返回Future對(duì)象,通過(guò)這個(gè)對(duì)象可以獲取線程執(zhí)行結(jié)果,execute沒(méi)有返回值,下面來(lái)分別進(jìn)行進(jìn)行分析
execute方法
首先來(lái)分析execute方法,這也是線程池最核心的方法,因?yàn)閟ubmit方法其底層也是調(diào)用execute方法進(jìn)行執(zhí)行。
說(shuō)execute方法之前,先來(lái)看下ThreadPoolExecutor的靜態(tài)內(nèi)部類(lèi)Worker類(lèi)。
//Worker采用了AQS的獨(dú)占模式
//獨(dú)占模式:兩個(gè)重要屬性 state 和 ExclusiveOwnerThread
//state:0時(shí)表示未被占用 > 0時(shí)表示被占用 < 0 時(shí) 表示初始狀態(tài),這種情況下不能被搶鎖。
//ExclusiveOwnerThread:表示獨(dú)占鎖的線程。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
// worker內(nèi)部封裝的工作線程
final Thread thread;
//假設(shè)firstTask不為空,那么當(dāng)worker啟動(dòng)后(內(nèi)部的線程啟動(dòng))會(huì)優(yōu)先執(zhí)行firstTask,當(dāng)執(zhí)行完firstTask后,會(huì)到queue中去獲取下一個(gè)任務(wù)。
Runnable firstTask;
// 記錄當(dāng)前worker所完成的任務(wù)數(shù)量
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
*
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// 設(shè)置AQS獨(dú)占模式為初始化中的狀態(tài),這時(shí)候不能被搶占
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 使用線程工廠創(chuàng)建一個(gè)線程
this.thread = getThreadFactory().newThread(this);
}
線程池中的工作線程以Worker作為體現(xiàn),真正工作的線程為Worker的成員變量,Worker即是Runnable,又是同步器。Worker從工作隊(duì)列中取出任務(wù)來(lái)執(zhí)行,并能通過(guò)Worker控制任務(wù)狀態(tài)。
接下來(lái)通過(guò)execute方法源碼來(lái)看下如何通過(guò)Worker完成任務(wù)的創(chuàng)建及運(yùn)行。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 獲取ctl的值
int c = ctl.get();
// 當(dāng)前線程數(shù)小于核心線程池?cái)?shù)量,此次提交任務(wù),直接創(chuàng)建一個(gè)新的worker
// 相對(duì)應(yīng)線程池多了一個(gè)新的線程
if (workerCountOf(c) < corePoolSize) {
// addWorker 即為創(chuàng)建線程的過(guò)程,會(huì)創(chuàng)建worker對(duì)象,并且將command作為firstTask
// core==true 表示采用核心線程數(shù)量限制,false采用maxinumPoolSize
if (addWorker(command, true))
return;
c = ctl.get();
}
// 執(zhí)行到這里有幾種情況?
// 1.當(dāng)前線程池?cái)?shù)量已經(jīng)達(dá)到corePoolSize
// 2. addWorker失敗
// 當(dāng)前線程池處于running狀態(tài),嘗試將task放入到workQueue中
if (isRunning(c) && workQueue.offer(command)) {
// 獲取dangqctl
int recheck = ctl.get();
// !isRunning()成功,代表當(dāng)你提交到任務(wù)隊(duì)列后,線程池狀態(tài)被外部線程給修改,例如調(diào)用了shutDown(),shutDownNow()
// remove成功,提交之后,線程池中的線程還沒(méi)消費(fèi)
// remove 失敗,說(shuō)明在shutDown或者shutDown之前,就被線程池的線程給處理了
if (!isRunning(recheck) && remove(command))
reject(command);
// 當(dāng)前線程池是running狀態(tài),
else if (workerCountOf(recheck) == 0)
// 如果當(dāng)前沒(méi)有線程,就添加一個(gè)線程保證當(dāng)前至少有一個(gè)線程存在
addWorker(null, false);
}
//執(zhí)行到這里,有幾種情況?
//1.offer失敗
//2.當(dāng)前線程池是非running狀態(tài)
//1.offer失敗,需要做什么? 說(shuō)明當(dāng)前queue 滿了!這個(gè)時(shí)候 如果當(dāng)前線程數(shù)量尚未達(dá)到maximumPoolSize的話,會(huì)創(chuàng)建新的worker直接執(zhí)行command
//假設(shè)當(dāng)前線程數(shù)量達(dá)到maximumPoolSize的話,這里也會(huì)失敗,也走拒絕策略。
//2.線程池狀態(tài)為非running狀態(tài),這個(gè)時(shí)候因?yàn)?command != null addWorker 一定是返回false。
else if (!addWorker(command, false))
reject(command);
}
execute方法的執(zhí)行流程大致可以分為以下幾步:
工作線程數(shù)量小于核心數(shù)量,創(chuàng)建核心線程;
達(dá)到核心數(shù)量,進(jìn)入任務(wù)隊(duì)列;
任務(wù)隊(duì)列滿了,創(chuàng)建非核心線程;
達(dá)到最大數(shù)量,執(zhí)行拒絕策略;

通過(guò)這個(gè)運(yùn)行圖再結(jié)合上面的源碼可能對(duì)這個(gè)execute方法的具體執(zhí)行流程就更加清楚了,下面就深入到每一個(gè)流程的細(xì)節(jié)去分析。
如果工作線程小于核心線程就會(huì)通過(guò)addWorker方法創(chuàng)建新的核心任務(wù)線程。
addWorker方法
//firstTask 可以為null,表示啟動(dòng)worker之后,worker自動(dòng)到queue中獲取任務(wù).. 如果不是null,則worker優(yōu)先執(zhí)行firstTask
//core 采用的線程數(shù)限制 如果為true 采用 核心線程數(shù)限制 false采用 maximumPoolSize線程數(shù)限制.
private boolean addWorker(Runnable firstTask, boolean core) {
// 自旋:判斷當(dāng)前線程池狀態(tài)是否允許創(chuàng)建線程的事情
retry:
for (; ; ) {
// 獲取當(dāng)前ctl值
int c = ctl.get();
// 獲取當(dāng)前線程池運(yùn)行狀態(tài)
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 判斷當(dāng)前線程池是否允許添加線程
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty()))
return false;
// 內(nèi)部自旋:獲取創(chuàng)建線程令牌的過(guò)程
for (; ; ) {
int wc = workerCountOf(c);
//判斷當(dāng)前線程是否超過(guò)限制,超過(guò)限制就無(wú)法創(chuàng)建線程
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 通過(guò)cas將線程數(shù)量加1,能夠成功加1相當(dāng)于申請(qǐng)到創(chuàng)建線程的令牌
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 判斷當(dāng)前線程狀態(tài)是否發(fā)生變化
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 創(chuàng)建work
w = new Worker(firstTask);
//將新創(chuàng)建的work節(jié)點(diǎn)的線程 賦值給t
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//持有全局鎖,可能會(huì)阻塞,直到獲取成功為止,同一時(shí)刻操縱 線程池內(nèi)部相關(guān)的操作,都必須持鎖。
mainLock.lock();
try {
//獲取最新線程池運(yùn)行狀態(tài)保存到rs中
int rs = runStateOf(ctl.get());
//
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//將創(chuàng)建的work添加到線程池中
workers.add(w);
// 獲取最新當(dāng)前線程池線程數(shù)量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
// 釋放鎖
mainLock.unlock();
}
if (workerAdded) {
// 添加work成功后,將創(chuàng)建的線程啟動(dòng)
t.start();
workerStarted = true;
}
}
} finally {
// 啟動(dòng)失敗
if (!workerStarted)
// 釋放令牌
// 將當(dāng)前worker清理出workers集合
addWorkerFailed(w);
}
return workerStarted;
}
addWorker方法總體就是做了兩件事
第一步:判斷是否可以創(chuàng)建新的Work
第二步:如果可以創(chuàng)建就創(chuàng)建新的Work,然后添加到任務(wù)隊(duì)列當(dāng)中,最后啟動(dòng)該線程。
這里會(huì)看到,創(chuàng)建Work會(huì)加鎖,加了一個(gè)來(lái)保證線程安全,新創(chuàng)建的Work會(huì)添加到任務(wù)隊(duì)列當(dāng)中,這個(gè)任務(wù)隊(duì)列其實(shí)就是通過(guò)HashSet來(lái)存儲(chǔ)work,最后啟動(dòng)線程,啟動(dòng)線程后,真正運(yùn)行這個(gè)任務(wù)的方法就不在execute當(dāng)中,而是通過(guò)
Work類(lèi)中的run方法來(lái)執(zhí)行。

runWorker方法
通過(guò)execute方法來(lái)啟動(dòng)線程后,就會(huì)通過(guò)work類(lèi)中的run方法調(diào)用ThreadPoolExecutor的runWork方法來(lái)運(yùn)行任務(wù)。
// 當(dāng)worker啟動(dòng)時(shí),會(huì)執(zhí)行run方法
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
// 工作線程
Thread wt = Thread.currentThread();
// 任務(wù)
Runnable task = w.firstTask;
// 強(qiáng)制釋放鎖
// 這里相當(dāng)于無(wú)視那邊的中斷標(biāo)記
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 取任務(wù),如果有第一個(gè)任務(wù),這里先執(zhí)行第一個(gè)任務(wù)
// 只要能取到任務(wù),這就是個(gè)死循環(huán)
// getTask:取任務(wù)
while (task != null || (task = getTask()) != null) {
// 加鎖,是因?yàn)楫?dāng)調(diào)用shutDown方法它會(huì)判斷當(dāng)前是否加鎖,加鎖就會(huì)跳過(guò)它接著執(zhí)行下一個(gè)任務(wù)
w.lock();
// 檢查線程池狀態(tài)
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 鉤子方法,方便子類(lèi)在任務(wù)執(zhí)行前做一些處理
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 真正任務(wù)執(zhí)行的地方
//task 可能是FutureTask 也可能是 普通的Runnable接口實(shí)現(xiàn)類(lèi)。
//如果前面是通過(guò)submit()提交的 runnable/callable 會(huì)被封裝成 FutureTask。這個(gè)不清楚,請(qǐng)看上一期,在b站。
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
runWorker方法就是真正執(zhí)行任務(wù)的方法,如果有第一個(gè)任務(wù)就先執(zhí)行第一個(gè)任務(wù),第一個(gè)任務(wù)執(zhí)行完后就通過(guò)getTask()方法從任務(wù)隊(duì)列中獲取任務(wù)來(lái)執(zhí)行。
getTask()方法
private Runnable getTask() {
// 是否超時(shí)
boolean timedOut = false; // Did the last poll() time out?
// 自旋
for (; ; ) {
int c = ctl.get();
int rs = runStateOf(c);
//當(dāng)前程池狀態(tài)是SHUTDOWN的時(shí)候會(huì)把隊(duì)列中的任務(wù)執(zhí)行完直到隊(duì)列為空
// 線程池狀態(tài)是stop時(shí)退出
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
// 獲取工作線程數(shù)量
int wc = workerCountOf(c);
// 是否允許超時(shí),有兩種情況:
// 1. 是允許核心線程數(shù)超時(shí),這種就是說(shuō)所有的線程都可能超時(shí)
// 2. 是工作線程數(shù)大于了核心數(shù)量,這種肯定是允許超時(shí)的
// 注意,非核心線程是一定允許超時(shí)的,這里的超時(shí)其實(shí)是指取任務(wù)超時(shí)
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 真正取任務(wù)的地方
// 默認(rèn)情況,只有當(dāng)工作線程數(shù)量大于核心線程數(shù)量時(shí),才會(huì)調(diào)用poll方法觸發(fā)超時(shí)調(diào)用
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 取到任務(wù)就返回
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
這里取任務(wù)會(huì)根據(jù)工作線程的數(shù)量判斷是使用BlockingQueue的poll(timeout, unit)方法還是take()方法。
poll(timeout, unit)方法會(huì)在超時(shí)時(shí)返回null,如果timeout<=0,隊(duì)列為空時(shí)直接返回null。
take()方法會(huì)一直阻塞直到取到任務(wù)或拋出中斷異常。
submit方法
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
submit方法是支持傳入Runnable或者Callable,通過(guò)newFaskFor方法將其包裝到FutureTask進(jìn)行處理,F(xiàn)utureTask會(huì)在下篇文章進(jìn)行詳細(xì)講解,futureTask主要做了兩件事,一件事是擴(kuò)展run方法,用來(lái)完成結(jié)果值的處理,另一件事是暴露其get方法,通過(guò)get方法獲取執(zhí)行結(jié)果,這個(gè)get方法是阻塞的。
我做了一張思維導(dǎo)圖,通過(guò)這個(gè)思維導(dǎo)圖來(lái)梳理一遍線程池的大概脈絡(luò)

PS:如果覺(jué)得我的分享不錯(cuò),歡迎大家隨手點(diǎn)贊、在看。
(完) 加我"微信" 獲取一份 最新Java面試題資料 請(qǐng)備注:666,不然不通過(guò)~
最近好文
1、Spring Boot 實(shí)現(xiàn)掃碼登錄,這種方式太香了!!
2、SpringSecurity + JWT 實(shí)現(xiàn)單點(diǎn)登錄
3、基于 Vue+Spring 前后端分離管理系統(tǒng)ELAdmin
最近面試BAT,整理一份面試資料《Java面試BAT通關(guān)手冊(cè)》,覆蓋了Java核心技術(shù)、JVM、Java并發(fā)、SSM、微服務(wù)、數(shù)據(jù)庫(kù)、數(shù)據(jù)結(jié)構(gòu)等等。 獲取方式:關(guān)注公眾號(hào)并回復(fù) java 領(lǐng)取,更多內(nèi)容陸續(xù)奉上。 明天見(jiàn)(??ω??)??
