<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Rocketmq源碼分析12:consumer 負載均衡

          共 25136字,需瀏覽 51分鐘

           ·

          2021-04-28 00:29

          注:本系列源碼分析基于RocketMq 4.8.0,gitee倉庫鏈接:https://gitee.com/funcy/rocketmq.git.

          接上文,繼續(xù)分析consumer消費流程。

          5. 如何選擇消息隊列:RebalanceService

          讓我們回到PullMessageService#run()方法:

          public class PullMessageService extends ServiceThread {

              ...

              private final LinkedBlockingQueue<PullRequest> pullRequestQueue 
                  = new LinkedBlockingQueue<PullRequest>();

              /**
               * 將 pullRequest 放入 pullRequestQueue 中
               */

              public void executePullRequestImmediately(final PullRequest pullRequest) {
                  try {
                      this.pullRequestQueue.put(pullRequest);
                  } catch (InterruptedException e) {
                      log.error("executePullRequestImmediately pullRequestQueue.put", e);
                  }
              }

              @Override
              public void run() {
                  log.info(this.getServiceName() + " service started");

                  while (!this.isStopped()) {
                      try {
                          // 從 pullRequestQueue 獲取一個 pullRequest,阻塞的方式
                          PullRequest pullRequest = this.pullRequestQueue.take();
                          this.pullMessage(pullRequest);
                      } catch (InterruptedException ignored) {
                      } catch (Exception e) {
                          log.error("Pull Message Service Run Method exception", e);
                      }
                  }

                  log.info(this.getServiceName() + " service end");
              }

              ...
          }

          PullMessageService線程獲得了pullRequest后,然后就開始了一次又一次的拉起消息的操作,那這個pullRequest最初是在哪里添加進來的呢?這就是本節(jié)要分析的「負載均衡」功能了。

          處理負載均衡的線程為RebalanceService,它是在MQClientInstance#start方法中啟動的,我們直接進入其run()方法:

          public class RebalanceService extends ServiceThread {

              // 省略其他
              ...

              @Override
              public void run() {
                  log.info(this.getServiceName() + " service started");

                  while (!this.isStopped()) {
                      this.waitForRunning(waitInterval);
                      this.mqClientFactory.doRebalance();
                  }

                  log.info(this.getServiceName() + " service end");
              }
          }

          在它的run()方法中,僅是調(diào)用了MQClientInstance#doRebalance方法,我們繼續(xù)進入:

          public void doRebalance() {
              // consumerTable 存放的就是當前 consumer
              for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
                  MQConsumerInner impl = entry.getValue();
                  if (impl != null) {
                      try {
                          impl.doRebalance();
                      } catch (Throwable e) {
                          log.error("doRebalance exception", e);
                      }
                  }
              }
          }

          MQClientInstance#doRebalance方法中,會遍歷所有的consumer,然后調(diào)用DefaultMQPushConsumerImpl#doRebalance方法作進一步的處理,consumerTable就是用來保存DefaultMQPushConsumerImpl實例的,繼續(xù)進入DefaultMQPushConsumerImpl#doRebalance方法:

          @Override
          public void doRebalance() {
              if (!this.pause) {
                  this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
              }
          }

          繼續(xù)跟進,來到RebalanceImpl#doRebalance方法:

          public void doRebalance(final boolean isOrder) {
              Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
              if (subTable != null) {
                  for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                      final String topic = entry.getKey();
                      try {
                          // 客戶端負載均衡:根據(jù)主題來處理負載均衡
                          this.rebalanceByTopic(topic, isOrder);
                      } catch (Throwable e) {
                          if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                              log.warn("rebalanceByTopic Exception", e);
                          }
                      }
                  }
              }

              this.truncateMessageQueueNotMyTopic();
          }

          /**
           * 這就是最張?zhí)幚碡撦d均衡的地方了
           */

          private void rebalanceByTopic(final String topic, final boolean isOrder) {
              switch (messageModel) {
                  // 廣播模式:不需要處理負載均衡,每個消費者都要消費,只需要更新負載信息
                  case BROADCASTING: {
                      Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                      if (mqSet != null) {
                          // 更新負載均衡信息,這里傳入的參數(shù)是mqSet,即所有隊列
                          boolean changed = this
                              .updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                          if (changed) {
                              this.messageQueueChanged(topic, mqSet, mqSet);
                              log.info(...);
                          }
                      } else {
                          log.warn(...);
                      }
                      break;
                  }
                  // 集群模式
                  case CLUSTERING: {
                      // 根據(jù)訂閱的主題獲取消息隊列
                      Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                      // 客戶端id,根據(jù) topic 與 consumerGroup 獲取所有的 consumerId
                      List<String> cidAll = this.mQClientFactory
                          .findConsumerIdList(topic, consumerGroup);
                      if (null == mqSet) {
                          if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                              log.warn(...);
                          }
                      }

                      if (null == cidAll) {
                          log.warn(...);
                      }

                      if (mqSet != null && cidAll != null) {
                          List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                          mqAll.addAll(mqSet);
                          // 排序后才能保證消費者負載策略相對穩(wěn)定
                          Collections.sort(mqAll);
                          Collections.sort(cidAll);
                          // MessageQueue 的負載策略
                          AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                          List<MessageQueue> allocateResult = null;
                          try {
                              // 按負載策略進行分配,返回當前消費者實際訂閱的messageQueue集合
                              allocateResult = strategy.allocate(
                                  this.consumerGroup,
                                  this.mQClientFactory.getClientId(),
                                  mqAll,
                                  cidAll);
                          } catch (Throwable e) {
                              log.error(...);
                              return;
                          }

                          Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                          if (allocateResult != null) {
                              allocateResultSet.addAll(allocateResult);
                          }

                          // 更新負載均衡信息,傳入?yún)?shù)是 allocateResultSet,即當前consumer分配到的隊列
                          boolean changed = this
                              .updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                          if (changed) {
                              log.info(...);
                              this.messageQueueChanged(topic, mqSet, allocateResultSet);
                          }
                      }
                      break;
                  }
                  default:
                      break;
              }
          }

          RebalanceImpl#rebalanceByTopic方法就是最終處理負載均衡的方法了,在這個方法里會區(qū)分廣播模式與集群模式的處理。

          在廣播模式下,一條消息會被同一個消費組中的所有consumer消費,而集群模式下,一條消息只會被同一個消費組下的一個consumer消費。

          正是因為如此,廣播模式下并沒有負載均衡可言,直接把所有的隊列都分配給當前consumer處理,然后更新QueueTable的負載均衡信息;而集群模式會先分配當前consumer消費的消息隊列,再更新QueueTable的負載均衡信息。

          這里我們來看看集群模式,看看它的操作:

          1. strategy.allocate(...):按負載均衡策略為當前consumer分配隊列
          2. updateProcessQueueTableInRebalance(...):更新負載均衡信息。

          rocketMq中,提供了這些負載均衡策略:

          • AllocateMessageQueueAveragely:平均負載策略,rocketMq默認使用的策略
          • AllocateMessageQueueAveragelyByCircle:環(huán)形平均分配,這個和平均分配唯一的區(qū)別就是,再分隊列的時候,平均隊列是將屬于自己的MessageQueue全部拿走,而環(huán)形平均則是,一人拿一個,拿到的Queue不是連續(xù)的。
          • AllocateMessageQueueByConfig:用戶自定義配置
          • AllocateMessageQueueByMachineRoom:同機房負載策略,這個策略就是當前Consumer只負載處在指定的機房內(nèi)的MessageQueuebrokerName的命名必須要按要求的格式來設置:機房名@brokerName
          • AllocateMachineRoomNearby:就近機房負載策略,在AllocateMessageQueueByMachineRoom策略中,如果同一機房中只有MessageQueue而沒有consumer,那這個MessageQueue上的消息該如何消費呢?AllocateMachineRoomNearby就是擴充了該功能的處理
          • AllocateMessageQueueConsistentHash:一致性哈希策略

          這里我們重點來分析平均負載策略AllocateMessageQueueAveragely

          public List<MessageQueue> allocate(String consumerGroup, String currentCID, 
                  List<MessageQueue> mqAll, List<String> cidAll)
           
          {
              // 返回值        
              List<MessageQueue> result = new ArrayList<MessageQueue>();

              // 省略一些判斷操作
              ...


              int index = cidAll.indexOf(currentCID);
              int mod = mqAll.size() % cidAll.size();
              // 1. 消費者數(shù)量大于隊列數(shù)量:averageSize = 1
              // 2. 消費者數(shù)量小于等于隊列數(shù)量:averageSize = 隊列數(shù)量 / 消費者數(shù)量,還要處理個+1的操作
              int averageSize = mqAll.size() <= cidAll.size() 
                  ? 1 : (mod > 0 && index < mod 
                      ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
              int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
              int range = Math.min(averageSize, mqAll.size() - startIndex);
              for (int i = 0; i < range; i++) {
                  result.add(mqAll.get((startIndex + i) % mqAll.size()));
              }
              return result;
          }

          這個方法中,關鍵的分配方法就在后面幾行,如果只看代碼,會感覺有點暈,這里我舉一個例子來簡單解釋下:

          假設:messageQueue一共有6個,consumer有4個,當前consumerindex為1,有了這些前提后,接下來我們就來看它的分配過程了。

          1. 計算取余操作:6 % 4 = 2,這表明messageQueue不能平均分配給每個consumer,接下來就來看看這個余數(shù)2是如何處理的

          2. 計算每個consumer平均處理的messageQueue數(shù)量

            消費者索引0123
            處理數(shù)量2211
            • 這里需要注意,如果consumer數(shù)量大于messageQueue數(shù)量,那每個consumer最多只會分配到一個messageQueue,這種情況下,余數(shù)2不會進行處理,并且有的consumer處理的messageQueue數(shù)量為0,同一個messageQueue不會同時被兩個及以上的consumer消費掉
            • 這里的messageQueue數(shù)量為6,consumer為4,計算得到每個consumer處理的隊列數(shù)最少為1,除此之外,為了實現(xiàn)“平均”,有2個consumer會需要多處理1個messageQueue,按“平均”的分配原則,如果index小于mod,則會分配多1個messageQueue,這里的mod為2,結果如下:
          3. 分配完每個consumer處理的messageQueue數(shù)量后,這些messageQueue該如何分配呢?從代碼來看,分配時會先分配完一個consumer,再分配下一個consumer,最終結果就是這樣:

            隊列Q0Q1Q2Q3Q4Q5
            消費者C1C1C2C2C4C5

          從圖中可以看到,在6個messageQueue、4個consumer、當前consumerindex為1的情況下,當前consumer會分到2個隊列,分別為Q2/Q3.

          messageQueue分配完成后,接下來就是更新負載信息了,方法為RebalanceImpl#updateProcessQueueTableInRebalance

          private boolean updateProcessQueueTableInRebalance(final String topic, 
                  final Set<MessageQueue> mqSet, final boolean isOrder)
           
          {

              ...

              List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
              for (MessageQueue mq : mqSet) {
                  if (!this.processQueueTable.containsKey(mq)) {
                      if (isOrder && !this.lock(mq)) {
                          log.warn(...);
                          continue;
                      }

                      this.removeDirtyOffset(mq);
                      ProcessQueue pq = new ProcessQueue();
                      long nextOffset = this.computePullFromWhere(mq);
                      if (nextOffset >= 0) {
                          ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                          if (pre != null) {
                              log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                          } 
                          // pullRequest 最初產(chǎn)生的地方:mq 不存在,就添加
                          else {
                              log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                              // 添加 pullRequest
                              PullRequest pullRequest = new PullRequest();
                              pullRequest.setConsumerGroup(consumerGroup);
                              pullRequest.setNextOffset(nextOffset);
                              pullRequest.setMessageQueue(mq);
                              pullRequest.setProcessQueue(pq);
                              pullRequestList.add(pullRequest);
                              changed = true;
                          }
                      } else {
                          log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                      }
                  }
              }

              // 發(fā)布
              this.dispatchPullRequest(pullRequestList);

              return changed;
          }

          這個方法中最最關鍵的就是pullRequestList的添加操作了:先遍歷傳入的MessageQueue,如果當前consumer沒有消費過該messageQueue,則添加一個新的pullRequestpullRequestList,之后就是發(fā)布pullRequestList了。

          看到這里,我們就應該能明白,最初的pullRequest就是在這里產(chǎn)生的,而發(fā)布pullRequestList的操作,就是將pullRequest丟給pullMessageService線程處理了:

          /**
           * RebalancePushImpl#dispatchPullRequest:發(fā)布pullRequest的操作
           */

          public void dispatchPullRequest(List<PullRequest> pullRequestList) {
              for (PullRequest pullRequest : pullRequestList) {
                  // 在這里執(zhí)行pullRequest,其實就是把 pullRequest 添加到
                  // PullMessageService#pullRequestQueue 中
                  this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
                  log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
              }
          }

          限于篇幅,本文就先到這里了,下篇繼續(xù)。


          限于作者個人水平,文中難免有錯誤之處,歡迎指正!原創(chuàng)不易,商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權,非商業(yè)轉(zhuǎn)載請注明出處。

          本文首發(fā)于微信公眾號 「Java技術探秘」,如果您喜歡本文,歡迎關注該公眾號,讓我們一起在技術的世界里探秘吧!


          瀏覽 17
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  午夜欧美成人三级 | 草草地址线路①屁屁影院成人 | 北条麻妃乱伦 | 免费看抠逼 | 精品国产精品国产精品 |