<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          領(lǐng)導(dǎo)看了我寫的關(guān)閉超時(shí)訂單,讓我出門左轉(zhuǎn)!

          共 7814字,需瀏覽 16分鐘

           ·

          2022-03-07 15:24

          前幾天領(lǐng)導(dǎo)突然宣布幾年前停用的電商項(xiàng)目又重新啟動(dòng)了,帶著復(fù)雜的心情仔細(xì)賞閱“兒時(shí)”的代碼,心中的酸楚只有自己能夠體會(huì)。

          這不,昨天又被領(lǐng)導(dǎo)叫進(jìn)了“小黑屋”,讓我把代碼重構(gòu)下進(jìn)行升級(jí)。看到這么“可愛”的代碼,心中一萬只“xx馬”疾馳而過。

          讓我最深惡痛覺的就是里邊竟然用定時(shí)任務(wù)實(shí)現(xiàn)了“關(guān)閉超時(shí)訂單”的功能,現(xiàn)在想來,哭笑不得。我們先分析一波為什么大家都在抵制用定時(shí)任務(wù)來實(shí)現(xiàn)該功能。

          ?

          定時(shí)任務(wù)

          關(guān)閉超時(shí)訂單是在創(chuàng)建訂單之后的一段時(shí)間內(nèi)未完成支付而關(guān)閉訂單的操作,該功能一般要求每筆訂單的超時(shí)時(shí)間是一致的。

          如果我們使用定時(shí)任務(wù)來進(jìn)行該操作,很難把握定時(shí)任務(wù)輪詢的時(shí)間間隔:

          • 時(shí)間間隔足夠小,在誤差允許的范圍內(nèi)可以達(dá)到我們說的時(shí)間一致性問題,但是頻繁掃描數(shù)據(jù)庫,執(zhí)行定時(shí)任務(wù),會(huì)造成網(wǎng)絡(luò)IO和磁盤IO的消耗,對(duì)實(shí)時(shí)交易造成一定的沖擊;

          • 時(shí)間間隔比較大,由于每個(gè)訂單創(chuàng)建的時(shí)間不一致,所以上邊的一致性要求很難達(dá)到,舉例如下:



          假設(shè)30分鐘訂單超時(shí)自動(dòng)關(guān)閉,定時(shí)任務(wù)的執(zhí)行間隔時(shí)間為30分鐘:

          1. 我們?cè)诘?分鐘進(jìn)行下單操作;

          2. 當(dāng)時(shí)間來到第30分鐘時(shí),定時(shí)任務(wù)執(zhí)行一次,但是我們的訂單未滿足條件,不執(zhí)行;

          3. 當(dāng)時(shí)間來到第35分鐘時(shí),訂單達(dá)到關(guān)閉條件,但是定時(shí)任務(wù)未執(zhí)行,所以不執(zhí)行;

          4. 當(dāng)時(shí)間來到第60分鐘時(shí),開始執(zhí)行我們的訂單關(guān)閉操作,而此時(shí),誤差達(dá)到25分鐘。

          經(jīng)此種種,我們需要舍棄該方式。

          ?

          延時(shí)隊(duì)列

          為了滿足領(lǐng)導(dǎo)的需求,我便將手伸向了消息隊(duì)列:RabbitMQ。盡管它本身并沒有提供延時(shí)隊(duì)列的功能,但是我們可以利用它的存活時(shí)間和死信交換機(jī)的特性來間接實(shí)現(xiàn)。

          首先我們先來簡(jiǎn)單介紹下什么是存活時(shí)間?什么是死信交換機(jī)?

          存活時(shí)間

          存活時(shí)間的全拼是Time To Live,簡(jiǎn)稱 TTL。它既支持對(duì)消息本身進(jìn)行設(shè)置(延遲隊(duì)列的關(guān)鍵),又支持對(duì)隊(duì)列進(jìn)行設(shè)置(該隊(duì)列中所有消息存在相同的過期時(shí)間)。

          • 對(duì)消息本身進(jìn)行設(shè)置:即使消息過期,也不會(huì)馬上從隊(duì)列中抹去,因?yàn)槊織l消息是否過期是在即將投遞到消費(fèi)者之前判定的;

          • 對(duì)隊(duì)列進(jìn)行設(shè)置:一旦消息過期,就會(huì)從隊(duì)列中抹去;

          如果同時(shí)使用這兩種方法,那么以過期時(shí)間的那個(gè)數(shù)值為準(zhǔn)。當(dāng)消息達(dá)到過期時(shí)間還沒有被消費(fèi),那么該消息就“死了”,我們把它稱為 死信 消息。

          消息變?yōu)樗佬诺臈l件:

          • 消息被拒絕(basic.reject/basic.nack),并且requeue=false;

          • 消息的過期時(shí)間到期了;

          • 隊(duì)列達(dá)到最大長(zhǎng)度;

          隊(duì)列設(shè)置注意事項(xiàng)

          1. 隊(duì)列中該屬性的設(shè)置要在第一次聲明隊(duì)列的時(shí)候設(shè)置才有效,如果隊(duì)列一開始已存在且沒有這個(gè)屬性,則要?jiǎng)h掉隊(duì)列再重新聲明才可以;

          2. 隊(duì)列的 ttl 只能被設(shè)置為某個(gè)固定的值,一旦設(shè)置后則不能更改,否則會(huì)拋出異常;

          死信交換機(jī)

          死信交換機(jī)全拼Dead-Letter-Exchange,簡(jiǎn)稱DLX。

          當(dāng)消息在一個(gè)隊(duì)列中變成死信之后,如果這個(gè)消息所在的隊(duì)列設(shè)置了x-dead-letter-exchange參數(shù),那么它會(huì)被發(fā)送到x-dead-letter-exchange對(duì)應(yīng)值的交換機(jī)上,這個(gè)交換機(jī)就稱之為死信交換機(jī),與這個(gè)死信交換器綁定的隊(duì)列就是死信隊(duì)列。

          • x-dead-letter-exchange:出現(xiàn)死信之后將死信重新發(fā)送到指定交換機(jī);

          • x-dead-letter-routing-key:出現(xiàn)死信之后將死信重新按照指定的routing-key發(fā)送,如果不設(shè)置默認(rèn)使用消息本身的routing-key

          死信隊(duì)列與普通隊(duì)列的區(qū)別就是它的RoutingKeyExchange需要作為參數(shù),綁定到正常的隊(duì)列上。

          ?

          實(shí)戰(zhàn)教學(xué)

          先來張圖感受下我們的整體思路


          1. 生產(chǎn)者發(fā)送帶有 ttl 的消息放入交換機(jī)路由到延時(shí)隊(duì)列中;

          2. 在延時(shí)隊(duì)列中綁定死信交換機(jī)與死信轉(zhuǎn)發(fā)的routing-key;

          3. 等延時(shí)隊(duì)列中的消息達(dá)到延時(shí)時(shí)間之后變成死信轉(zhuǎn)發(fā)到死信交換機(jī)并路由到死信隊(duì)列中;

          4. 最后供消費(fèi)者消費(fèi)。

          我們?cè)?/span>上文的基礎(chǔ)上進(jìn)行代碼實(shí)現(xiàn):

          配置類

          @Configuration
          public?class?DelayQueueRabbitConfig?{

          ????public?static?final?String?DLX_QUEUE?=?"queue.dlx";//死信隊(duì)列
          ????public?static?final?String?DLX_EXCHANGE?=?"exchange.dlx";//死信交換機(jī)
          ????public?static?final?String?DLX_ROUTING_KEY?=?"routingkey.dlx";//死信隊(duì)列與死信交換機(jī)綁定的routing-key

          ????public?static?final?String?ORDER_QUEUE?=?"queue.order";//訂單的延時(shí)隊(duì)列
          ????public?static?final?String?ORDER_EXCHANGE?=?"exchange.order";//訂單交換機(jī)
          ????public?static?final?String?ORDER_ROUTING_KEY?=?"routingkey.order";//延時(shí)隊(duì)列與訂單交換機(jī)綁定的routing-key

          ?/**
          ?????*?定義死信隊(duì)列
          ?????**/

          ????@Bean
          ????public?Queue?dlxQueue(){
          ????????return?new?Queue(DLX_QUEUE,true);
          ????}

          ????/**
          ?????*?定義死信交換機(jī)
          ?????**/

          ????@Bean
          ????public?DirectExchange?dlxExchange(){
          ????????return?new?DirectExchange(DLX_EXCHANGE,?true,?false);
          ????}


          ????/**
          ?????*?死信隊(duì)列和死信交換機(jī)綁定
          ?????*?設(shè)置路由鍵:routingkey.dlx
          ?????**/

          ????@Bean
          ????Binding?bindingDLX(){
          ????????return?BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);
          ????}


          ????/**
          ?????*?訂單延時(shí)隊(duì)列
          ?????*?設(shè)置隊(duì)列里的死信轉(zhuǎn)發(fā)到的DLX名稱
          ?????*?設(shè)置死信在轉(zhuǎn)發(fā)時(shí)攜帶的?routing-key?名稱
          ?????**/

          ????@Bean
          ????public?Queue?orderQueue()?{
          ????????Map?params?=?new?HashMap<>();
          ????????params.put("x-dead-letter-exchange",?DLX_EXCHANGE);
          ????????params.put("x-dead-letter-routing-key",?DLX_ROUTING_KEY);
          ????????return?new?Queue(ORDER_QUEUE,?true,?false,?false,?params);
          ????}

          ????/**
          ?????*?訂單交換機(jī)
          ?????**/

          ????@Bean
          ????public?DirectExchange?orderExchange()?{
          ????????return?new?DirectExchange(ORDER_EXCHANGE,?true,?false);
          ????}

          ????/**
          ?????*?把訂單隊(duì)列和訂單交換機(jī)綁定在一起
          ?????**/

          ????@Bean
          ????public?Binding?orderBinding()?{
          ????????return?BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY);
          ????}
          }

          發(fā)送消息

          @RequestMapping("/order")
          public?class?OrderSendMessageController?{

          ????@Autowired
          ????private?RabbitTemplate?rabbitTemplate;

          ????@GetMapping("/sendMessage")
          ????public?String?sendMessage(){

          ????????String?delayTime?=?"10000";
          ????????//將消息攜帶路由鍵值
          ????????rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE,?DelayQueueRabbitConfig.ORDER_ROUTING_KEY,
          ????????????????"發(fā)送消息!",message->{
          ????????????message.getMessageProperties().setExpiration(delayTime);
          ????????????return?message;
          ????????});
          ????????return?"ok";
          ????}

          }

          消費(fèi)消息

          @Component
          @RabbitListener(queues?=?DelayQueueRabbitConfig.DLX_QUEUE)//監(jiān)聽隊(duì)列名稱
          public?class?OrderMQReciever?{

          ????@RabbitHandler
          ????public?void?process(String?message){
          ????????System.out.println("OrderMQReciever接收到的消息是:"+?message);
          ????}
          }

          測(cè)試

          通過調(diào)用接口,發(fā)現(xiàn)10秒之后才會(huì)消費(fèi)消息

          ?

          問題升級(jí)

          由于開發(fā)環(huán)境和測(cè)試環(huán)境使用的是同一個(gè)交換機(jī)和隊(duì)列,所以發(fā)送的延時(shí)時(shí)間都是30分鐘。但是為了在測(cè)試環(huán)境讓測(cè)試同學(xué)方便測(cè)試,故手動(dòng)將測(cè)試環(huán)境的時(shí)間改為了1分鐘。

          問題復(fù)現(xiàn)

          接著問題就來了:延時(shí)時(shí)間為1分鐘的消息并沒有立即被消費(fèi),而是等30分鐘的消息被消費(fèi)完之后才被消費(fèi)了。至于原因,我們下邊再分析,先用代碼來給大家復(fù)現(xiàn)下該問題。

          @GetMapping("/sendManyMessage")
          public?String?sendManyMessage(){
          ????send("延遲消息睡10秒",10000+"");
          ????send("延遲消息睡2秒",2000+"");
          ????send("延遲消息睡5秒",5000+"");
          ????return?"ok";
          }

          private?void?send(String?msg,?String?delayTime){
          ?rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE,?
          ??????????????????????????????????DelayQueueRabbitConfig.ORDER_ROUTING_KEY,
          ??????????????????????????????????msg,message->{
          ??????????????????????????????????????message.getMessageProperties().setExpiration(delayTime);
          ??????????????????????????????????????return?message;
          ??????????????????????????????????});
          }

          執(zhí)行結(jié)果如下:

          OrderMQReciever接收到的消息是:延遲消息睡10秒
          OrderMQReciever接收到的消息是:延遲消息睡2秒
          OrderMQReciever接收到的消息是:延遲消息睡5秒

          原因就是延時(shí)隊(duì)列也滿足隊(duì)列先進(jìn)先出的特征,當(dāng)10秒的消息未出隊(duì)列時(shí),后邊的消息不能順利出隊(duì),造成后邊的消息阻塞了,未能達(dá)到精準(zhǔn)延時(shí)。

          問題解決

          我們可以利用x-delay-message插件來解決該問題

          消息的延遲范圍是 Delay > 0, Delay =< ?ERL_MAX_T(在 Erlang 中可以被設(shè)置的范圍為 (2^32)-1 毫秒)


          1. 生產(chǎn)者發(fā)送消息到交換機(jī)時(shí),并不會(huì)立即進(jìn)入,而是先將消息持久化到 Mnesia(一個(gè)分布式數(shù)據(jù)庫管理系統(tǒng));

          2. 插件將會(huì)嘗試確認(rèn)消息是否過期;

          3. 如果消息過期,消息會(huì)通過 x-delayed-type 類型標(biāo)記的交換機(jī)投遞至目標(biāo)隊(duì)列,供消費(fèi)者消費(fèi);

          實(shí)踐

          官網(wǎng)下載:https://www.rabbitmq.com/community-plugins.html

          我這邊使用的是v3.8.0.ez,將文件下載下來放到服務(wù)器的/usr/local/soft/rabbitmq_server-3.7.14/plugins 路徑下,執(zhí)行rabbitmq-plugins enable rabbitmq_delayed_message_exchange命令即可。


          出現(xiàn)如圖所示,代表安裝成功。

          配置類

          @Configuration
          public?class?XDelayedMessageConfig?{

          ????public?static?final?String?DIRECT_QUEUE?=?"queue.direct";//隊(duì)列
          ????public?static?final?String?DELAYED_EXCHANGE?=?"exchange.delayed";//延遲交換機(jī)
          ????public?static?final?String?ROUTING_KEY?=?"routingkey.bind";//綁定的routing-key

          ????/**
          ?????*?定義隊(duì)列
          ?????**/

          ????@Bean
          ????public?Queue?directQueue(){
          ????????return?new?Queue(DIRECT_QUEUE,true);
          ????}

          ????/**
          ?????*?定義延遲交換機(jī)
          ?????*?args:根據(jù)該參數(shù)進(jìn)行靈活路由,設(shè)置為“direct”,意味著該插件具有與直連交換機(jī)具有相同的路由行為,
          ?????*?如果想要不同的路由行為,可以更換現(xiàn)有的交換類型如:“topic”
          ?????*?交換機(jī)類型為?x-delayed-message
          ?????**/

          ????@Bean
          ????public?CustomExchange?delayedExchange(){
          ????????Map?args?=?new?HashMap();
          ????????args.put("x-delayed-type",?"direct");
          ????????return?new?CustomExchange(DELAYED_EXCHANGE,?"x-delayed-message",?true,?false,?args);
          ????}

          ????/**
          ?????*?隊(duì)列和延遲交換機(jī)綁定
          ?????**/

          ????@Bean
          ????public?Binding?orderBinding()?{
          ????????return?BindingBuilder.bind(directQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs();
          ????}

          }

          發(fā)送消息

          @RestController
          @RequestMapping("/delayed")
          public?class?DelayedSendMessageController?{

          ????@Autowired
          ????private?RabbitTemplate?rabbitTemplate;

          ????@GetMapping("/sendManyMessage")
          ????public?String?sendManyMessage(){

          ????????send("延遲消息睡10秒",10000);
          ????????send("延遲消息睡2秒",2000);
          ????????send("延遲消息睡5秒",5000);
          ????????return?"ok";
          ????}

          ????private?void?send(String?msg,?Integer?delayTime){
          ????????//將消息攜帶路由鍵值
          ????????rabbitTemplate.convertAndSend(
          ????????????????XDelayedMessageConfig.DELAYED_EXCHANGE,
          ????????????????XDelayedMessageConfig.ROUTING_KEY,
          ????????????????msg,
          ????????????????message->{
          ????????????????????message.getMessageProperties().setDelay(delayTime);
          ????????????????????return?message;
          ????????????????});
          ????}
          }

          消費(fèi)消息

          @Component
          @RabbitListener(queues?=?XDelayedMessageConfig.DIRECT_QUEUE)//監(jiān)聽隊(duì)列名稱
          public?class?DelayedMQReciever?{


          ????@RabbitHandler
          ????public?void?process(String?message){
          ????????System.out.println("DelayedMQReciever接收到的消息是:"+?message);
          ????}
          }

          測(cè)試

          DelayedMQReciever接收到的消息是:延遲消息睡2秒
          DelayedMQReciever接收到的消息是:延遲消息睡5秒
          DelayedMQReciever接收到的消息是:延遲消息睡10秒

          這樣我們的問題就順利解決了。

          局限性

          延遲的消息存儲(chǔ)在一個(gè)Mnesia表中,當(dāng)前節(jié)點(diǎn)上只有一個(gè)磁盤副本,它們將在節(jié)點(diǎn)重啟后存活。

          雖然觸發(fā)計(jì)劃交付的計(jì)時(shí)器不會(huì)持久化,但它將在節(jié)點(diǎn)啟動(dòng)時(shí)的插件激活期間重新初始化。顯然,集群中只有一個(gè)預(yù)定消息的副本意味著丟失該節(jié)點(diǎn)或禁用其上的插件將丟失駐留在該節(jié)點(diǎn)上的消息。

          該插件的當(dāng)前設(shè)計(jì)并不適合延遲消息數(shù)量較多的場(chǎng)景(如數(shù)萬條或數(shù)百萬條),另外該插件的一個(gè)可變性來源是依賴于 Erlang 計(jì)時(shí)器,在系統(tǒng)中使用了一定數(shù)量的長(zhǎng)時(shí)間計(jì)時(shí)器之后,它們開始爭(zhēng)用調(diào)度程序資源,并且時(shí)間漂移不斷累積。


          有道無術(shù),術(shù)可成;有術(shù)無道,止于術(shù)

          歡迎大家關(guān)注Java之道公眾號(hào)


          好文章,我在看??

          瀏覽 81
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  黄色一级免费 | 肏逼视频免费 | 色老太HD老太色HD盘 | 黄色免费毛片 | 国产精品一级无码免费 |