RabbitMQ的學(xué)習(xí)筆記
1. 什么是MQ
MQ(Message Quene) : ?翻譯為 消息隊(duì)列, 它是典型的 生產(chǎn)者和消費(fèi)者模型, 生產(chǎn)者不斷向消息隊(duì)列中生產(chǎn)消息,消費(fèi)者不斷的從隊(duì)列中獲取消息。因?yàn)橄⒌纳a(chǎn)和消費(fèi)都是異步的,而且只關(guān)心消息的發(fā)送和接收,沒(méi)有業(yè)務(wù)邏輯的侵入,輕松的實(shí)現(xiàn)系統(tǒng)間解耦。別名為消息中間件,通過(guò)利用高效可靠的消息傳遞機(jī)制進(jìn)行平臺(tái)無(wú)關(guān)的數(shù)據(jù)交流,并基于數(shù)據(jù)通信來(lái)進(jìn)行分布式系統(tǒng)的集成。
2. ?MQ的分類及各自的特點(diǎn)
當(dāng)今市面上有很多主流的消息中間件,如老牌的ActiveMQ、RabbitMQ,炙手可熱的Kafka,阿里巴巴自主開(kāi)發(fā)RocketMQ等。
不同的MQ的特點(diǎn)
#?1.ActiveMQ
?ActiveMQ 是Apache出品,最流行的,能力強(qiáng)勁的開(kāi)源消息隊(duì)列。它是一個(gè)完全支持JMS規(guī)范的的消息中間件。豐富的API,多種集群架構(gòu)模式讓ActiveMQ在業(yè)界成為老牌的消息中間件,在中小型企業(yè)頗受歡迎!
#?2.Kafka
?Kafka是LinkedIn開(kāi)源的分布式發(fā)布-訂閱消息系統(tǒng),目前歸屬于Apache頂級(jí)項(xiàng)目。Kafka主要特點(diǎn)是基于Pull的模式來(lái)處理消息消費(fèi),追求高吞吐量,一開(kāi)始的目的就是用于日志收集和傳輸。0.8版本開(kāi)始支持復(fù)制,不支持事務(wù),對(duì)消息的重復(fù)、丟失、錯(cuò)誤沒(méi)有嚴(yán)格要求,適合產(chǎn)生大量數(shù)據(jù)的互聯(lián)網(wǎng)服務(wù)的數(shù)據(jù)收集業(yè)務(wù)。
#?3.RocketMQ
?RocketMQ是阿里開(kāi)源的消息中間件,它是純Java開(kāi)發(fā),具有高吞吐量、高可用性、適合大規(guī)模分布式系統(tǒng)應(yīng)用的特點(diǎn)。RocketMQ思路起源于Kafka,但并不是Kafka的一個(gè)Copy,它對(duì)消息的可靠傳輸及事務(wù)性做了優(yōu)化,目前在阿里集團(tuán)被廣泛應(yīng)用于交易、充值、流計(jì)算、消息推送、日志流式處理、binglog分發(fā)等場(chǎng)景。
#?4.RabbitMQ
?RabbitMQ是使用Erlang語(yǔ)言開(kāi)發(fā)的開(kāi)源消息隊(duì)列系統(tǒng),基于AMQP協(xié)議來(lái)實(shí)現(xiàn)。AMQP的主要特征是面向消息、隊(duì)列、路由(包括點(diǎn)對(duì)點(diǎn)和發(fā)布/訂閱)、可靠性、安全。AMQP協(xié)議更多用在企業(yè)系統(tǒng)內(nèi)對(duì)數(shù)據(jù)一致性、穩(wěn)定性和可靠性要求很高的場(chǎng)景,對(duì)性能和吞吐量的要求還在其次。??
RabbitMQ比Kafka可靠,Kafka更適合IO高吞吐的處理,一般應(yīng)用在大數(shù)據(jù)日志處理或?qū)?shí)時(shí)性(少量延遲),可靠性(少量丟數(shù)據(jù))要求稍低的場(chǎng)景使用,比如ELK日志收集。
3. ?什么是RabbitMQ
基于AMQP協(xié)議,erlang語(yǔ)言開(kāi)發(fā),是部署最廣泛的開(kāi)源消息中間件,是最受歡迎的開(kāi)源消息中間件之一。
官網(wǎng):https://www.rabbitmq.com/
官方教程:https://www.rabbitmq.com/#getstarted


# AMQP協(xié)議:
?AMQP(advanced message queuing protocol)在2003年時(shí)被提出,最早用于解決金融領(lǐng)不同平臺(tái)之間的消息傳遞交互問(wèn)題。顧名思義,AMQP是一種協(xié)議,更準(zhǔn)確的說(shuō)是一種binary wire-level protocol(鏈接協(xié)議)。這是其和JMS的本質(zhì)差別,AMQP不從API層進(jìn)行限定,而是直接定義網(wǎng)絡(luò)交換的數(shù)據(jù)格式。這使得實(shí)現(xiàn)了AMQP的provider天然性就是跨平臺(tái)的。以下是AMQP協(xié)議模型:
AMQP協(xié)議模型:

4. RabbitMQ的安裝
1. 下載(在CentOS7下進(jìn)行)
官網(wǎng)下載地址 : https://www.rabbitmq.com/download.html

要下載的安裝包:

https://wwr.lanzoui.com/b02iazwib ? 密碼:26f5(藍(lán)奏云鏈接)
2. 安裝步驟
#?1.將rabbitmq安裝包系列文件上傳到linux系統(tǒng)中
?erlang-22.0.7-1.el7.x86_64.rpm
?rabbitmq-server-3.7.18-1.el7.noarch.rpm
?socat-1.7.3.2-2.el7.x86_64.rpm
#?2.安裝(依次進(jìn)行,我安裝在了/opt目錄下)?
?rpm?-ivh?erlang-22.0.7-1.el7.x86_64.rpm?-(這是Erlang依賴包)
?rpm?-ivh?socat-1.7.3.2-2.el7.x86_64.rpm?
?rpm?-ivh?rabbitmq-server-3.7.18-1.el7.noarch.rpm?
#?3.復(fù)制配置文件
?cp?/usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example?/etc/rabbitmq/rabbitmq.config
#?4.修改配置文件的副本
?即復(fù)制到/etc/rabbitmq目錄下的rabbitmq.cinfig
?將61行代碼修改一下:將配置文件中去掉%%,以及最后的,逗號(hào)(如下圖)
# 5.注意:
?默認(rèn)安裝完成后配置文件模板在:/usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example目錄中,需要將配置文件復(fù)制到/etc/rabbitmq/目錄中,并修改名稱為rabbitmq.config(使用和redis差不多),以后我們使用的配置文件就是這個(gè)副本
?也可以用命令找到那個(gè)配置文件?:?find?/?-name?rabbitmq.config.example


3. 啟動(dòng)插件管理
rabbitmq-plugins?enable?rabbitmq_management

4. 啟動(dòng)RabbitMQ服務(wù)
systemctl?start?rabbitmq-server?啟動(dòng)
systemctl?restart?rabbitmq-server?重啟
systemctl?stop?rabbitmq-server?停止
systemctl?status?rabbitmq-server?查看服務(wù)狀態(tài)

5. 關(guān)閉防火墻服務(wù)
systemctl?disable?firewalld??從開(kāi)機(jī)啟動(dòng)中移除
systemctl?stop?firewalld???關(guān)閉防火墻
systemctl?status?firewalld??查看防火墻狀態(tài)
注意:如果使用云服務(wù)器,除了防火墻外還要考慮云端的防火墻或者安全組策略(如下圖)



