RocketMQ核心概念掃盲篇
點(diǎn)擊上方“中間件興趣圈”,選擇“設(shè)為星標(biāo)”
在正式進(jìn)入RocketMQ的學(xué)習(xí)之前,我覺得有必要梳理一下RocketMQ核心概念,為大家學(xué)習(xí)RocketMQ打下牢固的基礎(chǔ)。
1、RocketMQ部署架構(gòu)

Nameserver
Nameserver集群,topic的路由注冊(cè)中心,為客戶端根據(jù)Topic提供路由服務(wù),從而引導(dǎo)客戶端向Broker發(fā)送消息。Nameserver之間的節(jié)點(diǎn)不通信。路由信息在Nameserver集群中數(shù)據(jù)一致性采取的最終一致性。
Broker
消息存儲(chǔ)服務(wù)器,分為兩種角色:Master與Slave,上圖中呈現(xiàn)的就是2主2從的部署架構(gòu),在RocketMQ中,主服務(wù)承擔(dān)讀寫操作,從服務(wù)器作為一個(gè)備份,當(dāng)主服務(wù)器存在壓力時(shí),從服務(wù)器可以承擔(dān)讀服務(wù)(消息消費(fèi))。所有Broker,包含Slave服務(wù)器每隔30s會(huì)向Nameserver發(fā)送心跳包,心跳包中會(huì)包含存在在Broker上所有的topic的路由信息。
Client
消息客戶端,包括Producer(消息發(fā)送者)和Consumer(消費(fèi)消費(fèi)者).客戶端在同一時(shí)間只會(huì)連接一臺(tái)nameserver,只有在連接出現(xiàn)異常時(shí)才會(huì)嘗試連接另外一臺(tái)。客戶端每隔30s向Nameserver發(fā)起topic的路由信息查詢。
溫馨提示:Nameserver是在內(nèi)存中存儲(chǔ)Topic的路由信息,持久化Topic路由信息的地方是在Broker中,即${ ? ?ROCKETMQ_HOME}/store/config/topics.json。
在RocketMQ4.5.0版本后引入了多副本機(jī)制,即一個(gè)復(fù)制組(m-s)可以演變?yōu)榛趓aft協(xié)議的復(fù)制組,復(fù)制組內(nèi)部使用raft協(xié)議保證broker節(jié)點(diǎn)數(shù)據(jù)的強(qiáng)一致性,該部署架構(gòu)在金融行業(yè)用的比較多。
2、消息訂閱模型
在RocketMQ的消息消費(fèi)模式采用的是發(fā)布與訂閱模式。
topic:一類消息的集合,消息發(fā)送者將一類消息發(fā)送到一個(gè)主題中,例如訂單模塊將訂單發(fā)送到 order_topic 中,而用戶登錄時(shí),將登錄事件發(fā)送到 user_login_topic 中。
consumegroup:消息消費(fèi)組,一個(gè)消費(fèi)單位的“群體”,消費(fèi)組首先在啟動(dòng)時(shí)需要訂閱需要消費(fèi)的topic。一個(gè)topic可以被多個(gè)消費(fèi)組訂閱,同樣一個(gè)消費(fèi)組也可以訂閱多個(gè)主題。一個(gè)消費(fèi)組擁有多個(gè)消費(fèi)者。
術(shù)語(yǔ)解釋起來有點(diǎn)枯燥晦澀,接下來我舉例來闡述。
例如我們?cè)陂_發(fā)一個(gè)訂單系統(tǒng),其中有一個(gè)子系統(tǒng):order-service-app,在該項(xiàng)目中會(huì)創(chuàng)建一個(gè)消費(fèi)組order_consumer來訂閱 order_topic,并且基于分布式部署,order-service-app的部署情況如下:

2.1 消費(fèi)模式
那這三個(gè)消費(fèi)者如何來分工來共同消費(fèi) order_topic 中的消息呢?
在RocketMQ中支持廣播模式與集群模式。
廣播模式:一個(gè)消費(fèi)組內(nèi)的所有消費(fèi)者每一個(gè)都會(huì)處理topic中的每一條消息,通常用于刷新內(nèi)存緩存。
集群模式:一個(gè)消費(fèi)組內(nèi)的所有消費(fèi)者共同消費(fèi)一個(gè)topic中的消息,即分工協(xié)作,一個(gè)消費(fèi)者消費(fèi)一部分?jǐn)?shù)據(jù),啟動(dòng)負(fù)載均衡,
集群模式是非常普遍的模式,符合分布式架構(gòu)的基本理念,即橫向擴(kuò)容,當(dāng)前消費(fèi)者如果無法快速及時(shí)處理消息時(shí),可以通過增加消費(fèi)者的個(gè)數(shù)橫向擴(kuò)容,快速提高消費(fèi)能力,及時(shí)處理擠壓的消息。
2.2 消費(fèi)隊(duì)列負(fù)載算法與重平衡機(jī)制
那集群模式下,消費(fèi)者是如何來分配消息的呢?
例如上面實(shí)例中order_topic有16個(gè)隊(duì)列,那一個(gè)擁有3個(gè)消費(fèi)者的消費(fèi)組如何來分配隊(duì)列中。
在MQ領(lǐng)域有一個(gè)不成文的約定:同一個(gè)消費(fèi)者同一時(shí)間可以分配多個(gè)隊(duì)列,但一個(gè)隊(duì)列同一時(shí)間只會(huì)分配給一個(gè)消費(fèi)者。
RocketMQ提供了眾多的隊(duì)列負(fù)載算法,其中最常用的兩種平均分配算法。
AllocateMessageQueueAveragely
平均分配
AllocateMessageQueueAveragelyByCircle
輪流平均分配
為了說明這兩種分配算法的分配規(guī)則,現(xiàn)在對(duì)16個(gè)隊(duì)列,進(jìn)行編號(hào),用q0~q15表示,消費(fèi)者用c0~c2表示。
AllocateMessageQueueAveragely分配算法的隊(duì)列負(fù)載機(jī)制如下:
c0:q0 q1 q2 q3 q4 q5
c1: ? q6 q7 q8 q9 q10
c2: ? ?q11 q12 q13 q14 q15
其算法的特點(diǎn)是用總數(shù)除以消費(fèi)者個(gè)數(shù),余數(shù)按消費(fèi)者順序分配給消費(fèi)者,故c0會(huì)多分配一個(gè)隊(duì)列,而且隊(duì)列分配是連續(xù)的。
AllocateMessageQueueAveragelyByCircle分配算法的隊(duì)列負(fù)載機(jī)制如下:
c0:q0 ?q3 q6 q9 q12 q15
c1: ? q1 ? q4 q7 q10 q13
c2: ? ?q2 ? q5 q8 q11 q14
該分配算法的特點(diǎn)就是輪流一個(gè)一個(gè)分配。
溫馨提示:如果topic的隊(duì)列個(gè)數(shù)小于消費(fèi)者的個(gè)數(shù),那有些消費(fèi)者無法分配到消息。在RocketMQ中一個(gè)topic的隊(duì)列數(shù)直接決定了最大消費(fèi)者的個(gè)數(shù),但topic隊(duì)列個(gè)數(shù)的增加對(duì)RocketMQ的性能不會(huì)產(chǎn)生影響。
在實(shí)際過程中,對(duì)主題進(jìn)行擴(kuò)容(增加隊(duì)列個(gè)數(shù))或者對(duì)消費(fèi)者進(jìn)行擴(kuò)容、縮容是一件非常尋常的事情,那如果新增一個(gè)消費(fèi)者,該消費(fèi)者消費(fèi)哪些隊(duì)列呢?這就涉及到消息消費(fèi)隊(duì)列的重新分配,即消費(fèi)隊(duì)列重平衡機(jī)制。
在RocketMQ客戶端中會(huì)每隔20s去查詢當(dāng)前topic的所有隊(duì)列、消費(fèi)者的個(gè)數(shù),運(yùn)用隊(duì)列負(fù)載算法進(jìn)行重新分配,然后與上一次的分配結(jié)果進(jìn)行對(duì)比,如果發(fā)生了變化,則進(jìn)行隊(duì)列重新分配;如果沒有發(fā)生變化,則忽略。
例如采取的分配算法如下圖所示,現(xiàn)在增加一個(gè)消費(fèi)者c3,那隊(duì)列的分布情況是怎樣的呢?

