信息互聯(lián)網(wǎng)開發(fā)時(shí)代,ActiveMQ了解下?
前言
隨著互聯(lián)網(wǎng)開發(fā)模式的不斷普及,同時(shí)也伴隨著移動(dòng)互聯(lián)網(wǎng)的不斷發(fā)展,給用戶發(fā)送消息成為每一款應(yīng)用都必不可少的組件之一,想想你的淘寶、微信、釘釘,你日常生活中肯定收到過很多很多的消息。當(dāng)然,這也是為了提高用戶體驗(yàn),及時(shí)告知用戶業(yè)務(wù)辦理結(jié)果,給用戶推送消息這樣的功能需求自然也是必不可少(但是現(xiàn)在垃圾信息確實(shí)太多了)。正是在這樣的背景之下,消息隊(duì)列這樣的組件應(yīng)運(yùn)而生,這樣的組件不僅滿足多端消息發(fā)送的需求,同時(shí)還可以作為服務(wù)器削峰去谷,降低系統(tǒng)壓力。今天我們就來簡單了解一款廣泛應(yīng)用的消息隊(duì)列組件——ActiveMQ。
ActiveMQ
ActiveMq什么
百度百科的介紹很簡單,但也說明了activeMq的特性:
Apache ActiveMQ是Apache軟件基金會(huì)所研發(fā)的開放源代碼消息中間件;由于ActiveMQ是一個(gè)純Java程序,因此只要操作系統(tǒng)支持Java虛擬機(jī),ActiveMQ便可執(zhí)行。
支持Java消息服務(wù)(JMS) 1.1 版本 [Spring Framework](https://baike.baidu.com/item/Spring Framework) 集群 (Clustering) 支持的編程語言包括:C、C++、C#、Delphi、Erlang、[Adobe Flash](https://baike.baidu.com/item/Adobe Flash)、Haskell、Java、JavaScript、Perl、PHP、Pike、Python和Ruby 協(xié)議支持包括:OpenWire、REST、STOMP、WS-Notification、MQTT、XMPP以及AMQP [1]
當(dāng)然,作為一個(gè)開發(fā)組件,對(duì)我們而言最重要的就是如何使用它,至于它的特性以及底層的原理,不再我們目前的討論范圍,下來我們來看下如何安裝、部署它。
安裝
這一塊的內(nèi)容就太簡單了,但由于這是一篇科普類文字,所以我就稍微羅嗦一點(diǎn)。但是我個(gè)人覺得,只要你個(gè)人動(dòng)手能力不是特別弱雞,那你大概看下官方文檔,應(yīng)該都可以安裝好,而且現(xiàn)在的組件都大同小異,不用看文檔你也能大概部署起來,無非就是找到bin文件夾,找到啟動(dòng)腳本,找到config文件夾,看下配置,然后先試啟動(dòng)下,一般都可以正常啟動(dòng)了,這些我覺得應(yīng)該是作為一個(gè)開發(fā)人員的最基本修養(yǎng),如果還不具備的小伙伴該好好面壁反思下了。
下載
訪問官方網(wǎng)站,選擇對(duì)應(yīng)的版本,然后下載
https://activemq.apache.org/

這里我選擇classic,也就是主流版本,Artemis是下一代的ActiveMQ,類似于開發(fā)版。
安裝
現(xiàn)在的開發(fā)組件,都是解壓即用的
啟動(dòng)
進(jìn)入bin目錄,直接雙擊運(yùn)行,這時(shí)候我們會(huì)看到,有個(gè)cmd窗口一閃而過,說明報(bào)錯(cuò)了,這時(shí)候我們打開cmd,這里有個(gè)小技巧,在當(dāng)前文件夾直接輸入cmd即可打開cmd窗口:

然后我們在cmd中再次執(zhí)行activemq.bat,這時(shí)候會(huì)發(fā)現(xiàn),cmd中回顯如下提示信息

根據(jù)提示信息,我們知道之前沒有添加參數(shù)(Usage才是最好的文檔,一定要習(xí)慣命令行,沒看明白的小伙伴繼續(xù)面壁),這時(shí)候我們知道啟動(dòng)的命令應(yīng)該是這樣的:
activemq.bat start
回車鍵入,完美啟動(dòng):

測試
我們在瀏覽器打開如下地址,如果正常訪問,說明ActiveMQ啟動(dòng)沒有任何問題:
localhost:8161
用戶名和密碼都是admin,如何修改我們后面再說:



這時(shí)候,我們的ActiveMQ就正常啟動(dòng)了。
docker安裝activemq
這里我們再補(bǔ)充下docker環(huán)境下,如何安裝activemq,很簡單只需要兩行命令即可:
$ docker pull webcenter/activemq:latest # 拉取最新版本的activemq
$ docker run -d --name myactivemq -p 61616:61616 -p 8161:8161 webcenter/activemq:latest # 啟動(dòng)
關(guān)于docker的使用,我們前面已經(jīng)講過了,還想要了解的小伙伴可以翻一下前面的內(nèi)容。
java整合ActiveMQ
創(chuàng)建java項(xiàng)目
這里我們創(chuàng)建一個(gè)java項(xiàng)目,簡單測試下ActiveMQ,創(chuàng)建maven項(xiàng)目,引入如下依賴:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.13.4</version>
</dependency>
創(chuàng)建消息生產(chǎn)者
下面代碼中的注釋已經(jīng)夠詳細(xì)了,需要注意的是session.createQueue("test-queue")中的test-queue是我們要發(fā)送消息的目標(biāo)隊(duì)列的名稱,消費(fèi)者也必須是一樣的名稱,才能正常消費(fèi)消息,否則是沒有辦法消費(fèi)的。
session.createTextMessage("hello!test-queue")是發(fā)送消息,這里的"hello!test-queue是我們發(fā)送的消息內(nèi)容。
/**
*
* 消息生產(chǎn)者
*
* @author syske
* @version 1.0
* @date 2021-04-24 10:20:22
*/
public class ActiveMqProducer {
public void mQProducerQueue() throws Exception{
//1、創(chuàng)建工廠連接對(duì)象,需要制定ip和端口號(hào),這里的端口就是activemq的端口,后面我們再來說如何配置
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
//2、使用連接工廠創(chuàng)建一個(gè)連接對(duì)象
Connection connection = connectionFactory.createConnection();
//3、開啟連接
connection.start();
//4、使用連接對(duì)象創(chuàng)建會(huì)話(session)對(duì)象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、使用會(huì)話對(duì)象創(chuàng)建目標(biāo)對(duì)象,包含queue和topic(一對(duì)一和一對(duì)多)
Queue queue = session.createQueue("test-queue");
//6、使用會(huì)話對(duì)象創(chuàng)建生產(chǎn)者對(duì)象
MessageProducer producer = session.createProducer(queue);
//7、使用會(huì)話對(duì)象創(chuàng)建一個(gè)消息對(duì)象
TextMessage textMessage = session.createTextMessage("hello!test-queue");
//8、發(fā)送消息
producer.send(textMessage);
//9、關(guān)閉資源
producer.close();
session.close();
connection.close();
}
}
創(chuàng)建消息消費(fèi)者
基本上和消息生產(chǎn)的代碼一致,最大的區(qū)別在于消費(fèi)者這里創(chuàng)建的是消費(fèi)者session.createConsumer(queue),同時(shí)啟動(dòng)了一個(gè)消息監(jiān)聽器,實(shí)時(shí)監(jiān)聽指定隊(duì)列,只要隊(duì)列中有消息進(jìn)來,消費(fèi)者就會(huì)執(zhí)行MessageListener中的操作,將消息消費(fèi)掉,這里的寫法采用了蘭姆達(dá)表達(dá)式
**
*
* 消息消費(fèi)者
*
* @author syske
* @version 1.0
* @date 2021-04-24 10:22:21
*/
public class ActiveMqConsumer {
public void mQConsumerQueue() throws Exception{
//1、創(chuàng)建工廠連接對(duì)象,需要制定ip和端口號(hào)
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
//2、使用連接工廠創(chuàng)建一個(gè)連接對(duì)象
Connection connection = connectionFactory.createConnection();
//3、開啟連接
connection.start();
//4、使用連接對(duì)象創(chuàng)建會(huì)話(session)對(duì)象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、使用會(huì)話對(duì)象創(chuàng)建目標(biāo)對(duì)象,包含queue和topic(一對(duì)一和一對(duì)多)
Queue queue = session.createQueue("test-queue");
//6、使用會(huì)話對(duì)象創(chuàng)建生產(chǎn)者對(duì)象
MessageConsumer consumer = session.createConsumer(queue);
//7、向consumer對(duì)象中設(shè)置一個(gè)messageListener對(duì)象,用來接收消息
consumer.setMessageListener(message-> {
// TODO Auto-generated method stub
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("消息內(nèi)容:" + textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
//8、程序等待接收用戶消息
System.in.read();
//9、關(guān)閉資源
consumer.close();
session.close();
connection.close();
}
}
測試
生產(chǎn)者測試
首先我們啟動(dòng)消費(fèi)者,生產(chǎn)消息
/**
* 生產(chǎn)消息
*
* @throws Exception
*/
@Test
public void produceMessage() throws Exception {
new ActiveMqProducer().mQProducerQueue();
}
運(yùn)行上面的測試代碼,然后打開ActiveMQ的管理頁面,我們會(huì)看到,多了一個(gè)消息隊(duì)列:

消息隊(duì)列名稱就是我們上面指定的test-queue,待消費(fèi)消息(pending)數(shù)量1,因?yàn)槲覀儠簳r(shí)沒有啟動(dòng)消費(fèi)者(consumer),所以消費(fèi)者數(shù)量是0,隊(duì)列中排隊(duì)的消息(enqueued)數(shù)量1,被消費(fèi)消息(dequeued)數(shù)量0。如果你再次啟動(dòng)生產(chǎn)者,消息隊(duì)列中的數(shù)據(jù)會(huì)變成2:

消費(fèi)者測試
這時(shí)候,我們啟動(dòng)消費(fèi)者
/**
* 消費(fèi)消息
* @throws Exception
*/
@Test
public void testConsumeMessage() throws Exception {
new ActiveMqConsumer().mQConsumerQueue();
}
然后我們會(huì)看到控制臺(tái)輸出如下信息:

這些消息就是我們在生產(chǎn)者中發(fā)送的消息。我們再訪問隊(duì)列管理頁面看一下:

和上面消費(fèi)者啟動(dòng)之前相比,消息已經(jīng)被消費(fèi),未消費(fèi)消息數(shù)量0,已消費(fèi)2,消費(fèi)者數(shù)量1。
擴(kuò)展
一般在實(shí)際項(xiàng)目中,我們會(huì)啟動(dòng)異步線程發(fā)送消息(消息生產(chǎn)者),然后由消息中心集中對(duì)消息進(jìn)行消費(fèi),并將消息推送至客戶端,比如微信公眾號(hào)、釘釘、安卓端、IOS端等。需要注意的是,不要在循環(huán)中調(diào)用異步線程,否則會(huì)導(dǎo)致線程池資源耗盡報(bào)錯(cuò),而且這種錯(cuò)誤本地和測試環(huán)境很難復(fù)現(xiàn)。
我最近剛處理和優(yōu)化的一個(gè)問題就是for循環(huán)中調(diào)用了異步發(fā)送消息的方法,導(dǎo)致線程池資源耗盡報(bào)錯(cuò),但是本地和測試一直無法復(fù)現(xiàn)錯(cuò)誤,直到最后在異步線程方法中加了sleep之后bug才復(fù)現(xiàn),當(dāng)然也說明線上環(huán)境復(fù)雜,消息隊(duì)列響應(yīng)慢,所以為了避免這類情況出現(xiàn),一定在寫代碼的時(shí)候要盡可能規(guī)范合理,畢竟本地沒問題不代表線上沒問題,并發(fā)量為10的時(shí)候,接口一切響應(yīng)正常,但是如果并發(fā)量發(fā)生量級(jí)變化,變成100甚至1000,性能問題就出現(xiàn)了。
配置
這里我們再簡單說下activemq的配置,進(jìn)入conf文件夾,可以看到如下文件:

端口配置
其中,端口的配置在jetty.xml中:

管理賬戶配置
管理用戶配置在jetty-realm.properties中,配置規(guī)則也很簡單:
用戶名: 密碼, 角色

其他配置
剩余的文件都是有關(guān)登錄驗(yàn)證的配置,比如jaas、credentials,暫時(shí)未涉及,這里也就不做探討,等后面了解之后再來分享。
結(jié)語
今天的內(nèi)容已經(jīng)比較完整地介紹了activemq的基礎(chǔ)內(nèi)容,包括下載、安裝、配置以及項(xiàng)目整合,就算之前沒接觸過activemq的小伙伴,通過今天的內(nèi)容,我想你也應(yīng)該對(duì)activemq有了基本的認(rèn)知,基本上可以確保在工作中用到activemq的時(shí)候,你可以很好地上手。當(dāng)然,今天的內(nèi)容,也算是我最近一段時(shí)間學(xué)習(xí)activemq的一個(gè)簡單總結(jié),后面還需要進(jìn)一步的深入學(xué)習(xí)和探索。
最后,再給各個(gè)小伙伴一個(gè)建議,學(xué)習(xí)的時(shí)候要盡可能多看官方文檔,多留意提示信息,多橫向比較,多做延申,多積累,要慢慢培養(yǎng)自己系統(tǒng)的學(xué)習(xí)思維和習(xí)慣,提升自己知識(shí)橫向擴(kuò)展能力,定期做知識(shí)的歸納整理,將知識(shí)點(diǎn)連成知識(shí)鏈,再將知識(shí)鏈鏈接形成知識(shí)面,這樣隨著你知識(shí)面的不斷擴(kuò)展,你會(huì)發(fā)現(xiàn)你學(xué)東西的時(shí)候效率會(huì)更好,學(xué)東西會(huì)更快,而且學(xué)習(xí)質(zhì)量也更高,畢竟大腦更擅長的是關(guān)聯(lián),而不是記憶。另外,就算大腦是個(gè)數(shù)據(jù)庫,那你也應(yīng)該知道,建立索引,我們的知識(shí)才可以更快速地被檢索,所以管理知識(shí),其實(shí)比學(xué)習(xí)知識(shí)更重要。
- END -