<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          面試官:詳細(xì)說(shuō)一下Java線程池,從設(shè)計(jì)思想到源碼解讀!

          共 2834字,需瀏覽 6分鐘

           ·

          2022-01-08 20:48

          你知道的越多,不知道的就越多,業(yè)余的像一棵小草!

          你來(lái),我們一起精進(jìn)!你不來(lái),我和你的競(jìng)爭(zhēng)對(duì)手一起精進(jìn)!

          編輯:業(yè)余草

          blog.csdn.net/mu_wind

          推薦:https://www.xttblog.com/?p=5306

          今天說(shuō)一說(shuō),線程池,從設(shè)計(jì)思想到源碼解析。

          前言

          各位小伙伴兒,春節(jié)已經(jīng)結(jié)束了,在此獻(xiàn)上一篇肝了一個(gè)春節(jié)假期的遲來(lái)的拜年之作,希望讀者朋友們都能有收獲。多多點(diǎn)贊、評(píng)論、收藏!

          初識(shí)線程池

          我們知道,線程的創(chuàng)建和銷毀都需要映射到操作系統(tǒng),因此其代價(jià)是比較高昂的。出于避免頻繁創(chuàng)建、銷毀線程以及方便線程管理的需要,線程池應(yīng)運(yùn)而生。

          線程池優(yōu)勢(shì)

          • 「降低資源消耗」:線程池通常會(huì)維護(hù)一些線程(數(shù)量為 corePoolSize),這些線程被重復(fù)使用來(lái)執(zhí)行不同的任務(wù),任務(wù)完成后不會(huì)銷毀。在待處理任務(wù)量很大的時(shí)候,通過(guò)對(duì)線程資源的復(fù)用,避免了線程的頻繁創(chuàng)建與銷毀,從而降低了系統(tǒng)資源消耗。
          • 「提高響應(yīng)速度」:由于線程池維護(hù)了一批 alive 狀態(tài)的線程,當(dāng)任務(wù)到達(dá)時(shí),不需要再創(chuàng)建線程,而是直接由這些線程去執(zhí)行任務(wù),從而減少了任務(wù)的等待時(shí)間。
          • 「提高線程的可管理性」:使用線程池可以對(duì)線程進(jìn)行統(tǒng)一的分配,調(diào)優(yōu)和監(jiān)控。

          線程池設(shè)計(jì)思路

          有句話叫做藝術(shù)來(lái)源于生活,編程語(yǔ)言也是如此,很多設(shè)計(jì)思想能映射到日常生活中,比如面向?qū)ο笏枷搿⒎庋b、繼承,等等。今天我們要說(shuō)的線程池,它同樣可以在現(xiàn)實(shí)世界找到對(duì)應(yīng)的實(shí)體——工廠。

          先假想一個(gè)工廠的生產(chǎn)流程:

          線程池設(shè)計(jì)思路

          工廠中有固定的一批工人,稱為正式工人,工廠接收的訂單由這些工人去完成。當(dāng)訂單增加,正式工人已經(jīng)忙不過(guò)來(lái)了,工廠會(huì)將生產(chǎn)原料暫時(shí)堆積在倉(cāng)庫(kù)中,等有空閑的工人時(shí)再處理(因?yàn)楣と丝臻e了也不會(huì)主動(dòng)處理倉(cāng)庫(kù)中的生產(chǎn)任務(wù),所以需要調(diào)度員實(shí)時(shí)調(diào)度)。倉(cāng)庫(kù)堆積滿了后,訂單還在增加怎么辦?工廠只能臨時(shí)擴(kuò)招一批工人來(lái)應(yīng)對(duì)生產(chǎn)高峰,而這批工人高峰結(jié)束后是要清退的,所以稱為臨時(shí)工。當(dāng)時(shí)臨時(shí)工也以招滿后(受限于工位限制,臨時(shí)工數(shù)量有上限),后面的訂單只能忍痛拒絕了。

          我們做如下一番映射:

          • 工廠——線程池
          • 訂單——任務(wù)(Runnable)
          • 正式工人——核心線程
          • 臨時(shí)工——普通線程
          • 倉(cāng)庫(kù)——任務(wù)隊(duì)列
          • 調(diào)度員——getTask()
          ?

          getTask()是一個(gè)方法,將任務(wù)隊(duì)列中的任務(wù)調(diào)度給空閑線程,在解讀線程池有詳細(xì)介紹

          ?

          映射后,形成線程池流程圖如下,兩者是不是有異曲同工之妙?

          線程池流程圖

          這樣,線程池的工作原理或者說(shuō)流程就很好理解了,提煉成一個(gè)簡(jiǎn)圖:

          線程池的工作原理

          深入線程池

          那么接下來(lái),問(wèn)題來(lái)了,線程池是具體如何實(shí)現(xiàn)這套工作機(jī)制的呢?從Java線程池Executor框架體系可以看出:線程池的真正實(shí)現(xiàn)類是ThreadPoolExecutor,因此我們接下來(lái)重點(diǎn)研究這個(gè)類。

          線程池工作機(jī)制

          構(gòu)造方法

          研究一個(gè)類,先從它的構(gòu)造方法開(kāi)始。ThreadPoolExecutor提供了4個(gè)有參構(gòu)造方法:

          public?ThreadPoolExecutor(int?corePoolSize,
          ??????????????????????????int?maximumPoolSize,
          ??????????????????????????long?keepAliveTime,
          ??????????????????????????TimeUnit?unit,
          ??????????????????????????BlockingQueue?workQueue)
          ?
          {
          ????this(corePoolSize,?maximumPoolSize,?keepAliveTime,?unit,?workQueue,
          ?????????????Executors.defaultThreadFactory(),?defaultHandler);
          }

          public?ThreadPoolExecutor(int?corePoolSize,
          ??????????????????????????int?maximumPoolSize,
          ??????????????????????????long?keepAliveTime,
          ??????????????????????????TimeUnit?unit,
          ??????????????????????????BlockingQueue?workQueue,
          ??????????????????????????ThreadFactory?threadFactory)
          ?
          {
          ????this(corePoolSize,?maximumPoolSize,?keepAliveTime,?unit,?workQueue,
          ?????????????threadFactory,?defaultHandler);
          }

          public?ThreadPoolExecutor(int?corePoolSize,
          ??????????????????????????int?maximumPoolSize,
          ??????????????????????????long?keepAliveTime,
          ??????????????????????????TimeUnit?unit,
          ??????????????????????????BlockingQueue?workQueue,
          ??????????????????????????RejectedExecutionHandler?handler)
          ?
          {
          ????this(corePoolSize,?maximumPoolSize,?keepAliveTime,?unit,?workQueue,
          ?????????????Executors.defaultThreadFactory(),?handler);
          }

          public?ThreadPoolExecutor(int?corePoolSize,
          ??????????????????????????int?maximumPoolSize,
          ??????????????????????????long?keepAliveTime,
          ??????????????????????????TimeUnit?unit,
          ??????????????????????????BlockingQueue?workQueue,
          ??????????????????????????ThreadFactory?threadFactory,
          ??????????????????????????RejectedExecutionHandler?handler)
          ?
          {
          ????if?(corePoolSize?0?||
          ????????maximumPoolSize?<=?0?||
          ????????maximumPoolSize?????????keepAliveTime?0)
          ????????throw?new?IllegalArgumentException();
          ????if?(workQueue?==?null?||?threadFactory?==?null?||?handler?==?null)
          ????????throw?new?NullPointerException();
          ????this.corePoolSize?=?corePoolSize;
          ????this.maximumPoolSize?=?maximumPoolSize;
          ????this.workQueue?=?workQueue;
          ????this.keepAliveTime?=?unit.toNanos(keepAliveTime);
          ????this.threadFactory?=?threadFactory;
          ????this.handler?=?handler;
          }

          解釋一下構(gòu)造方法中涉及到的參數(shù):

          • 「corePoolSize」(必需):核心線程數(shù)。即池中一直保持存活的線程數(shù),即使這些線程處于空閑。但是將allowCoreThreadTimeOut參數(shù)設(shè)置為true后,核心線程處于空閑一段時(shí)間以上,也會(huì)被回收。
          • 「maximumPoolSize」(必需):池中允許的最大線程數(shù)。當(dāng)核心線程全部繁忙且任務(wù)隊(duì)列打滿之后,線程池會(huì)臨時(shí)追加線程,直到總線程數(shù)達(dá)到maximumPoolSize這個(gè)上限。
          • 「keepAliveTime」(必需):線程空閑超時(shí)時(shí)間。當(dāng)非核心線程處于空閑狀態(tài)的時(shí)間超過(guò)這個(gè)時(shí)間后,該線程將被回收。將allowCoreThreadTimeOut參數(shù)設(shè)置為true后,核心線程也會(huì)被回收。
          • 「unit」(必需):keepAliveTime參數(shù)的時(shí)間單位。有:TimeUnit.DAYS(天)、TimeUnit.HOURS(小時(shí))、TimeUnit.MINUTES(分鐘)、「TimeUnit.SECONDS(秒)」「TimeUnit.MILLISECONDS(毫秒)」、TimeUnit.MICROSECONDS(微秒)、TimeUnit.NANOSECONDS(納秒)
          • 「workQueue」(必需):任務(wù)隊(duì)列,采用阻塞隊(duì)列實(shí)現(xiàn)。當(dāng)核心線程全部繁忙時(shí),后續(xù)由execute方法提交的Runnable將存放在任務(wù)隊(duì)列中,等待被線程處理。
          • 「threadFactory」(可選):線程工廠。指定線程池創(chuàng)建線程的方式。
          • 「handler」(可選):拒絕策略。當(dāng)線程池中線程數(shù)達(dá)到maximumPoolSizeworkQueue打滿時(shí),后續(xù)提交的任務(wù)將被拒絕,handler可以指定用什么方式拒絕任務(wù)。

          放到一起再看一下:

          工廠與線程池

          任務(wù)隊(duì)列

          使用ThreadPoolExecutor需要指定一個(gè)實(shí)現(xiàn)了BlockingQueue接口的任務(wù)等待隊(duì)列。在ThreadPoolExecutor線程池的API文檔中,一共推薦了三種等待隊(duì)列,它們是:SynchronousQueueLinkedBlockingQueueArrayBlockingQueue

          1. 「SynchronousQueue」:同步隊(duì)列。這是一個(gè)內(nèi)部沒(méi)有任何容量的阻塞隊(duì)列,任何一次插入操作的元素都要等待相對(duì)的刪除/讀取操作,否則進(jìn)行插入操作的線程就要一直等待,反之亦然。
          2. 「LinkedBlockingQueue」:無(wú)界隊(duì)列(嚴(yán)格來(lái)說(shuō)并非無(wú)界,上限是Integer.MAX_VALUE),基于鏈表結(jié)構(gòu)。使用無(wú)界隊(duì)列后,當(dāng)核心線程都繁忙時(shí),后續(xù)任務(wù)可以無(wú)限加入隊(duì)列,因此線程池中線程數(shù)不會(huì)超過(guò)核心線程數(shù)。這種隊(duì)列可以提高線程池吞吐量,但代價(jià)是犧牲內(nèi)存空間,甚至?xí)?dǎo)致內(nèi)存溢出。另外,使用它時(shí)可以指定容量,這樣它也就是一種有界隊(duì)列了。
          3. 「ArrayBlockingQueue」:有界隊(duì)列,基于數(shù)組實(shí)現(xiàn)。在線程池初始化時(shí),指定隊(duì)列的容量,后續(xù)無(wú)法再調(diào)整。這種有界隊(duì)列有利于防止資源耗盡,但可能更難調(diào)整和控制。

          另外,Java還提供了另外4種隊(duì)列:

          1. 「PriorityBlockingQueue」:支持優(yōu)先級(jí)排序的無(wú)界阻塞隊(duì)列。存放在PriorityBlockingQueue中的元素必須實(shí)現(xiàn)Comparable接口,這樣才能通過(guò)實(shí)現(xiàn)compareTo()方法進(jìn)行排序。優(yōu)先級(jí)最高的元素將始終排在隊(duì)列的頭部;PriorityBlockingQueue不會(huì)保證優(yōu)先級(jí)一樣的元素的排序,也不保證當(dāng)前隊(duì)列中除了優(yōu)先級(jí)最高的元素以外的元素,隨時(shí)處于正確排序的位置。
          2. 「DelayQueue」:延遲隊(duì)列。基于二叉堆實(shí)現(xiàn),同時(shí)具備:無(wú)界隊(duì)列、阻塞隊(duì)列、優(yōu)先隊(duì)列的特征。DelayQueue延遲隊(duì)列中存放的對(duì)象,必須是實(shí)現(xiàn)Delayed接口的類對(duì)象。通過(guò)執(zhí)行時(shí)延從隊(duì)列中提取任務(wù),時(shí)間沒(méi)到任務(wù)取不出來(lái)。更多內(nèi)容請(qǐng)見(jiàn)DelayQueue:面試官:談?wù)凧ava中的阻塞延遲隊(duì)列DelayQueue原理和用法
          3. 「LinkedBlockingDeque」:雙端隊(duì)列。基于鏈表實(shí)現(xiàn),既可以從尾部插入/取出元素,還可以從頭部插入元素/取出元素。
          4. 「LinkedTransferQueue」:由鏈表結(jié)構(gòu)組成的無(wú)界阻塞隊(duì)列。這個(gè)隊(duì)列比較特別的時(shí),采用一種預(yù)占模式,意思就是消費(fèi)者線程取元素時(shí),如果隊(duì)列不為空,則直接取走數(shù)據(jù),若隊(duì)列為空,那就生成一個(gè)節(jié)點(diǎn)(節(jié)點(diǎn)元素為null)入隊(duì),然后消費(fèi)者線程被等待在這個(gè)節(jié)點(diǎn)上,后面生產(chǎn)者線程入隊(duì)時(shí)發(fā)現(xiàn)有一個(gè)元素為null的節(jié)點(diǎn),生產(chǎn)者線程就不入隊(duì)了,直接就將元素填充到該節(jié)點(diǎn),并喚醒該節(jié)點(diǎn)等待的線程,被喚醒的消費(fèi)者線程取走元素。

          拒絕策略

          線程池有一個(gè)重要的機(jī)制:拒絕策略。當(dāng)線程池workQueue已滿且無(wú)法再創(chuàng)建新線程池時(shí),就要拒絕后續(xù)任務(wù)了。拒絕策略需要實(shí)現(xiàn)RejectedExecutionHandler接口,不過(guò)Executors框架已經(jīng)為我們實(shí)現(xiàn)了4種拒絕策略:

          1. 「AbortPolicy」(默認(rèn)):丟棄任務(wù)并拋出RejectedExecutionException異常。
          2. 「CallerRunsPolicy」:直接運(yùn)行這個(gè)任務(wù)的run方法,但并非是由線程池的線程處理,而是交由任務(wù)的調(diào)用線程處理。
          3. 「DiscardPolicy」:直接丟棄任務(wù),不拋出任何異常。
          4. 「DiscardOldestPolicy」:將當(dāng)前處于等待隊(duì)列列頭的等待任務(wù)強(qiáng)行取出,然后再試圖將當(dāng)前被拒絕的任務(wù)提交到線程池執(zhí)行。

          線程工廠指定創(chuàng)建線程的方式,這個(gè)參數(shù)不是必選項(xiàng),Executors類已經(jīng)為我們非常貼心地提供了一個(gè)默認(rèn)的線程工廠:

          /**
          ?*?The?default?thread?factory
          ?*/

          static?class?DefaultThreadFactory?implements?ThreadFactory?{
          ????private?static?final?AtomicInteger?poolNumber?=?new?AtomicInteger(1);
          ????private?final?ThreadGroup?group;
          ????private?final?AtomicInteger?threadNumber?=?new?AtomicInteger(1);
          ????private?final?String?namePrefix;

          ????DefaultThreadFactory()?{
          ????????SecurityManager?s?=?System.getSecurityManager();
          ????????group?=?(s?!=?null)???s.getThreadGroup()?:
          ??????????????????????????????Thread.currentThread().getThreadGroup();
          ????????namePrefix?=?"pool-"?+
          ??????????????????????poolNumber.getAndIncrement()?+
          ?????????????????????"-thread-";
          ????}

          ????public?Thread?newThread(Runnable?r)?{
          ????????Thread?t?=?new?Thread(group,?r,
          ??????????????????????????????namePrefix?+?threadNumber.getAndIncrement(),
          ??????????????????????????????0);
          ????????if?(t.isDaemon())
          ????????????t.setDaemon(false);
          ????????if?(t.getPriority()?!=?Thread.NORM_PRIORITY)
          ????????????t.setPriority(Thread.NORM_PRIORITY);
          ????????return?t;
          ????}
          }

          線程池狀態(tài)

          線程池有5種狀態(tài):

          volatile?int?runState;
          //?runState?is?stored?in?the?high-order?bits
          private?static?final?int?RUNNING????=?-1?<private?static?final?int?SHUTDOWN???=??0?<private?static?final?int?STOP???????=??1?<private?static?final?int?TIDYING????=??2?<private?static?final?int?TERMINATED?=??3?<

          runState表示當(dāng)前線程池的狀態(tài),它是一個(gè) volatile 變量用來(lái)保證線程之間的可見(jiàn)性。

          下面的幾個(gè)static final變量表示runState可能的幾個(gè)取值,有以下幾個(gè)狀態(tài):

          • 「RUNNING」:當(dāng)創(chuàng)建線程池后,初始時(shí),線程池處于RUNNING狀態(tài);
          • 「SHUTDOWN」:如果調(diào)用了shutdown()方法,則線程池處于SHUTDOWN狀態(tài),此時(shí)線程池不能夠接受新的任務(wù),它會(huì)等待所有任務(wù)執(zhí)行完畢;
          • 「STOP」:如果調(diào)用了shutdownNow()方法,則線程池處于STOP狀態(tài),此時(shí)線程池不能接受新的任務(wù),并且會(huì)去嘗試終止正在執(zhí)行的任務(wù);
          • 「TERMINATED」:當(dāng)線程池處于SHUTDOWNSTOP狀態(tài),并且所有工作線程已經(jīng)銷毀,任務(wù)緩存隊(duì)列已經(jīng)清空或執(zhí)行結(jié)束后,線程池被設(shè)置為TERMINATED狀態(tài)。

          初始化&容量調(diào)整&關(guān)閉

          「1、線程初始化」

          默認(rèn)情況下,創(chuàng)建線程池之后,線程池中是沒(méi)有線程的,需要提交任務(wù)之后才會(huì)創(chuàng)建線程。

          在實(shí)際中如果需要線程池創(chuàng)建之后立即創(chuàng)建線程,可以通過(guò)以下兩個(gè)方法辦到:

          • 「prestartCoreThread()」:boolean prestartCoreThread(),初始化一個(gè)核心線程
          • 「prestartAllCoreThreads()」:int prestartAllCoreThreads(),初始化所有核心線程,并返回初始化的線程數(shù)
          public?boolean?prestartCoreThread()?{
          ????return?addIfUnderCorePoolSize(null);?//注意傳進(jìn)去的參數(shù)是null
          }

          public?int?prestartAllCoreThreads()?{
          ????int?n?=?0;
          ????while?(addIfUnderCorePoolSize(null))//注意傳進(jìn)去的參數(shù)是null
          ????????++n;
          ????return?n;
          }

          「2、線程池關(guān)閉」

          ThreadPoolExecutor提供了兩個(gè)方法,用于線程池的關(guān)閉:

          • 「shutdown()」:不會(huì)立即終止線程池,而是要等所有任務(wù)緩存隊(duì)列中的任務(wù)都執(zhí)行完后才終止,但再也不會(huì)接受新的任務(wù)
          • 「shutdownNow()」:立即終止線程池,并嘗試打斷正在執(zhí)行的任務(wù),并且清空任務(wù)緩存隊(duì)列,返回尚未執(zhí)行的任務(wù)

          「3、線程池容量調(diào)整」

          ThreadPoolExecutor提供了動(dòng)態(tài)調(diào)整線程池容量大小的方法:

          • 「setCorePoolSize」:設(shè)置核心池大小
          • 「setMaximumPoolSize」:設(shè)置線程池最大能創(chuàng)建的線程數(shù)目大小

          當(dāng)上述參數(shù)從小變大時(shí),ThreadPoolExecutor進(jìn)行線程賦值,還可能立即創(chuàng)建新的線程來(lái)執(zhí)行任務(wù)。

          使用線程池

          ThreadPoolExecutor

          通過(guò)構(gòu)造方法使用ThreadPoolExecutor是線程池最直接的使用方式,下面看一個(gè)實(shí)例:

          import?java.util.concurrent.ArrayBlockingQueue;
          import?java.util.concurrent.ThreadPoolExecutor;
          import?java.util.concurrent.TimeUnit;

          public?class?MyTest?{
          ?public?static?void?main(String[]?args)?{
          ??//?創(chuàng)建線程池
          ??ThreadPoolExecutor?threadPool?=?new?ThreadPoolExecutor(3,?5,?5,?TimeUnit.SECONDS,
          ????new?ArrayBlockingQueue(5));
          ??//?向線程池提交任務(wù)
          ??for?(int?i?=?0;?i????threadPool.execute(new?Runnable()?{
          ????@Override
          ????public?void?run()?{
          ?????for?(int?x?=?0;?x?2;?x++)?{
          ??????System.out.println(Thread.currentThread().getName()?+?":"?+?x);
          ??????try?{
          ???????Thread.sleep(2000);
          ??????}?catch?(InterruptedException?e)?{
          ???????e.printStackTrace();
          ??????}
          ?????}
          ????}
          ???});
          ??}

          ??//?關(guān)閉線程池
          ??threadPool.shutdown();?//?設(shè)置線程池的狀態(tài)為SHUTDOWN,然后中斷所有沒(méi)有正在執(zhí)行任務(wù)的線程
          ??//?threadPool.shutdownNow();?//?設(shè)置線程池的狀態(tài)為STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表,該方法要慎用,容易造成不可控的后果
          ?}
          }

          運(yùn)行結(jié)果:

          pool-1-thread-2:0
          pool-1-thread-1:0
          pool-1-thread-3:0
          pool-1-thread-2:1
          pool-1-thread-3:1
          pool-1-thread-1:1

          Executors封裝線程池

          另外,Executors封裝好了4種常見(jiàn)的功能線程池(還是那么地貼心):

          「1、FixedThreadPool」

          固定容量線程池。其特點(diǎn)是最大線程數(shù)就是核心線程數(shù),意味著線程池只能創(chuàng)建核心線程,keepAliveTime為0,即線程執(zhí)行完任務(wù)立即回收。任務(wù)隊(duì)列未指定容量,代表使用默認(rèn)值Integer.MAX_VALUE。適用于需要控制并發(fā)線程的場(chǎng)景。

          //?使用默認(rèn)線程工廠
          public?static?ExecutorService?newFixedThreadPool(int?nThreads)?{
          ????return?new?ThreadPoolExecutor(nThreads,?nThreads,
          ??????????????????????????????????0L,?TimeUnit.MILLISECONDS,
          ??????????????????????????????????new?LinkedBlockingQueue());
          }
          //?需要自定義線程工廠
          public?static?ExecutorService?newFixedThreadPool(int?nThreads,?ThreadFactory?threadFactory)?{
          ????return?new?ThreadPoolExecutor(nThreads,?nThreads,
          ??????????????????????????????????0L,?TimeUnit.MILLISECONDS,
          ??????????????????????????????????new?LinkedBlockingQueue(),
          ??????????????????????????????????threadFactory);
          }

          使用示例:

          //?1.?創(chuàng)建線程池對(duì)象,設(shè)置核心線程和最大線程數(shù)為5
          ExecutorService?fixedThreadPool?=?Executors.newFixedThreadPool(5);
          //?2.?創(chuàng)建Runnable(任務(wù))
          Runnable?task?=new?Runnable(){
          ??public?void?run()?{
          ?????System.out.println(Thread.currentThread().getName()?+?"--->運(yùn)行");
          ??}
          };
          //?3.?向線程池提交任務(wù)
          fixedThreadPool.execute(task);

          「2、 SingleThreadExecutor」

          單線程線程池。特點(diǎn)是線程池中只有一個(gè)線程(核心線程),線程執(zhí)行完任務(wù)立即回收,使用有界阻塞隊(duì)列(容量未指定,使用默認(rèn)值Integer.MAX_VALUE

          public?static?ExecutorService?newSingleThreadExecutor()?{
          ????return?new?FinalizableDelegatedExecutorService
          ????????(new?ThreadPoolExecutor(1,?1,
          ????????????????????????????????0L,?TimeUnit.MILLISECONDS,
          ????????????????????????????????new?LinkedBlockingQueue()));
          }
          //?為節(jié)省篇幅,省略了自定義線程工廠方式的源碼

          使用示例:

          //?1.?創(chuàng)建單線程線程池
          ExecutorService?singleThreadExecutor?=?Executors.newSingleThreadExecutor();
          //?2.?創(chuàng)建Runnable(任務(wù))
          Runnable?task?=?new?Runnable(){
          ??public?void?run()?{
          ?????System.out.println(Thread.currentThread().getName()?+?"--->運(yùn)行");
          ??}
          };
          //?3.?向線程池提交任務(wù)
          singleThreadExecutor.execute(task);

          「3、 ScheduledThreadPool」

          定時(shí)線程池。指定核心線程數(shù)量,普通線程數(shù)量無(wú)限,線程執(zhí)行完任務(wù)立即回收,任務(wù)隊(duì)列為延時(shí)阻塞隊(duì)列。這是一個(gè)比較特別的線程池,適用于「執(zhí)行定時(shí)或周期性的任務(wù)」

          public?static?ScheduledExecutorService?newScheduledThreadPool(int?corePoolSize)?{
          ????return?new?ScheduledThreadPoolExecutor(corePoolSize);
          }

          //?繼承了?ThreadPoolExecutor
          public?class?ScheduledThreadPoolExecutor?extends?ThreadPoolExecutor
          ????????implements?ScheduledExecutorService?
          {
          ????//?構(gòu)造函數(shù),省略了自定義線程工廠的構(gòu)造函數(shù)
          ?public?ScheduledThreadPoolExecutor(int?corePoolSize)?{
          ?????super(corePoolSize,?Integer.MAX_VALUE,?0,?NANOSECONDS,
          ???????????new?DelayedWorkQueue());
          ?}
          ?
          ?//?延時(shí)執(zhí)行任務(wù)
          ?public?ScheduledFuture?schedule(Runnable?command,
          ???????????????????????????????????????long?delay,
          ???????????????????????????????????????TimeUnit?unit)?{
          ????????...
          ????}
          ?//?定時(shí)執(zhí)行任務(wù)
          ?public?ScheduledFuture?scheduleAtFixedRate(Runnable?command,
          ??????????????????????????????????????????????????long?initialDelay,
          ??????????????????????????????????????????????????long?period,
          ??????????????????????????????????????????????????TimeUnit?unit)?{...}
          }

          使用示例:

          //?1.?創(chuàng)建定時(shí)線程池
          ExecutorService?scheduledThreadPool?=?Executors.newScheduledThreadPool(5);
          //?2.?創(chuàng)建Runnable(任務(wù))
          Runnable?task?=?new?Runnable(){
          ??public?void?run()?{
          ?????System.out.println(Thread.currentThread().getName()?+?"--->運(yùn)行");
          ??}
          };
          //?3.?向線程池提交任務(wù)
          scheduledThreadPool.schedule(task,?2,?TimeUnit.SECONDS);?//?延遲2s后執(zhí)行任務(wù)
          scheduledThreadPool.scheduleAtFixedRate(task,50,2000,TimeUnit.MILLISECONDS);//?延遲50ms后、每隔2000ms執(zhí)行任務(wù)

          「4、CachedThreadPool」

          緩存線程池。沒(méi)有核心線程,普通線程數(shù)量為Integer.MAX_VALUE(可以理解為無(wú)限),線程閑置60s后回收,任務(wù)隊(duì)列使用SynchronousQueue這種無(wú)容量的同步隊(duì)列。適用于「任務(wù)量大但耗時(shí)低」的場(chǎng)景。

          public?static?ExecutorService?newCachedThreadPool()?{
          ????return?new?ThreadPoolExecutor(0,?Integer.MAX_VALUE,
          ??????????????????????????????????60L,?TimeUnit.SECONDS,
          ??????????????????????????????????new?SynchronousQueue());
          }

          使用示例:

          //?1.?創(chuàng)建緩存線程池
          ExecutorService?cachedThreadPool?=?Executors.newCachedThreadPool();
          //?2.?創(chuàng)建Runnable(任務(wù))
          Runnable?task?=?new?Runnable(){
          ??public?void?run()?{
          ?????System.out.println(Thread.currentThread().getName()?+?"--->運(yùn)行");
          ??}
          };
          //?3.?向線程池提交任務(wù)
          cachedThreadPool.execute(task);

          解讀線程池

          OK,相信前面內(nèi)容閱讀起來(lái)還算輕松愉悅吧,那么從這里開(kāi)始就進(jìn)入深水區(qū)了,如果后面內(nèi)容能吃透,那么線程池知識(shí)就真的被你掌握了。

          我們知道,向線程池提交任務(wù)是用ThreadPoolExecutorexecute()方法,但在其內(nèi)部,線程任務(wù)的處理其實(shí)是相當(dāng)復(fù)雜的,涉及到ThreadPoolExecutorWorkerThread三個(gè)類的6個(gè)方法:

          向線程池提交任務(wù)

          execute()

          ThreadPoolExecutor類中,任務(wù)提交方法的入口是execute(Runnable command)方法(submit()方法也是調(diào)用了execute()),該方法其實(shí)只在嘗試做一件事:經(jīng)過(guò)各種校驗(yàn)之后,調(diào)用 addWorker(Runnable command,boolean core)方法為線程池創(chuàng)建一個(gè)線程并執(zhí)行任務(wù),與之相對(duì)應(yīng),execute() 的結(jié)果有兩個(gè):

          「參數(shù)說(shuō)明:」

          1. 「Runnable command」:待執(zhí)行的任務(wù)

          「執(zhí)行流程:」

          1、通過(guò) ctl.get() 得到線程池的當(dāng)前線程數(shù),如果線程數(shù)小于corePoolSize,則調(diào)用 addWorker(commond,true)方法創(chuàng)建新的線程執(zhí)行任務(wù),否則執(zhí)行步驟2;

          2、步驟1失敗,說(shuō)明已經(jīng)無(wú)法再創(chuàng)建新線程,那么考慮將任務(wù)放入阻塞隊(duì)列,等待執(zhí)行完任務(wù)的線程來(lái)處理。基于此,判斷線程池是否處于Running狀態(tài)(只有Running狀態(tài)的線程池可以接受新任務(wù)),如果任務(wù)添加到任務(wù)隊(duì)列成功則進(jìn)入步驟3,失敗則進(jìn)入步驟4;

          3、來(lái)到這一步需要說(shuō)明任務(wù)已經(jīng)加入任務(wù)隊(duì)列,這時(shí)要二次校驗(yàn)線程池的狀態(tài),會(huì)有以下情形:

          • 線程池不再是Running狀態(tài)了,需要將任務(wù)從任務(wù)隊(duì)列中移除,如果移除成功則拒絕本次任務(wù)
          • 線程池是Running狀態(tài),則判斷線程池工作線程是否為0,是則調(diào)用 addWorker(commond,true)添加一個(gè)沒(méi)有初始任務(wù)的線程(這個(gè)線程將去獲取已經(jīng)加入任務(wù)隊(duì)列的本次任務(wù)并執(zhí)行),否則進(jìn)入步驟4;
          • 線程池不是Running狀態(tài),但從任務(wù)隊(duì)列移除任務(wù)失敗(可能已被某線程獲取?),進(jìn)入步驟4;

          4、將線程池?cái)U(kuò)容至maximumPoolSize并調(diào)用 addWorker(commond,false)方法創(chuàng)建新的線程執(zhí)行任務(wù),失敗則拒絕本次任務(wù)。

          「流程圖:」

          創(chuàng)建新的線程執(zhí)行任務(wù)

          「源碼詳讀:」

          /**
          ?*?在將來(lái)的某個(gè)時(shí)候執(zhí)行給定的任務(wù)。任務(wù)可以在新線程中執(zhí)行,也可以在現(xiàn)有的池線程中執(zhí)行。
          ?*?如果由于此執(zhí)行器已關(guān)閉或已達(dá)到其容量而無(wú)法提交任務(wù)以供執(zhí)行,則由當(dāng)前的{@code?RejectedExecutionHandler}處理該任務(wù)。
          ?*?
          ?*?@param?command?the?task?to?execute??待執(zhí)行的任務(wù)命令
          ?*/

          public?void?execute(Runnable?command)?{
          ????if?(command?==?null)
          ????????throw?new?NullPointerException();
          ????/*
          ?????*?Proceed?in?3?steps:
          ?????*?
          ?????* 1. 如果運(yùn)行的線程少于corePoolSize,將嘗試以給定的命令作為第一個(gè)任務(wù)啟動(dòng)新線程。
          ?????*
          ?????*?2.?如果一個(gè)任務(wù)可以成功排隊(duì),那么我們?nèi)匀恍枰屑?xì)檢查兩點(diǎn),其一,我們是否應(yīng)該添加一個(gè)線程
          ?????*?(因?yàn)樽詮纳洗螜z查至今,一些存在的線程已經(jīng)死亡),其二,線程池狀態(tài)此時(shí)已改變成非運(yùn)行態(tài)。因此,我們重新檢查狀態(tài),如果檢查不通過(guò),則移除已經(jīng)入列的任務(wù),如果檢查通過(guò)且線程池線程數(shù)為0,則啟動(dòng)新線程。
          ?????*?
          ?????* 3. 如果無(wú)法將任務(wù)加入任務(wù)隊(duì)列,則將線程池?cái)U(kuò)容到極限容量并嘗試創(chuàng)建一個(gè)新線程,如果失敗則拒絕任務(wù)。
          ?????*/

          ????int?c?=?ctl.get();
          ???
          ????//?步驟1:判斷線程池當(dāng)前線程數(shù)是否小于線程池大小
          ????if?(workerCountOf(c)?????????//?增加一個(gè)工作線程并添加任務(wù),成功則返回,否則進(jìn)行步驟2
          ????????//?true代表使用coreSize作為邊界約束,否則使用maximumPoolSize
          ????????if?(addWorker(command,?true))
          ????????????return;
          ????????c?=?ctl.get();
          ????}
          ????//?步驟2:不滿足workerCountOf(c)?< corePoolSize或addWorker失敗,進(jìn)入步驟2
          ????//?校驗(yàn)線程池是否是Running狀態(tài)且任務(wù)是否成功放入workQueue(阻塞隊(duì)列)
          ????if?(isRunning(c)?&&?workQueue.offer(command))?{
          ????????int?recheck?=?ctl.get();
          ????????//?再次校驗(yàn),如果線程池非Running且從任務(wù)隊(duì)列中移除任務(wù)成功,則拒絕該任務(wù)
          ????????if?(!?isRunning(recheck)?&&?remove(command))
          ????????????reject(command);
          ????????//?如果線程池工作線程數(shù)量為0,則新建一個(gè)空任務(wù)的線程
          ????????else?if?(workerCountOf(recheck)?==?0)
          ????????????//?如果線程池不是Running狀態(tài),是加入不進(jìn)去的
          ????????????addWorker(null,?false);
          ????}
          ????//?步驟3:如果線程池不是Running狀態(tài)或任務(wù)入列失敗,嘗試擴(kuò)容maxPoolSize后再次addWorker,失敗則拒絕任務(wù)
          ????else?if?(!addWorker(command,?false))
          ????????reject(command);
          }

          addWorker()

          addWorker(Runnable firstTask, boolean core) 方法,顧名思義,向線程池添加一個(gè)帶有任務(wù)的工作線程。

          「參數(shù)說(shuō)明:」

          1. 「Runnable firstTask」:新創(chuàng)建的線程應(yīng)該首先運(yùn)行的任務(wù)(如果沒(méi)有,則為空)。
          2. 「boolean core」:該參數(shù)決定了線程池容量的約束條件,即當(dāng)前線程數(shù)量以何值為極限值。參數(shù)為 true 則使用corePollSize 作為約束值,否則使用maximumPoolSize

          「執(zhí)行流程:」

          1、外層循環(huán)判斷線程池的狀態(tài)是否可以新增工作線程。這層校驗(yàn)基于下面兩個(gè)原則:

          • 線程池為Running狀態(tài)時(shí),既可以接受新任務(wù)也可以處理任務(wù)
          • 線程池為關(guān)閉狀態(tài)時(shí)只能新增空任務(wù)的工作線程(worker)處理任務(wù)隊(duì)列(workQueue)中的任務(wù)不能接受新任務(wù)

          2、內(nèi)層循環(huán)向線程池添加工作線程并返回是否添加成功的結(jié)果。

          • 首先校驗(yàn)線程數(shù)是否已經(jīng)超限制,是則返回false,否則進(jìn)入下一步
          • 通過(guò)CAS使工作線程數(shù)+1,成功則進(jìn)入步驟3,失敗則再次校驗(yàn)線程池是否是運(yùn)行狀態(tài),是則繼續(xù)內(nèi)層循環(huán),不是則返回外層循環(huán)

          3、核心線程數(shù)量+1成功的后續(xù)操作:添加到工作線程集合,并啟動(dòng)工作線程

          • 首先獲取鎖之后,再次校驗(yàn)線程池狀態(tài)(具體校驗(yàn)規(guī)則見(jiàn)代碼注解),通過(guò)則進(jìn)入下一步,未通過(guò)則添加線程失敗
          • 線程池狀態(tài)校驗(yàn)通過(guò)后,再檢查線程是否已經(jīng)啟動(dòng),是則拋出異常,否則嘗試將線程加入線程池
          • 檢查線程是否啟動(dòng)成功,成功則返回true,失敗則進(jìn)入 addWorkerFailed 方法

          「流程圖:」

          向線程池添加一個(gè)帶有任務(wù)的工作線程

          「源碼詳讀:」

          private?boolean?addWorker(Runnable?firstTask,?boolean?core)?{
          ????//?外層循環(huán):判斷線程池狀態(tài)
          ????retry:
          ????for?(;;)?{
          ????????int?c?=?ctl.get();
          ????????int?rs?=?runStateOf(c);

          ????????/**?
          ?????????*?1.線程池為非Running狀態(tài)(Running狀態(tài)則既可以新增核心線程也可以接受任務(wù))
          ?????????*?2.線程為shutdown狀態(tài)且firstTask為空且隊(duì)列不為空
          ?????????*?3.滿足條件1且條件2不滿足,則返回false
          ?????????* 4.條件2解讀:線程池為shutdown狀態(tài)時(shí)且任務(wù)隊(duì)列不為空時(shí),可以新增空任務(wù)的線程來(lái)處理隊(duì)列中的任務(wù)
          ?????????*/

          ????????if?(rs?>=?SHUTDOWN?&&
          ????????????!?(rs?==?SHUTDOWN?&&
          ???????????????firstTask?==?null?&&
          ???????????????!?workQueue.isEmpty()))
          ????????????return?false;

          ??//?內(nèi)層循環(huán):線程池添加核心線程并返回是否添加成功的結(jié)果
          ????????for?(;;)?{
          ????????????int?wc?=?workerCountOf(c);
          ????????????//?校驗(yàn)線程池已有線程數(shù)量是否超限:
          ????????????//?1.線程池最大上限CAPACITY?
          ????????????//?2.corePoolSize或maximumPoolSize(取決于入?yún)ore)
          ????????????if?(wc?>=?CAPACITY?||
          ????????????????wc?>=?(core???corePoolSize?:?maximumPoolSize))?
          ????????????????return?false;
          ????????????//?通過(guò)CAS操作使工作線程數(shù)+1,跳出外層循環(huán)
          ????????????if?(compareAndIncrementWorkerCount(c))?
          ????????????????break?retry;
          ????????????//?線程+1失敗,重讀ctl
          ????????????c?=?ctl.get();???//?Re-read?ctl
          ????????????//?如果此時(shí)線程池狀態(tài)不再是running,則重新進(jìn)行外層循環(huán)
          ????????????if?(runStateOf(c)?!=?rs)
          ????????????????continue?retry;
          ????????????//?其他?CAS?失敗是因?yàn)楣ぷ骶€程數(shù)量改變了,繼續(xù)內(nèi)層循環(huán)嘗試CAS對(duì)線程數(shù)+1
          ????????????//?else?CAS?failed?due?to?workerCount?change;?retry?inner?loop
          ????????}
          ????}

          ????/**
          ?????*?核心線程數(shù)量+1成功的后續(xù)操作:添加到工作線程集合,并啟動(dòng)工作線程
          ?????*/

          ????boolean?workerStarted?=?false;
          ????boolean?workerAdded?=?false;
          ????Worker?w?=?null;
          ????try?{
          ????????final?ReentrantLock?mainLock?=?this.mainLock;
          ????????w?=?new?Worker(firstTask);
          ????????final?Thread?t?=?w.thread;
          ????????if?(t?!=?null)?{
          ????????????//?下面代碼需要加鎖:線程池主鎖
          ????????????mainLock.lock();?
          ????????????try?{
          ????????????????//?持鎖期間重新檢查,線程工廠創(chuàng)建線程失敗或獲取鎖之前關(guān)閉的情況發(fā)生時(shí),退出
          ????????????????int?c?=?ctl.get();
          ????????????????int?rs?=?runStateOf(c);

          ????//?再次檢驗(yàn)線程池是否是running狀態(tài)或線程池shutdown但線程任務(wù)為空
          ????????????????if?(rs?????????????????????(rs?==?SHUTDOWN?&&?firstTask?==?null))?{
          ????????????????????//?線程已經(jīng)啟動(dòng),則拋出非法線程狀態(tài)異常
          ????????????????????//?為什么會(huì)存在這種狀態(tài)呢?未解決
          ????????????????????if?(t.isAlive())?//?precheck?that?t?is?startable
          ????????????????????????throw?new?IllegalThreadStateException();
          ????????????????????workers.add(w);?//加入線程池
          ????????????????????int?s?=?workers.size();
          ????????????????????//?如果當(dāng)前工作線程數(shù)超過(guò)線程池曾經(jīng)出現(xiàn)過(guò)的最大線程數(shù),刷新后者值
          ????????????????????if?(s?>?largestPoolSize)
          ????????????????????????largestPoolSize?=?s;?
          ????????????????????workerAdded?=?true;
          ????????????????}
          ????????????}?finally?{
          ????????????????mainLock.unlock();??//?釋放鎖
          ????????????}
          ????????????if?(workerAdded)?{?//?工作線程添加成功,啟動(dòng)該線程
          ????????????????t.start();
          ????????????????workerStarted?=?true;
          ????????????}
          ????????}
          ????}?finally?{
          ????????//線程啟動(dòng)失敗,則進(jìn)入addWorkerFailed
          ????????if?(!?workerStarted)?
          ????????????addWorkerFailed(w);
          ????}
          ????return?workerStarted;
          }

          Worker類

          Worker類是內(nèi)部類,既實(shí)現(xiàn)了Runnable,又繼承了AbstractQueuedSynchronizer(以下簡(jiǎn)稱AQS),所以其既是一個(gè)可執(zhí)行的任務(wù),又可以達(dá)到鎖的效果。

          Worker類主要維護(hù)正在運(yùn)行任務(wù)的線程的中斷控制狀態(tài),以及其他次要的記錄。這個(gè)類適時(shí)地繼承了AbstractQueuedSynchronizer類,以簡(jiǎn)化獲取和釋放鎖(該鎖作用于每個(gè)任務(wù)執(zhí)行代碼)的過(guò)程。這樣可以防止去中斷正在運(yùn)行中的任務(wù),只會(huì)中斷在等待從任務(wù)隊(duì)列中獲取任務(wù)的線程。

          我們實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的不可重入互斥鎖,而不是使用可重入鎖,因?yàn)槲覀儾幌Mぷ魅蝿?wù)在調(diào)用setCorePoolSize之類的池控制方法時(shí)能夠重新獲取鎖。另外,為了在線程真正開(kāi)始運(yùn)行任務(wù)之前禁止中斷,我們將鎖狀態(tài)初始化為負(fù)值,并在啟動(dòng)時(shí)清除它(在runWorker中)。

          private?final?class?Worker
          ????extends?AbstractQueuedSynchronizer
          ????implements?Runnable
          {
          ????/**
          ?????*?This?class?will?never?be?serialized,?but?we?provide?a
          ?????*?serialVersionUID?to?suppress?a?javac?warning.
          ?????*/

          ????private?static?final?long?serialVersionUID?=?6138294804551838833L;
          ?
          ????/**?Thread?this?worker?is?running?in.??Null?if?factory?fails.?*/
          ????final?Thread?thread;?
          ?????
          ????/**?Initial?task?to?run.??Possibly?null.?*/
          ????Runnable?firstTask;
          ?????
          ????/**?Per-thread?task?counter?*/
          ????volatile?long?completedTasks;
          ?
          ????/**
          ?????*?Creates?with?given?first?task?and?thread?from?ThreadFactory.
          ?????*?@param?firstTask?the?first?task?(null?if?none)
          ?????*/

          ????//?通過(guò)構(gòu)造函數(shù)初始化,
          ????Worker(Runnable?firstTask)?{
          ????????//設(shè)置AQS的同步狀態(tài)
          ????????// state:鎖狀態(tài),-1為初始值,0為unlock狀態(tài),1為lock狀態(tài)
          ????????setState(-1);?//?inhibit?interrupts?until?runWorker??在調(diào)用runWorker前,禁止中斷
          ???????
          ????????this.firstTask?=?firstTask;
          ????????//?線程工廠創(chuàng)建一個(gè)線程
          ????????this.thread?=?getThreadFactory().newThread(this);?
          ????}
          ?
          ????/**?Delegates?main?run?loop?to?outer?runWorker??*/
          ????public?void?run()?{
          ????????runWorker(this);?//runWorker()是ThreadPoolExecutor的方法
          ????}
          ?
          ????//?Lock?methods
          ????//?The?value?0?represents?the?unlocked?state.?0代表“沒(méi)被鎖定”狀態(tài)
          ????//?The?value?1?represents?the?locked?state.?1代表“鎖定”狀態(tài)
          ?
          ????protected?boolean?isHeldExclusively()?{
          ????????return?getState()?!=?0;
          ????}
          ?
          ????/**
          ?????*?嘗試獲取鎖的方法
          ?????*?重寫(xiě)AQS的tryAcquire(),AQS本來(lái)就是讓子類來(lái)實(shí)現(xiàn)的
          ?????*/

          ????protected?boolean?tryAcquire(int?unused)?{
          ????????//?判斷原值為0,且重置為1,所以state為-1時(shí),鎖無(wú)法獲取。
          ????????//?每次都是0->1,保證了鎖的不可重入性
          ????????if?(compareAndSetState(0,?1))?{
          ????????????//?設(shè)置exclusiveOwnerThread=當(dāng)前線程
          ????????????setExclusiveOwnerThread(Thread.currentThread());?
          ????????????return?true;
          ????????}
          ????????return?false;
          ????}
          ?
          ????/**
          ?????*?嘗試釋放鎖
          ?????*?不是state-1,而是置為0
          ?????*/

          ????protected?boolean?tryRelease(int?unused)?{
          ????????setExclusiveOwnerThread(null);?
          ????????setState(0);
          ????????return?true;
          ????}
          ?
          ????public?void?lock()????????{?acquire(1);?}
          ????public?boolean?tryLock()??{?return?tryAcquire(1);?}
          ????public?void?unlock()??????{?release(1);?}
          ????public?boolean?isLocked()?{?return?isHeldExclusively();?}
          ?
          ????/**
          ?????*?中斷(如果運(yùn)行)
          ?????*?shutdownNow時(shí)會(huì)循環(huán)對(duì)worker線程執(zhí)行
          ?????*?且不需要獲取worker鎖,即使在worker運(yùn)行時(shí)也可以中斷
          ?????*/

          ????void?interruptIfStarted()?{
          ????????Thread?t;
          ????????//如果state>=0、t!=null、且t沒(méi)有被中斷
          ????????//new?Worker()時(shí)state==-1,說(shuō)明不能中斷
          ????????if?(getState()?>=?0?&&?(t?=?thread)?!=?null?&&?!t.isInterrupted())?{
          ????????????try?{
          ????????????????t.interrupt();
          ????????????}?catch?(SecurityException?ignore)?{
          ????????????}
          ????????}
          ????}
          }

          runWorker()

          可以說(shuō),runWorker(Worker w) 是線程池中真正處理任務(wù)的方法,前面的execute()addWorker() 都是在為該方法做準(zhǔn)備和鋪墊。

          「參數(shù)說(shuō)明:」

          1. 「Worker w」:封裝的Worker,攜帶了工作線程的諸多要素,包括Runnable(待處理任務(wù))、lock(鎖)、completedTasks(記錄線程池已完成任務(wù)數(shù))

          「執(zhí)行流程:」

          1、判斷當(dāng)前任務(wù)或者從任務(wù)隊(duì)列中獲取的任務(wù)是否不為空,都為空則進(jìn)入步驟2,否則進(jìn)入步驟3

          2、任務(wù)為空,則將completedAbruptly置為false(即線程不是突然終止),并執(zhí)行processWorkerExit(w,completedAbruptly)方法進(jìn)入線程退出程序

          3、任務(wù)不為空,則進(jìn)入循環(huán),并加鎖

          4、判斷是否為線程添加中斷標(biāo)識(shí),以下兩個(gè)條件滿足其一則添加中斷標(biāo)識(shí):

          • 線程池狀態(tài)>=STOP,即STOPTERMINATED
          • 一開(kāi)始判斷線程池狀態(tài)<STOP,接下來(lái)檢查發(fā)現(xiàn)Thread.interrupted()true,即線程已經(jīng)被中斷,再次檢查線程池狀態(tài)是否>=STOP(以消除該瞬間shutdown方法生效,使線程池處于STOPTERMINATED

          5、執(zhí)行前置方法 beforeExecute(wt, task)(該方法為空方法,由子類實(shí)現(xiàn))后執(zhí)行task.run() 方法執(zhí)行任務(wù)(執(zhí)行不成功拋出相應(yīng)異常)

          6、執(zhí)行后置方法 afterExecute(task, thrown)(該方法為空方法,由子類實(shí)現(xiàn))后將線程池已完成的任務(wù)數(shù)+1,并釋放鎖。

          7、再次進(jìn)行循環(huán)條件判斷。

          「流程圖:」

          線程池流程圖

          「源碼詳讀:」

          final?void?runWorker(Worker?w)?{
          ????Thread?wt?=?Thread.currentThread();
          ????Runnable?task?=?w.firstTask;
          ????w.firstTask?=?null;
          ????//?allow?interrupts
          ????//?new?Worker()是state==-1,此處是調(diào)用Worker類的tryRelease()方法,將state置為0,而interruptIfStarted()中只有state>=0才允許調(diào)用中斷
          ????w.unlock();?
          ????????????
          ????//?線程退出的原因,true是任務(wù)導(dǎo)致,false是線程正常退出
          ????boolean?completedAbruptly?=?true;?
          ????try?{
          ????????//?當(dāng)前任務(wù)和從任務(wù)隊(duì)列中獲取的任務(wù)都為空,方停止循環(huán)
          ????????while?(task?!=?null?||?(task?=?getTask())?!=?null)?{
          ????????????//上鎖可以防止在shutdown()時(shí)終止正在運(yùn)行的worker,而不是應(yīng)對(duì)并發(fā)
          ????????????w.lock();?
          ?????????????
          ????????????//?If?pool?is?stopping,?ensure?thread?is?interrupted;
          ????????????//?if?not,?ensure?thread?is?not?interrupted.??This
          ????????????//?requires?a?recheck?in?second?case?to?deal?with
          ????????????//?shutdownNow?race?while?clearing?interrupt
          ????????????/**
          ?????????????*?判斷1:確保只有在線程處于stop狀態(tài)且wt未中斷時(shí),wt才會(huì)被設(shè)置中斷標(biāo)識(shí)
          ?????????????*?條件1:線程池狀態(tài)>=STOP,即STOP或TERMINATED
          ?????????????*?條件2:一開(kāi)始判斷線程池狀態(tài)=STOP(以消除該瞬間shutdown方法生效,使線程池處于STOP或TERMINATED),
          ?????????????*?條件1與條件2任意滿意一個(gè),且wt不是中斷狀態(tài),則中斷wt,否則進(jìn)入下一步
          ?????????????*/

          ????????????if?((runStateAtLeast(ctl.get(),?STOP)?||
          ?????????????????(Thread.interrupted()?&&
          ??????????????????runStateAtLeast(ctl.get(),?STOP)))?&&
          ????????????????!wt.isInterrupted())
          ????????????????wt.interrupt();?//當(dāng)前線程調(diào)用interrupt()中斷
          ?????????????
          ????????????try?{
          ????????????????//執(zhí)行前(空方法,由子類重寫(xiě)實(shí)現(xiàn))
          ????????????????beforeExecute(wt,?task);
          ?????????????????
          ????????????????Throwable?thrown?=?null;
          ????????????????try?{
          ????????????????????task.run();
          ????????????????}?
          ????????????????catch?(RuntimeException?x)?{
          ????????????????????thrown?=?x;?throw?x;
          ????????????????}?
          ????????????????catch?(Error?x)?{
          ????????????????????thrown?=?x;?throw?x;
          ????????????????}?
          ????????????????catch?(Throwable?x)?{
          ????????????????????thrown?=?x;?throw?new?Error(x);
          ????????????????}?
          ????????????????finally?{
          ????????????????????//執(zhí)行后(空方法,由子類重寫(xiě)實(shí)現(xiàn))
          ????????????????????afterExecute(task,?thrown);?
          ????????????????}
          ????????????}?
          ????????????finally?{
          ????????????????task?=?null;?
          ????????????????w.completedTasks++;?//完成任務(wù)數(shù)+1
          ????????????????w.unlock();?//釋放鎖
          ????????????}
          ????????}
          ????????//?
          ????????completedAbruptly?=?false;
          ????}?
          ????finally?{
          ????????//處理worker的退出
          ????????processWorkerExit(w,?completedAbruptly);
          ????}
          }

          「5、getTask()」

          由函數(shù)調(diào)用關(guān)系圖可知,在ThreadPoolExecutor類的實(shí)現(xiàn)中,Runnable getTask() 方法是為void runWorker(Worker w)方法服務(wù)的,它的作用就是在任務(wù)隊(duì)列(workQueue)中獲取 task(Runnable)。

          「參數(shù)說(shuō)明」:無(wú)參數(shù)

          「執(zhí)行流程」

          1. timedOut(上次獲取任務(wù)是否超時(shí))置為false(首次執(zhí)行方法,無(wú)上次,自然為false),進(jìn)入一個(gè)無(wú)限循環(huán)
          2. 如果線程池為Shutdown狀態(tài)且任務(wù)隊(duì)列為空(線程池shutdown狀態(tài)可以處理任務(wù)隊(duì)列中的任務(wù),不再接受新任務(wù),這個(gè)是重點(diǎn))或者線程池為STOPTERMINATED狀態(tài),則意味著線程池不必再獲取任務(wù)了,當(dāng)前工作線程數(shù)量-1并返回null,否則進(jìn)入步驟3
          3. 如果線程池?cái)?shù)量超限制或者時(shí)間超限且(任務(wù)隊(duì)列為空或當(dāng)前線程數(shù)>1),則進(jìn)入步驟4,否則進(jìn)入步驟5。
          4. 移除工作線程,成功則返回null,不成功則進(jìn)入下輪循環(huán)。
          5. 嘗試用poll() 或者 take()(具體用哪個(gè)取決于timed的值)獲取任務(wù),如果任務(wù)不為空,則返回該任務(wù)。如果為空,則將timeOut 置為 true進(jìn)入下一輪循環(huán)。如果獲取任務(wù)過(guò)程發(fā)生異常,則將 timeOut置為 false 后進(jìn)入下一輪循環(huán)。

          「流程圖」

          線程池流程圖

          「源碼詳讀:」

          private?Runnable?getTask()?{
          ????//?最新一次poll是否超時(shí)
          ????boolean?timedOut?=?false;?//?Did?the?last?poll()?time?out?

          ????for?(;;)?{
          ????????int?c?=?ctl.get();
          ????????int?rs?=?runStateOf(c);

          ????????//?Check?if?queue?empty?only?if?necessary.
          ????????/**
          ?????????*?條件1:線程池狀態(tài)SHUTDOWN、STOP、TERMINATED狀態(tài)
          ?????????*?條件2:線程池STOP、TERMINATED狀態(tài)或workQueue為空
          ?????????*?條件1與條件2同時(shí)為true,則workerCount-1,并且返回null
          ?????????*?注:條件2是考慮到SHUTDOWN狀態(tài)的線程池不會(huì)接受任務(wù),但仍會(huì)處理任務(wù)
          ?????????*/

          ????????if?(rs?>=?SHUTDOWN?&&?(rs?>=?STOP?||?workQueue.isEmpty()))?{
          ????????????decrementWorkerCount();
          ????????????return?null;
          ????????}

          ????????int?wc?=?workerCountOf(c);

          ????????//?Are?workers?subject?to?culling?
          ????????/**
          ?????????*?下列兩個(gè)條件滿足任意一個(gè),則給當(dāng)前正在嘗試獲取任務(wù)的工作線程設(shè)置阻塞時(shí)間限制(超時(shí)會(huì)被銷毀?不太確定這點(diǎn)),否則線程可以一直保持活躍狀態(tài)
          ?????????* 1.allowCoreThreadTimeOut:當(dāng)前線程是否以keepAliveTime為超時(shí)時(shí)限等待任務(wù)
          ?????????*?2.當(dāng)前線程數(shù)量已經(jīng)超越了核心線程數(shù)
          ?????????*/

          ????????boolean?timed?=?allowCoreThreadTimeOut?||?wc?>?corePoolSize;
          ????????????
          ????????//?兩個(gè)條件全部為true,則通過(guò)CAS使工作線程數(shù)-1,即剔除工作線程
          ????????//?條件1:工作線程數(shù)大于maximumPoolSize,或(工作線程阻塞時(shí)間受限且上次在任務(wù)隊(duì)列拉取任務(wù)超時(shí))
          ????????//?條件2:wc > 1或任務(wù)隊(duì)列為空
          ????????if?((wc?>?maximumPoolSize?||?(timed?&&?timedOut))
          ????????????&&?(wc?>?1?||?workQueue.isEmpty()))?{
          ????????????//?移除工作線程,成功則返回null,不成功則進(jìn)入下輪循環(huán)
          ????????????if?(compareAndDecrementWorkerCount(c))
          ????????????????return?null;
          ????????????continue;
          ????????}

          ?????//?執(zhí)行到這里,說(shuō)明已經(jīng)經(jīng)過(guò)前面重重校驗(yàn),開(kāi)始真正獲取task了
          ????????try?{
          ????????????//?如果工作線程阻塞時(shí)間受限,則使用poll(),否則使用take()
          ????????????//?poll()設(shè)定阻塞時(shí)間,而take()無(wú)時(shí)間限制,直到拿到結(jié)果為止
          ????????????Runnable?r?=?timed??
          ????????????????workQueue.poll(keepAliveTime,?TimeUnit.NANOSECONDS)?:
          ????????????????workQueue.take();
          ????????????//?r不為空,則返回該Runnable
          ????????????if?(r?!=?null)
          ????????????????return?r;
          ????????????//?沒(méi)能獲取到Runable,則將最近獲取任務(wù)是否超時(shí)設(shè)置為true
          ????????????timedOut?=?true;
          ????????}?catch?(InterruptedException?retry)?{
          ????????????//?響應(yīng)中斷,進(jìn)入下一次循環(huán)前將最近獲取任務(wù)超時(shí)狀態(tài)置為false
          ????????????timedOut?=?false;
          ????????}
          ????}
          }

          processWorkerExit()

          processWorkerExit(Worker w, boolean completedAbruptly)執(zhí)行線程退出的方法

          「參數(shù)說(shuō)明:」

          1. 「Worker w」:要結(jié)束的工作線程。
          2. 「boolean completedAbruptly」:是否突然完成(異常導(dǎo)致),如果工作線程因?yàn)橛脩舢惓K劳觯瑒tcompletedAbruptly參數(shù)為 true

          「執(zhí)行流程:」

          1、如果 completedAbruptlytrue,即工作線程因?yàn)楫惓M蝗凰劳觯瑒t執(zhí)行工作線程-1操作。

          2、主線程獲取鎖后,線程池已經(jīng)完成的任務(wù)數(shù)追加 w(當(dāng)前工作線程) 完成的任務(wù)數(shù),并從workerset集合中移除當(dāng)前worker

          3、根據(jù)線程池狀態(tài)進(jìn)行判斷是否執(zhí)行tryTerminate()結(jié)束線程池。

          4、是否需要增加工作線程,如果線程池還沒(méi)有完全終止,仍需要保持一定數(shù)量的線程。

          • 如果當(dāng)前線程是突然終止的,調(diào)用addWorker()創(chuàng)建工作線程

          • 當(dāng)前線程不是突然終止,但當(dāng)前工作線程數(shù)量小于線程池需要維護(hù)的線程數(shù)量,則創(chuàng)建工作線程。需要維護(hù)的線程數(shù)量為corePoolSize(取決于成員變量 allowCoreThreadTimeOut是否為 false)或1。

          • 源碼詳讀:

          /**
          ?*?Performs?cleanup?and?bookkeeping?for?a?dying?worker.?Called
          ?*?only?from?worker?threads.?Unless?completedAbruptly?is?set,
          ?*?assumes?that?workerCount?has?already?been?adjusted?to?account
          ?*?for?exit.??This?method?removes?thread?from?worker?set,?and
          ?*?possibly?terminates?the?pool?or?replaces?the?worker?if?either
          ?*?it?exited?due?to?user?task?exception?or?if?fewer?than
          ?*?corePoolSize?workers?are?running?or?queue?is?non-empty?but
          ?*?there?are?no?workers.
          ?*
          ?*?@param?w?the?worker
          ?*?@param?completedAbruptly?if?the?worker?died?due?to?user?exception
          ?*/

          private?void?processWorkerExit(Worker?w,?boolean?completedAbruptly)?{
          ????/**
          ?????*?1.工作線程-1操作
          ?????*?1)如果completedAbruptly?為true,說(shuō)明工作線程發(fā)生異常,那么將正在工作的線程數(shù)量-1
          ?????*?2)如果completedAbruptly?為false,說(shuō)明工作線程無(wú)任務(wù)可以執(zhí)行,由getTask()執(zhí)行worker-1操作
          ?????*/

          ????if?(completedAbruptly)?//?If?abrupt,?then?workerCount?wasn't?adjusted
          ????????decrementWorkerCount();

          ????//?2.從線程set集合中移除工作線程,該過(guò)程需要加鎖
          ????final?ReentrantLock?mainLock?=?this.mainLock;
          ????mainLock.lock();
          ????try?{
          ????????//?將該worker已完成的任務(wù)數(shù)追加到線程池已完成的任務(wù)數(shù)
          ????????completedTaskCount?+=?w.completedTasks;
          ????????//?HashSet中移除該worker
          ????????workers.remove(w);
          ????}?finally?{
          ????????mainLock.unlock();
          ????}
          ????
          ?//?3.根據(jù)線程池狀態(tài)進(jìn)行判斷是否結(jié)束線程池
          ????tryTerminate();
          ?
          ?/**
          ?????*?4.是否需要增加工作線程
          ?????*?線程池狀態(tài)是running?或?shutdown
          ?????*?如果當(dāng)前線程是突然終止的,addWorker()
          ?????*?如果當(dāng)前線程不是突然終止的,但當(dāng)前線程數(shù)量??????*?故如果調(diào)用線程池shutdown(),直到workQueue為空前,線程池都會(huì)維持corePoolSize個(gè)線程,然后再逐漸銷毀這corePoolSize個(gè)線程
          ?????*/

          ????int?c?=?ctl.get();
          ????if?(runStateLessThan(c,?STOP))?{
          ???????if?(!completedAbruptly)?{
          ????????????int?min?=?allowCoreThreadTimeOut???0?:?corePoolSize;
          ????????????if?(min?==?0?&&?!?workQueue.isEmpty())
          ????????????????min?=?1;
          ????????????if?(workerCountOf(c)?>=?min)
          ????????????????return;?//?replacement?not?needed
          ????????}
          ????????addWorker(null,?false);
          ????}
          }

          好啦,以上就是Java線程池的全部?jī)?nèi)容啦,堅(jiān)持讀完的伙伴兒們你們收獲如何?覺(jué)得有幫助的就順手點(diǎn)個(gè)贊吧,祝大家新年新氣象,升級(jí)加薪!

          瀏覽 69
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  色吊丝最新永久网址大全 | 黄色考逼视频免费观看网站www | 在线观看黄视频 | 亚洲欧美不卡高清在线 | 清清草视频 |