SpringBoot 如何實(shí)現(xiàn)異步編程,老鳥們都這么玩的!
今天來(lái)聊聊在SpringBoot項(xiàng)目中如何實(shí)現(xiàn)異步編程。
首先我們來(lái)看看在Spring中為什么要使用異步編程,它能解決什么問(wèn)題?
為什么要用異步框架,它解決什么問(wèn)題?
在SpringBoot的日常開發(fā)中,一般都是同步調(diào)用的。但實(shí)際中有很多場(chǎng)景非常適合使用異步來(lái)處理,如:注冊(cè)新用戶,送100個(gè)積分;或下單成功,發(fā)送push消息等等。
就拿注冊(cè)新用戶這個(gè)用例來(lái)說(shuō),為什么要異步處理?
第一個(gè)原因:容錯(cuò)性、健壯性,如果送積分出現(xiàn)異常,不能因?yàn)樗头e分而導(dǎo)致用戶注冊(cè)失敗;因?yàn)橛脩糇?cè)是主要功能,送積分是次要功能,即使送積分異常也要提示用戶注冊(cè)成功,然后后面在針對(duì)積分異常做補(bǔ)償處理。 第二個(gè)原因:提升性能,例如注冊(cè)用戶花了20毫秒,送積分花費(fèi)50毫秒,如果用同步的話,總耗時(shí)70毫秒,用異步的話,無(wú)需等待積分,故耗時(shí)20毫秒。
故,異步能解決2個(gè)問(wèn)題,性能和容錯(cuò)性。
SpringBoot如何實(shí)現(xiàn)異步調(diào)用?
對(duì)于異步方法調(diào)用,從Spring3開始提供了@Async注解,我們只需要在方法上標(biāo)注此注解,此方法即可實(shí)現(xiàn)異步調(diào)用。
當(dāng)然,我們還需要一個(gè)配置類,通過(guò)Enable模塊驅(qū)動(dòng)注解@EnableAsync 來(lái)開啟異步功能。
實(shí)現(xiàn)異步調(diào)用
第一步:新建配置類,開啟@Async功能支持
使用@EnableAsync來(lái)開啟異步任務(wù)支持,@EnableAsync注解可以直接放在SpringBoot啟動(dòng)類上,也可以單獨(dú)放在其他配置類上。我們這里選擇使用單獨(dú)的配置類SyncConfiguration。
@Configuration
@EnableAsync
public?class?AsyncConfiguration?{
}
第二步:在方法上標(biāo)記異步調(diào)用
增加一個(gè)Component類,用來(lái)進(jìn)行業(yè)務(wù)處理,同時(shí)添加@Async注解,代表該方法為異步處理。
@Component
@Slf4j
public?class?AsyncTask?{
????@SneakyThrows
????@Async
????public?void?doTask1()?{
????????long?t1?=?System.currentTimeMillis();
????????Thread.sleep(2000);
????????long?t2?=?System.currentTimeMillis();
????????log.info("task1?cost?{}?ms"?,?t2-t1);
????}
????@SneakyThrows
????@Async
????public?void?doTask2()?{
????????long?t1?=?System.currentTimeMillis();
????????Thread.sleep(3000);
????????long?t2?=?System.currentTimeMillis();
????????log.info("task2?cost?{}?ms"?,?t2-t1);
????}
}
第三步:在Controller中進(jìn)行異步方法調(diào)用
@RestController
@RequestMapping("/async")
@Slf4j
public?class?AsyncController?{
????@Autowired
????private?AsyncTask?asyncTask;
????@RequestMapping("/task")
????public?void?task()?throws?InterruptedException?{
????????long?t1?=?System.currentTimeMillis();
????????asyncTask.doTask1();
????????asyncTask.doTask2();
????????Thread.sleep(1000);
????????long?t2?=?System.currentTimeMillis();
????????log.info("main?cost?{}?ms",?t2-t1);
????}
}
通過(guò)訪問(wèn)http://localhost:8080/async/task查看控制臺(tái)日志:
2021-11-25?15:48:37?[http-nio-8080-exec-8]?INFO??com.jianzh5.blog.async.AsyncController:26?-?main?cost?1009?ms
2021-11-25?15:48:38?[task-1]?INFO??com.jianzh5.blog.async.AsyncTask:22?-?task1?cost?2005?ms
2021-11-25?15:48:39?[task-2]?INFO??com.jianzh5.blog.async.AsyncTask:31?-?task2?cost?3005?ms
通過(guò)日志可以看到:主線程不需要等待異步方法執(zhí)行完成,減少了響應(yīng)時(shí)間,提高了接口性能。
通過(guò)上面三步我們就可以在SpringBoot中歡樂(lè)的使用異步方法來(lái)提高我們接口性能了,是不是很簡(jiǎn)單?
不過(guò),如果你在實(shí)際項(xiàng)目開發(fā)中真這樣寫了,肯定會(huì)被老鳥們無(wú)情嘲諷,就這?
因?yàn)樯厦娴拇a忽略了一個(gè)最大的問(wèn)題,就是給@Async異步框架自定義線程池。
為什么要給@Async自定義線程池?
使用@Async注解,在默認(rèn)情況下用的是SimpleAsyncTaskExecutor線程池,該線程池不是真正意義上的線程池。
使用此線程池?zé)o法實(shí)現(xiàn)線程重用,每次調(diào)用都會(huì)新建一條線程。若系統(tǒng)中不斷的創(chuàng)建線程,最終會(huì)導(dǎo)致系統(tǒng)占用內(nèi)存過(guò)高,引發(fā)OutOfMemoryError錯(cuò)誤,關(guān)鍵代碼如下:
public?void?execute(Runnable?task,?long?startTimeout)?{
??Assert.notNull(task,?"Runnable?must?not?be?null");
??Runnable?taskToUse?=?this.taskDecorator?!=?null???this.taskDecorator.decorate(task)?:?task;
??//判斷是否開啟限流,默認(rèn)為否
??if?(this.isThrottleActive()?&&?startTimeout?>?0L)?{
????//執(zhí)行前置操作,進(jìn)行限流
????this.concurrencyThrottle.beforeAccess();
????this.doExecute(new?SimpleAsyncTaskExecutor.ConcurrencyThrottlingRunnable(taskToUse));
??}?else?{
????//未限流的情況,執(zhí)行線程任務(wù)
????this.doExecute(taskToUse);
??}
}
protected?void?doExecute(Runnable?task)?{
??//不斷創(chuàng)建線程
??Thread?thread?=?this.threadFactory?!=?null???this.threadFactory.newThread(task)?:?this.createThread(task);
??thread.start();
}
//創(chuàng)建線程
public?Thread?createThread(Runnable?runnable)?{
??//指定線程名,task-1,task-2...
??Thread?thread?=?new?Thread(this.getThreadGroup(),?runnable,?this.nextThreadName());
??thread.setPriority(this.getThreadPriority());
??thread.setDaemon(this.isDaemon());
??return?thread;
}
我們也可以直接通過(guò)上面的控制臺(tái)日志觀察,每次打印的線程名都是[task-1]、[task-2]、[task-3]、[task-4].....遞增的。
正因如此,所以我們?cè)谑褂肧pring中的@Async異步框架時(shí)一定要自定義線程池,替代默認(rèn)的SimpleAsyncTaskExecutor。
Spring提供了多種線程池:
SimpleAsyncTaskExecutor:不是真的線程池,這個(gè)類不重用線程,每次調(diào)用都會(huì)創(chuàng)建一個(gè)新的線程。
SyncTaskExecutor:這個(gè)類沒(méi)有實(shí)現(xiàn)異步調(diào)用,只是一個(gè)同步操作。只適用于不需要多線程的地
ConcurrentTaskExecutor:Executor的適配類,不推薦使用。如果ThreadPoolTaskExecutor不滿足要求時(shí),才用考慮使用這個(gè)類
ThreadPoolTaskScheduler:可以使用cron表達(dá)式
ThreadPoolTaskExecutor:最常使用,推薦。其實(shí)質(zhì)是對(duì)java.util.concurrent.ThreadPoolExecutor的包裝
為@Async實(shí)現(xiàn)一個(gè)自定義線程池
@Configuration
@EnableAsync
public?class?SyncConfiguration?{
????@Bean(name?=?"asyncPoolTaskExecutor")
????public?ThreadPoolTaskExecutor?executor()?{
????????ThreadPoolTaskExecutor?taskExecutor?=?new?ThreadPoolTaskExecutor();
????????//核心線程數(shù)
????????taskExecutor.setCorePoolSize(10);
????????//線程池維護(hù)線程的最大數(shù)量,只有在緩沖隊(duì)列滿了之后才會(huì)申請(qǐng)超過(guò)核心線程數(shù)的線程
????????taskExecutor.setMaxPoolSize(100);
????????//緩存隊(duì)列
????????taskExecutor.setQueueCapacity(50);
????????//許的空閑時(shí)間,當(dāng)超過(guò)了核心線程出之外的線程在空閑時(shí)間到達(dá)之后會(huì)被銷毀
????????taskExecutor.setKeepAliveSeconds(200);
????????//異步方法內(nèi)部線程名稱
????????taskExecutor.setThreadNamePrefix("async-");
????????/**
?????????*?當(dāng)線程池的任務(wù)緩存隊(duì)列已滿并且線程池中的線程數(shù)目達(dá)到maximumPoolSize,如果還有任務(wù)到來(lái)就會(huì)采取任務(wù)拒絕策略
?????????*?通常有以下四種策略:
?????????* ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。
?????????* ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。
?????????* ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過(guò)程)
?????????* ThreadPoolExecutor.CallerRunsPolicy:重試添加當(dāng)前的任務(wù),自動(dòng)重復(fù)調(diào)用 execute()?方法,直到成功
?????????*/
????????taskExecutor.setRejectedExecutionHandler(new?ThreadPoolExecutor.CallerRunsPolicy());
????????taskExecutor.initialize();
????????return?taskExecutor;
????}
}
配置自定義線程池以后我們就可以大膽的使用@Async提供的異步處理能力了。
多個(gè)線程池處理
在現(xiàn)實(shí)的互聯(lián)網(wǎng)項(xiàng)目開發(fā)中,針對(duì)高并發(fā)的請(qǐng)求,一般的做法是高并發(fā)接口單獨(dú)線程池隔離處理。
假設(shè)現(xiàn)在2個(gè)高并發(fā)接口:一個(gè)是修改用戶信息接口,刷新用戶redis緩存;一個(gè)是下訂單接口,發(fā)送app push信息。往往會(huì)根據(jù)接口特征定義兩個(gè)線程池,這時(shí)候我們?cè)谑褂?code style="font-size: 14px;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;background-color: rgba(27, 31, 35, 0.05);font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;color: rgb(255, 93, 108);">@Async時(shí)就需要通過(guò)指定線程池名稱進(jìn)行區(qū)分。
為@Async指定線程池名字
@SneakyThrows
@Async("asyncPoolTaskExecutor")
public?void?doTask1()?{
??long?t1?=?System.currentTimeMillis();
??Thread.sleep(2000);
??long?t2?=?System.currentTimeMillis();
??log.info("task1?cost?{}?ms"?,?t2-t1);
}
當(dāng)系統(tǒng)存在多個(gè)線程池時(shí),我們也可以配置一個(gè)默認(rèn)線程池,對(duì)于非默認(rèn)的異步任務(wù)再通過(guò)@Async("otherTaskExecutor")來(lái)指定線程池名稱。
配置默認(rèn)線程池
可以修改配置類讓其實(shí)現(xiàn)AsyncConfigurer,并重寫getAsyncExecutor()方法,指定默認(rèn)線程池:
@Configuration
@EnableAsync
@Slf4j
public?class?AsyncConfiguration?implements?AsyncConfigurer?{
????@Bean(name?=?"asyncPoolTaskExecutor")
????public?ThreadPoolTaskExecutor?executor()?{
????????ThreadPoolTaskExecutor?taskExecutor?=?new?ThreadPoolTaskExecutor();
????????//核心線程數(shù)
????????taskExecutor.setCorePoolSize(2);
????????//線程池維護(hù)線程的最大數(shù)量,只有在緩沖隊(duì)列滿了之后才會(huì)申請(qǐng)超過(guò)核心線程數(shù)的線程
????????taskExecutor.setMaxPoolSize(10);
????????//緩存隊(duì)列
????????taskExecutor.setQueueCapacity(50);
????????//許的空閑時(shí)間,當(dāng)超過(guò)了核心線程出之外的線程在空閑時(shí)間到達(dá)之后會(huì)被銷毀
????????taskExecutor.setKeepAliveSeconds(200);
????????//異步方法內(nèi)部線程名稱
????????taskExecutor.setThreadNamePrefix("async-");
????????/**
?????????*?當(dāng)線程池的任務(wù)緩存隊(duì)列已滿并且線程池中的線程數(shù)目達(dá)到maximumPoolSize,如果還有任務(wù)到來(lái)就會(huì)采取任務(wù)拒絕策略
?????????*?通常有以下四種策略:
?????????* ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。
?????????* ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。
?????????* ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過(guò)程)
?????????* ThreadPoolExecutor.CallerRunsPolicy:重試添加當(dāng)前的任務(wù),自動(dòng)重復(fù)調(diào)用 execute()?方法,直到成功
?????????*/
????????taskExecutor.setRejectedExecutionHandler(new?ThreadPoolExecutor.CallerRunsPolicy());
????????taskExecutor.initialize();
????????return?taskExecutor;
????}
????/**
?????*?指定默認(rèn)線程池
?????*/
????@Override
????public?Executor?getAsyncExecutor()?{
????????return?executor();
????}
????@Override
????public?AsyncUncaughtExceptionHandler?getAsyncUncaughtExceptionHandler()?{
????????return?(ex,?method,?params)?->
????????????log.error("線程池執(zhí)行任務(wù)發(fā)送未知錯(cuò)誤,執(zhí)行方法:{}",method.getName(),ex);
????}
}
如下,doTask1()方法使用默認(rèn)使用線程池asyncPoolTaskExecutor,doTask2()使用線程池otherTaskExecutor,非常靈活。
@Async
public?void?doTask1()?{
??long?t1?=?System.currentTimeMillis();
??Thread.sleep(2000);
??long?t2?=?System.currentTimeMillis();
??log.info("task1?cost?{}?ms"?,?t2-t1);
}
@SneakyThrows
@Async("otherTaskExecutor")
public?void?doTask2()?{
??long?t1?=?System.currentTimeMillis();
??Thread.sleep(3000);
??long?t2?=?System.currentTimeMillis();
??log.info("task2?cost?{}?ms"?,?t2-t1);
}
小結(jié)
@Async異步方法在日常開發(fā)中經(jīng)常會(huì)用到,大家好好掌握,爭(zhēng)取早日成為老鳥!!!
程序汪資料鏈接
程序汪接的7個(gè)私活都在這里,經(jīng)驗(yàn)整理
Java項(xiàng)目分享 最新整理全集,找項(xiàng)目不累啦 06版
堪稱神級(jí)的Spring Boot手冊(cè),從基礎(chǔ)入門到實(shí)戰(zhàn)進(jìn)階
臥槽!字節(jié)跳動(dòng)《算法中文手冊(cè)》火了,完整版 PDF 開放下載!
臥槽!阿里大佬總結(jié)的《圖解Java》火了,完整版PDF開放下載!
字節(jié)跳動(dòng)總結(jié)的設(shè)計(jì)模式 PDF 火了,完整版開放下載!
歡迎添加程序汪個(gè)人微信 itwang009? 進(jìn)粉絲群或圍觀朋友圈

