<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

RocketMQ: Implementing Master-Slave Synchronization

2020-12-29 15:58


I won't rehash the CAP theorem of distributed systems here. But for RocketMQ, a distributed messaging system, master-slave replication is the most basic mechanism for guaranteeing availability. The feature may be less commonly used nowadays, yet it is still instructive for understanding how distributed systems typically work.

So today, let's dig into how RocketMQ implements master-slave data synchronization.

1. Overview of Master-Slave Synchronization

You have almost certainly run into the concept of master-slave synchronization at work. Its main purposes are backup and read/write-splitting scenarios; MySQL, the relational database we all use, ships with master-slave replication, for example.

Master-slave synchronization simply means copying the master server's data to a slave server, which effectively adds one more replica of the data.

The concrete mechanisms differ from system to system: MySQL replicates through the binlog, Elasticsearch through the translog, Redis through the AOF. So how does RocketMQ do it?

Also, which questions must a master-slave design answer?

1. How timely is the synchronization? (latency and consistency)
2. How much does it affect the master? (availability)
3. Can the slave stand in for the master? (availability, or partition tolerance)

The first two points are unavoidable; the third is often skipped, because it is hard to do well purely in software, so many systems simply ignore it for simplicity's sake. That is, the slave is often treated as nothing more than a backup that accepts no writes, and a master-slave switch requires human intervention as a planned, lossy failover. With the way technology has evolved, though, automatic failover services are now plentiful; in a world full of distributed systems, that is the inevitable trend.

2. RocketMQ Master-Slave Configuration

The core component of RocketMQ is the broker, which handles nearly all storage reads and writes. So any discussion of master-slave sync is necessarily about the broker. Let's recall RocketMQ's deployment architecture for the big picture:
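(The original post shows the deployment diagram at this point; the image did not survive this text version. Roughly, it looks like this:)

    Producers / Consumers ---(route lookup)---> NameServer cluster
                                                      ^
                                                      | (register)
    Producer ---(send)---> Broker Master ---(HA sync)---> Broker Slave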

The architecture is clear enough that it needs little commentary. Since our topic is master-slave sync, we only care about the broker component, so the whole picture reduces to: BrokerMaster -> BrokerSlave. Simplified once more, master-slave sync is just the process of getting the Master's data onto the Slave.

So, how do we configure and use master-slave sync?

conf/broker-a.properties (master configuration):

    #cluster this broker belongs to
    brokerClusterName=DefaultCluster
    #broker name; names may repeat: for manageability, a master and its slaves share one name, e.g. master broker-a and its slave are both named broker-a
    brokerName=broker-a
    #0 means Master, >0 means Slave
    brokerId=0
    #role of this broker:
    #- ASYNC_MASTER: master with asynchronous replication
    #- SYNC_MASTER: master with synchronous double-write
    #- SLAVE
    brokerRole=ASYNC_MASTER
    #flush mode:
    #- ASYNC_FLUSH: asynchronous flush
    #- SYNC_FLUSH: synchronous flush
    flushDiskType=ASYNC_FLUSH
    #nameserver addresses, semicolon-separated
    namesrvAddr=172.0.1.5:9876;172.0.1.6:9876
    #number of queues created when a topic is auto-created on send
    defaultTopicQueueNums=4
    #whether the broker may auto-create topics; recommended on for dev, off in production
    autoCreateTopicEnable=true
    #whether the broker may auto-create subscription groups; recommended on for dev, off in production
    autoCreateSubscriptionGroup=true
    #port the broker listens on
    listenPort=10911
    #time of day to delete expired files, default 4 AM
    deleteWhen=04
    #file retention time in hours, default 48
    fileReservedTime=120
    #size of each commitlog file, default 1G
    mapedFileSizeCommitLog=1073741824
    #entries per ConsumeQueue file, default 300,000; tune to your workload
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    #physical disk usage threshold
    diskMaxUsedSpaceRatio=88
    #store root path
    storePathRootDir=/usr/local/rocketmq/store/broker-a
    #commitlog storage path
    storePathCommitLog=/usr/local/rocketmq/store/broker-a/commitlog
    #consume queue storage path
    storePathConsumeQueue=/usr/local/rocketmq/store/broker-a/consumequeue
    #message index storage path
    storePathIndex=/usr/local/rocketmq/store/broker-a/index
    #checkpoint file path
    storeCheckpoint=/usr/local/rocketmq/store/checkpoint
    #abort file path
    abortFile=/usr/local/rocketmq/store/abort
    #maximum message size
    maxMessageSize=65536
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000
    #checkTransactionMessageEnable=false
    #size of the send-message thread pool
    #sendMessageThreadPoolNums=128
    #size of the pull-message thread pool
    #pullMessageThreadPoolNums=128

conf/broker-a-s.properties (slave configuration):

    #cluster this broker belongs to
    brokerClusterName=DefaultCluster
    #broker name; names may repeat: for manageability, a master and its slaves share one name, e.g. master broker-a and its slave are both named broker-a
    brokerName=broker-a
    #0 means Master, >0 means Slave
    brokerId=1
    #role of this broker:
    #- ASYNC_MASTER: master with asynchronous replication
    #- SYNC_MASTER: master with synchronous double-write
    #- SLAVE
    brokerRole=SLAVE
    #flush mode:
    #- ASYNC_FLUSH: asynchronous flush
    #- SYNC_FLUSH: synchronous flush
    flushDiskType=ASYNC_FLUSH
    #nameserver addresses, semicolon-separated
    namesrvAddr=172.0.1.5:9876;172.0.1.6:9876
    #number of queues created when a topic is auto-created on send
    defaultTopicQueueNums=4
    #whether the broker may auto-create topics; recommended on for dev, off in production
    autoCreateTopicEnable=true
    #whether the broker may auto-create subscription groups; recommended on for dev, off in production
    autoCreateSubscriptionGroup=true
    #port the broker listens on
    listenPort=10920
    #time of day to delete expired files, default 4 AM
    deleteWhen=04
    #file retention time in hours, default 48
    fileReservedTime=120
    #size of each commitlog file, default 1G
    mapedFileSizeCommitLog=1073741824
    #entries per ConsumeQueue file, default 300,000; tune to your workload
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    #physical disk usage threshold
    diskMaxUsedSpaceRatio=88
    #store root path
    storePathRootDir=/usr/local/rocketmq/store/broker-a-s
    #commitlog storage path
    storePathCommitLog=/usr/local/rocketmq/store/broker-a-s/commitlog
    #consume queue storage path
    storePathConsumeQueue=/usr/local/rocketmq/store/broker-a-s/consumequeue
    #message index storage path
    storePathIndex=/usr/local/rocketmq/store/broker-a-s/index
    #checkpoint file path
    storeCheckpoint=/usr/local/rocketmq/store/checkpoint
    #abort file path
    abortFile=/usr/local/rocketmq/store/abort
    #maximum message size
    maxMessageSize=65536
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000
    #checkTransactionMessageEnable=false
    #size of the send-message thread pool
    #sendMessageThreadPoolNums=128
    #size of the pull-message thread pool
    #pullMessageThreadPoolNums=128

The actual file names do not matter; what matters is pointing each broker at the right file on startup. The master and slave are started like this:

    nohup sh /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties > logs/broker-a.log 2>&1 &
    nohup sh /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a-s.properties > logs/broker-a-s.log 2>&1 &

If you are worried about getting the start command wrong, you can instead use a single broker.properties on every machine (picked up by default), with different contents per machine. Then the start command is identical everywhere, which removes a class of operator error.

The nameserver nodes must be started before the brokers, of course. With that, a RocketMQ master-slave cluster is configured. The settings look numerous, but only one really matters: with brokerName kept identical, set brokerRole=ASYNC_MASTER|SLAVE|SYNC_MASTER. That single value determines which node is master and which is slave, and hence the direction in which data is replicated.
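Once both brokers are up, you can sanity-check the roles with the bundled admin tool. A quick look, assuming the install paths and nameserver addresses from the configs above:

    sh /usr/local/rocketmq/bin/mqadmin clusterList -n 172.0.1.5:9876
    # broker-a should appear twice: BID 0 is the master, BID 1 the slave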

3. How RocketMQ Implements Master-Slave Synchronization

Understanding the configuration is only the starting point for understanding the implementation. From the above we can also see that whether a broker is a master or a slave is fixed in its config file, so the role cannot change at runtime; master- and slave-specific behavior therefore diverges as soon as the broker starts.

Let's first look at the overall frame in which a broker runs synchronization:

    // org.apache.rocketmq.broker.BrokerController#start
    public void start() throws Exception {
        if (this.messageStore != null) {
            this.messageStore.start();
        }
        if (this.remotingServer != null) {
            this.remotingServer.start();
        }
        if (this.fastRemotingServer != null) {
            this.fastRemotingServer.start();
        }
        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
        if (this.brokerOuterAPI != null) {
            this.brokerOuterAPI.start();
        }
        if (this.pullRequestHoldService != null) {
            this.pullRequestHoldService.start();
        }
        if (this.clientHousekeepingService != null) {
            this.clientHousekeepingService.start();
        }
        if (this.filterServerManager != null) {
            this.filterServerManager.start();
        }

        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            startProcessorByHa(messageStoreConfig.getBrokerRole());
            // set up SLAVE message synchronization
            handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
            // force one registration immediately
            this.registerBrokerAll(true, false, true);
        }

        // periodically register this broker with the nameservers
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

        if (this.brokerStatsManager != null) {
            this.brokerStatsManager.start();
        }
        if (this.brokerFastFailure != null) {
            this.brokerFastFailure.start();
        }
    }

    private void handleSlaveSynchronize(BrokerRole role) {
        // only slave nodes run the sync task
        if (role == BrokerRole.SLAVE) {
            if (null != slaveSyncFuture) {
                slaveSyncFuture.cancel(false);
            }
            // clear the master address so syncing does not start right away;
            // it is necessarily filled in elsewhere later: in fact during
            // registerBrokerAll(), from the nameserver's response
            this.slaveSynchronize.setMasterAddr(null);
            // sync metadata every 10 seconds
            slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.slaveSynchronize.syncAll();
                    } catch (Throwable e) {
                        log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
                    }
                }
            }, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
        } else {
            //handle the slave synchronise
            if (null != slaveSyncFuture) {
                slaveSyncFuture.cancel(false);
            }
            this.slaveSynchronize.setMasterAddr(null);
        }
    }

    public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
        TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();

        if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
            || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
            ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
            for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
                TopicConfig tmp =
                    new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(),
                        topicConfig.getWriteQueueNums(), this.brokerConfig.getBrokerPermission());
                topicConfigTable.put(topicConfig.getTopicName(), tmp);
            }
            topicConfigWrapper.setTopicConfigTable(topicConfigTable);
        }

        // register with the nameservers when forced, or when the periodic check says it is time
        if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.brokerConfig.getRegisterBrokerTimeoutMills())) {
            doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
        }
    }

    private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
        TopicConfigSerializeWrapper topicConfigWrapper) {
        // register with each nameserver in turn
        List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
            this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.getHAServerAddr(),
            topicConfigWrapper,
            this.filterServerManager.buildNewFilterServerList(),
            oneway,
            this.brokerConfig.getRegisterBrokerTimeoutMills(),
            this.brokerConfig.isCompressedRegister());

        if (registerBrokerResultList.size() > 0) {
            RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
            if (registerBrokerResult != null) {
                if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
                    this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
                }
                // update the master address used by the metadata sync task
                this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());

                if (checkOrderConfig) {
                    this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
                }
            }
        }
    }

    // org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll
    public List<RegisterBrokerResult> registerBrokerAll(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final boolean oneway,
        final int timeoutMills,
        final boolean compressed) {

        final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {

            final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
            requestHeader.setBrokerAddr(brokerAddr);
            requestHeader.setBrokerId(brokerId);
            requestHeader.setBrokerName(brokerName);
            requestHeader.setClusterName(clusterName);
            requestHeader.setHaServerAddr(haServerAddr);
            requestHeader.setCompressed(compressed);

            RegisterBrokerBody requestBody = new RegisterBrokerBody();
            requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
            requestBody.setFilterServerList(filterServerList);
            final byte[] body = requestBody.encode(compressed);
            final int bodyCrc32 = UtilAll.crc32(body);
            requestHeader.setBodyCrc32(bodyCrc32);
            final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
            for (final String namesrvAddr : nameServerAddressList) {
                // register with all nameservers concurrently; faster that way
                brokerOuterExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                            if (result != null) {
                                registerBrokerResultList.add(result);
                            }

                            log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                        } catch (Exception e) {
                            log.warn("registerBroker Exception, {}", namesrvAddr, e);
                        } finally {
                            countDownLatch.countDown();
                        }
                    }
                });
            }

            try {
                countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
            }
        }

        return registerBrokerResultList;
    }

A master and a slave are mostly the same: every broker opens its service ports, runs file cleanup, registers itself with the nameservers, and so on. The one difference is that a slave starts an extra scheduled task that sends a sync request to the master every 10 seconds, namely syncAll(). So what exactly does this "sync" synchronize, and how does it do it?

The whole of this scheduled synchronization lives in syncAll():

    // org.apache.rocketmq.broker.slave.SlaveSynchronize#syncAll
    public void syncAll() {
        // sync topic configuration
        this.syncTopicConfig();
        // sync consumer offsets
        this.syncConsumerOffset();
        // sync the delay (scheduled message) offset
        this.syncDelayOffset();
        // sync subscription group configuration
        this.syncSubscriptionGroupConfig();
    }

    // sync topic configuration
    private void syncTopicConfig() {
        String masterAddrBak = this.masterAddr;
        // only sync when a master address is known and it is not this broker itself
        if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
            try {
                TopicConfigSerializeWrapper topicWrapper =
                    this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
                // if the data version changed, i.e. the data differs, take the new version
                if (!this.brokerController.getTopicConfigManager().getDataVersion()
                    .equals(topicWrapper.getDataVersion())) {

                    this.brokerController.getTopicConfigManager().getDataVersion()
                        .assignNewOne(topicWrapper.getDataVersion());
                    this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();
                    this.brokerController.getTopicConfigManager().getTopicConfigTable()
                        .putAll(topicWrapper.getTopicConfigTable());
                    // persist the topic configuration
                    this.brokerController.getTopicConfigManager().persist();

                    log.info("Update slave topic config from master, {}", masterAddrBak);
                }
            } catch (Exception e) {
                log.error("SyncTopicConfig Exception, {}", masterAddrBak, e);
            }
        }
    }

    // sync consumer offsets
    private void syncConsumerOffset() {
        String masterAddrBak = this.masterAddr;
        if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
            try {
                ConsumerOffsetSerializeWrapper offsetWrapper =
                    this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
                this.brokerController.getConsumerOffsetManager().getOffsetTable()
                    .putAll(offsetWrapper.getOffsetTable());
                this.brokerController.getConsumerOffsetManager().persist();
                log.info("Update slave consumer offset from master, {}", masterAddrBak);
            } catch (Exception e) {
                log.error("SyncConsumerOffset Exception, {}", masterAddrBak, e);
            }
        }
    }

    // the delay offset is essentially just a number, stored under config/delayOffset.json
    private void syncDelayOffset() {
        String masterAddrBak = this.masterAddr;
        if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
            try {
                String delayOffset =
                    this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
                if (delayOffset != null) {

                    String fileName =
                        StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
                            .getMessageStoreConfig().getStorePathRootDir());
                    try {
                        MixAll.string2File(delayOffset, fileName);
                    } catch (IOException e) {
                        log.error("Persist file Exception, {}", fileName, e);
                    }
                }
                log.info("Update slave delay offset from master, {}", masterAddrBak);
            } catch (Exception e) {
                log.error("SyncDelayOffset Exception, {}", masterAddrBak, e);
            }
        }
    }

    // sync subscription group configuration
    private void syncSubscriptionGroupConfig() {
        String masterAddrBak = this.masterAddr;
        if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
            try {
                SubscriptionGroupWrapper subscriptionWrapper =
                    this.brokerController.getBrokerOuterAPI()
                        .getAllSubscriptionGroupConfig(masterAddrBak);

                if (!this.brokerController.getSubscriptionGroupManager().getDataVersion()
                    .equals(subscriptionWrapper.getDataVersion())) {
                    SubscriptionGroupManager subscriptionGroupManager =
                        this.brokerController.getSubscriptionGroupManager();
                    subscriptionGroupManager.getDataVersion().assignNewOne(
                        subscriptionWrapper.getDataVersion());
                    subscriptionGroupManager.getSubscriptionGroupTable().clear();
                    subscriptionGroupManager.getSubscriptionGroupTable().putAll(
                        subscriptionWrapper.getSubscriptionGroupTable());
                    // persist subscription group configuration
                    subscriptionGroupManager.persist();
                    log.info("Update slave Subscription Group from master, {}", masterAddrBak);
                }
            } catch (Exception e) {
                log.error("SyncSubscriptionGroup Exception, {}", masterAddrBak, e);
            }
        }
    }

That is the skeleton of RocketMQ's master-slave synchronization, and it answers the questions above. What gets synced? Four kinds of data: topic configuration, consumer offsets, the delay offset, and subscription group configuration. How timely is it? A sync request fires every 10 seconds, so the lag is on the order of 10 seconds.
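As a side note, each of these four pieces of metadata is persisted as a small JSON file on the slave. On a typical install they land under the store root's config directory (exact paths depend on storePathRootDir and the RocketMQ version), something like:

    ls /usr/local/rocketmq/store/broker-a-s/config/
    # consumerOffset.json  delayOffset.json  subscriptionGroup.json  topics.json  ...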

But wait: everything synced above looks like metadata. Where did the synchronization of the message data itself go? That's what we care about most!

4. How RocketMQ Syncs the Message Data

After the previous section it feels like we've only scratched the surface, and it isn't what we were after: the scheduled task only syncs metadata. So where does the actual data sync happen? It is carried by a dedicated HAService. The HAService runs a main loop that keeps pulling data from the master and appending it to this broker's own commitlog files; that is the real data synchronization.

4.1. Starting the HAService

The sync service is a set of dedicated pieces: a server side, a client side, and a few maintenance threads, which are best understood separately. It is started when the messageStore is initialized; a separate port is read from the configuration, and the HA sync service is opened on it.

    // org.apache.rocketmq.store.DefaultMessageStore#DefaultMessageStore
    public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
        final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
        this.messageArrivingListener = messageArrivingListener;
        this.brokerConfig = brokerConfig;
        this.messageStoreConfig = messageStoreConfig;
        this.brokerStatsManager = brokerStatsManager;
        this.allocateMappedFileService = new AllocateMappedFileService(this);
        if (messageStoreConfig.isEnableDLegerCommitLog()) {
            this.commitLog = new DLedgerCommitLog(this);
        } else {
            this.commitLog = new CommitLog(this);
        }
        this.consumeQueueTable = new ConcurrentHashMap<>(32);

        this.flushConsumeQueueService = new FlushConsumeQueueService();
        this.cleanCommitLogService = new CleanCommitLogService();
        this.cleanConsumeQueueService = new CleanConsumeQueueService();
        this.storeStatsService = new StoreStatsService();
        this.indexService = new IndexService(this);
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            // initialize the HAService
            this.haService = new HAService(this);
        } else {
            this.haService = null;
        }
        ...
        File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
        MappedFile.ensureDirOK(file.getParent());
        lockFile = new RandomAccessFile(file, "rw");
    }

    // org.apache.rocketmq.store.ha.HAService#HAService
    public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
        this.defaultMessageStore = defaultMessageStore;
        // server side: listens on the dedicated HA port
        this.acceptSocketService =
            new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
        this.groupTransferService = new GroupTransferService();
        // initialize the client side, used by slaves
        this.haClient = new HAClient();
    }

    // each piece runs as a background thread; start() brings them all up together
    public void start() throws Exception {
        // server-side accept service; does real work on the master node
        this.acceptSocketService.beginAccept();
        this.acceptSocketService.start();
        // transfer service: tracks write requests and notifies waiters once slaves catch up
        this.groupTransferService.start();
        // client-side service; the slave node initiates the requests
        this.haClient.start();
    }

HAService is a small service inside RocketMQ that runs in background threads. For simplicity, and for a degree of resource isolation, it uses its own port and its own communication code; small as the sparrow is, all the organs are there. Below I'll explain the data sync in three separate parts.
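A practical detail worth knowing: the HA port is separate from listenPort. If not set explicitly, the broker startup derives it as listenPort + 1 (so 10912 for the master config above); it can also be pinned in the properties file. For example:

    # optional: pin the HA transfer port explicitly (otherwise defaults to listenPort + 1)
    haListenPort=10912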

4.2. The Slave Side of the Sync

The slave actively pulls data from the master, which is the important step. It is implemented in HAClient: once started, the client keeps requesting new data from the master and syncs it into its own commitlog.

    // org.apache.rocketmq.store.ha.HAService.HAClient#run
    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                // plain NIO: try to connect to the master
                if (this.connectMaster()) {

                    if (this.isTimeToReportOffset()) {
                        // periodically report this slave's sync progress to the master
                        boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                        // if the connection is dead, close it; the next loop iteration reconnects
                        if (!result) {
                            this.closeMaster();
                        }
                    }

                    this.selector.select(1000);

                    // core logic: process the message data received from the master
                    boolean ok = this.processReadEvent();
                    if (!ok) {
                        this.closeMaster();
                    }

                    if (!reportSlaveMaxOffsetPlus()) {
                        continue;
                    }

                    long interval =
                        HAService.this.getDefaultMessageStore().getSystemClock().now()
                            - this.lastWriteTimestamp;
                    if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                        .getHaHousekeepingInterval()) {
                        log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
                            + "] expired, " + interval);
                        this.closeMaster();
                        log.warn("HAClient, master not response some time, so close connection");
                    }
                } else {
                    // not connected; retry in 5 seconds (this may never succeed)
                    this.waitForRunning(1000 * 5);
                }
            } catch (Exception e) {
                log.warn(this.getServiceName() + " service has exception. ", e);
                this.waitForRunning(1000 * 5);
            }
        }

        log.info(this.getServiceName() + " service end");
    }

    private boolean connectMaster() throws ClosedChannelException {
        // a single long-lived connection
        if (null == socketChannel) {
            String addr = this.masterAddress.get();
            // if there is no master address, do nothing;
            // a master node runs this same loop, it just never connects to anyone
            if (addr != null) {

                SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
                if (socketAddress != null) {
                    // plain NIO connect
                    this.socketChannel = RemotingUtil.connect(socketAddress);
                    if (this.socketChannel != null) {
                        this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                    }
                }
            }

            this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

            this.lastWriteTimestamp = System.currentTimeMillis();
        }

        return this.socketChannel != null;
    }

    // org.apache.rocketmq.remoting.common.RemotingUtil#connect
    public static SocketChannel connect(SocketAddress remote) {
        return connect(remote, 1000 * 5);
    }

    public static SocketChannel connect(SocketAddress remote, final int timeoutMillis) {
        SocketChannel sc = null;
        try {
            sc = SocketChannel.open();
            sc.configureBlocking(true);
            sc.socket().setSoLinger(false, -1);
            sc.socket().setTcpNoDelay(true);
            sc.socket().setReceiveBufferSize(1024 * 64);
            sc.socket().setSendBufferSize(1024 * 64);
            sc.socket().connect(remote, timeoutMillis);
            sc.configureBlocking(false);
            return sc;
        } catch (Exception e) {
            if (sc != null) {
                try {
                    sc.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
        }
        return null;
    }

processReadEvent() is where new data received from the master is applied to this broker's commitlog; the work is mostly delegated to the commitlog service.

    // org.apache.rocketmq.store.ha.HAService.HAClient#processReadEvent
    private boolean processReadEvent() {
        int readSizeZeroTimes = 0;
        while (this.byteBufferRead.hasRemaining()) {
            try {
                int readSize = this.socketChannel.read(this.byteBufferRead);
                if (readSize > 0) {
                    readSizeZeroTimes = 0;
                    boolean result = this.dispatchReadRequest();
                    if (!result) {
                        log.error("HAClient, dispatchReadRequest error");
                        return false;
                    }
                } else if (readSize == 0) {
                    if (++readSizeZeroTimes >= 3) {
                        break;
                    }
                } else {
                    log.info("HAClient, processReadEvent read socket < 0");
                    return false;
                }
            } catch (IOException e) {
                log.info("HAClient, processReadEvent read socket exception", e);
                return false;
            }
        }

        return true;
    }

    private boolean dispatchReadRequest() {
        // read data according to the transfer protocol
        final int msgHeaderSize = 8 + 4; // phyoffset + size
        int readSocketPos = this.byteBufferRead.position();

        while (true) {
            int diff = this.byteBufferRead.position() - this.dispatchPosition;
            if (diff >= msgHeaderSize) {
                long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
                int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);

                long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

                if (slavePhyOffset != 0) {
                    if (slavePhyOffset != masterPhyOffset) {
                        log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                            + slavePhyOffset + " MASTER: " + masterPhyOffset);
                        return false;
                    }
                }

                // a full frame has arrived; append it to the store immediately
                if (diff >= (msgHeaderSize + bodySize)) {
                    byte[] bodyData = new byte[bodySize];
                    this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);
                    this.byteBufferRead.get(bodyData);

                    HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);

                    this.byteBufferRead.position(readSocketPos);
                    this.dispatchPosition += msgHeaderSize + bodySize;

                    if (!reportSlaveMaxOffsetPlus()) {
                        return false;
                    }

                    continue;
                }
            }

            if (!this.byteBufferRead.hasRemaining()) {
                this.reallocateByteBuffer();
            }

            break;
        }

        return true;
    }

    // org.apache.rocketmq.store.DefaultMessageStore#appendToCommitLog
    @Override
    public boolean appendToCommitLog(long startOffset, byte[] data) {
        if (this.shutdown) {
            log.warn("message store has shutdown, so appendToPhyQueue is forbidden");
            return false;
        }

        // append to the commitlog; the reput service then builds the consumeQueue, index, etc.
        boolean result = this.commitLog.appendData(startOffset, data);
        if (result) {
            this.reputMessageService.wakeup();
        } else {
            log.error("appendToPhyQueue failed " + startOffset + " " + data.length);
        }

        return result;
    }

From the slave's processing flow we now essentially understand how RocketMQ syncs data: a dedicated port is opened for synchronization, the slave polls the master continuously, and each batch of new data is appended to the slave's own commitlog, rebuilding the same data set and keeping pace with the master. (Note the consistency check: the offset the master pushes must match the slave's current max physical offset.)
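For clarity, the wire format the master pushes is just a 12-byte header (the physical offset of the chunk in the commitlog, plus the body length) followed by raw commitlog bytes, matching msgHeaderSize = 8 + 4 above. Here is a minimal, self-contained sketch of encoding and decoding one such frame; this is my own demo class, not RocketMQ code:

    import java.nio.ByteBuffer;

    public class HaFrameDemo {
        static final int HEADER_SIZE = 8 + 4; // phyoffset (long) + size (int)

        // build one frame: header + raw commitlog bytes
        static ByteBuffer encode(long phyOffset, byte[] body) {
            ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + body.length);
            buf.putLong(phyOffset);  // where these bytes start in the master's commitlog
            buf.putInt(body.length); // how many bytes follow
            buf.put(body);
            buf.flip();
            return buf;
        }

        public static void main(String[] args) {
            byte[] data = "some commitlog bytes".getBytes();
            ByteBuffer frame = encode(1073741824L, data);

            // the slave reads the header first, then waits for a complete body
            long masterPhyOffset = frame.getLong();
            int bodySize = frame.getInt();
            byte[] body = new byte[bodySize];
            frame.get(body);
            System.out.println("offset=" + masterPhyOffset + ", size=" + bodySize);
            // a real slave verifies masterPhyOffset == its own max physical offset
            // before appending, exactly as dispatchReadRequest() does above
        }
    }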

4.3. The Master's Data Sync Service

Since the slave keeps pulling from the master, the master mostly just has to hand the data over. At a minimum, though, the master still runs a network service to accept the slaves' requests.

This also lives in HAService: it opens a server port directly with plain NIO and accepts connections:

    // org.apache.rocketmq.store.ha.HAService.AcceptSocketService
    /**
     * Listens to slave connections to create {@link HAConnection}.
     */
    class AcceptSocketService extends ServiceThread {
        private final SocketAddress socketAddressListen;
        private ServerSocketChannel serverSocketChannel;
        private Selector selector;

        // listen on the given HA port
        public AcceptSocketService(final int port) {
            this.socketAddressListen = new InetSocketAddress(port);
        }

        /**
         * Starts listening to slave connections.
         *
         * @throws Exception If fails.
         */
        public void beginAccept() throws Exception {
            this.serverSocketChannel = ServerSocketChannel.open();
            this.selector = RemotingUtil.openSelector();
            this.serverSocketChannel.socket().setReuseAddress(true);
            this.serverSocketChannel.socket().bind(this.socketAddressListen);
            this.serverSocketChannel.configureBlocking(false);
            this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
        }

        /**
         * {@inheritDoc}
         */
        @Override
        public void run() {
            log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    this.selector.select(1000);
                    Set<SelectionKey> selected = this.selector.selectedKeys();

                    if (selected != null) {
                        for (SelectionKey k : selected) {
                            if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                                SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();

                                if (sc != null) {
                                    HAService.log.info("HAService receive new connection, "
                                        + sc.socket().getRemoteSocketAddress());

                                    try {
                                        HAConnection conn = new HAConnection(HAService.this, sc);
                                        // once accepted, dedicated read/write threads serve this slave
                                        conn.start();
                                        HAService.this.addConnection(conn);
                                    } catch (Exception e) {
                                        log.error("new HAConnection exception", e);
                                        sc.close();
                                    }
                                }
                            } else {
                                log.warn("Unexpected ops in select " + k.readyOps());
                            }
                        }

                        selected.clear();
                    }
                } catch (Exception e) {
                    log.error(this.getServiceName() + " service has exception.", e);
                }
            }

            log.info(this.getServiceName() + " service end");
        }
        ...
    }

    // org.apache.rocketmq.store.ha.HAConnection#start
    public void start() {
        this.readSocketService.start();
        this.writeSocketService.start();
    }

    // org.apache.rocketmq.store.ha.HAConnection.ReadSocketService#run
    @Override
    public void run() {
        HAConnection.log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                this.selector.select(1000);
                boolean ok = this.processReadEvent();
                if (!ok) {
                    HAConnection.log.error("processReadEvent error");
                    break;
                }

                long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now()
                    - this.lastReadTimestamp;
                if (interval > HAConnection.this.haService.getDefaultMessageStore()
                    .getMessageStoreConfig().getHaHousekeepingInterval()) {
                    log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr
                        + "] expired, " + interval);
                    break;
                }
            } catch (Exception e) {
                HAConnection.log.error(this.getServiceName() + " service has exception.", e);
                break;
            }
        }

        this.makeStop();

        writeSocketService.makeStop();

        haService.removeConnection(HAConnection.this);

        HAConnection.this.haService.getConnectionCount().decrementAndGet();

        SelectionKey sk = this.socketChannel.keyFor(this.selector);
        if (sk != null) {
            sk.cancel();
        }

        try {
            this.selector.close();
            this.socketChannel.close();
        } catch (IOException e) {
            HAConnection.log.error("", e);
        }

        HAConnection.log.info(this.getServiceName() + " service end");
    }

    private boolean processReadEvent() {
        int readSizeZeroTimes = 0;

        if (!this.byteBufferRead.hasRemaining()) {
            this.byteBufferRead.flip();
            this.processPosition = 0;
        }

        while (this.byteBufferRead.hasRemaining()) {
            try {
                int readSize = this.socketChannel.read(this.byteBufferRead);
                if (readSize > 0) {
                    readSizeZeroTimes = 0;
                    this.lastReadTimestamp =
                        HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                    if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
                        int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                        long readOffset = this.byteBufferRead.getLong(pos - 8);
                        this.processPosition = pos;

                        // the only thing a slave sends is its acknowledged max offset
                        HAConnection.this.slaveAckOffset = readOffset;
                        if (HAConnection.this.slaveRequestOffset < 0) {
                            HAConnection.this.slaveRequestOffset = readOffset;
                            log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
                        }
                        // ...
                        HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                    }
                } else if (readSize == 0) {
                    if (++readSizeZeroTimes >= 3) {
                        break;
                    }
                } else {
                    log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                    return false;
                }
            } catch (IOException e) {
                log.error("processReadEvent exception", e);
                return false;
            }
        }

        return true;
    }

    // org.apache.rocketmq.store.ha.HAService#notifyTransferSome
    public void notifyTransferSome(final long offset) {
        for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) {
            boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
            if (ok) {
                this.groupTransferService.notifyTransferSome();
                break;
            } else {
                value = this.push2SlaveMaxOffset.get();
            }
        }
    }

Opening the port and accepting requests is the easy part; responding to the client is rather more involved, and I'll leave the details for you to dig into yourselves. (A rough sketch of the write side follows.)
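For orientation only, the write side (HAConnection's WriteSocketService, not shown above) roughly does the following: wait until the slave reports the offset it wants to start from, then repeatedly read a chunk of commitlog from that offset and send it framed as header + body. The sketch below is my own simplification under those assumptions, with a stub standing in for the message store; it is not the real class:

    interface CommitLogView {
        // stand-in for reading raw commitlog bytes starting at an offset
        byte[] read(long fromOffset, int maxBytes);
    }

    class WriteLoopSketch {
        // serve one slave: send [phyOffset(8) + size(4)] + raw commitlog bytes, chunk by chunk
        static void serve(CommitLogView commitLog, java.nio.channels.SocketChannel channel,
                          long slaveRequestOffset) throws java.io.IOException {
            long nextTransferFromWhere = slaveRequestOffset;
            while (true) {
                byte[] body = commitLog.read(nextTransferFromWhere, 32 * 1024);
                if (body.length == 0) {
                    break; // nothing new yet; the real service parks briefly and retries
                }
                java.nio.ByteBuffer frame = java.nio.ByteBuffer.allocate(12 + body.length);
                frame.putLong(nextTransferFromWhere); // where this chunk starts on the master
                frame.putInt(body.length);            // body length
                frame.put(body);
                frame.flip();
                while (frame.hasRemaining()) {
                    channel.write(frame);
                }
                nextTransferFromWhere += body.length;
            }
        }
    }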

GroupTransferService (the HA counterpart of the commitlog's GroupCommitService) works with a write queue and a read queue. It is invoked whenever a message is written, so that waiting callers are notified promptly once the slave catches up.

    // org.apache.rocketmq.store.ha.HAService.GroupTransferService#putRequest
    public synchronized void putRequest(final CommitLog.GroupCommitRequest request) {
        synchronized (this.requestsWrite) {
            this.requestsWrite.add(request);
        }
        this.wakeup();
    }

    public void notifyTransferSome() {
        this.notifyTransferObject.wakeup();
    }

    private void swapRequests() {
        // swap the two buffers
        List<CommitLog.GroupCommitRequest> tmp = this.requestsWrite;
        this.requestsWrite = this.requestsRead;
        this.requestsRead = tmp;
    }

    private void doWaitTransfer() {
        synchronized (this.requestsRead) {
            if (!this.requestsRead.isEmpty()) {
                for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                    boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                    long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
                        + HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();
                    while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
                        this.notifyTransferObject.waitForRunning(1000);
                        transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                    }

                    if (!transferOK) {
                        log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
                    }

                    req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                }

                this.requestsRead.clear();
            }
        }
    }

    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                this.waitForRunning(10);
                this.doWaitTransfer();
            } catch (Exception e) {
                log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }
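This wait-with-timeout is exactly what a producer observes under brokerRole=SYNC_MASTER: if the slave does not acknowledge within syncFlushTimeout (5 seconds by default), the send result comes back as FLUSH_SLAVE_TIMEOUT rather than SEND_OK. A minimal client-side check, assuming the nameserver address from the configs above and a hypothetical topic name:

    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.SendStatus;
    import org.apache.rocketmq.common.message.Message;

    public class SyncMasterCheck {
        public static void main(String[] args) throws Exception {
            DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
            producer.setNamesrvAddr("172.0.1.5:9876"); // from the configs above
            producer.start();

            SendResult result = producer.send(new Message("TestTopic", "hello".getBytes()));
            // under SYNC_MASTER, SEND_OK means the slave acked within syncFlushTimeout;
            // FLUSH_SLAVE_TIMEOUT means the master stored the message but the slave lagged
            if (result.getSendStatus() == SendStatus.FLUSH_SLAVE_TIMEOUT) {
                System.err.println("slave did not catch up in time: " + result);
            } else {
                System.out.println(result.getSendStatus());
            }
            producer.shutdown();
        }
    }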


That completes our walkthrough of RocketMQ master-slave synchronization. The core data sync is built on the commitlog, the various pieces of metadata are synced by a simple scheduled task, and the double-buffered request queues let the master notify waiters as soon as data reaches the slave, keeping data consistency as tight as this design allows.






Author: 等你歸去來(lái)

Source: https://www.cnblogs.com/yougewe/p/14198675.html
