新來個技術總監(jiān),把RabbitMQ講的那叫一個透徹,佩服!
二哥編程知識星球 (戳鏈接加入)正式上線了,來和 310 多名 小伙伴一起打怪升級吧!這是一個 Java 學習指南 + 編程實戰(zhàn)的私密圈子,你可以向二哥提問、幫你制定學習計劃、跟著二哥一起做實戰(zhàn)項目,沖沖沖。
Java程序員進階之路網(wǎng)址:https://tobebetterjavaer.com
常見的消息隊列很多,主要包括 RabbitMQ、Kafka、RocketMQ 和 ActiveMQ。這篇文章只講 RabbitMQ,屬于基礎入門篇,但是肯定不 Low,太 Low 的文章我也不會寫。
如果你是 RabbitMQ 大牛,這篇文章不適合你,如果你對 RabbitMQ 只停留在使用,或者初步了解,甚至完全不了解,這篇文章對你來說就非常適用。
文章非常長,如果你能一次性看完,“大神,請收下我的膝蓋”,所以建議先收藏,啥時需要面試,或者工作中遇到了,可以再慢慢看,先來一副思維導圖。

消息隊列
消息隊列模式
消息隊列目前主要 2 種模式,分別為“點對點模式”和“發(fā)布/訂閱模式”。
點對點模式
一個具體的消息只能由一個消費者消費。多個生產(chǎn)者可以向同一個消息隊列發(fā)送消息;但是,一個消息在被一個消息者處理的時候,這個消息在隊列上會被鎖住或者被移除并且其他消費者無法處理該消息。需要額外注意的是,如果消費者處理一個消息失敗了,消息系統(tǒng)一般會把這個消息放回隊列,這樣其他消費者可以繼續(xù)處理。

發(fā)布/訂閱模式
單個消息可以被多個訂閱者并發(fā)的獲取和處理。一般來說,訂閱有兩種類型:
臨時(ephemeral)訂閱,這種訂閱只有在消費者啟動并且運行的時候才存在。一旦消費者退出,相應的訂閱以及尚未處理的消息就會丟失。 持久(durable)訂閱,這種訂閱會一直存在,除非主動去刪除。消費者退出后,消息系統(tǒng)會繼續(xù)維護該訂閱,并且后續(xù)消息可以被繼續(xù)處理。

