RabbitMQ高級(jí)特性實(shí)戰(zhàn)
哈嘍大家好!我是小三,今天寫的是關(guān)于Rabbitmq的高級(jí)特性。如果對(duì)你有幫助也可以點(diǎn)點(diǎn)關(guān)注點(diǎn)點(diǎn)贊哈。
RabbitMQ高級(jí)特性-消息可靠性的投遞+消費(fèi)
那什么是消息的可靠性投遞呢?不知道小伙伴們有沒有考慮過,生產(chǎn)者將數(shù)據(jù)將數(shù)據(jù)發(fā)送到Rabbitmq的時(shí)候,消息可能會(huì)因?yàn)榫W(wǎng)絡(luò)等等一些問題在傳入的過程里弄丟了。
先來看一張簡(jiǎn)單的消費(fèi)流程圖:

在上圖可以想到:
1、從producker在往broker這一階段的消息在傳入的過程中會(huì)丟失。
2、消息發(fā)送到mq后,存入內(nèi)存但還沒來得及處理消息,因?yàn)殄礄C(jī)導(dǎo)致了內(nèi)存中的數(shù)據(jù)丟失。
3、消費(fèi)消費(fèi)到數(shù)據(jù)之后,還沒來得及處理,消費(fèi)的進(jìn)程就已經(jīng)掛掉了,導(dǎo)致了mq以為消息已經(jīng)被處理,事實(shí)上并沒有。
因?yàn)樯鲜鲈?,我們需要保證到mq節(jié)點(diǎn)可以成功的接收到消息,那怎么來保證消息可靠性呢?
我們知道rabbitmq的消息投遞路徑是:生產(chǎn)者->交換機(jī)->隊(duì)列->消費(fèi)者,通過這個(gè)消息投遞路徑我們可以通過兩個(gè)點(diǎn)的控制來保證消息的可靠性傳遞。
1、生產(chǎn)者到交換機(jī),通過confirmCallback,也就是確認(rèn)回調(diào),默認(rèn)的話是不開啟的。當(dāng)這個(gè)模式開啟之后,生產(chǎn)者給交換機(jī)發(fā)送一個(gè)消息,交換機(jī)收到消息后會(huì)給消費(fèi)者發(fā)送一個(gè)確認(rèn)收到的信息
2、交換機(jī)到隊(duì)列,通過returnCallback,也就是返回回調(diào),這個(gè)不像上一個(gè)的機(jī)制,confirmCallback不管消息是否成功到達(dá)交換機(jī)的時(shí)候都會(huì)被調(diào)用,而returnCallback只有在交換機(jī)到達(dá)隊(duì)列失敗的時(shí)候才會(huì)觸發(fā),當(dāng)回調(diào)的函數(shù)被調(diào)用的時(shí)候,說明了交換機(jī)的消息并沒有順利的到達(dá)隊(duì)列。
這里給個(gè)小建議:開啟了消息的確認(rèn)機(jī)制之后,是為了保證消息的準(zhǔn)確送達(dá),但會(huì)因?yàn)轭l繁的確認(rèn)交互,使得rabbitmq的整體效率變得低下,吞吐量會(huì)下降嚴(yán)重。所以不是非常重要的消息真心不建議使用消息確認(rèn)的機(jī)制
Rabbitmq消息可靠性confirmCallback實(shí)戰(zhàn)
生產(chǎn)者到交換機(jī),生產(chǎn)者投遞消息之后,如果Broker接收到消息后,就給生產(chǎn)者發(fā)個(gè)ACK,ACK就好像是回饋消息,生產(chǎn)者就可以通過這一個(gè)ACK就可以確認(rèn)這條消息是否正常地發(fā)送到Broker了,這種方式是消息可靠投遞的核心。
開啟confirmCallback的方法
在舊版中,確認(rèn)消息的發(fā)送成功,可以通過實(shí)現(xiàn)ConfirmCallBack接口,把消息發(fā)送到交換器Exchange后觸發(fā)回調(diào)
spring.rabbitmq.publisher-confirms=true
在新版里,NONE值已經(jīng)禁用了發(fā)布確認(rèn)模式,為默認(rèn)值;CORRELATED的值是發(fā)布消息成功后到交換器會(huì)觸發(fā)回調(diào)方法
spring.rabbitmq.publisher-confirm-type:correlated
實(shí)戰(zhàn)演練
@Autowired
?private?RabbitTemplate?template;
?@Test
?void?testConfirmCallback()?{
?template.setConfirmCallback(new?RabbitTemplate.ConfirmCallback()?{
?/**
?*
?*?@param?correlationData?配置
?*?@param?ack?交換機(jī)是否收到消息,true是成功,false是失敗
?*?@param?cause?失敗的原因
?*/
?@Override
?public?void?confirm(CorrelationData?correlationData,?boolean?ack,?String?cause)?{
?System.out.println("confirm=====>");
?System.out.println("confirm====ack="+ack);
?System.out.println("confirm====cause="+cause);
?//根據(jù)ACK狀態(tài)做對(duì)應(yīng)的消息更新操作?TODO
?}
?});
?
template.convertAndSend(RabbitMQConfig.EXCHAN
GE_NAME+,"order.new","新訂單來啦1");
?}
如果需要模擬異常的話需要對(duì)投遞的交換機(jī)每次進(jìn)行更改!簡(jiǎn)單的實(shí)戰(zhàn)就到這里了
RabbitMQ死信隊(duì)列+TTL
TTL是什么?
TTL的意思就是time to live 消息的存活時(shí)間。如果消息一直在隊(duì)列當(dāng)中沒有被消費(fèi)并且的話存在的時(shí)間已經(jīng)超過了消息的存活時(shí)間,消息就會(huì)標(biāo)稱了“死信”(后續(xù)會(huì)講到),后續(xù)就無法被消費(fèi)了。設(shè)置TTL的話有兩種方式:第一種單獨(dú)消息來進(jìn)行配置ttl,第二種是整個(gè)隊(duì)列來進(jìn)行配置ttl,第二種使用居多。TTL介紹完了,咱們來介紹一下死信隊(duì)列
什么是死信隊(duì)列呢?
可以從概念上來搞清楚,“死信”顧名思義就是無法被消費(fèi)到的消息。一般來說的話,producer將消息投遞到了broker或者是queue里面了,再接著consumer從queue取出消息后進(jìn)行消費(fèi)。這個(gè)沒問題吧,但是在某一些時(shí)候由于特殊的原因會(huì)導(dǎo)致到queue中的某一些消息沒有辦法被消費(fèi)到,這樣的消息如果沒有經(jīng)過后續(xù)的處理的話,就會(huì)變成了死信,有了死信也就有了死信隊(duì)列。
rabbitmq產(chǎn)生死信會(huì)有如下幾種原因:
消息被拒絕了,并且requeue為false 消息的存活時(shí)間過期 隊(duì)列達(dá)到的最大的長度(隊(duì)列已經(jīng)滿了,沒有辦法在添加數(shù)據(jù)到mq里)
rabbitmq死信交換機(jī)
當(dāng)消息成為死信之后,為了防止消息的丟失,會(huì)將這樣無法處理到的消息發(fā)送到一個(gè)叫死信交換機(jī)里,后面再通過死信交換機(jī)綁定的路由鍵發(fā)送到相對(duì)應(yīng)的死信隊(duì)列。當(dāng)消息在一個(gè)隊(duì)列當(dāng)中變成死信,如果也配置了死信隊(duì)列,那么就會(huì)重新的publish到死信交換機(jī)中,私信交換機(jī)在把這些死信投遞到隊(duì)列上,就形成了死信隊(duì)列。

在RabbitMQ管控臺(tái)消息TTL進(jìn)行測(cè)試
設(shè)置隊(duì)列過期使用的參數(shù),對(duì)整個(gè)隊(duì)列的消息統(tǒng)一設(shè)置過期 x-message-ttl,消息過期時(shí)間使用的參數(shù)(如果隊(duì)列的頭部消息沒有過期,隊(duì)列中消息就已經(jīng)過期了,已經(jīng)還在隊(duì)列里)expiration
RabbitMQ的Web控制臺(tái)測(cè)試
新建死信交換機(jī),這個(gè)和普通的沒啥區(qū)別

新建死信隊(duì)列

把死信交換機(jī)和隊(duì)列進(jìn)行綁定

新建普通的隊(duì)列,并設(shè)置過期時(shí)間與指定的死信交換機(jī)

【面試題】RabbitMQ核心技能面試
1.面試官:你是如何理解RabbitMQ里面的虛擬主機(jī)的呢?
vhost可以認(rèn)為是一個(gè)虛擬的小型版rabbitmq隊(duì)列,它的內(nèi)部有含有獨(dú)立的queue、exchange和binding等等,還擁有獨(dú)立的權(quán)限系統(tǒng),可以做到vhost的范圍用戶控制,更多用戶做不同的權(quán)限隔離。在用于不同的業(yè)務(wù)模塊的邏輯隔離,一個(gè)Virtual Host里面可以有一個(gè)或者是多個(gè)Exchange和Queue,同一個(gè)VirtualHost的話里面不可以有相同名稱的Exchange或者是Queue
2.面試官:在項(xiàng)目里,如何選擇哪一個(gè)隊(duì)列產(chǎn)品呢?
主流的消息隊(duì)列有:ActiveMQ、Kafka、RabbitMQ、RocketMQ
ActiveMQ:Apache出品,歷史悠久,?持多種語?的客戶端和協(xié)議,?持多種語?Java, .NET, C++ 等,基于JMS Provider的實(shí)現(xiàn)
缺點(diǎn):吞吐量不?,多隊(duì)列的時(shí)候性能下降,存在消息丟失的情況,?較少?規(guī)模使?
Kafka:是由Apache軟件基?會(huì)開發(fā)的?個(gè)開源流處理平臺(tái),由Scala和Java編寫。Kafka是?種?吞吐量的分
布式發(fā)布訂閱消息系統(tǒng),它可以處理?規(guī)模的?站中的所有動(dòng)作流數(shù)據(jù)(??瀏覽,搜索和其他?戶的?動(dòng)),副本集機(jī)制,實(shí)現(xiàn)數(shù)據(jù)冗余,保障數(shù)據(jù)盡量不丟失;?持多個(gè)?產(chǎn)者和消費(fèi)者類似MQ,功能較為簡(jiǎn)單,主要?持簡(jiǎn)單的MQ功能
缺點(diǎn):不?持批量和?播消息,運(yùn)維難度?,?檔?較少, 需要掌握Scala
RocketMQ:阿?開源的?款的消息中間件, 純Java開發(fā),具有?吞吐量、?可?性、適合?規(guī)模分布式系統(tǒng)應(yīng)?的
特點(diǎn), 性能強(qiáng)勁(零拷?技術(shù)),?持海量堆積, ?持指定次數(shù)和時(shí)間間隔的失敗消息重發(fā),?持consumer端tag過濾、延遲消息等,在阿?內(nèi)部進(jìn)??規(guī)模使?,適合在電商,互聯(lián)??融等領(lǐng)域,基于JMS Provider的實(shí)現(xiàn)
缺點(diǎn):社區(qū)相對(duì)不活躍,更新?較快,純java?持
RabbitMQ:是?個(gè)開源的AMQP實(shí)現(xiàn),服務(wù)器端?Erlang語?編寫,?持多種客戶端,如:Python、Ruby、.NET、Java、C、?于在分布式系統(tǒng)中存儲(chǔ)轉(zhuǎn)發(fā)消息,在易?性、擴(kuò)展性、?可?性等??表現(xiàn)不錯(cuò)
缺點(diǎn):使?Erlang開發(fā),閱讀和修改源碼難度?
3.面試官:講解下怎么樣可以避免重復(fù)消費(fèi)
任何消息隊(duì)列產(chǎn)品不保證消息不重復(fù),如果你的業(yè)務(wù)需要保證嚴(yán)格的不重復(fù)消息,需要你??在業(yè)務(wù)端去重,kafka、rocketmq、rabbitmq等都是?樣的,接?冪等性保障 ,消費(fèi)端處理業(yè)務(wù)消息要保持冪等性。
「使用redis」
setNX
//Redis中操作,判斷是否已經(jīng)操作過
TODO
?boolean?flag?=?jedis.setNX(key);
?if(flag){
?//消費(fèi)
?}else{
?//忽略,重復(fù)消費(fèi)
?}
Incr原子操作:key?增,?于0 返回值?于0則說明消費(fèi)過
int?num?=?jedis.incr(key);
?if(num?==?1){
?//消費(fèi)
?}else{
?//忽略,重復(fù)消費(fèi)
?}
上述兩個(gè)?式都可以,但是排重可以不考慮原?問題,數(shù)據(jù)量多需要設(shè)置過期時(shí)間,考慮原?問題

