<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          RocketMQ 消息消費(fèi)流程

          共 6187字,需瀏覽 13分鐘

           ·

          2022-05-30 11:09

          記得點(diǎn)擊?"歡少的成長之路",?設(shè)為星標(biāo)?

          后臺點(diǎn)擊【聯(lián)系我】,申請加入優(yōu)質(zhì)技術(shù)學(xué)習(xí)社群

          大家好,我是Leo。

          今天聊一下RocketMQ消息消費(fèi),消費(fèi)方式,消費(fèi)模式,傳送方式,過濾模式,負(fù)載均衡,重分配機(jī)制,消息拉取,并發(fā)消費(fèi)與順序消費(fèi)



          消息消費(fèi)

          關(guān)系圖

          首先放一下Broker Cluster,Broker,Topic,Queue的關(guān)系圖。因為下文主要會沿著這四塊進(jìn)行梳理

          消費(fèi)方式

          消息消費(fèi)主要有兩種方式?并發(fā)消費(fèi)?和?順序消費(fèi)

          • 并發(fā)消費(fèi),一個隊列中的消息可同時被消費(fèi)者的多個線程并發(fā)消費(fèi)
          • 順序消費(fèi),一個隊列中的消息同一時間只能被一個消費(fèi)者的一個線程消費(fèi),通過這種方式達(dá)到順序消費(fèi)的效果

          消費(fèi)模式

          源碼

          /**
          ?*?Message?model
          ?*?消息模式
          ?*/

          public?enum?MessageModel?{
          ????/**
          ?????*?broadcast
          ?????*?廣播
          ?????*/

          ????BROADCASTING("BROADCASTING"),
          ????/**
          ?????*?clustering
          ?????*?集群
          ?????*/

          ????CLUSTERING("CLUSTERING");

          ????private?String?modeCN;

          ????MessageModel(String?modeCN)?{
          ????????this.modeCN?=?modeCN;
          ????}

          ????public?String?getModeCN()?{
          ????????return?modeCN;
          ????}
          }

          消費(fèi)傳送方式

          消息過濾模式

          RocketMQ支持兩種消息過濾模式

          • 表達(dá)式(TAG、SQL92)
          • 類過濾模式

          RocketMQ的消息過濾都是發(fā)生在?服務(wù)端?的,可以從下列代碼得知。

          負(fù)載均衡規(guī)則

          RocketMQ提供了豐富的queue均衡規(guī)則?一共6種,目前只實現(xiàn)了四種

          • AllocateMessageQueueAveragely:默認(rèn)均衡規(guī)則

          • AllocateMessageQueueAveragelyByCircle:循環(huán)平均分配。是第1種方式的變種。針對queue數(shù)量多余Consumer數(shù)量的情況下,使用循環(huán)分配規(guī)則。如有3個Consumer、5個queue,則Consumer0消費(fèi)queue0和queue3、Consumer1消費(fèi)queue1和queue4、Consumer2消費(fèi)queue2。

          • AllocateMessageQueueByMachineRoom:機(jī)房分配策略?。

          • AllocateMessageQueueConsistentHash:一致性Hash方式分配

          AllocateMessageQueueByConfig:根據(jù)配置進(jìn)行分配。未實現(xiàn)。

          AllocateMachineRoomNearby:根據(jù)Consumer與Broker的距離遠(yuǎn)近進(jìn)行分配,從源碼看,該策略未完整實現(xiàn)。

          重新分配機(jī)制

          集群消費(fèi)模式下,RocketMQ會把所有的messageQueue按一定的負(fù)載均衡策略分配給不同的消費(fèi)者實例來消費(fèi)。

          也就是當(dāng)負(fù)載均衡完成后,一個messageQueue只能被一個消費(fèi)者實例消費(fèi),一個消費(fèi)者實例可以消費(fèi)一個或多個messageQueue,這取決于兩者的數(shù)量,如圖:

          Rebalance的觸發(fā)時機(jī)

          • 消費(fèi)者啟動時主動進(jìn)行一次Rebalance
          • 消費(fèi)者啟動后設(shè)置定時進(jìn)行Rebalance,20s/次
          • 消費(fèi)者組實例數(shù)量發(fā)生變化時,broker通知消費(fèi)者進(jìn)行Rebalance
          • 所訂閱的topic的messageQueue數(shù)量發(fā)生變化時、訂閱關(guān)系變化時,broker通知消費(fèi)者進(jìn)行Rebalance

          Rebalance的觸發(fā)場景

          • 消費(fèi)者啟動
          • 消費(fèi)者擴(kuò)縮容
          • 消費(fèi)者宕機(jī)
          • broker擴(kuò)縮容
          • messageQueue數(shù)量調(diào)整
          • 網(wǎng)絡(luò)問題導(dǎo)致客戶端
          • broker連接斷開

          Rebalance帶來的問題

          • 消費(fèi)暫停:只有一個Consumer時,該Consumer負(fù)責(zé)消費(fèi)所有隊列。若新增Consumer,則會觸發(fā)Rebalance,原Consumer就需要暫停部分隊列的消費(fèi)。等到這些隊列分配給新的Consumer,暫停的隊列才能繼續(xù)被消費(fèi)。

          • 重復(fù)消費(fèi):Consumer在消費(fèi)新分配給自己的隊列時,必須接著之前的Consumer提交的消費(fèi)進(jìn)度的offset繼續(xù)消費(fèi)。默認(rèn)情況下,offset是異步提交的,就會導(dǎo)致提交到Broker的offset與Consumer實際消費(fèi)的信息不一致。就可能導(dǎo)致重復(fù)消費(fèi)。

          • 消息突刺:由于Rebalance可能導(dǎo)致重復(fù)消費(fèi),如果重復(fù)消費(fèi)的消息過多,或者因為Rebalance暫停時間過長而導(dǎo)致積壓了部分消息。name有可能會導(dǎo)致在Rebalance結(jié)束后需要瞬間消費(fèi)很多消息。

          同步提交和異步提交

          • 同步提交:consumer提交了其消費(fèi)完畢的一批消息的offset給broker后,需要等待broker的成功ACK,收到ACK后,consumer才會繼續(xù)獲取并消費(fèi)下一批消息。在等待ACK期間,consumer是阻塞的。
          • 異步提交:consumer提交了消費(fèi)完畢的一批消息的offset后,不需要等待不容二科的成功ack,consumer可以直接獲取并消費(fèi)下一批消息
          • 對于一次讀取消息的數(shù)量,需要根據(jù)具體業(yè)務(wù)場景選擇一個相對均衡是很有必要的。數(shù)量過的大,產(chǎn)生重復(fù)的消息可能會增加。數(shù)量過小,系統(tǒng)性能會下降。

          隊列分配流程

          1. 獲取指定 Topic 下的消息隊列集合
          2. 如果是廣播模式,則不需要進(jìn)行負(fù)載均衡,消費(fèi)者直接負(fù)責(zé)所有消息隊列
          3. 集群模式則需要獲取指定 Topic 的所有消費(fèi)者集合,根據(jù)負(fù)載均衡算法將消息隊列分配給消費(fèi)者
          4. 消息隊列分配完畢后,則需要為每個消息隊列創(chuàng)建對應(yīng)的任務(wù)隊列,即 ProcessQueue
          5. 為每個任務(wù)隊列創(chuàng)建對應(yīng)的消息拉取任務(wù),后續(xù)消息拉取服務(wù)會定時掃描任務(wù)池進(jìn)行消息拉取操作

          隊列分配目的在于指定消費(fèi)者負(fù)責(zé)的隊列集合,分配前需要明確幾點(diǎn):

          1. 該 Topic 存在多少隊列
          2. 該 Topic 存在多少消費(fèi)者
          3. 隊列如何分配給消費(fèi)者,即負(fù)載均衡算法(默認(rèn)是平均分配的算法)

          消息拉取

          消息拉取流程

          RocketMQ的消息拉取 由 PullMessageService處理。

          1. 消費(fèi)者啟動后,因集成了ServiceThread,ServiceThread又實現(xiàn)了 Runnable接口。他首先會啟動run線程。每執(zhí)行一次業(yè)務(wù)邏輯檢測一下其運(yùn)行狀態(tài),可以通過其他線程將stopped設(shè)置為true從而停止該線程。
          2. pullRequestQueue?是一個阻塞隊列,只會在有消息之后,才會去拉取,拉取最頂部的對象,其他對象一并移出。
          3. 拿到?PullRequest?之后,根據(jù)拉取請求的消費(fèi)組反查該消費(fèi)者的拉取規(guī)則。
          4. 拿到了拉取規(guī)則后通過?pullMessage?函數(shù)獲取?processQueue?隊列消費(fèi)快照,并且檢查是否被dropped,修改最后拉取時間,檢查服務(wù)是否正常,消費(fèi)者是否暫停等
          5. 執(zhí)行流控,判斷緩存消息數(shù)量是否超過閾值,緩存消息大小是否超過閾值,緩存消息跨度是否超過閾值
          6. 獲取?processQueue?鎖,判斷broker是否上鎖,上了就拉,沒上就等待。
          7. 通過?pullRequest 的 messageQueue?計算拉取偏移量,判斷當(dāng)前偏移量是否小于拉請求的下一個坐標(biāo),如果偏移量大于拉請求的下標(biāo)就代表broker繁忙。對之前加鎖并且初始化拉請求的最新下標(biāo)
          8. 再根據(jù)拉請求中的消息隊列,取消息隊列中的主題信息,根據(jù)?RebalanceImpl?實現(xiàn)類并且按照用戶的負(fù)載均衡規(guī)則去查詢訂閱數(shù)據(jù)。
          9. 在?PullCallback?回調(diào)中,根據(jù)狀態(tài)進(jìn)行相應(yīng)的處理(狀態(tài)可以從下方代碼中查看 PullStatus)
          10. 構(gòu)建消息拉取系統(tǒng)標(biāo)記(標(biāo)記可以從下方代碼中查看PullSysFlag)
          11. 最后調(diào)用PullAPIWrapper.pullKernelImpl方法后與服務(wù)端交互

          PullRequestQueue:阻塞隊列,存放的是拉請求 PullRequest

          ProcessQueue:消費(fèi)進(jìn)度,消息總數(shù)量等一些核心的數(shù)據(jù)都在這里

          PullRequest:拉請求,封裝了消費(fèi)者,消息隊列,隊列消費(fèi)快照,下一個下標(biāo),以前鎖定等

          第八步中 如果為空,結(jié)束本次消息拉取,關(guān)于該隊列的下一次拉取任務(wù)延遲3s。

          消息拉取回調(diào)

          消息拉取完畢的后續(xù)處理邏輯:

          • 如果成功拉取到消息,則將消息加入到待處理任務(wù)隊列 ProcessQueue,并提交一個消費(fèi)請求給 ConsumeMessageService,提交下一次消息拉取任務(wù)
          • 如果沒有成功拉取到消息,則根據(jù)服務(wù)端返回的 Offset 進(jìn)行校正處理,重新提交消息拉取任務(wù)

          PullRequest

          1. 在RocketMQ根據(jù)PullRequest拉取任務(wù)執(zhí)行完一次消息拉取任務(wù)后,又將PullRequest對象放入到pullRequestQueue
          2. 在RebalancceImpl中創(chuàng)建。RebalanceImpl就是消息隊列負(fù)載機(jī)制,也就是PullRequest對象真正創(chuàng)建的地方

          ProcessQueue

          ProcessQueue是MessageQueue在消費(fèi)端的重現(xiàn)、快照。PullMessageService從消息服務(wù)器默認(rèn)每次拉取32條消息,按消息的隊列偏移量順序存放在ProcessQueue中,PullMessageService然后將消息提交到消費(fèi)者消費(fèi)線程池,消息成功消費(fèi)后從ProcessQueue中移除

          pullKernelImpl

          下列參數(shù)詳解

          • MessageQueue mq:從哪個消息消費(fèi)隊列拉取消息。
          • String subExpression:消息過濾表達(dá)式。
          • String expressionType:消息表達(dá)式類型,分為TAG、SQL92。
          • long offset:消息拉取偏移量。
          • int maxNums:本次拉取最大消息條數(shù),默認(rèn)32條。
          • int sysFlag:拉取系統(tǒng)標(biāo)記。
          • long commitOffset:當(dāng)前MessageQueue的消費(fèi)進(jìn)度(內(nèi)存中)。
          • long brokerSuspendMaxTimeMillis:消息拉取過程中允許Broker掛起時間,默認(rèn)15s。
          • long timeoutMillis:消息拉取超時時間。
          • CommunicationMode communicationMode:消息拉取模式,默認(rèn)為異步拉取。
          • PullCallback pullCallback:從Broker拉取到消息后的回調(diào)方法。

          執(zhí)行流程如下

          1. 根據(jù)brokerName、BrokerId從MQClientInstance中獲取Broker地址,
          2. 如果消息過濾模式為類過濾,則需要根據(jù)主題名稱、broker地址找到注冊在Broker上的FilterServer地址,從FilterServer上拉取消息,否則從Broker上拉取消息。
          3. 通過MQClientAPIImpl#pullMessageAsync方法異步向Broker拉取消息。

          在整個RocketMQ Broker的部署結(jié)構(gòu)中,相同名稱的Broker構(gòu)成主從結(jié)構(gòu),其BrokerId會不一樣,在每次拉取消息后,會給出一個建議,下次拉取從主節(jié)點(diǎn)還是從節(jié)點(diǎn)拉取

          源碼

          //?PullMessageService?啟動后的?run?函數(shù)
          public?void?run()?{
          ????log.info(this.getServiceName()?+?"?service?started");

          ????while?(!this.isStopped())?{
          ????????try?{
          ????????????//使用BlockingQueue阻塞隊列,當(dāng)提交了消息拉取請求后,馬上執(zhí)行
          ????????????//?take?移除所有隊列,返回最頂部?隊列中的對象(拉請求)
          ????????????PullRequest?pullRequest?=?this.pullRequestQueue.take();
          ????????????this.pullMessage(pullRequest);
          ????????}?catch?(InterruptedException?ignored)?{

          ????????}?catch?(Exception?e)?{
          ????????????log.error("Pull?Message?Service?Run?Method?exception",?e);
          ????????}
          ????}

          ????log.info(this.getServiceName()?+?"?service?end");
          }
          //?stopped?為true的函數(shù)
          public?void?shutdown(final?boolean?interrupt)?{
          ????log.info("Try?to?shutdown?service?thread:{}?started:{}?lastThread:{}",?getServiceName(),?started.get(),?thread);
          ????if?(!started.compareAndSet(true,?false))?{
          ????????return;
          ????}
          ????this.stopped?=?true;
          ????log.info("shutdown?thread?"?+?this.getServiceName()?+?"?interrupt?"?+?interrupt);

          ????if?(hasNotified.compareAndSet(false,?true))?{
          ????????//?此函數(shù)將遞減(減一)鎖存器的計數(shù),如果計數(shù)到達(dá)零,則釋放所有等待的線程
          ????????waitPoint.countDown();?//?notify
          ????}
          }

          /**
          ?*?拉取狀態(tài)
          ?*/

          public?enum?PullStatus?{
          ????/**
          ?????*?Founded
          ?????*?建立
          ?????*/

          ????FOUND,
          ????/**
          ?????*?No?new?message?can?be?pull
          ?????*?無法拉取任何新消息
          ?????*/

          ????NO_NEW_MSG,
          ????/**
          ?????*?Filtering?results?can?not?match
          ?????*?過濾結(jié)果不匹配
          ?????*/

          ????NO_MATCHED_MSG,
          ????/**
          ?????*?Illegal?offset,may?be?too?big?or?too?small
          ?????*?非法偏移,可能太大或太小
          ?????*/

          ????OFFSET_ILLEGAL
          }

          /**
          ?*?消息拉取系統(tǒng)標(biāo)記
          ?*/

          public?class?PullSysFlag?{
          ????/**
          ?????*?表示從內(nèi)存中讀取的消費(fèi)進(jìn)度大于0,則設(shè)置該標(biāo)記位。
          ?????*/

          ????private?final?static?int?FLAG_COMMIT_OFFSET?=?0x1;
          ????/**
          ?????*?表示消息拉取時支持掛起。
          ?????*/

          ????private?final?static?int?FLAG_SUSPEND?=?0x1?<1;
          ????/**
          ?????*?消息過濾機(jī)制為表達(dá)式,則設(shè)置該標(biāo)記位。
          ?????*/

          ????private?final?static?int?FLAG_SUBSCRIPTION?=?0x1?<2;
          ????/**
          ?????*?消息過濾機(jī)制為類過濾模式
          ?????*/

          ????private?final?static?int?FLAG_CLASS_FILTER?=?0x1?<3;
          ????private?final?static?int?FLAG_LITE_PULL_MESSAGE?=?0x1?<4;
          }

          消息消費(fèi)

          消息消費(fèi)的大概流程

          1. 消息生產(chǎn)者把消息發(fā)送并存儲到 Rocket MQ 的 broker 上,NameServer 用來發(fā)現(xiàn)和更新 broker。
          2. 消費(fèi)者啟動時會啟動 PullMessageService 線程,PullMessageService 線程不斷地從內(nèi)部的隊列中取 PullRequest,然后使用 PullRequest 作為請求去拉取消息。
          3. PullRequest 中的消息處理隊列 ProcessQueue 是 MessageQueue 在消費(fèi)端的重現(xiàn)、快照。PullMessageService 使用消費(fèi)者(DefaultMQPushConsumerImpl)從消息服務(wù)器默認(rèn)每次拉取 32 條消息,按消息的隊列偏移量存放在 ProcessQueue 中,然后消費(fèi)者再將消息提交到消息消費(fèi)線程池中(提交 ConsumeRequest),消息成功消費(fèi)后從 ProcessQueue 中移除。

          第二步中

          拿到消費(fèi)請求后,消費(fèi)請求里面肯定帶著topic ,queueId, offset,取多少個這四個重要信息,然后獲取到對應(yīng)topic對應(yīng)queueId的consumeQueue,然后定位到offset位置處,往下取出你要個數(shù)的信息。這里舉個例子,比如說有一個topic是xxx,然后有2個queue,這個時候我們消息消費(fèi)者 發(fā)起消費(fèi)請求,要消費(fèi)topic是xxx,queueId是0 ,然后offset =3開始拉取,拉取大小maxMsgNums=2,就是下圖這個樣子

          接著獲取到3,4這兩個之后,遍歷,根據(jù)它們里面的commitlog offset 再去commitlog獲取到對應(yīng)真實的消息。

          消息拉取完之后,會提交一個消費(fèi)任務(wù)給 ConsumeMessageService 進(jìn)行處理。ConsumeMessageService 有兩個實現(xiàn)類:

          • 并發(fā)處理,對應(yīng)實現(xiàn)類為 ConsumeMessageConcurrentlyService
          • 順序處理,對應(yīng)實現(xiàn)類為 ConsumeMessageOrderlyService

          并發(fā)消費(fèi)

          并發(fā)消費(fèi)的主要方法是?submitConsumeRequest?邏輯如下

          1. consumeMessageBatchMaxSize,消息批次,在這里看來也就是一次消息消費(fèi)任務(wù)ConsumeRequest中包含的消息條數(shù),默認(rèn)為1, msgs.size()默認(rèn)最多為32條,受DefaultMQPushConsumer.pullBatchSize屬性控制,如果msgs.size()小于consume Message-BatchMaxSize,則直接將拉取到的消息放入到ConsumeRequest中,然后將consumeRequest提交到消息消費(fèi)者線程池中,如果提交過程中出現(xiàn)拒絕提交異常則延遲5s再提交,這里其實是給出一種標(biāo)準(zhǔn)的拒絕提交實現(xiàn)方式,實際過程中由于消費(fèi)者線程池使用的任務(wù)隊列為LinkedBlockingQueue無界隊列,故不會出現(xiàn)拒絕提交異常。
          2. 如果拉取的消息條數(shù)大于consumeMessageBatchMaxSize,則對拉取消息進(jìn)行分頁,每頁consumeMessageBatchMaxSize條消息,創(chuàng)建多個ConsumeRequest任務(wù)并提交到消費(fèi)線程池。ConsumeRequest的run方法封裝了具體消息消費(fèi)邏輯。

          1. 進(jìn)入具體消息消費(fèi)時會先檢查processQueue的dropped,如果設(shè)置為true,則停止該隊列的消費(fèi),在進(jìn)行消息重新負(fù)載時如果該消息隊列被分配給消費(fèi)組內(nèi)其他消費(fèi)者后,需要droped設(shè)置為true,阻止消費(fèi)者繼續(xù)消費(fèi)不屬于自己的消息隊列。
          2. 執(zhí)行消息消費(fèi)鉤子函數(shù)ConsumeMessageHook#consumeMessageBefore函數(shù),通過consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(hook),方法消息消費(fèi)執(zhí)行鉤子函數(shù)。
          3. 恢復(fù)重試消息主題名。這是為什么呢?這是由消息重試機(jī)制決定的,RocketMQ將消息存入commitlog文件時,如果發(fā)現(xiàn)消息的延時級別delayTimeLevel大于0,會首先將重試主題存入在消息的屬性中,然后設(shè)置主題名稱為SCHEDULE_TOPIC,以便時間到后重新參與消息消費(fèi)。
          4. 執(zhí)行具體的消息消費(fèi),調(diào)用應(yīng)用程序消息監(jiān)聽器的consumeMessage方法,進(jìn)入到具體的消息消費(fèi)業(yè)務(wù)邏輯,返回該批消息的消費(fèi)結(jié)果。最終將返回CONSUME_SUCCESS(消費(fèi)成功)或RECONSUME_LATER(需要重新消費(fèi))。
          5. 執(zhí)行消息消費(fèi)鉤子函數(shù)ConsumeMessageHook#consumeMessageAfter函數(shù)
          6. 執(zhí)行業(yè)務(wù)消息消費(fèi)后,在處理結(jié)果前再次驗證一下ProcessQueue的isDroped狀態(tài)值,如果設(shè)置為true,將不對結(jié)果進(jìn)行處理,也就是說如果在消息消費(fèi)過程中進(jìn)入到第四步時,如果由于由新的消費(fèi)者加入或原先的消費(fèi)者出現(xiàn)宕機(jī)導(dǎo)致原先分給消費(fèi)者的隊列在負(fù)載之后分配給別的消費(fèi)者,那么在應(yīng)用程序的角度來看的話,消息會被重復(fù)消費(fèi)。
          7. 根據(jù)消息監(jiān)聽器返回的結(jié)果,計算ackIndex,如果返回CONSUME_SUCCESS, ackIndex設(shè)置為msgs.size() - 1,如果返回RECONSUME_LATER, ackIndex=-1,這是為下文發(fā)送msg back(ACK)消息做準(zhǔn)備的。
          8. 如果是集群模式,業(yè)務(wù)方返回RECONSUME_LATER,消息并不會重新被消費(fèi),只是以警告級別輸出到日志文件。如果是集群模式,消息消費(fèi)成功,由于ackIndex=consumeRequest.getMsgs().size()-1,故i=ackIndex+1等于consumeRequest.getMsgs().size(),并不會執(zhí)行sendMessageBack。只有在業(yè)務(wù)方返回RECONSUME_LATER時,該批消息都需要發(fā)ACK消息,如果消息發(fā)送ACK失敗,則直接將本批ACK消費(fèi)發(fā)送失敗的消息再次封裝為ConsumeRequest,然后延遲5s后重新消費(fèi)。如果ACK消息發(fā)送成功,則該消息會延遲消費(fèi)。
          9. 從ProcessQueue中移除這批消息,這里返回的偏移量是移除該批消息后最小的偏移量,然后用該偏移量更新消息消費(fèi)進(jìn)度,以便在消費(fèi)者重啟后能從上一次的消費(fèi)進(jìn)度開始消費(fèi),避免消息重復(fù)消費(fèi)。值得重點(diǎn)注意的是當(dāng)消息監(jiān)聽器返回RECONSUME_LATER,消息消費(fèi)進(jìn)度也會向前推進(jìn),用ProcessQueue中最小的隊列偏移量調(diào)用消息消費(fèi)進(jìn)度存儲器OffsetStore更新消費(fèi)進(jìn)度,這是因為當(dāng)返回RECONSUME_LATER, RocketMQ會創(chuàng)建一條與原先消息屬性相同的消息,擁有一個唯一的新msgId,并存儲原消息ID,該消息會存入到commitlog文件中,與原先的消息沒有任何關(guān)聯(lián),那該消息當(dāng)然也會進(jìn)入到ConsuemeQueue隊列中,將擁有一個全新的隊列偏移量。

          在段落最后,會附上消費(fèi)成功后提交消費(fèi)進(jìn)度的過程,重置消費(fèi)進(jìn)度的過程

          順序消費(fèi)

          RocketMQ 實現(xiàn)順序消費(fèi)的思路比較簡單,在默認(rèn)的情況下消息發(fā)送會采取Round Robin輪詢方式把消息發(fā)送到不同的queue(分區(qū)隊列);而消費(fèi)消息的時候從多個queue上拉取消息,這種情況發(fā)送和消費(fèi)是不能保證順序。但是如果控制發(fā)送的順序消息只依次發(fā)送到同一個queue中,消費(fèi)的時候只從這個queue上依次拉取,則就保證了順序。當(dāng)發(fā)送和消費(fèi)參與的queue只有一個,則是全局有序;如果多個queue參與,則為分區(qū)有序,即相對每個queue,消息都是有序的。

          下面列舉兩種無法保證順序消費(fèi)的場景:

          • 消費(fèi)者A正在消費(fèi)隊列A的消息,此時消費(fèi)者B發(fā)生了隊列的負(fù)載均衡,也分配到了隊列A,在同一時間相當(dāng)于有兩個消費(fèi)者可以同時消費(fèi)一個隊列的消息
          • 當(dāng)前隊列A由一個消費(fèi)者A負(fù)責(zé),但消費(fèi)者A內(nèi)部可以進(jìn)行并發(fā)消費(fèi),即多個消費(fèi)線程同時消費(fèi)隊列A的消息

          因此還需要結(jié)合鎖的機(jī)制來實現(xiàn)順序消費(fèi):

          • 同一時間一個隊列只能分配給一個消費(fèi)者,通過給 Broker 端隊列上鎖實現(xiàn)
          • 同一時間一個隊列只能有一個消費(fèi)線程進(jìn)行消費(fèi),通過給本地隊列上鎖實現(xiàn)

          在隊列負(fù)載均衡階段,如果是順序消費(fèi),會向 Broker 發(fā)起隊列加鎖請求,如果加鎖成功則創(chuàng)建對應(yīng)的任務(wù)隊列及消息拉取請求,反之不創(chuàng)建。

          ConsumeMessageOrderlyService 在啟動后會定時向 Broker 發(fā)送隊列加鎖的請求,目的是續(xù)期鎖。

          具體的加鎖操作如下:

          • 獲取消費(fèi)者負(fù)責(zé)的消息隊列集合 HashMap
          • 依次對每個 Broker 下的消息隊列進(jìn)行加鎖操作,Broker 會響應(yīng)加鎖成功的消息隊列集合
          • 如果消息隊列加鎖成功,則將本地對應(yīng)的任務(wù)隊列設(shè)置為加鎖成功的狀態(tài);反之則設(shè)置成加鎖失敗狀態(tài)

          消費(fèi)的過程中則通過對本地隊列加鎖來實現(xiàn)同一時間一個隊列只能有一個消費(fèi)線程進(jìn)行消費(fèi)。

          看到消費(fèi)任務(wù) ConsumeRequest 的定義, 它是 ConsumeMessageOrderlyService 的內(nèi)部類,不同于之前并發(fā)消費(fèi)的任務(wù),可以看到主要區(qū)別在于消費(fèi)時增加了本地隊列的加鎖操作,以及鎖狀態(tài)的校驗。

          順序消費(fèi)時如果消費(fèi)失敗,會直接將消息放回任務(wù)隊列中等待重新消費(fèi),且重試次數(shù)默認(rèn)是 Integer.MAX_VALUE

          提交進(jìn)度

          消費(fèi)成功后提交消費(fèi)進(jìn)度的過程

          重置消費(fèi)進(jìn)度的過程

          二者共同點(diǎn):

          ? 都是由Broker統(tǒng)一管理消費(fèi)者的消費(fèi)進(jìn)度

          ? 都需要由消費(fèi)者“主動上報”最新的消費(fèi)進(jìn)度

          二者的差異點(diǎn):

          ? 正常消費(fèi)時提交消費(fèi)進(jìn)度,一般消費(fèi)進(jìn)度是向前推進(jìn)

          ? 重置消費(fèi)進(jìn)度時提交消費(fèi)進(jìn)度,消費(fèi)進(jìn)度可能向前推進(jìn),也可能向后回溯

          往期推薦

          2022年5月文章目錄整理

          Redis事務(wù)機(jī)制ACID的實現(xiàn),Redis主從同步的實戰(zhàn)細(xì)節(jié)問題

          緩存預(yù)熱,Redis單線程為什么那么快,過期策略,過期機(jī)制,緩存一致性

          Redis數(shù)據(jù)結(jié)構(gòu),rehash,漸進(jìn)式rehash,AOF,RDB實現(xiàn)原理

          MySQL千萬數(shù)據(jù)調(diào)研,order by 原理分析

          3萬字聊聊什么是MySQL

          為什么就查了一行數(shù)據(jù),執(zhí)行那么慢?

          為什么需要消息隊列?應(yīng)用場景?MQ的技術(shù)選型分析?主題和隊列的實現(xiàn)原理與流程

          結(jié)尾

          非常歡迎大家加我個人微信有關(guān)后端方面的問題我們在群內(nèi)一起討論!?我們下期再見!

          分析不對的地方,還請指出一起修補(bǔ)

          負(fù)載均衡規(guī)則部分圖片參考博主:CSDN每天都要進(jìn)步一點(diǎn)點(diǎn)

          資料參考:RocketMQ官方源碼,《RocketMQ技術(shù)內(nèi)幕》

          歡迎『點(diǎn)贊』、『在看』、『轉(zhuǎn)發(fā)』三連支持一下,下次見~


          瀏覽 121
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  中文字幕亚洲在线 | 美女扒开尿口让男生桶爽视频 | 亚洲AV无码AV制服另类专区 | 国产一级在线电影 | 日韩黄色电影网址网站 |