衡量標準
對消息隊列進行技術選型時,需要通過以下指標衡量你所選擇的消息隊列,是否可以滿足你的需求:
消息順序:發(fā)送到隊列的消息,消費時是否可以保證消費的順序,比如 A 先下單,B 后下單,應該是 A 先去扣庫存,B 再去扣,順序不能反。 消息路由:根據(jù)路由規(guī)則,只訂閱匹配路由規(guī)則的消息,比如有 A/B 兩者規(guī)則的消息,消費者可以只訂閱 A 消息,B 消息不會消費。 消息可靠性:是否會存在丟消息的情況,比如有 A/B 兩個消息,最后只有 B 消息能消費,A 消息丟失。 消息時序:主要包括“消息存活時間”和“延遲/預定的消息”,“消息存活時間”表示生產(chǎn)者可以對消息設置 TTL,如果超過該 TTL,消息會自動消失;“延遲/預定的消息”指的是可以延遲或者預訂消費消息,比如延時 5 分鐘,那么消息會 5 分鐘后才能讓消費者消費,時間未到的話,是不能消費的。 消息留存:消息消費成功后,是否還會繼續(xù)保留在消息隊列。 容錯性:當一條消息消費失敗后,是否有一些機制,保證這條消息是一種能成功,比如異步第三方退款消息,需要保證這條消息消費掉,才能確定給用戶退款成功,所以必須保證這條消息消費成功的準確性。 伸縮:當消息隊列性能有問題,比如消費太慢,是否可以快速支持庫容;當消費隊列過多,浪費系統(tǒng)資源,是否可以支持縮容。 吞吐量:支持的最高并發(fā)數(shù)。
RabbitMQ 原理初探
RabbitMQ 是一個純 Java、分布式、隊列模型的開源消息中間件,前身是 MetaQ,是阿里參考 Kafka 特點研發(fā)的一個隊列模型的消息中間件,后開源給 apache 基金會成為了 apache 的頂級開源項目,具有高性能、高可靠、高實時、分布式特點。
基本概念
提到 RabbitMQ,就不得不提 AMQP 協(xié)議。AMQP 協(xié)議是具有現(xiàn)代特征的二進制協(xié)議。是一個提供統(tǒng)一消息服務的應用層標準高級消息隊列協(xié)議,是應用層協(xié)議的一個開放標準,為面向消息的中間件設計。先了解一下 AMQP 協(xié)議中間的幾個重要概念:
Server:接收客戶端的連接,實現(xiàn) AMQP 實體服務。 Connection:連接,應用程序與 Server 的網(wǎng)絡連接,TCP 連接。 Channel:信道,消息讀寫等操作在信道中進行。客戶端可以建立多個信道,每個信道代表一個會話任務。 Message:消息,應用程序和服務器之間傳送的數(shù)據(jù),消息可以非常簡單,也可以很復雜。由 Properties 和 Body 組成。Properties 為外包裝,可以對消息進行修飾,比如消息的優(yōu)先級、延遲等高級特性;Body 就是消息體內容。 Virtual Host:虛擬主機,用于邏輯隔離。一個虛擬主機里面可以有若干個 Exchange 和 Queue,同一個虛擬主機里面不能有相同名稱的 Exchange 或 Queue。 Exchange:交換器,接收消息,按照路由規(guī)則將消息路由到一個或者多個隊列。如果路由不到,或者返回給生產(chǎn)者,或者直接丟棄。RabbitMQ 常用的交換器常用類型有 direct、topic、fanout、headers 四種,后面詳細介紹。 Binding:綁定,交換器和消息隊列之間的虛擬連接,綁定中可以包含一個或者多個 RoutingKey。 RoutingKey:路由鍵,生產(chǎn)者將消息發(fā)送給交換器的時候,會發(fā)送一個 RoutingKey,用來指定路由規(guī)則,這樣交換器就知道把消息發(fā)送到哪個隊列。路由鍵通常為一個“.”分割的字符串,例如“com.rabbitmq”。 Queue:消息隊列,用來保存消息,供消費者消費。
系統(tǒng)架構
AMQP 協(xié)議模型由三部分組成:生產(chǎn)者、消費者和服務端。生產(chǎn)者是投遞消息的一方,首先連接到 Server,建立一個連接,開啟一個信道;然后生產(chǎn)者聲明交換器和隊列,設置相關屬性,并通過路由鍵將交換器和隊列進行綁定。同理,消費者也需要進行建立連接,開啟信道等操作,便于接收消息。接著生產(chǎn)者就可以發(fā)送消息,發(fā)送到服務端中的虛擬主機,虛擬主機中的交換器根據(jù)路由鍵選擇路由規(guī)則,然后發(fā)送到不同的消息隊列中,這樣訂閱了消息隊列的消費者就可以獲取到消息,進行消費。

總結一下整體過程:生產(chǎn)者投遞消息 -> 和 Server 建立連接,開啟信道 -> 聲明交換器和隊列,并通過路由鍵將交換機和隊列綁定 -> 投遞消息到虛擬主機 -> 消息發(fā)送到消息隊列 -> 消費者建立連接 -> 消費消息 -> 關系信道和連接。
常用交換器
RabbitMQ 常用的交換器類型有 direct、topic、fanout、headers 四種:
Direct Exchange:見文知意,直連交換機意思是此交換機需要綁定一個隊列,要求該消息與一個特定的路由鍵完全匹配。簡單點說就是一對一的,點對點的發(fā)送。

Fanout Exchange:這種類型的交換機需要將隊列綁定到交換機上。一個發(fā)送到交換機的消息都會被轉發(fā)到與該交換機綁定的所有隊列上。很像子網(wǎng)廣播,每臺子網(wǎng)內的主機都獲得了一份復制的消息。簡單點說就是發(fā)布訂閱。

Topic Exchange:直接翻譯的話叫做主題交換機,如果從用法上面翻譯可能叫通配符交換機會更加貼切。這種交換機是使用通配符去匹配,路由到對應的隊列。通配符有兩種:"*" 、 "#"。需要注意的是通配符前面必須要加上"."符號。
*符號:有且只匹配一個詞。比如 a.*可以匹配到"a.b"、"a.c",但是匹配不了"a.b.c"。
#符號:匹配一個或多個詞。比如"rabbit.#"既可以匹配到"rabbit.a.b"、"rabbit.a",也可以匹配到"rabbit.a.b.c"。

