消息隊列(MQ)之生產(chǎn)者-消費(fèi)者 | 一文搞定
大家好,我是狼王,一個愛打球的程序員
隨著互聯(lián)網(wǎng)的發(fā)展,技術(shù)也在快速的迭代中,由于大流量,高并發(fā)的出現(xiàn),很多問題也隨之而來了,為了解決這些問題,一些高端的人才研究出了各種解決這些問題的東西,
消息隊列就是其中一種。那么今天,我們就來聊聊消息隊列吧!
什么是消息隊列?
消息隊列不知道大家看到這個詞的時候,會不會覺得它是一個比較高端的技術(shù),反正我是覺得它好像是挺牛逼的。
消息隊列,一般我們會簡稱它為MQ(Message Queue),嗯,就是很直白的簡寫。
我們先不管消息(Message)這個詞,來看看隊列(Queue)。這一看,隊列大家應(yīng)該都熟悉吧。
隊列是一種先進(jìn)先出的數(shù)據(jù)結(jié)構(gòu)。

在Java里邊,已經(jīng)實現(xiàn)了不少的隊列了。
那為什么還需要消息隊列(MQ)這種中間件呢???
其實這個問題,跟之前我學(xué)Redis的時候很像。Redis是一個以key-value形式存儲的內(nèi)存數(shù)據(jù)庫,明明我們可以使用類似HashMap這種實現(xiàn)類就可以達(dá)到類似的效果了,那還為什么要Redis?
到這里,大家可以先猜猜為什么要用消息隊列(MQ)這種中間件
消息隊列可以簡單理解為:把要傳輸?shù)臄?shù)據(jù)放在隊列中。
科普:
- 把數(shù)據(jù)放到消息隊列叫做生產(chǎn)者
- 從消息隊列里邊取數(shù)據(jù)叫做消費(fèi)者
市面上的消息隊列產(chǎn)品有很多,比如老牌的
ActiveMQ、RabbitMQ,目前比較火的有Kafka,和阿里巴巴捐贈給Apache的RocketMQ,連redis 這樣的NoSQL 數(shù)據(jù)庫也支持 MQ 功能。總之這塊知名的產(chǎn)品就有十幾種。
為什么要用消息隊列,也就是在問:用了消息隊列有什么好處。
解耦
以常見的訂單系統(tǒng)為例
用戶點擊【下單】按鈕之后的業(yè)務(wù)邏輯可能包括:扣減庫存、生成相應(yīng)單據(jù)、發(fā)貨、發(fā)短信通知等。
在業(yè)務(wù)發(fā)展初期這些邏輯可能放在一起同步執(zhí)行,隨著業(yè)務(wù)的發(fā)展訂單量增長,需要提升系統(tǒng)服務(wù)的性能。
這時可以將一些不需要立即生效的操作拆分出來異步執(zhí)行,比如發(fā)貨、發(fā)短信通知等。
這種場景下就可以用 MQ ,在下單的主流程(比如扣減庫存、生成相應(yīng)單據(jù))完成之后發(fā)送一條消息到 MQ 讓主流程快速完結(jié),而由另外的單獨線程拉取MQ的消息(或者由 MQ 推送消息),當(dāng)發(fā)現(xiàn) MQ 中有發(fā)貨或發(fā)短信之類的消息時,執(zhí)行相應(yīng)的業(yè)務(wù)邏輯。
簡單的說就是原來a服務(wù)需要調(diào)用b服務(wù)的接口或者方法來進(jìn)行數(shù)據(jù)的傳遞,這個時候使用消息隊列的話,a服務(wù)只需將數(shù)據(jù)發(fā)送到消息隊列中,b服務(wù)從消息隊列中取出相應(yīng)的數(shù)據(jù)即可,就實現(xiàn)了解耦
異步
異步其實就是a服務(wù)將數(shù)據(jù)發(fā)送到消息隊列之后就可以進(jìn)行返回或者執(zhí)行其他過程,不需要等待b服務(wù)處理數(shù)據(jù),從而來提高一些使用異步的業(yè)務(wù)場景的效率問題
削峰/限流
我們再來一個場景,比如現(xiàn)在我們每個月要搞一次大促,大促期間的并發(fā)可能會很高的,比如每秒5000個請求。假設(shè)我們現(xiàn)在有兩臺機(jī)器處理請求,并且每臺機(jī)器只能每次處理2000個請求。

那多出來的1000個請求,可能就把我們整個系統(tǒng)給搞崩了,所以,有一種辦法,我們可以寫到消息隊列中:

服務(wù)器A和服務(wù)器B根據(jù)自己的能夠處理的請求數(shù)去消息隊列中拿數(shù)據(jù),這樣即便有每秒有1w個請求,那只是把請求放在消息隊列中,去拿消息隊列的消息由系統(tǒng)自己去控制,這樣就不會把整個系統(tǒng)給搞崩。
使用消息隊列有什么問題?
經(jīng)過我們上面的場景,我們已經(jīng)可以發(fā)現(xiàn),消息隊列能做的事其實還是蠻多的。
說到這里,我們先回到文章的開頭,"明明JDK已經(jīng)有不少的隊列實現(xiàn)了,我們還需要消息隊列中間件呢?"
其實很簡單,JDK實現(xiàn)的隊列種類雖然有很多種,但是都是簡單的內(nèi)存隊列。為什么我說JDK是簡單的內(nèi)存隊列呢?
下面我們來看看要實現(xiàn)消息隊列(中間件)可能要考慮什么問題。
高可用
無論是我們使用消息隊列來做解耦、異步還是削峰,消息隊列肯定不能是單機(jī)的。試著想一下,如果是單機(jī)的消息隊列,萬一這臺機(jī)器掛了,那我們整個系統(tǒng)幾乎就是不可用了,就出現(xiàn)了單點故障。

所以,當(dāng)我們項目中使用消息隊列,都是得集群/分布式的。要做集群/分布式就必然希望該消息隊列能夠提供現(xiàn)成的支持,而不是自己寫代碼手動去實現(xiàn)。

數(shù)據(jù)丟失問題
我們將數(shù)據(jù)寫到消息隊列上,服務(wù)器A和服務(wù)器B還沒來得及消費(fèi)消息隊列的數(shù)據(jù),就掛掉了。如果沒有做任何的措施,我們的數(shù)據(jù)就丟了。

學(xué)過Redis的都知道,Redis可以將數(shù)據(jù)持久化磁盤上,萬一Redis掛了,還能從磁盤將數(shù)據(jù)恢復(fù)過來。同樣地,消息隊列中的數(shù)據(jù)也需要存在別的地方,這樣才盡可能減少數(shù)據(jù)的丟失。
- 那存在哪呢?
- 磁盤?
- 數(shù)據(jù)庫?
- 同步存儲還是異步存儲?
不同的MQ針對消息丟失的處理和解決方案都有所不同,但是肯定都是從生產(chǎn)者和消費(fèi)者兩端進(jìn)行分析的。
生產(chǎn)者端丟失消息
生產(chǎn)者要確保消息發(fā)送到了MQ,就會有回調(diào)確認(rèn)機(jī)制的處理和事務(wù)的方式
消息隊列丟失消息
在消息隊列中假如因為MQ掛了導(dǎo)致消息丟了,那么就可以將消息持久化,或者使用生產(chǎn)者端重發(fā)消息的方式
消費(fèi)者端丟消息
一般消費(fèi)者丟了消息的原因就是從MQ中取到了消息,但是可能消費(fèi)失敗了需要重新消費(fèi),但是MQ中已經(jīng)沒有該條消息了,這樣的話可以通過消費(fèi)者端手動確認(rèn)的機(jī)制,或者讓生產(chǎn)者端重發(fā)消息的方式
消費(fèi)者怎么得到消息隊列的數(shù)據(jù)?
消費(fèi)者怎么從消息隊列里邊得到數(shù)據(jù)?一般有兩種辦法:
- 生產(chǎn)者將數(shù)據(jù)放到消息隊列中,消息隊列有數(shù)據(jù)了,主動叫消費(fèi)者去拿(俗稱push)
- 消費(fèi)者不斷去輪訓(xùn)消息隊列,看看有沒有新的數(shù)據(jù),如果有就消費(fèi)(俗稱pull)
其他問題
除了這些,我們在使用的時候還得考慮各種的問題:
消息重復(fù)消費(fèi)了怎么辦啊?我想保證消息是絕對有順序的怎么做?……..
雖然消息隊列給我們帶來了那么多的好處,但同時我們發(fā)現(xiàn)引入消息隊列也會提高系統(tǒng)的復(fù)雜性。市面上現(xiàn)在已經(jīng)有不少消息隊列輪子了,每種消息隊列都有自己的特點,選取哪種MQ還得好好斟酌。
這次我們先來講講RabbitMQ
RabbitMQ
RabbitMQ 是一個由 Erlang 語言開發(fā)的 AMQP 的開源實現(xiàn)。
AMQP : Advanced Message Queue,高級消息隊列協(xié)議。它是應(yīng)用層協(xié)議的一個開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計,基于此協(xié)議的客戶端與消息中間件可傳遞消息,并不受產(chǎn)品、開發(fā)語言等條件的限制。
RabbitMQ 最初起源于金融系統(tǒng),用于在分布式系統(tǒng)中存儲轉(zhuǎn)發(fā)消息,在易用性、擴(kuò)展性、高可用性等方面表現(xiàn)不俗。具體特點包括:
可靠性(Reliability)RabbitMQ 使用一些機(jī)制來保證可靠性,如持久化、傳輸確認(rèn)、發(fā)布確認(rèn)。靈活的路由(Flexible Routing)在消息進(jìn)入隊列之前,通過 Exchange 來路由消息的。對于典型的路由功能,RabbitMQ 已經(jīng)提供了一些內(nèi)置的 Exchange 來實現(xiàn)。針對更復(fù)雜的路由功能,可以將多個 Exchange 綁定在一起,也通過插件機(jī)制實現(xiàn)自己的 Exchange 。消息集群(Clustering)多個 RabbitMQ 服務(wù)器可以組成一個集群,形成一個邏輯 Broker 。高可用(Highly Available Queues)隊列可以在集群中的機(jī)器上進(jìn)行鏡像,使得在部分節(jié)點出問題的情況下隊列仍然可用。多種協(xié)議(Multi-protocol)RabbitMQ 支持多種消息隊列協(xié)議,比如 STOMP、MQTT 等等。多語言客戶端(Many Clients)RabbitMQ 幾乎支持所有常用語言,比如 Java、.NET、Ruby 等等。管理界面(Management UI)RabbitMQ 提供了一個易用的用戶界面,使得用戶可以監(jiān)控和管理消息 Broker 的許多方面。跟蹤機(jī)制(Tracing)如果消息異常,RabbitMQ 提供了消息跟蹤機(jī)制,使用者可以找出發(fā)生了什么。插件機(jī)制(Plugin System)RabbitMQ 提供了許多插件,來從多方面進(jìn)行擴(kuò)展,也可以編寫自己的插件。
RabbitMQ 中的概念模型
消息模型
所有 MQ 產(chǎn)品從模型抽象上來說都是一樣的過程:消費(fèi)者(consumer)訂閱某個隊列。生產(chǎn)者(producer)創(chuàng)建消息,然后發(fā)布到隊列(queue)中,最后將消息發(fā)送到監(jiān)聽的消費(fèi)者。
RabbitMQ 基本概念
上面只是最簡單抽象的描述,具體到 RabbitMQ 則有更詳細(xì)的概念需要解釋。上面介紹過 RabbitMQ 是 AMQP 協(xié)議的一個開源實現(xiàn),所以其內(nèi)部實際上也是 AMQP 中的基本概念:

