<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Rocketmq源碼分析15:延遲消息

          共 35925字,需瀏覽 72分鐘

           ·

          2021-05-02 15:10

          注:本系列源碼分析基于RocketMq 4.8.0,gitee倉庫鏈接:https://gitee.com/funcy/rocketmq.git.

          rocketmq支持延遲消息,本文我們將從源碼角度分析延遲消息的實現(xiàn)原理。

          1. demo 準(zhǔn)備

          延遲消息的demo在org.apache.rocketmq.example.delay包下,發(fā)送消息的producer如下:

          public class Producer {

              public static void main(String[] args) throws MQClientException, InterruptedException {
                  String nameServer = "localhost:9876";
                  DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
                  producer.setNamesrvAddr(nameServer);
                  producer.start();

                  for (int i = 0; i < 1; i++)
                      try {
                          {
                              Message msg = new Message("TopicTest",
                                  "TagA",
                                  "OrderID188",
                                  "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                              // delayLevel=1  2  3   4   5  6  7  8  9  10 11 12 13 14  15  16  17 18
                              // delayTime =1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
                              // 設(shè)置延遲延遲級別
                              msg.setDelayTimeLevel(5);
                              SendResult sendResult = producer.send(msg);
                              System.out.printf("%s%n", sendResult);
                          }

                      } catch (Exception e) {
                          e.printStackTrace();
                      }

                  producer.shutdown();
              }
          }

          rocketmq在實現(xiàn)延遲消息時,會準(zhǔn)備18個延遲級別,這些級別對應(yīng)的延遲時間如下:

          123456789101112131415161718
          1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h

          在發(fā)送延遲消息時,需要指定消息的延遲級別:

          msg.setDelayTimeLevel(5);

          這里指定的延遲級別為5,即延遲1分鐘后發(fā)送。

          2. 延遲消息的存儲

          延遲消息與普通消息的發(fā)送并無太多差別,不過在broker在存儲延遲消息時,會做一些額外的處理,進(jìn)入CommitLog#asyncPutMessage方法:

           public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
              // 消息的存儲時間
              msg.setStoreTimestamp(System.currentTimeMillis());
              msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
              AppendMessageResult result = null;

              StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

              String topic = msg.getTopic();
              int queueId = msg.getQueueId();

              final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
              if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
                      || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
                  // 延遲消息
                  if (msg.getDelayTimeLevel() > 0) {
                      if (msg.getDelayTimeLevel() > this.defaultMessageStore
                              .getScheduleMessageService().getMaxDelayLevel()) {
                          msg.setDelayTimeLevel(
                              this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                      }
                      // 指定延遲消息對應(yīng)的topic
                      topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                      // 延遲級別對應(yīng)的隊列,即每個延遲級別都對應(yīng)一條隊列
                      queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                      // 原始的topic與queueId
                      MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                      MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, 
                          String.valueOf(msg.getQueueId()));
                      msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                      msg.setTopic(topic);
                      msg.setQueueId(queueId);
                  }
              }

              // 省略消息寫入 commitLog 的操作
              ...
          }

          在延遲消息寫入前,會做一些特別處理,其實就是將消息的topicqueueId修改為延遲消息專用的topicqueueId

          獲取延遲隊列的方法為ScheduleMessageService#delayLevel2QueueId,代碼如下:

          public static int delayLevel2QueueId(final int delayLevel) {
              return delayLevel - 1;
          }

          這里的delayLevel,就對應(yīng)前面提到的18個延遲級別,這也就是說,每個延遲級別的消息都會有一個專門隊列來存儲。這樣存儲有何好處呢?最大的好處就是避免了排序,舉個簡單的例子:上午10:00broker收到了一條延遲消息1,延遲級別為5;然后在10:02又收到了一條延遲消息2,延遲級別也為5,由于延遲級別相同,他們會存儲在同一條隊列中.

          由于隊列天生有序,入隊時間先按送達(dá)broker的時間先后進(jìn)行排序,而同一隊列上延遲時間也相同,因此延遲消息1一定會在延遲消息2前進(jìn)行消消費,后面如果有消息再進(jìn)入該隊列中,也會按照先進(jìn)先出的方式進(jìn)行消費。

          3. 延遲消息的投遞

          上一節(jié)分析了延遲消息的存儲,本節(jié)我們來分析延遲消息的消費。

          延遲消息存儲到隊列后,會有一個專門的線程定期掃描這些隊列,找到滿足消費時間的消息,然后將其投遞到真正的topicqueueId中,這樣這條消息就能被consumer消息了。

          處理延遲隊列掃描的線程為scheduleMessageService,它在DefaultMessageStore#start方法中啟動:

          public void start() throws Exception {
              ...
              if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                  this.haService.start();
                  // 這里處理延遲消息
                  this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
              }
              ...
          }

          繼續(xù)跟進(jìn),進(jìn)入DefaultMessageStore#handleScheduleMessageService 方法:

          @Override
          public void handleScheduleMessageService(final BrokerRole brokerRole) {
              if (this.scheduleMessageService != null) {
                  if (brokerRole == BrokerRole.SLAVE) {
                      this.scheduleMessageService.shutdown();
                  } else {
                      // 啟動
                      this.scheduleMessageService.start();
                  }
              }

          }

          繼續(xù)跟進(jìn),進(jìn)入ScheduleMessageService#start方法:

          /**
           * 延遲消息服務(wù)的啟動方式
           */

          public void start() {
              // CAS 鎖機(jī)制保證必須 shutdown 后才能再次start
              if (started.compareAndSet(falsetrue)) {
                  this.timer = new Timer("ScheduleMessageTimerThread"true);
                  for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
                      Integer level = entry.getKey();
                      Long timeDelay = entry.getValue();
                      Long offset = this.offsetTable.get(level);
                      if (null == offset) {
                          offset = 0L;
                      }

                      if (timeDelay != null) {
                          // 定時執(zhí)行延遲消息處理任務(wù)
                          this.timer.schedule(
                              new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
                      }
                  }
                  // 每隔10s,將延遲消息的相關(guān)信息持久化到硬盤中
                  this.timer.scheduleAtFixedRate(new TimerTask() {

                      @Override
                      public void run() {
                          try {
                              if (started.get()) ScheduleMessageService.this.persist();
                          } catch (Throwable e) {
                              log.error("scheduleAtFixedRate flush exception", e);
                          }
                      }
                  }, 10000this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
              }
          }

          在這個線程中,主要做了兩件事:

          1. 遍歷所有的延遲級別,為每個延遲級別在延遲FIRST_DELAY_TIME毫秒后就處理延遲消息的投遞操作
          2. 開啟執(zhí)久化定時任務(wù):定時將延遲消息的相關(guān)信息持久化到硬盤中

          3.1 投遞操作

          處理延遲消息的投遞任務(wù)為DeliverDelayedMessageTimerTask#run方法,代碼如下:

          public void run() {
              try {
                  if (isStarted()) {
                      this.executeOnTimeup();
                  }
              } catch (Exception e) {
                  // XXX: warn and notify me
                  log.error("ScheduleMessageService, executeOnTimeup exception", e);
                  ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                      this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
              }
          }

          在這個方法中,調(diào)用了executeOnTimeup()方法繼續(xù)操作,我們再進(jìn)入ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup方法:

          public void executeOnTimeup() {
              // 獲得一條隊列
              ConsumeQueue cq =
                  ScheduleMessageService.this.defaultMessageStore
                      .findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                          delayLevel2QueueId(delayLevel));

              long failScheduleOffset = offset;

              if (cq != null) {
                  SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
                  if (bufferCQ != null) {
                      try {
                          long nextOffset = offset;
                          int i = 0;
                          ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                          for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                              long offsetPy = bufferCQ.getByteBuffer().getLong();
                              int sizePy = bufferCQ.getByteBuffer().getInt();
                              long tagsCode = bufferCQ.getByteBuffer().getLong();

                              if (cq.isExtAddr(tagsCode)) {
                                  if (cq.getExt(tagsCode, cqExtUnit)) {
                                      tagsCode = cqExtUnit.getTagsCode();
                                  } else {
                                      log.error(...);
                                      // 1. 消息的寫入的時間
                                      long msgStoreTime = defaultMessageStore.getCommitLog()
                                          .pickupStoreTimestamp(offsetPy, sizePy);
                                      // 2. 計算投遞時間,投遞時間 = 消息寫入時間 + 延遲級別對應(yīng)的時間
                                      tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                                  }
                              }

                              // 處理投遞時間,保證投遞時間必須小于(當(dāng)前時間 + 延遲級別對應(yīng)的時間)
                              long now = System.currentTimeMillis();
                              long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

                              nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                              long countdown = deliverTimestamp - now;

                              // 小于等于0,表示消費需要投遞
                              if (countdown <= 0) {
                                  MessageExt msgExt =
                                      ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                          offsetPy, sizePy);

                                  if (msgExt != null) {
                                      try {
                                          MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                          if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC
                                                  .equals(msgInner.getTopic())) {
                                              log.error(...);
                                              continue;
                                          }
                                          // 3. 投遞操作
                                          PutMessageResult putMessageResult =
                                              ScheduleMessageService.this.writeMessageStore
                                                  .putMessage(msgInner);

                                          if (putMessageResult != null && putMessageResult
                                                  .getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                              continue;
                                          } else {
                                              // XXX: warn and notify me
                                              log.error(...);
                                              ScheduleMessageService.this.timer.schedule(
                                                  new DeliverDelayedMessageTimerTask(this.delayLevel,
                                                      nextOffset), DELAY_FOR_A_PERIOD);
                                              ScheduleMessageService.this.updateOffset(this.delayLevel,
                                                  nextOffset);
                                              return;
                                          }
                                      } catch (Exception e) {
                                          log.error(...);
                                      }
                                  }
                              } else {
                                  // 4. 安排下一次執(zhí)行,執(zhí)行時間為 countdown 毫秒后
                                  ScheduleMessageService.this.timer.schedule(
                                      new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                                      countdown);
                                  // 5. 更新偏移量
                                  ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                                  return;
                              }
                          }
                          // 之后再執(zhí)行
                          nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                          ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                              this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                          // 更新偏移量
                          ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                          return;
                      } finally {

                          bufferCQ.release();
                      }
                  }
                  else {

                      long cqMinOffset = cq.getMinOffsetInQueue();
                      if (offset < cqMinOffset) {
                          failScheduleOffset = cqMinOffset;
                          log.error(...);
                      }
                  }
              }

              ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                  failScheduleOffset), DELAY_FOR_A_WHILE);
          }

          這個方法雖然有點長,但邏輯很清晰,執(zhí)行過程如下:

          1. 獲取消息寫入時間,就是寫入到commitLog的時間
          2. 計算投遞時間,投遞時間 = 消息寫入時間 + 延遲級別對應(yīng)的時間,如果當(dāng)前時間大于等于投遞時間,就表示消息需要進(jìn)行投遞操作
          3. 如果消息滿足投遞時間,就進(jìn)行投遞操作,所謂的投遞操作,就是將消息寫入到真正的topicqueueId的隊列中
          4. 如果當(dāng)前消息不滿足投遞時間,就表明該隊列上之后的消息也不會投遞時間,就計算投遞時間與當(dāng)前時間的差值,這個差值就是下次執(zhí)行executeOnTimeup()方法的時間
          5. 更新偏移量,就是記錄當(dāng)前隊列的消費位置

          我們來看看偏移量的更新操作,進(jìn)入ScheduleMessageService#updateOffset方法:

          public class ScheduleMessageService extends ConfigManager {
              private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

              private static final long FIRST_DELAY_TIME = 1000L;
              private static final long DELAY_FOR_A_WHILE = 100L;
              private static final long DELAY_FOR_A_PERIOD = 10000L;

              /** 延遲級別對應(yīng)的延遲時間,單位為毫秒 */
              private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
                  new ConcurrentHashMap<Integer, Long>(32);

              /** 延遲級別對應(yīng)的偏移量 */
              private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
                  new ConcurrentHashMap<Integer, Long>(32);

              private final DefaultMessageStore defaultMessageStore;
              private final AtomicBoolean started = new AtomicBoolean(false);
              private Timer timer;
              private MessageStore writeMessageStore;
              private int maxDelayLevel;

              ...

              /**
               * 更新偏移量的操作
               */

              private void updateOffset(int delayLevel, long offset) {
                  this.offsetTable.put(delayLevel, offset);
              }

              ...
          }

          可以看到,這里的更新偏移量,就是將當(dāng)前延遲級別消費位置的偏移量添加到offsetTable中進(jìn)行保存。

          3.2 持久化

          讓我們回到``ScheduleMessageService#start`方法,這個方法中開啟了一個持久化任務(wù):

          this.timer.scheduleAtFixedRate(new TimerTask() {

              @Override
              public void run() {
                  try {
                      if (started.get()) ScheduleMessageService.this.persist();
                  } catch (Throwable e) {
                      log.error("scheduleAtFixedRate flush exception", e);
                  }
              }
          }, 10000this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());

          該任務(wù)會定期執(zhí)行ConfigManager#persist方法進(jìn)行持久化操作:

          public synchronized void persist() {
              String jsonString = this.encode(true);
              if (jsonString != null) {
                  String fileName = this.configFilePath();
                  try {
                      MixAll.string2File(jsonString, fileName);
                  } catch (IOException e) {
                      log.error("persist file " + fileName + " exception", e);
                  }
              }
          }

          這個方法主要進(jìn)行了兩個操作:

          1. 調(diào)用this.encode(true)得到json字符串
          2. json字符串寫入到文件中

          這個json字符串是個啥呢?我們進(jìn)入ScheduleMessageService#encode(boolean)方法:

          public String encode(final boolean prettyFormat) {
              DelayOffsetSerializeWrapper delayOffsetSerializeWrapper 
                  = new DelayOffsetSerializeWrapper();
              // 這個 offsetTable 就是用來保存消費位置偏移量的
              delayOffsetSerializeWrapper.setOffsetTable(this.offsetTable);
              return delayOffsetSerializeWrapper.toJson(prettyFormat);
          }

          從代碼來看,這個方法就是將ScheduleMessageService#offsetTable序列化成json字符串的, 這個 offsetTable 就是用來保存消費位置偏移量的。由此不難得出這個定時任務(wù)的作用:定期將延遲隊列的消費位置偏移量持久化到文件中。

          4. 總結(jié)

          1. RocketMq支持了18種延遲級別,每個延遲級別對應(yīng)不同的延遲時間
          2. 延遲消息對應(yīng)著一個topic,每個延遲級別都對應(yīng)著該topic下的一個隊列
          3. 當(dāng)broker收到延遲消息后,會將該消息放入到延遲級別對應(yīng)的延遲消息中
          4. 消息投遞由定時線程執(zhí)行,當(dāng)消息達(dá)到投遞時間后,會從延遲隊列中寫入到真正需要投遞的隊列中

          客觀來說,開源版 RocketMq 的延遲消息比較簡陋,僅支持18種延遲級別,而阿里云版可指定發(fā)送時間。


          限于作者個人水平,文中難免有錯誤之處,歡迎指正!原創(chuàng)不易,商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請注明出處。

          本文首發(fā)于微信公眾號 「Java技術(shù)探秘」,如果您喜歡本文,歡迎關(guān)注該公眾號,讓我們一起在技術(shù)的世界里探秘吧!


          瀏覽 41
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  偷拍乱码在线一区二区 | 久久无码电影 | 免费三级黄色 | 污片网站| 久久久久久久久久久网站 |