<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Rocketmq源碼分析04:broker 啟動(dòng)流程

          共 53386字,需瀏覽 107分鐘

           ·

          2021-04-22 21:34

          注:本系列源碼分析基于RocketMq 4.8.0,gitee倉庫鏈接:https://gitee.com/funcy/rocketmq.git.

          前面我們已經(jīng)分析完了NameServer,從本文開始,我們將分析Broker

          1. 啟動(dòng)入口

          broker的啟動(dòng)類為org.apache.rocketmq.broker.BrokerStartup,代碼如下:

          public class BrokerStartup {
              ...

              public static void main(String[] args) {
                  start(createBrokerController(args));
              }
              ...
          }

          main()方法中,僅有一行代碼,這行代碼包含了兩個(gè)操作:

          • createBrokerController(...):創(chuàng)建BrokerController
          • start(...):啟動(dòng)Broker

          接下來我們就來分析這兩個(gè)操作。

          2. 創(chuàng)建BrokerController

          創(chuàng)建BrokerController的方法為BrokerStartup#createBrokerController,代碼如下:

          /**
           * 創(chuàng)建 broker 的配置參數(shù)
           * @param args
           * @return
           */

          public static BrokerController createBrokerController(String[] args) {
              ...

              try {
                  //解析命令行參數(shù)
                  Options options = ServerUtil.buildCommandlineOptions(new Options());
                  commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
                      new PosixParser());
                  if (null == commandLine) {
                      System.exit(-1);
                  }

                  // 處理配置
                  final BrokerConfig brokerConfig = new BrokerConfig();
                  final NettyServerConfig nettyServerConfig = new NettyServerConfig();
                  final NettyClientConfig nettyClientConfig = new NettyClientConfig();

                  // tls安全相關(guān)
                  nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
                      String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
                  // 配置端口
                  nettyServerConfig.setListenPort(10911);
                  // 消息存儲(chǔ)的配置
                  final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();

                  ...
                  // 將命令行中的配置設(shè)置到brokerConfig對象中
                  MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);

                  // 檢查環(huán)境變量:ROCKETMQ_HOME
                  if (null == brokerConfig.getRocketmqHome()) {
                      System.out.printf("Please set the %s variable in your environment to match 
                          the location of the RocketMQ installation"
          , MixAll.ROCKETMQ_HOME_ENV);
                      System.exit(-2);
                  }

                  //省略一些配置
                  ...

                  // 創(chuàng)建 brokerController
                  final BrokerController controller = new BrokerController(
                      brokerConfig,
                      nettyServerConfig,
                      nettyClientConfig,
                      messageStoreConfig);
                  controller.getConfiguration().registerConfig(properties);
                  // 初始化
                  boolean initResult = controller.initialize();
                  if (!initResult) {
                      controller.shutdown();
                      System.exit(-3);
                  }
                  // 關(guān)閉鉤子,在關(guān)閉前處理一些操作
                  Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                      private volatile boolean hasShutdown = false;
                      private AtomicInteger shutdownTimes = new AtomicInteger(0);

                      @Override
                      public void run() {
                          synchronized (this) {
                              if (!this.hasShutdown) {
                                  ...
                                  // 這里會(huì)發(fā)一條注銷消息給nameServer
                                  controller.shutdown();
                                  ...
                              }
                          }
                      }
                  }, "ShutdownHook"));

                  return controller;
              } catch (Throwable e) {
                  e.printStackTrace();
                  System.exit(-1);
              }

              return null;
          }

          這個(gè)方法的代碼有點(diǎn)長,但功能并不多,總的來說就三個(gè)功能:

          1. 處理配置:主要是處理nettyServerConfignettyClientConfig的配置,這塊就是一些配置解析的操作,處理方式與NameServer很類似,這里就不多說了。
          2. 創(chuàng)建及初始化controller:調(diào)用方法controller.initialize(),這塊內(nèi)容我們后面分析。
          3. 注冊關(guān)閉鉤子:調(diào)用Runtime.getRuntime().addShutdownHook(...),可以在jvm進(jìn)程關(guān)閉前進(jìn)行一些操作。

          2.1 controller實(shí)例化

          BrokerController的創(chuàng)建及初始化是在BrokerStartup#createBrokerController方法中進(jìn)行,我們先來看看它的構(gòu)造方法:

          public BrokerController(
              final BrokerConfig brokerConfig,
              final NettyServerConfig nettyServerConfig,
              final NettyClientConfig nettyClientConfig,
              final MessageStoreConfig messageStoreConfig
          )
           
          {
              // 4個(gè)核心配置信息
              this.brokerConfig = brokerConfig;
              this.nettyServerConfig = nettyServerConfig;
              this.nettyClientConfig = nettyClientConfig;
              this.messageStoreConfig = messageStoreConfig;
              // 管理consumer消費(fèi)消息的offset
              this.consumerOffsetManager = new ConsumerOffsetManager(this);
              // 管理topic配置
              this.topicConfigManager = new TopicConfigManager(this);
              // 處理 consumer 拉消息請求的
              this.pullMessageProcessor = new PullMessageProcessor(this);
              this.pullRequestHoldService = new PullRequestHoldService(this);
              // 消息送達(dá)的監(jiān)聽器
              this.messageArrivingListener 
                  = new NotifyMessageArrivingListener(this.pullRequestHoldService);
              ...
              // 往外發(fā)消息的組件
              this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
              ...
          }

          BrokerController的構(gòu)造方法很長,基本都是一些賦值操作,代碼中已列出關(guān)鍵項(xiàng),這些包括:

          • 核心配置賦值:主要是brokerConfig/nettyServerConfig/nettyClientConfig/messageStoreConfig四個(gè)配置
          • ConsumerOffsetManager:管理consumer消費(fèi)消息位置的偏移量,偏移量表示消費(fèi)者組消費(fèi)該topic消息的位置,后面再消費(fèi)時(shí),就從該位置后消費(fèi),避免重復(fù)消費(fèi)消息,也避免了漏消費(fèi)消息。
          • topicConfigManagertopic配置管理器,就是用來管理topic配置的,如topic名稱,topic隊(duì)列數(shù)量
          • pullMessageProcessor:消息處理器,用來處理消費(fèi)者拉消息
          • messageArrivingListener:消息送達(dá)的監(jiān)聽器,當(dāng)生產(chǎn)者的消息送達(dá)時(shí),由該監(jiān)聽器監(jiān)聽
          • brokerOuterAPI:往外發(fā)消息的組件,如向NameServer發(fā)送注冊/注銷消息

          以上這些組件的用處,這里先混個(gè)臉熟,我們后面再分析。

          2.2 初始化controller

          我們再來看看初始化操作,方法為BrokerController#initialize

          public boolean initialize() throws CloneNotSupportedException {
              // 加載配置文件中的配置
              boolean result = this.topicConfigManager.load();
              result = result && this.consumerOffsetManager.load();
              result = result && this.subscriptionGroupManager.load();
              result = result && this.consumerFilterManager.load();

              if (result) {
                  try {
                      // 消息存儲(chǔ)管理組件,管理磁盤上的消息
                      this.messageStore =
                          new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, 
                              this.messageArrivingListener, this.brokerConfig);
                      // 啟用了DLeger,就創(chuàng)建DLeger相關(guān)組件
                      if (messageStoreConfig.isEnableDLegerCommitLog()) {
                          ...
                      }
                      // broker統(tǒng)計(jì)組件
                      this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
                      //load plugin
                      MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, 
                          brokerStatsManager, messageArrivingListener, brokerConfig);
                      this.messageStore = MessageStoreFactory.build(context, this.messageStore);
                      this.messageStore.getDispatcherList().addFirst(
                          new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
                  } catch (IOException e) {
                      result = false;
                      log.error("Failed to initialize", e);
                  }
              }
              // 加載磁盤上的記錄,如commitLog寫入的位置、消費(fèi)者主題/隊(duì)列的信息
              result = result && this.messageStore.load();

              if (result) {
                  // 處理 nettyServer
                  this.remotingServer = new NettyRemotingServer(
                      this.nettyServerConfig, this.clientHousekeepingService);
                  NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
                  fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
                  this.fastRemotingServer = new NettyRemotingServer(
                      fastConfig, this.clientHousekeepingService);

                  // 創(chuàng)建線程池start... 這里會(huì)創(chuàng)建多種類型的線程池
                  ...
                  // 處理consumer pull操作的線程池
                  this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
                      this.brokerConfig.getPullMessageThreadPoolNums(),
                      this.brokerConfig.getPullMessageThreadPoolNums(),
                      1000 * 60,
                      TimeUnit.MILLISECONDS,
                      this.pullThreadPoolQueue,
                      new ThreadFactoryImpl("PullMessageThread_"));
                  ...
                  // 創(chuàng)建線程池end...

                  // 注冊處理器
                  this.registerProcessor();

                  // 啟動(dòng)定時(shí)任務(wù)start... 這里會(huì)啟動(dòng)好多的定時(shí)任務(wù)
                  ...
                  // 定時(shí)將consumer消費(fèi)到的offset進(jìn)行持久化操作,即將數(shù)據(jù)保存到磁盤上
                  this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                      @Override
                      public void run() {
                          try {
                              BrokerController.this.consumerOffsetManager.persist();
                          } catch (Throwable e) {
                              log.error("schedule persist consumerOffset error.", e);
                          }
                      }
                  }, 1000 * 10this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
                  ...
                  // 啟動(dòng)定時(shí)任務(wù)end...

                  ...
                  // 開啟 DLeger 的一些操作
                  if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                      ...
                  }
                  // 處理tls配置
                  if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
                      ...
                  }
                  // 初始化一些操作
                  initialTransaction();
                  initialAcl();
                  initialRpcHooks();
              }
              return result;
          }

          這個(gè)還是很長,關(guān)鍵部分都做了注釋,該方法所做的工作如下:

          1. 加載配置文件中的配置
          2. 賦值與初始化操作
          3. 創(chuàng)建線程池
          4. 注冊處理器
          5. 啟動(dòng)定時(shí)任務(wù)

          這里我們來看下注冊處理器的操作this.registerProcessor():

          1. 注冊處理器:BrokerController#registerProcessor

          this.registerProcessor()實(shí)際調(diào)用的方法是BrokerController#registerProcessor,代碼如下:

          public void registerProcessor() {
              /**
               * SendMessageProcessor
               */

              SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
              sendProcessor.registerSendMessageHook(sendMessageHookList);
              sendProcessor.registerConsumeMessageHook(consumeMessageHookList);

              this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, 
                  this.sendMessageExecutor);
              this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor,  
                  this.sendMessageExecutor);
              this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, 
                  this.sendMessageExecutor);
              this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, 
                  this.sendMessageExecutor);
              ...

              /**
               * PullMessageProcessor
               */

              this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, 
                  this.pullMessageExecutor);
              this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);

              /**
                  * ReplyMessageProcessor
                  */

              ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);
              replyMessageProcessor.registerSendMessageHook(sendMessageHookList);

              ...
          }

          這個(gè)方法里注冊了許許多多的處理器,這里僅列出了與消息相關(guān)的內(nèi)容,如發(fā)送消息、回復(fù)消息、拉取消息等,后面在處理producer/consumer的消息時(shí),就會(huì)用到這些處理器,這里先不展開分析。

          2. remotingServer注冊處理器:NettyRemotingServer#registerProcessor

          我們來看下remotingServer注冊處理器的操作,方法為NettyRemotingServer#registerProcessor

          public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {

              ...

              @Override
              public void registerProcessor(int requestCode, NettyRequestProcessor processor, 
                      ExecutorService executor)
           
          {
                  ExecutorService executorThis = executor;
                  if (null == executor) {
                      executorThis = this.publicExecutor;
                  }

                  Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, 
                          ExecutorService>(processor, executorThis);
                  this.processorTable.put(requestCode, pair);
              }

              ...
          }

          最終,這些處理器注冊到了processorTable中,它是NettyRemotingAbstract的成員變量,定義如下:

          HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>>

          這是一個(gè)hashMap的結(jié)構(gòu),keycodevaluePair,該類中有兩個(gè)成員變量:NettyRequestProcessorExecutorServicecodeNettyRequestProcessor的映射關(guān)系就是在hashMap里存儲(chǔ)的。

          2.3 注冊關(guān)閉鉤子:Runtime.getRuntime().addShutdownHook(...)

          接著我們來看看注冊關(guān)閉鉤子的操作:

          // 關(guān)閉鉤子,在關(guān)閉前處理一些操作
          Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
              private volatile boolean hasShutdown = false;
              private AtomicInteger shutdownTimes = new AtomicInteger(0);

              @Override
              public void run() {
                  synchronized (this) {
                      if (!this.hasShutdown) {
                          ...
                          // 這里會(huì)發(fā)一條注銷消息給nameServer
                          controller.shutdown();
                          ...
                      }
                  }
              }
          }, "ShutdownHook"));

          跟進(jìn)BrokerController#shutdown方法:

          public void shutdown() {
              // 調(diào)用各組件的shutdown方法
              ...

              // 發(fā)送注銷消息到NameServer
              this.unregisterBrokerAll();
              ...
              // 持久化consumer的消費(fèi)偏移量
              this.consumerOffsetManager.persist();

              // 又是調(diào)用各組件的shutdown方法
              ...

          這個(gè)方法里會(huì)調(diào)用各組件的shutdown()方法、發(fā)送注銷消息給NameServer、持久化consumer的消費(fèi)偏移量,這里我們主要看發(fā)送注銷消息的方法BrokerController#unregisterBrokerAll:

          private void unregisterBrokerAll() {
              // 發(fā)送一條注銷消息給nameServer
              this.brokerOuterAPI.unregisterBrokerAll(
                  this.brokerConfig.getBrokerClusterName(),
                  this.getBrokerAddr(),
                  this.brokerConfig.getBrokerName(),
                  this.brokerConfig.getBrokerId());
          }

          繼續(xù)進(jìn)入BrokerOuterAPI#unregisterBrokerAll

          public void unregisterBrokerAll(
              final String clusterName,
              final String brokerAddr,
              final String brokerName,
              final long brokerId
          )
           
          {
              // 獲取所有的 nameServer,遍歷發(fā)送注銷消息
              List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
              if (nameServerAddressList != null) {
                  for (String namesrvAddr : nameServerAddressList) {
                      try {
                          this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId);
                          log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr);
                      } catch (Exception e) {
                          log.warn("unregisterBroker Exception, {}", namesrvAddr, e);
                      }
                  }
              }
          }

          這個(gè)方法里,會(huì)獲取到所有的nameServer,然后逐個(gè)發(fā)送注銷消息,繼續(xù)進(jìn)入BrokerOuterAPI#unregisterBroker方法:

          public void unregisterBroker(
              final String namesrvAddr,
              final String clusterName,
              final String brokerAddr,
              final String brokerName,
              final long brokerId
          )
           throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, 
                  InterruptedException, MQBrokerException 
          {
              UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader();
              requestHeader.setBrokerAddr(brokerAddr);
              requestHeader.setBrokerId(brokerId);
              requestHeader.setBrokerName(brokerName);
              requestHeader.setClusterName(clusterName);
              // 發(fā)送的注銷消息:RequestCode.UNREGISTER_BROKER
              RemotingCommand request = RemotingCommand.createRequestCommand(
                      c, requestHeader);

              RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, 3000);
              assert response != null;
              switch (response.getCode()) {
                  case ResponseCode.SUCCESS: {
                      return;
                  }
                  default:
                      break;
              }

              throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr);
          }

          最終調(diào)用的是RemotingClient#invokeSync進(jìn)行消息發(fā)送,請求codeRequestCode.UNREGISTER_BROKER,這就與NameServer接收broker的注銷消息對應(yīng)上了。

          3. 啟動(dòng)Brokerstart(...)

          我們再來看看Broker的啟動(dòng)流程,處理方法為BrokerController#start

          public void start() throws Exception {
              // 啟動(dòng)各組件

              // 啟動(dòng)消息存儲(chǔ)相關(guān)組件
              if (this.messageStore != null) {
                  this.messageStore.start();
              }

              // 啟動(dòng) remotingServer,其實(shí)就是啟動(dòng)一個(gè)netty服務(wù),用來接收producer傳來的消息
              if (this.remotingServer != null) {
                  this.remotingServer.start();
              }

              ...

              // broker對外發(fā)放消息的組件,向nameServer上報(bào)存活消息時(shí)使用了它,也是一個(gè)netty服務(wù)
              if (this.brokerOuterAPI != null) {
                  this.brokerOuterAPI.start();
              }

              ...

              // broker 核心的心跳注冊任務(wù)
              this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                  @Override
                  public void run() {
                      try {
                          BrokerController.this.registerBrokerAll(truefalse
                              brokerConfig.isForceRegister());
                      } catch (Throwable e) {
                          log.error("registerBrokerAll Exception", e);
                      }
                  }
                  // brokerConfig.getRegisterNameServerPeriod() 值為 1000 * 30,最終計(jì)算得到默認(rèn)30秒執(zhí)行一次
              }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), 
                      TimeUnit.MILLISECONDS);

              ...

          }

          這個(gè)方法主要就是啟動(dòng)各組件了,這里列出了幾大重要組件的啟動(dòng):

          1. messageStore:消息存儲(chǔ)組件,在這個(gè)組件里,會(huì)啟動(dòng)消息存儲(chǔ)相關(guān)的線程,如消息的投遞操作、commitLog文件的flush操作、comsumeQueue文件的flush操作等
          2. remotingServernetty服務(wù),用來接收請求消息,如producer發(fā)送過來的消息
          3. brokerOuterAPI:也是一個(gè)netty服務(wù),用來對外發(fā)送消息,如向nameServer上報(bào)心跳消息
          4. 啟動(dòng)定時(shí)任務(wù):brokernameServer發(fā)送注冊消息

          這里我們重點(diǎn)來看定時(shí)任務(wù)是如何發(fā)送心跳發(fā)送的。

          處理注冊消息發(fā)送的時(shí)間間隔如下:

          Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)

          這行代碼看著長,但意思就一句話:時(shí)間間隔可以自行配置,但不能小于10s,不能大于60s,默認(rèn)是30s.

          處理消息注冊的方法為BrokerController#registerBrokerAll(...),代碼如下:

          public synchronized void registerBrokerAll(final boolean checkOrderConfig, 
                  boolean oneway, boolean forceRegister)
           
          {
              TopicConfigSerializeWrapper topicConfigWrapper 
                      = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
              // 處理topic相關(guān)配置
              if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
                  || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
                  ...
              }
              // 這里會(huì)判斷是否需要進(jìn)行注冊
              if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
                  this.getBrokerAddr(),
                  this.brokerConfig.getBrokerName(),
                  this.brokerConfig.getBrokerId(),
                  this.brokerConfig.getRegisterBrokerTimeoutMills())) {
                  // 進(jìn)行注冊操作    
                  doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
              }
          }

          這個(gè)方法就是用來處理注冊操作的,不過注冊前會(huì)先驗(yàn)證下是否需要注冊,驗(yàn)證是否需要注冊的方法為BrokerController#needRegister, 代碼如下:

          private boolean needRegister(final String clusterName,
              final String brokerAddr,
              final String brokerName,
              final long brokerId,
              final int timeoutMills)
           
          {

              TopicConfigSerializeWrapper topicConfigWrapper 
                  = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
              // 判斷是否需要進(jìn)行注冊
              List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, 
                  brokerId, topicConfigWrapper, timeoutMills);
              // 有一個(gè)發(fā)生了變化,就表示需要注冊了    
              boolean needRegister = false;
              for (Boolean changed : changeList) {
                  if (changed) {
                      needRegister = true;
                      break;
                  }
              }
              return needRegister;
          }

          這個(gè)方法調(diào)用了brokerOuterAPI.needRegister(...)來判斷broker是否發(fā)生了變化,只要一個(gè)NameServer上發(fā)生了變化,就說明需要進(jìn)行注冊操作。

          brokerOuterAPI.needRegister(...)是如何判斷broker是否發(fā)生了變化的呢?繼續(xù)跟進(jìn)BrokerOuterAPI#needRegister

          public List<Boolean> needRegister(
              final String clusterName,
              final String brokerAddr,
              final String brokerName,
              final long brokerId,
              final TopicConfigSerializeWrapper topicConfigWrapper,
              final int timeoutMills)
           
          {
              final List<Boolean> changedList = new CopyOnWriteArrayList<>();
              // 獲取所有的 nameServer
              List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
              if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
                  final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
                  // 遍歷所有的nameServer,逐一發(fā)送請求
                  for (final String namesrvAddr : nameServerAddressList) {
                      brokerOuterExecutor.execute(new Runnable() {
                          @Override
                          public void run() {
                              try {
                                  QueryDataVersionRequestHeader requestHeader 
                                      = new QueryDataVersionRequestHeader();
                                  ...
                                  // 向nameServer發(fā)送消息,命令是 RequestCode.QUERY_DATA_VERSION
                                  RemotingCommand request = RemotingCommand
                                      .createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);
                                  // 把當(dāng)前的 DataVersion 發(fā)到 nameServer     
                                  request.setBody(topicConfigWrapper.getDataVersion().encode());
                                  // 發(fā)請求到nameServer
                                  RemotingCommand response = remotingClient
                                      .invokeSync(namesrvAddr, request, timeoutMills);
                                  DataVersion nameServerDataVersion = null;
                                  Boolean changed = false;
                                  switch (response.getCode()) {
                                      case ResponseCode.SUCCESS: {
                                          QueryDataVersionResponseHeader queryDataVersionResponseHeader =
                                            (QueryDataVersionResponseHeader) response
                                            .decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
                                          changed = queryDataVersionResponseHeader.getChanged();
                                          byte[] body = response.getBody();
                                          if (body != null) {
                                              // 拿到 DataVersion
                                              nameServerDataVersion = DataVersion.decode(body, D
                                                  ataVersion.class);
                                              // 這里是判斷的關(guān)鍵
                                              if (!topicConfigWrapper.getDataVersion()
                                                  .equals(nameServerDataVersion)) {
                                                  changed = true;
                                              }
                                          }
                                          if (changed == null || changed) {
                                              changedList.add(Boolean.TRUE);
                                          }
                                      }
                                      default:
                                          break;
                                  }
                                  ...
                              } catch (Exception e) {
                                  ...
                              } finally {
                                  countDownLatch.countDown();
                              }
                          }
                      });

                  }
                  try {
                      countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
                  } catch (InterruptedException e) {
                      log.error("query dataversion from nameserver countDownLatch await Exception", e);
                  }
              }
              return changedList;
          }

          這個(gè)方法里,先是遍歷所有的nameServer,向每個(gè)nameServer都發(fā)送一條codeRequestCode.QUERY_DATA_VERSION的參數(shù),參數(shù)為當(dāng)前brokerDataVersion,當(dāng)nameServer收到消息后,就返回nameServer中保存的、與當(dāng)前broker對應(yīng)的DataVersion,當(dāng)兩者版本不相等時(shí),就表明當(dāng)前broker發(fā)生了變化,需要重新注冊。

          DataVersion是個(gè)啥呢?它的部分代碼如下:

          public class DataVersion extends RemotingSerializable {
              // 時(shí)間戳
              private long timestamp = System.currentTimeMillis();
              // 計(jì)數(shù)器,可以理解為最近的版本號
              private AtomicLong counter = new AtomicLong(0);

              public void nextVersion() {
                  this.timestamp = System.currentTimeMillis();
                  this.counter.incrementAndGet();
              }

              /**
               * equals 方法,當(dāng) timestamp 與 counter 都相等時(shí),則兩者相等
               */

              @Override
              public boolean equals(final Object o) {
                  if (this == o)
                      return true;
                  if (o == null || getClass() != o.getClass())
                      return false;

                  final DataVersion that = (DataVersion) o;

                  if (timestamp != that.timestamp) {
                      return false;
                  }

                  if (counter != null && that.counter != null) {
                      return counter.longValue() == that.counter.longValue();
                  }

                  return (null == counter) && (null == that.counter);
              }
              ...

          DataVersionequals()方法來看,只有當(dāng)timestampcounter都相等時(shí),兩個(gè)DataVersion對象才相等。那這兩個(gè)值會(huì)在哪里被修改呢?從DataVersion#nextVersion方法的調(diào)用情況來看,引起這兩個(gè)值的變化主要有兩種:

          • broker 上新創(chuàng)建了一個(gè) topic
          • topic的發(fā)了的變化

          在這兩種情況下,DataVersion#nextVersion方法被調(diào)用,從而引起DataVersion的改變。DataVersion改變了,就表明當(dāng)前broker需要向nameServer注冊了。

          讓我們再回到BrokerController#registerBrokerAll(...)方法:

          public synchronized void registerBrokerAll(final boolean checkOrderConfig, 
                  boolean oneway, boolean forceRegister)
           
          {
              ...
              // 這里會(huì)判斷是否需要進(jìn)行注冊
              if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
                  this.getBrokerAddr(),
                  this.brokerConfig.getBrokerName(),
                  this.brokerConfig.getBrokerId(),
                  this.brokerConfig.getRegisterBrokerTimeoutMills())) {
                  // 進(jìn)行注冊操作    
                  doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
              }
          }

          處理注冊的方法為BrokerController#doRegisterBrokerAll,稍微看下它的流程:

          private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
                  TopicConfigSerializeWrapper topicConfigWrapper)
           
          {
              // 注冊
              List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
                  this.brokerConfig.getBrokerClusterName(),
                  this.getBrokerAddr(),
                  this.brokerConfig.getBrokerName(),
                  this.brokerConfig.getBrokerId(),
                  this.getHAServerAddr(),
                  // 這個(gè)對象里就包含了當(dāng)前broker的版本信息
                  topicConfigWrapper,
                  this.filterServerManager.buildNewFilterServerList(),
                  oneway,
                  this.brokerConfig.getRegisterBrokerTimeoutMills(),
                  this.brokerConfig.isCompressedRegister());

              ...
          }

          繼續(xù)跟下去,最終調(diào)用的是BrokerOuterAPI#registerBroker方法:

          private RegisterBrokerResult registerBroker(
              final String namesrvAddr,
              final boolean oneway,
              final int timeoutMills,
              final RegisterBrokerRequestHeader requestHeader,
              final byte[] body
          )
           throws RemotingCommandException, MQBrokerException, RemotingConnectException, 
              RemotingSendRequestException, RemotingTimeoutException, InterruptedException 
          {
              // 構(gòu)建請求
              RemotingCommand request = RemotingCommand
                  .createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
              request.setBody(body);
              // 處理發(fā)送操作:sendOneWay
              if (oneway) {
                  try {
                      // 注冊操作
                      this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
                  } catch (RemotingTooMuchRequestException e) {
                      // Ignore
                  }
                  return null;
                  ...
              }
              ....
          }
                  

          所以,所謂的注冊操作,就是當(dāng)nameServer發(fā)送一條codeRequestCode.REGISTER_BROKER的消息,消息里會(huì)帶上當(dāng)前brokertopic信息、版本號等。

          4.總結(jié)

          本文主要分析了broker的啟動(dòng)流程,總的來說,啟動(dòng)流程分為3個(gè):

          1. 解析配置文件,這一步會(huì)解析各種配置,并將其賦值到對應(yīng)的對象中
          2. BrokerController創(chuàng)建及初始化:創(chuàng)建了BrokerController對象,并進(jìn)行初始化操作,所謂的初始化,就是加載配置文件中配置、創(chuàng)建線程池、注冊請求處理器、啟動(dòng)定時(shí)任務(wù)等
          3. BrokerController啟動(dòng):這一步是啟動(dòng)broker的核心組件,如messageStore(消息存儲(chǔ))、remotingServer(netty服務(wù),用來處理producerconsumer請求)、brokerOuterAPI(netty服務(wù),用來向nameServer上報(bào)當(dāng)前broker信息)等。

          在分析啟動(dòng)過程中,重點(diǎn)分析了兩類消息的發(fā)送:

          1. ShutdownHook中,broker會(huì)向nameServer發(fā)送注銷消息,這表明在broker關(guān)閉前,nameServer會(huì)清除當(dāng)前broker的注冊信息
          2. broker啟動(dòng)后,會(huì)啟動(dòng)一個(gè)定時(shí)任務(wù),定期判斷是否需要向nameServer注冊,判斷是否需要注冊時(shí),會(huì)向nameServer發(fā)送codeQUERY_DATA_VERSION的消息,從nameServer得到當(dāng)前broker的版本號,該版本號與本地版本號不一致,就表示需要向broker重新注冊了,即發(fā)送注冊消息。

          限于篇幅,本文就先分析到這里了,下一篇繼續(xù)分析broker相關(guān)內(nèi)容。


          限于作者個(gè)人水平,文中難免有錯(cuò)誤之處,歡迎指正!原創(chuàng)不易,商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請注明出處。

          本文首發(fā)于微信公眾號 Java技術(shù)探秘,如果您喜歡本文,歡迎關(guān)注該公眾號,讓我們一起在技術(shù)的世界里探秘吧!

          - END -


          瀏覽 77
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  免费观看黄色的网站 | 熟妇一区二区三区 | 五月天福利导航 | 日本东京热视频在线播放 | 欧美色色爱爱男人天堂 |