<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          延遲隊列實現(xiàn),定時任務,關閉訂單

          共 6230字,需瀏覽 13分鐘

           ·

          2020-10-02 17:56

          場景

          開發(fā)中經常需要用到定時任務,對于商城來說,定時任務尤其多,比如優(yōu)惠券定時過期、訂單定時關閉、微信支付2小時未支付關閉訂單等等,都需要用到定時任務,但是定時任務本身有一個問題,一般來說我們都是通過定時輪詢查詢數(shù)據(jù)庫來判斷是否有任務需要執(zhí)行,也就是說不管怎么樣,我們需要先查詢數(shù)據(jù)庫,而且有些任務對時間準確要求比較高的,需要每秒查詢一次,對于系統(tǒng)小倒是無所謂,如果系統(tǒng)本身就大而且數(shù)據(jù)也多的情況下,這就不大現(xiàn)實了,所以需要其他方式的,當然實現(xiàn)的方式有多種多樣的,比如Redis實現(xiàn)定時隊列、基于優(yōu)先級隊列的JDK延遲隊列、時間輪等。因為我們項目中本身就使用到了Rabbitmq,所以基于方便開發(fā)和維護的原則,我們使用了Rabbitmq延遲隊列來實現(xiàn)定時任務,不知道rabbitmq是什么的和不知道springboot怎么集成Rabbitmq的可以查看我之前的文章Spring boot集成RabbitMQ

          Rabbitmq延遲隊列

          Rabbitmq本身是沒有延遲隊列的,只能通過Rabbitmq本身隊列的特性來實現(xiàn),想要Rabbitmq實現(xiàn)延遲隊列,需要使用Rabbitmq的死信交換機(Exchange)和消息的存活時間TTL(Time To Live)

          死信交換機

          一個消息在滿足如下條件下,會進死信交換機,記住這里是交換機而不是隊列,一個交換機可以對應很多隊列。

          一個消息被Consumer拒收了,并且reject方法的參數(shù)里requeue是false。也就是說不會被再次放在隊列里,被其他消費者使用。

          上面的消息的TTL到了,消息過期了。

          隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上。

          死信交換機就是普通的交換機,只是因為我們把過期的消息扔進去,所以叫死信交換機,并不是說死信交換機是某種特定的交換機

          消息TTL(消息存活時間)

          消息的TTL就是消息的存活時間。RabbitMQ可以對隊列和消息分別設置TTL。對隊列設置就是隊列沒有消費者連著的保留時間,也可以對每一個單獨的消息做單獨的設置。超過了這個時間,我們認為這個消息就死了,稱之為死信。如果隊列設置了,消息也設置了,那么會取小的。所以一個消息如果被路由到不同的隊列中,這個消息死亡的時間有可能不一樣(不同的隊列設置)。這里單講單個消息的TTL,因為它才是實現(xiàn)延遲任務的關鍵。

          1. byte[] messageBodyBytes = "Hello, world!".getBytes(); ?

          2. AMQP.BasicProperties properties = new AMQP.BasicProperties(); ?

          3. properties.setExpiration("60000"); ?

          4. channel.basicPublish("my-exchange", "queue-key", properties, messageBodyBytes); ?

          可以通過設置消息的expiration字段或者x-message-ttl屬性來設置時間,兩者是一樣的效果。只是expiration字段是字符串參數(shù),所以要寫個int類型的字符串:當上面的消息扔到隊列中后,過了60秒,如果沒有被消費,它就死了。不會被消費者消費到。這個消息后面的,沒有“死掉”的消息對頂上來,被消費者消費。死信在隊列中并不會被刪除和釋放,它會被統(tǒng)計到隊列的消息數(shù)中去

          處理流程圖

          創(chuàng)建交換機(Exchanges)和隊列(Queues)

          創(chuàng)建死信交換機

          如圖所示,就是創(chuàng)建一個普通的交換機,這里為了方便區(qū)分,把交換機的名字取為:delay

          創(chuàng)建自動過期消息隊列

          這個隊列的主要作用是讓消息定時過期的,比如我們需要2小時候關閉訂單,我們就需要把消息放進這個隊列里面,把消息過期時間設置為2小時創(chuàng)建一個一個名為delay_queue1的自動過期的隊列,當然圖片上面的參數(shù)并不會讓消息自動過期,因為我們并沒有設置x-message-ttl參數(shù),如果整個隊列的消息有消息都是相同的,可以設置,這里為了靈活,所以并沒有設置,另外兩個參數(shù)x-dead-letter-exchange代表消息過期后,消息要進入的交換機,這里配置的是delay,也就是死信交換機,x-dead-letter-routing-key是配置消息過期后,進入死信交換機的routing-key,跟發(fā)送消息的routing-key一個道理,根據(jù)這個key將消息放入不同的隊列

          創(chuàng)建消息處理隊列

          這個隊列才是真正處理消息的隊列,所有進入這個隊列的消息都會被處理消息隊列的名字為delay_queue2

          消息隊列綁定到交換機

          進入交換機詳情頁面,將創(chuàng)建的2個隊列(delayqueue1和delayqueue2)綁定到交換機上面自動過期消息隊列的routing key 設置為delay

          綁定delayqueue2delayqueue2 的key要設置為創(chuàng)建自動過期的隊列的x-dead-letter-routing-key參數(shù),這樣當消息過期的時候就可以自動把消息放入delay_queue2這個隊列中了

          綁定后的管理頁面如下圖:

          當然這個綁定也可以使用代碼來實現(xiàn),只是為了直觀表現(xiàn),所以本文使用的管理平臺來操作

          發(fā)送消息

          1. String msg = "hello word"; ?

          2. MessageProperties messageProperties = newMessageProperties(); ?

          3. ? ? ? ?messageProperties.setExpiration("6000");

          4. ? ? ? ?messageProperties.setCorrelationId(UUID.randomUUID().toString().getBytes());

          5. ? ? ? ?Message message = newMessage(msg.getBytes(), messageProperties);

          6. ? ? ? ?rabbitTemplate.convertAndSend("delay", "delay",message);

          主要的代碼就是

          1. messageProperties.setExpiration("6000"); ?

          設置了讓消息6秒后過期

          注意:因為要讓消息自動過期,所以一定不能設置delay_queue1的監(jiān)聽,不能讓這個隊列里面的消息被接受到,否則消息一旦被消費,就不存在過期了

          接收消息

          接收消息配置好delay_queue2的監(jiān)聽就好了

          1. package wang.raye.rabbitmq.demo1;

          2. import org.springframework.amqp.core.AcknowledgeMode; ?

          3. import org.springframework.amqp.core.Binding; ?

          4. import org.springframework.amqp.core.BindingBuilder; ?

          5. import org.springframework.amqp.core.DirectExchange; ?

          6. import org.springframework.amqp.core.Message; ?

          7. import org.springframework.amqp.core.Queue; ?

          8. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; ?

          9. import org.springframework.amqp.rabbit.connection.ConnectionFactory; ?

          10. import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; ?

          11. import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; ?

          12. import org.springframework.beans.factory.annotation.Autowired; ?

          13. import org.springframework.context.annotation.Bean; ?

          14. import org.springframework.context.annotation.Configuration;

          15. @Configuration

          16. publicclassDelayQueue{ ?

          17. ? ?/** 消息交換機的名字*/

          18. ? ?publicstaticfinalString EXCHANGE = "delay";

          19. ? ?/** 隊列key1*/

          20. ? ?publicstaticfinalString ROUTINGKEY1 = "delay";

          21. ? ?/** 隊列key2*/

          22. ? ?publicstaticfinalString ROUTINGKEY2 = "delay_key";

          23. ? ?/**

          24. ? ? * 配置鏈接信息

          25. ? ? * @return

          26. ? ? */

          27. ? ?@Bean

          28. ? ?publicConnectionFactory connectionFactory() {

          29. ? ? ? ?CachingConnectionFactory connectionFactory = newCachingConnectionFactory("120.76.237.8",5672);

          30. ? ? ? ?connectionFactory.setUsername("kberp");

          31. ? ? ? ?connectionFactory.setPassword("kberp");

          32. ? ? ? ?connectionFactory.setVirtualHost("/");

          33. ? ? ? ?connectionFactory.setPublisherConfirms(true); // 必須要設置

          34. ? ? ? ?return connectionFactory;

          35. ? ?}

          36. ? ?/** ?

          37. ? ? * 配置消息交換機

          38. ? ? * 針對消費者配置 ?

          39. ? ? ? ?FanoutExchange: 將消息分發(fā)到所有的綁定隊列,無routingkey的概念 ?

          40. ? ? ? HeadersExchange :通過添加屬性key-value匹配 ?

          41. ? ? ? ?DirectExchange:按照routingkey分發(fā)到指定隊列 ?

          42. ? ? ? ?TopicExchange:多關鍵字匹配 ?

          43. ? ? */ ?

          44. ? ?@Bean ?

          45. ? ?publicDirectExchange defaultExchange() { ?

          46. ? ? ? ?returnnewDirectExchange(EXCHANGE, true, false);

          47. ? ?}

          48. ? ?/**

          49. ? ? * 配置消息隊列2

          50. ? ? * 針對消費者配置 ?

          51. ? ? * @return

          52. ? ? */

          53. ? ?@Bean

          54. ? ?publicQueue queue() { ?

          55. ? ? ? returnnewQueue("delay_queue2", true); //隊列持久 ?

          56. ? ?}

          57. ? ?/**

          58. ? ? * 將消息隊列2與交換機綁定

          59. ? ? * 針對消費者配置 ?

          60. ? ? * @return

          61. ? ? */

          62. ? ?@Bean ?

          63. ? ?@Autowired

          64. ? ?publicBinding binding() { ?

          65. ? ? ? ?returnBindingBuilder.bind(queue()).to(defaultExchange()).with(DelayQueue.ROUTINGKEY2); ?

          66. ? ?}

          67. ? ?/**

          68. ? ? * 接受消息的監(jiān)聽,這個監(jiān)聽會接受消息隊列1的消息

          69. ? ? * 針對消費者配置 ?

          70. ? ? * @return

          71. ? ? */

          72. ? ?@Bean ?

          73. ? ?@Autowired

          74. ? ?publicSimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) { ?

          75. ? ? ? ?SimpleMessageListenerContainer container = newSimpleMessageListenerContainer(connectionFactory()); ?

          76. ? ? ? ?container.setQueues(queue()); ?

          77. ? ? ? ?container.setExposeListenerChannel(true); ?

          78. ? ? ? ?container.setMaxConcurrentConsumers(1); ?

          79. ? ? ? ?container.setConcurrentConsumers(1); ?

          80. ? ? ? ?container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設置確認模式手工確認 ?

          81. ? ? ? ?container.setMessageListener(newChannelAwareMessageListener() {

          82. ? ? ? ? ? ?publicvoid onMessage(Message message, com.rabbitmq.client.Channel channel) throwsException{

          83. ? ? ? ? ? ? ? ?byte[] body = message.getBody(); ?

          84. ? ? ? ? ? ? ? ?System.out.println("delay_queue2 收到消息 : "+ newString(body)); ?

          85. ? ? ? ? ? ? ? ?channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認消息成功消費 ?

          86. ? ? ? ? ? ?} ?

          87. ? ? ? ?}); ?

          88. ? ? ? ?return container; ?

          89. ? ?} ?

          90. }

          在消息監(jiān)聽中處理需要定時處理的任務就好了,因為Rabbitmq能發(fā)送消息,所以可以把任務特征碼發(fā)過來,比如關閉訂單就把訂單id發(fā)過來,這樣就避免了需要查詢一下那些訂單需要關閉而加重MySQL負擔了,畢竟一旦訂單量大的話,查詢本身也是一件很費IO的事情

          總結

          基于Rabbitmq實現(xiàn)定時任務,就是將消息設置一個過期時間,放入一個沒有讀取的隊列中,讓消息過期后自動轉入另外一個隊列中,監(jiān)控這個隊列消息的監(jiān)聽處來處理定時任務具體的操作

          瀏覽 56
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  中文字幕欧美色图 | 日本操逼片 | 欧美成人不卡免费在线视频 | 在线三级片视频 | 一级a作爱视频 |