領(lǐng)導(dǎo)看了我寫(xiě)的關(guān)閉超時(shí)訂單,讓我出門(mén)左轉(zhuǎn)!

作者:阿Q
本文來(lái)源于阿Q說(shuō)代碼
前幾天領(lǐng)導(dǎo)突然宣布幾年前停用的電商項(xiàng)目又重新啟動(dòng)了,帶著復(fù)雜的心情仔細(xì)賞閱“兒時(shí)”的代碼,心中的酸楚只有自己能夠體會(huì)。
這不,昨天又被領(lǐng)導(dǎo)叫進(jìn)了“小黑屋”,讓我把代碼重構(gòu)下進(jìn)行升級(jí)??吹竭@么“可愛(ài)”的代碼,心中一萬(wàn)只“xx馬”疾馳而過(guò)。
讓我最深?lèi)和从X(jué)的就是里邊竟然用定時(shí)任務(wù)實(shí)現(xiàn)了“關(guān)閉超時(shí)訂單”的功能,現(xiàn)在想來(lái),哭笑不得。我們先分析一波為什么大家都在抵制用定時(shí)任務(wù)來(lái)實(shí)現(xiàn)該功能。
定時(shí)任務(wù)
關(guān)閉超時(shí)訂單是在創(chuàng)建訂單之后的一段時(shí)間內(nèi)未完成支付而關(guān)閉訂單的操作,該功能一般要求每筆訂單的超時(shí)時(shí)間是一致的。
如果我們使用定時(shí)任務(wù)來(lái)進(jìn)行該操作,很難把握定時(shí)任務(wù)輪詢(xún)的時(shí)間間隔:
時(shí)間間隔足夠小,在誤差允許的范圍內(nèi)可以達(dá)到我們說(shuō)的時(shí)間一致性問(wèn)題,但是頻繁掃描數(shù)據(jù)庫(kù),執(zhí)行定時(shí)任務(wù),會(huì)造成網(wǎng)絡(luò)IO和磁盤(pán)IO的消耗,對(duì)實(shí)時(shí)交易造成一定的沖擊; 時(shí)間間隔比較大,由于每個(gè)訂單創(chuàng)建的時(shí)間不一致,所以上邊的一致性要求很難達(dá)到,舉例如下:

假設(shè)30分鐘訂單超時(shí)自動(dòng)關(guān)閉,定時(shí)任務(wù)的執(zhí)行間隔時(shí)間為30分鐘:
我們?cè)诘?分鐘進(jìn)行下單操作; 當(dāng)時(shí)間來(lái)到第30分鐘時(shí),定時(shí)任務(wù)執(zhí)行一次,但是我們的訂單未滿足條件,不執(zhí)行; 當(dāng)時(shí)間來(lái)到第35分鐘時(shí),訂單達(dá)到關(guān)閉條件,但是定時(shí)任務(wù)未執(zhí)行,所以不執(zhí)行; 當(dāng)時(shí)間來(lái)到第60分鐘時(shí),開(kāi)始執(zhí)行我們的訂單關(guān)閉操作,而此時(shí),誤差達(dá)到25分鐘。
經(jīng)此種種,我們需要舍棄該方式。
延時(shí)隊(duì)列
為了滿足領(lǐng)導(dǎo)的需求,我便將手伸向了消息隊(duì)列:RabbitMQ。盡管它本身并沒(méi)有提供延時(shí)隊(duì)列的功能,但是我們可以利用它的存活時(shí)間和死信交換機(jī)的特性來(lái)間接實(shí)現(xiàn)。
首先我們先來(lái)簡(jiǎn)單介紹下什么是存活時(shí)間?什么是死信交換機(jī)?
存活時(shí)間
存活時(shí)間的全拼是Time To Live,簡(jiǎn)稱(chēng) TTL。它既支持對(duì)消息本身進(jìn)行設(shè)置(延遲隊(duì)列的關(guān)鍵),又支持對(duì)隊(duì)列進(jìn)行設(shè)置(該隊(duì)列中所有消息存在相同的過(guò)期時(shí)間)。
對(duì)消息本身進(jìn)行設(shè)置:即使消息過(guò)期,也不會(huì)馬上從隊(duì)列中抹去,因?yàn)槊織l消息是否過(guò)期是在即將投遞到消費(fèi)者之前判定的; 對(duì)隊(duì)列進(jìn)行設(shè)置:一旦消息過(guò)期,就會(huì)從隊(duì)列中抹去;
如果同時(shí)使用這兩種方法,那么以過(guò)期時(shí)間小的那個(gè)數(shù)值為準(zhǔn)。當(dāng)消息達(dá)到過(guò)期時(shí)間還沒(méi)有被消費(fèi),那么該消息就“死了”,我們把它稱(chēng)為 死信 消息。
消息變?yōu)樗佬诺臈l件:
消息被拒絕( basic.reject/basic.nack),并且requeue=false;消息的過(guò)期時(shí)間到期了; 隊(duì)列達(dá)到最大長(zhǎng)度;
隊(duì)列設(shè)置注意事項(xiàng)
隊(duì)列中該屬性的設(shè)置要在第一次聲明隊(duì)列的時(shí)候設(shè)置才有效,如果隊(duì)列一開(kāi)始已存在且沒(méi)有這個(gè)屬性,則要?jiǎng)h掉隊(duì)列再重新聲明才可以; 隊(duì)列的 ttl只能被設(shè)置為某個(gè)固定的值,一旦設(shè)置后則不能更改,否則會(huì)拋出異常;
死信交換機(jī)
死信交換機(jī)全拼Dead-Letter-Exchange,簡(jiǎn)稱(chēng)DLX。
當(dāng)消息在一個(gè)隊(duì)列中變成死信之后,如果這個(gè)消息所在的隊(duì)列設(shè)置了x-dead-letter-exchange參數(shù),那么它會(huì)被發(fā)送到x-dead-letter-exchange對(duì)應(yīng)值的交換機(jī)上,這個(gè)交換機(jī)就稱(chēng)之為死信交換機(jī),與這個(gè)死信交換器綁定的隊(duì)列就是死信隊(duì)列。
x-dead-letter-exchange:出現(xiàn)死信之后將死信重新發(fā)送到指定交換機(jī);x-dead-letter-routing-key:出現(xiàn)死信之后將死信重新按照指定的routing-key發(fā)送,如果不設(shè)置默認(rèn)使用消息本身的routing-key
死信隊(duì)列與普通隊(duì)列的區(qū)別就是它的
RoutingKey和Exchange需要作為參數(shù),綁定到正常的隊(duì)列上。
實(shí)戰(zhàn)教學(xué)
先來(lái)張圖感受下我們的整體思路