6. 訪問(wèn)web管理頁(yè)面
到了這一步就說(shuō)明rabbitmq安裝好了已經(jīng)。
http://121.196.207.86:15672/
默認(rèn)端口15672
登錄管理界面:
username:??guest
password:??guest


5. RabbitMQ的相關(guān)命令
5.1 服務(wù)器啟動(dòng)相關(guān)
systemctl?start?rabbitmq-server?啟動(dòng)
systemctl?restart?rabbitmq-server?重啟
systemctl?stop?rabbitmq-server?停止
systemctl?status?rabbitmq-server?查看當(dāng)前狀態(tài)
5.2 管理命令行,用于在不使用web界面情況下用命令操作RabbitMQ
可以使用下面命令查看更多的使用技巧
rabbitmqctl?help
比如查看當(dāng)前用戶
rabbitmqctl?list_users


5.3 插件管理命令行
rabbitmq-plugins?enable|list|disable?

6. RabbitMQ的web界面
6.1 overview概覽


- connections:無(wú)論生產(chǎn)者還是消費(fèi)者,都需要與RabbitMQ建立連接后才可以完成消息的生產(chǎn)和消費(fèi),在這里可以查看連接情況
- channels:通道,建立連接后,會(huì)形成通道,消息的投遞獲取依賴通道。
- Exchanges:交換機(jī),用來(lái)實(shí)現(xiàn)消息的路由
- Queues:隊(duì)列,即消息隊(duì)列,消息存放在隊(duì)列中,等待消費(fèi),消費(fèi)后被移除隊(duì)列。
- admin:管理用戶和虛擬主機(jī)
6.2 添加用戶

上面的Tags選項(xiàng),其實(shí)是指定用戶的角色,可選的有以下幾個(gè):
超級(jí)管理員(administrator)
可登陸管理控制臺(tái),可查看所有的信息,并且可以對(duì)用戶,策略(policy)進(jìn)行操作。
監(jiān)控者(monitoring)
可登陸管理控制臺(tái),同時(shí)可以查看rabbitmq節(jié)點(diǎn)的相關(guān)信息(進(jìn)程數(shù),內(nèi)存使用情況,磁盤(pán)使用情況等)
策略制定者(policymaker)
可登陸管理控制臺(tái), 同時(shí)可以對(duì)policy進(jìn)行管理。但無(wú)法查看節(jié)點(diǎn)的相關(guān)信息(上圖紅框標(biāo)識(shí)的部分)。
普通管理者(management)
僅可登陸管理控制臺(tái),無(wú)法看到節(jié)點(diǎn)信息,也無(wú)法對(duì)策略進(jìn)行管理。
其他
無(wú)法登陸管理控制臺(tái),通常就是普通的生產(chǎn)者和消費(fèi)者。
6.3 添加虛擬主機(jī)
#?虛擬主機(jī)
為了讓各個(gè)用戶可以互不干擾的工作,RabbitMQ添加了虛擬主機(jī)(Virtual Hosts)的概念。其實(shí)就是一個(gè)獨(dú)立的訪問(wèn)路徑,不同用戶使用不同路徑,各自有自己的隊(duì)列、交換機(jī),互相不會(huì)影響。

6.4 將用戶與虛擬主機(jī)進(jìn)行綁定
創(chuàng)建好虛擬主機(jī)

點(diǎn)擊添加好的虛擬主機(jī), 進(jìn)入虛擬機(jī)設(shè)置界面,進(jìn)行用戶設(shè)置:

7. RabbitMQ的七種消息模型
7.1 rabbitMQ支持的消息模型




7.2第一種模型(直連)

在上圖的模型中,有以下概念:
- P:生產(chǎn)者,也就是要發(fā)送消息的程序
- C:消費(fèi)者:消息的接受者,會(huì)一直等待消息到來(lái)。
- queue:消息隊(duì)列,圖中紅色部分。類似一個(gè)郵箱,可以緩存消息;生產(chǎn)者向其中投遞消息,消費(fèi)者從其中取出消息。
準(zhǔn)備
- 創(chuàng)建一個(gè)Virtual Hosts,名字為ems,創(chuàng)建一個(gè)Users,名字為ems ,密碼為123
- 然后將該users添加到名字為ems的Virtual Hosts中

- 注意使用阿里云時(shí)要開(kāi)放端口5672
maven項(xiàng)目導(dǎo)入依賴
<dependency>
????<groupId>com.rabbitmq</groupId>
????<artifactId>amqp-client</artifactId>
????<version>5.7.2</version>
</dependency>
創(chuàng)建生產(chǎn)者
public?class?Provider?{
????//生產(chǎn)消息
????@Test
????public?void?testSendMessage()?throws?IOException,?TimeoutException?{
????????//創(chuàng)建連接mq的連接工廠對(duì)象
????????ConnectionFactory?connectionFactory?=?new?ConnectionFactory();
????????//設(shè)置連接rabbitmq主機(jī)
????????connectionFactory.setHost("121.196.207.86");
????????//設(shè)置端口號(hào)
????????connectionFactory.setPort(5672);
????????//設(shè)置要連接的那個(gè)虛擬主機(jī)
????????connectionFactory.setVirtualHost("ems");
????????//設(shè)置訪問(wèn)虛擬主機(jī)的用戶名和密碼
????????connectionFactory.setUsername("ems");
????????connectionFactory.setPassword("123");
????????//獲取連接對(duì)象
????????Connection?connection?=?connectionFactory.newConnection();
????????//創(chuàng)建通道
????????Channel?channel?=?connection.createChannel();
????????//通道綁定對(duì)應(yīng)消息隊(duì)列
?????????/*?參數(shù)1:??隊(duì)列名稱?如果隊(duì)列不存在自動(dòng)創(chuàng)建
????????????????參數(shù)2:??用來(lái)定義隊(duì)列特性是否要持久化?true?持久化隊(duì)列???false?不持久化
????????????????參數(shù)3:??exclusive?是否獨(dú)占隊(duì)列??true?獨(dú)占隊(duì)列???false??不獨(dú)占
????????????????參數(shù)4:??autoDelete:?是否在消費(fèi)完成后自動(dòng)刪除隊(duì)列??true?自動(dòng)刪除??false?不自動(dòng)刪除
????????????????參數(shù)5:??額外附加參數(shù)?*/
????????channel.queueDeclare("hello1",true,false,false,null);
????????
????????//發(fā)布消息
????????//參數(shù)1:?交換機(jī)名稱?參數(shù)2:隊(duì)列名稱??參數(shù)3:傳遞消息額外設(shè)置??參數(shù)4:消息的具體內(nèi)容
????????channel.basicPublish("","hello1",?MessageProperties.PERSISTENT_TEXT_PLAIN,"hello?rabbitmq".getBytes());
????????
????????//關(guān)閉通道和連接對(duì)象
????????channel.close();
????????connection.close();


創(chuàng)建消費(fèi)者
public?class?Customer?{
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????//創(chuàng)建連接工廠
????????ConnectionFactory?connectionFactory?=?new?ConnectionFactory();
????????//設(shè)置連接rabbitmq主機(jī)
????????connectionFactory.setHost("121.196.207.86");
????????//設(shè)置端口號(hào)
????????connectionFactory.setPort(5672);
????????//設(shè)置要連接那個(gè)虛擬主機(jī)
????????connectionFactory.setVirtualHost("ems");
????????//設(shè)置訪問(wèn)虛擬主機(jī)的用戶名和密碼
????????connectionFactory.setUsername("ems");
????????connectionFactory.setPassword("123");
????????//創(chuàng)建連接對(duì)象
????????Connection?connection?=?connectionFactory.newConnection();
????????//創(chuàng)建通道
????????Channel?channel?=?connection.createChannel();
????????//通道綁定對(duì)象
????????/*?參數(shù)1:??隊(duì)列名稱?如果隊(duì)列不存在自動(dòng)創(chuàng)建
????????????????參數(shù)2:??用來(lái)定義隊(duì)列特性是否要持久化?true?持久化隊(duì)列???false?不持久化
????????????????參數(shù)3:??exclusive?是否獨(dú)占隊(duì)列??true?獨(dú)占隊(duì)列???false??不獨(dú)占
????????????????參數(shù)4:??autoDelete:?是否在消費(fèi)完成后自動(dòng)刪除隊(duì)列??true?自動(dòng)刪除??false?不自動(dòng)刪除
????????????????參數(shù)5:??額外附加參數(shù)?*/
????????channel.queueDeclare("hello1",true,false,false,null);
????????//消費(fèi)消息
????????//參數(shù)1:?消費(fèi)那個(gè)隊(duì)列的消息?隊(duì)列名稱
????????//參數(shù)2:?開(kāi)始消息的自動(dòng)確認(rèn)機(jī)制
????????//參數(shù)3:?消費(fèi)時(shí)的回調(diào)接口
????????channel.basicConsume("hello1",true,new?DefaultConsumer(channel){
????????????@Override?//最后一個(gè)參數(shù):?消息隊(duì)列中取出的消息
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????System.out.println("===================================="+new?String(body));
????????????}
????????});
????}
}



