<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Spring Boot 中集成ActiveMQ

          共 9789字,需瀏覽 20分鐘

           ·

          2021-03-13 01:21

          本來已收錄到我寫的10萬字Springboot經(jīng)典學(xué)習(xí)筆記中,筆記在持續(xù)更新……文末有領(lǐng)取方式

          1. JMS 和 ActiveMQ 介紹

          1.1 JMS 是啥

          百度百科的解釋:

          JMS 即 Java 消息服務(wù)(Java Message Service)應(yīng)用程序接口,是一個Java平臺中關(guān)于面向消息中間件(MOM)的 API,用于在兩個應(yīng)用程序之間,或分布式系統(tǒng)中發(fā)送消息,進行異步通信。Java 消息服務(wù)是一個與具體平臺無關(guān)的 API,絕大多數(shù) MOM 提供商都對 JMS 提供支持。

          JMS 只是接口,不同的提供商或者開源組織對其有不同的實現(xiàn),ActiveMQ 就是其中之一,它支持JMS,是 Apache 推出的。JMS 中有幾個對象模型:

          連接工廠:ConnectionFactory
          JMS連接:Connection
          JMS會話:Session
          JMS目的:Destination
          JMS生產(chǎn)者:Producer
          JMS消費者:Consumer
          JMS消息兩種類型:點對點和發(fā)布/訂閱。

          可以看出 JMS 實際上和 JDBC 有點類似,JDBC 是可以用來訪問許多不同關(guān)系數(shù)據(jù)庫的 API,而 JMS 則提供同樣與廠商無關(guān)的訪問方法,以訪問消息收發(fā)服務(wù)。本文主要使用 ActiveMQ。

          1.2 ActiveMQ

          ActiveMQ 是 Apache 的一個能力強勁的開源消息總線。ActiveMQ 完全支持JMS1.1和J2EE 1.4規(guī)范,盡管 JMS 規(guī)范出臺已經(jīng)是很久的事情了,但是 JMS 在當(dāng)今的 Java EE 應(yīng)用中間仍然扮演著特殊的地位。ActiveMQ 用在異步消息的處理上,所謂異步消息即消息發(fā)送者無需等待消息接收者的處理以及返回,甚至無需關(guān)心消息是否發(fā)送成功。

          異步消息主要有兩種目的地形式,隊列(queue)和主題(topic),隊列用于點對點形式的消息通信,主題用于發(fā)布/訂閱式的消息通信。本章節(jié)主要來學(xué)習(xí)一下在 Spring Boot 中如何使用這兩種形式的消息。

          2. ActiveMQ安裝

          使用 ActiveMQ 首先需要去官網(wǎng)下載,官網(wǎng)地址為:http://activemq.apache.org/
          本課程使用的版本是 apache-activemq-5.15.3,下載后解壓縮會有一個名為 apache-activemq-5.15.3 的文件夾,沒錯,這就安裝好了,非常簡單,開箱即用。打開文件夾會看到里面有個 activemq-all-5.15.3.jar,這個 jar 我們是可以加進工程里的,但是使用 maven 的話,這個 jar 我們不需要。

          在使用 ActiveMQ 之前,首先得先啟動,剛才解壓后的目錄中有個 bin 目錄,里面有 win32 和 win64 兩個目錄,根據(jù)自己電腦選擇其中一個打開運行里面的 activemq.bat 即可啟動 ActiveMQ。
          消息生產(chǎn)者生產(chǎn)消息發(fā)布到queue中,然后消息消費者從queue中取出,并且消費消息。這里需要注意:消息被消費者消費以后,queue中不再有存儲,所以消息消費者不可消費到已經(jīng)被消費的消息。Queue支持存在多個消息消費者,但是對一個消息而言,只會有一個消費者可以消費 啟動完成后,在瀏覽器中輸入 http://127.0.0.1:8161/admin/ 來訪問 ActiveMQ 的服務(wù)器,用戶名和密碼是 admin/admin。如下:

          activemq

          我們可以看到有 Queues 和 Topics 這兩個選項,這兩個選項分別是點對點消息和發(fā)布/訂閱消息的查看窗口。何為點對點消息和發(fā)布/訂閱消息呢?

          點對點消息:消息生產(chǎn)者生產(chǎn)消息發(fā)布到 queue 中,然后消息消費者從 queue 中取出,并且消費消息。這里需要注意:消息被消費者消費以后,queue 中不再有存儲,所以消息消費者不可消費到已經(jīng)被消費的消息。Queue 支持存在多個消息消費者,但是對一個消息而言,只會有一個消費者可以消費。

          發(fā)布/訂閱消息:消息生產(chǎn)者(發(fā)布)將消息發(fā)布到 topic 中,同時有多個消息消費者(訂閱)消費該消息。和點對點方式不同,發(fā)布到 topic 的消息會被所有訂閱者消費。下面分析具體的實現(xiàn)方式。

          3. ActiveMQ集成

          3.1 依賴導(dǎo)入和配置

          在 Spring Boot 中集成 ActiveMQ 需要導(dǎo)入如下 starter 依賴:

          <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-activemq</artifactId>
          </dependency>

          然后在 application.yml 配置文件中,對 activemq 做一下配置:

          spring:
            activemq:
             # activemq url
              broker-url: tcp://localhost:61616
              in-memory: true
              pool:
                # 如果此處設(shè)置為true,需要添加activemq-pool的依賴包,否則會自動配置失敗,無法注入JmsMessagingTemplate
                enabled: false

          3.2 Queue 和 Topic 的創(chuàng)建

          首先我們需要創(chuàng)建兩種消息 Queue 和 Topic,這兩種消息的創(chuàng)建,我們放到 ActiveMqConfig 中來創(chuàng)建,如下:

          /**
           * activemq的配置
           * @author  shengwu ni
           */

          @Configuration
          public class ActiveMqConfig {
              /**
               * 發(fā)布/訂閱模式隊列名稱
               */

              public static final String TOPIC_NAME = "activemq.topic";
              /**
               * 點對點模式隊列名稱
               */

              public static final String QUEUE_NAME = "activemq.queue";

              @Bean
              public Destination topic() {
                  return new ActiveMQTopic(TOPIC_NAME);
              }

              @Bean
              public Destination queue() {
                  return new ActiveMQQueue(QUEUE_NAME);
              }
          }

          可以看出創(chuàng)建 Queue 和 Topic 兩種消息,分別使用 new ActiveMQQueuenew ActiveMQTopic 來創(chuàng)建,分別跟上對應(yīng)消息的名稱即可。這樣在其他地方就可以直接將這兩種消息作為組件注入進來了。

          3.3 消息的發(fā)送接口

          在 Spring Boot 中,我們只要注入 JmsMessagingTemplate 模板即可快速發(fā)送消息,如下:

          /**
           * 消息發(fā)送者
           * @author shengwu ni
           */

          @Service
          public class MsgProducer {

              @Resource
              private JmsMessagingTemplate jmsMessagingTemplate;

              public void sendMessage(Destination destination, String msg) {
                  jmsMessagingTemplate.convertAndSend(destination, msg);
              }
          }

          convertAndSend 方法中第一個參數(shù)是消息發(fā)送的目的地,第二個參數(shù)是具體的消息內(nèi)容。

          3.4 點對點消息生產(chǎn)與消費

          3.4.1 點對點消息的生產(chǎn)

          消息的生產(chǎn),我們放到 Controller 中來做,由于上面已經(jīng)生成了 Queue 消息的組件,所以在 Controller 中我們直接注入進來即可。然后調(diào)用上文的消息發(fā)送方法 sendMessage 即可成功生產(chǎn)一條消息。

          /**
           * ActiveMQ controller
           * @author shengwu ni
           */

          @RestController
          @RequestMapping("/activemq")
          public class ActiveMqController {

              private static final Logger logger = LoggerFactory.getLogger(ActiveMqController.class);

              @Resource
              private MsgProducer producer;
              @Resource
              private Destination queue;

              @GetMapping("/send/queue")
              public String sendQueueMessage() {

                  logger.info("===開始發(fā)送點對點消息===");
                  producer.sendMessage(queue, "Queue: hello activemq!");
                  return "success";
              }
          }

          3.4.2 點對點消息的消費

          點對點消息的消費很簡單,只要我們指定目的地即可,jms 監(jiān)聽器一直在監(jiān)聽是否有消息過來,如果有,則消費。

          /**
           * 消息消費者
           * @author shengwu ni
           */

          @Service
          public class QueueConsumer {

              /**
               * 接收點對點消息
               * @param msg
               */

              @JmsListener(destination = ActiveMqConfig.QUEUE_NAME)
              public void receiveQueueMsg(String msg) {
                  System.out.println("收到的消息為:" + msg);
              }
          }

          可以看出,使用 @JmsListener 注解來指定要監(jiān)聽的目的地,在消息接收方法內(nèi)部,我們可以根據(jù)具體的業(yè)務(wù)需求做相應(yīng)的邏輯處理即可。

          3.4.3 測試一下

          啟動項目,在瀏覽器中輸入:http://localhost:8081/activemq/send/queue,觀察控制臺的輸出日志,出現(xiàn)下面的日志說明消息發(fā)送和消費成功。

          收到的消息為:Queue: hello activemq!

          3.5 發(fā)布/訂閱消息的生產(chǎn)和消費

          3.5.1 發(fā)布/訂閱消息的生產(chǎn)

          和點對點消息一樣,我們注入 topic 并調(diào)用 producer 的 sendMessage 方法即可發(fā)送訂閱消息,如下,不再贅述:

          @RestController
          @RequestMapping("/activemq")
          public class ActiveMqController {

              private static final Logger logger = LoggerFactory.getLogger(ActiveMqController.class);

              @Resource
              private MsgProducer producer;
              @Resource
              private Destination topic;

              @GetMapping("/send/topic")
              public String sendTopicMessage() {

                  logger.info("===開始發(fā)送訂閱消息===");
                  producer.sendMessage(topic, "Topic: hello activemq!");
                  return "success";
              }
          }

          3.5.2 發(fā)布/訂閱消息的消費

          發(fā)布/訂閱消息的消費和點對點不同,訂閱消息支持多個消費者一起消費。其次,Spring Boot 中默認(rèn)的時點對點消息,所以在使用 topic 時,會不起作用,我們需要在配置文件 application.yml 中添加一個配置:

          spring:
            jms:
              pub-sub-domain: true

          該配置是 false 的話,則為點對點消息,也是 Spring Boot 默認(rèn)的。這樣是可以解決問題,但是如果這樣配置的話,上面提到的點對點消息又不能正常消費了。所以二者不可兼得,這并非一個好的解決辦法。

          比較好的解決辦法是,我們定義一個工廠,@JmsListener 注解默認(rèn)只接收 queue 消息,如果要接收 topic 消息,需要設(shè)置一下 containerFactory。我們還在上面的那個 ActiveMqConfig 配置類中添加:

          /**
           * activemq的配置
           *
           * @author shengwu ni
           */

          @Configuration
          public class ActiveMqConfig {
              // 省略其他內(nèi)容

              /**
               * JmsListener注解默認(rèn)只接收queue消息,如果要接收topic消息,需要設(shè)置containerFactory
               */

              @Bean
              public JmsListenerContainerFactory topicListenerContainer(ConnectionFactory connectionFactory) {
                  DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
                  factory.setConnectionFactory(connectionFactory);
                  // 相當(dāng)于在application.yml中配置:spring.jms.pub-sub-domain=true
                  factory.setPubSubDomain(true);
                  return factory;
              }
          }

          經(jīng)過這樣的配置之后,我們在消費的時候,在 @JmsListener 注解中指定這個容器工廠即可消費 topic 消息。如下:

          /**
           * Topic消息消費者
           * @author shengwu ni
           */

          @Service
          public class TopicConsumer1 {

              /**
               * 接收訂閱消息
               * @param msg
               */

              @JmsListener(destination = ActiveMqConfig.TOPIC_NAME, containerFactory = "topicListenerContainer")
              public void receiveTopicMsg(String msg) {
                  System.out.println("收到的消息為:" + msg);
              }

          }

          指定 containerFactory 屬性為上面我們自己配置的 topicListenerContainer 即可。由于 topic 消息可以多個消費,所以該消費的類可以拷貝幾個一起測試一下,這里我就不貼代碼了,可以參考我的源碼測試。

          3.5.3 測試一下

          啟動項目,在瀏覽器中輸入:http://localhost:8081/activemq/send/topic,觀察控制臺的輸出日志,出現(xiàn)下面的日志說明消息發(fā)送和消費成功。

          收到的消息為:Topic: hello activemq!
          收到的消息為:Topic: hello activemq!

          4. 總結(jié)

          本章主要介紹了 jms 和 activemq 的相關(guān)概念、activemq 的安裝與啟動。詳細分析了 Spring Boot 中點對點消息和發(fā)布/訂閱消息兩種方式的配置、消息生產(chǎn)和消費方式。ActiveMQ 是能力強勁的開源消息總線,在異步消息的處理上很有用,希望大家好好消化一下。

          該文已收錄到我寫的《10萬字Springboot經(jīng)典學(xué)習(xí)筆記》中,點擊下面小卡片,進入【武哥聊編程】,回復(fù):筆記,即可免費獲取。

          點贊是最大的支持 

          瀏覽 73
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日韩一区高清 | 在线视频这里只有精品6 | 北条麻妃内射 | 亚洲插逼视频 | 日韩一区二区不卡视频 |