<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          手撕源碼!線程池核心組件源碼剖析

          共 4126字,需瀏覽 9分鐘

           ·

          2020-08-19 03:52

          本文選自 Doocs 開(kāi)源社區(qū)旗下“源碼獵人”項(xiàng)目,作者 AmyliaY。

          項(xiàng)目將會(huì)持續(xù)更新,歡迎 Star 關(guān)注。

          項(xiàng)目地址:https://github.com/doocs/source-code-hunter

          線程池核心組件圖解

          看源碼之前,先了解一下該組件 最主要的幾個(gè) 接口、抽象類和實(shí)現(xiàn)類的結(jié)構(gòu)關(guān)系。

          該組件中,Executor 和 ExecutorService 接口 定義了線程池最核心的幾個(gè)方法,提交任務(wù) submit ()、關(guān)閉線程池 shutdown()。抽象類 AbstractExecutorService 主要對(duì)公共行為 submit()系列方法進(jìn)行了實(shí)現(xiàn),這些 submit()方法 的實(shí)現(xiàn)使用了 模板方法模式,其中調(diào)用的 execute()方法 是未實(shí)現(xiàn)的 來(lái)自 Executor 接口 的方法。實(shí)現(xiàn)類 ThreadPoolExecutor 則對(duì)線程池進(jìn)行了具體而復(fù)雜的實(shí)現(xiàn)。

          另外還有一個(gè)常見(jiàn)的工具類 Executors,里面為開(kāi)發(fā)者封裝了一些可以直接拿來(lái)用的線程池。

          源碼賞析

          話不多說(shuō),直接上源碼。(這里只看最主要的代碼部分)

          Executor 和 ExecutorService 接口

          public interface Executor {    /**     * 在將來(lái)的某個(gè)時(shí)間執(zhí)行給定的 Runnable。該 Runnable 可以在新線程、池線程或調(diào)用線程中執(zhí)行。     */    void execute(Runnable command);}public interface ExecutorService extends Executor {    /**     * 優(yōu)雅關(guān)閉,該關(guān)閉會(huì)繼續(xù)執(zhí)行完以前提交的任務(wù),但不再接受新任務(wù)。     */    void shutdown();    /**     * 提交一個(gè)有返回值的任務(wù),并返回該任務(wù)的 未來(lái)執(zhí)行完成后的結(jié)果。     * Future get()方法 將在成功完成后返回任務(wù)的結(jié)果。     */     Future submit(Callable task);     Future submit(Runnable task, T result);    Future submit(Runnable task);}

          AbstractExecutorService 抽象類

          /** * 該抽象類最主要的內(nèi)容就是,實(shí)現(xiàn)了 ExecutorService 中的 submit()系列方法 */public abstract class AbstractExecutorService implements ExecutorService {    /**     * 提交任務(wù) 進(jìn)行執(zhí)行,返回獲取未來(lái)結(jié)果的 Future對(duì)象。     * 這里使用了 “模板方法模式”,execute()方法來(lái)自 Executor接口,該抽象類中并未進(jìn)行實(shí)現(xiàn),     * 而是交由子類具體實(shí)現(xiàn)。     */    public Future submit(Runnable task) {        if (task == null) throw new NullPointerException();        RunnableFuture<Void> ftask = newTaskFor(task, null);        execute(ftask);        return ftask;    }    public <T> Future<T> submit(Runnable task, T result) {        if (task == null) throw new NullPointerException();        RunnableFuture<T> ftask = newTaskFor(task, result);        execute(ftask);        return ftask;    }    public <T> Future<T> submit(Callable<T> task) {        if (task == null) throw new NullPointerException();        RunnableFuture<T> ftask = newTaskFor(task);        execute(ftask);        return ftask;    }}

          ThreadPoolExecutor

          public class ThreadPoolExecutor extends AbstractExecutorService {    /**     * **************     * ** 主要屬性 **     * **************     */    /** 阻塞隊(duì)列 */    private final BlockingQueue<Runnable> workQueue;    /** 用于創(chuàng)建線程的 線程工廠 */    private volatile ThreadFactory threadFactory;    /** 核心線程數(shù) */    private volatile int corePoolSize;    /** 最大線程數(shù) */    private volatile int maximumPoolSize;    /**     * **************     * ** 構(gòu)造方法 **     * **************     */    /** 最后都使用了最后一個(gè)構(gòu)造方法的實(shí)現(xiàn) */    public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue) {        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,             Executors.defaultThreadFactory(), defaultHandler);    }    public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue,                              ThreadFactory threadFactory) {        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,             threadFactory, defaultHandler);    }    public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue,                              RejectedExecutionHandler handler) {        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,             Executors.defaultThreadFactory(), handler);    }    public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue,                              ThreadFactory threadFactory,                              RejectedExecutionHandler handler) {        if (corePoolSize < 0 ||            maximumPoolSize <= 0 ||            maximumPoolSize < corePoolSize ||            keepAliveTime < 0)            throw new IllegalArgumentException();        if (workQueue == null || threadFactory == null || handler == null)            throw new NullPointerException();        this.corePoolSize = corePoolSize;        this.maximumPoolSize = maximumPoolSize;        this.workQueue = workQueue;        this.keepAliveTime = unit.toNanos(keepAliveTime);        this.threadFactory = threadFactory;        this.handler = handler;    }    /**     * **************     * ** 主要實(shí)現(xiàn) **     * **************     */    /** 執(zhí)行 Runnable任務(wù) */    public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        /*         * 分三步進(jìn)行:         *         * 1、如果運(yùn)行的線程少于 corePoolSize,嘗試開(kāi)啟一個(gè)新的線程;否則嘗試進(jìn)入工作隊(duì)列         *         * 2. 如果工作隊(duì)列沒(méi)滿,則進(jìn)入工作隊(duì)列;否則 判斷是否超出最大線程數(shù)         *         * 3. 如果未超出最大線程數(shù),則嘗試開(kāi)啟一個(gè)新的線程;否則 按飽和策略處理無(wú)法執(zhí)行的任務(wù)         */        int c = ctl.get();        if (workerCountOf(c) < corePoolSize) {            if (addWorker(command, true))                return;            c = ctl.get();        }        if (isRunning(c) && workQueue.offer(command)) {            int recheck = ctl.get();            if (! isRunning(recheck) && remove(command))                reject(command);            else if (workerCountOf(recheck) == 0)                addWorker(null, false);        }        else if (!addWorker(command, false))            reject(command);    }    /**     * 優(yōu)雅關(guān)閉,在其中執(zhí)行以前提交的任務(wù),但不接受新任務(wù)。如果已關(guān)閉,則調(diào)用沒(méi)有其他效果。     */    public void shutdown() {        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            checkShutdownAccess();            advanceRunState(SHUTDOWN);            interruptIdleWorkers();            onShutdown(); // hook for ScheduledThreadPoolExecutor        } finally {            mainLock.unlock();        }        tryTerminate();    }}

          ThreadPoolExecutor 中的 execute()方法 執(zhí)行 Runnable 任務(wù) 的流程邏輯可以用下圖表示。

          工具類 Executors

          看類名也知道,它最主要的作用就是提供 static 的工具方法,為開(kāi)發(fā)者提供各種封裝好的 具有各自特性的線程池。

          public class Executors {    /**     * 創(chuàng)建一個(gè)固定線程數(shù)量的線程池     */    public static ExecutorService newFixedThreadPool(int nThreads) {        return new ThreadPoolExecutor(nThreads, nThreads,                                      0L, TimeUnit.MILLISECONDS,                                      new LinkedBlockingQueue<Runnable>());    }    /**     * 創(chuàng)建一個(gè)單線程的線程池     */    public static ExecutorService newSingleThreadExecutor() {        return new FinalizableDelegatedExecutorService            (new ThreadPoolExecutor(1, 1,                                    0L, TimeUnit.MILLISECONDS,                                    new LinkedBlockingQueue<Runnable>()));    }    /**     * 創(chuàng)建一個(gè)緩存的,可動(dòng)態(tài)伸縮的線程池。     * 可以看出來(lái):核心線程數(shù)為0,最大線程數(shù)為Integer.MAX_VALUE,如果任務(wù)數(shù)在某一瞬間暴漲,     * 這個(gè)線程池很可能會(huì)把 服務(wù)器撐爆。     * 另外需要注意的是,它們底層都是使用了 ThreadPoolExecutor,只不過(guò)幫我們配好了參數(shù)     */    public static ExecutorService newCachedThreadPool() {        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                                      60L, TimeUnit.SECONDS,                                      new SynchronousQueue<Runnable>());    }}


          全文完!

          希望本文對(duì)大家有所幫助。如果感覺(jué)本文有幫助,有勞轉(zhuǎn)發(fā)或點(diǎn)一下“在看”!讓更多人收獲知識(shí)!


          長(zhǎng)按識(shí)別下圖二維碼,關(guān)注公眾號(hào)「Doocs 開(kāi)源社區(qū)」,第一時(shí)間跟你們分享好玩、實(shí)用的技術(shù)文章與業(yè)內(nèi)最新資訊。



          瀏覽 39
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  免费看黄色录像 | 久久天堂AV综合合色蜜桃网 | 日韩欧美一级 | 日韩人妻无码一区二区三区99 | 中文字幕人妻一区二区 |