<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          RocketMQ(六):nameserver與消息存儲(chǔ)定位實(shí)現(xiàn)

          共 48662字,需瀏覽 98分鐘

           ·

          2021-02-02 08:28

          走過(guò)路過(guò)不要錯(cuò)過(guò)

          點(diǎn)擊藍(lán)字關(guān)注我們


          在rocketmq中,nameserver充當(dāng)了一個(gè)配置管理者的角色,看起來(lái)好似不太重要。然而它是一個(gè)不或缺的角色,沒(méi)有了它的存在,各個(gè)broker就是一盤散沙,各自為戰(zhàn)。

          所以,實(shí)際上,在rocketmq中,nameserver也是一個(gè)領(lǐng)導(dǎo)者的角色。它可以決定哪個(gè)消息存儲(chǔ)到哪里,哪個(gè)broker干活或者上下線,在出現(xiàn)異常情況時(shí),它要能夠及時(shí)處理。以便讓整個(gè)團(tuán)隊(duì)發(fā)揮應(yīng)有的作用。nameserver相當(dāng)于一個(gè)分布式系統(tǒng)的協(xié)調(diào)者。但是這個(gè)名字,是不是看起來(lái)很熟悉?請(qǐng)看后續(xù)!

          1:為什么會(huì)有nameserver?

          如文章開頭所說(shuō),nameserver擔(dān)任的,差不多是一個(gè)系統(tǒng)協(xié)調(diào)者這么個(gè)角色。那么,我們知道,在分布式協(xié)調(diào)工作方面,有很多現(xiàn)成的組件可用。比如 zookeeper, 那么為什么還要自己搞一套nameserver出來(lái)?是為了刷存在感?

          對(duì)于為什么不選擇zk之類的組件實(shí)現(xiàn)協(xié)調(diào)者角色,初衷如何我們不得而知。但至少有幾個(gè)可知答案可以做下支撐:(以zk為例)

              1. zk存在大量的集群間通信;
              2. zk是一個(gè)比較重的組件,而本身就作為消息中間的mq,則最好不好另外再依賴其他組件;(個(gè)人感覺(jué))
              3. zk對(duì)于數(shù)據(jù)的固化能力比較弱,配置往往受限于zk的數(shù)據(jù)格式;

          總體來(lái)說(shuō),可能就是rocketmq想要做的功能在zk上不太好做,或者做起來(lái)也費(fèi)勁,或者太重,索性就不要搞了。自己搞一個(gè)完全定制化的好了。事實(shí)上,rocketmq的nameserver也實(shí)現(xiàn)得相當(dāng)簡(jiǎn)單輕量。這也是設(shè)計(jì)者的初衷吧。

          2. nameserver的啟動(dòng)流程解析

          一般地,一個(gè)框架級(jí)別的服務(wù)啟動(dòng),還是有些復(fù)雜的,那樣的話,我們懶得去看其具體過(guò)程。但前面說(shuō)了,nameserver實(shí)現(xiàn)得非常輕量級(jí),所以,其啟動(dòng)也就相當(dāng)簡(jiǎn)單。所以,我們可以快速一覽其過(guò)程?! ≌麄€(gè)nameserver的啟動(dòng)類是 org.apache.rocketmq.namesrv.NamesrvStartup, 工作過(guò)程大致如下:

          // 入口main    public static void main(String[] args) {        main0(args);    }
          public static NamesrvController main0(String[] args) { try { // 創(chuàng)建本服務(wù)的核心控制器, 解析各種配置參數(shù),默認(rèn)值之類的 NamesrvController controller = createNamesrvController(args); // 開啟服務(wù), 如打開 start(controller); String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); log.info(tip); System.out.printf("%s%n", tip); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1); }
          return null; }

          所以整個(gè)啟動(dòng)過(guò)程,基本就是一個(gè) Controller 搞定了,你說(shuō)不簡(jiǎn)單嗎?額,也許不一定!整個(gè)創(chuàng)建 Controller 的過(guò)程就是解析參數(shù)的過(guò)程,有興趣可以打開如下代碼看看:

           public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));        //PackageConflictDetect.detectFastjson();
          Options options = ServerUtil.buildCommandlineOptions(new Options()); commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser()); if (null == commandLine) { System.exit(-1); return null; }
          final NamesrvConfig namesrvConfig = new NamesrvConfig(); final NettyServerConfig nettyServerConfig = new NettyServerConfig(); nettyServerConfig.setListenPort(9876); // -c xx.properties 用于指定配置文件,優(yōu)先級(jí)較低 if (commandLine.hasOption('c')) { String file = commandLine.getOptionValue('c'); if (file != null) { InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); MixAll.properties2Object(properties, namesrvConfig); MixAll.properties2Object(properties, nettyServerConfig);
          namesrvConfig.setConfigStorePath(file);
          System.out.printf("load config properties file OK, %s%n", file); in.close(); } } // -p 僅為打印查看啟動(dòng)參數(shù) if (commandLine.hasOption('p')) { InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME); MixAll.printObjectProperties(console, namesrvConfig); MixAll.printObjectProperties(console, nettyServerConfig); System.exit(0); }
          MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
          if (null == namesrvConfig.getRocketmqHome()) { System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV); System.exit(-2); }
          LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
          log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
          MixAll.printObjectProperties(log, namesrvConfig); MixAll.printObjectProperties(log, nettyServerConfig); // 將配置參數(shù)傳入controller構(gòu)造實(shí)例 final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
          // remember all configs to prevent discard controller.getConfiguration().registerConfig(properties);
          return controller; } // Controller 構(gòu)造方法 // org.apache.rocketmq.namesrv.NamesrvController#NamesrvController public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) { this.namesrvConfig = namesrvConfig; this.nettyServerConfig = nettyServerConfig; this.kvConfigManager = new KVConfigManager(this); this.routeInfoManager = new RouteInfoManager(); this.brokerHousekeepingService = new BrokerHousekeepingService(this); this.configuration = new Configuration( log, this.namesrvConfig, this.nettyServerConfig ); this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath"); } // org.apache.rocketmq.common.Configuration#registerConfig /** * register config properties * * @return the current Configuration object */ public Configuration registerConfig(Properties extProperties) { if (extProperties == null) { return this; }
          try { readWriteLock.writeLock().lockInterruptibly();
          try { merge(extProperties, this.allConfigs); } finally { readWriteLock.writeLock().unlock(); } } catch (InterruptedException e) { log.error("register lock error. {}" + extProperties); }
          return this; }

          接下來(lái),我們主要來(lái)看看這start()過(guò)程到底如何,復(fù)雜性必然都在這里了。

           // org.apache.rocketmq.namesrv.NamesrvStartup#start    public static NamesrvController start(final NamesrvController controller) throws Exception {
          if (null == controller) { throw new IllegalArgumentException("NamesrvController is null"); } // 初始化controller各環(huán)境,如果失敗,則退出啟動(dòng) boolean initResult = controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3); } // 注冊(cè)一個(gè)關(guān)閉鉤子 Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable() { @Override public Void call() throws Exception { controller.shutdown(); return null; } })); // 核心start()方法 controller.start();
          return controller; } // org.apache.rocketmq.namesrv.NamesrvController#initialize public boolean initialize() {
          this.kvConfigManager.load();
          this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
          this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); // 注冊(cè)處理器 this.registerProcessor(); // 啟動(dòng)后臺(tái)掃描線程,掃描掉線的broker this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
          @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); // 打印日志定時(shí)任務(wù) this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
          @Override public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES);
          if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) { // Register a listener to reload SslContext try { fileWatchService = new FileWatchService( new String[] { TlsSystemConfig.tlsServerCertPath, TlsSystemConfig.tlsServerKeyPath, TlsSystemConfig.tlsServerTrustCertPath }, new FileWatchService.Listener() { boolean certChanged, keyChanged = false; @Override public void onChanged(String path) { if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) { log.info("The trust certificate changed, reload the ssl context"); reloadServerSslContext(); } if (path.equals(TlsSystemConfig.tlsServerCertPath)) { certChanged = true; } if (path.equals(TlsSystemConfig.tlsServerKeyPath)) { keyChanged = true; } if (certChanged && keyChanged) { log.info("The certificate and private key changed, reload the ssl context"); certChanged = keyChanged = false; reloadServerSslContext(); } } private void reloadServerSslContext() { ((NettyRemotingServer) remotingServer).loadSslContext(); } }); } catch (Exception e) { log.warn("FileWatchService created error, can't load the certificate dynamically"); } } // no false return true; } private void registerProcessor() { if (namesrvConfig.isClusterTest()) {
          this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()), this.remotingExecutor); } else { // 只會(huì)有一個(gè)處理器處理業(yè)務(wù) this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor); } }
          // 初始化完成后,接下來(lái)是 start() 方法 // org.apache.rocketmq.namesrv.NamesrvController#start public void start() throws Exception { // 開啟后臺(tái)端口服務(wù),nameserver可連接 this.remotingServer.start(); // 文件檢測(cè)線程 if (this.fileWatchService != null) { this.fileWatchService.start(); } }

          可見,controller的啟動(dòng)過(guò)程也非常簡(jiǎn)單,就是設(shè)置好各初始實(shí)例,開幾個(gè)后臺(tái)定時(shí)任務(wù),注冊(cè)處理器,然后將tcp端口打開,即可。其中端口服務(wù)是使用netty作為通信組件,其操作完全遵從netty編程范式??勺孕胁殚?。

          @Override    public void start() {        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(            nettyServerConfig.getServerWorkerThreads(),            new ThreadFactory() {
          private AtomicInteger threadIndex = new AtomicInteger(0);
          @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet()); } });
          prepareSharableHandlers();
          ServerBootstrap childHandler = this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_KEEPALIVE, false) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()) .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()) .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler) .addLast(defaultEventExecutorGroup, encoder, new NettyDecoder(), new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), connectionManageHandler, serverHandler ); } });
          if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) { childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); }
          try { ChannelFuture sync = this.serverBootstrap.bind().sync(); InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress(); this.port = addr.getPort(); } catch (InterruptedException e1) { throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1); }
          if (this.channelEventListener != null) { this.nettyEventExecutor.start(); }
          this.timer.scheduleAtFixedRate(new TimerTask() {
          @Override public void run() { try { NettyRemotingServer.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000); }

          至此,nameserver的啟動(dòng)流程就完成了,果然是輕量級(jí)。至于其提供什么樣的服務(wù),我們下一節(jié)再講。

          3. nameserver 業(yè)務(wù)處理框架

          因nameserver和broker一樣,都共用了remoting模塊的代碼,即都依賴于netty的handler處理機(jī)制。所以其處理器入口都是一樣的。反正最終都是找到對(duì)應(yīng)的processor, 然后處理業(yè)務(wù)即可。此處,nameserver只會(huì)提供一個(gè)默認(rèn)的處理器,即DefaultRequestProcessor。所以,只需了解其processRequest()即可知nameserver的整體能力了。

          // org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest    @Override    public RemotingCommand processRequest(ChannelHandlerContext ctx,        RemotingCommand request) throws RemotingCommandException {
          if (ctx != null) { log.debug("receive request, {} {} {}", request.getCode(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()), request); }
          switch (request.getCode()) { case RequestCode.PUT_KV_CONFIG: return this.putKVConfig(ctx, request); case RequestCode.GET_KV_CONFIG: return this.getKVConfig(ctx, request); case RequestCode.DELETE_KV_CONFIG: return this.deleteKVConfig(ctx, request); case RequestCode.QUERY_DATA_VERSION: return queryBrokerTopicConfig(ctx, request); // 注冊(cè)broker信息,這種操作一般是在broker啟動(dòng)的時(shí)候進(jìn)行請(qǐng)求 case RequestCode.REGISTER_BROKER: Version brokerVersion = MQVersion.value2Version(request.getVersion()); if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) { return this.registerBrokerWithFilterServer(ctx, request); } else { return this.registerBroker(ctx, request); } // 下線broker case RequestCode.UNREGISTER_BROKER: return this.unregisterBroker(ctx, request); // 獲取路由信息,即哪個(gè)topic存在于哪些broker上,哪些messageQueue在哪里等 case RequestCode.GET_ROUTEINFO_BY_TOPIC: return this.getRouteInfoByTopic(ctx, request); case RequestCode.GET_BROKER_CLUSTER_INFO: return this.getBrokerClusterInfo(ctx, request); case RequestCode.WIPE_WRITE_PERM_OF_BROKER: return this.wipeWritePermOfBroker(ctx, request); case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER: return getAllTopicListFromNameserver(ctx, request); case RequestCode.DELETE_TOPIC_IN_NAMESRV: return deleteTopicInNamesrv(ctx, request); case RequestCode.GET_KVLIST_BY_NAMESPACE: return this.getKVListByNamespace(ctx, request); case RequestCode.GET_TOPICS_BY_CLUSTER: return this.getTopicsByCluster(ctx, request); case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS: return this.getSystemTopicListFromNs(ctx, request); case RequestCode.GET_UNIT_TOPIC_LIST: return this.getUnitTopicList(ctx, request); case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST: return this.getHasUnitSubTopicList(ctx, request); case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST: return this.getHasUnitSubUnUnitTopicList(ctx, request); case RequestCode.UPDATE_NAMESRV_CONFIG: return this.updateConfig(ctx, request); case RequestCode.GET_NAMESRV_CONFIG: return this.getConfig(ctx, request); default: break; } return null; }


          以上就是整個(gè)nameserver提供的服務(wù)列表了,也沒(méi)啥注釋,見字如悟吧,我們也不想過(guò)多糾纏。但總體上,其處理的業(yè)務(wù)類型并不多,主要有三類:

              1. 配置信息kv的操作;
              2. broker上下線管理操作;
              3. topic路由信息管理服務(wù);

          各自實(shí)現(xiàn)當(dāng)然是按照業(yè)務(wù)處理,本無(wú)需多說(shuō),但為了解概要,我們還是挑一個(gè)重點(diǎn)來(lái)說(shuō)說(shuō)吧:broker的上線處理注冊(cè):

          // 為保持前沿起見,咱們以高版本服務(wù)展開思路(即版本大于3.0.11)    public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)        throws RemotingCommandException {        final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);        final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();        final RegisterBrokerRequestHeader requestHeader =            (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
          if (!checksum(ctx, request, requestHeader)) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("crc32 not match"); return response; }
          RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
          if (request.getBody() != null) { try { registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed()); } catch (Exception e) { throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e); } } else { registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0)); registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0); } // 重點(diǎn)實(shí)現(xiàn): registerBroker RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker( requestHeader.getClusterName(), requestHeader.getBrokerAddr(), requestHeader.getBrokerName(), requestHeader.getBrokerId(), requestHeader.getHaServerAddr(), registerBrokerBody.getTopicConfigSerializeWrapper(), registerBrokerBody.getFilterServerList(), ctx.channel());
          responseHeader.setHaServerAddr(result.getHaServerAddr()); responseHeader.setMasterAddr(result.getMasterAddr());
          byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG); response.setBody(jsonValue);
          response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; } // org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker public RegisterBrokerResult registerBroker( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List filterServerList, final Channel channel) { RegisterBrokerResult result = new RegisterBrokerResult(); try { try { // 上鎖更新各表數(shù)據(jù) this.lock.writeLock().lockInterruptibly(); // 集群名表 Set brokerNames = this.clusterAddrTable.get(clusterName); if (null == brokerNames) { brokerNames = new HashSet(); this.clusterAddrTable.put(clusterName, brokerNames); } brokerNames.add(brokerName);
          boolean registerFirst = false; // broker詳細(xì)信息表 BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null == brokerData) { registerFirst = true; brokerData = new BrokerData(clusterName, brokerName, new HashMap()); this.brokerAddrTable.put(brokerName, brokerData); } Map brokerAddrsMap = brokerData.getBrokerAddrs(); //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT> //The same IP:PORT must only have one record in brokerAddrTable Iterator> it = brokerAddrsMap.entrySet().iterator(); while (it.hasNext()) { Entry item = it.next(); if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) { it.remove(); } }
          String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr); registerFirst = registerFirst || (null == oldAddr);
          if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) { if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) { // 首次注冊(cè)或者topic變更,則更新topic信息 ConcurrentMap tcTable = topicConfigWrapper.getTopicConfigTable(); if (tcTable != null) { for (Map.Entry entry : tcTable.entrySet()) { this.createAndUpdateQueueData(brokerName, entry.getValue()); } } } } // 存活的broker信息表 BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr, new BrokerLiveInfo( System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), channel, haServerAddr)); if (null == prevBrokerLiveInfo) { log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr); }
          if (filterServerList != null) { if (filterServerList.isEmpty()) { this.filterServerTable.remove(brokerAddr); } else { this.filterServerTable.put(brokerAddr, filterServerList); } } // slave節(jié)點(diǎn)注冊(cè)需綁定masterAddr 返回 if (MixAll.MASTER_ID != brokerId) { String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID); if (masterAddr != null) { BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr); if (brokerLiveInfo != null) { result.setHaServerAddr(brokerLiveInfo.getHaServerAddr()); result.setMasterAddr(masterAddr); } } } } finally { this.lock.writeLock().unlock(); } } catch (Exception e) { log.error("registerBroker Exception", e); }
          return result; }

          好吧,是不是很抽象。沒(méi)關(guān)系,能知道大概意思就行了。大體上就是broker上線了,nameserver需要知道這些事,要把這信息加入到各表項(xiàng)中,以備將來(lái)使用。具體理解我們應(yīng)該要從業(yè)務(wù)性質(zhì)出發(fā)才能透徹。反正就和咱們平時(shí)寫業(yè)務(wù)代碼并無(wú)二致。

          4. topic存儲(chǔ)位置策略

          nameserver除了有注冊(cè)broker的核心作用外,還有一個(gè)非常核心的作用就是,為各消費(fèi)者或生產(chǎn)者提供各topic信息所在位置。這個(gè)位置決定了數(shù)據(jù)如何存儲(chǔ)以及如何訪問(wèn)問(wèn)題,只要這個(gè)決策出問(wèn)題,則整個(gè)集群的可靠性就無(wú)法保證了。所以,這個(gè)點(diǎn)需要我們深入理解下。

          在kafka中,其存儲(chǔ)策略是和shard強(qiáng)相關(guān)的,一個(gè)topic分配了多少shard就決定了它可以存儲(chǔ)到幾個(gè)機(jī)器節(jié)點(diǎn)上,即kafka是以shard作為粒度分配存儲(chǔ)的。

          但rocketmq中則不太一樣,類似的概念有:topic是最外層的存儲(chǔ),而messageQueue則是內(nèi)一層的存儲(chǔ),它是否是按照topic存儲(chǔ)或者按照msgQueue存在呢?實(shí)際上,在官方文檔中,已經(jīng)描述清楚了:Broker 在實(shí)際部署過(guò)程中對(duì)應(yīng)一臺(tái)服務(wù)器,每個(gè) Broker 可以存儲(chǔ)多個(gè)Topic的消息,每個(gè)Topic的消息也可以分片存儲(chǔ)于不同的 Broker。Message Queue 用于存儲(chǔ)消息的物理地址,每個(gè)Topic中的消息地址存儲(chǔ)于多個(gè) Message Queue 中。

          即rocketmq中是以message queue作為最細(xì)粒度的存儲(chǔ)的,實(shí)際上這基本無(wú)懸念,因?yàn)榉植际酱鎯?chǔ)需要。(試想以topic為存儲(chǔ)粒度會(huì)帶來(lái)多少問(wèn)題就知道了)

          那么,它又是如何劃分哪個(gè)message queue存儲(chǔ)在哪里的呢?

           // RequestCode.GET_ROUTEINFO_BY_TOPIC    public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,        RemotingCommand request) throws RemotingCommandException {        final RemotingCommand response = RemotingCommand.createResponseCommand(null);        final GetRouteInfoRequestHeader requestHeader =            (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);        // 獲取topic路由信息        TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
          if (topicRouteData != null) { // 順序消費(fèi)配置 if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) { String orderTopicConf = this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, requestHeader.getTopic()); topicRouteData.setOrderTopicConf(orderTopicConf); }
          byte[] content = topicRouteData.encode(); response.setBody(content); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; }
          response.setCode(ResponseCode.TOPIC_NOT_EXIST); response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic() + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); return response; } // org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#pickupTopicRouteData public TopicRouteData pickupTopicRouteData(final String topic) { TopicRouteData topicRouteData = new TopicRouteData(); boolean foundQueueData = false; boolean foundBrokerData = false; Set brokerNameSet = new HashSet(); List brokerDataList = new LinkedList(); topicRouteData.setBrokerDatas(brokerDataList);
          HashMap> filterServerMap = new HashMap>(); topicRouteData.setFilterServerTable(filterServerMap);
          try { try { this.lock.readLock().lockInterruptibly(); // 獲取所有topic的messageQueue信息 List queueDataList = this.topicQueueTable.get(topic); if (queueDataList != null) { topicRouteData.setQueueDatas(queueDataList); foundQueueData = true;
          Iterator it = queueDataList.iterator(); while (it.hasNext()) { QueueData qd = it.next(); brokerNameSet.add(qd.getBrokerName()); } // 根據(jù)brokerName, 查找broker信息,如果沒(méi)找到說(shuō)明該broker可能已經(jīng)下線,不能算在路由信息內(nèi) for (String brokerName : brokerNameSet) { BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null != brokerData) { BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap) brokerData .getBrokerAddrs().clone()); brokerDataList.add(brokerDataClone); // 只要找到一個(gè)broker就可以進(jìn)行路由處理 foundBrokerData = true; for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) { List filterServerList = this.filterServerTable.get(brokerAddr); filterServerMap.put(brokerAddr, filterServerList); } } } } } finally { this.lock.readLock().unlock(); } } catch (Exception e) { log.error("pickupTopicRouteData Exception", e); }
          log.debug("pickupTopicRouteData {} {}", topic, topicRouteData); // 只有隊(duì)列信息和broker信息都找到時(shí),整個(gè)路由信息才可返回 if (foundBrokerData && foundQueueData) { return topicRouteData; }
          return null; } // QueueData 作為路由信息的重要組成部分,其數(shù)據(jù)結(jié)構(gòu)如下public class QueueData implements Comparable { private String brokerName; private int readQueueNums; private int writeQueueNums; private int perm; private int topicSynFlag; ...} // brokerData 數(shù)據(jù)結(jié)構(gòu)如下public class BrokerData implements Comparable { private String cluster; private String brokerName; private HashMap brokerAddrs; ...}


          ok, 從上面的實(shí)現(xiàn)中,我們可以看到,查找路由信息,是根據(jù)topic進(jìn)行查找的。而topic信息保存在 topicQueueTable 中。這里有個(gè)重要點(diǎn)是,整個(gè)路由查找過(guò)程,居然和queueId是無(wú)關(guān)的,那么它又是如何定位queueId所在的位置呢?另外,這個(gè)topicQueTable里的數(shù)據(jù)又是何時(shí)維護(hù)的呢?

          首先,對(duì)于topicQueueTable的維護(hù),是在broker注冊(cè)和解注冊(cè)時(shí)維護(hù)的,這很好理解。

           // 也就前面看到的broker為master節(jié)點(diǎn)時(shí)的 createAndUpdateQueueData()    private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {        QueueData queueData = new QueueData();        queueData.setBrokerName(brokerName);        queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());        queueData.setReadQueueNums(topicConfig.getReadQueueNums());        queueData.setPerm(topicConfig.getPerm());        queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());
          List queueDataList = this.topicQueueTable.get(topicConfig.getTopicName()); // topic的首個(gè)broker if (null == queueDataList) { queueDataList = new LinkedList(); queueDataList.add(queueData); this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList); log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData); } else { boolean addNewOne = true;
          Iterator it = queueDataList.iterator(); // 添加一個(gè)broker while (it.hasNext()) { QueueData qd = it.next(); if (qd.getBrokerName().equals(brokerName)) { if (qd.equals(queueData)) { addNewOne = false; } else { log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd, queueData); it.remove(); } } }
          if (addNewOne) { queueDataList.add(queueData); } } }

          但針對(duì)queueId又是何時(shí)進(jìn)行處理的呢?看起來(lái)nameserver不得而知。

          事實(shí)上,數(shù)據(jù)發(fā)送到哪個(gè)broker或從哪個(gè)broker上進(jìn)行數(shù)據(jù)消費(fèi),是由各客戶端根據(jù)策略決定的。比如在producer中是這樣處理的:

          // org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl    private SendResult sendDefaultImpl(        Message msg,        final CommunicationMode communicationMode,        final SendCallback sendCallback,        final long timeout    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {        this.makeSureStateOK();        Validators.checkMessage(msg, this.defaultMQProducer);        final long invokeID = random.nextLong();        long beginTimestampFirst = System.currentTimeMillis();        long beginTimestampPrev = beginTimestampFirst;        long endTimestamp = beginTimestampFirst;        // 此處即是nameserver返回的路由信息,即可用的broker列表        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());        if (topicPublishInfo != null && topicPublishInfo.ok()) {            boolean callTimeout = false;            MessageQueue mq = null;            Exception exception = null;            SendResult sendResult = null;            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;            int times = 0;            String[] brokersSent = new String[timesTotal];            for (; times < timesTotal; times++) {                // 首次進(jìn)入時(shí),只是選擇一個(gè)隊(duì)列發(fā)送                String lastBrokerName = null == mq ? null : mq.getBrokerName();                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);                if (mqSelected != null) {                    mq = mqSelected;                    brokersSent[times] = mq.getBrokerName();                    try {                        beginTimestampPrev = System.currentTimeMillis();                        if (times > 0) {                            //Reset topic with namespace during resend.                            msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));                        }                        long costTime = beginTimestampPrev - beginTimestampFirst;                        if (timeout < costTime) {                            callTimeout = true;                            break;                        }                        // 向選擇出來(lái)的messageQueue 發(fā)送消息數(shù)據(jù)                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);                        endTimestamp = System.currentTimeMillis();                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);                        switch (communicationMode) {                            case ASYNC:                                return null;                            case ONEWAY:                                return null;                            case SYNC:                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {                                        continue;                                    }                                }
          return sendResult; default: break; } } catch (RemotingException e) ... } // org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName); } // org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { // 容錯(cuò)處理,不影響策略理解 if (this.sendLatencyFaultEnable) { try { int index = tpInfo.getSendWhichQueue().getAndIncrement(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) return mq; } }
          final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); }
          return tpInfo.selectOneMessageQueue(); }
          return tpInfo.selectOneMessageQueue(lastBrokerName); } // org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue // 直接使用輪詢的方式選擇一個(gè)隊(duì)列 public MessageQueue selectOneMessageQueue(final String lastBrokerName) { if (lastBrokerName == null) { // 任意選擇一個(gè)messageQueue作為發(fā)送目標(biāo) return selectOneMessageQueue(); } else { int index = this.sendWhichQueue.getAndIncrement(); // 最大嘗試n次獲取不一樣的MQueue, 如仍然獲取不到,則隨便選擇一個(gè)即可 for (int i = 0; i < this.messageQueueList.size(); i++) { int pos = Math.abs(index++) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return selectOneMessageQueue(); } }

          好了,通過(guò)上面的描述,我們大概知道了,一個(gè)消息要發(fā)送往消息server時(shí),首先會(huì)根據(jù)topic找到所有可用的broker列表(nameserver提供),然后根據(jù)一個(gè)所謂策略選擇一個(gè)MessageQueue,最后向這個(gè)MessageQueue發(fā)送數(shù)據(jù)即可。所以,這個(gè)MessageQueue是非常重要的,我們來(lái)看下其數(shù)據(jù)結(jié)構(gòu):

          // org.apache.rocketmq.common.message.MessageQueuepublic class MessageQueue implements Comparable, Serializable {    private static final long serialVersionUID = 6191200464116433425L;    private String topic;    private String brokerName;    private int queueId;    ...}


          這是非常之簡(jiǎn)潔啊,僅有主要的三個(gè)核心:topic(主題),brokerName(broker標(biāo)識(shí)),queueId(隊(duì)列id)。?

          前面提到的客戶端策略,會(huì)選擇一個(gè)MessageQueue, 即會(huì)得到一個(gè)broker標(biāo)識(shí),對(duì)應(yīng)一個(gè)queueId。所以,數(shù)據(jù)存放在哪個(gè)broker,是由客戶端決定的,且存放位置未知。也就是說(shuō),rocketmq中同一個(gè)topic的數(shù)據(jù),是散亂存放在一堆broker中的。這和我們通常的認(rèn)知是有一定差距的。

          這樣設(shè)計(jì)有什么好處呢?好處自然是有的,比如假如其中有些broker掛掉了,那么整個(gè)集群無(wú)需經(jīng)過(guò)什么再均衡策略,同樣可以工作得很好,因?yàn)榭蛻舳丝梢灾苯酉蛘5腷roker發(fā)送消息即可。其他好處。。。

          但是我個(gè)人覺(jué)得這樣的設(shè)計(jì),也不見得很好,比如你不能夠很確定地定位到某條消息在哪個(gè)broker上,完全無(wú)規(guī)律可循。另外,如果想在單queueId上保持一定的規(guī)則,則是不可能的(也許有其他曲線救國(guó)之法)。另外,對(duì)于queueId, 只是一個(gè)系統(tǒng)內(nèi)部的概念,實(shí)際上用戶并不能指定該值。

          5. MessageQueue到底存在哪里?

          按照上面說(shuō)的,一個(gè)topic數(shù)據(jù)可能被存放在n個(gè)broker中,且以messageQueue的queueId作為單獨(dú)存儲(chǔ)。那么,到底數(shù)據(jù)存放在哪里?所說(shuō)的n個(gè)broker到底指哪幾個(gè)broker?每個(gè)broker上到底存放了幾個(gè)queueId?這些問(wèn)題如果沒(méi)有搞清楚,我們就無(wú)法說(shuō)清楚這玩意。

          我們先來(lái)回答第一個(gè)問(wèn)題,topic數(shù)據(jù)到底存放在幾個(gè)broker中?回顧下前面broker的注冊(cè)過(guò)程可知:

          // org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker    if (null != topicConfigWrapper        && MixAll.MASTER_ID == brokerId) {        if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())            || registerFirst) {            // 首次注冊(cè)或者topic變更,則更新topic信息            ConcurrentMap tcTable =                topicConfigWrapper.getTopicConfigTable();            if (tcTable != null) {                // 遍歷所有topic, 將當(dāng)前新進(jìn)的broker 加入到處理機(jī)器中                for (Map.Entry entry : tcTable.entrySet()) {                    this.createAndUpdateQueueData(brokerName, entry.getValue());                }            }        }    }


          看完這段,我們就明白了,原來(lái)所謂的n個(gè)broker可處理topic信息,實(shí)際上指的是所有broker?。『冒?,咱也不懂為啥這么干,反正就是這么干了,topic可能分布在所有broker機(jī)器上。至于具體哪一臺(tái),你猜?。?/span>

          接下來(lái)我們看第二個(gè)問(wèn)題,一個(gè)broker到底存儲(chǔ)了幾個(gè)queueId的數(shù)據(jù)?實(shí)際上,我們稍微想想前面的實(shí)現(xiàn),broker是指所有的broker,如果所有broker都是一樣的配置,那么是不是應(yīng)該讓每個(gè)broker都存儲(chǔ)所有queueId呢?(盡管沒(méi)啥依據(jù),還是可以想想的嘛)

          rocketmq的各客戶端(生產(chǎn)者、消費(fèi)者)每次向服務(wù)器發(fā)送生產(chǎn)或消費(fèi)請(qǐng)求時(shí),都可能向nameserver請(qǐng)求拉取路由信息,但這些信息從我們前面調(diào)查的結(jié)果來(lái)看,并不包含queueId信息。那么,后續(xù)又是如何轉(zhuǎn)換為queueId的呢?實(shí)際上,就是在拉取了nameserver的路由信息之后,本地再做一次分配就可以了:

          // 更新topic路由信息    // org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);        if (null == topicPublishInfo || !topicPublishInfo.ok()) {            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);            topicPublishInfo = this.topicPublishInfoTable.get(topic);        }
          if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { // 從nameserver拉取路由數(shù)據(jù) this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } } // org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) { try { if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { TopicRouteData topicRouteData; if (isDefault && defaultMQProducer != null) { topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 1000 * 3); if (topicRouteData != null) { for (QueueData data : topicRouteData.getQueueDatas()) { int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums()); data.setReadQueueNums(queueNums); data.setWriteQueueNums(queueNums); } } } else { topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3); } if (topicRouteData != null) { TopicRouteData old = this.topicRouteTable.get(topic); boolean changed = topicRouteDataIsChange(old, topicRouteData); if (!changed) { changed = this.isNeedUpdateTopicRouteInfo(topic); } else { log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData); }
          if (changed) { TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
          for (BrokerData bd : topicRouteData.getBrokerDatas()) { this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs()); }
          // Update Pub info { // 為每個(gè)broker分配queueId TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData); publishInfo.setHaveTopicRouterInfo(true); Iterator> it = this.producerTable.entrySet().iterator(); while (it.hasNext()) { Entry entry = it.next(); MQProducerInner impl = entry.getValue(); if (impl != null) { impl.updateTopicPublishInfo(topic, publishInfo); } } }
          // Update sub info { Set subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData); Iterator> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry entry = it.next(); MQConsumerInner impl = entry.getValue(); if (impl != null) { impl.updateTopicSubscribeInfo(topic, subscribeInfo); } } } log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData); this.topicRouteTable.put(topic, cloneTopicRouteData); return true; } } else { log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic); } } catch (MQClientException e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("updateTopicRouteInfoFromNameServer Exception", e); } } catch (RemotingException e) { log.error("updateTopicRouteInfoFromNameServer Exception", e); throw new IllegalStateException(e); } finally { this.lockNamesrv.unlock(); } } else { log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS); } } catch (InterruptedException e) { log.warn("updateTopicRouteInfoFromNameServer Exception", e); }
          return false; }

          生產(chǎn)者分配queueId的實(shí)現(xiàn)如下:

          // org.apache.rocketmq.client.impl.factory.MQClientInstance#topicRouteData2TopicPublishInfo    public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {        TopicPublishInfo info = new TopicPublishInfo();        info.setTopicRouteData(route);        // 為每個(gè)broker指定queueId的分配情況(最大queueId)        // 這樣的配置不知道累不累        if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {            String[] brokers = route.getOrderTopicConf().split(";");            for (String broker : brokers) {                String[] item = broker.split(":");                int nums = Integer.parseInt(item[1]);                for (int i = 0; i < nums; i++) {                    MessageQueue mq = new MessageQueue(topic, item[0], i);                    info.getMessageQueueList().add(mq);                }            }
          info.setOrderTopic(true); } else { List qds = route.getQueueDatas(); Collections.sort(qds); for (QueueData qd : qds) { if (PermName.isWriteable(qd.getPerm())) { BrokerData brokerData = null; for (BrokerData bd : route.getBrokerDatas()) { if (bd.getBrokerName().equals(qd.getBrokerName())) { brokerData = bd; break; } } // 還是有broker無(wú)法處理queue哦 if (null == brokerData) { continue; } // 非master節(jié)點(diǎn)不能接受寫請(qǐng)求 if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) { continue; } // 根據(jù) writeQueueNums 數(shù)量,要求該broker接受所有小于該值的queueId for (int i = 0; i < qd.getWriteQueueNums(); i++) { MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i); info.getMessageQueueList().add(mq); } } }
          info.setOrderTopic(false); }
          return info; }


          可以看出,生產(chǎn)者對(duì)應(yīng)的broker中,負(fù)責(zé)寫的broker只能是master節(jié)點(diǎn),負(fù)責(zé)所有小于writeQueueNums的queueId的數(shù)據(jù)存儲(chǔ)。(如果所有broker配置一樣,則相當(dāng)于所有broker都存儲(chǔ)所有queueId),所以,這存儲(chǔ)關(guān)系,可能是理不清楚了。

          我們?cè)賮?lái)看看消費(fèi)者是如何對(duì)應(yīng)queueId的呢?

          // org.apache.rocketmq.client.impl.factory.MQClientInstance#topicRouteData2TopicSubscribeInfo    public static Set topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {        Set mqList = new HashSet();        List qds = route.getQueueDatas();        for (QueueData qd : qds) {            if (PermName.isReadable(qd.getPerm())) {                // 可讀取broker上對(duì)應(yīng)的所有小于readQueueNums 的隊(duì)列                for (int i = 0; i < qd.getReadQueueNums(); i++) {                    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);                    mqList.add(mq);                }            }        }
          return mqList; }

          原理和生產(chǎn)者差不多,就是通過(guò)一個(gè) readQueueNums 來(lái)限定讀取的隊(duì)列數(shù),基本上就是等于所有隊(duì)列了,原因可能是原本數(shù)據(jù)就存儲(chǔ)了所有queueId,如果消費(fèi)者不讀取,又該誰(shuí)來(lái)讀取呢?!?/span>

          好了,到此我們總算厘清了整個(gè)rocketmq的消息存儲(chǔ)定位方式了。總結(jié)一句話就是:任何節(jié)點(diǎn)都可能有任意topic的任意queueId數(shù)據(jù)。這結(jié)果,不禁又讓我有一種千頭萬(wàn)緒的感覺(jué)!

          以上僅是一些正常的rocketmq數(shù)據(jù)存儲(chǔ)的實(shí)現(xiàn),只能算是皮毛。事實(shí)上,分布式系統(tǒng)中一個(gè)非常重要的能力是容錯(cuò),欲知后事如何且聽下回分解。



          往期精彩推薦



          騰訊、阿里、滴滴后臺(tái)面試題匯總總結(jié) — (含答案)

          面試:史上最全多線程面試題 !

          最新阿里內(nèi)推Java后端面試題

          JVM難學(xué)?那是因?yàn)槟銢](méi)認(rèn)真看完這篇文章


          END


          關(guān)注作者微信公眾號(hào) —《JAVA爛豬皮》


          了解更多java后端架構(gòu)知識(shí)以及最新面試寶典


          你點(diǎn)的每個(gè)好看,我都認(rèn)真當(dāng)成了


          看完本文記得給作者點(diǎn)贊+在看哦~~~大家的支持,是作者源源不斷出文的動(dòng)力


          作者:等你歸去來(lái)

          出處:https://www.cnblogs.com/yougewe/p/14128845.html


          瀏覽 31
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  大鸡巴艹艹艹麻豆 | 麻豆三级片在线 | 五月天色婷婷av 五月婷婷激情视频 | 99最新在线视频 | 无码一区二区三区四区五区在线看 |