RocketMQ 消息消費(fèi)流程
記得點(diǎn)擊?"歡少的成長之路",?設(shè)為星標(biāo)?
后臺點(diǎn)擊【聯(lián)系我】,申請加入優(yōu)質(zhì)技術(shù)學(xué)習(xí)社群
大家好,我是Leo。
今天聊一下RocketMQ消息消費(fèi),消費(fèi)方式,消費(fèi)模式,傳送方式,過濾模式,負(fù)載均衡,重分配機(jī)制,消息拉取,并發(fā)消費(fèi)與順序消費(fèi)

消息消費(fèi)
關(guān)系圖
首先放一下Broker Cluster,Broker,Topic,Queue的關(guān)系圖。因為下文主要會沿著這四塊進(jìn)行梳理

消費(fèi)方式
消息消費(fèi)主要有兩種方式?并發(fā)消費(fèi)?和?順序消費(fèi)。
并發(fā)消費(fèi),一個隊列中的消息可同時被消費(fèi)者的多個線程并發(fā)消費(fèi) 順序消費(fèi),一個隊列中的消息同一時間只能被一個消費(fèi)者的一個線程消費(fèi),通過這種方式達(dá)到順序消費(fèi)的效果
消費(fèi)模式

源碼
/**
?*?Message?model
?*?消息模式
?*/
public?enum?MessageModel?{
????/**
?????*?broadcast
?????*?廣播
?????*/
????BROADCASTING("BROADCASTING"),
????/**
?????*?clustering
?????*?集群
?????*/
????CLUSTERING("CLUSTERING");
????private?String?modeCN;
????MessageModel(String?modeCN)?{
????????this.modeCN?=?modeCN;
????}
????public?String?getModeCN()?{
????????return?modeCN;
????}
}
消費(fèi)傳送方式

消息過濾模式
RocketMQ支持兩種消息過濾模式
表達(dá)式(TAG、SQL92) 類過濾模式

RocketMQ的消息過濾都是發(fā)生在?服務(wù)端?的,可以從下列代碼得知。

負(fù)載均衡規(guī)則
RocketMQ提供了豐富的queue均衡規(guī)則?一共6種,目前只實現(xiàn)了四種
AllocateMessageQueueAveragely:默認(rèn)均衡規(guī)則

AllocateMessageQueueAveragelyByCircle:循環(huán)平均分配。是第1種方式的變種。針對queue數(shù)量多余Consumer數(shù)量的情況下,使用循環(huán)分配規(guī)則。如有3個Consumer、5個queue,則Consumer0消費(fèi)queue0和queue3、Consumer1消費(fèi)queue1和queue4、Consumer2消費(fèi)queue2。

AllocateMessageQueueByMachineRoom:機(jī)房分配策略?。

AllocateMessageQueueConsistentHash:一致性Hash方式分配。