7.3 封裝工具類(優(yōu)化代碼)
public?class?RabbitMQUtils?{
????private?static?ConnectionFactory?connectionFactory;
????private?static?Properties?properties;
????static{
????????//重量級(jí)資源??類加載時(shí)執(zhí)行只執(zhí)行一次
????????connectionFactory?=?new?ConnectionFactory();
????????connectionFactory.setHost("121.196.207.86");
????????connectionFactory.setPort(5672);
????????connectionFactory.setVirtualHost("ems");
????????connectionFactory.setUsername("ems");
????????connectionFactory.setPassword("123");
????}
????//定義提供連接對(duì)象的方法
????public?static?Connection?getConnection()?{
????????try?{
????????????return?connectionFactory.newConnection();
????????}?catch?(Exception?e)?{
????????????e.printStackTrace();
????????}
????????return?null;
????}
????//關(guān)閉通道和關(guān)閉連接工具方法
????public?static?void?closeConnectionAndChanel(Channel?channel,?Connection?conn)?{
????????try?{
????????????if(channel!=null){
????????????????channel.close();
????????????}
????????????if(conn!=null){
????????????????conn.close();
????????????}
????????}?catch?(Exception?e)?{
????????????e.printStackTrace();
????????}
????}
}



7.4 參數(shù)的說(shuō)明
#?通道綁定對(duì)應(yīng)消息隊(duì)列
*?channel.queueDeclare("hello1",true,false,false,null);
//參數(shù)1:??隊(duì)列名稱?如果隊(duì)列不存在自動(dòng)創(chuàng)建,這里只是通道創(chuàng)建一個(gè)隊(duì)列,二者不是一對(duì)一的關(guān)系,通道也可以向別的隊(duì)列發(fā)送消息,決定通道向哪個(gè)隊(duì)列發(fā)送消息的方法是channel.basicPublish(...)中的參數(shù)二
//參數(shù)2:??用來(lái)定義隊(duì)列特性是否要持久化?true?持久化隊(duì)列???false?不持久化,如果選擇為false,則不會(huì)持久化,即在rabbitmq重啟后該隊(duì)列會(huì)丟失,也包括里面的數(shù)據(jù),而持久化后,rabbitmq重啟(systemctl?restart?rabbitmq-server)后,隊(duì)列不會(huì)丟失,而里面的消息會(huì)丟失
//參數(shù)3:??exclusive?是否獨(dú)占隊(duì)列??true?獨(dú)占隊(duì)列???false??不獨(dú)占,獨(dú)占隊(duì)列表示只能被一個(gè)連接通道訪問(wèn),其他的連接通道訪問(wèn)不到這個(gè)隊(duì)列,一般為false
//參數(shù)4:??autoDelete:?是否在消費(fèi)完成后即隊(duì)列中沒(méi)有消息后自動(dòng)刪除隊(duì)列??true?自動(dòng)刪除??false?不自動(dòng)刪除
//參數(shù)5:??額外附加參數(shù)
#?發(fā)布消息
*?channel.basicPublish("","hello1",?MessageProperties.PERSISTENT_TEXT_PLAIN,"hello?rabbitmq".getBytes());
//參數(shù)1:?交換機(jī)名稱?參數(shù)2:隊(duì)列名稱??參數(shù)3:傳遞消息額外設(shè)置,上面的參數(shù)表示在rabbitmq重啟后消息和隊(duì)列都還在(被持久化到本地了)?參數(shù)4:消息的具體內(nèi)容




7.5 第二種模型(work queue)
Work queues,也被稱為(Task queues),任務(wù)模型。當(dāng)消息處理比較耗時(shí)的時(shí)候,可能生產(chǎn)消息的速度會(huì)遠(yuǎn)遠(yuǎn)大于消息的消費(fèi)速度。長(zhǎng)此以往,消息就會(huì)堆積越來(lái)越多,無(wú)法及時(shí)處理。此時(shí)就可以使用work 模型:讓多個(gè)消費(fèi)者綁定到一個(gè)隊(duì)列,共同消費(fèi)隊(duì)列中的消息。隊(duì)列中的消息一旦消費(fèi),就會(huì)消失,因此任務(wù)是不會(huì)被重復(fù)執(zhí)行的。

