ThreadPoolExecutor 深入解析
作者:小波同學(xué)
鏈接:https://www.jianshu.com/p/69e41da799b9
線程池的作用
● 利用線程池管理并復(fù)用線程、控制最大并發(fā)數(shù)等
既然使用了線程池就需要確保線程池是在復(fù)用的,每次new一個線程池出來可能比不用線程池還糟糕。如果沒有直接聲明線程池而是使用其他人提供的類庫來獲得一個線程池,請務(wù)必查看源碼,以確認(rèn)線程池的實例化方式和配置是符合預(yù)期的。
● 實現(xiàn)任務(wù)線程隊列緩存策略和拒絕機制。
● 實現(xiàn)某些與時間相關(guān)的功能,如定時執(zhí)行、周期執(zhí)行等
● 隔離線程環(huán)境
比如,交易服務(wù)和搜索服務(wù)在同一臺服務(wù)器上,分別開啟兩個線程池,交易線程的資源消耗明顯要大;因此,通過配置獨立的線程池,將較慢的交易服務(wù)與搜索服務(wù)隔離開,避免各服務(wù)線程相互影響。
Java中的線程池是運用場景最多的并發(fā)框架,幾乎所有需要異步或并發(fā)執(zhí)行任務(wù)的程序 都可以使用線程池。合理地使用線程池能夠帶來3個好處:
1、降低資源消耗。通過重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。
2、提高響應(yīng)速度。當(dāng)任務(wù)到達時,任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行。
3、提高線程的可管理性。線程是稀缺資源,如果無限制地創(chuàng)建,不僅會消耗系統(tǒng)資源, 還會降低系統(tǒng)的穩(wěn)定性,使用線程池可以進行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控。
線程池的主要處理流程

接口定義和實現(xiàn)類
| 類型 | 名稱 | 描述 |
|---|---|---|
| 接口 | Executor | 最上層的接口,定義了執(zhí)行任務(wù)的方法execute |
| 接口 | ExecutorService | 繼承了Executor接口,拓展了Callable、Future、關(guān)閉方法 |
| 接口 | ScheduledExecutorService | 繼承了ExecutorService,增加了定時任務(wù)相關(guān)方法 |
| 實現(xiàn)類 | ThreadPoolExecutor | 基礎(chǔ)、標(biāo)準(zhǔn)的線程池實現(xiàn) |
| 實現(xiàn)類 | ScheduledThreadPoolExecutor | 繼承了ThreadPoolExecutor,實現(xiàn)了ScheduledExecutorService中相關(guān)定時任務(wù)的方法 |
ThreadPoolExecutor 類圖

