RocketMQ主機(jī)磁盤空間有限,如何無(wú)限期延長(zhǎng)消息存儲(chǔ)?
今日推薦
這 9 個(gè) Java 開源項(xiàng)目 yyds,你知道幾個(gè)?
K8S 部署 SpringBoot 項(xiàng)目(一篇夠用)
來(lái)源:blog.csdn.net/x763795151/article/
details/118500973
前言
RocketMQ作為國(guó)人開源的一款消息引擎,相對(duì)kafka也更加適合在線的業(yè)務(wù)場(chǎng)景,在業(yè)內(nèi)使用的也是非常廣泛,很多同學(xué)也是非常熟悉它及它的存儲(chǔ)機(jī)制,所以這里不再對(duì)它的原理性東西作太多說(shuō)明。
我們也知道,RocketMQ所有的數(shù)據(jù)如消息信息都是以文件形式保存到broker節(jié)點(diǎn)所在主機(jī)上指定的分區(qū)目錄下,比如消息的數(shù)據(jù)都是保存在commitlog中,默認(rèn)保存72小時(shí)(在磁盤使用率未達(dá)到閾值的情況下)會(huì)在指定時(shí)間清理過期數(shù)據(jù),釋放磁盤空間。
當(dāng)然,如果消息量不大且所在磁盤的分區(qū)夠大,我們可以增加消息的保存時(shí)間。但受限于磁盤大小,這個(gè)保存時(shí)間總歸有限,如果消息比較重要,或者我們想保存的更久一些就需要一些其它方案解決。
背景
我們線上的幾個(gè)集群目前消息保存時(shí)間在2-3天,實(shí)在是磁盤空間大小有限,消息量相對(duì)不算小。比如,有個(gè)比較核心的集群,部署方式是6個(gè)高配物理機(jī)采用DLedger模式4主8從交叉部署,發(fā)送的tps在10000多,所以每個(gè)節(jié)點(diǎn)的日消息量目前應(yīng)該是在600G吧。老大給我說(shuō)他現(xiàn)在設(shè)置的線上保存時(shí)間是2天,業(yè)務(wù)量一直在增加,繼續(xù)增長(zhǎng)下去,就要設(shè)置保存1天了,目前每個(gè)節(jié)點(diǎn)的磁盤使用率將近50%,年初我搭建監(jiān)控平臺(tái)的時(shí)候,注意過還沒這么高。

還有其它集群上的業(yè)務(wù),有些業(yè)務(wù)相關(guān)開發(fā)人員想要他們消息保存7天甚至更久。
基于這些原因,所以我們也的確需要一種過期消息備份的解決方案。
解決思路
如果需要對(duì)過期消息進(jìn)行備份,然后支持過期消息檢索及重新消費(fèi)的能力,我們想到的,常規(guī)的方案有如下兩種:
將發(fā)送到broker的消息持久化一份到第三方存儲(chǔ)介質(zhì),如mysql 備份將要過期的commitlog到其它地方,重新恢復(fù)
業(yè)內(nèi)大廠是采用哪些更好的方案,時(shí)間問題也沒有具體調(diào)研過,我不得而知。關(guān)于第一種方案,老大也跟我聊過,我是不傾向的,原因如下:
我們的消息代理平臺(tái)還沒有建設(shè)出來(lái),業(yè)務(wù)用的基本都是原生的,如果想要在消息生命周期中鏡像一份出來(lái)到其它存儲(chǔ)系統(tǒng),在不改源碼的情況下,確實(shí)沒有很好的切入點(diǎn) 依賴其它存儲(chǔ)介質(zhì),復(fù)雜性,開發(fā)成本也高,我的開發(fā)時(shí)間也不充裕,短期內(nèi)實(shí)現(xiàn)這個(gè),有點(diǎn)難 全量保存的話,消息體的減少很難有質(zhì)的變化,當(dāng)然可以在處理的時(shí)候,去掉一些元數(shù)據(jù)信息,消息體也可以壓縮減少存儲(chǔ)空間的占用,但無(wú)論存哪,質(zhì)量守恒,不會(huì)換個(gè)地方,用的硬盤資源就能等比減少很多倍
當(dāng)然,這種方案的好處也很明顯,可以更精細(xì)化的控制保存時(shí)間及消息類別,設(shè)定對(duì)哪些topic或哪類消息的保存時(shí)限。另外如果我們的MQ代理層建設(shè)完,無(wú)論是RocketMQ還是kafka等都可以采用一種通用方案?jìng)浞荨?/p>
我目前主要采用第2種解決方案并進(jìn)行實(shí)現(xiàn),備份commitlog,支持檢索和重新消費(fèi)。主要思路就是,開發(fā)一個(gè)應(yīng)用,備份集群里將要過期的commitlog到更大的磁盤空間的主機(jī)(一臺(tái)主機(jī),備份整個(gè)集群的數(shù)據(jù),且硬件配置不需要太高,硬盤盡量大即可),并提供接口,支持檢索消息。
解決方案
基本實(shí)現(xiàn)
我們的主要目標(biāo)是讓消息保存的更久一些,不是為了災(zāi)備什么的,所以不需要雙活、冷備這樣搭建一個(gè)同等的部署模型的集群。況且資源有限,不可能再申請(qǐng)同配置或者低配的主機(jī)資源解決,比如上面那個(gè)4主8從Dledger模式,如果需要同樣的集群來(lái)解析commitlog檢索消息,至少也需要4主4從部署8個(gè)節(jié)點(diǎn)才行,雙活太浪費(fèi),冷備維護(hù)也不方便。主要原因是資源也不好申請(qǐng)。
我用了一周的時(shí)間,緊趕趕的寫了一個(gè)工具能支持備份commitlog及檢索消息:rocketmq-reput。
該工具支持3種模式:客戶端、服務(wù)器及混合模式
客戶端:部署在broker節(jié)點(diǎn),定時(shí)掃描上傳將要過期的commitlog 服務(wù)器:保存過期的commitlog并支持消息檢索 混合模式:同時(shí)開啟客戶端和服務(wù)器模式,無(wú)限期備份的關(guān)鍵
主要流程如下:
將reput client部署到rokcetmq集群的各個(gè)broker的從節(jié)點(diǎn)上,配置監(jiān)聽的commitlog目錄,定時(shí)掃描將要過期的commitlog上傳到reput server上。 reput server接收client傳來(lái)的commit log并根據(jù)不同的broker存放在不同的目錄下。 重新分發(fā)commit log的消息(所以我起名reput),構(gòu)建索引文件(消息檢索使用)和邏輯消費(fèi)隊(duì)列。 在reput server端可以通過restful接口查詢指定topic的歷史消息(根據(jù)時(shí)間范圍、消息ID[客戶端ID/服務(wù)端ID],消息key等)