角色:
- P:生產(chǎn)者:任務(wù)的發(fā)布者
- C1:消費(fèi)者-1,領(lǐng)取任務(wù)并且完成任務(wù),假設(shè)完成速度較慢
- C2:消費(fèi)者-2:領(lǐng)取任務(wù)并完成任務(wù),假設(shè)完成速度快
開(kāi)發(fā)生產(chǎn)者
通過(guò)上面的工具類來(lái)定義一個(gè)生產(chǎn)者
public?class?Provider?{
????public?static?void?main(String[]?args)?throws?IOException?{
????????//獲取連接對(duì)象
????????Connection?connection?=?RabbitMQUtils.getConnection();
????????//通過(guò)連接對(duì)象創(chuàng)建通道對(duì)象
????????Channel?channel?=?connection.createChannel();
????????//通過(guò)通道聲明隊(duì)列
????????channel.queueDeclare("work",?true,?false,?false,?null);
????????for?(int?i?=?1;?i?<=20;?i++)?{
????????????//生產(chǎn)消息
????????????channel.basicPublish("",?"work",?null,?(i?+?"hello?work?quene").getBytes());
????????}
????????//關(guān)閉資源
????????RabbitMQUtils.closeConnectionAndChanel(channel,?connection);
????}
}
開(kāi)發(fā)消費(fèi)者-1
//通過(guò)工具類獲取連接
Connection?connection?=?RabbitMQUtils.getConnection();
//通過(guò)連接創(chuàng)建通道
Channel?channel?=?connection.createChannel();
//通道綁定隊(duì)列
channel.queueDeclare("work",true,false,false,null);
//消費(fèi)?隊(duì)列中的消息
??//參數(shù)1:?消費(fèi)那個(gè)隊(duì)列的消息?隊(duì)列名稱
????????//參數(shù)2:?開(kāi)始消息的自動(dòng)確認(rèn)機(jī)制
????????//參數(shù)3:?消費(fèi)時(shí)的回調(diào)接口
channel.basicConsume("work",true,new?DefaultConsumer(channel){
??@Override
??public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????System.out.println("消費(fèi)者1:?"+new?String(body));
??}
});
開(kāi)發(fā)消費(fèi)者-2
//通過(guò)工具類獲取連接
Connection?connection?=?RabbitMQUtils.getConnection();
//通過(guò)連接創(chuàng)建通道
Channel?channel?=?connection.createChannel();
//通道綁定隊(duì)列
channel.queueDeclare("work",true,false,false,null);
//消費(fèi)?隊(duì)列中的消息
??//參數(shù)1:?消費(fèi)那個(gè)隊(duì)列的消息?隊(duì)列名稱
????????//參數(shù)2:?開(kāi)始消息的自動(dòng)確認(rèn)機(jī)制
????????//參數(shù)3:?消費(fèi)時(shí)的回調(diào)接口
channel.basicConsume("work",true,new?DefaultConsumer(channel){
??@Override
??public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????try?{
??????Thread.sleep(1000);???//處理消息比較慢?一秒處理一個(gè)消息
????}?catch?(InterruptedException?e)?{
??????e.printStackTrace();
????}
????System.out.println("消費(fèi)者2:?"+new?String(body));??
??}
});
測(cè)試結(jié)果


總結(jié):默認(rèn)情況下,RabbitMQ將按順序?qū)⒚總€(gè)消息發(fā)送給下一個(gè)使用者。平均而言,每個(gè)消費(fèi)者都會(huì)收到相同數(shù)量的消息。這種分發(fā)消息的方式稱為循環(huán)。
但是,上面的方式存在問(wèn)題,即如果兩個(gè)消費(fèi)者中一個(gè)消費(fèi)者消費(fèi)的的消息快,另一個(gè)慢,而兩個(gè)又因?yàn)闀?huì)最終消費(fèi)的消息一樣,這就會(huì)導(dǎo)致效率問(wèn)題,我們要保證能者多勞,不是平均分配
7.6 消息自動(dòng)確認(rèn)機(jī)制
*?Doing?a?task?can?take?a?few?seconds.?You?may?wonder?what?happens?if?one?of?the?consumers?starts?a?long?task?and?dies?with?it?only?partly?done.?With?our?current?code,?once?RabbitMQ?delivers?a?message?to?the?consumer?it?immediately?marks?it?for?deletion.?In?this?case,?if?you?kill?a?worker?we?will?lose?the?message?it?was?just?processing.?We'll?also?lose?all?the?messages?that?were?dispatched?to?this?particular?worker?but?were?not?yet?handled.
(完成一項(xiàng)任務(wù)可能需要幾秒鐘。你可能想知道,如果一個(gè)消費(fèi)者開(kāi)始了一項(xiàng)漫長(zhǎng)的任務(wù),但只完成了一部分就死了,會(huì)發(fā)生什么。在我們當(dāng)前的代碼中,一旦RabbitMQ向消費(fèi)者發(fā)送了一條消息,它就會(huì)立即將其標(biāo)記為刪除。在這種情況下,如果您殺死一個(gè)worker,我們將丟失它正在處理的消息。我們還將丟失所有發(fā)送給這個(gè)特定worker但尚未處理的消息)
But?we?don't?want?to?lose?any?tasks.?If?a?worker?dies,?we'd?like?the?task?to?be?delivered?to?another?worker.
(但我們不想失去任何任務(wù)。如果一個(gè)消費(fèi)者死了,我們希望把任務(wù)交給另一個(gè)消費(fèi)者。)
對(duì)消費(fèi)者1,2進(jìn)行改變:
public?class?Customer1?{
????public?static?void?main(String[]?args)?throws?IOException?{
????????//獲取連接
????????Connection?connection?=?RabbitMQUtils.getConnection();
????????//通過(guò)連接創(chuàng)建通道
????????final?Channel?channel?=?connection.createChannel();
????????channel.basicQos(1);//每一次只能消費(fèi)一個(gè)消息
????????//通道綁定隊(duì)列
????????channel.queueDeclare("work",true,false,false,null);
????????//參數(shù)1:隊(duì)列名稱??參數(shù)2:消息自動(dòng)確認(rèn)?true??消費(fèi)者自動(dòng)向rabbitmq確認(rèn)消息消費(fèi)??false?不會(huì)自動(dòng)確認(rèn)消息
????????//參數(shù)三?消費(fèi)時(shí)的回調(diào)接口
????????channel.basicConsume("work",false,new?DefaultConsumer(channel){
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????try{
????????????????????Thread.sleep(2000);
????????????????}catch?(Exception?e){
????????????????????e.printStackTrace();
????????????????}
????????????????System.out.println("消費(fèi)者-1:?"+new?String(body));
????????????????//?參數(shù)1:確認(rèn)隊(duì)列中那個(gè)具體消息?參數(shù)2:是否開(kāi)啟多個(gè)消息同時(shí)
????????????????channel.basicAck(envelope.getDeliveryTag(),false);
????????????}
????????});
????}
}
public?class?Customer2?{
????public?static?void?main(String[]?args)?throws?IOException?{
????????//獲取連接
????????Connection?connection?=?RabbitMQUtils.getConnection();
????????//通過(guò)連接創(chuàng)建通道
????????final?Channel?channel?=?connection.createChannel();
????????//每一次只能消費(fèi)一個(gè)消息
????????channel.basicQos(1);
????????//通道綁定對(duì)列
????????channel.queueDeclare("work",true,false,false,null);
????????//參數(shù)1:隊(duì)列名稱??參數(shù)2:消息自動(dòng)確認(rèn)?true??消費(fèi)者自動(dòng)向rabbitmq確認(rèn)消息消費(fèi)??false?不會(huì)自動(dòng)確認(rèn)消息
????????//參數(shù)3?消費(fèi)時(shí)的回調(diào)接口
????????channel.basicConsume("work",false,new?DefaultConsumer(channel){
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????System.out.println("消費(fèi)者-2:?"+new?String(body));
????????????????//手動(dòng)確認(rèn)??參數(shù)1:手動(dòng)確認(rèn)消息標(biāo)識(shí)??參數(shù)2:false?每次確認(rèn)一個(gè)
????????????????channel.basicAck(envelope.getDeliveryTag(),?false);
????????????}
????????});
????}
}
設(shè)置每個(gè)通道一次只能消費(fèi)一個(gè)消息,避免接收了消息但因?yàn)闆](méi)有時(shí)間處理而導(dǎo)致的消費(fèi)者效率低下。
關(guān)閉消息的自動(dòng)確認(rèn),開(kāi)啟手動(dòng)確認(rèn)消息,每收到一條消息就會(huì)發(fā)送確認(rèn)給隊(duì)列,然后接收下一條。
這樣就可以能者多勞,消費(fèi)快的消費(fèi)的消息多。


7.7 第三種模型(fanout 廣播)
fanout 扇出 也稱為廣播