Headers Exchange:這種交換機用的相對沒這么多。它跟上面三種有點區(qū)別,它的路由不是用 routingKey 進行路由匹配,而是在匹配請求頭中所帶的鍵值進行路由。創(chuàng)建隊列需要設置綁定的頭部信息,有兩種模式:全部匹配和部分匹配。如上圖所示,交換機會根據(jù)生產(chǎn)者發(fā)送過來的頭部信息攜帶的鍵值去匹配隊列綁定的鍵值,路由到對應的隊列。

消費原理
我們先看幾個基本概念:
broker:每個節(jié)點運行的服務程序,功能為維護該節(jié)點的隊列的增刪以及轉發(fā)隊列操作請求。 master queue:每個隊列都分為一個主隊列和若干個鏡像隊列。 mirror queue:鏡像隊列,作為 master queue 的備份。在 master queue 所在節(jié)點掛掉之后,系統(tǒng)把 mirror queue 提升為 master queue,負責處理客戶端隊列操作請求。注意,mirror queue 只做鏡像,設計目的不是為了承擔客戶端讀寫壓力。
集群中有兩個節(jié)點,每個節(jié)點上有一個 broker,每個 broker 負責本機上隊列的維護,并且 borker 之間可以互相通信。集群中有兩個隊列 A 和 B,每個隊列都分為 master queue 和 mirror queue(備份)。那么隊列上的生產(chǎn)消費怎么實現(xiàn)的呢?

對于消費隊列,如下圖有兩個 consumer 消費隊列 A,這兩個 consumer 連在了集群的不同機器上。RabbitMQ 集群中的任何一個節(jié)點都擁有集群上所有隊列的元信息,所以連接到集群中的任何一個節(jié)點都可以,主要區(qū)別在于有的 consumer 連在 master queue 所在節(jié)點,有的連在非 master queue 節(jié)點上。
因為 mirror queue 要和 master queue 保持一致,故需要同步機制,正因為一致性的限制,導致所有的讀寫操作都必須都操作在 master queue 上(想想,為啥讀也要從 master queue 中讀?和數(shù)據(jù)庫讀寫分離是不一樣的),然后由 master 節(jié)點同步操作到 mirror queue 所在的節(jié)點。即使 consumer 連接到了非 master queue 節(jié)點,該 consumer 的操作也會被路由到 master queue 所在的節(jié)點上,這樣才能進行消費。

對于生成隊列,原理和消費一樣,如果連接到非 master queue 節(jié)點,則路由過去。

