手撕源碼!線程池核心組件源碼剖析
本文選自 Doocs 開(kāi)源社區(qū)旗下“源碼獵人”項(xiàng)目,作者 AmyliaY。
項(xiàng)目將會(huì)持續(xù)更新,歡迎 Star 關(guān)注。
項(xiàng)目地址:https://github.com/doocs/source-code-hunter
線程池核心組件圖解
看源碼之前,先了解一下該組件 最主要的幾個(gè) 接口、抽象類和實(shí)現(xiàn)類的結(jié)構(gòu)關(guān)系。

該組件中,Executor 和 ExecutorService 接口 定義了線程池最核心的幾個(gè)方法,提交任務(wù) submit ()、關(guān)閉線程池 shutdown()。抽象類 AbstractExecutorService 主要對(duì)公共行為 submit()系列方法進(jìn)行了實(shí)現(xiàn),這些 submit()方法 的實(shí)現(xiàn)使用了 模板方法模式,其中調(diào)用的 execute()方法 是未實(shí)現(xiàn)的 來(lái)自 Executor 接口 的方法。實(shí)現(xiàn)類 ThreadPoolExecutor 則對(duì)線程池進(jìn)行了具體而復(fù)雜的實(shí)現(xiàn)。
另外還有一個(gè)常見(jiàn)的工具類 Executors,里面為開(kāi)發(fā)者封裝了一些可以直接拿來(lái)用的線程池。
源碼賞析
話不多說(shuō),直接上源碼。(這里只看最主要的代碼部分)
Executor 和 ExecutorService 接口
public interface Executor {/*** 在將來(lái)的某個(gè)時(shí)間執(zhí)行給定的 Runnable。該 Runnable 可以在新線程、池線程或調(diào)用線程中執(zhí)行。*/void execute(Runnable command);}public interface ExecutorService extends Executor {/*** 優(yōu)雅關(guān)閉,該關(guān)閉會(huì)繼續(xù)執(zhí)行完以前提交的任務(wù),但不再接受新任務(wù)。*/void shutdown();/*** 提交一個(gè)有返回值的任務(wù),并返回該任務(wù)的 未來(lái)執(zhí)行完成后的結(jié)果。* Future的 get()方法 將在成功完成后返回任務(wù)的結(jié)果。*/Future submit(Callable task); Future submit(Runnable task, T result); Future> submit(Runnable task);}
AbstractExecutorService 抽象類
/*** 該抽象類最主要的內(nèi)容就是,實(shí)現(xiàn)了 ExecutorService 中的 submit()系列方法*/public abstract class AbstractExecutorService implements ExecutorService {/*** 提交任務(wù) 進(jìn)行執(zhí)行,返回獲取未來(lái)結(jié)果的 Future對(duì)象。* 這里使用了 “模板方法模式”,execute()方法來(lái)自 Executor接口,該抽象類中并未進(jìn)行實(shí)現(xiàn),* 而是交由子類具體實(shí)現(xiàn)。*/public Future> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task, result);execute(ftask);return ftask;}public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;}}
ThreadPoolExecutor
public class ThreadPoolExecutor extends AbstractExecutorService {/*** *************** ** 主要屬性 *** ***************//** 阻塞隊(duì)列 */private final BlockingQueue<Runnable> workQueue;/** 用于創(chuàng)建線程的 線程工廠 */private volatile ThreadFactory threadFactory;/** 核心線程數(shù) */private volatile int corePoolSize;/** 最大線程數(shù) */private volatile int maximumPoolSize;/*** *************** ** 構(gòu)造方法 *** ***************//** 最后都使用了最后一個(gè)構(gòu)造方法的實(shí)現(xiàn) */public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);}public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);}public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler);}public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}/*** *************** ** 主要實(shí)現(xiàn) *** ***************//** 執(zhí)行 Runnable任務(wù) */public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** 分三步進(jìn)行:** 1、如果運(yùn)行的線程少于 corePoolSize,嘗試開(kāi)啟一個(gè)新的線程;否則嘗試進(jìn)入工作隊(duì)列** 2. 如果工作隊(duì)列沒(méi)滿,則進(jìn)入工作隊(duì)列;否則 判斷是否超出最大線程數(shù)** 3. 如果未超出最大線程數(shù),則嘗試開(kāi)啟一個(gè)新的線程;否則 按飽和策略處理無(wú)法執(zhí)行的任務(wù)*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}/*** 優(yōu)雅關(guān)閉,在其中執(zhí)行以前提交的任務(wù),但不接受新任務(wù)。如果已關(guān)閉,則調(diào)用沒(méi)有其他效果。*/public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(SHUTDOWN);interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate();}}
ThreadPoolExecutor 中的 execute()方法 執(zhí)行 Runnable 任務(wù) 的流程邏輯可以用下圖表示。

工具類 Executors
看類名也知道,它最主要的作用就是提供 static 的工具方法,為開(kāi)發(fā)者提供各種封裝好的 具有各自特性的線程池。
public class Executors {/*** 創(chuàng)建一個(gè)固定線程數(shù)量的線程池*/public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}/*** 創(chuàng)建一個(gè)單線程的線程池*/public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}/*** 創(chuàng)建一個(gè)緩存的,可動(dòng)態(tài)伸縮的線程池。* 可以看出來(lái):核心線程數(shù)為0,最大線程數(shù)為Integer.MAX_VALUE,如果任務(wù)數(shù)在某一瞬間暴漲,* 這個(gè)線程池很可能會(huì)把 服務(wù)器撐爆。* 另外需要注意的是,它們底層都是使用了 ThreadPoolExecutor,只不過(guò)幫我們配好了參數(shù)*/public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}}
全文完!
希望本文對(duì)大家有所幫助。如果感覺(jué)本文有幫助,有勞轉(zhuǎn)發(fā)或點(diǎn)一下“在看”!讓更多人收獲知識(shí)!
長(zhǎng)按識(shí)別下圖二維碼,關(guān)注公眾號(hào)「Doocs 開(kāi)源社區(qū)」,第一時(shí)間跟你們分享好玩、實(shí)用的技術(shù)文章與業(yè)內(nèi)最新資訊。
