淺談RabbitMQ的基石—高級(jí)消息隊(duì)列協(xié)議(AMQP)
點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)”

?
? 前言
自從去年做了不少流式系統(tǒng)(Flink也好,Spark Streaming也好)對(duì)接RabbitMQ的實(shí)時(shí)作業(yè)。之前一直都在Kafka的領(lǐng)域里摸爬滾打,對(duì)RabbitMQ只是有淺薄的了解而已。隨著自己逐漸把RabbitMQ的官方文檔大致翻完,了解到它是高級(jí)消息隊(duì)列協(xié)議(Advanced Message Queuing Protocol, AMQP)的一種標(biāo)準(zhǔn)實(shí)現(xiàn)。也就是說(shuō),搞清楚AMQP是掌握好RabbitMQ哲學(xué)的基礎(chǔ)。
當(dāng)前AMQP的最新版本為1.0,而主要使用的(也是RabbitMQ實(shí)現(xiàn)的)版本為0-9-1。這兩個(gè)版本之間的差別非常大,本文抄錄的是AMQP 0-9-1的部分細(xì)節(jié)。
AMQP及其模型
通俗地講,AMQP是一個(gè)專(zhuān)門(mén)為消息中間件設(shè)計(jì)的、開(kāi)放標(biāo)準(zhǔn)的應(yīng)用層協(xié)議,它規(guī)定了消息系統(tǒng)中三大組件——消息服務(wù)器/代理節(jié)點(diǎn)(server/broker)、生產(chǎn)者/發(fā)布者(producer/publisher)、消費(fèi)者/訂閱者(consumer/subscriber)之間的通信規(guī)范,以及代理節(jié)點(diǎn)的設(shè)計(jì)規(guī)范等。
AMQP采用的模型就叫做高級(jí)消息隊(duì)列模型,即AMQ模型,它的組成可以用下面的簡(jiǎn)圖來(lái)表示。
下面就圖中出現(xiàn)的一些名詞進(jìn)行解釋。
交換器(exchange):負(fù)責(zé)將生產(chǎn)者發(fā)來(lái)的消息按照特定的路由關(guān)鍵字(routing key)投遞到相應(yīng)的隊(duì)列。
隊(duì)列(queue):代理節(jié)點(diǎn)中存儲(chǔ)將要被消費(fèi)的消息的載體。
綁定(binding):交換器與隊(duì)列之間的映射關(guān)系,可以理解為消息的路由規(guī)則。
AMQP實(shí)體(AMQP entity):交換器、隊(duì)列和綁定三者合起來(lái)就稱(chēng)為一個(gè)AMQP實(shí)體,圖中未示出。交換器、隊(duì)列和綁定都可以有一個(gè)或多個(gè)。
虛擬主機(jī)(virtual host):在代理節(jié)點(diǎn)上邏輯劃分的隔離的環(huán)境,其內(nèi)部包含一個(gè)或多個(gè)AMQP實(shí)體,且虛擬主機(jī)之間互不影響。虛擬主機(jī)可以復(fù)用節(jié)點(diǎn),并實(shí)現(xiàn)權(quán)限管理和多租戶(hù)。
連接(connection):發(fā)布者、消費(fèi)者與代理節(jié)點(diǎn)之間建立的連接,為了保證可靠性,一般都是TCP長(zhǎng)連接。
通道(channel):對(duì)連接的輕量級(jí)復(fù)用,主要針對(duì)多線(xiàn)程的發(fā)布者、消費(fèi)者,因?yàn)榻⒍鄠€(gè)TCP連接是很貴的操作,頻繁建立和銷(xiāo)毀連接也是不科學(xué)的。
接下來(lái)對(duì)交換器和隊(duì)列這兩個(gè)比較重要的組件進(jìn)行介紹,順便牽出一些其他的東西。
交換器
交換器在A(yíng)MQP實(shí)體中負(fù)責(zé)消息路由。它的路由目的地除了由用戶(hù)設(shè)置的綁定規(guī)則來(lái)決定之外,還與交換器的類(lèi)型有關(guān)。AMQP定義了幾種默認(rèn)的交換器。
直連交換器(direct exchange)
直連交換器非常簡(jiǎn)單,它檢查綁定關(guān)鍵字(binding key)與路由關(guān)鍵字(routing key),只要兩者相同,即進(jìn)行投遞。

扇出交換器(fanout exchange)
扇出交換器比直連交換器更簡(jiǎn)單,它會(huì)直接將消息路由到所有與它綁定的隊(duì)列中。

主題交換器(topic exchange) 此主題非彼(對(duì)就是Kafka里的)主題,而更類(lèi)似wildcard matching。具體來(lái)講,綁定關(guān)鍵字是由多個(gè)域組成的點(diǎn)號(hào)分隔的字符串,每個(gè)域可以是實(shí)際的單詞,也可以是通配符,如星號(hào) " * " 表示一個(gè)詞,"#" 表示0個(gè)或多個(gè)詞。在實(shí)際路由時(shí),根據(jù)路由關(guān)鍵字與綁定關(guān)鍵字的匹配結(jié)果來(lái)投遞。比如在下圖中,帶有"little.C.magic"關(guān)鍵字的消息會(huì)投遞到隊(duì)列1,而帶有"bla.bla.B"關(guān)鍵字的消息會(huì)投遞到隊(duì)列2。

