《RabbitMQ》| 解決消息延遲和堆積問題
大家好,我是小菜。一個希望能夠成為 吹著牛X談架構(gòu) 的男人!如果你也想成為我想成為的人,不然點個關(guān)注做個伴,讓小菜不再孤單!
本文主要介紹
RabbitMQ的常見問題如有需要,可以參考
如有幫助,不忘 點贊 ?
微信公眾號已開啟,小菜良記,沒關(guān)注的同學(xué)們記得關(guān)注哦!
- 消息可靠性問題:如何確保發(fā)送的消息至少被消費一次?
- 延遲消息問題:如何實現(xiàn)消息的延遲投遞?
- 消息堆積問題:如何解決數(shù)百萬級以上消息堆積,無法及時消費問題?
我們在上篇已經(jīng)說明了如何解決消息丟失的問題,也就是保證了消息的可靠性,那么其余兩個問題同樣重要,這篇我們將講述其余兩個問題的解決方式~!
消息丟失解決方案:《RabbitMQ》 | 消息丟失也就這么回事
一、延遲消息
延遲消息 字面意思就是讓延遲接收消息,那么如何能讓消息延遲到達(dá)?這就是我們要思考解決的問題,在了解延遲隊列之前我們需要先明白 RabbitMQ 中的兩個概念
- 死信交換機(jī)
- TTL
1)死信交換機(jī)
死信(dead letter),也就是廢棄已死亡的消息,那什么情況下一個普通的消息能夠成為死信?需要符合以下三個條件:
- 消費者使用
basic.reject或basic.nack聲明消費失敗,并將消息的 requeue 參數(shù)設(shè)置為false - 消息是一個過期消息,超時后無人消費
- 要投遞的隊列消息堆積滿了,最早的消息就會成為死信
而 死信交換機(jī) 便是 死信 的歸屬。
如果一個隊列配置了 dead-letter-exchange 屬性,指定了一個交換機(jī),那么隊列中的死信就會投遞到這個交換機(jī)中,而這個交換機(jī)就稱為 死信交換機(jī) - DLX(Dead Letter Exchange )
步驟:當(dāng)生產(chǎn)者正常投遞到隊列(simple.queue)中,如果消費者從隊列(simple.queue) 消費消息卻聲明了 reject,那并且隊列綁定了死信交換機(jī)(dl.queue),那么這個時候成為死信的消息就會投遞到這個死信隊列(dl.queue)中。
死信投遞過程從正常隊列 --> 死信隊列 的過程,我們必須聲明兩個關(guān)鍵信息
- 死信交換機(jī)的名稱
- 死信交換機(jī)與死信隊列綁定的路由key
而這兩個信息也是我們投遞消息的基礎(chǔ)配置。
接下來我們簡單模擬一下 條件1 所產(chǎn)生的場景
1、首先聲明一個死信交換機(jī)和死信隊列
我們這邊是使用簡單的注解方式直接生成
生成死信交換機(jī)和死信隊列通過 RabbitMQ 控制臺界面可以看出已經(jīng)成功生成

2、聲明正常使用交換機(jī)與隊列
然后這個時候我們就可以創(chuàng)建一個正常使用的交換機(jī)與隊列,并指明死信交換機(jī)

同樣可以通過控制臺查看創(chuàng)建狀態(tài)

其中是否有聲明死信交換機(jī)我們可以通過隊列的 DLX 和 DLK 標(biāo)志判斷
3、模擬拒收
然后我們現(xiàn)在通過代碼模擬客戶端拒絕消息的場景
1)消息發(fā)送

2)消息接收

查看控制臺,結(jié)果如下:
2021-11-06?23:56:52.095??INFO?2112?---?[ntContainer#0-1]?c.l.m.c.listener.SpringRabbitListener????:?正常業(yè)務(wù)交換機(jī)?|?接收到的消息?:?[hello]
2021-11-06?23:56:52.118??INFO?2112?---?[ntContainer#1-1]?c.l.m.c.listener.SpringRabbitListener????:?死信交換機(jī)?|?接收到的消息?:?hello
這說明我們死信交換機(jī)已經(jīng)成功發(fā)揮作用
2)TTL
以上我們已經(jīng)成功認(rèn)識到了 死信交換機(jī) 的使用,但是這與我們一開始說的 延遲隊列 似乎并沒有太大關(guān)系,莫急~接下來說到的 TTL(Time-To-Live) 就是用來處理延遲消息的~!
在 TTL 的概念中,如果一個隊列中的消息 TTL 結(jié)束后仍未被消費,那么這個消息就會自動變?yōu)樗佬牛?TTL 超時情況分為兩種:
- 消息所在的隊列設(shè)置了存活時間
- 消息本身設(shè)置了存活時間

