Springboot+RabbitMQ死信隊(duì)列
點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號(hào)”
優(yōu)質(zhì)文章,第一時(shí)間送達(dá)
76套java從入門到精通實(shí)戰(zhàn)課程分享
關(guān)于死信隊(duì)列
在大多數(shù)的MQ中間件中,都有死信隊(duì)列的概念。死信隊(duì)列同其他的隊(duì)列一樣都是普通的隊(duì)列。在RabbitMQ中并沒(méi)有特定的“死信隊(duì)列”類型,而是通過(guò)配置,將其實(shí)現(xiàn)。
當(dāng)我們?cè)趧?chuàng)建一個(gè)業(yè)務(wù)的交換機(jī)和隊(duì)列的時(shí)候,可以配置參數(shù),指明另一個(gè)隊(duì)列為當(dāng)前隊(duì)列的死信隊(duì)列,在RabbitMQ中,死信隊(duì)列(嚴(yán)格的說(shuō)應(yīng)該是死信交換機(jī))被稱為DLX Exchange。當(dāng)消息“死掉”后,會(huì)被自動(dòng)路由到DLX Exchange的queue中。
什么樣的消息會(huì)進(jìn)入死信隊(duì)列?
1.消息的TTL過(guò)期。
2.消費(fèi)者對(duì)broker應(yīng)答Nack,并且消息禁止重回隊(duì)列。
3.Queue隊(duì)列長(zhǎng)度已達(dá)上限。
場(chǎng)景分析
以用戶訂單支付為場(chǎng)景。在各大電商平臺(tái)上,訂單的都有待支付時(shí)間,通常為30min。當(dāng)用戶超過(guò)30min未支付訂單,該訂單的狀態(tài)應(yīng)該會(huì)變成“超時(shí)取消”,或類似的狀態(tài)值的改變。
如果不使用MQ,可以設(shè)計(jì)一個(gè)定時(shí)任務(wù),定時(shí)查詢數(shù)據(jù)庫(kù),判斷訂單的狀態(tài)和支付時(shí)間是否已經(jīng)到期,若到期則修改訂單的狀態(tài)。但顯然,這不是一個(gè)很好的操作,頻繁訪問(wèn)數(shù)據(jù)庫(kù),造成不必要的資源浪費(fèi)。
使用MQ,我們可以在下單的時(shí)候,當(dāng)訂單數(shù)據(jù)入庫(kù)后,發(fā)送一條Message到Queue中,并設(shè)置過(guò)期時(shí)間為30min或自定義的支付過(guò)期時(shí)間。
/**
* 發(fā)送帶有過(guò)期時(shí)間的消息
*/
@GetMapping("/sendDlx")
public void sendDlx() {
Order order = new Order();
order.setItemId(1);
order.setStatus(1);
rabbitTemplate.convertAndSend(orderExchange, orderRoutingKey,
JSON.toJSONString(order), message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 模擬,設(shè)置10S后消息過(guò)期
message.getMessageProperties().setExpiration("10000");
return message;
});
}
若30min后,還未有消費(fèi)者(下游服務(wù))消費(fèi)這條消息,那么該條消息就會(huì)被路由到死信隊(duì)列中。我們可以設(shè)置一個(gè)監(jiān)聽去監(jiān)聽死信隊(duì)列,當(dāng)收到死信隊(duì)列的消息后,則根據(jù)消息數(shù)據(jù),查詢數(shù)據(jù)庫(kù)訂單狀態(tài)是否還是待支付狀態(tài),若是,則修改成超時(shí)取消。
代碼實(shí)現(xiàn)
以下是demo,未做服務(wù)的拆分,因此整個(gè)流程都是單個(gè)服務(wù)實(shí)現(xiàn)的,所以就沒(méi)有下游服務(wù),但并不影響整體業(yè)務(wù)。
RabbitMQConfig
將需要的交換機(jī),隊(duì)列,綁定都聲明成SpringBean。Spring會(huì)自動(dòng)創(chuàng)建這些到RabbitMQ服務(wù)中。
@Value注解部分都是配置文件exchange、queue、routingKey的名稱。
/**
* @author wulei
*/
@Configuration
public class RabbitConfig {
@Value("${sunspring.order.exchange}")
private String orderExchange;
@Value("${sunspring.order.queue}")
private String orderQueue;
@Value("${sunspring.order.routingKey}")
private String orderRoutingKey;
@Value("${sunspring.dlx.exchange}")
private String dlxExchange;
@Value("${sunspring.dlx.queue}")
private String dlxQueue;
@Value("${sunspring.dlx.routingKey}")
private String dlxRoutingKey;
/**
* 聲明死信隊(duì)列
* @return DirectExchange
*/
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange(dlxExchange);
}
/**
* 聲明死信隊(duì)列
* @return Queue
*/
@Bean
public Queue dlxQueue() {
return new Queue(dlxQueue);
}
/**
* 綁定死信隊(duì)列到死信交換機(jī)
* @return Binding
*/
@Bean
public Binding binding() {
return BindingBuilder.bind(dlxQueue())
.to(dlxExchange())
.with(dlxRoutingKey);
}
/**
* 聲明訂單業(yè)務(wù)交換機(jī)
* @return DirectExchange
*/
@Bean
public DirectExchange orderExchange() {
return new DirectExchange(orderExchange);
}
/**
* 聲明訂單業(yè)務(wù)隊(duì)列
* @return Queue
*/
@Bean
public Queue orderQueue() {
Map<String,Object> arguments = new HashMap<>(2);
// 綁定該隊(duì)列到私信交換機(jī)
arguments.put("x-dead-letter-exchange",dlxExchange);
arguments.put("x-dead-letter-routing-key",dlxRoutingKey);
return new Queue(orderQueue,true,false,false,arguments);
}
/**
* 綁定訂單隊(duì)列到訂單交換機(jī)
* @return Binding
*/
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue())
.to(orderExchange())
.with(orderRoutingKey);
}
}
sunspring.order.exchange=sunspring_order_exchange
sunspring.order.queue=sunspring_order_queue
sunspring.order.routingKey=sunspring.order
sunspring.dlx.exchange=sunspring_dlx_exchange
sunspring.dlx.queue=sunspring.dlx.queue
sunspring.dlx.routingKey=dlx
在聲明業(yè)務(wù)隊(duì)列時(shí),創(chuàng)建了一個(gè)Map,并且put了兩個(gè)值,這兩個(gè)值就是死信隊(duì)列的聲明。
x-dead-letter-exchange:死信交換機(jī)的名稱
x-dead-letter-routing-key:死信交換機(jī)的路由鍵,因?yàn)閐emo中兩個(gè)交換機(jī)的類型都是direct的,因此路由鍵必須相同。
/**
* 聲明訂單業(yè)務(wù)隊(duì)列
* @return Queue
*/
@Bean
public Queue orderQueue() {
Map<String,Object> arguments = new HashMap<>(2);
// 綁定該隊(duì)列到私信交換機(jī)
arguments.put("x-dead-letter-exchange",dlxExchange);
arguments.put("x-dead-letter-routing-key",dlxRoutingKey);
return new Queue(orderQueue,true,false,false,arguments);
}
監(jiān)控頁(yè)面
在exchange列表中有剛剛創(chuàng)建的業(yè)務(wù)交換機(jī)sunspring_order_exchange和死信交換機(jī)
sunspring_dlx_exchange

