Java線程池之ThreadPoolExecutor
日常開(kāi)發(fā)中對(duì)于多線程的使用,一般很少直接new Thread。因?yàn)榫€程的頻繁創(chuàng)建、銷(xiāo)毀會(huì)耗費(fèi)大量的系統(tǒng)資源。為此基于池化技術(shù)的線程池應(yīng)運(yùn)而生

Executors類
在Executors類中提供了很多創(chuàng)建線程池的工廠方法。這里介紹下一些常見(jiàn)的工廠方法
newFixedThreadPool
該方法創(chuàng)建一個(gè)固定數(shù)量線程的線程池
public?class?TestThreadPool?{
????/**
?????*?定長(zhǎng)的線程池
?????*/
????public?static?void?test1()?{
????????ExecutorService?executor?=?Executors.newFixedThreadPool(3);
????????work(executor);
????????executor.shutdown();
????}
????private?static?void?work(ExecutorService?executor)?{
????????for?(int?i=0;?i<10;?i++)?{
????????????executor.submit(?new?Job("Job-"+i)?);
????????}
????}
}
從測(cè)試結(jié)果可以看到,無(wú)論多少個(gè)任務(wù),可用的線程數(shù)量都是固定的

newCachedThreadPool
該方法創(chuàng)建一個(gè)線程池,當(dāng)沒(méi)有空閑線程可用時(shí),其會(huì)一直創(chuàng)建新的線程來(lái)處理任務(wù)
public?class?TestThreadPool?{
????/**
?????*?可緩存的線程池
?????*/
????public?static?void?test2()?{
????????ExecutorService?executor?=?Executors.newCachedThreadPool();
????????work(executor);
????????executor.shutdown();
????}
}
測(cè)試結(jié)果如下所示

newSingleThreadExecutor
該方法創(chuàng)建的線程池中只有一個(gè)線程,故提交至此的任務(wù)會(huì)依次執(zhí)行
public?class?TestThreadPool?{
????/**
?????*?單線程的線程池
?????*/
????public?static?void?test3()?{
????????ExecutorService?executor?=?Executors.newSingleThreadExecutor();
????????work(executor);
????????executor.shutdown();
????}
}
測(cè)試結(jié)果如下所示

newScheduledThreadPool
該方法創(chuàng)建的線程池可用于執(zhí)行定時(shí)任務(wù)
public?class?TestThreadPool?{
????/**
?????*?定時(shí)任務(wù)的線程池
?????*/
????public?static?void?test4()?{
????????Consumer?task?=?(String?taskName)?->?{
????????????String?now?=?DateUtil.format(?new?Date(),?"HH:mm:ss"?);
????????????String?info?=?"?+?now?+?"?[Thread]:?"?+?Thread.currentThread().getName()?+?"?:?" ?+?taskName;
????????????System.out.println(?info?);
????????????try{
????????????????//?耗時(shí)模擬:5秒
????????????????Thread.sleep(1000*5);
????????????}catch?(InterruptedException?e)?{
????????????????e.printStackTrace();
????????????}
????????};
????????ScheduledExecutorService?executorService?=?Executors.newScheduledThreadPool(4);
????????System.out.println(?"?+?DateUtil.format(?new?Date(),?"HH:mm:ss")?);
????????//?一次性任務(wù),?延遲10秒后執(zhí)行
????????executorService.schedule(?()->task.accept("OneTime"),?10,?TimeUnit.SECONDS?);
????????//?定時(shí)任務(wù):?延遲30秒啟動(dòng),每次間隔10秒開(kāi)始執(zhí)行
????????executorService.scheduleAtFixedRate(?()->task.accept("fixedRate"),?30,?10,?TimeUnit.SECONDS);
????????//?定時(shí)任務(wù):?延遲30秒啟動(dòng),每次完成10秒后再次執(zhí)行
????????executorService.scheduleWithFixedDelay(?()->task.accept("fixedDelay")?,?30,?10,?TimeUnit.SECONDS);
????}
}
其中上述代碼中scheduleAtFixedRate、scheduleWithFixedDelay方法第三個(gè)參數(shù)的含義分別是兩次任務(wù)開(kāi)始執(zhí)行的間隔時(shí)間、上一次任務(wù)結(jié)束至本次任務(wù)開(kāi)始的間隔時(shí)間。與SpringBoot中的@Scheduled(fixedRate)、@Scheduled(fixedDelay)注解的用途類似。值得一提的是,對(duì)于scheduleAtFixedRate而言,當(dāng) 我們指定的兩次任務(wù)開(kāi)始執(zhí)行的間隔時(shí)間 小于 該任務(wù)執(zhí)行一次所需的耗時(shí) 時(shí),將會(huì)以 該任務(wù)執(zhí)行所需的耗時(shí) 作為 兩次任務(wù)開(kāi)始執(zhí)行的實(shí)際間隔時(shí)間
測(cè)試結(jié)果如下所示。從藍(lán)框可以看出,三個(gè)任務(wù)的第一次執(zhí)行時(shí)機(jī)均按指定的延時(shí)時(shí)間(分別延遲10秒、30秒、30秒)啟動(dòng);從綠框可知,對(duì)于名為fixedRate的任務(wù)而言,每次開(kāi)始執(zhí)行的間隔為10秒;從紅框可知,對(duì)于名為fixedDelay的任務(wù)而言,每次開(kāi)始執(zhí)行的間隔為15秒。因?yàn)槠渖弦淮稳蝿?wù)結(jié)束至本次任務(wù)開(kāi)始的時(shí)間間隔為10秒,加上該任務(wù)本身耗時(shí)5秒,故累計(jì)為15秒