我們同樣進(jìn)行上述 條件2 的模擬場景
1、聲明死信交換機(jī)與死信隊列(上述已完成)
2、聲明延遲隊列并指定死信交換機(jī)

同樣控制臺查看創(chuàng)建結(jié)果,并且我們發(fā)現(xiàn)不止有 DLX 和 DLK 標(biāo)志,還多了個 TTL ,說明該隊列是延遲隊列

3、模擬消費超時情況
我們往延遲隊列中發(fā)送一條消息,并且沒有消費者進(jìn)行消費,等待 1 分鐘后查看是否能進(jìn)入 死信隊列 中

我們已經(jīng)發(fā)送了一條消息到延遲隊列并且一分鐘后也成功在控制臺發(fā)現(xiàn)了這條信息已經(jīng)進(jìn)入到了死信交換機(jī)
2021-11-07?00:01:30.854??INFO?32752?---?[ntContainer#1-1]?c.l.m.c.listener.SpringRabbitListener????:?死信交換機(jī)?|?接收到的消息?:?test?ttl-message
以上是配置了隊列超時時間,消息本身自然也能配置超時時間,當(dāng) 消息 和 隊列 都存在超時時間時,那么就以最短的 TTL 為準(zhǔn),消息的超時配置如下:

如上圖所示,我們可以利用 Message 這個類來傳遞消息信息,并設(shè)置上超時時間,我們設(shè)置的是 5000 ms,等待發(fā)送成功后,控制臺過5000 ms 也成功打印了死信交換機(jī)消費的消息:
2021-11-07?00:03:09.048??INFO?39996?---?[ntContainer#1-1]?c.l.m.c.listener.SpringRabbitListener????:?死信交換機(jī)?|?接收到的消息?:?this?is?a?ttl?message
3)延遲隊列
我們上述是使用 死信交換機(jī) 來間接實現(xiàn) 延遲隊列 的效果,但實際在 RabbitMQ 不必如此麻煩,RabbitMQ 已經(jīng)為我們封裝好了插件,我們只需要下載安裝即可~
RabbitMQ 插件下載地址
我們進(jìn)入地址可以發(fā)現(xiàn)有許多插件,搜索 delay 關(guān)鍵字找到我們需要的插件進(jìn)行下載

下載完后直接上傳到 RabbitMQ 的插件目錄 - plugins,小菜這邊是使用 docker 臨時安裝測試的,所以已經(jīng)將該插件目錄掛載出來了:
docker?run?-itd?--name?rabbitmq?-v?plugins:/plugins?-p?15672:15672?-p?5672:5672?rabbitmq:management
因此我這邊直接將插件上傳到容器中的 plugins 目錄即可~
然后進(jìn)入到容器中執(zhí)行以下命令進(jìn)行插件開啟
rabbitmq-plugins?enable?rabbitmq_delayed_message_exchange

并且我們在控制臺創(chuàng)建交換機(jī)的時候可以看到 type 類型多了個選項

成功執(zhí)行到這步就說明已經(jīng)開啟了 RabbitMQ 的延遲隊列功能
那接下來我們就可以來使用 DelayExchange,首先我們需要了解代碼的方式創(chuàng)建延遲交換機(jī):
方式1

方式2

當(dāng)我們?nèi)f事具備之后就可以來發(fā)送消息了
在發(fā)送消息的時候,消息頭中一定要攜帶上 x-delay 參數(shù),指定上延遲時間

