<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          delaymq高性能的任意延時消息隊列

          聯(lián)合創(chuàng)作 · 2023-10-01 04:02

          RocketMQ 開源版本任意時間延時隊列實現(xiàn)

          定時消息:Producer將消息發(fā)送到消息隊列RocketMQ版服務(wù)端,但并不期望立馬投遞這條消息,而是推遲到在當(dāng)前時間點之后的某一個時間投遞到Consumer進(jìn)行消費,該消息即定時消息。

          延時消息:Producer將消息發(fā)送到消息隊列RocketMQ版服務(wù)端,但并不期望立馬投遞這條消息,而是延遲一定時間后才投遞到Consumer進(jìn)行消費,該消息即延時消息。

          定時消息與延時消息在代碼配置上存在一些差異,但是最終達(dá)到的效果相同:消息在發(fā)送到消息隊列RocketMQ版服務(wù)端后并不會立馬投遞,而是根據(jù)消息中的屬性延遲固定時間后才投遞給消費者。

          適用場景

          定時消息和延時消息適用于以下一些場景:

          消息生產(chǎn)和消費有時間窗口要求,例如在電商交易中超時未支付關(guān)閉訂單的場景,在訂單創(chuàng)建時會發(fā)送一條延時消息。

          這條消息將會在30分鐘以后投遞給消費者,消費者收到此消息后需要判斷對應(yīng)的訂單是否已完成支付。

          如支付未完成,則關(guān)閉訂單。如已完成支付則忽略。

          通過消息觸發(fā)一些定時任務(wù),例如在某一固定時間點向用戶發(fā)送提醒消息。

          使用方式 定時消息和延時消息的使用在代碼編寫上存在略微的區(qū)別:

          發(fā)送定時消息需要明確指定消息發(fā)送時間點之后的某一時間點作為消息投遞的時間點。

          發(fā)送延時消息時需要設(shè)定一個延時時間長度,消息將從當(dāng)前發(fā)送時間點開始延遲固定時間之后才開始投遞。

          注意事項

          定時消息的精度會有1s~2s的延遲誤差。

          定時和延時消息的msg.setStartDeliverTime參數(shù)需要設(shè)置成當(dāng)前時間戳之后的某個時刻(單位毫秒)。

          如果被設(shè)置成當(dāng)前時間戳之前的某個時刻,消息將立刻投遞給消費者。

          定時和延時消息的msg.setStartDeliverTime參數(shù)可設(shè)置40天內(nèi)的任何時刻(單位毫秒),超過40天消息發(fā)送將失敗。

          StartDeliverTime是服務(wù)端開始向消費端投遞的時間。如果消費者當(dāng)前有消息堆積,那么定時和延時消息會排在堆積消息后面,將不能嚴(yán)格按照配置的時間進(jìn)行投遞。

          由于客戶端和服務(wù)端可能存在時間差,消息的實際投遞時間與客戶端設(shè)置的投遞時間之間可能存在偏差。

          如何使用

          推薦使用阿里云提供的rocketmq版本的pom

                 <dependency>
                      <groupId>com.aliyun.openservicesgroupId>
                      <artifactId>ons-clientartifactId>
                      <version>1.8.4.Finalversion>
                  dependency>
          

          消息發(fā)送

          import com.aliyun.openservices.ons.api.*;
          import com.aliyun.openservices.shade.org.apache.commons.lang3.time.DateFormatUtils;
          
          import java.util.Date;
          import java.util.Properties;
          
          public class ProducerDelayTest {
              public static void main(String[] args) {
                  Properties properties = new Properties();
                  // AccessKey ID阿里云身份驗證,在阿里云RAM控制臺創(chuàng)建。
                  properties.put(PropertyKeyConst.AccessKey, "XXX");
                  // AccessKey Secret阿里云身份驗證,在阿里云RAM控制臺創(chuàng)建。
                  properties.put(PropertyKeyConst.SecretKey, "XXX");
                  // 設(shè)置TCP接入域名,進(jìn)入消息隊列RocketMQ版控制臺實例詳情頁面的接入點區(qū)域查看。
                  properties.put(PropertyKeyConst.NAMESRV_ADDR, "localhost:9876");
                  
          
                  Producer producer = ONSFactory.createProducer(properties);
                  // 在發(fā)送消息前,必須調(diào)用start方法來啟動Producer,只需調(diào)用一次即可。
                  producer.start();
          
                   {
                      Message msg = new Message(
                              // 您在消息隊列RocketMQ版控制臺創(chuàng)建的Topic。
                              "TopicTest",
                              // Message Tag,可理解為Gmail中的標(biāo)簽,對消息進(jìn)行再歸類,方便Consumer指定過濾條件在消息隊列RocketMQ版服務(wù)器過濾。
                              "TagA",
                              // Message Body可以是任何二進(jìn)制形式的數(shù)據(jù),消息隊列RocketMQ版不做任何干預(yù),需要Producer與Consumer協(xié)商好一致的序列化和反序列化方式。
                              "演示15秒鐘>>> ".getBytes());
                      // 設(shè)置代表消息的業(yè)務(wù)關(guān)鍵屬性,請盡可能全局唯一。
                      // 以方便您在無法正常收到消息情況下,可通過控制臺查詢消息并補發(fā)。
                      // 注意:不設(shè)置也不會影響消息正常收發(fā)。
                      msg.setKey("ORDERID_100e");
                      try {
                          // 延時消息,單位毫秒(ms),在指定延遲時間(當(dāng)前時間之后)進(jìn)行投遞,例如消息在15秒后投遞。
                          long delayTime = System.currentTimeMillis() + 15000;
                          System.out.println("發(fā)送時間>>" + DateFormatUtils.format(new Date(), "yyyy-MM-dd HH????ss"));
          
                          // 設(shè)置消息需要被投遞的時間。
                          msg.setStartDeliverTime(delayTime);
          
                          SendResult sendResult = producer.send(msg);
                          // 同步發(fā)送消息,只要不拋異常就是成功。
                          if (sendResult != null) {
                              System.out.println(DateFormatUtils.format(new Date(), "yyyy-MM-dd HH????ss") + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                          }
                      } catch (Exception e) {
                          // 消息發(fā)送失敗,需要進(jìn)行重試處理,可重新發(fā)送這條消息或持久化這條數(shù)據(jù)進(jìn)行補償處理。
                          System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                          e.printStackTrace();
                      }
                  }
                  // 在應(yīng)用退出前,銷毀Producer對象。
                  // 注意:如果不銷毀也沒有問題。
                  producer.shutdown();
              }
          }           
          
          
          瀏覽 17
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          編輯 分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          編輯 分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  尻屄视频免费在线观看 | 91精品少妇高潮一区二区三区不卡 | 激情五月天丁香婷婷 | 青娱乐中文无码在线观看 | 欧美色图欧美色图 |