<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          3分鐘白話RocketMQ系列—— 如何消費消息

          共 3990字,需瀏覽 8分鐘

           ·

          2023-08-24 13:36

          1069b382667c33f90f63b7d8f9ad2469.webp


          白話3分鐘,快速了解RocketMQ如何消費消息。

          看完如果不了解,歡迎來打我。

          我們知道RocketMQ主要分為消息 生產(chǎn)、存儲(消息堆積)、消費 三大塊領(lǐng)域。

          前面已經(jīng)介紹了 生產(chǎn)消息、存儲消息 兩大塊內(nèi)容,那接下來,我們白話一下RocketMQ是如何消費消息的,揭秘消息消費全過程。

          注意,如果白話中不小心提到相關(guān)代碼配置與類名,請參考RocketMQ 4.9.4版本

          關(guān)鍵字摘要

          • 核心概念:消費者與消費組、訂閱關(guān)系、消費模式

          • 核心流程:消費拉取、負(fù)載均衡、消息消費

          Q1: 消息消費有哪些核心概念?

          c0cff3dc950df34ad8256fbe13c220c8.webp

          消費者與消費組、訂閱關(guān)系

          1)消費者與消費組

          消息消費以 組 的模式開展。每個消費組ConsumerGroup可以包含多個消費者Consumer,并且可以訂閱多個主題Topic

          如果多個消費者設(shè)置了相同的ConsumerGroup,我們認(rèn)為這些消費者在同一個消費組ConsumerGroup內(nèi)。

          2)訂閱關(guān)系

          訂閱關(guān)系Subscription由消費者組ConsumerGroup動態(tài)注冊到服務(wù)端系統(tǒng),并在后續(xù)的消息傳輸中按照訂閱關(guān)系中的過濾規(guī)則進(jìn)行 消息過濾與匹配

          原則:

          • 不同消費組ConsumerGroup對于同一個Topic的訂閱相互獨立

          • 同一個消費組ConsumerGroup對于不同Topic的訂閱也相互獨立

          • 同一消費組ConsumerGroup內(nèi)的多個消費者Consumer的訂閱關(guān)系必須保持一致!否則可能會導(dǎo)致部分消息消費不到

          3)消費模式

          消費組之間有兩種消費模式:「集群模式」和「廣播模式」。

          在「集群模式」下,同一主題下的消息只能被消費組內(nèi)的某一個消費者處理,一條消息會被 1 個消費組內(nèi)的 N 個消費者消費 1 次。

          在「廣播模式」下,同一主題下的消息將會被消費組內(nèi)的所有消費者處理一次,一條消息會被 1 個消費組內(nèi)的 N 個消費者消費 N 次。

          如果消息消費是「集群模式」,那么消息進(jìn)度保存在Broker上; 如果是「廣播模式」,那么消息消費進(jìn)度存儲在Consumer端本地。

          Q2:消費者怎么拉取消息?

          整體流程包括:

          • 消費者啟動。主要包括訂閱Topic、初始化消息進(jìn)度。

          • 消費者發(fā)送拉取請求。主要查詢路由表找到目標(biāo)Broker發(fā)送請求。

          • Broker查找并返回消息。根據(jù)訂閱關(guān)系Subscription和 消息進(jìn)度 進(jìn)行消息過濾和匹配,然后返回消息。

          • 消費者接收并處理消息。

          消息服務(wù)器與消費者之間有兩種消息傳送方式:「推模式」和「拉模式」。

          「拉模式」是消費者主動向消息服務(wù)器請求拉取消息。「推模式」是消息到達(dá)消息服務(wù)器后,由服務(wù)器主動推送給消息消費者。

          在 RocketMQ 中,Consumer端的兩種消費模式(Push/Pull)底層其實都是基于「拉模式」來獲取消息的。

          具體實現(xiàn)方式是,消息拉取線程從服務(wù)器 拉取 一批消息后,將其提交給消息消費線程池,并立即繼續(xù)向服務(wù)器嘗試?yán)∠ⅲ员3窒⒌倪B續(xù)性。

          那如果拉取消息時,Broker端暫時沒有新消息可以返回怎么辦?會一直無腦發(fā)送拉取請求嗎?

          嗯,一定不會啦。

          RocketMQ默認(rèn)會開啟「長輪詢機制」,這個機制能夠平衡 輪詢壓力新消息的實時性

          • 消費者發(fā)送拉取請求到Broker,如果沒有新消息,Broker會暫時 掛起 請求不返回

          • Broker每隔5s檢查一次掛起的請求,是否有滿足條件的新消息,如果有就返回,如果沒有就繼續(xù)掛起,直到超時返回

          • 如果在掛起的過程中,有滿足條件的新消息寫入commitLog,也會立即返回新消息

          Q3:消費者怎么知道去哪里拉取消息?

          這就需要聊一聊消息消費的「負(fù)載均衡機制」了。

          注意,RocketMQ 5.x版本,對「推模式」底層增加了一種「Pop模式」的實現(xiàn)。PopPull區(qū)別在于,Pop消費的重平衡是在 Broker 端做的,而之前的 Pull 消費都是由客戶端完成重平衡。本文還是介紹4.x版本。

          消費端的負(fù)載均衡是指將Broker端中多個隊列queue按照某種算法分配給同一個消費組中的不同消費者,負(fù)載均衡是客戶端開始消費的起點。

          注意,從RocketMQ服務(wù)端5.0版本開始額外支持了「消息粒度」的負(fù)載均衡策略,4.x/3.x版本僅支持「隊列粒度」的負(fù)載均衡策略。本文只介紹4.x的「隊列粒度」的。

          RocketMQ「隊列粒度」的負(fù)載均衡的核心設(shè)計理念是:

          • 消費隊列在同一時間只允許被同一消費組內(nèi)的一個消費者消費

          • 一個消費者能同時消費多個消息隊列

          負(fù)載均衡基本流程:

          • Consumer啟動后,它就會通過定時任務(wù)向所有Broker實例發(fā)送心跳包(包含消費分組名稱、訂閱關(guān)系集合、消息通信模式和客戶端id等信息),Broker會緩存這些信息。

          • Consumer每隔10ms從Nameserver獲取Topic與隊列queue的路由信息,緩存本地

          • 每隔20s,Consumer端會請求Broekr獲取該消費組下消費者Id列表,然后根據(jù)Topic下的隊列queue、消費組下消費者Id進(jìn)行排序,計算出待拉取的隊列queue

          • 根據(jù)新算出的本地應(yīng)該消費隊列queue,重新計算本地隊列消費任務(wù)。

          特別注意,無論是消息粒度負(fù)載均衡策略還是隊列粒度負(fù)載均衡策略,在消費者上線或下線、服務(wù)端擴縮容等場景下,都會觸發(fā)短暫的重新負(fù)載均衡動作,可能會存在短暫的負(fù)載不一致情況,出現(xiàn)少量消息重復(fù)的現(xiàn)象。

          因此,需要在下游消費邏輯中做好消息「冪等去重」處理。

          Q4: 消費者拉到消息了,怎么消費呢?

          消息消費,主要關(guān)注兩個事情:

          • 會不會消息丟失?

          • 會不會消費重復(fù)?

          怎么保證消息消費不丟失?

          其實思路是比較直接的,就是 「消息確認(rèn)機制」和「失敗重試機制」

          消費者從RocketMQ拉取消息后,需要返回"CONSUME_SUCCESS"來表示業(yè)務(wù)方已經(jīng)正常完成消費。只有返回"CONSUME_SUCCESS"才算作消費完成。這就是消費時的「消息確認(rèn)機制」。

          如果返回"CONSUME_LATER",則會按照不同的消息延遲級別進(jìn)行再次消費,延遲級別從秒到小時不等,最長延遲時間為2個小時后再次嘗試消費。這就是消費時的「失敗重試機制」。

          重試消息會被存入名為 "%RETRY%+消費組名稱"Topic中,原始主題Topic會存入屬性中。然后會基于定時任務(wù)機制,在到期時將任務(wù)再次拉取出來。

          注意,從重試Topic的名稱我們可以了解到,RocketMQ消息重試是以消費組為單位,而不是Topic

          另外,RocketMQ跟kafka不同的是,天然支持了 「死信隊列機制」

          如果在嘗試消費的過程中達(dá)到了最大重試次數(shù)(通常為16次),仍然無法成功消費,則消息將被發(fā)送到死信隊列,以確保消息存儲的可靠性。后續(xù)業(yè)務(wù)可以根據(jù)死信隊列,來做相關(guān)補償措施。

          怎么保證消息消費不重復(fù)?

          其實思路也很直接,就是不保證不重復(fù)。

          所有消息隊列的設(shè)計,都是不保證消息消費不重復(fù)的。所以使用消息隊列時,要特別注意,如果有唯一性要求,必須做好消費端的「冪等設(shè)計」。

          總結(jié)

          • 消息拉取:「推模式」與「拉模式」本質(zhì)都是「拉模式」、「長輪詢機制」平衡 輪詢壓力 與 新消息的實時性。

          • 消息消費負(fù)載均衡:定時獲取Topic下的隊列queue、消費組下消費者Id等信息,本地計算負(fù)載均衡策略,存在消息重復(fù)的可能性。

          • 消息消費:「消息確認(rèn)機制」和「失敗重試機制」 保證消息不丟失、消息隊列都存在重復(fù)消費。

          3分鐘到了嗎?應(yīng)該對RocketMQ如何消費消息有全面了解了吧。
          如果還想了解更多,歡迎關(guān)注下一期內(nèi)容。




          往期熱門筆記合集推薦:


          原創(chuàng):阿丸筆記(微信公眾號:aone_note),歡迎  分享 ,轉(zhuǎn)載請保留出處。

          沒有留言功能的悲傷,掃描下方二維碼「加我」聊些有的沒的吧~

                                                                                        覺得不錯,就點個 再看 吧??



          瀏覽 67
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  女毛片 | 内射无码高清视频 | 亚洲风情中文字幕 | 国产欧美欧美金五星的户外操逼。 | 国产国产日韩欧美V∧ |