面試官:詳細(xì)說(shuō)一下Java線程池,從設(shè)計(jì)思想到源碼解讀!
你知道的越多,不知道的就越多,業(yè)余的像一棵小草!
你來(lái),我們一起精進(jìn)!你不來(lái),我和你的競(jìng)爭(zhēng)對(duì)手一起精進(jìn)!
編輯:業(yè)余草
blog.csdn.net/mu_wind
推薦:https://www.xttblog.com/?p=5306
今天說(shuō)一說(shuō),線程池,從設(shè)計(jì)思想到源碼解析。
前言
各位小伙伴兒,春節(jié)已經(jīng)結(jié)束了,在此獻(xiàn)上一篇肝了一個(gè)春節(jié)假期的遲來(lái)的拜年之作,希望讀者朋友們都能有收獲。多多點(diǎn)贊、評(píng)論、收藏!
初識(shí)線程池
我們知道,線程的創(chuàng)建和銷毀都需要映射到操作系統(tǒng),因此其代價(jià)是比較高昂的。出于避免頻繁創(chuàng)建、銷毀線程以及方便線程管理的需要,線程池應(yīng)運(yùn)而生。
線程池優(yōu)勢(shì)
「降低資源消耗」:線程池通常會(huì)維護(hù)一些線程(數(shù)量為 corePoolSize),這些線程被重復(fù)使用來(lái)執(zhí)行不同的任務(wù),任務(wù)完成后不會(huì)銷毀。在待處理任務(wù)量很大的時(shí)候,通過(guò)對(duì)線程資源的復(fù)用,避免了線程的頻繁創(chuàng)建與銷毀,從而降低了系統(tǒng)資源消耗。「提高響應(yīng)速度」:由于線程池維護(hù)了一批 alive狀態(tài)的線程,當(dāng)任務(wù)到達(dá)時(shí),不需要再創(chuàng)建線程,而是直接由這些線程去執(zhí)行任務(wù),從而減少了任務(wù)的等待時(shí)間。「提高線程的可管理性」:使用線程池可以對(duì)線程進(jìn)行統(tǒng)一的分配,調(diào)優(yōu)和監(jiān)控。
線程池設(shè)計(jì)思路
有句話叫做藝術(shù)來(lái)源于生活,編程語(yǔ)言也是如此,很多設(shè)計(jì)思想能映射到日常生活中,比如面向?qū)ο笏枷搿⒎庋b、繼承,等等。今天我們要說(shuō)的線程池,它同樣可以在現(xiàn)實(shí)世界找到對(duì)應(yīng)的實(shí)體——工廠。
先假想一個(gè)工廠的生產(chǎn)流程:

工廠中有固定的一批工人,稱為正式工人,工廠接收的訂單由這些工人去完成。當(dāng)訂單增加,正式工人已經(jīng)忙不過(guò)來(lái)了,工廠會(huì)將生產(chǎn)原料暫時(shí)堆積在倉(cāng)庫(kù)中,等有空閑的工人時(shí)再處理(因?yàn)楣と丝臻e了也不會(huì)主動(dòng)處理倉(cāng)庫(kù)中的生產(chǎn)任務(wù),所以需要調(diào)度員實(shí)時(shí)調(diào)度)。倉(cāng)庫(kù)堆積滿了后,訂單還在增加怎么辦?工廠只能臨時(shí)擴(kuò)招一批工人來(lái)應(yīng)對(duì)生產(chǎn)高峰,而這批工人高峰結(jié)束后是要清退的,所以稱為臨時(shí)工。當(dāng)時(shí)臨時(shí)工也以招滿后(受限于工位限制,臨時(shí)工數(shù)量有上限),后面的訂單只能忍痛拒絕了。
我們做如下一番映射:
工廠——線程池 訂單——任務(wù)(Runnable) 正式工人——核心線程 臨時(shí)工——普通線程 倉(cāng)庫(kù)——任務(wù)隊(duì)列 調(diào)度員——getTask()
?getTask()是一個(gè)方法,將任務(wù)隊(duì)列中的任務(wù)調(diào)度給空閑線程,在解讀線程池有詳細(xì)介紹
?
映射后,形成線程池流程圖如下,兩者是不是有異曲同工之妙?

這樣,線程池的工作原理或者說(shuō)流程就很好理解了,提煉成一個(gè)簡(jiǎn)圖:

深入線程池
那么接下來(lái),問(wèn)題來(lái)了,線程池是具體如何實(shí)現(xiàn)這套工作機(jī)制的呢?從Java線程池Executor框架體系可以看出:線程池的真正實(shí)現(xiàn)類是ThreadPoolExecutor,因此我們接下來(lái)重點(diǎn)研究這個(gè)類。

構(gòu)造方法
研究一個(gè)類,先從它的構(gòu)造方法開(kāi)始。ThreadPoolExecutor提供了4個(gè)有參構(gòu)造方法:
public?ThreadPoolExecutor(int?corePoolSize,
??????????????????????????int?maximumPoolSize,
??????????????????????????long?keepAliveTime,
??????????????????????????TimeUnit?unit,
??????????????????????????BlockingQueue?workQueue) ?{
????this(corePoolSize,?maximumPoolSize,?keepAliveTime,?unit,?workQueue,
?????????????Executors.defaultThreadFactory(),?defaultHandler);
}
public?ThreadPoolExecutor(int?corePoolSize,
??????????????????????????int?maximumPoolSize,
??????????????????????????long?keepAliveTime,
??????????????????????????TimeUnit?unit,
??????????????????????????BlockingQueue?workQueue,
??????????????????????????ThreadFactory?threadFactory) ?{
????this(corePoolSize,?maximumPoolSize,?keepAliveTime,?unit,?workQueue,
?????????????threadFactory,?defaultHandler);
}
public?ThreadPoolExecutor(int?corePoolSize,
??????????????????????????int?maximumPoolSize,
??????????????????????????long?keepAliveTime,
??????????????????????????TimeUnit?unit,
??????????????????????????BlockingQueue?workQueue,
??????????????????????????RejectedExecutionHandler?handler) ?{
????this(corePoolSize,?maximumPoolSize,?keepAliveTime,?unit,?workQueue,
?????????????Executors.defaultThreadFactory(),?handler);
}
public?ThreadPoolExecutor(int?corePoolSize,
??????????????????????????int?maximumPoolSize,
??????????????????????????long?keepAliveTime,
??????????????????????????TimeUnit?unit,
??????????????????????????BlockingQueue?workQueue,
??????????????????????????ThreadFactory?threadFactory,
??????????????????????????RejectedExecutionHandler?handler) ?{
????if?(corePoolSize?0?||
????????maximumPoolSize?<=?0?||
????????maximumPoolSize?????????keepAliveTime?0)
????????throw?new?IllegalArgumentException();
????if?(workQueue?==?null?||?threadFactory?==?null?||?handler?==?null)
????????throw?new?NullPointerException();
????this.corePoolSize?=?corePoolSize;
????this.maximumPoolSize?=?maximumPoolSize;
????this.workQueue?=?workQueue;
????this.keepAliveTime?=?unit.toNanos(keepAliveTime);
????this.threadFactory?=?threadFactory;
????this.handler?=?handler;
}
解釋一下構(gòu)造方法中涉及到的參數(shù):
「corePoolSize」(必需):核心線程數(shù)。即池中一直保持存活的線程數(shù),即使這些線程處于空閑。但是將 allowCoreThreadTimeOut參數(shù)設(shè)置為true后,核心線程處于空閑一段時(shí)間以上,也會(huì)被回收。「maximumPoolSize」(必需):池中允許的最大線程數(shù)。當(dāng)核心線程全部繁忙且任務(wù)隊(duì)列打滿之后,線程池會(huì)臨時(shí)追加線程,直到總線程數(shù)達(dá)到 maximumPoolSize這個(gè)上限。「keepAliveTime」(必需):線程空閑超時(shí)時(shí)間。當(dāng)非核心線程處于空閑狀態(tài)的時(shí)間超過(guò)這個(gè)時(shí)間后,該線程將被回收。將 allowCoreThreadTimeOut參數(shù)設(shè)置為true后,核心線程也會(huì)被回收。「unit」(必需): keepAliveTime參數(shù)的時(shí)間單位。有:TimeUnit.DAYS(天)、TimeUnit.HOURS(小時(shí))、TimeUnit.MINUTES(分鐘)、「TimeUnit.SECONDS(秒)」、「TimeUnit.MILLISECONDS(毫秒)」、TimeUnit.MICROSECONDS(微秒)、TimeUnit.NANOSECONDS(納秒)「workQueue」(必需):任務(wù)隊(duì)列,采用阻塞隊(duì)列實(shí)現(xiàn)。當(dāng)核心線程全部繁忙時(shí),后續(xù)由 execute方法提交的Runnable將存放在任務(wù)隊(duì)列中,等待被線程處理。「threadFactory」(可選):線程工廠。指定線程池創(chuàng)建線程的方式。 「handler」(可選):拒絕策略。當(dāng)線程池中線程數(shù)達(dá)到 maximumPoolSize且workQueue打滿時(shí),后續(xù)提交的任務(wù)將被拒絕,handler可以指定用什么方式拒絕任務(wù)。
放到一起再看一下:

任務(wù)隊(duì)列
使用ThreadPoolExecutor需要指定一個(gè)實(shí)現(xiàn)了BlockingQueue接口的任務(wù)等待隊(duì)列。在ThreadPoolExecutor線程池的API文檔中,一共推薦了三種等待隊(duì)列,它們是:SynchronousQueue、LinkedBlockingQueue和ArrayBlockingQueue;
「SynchronousQueue」:同步隊(duì)列。這是一個(gè)內(nèi)部沒(méi)有任何容量的阻塞隊(duì)列,任何一次插入操作的元素都要等待相對(duì)的刪除/讀取操作,否則進(jìn)行插入操作的線程就要一直等待,反之亦然。 「LinkedBlockingQueue」:無(wú)界隊(duì)列(嚴(yán)格來(lái)說(shuō)并非無(wú)界,上限是 Integer.MAX_VALUE),基于鏈表結(jié)構(gòu)。使用無(wú)界隊(duì)列后,當(dāng)核心線程都繁忙時(shí),后續(xù)任務(wù)可以無(wú)限加入隊(duì)列,因此線程池中線程數(shù)不會(huì)超過(guò)核心線程數(shù)。這種隊(duì)列可以提高線程池吞吐量,但代價(jià)是犧牲內(nèi)存空間,甚至?xí)?dǎo)致內(nèi)存溢出。另外,使用它時(shí)可以指定容量,這樣它也就是一種有界隊(duì)列了。「ArrayBlockingQueue」:有界隊(duì)列,基于數(shù)組實(shí)現(xiàn)。在線程池初始化時(shí),指定隊(duì)列的容量,后續(xù)無(wú)法再調(diào)整。這種有界隊(duì)列有利于防止資源耗盡,但可能更難調(diào)整和控制。
另外,Java還提供了另外4種隊(duì)列:
「PriorityBlockingQueue」:支持優(yōu)先級(jí)排序的無(wú)界阻塞隊(duì)列。存放在 PriorityBlockingQueue中的元素必須實(shí)現(xiàn)Comparable接口,這樣才能通過(guò)實(shí)現(xiàn)compareTo()方法進(jìn)行排序。優(yōu)先級(jí)最高的元素將始終排在隊(duì)列的頭部;PriorityBlockingQueue不會(huì)保證優(yōu)先級(jí)一樣的元素的排序,也不保證當(dāng)前隊(duì)列中除了優(yōu)先級(jí)最高的元素以外的元素,隨時(shí)處于正確排序的位置。「DelayQueue」:延遲隊(duì)列。基于二叉堆實(shí)現(xiàn),同時(shí)具備:無(wú)界隊(duì)列、阻塞隊(duì)列、優(yōu)先隊(duì)列的特征。 DelayQueue延遲隊(duì)列中存放的對(duì)象,必須是實(shí)現(xiàn)Delayed接口的類對(duì)象。通過(guò)執(zhí)行時(shí)延從隊(duì)列中提取任務(wù),時(shí)間沒(méi)到任務(wù)取不出來(lái)。更多內(nèi)容請(qǐng)見(jiàn)DelayQueue:面試官:談?wù)凧ava中的阻塞延遲隊(duì)列DelayQueue原理和用法。「LinkedBlockingDeque」:雙端隊(duì)列。基于鏈表實(shí)現(xiàn),既可以從尾部插入/取出元素,還可以從頭部插入元素/取出元素。 「LinkedTransferQueue」:由鏈表結(jié)構(gòu)組成的無(wú)界阻塞隊(duì)列。這個(gè)隊(duì)列比較特別的時(shí),采用一種預(yù)占模式,意思就是消費(fèi)者線程取元素時(shí),如果隊(duì)列不為空,則直接取走數(shù)據(jù),若隊(duì)列為空,那就生成一個(gè)節(jié)點(diǎn)(節(jié)點(diǎn)元素為null)入隊(duì),然后消費(fèi)者線程被等待在這個(gè)節(jié)點(diǎn)上,后面生產(chǎn)者線程入隊(duì)時(shí)發(fā)現(xiàn)有一個(gè)元素為null的節(jié)點(diǎn),生產(chǎn)者線程就不入隊(duì)了,直接就將元素填充到該節(jié)點(diǎn),并喚醒該節(jié)點(diǎn)等待的線程,被喚醒的消費(fèi)者線程取走元素。
拒絕策略
線程池有一個(gè)重要的機(jī)制:拒絕策略。當(dāng)線程池workQueue已滿且無(wú)法再創(chuàng)建新線程池時(shí),就要拒絕后續(xù)任務(wù)了。拒絕策略需要實(shí)現(xiàn)RejectedExecutionHandler接口,不過(guò)Executors框架已經(jīng)為我們實(shí)現(xiàn)了4種拒絕策略:
「AbortPolicy」(默認(rèn)):丟棄任務(wù)并拋出 RejectedExecutionException異常。「CallerRunsPolicy」:直接運(yùn)行這個(gè)任務(wù)的 run方法,但并非是由線程池的線程處理,而是交由任務(wù)的調(diào)用線程處理。「DiscardPolicy」:直接丟棄任務(wù),不拋出任何異常。 「DiscardOldestPolicy」:將當(dāng)前處于等待隊(duì)列列頭的等待任務(wù)強(qiáng)行取出,然后再試圖將當(dāng)前被拒絕的任務(wù)提交到線程池執(zhí)行。
線程工廠指定創(chuàng)建線程的方式,這個(gè)參數(shù)不是必選項(xiàng),Executors類已經(jīng)為我們非常貼心地提供了一個(gè)默認(rèn)的線程工廠:
/**
?*?The?default?thread?factory
?*/
static?class?DefaultThreadFactory?implements?ThreadFactory?{
????private?static?final?AtomicInteger?poolNumber?=?new?AtomicInteger(1);
????private?final?ThreadGroup?group;
????private?final?AtomicInteger?threadNumber?=?new?AtomicInteger(1);
????private?final?String?namePrefix;
????DefaultThreadFactory()?{
????????SecurityManager?s?=?System.getSecurityManager();
????????group?=?(s?!=?null)???s.getThreadGroup()?:
??????????????????????????????Thread.currentThread().getThreadGroup();
????????namePrefix?=?"pool-"?+
??????????????????????poolNumber.getAndIncrement()?+
?????????????????????"-thread-";
????}
????public?Thread?newThread(Runnable?r)?{
????????Thread?t?=?new?Thread(group,?r,
??????????????????????????????namePrefix?+?threadNumber.getAndIncrement(),
??????????????????????????????0);
????????if?(t.isDaemon())
????????????t.setDaemon(false);
????????if?(t.getPriority()?!=?Thread.NORM_PRIORITY)
????????????t.setPriority(Thread.NORM_PRIORITY);
????????return?t;
????}
}
線程池狀態(tài)
線程池有5種狀態(tài):
volatile?int?runState;
//?runState?is?stored?in?the?high-order?bits
private?static?final?int?RUNNING????=?-1?<private?static?final?int?SHUTDOWN???=??0?<private?static?final?int?STOP???????=??1?<private?static?final?int?TIDYING????=??2?<private?static?final?int?TERMINATED?=??3?<runState表示當(dāng)前線程池的狀態(tài),它是一個(gè) volatile 變量用來(lái)保證線程之間的可見(jiàn)性。
下面的幾個(gè)static final變量表示runState可能的幾個(gè)取值,有以下幾個(gè)狀態(tài):
「RUNNING」:當(dāng)創(chuàng)建線程池后,初始時(shí),線程池處于 RUNNING狀態(tài);「SHUTDOWN」:如果調(diào)用了 shutdown()方法,則線程池處于SHUTDOWN狀態(tài),此時(shí)線程池不能夠接受新的任務(wù),它會(huì)等待所有任務(wù)執(zhí)行完畢;「STOP」:如果調(diào)用了shutdownNow()方法,則線程池處于 STOP狀態(tài),此時(shí)線程池不能接受新的任務(wù),并且會(huì)去嘗試終止正在執(zhí)行的任務(wù);「TERMINATED」:當(dāng)線程池處于 SHUTDOWN或STOP狀態(tài),并且所有工作線程已經(jīng)銷毀,任務(wù)緩存隊(duì)列已經(jīng)清空或執(zhí)行結(jié)束后,線程池被設(shè)置為TERMINATED狀態(tài)。
初始化&容量調(diào)整&關(guān)閉
「1、線程初始化」
默認(rèn)情況下,創(chuàng)建線程池之后,線程池中是沒(méi)有線程的,需要提交任務(wù)之后才會(huì)創(chuàng)建線程。
在實(shí)際中如果需要線程池創(chuàng)建之后立即創(chuàng)建線程,可以通過(guò)以下兩個(gè)方法辦到:
「prestartCoreThread()」:boolean prestartCoreThread(),初始化一個(gè)核心線程 「prestartAllCoreThreads()」:int prestartAllCoreThreads(),初始化所有核心線程,并返回初始化的線程數(shù)
public?boolean?prestartCoreThread()?{
????return?addIfUnderCorePoolSize(null);?//注意傳進(jìn)去的參數(shù)是null
}
public?int?prestartAllCoreThreads()?{
????int?n?=?0;
????while?(addIfUnderCorePoolSize(null))//注意傳進(jìn)去的參數(shù)是null
????????++n;
????return?n;
}
「2、線程池關(guān)閉」
ThreadPoolExecutor提供了兩個(gè)方法,用于線程池的關(guān)閉:
「shutdown()」:不會(huì)立即終止線程池,而是要等所有任務(wù)緩存隊(duì)列中的任務(wù)都執(zhí)行完后才終止,但再也不會(huì)接受新的任務(wù) 「shutdownNow()」:立即終止線程池,并嘗試打斷正在執(zhí)行的任務(wù),并且清空任務(wù)緩存隊(duì)列,返回尚未執(zhí)行的任務(wù)
「3、線程池容量調(diào)整」
ThreadPoolExecutor提供了動(dòng)態(tài)調(diào)整線程池容量大小的方法:
「setCorePoolSize」:設(shè)置核心池大小 「setMaximumPoolSize」:設(shè)置線程池最大能創(chuàng)建的線程數(shù)目大小
當(dāng)上述參數(shù)從小變大時(shí),ThreadPoolExecutor進(jìn)行線程賦值,還可能立即創(chuàng)建新的線程來(lái)執(zhí)行任務(wù)。
使用線程池
ThreadPoolExecutor
通過(guò)構(gòu)造方法使用ThreadPoolExecutor是線程池最直接的使用方式,下面看一個(gè)實(shí)例:
import?java.util.concurrent.ArrayBlockingQueue;
import?java.util.concurrent.ThreadPoolExecutor;
import?java.util.concurrent.TimeUnit;
public?class?MyTest?{
?public?static?void?main(String[]?args)?{
??//?創(chuàng)建線程池
??ThreadPoolExecutor?threadPool?=?new?ThreadPoolExecutor(3,?5,?5,?TimeUnit.SECONDS,
????new?ArrayBlockingQueue(5));
??//?向線程池提交任務(wù)
??for?(int?i?=?0;?i????threadPool.execute(new?Runnable()?{
????@Override
????public?void?run()?{
?????for?(int?x?=?0;?x?2;?x++)?{
??????System.out.println(Thread.currentThread().getName()?+?":"?+?x);
??????try?{
???????Thread.sleep(2000);
??????}?catch?(InterruptedException?e)?{
???????e.printStackTrace();
??????}
?????}
????}
???});
??}
??//?關(guān)閉線程池
??threadPool.shutdown();?//?設(shè)置線程池的狀態(tài)為SHUTDOWN,然后中斷所有沒(méi)有正在執(zhí)行任務(wù)的線程
??//?threadPool.shutdownNow();?//?設(shè)置線程池的狀態(tài)為STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表,該方法要慎用,容易造成不可控的后果
?}
}
運(yùn)行結(jié)果:
pool-1-thread-2:0
pool-1-thread-1:0
pool-1-thread-3:0
pool-1-thread-2:1
pool-1-thread-3:1
pool-1-thread-1:1
Executors封裝線程池
另外,Executors封裝好了4種常見(jiàn)的功能線程池(還是那么地貼心):
「1、FixedThreadPool」
固定容量線程池。其特點(diǎn)是最大線程數(shù)就是核心線程數(shù),意味著線程池只能創(chuàng)建核心線程,keepAliveTime為0,即線程執(zhí)行完任務(wù)立即回收。任務(wù)隊(duì)列未指定容量,代表使用默認(rèn)值Integer.MAX_VALUE。適用于需要控制并發(fā)線程的場(chǎng)景。
//?使用默認(rèn)線程工廠
public?static?ExecutorService?newFixedThreadPool(int?nThreads)?{
????return?new?ThreadPoolExecutor(nThreads,?nThreads,
??????????????????????????????????0L,?TimeUnit.MILLISECONDS,
??????????????????????????????????new?LinkedBlockingQueue());
}
//?需要自定義線程工廠
public?static?ExecutorService?newFixedThreadPool(int?nThreads,?ThreadFactory?threadFactory)?{
????return?new?ThreadPoolExecutor(nThreads,?nThreads,
??????????????????????????????????0L,?TimeUnit.MILLISECONDS,
??????????????????????????????????new?LinkedBlockingQueue(),
??????????????????????????????????threadFactory);
}
使用示例:
//?1.?創(chuàng)建線程池對(duì)象,設(shè)置核心線程和最大線程數(shù)為5
ExecutorService?fixedThreadPool?=?Executors.newFixedThreadPool(5);
//?2.?創(chuàng)建Runnable(任務(wù))
Runnable?task?=new?Runnable(){
??public?void?run()?{
?????System.out.println(Thread.currentThread().getName()?+?"--->運(yùn)行");
??}
};
//?3.?向線程池提交任務(wù)
fixedThreadPool.execute(task);
「2、 SingleThreadExecutor」
單線程線程池。特點(diǎn)是線程池中只有一個(gè)線程(核心線程),線程執(zhí)行完任務(wù)立即回收,使用有界阻塞隊(duì)列(容量未指定,使用默認(rèn)值Integer.MAX_VALUE)
public?static?ExecutorService?newSingleThreadExecutor()?{
????return?new?FinalizableDelegatedExecutorService
????????(new?ThreadPoolExecutor(1,?1,
????????????????????????????????0L,?TimeUnit.MILLISECONDS,
????????????????????????????????new?LinkedBlockingQueue()));
}
//?為節(jié)省篇幅,省略了自定義線程工廠方式的源碼
使用示例:
//?1.?創(chuàng)建單線程線程池
ExecutorService?singleThreadExecutor?=?Executors.newSingleThreadExecutor();
//?2.?創(chuàng)建Runnable(任務(wù))
Runnable?task?=?new?Runnable(){
??public?void?run()?{
?????System.out.println(Thread.currentThread().getName()?+?"--->運(yùn)行");
??}
};
//?3.?向線程池提交任務(wù)
singleThreadExecutor.execute(task);
「3、 ScheduledThreadPool」
定時(shí)線程池。指定核心線程數(shù)量,普通線程數(shù)量無(wú)限,線程執(zhí)行完任務(wù)立即回收,任務(wù)隊(duì)列為延時(shí)阻塞隊(duì)列。這是一個(gè)比較特別的線程池,適用于「執(zhí)行定時(shí)或周期性的任務(wù)」。
public?static?ScheduledExecutorService?newScheduledThreadPool(int?corePoolSize)?{
????return?new?ScheduledThreadPoolExecutor(corePoolSize);
}
//?繼承了?ThreadPoolExecutor
public?class?ScheduledThreadPoolExecutor?extends?ThreadPoolExecutor
????????implements?ScheduledExecutorService?{
????//?構(gòu)造函數(shù),省略了自定義線程工廠的構(gòu)造函數(shù)
?public?ScheduledThreadPoolExecutor(int?corePoolSize)?{
?????super(corePoolSize,?Integer.MAX_VALUE,?0,?NANOSECONDS,
???????????new?DelayedWorkQueue());
?}
?
?//?延時(shí)執(zhí)行任務(wù)
?public?ScheduledFuture>?schedule(Runnable?command,
???????????????????????????????????????long?delay,
???????????????????????????????????????TimeUnit?unit)?{
????????...
????}
?//?定時(shí)執(zhí)行任務(wù)
?public?ScheduledFuture>?scheduleAtFixedRate(Runnable?command,
??????????????????????????????????????????????????long?initialDelay,
??????????????????????????????????????????????????long?period,
??????????????????????????????????????????????????TimeUnit?unit)?{...}
}
使用示例:
//?1.?創(chuàng)建定時(shí)線程池
ExecutorService?scheduledThreadPool?=?Executors.newScheduledThreadPool(5);
//?2.?創(chuàng)建Runnable(任務(wù))
Runnable?task?=?new?Runnable(){
??public?void?run()?{
?????System.out.println(Thread.currentThread().getName()?+?"--->運(yùn)行");
??}
};
//?3.?向線程池提交任務(wù)
scheduledThreadPool.schedule(task,?2,?TimeUnit.SECONDS);?//?延遲2s后執(zhí)行任務(wù)
scheduledThreadPool.scheduleAtFixedRate(task,50,2000,TimeUnit.MILLISECONDS);//?延遲50ms后、每隔2000ms執(zhí)行任務(wù)
「4、CachedThreadPool」
緩存線程池。沒(méi)有核心線程,普通線程數(shù)量為Integer.MAX_VALUE(可以理解為無(wú)限),線程閑置60s后回收,任務(wù)隊(duì)列使用SynchronousQueue這種無(wú)容量的同步隊(duì)列。適用于「任務(wù)量大但耗時(shí)低」的場(chǎng)景。
public?static?ExecutorService?newCachedThreadPool()?{
????return?new?ThreadPoolExecutor(0,?Integer.MAX_VALUE,
??????????????????????????????????60L,?TimeUnit.SECONDS,
??????????????????????????????????new?SynchronousQueue());
}
使用示例:
//?1.?創(chuàng)建緩存線程池
ExecutorService?cachedThreadPool?=?Executors.newCachedThreadPool();
//?2.?創(chuàng)建Runnable(任務(wù))
Runnable?task?=?new?Runnable(){
??public?void?run()?{
?????System.out.println(Thread.currentThread().getName()?+?"--->運(yùn)行");
??}
};
//?3.?向線程池提交任務(wù)
cachedThreadPool.execute(task);
解讀線程池
OK,相信前面內(nèi)容閱讀起來(lái)還算輕松愉悅吧,那么從這里開(kāi)始就進(jìn)入深水區(qū)了,如果后面內(nèi)容能吃透,那么線程池知識(shí)就真的被你掌握了。
我們知道,向線程池提交任務(wù)是用ThreadPoolExecutor的execute()方法,但在其內(nèi)部,線程任務(wù)的處理其實(shí)是相當(dāng)復(fù)雜的,涉及到ThreadPoolExecutor、Worker、Thread三個(gè)類的6個(gè)方法:

execute()
在ThreadPoolExecutor類中,任務(wù)提交方法的入口是execute(Runnable command)方法(submit()方法也是調(diào)用了execute()),該方法其實(shí)只在嘗試做一件事:經(jīng)過(guò)各種校驗(yàn)之后,調(diào)用 addWorker(Runnable command,boolean core)方法為線程池創(chuàng)建一個(gè)線程并執(zhí)行任務(wù),與之相對(duì)應(yīng),execute() 的結(jié)果有兩個(gè):
「參數(shù)說(shuō)明:」
「Runnable command」:待執(zhí)行的任務(wù)
「執(zhí)行流程:」
1、通過(guò) ctl.get() 得到線程池的當(dāng)前線程數(shù),如果線程數(shù)小于corePoolSize,則調(diào)用 addWorker(commond,true)方法創(chuàng)建新的線程執(zhí)行任務(wù),否則執(zhí)行步驟2;
2、步驟1失敗,說(shuō)明已經(jīng)無(wú)法再創(chuàng)建新線程,那么考慮將任務(wù)放入阻塞隊(duì)列,等待執(zhí)行完任務(wù)的線程來(lái)處理。基于此,判斷線程池是否處于Running狀態(tài)(只有Running狀態(tài)的線程池可以接受新任務(wù)),如果任務(wù)添加到任務(wù)隊(duì)列成功則進(jìn)入步驟3,失敗則進(jìn)入步驟4;
3、來(lái)到這一步需要說(shuō)明任務(wù)已經(jīng)加入任務(wù)隊(duì)列,這時(shí)要二次校驗(yàn)線程池的狀態(tài),會(huì)有以下情形:
線程池不再是 Running狀態(tài)了,需要將任務(wù)從任務(wù)隊(duì)列中移除,如果移除成功則拒絕本次任務(wù)線程池是 Running狀態(tài),則判斷線程池工作線程是否為0,是則調(diào)用addWorker(commond,true)添加一個(gè)沒(méi)有初始任務(wù)的線程(這個(gè)線程將去獲取已經(jīng)加入任務(wù)隊(duì)列的本次任務(wù)并執(zhí)行),否則進(jìn)入步驟4;線程池不是 Running狀態(tài),但從任務(wù)隊(duì)列移除任務(wù)失敗(可能已被某線程獲取?),進(jìn)入步驟4;
4、將線程池?cái)U(kuò)容至maximumPoolSize并調(diào)用 addWorker(commond,false)方法創(chuàng)建新的線程執(zhí)行任務(wù),失敗則拒絕本次任務(wù)。
「流程圖:」