生產(chǎn)者發(fā)送帶有 ttl的消息放入交換機(jī)路由到延時(shí)隊(duì)列中;在延時(shí)隊(duì)列中綁定死信交換機(jī)與死信轉(zhuǎn)發(fā)的 routing-key;等延時(shí)隊(duì)列中的消息達(dá)到延時(shí)時(shí)間之后變成死信轉(zhuǎn)發(fā)到死信交換機(jī)并路由到死信隊(duì)列中; 最后供消費(fèi)者消費(fèi)。
我們?cè)?a style="font-weight: bold;color: rgb(239, 112, 96);border-bottom: 1px solid rgb(239, 112, 96);" data-linktype="2">上文的基礎(chǔ)上進(jìn)行代碼實(shí)現(xiàn):
配置類(lèi)
@Configuration
public?class?DelayQueueRabbitConfig?{
????public?static?final?String?DLX_QUEUE?=?"queue.dlx";//死信隊(duì)列
????public?static?final?String?DLX_EXCHANGE?=?"exchange.dlx";//死信交換機(jī)
????public?static?final?String?DLX_ROUTING_KEY?=?"routingkey.dlx";//死信隊(duì)列與死信交換機(jī)綁定的routing-key
????public?static?final?String?ORDER_QUEUE?=?"queue.order";//訂單的延時(shí)隊(duì)列
????public?static?final?String?ORDER_EXCHANGE?=?"exchange.order";//訂單交換機(jī)
????public?static?final?String?ORDER_ROUTING_KEY?=?"routingkey.order";//延時(shí)隊(duì)列與訂單交換機(jī)綁定的routing-key
?/**
?????*?定義死信隊(duì)列
?????**/
????@Bean
????public?Queue?dlxQueue(){
????????return?new?Queue(DLX_QUEUE,true);
????}
????/**
?????*?定義死信交換機(jī)
?????**/
????@Bean
????public?DirectExchange?dlxExchange(){
????????return?new?DirectExchange(DLX_EXCHANGE,?true,?false);
????}
????/**
?????*?死信隊(duì)列和死信交換機(jī)綁定
?????*?設(shè)置路由鍵:routingkey.dlx
?????**/
????@Bean
????Binding?bindingDLX(){
????????return?BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);
????}
????/**
?????*?訂單延時(shí)隊(duì)列
?????*?設(shè)置隊(duì)列里的死信轉(zhuǎn)發(fā)到的DLX名稱(chēng)
?????*?設(shè)置死信在轉(zhuǎn)發(fā)時(shí)攜帶的?routing-key?名稱(chēng)
?????**/
????@Bean
????public?Queue?orderQueue()?{
????????Map?params?=?new?HashMap<>();
????????params.put("x-dead-letter-exchange",?DLX_EXCHANGE);
????????params.put("x-dead-letter-routing-key",?DLX_ROUTING_KEY);
????????return?new?Queue(ORDER_QUEUE,?true,?false,?false,?params);
????}
????/**
?????*?訂單交換機(jī)
?????**/
????@Bean
????public?DirectExchange?orderExchange()?{
????????return?new?DirectExchange(ORDER_EXCHANGE,?true,?false);
????}
????/**
?????*?把訂單隊(duì)列和訂單交換機(jī)綁定在一起
?????**/
????@Bean
????public?Binding?orderBinding()?{
????????return?BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY);
????}
}
發(fā)送消息
@RequestMapping("/order")
public?class?OrderSendMessageController?{
????@Autowired
????private?RabbitTemplate?rabbitTemplate;
????@GetMapping("/sendMessage")
????public?String?sendMessage(){
????????String?delayTime?=?"10000";
????????//將消息攜帶路由鍵值
????????rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE,?DelayQueueRabbitConfig.ORDER_ROUTING_KEY,
????????????????"發(fā)送消息!",message->{
????????????message.getMessageProperties().setExpiration(delayTime);
????????????return?message;
????????});
????????return?"ok";
????}
}
消費(fèi)消息
@Component
@RabbitListener(queues?=?DelayQueueRabbitConfig.DLX_QUEUE)//監(jiān)聽(tīng)隊(duì)列名稱(chēng)
public?class?OrderMQReciever?{
????@RabbitHandler
????public?void?process(String?message){
????????System.out.println("OrderMQReciever接收到的消息是:"+?message);
????}
}
測(cè)試
通過(guò)調(diào)用接口,發(fā)現(xiàn)10秒之后才會(huì)消費(fèi)消息

