手把手教你學會RabbitMQ
每天早上七點三十,準時推送干貨

前幾天阿粉在看關于如何處理分布式事務的解決方案,于是就看到了關于使用最大努力通知來處理分布式事務的問題,而這其中最不可或缺的就是消息中間件了,那么什么是消息中間件呢?
為什么有消息中間件
前幾天阿粉在看關于如何處理分布式事務的解決方案,于是就看到了關于使用最大努力通知來處理分布式事務的問題,而這其中最不可或缺的就是消息中間件了,那么什么是消息中間件呢?
1. 什么是消息中間件
在百度百科給出的解釋是:消息中間件是基于隊列與消息傳遞技術,在網絡環(huán)境中為應用系統(tǒng)提供同步或異步、可靠的消息傳輸?shù)闹涡攒浖到y(tǒng)。

大家看上圖,實際上就是生產者發(fā)給消息中間件一點東西,然后提供給消費者去消費,這樣理解是不是就比百度百科要簡單了很多了。
2. 消息中間件的種類
我們在這里先不討論消息中間件的組成,下面會繼續(xù)講解,我們先看看都有哪些消息中間件,以及他們之間都有什么特點
1.ActiveMQ
ActiveMQ是Apache軟件基金會所研發(fā)的開放源代碼消息中間件;由于ActiveMQ是一個純Java程序,因此只需要操作系統(tǒng)支持Java虛擬機,ActiveMQ便可執(zhí)行。畢竟是Apache來維護的,功能還是非常強大的,
支持Java消息服務(JMS) 集群 (Clustering) 協(xié)議支持包括:OpenWire、REST、STOMP、WS-Notification、MQTT、XMPP以及AMQP。
但是ActiveMQ的缺點一樣也是很明顯的,版本更新很緩慢。集群模式需要依賴Zookeeper實現(xiàn)。雖然現(xiàn)在有了Apollo,號稱下一代ActiveMQ,目前案例那是少的可憐。
2.RabbitMQ
RabbitMQ是實現(xiàn)了高級消息隊列協(xié)議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)
生態(tài)豐富,使用者眾,有很多人在前面踩坑。AMQP協(xié)議的領導實現(xiàn),支持多種場景
3.RocketMQ
RocketMQ是阿里開源的消息中間件,目前在Apache孵化,使用純Java開發(fā),具有高吞吐量、高可用性、適合大規(guī)模分布式系統(tǒng)應用的特點,如果大家使用過Kafka的話,那么你就會發(fā)現(xiàn)RocketMQ其實和Kafka很相似,但是絕對不是單純的摘出來了一塊內容那么簡單,目前在阿里集團被廣泛應用于交易、充值、流計算、消息推送、日志流式處理、binglog分發(fā)等場景,支撐了阿里多次雙十一活動。
4.Kafka
Kafka是LinkedIn于2010年12月開發(fā)并開源的一個分布式流平臺,現(xiàn)在是Apache的頂級項目,是一個高性能跨語言分布式Publish/Subscribe消息隊列系統(tǒng),以Pull的形式消費消息。Kafka自身服務與消息的生產和消費都依賴于Zookeeper,使用Scala語言開發(fā)。學習成本有時候會非常的大,不過阿粉也是相信大家對這個東西肯定很好奇,因為畢竟他是大數(shù)據(jù)生態(tài)系統(tǒng)中不可缺少的一環(huán),以后阿粉會陸陸續(xù)續(xù)的去帶著大家學習這塊的內容。
說完了區(qū)分,那么我們就得開始正兒八經的學習RabbitMQ了,安裝,使用,整合到項目中,一氣呵成。
3. RabbitMQ的安裝
關于安裝的教程,阿粉就不再給大家一一的去說了,畢竟網上有的是教程,官網也有指定的教程,【https://www.rabbitmq.com/】 官網在這里,不過大家需要注意一件事情,RabbitMQ如果你想要安裝Windows版本的話,那么你一定得先裝一個環(huán)境,那就是erlang語言的環(huán)境,否則,你是裝不到Windows上的。
安裝完成之后,登錄上他的后臺,IP/port,

默認登錄進去就是這個樣子滴。

4.RabbitMQ的使用
上一階段,我們已經完全的把RabbitMQ進行了安裝,接下來我們就要看他的使用了,來,繼續(xù)找官網的文檔。

