深入了解ActiveMQ!

認(rèn)識(shí)MQ(Message Queue)
什么是消息隊(duì)列

首先我們先從以下幾個(gè)維度來(lái)認(rèn)識(shí)一下消息隊(duì)列:
消息隊(duì)列:一般我們會(huì)簡(jiǎn)稱它為MQ(MessageQueue) 消息(Message):傳輸?shù)臄?shù)據(jù)。 隊(duì)列(Queue):隊(duì)列是一種先進(jìn)先出的數(shù)據(jù)結(jié)構(gòu)。 消息隊(duì)列從字面的含義來(lái)看就是一個(gè)存放消息的容器。 消息隊(duì)列可以簡(jiǎn)單理解為:把要傳輸?shù)臄?shù)據(jù)放在隊(duì)列中。 把數(shù)據(jù)放到消息隊(duì)列叫做生產(chǎn)者。 從消息隊(duì)列里邊取數(shù)據(jù)叫做消費(fèi)者。
為什么需要消息隊(duì)列
使用消息隊(duì)列主要是基于以下三個(gè)主要場(chǎng)景:
解耦 異步 削峰/限流
下面我們分場(chǎng)景來(lái)描述下使用消息隊(duì)列帶來(lái)的好處
解耦
假設(shè)我們有一個(gè)用戶系統(tǒng)A,用戶系統(tǒng)A可以產(chǎn)生一個(gè)userId。
然后,現(xiàn)在有系統(tǒng)B和系統(tǒng)C都需要這個(gè)userId去做相關(guān)的操作。

偽碼大致如下:
java
public?class?SystemA?{
????//?系統(tǒng)B和系統(tǒng)C的依賴
????SystemB?systemB?=?new?SystemB();
????SystemC?systemC?=?new?SystemC();
????//?系統(tǒng)A獨(dú)有的數(shù)據(jù)userId
????private?String?userId?=?"activeMq-1234567890";
????public?void?doSomething()?{
????????//?系統(tǒng)B和系統(tǒng)C都需要拿著系統(tǒng)A的userId去操作其他的事
????????systemB.SystemBNeed2do(userId);
????????systemC.SystemCNeed2do(userId);
????}
}
「這樣類似的業(yè)務(wù)場(chǎng)景大家是不是很熟悉,大家是不是這樣寫(xiě)很合情合理,也很簡(jiǎn)單?!?/strong>
某一天,系統(tǒng)B的負(fù)責(zé)人告訴系統(tǒng)A的負(fù)責(zé)人,現(xiàn)在系統(tǒng)B的SystemBNeed2do(String userId)這個(gè)接口不再使用了,讓系統(tǒng)A別去調(diào)它了。
于是,系統(tǒng)A的負(fù)責(zé)人說(shuō)"好的,那我就不調(diào)用你了。",于是就把調(diào)用系統(tǒng)B接口的代碼給刪掉了。代碼變成這樣了:
java
public?void?doSomething()?{
??//?系統(tǒng)A不再調(diào)用系統(tǒng)B的接口了
??//systemB.SystemBNeed2do(userId);
??systemC.SystemCNeed2do(userId);
}
由于業(yè)務(wù)需要,系統(tǒng)D說(shuō)也需要用到系統(tǒng)A的userId,于是代碼改成了這樣:
java
public?void?doSomething()?{
????????//?已經(jīng)不再需要系統(tǒng)B的依賴了
????????//systemB.SystemBNeed2do(userId);
????????//?系統(tǒng)C和系統(tǒng)D都需要拿著系統(tǒng)A的userId去操作其他的事
????????systemC.SystemCNeed2do(userId);
????????systemD.SystemDNeed2do(userId);
}
當(dāng)前系統(tǒng)A、B、C、D系統(tǒng)的交互是這樣子的。

隨著業(yè)務(wù)需求的變化,代碼也要一遍一遍的修改。
還會(huì)存在另外一個(gè)問(wèn)題,調(diào)用系統(tǒng)C的時(shí)候,如果系統(tǒng)C掛了,系統(tǒng)A還要想辦法處理。如果調(diào)用系統(tǒng)D時(shí),由于網(wǎng)絡(luò)延遲,請(qǐng)求超時(shí)了,那系統(tǒng)A是反饋fail還是重試?
那么怎么去解決這樣的現(xiàn)狀呢,如何從頻繁的修改代碼中解脫呢?
這時(shí)候我們就引入一層消息隊(duì)列中間件,交互圖如下:

將系統(tǒng)A產(chǎn)生的userId寫(xiě)到消息隊(duì)列中,系統(tǒng)C和系統(tǒng)D從消息隊(duì)列中拿數(shù)據(jù)。
這樣有什么好處?
系統(tǒng)A只負(fù)責(zé)把數(shù)據(jù)寫(xiě)到隊(duì)列中,誰(shuí)想要或不想要這個(gè)數(shù)據(jù)(消息),系統(tǒng)A一點(diǎn)都不關(guān)心。 即便現(xiàn)在系統(tǒng)D不想要userId這個(gè)數(shù)據(jù)了,系統(tǒng)B又突然想要userId這個(gè)數(shù)據(jù)了,都跟系統(tǒng)A無(wú)關(guān),系統(tǒng)A一點(diǎn)代碼都不用改。 系統(tǒng)D拿userId不再經(jīng)過(guò)系統(tǒng)A,而是從消息隊(duì)列里邊拿。系統(tǒng)D即便掛了或者請(qǐng)求超時(shí),都跟系統(tǒng)A無(wú)關(guān),
只跟消息隊(duì)列有關(guān)。這樣一來(lái),系統(tǒng)A與系統(tǒng)B、C、D都解耦了。
異步
系統(tǒng)A做的是主要的業(yè)務(wù),而系統(tǒng)B、C、D是非主要的業(yè)務(wù)。比如系統(tǒng)A處理的是訂單下單,而系統(tǒng)B是訂單下單成功了,那發(fā)送一條短信告訴具體的用戶此訂單已成功,而系統(tǒng)C和系統(tǒng)D也是處理一些小事而已。
那么此時(shí),為了提高用戶體驗(yàn)和吞吐量,其實(shí)可以異步地調(diào)用系統(tǒng)B、C、D的接口。

削峰/限流
我們?cè)賮?lái)一個(gè)場(chǎng)景,現(xiàn)在我們每個(gè)月要搞一次大促,大促期間的并發(fā)可能會(huì)很高的,比如每秒3000個(gè)請(qǐng)求。假設(shè)我們現(xiàn)在有兩臺(tái)機(jī)器處理請(qǐng)求,并且每臺(tái)機(jī)器只能每次處理1000個(gè)請(qǐng)求。

系統(tǒng)B和系統(tǒng)C根據(jù)自己的能夠處理的請(qǐng)求數(shù)去消息隊(duì)列中拿數(shù)據(jù),這樣即便有每秒有8000個(gè)請(qǐng)求,那只是把請(qǐng)求放在消息隊(duì)列中,去拿消息隊(duì)列的消息由系統(tǒng)自己去控制,這樣就不會(huì)把整個(gè)系統(tǒng)給搞崩。

什么是JMS MQ
全稱:Java MessageService 中文:Java 消息服務(wù)。
JMS 是 Java 的一套 API 標(biāo)準(zhǔn),最初的目的是為了使應(yīng)用程序能夠訪問(wèn)現(xiàn)有的MOM 系 統(tǒng)(MOM 是 MessageOriented Middleware 的英文縮寫(xiě),指的是利用高效可靠的消息傳遞機(jī)制進(jìn)行平臺(tái)無(wú)關(guān)的數(shù)據(jù)交流,并基于數(shù)據(jù)通信來(lái)進(jìn)行分布式系統(tǒng)的集成。) 后來(lái)被許多現(xiàn)有的 MOM 供應(yīng)商采用,并實(shí)現(xiàn)為MOM 系統(tǒng)。
常見(jiàn) MOM 系統(tǒng)包括 Apache的 ActiveMQ、阿里巴巴的 RocketMQ、IBM 的 MQSeries、Microsoft 的 MSMQ、BEA 的 RabbitMQ 等。(并非全部的 MOM 系統(tǒng)都遵循JMS 規(guī)范)】
基于 JMS 實(shí)現(xiàn)的 MOM,又被稱為JMSProvider。
JMS中的一些概念
「Broker」
消息服務(wù)器,作為server提供消息核心服務(wù)
「Provider 生產(chǎn)者」
消息生產(chǎn)者是由會(huì)話創(chuàng)建的一個(gè)對(duì)象,用于把消息發(fā)動(dòng)到一個(gè)目的地
「Consumer 消費(fèi)者」
消息消費(fèi)者是由會(huì)話創(chuàng)建的一個(gè)對(duì)象,它用于接收發(fā)送到目的地的消息。消息的消費(fèi)可以采用以下兩種方法:
同步消費(fèi)。通過(guò)調(diào)用消費(fèi)者的receive方法從目的地中顯式提取消息。receive方法可以一直阻塞到消息到達(dá)。
異步消費(fèi)??蛻艨梢詾橄M(fèi)者注冊(cè)一個(gè)消息監(jiān)聽(tīng)器,以定義在消息到達(dá)時(shí)所采取的動(dòng)作。
「P2P 點(diǎn)對(duì)點(diǎn)消息模型」
消息生產(chǎn)者生產(chǎn)消息發(fā)送到queue 中,然后消息消費(fèi)者從queue 中取出并且消費(fèi)消息。消息被消費(fèi)以后,queue 中不再有存儲(chǔ),所以消息消費(fèi)者不可能消費(fèi)到已經(jīng)被消費(fèi)的消息。Queue支持存在多個(gè)消費(fèi)者,但是對(duì)一個(gè)消息而言,只會(huì)有一個(gè)消費(fèi)者可以消費(fèi)、其它的則不能消費(fèi)此消息了。當(dāng)消費(fèi)者不存在時(shí),消息會(huì)一直保存,直到有消費(fèi)消費(fèi)。
「Pub/Sub 發(fā)布訂閱消息模型」
消息生產(chǎn)者(發(fā)布)將消息發(fā)布到topic 中,同時(shí)有多個(gè)消息消費(fèi)者(訂閱)消費(fèi)該消息。和點(diǎn)對(duì)點(diǎn)方式不同,發(fā)布到 topic 的消息會(huì)被所有訂閱者消費(fèi)。當(dāng)生產(chǎn)者發(fā)布消息,不管是否有消費(fèi)者。都不會(huì)保存消息一定要先有消息的消費(fèi)者,后有消息的生產(chǎn)者。
「P2P vs Pub/Sub」