ThreadPoolExecutor類
概述
事實(shí)上對(duì)于上述的工廠方法而言,其內(nèi)部是使用線程池ThreadPoolExecutor類。該類的繼承結(jié)構(gòu)如下所示

與線程類似。對(duì)于線程池而言,其整個(gè)生命周期階段也存在若干不同的狀態(tài)。具體如下
Running:該狀態(tài)下,線程池可以接受新任務(wù),并能夠處理阻塞隊(duì)列中的任務(wù) ShutDown:該狀態(tài)下,線程池不再可以接受新任務(wù),但能夠繼續(xù)處理阻塞隊(duì)列中的任務(wù) Stop:該狀態(tài)下,線程池不再可以接受新任務(wù),也不會(huì)繼續(xù)處理阻塞隊(duì)列中的任務(wù)。同時(shí)會(huì)中斷正在處理的任務(wù) Tidying:該狀態(tài)下,線程池中的工作線程數(shù)量為0。并且會(huì)調(diào)用terminated()鉤子方法(hook method) Terminated:當(dāng)terminated()鉤子方法(hook method)執(zhí)行完畢后,線程池進(jìn)入該狀態(tài)
各狀態(tài)的變化流程如下所示

值得一提的是,在ThreadPoolExecutor的實(shí)現(xiàn)過(guò)程中。其通過(guò)一個(gè)AtomicInteger類型的原子變量ctl實(shí)現(xiàn)了對(duì)線程池狀態(tài)、工作線程數(shù)的記錄。具體來(lái)說(shuō),是將高3位用于表示線程池狀態(tài),剩余位表示工作線程數(shù)。runStateOf方法用于獲取線程池狀態(tài)信息,workerCountOf方法用于獲取工作線程數(shù)

在實(shí)際應(yīng)用過(guò)程中,線程池ThreadPoolExecutor常見(jiàn)參數(shù)如下:
corePoolSize:線程池的核心線程數(shù) maximumPoolSize:線程池的最大線程數(shù) keepAliveTime:空閑線程的超時(shí)時(shí)間,用于終止空閑線程。通常其只對(duì)線程池中超過(guò)corePoolSize的多余線程生效。除非allowCoreThreadTimeOut屬性設(shè)為true,才會(huì)對(duì)核心線程生效 unit:keepAliveTime參數(shù)的時(shí)間單位。其可選值定義在枚舉類TimeUnit中 workQueue:任務(wù)的阻塞隊(duì)列 handler:當(dāng) 任務(wù)隊(duì)列workQueue已滿 且 線程數(shù)已達(dá)到maximumPoolSize ,提交新任務(wù)時(shí)的拒絕策略
其在接收任務(wù)后的基本流程如下所示