「源碼詳讀:」
/**
?*?在將來(lái)的某個(gè)時(shí)候執(zhí)行給定的任務(wù)。任務(wù)可以在新線程中執(zhí)行,也可以在現(xiàn)有的池線程中執(zhí)行。
?*?如果由于此執(zhí)行器已關(guān)閉或已達(dá)到其容量而無(wú)法提交任務(wù)以供執(zhí)行,則由當(dāng)前的{@code?RejectedExecutionHandler}處理該任務(wù)。
?*?
?*?@param?command?the?task?to?execute??待執(zhí)行的任務(wù)命令
?*/
public?void?execute(Runnable?command)?{
????if?(command?==?null)
????????throw?new?NullPointerException();
????/*
?????*?Proceed?in?3?steps:
?????*?
?????* 1. 如果運(yùn)行的線程少于corePoolSize,將嘗試以給定的命令作為第一個(gè)任務(wù)啟動(dòng)新線程。
?????*
?????*?2.?如果一個(gè)任務(wù)可以成功排隊(duì),那么我們?nèi)匀恍枰屑?xì)檢查兩點(diǎn),其一,我們是否應(yīng)該添加一個(gè)線程
?????*?(因?yàn)樽詮纳洗螜z查至今,一些存在的線程已經(jīng)死亡),其二,線程池狀態(tài)此時(shí)已改變成非運(yùn)行態(tài)。因此,我們重新檢查狀態(tài),如果檢查不通過(guò),則移除已經(jīng)入列的任務(wù),如果檢查通過(guò)且線程池線程數(shù)為0,則啟動(dòng)新線程。
?????*?
?????* 3. 如果無(wú)法將任務(wù)加入任務(wù)隊(duì)列,則將線程池?cái)U(kuò)容到極限容量并嘗試創(chuàng)建一個(gè)新線程,如果失敗則拒絕任務(wù)。
?????*/
????int?c?=?ctl.get();
???
????//?步驟1:判斷線程池當(dāng)前線程數(shù)是否小于線程池大小
????if?(workerCountOf(c)?????????//?增加一個(gè)工作線程并添加任務(wù),成功則返回,否則進(jìn)行步驟2
????????//?true代表使用coreSize作為邊界約束,否則使用maximumPoolSize
????????if?(addWorker(command,?true))
????????????return;
????????c?=?ctl.get();
????}
????//?步驟2:不滿足workerCountOf(c)?< corePoolSize或addWorker失敗,進(jìn)入步驟2
????//?校驗(yàn)線程池是否是Running狀態(tài)且任務(wù)是否成功放入workQueue(阻塞隊(duì)列)
????if?(isRunning(c)?&&?workQueue.offer(command))?{
????????int?recheck?=?ctl.get();
????????//?再次校驗(yàn),如果線程池非Running且從任務(wù)隊(duì)列中移除任務(wù)成功,則拒絕該任務(wù)
????????if?(!?isRunning(recheck)?&&?remove(command))
????????????reject(command);
????????//?如果線程池工作線程數(shù)量為0,則新建一個(gè)空任務(wù)的線程
????????else?if?(workerCountOf(recheck)?==?0)
????????????//?如果線程池不是Running狀態(tài),是加入不進(jìn)去的
????????????addWorker(null,?false);
????}
????//?步驟3:如果線程池不是Running狀態(tài)或任務(wù)入列失敗,嘗試擴(kuò)容maxPoolSize后再次addWorker,失敗則拒絕任務(wù)
????else?if?(!addWorker(command,?false))
????????reject(command);
}
addWorker()
addWorker(Runnable firstTask, boolean core) 方法,顧名思義,向線程池添加一個(gè)帶有任務(wù)的工作線程。
「參數(shù)說(shuō)明:」
「Runnable firstTask」:新創(chuàng)建的線程應(yīng)該首先運(yùn)行的任務(wù)(如果沒(méi)有,則為空)。 「boolean core」:該參數(shù)決定了線程池容量的約束條件,即當(dāng)前線程數(shù)量以何值為極限值。參數(shù)為 true則使用corePollSize作為約束值,否則使用maximumPoolSize。
「執(zhí)行流程:」
1、外層循環(huán)判斷線程池的狀態(tài)是否可以新增工作線程。這層校驗(yàn)基于下面兩個(gè)原則:
線程池為 Running狀態(tài)時(shí),既可以接受新任務(wù)也可以處理任務(wù)線程池為關(guān)閉狀態(tài)時(shí)只能新增空任務(wù)的工作線程( worker)處理任務(wù)隊(duì)列(workQueue)中的任務(wù)不能接受新任務(wù)
2、內(nèi)層循環(huán)向線程池添加工作線程并返回是否添加成功的結(jié)果。
首先校驗(yàn)線程數(shù)是否已經(jīng)超限制,是則返回 false,否則進(jìn)入下一步通過(guò) CAS使工作線程數(shù)+1,成功則進(jìn)入步驟3,失敗則再次校驗(yàn)線程池是否是運(yùn)行狀態(tài),是則繼續(xù)內(nèi)層循環(huán),不是則返回外層循環(huán)
3、核心線程數(shù)量+1成功的后續(xù)操作:添加到工作線程集合,并啟動(dòng)工作線程
首先獲取鎖之后,再次校驗(yàn)線程池狀態(tài)(具體校驗(yàn)規(guī)則見(jiàn)代碼注解),通過(guò)則進(jìn)入下一步,未通過(guò)則添加線程失敗 線程池狀態(tài)校驗(yàn)通過(guò)后,再檢查線程是否已經(jīng)啟動(dòng),是則拋出異常,否則嘗試將線程加入線程池 檢查線程是否啟動(dòng)成功,成功則返回 true,失敗則進(jìn)入addWorkerFailed方法
「流程圖:」

