SpringBoot 使用注解實(shí)現(xiàn)消息廣播功能
背景
在開(kāi)發(fā)工作中,會(huì)遇到一種場(chǎng)景,做完某一件事情以后,需要廣播一些消息或者通知,告訴其他的模塊進(jìn)行一些事件處理,一般來(lái)說(shuō),可以一個(gè)一個(gè)發(fā)送請(qǐng)求去通知,但是有一種更好的方式,那就是事件監(jiān)聽(tīng),事件監(jiān)聽(tīng)也是設(shè)計(jì)模式中發(fā)布-訂閱模式、觀察者模式的一種實(shí)現(xiàn)。
觀察者模式:簡(jiǎn)單的來(lái)講就是你在做事情的時(shí)候身邊有人在盯著你,當(dāng)你做的某一件事情是旁邊觀察的人感興趣的事情的時(shí)候,他會(huì)根據(jù)這個(gè)事情做一些其他的事,但是盯著你看的人必須要到你這里來(lái)登記,否則你無(wú)法通知到他(或者說(shuō)他沒(méi)有資格來(lái)盯著你做事情)。
對(duì)于Spring容器的一些事件,可以監(jiān)聽(tīng)并且觸發(fā)相應(yīng)的方法。通常的方法有 2 種,ApplicationListener 接口和@EventListener 注解。
簡(jiǎn)介
要想順利的創(chuàng)建監(jiān)聽(tīng)器,并起作用,這個(gè)過(guò)程中需要這樣幾個(gè)角色:
事件 (event)可以封裝和傳遞監(jiān)聽(tīng)器中要處理的參數(shù),如對(duì)象或字符串,并作為監(jiān)聽(tīng)器中監(jiān)聽(tīng)的目標(biāo)。監(jiān)聽(tīng)器 (listener)具體根據(jù)事件發(fā)生的業(yè)務(wù)處理模塊,這里可以接收處理事件中封裝的對(duì)象或字符串。事件發(fā)布者 (publisher)事件發(fā)生的觸發(fā)者。
ApplicationListener 接口
ApplicationListener接口的定義如下:

它是一個(gè)泛型接口,泛型的類(lèi)型必須是ApplicationEvent 及其子類(lèi),只要實(shí)現(xiàn)了這個(gè)接口,那么當(dāng)容器有相應(yīng)的事件觸發(fā)時(shí),就能觸發(fā) onApplicationEvent 方法。ApplicationEvent 類(lèi)的子類(lèi)有很多,Spring 框架自帶的如下幾個(gè)。

? ?簡(jiǎn)單使用
使用方法很簡(jiǎn)單,就是實(shí)現(xiàn)一個(gè) ApplicationListener 接口,并且將加入到容器中就行。
@Component
public?class?DefinitionApplicationListener?
????????????implements?ApplicationListener<ApplicationEvent>?{
????//?-----------?簡(jiǎn)單使用-實(shí)現(xiàn)ApplicationListener?接口
????@Override
????public?void?onApplicationEvent(ApplicationEvent?event)?{
????????System.out.println("事件觸發(fā):"?+?event.getClass().getName());
????}
}
啟動(dòng)項(xiàng)目
@SpringBootApplication
public?class?EngineSupportApplication?{
????public?static?void?main(String[]?args)?{
????????SpringApplication.run(EngineSupportApplication.class);
????}
}
查看日志

