從實(shí)戰(zhàn)到原理,線程池的各類使用場景整合
在日常的開發(fā)工作中,我們經(jīng)常會需要使用到線程池這類型的組件。例如下邊幾種應(yīng)用場景:
線程池經(jīng)典應(yīng)用場景
異步發(fā)送郵件通知發(fā)送一個(gè)任務(wù),然后注入到線程池中異步發(fā)送。
心跳請求任務(wù)創(chuàng)建一個(gè)任務(wù),然后定時(shí)發(fā)送請求到線程池中。
類似的場景有很多,我們下邊一步一步地來介紹不同的應(yīng)用場景下,線程池的具體使用案例:
異步發(fā)送郵件場景
定義一個(gè)簡單的郵件發(fā)送接口:
public?interface?SendEmailService?{
????/**
?????*?發(fā)送郵件
?????*
?????*?@param?emailDTO?郵件對象
?????*/
????void?sendEmail(EmailDTO?emailDTO);
}
接著是郵件發(fā)送的簡單實(shí)現(xiàn)類:
@Service
public?class?SendEmailServiceImpl?implements?SendEmailService?{
????@Resource
????private?ExecutorService?emailTaskPool;
????@Override
????public?void?sendEmail(EmailDTO?emailDTO)?{
????????emailTaskPool.submit(()?->?{
????????????try?{
????????????????System.out.printf("sending?email?....?emailDto?is?%s?\n",?emailDTO);
????????????????Thread.sleep(1000);
????????????????System.out.println("sended?success");
????????????}?catch?(InterruptedException?e)?{
????????????????e.printStackTrace();
????????????}
????????});
????}
}
郵件的發(fā)送邏輯通過一個(gè)簡單的線程睡眠來模擬發(fā)送過程中的耗時(shí)操作。
然后是線程池方面的配置:
@Configuration
public?class?ThreadPoolConfig?{
????@Bean
????public?ExecutorService?emailTaskPool()?{
????????return?new?ThreadPoolExecutor(2,?4,
????????????????0L,?TimeUnit.MILLISECONDS,
????????????????new?LinkedBlockingQueue(),?new?SysThreadFactory("email-task"));
????}
}
controller模塊的觸發(fā)
@RestController
@RequestMapping(value?=?"/test")
public?class?TestController?{
????@Resource
????private?SendEmailService?sendEmailService;
????@GetMapping(value?=?"/send-email")
????public?boolean?sendEmail()?{
????????EmailDTO?emailDTO?=?new?EmailDTO();
????????emailDTO.setContent("測試文案");
????????emailDTO.setReceiver("idea");
????????emailDTO.setTitle("郵件標(biāo)題");
????????sendEmailService.sendEmail(emailDTO);
????????return?true;
????}
}
這是一個(gè)非常簡單的案例,通過一個(gè)http請求,然后觸發(fā)一個(gè)郵件的發(fā)送操作。
心跳請求場景
這類應(yīng)用場景一般會在一些基礎(chǔ)組件中使用到,例如一些具有心跳探活機(jī)制類型功能的中間件,如nacos。下邊來看看對應(yīng)的代碼實(shí)踐:首先是心跳模塊代碼:
public?class?HeartBeatInfo?{
????private?String?info;
????private?long?nextSendTimeDelay;
????public?String?getInfo()?{
????????return?info;
????}
????public?void?setInfo(String?info)?{
????????this.info?=?info;
????}
????public?long?getNextSendTimeDelay()?{
????????return?nextSendTimeDelay;
????}
????public?void?setNextSendTimeDelay(long?nextSendTimeDelay)?{
????????this.nextSendTimeDelay?=?nextSendTimeDelay;
????}
????@Override
????public?String?toString()?{
????????return?"HeartBeatInfo{"?+
????????????????"info='"?+?info?+?'\''?+
????????????????",?nextSendTimeDelay="?+?nextSendTimeDelay?+
????????????????'}';
????}
}
然后是模擬一個(gè)心跳包的發(fā)送服務(wù)接口定義:
public?interface?HeartBeatTaskService?{
????void?sendBeatInfo();
}
接下來是心跳任務(wù)的發(fā)送核心部分實(shí)現(xiàn):
@Service
public?class?HeartBeatTaskServiceImpl?implements?HeartBeatTaskService?{
????@Resource
????private?ScheduledThreadPoolExecutor?scheduledThreadPoolExecutor;
????@Override
????public?void?sendBeatInfo()?{
????????HeartBeatInfo?heartBeatInfo?=?new?HeartBeatInfo();
????????heartBeatInfo.setInfo("test-info");
????????heartBeatInfo.setNextSendTimeDelay(1000);
????????scheduledThreadPoolExecutor.schedule(new?HeartBeatTask(heartBeatInfo),
????????????????heartBeatInfo.getNextSendTimeDelay(),?TimeUnit.MILLISECONDS);
????}
????class?HeartBeatTask?implements?Runnable?{
????????private?HeartBeatInfo?heartBeatInfo;
????????public?HeartBeatTask(HeartBeatInfo?heartBeatInfo)?{
????????????this.heartBeatInfo?=?heartBeatInfo;
????????}
????????@Override
????????public?void?run()?{
????????????System.out.println("發(fā)送心跳數(shù)據(jù)包:"?+?heartBeatInfo.getInfo());
????????????HeartBeatInfo?heartBeatInfo?=?new?HeartBeatInfo();
????????????heartBeatInfo.setInfo("test-info");
????????????heartBeatInfo.setNextSendTimeDelay(1000);
????????????scheduledThreadPoolExecutor.schedule(new?HeartBeatTask(heartBeatInfo),
????????????????????heartBeatInfo.getNextSendTimeDelay(),?TimeUnit.MILLISECONDS);
????????}
????}
}
在核心實(shí)現(xiàn)的內(nèi)部有一個(gè)延時(shí)線程池ScheduledThreadPoolExecutor,ScheduledThreadPoolExecutor會在放入線程任務(wù)的一段指定的時(shí)間之后才觸發(fā)任務(wù)的執(zhí)行:
@Configuration
public?class?ThreadPoolConfig?{
?
????@Bean
????public?ScheduledThreadPoolExecutor??scheduledThreadPoolExecutor(){
????????return?new?ScheduledThreadPoolExecutor(2,?new?ThreadFactory()?{
????????????@Override
????????????public?Thread?newThread(Runnable?r)?{
????????????????Thread?thread?=?new?Thread(r);
????????????????thread.setDaemon(true);
????????????????thread.setName("org.idea.threadpool.beat.sender");
????????????????return?thread;
????????????}
????????});
????}
}
JDK內(nèi)部線程池的設(shè)計(jì)
看了上邊兩個(gè)簡單的案例之后,不知道你是否會有好奇:
到底線程池的內(nèi)部運(yùn)行機(jī)制會是怎樣的呢?
簡單手寫一個(gè)單消費(fèi)者任務(wù)處理模型
這里我們可以通過一段簡單的代碼來學(xué)習(xí)這部分的內(nèi)容:首先,我們將需要處理的任務(wù)封裝在一個(gè)對象內(nèi)部,暫時(shí)定義如下所示:
public?class?AsyncHandlerData?{
????private?String?dataInfo;
????public?String?getDataInfo()?{
????????return?dataInfo;
????}
????public?void?setDataInfo(String?dataInfo)?{
????????this.dataInfo?=?dataInfo;
????}
????@Override
????public?String?toString()?{
????????return?"AsyncHandlerData{"?+
????????????????"dataInfo='"?+?dataInfo?+?'\''?+
????????????????'}';
????}
}
然后會有一個(gè)專門消費(fèi)這些個(gè)任務(wù)的service:
public?interface?AsyncHandlerService?{
????/**
?????*?任務(wù)放入隊(duì)列中
?????*?
?????*?@param?asyncHandlerData
?????*/
????void?putTask(AsyncHandlerData?asyncHandlerData);
}
最后根據(jù)提前定義好的接口編寫一個(gè)實(shí)現(xiàn)類,此時(shí)將相關(guān)的任務(wù)處理邏輯規(guī)整到了一個(gè)對象當(dāng)中:
@Service
public?class?AsyncHandlerServiceImpl?implements?AsyncHandlerService,?CommandLineRunner?{
????private?volatile?TaskQueueHandler?taskQueueHandler?=?new?TaskQueueHandler();
????@Override
????public?void?putTask(AsyncHandlerData?asyncHandlerData)?{
????????taskQueueHandler.addTask(asyncHandlerData);
????}
????@Override
????public?void?run(String...?args)?throws?Exception?{
????????Thread?thread?=?new?Thread(taskQueueHandler);
????????thread.setDaemon(true);
????????thread.start();
????}
????public?class?TaskQueueHandler?implements?Runnable?{
????????private?BlockingQueue?tasks?=?new?ArrayBlockingQueue<>(1024?*?1024);
????????public?void?addTask(AsyncHandlerData?asyncHandlerData)?{
????????????tasks.offer(asyncHandlerData);
????????}
????????@Override
????????public?void?run()?{
????????????for?(;?;?)?{
????????????????try?{
????????????????????AsyncHandlerData?asyncHandlerData?=?tasks.take();
????????????????????System.out.println("異步處理任務(wù)數(shù)據(jù):"?+?asyncHandlerData.getDataInfo());
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}
????????}
????}
}
整個(gè)代碼的思路邏輯比較簡單,大致可以歸整成下圖所示:

整體的設(shè)計(jì)模式就是一端放入,由單個(gè)消費(fèi)者取出。但是存在一個(gè)不足點(diǎn),一旦消費(fèi)者能力較弱,或者出現(xiàn)任務(wù)堵塞的話,就會導(dǎo)致任務(wù)隊(duì)列出現(xiàn)堆積,然后越堆積越難處理地過來。
但是這樣的設(shè)計(jì)還是一個(gè)過于簡單的模型,下邊我們來看看jdk內(nèi)部線程池的設(shè)計(jì)模式:
線程池內(nèi)部的源代碼分析
我們在項(xiàng)目里使用線程池的時(shí)候,通常都會先創(chuàng)建一個(gè)具體實(shí)現(xiàn)Bean來定義線程池,例如:
@Bean
public?ExecutorService?emailTaskPool()?{
????return?new?ThreadPoolExecutor(2,?4,
????????????0L,?TimeUnit.MILLISECONDS,
????????????new?LinkedBlockingQueue(),?new?SysThreadFactory("email-task"));
}
ThreadPoolExecutor的父類是AbstractExecutorService,然后AbstractExecutorService的頂層接口是:ExecutorService。
就例如發(fā)送郵件接口而言,當(dāng)線程池觸發(fā)了submit函數(shù)的時(shí)候,實(shí)際上會調(diào)用到父類AbstractExecutorService對象的java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable)方法,然后進(jìn)入到ThreadPoolExecutor#execute部分。
@Override
public?void?sendEmail(EmailDTO?emailDTO)?{
????emailTaskPool.submit(()?->?{
????????try?{
????????????System.out.printf("sending?email?....?emailDto?is?%s?\n",?emailDTO);
????????????Thread.sleep(1000);
????????????System.out.println("sended?success");
????????}?catch?(InterruptedException?e)?{
????????????e.printStackTrace();
????????}
????});
}
java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable)?源代碼位置:
/**
?*?@throws?RejectedExecutionException?{@inheritDoc}
?*?@throws?NullPointerException???????{@inheritDoc}
?*/
public?Future>?submit(Runnable?task)?{
????if?(task?==?null)?throw?new?NullPointerException();
????RunnableFuture?ftask?=?newTaskFor(task,?null);
????execute(ftask);
????return?ftask;
}
這里面你會看到返回的是一個(gè)future對象供調(diào)用方判斷線程池內(nèi)部的函數(shù)到底是否有完全執(zhí)行成功。因此如果有時(shí)候如果需要判斷線程池執(zhí)行任務(wù)的結(jié)果話,可以這樣操作:
Future?future?=?emailTaskPool.submit(()?->?{
??????????try?{
??????????????System.out.printf("sending?email?....?emailDto?is?%s?\n",?emailDTO);
??????????????Thread.sleep(1000);
??????????????System.out.println("sended?success");
??????????}?catch?(InterruptedException?e)?{
??????????????e.printStackTrace();
??????????}
??????});
??????//todo?something
??????future.get();
}
在jdk8源代碼中,提交任務(wù)的執(zhí)行邏輯部分如下所示:新增線程任務(wù)的時(shí)候代碼:
??public?void?execute(Runnable?command)?{
????????if?(command?==?null)
????????????throw?new?NullPointerException();
????????/*
?????????*?Proceed?in?3?steps:
?????????*
?????????*?1.?If?fewer?than?corePoolSize?threads?are?running,?try?to
?????????*?start?a?new?thread?with?the?given?command?as?its?first
?????????*?task.??The?call?to?addWorker?atomically?checks?runState?and
?????????*?workerCount,?and?so?prevents?false?alarms?that?would?add
?????????*?threads?when?it?shouldn't,?by?returning?false.
?????????*
?????????*?2.?If?a?task?can?be?successfully?queued,?then?we?still?need
?????????*?to?double-check?whether?we?should?have?added?a?thread
?????????*?(because?existing?ones?died?since?last?checking)?or?that
?????????*?the?pool?shut?down?since?entry?into?this?method.?So?we
?????????*?recheck?state?and?if?necessary?roll?back?the?enqueuing?if
?????????*?stopped,?or?start?a?new?thread?if?there?are?none.
?????????*
?????????*?3.?If?we?cannot?queue?task,?then?we?try?to?add?a?new
?????????*?thread.??If?it?fails,?we?know?we?are?shut?down?or?saturated
?????????*?and?so?reject?the?task.
?????????*/
????????int?c?=?ctl.get();
????????//工作線程數(shù)小于核心線程的時(shí)候,可以填寫worker線程
????????if?(workerCountOf(c)???????????????//新增工作線程的時(shí)候會加鎖
????????????if?(addWorker(command,?true))
????????????????return;
????????????c?=?ctl.get();
????????}
????????//如果線程池的狀態(tài)正常,切任務(wù)放入就緒隊(duì)列正常
????????if?(isRunning(c)?&&?workQueue.offer(command))?{
????????????int?recheck?=?ctl.get();
????????????if?(!?isRunning(recheck)?&&?remove(command))
????????????????//如果當(dāng)前線程池處于關(guān)閉狀態(tài),則拋出拒絕異常
????????????????reject(command);
????????????//如果工作線程數(shù)超過了核心線程數(shù),那么就需要考慮新增工作線程
????????????else?if?(workerCountOf(recheck)?==?0)
????????????????addWorker(null,?false);
????????}
????????//如果新增的工作線程已經(jīng)達(dá)到了最大線程數(shù)限制的條件下,需要觸發(fā)拒絕策略的拋出
????????else?if?(!addWorker(command,?false))
????????????reject(command);
????}
通過深入閱讀工作線程主要存放在了一個(gè)hashset集合當(dāng)中, 添加工作線程部分的邏輯代碼如下所示:
private?boolean?addWorker(Runnable?firstTask,?boolean?core)?{
????retry:
????for?(;;)?{
????????int?c?=?ctl.get();
????????int?rs?=?runStateOf(c);
????????//確保當(dāng)前線程池沒有進(jìn)入到一個(gè)銷毀狀態(tài)中
????????//?Check?if?queue?empty?only?if?necessary.
????????if?(rs?>=?SHUTDOWN?&&
????????????!?(rs?==?SHUTDOWN?&&
???????????????firstTask?==?null?&&
???????????????!?workQueue.isEmpty()))
????????????return?false;
????????for?(;;)?{
????????????int?wc?=?workerCountOf(c);
????????????if?(wc?>=?CAPACITY?||
??????????????//?如果傳入的core屬性是false,則這里需要比對maximumPoolSize參數(shù)
????????????????wc?>=?(core???corePoolSize?:?maximumPoolSize))
????????????????return?false;
????????????????//通過cas操作去增加線程池的工作線程數(shù)畝
????????????if?(compareAndIncrementWorkerCount(c))
????????????????break?retry;
????????????c?=?ctl.get();??//?Re-read?ctl
????????????if?(runStateOf(c)?!=?rs)
????????????????continue?retry;
????????????//?else?CAS?failed?due?to?workerCount?change;?retry?inner?loop
????????}
????}
????boolean?workerStarted?=?false;
????boolean?workerAdded?=?false;
????Worker?w?=?null;
????try?{
???????//真正需要指定的任務(wù)是firstTask,它會被注入到worker對象當(dāng)中
????????w?=?new?Worker(firstTask);
????????final?Thread?t?=?w.thread;
????????if?(t?!=?null)?{
????????//加入了鎖
????????????final?ReentrantLock?mainLock?=?this.mainLock;
????????????mainLock.lock();
????????????try?{
????????????????//?Recheck?while?holding?lock.
????????????????//?Back?out?on?ThreadFactory?failure?or?if
????????????????//?shut?down?before?lock?acquired.
????????????????int?rs?=?runStateOf(ctl.get());
????????????????if?(rs?????????????????????(rs?==?SHUTDOWN?&&?firstTask?==?null))?{
????????????????????if?(t.isAlive())?//?precheck?that?t?is?startable
????????????????????????throw?new?IllegalThreadStateException();
????????????????????//workers是一個(gè)hashset集合,會往里面新增工作線程????
????????????????????workers.add(w);
????????????????????int?s?=?workers.size();
????????????????????if?(s?>?largestPoolSize)
????????????????????????largestPoolSize?=?s;
????????????????????workerAdded?=?true;
????????????????}
????????????}?finally?{
????????????????mainLock.unlock();
????????????}
????????????if?(workerAdded)?{
????????????????//worker本身是一個(gè)線程,但是worker對象內(nèi)部還有一個(gè)線程的參數(shù),
????????????????//這個(gè)t才是真正的任務(wù)內(nèi)容
????????????????t.start();
????????????????workerStarted?=?true;
????????????}
????????}
????}?finally?{
????????//如果worker線程創(chuàng)建好了,但是內(nèi)部的真正任務(wù)還沒有啟動,此時(shí)突然整個(gè)
????????//線程池的狀態(tài)被關(guān)閉了,那么這時(shí)候workerStarted就會為false,然后將
????????//工作線程的數(shù)目做自減調(diào)整。
????????if?(!?workerStarted)
????????????addWorkerFailed(w);
????}
????return?workerStarted;
}
進(jìn)過理解之后,整體執(zhí)行的邏輯以及先后順序如下圖所示:
首先判斷線程池內(nèi)部的現(xiàn)場是否都有任務(wù)需要執(zhí)行。如果不是,則使用一個(gè)空閑的工作線程用于任務(wù)執(zhí)行。否則會判斷當(dāng)前的堵塞隊(duì)列是否已經(jīng)滿了,如果沒有滿則往隊(duì)列里面投遞任務(wù),等待工作線程去處理。如果堵塞隊(duì)列已經(jīng)滿了,此時(shí)會判斷工作線程數(shù)是否大于最大線程數(shù),如果沒有,則繼續(xù)創(chuàng)建工作線程,如果已經(jīng)達(dá)到則根據(jù)飽和策略去判斷是果斷拋出異常還是其他方式來進(jìn)行處理。線程池常用參數(shù)介紹
corePoolSize核心線程數(shù),當(dāng)往線程池內(nèi)部提交任務(wù)的時(shí)候,線程池會創(chuàng)建一個(gè)線程來執(zhí)行任務(wù)。即使此時(shí)有空閑的工作線程能夠處理當(dāng)前任務(wù),只要總的工作線程數(shù)小于corePoolSize,也會創(chuàng)建新的工作線程。
maximumPoolSize當(dāng)任務(wù)的堵塞隊(duì)列滿了之后,如果還有新的任務(wù)提交到線程池內(nèi)部,此時(shí)倘若工作線程數(shù)小于maximumPoolSize,則會創(chuàng)建新的工作線程。
keepAliveTime上邊我們說到了工作線程Worker(java.util.concurrent.ThreadPoolExecutor.Worker),當(dāng)工作線程處于空閑狀態(tài)中,如果超過了keepAliveTime依然沒有任務(wù),那么就會銷毀當(dāng)前工作線程。如果工作線程需要一直處于執(zhí)行任務(wù),每個(gè)任務(wù)的連續(xù)間隔都比較短,那么這個(gè)keepAliveTime 屬性可以適當(dāng)?shù)卣{(diào)整大一些。
unitkeepAliveTime對應(yīng)的時(shí)間單位
workQueue工作隊(duì)列,當(dāng)工作線程數(shù)達(dá)到了核心線程數(shù),那么此時(shí)新來的線程就會被放入到工作隊(duì)列中。線程池內(nèi)部的工作隊(duì)列全部都是繼承自阻塞隊(duì)列的接口,對于常用的阻塞隊(duì)列類型為:
ArrayBlockingQueue LinkedBlockingQueue SynchronousQueue PriorityBlockingQueue
RejectedExecutionHandlerJDK內(nèi)部的線程拒絕策略包含了多種許多種,這里我羅列一些常見的拒絕策略給大家認(rèn)識下:
AbortPolicy 直接拋出異常 CallerRunsPolicy 任務(wù)的執(zhí)行由注入的線程自己執(zhí)行 DiscardOldestPolicy 直接拋棄掉堵塞隊(duì)列中隊(duì)列頭部的任務(wù),然后執(zhí)行嘗試將當(dāng)前任務(wù)提交到堵塞隊(duì)列中。 DiscardPolicy 直接拋棄這個(gè)任務(wù)
從線程池設(shè)計(jì)中的一些啟發(fā)
多消費(fèi)隊(duì)列的設(shè)計(jì)場景應(yīng)用:業(yè)務(wù)上游提交任務(wù),然后任務(wù)被放進(jìn)一個(gè)堵塞隊(duì)列中,接下來消費(fèi)者需要從堵塞隊(duì)列中提取元素,并且將它們轉(zhuǎn)發(fā)到多個(gè)子隊(duì)列中,各個(gè)子隊(duì)列分別交給不同的子消費(fèi)者處理數(shù)據(jù)。例如下圖所示:

public?interface?AsyncHandlerService?{
????/**
?????*?任務(wù)放入隊(duì)列中
?????*?
?????*?@param?asyncHandlerData
?????*/
????boolean?putTask(AsyncHandlerData?asyncHandlerData);
????/**
?????*?啟動消費(fèi)
?????*/
????void?startJob();
}
多消費(fèi)者分發(fā)處理實(shí)現(xiàn)類:
@Component("asyncMultiConsumerHandlerHandler")
public?class?AsyncMultiConsumerHandlerHandler?implements?AsyncHandlerService{
????private?volatile?TaskQueueHandler?taskQueueHandler?=?new?TaskQueueHandler(10);
????@Override
????public?boolean?putTask(AsyncHandlerData?asyncHandlerData)?{
????????return?taskQueueHandler.addTask(asyncHandlerData);
????}
????@Override
????public?void?startJob(){
????????Thread?thread?=?new?Thread(taskQueueHandler);
????????thread.setDaemon(true);
????????thread.start();
????}
????/**
?????*?將任務(wù)分發(fā)給各個(gè)子隊(duì)列去處理
?????*/
????static?class?TaskQueueHandler?implements?Runnable?{
????????private?static?BlockingQueue?tasks?=?new?ArrayBlockingQueue<>(11);
????????public?static?BlockingQueue?getAllTaskInfo()? {
????????????return?tasks;
????????}
????????private?TaskDispatcherHandler[]?taskDispatcherHandlers;
????????private?int?childConsumerSize?=?0;
????????public?TaskQueueHandler(int?childConsumerSize)?{
????????????this.childConsumerSize?=?childConsumerSize;
????????????taskDispatcherHandlers?=?new?TaskDispatcherHandler[childConsumerSize];
????????????for?(int?i?=?0;?i?????????????????taskDispatcherHandlers[i]?=?new?TaskDispatcherHandler(new?ArrayBlockingQueue<>(100),?"child-worker-"?+?i);
????????????????Thread?thread?=?new?Thread(taskDispatcherHandlers[i]);
????????????????thread.setDaemon(false);
????????????????thread.setName("taskQueueHandler-child-"+i);
????????????????thread.start();
????????????}
????????}
????????public?boolean?addTask(AsyncHandlerData?asyncHandlerData)?{
????????????return?tasks.offer(asyncHandlerData);
????????}
????????@Override
????????public?void?run()?{
????????????int?index?=?0;
????????????for?(;?;?)?{
????????????????try?{
????????????????????AsyncHandlerData?asyncHandlerData?=?tasks.take();
????????????????????index?=?(index?==?taskDispatcherHandlers.length)???0?:?index;
????????????????????taskDispatcherHandlers[index].addAsyncHandlerData(asyncHandlerData);
????????????????????index++;
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}
????????}
????}
????static?class?TaskDispatcherHandler?implements?Runnable?{
????????private?BlockingQueue?subTaskQueue;
????????private?String?childName;
????????private?AtomicLong?taskCount?=?new?AtomicLong(0);
????????public?TaskDispatcherHandler(BlockingQueue?blockingQueue,?String?childName) ?{
????????????this.subTaskQueue?=?blockingQueue;
????????????this.childName?=?childName;
????????}
????????public?void?addAsyncHandlerData(AsyncHandlerData?asyncHandlerData)?{
????????????subTaskQueue.add(asyncHandlerData);
????????}
????????@Override
????????public?void?run()?{
????????????for?(;?;?)?{
????????????????try?{
????????????????????AsyncHandlerData?asyncHandlerData?=?subTaskQueue.take();
????????????????????long?count?=?taskCount.incrementAndGet();
????????????????????System.out.println("【"?+?childName?+?"】子任務(wù)隊(duì)列處理:"?+?asyncHandlerData.getDataInfo()?+?count);
????????????????????Thread.sleep(3000);
????????????????????System.out.println("【"?+?childName?+?"】子任務(wù)隊(duì)列處理:"?+?asyncHandlerData.getDataInfo()+"?任務(wù)處理結(jié)束"?+?count);
????????????????}?catch?(Exception?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}
????????}
????}
}
測試接口:
????@GetMapping(value?=?"/send-async-data")
????public?boolean?sendAsyncData(){
????????AsyncHandlerData?asyncHandlerData?=?new?AsyncHandlerData();
????????asyncHandlerData.setDataInfo("data?info");
????????boolean?status?=?asyncMultiConsumerHandlerHandler.putTask(asyncHandlerData);
????????if(!status){
????????????throw?new?RuntimeException("insert?fail");
????????}
????????return?status;
????}
這種設(shè)計(jì)模型適合用于對于請求吞吐量要求較高,每個(gè)請求都比較耗時(shí)的場景中。
自定義拒絕策略的應(yīng)用根據(jù)具體的應(yīng)用場景,通過實(shí)現(xiàn)java.util.concurrent.RejectedExecutionHandler接口,自定義拒絕策略,例如對于當(dāng)拋出拒絕異常的時(shí)候,往數(shù)據(jù)庫中記錄一些信息或者日志。
相關(guān)案例代碼:
public?class?MyRejectPolicy{
????static?class?MyTask?implements?Runnable{
????????@Override
????????public?void?run()?{
????????????System.out.println("this?is?test");
????????????try?{
????????????????Thread.sleep(100);
????????????}?catch?(InterruptedException?e)?{
????????????????e.printStackTrace();
????????????}
????????}
????}
????public?static?void?main(String[]?args)?{
????????ThreadPoolExecutor?threadPoolExecutor?=?new?ThreadPoolExecutor(5,?5,?0L,?TimeUnit.SECONDS
????????????????,?new?LinkedBlockingQueue<>(10),?Executors.defaultThreadFactory(),?new?RejectedExecutionHandler()?{
????????????@Override
????????????public?void?rejectedExecution(Runnable?r,?ThreadPoolExecutor?executor)?{
????????????????System.out.println("任務(wù)被拒絕:"?+?r.toString());
????????????????//記錄一些信息
????????????}
????????});
????????for(int?i=0;i<100;i++){
????????????Thread?thread?=?new?Thread(new?MyTask());
????????????threadPoolExecutor.execute(thread);
????????}
????????Thread.yield();
????}
}
統(tǒng)計(jì)線程池的詳細(xì)信息
通過閱讀線程池的源代碼之后,可以借助重寫beforeExecute、afterExecute、terminated 方法去對線程池的每個(gè)線程耗時(shí)做統(tǒng)計(jì)。以及通過繼承 ThreadPoolExecutor 對象之后,對當(dāng)前線程池的coreSIze、maxiMumSize等等屬性進(jìn)行監(jiān)控。
相關(guān)案例代碼:
public?class?SysThreadPool?extends?ThreadPoolExecutor?{
????private?final?ThreadLocal?startTime?=?new?ThreadLocal<>();
????private?Logger?logger?=?LoggerFactory.getLogger(SysThreadPool.class);
????public?SysThreadPool(int?corePoolSize,?int?maximumPoolSize,?long?keepAliveTime,?TimeUnit?unit,?BlockingQueue?workQueue) ?{
????????super(corePoolSize,?maximumPoolSize,?keepAliveTime,?unit,?workQueue);
????}
????public?SysThreadPool(int?corePoolSize,?int?maximumPoolSize,?long?keepAliveTime,?TimeUnit?unit,?BlockingQueue?workQueue,?ThreadFactory?threadFactory) ?{
????????super(corePoolSize,?maximumPoolSize,?keepAliveTime,?unit,?workQueue,?threadFactory);
????}
????public?SysThreadPool(int?corePoolSize,?int?maximumPoolSize,?long?keepAliveTime,?TimeUnit?unit,?BlockingQueue?workQueue,?RejectedExecutionHandler?handler) ?{
????????super(corePoolSize,?maximumPoolSize,?keepAliveTime,?unit,?workQueue,?handler);
????}
????public?SysThreadPool(int?corePoolSize,?int?maximumPoolSize,?long?keepAliveTime,?TimeUnit?unit,?BlockingQueue?workQueue,?ThreadFactory?threadFactory,?RejectedExecutionHandler?handler) ?{
????????super(corePoolSize,?maximumPoolSize,?keepAliveTime,?unit,?workQueue,?threadFactory,?handler);
????}
????@Override
????protected?void?beforeExecute(Thread?t,?Runnable?r)?{
????????super.beforeExecute(t,?r);
????????startTime.set(System.currentTimeMillis());
????}
????@Override
????protected?void?afterExecute(Runnable?r,?Throwable?t)?{
????????super.afterExecute(r,?t);
????????long?endTime?=?System.currentTimeMillis();
????????long?executeTime?=?endTime?-?startTime.get();
????????logger.info("Thread?{}:?ExecuteTime?{}",?r,?executeTime);
????}
????@Override
????public?void?shutdown()?{
????????super.shutdown();
????}
????@Override
????public?void?execute(Runnable?command)?{
????????super.execute(command);
????}
????public?void?getTaskInfo(){
????????logger.info("coreSize:?{},?maxSize:?{},?activeCount:{},blockQueueSize:{}",super.getCorePoolSize(),super.getMaximumPoolSize(),super.getActiveCount(),super.getQueue().size());
????}
????static?class?MyTestTask?implements?Runnable{
????????@Override
????????public?void?run()?{
????????????try?{
????????????????Thread.sleep(10);
????????????}?catch?(InterruptedException?e)?{
????????????????e.printStackTrace();
????????????}
????????}
????}
????public?static?void?main(String[]?args)?throws?InterruptedException?{
????????SysThreadPool?sysThreadPool?=?new?SysThreadPool(2,5,5000,TimeUnit.MILLISECONDS,new?ArrayBlockingQueue(2));
????????sysThreadPool.getTaskInfo();
????????System.out.println("------------");
????????for(int?i=0;i<10;i++){
????????????Thread?thread?=?new?Thread(new?MyTestTask());
????????????sysThreadPool.submit(thread);
????????????sysThreadPool.getTaskInfo();
????????}
????????System.out.println("------------");
????????Thread.sleep(3000);
????}
}
通過日志打印記錄線程池的參數(shù)變化:

通過這份案例代碼不妨可以設(shè)想下通過一些定時(shí)上報(bào)邏輯來實(shí)現(xiàn)線程池的監(jiān)控功能。