在Queue列表中,有死信隊(duì)列sunspring_dlx_queue和業(yè)務(wù)隊(duì)列sunspring_order_queue
并且業(yè)務(wù)隊(duì)列上有DLX標(biāo)記,可見當(dāng)前隊(duì)列已經(jīng)綁定了一個(gè)死信隊(duì)列。DLK表示的路由鍵。

場(chǎng)景模擬
生產(chǎn)者
生產(chǎn)者發(fā)送了一個(gè)過(guò)期時(shí)間為10S的消息。
message.getMessageProperties().setExpiration(“10000”);
/**
* 發(fā)送帶有過(guò)期時(shí)間的消息
*/
@GetMapping("/sendDlx")
public void sendDlx() {
Order order = new Order();
order.setItemId(1);
order.setStatus(1);
rabbitTemplate.convertAndSend(orderExchange, orderRoutingKey,
JSON.toJSONString(order), message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
message.getMessageProperties().setExpiration("10000");
return message;
});
}
sunspring_order_queue接受到了一條消息,當(dāng)前消息的狀態(tài)是ready的,表示沒(méi)有任何消費(fèi)者消費(fèi)這條消息。

10s后,當(dāng)前消息路由到了死信隊(duì)列中,sunspring_order_queue消息數(shù)量變成0,sunspring_dlx_queue數(shù)量變成1。

消費(fèi)者,設(shè)置死信隊(duì)列監(jiān)聽
通過(guò)設(shè)置對(duì)死信隊(duì)列的監(jiān)聽,可以發(fā)現(xiàn),在Springboot啟動(dòng)之后,創(chuàng)建了對(duì)RabbitMQ的監(jiān)聽,死信隊(duì)列的消息也立刻被消費(fèi)了。
因此,我們可以監(jiān)聽死信隊(duì)列,對(duì)未被消費(fèi)的消息進(jìn)行下一步操作。如場(chǎng)景分析中的更改訂單狀態(tài)。
@RabbitListener(queues = "sunspring.dlx.queue")
public void dlxListener(Message message,Channel channel) throws IOException {
System.out.println(new String(message.getBody()));
//對(duì)消息進(jìn)行業(yè)務(wù)處理....
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
2019-08-20 20:05:05.158 INFO 4420 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [120.27.243.91:5672]
2019-08-20 20:05:05.224 INFO 4420 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#68ab0936:0/SimpleConnection@74606204 [delegate=amqp://[email protected]:5672/, localPort= 13563]
{"itemId":1,"status":1}
————————————————
版權(quán)聲明:本文為CSDN博主「小伙子你那什么車啊」的原創(chuàng)文章,遵循CC 4.0 BY-SA版權(quán)協(xié)議,轉(zhuǎn)載請(qǐng)附上原文出處鏈接及本聲明。
原文鏈接:
https://blog.csdn.net/shishishi777/article/details/99879419
粉絲福利:Java從入門到入土學(xué)習(xí)路線圖
??????

??長(zhǎng)按上方微信二維碼 2 秒
感謝點(diǎn)贊支持下哈 
