SpringBoot異步接口實(shí)現(xiàn):提高系統(tǒng)的吞吐量
共 8881字,需瀏覽 18分鐘
·
2024-08-15 07:33
閱讀本文大概需要 5 分鐘。
來(lái)自:juejin.cn/post/7367186272849788962
前言
ResponseBodyEmitter、SseEmitter、StreamingResponseBody,不在本文介紹內(nèi),之后新寫(xiě)文章介紹):
-
AsyncContext -
Callable -
WebAsyncTask -
DeferredResult
AsyncContext是Servlet層級(jí)的,比較原生的方式,本文不對(duì)此介紹(一般都不使用它,太麻煩了)。本文著重介紹后面三種方式。
特別說(shuō)明:服務(wù)端的異步或同步對(duì)于客戶(hù)端而言是不可見(jiàn)的。不會(huì)因?yàn)榉?wù)端使用了異步,接口的結(jié)果就和同步不一樣了。另外,對(duì)于單個(gè)請(qǐng)求而言,使用異步接口會(huì)導(dǎo)致響應(yīng)時(shí)間比同步大,但不特別明顯。具體后文分析。
基于Callable實(shí)現(xiàn)
java.util.concurrent.Callable包裝的任何值,都表示該接口是一個(gè)異步接口:
@GetMapping("/testCallAble")
public Callable<String> testCallAble() {
return () -> {
Thread.sleep(40000);
return "hello";
};
}
Callable 處理過(guò)程如下:
Callable 。
-
Spring MVC 調(diào)用 request.startAsync()并將 Callable 提交給AsyncTaskExecutor以在單獨(dú)的線(xiàn)程中進(jìn)行處理。 -
同時(shí), DispatcherServlet和所有過(guò)濾器退出 Servlet 容器線(xiàn)程,但response保持打開(kāi)狀態(tài)。 -
最終 Callable 產(chǎn)生結(jié)果,Spring MVC將請(qǐng)求分派回Servlet容器以完成處理。 -
再次調(diào)用 DispatcherServlet,并使用 Callable 異步生成的返回值繼續(xù)處理。
Callable默認(rèn)使用SimpleAsyncTaskExecutor類(lèi)來(lái)執(zhí)行,這個(gè)類(lèi)非常簡(jiǎn)單而且沒(méi)有重用線(xiàn)程。在實(shí)踐中,需要使用AsyncTaskExecutor類(lèi)來(lái)對(duì)線(xiàn)程進(jìn)行配置。
基于WebAsyncTask實(shí)現(xiàn)
WebAsyncTask是對(duì)Callable的包裝,提供了更強(qiáng)大的功能,比如:處理超時(shí)回調(diào)、錯(cuò)誤回調(diào)、完成回調(diào)等。本質(zhì)上,和Callable區(qū)別不大,但是由于它額外封裝了一些事件的回調(diào),所有,通常都使用WebAsyncTask而不是Callable:
@GetMapping("/webAsyncTask")
public WebAsyncTask<String> webAsyncTask() {
WebAsyncTask<String> result = new WebAsyncTask<>(30003, () -> {
return "success";
});
result.onTimeout(() -> {
log.info("timeout callback");
return "timeout callback";
});
result.onCompletion(() -> log.info("finish callback"));
return result;
}
WebAsyncTask可以配置一個(gè)超時(shí)時(shí)間,這里配置的超時(shí)時(shí)間比全局配置的超時(shí)時(shí)間優(yōu)先級(jí)都高(會(huì)覆蓋全局配置的超時(shí)時(shí)間)。
基于DeferredResult實(shí)現(xiàn)
DeferredResult使用方式與Callable類(lèi)似,但在返回結(jié)果時(shí)不一樣,它返回的時(shí)實(shí)際結(jié)果可能沒(méi)有生成,實(shí)際的結(jié)果可能會(huì)在另外的線(xiàn)程里面設(shè)置到DeferredResult中去。
//定義一個(gè)全局的變量,用來(lái)存儲(chǔ)DeferredResult對(duì)象
private Map<String, DeferredResult<String>> deferredResultMap = new ConcurrentHashMap<>();
@GetMapping("/testDeferredResult")
public DeferredResult<String> testDeferredResult(){
DeferredResult<String> deferredResult = new DeferredResult<>();
deferredResultMap.put("test", deferredResult);
return deferredResult;
}
DeferredResult對(duì)象存放在了一個(gè)Map集合中,實(shí)際應(yīng)用中可以設(shè)計(jì)一個(gè)對(duì)象管理器來(lái)統(tǒng)一管理這些個(gè)對(duì)象。
注意:要考慮定時(shí)輪詢(xún)(或其他方式)這些對(duì)象,將已經(jīng)處理過(guò)或無(wú)效的 DeferredResult對(duì)象清理掉(DeferredResult.isSetOrExpired方法可以判斷是否還有效),避免內(nèi)存泄露。
@GetMapping("/testSetDeferredResult")
public String testSetDeferredResult() throws InterruptedException {
DeferredResult<String> deferredResult = deferredResultMap.get("test");
boolean flag = deferredResult.setResult("testSetDeferredResult");
if(!flag){
log.info("結(jié)果已經(jīng)被處理,此次操作無(wú)效");
}
return "ok";
}
DeferredResult的值:首先是從之前存放DeferredResult的map中拿到DeferredResult的值,然后設(shè)置它的返回值。當(dāng)執(zhí)行deferredResult.setResult之后,可以看到之前pending狀態(tài)的接口完成了響應(yīng),得到的結(jié)果,就是這里設(shè)置的值。
DeferredResult時(shí)也可以設(shè)置超時(shí)時(shí)間,這個(gè)時(shí)間的優(yōu)先級(jí)也是大于全局設(shè)置的。另外,判斷DeferredResult是否有效,只是一個(gè)簡(jiǎn)單的判斷,實(shí)際中判斷有效的并不一定是有效的(比如:客戶(hù)端取消了請(qǐng)求,服務(wù)端是不知道的),但是一般判斷為無(wú)效的,那肯定是無(wú)效了。
DeferredResult 處理過(guò)程如下:
-
控制器返回一個(gè) DeferredResult并將其保存在可以訪(fǎng)問(wèn)的內(nèi)存隊(duì)列或列表中。 -
Spring MVC 調(diào)用 request.startAsync()。 -
同時(shí), DispatcherServlet和所有配置的過(guò)濾器退出請(qǐng)求處理線(xiàn)程,但響應(yīng)保持打開(kāi)狀態(tài)。 -
應(yīng)用程序從某個(gè)線(xiàn)程設(shè)置 DeferredResult,Spring MVC 將請(qǐng)求分派回 Servlet 容器。 -
再次調(diào)用 DispatcherServlet,并使用異步生成的返回值繼續(xù)處理。
提供一個(gè)線(xiàn)程池
@Bean("mvcAsyncTaskExecutor")
public AsyncTaskExecutor asyncTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 線(xiàn)程池維護(hù)線(xiàn)程的最少數(shù)量
// asyncServiceExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors() + 1);
executor.setCorePoolSize(5);
// 線(xiàn)程池維護(hù)線(xiàn)程的最大數(shù)量
executor.setMaxPoolSize(10);
// 線(xiàn)程池所使用的緩沖隊(duì)列
executor.setQueueCapacity(10);
// asyncServiceExecutor.prefersShortLivedTasks();
executor.setThreadNamePrefix("fyk-mvcAsyncTask-Thread-");
asyncServiceExecutor.setBeanName("TaskId" + taskId);
// asyncServiceExecutor.setKeepAliveSeconds(20);
//調(diào)用者執(zhí)行
// asyncServiceExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// 線(xiàn)程全部結(jié)束才關(guān)閉線(xiàn)程池
executor.setWaitForTasksToCompleteOnShutdown(true);
// 如果超過(guò)60s還沒(méi)有銷(xiāo)毀就強(qiáng)制銷(xiāo)毀,以確保應(yīng)用最后能夠被關(guān)閉,而不是阻塞住
executor.setAwaitTerminationSeconds(30);
executor.initialize();
return executor;
}
@Configuration
public class FykWebMvcConfigurer implements WebMvcConfigurer {
@Autowired
@Qualifier("mvcAsyncTaskExecutor")
private AsyncTaskExecutor asyncTaskExecutor;
@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
//異步操作的超時(shí)時(shí)間,值為0或者更小,表示永不超時(shí)
configurer.setDefaultTimeout(60001);
configurer.setTaskExecutor(asyncTaskExecutor);
}
}
什么時(shí)候使用異步請(qǐng)求
推薦閱讀:
船新 IDEA 2024.2 正式發(fā)布,新特性真香!
如何為開(kāi)放平臺(tái)設(shè)計(jì)一個(gè)安全好用的OpenApi
程序員在線(xiàn)工具站:cxytools.com 推薦一個(gè)自己寫(xiě)的工具站:http://cxytools.com,專(zhuān)為程序員設(shè)計(jì),包括時(shí)間日期、JSON處理、SQL格式化、隨機(jī)字符串生成、UUID生成、文本Hash...等功能,提升開(kāi)發(fā)效率。
?戳閱讀原文直達(dá)! 朕已閱 ![]()
評(píng)論
圖片
表情