AllocateMessageQueueByConfig:根據(jù)配置進(jìn)行分配。未實現(xiàn)。
AllocateMachineRoomNearby:根據(jù)Consumer與Broker的距離遠(yuǎn)近進(jìn)行分配,從源碼看,該策略未完整實現(xiàn)。
重新分配機(jī)制
集群消費(fèi)模式下,RocketMQ會把所有的messageQueue按一定的負(fù)載均衡策略分配給不同的消費(fèi)者實例來消費(fèi)。
也就是當(dāng)負(fù)載均衡完成后,一個messageQueue只能被一個消費(fèi)者實例消費(fèi),一個消費(fèi)者實例可以消費(fèi)一個或多個messageQueue,這取決于兩者的數(shù)量,如圖:
Rebalance的觸發(fā)時機(jī)
消費(fèi)者啟動時主動進(jìn)行一次Rebalance 消費(fèi)者啟動后設(shè)置定時進(jìn)行Rebalance,20s/次 消費(fèi)者組實例數(shù)量發(fā)生變化時,broker通知消費(fèi)者進(jìn)行Rebalance 所訂閱的topic的messageQueue數(shù)量發(fā)生變化時、訂閱關(guān)系變化時,broker通知消費(fèi)者進(jìn)行Rebalance
Rebalance的觸發(fā)場景
消費(fèi)者啟動 消費(fèi)者擴(kuò)縮容 消費(fèi)者宕機(jī) broker擴(kuò)縮容 messageQueue數(shù)量調(diào)整 網(wǎng)絡(luò)問題導(dǎo)致客戶端 broker連接斷開
Rebalance帶來的問題
消費(fèi)暫停:只有一個Consumer時,該Consumer負(fù)責(zé)消費(fèi)所有隊列。若新增Consumer,則會觸發(fā)Rebalance,原Consumer就需要暫停部分隊列的消費(fèi)。等到這些隊列分配給新的Consumer,暫停的隊列才能繼續(xù)被消費(fèi)。
重復(fù)消費(fèi):Consumer在消費(fèi)新分配給自己的隊列時,必須接著之前的Consumer提交的消費(fèi)進(jìn)度的offset繼續(xù)消費(fèi)。默認(rèn)情況下,offset是異步提交的,就會導(dǎo)致提交到Broker的offset與Consumer實際消費(fèi)的信息不一致。就可能導(dǎo)致重復(fù)消費(fèi)。
消息突刺:由于Rebalance可能導(dǎo)致重復(fù)消費(fèi),如果重復(fù)消費(fèi)的消息過多,或者因為Rebalance暫停時間過長而導(dǎo)致積壓了部分消息。name有可能會導(dǎo)致在Rebalance結(jié)束后需要瞬間消費(fèi)很多消息。
同步提交和異步提交
同步提交:consumer提交了其消費(fèi)完畢的一批消息的offset給broker后,需要等待broker的成功ACK,收到ACK后,consumer才會繼續(xù)獲取并消費(fèi)下一批消息。在等待ACK期間,consumer是阻塞的。 異步提交:consumer提交了消費(fèi)完畢的一批消息的offset后,不需要等待不容二科的成功ack,consumer可以直接獲取并消費(fèi)下一批消息 對于一次讀取消息的數(shù)量,需要根據(jù)具體業(yè)務(wù)場景選擇一個相對均衡是很有必要的。數(shù)量過的大,產(chǎn)生重復(fù)的消息可能會增加。數(shù)量過小,系統(tǒng)性能會下降。
隊列分配流程
獲取指定 Topic 下的消息隊列集合 如果是廣播模式,則不需要進(jìn)行負(fù)載均衡,消費(fèi)者直接負(fù)責(zé)所有消息隊列 集群模式則需要獲取指定 Topic 的所有消費(fèi)者集合,根據(jù)負(fù)載均衡算法將消息隊列分配給消費(fèi)者 消息隊列分配完畢后,則需要為每個消息隊列創(chuàng)建對應(yīng)的任務(wù)隊列,即 ProcessQueue 為每個任務(wù)隊列創(chuàng)建對應(yīng)的消息拉取任務(wù),后續(xù)消息拉取服務(wù)會定時掃描任務(wù)池進(jìn)行消息拉取操作


隊列分配目的在于指定消費(fèi)者負(fù)責(zé)的隊列集合,分配前需要明確幾點(diǎn):
該 Topic 存在多少隊列 該 Topic 存在多少消費(fèi)者 隊列如何分配給消費(fèi)者,即負(fù)載均衡算法(默認(rèn)是平均分配的算法)
消息拉取
消息拉取流程
RocketMQ的消息拉取 由 PullMessageService處理。
消費(fèi)者啟動后,因集成了ServiceThread,ServiceThread又實現(xiàn)了 Runnable接口。他首先會啟動run線程。每執(zhí)行一次業(yè)務(wù)邏輯檢測一下其運(yùn)行狀態(tài),可以通過其他線程將stopped設(shè)置為true從而停止該線程。 pullRequestQueue?是一個阻塞隊列,只會在有消息之后,才會去拉取,拉取最頂部的對象,其他對象一并移出。拿到? PullRequest?之后,根據(jù)拉取請求的消費(fèi)組反查該消費(fèi)者的拉取規(guī)則。拿到了拉取規(guī)則后通過? pullMessage?函數(shù)獲取?processQueue?隊列消費(fèi)快照,并且檢查是否被dropped,修改最后拉取時間,檢查服務(wù)是否正常,消費(fèi)者是否暫停等執(zhí)行流控,判斷緩存消息數(shù)量是否超過閾值,緩存消息大小是否超過閾值,緩存消息跨度是否超過閾值 獲取? processQueue?鎖,判斷broker是否上鎖,上了就拉,沒上就等待。通過? pullRequest 的 messageQueue?計算拉取偏移量,判斷當(dāng)前偏移量是否小于拉請求的下一個坐標(biāo),如果偏移量大于拉請求的下標(biāo)就代表broker繁忙。對之前加鎖并且初始化拉請求的最新下標(biāo)再根據(jù)拉請求中的消息隊列,取消息隊列中的主題信息,根據(jù)? RebalanceImpl?實現(xiàn)類并且按照用戶的負(fù)載均衡規(guī)則去查詢訂閱數(shù)據(jù)。在? PullCallback?回調(diào)中,根據(jù)狀態(tài)進(jìn)行相應(yīng)的處理(狀態(tài)可以從下方代碼中查看 PullStatus)構(gòu)建消息拉取系統(tǒng)標(biāo)記(標(biāo)記可以從下方代碼中查看PullSysFlag) 最后調(diào)用PullAPIWrapper.pullKernelImpl方法后與服務(wù)端交互