「源碼詳讀:」
private?boolean?addWorker(Runnable?firstTask,?boolean?core)?{
????//?外層循環(huán):判斷線程池狀態(tài)
????retry:
????for?(;;)?{
????????int?c?=?ctl.get();
????????int?rs?=?runStateOf(c);
????????/**?
?????????*?1.線程池為非Running狀態(tài)(Running狀態(tài)則既可以新增核心線程也可以接受任務(wù))
?????????*?2.線程為shutdown狀態(tài)且firstTask為空且隊(duì)列不為空
?????????*?3.滿足條件1且條件2不滿足,則返回false
?????????* 4.條件2解讀:線程池為shutdown狀態(tài)時(shí)且任務(wù)隊(duì)列不為空時(shí),可以新增空任務(wù)的線程來(lái)處理隊(duì)列中的任務(wù)
?????????*/
????????if?(rs?>=?SHUTDOWN?&&
????????????!?(rs?==?SHUTDOWN?&&
???????????????firstTask?==?null?&&
???????????????!?workQueue.isEmpty()))
????????????return?false;
??//?內(nèi)層循環(huán):線程池添加核心線程并返回是否添加成功的結(jié)果
????????for?(;;)?{
????????????int?wc?=?workerCountOf(c);
????????????//?校驗(yàn)線程池已有線程數(shù)量是否超限:
????????????//?1.線程池最大上限CAPACITY?
????????????//?2.corePoolSize或maximumPoolSize(取決于入?yún)ore)
????????????if?(wc?>=?CAPACITY?||
????????????????wc?>=?(core???corePoolSize?:?maximumPoolSize))?
????????????????return?false;
????????????//?通過(guò)CAS操作使工作線程數(shù)+1,跳出外層循環(huán)
????????????if?(compareAndIncrementWorkerCount(c))?
????????????????break?retry;
????????????//?線程+1失敗,重讀ctl
????????????c?=?ctl.get();???//?Re-read?ctl
????????????//?如果此時(shí)線程池狀態(tài)不再是running,則重新進(jìn)行外層循環(huán)
????????????if?(runStateOf(c)?!=?rs)
????????????????continue?retry;
????????????//?其他?CAS?失敗是因?yàn)楣ぷ骶€程數(shù)量改變了,繼續(xù)內(nèi)層循環(huán)嘗試CAS對(duì)線程數(shù)+1
????????????//?else?CAS?failed?due?to?workerCount?change;?retry?inner?loop
????????}
????}
????/**
?????*?核心線程數(shù)量+1成功的后續(xù)操作:添加到工作線程集合,并啟動(dòng)工作線程
?????*/
????boolean?workerStarted?=?false;
????boolean?workerAdded?=?false;
????Worker?w?=?null;
????try?{
????????final?ReentrantLock?mainLock?=?this.mainLock;
????????w?=?new?Worker(firstTask);
????????final?Thread?t?=?w.thread;
????????if?(t?!=?null)?{
????????????//?下面代碼需要加鎖:線程池主鎖
????????????mainLock.lock();?
????????????try?{
????????????????//?持鎖期間重新檢查,線程工廠創(chuàng)建線程失敗或獲取鎖之前關(guān)閉的情況發(fā)生時(shí),退出
????????????????int?c?=?ctl.get();
????????????????int?rs?=?runStateOf(c);
????//?再次檢驗(yàn)線程池是否是running狀態(tài)或線程池shutdown但線程任務(wù)為空
????????????????if?(rs?????????????????????(rs?==?SHUTDOWN?&&?firstTask?==?null))?{
????????????????????//?線程已經(jīng)啟動(dòng),則拋出非法線程狀態(tài)異常
????????????????????//?為什么會(huì)存在這種狀態(tài)呢?未解決
????????????????????if?(t.isAlive())?//?precheck?that?t?is?startable
????????????????????????throw?new?IllegalThreadStateException();
????????????????????workers.add(w);?//加入線程池
????????????????????int?s?=?workers.size();
????????????????????//?如果當(dāng)前工作線程數(shù)超過(guò)線程池曾經(jīng)出現(xiàn)過(guò)的最大線程數(shù),刷新后者值
????????????????????if?(s?>?largestPoolSize)
????????????????????????largestPoolSize?=?s;?
????????????????????workerAdded?=?true;
????????????????}
????????????}?finally?{
????????????????mainLock.unlock();??//?釋放鎖
????????????}
????????????if?(workerAdded)?{?//?工作線程添加成功,啟動(dòng)該線程
????????????????t.start();
????????????????workerStarted?=?true;
????????????}
????????}
????}?finally?{
????????//線程啟動(dòng)失敗,則進(jìn)入addWorkerFailed
????????if?(!?workerStarted)?
????????????addWorkerFailed(w);
????}
????return?workerStarted;
}
Worker類
Worker類是內(nèi)部類,既實(shí)現(xiàn)了Runnable,又繼承了AbstractQueuedSynchronizer(以下簡(jiǎn)稱AQS),所以其既是一個(gè)可執(zhí)行的任務(wù),又可以達(dá)到鎖的效果。
Worker類主要維護(hù)正在運(yùn)行任務(wù)的線程的中斷控制狀態(tài),以及其他次要的記錄。這個(gè)類適時(shí)地繼承了AbstractQueuedSynchronizer類,以簡(jiǎn)化獲取和釋放鎖(該鎖作用于每個(gè)任務(wù)執(zhí)行代碼)的過(guò)程。這樣可以防止去中斷正在運(yùn)行中的任務(wù),只會(huì)中斷在等待從任務(wù)隊(duì)列中獲取任務(wù)的線程。
我們實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的不可重入互斥鎖,而不是使用可重入鎖,因?yàn)槲覀儾幌Mぷ魅蝿?wù)在調(diào)用setCorePoolSize之類的池控制方法時(shí)能夠重新獲取鎖。另外,為了在線程真正開(kāi)始運(yùn)行任務(wù)之前禁止中斷,我們將鎖狀態(tài)初始化為負(fù)值,并在啟動(dòng)時(shí)清除它(在runWorker中)。
private?final?class?Worker
????extends?AbstractQueuedSynchronizer
????implements?Runnable
{
????/**
?????*?This?class?will?never?be?serialized,?but?we?provide?a
?????*?serialVersionUID?to?suppress?a?javac?warning.
?????*/
????private?static?final?long?serialVersionUID?=?6138294804551838833L;
?
????/**?Thread?this?worker?is?running?in.??Null?if?factory?fails.?*/
????final?Thread?thread;?
?????
????/**?Initial?task?to?run.??Possibly?null.?*/
????Runnable?firstTask;
?????
????/**?Per-thread?task?counter?*/
????volatile?long?completedTasks;
?
????/**
?????*?Creates?with?given?first?task?and?thread?from?ThreadFactory.
?????*?@param?firstTask?the?first?task?(null?if?none)
?????*/
????//?通過(guò)構(gòu)造函數(shù)初始化,
????Worker(Runnable?firstTask)?{
????????//設(shè)置AQS的同步狀態(tài)
????????// state:鎖狀態(tài),-1為初始值,0為unlock狀態(tài),1為lock狀態(tài)
????????setState(-1);?//?inhibit?interrupts?until?runWorker??在調(diào)用runWorker前,禁止中斷
???????
????????this.firstTask?=?firstTask;
????????//?線程工廠創(chuàng)建一個(gè)線程
????????this.thread?=?getThreadFactory().newThread(this);?
????}
?
????/**?Delegates?main?run?loop?to?outer?runWorker??*/
????public?void?run()?{
????????runWorker(this);?//runWorker()是ThreadPoolExecutor的方法
????}
?
????//?Lock?methods
????//?The?value?0?represents?the?unlocked?state.?0代表“沒(méi)被鎖定”狀態(tài)
????//?The?value?1?represents?the?locked?state.?1代表“鎖定”狀態(tài)
?
????protected?boolean?isHeldExclusively()?{
????????return?getState()?!=?0;
????}
?
????/**
?????*?嘗試獲取鎖的方法
?????*?重寫(xiě)AQS的tryAcquire(),AQS本來(lái)就是讓子類來(lái)實(shí)現(xiàn)的
?????*/
????protected?boolean?tryAcquire(int?unused)?{
????????//?判斷原值為0,且重置為1,所以state為-1時(shí),鎖無(wú)法獲取。
????????//?每次都是0->1,保證了鎖的不可重入性
????????if?(compareAndSetState(0,?1))?{
????????????//?設(shè)置exclusiveOwnerThread=當(dāng)前線程
????????????setExclusiveOwnerThread(Thread.currentThread());?
????????????return?true;
????????}
????????return?false;
????}
?
????/**
?????*?嘗試釋放鎖
?????*?不是state-1,而是置為0
?????*/
????protected?boolean?tryRelease(int?unused)?{
????????setExclusiveOwnerThread(null);?
????????setState(0);
????????return?true;
????}
?
????public?void?lock()????????{?acquire(1);?}
????public?boolean?tryLock()??{?return?tryAcquire(1);?}
????public?void?unlock()??????{?release(1);?}
????public?boolean?isLocked()?{?return?isHeldExclusively();?}
?
????/**
?????*?中斷(如果運(yùn)行)
?????*?shutdownNow時(shí)會(huì)循環(huán)對(duì)worker線程執(zhí)行
?????*?且不需要獲取worker鎖,即使在worker運(yùn)行時(shí)也可以中斷
?????*/
????void?interruptIfStarted()?{
????????Thread?t;
????????//如果state>=0、t!=null、且t沒(méi)有被中斷
????????//new?Worker()時(shí)state==-1,說(shuō)明不能中斷
????????if?(getState()?>=?0?&&?(t?=?thread)?!=?null?&&?!t.isInterrupted())?{
????????????try?{
????????????????t.interrupt();
????????????}?catch?(SecurityException?ignore)?{
????????????}
????????}
????}
}
runWorker()
可以說(shuō),runWorker(Worker w) 是線程池中真正處理任務(wù)的方法,前面的execute() 和 addWorker() 都是在為該方法做準(zhǔn)備和鋪墊。
「參數(shù)說(shuō)明:」
「Worker w」:封裝的Worker,攜帶了工作線程的諸多要素,包括 Runnable(待處理任務(wù))、lock(鎖)、completedTasks(記錄線程池已完成任務(wù)數(shù))
「執(zhí)行流程:」
1、判斷當(dāng)前任務(wù)或者從任務(wù)隊(duì)列中獲取的任務(wù)是否不為空,都為空則進(jìn)入步驟2,否則進(jìn)入步驟3
2、任務(wù)為空,則將completedAbruptly置為false(即線程不是突然終止),并執(zhí)行processWorkerExit(w,completedAbruptly)方法進(jìn)入線程退出程序
3、任務(wù)不為空,則進(jìn)入循環(huán),并加鎖
4、判斷是否為線程添加中斷標(biāo)識(shí),以下兩個(gè)條件滿足其一則添加中斷標(biāo)識(shí):
線程池狀態(tài)>= STOP,即STOP或TERMINATED一開(kāi)始判斷線程池狀態(tài)< STOP,接下來(lái)檢查發(fā)現(xiàn)Thread.interrupted()為true,即線程已經(jīng)被中斷,再次檢查線程池狀態(tài)是否>=STOP(以消除該瞬間shutdown方法生效,使線程池處于STOP或TERMINATED)
5、執(zhí)行前置方法 beforeExecute(wt, task)(該方法為空方法,由子類實(shí)現(xiàn))后執(zhí)行task.run() 方法執(zhí)行任務(wù)(執(zhí)行不成功拋出相應(yīng)異常)
6、執(zhí)行后置方法 afterExecute(task, thrown)(該方法為空方法,由子類實(shí)現(xiàn))后將線程池已完成的任務(wù)數(shù)+1,并釋放鎖。
7、再次進(jìn)行循環(huán)條件判斷。
「流程圖:」

