線程池技術(shù)之:ThreadPoolExecutor 源碼解析
java中的所說的線程池,一般都是圍繞著 ThreadPoolExecutor 來展開的。其他的實現(xiàn)基本都是基于它,或者模仿它的。所以只要理解 ThreadPoolExecutor, 就相當(dāng)于完全理解了線程池的精髓。
其實要理解一個東西,一般地,我們最好是要抱著自己的疑問或者理解去的。否則,往往收獲甚微。
理解 ThreadPoolExecutor, 我們可以先理解一個線程池的意義: 本質(zhì)上是提供預(yù)先定義好的n個線程,供調(diào)用方直接運行任務(wù)的一個工具。
線程池解決的問題:
1. 提高任務(wù)執(zhí)行的響應(yīng)速度,降低資源消耗。任務(wù)執(zhí)行時,直接立即使用線程池提供的線程運行,避免了臨時創(chuàng)建線程的CPU/內(nèi)存開銷,達(dá)到快速響應(yīng)的效果。
2. 提高線程的可管理性。線程總數(shù)可預(yù)知,避免用戶主動創(chuàng)建無限多線程導(dǎo)致死機風(fēng)險,還可以進(jìn)行線程統(tǒng)一的分配、調(diào)優(yōu)和監(jiān)控。
3. 避免對資源的過度使用。在超出預(yù)期的請求任務(wù)情況,響應(yīng)策略可控。
線程池提供的核心接口:
要想使用線程池,自然是要理解其接口的。一般我們使用 ExecotorService 進(jìn)行線程池的調(diào)用。然而,我們并不針對初學(xué)者。
整體的接口如下:

我們就挑幾個常用接口探討下:
submit(Runnable task): 提交一個無需返回結(jié)果的任務(wù)。
submit(Callable<T> task): 提交一個有返回結(jié)果的任務(wù)。
invokeAll(Collection<? extends Callable<T>> tasks, long, TimeUnit): 同時執(zhí)行n個任務(wù)并返回結(jié)果列表。
shutdown(): 關(guān)閉線程程池。
awaitTermination(long timeout, TimeUnit unit): 等待關(guān)閉結(jié)果,最長不超過timeout時間。
以上是ThreadPoolExector 提供的特性,針對以上特性。
我們應(yīng)該要有自己的幾個實現(xiàn)思路或疑問:
1. 線程池如何接受任務(wù)?
2. 線程如何運行任務(wù)?
3. 線程池如何關(guān)閉?
接下來,就讓我們帶著疑問去看實現(xiàn)吧。
ThreadPoolExecutor 核心實現(xiàn)原理
1. 線程池的處理流程
我們首先重點要看的是,如何執(zhí)行提交的任務(wù)。我可以通過下圖來看看。

