Spring Boot 中集成ActiveMQ
本來已收錄到我寫的10萬字Springboot經(jīng)典學(xué)習(xí)筆記中,筆記在持續(xù)更新……文末有領(lǐng)取方式
1. JMS 和 ActiveMQ 介紹
1.1 JMS 是啥
百度百科的解釋:
JMS 即 Java 消息服務(wù)(Java Message Service)應(yīng)用程序接口,是一個Java平臺中關(guān)于面向消息中間件(MOM)的 API,用于在兩個應(yīng)用程序之間,或分布式系統(tǒng)中發(fā)送消息,進行異步通信。Java 消息服務(wù)是一個與具體平臺無關(guān)的 API,絕大多數(shù) MOM 提供商都對 JMS 提供支持。
JMS 只是接口,不同的提供商或者開源組織對其有不同的實現(xiàn),ActiveMQ 就是其中之一,它支持JMS,是 Apache 推出的。JMS 中有幾個對象模型:
連接工廠:ConnectionFactory
JMS連接:Connection
JMS會話:Session
JMS目的:Destination
JMS生產(chǎn)者:Producer
JMS消費者:Consumer
JMS消息兩種類型:點對點和發(fā)布/訂閱。
可以看出 JMS 實際上和 JDBC 有點類似,JDBC 是可以用來訪問許多不同關(guān)系數(shù)據(jù)庫的 API,而 JMS 則提供同樣與廠商無關(guān)的訪問方法,以訪問消息收發(fā)服務(wù)。本文主要使用 ActiveMQ。
1.2 ActiveMQ
ActiveMQ 是 Apache 的一個能力強勁的開源消息總線。ActiveMQ 完全支持JMS1.1和J2EE 1.4規(guī)范,盡管 JMS 規(guī)范出臺已經(jīng)是很久的事情了,但是 JMS 在當(dāng)今的 Java EE 應(yīng)用中間仍然扮演著特殊的地位。ActiveMQ 用在異步消息的處理上,所謂異步消息即消息發(fā)送者無需等待消息接收者的處理以及返回,甚至無需關(guān)心消息是否發(fā)送成功。
異步消息主要有兩種目的地形式,隊列(queue)和主題(topic),隊列用于點對點形式的消息通信,主題用于發(fā)布/訂閱式的消息通信。本章節(jié)主要來學(xué)習(xí)一下在 Spring Boot 中如何使用這兩種形式的消息。
2. ActiveMQ安裝
使用 ActiveMQ 首先需要去官網(wǎng)下載,官網(wǎng)地址為:http://activemq.apache.org/
本課程使用的版本是 apache-activemq-5.15.3,下載后解壓縮會有一個名為 apache-activemq-5.15.3 的文件夾,沒錯,這就安裝好了,非常簡單,開箱即用。打開文件夾會看到里面有個 activemq-all-5.15.3.jar,這個 jar 我們是可以加進工程里的,但是使用 maven 的話,這個 jar 我們不需要。
在使用 ActiveMQ 之前,首先得先啟動,剛才解壓后的目錄中有個 bin 目錄,里面有 win32 和 win64 兩個目錄,根據(jù)自己電腦選擇其中一個打開運行里面的 activemq.bat 即可啟動 ActiveMQ。
消息生產(chǎn)者生產(chǎn)消息發(fā)布到queue中,然后消息消費者從queue中取出,并且消費消息。這里需要注意:消息被消費者消費以后,queue中不再有存儲,所以消息消費者不可消費到已經(jīng)被消費的消息。Queue支持存在多個消息消費者,但是對一個消息而言,只會有一個消費者可以消費
啟動完成后,在瀏覽器中輸入 http://127.0.0.1:8161/admin/ 來訪問 ActiveMQ 的服務(wù)器,用戶名和密碼是 admin/admin。如下:

我們可以看到有 Queues 和 Topics 這兩個選項,這兩個選項分別是點對點消息和發(fā)布/訂閱消息的查看窗口。何為點對點消息和發(fā)布/訂閱消息呢?
點對點消息:消息生產(chǎn)者生產(chǎn)消息發(fā)布到 queue 中,然后消息消費者從 queue 中取出,并且消費消息。這里需要注意:消息被消費者消費以后,queue 中不再有存儲,所以消息消費者不可消費到已經(jīng)被消費的消息。Queue 支持存在多個消息消費者,但是對一個消息而言,只會有一個消費者可以消費。
發(fā)布/訂閱消息:消息生產(chǎn)者(發(fā)布)將消息發(fā)布到 topic 中,同時有多個消息消費者(訂閱)消費該消息。和點對點方式不同,發(fā)布到 topic 的消息會被所有訂閱者消費。下面分析具體的實現(xiàn)方式。
3. ActiveMQ集成
3.1 依賴導(dǎo)入和配置
在 Spring Boot 中集成 ActiveMQ 需要導(dǎo)入如下 starter 依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
然后在 application.yml 配置文件中,對 activemq 做一下配置:
spring:
activemq:
# activemq url
broker-url: tcp://localhost:61616
in-memory: true
pool:
# 如果此處設(shè)置為true,需要添加activemq-pool的依賴包,否則會自動配置失敗,無法注入JmsMessagingTemplate
enabled: false
3.2 Queue 和 Topic 的創(chuàng)建
首先我們需要創(chuàng)建兩種消息 Queue 和 Topic,這兩種消息的創(chuàng)建,我們放到 ActiveMqConfig 中來創(chuàng)建,如下:
/**
* activemq的配置
* @author shengwu ni
*/
@Configuration
public class ActiveMqConfig {
/**
* 發(fā)布/訂閱模式隊列名稱
*/
public static final String TOPIC_NAME = "activemq.topic";
/**
* 點對點模式隊列名稱
*/
public static final String QUEUE_NAME = "activemq.queue";
@Bean
public Destination topic() {
return new ActiveMQTopic(TOPIC_NAME);
}
@Bean
public Destination queue() {
return new ActiveMQQueue(QUEUE_NAME);
}
}
可以看出創(chuàng)建 Queue 和 Topic 兩種消息,分別使用 new ActiveMQQueue 和 new ActiveMQTopic 來創(chuàng)建,分別跟上對應(yīng)消息的名稱即可。這樣在其他地方就可以直接將這兩種消息作為組件注入進來了。
3.3 消息的發(fā)送接口
在 Spring Boot 中,我們只要注入 JmsMessagingTemplate 模板即可快速發(fā)送消息,如下:
/**
* 消息發(fā)送者
* @author shengwu ni
*/
@Service
public class MsgProducer {
@Resource
private JmsMessagingTemplate jmsMessagingTemplate;
public void sendMessage(Destination destination, String msg) {
jmsMessagingTemplate.convertAndSend(destination, msg);
}
}
convertAndSend 方法中第一個參數(shù)是消息發(fā)送的目的地,第二個參數(shù)是具體的消息內(nèi)容。
3.4 點對點消息生產(chǎn)與消費
3.4.1 點對點消息的生產(chǎn)
消息的生產(chǎn),我們放到 Controller 中來做,由于上面已經(jīng)生成了 Queue 消息的組件,所以在 Controller 中我們直接注入進來即可。然后調(diào)用上文的消息發(fā)送方法 sendMessage 即可成功生產(chǎn)一條消息。
/**
* ActiveMQ controller
* @author shengwu ni
*/
@RestController
@RequestMapping("/activemq")
public class ActiveMqController {
private static final Logger logger = LoggerFactory.getLogger(ActiveMqController.class);
@Resource
private MsgProducer producer;
@Resource
private Destination queue;
@GetMapping("/send/queue")
public String sendQueueMessage() {
logger.info("===開始發(fā)送點對點消息===");
producer.sendMessage(queue, "Queue: hello activemq!");
return "success";
}
}
3.4.2 點對點消息的消費
點對點消息的消費很簡單,只要我們指定目的地即可,jms 監(jiān)聽器一直在監(jiān)聽是否有消息過來,如果有,則消費。
/**
* 消息消費者
* @author shengwu ni
*/
@Service
public class QueueConsumer {
/**
* 接收點對點消息
* @param msg
*/
@JmsListener(destination = ActiveMqConfig.QUEUE_NAME)
public void receiveQueueMsg(String msg) {
System.out.println("收到的消息為:" + msg);
}
}
可以看出,使用 @JmsListener 注解來指定要監(jiān)聽的目的地,在消息接收方法內(nèi)部,我們可以根據(jù)具體的業(yè)務(wù)需求做相應(yīng)的邏輯處理即可。
3.4.3 測試一下
啟動項目,在瀏覽器中輸入:http://localhost:8081/activemq/send/queue,觀察控制臺的輸出日志,出現(xiàn)下面的日志說明消息發(fā)送和消費成功。
收到的消息為:Queue: hello activemq!
3.5 發(fā)布/訂閱消息的生產(chǎn)和消費
3.5.1 發(fā)布/訂閱消息的生產(chǎn)
和點對點消息一樣,我們注入 topic 并調(diào)用 producer 的 sendMessage 方法即可發(fā)送訂閱消息,如下,不再贅述:
@RestController
@RequestMapping("/activemq")
public class ActiveMqController {
private static final Logger logger = LoggerFactory.getLogger(ActiveMqController.class);
@Resource
private MsgProducer producer;
@Resource
private Destination topic;
@GetMapping("/send/topic")
public String sendTopicMessage() {
logger.info("===開始發(fā)送訂閱消息===");
producer.sendMessage(topic, "Topic: hello activemq!");
return "success";
}
}
3.5.2 發(fā)布/訂閱消息的消費
發(fā)布/訂閱消息的消費和點對點不同,訂閱消息支持多個消費者一起消費。其次,Spring Boot 中默認(rèn)的時點對點消息,所以在使用 topic 時,會不起作用,我們需要在配置文件 application.yml 中添加一個配置:
spring:
jms:
pub-sub-domain: true
該配置是 false 的話,則為點對點消息,也是 Spring Boot 默認(rèn)的。這樣是可以解決問題,但是如果這樣配置的話,上面提到的點對點消息又不能正常消費了。所以二者不可兼得,這并非一個好的解決辦法。
比較好的解決辦法是,我們定義一個工廠,@JmsListener 注解默認(rèn)只接收 queue 消息,如果要接收 topic 消息,需要設(shè)置一下 containerFactory。我們還在上面的那個 ActiveMqConfig 配置類中添加:
/**
* activemq的配置
*
* @author shengwu ni
*/
@Configuration
public class ActiveMqConfig {
// 省略其他內(nèi)容
/**
* JmsListener注解默認(rèn)只接收queue消息,如果要接收topic消息,需要設(shè)置containerFactory
*/
@Bean
public JmsListenerContainerFactory topicListenerContainer(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// 相當(dāng)于在application.yml中配置:spring.jms.pub-sub-domain=true
factory.setPubSubDomain(true);
return factory;
}
}
經(jīng)過這樣的配置之后,我們在消費的時候,在 @JmsListener 注解中指定這個容器工廠即可消費 topic 消息。如下:
/**
* Topic消息消費者
* @author shengwu ni
*/
@Service
public class TopicConsumer1 {
/**
* 接收訂閱消息
* @param msg
*/
@JmsListener(destination = ActiveMqConfig.TOPIC_NAME, containerFactory = "topicListenerContainer")
public void receiveTopicMsg(String msg) {
System.out.println("收到的消息為:" + msg);
}
}
指定 containerFactory 屬性為上面我們自己配置的 topicListenerContainer 即可。由于 topic 消息可以多個消費,所以該消費的類可以拷貝幾個一起測試一下,這里我就不貼代碼了,可以參考我的源碼測試。
3.5.3 測試一下
啟動項目,在瀏覽器中輸入:http://localhost:8081/activemq/send/topic,觀察控制臺的輸出日志,出現(xiàn)下面的日志說明消息發(fā)送和消費成功。
收到的消息為:Topic: hello activemq!
收到的消息為:Topic: hello activemq!
4. 總結(jié)
本章主要介紹了 jms 和 activemq 的相關(guān)概念、activemq 的安裝與啟動。詳細分析了 Spring Boot 中點對點消息和發(fā)布/訂閱消息兩種方式的配置、消息生產(chǎn)和消費方式。ActiveMQ 是能力強勁的開源消息總線,在異步消息的處理上很有用,希望大家好好消化一下。
該文已收錄到我寫的《10萬字Springboot經(jīng)典學(xué)習(xí)筆記》中,點擊下面小卡片,進入【武哥聊編程】,回復(fù):筆記,即可免費獲取。
點贊是最大的支持

