SpringBoot開發(fā)秘籍 - 事件異步處理

在項目實際開發(fā)過程中,我們有很多這樣的業(yè)務(wù)場景:一個事務(wù)中處理完一個業(yè)務(wù)邏輯后需要跟著處理另外一個業(yè)務(wù)邏輯,偽碼大致如下:
@Service
public?class?ProductServiceImpl?{
?...
????public?void?saveProduct(Product?product)?{
????????productMapper.saveOrder(product);
????????notifyService.notify(product);
????}
?...
}
很簡單并且很常見的一段業(yè)務(wù)邏輯:首先將產(chǎn)品先保存數(shù)據(jù)庫,然后發(fā)送通知。
某一天你們可能需要把新增的產(chǎn)品存到Es中,這時候也需要代碼可能變成這樣:
@Service
public?class?ProductServiceImpl?{
?...
????public?void?saveProduct(Product?product)?{
????????productMapper.saveProduct(product);
????????esService.saveProduct(product)
????????notifyService.notify(product);
????}
?...
}
隨著業(yè)務(wù)需求的變化,代碼也需要跟著一遍遍的修改。而且還會存在另外一個問題,如果通知系統(tǒng)掛了,那就不能再新增產(chǎn)品了。
對于上面這種情況非常適合引入消息中間件(消息隊列)來對業(yè)務(wù)進行解耦,但并非所有的業(yè)務(wù)系統(tǒng)都會引入消息中間件(引入會第三方架構(gòu)組件會帶來很大的運維成本)。
Spring提供了事件驅(qū)動機制可以幫助我們實現(xiàn)這一需求。
Spring事件驅(qū)動
spring事件驅(qū)動由3個部分組成
ApplicationEvent:表示事件本身,自定義事件需要繼承該類,用來定義事件
ApplicationEventPublisher:事件發(fā)送器,主要用來發(fā)布事件
ApplicationListener:事件監(jiān)聽器接口,監(jiān)聽類實現(xiàn)ApplicationListener 里onApplicationEvent方法即可,也可以在方法上增加@EventListener以實現(xiàn)事件監(jiān)聽。
實現(xiàn)Spring事件驅(qū)動一般只需要三步:
自定義需要發(fā)布的事件類,需要繼承ApplicationEvent類 使用ApplicationEventPublisher來發(fā)布自定義事件 使用@EventListener來監(jiān)聽事件
「這里需要特別注意一點,默認(rèn)情況下事件是同步的。即事件被publish后會等待Listener的處理。如果發(fā)布事件處的業(yè)務(wù)存在事務(wù),監(jiān)聽器處理也會在相同的事務(wù)中。如果需要異步處理事件,可以onApplicationEvent方法上加@Aync支持異步或在有@EventListener的注解方法上加上@Aync。」

源碼實戰(zhàn)
創(chuàng)建事件
public?class?ProductEvent?extends?ApplicationEvent?{
????public?ProductEvent(Product?product)?{
????????super(product);
????}
}
發(fā)布事件
@Service
public?class?ProductServiceImpl?implements?IproductService?{
?...
????@Autowired
????private?ApplicationEventPublisher?publisher;
?
????@Override
????@Transactional(rollbackFor?=?Exception.class)
????public?void?saveProduct(Product?product)?{
??productMapper.saveProduct(product);?
????????//事件發(fā)布
????????publisher.publishEvent(product);
????}
????...
}
事件監(jiān)聽
@Slf4j
@AllArgsConstructor
public?class?ProductListener?{
?private?final?NotifyService?notifyServcie;
?@Async
?@Order
?@EventListener(ProductEvent.class)
?public?void?notify(ProductEvent?event)?{
??Product?product?=?(Product)?event.getSource();
??notifyServcie.notify(product,?"product");
?}
}
在SpringBoot啟動類上增加 @EnableAsync注解
@Slf4j
@EnableSwagger2
@SpringBootApplication
@EnableAsync
public?class?ApplicationBootstrap?{
...
}
使用了Async后會使用默認(rèn)的線程池SimpleAsyncTaskExecutor,一般我們會在項目中自定義一個線程池。
@Configuration
public?class?ExecutorConfig?{
????/**?核心線程數(shù)?*/
????private?int?corePoolSize?=?10;
????/**?最大線程數(shù)??*/
????private?int?maxPoolSize?=?50;
????/**?隊列大小??*/
????private?int?queueCapacity?=?10;
????/**?線程最大空閑時間???*/
????private?int?keepAliveSeconds?=?150;
????@Bean("customExecutor")
????public?Executor?myExecutor()?{
????????ThreadPoolTaskExecutor?executor?=?new?ThreadPoolTaskExecutor();
????????executor.setCorePoolSize(corePoolSize);
????????executor.setMaxPoolSize(maxPoolSize);
????????executor.setQueueCapacity(queueCapacity);
????????executor.setThreadNamePrefix("customExecutor-");
????????executor.setKeepAliveSeconds(keepAliveSeconds);
????????// rejection-policy:當(dāng)pool已經(jīng)達到max size的時候,如何處理新任務(wù)
????????// CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是由調(diào)用者所在的線程來執(zhí)行
????????executor.setRejectedExecutionHandler(new?ThreadPoolExecutor.CallerRunsPolicy());
????????executor.initialize();
????????return?executor;
????}
}

3.?大型網(wǎng)站架構(gòu)演化發(fā)展歷程
8. 深入理解 MySQL:快速學(xué)會分析SQL執(zhí)行效率

掃碼二維碼關(guān)注我
·end·
—如果本文有幫助,請分享到朋友圈吧—
我們一起愉快的玩耍!