c0:q0 q1 q2 q3
c1: ? q4 q5 q6 q7
c2: ? ?q8 q9 q10 q11
c3: ? ?q12 q13 q14 ?q15
上述整個(gè)過程無需應(yīng)用程序干預(yù),由RocketMQ完成。大概的做法就是將將原先分配給自己但這次不屬于的隊(duì)列進(jìn)行丟棄,新分配的隊(duì)列則創(chuàng)建新的拉取任務(wù)。
2.3 消費(fèi)進(jìn)度
消費(fèi)者消費(fèi)一條消息后需要記錄消費(fèi)的位置,這樣在消費(fèi)端重啟的時(shí)候,繼續(xù)從上一次消費(fèi)的位點(diǎn)開始進(jìn)行處理新的消息。在RocketMQ中,消息消費(fèi)位點(diǎn)的存儲(chǔ)是以消費(fèi)組為單位的。
集群模式下,消息消費(fèi)進(jìn)度存儲(chǔ)在broker端,
不能識(shí)別此Latex公式:
?{ ROCKETMQ_HOME }/store/config/consumerOffset.json 是其具體的存儲(chǔ)文件,其中內(nèi)容截圖如下:
可見消費(fèi)進(jìn)度的Key為:topic@consumeGroup,然后每一個(gè)隊(duì)列一個(gè)偏移量。
廣播模式的消費(fèi)進(jìn)度文件存儲(chǔ)在用戶的主目錄,默認(rèn)文件全路勁名:
{ USER_HOME }/.rocketmq_offsets。2.4 消費(fèi)模型
RocketMQ提供了并發(fā)消費(fèi)、順序消費(fèi)兩種消費(fèi)模型。
并發(fā)消費(fèi):對(duì)一個(gè)隊(duì)列中消息,每一個(gè)消費(fèi)者內(nèi)部都會(huì)創(chuàng)建一個(gè)線程池,對(duì)隊(duì)列中的消息多線程處理,即偏移量大的消息比偏移量小的消息有可能先消費(fèi)。
順序消費(fèi):在某一項(xiàng)場(chǎng)景,例如MySQL binlog 場(chǎng)景,需要消息按順序進(jìn)行消費(fèi)。在RocketMQ中提供了基于隊(duì)列的順序消費(fèi)模型,即盡管一個(gè)消費(fèi)組中的消費(fèi)者會(huì)創(chuàng)建一個(gè)多線程,但針對(duì)同一個(gè)Queue,會(huì)加鎖。
溫馨提示:并發(fā)消費(fèi)模型中,消息消費(fèi)失敗默認(rèn)會(huì)重試16次,每一次的間隔時(shí)間不一樣;而順序消費(fèi),如果一條消息消費(fèi)失敗,則會(huì)一直消費(fèi),直到消費(fèi)成功。故在順序消費(fèi)的使用過程中,應(yīng)用程序需要區(qū)分系統(tǒng)異常、業(yè)務(wù)異常,如果是不符合業(yè)務(wù)規(guī)則導(dǎo)致的異常,則重試多少次都無法消費(fèi)成功,這個(gè)時(shí)候一定要告警機(jī)制,及時(shí)進(jìn)行人為干預(yù),否則消費(fèi)會(huì)積壓。
3、事務(wù)消息
事務(wù)消息并不是為了解決分布式事務(wù),而是提供消息發(fā)送與業(yè)務(wù)落庫(kù)的一致性,其實(shí)現(xiàn)原理就是一次分布式事務(wù)的具體運(yùn)用,請(qǐng)看如下示例:

溫馨提示,本節(jié)主要的目的是讓大家知曉各個(gè)術(shù)語(yǔ)的概念,由于事務(wù)消息的使用,將在該專欄的后續(xù)文章中詳細(xì)介紹。
4、定時(shí)消息
開源版本的RocketMQ目前并不支持任意精度的定時(shí)消息。所謂的定時(shí)消息就是將消息發(fā)送到Broker,但消費(fèi)端不會(huì)立即消費(fèi),而是要到指定延遲時(shí)間后才能被消費(fèi)端消費(fèi)。
RocketMQ目前支持指定級(jí)別的延遲,其延遲級(jí)別如下:
1s?5s?10s?30s?1m?2m?3m?4m?5m?6m?7m?8m?9m?10m?20m?30m?1h?2h
5、消息過濾
消息過濾是指消費(fèi)端可以根據(jù)某些條件對(duì)一個(gè)topic中的消息進(jìn)行過濾,即只消費(fèi)一個(gè)主題下滿足過濾條件的消息。
RocketMQ目前主要的過濾機(jī)制是基于tag的過濾與基于消息屬性的過濾,基于消息屬性的過濾支持SQL92表達(dá)式,對(duì)消息進(jìn)行過濾。
6、小結(jié)
本文的主要目的是介紹RocketMQ常見的術(shù)語(yǔ),例如nameserver、broker、主題、消費(fèi)組、消費(fèi)者、隊(duì)列負(fù)載算法、隊(duì)列重平衡機(jī)制、并發(fā)消費(fèi)、順序消費(fèi)、消費(fèi)進(jìn)度存儲(chǔ)、定時(shí)消息、事務(wù)消息、消息過濾等基本概念,為后續(xù)的實(shí)戰(zhàn)系列打下堅(jiān)實(shí)基礎(chǔ)。

