<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          NameServer剖析

          共 12038字,需瀏覽 25分鐘

           ·

          2021-11-03 17:13

          前言

          到現(xiàn)在為止,RocketMQ已經(jīng)更了5篇文章:回過頭看,里面基本就是跟了跟源碼,并沒有對(duì)重要的知識(shí)做剖析,為了讓文章不太臃腫,所以我將跟源碼和知識(shí)的剖析分開來寫,今天先來剖析NameServer(以下簡(jiǎn)稱namesrv)。
          • 一、namesrv的作用

          通過RocketMQ的NameServer(路由中心)我們知道,namesrv的主要作用是為消息生產(chǎn)者和消息消費(fèi)者提供關(guān)于主題(topic)的路由信息,因此namesrv不僅需要存儲(chǔ)路由元信息,還要管理broker節(jié)點(diǎn),包括路由注冊(cè)、路由刪除等功能。
          • 1.1、路由元信息

          找到namesrv的路由實(shí)現(xiàn)類,它位于namesrv模塊下,路徑為org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager,看下RouteInfoManager類中的屬性
          ?   private?final?static?long?BROKER_CHANNEL_EXPIRED_TIME?=?1000?*?60?*?2;    private final ReadWriteLock lock = new ReentrantReadWriteLock();    private final HashMap> topicQueueTable;    private final HashMap brokerAddrTable;    private final HashMap> clusterAddrTable;    private final HashMap brokerLiveTable;    private final HashMap/* Filter Server */> filterServerTable;
          • BROKER_CHANNEL_EXPIRED_TIME:broker啟動(dòng)時(shí)會(huì)向集群中所有的namesrv發(fā)送心跳,之后每隔30s再次發(fā)送,如果namesrv在連續(xù)120s內(nèi)沒有收到broker發(fā)送的心跳,那么此broker就會(huì)被namesrv移除。

          • lock:讀寫鎖,讀寫路由信息時(shí)候是需要加鎖的,不然并發(fā)訪問時(shí)會(huì)有問題。

          • topicQueueTable:topic消息隊(duì)列路由信息,消息發(fā)送時(shí)根據(jù)路由表進(jìn)行負(fù)載均衡。

          • brokerAddrTable:broker的基礎(chǔ)信息,包含brokerName、所屬集群名稱、主從broker的地址。

          • clusterAddrTable:broker集群信息,存儲(chǔ)集群中所有broker的名稱。

          • brokerLiveTable:broker狀態(tài)信息,namesrv每次收到broker發(fā)送的心跳包時(shí)都會(huì)更新該信息。

          • filterServerTable:broker上的FilterServer列表,用于類模式消費(fèi)過濾。

          通過代碼分析可知,其實(shí)namesrv里的核心也就是五個(gè)HashMap,這五個(gè)map中涉及三個(gè)類
          • QueueData:見名猜意,隊(duì)列數(shù)據(jù),該類是topic對(duì)應(yīng)的隊(duì)列信息的抽象

          c919c18b92334a998d4e714fe7ca414b.webp

          brokerName:broker的名稱
          readQueueNums:讀隊(duì)列個(gè)數(shù),一個(gè)broker默認(rèn)為每個(gè)topic創(chuàng)建4個(gè)讀隊(duì)列writeQueueNums:寫隊(duì)列個(gè)數(shù),?一個(gè)broker默認(rèn)為每個(gè)topic創(chuàng)建4個(gè)寫隊(duì)列perm:讀寫權(quán)限,一般設(shè)置為6,6:同時(shí)支持讀寫,?4:禁寫,2禁讀topicSynFlag:topic同步標(biāo)記,同步復(fù)制還是異步復(fù)制
          • BrokerData:broker數(shù)據(jù)的抽象

          36d6049de3a9e5236a379c2f916f1e66.webp

          cluster:broker集群的名稱
          brokerName:broker的名稱

          brokerAddrs:broker的ip集合,為什么是集合,因?yàn)榛橹鲝牡腷roker的brokerName是相同的,key是brokerId(0表示master,大于0表示slave),value是broker的ip地址。

          :一個(gè)broker集群包含多個(gè)broker主從,一個(gè)broker主從中的主和從的brokerName是一樣的。

          random:隨機(jī)數(shù)生成器,當(dāng)一條消息發(fā)送到broker集群,就是根據(jù)隨機(jī)數(shù)生成來選擇消息存儲(chǔ)到哪個(gè)broker實(shí)例上的。
          • BrokerLiveInfo:存活的broker信息,就是和namesrv有心跳連接的broker

          182f14c875e8e3bf5314685fbdeac6c9.webp

          lastUpdateTimestamp:最近一次心跳的時(shí)間戳dataVersion:版本號(hào),topic的配置信息如果有改動(dòng),此版本號(hào)會(huì)+1,channel:當(dāng)前broker和namesrv的socket連接haServerAddr:當(dāng)前主從節(jié)點(diǎn)中主節(jié)點(diǎn)(master)的ip下面我們通過debug來看看這五個(gè)map中的內(nèi)容,debug模式啟動(dòng)本地的namesrv,將centos中rocketmq的集群改為本地namesrv的地址即可

          :這里是為了看map中的集群信息,所以利用了centos中搭建的mq集群。

          b24cb787b0f9672c7e56977e299e580e.webp

          然后在org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest中打上斷點(diǎn),這個(gè)就是namesrv處理來自broker、producer、namesrv請(qǐng)求的類

          9d5fff70a6fdfdf9b76a8e7f46e72381.webp

          上面說了,broker是每隔30s向namesrv發(fā)送一次心跳,那么我們就在心跳請(qǐng)求的過程中窺探下那五個(gè)map中的內(nèi)容

          6cdad1cbd36e80ebc0687a0102c184b1.webp

          可以看到,請(qǐng)求進(jìn)來了,接著讓它進(jìn)入org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#registerBrokerWithFilterServer中的第219行停住,你就可以清晰的看到里面的內(nèi)容了。

          6bbf26fd2c2b56655f6a4a2ed3d04ec4.webp

          • 1.2、路由注冊(cè)

          在1.1中debug心跳的時(shí)候會(huì)發(fā)現(xiàn)請(qǐng)求進(jìn)入了org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest方法中的REGISTER_BROKER(broker注冊(cè))分支,所以說RocketMQ的路由注冊(cè)其實(shí)就是通過心跳功能實(shí)現(xiàn)的,broker會(huì)每隔30s向namesrv發(fā)送一次心跳,證明自己還活著(有點(diǎn)續(xù)命的意思),那broker是怎么發(fā)送心跳的?broker啟動(dòng)過程中調(diào)用的方法如下:
          • org.apache.rocketmq.broker.BrokerStartup#main

          • org.apache.rocketmq.broker.BrokerStartup#start

          • org.apache.rocketmq.broker.BrokerController#start

          我們看下org.apache.rocketmq.broker.BrokerController#start方法

          9e24870698087be41c4449bd862d8bcc.webp

          其實(shí)BrokerController中第814行到823行的定時(shí)任務(wù)就是心跳的核心代碼,brokerConfig.getRegisterNameServerPeriod()就是30s的時(shí)間間隔

          309fdbdc50dc09e34207151c0d0c081c.webp

          當(dāng)然,此時(shí)間可以配置,默認(rèn)是30s。定時(shí)任務(wù)中執(zhí)行的方法其實(shí)就是注冊(cè)的核心方法了,看下注冊(cè)方法的調(diào)用過程:
          • org.apache.rocketmq.broker.BrokerController#registerBrokerAll

          • org.apache.rocketmq.broker.BrokerController#doRegisterBrokerAll

          • org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll

          其中registerBrokerAll和doRegisterBrokerAll主要是做了一些校驗(yàn),封裝了一些參數(shù)(這里就不贅述了,不然太臃腫),核心的注冊(cè)邏輯其實(shí)是在registerBrokerAll方法,見注釋
          public List registerBrokerAll(        final String clusterName,        final String brokerAddr,        final String brokerName,        final long brokerId,        final String haServerAddr,        final TopicConfigSerializeWrapper topicConfigWrapper,        final List filterServerList,        final boolean oneway,        final int timeoutMills,        final boolean compressed) {
          final List registerBrokerResultList = Lists.newArrayList();????????//獲取所有namesrv的地址:ip端口 List nameServerAddressList = this.remotingClient.getNameServerAddressList();????????//遍歷namesrv集合 if (nameServerAddressList != null && nameServerAddressList.size() > 0) {????????????//封裝請(qǐng)求頭 final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();????????????//broker地址 requestHeader.setBrokerAddr(brokerAddr);????????????//brokerId,0:master,大于0:slave requestHeader.setBrokerId(brokerId);????????????//broker名稱 requestHeader.setBrokerName(brokerName);????????????//所屬集群的名稱 requestHeader.setClusterName(clusterName);????????????//當(dāng)前broker所屬主從中主節(jié)點(diǎn)的地址 requestHeader.setHaServerAddr(haServerAddr); requestHeader.setCompressed(compressed);????????????////封裝請(qǐng)求體 RegisterBrokerBody requestBody = new RegisterBrokerBody();????????????/**主題配置,topicConfigWrapper內(nèi)部封裝的是topicConfigManager中的????????????topicConfigTable,內(nèi)部存儲(chǔ)的是broker啟動(dòng)時(shí)默認(rèn)的一些topic,????????????,MixAll.SELF_TEST_TOPIC、MixAll.DEFAULT_TOPIC(AutoCreateTopic????????????-Enable=true)、MixAll.BENCHMARK_TOPIC、MixAll.OFFSET_MOVED_EVNET、??????????? BrokerConfig中的brokerClusterName、brokerName。Broker中Topic????????????默認(rèn)存儲(chǔ)在${Rocket_Home}/store/config/topic.json中*/ requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); requestBody.setFilterServerList(filterServerList); final byte[] body = requestBody.encode(compressed); final int bodyCrc32 = UtilAll.crc32(body); requestHeader.setBodyCrc32(bodyCrc32); final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); for (final String namesrvAddr : nameServerAddressList) { brokerOuterExecutor.execute(new Runnable() { @Override public void run() { try {????????????????????????????//通過netty向namesrv發(fā)送網(wǎng)絡(luò)請(qǐng)求,注冊(cè)broker RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body); if (result != null) { registerBrokerResultList.add(result); }
          log.info("register broker to name server {} OK", namesrvAddr); } catch (Exception e) { log.warn("registerBroker Exception, {}", namesrvAddr, e); } finally { countDownLatch.countDown(); } } }); }
          try { countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { } }
          return registerBrokerResultList; }
          接下來本地啟動(dòng)broker,debug跟下看看它給namesrv發(fā)送請(qǐng)求的過程

          注:為了簡(jiǎn)潔明了,直接將斷點(diǎn)打到

          org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll方法中,著重看下封裝的參數(shù)即可

          4bcc0a672de4974469b7db9921cc5ab0.webp

          放行請(qǐng)求,它將到達(dá)namesrv的org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest方法

          9fbae48fee55fd1883d4e5cbf2648deb.webp

          注:broker每隔30s會(huì)發(fā)一個(gè)心跳,所以你會(huì)看到請(qǐng)求不斷地進(jìn)入

          org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll方法,為了排除干擾,我們將心跳時(shí)間改長(zhǎng)點(diǎn),然后重啟broker

          2eae37e3e58ba323443b6c0e5c38cf7c.webp

          接下來就詳細(xì)看下namesrv是怎么處理心跳包的

          e2a620609083d00c0489eedb0d4679a1.webp

          接著看org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#registerBrokerWithFilterServer方法先校驗(yàn)請(qǐng)求頭和請(qǐng)求體

          8bacfd9df21e08dcbb3be5df6a4ec871.webp

          接著調(diào)用org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker方法,核心邏輯來了先加鎖(讀寫鎖),避免并發(fā)操作路由表引起問題

          c75f77e61cc44ae74f0699135ff00949.webp

          然后根據(jù)集群名稱(clusterName)從clusterAddrTable(五個(gè)map中的其中一個(gè))取brokerNames,可以看到這里取出來的是一個(gè),就是我本地起的broker,然后將傳過來的brokerName添加到brokerNames

          c9752b5dd281354a8610aa3af119ad62.webp

          接著默認(rèn)不是第一次注冊(cè),再根據(jù)傳過來的brokerName從brokerAddrTable中查詢brokerData,如果取出來是空,就以傳過來的brokerName為key,構(gòu)造一個(gè)brokerData放入brokerAddrTable中;如果存在,直接替換原先的,然后把registerFirst標(biāo)識(shí)置為false,表示非第一次注冊(cè)。

          e92da397d7299bd14ec6e3d48eef6a40.webp

          接著判斷當(dāng)前broker是不是master(就是主從中的主),如果是主節(jié)點(diǎn),在判斷,如果topic的配置有更改或者是第一次注冊(cè),則需要?jiǎng)?chuàng)建或更新topic路由元數(shù)據(jù),填充topicQueueTable,其實(shí)就是為默認(rèn)topic自動(dòng)注冊(cè)路由信息,其中包括MixAll.DEFAULT_TOPIC的路由信息,當(dāng)消息生產(chǎn)者發(fā)送topic時(shí),如果該主題未創(chuàng)建并且BrokerConfig中的autoCreateTopicEnable為true時(shí),將返回MixAll.DEFAULT_TOPIC的路由信息。

          69c583acc46c68d1e54da976d92653f4.webp

          然后根據(jù)broker的地址去brokerLiveTable(存活的broker信息表)中取broker最近一次的心跳信息,如果取出來是空,打印一條新注冊(cè)的日志。然后檢查有沒有注冊(cè)broker的過濾器

          de1e86e3ff65285903409ecad5aa6245.webp

          如果當(dāng)前broker是從節(jié)點(diǎn),則需要查找該broker的master的節(jié)點(diǎn)信息,并更新對(duì)應(yīng)的masterAddr和haServerAddr屬性

          37f0a08309977c90d52db26806711027.webp

          最后注冊(cè)完,解鎖,完事

          8922ae4de789907a54427fb3393f924f.webp

          總結(jié):namesrv和broker保持長(zhǎng)連接,broker的存活狀態(tài)存儲(chǔ)在brokerLiveTable中,namesrv每收到一個(gè)心跳包,就會(huì)更新brokerLiveTable中broker的狀態(tài)信息以及其他路由表(topicQueueTable、brokerAddrTable、filterServerTable),在更新這些路由表時(shí)使用了讀寫鎖,允許多個(gè)消息發(fā)送者并發(fā)讀,保證消息發(fā)送時(shí)的高并發(fā),但是同一時(shí)刻namesrv只處理一個(gè)broker心跳請(qǐng)求,多個(gè)心跳請(qǐng)求串行執(zhí)行,這是讀寫鎖的經(jīng)典使用場(chǎng)景。
          • 1.3、路由刪除

          經(jīng)過上面的分析,我們已經(jīng)知道了broker每隔30s會(huì)向namesrv發(fā)送一個(gè)心跳包,那如果broker宕機(jī),namesrv就無(wú)法再收到此broker發(fā)送的心跳包,namesrv是怎么刪除下線的broker的?在1.1介紹路由元信息說了,namesrv如果連續(xù)120s沒有收到broker的心跳包,就認(rèn)為它已經(jīng)下線,移除并關(guān)閉和此broker的連接,同時(shí)更新路由表(就是那五個(gè)map),那namesrv是怎么實(shí)現(xiàn)這個(gè)功能的呢?看下namesrv啟動(dòng)過程中的類調(diào)用關(guān)系
          • org.apache.rocketmq.namesrv.NamesrvStartup#main

          • org.apache.rocketmq.namesrv.NamesrvStartup#main0

          • org.apache.rocketmq.namesrv.NamesrvStartup#start

          • org.apache.rocketmq.namesrv.NamesrvController#initialize

          NamesrvControllerinitialize方法開啟了一個(gè)叫做scanNotActiveBroker的定時(shí)任務(wù),它每隔10s執(zhí)行一次,從名字就可以看出這個(gè)定時(shí)任務(wù)是用來掃描不再存活的broker的

          e23a7eedde2229ba5cdc2a296ff7ca79.webp

          看下scanNotActiveBroker方法的邏輯
          public void scanNotActiveBroker() {        Iterator> it = this.brokerLiveTable.entrySet().iterator();        while (it.hasNext()) {            Entry next = it.next();            long last = next.getValue().getLastUpdateTimestamp();            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {                RemotingUtil.closeChannel(next.getValue().getChannel());                it.remove();                log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());            }        }    }
          你會(huì)發(fā)現(xiàn)這個(gè)方法邏輯很簡(jiǎn)單,就是遍歷brokerLiveTable,判斷每一個(gè)brokerLiveInfo的lastUpdateTimestamp和當(dāng)前時(shí)間戳的差距是不是大于120s,如果是,就關(guān)閉和此broker的socket連接,并將此brokerLiveInfo從brokerLiveTable中刪除,然后刪除與該broker相關(guān)的路由信息,路由的具體刪除邏輯在onChannelDestroy方法中

          根據(jù)brokerAddress從brokerLiveTable、filterServerTable移除

          4b8f67ebf9593f23acb4bf10d2033dc2.webp

          遍歷brokerAddrTable,從BrokerData的brokerAddrs中找到具體的broker,從brokerData中移除,如果移除后在brokerData中不再包含其他broker,則在brokerAddrTable中移除該brokerName對(duì)應(yīng)的條目。

          f283dd2ade09bf3b2fd8ec38e41f98ad.webp

          根據(jù)brokerName從clusterAddrTable中找到broker并從集群中刪除,如果移除后,集群中不包含任何的broker,則將該集群從clusterAddrTable中移除。

          0aaebd6babcc7dc1c3512e3a57926c78.webp

          根據(jù)brokerName,遍歷所有topic的隊(duì)列,如果隊(duì)列中包含了當(dāng)前broker的隊(duì)列,則移除,如果topic只包含待移除broker的隊(duì)列的話,從路由表中刪除該topic。

          8cc8caf28d52c7a1f574d2e135d6bddf.webp

          • 1.4、路由發(fā)現(xiàn)

          通過1.3的分析可知,路由信息是在不斷變化的,有新broker加入或者老broker下線都會(huì)引起路由表的變動(dòng),那客戶端(producer、consumer)是怎么知道最新的路由信息的?是namesrv定時(shí)推給客戶端嗎?其實(shí)不是的,是客戶端根據(jù)topic主動(dòng)定時(shí)的到namesrv拉取的,對(duì)應(yīng)的requestCode是RequestCode.GET_ROUTEINTO_BY_TOPIC

          b258d08ea6c9de3ee21ba5692aee086c.webp

          調(diào)用的方法是org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getRouteInfoByTopic
          public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,        RemotingCommand request) throws RemotingCommandException {        final RemotingCommand response = RemotingCommand.createResponseCommand(null);        final GetRouteInfoRequestHeader requestHeader =            (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);????????//1:        TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());????????//2:        if (topicRouteData != null) {            if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {                String orderTopicConf =                    this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,                        requestHeader.getTopic());                topicRouteData.setOrderTopicConf(orderTopicConf);            }
          byte[] content = topicRouteData.encode(); response.setBody(content); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; } //3: response.setCode(ResponseCode.TOPIC_NOT_EXIST); response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic() + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); return response; }
          方法主要邏輯如下:
          • 調(diào)用pickupTopicRouteData方法,從路由表topicQueueTable、brokerAddrTable、filterServerTable中分別填充到TopicRouteData中的queueData、brokerDatas、filterServerTable屬性中。

          1. List:topic隊(duì)列元數(shù)據(jù)

          2. List:topic分布的broker元數(shù)據(jù)

          3. filterServer:broker上過濾服務(wù)器地址列表

          • 如果找到topic對(duì)應(yīng)的路由信息并且該topic為順序消息,則從NameServerKVconfig中獲取關(guān)于順序消息相關(guān)的配置填充路由信息。

          • 如果找不到路由信息就返回ResponseCode.TOPIC_NOT_EXIST

          • 二、總結(jié)

          今天算是詳細(xì)的分析了namesrv的功能,包括路由元數(shù)據(jù)、路由注冊(cè)、路由刪除、路由發(fā)現(xiàn)四大模塊,用張圖形象的表示如下:

          34b211461931633aa0273ba512908067.webp

          瀏覽 93
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日三级毛片 | 国产成人视频免费看 | 成人网AV | 老师的粉嫩小又紧水又多A片视频 | 国产福利一区二区在线观看 |