要將RocketMQ中臺化,有點小激動
本文字數(shù):4223字
預計閱讀時間:13分鐘
一、RocketMQ簡介
RocketMQ是一個高可用、高性能、高可靠的分布式消息隊列,相對于kafka更適合處理業(yè)務系統(tǒng)之間的消息。
它具有很多特性,例如:
發(fā)布訂閱 順序、事務、定時消息 消息堆積、重試,回溯等等 它通過同步刷盤和同步雙寫等技術(shù)手段來實現(xiàn)高可靠,保證如下情況消息不丟:
可恢復性故障:broker或OS crash等 不可恢復性故障:磁盤損壞等 它采用多項技術(shù)優(yōu)化來滿足性能要求:
順序IO PageCache和mmap 內(nèi)存預熱和鎖定 異步提交和刷盤 堆外內(nèi)存緩沖等等
所以,它的本質(zhì)決定的其架構(gòu)一定是復雜的,參考RocketMQ官方架構(gòu)圖:

這里不再介紹各個組件的含義,可以參考RocketMQ架構(gòu)設(shè)計。
RocketMQ經(jīng)過阿里多年雙十一的檢驗,其穩(wěn)定性不言而喻。
可作為搜狐視頻的消息中臺,還需要很長一段路要走,為什么這么說呢?
二、運維之痛
早在2014年我們就引入了RocketMQ作為消息中間件,其附帶了基本的命令行工具。
但是命令行運維此等龐然大物會讓人感到力不從心,好在社區(qū)提供了一個web控制臺:RocketMQ-Console。
在初期,簡單的控制臺已經(jīng)能滿足基本的需求。但是隨著各個業(yè)務逐漸接入,需求也紛至沓來。
我們在RocketMQ-Console的修修補補已經(jīng)無法滿足了,主要體現(xiàn)在如下幾點:
從業(yè)務方的角度: 偏重運維,一般業(yè)務用戶不關(guān)心集群的數(shù)據(jù)和狀態(tài),無法聚焦。 使用起來繁瑣,且直接操作集群,易誤操作。 沒有監(jiān)控預警功能。 無法滿足業(yè)務用戶的需求,包括但不限于: 序列化 trace 流控,隔離降級 埋點統(tǒng)計監(jiān)控等等 一些隱性問題無法解決。 從管理員維度: 無用戶概念,任何人都能直接操作集群,易誤操作且比較危險。 無集群管理功能,日常更新或機器替換需要手動部署,非常耗時、麻煩且易出錯。 無相關(guān)數(shù)據(jù)統(tǒng)計,監(jiān)控,預警等,往往有問題不能及時發(fā)現(xiàn)。
另外,RocketMQ有一些潛在約定、使用規(guī)范、最佳實踐、bug或優(yōu)化等等,用文檔說明也無濟于事。
所以與其寫文檔不如將經(jīng)驗和實踐轉(zhuǎn)換為產(chǎn)品,能夠更好的服務于業(yè)務及運維集群,于是MQCloud應運而生。
三、MQCloud誕生
先看一下MQCloud的定位:

它是集客戶端SDK,監(jiān)控預警,集群運維于一體的一站式服務平臺。
MQCloud的系統(tǒng)架構(gòu)如下:

下面來分別說明一下MQCloud如何解決上面提到的痛點。