頭部交換器(header exchange) AMQP消息與HTTP報(bào)文的格式類(lèi)似,都有頭部(header)和消息體(body),其中頭部會(huì)保存與消息相關(guān)的許多元數(shù)據(jù),消息體才是有效的載荷(payload)。頭部交換器就不依賴(lài)綁定關(guān)鍵字和路由關(guān)鍵字的匹配,而是檢查消息頭部中的元數(shù)據(jù)是否匹配,相對(duì)而言更加靈活。
根據(jù)AMQP的規(guī)定,交換器的幾個(gè)重要屬性有:
名稱(chēng)(name);
持久性(durable):當(dāng)代理節(jié)點(diǎn)或虛擬主機(jī)重置后,交換器是被保留還是被刪除;
自動(dòng)刪除(auto-delete):是否在所有隊(duì)列的綁定解除之后被刪除;
擴(kuò)展參數(shù)(arguments)。
如果交換器無(wú)法將消息路由到隊(duì)列該怎么辦呢?AMQP給出了幾種解決方法,一是直接丟棄,二是返還給生產(chǎn)者,三是放入死信隊(duì)列中等待進(jìn)一步處理。這由消息頭部中的屬性來(lái)決定。
隊(duì)列和消息
隊(duì)列相對(duì)而言比較簡(jiǎn)單,它的主要功能就是存儲(chǔ)要被消費(fèi)的消息。隊(duì)列也有一些重要的屬性,如下:
名稱(chēng)(name);
持久性(durable):當(dāng)代理節(jié)點(diǎn)或虛擬主機(jī)重置后,隊(duì)列是被保留還是被刪除;
獨(dú)占性(exclusive):是否只允許被一個(gè)連接使用;
自動(dòng)刪除(auto-delete):是否在所有消費(fèi)者取消訂閱之后被刪除;
擴(kuò)展參數(shù)(arguments):如隊(duì)列緩存長(zhǎng)度、消息TTL等。
需要注意,如果一個(gè)隊(duì)列是持久的,那么只是代表重啟之后這個(gè)隊(duì)列不用重新創(chuàng)建而已,但其中的消息還是有可能被刪除。只有那些被標(biāo)記為persistent的消息才不會(huì)被刪除。
AMQP規(guī)范下的隊(duì)列和消費(fèi)者都同時(shí)支持推模式和拉模式消費(fèi)。前者即AMQP實(shí)體將消息投遞到消費(fèi)者,后者即消費(fèi)者主動(dòng)地從隊(duì)列中獲取消息。無(wú)論推模式還是拉模式,每個(gè)消費(fèi)者也有一個(gè)標(biāo)識(shí),稱(chēng)為tag。
在隊(duì)列中的消息投遞出去之后,消費(fèi)者需要告訴代理節(jié)點(diǎn)自己是否收到了它,因此會(huì)涉及消息確認(rèn)(ack)的問(wèn)題。AMQP默認(rèn)定義了兩種ack機(jī)制:
自動(dòng)ack:當(dāng)消息從隊(duì)列中出去后就刪除它(即at most once);
顯式ack:當(dāng)消費(fèi)者發(fā)送的確認(rèn)回執(zhí)到達(dá)代理節(jié)點(diǎn)后,再?gòu)年?duì)列中刪除它。如果ack超時(shí),則會(huì)再次嘗試投遞(即at least once)。
除了ack之外,消費(fèi)者在處理時(shí)有可能會(huì)出現(xiàn)問(wèn)題,或認(rèn)為此消息非法,因此也會(huì)出現(xiàn)拒絕消息(reject)的情況。此時(shí)代理節(jié)點(diǎn)可以銷(xiāo)毀這條消息,也可以重新將它放入隊(duì)列并投遞給另一個(gè)消費(fèi)者。
vs Kafka?
說(shuō)了這么多,那么Kafka和AMQP有什么關(guān)系呢?答案是沒(méi)關(guān)系。
也就是說(shuō),Kafka不是消息隊(duì)列。按官方說(shuō)法,Kafka是一個(gè)流式處理平臺(tái)(stream processing platform)。Kafka在設(shè)計(jì)之初是為了支持高吞吐量的日志處理的,只不過(guò)它恰好也可以實(shí)現(xiàn)消息隊(duì)列的大部分功能而已。Kafka所用的“黑科技”(如零拷貝/內(nèi)存映射,以及對(duì)page cache的利用)都是脫離標(biāo)準(zhǔn)消息隊(duì)列的設(shè)計(jì)范疇的,所以不能簡(jiǎn)單地認(rèn)為Kafka比RabbitMQ等符合AMQP的消息隊(duì)列更優(yōu)。例如,RabbitMQ支持死信隊(duì)列、延遲隊(duì)列、優(yōu)先隊(duì)列、多租戶(hù)、推模式消費(fèi)等,Kafka統(tǒng)統(tǒng)不支持。

版權(quán)聲明:
文章不錯(cuò)?點(diǎn)個(gè)【在看】吧!??




