SpringBoot+RabbitMQ 死信隊列
點擊關(guān)注公眾號,Java干貨及時送達??
前言
死信:無法被消費的消息,稱為死信。
如果死信一直留在隊列中,會導致一直被消費,卻從不消費成功。
所以我們專門開辟了一個來存放死信的隊列,叫死信隊列(DLX,dead-letter-exchange)。
死信的幾種來源:
消息 TTL 過期(time to live,存活時間,可以用在限時支付消息) 隊列達到最大長度(隊列滿了,無法路由到該隊列) 消息被拒絕( basic.reject / basic.nack),并且requeue = false

環(huán)境準備配置
準備 MQ 的隊列和環(huán)境:
正常交換機 正常隊列(最長隊列 5) ---- 正常消費者,拒絕消息 ttl 隊列(過期時間 60 秒) ---- 沒有消費者 死信交換機 死信隊列
主要配置文件如下:
@Configuration
public?class?DeadConfig?{
????/*?正常配置?**********************************************************************************************************/
????/**
?????*?正常交換機,開啟持久化
?????*/
????@Bean
????DirectExchange?normalExchange()?{
????????return?new?DirectExchange("normalExchange",?true,?false);
????}
????@Bean
????public?Queue?normalQueue()?{
????????// durable:?是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啟時仍然存在,暫存隊列:當前連接有效
????????// exclusive:?默認也是false,只能被當前創(chuàng)建的連接使用,而且當連接關(guān)閉后隊列即被刪除。此參考優(yōu)先級高于durable
????????// autoDelete:?是否自動刪除,當沒有生產(chǎn)者或者消費者使用此隊列,該隊列會自動刪除。
????????Map?args?=?deadQueueArgs();
????????//?隊列設(shè)置最大長度
????????args.put("x-max-length",?5);
????????return?new?Queue("normalQueue",?true,?false,?false,?args);
????}
????@Bean
????public?Queue?ttlQueue()?{
????????Map?args?=?deadQueueArgs();
????????//?隊列設(shè)置消息過期時間?60?秒
????????args.put("x-message-ttl",?60?*?1000);
????????return?new?Queue("ttlQueue",?true,?false,?false,?args);
????}
????@Bean
????Binding?normalRouteBinding()?{
????????return?BindingBuilder.bind(normalQueue()).to(normalExchange()).with("normalRouting");
????}
????@Bean
????Binding?ttlRouteBinding()?{
????????return?BindingBuilder.bind(ttlQueue()).to(normalExchange()).with("ttlRouting");
????}
????/*?死信配置?**********************************************************************************************************/
????/**
?????*?死信交換機
?????*/
????@Bean
????DirectExchange?deadExchange()?{
????????return?new?DirectExchange("deadExchange",?true,?false);
????}
????/**
?????*?死信隊列
?????*/
????@Bean
????public?Queue?deadQueue()?{
????????return?new?Queue("deadQueue",?true,?false,?false);
????}
????@Bean
????Binding?deadRouteBinding()?{
????????return?BindingBuilder.bind(deadQueue()).to(deadExchange()).with("deadRouting");
????}
????/**
?????*?轉(zhuǎn)發(fā)到?死信隊列,配置參數(shù)
?????*/
????private?Map?deadQueueArgs()? {
????????Map?map?=?new?HashMap<>();
????????//?綁定該隊列到私信交換機
????????map.put("x-dead-letter-exchange",?"deadExchange");
????????map.put("x-dead-letter-routing-key",?"deadRouting");
????????return?map;
????}
}
arguments 具體參數(shù)如下:

隊列達到最大長度
首先測試最簡單的,沒有消費者。
調(diào)用6次正常隊列的生產(chǎn)方法。
?/**
??*?正常消息隊列,隊列最大長度5
??*/
?@GetMapping("/normalQueue")
?public?String?normalQueue()?{
?????Map?map?=?new?HashMap<>();
?????map.put("messageId",?String.valueOf(UUID.randomUUID()));
?????map.put("data",?System.currentTimeMillis()?+?",?正常隊列消息,最大長度?5");
?????rabbitTemplate.convertAndSend("normalExchange",?"normalRouting",?map,?new?CorrelationData());
?????return?JSONObject.toJSONString(map);
?}
MQ 結(jié)果如下:

消息 TTL 過期
消息的TTL 指的是消息的存活時間,我們可以通過設(shè)置消息的TTL或者隊列的TTL來實現(xiàn)。
消息的TTL :對于設(shè)置了過期時間屬性(expiration)的消息,消息如果在過期時間內(nèi)沒被消費,會過期 隊列的TTL :對于設(shè)置了過期時間屬性(x-message-ttl)的隊列,所有路由到這個隊列的消息,都會設(shè)置上這個過期時間
兩種配置都行,一般都用在定時任務,限時支付這種地方。
?/**
??*?消息?TTL,?time?to?live
??*/
?@GetMapping("/ttlToDead")
?public?String?ttlToDead()?{
?????Map?map?=?new?HashMap<>();
?????map.put("messageId",?String.valueOf(UUID.randomUUID()));
?????map.put("data",?System.currentTimeMillis()?+?",?ttl隊列消息");
?????rabbitTemplate.convertAndSend("normalExchange",?"ttlRouting",?map,?new?CorrelationData());
?????return?JSONObject.toJSONString(map);
?}
發(fā)送后:

等待過期后:

Demo 中只是為了方便,代碼中盡量使用 消息TTL,不要用 隊列TTL
拒絕消息
正常隊列消費后拒絕消息,并且不進行重新入隊:
@Component
@RabbitListener(queues?=?"normalQueue")
public?class?NormalConsumer?{
????@RabbitHandler
????public?void?process(Map?message,?Channel?channel,?Message?mqMsg) ?throws?IOException?{
????????System.out.println("收到消息,并拒絕重新入隊?:?"?+?message.toString());
????????channel.basicReject(mqMsg.getMessageProperties().getDeliveryTag(),?false);
????}
}
MQ 控制臺:

死信隊列消費:
@Component
@RabbitListener(queues?=?"deadQueue")
public?class?DeadConsumer?{
????@RabbitHandler
????public?void?process(Map?message,?Channel?channel,?Message?mqMsg) ?throws?IOException?{
????????System.out.println("死信隊列收到消息?:?"?+?message.toString());
????????channel.basicAck(mqMsg.getMessageProperties().getDeliveryTag(),?false);
????}
}
消息順序和實驗一致:
死信隊列收到消息?:?{data=1631534291765,?正常隊列消息,最大長度?5,?messageId=bce3888b-da38-4299-ac88-d22cbe164739}
死信隊列收到消息?:?{data=1631535222745,?ttl隊列消息,?messageId=a4617445-5aab-4fac-aec7-5709ea699598}
死信隊列收到消息?:?{data=1631534503765,?正常隊列消息,最大長度?5,?messageId=b65ecaab-5ce7-4597-a32c-c90b67ec46da}
死信隊列收到消息?:?{data=1631534511468,?正常隊列消息,最大長度?5,?messageId=d63d2a4c-e7d3-4f00-a6ca-78e2d62d1d92}
死信隊列收到消息?:?{data=1631534585087,?正常隊列消息,最大長度?5,?messageId=eed0c349-415b-43dc-aa79-c683122a1289}
死信隊列收到消息?:?{data=1631534588311,?正常隊列消息,最大長度?5,?messageId=7a7bd152-f2fa-4a74-b9e6-943ac7cbb3d4}
死信隊列收到消息?:?{data=1631534608504,?正常隊列消息,最大長度?5,?messageId=9de512a1-4ca4-4060-9096-27aba01c1687}
來源:https://blog.csdn.net/m0_46144826
1.?任務調(diào)度框架 Quartz 用法指南(超詳細)
最近面試BAT,整理一份面試資料《Java面試BATJ通關(guān)手冊》,覆蓋了Java核心技術(shù)、JVM、Java并發(fā)、SSM、微服務、數(shù)據(jù)庫、數(shù)據(jù)結(jié)構(gòu)等等。
獲取方式:點“在看”,關(guān)注公眾號并回復?Java?領(lǐng)取,更多內(nèi)容陸續(xù)奉上。
PS:因公眾號平臺更改了推送規(guī)則,如果不想錯過內(nèi)容,記得讀完點一下“在看”,加個“星標”,這樣每次新文章推送才會第一時間出現(xiàn)在你的訂閱列表里。
點“在看”支持小哈呀,謝謝啦??
評論
圖片
表情