對于生產(chǎn)者來說,他關(guān)心的是topic配置,消息的發(fā)送數(shù)據(jù),誰在消費等等問題,這樣只對他展示相應的數(shù)據(jù)即可; 對于消費者來說,只關(guān)心消費狀況,有沒有堆積,消費失敗等情況; 對于管理員來說,可以進行部署,監(jiān)控,統(tǒng)一配置,審批等日常運維;
每分鐘topic的生產(chǎn)流量:用于繪制topic生產(chǎn)流量圖及監(jiān)控預警。 每分鐘消費者流量:用于繪制消費流量圖及監(jiān)控預警。 每10分鐘topic生產(chǎn)流量:用于按照流量展示topic排序。 每分鐘broker生產(chǎn)、消費流量:用于繪制broker生產(chǎn)消費流量圖。 每分鐘broker集群生產(chǎn)、消費流量:用于繪制broker集群的生產(chǎn)流量圖。 每分鐘生產(chǎn)者百分位耗時、異常統(tǒng)計:以ip維度繪制每個生產(chǎn)者的耗時流量圖及監(jiān)控預警。 機器的cpu,內(nèi)存,io,網(wǎng)絡(luò)流量,網(wǎng)絡(luò)連接等統(tǒng)計:用于服務器的狀況圖和監(jiān)控預警。
TOPIC_PUT_NUMS:某topic消息生產(chǎn)條數(shù),向某個topic寫入消息成功才算
寫入成功包括四種狀態(tài):PUT_OK,F(xiàn)LUSH_DISK_TIMEOUT,F(xiàn)LUSH_SLAVE_TIMEOUT,SLAVE_NOT_AVAILABLE
TOPIC_PUT_SIZE:某topic消息生產(chǎn)大小,向某個topic寫入消息成功才算
StatsItemSet主要字段及方法如下:
ConcurrentMap<String/* statsKey */, StatsItem> statsItemTable; // statsKey<->StatsItem
// 針對某個數(shù)據(jù)項進行記錄
public void addValue(final String statsKey, final int incValue, final int incTimes) {
StatsItem statsItem = this.getAndCreateStatsItem(statsKey);
statsItem.getValue().addAndGet(incValue);
statsItem.getTimes().addAndGet(incTimes);
}
// 獲取并創(chuàng)建StatsItem
public StatsItem getAndCreateStatsItem(final String statsKey) {
StatsItem statsItem = this.statsItemTable.get(statsKey);
if (null == statsItem) {
statsItem = new StatsItem(this.statsName, statsKey);
this.statsItemTable.put(statsKey, statsItem);
}
return statsItem;
}StatsItem主要字段及方法如下:
AtomicLong value; // 統(tǒng)計數(shù)據(jù):比如消息條數(shù),消息大小
AtomicLong times; // 次數(shù)
LinkedList<CallSnapshot> csListMinute; // 每分鐘快照數(shù)據(jù)
LinkedList<CallSnapshot> csListHour; // 每小時快照數(shù)據(jù)
LinkedList<CallSnapshot> csListDay; // 每天快照數(shù)據(jù)
// 分鐘采樣
public void samplingInSeconds() {
synchronized (csListMinute) {
csListMinute.add(new CallSnapshot(System.currentTimeMillis(), times.get(), value.get()));
if (csListMinute.size() > 7) {
csListMinute.removeFirst();
}
}
}
// 小時采樣
public void samplingInMinutes() {
// ...代碼省略
}
// 天采樣
public void samplingInHour() {
// ...代碼省略
}CallSnapshot主要字段如下:
long times; // 次數(shù)快照
long value; // 統(tǒng)計數(shù)據(jù)快照
long timestamp; //快照時間戳
StatsItemSet.addValue("test_topic", 123125123, 1)




客戶端ip->broker ip 發(fā)送消息耗時 消息數(shù)量 發(fā)送異常
創(chuàng)建一個按照最大耗時預哈希的時間跨度不同的耗時分段數(shù)組:
優(yōu)點:此種分段方法占用內(nèi)存是固定的,比如最大耗時如果為3500ms,那么只需要空間大小為96的數(shù)組即可
缺點:分段精度需要提前設(shè)定好,且不可更改
第一段:耗時范圍0ms~10ms,時間跨度為1ms。

第二組:耗時范圍11ms~100ms,時間跨度5ms。

第三組:耗時范圍101ms~3500ms,時間跨度50ms。

針對上面的分段數(shù)組,創(chuàng)建一個大小對應的AtomicLong的計數(shù)數(shù)組,支持并發(fā)統(tǒng)計:

耗時統(tǒng)計時,計算耗時對應的耗時分段數(shù)組下標,然后調(diào)用計數(shù)數(shù)組進行統(tǒng)計即可,參考下圖:
這樣,從計數(shù)數(shù)組就可以得到實時耗時統(tǒng)計,類似如下:

例如某次耗時為18ms,首先找到它所屬的區(qū)間,即歸屬于[16~20]ms之間,對應的數(shù)組下標為12。 根據(jù)第一步找到的數(shù)組下標12,獲取對應的計數(shù)數(shù)組下標12。 獲取對應的計數(shù)器進行+1操作,即表示18ms發(fā)生了一次調(diào)用。 然后定時采樣任務會每分鐘對計數(shù)數(shù)組進行快照,產(chǎn)生如下耗時數(shù)據(jù):

由于上面的耗時數(shù)據(jù)天然就是排好序的,可以很容易計算99%、90%、平均耗時等數(shù)據(jù)了。
優(yōu)點:此種分段方法占用內(nèi)存是固定的,比如最大耗時如果為3500ms,那么只需要空間大小為96的數(shù)組即可
缺點:分段精度需要提前設(shè)定好,且不可更
這樣,從計數(shù)數(shù)組就可以得到實時耗時統(tǒng)計,類似如下:

機器統(tǒng)計
關(guān)于集群狀況收集主要采用了將nmon自動放置到/tmp目錄,定時采用ssh連接到機器執(zhí)行nmon命令,解析返回的數(shù)據(jù),然后進行存儲。
上面這些工作就為監(jiān)控和預警奠定了堅實的數(shù)據(jù)基礎(chǔ)。
單獨定制的客戶端

多集群支持
MQCloud儲存了生產(chǎn)者、消費者和集群的關(guān)系,通過路由適配,客戶端可以自動路由到目標集群上,使客戶端對多集群透明。
trace
通過搭建單獨的trace集群和定制客戶端,使trace數(shù)據(jù)能夠發(fā)往獨立的集群,防止影響主集群。
序列化
通過集成不同的序列化機制,配合MQCloud,客戶端無需關(guān)心序列化問題。
目前支持的序列化為protobuf和json,并且通過類型檢測支持在線修改序列化方式。
流控
通過提供令牌桶和漏桶限流機制,自動開啟流控機制,防止消息洪峰沖垮業(yè)務端。
隔離降級
使用hystrix提供隔離降級策略,使業(yè)務端在broker故障時可以避免拖累。
埋點監(jiān)控
通過對客戶端數(shù)據(jù)進行統(tǒng)計,收集,在MQCloud里進行監(jiān)控,使客戶端任何風吹草動都能及時得知。
規(guī)范問題
通過編碼保障,使某些約定,規(guī)范和最佳實踐得以實現(xiàn)。包括但不限于:
命名規(guī)范 消費組全局唯一,防止重復導致消費問題 重試消息跳過 安全關(guān)閉等等 更完善的重試機制
部署
手動部署一臺broker實例沒什么問題,但是當實例變多時,手動部署極易出錯且耗時耗力。
MQCloud提供了一套自動化部署機制,并支持配置模板功能,支持一鍵部署。
機器運維
MQCloud提供了一整套機器的運維機制,包括上下線,機器狀況收集、監(jiān)控、預警等等,大大提升了生產(chǎn)力。
一、開啟管理員權(quán)限
RocketMQ從4.4.0開始支持ACL,但是默認沒有開啟,也就是任何人使用管理工具或API就可以直接操縱線上集群。但是開啟ACL對現(xiàn)有業(yè)務影響太大,針對這種情況MQCloud進行專門定制。
借鑒RocketMQ ACL機制,只針對RocketMQ管理員操作加固權(quán)限校驗:

并且支持自定義和熱加載管理員請求碼,使得非法操作RocketMQ集群成為不可能,安全性大大提升。
二、broker通信加固
if ((this.byteBufferRead.position() - this.processPostion) >= 8) {
int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
long readOffset = this.byteBufferRead.getLong(pos - 8);
this.processPostion = pos;
HAConnection.this.slaveAckOffset = readOffset;
if (HAConnection.this.slaveRequestOffset < 0) {
HAConnection.this.slaveRequestOffset = readOffset;
log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
}
HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
}
目前MQCloud運維規(guī)模如下:
服務器:50臺+ 集群:5個+ topic:700個+ 生產(chǎn)消費消息量/日:4億條+ 生產(chǎn)消費消息大小/日:400G+
MQCloud在充分考慮和吸收實際業(yè)務的需求后,以各個角色聚焦為核心,以全面監(jiān)控為目標,
以滿足各業(yè)務端需求為己任,在不斷地發(fā)展和完善。
在MQCloud逐漸成熟之后,秉承著服務于社區(qū)和吸收更多經(jīng)驗的理念,我們開放了源代碼。
四、開源之路
開放源代碼說不難也不難,說難也難。為什么這么說?
不難就是因為代碼已經(jīng)有了,只是換個倉庫而已。
而難點就是需要進行抽象設(shè)計,剝離不能開源的代碼(內(nèi)部模塊,代碼,地址等等)。
經(jīng)過設(shè)計和拆分,MQCloud于18年開源了,從第一個版本release到現(xiàn)在已經(jīng)過去兩年了,
期間隨著更新迭代大大小小一共release了20多個版本。
其中不但包含功能更新、bug修復、wiki說明等,而且每個大版本都經(jīng)過詳細的測試和內(nèi)部的運行。
之后很多小伙伴躍躍欲試,來試用它,并提出一些建議和意見,我們根據(jù)反饋來進一步完善它。
我們將一直遵循我們的目標,堅定的走自己的開源之路:
為業(yè)務提供可監(jiān)控,可預警,可滿足其各種需求的穩(wěn)定的MQ服務。 積累MQ領(lǐng)域經(jīng)驗,將經(jīng)驗轉(zhuǎn)化為產(chǎn)品,更好的服務業(yè)務。
后臺回復 學習資料 領(lǐng)取學習視頻
如有收獲,點個在看,誠摯感謝
