<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          通過(guò)Nacos配置刷新進(jìn)行RabbitMQ消費(fèi)者在線(xiàn)啟停

          共 21163字,需瀏覽 43分鐘

           ·

          2023-05-23 06:26

          前提

          公司在做一些金融相關(guān)業(yè)務(wù),某些時(shí)候由于數(shù)據(jù)提供商定期維護(hù)或者特殊原因需要暫停某些服務(wù)的消費(fèi)者。之前選用的消息隊(duì)列技術(shù)棧是RabbitMQ,用于微服務(wù)之間的消息投遞,對(duì)于這類(lèi)需要暫停消費(fèi)者的場(chǎng)景是選用注釋掉消費(fèi)者Bean中的相應(yīng)Spring(Boot)注解重新發(fā)布來(lái)實(shí)現(xiàn),后面需要重新啟動(dòng)消費(fèi)就是解開(kāi)對(duì)應(yīng)的注釋再發(fā)布一次。這樣的處理流程既繁瑣,也顯得沒(méi)有技術(shù)含量,所以筆者就這個(gè)問(wèn)題結(jié)合已有的配置中心Nacos集群做了一個(gè)方案,使用Nacos的配置準(zhǔn)實(shí)時(shí)刷新功能去控制某個(gè)微服務(wù)實(shí)例的所有RabbitMQ消費(fèi)者(容器)的停止和啟動(dòng)。

          a78c7d5b96d63967ec0137ef5c56c183.webpspring-boot-rabbit-nacos-control-1

          方案原理

          下面探討一下方案的原理和可行性,主要包括:

          • RabbitMQ消費(fèi)者生命周期管理
          • Nacos長(zhǎng)輪詢(xún)與配置刷新

          因?yàn)楣ぷ髦械闹饕夹g(shù)棧是SpringBoot + RabbitMQ,下文是探討場(chǎng)景針對(duì)spring-boot-starter-amqp(下面簡(jiǎn)稱(chēng)amqp)展開(kāi)。

          ?

          使用SpringBoot版本為2.3.0.RELEASE,spring-cloud-alibaba-nacos-config的版本為2.2.0.RELEASE

          ?

          RabbitMQ消費(fèi)者生命周期管理

          查看RabbitAnnotationDrivenConfiguration的源碼:

          1be5c63795f376e47177ce26f4e882d0.webpspring-boot-rabbit-nacos-control-2

          amqp中默認(rèn)啟用spring.rabbitmq.listener.type=simple,使用的RabbitListenerContainerFactory(消息監(jiān)聽(tīng)器容器工廠)實(shí)現(xiàn)為SimpleRabbitListenerContainerFactory,使用的MessageListenerContainer(消息監(jiān)聽(tīng)器容器)實(shí)現(xiàn)為SimpleMessageListenerContainer。在amqp中,無(wú)論注解聲明式或者編程式注冊(cè)的消費(fèi)者最終都會(huì)封裝為MessageListenerContainer實(shí)例,因此消費(fèi)者生命周期可以直接通過(guò)MessageListenerContainer進(jìn)行管理,MessageListenerContainer的生命周期管理API會(huì)直接作用于最底層的真實(shí)消費(fèi)者實(shí)現(xiàn)BlockingQueueConsumer。幾者的關(guān)系如下:

          5dfab887eb954c9db79ed8b8ec3c0300.webpspring-boot-rabbit-nacos-control-3

          一般聲明式消費(fèi)者注冊(cè)方式如下:

                
                @Slf4j
          @RabbitListener(id?=?"SingleAnnoMethodDemoConsumer",?queues?=?"srd->srd.demo")
          @Component
          public?class?SingleAnnoMethodDemoConsumer?{

          ????@RabbitHandler
          ????public?void?onMessage(Message?message)?{
          ????????log.info("SingleAnnoMethodDemoConsumer.onMessage?=>?{}",?new?String(message.getBody(),?StandardCharsets.UTF_8));
          ????}
          }

          @RabbitListener(id?=?"MultiAnnoMethodDemoConsumer",?queues?=?"srd->srd.demo")
          @Component
          @Slf4j
          public?class?MultiAnnoMethodDemoConsumer?{

          ????@RabbitHandler
          ????public?void?firstOnMessage(Message?message)?{
          ????????log.info("MultiAnnoMethodDemoConsumer.firstOnMessage?=>?{}",?new?String(message.getBody(),?StandardCharsets.UTF_8));
          ????}

          ????@RabbitHandler
          ????public?void?secondOnMessage(Message?message)?{
          ????????log.info("MultiAnnoMethodDemoConsumer.secondOnMessage?=>?{}",?new?String(message.getBody(),?StandardCharsets.UTF_8));
          ????}
          }

          @Component
          @Slf4j
          public?class?MultiAnnoInstanceDemoConsumer?{

          ????@RabbitListener(id?=?"MultiAnnoInstanceDemoConsumer-firstOnInstanceMessage",?queues?=?"srd->srd.demo")
          ????public?void?firstOnInstanceMessage(Message?message)?{
          ????????log.info("MultiAnnoInstanceDemoConsumer.firstOnInstanceMessage?=>?{}",?new?String(message.getBody(),?StandardCharsets.UTF_8));
          ????}

          ????@RabbitListener(id?=?"MultiAnnoInstanceDemoConsumer-secondOnInstanceMessage",?queues?=?"srd->srd.sec")
          ????public?void?secondOnInstanceMessage(Message?message)?{
          ????????log.info("MultiAnnoInstanceDemoConsumer.secondOnInstanceMessage?=>?{}",?new?String(message.getBody(),?StandardCharsets.UTF_8));
          ????}
          }

          對(duì)于基于@RabbitListener進(jìn)行聲明式注冊(cè)的消費(fèi)者,每個(gè)被@RabbitListener修飾的Bean或者方法最終都會(huì)單獨(dú)生成一個(gè)SimpleMessageListenerContainer實(shí)例,這些SimpleMessageListenerContainer實(shí)例的唯一標(biāo)識(shí)由@RabbitListenerid屬性指定,缺省值為org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#N,建議在使用時(shí)候通過(guò)規(guī)范約束必須定義此id屬性。分析源碼可以得知這類(lèi)型的消費(fèi)者通過(guò)RabbitListenerAnnotationBeanPostProcessor進(jìn)行發(fā)現(xiàn)和自動(dòng)注冊(cè),并且在RabbitListenerEndpointRegistry緩存了注冊(cè)信息,因此可以通過(guò)RabbitListenerEndpointRegistry直接獲取這些聲明式的消費(fèi)者容器實(shí)例:

                
                RabbitListenerEndpointRegistry?endpointRegistry?=?configurableListableBeanFactory.getBean(
          ????????????????RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
          ????????????????RabbitListenerEndpointRegistry.class);
          Set<String>?listenerContainerIds?=?endpointRegistry.getListenerContainerIds();
          for?(String?containerId?:?listenerContainerIds)?{
          ????MessageListenerContainer?messageListenerContainer?=?endpointRegistry.getListenerContainer(containerId);
          ????//?do?something?with?messageListenerContainer
          }

          一般編程式消費(fèi)者注冊(cè)方式如下:

                
                //?MessageListenerDemoConsumer
          @Component
          @Slf4j
          public?class?MessageListenerDemoConsumer?implements?MessageListener?{

          ????@Override
          ????public?void?onMessage(Message?message)?{
          ????????log.info("MessageListenerDemoConsumer.onMessage?=>?{}",?new?String(message.getBody(),?StandardCharsets.UTF_8));
          ????}
          }

          //?CustomMethodDemoConsumer
          @Component
          @Slf4j
          public?class?CustomMethodDemoConsumer?{

          ????public?void?customOnMessage(Message?message)?{
          ????????log.info("CustomMethodDemoConsumer.customOnMessage?=>?{}",?new?String(message.getBody(),?StandardCharsets.UTF_8));
          ????}
          }

          //?configuration?class
          //?通過(guò)現(xiàn)存的MessageListener實(shí)例進(jìn)行消費(fèi)
          @Bean
          public?SimpleMessageListenerContainer?messageListenerDemoConsumerContainer(
          ????????ConnectionFactory?connectionFactory,
          ????????@Qualifier("messageListenerDemoConsumer")
          ?MessageListener?messageListener)?
          {
          ????SimpleMessageListenerContainer?container?=?new?SimpleMessageListenerContainer();
          ????container.setListenerId("MessageListenerDemoConsumer");
          ????container.setConnectionFactory(connectionFactory);
          ????container.setConcurrentConsumers(1);
          ????container.setMaxConcurrentConsumers(1);
          ????container.setQueueNames("srd->srd.demo");
          ????container.setAcknowledgeMode(AcknowledgeMode.AUTO);
          ????container.setPrefetchCount(10);
          ????container.setAutoStartup(true);
          ????container.setMessageListener(messageListener);
          ????return?container;
          }

          //?通過(guò)IOC容器中某個(gè)Bean的具體方法進(jìn)行消費(fèi)
          @Bean
          public?SimpleMessageListenerContainer?customMethodDemoConsumerContainer(
          ????????ConnectionFactory?connectionFactory,
          ????????CustomMethodDemoConsumer?customMethodDemoConsumer)
          ?
          {
          ????SimpleMessageListenerContainer?container?=?new?SimpleMessageListenerContainer();
          ????container.setListenerId("CustomMethodDemoConsumer");
          ????container.setConnectionFactory(connectionFactory);
          ????container.setConcurrentConsumers(1);
          ????container.setMaxConcurrentConsumers(1);
          ????container.setQueueNames("srd->srd.demo");
          ????container.setAcknowledgeMode(AcknowledgeMode.AUTO);
          ????container.setPrefetchCount(10);
          ????container.setAutoStartup(true);
          ????MessageListenerAdapter?messageListenerAdapter?=?new?MessageListenerAdapter();
          ????messageListenerAdapter.setDelegate(customMethodDemoConsumer);
          ????messageListenerAdapter.setDefaultListenerMethod("customOnMessage");
          ????container.setMessageListener(messageListenerAdapter);
          ????return?container;
          }

          編程式注冊(cè)的SimpleMessageListenerContainer可以直接從IOC容器中獲?。?/p>

                
                Map<String,?MessageListenerContainer>?messageListenerContainerBeans
          ????????=?configurableListableBeanFactory.getBeansOfType(MessageListenerContainer.class);
          if?(!CollectionUtils.isEmpty(messageListenerContainerBeans))?{
          ????messageListenerContainerBeans.forEach((beanId,?messageListenerContainer)?->?{
          ????????//?do?something?with?messageListenerContainer
          ????});
          }

          至此,我們知道可以比較輕松地拿到服務(wù)中所有的MessageListenerContainer的實(shí)例,從而可以管理服務(wù)內(nèi)所有消費(fèi)者的生命周期。

          Nacos長(zhǎng)輪詢(xún)與配置刷新

          Nacos的客戶(hù)端通過(guò)LongPolling(長(zhǎng)輪詢(xún))的方式監(jiān)聽(tīng)Nacos服務(wù)端集群對(duì)應(yīng)dataIdgroup的配置數(shù)據(jù)變更,具體可以參考ClientWorker的源碼實(shí)現(xiàn),實(shí)現(xiàn)的過(guò)程大致如下:

          db41075f9d32dff28874ff87c24292d4.webpspring-boot-rabbit-nacos-control-4

          在非Spring(Boot)體系中,可以通過(guò)ConfigService#addListener()進(jìn)行配置變更監(jiān)聽(tīng),示例代碼如下:

                
                Properties?properties?=?new?Properties();
          properties.put(PropertyKeyConst.SERVER_ADDR,?"127.0.0.1:8848");
          properties.put(PropertyKeyConst.NAMESPACE,?"LOCAL");
          ConfigService?configService?=?NacosFactory.createConfigService(properties);
          Executor?executor?=?Executors.newSingleThreadExecutor(runnable?->?{
          ????Thread?thread?=?new?Thread(runnable);
          ????thread.setDaemon(true);
          ????thread.setName("NacosConfigSyncWorker");
          ????return?thread;
          });
          configService.addListener("application-aplha.properties",?"customer-service",?new?Listener()?{
          ????@Override
          ????public?Executor?getExecutor()?{
          ????????return?executor;
          ????}

          ????@Override
          ????public?void?receiveConfigInfo(String?configInfo)?{
          ????????????//?do?something?with?'configInfo'
          ????}
          });

          這種LongPolling的方式目前來(lái)看可靠性是比較高,因?yàn)?code style="background-color:rgba(27,31,35,.05);font-family:'Operator Mono', Consolas, Monaco, Menlo, monospace;">Nacos服務(wù)端集群一般在生產(chǎn)部署是大于3的奇數(shù)個(gè)實(shí)例節(jié)點(diǎn),并且底層基于raft共識(shí)算法實(shí)現(xiàn)集群通訊,只要不是同一時(shí)間超過(guò)半數(shù)節(jié)點(diǎn)宕機(jī)集群還是能正常提供服務(wù)。但是從實(shí)現(xiàn)上來(lái)看會(huì)有一些局限性:

          • 如果注冊(cè)過(guò)多的配置變更監(jiān)聽(tīng)器有可能會(huì)對(duì)Nacos服務(wù)端造成比較大的壓力,畢竟是多個(gè)客戶(hù)端進(jìn)行輪詢(xún)
          • 配置變更是由Nacos客戶(hù)端向Nacos服務(wù)端發(fā)起請(qǐng)求,因此監(jiān)聽(tīng)器回調(diào)有可能不是實(shí)時(shí)的(有可能延遲到客戶(hù)端下一輪的LongPolling提交)
          • Nacos客戶(hù)端會(huì)緩存每次從Nacos服務(wù)端拉取的配置內(nèi)容,如果要變更配置文件過(guò)大有可能導(dǎo)致緩存的數(shù)據(jù)占用大量?jī)?nèi)存,影響客戶(hù)端所在服務(wù)的性能
          ?

          關(guān)于配置變更監(jiān)聽(tīng)其實(shí)有其他候選的方案,例如Redis的發(fā)布訂閱,Zookeeper的節(jié)點(diǎn)路徑變更監(jiān)聽(tīng)甚至是使用消息隊(duì)列進(jìn)行通知,本文使用Nacos配置變更監(jiān)聽(tīng)的原因是更好的劃分不同應(yīng)用配置文件的編輯查看權(quán)限方便進(jìn)行管理,其他候選方案要實(shí)現(xiàn)分權(quán)限管理需要二次開(kāi)發(fā)

          ?

          使用SpringCloudAlibaba提供的spring-cloud-alibaba-nacos-config可以更加簡(jiǎn)便地使用Nacos配置刷新監(jiān)聽(tīng),并且會(huì)把變更的PropertySource重新綁定到對(duì)應(yīng)的配置屬性Bean。引入依賴(lài):

                
                <dependency>
          ????<groupId>com.alibaba.cloud</groupId>
          ????<artifactId>spring-cloud-alibaba-nacos-config</artifactId>
          </dependency>
          <dependency>
          ????<groupId>com.alibaba.nacos</groupId>
          ????<artifactId>nacos-client</artifactId>
          </dependency>

          具體的配置類(lèi)是NacosConfigProperties

          5b617515028e9e561354ea013bb73d07.webpspring-boot-rabbit-nacos-control-5

          紅圈中是需要關(guān)注的配置項(xiàng),refreshEnabled是配置刷新的開(kāi)關(guān),默認(rèn)是開(kāi)啟的。sharedConfigsextensionConfigs雖然命名不同,但是兩者實(shí)現(xiàn)和功能沒(méi)有差異,都是類(lèi)似于共享或者說(shuō)擴(kuò)展配置,每個(gè)共享(擴(kuò)展)配置支持單獨(dú)配置刷新開(kāi)關(guān)。舉個(gè)例子,在Nacos服務(wù)端的某個(gè)配置如下圖:

          5b296f5a6badf2f5b77e2b81747f39d5.webpspring-boot-rabbit-nacos-control-6

          為了支持配置變更和對(duì)應(yīng)的實(shí)體類(lèi)成員變量更新,對(duì)應(yīng)客戶(hù)端的配置文件是這樣的:

                
                spring.cloud.nacos.config.refresh-enabled=true
          spring.cloud.nacos.config.shared-configs[0].data-id=shared.properties
          spring.cloud.nacos.config.shared-configs[0].group=shared-conf
          spring.cloud.nacos.config.shared-configs[0].refresh=true

          對(duì)應(yīng)的配置屬性Bean如下:

                
                @Data
          @ConfigurationProperties(prefix?=?"shared")
          public?class?SharedProperties?{

          ????private?String?foo;?
          }

          只要客戶(hù)端所在SpringBoot服務(wù)啟動(dòng)完成后,修改Nacos服務(wù)端對(duì)應(yīng)dataIdshared.propertiesshared.foo屬性值,那邊SharedPropertiesfoo屬性就會(huì)準(zhǔn)實(shí)時(shí)刷新??梢栽?code style="background-color:rgba(27,31,35,.05);font-family:'Operator Mono', Consolas, Monaco, Menlo, monospace;">SharedProperties添加一個(gè)@PostConstruct來(lái)觀察這個(gè)屬性更新的過(guò)程:

                
                @Slf4j
          @Data
          @ConfigurationProperties(prefix?=?"shared")
          public?class?SharedProperties?{

          ????private?final?AtomicBoolean?firstInit?=?new?AtomicBoolean();

          ????private?String?foo;

          ????@PostConstruct
          ????public?void?postConstruct()?{
          ????????if?(!firstInit.compareAndSet(false,?true))?{
          ????????????log.info("SharedProperties?refresh...");
          ????????}?else?{
          ????????????log.info("SharedProperties?first?init...");
          ????????}
          ????}
          }

          方案實(shí)施

          整個(gè)方案實(shí)施包括下面幾步:

          • 配置變更通知與配置類(lèi)刷新
          • 發(fā)現(xiàn)所有消費(fèi)者容器
          • 管理消費(fèi)者容器生命周期

          初始化一個(gè)Maven項(xiàng)目,引入下面的依賴(lài):

          • org.projectlombok:lombok:1.18.12
          • org.springframework.boot:spring-boot-starter-web:2.3.0.RELEASE
          • org.springframework.boot:spring-boot-starter-amqp:2.3.0.RELEASE
          • com.alibaba.cloud:spring-cloud-alibaba-nacos-config:2.2.0.RELEASE
          • com.alibaba.nacos:nacos-client:1.4.4

          下載Nacos服務(wù)并且啟動(dòng)一個(gè)單機(jī)實(shí)例(當(dāng)前2023-02的最新穩(wěn)定版為2.2.0),新建命名空間LOCAL并且添加四份配置文件:

          be1f984b12039fd74aae05c2f6f82d46.webpspring-boot-rabbit-nacos-control-7
          ?

          可以使用1.x的Nacos客戶(hù)端去連接2.x的Nacos服務(wù)端,這個(gè)是Nacos做的向下兼容,反過(guò)來(lái)不行

          ?

          前文提到的Nacos客戶(hù)端中,ConfigService是通過(guò)dataIdgroup定位到具體的配置文件,一般dataId按照配置文件的內(nèi)容命名,對(duì)于SpringBoot的應(yīng)用配置文件一般命名為application-${profile}.[properties,yml],group是配置文件的分組,對(duì)于SpringBoot的應(yīng)用配置文件一般命名為${spring.application.name}。筆者在在這份SpringBoot的應(yīng)用配置文件中只添加了RabbitMQ的配置:

          3532125311f34a390051603082bb892d.webpspring-boot-rabbit-nacos-control-8

          確保本地或者遠(yuǎn)程有一個(gè)可用的RabbitMQ服務(wù),接下來(lái)往下開(kāi)始實(shí)施方案。

          配置變更通知與配置類(lèi)刷新

          前面已經(jīng)提到過(guò)SpringBoot結(jié)合Nacos進(jìn)行配置屬性Bean的成員變量刷新,在項(xiàng)目的Classpathresources文件夾)添加bootstrap.properties文件,內(nèi)容如下:

                
                spring.application.name=rabbitmq-rocketmq-demo
          spring.profiles.active=default
          # nacos配置
          spring.cloud.nacos.config.enabled=true
          spring.cloud.nacos.config.server-addr=127.0.0.1:8848
          spring.cloud.nacos.config.namespace=LOCAL
          spring.cloud.nacos.config.group=rabbitmq-rocketmq-demo
          spring.cloud.nacos.config.prefix=application
          spring.cloud.nacos.config.file-extension=properties
          spring.cloud.nacos.config.refresh-enabled=true
          spring.cloud.nacos.config.shared-configs[0].data-id=shared.properties
          spring.cloud.nacos.config.shared-configs[0].group=shared-conf
          spring.cloud.nacos.config.shared-configs[0].refresh=true
          spring.cloud.nacos.config.extension-configs[0].data-id=extension.properties
          spring.cloud.nacos.config.extension-configs[0].group=extension-conf
          spring.cloud.nacos.config.extension-configs[0].refresh=true
          spring.cloud.nacos.config.extension-configs[1].data-id=rabbitmq-toggle.properties
          spring.cloud.nacos.config.extension-configs[1].group=rabbitmq-rocketmq-demo
          spring.cloud.nacos.config.extension-configs[1].refresh=true

          這里profile定義為default也就是會(huì)關(guān)聯(lián)到NacosdataId = 'application.properties', group = 'rabbitmq-rocketmq-demo'那份配置文件,主要是用于定義amqp需要的配置屬性。對(duì)于RabbitMQ消費(fèi)者的開(kāi)關(guān),定義在dataId = 'rabbitmq-toggle.properties', group = 'rabbitmq-rocketmq-demo'的文件中。添加RabbitmqToggleProperties

                
                //?RabbitmqToggleProperties
          @Slf4j
          @Data
          @ConfigurationProperties(prefix?=?"rabbitmq.toggle")
          public?class?RabbitmqToggleProperties?{

          ????private?final?AtomicBoolean?firstInit?=?new?AtomicBoolean();

          ????private?List<RabbitmqConsumer>?consumers;

          ????@PostConstruct
          ????public?void?postConstruct()?{
          ????????if?(!firstInit.compareAndSet(false,?true))?{
          ????????????StaticEventPublisher.publishEvent(new?RabbitmqToggleRefreshEvent(this));
          ????????????log.info("RabbitmqToggleProperties?refresh,?publish?RabbitmqToggleRefreshEvent...");
          ????????}?else?{
          ????????????log.info("RabbitmqToggleProperties?first?init...");
          ????????}
          ????}

          ????@Data
          ????public?static?class?RabbitmqConsumer?{

          ????????private?String?listenerId;

          ????????private?Integer?concurrentConsumers;

          ????????private?Integer?maxConcurrentConsumers;

          ????????private?Boolean?enable;
          ????}
          }

          //?RabbitmqToggleRefreshEvent
          @Getter
          public?class?RabbitmqToggleRefreshEvent?extends?ApplicationEvent?{

          ????private?final?RabbitmqToggleProperties?rabbitmqToggleProperties;

          ????public?RabbitmqToggleRefreshEvent(RabbitmqToggleProperties?rabbitmqToggleProperties)?{
          ????????super("RabbitmqToggleRefreshEvent");
          ????????this.rabbitmqToggleProperties?=?rabbitmqToggleProperties;
          ????}
          }

          //?StaticEventPublisher
          public?class?StaticEventPublisher?{

          ????private?static?ApplicationEventPublisher?PUBLISHER?=?null;

          ????public?static?void?publishEvent(ApplicationEvent?applicationEvent)?{
          ????????if?(Objects.nonNull(PUBLISHER))?{
          ????????????PUBLISHER.publishEvent(applicationEvent);
          ????????}
          ????}

          ????public?static?void?attachApplicationEventPublisher(ApplicationEventPublisher?publisher)?{
          ????????PUBLISHER?=?publisher;
          ????}
          }

          這里prefix定義為rabbitmq.toggle,為了和rabbitmq-toggle.properties的屬性一一綁定,該文件中的配置Key必須以rabbitmq.toggle為前綴。RabbitmqToggleProperties首次回調(diào)@PostConstruct方法只打印初始化日志,再次回調(diào)@PostConstruct方法則發(fā)布RabbitmqToggleRefreshEvent事件,用于后面通知對(duì)應(yīng)的消費(fèi)者容器Bean進(jìn)行啟停。

          發(fā)現(xiàn)所有消費(fèi)者容器

          為了統(tǒng)一管理服務(wù)中所有消費(fèi)者容器Bean,需要定義一個(gè)類(lèi)似于消費(fèi)者容器注冊(cè)或者緩存中心類(lèi),緩存Key可以考慮使用listenerId,Value就直接使用MessageListenerContainer實(shí)例即可:

                
                private?final?ConcurrentMap<String,?MessageListenerContainer>?containerCache?=?Maps.newConcurrentMap();
          ?

          這里既然選定了listenerId作為緩存的Key,那么必須定義好規(guī)范,要求無(wú)論注解聲明式定義的消費(fèi)者還是編程式定義的消費(fèi)者,必須明確指定具體意義的listenerId,否則到時(shí)候存在Key的格式為org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#N會(huì)比較混亂

          ?

          接下來(lái)發(fā)現(xiàn)和緩存所有消費(fèi)者容器:

                
                private?ConfigurableListableBeanFactory?configurableListableBeanFactory;

          private?ApplicationEventPublisher?applicationEventPublisher;

          //?----------------------------------------------------------------------

          //?獲取聲明式消費(fèi)者容器
          RabbitListenerEndpointRegistry?endpointRegistry?=?configurableListableBeanFactory.getBean(
          ????????RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
          ????????RabbitListenerEndpointRegistry.class);
          Set<String>?listenerContainerIds?=?endpointRegistry.getListenerContainerIds();
          for?(String?containerId?:?listenerContainerIds)?{
          ????MessageListenerContainer?messageListenerContainer?=?endpointRegistry.getListenerContainer(containerId);
          ????containerCache.putIfAbsent(containerId,?messageListenerContainer);
          }
          //?獲取編程式消費(fèi)者容器
          Map<String,?MessageListenerContainer>?messageListenerContainerBeans
          ????????=?configurableListableBeanFactory.getBeansOfType(MessageListenerContainer.class);
          if?(!CollectionUtils.isEmpty(messageListenerContainerBeans))?{
          ????messageListenerContainerBeans.forEach((beanId,?bean)?->?{
          ????????if?(bean?instanceof?AbstractMessageListenerContainer)?{
          ????????????AbstractMessageListenerContainer?abstractMessageListenerContainer?=?(AbstractMessageListenerContainer)?bean;
          ????????????String?listenerId?=?abstractMessageListenerContainer.getListenerId();
          ????????????if?(StringUtils.hasLength(listenerId))?{
          ????????????????containerCache.putIfAbsent(listenerId,?abstractMessageListenerContainer);
          ????????????}?else?{
          ????????????????containerCache.putIfAbsent(beanId,?bean);
          ????????????}
          ????????}?else?{
          ????????????containerCache.putIfAbsent(beanId,?bean);
          ????????}
          ????});
          }
          Set<String>?listenerIds?=?containerCache.keySet();
          listenerIds.forEach(listenerId?->?log.info("Cache?message?listener?container?=>?{}",?listenerId));
          //?所有消費(fèi)者容器Bean發(fā)現(xiàn)完成后才接收刷新事件
          StaticEventPublisher.attachApplicationEventPublisher(this.applicationEventPublisher);

          StaticEventPublisher中的ApplicationEventPublisher屬性延遲到所有消費(fèi)者容器緩存完成后賦值,防止過(guò)早的屬性變更通知導(dǎo)致部分消費(fèi)者容器的啟停操作被忽略。

          管理消費(fèi)者容器生命周期

          接收到RabbitmqToggleRefreshEvent事件后,然后遍歷傳遞過(guò)來(lái)的RabbitmqToggleProperties里面的consumers,再基于已經(jīng)發(fā)現(xiàn)的消費(fèi)者容器進(jìn)行處理,代碼大概如下:

                
                @EventListener(classes?=?RabbitmqToggleRefreshEvent.class)
          public?void?onRabbitmqToggleRefreshEvent(RabbitmqToggleRefreshEvent?event)?
          {
          ????RabbitmqToggleProperties?rabbitmqToggleProperties?=?event.getRabbitmqToggleProperties();
          ????List<RabbitmqToggleProperties.RabbitmqConsumer>?consumers?=?rabbitmqToggleProperties.getConsumers();
          ????if?(!CollectionUtils.isEmpty(consumers))?{
          ????????consumers.forEach(consumerConf?->?{
          ????????????String?listenerId?=?consumerConf.getListenerId();
          ????????????if?(StringUtils.hasLength(listenerId))?{
          ????????????????MessageListenerContainer?messageListenerContainer?=?containerCache.get(listenerId);
          ????????????????if?(Objects.nonNull(messageListenerContainer))?{
          ????????????????????//?running?->?stop
          ????????????????????if?(messageListenerContainer.isRunning()?&&?Objects.equals(Boolean.FALSE,?consumerConf.getEnable()))?{
          ????????????????????????messageListenerContainer.stop();
          ????????????????????????log.info("Message?listener?container?=>?{}?stop?successfully",?listenerId);
          ????????????????????}
          ????????????????????//?modify?concurrency
          ????????????????????if?(messageListenerContainer?instanceof?SimpleMessageListenerContainer)?{
          ????????????????????????SimpleMessageListenerContainer?simpleMessageListenerContainer
          ????????????????????????????????=?(SimpleMessageListenerContainer)?messageListenerContainer;
          ????????????????????????if?(Objects.nonNull(consumerConf.getConcurrentConsumers()))?{
          ????????????????????????????simpleMessageListenerContainer.setConcurrentConsumers(consumerConf.getConcurrentConsumers());
          ????????????????????????}
          ????????????????????????if?(Objects.nonNull(consumerConf.getMaxConcurrentConsumers()))?{
          ????????????????????????????simpleMessageListenerContainer.setMaxConcurrentConsumers(consumerConf.getMaxConcurrentConsumers());
          ????????????????????????}
          ????????????????????}
          ????????????????????//?stop?->?running
          ????????????????????if?(!messageListenerContainer.isRunning()?&&?Objects.equals(Boolean.TRUE,?consumerConf.getEnable()))?{
          ????????????????????????messageListenerContainer.start();
          ????????????????????????log.info("Message?listener?container?=>?{}?start?successfully",?listenerId);
          ????????????????????}
          ????????????????}
          ????????????}
          ????????});
          ????}
          }

          修改Nacos服務(wù)里面的rabbitmq-toggle.properties文件,輸入內(nèi)容如下:

                
                rabbitmq.toggle.consumers[0].listenerId=MultiAnnoInstanceDemoConsumer-firstOnInstanceMessage
          rabbitmq.toggle.consumers[0].enable=true
          rabbitmq.toggle.consumers[1].listenerId=MultiAnnoInstanceDemoConsumer-secondOnInstanceMessage
          rabbitmq.toggle.consumers[1].enable=true
          rabbitmq.toggle.consumers[2].listenerId=MultiAnnoMethodDemoConsumer
          rabbitmq.toggle.consumers[2].enable=true
          rabbitmq.toggle.consumers[3].listenerId=SingleAnnoMethodDemoConsumer
          rabbitmq.toggle.consumers[3].enable=true
          rabbitmq.toggle.consumers[4].listenerId=CustomMethodDemoConsumer
          rabbitmq.toggle.consumers[4].enable=true
          rabbitmq.toggle.consumers[5].listenerId=MessageListenerDemoConsumer
          rabbitmq.toggle.consumers[5].enable=true

          啟動(dòng)項(xiàng)目,觀察RabbitMQ WebUI對(duì)應(yīng)的隊(duì)列消費(fèi)者數(shù)量:

          b5e0cd7090f477a1d13952a2eeed6fc6.webpspring-boot-rabbit-nacos-control-9

          然后隨機(jī)修改rabbitmq-toggle.properties文件某個(gè)消費(fèi)者容器設(shè)置為enable = 'fasle',觀察服務(wù)日志和觀察RabbitMQ WebUI的變化:

          ecf920c6470cfb4ad9809a9ddb94d593.webpspring-boot-rabbit-nacos-control-10

          可見(jiàn)RabbitMQ WebUI中隊(duì)列消費(fèi)者數(shù)量減少,服務(wù)日志也提示listenerId = 'MessageListenerDemoConsumer'的消費(fèi)者容器被停止了。

          一些思考

          為了更精確控制有消費(fèi)者容器的啟停,可以考慮在配置文件中定義關(guān)閉消費(fèi)者容器的自動(dòng)啟動(dòng)開(kāi)關(guān):

                
                spring.rabbitmq.listener.simple.auto-startup=false

          可以考慮在RabbitmqToggleProperties首次回調(diào)@PostConstruct方法時(shí)候發(fā)布RabbitmqToggleInitEvent事件,然后監(jiān)聽(tīng)此事件啟動(dòng)所有已經(jīng)發(fā)現(xiàn)的消費(fèi)者容器。這樣就能做到應(yīng)用內(nèi)部的消費(fèi)者的啟停行為總是以Nacos的開(kāi)關(guān)配置文件為準(zhǔn),并且可以實(shí)現(xiàn)「在線(xiàn)」啟停和動(dòng)態(tài)調(diào)整最小最大消費(fèi)者數(shù)量。

          另外,如果細(xì)心的話(huà)能夠觀察到服務(wù)日志中,每當(dāng)監(jiān)聽(tīng)到Nacos配置變動(dòng)會(huì)打印Started application in N seconds (JVM running for M)的日志,這個(gè)并不是服務(wù)重啟了,而是啟動(dòng)了一個(gè)Spring子容器用于構(gòu)建一個(gè)全新的StandardEnvironment(見(jiàn)文末Demo項(xiàng)目中的EnvironmentCaptureApplicationRunner)用來(lái)承載刷新后的配置文件內(nèi)容,然后再拷貝或者覆蓋到當(dāng)前的Spring容器中的PropertySources,這個(gè)過(guò)程的代碼實(shí)現(xiàn)類(lèi)似這樣:

          5f35c4573318a4f82e69cb0495ed3a8b.webpspring-boot-rabbit-nacos-control-11

          小結(jié)

          本文探討了一種通過(guò)Nacos配置刷新方式管理SpringBoot服務(wù)中RabbitMQ消費(fèi)者生命周期管理的方案,目前只是提供了完整的思路和一些Demo級(jí)別代碼,后續(xù)應(yīng)該會(huì)完善方案和具體的工程級(jí)別編碼實(shí)現(xiàn)。

          本文Demo項(xiàng)目倉(cāng)庫(kù):

          • framework-mesh/rabbitmq-rocketmq-demo

          (本文完 c-3-d e-a-20230212)


          瀏覽 55
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  气质女人操逼 | 日韩免费片 | 在线a网站 | 特级AV免费在线观看 | 亚洲色图欧美色图自慰直播 |