<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          通過(guò)這三個(gè)文件徹底搞懂rocketmq的存儲(chǔ)原理

          共 15613字,需瀏覽 32分鐘

           ·

          2021-10-25 18:58

          RocketMQ是阿里開(kāi)發(fā)的一個(gè)高性能的消息隊(duì)列,支持各種消息類型,而且支持事務(wù)消息,可以說(shuō)是現(xiàn)在的很多系統(tǒng)中的香餑餑了,所以呢,怎么使用大家肯定是要學(xué)習(xí)的

          ?

          我們作為一個(gè)有夢(mèng)想的程序員,在學(xué)習(xí)一門技術(shù)的時(shí)候,肯定是不能光知其然,這是遠(yuǎn)遠(yuǎn)不夠的,我們必須要知其所以然,這樣才能在面試的時(shí)候侃侃而談,啊呸,不對(duì),這樣我們才能在工作中遇到問(wèn)題的時(shí)候,理性的去思考如何解決問(wèn)題

          ?

          我們知道RocketMQ的架構(gòu)是producer、NameServer、broker、Consumer,producer是生產(chǎn)消息的,NameServer是路由中心,負(fù)責(zé)服務(wù)的注冊(cè)發(fā)現(xiàn)以及路由管理這些。

          Consumer是屬于消費(fèi)消息的,broker則屬于真正的存儲(chǔ)消息,以及進(jìn)行消息的持久化,也就是存儲(chǔ)消息的文件和索引消息的文件都在broker上

          ?

          消息隊(duì)列的主要作用是解耦異步削峰,也就意味著消息隊(duì)列中的存儲(chǔ)功能是必不可少的,而隨著時(shí)代的發(fā)展,業(yè)務(wù)量的增加也對(duì)消息隊(duì)列的存儲(chǔ)功能的強(qiáng)度的要求越來(lái)越高了

          ?

          也就是說(shuō)你不能光性能好,你得存儲(chǔ)的消息也得足夠支撐我的業(yè)務(wù)量,你只能存儲(chǔ)100MB的消息,我這系統(tǒng)每分鐘的消息業(yè)務(wù)量可能500MB了,那肯定不夠使啊,那還削個(gè)啥的峰啊,峰來(lái)了你自己都頂不住


          ?

          RocketMQ憑借其強(qiáng)大的存儲(chǔ)能力和強(qiáng)大的消息索引能力,以及各種類型消息和消息的特性脫穎而出,于是乎,我們這些有夢(mèng)想的程序員學(xué)習(xí)RocketMQ的存儲(chǔ)原理也變得尤為重要

          ?

          而要說(shuō)起這個(gè)存儲(chǔ)原理,則不得不說(shuō)的就是RocketMQ的消息存儲(chǔ)文件commitLog文件,消費(fèi)方則是憑借著巧妙的設(shè)計(jì)Consumerqueue文件來(lái)進(jìn)行高性能并且不混亂的消費(fèi),還有RocketMQ的強(qiáng)大的支持消息索引的特性,靠的就是indexfile索引文件

          ?

          我們這篇文章就從這commitLog、Consumerqueue、indexfile這三個(gè)神秘的文件說(shuō)起,搞懂這三個(gè)文件,RocketMQ的核心就被你掏空了

          ?

          先上個(gè)圖,寫入commitLog文件時(shí)commitLog和Consumerqueue、indexfile文件三者的關(guān)系


          Commitlog文件

          大小和命名規(guī)則

          RocketMQ中的消息存儲(chǔ)文件放在${ROCKET_HOME}/store 目錄下,當(dāng)生產(chǎn)者發(fā)送消息時(shí),broker會(huì)將消息存儲(chǔ)到Commit文件夾下,文件夾下面會(huì)有一個(gè)commitLog文件,但是并不是意味著這個(gè)文件叫這個(gè),文件命名是根據(jù)消息的偏移量來(lái)決定的
          ?

          文件有自己的生成規(guī)則,每個(gè)commitLog文件的大小是1G,一般情況下第一個(gè) CommitLog 的起始偏移量為 0,第二個(gè) CommitLog 的起始偏移量為 1073741824 (1G = 1073741824byte)。
          ?
          也正是因?yàn)樵撐募奈募忠?guī)則,所以也可以更好的知道消息處于哪個(gè)文件中,假設(shè)物理偏移量是1073741830,則相對(duì)的偏移量是6(6 = 1073741830 - 1073741824),于是判斷出該消息位于第二個(gè)commitLog文件上,下面要說(shuō)的Consumerqueue文件和indexfile文件都是通過(guò)偏移量來(lái)計(jì)算出消息位于哪個(gè)文件,進(jìn)行更為精準(zhǔn)的定位,減少了IO次數(shù)

          文件存儲(chǔ)規(guī)則和特點(diǎn)

          commitLog文件的最大的一個(gè)特點(diǎn)就是消息的順序?qū)懭?,隨機(jī)讀寫,關(guān)于commitLog的文件的落盤有兩種,一種是同步刷盤,一種是異步刷盤,可通過(guò) flushDiskType 進(jìn)行配置
          ?
          在寫入commitLog的時(shí)候內(nèi)部會(huì)有一個(gè)mappedFile內(nèi)存映射文件,消息是先寫入到這個(gè)內(nèi)存映射文件中,然后根據(jù)刷盤策略寫到硬盤中,對(duì)于producer的角度來(lái)說(shuō)就是,同步就是當(dāng)消息真正的寫到硬盤的時(shí)候才會(huì)給producer返回成功,而異步就是當(dāng)消息到達(dá)內(nèi)存的時(shí)候就返回成功了,然后異步的去刷盤
          ?
          跑題了,最大的特點(diǎn)順序?qū)懭?/strong>,所有的topic的消息都存儲(chǔ)到commitLog文件中,順序?qū)懭肟梢猿浞值睦么疟P順序減少了IO爭(zhēng)用數(shù)據(jù)存儲(chǔ)的性能,kafka也是通過(guò)硬盤順序存盤的
          ?
          大家都常說(shuō)硬盤的速度比內(nèi)存慢,其實(shí)這句話也是有歧義的,當(dāng)硬盤順序?qū)懭牒妥x取的時(shí)候,速度不比內(nèi)存慢,甚至比內(nèi)存速度快,這種存儲(chǔ)方式就好比數(shù)組,我們?nèi)绻罃?shù)組的下標(biāo),則可以直接通過(guò)下標(biāo)計(jì)算出位置,找到內(nèi)存地址,眾所周知,數(shù)組的讀取是很快的,但是數(shù)組的缺點(diǎn)在于插入數(shù)據(jù)比較慢,因?yàn)槿绻谥虚g插入數(shù)據(jù)需要將后面的數(shù)據(jù)往后移動(dòng)
          ?
          而對(duì)于數(shù)組來(lái)說(shuō),如果我們只會(huì)順序的往后添加,數(shù)組的速度也是很快的,因?yàn)閿?shù)組沒(méi)有后續(xù)的數(shù)據(jù)的移動(dòng),這一操作很耗時(shí)

          回到RocketMQ中的commitLog文件,也是同樣的道理,順序的寫入文件也就不需要太多的去考慮寫入的位置,直接找到文件往后放就可以了,而取數(shù)據(jù)的時(shí)候,也是和數(shù)組一樣,我們可以通過(guò)文件的大小去精準(zhǔn)的定位到哪一個(gè)文件,然后再精準(zhǔn)的定位到文件的位置



          當(dāng)然,至于這個(gè)索引位置就是靠下面的Consumerqueue文件和indexfile文件來(lái)找到消息的位置的,也就是索引地址
          ?
          哦對(duì)了,數(shù)組的元素大小是一樣的,并不意味這commitLog文件的各個(gè)消息存儲(chǔ)空間一樣
          簡(jiǎn)單看下源碼

          這部分源碼在DefaultMessageStore.putMessage
            @Override    public PutMessageResult putMessage(MessageExtBrokerInner msg) {        if (this.shutdown) {            log.warn("message store has shutdown, so putMessage is forbidden");            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);        }
          // 從節(jié)點(diǎn)不允許寫入 if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { long value = this.printTimes.getAndIncrement(); if ((value % 50000) == 0) { log.warn("message store is slave mode, so putMessage is forbidden "); }
          return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); }
          // store是否允許寫入 if (!this.runningFlags.isWriteable()) { long value = this.printTimes.getAndIncrement(); if ((value % 50000) == 0) { log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits()); }
          return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); } else { this.printTimes.set(0); }
          // topic過(guò)長(zhǎng) if (msg.getTopic().length() > Byte.MAX_VALUE) { log.warn("putMessage message topic length too long " + msg.getTopic().length()); return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); }
          // 消息附加屬性過(guò)長(zhǎng) if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) { log.warn("putMessage message properties length too long " + msg.getPropertiesString().length()); return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null); }
          if (this.isOSPageCacheBusy()) { return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null); }
          long beginTime = this.getSystemClock().now(); // 添加消息到commitLog PutMessageResult result = this.commitLog.putMessage(msg);
          long eclipseTime = this.getSystemClock().now() - beginTime; if (eclipseTime > 500) { log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length); } this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
          if (null == result || !result.isOk()) { this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); }
          return result; }
          中間的commitLog.putMessage就是負(fù)責(zé)實(shí)現(xiàn)消息寫入commitLog文件,這個(gè)太長(zhǎng)了,我就不給大家截了
          ?
          大致流程就是組裝消息,放入屬性,然后通過(guò)MappedFile對(duì)象寫入文件,緊接著根據(jù)刷盤策略刷盤,最后進(jìn)行主從同步


          consumerQueue文件


          RocketMQ是分為多個(gè)topic,消息所屬主題,屬于消息類型,每一個(gè)topic有多個(gè)queue,每個(gè)queue放著不同的消息,在同一個(gè)消費(fèi)者組下的消費(fèi)者,可以同時(shí)消費(fèi)同一個(gè)topic下的不同queue隊(duì)列的消息。不同消費(fèi)者下的消費(fèi)者,可以同時(shí)消費(fèi)同一個(gè)topic下的相同的隊(duì)列的消息。而同一個(gè)消費(fèi)者組下的消費(fèi)者,不可以同時(shí)消費(fèi)不同topic下的消息
          ?
          而每個(gè)topic下的queue隊(duì)列都會(huì)對(duì)應(yīng)一個(gè)Consumerqueue文件,例如Topic中有三個(gè)隊(duì)列,每個(gè)隊(duì)列中的消息索引都會(huì)有一個(gè)編號(hào),編號(hào)從0開(kāi)始,往上遞增。并由此一個(gè)位點(diǎn)offset的概念,有了這個(gè)概念,就可以對(duì)Consumer端的消費(fèi)情況進(jìn)行隊(duì)列定義。
          ?
          消息消費(fèi)完成后,需要將消費(fèi)進(jìn)度存儲(chǔ)起來(lái),即前面提到的offset。廣播模式下,同消費(fèi)組的消費(fèi)者相互獨(dú)立,消費(fèi)進(jìn)度要單獨(dú)存儲(chǔ);集群模式下,同一條消息只會(huì)被同一個(gè)消費(fèi)組消費(fèi)一次,消費(fèi)進(jìn)度會(huì)參與到負(fù)載均衡中,故消費(fèi)進(jìn)度是需要共享的。
          ?
          消費(fèi)進(jìn)度,也就是由Broker管理每一個(gè)消費(fèi)者消費(fèi)Topic的進(jìn)度,包含正常提交消費(fèi)進(jìn)度和重置消費(fèi)進(jìn)度,消費(fèi)進(jìn)度管理的目的是保證消費(fèi)者在正常運(yùn)行狀態(tài)、重啟、異常關(guān)閉等狀態(tài)下都能準(zhǔn)確續(xù)接“上一次”未處理的消息。
          ?
          在RocketMQ中,實(shí)現(xiàn)的消費(fèi)語(yǔ)義叫“至少投遞一次”,也就是所有的消息至少有一次機(jī)會(huì)消費(fèi)不用擔(dān)心會(huì)丟消息。用戶需要實(shí)現(xiàn)消費(fèi)冪等來(lái)避免重復(fù)投遞對(duì)業(yè)務(wù)實(shí)際數(shù)據(jù)的影響。

          冪等是啥應(yīng)該不用我多說(shuō)了吧,親愛(ài)的你們肯定知道了


          如上圖所示,消費(fèi)者一般在兩種情況下“上報(bào)”消費(fèi)進(jìn)度,消費(fèi)成功后(包含正常消費(fèi)成功、重試消費(fèi)成功)和重置消費(fèi)進(jìn)度。
          ?
          而消費(fèi)進(jìn)度的標(biāo)準(zhǔn)就是Consumerqueue文件,這個(gè)文件中存儲(chǔ)的是投遞到某一個(gè)messagequeue中的位置信息
          ?
          比如我們知道消息存儲(chǔ)到commitLog文件中,一個(gè)消費(fèi)者A對(duì)應(yīng)著消費(fèi)messagequeueA這個(gè)隊(duì)列,但是無(wú)法確定在commitLog文件中該隊(duì)列中的消息的位置,于是就有了ConsumerqueueA這個(gè)文件,這個(gè)文件對(duì)應(yīng)一個(gè)messagequeueA,消費(fèi)者A便可以通過(guò)ConsumerqueueA來(lái)確定自己的消費(fèi)進(jìn)度,獲取消息在commitLog文件中的具體的offset和大小
          存放位置和結(jié)構(gòu)
          consumequeue存放在store文件里面,里面的consumequeue文件里面按照topic排放,然后每個(gè)topic默認(rèn)4個(gè)隊(duì)列,里面存放的consumequeue文件
          ?
          ConsumeQueue中并不需要存儲(chǔ)消息的內(nèi)容,而存儲(chǔ)的是消息在CommitLog中的offset。也就是說(shuō)ConsumeQueue其實(shí)是CommitLog的一個(gè)索引文件。
          ?
          consumequeue是定長(zhǎng)結(jié)構(gòu),每個(gè)記錄固定大小20個(gè)字節(jié),單個(gè)consumequeue文件默認(rèn)包含30w個(gè)條目,所以單個(gè)文件大小大概6M左右


          很顯然,Consumer消費(fèi)消息的時(shí)候,要讀2次:先讀ConsumeQueue得到offset,再通過(guò)offset找到CommitLog對(duì)應(yīng)的消息內(nèi)容。

          ConsumeQueue的作用
          消費(fèi)者通過(guò)broker保存的offset(offsetTable.offset json文件中保存的ConsumerQueue的下標(biāo))可以在ConsumeQueue中獲取消息,從而快速的定位到commitLog的消息位置,由于每個(gè)消息的大小是不一樣的,也可以通過(guò)size獲取到消息的大小,從而讀取完整的消息
          ?
          過(guò)濾tag是也是通過(guò)遍歷ConsumeQueue來(lái)實(shí)現(xiàn)的(先比較hash(tag)符合條件的再到具體消息比較tag)

          offsetTable.offset

          和commitLog的offset不是一回事,這個(gè)offset是ConsumeQueue文件的(已經(jīng)消費(fèi)的)下標(biāo)/行數(shù),可以直接定位到ConsumeQueue并找到commitlogOffset從而找到消息體原文。這個(gè)offset是消息消費(fèi)進(jìn)度的核心,不同的消費(fèi)模式,保存地址不同
          ?
          廣播模式:DefaultMQPushConsumer的BROADCASTING模式,各個(gè)Consumer沒(méi)有互相干擾,使用LoclaFileOffsetStore,把Offset存儲(chǔ)在Consumer本地
          ?
          集群模式:DefaultMQPushConsumer的CLUSTERING模式,由Broker端存儲(chǔ)和控制Offset的值,使用RemoteBrokerOffsetStore
          簡(jiǎn)單看下構(gòu)建過(guò)程

          在Broker中,構(gòu)建ComsummerQueue不是存儲(chǔ)完CommitLog就馬上同步構(gòu)建的,而是通過(guò)一個(gè)線程任務(wù)異步的去做這個(gè)事情。在DefaultMessageStore中有一個(gè)ReputMessageService成員,它就是負(fù)責(zé)構(gòu)建ComsumerQueue的任務(wù)線程。
          ?
          ReputMessageService繼承自ServiceThread,表明其是一個(gè)服務(wù)線程,它的run方法很簡(jiǎn)單,如下所示:
          public void run() {            while (!this.isStopped()) {                try {                    Thread.sleep(1);                    this.doReput(); // 構(gòu)建ComsumerQueue                } catch (Exception e) {                    DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);                }            }        }
          在run方法里,每休息1毫秒就進(jìn)行一次構(gòu)建ComsumerQueue的動(dòng)作。因?yàn)楸仨毾葘懭隒ommitLog,然后才能進(jìn)行ComsumerQueue的構(gòu)建。那么不排除構(gòu)建ComsumerQueue的速度太快了,而CommitLog還沒(méi)寫入新的消息。這時(shí)就需要sleep下,讓出cpu時(shí)間片,避免浪費(fèi)CPU資源。
          ?
          我們點(diǎn)進(jìn)去這個(gè)doReput()看核心處理邏輯
          private void doReput() {            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);// 拿到所有的最新寫入CommitLog的數(shù)據(jù)                if (result != null) {                    try {                        this.reputFromOffset = result.getStartOffset();
          for (int readSize = 0; readSize < result.getSize() && doNext; ) { DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); // 一條一條的讀消息 int size = dispatchRequest.getMsgSize();
          if (dispatchRequest.isSuccess()) { if (size > 0) { DefaultMessageStore.this.doDispatch(dispatchRequest); // 派發(fā)消息,進(jìn)行處理,其中就包括構(gòu)建ComsumerQueue this.reputFromOffset += size; readSize += size; } else if (size == 0) { // this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset); readSize = result.getSize(); } } else if (!dispatchRequest.isSuccess()) { // 獲取消息異常
          if (size > 0) { log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset); this.reputFromOffset += size; } else { doNext = false; if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) { this.reputFromOffset += result.getSize() - readSize; } } } } } finally { result.release(); } } else { doNext = false; } } }
          我在這里省略了一些和構(gòu)建ComsumerQueue不相干的代碼。
          ?
          其實(shí)在doReput里面就做了三件事:

          1、獲取最新寫入到CommitLog中的數(shù)據(jù)byteBuffer。
          2、從byteBuffer中一條條的讀取消息,并派發(fā)出去處理。
          3、更新reputFromOffset位移。
          ?
          感興趣的可以打斷點(diǎn)走一遍


          indexFile文件



          RocketMQ還支持通過(guò)MessageID或者M(jìn)essageKey來(lái)查詢消息,使用ID查詢時(shí),因?yàn)镮D就是用broker+offset生成的(這里msgId指的是服務(wù)端的),所以很容易就找到對(duì)應(yīng)的commitLog文件來(lái)讀取消息。

          對(duì)于用MessageKey來(lái)查詢消息,MessageStore通過(guò)構(gòu)建一個(gè)index來(lái)提高讀取速度

          文件結(jié)構(gòu)


          indexfile文件存儲(chǔ)在store目錄下的index文件里面,里面存放的是消息的hashcode和index內(nèi)容,文件由一個(gè)文件頭組成:長(zhǎng)40字節(jié)。500w個(gè)hashslot,每個(gè)4字節(jié)。2000w個(gè)index條目,每個(gè)20字節(jié)。
          ?
          所以這里我們可以估算每個(gè)indexfile的大小為:40+500w4+2000w20個(gè)字節(jié),大約400M左右

          文件詳細(xì)信息

          IndexHeader:索引文件頭信息由40個(gè)字節(jié)組成
          //8位 該索引文件的第一個(gè)消息(Message)的存儲(chǔ)時(shí)間(落盤時(shí)間)this.byteBuffer.putLong(beginTimestampIndex, this.beginTimestamp.get());//8位 該索引文件的最后一個(gè)消息(Message)的存儲(chǔ)時(shí)間(落盤時(shí)間)this.byteBuffer.putLong(endTimestampIndex, this.endTimestamp.get());//8位 該索引文件第一個(gè)消息(Message)的在CommitLog(消息存儲(chǔ)文件)的物理位置偏移量(可以通過(guò)該物理偏移直接獲取到該消息)this.byteBuffer.putLong(beginPhyoffsetIndex, this.beginPhyOffset.get());//8位 該索引文件最后一個(gè)消息(Message)的在CommitLog(消息存儲(chǔ)文件)的物理位置偏移量this.byteBuffer.putLong(endPhyoffsetIndex, this.endPhyOffset.get());//4位 該索引文件目前的hash slot的個(gè)數(shù)this.byteBuffer.putInt(hashSlotcountIndex, this.hashSlotCount.get());//4位 索引文件目前的索引個(gè)數(shù)this.byteBuffer.putInt(indexCountIndex, this.indexCount.get());
          Slot槽位,默認(rèn)每個(gè)文件配置的slot是500萬(wàn)個(gè),每個(gè)slot是4位的整型數(shù)據(jù),Slot每個(gè)節(jié)點(diǎn)保存當(dāng)前已經(jīng)擁有多少個(gè)index數(shù)據(jù)了
          //slot的數(shù)據(jù)存放位置 40 + keyHash %(500W)* 4int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
          //Slot Table//4字節(jié)//記錄該slot當(dāng)前index,如果hash沖突(即absSlotPos一致)作為下一次該slot新增的前置indexthis.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
          ?索引消息內(nèi)容,消息長(zhǎng)度固定為20位
          //Index Linked list//topic+message key的hash值this.mappedByteBuffer.putInt(absIndexPos, keyHash);//消息在CommitLog的物理文件地址, 可以直接查詢到該消息(索引的核心機(jī)制)this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);//消息的落盤時(shí)間與header里的beginTimestamp的差值(為了節(jié)省存儲(chǔ)空間,如果直接存message的落盤時(shí)間就得8bytes)this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);//9、記錄該slot上一個(gè)index//hash沖突處理的關(guān)鍵之處, 相同hash值上一個(gè)消息索引的index(如果當(dāng)前消息索引是該hash值的第一個(gè)索引,則prevIndex=0, 也是消息索引查找時(shí)的停止條件),每個(gè)slot位置的第一個(gè)消息的prevIndex就是0的this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
          再論結(jié)構(gòu)

          文件結(jié)構(gòu)slot和indexLinkedList可以理解成java中的HashMap

          哎,你說(shuō)HashMap我可不困了啊,你可別蒙我,這個(gè)我熟,什么負(fù)載因子、默認(rèn)大小、擴(kuò)容機(jī)制、紅黑樹(shù),還有多線程下不安全這些

          乖,我知道你熟悉,你跟著我一起學(xué)習(xí),這些當(dāng)然了如指掌,只需要你了解HashMap的結(jié)構(gòu)和沖突即可


          每放入一個(gè)新消息的index進(jìn)來(lái),首先會(huì)取MessageKey的HashCode,然后用Hashcode對(duì)slot的總數(shù)進(jìn)行取模,決定該消息key的位置,slot的總數(shù)默認(rèn)是500W個(gè)

          只要取hash就必然面臨著hash沖突的問(wèn)題,indexfile也是采用鏈表結(jié)構(gòu)來(lái)解決hash沖突,這一點(diǎn)和HashMap一樣的,不過(guò)這個(gè)不存在紅黑樹(shù)轉(zhuǎn)換這一說(shuō),個(gè)人猜測(cè)這個(gè)的沖突數(shù)量也達(dá)不到很高的級(jí)別,所以進(jìn)行這方面的設(shè)計(jì)也沒(méi)啥必要,甚至變成了強(qiáng)行增加indexfile的文件結(jié)構(gòu)難度

          還有,在indexfile中的slot中放的是最新的index的指針,因?yàn)橐话悴樵兊臅r(shí)候大概率是優(yōu)先查詢最近的消息

          每個(gè)slot中放的指針值是索引在indexfile中的偏移量,也就是后面index的位置,而index中存放的就是該消息在commitlog文件中的offset,每個(gè)index的大小是20字節(jié),所以根據(jù)當(dāng)前索引是這個(gè)文件中的第幾個(gè)偏移量,也就很容易定位到索引的位置,根據(jù)前面的固定大小可以很快把真實(shí)坐標(biāo)算出來(lái),以此類推,形成一個(gè)鏈表的結(jié)構(gòu)

          查詢流程

          由于indexHeader,slot,index都是固定大小,所以:
          ?
          • 公式1:第n個(gè)slot在indexFile中的起始位置是這樣:40+(n-1)*4

          • 公式2:第s個(gè)index在indexFile中的起始位置是這樣:40+5000000*4+(s-1)*20
          ?
          查詢的傳入值除了key外,還包含一個(gè)時(shí)間起始值以及截止值
          ?
          為啥還要傳時(shí)間范圍呢?

          一個(gè)indexFile寫完一個(gè)會(huì)繼續(xù)寫下一個(gè),僅僅一個(gè)key無(wú)法定位到具體的indexFile,時(shí)間范圍就為了更精確的定位到具體的indexFile,縮小查找的范圍,indexFile文件名是一個(gè)時(shí)間戳,根據(jù)這個(gè)日期就可以定位到傳入的日期范圍對(duì)應(yīng)在哪個(gè)或者哪些indexFile中,是不是很棒。

          好了,我們接著說(shuō)查詢流程
          ?
          key-->計(jì)算hash值-->hash值對(duì)500萬(wàn)取余算出對(duì)應(yīng)的slot序號(hào)-->根據(jù)40+(n-1)*4(公式1)算出該slot在文件中的位置-->讀取slot值,也就是index序號(hào)-->根據(jù)40+5000000*4+(s-1)*20(公式2)算出該index在文件中的位置-->讀取該index-->將key的hash值以及傳入的時(shí)間范圍與index的keyHash值以及timeDiff值進(jìn)行比對(duì)
          ?
          不滿足則根據(jù)index中的preIndexNo找到上一個(gè)index,繼續(xù)上一步;滿足則根據(jù)index中的phyOffset拿到commitLog中的消息
          ?
          為啥比對(duì)時(shí)還要帶上時(shí)間范圍呢?

          只比key不行嗎?答案是不行,因?yàn)?span style="font-family: Optima-Regular, PingFangTC-light;font-size: 15px;color: rgb(61, 167, 66);">key可能會(huì)重復(fù),producer在消息生產(chǎn)時(shí)可以指定消息的key,這個(gè)key顯然無(wú)法保證唯一性,那自動(dòng)生成的msgId呢?也不能保證唯一,你可以去看看msgId的生成規(guī)則
          ?
          包括當(dāng)前機(jī)器IP+進(jìn)程號(hào)+MessageClientIDSetter.class.getClassLoader()的hashCode值+消息生產(chǎn)時(shí)間與broker啟動(dòng)時(shí)間的差值+broker啟動(dòng)后從0開(kāi)始單調(diào)自增的int值,前面三項(xiàng)很明顯可能重復(fù),后面兩項(xiàng)一個(gè)是時(shí)間差,一個(gè)是重啟歸零,也可能重復(fù)

          簡(jiǎn)單看下源碼,感興趣的下載源碼去研究

          indexfile的添加消息索引的過(guò)程
          public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {        //1. 判斷該索引文件的索引數(shù)小于最大的索引數(shù),如果>=最大索引數(shù),IndexService就會(huì)嘗試新建一個(gè)索引文件        if (this.indexHeader.getIndexCount() < this.indexNum) {            //2. 計(jì)算該message key的hash值            int keyHash = indexKeyHashMethod(key);            //3. 根據(jù)message key的hash值散列到某個(gè)hash slot里            int slotPos = keyHash % this.hashSlotNum;            //4. 計(jì)算得到該hash slot的實(shí)際文件位置Position            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
          try { //5. 根據(jù)該hash slot的實(shí)際文件位置absSlotPos得到slot里的值 //這里有兩種情況: //1). slot=0, 當(dāng)前message的key是該hash值第一個(gè)消息索引 //2). slot>0, 該key hash值上一個(gè)消息索引的位置 int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
          //6. 數(shù)據(jù)校驗(yàn)及修正 if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) { slotValue = invalidIndex; }
          long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
          timeDiff = timeDiff / 1000;
          if (this.indexHeader.getBeginTimestamp() <= 0) { timeDiff = 0; } else if (timeDiff > Integer.MAX_VALUE) { timeDiff = Integer.MAX_VALUE; } else if (timeDiff < 0) { timeDiff = 0; }
          //7. 計(jì)算當(dāng)前消息索引具體的存儲(chǔ)位置(Append模式) int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + this.indexHeader.getIndexCount() * indexSize; //8. 存入該消息索引 this.mappedByteBuffer.putInt(absIndexPos, keyHash); this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset); this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff); this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
          //9. 關(guān)鍵之處:在該key hash slot處存入當(dāng)前消息索引的位置,下次通過(guò)該key進(jìn)行搜索時(shí) //會(huì)找到該key hash slot -> slot value -> curIndex -> //if(curIndex.prevIndex>0) pre index (一直循環(huán) 直至該curIndex.prevIndex==0就停止) this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
          if (this.indexHeader.getIndexCount() <= 1) { this.indexHeader.setBeginPhyOffset(phyOffset); this.indexHeader.setBeginTimestamp(storeTimestamp); }
          this.indexHeader.incHashSlotCount(); this.indexHeader.incIndexCount(); this.indexHeader.setEndPhyOffset(phyOffset); this.indexHeader.setEndTimestamp(storeTimestamp);
          return true; } catch (Exception e) { log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e); } } else { log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount() + "; index max num = " + this.indexNum); }
          return false; }
          indexfile的索引搜索源碼
          public void selectPhyOffset(final List phyOffsets, final String key, final int maxNum,        final long begin, final long end, boolean lock) {        if (this.mappedFile.hold()) {            //1. 計(jì)算該key的hash            int keyHash = indexKeyHashMethod(key);            //2. 計(jì)算該hash value 對(duì)應(yīng)的hash slot位置            int slotPos = keyHash % this.hashSlotNum;            //3. 計(jì)算該hash value 對(duì)應(yīng)的hash slot物理文件位置            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
          FileLock fileLock = null; try { //4. 取出該hash slot 的值 int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
          //5. 該slot value <= 0 就代表沒(méi)有該key對(duì)應(yīng)的消息索引,直接結(jié)束搜索 // 該slot value > maxIndexCount 就代表該key對(duì)應(yīng)的消息索引超過(guò)最大限制,數(shù)據(jù)有誤,直接結(jié)束搜索 if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount() || this.indexHeader.getIndexCount() <= 1) { } else { //6. 從當(dāng)前slot value 開(kāi)始搜索 for (int nextIndexToRead = slotValue; ; ) { if (phyOffsets.size() >= maxNum) { break; }
          //7. 找到當(dāng)前slot value(也就是index count)物理文件位置 int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + nextIndexToRead * indexSize;
          //8. 讀取消息索引數(shù)據(jù) int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos); long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
          long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8); //9. 獲取該消息索引的上一個(gè)消息索引index(可以看成鏈表的prev 指向上一個(gè)鏈節(jié)點(diǎn)的引用) int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4); //10. 數(shù)據(jù)校驗(yàn) if (timeDiff < 0) { break; }
          timeDiff *= 1000L;
          long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff; boolean timeMatched = (timeRead >= begin) && (timeRead <= end); //10. 數(shù)據(jù)校驗(yàn)比對(duì) hash值和落盤時(shí)間 if (keyHash == keyHashRead && timeMatched) { phyOffsets.add(phyOffsetRead); }
          //當(dāng)prevIndex <= 0 或prevIndex > maxIndexCount 或prevIndexRead == nextIndexToRead 或 timeRead < begin 停止搜索 if (prevIndexRead <= invalidIndex || prevIndexRead > this.indexHeader.getIndexCount() || prevIndexRead == nextIndexToRead || timeRead < begin) { break; }
          nextIndexToRead = prevIndexRead; } } } catch (Exception e) { log.error("selectPhyOffset exception ", e); } finally {
          this.mappedFile.release(); } } }

          有道無(wú)術(shù),術(shù)可成;有術(shù)無(wú)道,止于術(shù)

          歡迎大家關(guān)注Java之道公眾號(hào)


          好文章,我在看??

          瀏覽 101
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  免费做爱网站在线观看 | 韩国一区二区三区在线观看 | 免费的一级黄色片 | 伊人AV综合 | 先锋自拍偷拍网 |