Understanding RocketMQ's High-Availability Mechanisms in One Article: Cluster Management High Availability

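I. Introduction
Earlier articles in this series:
Understanding RocketMQ's High-Availability Mechanisms in One Article: Message Storage High Availability
Understanding RocketMQ's High-Availability Mechanisms in One Article: Message Sending High Availability
Understanding RocketMQ's High-Availability Mechanisms in One Article: Message Consumption High Availability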
II. Architecture Design
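Every 30 seconds, each Broker sends a heartbeat to all NameServers. The heartbeat carries the Broker's own topic configuration.
Every 10 seconds, NameServer scans the connections of all Brokers it considers alive. If the last update time of a connection is more than 2 minutes old, NameServer closes that connection and also closes the connections to all Slaves under that Broker. It then updates the topic-to-queue mapping, but it does not notify producers or consumers. Clients pick up the change the next time they pull routes.
A Broker Slave copies data from its Broker Master, either synchronously or asynchronously.
A Consumer queries NameServer for the Broker addresses of a Topic and caches them on the client. It then opens long-lived connections to the Master and Slave that serve the Topic and sends periodic heartbeats to both.
If a Broker Master goes down, NameServer removes it from the routing tables. On the Consumer side, the scheduled task MQClientInstance.this.updateTopicRouteInfoFromNameServer runs every 30 seconds by default (pollNameServerInterval) and pulls the Broker addresses for the Topic. Only the Slave address is left at this point, so the Consumer consumes from the Slave.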
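Consumers are connected to both the Master and the Slave, and different consumption rules apply in different scenarios.
Producers are connected to all Masters, but they cannot write to Slaves.
A client first resolves addresses through NameServer to get the IP and port of the available Brokers. It then uses that information to connect to the Brokers.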
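NameServer stores the list of active Brokers, including Masters and Slaves.
NameServer stores all topics and the list of queues of each topic.
NameServer stores the Filter list of every Broker.
NameServer provides up-to-date routing information to its clients: producers, consumers, and command-line clients.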
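The Master/Slave rules above can be made concrete with a small sketch. This is not RocketMQ's client code. BrokerGroup, writeAddr and readAddr are hypothetical names, and the sketch assumes, as in RocketMQ, that brokerId 0 denotes the Master. Producers only ever get the Master's address. Consumers fall back to a Slave once the Master has dropped out of the route.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified view of one broker group's addresses:
// brokerId -> "ip:port", where brokerId 0 is assumed to be the Master.
class BrokerGroup {
    static final long MASTER_ID = 0L;
    final Map<Long, String> addrs = new HashMap<>();

    // Producers may only write to the Master; null means "no writable node".
    String writeAddr() {
        return addrs.get(MASTER_ID);
    }

    // Consumers prefer the Master, but fall back to any Slave once the
    // Master has been removed from the route.
    String readAddr() {
        String master = addrs.get(MASTER_ID);
        if (master != null) {
            return master;
        }
        for (Map.Entry<Long, String> e : addrs.entrySet()) {
            if (e.getKey() != MASTER_ID) {
                return e.getValue();
            }
        }
        return null;
    }
}
```

The real client logic (for example in MQClientInstance) covers more cases than this. The sketch only models the failover case described above.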
III. Startup Process
NameServer starts from NamesrvStartup. Its createNamesrvController method parses the command line, builds NamesrvConfig and NettyServerConfig, and creates the NamesrvController:
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    Options options = ServerUtil.buildCommandlineOptions(new Options());
    commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
    if (null == commandLine) {
        System.exit(-1);
        return null;
    }
    // Create NamesrvConfig
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    // Create NettyServerConfig
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    // Set the default listen port
    nettyServerConfig.setListenPort(9876);
    // Handle the -c option (load a configuration file)
    if (commandLine.hasOption('c')) {
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            properties = new Properties();
            properties.load(in);
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);
            namesrvConfig.setConfigStorePath(file);
            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        }
    }
    // Handle the -p option (print the configuration and exit)
    if (commandLine.hasOption('p')) {
        InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
        MixAll.printObjectProperties(console, namesrvConfig);
        MixAll.printObjectProperties(console, nettyServerConfig);
        System.exit(0);
    }
    // Copy the command-line options into namesrvConfig
    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
    if (null == namesrvConfig.getRocketmqHome()) {
        System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
        System.exit(-2);
    }
    LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
    JoranConfigurator configurator = new JoranConfigurator();
    configurator.setContext(lc);
    lc.reset();
    configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
    log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    MixAll.printObjectProperties(log, namesrvConfig);
    MixAll.printObjectProperties(log, nettyServerConfig);
    // Create the NamesrvController
    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
    // remember all configs to prevent discard
    controller.getConfiguration().registerConfig(properties);
    return controller;
}
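NamesrvConfig properties:
rocketmqHome: the RocketMQ home directory
kvConfigPath: the persistence path where NameServer stores its KV configuration
configStorePath: the default path of the NameServer configuration file
orderMessageEnable: whether ordered messages are supported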
NettyServerConfig properties:
listenPort: the NameServer listen port, initialized to 9876 by default;
serverWorkerThreads: the number of threads in the Netty business thread pool;
serverCallbackExecutorThreads: the number of threads in the Netty public task thread pool. In Netty's networking design, separate thread pools are created for different request types, such as message sending, message consumption, and heartbeat detection. If no thread pool is registered for a request type, the public pool executes it;
serverSelectorThreads: the number of IO threads. NameServer and Broker use these threads to parse requests and return responses. They handle network traffic: they decode request packets, dispatch them to the business thread pools that do the actual work, and return the results to the caller;
serverOnewaySemaphoreValue: the maximum concurrency of oneway send requests (a Broker-side parameter);
serverAsyncSemaphoreValue: the maximum concurrency of asynchronous message sending;
serverChannelMaxIdleTimeSeconds: the maximum idle time of a network connection, 120 s by default;
serverSocketSndBufSize: the size of the socket send buffer;
serverSocketRcvBufSize: the size of the socket receive buffer;
serverPooledByteBufAllocatorEnable: whether ByteBuffer pooling is enabled;
useEpollNativeSelector: whether the Epoll IO model is used;
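MixAll.properties2Object copies values such as listenPort or serverWorkerThreads from a Properties object (the -c file or the command line) into these config objects. As far as we can tell, it does this by matching setter names via reflection. The sketch below is a minimal standalone version of that idea, not RocketMQ's implementation. DemoServerConfig and PropertiesBinder are hypothetical, and only int, long, boolean and String parameters are handled.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Properties;

// Hypothetical config class standing in for NettyServerConfig.
class DemoServerConfig {
    private int listenPort = 9876;
    private int serverWorkerThreads = 8;
    private boolean useEpollNativeSelector = false;

    public int getListenPort() { return listenPort; }
    public void setListenPort(int listenPort) { this.listenPort = listenPort; }
    public int getServerWorkerThreads() { return serverWorkerThreads; }
    public void setServerWorkerThreads(int n) { this.serverWorkerThreads = n; }
    public boolean isUseEpollNativeSelector() { return useEpollNativeSelector; }
    public void setUseEpollNativeSelector(boolean b) { this.useEpollNativeSelector = b; }
}

// Hypothetical binder: for every public one-argument setter "setXxx", read the
// property "xxx" and convert it to the setter's parameter type.
// Missing keys keep the field's default value.
class PropertiesBinder {
    static void bind(Properties props, Object target) {
        for (Method m : target.getClass().getMethods()) {
            String name = m.getName();
            if (!name.startsWith("set") || name.length() <= 3 || m.getParameterCount() != 1) {
                continue;
            }
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String raw = props.getProperty(key);
            if (raw == null) {
                continue;
            }
            Object value = convert(raw.trim(), m.getParameterTypes()[0]);
            if (value == null) {
                continue; // unsupported parameter type: skipped in this sketch
            }
            try {
                m.invoke(target, value);
            } catch (IllegalAccessException | InvocationTargetException e) {
                throw new IllegalStateException("cannot set " + key, e);
            }
        }
    }

    private static Object convert(String raw, Class<?> type) {
        if (type == int.class || type == Integer.class) return Integer.parseInt(raw);
        if (type == long.class || type == Long.class) return Long.parseLong(raw);
        if (type == boolean.class || type == Boolean.class) return Boolean.parseBoolean(raw);
        if (type == String.class) return raw;
        return null;
    }
}
```

A file containing `listenPort=9877` would therefore override only the port and leave the other defaults untouched.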
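NamesrvController.initialize() does the following:
- loads the KV configuration;
- creates the Netty server and its request thread pool;
- schedules two periodic tasks: a scan for inactive Brokers every 10 s, and a printout of the KV configuration every 10 min.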
public boolean initialize() {
    // Load the KV configuration
    this.kvConfigManager.load();
    // Create the Netty server that handles network requests
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
    // Thread pool that executes request processing
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
    this.registerProcessor();
    // Scheduled task: every 10 s (after a 5 s initial delay), scan the Brokers and remove inactive ones
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);
    // Scheduled task: print the KV configuration every 10 min
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);
    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
        // Register a listener to reload SslContext
        try {
            fileWatchService = new FileWatchService(
                new String[] {
                    TlsSystemConfig.tlsServerCertPath,
                    TlsSystemConfig.tlsServerKeyPath,
                    TlsSystemConfig.tlsServerTrustCertPath
                },
                new FileWatchService.Listener() {
                    boolean certChanged, keyChanged = false;

                    @Override
                    public void onChanged(String path) {
                        if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                            log.info("The trust certificate changed, reload the ssl context");
                            reloadServerSslContext();
                        }
                        if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                            certChanged = true;
                        }
                        if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                            keyChanged = true;
                        }
                        if (certChanged && keyChanged) {
                            log.info("The certificate and private key changed, reload the ssl context");
                            certChanged = keyChanged = false;
                            reloadServerSslContext();
                        }
                    }

                    private void reloadServerSslContext() {
                        ((NettyRemotingServer) remotingServer).loadSslContext();
                    }
                });
        } catch (Exception e) {
            log.warn("FileWatchService created error, can't load the certificate dynamically");
        }
    }
    return true;
}
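The certificate-reload listener in initialize() encodes a small rule:
- A change to the trust certificate reloads the SSL context at once.
- The server certificate and the private key must both change before a reload happens, so the server does not reload with a mismatched pair.

The sketch below models just that rule in isolation. SslReloadTracker is a hypothetical name, and its reload() only counts calls instead of calling loadSslContext().

```java
// Hypothetical, standalone model of the reload rule in the FileWatchService listener.
class SslReloadTracker {
    private final String certPath;
    private final String keyPath;
    private final String trustCertPath;
    private boolean certChanged = false;
    private boolean keyChanged = false;
    private int reloads = 0;

    SslReloadTracker(String certPath, String keyPath, String trustCertPath) {
        this.certPath = certPath;
        this.keyPath = keyPath;
        this.trustCertPath = trustCertPath;
    }

    void onChanged(String path) {
        if (path.equals(trustCertPath)) {
            reload(); // trust certificate: reload immediately
        }
        if (path.equals(certPath)) {
            certChanged = true;
        }
        if (path.equals(keyPath)) {
            keyChanged = true;
        }
        if (certChanged && keyChanged) {
            certChanged = keyChanged = false; // reset for the next rotation
            reload();
        }
    }

    private void reload() {
        reloads++; // stands in for loadSslContext()
    }

    int reloadCount() {
        return reloads;
    }
}
```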
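NamesrvStartup.start() runs initialize(), registers a JVM shutdown hook that releases resources, and then starts the controller: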
public static NamesrvController start(final NamesrvController controller) throws Exception {
    if (null == controller) {
        throw new IllegalArgumentException("NamesrvController is null");
    }
    boolean initResult = controller.initialize();
    if (!initResult) {
        controller.shutdown();
        System.exit(-3);
    }
    // Register a JVM shutdown hook
    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            // Release resources
            controller.shutdown();
            return null;
        }
    }));
    controller.start();
    return controller;
}
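ShutdownHookThread wraps the cleanup Callable in a JVM shutdown hook. The sketch below shows the same pattern under one assumption: the cleanup callback should run at most once, even if run() is reached more than once. OnceShutdownHook is hypothetical and is not RocketMQ's class.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for ShutdownHookThread: runs the cleanup callback at most once.
class OnceShutdownHook extends Thread {
    private final AtomicBoolean done = new AtomicBoolean(false);
    private final Callable<?> callback;

    OnceShutdownHook(Callable<?> callback) {
        super("OnceShutdownHook");
        this.callback = callback;
    }

    @Override
    public void run() {
        if (!done.compareAndSet(false, true)) {
            return; // cleanup already performed
        }
        try {
            callback.call();
        } catch (Exception e) {
            System.err.println("shutdown callback failed: " + e);
        }
    }
}
```

It would be registered as `Runtime.getRuntime().addShutdownHook(new OnceShutdownHook(() -> { controller.shutdown(); return null; }))`. The compareAndSet guard makes any repeated invocation harmless.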
IV. Route Management
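topicQueueTable
Maintains the mapping from a Topic to its message queues. A QueueData entry records the queue metadata of the topic on one Broker: the Broker name, the number of read queues, the number of write queues, and so on.
brokerAddrTable
Maintains the mapping from a Broker Name to its Broker metadata. Brokers are usually deployed in a Master-Slave architecture. BrokerData records the addresses of all nodes under the same Broker Name, keyed by brokerId (0 for the Master, non-zero for Slaves).
clusterAddrTable
Maintains Broker cluster information: the set of Broker Names in each cluster.
brokerLiveTable
Maintains Broker liveness information. When NameServer receives a heartbeat from a Broker, it updates lastUpdateTimestamp in BrokerLiveInfo. If NameServer receives no heartbeat from a Broker for too long (120 s), it removes that Broker.
filterServerTable
The FilterServer list of each Broker, used for class-mode message filtering.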
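The sketch below shows how these tables fit together when a client asks for a topic's route. It is a simplified model, not RocketMQ's code:
- the classes are trimmed to a few fields;
- TopicRoute is a hypothetical stand-in for TopicRouteData;
- pickupTopicRoute only approximates RouteInfoManager.pickupTopicRouteData and ignores filterServerTable.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified stand-ins; the real classes carry more fields (perm, topicSysFlag, ...).
class QueueData {
    final String brokerName;
    final int readQueueNums;
    final int writeQueueNums;

    QueueData(String brokerName, int readQueueNums, int writeQueueNums) {
        this.brokerName = brokerName;
        this.readQueueNums = readQueueNums;
        this.writeQueueNums = writeQueueNums;
    }
}

class BrokerData {
    final String cluster;
    final String brokerName;
    final Map<Long, String> brokerAddrs = new HashMap<>(); // brokerId -> "ip:port", 0 = Master

    BrokerData(String cluster, String brokerName) {
        this.cluster = cluster;
        this.brokerName = brokerName;
    }
}

// Hypothetical route view returned to clients.
class TopicRoute {
    final List<QueueData> queueDatas = new ArrayList<>();
    final List<BrokerData> brokerDatas = new ArrayList<>();
}

class RouteTables {
    final Map<String, List<QueueData>> topicQueueTable = new HashMap<>();  // topic -> queues per broker
    final Map<String, BrokerData> brokerAddrTable = new HashMap<>();       // brokerName -> addresses
    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();     // cluster -> brokerNames
    final Map<String, Long> brokerLiveTable = new HashMap<>();             // addr -> last heartbeat
    final Map<String, List<String>> filterServerTable = new HashMap<>();   // addr -> filter servers

    // Join topicQueueTable with brokerAddrTable; null means "no route" (TOPIC_NOT_EXIST).
    TopicRoute pickupTopicRoute(String topic) {
        List<QueueData> queues = topicQueueTable.get(topic);
        if (queues == null || queues.isEmpty()) {
            return null;
        }
        TopicRoute route = new TopicRoute();
        Set<String> brokerNames = new LinkedHashSet<>();
        for (QueueData q : queues) {
            route.queueDatas.add(q);
            brokerNames.add(q.brokerName);
        }
        for (String name : brokerNames) {
            BrokerData bd = brokerAddrTable.get(name);
            if (bd != null) {
                route.brokerDatas.add(bd);
            }
        }
        return route.brokerDatas.isEmpty() ? null : route;
    }
}
```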
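NameServer handles network requests through an implementation of NettyRequestProcessor (DefaultRequestProcessor):
// org.apache.rocketmq.remoting.netty.NettyRequestProcessor
public interface NettyRequestProcessor {
    RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
        throws Exception;

    boolean rejectRequest();
}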
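NameServer registers DefaultRequestProcessor as its default processor. More generally, the remoting layer appears to keep a table from request code to a (processor, executor) pair, and to fall back to a default pair for codes that have no processor of their own. The sketch below is a minimal version of that dispatch. Request, RequestProcessor and ProcessorRegistry are hypothetical simplifications of RemotingCommand and NettyRequestProcessor, and the numeric codes are arbitrary.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

class Request {
    final int code;
    final String body;

    Request(int code, String body) {
        this.code = code;
        this.body = body;
    }
}

interface RequestProcessor {
    String processRequest(Request request) throws Exception;
}

class ProcessorRegistry {
    private static final class Binding {
        final RequestProcessor processor;
        final ExecutorService executor;

        Binding(RequestProcessor processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Binding> processorTable = new HashMap<>();
    private Binding defaultBinding;

    void register(int code, RequestProcessor processor, ExecutorService executor) {
        processorTable.put(code, new Binding(processor, executor));
    }

    void registerDefault(RequestProcessor processor, ExecutorService executor) {
        defaultBinding = new Binding(processor, executor);
    }

    // Pick the processor registered for the request code (else the default one)
    // and run it on the thread pool registered together with it.
    Future<String> dispatch(Request request) {
        Binding binding = processorTable.getOrDefault(request.code, defaultBinding);
        if (binding == null) {
            throw new IllegalStateException("no processor for code " + request.code);
        }
        return binding.executor.submit(() -> binding.processor.processRequest(request));
    }

    // Convenience for callers that just want the response.
    String dispatchAndWait(Request request) {
        try {
            return dispatch(request).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }
}
```

Giving each processor its own executor keeps a slow request type from occupying the threads of every other request type.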
On the Broker side, BrokerOuterAPI.registerBrokerAll sends the registration (heartbeat) request to every NameServer in parallel:
public List<RegisterBrokerResult> registerBrokerAll(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final boolean oneway,
    final int timeoutMills,
    final boolean compressed) {

    // Results are added concurrently by brokerOuterExecutor threads, so a thread-safe list is needed
    final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
    // Get the NameServer address list
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
        // Build the request header
        final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
        requestHeader.setBrokerAddr(brokerAddr);
        requestHeader.setBrokerId(brokerId);
        requestHeader.setBrokerName(brokerName);
        requestHeader.setClusterName(clusterName);
        requestHeader.setHaServerAddr(haServerAddr);
        requestHeader.setCompressed(compressed);
        // Build the request body
        RegisterBrokerBody requestBody = new RegisterBrokerBody();
        requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
        requestBody.setFilterServerList(filterServerList);
        final byte[] body = requestBody.encode(compressed);
        final int bodyCrc32 = UtilAll.crc32(body);
        requestHeader.setBodyCrc32(bodyCrc32);
        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
        // Iterate over all NameServers
        for (final String namesrvAddr : nameServerAddressList) {
            brokerOuterExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // Register with this NameServer
                        RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                        if (result != null) {
                            registerBrokerResultList.add(result);
                        }
                        log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                    } catch (Exception e) {
                        log.warn("registerBroker Exception, {}", namesrvAddr, e);
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }
        try {
            // Wait at most timeoutMills for all NameServers to respond
            countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
        }
    }
    return registerBrokerResultList;
}
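The core pattern in registerBrokerAll is a parallel fan-out with a bounded wait:
- one task per NameServer;
- a CountDownLatch sized to the number of NameServers;
- a timeout, so that one slow NameServer cannot stall the heartbeat.

The sketch below keeps only that pattern. It is a standalone version, and registerOne is a hypothetical function standing in for the real RPC.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class FanOutRegistrar {
    static List<String> registerAll(List<String> nameServers,
                                    Function<String, String> registerOne, // hypothetical per-NameServer call
                                    ExecutorService executor,
                                    long timeoutMillis) {
        List<String> results = new CopyOnWriteArrayList<>(); // appended to from several threads
        CountDownLatch latch = new CountDownLatch(nameServers.size());
        for (String addr : nameServers) {
            executor.execute(() -> {
                try {
                    String r = registerOne.apply(addr);
                    if (r != null) {
                        results.add(r);
                    }
                } catch (RuntimeException e) {
                    // one failing NameServer must not affect the others
                } finally {
                    latch.countDown();
                }
            });
        }
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS); // bounded wait
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return results;
    }
}
```

A NameServer that fails or times out simply contributes no result. The other NameServers still receive the registration, which is what keeps the NameServer cluster usable when one node is down.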
On the NameServer side, RouteInfoManager.registerBroker updates the routing tables while holding the write lock:
public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            // Acquire the write lock
            this.lock.writeLock().lockInterruptibly();

            // Maintain clusterAddrTable
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            if (null == brokerNames) {
                brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);

            boolean registerFirst = false;

            // Maintain brokerAddrTable
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            // First registration: create the BrokerData
            if (null == brokerData) {
                registerFirst = true;
                brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                this.brokerAddrTable.put(brokerName, brokerData);
            }
            // Not the first registration: update the Broker addresses
            Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
            //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
            //The same IP:PORT must only have one record in brokerAddrTable
            Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
            while (it.hasNext()) {
                Entry<Long, String> item = it.next();
                if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                    it.remove();
                }
            }

            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            registerFirst = registerFirst || (null == oldAddr);

            // Maintain topicQueueTable: Master only, and only on first registration or a topic-config change
            if (null != topicConfigWrapper
                && MixAll.MASTER_ID == brokerId) {
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                    || registerFirst) {
                    ConcurrentMap<String, TopicConfig> tcTable =
                        topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) {
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
                        }
                    }
                }
            }

            // Maintain brokerLiveTable
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                new BrokerLiveInfo(
                    System.currentTimeMillis(),
                    topicConfigWrapper.getDataVersion(),
                    channel,
                    haServerAddr));
            if (null == prevBrokerLiveInfo) {
                log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
            }

            // Maintain filterServerTable
            if (filterServerList != null) {
                if (filterServerList.isEmpty()) {
                    this.filterServerTable.remove(brokerAddr);
                } else {
                    this.filterServerTable.put(brokerAddr, filterServerList);
                }
            }

            // For a Slave, return the Master address and the Master's HA server address
            if (MixAll.MASTER_ID != brokerId) {
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr != null) {
                    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                    if (brokerLiveInfo != null) {
                        result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                        result.setMasterAddr(masterAddr);
                    }
                }
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    }

    return result;
}
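The iterator loop at the start of the brokerAddrTable update is what makes a Slave-to-Master switch clean: one IP:PORT may appear under only one brokerId. The sketch below models just that step. BrokerAddrs is a hypothetical class.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

class BrokerAddrs {
    final Map<Long, String> addrs = new HashMap<>(); // brokerId -> "ip:port"

    // Drop any entry that has the same address under a different brokerId, then store
    // the new mapping. Returns true when the id had no previous address (first registration).
    boolean register(long brokerId, String brokerAddr) {
        Iterator<Map.Entry<Long, String>> it = addrs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, String> e = it.next();
            if (brokerAddr.equals(e.getValue()) && brokerId != e.getKey()) {
                it.remove();
            }
        }
        String old = addrs.put(brokerId, brokerAddr);
        return old == null;
    }
}
```

Without the removal step, a node that re-registers as Master (id 1 becomes id 0) would leave a stale Slave entry behind. Clients would then see the same node twice.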
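NameServer periodically scans brokerLiveTable and compares each Broker's last heartbeat time with the current system time. If the gap exceeds 120 s (BROKER_CHANNEL_EXPIRED_TIME), the Broker is removed.
When a Broker shuts down normally, it sends an unregisterBroker request to NameServer, so its entries are removed without waiting for the timeout.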

public void scanNotActiveBroker() {
    // Iterate over brokerLiveTable
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        long last = next.getValue().getLastUpdateTimestamp();
        // Has more than 120 s passed since the last heartbeat?
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            // Close the connection
            RemotingUtil.closeChannel(next.getValue().getChannel());
            // Remove the Broker
            it.remove();
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
            // Update the routing tables
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
        }
    }
}
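The expiry rule can be isolated into a small, testable class. This sketch assumes that only timestamps matter:
- LivenessTable is hypothetical;
- closing the channel and calling onChannelDestroy are omitted;
- the clock is passed in instead of being read from System.currentTimeMillis().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class LivenessTable {
    // Mirrors BROKER_CHANNEL_EXPIRED_TIME (2 minutes).
    static final long BROKER_CHANNEL_EXPIRED_TIME = 1000L * 60 * 2;

    private final Map<String, Long> lastUpdate = new HashMap<>(); // brokerAddr -> last heartbeat (ms)

    // Called for every heartbeat (registration) from a Broker.
    synchronized void heartbeat(String brokerAddr, long nowMillis) {
        lastUpdate.put(brokerAddr, nowMillis);
    }

    // Called periodically; removes and returns the Brokers whose heartbeat is too old.
    synchronized List<String> scanNotActive(long nowMillis) {
        List<String> removed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastUpdate.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() + BROKER_CHANNEL_EXPIRED_TIME < nowMillis) {
                it.remove();
                removed.add(e.getKey());
            }
        }
        return removed;
    }

    synchronized boolean isAlive(String brokerAddr) {
        return lastUpdate.containsKey(brokerAddr);
    }
}
```

In a running server, scanNotActive would be driven by something like `scheduledExecutorService.scheduleAtFixedRate(() -> table.scanNotActive(System.currentTimeMillis()), 5, 10, TimeUnit.SECONDS)`, mirroring initialize(). With these numbers, a Broker that dies without unregistering can stay in the routing tables for roughly 130 s after its last heartbeat: 120 s for the expiry plus up to 10 s until the next scan. Clients see the change only on their next 30 s route refresh.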
onChannelDestroy then removes the Broker from every routing table:
public void onChannelDestroy(String remoteAddr, Channel channel) {
    String brokerAddrFound = null;
    if (channel != null) {
        try {
            try {
                // Under the read lock, find the Broker address that owns this channel
                this.lock.readLock().lockInterruptibly();
                Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
                    this.brokerLiveTable.entrySet().iterator();
                while (itBrokerLiveTable.hasNext()) {
                    Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
                    if (entry.getValue().getChannel() == channel) {
                        brokerAddrFound = entry.getKey();
                        break;
                    }
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }

    if (null == brokerAddrFound) {
        brokerAddrFound = remoteAddr;
    } else {
        log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
    }

    if (brokerAddrFound != null && brokerAddrFound.length() > 0) {
        try {
            try {
                // Acquire the write lock, then remove the address from brokerLiveTable and filterServerTable
                this.lock.writeLock().lockInterruptibly();
                this.brokerLiveTable.remove(brokerAddrFound);
                this.filterServerTable.remove(brokerAddrFound);

                // Maintain brokerAddrTable
                String brokerNameFound = null;
                boolean removeBrokerName = false;
                Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                    this.brokerAddrTable.entrySet().iterator();
                while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
                    BrokerData brokerData = itBrokerAddrTable.next().getValue();

                    // Iterate over the addresses of this broker group
                    Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<Long, String> entry = it.next();
                        Long brokerId = entry.getKey();
                        String brokerAddr = entry.getValue();
                        // Remove the matching Broker address
                        if (brokerAddr.equals(brokerAddrFound)) {
                            brokerNameFound = brokerData.getBrokerName();
                            it.remove();
                            log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                                brokerId, brokerAddr);
                            break;
                        }
                    }

                    // If no address is left under this brokerName, remove the brokerName
                    if (brokerData.getBrokerAddrs().isEmpty()) {
                        removeBrokerName = true;
                        itBrokerAddrTable.remove();
                        log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                            brokerData.getBrokerName());
                    }
                }

                // Maintain clusterAddrTable
                if (brokerNameFound != null && removeBrokerName) {
                    Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<String, Set<String>> entry = it.next();
                        // Cluster name
                        String clusterName = entry.getKey();
                        // brokerNames in this cluster
                        Set<String> brokerNames = entry.getValue();
                        // Remove brokerNameFound from brokerNames
                        boolean removed = brokerNames.remove(brokerNameFound);
                        if (removed) {
                            log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                                brokerNameFound, clusterName);

                            if (brokerNames.isEmpty()) {
                                log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                                    clusterName);
                                // The cluster has no Broker left, so remove the cluster
                                it.remove();
                            }

                            break;
                        }
                    }
                }

                // Maintain topicQueueTable
                if (removeBrokerName) {
                    Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
                        this.topicQueueTable.entrySet().iterator();
                    while (itTopicQueueTable.hasNext()) {
                        Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
                        // Topic name
                        String topic = entry.getKey();
                        // Queue data of this topic
                        List<QueueData> queueDataList = entry.getValue();

                        Iterator<QueueData> itQueueData = queueDataList.iterator();
                        while (itQueueData.hasNext()) {
                            // Remove the queue data that belongs to the removed Broker
                            QueueData queueData = itQueueData.next();
                            if (queueData.getBrokerName().equals(brokerNameFound)) {
                                itQueueData.remove();
                                log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                                    topic, queueData);
                            }
                        }

                        // If the topic has no queue left, remove the topic
                        if (queueDataList.isEmpty()) {
                            itTopicQueueTable.remove();
                            log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                                topic);
                        }
                    }
                }
            } finally {
                // Release the write lock
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }
}
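onChannelDestroy removes data in a cascade: first the address, then the brokerName, then the cluster membership and the topic queues. The sketch below models that cascade in a few lines. RouteCleaner is hypothetical, and each topic maps directly to the brokerNames that host its queues instead of to QueueData objects.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

class RouteCleaner {
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>(); // brokerName -> (brokerId -> addr)
    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();      // cluster -> brokerNames
    final Map<String, List<String>> topicQueueTable = new HashMap<>();      // topic -> brokerNames with queues

    void onBrokerGone(String brokerAddr) {
        // 1. Remove the address; drop the brokerName once no address is left.
        String brokerNameFound = null;
        boolean removeBrokerName = false;
        Iterator<Map.Entry<String, Map<Long, String>>> itBroker = brokerAddrTable.entrySet().iterator();
        while (itBroker.hasNext() && brokerNameFound == null) {
            Map.Entry<String, Map<Long, String>> entry = itBroker.next();
            if (entry.getValue().values().remove(brokerAddr)) {
                brokerNameFound = entry.getKey();
                if (entry.getValue().isEmpty()) {
                    itBroker.remove();
                    removeBrokerName = true;
                }
            }
        }
        if (!removeBrokerName) {
            return; // other nodes of the group (e.g. the Slave) still serve this brokerName
        }
        // 2. Remove the brokerName from its cluster; drop the cluster if it becomes empty.
        Iterator<Map.Entry<String, Set<String>>> itCluster = clusterAddrTable.entrySet().iterator();
        while (itCluster.hasNext()) {
            Set<String> brokerNames = itCluster.next().getValue();
            if (brokerNames.remove(brokerNameFound)) {
                if (brokerNames.isEmpty()) {
                    itCluster.remove();
                }
                break;
            }
        }
        // 3. Remove the brokerName's queues from every topic; drop topics left without queues.
        final String removedName = brokerNameFound;
        Iterator<Map.Entry<String, List<String>>> itTopic = topicQueueTable.entrySet().iterator();
        while (itTopic.hasNext()) {
            List<String> brokerNames = itTopic.next().getValue();
            brokerNames.removeIf(removedName::equals);
            if (brokerNames.isEmpty()) {
                itTopic.remove();
            }
        }
    }
}
```

One behavior matters for high availability. When only the Master of a broker group disappears, the brokerName and its topic queues stay in the route because the Slave is still registered. That is why consumers can keep reading from the Slave.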
Clients fetch a topic's route through DefaultRequestProcessor.getRouteInfoByTopic:
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final GetRouteInfoRequestHeader requestHeader =
        (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);

    // Ask RouteInfoManager to build a TopicRouteData: its List<QueueData>, List<BrokerData> and
    // filter-server entries come from topicQueueTable, brokerAddrTable and filterServerTable
    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());

    // If a route was found and ordered messages are enabled on this NameServer, fill in the
    // ordered-message configuration of the topic from the NameServer KVConfig
    if (topicRouteData != null) {
        if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
            String orderTopicConf =
                this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                    requestHeader.getTopic());
            topicRouteData.setOrderTopicConf(orderTopicConf);
        }

        byte[] content = topicRouteData.encode();
        response.setBody(content);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    response.setCode(ResponseCode.TOPIC_NOT_EXIST);
    response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
        + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
    return response;
}
V. Summary
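You are welcome to follow my WeChat official account, Lao Zhou Talks Architecture (老周聊架構). It covers the principles, source code and architecture of mainstream Java back-end technology stacks, as well as solutions for high concurrency, high performance and high availability in internet systems.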
