<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Java線程池之ThreadPoolExecutor

          共 4252字,需瀏覽 9分鐘

           ·

          2021-10-25 10:13

          日常開(kāi)發(fā)中對(duì)于多線程的使用,一般很少直接new Thread。因?yàn)榫€程的頻繁創(chuàng)建、銷(xiāo)毀會(huì)耗費(fèi)大量的系統(tǒng)資源。為此基于池化技術(shù)的線程池應(yīng)運(yùn)而生

          abstract.jpeg

          Executors類

          在Executors類中提供了很多創(chuàng)建線程池的工廠方法。這里介紹下一些常見(jiàn)的工廠方法

          newFixedThreadPool

          該方法創(chuàng)建一個(gè)固定數(shù)量線程的線程池

          public?class?TestThreadPool?{
          ????/**
          ?????*?定長(zhǎng)的線程池
          ?????*/

          ????public?static?void?test1()?{
          ????????ExecutorService?executor?=?Executors.newFixedThreadPool(3);
          ????????work(executor);
          ????????executor.shutdown();
          ????}

          ????private?static?void?work(ExecutorService?executor)?{
          ????????for?(int?i=0;?i<10;?i++)?{
          ????????????executor.submit(?new?Job("Job-"+i)?);
          ????????}
          ????}
          }

          從測(cè)試結(jié)果可以看到,無(wú)論多少個(gè)任務(wù),可用的線程數(shù)量都是固定的

          figure 1.jpeg

          newCachedThreadPool

          該方法創(chuàng)建一個(gè)線程池,當(dāng)沒(méi)有空閑線程可用時(shí),其會(huì)一直創(chuàng)建新的線程來(lái)處理任務(wù)

          public?class?TestThreadPool?{
          ????/**
          ?????*?可緩存的線程池
          ?????*/

          ????public?static?void?test2()?{
          ????????ExecutorService?executor?=?Executors.newCachedThreadPool();
          ????????work(executor);
          ????????executor.shutdown();
          ????}
          }

          測(cè)試結(jié)果如下所示

          figure 2.jpeg

          newSingleThreadExecutor

          該方法創(chuàng)建的線程池中只有一個(gè)線程,故提交至此的任務(wù)會(huì)依次執(zhí)行

          public?class?TestThreadPool?{
          ????/**
          ?????*?單線程的線程池
          ?????*/

          ????public?static?void?test3()?{
          ????????ExecutorService?executor?=?Executors.newSingleThreadExecutor();
          ????????work(executor);
          ????????executor.shutdown();
          ????}
          }

          測(cè)試結(jié)果如下所示

          figure 3.jpeg

          newScheduledThreadPool

          該方法創(chuàng)建的線程池可用于執(zhí)行定時(shí)任務(wù)

          public?class?TestThreadPool?{
          ????/**
          ?????*?定時(shí)任務(wù)的線程池
          ?????*/

          ????public?static?void?test4()?{

          ????????Consumer?task?=?(String?taskName)?->?{
          ????????????String?now?=?DateUtil.format(?new?Date(),?"HH:mm:ss"?);
          ????????????String?info?=?"?+?now?+?"?[Thread]:?"?+?Thread.currentThread().getName()?+?"?:?"?+?taskName;
          ????????????System.out.println(?info?);
          ????????????try{
          ????????????????//?耗時(shí)模擬:5秒
          ????????????????Thread.sleep(1000*5);
          ????????????}catch?(InterruptedException?e)?{
          ????????????????e.printStackTrace();
          ????????????}
          ????????};

          ????????ScheduledExecutorService?executorService?=?Executors.newScheduledThreadPool(4);
          ????????System.out.println(?"?+?DateUtil.format(?new?Date(),?"HH:mm:ss")?);

          ????????//?一次性任務(wù),?延遲10秒后執(zhí)行
          ????????executorService.schedule(?()->task.accept("OneTime"),?10,?TimeUnit.SECONDS?);

          ????????//?定時(shí)任務(wù):?延遲30秒啟動(dòng),每次間隔10秒開(kāi)始執(zhí)行
          ????????executorService.scheduleAtFixedRate(?()->task.accept("fixedRate"),?30,?10,?TimeUnit.SECONDS);

          ????????//?定時(shí)任務(wù):?延遲30秒啟動(dòng),每次完成10秒后再次執(zhí)行
          ????????executorService.scheduleWithFixedDelay(?()->task.accept("fixedDelay")?,?30,?10,?TimeUnit.SECONDS);
          ????}
          }

          其中上述代碼中scheduleAtFixedRate、scheduleWithFixedDelay方法第三個(gè)參數(shù)的含義分別是兩次任務(wù)開(kāi)始執(zhí)行的間隔時(shí)間上一次任務(wù)結(jié)束至本次任務(wù)開(kāi)始的間隔時(shí)間。與SpringBoot中的@Scheduled(fixedRate)、@Scheduled(fixedDelay)注解的用途類似。值得一提的是,對(duì)于scheduleAtFixedRate而言,當(dāng) 我們指定的兩次任務(wù)開(kāi)始執(zhí)行的間隔時(shí)間 小于 該任務(wù)執(zhí)行一次所需的耗時(shí) 時(shí),將會(huì)以 該任務(wù)執(zhí)行所需的耗時(shí) 作為 兩次任務(wù)開(kāi)始執(zhí)行的實(shí)際間隔時(shí)間

          測(cè)試結(jié)果如下所示。從藍(lán)框可以看出,三個(gè)任務(wù)的第一次執(zhí)行時(shí)機(jī)均按指定的延時(shí)時(shí)間(分別延遲10秒、30秒、30秒)啟動(dòng);從綠框可知,對(duì)于名為fixedRate的任務(wù)而言,每次開(kāi)始執(zhí)行的間隔為10秒;從紅框可知,對(duì)于名為fixedDelay的任務(wù)而言,每次開(kāi)始執(zhí)行的間隔為15秒。因?yàn)槠渖弦淮稳蝿?wù)結(jié)束至本次任務(wù)開(kāi)始的時(shí)間間隔為10秒,加上該任務(wù)本身耗時(shí)5秒,故累計(jì)為15秒

          figure 4.jpeg

          ThreadPoolExecutor類

          概述

          事實(shí)上對(duì)于上述的工廠方法而言,其內(nèi)部是使用線程池ThreadPoolExecutor類。該類的繼承結(jié)構(gòu)如下所示

          figure 5.jpeg

          與線程類似。對(duì)于線程池而言,其整個(gè)生命周期階段也存在若干不同的狀態(tài)。具體如下

          • Running:該狀態(tài)下,線程池可以接受新任務(wù),并能夠處理阻塞隊(duì)列中的任務(wù)
          • ShutDown:該狀態(tài)下,線程池不再可以接受新任務(wù),但能夠繼續(xù)處理阻塞隊(duì)列中的任務(wù)
          • Stop:該狀態(tài)下,線程池不再可以接受新任務(wù),也不會(huì)繼續(xù)處理阻塞隊(duì)列中的任務(wù)。同時(shí)會(huì)中斷正在處理的任務(wù)
          • Tidying:該狀態(tài)下,線程池中的工作線程數(shù)量為0。并且會(huì)調(diào)用terminated()鉤子方法(hook method)
          • Terminated:當(dāng)terminated()鉤子方法(hook method)執(zhí)行完畢后,線程池進(jìn)入該狀態(tài)

          各狀態(tài)的變化流程如下所示

          figure 6.jpeg

          值得一提的是,在ThreadPoolExecutor的實(shí)現(xiàn)過(guò)程中。其通過(guò)一個(gè)AtomicInteger類型的原子變量ctl實(shí)現(xiàn)了對(duì)線程池狀態(tài)、工作線程數(shù)的記錄。具體來(lái)說(shuō),是將高3位用于表示線程池狀態(tài),剩余位表示工作線程數(shù)。runStateOf方法用于獲取線程池狀態(tài)信息,workerCountOf方法用于獲取工作線程數(shù)

          figure 7.jpeg

          在實(shí)際應(yīng)用過(guò)程中,線程池ThreadPoolExecutor常見(jiàn)參數(shù)如下:

          • corePoolSize:線程池的核心線程數(shù)
          • maximumPoolSize:線程池的最大線程數(shù)
          • keepAliveTime:空閑線程的超時(shí)時(shí)間,用于終止空閑線程。通常其只對(duì)線程池中超過(guò)corePoolSize的多余線程生效。除非allowCoreThreadTimeOut屬性設(shè)為true,才會(huì)對(duì)核心線程生效
          • unit:keepAliveTime參數(shù)的時(shí)間單位。其可選值定義在枚舉類TimeUnit中
          • workQueue:任務(wù)的阻塞隊(duì)列
          • handler:當(dāng) 任務(wù)隊(duì)列workQueue已滿 且 線程數(shù)已達(dá)到maximumPoolSize ,提交新任務(wù)時(shí)的拒絕策略

          其在接收任務(wù)后的基本流程如下所示

          figure 8.jpeg

          拒絕策略

          JDK提供了四種拒絕策略,均實(shí)現(xiàn)了RejectedExecutionHandler接口

          DiscardPolicy 丟棄策略

          該策略下當(dāng)提交的任務(wù) 無(wú)空閑線程執(zhí)行 或 任務(wù)隊(duì)列已滿 時(shí),則會(huì)直接被丟棄且不會(huì)產(chǎn)生任何異常

          public?class?RejectedPolicyDemo?{

          ????/**
          ?????*?創(chuàng)建一個(gè)線程池,其最多只會(huì)創(chuàng)建2個(gè)線程,任務(wù)隊(duì)列最多存放1個(gè)任務(wù)
          ?????*/

          ????private?static?ThreadPoolExecutor?executor?=?new?ThreadPoolExecutor(
          ????????2,?2,?60,?TimeUnit.SECONDS,?new?ArrayBlockingQueue<>(1)?);

          ????public?static?void?test1()?{
          ????????//?拒絕策略:直接丟棄
          ????????System.out.println("--------------------?拒絕策略:直接丟棄?--------------------");
          ????????executor.setRejectedExecutionHandler(?new?ThreadPoolExecutor.DiscardPolicy()?);
          ????????executeJob(?"clean"?);

          ????????executor.shutdown();
          ????}

          ????private?static?void?executeJob(String?name)?{
          ????????for?(int?i=0;?i<10;?i++)?{
          ????????????executor.submit(?new?Job(?name+"-"+i)?);
          ????????}
          ????}
          }

          測(cè)試結(jié)果如下所示,#3-#9號(hào)任務(wù)被直接丟棄了

          figure 9.jpeg

          DiscardOldestPolicy 丟棄最老策略

          該策略下當(dāng)提交的任務(wù) 無(wú)空閑線程執(zhí)行 或 任務(wù)隊(duì)列已滿 時(shí),則會(huì)丟棄隊(duì)列中最舊的任務(wù)以釋放空間來(lái)存儲(chǔ)該任務(wù)

          public?class?RejectedPolicyDemo?{

          ????/**
          ?????*?創(chuàng)建一個(gè)線程池,其最多只會(huì)創(chuàng)建2個(gè)線程,任務(wù)隊(duì)列最多存放1個(gè)任務(wù)
          ?????*/

          ????private?static?ThreadPoolExecutor?executor?=?new?ThreadPoolExecutor(
          ????????2,?2,?60,?TimeUnit.SECONDS,?new?ArrayBlockingQueue<>(1)?);

          ????public?static?void?test2(){
          ????????//?拒絕策略:丟棄隊(duì)列中最舊的任務(wù)
          ????????System.out.println("--------------------?拒絕策略:丟棄隊(duì)列中最舊的任務(wù)?--------------------");
          ????????executor.setRejectedExecutionHandler(?new?ThreadPoolExecutor.DiscardOldestPolicy()?);
          ????????executeJob(?"register"?);

          ????????executor.shutdown();
          ????}
          }

          測(cè)試結(jié)果如下所示。當(dāng)#0、#1號(hào)任務(wù)在執(zhí)行時(shí),#2~#9號(hào)任務(wù)不斷被存儲(chǔ)到隊(duì)列、然后被丟棄以存放最新的任務(wù)。所以#9號(hào)任務(wù)最終被保留并執(zhí)行

          figure 10.jpeg

          AbortPolicy 中止策略

          在該策略下,當(dāng)線程池?zé)o法繼續(xù)接收提交的任務(wù)時(shí)會(huì)拋出RejectedExecutionException異常。其也是線程池的默認(rèn)拒絕策略。顯然拋出異常的方式可以讓開(kāi)發(fā)者更好的把握系統(tǒng)的運(yùn)行狀態(tài)。當(dāng)然在此種拒絕策略下,我們需要處理好其所拋出的異常,以免打斷當(dāng)前的執(zhí)行流程

          public?class?RejectedPolicyDemo?{

          ????/**
          ?????*?創(chuàng)建一個(gè)線程池,其最多只會(huì)創(chuàng)建2個(gè)線程,任務(wù)隊(duì)列最多存放1個(gè)任務(wù)
          ?????*/

          ????private?static?ThreadPoolExecutor?executor?=?new?ThreadPoolExecutor(
          ????????2,?2,?60,?TimeUnit.SECONDS,?new?ArrayBlockingQueue<>(1)?);

          ????public?static?void?test3(){
          ????????//?拒絕策略:拋異常
          ????????System.out.println("--------------------?拒絕策略:拋異常?--------------------");
          ????????executor.setRejectedExecutionHandler(?new?ThreadPoolExecutor.AbortPolicy()?);
          ????????try{
          ????????????executeJob(?"request"?);
          ????????}catch?(RejectedExecutionException?e)?{
          ????????????System.out.println("[Error]?提交到線程池的任務(wù)量過(guò)多");
          ????????}
          ????????executor.shutdown();
          ????}
          }

          測(cè)試結(jié)果如下所示

          figure 11.jpeg

          CallerRunsPolicy 調(diào)用者執(zhí)行策略

          在該策略下,當(dāng)線程池?zé)o法繼續(xù)接收提交的任務(wù)時(shí),其會(huì)交由調(diào)用者(提交任務(wù)的線程)去執(zhí)行完成

          public?class?RejectedPolicyDemo?{

          ????/**
          ?????*?創(chuàng)建一個(gè)線程池,其最多只會(huì)創(chuàng)建2個(gè)線程,任務(wù)隊(duì)列最多存放1個(gè)任務(wù)
          ?????*/

          ????private?static?ThreadPoolExecutor?executor?=?new?ThreadPoolExecutor(
          ????????2,?2,?60,?TimeUnit.SECONDS,?new?ArrayBlockingQueue<>(1)?);

          ????public?static?void?test4()?{
          ????????//?拒絕策略:由調(diào)用者執(zhí)行
          ????????System.out.println("--------------------?拒絕策略:由調(diào)用者執(zhí)行?--------------------");
          ????????executor.setRejectedExecutionHandler(?new?ThreadPoolExecutor.CallerRunsPolicy()?);
          ????????executeJob(?"caller"?);

          ????????executor.shutdown();
          ????}??
          }

          測(cè)試結(jié)果如下所示,符合預(yù)期。線程池?zé)o法繼續(xù)接收新任務(wù)時(shí),其會(huì)被提交任務(wù)的線程(即這里的main線程)執(zhí)行完成

          figure 12.jpeg

          Note

          • 當(dāng)線程池的拒絕策略為DiscardPolicy、DiscardOldestPolicy時(shí),則對(duì)于被拒絕任務(wù)的Future實(shí)例而言。如果在其上調(diào)用無(wú)參的get()方法,則會(huì)導(dǎo)致一直被阻塞。故在此種場(chǎng)景下,推薦使用支持超時(shí)機(jī)制的get()方法。測(cè)試代碼如下所示
          public?class?ThreadPoolDemo?{

          ????@Test
          ????public?void?test1()?throws?Exception?{
          ????????ThreadPoolExecutor?executor?=?new?ThreadPoolExecutor(1,?1,?0,
          ????????????????TimeUnit.SECONDS,?new?ArrayBlockingQueue<>(1),?new?ThreadPoolExecutor.DiscardPolicy());

          ????????Future?future1?=?executor.submit(?taskFactory("Task?1")?);
          ????????Future?future2?=?executor.submit(?taskFactory("Task?2")?);
          ????????Future?future3?=?executor.submit(?taskFactory("Task?3")?);

          ????????future1.get();
          ????????System.out.println("future?1?Over");

          ????????future2.get();
          ????????System.out.println("future?2?Over");

          ????????future3.get();
          ????????System.out.println("future?3?Over");
          ????}

          ????private?static?Runnable?taskFactory(String?taskName)?{
          ????????return?()?->?{
          ????????????//?模擬業(yè)務(wù)耗時(shí)
          ????????????try{Thread.sleep(3000);}?catch?(Exception?e){}
          ????????????System.out.println(taskName?+?":?Over");
          ????????};
          ????}
          }

          測(cè)試結(jié)果如下所示,符合預(yù)期

          figure 13.jpeg
          • 線程池使用完畢后,應(yīng)通過(guò) shutdown()?方法進(jìn)行關(guān)閉。實(shí)例代碼如下所示
          public?static?void?main(String[]?args)?{
          ????System.out.println("Hello?World");
          ????ThreadPoolExecutor?threadPoolExecutor?=?(ThreadPoolExecutor)?Executors.newFixedThreadPool(5);
          ????threadPoolExecutor.execute(?()->?System.out.println("Test?Task")?);

          ????//?關(guān)閉線程池
          ????//threadPoolExecutor.shutdown();

          ????System.out.println("Main?Over");
          }

          測(cè)試結(jié)果如下所示,左下角處的紅色方塊表明由于線程池未關(guān)閉,JVM依然存在并未退出。正確做法是放開(kāi)上述代碼中對(duì)線程池關(guān)閉操作的注釋

          figure 14.jpeg

          參考文獻(xiàn)

          1. Java并發(fā)編程之美 翟陸續(xù)、薛賓田著
          瀏覽 34
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产精品精品国产婷婷这里Av | 2019中文字幕在线视频 | 一级黄网站 | 一区日| 国产传媒三级 |