圖文并茂!深入了解RocketMQ的過(guò)期刪除機(jī)制
大家好,我是Leo。
今天聊一下RocketMQ的文件過(guò)期刪除機(jī)制
本章概括

源碼定位
Broker是RocketMQ的核心,提供了消息的接收,存儲(chǔ),拉取等功能
我們可以先從Broker服務(wù)入手。從源碼可以得知。RocketMQ啟用了一個(gè)?BrokerController?的?start?函數(shù)
public?static?void?main(String[]?args)?{
????start(createBrokerController(args));
}
public?static?BrokerController?start(BrokerController?controller)?{
????try?{
????????controller.start();
????????String?tip?=?"The?broker[";
????????if?(null?!=?controller.getBrokerConfig().getNamesrvAddr())?{
????????????//?日志拼接
????????}
????????log.info(tip);
????????System.out.printf("%s%n",?tip);
????????return?controller;
????}?catch?(Throwable?e)?{
????????e.printStackTrace();
????????System.exit(-1);
????}
????return?null;
}
下列是start?函數(shù)啟動(dòng)的異步線程,他啟動(dòng)了一個(gè)?messageStore
public?void?start()?throws?Exception?{
????????if?(this.messageStore?!=?null)?{
????????????this.messageStore.start();
????????}
????}
從?messageStore.start()?函數(shù)進(jìn)入后會(huì)有一個(gè)消息存儲(chǔ)的第三方接口。
public?interface?MessageStore?{
????/**
?????*?Load?previously?stored?messages.
?????*
?????*?@return?true?if?success;?false?otherwise.
?????*/
????boolean?load();
????/**
?????*?Launch?this?message?store.
?????*
?????*?@throws?Exception?if?there?is?any?error.
?????*/
????void?start()?throws?Exception;
}
繼續(xù)圍繞?start?函數(shù)展開實(shí)現(xiàn)類查找,可以看到最終由?DefaultMessageStore?實(shí)現(xiàn)類實(shí)現(xiàn)

定位到具體問(wèn)題之后,可以看到?start?調(diào)用了一個(gè)?addScheduleTask?函數(shù)
這個(gè)函數(shù)主要處理的就是清除過(guò)期日志服務(wù)。
?public?void?start()?throws?Exception?{
?????//刷新ConsumeQueue的服務(wù)啟動(dòng)
?????this.flushConsumeQueueService.start();
?????//CommitLog刷新的服務(wù)啟動(dòng)
?????this.commitLog.start();
?????//存儲(chǔ)狀態(tài)檢測(cè)的服務(wù)啟動(dòng)
?????this.storeStatsService.start();
?????//創(chuàng)建臨時(shí)文件,來(lái)表示是否正常關(guān)機(jī)
?????this.createTempFile();
?????//啟動(dòng)其他服務(wù)。比如清除過(guò)期日志的服務(wù)等
?????this.addScheduleTask();
?????this.shutdown?=?false;
?}
這篇文件聊的就是這個(gè)?addScheduleTask?函數(shù)。言歸正傳,步入正題!
流程圖

過(guò)期刪除機(jī)制
文件過(guò)期刪除
首次執(zhí)行時(shí)間是60000毫秒=60秒。其余間隔執(zhí)行都是每10秒執(zhí)行一次刪除。
//?資源回收間隔
private?int?cleanResourceInterval?=?10000;
/**
??*?{}要執(zhí)行的任務(wù)
??*?1.延遲第一次執(zhí)行的時(shí)間
??*?2.兩次執(zhí)行之間的時(shí)間?10000?資源回收間隔
??*?3.毫秒
*/
this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{
???????@Override
???????public?void?run()?{
?????????????DefaultMessageStore.this.cleanFilesPeriodically();
???????}
//?延遲第一次執(zhí)行的時(shí)間
},?1000?*?60,?this.messageStoreConfig.getCleanResourceInterval(),?TimeUnit.MILLISECONDS);

對(duì)于刪除過(guò)期的時(shí)機(jī)包括以下3種:
- 默認(rèn)凌晨4點(diǎn)。這個(gè)也比較好理解,這個(gè)時(shí)候用的人也比較少,刪除對(duì)系統(tǒng)的影響就降到最小。
- 磁盤空間不足。當(dāng)磁盤空間不足的時(shí)候,就要?jiǎng)h除過(guò)期文件以提供更多的空間出來(lái)接收消息。
- 人工觸發(fā),指人為的介入去刪除。

