<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          RocketMQ(十):數(shù)據(jù)存儲(chǔ)模型的設(shè)計(jì)與實(shí)現(xiàn)

          共 21767字,需瀏覽 44分鐘

           ·

          2021-02-22 03:29

          走過(guò)路過(guò)不要錯(cuò)過(guò)

          點(diǎn)擊藍(lán)字關(guān)注我們


          消息中間件,說(shuō)是一個(gè)通信組件也沒(méi)有錯(cuò),因?yàn)樗谋韭毠ぷ魇亲鱿⒌膫鬟f。然而要做到高效的消息傳遞,很重要的一點(diǎn)是數(shù)據(jù)結(jié)構(gòu),數(shù)據(jù)結(jié)構(gòu)設(shè)計(jì)的好壞,一定程度上決定了該消息組件的性能以及能力上限。


          1. 消息中間件的實(shí)現(xiàn)方式概述

          消息中間件實(shí)現(xiàn)起來(lái)自然是很難的,但我們可以從某些角度,簡(jiǎn)單了說(shuō)說(shuō)實(shí)現(xiàn)思路。

          它的最基本的兩個(gè)功能接口為:接收消息的發(fā)送(produce), 消息的消費(fèi)(consume). 就像一個(gè)郵遞員一樣,經(jīng)過(guò)它與不經(jīng)過(guò)它實(shí)質(zhì)性的東西沒(méi)有變化,它只是一個(gè)中介(其他功能效應(yīng),咱們拋卻不說(shuō))。

          為了實(shí)現(xiàn)這兩個(gè)基本的接口,我們就得實(shí)現(xiàn)兩個(gè)最基本的能力:消息的存儲(chǔ)和查詢(xún)。存儲(chǔ)即是接收發(fā)送過(guò)來(lái)的消息,查詢(xún)則包括業(yè)務(wù)查詢(xún)與系統(tǒng)自行查詢(xún)推送。

          我們先來(lái)看第一個(gè)點(diǎn):消息的存儲(chǔ)。

          直接基于內(nèi)存的消息組件,可以做到非常高效的傳遞,基本上此時(shí)的消息中間件就是由幾個(gè)內(nèi)存隊(duì)列組成,只要保證這幾個(gè)隊(duì)列的安全性和實(shí)時(shí)性,就可以工作得很好了。然而基于內(nèi)存則必然意味著能力有限或者成本相當(dāng)高,所以這樣的設(shè)計(jì)適用范圍得結(jié)合業(yè)務(wù)現(xiàn)狀做下比對(duì)。

          另一個(gè)就是基于磁盤(pán)的消息組件,磁盤(pán)往往意味著更大的存儲(chǔ)空間,或者某種程度上意味著無(wú)限的存儲(chǔ)空間,因?yàn)楫吘顾械拇髷?shù)據(jù)都是存放在磁盤(pán)上的,前提是系統(tǒng)需要協(xié)調(diào)好各磁盤(pán)間的數(shù)據(jù)關(guān)系。然而,磁盤(pán)也意味著性能的下降,數(shù)據(jù)存放起來(lái)更麻煩。但rocketmq借助于操作系統(tǒng)的pagecache和mmap以及順序?qū)憴C(jī)制,在讀寫(xiě)性能方面已經(jīng)非常優(yōu)化。所以,更重要的是如何設(shè)計(jì)好磁盤(pán)的數(shù)據(jù)據(jù)結(jié)構(gòu)。

          然后是第二個(gè)點(diǎn):消息的查詢(xún)。

          具體如何查詢(xún),則必然依賴(lài)于如何存儲(chǔ),與上面的原理類(lèi)似,不必細(xì)說(shuō)。但一般會(huì)有兩種消費(fèi)模型:推送消息模型和拉取消費(fèi)模型。即是消息中間件主動(dòng)向消費(fèi)者推送消息,或者是消費(fèi)者主動(dòng)查詢(xún)消息中間件。二者也各有優(yōu)劣,推送模型一般可以體現(xiàn)出更強(qiáng)的實(shí)時(shí)性以及保持比較小的server端存儲(chǔ)空間占用,但是也帶來(lái)了非常大的復(fù)雜度,它需要處理各種消費(fèi)異常、重試、負(fù)載均衡、上下線,這不是件小事。而拉取模型則會(huì)對(duì)消息中間件減輕許多工作,主要是省去了異常、重試、負(fù)載均衡類(lèi)的工作,將這些工作轉(zhuǎn)嫁到消費(fèi)者客戶(hù)端上。但與此同時(shí),也會(huì)對(duì)消息中間件提出更多要求,即要求能夠保留足夠長(zhǎng)時(shí)間的數(shù)據(jù),以便所有合法的消費(fèi)者都可以進(jìn)行消費(fèi)。而對(duì)于客戶(hù)端,則也需要中間件提供相應(yīng)的便利,以便可以實(shí)現(xiàn)客戶(hù)端的基本訴求,比如消費(fèi)組管理,上下線管理以及最基本的高效查詢(xún)能力。

          2. rocketmq存儲(chǔ)模型設(shè)計(jì)概述

          很明顯,rocketmq的初衷就是要應(yīng)對(duì)大數(shù)據(jù)的消息傳遞,所以其必然是基于磁盤(pán)的存儲(chǔ)。而其性能如上節(jié)所述,其利用操作系統(tǒng)的pagecache和mmap機(jī)制,讀寫(xiě)性能非常好,另外他使用順序?qū)憴C(jī)制,使普通磁盤(pán)也能體現(xiàn)出非常高的性能。

          但是,以上幾項(xiàng),只是為高性能提供了必要的前提。但具體如何利用,還需要從重設(shè)計(jì)。畢竟,快不是目的,實(shí)現(xiàn)需求才是意義。

          rocketmq中主要有四種存儲(chǔ)文件:commitlog 數(shù)據(jù)文件, consumequeue 消費(fèi)隊(duì)列文件, index 索引文件, 元數(shù)據(jù)信息文件。最后一個(gè)元數(shù)據(jù)信息文件比較簡(jiǎn)單,因其數(shù)據(jù)量小,方便操作。但針對(duì)前三個(gè)文件,都會(huì)涉及大量的數(shù)據(jù)問(wèn)題,所以必然好詳細(xì)設(shè)計(jì)其結(jié)構(gòu)。

          從總體上來(lái)說(shuō),rocketmq都遵從定長(zhǎng)數(shù)據(jù)結(jié)構(gòu)存儲(chǔ),定長(zhǎng)的最大好處就在于可以快速定位位置,這是其高性能的出發(fā)點(diǎn)。定長(zhǎng)模型。

          從核心上來(lái)說(shuō),commitlog文件保存了所有原始數(shù)據(jù),所有數(shù)據(jù)想要獲取,都能從或也只能從commitlog文件中獲取,由于commitlog文件保持了順序?qū)懙奶匦裕云湫阅芊浅8摺6驍?shù)據(jù)只有一份,所以也就從根本上保證了數(shù)據(jù)一致性。

          而根據(jù)各業(yè)務(wù)場(chǎng)景,衍生出了consumequeue和index文件,即 consumequeue 文件是為了消費(fèi)者能夠快速獲取到相應(yīng)消息而設(shè)計(jì),而index文件則為了能夠快速搜索到消息而設(shè)計(jì)。從功能上說(shuō),consumequeue和index文件都是索引文件,只是索引的維度不同。consumequeue 是以topic和queueId維度進(jìn)行劃分的索引,而index 則是以時(shí)間和key作為劃分的索引。有了這兩個(gè)索引之后,就可以為各自的業(yè)務(wù)場(chǎng)景,提供高性能的服務(wù)了。具體其如何實(shí)現(xiàn)索引,我們稍后再講!

          commitlog vs consumequeue 的存儲(chǔ)模型如下:

          3. commitlog文件的存儲(chǔ)結(jié)構(gòu)

          直接順序?qū)懙男问酱鎯?chǔ),每個(gè)文件設(shè)定固定大小,默認(rèn)是1G即: 1073741824 bytes. 寫(xiě)滿(mǎn)一個(gè)文件后,新開(kāi)一個(gè)文件寫(xiě)入。文件名就是其存儲(chǔ)的起始消息偏移量。

          官方描述如下:

          CommitLog:消息主體以及元數(shù)據(jù)的存儲(chǔ)主體,存儲(chǔ)Producer端寫(xiě)入的消息主體內(nèi)容,消息內(nèi)容不是定長(zhǎng)的。單個(gè)文件大小默認(rèn)1G ,文件名長(zhǎng)度為20位,左邊補(bǔ)零,剩余為起始偏移量,比如00000000000000000000代表了第一個(gè)文件,起始偏移量為0,文件大小為1G=1073741824;當(dāng)?shù)谝粋€(gè)文件寫(xiě)滿(mǎn)了,第二個(gè)文件為00000000001073741824,起始偏移量為1073741824,以此類(lèi)推。消息主要是順序?qū)懭肴罩疚募?dāng)文件滿(mǎn)了,寫(xiě)入下一個(gè)文件;

          當(dāng)給定一個(gè)偏移量,要查找某條消息時(shí),只需在所有的commitlog文件中,根據(jù)其名字即可知道偏移的數(shù)據(jù)信息是否存在其中,即相當(dāng)于可基于文件實(shí)現(xiàn)一個(gè)二分查找,實(shí)際上rocketmq實(shí)現(xiàn)得更簡(jiǎn)潔,直接一次性查找即可定位:

          // org.apache.rocketmq.store.CommitLog#getData    public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {        int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();        // 1. 先在所有commitlog文件中查找到對(duì)應(yīng)所在的 commitlog 分片文件        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);        if (mappedFile != null) {            // 再?gòu)脑摲制募校苿?dòng)余數(shù)的大小偏移,即可定位到要查找的消息記錄了            int pos = (int) (offset % mappedFileSize);            SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);            return result;        }
          return null; } // 查找偏移所在commitlog文件的實(shí)現(xiàn)方式: // org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean) // firstMappedFile.getFileFromOffset() / this.mappedFileSize 代表了第一條記錄所處的文件位置編號(hào) // offset / this.mappedFileSize 代表當(dāng)前offset所處的文件編號(hào) // 那么,兩個(gè)編號(hào)相減就是當(dāng)前offset對(duì)應(yīng)的文件編號(hào),因?yàn)榈谝粋€(gè)文件編號(hào)的相對(duì)位置是0 // 但有個(gè)前提:就是每個(gè)文件存儲(chǔ)的大小必須是真實(shí)的對(duì)應(yīng)的 offset 大小之差,而實(shí)際上consumeQueue根本無(wú)法確定它存了多少offset // 也就是說(shuō),只要文件定長(zhǎng),offset用于定位 commitlog文件就是合理的 int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize)); MappedFile targetFile = null; try { // 所以,此處可以找到 commitlog 文件對(duì)應(yīng)的 mappedFile targetFile = this.mappedFiles.get(index); } catch (Exception ignored) { } if (targetFile != null && offset >= targetFile.getFileFromOffset() && offset < targetFile.getFileFromOffset() + this.mappedFileSize) { return targetFile; } // 如果快速查找失敗,則退回到遍歷方式, 使用O(n)的復(fù)雜度再查找一次 for (MappedFile tmpMappedFile : this.mappedFiles) { if (offset >= tmpMappedFile.getFileFromOffset() && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) { return tmpMappedFile; } }


          定位到具體的消息記錄位置后,如何知道要讀多少數(shù)據(jù)呢?這實(shí)際上在commitlog的數(shù)據(jù)第1個(gè)字節(jié)中標(biāo)明,只需讀出即可知道。

          具體commitlog的存儲(chǔ)實(shí)現(xiàn)如下:

           // org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend    ...    // Initialization of storage space    this.resetByteBuffer(msgStoreItemMemory, msgLen);    // 1 TOTALSIZE, 首先將消息大小寫(xiě)入    this.msgStoreItemMemory.putInt(msgLen);    // 2 MAGICCODE    this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);    // 3 BODYCRC    this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());    // 4 QUEUEID    this.msgStoreItemMemory.putInt(msgInner.getQueueId());    // 5 FLAG    this.msgStoreItemMemory.putInt(msgInner.getFlag());    // 6 QUEUEOFFSET    this.msgStoreItemMemory.putLong(queueOffset);    // 7 PHYSICALOFFSET    this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());    // 8 SYSFLAG    this.msgStoreItemMemory.putInt(msgInner.getSysFlag());    // 9 BORNTIMESTAMP    this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());    // 10 BORNHOST    this.resetByteBuffer(bornHostHolder, bornHostLength);    this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));    // 11 STORETIMESTAMP    this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());    // 12 STOREHOSTADDRESS    this.resetByteBuffer(storeHostHolder, storeHostLength);    this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));    // 13 RECONSUMETIMES    this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());    // 14 Prepared Transaction Offset    this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());    // 15 BODY    this.msgStoreItemMemory.putInt(bodyLength);    if (bodyLength > 0)        this.msgStoreItemMemory.put(msgInner.getBody());    // 16 TOPIC    this.msgStoreItemMemory.put((byte) topicLength);    this.msgStoreItemMemory.put(topicData);    // 17 PROPERTIES    this.msgStoreItemMemory.putShort((short) propertiesLength);    if (propertiesLength > 0)        this.msgStoreItemMemory.put(propertiesData);
          final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); // Write messages to the queue buffer byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen); ...

          可以看出,commitlog的存儲(chǔ)還是比較簡(jiǎn)單的,因?yàn)槠渲饕褪秦?fù)責(zé)將接收到的所有消息,依次寫(xiě)入同一文件中。因?yàn)閷?zhuān)一所以專(zhuān)業(yè)。

          4. consumequeue文件的存儲(chǔ)結(jié)構(gòu)

          consumequeue作為消費(fèi)者的重要依據(jù),同樣起著非常重要的作用。消費(fèi)者在進(jìn)行消費(fèi)時(shí),會(huì)使用一些偏移量作為依據(jù)(拉取模型實(shí)現(xiàn))。而這些個(gè)偏移量,實(shí)際上就是指的consumequeue的偏移量(注意不是commitlog的偏移量)。這樣做有什么好處呢?首先,consumequeue作為索引文件,它被要求要有非常高的查詢(xún)性能,所以越簡(jiǎn)單越好。最好是能夠一次性定位到數(shù)據(jù)!

          如果想一次性定位數(shù)據(jù),那么唯一的辦法是直接使用commitlog的offset。但這會(huì)帶來(lái)一個(gè)最大的問(wèn)題,就是當(dāng)我當(dāng)前消息消費(fèi)拉取完成后,下一條消息在哪里呢?如果單靠commitlog文件,那么,它必然需要將下一條消息讀入,然后再根據(jù)topic判定是不是需要的數(shù)據(jù)。如此一來(lái),就必然存在大量的commitlog文件的io問(wèn)題了。所以,這看起來(lái)是非常快速的一個(gè)解決方案,最終又變成了非常費(fèi)力的方案了。

          而使用commitlog文件的offset,則好了許多。因?yàn)閏onsumequeue的文件存儲(chǔ)格式是一條消息占20字節(jié),即定長(zhǎng)。根據(jù)這20字節(jié),你可以找到commitlog的offset. 而因?yàn)閏onsumequeue本身就是按照topic/queueId進(jìn)行劃分的,所以,本次消費(fèi)完成后,下一次消費(fèi)的數(shù)據(jù)必定就在consumequeue的下一位置。如此簡(jiǎn)單快速搞得定了。具體consume的存儲(chǔ)格式,如官方描述:

          ConsumeQueue:消息消費(fèi)隊(duì)列,引入的目的主要是提高消息消費(fèi)的性能,由于RocketMQ是基于主題topic的訂閱模式,消息消費(fèi)是針對(duì)主題進(jìn)行的,如果要遍歷commitlog文件中根據(jù)topic檢索消息是非常低效的。Consumer即可根據(jù)ConsumeQueue來(lái)查找待消費(fèi)的消息。其中,ConsumeQueue(邏輯消費(fèi)隊(duì)列)作為消費(fèi)消息的索引,保存了指定Topic下的隊(duì)列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夾的組織方式如下:topic/queue/file三層組織結(jié)構(gòu),具體存儲(chǔ)路徑為:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同樣consumequeue文件采取定長(zhǎng)設(shè)計(jì),每一個(gè)條目共20個(gè)字節(jié),分別為8字節(jié)的commitlog物理偏移量、4字節(jié)的消息長(zhǎng)度、8字節(jié)tag hashcode,單個(gè)文件由30W個(gè)條目組成,可以像數(shù)組一樣隨機(jī)訪問(wèn)每一個(gè)條目,每個(gè)ConsumeQueue文件大小約5.72M;

          其中fileName也是以偏移量作為命名依據(jù),因?yàn)檫@樣才能根據(jù)offset快速查找到數(shù)據(jù)所在的分片文件。

          其存儲(chǔ)實(shí)現(xiàn)如下:

              // org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfo    private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,        final long cqOffset) {
          if (offset + size <= this.maxPhysicOffset) { log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset); return true; } // 依次寫(xiě)入 offset + size + tagsCode this.byteBufferIndex.flip(); this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE); this.byteBufferIndex.putLong(offset); this.byteBufferIndex.putInt(size); this.byteBufferIndex.putLong(tagsCode);
          final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
          MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset); if (mappedFile != null) {
          if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) { this.minLogicOffset = expectLogicOffset; this.mappedFileQueue.setFlushedWhere(expectLogicOffset); this.mappedFileQueue.setCommittedWhere(expectLogicOffset); this.fillPreBlank(mappedFile, expectLogicOffset); log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " " + mappedFile.getWrotePosition()); }
          if (cqOffset != 0) { long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
          if (expectLogicOffset < currentLogicOffset) { log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset); return true; }
          if (expectLogicOffset != currentLogicOffset) { LOG_ERROR.warn( "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset ); } } this.maxPhysicOffset = offset + size; // 將buffer寫(xiě)入 consumequeue 的 mappedFile 中 return mappedFile.appendMessage(this.byteBufferIndex.array()); } return false; } 當(dāng)需要進(jìn)行查找進(jìn),也就會(huì)根據(jù)offset, 定位到某個(gè) consumequeue 文件,然后再根據(jù)偏移余數(shù)信息,再找到對(duì)應(yīng)記錄,取出20字節(jié),即是 commitlog信息。此處實(shí)現(xiàn)與 commitlog 的offset查找實(shí)現(xiàn)如出一轍。 // 查找索引所在文件的實(shí)現(xiàn),如下: // org.apache.rocketmq.store.ConsumeQueue#getIndexBuffer public SelectMappedBufferResult getIndexBuffer(final long startIndex) { int mappedFileSize = this.mappedFileSize; // 給到客戶(hù)端的偏移量是除以 20 之后的,也就是說(shuō) 如果上一次的偏移量是 1, 那么下一次的偏移量應(yīng)該是2 // 一次性消費(fèi)多條記錄另算, 自行加減 long offset = startIndex * CQ_STORE_UNIT_SIZE; if (offset >= this.getMinLogicOffset()) { // 委托給mappedFileQueue進(jìn)行查找到單個(gè)具體的consumequeue文件 // 根據(jù) offset 和規(guī)范的命名,可以快速定位分片文件,如上 commitlog 的查找實(shí)現(xiàn) MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset); if (mappedFile != null) { // 再根據(jù)剩余的偏移量,直接類(lèi)似于數(shù)組下標(biāo)的形式,一次性定位到具體的數(shù)據(jù)記錄 SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize)); return result; } } return null; }

          如果想一次性消費(fèi)多條消息,則只需要依次從查找到索引記錄開(kāi)始,依次讀取多條,然后同理回查commitlog即可。即consumequeue的連續(xù),成就了commitlog的不連續(xù)。如下消息拉取實(shí)現(xiàn):

              // org.apache.rocketmq.store.DefaultMessageStore#getMessage    // 其中 bufferConsumeQueue 是剛剛查找出的consumequeue的起始消費(fèi)位置    // 基于此文件迭代,完成多消息記錄消費(fèi)    ...    long nextPhyFileStartOffset = Long.MIN_VALUE;    long maxPhyOffsetPulling = 0;
          int i = 0; final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE); final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded(); ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { // 依次取出commitlog的偏移量,數(shù)據(jù)大小,hashCode // 一次循環(huán)即是取走一條記錄,多次循環(huán)則依次往下讀取 long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
          maxPhyOffsetPulling = offsetPy;
          if (nextPhyFileStartOffset != Long.MIN_VALUE) { if (offsetPy < nextPhyFileStartOffset) continue; }
          boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
          if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(), isInDisk)) { break; }
          boolean extRet = false, isTagsCodeLegal = true; if (consumeQueue.isExtAddr(tagsCode)) { extRet = consumeQueue.getExt(tagsCode, cqExtUnit); if (extRet) { tagsCode = cqExtUnit.getTagsCode(); } else { // can't find ext content.Client will filter messages by tag also. log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}", tagsCode, offsetPy, sizePy, topic, group); isTagsCodeLegal = false; } }
          if (messageFilter != null && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.NO_MATCHED_MESSAGE; }
          continue; }
          SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy); if (null == selectResult) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.MESSAGE_WAS_REMOVING; }
          nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy); continue; }
          if (messageFilter != null && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.NO_MATCHED_MESSAGE; } // release... selectResult.release(); continue; }
          this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet(); getResult.addMessage(selectResult); status = GetMessageStatus.FOUND; nextPhyFileStartOffset = Long.MIN_VALUE; }
          if (diskFallRecorded) { long fallBehind = maxOffsetPy - maxPhyOffsetPulling; brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind); } // 分配下一次讀取的offset偏移信息,同樣要除以單條索引大小 nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
          long diff = maxOffsetPy - maxPhyOffsetPulling; long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)); getResult.setSuggestPullingFromSlave(diff > memory); ...

          以上即理論的實(shí)現(xiàn),無(wú)須多言。

          5. index文件的存儲(chǔ)結(jié)構(gòu)

          index文件是為搜索場(chǎng)景而生的,如果沒(méi)有搜索業(yè)務(wù)需求,則這個(gè)實(shí)現(xiàn)是意義不大的。一般這種搜索,主要用于后臺(tái)查詢(xún)驗(yàn)證類(lèi)使用,或者有其他同的有妙用,不得而知。總之,一切為搜索。它更多的需要借助于時(shí)間限定,以key或者id進(jìn)行查詢(xún)。

          官方描述如下:

          IndexFile(索引文件)提供了一種可以通過(guò)key或時(shí)間區(qū)間來(lái)查詢(xún)消息的方法。Index文件的存儲(chǔ)位置是:$HOME \store\index\${fileName},文件名fileName是以創(chuàng)建時(shí)的時(shí)間戳命名的,固定的單個(gè)IndexFile文件大小約為400M,一個(gè)IndexFile可以保存 2000W個(gè)索引,IndexFile的底層存儲(chǔ)設(shè)計(jì)為在文件系統(tǒng)中實(shí)現(xiàn)HashMap結(jié)構(gòu),故rocketmq的索引文件其底層實(shí)現(xiàn)為hash索引。
          IndexFile索引文件為用戶(hù)提供通過(guò)“按照Message Key查詢(xún)消息”的消息索引查詢(xún)服務(wù),IndexFile文件的存儲(chǔ)位置是:$HOME\store\index\${fileName},文件名fileName是以創(chuàng)建時(shí)的時(shí)間戳命名的,文件大小是固定的,等于40+500W\*4+2000W\*20= 420000040個(gè)字節(jié)大小。如果消息的properties中設(shè)置了UNIQ_KEY這個(gè)屬性,就用 topic + “#” + UNIQ_KEY的value作為 key 來(lái)做寫(xiě)入操作。如果消息設(shè)置了KEYS屬性(多個(gè)KEY以空格分隔),也會(huì)用 topic + “#” + KEY 來(lái)做索引。
          其中的索引數(shù)據(jù)包含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset 這四個(gè)字段,一共20 Byte。NextIndex offset 即前面讀出來(lái)的 slotValue,如果有 hash沖突,就可以用這個(gè)字段將所有沖突的索引用鏈表的方式串起來(lái)了。Timestamp記錄的是消息storeTimestamp之間的差,并不是一個(gè)絕對(duì)的時(shí)間。整個(gè)Index File的結(jié)構(gòu)如圖,40 Byte 的Header用于保存一些總的統(tǒng)計(jì)信息,4\*500W的 Slot Table并不保存真正的索引數(shù)據(jù),而是保存每個(gè)槽位對(duì)應(yīng)的單向鏈表的頭。20\*2000W 是真正的索引數(shù)據(jù),即一個(gè) Index File 可以保存 2000W個(gè)索引。

          具體結(jié)構(gòu)圖如下:

          那么,如果要查找一個(gè)key, 應(yīng)當(dāng)如何查找呢?rocketmq會(huì)根據(jù)時(shí)間段找到一個(gè)index索引分版,然后再根據(jù)key做hash得到一個(gè)值,然后定位到 slotValue . 然后再?gòu)膕lotValue去取出索引數(shù)據(jù)的地址,找到索引數(shù)據(jù),然后再回查 commitlog 文件。從而得到具體的消息數(shù)據(jù)。也就是,相當(dāng)于搜索經(jīng)歷了四級(jí)查詢(xún):索引分片文件查詢(xún) -> slotValue 查詢(xún) -> 索引數(shù)據(jù)查詢(xún) -> commitlog 查詢(xún) 。?

          具體查找實(shí)現(xiàn)如下:

              // org.apache.rocketmq.broker.processor.QueryMessageProcessor#queryMessage    public RemotingCommand queryMessage(ChannelHandlerContext ctx, RemotingCommand request)        throws RemotingCommandException {        final RemotingCommand response =            RemotingCommand.createResponseCommand(QueryMessageResponseHeader.class);        final QueryMessageResponseHeader responseHeader =            (QueryMessageResponseHeader) response.readCustomHeader();        final QueryMessageRequestHeader requestHeader =            (QueryMessageRequestHeader) request                .decodeCommandCustomHeader(QueryMessageRequestHeader.class);
          response.setOpaque(request.getOpaque());
          String isUniqueKey = request.getExtFields().get(MixAll.UNIQUE_MSG_QUERY_FLAG); if (isUniqueKey != null && isUniqueKey.equals("true")) { requestHeader.setMaxNum(this.brokerController.getMessageStoreConfig().getDefaultQueryMaxNum()); } // 從索引文件中查詢(xún)消息 final QueryMessageResult queryMessageResult = this.brokerController.getMessageStore().queryMessage(requestHeader.getTopic(), requestHeader.getKey(), requestHeader.getMaxNum(), requestHeader.getBeginTimestamp(), requestHeader.getEndTimestamp()); assert queryMessageResult != null;
          responseHeader.setIndexLastUpdatePhyoffset(queryMessageResult.getIndexLastUpdatePhyoffset()); responseHeader.setIndexLastUpdateTimestamp(queryMessageResult.getIndexLastUpdateTimestamp());
          if (queryMessageResult.getBufferTotalSize() > 0) { response.setCode(ResponseCode.SUCCESS); response.setRemark(null);
          try { FileRegion fileRegion = new QueryMessageTransfer(response.encodeHeader(queryMessageResult .getBufferTotalSize()), queryMessageResult); ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { queryMessageResult.release(); if (!future.isSuccess()) { log.error("transfer query message by page cache failed, ", future.cause()); } } }); } catch (Throwable e) { log.error("", e); queryMessageResult.release(); }
          return null; }
          response.setCode(ResponseCode.QUERY_NOT_FOUND); response.setRemark("can not find message, maybe time range not correct"); return response; } // org.apache.rocketmq.store.DefaultMessageStore#queryMessage @Override public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) { QueryMessageResult queryMessageResult = new QueryMessageResult();
          long lastQueryMsgTime = end;
          for (int i = 0; i < 3; i++) { // 委托給 indexService 搜索記錄, 時(shí)間是必備參數(shù) QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime); if (queryOffsetResult.getPhyOffsets().isEmpty()) { break; }
          Collections.sort(queryOffsetResult.getPhyOffsets());
          queryMessageResult.setIndexLastUpdatePhyoffset(queryOffsetResult.getIndexLastUpdatePhyoffset()); queryMessageResult.setIndexLastUpdateTimestamp(queryOffsetResult.getIndexLastUpdateTimestamp());
          for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) { long offset = queryOffsetResult.getPhyOffsets().get(m);
          try {
          boolean match = true; MessageExt msg = this.lookMessageByOffset(offset); if (0 == m) { lastQueryMsgTime = msg.getStoreTimestamp(); }
          if (match) { SelectMappedBufferResult result = this.commitLog.getData(offset, false); if (result != null) { int size = result.getByteBuffer().getInt(0); result.getByteBuffer().limit(size); result.setSize(size); queryMessageResult.addMessage(result); } } else { log.warn("queryMessage hash duplicate, {} {}", topic, key); } } catch (Exception e) { log.error("queryMessage exception", e); } }
          if (queryMessageResult.getBufferTotalSize() > 0) { break; }
          if (lastQueryMsgTime < begin) { break; } }
          return queryMessageResult; }
          public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end) { List phyOffsets = new ArrayList(maxNum);
          long indexLastUpdateTimestamp = 0; long indexLastUpdatePhyoffset = 0; maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch()); try { this.readWriteLock.readLock().lock(); if (!this.indexFileList.isEmpty()) { //從最后一個(gè)索引文件,依次搜索 for (int i = this.indexFileList.size(); i > 0; i--) { IndexFile f = this.indexFileList.get(i - 1); boolean lastFile = i == this.indexFileList.size(); if (lastFile) { indexLastUpdateTimestamp = f.getEndTimestamp(); indexLastUpdatePhyoffset = f.getEndPhyOffset(); } // 判定該時(shí)間段是否數(shù)據(jù)是否在該索引文件中 if (f.isTimeMatched(begin, end)) { // 構(gòu)建出 key的hash, 然后查找 slotValue, 然后得以索引數(shù)據(jù), 然后將offset放入 phyOffsets 中 f.selectPhyOffset(phyOffsets, buildKey(topic, key), maxNum, begin, end, lastFile); }
          if (f.getBeginTimestamp() < begin) { break; }
          if (phyOffsets.size() >= maxNum) { break; } } } } catch (Exception e) { log.error("queryMsg exception", e); } finally { this.readWriteLock.readLock().unlock(); }
          return new QueryOffsetResult(phyOffsets, indexLastUpdateTimestamp, indexLastUpdatePhyoffset); } // org.apache.rocketmq.store.index.IndexFile#selectPhyOffset public void selectPhyOffset(final List phyOffsets, final String key, final int maxNum, final long begin, final long end, boolean lock) { if (this.mappedFile.hold()) { int keyHash = indexKeyHashMethod(key); int slotPos = keyHash % this.hashSlotNum; int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
          FileLock fileLock = null; try { int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
          if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount() || this.indexHeader.getIndexCount() <= 1) { // 超出搜索范圍,不處理 } else { for (int nextIndexToRead = slotValue; ; ) { if (phyOffsets.size() >= maxNum) { break; }
          int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + nextIndexToRead * indexSize; // 依次讀出 keyHash+offset+timeDiff+nextOffset int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos); long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
          long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8); int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
          if (timeDiff < 0) { break; }
          timeDiff *= 1000L; // 根據(jù)文件名可得到索引寫(xiě)入時(shí)間 long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff; boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
          if (keyHash == keyHashRead && timeMatched) { phyOffsets.add(phyOffsetRead); }
          if (prevIndexRead <= invalidIndex || prevIndexRead > this.indexHeader.getIndexCount() || prevIndexRead == nextIndexToRead || timeRead < begin) { break; }
          nextIndexToRead = prevIndexRead; } } } catch (Exception e) { log.error("selectPhyOffset exception ", e); } finally { if (fileLock != null) { try { fileLock.release(); } catch (IOException e) { log.error("Failed to release the lock", e); } }
          this.mappedFile.release(); } } }


          看起來(lái)挺費(fèi)勁,但真正處理起來(lái)性能還好,雖然沒(méi)有consumequeue高效,但有mmap和pagecache的加持,效率還是扛扛的。而且,搜索相對(duì)慢一些,用戶(hù)也是可以接受的嘛。畢竟這只是一個(gè)附加功能,并非核心所在。

          而索引文件并沒(méi)有使用什么高效的搜索算法,而是簡(jiǎn)單從最后一個(gè)文件遍歷完成,因?yàn)闀r(shí)間戳不一定總是有規(guī)律的,與其隨意查找,還不如直接線性查找。另外,實(shí)際上對(duì)于索引重建問(wèn)題,搜索可能不一定會(huì)有效。不過(guò),我們可以通過(guò)擴(kuò)大搜索時(shí)間范圍的方式,總是能夠找到存在的數(shù)據(jù)。而且因其使用hash索引實(shí)現(xiàn),性能還是不錯(cuò)的。

          另外,index索引文件與commitlog和consumequeue有一個(gè)不一樣的地方,就是它不能進(jìn)行順序?qū)懀驗(yàn)閔ash存儲(chǔ),寫(xiě)一定是任意的。且其slotValue以一些統(tǒng)計(jì)信息可能隨時(shí)發(fā)生變化,這也給順序?qū)憥?lái)了不可解決的問(wèn)題。

          其具體寫(xiě)索引過(guò)程如下:

              // org.apache.rocketmq.store.index.IndexFile#putKey    public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {        if (this.indexHeader.getIndexCount() < this.indexNum) {            int keyHash = indexKeyHashMethod(key);            int slotPos = keyHash % this.hashSlotNum;            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
          FileLock fileLock = null;
          try { // 先嘗試?yán)lot對(duì)應(yīng)的數(shù)據(jù) // 如果為0則說(shuō)明是第一次寫(xiě)入, 否則為當(dāng)前的索引條數(shù) int slotValue = this.mappedByteBuffer.getInt(absSlotPos); if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) { slotValue = invalidIndex; }
          long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
          timeDiff = timeDiff / 1000;
          if (this.indexHeader.getBeginTimestamp() <= 0) { timeDiff = 0; } else if (timeDiff > Integer.MAX_VALUE) { timeDiff = Integer.MAX_VALUE; } else if (timeDiff < 0) { timeDiff = 0; } // 直接計(jì)算出本次存儲(chǔ)的索引記錄位置 // 因索引條數(shù)只會(huì)依次增加,故索引數(shù)據(jù)將表現(xiàn)為順序?qū)憳幼樱饕潜WC了數(shù)據(jù)不會(huì)寫(xiě)沖突了 int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + this.indexHeader.getIndexCount() * indexSize; // 按協(xié)議寫(xiě)入內(nèi)容即可 this.mappedByteBuffer.putInt(absIndexPos, keyHash); this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset); this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff); this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue); // 寫(xiě)入slotValue為當(dāng)前可知的索引記錄條數(shù) // 即每次寫(xiě)入索引之后,如果存在hash沖突,那么它會(huì)寫(xiě)入自身的位置 // 而此時(shí) slotValue 必定存在一個(gè)值,那就是上一個(gè)發(fā)生沖突的索引,從而形成自然的鏈表 // 查找數(shù)據(jù)時(shí),只需根據(jù)slotValue即可以找到上一個(gè)寫(xiě)入的索引,這設(shè)計(jì)妙哉! // 做了2點(diǎn)關(guān)鍵性保證:1. 數(shù)據(jù)自增不沖突; 2. hash沖突自刷新; 磁盤(pán)版的hash結(jié)構(gòu)已然形成 this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
          if (this.indexHeader.getIndexCount() <= 1) { this.indexHeader.setBeginPhyOffset(phyOffset); this.indexHeader.setBeginTimestamp(storeTimestamp); }
          if (invalidIndex == slotValue) { this.indexHeader.incHashSlotCount(); } this.indexHeader.incIndexCount(); this.indexHeader.setEndPhyOffset(phyOffset); this.indexHeader.setEndTimestamp(storeTimestamp);
          return true; } catch (Exception e) { log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e); } finally { if (fileLock != null) { try { fileLock.release(); } catch (IOException e) { log.error("Failed to release the lock", e); } } } } else { log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount() + "; index max num = " + this.indexNum); }
          return false; }

          rocketmq 巧妙地使用了自增結(jié)構(gòu)和hash slot, 完美實(shí)現(xiàn)一個(gè)磁盤(pán)版的hash索引。相信這也會(huì)給我們平時(shí)的工作帶來(lái)一些提示。

          6. 寫(xiě)在最后

          以上就是本文對(duì)rocketmq的存儲(chǔ)模型設(shè)計(jì)的解析了,通過(guò)這些解析,相信大家對(duì)其工作原理也會(huì)有質(zhì)的理解。存儲(chǔ)實(shí)際上是目前我們的許多的系統(tǒng)中的非常核心部分,因?yàn)榇蟛糠值臉I(yè)務(wù)幾乎都是在存儲(chǔ)之前做一些簡(jiǎn)單的計(jì)算。

          很顯然業(yè)務(wù)很重要,但有了存儲(chǔ)的底子,還何愁業(yè)務(wù)實(shí)現(xiàn)難?




          往期精彩推薦



          騰訊、阿里、滴滴后臺(tái)面試題匯總總結(jié) — (含答案)

          面試:史上最全多線程面試題 !

          最新阿里內(nèi)推Java后端面試題

          JVM難學(xué)?那是因?yàn)槟銢](méi)認(rèn)真看完這篇文章


          END


          關(guān)注作者微信公眾號(hào) —《JAVA爛豬皮》


          了解更多java后端架構(gòu)知識(shí)以及最新面試寶典


          你點(diǎn)的每個(gè)好看,我都認(rèn)真當(dāng)成了


          看完本文記得給作者點(diǎn)贊+在看哦~~~大家的支持,是作者源源不斷出文的動(dòng)力


          作者:等你歸去來(lái)

          出處:https://www.cnblogs.com/yougewe/p/14224366.html

          瀏覽 56
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲电影在线观看 | 热久久免费视频在线观看 | 亚洲欧美一级 | 日本黄色一级 | 超碰自拍 |