2021-11-12?21:04:16.114??INFO?83691?---?[???????????main]?o.s.s.concurrent.ThreadPoolTaskExecutor??:?Initializing?ExecutorService?'applicationTaskExecutor'
事件觸發(fā):org.springframework.context.event.ContextRefreshedEvent
2021-11-12?21:04:16.263??INFO?83691?---?[???????????main]?o.s.b.w.embedded.tomcat.TomcatWebServer??:?Tomcat?started?on?port(s):?8080?(http)?with?context?path?''
事件觸發(fā):org.springframework.boot.web.servlet.context.ServletWebServerInitializedEvent
2021-11-12?21:04:16.266??INFO?83691?---?[???????????main]?com.alibaba.EngineSupportApplication?????:?Started?EngineSupportApplication?in?1.975?seconds?(JVM?running?for?3.504)
事件觸發(fā):org.springframework.boot.context.event.ApplicationStartedEvent
事件觸發(fā):org.springframework.boot.context.event.ApplicationReadyEvent
自定義事件以及監(jiān)聽(tīng)
? 定義事件
public?class?DefinitionEvent?extends?ApplicationEvent?{
????public?boolean?enable;
????/**
?????*?Create?a?new?ApplicationEvent.
?????*
?????*?@param?source?the?object?on?which?the?event?initially?occurred?(never?{@code?null})
?????*?@param?enable
?????*/
????public?DefinitionEvent(Object?source,?boolean?enable)?{
????????super(source);
????????this.enable?=?enable;
????}
? 定義監(jiān)聽(tīng)器
@Component
public?class?DefinitionApplicationListener?
????????????implements?ApplicationListener<ApplicationEvent>?{
????//?-----------?簡(jiǎn)單使用-實(shí)現(xiàn)ApplicationListener?接口
????@Override
????public?void?onApplicationEvent(ApplicationEvent?event)?{
????????System.out.println("事件觸發(fā):"?+?event.getClass().getName());
????}
????//?-----------?自定義事件以及監(jiān)聽(tīng)
????@Autowired
????private?ApplicationEventPublisher?eventPublisher;
????/**
?????*?事件發(fā)布方法
?????*/
????public?void?pushListener(String?msg)?{
????????eventPublisher.publishEvent(new?DefinitionEvent(this,?false));
????}
}
@EventListener 注解
? 簡(jiǎn)單使用
除了通過(guò)實(shí)現(xiàn)接口,還可以使用@EventListener 注解,實(shí)現(xiàn)對(duì)任意的方法都能監(jiān)聽(tīng)事件。
在任意方法上標(biāo)注@EventListener 注解,指定 classes,即需要處理的事件類(lèi)型,一般就是 ApplicationEvent及其子類(lèi),可以設(shè)置多項(xiàng)。
@Component
public?class?DefinitionAnnotationEventListener?{
????@EventListener(classes?=?{DefinitionEvent.class})
????public?void?listen(DefinitionEvent?event)?{
????????System.out.println("注解監(jiān)聽(tīng)器:"?+?event.getClass().getName());
????}
}
此時(shí),就可以有一個(gè)發(fā)布,兩個(gè)監(jiān)聽(tīng)器監(jiān)聽(tīng)到發(fā)布的消息了,一個(gè)是注解方式,一個(gè)是非注解方式
結(jié)果:
注解監(jiān)聽(tīng)器:com.alibaba.spring.context.event.DefinitionEvent
事件觸發(fā):com.alibaba.spring.context.event.DefinitionEvent
原理
其實(shí)上面添加@EventListener注解的方法被包裝成了ApplicationListener對(duì)象,上面的類(lèi)似于下面這種寫(xiě)法,這個(gè)應(yīng)該比較好理解。
@Component
public?class?DefinitionAnnotationEventListener?
??????????????implements?ApplicationListener<DefinitionEvent>?{
????
????@Override
????public?void?onApplicationEvent(DefinitionEvent?event)?{
?????????System.out.println("注解監(jiān)聽(tīng)器:"?+?event.getMsg());
????}
}
查看SpringBoot的源碼,找到下面的代碼,因?yàn)槲沂荰omcat環(huán)境,這里創(chuàng)建的ApplicationContext是org.springframework.bootweb.servlet.context.AnnotationConfigServletWebServerApplicationContext
protected?ConfigurableApplicationContext?createApplicationContext()?{
????????Class>?contextClass?=?this.applicationContextClass;
????????if?(contextClass?==?null)?{
????????????try?{
????????????????switch?(this.webApplicationType)?{
????????????????case?SERVLET:
????????????????????contextClass?=?Class.forName(DEFAULT_SERVLET_WEB_CONTEXT_CLASS);
????????????????????break;
????????????????case?REACTIVE:
????????????????????contextClass?=?Class.forName(DEFAULT_REACTIVE_WEB_CONTEXT_CLASS);
????????????????????break;
????????????????default:
????????????????????contextClass?=?Class.forName(DEFAULT_CONTEXT_CLASS);
????????????????}
????????????}
????????????catch?(ClassNotFoundException?ex)?{
????????????????throw?new?IllegalStateException(
????????????????????????"Unable?create?a?default?ApplicationContext,?"?+?"please?specify?an?ApplicationContextClass",
????????????????????????ex);
????????????}
????????}
????????return?(ConfigurableApplicationContext)?BeanUtils.instantiateClass(contextClass);
????}
構(gòu)造方法如下
public?AnnotationConfigApplicationContext()?{
??this.reader?=?new?AnnotatedBeanDefinitionReader(this);
??this.scanner?=?new?ClassPathBeanDefinitionScanner(this);
}
進(jìn)入AnnotatedBeanDefinitionReader里面
/**
??*?Create?a?new?{@code?AnnotatedBeanDefinitionReader}?for?the?given?registry,
??*?using?the?given?{@link?Environment}.
??*?@param?registry?the?{@code?BeanFactory}?to?load?bean?definitions?into,
??*?in?the?form?of?a?{@code?BeanDefinitionRegistry}
??*?@param?environment?the?{@code?Environment}?to?use?when?evaluating?bean?definition
??*?profiles.
??*?@since?3.1
??*/
?public?AnnotatedBeanDefinitionReader(BeanDefinitionRegistry?registry,?Environment?environment)?{
??Assert.notNull(registry,?"BeanDefinitionRegistry?must?not?be?null");
??Assert.notNull(environment,?"Environment?must?not?be?null");
??this.registry?=?registry;
??this.conditionEvaluator?=?new?ConditionEvaluator(registry,?environment,?null);
??AnnotationConfigUtils.registerAnnotationConfigProcessors(this.registry);
?}
再進(jìn)到AnnotationConfigUtils的方法里面,省略了一部分代碼,可以看到他注冊(cè)了一個(gè)EventListenerMethodProcessor類(lèi)到工廠了。這是一個(gè)BeanFactory的后置處理器。
public?static?Set?registerAnnotationConfigProcessors(
????????????BeanDefinitionRegistry?registry,?@Nullable?Object?source)? {
????????DefaultListableBeanFactory?beanFactory?=?unwrapDefaultListableBeanFactory(registry);
????......
????.....
????......????
????if?(!registry.containsBeanDefinition(EVENT_LISTENER_PROCESSOR_BEAN_NAME))?{
????????????RootBeanDefinition?def?=?new?RootBeanDefinition(EventListenerMethodProcessor.class);
????????????def.setSource(source);
????????????beanDefs.add(registerPostProcessor(registry,?def,?EVENT_LISTENER_PROCESSOR_BEAN_NAME));
????????}
????
????......
????......
????????return?beanDefs;
????}
查看這個(gè)BeanFactory的后置處理器EventListenerMethodProcessor,下面方法,他會(huì)遍歷所有bean,找到其中帶有@EventListener的方法,將它包裝成ApplicationListenerMethodAdapter,注冊(cè)到工廠里,這樣就成功注冊(cè)到Spring的監(jiān)聽(tīng)系統(tǒng)里了。
????@Override
????public?void?afterSingletonsInstantiated()?{
????????ConfigurableListableBeanFactory?beanFactory?=?this.beanFactory;
????????Assert.state(this.beanFactory?!=?null,?"No?ConfigurableListableBeanFactory?set");
????????String[]?beanNames?=?beanFactory.getBeanNamesForType(Object.class);
????????for?(String?beanName?:?beanNames)?{
????????????if?(!ScopedProxyUtils.isScopedTarget(beanName))?{
????????????????Class>?type?=?null;
????????????????try?{
????????????????????type?=?AutoProxyUtils.determineTargetClass(beanFactory,?beanName);
????????????????}
????????????????catch?(Throwable?ex)?{
????????????????????//?An?unresolvable?bean?type,?probably?from?a?lazy?bean?-?let's?ignore?it.
????????????????????if?(logger.isDebugEnabled())?{
????????????????????????logger.debug("Could?not?resolve?target?class?for?bean?with?name?'"?+?beanName?+?"'",?ex);
????????????????????}
????????????????}
????????????????if?(type?!=?null)?{
????????????????????if?(ScopedObject.class.isAssignableFrom(type))?{
????????????????????????try?{
????????????????????????????Class>?targetClass?=?AutoProxyUtils.determineTargetClass(
????????????????????????????????????beanFactory,?ScopedProxyUtils.getTargetBeanName(beanName));
????????????????????????????if?(targetClass?!=?null)?{
????????????????????????????????type?=?targetClass;
????????????????????????????}
????????????????????????}
????????????????????????catch?(Throwable?ex)?{
????????????????????????????//?An?invalid?scoped?proxy?arrangement?-?let's?ignore?it.
????????????????????????????if?(logger.isDebugEnabled())?{
????????????????????????????????logger.debug("Could?not?resolve?target?bean?for?scoped?proxy?'"?+?beanName?+?"'",?ex);
????????????????????????????}
????????????????????????}
????????????????????}
????????????????????try?{
????????????????????????processBean(beanName,?type);
????????????????????}
????????????????????catch?(Throwable?ex)?{
????????????????????????throw?new?BeanInitializationException("Failed?to?process?@EventListener?"?+
????????????????????????????????"annotation?on?bean?with?name?'"?+?beanName?+?"'",?ex);
????????????????????}
????????????????}
????????????}
????????}
????}
private?void?processBean(final?String?beanName,?final?Class>?targetType)?{
????????if?(!this.nonAnnotatedClasses.contains(targetType)?&&
????????????????!targetType.getName().startsWith("java")?&&
????????????????!isSpringContainerClass(targetType))?{
????????????Map?annotatedMethods?=?null;
????????????try?{
????????????????annotatedMethods?=?MethodIntrospector.selectMethods(targetType,
????????????????????????(MethodIntrospector.MetadataLookup)?method?->
????????????????????????????????AnnotatedElementUtils.findMergedAnnotation(method,?EventListener.class));
????????????}
????????????catch?(Throwable?ex)?{
????????????????//?An?unresolvable?type?in?a?method?signature,?probably?from?a?lazy?bean?-?let's?ignore?it.
????????????????if?(logger.isDebugEnabled())?{
????????????????????logger.debug("Could?not?resolve?methods?for?bean?with?name?'"?+?beanName?+?"'",?ex);
????????????????}
????????????}
????????????if?(CollectionUtils.isEmpty(annotatedMethods))?{
????????????????this.nonAnnotatedClasses.add(targetType);
????????????????if?(logger.isTraceEnabled())?{
????????????????????logger.trace("No?@EventListener?annotations?found?on?bean?class:?"?+?targetType.getName());
????????????????}
????????????}
????????????else?{
????????????????//?Non-empty?set?of?methods
????????????????ConfigurableApplicationContext?context?=?this.applicationContext;
????????????????Assert.state(context?!=?null,?"No?ApplicationContext?set");
????????????????List?factories?=?this.eventListenerFactories;
????????????????Assert.state(factories?!=?null,?"EventListenerFactory?List?not?initialized");
????????????????for?(Method?method?:?annotatedMethods.keySet())?{
????????????????????for?(EventListenerFactory?factory?:?factories)?{
????????????????????????if?(factory.supportsMethod(method))?{
????????????????????????????Method?methodToUse?=?AopUtils.selectInvocableMethod(method,?context.getType(beanName));
????????????????????????????ApplicationListener>?applicationListener?=
????????????????????????????????????factory.createApplicationListener(beanName,?targetType,?methodToUse);
????????????????????????????if?(applicationListener?instanceof?ApplicationListenerMethodAdapter)?{
????????????????????????????????((ApplicationListenerMethodAdapter)?applicationListener).init(context,?this.evaluator);
????????????????????????????}
????????????????????????????context.addApplicationListener(applicationListener);
????????????????????????????break;
????????????????????????}
????????????????????}
????????????????}
????????????????if?(logger.isDebugEnabled())?{
????????????????????logger.debug(annotatedMethods.size()?+?"?@EventListener?methods?processed?on?bean?'"?+
????????????????????????????beanName?+?"':?"?+?annotatedMethods);
????????????????}
????????????}
????????}
????}
由方法生成Listener的邏輯由EventListenerFactory完成的,這又分為兩種,一種是普通的@EventLintener另一種是@TransactionalEventListener,是由兩個(gè)工廠處理的。
總結(jié)
上面介紹了@EventListener的原理,其實(shí)上面方法里還有一個(gè)@TransactionalEventListener注解,其實(shí)原理是一模一樣的,只是這個(gè)監(jiān)聽(tīng)者可以選擇在事務(wù)完成后才會(huì)被執(zhí)行,事務(wù)執(zhí)行失敗就不會(huì)被執(zhí)行。
這兩個(gè)注解的邏輯是一模一樣的,并且@TransactionalEventListener本身就被標(biāo)記有@EventListener,只是最后生成監(jiān)聽(tīng)器時(shí)所用的工廠不一樣而已。

往期推薦

MyBatis 中為什么不建議使用 where 1=1?

MyBatis原生批量插入的坑與解決方案!

聊聊sql優(yōu)化的15個(gè)小技巧