PullRequestQueue:阻塞隊列,存放的是拉請求 PullRequest
ProcessQueue:消費(fèi)進(jìn)度,消息總數(shù)量等一些核心的數(shù)據(jù)都在這里
PullRequest:拉請求,封裝了消費(fèi)者,消息隊列,隊列消費(fèi)快照,下一個下標(biāo),以前鎖定等
第八步中 如果為空,結(jié)束本次消息拉取,關(guān)于該隊列的下一次拉取任務(wù)延遲3s。
消息拉取回調(diào)
消息拉取完畢的后續(xù)處理邏輯:
如果成功拉取到消息,則將消息加入到待處理任務(wù)隊列 ProcessQueue,并提交一個消費(fèi)請求給 ConsumeMessageService,提交下一次消息拉取任務(wù) 如果沒有成功拉取到消息,則根據(jù)服務(wù)端返回的 Offset 進(jìn)行校正處理,重新提交消息拉取任務(wù)

PullRequest
在RocketMQ根據(jù)PullRequest拉取任務(wù)執(zhí)行完一次消息拉取任務(wù)后,又將PullRequest對象放入到pullRequestQueue 在RebalancceImpl中創(chuàng)建。RebalanceImpl就是消息隊列負(fù)載機(jī)制,也就是PullRequest對象真正創(chuàng)建的地方
ProcessQueue
ProcessQueue是MessageQueue在消費(fèi)端的重現(xiàn)、快照。PullMessageService從消息服務(wù)器默認(rèn)每次拉取32條消息,按消息的隊列偏移量順序存放在ProcessQueue中,PullMessageService然后將消息提交到消費(fèi)者消費(fèi)線程池,消息成功消費(fèi)后從ProcessQueue中移除
pullKernelImpl
下列參數(shù)詳解
MessageQueue mq:從哪個消息消費(fèi)隊列拉取消息。 String subExpression:消息過濾表達(dá)式。 String expressionType:消息表達(dá)式類型,分為TAG、SQL92。 long offset:消息拉取偏移量。 int maxNums:本次拉取最大消息條數(shù),默認(rèn)32條。 int sysFlag:拉取系統(tǒng)標(biāo)記。 long commitOffset:當(dāng)前MessageQueue的消費(fèi)進(jìn)度(內(nèi)存中)。 long brokerSuspendMaxTimeMillis:消息拉取過程中允許Broker掛起時間,默認(rèn)15s。 long timeoutMillis:消息拉取超時時間。 CommunicationMode communicationMode:消息拉取模式,默認(rèn)為異步拉取。 PullCallback pullCallback:從Broker拉取到消息后的回調(diào)方法。
執(zhí)行流程如下
根據(jù)brokerName、BrokerId從MQClientInstance中獲取Broker地址, 如果消息過濾模式為類過濾,則需要根據(jù)主題名稱、broker地址找到注冊在Broker上的FilterServer地址,從FilterServer上拉取消息,否則從Broker上拉取消息。 通過MQClientAPIImpl#pullMessageAsync方法異步向Broker拉取消息。
在整個RocketMQ Broker的部署結(jié)構(gòu)中,相同名稱的Broker構(gòu)成主從結(jié)構(gòu),其BrokerId會不一樣,在每次拉取消息后,會給出一個建議,下次拉取從主節(jié)點(diǎn)還是從節(jié)點(diǎn)拉取
源碼
//?PullMessageService?啟動后的?run?函數(shù)
public?void?run()?{
????log.info(this.getServiceName()?+?"?service?started");
????while?(!this.isStopped())?{
????????try?{
????????????//使用BlockingQueue阻塞隊列,當(dāng)提交了消息拉取請求后,馬上執(zhí)行
????????????//?take?移除所有隊列,返回最頂部?隊列中的對象(拉請求)
????????????PullRequest?pullRequest?=?this.pullRequestQueue.take();
????????????this.pullMessage(pullRequest);
????????}?catch?(InterruptedException?ignored)?{
????????}?catch?(Exception?e)?{
????????????log.error("Pull?Message?Service?Run?Method?exception",?e);
????????}
????}
????log.info(this.getServiceName()?+?"?service?end");
}
//?stopped?為true的函數(shù)
public?void?shutdown(final?boolean?interrupt)?{
????log.info("Try?to?shutdown?service?thread:{}?started:{}?lastThread:{}",?getServiceName(),?started.get(),?thread);
????if?(!started.compareAndSet(true,?false))?{
????????return;
????}
????this.stopped?=?true;
????log.info("shutdown?thread?"?+?this.getServiceName()?+?"?interrupt?"?+?interrupt);
????if?(hasNotified.compareAndSet(false,?true))?{
????????//?此函數(shù)將遞減(減一)鎖存器的計數(shù),如果計數(shù)到達(dá)零,則釋放所有等待的線程
????????waitPoint.countDown();?//?notify
????}
}
/**
?*?拉取狀態(tài)
?*/
public?enum?PullStatus?{
????/**
?????*?Founded
?????*?建立
?????*/
????FOUND,
????/**
?????*?No?new?message?can?be?pull
?????*?無法拉取任何新消息
?????*/
????NO_NEW_MSG,
????/**
?????*?Filtering?results?can?not?match
?????*?過濾結(jié)果不匹配
?????*/
????NO_MATCHED_MSG,
????/**
?????*?Illegal?offset,may?be?too?big?or?too?small
?????*?非法偏移,可能太大或太小
?????*/
????OFFSET_ILLEGAL
}
/**
?*?消息拉取系統(tǒng)標(biāo)記
?*/
public?class?PullSysFlag?{
????/**
?????*?表示從內(nèi)存中讀取的消費(fèi)進(jìn)度大于0,則設(shè)置該標(biāo)記位。
?????*/
????private?final?static?int?FLAG_COMMIT_OFFSET?=?0x1;
????/**
?????*?表示消息拉取時支持掛起。
?????*/
????private?final?static?int?FLAG_SUSPEND?=?0x1?<1;
????/**
?????*?消息過濾機(jī)制為表達(dá)式,則設(shè)置該標(biāo)記位。
?????*/
????private?final?static?int?FLAG_SUBSCRIPTION?=?0x1?<2;
????/**
?????*?消息過濾機(jī)制為類過濾模式
?????*/
????private?final?static?int?FLAG_CLASS_FILTER?=?0x1?<3;
????private?final?static?int?FLAG_LITE_PULL_MESSAGE?=?0x1?<4;
}
消息消費(fèi)
消息消費(fèi)的大概流程
消息生產(chǎn)者把消息發(fā)送并存儲到 Rocket MQ 的 broker 上,NameServer 用來發(fā)現(xiàn)和更新 broker。 消費(fèi)者啟動時會啟動 PullMessageService 線程,PullMessageService 線程不斷地從內(nèi)部的隊列中取 PullRequest,然后使用 PullRequest 作為請求去拉取消息。 PullRequest 中的消息處理隊列 ProcessQueue 是 MessageQueue 在消費(fèi)端的重現(xiàn)、快照。PullMessageService 使用消費(fèi)者(DefaultMQPushConsumerImpl)從消息服務(wù)器默認(rèn)每次拉取 32 條消息,按消息的隊列偏移量存放在 ProcessQueue 中,然后消費(fèi)者再將消息提交到消息消費(fèi)線程池中(提交 ConsumeRequest),消息成功消費(fèi)后從 ProcessQueue 中移除。
第二步中
拿到消費(fèi)請求后,消費(fèi)請求里面肯定帶著topic ,queueId, offset,取多少個這四個重要信息,然后獲取到對應(yīng)topic對應(yīng)queueId的consumeQueue,然后定位到offset位置處,往下取出你要個數(shù)的信息。這里舉個例子,比如說有一個topic是xxx,然后有2個queue,這個時候我們消息消費(fèi)者 發(fā)起消費(fèi)請求,要消費(fèi)topic是xxx,queueId是0 ,然后offset =3開始拉取,拉取大小maxMsgNums=2,就是下圖這個樣子