拒絕策略
JDK提供了四種拒絕策略,均實(shí)現(xiàn)了RejectedExecutionHandler接口
DiscardPolicy 丟棄策略
該策略下當(dāng)提交的任務(wù) 無(wú)空閑線程執(zhí)行 或 任務(wù)隊(duì)列已滿 時(shí),則會(huì)直接被丟棄且不會(huì)產(chǎn)生任何異常
public?class?RejectedPolicyDemo?{
????/**
?????*?創(chuàng)建一個(gè)線程池,其最多只會(huì)創(chuàng)建2個(gè)線程,任務(wù)隊(duì)列最多存放1個(gè)任務(wù)
?????*/
????private?static?ThreadPoolExecutor?executor?=?new?ThreadPoolExecutor(
????????2,?2,?60,?TimeUnit.SECONDS,?new?ArrayBlockingQueue<>(1)?);
????public?static?void?test1()?{
????????//?拒絕策略:直接丟棄
????????System.out.println("--------------------?拒絕策略:直接丟棄?--------------------");
????????executor.setRejectedExecutionHandler(?new?ThreadPoolExecutor.DiscardPolicy()?);
????????executeJob(?"clean"?);
????????executor.shutdown();
????}
????private?static?void?executeJob(String?name)?{
????????for?(int?i=0;?i<10;?i++)?{
????????????executor.submit(?new?Job(?name+"-"+i)?);
????????}
????}
}
測(cè)試結(jié)果如下所示,#3-#9號(hào)任務(wù)被直接丟棄了

DiscardOldestPolicy 丟棄最老策略
該策略下當(dāng)提交的任務(wù) 無(wú)空閑線程執(zhí)行 或 任務(wù)隊(duì)列已滿 時(shí),則會(huì)丟棄隊(duì)列中最舊的任務(wù)以釋放空間來(lái)存儲(chǔ)該任務(wù)
public?class?RejectedPolicyDemo?{
????/**
?????*?創(chuàng)建一個(gè)線程池,其最多只會(huì)創(chuàng)建2個(gè)線程,任務(wù)隊(duì)列最多存放1個(gè)任務(wù)
?????*/
????private?static?ThreadPoolExecutor?executor?=?new?ThreadPoolExecutor(
????????2,?2,?60,?TimeUnit.SECONDS,?new?ArrayBlockingQueue<>(1)?);
????public?static?void?test2(){
????????//?拒絕策略:丟棄隊(duì)列中最舊的任務(wù)
????????System.out.println("--------------------?拒絕策略:丟棄隊(duì)列中最舊的任務(wù)?--------------------");
????????executor.setRejectedExecutionHandler(?new?ThreadPoolExecutor.DiscardOldestPolicy()?);
????????executeJob(?"register"?);
????????executor.shutdown();
????}
}
測(cè)試結(jié)果如下所示。當(dāng)#0、#1號(hào)任務(wù)在執(zhí)行時(shí),#2~#9號(hào)任務(wù)不斷被存儲(chǔ)到隊(duì)列、然后被丟棄以存放最新的任務(wù)。所以#9號(hào)任務(wù)最終被保留并執(zhí)行

AbortPolicy 中止策略
在該策略下,當(dāng)線程池?zé)o法繼續(xù)接收提交的任務(wù)時(shí)會(huì)拋出RejectedExecutionException異常。其也是線程池的默認(rèn)拒絕策略。顯然拋出異常的方式可以讓開(kāi)發(fā)者更好的把握系統(tǒng)的運(yùn)行狀態(tài)。當(dāng)然在此種拒絕策略下,我們需要處理好其所拋出的異常,以免打斷當(dāng)前的執(zhí)行流程
public?class?RejectedPolicyDemo?{
????/**
?????*?創(chuàng)建一個(gè)線程池,其最多只會(huì)創(chuàng)建2個(gè)線程,任務(wù)隊(duì)列最多存放1個(gè)任務(wù)
?????*/
????private?static?ThreadPoolExecutor?executor?=?new?ThreadPoolExecutor(
????????2,?2,?60,?TimeUnit.SECONDS,?new?ArrayBlockingQueue<>(1)?);
????public?static?void?test3(){
????????//?拒絕策略:拋異常
????????System.out.println("--------------------?拒絕策略:拋異常?--------------------");
????????executor.setRejectedExecutionHandler(?new?ThreadPoolExecutor.AbortPolicy()?);
????????try{
????????????executeJob(?"request"?);
????????}catch?(RejectedExecutionException?e)?{
????????????System.out.println("[Error]?提交到線程池的任務(wù)量過(guò)多");
????????}
????????executor.shutdown();
????}
}
測(cè)試結(jié)果如下所示

