<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Rocketmq源碼分析08:producer 啟動(dòng)流程

          共 27614字,需瀏覽 56分鐘

           ·

          2021-04-22 21:33

          注:本系列源碼分析基于RocketMq 4.8.0,gitee倉(cāng)庫(kù)鏈接:https://gitee.com/funcy/rocketmq.git.

          本文我們來(lái)分析rocketMq producer 發(fā)送消息的流程.

          producer發(fā)送消息的示例在org.apache.rocketmq.example.simple.Producer類(lèi)中,代碼如下:

          public class Producer {
              public static void main(String[] args) 
                      throws MQClientException, InterruptedException 
          {
                  String nameServer = "localhost:9876";
                  // 1. 創(chuàng)建 DefaultMQProducer 對(duì)象
                  DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
                  producer.setNamesrvAddr(nameServer);
                  // 2. 啟動(dòng) producer
                  producer.start();
                  for (int i = 0; i < 1; i++)
                      try {
                          Message msg = new Message("TopicTest",
                              "TagA",
                              "OrderID188",
                              "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                          // 3. 發(fā)送消息    
                          SendResult sendResult = producer.send(msg);
                          System.out.printf("%s%n", sendResult);
                      } catch (Exception e) {
                          e.printStackTrace();
                      }
                  producer.shutdown();
              }
          }

          以上代碼分三步走:

          1. 創(chuàng)建 DefaultMQProducer 對(duì)象
          2. 啟動(dòng) producer
          3. 發(fā)送消息

          接下來(lái)我們的分析也按這三步進(jìn)行。

          1. DefaultMQProducer構(gòu)造方法

          DefaultMQProducer構(gòu)造方法代碼如下:

          public DefaultMQProducer(final String producerGroup) {
              // 繼續(xù)調(diào)用
              this(null, producerGroup, null);
          }


          /**
           * 最終調(diào)用的構(gòu)造方法
           */

          public DefaultMQProducer(final String namespace, 
                  final String producerGroup, RPCHook rpcHook)
           
          {
              this.namespace = namespace;
              this.producerGroup = producerGroup;
              defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
          }

          這個(gè)方法就是簡(jiǎn)單地賦了值,然后創(chuàng)建了DefaultMQProducerImpl實(shí)例,我們繼續(xù)看DefaultMQProducerImpl的構(gòu)造方法:

          public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
              this.defaultMQProducer = defaultMQProducer;
              this.rpcHook = rpcHook;
              // 異步發(fā)送的隊(duì)列
              this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
              // 處理異步發(fā)送的線(xiàn)程池
              this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
                      Runtime.getRuntime().availableProcessors(),
                      Runtime.getRuntime().availableProcessors(),
                      1000 * 60,
                      TimeUnit.MILLISECONDS,
                      this.asyncSenderThreadPoolQueue,
                  new ThreadFactory() {
                      private AtomicInteger threadIndex = new AtomicInteger(0);

                      @Override
                      public Thread newThread(Runnable r) {
                          return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
                      }
                  });
          }

          這個(gè)構(gòu)造方法依然還是處理賦值操作,并沒(méi)做什么實(shí)質(zhì)性?xún)?nèi)容,就不繼續(xù)深究了。

          2. DefaultMQProducer#start:?jiǎn)?dòng)producer

          接著我們來(lái)看看producer的啟動(dòng)流程,進(jìn)入DefaultMQProducer#start方法:

          public void start() throws MQClientException {
              this.setProducerGroup(withNamespace(this.producerGroup));
              // 調(diào)用 defaultMQProducerImpl 的 start() 方法
              this.defaultMQProducerImpl.start();
              // 消息軌跡相關(guān),我們不關(guān)注
              if (null != traceDispatcher) {
                  ...
              }
          }

          這個(gè)方法先是調(diào)用了defaultMQProducerImpl#start方法,然后處理消息軌跡相關(guān)操作,關(guān)于rocketMq消息軌跡相關(guān)內(nèi)容,本文就不過(guò)多探討了,我們將目光聚集于DefaultMQProducerImpl#start(boolean)方法:

          public void start(final boolean startFactory) throws MQClientException {
              switch (this.serviceState) {
                  case CREATE_JUST:
                      this.serviceState = ServiceState.START_FAILED;
                      // 檢查一些配置信息
                      this.checkConfig();
                      // 修改當(dāng)前的 instanceName 為當(dāng)前進(jìn)程id
                      if (!this.defaultMQProducer.getProducerGroup()
                              .equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                          this.defaultMQProducer.changeInstanceNameToPID();
                      }
                      // 獲取mq實(shí)例
                      this.mQClientFactory = MQClientManager.getInstance()
                          .getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                      // 注冊(cè) mqClient 實(shí)例
                      boolean registerOK = mQClientFactory.registerProducer(
                          this.defaultMQProducer.getProducerGroup(), this);
                      if (!registerOK) {
                          this.serviceState = ServiceState.CREATE_JUST;
                          throw new MQClientException(...);
                      }

                      this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), 
                              new TopicPublishInfo());
                      // 啟動(dòng)實(shí)例
                      if (startFactory) {
                          mQClientFactory.start();
                      }

                      log.info(...);
                      this.serviceState = ServiceState.RUNNING;
                      break;
                  case RUNNING:
                  case START_FAILED:
                  case SHUTDOWN_ALREADY:
                      throw new MQClientException(...);
                  default:
                      break;
              }

              // 發(fā)送心跳到所有的broker
              this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
              // 定時(shí)掃描異步請(qǐng)求的返回結(jié)果
              this.timer.scheduleAtFixedRate(new TimerTask() {
                  @Override
                  public void run() {
                      try {
                          RequestFutureTable.scanExpiredRequest();
                      } catch (Throwable e) {
                          log.error("scan RequestFutureTable exception", e);
                      }
                  }
              }, 1000 * 31000);
          }

          這個(gè)方法并不復(fù)雜相關(guān)內(nèi)容都已經(jīng)作了注釋?zhuān)@里重點(diǎn)提出3個(gè)方法:

          1. mQClientFactory.start():執(zhí)行方法為MQClientInstance#start,這個(gè)方法里會(huì)啟動(dòng)一些組件,我們稍后會(huì)分析。
          2. mQClientFactory.sendHeartbeatToAllBrokerWithLock():發(fā)送心跳到所有的broker,最終執(zhí)行的方法為MQClientAPIImpl#sendHearbeat
            public int sendHearbeat(
                final String addr,
                final HeartbeatData heartbeatData,
                final long timeoutMillis
            )
             throws RemotingException, MQBrokerException, InterruptedException 
            {
                // request 的 code 為 HEART_BEAT
                RemotingCommand request = RemotingCommand
                    .createRequestCommand(RequestCode.HEART_BEAT, null);
                request.setLanguage(clientConfig.getLanguage());
                request.setBody(heartbeatData.encode());
                // 異步調(diào)用
                RemotingCommand response = this.remotingClient
                    .invokeSync(addr, request, timeoutMillis);
                assert response != null;
                switch (response.getCode()) {
                    case ResponseCode.SUCCESS: {
                        return response.getVersion();
                    }
                    default:
                        break;
                }

                throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
            }
            這里是與broker通信,requestcodeHEART_BEAT,后面的分析中我們會(huì)看到,producer也會(huì)同nameServer通信。
          3. 定時(shí)掃描異步請(qǐng)求的返回結(jié)果:最終執(zhí)行的方法為RequestFutureTable.scanExpiredRequest(),關(guān)于該方法的內(nèi)容,我們?cè)诜治?code style="overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;color: rgb(145, 109, 213);font-weight: bolder;background-image: none;background-position: initial;background-size: initial;background-repeat: initial;background-attachment: initial;background-origin: initial;background-clip: initial;">producer發(fā)送異步消息時(shí)再分析。

          2.1 MQClientInstance#start:?jiǎn)?dòng)MQClientInstance

          接下來(lái)我們來(lái)看看MQClientInstance的啟動(dòng),方法為MQClientInstance#start,代碼如下:

          public void start() throws MQClientException {

              synchronized (this) {
                  switch (this.serviceState) {
                      case CREATE_JUST:
                          this.serviceState = ServiceState.START_FAILED;
                          // 獲取 nameServer 的地址
                          if (null == this.clientConfig.getNamesrvAddr()) {
                              this.mQClientAPIImpl.fetchNameServerAddr();
                          }
                          // 啟動(dòng)遠(yuǎn)程服務(wù),這個(gè)方法只是裝配了netty客戶(hù)端相關(guān)配置
                          // 注意:1. 這里是netty客戶(hù)端,2. 這里并沒(méi)有創(chuàng)建連接
                          this.mQClientAPIImpl.start();
                          // 啟動(dòng)定時(shí)任務(wù)
                          this.startScheduledTask();
                          // pull服務(wù),僅對(duì)consumer啟作用
                          this.pullMessageService.start();
                          // 啟動(dòng)負(fù)載均衡服務(wù),僅對(duì)consumer啟作用
                          this.rebalanceService.start();
                          // 啟用內(nèi)部的 producer
                          this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                          log.info("the client factory [{}] start OK"this.clientId);
                          this.serviceState = ServiceState.RUNNING;
                          break;
                      case START_FAILED:
                          throw new MQClientException(...);
                      default:
                          break;
                  }
              }
          }

          這個(gè)方法進(jìn)行的操作在注釋中已經(jīng)說(shuō)明得很清楚了,接下來(lái)我們對(duì)以上的部分操作做進(jìn)一步分析。

          1. mQClientAPIImpl.start():配置netty客戶(hù)端

          這里調(diào)用的是NettyRemotingClient#start方法,代碼如下:

           @Override
          public void start() {
              this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
                  nettyClientConfig.getClientWorkerThreads(),
                  new ThreadFactory() {
                      ...
                  });
              // 這里使用的是Bootstrap而非ServerBootstrap,表示這是netty客戶(hù)端
              Bootstrap handler = this.bootstrap
                  .group(this.eventLoopGroupWorker)
                  .channel(NioSocketChannel.class)
                  .option(...)
                  // 省略各種option
                  .handler(new ChannelInitializer<SocketChannel>() 
          {
                      @Override
                      public void initChannel(SocketChannel ch) throws Exception {
                          ChannelPipeline pipeline = ch.pipeline();
                          // 省略pipeline的裝配
                          ...
                      }
                  });

              this.timer.scheduleAtFixedRate(new TimerTask() {
                  @Override
                  public void run() {
                      ...
                  }
              }, 1000 * 31000);

              if (this.channelEventListener != null) {
                  this.nettyEventExecutor.start();
              }
          }

          對(duì)于這個(gè)方法,說(shuō)明有兩個(gè)點(diǎn):

          1. 方法里使用的是Bootstrap而非ServerBootstrap,表示這是netty客戶(hù)端
          2. 整個(gè)方法中并沒(méi)有創(chuàng)建連接

          2. startScheduledTask():?jiǎn)?dòng)定時(shí)任務(wù)

          啟動(dòng)定時(shí)任務(wù)的方法為MQClientInstance#startScheduledTask,代碼如下:

          private void startScheduledTask() {
              if (null == this.clientConfig.getNamesrvAddr()) {
                  // 定時(shí)獲取 nameServer 的地址
                  this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                      @Override
                      public void run() {
                          try {
                              MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                          } catch (Exception e) {
                              log.error("ScheduledTask fetchNameServerAddr exception", e);
                          }
                      }
                  }, 1000 * 101000 * 60 * 2, TimeUnit.MILLISECONDS);
              }

              // 定時(shí)更新topic的路由信息
              this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                  @Override
                  public void run() {
                      try {
                          MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                      } catch (Exception e) {
                          log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                      }
                  }
              }, 10this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

              // 定時(shí)發(fā)送心跳信息
              this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                  @Override
                  public void run() {
                      try {
                          MQClientInstance.this.cleanOfflineBroker();
                          MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                      } catch (Exception e) {
                          log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
                      }
                  }
              }, 1000this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

              // 持久化消息者的消費(fèi)偏移量,可以放在本地文件,也可以推送到 broker
              this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                  @Override
                  public void run() {
                      try {
                          MQClientInstance.this.persistAllConsumerOffset();
                      } catch (Exception e) {
                          log.error("ScheduledTask persistAllConsumerOffset exception", e);
                      }
                  }
              }, 1000 * 10this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

              // 調(diào)整線(xiàn)程池的線(xiàn)程數(shù)量,并沒(méi)有用上
              this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                  @Override
                  public void run() {
                      try {
                          MQClientInstance.this.adjustThreadPool();
                      } catch (Exception e) {
                          log.error("ScheduledTask adjustThreadPool exception", e);
                      }
                  }
              }, 11, TimeUnit.MINUTES);
          }

          這里共有5個(gè)定時(shí)任務(wù):

          1. 定時(shí)獲取 nameServer 的地址,MQClientInstance#start一開(kāi)始會(huì)調(diào)用MQClientAPIImpl#fetchNameServerAddr獲取nameServer,這里也調(diào)用了這個(gè)方法
          2. 定時(shí)更新topic的路由信息,這里會(huì)去nameServer獲取路由信息,之后再分析
          3. 定時(shí)發(fā)送心跳信息到nameServer,在DefaultMQProducerImpl#start(boolean)中,我們也提到了向nameServer發(fā)送心跳信息,兩處調(diào)用的是同一個(gè)方法
          4. 持久化消費(fèi)者的消費(fèi)偏移量,這個(gè)僅對(duì)消費(fèi)者consumer有效,后面分析消費(fèi)者時(shí)再作分析
          5. 調(diào)整線(xiàn)程池的線(xiàn)程數(shù)量,不過(guò)追蹤到最后,發(fā)現(xiàn)這個(gè)并沒(méi)有生效,就不多說(shuō)了

          這里我們重點(diǎn)來(lái)看topic路由信息的獲取,我們經(jīng)過(guò)對(duì)MQClientInstance#updateTopicRouteInfoFromNameServer()的一路追蹤,我們來(lái)到了MQClientAPIImpl#getTopicRouteInfoFromNameServer(...)

          public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
                  boolean allowTopicNotExist)
           throws MQClientException, InterruptedException, 
                  RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException 
          {
              GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
              requestHeader.setTopic(topic);
              // 發(fā)送請(qǐng)求的 code 為 GET_ROUTEINFO_BY_TOPIC
              RemotingCommand request = RemotingCommand
                  .createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);
              RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
              assert response != null;
              switch (response.getCode()) {
                  ...
                  case ResponseCode.SUCCESS: {
                      byte[] body = response.getBody();
                      if (body != null) {
                          return TopicRouteData.decode(body, TopicRouteData.class);
                      }
                  }
                  ...
              }
              ...
          }

          這里發(fā)送向NameServer發(fā)送消息的codeGET_ROUTEINFO_BY_TOPIC,這點(diǎn)在前面分析nameServer的消息處理時(shí)也分析過(guò)了,并且還分析了當(dāng)消息送達(dá)nameServer后,nameServer是如何返回topic數(shù)據(jù)的,遺忘的小伙伴可以看下之前分析nameServer的文章。

          限于篇幅,本文就先到這里了,本文主要是分析producer啟動(dòng)流程,下一篇文章將分析消息發(fā)送流程。


          限于作者個(gè)人水平,文中難免有錯(cuò)誤之處,歡迎指正!原創(chuàng)不易,商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請(qǐng)注明出處。

          本文首發(fā)于微信公眾號(hào) 「Java技術(shù)探秘」,如果您喜歡本文,歡迎關(guān)注該公眾號(hào),讓我們一起在技術(shù)的世界里探秘吧!


          瀏覽 43
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  成人性爱在线观看 | 日逼视频网站免费观看 | 激情五月丁香色婷婷 | 日本va在线观看 日本va中文字幕 | 在线欧美视频 |