<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Rocketmq源碼分析10:consumer 啟動(dòng)流程

          共 21796字,需瀏覽 44分鐘

           ·

          2021-04-28 00:28

          注:本系列源碼分析基于RocketMq 4.8.0,gitee倉(cāng)庫(kù)鏈接:https://gitee.com/funcy/rocketmq.git.

          前面分析了producer發(fā)送消息的流程,本文我們來(lái)分析consumer消費(fèi)消息的流程。

          consumer消費(fèi)消息的demoorg.apache.rocketmq.example.simple.PushConsumer,代碼如下:

          public class PushConsumer {

              public static void main(String[] args) 
                      throws InterruptedException, MQClientException 
          {
                  String nameServer = "localhost:9876";
                  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
                  consumer.setNamesrvAddr(nameServer);
                  consumer.subscribe("TopicTest""*");
                  consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                  //wrong time format 2017_0422_221800
                  consumer.setConsumeTimestamp("20181109221800");
                  // 注冊(cè)監(jiān)聽(tīng)器,監(jiān)聽(tīng)消息
                  consumer.registerMessageListener(new MessageListenerConcurrently() {
                      @Override
                      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                              ConsumeConcurrentlyContext context)
           
          {
                          // 這里獲得了消息
                          System.out.printf("%s Receive New Messages: %s %n"
                              Thread.currentThread().getName(), msgs);
                          return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                      }
                  });
                  // 啟動(dòng)
                  consumer.start();
                  System.out.printf("Consumer Started.%n");
              }
          }

          consumer使用起來(lái)還是挺簡(jiǎn)單的,先是創(chuàng)建了一個(gè)DefaultMQPushConsumer對(duì)象,然后配置了一些屬性,比較關(guān)鍵的就是注冊(cè)消息監(jiān)聽(tīng)器(在這個(gè)監(jiān)聽(tīng)器里會(huì)獲取消息),之后就調(diào)用start()方法啟動(dòng)consumer.

          接下來(lái)我們就來(lái)分析這塊的消費(fèi)過(guò)程。

          1. 構(gòu)造方法:DefaultMQPushConsumer

          consumer的處理類為DefaultMQPushConsumer,我們先來(lái)看看DefaultMQPushConsumer的構(gòu)造方法:

          public DefaultMQPushConsumer(final String consumerGroup) {
              // 這里指定了隊(duì)列分配策略
              this(null, consumerGroup, nullnew AllocateMessageQueueAveragely());
          }

          public DefaultMQPushConsumer(final String namespace, final String consumerGroup, 
                  RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy)
           
          {
              this.consumerGroup = consumerGroup;
              this.namespace = namespace;
              this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
              defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
          }

          在構(gòu)造方法中,就只是做了一些成員變量的賦值操作,比較關(guān)鍵的是分配消息隊(duì)列的策略:allocateMessageQueueStrategy,如果指定,默認(rèn)就使用AllocateMessageQueueAveragely,即從各隊(duì)列平均獲取消息。

          2. 啟動(dòng)consumerDefaultMQPushConsumer#start

          consumer的啟動(dòng)方法為DefaultMQPushConsumer#start,代碼如下:

          public void start() throws MQClientException {
              setConsumerGroup(
                  NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
              // 啟動(dòng)
              this.defaultMQPushConsumerImpl.start();
              // 消息軌跡相關(guān)內(nèi)容,我們不關(guān)注
              if (null != traceDispatcher) {
                  ...
              }
          }

          繼續(xù)進(jìn)入DefaultMQPushConsumerImpl#start

          public synchronized void start() throws MQClientException {
              switch (this.serviceState) {
                  case CREATE_JUST:
                      log.info(...);
                      this.serviceState = ServiceState.START_FAILED;

                      this.checkConfig();

                      this.copySubscription();

                      if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                          this.defaultMQPushConsumer.changeInstanceNameToPID();
                      }
                      // 客戶端
                      this.mQClientFactory = MQClientManager.getInstance()
                          .getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
                      // 設(shè)置負(fù)載均衡相關(guān)屬性
                      this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                      this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                      this.rebalanceImpl.setAllocateMessageQueueStrategy(
                          this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                      this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

                      this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,
                          this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                      this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

                      if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                          this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                      } else {
                          // 消息模式:廣播模式存在本地,集群模式存在遠(yuǎn)程(broker)
                          switch (this.defaultMQPushConsumer.getMessageModel()) {
                              case BROADCASTING:
                                  this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, 
                                      this.defaultMQPushConsumer.getConsumerGroup());
                                  break;
                              case CLUSTERING:
                                  this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, 
                                      this.defaultMQPushConsumer.getConsumerGroup());
                                  break;
                              default:
                                  break;
                          }
                          this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                      }
                      // 加載消費(fèi)信息的偏移量
                      this.offsetStore.load();
                      // 根據(jù)客戶端實(shí)例化不同的consumeMessageService:順序消息與并發(fā)消息
                      if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                          this.consumeOrderly = true;
                          this.consumeMessageService = new ConsumeMessageOrderlyService(this
                              (MessageListenerOrderly) this.getMessageListenerInner());
                      } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                          this.consumeOrderly = false;
                          this.consumeMessageService = new ConsumeMessageConcurrentlyService(this
                              (MessageListenerConcurrently) this.getMessageListenerInner());
                      }

                      this.consumeMessageService.start();
                      // 注冊(cè)消費(fèi)組
                      boolean registerOK = mQClientFactory.registerConsumer(
                          this.defaultMQPushConsumer.getConsumerGroup(), this);
                      if (!registerOK) {
                          this.serviceState = ServiceState.CREATE_JUST;
                          this.consumeMessageService.shutdown(
                              defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
                          throw new MQClientException(...);
                      }
                      // 啟動(dòng)
                      mQClientFactory.start();
                      log.info(...);
                      this.serviceState = ServiceState.RUNNING;
                      break;
                  case RUNNING:
                  case START_FAILED:
                  case SHUTDOWN_ALREADY:
                      throw new MQClientException(...);
                  default:
                      break;
              }

              // 更新 topic 的信息,從nameServer獲取數(shù)據(jù)
              this.updateTopicSubscribeInfoWhenSubscriptionChanged();
              this.mQClientFactory.checkClientInBroker();
              // 發(fā)送心跳,發(fā)送到所有的broker
              this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
              // 負(fù)載均衡
              this.mQClientFactory.rebalanceImmediately();
          }

          這個(gè)方法比較長(zhǎng),整個(gè)consumer的啟動(dòng)流程都在這里了,咱們挑重點(diǎn)說(shuō),來(lái)總結(jié)下這個(gè)方法做了什么。

          1. 獲取客戶端mQClientFactory,類型為org.apache.rocketmq.client.impl.factory.MQClientInstance,如果對(duì)producer還有印象的話,我們就會(huì)發(fā)現(xiàn),producer中的mQClientFactory的類型也是它
          2. 區(qū)分廣播模式與集群模式的offsetStore,所謂的offsetStore,就是一存儲(chǔ)器,用來(lái)存儲(chǔ)當(dāng)前消費(fèi)者消費(fèi)信息的偏移量。在廣播模式中,該偏移量保存在本地文件中,而在集群模式中,該偏移量保存在遠(yuǎn)程broker中,廣播模式與集群模式,我們后面再詳細(xì)分析
          3. 根據(jù)客戶端實(shí)例化不同的consumeMessageService,這里用來(lái)區(qū)分順序消息與并發(fā)消息,依然是后面再分析
          4. 啟動(dòng)mQClientFactory,也就是啟動(dòng)客戶端
          5. 更新topic信息、發(fā)送心跳信息到broker、處理負(fù)載均衡功能

          以上就是DefaultMQPushConsumerImpl#start方法所做的的主要工作了。實(shí)際上,上面的123點(diǎn)都是一些配置工作,這些配置對(duì)應(yīng)的服務(wù)是在mQClientFactory.start()方法中啟動(dòng)的,我們繼續(xù)。

          3. 啟動(dòng)mQClientFactoryMQClientInstance#start

          我們來(lái)看看mQClientFactory的啟動(dòng)流程,進(jìn)入MQClientInstance#start

          public void start() throws MQClientException {
              synchronized (this) {
                  switch (this.serviceState) {
                      case CREATE_JUST:
                          this.serviceState = ServiceState.START_FAILED;
                          // 獲取 nameServer 的地址
                          if (null == this.clientConfig.getNamesrvAddr()) {
                              this.mQClientAPIImpl.fetchNameServerAddr();
                          }
                          // 啟動(dòng)客戶端的遠(yuǎn)程服務(wù),這個(gè)方法會(huì)配置netty客戶端
                          this.mQClientAPIImpl.start();
                          // 啟動(dòng)定時(shí)任務(wù)
                          this.startScheduledTask();
                          // pull服務(wù),僅對(duì)consumer啟作用
                          this.pullMessageService.start();
                          // 啟動(dòng)負(fù)載均衡服務(wù),僅對(duì)consumer啟作用
                          this.rebalanceService.start();
                          // 啟用內(nèi)部的 producer
                          this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                          log.info("the client factory [{}] start OK"this.clientId);
                          this.serviceState = ServiceState.RUNNING;
                          break;
                      case START_FAILED:
                          throw new MQClientException(...);
                      default:
                          break;
                  }
              }
          }

          producer的啟動(dòng)過(guò)程中,也會(huì)調(diào)用這個(gè)方法,前面我們已經(jīng)分析過(guò)了一波了,這次我們?cè)?code style="overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;color: rgb(145, 109, 213);font-weight: bolder;background-image: none;background-position: initial;background-size: initial;background-repeat: initial;background-attachment: initial;background-origin: initial;background-clip: initial;">consumer的角度再來(lái)分析這個(gè)方法。

          該方法所做的工作如下:

          1. 獲取 nameServer 的地址
          2. 啟動(dòng)客戶端的遠(yuǎn)程服務(wù),這個(gè)方法會(huì)配置netty客戶端
          3. 啟動(dòng)定時(shí)任務(wù)
          4. 啟動(dòng)拉取消息服務(wù)
          5. 啟動(dòng)負(fù)載均衡服務(wù)

          上面的12producer的流程并無(wú)區(qū)別,就不再分析了,我們來(lái)看看定時(shí)任務(wù)的啟動(dòng),進(jìn)入方法MQClientInstance#startScheduledTask

          private void startScheduledTask() {
              ...

              // 持久化消費(fèi)者的消費(fèi)偏移量,每5秒一次
              this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                  @Override
                  public void run() {
                      try {
                          MQClientInstance.this.persistAllConsumerOffset();
                      } catch (Exception e) {
                          log.error("ScheduledTask persistAllConsumerOffset exception", e);
                      }
                  }
              }, 1000 * 10this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

              // 省略其他定時(shí)任務(wù)
              ...
          }

          這個(gè)方法中還啟動(dòng)了其他一些定時(shí)任務(wù),這里我們重點(diǎn)關(guān)注執(zhí)行MQClientInstance#persistAllConsumerOffset()方法的定時(shí)任務(wù),該定時(shí)任務(wù)會(huì)持久化當(dāng)前消費(fèi)者消費(fèi)消息的偏移量,在本節(jié)我們先對(duì)這個(gè)定時(shí)任務(wù)有個(gè)印象,在分析偏移量持久化一節(jié)再詳細(xì)分析持久化流程。

          我們?cè)倩氐?code style="overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;color: rgb(145, 109, 213);font-weight: bolder;background-image: none;background-position: initial;background-size: initial;background-repeat: initial;background-attachment: initial;background-origin: initial;background-clip: initial;">MQClientInstance#start的流程,第4與第5步,主要是啟動(dòng)了兩個(gè)服務(wù):pullMessageServicerebalanceService,這個(gè)類的信息如下:

          /**
           * PullMessageService
           */

          public class PullMessageService extends ServiceThread {
              ...
          }

          /**
           * RebalanceService
           */

          public class RebalanceService extends ServiceThread {
              ...
          }

          這兩個(gè)類都是ServiceThread的子類,這兩個(gè)類的start()方法也都是來(lái)自于ServiceThread

          public abstract class ServiceThread implements Runnable {

              // 省略其他代碼
              ...

              /**
               * start() 方法
               */

              public void start() {
                  log.info(...);
                  if (!started.compareAndSet(falsetrue)) {
                      return;
                  }
                  stopped = false;
                  this.thread = new Thread(this, getServiceName());
                  this.thread.setDaemon(isDaemon);
                  this.thread.start();
              }
          }

          從代碼來(lái)看,ServiceThread實(shí)現(xiàn)了Runnable接口,在其start()方法中,啟動(dòng)了一個(gè)線程,線程的執(zhí)行邏輯正是來(lái)自于其子類的run()方法,因此我們要看pullMessageServicerebalanceServicestart()方法執(zhí)行邏輯,只需要看對(duì)應(yīng)類的run()方法即可。

          到此為止,consumer的啟動(dòng)就已經(jīng)完成了,各項(xiàng)服務(wù)也啟動(dòng)起來(lái)了,而consumer拉取消息也正是由這些服務(wù)的配合處理的,接下來(lái)我們就來(lái)分析這些服務(wù)做了什么。

          限于篇幅,本文就先到這里了,下篇繼續(xù)。


          限于作者個(gè)人水平,文中難免有錯(cuò)誤之處,歡迎指正!原創(chuàng)不易,商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請(qǐng)注明出處。

          本文首發(fā)于微信公眾號(hào) 「Java技術(shù)探秘」,如果您喜歡本文,歡迎關(guān)注該公眾號(hào),讓我們一起在技術(shù)的世界里探秘吧!


          瀏覽 39
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日韩性做爰免费A片AA片 | 免费在线日韩 | 小早川怜子爆乿护士中文 | 国产日韩无码视频 | 午夜欧美性爱视频 |