問(wèn)題升級(jí)
由于開(kāi)發(fā)環(huán)境和測(cè)試環(huán)境使用的是同一個(gè)交換機(jī)和隊(duì)列,所以發(fā)送的延時(shí)時(shí)間都是30分鐘。但是為了在測(cè)試環(huán)境讓測(cè)試同學(xué)方便測(cè)試,故手動(dòng)將測(cè)試環(huán)境的時(shí)間改為了1分鐘。
問(wèn)題復(fù)現(xiàn)
接著問(wèn)題就來(lái)了:延時(shí)時(shí)間為1分鐘的消息并沒(méi)有立即被消費(fèi),而是等30分鐘的消息被消費(fèi)完之后才被消費(fèi)了。至于原因,我們下邊再分析,先用代碼來(lái)給大家復(fù)現(xiàn)下該問(wèn)題。
@GetMapping("/sendManyMessage")
public?String?sendManyMessage(){
????send("延遲消息睡10秒",10000+"");
????send("延遲消息睡2秒",2000+"");
????send("延遲消息睡5秒",5000+"");
????return?"ok";
}
private?void?send(String?msg,?String?delayTime){
?rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE,?
??????????????????????????????????DelayQueueRabbitConfig.ORDER_ROUTING_KEY,
??????????????????????????????????msg,message->{
??????????????????????????????????????message.getMessageProperties().setExpiration(delayTime);
??????????????????????????????????????return?message;
??????????????????????????????????});
}
執(zhí)行結(jié)果如下:
OrderMQReciever接收到的消息是:延遲消息睡10秒
OrderMQReciever接收到的消息是:延遲消息睡2秒
OrderMQReciever接收到的消息是:延遲消息睡5秒
原因就是延時(shí)隊(duì)列也滿足隊(duì)列先進(jìn)先出的特征,當(dāng)10秒的消息未出隊(duì)列時(shí),后邊的消息不能順利出隊(duì),造成后邊的消息阻塞了,未能達(dá)到精準(zhǔn)延時(shí)。
問(wèn)題解決
我們可以利用x-delay-message插件來(lái)解決該問(wèn)題
消息的延遲范圍是 Delay > 0, Delay =< ?ERL_MAX_T(在 Erlang 中可以被設(shè)置的范圍為 (2^32)-1 毫秒)

生產(chǎn)者發(fā)送消息到交換機(jī)時(shí),并不會(huì)立即進(jìn)入,而是先將消息持久化到 Mnesia(一個(gè)分布式數(shù)據(jù)庫(kù)管理系統(tǒng));插件將會(huì)嘗試確認(rèn)消息是否過(guò)期; 如果消息過(guò)期,消息會(huì)通過(guò) x-delayed-type類(lèi)型標(biāo)記的交換機(jī)投遞至目標(biāo)隊(duì)列,供消費(fèi)者消費(fèi);
實(shí)踐
官網(wǎng)下載:https://www.rabbitmq.com/community-plugins.html
我這邊使用的是v3.8.0.ez,將文件下載下來(lái)放到服務(wù)器的/usr/local/soft/rabbitmq_server-3.7.14/plugins 路徑下,執(zhí)行rabbitmq-plugins enable rabbitmq_delayed_message_exchange命令即可。


