RocketMQ主機磁盤空間有限,如何無限期延長消息存儲?
你知道的越多,不知道的就越多,業(yè)余的像一棵小草!
你來,我們一起精進(jìn)!你不來,我和你的競爭對手一起精進(jìn)!
編輯:業(yè)余草
blog.csdn.net/x763795151
推薦:https://www.xttblog.com/?p=5294
前言
RocketMQ作為國人開源的一款消息引擎,相對kafka也更加適合在線的業(yè)務(wù)場景,在業(yè)內(nèi)使用的也是非常廣泛,很多同學(xué)也是非常熟悉它及它的存儲機制,所以這里不再對它的原理性東西作太多說明。
我們也知道,RocketMQ所有的數(shù)據(jù)如消息信息都是以文件形式保存到broker節(jié)點所在主機上指定的分區(qū)目錄下,比如消息的數(shù)據(jù)都是保存在commitlog中,默認(rèn)保存72小時(在磁盤使用率未達(dá)到閾值的情況下)會在指定時間清理過期數(shù)據(jù),釋放磁盤空間。
當(dāng)然,如果消息量不大且所在磁盤的分區(qū)夠大,我們可以增加消息的保存時間。但受限于磁盤大小,這個保存時間總歸有限,如果消息比較重要,或者我們想保存的更久一些就需要一些其它方案解決。
背景
我們線上的幾個集群目前消息保存時間在2-3天,實在是磁盤空間大小有限,消息量相對不算小。比如,有個比較核心的集群,部署方式是6個高配物理機采用DLedger模式4主8從交叉部署,發(fā)送的tps在10000多,所以每個節(jié)點的日消息量目前應(yīng)該是在600G吧。老大給我說他現(xiàn)在設(shè)置的線上保存時間是2天,業(yè)務(wù)量一直在增加,繼續(xù)增長下去,就要設(shè)置保存1天了,目前每個節(jié)點的磁盤使用率將近50%,年初我搭建監(jiān)控平臺的時候,注意過還沒這么高。

還有其它集群上的業(yè)務(wù),有些業(yè)務(wù)相關(guān)開發(fā)人員想要他們消息保存7天甚至更久。
基于這些原因,所以我們也的確需要一種過期消息備份的解決方案。
解決思路
如果需要對過期消息進(jìn)行備份,然后支持過期消息檢索及重新消費的能力,我們想到的,常規(guī)的方案有如下兩種:
將發(fā)送到broker的消息持久化一份到第三方存儲介質(zhì),如mysql 備份將要過期的commitlog到其它地方,重新恢復(fù)
業(yè)內(nèi)大廠是采用哪些更好的方案,時間問題也沒有具體調(diào)研過,我不得而知。關(guān)于第一種方案,老大也跟我聊過,我是不傾向的,原因如下:
我們的消息代理平臺還沒有建設(shè)出來,業(yè)務(wù)用的基本都是原生的,如果想要在消息生命周期中鏡像一份出來到其它存儲系統(tǒng),在不改源碼的情況下,確實沒有很好的切入點 依賴其它存儲介質(zhì),復(fù)雜性,開發(fā)成本也高,我的開發(fā)時間也不充裕,短期內(nèi)實現(xiàn)這個,有點難 全量保存的話,消息體的減少很難有質(zhì)的變化,當(dāng)然可以在處理的時候,去掉一些元數(shù)據(jù)信息,消息體也可以壓縮減少存儲空間的占用,但無論存哪,質(zhì)量守恒,不會換個地方,用的硬盤資源就能等比減少很多倍
當(dāng)然,這種方案的好處也很明顯,可以更精細(xì)化的控制保存時間及消息類別,設(shè)定對哪些topic或哪類消息的保存時限。另外如果我們的MQ代理層建設(shè)完,無論是RocketMQ還是kafka等都可以采用一種通用方案備份。
我目前主要采用第2種解決方案并進(jìn)行實現(xiàn),備份commitlog,支持檢索和重新消費。主要思路就是,開發(fā)一個應(yīng)用,備份集群里將要過期的commitlog到更大的磁盤空間的主機(一臺主機,備份整個集群的數(shù)據(jù),且硬件配置不需要太高,硬盤盡量大即可),并提供接口,支持檢索消息。
解決方案
基本實現(xiàn)
我們的主要目標(biāo)是讓消息保存的更久一些,不是為了災(zāi)備什么的,所以不需要雙活、冷備這樣搭建一個同等的部署模型的集群。況且資源有限,不可能再申請同配置或者低配的主機資源解決,比如上面那個4主8從Dledger模式,如果需要同樣的集群來解析commitlog檢索消息,至少也需要4主4從部署8個節(jié)點才行,雙活太浪費,冷備維護(hù)也不方便。主要原因是資源也不好申請。
我用了一周的時間,緊趕趕的寫了一個工具能支持備份commitlog及檢索消息:rocketmq-reput。
該工具支持3種模式:客戶端、服務(wù)器及混合模式
客戶端:部署在broker節(jié)點,定時掃描上傳將要過期的commitlog 服務(wù)器:保存過期的commitlog并支持消息檢索 混合模式:同時開啟客戶端和服務(wù)器模式,無限期備份的關(guān)鍵
主要流程如下:
將reput client部署到rokcetmq集群的各個broker的從節(jié)點上,配置監(jiān)聽的commitlog目錄,定時掃描將要過期的commitlog上傳到reput server上。 reput server接收client傳來的commit log并根據(jù)不同的broker存放在不同的目錄下。 重新分發(fā)commit log的消息(所以我起名reput),構(gòu)建索引文件(消息檢索使用)和邏輯消費隊列。 在reput server端可以通過restful接口查詢指定topic的歷史消息(根據(jù)時間范圍、消息ID[客戶端ID/服務(wù)端ID],消息key等)

