任務(wù)數(shù)量超過(guò)線程池負(fù)荷了怎么辦?拒絕策略安排起來(lái)!

通過(guò)之前三篇關(guān)于Spring Boot異步任務(wù)實(shí)現(xiàn)的博文,我們分別學(xué)會(huì)了:
今天我們繼續(xù)對(duì)異步任務(wù)的實(shí)現(xiàn)進(jìn)行完善和優(yōu)化!
如果你已經(jīng)看過(guò)上面幾篇內(nèi)容并已經(jīng)掌握之后,一起來(lái)思考下面這個(gè)問(wèn)題:
假設(shè),線程池配置為核心線程數(shù)2、最大線程數(shù)2、緩沖隊(duì)列長(zhǎng)度2。此時(shí),有5個(gè)異步任務(wù)同時(shí)開(kāi)始,會(huì)發(fā)生什么?
場(chǎng)景重現(xiàn)
我們先來(lái)把上面的假設(shè)用代碼實(shí)現(xiàn)一下:
第一步:創(chuàng)建Spring Boot應(yīng)用,根據(jù)上面的假設(shè)寫(xiě)好線程池配置。
@EnableAsync
@SpringBootApplication
public class Chapter78Application {
public static void main(String[] args) {
SpringApplication.run(Chapter78Application.class, args);
}
@EnableAsync
@Configuration
class TaskPoolConfig {
@Bean
public Executor taskExecutor1() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(2);
executor.setQueueCapacity(2);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("executor-1-");
return executor;
}
}
}
@Async注解實(shí)現(xiàn)一個(gè)部分任務(wù)
@Slf4j
@Component
public class AsyncTasks {
public static Random random = new Random();
@Async("taskExecutor1")
public CompletableFuture<String> doTaskOne(String taskNo) throws Exception {
log.info("開(kāi)始任務(wù):{}", taskNo);
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
log.info("完成任務(wù):{},耗時(shí):{} 毫秒", taskNo, end - start);
return CompletableFuture.completedFuture("任務(wù)完成");
}
}
第三步:編寫(xiě)測(cè)試用例
@Slf4j
@SpringBootTest
public class Chapter78ApplicationTests {
@Autowired
private AsyncTasks asyncTasks;
@Test
public void test2() throws Exception {
// 線程池配置:core-2,max-2,queue=2,同時(shí)有5個(gè)任務(wù),出現(xiàn)下面異常:
// org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@59901c4d[Running, pool size = 2,
// active threads = 0, queued tasks = 2, completed tasks = 4]] did not accept task: java.util.concurrent.CompletableFuture$AsyncSupply@408e96d9
long start = System.currentTimeMillis();
// 線程池1
CompletableFuture<String> task1 = asyncTasks.doTaskOne("1");
CompletableFuture<String> task2 = asyncTasks.doTaskOne("2");
CompletableFuture<String> task3 = asyncTasks.doTaskOne("3");
CompletableFuture<String> task4 = asyncTasks.doTaskOne("4");
CompletableFuture<String> task5 = asyncTasks.doTaskOne("5");
// 一起執(zhí)行
CompletableFuture.allOf(task1, task2, task3, task4, task5).join();
long end = System.currentTimeMillis();
log.info("任務(wù)全部完成,總耗時(shí):" + (end - start) + "毫秒");
}
}
執(zhí)行一下,可以類似下面這樣的日志信息:
2021-09-22 17:33:08.159 INFO 21119 --- [ executor-1-2] com.didispace.chapter78.AsyncTasks : 開(kāi)始任務(wù):2
2021-09-22 17:33:08.159 INFO 21119 --- [ executor-1-1] com.didispace.chapter78.AsyncTasks : 開(kāi)始任務(wù):1
org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]] did not accept task: java.util.concurrent.CompletableFuture$AsyncSupply@64968732
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:324)
at java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
at java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)
at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.doSubmit(AsyncExecutionAspectSupport.java:274)
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.invoke(AsyncExecutionInterceptor.java:129)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
at com.didispace.chapter78.AsyncTasks$$EnhancerBySpringCGLIB$$c7e8d57b.doTaskOne(<generated>)
at com.didispace.chapter78.Chapter78ApplicationTests.test2(Chapter78ApplicationTests.java:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
...
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$AsyncSupply@64968732 rejected from java.util.concurrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:321)
... 74 more
org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]] did not accept task:中,可以明確知道:第5個(gè)任務(wù)因?yàn)槌^(guò)了執(zhí)行線程+緩沖隊(duì)列長(zhǎng)度,而被拒絕了。
所有,默認(rèn)情況下,線程池的拒絕策略是:當(dāng)線程池隊(duì)列滿了,會(huì)丟棄這個(gè)任務(wù),并拋出異常。
配置拒絕策略
雖然線程池有默認(rèn)的拒絕策略,但實(shí)際開(kāi)發(fā)過(guò)程中,有些業(yè)務(wù)場(chǎng)景,直接拒絕的策略往往并不適用,有時(shí)候我們可能會(huì)選擇舍棄最早開(kāi)始執(zhí)行而未完成的任務(wù)、也可能會(huì)選擇舍棄剛開(kāi)始執(zhí)行而未完成的任務(wù)等更貼近業(yè)務(wù)需要的策略。所以,為線程池配置其他拒絕策略或自定義拒絕策略是很常見(jiàn)的需求,那么這個(gè)要怎么實(shí)現(xiàn)呢?
下面就來(lái)具體說(shuō)說(shuō)今天的正題,如何為線程池配置拒絕策略、如何自定義拒絕策略。
看下面這段代碼的最后一行,setRejectedExecutionHandler方法就是為線程池設(shè)置拒絕策略的方法:
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//...其他線程池配置
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
在ThreadPoolExecutor中提供了4種線程的策略可以供開(kāi)發(fā)者直接使用,你只需要像下面這樣設(shè)置即可:
// AbortPolicy策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
// DiscardPolicy策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// DiscardOldestPolicy策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
// CallerRunsPolicy策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
這四個(gè)策略對(duì)應(yīng)的含義分別是:
AbortPolicy策略:默認(rèn)策略,如果線程池隊(duì)列滿了丟掉這個(gè)任務(wù)并且拋出RejectedExecutionException異常。 DiscardPolicy策略:如果線程池隊(duì)列滿了,會(huì)直接丟掉這個(gè)任務(wù)并且不會(huì)有任何異常。 DiscardOldestPolicy策略:如果隊(duì)列滿了,會(huì)將最早進(jìn)入隊(duì)列的任務(wù)刪掉騰出空間,再嘗試加入隊(duì)列。 CallerRunsPolicy策略:如果添加到線程池失敗,那么主線程會(huì)自己去執(zhí)行該任務(wù),不會(huì)等待線程池中的線程去執(zhí)行。
而如果你要自定義一個(gè)拒絕策略,那么可以這樣寫(xiě):
executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 拒絕策略的邏輯
}
});
當(dāng)然如果你喜歡用Lamba表達(dá)式,也可以這樣寫(xiě):
executor.setRejectedExecutionHandler((r, executor1) -> {
// 拒絕策略的邏輯
});
好了,今天的學(xué)習(xí)就到這里!更多Spring Boot教程可以點(diǎn)擊文末閱讀原文直達(dá)教程目錄!
代碼示例
本文的完整工程可以查看下面?zhèn)}庫(kù)中2.x目錄下的chapter7-8工程:
Github:https://github.com/dyc87112/SpringBoot-Learning/ Gitee:https://gitee.com/didispace/SpringBoot-Learning/
如果您覺(jué)得本文不錯(cuò),歡迎Star支持,您的關(guān)注是我堅(jiān)持的動(dòng)力!
往期推薦
技術(shù)交流群
最近有很多人問(wèn),有沒(méi)有讀者交流群,想知道怎么加入。加入方式很簡(jiǎn)單,有興趣的同學(xué),只需要點(diǎn)擊下方卡片,回復(fù)“加群“,即可免費(fèi)加入我們的高質(zhì)量技術(shù)交流群!
點(diǎn)擊閱讀原文,直達(dá)教程目錄
