<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          從實(shí)戰(zhàn)到原理,線程池的各類使用場景整合

          共 13365字,需瀏覽 27分鐘

           ·

          2021-12-27 13:50

          在日常的開發(fā)工作中,我們經(jīng)常會需要使用到線程池這類型的組件。例如下邊幾種應(yīng)用場景:

          線程池經(jīng)典應(yīng)用場景

          異步發(fā)送郵件通知發(fā)送一個(gè)任務(wù),然后注入到線程池中異步發(fā)送。

          心跳請求任務(wù)創(chuàng)建一個(gè)任務(wù),然后定時(shí)發(fā)送請求到線程池中。

          類似的場景有很多,我們下邊一步一步地來介紹不同的應(yīng)用場景下,線程池的具體使用案例:

          異步發(fā)送郵件場景

          定義一個(gè)簡單的郵件發(fā)送接口:

          public?interface?SendEmailService?{
          ????/**
          ?????*?發(fā)送郵件
          ?????*
          ?????*?@param?emailDTO?郵件對象
          ?????*/

          ????void?sendEmail(EmailDTO?emailDTO);
          }

          接著是郵件發(fā)送的簡單實(shí)現(xiàn)類:

          @Service
          public?class?SendEmailServiceImpl?implements?SendEmailService?{
          ????@Resource
          ????private?ExecutorService?emailTaskPool;

          ????@Override
          ????public?void?sendEmail(EmailDTO?emailDTO)?{
          ????????emailTaskPool.submit(()?->?{
          ????????????try?{
          ????????????????System.out.printf("sending?email?....?emailDto?is?%s?\n",?emailDTO);
          ????????????????Thread.sleep(1000);
          ????????????????System.out.println("sended?success");
          ????????????}?catch?(InterruptedException?e)?{
          ????????????????e.printStackTrace();
          ????????????}
          ????????});
          ????}
          }

          郵件的發(fā)送邏輯通過一個(gè)簡單的線程睡眠來模擬發(fā)送過程中的耗時(shí)操作。

          然后是線程池方面的配置:

          @Configuration
          public?class?ThreadPoolConfig?{
          ????@Bean
          ????public?ExecutorService?emailTaskPool()?{
          ????????return?new?ThreadPoolExecutor(2,?4,
          ????????????????0L,?TimeUnit.MILLISECONDS,
          ????????????????new?LinkedBlockingQueue(),?new?SysThreadFactory("email-task"));
          ????}
          }

          controller模塊的觸發(fā)

          @RestController
          @RequestMapping(value?=?"/test")
          public?class?TestController?{
          ????@Resource
          ????private?SendEmailService?sendEmailService;
          ????@GetMapping(value?=?"/send-email")
          ????public?boolean?sendEmail()?{
          ????????EmailDTO?emailDTO?=?new?EmailDTO();
          ????????emailDTO.setContent("測試文案");
          ????????emailDTO.setReceiver("idea");
          ????????emailDTO.setTitle("郵件標(biāo)題");
          ????????sendEmailService.sendEmail(emailDTO);
          ????????return?true;
          ????}
          }

          這是一個(gè)非常簡單的案例,通過一個(gè)http請求,然后觸發(fā)一個(gè)郵件的發(fā)送操作。

          心跳請求場景

          這類應(yīng)用場景一般會在一些基礎(chǔ)組件中使用到,例如一些具有心跳探活機(jī)制類型功能的中間件,如nacos。下邊來看看對應(yīng)的代碼實(shí)踐:首先是心跳模塊代碼:

          public?class?HeartBeatInfo?{
          ????private?String?info;
          ????private?long?nextSendTimeDelay;
          ????public?String?getInfo()?{
          ????????return?info;
          ????}
          ????public?void?setInfo(String?info)?{
          ????????this.info?=?info;
          ????}

          ????public?long?getNextSendTimeDelay()?{
          ????????return?nextSendTimeDelay;
          ????}
          ????public?void?setNextSendTimeDelay(long?nextSendTimeDelay)?{
          ????????this.nextSendTimeDelay?=?nextSendTimeDelay;
          ????}
          ????@Override
          ????public?String?toString()?{
          ????????return?"HeartBeatInfo{"?+
          ????????????????"info='"?+?info?+?'\''?+
          ????????????????",?nextSendTimeDelay="?+?nextSendTimeDelay?+
          ????????????????'}';
          ????}
          }

          然后是模擬一個(gè)心跳包的發(fā)送服務(wù)接口定義:

          public?interface?HeartBeatTaskService?{
          ????void?sendBeatInfo();
          }

          接下來是心跳任務(wù)的發(fā)送核心部分實(shí)現(xiàn):

          @Service
          public?class?HeartBeatTaskServiceImpl?implements?HeartBeatTaskService?{
          ????@Resource
          ????private?ScheduledThreadPoolExecutor?scheduledThreadPoolExecutor;
          ????@Override
          ????public?void?sendBeatInfo()?{
          ????????HeartBeatInfo?heartBeatInfo?=?new?HeartBeatInfo();
          ????????heartBeatInfo.setInfo("test-info");
          ????????heartBeatInfo.setNextSendTimeDelay(1000);
          ????????scheduledThreadPoolExecutor.schedule(new?HeartBeatTask(heartBeatInfo),
          ????????????????heartBeatInfo.getNextSendTimeDelay(),?TimeUnit.MILLISECONDS);
          ????}
          ????class?HeartBeatTask?implements?Runnable?{
          ????????private?HeartBeatInfo?heartBeatInfo;
          ????????public?HeartBeatTask(HeartBeatInfo?heartBeatInfo)?{
          ????????????this.heartBeatInfo?=?heartBeatInfo;
          ????????}
          ????????@Override
          ????????public?void?run()?{
          ????????????System.out.println("發(fā)送心跳數(shù)據(jù)包:"?+?heartBeatInfo.getInfo());
          ????????????HeartBeatInfo?heartBeatInfo?=?new?HeartBeatInfo();
          ????????????heartBeatInfo.setInfo("test-info");
          ????????????heartBeatInfo.setNextSendTimeDelay(1000);
          ????????????scheduledThreadPoolExecutor.schedule(new?HeartBeatTask(heartBeatInfo),
          ????????????????????heartBeatInfo.getNextSendTimeDelay(),?TimeUnit.MILLISECONDS);
          ????????}
          ????}
          }

          在核心實(shí)現(xiàn)的內(nèi)部有一個(gè)延時(shí)線程池ScheduledThreadPoolExecutor,ScheduledThreadPoolExecutor會在放入線程任務(wù)的一段指定的時(shí)間之后才觸發(fā)任務(wù)的執(zhí)行:

          @Configuration
          public?class?ThreadPoolConfig?{
          ?
          ????@Bean
          ????public?ScheduledThreadPoolExecutor??scheduledThreadPoolExecutor(){
          ????????return?new?ScheduledThreadPoolExecutor(2,?new?ThreadFactory()?{
          ????????????@Override
          ????????????public?Thread?newThread(Runnable?r)?{
          ????????????????Thread?thread?=?new?Thread(r);
          ????????????????thread.setDaemon(true);
          ????????????????thread.setName("org.idea.threadpool.beat.sender");
          ????????????????return?thread;
          ????????????}
          ????????});
          ????}
          }

          JDK內(nèi)部線程池的設(shè)計(jì)

          看了上邊兩個(gè)簡單的案例之后,不知道你是否會有好奇:

          到底線程池的內(nèi)部運(yùn)行機(jī)制會是怎樣的呢?

          簡單手寫一個(gè)單消費(fèi)者任務(wù)處理模型

          這里我們可以通過一段簡單的代碼來學(xué)習(xí)這部分的內(nèi)容:首先,我們將需要處理的任務(wù)封裝在一個(gè)對象內(nèi)部,暫時(shí)定義如下所示:

          public?class?AsyncHandlerData?{

          ????private?String?dataInfo;

          ????public?String?getDataInfo()?{
          ????????return?dataInfo;
          ????}

          ????public?void?setDataInfo(String?dataInfo)?{
          ????????this.dataInfo?=?dataInfo;
          ????}

          ????@Override
          ????public?String?toString()?{
          ????????return?"AsyncHandlerData{"?+
          ????????????????"dataInfo='"?+?dataInfo?+?'\''?+
          ????????????????'}';
          ????}
          }

          然后會有一個(gè)專門消費(fèi)這些個(gè)任務(wù)的service:

          public?interface?AsyncHandlerService?{
          ????/**
          ?????*?任務(wù)放入隊(duì)列中
          ?????*?
          ?????*?@param?asyncHandlerData
          ?????*/

          ????void?putTask(AsyncHandlerData?asyncHandlerData);
          }

          最后根據(jù)提前定義好的接口編寫一個(gè)實(shí)現(xiàn)類,此時(shí)將相關(guān)的任務(wù)處理邏輯規(guī)整到了一個(gè)對象當(dāng)中:

          @Service
          public?class?AsyncHandlerServiceImpl?implements?AsyncHandlerService,?CommandLineRunner?{

          ????private?volatile?TaskQueueHandler?taskQueueHandler?=?new?TaskQueueHandler();

          ????@Override
          ????public?void?putTask(AsyncHandlerData?asyncHandlerData)?{
          ????????taskQueueHandler.addTask(asyncHandlerData);
          ????}

          ????@Override
          ????public?void?run(String...?args)?throws?Exception?{
          ????????Thread?thread?=?new?Thread(taskQueueHandler);
          ????????thread.setDaemon(true);
          ????????thread.start();
          ????}


          ????public?class?TaskQueueHandler?implements?Runnable?{

          ????????private?BlockingQueue?tasks?=?new?ArrayBlockingQueue<>(1024?*?1024);

          ????????public?void?addTask(AsyncHandlerData?asyncHandlerData)?{
          ????????????tasks.offer(asyncHandlerData);
          ????????}


          ????????@Override
          ????????public?void?run()?{
          ????????????for?(;?;?)?{
          ????????????????try?{
          ????????????????????AsyncHandlerData?asyncHandlerData?=?tasks.take();
          ????????????????????System.out.println("異步處理任務(wù)數(shù)據(jù):"?+?asyncHandlerData.getDataInfo());
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}
          ????????????}
          ????????}
          ????}
          }

          整個(gè)代碼的思路邏輯比較簡單,大致可以歸整成下圖所示:



          整體的設(shè)計(jì)模式就是一端放入,由單個(gè)消費(fèi)者取出。但是存在一個(gè)不足點(diǎn),一旦消費(fèi)者能力較弱,或者出現(xiàn)任務(wù)堵塞的話,就會導(dǎo)致任務(wù)隊(duì)列出現(xiàn)堆積,然后越堆積越難處理地過來。

          但是這樣的設(shè)計(jì)還是一個(gè)過于簡單的模型,下邊我們來看看jdk內(nèi)部線程池的設(shè)計(jì)模式:

          線程池內(nèi)部的源代碼分析

          我們在項(xiàng)目里使用線程池的時(shí)候,通常都會先創(chuàng)建一個(gè)具體實(shí)現(xiàn)Bean來定義線程池,例如:

          @Bean
          public?ExecutorService?emailTaskPool()?{
          ????return?new?ThreadPoolExecutor(2,?4,
          ????????????0L,?TimeUnit.MILLISECONDS,
          ????????????new?LinkedBlockingQueue(),?new?SysThreadFactory("email-task"));
          }

          ThreadPoolExecutor的父類是AbstractExecutorService,然后AbstractExecutorService的頂層接口是:ExecutorService。

          就例如發(fā)送郵件接口而言,當(dāng)線程池觸發(fā)了submit函數(shù)的時(shí)候,實(shí)際上會調(diào)用到父類AbstractExecutorService對象的java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable)方法,然后進(jìn)入到ThreadPoolExecutor#execute部分。

          @Override
          public?void?sendEmail(EmailDTO?emailDTO)?{
          ????emailTaskPool.submit(()?->?{
          ????????try?{
          ????????????System.out.printf("sending?email?....?emailDto?is?%s?\n",?emailDTO);
          ????????????Thread.sleep(1000);
          ????????????System.out.println("sended?success");
          ????????}?catch?(InterruptedException?e)?{
          ????????????e.printStackTrace();
          ????????}
          ????});
          }

          java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable)?源代碼位置:

          /**
          ?*?@throws?RejectedExecutionException?{@inheritDoc}
          ?*?@throws?NullPointerException???????{@inheritDoc}
          ?*/

          public?Future?submit(Runnable?task)?{
          ????if?(task?==?null)?throw?new?NullPointerException();
          ????RunnableFuture?ftask?=?newTaskFor(task,?null);
          ????execute(ftask);
          ????return?ftask;
          }

          這里面你會看到返回的是一個(gè)future對象供調(diào)用方判斷線程池內(nèi)部的函數(shù)到底是否有完全執(zhí)行成功。因此如果有時(shí)候如果需要判斷線程池執(zhí)行任務(wù)的結(jié)果話,可以這樣操作:

          Future?future?=?emailTaskPool.submit(()?->?{
          ??????????try?{
          ??????????????System.out.printf("sending?email?....?emailDto?is?%s?\n",?emailDTO);
          ??????????????Thread.sleep(1000);
          ??????????????System.out.println("sended?success");
          ??????????}?catch?(InterruptedException?e)?{
          ??????????????e.printStackTrace();
          ??????????}
          ??????});
          ??????//todo?something
          ??????future.get();
          }

          在jdk8源代碼中,提交任務(wù)的執(zhí)行邏輯部分如下所示:新增線程任務(wù)的時(shí)候代碼:

          ??public?void?execute(Runnable?command)?{
          ????????if?(command?==?null)
          ????????????throw?new?NullPointerException();
          ????????/*
          ?????????*?Proceed?in?3?steps:
          ?????????*
          ?????????*?1.?If?fewer?than?corePoolSize?threads?are?running,?try?to
          ?????????*?start?a?new?thread?with?the?given?command?as?its?first
          ?????????*?task.??The?call?to?addWorker?atomically?checks?runState?and
          ?????????*?workerCount,?and?so?prevents?false?alarms?that?would?add
          ?????????*?threads?when?it?shouldn't,?by?returning?false.
          ?????????*
          ?????????*?2.?If?a?task?can?be?successfully?queued,?then?we?still?need
          ?????????*?to?double-check?whether?we?should?have?added?a?thread
          ?????????*?(because?existing?ones?died?since?last?checking)?or?that
          ?????????*?the?pool?shut?down?since?entry?into?this?method.?So?we
          ?????????*?recheck?state?and?if?necessary?roll?back?the?enqueuing?if
          ?????????*?stopped,?or?start?a?new?thread?if?there?are?none.
          ?????????*
          ?????????*?3.?If?we?cannot?queue?task,?then?we?try?to?add?a?new
          ?????????*?thread.??If?it?fails,?we?know?we?are?shut?down?or?saturated
          ?????????*?and?so?reject?the?task.
          ?????????*/

          ????????int?c?=?ctl.get();
          ????????//工作線程數(shù)小于核心線程的時(shí)候,可以填寫worker線程
          ????????if?(workerCountOf(c)???????????????//新增工作線程的時(shí)候會加鎖
          ????????????if?(addWorker(command,?true))
          ????????????????return;
          ????????????c?=?ctl.get();
          ????????}
          ????????//如果線程池的狀態(tài)正常,切任務(wù)放入就緒隊(duì)列正常
          ????????if?(isRunning(c)?&&?workQueue.offer(command))?{
          ????????????int?recheck?=?ctl.get();
          ????????????if?(!?isRunning(recheck)?&&?remove(command))
          ????????????????//如果當(dāng)前線程池處于關(guān)閉狀態(tài),則拋出拒絕異常
          ????????????????reject(command);
          ????????????//如果工作線程數(shù)超過了核心線程數(shù),那么就需要考慮新增工作線程
          ????????????else?if?(workerCountOf(recheck)?==?0)
          ????????????????addWorker(null,?false);
          ????????}
          ????????//如果新增的工作線程已經(jīng)達(dá)到了最大線程數(shù)限制的條件下,需要觸發(fā)拒絕策略的拋出
          ????????else?if?(!addWorker(command,?false))
          ????????????reject(command);
          ????}

          通過深入閱讀工作線程主要存放在了一個(gè)hashset集合當(dāng)中, 添加工作線程部分的邏輯代碼如下所示:

          private?boolean?addWorker(Runnable?firstTask,?boolean?core)?{
          ????retry:
          ????for?(;;)?{
          ????????int?c?=?ctl.get();
          ????????int?rs?=?runStateOf(c);
          ????????//確保當(dāng)前線程池沒有進(jìn)入到一個(gè)銷毀狀態(tài)中
          ????????//?Check?if?queue?empty?only?if?necessary.
          ????????if?(rs?>=?SHUTDOWN?&&
          ????????????!?(rs?==?SHUTDOWN?&&
          ???????????????firstTask?==?null?&&
          ???????????????!?workQueue.isEmpty()))
          ????????????return?false;

          ????????for?(;;)?{
          ????????????int?wc?=?workerCountOf(c);
          ????????????if?(wc?>=?CAPACITY?||
          ??????????????//?如果傳入的core屬性是false,則這里需要比對maximumPoolSize參數(shù)
          ????????????????wc?>=?(core???corePoolSize?:?maximumPoolSize))
          ????????????????return?false;
          ????????????????//通過cas操作去增加線程池的工作線程數(shù)畝
          ????????????if?(compareAndIncrementWorkerCount(c))
          ????????????????break?retry;
          ????????????c?=?ctl.get();??//?Re-read?ctl
          ????????????if?(runStateOf(c)?!=?rs)
          ????????????????continue?retry;
          ????????????//?else?CAS?failed?due?to?workerCount?change;?retry?inner?loop
          ????????}
          ????}

          ????boolean?workerStarted?=?false;
          ????boolean?workerAdded?=?false;
          ????Worker?w?=?null;
          ????try?{
          ???????//真正需要指定的任務(wù)是firstTask,它會被注入到worker對象當(dāng)中
          ????????w?=?new?Worker(firstTask);
          ????????final?Thread?t?=?w.thread;
          ????????if?(t?!=?null)?{
          ????????//加入了鎖
          ????????????final?ReentrantLock?mainLock?=?this.mainLock;
          ????????????mainLock.lock();
          ????????????try?{
          ????????????????//?Recheck?while?holding?lock.
          ????????????????//?Back?out?on?ThreadFactory?failure?or?if
          ????????????????//?shut?down?before?lock?acquired.
          ????????????????int?rs?=?runStateOf(ctl.get());

          ????????????????if?(rs?????????????????????(rs?==?SHUTDOWN?&&?firstTask?==?null))?{
          ????????????????????if?(t.isAlive())?//?precheck?that?t?is?startable
          ????????????????????????throw?new?IllegalThreadStateException();
          ????????????????????//workers是一個(gè)hashset集合,會往里面新增工作線程????
          ????????????????????workers.add(w);
          ????????????????????int?s?=?workers.size();
          ????????????????????if?(s?>?largestPoolSize)
          ????????????????????????largestPoolSize?=?s;
          ????????????????????workerAdded?=?true;
          ????????????????}
          ????????????}?finally?{
          ????????????????mainLock.unlock();
          ????????????}
          ????????????if?(workerAdded)?{
          ????????????????//worker本身是一個(gè)線程,但是worker對象內(nèi)部還有一個(gè)線程的參數(shù),
          ????????????????//這個(gè)t才是真正的任務(wù)內(nèi)容
          ????????????????t.start();
          ????????????????workerStarted?=?true;
          ????????????}
          ????????}
          ????}?finally?{
          ????????//如果worker線程創(chuàng)建好了,但是內(nèi)部的真正任務(wù)還沒有啟動,此時(shí)突然整個(gè)
          ????????//線程池的狀態(tài)被關(guān)閉了,那么這時(shí)候workerStarted就會為false,然后將
          ????????//工作線程的數(shù)目做自減調(diào)整。
          ????????if?(!?workerStarted)
          ????????????addWorkerFailed(w);
          ????}
          ????return?workerStarted;
          }

          進(jìn)過理解之后,整體執(zhí)行的邏輯以及先后順序如下圖所示:


          首先判斷線程池內(nèi)部的現(xiàn)場是否都有任務(wù)需要執(zhí)行。如果不是,則使用一個(gè)空閑的工作線程用于任務(wù)執(zhí)行。否則會判斷當(dāng)前的堵塞隊(duì)列是否已經(jīng)滿了,如果沒有滿則往隊(duì)列里面投遞任務(wù),等待工作線程去處理。如果堵塞隊(duì)列已經(jīng)滿了,此時(shí)判斷工作線程數(shù)是否大于最大線程數(shù),如果沒有,則繼續(xù)創(chuàng)建工作線程,如果已經(jīng)達(dá)到則根據(jù)飽和策略去判斷是果斷拋出異常還是其他方式來進(jìn)行處理。


          線程池常用參數(shù)介紹

          corePoolSize核心線程數(shù),當(dāng)往線程池內(nèi)部提交任務(wù)的時(shí)候,線程池會創(chuàng)建一個(gè)線程來執(zhí)行任務(wù)。即使此時(shí)有空閑的工作線程能夠處理當(dāng)前任務(wù),只要總的工作線程數(shù)小于corePoolSize,也會創(chuàng)建新的工作線程。

          maximumPoolSize當(dāng)任務(wù)的堵塞隊(duì)列滿了之后,如果還有新的任務(wù)提交到線程池內(nèi)部,此時(shí)倘若工作線程數(shù)小于maximumPoolSize,則會創(chuàng)建新的工作線程。

          keepAliveTime上邊我們說到了工作線程Worker(java.util.concurrent.ThreadPoolExecutor.Worker),當(dāng)工作線程處于空閑狀態(tài)中,如果超過了keepAliveTime依然沒有任務(wù),那么就會銷毀當(dāng)前工作線程。如果工作線程需要一直處于執(zhí)行任務(wù),每個(gè)任務(wù)的連續(xù)間隔都比較短,那么這個(gè)keepAliveTime 屬性可以適當(dāng)?shù)卣{(diào)整大一些。

          unitkeepAliveTime對應(yīng)的時(shí)間單位

          workQueue工作隊(duì)列,當(dāng)工作線程數(shù)達(dá)到了核心線程數(shù),那么此時(shí)新來的線程就會被放入到工作隊(duì)列中。線程池內(nèi)部的工作隊(duì)列全部都是繼承自阻塞隊(duì)列的接口,對于常用的阻塞隊(duì)列類型為:

          • ArrayBlockingQueue
          • LinkedBlockingQueue
          • SynchronousQueue
          • PriorityBlockingQueue

          RejectedExecutionHandlerJDK內(nèi)部的線程拒絕策略包含了多種許多種,這里我羅列一些常見的拒絕策略給大家認(rèn)識下:

          • AbortPolicy 直接拋出異常
          • CallerRunsPolicy 任務(wù)的執(zhí)行由注入的線程自己執(zhí)行
          • DiscardOldestPolicy 直接拋棄掉堵塞隊(duì)列中隊(duì)列頭部的任務(wù),然后執(zhí)行嘗試將當(dāng)前任務(wù)提交到堵塞隊(duì)列中。
          • DiscardPolicy 直接拋棄這個(gè)任務(wù)

          從線程池設(shè)計(jì)中的一些啟發(fā)

          多消費(fèi)隊(duì)列的設(shè)計(jì)場景應(yīng)用:業(yè)務(wù)上游提交任務(wù),然后任務(wù)被放進(jìn)一個(gè)堵塞隊(duì)列中,接下來消費(fèi)者需要從堵塞隊(duì)列中提取元素,并且將它們轉(zhuǎn)發(fā)到多個(gè)子隊(duì)列中,各個(gè)子隊(duì)列分別交給不同的子消費(fèi)者處理數(shù)據(jù)。例如下圖所示:



          public?interface?AsyncHandlerService?{

          ????/**
          ?????*?任務(wù)放入隊(duì)列中
          ?????*?
          ?????*?@param?asyncHandlerData
          ?????*/

          ????boolean?putTask(AsyncHandlerData?asyncHandlerData);

          ????/**
          ?????*?啟動消費(fèi)
          ?????*/

          ????void?startJob();
          }

          多消費(fèi)者分發(fā)處理實(shí)現(xiàn)類:

          @Component("asyncMultiConsumerHandlerHandler")
          public?class?AsyncMultiConsumerHandlerHandler?implements?AsyncHandlerService{
          ????private?volatile?TaskQueueHandler?taskQueueHandler?=?new?TaskQueueHandler(10);
          ????@Override
          ????public?boolean?putTask(AsyncHandlerData?asyncHandlerData)?{
          ????????return?taskQueueHandler.addTask(asyncHandlerData);
          ????}
          ????@Override
          ????public?void?startJob(){
          ????????Thread?thread?=?new?Thread(taskQueueHandler);
          ????????thread.setDaemon(true);
          ????????thread.start();
          ????}
          ????/**
          ?????*?將任務(wù)分發(fā)給各個(gè)子隊(duì)列去處理
          ?????*/

          ????static?class?TaskQueueHandler?implements?Runnable?{
          ????????private?static?BlockingQueue?tasks?=?new?ArrayBlockingQueue<>(11);
          ????????public?static?BlockingQueue?getAllTaskInfo()?{
          ????????????return?tasks;
          ????????}
          ????????private?TaskDispatcherHandler[]?taskDispatcherHandlers;
          ????????private?int?childConsumerSize?=?0;
          ????????public?TaskQueueHandler(int?childConsumerSize)?{
          ????????????this.childConsumerSize?=?childConsumerSize;
          ????????????taskDispatcherHandlers?=?new?TaskDispatcherHandler[childConsumerSize];
          ????????????for?(int?i?=?0;?i?????????????????taskDispatcherHandlers[i]?=?new?TaskDispatcherHandler(new?ArrayBlockingQueue<>(100),?"child-worker-"?+?i);
          ????????????????Thread?thread?=?new?Thread(taskDispatcherHandlers[i]);
          ????????????????thread.setDaemon(false);
          ????????????????thread.setName("taskQueueHandler-child-"+i);
          ????????????????thread.start();
          ????????????}
          ????????}
          ????????public?boolean?addTask(AsyncHandlerData?asyncHandlerData)?{
          ????????????return?tasks.offer(asyncHandlerData);
          ????????}

          ????????@Override
          ????????public?void?run()?{
          ????????????int?index?=?0;
          ????????????for?(;?;?)?{
          ????????????????try?{
          ????????????????????AsyncHandlerData?asyncHandlerData?=?tasks.take();
          ????????????????????index?=?(index?==?taskDispatcherHandlers.length)???0?:?index;
          ????????????????????taskDispatcherHandlers[index].addAsyncHandlerData(asyncHandlerData);
          ????????????????????index++;
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}
          ????????????}
          ????????}
          ????}
          ????static?class?TaskDispatcherHandler?implements?Runnable?{
          ????????private?BlockingQueue?subTaskQueue;
          ????????private?String?childName;
          ????????private?AtomicLong?taskCount?=?new?AtomicLong(0);
          ????????public?TaskDispatcherHandler(BlockingQueue?blockingQueue,?String?childName)?{
          ????????????this.subTaskQueue?=?blockingQueue;
          ????????????this.childName?=?childName;
          ????????}
          ????????public?void?addAsyncHandlerData(AsyncHandlerData?asyncHandlerData)?{
          ????????????subTaskQueue.add(asyncHandlerData);
          ????????}
          ????????@Override
          ????????public?void?run()?{
          ????????????for?(;?;?)?{
          ????????????????try?{
          ????????????????????AsyncHandlerData?asyncHandlerData?=?subTaskQueue.take();
          ????????????????????long?count?=?taskCount.incrementAndGet();
          ????????????????????System.out.println("【"?+?childName?+?"】子任務(wù)隊(duì)列處理:"?+?asyncHandlerData.getDataInfo()?+?count);
          ????????????????????Thread.sleep(3000);
          ????????????????????System.out.println("【"?+?childName?+?"】子任務(wù)隊(duì)列處理:"?+?asyncHandlerData.getDataInfo()+"?任務(wù)處理結(jié)束"?+?count);
          ????????????????}?catch?(Exception?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}
          ????????????}
          ????????}
          ????}
          }

          測試接口:

          ????@GetMapping(value?=?"/send-async-data")
          ????public?boolean?sendAsyncData(){
          ????????AsyncHandlerData?asyncHandlerData?=?new?AsyncHandlerData();
          ????????asyncHandlerData.setDataInfo("data?info");
          ????????boolean?status?=?asyncMultiConsumerHandlerHandler.putTask(asyncHandlerData);
          ????????if(!status){
          ????????????throw?new?RuntimeException("insert?fail");
          ????????}
          ????????return?status;
          ????}

          這種設(shè)計(jì)模型適合用于對于請求吞吐量要求較高,每個(gè)請求都比較耗時(shí)的場景中。

          自定義拒絕策略的應(yīng)用根據(jù)具體的應(yīng)用場景,通過實(shí)現(xiàn)java.util.concurrent.RejectedExecutionHandler接口,自定義拒絕策略,例如對于當(dāng)拋出拒絕異常的時(shí)候,往數(shù)據(jù)庫中記錄一些信息或者日志。

          相關(guān)案例代碼:

          public?class?MyRejectPolicy{

          ????static?class?MyTask?implements?Runnable{

          ????????@Override
          ????????public?void?run()?{
          ????????????System.out.println("this?is?test");
          ????????????try?{
          ????????????????Thread.sleep(100);
          ????????????}?catch?(InterruptedException?e)?{
          ????????????????e.printStackTrace();
          ????????????}
          ????????}
          ????}

          ????public?static?void?main(String[]?args)?{
          ????????ThreadPoolExecutor?threadPoolExecutor?=?new?ThreadPoolExecutor(5,?5,?0L,?TimeUnit.SECONDS
          ????????????????,?new?LinkedBlockingQueue<>(10),?Executors.defaultThreadFactory(),?new?RejectedExecutionHandler()?{
          ????????????@Override
          ????????????public?void?rejectedExecution(Runnable?r,?ThreadPoolExecutor?executor)?{
          ????????????????System.out.println("任務(wù)被拒絕:"?+?r.toString());
          ????????????????//記錄一些信息
          ????????????}
          ????????});
          ????????for(int?i=0;i<100;i++){
          ????????????Thread?thread?=?new?Thread(new?MyTask());
          ????????????threadPoolExecutor.execute(thread);
          ????????}
          ????????Thread.yield();
          ????}
          }

          統(tǒng)計(jì)線程池的詳細(xì)信息

          通過閱讀線程池的源代碼之后,可以借助重寫beforeExecute、afterExecute、terminated 方法去對線程池的每個(gè)線程耗時(shí)做統(tǒng)計(jì)。以及通過繼承 ThreadPoolExecutor 對象之后,對當(dāng)前線程池的coreSIze、maxiMumSize等等屬性進(jìn)行監(jiān)控。

          相關(guān)案例代碼:

          public?class?SysThreadPool?extends?ThreadPoolExecutor?{

          ????private?final?ThreadLocal?startTime?=?new?ThreadLocal<>();

          ????private?Logger?logger?=?LoggerFactory.getLogger(SysThreadPool.class);

          ????public?SysThreadPool(int?corePoolSize,?int?maximumPoolSize,?long?keepAliveTime,?TimeUnit?unit,?BlockingQueue?workQueue)?{
          ????????super(corePoolSize,?maximumPoolSize,?keepAliveTime,?unit,?workQueue);
          ????}

          ????public?SysThreadPool(int?corePoolSize,?int?maximumPoolSize,?long?keepAliveTime,?TimeUnit?unit,?BlockingQueue?workQueue,?ThreadFactory?threadFactory)?{
          ????????super(corePoolSize,?maximumPoolSize,?keepAliveTime,?unit,?workQueue,?threadFactory);
          ????}

          ????public?SysThreadPool(int?corePoolSize,?int?maximumPoolSize,?long?keepAliveTime,?TimeUnit?unit,?BlockingQueue?workQueue,?RejectedExecutionHandler?handler)?{
          ????????super(corePoolSize,?maximumPoolSize,?keepAliveTime,?unit,?workQueue,?handler);
          ????}

          ????public?SysThreadPool(int?corePoolSize,?int?maximumPoolSize,?long?keepAliveTime,?TimeUnit?unit,?BlockingQueue?workQueue,?ThreadFactory?threadFactory,?RejectedExecutionHandler?handler)?{
          ????????super(corePoolSize,?maximumPoolSize,?keepAliveTime,?unit,?workQueue,?threadFactory,?handler);
          ????}

          ????@Override
          ????protected?void?beforeExecute(Thread?t,?Runnable?r)?{
          ????????super.beforeExecute(t,?r);
          ????????startTime.set(System.currentTimeMillis());
          ????}

          ????@Override
          ????protected?void?afterExecute(Runnable?r,?Throwable?t)?{
          ????????super.afterExecute(r,?t);
          ????????long?endTime?=?System.currentTimeMillis();
          ????????long?executeTime?=?endTime?-?startTime.get();
          ????????logger.info("Thread?{}:?ExecuteTime?{}",?r,?executeTime);
          ????}

          ????@Override
          ????public?void?shutdown()?{
          ????????super.shutdown();
          ????}

          ????@Override
          ????public?void?execute(Runnable?command)?{
          ????????super.execute(command);
          ????}

          ????public?void?getTaskInfo(){
          ????????logger.info("coreSize:?{},?maxSize:?{},?activeCount:{},blockQueueSize:{}",super.getCorePoolSize(),super.getMaximumPoolSize(),super.getActiveCount(),super.getQueue().size());
          ????}

          ????static?class?MyTestTask?implements?Runnable{

          ????????@Override
          ????????public?void?run()?{
          ????????????try?{
          ????????????????Thread.sleep(10);
          ????????????}?catch?(InterruptedException?e)?{
          ????????????????e.printStackTrace();
          ????????????}
          ????????}
          ????}

          ????public?static?void?main(String[]?args)?throws?InterruptedException?{
          ????????SysThreadPool?sysThreadPool?=?new?SysThreadPool(2,5,5000,TimeUnit.MILLISECONDS,new?ArrayBlockingQueue(2));
          ????????sysThreadPool.getTaskInfo();
          ????????System.out.println("------------");
          ????????for(int?i=0;i<10;i++){
          ????????????Thread?thread?=?new?Thread(new?MyTestTask());
          ????????????sysThreadPool.submit(thread);
          ????????????sysThreadPool.getTaskInfo();
          ????????}
          ????????System.out.println("------------");
          ????????Thread.sleep(3000);
          ????}

          }

          通過日志打印記錄線程池的參數(shù)變化:



          通過這份案例代碼不妨可以設(shè)想下通過一些定時(shí)上報(bào)邏輯來實(shí)現(xiàn)線程池的監(jiān)控功能。

          本文來源于網(wǎng)絡(luò),如有侵權(quán),聯(lián)系浪尖刪除:langjianliaodashuju


          瀏覽 40
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  欧美国产大屌操疼 | 欧洲一级毛片 | 亚洲午夜男女爽爽影院 | 操高中生到高潮在线观看免费 | 东京热成人免费电影 |