「源碼詳讀:」
final?void?runWorker(Worker?w)?{
????Thread?wt?=?Thread.currentThread();
????Runnable?task?=?w.firstTask;
????w.firstTask?=?null;
????//?allow?interrupts
????//?new?Worker()是state==-1,此處是調(diào)用Worker類的tryRelease()方法,將state置為0,而interruptIfStarted()中只有state>=0才允許調(diào)用中斷
????w.unlock();?
????????????
????//?線程退出的原因,true是任務(wù)導(dǎo)致,false是線程正常退出
????boolean?completedAbruptly?=?true;?
????try?{
????????//?當(dāng)前任務(wù)和從任務(wù)隊(duì)列中獲取的任務(wù)都為空,方停止循環(huán)
????????while?(task?!=?null?||?(task?=?getTask())?!=?null)?{
????????????//上鎖可以防止在shutdown()時(shí)終止正在運(yùn)行的worker,而不是應(yīng)對(duì)并發(fā)
????????????w.lock();?
?????????????
????????????//?If?pool?is?stopping,?ensure?thread?is?interrupted;
????????????//?if?not,?ensure?thread?is?not?interrupted.??This
????????????//?requires?a?recheck?in?second?case?to?deal?with
????????????//?shutdownNow?race?while?clearing?interrupt
????????????/**
?????????????*?判斷1:確保只有在線程處于stop狀態(tài)且wt未中斷時(shí),wt才會(huì)被設(shè)置中斷標(biāo)識(shí)
?????????????*?條件1:線程池狀態(tài)>=STOP,即STOP或TERMINATED
?????????????*?條件2:一開(kāi)始判斷線程池狀態(tài)=STOP(以消除該瞬間shutdown方法生效,使線程池處于STOP或TERMINATED),
?????????????*?條件1與條件2任意滿意一個(gè),且wt不是中斷狀態(tài),則中斷wt,否則進(jìn)入下一步
?????????????*/
????????????if?((runStateAtLeast(ctl.get(),?STOP)?||
?????????????????(Thread.interrupted()?&&
??????????????????runStateAtLeast(ctl.get(),?STOP)))?&&
????????????????!wt.isInterrupted())
????????????????wt.interrupt();?//當(dāng)前線程調(diào)用interrupt()中斷
?????????????
????????????try?{
????????????????//執(zhí)行前(空方法,由子類重寫(xiě)實(shí)現(xiàn))
????????????????beforeExecute(wt,?task);
?????????????????
????????????????Throwable?thrown?=?null;
????????????????try?{
????????????????????task.run();
????????????????}?
????????????????catch?(RuntimeException?x)?{
????????????????????thrown?=?x;?throw?x;
????????????????}?
????????????????catch?(Error?x)?{
????????????????????thrown?=?x;?throw?x;
????????????????}?
????????????????catch?(Throwable?x)?{
????????????????????thrown?=?x;?throw?new?Error(x);
????????????????}?
????????????????finally?{
????????????????????//執(zhí)行后(空方法,由子類重寫(xiě)實(shí)現(xiàn))
????????????????????afterExecute(task,?thrown);?
????????????????}
????????????}?
????????????finally?{
????????????????task?=?null;?
????????????????w.completedTasks++;?//完成任務(wù)數(shù)+1
????????????????w.unlock();?//釋放鎖
????????????}
????????}
????????//?
????????completedAbruptly?=?false;
????}?
????finally?{
????????//處理worker的退出
????????processWorkerExit(w,?completedAbruptly);
????}
}
「5、getTask()」
由函數(shù)調(diào)用關(guān)系圖可知,在ThreadPoolExecutor類的實(shí)現(xiàn)中,Runnable getTask() 方法是為void runWorker(Worker w)方法服務(wù)的,它的作用就是在任務(wù)隊(duì)列(workQueue)中獲取 task(Runnable)。
「參數(shù)說(shuō)明」:無(wú)參數(shù)
「執(zhí)行流程」:
將 timedOut(上次獲取任務(wù)是否超時(shí))置為false(首次執(zhí)行方法,無(wú)上次,自然為false),進(jìn)入一個(gè)無(wú)限循環(huán)如果線程池為 Shutdown狀態(tài)且任務(wù)隊(duì)列為空(線程池shutdown狀態(tài)可以處理任務(wù)隊(duì)列中的任務(wù),不再接受新任務(wù),這個(gè)是重點(diǎn))或者線程池為STOP或TERMINATED狀態(tài),則意味著線程池不必再獲取任務(wù)了,當(dāng)前工作線程數(shù)量-1并返回null,否則進(jìn)入步驟3如果線程池?cái)?shù)量超限制或者時(shí)間超限且(任務(wù)隊(duì)列為空或當(dāng)前線程數(shù)>1),則進(jìn)入步驟4,否則進(jìn)入步驟5。 移除工作線程,成功則返回 null,不成功則進(jìn)入下輪循環(huán)。嘗試用 poll()或者take()(具體用哪個(gè)取決于timed的值)獲取任務(wù),如果任務(wù)不為空,則返回該任務(wù)。如果為空,則將timeOut置為true進(jìn)入下一輪循環(huán)。如果獲取任務(wù)過(guò)程發(fā)生異常,則將timeOut置為 false 后進(jìn)入下一輪循環(huán)。
「流程圖」:

「源碼詳讀:」
private?Runnable?getTask()?{
????//?最新一次poll是否超時(shí)
????boolean?timedOut?=?false;?//?Did?the?last?poll()?time?out?
????for?(;;)?{
????????int?c?=?ctl.get();
????????int?rs?=?runStateOf(c);
????????//?Check?if?queue?empty?only?if?necessary.
????????/**
?????????*?條件1:線程池狀態(tài)SHUTDOWN、STOP、TERMINATED狀態(tài)
?????????*?條件2:線程池STOP、TERMINATED狀態(tài)或workQueue為空
?????????*?條件1與條件2同時(shí)為true,則workerCount-1,并且返回null
?????????*?注:條件2是考慮到SHUTDOWN狀態(tài)的線程池不會(huì)接受任務(wù),但仍會(huì)處理任務(wù)
?????????*/
????????if?(rs?>=?SHUTDOWN?&&?(rs?>=?STOP?||?workQueue.isEmpty()))?{
????????????decrementWorkerCount();
????????????return?null;
????????}
????????int?wc?=?workerCountOf(c);
????????//?Are?workers?subject?to?culling?
????????/**
?????????*?下列兩個(gè)條件滿足任意一個(gè),則給當(dāng)前正在嘗試獲取任務(wù)的工作線程設(shè)置阻塞時(shí)間限制(超時(shí)會(huì)被銷毀?不太確定這點(diǎn)),否則線程可以一直保持活躍狀態(tài)
?????????* 1.allowCoreThreadTimeOut:當(dāng)前線程是否以keepAliveTime為超時(shí)時(shí)限等待任務(wù)
?????????*?2.當(dāng)前線程數(shù)量已經(jīng)超越了核心線程數(shù)
?????????*/
????????boolean?timed?=?allowCoreThreadTimeOut?||?wc?>?corePoolSize;
????????????
????????//?兩個(gè)條件全部為true,則通過(guò)CAS使工作線程數(shù)-1,即剔除工作線程
????????//?條件1:工作線程數(shù)大于maximumPoolSize,或(工作線程阻塞時(shí)間受限且上次在任務(wù)隊(duì)列拉取任務(wù)超時(shí))
????????//?條件2:wc > 1或任務(wù)隊(duì)列為空
????????if?((wc?>?maximumPoolSize?||?(timed?&&?timedOut))
????????????&&?(wc?>?1?||?workQueue.isEmpty()))?{
????????????//?移除工作線程,成功則返回null,不成功則進(jìn)入下輪循環(huán)
????????????if?(compareAndDecrementWorkerCount(c))
????????????????return?null;
????????????continue;
????????}
?????//?執(zhí)行到這里,說(shuō)明已經(jīng)經(jīng)過(guò)前面重重校驗(yàn),開(kāi)始真正獲取task了
????????try?{
????????????//?如果工作線程阻塞時(shí)間受限,則使用poll(),否則使用take()
????????????//?poll()設(shè)定阻塞時(shí)間,而take()無(wú)時(shí)間限制,直到拿到結(jié)果為止
????????????Runnable?r?=?timed??
????????????????workQueue.poll(keepAliveTime,?TimeUnit.NANOSECONDS)?:
????????????????workQueue.take();
????????????//?r不為空,則返回該Runnable
????????????if?(r?!=?null)
????????????????return?r;
????????????//?沒(méi)能獲取到Runable,則將最近獲取任務(wù)是否超時(shí)設(shè)置為true
????????????timedOut?=?true;
????????}?catch?(InterruptedException?retry)?{
????????????//?響應(yīng)中斷,進(jìn)入下一次循環(huán)前將最近獲取任務(wù)超時(shí)狀態(tài)置為false
????????????timedOut?=?false;
????????}
????}
}
processWorkerExit()
processWorkerExit(Worker w, boolean completedAbruptly)執(zhí)行線程退出的方法
「參數(shù)說(shuō)明:」
「Worker w」:要結(jié)束的工作線程。 「boolean completedAbruptly」:是否突然完成(異常導(dǎo)致),如果工作線程因?yàn)橛脩舢惓K劳觯瑒t completedAbruptly參數(shù)為true。
「執(zhí)行流程:」
1、如果 completedAbruptly 為 true,即工作線程因?yàn)楫惓M蝗凰劳觯瑒t執(zhí)行工作線程-1操作。
2、主線程獲取鎖后,線程池已經(jīng)完成的任務(wù)數(shù)追加 w(當(dāng)前工作線程) 完成的任務(wù)數(shù),并從worker的set集合中移除當(dāng)前worker。
3、根據(jù)線程池狀態(tài)進(jìn)行判斷是否執(zhí)行tryTerminate()結(jié)束線程池。
4、是否需要增加工作線程,如果線程池還沒(méi)有完全終止,仍需要保持一定數(shù)量的線程。
如果當(dāng)前線程是突然終止的,調(diào)用
addWorker()創(chuàng)建工作線程當(dāng)前線程不是突然終止,但當(dāng)前工作線程數(shù)量小于線程池需要維護(hù)的線程數(shù)量,則創(chuàng)建工作線程。需要維護(hù)的線程數(shù)量為
corePoolSize(取決于成員變量allowCoreThreadTimeOut是否為false)或1。源碼詳讀:
/**
?*?Performs?cleanup?and?bookkeeping?for?a?dying?worker.?Called
?*?only?from?worker?threads.?Unless?completedAbruptly?is?set,
?*?assumes?that?workerCount?has?already?been?adjusted?to?account
?*?for?exit.??This?method?removes?thread?from?worker?set,?and
?*?possibly?terminates?the?pool?or?replaces?the?worker?if?either
?*?it?exited?due?to?user?task?exception?or?if?fewer?than
?*?corePoolSize?workers?are?running?or?queue?is?non-empty?but
?*?there?are?no?workers.
?*
?*?@param?w?the?worker
?*?@param?completedAbruptly?if?the?worker?died?due?to?user?exception
?*/
private?void?processWorkerExit(Worker?w,?boolean?completedAbruptly)?{
????/**
?????*?1.工作線程-1操作
?????*?1)如果completedAbruptly?為true,說(shuō)明工作線程發(fā)生異常,那么將正在工作的線程數(shù)量-1
?????*?2)如果completedAbruptly?為false,說(shuō)明工作線程無(wú)任務(wù)可以執(zhí)行,由getTask()執(zhí)行worker-1操作
?????*/
????if?(completedAbruptly)?//?If?abrupt,?then?workerCount?wasn't?adjusted
????????decrementWorkerCount();
????//?2.從線程set集合中移除工作線程,該過(guò)程需要加鎖
????final?ReentrantLock?mainLock?=?this.mainLock;
????mainLock.lock();
????try?{
????????//?將該worker已完成的任務(wù)數(shù)追加到線程池已完成的任務(wù)數(shù)
????????completedTaskCount?+=?w.completedTasks;
????????//?HashSet中移除該worker
????????workers.remove(w);
????}?finally?{
????????mainLock.unlock();
????}
????
?//?3.根據(jù)線程池狀態(tài)進(jìn)行判斷是否結(jié)束線程池
????tryTerminate();
?
?/**
?????*?4.是否需要增加工作線程
?????*?線程池狀態(tài)是running?或?shutdown
?????*?如果當(dāng)前線程是突然終止的,addWorker()
?????*?如果當(dāng)前線程不是突然終止的,但當(dāng)前線程數(shù)量??????*?故如果調(diào)用線程池shutdown(),直到workQueue為空前,線程池都會(huì)維持corePoolSize個(gè)線程,然后再逐漸銷毀這corePoolSize個(gè)線程
?????*/
????int?c?=?ctl.get();
????if?(runStateLessThan(c,?STOP))?{
???????if?(!completedAbruptly)?{
????????????int?min?=?allowCoreThreadTimeOut???0?:?corePoolSize;
????????????if?(min?==?0?&&?!?workQueue.isEmpty())
????????????????min?=?1;
????????????if?(workerCountOf(c)?>=?min)
????????????????return;?//?replacement?not?needed
????????}
????????addWorker(null,?false);
????}
}
好啦,以上就是Java線程池的全部?jī)?nèi)容啦,堅(jiān)持讀完的伙伴兒們你們收獲如何?覺(jué)得有幫助的就順手點(diǎn)個(gè)贊吧,祝大家新年新氣象,升級(jí)加薪!