數(shù)據(jù)上傳
從方案到開發(fā),因為時間上的原因,我也沒太多時間花費在這上面,所以在實現(xiàn)上并沒有太注意細(xì)節(jié),開發(fā)上也比較粗糙。
數(shù)據(jù)上傳這里也是很簡單的壓縮->傳輸->校驗->保存,基本流程如下:

如果上傳到一半服務(wù)器關(guān)閉等原因?qū)е驴蛻舳水?dāng)前文件上傳失敗,會重置隊列,重新檢查上傳文件,避免有commitlog遺漏。
主機配置
該工具在執(zhí)行時,大多情況下不需要太多算力,所以CPU是雙核的即可,內(nèi)存4G足夠,堆內(nèi)存配置2G就行,需要留一些物理內(nèi)存給操作系統(tǒng)的page cache。我目前測試的時候,堆內(nèi)存只配置了512M,挺好。
reput client盡量部署在從節(jié)點上,可以減少對master的影響。
另外開發(fā)的時候,為了節(jié)省時間,減少開發(fā)的代碼,像文件壓縮和md5檢查,都是直接調(diào)用的shell 命令,這也導(dǎo)致不支持在windows平臺下使用,只能在mac 和linux上運行,mac os不檢查md5,只檢查文件長度是否一致。
因為執(zhí)行腳本命令的原因,會占用一些額外的性能,我觀測的有以下幾點:
壓縮的時候一個cpu的核心使用率達(dá)到100%,所以要求最低雙核cpu,單核會影響broker的處理性能 網(wǎng)絡(luò)傳輸帶寬占用在50M/s,其實壓縮比挺高,一般在72%-92%吧,100M-300M之間,所以傳輸時間大概在2-6秒吧,如果本身帶寬是瓶頸,需要注意 硬盤,硬盤得夠大,畢竟要保存整個集群的commitlog
無限期備份方案
硬盤即使再大,但空間大小也有上限,所以能保存消息量也有限,比如一個節(jié)點消息量600-700G左右,4個節(jié)點一天的量就在2.5T左右,即使申請了一個8T的硬盤,也只能保存2天(3天是不可能了)。
reput自身也是和rockemq一樣的過期刪除策略(這部分代碼直接copy rocketmq的實現(xiàn)的),所以數(shù)據(jù)在reput server上過期也要被清除釋放磁盤空間。
所以目前reput支持混合模式,可以再申請一臺主機,當(dāng)前reput作為客戶端,新reput作為server,將快要過期的文件以同樣方式傳輸過去保存,完整流程如下:

就以這種接力的方式一直保存下去,一個主機保存2天,想要保存多久,就申請多少主機吧。
消息檢索
消息檢索,為了方便和省事,我直接在rocketmq-console控制臺新開發(fā)一個歷史消息的頁面用來查詢消息,reput server會以心跳的方式將自己可查詢的時間段及地址注冊到控制臺上。
在控制臺上選擇topic和時間段,然后根據(jù)選擇的時間段符合條件的一個或多個reput server上獲取消息。如果是消息ID或消息key,那就只能到所有的server上一起查了,只要消息還在,總能查到返回。
效果如下,我還可以查到4天前的消息(測試的這個集群配置的是保存2天的數(shù)據(jù)):