接著獲取到3,4這兩個之后,遍歷,根據(jù)它們里面的commitlog offset 再去commitlog獲取到對應(yīng)真實的消息。

消息拉取完之后,會提交一個消費(fèi)任務(wù)給 ConsumeMessageService 進(jìn)行處理。ConsumeMessageService 有兩個實現(xiàn)類:
并發(fā)處理,對應(yīng)實現(xiàn)類為 ConsumeMessageConcurrentlyService 順序處理,對應(yīng)實現(xiàn)類為 ConsumeMessageOrderlyService
并發(fā)消費(fèi)
并發(fā)消費(fèi)的主要方法是?submitConsumeRequest?邏輯如下
consumeMessageBatchMaxSize,消息批次,在這里看來也就是一次消息消費(fèi)任務(wù)ConsumeRequest中包含的消息條數(shù),默認(rèn)為1, msgs.size()默認(rèn)最多為32條,受DefaultMQPushConsumer.pullBatchSize屬性控制,如果msgs.size()小于consume Message-BatchMaxSize,則直接將拉取到的消息放入到ConsumeRequest中,然后將consumeRequest提交到消息消費(fèi)者線程池中,如果提交過程中出現(xiàn)拒絕提交異常則延遲5s再提交,這里其實是給出一種標(biāo)準(zhǔn)的拒絕提交實現(xiàn)方式,實際過程中由于消費(fèi)者線程池使用的任務(wù)隊列為LinkedBlockingQueue無界隊列,故不會出現(xiàn)拒絕提交異常。 如果拉取的消息條數(shù)大于consumeMessageBatchMaxSize,則對拉取消息進(jìn)行分頁,每頁consumeMessageBatchMaxSize條消息,創(chuàng)建多個ConsumeRequest任務(wù)并提交到消費(fèi)線程池。ConsumeRequest的run方法封裝了具體消息消費(fèi)邏輯。

