<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          領(lǐng)導(dǎo)看了同事寫的關(guān)閉超時(shí)訂單,讓他出門左轉(zhuǎn)!

          共 7923字,需瀏覽 16分鐘

           ·

          2022-04-24 19:57

          前幾天領(lǐng)導(dǎo)突然宣布幾年前停用的電商項(xiàng)目又重新啟動(dòng)了,帶著復(fù)雜的心情仔細(xì)賞閱“兒時(shí)”的代碼,心中的酸楚只有自己能夠體會(huì)。

          這不,昨天又被領(lǐng)導(dǎo)叫進(jìn)了“小黑屋”,讓我把代碼重構(gòu)下進(jìn)行升級(jí)??吹竭@么“可愛”的代碼,心中一萬(wàn)只“xx馬”疾馳而過。

          讓我最深惡痛覺的就是里邊竟然用定時(shí)任務(wù)實(shí)現(xiàn)了“關(guān)閉超時(shí)訂單”的功能,現(xiàn)在想來(lái),哭笑不得。我們先分析一波為什么大家都在抵制用定時(shí)任務(wù)來(lái)實(shí)現(xiàn)該功能。

          定時(shí)任務(wù)

          關(guān)閉超時(shí)訂單是在創(chuàng)建訂單之后的一段時(shí)間內(nèi)未完成支付而關(guān)閉訂單的操作,該功能一般要求每筆訂單的超時(shí)時(shí)間是一致的。

          如果我們使用定時(shí)任務(wù)來(lái)進(jìn)行該操作,很難把握定時(shí)任務(wù)輪詢的時(shí)間間隔:

          • 時(shí)間間隔足夠小,在誤差允許的范圍內(nèi)可以達(dá)到我們說(shuō)的時(shí)間一致性問題,但是頻繁掃描數(shù)據(jù)庫(kù),執(zhí)行定時(shí)任務(wù),會(huì)造成網(wǎng)絡(luò)IO和磁盤IO的消耗,對(duì)實(shí)時(shí)交易造成一定的沖擊;
          • 時(shí)間間隔比較大,由于每個(gè)訂單創(chuàng)建的時(shí)間不一致,所以上邊的一致性要求很難達(dá)到,舉例如下:

          假設(shè)30分鐘訂單超時(shí)自動(dòng)關(guān)閉,定時(shí)任務(wù)的執(zhí)行間隔時(shí)間為30分鐘:

          1. 我們?cè)诘?分鐘進(jìn)行下單操作;
          2. 當(dāng)時(shí)間來(lái)到第30分鐘時(shí),定時(shí)任務(wù)執(zhí)行一次,但是我們的訂單未滿足條件,不執(zhí)行;
          3. 當(dāng)時(shí)間來(lái)到第35分鐘時(shí),訂單達(dá)到關(guān)閉條件,但是定時(shí)任務(wù)未執(zhí)行,所以不執(zhí)行;
          4. 當(dāng)時(shí)間來(lái)到第60分鐘時(shí),開始執(zhí)行我們的訂單關(guān)閉操作,而此時(shí),誤差達(dá)到25分鐘。

          經(jīng)此種種,我們需要舍棄該方式。

          延時(shí)隊(duì)列

          為了滿足領(lǐng)導(dǎo)的需求,我便將手伸向了消息隊(duì)列:RabbitMQ。盡管它本身并沒有提供延時(shí)隊(duì)列的功能,但是我們可以利用它的存活時(shí)間和死信交換機(jī)的特性來(lái)間接實(shí)現(xiàn)。

          首先我們先來(lái)簡(jiǎn)單介紹下什么是存活時(shí)間?什么是死信交換機(jī)?

          存活時(shí)間

          存活時(shí)間的全拼是Time To Live,簡(jiǎn)稱 TTL。它既支持對(duì)消息本身進(jìn)行設(shè)置(延遲隊(duì)列的關(guān)鍵),又支持對(duì)隊(duì)列進(jìn)行設(shè)置(該隊(duì)列中所有消息存在相同的過期時(shí)間)。

          • 對(duì)消息本身進(jìn)行設(shè)置:即使消息過期,也不會(huì)馬上從隊(duì)列中抹去,因?yàn)槊織l消息是否過期是在即將投遞到消費(fèi)者之前判定的;
          • 對(duì)隊(duì)列進(jìn)行設(shè)置:一旦消息過期,就會(huì)從隊(duì)列中抹去;

          如果同時(shí)使用這兩種方法,那么以過期時(shí)間的那個(gè)數(shù)值為準(zhǔn)。當(dāng)消息達(dá)到過期時(shí)間還沒有被消費(fèi),那么該消息就“死了”,我們把它稱為 死信 消息。

          消息變?yōu)樗佬诺臈l件:

          • 消息被拒絕(basic.reject/basic.nack),并且requeue=false;
          • 消息的過期時(shí)間到期了;
          • 隊(duì)列達(dá)到最大長(zhǎng)度;

          隊(duì)列設(shè)置注意事項(xiàng)

          1. 隊(duì)列中該屬性的設(shè)置要在第一次聲明隊(duì)列的時(shí)候設(shè)置才有效,如果隊(duì)列一開始已存在且沒有這個(gè)屬性,則要?jiǎng)h掉隊(duì)列再重新聲明才可以;
          2. 隊(duì)列的 ttl 只能被設(shè)置為某個(gè)固定的值,一旦設(shè)置后則不能更改,否則會(huì)拋出異常;

          死信交換機(jī)

          死信交換機(jī)全拼Dead-Letter-Exchange,簡(jiǎn)稱DLX。

          當(dāng)消息在一個(gè)隊(duì)列中變成死信之后,如果這個(gè)消息所在的隊(duì)列設(shè)置了x-dead-letter-exchange參數(shù),那么它會(huì)被發(fā)送到x-dead-letter-exchange對(duì)應(yīng)值的交換機(jī)上,這個(gè)交換機(jī)就稱之為死信交換機(jī),與這個(gè)死信交換器綁定的隊(duì)列就是死信隊(duì)列。

          • x-dead-letter-exchange:出現(xiàn)死信之后將死信重新發(fā)送到指定交換機(jī);
          • x-dead-letter-routing-key:出現(xiàn)死信之后將死信重新按照指定的routing-key發(fā)送,如果不設(shè)置默認(rèn)使用消息本身的routing-key

          死信隊(duì)列與普通隊(duì)列的區(qū)別就是它的RoutingKeyExchange需要作為參數(shù),綁定到正常的隊(duì)列上。

          實(shí)戰(zhàn)教學(xué)

          先來(lái)張圖感受下我們的整體思路

          1. 生產(chǎn)者發(fā)送帶有 ttl 的消息放入交換機(jī)路由到延時(shí)隊(duì)列中;
          2. 在延時(shí)隊(duì)列中綁定死信交換機(jī)與死信轉(zhuǎn)發(fā)的routing-key;
          3. 等延時(shí)隊(duì)列中的消息達(dá)到延時(shí)時(shí)間之后變成死信轉(zhuǎn)發(fā)到死信交換機(jī)并路由到死信隊(duì)列中;
          4. 最后供消費(fèi)者消費(fèi)。

          我們?cè)谏衔牡幕A(chǔ)上進(jìn)行代碼實(shí)現(xiàn):

          配置類

          @Configuration
          public?class?DelayQueueRabbitConfig?{

          ????public?static?final?String?DLX_QUEUE?=?"queue.dlx";//死信隊(duì)列
          ????public?static?final?String?DLX_EXCHANGE?=?"exchange.dlx";//死信交換機(jī)
          ????public?static?final?String?DLX_ROUTING_KEY?=?"routingkey.dlx";//死信隊(duì)列與死信交換機(jī)綁定的routing-key

          ????public?static?final?String?ORDER_QUEUE?=?"queue.order";//訂單的延時(shí)隊(duì)列
          ????public?static?final?String?ORDER_EXCHANGE?=?"exchange.order";//訂單交換機(jī)
          ????public?static?final?String?ORDER_ROUTING_KEY?=?"routingkey.order";//延時(shí)隊(duì)列與訂單交換機(jī)綁定的routing-key

          ?/**
          ?????*?定義死信隊(duì)列
          ?????**/

          ????@Bean
          ????public?Queue?dlxQueue(){
          ????????return?new?Queue(DLX_QUEUE,true);
          ????}

          ????/**
          ?????*?定義死信交換機(jī)
          ?????**/

          ????@Bean
          ????public?DirectExchange?dlxExchange(){
          ????????return?new?DirectExchange(DLX_EXCHANGE,?true,?false);
          ????}


          ????/**
          ?????*?死信隊(duì)列和死信交換機(jī)綁定
          ?????*?設(shè)置路由鍵:routingkey.dlx
          ?????**/

          ????@Bean
          ????Binding?bindingDLX(){
          ????????return?BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);
          ????}


          ????/**
          ?????*?訂單延時(shí)隊(duì)列
          ?????*?設(shè)置隊(duì)列里的死信轉(zhuǎn)發(fā)到的DLX名稱
          ?????*?設(shè)置死信在轉(zhuǎn)發(fā)時(shí)攜帶的?routing-key?名稱
          ?????**/

          ????@Bean
          ????public?Queue?orderQueue()?{
          ????????Map?params?=?new?HashMap<>();
          ????????params.put("x-dead-letter-exchange",?DLX_EXCHANGE);
          ????????params.put("x-dead-letter-routing-key",?DLX_ROUTING_KEY);
          ????????return?new?Queue(ORDER_QUEUE,?true,?false,?false,?params);
          ????}

          ????/**
          ?????*?訂單交換機(jī)
          ?????**/

          ????@Bean
          ????public?DirectExchange?orderExchange()?{
          ????????return?new?DirectExchange(ORDER_EXCHANGE,?true,?false);
          ????}

          ????/**
          ?????*?把訂單隊(duì)列和訂單交換機(jī)綁定在一起
          ?????**/

          ????@Bean
          ????public?Binding?orderBinding()?{
          ????????return?BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY);
          ????}
          }

          發(fā)送消息

          @RequestMapping("/order")
          public?class?OrderSendMessageController?{

          ????@Autowired
          ????private?RabbitTemplate?rabbitTemplate;

          ????@GetMapping("/sendMessage")
          ????public?String?sendMessage(){

          ????????String?delayTime?=?"10000";
          ????????//將消息攜帶路由鍵值
          ????????rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE,?DelayQueueRabbitConfig.ORDER_ROUTING_KEY,
          ????????????????"發(fā)送消息!",message->{
          ????????????message.getMessageProperties().setExpiration(delayTime);
          ????????????return?message;
          ????????});
          ????????return?"ok";
          ????}

          }

          消費(fèi)消息

          @Component
          @RabbitListener(queues?=?DelayQueueRabbitConfig.DLX_QUEUE)//監(jiān)聽隊(duì)列名稱
          public?class?OrderMQReciever?{

          ????@RabbitHandler
          ????public?void?process(String?message){
          ????????System.out.println("OrderMQReciever接收到的消息是:"+?message);
          ????}
          }

          測(cè)試

          通過調(diào)用接口,發(fā)現(xiàn)10秒之后才會(huì)消費(fèi)消息

          問題升級(jí)

          由于開發(fā)環(huán)境和測(cè)試環(huán)境使用的是同一個(gè)交換機(jī)和隊(duì)列,所以發(fā)送的延時(shí)時(shí)間都是30分鐘。但是為了在測(cè)試環(huán)境讓測(cè)試同學(xué)方便測(cè)試,故手動(dòng)將測(cè)試環(huán)境的時(shí)間改為了1分鐘。

          問題復(fù)現(xiàn)

          接著問題就來(lái)了:延時(shí)時(shí)間為1分鐘的消息并沒有立即被消費(fèi),而是等30分鐘的消息被消費(fèi)完之后才被消費(fèi)了。至于原因,我們下邊再分析,先用代碼來(lái)給大家復(fù)現(xiàn)下該問題。

          @GetMapping("/sendManyMessage")
          public?String?sendManyMessage(){
          ????send("延遲消息睡10秒",10000+"");
          ????send("延遲消息睡2秒",2000+"");
          ????send("延遲消息睡5秒",5000+"");
          ????return?"ok";
          }

          private?void?send(String?msg,?String?delayTime){
          ?rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE,?
          ??????????????????????????????????DelayQueueRabbitConfig.ORDER_ROUTING_KEY,
          ??????????????????????????????????msg,message->{
          ??????????????????????????????????????message.getMessageProperties().setExpiration(delayTime);
          ??????????????????????????????????????return?message;
          ??????????????????????????????????});
          }

          執(zhí)行結(jié)果如下:

          OrderMQReciever接收到的消息是:延遲消息睡10秒
          OrderMQReciever接收到的消息是:延遲消息睡2秒
          OrderMQReciever接收到的消息是:延遲消息睡5秒

          原因就是延時(shí)隊(duì)列也滿足隊(duì)列先進(jìn)先出的特征,當(dāng)10秒的消息未出隊(duì)列時(shí),后邊的消息不能順利出隊(duì),造成后邊的消息阻塞了,未能達(dá)到精準(zhǔn)延時(shí)。

          問題解決

          我們可以利用x-delay-message插件來(lái)解決該問題

          消息的延遲范圍是 Delay > 0, Delay =< ?ERL_MAX_T(在 Erlang 中可以被設(shè)置的范圍為 (2^32)-1 毫秒)

          1. 生產(chǎn)者發(fā)送消息到交換機(jī)時(shí),并不會(huì)立即進(jìn)入,而是先將消息持久化到 Mnesia(一個(gè)分布式數(shù)據(jù)庫(kù)管理系統(tǒng));
          2. 插件將會(huì)嘗試確認(rèn)消息是否過期;
          3. 如果消息過期,消息會(huì)通過 x-delayed-type 類型標(biāo)記的交換機(jī)投遞至目標(biāo)隊(duì)列,供消費(fèi)者消費(fèi);

          實(shí)踐

          官網(wǎng)下載:https://www.rabbitmq.com/community-plugins.html

          我這邊使用的是v3.8.0.ez,將文件下載下來(lái)放到服務(wù)器的/usr/local/soft/rabbitmq_server-3.7.14/plugins 路徑下,執(zhí)行rabbitmq-plugins enable rabbitmq_delayed_message_exchange命令即可。

          出現(xiàn)如圖所示,代表安裝成功。

          配置類

          @Configuration
          public?class?XDelayedMessageConfig?{

          ????public?static?final?String?DIRECT_QUEUE?=?"queue.direct";//隊(duì)列
          ????public?static?final?String?DELAYED_EXCHANGE?=?"exchange.delayed";//延遲交換機(jī)
          ????public?static?final?String?ROUTING_KEY?=?"routingkey.bind";//綁定的routing-key

          ????/**
          ?????*?定義隊(duì)列
          ?????**/

          ????@Bean
          ????public?Queue?directQueue(){
          ????????return?new?Queue(DIRECT_QUEUE,true);
          ????}

          ????/**
          ?????*?定義延遲交換機(jī)
          ?????*?args:根據(jù)該參數(shù)進(jìn)行靈活路由,設(shè)置為“direct”,意味著該插件具有與直連交換機(jī)具有相同的路由行為,
          ?????*?如果想要不同的路由行為,可以更換現(xiàn)有的交換類型如:“topic”
          ?????*?交換機(jī)類型為?x-delayed-message
          ?????**/

          ????@Bean
          ????public?CustomExchange?delayedExchange(){
          ????????Map?args?=?new?HashMap();
          ????????args.put("x-delayed-type",?"direct");
          ????????return?new?CustomExchange(DELAYED_EXCHANGE,?"x-delayed-message",?true,?false,?args);
          ????}

          ????/**
          ?????*?隊(duì)列和延遲交換機(jī)綁定
          ?????**/

          ????@Bean
          ????public?Binding?orderBinding()?{
          ????????return?BindingBuilder.bind(directQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs();
          ????}

          }

          發(fā)送消息

          @RestController
          @RequestMapping("/delayed")
          public?class?DelayedSendMessageController?{

          ????@Autowired
          ????private?RabbitTemplate?rabbitTemplate;

          ????@GetMapping("/sendManyMessage")
          ????public?String?sendManyMessage(){

          ????????send("延遲消息睡10秒",10000);
          ????????send("延遲消息睡2秒",2000);
          ????????send("延遲消息睡5秒",5000);
          ????????return?"ok";
          ????}

          ????private?void?send(String?msg,?Integer?delayTime){
          ????????//將消息攜帶路由鍵值
          ????????rabbitTemplate.convertAndSend(
          ????????????????XDelayedMessageConfig.DELAYED_EXCHANGE,
          ????????????????XDelayedMessageConfig.ROUTING_KEY,
          ????????????????msg,
          ????????????????message->{
          ????????????????????message.getMessageProperties().setDelay(delayTime);
          ????????????????????return?message;
          ????????????????});
          ????}
          }

          消費(fèi)消息

          @Component
          @RabbitListener(queues?=?XDelayedMessageConfig.DIRECT_QUEUE)//監(jiān)聽隊(duì)列名稱
          public?class?DelayedMQReciever?{


          ????@RabbitHandler
          ????public?void?process(String?message){
          ????????System.out.println("DelayedMQReciever接收到的消息是:"+?message);
          ????}
          }

          測(cè)試

          DelayedMQReciever接收到的消息是:延遲消息睡2秒
          DelayedMQReciever接收到的消息是:延遲消息睡5秒
          DelayedMQReciever接收到的消息是:延遲消息睡10秒

          這樣我們的問題就順利解決了。

          局限性

          延遲的消息存儲(chǔ)在一個(gè)Mnesia表中,當(dāng)前節(jié)點(diǎn)上只有一個(gè)磁盤副本,它們將在節(jié)點(diǎn)重啟后存活。

          雖然觸發(fā)計(jì)劃交付的計(jì)時(shí)器不會(huì)持久化,但它將在節(jié)點(diǎn)啟動(dòng)時(shí)的插件激活期間重新初始化。顯然,集群中只有一個(gè)預(yù)定消息的副本意味著丟失該節(jié)點(diǎn)或禁用其上的插件將丟失駐留在該節(jié)點(diǎn)上的消息。

          該插件的當(dāng)前設(shè)計(jì)并不適合延遲消息數(shù)量較多的場(chǎng)景(如數(shù)萬(wàn)條或數(shù)百萬(wàn)條),另外該插件的一個(gè)可變性來(lái)源是依賴于 Erlang 計(jì)時(shí)器,在系統(tǒng)中使用了一定數(shù)量的長(zhǎng)時(shí)間計(jì)時(shí)器之后,它們開始爭(zhēng)用調(diào)度程序資源,并且時(shí)間漂移不斷累積。

          項(xiàng)目源碼地址:https://gitee.com/zhangxiaoQ/rabbit-mq

          程序汪資料鏈接

          程序汪接的7個(gè)私活都在這里,經(jīng)驗(yàn)整理

          Java項(xiàng)目分享 ?最新整理全集,找項(xiàng)目不累啦 07版

          堪稱神級(jí)的Spring Boot手冊(cè),從基礎(chǔ)入門到實(shí)戰(zhàn)進(jìn)階

          臥槽!字節(jié)跳動(dòng)《算法中文手冊(cè)》火了,完整版 PDF 開放下載!

          臥槽!阿里大佬總結(jié)的《圖解Java》火了,完整版PDF開放下載!

          字節(jié)跳動(dòng)總結(jié)的設(shè)計(jì)模式 PDF 火了,完整版開放下載!


          歡迎添加程序汪個(gè)人微信 itwang009? 進(jìn)粉絲群或圍觀朋友圈

          瀏覽 15
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  五月激情逼 | 国产三区在线观看视频 | 99久久久无码国产精精品品不卡 | 天天草天天搞 | 免费成年人在线 |