數(shù)據(jù)上傳
從方案到開發(fā),因?yàn)闀r(shí)間上的原因,我也沒太多時(shí)間花費(fèi)在這上面,所以在實(shí)現(xiàn)上并沒有太注意細(xì)節(jié),開發(fā)上也比較粗糙。
數(shù)據(jù)上傳這里也是很簡(jiǎn)單的壓縮->傳輸->校驗(yàn)->保存,基本流程如下:

如果上傳到一半服務(wù)器關(guān)閉等原因?qū)е驴蛻舳水?dāng)前文件上傳失敗,會(huì)重置隊(duì)列,重新檢查上傳文件,避免有commitlog遺漏。
主機(jī)配置
該工具在執(zhí)行時(shí),大多情況下不需要太多算力,所以CPU是雙核的即可,內(nèi)存4G足夠,堆內(nèi)存配置2G就行,需要留一些物理內(nèi)存給操作系統(tǒng)的page cache。我目前測(cè)試的時(shí)候,堆內(nèi)存只配置了512M,挺好。
reput client盡量部署在從節(jié)點(diǎn)上,可以減少對(duì)master的影響。
另外開發(fā)的時(shí)候,為了節(jié)省時(shí)間,減少開發(fā)的代碼,像文件壓縮和md5檢查,都是直接調(diào)用的shell 命令,這也導(dǎo)致不支持在windows平臺(tái)下使用,只能在mac 和linux上運(yùn)行,mac os不檢查md5,只檢查文件長(zhǎng)度是否一致。
因?yàn)閳?zhí)行腳本命令的原因,會(huì)占用一些額外的性能,我觀測(cè)的有以下幾點(diǎn):
壓縮的時(shí)候一個(gè)cpu的核心使用率達(dá)到100%,所以要求最低雙核cpu,單核會(huì)影響broker的處理性能 網(wǎng)絡(luò)傳輸帶寬占用在50M/s,其實(shí)壓縮比挺高,一般在72%-92%吧,100M-300M之間,所以傳輸時(shí)間大概在2-6秒吧,如果本身帶寬是瓶頸,需要注意 硬盤,硬盤得夠大,畢竟要保存整個(gè)集群的commitlog
無(wú)限期備份方案
硬盤即使再大,但空間大小也有上限,所以能保存消息量也有限,比如一個(gè)節(jié)點(diǎn)消息量600-700G左右,4個(gè)節(jié)點(diǎn)一天的量就在2.5T左右,即使申請(qǐng)了一個(gè)8T的硬盤,也只能保存2天(3天是不可能了)。
reput自身也是和rockemq一樣的過期刪除策略(這部分代碼直接copy rocketmq的實(shí)現(xiàn)的),所以數(shù)據(jù)在reput server上過期也要被清除釋放磁盤空間。
所以目前reput支持混合模式,可以再申請(qǐng)一臺(tái)主機(jī),當(dāng)前reput作為客戶端,新reput作為server,將快要過期的文件以同樣方式傳輸過去保存,完整流程如下:

就以這種接力的方式一直保存下去,一個(gè)主機(jī)保存2天,想要保存多久,就申請(qǐng)多少主機(jī)吧。
消息檢索
消息檢索,為了方便和省事,我直接在rocketmq-console控制臺(tái)新開發(fā)一個(gè)歷史消息的頁(yè)面用來(lái)查詢消息,reput server會(huì)以心跳的方式將自己可查詢的時(shí)間段及地址注冊(cè)到控制臺(tái)上。
在控制臺(tái)上選擇topic和時(shí)間段,然后根據(jù)選擇的時(shí)間段符合條件的一個(gè)或多個(gè)reput server上獲取消息。如果是消息ID或消息key,那就只能到所有的server上一起查了,只要消息還在,總能查到返回。
效果如下,我還可以查到4天前的消息(測(cè)試的這個(gè)集群配置的是保存2天的數(shù)據(jù)):


重新消費(fèi)
重新消費(fèi)可以將要消費(fèi)的歷史消息檢索出來(lái),重新發(fā)回broker。
寫在最后
其實(shí)開發(fā)上還是遇到不少問題點(diǎn),比如因?yàn)閏ommtlog的生成方式和rocketmq自身的生成是不一樣的,rocketmq是在寫入消息的時(shí)候,commitlog寫不下了才會(huì)創(chuàng)建。在重新構(gòu)建索引和消息隊(duì)列的時(shí)候基于原有流程有些場(chǎng)景走不通,無(wú)法直接滾到下個(gè)文件等。
我是每個(gè)環(huán)節(jié)一一開發(fā)進(jìn)行驗(yàn)證的,最終把所有環(huán)節(jié)走通,寫了個(gè)完整流程的demo。
https://github.com/xxd763795151/rocketmq-reput
我把基本啟停腳本也簡(jiǎn)單補(bǔ)充了下,只是上面有些bug后來(lái)就沒在修改。
整個(gè)流程走通后,我就修改包名提交到私服了,后續(xù)的開發(fā)包括和rocketmq-console的聯(lián)調(diào),支持可視化檢索消息等都是在私服的代碼倉(cāng)庫(kù)上,這部分功能及后續(xù)的bug修復(fù),這個(gè)demo上是沒有了。但是這份demo代碼已支持消息檢索,也提供的有接口,可以直接調(diào)用接口檢索消息看結(jié)果,接口說(shuō)明如下:
/**
* get the total of message between startTime and endTime.
*
* @param topic topic name.
* @param startTime start time.
* @param endTime end time.
* @return a long value, the total of message between startTime and endTime.
*/
@GetMapping("/total/{topic}/{startTime}/{endTime}")
public Object getMessageTotalByTime(@PathVariable String topic, @PathVariable long startTime,
@PathVariable long endTime) {
return ResponseData.create().success().data(messageService.getMessageTotalByTime(topic, startTime, endTime));
}
/**
* get the message list between startTime and endTime.
*
* @param topic topic name.
* @param startTime start time.
* @param endTime end time.
* @return List(MessageExt), he message list between startTime and endTime.
*/
@GetMapping("/list/{topic}/{startTime}/{endTime}")
public Object getMessageByTime(@PathVariable String topic, @PathVariable long startTime,
@PathVariable long endTime) {
return ResponseData.create().success().data(messageService.getMessageByTime(topic, startTime, endTime));
}
/**
* get the message list between startTime and endTime. It differs from the above getMessageByTime is that the
* message body is null , as a result, the size is smaller when return the same messages.
*
* @param topic topic name.
* @param startTime start time.
* @param endTime end time.
* @return List(MessageExt), he message list between startTime and endTime.
*/
@GetMapping("/view/{topic}/{startTime}/{endTime}")
public Object viewMessageList(@PathVariable String topic, @PathVariable long startTime,
@PathVariable long endTime) {
return ResponseData.create().success().data(messageService.viewMessageList(topic, startTime, endTime));
}
/**
* get message by message id(server id(offset id) or client id(unique key)).
*
* @param topic topic name
* @param msgId msg id: server id/ client id.
* @return {@link org.apache.rocketmq.common.message.MessageExt}
*/
@GetMapping("/id/{topic}/{msgId}")
public Object queryMessageByMsgId(@PathVariable final String topic, @PathVariable final String msgId) {
return ResponseData.create().success().data(messageService.queryMessageByMsgId(topic, msgId));
}
/**
* get message by message key.
*
* @param topic topic name
* @param key msg key: custom business key/ client id.
* @return {@link org.apache.rocketmq.common.message.MessageExt}
*/
@GetMapping("/key/{topic}/{key}")
public Object queryMessageByKey(@PathVariable final String topic, @PathVariable final String key) {
return ResponseData.create().success().data(messageService.queryMessageByKey(topic, key));
}
這個(gè)實(shí)現(xiàn)是支持Dledger模式與常規(guī)的部署模型的。最近在測(cè)試環(huán)境(2主2從非DLedger模式)運(yùn)行了幾天,看了下效果,結(jié)果挺預(yù)期的,可以驗(yàn)證該方案是完全可行的。
推薦文章
1、一款高顏值的 SpringBoot+JPA 博客項(xiàng)目 2、超優(yōu) Vue+Element+Spring 中后端解決方案 3、推薦幾個(gè)支付項(xiàng)目! 4、推薦一個(gè) Java 企業(yè)信息化系統(tǒng) 5、一款基于 Spring Boot 的現(xiàn)代化社區(qū)(論壇/問答/社交網(wǎng)絡(luò)/博客)