在廣播模式下,消息發(fā)送流程是這樣的:
- 可以有多個(gè)消費(fèi)者
- 每個(gè)消費(fèi)者有自己的queue(隊(duì)列)
- 每個(gè)隊(duì)列都要綁定到Exchange(交換機(jī))
- 生產(chǎn)者發(fā)送的消息,只能發(fā)送到交換機(jī),交換機(jī)來(lái)決定要發(fā)給哪個(gè)隊(duì)列,生產(chǎn)者無(wú)法決定。
- 交換機(jī)把消息發(fā)送給綁定過(guò)的所有隊(duì)列
- 隊(duì)列的消費(fèi)者都能拿到消息。實(shí)現(xiàn)一條消息被多個(gè)消費(fèi)者消費(fèi)
開(kāi)發(fā)生產(chǎn)者:
public?class?Provider?{
????public?static?void?main(String[]?args)?throws?IOException?{
????????//獲取連接對(duì)象
????????Connection?connection?=?RabbitMQUtils.getConnection();
????????//通過(guò)連接對(duì)象創(chuàng)建通道
????????Channel?channel?=?connection.createChannel();
????????//將通道聲明指定交換機(jī)???
????????//?參數(shù)1:?交換機(jī)名稱????參數(shù)2:?交換機(jī)類型??fanout?廣播類型
????????channel.exchangeDeclare("logs","fanout");
????????//發(fā)送消息給交換機(jī)
????????channel.basicPublish("logs","",null,"fanout?type?message".getBytes());
????????//釋放資源
????????RabbitMQUtils.closeConnectionAndChanel(channel,connection);
????}
}

開(kāi)發(fā)消費(fèi)者-1
public?class?Customer1?{
????public?static?void?main(String[]?args)?throws?IOException?{
????????//獲取連接對(duì)象
????????Connection?connection?=?RabbitMQUtils.getConnection();
????????//通過(guò)連接對(duì)象創(chuàng)建通道
????????Channel?channel?=?connection.createChannel();
????????//通道綁定交換機(jī)
????????channel.exchangeDeclare("logs","fanout");
????????//臨時(shí)隊(duì)列
????????String?queueName?=?channel.queueDeclare().getQueue();
????????//綁定交換機(jī)和隊(duì)列
????????channel.queueBind(queueName,"logs","");
????????//消費(fèi)消息
????????channel.basicConsume(queueName,true,new?DefaultConsumer(channel){
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????System.out.println("消費(fèi)者1:?"+new?String(body));
????????????}
????????});
???}
開(kāi)發(fā)消費(fèi)者-2
public?class?Customer2?{
????public?static?void?main(String[]?args)?throws?IOException?{
????????//獲取連接對(duì)象
????????Connection?connection?=?RabbitMQUtils.getConnection();
????????Channel?channel?=?connection.createChannel();
????????//通道綁定交換機(jī)
????????channel.exchangeDeclare("logs","fanout");
????????//臨時(shí)隊(duì)列
????????String?queueName?=?channel.queueDeclare().getQueue();
????????//綁定交換機(jī)和隊(duì)列
????????channel.queueBind(queueName,"logs","");
????????//消費(fèi)消息
????????channel.basicConsume(queueName,true,new?DefaultConsumer(channel){
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????System.out.println("消費(fèi)者2:?"+new?String(body));
????????????}
????????});
????}
}
開(kāi)發(fā)消費(fèi)者-3
public?class?Customer3?{
????public?static?void?main(String[]?args)?throws?IOException?{
????????//獲取連接對(duì)象
????????Connection?connection?=?RabbitMQUtils.getConnection();
????????Channel?channel?=?connection.createChannel();
????????//通道綁定交換機(jī)
????????channel.exchangeDeclare("logs","fanout");
????????//臨時(shí)隊(duì)列
????????String?queueName?=?channel.queueDeclare().getQueue();
????????//綁定交換機(jī)和隊(duì)列
????????channel.queueBind(queueName,"logs","");
????????//消費(fèi)消息
????????channel.basicConsume(queueName,true,new?DefaultConsumer(channel){
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????System.out.println("消費(fèi)者3:?"+new?String(body));
????????????}
????????});
????}
}
測(cè)試結(jié)果
三個(gè)消費(fèi)者都會(huì)從交換機(jī)收到消息



7.8 第四種模型(Routing )
7.8.1 Routing 之訂閱模型-Direct(直連)
在Fanout模式中,一條消息,會(huì)被所有訂閱的隊(duì)列都消費(fèi)。但是,在某些場(chǎng)景下,我們希望不同的消息被不同的隊(duì)列消費(fèi)。這時(shí)就要用到Direct類型的交換機(jī)(Exchange)。
在Direct模型下:
- 隊(duì)列與交換機(jī)的綁定,不能是任意綁定了,而是要指定一個(gè)RoutingKey(路由key)
- 消息的發(fā)送方在 向 Exchange發(fā)送消息時(shí),也必須指定消息的 RoutingKey。
- Exchange不再把消息交給每一個(gè)綁定的隊(duì)列,而是根據(jù)消息的Routing Key進(jìn)行判斷,只有隊(duì)列的Routingkey與消息的Routing key完全一致,才會(huì)接收到消息
流程:

圖解:
- **P:生產(chǎn)者,**向Exchange發(fā)送消息,發(fā)送消息時(shí),會(huì)指定一個(gè)routing key。
- X:Exchange(交換機(jī)),接收生產(chǎn)者的消息,然后把消息遞交給 與routing key完全匹配的隊(duì)列
- C1:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 error 的消息
- C2:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 info、error、warning 的消息
開(kāi)發(fā)生產(chǎn)者
public?class?Provider?{
????public?static?void?main(String[]?args)?throws?IOException?{
????????//獲取連接對(duì)象
????????Connection?connection?=?RabbitMQUtils.getConnection();
????????//獲取連接通道對(duì)象
????????Channel?channel?=?connection.createChannel();
????????//聲明交換機(jī)名字(復(fù)用性)
????????String?exchangeName?=?"logs_direct";
????????//通過(guò)通道聲明交換機(jī)??參數(shù)1:交換機(jī)名稱??參數(shù)2:direct??路由模式
????????channel.exchangeDeclare(exchangeName,"direct");
????????//指定routingkey
????????String?routingkey1?=?"error";
????????String?routingkey2?=?"info";
????????//發(fā)送消息
????????channel.basicPublish(exchangeName,routingkey1,null,("這是direct模型發(fā)布的基于route?key:?["+routingkey1+"]?發(fā)送的消息").getBytes());
????????channel.basicPublish(exchangeName,routingkey2,null,("這是direct模型發(fā)布的基于route?key:?["+routingkey2+"]?發(fā)送的消息").getBytes());
????????//關(guān)閉資源
????????RabbitMQUtils.closeConnectionAndChanel(channel,connection);
????}
}
開(kāi)發(fā)消費(fèi)者-1
public?class?Customer1?{
????public?static?void?main(String[]?args)?throws?IOException?{
????????Connection?connection?=?RabbitMQUtils.getConnection();
????????Channel?channel?=?connection.createChannel();
????????String?exchangeName?=?"logs_direct";
????????//通道聲明交換機(jī)以及交換的類型
????????channel.exchangeDeclare(exchangeName,"direct");
????????//創(chuàng)建一個(gè)臨時(shí)隊(duì)列
????????String?queue?=?channel.queueDeclare().getQueue();
????????//基于route?key綁定隊(duì)列和交換機(jī)
????????channel.queueBind(queue,exchangeName,"error");
????????//獲取消費(fèi)的消息
????????channel.basicConsume(queue,true,new?DefaultConsumer(channel){
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????System.out.println("消費(fèi)者1:?"+?new?String(body));
????????????}
????????});
????}
}
開(kāi)發(fā)消費(fèi)者-2
public?class?Customer2?{
????public?static?void?main(String[]?args)?throws?IOException?{
????????Connection?connection?=?RabbitMQUtils.getConnection();
????????Channel?channel?=?connection.createChannel();
????????String?exchangeName?=?"logs_direct";
????????//聲明交換機(jī)?以及交換機(jī)類型?direct
????????channel.exchangeDeclare(exchangeName,"direct");
????????//創(chuàng)建一個(gè)臨時(shí)隊(duì)列
????????String?queue?=?channel.queueDeclare().getQueue();
????????//臨時(shí)隊(duì)列和交換機(jī)綁定
????????channel.queueBind(queue,exchangeName,"info");
????????channel.queueBind(queue,exchangeName,"error");
????????channel.queueBind(queue,exchangeName,"warning");
????????//消費(fèi)消息
????????channel.basicConsume(queue,true,new?DefaultConsumer(channel){
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????System.out.println("消費(fèi)者2:?"+new?String(body));
????????????}
????????});
????}
}
測(cè)試生產(chǎn)者發(fā)送Route key為error和info的消息時(shí)