所以,到這里小伙伴們就可以看到 RabbitMQ 的不足:由于 master queue 單節(jié)點,導致性能瓶頸,吞吐量受限。雖然為了提高性能,內部使用了 Erlang 這個語言實現(xiàn),但是終究擺脫不了架構設計上的致命缺陷。
高級特性
過期時間
Time To Live,也就是生存時間,是一條消息在隊列中的最大存活時間,單位是毫秒,下面看看 RabbitMQ 過期時間特性:
RabbitMQ 可以對消息和隊列設置 TTL。 RabbitMQ 支持設置消息的過期時間,在消息發(fā)送的時候可以進行指定,每條消息的過期時間可以不同。 RabbitMQ 支持設置隊列的過期時間,從消息入隊列開始計算,直到超過了隊列的超時時間配置,那么消息會變成死信,自動清除。 如果兩種方式一起使用,則過期時間以兩者中較小的那個數(shù)值為準。 當然也可以不設置 TTL,不設置表示消息不會過期;如果設置為 0,則表示除非此時可以直接將消息投遞到消費者,否則該消息將被立即丟棄。
消息確認
為了保證消息從隊列可靠地到達消費者,RabbitMQ 提供了消息確認機制。消費者訂閱隊列的時候,可以指定 autoAck 參數(shù),當 autoAck 為 true 的時候,RabbitMQ 采用自動確認模式,RabbitMQ 自動把發(fā)送出去的消息設置為確認,然后從內存或者硬盤中刪除,而不管消費者是否真正消費到了這些消息。當 autoAck 為 false 的時候,RabbitMQ 會等待消費者回復的確認信號,收到確認信號之后才從內存或者磁盤中刪除消息。
消息確認機制是 RabbitMQ 消息可靠性投遞的基礎,只要設置 autoAck 參數(shù)為 false,消費者就有足夠的時間處理消息,不用擔心處理消息的過程中消費者進程掛掉后消息丟失的問題。
持久化
消息的可靠性是 RabbitMQ 的一大特色,那么 RabbitMQ 是如何保證消息可靠性的呢?答案就是消息持久化。持久化可以防止在異常情況下丟失數(shù)據(jù)。RabbitMQ 的持久化分為三個部分:交換器持久化、隊列持久化和消息的持久化。
交換器持久化可以通過在聲明隊列時將 durable 參數(shù)設置為 true。如果交換器不設置持久化,那么在 RabbitMQ 服務重啟之后,相關的交換器元數(shù)據(jù)會丟失,不過消息不會丟失,只是不能將消息發(fā)送到這個交換器了。
隊列的持久化能保證其本身的元數(shù)據(jù)不會因異常情況而丟失,但是不能保證內部所存儲的消息不會丟失。要確保消息不會丟失,需要將其設置為持久化。隊列的持久化可以通過在聲明隊列時將 durable 參數(shù)設置為 true。
設置了隊列和消息的持久化,當 RabbitMQ 服務重啟之后,消息依然存在。如果只設置隊列持久化或者消息持久化,重啟之后消息都會消失。
當然,也可以將所有的消息都設置為持久化,但是這樣做會影響 RabbitMQ 的性能,因為磁盤的寫入速度比內存的寫入要慢得多。對于可靠性不是那么高的消息可以不采用持久化處理以提高整體的吞吐量。魚和熊掌不可兼得,關鍵在于選擇和取舍。在實際中,需要根據(jù)實際情況在可靠性和吞吐量之間做一個權衡。
死信隊列
當消息在一個隊列中變成死信之后,他能被重新發(fā)送到另一個交換器中,這個交換器成為死信交換器,與該交換器綁定的隊列稱為死信隊列。消息變成死信有下面幾種情況:
消息被拒絕。 消息過期 隊列達到最大長度
DLX 也是一個正常的交換器,和一般的交換器沒有區(qū)別,他能在任何的隊列上面被指定,實際上就是設置某個隊列的屬性。當這個隊列中有死信的時候,RabbitMQ 會自動將這個消息重新發(fā)送到設置的交換器上,進而被路由到另一個隊列,我們可以監(jiān)聽這個隊列中消息做相應的處理。
死信隊列有什么用?當發(fā)生異常的時候,消息不能夠被消費者正常消費,被加入到了死信隊列中。后續(xù)的程序可以根據(jù)死信隊列中的內容分析當時發(fā)生的異常,進而改善和優(yōu)化系統(tǒng)。
延遲隊列
一般的隊列,消息一旦進入隊列就會被消費者立即消費。延遲隊列就是進入該隊列的消息會被消費者延遲消費,延遲隊列中存儲的對象是的延遲消息,“延遲消息”是指當消息被發(fā)送以后,等待特定的時間后,消費者才能拿到這個消息進行消費。
延遲隊列用于需要延遲工作的場景。最常見的使用場景:淘寶或者天貓我們都使用過,用戶在下單之后通常有 30 分鐘的時間進行支付,如果這 30 分鐘之內沒有支付成功,那么訂單就會自動取消。除了延遲消費,延遲隊列的典型應用場景還有延遲重試。比如消費者從隊列里面消費消息失敗了,可以延遲一段時間以后進行重試。
特性分析
這里才是內容的重點,不僅需要知道 Rabbit 的特性,還需要知道支持這些特性的原因:
消息路由(支持):RabbitMQ 可以通過不同的交換器支持不同種類的消息路由; 消息有序(不支持):當消費消息時,如果消費失敗,消息會被放回隊列,然后重新消費,這樣會導致消息無序; 消息時序(非常好):通過延時隊列,可以指定消息的延時時間,過期時間 TTL 等; 容錯處理(非常好):通過交付重試和死信交換器(DLX)來處理消息處理故障; 伸縮(一般):伸縮其實沒有非常智能,因為即使伸縮了,master queue 還是只有一個,負載還是只有這一個 master queue 去抗,所以我理解 RabbitMQ 的伸縮很弱(個人理解)。 持久化(不太好):沒有消費的消息,可以支持持久化,這個是為了保證機器宕機時消息可以恢復,但是消費過的消息,就會被馬上刪除,因為 RabbitMQ 設計時,就不是為了去存儲歷史數(shù)據(jù)的。 消息回溯(不支持):因為消息不支持永久保存,所以自然就不支持回溯。 高吞吐(中等):因為所有的請求的執(zhí)行,最后都是在 master queue,它的這個設計,導致單機性能達不到十萬級的標準。
RabbitMQ 環(huán)境搭建
因為我用的是 Mac,所以直接可以參考官網(wǎng):
https://www.rabbitmq.com/install-homebrew.html
需要注意的是,一定需要先執(zhí)行:
brew update
然后再執(zhí)行:
brew install rabbitmq
之前沒有執(zhí)行 brew update,直接執(zhí)行 brew install rabbitmq 時,會報各種各樣奇怪的錯誤,其中“403 Forbidde”居多。
但是在執(zhí)行“brew install rabbitmq”,會自動安裝其它的程序,如果你使用源碼安裝 Rabbitmq,因為啟動該服務依賴 erlang 環(huán)境,所以你還需手動安裝 erlang,但是目前官方已經(jīng)一鍵給你搞定,會自動安裝 Rabbitmq 依賴的所有程序,是不是很棒!