java中的線程池都是基于ThreadPoolExecutor 來實現(xiàn)的。
可以認(rèn)為ScheduledThreadPoolExecutor是最豐富的實現(xiàn)類。
ExecutorService 方法定義
public interface ExecutorService extends Executor {
/**
* 在之前提交的,需要被執(zhí)行的任務(wù)中,有序的進行關(guān)閉操作,并且此時不會再接受新的任務(wù)
* 如果此時所有的任務(wù)已經(jīng)關(guān)閉的話,那么就不會起到什么效果,因為已經(jīng)沒有任務(wù)可關(guān)閉了
*/
void shutdown();
/**
* 嘗試關(guān)閉所有正在執(zhí)行的任務(wù),并且中斷正在等待要執(zhí)行的任務(wù),返回一個包含正在等待的任務(wù)的列表
* @return
*/
List<Runnable> shutdownNow();
/**
* 如果線程已經(jīng)關(guān)閉了,就返回true
* @return
*/
boolean isShutdown();
/**
* 如果所有的線程任務(wù)已經(jīng)關(guān)閉了,就返回true
* @return
*/
boolean isTerminated();
/**
* 只有當(dāng)所有的任務(wù)都成功執(zhí)行,否則會一直處于阻塞狀態(tài),只有當(dāng)一下情況發(fā)生時,才會中斷阻塞
* 例如收到一個關(guān)閉的請求,或者超時發(fā)生、或者當(dāng)前的線程被中斷后
* @param timeout
* @param unit
* @return
* @throws InterruptedException
*/
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 提交一個需要返回結(jié)果的任務(wù)去執(zhí)行,返回一個有結(jié)果的消息體,只有成功執(zhí)行后,才會返回結(jié)果
* @param task
* @param <T>
* @return
*/
<T> Future<T> submit(Callable<T> task);
/**
* 只有當(dāng)任務(wù)成功被執(zhí)行后,才會返回給定的結(jié)果
* @param task
* @param result
* @param <T>
* @return
*/
<T> Future<T> submit(Runnable task, T result);
/**
* 提交一個Runnable任務(wù)用于執(zhí)行,和返回代表任務(wù)的Future。
* Future的get方法成功執(zhí)行后,返回null
*/
Future<?> submit(Runnable task);
/**
* 提交一批任務(wù),并返回一批任務(wù)的結(jié)果列表
* @param tasks
* @param <T>
* @return
* @throws InterruptedException
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
/**
* 執(zhí)行給定的任務(wù)集合,執(zhí)行完畢或者超時后,返回結(jié)果,其他任務(wù)終止
*
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 提交一批任務(wù)信息,當(dāng)其中一個成功的執(zhí)行,沒有返回異常的時候,就返回結(jié)果
* @param tasks
* @param <T>
* @return
* @throws InterruptedException
* @throws ExecutionException
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
/**
* 執(zhí)行給定的任務(wù)集合,任意一個執(zhí)行成功或超時后,返回結(jié)果,其他任務(wù)終止
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
ScheduledExecutorService
public interface ScheduledExecutorService extends ExecutorService {
//創(chuàng)建并執(zhí)行一個一次性任務(wù), 過了延遲時間就會被執(zhí)行
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
//創(chuàng)建并執(zhí)行一個一次性任務(wù), 過了延遲時間就會被執(zhí)行
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
//創(chuàng)建并執(zhí)行一個周期性任務(wù)
//過了給定的初始延遲時間,會第一次被執(zhí)行
//執(zhí)行過程中發(fā)生了異常,那么任務(wù)就停止
//一次任務(wù) 執(zhí)行時長超過了周期時間,下一次任務(wù)會等到該次任務(wù)執(zhí)行結(jié)束后,立刻執(zhí)行,
//這也是它和scheduleWithFixedDelay的重要區(qū)別
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
//創(chuàng)建并執(zhí)行一個周期性任務(wù)
//過了初始延遲時間,第一次被執(zhí)行,后續(xù)以給定的周期時間執(zhí)行
//執(zhí)行過程中發(fā)生了異常,那么任務(wù)就停止
//一次任務(wù)執(zhí)行時長超過了周期時間,下一次任務(wù)會在該次任務(wù)執(zhí)行結(jié)束的時間基礎(chǔ)上,計算執(zhí)行延時。
//對于超過周期的長時間處理任務(wù)的不同處理方式,這是它和scheduleAtFixedRate的重要區(qū)別。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
Executors工具類
可以自己實例化線程池,也可用Executors創(chuàng)建線程池的工廠類,推薦自己實例化線程池。
常用方法
ExecutorService 的抽象類AbstractExecutorService提供了submit、invokeAll 等方法的實現(xiàn),但是核心方法Executor.execute()并沒有在這里實現(xiàn)。
因為所有的任務(wù)都在該方法執(zhí)行,不同實現(xiàn)會帶來不同的執(zhí)行策略。
通過Executors的靜態(tài)工廠方法可以創(chuàng)建三個線程池的包裝對象
ForkJoinPool
ThreadPoolExecutor
ScheduledThreadPoolExecutor
● Executors.newWorkStealingPool
JDK8 引入,創(chuàng)建持有足夠線程的線程池支持給定的并行度,并通過使用多個隊列減少競爭,構(gòu)造方法中把CPU數(shù)量設(shè)置為默認(rèn)的并行度。
返回ForkJoinPool ( JDK7引入)對象,它也是AbstractExecutorService 的子類
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
● Executors.newCachedThreadPool
創(chuàng)建一個無界的緩沖線程池,它的任務(wù)隊列是一個同步隊列。
任務(wù)加入到池中
若池中有空閑線程,則用空閑線程執(zhí)行
若無,則創(chuàng)建新線程執(zhí)行
池中的線程空閑超過60秒,將被銷毀。線程數(shù)隨任務(wù)的多少變化。適用于執(zhí)行耗時較小的異步任務(wù)。
線程池的核心線程數(shù)=0,最大線程數(shù)= Integer.MAX_ _VALUE
maximumPoolSize 最大可至Integer.MAX_VALUE,是高度可伸縮的線程池。若達到該上限,沒有服務(wù)器能夠繼續(xù)工作,直接OOM。
keepAliveTime默認(rèn)為60秒;
工作線程處于空閑狀態(tài),則回收工作線程;如果任務(wù)數(shù)增加,再次創(chuàng)建出新線程處理任務(wù)。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
● Executors.newScheduledThreadPool
能定時執(zhí)行任務(wù)的線程池。該池的核心線程數(shù)由參數(shù)指定,線程數(shù)最大至Integer.MAX_ VALUE,與上述一樣存在OOM風(fēng)險。
ScheduledExecutorService接口的實現(xiàn)類,支持定時及周期性任務(wù)執(zhí)行;相比Timer、ScheduledExecutorService 更安全,功能更強大。
與newCachedThreadPool的區(qū)別是不回收工作線程。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
● Executors.newSingleThreadExecutor
創(chuàng)建一個單線程的線程池,相當(dāng)于單線程串行執(zhí)行所有任務(wù),保證按任務(wù)的提交順序依次執(zhí)行。
只有1個線程來執(zhí)行無界任務(wù)隊列的單-線程池。該線程池確保任務(wù)按加入的順序一個一
個依次執(zhí)行。當(dāng)唯一的線程因任務(wù)異常中止時,將創(chuàng)建一個新的線程來繼續(xù)執(zhí)行后續(xù)的任務(wù)。
與newFixedThreadPool(1)的區(qū)別在于,單線程池的池大小在newSingleThreadExecutor方法中硬編碼,不能再改變的。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
● Executors.newFixedThreadPool
創(chuàng)建一個固定大小任務(wù)隊列容量無界的線程池,輸入的參數(shù)即是固定線程數(shù);既是核心線程數(shù)也是最大線程數(shù);不存在空閑線程,所以keepAliveTime等于0。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
ThreadPoolExecutor 核心屬性
// 狀態(tài)控制屬性:高3位表示線程池的運行狀態(tài),剩下的29位表示當(dāng)前有效的線程數(shù)量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 線程池的基本大小,當(dāng)提交一個任務(wù)到線程池時,線程池會創(chuàng)建一個線程來執(zhí)行任務(wù),
// 即使其他空閑的基本線程能夠執(zhí)行新任務(wù)也會創(chuàng)建線程,等到需要執(zhí)行的任務(wù)數(shù)大于
// 線程池基本大小時就不再創(chuàng)建。如果調(diào)用了線程池的prestartAllCoreThreads()方法,
// 線程池會提前創(chuàng)建并啟動所有基本線程。
private volatile int corePoolSize;
// 線程池線程最大數(shù)量,如果隊列滿了,并且已創(chuàng)建的線程數(shù)小于最大線程數(shù),
// 則線程池會再創(chuàng)建新的線程執(zhí)行任務(wù)。如果使用了無界的任務(wù)隊列這個參數(shù)就沒什么效果。
private volatile int maximumPoolSize;
// 用于設(shè)置創(chuàng)建線程的工廠,可以通過線程工廠給每個創(chuàng)建出來的線程設(shè) 置更有意義的名字。
private volatile ThreadFactory threadFactory;
// 飽和策略,默認(rèn)情況下是AbortPolicy。
private volatile RejectedExecutionHandler handler;
// 線程池的工作線程空閑后,保持存活的時間。如果任務(wù)很多,并且每個任務(wù)執(zhí)行的時間比較短,
// 可以調(diào)大時間,提高線程的利用率。
private volatile long keepAliveTime;
// 用于保存等待執(zhí)行的任務(wù)的阻塞隊列
private final BlockingQueue<Runnable> workQueue;
// 存放工作線程的容器,必須獲取到鎖才能訪問
private final HashSet<Worker> workers = new HashSet<Worker>();
// ctl的拆包和包裝
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
ctl狀態(tài)控制屬性,高3位表示線程池的運行狀態(tài)(runState),剩下的29位表示當(dāng)前有效的線程數(shù)量(workerCount)
線程池最大線程數(shù)是(1 << COUNT_BITS) - 1 = 536 870 911
@Native public static final int SIZE = 32;
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
線程池的運行狀態(tài)runState
| 狀態(tài) | 解釋 |
|---|---|
| RUNNING | 運行態(tài),可處理新任務(wù)并執(zhí)行隊列中的任務(wù) |
| SHUTDOW | 關(guān)閉態(tài),不接受新任務(wù),但處理隊列中的任務(wù) |
| STOP | 停止態(tài),不接受新任務(wù),不處理隊列中任務(wù),且打斷運行中任務(wù) |
| TIDYING | 整理態(tài),所有任務(wù)已經(jīng)結(jié)束,workerCount = 0 ,將執(zhí)行terminated()方法 |
| TERMINATED | 結(jié)束態(tài),terminated() 方法已完成 |

RejectedExecutionHandler(拒絕策略)
AbortPolicy:直接拋出異常。
CallerRunsPolicy:只用調(diào)用者所在線程來運行任務(wù)。
DiscardOldestPolicy:丟棄隊列里最近的一個任務(wù),并執(zhí)行當(dāng)前任務(wù)。
DiscardPolicy:不處理,丟棄掉。
核心內(nèi)部類 Worker
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
// 正在執(zhí)行任務(wù)的線程
final Thread thread;
// 線程創(chuàng)建時初始化的任務(wù)
Runnable firstTask;
// 完成任務(wù)計數(shù)器
volatile long completedTasks;
Worker(Runnable firstTask) {
// 在runWorker方法運行之前禁止中斷,要中斷線程必須先獲取worker內(nèi)部的互斥鎖
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** delegates main run loop to outer runworker */
// 直接委托給外部runworker方法
public void run() {
runWorker(this);
}
...
}
Worker 類將執(zhí)行任務(wù)的線程封裝到了內(nèi)部,在初始化Worker 的時候,會調(diào)用ThreadFactory初始化新線程;Worker 繼承了AbstractQueuedSynchronizer,在內(nèi)部實現(xiàn)了一個互斥鎖,主要目的是控制工作線程的中斷狀態(tài)。
線程的中斷一般是由其他線程發(fā)起的,比如ThreadPoolExecutor#interruptIdleWorkers(boolean)方法,它在調(diào)用過程中會去中斷worker內(nèi)部的工作線程,Work的互斥鎖可以保證正在執(zhí)行的任務(wù)不被打斷。它是怎么保證的呢?在線程真正執(zhí)行任務(wù)的時候,也就是runWorker方法被調(diào)用時,它會先獲取到Work的鎖,當(dāng)我們在其他線程需要中斷當(dāng)前線程時也需要獲取到work的互斥鎖,否則不能中斷。
構(gòu)造函數(shù)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
通過構(gòu)造函數(shù)我們可以發(fā)現(xiàn),構(gòu)造函數(shù)就是在對線程池核心屬性進行賦值,下面我們來介紹一下這些核心屬性:
corePoolSize:核心線程數(shù)
maximumPoolSize:線程池最大數(shù)量
keepAliveTime:線程池的工作線程空閑后,保持存活的時間。
unit:線程活動保持時間的單位。
workQueue:用于保存等待執(zhí)行的任務(wù)的阻塞隊列,具體可以參考JAVA并發(fā)容器-阻塞隊列
threadFactory:用于設(shè)置創(chuàng)建線程的工廠
handler:飽和策略,默認(rèn)情況下是AbortPolicy。
execute() 提交線程
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 獲取控制的值
int c = ctl.get();
// 判斷工作線程數(shù)是否小于corePoolSize
if (workerCountOf(c) < corePoolSize) {
// 新創(chuàng)建核心線程
if (addWorker(command, true))
return;
c = ctl.get();
}
// 工作線程數(shù)大于或等于corePoolSize
// 判斷線程池是否處于運行狀態(tài),如果是將任務(wù)command入隊
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次檢查線程池的運行狀態(tài),如果不在運行中,那么將任務(wù)從隊列里面刪除,并嘗試結(jié)束線程池
if (! isRunning(recheck) && remove(command))
// 調(diào)用驅(qū)逐策略
reject(command);
// 檢查活躍線程總數(shù)是否為0
else if (workerCountOf(recheck) == 0)
// 新創(chuàng)建非核心線程
addWorker(null, false);
}
// 隊列滿了,新創(chuàng)建非核心線程
else if (!addWorker(command, false))
// 調(diào)用驅(qū)逐策略
reject(command);
}
該方法是沒有返回值的
addWorker() 新創(chuàng)建線程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 僅在必要的時候檢查隊列是否為NULL
// 檢查隊列是否處于非運行狀態(tài)
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 獲取活躍線程數(shù)
int wc = workerCountOf(c);
// 判斷線程是否超過最大值,當(dāng)隊列滿了則驗證線程數(shù)是否大于maximumPoolSize,
// 沒有滿則驗證corePoolSize
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 增加活躍線程總數(shù),否則重試
if (compareAndIncrementWorkerCount(c))
// 如果成功跳出外層循環(huán)
break retry;
c = ctl.get(); // Re-read ctl
// 再次校驗一下線程池運行狀態(tài)
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 工作線程是否啟動
boolean workerStarted = false;
// 工作線程是否創(chuàng)建
boolean workerAdded = false;
Worker w = null;
try {
// 新創(chuàng)建線程
w = new Worker(firstTask);
// 獲取新創(chuàng)建的線程
final Thread t = w.thread;
if (t != null) {
// 創(chuàng)建線程要獲得全局鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 檢查線程池的運行狀態(tài)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 檢查線程的狀態(tài)
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 將新建工作線程存放到容器
workers.add(w);
int s = workers.size();
if (s > largestPoolSize) {
// 跟蹤線程池最大的工作線程總數(shù)
largestPoolSize = s;
}
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 啟動工作線程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 啟動新的工作線程失敗,
// 1\. 將工作線程移除workers容器
// 2\. 還原工作線程總數(shù)(workerCount)
// 3\. 嘗試結(jié)束線程
addWorkerFailed(w);
}
return workerStarted;
}
如果啟動新線程失敗那么addWorkerFailed()這個方法將做以下三件事:
1、將工作線程移除workers容器
2、還原工作線程總數(shù)(workerCount)
3、嘗試結(jié)束線程
execute() 執(zhí)行過程

