<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          線程池技術(shù)之:ThreadPoolExecutor 源碼解析

          共 40089字,需瀏覽 81分鐘

           ·

          2021-03-13 01:09

          走過路過不要錯過

          點擊藍(lán)字關(guān)注我們


          java中的所說的線程池,一般都是圍繞著 ThreadPoolExecutor 來展開的。其他的實現(xiàn)基本都是基于它,或者模仿它的。所以只要理解 ThreadPoolExecutor, 就相當(dāng)于完全理解了線程池的精髓。

          其實要理解一個東西,一般地,我們最好是要抱著自己的疑問或者理解去的。否則,往往收獲甚微。

           理解 ThreadPoolExecutor, 我們可以先理解一個線程池的意義: 本質(zhì)上是提供預(yù)先定義好的n個線程,供調(diào)用方直接運行任務(wù)的一個工具。

          線程池解決的問題:

          1. 提高任務(wù)執(zhí)行的響應(yīng)速度,降低資源消耗。任務(wù)執(zhí)行時,直接立即使用線程池提供的線程運行,避免了臨時創(chuàng)建線程的CPU/內(nèi)存開銷,達(dá)到快速響應(yīng)的效果。

          2. 提高線程的可管理性。線程總數(shù)可預(yù)知,避免用戶主動創(chuàng)建無限多線程導(dǎo)致死機風(fēng)險,還可以進(jìn)行線程統(tǒng)一的分配、調(diào)優(yōu)和監(jiān)控。

          3. 避免對資源的過度使用。在超出預(yù)期的請求任務(wù)情況,響應(yīng)策略可控。

          線程池提供的核心接口:

          要想使用線程池,自然是要理解其接口的。一般我們使用 ExecotorService 進(jìn)行線程池的調(diào)用。然而,我們并不針對初學(xué)者。

          整體的接口如下:

          我們就挑幾個常用接口探討下:

          submit(Runnable task): 提交一個無需返回結(jié)果的任務(wù)。
          submit(Callable<T> task): 提交一個有返回結(jié)果的任務(wù)。
          invokeAll(Collection<? extends Callable<T>> tasks, long, TimeUnit): 同時執(zhí)行n個任務(wù)并返回結(jié)果列表。
          shutdown(): 關(guān)閉線程程池。
          awaitTermination(long timeout, TimeUnit unit): 等待關(guān)閉結(jié)果,最長不超過timeout時間。

          以上是ThreadPoolExector 提供的特性,針對以上特性。

          我們應(yīng)該要有自己的幾個實現(xiàn)思路或疑問:

          1. 線程池如何接受任務(wù)?

          2. 線程如何運行任務(wù)?

          3. 線程池如何關(guān)閉?

           接下來,就讓我們帶著疑問去看實現(xiàn)吧。

          ThreadPoolExecutor 核心實現(xiàn)原理

          1. 線程池的處理流程

          我們首先重點要看的是,如何執(zhí)行提交的任務(wù)。我可以通過下圖來看看。

          總結(jié)描述下就是:

              1. 判斷核心線程池是否已滿,如果不是,則創(chuàng)建線程執(zhí)行任務(wù)
              2. 如果核心線程池滿了,判斷隊列是否滿了,如果隊列沒滿,將任務(wù)放在隊列中
              3. 如果隊列滿了,則判斷線程池是否已滿,如果沒滿,創(chuàng)建線程執(zhí)行任務(wù)
              4. 如果線程池也滿了,則按照拒絕策略對任務(wù)進(jìn)行處理

           

          另外,我們來看一下 ThreadPoolExecutor 的構(gòu)造方法,因為這里會體現(xiàn)出每個屬性的含義。

              /**     * Creates a new {@code ThreadPoolExecutor} with the given initial     * parameters.     *     * @param corePoolSize the number of threads to keep in the pool, even     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set     * @param maximumPoolSize the maximum number of threads to allow in the     *        pool     * @param keepAliveTime when the number of threads is greater than     *        the core, this is the maximum time that excess idle threads     *        will wait for new tasks before terminating.     * @param unit the time unit for the {@code keepAliveTime} argument     * @param workQueue the queue to use for holding tasks before they are     *        executed.  This queue will hold only the {@code Runnable}     *        tasks submitted by the {@code execute} method.     * @param threadFactory the factory to use when the executor     *        creates a new thread     * @param handler the handler to use when execution is blocked     *        because the thread bounds and queue capacities are reached     * @throws IllegalArgumentException if one of the following holds:<br>     *         {@code corePoolSize < 0}<br>     *         {@code keepAliveTime < 0}<br>     *         {@code maximumPoolSize <= 0}<br>     *         {@code maximumPoolSize < corePoolSize}     * @throws NullPointerException if {@code workQueue}     *         or {@code threadFactory} or {@code handler} is null     */    public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue,                              ThreadFactory threadFactory,                              RejectedExecutionHandler handler) {        if (corePoolSize < 0 ||            maximumPoolSize <= 0 ||            maximumPoolSize < corePoolSize ||            keepAliveTime < 0)            throw new IllegalArgumentException();        if (workQueue == null || threadFactory == null || handler == null)            throw new NullPointerException();        this.corePoolSize = corePoolSize;        this.maximumPoolSize = maximumPoolSize;        this.workQueue = workQueue;        this.keepAliveTime = unit.toNanos(keepAliveTime);        this.threadFactory = threadFactory;        this.handler = handler;    }

          從構(gòu)造方法可以看出 ThreadPoolExecutor 的主要參數(shù) 7 個,在其注釋上也有說明功能,咱們翻譯下每個參數(shù)的功能:

              corePoolSize: 線程池核心線程數(shù)(平時保留的線程數(shù)),使用時機: 在初始時刻,每次請求進(jìn)來都會創(chuàng)建一個線程直到達(dá)到該size    maximumPoolSize: 線程池最大線程數(shù),使用時機: 當(dāng)workQueue都放不下時,啟動新線程,直到最大線程數(shù),此時到達(dá)線程池的極限    keepAliveTime/unit: 超出corePoolSize數(shù)量的線程的保留時間,unit為時間單位    workQueue: 阻塞隊列,當(dāng)核心線程數(shù)達(dá)到或者超出后,會先嘗試將任務(wù)放入該隊列由各線程自行消費;          ArrayBlockingQueue: 構(gòu)造函數(shù)一定要傳大小        LinkedBlockingQueue: 構(gòu)造函數(shù)不傳大小會默認(rèn)為65536(Integer.MAX_VALUE ),當(dāng)大量請求任務(wù)時,容易造成 內(nèi)存耗盡。        SynchronousQueue: 同步隊列,一個沒有存儲空間的阻塞隊列 ,將任務(wù)同步交付給工作線程。        PriorityBlockingQueue: 優(yōu)先隊列    threadFactory:線程工廠,用于線程需要創(chuàng)建時,調(diào)用其newThread()生產(chǎn)新線程使用    handler: 飽和策略,當(dāng)隊列已放不下任務(wù),且創(chuàng)建的線程已達(dá)到 maximum 后,則不能再處理任務(wù),直接將任務(wù)交給飽和策略        AbortPolicy: 直接拋棄(默認(rèn))        CallerRunsPolicy: 用調(diào)用者的線程執(zhí)行任務(wù)        DiscardOldestPolicy: 拋棄隊列中最久的任務(wù)        DiscardPolicy: 拋棄當(dāng)前任務(wù)

          2. submit 流程詳解

          當(dāng)調(diào)用 submit 方法,就是向線程池中提交一個任務(wù),處理流程如步驟1所示。但是我們需要更深入理解。

          submit 方法是定義在 AbstractExecutorService 中,最終調(diào)用 ThreadPoolExecutor 的 execute 方法,即是模板方法模式的應(yīng)用。

              // java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable, T)    /**     * @throws RejectedExecutionException {@inheritDoc}     * @throws NullPointerException       {@inheritDoc}     */    public <T> Future<T> submit(Runnable task, T result) {        if (task == null) throw new NullPointerException();        // 封裝任務(wù)和返回結(jié)果為 RunnableFuture, 統(tǒng)一交由具體的子類執(zhí)行        RunnableFuture<T> ftask = newTaskFor(task, result);        // execute 將會調(diào)用 ThreadPoolExecutor 的實現(xiàn),是我們討論的重要核心        execute(ftask);        return ftask;    }    // FutureTask 是個重要的線程池組件,它承載了具體的任務(wù)執(zhí)行流    /**     * Returns a {@code RunnableFuture} for the given runnable and default     * value.     *     * @param runnable the runnable task being wrapped     * @param value the default value for the returned future     * @param <T> the type of the given value     * @return a {@code RunnableFuture} which, when run, will run the     * underlying runnable and which, as a {@code Future}, will yield     * the given value as its result and provide for cancellation of     * the underlying task     * @since 1.6     */    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {        return new FutureTask<T>(runnable, value);    }
          // ThreadPoolExecutor 的任務(wù)提交過程 // java.util.concurrent.ThreadPoolExecutor#execute /** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ // ctl 是一個重要的控制全局狀態(tài)的數(shù)據(jù)結(jié)構(gòu),定義為一個線程安全的 AtomicInteger // ctl = new AtomicInteger(ctlOf(RUNNING, 0)); int c = ctl.get(); // 當(dāng)還沒有達(dá)到核心線程池的數(shù)量時,直接添加1個新線程,然后讓其執(zhí)行任務(wù)即可 if (workerCountOf(c) < corePoolSize) { // 2.1. 添加新線程,且執(zhí)行command任務(wù) // 添加成功,即不需要后續(xù)操作了,添加失敗,則說明外部環(huán)境變化了 if (addWorker(command, true)) return; c = ctl.get(); } // 當(dāng)核心線程達(dá)到后,則嘗試添加到阻塞隊列中,具體添加方法由阻塞隊列實現(xiàn) // isRunning => c < SHUTDOWN; if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 2.2. 添加隊列成功后,還要再次檢測線程池的運行狀態(tài),決定啟動線程或者狀態(tài)過期 // 2.2.1. 當(dāng)線程池已關(guān)閉,則將剛剛添加的任務(wù)移除,走reject策略 if (! isRunning(recheck) && remove(command)) reject(command); // 2.2.2. 當(dāng)一個worker都沒有時,則添加worker else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 當(dāng)隊列滿后,則直接再創(chuàng)建新的線程運行,如果不能再創(chuàng)建線程了,則 reject else if (!addWorker(command, false)) // 2.3. 拒絕策略處理 reject(command); }

          通過上面這一小段代碼,我們就已經(jīng)完整地看到了。通過一個 ctl 變量進(jìn)行全局狀態(tài)控制,從而保證了線程安全性。整個框架并沒有使用鎖,但是卻是線程安全的。

          整段代碼剛好完整描述了線程池的執(zhí)行流程:

          1. 判斷核心線程池是否已滿,如果不是,則創(chuàng)建線程執(zhí)行任務(wù);
          2. 如果核心線程池滿了,判斷隊列是否滿了,如果隊列沒滿,將任務(wù)放在隊列中;
          3. 如果隊列滿了,則判斷線程池是否已滿,如果沒滿,創(chuàng)建線程執(zhí)行任務(wù);
          4. 如果線程池也滿了,則按照拒絕策略對任務(wù)進(jìn)行處理;

          2.1. 添加新的worker

          一個worker,即是一個工作線程。

              /**     * Checks if a new worker can be added with respect to current     * pool state and the given bound (either core or maximum). If so,     * the worker count is adjusted accordingly, and, if possible, a     * new worker is created and started, running firstTask as its     * first task. This method returns false if the pool is stopped or     * eligible to shut down. It also returns false if the thread     * factory fails to create a thread when asked.  If the thread     * creation fails, either due to the thread factory returning     * null, or due to an exception (typically OutOfMemoryError in     * Thread.start()), we roll back cleanly.     *     * @param firstTask the task the new thread should run first (or     * null if none). Workers are created with an initial first task     * (in method execute()) to bypass queuing when there are fewer     * than corePoolSize threads (in which case we always start one),     * or when the queue is full (in which case we must bypass queue).     * Initially idle threads are usually created via     * prestartCoreThread or to replace other dying workers.     *     * @param core if true use corePoolSize as bound, else     * maximumPoolSize. (A boolean indicator is used here rather than a     * value to ensure reads of fresh values after checking other pool     * state).     * @return true if successful     */    private boolean addWorker(Runnable firstTask, boolean core) {        // 為確保線程安全,進(jìn)行CAS反復(fù)重試        retry:        for (;;) {            int c = ctl.get();            // 獲取runState , c 的高位存儲            // c & ~CAPACITY;            int rs = runStateOf(c);
          // Check if queue empty only if necessary. // 已經(jīng)shutdown, firstTask 為空的添加并不會成功 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false;
          for (;;) { int wc = workerCountOf(c); // 如果超出最大允許創(chuàng)建的線程數(shù),則直接失敗 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // CAS 更新worker+1數(shù),成功則說明占位成功退出retry,后續(xù)的添加操作將是安全的,失敗則說明已有其他線程變更該值 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl // runState 變更,則退出到 retry 重新循環(huán) if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 以下為添加 worker 過程 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 使用 Worker 封閉 firstTask 任務(wù),后續(xù)運行將由 Worker 接管 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; // 添加 worker 的過程,需要保證線程安全 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); // SHUTDOWN 情況下還是會創(chuàng)建 Worker, 但是后續(xù)檢測將會失敗 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 既然是新添加的線程,就不應(yīng)該是 alive 狀態(tài) if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // workers 只是一個工作線程的容器,使用 HashSet 承載 // private final HashSet<Worker> workers = new HashSet<Worker>(); workers.add(w); int s = workers.size(); // 維護(hù)一個全局達(dá)到過的最大線程數(shù)計數(shù)器 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // worker 添加成功后,進(jìn)行將worker啟起來,里面應(yīng)該是有一個 死循環(huán),一直在獲取任務(wù) // 不然怎么運行添加到隊列里的任務(wù)呢? if (workerAdded) { t.start(); workerStarted = true; } } } finally { // 如果任務(wù)啟動失敗,則必須進(jìn)行清理,返回失敗 if (! workerStarted) addWorkerFailed(w); } return workerStarted; } // 大概添加 worker 的框架明白了,重點對象是 Worker, 我們稍后再講 // 現(xiàn)在先來看看,添加失敗的情況,如何進(jìn)行 /** * Rolls back the worker thread creation. * - removes worker from workers, if present * - decrements worker count * - rechecks for termination, in case the existence of this * worker was holding up termination */ private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); // ctl 中的 workerCount - 1 , CAS 實現(xiàn) decrementWorkerCount(); // 嘗試處理空閑線程 tryTerminate(); } finally { mainLock.unlock(); } } /** * Decrements the workerCount field of ctl. This is called only on * abrupt termination of a thread (see processWorkerExit). Other * decrements are performed within getTask. */ private void decrementWorkerCount() { do {} while (! compareAndDecrementWorkerCount(ctl.get())); } // 停止可能啟動的 worker /** * Transitions to TERMINATED state if either (SHUTDOWN and pool * and queue empty) or (STOP and pool empty). If otherwise * eligible to terminate but workerCount is nonzero, interrupts an * idle worker to ensure that shutdown signals propagate. This * method must be called following any action that might make * termination possible -- reducing worker count or removing tasks * from the queue during shutdown. The method is non-private to * allow access from ScheduledThreadPoolExecutor. */ final void tryTerminate() { for (;;) { int c = ctl.get(); // 線程池正在運行、正在清理、已關(guān)閉但隊列還未處理完,都不會進(jìn)行 terminate 操作 if (isRunning(c) || // c >= TIDYING runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { // Eligible to terminate // 停止線程的兩個方式之一,只中斷一個 worker interruptIdleWorkers(ONLY_ONE); return; } // 以下為整個線程池的后置操作 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 設(shè)置正在清理標(biāo)識 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 線程池已終止的鉤子方法,默認(rèn)實現(xiàn)為空 terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); // 此處 termination 為喚醒等待關(guān)閉的線程 termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } } /** * Interrupts threads that might be waiting for tasks (as * indicated by not being locked) so they can check for * termination or configuration changes. Ignores * SecurityExceptions (in which case some threads may remain * uninterrupted). * * @param onlyOne If true, interrupt at most one worker. This is * called only from tryTerminate when termination is otherwise * enabled but there are still other workers. In this case, at * most one waiting worker is interrupted to propagate shutdown * signals in case all threads are currently waiting. * Interrupting any arbitrary thread ensures that newly arriving * workers since shutdown began will also eventually exit. * To guarantee eventual termination, it suffices to always * interrupt only one idle worker, but shutdown() interrupts all * idle workers so that redundant workers exit promptly, not * waiting for a straggler task to finish. */ private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 迭代所有 worker for (Worker w : workers) { Thread t = w.thread; // 獲取到 worker 的鎖之后,再進(jìn)行 interrupt if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } // 只中斷一個 worker, 立即返回, 不保證 interrupt 成功 if (onlyOne) break; } } finally { mainLock.unlock(); } }

          2.2. 當(dāng)添加隊列成功后,發(fā)現(xiàn)線程池狀態(tài)變更,需要進(jìn)行移除隊列操作

              /**     * Removes this task from the executor's internal queue if it is     * present, thus causing it not to be run if it has not already     * started.     *     * <p>This method may be useful as one part of a cancellation     * scheme.  It may fail to remove tasks that have been converted     * into other forms before being placed on the internal queue. For     * example, a task entered using {@code submit} might be     * converted into a form that maintains {@code Future} status.     * However, in such cases, method {@link #purge} may be used to     * remove those Futures that have been cancelled.     *     * @param task the task to remove     * @return {@code true} if the task was removed     */    public boolean remove(Runnable task) {        // 此移除不一定能成功        boolean removed = workQueue.remove(task);        // 上面已經(jīng)看過,它會嘗試停止一個 worker 線程        tryTerminate(); // In case SHUTDOWN and now empty        return removed;    }

          3. 添加失敗進(jìn)行執(zhí)行拒絕策略

              /**     * Invokes the rejected execution handler for the given command.     * Package-protected for use by ScheduledThreadPoolExecutor.     */    final void reject(Runnable command) {        // 拒絕策略是在構(gòu)造方法時傳入的,默認(rèn)為 RejectedExecutionHandler        // 即用戶只需實現(xiàn) rejectedExecution 方法,即可以自定義拒絕策略了        handler.rejectedExecution(command, this);    }

          4. Worker 的工作機制

          從上面的實現(xiàn)中,我們可以看到,主要是對 Worker 的添加和 workQueue 的添加,所以具體的工作是由誰完成呢?自然就是 Worker 了。

                  // Worker 的構(gòu)造方法,主要是接受一個 task, 可以為 null, 如果非null, 將在不久的將來被執(zhí)行        // private final class Worker extends AbstractQueuedSynchronizer implements Runnable        /**         * Creates with given first task and thread from ThreadFactory.         * @param firstTask the first task (null if none)         */        Worker(Runnable firstTask) {            setState(-1); // inhibit interrupts until runWorker            this.firstTask = firstTask;            // 將 Worker 自身當(dāng)作一個 任務(wù),綁定到 worker.thread 中            // thread 啟動時,worker 就啟動了            this.thread = getThreadFactory().newThread(this);        }        // Worker 的主要工作實現(xiàn),通過一個循環(huán)掃描實現(xiàn)                /** Delegates main run loop to outer runWorker  */        public void run() {            // 調(diào)用 ThreadPoolExecutor 外部實現(xiàn)的 runWorker 方法            runWorker(this);        }
          /** * Main worker run loop. Repeatedly gets tasks from queue and * executes them, while coping with a number of issues: * * 1. We may start out with an initial task, in which case we * don't need to get the first one. Otherwise, as long as pool is * running, we get tasks from getTask. If it returns null then the * worker exits due to changed pool state or configuration * parameters. Other exits result from exception throws in * external code, in which case completedAbruptly holds, which * usually leads processWorkerExit to replace this thread. * * 2. Before running any task, the lock is acquired to prevent * other pool interrupts while the task is executing, and then we * ensure that unless pool is stopping, this thread does not have * its interrupt set. * * 3. Each task run is preceded by a call to beforeExecute, which * might throw an exception, in which case we cause thread to die * (breaking loop with completedAbruptly true) without processing * the task. * * 4. Assuming beforeExecute completes normally, we run the task, * gathering any of its thrown exceptions to send to afterExecute. * We separately handle RuntimeException, Error (both of which the * specs guarantee that we trap) and arbitrary Throwables. * Because we cannot rethrow Throwables within Runnable.run, we * wrap them within Errors on the way out (to the thread's * UncaughtExceptionHandler). Any thrown exception also * conservatively causes thread to die. * * 5. After task.run completes, we call afterExecute, which may * also throw an exception, which will also cause thread to * die. According to JLS Sec 14.20, this exception is the one that * will be in effect even if task.run throws. * * The net effect of the exception mechanics is that afterExecute * and the thread's UncaughtExceptionHandler have as accurate * information as we can provide about any problems encountered by * user code. * * @param w the worker */ final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 不停地從 workQueue 中獲取任務(wù),然后執(zhí)行,就是這么個邏輯 // getTask() 會阻塞式獲取,所以 Worker 往往不會立即退出 while (task != null || (task = getTask()) != null) { // 執(zhí)行過程中是不允許并發(fā)的,即同時只能一個 task 在運行,此時也不允許進(jìn)行 interrupt w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt // 檢測是否已被線程池是否停止 或者當(dāng)前 worker 被中斷 // STOP = 1 << COUNT_BITS; if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) // 中斷信息傳遞 wt.interrupt(); try { // 任務(wù)開始前 切點,默認(rèn)為空執(zhí)行 beforeExecute(wt, task); Throwable thrown = null; try { // 直接調(diào)用任務(wù)的run方法, 具體的返回結(jié)果,會被 FutureTask 封裝到 某個變量中 // 可以參考以前的文章 (FutureTask是怎樣獲取到異步執(zhí)行結(jié)果的?https://www.cnblogs.com/yougewe/p/11666284.html) task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 任務(wù)開始后 切點,默認(rèn)為空執(zhí)行 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } // 正常退出,有必要的話,可能重新將 Worker 添加進(jìn)來 completedAbruptly = false; } finally { // 處理退出后下一步操作,可能重新添加 Worker processWorkerExit(w, completedAbruptly); } }
          /** * Performs cleanup and bookkeeping for a dying worker. Called * only from worker threads. Unless completedAbruptly is set, * assumes that workerCount has already been adjusted to account * for exit. This method removes thread from worker set, and * possibly terminates the pool or replaces the worker if either * it exited due to user task exception or if fewer than * corePoolSize workers are running or queue is non-empty but * there are no workers. * * @param w the worker * @param completedAbruptly if the worker died due to user exception */ private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount();
          final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); }
          tryTerminate();
          int c = ctl.get(); if (runStateLessThan(c, STOP)) { // 在 Worker 正常退出的情況下,檢查是否超時導(dǎo)致,維持最小線程數(shù) if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; // 如果滿足最小線程要求,則直接返回 if (workerCountOf(c) >= min) return; // replacement not needed } // 否則再添加一個Worker到線程池中備用 // 非正常退出,會直接再添加一個Worker addWorker(null, false); } }
          /** * Performs blocking or timed wait for a task, depending on * current configuration settings, or returns null if this worker * must exit because of any of: * 1. There are more than maximumPoolSize workers (due to * a call to setMaximumPoolSize). * 2. The pool is stopped. * 3. The pool is shutdown and the queue is empty. * 4. This worker timed out waiting for a task, and timed-out * workers are subject to termination (that is, * {@code allowCoreThreadTimeOut || workerCount > corePoolSize}) * both before and after the timed wait, and if the queue is * non-empty, this worker is not the last thread in the pool. * * @return task, or null if the worker must exit, in which case * workerCount is decremented */ private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out?
          for (;;) { int c = ctl.get(); int rs = runStateOf(c);
          // Check if queue empty only if necessary. // 如果進(jìn)行了 shutdown, 且隊列為空, 則需要將 worker 退出 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // do {} while (! compareAndDecrementWorkerCount(ctl.get())); decrementWorkerCount(); return null; }
          int wc = workerCountOf(c);
          // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 線程數(shù)據(jù)大于最大允許線程,需要刪除多余的 Worker if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; }
          try { // 如果開戶了超時刪除功能,則使用 poll, 否則使用 take() 進(jìn)行阻塞獲取 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); // 獲取到任務(wù),則可以進(jìn)行執(zhí)行了 if (r != null) return r; // 如果有超時設(shè)置,則會在下一循環(huán)時退出 timedOut = true; } // 忽略中斷異常 // 在這種情況下,Worker如何響應(yīng)外部的中斷請求呢???思考 catch (InterruptedException retry) { timedOut = false; } } }

          所以,Worker的作用就體現(xiàn)出來了,一個循環(huán)取任務(wù)執(zhí)行任務(wù)過程:

              1. 有一個主循環(huán)一直進(jìn)行任務(wù)的獲取;
              2. 針對有超時的設(shè)置,會使用poll進(jìn)行獲取任務(wù),如果超時,則 Worker 將會退出循環(huán)結(jié)束線程;
              3. 無超時的設(shè)置,則會使用 take 進(jìn)行阻塞式獲取,直到有值;
              4. 獲取任務(wù)執(zhí)行前置+業(yè)務(wù)+后置任務(wù);
              5. 當(dāng)獲取到null的任務(wù)之后,當(dāng)前Worker將會結(jié)束;
              6. 當(dāng)前Worker結(jié)束后,將會判斷是否有必要維護(hù)最低Worker數(shù),從而決定是否再添加Worker進(jìn)來。

          還是借用一個網(wǎng)上同學(xué)比較通用的一個圖來表述下 Worker/ThreadPoolExecutor 的工作流程吧(已經(jīng)很完美,不需要再造這輪子了)

          5. shutdown 操作實現(xiàn)

          ThreadPoolExecutor 是通過 ctl 這個變量進(jìn)行全局狀態(tài)維護(hù)的,shutdown 在線程池中也是表現(xiàn)為一個狀態(tài),所以應(yīng)該是比較簡單的。

              /**     * Initiates an orderly shutdown in which previously submitted     * tasks are executed, but no new tasks will be accepted.     * Invocation has no additional effect if already shut down.     *     * <p>This method does not wait for previously submitted tasks to     * complete execution.  Use {@link #awaitTermination awaitTermination}     * to do that.     *     * @throws SecurityException {@inheritDoc}     */    public void shutdown() {        // 為保證線程安全,使用 mainLock        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            // SecurityManager 檢查            checkShutdownAccess();            // 設(shè)置狀態(tài)為 SHUTDOWN            advanceRunState(SHUTDOWN);            // 中斷空閑的 Worker, 即相當(dāng)于依次關(guān)閉每個空閑線程            interruptIdleWorkers();            // 關(guān)閉鉤子,默認(rèn)實現(xiàn)為空操作,為方便子類實現(xiàn)自定義清理功能            onShutdown(); // hook for ScheduledThreadPoolExecutor        } finally {            mainLock.unlock();        }        // 再        tryTerminate();    }    /**     * Transitions runState to given target, or leaves it alone if     * already at least the given target.     *     * @param targetState the desired state, either SHUTDOWN or STOP     *        (but not TIDYING or TERMINATED -- use tryTerminate for that)     */    private void advanceRunState(int targetState) {        for (;;) {            int c = ctl.get();            // 自身CAS更新成功或者被其他線程更新成功            if (runStateAtLeast(c, targetState) ||                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))                break;        }    }    // 關(guān)閉空閑線程(非 running 狀態(tài))    /**     * Common form of interruptIdleWorkers, to avoid having to     * remember what the boolean argument means.     */    private void interruptIdleWorkers() {        // 上文已介紹, 此處 ONLY_ONE 為 false, 即是最大可能地中斷所有 Worker        interruptIdleWorkers(false);    }
          與 shutdown 對應(yīng)的,有一個 shutdownNow, 其語義是 立即停止所有任務(wù)。 /** * Attempts to stop all actively executing tasks, halts the * processing of waiting tasks, and returns a list of the tasks * that were awaiting execution. These tasks are drained (removed) * from the task queue upon return from this method. * * <p>This method does not wait for actively executing tasks to * terminate. Use {@link #awaitTermination awaitTermination} to * do that. * * <p>There are no guarantees beyond best-effort attempts to stop * processing actively executing tasks. This implementation * cancels tasks via {@link Thread#interrupt}, so any task that * fails to respond to interrupts may never terminate. * * @throws SecurityException {@inheritDoc} */ public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 與 shutdown 的差別,設(shè)置的狀態(tài)不一樣 advanceRunState(STOP); // 強行中斷線程 interruptWorkers(); // 將未完成的任務(wù)返回 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
          /** * Interrupts all threads, even if active. Ignores SecurityExceptions * (in which case some threads may remain uninterrupted). */ private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) // 調(diào)用 worker 的提供的中斷方法 w.interruptIfStarted(); } finally { mainLock.unlock(); } } // ThreadPoolExecutor.Worker#interruptIfStarted void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { // 直接調(diào)用任務(wù)的 interrupt t.interrupt(); } catch (SecurityException ignore) { } } }

          6. invokeAll 的實現(xiàn)方式

          invokeAll, 望文生義,即是調(diào)用所有給定的任務(wù)。想來應(yīng)該是一個個地添加任務(wù)到線程池隊列吧。

              // invokeAll 的方法直接在抽象方便中就實現(xiàn)了,它的語義是同時執(zhí)行n個任務(wù),并同步等待結(jié)果返回    // java.util.concurrent.AbstractExecutorService#invokeAll(java.util.Collection<? extends java.util.concurrent.Callable<T>>)    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)        throws InterruptedException {        if (tasks == null)            throw new NullPointerException();        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());        boolean done = false;        try {            for (Callable<T> t : tasks) {                RunnableFuture<T> f = newTaskFor(t);                futures.add(f);                // 依次調(diào)用各子類的實現(xiàn),添加任務(wù)                execute(f);            }            for (int i = 0, size = futures.size(); i < size; i++) {                Future<T> f = futures.get(i);                if (!f.isDone()) {                    try {                        // 依次等待執(zhí)行結(jié)果                        f.get();                    } catch (CancellationException ignore) {                    } catch (ExecutionException ignore) {                    }                }            }            done = true;            return futures;        } finally {            if (!done)                for (int i = 0, size = futures.size(); i < size; i++)                    futures.get(i).cancel(true);        }    }

          實現(xiàn)很簡單,都是些外圍調(diào)用。

          7. ThreadPoolExecutor 的狀態(tài)值的設(shè)計

          通過上面的過程,可以看到,整個ThreadPoolExecutor 非狀態(tài)的依賴是非常強的。所以一個好的狀態(tài)值的設(shè)計就顯得很重要了,runState 代表線程池或者 Worker 的運行狀態(tài)。如下:

              // runState is stored in the high-order bits    // 整個狀態(tài)使值使用 ctl 的高三位值進(jìn)行控制, COUNT_BITS=29    // 1110 0000 0000 0000    private static final int RUNNING    = -1 << COUNT_BITS;    // 0000 0000 0000 0000    private static final int SHUTDOWN   =  0 << COUNT_BITS;    // 0010 0000 0000 0000    private static final int STOP       =  1 << COUNT_BITS;    // 0100 0000 0000 0000    private static final int TIDYING    =  2 << COUNT_BITS;    // 0110 0000 0000 0000    private static final int TERMINATED =  3 << COUNT_BITS;    // 整個狀態(tài)值的大小順序主: RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
          // 而低 29位,則用來保存 worker 的數(shù)量,當(dāng)worker增加時,只要將整個 ctl 增加即可。 // 0001 1111 1111 1111, 即是最大的 worker 數(shù)量 private static final int CAPACITY = (1 << COUNT_BITS) - 1;
          // 整個 ctl 描述為一個 AtomicInteger, 功能如下: /** * The main pool control state, ctl, is an atomic integer packing * two conceptual fields * workerCount, indicating the effective number of threads * runState, indicating whether running, shutting down etc * * In order to pack them into one int, we limit workerCount to * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2 * billion) otherwise representable. If this is ever an issue in * the future, the variable can be changed to be an AtomicLong, * and the shift/mask constants below adjusted. But until the need * arises, this code is a bit faster and simpler using an int. * * The workerCount is the number of workers that have been * permitted to start and not permitted to stop. The value may be * transiently different from the actual number of live threads, * for example when a ThreadFactory fails to create a thread when * asked, and when exiting threads are still performing * bookkeeping before terminating. The user-visible pool size is * reported as the current size of the workers set. * * The runState provides the main lifecycle control, taking on values: * * RUNNING: Accept new tasks and process queued tasks * SHUTDOWN: Don't accept new tasks, but process queued tasks * STOP: Don't accept new tasks, don't process queued tasks, * and interrupt in-progress tasks * TIDYING: All tasks have terminated, workerCount is zero, * the thread transitioning to state TIDYING * will run the terminated() hook method * TERMINATED: terminated() has completed * * The numerical order among these values matters, to allow * ordered comparisons. The runState monotonically increases over * time, but need not hit each state. The transitions are: * * RUNNING -> SHUTDOWN * On invocation of shutdown(), perhaps implicitly in finalize() * (RUNNING or SHUTDOWN) -> STOP * On invocation of shutdownNow() * SHUTDOWN -> TIDYING * When both queue and pool are empty * STOP -> TIDYING * When pool is empty * TIDYING -> TERMINATED * When the terminated() hook method has completed * * Threads waiting in awaitTermination() will return when the * state reaches TERMINATED. * * Detecting the transition from SHUTDOWN to TIDYING is less * straightforward than you'd like because the queue may become * empty after non-empty and vice versa during SHUTDOWN state, but * we can only terminate if, after seeing that it is empty, we see * that workerCount is 0 (which sometimes entails a recheck -- see * below). */ private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

          8. awaitTermination 等待關(guān)閉完成

          從上面的 shutdown, 可以看到,只是寫了 SHUTDOWN 標(biāo)識后,嘗試盡可能地中斷停止Worker線程,但并不保證中斷成功。要想保證停止完成,需要有另外的機制來保證。從 awaitTermination 的語義來說,它是能保證任務(wù)停止完成的,那么它是如何保證的呢?

              // ThreadPoolExecutor.awaitTermination()    public boolean awaitTermination(long timeout, TimeUnit unit)        throws InterruptedException {        long nanos = unit.toNanos(timeout);        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            for (;;) {                // 只是循環(huán) ctl 狀態(tài), 只要 狀態(tài)為 TERMINATED 狀態(tài),則說明已經(jīng)關(guān)閉成功                // 此處 termination 的狀態(tài)觸發(fā)是在 tryTerminate 中觸發(fā)的                if (runStateAtLeast(ctl.get(), TERMINATED))                    return true;                if (nanos <= 0)                    return false;                nanos = termination.awaitNanos(nanos);            }        } finally {            mainLock.unlock();        }    }

          看起來, awaitTermination 并沒有什么特殊操作,而是一直在等待。所以 TERMINATED 是 Worker 自行發(fā)生的動作。

          那是在哪里做的操作呢?其實是在獲取任務(wù)的時候,會檢測當(dāng)前狀態(tài)是否是 SHUTDOWN, 如果是SHUTDOWN且 隊列為空,則會觸發(fā)獲取任務(wù)的返回null.從而結(jié)束當(dāng)前 Worker.

          Worker 在結(jié)束前會調(diào)用 processWorkerExit() 方法,里面會再次調(diào)用 tryTerminate(), 當(dāng)所有 Worker 都運行到這個點后, awaitTermination() 就會收到通知了。(注意: processWorkerExit() 會在每次運行后進(jìn)行 addWorker() 嘗試,但是在 SHUTDOWN 狀態(tài)的添加操作總是失敗的,所以不用考慮)

          到此,你是否可以解答前面的幾個問題了呢?




          往期精彩推薦



          騰訊、阿里、滴滴后臺面試題匯總總結(jié) — (含答案)

          面試:史上最全多線程面試題 !

          最新阿里內(nèi)推Java后端面試題

          JVM難學(xué)?那是因為你沒認(rèn)真看完這篇文章


          END


          關(guān)注作者微信公眾號 —《JAVA爛豬皮》


          了解更多java后端架構(gòu)知識以及最新面試寶典


          你點的每個好看,我都認(rèn)真當(dāng)成了


          看完本文記得給作者點贊+在看哦~~~大家的支持,是作者源源不斷出文的動力


          作者:等你歸去來

          出處:https://www.cnblogs.com/yougewe/p/12267274.html

          瀏覽 48
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  久久久精品影院av | 俺也去成人视频 | 亚洲综合成人在线 | 欧洲亚洲免费视频 | 日本午夜一级理论片 |