最后執(zhí)行成功的輸出如下:

啟動服務:
# 啟動方式1:后臺啟動
brew services start rabbitmq
# 啟動方式2:當前窗口啟動
cd /usr/local/Cellar/rabbitmq/3.8.19
rabbitmq-server
在瀏覽器輸入:
http://localhost:15672/
會出現(xiàn) RabbitMQ 后臺管理界面(用戶名和密碼都為 guest):

通過 brew 安裝,一行命令搞定,真香!
RabbitMQ 測試
添加賬號
首先得啟動 mq
## 添加賬號
./rabbitmqctl add_user admin admin
## 添加訪問權限
./rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
## 設置超級權限
./rabbitmqctl set_user_tags admin administrator
編碼實測
因為代碼中引入了 java 8 的特性,pom 引入依賴:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.1</version>
</dependency>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
開始寫代碼:
public class RabbitMqTest {
//消息隊列名稱
private final static String QUEUE_NAME = "hello";
@Test
public void send() throws java.io.IOException, TimeoutException {
//創(chuàng)建連接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
//創(chuàng)建連接
Connection connection = factory.newConnection();
//創(chuàng)建消息通道
Channel channel = connection.createChannel();
//生成一個消息隊列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
for (int i = 0; i < 10; i++) {
String message = "Hello World RabbitMQ count: " + i;
//發(fā)布消息,第一個參數(shù)表示路由(Exchange名稱),為""則表示使用默認消息路由
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
//關閉消息通道和連接
channel.close();
connection.close();
}
@Test
public void consumer() throws java.io.IOException, TimeoutException {
//創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
//創(chuàng)建連接
Connection connection = factory.newConnection();
//創(chuàng)建消息信道
final Channel channel = connection.createChannel();
//消息隊列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println("[*] Waiting for message. To exist press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
}
執(zhí)行 send()后控制臺輸出:
[x] Sent 'Hello World RabbitMQ count: 0'
[x] Sent 'Hello World RabbitMQ count: 1'
[x] Sent 'Hello World RabbitMQ count: 2'
[x] Sent 'Hello World RabbitMQ count: 3'
[x] Sent 'Hello World RabbitMQ count: 4'
[x] Sent 'Hello World RabbitMQ count: 5'
[x] Sent 'Hello World RabbitMQ count: 6'
[x] Sent 'Hello World RabbitMQ count: 7'
[x] Sent 'Hello World RabbitMQ count: 8'
[x] Sent 'Hello World RabbitMQ count: 9'

執(zhí)行 consumer()后:

示例中的代碼講解,可以直接參考官網(wǎng):https://www.rabbitmq.com/tutorials/tutorial-one-java.html
基本使用姿勢
公共代碼封裝
封裝工廠類:
public class RabbitUtil {
public static ConnectionFactory getConnectionFactory() {
//創(chuàng)建連接工程,下面給出的是默認的case
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
return factory;
}
}
封裝生成者:
public class MsgProducer {
public static void publishMsg(String exchange, BuiltinExchangeType exchangeType, String toutingKey, String message) throws IOException, TimeoutException {
ConnectionFactory factory = RabbitUtil.getConnectionFactory();
//創(chuàng)建連接
Connection connection = factory.newConnection();
//創(chuàng)建消息通道
Channel channel = connection.createChannel();
// 聲明exchange中的消息為可持久化,不自動刪除
channel.exchangeDeclare(exchange, exchangeType, true, false, null);
// 發(fā)布消息
channel.basicPublish(exchange, toutingKey, null, message.getBytes());
System.out.println("Sent '" + message + "'");
channel.close();
connection.close();
}
}
封裝消費者:
public class MsgConsumer {
public static void consumerMsg(String exchange, String queue, String routingKey)
throws IOException, TimeoutException {
ConnectionFactory factory = RabbitUtil.getConnectionFactory();
//創(chuàng)建連接
Connection connection = factory.newConnection();
//創(chuàng)建消息信道
final Channel channel = connection.createChannel();
//消息隊列
channel.queueDeclare(queue, true, false, false, null);
//綁定隊列到交換機
channel.queueBind(queue, exchange, routingKey);
System.out.println("[*] Waiting for message. To exist press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
try {
System.out.println(" [x] Received '" + message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 取消自動ack
channel.basicConsume(queue, false, consumer);
}
}
Direct 方式

Direct 示例
生產(chǎn)者:
public class DirectConsumer {
private static final String exchangeName = "direct.exchange";
public void msgConsumer(String queueName, String routingKey) {
try {
MsgConsumer.consumerMsg(exchangeName, queueName, routingKey);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
DirectConsumer consumer = new DirectConsumer();
String[] routingKey = new String[]{"aaa", "bbb", "ccc"};
String[] queueNames = new String[]{"qa", "qb", "qc"};
for (int i = 0; i < 3; i++) {
consumer.msgConsumer(queueNames[i], routingKey[i]);
}
Thread.sleep(1000 * 60 * 100);
}
}
執(zhí)行生產(chǎn)者,往消息隊列中放入 10 條消息,其中 key 分別為“aaa”、“bbb”和“ccc”,分別放入 qa、qb、qc 三個隊列:

下面是 qa 隊列的信息:

消費者:
public class DirectProducer {
private static final String EXCHANGE_NAME = "direct.exchange";
public void publishMsg(String routingKey, String msg) {
try {
MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, routingKey, msg);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
DirectProducer directProducer = new DirectProducer();
String[] routingKey = new String[]{"aaa", "bbb", "ccc"};
String msg = "hello >>> ";
for (int i = 0; i < 10; i++) {
directProducer.publishMsg(routingKey[i % 3], msg + i);
}
System.out.println("----over-------");
Thread.sleep(1000 * 60 * 100);
}
}
執(zhí)行后的輸出:
[*] Waiting for message. To exist press CTRL+C
[x] Received 'hello >>> 0
[x] Done
[x] Received 'hello >>> 3
[x] Done
[x] Received 'hello >>> 6
[x] Done
[x] Received 'hello >>> 9
[x] Done
[*] Waiting for message. To exist press CTRL+C
[x] Received 'hello >>> 1
[x] Done
[x] Received 'hello >>> 4
[x] Done
[x] Received 'hello >>> 7
[x] Done
[*] Waiting for message. To exist press CTRL+C
[x] Received 'hello >>> 2
[x] Done
[x] Received 'hello >>> 5
[x] Done
[x] Received 'hello >>> 8
[x] Done
可以看到,分別從 qa、qb、qc 中將不同的 key 的數(shù)據(jù)消費掉。
問題探討
有個疑問:這個隊列的名稱 qa、qb 和 qc 是 RabbitMQ 自動生成的么,我們可以指定隊列名稱么?
我做了個簡單的實驗,我把消費者代碼修改了一下:
public static void main(String[] args) throws InterruptedException {
DirectConsumer consumer = new DirectConsumer();
String[] routingKey = new String[]{"aaa", "bbb", "ccc"};
String[] queueNames = new String[]{"qa", "qb", "qc1"}; // 將qc修改為qc1
for (int i = 0; i < 3; i++) {
consumer.msgConsumer(queueNames[i], routingKey[i]);
}
Thread.sleep(1000 * 60 * 100);
}
執(zhí)行后如下圖所示:

我們可以發(fā)現(xiàn),多了一個 qc1,所以可以判斷這個界面中的 queues,是消費者執(zhí)行時,會將消費者指定的隊列名稱和 direct.exchange 綁定,綁定的依據(jù)就是 key。
當我們把隊列中的數(shù)據(jù)全部消費掉,然后重新執(zhí)行生成者后,會發(fā)現(xiàn) qc 和 qc1 中都有 3 條待消費的數(shù)據(jù),因為綁定的 key 都是“ccc”,所以兩者的數(shù)據(jù)是一樣的:

綁定關系如下:

注意:當沒有 Queue 綁定到 Exchange 時,往 Exchange 中寫入的消息也不會重新分發(fā)到之后綁定的 queue 上。
思考:不執(zhí)行消費者,看不到這個 Queres 中信息,我其實可以把這個界面理解為消費者信息界面。不過感覺還是怪怪的,這個 queues 如果是消費者信息,就不應該叫 queues,我理解 queues 應該是 RabbitMQ 中實際存放數(shù)據(jù)的 queues,難道是我理解錯了?
Fanout 方式(指定隊列)

生產(chǎn)者封裝:
public class FanoutProducer {
private static final String EXCHANGE_NAME = "fanout.exchange";
public void publishMsg(String routingKey, String msg) {
try {
MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, routingKey, msg);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
FanoutProducer directProducer = new FanoutProducer();
String msg = "hello >>> ";
for (int i = 0; i < 10; i++) {
directProducer.publishMsg("", msg + i);
}
}
}
消費者:
public class FanoutConsumer {
private static final String EXCHANGE_NAME = "fanout.exchange";
public void msgConsumer(String queueName, String routingKey) {
try {
MsgConsumer.consumerMsg(EXCHANGE_NAME, queueName, routingKey);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
FanoutConsumer consumer = new FanoutConsumer();
String[] queueNames = new String[]{"qa-2", "qb-2", "qc-2"};
for (int i = 0; i < 3; i++) {
consumer.msgConsumer(queueNames[i], "");
}
}
}
執(zhí)行生成者,結果如下:

我們發(fā)現(xiàn),生產(chǎn)者生產(chǎn)的 10 條數(shù)據(jù),在每個消費者中都可以消費,這個是和 Direct 不同的地方,但是使用 Fanout 方式時,有幾個點需要注意一下:
生產(chǎn)者的 routkey 可以為空,因為生產(chǎn)者的所有數(shù)據(jù),會下放到每一個隊列,所以不會通過 routkey 去路由; 消費者需要指定 queues,因為消費者需要綁定到指定的 queues 才能消費。

這幅圖就畫出了 Fanout 的精髓之處,exchange 會和所有的 queue 進行綁定,不區(qū)分路由,消費者需要綁定指定的 queue 才能發(fā)起消費。
注意:往隊列塞數(shù)據(jù)時,可能通過界面看不到消息個數(shù)的增加,可能是你之前已經(jīng)開啟了消費進程,導致增加的消息馬上被消費了。
Fanout 方式(隨機獲取隊列)
上面我們是指定了隊列,這個方式其實很不友好,比如對于 Fanout,我其實根本無需關心隊列的名字,如果還指定對應隊列進行消費,感覺這個很冗余,所以我們這里就采用隨機獲取隊列名字的方式,下面代碼直接 Copy 官網(wǎng)。
生成者封裝:
public static void publishMsgV2(String exchange, BuiltinExchangeType exchangeType, String message) throws IOException, TimeoutException {
ConnectionFactory factory = RabbitUtil.getConnectionFactory();
//創(chuàng)建連接
Connection connection = factory.newConnection();
//創(chuàng)建消息通道
Channel channel = connection.createChannel();
// 聲明exchange中的消息
channel.exchangeDeclare(exchange, exchangeType);
// 發(fā)布消息
channel.basicPublish(exchange, "", null, message.getBytes("UTF-8"));
System.out.println("Sent '" + message + "'");
channel.close();
connection.close();
}
消費者封裝:
public static void consumerMsgV2(String exchange) throws IOException, TimeoutException {
ConnectionFactory factory = RabbitUtil.getConnectionFactory();
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.exchangeDeclare(exchange, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchange, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
生產(chǎn)者:
public class FanoutProducer {
private static final String EXCHANGE_NAME = "fanout.exchange.v2";
public void publishMsg(String msg) {
try {
MsgProducer.publishMsgV2(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, msg);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
FanoutProducer directProducer = new FanoutProducer();
String msg = "hello >>> ";
for (int i = 0; i < 10000; i++) {
directProducer.publishMsg(msg + i);
}
}
}
消費者:
public class FanoutConsumer {
private static final String EXCHANGE_NAME = "fanout.exchange.v2";
public void msgConsumer() {
try {
MsgConsumer.consumerMsgV2(EXCHANGE_NAME);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
FanoutConsumer consumer = new FanoutConsumer();
for (int i = 0; i < 3; i++) {
consumer.msgConsumer();
}
}
}
執(zhí)行后,管理界面如下:



Topic 方式

代碼詳見官網(wǎng):https://www.rabbitmq.com/tutorials/tutorial-five-java.html
更多方式,請直接查看官網(wǎng):https://www.rabbitmq.com/getstarted.html

RabbitMQ 進階
參考文章:https://liuyueyi.github.io/hexblog/2018/05/29/RabbitMQ%E5%9F%BA%E7%A1%80%E6%95%99%E7%A8%8B%E4%B9%8B%E4%BD%BF%E7%94%A8%E8%BF%9B%E9%98%B6%E7%AF%87/
durable 和 autoDeleted
在定義 Queue 時,可以指定這兩個參數(shù):
/**
* Declare an exchange.
* @see com.rabbitmq.client.AMQP.Exchange.Declare
* @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
* @param exchange the name of the exchange
* @param type the exchange type
* @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
* @param autoDelete true if the server should delete the exchange when it is no longer in use
* @param arguments other properties (construction arguments) for the exchange
* @return a declaration-confirm method to indicate the exchange was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
/**
* Declare a queue
* @see com.rabbitmq.client.AMQP.Queue.Declare
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
* @param queue the name of the queue
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* @param arguments other properties (construction arguments) for the queue
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
durable
持久化,保證 RabbitMQ 在退出或者 crash 等異常情況下數(shù)據(jù)沒有丟失,需要將 queue,exchange 和 Message 都持久化。
若是將 queue 的持久化標識 durable 設置為 true,則代表是一個持久的隊列,那么在服務重啟之后,會重新讀取之前被持久化的 queue。
雖然隊列可以被持久化,但是里面的消息是否為持久化,還要看消息的持久化設置。即重啟 queue,但是 queue 里面還沒有發(fā)出去的消息,那隊列里面還存在該消息么?這個取決于該消息的設置。
autoDeleted
自動刪除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用于臨時隊列。
當一個 Queue 被設置為自動刪除時,當消費者斷掉之后,queue 會被刪除,這個主要針對的是一些不是特別重要的數(shù)據(jù),不希望出現(xiàn)消息積累的情況。
小節(jié)
當一個 Queue 已經(jīng)聲明好了之后,不能更新 durable 或者 autoDelted 值;當需要修改時,需要先刪除再重新聲明 消費的 Queue 聲明應該和投遞的 Queue 聲明的 durable,autoDelted 屬性一致,否則會報錯 對于重要的數(shù)據(jù),一般設置 durable=true, autoDeleted=false 對于設置 autoDeleted=true 的隊列,當沒有消費者之后,隊列會自動被刪除
ACK
執(zhí)行一個任務可能需要花費幾秒鐘,你可能會擔心如果一個消費者在執(zhí)行任務過程中掛掉了。一旦 RabbitMQ 將消息分發(fā)給了消費者,就會從內存中刪除。在這種情況下,如果正在執(zhí)行任務的消費者宕機,會丟失正在處理的消息和分發(fā)給這個消費者但尚未處理的消息。
但是,我們不想丟失任何任務,如果有一個消費者掛掉了,那么我們應該將分發(fā)給它的任務交付給另一個消費者去處理。
為了確保消息不會丟失,RabbitMQ 支持消息應答。消費者發(fā)送一個消息應答,告訴 RabbitMQ 這個消息已經(jīng)接收并且處理完畢了。RabbitMQ 就可以刪除它了。
因此手動 ACK 的常見手段:
// 接收消息之后,主動ack/nak
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
try {
System.out.println(" [ " + queue + " ] Received '" + message);
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(envelope.getDeliveryTag(), false, true);
}
}
};
// 取消自動ack
channel.basicConsume(queue, false, consumer);
盡信書則不如無書,因個人能力有限,難免有疏漏和錯誤之處,如發(fā)現(xiàn) bug 或者有更好的建議,歡迎批評指正,不吝感激
沒有什么使我停留——除了目的,縱然岸旁有玫瑰、有綠蔭、有寧靜的港灣,我是不系之舟。
推薦閱讀:

