《RabbitMQ》 | 消息丟失也就這么回事
大家好,我是小菜。一個希望能夠成為 吹著牛X談架構 的男人!如果你也想成為我想成為的人,不然點個關注做個伴,讓小菜不再孤單!
本文主要介紹
RabbitMQ的消息丟失問題如有需要,可以參考
如有幫助,不忘 點贊 ?
微信公眾號已開啟,小菜良記,沒關注的同學們記得關注哦!
是的,最終是對 RabbitMQ 下手了!
面試中常見的RabbitMQ面試題也是多了去了,常見的如下:
消息可靠性問題:如何確保發(fā)送的消息至少被消費一次? 延遲消息問題:如何實現(xiàn)消息的延遲投遞? 高可用問題:如何避免單點的MQ故障而導致的不可用問題? 消息堆積問題:如何解決數(shù)百萬級以上消息堆積,無法及時消費問題?
這幾個問題又得讓你腦殼疼一陣子,是不是也在網(wǎng)上看了挺多博文介紹這方面的解決方案,但是卻看了又忘,實際便是因為缺少實操,這篇小菜便重點講述下 RabbitMQ 如何解決消息丟失問題~
一、消息可靠性問題
消息可靠性問題我們又可能將其理解為如何防止消息丟失?那為什么消息會丟失呢?我們可以先看看消息投遞的整個過程:

我們從圖中可以從三個階段分析可能造成消息丟失:
publisher 發(fā)送消息到 exchange
exchange 分發(fā)到 queue
queue 投遞到 customer
既然我們知道了哪些階段可能造成數(shù)據(jù)丟失,那我們就可以從源頭防范于未然~!
工程結構
工程結構很簡單,就是一個簡單的 Spring Boot 項目,里面有個 消費者 和 生產(chǎn)者 兩個模塊

1、生產(chǎn)者發(fā)送丟失
RabbitMQ 中提供了 publisher confirm 機制來避免消息發(fā)送到 MQ 的過程中丟失的問題。消息發(fā)送到 MQ 以后,會返回一個確認結果給生產(chǎn)者,用于表示消息是否確認成功。該確認結果存在兩種請求:
publisher-confirm
該類型是 發(fā)送者確認 ,存在兩種情況
消息成功投遞到交換機,返回 ack消息未投遞到交換機,返回 nack
publisher-return
該類型是 發(fā)送者回執(zhí) ,存在兩種情況
消息投遞到交換機,且成功分發(fā)到隊列,返回 ack消息投遞到交換機,但未成功分發(fā)到隊列,返回 nack

注意:確認機制發(fā)送消息時,需要給每個消息設置一個全局唯一ID,以區(qū)分不同消息,避免ack沖突
接下來我們用代碼來說明具體的操作方式
1)配置文件
我們首先看下 生產(chǎn)者 的配置文件

前面幾個配置 RabbitMQ 的連接信息沒啥好講的,我們來看幾個比較陌生的配置
publisher-confirm-type
開啟發(fā)送確認,這里可以支持兩種類型
simple:同步等待 confirm 結果,直到超時 correlated:異步回調(diào),定義 ConfirmCallback,MQ返回結果時會回調(diào)這個 ConfirmCallback
publisher-returns
開啟 public-return,同樣是基于 CallBack 機制,不過是定義 ReturnCallback
template.mandatory
定義路由失敗時的策略。
true:調(diào)用 ReturnCallback false:直接丟棄消息
2)定義回調(diào)事件
每個 RabbitTemplate 只能配置一個 ReturnCallback

3)發(fā)送消息