Message 消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對于其他消息的優(yōu)先權(quán))、delivery-mode(指出該消息可能需要持久性存儲)等。Publisher 消息的生產(chǎn)者,也是一個向交換器發(fā)布消息的客戶端應(yīng)用程序。Exchange 交換器,用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊列。Binding 綁定,用于消息隊列和交換器之間的關(guān)聯(lián)。一個綁定就是基于路由鍵將交換器和消息隊列連接起來的路由規(guī)則,所以可以將交換器理解成一個由綁定構(gòu)成的路由表。Queue 消息隊列,用來保存消息直到發(fā)送給消費(fèi)者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列里面,等待消費(fèi)者連接到這個隊列將其取走。Connection 網(wǎng)絡(luò)連接,比如一個TCP連接。Channel 信道,多路復(fù)用連接中的一條獨立的雙向數(shù)據(jù)流通道。信道是建立在真實的TCP連接內(nèi)的虛擬連接,AMQP 命令都是通過信道發(fā)出去的,不管是發(fā)布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對于操作系統(tǒng)來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復(fù)用一條 TCP 連接。Consumer 消息的消費(fèi)者,表示一個從消息隊列中取得消息的客戶端應(yīng)用程序。Virtual Host 虛擬主機(jī),表示一批交換器、消息隊列和相關(guān)對象。虛擬主機(jī)是共享相同的身份認(rèn)證和加密環(huán)境的獨立服務(wù)器域。每個vhost 本質(zhì)上就是一個 mini 版的 RabbitMQ 服務(wù)器,擁有自己的隊列、交換器、綁定和權(quán)限機(jī)制。vhost 是 AMQP 概念的基礎(chǔ),必須在連接時指定,RabbitMQ 默認(rèn)的 vhost 是 / 。Broker表示消息隊列服務(wù)器實體。
本文主要講解了什么是消息隊列,消息隊列可以為我們帶來什么好處,以及一個消息隊列可能會涉及到哪些問題,后來會更加深入的去探討哦!希望給大家?guī)硪欢ǖ膸椭?/p>
好了。今天就說到這了,我還會不斷分享自己的所學(xué)所想,希望我們一起走在成功的道路上!
轉(zhuǎn)發(fā)朋友圈是對我最大的支持!樂于輸出干貨的Java技術(shù)公眾號:狼王編程。公眾號內(nèi)有大量的技術(shù)文章、海量視頻資源、精美腦圖,不妨來關(guān)注一下!回復(fù)資料領(lǐng)取大量學(xué)習(xí)資源和免費(fèi)書籍!
?覺得有點東西就點一下“贊和在看”吧!感謝大家的支持了!