NameServer剖析
前言
到現(xiàn)在為止,RocketMQ已經(jīng)更了5篇文章:一、namesrv的作用
1.1、路由元信息
? private?final?static?long?BROKER_CHANNEL_EXPIRED_TIME?=?1000?*?60?*?2;private final ReadWriteLock lock = new ReentrantReadWriteLock();private final HashMap> topicQueueTable; private final HashMapbrokerAddrTable; private final HashMap> clusterAddrTable; private final HashMapbrokerLiveTable; private final HashMap/* Filter Server */> filterServerTable;
BROKER_CHANNEL_EXPIRED_TIME:broker啟動(dòng)時(shí)會(huì)向集群中所有的namesrv發(fā)送心跳,之后每隔30s再次發(fā)送,如果namesrv在連續(xù)120s內(nèi)沒有收到broker發(fā)送的心跳,那么此broker就會(huì)被namesrv移除。
lock:讀寫鎖,讀寫路由信息時(shí)候是需要加鎖的,不然并發(fā)訪問時(shí)會(huì)有問題。
topicQueueTable:topic消息隊(duì)列路由信息,消息發(fā)送時(shí)根據(jù)路由表進(jìn)行負(fù)載均衡。
brokerAddrTable:broker的基礎(chǔ)信息,包含brokerName、所屬集群名稱、主從broker的地址。
clusterAddrTable:broker集群信息,存儲(chǔ)集群中所有broker的名稱。
brokerLiveTable:broker狀態(tài)信息,namesrv每次收到broker發(fā)送的心跳包時(shí)都會(huì)更新該信息。
filterServerTable:broker上的FilterServer列表,用于類模式消費(fèi)過濾。
QueueData:見名猜意,隊(duì)列數(shù)據(jù),該類是topic對(duì)應(yīng)的隊(duì)列信息的抽象

readQueueNums:讀隊(duì)列個(gè)數(shù),一個(gè)broker默認(rèn)為每個(gè)topic創(chuàng)建4個(gè)讀隊(duì)列writeQueueNums:寫隊(duì)列個(gè)數(shù),?一個(gè)broker默認(rèn)為每個(gè)topic創(chuàng)建4個(gè)寫隊(duì)列perm:讀寫權(quán)限,一般設(shè)置為6,6:同時(shí)支持讀寫,?4:禁寫,2:禁讀topicSynFlag:topic同步標(biāo)記,同步復(fù)制還是異步復(fù)制
BrokerData:broker數(shù)據(jù)的抽象

brokerName:broker的名稱
brokerAddrs:broker的ip集合,為什么是集合,因?yàn)榛橹鲝牡腷roker的brokerName是相同的,key是brokerId(0表示master,大于0表示slave),value是broker的ip地址。
注:一個(gè)broker集群包含多個(gè)broker主從,一個(gè)broker主從中的主和從的brokerName是一樣的。
random:隨機(jī)數(shù)生成器,當(dāng)一條消息發(fā)送到broker集群,就是根據(jù)隨機(jī)數(shù)生成來選擇消息存儲(chǔ)到哪個(gè)broker實(shí)例上的。BrokerLiveInfo:存活的broker信息,就是和namesrv有心跳連接的broker

注:這里是為了看map中的集群信息,所以利用了centos中搭建的mq集群。




1.2、路由注冊(cè)
org.apache.rocketmq.broker.BrokerStartup#main
org.apache.rocketmq.broker.BrokerStartup#start
org.apache.rocketmq.broker.BrokerController#start


org.apache.rocketmq.broker.BrokerController#registerBrokerAll
org.apache.rocketmq.broker.BrokerController#doRegisterBrokerAll
org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll
接下來本地啟動(dòng)broker,debug跟下看看它給namesrv發(fā)送請(qǐng)求的過程public ListregisterBrokerAll( final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final ListfilterServerList, final boolean oneway,final int timeoutMills,final boolean compressed) {final ListregisterBrokerResultList = Lists.newArrayList(); ????????//獲取所有namesrv的地址:ip端口ListnameServerAddressList = this.remotingClient.getNameServerAddressList(); ????????//遍歷namesrv集合if (nameServerAddressList != null && nameServerAddressList.size() > 0) {????????????//封裝請(qǐng)求頭final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();????????????//broker地址requestHeader.setBrokerAddr(brokerAddr);????????????//brokerId,0:master,大于0:slaverequestHeader.setBrokerId(brokerId);????????????//broker名稱requestHeader.setBrokerName(brokerName);????????????//所屬集群的名稱requestHeader.setClusterName(clusterName);????????????//當(dāng)前broker所屬主從中主節(jié)點(diǎn)的地址requestHeader.setHaServerAddr(haServerAddr);requestHeader.setCompressed(compressed);????????????////封裝請(qǐng)求體RegisterBrokerBody requestBody = new RegisterBrokerBody();????????????/**主題配置,topicConfigWrapper內(nèi)部封裝的是topicConfigManager中的????????????topicConfigTable,內(nèi)部存儲(chǔ)的是broker啟動(dòng)時(shí)默認(rèn)的一些topic,????????????,MixAll.SELF_TEST_TOPIC、MixAll.DEFAULT_TOPIC(AutoCreateTopic????????????-Enable=true)、MixAll.BENCHMARK_TOPIC、MixAll.OFFSET_MOVED_EVNET、??????????? BrokerConfig中的brokerClusterName、brokerName。Broker中Topic????????????默認(rèn)存儲(chǔ)在${Rocket_Home}/store/config/topic.json中*/requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);requestBody.setFilterServerList(filterServerList);final byte[] body = requestBody.encode(compressed);final int bodyCrc32 = UtilAll.crc32(body);requestHeader.setBodyCrc32(bodyCrc32);final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());for (final String namesrvAddr : nameServerAddressList) {brokerOuterExecutor.execute(new Runnable() {@Overridepublic void run() {try {????????????????????????????//通過netty向namesrv發(fā)送網(wǎng)絡(luò)請(qǐng)求,注冊(cè)brokerRegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);if (result != null) {registerBrokerResultList.add(result);}log.info("register broker to name server {} OK", namesrvAddr);} catch (Exception e) {log.warn("registerBroker Exception, {}", namesrvAddr, e);} finally {countDownLatch.countDown();}}});}try {countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {}}return registerBrokerResultList;}
注:為了簡(jiǎn)潔明了,直接將斷點(diǎn)打到
org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll方法中,著重看下封裝的參數(shù)即可


注:broker每隔30s會(huì)發(fā)一個(gè)心跳,所以你會(huì)看到請(qǐng)求不斷地進(jìn)入
org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll方法,為了排除干擾,我們將心跳時(shí)間改長(zhǎng)點(diǎn),然后重啟broker










1.3、路由刪除
org.apache.rocketmq.namesrv.NamesrvStartup#main
org.apache.rocketmq.namesrv.NamesrvStartup#main0
org.apache.rocketmq.namesrv.NamesrvStartup#start
org.apache.rocketmq.namesrv.NamesrvController#initialize

你會(huì)發(fā)現(xiàn)這個(gè)方法邏輯很簡(jiǎn)單,就是遍歷brokerLiveTable,判斷每一個(gè)brokerLiveInfo的lastUpdateTimestamp和當(dāng)前時(shí)間戳的差距是不是大于120s,如果是,就關(guān)閉和此broker的socket連接,并將此brokerLiveInfo從brokerLiveTable中刪除,然后刪除與該broker相關(guān)的路由信息,路由的具體刪除邏輯在onChannelDestroy方法中public void scanNotActiveBroker() {Iterator> it = this.brokerLiveTable.entrySet().iterator(); while (it.hasNext()) {Entrynext = it.next(); long last = next.getValue().getLastUpdateTimestamp();if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {RemotingUtil.closeChannel(next.getValue().getChannel());it.remove();log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);this.onChannelDestroy(next.getKey(), next.getValue().getChannel());}}}
根據(jù)brokerAddress從brokerLiveTable、filterServerTable移除




1.4、路由發(fā)現(xiàn)

方法主要邏輯如下:public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);final GetRouteInfoRequestHeader requestHeader =(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);????????//1:TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());????????//2:if (topicRouteData != null) {if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {String orderTopicConf =this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,requestHeader.getTopic());topicRouteData.setOrderTopicConf(orderTopicConf);}byte[] content = topicRouteData.encode();response.setBody(content);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}//3:response.setCode(ResponseCode.TOPIC_NOT_EXIST);response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));return response;}
調(diào)用pickupTopicRouteData方法,從路由表topicQueueTable、brokerAddrTable、filterServerTable中分別填充到TopicRouteData中的queueData、brokerDatas、filterServerTable屬性中。
List
:topic隊(duì)列元數(shù)據(jù) List
:topic分布的broker元數(shù)據(jù) filterServer:broker上過濾服務(wù)器地址列表
如果找到topic對(duì)應(yīng)的路由信息并且該topic為順序消息,則從NameServerKVconfig中獲取關(guān)于順序消息相關(guān)的配置填充路由信息。
如果找不到路由信息就返回ResponseCode.TOPIC_NOT_EXIST
二、總結(jié)