執(zhí)行發(fā)送代碼之前,我們確保已經(jīng)創(chuàng)建了(一個直連交換機
direct-exchange,一個隊列direct-queue,且綁定的 key 為direct
正常情況下,我們執(zhí)行代碼肯定是發(fā)送成功的,可以看到控制臺綠色輸出

且我們在消息隊列中也成功接收到了消息:

到這步是沒有任何問題的,那我們就需要手動給它制造點問題~ 我們可以修改 交換機名稱,這個時候發(fā)送消息的時候找不到交換機,那么交換機肯定就會返回 nack ,再看是否可以進入到我們代碼中的判斷:

代碼執(zhí)行雖然是綠色的,但因為rabbitMQ找不到正確的交換機,而導致消息發(fā)送失敗,也就是下圖的這個過程:

這一個是 publish -> exchange 失敗我們順利的捕獲到了,那么 exchange -> queue 這步的失敗是我們是否能夠正常捕獲?我們可以通過修改 路由 key 使交換機路由不到對應的 queue

可以發(fā)現(xiàn)當交換機沒有路由到相對應的 queue 時,也成功觸發(fā)了我們自定義的回調(diào)函數(shù),然后看 rabbitMQ 控制臺是可以發(fā)現(xiàn)消息已經(jīng)成功投遞到交換機

到這里,我們通過兩種簡單的錯誤模擬,使程序都能順利的進入到我們預先定義的回調(diào)中,如果遇到發(fā)送失敗的情況,我們可以在失敗的回調(diào)中自定義消息重發(fā)機制,最大程度上避免消息丟失的問題
4)總結
我們可以通過 publisher-confirm 和 publisher-return 兩種錯誤捕獲機制,來避免 生產(chǎn)者 -> exchange -> queue 這條鏈路的消息丟失
publisher-confirm消息成功發(fā)送到 exchange,返回 ack 消息未能成功發(fā)送到 exchange,返回 nack 消息發(fā)送過程中出現(xiàn)異常,沒有收到回執(zhí),則進入 failureCallback 回調(diào) publisher-return消息成功發(fā)送到 exchange,但沒有路由到 queue,調(diào)用自定義回調(diào)函數(shù) returnCallback
2、消息存儲丟失
消息存儲丟失是啥意思?其實就是持久化 的概念,當消息已經(jīng)成功發(fā)送到 queue 時,這個時候如果消費者沒有及時進行消費,rabbitMQ 又剛好宕機重啟了,那么這個時候就會發(fā)現(xiàn)消息丟失了。
這是因為 MQ 默認是內(nèi)存存儲消息,我們可以通過開啟持久化的功能來確保在 MQ 中的消息不丟失
其實我們通過 RabbitMQ 提供的 GUI 創(chuàng)建交換機或隊列的時候就可以發(fā)現(xiàn)有持久化的這個選項

如果將 durability 設為 durable 后,我們可以發(fā)現(xiàn)無論如何重啟 MQ,重啟后交換機和隊列依然存在。
但是很多時候我們交換機和 隊列 的創(chuàng)建并非在 GUI 上創(chuàng)建,而是通過應用代碼的方式創(chuàng)建
交換機持久化

隊列持久化

消息持久化
默認情況下,AMQP 發(fā)出的消息都是持久化的,不用特意指定

3、消費者消費丟失
RabbitMQ 采取的機制是當確認消息被消費者消費后就會立即刪除
那么如何確認消息已被消費者消費?那就還得依靠回執(zhí)來確認,消費者獲取消息后,需要向 RabbitMQ 發(fā)送 ack 回執(zhí),表明自己已經(jīng)處理消息。其中 ack 在 AMQP 中有三種確認模式:
manual:手動 ack,需要在業(yè)務代碼結束后,調(diào)用 api 發(fā)送 ack auto:自動 ack,由 spring 監(jiān)測 listener 代碼是否出現(xiàn)異常,沒有異常則返回 ack,反之返回 nack none:關閉 ack,MQ 在消息投遞后會立即刪除消息
上述三種方式都是通過修改配置文件:

1)manual
該方式需要用戶自己手動確認,靈活性較好

這個時候如果執(zhí)行邏輯是正常的,那么在 RabbitMQ 上就會將該消息刪除,但是如果執(zhí)行的邏輯拋出了異常,沒有進入到手動確認的環(huán)節(jié),RabbitMQ 將會把該消息保留:


2)auto
該方式在沒有異常發(fā)生時會自動進行消息確認
我們在配置文件中將確認方式改為 auto 進行測試:

正常情況下接收消息是沒有任何問題的,那我們同樣制造些非正常情況:

我們手動制造了點異常,發(fā)現(xiàn)消息沒有被 RabbitMQ 刪除的同時,而且控制臺一直在報錯,無止境的在嘗試重新消費,這如果放在線上環(huán)境難免有些令人崩潰。
當消費者出現(xiàn)異常后,消息會不斷 requeue(重新入隊)到隊列,再重新發(fā)送給消費者,然后再次異常,再次 requeue,無限循環(huán),就會導致 MQ 的消息處理飆升
而發(fā)生這種情況的原因所在便是因為 RabbitMQ的消息失敗重試機制,但很多時候我們可能不想一直重試,只需要經(jīng)過幾次嘗試,如果失敗就放棄處理,這個時候我們就需要在配置文件中配置失敗重試機制:

開啟該配置后,我們重啟項目進行觀察

通過控制臺可以看到在重試 3 次后,SpringAMQP會拋出異常AmqpRejectAndDontRequeueException,說明本地重試機制生效了。而且我們回到 RabbitMQ 控制臺可以看到對應消息被刪除了,說明最后 SpringAMQP 返回的是 ack,導致消息被 MQ 刪除

但是這種處理方式并不優(yōu)雅,重試后直接刪除消息過于 暴力,那么有沒有更好的處理方式?答案是有的!
我們可以利用 AMQP 提供的 MessageRecovery 接口來實現(xiàn),該接口有三種不同的實現(xiàn)方式:
RejectAndDontRequeueRecoverer:重試耗盡后,直接 reject,丟失消息。默認方式,以上就是采用這種方式 ImmediateRequeueMessageRecoverer:重試耗盡后,返回 nack,消息重新入隊 RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機
三種方式可以根據(jù)不同場景進行采用,分析一下,不難發(fā)現(xiàn)第三種 RepublishMessageRecoverer 是比較優(yōu)雅的~ 當重試失敗后會將消息投遞到一個指定專門存放異常消息的隊列,后續(xù)由人工集中進行處理!具體使用方式如下:

通過自定義異常處理后,我們重啟項目查看控制臺:

可以發(fā)現(xiàn)重試3次后,我們的異常消息進入到了我們自定義的異常隊列中

3)none
該方式?jīng)]啥好講的~ 無論消息異常與否 MQ 都會進行刪除!
4、總結
假如這個時候面試再問你,如何確保 RabbitMQ消息的可靠性?那你可得好好嘮嗑嘮嗑
如何保證消息不丟失?
1)首先分析丟失的場景有哪些?
消息丟失可能發(fā)生在 發(fā)送時丟失(未送達 exchange / 未路由到 queue)、消息未持久化而MQ宕機、消費者接收消息未能正確消費
2)然后如何預防
開啟生產(chǎn)者確認機制,確保生產(chǎn)者的消息能到達隊列
確認機制包括 publisher-confirm 和 publisher-return
當未送達到 交換機 我們可以通過 publisher-confirm 返回的 ack 和 nack 來確認
當 交換機 未成功路由到 隊列,我們可以通過 publisher-return 自定義的回調(diào)函數(shù)來確認,每個 RabbitTemplate 只能配置一個 ReturnCallback
開啟持久化功能,確保消息未消費前在隊列中不會丟失
持久化功能分為 交換機持久化、隊列持久化 和 消息持久化,我們都需要將 durable 設置為 true
開啟消費者確認機制最低為 auto級別
消費者確認機制有三種類型:manual (手動確認)、auto (自動確認)、none (關閉 ack)
失敗重試機制
我們手動設置 MessageResoverer 為 RepublishMessageRecoverer 方式,將投遞失敗的消息轉(zhuǎn)到異常隊列中,交由人工處理
這一套組合拳回答下來,面試官還不得默默承認你有點東西?
當然這只是 RabbitMQ 的問題之一,我們下篇繼續(xù)其他幾個問題的解決方式~
不要空談,不要貪懶,和小菜一起做個吹著牛X做架構的程序猿吧~點個關注做個伴,讓小菜不再孤單。咱們下文見!

今天的你多努力一點,明天的你就能少說一句求人的話!我是小菜,一個和你一起變強的男人。
??微信公眾號已開啟,小菜良記,沒關注的同學們記得關注哦!