進(jìn)入具體消息消費(fèi)時會先檢查processQueue的dropped,如果設(shè)置為true,則停止該隊列的消費(fèi),在進(jìn)行消息重新負(fù)載時如果該消息隊列被分配給消費(fèi)組內(nèi)其他消費(fèi)者后,需要droped設(shè)置為true,阻止消費(fèi)者繼續(xù)消費(fèi)不屬于自己的消息隊列。 執(zhí)行消息消費(fèi)鉤子函數(shù)ConsumeMessageHook#consumeMessageBefore函數(shù),通過consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(hook),方法消息消費(fèi)執(zhí)行鉤子函數(shù)。 恢復(fù)重試消息主題名。這是為什么呢?這是由消息重試機(jī)制決定的,RocketMQ將消息存入commitlog文件時,如果發(fā)現(xiàn)消息的延時級別delayTimeLevel大于0,會首先將重試主題存入在消息的屬性中,然后設(shè)置主題名稱為SCHEDULE_TOPIC,以便時間到后重新參與消息消費(fèi)。 執(zhí)行具體的消息消費(fèi),調(diào)用應(yīng)用程序消息監(jiān)聽器的consumeMessage方法,進(jìn)入到具體的消息消費(fèi)業(yè)務(wù)邏輯,返回該批消息的消費(fèi)結(jié)果。最終將返回CONSUME_SUCCESS(消費(fèi)成功)或RECONSUME_LATER(需要重新消費(fèi))。 執(zhí)行消息消費(fèi)鉤子函數(shù)ConsumeMessageHook#consumeMessageAfter函數(shù) 執(zhí)行業(yè)務(wù)消息消費(fèi)后,在處理結(jié)果前再次驗證一下ProcessQueue的isDroped狀態(tài)值,如果設(shè)置為true,將不對結(jié)果進(jìn)行處理,也就是說如果在消息消費(fèi)過程中進(jìn)入到第四步時,如果由于由新的消費(fèi)者加入或原先的消費(fèi)者出現(xiàn)宕機(jī)導(dǎo)致原先分給消費(fèi)者的隊列在負(fù)載之后分配給別的消費(fèi)者,那么在應(yīng)用程序的角度來看的話,消息會被重復(fù)消費(fèi)。 根據(jù)消息監(jiān)聽器返回的結(jié)果,計算ackIndex,如果返回CONSUME_SUCCESS, ackIndex設(shè)置為msgs.size() - 1,如果返回RECONSUME_LATER, ackIndex=-1,這是為下文發(fā)送msg back(ACK)消息做準(zhǔn)備的。 如果是集群模式,業(yè)務(wù)方返回RECONSUME_LATER,消息并不會重新被消費(fèi),只是以警告級別輸出到日志文件。如果是集群模式,消息消費(fèi)成功,由于ackIndex=consumeRequest.getMsgs().size()-1,故i=ackIndex+1等于consumeRequest.getMsgs().size(),并不會執(zhí)行sendMessageBack。只有在業(yè)務(wù)方返回RECONSUME_LATER時,該批消息都需要發(fā)ACK消息,如果消息發(fā)送ACK失敗,則直接將本批ACK消費(fèi)發(fā)送失敗的消息再次封裝為ConsumeRequest,然后延遲5s后重新消費(fèi)。如果ACK消息發(fā)送成功,則該消息會延遲消費(fèi)。 從ProcessQueue中移除這批消息,這里返回的偏移量是移除該批消息后最小的偏移量,然后用該偏移量更新消息消費(fèi)進(jìn)度,以便在消費(fèi)者重啟后能從上一次的消費(fèi)進(jìn)度開始消費(fèi),避免消息重復(fù)消費(fèi)。值得重點(diǎn)注意的是當(dāng)消息監(jiān)聽器返回RECONSUME_LATER,消息消費(fèi)進(jìn)度也會向前推進(jìn),用ProcessQueue中最小的隊列偏移量調(diào)用消息消費(fèi)進(jìn)度存儲器OffsetStore更新消費(fèi)進(jìn)度,這是因為當(dāng)返回RECONSUME_LATER, RocketMQ會創(chuàng)建一條與原先消息屬性相同的消息,擁有一個唯一的新msgId,并存儲原消息ID,該消息會存入到commitlog文件中,與原先的消息沒有任何關(guān)聯(lián),那該消息當(dāng)然也會進(jìn)入到ConsuemeQueue隊列中,將擁有一個全新的隊列偏移量。
在段落最后,會附上消費(fèi)成功后提交消費(fèi)進(jìn)度的過程,重置消費(fèi)進(jìn)度的過程

