<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          肝完這篇線程池,我咳血了

          共 3932字,需瀏覽 8分鐘

           ·

          2021-02-12 07:30

          點(diǎn)擊藍(lán)色“程序員cxuan?”關(guān)注我喲

          加個(gè)“星標(biāo)”,及時(shí)接收最新文章

          這是程序員cxuan 的第?59?篇原創(chuàng)文章

          更多文章見?

          https://github.com/crisxuan/bestJavaer



          我們知道,線程需要的時(shí)候要進(jìn)行創(chuàng)建,不需要的時(shí)候需要進(jìn)行銷毀,但是線程的創(chuàng)建和銷毀都是一個(gè)開銷比較大的操作。

          為什么開銷大呢?

          雖然我們程序員創(chuàng)建一個(gè)線程很容易,直接使用 new Thread() 創(chuàng)建就可以了,但是操作系統(tǒng)做的工作會(huì)多很多,它需要發(fā)出 系統(tǒng)調(diào)用,陷入內(nèi)核,調(diào)用內(nèi)核 API 創(chuàng)建線程,為線程分配資源等,這一些操作有很大的開銷。

          所以,在高并發(fā)大流量的情況下,頻繁的創(chuàng)建和銷毀線程會(huì)大大拖慢響應(yīng)速度,那么有什么能夠提高響應(yīng)速度的方式嗎?方式有很多,盡量避免線程的創(chuàng)建和銷毀是一種提升性能的方式,也就是把線程 復(fù)用 起來,因?yàn)樾阅苁俏覀內(nèi)粘W铌P(guān)注的因素。

          本篇文章我們先來通過認(rèn)識(shí)一下 Executor 框架、然后通過描述線程池的基本概念入手、逐步認(rèn)識(shí)線程池的核心類,然后慢慢進(jìn)入線程池的原理中,帶你一步一步理解線程池。

          在 Java 中可以通過線程池來達(dá)到這樣的效果。今天我們就來詳細(xì)講解一下 Java 的線程池

          Executor 框架

          為什么要先說一下 Executor 呢?因?yàn)槲艺J(rèn)為 Executor 是線程池的一個(gè)驅(qū)動(dòng),我們平常創(chuàng)建并執(zhí)行線程用的一般都是 new Thread().start() 這個(gè)方法,這個(gè)方法更多強(qiáng)調(diào) 創(chuàng)建一個(gè)線程并開始運(yùn)行。而我們后面講到創(chuàng)建線程池更多體現(xiàn)在驅(qū)動(dòng)執(zhí)行上。

          Executor 的總體框架如下,我們下面會(huì)對(duì) Executor 框架中的每個(gè)類進(jìn)行介紹。

          我們首先來認(rèn)識(shí)一下 Executor

          Executor 接口

          Executor 是 java.util.concurrent 的頂級(jí)接口,這個(gè)接口只有一個(gè)方法,那就是 execute 方法。我們平常創(chuàng)建并啟動(dòng)線程會(huì)使用 new Thread().start() ,而 Executor 中的 execute 方法替代了顯示創(chuàng)建線程的方式。Executor 的設(shè)計(jì)初衷就是將任務(wù)提交和任務(wù)執(zhí)行細(xì)節(jié)進(jìn)行解藕。使用 Executor 框架,你可以使用如下的方式創(chuàng)建線程

          Executor?executor?=?Executors.xxx?//?xxx?其實(shí)就是?Executor?的實(shí)現(xiàn)類,我們后面會(huì)說
          executor.execute(new?RunnableTask1());
          executor.execute(new?RunnableTask2());

          execute方法接收一個(gè) Runnable 實(shí)例,它用來執(zhí)行一個(gè)任務(wù),而任務(wù)就是一個(gè)實(shí)現(xiàn)了 Runnable 接口的類,但是 execute 方法不能接收實(shí)現(xiàn)了 Callable 接口的類,也就是說,execute 方法不能接收具有返回值的任務(wù)。

          execute 方法創(chuàng)建的線程是異步執(zhí)行的,也就是說,你不用等待每個(gè)任務(wù)執(zhí)行完畢后再執(zhí)行下一個(gè)任務(wù)。

          比如下面就是一個(gè)簡(jiǎn)單的使用 Executor 創(chuàng)建并執(zhí)行線程的示例

          public?class?RunnableTask?implements?Runnable{

          ????@Override
          ????public?void?run()?{
          ????????System.out.println("running");
          ????}

          ????public?static?void?main(String[]?args)?{
          ????????Executor?executor?=?Executors.newSingleThreadExecutor();?//?你可能不太理解這是什么意思,我們后面會(huì)說。
          ????????executor.execute(new?RunnableTask());
          ????}
          }

          Executor 就相當(dāng)于是族長(zhǎng),大佬只發(fā)號(hào)令,族長(zhǎng)讓你異步執(zhí)行你就得異步執(zhí)行,族長(zhǎng)說不用匯報(bào)任務(wù)你就不用回報(bào),但是這個(gè)族長(zhǎng)管的事情有點(diǎn)少,所以除了 Executor 之外,我們還需要認(rèn)識(shí)其他管家,比如說管你這個(gè)線程啥時(shí)候終止,啥時(shí)候暫停,判斷你這個(gè)線程當(dāng)前的狀態(tài)等,ExecutorService 就是一位大管家。

          ExecutorService 接口

          ExecutorService 也是一個(gè)接口,它是 Executor 的拓展,提供了一些 Executor 中沒有的方法,下面我們來介紹一下這些方法

          void?shutdown();

          shutdown 方法調(diào)用后,ExecutorService 會(huì)有序關(guān)閉正在執(zhí)行的任務(wù),但是不接受新任務(wù)。如果任務(wù)已經(jīng)關(guān)閉,那么這個(gè)方法不會(huì)產(chǎn)生任何影響。

          ExecutorService 還有一個(gè)和 shutdown 方法類似的方法是

          List?shutdownNow();

          shutdownNow 會(huì)嘗試停止關(guān)閉所有正在執(zhí)行的任務(wù),停止正在等待的任務(wù),并返回正在等待執(zhí)行的任務(wù)列表。

          既然 shutdown 和 shutdownNow 這么相似,那么二者有啥區(qū)別呢?

          • shutdown 方法只是會(huì)將線程池的狀態(tài)設(shè)置為 SHUTWDOWN ,正在執(zhí)行的任務(wù)會(huì)繼續(xù)執(zhí)行下去,線程池會(huì)等待任務(wù)的執(zhí)行完畢,而沒有執(zhí)行的線程則會(huì)中斷。
          • shutdownNow 方法會(huì)將線程池的狀態(tài)設(shè)置為 STOP,正在執(zhí)行和等待的任務(wù)則被停止,返回等待執(zhí)行的任務(wù)列表

          ExecutorService 還有三個(gè)判斷線程狀態(tài)的方法,分別是

          boolean?isShutdown();
          boolean?isTerminated();
          boolean?awaitTermination(long?timeout,?TimeUnit?unit)
          ????????throws?InterruptedException
          ;
          • isShutdown 方法表示執(zhí)行器是否已經(jīng)關(guān)閉,如果已經(jīng)關(guān)閉,返回 true,否則返回 false。
          • isTerminated 方法表示判斷所有任務(wù)再關(guān)閉后是否已完成,如果完成返回 false,這個(gè)需要注意一點(diǎn),除非首先調(diào)用 shutdown 或者 shutdownNow 方法,否則 isTerminated 方法永遠(yuǎn)不會(huì)為 true。
          • awaitTermination 方法會(huì)阻塞,直到發(fā)出調(diào)用 shutdown 請(qǐng)求后所有的任務(wù)已經(jīng)完成執(zhí)行后才會(huì)解除。這個(gè)方法不是非常容易理解,下面通過一個(gè)小例子來看一下。
          public?static?ExecutorService?executorService?=?Executors.newFixedThreadPool(10);

          public?static?void?main(String[]?args)?throws?InterruptedException?{
          ??for?(int?i?=?0;?i?10;?i++)?{
          ????executorService.submit(()?->?{
          ??????System.out.println(Thread.currentThread().getName());
          ??????try?{
          ????????Thread.sleep(10);
          ??????}?catch?(InterruptedException?e)?{
          ????????e.printStackTrace();
          ??????}
          ????});

          ??}

          ??executorService.shutdown();
          ??System.out.println("Waiting...");
          ??boolean?isTermination?=?executorService.awaitTermination(3,?TimeUnit.SECONDS);
          ??System.out.println("Waiting...Done");
          ??if(isTermination){
          ????System.out.println("All?Thread?Done");
          ??}
          ??System.out.println(Thread.currentThread().getName());
          }

          如果在調(diào)用 executorService.shutdown() 之后,所有線程完成任務(wù),isTermination 返回 true,程序才會(huì)打印出 All Thread Done ,如果注釋掉 executorService.shutdown() 或者在任務(wù)沒有完成后 awaitTermination 就超時(shí)了,那么 isTermination 就會(huì)返回 false。

          ExecutorService 當(dāng)大管家還有一個(gè)原因是因?yàn)樗粌H能夠包容 Runnable 對(duì)象,還能夠接納 Callable 對(duì)象。在 ExecutorService 中,submit 方法扮演了這個(gè)角色。

          ?Future?submit(Callable?task);
          ?Future?submit(Runnable?task,?T?result);
          Future?submit(Runnable?task);

          submit 方法會(huì)返回一個(gè) Future對(duì)象, 表示范型,它是對(duì) Callable 產(chǎn)生的返回值來說的,submit 方法提交的任務(wù)中的 call 方法如果返回 Integer,那么 submit 方法就返回 Future,依此類推。

          ?List>?invokeAll(Collection>?tasks)
          ????????throws?InterruptedException;
          ?List>?invokeAll(Collection>?tasks,
          ??????????????????????????????????long?timeout,?TimeUnit?unit)
          ????????throws?InterruptedException;

          invokeAll 方法用于執(zhí)行給定的任務(wù)結(jié)合,執(zhí)行完成后會(huì)返回一個(gè)任務(wù)列表,任務(wù)列表每一項(xiàng)是一個(gè)任務(wù),每個(gè)任務(wù)會(huì)包括任務(wù)狀態(tài)和執(zhí)行結(jié)果,同樣 invokeAll 方法也會(huì)返回 Future 對(duì)象。

          ?T?invokeAny(Collection>?tasks)
          ????????throws?InterruptedException,?ExecutionException
          ;
          ?T?invokeAny(Collection>?tasks,
          ????????????????????long?timeout,?TimeUnit?unit)

          ????????throws?InterruptedException,?ExecutionException,?TimeoutException
          ;

          invokeAny 會(huì)獲得最先完成任務(wù)的結(jié)果,即Callable 接口中的 call 的返回值,在獲得結(jié)果時(shí),會(huì)中斷其他正在執(zhí)行的任務(wù),具有阻塞性

          大管家的職責(zé)相對(duì)于組長(zhǎng)來說標(biāo)準(zhǔn)更多,管的事情也比較寬,但是大管家畢竟也是家族的中流砥柱,他不會(huì)做具體的活,他的下面有各個(gè)干將,干將是一個(gè)家族的核心,他負(fù)責(zé)完成大管家的工作。

          AbstractExecutorService 抽象類

          AbstractExecutorService 是一個(gè)抽象類,它實(shí)現(xiàn)了 ExecutorService 中的部分方法,它相當(dāng)一個(gè)干將,會(huì)分析大管家有哪些要做的工作,然后針對(duì)大管家的要求做一些具體的規(guī)劃,然后找他的得力助手 ThreadPoolExecutor 來完成目標(biāo)。

          AbstractExecutorService 這個(gè)抽象類主要實(shí)現(xiàn)了 invokeAllinvokeAny 方法,關(guān)于這兩個(gè)方法的源碼分析我們會(huì)在后面進(jìn)行解釋。

          ScheduledExecutorService 接口

          ScheduledExecutorService 也是一個(gè)接口,它擴(kuò)展了 ExecutorService 接口,提供了 ExecutorService 接口所沒有的功能,ScheduledExecutorService 顧名思義就是一個(gè)定時(shí)執(zhí)行器,定時(shí)執(zhí)行器可以安排命令在一定延遲時(shí)間后運(yùn)行或者定期執(zhí)行。

          它主要有三個(gè)接口方法,一個(gè)重載方法。下面我們先來看一下這兩個(gè)重載方法。

          public?ScheduledFuture?schedule(Runnable?command,
          ???????????????????????????????????????long?delay,?TimeUnit?unit);
          public??ScheduledFuture?schedule(Callable?callable,
          ???????????????????????????????????????????long?delay,?TimeUnit?unit)
          ;

          schedule 方法能夠延遲一定時(shí)間后執(zhí)行任務(wù),并且只能執(zhí)行一次。可以看到,schedule 方法也返回了一個(gè) ScheduledFuture 對(duì)象,ScheduledFuture 對(duì)象擴(kuò)展了 Future 和 Delayed 接口,它表示異步延遲計(jì)算的結(jié)果。schedule 方法支持零延遲和負(fù)延遲,這兩類值都被視為立即執(zhí)行任務(wù)。

          還有一點(diǎn)需要說明的是,schedule 方法能夠接收相對(duì)的時(shí)間和周期作為參數(shù),而不是固定的日期,你可以使用 date.getTime - System.currentTimeMillis() 來得到相對(duì)的時(shí)間間隔。

          public?ScheduledFuture?scheduleAtFixedRate(Runnable?command,
          ??????????????????????????????????????????????????long?initialDelay,
          ??????????????????????????????????????????????????long?period,
          ??????????????????????????????????????????????????TimeUnit?unit);

          scheduleAtFixedRate 表示任務(wù)會(huì)根據(jù)固定的速率在時(shí)間 initialDelay 后不斷地執(zhí)行。

          public?ScheduledFuture?scheduleWithFixedDelay(Runnable?command,
          ?????????????????????????????????????????????????????long?initialDelay,
          ?????????????????????????????????????????????????????long?delay,
          ?????????????????????????????????????????????????????TimeUnit?unit);

          這個(gè)方法和上面的方法很類似,它表示的是以固定延遲時(shí)間的方式來執(zhí)行任務(wù)。

          scheduleAtFixedRate 和 scheduleWithFixedDelay 這兩個(gè)方法容易混淆,下面我們通過一個(gè)示例來說明一下這兩個(gè)方法的區(qū)別。

          public?class?ScheduleTest?{

          ????public?static?void?main(String[]?args)?{
          ????????Runnable?command?=?()?->?{
          ????????????long?startTime?=?System.currentTimeMillis();
          ????????????System.out.println("current?timestamp?=?"?+?startTime);
          ????????????try?{
          ????????????????TimeUnit.MILLISECONDS.sleep(new?Random().nextInt(100));
          ????????????}?catch?(InterruptedException?e)?{
          ????????????????e.printStackTrace();
          ????????????}
          ????????????System.out.println("time?spend?=?"?+?(System.currentTimeMillis()?-?startTime));
          ????????};

          ????????ScheduledExecutorService?scheduledExecutorService?=?Executors.newScheduledThreadPool(10);
          ????????scheduledExecutorService.scheduleAtFixedRate(command,100,1000,TimeUnit.MILLISECONDS);
          ????}
          }

          輸出結(jié)果大致如下

          可以看到,每次打印出來 current timestamp 的時(shí)間間隔大約等于 1000 毫秒,所以可以斷定 scheduleAtFixedRate 是以恒定的速率來執(zhí)行任務(wù)的。

          然后我們?cè)倏匆幌?scheduleWithFixedDelay 方法,和上面測(cè)試類一樣,只不過我們把 scheduleAtFixedRate 換為了 scheduleWithFixedDelay 。

          scheduledExecutorService.scheduleWithFixedDelay(command,10,1000,TimeUnit.MILLISECONDS);

          然后觀察一下輸出結(jié)果

          可以看到,兩個(gè) current timestamp 之間的間隔大約等于 1000(固定時(shí)間) + delay(time spend) 的總和,由此可以確定 scheduleWithFixedDelay 是以固定時(shí)延來執(zhí)行的。

          線程池的描述

          下面我們先來認(rèn)識(shí)一下什么是線程池,線程池從概念上來看就是一個(gè)池子,什么池子呢?是指管理同一組工作線程的池子,也就是說,線程池會(huì)統(tǒng)一管理內(nèi)部的工作線程。

          wiki 上說,線程池其實(shí)就是一種軟件設(shè)計(jì)模式,這種設(shè)計(jì)模式用于實(shí)現(xiàn)計(jì)算機(jī)程序中的并發(fā)。

          比如下面就是一個(gè)簡(jiǎn)單的線程池概念圖。

          注意:這個(gè)圖只是一個(gè)概念模型,不是真正的線程池實(shí)現(xiàn),希望讀者不要混淆。

          可以看到,這種其實(shí)也相當(dāng)于是生產(chǎn)者-消費(fèi)者模型,任務(wù)隊(duì)列中的線程會(huì)進(jìn)入到線程池中,由線程池進(jìn)行管理,線程池中的一個(gè)個(gè)線程就是工作線程,工作線程執(zhí)行完畢后會(huì)放入完成隊(duì)列中,代表已經(jīng)完成的任務(wù)。

          上圖有個(gè)缺點(diǎn),那就是隊(duì)列中的線程執(zhí)行完畢后就會(huì)銷毀,銷毀就會(huì)產(chǎn)生性能損耗,降低響應(yīng)速度,而我們使用線程池的目的往往是需要把線程重用起來,提高程序性能。

          所以我們應(yīng)該把執(zhí)行完成后的工作線程重新利用起來,等待下一次使用。

          線程池創(chuàng)建

          我們上面大概聊了一下什么線程池的基本執(zhí)行機(jī)制,你知道了線程是如何復(fù)用的,那么任何事物不可能是憑空出現(xiàn)的,線程也一樣,那么它是如何創(chuàng)建出來的呢?下面就不得不提一個(gè)工具類,那就是 Executors

          Executors 也是java.util.concurrent 包下的成員,它是一個(gè)創(chuàng)建線程池的工廠,可以使用靜態(tài)工廠方法來創(chuàng)建線程池,下面就是 Executors 所能夠創(chuàng)建線程池的具體類型。

          • newFixedThreadPool:newFixedThreadPool 將會(huì)創(chuàng)建固定數(shù)量的線程池,這個(gè)數(shù)量可以由程序員通過創(chuàng)建 Executors.newFixedThreadPool(int nThreads)時(shí)手動(dòng)指定,每次提交一個(gè)任務(wù)就會(huì)創(chuàng)建一個(gè)線程,在任何時(shí)候,nThreads 的值是最多允許活動(dòng)的線程。如果在所有線程都處于活躍狀態(tài)時(shí)有額外的任務(wù)被創(chuàng)建,這些新創(chuàng)建的線程會(huì)進(jìn)入等待隊(duì)列等待線程調(diào)度。如果有任何線程由于執(zhí)行期間出現(xiàn)意外導(dǎo)致線程終止,那么在執(zhí)行后續(xù)任務(wù)時(shí)會(huì)使用等待隊(duì)列中的線程進(jìn)行替代。

          • newWorkStealingPool:newWorkStealingPool 是 JDK1.8 新增加的線程池,它是基于 fork-join 機(jī)制的一種線程池實(shí)現(xiàn),使用了 Work-Stealing 算法。newWorkStealingPool 會(huì)創(chuàng)建足夠的線程來支持并行度,會(huì)使用多個(gè)隊(duì)列來減少競(jìng)爭(zhēng)。work-stealing pool 線程池不會(huì)保證提交任務(wù)的執(zhí)行順序。

          • newSingleThreadExecutor:newSingleThreadExecutor 是一個(gè)單線程的執(zhí)行器,它只會(huì)創(chuàng)建單個(gè)線程來執(zhí)行任務(wù),如果這個(gè)線程異常結(jié)束,則會(huì)創(chuàng)建另外一個(gè)線程來替代。newSingleThreadExecutor 會(huì)確保任務(wù)在任務(wù)隊(duì)列中的執(zhí)行次序,也就是說,任務(wù)的執(zhí)行是 有序的

          • newCachedThreadPool:newCachedThreadPool 會(huì)根據(jù)實(shí)際需要?jiǎng)?chuàng)建一個(gè)可緩存的線程池。如果線程池的線程數(shù)量超過實(shí)際需要處理的任務(wù),那么 newCachedThreadPool 將會(huì)回收多余的線程。如果實(shí)際需要處理的線程不能滿足任務(wù)的數(shù)量,則回你添加新的線程到線程池中,線程池中線程的數(shù)量不存在任何限制。

          • newSingleThreadScheduledExecutor:newSingleThreadScheduledExecutor 和 newSingleThreadExecutor 很類似,只不過帶有 scheduled 的這個(gè)執(zhí)行器哥們能夠在一定延遲后執(zhí)行或者定期執(zhí)行任務(wù)。

          • newScheduledThreadPool:這個(gè)線程池和上面的 scheduled 執(zhí)行器類似,只不過 newSingleThreadScheduledExecutor 比 newScheduledThreadPool 多加了一個(gè) DelegatedScheduledExecutorService 代理,這其實(shí)包裝器設(shè)計(jì)模式的體現(xiàn)。

          上面這些線程池的底層實(shí)現(xiàn)都是由 ThreadPoolExecutor 來提供支持的,所以要理解這些線程池的工作原理,你就需要先把 ThreadPoolExecutor 搞明白,下面我們就來聊一聊 ThreadPoolExecutor。

          ThreadPoolExecutor 類

          ThreadPoolExecutor 位于 java.util.concurrent 工具類下,可以說它是線程池中最核心的一個(gè)類了。如果你要想把線程池理解透徹的話,就要首先了解一下這個(gè)類。

          如果我們?cè)倌蒙厦婕易迮e例子的話,ThreadPoolExecutor 就是一個(gè)家族的骨干人才,家族頂梁柱。ThreadPoolExecutor 做的工作真是太多太多了。

          首先,ThreadPoolExecutor 提供了四個(gè)構(gòu)造方法,然而前三個(gè)構(gòu)造方法最終都會(huì)調(diào)用最后一個(gè)構(gòu)造方法進(jìn)行初始化

          public?class?ThreadPoolExecutor?extends?AbstractExecutorService?{
          ????.....
          ??????//?1
          ????public?ThreadPoolExecutor(int?corePoolSize,int?maximumPoolSize,long?keepAliveTime,TimeUnit?unit,
          ????????????BlockingQueue?workQueue)
          ;
          ????//?2
          ????public?ThreadPoolExecutor(int?corePoolSize,int?maximumPoolSize,long?keepAliveTime,TimeUnit?unit,
          ????????????BlockingQueue?workQueue,ThreadFactory?threadFactory)
          ;
          ????//?3
          ????public?ThreadPoolExecutor(int?corePoolSize,int?maximumPoolSize,long?keepAliveTime,TimeUnit?unit,
          ????????????BlockingQueue?workQueue,RejectedExecutionHandler?handler)
          ;
          ????//?4
          ????public?ThreadPoolExecutor(int?corePoolSize,int?maximumPoolSize,long?keepAliveTime,TimeUnit?unit,
          ????????BlockingQueue?workQueue,ThreadFactory?threadFactory,RejectedExecutionHandler?handler)
          ;
          ????...
          }

          所以我們直接就來看一波最后這個(gè)線程池,看看參數(shù)都有啥,如果我沒數(shù)錯(cuò)的話,應(yīng)該是有 7 個(gè)參數(shù)(小學(xué)數(shù)學(xué)水平。。。。。。)

          • 首先,一個(gè)非常重要的參數(shù)就是 corePoolSize,核心線程池的容量/大小,你叫啥我覺得都沒毛病。只不過你得理解這個(gè)參數(shù)的意義,它和線程池的實(shí)現(xiàn)原理有非常密切的關(guān)系。你剛開始創(chuàng)建了一個(gè)線程池,此時(shí)是沒有任何線程的,這個(gè)很好理解,因?yàn)槲椰F(xiàn)在沒有任務(wù)可以執(zhí)行啊,創(chuàng)建線程干啥啊?而且創(chuàng)建線程還有開銷啊,所以等到任務(wù)過來時(shí)再創(chuàng)建線程也不晚。但是!我要說但是了,如果調(diào)用了 prestartAllCoreThreads 或者 prestartCoreThread 方法,就會(huì)在沒有任務(wù)到來時(shí)創(chuàng)建線程,前者是創(chuàng)建 corePoolSize 個(gè)線程,后者是只創(chuàng)建一個(gè)線程。Lea 爺爺本來想讓我們程序員當(dāng)個(gè)懶漢,等任務(wù)來了再干;可是你非要當(dāng)個(gè)餓漢,提前完成任務(wù)。如果我們想當(dāng)個(gè)懶漢的話,在創(chuàng)建了線程池后,線程池中的線程數(shù)為 0,當(dāng)有任務(wù)來之后,就會(huì)創(chuàng)建一個(gè)線程去執(zhí)行任務(wù),當(dāng)線程池中的線程數(shù)目達(dá)到 corePoolSize 后,就會(huì)把到達(dá)的任務(wù)放到緩存隊(duì)列當(dāng)中。

          • maximumPoolSize :又來一個(gè)線程池的容量,只不過這個(gè)是線程池的最大容量,也就是線程池所能容納最大的線程,而上面的 corePoolSize 只是核心線程容量。

          我知道你此時(shí)會(huì)有疑問,那就是不知道如何核心線程的容量和線程最大容量的區(qū)別是吧?我們后面會(huì)解釋這點(diǎn)。

          • keepAliveTime:這個(gè)參數(shù)是線程池的保活機(jī)制,表示線程在沒有任務(wù)執(zhí)行的情況下保持多久會(huì)終止。在默認(rèn)情況下,這個(gè)參數(shù)只在線程數(shù)量大于 corePoolSize 時(shí)才會(huì)生效。當(dāng)線程數(shù)量大于 corePoolSize 時(shí),如果任意一個(gè)空閑的線程的等待時(shí)間 > keepAliveTime 后,那么這個(gè)線程會(huì)被剔除,直到線程數(shù)量等于 corePoolSize 為止。如果調(diào)用了 allowCoreThreadTimeOut 方法,線程數(shù)量在 corePoolSize 范圍內(nèi)也會(huì)生效,直到線程減為 0。

          • unit :這個(gè)參數(shù)好說,它就是一個(gè) TimeUnit 的變量,unit 表示的是 keepAliveTime 的時(shí)間單位。unit 的類型有下面這幾種

            TimeUnit.DAYS;???????????????//天
            TimeUnit.HOURS;?????????????//小時(shí)
            TimeUnit.MINUTES;???????????//分鐘
            TimeUnit.SECONDS;???????????//秒
            TimeUnit.MILLISECONDS;??????//毫秒
            TimeUnit.MICROSECONDS;??????//微妙
            TimeUnit.NANOSECONDS;???????//納秒
          • workQueue:這個(gè)參數(shù)表示的概念就是等待隊(duì)列,我們上面說過,如果核心線程 > corePoolSize 的話,就會(huì)把任務(wù)放入等待隊(duì)列,這個(gè)等待隊(duì)列的選擇也是一門學(xué)問。Lea 爺爺給我們展示了三種等待隊(duì)列的選擇

            • SynchronousQueue: 基于阻塞隊(duì)列(BlockingQueue)的實(shí)現(xiàn),它會(huì)直接將任務(wù)交給消費(fèi)者,必須等隊(duì)列中的添加元素被消費(fèi)后才能繼續(xù)添加新的元素。使用 SynchronousQueue 阻塞隊(duì)列一般要求maximumPoolSizes 為無界,也就是 Integer.MAX_VALUE,避免線程拒絕執(zhí)行操作。
            • LinkedBlockingQueue:LinkedBlockingQueue 是一個(gè)無界緩存等待隊(duì)列。當(dāng)前執(zhí)行的線程數(shù)量達(dá)到 corePoolSize 的數(shù)量時(shí),剩余的元素會(huì)在阻塞隊(duì)列里等待。
            • ArrayBlockingQueue:ArrayBlockingQueue 是一個(gè)有界緩存等待隊(duì)列,可以指定緩存隊(duì)列的大小,當(dāng)正在執(zhí)行的線程數(shù)等于 corePoolSize 時(shí),多余的元素緩存在 ArrayBlockingQueue 隊(duì)列中等待有空閑的線程時(shí)繼續(xù)執(zhí)行,當(dāng) ArrayBlockingQueue 已滿時(shí),加入 ArrayBlockingQueue 失敗,會(huì)開啟新的線程去執(zhí)行,當(dāng)線程數(shù)已經(jīng)達(dá)到最大的 maximumPoolSizes 時(shí),再有新的元素嘗試加入 ArrayBlockingQueue時(shí)會(huì)報(bào)錯(cuò)
          • threadFactory:線程工廠,這個(gè)參數(shù)主要用來創(chuàng)建線程;

          • handler :拒絕策略,拒絕策略主要有以下取值

            • AbortPolicy:丟棄任務(wù)并拋出 RejectedExecutionException 異常。
            • DiscardPolicy: 直接丟棄任務(wù),但是不拋出異常。
            • DiscardOldestPolicy:直接丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程)。
            • CallerRunsPolicy:由調(diào)用線程處理該任務(wù)。

          深入理解線程池

          上面我和你簡(jiǎn)單聊了一下線程池的基本構(gòu)造,線程池有幾個(gè)非常重要的參數(shù)可以細(xì)細(xì)品味,但是哥們醒醒,接下來才是刺激的地方。

          線程池狀態(tài)

          首先我們先來聊聊線程池狀態(tài),線程池狀態(tài)是一個(gè)非常有趣的設(shè)計(jì)點(diǎn),ThreadPoolExecutor 使用 ctl 來存儲(chǔ)線程池狀態(tài),這些狀態(tài)也叫做線程池的生命周期。想想也是,線程池作為一個(gè)存儲(chǔ)管理線程的資源池,它自己也要有這些狀態(tài),以及狀態(tài)之間的變更才能更好的滿足我們的需求。ctl 其實(shí)就是一個(gè) AtomicInteger 類型的變量,保證原子性

          ctl 除了存儲(chǔ)線程池狀態(tài)之外,它還存儲(chǔ) workerCount 這個(gè)概念,workerCount 指示的是有效線程數(shù),workerCount 表示的是已經(jīng)被允許啟動(dòng)但不允許停止的工作線程數(shù)量。workerCount 的值與實(shí)際活動(dòng)線程的數(shù)量不同。

          ctl 高低位來判斷是線程池狀態(tài)還是工作線程數(shù)量,線程池狀態(tài)位于高位

          這里有個(gè)設(shè)計(jì)點(diǎn),為什么使用 AtomicInteger 而不是存儲(chǔ)上線更大的 AtomicLong 之類的呢?

          Lea 并非沒有考慮過這個(gè)問題,為了表示 int 值,目前 workerCount 的大小是**(2 ^ 29)-1(約 5 億個(gè)線程),而不是(2 ^ 31)-1(20億個(gè))可表示的線程**。如果將來有問題,可以將該變量更改為 AtomicLong。但是在需要之前,使用 int 可以使此代碼更快,更簡(jiǎn)單,int 存儲(chǔ)占用存儲(chǔ)空間更小。

          runState 具有如下幾種狀態(tài)

          private?static?final?int?RUNNING????=?-1?<private?static?final?int?SHUTDOWN???=??0?<private?static?final?int?STOP???????=??1?<private?static?final?int?TIDYING????=??2?<private?static?final?int?TERMINATED?=??3?<

          我們先上狀態(tài)輪轉(zhuǎn)圖,然后根據(jù)狀態(tài)輪轉(zhuǎn)圖做詳細(xì)的解釋。

          這幾種狀態(tài)的解釋如下

          • RUNNING: 如果線程池處于 RUNNING 狀態(tài)下的話,能夠接收新任務(wù),也能處理正在運(yùn)行的任務(wù)。可以從 ctl 的初始化得知,線程池一旦創(chuàng)建出來就會(huì)處于 RUNNING 狀態(tài),并且線程池中的有效線程數(shù)為 0。
          private?final?AtomicInteger?ctl?=?new?AtomicInteger(ctlOf(RUNNING,?0));
          • SHUTDOWN: 在調(diào)用 shutdown 方法后,線程池的狀態(tài)會(huì)由 RUNNING -> SHUTDOWN 狀態(tài),位于 SHUTDOWN 狀態(tài)的線程池能夠處理正在運(yùn)行的任務(wù),但是不能接受新的任務(wù),這和我們上面說的對(duì)于 shutdown 的描述一致。
          • STOP: 和 shutdown 方法類似,在調(diào)用 shutdownNow 方法時(shí),程序會(huì)從 RUNNING/SHUTDOWN -> STOP 狀態(tài),處于 STOP 狀態(tài)的線程池,不接收新任務(wù),不處理已添加的任務(wù),并且會(huì)中斷正在處理的任務(wù)。
          • TIDYING:TIDYING 狀態(tài)有個(gè)前置條件,分為兩種:一種是是當(dāng)線程池位于 SHUTDOWN 狀態(tài)下,阻塞隊(duì)列和線程池中的線程數(shù)量為空時(shí),會(huì)由 SHUTDOWN -> TIDYING;另一種是當(dāng)線程池位于 STOP 狀態(tài)下時(shí),線程池中的數(shù)量為空時(shí),會(huì)由 STOP -> TIDYING 狀態(tài)。轉(zhuǎn)換為 TIDYING 的線程池會(huì)調(diào)用 terminated這個(gè)鉤子方法,terminated 在 ThreadPoolExecutor 類中是空實(shí)現(xiàn),若用戶想在線程池變?yōu)?TIDYING 時(shí),進(jìn)行相應(yīng)的處理,可以通過重載 terminated 函數(shù)來實(shí)現(xiàn)。
          • TERMINATED:TERMINATED 狀態(tài)是線程池的最后一個(gè)狀態(tài),線程池處在 TIDYING 狀態(tài)時(shí),執(zhí)行完terminated 方法之后,就會(huì)由 TIDYING -> TERMINATED 狀態(tài)。此時(shí)表示線程池的徹底終止。

          重要變量

          下面我們一起來了解一下線程池中的重要變量。

          private?final?BlockingQueue?workQueue;

          阻塞隊(duì)列,這個(gè)和我們上面說的阻塞隊(duì)列的參數(shù)是一個(gè)意思,因?yàn)樵跇?gòu)造 ThreadPoolExecutor 時(shí),會(huì)把參數(shù)的值賦給 this.workQueue。

          private?final?ReentrantLock?mainLock?=?new?ReentrantLock();?

          線程池的主要狀態(tài)鎖,對(duì)線程池的狀態(tài)(比如線程池大小、運(yùn)行狀態(tài))的改變都需要使用到這個(gè)鎖

          private?final?HashSet?workers?=?new?HashSet();

          workers 持有線程池中所有線程的集合,只有持有上面 mainLock 的鎖才能夠訪問。

          private?final?Condition?termination?=?mainLock.newCondition();

          等待條件,用來支持 awaitTermination 方法。Condition 和 Lock 一起使用可以實(shí)現(xiàn)通知/等待機(jī)制。

          private?int?largestPoolSize;

          largestPoolSize 表示線程池中最大池的大小,只有持有 mainLock 才能訪問

          private?long?completedTaskCount;

          completedTaskCount 表示任務(wù)完成的計(jì)數(shù),它僅僅在任務(wù)終止時(shí)更新,需要持有 mainLock 才能訪問。

          private?volatile?ThreadFactory?threadFactory;

          threadFactory 是創(chuàng)建線程的工廠,所有的線程都會(huì)使用這個(gè)工廠,調(diào)用 addWorker 方法創(chuàng)建。

          private?volatile?RejectedExecutionHandler?handler;

          handler 表示拒絕策略,handler 會(huì)在線程飽和或者將要關(guān)閉的時(shí)候調(diào)用。

          private?volatile?long?keepAliveTime;

          保活時(shí)間,它指的是空閑線程等待工作的超時(shí)時(shí)間,當(dāng)存在多個(gè) corePoolSize 或 allowCoreThreadTimeOut 時(shí),線程將使用這個(gè)超時(shí)時(shí)間。

          下面是一些其他變量,這些變量比較簡(jiǎn)單,我就直接給出注釋了。

          private?volatile?boolean?allowCoreThreadTimeOut;???//是否允許為核心線程設(shè)置存活時(shí)間
          private?volatile?int???corePoolSize;?????//核心池的大小(即線程池中的線程數(shù)目大于這個(gè)參數(shù)時(shí),提交的任務(wù)會(huì)被放進(jìn)任務(wù)緩存隊(duì)列)
          private?volatile?int???maximumPoolSize;???//線程池最大能容忍的線程數(shù)
          private?static?final?RejectedExecutionHandler?defaultHandler?=
          ????????new?AbortPolicy();?//?默認(rèn)的拒絕策略

          任務(wù)提交

          現(xiàn)在我們知道了 ThreadPoolExecutor 創(chuàng)建出來就會(huì)處于運(yùn)行狀態(tài),此時(shí)線程數(shù)量為 0 ,等任務(wù)到來時(shí),線程池就會(huì)創(chuàng)建線程來執(zhí)行任務(wù),而下面我們的關(guān)注點(diǎn)就會(huì)放在任務(wù)提交這個(gè)過程上。

          通常情況下,我們會(huì)使用

          executor.execute()?

          來執(zhí)行任務(wù),我在很多書和博客教程上都看到過這個(gè)執(zhí)行過程,下面是一些書和博客教程所畫的 ThreadPoolExecutor 的執(zhí)行示意圖和執(zhí)行流程圖

          執(zhí)行示意圖

          處理流程圖

          ThreadPoolExecutor 的執(zhí)行 execute 的方法分為下面四種情況

          1. 如果當(dāng)前運(yùn)行的工作線程少于 corePoolSize 的話,那么會(huì)創(chuàng)建新線程來執(zhí)行任務(wù) ,這一步需要獲取 mainLock 全局鎖
          2. 如果運(yùn)行線程不小于 corePoolSize,則將任務(wù)加入 BlockingQueue 阻塞隊(duì)列。
          3. 如果無法將任務(wù)加入 BlockingQueue 中,此時(shí)的現(xiàn)象就是隊(duì)列已滿,此時(shí)需要?jiǎng)?chuàng)建新的線程來處理任務(wù),這一步同樣需要獲取 mainLock 全局鎖。
          4. 如果創(chuàng)建新線程會(huì)使當(dāng)前運(yùn)行的線程超過 maximumPoolSize 的話,任務(wù)將被拒絕,并且使用 RejectedExecutionHandler.rejectEExecution() 方法拒絕新的任務(wù)。

          ThreadPoolExecutor 采取上面的整體設(shè)計(jì)思路,是為了在執(zhí)行 execute 方法時(shí),避免獲取全局鎖,因?yàn)轭l繁獲取全局鎖會(huì)是一個(gè)嚴(yán)重的可伸縮瓶頸,所以,幾乎所有的 execute 方法調(diào)用都是通過執(zhí)行步驟2。

          上面指出了 execute 的運(yùn)行過程,整體上來說這個(gè)執(zhí)行過程把非常重要的點(diǎn)講解出來了,但是不夠細(xì)致,我查閱 ThreadPoolExecute 和部分源碼分析文章后,發(fā)現(xiàn)這事其實(shí)沒這么簡(jiǎn)單,先來看一下 execute 的源碼,我已經(jīng)給出了中文注釋

          public?void?execute(Runnable?command)?{
          ??if?(command?==?null)
          ????throw?new?NullPointerException();
          ??//?獲取?ctl?的值
          ??int?c?=?ctl.get();
          ??//?判斷?ctl?的值是否小于核心線程池的數(shù)量
          ??if?(workerCountOf(c)?????//?如果小于,增加工作隊(duì)列,command?就是一個(gè)個(gè)的任務(wù)
          ????if?(addWorker(command,?true))
          ??????//?線程創(chuàng)建成功,直接返回
          ??????return;
          ????//?線程添加不成功,需要再次判斷,每需要一次判斷都會(huì)獲取?ctl?的值
          ????c?=?ctl.get();
          ??}
          ??//?如果線程池處于運(yùn)行狀態(tài)并且能夠成功的放入阻塞隊(duì)列
          ??if?(isRunning(c)?&&?workQueue.offer(command))?{
          ????//?再次進(jìn)行檢查
          ????int?recheck?=?ctl.get();
          ????//?如果不是運(yùn)行態(tài)并且成功的從阻塞隊(duì)列中刪除
          ????if?(!?isRunning(recheck)?&&?remove(command))
          ??????//?執(zhí)行拒絕策略
          ??????reject(command);
          ????//?worker?線程數(shù)量是否為?0
          ????else?if?(workerCountOf(recheck)?==?0)
          ??????//?增加工作線程
          ??????addWorker(null,?false);
          ??}
          ??//?如果不能增加工作線程的數(shù)量,就會(huì)直接執(zhí)行拒絕策略
          ??else?if?(!addWorker(command,?false))
          ????reject(command);
          }

          下面是我根據(jù)源碼畫出的執(zhí)行流程圖

          下面我們針對(duì) execute 流程進(jìn)行分析,可能有點(diǎn)啰嗦,因?yàn)閹讉€(gè)核心流程上面已經(jīng)提過了,不過為了流程的完整性,我們?cè)僭谶@里重新提一下。

          1. 如果線程池的核心數(shù)量少于 corePoolSize,那么就會(huì)使用 addWorker 創(chuàng)建新線程,addworker 的流程我們會(huì)在下面進(jìn)行分析。如果創(chuàng)建成功,那么 execute 方法會(huì)直接返回。如果沒創(chuàng)建成功,可能是由于線程池已經(jīng) shutdown,可能是由于并發(fā)情況下 workerCountOf(c) < corePoolSize ,別的線程先創(chuàng)建了 worker 線程,導(dǎo)致 workerCoun t>= corePoolSize。
          2. 如果線程池還在 Running 狀態(tài),會(huì)將 task 加入阻塞隊(duì)列,加入成功后會(huì)進(jìn)行 double-check 雙重校驗(yàn),繼續(xù)下面的步驟,如果加入失敗,可能是由于隊(duì)列線程已滿,此時(shí)會(huì)判斷是否能夠加入線程池中,如果線程池也滿了的話,就會(huì)直接執(zhí)行拒絕策略,如果線程池能加入,execute 方法結(jié)束。
          3. 步驟 2 中的 double-check 主要是為了判斷進(jìn)入 workQueue 中的 task 是否能被執(zhí)行:如果線程池已經(jīng)不是 Running 狀態(tài),則應(yīng)該拒絕添加任務(wù),從 workQueue 隊(duì)列中刪除任務(wù)。如果線程池是 Running,但是從 workQueue 中刪除失敗了,此時(shí)的原因可能是由于其他線程執(zhí)行了這個(gè)任務(wù),此時(shí)會(huì)直接執(zhí)行拒絕策略。
          4. 如果線程是 Running 狀態(tài),并且不能把任務(wù)從隊(duì)列中移除,進(jìn)而判斷工作線程是否為 0 ,如果不為 0 ,execute 執(zhí)行完畢,如果工作線程是 0 ,則會(huì)使用 addWorker 增加工作線程,execute 執(zhí)行完畢。

          添加 worker 線程

          從上面的執(zhí)行流程可以看出,添加一個(gè) worker 涉及的工作也非常多,這也是一個(gè)比價(jià)難啃的點(diǎn),我們一起來分析下,這是 worker 的源碼

          private?boolean?addWorker(Runnable?firstTask,?boolean?core)?{
          ??//?retry?的用法相當(dāng)于?goto
          ??retry:
          ??for?(;;)?{
          ????int?c?=?ctl.get();
          ????int?rs?=?runStateOf(c);

          ????//?Check?if?queue?empty?only?if?necessary.
          ????//?僅在必要時(shí)檢查隊(duì)列是否為空。
          ????//?線程池狀態(tài)有五種,state?越小越是運(yùn)行狀態(tài)
          ????//?rs?>=?SHUTDOWN,表示此時(shí)線程池狀態(tài)可能是?SHUTDOWN、STOP、TIDYING、TERMINATED
          ????//?默認(rèn)?rs?>=?SHUTDOWN,如果?rs?=?SHUTDOWN,直接返回?false
          ????//?默認(rèn)?rs?
          ????//?默認(rèn)?RUNNING,任務(wù)是空,如果工作隊(duì)列為空,返回?false
          ????//
          ????if?(rs?>=?SHUTDOWN?&&
          ????????!?(rs?==?SHUTDOWN?&&
          ???????????firstTask?==?null?&&
          ???????????!?workQueue.isEmpty()))
          ??????return?false;


          ????//?執(zhí)行循環(huán)
          ????for?(;;)?{
          ??????//?統(tǒng)計(jì)工作線程數(shù)量
          ??????int?wc?=?workerCountOf(c);
          ??????//?如果?worker?數(shù)量>線程池最大上限?CAPACITY(即使用int低29位可以容納的最大值)
          ??????//?或者?worker數(shù)量?>?corePoolSize?或?worker數(shù)量>maximumPoolSize?),即已經(jīng)超過了給定的邊界
          ??????if?(wc?>=?CAPACITY?||
          ??????????wc?>=?(core???corePoolSize?:?maximumPoolSize))
          ????????return?false;

          ??????//?使用 CAS 增加 worker 數(shù)量,增加成功,跳出循環(huán)。
          ??????if?(compareAndIncrementWorkerCount(c))
          ????????break?retry;

          ??????//?檢查?ctl
          ??????c?=?ctl.get();??//?Re-read?ctl
          ??????//?如果狀態(tài)不等于之前獲取的?state,跳出內(nèi)層循環(huán),繼續(xù)去外層循環(huán)判斷
          ??????if?(runStateOf(c)?!=?rs)
          ????????continue?retry;
          ??????//?else?CAS?failed?due?to?workerCount?change;?retry?inner?loop
          ????}
          ??}

          ??/*
          ??????????worker數(shù)量+1成功的后續(xù)操作
          ????????*?添加到?workers?Set?集合,并啟動(dòng)?worker?線程
          ?????????*/

          ??boolean?workerStarted?=?false;
          ??boolean?workerAdded?=?false;
          ??Worker?w?=?null;
          ??try?{
          ????//?包裝?Runnable?對(duì)象
          ????//?設(shè)置?firstTask?的值為?-1
          ????//?賦值給當(dāng)前任務(wù)
          ????//?使用?worker?自身這個(gè)?runnable,調(diào)用?ThreadFactory?創(chuàng)建一個(gè)線程,并設(shè)置給worker的成員變量thread
          ????w?=?new?Worker(firstTask);
          ????final?Thread?t?=?w.thread;
          ????if?(t?!=?null)?{
          ??????final?ReentrantLock?mainLock?=?this.mainLock;
          ??????mainLock.lock();
          ??????try?{
          ????????//?在持有鎖的時(shí)候重新檢查
          ????????//?如果 ThreadFactory 失敗或在獲得鎖之前關(guān)閉,請(qǐng)回退。
          ????????int?rs?=?runStateOf(ctl.get());

          ????????//如果線程池在運(yùn)行?running
          ????????//?(可能是?workQueue?中仍有未執(zhí)行完成的任務(wù),創(chuàng)建沒有初始任務(wù)的?worker?線程執(zhí)行)
          ????????//worker?數(shù)量?-1?的操作在?addWorkerFailed()
          ????????if?(rs?????????????(rs?==?SHUTDOWN?&&?firstTask?==?null))?{
          ??????????if?(t.isAlive())?//?precheck?that?t?is?startable
          ????????????throw?new?IllegalThreadStateException();

          ??????????//?workers?就是一個(gè)?HashSet?集合
          ??????????workers.add(w);

          ??????????//?設(shè)置最大的池大小?largestPoolSize,workerAdded?設(shè)置為true
          ??????????int?s?=?workers.size();
          ??????????if?(s?>?largestPoolSize)
          ????????????largestPoolSize?=?s;
          ??????????workerAdded?=?true;
          ????????}
          ??????}?finally?{
          ????????mainLock.unlock();
          ??????}
          ??????if?(workerAdded)?{
          ????????t.start();
          ????????workerStarted?=?true;
          ??????}
          ????}
          ????//如果啟動(dòng)線程失敗
          ????//?worker?數(shù)量?-1
          ??}?finally?{
          ????if?(!?workerStarted)
          ??????addWorkerFailed(w);
          ??}
          ??return?workerStarted;
          }

          媽的真長(zhǎng)的一個(gè)方法,有點(diǎn)想吐血,其實(shí)我肝到現(xiàn)在已經(jīng)肝不動(dòng)了,但我一想到看這篇文章的讀者們能給我一個(gè)關(guān)注,就算咳出一口老血也值了。

          這個(gè)方法的執(zhí)行流程圖如下

          這里我們就不再文字描述了,但是上面流程圖中有一個(gè)對(duì)象引起了我的注意,那就是 worker 對(duì)象,這個(gè)對(duì)象就代表了線程池中的工作線程,那么這個(gè) worker 對(duì)象到底是啥呢?

          worker 對(duì)象

          Worker 位于 ThreadPoolExecutor 內(nèi)部,它繼承了 AQS 類并且實(shí)現(xiàn)了 Runnable 接口。Worker 類主要維護(hù)了線程運(yùn)行過程中的中斷控制狀態(tài)。它提供了鎖的獲取和釋放操作。在 worker 的實(shí)現(xiàn)中,我們使用了非重入的互斥鎖而不是使用重復(fù)鎖,因?yàn)?Lea 覺得我們不應(yīng)該在調(diào)用諸如 setCorePoolSize 之類的控制方法時(shí)能夠重新獲取鎖。

          worker 對(duì)象的源碼比較簡(jiǎn)單和標(biāo)準(zhǔn),這里我們只說一下 worker 對(duì)象的構(gòu)造方法,也就是

          Worker(Runnable?firstTask)?{
          ??setState(-1);?
          ??this.firstTask?=?firstTask;
          ??this.thread?=?getThreadFactory().newThread(this);
          }

          構(gòu)造一個(gè) worker 對(duì)象需要做三步操作:

          • 初始 AQS 狀態(tài)為 -1,此時(shí)不允許中斷 interrupt(),只有在 worker 線程啟動(dòng)了,執(zhí)行了 runWorker() 方法后,將 state 置為0,才能進(jìn)行中斷。
          • 將 firstTask 賦值給為當(dāng)前類的全局變量
          • 通過 ThreadFactory 創(chuàng)建一個(gè)新的線程。

          ###任務(wù)運(yùn)行

          我們前面的流程主要分析了線程池的 execute 方法的執(zhí)行過程,這個(gè)執(zhí)行過程相當(dāng)于是任務(wù)提交過程,而我們下面要說的是從隊(duì)列中獲取任務(wù)并運(yùn)行的這個(gè)工作流程。

          一般情況下,我們會(huì)從初始任務(wù)開始運(yùn)行,所以我們不需要獲取第一個(gè)任務(wù)。否則,只要線程池還處于 Running 狀態(tài),我們會(huì)調(diào)用 getTask 方法獲取任務(wù)。getTask 方法可能會(huì)返回 null,此時(shí)可能是由于線程池狀態(tài)改變或者是配置參數(shù)更改而導(dǎo)致的退出。還有一種情況可能是由于 異常 而引發(fā)的,這個(gè)我們后面會(huì)細(xì)說。

          下面來看一下 runWorker 方法的源碼:

          final?void?runWorker(Worker?w)?{
          ??Thread?wt?=?Thread.currentThread();
          ??Runnable?task?=?w.firstTask;
          ??w.firstTask?=?null;
          ??//?允許打斷
          ??//??new?Worker()?是?state==-1,此處是調(diào)用?Worker?類的?tryRelease()?方法,
          ??//??將?state?置為0
          ??w.unlock();
          ??boolean?completedAbruptly?=?true;
          ??try?{
          ????//?調(diào)用?getTask()?獲取任務(wù)
          ????while?(task?!=?null?||?(task?=?getTask())?!=?null)?{
          ??????//?獲取全局鎖
          ??????w.lock();
          ??????//?確保只有在線程 STOPING 時(shí),才會(huì)被設(shè)置中斷標(biāo)志,否則清除中斷標(biāo)志。
          ??????//?如果一開始判斷線程池狀態(tài)?
          ??????//?即線程已經(jīng)被中斷,又清除了中斷標(biāo)示,再次判斷線程池狀態(tài)是否?>=?stop
          ??????//?是,再次設(shè)置中斷標(biāo)示,wt.interrupt()
          ??????//?否,不做操作,清除中斷標(biāo)示后進(jìn)行后續(xù)步驟
          ??????if?((runStateAtLeast(ctl.get(),?STOP)?||
          ???????????(Thread.interrupted()?&&
          ????????????runStateAtLeast(ctl.get(),?STOP)))?&&
          ??????????!wt.isInterrupted())
          ????????wt.interrupt();
          ??????try?{
          ????????//?執(zhí)行前需要調(diào)用的方法,交給程序員自己來實(shí)現(xiàn)
          ????????beforeExecute(wt,?task);
          ????????Throwable?thrown?=?null;
          ????????try?{
          ??????????task.run();
          ????????}?catch?(RuntimeException?x)?{
          ??????????thrown?=?x;?throw?x;
          ????????}?catch?(Error?x)?{
          ??????????thrown?=?x;?throw?x;
          ????????}?catch?(Throwable?x)?{
          ??????????thrown?=?x;?throw?new?Error(x);
          ????????}?finally?{
          ??????????//?執(zhí)行后需要調(diào)用的方法,交給程序員自己來實(shí)現(xiàn)
          ??????????afterExecute(task,?thrown);
          ????????}
          ??????}?finally?{
          ????????//?把?task?置為?null,完成任務(wù)數(shù)?+?1,并進(jìn)行解鎖
          ????????task?=?null;
          ????????w.completedTasks++;
          ????????w.unlock();
          ??????}
          ????}
          ????completedAbruptly?=?false;
          ????//?最后處理?worker?的退出
          ??}?finally?{
          ????processWorkerExit(w,?completedAbruptly);
          ??}
          }

          下面是 runWorker 的執(zhí)行流程圖

          這里需要注意一下最后的 processWorkerExit 方法,這里面其實(shí)也做了很多事情,包括判斷 completedAbruptly 的布爾值來表示是否完成任務(wù),獲取鎖,嘗試從隊(duì)列中移除 worker,然后嘗試中斷,接下來會(huì)判斷一下中斷狀態(tài),在線程池當(dāng)前狀態(tài)小于 STOP 的情況下會(huì)創(chuàng)建一個(gè)新的 worker 來替換被銷毀的 worker。

          任務(wù)獲取

          任務(wù)獲取就是 getTask 方法的執(zhí)行過程,這個(gè)環(huán)節(jié)主要用來獲取任務(wù)和剔除任務(wù)。下面進(jìn)入源碼分析環(huán)節(jié)

          private?Runnable?getTask()?{
          ??//?判斷最后一個(gè) poll 是否超時(shí)。
          ??boolean?timedOut?=?false;?//?Did?the?last?poll()?time?out?

          ??for?(;;)?{
          ????int?c?=?ctl.get();
          ????int?rs?=?runStateOf(c);

          ????//?Check?if?queue?empty?only?if?necessary.
          ????//?必要時(shí)檢查隊(duì)列是否為空
          ????//?對(duì)線程池狀態(tài)的判斷,兩種情況會(huì)?workerCount-1,并且返回?null
          ????//?線程池狀態(tài)為?shutdown,且?workQueue?為空(反映了?shutdown?狀態(tài)的線程池還是要執(zhí)行?workQueue?中剩余的任務(wù)的)
          ????//?線程池狀態(tài)為?stop(shutdownNow()?會(huì)導(dǎo)致變成?STOP)(此時(shí)不用考慮?workQueue?的情況)
          ????if?(rs?>=?SHUTDOWN?&&?(rs?>=?STOP?||?workQueue.isEmpty()))?{
          ??????decrementWorkerCount();
          ??????return?null;
          ????}

          ????int?wc?=?workerCountOf(c);

          ????//?Are?workers?subject?to?culling?
          ????//?是否需要定時(shí)從?workQueue?中獲取
          ????boolean?timed?=?allowCoreThreadTimeOut?||?wc?>?corePoolSize;

          ????//?如果工作線程的數(shù)量大于?maximumPoolSize?會(huì)進(jìn)行線程剔除
          ????//?如果使用了?allowCoreThreadTimeOut?,并且工作線程不為0或者隊(duì)列有任務(wù)的話,會(huì)直接進(jìn)行線程剔除
          ????if?((wc?>?maximumPoolSize?||?(timed?&&?timedOut))
          ????????&&?(wc?>?1?||?workQueue.isEmpty()))?{
          ??????if?(compareAndDecrementWorkerCount(c))
          ????????return?null;
          ??????continue;
          ????}
          ??
          ????try?{
          ??????Runnable?r?=?timed??
          ????????workQueue.poll(keepAliveTime,?TimeUnit.NANOSECONDS)?:
          ??????workQueue.take();
          ??????if?(r?!=?null)
          ????????return?r;
          ??????timedOut?=?true;
          ????}?catch?(InterruptedException?retry)?{
          ??????timedOut?=?false;
          ????}
          ??}
          }

          getTask 方法的執(zhí)行流程圖如下

          工作線程退出

          工作線程退出是 runWorker 的最后一步,這一步會(huì)判斷工作線程是否突然終止,并且會(huì)嘗試終止線程,以及是否需要增加線程來替換原工作線程。

          private?void?processWorkerExit(Worker?w,?boolean?completedAbruptly)?{
          ??//?worker數(shù)量?-1
          ??//?completedAbruptly?是?true,突然終止,說明是?task?執(zhí)行時(shí)異常情況導(dǎo)致,即run()方法執(zhí)行時(shí)發(fā)生了異常,那么正在工作的?worker?線程數(shù)量需要-1
          ??//?completedAbruptly?是?false?是突然終止,說明是?worker?線程沒有?task?可執(zhí)行了,不用-1,因?yàn)橐呀?jīng)在?getTask()?方法中-1了
          ??if?(completedAbruptly)?//?If?abrupt,?then?workerCount?wasn't?adjusted
          ????decrementWorkerCount();

          ??//?從?Workers?Set?中移除?worker
          ??final?ReentrantLock?mainLock?=?this.mainLock;
          ??mainLock.lock();
          ??try?{
          ????completedTaskCount?+=?w.completedTasks;
          ????workers.remove(w);
          ??}?finally?{
          ????mainLock.unlock();
          ??}

          ??//?嘗試終止線程,
          ??tryTerminate();

          ??//?是否需要增加?worker?線程
          ??//?線程池狀態(tài)是?running?或?shutdown
          ??//?如果當(dāng)前線程是突然終止的,addWorker()
          ??//?如果當(dāng)前線程不是突然終止的,但當(dāng)前線程數(shù)量?
          ??//?故如果調(diào)用線程池?shutdown(),直到workQueue為空前,線程池都會(huì)維持?corePoolSize?個(gè)線程,
          ??//?然后再逐漸銷毀這?corePoolSize?個(gè)線程
          ??int?c?=?ctl.get();
          ??if?(runStateLessThan(c,?STOP))?{
          ????if?(!completedAbruptly)?{
          ??????int?min?=?allowCoreThreadTimeOut???0?:?corePoolSize;
          ??????if?(min?==?0?&&?!?workQueue.isEmpty())
          ????????min?=?1;
          ??????if?(workerCountOf(c)?>=?min)
          ????????return;?//?replacement?not?needed
          ????}
          ????addWorker(null,?false);
          ??}
          }

          源碼搞的有點(diǎn)頭大了,可能一時(shí)半會(huì)無法理解上面這些源碼,不過你可以先把注釋粘過去,等有時(shí)間了需要反復(fù)刺激,加深印象!

          其他線程池

          下面我們來了解一下其他線程池的構(gòu)造原理,主要涉及 FixedThreadPool、SingleThreadExecutor、CachedThreadPool

          newFixedThreadPool

          newFixedThreadPool 被稱為可重用固定線程數(shù)的線程池,下面是 newFixedThreadPool 的源碼

          public?static?ExecutorService?newFixedThreadPool(int?nThreads)?{
          ??return?new?ThreadPoolExecutor(nThreads,?nThreads,
          ????????????????????????????????0L,?TimeUnit.MILLISECONDS,
          ????????????????????????????????new?LinkedBlockingQueue());
          }

          可以看到,newFixedThreadPool 的 corePoolSize 和 maximumPoolSize 都被設(shè)置為創(chuàng)建 FixedThreadPool 時(shí)指定的參數(shù) nThreads,也就是說,在 newFiexedThreadPool 中,核心線程數(shù)就是最大線程數(shù)。

          下面是 newFixedThreadPool 的執(zhí)行示意圖

          newFixedThreadPool 的工作流程如下

          • 如果當(dāng)前運(yùn)行的線程數(shù)少于 corePoolSize,則會(huì)創(chuàng)建新線程 addworker 來執(zhí)行任務(wù)
          • 如果當(dāng)前線程的線程數(shù)等于 corePoolSize,會(huì)將任務(wù)直接加入到 LinkedBlockingQueue 無界阻塞隊(duì)列中,LinkedBlockingQueue 的上限如果沒有制定,默認(rèn)為 Integer.MAX_VALUE 大小。
          • 等到線程池中的任務(wù)執(zhí)行完畢后,newFixedThreadPool 會(huì)反復(fù)從 LinkedBlockingQueue 中獲取任務(wù)來執(zhí)行。

          相較于 ThreadPoolExecutor,newFixedThreadPool 主要做了以下改變

          • 核心線程數(shù)等于最大線程數(shù),因此 newFixedThreadPool 只有兩個(gè)最大容量,一個(gè)是線程池的線程容量,還有一個(gè)是 LinkedBlockingQueue 無界阻塞隊(duì)列的線程容量。

          • 這里可以看到還有一個(gè)變化是 0L,也就是 keepAliveTime = 0L,keepAliveTime 就是到達(dá)工作線程最大容量后的線程等待時(shí)間,0L 就意味著當(dāng)線程池中的線程數(shù)大于 corePoolsize 時(shí),空余的線程會(huì)被立即終止。

          • 由于使用無界隊(duì)列,運(yùn)行中的 newFixedThreadPool 不會(huì)拒絕任務(wù),也就是不會(huì)調(diào)用 RejectedExecutionHandler.rejectedExecution 方法。

          newSingleThreadExecutor

          newSingleThreadExecutor 中只有單個(gè)工作線程,也就是說它是一個(gè)只有單個(gè) worker 的 Executor。

          public?static?ExecutorService?newSingleThreadExecutor(ThreadFactory?threadFactory)?{
          ??return?new?FinalizableDelegatedExecutorService
          ????(new?ThreadPoolExecutor(1,?1,
          ????????????????????????????0L,?TimeUnit.MILLISECONDS,
          ????????????????????????????new?LinkedBlockingQueue(),
          ????????????????????????????threadFactory));
          }

          可以看到,在 newSingleThreadExecutor 中,corePoolSize 和 maximumPoolSize 都被設(shè)置為 1,也不存在超時(shí)情況,同樣使用了 LinkedBlockingQueue 無界阻塞隊(duì)列,除了 corePoolSize 和 maximumPoolSize 外,其他幾乎和 newFixedThreadPool 一模一樣。

          下面是 newSingleThreadExecutor ?的執(zhí)行示意圖

          newSingleThreadExecutor 的執(zhí)行過程和 newFixedThreadPool 相同,只是 newSingleThreadExecutor 的工作線程數(shù)為 1。

          newCachedThreadPool

          newCachedThreadPool 是一個(gè)根據(jù)需要?jiǎng)?chuàng)建工作線程的線程池,newCachedThreadPool 線程池最大數(shù)量是 Integer.MAX_VALUE,保活時(shí)間是 60 秒,使用的是SynchronousQueue 無緩沖阻塞隊(duì)列。

          public?static?ExecutorService?newCachedThreadPool(ThreadFactory?threadFactory)?{
          ??return?new?ThreadPoolExecutor(0,?Integer.MAX_VALUE,
          ????????????????????????????????60L,?TimeUnit.SECONDS,
          ????????????????????????????????new?SynchronousQueue(),
          ????????????????????????????????threadFactory);
          }

          它的執(zhí)行示意圖如下

          • 首先會(huì)先執(zhí)行 SynchronousQueue.offer 方法,如果當(dāng)前 maximumPool 中有空閑線程正在執(zhí)行 SynchronousQueue.poll ,就會(huì)把任務(wù)交給空閑線程來執(zhí)行,execute 方法執(zhí)行完畢,否則的話,繼續(xù)向下執(zhí)行。
          • 如果 maximumPool 中沒有線程執(zhí)行 SynchronousQueue.poll 方法,這種情況下 newCachedThreadPool 會(huì)創(chuàng)建一個(gè)新線程執(zhí)行任務(wù),execute 方法執(zhí)行完成。
          • 執(zhí)行完成的線程將執(zhí)行 poll 操作,這個(gè) poll 操作會(huì)讓空閑線程最多在 SynchronousQueue 中等待 60 秒鐘。如果 60 秒鐘內(nèi)提交了一個(gè)新任務(wù),那么空閑線程會(huì)執(zhí)行這個(gè)新提交的任務(wù),否則空閑線程將會(huì)終止。

          這里的關(guān)鍵點(diǎn)在于 SynchronousQueue 隊(duì)列,它是一個(gè)沒有容量的阻塞隊(duì)列。每個(gè)插入操作必須等待另一個(gè)線程對(duì)應(yīng)的移除操作。這其實(shí)就是一種任務(wù)傳遞,如下圖所示

          其實(shí)還有一個(gè)線程池 ScheduledThreadPoolExecutor ,就先不在此篇文章做詳細(xì)贅述了。

          線程池實(shí)踐考量因素

          下面介紹幾種在實(shí)踐過程中使用線程池需要考慮的幾個(gè)點(diǎn)

          • 避免任務(wù)堆積,比如我們上面提到的 newFixedThreadPool,它是創(chuàng)建指定數(shù)目的線程,但是工作隊(duì)列是無界的,這就導(dǎo)致如果工作隊(duì)列線程太少,導(dǎo)致處理速度跟不上入隊(duì)速度,這種情況下很可能會(huì)導(dǎo)致 OOM,診斷時(shí)可以使用 jmap 檢查是否有大量任務(wù)入隊(duì)。
          • 生產(chǎn)實(shí)踐中很可能由于邏輯不嚴(yán)謹(jǐn)或者工作線程不能及時(shí)釋放導(dǎo)致 線程泄漏,這個(gè)時(shí)候最好檢查一下線程棧
          • 避免死鎖等同步問題
          • 盡量避免在使用線程池時(shí)操作 ThreadLocal,因?yàn)楣ぷ骶€程的生命周期可能會(huì)超過任務(wù)的生命周期。

          線程池大小的設(shè)置

          線程池大小的設(shè)置也是面試官經(jīng)常會(huì)考到的一個(gè)點(diǎn),一般需要根據(jù)任務(wù)類型來配置線程池大小

          • 如果是 CPU 密集型任務(wù),那么就意味著 CPU 是稀缺資源,這個(gè)時(shí)候我們通常不能通過增加線程數(shù)來提高計(jì)算能力,因?yàn)榫€程數(shù)量太多,會(huì)導(dǎo)致頻繁的上下文切換,一般這種情況下,建議合理的線程數(shù)值是 N(CPU)數(shù) + 1
          • 如果是 I/O 密集型任務(wù),就說明需要較多的等待,這個(gè)時(shí)候可以參考 Brain Goetz 的推薦方法 線程數(shù) = CPU核數(shù) × (1 + 平均等待時(shí)間/平均工作時(shí)間)。參考值可以是 N(CPU) 核數(shù) * 2。

          當(dāng)然,這只是一個(gè)參考值,具體的設(shè)置還需要根據(jù)實(shí)際情況進(jìn)行調(diào)整,比如可以先將線程池大小設(shè)置為參考值,再觀察任務(wù)運(yùn)行情況和系統(tǒng)負(fù)載、資源利用率來進(jìn)行適當(dāng)調(diào)整。

          后記

          這篇文章真的寫了很久,因?yàn)橹皩?duì)線程池認(rèn)識(shí)不是很深,所以花了大力氣來研究,希望這篇文章對(duì)你有所幫助。

          這篇文章探討了對(duì) Executor 框架的主要組成、線程池結(jié)構(gòu)與生命周期,線程池源碼和線程池實(shí)現(xiàn)的細(xì)節(jié)等方面進(jìn)行了講解和分析,希望對(duì)你有所幫助。

          如果這篇文章寫的還不錯(cuò),希望讀者朋友們可以不吝給出四連:點(diǎn)贊、在看、留言、分享,記住這次一定哦!

          我是 cxuan ,認(rèn)真寫好每篇文章的技術(shù)人,歡迎關(guān)注我的公眾號(hào)






          ?往期推薦?

          ??

          ARP,這個(gè)隱匿在計(jì)網(wǎng)背后的男人

          我畫了 40 張圖就是為了讓你搞懂計(jì)算機(jī)網(wǎng)絡(luò)層

          沖吧!考研人!

          閱片無數(shù)的 cxuan 給你推薦比某 hub 更爽的網(wǎng)站

          路由器你竟然是這樣的...

          搞懂這 10 張腦圖后,我膨脹了。


          瀏覽 67
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产裸体XXXX187 | 一级毛片a一级毛片免费看黄道婆 | 亚洲免费观看AV四虎 | 免费A级毛片 | 色婷在线视频 |