<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          圖文并茂!深入了解RocketMQ的過(guò)期刪除機(jī)制

          共 12141字,需瀏覽 25分鐘

           ·

          2022-05-11 20:29

          大家好,我是Leo。

          今天聊一下RocketMQ的文件過(guò)期刪除機(jī)制

          本章概括

          1206bc7c54cecd46df5c080d965be665.webp

          源碼定位

          Broker是RocketMQ的核心,提供了消息的接收,存儲(chǔ),拉取等功能

          我們可以先從Broker服務(wù)入手。從源碼可以得知。RocketMQ啟用了一個(gè)?BrokerController?的?start?函數(shù)

          public?static?void?main(String[]?args)?{
          ????start(createBrokerController(args));
          }

          public?static?BrokerController?start(BrokerController?controller)?{
          ????try?{
          ????????controller.start();

          ????????String?tip?=?"The?broker[";
          ????????if?(null?!=?controller.getBrokerConfig().getNamesrvAddr())?{
          ????????????//?日志拼接
          ????????}

          ????????log.info(tip);
          ????????System.out.printf("%s%n",?tip);
          ????????return?controller;
          ????}?catch?(Throwable?e)?{
          ????????e.printStackTrace();
          ????????System.exit(-1);
          ????}

          ????return?null;
          }

          下列是start?函數(shù)啟動(dòng)的異步線程,他啟動(dòng)了一個(gè)?messageStore

          public?void?start()?throws?Exception?{
          ????????if?(this.messageStore?!=?null)?{
          ????????????this.messageStore.start();
          ????????}
          ????}

          從?messageStore.start()?函數(shù)進(jìn)入后會(huì)有一個(gè)消息存儲(chǔ)的第三方接口。

          public?interface?MessageStore?{

          ????/**
          ?????*?Load?previously?stored?messages.
          ?????*
          ?????*?@return?true?if?success;?false?otherwise.
          ?????*/

          ????boolean?load();

          ????/**
          ?????*?Launch?this?message?store.
          ?????*
          ?????*?@throws?Exception?if?there?is?any?error.
          ?????*/

          ????void?start()?throws?Exception;
          }

          繼續(xù)圍繞?start?函數(shù)展開實(shí)現(xiàn)類查找,可以看到最終由?DefaultMessageStore?實(shí)現(xiàn)類實(shí)現(xiàn)

          4dc4e9bd505db30d4eaeb257abb0f5c2.webp

          定位到具體問(wèn)題之后,可以看到?start?調(diào)用了一個(gè)?addScheduleTask?函數(shù)

          這個(gè)函數(shù)主要處理的就是清除過(guò)期日志服務(wù)。

          ?public?void?start()?throws?Exception?{
          ?????//刷新ConsumeQueue的服務(wù)啟動(dòng)
          ?????this.flushConsumeQueueService.start();
          ?????//CommitLog刷新的服務(wù)啟動(dòng)
          ?????this.commitLog.start();
          ?????//存儲(chǔ)狀態(tài)檢測(cè)的服務(wù)啟動(dòng)
          ?????this.storeStatsService.start();

          ?????//創(chuàng)建臨時(shí)文件,來(lái)表示是否正常關(guān)機(jī)
          ?????this.createTempFile();
          ?????//啟動(dòng)其他服務(wù)。比如清除過(guò)期日志的服務(wù)等
          ?????this.addScheduleTask();
          ?????this.shutdown?=?false;
          ?}

          這篇文件聊的就是這個(gè)?addScheduleTask?函數(shù)。言歸正傳,步入正題!

          流程圖

          b7e1728065b82e48460e8d25b1a1ef9c.webp

          過(guò)期刪除機(jī)制

          文件過(guò)期刪除

          首次執(zhí)行時(shí)間是60000毫秒=60秒。其余間隔執(zhí)行都是每10秒執(zhí)行一次刪除。

          //?資源回收間隔
          private?int?cleanResourceInterval?=?10000;
          /**
          ??*?{}要執(zhí)行的任務(wù)
          ??*?1.延遲第一次執(zhí)行的時(shí)間
          ??*?2.兩次執(zhí)行之間的時(shí)間?10000?資源回收間隔
          ??*?3.毫秒
          */

          this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{
          ???????@Override
          ???????public?void?run()?{
          ?????????????DefaultMessageStore.this.cleanFilesPeriodically();
          ???????}
          //?延遲第一次執(zhí)行的時(shí)間
          },?1000?*?60,?this.messageStoreConfig.getCleanResourceInterval(),?TimeUnit.MILLISECONDS);

          1812b9f117dce499b1cc341c7997731b.webp

          對(duì)于刪除過(guò)期的時(shí)機(jī)包括以下3種:

          1. 默認(rèn)凌晨4點(diǎn)。這個(gè)也比較好理解,這個(gè)時(shí)候用的人也比較少,刪除對(duì)系統(tǒng)的影響就降到最小。
          2. 磁盤空間不足。當(dāng)磁盤空間不足的時(shí)候,就要?jiǎng)h除過(guò)期文件以提供更多的空間出來(lái)接收消息。
          3. 人工觸發(fā),指人為的介入去刪除。

          d74b147316390db6efeb3a2c893073c9.webp

          刪除的文件是過(guò)期文件,那哪些文件是過(guò)期的呢?

          首先是保留時(shí)間,默認(rèn)72小時(shí),也就是3天,超過(guò)3天的數(shù)據(jù),是需要?jiǎng)h除的。

          deleteExpiredFiles?是用于刪除過(guò)期文件。執(zhí)行步驟如下:

          1. 首先是需要判斷是否需要?jiǎng)h除文件,通過(guò)兩個(gè)方法的調(diào)用isTimeToDeleteisSpaceToDelete判斷是否達(dá)到定時(shí)刪除時(shí)間以及是否磁盤已滿需要?jiǎng)h除,以及判斷屬性DefaultMessageStore.CleanCommitLogService#manualDeleteFileSeveralTimes是否大于0意味著需要手動(dòng)刪除。如果這三個(gè)條件任意為真,意味著需要執(zhí)行刪除,那就繼續(xù)后續(xù)的流程。否則結(jié)束當(dāng)前方法。
          2. 如果是手動(dòng)刪除,則屬性DefaultMessageStore.CleanCommitLogService#manualDeleteFileSeveralTimes減1.
          3. 如果屬性MessageStoreConfig#cleanFileForciblyEnableDefaultMessageStore.CleanCommitLogService#cleanImmediately為真,聲明cleanAtOnece為true,否則為false。
          4. 調(diào)用方法?CommitLog#deleteExpiredFile?進(jìn)行文件刪除。方法需要4個(gè)入?yún)?,分別是:
            1. expiredTime:過(guò)期時(shí)間或者說(shuō)文件刪除前的保留時(shí)間,默認(rèn)為72小時(shí)。
            2. deleteFilesInterval:文件刪除間隔,這里取值為100.
            3. intervalForcibly:該參數(shù)用于強(qiáng)制文件強(qiáng)制釋放時(shí)間間隔,單位是毫秒。這里取值為120*1000,
            4. cleanImmediately:是否立即執(zhí)行刪除,這邊使用的就是步驟3中的數(shù)據(jù)。
          /**
          ?*?刪除已經(jīng)失效的
          ?*/

          private?void?deleteExpiredFiles()?{
          ????int?deleteCount?=?0;
          ????//?文件保留時(shí)長(zhǎng)?72
          ????long?fileReservedTime?=?DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
          ????//?100
          ????int?deletePhysicFilesInterval?=?DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
          ????//?1000?*?120?=?120000毫秒?=?120秒
          ????int?destroyMapedFileIntervalForcibly?=?DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
          ????//?判斷有沒(méi)到凌晨4點(diǎn)
          ????boolean?timeup?=?this.isTimeToDelete();
          ????//?空間是否上限
          ????boolean?spacefull?=?this.isSpaceToDelete();
          ????//?手動(dòng)刪除??經(jīng)過(guò)20次的調(diào)度
          ????boolean?manualDelete?=?this.manualDeleteFileSeveralTimes?>?0;

          ????if?(timeup?||?spacefull?||?manualDelete)?{
          ????????if?(manualDelete)
          ????????????this.manualDeleteFileSeveralTimes--;

          ????????boolean?cleanAtOnce?=?DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable()?&&?this.cleanImmediately;

          ????????log.info("begin?to?delete?before?{}?hours?file.?timeup:?{}?spacefull:?{}?manualDeleteFileSeveralTimes:?{}?cleanAtOnce:?{}",?fileReservedTime,?timeup,?spacefull,?manualDeleteFileSeveralTimes,?cleanAtOnce);

          ????????fileReservedTime?*=?60?*?60?*?1000;

          ????????deleteCount?=?DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime,?deletePhysicFilesInterval,?destroyMapedFileIntervalForcibly,?cleanAtOnce);
          ????????if?(deleteCount?>?0)?{}?else?if?(spacefull)?{
          ????????????//?刪除文件失敗
          ????????????log.warn("disk?space?will?be?full?soon,?but?delete?file?failed.");
          ????????}
          ????}
          }

          如果這個(gè)文件被其他線程引用了,此時(shí)就不會(huì)進(jìn)行刪除,記錄第一次刪除的時(shí)間戳,退出本次任務(wù),等120s后,就會(huì)把文件的引用減1000,再?gòu)?qiáng)制刪除。

          在刪除的過(guò)程中,會(huì)存在刪除多個(gè)文件的情況,每個(gè)文件之間,還有一個(gè)時(shí)間間隔,比如第一個(gè)文件刪除完后,需要等100ms再刪除第二個(gè)文件。

          120s可以通過(guò) destroyMapedFileIntervalForcibly 得知

          100ms可以通過(guò) deletePhysicFilesInterval 得知

          02dfb4b3789501e0f591a4e14146649f.webp

          如果當(dāng)前刪除的文件數(shù)量,已經(jīng)超過(guò)了可以刪除的最大批量數(shù),則退出本次任務(wù)??梢酝ㄟ^(guò)上述代碼中的?spacefull?得出

          /**
          ?*?根據(jù)時(shí)間刪除過(guò)期文件
          ?*?@param?expiredTime?保留時(shí)長(zhǎng)??一般是?72
          ?*?@param?deleteFilesInterval?刪除間隔?100
          ?*?@param?intervalForcibly?120秒?延遲
          ?*?@param?cleanImmediately?是否強(qiáng)制啟用
          ?*?@return
          ?*/

          public?int?deleteExpiredFileByTime(final?long?expiredTime,?final?int?deleteFilesInterval,?final?long?intervalForcibly,?final?boolean?cleanImmediately)?{
          ????//獲取映射文件列表?commitlog文件可能隨時(shí)有寫入,copy一份不影響寫入
          ????Object[]?mfs?=?this.copyMappedFiles(0);
          ????//如果映射文件列表為空直接返回
          ????if?(null?==?mfs)
          ????????return?0;

          ????int?mfsLength?=?mfs.length?-?1;
          ????int?deleteCount?=?0;
          ????//?存放要?jiǎng)h除的MappedFile
          ????List??files?=?new?ArrayList??();
          ????if?(null?!=?mfs)?{
          ????????//對(duì)映射文件進(jìn)行遍歷
          ????????for?(int?i?=?0;?i?????????????MappedFile?mappedFile?=?(MappedFile)?mfs[i];
          ????????????//文件最后的修改時(shí)間?+?過(guò)期時(shí)間?=?文件最終能夠存活的時(shí)間
          ????????????long?liveMaxTimestamp?=?mappedFile.getLastModifiedTimestamp()?+?expiredTime;
          ????????????//?如果文件最新修改已經(jīng)超過(guò)三天或者是磁盤空間達(dá)到85%以上??而要在此之前需要滿足3個(gè)條件之一,時(shí)間,容量,和手動(dòng)觸發(fā)
          ????????????if?(System.currentTimeMillis()?>=?liveMaxTimestamp?||?cleanImmediately)?{
          ????????????????//刪除文件,就是解除對(duì)文件的引用
          ????????????????if?(mappedFile.destroy(intervalForcibly))?{
          ????????????????????//要?jiǎng)h除的的文件加入到要?jiǎng)h除的集合中
          ????????????????????files.add(mappedFile);
          ????????????????????//增加計(jì)數(shù)
          ????????????????????deleteCount++;

          ????????????????????if?(files.size()?>=?DELETE_FILES_BATCH_MAX)?{
          ????????????????????????break;
          ????????????????????}

          ????????????????????//如果刪除時(shí)間間隔大于0,并且沒(méi)有循環(huán)玩,則睡眠指定的刪除間隔時(shí)長(zhǎng)后在殺出
          ????????????????????if?(deleteFilesInterval?>?0?&&?(i?+?1)?????????????????????????try?{
          ????????????????????????????Thread.sleep(deleteFilesInterval);
          ????????????????????????}?catch?(InterruptedException?e)?{}
          ????????????????????}
          ????????????????}?else?break;
          ????????????}?else?{
          ????????????????//?避免在中間刪除文件
          ????????????????break;
          ????????????}
          ????????}
          ????}
          ????//從文件映射隊(duì)列中刪除對(duì)應(yīng)的文件映射
          ????deleteExpiredFile(files);
          ????//返回刪除的文件個(gè)數(shù)
          ????return?deleteCount;
          }

          由?timeup?變量我們可以引申出 isTimeToDelete函數(shù)

          RocketMQ會(huì)配置執(zhí)行刪除工作的時(shí)間,默認(rèn)是早上四點(diǎn)。如果當(dāng)前時(shí)間在04:00~04:59之間,就返回true。

          /**
          ?*?判斷時(shí)間是否到?凌晨4點(diǎn)
          ?*?@return
          ?*/

          private?boolean?isTimeToDelete()?{
          ????//?04
          ????String?when?=?DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen();
          ????if?(UtilAll.isItTimeToDo(when))?{
          ????????DefaultMessageStore.log.info("it's?time?to?reclaim?disk?space,?"?+?when);
          ????????return?true;
          ????}
          ????return?false;
          }

          由?spacefull?變量我們可以引申出 isSpaceToDelete函數(shù)

          判斷磁盤空間是否滿足刪除的條件,判斷要求如下:

          1. 使用提交日志的路徑,檢查其所在的磁盤空間的使用率。默認(rèn)情況下,使用率超過(guò)90%,設(shè)置磁盤不可用標(biāo)志位,并且設(shè)置屬性DefaultMessageStore.CleanCommitLogService#cleanImmediately為true。使用率超過(guò)85%,設(shè)置屬性DefaultMessageStore.CleanCommitLogService#cleanImmediately為true。其他情況,設(shè)置運(yùn)行狀態(tài)位為磁盤可用。
          2. 磁盤使用率小于0或者大于屬性MessageStoreConfig#diskMaxUsedSpaceRatio的要求,默認(rèn)是75%,則返回true給調(diào)用。
          3. 針對(duì)消費(fèi)隊(duì)列的文件路徑,上述步驟重復(fù)一次。
          4. 如果步驟1~3都沒(méi)有返回true,則返回false給調(diào)用者。意味著此時(shí)磁盤空間有剩余,不要求刪除。
          /**
          ?*?空間是否上限
          ?*?@return
          ?*/

          private?boolean?isSpaceToDelete()?{
          ????double?ratio?=?DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio()?/?100.0;

          ????cleanImmediately?=?false;

          ????{
          ????????String?commitLogStorePath?=?DefaultMessageStore.this.getStorePathPhysic();
          ????????String[]?storePaths?=?commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
          ????????Set??fullStorePath?=?new?HashSet??();
          ????????double?minPhysicRatio?=?100;
          ????????String?minStorePath?=?null;
          ????????for?(String?storePathPhysic:?storePaths)?{
          ????????????double?physicRatio?=?UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
          ????????????if?(minPhysicRatio?>?physicRatio)?{
          ????????????????minPhysicRatio?=?physicRatio;
          ????????????????minStorePath?=?storePathPhysic;
          ????????????}
          ????????????if?(physicRatio?>?diskSpaceCleanForciblyRatio)?{
          ????????????????fullStorePath.add(storePathPhysic);
          ????????????}
          ????????}
          ????????DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath);
          ????????if?(minPhysicRatio?>?diskSpaceWarningLevelRatio)?{
          ????????????boolean?diskok?=?DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
          ????????????if?(diskok)?{
          ????????????????DefaultMessageStore.log.error("physic?disk?maybe?full?soon?"?+?minPhysicRatio?+
          ????????????????????",?so?mark?disk?full,?storePathPhysic="?+?minStorePath);
          ????????????}

          ????????????cleanImmediately?=?true;
          ????????}?else?if?(minPhysicRatio?>?diskSpaceCleanForciblyRatio)?{
          ????????????cleanImmediately?=?true;
          ????????}?else?{
          ????????????boolean?diskok?=?DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
          ????????????if?(!diskok)?{
          ????????????????DefaultMessageStore.log.info("physic?disk?space?OK?"?+?minPhysicRatio?+?",?so?mark?disk?ok,?storePathPhysic="?+?minStorePath);
          ????????????}
          ????????}

          ????????if?(minPhysicRatio?0?||?minPhysicRatio?>?ratio)?{
          ????????????DefaultMessageStore.log.info("physic?disk?maybe?full?soon,?so?reclaim?space,?"?+?minPhysicRatio?+?",?storePathPhysic="?+?minStorePath);
          ????????????return?true;
          ????????}
          ????}

          ????{
          ????????String?storePathLogics?=?DefaultMessageStore.this.getStorePathLogic();
          ????????double?logicsRatio?=?UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
          ????????if?(logicsRatio?>?diskSpaceWarningLevelRatio)?{
          ????????????boolean?diskok?=?DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
          ????????????if?(diskok)?{
          ????????????????DefaultMessageStore.log.error("logics?disk?maybe?full?soon?"?+?logicsRatio?+?",?so?mark?disk?full");
          ????????????}

          ????????????cleanImmediately?=?true;
          ????????}?else?if?(logicsRatio?>?diskSpaceCleanForciblyRatio)?{
          ????????????cleanImmediately?=?true;
          ????????}?else?{
          ????????????boolean?diskok?=?DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
          ????????????if?(!diskok)?{
          ????????????????DefaultMessageStore.log.info("logics?disk?space?OK?"?+?logicsRatio?+?",?so?mark?disk?ok");
          ????????????}
          ????????}

          ????????if?(logicsRatio?0?||?logicsRatio?>?ratio)?{
          ????????????DefaultMessageStore.log.info("logics?disk?maybe?full?soon,?so?reclaim?space,?"?+?logicsRatio);
          ????????????return?true;
          ????????}
          ????}

          ????return?false;
          }

          消費(fèi)隊(duì)列過(guò)期刪除

          CLeanConsumeQueueServicerun方法就是直接委托這個(gè)方法來(lái)實(shí)現(xiàn)。這個(gè)方法的作用就是刪除無(wú)效的消費(fèi)隊(duì)列條目?jī)?nèi)容或者文件本身。其代碼邏輯如下:

          1. 通過(guò)方法CommitLog#getMinOffset獲取提交日志最小的偏移量,聲明為minOffset。
          2. 如果minOffset大于類屬性lastPhysicalMinOffset,那么意味著當(dāng)前提交日志的最小偏移量對(duì)比上一次查詢的值發(fā)生了變化,也就是說(shuō)必然是最少一個(gè)提交日志文件被刪除,那么相應(yīng)的在消費(fèi)隊(duì)列中的過(guò)期數(shù)據(jù)也可以被刪除,就執(zhí)行后面的流程。反之,則意味著不需要執(zhí)行任何操作,結(jié)束方法即可。
          3. minOffset賦值給lastPhysicalMinOffset。
          4. 對(duì)屬性consumeQueueTable進(jìn)行遍歷,遍歷其中每一個(gè)ConsumeQueue對(duì)象。使用本次的minOffset作為入?yún)?,調(diào)用方法ConsumeQueue#deleteExpiredFile刪除過(guò)期的消費(fèi)隊(duì)列文件以及更新消費(fèi)隊(duì)列的最小偏移量。如果有刪除到文件,則休眠MessageStoreConfig#deleteConsumeQueueFilesInterval配置的時(shí)間,繼續(xù)對(duì)下一個(gè)消費(fèi)隊(duì)列執(zhí)行刪除。
          5. 當(dāng)循環(huán)執(zhí)行完畢,使用參數(shù)minOffset作為入?yún)?,調(diào)用方法IndexService#deleteExpiredFile(long)來(lái)刪除索引文件中已經(jīng)完全無(wú)效的索引文件。
          public?void?run()?{
          ????try?{
          ????????this.deleteExpiredFiles();
          ????}?catch?(Throwable?e)?{
          ????????DefaultMessageStore.log.warn(this.getServiceName()?+?"?service?has?exception.?",?e);
          ????}
          }

          private?void?deleteExpiredFiles()?{
          ????//?0.1秒
          ????int?deleteLogicsFilesInterval?=?DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();
          ????//?得到commitlog中第一個(gè)文件的起始物理offset
          ????long?minOffset?=?DefaultMessageStore.this.commitLog.getMinOffset();
          ????if?(minOffset?>?this.lastPhysicalMinOffset)?{
          ????????//?發(fā)現(xiàn)上次的已經(jīng)變小了???說(shuō)明commitlog已經(jīng)發(fā)生過(guò)刪除操作了
          ????????this.lastPhysicalMinOffset?=?minOffset;

          ????????ConcurrentMap?>?tables?=?DefaultMessageStore.this.consumeQueueTable;

          ????????for?(ConcurrentMap??maps:?tables.values())?{
          ????????????for?(ConsumeQueue?logic:?maps.values())?{
          ????????????????//?對(duì)某一個(gè)消費(fèi)隊(duì)列做刪除??參數(shù)是commitlog最小的物理點(diǎn)位
          ????????????????int?deleteCount?=?logic.deleteExpiredFile(minOffset);

          ????????????????if?(deleteCount?>?0?&&?deleteLogicsFilesInterval?>?0)?{
          ????????????????????try?{
          ????????????????????????//?當(dāng)上一個(gè)ConsumeQueue成功刪除之后,下一個(gè)ConsumeQueue刪除需要等待0.1s
          ????????????????????????Thread.sleep(deleteLogicsFilesInterval);
          ????????????????????}?catch?(InterruptedException?ignored)?{

          ????????????????????}
          ????????????????}
          ????????????}
          ????????}
          ????????//?刪除索引文件
          ????????DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
          ????}
          }

          索引文件刪除

          索引文件的刪除是在消費(fèi)隊(duì)列刪除完成后,調(diào)用方法?deleteExpiredFile?完成的。

          該方法是用于刪除索引文件中的無(wú)效文件。執(zhí)行流程如下:

          1. 首先需要確認(rèn),索引文件中是否存在無(wú)效文件。獲取第一個(gè)索引文件,獲取其endPhyOffset屬性,判斷該屬性的值是否小于入?yún)⒌?code style="font-size:14px;font-family:'Operator Mono', Consolas, Monaco, Menlo, monospace;color:rgb(239,112,96);background-color:rgba(27,31,35,.05);">offset。如果是的話,至少意味著有一個(gè)文件是無(wú)效的,則執(zhí)行后續(xù)流程。否則沒(méi)有無(wú)效文件,則直接結(jié)束整個(gè)方法。
          2. 聲明一個(gè)局部變量fileList,遍歷索引文件IndexFile對(duì)象,如果其endPhyOffset小于入?yún)⒌?code style="font-size:14px;font-family:'Operator Mono', Consolas, Monaco, Menlo, monospace;color:rgb(239,112,96);background-color:rgba(27,31,35,.05);">offset,說(shuō)明該文件是無(wú)效的,添加到fileList中。
          3. 使用第二步的fileList作為入?yún)ⅲ{(diào)用方法IndexService#deleteExpiredFile(List)。該方法內(nèi)部調(diào)用了IndexFile#destory方法,內(nèi)部也是委托了MappedFile#destory方法實(shí)現(xiàn)的文件銷毀。并且刪除成功的IndexFile還會(huì)從屬性indexFileList列表中刪除對(duì)應(yīng)的對(duì)象。
          /**
          ?*?刪除索引文件
          ?*?@param?offset
          ?*/

          public?void?deleteExpiredFile(long?offset)?{
          ????Object[]?files?=?null;
          ????try?{
          ????????this.readWriteLock.readLock().lock();
          ????????if?(this.indexFileList.isEmpty())?{
          ????????????return;
          ????????}

          ????????long?endPhyOffset?=?this.indexFileList.get(0).getEndPhyOffset();
          ????????if?(endPhyOffset?????????????files?=?this.indexFileList.toArray();
          ????????}
          ????}?catch?(Exception?e)?{
          ????????log.error("destroy?exception",?e);
          ????}?finally?{
          ????????this.readWriteLock.readLock().unlock();
          ????}

          ????if?(files?!=?null)?{
          ????????List??fileList?=?new?ArrayList??();
          ????????for?(int?i?=?0;?i?1);?i++)?{
          ????????????IndexFile?f?=?(IndexFile)?files[i];
          ????????????if?(f.getEndPhyOffset()?????????????????fileList.add(f);
          ????????????}?else?{
          ????????????????break;
          ????????????}
          ????????}

          ????????this.deleteExpiredFile(fileList);
          ????}
          }

          文件恢復(fù)機(jī)制

          從源碼定位中,我們可以看到執(zhí)行?./mqbroker?命令后,會(huì)啟動(dòng)main函數(shù)的 createBrokerController函數(shù)。

          在函數(shù)中調(diào)用了一個(gè)?initialize?初始化 ,我們?cè)诔跏蓟瘮?shù)中找到了?this.messageStore.load

          public?static?void?main(String[]?args)?{
          ????start(createBrokerController(args));
          }
          public?static?BrokerController?createBrokerController(String[]?args)?{
          ????boolean?initResult?=?controller.initialize();
          ????if?(!initResult)?{
          ????????controller.shutdown();
          ????????System.exit(-3);
          ????}
          ????return?controller;
          }?catch?(Throwable?e)?{
          ????e.printStackTrace();
          ????System.exit(-1);
          }
          public?boolean?initialize()?throws?CloneNotSupportedException?{
          ????result?=?result?&&?this.messageStore.load();

          ????if?(result)?{

          ????}
          ????return?result;
          }

          這里的?load?和上面的代碼一樣,都是接口實(shí)現(xiàn)類。統(tǒng)一由 DefaultMessageStore 實(shí)現(xiàn)。

          所以文件恢復(fù)函數(shù)?recover?從 Broker啟動(dòng)之后,就會(huì)隨之啟動(dòng)。啟動(dòng)之后

          1. 檢查當(dāng)前文件是否損壞(異常關(guān)閉)或者存不存在 (檢查依據(jù)已在下列代碼的尾部貼出)
          2. 加載Commit Log 和 Consume Queue文件。加載成功之后進(jìn)行?recover?文件恢復(fù)
          /**
          ?*?檢查abort文件是不是存在,如果存在表示上次是異常關(guān)閉,這個(gè)文件是一個(gè)空文件,在啟動(dòng)之后會(huì)創(chuàng)建,正常關(guān)閉的情況會(huì)刪除掉。
          ?*?加載延遲消息相關(guān)的配置,加載?Commit?Log文件,加載Consume?Queue文件
          ?*?如果步驟2成功加載,則加載checkpoint文件,加載indexFile然后進(jìn)行文件的恢復(fù)邏輯
          ?*?對(duì)于文件的恢復(fù)邏輯在recover方法中,會(huì)調(diào)用CommitLog類中的方法
          ?*?@throws?IOException
          ?*/

          public?boolean?load()?{
          ????boolean?result?=?true;

          ????try?{
          ????????//是否存在abort文件,如果存在說(shuō)明上次服務(wù)關(guān)閉時(shí)異常關(guān)閉的
          ????????boolean?lastExitOK?=?!this.isTempFileExist();
          ????????log.info("last?shutdown?{}",?lastExitOK???"normally"?:?"abnormally");

          ????????//?加載?Commit?Log文件
          ????????result?=?result?&&?this.commitLog.load();

          ????????//?加載?Consume?Queue文件
          ????????result?=?result?&&?this.loadConsumeQueue();

          ????????//檢查前面3個(gè)文件是不是加載成功
          ????????if?(result)?{
          ????????????//加載成功則繼續(xù)加載checkpoint文件
          ????????????this.storeCheckpoint?=?new?StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
          ????????????//加載indexFile
          ????????????this.indexService.load(lastExitOK);
          ????????????//進(jìn)行文件的恢復(fù)邏輯
          ????????????this.recover(lastExitOK);

          ????????????log.info("load?over,?and?the?max?phy?offset?=?{}",?this.getMaxPhyOffset());

          ????????????if?(null?!=?scheduleMessageService)?{
          ????????????????result?=?this.scheduleMessageService.load();
          ????????????}
          ????????}

          ????}?catch?(Exception?e)?{
          ????????log.error("load?exception",?e);
          ????????result?=?false;
          ????}

          ????if?(!result)?{
          ????????this.allocateMappedFileService.shutdown();
          ????}

          ????return?result;
          }

          //?檢查依據(jù)是從這個(gè)路徑中
          private?String?storePathRootDir?=?System.getProperty("user.home")?+?File.separator?+?"store";

          recover 函數(shù)的實(shí)現(xiàn)邏輯

          從 ConsumeQueue文件的集合中取出,從倒數(shù)第三個(gè)文件開始,逐條遍歷消息,如果取出的物理點(diǎn)位大于0并且message的size大于0,說(shuō)明數(shù)據(jù)有效。

          恢復(fù)commitlog分正常退出和非正常退出。

          正常退出的commitlog所有數(shù)據(jù)都是flush完成的,所以只要從倒數(shù)第三個(gè)文件開始恢復(fù)即可,遍歷每一個(gè)message,并校驗(yàn)其CRC。

          非正常退出則從最后一個(gè)文件開始恢復(fù),一般出現(xiàn)問(wèn)題的都是最后一個(gè)文件,然后獲取文件中的第一個(gè)message,其存儲(chǔ)時(shí)間是否小于checkpoint時(shí)間點(diǎn)中的最小的一個(gè),如果是,表示其就是需要恢復(fù)的起始文件。然后檢驗(yàn)每一個(gè)message的CRC,并將通過(guò)校驗(yàn)的數(shù)據(jù)dispatch到consumelog和index文件中。

          /**
          ?*?進(jìn)行文件的恢復(fù)邏輯
          ?*?@param?lastExitOK
          ?*/

          private?void?recover(final?boolean?lastExitOK)?{
          ????long?maxPhyOffsetOfConsumeQueue?=?this.recoverConsumeQueue();

          ????//上次服務(wù)關(guān)閉是不是正常關(guān)閉
          ????if?(lastExitOK)?{
          ????????//正常情況關(guān)閉
          ????????this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
          ????}?else?{
          ????????//異常情況關(guān)閉
          ????????this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
          ????}

          ????//恢復(fù)topic消費(fèi)相關(guān)相關(guān)的緩存
          ????this.recoverTopicQueueTable();
          }
          /**
          ?*?計(jì)算恢復(fù)ConsumeQueue文件集合的下標(biāo)?
          ?*/

          private?long?recoverConsumeQueue()?{
          ????long?maxPhysicOffset?=?-1;
          ????for?(ConcurrentMap??maps:?this.consumeQueueTable.values())?{
          ????????for?(ConsumeQueue?logic:?maps.values())?{
          ????????????logic.recover();
          ????????????if?(logic.getMaxPhysicOffset()?>?maxPhysicOffset)?{
          ????????????????maxPhysicOffset?=?logic.getMaxPhysicOffset();
          ????????????}
          ????????}
          ????}
          ????return?maxPhysicOffset;
          }
          /**
          ?*?恢復(fù)topic消費(fèi)相關(guān)相關(guān)的緩存
          ?*/

          public?void?recoverTopicQueueTable()?{
          ????/*?topic-queueid?*/
          ????/*?offset?*/
          ????HashMap??table?=?new?HashMap??(1024);
          ????long?minPhyOffset?=?this.commitLog.getMinOffset();
          ????for?(ConcurrentMap??maps:?this.consumeQueueTable.values())?{
          ????????for?(ConsumeQueue?logic:?maps.values())?{
          ????????????String?key?=?logic.getTopic()?+?"-"?+?logic.getQueueId();
          ????????????table.put(key,?logic.getMaxOffsetInQueue());
          ????????????logic.correctMinOffset(minPhyOffset);
          ????????}
          ????}

          ????this.commitLog.setTopicQueueTable(table);
          }
          /**
          ?*?當(dāng)正常退出、數(shù)據(jù)恢復(fù)時(shí),所有內(nèi)存數(shù)據(jù)均已刷新
          ?*?服務(wù)正?;謴?fù)?加載的映射文件列表進(jìn)行遍歷,對(duì)文件進(jìn)行校驗(yàn),和文件中的消息的魔數(shù)進(jìn)行校驗(yàn),來(lái)判斷哪些數(shù)據(jù)是正常的,
          ?*?并計(jì)算出正常的數(shù)據(jù)的最大偏移量。然后,根據(jù)偏移量設(shè)置對(duì)應(yīng)的提交和刷新的位置以及不正常數(shù)據(jù)的刪除。
          ?*/

          public?void?recoverNormally(long?maxPhyOffsetOfConsumeQueue)?{
          ????boolean?checkCRCOnRecover?=?this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
          ????final?List??mappedFiles?=?this.mappedFileQueue.getMappedFiles();
          ????if?(!mappedFiles.isEmpty())?{
          ????????//?Began?to?recover?from?the?last?third?file
          ????????//如果文件列表大于3就從倒數(shù)第3個(gè)開始,否則從第一個(gè)開始
          ????????int?index?=?mappedFiles.size()?-?3;
          ????????if?(index?0)
          ????????????index?=?0;

          ????????MappedFile?mappedFile?=?mappedFiles.get(index);
          ????????ByteBuffer?byteBuffer?=?mappedFile.sliceByteBuffer();
          ????????long?processOffset?=?mappedFile.getFileFromOffset();
          ????????long?mappedFileOffset?=?0;
          ????????while?(true)?{
          ????????????//校驗(yàn)消息,然后返回轉(zhuǎn)發(fā)請(qǐng)求,根據(jù)Magic_code正確,并且crc32正確,并且消息的msgSize記錄大小和消息整體大小相等。則表示是合格的消息
          ????????????DispatchRequest?dispatchRequest?=?this.checkMessageAndReturnSize(byteBuffer,?checkCRCOnRecover);
          ????????????int?size?=?dispatchRequest.getMsgSize();
          ????????????//?Normal?data
          ????????????//?是一個(gè)合格的消息并且消息體大于0
          ????????????if?(dispatchRequest.isSuccess()?&&?size?>?0)?{
          ????????????????//?則讀取的偏移量mapedFileOffset累加msgSize
          ????????????????mappedFileOffset?+=?size;
          ????????????}
          ????????????//?Come?the?end?of?the?file,?switch?to?the?next?file?Since?the?return?0?representatives?met?last?hole,?this?can?not?be?included?in?truncate?offset
          ????????????//?是合格的消息,但是消息體為0,表示讀取到了文件的最后一塊信息
          ????????????else?if?(dispatchRequest.isSuccess()?&&?size?==?0)?{
          ????????????????index++;
          ????????????????//?文件讀完了
          ????????????????if?(index?>=?mappedFiles.size())?{
          ????????????????????//?Current?branch?can?not?happen
          ????????????????????log.info("recover?last?3?physics?file?over,?last?mapped?file?"?+?mappedFile.getFileName());
          ????????????????????break;
          ????????????????}?else?{
          ????????????????????mappedFile?=?mappedFiles.get(index);
          ????????????????????byteBuffer?=?mappedFile.sliceByteBuffer();
          ????????????????????processOffset?=?mappedFile.getFileFromOffset();
          ????????????????????mappedFileOffset?=?0;
          ????????????????????log.info("recover?next?physics?file,?"?+?mappedFile.getFileName());
          ????????????????}
          ????????????}
          ????????????//?Intermediate?file?read?error
          ????????????else?if?(!dispatchRequest.isSuccess())?{
          ????????????????log.info("recover?physics?file?end,?"?+?mappedFile.getFileName());
          ????????????????break;
          ????????????}
          ????????}
          ????????//?最后讀取的MapedFile對(duì)象的fileFromOffset加上最后讀取的位置mapedFileOffset值
          ????????processOffset?+=?mappedFileOffset;
          ????????//?設(shè)置文件刷新到的offset
          ????????this.mappedFileQueue.setFlushedWhere(processOffset);
          ????????//?設(shè)置文件提交到的offset
          ????????this.mappedFileQueue.setCommittedWhere(processOffset);
          ????????//?刪除offset之后的臟數(shù)據(jù)文件
          ????????this.mappedFileQueue.truncateDirtyFiles(processOffset);

          ????????//?Clear?ConsumeQueue?redundant?data
          ????????//?清除ConsumeQueue冗余數(shù)據(jù)
          ????????if?(maxPhyOffsetOfConsumeQueue?>=?processOffset)?{
          ????????????log.warn("maxPhyOffsetOfConsumeQueue({})?>=?processOffset({}),?truncate?dirty?logic?files",?maxPhyOffsetOfConsumeQueue,?processOffset);
          ????????????this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
          ????????}
          ????}?else?{
          ????????//?Commitlog?case?files?are?deleted??案例文件被刪除
          ????????log.warn("The?commitlog?files?are?deleted,?and?delete?the?consume?queue?files");
          ????????this.mappedFileQueue.setFlushedWhere(0);
          ????????this.mappedFileQueue.setCommittedWhere(0);
          ????????this.defaultMessageStore.destroyLogics();
          ????}
          }

          往期推薦

          2022年文章目錄整理

          RocketMQ性能提升

          RocketMQ刷盤機(jī)制

          結(jié)尾

          本篇文件介紹的就是RocketMQ的過(guò)期刪除機(jī)制,與恢復(fù)機(jī)制。

          文件過(guò)期刪除機(jī)制?觸發(fā)主要有三點(diǎn)

          1. 默認(rèn)凌晨4點(diǎn)。這個(gè)也比較好理解,這個(gè)時(shí)候用的人也比較少,刪除對(duì)系統(tǒng)的影響就降到最小。
          2. 磁盤空間不足。當(dāng)磁盤空間不足的時(shí)候,就要?jiǎng)h除過(guò)期文件以提供更多的空間出來(lái)接收消息。
          3. 人工觸發(fā),指人為的介入去刪除。

          由上述三種情況展開聊了一些文件過(guò)大,被占用,文件損壞的一些安全性處理。

          恢復(fù)機(jī)制?沒(méi)有硬性條件,主要有以下2點(diǎn)

          1. 檢查當(dāng)前文件是否損壞(異常關(guān)閉)或者存不存在
          2. 加載Commit Log 和 Consume Queue文件。加載成功之后才執(zhí)行

          消息隊(duì)列過(guò)期刪除

          取出commitlog中第一個(gè)文件的起始物理offset位置,與末次最小物理坐標(biāo)offset做對(duì)比。如果發(fā)現(xiàn)上次的下標(biāo)已經(jīng)變小了,說(shuō)明commitlog已經(jīng)發(fā)生過(guò)刪除操作了

          索引過(guò)期刪除

          執(zhí)行完消息隊(duì)列的過(guò)期刪除,根據(jù)坐標(biāo)直接刪掉對(duì)應(yīng)的索引

          非常歡迎大家加我個(gè)人微信有關(guān)后端方面的問(wèn)題我們?cè)谌簝?nèi)一起討論!?我們下期再見(jiàn)!

          歡迎『點(diǎn)贊』、『在看』、『轉(zhuǎn)發(fā)』三連支持一下,下次見(jiàn)~


          瀏覽 144
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  在线看黄 片 | 99re在线视频免费观看 | 久久三级毛片 | 午夜理理伦电影A片无码蜜桃av | 911在线无码精品秘 入口楼风 |