7.8.2 Routing 之訂閱模型-Topic
Topic類型的Exchange與Direct相比,都是可以根據(jù)RoutingKey把消息路由到不同的隊(duì)列。只不過(guò)Topic類型Exchange可以讓隊(duì)列在綁定Routing key 的時(shí)候使用通配符!這種模型Routingkey **一般都是由一個(gè)或多個(gè)單詞組成,**多個(gè)單詞之間以”.”分割,例如:item.insert

#?通配符
?*?(star)?can?substitute?for?exactly?one?word.????匹配不多不少恰好1個(gè)詞
?#?(hash)?can?substitute?for?zero?or?more?words.??匹配零個(gè)或多個(gè)詞
#?如:
?audit.#????匹配audit.irs.corporate或者?audit.irs?等
?audit.*???只能匹配?audit.irs
開(kāi)發(fā)生產(chǎn)者
public?class?Provider?{
????public?static?void?main(String[]?args)?throws?IOException?{
????????//獲取連接對(duì)象
????????Connection?connection?=?RabbitMQUtils.getConnection();
????????Channel?channel?=?connection.createChannel();
????????//聲明交換機(jī)以及交換機(jī)類型?topic
????????channel.exchangeDeclare("topics","topic");
????????//定義routekey
????????String?routekey?=?"save.user.delete.test";
????????//發(fā)布消息
????????channel.basicPublish("topics",routekey,null,("這里是topic動(dòng)態(tài)路由模型,routekey:?["+routekey+"]").getBytes());
????????//關(guān)閉資源
????????RabbitMQUtils.closeConnectionAndChanel(channel,connection);
????}
}
開(kāi)發(fā)消費(fèi)者-1
Routing Key中使用*通配符方式
public?class?Customer1?{
????public?static?void?main(String[]?args)?throws?IOException?{
????????//獲取連接
????????Connection?connection?=?RabbitMQUtils.getConnection();
????????Channel?channel?=?connection.createChannel();
????????//聲明交換機(jī)以及交換機(jī)類型
????????channel.exchangeDeclare("topics","topic");
????????//創(chuàng)建一個(gè)臨時(shí)隊(duì)列
????????String?queue?=?channel.queueDeclare().getQueue();
????????//綁定隊(duì)列和交換機(jī)??動(dòng)態(tài)統(tǒng)配符形式route key
????????channel.queueBind(queue,"topics","*.user.*");
????????//消費(fèi)消息
????????channel.basicConsume(queue,true,new?DefaultConsumer(channel){
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????System.out.println("消費(fèi)者1:?"+?new?String(body));
????????????}
????????});
????}
}
開(kāi)發(fā)消費(fèi)者-2
Routing Key中使用#通配符方式
public?class?Customer2?{
????public?static?void?main(String[]?args)?throws?IOException?{
????????//獲取連接
????????Connection?connection?=?RabbitMQUtils.getConnection();
????????Channel?channel?=?connection.createChannel();
????????//聲明交換機(jī)以及交換機(jī)類型
????????channel.exchangeDeclare("topics","topic");
????????//創(chuàng)建一個(gè)臨時(shí)隊(duì)列
????????String?queue?=?channel.queueDeclare().getQueue();
????????//綁定隊(duì)列和交換機(jī)??動(dòng)態(tài)統(tǒng)配符形式route key
????????channel.queueBind(queue,"topics","*.user.#");
????????//消費(fèi)消息
????????channel.basicConsume(queue,true,new?DefaultConsumer(channel){
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????System.out.println("消費(fèi)者2:?"+?new?String(body));
????????????}
????????});
????}
}
測(cè)試結(jié)果


8. RabbitMQ整合SpringBoot
8.1 搭建環(huán)境
配置文件application.yml
spring:
??application:
????name:?rabbitmq-springboot
??rabbitmq:
????host:?121.196.207.86
????port:?5672
????username:?ems
????password:?123
????virtual-host:?ems
引入依賴,也可以在建項(xiàng)目的時(shí)候勾選
<!--引入與rabbitmq集成依賴-->
<dependency>
????<groupId>org.springframework.boot</groupId>
????<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
RabbitTemplate ?用來(lái)簡(jiǎn)化操作,使用時(shí)候直接在項(xiàng)目中注入即可使用
8.2 第一種直連模型
生產(chǎn)者:
@Autowired
private?RabbitTemplate?rabbitTemplate;
@Test
public?void?testHello(){
??rabbitTemplate.convertAndSend("hello","hello?world");
}
消費(fèi)者
@Component
@RabbitListener(queuesToDeclare?=?@Queue(value?=?"hello"))
public?class?HelloCustomer?{
????@RabbitHandler
????public?void?receive1(String?message){
????????System.out.println("=================================");
????????System.out.println("message?=?"?+?message);
????????System.out.println("=================================");
????}
}

8.3 第二種work模型
開(kāi)發(fā)生產(chǎn)者
@Autowired
private?RabbitTemplate?rabbitTemplate;
@Test
public?void?testWork(){
??for?(int?i?=?0;?i?<?10;?i++)?{
????rabbitTemplate.convertAndSend("work","hello?work!");
??}
}
開(kāi)發(fā)消費(fèi)者
@Component
public?class?WorkCustomer?{
????//一個(gè)消費(fèi)者
????@RabbitListener(queuesToDeclare?=?@Queue("work"))
????public?void?receive1(String?message){
????????System.out.println("message1?=?"+message);
????}
????
????//一個(gè)消費(fèi)者
????@RabbitListener(queuesToDeclare?=?@Queue("work"))
????public?void?receive2(String?message){
????????System.out.println("message2?=?"+message);
????}
}

