Spring Boot 中集成ActiveMQ
作者丨武哥
來(lái)源丨武哥聊編程
1. JMS 和 ActiveMQ 介紹
1.1 JMS 是啥
百度百科的解釋?zhuān)?/p>
JMS 即 Java 消息服務(wù)(Java Message Service)應(yīng)用程序接口,是一個(gè)Java平臺(tái)中關(guān)于面向消息中間件(MOM)的 API,用于在兩個(gè)應(yīng)用程序之間,或分布式系統(tǒng)中發(fā)送消息,進(jìn)行異步通信。Java 消息服務(wù)是一個(gè)與具體平臺(tái)無(wú)關(guān)的 API,絕大多數(shù) MOM 提供商都對(duì) JMS 提供支持。
JMS 只是接口,不同的提供商或者開(kāi)源組織對(duì)其有不同的實(shí)現(xiàn),ActiveMQ 就是其中之一,它支持JMS,是 Apache 推出的。JMS 中有幾個(gè)對(duì)象模型:
連接工廠(chǎng):ConnectionFactory
JMS連接:Connection
JMS會(huì)話(huà):Session
JMS目的:Destination
JMS生產(chǎn)者:Producer
JMS消費(fèi)者:Consumer
JMS消息兩種類(lèi)型:點(diǎn)對(duì)點(diǎn)和發(fā)布/訂閱。
可以看出 JMS 實(shí)際上和 JDBC 有點(diǎn)類(lèi)似,JDBC 是可以用來(lái)訪(fǎng)問(wèn)許多不同關(guān)系數(shù)據(jù)庫(kù)的 API,而 JMS 則提供同樣與廠(chǎng)商無(wú)關(guān)的訪(fǎng)問(wèn)方法,以訪(fǎng)問(wèn)消息收發(fā)服務(wù)。本文主要使用 ActiveMQ。
1.2 ActiveMQ
ActiveMQ 是 Apache 的一個(gè)能力強(qiáng)勁的開(kāi)源消息總線(xiàn)。ActiveMQ 完全支持JMS1.1和J2EE 1.4規(guī)范,盡管 JMS 規(guī)范出臺(tái)已經(jīng)是很久的事情了,但是 JMS 在當(dāng)今的 Java EE 應(yīng)用中間仍然扮演著特殊的地位。ActiveMQ 用在異步消息的處理上,所謂異步消息即消息發(fā)送者無(wú)需等待消息接收者的處理以及返回,甚至無(wú)需關(guān)心消息是否發(fā)送成功。
異步消息主要有兩種目的地形式,隊(duì)列(queue)和主題(topic),隊(duì)列用于點(diǎn)對(duì)點(diǎn)形式的消息通信,主題用于發(fā)布/訂閱式的消息通信。本章節(jié)主要來(lái)學(xué)習(xí)一下在 Spring Boot 中如何使用這兩種形式的消息。
2. ActiveMQ安裝
使用 ActiveMQ 首先需要去官網(wǎng)下載,官網(wǎng)地址為:http://activemq.apache.org/
本課程使用的版本是 apache-activemq-5.15.3,下載后解壓縮會(huì)有一個(gè)名為 apache-activemq-5.15.3 的文件夾,沒(méi)錯(cuò),這就安裝好了,非常簡(jiǎn)單,開(kāi)箱即用。打開(kāi)文件夾會(huì)看到里面有個(gè) activemq-all-5.15.3.jar,這個(gè) jar 我們是可以加進(jìn)工程里的,但是使用 maven 的話(huà),這個(gè) jar 我們不需要。
在使用 ActiveMQ 之前,首先得先啟動(dòng),剛才解壓后的目錄中有個(gè) bin 目錄,里面有 win32 和 win64 兩個(gè)目錄,根據(jù)自己電腦選擇其中一個(gè)打開(kāi)運(yùn)行里面的 activemq.bat 即可啟動(dòng) ActiveMQ。
消息生產(chǎn)者生產(chǎn)消息發(fā)布到queue中,然后消息消費(fèi)者從queue中取出,并且消費(fèi)消息。這里需要注意:消息被消費(fèi)者消費(fèi)以后,queue中不再有存儲(chǔ),所以消息消費(fèi)者不可消費(fèi)到已經(jīng)被消費(fèi)的消息。Queue支持存在多個(gè)消息消費(fèi)者,但是對(duì)一個(gè)消息而言,只會(huì)有一個(gè)消費(fèi)者可以消費(fèi) 啟動(dòng)完成后,在瀏覽器中輸入 http://127.0.0.1:8161/admin/ 來(lái)訪(fǎng)問(wèn) ActiveMQ 的服務(wù)器,用戶(hù)名和密碼是 admin/admin。如下:
我們可以看到有 Queues 和 Topics 這兩個(gè)選項(xiàng),這兩個(gè)選項(xiàng)分別是點(diǎn)對(duì)點(diǎn)消息和發(fā)布/訂閱消息的查看窗口。何為點(diǎn)對(duì)點(diǎn)消息和發(fā)布/訂閱消息呢?
點(diǎn)對(duì)點(diǎn)消息:消息生產(chǎn)者生產(chǎn)消息發(fā)布到 queue 中,然后消息消費(fèi)者從 queue 中取出,并且消費(fèi)消息。這里需要注意:消息被消費(fèi)者消費(fèi)以后,queue 中不再有存儲(chǔ),所以消息消費(fèi)者不可消費(fèi)到已經(jīng)被消費(fèi)的消息。Queue 支持存在多個(gè)消息消費(fèi)者,但是對(duì)一個(gè)消息而言,只會(huì)有一個(gè)消費(fèi)者可以消費(fèi)。
發(fā)布/訂閱消息:消息生產(chǎn)者(發(fā)布)將消息發(fā)布到 topic 中,同時(shí)有多個(gè)消息消費(fèi)者(訂閱)消費(fèi)該消息。和點(diǎn)對(duì)點(diǎn)方式不同,發(fā)布到 topic 的消息會(huì)被所有訂閱者消費(fèi)。下面分析具體的實(shí)現(xiàn)方式。
3. ActiveMQ集成
3.1 依賴(lài)導(dǎo)入和配置
在 Spring Boot 中集成 ActiveMQ 需要導(dǎo)入如下 starter 依賴(lài):
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
然后在 application.yml 配置文件中,對(duì) activemq 做一下配置:
spring:
activemq:
# activemq url
broker-url: tcp://localhost:61616
in-memory: true
pool:
# 如果此處設(shè)置為true,需要添加activemq-pool的依賴(lài)包,否則會(huì)自動(dòng)配置失敗,無(wú)法注入JmsMessagingTemplate
enabled: false
3.2 Queue 和 Topic 的創(chuàng)建
首先我們需要?jiǎng)?chuàng)建兩種消息 Queue 和 Topic,這兩種消息的創(chuàng)建,我們放到 ActiveMqConfig 中來(lái)創(chuàng)建,如下:
/**
* activemq的配置
* @author shengwu ni
*/
@Configuration
public class ActiveMqConfig {
/**
* 發(fā)布/訂閱模式隊(duì)列名稱(chēng)
*/
public static final String TOPIC_NAME = "activemq.topic";
/**
* 點(diǎn)對(duì)點(diǎn)模式隊(duì)列名稱(chēng)
*/
public static final String QUEUE_NAME = "activemq.queue";
@Bean
public Destination topic() {
return new ActiveMQTopic(TOPIC_NAME);
}
@Bean
public Destination queue() {
return new ActiveMQQueue(QUEUE_NAME);
}
}
可以看出創(chuàng)建 Queue 和 Topic 兩種消息,分別使用 new ActiveMQQueue 和 new ActiveMQTopic 來(lái)創(chuàng)建,分別跟上對(duì)應(yīng)消息的名稱(chēng)即可。這樣在其他地方就可以直接將這兩種消息作為組件注入進(jìn)來(lái)了。
3.3 消息的發(fā)送接口
在 Spring Boot 中,我們只要注入 JmsMessagingTemplate 模板即可快速發(fā)送消息,如下:
/**
* 消息發(fā)送者
* @author shengwu ni
*/
@Service
public class MsgProducer {
@Resource
private JmsMessagingTemplate jmsMessagingTemplate;
public void sendMessage(Destination destination, String msg) {
jmsMessagingTemplate.convertAndSend(destination, msg);
}
}
convertAndSend 方法中第一個(gè)參數(shù)是消息發(fā)送的目的地,第二個(gè)參數(shù)是具體的消息內(nèi)容。
3.4 點(diǎn)對(duì)點(diǎn)消息生產(chǎn)與消費(fèi)
3.4.1 點(diǎn)對(duì)點(diǎn)消息的生產(chǎn)
消息的生產(chǎn),我們放到 Controller 中來(lái)做,由于上面已經(jīng)生成了 Queue 消息的組件,所以在 Controller 中我們直接注入進(jìn)來(lái)即可。然后調(diào)用上文的消息發(fā)送方法 sendMessage 即可成功生產(chǎn)一條消息。
/**
* ActiveMQ controller
* @author shengwu ni
*/
@RestController
@RequestMapping("/activemq")
public class ActiveMqController {
private static final Logger logger = LoggerFactory.getLogger(ActiveMqController.class);
@Resource
private MsgProducer producer;
@Resource
private Destination queue;
@GetMapping("/send/queue")
public String sendQueueMessage() {
logger.info("===開(kāi)始發(fā)送點(diǎn)對(duì)點(diǎn)消息===");
producer.sendMessage(queue, "Queue: hello activemq!");
return "success";
}
}
3.4.2 點(diǎn)對(duì)點(diǎn)消息的消費(fèi)
點(diǎn)對(duì)點(diǎn)消息的消費(fèi)很簡(jiǎn)單,只要我們指定目的地即可,jms 監(jiān)聽(tīng)器一直在監(jiān)聽(tīng)是否有消息過(guò)來(lái),如果有,則消費(fèi)。
/**
* 消息消費(fèi)者
* @author shengwu ni
*/
@Service
public class QueueConsumer {
/**
* 接收點(diǎn)對(duì)點(diǎn)消息
* @param msg
*/
@JmsListener(destination = ActiveMqConfig.QUEUE_NAME)
public void receiveQueueMsg(String msg) {
System.out.println("收到的消息為:" + msg);
}
}
可以看出,使用 @JmsListener 注解來(lái)指定要監(jiān)聽(tīng)的目的地,在消息接收方法內(nèi)部,我們可以根據(jù)具體的業(yè)務(wù)需求做相應(yīng)的邏輯處理即可。
3.4.3 測(cè)試一下
啟動(dòng)項(xiàng)目,在瀏覽器中輸入:http://localhost:8081/activemq/send/queue,觀察控制臺(tái)的輸出日志,出現(xiàn)下面的日志說(shuō)明消息發(fā)送和消費(fèi)成功。
收到的消息為:Queue: hello activemq!
3.5 發(fā)布/訂閱消息的生產(chǎn)和消費(fèi)
3.5.1 發(fā)布/訂閱消息的生產(chǎn)
和點(diǎn)對(duì)點(diǎn)消息一樣,我們注入 topic 并調(diào)用 producer 的 sendMessage 方法即可發(fā)送訂閱消息,如下,不再贅述:
@RestController
@RequestMapping("/activemq")
public class ActiveMqController {
private static final Logger logger = LoggerFactory.getLogger(ActiveMqController.class);
@Resource
private MsgProducer producer;
@Resource
private Destination topic;
@GetMapping("/send/topic")
public String sendTopicMessage() {
logger.info("===開(kāi)始發(fā)送訂閱消息===");
producer.sendMessage(topic, "Topic: hello activemq!");
return "success";
}
}
3.5.2 發(fā)布/訂閱消息的消費(fèi)
發(fā)布/訂閱消息的消費(fèi)和點(diǎn)對(duì)點(diǎn)不同,訂閱消息支持多個(gè)消費(fèi)者一起消費(fèi)。其次,Spring Boot 中默認(rèn)的時(shí)點(diǎn)對(duì)點(diǎn)消息,所以在使用 topic 時(shí),會(huì)不起作用,我們需要在配置文件 application.yml 中添加一個(gè)配置:
spring:
jms:
pub-sub-domain: true
該配置是 false 的話(huà),則為點(diǎn)對(duì)點(diǎn)消息,也是 Spring Boot 默認(rèn)的。這樣是可以解決問(wèn)題,但是如果這樣配置的話(huà),上面提到的點(diǎn)對(duì)點(diǎn)消息又不能正常消費(fèi)了。所以二者不可兼得,這并非一個(gè)好的解決辦法。
比較好的解決辦法是,我們定義一個(gè)工廠(chǎng),@JmsListener 注解默認(rèn)只接收 queue 消息,如果要接收 topic 消息,需要設(shè)置一下 containerFactory。我們還在上面的那個(gè) ActiveMqConfig 配置類(lèi)中添加:
/**
* activemq的配置
*
* @author shengwu ni
*/
@Configuration
public class ActiveMqConfig {
// 省略其他內(nèi)容
/**
* JmsListener注解默認(rèn)只接收queue消息,如果要接收topic消息,需要設(shè)置containerFactory
*/
@Bean
public JmsListenerContainerFactory topicListenerContainer(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// 相當(dāng)于在application.yml中配置:spring.jms.pub-sub-domain=true
factory.setPubSubDomain(true);
return factory;
}
}
經(jīng)過(guò)這樣的配置之后,我們?cè)谙M(fèi)的時(shí)候,在 @JmsListener 注解中指定這個(gè)容器工廠(chǎng)即可消費(fèi) topic 消息。如下:
/**
* Topic消息消費(fèi)者
* @author shengwu ni
*/
@Service
public class TopicConsumer1 {
/**
* 接收訂閱消息
* @param msg
*/
@JmsListener(destination = ActiveMqConfig.TOPIC_NAME, containerFactory = "topicListenerContainer")
public void receiveTopicMsg(String msg) {
System.out.println("收到的消息為:" + msg);
}
}
指定 containerFactory 屬性為上面我們自己配置的 topicListenerContainer 即可。由于 topic 消息可以多個(gè)消費(fèi),所以該消費(fèi)的類(lèi)可以拷貝幾個(gè)一起測(cè)試一下,這里我就不貼代碼了,可以參考我的源碼測(cè)試。
3.5.3 測(cè)試一下
啟動(dòng)項(xiàng)目,在瀏覽器中輸入:http://localhost:8081/activemq/send/topic,觀察控制臺(tái)的輸出日志,出現(xiàn)下面的日志說(shuō)明消息發(fā)送和消費(fèi)成功。
收到的消息為:Topic: hello activemq!
收到的消息為:Topic: hello activemq!
4. 總結(jié)
本章主要介紹了 jms 和 activemq 的相關(guān)概念、activemq 的安裝與啟動(dòng)。詳細(xì)分析了 Spring Boot 中點(diǎn)對(duì)點(diǎn)消息和發(fā)布/訂閱消息兩種方式的配置、消息生產(chǎn)和消費(fèi)方式。ActiveMQ 是能力強(qiáng)勁的開(kāi)源消息總線(xiàn),在異步消息的處理上很有用,希望大家好好消化一下。
-End-
最近有一些小伙伴,讓我?guī)兔φ乙恍?nbsp;面試題 資料,于是我翻遍了收藏的 5T 資料后,匯總整理出來(lái),可以說(shuō)是程序員面試必備!所有資料都整理到網(wǎng)盤(pán)了,歡迎下載!

面試題】即可獲取
