<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Spring Boot 中集成ActiveMQ

          共 10188字,需瀏覽 21分鐘

           ·

          2021-04-13 21:43

          ????關(guān)注后回復(fù) “進(jìn)群” ,拉你進(jìn)程序員交流群????


          作者丨武哥

          來(lái)源丨武哥聊編程


          1. JMS 和 ActiveMQ 介紹

          1.1 JMS 是啥

          百度百科的解釋?zhuān)?/p>

          JMS 即 Java 消息服務(wù)(Java Message Service)應(yīng)用程序接口,是一個(gè)Java平臺(tái)中關(guān)于面向消息中間件(MOM)的 API,用于在兩個(gè)應(yīng)用程序之間,或分布式系統(tǒng)中發(fā)送消息,進(jìn)行異步通信。Java 消息服務(wù)是一個(gè)與具體平臺(tái)無(wú)關(guān)的 API,絕大多數(shù) MOM 提供商都對(duì) JMS 提供支持。

          JMS 只是接口,不同的提供商或者開(kāi)源組織對(duì)其有不同的實(shí)現(xiàn),ActiveMQ 就是其中之一,它支持JMS,是 Apache 推出的。JMS 中有幾個(gè)對(duì)象模型:

          連接工廠(chǎng):ConnectionFactory
          JMS連接:Connection
          JMS會(huì)話(huà):Session
          JMS目的:Destination
          JMS生產(chǎn)者:Producer
          JMS消費(fèi)者:Consumer
          JMS消息兩種類(lèi)型:點(diǎn)對(duì)點(diǎn)和發(fā)布/訂閱。

          可以看出 JMS 實(shí)際上和 JDBC 有點(diǎn)類(lèi)似,JDBC 是可以用來(lái)訪(fǎng)問(wèn)許多不同關(guān)系數(shù)據(jù)庫(kù)的 API,而 JMS 則提供同樣與廠(chǎng)商無(wú)關(guān)的訪(fǎng)問(wèn)方法,以訪(fǎng)問(wèn)消息收發(fā)服務(wù)。本文主要使用 ActiveMQ。

          1.2 ActiveMQ

          ActiveMQ 是 Apache 的一個(gè)能力強(qiáng)勁的開(kāi)源消息總線(xiàn)。ActiveMQ 完全支持JMS1.1和J2EE 1.4規(guī)范,盡管 JMS 規(guī)范出臺(tái)已經(jīng)是很久的事情了,但是 JMS 在當(dāng)今的 Java EE 應(yīng)用中間仍然扮演著特殊的地位。ActiveMQ 用在異步消息的處理上,所謂異步消息即消息發(fā)送者無(wú)需等待消息接收者的處理以及返回,甚至無(wú)需關(guān)心消息是否發(fā)送成功。

          異步消息主要有兩種目的地形式,隊(duì)列(queue)和主題(topic),隊(duì)列用于點(diǎn)對(duì)點(diǎn)形式的消息通信,主題用于發(fā)布/訂閱式的消息通信。本章節(jié)主要來(lái)學(xué)習(xí)一下在 Spring Boot 中如何使用這兩種形式的消息。

          2. ActiveMQ安裝

          使用 ActiveMQ 首先需要去官網(wǎng)下載,官網(wǎng)地址為:http://activemq.apache.org/
          本課程使用的版本是 apache-activemq-5.15.3,下載后解壓縮會(huì)有一個(gè)名為 apache-activemq-5.15.3 的文件夾,沒(méi)錯(cuò),這就安裝好了,非常簡(jiǎn)單,開(kāi)箱即用。打開(kāi)文件夾會(huì)看到里面有個(gè) activemq-all-5.15.3.jar,這個(gè) jar 我們是可以加進(jìn)工程里的,但是使用 maven 的話(huà),這個(gè) jar 我們不需要。

          在使用 ActiveMQ 之前,首先得先啟動(dòng),剛才解壓后的目錄中有個(gè) bin 目錄,里面有 win32 和 win64 兩個(gè)目錄,根據(jù)自己電腦選擇其中一個(gè)打開(kāi)運(yùn)行里面的 activemq.bat 即可啟動(dòng) ActiveMQ。
          消息生產(chǎn)者生產(chǎn)消息發(fā)布到queue中,然后消息消費(fèi)者從queue中取出,并且消費(fèi)消息。這里需要注意:消息被消費(fèi)者消費(fèi)以后,queue中不再有存儲(chǔ),所以消息消費(fèi)者不可消費(fèi)到已經(jīng)被消費(fèi)的消息。Queue支持存在多個(gè)消息消費(fèi)者,但是對(duì)一個(gè)消息而言,只會(huì)有一個(gè)消費(fèi)者可以消費(fèi) 啟動(dòng)完成后,在瀏覽器中輸入 http://127.0.0.1:8161/admin/ 來(lái)訪(fǎng)問(wèn) ActiveMQ 的服務(wù)器,用戶(hù)名和密碼是 admin/admin。如下:

          activemq

          我們可以看到有 Queues 和 Topics 這兩個(gè)選項(xiàng),這兩個(gè)選項(xiàng)分別是點(diǎn)對(duì)點(diǎn)消息和發(fā)布/訂閱消息的查看窗口。何為點(diǎn)對(duì)點(diǎn)消息和發(fā)布/訂閱消息呢?

          點(diǎn)對(duì)點(diǎn)消息:消息生產(chǎn)者生產(chǎn)消息發(fā)布到 queue 中,然后消息消費(fèi)者從 queue 中取出,并且消費(fèi)消息。這里需要注意:消息被消費(fèi)者消費(fèi)以后,queue 中不再有存儲(chǔ),所以消息消費(fèi)者不可消費(fèi)到已經(jīng)被消費(fèi)的消息。Queue 支持存在多個(gè)消息消費(fèi)者,但是對(duì)一個(gè)消息而言,只會(huì)有一個(gè)消費(fèi)者可以消費(fèi)。

          發(fā)布/訂閱消息:消息生產(chǎn)者(發(fā)布)將消息發(fā)布到 topic 中,同時(shí)有多個(gè)消息消費(fèi)者(訂閱)消費(fèi)該消息。和點(diǎn)對(duì)點(diǎn)方式不同,發(fā)布到 topic 的消息會(huì)被所有訂閱者消費(fèi)。下面分析具體的實(shí)現(xiàn)方式。

          3. ActiveMQ集成

          3.1 依賴(lài)導(dǎo)入和配置

          在 Spring Boot 中集成 ActiveMQ 需要導(dǎo)入如下 starter 依賴(lài):

          <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-activemq</artifactId>
          </dependency>

          然后在 application.yml 配置文件中,對(duì) activemq 做一下配置:

          spring:
            activemq:
             # activemq url
              broker-url: tcp://localhost:61616
              in-memory: true
              pool:
                # 如果此處設(shè)置為true,需要添加activemq-pool的依賴(lài)包,否則會(huì)自動(dòng)配置失敗,無(wú)法注入JmsMessagingTemplate
                enabled: false

          3.2 Queue 和 Topic 的創(chuàng)建

          首先我們需要?jiǎng)?chuàng)建兩種消息 Queue 和 Topic,這兩種消息的創(chuàng)建,我們放到 ActiveMqConfig 中來(lái)創(chuàng)建,如下:

          /**
           * activemq的配置
           * @author  shengwu ni
           */

          @Configuration
          public class ActiveMqConfig {
              /**
               * 發(fā)布/訂閱模式隊(duì)列名稱(chēng)
               */

              public static final String TOPIC_NAME = "activemq.topic";
              /**
               * 點(diǎn)對(duì)點(diǎn)模式隊(duì)列名稱(chēng)
               */

              public static final String QUEUE_NAME = "activemq.queue";

              @Bean
              public Destination topic() {
                  return new ActiveMQTopic(TOPIC_NAME);
              }

              @Bean
              public Destination queue() {
                  return new ActiveMQQueue(QUEUE_NAME);
              }
          }

          可以看出創(chuàng)建 Queue 和 Topic 兩種消息,分別使用 new ActiveMQQueuenew ActiveMQTopic 來(lái)創(chuàng)建,分別跟上對(duì)應(yīng)消息的名稱(chēng)即可。這樣在其他地方就可以直接將這兩種消息作為組件注入進(jìn)來(lái)了。

          3.3 消息的發(fā)送接口

          在 Spring Boot 中,我們只要注入 JmsMessagingTemplate 模板即可快速發(fā)送消息,如下:

          /**
           * 消息發(fā)送者
           * @author shengwu ni
           */

          @Service
          public class MsgProducer {

              @Resource
              private JmsMessagingTemplate jmsMessagingTemplate;

              public void sendMessage(Destination destination, String msg) {
                  jmsMessagingTemplate.convertAndSend(destination, msg);
              }
          }

          convertAndSend 方法中第一個(gè)參數(shù)是消息發(fā)送的目的地,第二個(gè)參數(shù)是具體的消息內(nèi)容。

          3.4 點(diǎn)對(duì)點(diǎn)消息生產(chǎn)與消費(fèi)

          3.4.1 點(diǎn)對(duì)點(diǎn)消息的生產(chǎn)

          消息的生產(chǎn),我們放到 Controller 中來(lái)做,由于上面已經(jīng)生成了 Queue 消息的組件,所以在 Controller 中我們直接注入進(jìn)來(lái)即可。然后調(diào)用上文的消息發(fā)送方法 sendMessage 即可成功生產(chǎn)一條消息。

          /**
           * ActiveMQ controller
           * @author shengwu ni
           */

          @RestController
          @RequestMapping("/activemq")
          public class ActiveMqController {

              private static final Logger logger = LoggerFactory.getLogger(ActiveMqController.class);

              @Resource
              private MsgProducer producer;
              @Resource
              private Destination queue;

              @GetMapping("/send/queue")
              public String sendQueueMessage() {

                  logger.info("===開(kāi)始發(fā)送點(diǎn)對(duì)點(diǎn)消息===");
                  producer.sendMessage(queue, "Queue: hello activemq!");
                  return "success";
              }
          }

          3.4.2 點(diǎn)對(duì)點(diǎn)消息的消費(fèi)

          點(diǎn)對(duì)點(diǎn)消息的消費(fèi)很簡(jiǎn)單,只要我們指定目的地即可,jms 監(jiān)聽(tīng)器一直在監(jiān)聽(tīng)是否有消息過(guò)來(lái),如果有,則消費(fèi)。

          /**
           * 消息消費(fèi)者
           * @author shengwu ni
           */

          @Service
          public class QueueConsumer {

              /**
               * 接收點(diǎn)對(duì)點(diǎn)消息
               * @param msg
               */

              @JmsListener(destination = ActiveMqConfig.QUEUE_NAME)
              public void receiveQueueMsg(String msg) {
                  System.out.println("收到的消息為:" + msg);
              }
          }

          可以看出,使用 @JmsListener 注解來(lái)指定要監(jiān)聽(tīng)的目的地,在消息接收方法內(nèi)部,我們可以根據(jù)具體的業(yè)務(wù)需求做相應(yīng)的邏輯處理即可。

          3.4.3 測(cè)試一下

          啟動(dòng)項(xiàng)目,在瀏覽器中輸入:http://localhost:8081/activemq/send/queue,觀察控制臺(tái)的輸出日志,出現(xiàn)下面的日志說(shuō)明消息發(fā)送和消費(fèi)成功。

          收到的消息為:Queue: hello activemq!

          3.5 發(fā)布/訂閱消息的生產(chǎn)和消費(fèi)

          3.5.1 發(fā)布/訂閱消息的生產(chǎn)

          和點(diǎn)對(duì)點(diǎn)消息一樣,我們注入 topic 并調(diào)用 producer 的 sendMessage 方法即可發(fā)送訂閱消息,如下,不再贅述:

          @RestController
          @RequestMapping("/activemq")
          public class ActiveMqController {

              private static final Logger logger = LoggerFactory.getLogger(ActiveMqController.class);

              @Resource
              private MsgProducer producer;
              @Resource
              private Destination topic;

              @GetMapping("/send/topic")
              public String sendTopicMessage() {

                  logger.info("===開(kāi)始發(fā)送訂閱消息===");
                  producer.sendMessage(topic, "Topic: hello activemq!");
                  return "success";
              }
          }

          3.5.2 發(fā)布/訂閱消息的消費(fèi)

          發(fā)布/訂閱消息的消費(fèi)和點(diǎn)對(duì)點(diǎn)不同,訂閱消息支持多個(gè)消費(fèi)者一起消費(fèi)。其次,Spring Boot 中默認(rèn)的時(shí)點(diǎn)對(duì)點(diǎn)消息,所以在使用 topic 時(shí),會(huì)不起作用,我們需要在配置文件 application.yml 中添加一個(gè)配置:

          spring:
            jms:
              pub-sub-domain: true

          該配置是 false 的話(huà),則為點(diǎn)對(duì)點(diǎn)消息,也是 Spring Boot 默認(rèn)的。這樣是可以解決問(wèn)題,但是如果這樣配置的話(huà),上面提到的點(diǎn)對(duì)點(diǎn)消息又不能正常消費(fèi)了。所以二者不可兼得,這并非一個(gè)好的解決辦法。

          比較好的解決辦法是,我們定義一個(gè)工廠(chǎng),@JmsListener 注解默認(rèn)只接收 queue 消息,如果要接收 topic 消息,需要設(shè)置一下 containerFactory。我們還在上面的那個(gè) ActiveMqConfig 配置類(lèi)中添加:

          /**
           * activemq的配置
           *
           * @author shengwu ni
           */

          @Configuration
          public class ActiveMqConfig {
              // 省略其他內(nèi)容

              /**
               * JmsListener注解默認(rèn)只接收queue消息,如果要接收topic消息,需要設(shè)置containerFactory
               */

              @Bean
              public JmsListenerContainerFactory topicListenerContainer(ConnectionFactory connectionFactory) {
                  DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
                  factory.setConnectionFactory(connectionFactory);
                  // 相當(dāng)于在application.yml中配置:spring.jms.pub-sub-domain=true
                  factory.setPubSubDomain(true);
                  return factory;
              }
          }

          經(jīng)過(guò)這樣的配置之后,我們?cè)谙M(fèi)的時(shí)候,在 @JmsListener 注解中指定這個(gè)容器工廠(chǎng)即可消費(fèi) topic 消息。如下:

          /**
           * Topic消息消費(fèi)者
           * @author shengwu ni
           */

          @Service
          public class TopicConsumer1 {

              /**
               * 接收訂閱消息
               * @param msg
               */

              @JmsListener(destination = ActiveMqConfig.TOPIC_NAME, containerFactory = "topicListenerContainer")
              public void receiveTopicMsg(String msg) {
                  System.out.println("收到的消息為:" + msg);
              }

          }

          指定 containerFactory 屬性為上面我們自己配置的 topicListenerContainer 即可。由于 topic 消息可以多個(gè)消費(fèi),所以該消費(fèi)的類(lèi)可以拷貝幾個(gè)一起測(cè)試一下,這里我就不貼代碼了,可以參考我的源碼測(cè)試。

          3.5.3 測(cè)試一下

          啟動(dòng)項(xiàng)目,在瀏覽器中輸入:http://localhost:8081/activemq/send/topic,觀察控制臺(tái)的輸出日志,出現(xiàn)下面的日志說(shuō)明消息發(fā)送和消費(fèi)成功。

          收到的消息為:Topic: hello activemq!
          收到的消息為:Topic: hello activemq!

          4. 總結(jié)

          本章主要介紹了 jms 和 activemq 的相關(guān)概念、activemq 的安裝與啟動(dòng)。詳細(xì)分析了 Spring Boot 中點(diǎn)對(duì)點(diǎn)消息和發(fā)布/訂閱消息兩種方式的配置、消息生產(chǎn)和消費(fèi)方式。ActiveMQ 是能力強(qiáng)勁的開(kāi)源消息總線(xiàn),在異步消息的處理上很有用,希望大家好好消化一下。

          -End-

          最近有一些小伙伴,讓我?guī)兔φ乙恍?nbsp;面試題 資料,于是我翻遍了收藏的 5T 資料后,匯總整理出來(lái),可以說(shuō)是程序員面試必備!所有資料都整理到網(wǎng)盤(pán)了,歡迎下載!

          點(diǎn)擊??卡片,關(guān)注后回復(fù)【面試題】即可獲取

          在看點(diǎn)這里好文分享給更多人↓↓

          瀏覽 31
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  sm调教视频网站 | 国产对白视频在线观看 | 偷拍自拍第五页 | 婷婷五月视频在线观看 | 日本爱爱视频网站 |