說(shuō)明: ?默認(rèn)在Spring AMQP實(shí)現(xiàn)中Work這種方式就是公平調(diào)度,如果需要實(shí)現(xiàn)能者多勞需要額外配置
8.4 第三種Fanout 廣播模型
開(kāi)發(fā)生產(chǎn)者
//注入rabbitTemplate
@Autowired
private?RabbitTemplate?rabbitTemplate;
//fanout?廣播
@Test
public?void?testFanout(){
????rabbitTemplate.convertAndSend("logs","","Fanout的模型發(fā)送的消息");
}
開(kāi)發(fā)消費(fèi)者
@Component
public?class?FanoutCustomer?{
????@RabbitListener(bindings?=?{
????????????@QueueBinding(
????????????????????value?=?@Queue,//創(chuàng)建臨時(shí)隊(duì)列
????????????????????exchange?=@Exchange(value?=?"logs",type?=?"fanout")??//綁定的交換機(jī)
????????????)
????})
????public?void?receive1(String??message){
????????System.out.println("message1?=?"?+?message);
????}
????@RabbitListener(bindings?=?{
????????????@QueueBinding(
????????????????????value?=?@Queue,//創(chuàng)建臨時(shí)隊(duì)列
????????????????????exchange?=@Exchange(value?=?"logs",type?=?"fanout")??//綁定的交換機(jī)
????????????)
????})
????public?void?receive2(String??message){
????????System.out.println("message2?=?"?+?message);
????}
}

8.5 第四種Routing 路由模型
開(kāi)發(fā)生產(chǎn)者
//注入rabbitTemplate
@Autowired
private?RabbitTemplate?rabbitTemplate;
//route?路由模式
@Test
public?void?testRoute(){
????rabbitTemplate.convertAndSend("directs","error","發(fā)送info的key的路由信息");
}
開(kāi)發(fā)消費(fèi)者
@Component
public?class?RouteCustomer?{
????@RabbitListener(bindings?=?{
????????????@QueueBinding(
????????????????????value?=?@Queue,//創(chuàng)建臨時(shí)隊(duì)列
????????????????????exchange?=?@Exchange(value?=?"directs",type?=?"direct"),//自定交換機(jī)名稱和類型
????????????????????key?=?{"info","error","warn"}
????????????)
????})
????public?void?receive1(String?message){
????????System.out.println("message1?=?"?+?message);
????}
????@RabbitListener(bindings?=?{
????????????@QueueBinding(
????????????????????value?=?@Queue,//創(chuàng)建臨時(shí)隊(duì)列
????????????????????exchange?=?@Exchange(value?=?"directs",type?=?"direct"),//自定交換機(jī)名稱和類型
????????????????????key?=?{"error",}
????????????)
????})
????public?void?receive2(String?message){
????????System.out.println("message2?=?"?+?message);
????}
}

8.6 ?第五種Topic 訂閱模型(動(dòng)態(tài)路由模型)
開(kāi)發(fā)生產(chǎn)者
//注入rabbitTemplate
@Autowired
private?RabbitTemplate?rabbitTemplate;
//topic?動(dòng)態(tài)路由??訂閱模式
@Test
public?void?testTopic(){
????rabbitTemplate.convertAndSend("topics","product.save.add","produce.save.add?路由消息");
}
開(kāi)發(fā)消費(fèi)者
@Component
public?class?TopicCustomer?{
????@RabbitListener(bindings?=?{
????????????@QueueBinding(
????????????????????value?=?@Queue,
????????????????????exchange?=?@Exchange(type?=?"topic",name="topics"),
????????????????????key={"user.save","user.*"}
????????????)
????})
????public?void?receive1(String?message){
????????System.out.println("message1?=?"?+?message);
????}
????@RabbitListener(bindings?=?{
????????????@QueueBinding(
????????????????????value?=?@Queue,
????????????????????exchange?=?@Exchange(type?=?"topic",name="topics"),
????????????????????key={"order.#","product.#","user.*"}
????????????)
????})
????public?void?receive2(String?message){
????????System.out.println("message2?=?"?+?message);
????}
}

9.RabbitMQ的應(yīng)用場(chǎng)景
9.1 異步處理
場(chǎng)景說(shuō)明:用戶注冊(cè)后,需要發(fā)注冊(cè)郵件和注冊(cè)短信,傳統(tǒng)的做法有兩種 1.串行的方式 2.并行的方式
- 串行方式: 將注冊(cè)信息寫(xiě)入數(shù)據(jù)庫(kù)后,發(fā)送注冊(cè)郵件,再發(fā)送注冊(cè)短信,以上三個(gè)任務(wù)全部完成后才返回給客戶端。這有一個(gè)問(wèn)題是,郵件,短信并不是必須的,它只是一個(gè)通知,而這種做法讓客戶端等待沒(méi)有必要等待的東西.

- 并行方式: 將注冊(cè)信息寫(xiě)入數(shù)據(jù)庫(kù)后,發(fā)送郵件的同時(shí),發(fā)送短信,以上三個(gè)任務(wù)完成后,返回給客戶端,并行的方式能提高處理的時(shí)間。

- 消息隊(duì)列:假設(shè)三個(gè)業(yè)務(wù)節(jié)點(diǎn)分別使用50ms,串行方式使用時(shí)間150ms,并行使用時(shí)間100ms。雖然并行已經(jīng)提高的處理時(shí)間,但是,前面說(shuō)過(guò),郵件和短信對(duì)我正常的使用網(wǎng)站沒(méi)有任何影響,客戶端沒(méi)有必要等著其發(fā)送完成才顯示注冊(cè)成功,應(yīng)該是寫(xiě)入數(shù)據(jù)庫(kù)后就返回. ?消息隊(duì)列: 引入消息隊(duì)列后,把發(fā)送郵件,短信不是必須的業(yè)務(wù)邏輯異步處理

由此可以看出,引入消息隊(duì)列后,用戶的響應(yīng)時(shí)間就等于寫(xiě)入數(shù)據(jù)庫(kù)的時(shí)間+寫(xiě)入消息隊(duì)列的時(shí)間(可以忽略不計(jì)),引入消息隊(duì)列后處理后,響應(yīng)時(shí)間是串行的3倍,是并行的2倍。
9.2 應(yīng)用解耦
場(chǎng)景:雙11是購(gòu)物狂節(jié),用戶下單后,訂單系統(tǒng)需要通知庫(kù)存系統(tǒng),傳統(tǒng)的做法就是訂單系統(tǒng)調(diào)用庫(kù)存系統(tǒng)的接口.

這種做法有一個(gè)缺點(diǎn):
當(dāng)庫(kù)存系統(tǒng)出現(xiàn)故障時(shí),訂單就會(huì)失敗。訂單系統(tǒng)和庫(kù)存系統(tǒng)高耦合. ?引入消息隊(duì)列

訂單系統(tǒng) :用戶下單后,訂單系統(tǒng)完成持久化處理,將消息寫(xiě)入消息隊(duì)列,返回用戶訂單下單成功。
庫(kù)存系統(tǒng) :訂閱下單的消息,獲取下單消息,進(jìn)行庫(kù)操作。?就算庫(kù)存系統(tǒng)出現(xiàn)故障,消息隊(duì)列也能保證消息的可靠投遞,不會(huì)導(dǎo)致消息丟失.
9.3 流量削峰
場(chǎng)景: 秒殺活動(dòng),一般會(huì)因?yàn)榱髁窟^(guò)大,導(dǎo)致應(yīng)用掛掉,為了解決這個(gè)問(wèn)題,一般在應(yīng)用前端加入消息隊(duì)列。
作用:
可以控制活動(dòng)人數(shù),超過(guò)此一定閥值的訂單直接丟棄(我為什么秒殺一次都沒(méi)有成功過(guò)呢^^)
可以緩解短時(shí)間的高流量壓垮應(yīng)用(應(yīng)用程序按自己的最大處理能力獲取訂單)