刪除的文件是過(guò)期文件,那哪些文件是過(guò)期的呢?
首先是保留時(shí)間,默認(rèn)72小時(shí),也就是3天,超過(guò)3天的數(shù)據(jù),是需要?jiǎng)h除的。
deleteExpiredFiles?是用于刪除過(guò)期文件。執(zhí)行步驟如下:
- 首先是需要判斷是否需要?jiǎng)h除文件,通過(guò)兩個(gè)方法的調(diào)用
isTimeToDelete和isSpaceToDelete判斷是否達(dá)到定時(shí)刪除時(shí)間以及是否磁盤已滿需要?jiǎng)h除,以及判斷屬性DefaultMessageStore.CleanCommitLogService#manualDeleteFileSeveralTimes是否大于0意味著需要手動(dòng)刪除。如果這三個(gè)條件任意為真,意味著需要執(zhí)行刪除,那就繼續(xù)后續(xù)的流程。否則結(jié)束當(dāng)前方法。 - 如果是手動(dòng)刪除,則屬性
DefaultMessageStore.CleanCommitLogService#manualDeleteFileSeveralTimes減1. - 如果屬性
MessageStoreConfig#cleanFileForciblyEnable和DefaultMessageStore.CleanCommitLogService#cleanImmediately為真,聲明cleanAtOnece為true,否則為false。 - 調(diào)用方法?
CommitLog#deleteExpiredFile?進(jìn)行文件刪除。方法需要4個(gè)入?yún)?,分別是:- expiredTime:過(guò)期時(shí)間或者說(shuō)文件刪除前的保留時(shí)間,默認(rèn)為72小時(shí)。
- deleteFilesInterval:文件刪除間隔,這里取值為100.
- intervalForcibly:該參數(shù)用于強(qiáng)制文件強(qiáng)制釋放時(shí)間間隔,單位是毫秒。這里取值為120*1000,
- cleanImmediately:是否立即執(zhí)行刪除,這邊使用的就是步驟3中的數(shù)據(jù)。
/**
?*?刪除已經(jīng)失效的
?*/
private?void?deleteExpiredFiles()?{
????int?deleteCount?=?0;
????//?文件保留時(shí)長(zhǎng)?72
????long?fileReservedTime?=?DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
????//?100
????int?deletePhysicFilesInterval?=?DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
????//?1000?*?120?=?120000毫秒?=?120秒
????int?destroyMapedFileIntervalForcibly?=?DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
????//?判斷有沒(méi)到凌晨4點(diǎn)
????boolean?timeup?=?this.isTimeToDelete();
????//?空間是否上限
????boolean?spacefull?=?this.isSpaceToDelete();
????//?手動(dòng)刪除??經(jīng)過(guò)20次的調(diào)度
????boolean?manualDelete?=?this.manualDeleteFileSeveralTimes?>?0;
????if?(timeup?||?spacefull?||?manualDelete)?{
????????if?(manualDelete)
????????????this.manualDeleteFileSeveralTimes--;
????????boolean?cleanAtOnce?=?DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable()?&&?this.cleanImmediately;
????????log.info("begin?to?delete?before?{}?hours?file.?timeup:?{}?spacefull:?{}?manualDeleteFileSeveralTimes:?{}?cleanAtOnce:?{}",?fileReservedTime,?timeup,?spacefull,?manualDeleteFileSeveralTimes,?cleanAtOnce);
????????fileReservedTime?*=?60?*?60?*?1000;
????????deleteCount?=?DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime,?deletePhysicFilesInterval,?destroyMapedFileIntervalForcibly,?cleanAtOnce);
????????if?(deleteCount?>?0)?{}?else?if?(spacefull)?{
????????????//?刪除文件失敗
????????????log.warn("disk?space?will?be?full?soon,?but?delete?file?failed.");
????????}
????}
}
如果這個(gè)文件被其他線程引用了,此時(shí)就不會(huì)進(jìn)行刪除,記錄第一次刪除的時(shí)間戳,退出本次任務(wù),等120s后,就會(huì)把文件的引用減1000,再?gòu)?qiáng)制刪除。
在刪除的過(guò)程中,會(huì)存在刪除多個(gè)文件的情況,每個(gè)文件之間,還有一個(gè)時(shí)間間隔,比如第一個(gè)文件刪除完后,需要等100ms再刪除第二個(gè)文件。
120s可以通過(guò) destroyMapedFileIntervalForcibly 得知
100ms可以通過(guò) deletePhysicFilesInterval 得知

如果當(dāng)前刪除的文件數(shù)量,已經(jīng)超過(guò)了可以刪除的最大批量數(shù),則退出本次任務(wù)??梢酝ㄟ^(guò)上述代碼中的?spacefull?得出
/**
?*?根據(jù)時(shí)間刪除過(guò)期文件
?*?@param?expiredTime?保留時(shí)長(zhǎng)??一般是?72
?*?@param?deleteFilesInterval?刪除間隔?100
?*?@param?intervalForcibly?120秒?延遲
?*?@param?cleanImmediately?是否強(qiáng)制啟用
?*?@return
?*/
public?int?deleteExpiredFileByTime(final?long?expiredTime,?final?int?deleteFilesInterval,?final?long?intervalForcibly,?final?boolean?cleanImmediately)?{
????//獲取映射文件列表?commitlog文件可能隨時(shí)有寫入,copy一份不影響寫入
????Object[]?mfs?=?this.copyMappedFiles(0);
????//如果映射文件列表為空直接返回
????if?(null?==?mfs)
????????return?0;
????int?mfsLength?=?mfs.length?-?1;
????int?deleteCount?=?0;
????//?存放要?jiǎng)h除的MappedFile
????List??files?=?new?ArrayList??();
????if?(null?!=?mfs)?{
????????//對(duì)映射文件進(jìn)行遍歷
????????for?(int?i?=?0;?i?????????????MappedFile?mappedFile?=?(MappedFile)?mfs[i];
????????????//文件最后的修改時(shí)間?+?過(guò)期時(shí)間?=?文件最終能夠存活的時(shí)間
????????????long?liveMaxTimestamp?=?mappedFile.getLastModifiedTimestamp()?+?expiredTime;
????????????//?如果文件最新修改已經(jīng)超過(guò)三天或者是磁盤空間達(dá)到85%以上??而要在此之前需要滿足3個(gè)條件之一,時(shí)間,容量,和手動(dòng)觸發(fā)
????????????if?(System.currentTimeMillis()?>=?liveMaxTimestamp?||?cleanImmediately)?{
????????????????//刪除文件,就是解除對(duì)文件的引用
????????????????if?(mappedFile.destroy(intervalForcibly))?{
????????????????????//要?jiǎng)h除的的文件加入到要?jiǎng)h除的集合中
????????????????????files.add(mappedFile);
????????????????????//增加計(jì)數(shù)
????????????????????deleteCount++;
????????????????????if?(files.size()?>=?DELETE_FILES_BATCH_MAX)?{
????????????????????????break;
????????????????????}
????????????????????//如果刪除時(shí)間間隔大于0,并且沒(méi)有循環(huán)玩,則睡眠指定的刪除間隔時(shí)長(zhǎng)后在殺出
????????????????????if?(deleteFilesInterval?>?0?&&?(i?+?1)?????????????????????????try?{
????????????????????????????Thread.sleep(deleteFilesInterval);
????????????????????????}?catch?(InterruptedException?e)?{}
????????????????????}
????????????????}?else?break;
????????????}?else?{
????????????????//?避免在中間刪除文件
????????????????break;
????????????}
????????}
????}
????//從文件映射隊(duì)列中刪除對(duì)應(yīng)的文件映射
????deleteExpiredFile(files);
????//返回刪除的文件個(gè)數(shù)
????return?deleteCount;
}
由?timeup?變量我們可以引申出 isTimeToDelete函數(shù)
RocketMQ會(huì)配置執(zhí)行刪除工作的時(shí)間,默認(rèn)是早上四點(diǎn)。如果當(dāng)前時(shí)間在04:00~04:59之間,就返回true。
/**
?*?判斷時(shí)間是否到?凌晨4點(diǎn)
?*?@return
?*/
private?boolean?isTimeToDelete()?{
????//?04
????String?when?=?DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen();
????if?(UtilAll.isItTimeToDo(when))?{
????????DefaultMessageStore.log.info("it's?time?to?reclaim?disk?space,?"?+?when);
????????return?true;
????}
????return?false;
}
由?spacefull?變量我們可以引申出 isSpaceToDelete函數(shù)
判斷磁盤空間是否滿足刪除的條件,判斷要求如下:
- 使用提交日志的路徑,檢查其所在的磁盤空間的使用率。默認(rèn)情況下,使用率超過(guò)90%,設(shè)置磁盤不可用標(biāo)志位,并且設(shè)置屬性
DefaultMessageStore.CleanCommitLogService#cleanImmediately為true。使用率超過(guò)85%,設(shè)置屬性DefaultMessageStore.CleanCommitLogService#cleanImmediately為true。其他情況,設(shè)置運(yùn)行狀態(tài)位為磁盤可用。 - 磁盤使用率小于0或者大于屬性
MessageStoreConfig#diskMaxUsedSpaceRatio的要求,默認(rèn)是75%,則返回true給調(diào)用。 - 針對(duì)消費(fèi)隊(duì)列的文件路徑,上述步驟重復(fù)一次。
- 如果步驟1~3都沒(méi)有返回true,則返回false給調(diào)用者。意味著此時(shí)磁盤空間有剩余,不要求刪除。
/**
?*?空間是否上限
?*?@return
?*/
private?boolean?isSpaceToDelete()?{
????double?ratio?=?DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio()?/?100.0;
????cleanImmediately?=?false;
????{
????????String?commitLogStorePath?=?DefaultMessageStore.this.getStorePathPhysic();
????????String[]?storePaths?=?commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
????????Set??fullStorePath?=?new?HashSet?>?();
????????double?minPhysicRatio?=?100;
????????String?minStorePath?=?null;
????????for?(String?storePathPhysic:?storePaths)?{
????????????double?physicRatio?=?UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
????????????if?(minPhysicRatio?>?physicRatio)?{
????????????????minPhysicRatio?=?physicRatio;
????????????????minStorePath?=?storePathPhysic;
????????????}
????????????if?(physicRatio?>?diskSpaceCleanForciblyRatio)?{
????????????????fullStorePath.add(storePathPhysic);
????????????}
????????}
????????DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath);
????????if?(minPhysicRatio?>?diskSpaceWarningLevelRatio)?{
????????????boolean?diskok?=?DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
????????????if?(diskok)?{
????????????????DefaultMessageStore.log.error("physic?disk?maybe?full?soon?"?+?minPhysicRatio?+
????????????????????",?so?mark?disk?full,?storePathPhysic="?+?minStorePath);
????????????}
????????????cleanImmediately?=?true;
????????}?else?if?(minPhysicRatio?>?diskSpaceCleanForciblyRatio)?{
????????????cleanImmediately?=?true;
????????}?else?{
????????????boolean?diskok?=?DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
????????????if?(!diskok)?{
????????????????DefaultMessageStore.log.info("physic?disk?space?OK?"?+?minPhysicRatio?+?",?so?mark?disk?ok,?storePathPhysic="?+?minStorePath);
????????????}
????????}
????????if?(minPhysicRatio?0?||?minPhysicRatio?>?ratio)?{
????????????DefaultMessageStore.log.info("physic?disk?maybe?full?soon,?so?reclaim?space,?"?+?minPhysicRatio?+?",?storePathPhysic="?+?minStorePath);
????????????return?true;
????????}
????}
????{
????????String?storePathLogics?=?DefaultMessageStore.this.getStorePathLogic();
????????double?logicsRatio?=?UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
????????if?(logicsRatio?>?diskSpaceWarningLevelRatio)?{
????????????boolean?diskok?=?DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
????????????if?(diskok)?{
????????????????DefaultMessageStore.log.error("logics?disk?maybe?full?soon?"?+?logicsRatio?+?",?so?mark?disk?full");
????????????}
????????????cleanImmediately?=?true;
????????}?else?if?(logicsRatio?>?diskSpaceCleanForciblyRatio)?{
????????????cleanImmediately?=?true;
????????}?else?{
????????????boolean?diskok?=?DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
????????????if?(!diskok)?{
????????????????DefaultMessageStore.log.info("logics?disk?space?OK?"?+?logicsRatio?+?",?so?mark?disk?ok");
????????????}
????????}
????????if?(logicsRatio?0?||?logicsRatio?>?ratio)?{
????????????DefaultMessageStore.log.info("logics?disk?maybe?full?soon,?so?reclaim?space,?"?+?logicsRatio);
????????????return?true;
????????}
????}
????return?false;
}
消費(fèi)隊(duì)列過(guò)期刪除
CLeanConsumeQueueService的run方法就是直接委托這個(gè)方法來(lái)實(shí)現(xiàn)。這個(gè)方法的作用就是刪除無(wú)效的消費(fèi)隊(duì)列條目?jī)?nèi)容或者文件本身。其代碼邏輯如下:
- 通過(guò)方法
CommitLog#getMinOffset獲取提交日志最小的偏移量,聲明為minOffset。 - 如果
minOffset大于類屬性lastPhysicalMinOffset,那么意味著當(dāng)前提交日志的最小偏移量對(duì)比上一次查詢的值發(fā)生了變化,也就是說(shuō)必然是最少一個(gè)提交日志文件被刪除,那么相應(yīng)的在消費(fèi)隊(duì)列中的過(guò)期數(shù)據(jù)也可以被刪除,就執(zhí)行后面的流程。反之,則意味著不需要執(zhí)行任何操作,結(jié)束方法即可。 - 將
minOffset賦值給lastPhysicalMinOffset。 - 對(duì)屬性
consumeQueueTable進(jìn)行遍歷,遍歷其中每一個(gè)ConsumeQueue對(duì)象。使用本次的minOffset作為入?yún)?,調(diào)用方法ConsumeQueue#deleteExpiredFile刪除過(guò)期的消費(fèi)隊(duì)列文件以及更新消費(fèi)隊(duì)列的最小偏移量。如果有刪除到文件,則休眠MessageStoreConfig#deleteConsumeQueueFilesInterval配置的時(shí)間,繼續(xù)對(duì)下一個(gè)消費(fèi)隊(duì)列執(zhí)行刪除。 - 當(dāng)循環(huán)執(zhí)行完畢,使用參數(shù)
minOffset作為入?yún)?,調(diào)用方法IndexService#deleteExpiredFile(long)來(lái)刪除索引文件中已經(jīng)完全無(wú)效的索引文件。
public?void?run()?{
????try?{
????????this.deleteExpiredFiles();
????}?catch?(Throwable?e)?{
????????DefaultMessageStore.log.warn(this.getServiceName()?+?"?service?has?exception.?",?e);
????}
}
private?void?deleteExpiredFiles()?{
????//?0.1秒
????int?deleteLogicsFilesInterval?=?DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();
????//?得到commitlog中第一個(gè)文件的起始物理offset
????long?minOffset?=?DefaultMessageStore.this.commitLog.getMinOffset();
????if?(minOffset?>?this.lastPhysicalMinOffset)?{
????????//?發(fā)現(xiàn)上次的已經(jīng)變小了???說(shuō)明commitlog已經(jīng)發(fā)生過(guò)刪除操作了
????????this.lastPhysicalMinOffset?=?minOffset;
????????ConcurrentMap?>?tables?=?DefaultMessageStore.this.consumeQueueTable;
????????for?(ConcurrentMap??maps:?tables.values())?{
????????????for?(ConsumeQueue?logic:?maps.values())?{
????????????????//?對(duì)某一個(gè)消費(fèi)隊(duì)列做刪除??參數(shù)是commitlog最小的物理點(diǎn)位
????????????????int?deleteCount?=?logic.deleteExpiredFile(minOffset);
????????????????if?(deleteCount?>?0?&&?deleteLogicsFilesInterval?>?0)?{
????????????????????try?{
????????????????????????//?當(dāng)上一個(gè)ConsumeQueue成功刪除之后,下一個(gè)ConsumeQueue刪除需要等待0.1s
????????????????????????Thread.sleep(deleteLogicsFilesInterval);
????????????????????}?catch?(InterruptedException?ignored)?{
????????????????????}
????????????????}
????????????}
????????}
????????//?刪除索引文件
????????DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
????}
}
索引文件刪除
索引文件的刪除是在消費(fèi)隊(duì)列刪除完成后,調(diào)用方法?deleteExpiredFile?完成的。
該方法是用于刪除索引文件中的無(wú)效文件。執(zhí)行流程如下:
- 首先需要確認(rèn),索引文件中是否存在無(wú)效文件。獲取第一個(gè)索引文件,獲取其
endPhyOffset屬性,判斷該屬性的值是否小于入?yún)⒌?code style="font-size:14px;font-family:'Operator Mono', Consolas, Monaco, Menlo, monospace;color:rgb(239,112,96);background-color:rgba(27,31,35,.05);">offset。如果是的話,至少意味著有一個(gè)文件是無(wú)效的,則執(zhí)行后續(xù)流程。否則沒(méi)有無(wú)效文件,則直接結(jié)束整個(gè)方法。 - 聲明一個(gè)局部變量
fileList,遍歷索引文件IndexFile對(duì)象,如果其endPhyOffset小于入?yún)⒌?code style="font-size:14px;font-family:'Operator Mono', Consolas, Monaco, Menlo, monospace;color:rgb(239,112,96);background-color:rgba(27,31,35,.05);">offset,說(shuō)明該文件是無(wú)效的,添加到fileList中。 - 使用第二步的
fileList作為入?yún)ⅲ{(diào)用方法IndexService#deleteExpiredFile(List)。該方法內(nèi)部調(diào)用了IndexFile#destory方法,內(nèi)部也是委托了MappedFile#destory方法實(shí)現(xiàn)的文件銷毀。并且刪除成功的IndexFile還會(huì)從屬性indexFileList列表中刪除對(duì)應(yīng)的對(duì)象。
/**
?*?刪除索引文件
?*?@param?offset
?*/
public?void?deleteExpiredFile(long?offset)?{
????Object[]?files?=?null;
????try?{
????????this.readWriteLock.readLock().lock();
????????if?(this.indexFileList.isEmpty())?{
????????????return;
????????}
????????long?endPhyOffset?=?this.indexFileList.get(0).getEndPhyOffset();
????????if?(endPhyOffset?????????????files?=?this.indexFileList.toArray();
????????}
????}?catch?(Exception?e)?{
????????log.error("destroy?exception",?e);
????}?finally?{
????????this.readWriteLock.readLock().unlock();
????}
????if?(files?!=?null)?{
????????List??fileList?=?new?ArrayList??();
????????for?(int?i?=?0;?i?(files.length?-?1);?i++)?{
????????????IndexFile?f?=?(IndexFile)?files[i];
????????????if?(f.getEndPhyOffset()?????????????????fileList.add(f);
????????????}?else?{
????????????????break;
????????????}
????????}
????????this.deleteExpiredFile(fileList);
????}
}
文件恢復(fù)機(jī)制
從源碼定位中,我們可以看到執(zhí)行?./mqbroker?命令后,會(huì)啟動(dòng)main函數(shù)的 createBrokerController函數(shù)。
在函數(shù)中調(diào)用了一個(gè)?initialize?初始化 ,我們?cè)诔跏蓟瘮?shù)中找到了?this.messageStore.load
public?static?void?main(String[]?args)?{
????start(createBrokerController(args));
}
public?static?BrokerController?createBrokerController(String[]?args)?{
????boolean?initResult?=?controller.initialize();
????if?(!initResult)?{
????????controller.shutdown();
????????System.exit(-3);
????}
????return?controller;
}?catch?(Throwable?e)?{
????e.printStackTrace();
????System.exit(-1);
}
public?boolean?initialize()?throws?CloneNotSupportedException?{
????result?=?result?&&?this.messageStore.load();
????if?(result)?{
????}
????return?result;
}
這里的?load?和上面的代碼一樣,都是接口實(shí)現(xiàn)類。統(tǒng)一由 DefaultMessageStore 實(shí)現(xiàn)。
所以文件恢復(fù)函數(shù)?recover?從 Broker啟動(dòng)之后,就會(huì)隨之啟動(dòng)。啟動(dòng)之后
- 檢查當(dāng)前文件是否損壞(異常關(guān)閉)或者存不存在 (檢查依據(jù)已在下列代碼的尾部貼出)
- 加載Commit Log 和 Consume Queue文件。加載成功之后進(jìn)行?
recover?文件恢復(fù)
/**
?*?檢查abort文件是不是存在,如果存在表示上次是異常關(guān)閉,這個(gè)文件是一個(gè)空文件,在啟動(dòng)之后會(huì)創(chuàng)建,正常關(guān)閉的情況會(huì)刪除掉。
?*?加載延遲消息相關(guān)的配置,加載?Commit?Log文件,加載Consume?Queue文件
?*?如果步驟2成功加載,則加載checkpoint文件,加載indexFile然后進(jìn)行文件的恢復(fù)邏輯
?*?對(duì)于文件的恢復(fù)邏輯在recover方法中,會(huì)調(diào)用CommitLog類中的方法
?*?@throws?IOException
?*/
public?boolean?load()?{
????boolean?result?=?true;
????try?{
????????//是否存在abort文件,如果存在說(shuō)明上次服務(wù)關(guān)閉時(shí)異常關(guān)閉的
????????boolean?lastExitOK?=?!this.isTempFileExist();
????????log.info("last?shutdown?{}",?lastExitOK???"normally"?:?"abnormally");
????????//?加載?Commit?Log文件
????????result?=?result?&&?this.commitLog.load();
????????//?加載?Consume?Queue文件
????????result?=?result?&&?this.loadConsumeQueue();
????????//檢查前面3個(gè)文件是不是加載成功
????????if?(result)?{
????????????//加載成功則繼續(xù)加載checkpoint文件
????????????this.storeCheckpoint?=?new?StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
????????????//加載indexFile
????????????this.indexService.load(lastExitOK);
????????????//進(jìn)行文件的恢復(fù)邏輯
????????????this.recover(lastExitOK);
????????????log.info("load?over,?and?the?max?phy?offset?=?{}",?this.getMaxPhyOffset());
????????????if?(null?!=?scheduleMessageService)?{
????????????????result?=?this.scheduleMessageService.load();
????????????}
????????}
????}?catch?(Exception?e)?{
????????log.error("load?exception",?e);
????????result?=?false;
????}
????if?(!result)?{
????????this.allocateMappedFileService.shutdown();
????}
????return?result;
}
//?檢查依據(jù)是從這個(gè)路徑中
private?String?storePathRootDir?=?System.getProperty("user.home")?+?File.separator?+?"store";
recover 函數(shù)的實(shí)現(xiàn)邏輯
從 ConsumeQueue文件的集合中取出,從倒數(shù)第三個(gè)文件開始,逐條遍歷消息,如果取出的物理點(diǎn)位大于0并且message的size大于0,說(shuō)明數(shù)據(jù)有效。
恢復(fù)commitlog分正常退出和非正常退出。
正常退出的commitlog所有數(shù)據(jù)都是flush完成的,所以只要從倒數(shù)第三個(gè)文件開始恢復(fù)即可,遍歷每一個(gè)message,并校驗(yàn)其CRC。
非正常退出則從最后一個(gè)文件開始恢復(fù),一般出現(xiàn)問(wèn)題的都是最后一個(gè)文件,然后獲取文件中的第一個(gè)message,其存儲(chǔ)時(shí)間是否小于checkpoint時(shí)間點(diǎn)中的最小的一個(gè),如果是,表示其就是需要恢復(fù)的起始文件。然后檢驗(yàn)每一個(gè)message的CRC,并將通過(guò)校驗(yàn)的數(shù)據(jù)dispatch到consumelog和index文件中。
/**
?*?進(jìn)行文件的恢復(fù)邏輯
?*?@param?lastExitOK
?*/
private?void?recover(final?boolean?lastExitOK)?{
????long?maxPhyOffsetOfConsumeQueue?=?this.recoverConsumeQueue();
????//上次服務(wù)關(guān)閉是不是正常關(guān)閉
????if?(lastExitOK)?{
????????//正常情況關(guān)閉
????????this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
????}?else?{
????????//異常情況關(guān)閉
????????this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
????}
????//恢復(fù)topic消費(fèi)相關(guān)相關(guān)的緩存
????this.recoverTopicQueueTable();
}
/**
?*?計(jì)算恢復(fù)ConsumeQueue文件集合的下標(biāo)?
?*/
private?long?recoverConsumeQueue()?{
????long?maxPhysicOffset?=?-1;
????for?(ConcurrentMap??maps:?this.consumeQueueTable.values())?{
????????for?(ConsumeQueue?logic:?maps.values())?{
????????????logic.recover();
????????????if?(logic.getMaxPhysicOffset()?>?maxPhysicOffset)?{
????????????????maxPhysicOffset?=?logic.getMaxPhysicOffset();
????????????}
????????}
????}
????return?maxPhysicOffset;
}
/**
?*?恢復(fù)topic消費(fèi)相關(guān)相關(guān)的緩存
?*/
public?void?recoverTopicQueueTable()?{
????/*?topic-queueid?*/
????/*?offset?*/
????HashMap??table?=?new?HashMap??(1024);
????long?minPhyOffset?=?this.commitLog.getMinOffset();
????for?(ConcurrentMap??maps:?this.consumeQueueTable.values())?{
????????for?(ConsumeQueue?logic:?maps.values())?{
????????????String?key?=?logic.getTopic()?+?"-"?+?logic.getQueueId();
????????????table.put(key,?logic.getMaxOffsetInQueue());
????????????logic.correctMinOffset(minPhyOffset);
????????}
????}
????this.commitLog.setTopicQueueTable(table);
}
/**
?*?當(dāng)正常退出、數(shù)據(jù)恢復(fù)時(shí),所有內(nèi)存數(shù)據(jù)均已刷新
?*?服務(wù)正?;謴?fù)?加載的映射文件列表進(jìn)行遍歷,對(duì)文件進(jìn)行校驗(yàn),和文件中的消息的魔數(shù)進(jìn)行校驗(yàn),來(lái)判斷哪些數(shù)據(jù)是正常的,
?*?并計(jì)算出正常的數(shù)據(jù)的最大偏移量。然后,根據(jù)偏移量設(shè)置對(duì)應(yīng)的提交和刷新的位置以及不正常數(shù)據(jù)的刪除。
?*/
public?void?recoverNormally(long?maxPhyOffsetOfConsumeQueue)?{
????boolean?checkCRCOnRecover?=?this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
????final?List??mappedFiles?=?this.mappedFileQueue.getMappedFiles();
????if?(!mappedFiles.isEmpty())?{
????????//?Began?to?recover?from?the?last?third?file
????????//如果文件列表大于3就從倒數(shù)第3個(gè)開始,否則從第一個(gè)開始
????????int?index?=?mappedFiles.size()?-?3;
????????if?(index?0)
????????????index?=?0;
????????MappedFile?mappedFile?=?mappedFiles.get(index);
????????ByteBuffer?byteBuffer?=?mappedFile.sliceByteBuffer();
????????long?processOffset?=?mappedFile.getFileFromOffset();
????????long?mappedFileOffset?=?0;
????????while?(true)?{
????????????//校驗(yàn)消息,然后返回轉(zhuǎn)發(fā)請(qǐng)求,根據(jù)Magic_code正確,并且crc32正確,并且消息的msgSize記錄大小和消息整體大小相等。則表示是合格的消息
????????????DispatchRequest?dispatchRequest?=?this.checkMessageAndReturnSize(byteBuffer,?checkCRCOnRecover);
????????????int?size?=?dispatchRequest.getMsgSize();
????????????//?Normal?data
????????????//?是一個(gè)合格的消息并且消息體大于0
????????????if?(dispatchRequest.isSuccess()?&&?size?>?0)?{
????????????????//?則讀取的偏移量mapedFileOffset累加msgSize
????????????????mappedFileOffset?+=?size;
????????????}
????????????//?Come?the?end?of?the?file,?switch?to?the?next?file?Since?the?return?0?representatives?met?last?hole,?this?can?not?be?included?in?truncate?offset
????????????//?是合格的消息,但是消息體為0,表示讀取到了文件的最后一塊信息
????????????else?if?(dispatchRequest.isSuccess()?&&?size?==?0)?{
????????????????index++;
????????????????//?文件讀完了
????????????????if?(index?>=?mappedFiles.size())?{
????????????????????//?Current?branch?can?not?happen
????????????????????log.info("recover?last?3?physics?file?over,?last?mapped?file?"?+?mappedFile.getFileName());
????????????????????break;
????????????????}?else?{
????????????????????mappedFile?=?mappedFiles.get(index);
????????????????????byteBuffer?=?mappedFile.sliceByteBuffer();
????????????????????processOffset?=?mappedFile.getFileFromOffset();
????????????????????mappedFileOffset?=?0;
????????????????????log.info("recover?next?physics?file,?"?+?mappedFile.getFileName());
????????????????}
????????????}
????????????//?Intermediate?file?read?error
????????????else?if?(!dispatchRequest.isSuccess())?{
????????????????log.info("recover?physics?file?end,?"?+?mappedFile.getFileName());
????????????????break;
????????????}
????????}
????????//?最后讀取的MapedFile對(duì)象的fileFromOffset加上最后讀取的位置mapedFileOffset值
????????processOffset?+=?mappedFileOffset;
????????//?設(shè)置文件刷新到的offset
????????this.mappedFileQueue.setFlushedWhere(processOffset);
????????//?設(shè)置文件提交到的offset
????????this.mappedFileQueue.setCommittedWhere(processOffset);
????????//?刪除offset之后的臟數(shù)據(jù)文件
????????this.mappedFileQueue.truncateDirtyFiles(processOffset);
????????//?Clear?ConsumeQueue?redundant?data
????????//?清除ConsumeQueue冗余數(shù)據(jù)
????????if?(maxPhyOffsetOfConsumeQueue?>=?processOffset)?{
????????????log.warn("maxPhyOffsetOfConsumeQueue({})?>=?processOffset({}),?truncate?dirty?logic?files",?maxPhyOffsetOfConsumeQueue,?processOffset);
????????????this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
????????}
????}?else?{
????????//?Commitlog?case?files?are?deleted??案例文件被刪除
????????log.warn("The?commitlog?files?are?deleted,?and?delete?the?consume?queue?files");
????????this.mappedFileQueue.setFlushedWhere(0);
????????this.mappedFileQueue.setCommittedWhere(0);
????????this.defaultMessageStore.destroyLogics();
????}
}
往期推薦
結(jié)尾
本篇文件介紹的就是RocketMQ的過(guò)期刪除機(jī)制,與恢復(fù)機(jī)制。
文件過(guò)期刪除機(jī)制?觸發(fā)主要有三點(diǎn)
- 默認(rèn)凌晨4點(diǎn)。這個(gè)也比較好理解,這個(gè)時(shí)候用的人也比較少,刪除對(duì)系統(tǒng)的影響就降到最小。
- 磁盤空間不足。當(dāng)磁盤空間不足的時(shí)候,就要?jiǎng)h除過(guò)期文件以提供更多的空間出來(lái)接收消息。
- 人工觸發(fā),指人為的介入去刪除。
由上述三種情況展開聊了一些文件過(guò)大,被占用,文件損壞的一些安全性處理。
恢復(fù)機(jī)制?沒(méi)有硬性條件,主要有以下2點(diǎn)
- 檢查當(dāng)前文件是否損壞(異常關(guān)閉)或者存不存在
- 加載Commit Log 和 Consume Queue文件。加載成功之后才執(zhí)行
消息隊(duì)列過(guò)期刪除
取出commitlog中第一個(gè)文件的起始物理offset位置,與末次最小物理坐標(biāo)offset做對(duì)比。如果發(fā)現(xiàn)上次的下標(biāo)已經(jīng)變小了,說(shuō)明commitlog已經(jīng)發(fā)生過(guò)刪除操作了
索引過(guò)期刪除
執(zhí)行完消息隊(duì)列的過(guò)期刪除,根據(jù)坐標(biāo)直接刪掉對(duì)應(yīng)的索引
非常歡迎大家加我個(gè)人微信有關(guān)后端方面的問(wèn)題我們?cè)谌簝?nèi)一起討論!?我們下期再見(jiàn)!
歡迎『點(diǎn)贊』、『在看』、『轉(zhuǎn)發(fā)』三連支持一下,下次見(jiàn)~