1、如果當(dāng)前運行的線程少于corePoolSize,即使有空閑線程也會創(chuàng)建新線程來執(zhí)行任務(wù),(注意,執(zhí)行這一步驟 需要獲取全局鎖)。如果調(diào)用了線程池的restartAllCoreThreads()方法, 線程池會提前創(chuàng)建并啟動所有基本線程。
2、如果運行的線程等于或多于corePoolSize,則將任務(wù)加入BlockingQueue。
3、如果無法將任務(wù)加入BlockingQueue(隊列已滿),則創(chuàng)建新的線程來處理任務(wù)(注意,執(zhí) 行這一步驟需要獲取全局鎖)。
4、如果創(chuàng)建新線程將使當(dāng)前運行的線程超出maximumPoolSize,任務(wù)將被拒絕,并調(diào)用 RejectedExecutionHandler.rejectedExecution()方法。
線程任務(wù)的執(zhí)行
線程的正在執(zhí)行是ThreadPoolExecutor.Worker#run()方法,但是這個方法直接委托給了外部的runWorker()方法,源碼如下:
// 直接委托給外部runworker方法
public void run() {
runWorker(this);
}
runWorker() 執(zhí)行任務(wù)
final void runWorker(Worker w) {
// 當(dāng)前Work中的工作線程
Thread wt = Thread.currentThread();
// 獲取初始任務(wù)
Runnable task = w.firstTask;
// 初始任務(wù)置NULL(表示不是建線程)
w.firstTask = null;
// 修改鎖的狀態(tài),使需發(fā)起中斷的線程可以獲取到鎖(使工作線程可以響應(yīng)中斷)
w.unlock(); // allow interrupts
// 工作線程是否是異常結(jié)束
boolean completedAbruptly = true;
try {
// 循環(huán)的從隊列里面獲取任務(wù)
while (task != null || (task = getTask()) != null) {
// 每次執(zhí)行任務(wù)時需要獲取到內(nèi)置的互斥鎖
w.lock();
// 1\. 當(dāng)前工作線程不是中斷狀態(tài),且線程池是STOP,TIDYING,TERMINATED狀態(tài),我們需要中斷當(dāng)前工作線程
// 2\. 當(dāng)前工作線程是中斷狀態(tài),且線程池是STOP,TIDYING,TERMINATED狀態(tài),我們需要中斷當(dāng)前工作線程
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
&& !wt.isInterrupted())
// 中斷線程,中斷標(biāo)志位設(shè)置成true
wt.interrupt();
try {
// 執(zhí)行任務(wù)前置方法,擴展用
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 執(zhí)行任務(wù)
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 執(zhí)行任務(wù)后置方法,擴展用
afterExecute(task, thrown);
}
} finally {
// 任務(wù)NULL表示已經(jīng)處理了
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 將工作線程從容器中剔除
processWorkerExit(w, completedAbruptly);
}
}
正在執(zhí)行線程的方法,執(zhí)行流程:
1、獲取到當(dāng)前的工作線程
2、獲取初始化的線程任務(wù)
3、修改鎖的狀態(tài),使工作線程可以響應(yīng)中斷
4、獲取工作線程的鎖(保證在任務(wù)執(zhí)行過程中工作線程不被外部線程中斷),如果獲取到的任務(wù)是NULL,則結(jié)束當(dāng)前工作線程
5、判斷先測試狀態(tài),看是否需要中斷當(dāng)前工作線程
6、執(zhí)行任務(wù)前置方法beforeExecute(wt, task);
7、執(zhí)行任務(wù)(執(zhí)行提交到線程池的線程)task.run();
8、執(zhí)行任務(wù)后置方法afterExecute(task, thrown);,處理異常信息
9、修改完成任務(wù)的總數(shù)
10、解除當(dāng)前工作線程的鎖
11、獲取隊列里面的任務(wù),循環(huán)第4步
12、將工作線程從容器中剔除
wt.isInterrupted():獲取中斷狀態(tài),無副作用
Thread.interrupted():獲取中斷狀態(tài),并將中斷狀態(tài)恢重置成false(不中斷)
beforeExecute(wt, task);:執(zhí)行任務(wù)前置方法,擴展用。如果這個方法在執(zhí)行過程中拋出異常,那么會導(dǎo)致當(dāng)前工作線程直接死亡而被回收,工作線程異常結(jié)束標(biāo)記位completedAbruptly被設(shè)置成true,任務(wù)線程不能被執(zhí)行
task.run();:執(zhí)行任務(wù)
afterExecute(task, thrown);:執(zhí)行任務(wù)后置方法,擴展用。這個方法可以收集到任務(wù)運行的異常信息,這個方法如果有異常拋出,也會導(dǎo)致當(dāng)前工作線程直接死亡而被回收,工作線程異常結(jié)束標(biāo)記位completedAbruptly被設(shè)置成true
任務(wù)運行過程中的異常信息除了RuntimeException以外,其他全部封裝成Error,然后被afterExecute方法收集
terminated()這也是一個擴展方法,在線程池結(jié)束的時候調(diào)用
getTask() 獲取任務(wù)
private Runnable getTask() {
// 記錄最后一次獲取任務(wù)是不是超時了
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
// 獲取線程池狀態(tài)
int rs = runStateOf(c);
// 線程池是停止?fàn)顟B(tài)或者狀態(tài)是關(guān)閉并且隊列為空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 扣減工作線程總數(shù)
decrementWorkerCount();
return null;
}
// 獲取工作線程總數(shù)
int wc = workerCountOf(c);
// 工作線程是否需要剔除
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 扣減工作線程總數(shù)
if (compareAndDecrementWorkerCount(c))
// 剔除工作線程,當(dāng)返回為NULL的時候,runWorker方法的while循環(huán)會結(jié)束
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
getTask() 阻塞或定時獲取任務(wù)。當(dāng)該方法返回NULL時,當(dāng)前工作線程會結(jié)束,最后被回收,下面是返回NULL的幾種情況:
1、當(dāng)前工作線程總數(shù)wc大于maximumPoolSize最大工作線程總數(shù)。maximumPoolSize可能被setMaximumPoolSize方法改變。
2、當(dāng)線程池處于停止?fàn)顟B(tài)時。
3、當(dāng)線程池處于關(guān)閉狀態(tài)且阻塞隊列為空。
4、當(dāng)前工作線程超時等待任務(wù),并且當(dāng)前工作線程總數(shù)wc大于corePoolSize或者allowCoreThreadTimeOut=true允許核心線程超時被回收,默認(rèn)是false。
processWorkerExit() 工作線程結(jié)束
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 判斷是否是異常情況導(dǎo)致工作線程被回收
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
// 如果是扣減工作線程總數(shù),如果不是在getTask()方法就已經(jīng)扣減了
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 將當(dāng)前工作線程完成任務(wù)的總數(shù)加到completedTaskCount標(biāo)志位上
completedTaskCount += w.completedTasks;
// 剔除當(dāng)前工作線程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 嘗試結(jié)束線程池
tryTerminate();
// 判刑是否需要新實例化工程線程
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
剔除線程流程:
1、判斷是否是異常情況導(dǎo)致工作線程被回收,如果是workerCount--
2、獲取到全局鎖
3、將當(dāng)前工作線程完成任務(wù)的總數(shù)加到completedTaskCount標(biāo)志位上
4、剔除工作線程
5、解鎖
6、嘗試結(jié)束線程池tryTerminate()
7、判刑是否需要重新實例化工程線程放到workers容器
結(jié)束線程池
shutdown() 關(guān)閉線程池
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 檢查權(quán)限
checkShutdownAccess();
// 設(shè)置線程池狀態(tài)為關(guān)閉
advanceRunState(SHUTDOWN);
// 中斷線程
interruptIdleWorkers();
// 擴展方法
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 嘗試結(jié)束線池
tryTerminate();
}
1、通過遍歷工作線程容器workers,然后逐個中斷工作線程,如果無法響應(yīng)中斷的任務(wù)可能永遠無法終止。
2、shutdown只是將線程池的狀態(tài)設(shè)置成SHUTDOWN狀態(tài),然后中斷所有沒有正在執(zhí)行任務(wù)的線程。
shutdown() 關(guān)閉線程池
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 檢查權(quán)限
checkShutdownAccess();
// 設(shè)置線程池狀態(tài)為停止?fàn)顟B(tài)
advanceRunState(STOP);
// 中斷線程
interruptIdleWorkers();
// 將所有任務(wù)移動到list容器
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 嘗試結(jié)束線池
tryTerminate();
// 返回所有未執(zhí)行的任務(wù)
return tasks;
}
1、通過遍歷工作線程容器workers,然后逐個中斷工作線程,如果無法響應(yīng)中斷的任務(wù)可能永遠無法終止。
2、shutdownNow首先將線程池的狀態(tài)設(shè)置成 STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表。
tryTerminate() 嘗試結(jié)束線程池
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 判斷是否在運行中,如果是直接返回
if (isRunning(c) ||
// 判斷是否進入整理狀態(tài),如果進入了直接返回
runStateAtLeast(c, TIDYING) ||
// 如果是狀態(tài)是關(guān)閉并且隊列非空,也直接返回(關(guān)閉狀態(tài)需要等到隊列里面的線程處理完)
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 判斷工作線程是否都關(guān)閉了
if (workerCountOf(c) != 0) { // Eligible to terminate
// 中斷空閑線程
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 將狀態(tài)替換成整理狀態(tài)
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 整理發(fā)放執(zhí)行
terminated();
} finally {
// 狀態(tài)替換成結(jié)束狀態(tài)
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
結(jié)束線程池大致流程為:
1、判斷是否在運行中,如果是則不結(jié)束線程
2、判斷是否進入整理狀態(tài),如果是也不用執(zhí)行后面內(nèi)容了
3、判斷如果線程池是關(guān)閉狀態(tài)并且隊列非空,則不結(jié)束線程池(關(guān)閉狀態(tài)需要等到隊列里面的線程處理完)
4、判斷工作線程是否都關(guān)閉了,如果沒有就發(fā)起中斷工作線程的請求
5、獲取全局鎖將線程池狀態(tài)替換成整理狀態(tài)
6、調(diào)用terminated();擴展方法(這也是一個擴展方法,在線程池結(jié)束的時候調(diào)用)
7、將線程池狀態(tài)替換成結(jié)束狀態(tài)
8、解除全局鎖
注意:
1、我們可以通過的shutdown或shutdownNow方法來結(jié)束線程池。他們都是通過遍歷工作線程容器,然后逐個中斷工作線程,所以無法響應(yīng)中斷的任務(wù) 可能永遠無法終止。
2、shutdown和shutdownNow的區(qū)別在于:shutdownNow首先將線程池的狀態(tài)設(shè)置成 STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表;而 shutdown只是將線程池的狀態(tài)設(shè)置成SHUTDOWN狀態(tài),然后中斷所有沒有正在執(zhí)行任務(wù)的線程。
3、只要調(diào)用了shutdown和shutdownNow那么isShutdown方法就會返回true。
4、當(dāng)所有的任務(wù)都已關(guān)閉后,才表示線程池關(guān)閉成功,這時調(diào)用isTerminaed方法會返回true。
線程池的監(jiān)控
通過擴展線程池進行監(jiān)控。可以通過繼承線程池來自定義線程池,重寫線程池的 beforeExecute、afterExecute和terminated方法,也可以在任務(wù)執(zhí)行前、執(zhí)行后和線程池關(guān)閉前執(zhí) 行一些代碼來進行監(jiān)控。例如,監(jiān)控任務(wù)的平均執(zhí)行時間、最大執(zhí)行時間和最小執(zhí)行時間等。這幾個方法在線程池里是空方法。
getTaskCount()
public long getTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers) {
n += w.completedTasks;
if (w.isLocked())
++n;
}
return n + workQueue.size();
} finally {
mainLock.unlock();
}
}
獲取線程池需要執(zhí)行的任務(wù)數(shù)量。總數(shù)=已經(jīng)結(jié)束線工作程完成的任務(wù)數(shù)(completedTaskCount) + 還未結(jié)束線程工作線程完成的任務(wù)數(shù)(w.completedTasks)+正在執(zhí)行的任務(wù)數(shù)(w.isLocked())+還未執(zhí)行的任務(wù)數(shù)(workQueue.size())
getCompletedTaskCount()
public long getCompletedTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers)
n += w.completedTasks;
return n;
} finally {
mainLock.unlock();
}
}
獲取線程池在運行過程中已完成的任務(wù)數(shù)量。總數(shù)=已經(jīng)結(jié)束線工作程完成的任務(wù)數(shù)(completedTaskCount) + 還未結(jié)束線程工作線程完成的任務(wù)數(shù)(w.completedTasks)
getLargestPoolSize()
public int getLargestPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
return largestPoolSize;
} finally {
mainLock.unlock();
}
}
獲取線程池里曾經(jīng)創(chuàng)建過的最大線程數(shù)量。通過這個數(shù)據(jù)可以知道線程池是 否曾經(jīng)滿過。如該數(shù)值等于線程池的最大大小,則表示線程池曾經(jīng)滿過。
getPoolSize()
public int getPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Remove rare and surprising possibility of
// isTerminated() && getPoolSize() > 0
return runStateAtLeast(ctl.get(), TIDYING) ? 0
: workers.size();
} finally {
mainLock.unlock();
}
}
獲取線程池的線程數(shù)量。如果線程池不銷毀的話,線程池里的線程不會自動銷 毀,所以這個大小只增不減。
getActiveCount()
public int getActiveCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int n = 0;
for (Worker w : workers)
if (w.isLocked())
++n;
return n;
} finally {
mainLock.unlock();
}
}
獲取活動的線程數(shù)。
合理地配置線程池
要想合理地配置線程池,就必須首先分析任務(wù)特性,可以從以下幾個角度來分析。
任務(wù)的性質(zhì):CPU密集型任務(wù)、IO密集型任務(wù)和混合型任務(wù)。
任務(wù)的優(yōu)先級:高、中和低。
任務(wù)的執(zhí)行時間:長、中和短。
任務(wù)的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫連接。
性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開處理。CPU密集型任務(wù)應(yīng)配置盡可能小的 線程,如配置Ncpu+1個線程的線程池。由于IO密集型任務(wù)線程并不是一直在執(zhí)行任務(wù),則應(yīng)配 置盡可能多的線程,如2*Ncpu。混合型的任務(wù),如果可以拆分,將其拆分成一個CPU密集型任務(wù) 和一個IO密集型任務(wù),只要這兩個任務(wù)執(zhí)行的時間相差不是太大,那么分解后執(zhí)行的吞吐量 將高于串行執(zhí)行的吞吐量。如果這兩個任務(wù)執(zhí)行時間相差太大,則沒必要進行分解。
優(yōu)先級不同的任務(wù)可以使用優(yōu)先級隊列PriorityBlockingQueue來處理。它可以讓優(yōu)先級高 的任務(wù)先執(zhí)行。
如果一直有優(yōu)先級高的任務(wù)提交到隊列里,那么優(yōu)先級低的任務(wù)可能永遠不能 執(zhí)行。
可以通過 Runtime.getRuntime().availableProcessors()方法獲得當(dāng)前設(shè)備的CPU個數(shù)。
建議使用有界隊列。有界隊列能增加系統(tǒng)的穩(wěn)定性和預(yù)警能力,可以根據(jù)需要設(shè)大一點 兒,比如幾千。無界隊列在某些異常情況下可能會撐爆內(nèi)存。
N核服務(wù)器,通過執(zhí)行業(yè)務(wù)的單線程分析出本地計算時間為x,等待時間為y,則工作線程數(shù)(線程池線程數(shù))設(shè)置為 N*(x+y)/x,能讓CPU的利用率最大化,詳情可以參考線程數(shù)究竟設(shè)多少合理。
最近給大家找了 Vue進階
資源,怎么領(lǐng)取?
掃二維碼,加我微信,回復(fù):Vue進階
注意,不要亂回復(fù) 沒錯,不是機器人 記得一定要等待,等待才有好東西