通過這樣配置之后,我們可以在控制臺看到,經(jīng)過10秒后 delay.queue 才收到對應(yīng)消息,然后被對應(yīng)消費者消費
3)總結(jié)
我們上面從 死信交換機(jī) 到 TTL 到 延遲隊列,一步步認(rèn)識了如何實現(xiàn)延遲消息的功能,然后我們進(jìn)行一個小小的總結(jié):
問題1:什么樣的消息會成為死信?
- 消息被消費者 reject 或返回 nack
- 消息超時未及時消費
- 消息隊列滿了
問題2:消息超時的方式
- 給隊列設(shè)置 TTL 屬性
- 給消息設(shè)置 TTL 屬性
問題3:如何使用延遲隊列
- 下載并啟用 RabbitMQ 延遲隊列插件
- 聲明一個交換機(jī),并將 delayed 屬性設(shè)置為 true
- 發(fā)送消息時,添加 x-delay 頭,值為超時時間
問題4:延遲隊列的使用場景
- 延遲發(fā)送短信通知
- 訂單自動取消
- 庫存自動回滾
二、惰性隊列
講完延遲隊列,我們繼續(xù)來認(rèn)識惰性隊列
講惰性隊列之前,我們先拋出一個問題~
RabbitMQ 如何解決消息堆積問題
什么情況下會出現(xiàn)消息堆積問題?
- 當(dāng)生產(chǎn)者生產(chǎn)速度遠(yuǎn)遠(yuǎn)消費者消費速度
- 當(dāng)消費者宕機(jī)沒有及時重啟
那么如何解決這個問題?通常思路如下:
- 在消費者機(jī)器重啟后,增加更多的消費者進(jìn)行處理
- 在消費者處理邏輯內(nèi)部開辟線程池,利用多線程的方式提高處理速度
- 擴(kuò)大隊列的容量,提高堆積上限
這幾個方式從理論上來說解決消息堆積問題也是沒有問題的,但是處理方式不夠優(yōu)雅甚至不夠靈活~ 那么除了以上的幾種解決方式,我們可以利用 RabbitMQ 中自帶的一種隊列類型 -- 惰性隊列
什么是惰性隊列?我們認(rèn)識一下惰性隊列的幾個特性:
- 接收到消息后直接存入磁盤而非內(nèi)存
- 消費者要消費消息時才會從磁盤中讀取并加載到內(nèi)存中
- 它支持百萬級消息的存儲
說到底,就是利用磁盤的緩沖機(jī)制,而這種機(jī)制的缺點就是消息的時效性會降低,性能受限于磁盤的IO,認(rèn)識特性和缺點之后,我們便來看看如何創(chuàng)建惰性隊列
方式1

方式2

方式3
該方式是直接基于命令行修改將一個正在運行中的隊列修改為惰性隊列
rabbitmqctl?set_policy?Lazy?"^lazy-queue$"?'{"queue-mode":"lazy"}'?--apply-to?queues??
其中幾個命令參數(shù)含義如下:
- rabbitmqctl:命令行工具
- set_policy:添加一個策略
- Lazy:策略名稱,可以自定義
- ^lazy-queue$:用正則表達(dá)式匹配隊列的名稱
- '{"queue-mode":"lazy"}':設(shè)置隊列為 lazy 模式
- --apply-to queues:策略的作用對象,是所有的隊列
這種惰性隊列的方式盡管缺點是消息時效性會降低,但是在某些場景下也不是不能接受,何況它的優(yōu)點同樣明顯:
- 基于磁盤存儲,消息上限高
- 沒有間歇性的 page-out,性能穩(wěn)定
到這里,我們就已經(jīng)講述了 RabbitMQ 的常見問題,對于我們來說,普通的開發(fā)場景可能比較少遇到這些問題,但是沒遇到不等于沒有,所以我們還是需要多認(rèn)識來防患于未然!
不要空談,不要貪懶,和小菜一起做個吹著牛X做架構(gòu)的程序猿吧~點個關(guān)注做個伴,讓小菜不再孤單。咱們下文見!
看完不贊,都是壞蛋今天的你多努力一點,明天的你就能少說一句求人的話!我是小菜,一個和你一起變強(qiáng)的男人。
??微信公眾號已開啟,小菜良記,沒關(guān)注的同學(xué)們記得關(guān)注哦!