順序消費(fèi)
RocketMQ 實現(xiàn)順序消費(fèi)的思路比較簡單,在默認(rèn)的情況下消息發(fā)送會采取Round Robin輪詢方式把消息發(fā)送到不同的queue(分區(qū)隊列);而消費(fèi)消息的時候從多個queue上拉取消息,這種情況發(fā)送和消費(fèi)是不能保證順序。但是如果控制發(fā)送的順序消息只依次發(fā)送到同一個queue中,消費(fèi)的時候只從這個queue上依次拉取,則就保證了順序。當(dāng)發(fā)送和消費(fèi)參與的queue只有一個,則是全局有序;如果多個queue參與,則為分區(qū)有序,即相對每個queue,消息都是有序的。
下面列舉兩種無法保證順序消費(fèi)的場景:
消費(fèi)者A正在消費(fèi)隊列A的消息,此時消費(fèi)者B發(fā)生了隊列的負(fù)載均衡,也分配到了隊列A,在同一時間相當(dāng)于有兩個消費(fèi)者可以同時消費(fèi)一個隊列的消息 當(dāng)前隊列A由一個消費(fèi)者A負(fù)責(zé),但消費(fèi)者A內(nèi)部可以進(jìn)行并發(fā)消費(fèi),即多個消費(fèi)線程同時消費(fèi)隊列A的消息
因此還需要結(jié)合鎖的機(jī)制來實現(xiàn)順序消費(fèi):
同一時間一個隊列只能分配給一個消費(fèi)者,通過給 Broker 端隊列上鎖實現(xiàn) 同一時間一個隊列只能有一個消費(fèi)線程進(jìn)行消費(fèi),通過給本地隊列上鎖實現(xiàn)
在隊列負(fù)載均衡階段,如果是順序消費(fèi),會向 Broker 發(fā)起隊列加鎖請求,如果加鎖成功則創(chuàng)建對應(yīng)的任務(wù)隊列及消息拉取請求,反之不創(chuàng)建。
ConsumeMessageOrderlyService 在啟動后會定時向 Broker 發(fā)送隊列加鎖的請求,目的是續(xù)期鎖。
具體的加鎖操作如下:
獲取消費(fèi)者負(fù)責(zé)的消息隊列集合 HashMap 依次對每個 Broker 下的消息隊列進(jìn)行加鎖操作,Broker 會響應(yīng)加鎖成功的消息隊列集合 如果消息隊列加鎖成功,則將本地對應(yīng)的任務(wù)隊列設(shè)置為加鎖成功的狀態(tài);反之則設(shè)置成加鎖失敗狀態(tài)
消費(fèi)的過程中則通過對本地隊列加鎖來實現(xiàn)同一時間一個隊列只能有一個消費(fèi)線程進(jìn)行消費(fèi)。
看到消費(fèi)任務(wù) ConsumeRequest 的定義, 它是 ConsumeMessageOrderlyService 的內(nèi)部類,不同于之前并發(fā)消費(fèi)的任務(wù),可以看到主要區(qū)別在于消費(fèi)時增加了本地隊列的加鎖操作,以及鎖狀態(tài)的校驗。
順序消費(fèi)時如果消費(fèi)失敗,會直接將消息放回任務(wù)隊列中等待重新消費(fèi),且重試次數(shù)默認(rèn)是 Integer.MAX_VALUE
提交進(jìn)度
消費(fèi)成功后提交消費(fèi)進(jìn)度的過程

