<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Rocketmq源碼分析06:broker 消息分發(fā)流程

          共 19366字,需瀏覽 39分鐘

           ·

          2021-04-22 21:34

          注:本系列源碼分析基于RocketMq 4.8.0,gitee倉庫鏈接:https://gitee.com/funcy/rocketmq.git.

          RocketMq消息處理整個流程如下:

          1. 消息接收:消息接收是指接收producer的消息,處理類是SendMessageProcessor,將消息寫入到commigLog文件后,接收流程處理完畢;
          2. 消息分發(fā):broker處理消息分發(fā)的類是ReputMessageService,它會啟動一個線程,不斷地將commitLong分到到對應的consumerQueue,這一步操作會寫兩個文件:consumerQueueindexFile,寫入后,消息分發(fā)流程處理 完畢;
          3. 消息投遞:消息投遞是指將消息發(fā)往consumer的流程,consumer會發(fā)起獲取消息的請求,broker收到請求后,調(diào)用PullMessageProcessor類處理,從consumerQueue文件獲取消息,返回給consumer后,投遞流程處理完畢。

          以上就是rocketMq處理消息的流程了,接下來我們就從源碼來分析消息分發(fā)的實現(xiàn)。

          1. 分發(fā)線程的啟動

          消息寫入到commitlog后,接著broker會對這些消息進行分發(fā)操作,這里的分發(fā),是指broker將消息寫入到consumerQueue文件中。

          broker消息分發(fā)的操作是在一個單獨的線程中進行的,這里我們來回憶下BrokerController的啟動流程,進入BrokerController#start方法:

          public void start() throws Exception {
              // 啟動各組件
              if (this.messageStore != null) {
                  this.messageStore.start();
              }
              ...
          }

          繼續(xù)進入DefaultMessageStore#start方法:

          public void start() throws Exception {
              ...
              // 處理 maxPhysicalPosInLogicQueue 的值
              long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
              for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
                  for (ConsumeQueue logic : maps.values()) {
                      if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
                          maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
                      }
                  }
              }
              if (maxPhysicalPosInLogicQueue < 0) {
                  maxPhysicalPosInLogicQueue = 0;
              }
              if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
                  maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
              }
              this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
              // 消息分發(fā)操作,啟動新線程來處理
              this.reputMessageService.start();
              ...
          }

          BrokerController啟動時,會處理maxPhysicalPosInLogicQueue的值,這個值就是分發(fā)commitlog消息的偏移量,之后就啟動ReputMessageService服務(wù)來處理。ReputMessageServiceDefaultMessageStore的內(nèi)部類,它是ServiceThread的子類,start()方法如下:

          public abstract class ServiceThread implements Runnable {
              public void start() {
                  if (!started.compareAndSet(falsetrue)) {
                      return;
                  }
                  stopped = false;
                  this.thread = new Thread(this, getServiceName());
                  this.thread.setDaemon(isDaemon);
                  this.thread.start();
              }
              ...
          }

          這個方法僅僅是處理線程的啟動,我們繼續(xù)看ServiceThreadServiceThreadRunnable的子類,它的run()方法如下:

          class ReputMessageService extends ServiceThread {
              @Override
              public void run() {
                  DefaultMessageStore.log.info(...);

                  while (!this.isStopped()) {
                      try {
                          Thread.sleep(1);
                          // 調(diào)用的是 doReput() 方法
                          this.doReput();
                      } catch (Exception e) {
                          DefaultMessageStore.log.warn(...);
                      }
                  }

                  DefaultMessageStore.log.info(...);
              }
          }

          ReputMessageService#run()方法來看,該線程會休眠1ms,然后調(diào)用doReput()方法處理,看來doReput()方法就是關(guān)鍵了!

          2. 消息分發(fā):DefaultMessageStore.ReputMessageService#doReput

          我們進入DefaultMessageStore.ReputMessageService#doReput方法:

          private void doReput() {
              // 處理 reputFromOffset
              if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
                  log.warn(...);
                  this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
              }
              for (boolean doNext = truethis.isCommitLogAvailable() && doNext; ) {

                  if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
                          && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
                      break;
                  }

                  // 從CommitLog中獲取需要進行轉(zhuǎn)發(fā)的消息
                  SelectMappedBufferResult result 
                      = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
                  if (result != null) {
                      try {
                          this.reputFromOffset = result.getStartOffset();

                          for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                              // 檢驗數(shù)據(jù)
                              DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog
                                  .checkMessageAndReturnSize(result.getByteBuffer(), falsefalse);
                              int size = dispatchRequest.getBufferSize() == -1 
                                  ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

                              if (dispatchRequest.isSuccess()) {
                                  if (size > 0) {
                                      // 分發(fā)消息
                                      DefaultMessageStore.this.doDispatch(dispatchRequest);
                                      // 長輪詢:如果有消息到了主節(jié)點,并且開啟了長輪詢
                                      if (BrokerRole.SLAVE != DefaultMessageStore.this
                                              .getMessageStoreConfig().getBrokerRole()
                                              &&DefaultMessageStore.this.brokerConfig.isLongPollingEnable()){
                                          // 調(diào)用NotifyMessageArrivingListener的arriving方法
                                          DefaultMessageStore.this.messageArrivingListener.arriving(
                                              dispatchRequest.getTopic(),
                                              dispatchRequest.getQueueId(), 
                                              dispatchRequest.getConsumeQueueOffset() + 1,
                                              dispatchRequest.getTagsCode(), 
                                              dispatchRequest.getStoreTimestamp(),
                                              dispatchRequest.getBitMap(), 
                                              dispatchRequest.getPropertiesMap());
                                      }

                                      ...
                                  } else if (size == 0) {
                                      ...
                                  }
                              } else if (!dispatchRequest.isSuccess()) {
                                  ...
                              }
                          }
                      } finally {
                          result.release();
                      }
                  } else {
                      doNext = false;
                  }
              }
          }

          該方法依舊很長,我們重點關(guān)注與分發(fā)相關(guān)的流程:

          1. commitLog.getData(...):從CommitLog中獲取DispatchRequest需要分發(fā)的消息,參數(shù)reputFromOffset就是消息在文件中的偏移量
          2. this.doDispatch(...):分發(fā)操作,就是把消息的相關(guān)寫入ConsumeQueueIndexFile兩個文件中
          3. 如果當前節(jié)點為主節(jié)點,且啟用了長輪詢,則調(diào)用NotifyMessageArrivingListenerarriving方法,在這里會把消息主動投遞到consumer

          總的來說,當消息寫入到commitLog后,ReputMessage會根據(jù)上一次分發(fā)消息的偏移量依次從commitLog文件中讀取消息信息,寫入到ConsumeQueueIndexFile兩個文件中,當然了,這里寫入的只是消息的發(fā)送時間、在commitLog中的位置信息,完整的消息只有commitLog文件才存在。

          寫完這兩個文件后,接下來就等待consumer來拉取消息了。當然,consumer主動來拉取可能會導致消息無法實時送達,為解決這個問題,rocketMq給出的解決方案是長輪詢,具體為:如果當前沒有消息,就holdconsumer的請求30s,這30s內(nèi)一旦有消息過來,就及時喚醒consumer的請求,實際將消息發(fā)送出去,就也是NotifyMessageArrivingListener#arriving方法所做的工作,關(guān)于這點我們在分析consumer拉取消息時再詳細分析。

          我們再來看看消息分發(fā)消息,進入DefaultMessageStore#doDispatch

          public class DefaultMessageStore implements MessageStore {

              private final LinkedList<CommitLogDispatcher> dispatcherList;

              /**
               * DefaultMessageStore 構(gòu)造方法
               */

              public DefaultMessageStore(...) throws IOException {
                  ...
                  // 消息分發(fā)處理
                  this.dispatcherList = new LinkedList<>();
                  // 寫入 ConsumeQueue 文件
                  this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
                  // 寫入 Index 文件
                  this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
                  ...
              }

              /**
               * 分發(fā)操作
               */

              public void doDispatch(DispatchRequest req) {
                  // 進行分發(fā)操作,dispatcherList 包含兩個對象:
                  // 1. CommitLogDispatcherBuildConsumeQueue:寫入 ConsumeQueue 文件
                  // 2. CommitLogDispatcherBuildIndex:寫入 Index 文件
                  for (CommitLogDispatcher dispatcher : this.dispatcherList) {
                      dispatcher.dispatch(req);
                  }
              }
          }

          從整個方法的運行來看,DefaultMessageStore在創(chuàng)建時,會準備兩個CommitLogDispatcher

          • CommitLogDispatcherBuildConsumeQueue:處理ConsumeQueue文件的寫入
          • CommitLogDispatcherBuildIndex:處理IndexFile文件的寫入

          DefaultMessageStore#doDispatch方法中,就是對這兩個文件的寫入操作了:

          /**
           * consumerQueue 文件分發(fā)的構(gòu)建器
           */

          class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

              @Override
              public void dispatch(DispatchRequest request) {
                  final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
                  switch (tranType) {
                      case MessageSysFlag.TRANSACTION_NOT_TYPE:
                      case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                          // 將消息在commitLog文件的位置、tags等信息寫入ConsumerQueue文件
                          DefaultMessageStore.this.putMessagePositionInfo(request);
                          break;
                      case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                      case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                          break;
                  }
              }
          }

          /**
           * indexFile 文件分發(fā)的構(gòu)建器
           */

          class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {

              @Override
              public void dispatch(DispatchRequest request) {
                  if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
                      DefaultMessageStore.this.indexService.buildIndex(request);
                  }
              }
          }

          需要注意的是,在這兩個文件中,寫入的僅是消息的位置信息,完整的消息內(nèi)容僅在commitLog中保存。

          3. 總結(jié)

          本文主要分析了broker消息分發(fā)分發(fā),這里說的分發(fā)流程,是指broker將消息寫入到consumerQueue文件的流程。

          broker啟動時,會啟動一個專門的線程:ReputMessageService,該線程會不停地從comsumer獲取消息,然后將其寫入到consumerQueue文件與IndexFile文件中。

          當消息分發(fā)到consumerQueue文件后,接著consumer就可以很方便地從各隊列中獲取消息了,下一篇我們來分析broker是如何響應consumer獲取消息請求的。


          限于作者個人水平,文中難免有錯誤之處,歡迎指正!原創(chuàng)不易,商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請注明出處。

          本文首發(fā)于微信公眾號 Java技術(shù)探秘,如果您喜歡本文,歡迎關(guān)注該公眾號,讓我們一起在技術(shù)的世界里探秘吧!

          - END -


          瀏覽 68
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  在线视频天堂网 | 久久色在线视频 | 麻豆影院Av | 亚洲在线大香蕉 | A毛黄片|