「Queue」
隊(duì)列存儲(chǔ),常用于點(diǎn)對(duì)點(diǎn)消息模型
默認(rèn)只能由唯一的一個(gè)消費(fèi)者處理。一旦處理消息刪除。
「Topic」
主題存儲(chǔ),用于訂閱/發(fā)布消息模型
主題中的消息,會(huì)發(fā)送給所有的消費(fèi)者同時(shí)處理。只有在消息可以重復(fù)處理的業(yè)務(wù)場(chǎng)景中可使用。
「ConnectionFactory」
連接工廠,jms中用它創(chuàng)建連接
連接工廠是客戶用來(lái)創(chuàng)建連接的對(duì)象,例如ActiveMQ提供的ActiveMQConnectionFactory。
「Connection」
JMS Connection封裝了客戶與JMS提供者之間的一個(gè)虛擬的連接。
「Destination 消息的目的地」
目的地是客戶用來(lái)指定它生產(chǎn)的消息的目標(biāo)和它消費(fèi)的消息的來(lái)源的對(duì)象。
訂閱一個(gè)主題的消費(fèi)者只能消費(fèi)自它訂閱之后發(fā)布的消息。JMS規(guī)范允許客戶創(chuàng)建持久訂閱,這在一定程度上放松了時(shí)間上的相關(guān)性要求。持久訂閱允許消費(fèi)者消費(fèi)它在未處于激活狀態(tài)時(shí)發(fā)送的消息。在點(diǎn)對(duì)點(diǎn)消息傳遞域中,目的地被成為隊(duì)列(queue);在發(fā)布/訂閱消息傳遞域中,目的地被成為主題(topic)。
「Session」
JMS Session是生產(chǎn)和消費(fèi)消息的一個(gè)單線程上下文。會(huì)話用于創(chuàng)建消息生產(chǎn)者(producer)、消息消費(fèi)者(consumer)和消息(message)等。會(huì)話提供了一個(gè)事務(wù)性的上下文,在這個(gè)上下文中,一組發(fā)送和接收被組合到了一個(gè)原子操作中。
消息可靠性機(jī)制
「確認(rèn) JMS消息」
只有在被確認(rèn)之后,才認(rèn)為已經(jīng)被成功地消費(fèi)了。消息的成功消費(fèi)通常包含三個(gè)階段:客戶接收消息、客戶處理消息和消息被確認(rèn)。
在事務(wù)性會(huì)話中,當(dāng)一個(gè)事務(wù)被提交的時(shí)候,確認(rèn)自動(dòng)發(fā)生。
在非事務(wù)性會(huì)話中,消息何時(shí)被確認(rèn)取決于創(chuàng)建會(huì)話時(shí)的應(yīng)答模式(acknowledgement mode)。該參數(shù)有以下三個(gè)可選值:
「Session.AUTO_ACKNOWLEDGE」。當(dāng)客戶成功的從receive方法返回的時(shí)候,或者從MessageListener.onMessage方法成功返回的時(shí)候,會(huì)話自動(dòng)確認(rèn)客戶收到的消息。
「Session.CLIENT_ACKNOWLEDGE」??蛻敉ㄟ^(guò)消息的acknowledge方法確認(rèn)消息。需要注意的是,在這種模式中,確認(rèn)是在會(huì)話層上進(jìn)行:確認(rèn)一個(gè)被消費(fèi)的消息將自動(dòng)確認(rèn)所有已被會(huì)話消費(fèi)的消息。例如,如果一個(gè)消息消費(fèi)者消費(fèi)了10個(gè)消息,然后確認(rèn)第5個(gè)消息,那么所有10個(gè)消息都被確認(rèn)。
「Session.DUPS_ACKNOWLEDGE」。該選擇只是會(huì)話遲鈍的確認(rèn)消息的提交。如果JMS Provider失敗,那么可能會(huì)導(dǎo)致一些重復(fù)的消息。如果是重復(fù)的消息,那么JMS Provider必須把消息頭的JMSRedelivered字段設(shè)置為true。
「持久性」
JMS 支持以下兩種消息提交模式:
「PERSISTENT」。指示JMSProvider持久保存消息,以保證消息不會(huì)因?yàn)镴MS Provider的失敗而丟失。
「NON_PERSISTENT」。不要求JMS Provider持久保存消息。
「優(yōu)先級(jí)」
可以使用消息優(yōu)先級(jí)來(lái)指示JMS Provider首先提交緊急的消息。優(yōu)先級(jí)分10個(gè)級(jí)別,從0(最低)到9(最高)。如果不指定優(yōu)先級(jí),默認(rèn)級(jí)別是4。「需要注意的是,JMSProvider并不一定保證按照優(yōu)先級(jí)的順序提交消息?!?/strong>
「消息過(guò)期」
可以設(shè)置消息在一定時(shí)間后過(guò)期,默認(rèn)是永不過(guò)期
「臨時(shí)目的地」
可以通過(guò)會(huì)話上的createTemporaryQueue方法和createTemporaryTopic方法來(lái)創(chuàng)建臨時(shí)目的地。它們的存在時(shí)間只限于創(chuàng)建它們的連接所保持的時(shí)間。只有創(chuàng)建該臨時(shí)目的地的連接上的消息消費(fèi)者才能夠從臨時(shí)目的地中提取消息。
「持久訂閱」
首先消息生產(chǎn)者必須使用PERSISTENT提交消息??蛻艨梢酝ㄟ^(guò)會(huì)話上的createDurableSubscriber方法來(lái)創(chuàng)建一個(gè)持久訂閱,該方法的第一個(gè)參數(shù)必須是一個(gè)topic,第二個(gè)參數(shù)是訂閱的名稱。
JMS Provider會(huì)存儲(chǔ)發(fā)布到持久訂閱對(duì)應(yīng)的topic上的消息。如果最初創(chuàng)建持久訂閱的客戶或者任何其它客戶使用相同的連接工廠和連接的客戶ID、相同的主題和相同的訂閱名再次調(diào)用會(huì)話上的createDurableSubscriber方法,那么該持久訂閱就會(huì)被激活。
JMS Provider會(huì)向客戶發(fā)送客戶處于非激活狀態(tài)時(shí)所發(fā)布的消息。
持久訂閱在某個(gè)時(shí)刻只能有一個(gè)激活的訂閱者。持久訂閱在創(chuàng)建之后會(huì)一直保留,直到應(yīng)用程序調(diào)用會(huì)話上的unsubscribe方法。
「本地事務(wù)」
在一個(gè)JMS客戶端,可以使用本地事務(wù)來(lái)組合消息的發(fā)送和接收。JMS Session接口提供了commit和rollback方法。事務(wù)提交意味著生產(chǎn)的所有消息被發(fā)送,消費(fèi)的所有消息被確認(rèn);事務(wù)回滾意味著生產(chǎn)的所有消息被銷毀,消費(fèi)的所有消息被恢復(fù)并重新提交,除非它們已經(jīng)過(guò)期。
事務(wù)性的會(huì)話總是牽涉到事務(wù)處理中,commit或rollback方法一旦被調(diào)用,一個(gè)事務(wù)就結(jié)束了,而另一個(gè)事務(wù)被開(kāi)始。關(guān)閉事務(wù)性會(huì)話將回滾其中的事務(wù)。
需要注意的是,如果使用請(qǐng)求/回復(fù)機(jī)制,即發(fā)送一個(gè)消息,同時(shí)希望在同一個(gè)事務(wù)中等待接收該消息的回復(fù),那么程序?qū)⒈粧炱?,因?yàn)橹朗聞?wù)提交,發(fā)送操作才會(huì)真正執(zhí)行。需要注意的還有一個(gè),消息的生產(chǎn)和消費(fèi)不能包含在同一個(gè)事務(wù)中。
ActiveMQ
存儲(chǔ)
ActiveMQ支持很多種存儲(chǔ)方式,常見(jiàn)的有 KahaDB存儲(chǔ),AMQ存儲(chǔ),JDBC存儲(chǔ),LevelDB存儲(chǔ),Memory 消息存儲(chǔ)。我們重點(diǎn)介紹一下KahaDB和JDBC存儲(chǔ)方式。
KahaDB存儲(chǔ)
KahaDB是默認(rèn)的持久化策略,所有消息順序添加到一個(gè)日志文件中,同時(shí)另外有一個(gè)索引文件記錄指向這些日志的存儲(chǔ)地址,還有一個(gè)事務(wù)日志用于消息回復(fù)操作。是一個(gè)專門針對(duì)消息持久化的解決方案,它對(duì)典型的消息使用模式進(jìn)行了優(yōu)化。
在data/kahadb這個(gè)目錄下,會(huì)生成四個(gè)文件,來(lái)完成消息持久化 db.data 它是消息的索引文件,本質(zhì)上是B-Tree(B樹(shù)),使用B-Tree作為索引指向db-*.log里面存儲(chǔ)的消息 db.redo 用來(lái)進(jìn)行消息恢復(fù) *db-.log 存儲(chǔ)消息內(nèi)容。