大家看到了這個Java案例了么?來打開了瞅瞅,先把生產者和消費者確定一下,生產者Producer,消費者Consumer,生產者提供消息,然后把消息發(fā)送到消息隊列,消費者監(jiān)聽到消息之后,對消息進行消費。
我們也手把手寫一個,
Producer
private final static String QUEUE_NAME = "Test_queue";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" 我生產了一條消息 " + message );
}catch (Exception e){
}
}
我生產了一條消息Hello World
這個時候,我們就能在后端的控制臺上看到內容了,

看到了我們的total,還有Ready,證明我們發(fā)送消息已經成功了,現(xiàn)在已經有一條叫做“Hello World”的消息在我們的消息隊列里面了。

既然作為生產者的我們,消息已經發(fā)送了,這時候是不是就得開始寫個消費者了?來,我們再寫一個最最基礎的消費者
Consumer
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DeliverCallback deliverCallback = (consumerTag, deliver) -> {
String message = new String(deliver.getBody(), "UTF-8");
System.out.println(" 我是來消費消息的" + message );
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
我是來消費消息的'Hello World'

大家看,這是不是就把你剛才生產的消息給消費掉了呢?
實際上Demo就是這么簡單,但是不能這么簡單的去寫,今天阿粉先來講講這個RabbitMQ里面的一些內容,之后再繼續(xù)給大家整合一下SpringBoot,SpringCloud這些內容。
5.RabbitMQ的一些重點基礎
5.1 Channel(通道)
多路復用連接中的一條獨立的雙向數(shù)據(jù)流通道。信道是建立在真實的TCP連接內的虛擬連接,復用TCP連接的通道。
5.2 Queue(消息隊列)
在這里阿粉就要給大家開始劃重點了,Queue是RabbitMQ 的內部對象,用于存儲消息。RabbitMQ中消息都只能存儲在隊列中,隊列的特性是先進先出。
我們剛才也看到了圖片,發(fā)送的消息都是在Queue中存儲的,可以理解成一個存儲數(shù)據(jù)的結構,我們把“Hello World”發(fā)送到Queue中,然后提供給消費者消費。
至于生產者Producer和消費者Consumer,阿粉肯定也不用說了,一個是生產消息,一個消費消息。
5.3 Exchange: 交換器
Exchange交換機扮演著接收生產者生產的消息的角色。同時Exchange交換機還需要將接收到的消息傳遞給queue隊列進行存儲或消費,不同類型的exchange配合著routing_key就能按照exchange類型對應的路有規(guī)則將消息傳遞到指定的某個或者某些queue隊列。
說到交換器,那肯定就想到了路由,而在這里也是有這個名字的,不過不是單純的路由,而是RoutingKey。
RoutingKey: 相當于一個路由鍵,一般是用來指定路由規(guī)則的。而他經常搭配和Binding(綁定鍵)一起使用。
生產者將消息發(fā)送給交換器時,需要一個RoutingKey,當BindingKey和 RoutingKey相匹配時,消息會被路由到對應的隊列中。
Exchange分類:
Direct
Fanout
Topic
Headers
對于Direct類型的交換器,在接收到生產者發(fā)送的消息時會將消息路由給與該exchange綁定的且與該消息的routing_key同名的queue隊列。如果exchange上未綁定與routing_key同名的queue,消息將會被拋棄。
對于Fanout類型的交換器,將接收到的消息路由投遞到所有與其綁定的queue隊列上,此種類型exchange消息路由與routing_key無關。
對于Topic類型的交換器,采用模糊匹配的方式,可以通過通配符滿足一部分規(guī)則,這就和Direct不一樣了,Direct是完全匹配BindingKey和RoutingKey。
對于Headers類型的交換器,不依賴于路由鍵的匹配規(guī)則來路由消息,而是根據(jù)發(fā)送的消息內容中的 headers 屬性進行匹配.這時候我們需要在代碼里面去設置Headers中的信息(鍵值對),對比其中的鍵值對是否完全匹配隊列和交換器綁定時指定的鍵值對,如果完全匹配則消息會路由到該隊列,否則不會路由到該隊列 。
他們這些也是各有特點,大家記住了么?到這里文章就告一段落了,接下來的文章會深入講解一下怎么去配置這些交換器類型,還有整合SpringBoot,大家快去趕緊動手安裝并且寫個測試吧。
PS:公號內回復「Python」即可進入Python 新手學習交流群,一起 100 天計劃!
老規(guī)矩,兄弟們還記得么,右下角的 “在看” 點一下,如果感覺文章內容不錯的話,記得分享朋友圈讓更多的人知道!


【神秘禮包獲取方式】