出現(xiàn)如圖所示,代表安裝成功。
配置類(lèi)
@Configuration
public?class?XDelayedMessageConfig?{
????public?static?final?String?DIRECT_QUEUE?=?"queue.direct";//隊(duì)列
????public?static?final?String?DELAYED_EXCHANGE?=?"exchange.delayed";//延遲交換機(jī)
????public?static?final?String?ROUTING_KEY?=?"routingkey.bind";//綁定的routing-key
????/**
?????*?定義隊(duì)列
?????**/
????@Bean
????public?Queue?directQueue(){
????????return?new?Queue(DIRECT_QUEUE,true);
????}
????/**
?????*?定義延遲交換機(jī)
?????*?args:根據(jù)該參數(shù)進(jìn)行靈活路由,設(shè)置為“direct”,意味著該插件具有與直連交換機(jī)具有相同的路由行為,
?????*?如果想要不同的路由行為,可以更換現(xiàn)有的交換類(lèi)型如:“topic”
?????*?交換機(jī)類(lèi)型為?x-delayed-message
?????**/
????@Bean
????public?CustomExchange?delayedExchange(){
????????Map?args?=?new?HashMap();
????????args.put("x-delayed-type",?"direct");
????????return?new?CustomExchange(DELAYED_EXCHANGE,?"x-delayed-message",?true,?false,?args);
????}
????/**
?????*?隊(duì)列和延遲交換機(jī)綁定
?????**/
????@Bean
????public?Binding?orderBinding()?{
????????return?BindingBuilder.bind(directQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs();
????}
}
發(fā)送消息
@RestController
@RequestMapping("/delayed")
public?class?DelayedSendMessageController?{
????@Autowired
????private?RabbitTemplate?rabbitTemplate;
????@GetMapping("/sendManyMessage")
????public?String?sendManyMessage(){
????????send("延遲消息睡10秒",10000);
????????send("延遲消息睡2秒",2000);
????????send("延遲消息睡5秒",5000);
????????return?"ok";
????}
????private?void?send(String?msg,?Integer?delayTime){
????????//將消息攜帶路由鍵值
????????rabbitTemplate.convertAndSend(
????????????????XDelayedMessageConfig.DELAYED_EXCHANGE,
????????????????XDelayedMessageConfig.ROUTING_KEY,
????????????????msg,
????????????????message->{
????????????????????message.getMessageProperties().setDelay(delayTime);
????????????????????return?message;
????????????????});
????}
}
消費(fèi)消息
@Component
@RabbitListener(queues?=?XDelayedMessageConfig.DIRECT_QUEUE)//監(jiān)聽(tīng)隊(duì)列名稱(chēng)
public?class?DelayedMQReciever?{
????@RabbitHandler
????public?void?process(String?message){
????????System.out.println("DelayedMQReciever接收到的消息是:"+?message);
????}
}
測(cè)試
DelayedMQReciever接收到的消息是:延遲消息睡2秒
DelayedMQReciever接收到的消息是:延遲消息睡5秒
DelayedMQReciever接收到的消息是:延遲消息睡10秒
這樣我們的問(wèn)題就順利解決了。
局限性
延遲的消息存儲(chǔ)在一個(gè)Mnesia表中,當(dāng)前節(jié)點(diǎn)上只有一個(gè)磁盤(pán)副本,它們將在節(jié)點(diǎn)重啟后存活。
雖然觸發(fā)計(jì)劃交付的計(jì)時(shí)器不會(huì)持久化,但它將在節(jié)點(diǎn)啟動(dòng)時(shí)的插件激活期間重新初始化。顯然,集群中只有一個(gè)預(yù)定消息的副本意味著丟失該節(jié)點(diǎn)或禁用其上的插件將丟失駐留在該節(jié)點(diǎn)上的消息。
該插件的當(dāng)前設(shè)計(jì)并不適合延遲消息數(shù)量較多的場(chǎng)景(如數(shù)萬(wàn)條或數(shù)百萬(wàn)條),另外該插件的一個(gè)可變性來(lái)源是依賴(lài)于 Erlang 計(jì)時(shí)器,在系統(tǒng)中使用了一定數(shù)量的長(zhǎng)時(shí)間計(jì)時(shí)器之后,它們開(kāi)始爭(zhēng)用調(diào)度程序資源,并且時(shí)間漂移不斷累積。
項(xiàng)目源碼地址:https://gitee.com/zhangxiaoQ/rabbit-mq




