<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          領(lǐng)導(dǎo)看了我寫(xiě)的關(guān)閉超時(shí)訂單,讓我出門(mén)左轉(zhuǎn)!

          共 7880字,需瀏覽 16分鐘

           ·

          2022-04-28 17:47

          作者:阿Q

          本文來(lái)源于阿Q說(shuō)代碼


          前幾天領(lǐng)導(dǎo)突然宣布幾年前停用的電商項(xiàng)目又重新啟動(dòng)了,帶著復(fù)雜的心情仔細(xì)賞閱“兒時(shí)”的代碼,心中的酸楚只有自己能夠體會(huì)。

          這不,昨天又被領(lǐng)導(dǎo)叫進(jìn)了“小黑屋”,讓我把代碼重構(gòu)下進(jìn)行升級(jí)??吹竭@么“可愛(ài)”的代碼,心中一萬(wàn)只“xx馬”疾馳而過(guò)。

          讓我最深?lèi)和从X(jué)的就是里邊竟然用定時(shí)任務(wù)實(shí)現(xiàn)了“關(guān)閉超時(shí)訂單”的功能,現(xiàn)在想來(lái),哭笑不得。我們先分析一波為什么大家都在抵制用定時(shí)任務(wù)來(lái)實(shí)現(xiàn)該功能。

          定時(shí)任務(wù)

          關(guān)閉超時(shí)訂單是在創(chuàng)建訂單之后的一段時(shí)間內(nèi)未完成支付而關(guān)閉訂單的操作,該功能一般要求每筆訂單的超時(shí)時(shí)間是一致的。

          如果我們使用定時(shí)任務(wù)來(lái)進(jìn)行該操作,很難把握定時(shí)任務(wù)輪詢(xún)的時(shí)間間隔:

          • 時(shí)間間隔足夠小,在誤差允許的范圍內(nèi)可以達(dá)到我們說(shuō)的時(shí)間一致性問(wèn)題,但是頻繁掃描數(shù)據(jù)庫(kù),執(zhí)行定時(shí)任務(wù),會(huì)造成網(wǎng)絡(luò)IO和磁盤(pán)IO的消耗,對(duì)實(shí)時(shí)交易造成一定的沖擊;
          • 時(shí)間間隔比較大,由于每個(gè)訂單創(chuàng)建的時(shí)間不一致,所以上邊的一致性要求很難達(dá)到,舉例如下:

          假設(shè)30分鐘訂單超時(shí)自動(dòng)關(guān)閉,定時(shí)任務(wù)的執(zhí)行間隔時(shí)間為30分鐘:

          1. 我們?cè)诘?分鐘進(jìn)行下單操作;
          2. 當(dāng)時(shí)間來(lái)到第30分鐘時(shí),定時(shí)任務(wù)執(zhí)行一次,但是我們的訂單未滿足條件,不執(zhí)行;
          3. 當(dāng)時(shí)間來(lái)到第35分鐘時(shí),訂單達(dá)到關(guān)閉條件,但是定時(shí)任務(wù)未執(zhí)行,所以不執(zhí)行;
          4. 當(dāng)時(shí)間來(lái)到第60分鐘時(shí),開(kāi)始執(zhí)行我們的訂單關(guān)閉操作,而此時(shí),誤差達(dá)到25分鐘。

          經(jīng)此種種,我們需要舍棄該方式。

          延時(shí)隊(duì)列

          為了滿足領(lǐng)導(dǎo)的需求,我便將手伸向了消息隊(duì)列:RabbitMQ。盡管它本身并沒(méi)有提供延時(shí)隊(duì)列的功能,但是我們可以利用它的存活時(shí)間和死信交換機(jī)的特性來(lái)間接實(shí)現(xiàn)。

          首先我們先來(lái)簡(jiǎn)單介紹下什么是存活時(shí)間?什么是死信交換機(jī)?

          存活時(shí)間

          存活時(shí)間的全拼是Time To Live,簡(jiǎn)稱(chēng) TTL。它既支持對(duì)消息本身進(jìn)行設(shè)置(延遲隊(duì)列的關(guān)鍵),又支持對(duì)隊(duì)列進(jìn)行設(shè)置(該隊(duì)列中所有消息存在相同的過(guò)期時(shí)間)。

          • 對(duì)消息本身進(jìn)行設(shè)置:即使消息過(guò)期,也不會(huì)馬上從隊(duì)列中抹去,因?yàn)槊織l消息是否過(guò)期是在即將投遞到消費(fèi)者之前判定的;
          • 對(duì)隊(duì)列進(jìn)行設(shè)置:一旦消息過(guò)期,就會(huì)從隊(duì)列中抹去;

          如果同時(shí)使用這兩種方法,那么以過(guò)期時(shí)間的那個(gè)數(shù)值為準(zhǔn)。當(dāng)消息達(dá)到過(guò)期時(shí)間還沒(méi)有被消費(fèi),那么該消息就“死了”,我們把它稱(chēng)為 死信 消息。

          消息變?yōu)樗佬诺臈l件:

          • 消息被拒絕(basic.reject/basic.nack),并且requeue=false;
          • 消息的過(guò)期時(shí)間到期了;
          • 隊(duì)列達(dá)到最大長(zhǎng)度;

          隊(duì)列設(shè)置注意事項(xiàng)

          1. 隊(duì)列中該屬性的設(shè)置要在第一次聲明隊(duì)列的時(shí)候設(shè)置才有效,如果隊(duì)列一開(kāi)始已存在且沒(méi)有這個(gè)屬性,則要?jiǎng)h掉隊(duì)列再重新聲明才可以;
          2. 隊(duì)列的 ttl 只能被設(shè)置為某個(gè)固定的值,一旦設(shè)置后則不能更改,否則會(huì)拋出異常;

          死信交換機(jī)

          死信交換機(jī)全拼Dead-Letter-Exchange,簡(jiǎn)稱(chēng)DLX

          當(dāng)消息在一個(gè)隊(duì)列中變成死信之后,如果這個(gè)消息所在的隊(duì)列設(shè)置了x-dead-letter-exchange參數(shù),那么它會(huì)被發(fā)送到x-dead-letter-exchange對(duì)應(yīng)值的交換機(jī)上,這個(gè)交換機(jī)就稱(chēng)之為死信交換機(jī),與這個(gè)死信交換器綁定的隊(duì)列就是死信隊(duì)列。

          • x-dead-letter-exchange:出現(xiàn)死信之后將死信重新發(fā)送到指定交換機(jī);
          • x-dead-letter-routing-key:出現(xiàn)死信之后將死信重新按照指定的routing-key發(fā)送,如果不設(shè)置默認(rèn)使用消息本身的routing-key

          死信隊(duì)列與普通隊(duì)列的區(qū)別就是它的RoutingKeyExchange需要作為參數(shù),綁定到正常的隊(duì)列上。

          實(shí)戰(zhàn)教學(xué)

          先來(lái)張圖感受下我們的整體思路

          1. 生產(chǎn)者發(fā)送帶有 ttl 的消息放入交換機(jī)路由到延時(shí)隊(duì)列中;
          2. 在延時(shí)隊(duì)列中綁定死信交換機(jī)與死信轉(zhuǎn)發(fā)的routing-key
          3. 等延時(shí)隊(duì)列中的消息達(dá)到延時(shí)時(shí)間之后變成死信轉(zhuǎn)發(fā)到死信交換機(jī)并路由到死信隊(duì)列中;
          4. 最后供消費(fèi)者消費(fèi)。

          我們?cè)?a style="font-weight: bold;color: rgb(239, 112, 96);border-bottom: 1px solid rgb(239, 112, 96);" data-linktype="2">上文的基礎(chǔ)上進(jìn)行代碼實(shí)現(xiàn):

          配置類(lèi)

          @Configuration
          public?class?DelayQueueRabbitConfig?{

          ????public?static?final?String?DLX_QUEUE?=?"queue.dlx";//死信隊(duì)列
          ????public?static?final?String?DLX_EXCHANGE?=?"exchange.dlx";//死信交換機(jī)
          ????public?static?final?String?DLX_ROUTING_KEY?=?"routingkey.dlx";//死信隊(duì)列與死信交換機(jī)綁定的routing-key

          ????public?static?final?String?ORDER_QUEUE?=?"queue.order";//訂單的延時(shí)隊(duì)列
          ????public?static?final?String?ORDER_EXCHANGE?=?"exchange.order";//訂單交換機(jī)
          ????public?static?final?String?ORDER_ROUTING_KEY?=?"routingkey.order";//延時(shí)隊(duì)列與訂單交換機(jī)綁定的routing-key

          ?/**
          ?????*?定義死信隊(duì)列
          ?????**/

          ????@Bean
          ????public?Queue?dlxQueue(){
          ????????return?new?Queue(DLX_QUEUE,true);
          ????}

          ????/**
          ?????*?定義死信交換機(jī)
          ?????**/

          ????@Bean
          ????public?DirectExchange?dlxExchange(){
          ????????return?new?DirectExchange(DLX_EXCHANGE,?true,?false);
          ????}


          ????/**
          ?????*?死信隊(duì)列和死信交換機(jī)綁定
          ?????*?設(shè)置路由鍵:routingkey.dlx
          ?????**/

          ????@Bean
          ????Binding?bindingDLX(){
          ????????return?BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);
          ????}


          ????/**
          ?????*?訂單延時(shí)隊(duì)列
          ?????*?設(shè)置隊(duì)列里的死信轉(zhuǎn)發(fā)到的DLX名稱(chēng)
          ?????*?設(shè)置死信在轉(zhuǎn)發(fā)時(shí)攜帶的?routing-key?名稱(chēng)
          ?????**/

          ????@Bean
          ????public?Queue?orderQueue()?{
          ????????Map?params?=?new?HashMap<>();
          ????????params.put("x-dead-letter-exchange",?DLX_EXCHANGE);
          ????????params.put("x-dead-letter-routing-key",?DLX_ROUTING_KEY);
          ????????return?new?Queue(ORDER_QUEUE,?true,?false,?false,?params);
          ????}

          ????/**
          ?????*?訂單交換機(jī)
          ?????**/

          ????@Bean
          ????public?DirectExchange?orderExchange()?{
          ????????return?new?DirectExchange(ORDER_EXCHANGE,?true,?false);
          ????}

          ????/**
          ?????*?把訂單隊(duì)列和訂單交換機(jī)綁定在一起
          ?????**/

          ????@Bean
          ????public?Binding?orderBinding()?{
          ????????return?BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY);
          ????}
          }

          發(fā)送消息

          @RequestMapping("/order")
          public?class?OrderSendMessageController?{

          ????@Autowired
          ????private?RabbitTemplate?rabbitTemplate;

          ????@GetMapping("/sendMessage")
          ????public?String?sendMessage(){

          ????????String?delayTime?=?"10000";
          ????????//將消息攜帶路由鍵值
          ????????rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE,?DelayQueueRabbitConfig.ORDER_ROUTING_KEY,
          ????????????????"發(fā)送消息!",message->{
          ????????????message.getMessageProperties().setExpiration(delayTime);
          ????????????return?message;
          ????????});
          ????????return?"ok";
          ????}

          }

          消費(fèi)消息

          @Component
          @RabbitListener(queues?=?DelayQueueRabbitConfig.DLX_QUEUE)//監(jiān)聽(tīng)隊(duì)列名稱(chēng)
          public?class?OrderMQReciever?{

          ????@RabbitHandler
          ????public?void?process(String?message){
          ????????System.out.println("OrderMQReciever接收到的消息是:"+?message);
          ????}
          }

          測(cè)試

          通過(guò)調(diào)用接口,發(fā)現(xiàn)10秒之后才會(huì)消費(fèi)消息

          問(wèn)題升級(jí)

          由于開(kāi)發(fā)環(huán)境和測(cè)試環(huán)境使用的是同一個(gè)交換機(jī)和隊(duì)列,所以發(fā)送的延時(shí)時(shí)間都是30分鐘。但是為了在測(cè)試環(huán)境讓測(cè)試同學(xué)方便測(cè)試,故手動(dòng)將測(cè)試環(huán)境的時(shí)間改為了1分鐘。

          問(wèn)題復(fù)現(xiàn)

          接著問(wèn)題就來(lái)了:延時(shí)時(shí)間為1分鐘的消息并沒(méi)有立即被消費(fèi),而是等30分鐘的消息被消費(fèi)完之后才被消費(fèi)了。至于原因,我們下邊再分析,先用代碼來(lái)給大家復(fù)現(xiàn)下該問(wèn)題。

          @GetMapping("/sendManyMessage")
          public?String?sendManyMessage(){
          ????send("延遲消息睡10秒",10000+"");
          ????send("延遲消息睡2秒",2000+"");
          ????send("延遲消息睡5秒",5000+"");
          ????return?"ok";
          }

          private?void?send(String?msg,?String?delayTime){
          ?rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE,?
          ??????????????????????????????????DelayQueueRabbitConfig.ORDER_ROUTING_KEY,
          ??????????????????????????????????msg,message->{
          ??????????????????????????????????????message.getMessageProperties().setExpiration(delayTime);
          ??????????????????????????????????????return?message;
          ??????????????????????????????????});
          }

          執(zhí)行結(jié)果如下:

          OrderMQReciever接收到的消息是:延遲消息睡10秒
          OrderMQReciever接收到的消息是:延遲消息睡2秒
          OrderMQReciever接收到的消息是:延遲消息睡5秒

          原因就是延時(shí)隊(duì)列也滿足隊(duì)列先進(jìn)先出的特征,當(dāng)10秒的消息未出隊(duì)列時(shí),后邊的消息不能順利出隊(duì),造成后邊的消息阻塞了,未能達(dá)到精準(zhǔn)延時(shí)。

          問(wèn)題解決

          我們可以利用x-delay-message插件來(lái)解決該問(wèn)題

          消息的延遲范圍是 Delay > 0, Delay =< ?ERL_MAX_T(在 Erlang 中可以被設(shè)置的范圍為 (2^32)-1 毫秒)

          1. 生產(chǎn)者發(fā)送消息到交換機(jī)時(shí),并不會(huì)立即進(jìn)入,而是先將消息持久化到 Mnesia(一個(gè)分布式數(shù)據(jù)庫(kù)管理系統(tǒng));
          2. 插件將會(huì)嘗試確認(rèn)消息是否過(guò)期;
          3. 如果消息過(guò)期,消息會(huì)通過(guò) x-delayed-type 類(lèi)型標(biāo)記的交換機(jī)投遞至目標(biāo)隊(duì)列,供消費(fèi)者消費(fèi);

          實(shí)踐

          官網(wǎng)下載:https://www.rabbitmq.com/community-plugins.html

          我這邊使用的是v3.8.0.ez,將文件下載下來(lái)放到服務(wù)器的/usr/local/soft/rabbitmq_server-3.7.14/plugins 路徑下,執(zhí)行rabbitmq-plugins enable rabbitmq_delayed_message_exchange命令即可。

          出現(xiàn)如圖所示,代表安裝成功。

          配置類(lèi)

          @Configuration
          public?class?XDelayedMessageConfig?{

          ????public?static?final?String?DIRECT_QUEUE?=?"queue.direct";//隊(duì)列
          ????public?static?final?String?DELAYED_EXCHANGE?=?"exchange.delayed";//延遲交換機(jī)
          ????public?static?final?String?ROUTING_KEY?=?"routingkey.bind";//綁定的routing-key

          ????/**
          ?????*?定義隊(duì)列
          ?????**/

          ????@Bean
          ????public?Queue?directQueue(){
          ????????return?new?Queue(DIRECT_QUEUE,true);
          ????}

          ????/**
          ?????*?定義延遲交換機(jī)
          ?????*?args:根據(jù)該參數(shù)進(jìn)行靈活路由,設(shè)置為“direct”,意味著該插件具有與直連交換機(jī)具有相同的路由行為,
          ?????*?如果想要不同的路由行為,可以更換現(xiàn)有的交換類(lèi)型如:“topic”
          ?????*?交換機(jī)類(lèi)型為?x-delayed-message
          ?????**/

          ????@Bean
          ????public?CustomExchange?delayedExchange(){
          ????????Map?args?=?new?HashMap();
          ????????args.put("x-delayed-type",?"direct");
          ????????return?new?CustomExchange(DELAYED_EXCHANGE,?"x-delayed-message",?true,?false,?args);
          ????}

          ????/**
          ?????*?隊(duì)列和延遲交換機(jī)綁定
          ?????**/

          ????@Bean
          ????public?Binding?orderBinding()?{
          ????????return?BindingBuilder.bind(directQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs();
          ????}

          }

          發(fā)送消息

          @RestController
          @RequestMapping("/delayed")
          public?class?DelayedSendMessageController?{

          ????@Autowired
          ????private?RabbitTemplate?rabbitTemplate;

          ????@GetMapping("/sendManyMessage")
          ????public?String?sendManyMessage(){

          ????????send("延遲消息睡10秒",10000);
          ????????send("延遲消息睡2秒",2000);
          ????????send("延遲消息睡5秒",5000);
          ????????return?"ok";
          ????}

          ????private?void?send(String?msg,?Integer?delayTime){
          ????????//將消息攜帶路由鍵值
          ????????rabbitTemplate.convertAndSend(
          ????????????????XDelayedMessageConfig.DELAYED_EXCHANGE,
          ????????????????XDelayedMessageConfig.ROUTING_KEY,
          ????????????????msg,
          ????????????????message->{
          ????????????????????message.getMessageProperties().setDelay(delayTime);
          ????????????????????return?message;
          ????????????????});
          ????}
          }

          消費(fèi)消息

          @Component
          @RabbitListener(queues?=?XDelayedMessageConfig.DIRECT_QUEUE)//監(jiān)聽(tīng)隊(duì)列名稱(chēng)
          public?class?DelayedMQReciever?{


          ????@RabbitHandler
          ????public?void?process(String?message){
          ????????System.out.println("DelayedMQReciever接收到的消息是:"+?message);
          ????}
          }

          測(cè)試

          DelayedMQReciever接收到的消息是:延遲消息睡2秒
          DelayedMQReciever接收到的消息是:延遲消息睡5秒
          DelayedMQReciever接收到的消息是:延遲消息睡10秒

          這樣我們的問(wèn)題就順利解決了。

          局限性

          延遲的消息存儲(chǔ)在一個(gè)Mnesia表中,當(dāng)前節(jié)點(diǎn)上只有一個(gè)磁盤(pán)副本,它們將在節(jié)點(diǎn)重啟后存活。

          雖然觸發(fā)計(jì)劃交付的計(jì)時(shí)器不會(huì)持久化,但它將在節(jié)點(diǎn)啟動(dòng)時(shí)的插件激活期間重新初始化。顯然,集群中只有一個(gè)預(yù)定消息的副本意味著丟失該節(jié)點(diǎn)或禁用其上的插件將丟失駐留在該節(jié)點(diǎn)上的消息。

          該插件的當(dāng)前設(shè)計(jì)并不適合延遲消息數(shù)量較多的場(chǎng)景(如數(shù)萬(wàn)條或數(shù)百萬(wàn)條),另外該插件的一個(gè)可變性來(lái)源是依賴(lài)于 Erlang 計(jì)時(shí)器,在系統(tǒng)中使用了一定數(shù)量的長(zhǎng)時(shí)間計(jì)時(shí)器之后,它們開(kāi)始爭(zhēng)用調(diào)度程序資源,并且時(shí)間漂移不斷累積。

          項(xiàng)目源碼地址:https://gitee.com/zhangxiaoQ/rabbit-mq


          往期推薦

          1、整理了38個(gè)Python游戲開(kāi)發(fā)庫(kù)
          2、生前無(wú)人問(wèn)津,死后眾人跑來(lái)追捧...
          3、Python 30秒就能學(xué)會(huì)的漂亮短代碼
          4、這 14 個(gè) VSCode 插件,讓你寫(xiě)代碼如同神一般
          5、黑客大神用什么殺毒?Windows自帶的就夠,只是加了億點(diǎn)微小的強(qiáng)化

          點(diǎn)擊關(guān)注公眾號(hào),閱讀更多精彩內(nèi)容

          瀏覽 10
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  欧美黄色操逼视频 | 天堂在线视频资源 | 欧美mv日韩mv国产网站app | 无码AV影视 | 日本岛国视频在线观看一区二区三区 |