Thoroughly Understand RocketMQ's Storage Internals Through These Three Files
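RocketMQ is a high-performance message queue developed by Alibaba. It supports many message types, including transactional messages, which makes it a hot favorite in plenty of today's systems. So learning how to use it is a must.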
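As programmers with dreams, when we learn a technology we can't stop at knowing what it does; that is far from enough. We have to know why it works, so that we can talk up a storm in interviews. Er, scratch that: so that when we hit a problem at work, we can reason our way to a solution.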
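We know that RocketMQ's architecture consists of the producer, the NameServer, the broker, and the consumer. The producer produces messages. The NameServer is the routing center, responsible for service registration and discovery and for route management.
The consumer consumes messages, while the broker is what actually stores and persists them. In other words, both the files that store messages and the files that index them live on the broker.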
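The main jobs of a message queue are decoupling, asynchronous processing, and peak shaving, which means storage is an indispensable part of it. And as business volume grows over time, the demands on a message queue's storage capability keep rising.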
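In other words, good performance alone isn't enough; you also have to store enough messages to carry my business volume. If you can only hold 100 MB of messages while my system produces perhaps 500 MB of messages per minute, that clearly won't do. What peak are you shaving then? When the peak arrives, you can't even withstand it yourself.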

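RocketMQ stands out thanks to its strong storage and message-indexing capabilities, plus its range of message types and features. So for us programmers with dreams, learning RocketMQ's storage internals becomes especially important.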
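And when it comes to storage internals, we have to talk about the commitLog file, where RocketMQ stores its messages. On the consuming side, the cleverly designed ConsumeQueue file is what makes fast, orderly consumption possible. And RocketMQ's powerful message-lookup feature relies on the indexFile index file.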
This article starts from these three mysterious files: commitLog, ConsumeQueue, and indexFile. Once you understand them, you will have hollowed out the core of RocketMQ.
First, a diagram showing how the commitLog, ConsumeQueue, and indexFile files relate to one another when a message is written to the commitLog.

The CommitLog file
Size and naming rules
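To make the naming rule concrete, here is a minimal sketch. It assumes the commonly documented defaults: each CommitLog file is 1 GiB, and each file is named after its starting physical offset, zero-padded to 20 digits. The class and method names are made up for illustration and are not RocketMQ APIs.

```java
final class CommitLogNaming {
    // Assumption: default size of a single CommitLog file (1 GiB).
    static final long FILE_SIZE = 1024L * 1024 * 1024;

    // Name of the file that holds the given physical offset:
    // the file's starting offset, zero-padded to 20 digits.
    static String fileNameFor(long physicalOffset) {
        long startOffset = physicalOffset - (physicalOffset % FILE_SIZE);
        return String.format("%020d", startOffset);
    }

    // Position of the given physical offset inside its own file.
    static long positionInFile(long physicalOffset) {
        return physicalOffset % FILE_SIZE;
    }
}
```

Under these assumptions the first file is named 00000000000000000000 and the second 00000000001073741824, so a global offset is enough to find both the file and the position inside it.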

File storage rules and characteristics

A quick look at the source code
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    if (this.shutdown) {
        log.warn("message store has shutdown, so putMessage is forbidden");
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }

    // Slave nodes do not accept writes
    if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
            log.warn("message store is slave mode, so putMessage is forbidden ");
        }
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }

    // Check whether the store is currently writable
    if (!this.runningFlags.isWriteable()) {
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
            log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
        }
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    } else {
        this.printTimes.set(0);
    }

    // Topic is too long
    if (msg.getTopic().length() > Byte.MAX_VALUE) {
        log.warn("putMessage message topic length too long " + msg.getTopic().length());
        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
    }

    // Message properties are too long
    if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
        return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
    }

    if (this.isOSPageCacheBusy()) {
        return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
    }

    long beginTime = this.getSystemClock().now();
    // Append the message to the commitLog
    PutMessageResult result = this.commitLog.putMessage(msg);

    long eclipseTime = this.getSystemClock().now() - beginTime;
    if (eclipseTime > 500) {
        log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
    }
    this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);

    if (null == result || !result.isOk()) {
        this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
    }

    return result;
}
The ConsumeQueue file

Location and structure
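As a rough illustration of the structure, the sketch below assumes the commonly documented ConsumeQueue entry layout: a fixed 20 bytes per entry, made up of an 8-byte CommitLog offset, a 4-byte message size, and an 8-byte tag hash code. The class and method names are hypothetical, and a plain in-memory ByteBuffer stands in for the real memory-mapped file.

```java
import java.nio.ByteBuffer;

final class ConsumeQueueEntrySketch {
    // Assumption: every entry is exactly 20 bytes (8 + 4 + 8).
    static final int ENTRY_SIZE = 20;

    // Append one entry at the buffer's current write position.
    static void append(ByteBuffer queue, long commitLogOffset, int msgSize, long tagsCode) {
        queue.putLong(commitLogOffset); // where the message lives in the CommitLog
        queue.putInt(msgSize);          // total size of the message
        queue.putLong(tagsCode);        // hash code of the message tag
    }

    // Read entry number logicalIndex; returns {commitLogOffset, size, tagsCode}.
    static long[] read(ByteBuffer queue, long logicalIndex) {
        int pos = (int) (logicalIndex * ENTRY_SIZE);
        return new long[] { queue.getLong(pos), queue.getInt(pos + 8), queue.getLong(pos + 12) };
    }
}
```

Because every entry has the same size, the n-th entry can be located with a single multiplication, which is what lets a consumer jump straight to its position in the queue.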

What the ConsumeQueue is for
offsetTable.offset
A quick look at how it is built
public void run() {
    while (!this.isStopped()) {
        try {
            Thread.sleep(1);
            this.doReput(); // build the ConsumeQueue
        } catch (Exception e) {
            DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
}
private void doReput() {
    for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
        // Fetch all data newly written to the CommitLog
        SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
        if (result != null) {
            try {
                this.reputFromOffset = result.getStartOffset();

                for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                    // Read the messages one at a time
                    DispatchRequest dispatchRequest =
                        DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                    int size = dispatchRequest.getMsgSize();

                    if (dispatchRequest.isSuccess()) {
                        if (size > 0) {
                            // Dispatch the message for processing, which includes building the ConsumeQueue
                            DefaultMessageStore.this.doDispatch(dispatchRequest);
                            this.reputFromOffset += size;
                            readSize += size;
                        } else if (size == 0) {
                            this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                            readSize = result.getSize();
                        }
                    } else if (!dispatchRequest.isSuccess()) {
                        // Failed to read the message
                        if (size > 0) {
                            log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                            this.reputFromOffset += size;
                        } else {
                            doNext = false;
                            if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                this.reputFromOffset += result.getSize() - readSize;
                            }
                        }
                    }
                }
            } finally {
                result.release();
            }
        } else {
            doNext = false;
        }
    }
}
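The loop above can be boiled down to a toy model: walk the log from the last reput offset, read one record at a time, and hand each record's offset and size to a dispatcher. The sketch below is a simplification under stated assumptions, not RocketMQ's implementation. The record layout (a 4-byte total size followed by the payload) and all names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class ReputSketch {
    // Hypothetical record layout: [4-byte total size][payload bytes].
    // Returns the offset at which the record was written.
    static int append(ByteBuffer log, String payload) {
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        int offset = log.position();
        log.putInt(4 + body.length);
        log.put(body);
        return offset;
    }

    // Walk the log from 'from' up to 'writePos' and collect {offset, size}
    // for each record, the same pair a ConsumeQueue entry would need.
    static List<long[]> reput(ByteBuffer log, int from, int writePos) {
        List<long[]> dispatched = new ArrayList<>();
        int offset = from;
        while (offset + 4 <= writePos) {
            int size = log.getInt(offset);
            if (size <= 0) {
                break; // nothing valid here; stop scanning
            }
            dispatched.add(new long[] { offset, size });
            offset += size; // advance to the next record
        }
        return dispatched;
    }
}
```

Calling reput again with the offset where the previous pass stopped picks up only the records written since then, which is the role reputFromOffset plays in the real code.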
The indexFile file
File structure

File details
// 8 bytes: store time (flush-to-disk time) of the first message in this index file
this.byteBuffer.putLong(beginTimestampIndex, this.beginTimestamp.get());
// 8 bytes: store time (flush-to-disk time) of the last message in this index file
this.byteBuffer.putLong(endTimestampIndex, this.endTimestamp.get());
// 8 bytes: physical offset in the CommitLog (the message storage file) of the first message in this index file (the message can be fetched directly through this offset)
this.byteBuffer.putLong(beginPhyoffsetIndex, this.beginPhyOffset.get());
// 8 bytes: physical offset in the CommitLog (the message storage file) of the last message in this index file
this.byteBuffer.putLong(endPhyoffsetIndex, this.endPhyOffset.get());
// 4 bytes: number of hash slots currently in use in this index file
this.byteBuffer.putInt(hashSlotcountIndex, this.hashSlotCount.get());
// 4 bytes: number of index entries currently in this index file
this.byteBuffer.putInt(indexCountIndex, this.indexCount.get());
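Adding up the field sizes above (8 + 8 + 8 + 8 + 4 + 4) gives a 40-byte header. A minimal sketch of that layout follows. The byte offsets are derived from those sizes in order; the class and constant names are illustrative, and a heap ByteBuffer stands in for the mapped file.

```java
import java.nio.ByteBuffer;

final class IndexHeaderSketch {
    static final int HEADER_SIZE = 40;

    // Field offsets, derived from the field sizes in declaration order.
    static final int BEGIN_TIMESTAMP = 0;   // 8 bytes
    static final int END_TIMESTAMP = 8;     // 8 bytes
    static final int BEGIN_PHY_OFFSET = 16; // 8 bytes
    static final int END_PHY_OFFSET = 24;   // 8 bytes
    static final int HASH_SLOT_COUNT = 32;  // 4 bytes
    static final int INDEX_COUNT = 36;      // 4 bytes

    // Write all six header fields into a fresh 40-byte buffer.
    static ByteBuffer write(long beginTs, long endTs, long beginPhy, long endPhy,
                            int hashSlotCount, int indexCount) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE);
        buf.putLong(BEGIN_TIMESTAMP, beginTs);
        buf.putLong(END_TIMESTAMP, endTs);
        buf.putLong(BEGIN_PHY_OFFSET, beginPhy);
        buf.putLong(END_PHY_OFFSET, endPhy);
        buf.putInt(HASH_SLOT_COUNT, hashSlotCount);
        buf.putInt(INDEX_COUNT, indexCount);
        return buf;
    }
}
```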
// Where the slot's data lives: 40 + (keyHash % 5,000,000) * 4
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
// Slot Table
// 4 bytes
// Records the current index for this slot; on a hash collision (that is, the same absSlotPos) it becomes the previous index of the next entry added to this slot
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
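// Index Linked List
// Hash of topic + message key
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
// Physical offset of the message in the CommitLog; the message can be looked up directly from it (the core of the indexing mechanism)
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
// Difference between the message's store time and beginTimestamp in the header (this saves space; storing the message's store time directly would take 8 bytes)
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
// 9. Record the previous index for this slot
// This is the key to handling hash collisions: the index of the previous entry with the same hash value. If the current entry is the first one for that hash value, prevIndex = 0, which is also the stop condition when searching. The first entry in every slot has a prevIndex of 0.
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);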
The structure revisited

Query flow
Formula 1: the n-th slot starts at this position in the indexFile: 40 + (n-1)*4
Formula 2: the s-th index entry starts at this position in the indexFile: 40 + 5000000*4 + (s-1)*20
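The two formulas translate directly into code. The sketch below uses the article's 1-based n and s and the constants that appear in the formulas (a 40-byte header, 4-byte slots, 5,000,000 slots, 20-byte index entries). The class and method names are made up for illustration.

```java
final class IndexFilePositions {
    static final int HEADER_SIZE = 40;      // bytes in the file header
    static final int SLOT_SIZE = 4;         // bytes per hash slot
    static final int INDEX_SIZE = 20;       // bytes per index entry
    static final int SLOT_COUNT = 5_000_000;

    // Formula 1: start of the n-th slot (n counts from 1).
    static long slotPosition(int n) {
        return HEADER_SIZE + (long) (n - 1) * SLOT_SIZE;
    }

    // Formula 2: start of the s-th index entry (s counts from 1).
    static long indexPosition(int s) {
        return HEADER_SIZE + (long) SLOT_COUNT * SLOT_SIZE + (long) (s - 1) * INDEX_SIZE;
    }
}
```

For example, the first slot starts right after the header at byte 40, and the first index entry starts at byte 20,000,040, immediately after the last slot.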
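It consists of the current machine's IP + the process ID + the hashCode of MessageClientIDSetter.class.getClassLoader() + the difference between the message's production time and the broker's start time + an int that increases monotonically from 0 after the broker starts. The first three parts can obviously repeat. Of the last two, one is a time difference and the other resets to zero on restart, so they can repeat as well.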
A quick look at the source code; if you're interested, download the source and dig in
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
    // 1. Check that this index file holds fewer entries than the maximum; if the count is >= the maximum, IndexService will try to create a new index file
    if (this.indexHeader.getIndexCount() < this.indexNum) {
        // 2. Compute the hash of the message key
        int keyHash = indexKeyHashMethod(key);
        // 3. Map the hash of the message key to a hash slot
        int slotPos = keyHash % this.hashSlotNum;
        // 4. Compute the actual position of that hash slot in the file
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

        try {
            // 5. Read the value stored in the slot at absSlotPos
            // There are two cases:
            // 1) slot = 0: the current message key is the first index entry for this hash value
            // 2) slot > 0: the position of the previous index entry for this key hash
            int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
            // 6. Validate and correct the data
            if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                slotValue = invalidIndex;
            }

            long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
            timeDiff = timeDiff / 1000;

            if (this.indexHeader.getBeginTimestamp() <= 0) {
                timeDiff = 0;
            } else if (timeDiff > Integer.MAX_VALUE) {
                timeDiff = Integer.MAX_VALUE;
            } else if (timeDiff < 0) {
                timeDiff = 0;
            }

            // 7. Compute where the current index entry will be stored (append mode)
            int absIndexPos =
                IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                    + this.indexHeader.getIndexCount() * indexSize;

            // 8. Write the index entry
            this.mappedByteBuffer.putInt(absIndexPos, keyHash);
            this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);

            // 9. The key step: store the current entry's index in this key's hash slot. The next search by this key
            // goes key hash slot -> slot value -> curIndex ->
            // if (curIndex.prevIndex > 0) previous index (keep looping until curIndex.prevIndex == 0)
            this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

            if (this.indexHeader.getIndexCount() <= 1) {
                this.indexHeader.setBeginPhyOffset(phyOffset);
                this.indexHeader.setBeginTimestamp(storeTimestamp);
            }

            this.indexHeader.incHashSlotCount();
            this.indexHeader.incIndexCount();
            this.indexHeader.setEndPhyOffset(phyOffset);
            this.indexHeader.setEndTimestamp(storeTimestamp);

            return true;
        } catch (Exception e) {
            log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
        }
    } else {
        log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
            + "; index max num = " + this.indexNum);
    }

    return false;
}
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
    final long begin, final long end, boolean lock) {
    if (this.mappedFile.hold()) {
        // 1. Compute the hash of the key
        int keyHash = indexKeyHashMethod(key);
        // 2. Compute the hash slot that this hash value maps to
        int slotPos = keyHash % this.hashSlotNum;
        // 3. Compute the physical file position of that hash slot
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

        FileLock fileLock = null;
        try {
            // 4. Read the value of the hash slot
            int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
            // 5. slot value <= 0 means there is no index entry for this key, so the search ends
            //    slot value > maxIndexCount means the entry exceeds the maximum and the data is invalid, so the search ends
            if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
                || this.indexHeader.getIndexCount() <= 1) {
            } else {
                // 6. Start searching from the current slot value
                for (int nextIndexToRead = slotValue; ; ) {
                    if (phyOffsets.size() >= maxNum) {
                        break;
                    }
                    // 7. Locate the physical file position for the current slot value (that is, the index count)
                    int absIndexPos =
                        IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                            + nextIndexToRead * indexSize;
                    // 8. Read the index entry
                    int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
                    long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
                    long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
                    // 9. Read the index of the previous entry (think of it as a linked list's prev pointer to the previous node)
                    int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
                    // 10. Validate the data
                    if (timeDiff < 0) {
                        break;
                    }

                    timeDiff *= 1000L;

                    long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
                    boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
                    // 11. Compare the hash value and the store time
                    if (keyHash == keyHashRead && timeMatched) {
                        phyOffsets.add(phyOffsetRead);
                    }
                    // Stop searching when prevIndex <= 0, prevIndex > maxIndexCount, prevIndexRead == nextIndexToRead, or timeRead < begin
                    if (prevIndexRead <= invalidIndex
                        || prevIndexRead > this.indexHeader.getIndexCount()
                        || prevIndexRead == nextIndexToRead || timeRead < begin) {
                        break;
                    }

                    nextIndexToRead = prevIndexRead;
                }
            }
        } catch (Exception e) {
            log.error("selectPhyOffset exception ", e);
        } finally {
            this.mappedFile.release();
        }
    }
}
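To see the slot table and the backward-linked index list working together, here is a stripped-down, in-memory sketch of the same idea. It is a toy under stated assumptions, not RocketMQ's IndexFile: it keeps the 40-byte header gap, 4-byte slots, and 20-byte entries, but skips the timestamp handling and the header updates. ToyIndexFile and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class ToyIndexFile {
    static final int HEADER_SIZE = 40, SLOT_SIZE = 4, INDEX_SIZE = 20;
    private final int slotNum;
    private final int indexNum;
    private final ByteBuffer buf;
    private int indexCount = 1; // index 0 is reserved to mean "no entry"

    ToyIndexFile(int slotNum, int indexNum) {
        this.slotNum = slotNum;
        this.indexNum = indexNum;
        this.buf = ByteBuffer.allocate(HEADER_SIZE + slotNum * SLOT_SIZE + indexNum * INDEX_SIZE);
    }

    // Non-negative hash of the key (Math.abs(Integer.MIN_VALUE) stays negative).
    static int hash(String key) {
        int abs = Math.abs(key.hashCode());
        return abs < 0 ? 0 : abs;
    }

    boolean put(String key, long phyOffset) {
        if (indexCount >= indexNum) {
            return false; // file is full
        }
        int keyHash = hash(key);
        int absSlotPos = HEADER_SIZE + (keyHash % slotNum) * SLOT_SIZE;
        int prevIndex = buf.getInt(absSlotPos); // 0 if the slot is still empty
        int absIndexPos = HEADER_SIZE + slotNum * SLOT_SIZE + indexCount * INDEX_SIZE;
        buf.putInt(absIndexPos, keyHash);
        buf.putLong(absIndexPos + 4, phyOffset);
        buf.putInt(absIndexPos + 12, 0);          // time diff omitted in this toy
        buf.putInt(absIndexPos + 16, prevIndex);  // link back to the previous entry
        buf.putInt(absSlotPos, indexCount);       // slot now points at the newest entry
        indexCount++;
        return true;
    }

    // Follow the chain from the slot backwards, newest entry first.
    List<Long> select(String key, int maxNum) {
        List<Long> result = new ArrayList<>();
        int keyHash = hash(key);
        int next = buf.getInt(HEADER_SIZE + (keyHash % slotNum) * SLOT_SIZE);
        while (next > 0 && next < indexCount && result.size() < maxNum) {
            int absIndexPos = HEADER_SIZE + slotNum * SLOT_SIZE + next * INDEX_SIZE;
            if (buf.getInt(absIndexPos) == keyHash) {
                result.add(buf.getLong(absIndexPos + 4));
            }
            next = buf.getInt(absIndexPos + 16);
        }
        return result;
    }
}
```

Because the slot always points at the newest entry and each entry points at the one before it, a lookup returns offsets newest first. Entries that merely share a slot are skipped by comparing the stored hash.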
With the Way but without technique, technique can still be acquired; with technique but without the Way, you stop at technique.
You're welcome to follow the Java之道 official account.