總結(jié)描述下就是:
1. 判斷核心線程池是否已滿,如果不是,則創(chuàng)建線程執(zhí)行任務(wù)
2. 如果核心線程池滿了,判斷隊列是否滿了,如果隊列沒滿,將任務(wù)放在隊列中
3. 如果隊列滿了,則判斷線程池是否已滿,如果沒滿,創(chuàng)建線程執(zhí)行任務(wù)
4. 如果線程池也滿了,則按照拒絕策略對任務(wù)進(jìn)行處理
另外,我們來看一下 ThreadPoolExecutor 的構(gòu)造方法,因為這里會體現(xiàn)出每個屬性的含義。
/*** Creates a new {@code ThreadPoolExecutor} with the given initial* parameters.** @param corePoolSize the number of threads to keep in the pool, even* if they are idle, unless {@code allowCoreThreadTimeOut} is set* @param maximumPoolSize the maximum number of threads to allow in the* pool* @param keepAliveTime when the number of threads is greater than* the core, this is the maximum time that excess idle threads* will wait for new tasks before terminating.* @param unit the time unit for the {@code keepAliveTime} argument* @param workQueue the queue to use for holding tasks before they are* executed. This queue will hold only the {@code Runnable}* tasks submitted by the {@code execute} method.* @param threadFactory the factory to use when the executor* creates a new thread* @param handler the handler to use when execution is blocked* because the thread bounds and queue capacities are reached* @throws IllegalArgumentException if one of the following holds:<br>* {@code corePoolSize < 0}<br>* {@code keepAliveTime < 0}<br>* {@code maximumPoolSize <= 0}<br>* {@code maximumPoolSize < corePoolSize}* @throws NullPointerException if {@code workQueue}* or {@code threadFactory} or {@code handler} is null*/public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
從構(gòu)造方法可以看出 ThreadPoolExecutor 的主要參數(shù) 7 個,在其注釋上也有說明功能,咱們翻譯下每個參數(shù)的功能:
corePoolSize: 線程池核心線程數(shù)(平時保留的線程數(shù)),使用時機: 在初始時刻,每次請求進(jìn)來都會創(chuàng)建一個線程直到達(dá)到該sizemaximumPoolSize: 線程池最大線程數(shù),使用時機: 當(dāng)workQueue都放不下時,啟動新線程,直到最大線程數(shù),此時到達(dá)線程池的極限keepAliveTime/unit: 超出corePoolSize數(shù)量的線程的保留時間,unit為時間單位workQueue: 阻塞隊列,當(dāng)核心線程數(shù)達(dá)到或者超出后,會先嘗試將任務(wù)放入該隊列由各線程自行消費;ArrayBlockingQueue: 構(gòu)造函數(shù)一定要傳大小LinkedBlockingQueue: 構(gòu)造函數(shù)不傳大小會默認(rèn)為65536(Integer.MAX_VALUE ),當(dāng)大量請求任務(wù)時,容易造成 內(nèi)存耗盡。SynchronousQueue: 同步隊列,一個沒有存儲空間的阻塞隊列 ,將任務(wù)同步交付給工作線程。PriorityBlockingQueue: 優(yōu)先隊列threadFactory:線程工廠,用于線程需要創(chuàng)建時,調(diào)用其newThread()生產(chǎn)新線程使用handler: 飽和策略,當(dāng)隊列已放不下任務(wù),且創(chuàng)建的線程已達(dá)到 maximum 后,則不能再處理任務(wù),直接將任務(wù)交給飽和策略AbortPolicy: 直接拋棄(默認(rèn))CallerRunsPolicy: 用調(diào)用者的線程執(zhí)行任務(wù)DiscardOldestPolicy: 拋棄隊列中最久的任務(wù)DiscardPolicy: 拋棄當(dāng)前任務(wù)
2. submit 流程詳解
當(dāng)調(diào)用 submit 方法,就是向線程池中提交一個任務(wù),處理流程如步驟1所示。但是我們需要更深入理解。
submit 方法是定義在 AbstractExecutorService 中,最終調(diào)用 ThreadPoolExecutor 的 execute 方法,即是模板方法模式的應(yīng)用。
// java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable, T)/*** @throws RejectedExecutionException {@inheritDoc}* @throws NullPointerException {@inheritDoc}*/public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();// 封裝任務(wù)和返回結(jié)果為 RunnableFuture, 統(tǒng)一交由具體的子類執(zhí)行RunnableFuture<T> ftask = newTaskFor(task, result);// execute 將會調(diào)用 ThreadPoolExecutor 的實現(xiàn),是我們討論的重要核心execute(ftask);return ftask;}// FutureTask 是個重要的線程池組件,它承載了具體的任務(wù)執(zhí)行流/*** Returns a {@code RunnableFuture} for the given runnable and default* value.** @param runnable the runnable task being wrapped* @param value the default value for the returned future* @param <T> the type of the given value* @return a {@code RunnableFuture} which, when run, will run the* underlying runnable and which, as a {@code Future}, will yield* the given value as its result and provide for cancellation of* the underlying task* @since 1.6*/protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {return new FutureTask<T>(runnable, value);}// ThreadPoolExecutor 的任務(wù)提交過程// java.util.concurrent.ThreadPoolExecutor#execute/*** Executes the given task sometime in the future. The task* may execute in a new thread or in an existing pooled thread.** If the task cannot be submitted for execution, either because this* executor has been shutdown or because its capacity has been reached,* the task is handled by the current {@code RejectedExecutionHandler}.** @param command the task to execute* @throws RejectedExecutionException at discretion of* {@code RejectedExecutionHandler}, if the task* cannot be accepted for execution* @throws NullPointerException if {@code command} is null*/public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/// ctl 是一個重要的控制全局狀態(tài)的數(shù)據(jù)結(jié)構(gòu),定義為一個線程安全的 AtomicInteger// ctl = new AtomicInteger(ctlOf(RUNNING, 0));int c = ctl.get();// 當(dāng)還沒有達(dá)到核心線程池的數(shù)量時,直接添加1個新線程,然后讓其執(zhí)行任務(wù)即可if (workerCountOf(c) < corePoolSize) {// 2.1. 添加新線程,且執(zhí)行command任務(wù)// 添加成功,即不需要后續(xù)操作了,添加失敗,則說明外部環(huán)境變化了if (addWorker(command, true))return;c = ctl.get();}// 當(dāng)核心線程達(dá)到后,則嘗試添加到阻塞隊列中,具體添加方法由阻塞隊列實現(xiàn)// isRunning => c < SHUTDOWN;if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 2.2. 添加隊列成功后,還要再次檢測線程池的運行狀態(tài),決定啟動線程或者狀態(tài)過期// 2.2.1. 當(dāng)線程池已關(guān)閉,則將剛剛添加的任務(wù)移除,走reject策略if (! isRunning(recheck) && remove(command))reject(command);// 2.2.2. 當(dāng)一個worker都沒有時,則添加workerelse if (workerCountOf(recheck) == 0)addWorker(null, false);}// 當(dāng)隊列滿后,則直接再創(chuàng)建新的線程運行,如果不能再創(chuàng)建線程了,則 rejectelse if (!addWorker(command, false))// 2.3. 拒絕策略處理reject(command);}
通過上面這一小段代碼,我們就已經(jīng)完整地看到了。通過一個 ctl 變量進(jìn)行全局狀態(tài)控制,從而保證了線程安全性。整個框架并沒有使用鎖,但是卻是線程安全的。
整段代碼剛好完整描述了線程池的執(zhí)行流程:
1. 判斷核心線程池是否已滿,如果不是,則創(chuàng)建線程執(zhí)行任務(wù);
2. 如果核心線程池滿了,判斷隊列是否滿了,如果隊列沒滿,將任務(wù)放在隊列中;
3. 如果隊列滿了,則判斷線程池是否已滿,如果沒滿,創(chuàng)建線程執(zhí)行任務(wù);
4. 如果線程池也滿了,則按照拒絕策略對任務(wù)進(jìn)行處理;
2.1. 添加新的worker
一個worker,即是一個工作線程。
/*** Checks if a new worker can be added with respect to current* pool state and the given bound (either core or maximum). If so,* the worker count is adjusted accordingly, and, if possible, a* new worker is created and started, running firstTask as its* first task. This method returns false if the pool is stopped or* eligible to shut down. It also returns false if the thread* factory fails to create a thread when asked. If the thread* creation fails, either due to the thread factory returning* null, or due to an exception (typically OutOfMemoryError in* Thread.start()), we roll back cleanly.** @param firstTask the task the new thread should run first (or* null if none). Workers are created with an initial first task* (in method execute()) to bypass queuing when there are fewer* than corePoolSize threads (in which case we always start one),* or when the queue is full (in which case we must bypass queue).* Initially idle threads are usually created via* prestartCoreThread or to replace other dying workers.** @param core if true use corePoolSize as bound, else* maximumPoolSize. (A boolean indicator is used here rather than a* value to ensure reads of fresh values after checking other pool* state).* @return true if successful*/private boolean addWorker(Runnable firstTask, boolean core) {// 為確保線程安全,進(jìn)行CAS反復(fù)重試retry:for (;;) {int c = ctl.get();// 獲取runState , c 的高位存儲// c & ~CAPACITY;int rs = runStateOf(c);// Check if queue empty only if necessary.// 已經(jīng)shutdown, firstTask 為空的添加并不會成功if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);// 如果超出最大允許創(chuàng)建的線程數(shù),則直接失敗if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// CAS 更新worker+1數(shù),成功則說明占位成功退出retry,后續(xù)的添加操作將是安全的,失敗則說明已有其他線程變更該值if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctl// runState 變更,則退出到 retry 重新循環(huán)if (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}// 以下為添加 worker 過程boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 使用 Worker 封閉 firstTask 任務(wù),后續(xù)運行將由 Worker 接管w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;// 添加 worker 的過程,需要保證線程安全mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());// SHUTDOWN 情況下還是會創(chuàng)建 Worker, 但是后續(xù)檢測將會失敗if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// 既然是新添加的線程,就不應(yīng)該是 alive 狀態(tài)if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();// workers 只是一個工作線程的容器,使用 HashSet 承載// private final HashSet<Worker> workers = new HashSet<Worker>();workers.add(w);int s = workers.size();// 維護(hù)一個全局達(dá)到過的最大線程數(shù)計數(shù)器if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}// worker 添加成功后,進(jìn)行將worker啟起來,里面應(yīng)該是有一個 死循環(huán),一直在獲取任務(wù)// 不然怎么運行添加到隊列里的任務(wù)呢?if (workerAdded) {t.start();workerStarted = true;}}} finally {// 如果任務(wù)啟動失敗,則必須進(jìn)行清理,返回失敗if (! workerStarted)addWorkerFailed(w);}return workerStarted;}// 大概添加 worker 的框架明白了,重點對象是 Worker, 我們稍后再講// 現(xiàn)在先來看看,添加失敗的情況,如何進(jìn)行/*** Rolls back the worker thread creation.* - removes worker from workers, if present* - decrements worker count* - rechecks for termination, in case the existence of this* worker was holding up termination*/private void addWorkerFailed(Worker w) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (w != null)workers.remove(w);// ctl 中的 workerCount - 1 , CAS 實現(xiàn)decrementWorkerCount();// 嘗試處理空閑線程tryTerminate();} finally {mainLock.unlock();}}/*** Decrements the workerCount field of ctl. This is called only on* abrupt termination of a thread (see processWorkerExit). Other* decrements are performed within getTask.*/private void decrementWorkerCount() {do {} while (! compareAndDecrementWorkerCount(ctl.get()));}// 停止可能啟動的 worker/*** Transitions to TERMINATED state if either (SHUTDOWN and pool* and queue empty) or (STOP and pool empty). If otherwise* eligible to terminate but workerCount is nonzero, interrupts an* idle worker to ensure that shutdown signals propagate. This* method must be called following any action that might make* termination possible -- reducing worker count or removing tasks* from the queue during shutdown. The method is non-private to* allow access from ScheduledThreadPoolExecutor.*/final void tryTerminate() {for (;;) {int c = ctl.get();// 線程池正在運行、正在清理、已關(guān)閉但隊列還未處理完,都不會進(jìn)行 terminate 操作if (isRunning(c) ||// c >= TIDYINGrunStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;if (workerCountOf(c) != 0) { // Eligible to terminate// 停止線程的兩個方式之一,只中斷一個 workerinterruptIdleWorkers(ONLY_ONE);return;}// 以下為整個線程池的后置操作final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 設(shè)置正在清理標(biāo)識if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {// 線程池已終止的鉤子方法,默認(rèn)實現(xiàn)為空terminated();} finally {ctl.set(ctlOf(TERMINATED, 0));// 此處 termination 為喚醒等待關(guān)閉的線程termination.signalAll();}return;}} finally {mainLock.unlock();}// else retry on failed CAS}}/*** Interrupts threads that might be waiting for tasks (as* indicated by not being locked) so they can check for* termination or configuration changes. Ignores* SecurityExceptions (in which case some threads may remain* uninterrupted).** @param onlyOne If true, interrupt at most one worker. This is* called only from tryTerminate when termination is otherwise* enabled but there are still other workers. In this case, at* most one waiting worker is interrupted to propagate shutdown* signals in case all threads are currently waiting.* Interrupting any arbitrary thread ensures that newly arriving* workers since shutdown began will also eventually exit.* To guarantee eventual termination, it suffices to always* interrupt only one idle worker, but shutdown() interrupts all* idle workers so that redundant workers exit promptly, not* waiting for a straggler task to finish.*/private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 迭代所有 workerfor (Worker w : workers) {Thread t = w.thread;// 獲取到 worker 的鎖之后,再進(jìn)行 interruptif (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}// 只中斷一個 worker, 立即返回, 不保證 interrupt 成功if (onlyOne)break;}} finally {mainLock.unlock();}}
2.2. 當(dāng)添加隊列成功后,發(fā)現(xiàn)線程池狀態(tài)變更,需要進(jìn)行移除隊列操作
/*** Removes this task from the executor's internal queue if it is* present, thus causing it not to be run if it has not already* started.** <p>This method may be useful as one part of a cancellation* scheme. It may fail to remove tasks that have been converted* into other forms before being placed on the internal queue. For* example, a task entered using {@code submit} might be* converted into a form that maintains {@code Future} status.* However, in such cases, method {@link #purge} may be used to* remove those Futures that have been cancelled.** @param task the task to remove* @return {@code true} if the task was removed*/public boolean remove(Runnable task) {// 此移除不一定能成功boolean removed = workQueue.remove(task);// 上面已經(jīng)看過,它會嘗試停止一個 worker 線程tryTerminate(); // In case SHUTDOWN and now emptyreturn removed;}
3. 添加失敗進(jìn)行執(zhí)行拒絕策略
/*** Invokes the rejected execution handler for the given command.* Package-protected for use by ScheduledThreadPoolExecutor.*/final void reject(Runnable command) {// 拒絕策略是在構(gòu)造方法時傳入的,默認(rèn)為 RejectedExecutionHandler// 即用戶只需實現(xiàn) rejectedExecution 方法,即可以自定義拒絕策略了handler.rejectedExecution(command, this);}
4. Worker 的工作機制
從上面的實現(xiàn)中,我們可以看到,主要是對 Worker 的添加和 workQueue 的添加,所以具體的工作是由誰完成呢?自然就是 Worker 了。
// Worker 的構(gòu)造方法,主要是接受一個 task, 可以為 null, 如果非null, 將在不久的將來被執(zhí)行// private final class Worker extends AbstractQueuedSynchronizer implements Runnable/*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*/Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;// 將 Worker 自身當(dāng)作一個 任務(wù),綁定到 worker.thread 中// thread 啟動時,worker 就啟動了this.thread = getThreadFactory().newThread(this);}// Worker 的主要工作實現(xiàn),通過一個循環(huán)掃描實現(xiàn)/** Delegates main run loop to outer runWorker */public void run() {// 調(diào)用 ThreadPoolExecutor 外部實現(xiàn)的 runWorker 方法runWorker(this);}/*** Main worker run loop. Repeatedly gets tasks from queue and* executes them, while coping with a number of issues:** 1. We may start out with an initial task, in which case we* don't need to get the first one. Otherwise, as long as pool is* running, we get tasks from getTask. If it returns null then the* worker exits due to changed pool state or configuration* parameters. Other exits result from exception throws in* external code, in which case completedAbruptly holds, which* usually leads processWorkerExit to replace this thread.** 2. Before running any task, the lock is acquired to prevent* other pool interrupts while the task is executing, and then we* ensure that unless pool is stopping, this thread does not have* its interrupt set.** 3. Each task run is preceded by a call to beforeExecute, which* might throw an exception, in which case we cause thread to die* (breaking loop with completedAbruptly true) without processing* the task.** 4. Assuming beforeExecute completes normally, we run the task,* gathering any of its thrown exceptions to send to afterExecute.* We separately handle RuntimeException, Error (both of which the* specs guarantee that we trap) and arbitrary Throwables.* Because we cannot rethrow Throwables within Runnable.run, we* wrap them within Errors on the way out (to the thread's* UncaughtExceptionHandler). Any thrown exception also* conservatively causes thread to die.** 5. After task.run completes, we call afterExecute, which may* also throw an exception, which will also cause thread to* die. According to JLS Sec 14.20, this exception is the one that* will be in effect even if task.run throws.** The net effect of the exception mechanics is that afterExecute* and the thread's UncaughtExceptionHandler have as accurate* information as we can provide about any problems encountered by* user code.** @param w the worker*/final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// 不停地從 workQueue 中獲取任務(wù),然后執(zhí)行,就是這么個邏輯// getTask() 會阻塞式獲取,所以 Worker 往往不會立即退出while (task != null || (task = getTask()) != null) {// 執(zhí)行過程中是不允許并發(fā)的,即同時只能一個 task 在運行,此時也不允許進(jìn)行 interruptw.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interrupt// 檢測是否已被線程池是否停止 或者當(dāng)前 worker 被中斷// STOP = 1 << COUNT_BITS;if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())// 中斷信息傳遞wt.interrupt();try {// 任務(wù)開始前 切點,默認(rèn)為空執(zhí)行beforeExecute(wt, task);Throwable thrown = null;try {// 直接調(diào)用任務(wù)的run方法, 具體的返回結(jié)果,會被 FutureTask 封裝到 某個變量中// 可以參考以前的文章 (FutureTask是怎樣獲取到異步執(zhí)行結(jié)果的?https://www.cnblogs.com/yougewe/p/11666284.html)task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// 任務(wù)開始后 切點,默認(rèn)為空執(zhí)行afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}// 正常退出,有必要的話,可能重新將 Worker 添加進(jìn)來completedAbruptly = false;} finally {// 處理退出后下一步操作,可能重新添加 WorkerprocessWorkerExit(w, completedAbruptly);}}/*** Performs cleanup and bookkeeping for a dying worker. Called* only from worker threads. Unless completedAbruptly is set,* assumes that workerCount has already been adjusted to account* for exit. This method removes thread from worker set, and* possibly terminates the pool or replaces the worker if either* it exited due to user task exception or if fewer than* corePoolSize workers are running or queue is non-empty but* there are no workers.** @param w the worker* @param completedAbruptly if the worker died due to user exception*/private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;workers.remove(w);} finally {mainLock.unlock();}tryTerminate();int c = ctl.get();if (runStateLessThan(c, STOP)) {// 在 Worker 正常退出的情況下,檢查是否超時導(dǎo)致,維持最小線程數(shù)if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;// 如果滿足最小線程要求,則直接返回if (workerCountOf(c) >= min)return; // replacement not needed}// 否則再添加一個Worker到線程池中備用// 非正常退出,會直接再添加一個WorkeraddWorker(null, false);}}/*** Performs blocking or timed wait for a task, depending on* current configuration settings, or returns null if this worker* must exit because of any of:* 1. There are more than maximumPoolSize workers (due to* a call to setMaximumPoolSize).* 2. The pool is stopped.* 3. The pool is shutdown and the queue is empty.* 4. This worker timed out waiting for a task, and timed-out* workers are subject to termination (that is,* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})* both before and after the timed wait, and if the queue is* non-empty, this worker is not the last thread in the pool.** @return task, or null if the worker must exit, in which case* workerCount is decremented*/private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.// 如果進(jìn)行了 shutdown, 且隊列為空, 則需要將 worker 退出if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {// do {} while (! compareAndDecrementWorkerCount(ctl.get()));decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;// 線程數(shù)據(jù)大于最大允許線程,需要刪除多余的 Workerif ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 如果開戶了超時刪除功能,則使用 poll, 否則使用 take() 進(jìn)行阻塞獲取Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();// 獲取到任務(wù),則可以進(jìn)行執(zhí)行了if (r != null)return r;// 如果有超時設(shè)置,則會在下一循環(huán)時退出timedOut = true;}// 忽略中斷異常// 在這種情況下,Worker如何響應(yīng)外部的中斷請求呢???思考catch (InterruptedException retry) {timedOut = false;}}}
所以,Worker的作用就體現(xiàn)出來了,一個循環(huán)取任務(wù)執(zhí)行任務(wù)過程:
1. 有一個主循環(huán)一直進(jìn)行任務(wù)的獲取;
2. 針對有超時的設(shè)置,會使用poll進(jìn)行獲取任務(wù),如果超時,則 Worker 將會退出循環(huán)結(jié)束線程;
3. 無超時的設(shè)置,則會使用 take 進(jìn)行阻塞式獲取,直到有值;
4. 獲取任務(wù)執(zhí)行前置+業(yè)務(wù)+后置任務(wù);
5. 當(dāng)獲取到null的任務(wù)之后,當(dāng)前Worker將會結(jié)束;
6. 當(dāng)前Worker結(jié)束后,將會判斷是否有必要維護(hù)最低Worker數(shù),從而決定是否再添加Worker進(jìn)來。
還是借用一個網(wǎng)上同學(xué)比較通用的一個圖來表述下 Worker/ThreadPoolExecutor 的工作流程吧(已經(jīng)很完美,不需要再造這輪子了)

5. shutdown 操作實現(xiàn)
ThreadPoolExecutor 是通過 ctl 這個變量進(jìn)行全局狀態(tài)維護(hù)的,shutdown 在線程池中也是表現(xiàn)為一個狀態(tài),所以應(yīng)該是比較簡單的。
/*** Initiates an orderly shutdown in which previously submitted* tasks are executed, but no new tasks will be accepted.* Invocation has no additional effect if already shut down.** <p>This method does not wait for previously submitted tasks to* complete execution. Use {@link #awaitTermination awaitTermination}* to do that.** @throws SecurityException {@inheritDoc}*/public void shutdown() {// 為保證線程安全,使用 mainLockfinal ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// SecurityManager 檢查checkShutdownAccess();// 設(shè)置狀態(tài)為 SHUTDOWNadvanceRunState(SHUTDOWN);// 中斷空閑的 Worker, 即相當(dāng)于依次關(guān)閉每個空閑線程interruptIdleWorkers();// 關(guān)閉鉤子,默認(rèn)實現(xiàn)為空操作,為方便子類實現(xiàn)自定義清理功能onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}// 再tryTerminate();}/*** Transitions runState to given target, or leaves it alone if* already at least the given target.** @param targetState the desired state, either SHUTDOWN or STOP* (but not TIDYING or TERMINATED -- use tryTerminate for that)*/private void advanceRunState(int targetState) {for (;;) {int c = ctl.get();// 自身CAS更新成功或者被其他線程更新成功if (runStateAtLeast(c, targetState) ||ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))break;}}// 關(guān)閉空閑線程(非 running 狀態(tài))/*** Common form of interruptIdleWorkers, to avoid having to* remember what the boolean argument means.*/private void interruptIdleWorkers() {// 上文已介紹, 此處 ONLY_ONE 為 false, 即是最大可能地中斷所有 WorkerinterruptIdleWorkers(false);}與 shutdown 對應(yīng)的,有一個 shutdownNow, 其語義是 立即停止所有任務(wù)。/*** Attempts to stop all actively executing tasks, halts the* processing of waiting tasks, and returns a list of the tasks* that were awaiting execution. These tasks are drained (removed)* from the task queue upon return from this method.** <p>This method does not wait for actively executing tasks to* terminate. Use {@link #awaitTermination awaitTermination} to* do that.** <p>There are no guarantees beyond best-effort attempts to stop* processing actively executing tasks. This implementation* cancels tasks via {@link Thread#interrupt}, so any task that* fails to respond to interrupts may never terminate.** @throws SecurityException {@inheritDoc}*/public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();// 與 shutdown 的差別,設(shè)置的狀態(tài)不一樣advanceRunState(STOP);// 強行中斷線程interruptWorkers();// 將未完成的任務(wù)返回tasks = drainQueue();} finally {mainLock.unlock();}tryTerminate();return tasks;}/*** Interrupts all threads, even if active. Ignores SecurityExceptions* (in which case some threads may remain uninterrupted).*/private void interruptWorkers() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers)// 調(diào)用 worker 的提供的中斷方法w.interruptIfStarted();} finally {mainLock.unlock();}}// ThreadPoolExecutor.Worker#interruptIfStartedvoid interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {// 直接調(diào)用任務(wù)的 interruptt.interrupt();} catch (SecurityException ignore) {}}}
6. invokeAll 的實現(xiàn)方式
invokeAll, 望文生義,即是調(diào)用所有給定的任務(wù)。想來應(yīng)該是一個個地添加任務(wù)到線程池隊列吧。
// invokeAll 的方法直接在抽象方便中就實現(xiàn)了,它的語義是同時執(zhí)行n個任務(wù),并同步等待結(jié)果返回// java.util.concurrent.AbstractExecutorService#invokeAll(java.util.Collection<? extends java.util.concurrent.Callable<T>>)public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException {if (tasks == null)throw new NullPointerException();ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());boolean done = false;try {for (Callable<T> t : tasks) {RunnableFuture<T> f = newTaskFor(t);futures.add(f);// 依次調(diào)用各子類的實現(xiàn),添加任務(wù)execute(f);}for (int i = 0, size = futures.size(); i < size; i++) {Future<T> f = futures.get(i);if (!f.isDone()) {try {// 依次等待執(zhí)行結(jié)果f.get();} catch (CancellationException ignore) {} catch (ExecutionException ignore) {}}}done = true;return futures;} finally {if (!done)for (int i = 0, size = futures.size(); i < size; i++)futures.get(i).cancel(true);}}
實現(xiàn)很簡單,都是些外圍調(diào)用。
7. ThreadPoolExecutor 的狀態(tài)值的設(shè)計
通過上面的過程,可以看到,整個ThreadPoolExecutor 非狀態(tài)的依賴是非常強的。所以一個好的狀態(tài)值的設(shè)計就顯得很重要了,runState 代表線程池或者 Worker 的運行狀態(tài)。如下:
// runState is stored in the high-order bits// 整個狀態(tài)使值使用 ctl 的高三位值進(jìn)行控制, COUNT_BITS=29// 1110 0000 0000 0000private static final int RUNNING = -1 << COUNT_BITS;// 0000 0000 0000 0000private static final int SHUTDOWN = 0 << COUNT_BITS;// 0010 0000 0000 0000private static final int STOP = 1 << COUNT_BITS;// 0100 0000 0000 0000private static final int TIDYING = 2 << COUNT_BITS;// 0110 0000 0000 0000private static final int TERMINATED = 3 << COUNT_BITS;// 整個狀態(tài)值的大小順序主: RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED// 而低 29位,則用來保存 worker 的數(shù)量,當(dāng)worker增加時,只要將整個 ctl 增加即可。// 0001 1111 1111 1111, 即是最大的 worker 數(shù)量private static final int CAPACITY = (1 << COUNT_BITS) - 1;// 整個 ctl 描述為一個 AtomicInteger, 功能如下:/*** The main pool control state, ctl, is an atomic integer packing* two conceptual fields* workerCount, indicating the effective number of threads* runState, indicating whether running, shutting down etc** In order to pack them into one int, we limit workerCount to* (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2* billion) otherwise representable. If this is ever an issue in* the future, the variable can be changed to be an AtomicLong,* and the shift/mask constants below adjusted. But until the need* arises, this code is a bit faster and simpler using an int.** The workerCount is the number of workers that have been* permitted to start and not permitted to stop. The value may be* transiently different from the actual number of live threads,* for example when a ThreadFactory fails to create a thread when* asked, and when exiting threads are still performing* bookkeeping before terminating. The user-visible pool size is* reported as the current size of the workers set.** The runState provides the main lifecycle control, taking on values:** RUNNING: Accept new tasks and process queued tasks* SHUTDOWN: Don't accept new tasks, but process queued tasks* STOP: Don't accept new tasks, don't process queued tasks,* and interrupt in-progress tasks* TIDYING: All tasks have terminated, workerCount is zero,* the thread transitioning to state TIDYING* will run the terminated() hook method* TERMINATED: terminated() has completed** The numerical order among these values matters, to allow* ordered comparisons. The runState monotonically increases over* time, but need not hit each state. The transitions are:** RUNNING -> SHUTDOWN* On invocation of shutdown(), perhaps implicitly in finalize()* (RUNNING or SHUTDOWN) -> STOP* On invocation of shutdownNow()* SHUTDOWN -> TIDYING* When both queue and pool are empty* STOP -> TIDYING* When pool is empty* TIDYING -> TERMINATED* When the terminated() hook method has completed** Threads waiting in awaitTermination() will return when the* state reaches TERMINATED.** Detecting the transition from SHUTDOWN to TIDYING is less* straightforward than you'd like because the queue may become* empty after non-empty and vice versa during SHUTDOWN state, but* we can only terminate if, after seeing that it is empty, we see* that workerCount is 0 (which sometimes entails a recheck -- see* below).*/private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
8. awaitTermination 等待關(guān)閉完成
從上面的 shutdown, 可以看到,只是寫了 SHUTDOWN 標(biāo)識后,嘗試盡可能地中斷停止Worker線程,但并不保證中斷成功。要想保證停止完成,需要有另外的機制來保證。從 awaitTermination 的語義來說,它是能保證任務(wù)停止完成的,那么它是如何保證的呢?
// ThreadPoolExecutor.awaitTermination()public boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (;;) {// 只是循環(huán) ctl 狀態(tài), 只要 狀態(tài)為 TERMINATED 狀態(tài),則說明已經(jīng)關(guān)閉成功// 此處 termination 的狀態(tài)觸發(fā)是在 tryTerminate 中觸發(fā)的if (runStateAtLeast(ctl.get(), TERMINATED))return true;if (nanos <= 0)return false;nanos = termination.awaitNanos(nanos);}} finally {mainLock.unlock();}}
看起來, awaitTermination 并沒有什么特殊操作,而是一直在等待。所以 TERMINATED 是 Worker 自行發(fā)生的動作。
那是在哪里做的操作呢?其實是在獲取任務(wù)的時候,會檢測當(dāng)前狀態(tài)是否是 SHUTDOWN, 如果是SHUTDOWN且 隊列為空,則會觸發(fā)獲取任務(wù)的返回null.從而結(jié)束當(dāng)前 Worker.
Worker 在結(jié)束前會調(diào)用 processWorkerExit() 方法,里面會再次調(diào)用 tryTerminate(), 當(dāng)所有 Worker 都運行到這個點后, awaitTermination() 就會收到通知了。(注意: processWorkerExit() 會在每次運行后進(jìn)行 addWorker() 嘗試,但是在 SHUTDOWN 狀態(tài)的添加操作總是失敗的,所以不用考慮)
到此,你是否可以解答前面的幾個問題了呢?

騰訊、阿里、滴滴后臺面試題匯總總結(jié) — (含答案)
面試:史上最全多線程面試題 !
最新阿里內(nèi)推Java后端面試題
JVM難學(xué)?那是因為你沒認(rèn)真看完這篇文章

關(guān)注作者微信公眾號 —《JAVA爛豬皮》
了解更多java后端架構(gòu)知識以及最新面試寶典


看完本文記得給作者點贊+在看哦~~~大家的支持,是作者源源不斷出文的動力
作者:等你歸去來
出處:https://www.cnblogs.com/yougewe/p/12267274.html
