用了SpringBoot的發(fā)布訂閱模式,代碼看著高級多了
大家好,我是小富~
在項(xiàng)目里,經(jīng)常會有一些主線業(yè)務(wù)之外的其它業(yè)務(wù),比如,下單之后,發(fā)送通知、監(jiān)控埋點(diǎn)、記錄日志……
這些非核心業(yè)務(wù),如果全部一梭子寫下去,有兩個(gè)問題,一個(gè)是業(yè)務(wù)耦合,一個(gè)是串行耗時(shí)。
下單之后的邏輯所以,一般在開發(fā)的時(shí)候,都會把這些操作?抽象成觀察者模式,也就是發(fā)布/訂閱模式(這里就不討論觀察者模式和發(fā)布/訂閱模式的不同),而且一般會采用多線程的方式來異步執(zhí)行這些觀察者方法。
觀察者模式一開始,我們都是自己去寫觀察者模式。
自己實(shí)現(xiàn)觀察者模式
觀察者簡圖觀察者
- 觀察者定義接口
/**
?*?@Author:?fighter3
?*?@Description:?觀察者接口
?*?@Date:?2022/11/7?11:40?下午
?*/
public?interface?OrderObserver?{
????void?afterPlaceOrder(PlaceOrderMessage?placeOrderMessage);
}
-
具體觀察者
@Slf4j
public?class?OrderMetricsObserver?implements?OrderObserver?{
????@Override
????public?void?afterPlaceOrder(PlaceOrderMessage?placeOrderMessage)?{
????????log.info("[afterPlaceOrder]?metrics");
????}
}@Slf4j
public?class?OrderLogObserver?implements?OrderObserver{
????@Override
????public?void?afterPlaceOrder(PlaceOrderMessage?placeOrderMessage)?{
????????log.info("[afterPlaceOrder]?log.");
????}
}@Slf4j
public?class?OrderNotifyObserver?implements?OrderObserver{
????@Override
????public?void?afterPlaceOrder(PlaceOrderMessage?placeOrderMessage)?{
????????log.info("[afterPlaceOrder]?notify.");
????}
}- 業(yè)務(wù)通知觀察者
- 日志記錄觀察者
- 監(jiān)控埋點(diǎn)觀察者
被觀察者
- 消息實(shí)體定義
@Data
public?class?PlaceOrderMessage?implements?Serializable?{
????/**
?????*?訂單號
?????*/
????private?String?orderId;
????/**
?????*?訂單狀態(tài)
?????*/
????private?Integer?orderStatus;
????/**
?????*?下單用戶ID
?????*/
????private?String?userId;
????//……
}
- 被觀察者抽象類
public?abstract?class?OrderSubject?{
????//定義一個(gè)觀察者列表
????private?List<OrderObserver>?orderObserverList?=?new?ArrayList<>();
????//定義一個(gè)線程池,這里參數(shù)隨便寫的
????ThreadPoolExecutor?threadPoolExecutor?=?new?ThreadPoolExecutor(6,?12,?6,?TimeUnit.SECONDS,?new?ArrayBlockingQueue<>(30));
????//增加一個(gè)觀察者
????public?void?addObserver(OrderObserver?o)?{
????????this.orderObserverList.add(o);
????}
????//刪除一個(gè)觀察者
????public?void?delObserver(OrderObserver?o)?{
????????this.orderObserverList.remove(o);
????}
????//通知所有觀察者
????public?void?notifyObservers(PlaceOrderMessage?placeOrderMessage)?{
????????for?(OrderObserver?orderObserver?:?orderObserverList)?{
????????????//利用多線程異步執(zhí)行
????????????threadPoolExecutor.execute(()?->?{
????????????????orderObserver.afterPlaceOrder(placeOrderMessage);
????????????});
????????}
????}
}
這里利用了多線程,來異步執(zhí)行觀察者。
- 被觀察者實(shí)現(xiàn)類
/**
?*?@Author:?fighter3
?*?@Description:?訂單實(shí)現(xiàn)類-被觀察者實(shí)現(xiàn)類
?*?@Date:?2022/11/7?11:52?下午
?*/
@Service
@Slf4j
public?class?OrderServiceImpl?extends?OrderSubject?implements?OrderService?{
????/**
?????*?下單
?????*/
????@Override
????public?PlaceOrderResVO?placeOrder(PlaceOrderReqVO?reqVO)?{
????????PlaceOrderResVO?resVO?=?new?PlaceOrderResVO();
????????//添加觀察者
????????this.addObserver(new?OrderMetricsObserver());
????????this.addObserver(new?OrderLogObserver());
????????this.addObserver(new?OrderNotifyObserver());
????????//通知觀察者
????????this.notifyObservers(new?PlaceOrderMessage());
????????log.info("[placeOrder]?end.");
????????return?resVO;
????}
}
測試
????@Test
????@DisplayName("下單")
????void?placeOrder()?{
????????PlaceOrderReqVO?placeOrderReqVO?=?new?PlaceOrderReqVO();
????????orderService.placeOrder(placeOrderReqVO);
????}
- 測試執(zhí)行結(jié)果
2022-11-08?00:11:13.617??INFO?20235?---?[pool-1-thread-1]?c.f.obverser.OrderMetricsObserver????????:?[afterPlaceOrder]?metrics
2022-11-08?00:11:13.618??INFO?20235?---?[???????????main]?cn.fighter3.obverser.OrderServiceImpl????:?[placeOrder]?end.
2022-11-08?00:11:13.618??INFO?20235?---?[pool-1-thread-3]?c.fighter3.obverser.OrderNotifyObserver??:?[afterPlaceOrder]?notify.
2022-11-08?00:11:13.617??INFO?20235?---?[pool-1-thread-2]?cn.fighter3.obverser.OrderLogObserver????:?[afterPlaceOrder]?log.
可以看到,觀察者是異步執(zhí)行的。
利用Spring精簡可以看到,觀察者模式寫起來還是比較簡單的,但是既然都用到了Spring來管理Bean的生命周期,代碼還可以更精簡一些。
Spring精簡觀察者模式觀察者實(shí)現(xiàn)類:定義成Bean
-
OrderLogObserver
@Slf4j
@Service
public?class?OrderLogObserver?implements?OrderObserver?{
????@Override
????public?void?afterPlaceOrder(PlaceOrderMessage?placeOrderMessage)?{
????????log.info("[afterPlaceOrder]?log.");
????}
} -
OrderMetricsObserver
@Slf4j
@Service
public?class?OrderMetricsObserver?implements?OrderObserver?{
????@Override
????public?void?afterPlaceOrder(PlaceOrderMessage?placeOrderMessage)?{
????????log.info("[afterPlaceOrder]?metrics");
????}
}
- OrderNotifyObserver
@Slf4j
@Service
public?class?OrderNotifyObserver?implements?OrderObserver?{
????@Override
????public?void?afterPlaceOrder(PlaceOrderMessage?placeOrderMessage)?{
????????log.info("[afterPlaceOrder]?notify.");
????}
}
被觀察者:自動注入Bean
-
OrderSubject
public?abstract?class?OrderSubject?{
????/**
?????*?利用Spring的特性直接注入觀察者
?????*/
????@Autowired
????protected?List<OrderObserver>?orderObserverList;
????//定義一個(gè)線程池,這里參數(shù)隨便寫的
????ThreadPoolExecutor?threadPoolExecutor?=?new?ThreadPoolExecutor(6,?12,?6,?TimeUnit.SECONDS,?new?ArrayBlockingQueue<>(30));
????//通知所有觀察者
????public?void?notifyObservers(PlaceOrderMessage?placeOrderMessage)?{
????????for?(OrderObserver?orderObserver?:?orderObserverList)?{
????????????//利用多線程異步執(zhí)行
????????????threadPoolExecutor.execute(()?->?{
????????????????orderObserver.afterPlaceOrder(placeOrderMessage);
????????????});
????????}
????}
} -
OrderServiceImpl
@Service
@Slf4j
public?class?OrderServiceImpl?extends?OrderSubject?implements?OrderService?{
????/**
?????*?實(shí)現(xiàn)類里也要注入一下
?????*/
????@Autowired
????private?List<OrderObserver>?orderObserverList;
????/**
?????*?下單
?????*/
????@Override
????public?PlaceOrderResVO?placeOrder(PlaceOrderReqVO?reqVO)?{
????????PlaceOrderResVO?resVO?=?new?PlaceOrderResVO();
????????//通知觀察者
????????this.notifyObservers(new?PlaceOrderMessage());
????????log.info("[placeOrder]?end.");
????????return?resVO;
????}
}
這樣一來,發(fā)現(xiàn)被觀察者又簡潔了很多,但是后來我發(fā)現(xiàn),在SpringBoot項(xiàng)目里,利用Spring事件驅(qū)動驅(qū)動模型(event)模型來實(shí)現(xiàn),更加地簡練。
Spring Event實(shí)現(xiàn)發(fā)布/訂閱模式Spring Event對發(fā)布/訂閱模式進(jìn)行了封裝,使用起來更加簡單,還是以我們這個(gè)場景為例,看看怎么來實(shí)現(xiàn)吧。
自定義事件
- PlaceOrderEvent:繼承ApplicationEvent,并重寫構(gòu)造函數(shù)。ApplicationEvent是Spring提供的所有應(yīng)用程序事件擴(kuò)展類。
public?class?PlaceOrderEvent?extends?ApplicationEvent?{
????public?PlaceOrderEvent(PlaceOrderEventMessage?source)?{
????????super(source);
????}
}
- PlaceOrderEventMessage:事件消息,定義了事件的消息體。
@Data
public?class?PlaceOrderEventMessage?implements?Serializable?{
????/**
?????*?訂單號
?????*/
????private?String?orderId;
????/**
?????*?訂單狀態(tài)
?????*/
????private?Integer?orderStatus;
????/**
?????*?下單用戶ID
?????*/
????private?String?userId;
????//……
}
事件監(jiān)聽者
事件監(jiān)聽者,有兩種實(shí)現(xiàn)方式,一種是實(shí)現(xiàn)ApplicationListener接口,另一種是使用@EventListener注解。
事件監(jiān)聽者實(shí)現(xiàn)實(shí)現(xiàn)ApplicationListener接口
實(shí)現(xiàn)ApplicationListener接口,重寫onApplicationEvent方法,將類定義為Bean,這樣,一個(gè)監(jiān)聽者就完成了。
- OrderLogListener
@Slf4j
@Service
public?class?OrderLogListener?implements?ApplicationListener<PlaceOrderEvent>?{
????@Override
????public?void?onApplicationEvent(PlaceOrderEvent?event)?{
????????log.info("[afterPlaceOrder]?log.");
????}
}
- OrderMetricsListener
@Slf4j
@Service
public?class?OrderMetricsListener?implements?ApplicationListener<PlaceOrderEvent>?{
????@Override
????public?void?onApplicationEvent(PlaceOrderEvent?event)?{
????????log.info("[afterPlaceOrder]?metrics");
????}
}
- OrderNotifyListener
@Slf4j
@Service
public?class?OrderNotifyListener?implements?ApplicationListener<PlaceOrderEvent>?{
????@Override
????public?void?onApplicationEvent(PlaceOrderEvent?event)?{
????????log.info("[afterPlaceOrder]?notify.");
????}
}
使用@EventListener注解
使用@EventListener注解就更簡單了,直接在方法上,加上@EventListener注解就行了。
-
OrderLogListener
@Slf4j
@Service
public?class?OrderLogListener??{
????@EventListener
????public?void?orderLog(PlaceOrderEvent?event)?{
????????log.info("[afterPlaceOrder]?log.");
????}
} -
OrderMetricsListener
@Slf4j
@Service
public?class?OrderMetricsListener?{
????@EventListener
????public?void?metrics(PlaceOrderEvent?event)?{
????????log.info("[afterPlaceOrder]?metrics");
????}
} -
OrderNotifyListener
@Slf4j
@Service
public?class?OrderNotifyListener{
????@EventListener
????public?void?notify(PlaceOrderEvent?event)?{
????????log.info("[afterPlaceOrder]?notify.");
????}
}
異步和自定義線程池
異步執(zhí)行
異步執(zhí)行也非常簡單,使用Spring的異步注解@Async就可以了。例如:
- OrderLogListener
@Slf4j
@Service
public?class?OrderLogListener??{
????@EventListener
????@Async
????public?void?orderLog(PlaceOrderEvent?event)?{
????????log.info("[afterPlaceOrder]?log.");
????}
}
當(dāng)然,還需要開啟異步,SpringBoot項(xiàng)目默認(rèn)是沒有開啟異步的,我們需要手動配置開啟異步功能,很簡單,只需要在配置類上加上@EnableAsync注解就行了,這個(gè)注解用于聲明啟用Spring的異步方法執(zhí)行功能,需要和@Configuration注解一起使用,也可以直接加在啟動類上。
@SpringBootApplication
@EnableAsync
public?class?DailyApplication?{
????public?static?void?main(String[]?args)?{
????????SpringApplication.run(DairlyLearnApplication.class,?args);
????}
}
自定義線程池
使用@Async的時(shí)候,一般都會自定義線程池,因?yàn)?code style="font-size:14px;background-color:rgba(27,31,35,.05);font-family:'Operator Mono', Consolas, Monaco, Menlo, monospace;color:rgb(40,202,113);">@Async的默認(rèn)線程池為SimpleAsyncTaskExecutor,不是真的線程池,這個(gè)類不重用線程,默認(rèn)每次調(diào)用都會創(chuàng)建一個(gè)新的線程。
自定義線程池有三種方式:
@Async自定義線程池- 實(shí)現(xiàn)接口AsyncConfigurer
- 繼承AsyncConfigurerSupport
- 配置由自定義的TaskExecutor替代內(nèi)置的任務(wù)執(zhí)行器
我們來看看三種寫法:
- 實(shí)現(xiàn)接口AsyncConfigurer
@Configuration
@Slf4j
public?class?AsyncConfiguration?implements?AsyncConfigurer?{
????@Bean("fighter3AsyncExecutor")
????public?ThreadPoolTaskExecutor?executor()?{
????????//Spring封裝的一個(gè)線程池
????????ThreadPoolTaskExecutor?executor?=?new?ThreadPoolTaskExecutor();
????????//隨便寫的一些配置
????????executor.setCorePoolSize(10);
????????executor.setMaxPoolSize(50);
????????executor.setQueueCapacity(30);
????????executor.setRejectedExecutionHandler(new?ThreadPoolExecutor.CallerRunsPolicy());
????????executor.setThreadNamePrefix("fighter3AsyncExecutor-");
????????executor.initialize();
????????return?executor;
????}
????@Override
????public?Executor?getAsyncExecutor()?{
????????return?executor();
????}
????@Override
????public?AsyncUncaughtExceptionHandler?getAsyncUncaughtExceptionHandler()?{
????????return?(ex,?method,?params)?->?log.error(String.format("[async]?task{}?error:",?method),?ex);
????}
}
- 繼承AsyncConfigurerSupport
@Configuration
@Slf4j
public?class?SpringAsyncConfigurer?extends?AsyncConfigurerSupport?{
????@Bean
????public?ThreadPoolTaskExecutor?asyncExecutor()?{
????????ThreadPoolTaskExecutor?threadPool?=?new?ThreadPoolTaskExecutor();
????????//隨便寫的一些配置
????????threadPool.setCorePoolSize(10);
????????threadPool.setMaxPoolSize(30);
????????threadPool.setWaitForTasksToCompleteOnShutdown(true);
????????threadPool.setAwaitTerminationSeconds(60?*?15);
????????return?threadPool;
????}
????@Override
????public?Executor?getAsyncExecutor()?{
????????return?asyncExecutor();
????}
????@Override
????public?AsyncUncaughtExceptionHandler?getAsyncUncaughtExceptionHandler()?{
????????return?(ex,?method,?params)?->?log.error(String.format("[async]?task{}?error:",?method),?ex);
????}
}
-
配置自定義的TaskExecutor
@Slf4j
@Service
public?class?OrderLogListener??{
????@EventListener
????@Async("asyncExecutor")
????public?void?orderLog(PlaceOrderEvent?event)?{
????????log.info("[afterPlaceOrder]?log.");
????}
}-
配置線程池
@Configuration
public?class?TaskPoolConfig?{
????@Bean(name?=?"asyncExecutor")
????public?Executor?taskExecutor()?{
????????ThreadPoolTaskExecutor?executor?=?new?ThreadPoolTaskExecutor();
????????//隨便寫的一些配置
????????executor.setCorePoolSize(10);
????????executor.setMaxPoolSize(20);
????????executor.setQueueCapacity(200);
????????executor.setKeepAliveSeconds(60);
????????executor.setThreadNamePrefix("asyncExecutor-");
????????executor.setRejectedExecutionHandler(new?ThreadPoolExecutor.CallerRunsPolicy());
????????return?executor;
????}
} -
使用@Async注解的時(shí)候,指定線程池,推薦使用這種方式,因?yàn)樵陧?xiàng)目里,盡量做到線程池隔離,不同的任務(wù)使用不同的線程池
-
異步和自定義線程池這一部分只是一些擴(kuò)展,稍微占了一些篇幅,大家可不要覺得Spring Event用起來很繁瑣。
發(fā)布事件
發(fā)布事件也非常簡單,只需要使用Spring 提供的ApplicationEventPublisher來發(fā)布自定義事件。
-
OrderServiceImpl
@Service
@Slf4j
public?class?OrderServiceImpl?implements?OrderService?{
????@Autowired
????private?ApplicationEventPublisher?applicationEventPublisher;
????/**
?????*?下單
?????*/
????@Override
????public?PlaceOrderResVO?placeOrder(PlaceOrderReqVO?reqVO)?{
????????log.info("[placeOrder]?start.");
????????PlaceOrderResVO?resVO?=?new?PlaceOrderResVO();
????????//消息
????????PlaceOrderEventMessage?eventMessage?=?new?PlaceOrderEventMessage();
????????//發(fā)布事件
????????applicationEventPublisher.publishEvent(new?PlaceOrderEvent(eventMessage));
????????log.info("[placeOrder]?end.");
????????return?resVO;
????}
}
在Idea里查看事件的監(jiān)聽者也比較方便,點(diǎn)擊下面圖中的圖標(biāo),就可以查看監(jiān)聽者。
查看監(jiān)聽者
監(jiān)聽者測試
最后,我們還是測試一下。
????@Test
????void?placeOrder()?{
????????PlaceOrderReqVO?placeOrderReqVO?=?new?PlaceOrderReqVO();
????????orderService.placeOrder(placeOrderReqVO);
????}
- 執(zhí)行結(jié)果
2022-11-08?10:05:14.415??INFO?22674?---?[???????????main]?c.f.o.event.event.OrderServiceImpl???????:?[placeOrder]?start.
2022-11-08?10:05:14.424??INFO?22674?---?[???????????main]?c.f.o.event.event.OrderServiceImpl???????:?[placeOrder]?end.
2022-11-08?10:05:14.434??INFO?22674?---?[sync-executor-3]?c.f.o.event.event.OrderNotifyListener????:?[afterPlaceOrder]?notify.
2022-11-08?10:05:14.435??INFO?22674?---?[sync-executor-2]?c.f.o.event.event.OrderMetricsListener???:?[afterPlaceOrder]?metrics
2022-11-08?10:05:14.436??INFO?22674?---?[sync-executor-1]?c.f.o.event.event.OrderLogListener???????:?[afterPlaceOrder]?log.
可以看到,異步執(zhí)行,而且用到了我們自定義的線程池。
小結(jié)這篇文章里,從最開始自己實(shí)現(xiàn)的觀察者模式,再到利用Spring簡化的觀察者模式,再到使用Spring Event實(shí)現(xiàn)發(fā)布/訂閱模式,可以看到,Spring Event用起來還是比較簡單的。除此之外,還有Guava EventBus這樣的事件驅(qū)動實(shí)現(xiàn),大家更習(xí)慣使用哪種呢?
··········? END? ··············
在看 、 點(diǎn)贊 、 轉(zhuǎn)發(fā) ,是對我最大的鼓勵 。 ? ? ? ?