CallerRunsPolicy 調(diào)用者執(zhí)行策略
在該策略下,當(dāng)線程池?zé)o法繼續(xù)接收提交的任務(wù)時(shí),其會(huì)交由調(diào)用者(提交任務(wù)的線程)去執(zhí)行完成
public?class?RejectedPolicyDemo?{
????/**
?????*?創(chuàng)建一個(gè)線程池,其最多只會(huì)創(chuàng)建2個(gè)線程,任務(wù)隊(duì)列最多存放1個(gè)任務(wù)
?????*/
????private?static?ThreadPoolExecutor?executor?=?new?ThreadPoolExecutor(
????????2,?2,?60,?TimeUnit.SECONDS,?new?ArrayBlockingQueue<>(1)?);
????public?static?void?test4()?{
????????//?拒絕策略:由調(diào)用者執(zhí)行
????????System.out.println("--------------------?拒絕策略:由調(diào)用者執(zhí)行?--------------------");
????????executor.setRejectedExecutionHandler(?new?ThreadPoolExecutor.CallerRunsPolicy()?);
????????executeJob(?"caller"?);
????????executor.shutdown();
????}??
}
測(cè)試結(jié)果如下所示,符合預(yù)期。線程池?zé)o法繼續(xù)接收新任務(wù)時(shí),其會(huì)被提交任務(wù)的線程(即這里的main線程)執(zhí)行完成

Note
當(dāng)線程池的拒絕策略為DiscardPolicy、DiscardOldestPolicy時(shí),則對(duì)于被拒絕任務(wù)的Future實(shí)例而言。如果在其上調(diào)用無(wú)參的get()方法,則會(huì)導(dǎo)致一直被阻塞。故在此種場(chǎng)景下,推薦使用支持超時(shí)機(jī)制的get()方法。測(cè)試代碼如下所示
public?class?ThreadPoolDemo?{
????@Test
????public?void?test1()?throws?Exception?{
????????ThreadPoolExecutor?executor?=?new?ThreadPoolExecutor(1,?1,?0,
????????????????TimeUnit.SECONDS,?new?ArrayBlockingQueue<>(1),?new?ThreadPoolExecutor.DiscardPolicy());
????????Future?future1?=?executor.submit(?taskFactory("Task?1")?);
????????Future?future2?=?executor.submit(?taskFactory("Task?2")?);
????????Future?future3?=?executor.submit(?taskFactory("Task?3")?);
????????future1.get();
????????System.out.println("future?1?Over");
????????future2.get();
????????System.out.println("future?2?Over");
????????future3.get();
????????System.out.println("future?3?Over");
????}
????private?static?Runnable?taskFactory(String?taskName)?{
????????return?()?->?{
????????????//?模擬業(yè)務(wù)耗時(shí)
????????????try{Thread.sleep(3000);}?catch?(Exception?e){}
????????????System.out.println(taskName?+?":?Over");
????????};
????}
}
測(cè)試結(jié)果如下所示,符合預(yù)期

線程池使用完畢后,應(yīng)通過(guò) shutdown()?方法進(jìn)行關(guān)閉。實(shí)例代碼如下所示
public?static?void?main(String[]?args)?{
????System.out.println("Hello?World");
????ThreadPoolExecutor?threadPoolExecutor?=?(ThreadPoolExecutor)?Executors.newFixedThreadPool(5);
????threadPoolExecutor.execute(?()->?System.out.println("Test?Task")?);
????//?關(guān)閉線程池
????//threadPoolExecutor.shutdown();
????System.out.println("Main?Over");
}
測(cè)試結(jié)果如下所示,左下角處的紅色方塊表明由于線程池未關(guān)閉,JVM依然存在并未退出。正確做法是放開(kāi)上述代碼中對(duì)線程池關(guān)閉操作的注釋

參考文獻(xiàn)
Java并發(fā)編程之美 翟陸續(xù)、薛賓田著