用戶的請(qǐng)求,服務(wù)器收到之后,首先寫(xiě)入消息隊(duì)列,加入消息隊(duì)列長(zhǎng)度超過(guò)最大值,則直接拋棄用戶請(qǐng)求或跳轉(zhuǎn)到錯(cuò)誤頁(yè)面.
秒殺業(yè)務(wù)根據(jù)消息隊(duì)列中的請(qǐng)求信息,再做后續(xù)處理.
10. RabbitMQ的集群
10.1 普通集群(副本集群)
All data/state required for the operation of a RabbitMQ broker is replicated across all nodes. An exception to this are message queues, which by default reside on one node, though they are visible and reachable from all nodes. To replicate queues across nodes in a cluster(RabbitMQ broker操作所需的所有數(shù)據(jù)/狀態(tài)都被復(fù)制到所有節(jié)點(diǎn)。一個(gè)例外是消息隊(duì)列,它在默認(rèn)情況下駐留在一個(gè)節(jié)點(diǎn)上,盡管它們?cè)谒泄?jié)點(diǎn)上都是可見(jiàn)和可達(dá)的。在集群中的節(jié)點(diǎn)間復(fù)制隊(duì)列) ? --摘自官網(wǎng)
默認(rèn)情況下:RabbitMQ代理操作所需的所有數(shù)據(jù)/狀態(tài)都將跨所有節(jié)點(diǎn)復(fù)制。這方面的一個(gè)例外是消息隊(duì)列,默認(rèn)情況下,消息隊(duì)列位于一個(gè)節(jié)點(diǎn)上,盡管它們可以從所有節(jié)點(diǎn)看到和訪問(wèn)
master節(jié)點(diǎn)來(lái)發(fā)送消息,slave節(jié)點(diǎn)只是備份。
核心解決問(wèn)題:當(dāng)集群中某一時(shí)刻master節(jié)點(diǎn)宕機(jī),可以對(duì)Quene(隊(duì)列)中信息進(jìn)行備份,防止丟失
架構(gòu)圖:

集群搭建
#?0.集群規(guī)劃
?node1:?10.15.0.3??mq1??master?主節(jié)點(diǎn)
?node2:?10.15.0.4??mq2??repl1??副本節(jié)點(diǎn)
?node3:?10.15.0.5??mq3??repl2??副本節(jié)點(diǎn)
#?1.克隆三臺(tái)機(jī)器主機(jī)名和ip映射
?vim?/etc/hosts加入:
??10.15.0.3?mq1
?????10.15.0.4?mq2
?????10.15.0.5?mq3
?node1:?vim?/etc/hostname?加入:??mq1
?node2:?vim?/etc/hostname?加入:??mq2
?node3:?vim?/etc/hostname?加入:??mq3
#?2.三個(gè)機(jī)器安裝rabbitmq,并同步cookie文件,在node1上執(zhí)行:
?scp?/var/lib/rabbitmq/.erlang.cookie?root@mq2:/var/lib/rabbitmq/
?scp?/var/lib/rabbitmq/.erlang.cookie?root@mq3:/var/lib/rabbitmq/
#?3.查看cookie是否一致:
?node1:?cat?/var/lib/rabbitmq/.erlang.cookie?
?node2:?cat?/var/lib/rabbitmq/.erlang.cookie?
?node3:?cat?/var/lib/rabbitmq/.erlang.cookie?
#?4.后臺(tái)啟動(dòng)rabbitmq所有節(jié)點(diǎn)執(zhí)行如下命令,啟動(dòng)成功訪問(wèn)管理界面:
?rabbitmq-server?-detached?
#?5.在node2和node3執(zhí)行加入集群命令:
?1.關(guān)閉???????rabbitmqctl?stop_app
?2.加入集群????rabbitmqctl?join_cluster?rabbit@mq1
?3.啟動(dòng)服務(wù)????rabbitmqctl?start_app
#?6.查看集群狀態(tài),任意節(jié)點(diǎn)執(zhí)行:
?rabbitmqctl?cluster_status
#?7.如果出現(xiàn)如下顯示,集群搭建成功:
?Cluster?status?of?node?rabbit@mq3?...
?[{nodes,[{disc,[rabbit@mq1,rabbit@mq2,rabbit@mq3]}]},
?{running_nodes,[rabbit@mq1,rabbit@mq2,rabbit@mq3]},
?{cluster_name,<<"rabbit@mq1">>},
?{partitions,[]},
?{alarms,[{rabbit@mq1,[]},{rabbit@mq2,[]},{rabbit@mq3,[]}]}]
#?8.登錄管理界面,展示如下?tīng)顟B(tài):

#?9.測(cè)試集群在node1上,創(chuàng)建隊(duì)列

#?10.查看node2和node3節(jié)點(diǎn):


#?11.關(guān)閉node1節(jié)點(diǎn),執(zhí)行如下命令,查看node2和node3:
?rabbitmqctl?stop_app


10.2 鏡像集群
This guide covers mirroring (queue contents replication) of classic queues ?--摘自官網(wǎng)
By default, contents of a queue within a RabbitMQ cluster are located on a single node (the node on which the queue was declared). This is in contrast to exchanges and bindings, which can always be considered to be on all nodes. Queues can optionally be made mirrored across multiple nodes. --摘自官網(wǎng)
鏡像隊(duì)列機(jī)制就是將隊(duì)列在三個(gè)節(jié)點(diǎn)之間設(shè)置主從關(guān)系,消息會(huì)在三個(gè)節(jié)點(diǎn)之間進(jìn)行自動(dòng)同步,且如果其中一個(gè)節(jié)點(diǎn)不可用,并不會(huì)導(dǎo)致消息丟失或服務(wù)不可用的情況,提升MQ集群的整體高可用性。
集群架構(gòu)圖

配置集群架構(gòu)
#?0.策略說(shuō)明
?rabbitmqctl?set_policy?[-p?<vhost>]?[--priority?<priority>]?[--apply-to?<apply-to>]?<name>?<pattern>??<definition>
?-p Vhost:?可選參數(shù),針對(duì)指定vhost下的queue進(jìn)行設(shè)置
?Name:?????policy的名稱
?Pattern:?queue的匹配模式(正則表達(dá)式)
?Definition:鏡像定義,包括三個(gè)部分ha-mode, ha-params, ha-sync-mode
?????????????ha-mode:指明鏡像隊(duì)列的模式,有效值為?all/exactly/nodes
??????????????????? all:表示在集群中所有的節(jié)點(diǎn)上進(jìn)行鏡像
??????????????????? exactly:表示在指定個(gè)數(shù)的節(jié)點(diǎn)上進(jìn)行鏡像,節(jié)點(diǎn)的個(gè)數(shù)由ha-params指定
??????????????????? nodes:表示在指定的節(jié)點(diǎn)上進(jìn)行鏡像,節(jié)點(diǎn)名稱通過(guò)ha-params指定
???????????? ha-params:ha-mode模式需要用到的參數(shù)
??????????????? ha-sync-mode:進(jìn)行隊(duì)列中消息的同步方式,有效值為automatic和manual
??????????????? priority:可選參數(shù),policy的優(yōu)先級(jí)???????????????????????????????
#?1.查看當(dāng)前策略
?rabbitmqctl?list_policies
#?2.添加策略
?rabbitmqctl?set_policy?ha-all?'^hello'?'{"ha-mode":"all","ha-sync-mode":"automatic"}'?
?說(shuō)明:策略正則表達(dá)式為?“^”?表示所有匹配所有隊(duì)列名稱??^hello:匹配hello開(kāi)頭隊(duì)列
#?3.刪除策略
?rabbitmqctl?clear_policy?ha-all
#?4.測(cè)試集群