重新消費
重新消費可以將要消費的歷史消息檢索出來,重新發(fā)回broker。
寫在最后
其實開發(fā)上還是遇到不少問題點,比如因為commtlog的生成方式和rocketmq自身的生成是不一樣的,rocketmq是在寫入消息的時候,commitlog寫不下了才會創(chuàng)建。在重新構(gòu)建索引和消息隊列的時候基于原有流程有些場景走不通,無法直接滾到下個文件等。
我是每個環(huán)節(jié)一一開發(fā)進(jìn)行驗證的,最終把所有環(huán)節(jié)走通,寫了個完整流程的demo。這個demo本文就不貼代碼了,感謝的加我微信:xttblog2,我發(fā)給大家,可以玩一下。
我把基本啟停腳本也簡單補充了下,只是上面有些bug后來就沒在修改。
整個流程走通后,我就修改包名提交到私服了,后續(xù)的開發(fā)包括和rocketmq-console的聯(lián)調(diào),支持可視化檢索消息等都是在私服的代碼倉庫上,這部分功能及后續(xù)的bug修復(fù),這個demo上是沒有了。但是這份demo代碼已支持消息檢索,也提供的有接口,可以直接調(diào)用接口檢索消息看結(jié)果,接口說明如下:
????/**
?????*?get?the?total?of?message?between?startTime?and?endTime.
?????*
?????*?@param?topic?????topic?name.
?????*?@param?startTime?start?time.
?????*?@param?endTime???end?time.
?????*?@return?a?long?value,?the?total?of?message?between?startTime?and?endTime.
?????*/
????@GetMapping("/total/{topic}/{startTime}/{endTime}")
????public?Object?getMessageTotalByTime(@PathVariable?String?topic,?@PathVariable?long?startTime,
????????@PathVariable?long?endTime)?{
????????return?ResponseData.create().success().data(messageService.getMessageTotalByTime(topic,?startTime,?endTime));
????}
????/**
?????*?get?the?message?list?between?startTime?and?endTime.
?????*
?????*?@param?topic?????topic?name.
?????*?@param?startTime?start?time.
?????*?@param?endTime???end?time.
?????*?@return?List(MessageExt),??he?message?list?between?startTime?and?endTime.
?????*/
????@GetMapping("/list/{topic}/{startTime}/{endTime}")
????public?Object?getMessageByTime(@PathVariable?String?topic,?@PathVariable?long?startTime,
????????@PathVariable?long?endTime)?{
????????return?ResponseData.create().success().data(messageService.getMessageByTime(topic,?startTime,?endTime));
????}
????/**
?????*?get?the?message?list?between?startTime?and?endTime.?It?differs?from?the?above?getMessageByTime?is?that?the
?????*?message?body?is?null?,?as?a?result,??the?size?is?smaller?when?return?the?same?messages.
?????*
?????*?@param?topic?????topic?name.
?????*?@param?startTime?start?time.
?????*?@param?endTime???end?time.
?????*?@return?List(MessageExt),??he?message?list?between?startTime?and?endTime.
?????*/
????@GetMapping("/view/{topic}/{startTime}/{endTime}")
????public?Object?viewMessageList(@PathVariable?String?topic,?@PathVariable?long?startTime,
????????@PathVariable?long?endTime)?{
????????return?ResponseData.create().success().data(messageService.viewMessageList(topic,?startTime,?endTime));
????}
????/**
?????*?get?message?by?message?id(server?id(offset?id)?or?client?id(unique?key)).
?????*
?????*?@param?topic?topic?name
?????*?@param?msgId?msg?id:?server?id/?client?id.
?????*?@return?{@link?org.apache.rocketmq.common.message.MessageExt}
?????*/
????@GetMapping("/id/{topic}/{msgId}")
????public?Object?queryMessageByMsgId(@PathVariable?final?String?topic,?@PathVariable?final?String?msgId)?{
????????return?ResponseData.create().success().data(messageService.queryMessageByMsgId(topic,?msgId));
????}
????/**
?????*?get?message?by?message?key.
?????*
?????*?@param?topic?topic?name
?????*?@param?key???msg?key:?custom?business?key/?client?id.
?????*?@return?{@link?org.apache.rocketmq.common.message.MessageExt}
?????*/
????@GetMapping("/key/{topic}/{key}")
????public?Object?queryMessageByKey(@PathVariable?final?String?topic,?@PathVariable?final?String?key)?{
????????return?ResponseData.create().success().data(messageService.queryMessageByKey(topic,?key));
????}
這個實現(xiàn)是支持Dledger模式與常規(guī)的部署模型的。最近在測試環(huán)境(2主2從非DLedger模式)運行了幾天,看了下效果,結(jié)果挺預(yù)期的,可以驗證該方案是完全可行的。
