RabbitMQ延時隊列應用場景,學到了!
應用場景
我們系統(tǒng)未付款的訂單,超過一定時間后,需要系統(tǒng)自動取消訂單并釋放占有物品

常用的方案
就是利用Spring schedule定時任務,輪詢檢查數(shù)據(jù)庫
但是會消耗系統(tǒng)內(nèi)存,增加了數(shù)據(jù)庫的壓力、還存在較大的時間誤差

解決:rabbitmq的消息TTL和死信Exchange結合
介紹
1.何為消息TTL、死信
死信:對消息設置的過期時間到了,這個消息還沒有被消費就認為這個消息死了,死了的消息會進入死信交換機(Dead Letter Exchanges)
成為死信的三種條件:
一個消息被Consumer拒收了,并且reject方法的參數(shù)里requeue是false。也就是說不會被再次放在隊列里,被其他消費者使用。(basic.reject/ basic.nack)requeue=false
上面的消息的TTL到了,消息過期了。
隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上
消息TTL:消息的TTL就是消息的存活時間
RabbitMQ可以對隊列和消息都設置過期時間,但代表的都是一個意思,只要消息在設置時間內(nèi)沒有消費,消息就死了,就被稱為死信
如果隊列和消息都設置了過期時間,那么就取時間最小的,單個消息的過期時間才是延時隊列的關鍵
2.如何運作
設置隊列過期時間

消費者P會通過一個路由鍵deal.message發(fā)送消息給X交換機,然后繼續(xù)發(fā)送給delay queau隊列,這個隊列比較特殊,設置了過期時間5分鐘過期,還設置了x-dead-letter-exchange用于指定下一個接收的交換機,消息過期之后會成為死信直接進入delay.exchange交換機,利用x-dead-letter-routing-key綁定的路由鍵找到下一個隊列,這時候只需要有人監(jiān)聽這個隊列。
設置消息過期時間

消費者發(fā)送一個消息,設置了5分鐘過期時間,最后交給了延時隊列,延時隊列說消息死了不要亂放,指定了一個死信路由,用于找到下一個隊列的路由鍵,等到五分鐘后服務器會自動檢查是否過期,過期的話會交給delay.exchange路由,最后再交給delay.message
代碼模擬

下訂單成功先發(fā)動給order-event-exchange,order-event-exchange綁定了兩個路由鍵order.create.order、order.release.order,根據(jù)order.create.order路由鍵找到order.delay.queue隊列,這是一個特殊的隊列,上圖所訴,消息的存活時間為一分鐘,消息在order.delay.queue隊列中沒人使用變成死信了,交給order-event-exchange交換機,最后通過order.release.order綁定關系找到了order.release.order.queue隊列
@Configuration
public?class?MyMQConfig?{
????//監(jiān)聽最后一個隊列,獲取那些過期的訂單消息
????@RabbitListener(queues?=?"order.release.order.queue")
????public??void??listerner(OrderEntity?orderEntity,Channel?channel,Message?message)?throws?IOException?{
????????System.out.println("收到過期訂單信息"+orderEntity.getOrderSn());
????????channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
????}
????//特殊隊列
????@Bean
????public?Queue?orderDelayQueue()?{
????????Map?arguments?=?new?HashMap<>();
????????arguments.put("x-dead-letter-exchange",?"order-event-exchange");
????????arguments.put("x-dead-letter-routing-key",?"order.release.order");
????????arguments.put("x-message-ttl",?60000);
????????Queue?orderDelayQueue?=?new?Queue("order.delay.queue",?true,?false,?false,?arguments);
????????return?orderDelayQueue;
????}
????//最后接收死信消息的隊列
????@Bean
????public?Queue?orderReleaseQueue()?{
????????return?new?Queue("order.release.order.queue",?true,?false,?false);
????}
???//事件交換機
????@Bean
????public?Exchange?orderEventExchange()?{
????????return?new?TopicExchange("order-event-exchange",?true,?false);
????}
????//綁定order.delay.queue隊列和的order-event-exchange交換機的路由鍵
????@Bean
????public?Binding?orderCreateBingding()?{
????????return?new?Binding("order.delay.queue",?Binding.DestinationType.QUEUE,?"order-event-exchange",?"order.create.order",?null);
????}
????//綁定order.release.order.queue隊列和的order-event-exchange交換機的路由鍵
????@Bean
????public?Binding?orderReleaseBingding()?{
????????return?new?Binding("order.release.order.queue",?Binding.DestinationType.QUEUE,?"order-event-exchange",?"order.release.order",?null);
????}
?
}
測試
????@Autowired
????RabbitTemplate?rabbitTemplate;
????
????@ResponseBody
????@GetMapping("/test/createOrder")
????public?String?createOrderTest(){
????????OrderEntity?entity?=?new?OrderEntity();
????????entity.setOrderSn(UUID.randomUUID().toString());
????????entity.setModifyTime(new?Date());
????????rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",entity);
????????return?"ok";
????}
庫存解鎖實際場景
在庫存服務有個stock-event-exchange交換機,如果我們想要解鎖庫存,
1、首先訂單下成功、庫存鎖定成功
2、鎖定成功就要通過stock.locked路由鍵發(fā)送一個消息給交換機stock-event-exchange,消息內(nèi)容包括哪個訂單、哪些商品、多少庫存等等
3、交換機通過綁定關系再發(fā)送給延時隊列stock.delay.queue
4、訂單可能需要30分鐘才會自動關閉,50分鐘之后來檢查庫存,就會知道訂單支付沒有
5、50分鐘消息沒有被消息,就變?yōu)樗佬牛ㄟ^stock.release路由鍵綁定關系交給stock-event-exchange交換機
6、stock-event-exchange交換機通過stock.release路由鍵綁定關系找到strock.relelase.stock.queue隊列
7、所有的解鎖庫存服務就監(jiān)聽這個隊列里的消息,只要這個隊列里消息能夠到達的都是超時沒有支付訂單的
下單遠程鎖定庫存,然后將倉庫鎖定庫存的數(shù)據(jù)發(fā)給訂單,當在訂單下單失敗時,由于不是分布式事務,訂單回滾,但倉庫不回滾,所以訂單一失敗,就需要通過訂單拿到mq中倉庫傳來的數(shù)據(jù)通知倉庫解鎖庫存
庫存解鎖場景:
1、下訂單成功,訂單過期沒有支付被系統(tǒng)自動取消或者用戶手動取消,都要解鎖庫存
2、下訂單成功、庫存鎖定成功,但是業(yè)務調用失敗導致訂單回滾,之前鎖定的庫存就自動解鎖,Seata分布式事務太慢,就要用一段時間后自動解決庫存。
3、訂單失敗,因為鎖庫存失敗有一個商品沒有鎖成功,導致整個鎖庫存服務都回滾,
消息隊列收到庫存消息場景
消息隊列收到消息之后
如果沒有查到數(shù)據(jù)庫有鎖定成功的數(shù)據(jù),說明庫存鎖失敗了,鎖庫存自動回滾,數(shù)據(jù)庫查不到記錄無需解鎖
如果查到有數(shù)據(jù),就說明庫存鎖定成功了
沒有這個訂單必須解鎖庫存
有訂單,訂單沒人支付失效了才能解鎖庫存
定時關閉訂單實際場景

同上原理類似也是利用死信路由,訂單創(chuàng)建后,默認放入延時隊列,也就是訂單的有效時間,超過這個時間沒有支付或者用戶主動取消都會導致訂單信息進入order.release.order.queue隊列,最后被釋放
? 作者?|??cg-ww
來源 |??w.cnblogs.com/cg-ww/p/15449767.html