新的數(shù)據(jù)以APPEND的方式追加到日志文件末尾。屬于順序?qū)懭?,因此消息存?chǔ)是比較 快的。默認(rèn)是32M,達(dá)到閥值會(huì)自動(dòng)遞增 lock文件 鎖,寫(xiě)入當(dāng)前獲得kahadb讀寫(xiě)權(quán)限的broker ,用于在集群環(huán)境下的競(jìng)爭(zhēng)處理。
KahaDB有如下幾個(gè)特性:
日志形式存儲(chǔ)消息; 消息索引以 B-Tree 結(jié)構(gòu)存儲(chǔ),可以快速更新; 完全支持 JMS 事務(wù); 支持多種恢復(fù)機(jī)制kahadb 可以限制每個(gè)數(shù)據(jù)文件的大小。不代表總計(jì)數(shù)據(jù)容量。
配置方式如下:
????"${activemq.data}/kahadb"/>
JDBC 存儲(chǔ)
支持通過(guò) JDBC 將消息存儲(chǔ)到關(guān)系數(shù)據(jù)庫(kù),性能上不如文件存儲(chǔ),能通過(guò)關(guān)系型數(shù)據(jù)庫(kù)查詢到消息的信息。
MQ 支持的數(shù)據(jù)庫(kù):Apache Derby、MySQL、PostgreSQL、Oracle、SQLServer、Sybase、Informix、MaxDB。使用JDBC存儲(chǔ)需要用到下面三張數(shù)據(jù)表。
「activemq_acks」:用于存儲(chǔ)訂閱關(guān)系。如果是持久化Topic,訂閱者和服務(wù)器的訂閱關(guān)系在這個(gè)表保存。主要的數(shù)據(jù)庫(kù)字段如下:
container:消息的destination sub_dest:如果是使用static集群,這個(gè)字段會(huì)有集群其他系統(tǒng)的信息 client_id:每個(gè)訂閱者都必須有一個(gè)唯一的客戶端id用以區(qū)分 sub_name:訂閱者名稱 selector:選擇器,可以選擇只消費(fèi)滿足條件的消息。條件可以用自定義屬性實(shí)現(xiàn),可支持多屬性and和or操作 last_acked_id:記錄消費(fèi)過(guò)的消息的id。
「activemq_lock」:在集群環(huán)境中才有用,只有一個(gè)Broker可以獲得消息,稱為Master Broker,其他的只能作為備份等待Master Broker不可用,才可能成為下一個(gè)Master Broker。這個(gè)表用于記錄哪個(gè)Broker是當(dāng)前的Master Broker。
「activemq_msgs」:用于存儲(chǔ)消息,Queue和Topic都存儲(chǔ)在這個(gè)表中。主要的數(shù)據(jù)庫(kù)字段如下
id:自增的數(shù)據(jù)庫(kù)主鍵 container:消息的destination msgid_prod:消息發(fā)送者客戶端的主鍵 msg_seq:是發(fā)送消息的順序,msgid_prod+msg_seq可以組成jms的messageid expiration:消息的過(guò)期時(shí)間,存儲(chǔ)的是從1970-01-01到現(xiàn)在的毫秒數(shù) msg:消息本體的java序列化對(duì)象的二進(jìn)制數(shù)據(jù) priority:優(yōu)先級(jí),從0-9,數(shù)值越大優(yōu)先級(jí)越高 xid:topic
配置方式如下:
配置數(shù)據(jù)源 conf/acticvemq.xml 文件:
"mysql-ds"?class="org.apache.commons.dbcp.BasicDataSource"?destroy-method="close">
???"driverClassName"?value="com.mysql.jdbc.Driver"/>
???"url"?value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"/>
???"username"?value="root"/>
???"password"?value="111111"/>
???"maxActive"?value="200"/>
???"poolPreparedStatements"?value="true"/>
???
配置 broke 中的 persistenceAdapter dataSource 指定持久化數(shù)據(jù)庫(kù)的 bean,createTablesOnStartup 是否在啟動(dòng)的時(shí)候創(chuàng)建數(shù)據(jù)表,默認(rèn)值是 true,這樣每次啟動(dòng)都會(huì)去創(chuàng)建數(shù)據(jù)表了,一般是第一次啟動(dòng)的時(shí)候設(shè)置為 true,之后改成 false。
<persistenceAdapter>
??<jdbcPersistenceAdapter?dataSource="#mysql-ds"??createTablesOnStartup="false"/>
persistenceAdapter>?
協(xié)議
ActiveMQ支持的client-broker通訊協(xié)議有:TCP、NIO、UDP、SSL、Http(s)、VM。
Transmission Control Protocol (TCP)
這是默認(rèn)的Broker配置,TCP的Client監(jiān)聽(tīng)端口是61616。
在網(wǎng)絡(luò)傳輸數(shù)據(jù)前,必須要序列化數(shù)據(jù),消息是通過(guò)一個(gè)叫wire protocol的來(lái)序列化成字節(jié)流。默認(rèn)情況下,ActiveMQ把wire protocol叫做OpenWire,它的目的是促使網(wǎng)絡(luò)上的效率和數(shù)據(jù)快速交互。
TCP連接的URI形式:tcp://hostname:port?key=value&key=value
TCP傳輸?shù)膬?yōu)點(diǎn):(1)TCP協(xié)議傳輸可靠性高,穩(wěn)定性強(qiáng) (2)高效性:字節(jié)流方式傳遞,效率很高 (3)有效性、可用性:應(yīng)用廣泛,支持任何平臺(tái)
New I/O API Protocol(NIO)
NIO協(xié)議和TCP協(xié)議類似,但NIO更側(cè)重于底層的訪問(wèn)操作。它允許開(kāi)發(fā)人員對(duì)同一資源可有更多的client調(diào)用和服務(wù)端有更多的負(fù)載。
適合使用NIO協(xié)議的場(chǎng)景:(1)可能有大量的Client去鏈接到Broker上一般情況下,大量的Client去鏈接Broker是被操作系統(tǒng)的線程數(shù)所限制的。因此,NIO的實(shí)現(xiàn)比TCP需要更少的線程去運(yùn)行,所以建議使用NIO協(xié)議 (2)可能對(duì)于Broker有一個(gè)很遲鈍的網(wǎng)絡(luò)傳輸NIO比TCP提供更好的性能
NIO連接的URI形式:nio://hostname:port?key=value
Transport Connector配置示例:
??????name="nio"
????uri="nio://localhost:61618?trace=true"?/>
User Datagram Protocol(UDP)
UDP和TCP的區(qū)別 (1)TCP是一個(gè)原始流的傳遞協(xié)議,意味著數(shù)據(jù)包是有保證的,換句話說(shuō),數(shù)據(jù)包是不會(huì)被復(fù)制和丟失的。UDP,另一方面,它是不會(huì)保證數(shù)據(jù)包的傳遞的 (2)TCP也是一個(gè)穩(wěn)定可靠的數(shù)據(jù)包傳遞協(xié)議,意味著數(shù)據(jù)在傳遞的過(guò)程中不會(huì)被丟失。這樣確保了在發(fā)送和接收之間能夠可靠的傳遞。相反,UDP僅僅是一個(gè)鏈接協(xié)議,所以它沒(méi)有可靠性之說(shuō)
從上面可以得出:TCP是被用在穩(wěn)定可靠的場(chǎng)景中使用的;UDP通常用在快速數(shù)據(jù)傳遞和不怕數(shù)據(jù)丟失的場(chǎng)景中,還有ActiveMQ通過(guò)防火墻時(shí),只能用UDP
UDP連接的URI形式:udp://hostname:port?key=value
Transport Connector配置示例:
????????????name="udp"
????????uri="udp://localhost:61618?trace=true"?/>
Active MQ的安全機(jī)制
「web控制臺(tái)安全」
修改jetty-realm.properties# username: password [,rolename ...](用戶名:密碼 角色)
注意:配置需重啟ActiveMQ才會(huì)生效
「消息安全機(jī)制」
修改activemq.xml 在中添加如下代碼:
<plugins>
??????<simpleAuthenticationPlugin>
??????????<users>
??????????????<authenticationUser?username="admin"?password="admin"?groups="admins,publishers,consumers"/>
??????????????<authenticationUser?username="publisher"?password="publisher"??groups="publishers,consumers"/>
??????????????<authenticationUser?username="consumer"?password="consumer"?groups="consumers"/>
??????????????<authenticationUser?username="guest"?password="guest"??groups="guests"/>
??????????users>
??????simpleAuthenticationPlugin>
?plugins>
ActiveMQ 使用
在java中使用ActiveMQ只需要引入相關(guān)依賴
<dependency>
????<groupId>org.apache.activemqgroupId>
????<artifactId>activemq-allartifactId>
????<version>5.15.11version>
dependency>
編寫(xiě)生產(chǎn)者
public?class?Sender?{
?public?static?void?main(String[]?args)?throws?JMSException?{
?//?1.?建立工廠對(duì)象,
?ActiveMQConnectionFactory?acf?=?new?ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://localhost:61618");
?//2?從工廠里拿一個(gè)連接
?Connection?connection?=?acf.createConnection();
?connection.start();
?//3?從連接中獲取Session(會(huì)話)
?Session?session?=?connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);
?//4?從會(huì)話中獲取目的地(Destination)消費(fèi)者會(huì)從這個(gè)目的地取消息
?Queue?queue?=?session.createQueue("mq.test");
?//5?從會(huì)話中創(chuàng)建消息提供者
?MessageProducer?producer?=?session.createProducer(queue);
?//6?從會(huì)話中創(chuàng)建文本消息(也可以創(chuàng)建其它類型的消息體)
?TextMessage?message?=?session.createTextMessage("msg:?hello?world");
?//7?通過(guò)消息提供者發(fā)送消息到ActiveMQ
?producer.send(message);
?//8?關(guān)閉連接
?connection.close();
?}
}
編寫(xiě)消費(fèi)者
public?class?Receiver?{
?public?static?void?main(String[]?args)?throws?JMSException?{
?//?1.?建立工廠對(duì)象,
?ActiveMQConnectionFactory?acf?=?new?ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://localhost:61618");
?//2?從工廠里拿一個(gè)連接
?Connection?connection?=?acf.createConnection();
?connection.start();
?//3?從連接中獲取Session(會(huì)話)
?Session?session?=?connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);
?//4?從會(huì)話中獲取目的地(Destination)消費(fèi)者會(huì)從這個(gè)目的地取消息
?Queue?queue?=?session.createQueue("mq.test");
?//5?從會(huì)話中創(chuàng)建消息消費(fèi)者
?MessageConsumer?consumer?=?session.createConsumer(queue);
?while?(true){
??//6?消費(fèi)者接收消息
??Message?msg?=?consumer.receive();
??TextMessage?textMessage?=?(TextMessage)?msg;
??System.out.println("text:"+textMessage.getText());
??}
?}
}
常用API及特性
事務(wù)消息 Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);提交事務(wù):session.commit();
回滾事務(wù):session.rollback();
開(kāi)啟事務(wù)后,只有事務(wù)commit成功,消息才會(huì)發(fā)送到MQ中持久化
默認(rèn)持久化是開(kāi)啟的;
開(kāi)啟非持久化示例代碼:producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)設(shè)置消息優(yōu)先級(jí) producer.setPriority();設(shè)置消息超時(shí)/過(guò)期時(shí)間 producer.setTimeToLive
設(shè)置了消息超時(shí)的消息,消費(fèi)端在超時(shí)后無(wú)法在消費(fèi)到此消息。死信
此類消息會(huì)進(jìn)入到ActiveMQ.DLQ隊(duì)列且不會(huì)自動(dòng)清除,稱為死信,有消息堆積的風(fēng)險(xiǎn)。簽收模式
簽收代表接收端的session已收到消息的一次確認(rèn),反饋給broker
如果session帶有事務(wù),并且事務(wù)成功提交,則消息被自動(dòng)簽收。如果事務(wù)回滾,則消息會(huì)被再次傳送。
消息事務(wù)是在生產(chǎn)者producer到broker或broker到consumer過(guò)程中同一個(gè)session中發(fā)生的,保證幾條消息在發(fā)送過(guò)程中的原子性。在支持事務(wù)的session中,producer發(fā)送message時(shí)在message中帶有transactionID。broker收到message后判斷是否有transactionID,如果有就把message保存在transaction store中,等待commit或者rollback消息。
?ActiveMQ支持自動(dòng)簽收與手動(dòng)簽收
「Session.AUTO_ACKNOWLEDGE」
當(dāng)客戶端從receiver或onMessage成功返回時(shí),Session自動(dòng)簽收客戶端的這條消息的收條。
「Session.CLIENT_ACKNOWLEDGE」
客戶端通過(guò)調(diào)用消息(Message)的acknowledge方法簽收消息。在這種情況下,簽收發(fā)生在Session層面:簽收一個(gè)已經(jīng)消費(fèi)的消息會(huì)自動(dòng)地簽收這個(gè)Session所有已消費(fèi)的收條。
「Session.DUPS_OK_ACKNOWLEDGE」
Session不必確保對(duì)傳送消息的簽收,這個(gè)模式可能會(huì)引起消息的重復(fù),但是降低了Session的開(kāi)銷,所以只有客戶端能容忍重復(fù)的消息,才可使用。獨(dú)占消費(fèi)者 Queue queue = session.createQueue("xxoo?consumer.exclusive=true");發(fā)送異步消息
ActiveMQConnectionFactory?connectionFactory?=?new?ActiveMQConnectionFactory(
????"admin",
????"admin",
????"tcp://localhost:61616"
????);
//?2.獲取一個(gè)向ActiveMQ的連接
connectionFactory.setUseAsyncSend(true);
ActiveMQConnection?connection?=?(ActiveMQConnection)connectionFactory.createConnection();
connection.setUseAsyncSend(true);
消息堆積
producer每發(fā)送一個(gè)消息,統(tǒng)計(jì)一下發(fā)送的字節(jié)數(shù),當(dāng)字節(jié)數(shù)達(dá)到ProducerWindowSize值時(shí),需要等待broker的確認(rèn),才能繼續(xù)發(fā)送。
brokerUrl中設(shè)置:tcp://localhost:61616?jms.producerWindowSize=1048576
destinationUri中設(shè)置:myQueue?producer.windowSize=1048576延遲消息投遞
首先在配置文件中開(kāi)啟延遲和調(diào)度
<broker?xmlns="http://activemq.apache.org/schema/core"?brokerName="localhost"?dataDirectory="${activemq.data}"?schedulerSupport="true">
延遲發(fā)送示例代碼:message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,10*1000);
創(chuàng)建監(jiān)聽(tīng)器
ActiveMQConnectionFactory?acf?=?new?ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,
??ActiveMQConnectionFactory.DEFAULT_PASSWORD,
??"tcp://localhost:61618");
//2?從工廠里拿一個(gè)連接
Connection?connection?=?acf.createConnection();
connection.start();
//3?從連接中獲取Session(會(huì)話)
Session?session?=?connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);
//4?從會(huì)話中獲取目的地(Destination)消費(fèi)者會(huì)從這個(gè)目的地取消息
Queue?queue?=?session.createQueue("mq.test");
//5?從會(huì)話中創(chuàng)建消息消費(fèi)者
MessageConsumer?consumer?=?session.createConsumer(queue);
MyListener?myListener?=?new?MyListener();
MessageListener?listener?=?myListener::receiveMessage;
consumer.setMessageListener(listener);
SpringBoot整合ActiveMQ
添加依賴
?????org.springframework.boot
?????spring-boot-starter-activemq
配置文件
server:
??port:?80
spring:
??activemq:
????broker-url:?tcp://localhost:61618
????user:?admin
????password:?admin
????pool:
??????enabled:?true
??????#連接池最大連接數(shù)
??????max-connections:?5
??????#空閑的連接過(guò)期時(shí)間,默認(rèn)為30秒
??????idle-timeout:?0
????packages:
??????trust-all:?true
??jms:
????pub-sub-domain:?true
配置類
@Configuration
@EnableJms
public?class?ActiveMqConfig?{
//?topic模式的ListenerContainer
?@Bean
public?JmsListenerContainerFactory>?jmsListenerContainerTopic(ConnectionFactory?activeMQConnectionFactory)?{
?????????DefaultJmsListenerContainerFactory?bean?=?new?DefaultJmsListenerContainerFactory();
?????????bean.setPubSubDomain(true);
?????????bean.setConnectionFactory(activeMQConnectionFactory);
?????????return?bean;
?????}
//?queue模式的ListenerContainer
@Bean
public?JmsListenerContainerFactory>?jmsListenerContainerQueue(ConnectionFactory?activeMQConnectionFactory)?{
?????????DefaultJmsListenerContainerFactory?bean?=?new?DefaultJmsListenerContainerFactory();
?????????bean.setConnectionFactory(activeMQConnectionFactory);
?????????return?bean;
?????}
}
編寫(xiě)生產(chǎn)者
@Service
public?class?MqProducerService?{
?@Autowired
?private?JmsMessagingTemplate?jmsMessagingTemplate;
??
?public?void?sendStringQueue(String?destination,?String?msg)?{
??System.out.println("send...");
??ActiveMQQueue?queue?=?new?ActiveMQQueue(destination);
??jmsMessagingTemplate.afterPropertiesSet();
??ConnectionFactory?factory?=?jmsMessagingTemplate.getConnectionFactory();
??try?{
???Connection?connection?=?factory.createConnection();
???connection.start();
???Session?session?=?connection.createSession(true,?Session.AUTO_ACKNOWLEDGE);
???Queue?queue2?=?session.createQueue(destination);
???MessageProducer?producer?=?session.createProducer(queue2);
???TextMessage?message?=?session.createTextMessage("hahaha");
???producer.send(message);
??}?catch?(JMSException?e)?{
???//?TODO?Auto-generated?catch?block
???e.printStackTrace();
??}
??jmsMessagingTemplate.convertAndSend(queue,?msg);
?}
?public?void?sendStringQueueList(String?destination,?String?msg)?{
??System.out.println("xxooq");
??ArrayList?list?=?new?ArrayList<>();
??list.add("1");
??list.add("2");
??jmsMessagingTemplate.convertAndSend(new?ActiveMQQueue(destination),?list);
?}
}
編寫(xiě)消費(fèi)者
@JmsListener(destination?=?"user",containerFactory?=?"jmsListenerContainerQueue")
public?void?receiveStringQueue(String?msg)?{
????????System.out.println("接收到消息...."?+?msg);
????}
@JmsListener(destination?=?"ooo",containerFactory?=?"jmsListenerContainerTopic")
public?void?receiveStringTopic(String?msg)?{
?????System.out.println("接收到消息...."?+?msg);
?}
小結(jié)
本文詳細(xì)介紹了為什么需要引入消息隊(duì)列,JMS、ActiveMQ的基礎(chǔ)概念以及常用API,與原生JAVA整合及SpringBoot整合等知識(shí)點(diǎn),可以讓大家更好的了解ActiveMQ的使用場(chǎng)景及使用方式。

1.?人人都能看懂的 6 種限流實(shí)現(xiàn)方案!
3.?大型網(wǎng)站架構(gòu)演化發(fā)展歷程
6. 看完這篇Redis緩存三大問(wèn)題,保你能和面試官互扯
7. 程序員必知的 89 個(gè)操作系統(tǒng)核心概念
8. 深入理解 MySQL:快速學(xué)會(huì)分析SQL執(zhí)行效率
10. Spring Boot 面試,一個(gè)問(wèn)題就干趴下了!

掃碼二維碼關(guān)注我
·end·
—如果本文有幫助,請(qǐng)分享到朋友圈吧—
我們一起愉快的玩耍!