重置消費(fèi)進(jìn)度的過程

二者共同點(diǎn):
? 都是由Broker統(tǒng)一管理消費(fèi)者的消費(fèi)進(jìn)度
? 都需要由消費(fèi)者“主動上報”最新的消費(fèi)進(jìn)度
二者的差異點(diǎn):
? 正常消費(fèi)時提交消費(fèi)進(jìn)度,一般消費(fèi)進(jìn)度是向前推進(jìn)
? 重置消費(fèi)進(jìn)度時提交消費(fèi)進(jìn)度,消費(fèi)進(jìn)度可能向前推進(jìn),也可能向后回溯
往期推薦
Redis事務(wù)機(jī)制ACID的實現(xiàn),Redis主從同步的實戰(zhàn)細(xì)節(jié)問題
緩存預(yù)熱,Redis單線程為什么那么快,過期策略,過期機(jī)制,緩存一致性
Redis數(shù)據(jù)結(jié)構(gòu),rehash,漸進(jìn)式rehash,AOF,RDB實現(xiàn)原理
MySQL千萬數(shù)據(jù)調(diào)研,order by 原理分析
為什么就查了一行數(shù)據(jù),執(zhí)行那么慢?
為什么需要消息隊列?應(yīng)用場景?MQ的技術(shù)選型分析?主題和隊列的實現(xiàn)原理與流程
結(jié)尾
非常歡迎大家加我個人微信有關(guān)后端方面的問題我們在群內(nèi)一起討論!?我們下期再見!
分析不對的地方,還請指出一起修補(bǔ)
負(fù)載均衡規(guī)則部分圖片參考博主:CSDN每天都要進(jìn)步一點(diǎn)點(diǎn)
資料參考:RocketMQ官方源碼,《RocketMQ技術(shù)內(nèi)幕》
歡迎『點(diǎn)贊』、『在看』、『轉(zhuǎn)發(fā)』三連支持一下,下次見~

