RocketMQ (9): How Master-Slave Synchronization Is Implemented
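There is no need to rehash CAP, the well-known trio of distributed-systems theory. For RocketMQ, a distributed messaging system, master-slave replication is the most basic way to ensure availability. The feature may not be used much anymore, but understanding it still helps us see how distributed systems commonly work.
Today, let's dig into how RocketMQ implements master-slave data synchronization.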
1. Overview of master-slave synchronization
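Most of us have come across master-slave synchronization at work. Its main purposes are backup and read/write splitting. MySQL, the relational database we use every day, is one example that supports master-slave replication.
Master-slave synchronization copies the data on the master server to a slave server, which effectively adds one more replica.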
Implementations differ: MySQL replicates through the binlog, Elasticsearch relies on the translog, and Redis sends an RDB snapshot followed by a stream of write commands. So how does RocketMQ do it?
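In addition, what issues does master-slave synchronization have to address?
1. How timely is the synchronization? (latency and consistency)
2. How much does it affect the master? (availability)
3. Can the slave take over from the master? (availability or partition tolerance)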
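The first two points must be addressed, but the third is often left out. It is hard for a system to do it well, so many systems simply ignore it for the sake of simplicity: the slave exists only as a backup and does not accept writes. Switching master and slave then requires manual intervention, as a planned switchover that accepts some loss. As technology has evolved, however, many services now switch master and slave automatically, an inevitable trend now that distributed systems are everywhere.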
2. RocketMQ master-slave configuration
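In RocketMQ the core component is the broker, which handles almost all storage and read operations. So any discussion of master-slave synchronization is about the broker. Let's revisit RocketMQ's deployment architecture for the big picture:
[Figure: RocketMQ deployment architecture]
The architecture is clear and needs no further explanation. Since we only care about master-slave synchronization, we look only at the broker, and the whole architecture simplifies to BrokerMaster -> BrokerSlave. Simplifying further, master-slave synchronization is just the process of copying the master's data to the slave.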
So how do we configure master-slave synchronization?
conf/broker-a.properties (master configuration)
```properties
# Cluster name
brokerClusterName=DefaultCluster
# Broker name. Names may repeat: for manageability each master gets a name and its slave uses the same one,
# e.g. master A is broker-a and its slave is also broker-a
brokerName=broker-a
# 0 means Master, >0 means Slave
brokerId=0
# Broker role
#- ASYNC_MASTER: master with asynchronous replication
#- SYNC_MASTER: master with synchronous double write
#- SLAVE
brokerRole=ASYNC_MASTER
# Disk flush mode
#- ASYNC_FLUSH: asynchronous flush
#- SYNC_FLUSH: synchronous flush
flushDiskType=ASYNC_FLUSH
# Name server addresses, separated by semicolons
namesrvAddr=172.0.1.5:9876;172.0.1.6:9876
# Default number of queues created when a message is sent to a topic the server does not have
defaultTopicQueueNums=4
# Whether the broker may create topics automatically; enable offline, disable in production
autoCreateTopicEnable=true
# Whether the broker may create subscription groups automatically; enable offline, disable in production
autoCreateSubscriptionGroup=true
# Listening port of the broker's external service
listenPort=10911
# Time of day at which expired files are deleted, default 4 a.m.
deleteWhen=04
# File retention time in hours, default 48
fileReservedTime=120
# Size of each commitLog file, default 1 GB
mapedFileSizeCommitLog=1073741824
# Each ConsumeQueue file holds 300,000 entries by default; adjust to your workload
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
# Disk usage threshold (%) checked for the physical files
diskMaxUsedSpaceRatio=88
# Storage root path
storePathRootDir=/usr/local/rocketmq/store/broker-a
# commitLog storage path
storePathCommitLog=/usr/local/rocketmq/store/broker-a/commitlog
# Consume queue storage path
storePathConsumeQueue=/usr/local/rocketmq/store/broker-a/consumequeue
# Message index storage path
storePathIndex=/usr/local/rocketmq/store/broker-a/index
# checkpoint file path
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
# abort file path
abortFile=/usr/local/rocketmq/store/abort
# Maximum message size
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#checkTransactionMessageEnable=false
# Size of the send-message thread pool
#sendMessageThreadPoolNums=128
# Size of the pull-message thread pool
#pullMessageThreadPoolNums=128
```
conf/broker-a-s.properties (slave configuration)
```properties
# Cluster name
brokerClusterName=DefaultCluster
# Broker name. Names may repeat: for manageability each master gets a name and its slave uses the same one,
# e.g. master A is broker-a and its slave is also broker-a
brokerName=broker-a
# 0 means Master, >0 means Slave
brokerId=1
# Broker role
#- ASYNC_MASTER: master with asynchronous replication
#- SYNC_MASTER: master with synchronous double write
#- SLAVE
brokerRole=SLAVE
# Disk flush mode
#- ASYNC_FLUSH: asynchronous flush
#- SYNC_FLUSH: synchronous flush
flushDiskType=ASYNC_FLUSH
# Name server addresses, separated by semicolons
namesrvAddr=172.0.1.5:9876;172.0.1.6:9876
# Default number of queues created when a message is sent to a topic the server does not have
defaultTopicQueueNums=4
# Whether the broker may create topics automatically; enable offline, disable in production
autoCreateTopicEnable=true
# Whether the broker may create subscription groups automatically; enable offline, disable in production
autoCreateSubscriptionGroup=true
# Listening port of the broker's external service
listenPort=10920
# Time of day at which expired files are deleted, default 4 a.m.
deleteWhen=04
# File retention time in hours, default 48
fileReservedTime=120
# Size of each commitLog file, default 1 GB
mapedFileSizeCommitLog=1073741824
# Each ConsumeQueue file holds 300,000 entries by default; adjust to your workload
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
# Disk usage threshold (%) checked for the physical files
diskMaxUsedSpaceRatio=88
# Storage root path
storePathRootDir=/usr/local/rocketmq/store/broker-a-s
# commitLog storage path
storePathCommitLog=/usr/local/rocketmq/store/broker-a-s/commitlog
# Consume queue storage path
storePathConsumeQueue=/usr/local/rocketmq/store/broker-a-s/consumequeue
# Message index storage path
storePathIndex=/usr/local/rocketmq/store/broker-a-s/index
# checkpoint file path
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
# abort file path
abortFile=/usr/local/rocketmq/store/abort
# Maximum message size
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#checkTransactionMessageEnable=false
# Size of the send-message thread pool
#sendMessageThreadPoolNums=128
# Size of the pull-message thread pool
#pullMessageThreadPoolNums=128
```
The actual file names don't matter; what matters is pointing to the right configuration file at startup. Start the master and the slave as follows:
```shell
nohup sh /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties > logs/broker-a.log 2>&1 &
nohup sh /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a-s.properties > logs/broker-a-s.log 2>&1 &
```
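If you are worried about mistakes in the startup command, you can also use a single file name, broker.properties (the default lookup name), on every machine, with different contents per machine. Every machine then starts with the same command, which also prevents some operational mistakes.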
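Of course, the name server must be started before the brokers. With that, a RocketMQ master-slave cluster is configured. There seem to be many settings, but the core is really just one: keep brokerName identical and set brokerRole=ASYNC_MASTER|SYNC_MASTER|SLAVE, together with brokerId (0 for the master, >0 for a slave). This value decides whether a broker is a master or a slave, and whether the master replicates to the slave asynchronously (ASYNC_MASTER) or synchronously, acknowledging a write only after the slave has it (SYNC_MASTER).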
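To make these role rules concrete, here is a minimal Java sketch that reads broker properties and derives the effective broker id. BrokerRoleCheck and its methods are hypothetical names, not RocketMQ APIs. The rule it applies (master roles always use id 0, a SLAVE must have brokerId > 0) follows the configuration comments above and is, as far as I know, roughly what the broker enforces at startup; treat it as an assumption, as are the defaults used for missing keys.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of RocketMQ: it only looks at the keys discussed above.
final class BrokerRoleCheck {
    static Properties parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text)); // lines starting with '#' are comments
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p;
    }

    // Assumed rule: master roles use id 0; a SLAVE needs an id > 0.
    // Defaults for missing keys (ASYNC_MASTER, 0) are assumptions of this sketch.
    static long effectiveBrokerId(Properties p) {
        String role = p.getProperty("brokerRole", "ASYNC_MASTER").trim();
        long id = Long.parseLong(p.getProperty("brokerId", "0").trim());
        switch (role) {
            case "ASYNC_MASTER":
            case "SYNC_MASTER":
                return 0L;
            case "SLAVE":
                if (id <= 0) {
                    throw new IllegalArgumentException("a SLAVE needs brokerId > 0, got " + id);
                }
                return id;
            default:
                throw new IllegalArgumentException("unknown brokerRole: " + role);
        }
    }

    // Master and slave form a pair only if they share the cluster name and brokerName.
    static boolean samePair(Properties a, Properties b) {
        return a.getProperty("brokerClusterName", "").equals(b.getProperty("brokerClusterName", ""))
            && a.getProperty("brokerName", "").equals(b.getProperty("brokerName", ""));
    }
}
```

Running such a check over both files before starting the brokers catches the most common slip: a slave copied from the master file with brokerId left at 0.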
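3. Implementing RocketMQ master-slave synchronization
Understanding the configuration is only the starting point for understanding the implementation. As shown above, whether a broker is a master or a slave is fixed in its configuration file and cannot change at runtime. So master and slave already behave differently when the broker starts.
Let's first look at the overall framework of the broker's synchronization: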
```java
// org.apache.rocketmq.broker.BrokerController#start
public void start() throws Exception {
    if (this.messageStore != null) {
        this.messageStore.start();
    }
    if (this.remotingServer != null) {
        this.remotingServer.start();
    }
    if (this.fastRemotingServer != null) {
        this.fastRemotingServer.start();
    }
    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }
    if (this.brokerOuterAPI != null) {
        this.brokerOuterAPI.start();
    }
    if (this.pullRequestHoldService != null) {
        this.pullRequestHoldService.start();
    }
    if (this.clientHousekeepingService != null) {
        this.clientHousekeepingService.start();
    }
    if (this.filterServerManager != null) {
        this.filterServerManager.start();
    }
    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        startProcessorByHa(messageStoreConfig.getBrokerRole());
        // Handle SLAVE metadata synchronization
        handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
        // Force one registration right away
        this.registerBrokerAll(true, false, true);
    }
    // Periodically register this broker's state with the name servers
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        }
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
    if (this.brokerStatsManager != null) {
        this.brokerStatsManager.start();
    }
    if (this.brokerFastFailure != null) {
        this.brokerFastFailure.start();
    }
}

private void handleSlaveSynchronize(BrokerRole role) {
    // Only a slave node performs synchronization
    if (role == BrokerRole.SLAVE) {
        if (null != slaveSyncFuture) {
            slaveSyncFuture.cancel(false);
        }
        // Clear the master address so that no sync happens right away.
        // It is filled in later: registerBrokerAll() sets it from the name server's response.
        this.slaveSynchronize.setMasterAddr(null);
        // Sync every 10 seconds (first run after 3 seconds)
        slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.slaveSynchronize.syncAll();
                } catch (Throwable e) {
                    log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
                }
            }
        }, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
    } else {
        // handle the slave synchronise
        if (null != slaveSyncFuture) {
            slaveSyncFuture.cancel(false);
        }
        this.slaveSynchronize.setMasterAddr(null);
    }
}

public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
    TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();

    if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
        || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
        ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
        for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
            TopicConfig tmp =
                new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                    this.brokerConfig.getBrokerPermission());
            topicConfigTable.put(topicConfig.getTopicName(), tmp);
        }
        topicConfigWrapper.setTopicConfigTable(topicConfigTable);
    }
    // Register with the name servers when forced, or when needRegister() reports that they need an update
    if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId(),
        this.brokerConfig.getRegisterBrokerTimeoutMills())) {
        doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
    }
}

private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
    TopicConfigSerializeWrapper topicConfigWrapper) {
    // Register with every name server
    List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
        this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId(),
        this.getHAServerAddr(),
        topicConfigWrapper,
        this.filterServerManager.buildNewFilterServerList(),
        oneway,
        this.brokerConfig.getRegisterBrokerTimeoutMills(),
        this.brokerConfig.isCompressedRegister());

    if (registerBrokerResultList.size() > 0) {
        RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
        if (registerBrokerResult != null) {
            if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
                this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
            }
            // Update the master address used by the slave's metadata sync
            this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
            if (checkOrderConfig) {
                this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
            }
        }
    }
}

// org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll
public List<RegisterBrokerResult> registerBrokerAll(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final boolean oneway,
    final int timeoutMills,
    final boolean compressed) {

    // Note: several threads add to this list concurrently below;
    // a thread-safe list (e.g. CopyOnWriteArrayList) would be safer here
    final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
        final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
        requestHeader.setBrokerAddr(brokerAddr);
        requestHeader.setBrokerId(brokerId);
        requestHeader.setBrokerName(brokerName);
        requestHeader.setClusterName(clusterName);
        requestHeader.setHaServerAddr(haServerAddr);
        requestHeader.setCompressed(compressed);

        RegisterBrokerBody requestBody = new RegisterBrokerBody();
        requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
        requestBody.setFilterServerList(filterServerList);
        final byte[] body = requestBody.encode(compressed);
        final int bodyCrc32 = UtilAll.crc32(body);
        requestHeader.setBodyCrc32(bodyCrc32);
        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
        for (final String namesrvAddr : nameServerAddressList) {
            // Register with all name servers concurrently, which is faster
            brokerOuterExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                        if (result != null) {
                            registerBrokerResultList.add(result);
                        }
                        log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                    } catch (Exception e) {
                        log.warn("registerBroker Exception, {}", namesrvAddr, e);
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }

        try {
            countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
        }
    }

    return registerBrokerResultList;
}
```
Master and slave differ very little: each has every feature a broker needs. Both open their service ports, clean up files, register with the name servers, and so on. The only difference is that a slave also starts a scheduled task that sends a sync request to the master every 10 seconds, namely syncAll(). So what exactly gets synchronized, and how?
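The slave-side timer pattern can be sketched in isolation as follows. This is a hypothetical simplification, not RocketMQ code: SlaveSyncTimer and tick() are made-up names, and syncAll is passed in as a plain Runnable. It shows the two guards the original relies on: the task does nothing until a master address has been set (and that address is not the broker itself), and any exception is swallowed so the periodic schedule keeps running.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the slave-side timer pattern; not RocketMQ code.
final class SlaveSyncTimer {
    private final String selfAddr;
    private final Runnable syncAll;      // stands in for SlaveSynchronize#syncAll
    private volatile String masterAddr;  // null until a name server registration supplies it

    SlaveSyncTimer(String selfAddr, Runnable syncAll) {
        this.selfAddr = selfAddr;
        this.syncAll = syncAll;
    }

    void setMasterAddr(String addr) {
        this.masterAddr = addr;
    }

    // One timer tick. Returns true only if a sync ran and finished without error.
    boolean tick() {
        String master = this.masterAddr; // read once, like masterAddrBak in the original
        if (master == null || master.equals(selfAddr)) {
            return false;                // no master known yet, or we are the master
        }
        try {
            syncAll.run();
            return true;
        } catch (Throwable t) {
            // Swallowing matters: if a scheduleAtFixedRate task throws,
            // the executor suppresses all of its later executions.
            return false;
        }
    }

    // The original uses a 3 s initial delay and a 10 s period.
    ScheduledFuture<?> start(ScheduledExecutorService ses, long initialDelayMs, long periodMs) {
        return ses.scheduleAtFixedRate(this::tick, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
    }
}
```

Usage, mirroring the timing above: `timer.start(Executors.newSingleThreadScheduledExecutor(), 3_000, 10_000);`.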
All of the master-slave synchronization appears to be implemented here: syncAll();
```java
// org.apache.rocketmq.broker.slave.SlaveSynchronize#syncAll
public void syncAll() {
    // Sync topic configuration
    this.syncTopicConfig();
    // Sync consumer offsets
    this.syncConsumerOffset();
    // Sync delay offsets
    this.syncDelayOffset();
    // Sync subscription group configuration
    this.syncSubscriptionGroupConfig();
}

// Sync topic configuration
private void syncTopicConfig() {
    String masterAddrBak = this.masterAddr;
    // Sync only when a master address is known and it is not this broker itself
    if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
        try {
            TopicConfigSerializeWrapper topicWrapper =
                this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
            // The data version changed, i.e. the data changed, so adopt the new version's data
            if (!this.brokerController.getTopicConfigManager().getDataVersion()
                .equals(topicWrapper.getDataVersion())) {

                this.brokerController.getTopicConfigManager().getDataVersion()
                    .assignNewOne(topicWrapper.getDataVersion());
                this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();
                this.brokerController.getTopicConfigManager().getTopicConfigTable()
                    .putAll(topicWrapper.getTopicConfigTable());
                // Persist the topic configuration
                this.brokerController.getTopicConfigManager().persist();

                log.info("Update slave topic config from master, {}", masterAddrBak);
            }
        } catch (Exception e) {
            log.error("SyncTopicConfig Exception, {}", masterAddrBak, e);
        }
    }
}

// Sync consumer offsets
private void syncConsumerOffset() {
    String masterAddrBak = this.masterAddr;
    if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
        try {
            ConsumerOffsetSerializeWrapper offsetWrapper =
                this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
            this.brokerController.getConsumerOffsetManager().getOffsetTable()
                .putAll(offsetWrapper.getOffsetTable());
            this.brokerController.getConsumerOffsetManager().persist();
            log.info("Update slave consumer offset from master, {}", masterAddrBak);
        } catch (Exception e) {
            log.error("SyncConsumerOffset Exception, {}", masterAddrBak, e);
        }
    }
}

// Progress offsets of the delay-level (scheduled message) queues, stored in config/delayOffset.json
private void syncDelayOffset() {
    String masterAddrBak = this.masterAddr;
    if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
        try {
            String delayOffset =
                this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
            if (delayOffset != null) {
                String fileName =
                    StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
                        .getMessageStoreConfig().getStorePathRootDir());
                try {
                    MixAll.string2File(delayOffset, fileName);
                } catch (IOException e) {
                    log.error("Persist file Exception, {}", fileName, e);
                }
            }
            log.info("Update slave delay offset from master, {}", masterAddrBak);
        } catch (Exception e) {
            log.error("SyncDelayOffset Exception, {}", masterAddrBak, e);
        }
    }
}

// Sync subscription group configuration
private void syncSubscriptionGroupConfig() {
    String masterAddrBak = this.masterAddr;
    if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
        try {
            SubscriptionGroupWrapper subscriptionWrapper =
                this.brokerController.getBrokerOuterAPI()
                    .getAllSubscriptionGroupConfig(masterAddrBak);

            if (!this.brokerController.getSubscriptionGroupManager().getDataVersion()
                .equals(subscriptionWrapper.getDataVersion())) {
                SubscriptionGroupManager subscriptionGroupManager =
                    this.brokerController.getSubscriptionGroupManager();
                subscriptionGroupManager.getDataVersion().assignNewOne(
                    subscriptionWrapper.getDataVersion());
                subscriptionGroupManager.getSubscriptionGroupTable().clear();
                subscriptionGroupManager.getSubscriptionGroupTable().putAll(
                    subscriptionWrapper.getSubscriptionGroupTable());
                // Persist the subscription group configuration
                subscriptionGroupManager.persist();
                log.info("Update slave Subscription Group from master, {}", masterAddrBak);
            }
        } catch (Exception e) {
            log.error("SyncSubscriptionGroup Exception, {}", masterAddrBak, e);
        }
    }
}
```
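That is the main framework of RocketMQ's master-slave synchronization. To answer the questions above: what gets synchronized? Four kinds of data: topic configuration, consumer offsets, delay offsets, and subscription group configuration. How timely is it? A sync request is issued every 10 seconds, so the lag is on the order of 10 seconds.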
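The four sync methods follow two patterns: topic and subscription-group configuration is replaced wholesale when the master's data version differs, while consumer offsets are simply merged with putAll. A hypothetical sketch of both styles (MetadataMirror and DataVersionSketch are illustrative names, not RocketMQ types; the real DataVersion, the offset value type, and the persist() calls are simplified away):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for RocketMQ's DataVersion (timestamp + counter).
record DataVersionSketch(long timestamp, long counter) {}

// Hypothetical sketch of the two update styles in SlaveSynchronize; not RocketMQ code.
final class MetadataMirror<V> {
    private DataVersionSketch version = new DataVersionSketch(0, 0);
    private final Map<String, V> table = new HashMap<>();

    // Topic config / subscription groups: if the master's version differs,
    // adopt it and replace the whole table (clear + putAll). The real code then persists.
    synchronized boolean syncVersioned(DataVersionSketch masterVersion, Map<String, V> masterTable) {
        if (version.equals(masterVersion)) {
            return false; // nothing changed on the master
        }
        version = masterVersion;
        table.clear();
        table.putAll(masterTable);
        return true;
    }

    // Consumer offsets: no version check, just putAll, so keys that
    // disappeared on the master are kept on the slave.
    synchronized void mergeUnversioned(Map<String, V> masterTable) {
        table.putAll(masterTable);
    }

    synchronized Map<String, V> snapshot() {
        return Map.copyOf(table);
    }
}
```

One consequence of the merge style, visible in the sketch: an offset entry removed on the master stays on the slave until something overwrites it.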
Wait: everything synchronized above looks like metadata. Where is the synchronization of the message data itself? That is what we care about most!
4. Synchronizing RocketMQ message data
The previous section only scratched the surface, and it turned out not to be what we were looking for: the scheduled task only syncs metadata. So where does the real data get synchronized? It is handled by HAService, which runs a main loop that keeps pulling data from the master and appending it to the local commitlog. That is what actually keeps the data in sync.
4.1. Starting HAService
The synchronization service is a set of dedicated components: a server side, a client side, and some maintenance threads, which we need to look at separately. It is created when the messageStore is initialized, and it reads its own port setting to open the HA sync service.
```java
// org.apache.rocketmq.store.DefaultMessageStore#DefaultMessageStore
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
    final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
    this.messageArrivingListener = messageArrivingListener;
    this.brokerConfig = brokerConfig;
    this.messageStoreConfig = messageStoreConfig;
    this.brokerStatsManager = brokerStatsManager;
    this.allocateMappedFileService = new AllocateMappedFileService(this);
    if (messageStoreConfig.isEnableDLegerCommitLog()) {
        this.commitLog = new DLedgerCommitLog(this);
    } else {
        this.commitLog = new CommitLog(this);
    }
    this.consumeQueueTable = new ConcurrentHashMap<>(32);

    this.flushConsumeQueueService = new FlushConsumeQueueService();
    this.cleanCommitLogService = new CleanCommitLogService();
    this.cleanConsumeQueueService = new CleanConsumeQueueService();
    this.storeStatsService = new StoreStatsService();
    this.indexService = new IndexService(this);
    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        // Create the HAService
        this.haService = new HAService(this);
    } else {
        this.haService = null;
    }
    // ...
    File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
    MappedFile.ensureDirOK(file.getParent());
    lockFile = new RandomAccessFile(file, "rw");
}

// org.apache.rocketmq.store.ha.HAService#HAService
public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
    this.defaultMessageStore = defaultMessageStore;
    // Server side: accepts slave connections on the HA listen port
    this.acceptSocketService =
        new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
    this.groupTransferService = new GroupTransferService();
    // Client side
    this.haClient = new HAClient();
}

// Each part runs as a background thread; start() launches them all together
public void start() throws Exception {
    // Server side, used when this node is the master
    this.acceptSocketService.beginAccept();
    this.acceptSocketService.start();
    // Transfer service: takes producers' write requests and waits until the data has reached the slaves
    this.groupTransferService.start();
    // Client side, used by a slave to request data from the master
    this.haClient.start();
}
```
HAService is a small service inside RocketMQ that runs in background threads. For simplicity, or for resource isolation, it uses its own port and its own networking code: small, but complete in every detail. Below I explain the data synchronization in three separate parts.
4.2. Slave-side synchronization
The slave actively pulls data from the master, which is an important step. This is implemented in HAClient: once started, the client keeps requesting new data from the master and writes it into its own commitlog.
```java
// org.apache.rocketmq.store.ha.HAService.HAClient#run
@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // Connect to the master using plain Java NIO
            if (this.connectMaster()) {

                if (this.isTimeToReportOffset()) {
                    // Periodically report this slave's sync progress to the master
                    boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                    // If the connection is broken, close it; the next loop iteration reconnects
                    if (!result) {
                        this.closeMaster();
                    }
                }

                this.selector.select(1000);

                // Core logic: process the data received from the master
                boolean ok = this.processReadEvent();
                if (!ok) {
                    this.closeMaster();
                }

                if (!reportSlaveMaxOffsetPlus()) {
                    continue;
                }

                long interval =
                    HAService.this.getDefaultMessageStore().getSystemClock().now()
                        - this.lastWriteTimestamp;
                if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                    .getHaHousekeepingInterval()) {
                    log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
                        + "] expired, " + interval);
                    this.closeMaster();
                    log.warn("HAClient, master not response some time, so close connection");
                }
            } else {
                // Not connected: retry in 5 seconds (on a master this simply idles forever)
                this.waitForRunning(1000 * 5);
            }
        } catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.waitForRunning(1000 * 5);
        }
    }

    log.info(this.getServiceName() + " service end");
}

private boolean connectMaster() throws ClosedChannelException {
    // A single long-lived connection
    if (null == socketChannel) {
        String addr = this.masterAddress.get();
        // No master address means nothing to connect to.
        // A master node runs this loop too; it just never connects anywhere.
        if (addr != null) {
            SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            if (socketAddress != null) {
                // Plain NIO connection
                this.socketChannel = RemotingUtil.connect(socketAddress);
                if (this.socketChannel != null) {
                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                }
            }
        }

        this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

        this.lastWriteTimestamp = System.currentTimeMillis();
    }

    return this.socketChannel != null;
}

// org.apache.rocketmq.remoting.common.RemotingUtil#connect
public static SocketChannel connect(SocketAddress remote) {
    return connect(remote, 1000 * 5);
}

public static SocketChannel connect(SocketAddress remote, final int timeoutMillis) {
    SocketChannel sc = null;
    try {
        sc = SocketChannel.open();
        sc.configureBlocking(true);
        sc.socket().setSoLinger(false, -1);
        sc.socket().setTcpNoDelay(true);
        sc.socket().setReceiveBufferSize(1024 * 64);
        sc.socket().setSendBufferSize(1024 * 64);
        sc.socket().connect(remote, timeoutMillis);
        sc.configureBlocking(false);
        return sc;
    } catch (Exception e) {
        if (sc != null) {
            try {
                sc.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        }
    }

    return null;
}
```
processReadEvent() is where new data received from the master is written into this broker's commitlog. The actual work is mostly delegated to the commitlog.
```java
// org.apache.rocketmq.store.ha.HAService.HAClient#processReadEvent
private boolean processReadEvent() {
    int readSizeZeroTimes = 0;
    while (this.byteBufferRead.hasRemaining()) {
        try {
            int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                readSizeZeroTimes = 0;
                boolean result = this.dispatchReadRequest();
                if (!result) {
                    log.error("HAClient, dispatchReadRequest error");
                    return false;
                }
            } else if (readSize == 0) {
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                log.info("HAClient, processReadEvent read socket < 0");
                return false;
            }
        } catch (IOException e) {
            log.info("HAClient, processReadEvent read socket exception", e);
            return false;
        }
    }

    return true;
}

private boolean dispatchReadRequest() {
    // Parse data according to the protocol: header = phyoffset (8 bytes) + size (4 bytes)
    final int msgHeaderSize = 8 + 4; // phyoffset + size
    int readSocketPos = this.byteBufferRead.position();

    while (true) {
        int diff = this.byteBufferRead.position() - this.dispatchPosition;
        if (diff >= msgHeaderSize) {
            long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
            int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);

            long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

            if (slavePhyOffset != 0) {
                if (slavePhyOffset != masterPhyOffset) {
                    log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                        + slavePhyOffset + " MASTER: " + masterPhyOffset);
                    return false;
                }
            }

            // A complete frame has arrived: append it to the store right away
            if (diff >= (msgHeaderSize + bodySize)) {
                byte[] bodyData = new byte[bodySize];
                this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);
                this.byteBufferRead.get(bodyData);

                HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);

                this.byteBufferRead.position(readSocketPos);
                this.dispatchPosition += msgHeaderSize + bodySize;

                if (!reportSlaveMaxOffsetPlus()) {
                    return false;
                }

                continue;
            }
        }

        if (!this.byteBufferRead.hasRemaining()) {
            this.reallocateByteBuffer();
        }

        break;
    }

    return true;
}

// org.apache.rocketmq.store.DefaultMessageStore#appendToCommitLog
@Override
public boolean appendToCommitLog(long startOffset, byte[] data) {
    if (this.shutdown) {
        log.warn("message store has shutdown, so appendToPhyQueue is forbidden");
        return false;
    }
    // Append to the commitlog; the reput service then builds the consumeQueue, index, etc.
    boolean result = this.commitLog.appendData(startOffset, data);
    if (result) {
        this.reputMessageService.wakeup();
    } else {
        log.error("appendToPhyQueue failed " + startOffset + " " + data.length);
    }

    return result;
}
```
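The slave's processing flow shows us essentially how RocketMQ synchronizes data: a dedicated port is opened for synchronization, the slave keeps polling the master, and whenever it receives new data it appends it to its own commitlog, building its own data set and thereby staying in sync with the master. (Data consistency deserves attention here; we return to it at the end.)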
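The wire format the slave parses in dispatchReadRequest is simple: an 8-byte master physical offset, a 4-byte body size, then the body. Below is a hypothetical, self-contained decoder for that framing, not RocketMQ code: HaFrameDecoder is a made-up name, a list of byte arrays stands in for the commitlog, and skipping empty bodies is an assumption of this sketch. It shows the two properties that matter: frames may arrive split across socket reads, and each frame must start exactly at the slave's current max offset.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the framing in HAClient#dispatchReadRequest; not RocketMQ code.
// Frame layout: [masterPhyOffset: 8-byte long][bodySize: 4-byte int][body: bodySize bytes]
final class HaFrameDecoder {
    static final int HEADER_SIZE = 8 + 4;

    private final ByteBuffer buf;                     // received but not yet dispatched bytes
    private long maxPhyOffset;                        // stands in for getMaxPhyOffset()
    final List<byte[]> appended = new ArrayList<>();  // stands in for the commitlog

    HaFrameDecoder(long startOffset, int capacity) {
        this.maxPhyOffset = startOffset;
        this.buf = ByteBuffer.allocate(capacity);
    }

    long maxPhyOffset() {
        return maxPhyOffset;
    }

    // Feed bytes exactly as they came off the socket; frames may be split anywhere.
    void feed(byte[] chunk) {
        buf.put(chunk); // BufferOverflowException if capacity is too small
        buf.flip();     // switch to reading
        while (buf.remaining() >= HEADER_SIZE) {
            int start = buf.position();
            long masterPhyOffset = buf.getLong(start);
            int bodySize = buf.getInt(start + 8);
            // Same check as the original: the master must continue exactly where we stopped.
            if (maxPhyOffset != 0 && maxPhyOffset != masterPhyOffset) {
                throw new IllegalStateException("slave at " + maxPhyOffset + ", master sent " + masterPhyOffset);
            }
            if (buf.remaining() < HEADER_SIZE + bodySize) {
                break;  // header is here, but the body is not complete yet
            }
            byte[] body = new byte[bodySize];
            buf.position(start + HEADER_SIZE);
            buf.get(body);
            if (bodySize > 0) { // assumption: an empty body carries no data, so skip it
                appended.add(body);
                maxPhyOffset = masterPhyOffset + bodySize;
            }
        }
        buf.compact();  // keep any partial frame, switch back to writing
    }
}
```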
4.3. The master's data sync service
The slave keeps pulling data from the master, so the master only has to hand the data over. Still, the master needs at least a network service to accept the slave's requests.
This also lives in HAService, which opens a service port directly with NIO to accept requests:
```java
// org.apache.rocketmq.store.ha.HAService.AcceptSocketService
/**
 * Listens to slave connections to create {@link HAConnection}.
 */
class AcceptSocketService extends ServiceThread {
    private final SocketAddress socketAddressListen;
    private ServerSocketChannel serverSocketChannel;
    private Selector selector;

    // Listen on the given port
    public AcceptSocketService(final int port) {
        this.socketAddressListen = new InetSocketAddress(port);
    }

    /**
     * Starts listening to slave connections.
     *
     * @throws Exception If fails.
     */
    public void beginAccept() throws Exception {
        this.serverSocketChannel = ServerSocketChannel.open();
        this.selector = RemotingUtil.openSelector();
        this.serverSocketChannel.socket().setReuseAddress(true);
        this.serverSocketChannel.socket().bind(this.socketAddressListen);
        this.serverSocketChannel.configureBlocking(false);
        this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                this.selector.select(1000);
                Set<SelectionKey> selected = this.selector.selectedKeys();

                if (selected != null) {
                    for (SelectionKey k : selected) {
                        if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                            SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();

                            if (sc != null) {
                                HAService.log.info("HAService receive new connection, "
                                    + sc.socket().getRemoteSocketAddress());

                                try {
                                    HAConnection conn = new HAConnection(HAService.this, sc);
                                    // After accepting, start this connection's own read and write threads
                                    conn.start();
                                    HAService.this.addConnection(conn);
                                } catch (Exception e) {
                                    log.error("new HAConnection exception", e);
                                    sc.close();
                                }
                            }
                        } else {
                            log.warn("Unexpected ops in select " + k.readyOps());
                        }
                    }

                    selected.clear();
                }
            } catch (Exception e) {
                log.error(this.getServiceName() + " service has exception.", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }
    // ...
}

// org.apache.rocketmq.store.ha.HAConnection#start
public void start() {
    this.readSocketService.start();
    this.writeSocketService.start();
}

// org.apache.rocketmq.store.ha.HAConnection.ReadSocketService#run
@Override
public void run() {
    HAConnection.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            this.selector.select(1000);
            boolean ok = this.processReadEvent();
            if (!ok) {
                HAConnection.log.error("processReadEvent error");
                break;
            }

            long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
            if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
                log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
                break;
            }
        } catch (Exception e) {
            HAConnection.log.error(this.getServiceName() + " service has exception.", e);
            break;
        }
    }

    // The connection is finished: stop both threads and release its resources
    this.makeStop();

    writeSocketService.makeStop();

    haService.removeConnection(HAConnection.this);

    HAConnection.this.haService.getConnectionCount().decrementAndGet();

    SelectionKey sk = this.socketChannel.keyFor(this.selector);
    if (sk != null) {
        sk.cancel();
    }

    try {
        this.selector.close();
        this.socketChannel.close();
    } catch (IOException e) {
        HAConnection.log.error("", e);
    }

    HAConnection.log.info(this.getServiceName() + " service end");
}

private boolean processReadEvent() {
    int readSizeZeroTimes = 0;

    if (!this.byteBufferRead.hasRemaining()) {
        this.byteBufferRead.flip();
        this.processPosition = 0;
    }

    while (this.byteBufferRead.hasRemaining()) {
        try {
            int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                readSizeZeroTimes = 0;
                this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
                    int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                    long readOffset = this.byteBufferRead.getLong(pos - 8);
                    this.processPosition = pos;

                    // The request's only field: the slave's max offset
                    HAConnection.this.slaveAckOffset = readOffset;
                    if (HAConnection.this.slaveRequestOffset < 0) {
                        HAConnection.this.slaveRequestOffset = readOffset;
                        log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
                    }
                    // ...
                    HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                }
            } else if (readSize == 0) {
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                return false;
            }
        } catch (IOException e) {
            log.error("processReadEvent exception", e);
            return false;
        }
    }

    return true;
}

// org.apache.rocketmq.store.ha.HAService#notifyTransferSome
public void notifyTransferSome(final long offset) {
    // Move push2SlaveMaxOffset forward only (via CAS), then wake the transfer service
    for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) {
        boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
        if (ok) {
            this.groupTransferService.notifyTransferSome();
            break;
        } else {
            value = this.push2SlaveMaxOffset.get();
        }
    }
}
```
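Opening the port and accepting requests is easy, but how the master responds to the client (the connection's writeSocketService) is more involved. Readers can dig into it on their own.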
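In the other direction, the slave sends only its max offset as an 8-byte long, and the master only cares about the newest complete value. Here is a hypothetical sketch of both ends plus the forward-only update done by notifyTransferSome. AckPath and its methods are illustrative names, not RocketMQ code, and the modulo trick assumes, as the original does, that reports are aligned to 8-byte boundaries from the start of the buffer.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the ack path between HAClient and HAConnection; not RocketMQ code.
final class AckPath {
    // Slave side: a report is a single 8-byte long, the slave's max physical offset.
    static byte[] encodeReport(long maxPhyOffset) {
        return ByteBuffer.allocate(8).putLong(maxPhyOffset).array();
    }

    // Master side: `received` is in write mode, so position() = bytes read so far.
    // Take the newest complete 8-byte report, as in
    // pos = position - position % 8; readOffset = getLong(pos - 8).
    static long latestAck(ByteBuffer received) {
        int position = received.position();
        int pos = position - (position % 8);
        if (pos < 8) {
            throw new IllegalStateException("no complete report yet");
        }
        return received.getLong(pos - 8);
    }

    // Same idea as HAService#notifyTransferSome: move the shared offset forward only,
    // retrying the compare-and-set if another connection changed it first.
    static boolean advance(AtomicLong push2SlaveMaxOffset, long offset) {
        for (long value = push2SlaveMaxOffset.get(); offset > value; value = push2SlaveMaxOffset.get()) {
            if (push2SlaveMaxOffset.compareAndSet(value, offset)) {
                return true;
            }
        }
        return false;
    }
}
```

Since only the latest offset matters, older or partial reports in the buffer can be ignored safely, which is why the master never needs a more elaborate request protocol.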
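GroupTransferService keeps a write list and a read list of requests. When a message is written on a SYNC_MASTER, a request is put into it; the service then waits until the slave's acknowledged offset reaches that message and promptly notifies the waiting writer.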
```java
// org.apache.rocketmq.store.ha.HAService.GroupTransferService#putRequest
public synchronized void putRequest(final CommitLog.GroupCommitRequest request) {
    synchronized (this.requestsWrite) {
        this.requestsWrite.add(request);
    }
    this.wakeup();
}

public void notifyTransferSome() {
    this.notifyTransferObject.wakeup();
}

private void swapRequests() {
    // Swap the write and read lists
    List<CommitLog.GroupCommitRequest> tmp = this.requestsWrite;
    this.requestsWrite = this.requestsRead;
    this.requestsRead = tmp;
}

private void doWaitTransfer() {
    synchronized (this.requestsRead) {
        if (!this.requestsRead.isEmpty()) {
            for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
                    + HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();
                // Wait until the slave has acknowledged this request's offset, or the timeout passes
                while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
                    this.notifyTransferObject.waitForRunning(1000);
                    transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                }

                if (!transferOK) {
                    log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
                }

                req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
            }

            this.requestsRead.clear();
        }
    }
}

public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            this.waitForRunning(10);
            this.doWaitTransfer();
        } catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}
```
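The double-buffer idea can be shown in a simplified, hypothetical model. TransferWaiter is an illustrative name, not RocketMQ code, and several simplifications are assumptions: a CompletableFuture replaces wakeupCustomer, a plain monitor replaces notifyTransferObject, one lock guards both lists, and the swap happens at the start of each pass. Producers append to one list while the service thread drains the other, completing each request with PUT_OK once the acknowledged offset reaches it, or FLUSH_SLAVE_TIMEOUT when its deadline passes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified model of GroupTransferService; not RocketMQ code.
final class TransferWaiter {
    enum Status { PUT_OK, FLUSH_SLAVE_TIMEOUT }

    static final class Request {
        final long nextOffset; // offset the slave must reach for this write to count as replicated
        final CompletableFuture<Status> result = new CompletableFuture<>();

        Request(long nextOffset) {
            this.nextOffset = nextOffset;
        }
    }

    private final AtomicLong push2SlaveMaxOffset;  // advanced as slaves acknowledge
    private final Object ackSignal = new Object(); // plays the role of notifyTransferObject
    private List<Request> requestsWrite = new ArrayList<>();
    private List<Request> requestsRead = new ArrayList<>();

    TransferWaiter(AtomicLong push2SlaveMaxOffset) {
        this.push2SlaveMaxOffset = push2SlaveMaxOffset;
    }

    // Producer threads only ever touch the write list.
    synchronized void putRequest(Request r) {
        requestsWrite.add(r);
    }

    // Called after push2SlaveMaxOffset has moved forward.
    void notifyTransferSome() {
        synchronized (ackSignal) {
            ackSignal.notifyAll();
        }
    }

    // Swap the lists so producers keep appending while this thread drains the other one.
    private synchronized void swapRequests() {
        List<Request> tmp = requestsWrite;
        requestsWrite = requestsRead;
        requestsRead = tmp;
    }

    // One pass of the service loop: swap, then wait (bounded) for each request.
    void doWaitTransfer(long timeoutMillis) {
        swapRequests();
        for (Request req : requestsRead) {
            long deadline = System.currentTimeMillis() + timeoutMillis;
            boolean ok = push2SlaveMaxOffset.get() >= req.nextOffset;
            while (!ok && System.currentTimeMillis() < deadline) {
                synchronized (ackSignal) {
                    try {
                        // Short bounded wait: a notify that slips in before wait() only costs a re-check.
                        ackSignal.wait(Math.max(1L, Math.min(10L, deadline - System.currentTimeMillis())));
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
                ok = push2SlaveMaxOffset.get() >= req.nextOffset;
            }
            req.result.complete(ok ? Status.PUT_OK : Status.FLUSH_SLAVE_TIMEOUT);
        }
        requestsRead.clear();
    }
}
```

The point of the swap is that producers never wait on the slow, possibly time-consuming drain: they only hold the lock long enough to add one element.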
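This completes our analysis of RocketMQ master-slave synchronization. RocketMQ builds the core of master-slave replication on the commitlog, adds simple periodic synchronization of several kinds of metadata, and uses a pair of swapped request buffers so that writers learn promptly when their data has reached the slave. Together these keep master and slave as consistent as practical.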
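Finally, one question deserves attention: how is data consistency between master and slave actually guaranteed? The master writes data directly, so how does the slave make sure its data matches the master's, or, put simply, how is the write order preserved? If two records were inserted into the commitlogs in different orders, the logs would diverge and the results would be completely different; after a master-slave switch, for example, reading the same offset would return different data.
In fact, the slave uses a single thread for synchronization: data is pulled in order and written to the commitlog by that same thread, so reordering simply cannot happen. This may also be why master-slave sync does not use a communication framework such as netty: it is neither necessary nor appropriate. Master-slave sync needs strict ordering, not a high degree of concurrency. Just as Redis delivers very high performance on a single thread, RocketMQ's master-slave sync achieves high performance with plain NIO plus the page cache and mmap, so there is no need to worry that single-threaded synchronization causes large delays.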

Author: 等你歸去來
Source: https://www.cnblogs.com/yougewe/p/14198675.html
