<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          阿里面試官:聽說你精通 RocketMQ,我們來大戰(zhàn) 23 回合

          共 22652字,需瀏覽 46分鐘

           ·

          2022-07-26 08:23

          二哥的編程星球已經(jīng)有 450 多名 小伙伴加入了,如果你也需要一個良好的學(xué)習(xí)氛圍,戳鏈接加入我們吧!這是一個 Java 學(xué)習(xí)指南 + 編程實(shí)戰(zhàn)的私密圈子,你可以向二哥提問、幫你制定學(xué)習(xí)計劃、跟著二哥一起做項目、刷力扣,沖沖沖。

          大家好,我是二哥呀,今天繼續(xù)來給大家分享三弟的面渣逆襲:RocketMQ二十三問,建議大家收藏了慢慢看,秋招、春招、金九銀十、金三銀四沖!

          基礎(chǔ)

          1.為什么要使用消息隊列呢?

          消息隊列主要有三大用途,我們拿一個電商系統(tǒng)的下單舉例:

          • 解耦:引入消息隊列之前,下單完成之后,需要訂單服務(wù)去調(diào)用庫存服務(wù)減庫存,調(diào)用營銷服務(wù)加營銷數(shù)據(jù)……引入消息隊列之后,可以把訂單完成的消息丟進(jìn)隊列里,下游服務(wù)自己去調(diào)用就行了,這樣就完成了訂單服務(wù)和其它服務(wù)的解耦合。
          消息隊列解耦
          • 異步:訂單支付之后,我們要扣減庫存、增加積分、發(fā)送消息等等,這樣一來這個鏈路就長了,鏈路一長,響應(yīng)時間就變長了。引入消息隊列,除了更新訂單狀態(tài),其它的都可以異步去做,這樣一來就來,就能降低響應(yīng)時間。
          消息隊列異步
          • 削峰:消息隊列合一用來削峰,例如秒殺系統(tǒng),平時流量很低,但是要做秒殺活動,秒殺的時候流量瘋狂懟進(jìn)來,我們的服務(wù)器,Redis,MySQL各自的承受能力都不一樣,直接全部流量照單全收肯定有問題啊,嚴(yán)重點(diǎn)可能直接打掛了。

          我們可以把請求扔到隊列里面,只放出我們服務(wù)能處理的流量,這樣就能抗住短時間的大流量了。

          消息隊列削峰

          解耦、異步、削峰,是消息隊列最主要的三大作用。

          2.為什么要選擇RocketMQ?

          市場上幾大消息隊列對比如下:

          四大消息隊列對比

          總結(jié)一下

          選擇中間件的可以從這些維度來考慮:可靠性,性能,功能,可運(yùn)維行,可拓展性,社區(qū)活躍度。目前常用的幾個中間件,ActiveMQ作為“老古董”,市面上用的已經(jīng)不多,其它幾種:

          • RabbitMQ:

          • 優(yōu)點(diǎn):輕量,迅捷,容易部署和使用,擁有靈活的路由配置

          • 缺點(diǎn):性能和吞吐量不太理想,不易進(jìn)行二次開發(fā)

          • RocketMQ:

          • 優(yōu)點(diǎn):性能好,高吞吐量,穩(wěn)定可靠,有活躍的中文社區(qū)

          • 缺點(diǎn):兼容性上不是太好

          • Kafka:

          • 優(yōu)點(diǎn):擁有強(qiáng)大的性能及吞吐量,兼容性很好

          • 缺點(diǎn):由于“攢一波再處理”導(dǎo)致延遲比較高

          我們的系統(tǒng)是面向用戶的C端系統(tǒng),具有一定的并發(fā)量,對性能也有比較高的要求,所以選擇了低延遲、吞吐量比較高,可用性比較好的RocketMQ。

          3.RocketMQ有什么優(yōu)缺點(diǎn)?

          RocketMQ優(yōu)點(diǎn):

          • 單機(jī)吞吐量:十萬級
          • 可用性:非常高,分布式架構(gòu)
          • 消息可靠性:經(jīng)過參數(shù)優(yōu)化配置,消息可以做到0丟失
          • 功能支持:MQ功能較為完善,還是分布式的,擴(kuò)展性好
          • 支持10億級別的消息堆積,不會因為堆積導(dǎo)致性能下降
          • 源碼是Java,方便結(jié)合公司自己的業(yè)務(wù)二次開發(fā)
          • 天生為金融互聯(lián)網(wǎng)領(lǐng)域而生,對于可靠性要求很高的場景,尤其是電商里面的訂單扣款,以及業(yè)務(wù)削峰,在大量交易涌入時,后端可能無法及時處理的情況
          • RoketMQ在穩(wěn)定性上可能更值得信賴,這些業(yè)務(wù)場景在阿里雙11已經(jīng)經(jīng)歷了多次考驗,如果你的業(yè)務(wù)有上述并發(fā)場景,建議可以選擇RocketMQ

          RocketMQ缺點(diǎn):

          • 支持的客戶端語言不多,目前是Java及c++,其中c++不成熟
          • 沒有在 MQ核心中去實(shí)現(xiàn)JMS等接口,有些系統(tǒng)要遷移需要修改大量代碼

          4.消息隊列有哪些消息模型?

          消息隊列有兩種模型:隊列模型發(fā)布/訂閱模型

          • 隊列模型

          這是最初的一種消息隊列模型,對應(yīng)著消息隊列“發(fā)-存-收”的模型。生產(chǎn)者往某個隊列里面發(fā)送消息,一個隊列可以存儲多個生產(chǎn)者的消息,一個隊列也可以有多個消費(fèi)者,但是消費(fèi)者之間是競爭關(guān)系,也就是說每條消息只能被一個消費(fèi)者消費(fèi)。

          隊列模型
          • 發(fā)布/訂閱模型

          如果需要將一份消息數(shù)據(jù)分發(fā)給多個消費(fèi)者,并且每個消費(fèi)者都要求收到全量的消息。很顯然,隊列模型無法滿足這個需求。解決的方式就是發(fā)布/訂閱模型。

          在發(fā)布 - 訂閱模型中,消息的發(fā)送方稱為發(fā)布者(Publisher),消息的接收方稱為訂閱者(Subscriber),服務(wù)端存放消息的容器稱為主題(Topic)。發(fā)布者將消息發(fā)送到主題中,訂閱者在接收消息之前需要先“訂閱主題”。“訂閱”在這里既是一個動作,同時還可以認(rèn)為是主題在消費(fèi)時的一個邏輯副本,每份訂閱中,訂閱者都可以接收到主題的所有消息。

          發(fā)布-訂閱模型

          它和 “隊列模式” 的異同:生產(chǎn)者就是發(fā)布者,隊列就是主題,消費(fèi)者就是訂閱者,無本質(zhì)區(qū)別。唯一的不同點(diǎn)在于:一份消息數(shù)據(jù)是否可以被多次消費(fèi)。

          5.那RocketMQ的消息模型呢?

          RocketMQ使用的消息模型是標(biāo)準(zhǔn)的發(fā)布-訂閱模型,在RocketMQ的術(shù)語表中,生產(chǎn)者、消費(fèi)者和主題,與發(fā)布-訂閱模型中的概念是完全一樣的。

          RocketMQ本身的消息是由下面幾部分組成:

          RocketMQ消息的組成
          • Message

          Message(消息)就是要傳輸?shù)男畔ⅰ?/p>

          一條消息必須有一個主題(Topic),主題可以看做是你的信件要郵寄的地址。

          一條消息也可以擁有一個可選的標(biāo)簽(Tag)和額處的鍵值對,它們可以用于設(shè)置一個業(yè)務(wù) Key 并在 Broker 上查找此消息以便在開發(fā)期間查找問題。

          • Topic

          Topic(主題)可以看做消息的歸類,它是消息的第一級類型。比如一個電商系統(tǒng)可以分為:交易消息、物流消息等,一條消息必須有一個 Topic 。

          Topic 與生產(chǎn)者和消費(fèi)者的關(guān)系非常松散,一個 Topic 可以有0個、1個、多個生產(chǎn)者向其發(fā)送消息,一個生產(chǎn)者也可以同時向不同的 Topic 發(fā)送消息。

          一個 Topic 也可以被 0個、1個、多個消費(fèi)者訂閱。

          • Tag

          Tag(標(biāo)簽)可以看作子主題,它是消息的第二級類型,用于為用戶提供額外的靈活性。使用標(biāo)簽,同一業(yè)務(wù)模塊不同目的的消息就可以用相同 Topic 而不同的 Tag 來標(biāo)識。比如交易消息又可以分為:交易創(chuàng)建消息、交易完成消息等,一條消息可以沒有 Tag

          標(biāo)簽有助于保持你的代碼干凈和連貫,并且還可以為 RocketMQ 提供的查詢系統(tǒng)提供幫助。

          • Group

          RocketMQ中,訂閱者的概念是通過消費(fèi)組(Consumer Group)來體現(xiàn)的。每個消費(fèi)組都消費(fèi)主題中一份完整的消息,不同消費(fèi)組之間消費(fèi)進(jìn)度彼此不受影響,也就是說,一條消息被Consumer Group1消費(fèi)過,也會再給Consumer Group2消費(fèi)。

          消費(fèi)組中包含多個消費(fèi)者,同一個組內(nèi)的消費(fèi)者是競爭消費(fèi)的關(guān)系,每個消費(fèi)者負(fù)責(zé)消費(fèi)組內(nèi)的一部分消息。默認(rèn)情況,如果一條消息被消費(fèi)者Consumer1消費(fèi)了,那同組的其他消費(fèi)者就不會再收到這條消息。

          • Message Queue

          Message Queue(消息隊列),一個 Topic 下可以設(shè)置多個消息隊列,Topic 包括多個 Message Queue ,如果一個 Consumer 需要獲取 Topic下所有的消息,就要遍歷所有的 Message Queue。

          RocketMQ還有一些其它的Queue——例如ConsumerQueue。

          • Offset

          在Topic的消費(fèi)過程中,由于消息需要被不同的組進(jìn)行多次消費(fèi),所以消費(fèi)完的消息并不會立即被刪除,這就需要RocketMQ為每個消費(fèi)組在每個隊列上維護(hù)一個消費(fèi)位置(Consumer Offset),這個位置之前的消息都被消費(fèi)過,之后的消息都沒有被消費(fèi)過,每成功消費(fèi)一條消息,消費(fèi)位置就加一。

          也可以這么說,Queue 是一個長度無限的數(shù)組,Offset 就是下標(biāo)。

          RocketMQ的消息模型中,這些就是比較關(guān)鍵的概念了。畫張圖總結(jié)一下:

          6.消息的消費(fèi)模式了解嗎?

          消息消費(fèi)模式有兩種:Clustering(集群消費(fèi))和Broadcasting(廣播消費(fèi))。

          兩種消費(fèi)模式

          默認(rèn)情況下就是集群消費(fèi),這種模式下一個消費(fèi)者組共同消費(fèi)一個主題的多個隊列,一個隊列只會被一個消費(fèi)者消費(fèi),如果某個消費(fèi)者掛掉,分組內(nèi)其它消費(fèi)者會接替掛掉的消費(fèi)者繼續(xù)消費(fèi)。

          而廣播消費(fèi)消息會發(fā)給消費(fèi)者組中的每一個消費(fèi)者進(jìn)行消費(fèi)。

          7.RoctetMQ基本架構(gòu)了解嗎?

          先看圖,RocketMQ的基本架構(gòu):

          RocketMQ架構(gòu)

          RocketMQ 一共有四個部分組成:NameServer,Broker,Producer 生產(chǎn)者,Consumer 消費(fèi)者,它們對應(yīng)了:發(fā)現(xiàn)、發(fā)、存、收,為了保證高可用,一般每一部分都是集群部署的。

          8.那能介紹一下這四部分嗎?

          類比一下我們生活的郵政系統(tǒng)——

          郵政系統(tǒng)要正常運(yùn)行,離不開下面這四個角色, 一是發(fā)信者,二 是收信者, 三是負(fù)責(zé)暫存?zhèn)鬏數(shù)泥]局, 四是負(fù)責(zé)協(xié)調(diào)各個地方郵局的管理機(jī)構(gòu)。對應(yīng)到 RocketMQ 中,這四個角色就是 Producer、 Consumer、 Broker 、NameServer。

          RocketMQ類比郵政體系
          NameServer

          NameServer 是一個無狀態(tài)的服務(wù)器,角色類似于 Kafka使用的 Zookeeper,但比 Zookeeper 更輕量。

          特點(diǎn):

          • 每個 NameServer 結(jié)點(diǎn)之間是相互獨(dú)立,彼此沒有任何信息交互。
          • Nameserver 被設(shè)計成幾乎是無狀態(tài)的,通過部署多個結(jié)點(diǎn)來標(biāo)識自己是一個偽集群,Producer 在發(fā)送消息前從 NameServer 中獲取 Topic 的路由信息也就是發(fā)往哪個 Broker,Consumer 也會定時從 NameServer 獲取 Topic 的路由信息,Broker 在啟動時會向 NameServer 注冊,并定時進(jìn)行心跳連接,且定時同步維護(hù)的 Topic 到 NameServer。

          功能主要有兩個:

          • 1、和Broker 結(jié)點(diǎn)保持長連接。
          • 2、維護(hù) Topic 的路由信息。
          Broker

          消息存儲和中轉(zhuǎn)角色,負(fù)責(zé)存儲和轉(zhuǎn)發(fā)消息。

          • Broker 內(nèi)部維護(hù)著一個個 Consumer Queue,用來存儲消息的索引,真正存儲消息的地方是 CommitLog(日志文件)。
          RocketMQ存儲-圖片來源官網(wǎng)
          • 單個 Broker 與所有的 Nameserver 保持著長連接和心跳,并會定時將 Topic 信息同步到 NameServer,和 NameServer 的通信底層是通過 Netty 實(shí)現(xiàn)的。
          Producer

          消息生產(chǎn)者,業(yè)務(wù)端負(fù)責(zé)發(fā)送消息,由用戶自行實(shí)現(xiàn)和分布式部署。

          • Producer由用戶進(jìn)行分布式部署,消息由Producer通過多種負(fù)載均衡模式發(fā)送到Broker集群,發(fā)送低延時,支持快速失敗。

          • RocketMQ 提供了三種方式發(fā)送消息:同步、異步和單向

          • 同步發(fā)送:同步發(fā)送指消息發(fā)送方發(fā)出數(shù)據(jù)后會在收到接收方發(fā)回響應(yīng)之后才發(fā)下一個數(shù)據(jù)包。一般用于重要通知消息,例如重要通知郵件、營銷短信。

          • 異步發(fā)送:異步發(fā)送指發(fā)送方發(fā)出數(shù)據(jù)后,不等接收方發(fā)回響應(yīng),接著發(fā)送下個數(shù)據(jù)包,一般用于可能鏈路耗時較長而對響應(yīng)時間敏感的業(yè)務(wù)場景,例如用戶視頻上傳后通知啟動轉(zhuǎn)碼服務(wù)。

          • 單向發(fā)送:單向發(fā)送是指只負(fù)責(zé)發(fā)送消息而不等待服務(wù)器回應(yīng)且沒有回調(diào)函數(shù)觸發(fā),適用于某些耗時非常短但對可靠性要求并不高的場景,例如日志收集。

          Consumer

          消息消費(fèi)者,負(fù)責(zé)消費(fèi)消息,一般是后臺系統(tǒng)負(fù)責(zé)異步消費(fèi)。

          • Consumer也由用戶部署,支持PUSH和PULL兩種消費(fèi)模式,支持集群消費(fèi)廣播消費(fèi),提供實(shí)時的消息訂閱機(jī)制
          • Pull:拉取型消費(fèi)者(Pull Consumer)主動從消息服務(wù)器拉取信息,只要批量拉取到消息,用戶應(yīng)用就會啟動消費(fèi)過程,所以 Pull 稱為主動消費(fèi)型。
          • Push:推送型消費(fèi)者(Push Consumer)封裝了消息的拉取、消費(fèi)進(jìn)度和其他的內(nèi)部維護(hù)工作,將消息到達(dá)時執(zhí)行的回調(diào)接口留給用戶應(yīng)用程序來實(shí)現(xiàn)。所以 Push 稱為被動消費(fèi)類型,但其實(shí)從實(shí)現(xiàn)上看還是從消息服務(wù)器中拉取消息,不同于 Pull 的是 Push 首先要注冊消費(fèi)監(jiān)聽器,當(dāng)監(jiān)聽器處觸發(fā)后才開始消費(fèi)消息。

          進(jìn)階

          9.如何保證消息的可用性/可靠性/不丟失呢?

          消息可能在哪些階段丟失呢?可能會在這三個階段發(fā)生丟失:生產(chǎn)階段、存儲階段、消費(fèi)階段。

          所以要從這三個階段考慮:

          消息傳遞三階段
          生產(chǎn)

          在生產(chǎn)階段,主要通過請求確認(rèn)機(jī)制,來保證消息的可靠傳遞

          • 1、同步發(fā)送的時候,要注意處理響應(yīng)結(jié)果和異常。如果返回響應(yīng)OK,表示消息成功發(fā)送到了Broker,如果響應(yīng)失敗,或者發(fā)生其它異常,都應(yīng)該重試。
          • 2、異步發(fā)送的時候,應(yīng)該在回調(diào)方法里檢查,如果發(fā)送失敗或者異常,都應(yīng)該進(jìn)行重試。
          • 3、如果發(fā)生超時的情況,也可以通過查詢?nèi)罩镜腁PI,來檢查是否在Broker存儲成功。
          存儲

          存儲階段,可以通過配置可靠性優(yōu)先的 Broker 參數(shù)來避免因為宕機(jī)丟消息,簡單說就是可靠性優(yōu)先的場景都應(yīng)該使用同步。

          • 1、消息只要持久化到CommitLog(日志文件)中,即使Broker宕機(jī),未消費(fèi)的消息也能重新恢復(fù)再消費(fèi)。
          • 2、Broker的刷盤機(jī)制:同步刷盤和異步刷盤,不管哪種刷盤都可以保證消息一定存儲在pagecache中(內(nèi)存中),但是同步刷盤更可靠,它是Producer發(fā)送消息后等數(shù)據(jù)持久化到磁盤之后再返回響應(yīng)給Producer。
          同步刷盤和異步刷盤-圖片來源官網(wǎng)
          • 3、Broker通過主從模式來保證高可用,Broker支持Master和Slave同步復(fù)制、Master和Slave異步復(fù)制模式,生產(chǎn)者的消息都是發(fā)送給Master,但是消費(fèi)既可以從Master消費(fèi),也可以從Slave消費(fèi)。同步復(fù)制模式可以保證即使Master宕機(jī),消息肯定在Slave中有備份,保證了消息不會丟失。
          消費(fèi)

          從Consumer角度分析,如何保證消息被成功消費(fèi)?

          • Consumer保證消息成功消費(fèi)的關(guān)鍵在于確認(rèn)的時機(jī),不要在收到消息后就立即發(fā)送消費(fèi)確認(rèn),而是應(yīng)該在執(zhí)行完所有消費(fèi)業(yè)務(wù)邏輯之后,再發(fā)送消費(fèi)確認(rèn)。因為消息隊列維護(hù)了消費(fèi)的位置,邏輯執(zhí)行失敗了,沒有確認(rèn),再去隊列拉取消息,就還是之前的一條。

          10.如何處理消息重復(fù)的問題呢?

          對分布式消息隊列來說,同時做到確保一定投遞和不重復(fù)投遞是很難的,就是所謂的“有且僅有一次” 。RocketMQ擇了確保一定投遞,保證消息不丟失,但有可能造成消息重復(fù)。

          處理消息重復(fù)問題,主要有業(yè)務(wù)端自己保證,主要的方式有兩種:業(yè)務(wù)冪等消息去重

          消息重復(fù)處理

          業(yè)務(wù)冪等:第一種是保證消費(fèi)邏輯的冪等性,也就是多次調(diào)用和一次調(diào)用的效果是一樣的。這樣一來,不管消息消費(fèi)多少次,對業(yè)務(wù)都沒有影響。

          消息去重:第二種是業(yè)務(wù)端,對重復(fù)的消息就不再消費(fèi)了。這種方法,需要保證每條消息都有一個惟一的編號,通常是業(yè)務(wù)相關(guān)的,比如訂單號,消費(fèi)的記錄需要落庫,而且需要保證和消息確認(rèn)這一步的原子性。

          具體做法是可以建立一個消費(fèi)記錄表,拿到這個消息做數(shù)據(jù)庫的insert操作。給這個消息做一個唯一主鍵(primary key)或者唯一約束,那么就算出現(xiàn)重復(fù)消費(fèi)的情況,就會導(dǎo)致主鍵沖突,那么就不再處理這條消息。

          11.怎么處理消息積壓?

          發(fā)生了消息積壓,這時候就得想辦法趕緊把積壓的消息消費(fèi)完,就得考慮提高消費(fèi)能力,一般有兩種辦法:

          消息積壓處理
          • 消費(fèi)者擴(kuò)容:如果當(dāng)前Topic的Message Queue的數(shù)量大于消費(fèi)者數(shù)量,就可以對消費(fèi)者進(jìn)行擴(kuò)容,增加消費(fèi)者,來提高消費(fèi)能力,盡快把積壓的消息消費(fèi)玩。
          • 消息遷移Queue擴(kuò)容:如果當(dāng)前Topic的Message Queue的數(shù)量小于或者等于消費(fèi)者數(shù)量,這種情況,再擴(kuò)容消費(fèi)者就沒什么用,就得考慮擴(kuò)容Message Queue。可以新建一個臨時的Topic,臨時的Topic多設(shè)置一些Message Queue,然后先用一些消費(fèi)者把消費(fèi)的數(shù)據(jù)丟到臨時的Topic,因為不用業(yè)務(wù)處理,只是轉(zhuǎn)發(fā)一下消息,還是很快的。接下來用擴(kuò)容的消費(fèi)者去消費(fèi)新的Topic里的數(shù)據(jù),消費(fèi)完了之后,恢復(fù)原狀。
          消息遷移擴(kuò)容消費(fèi)

          12.順序消息如何實(shí)現(xiàn)?

          順序消息是指消息的消費(fèi)順序和產(chǎn)生順序相同,在有些業(yè)務(wù)邏輯下,必須保證順序,比如訂單的生成、付款、發(fā)貨,這個消息必須按順序處理才行。

          順序消息

          順序消息分為全局順序消息和部分順序消息,全局順序消息指某個 Topic 下的所有消息都要保證順序;

          部分順序消息只要保證每一組消息被順序消費(fèi)即可,比如訂單消息,只要保證同一個訂單 ID 個消息能按順序消費(fèi)即可。

          部分順序消息

          部分順序消息相對比較好實(shí)現(xiàn),生產(chǎn)端需要做到把同 ID 的消息發(fā)送到同一個 Message Queue ;在消費(fèi)過程中,要做到從同一個Message Queue讀取的消息順序處理——消費(fèi)端不能并發(fā)處理順序消息,這樣才能達(dá)到部分有序。

          部分順序消息

          發(fā)送端使用 MessageQueueSelector 類來控制 把消息發(fā)往哪個 Message Queue 。

          順序消息生產(chǎn)-例子來源官方

          消費(fèi)端通過使用 MessageListenerOrderly 來解決單 Message Queue 的消息被并發(fā)處理的問題。

          全局順序消息

          RocketMQ 默認(rèn)情況下不保證順序,比如創(chuàng)建一個 Topic ,默認(rèn)八個寫隊列,八個讀隊列,這時候一條消息可能被寫入任意一個隊列里;在數(shù)據(jù)的讀取過程中,可能有多個 Consumer ,每個 Consumer 也可能啟動多個線程并行處理,所以消息被哪個 Consumer 消費(fèi),被消費(fèi)的順序和寫人的順序是否一致是不確定的。

          要保證全局順序消息, 需要先把 Topic 的讀寫隊列數(shù)設(shè)置為 一,然后Producer Consumer 的并發(fā)設(shè)置,也要是一。簡單來說,為了保證整個 Topic全局消息有序,只能消除所有的并發(fā)處理,各部分都設(shè)置成單線程處理 ,這時候就完全犧牲RocketMQ的高并發(fā)、高吞吐的特性了。

          全局順序消息

          13.如何實(shí)現(xiàn)消息過濾?

          有兩種方案:

          • 一種是在 Broker 端按照 Consumer 的去重邏輯進(jìn)行過濾,這樣做的好處是避免了無用的消息傳輸?shù)?Consumer 端,缺點(diǎn)是加重了 Broker 的負(fù)擔(dān),實(shí)現(xiàn)起來相對復(fù)雜。
          • 另一種是在 Consumer 端過濾,比如按照消息設(shè)置的 tag 去重,這樣的好處是實(shí)現(xiàn)起來簡單,缺點(diǎn)是有大量無用的消息到達(dá)了 Consumer 端只能丟棄不處理。

          一般采用Cosumer端過濾,如果希望提高吞吐量,可以采用Broker過濾。

          對消息的過濾有三種方式:

          消息過濾
          • 根據(jù)Tag過濾:這是最常見的一種,用起來高效簡單
          DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
          consumer.subscribe("TOPIC""TAGA || TAGB || TAGC");
          • SQL 表達(dá)式過濾:SQL表達(dá)式過濾更加靈活
          DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
          // 只有訂閱的消息有這個屬性a, a >=0 and a <= 3
          consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
          consumer.registerMessageListener(new MessageListenerConcurrently() {
             @Override
             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
             }
          });
          consumer.start();

          • Filter Server 方式:最靈活,也是最復(fù)雜的一種方式,允許用戶自定義函數(shù)進(jìn)行過濾

          14.延時消息了解嗎?

          電商的訂單超時自動取消,就是一個典型的利用延時消息的例子,用戶提交了一個訂單,就可以發(fā)送一個延時消息,1h后去檢查這個訂單的狀態(tài),如果還是未付款就取消訂單釋放庫存。

          RocketMQ是支持延時消息的,只需要在生產(chǎn)消息的時候設(shè)置消息的延時級別:

          // 實(shí)例化一個生產(chǎn)者來產(chǎn)生延時消息
          DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
          // 啟動生產(chǎn)者
          producer.start();
          int totalMessagesToSend = 100;
          for (int i = 0; i < totalMessagesToSend; i++) {
              Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
              // 設(shè)置延時等級3,這個消息將在10s之后發(fā)送(現(xiàn)在只支持固定的幾個時間,詳看delayTimeLevel)
              message.setDelayTimeLevel(3);
              // 發(fā)送消息
              producer.send(message);
          }

          但是目前RocketMQ支持的延時級別是有限的:

          private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

          RocketMQ怎么實(shí)現(xiàn)延時消息的?

          簡單,八個字:臨時存儲+定時任務(wù)

          Broker收到延時消息了,會先發(fā)送到主題(SCHEDULE_TOPIC_XXXX)的相應(yīng)時間段的Message Queue中,然后通過一個定時任務(wù)輪詢這些隊列,到期后,把消息投遞到目標(biāo)Topic的隊列中,然后消費(fèi)者就可以正常消費(fèi)這些消息。

          延遲消息處理流程-圖片來源見水印

          15.怎么實(shí)現(xiàn)分布式消息事務(wù)的?半消息?

          半消息:是指暫時還不能被 Consumer 消費(fèi)的消息,Producer 成功發(fā)送到 Broker 端的消息,但是此消息被標(biāo)記為 “暫不可投遞” 狀態(tài),只有等 Producer 端執(zhí)行完本地事務(wù)后經(jīng)過二次確認(rèn)了之后,Consumer 才能消費(fèi)此條消息。

          依賴半消息,可以實(shí)現(xiàn)分布式消息事務(wù),其中的關(guān)鍵在于二次確認(rèn)以及消息回查:

          RocketMQ實(shí)現(xiàn)消息事務(wù)
          • 1、Producer 向 broker 發(fā)送半消息
          • 2、Producer 端收到響應(yīng),消息發(fā)送成功,此時消息是半消息,標(biāo)記為 “不可投遞” 狀態(tài),Consumer 消費(fèi)不了。
          • 3、Producer 端執(zhí)行本地事務(wù)。
          • 4、正常情況本地事務(wù)執(zhí)行完成,Producer 向 Broker 發(fā)送 Commit/Rollback,如果是 Commit,Broker 端將半消息標(biāo)記為正常消息,Consumer 可以消費(fèi),如果是 Rollback,Broker 丟棄此消息。
          • 5、異常情況,Broker 端遲遲等不到二次確認(rèn)。在一定時間后,會查詢所有的半消息,然后到 Producer 端查詢半消息的執(zhí)行情況。
          • 6、Producer 端查詢本地事務(wù)的狀態(tài)
          • 7、根據(jù)事務(wù)的狀態(tài)提交 commit/rollback 到 broker 端。(5,6,7 是消息回查)
          • 8、消費(fèi)者段消費(fèi)到消息之后,執(zhí)行本地事務(wù),執(zhí)行本地事務(wù)。

          16.死信隊列知道嗎?

          死信隊列用于處理無法被正常消費(fèi)的消息,即死信消息。

          當(dāng)一條消息初次消費(fèi)失敗,消息隊列 RocketMQ 會自動進(jìn)行消息重試;達(dá)到最大重試次數(shù)后,若消費(fèi)依然失敗,則表明消費(fèi)者在正常情況下無法正確地消費(fèi)該消息,此時,消息隊列 RocketMQ 不會立刻將消息丟棄,而是將其發(fā)送到該消費(fèi)者對應(yīng)的特殊隊列中,該特殊隊列稱為死信隊列

          死信消息的特點(diǎn)

          • 不會再被消費(fèi)者正常消費(fèi)。
          • 有效期與正常消息相同,均為 3 天,3 天后會被自動刪除。因此,需要在死信消息產(chǎn)生后的 3 天內(nèi)及時處理。

          死信隊列的特點(diǎn)

          • 一個死信隊列對應(yīng)一個 Group ID, 而不是對應(yīng)單個消費(fèi)者實(shí)例。
          • 如果一個 Group ID 未產(chǎn)生死信消息,消息隊列 RocketMQ 不會為其創(chuàng)建相應(yīng)的死信隊列。
          • 一個死信隊列包含了對應(yīng) Group ID 產(chǎn)生的所有死信消息,不論該消息屬于哪個 Topic。

          RocketMQ 控制臺提供對死信消息的查詢、導(dǎo)出和重發(fā)的功能。

          17.如何保證RocketMQ的高可用?

          NameServer因為是無狀態(tài),且不相互通信的,所以只要集群部署就可以保證高可用。

          NameServer集群

          RocketMQ的高可用主要是在體現(xiàn)在Broker的讀和寫的高可用,Broker的高可用是通過集群主從實(shí)現(xiàn)的。

          Broker集群、主從示意圖

          Broker可以配置兩種角色:Master和Slave,Master角色的Broker支持讀和寫,Slave角色的Broker只支持讀,Master會向Slave同步消息。

          也就是說Producer只能向Master角色的Broker寫入消息,Cosumer可以從Master和Slave角色的Broker讀取消息。

          Consumer 的配置文件中,并不需要設(shè)置是從 Master 讀還是從 Slave讀,當(dāng) Master 不可用或者繁忙的時候, Consumer 的讀請求會被自動切換到從 Slave。有了自動切換 Consumer 這種機(jī)制,當(dāng)一個 Master 角色的機(jī)器出現(xiàn)故障后,Consumer 仍然可以從 Slave 讀取消息,不影響 Consumer 讀取消息,這就實(shí)現(xiàn)了讀的高可用。

          如何達(dá)到發(fā)送端寫的高可用性呢?在創(chuàng)建 Topic 的時候,把 Topic 的多個Message Queue 創(chuàng)建在多個 Broker 組上(相同 Broker 名稱,不同 brokerId機(jī)器組成 Broker 組),這樣當(dāng) Broker 組的 Master 不可用后,其他組Master 仍然可用, Producer 仍然可以發(fā)送消息 RocketMQ 目前還不支持把Slave自動轉(zhuǎn)成 Master ,如果機(jī)器資源不足,需要把 Slave 轉(zhuǎn)成 Master ,則要手動停止 Slave 色的 Broker ,更改配置文件,用新的配置文件啟動 Broker。

          原理

          18.說一下RocketMQ的整體工作流程?

          簡單來說,RocketMQ是一個分布式消息隊列,也就是消息隊列+分布式系統(tǒng)

          作為消息隊列,它是發(fā)--的一個模型,對應(yīng)的就是Producer、Broker、Cosumer;作為分布式系統(tǒng),它要有服務(wù)端、客戶端、注冊中心,對應(yīng)的就是Broker、Producer/Consumer、NameServer

          所以我們看一下它主要的工作流程:RocketMQ由NameServer注冊中心集群、Producer生產(chǎn)者集群、Consumer消費(fèi)者集群和若干Broker(RocketMQ進(jìn)程)組成:

          1. Broker在啟動的時候去向所有的NameServer注冊,并保持長連接,每30s發(fā)送一次心跳
          2. Producer在發(fā)送消息的時候從NameServer獲取Broker服務(wù)器地址,根據(jù)負(fù)載均衡算法選擇一臺服務(wù)器來發(fā)送消息
          3. Conusmer消費(fèi)消息的時候同樣從NameServer獲取Broker地址,然后主動拉取消息來消費(fèi)
          RocketMQ整體工作流程

          19.為什么RocketMQ不使用Zookeeper作為注冊中心呢?

          Kafka我們都知道采用Zookeeper作為注冊中心——當(dāng)然也開始逐漸去Zookeeper,RocketMQ不使用Zookeeper其實(shí)主要可能從這幾方面來考慮:

          1. 基于可用性的考慮,根據(jù)CAP理論,同時最多只能滿足兩個點(diǎn),而Zookeeper滿足的是CP,也就是說Zookeeper并不能保證服務(wù)的可用性,Zookeeper在進(jìn)行選舉的時候,整個選舉的時間太長,期間整個集群都處于不可用的狀態(tài),而這對于一個注冊中心來說肯定是不能接受的,作為服務(wù)發(fā)現(xiàn)來說就應(yīng)該是為可用性而設(shè)計。
          2. 基于性能的考慮,NameServer本身的實(shí)現(xiàn)非常輕量,而且可以通過增加機(jī)器的方式水平擴(kuò)展,增加集群的抗壓能力,而Zookeeper的寫是不可擴(kuò)展的,Zookeeper要解決這個問題只能通過劃分領(lǐng)域,劃分多個Zookeeper集群來解決,首先操作起來太復(fù)雜,其次這樣還是又違反了CAP中的A的設(shè)計,導(dǎo)致服務(wù)之間是不連通的。
          3. 持久化的機(jī)制來帶的問題,ZooKeeper 的 ZAB 協(xié)議對每一個寫請求,會在每個 ZooKeeper 節(jié)點(diǎn)上保持寫一個事務(wù)日志,同時再加上定期的將內(nèi)存數(shù)據(jù)鏡像(Snapshot)到磁盤來保證數(shù)據(jù)的一致性和持久性,而對于一個簡單的服務(wù)發(fā)現(xiàn)的場景來說,這其實(shí)沒有太大的必要,這個實(shí)現(xiàn)方案太重了。而且本身存儲的數(shù)據(jù)應(yīng)該是高度定制化的。
          4. 消息發(fā)送應(yīng)該弱依賴注冊中心,而RocketMQ的設(shè)計理念也正是基于此,生產(chǎn)者在第一次發(fā)送消息的時候從NameServer獲取到Broker地址后緩存到本地,如果NameServer整個集群不可用,短時間內(nèi)對于生產(chǎn)者和消費(fèi)者并不會產(chǎn)生太大影響。

          20.Broker是怎么保存數(shù)據(jù)的呢?

          RocketMQ主要的存儲文件包括CommitLog文件、ConsumeQueue文件、Indexfile文件。

          消息存儲的整體的設(shè)計:

          消息存儲整體設(shè)計-來源官網(wǎng)
          • CommitLog:消息主體以及元數(shù)據(jù)的存儲主體,存儲Producer端寫入的消息主體內(nèi)容,消息內(nèi)容不是定長的。單個文件大小默認(rèn)1G, 文件名長度為20位,左邊補(bǔ)零,剩余為起始偏移量,比如00000000000000000000代表了第一個文件,起始偏移量為0,文件大小為1G=1073741824;當(dāng)?shù)谝粋€文件寫滿了,第二個文件為00000000001073741824,起始偏移量為1073741824,以此類推。消息主要是順序?qū)懭肴罩疚募?dāng)文件滿了,寫入下一個文件。

          CommitLog文件保存于${Rocket_Home}/store/commitlog目錄中,從圖中我們可以明顯看出來文件名的偏移量,每個文件默認(rèn)1G,寫滿后自動生成一個新的文件。

          CommitLog
          • ConsumeQueue:消息消費(fèi)隊列,引入的目的主要是提高消息消費(fèi)的性能,由于RocketMQ是基于主題topic的訂閱模式,消息消費(fèi)是針對主題進(jìn)行的,如果要遍歷commitlog文件中根據(jù)topic檢索消息是非常低效的。

          Consumer即可根據(jù)ConsumeQueue來查找待消費(fèi)的消息。其中,ConsumeQueue(邏輯消費(fèi)隊列)作為消費(fèi)消息的索引,保存了指定Topic下的隊列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。

          ConsumeQueue文件可以看成是基于Topic的CommitLog索引文件,故ConsumeQueue文件夾的組織方式如下:topic/queue/file三層組織結(jié)構(gòu),具體存儲路徑為:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同樣ConsumeQueue文件采取定長設(shè)計,每一個條目共20個字節(jié),分別為8字節(jié)的CommitLog物理偏移量、4字節(jié)的消息長度、8字節(jié)tag hashcode,單個文件由30W個條目組成,可以像數(shù)組一樣隨機(jī)訪問每一個條目,每個ConsumeQueue文件大小約5.72M;

          Comsumer Queue
          • IndexFile:IndexFile(索引文件)提供了一種可以通過key或時間區(qū)間來查詢消息的方法。Index文件的存儲位置是:{fileName},文件名fileName是以創(chuàng)建時的時間戳命名的,固定的單個IndexFile文件大小約為400M,一個IndexFile可以保存 2000W個索引,IndexFile的底層存儲設(shè)計為在文件系統(tǒng)中實(shí)現(xiàn)HashMap結(jié)構(gòu),故RocketMQ的索引文件其底層實(shí)現(xiàn)為hash索引。
          IndexFile文件示意圖

          總結(jié)一下:RocketMQ采用的是混合型的存儲結(jié)構(gòu),即為Broker單個實(shí)例下所有的隊列共用一個日志數(shù)據(jù)文件(即為CommitLog)來存儲。

          RocketMQ的混合型存儲結(jié)構(gòu)(多個Topic的消息實(shí)體內(nèi)容都存儲于一個CommitLog中)針對Producer和Consumer分別采用了數(shù)據(jù)和索引部分相分離的存儲結(jié)構(gòu),Producer發(fā)送消息至Broker端,然后Broker端使用同步或者異步的方式對消息刷盤持久化,保存至CommitLog中。

          只要消息被刷盤持久化至磁盤文件CommitLog中,那么Producer發(fā)送的消息就不會丟失。正因為如此,Consumer也就肯定有機(jī)會去消費(fèi)這條消息。當(dāng)無法拉取到消息后,可以等下一次消息拉取,同時服務(wù)端也支持長輪詢模式,如果一個消息拉取請求未拉取到消息,Broker允許等待30s的時間,只要這段時間內(nèi)有新消息到達(dá),將直接返回給消費(fèi)端。

          這里,RocketMQ的具體做法是,使用Broker端的后臺服務(wù)線程—ReputMessageService不停地分發(fā)請求并異步構(gòu)建ConsumeQueue(邏輯消費(fèi)隊列)和IndexFile(索引文件)數(shù)據(jù)。

          21.說說RocketMQ怎么對文件進(jìn)行讀寫的?

          RocketMQ對文件的讀寫巧妙地利用了操作系統(tǒng)的一些高效文件讀寫方式——PageCache順序讀寫零拷貝

          • PageCache、順序讀取

          在RocketMQ中,ConsumeQueue邏輯消費(fèi)隊列存儲的數(shù)據(jù)較少,并且是順序讀取,在page cache機(jī)制的預(yù)讀取作用下,Consume Queue文件的讀性能幾乎接近讀內(nèi)存,即使在有消息堆積情況下也不會影響性能。而對于CommitLog消息存儲的日志數(shù)據(jù)文件來說,讀取消息內(nèi)容時候會產(chǎn)生較多的隨機(jī)訪問讀取,嚴(yán)重影響性能。如果選擇合適的系統(tǒng)IO調(diào)度算法,比如設(shè)置調(diào)度算法為“Deadline”(此時塊存儲采用SSD的話),隨機(jī)讀的性能也會有所提升。

          頁緩存(PageCache)是OS對文件的緩存,用于加速對文件的讀寫。一般來說,程序?qū)ξ募M(jìn)行順序讀寫的速度幾乎接近于內(nèi)存的讀寫速度,主要原因就是由于OS使用PageCache機(jī)制對讀寫訪問操作進(jìn)行了性能優(yōu)化,將一部分的內(nèi)存用作PageCache。對于數(shù)據(jù)的寫入,OS會先寫入至Cache內(nèi),隨后通過異步的方式由pdflush內(nèi)核線程將Cache內(nèi)的數(shù)據(jù)刷盤至物理磁盤上。對于數(shù)據(jù)的讀取,如果一次讀取文件時出現(xiàn)未命中PageCache的情況,OS從物理磁盤上訪問讀取文件的同時,會順序?qū)ζ渌噜弶K的數(shù)據(jù)文件進(jìn)行預(yù)讀取。

          • 零拷貝

          另外,RocketMQ主要通過MappedByteBuffer對文件進(jìn)行讀寫操作。其中,利用了NIO中的FileChannel模型將磁盤上的物理文件直接映射到用戶態(tài)的內(nèi)存地址中(這種Mmap的方式減少了傳統(tǒng)IO,將磁盤文件數(shù)據(jù)在操作系統(tǒng)內(nèi)核地址空間的緩沖區(qū),和用戶應(yīng)用程序地址空間的緩沖區(qū)之間來回進(jìn)行拷貝的性能開銷),將對文件的操作轉(zhuǎn)化為直接對內(nèi)存地址進(jìn)行操作,從而極大地提高了文件的讀寫效率(正因為需要使用內(nèi)存映射機(jī)制,故RocketMQ的文件存儲都使用定長結(jié)構(gòu)來存儲,方便一次將整個文件映射至內(nèi)存)。

          說說什么是零拷貝?

          在操作系統(tǒng)中,使用傳統(tǒng)的方式,數(shù)據(jù)需要經(jīng)歷幾次拷貝,還要經(jīng)歷用戶態(tài)/內(nèi)核態(tài)切換。

          傳統(tǒng)文件傳輸示意圖-來源《圖解操作系統(tǒng)》
          1. 從磁盤復(fù)制數(shù)據(jù)到內(nèi)核態(tài)內(nèi)存;
          2. 從內(nèi)核態(tài)內(nèi)存復(fù)制到用戶態(tài)內(nèi)存;
          3. 然后從用戶態(tài)內(nèi)存復(fù)制到網(wǎng)絡(luò)驅(qū)動的內(nèi)核態(tài)內(nèi)存;
          4. 最后是從網(wǎng)絡(luò)驅(qū)動的內(nèi)核態(tài)內(nèi)存復(fù)制到網(wǎng)卡中進(jìn)行傳輸。

          所以,可以通過零拷貝的方式,減少用戶態(tài)與內(nèi)核態(tài)的上下文切換內(nèi)存拷貝的次數(shù),用來提升I/O的性能。零拷貝比較常見的實(shí)現(xiàn)方式是mmap,這種機(jī)制在Java中是通過MappedByteBuffer實(shí)現(xiàn)的。

          mmap示意圖-來源《圖解操作系統(tǒng)》

          22.消息刷盤怎么實(shí)現(xiàn)的呢?

          RocketMQ提供了兩種刷盤策略:同步刷盤和異步刷盤

          • 同步刷盤:在消息達(dá)到Broker的內(nèi)存之后,必須刷到commitLog日志文件中才算成功,然后返回Producer數(shù)據(jù)已經(jīng)發(fā)送成功。
          • 異步刷盤:異步刷盤是指消息達(dá)到Broker內(nèi)存后就返回Producer數(shù)據(jù)已經(jīng)發(fā)送成功,會喚醒一個線程去將數(shù)據(jù)持久化到CommitLog日志文件中。

          Broker 在消息的存取時直接操作的是內(nèi)存(內(nèi)存映射文件),這可以提供系統(tǒng)的吞吐量,但是無法避免機(jī)器掉電時數(shù)據(jù)丟失,所以需要持久化到磁盤中。

          刷盤的最終實(shí)現(xiàn)都是使用NIO中的 MappedByteBuffer.force() 將映射區(qū)的數(shù)據(jù)寫入到磁盤,如果是同步刷盤的話,在Broker把消息寫到CommitLog映射區(qū)后,就會等待寫入完成。

          異步而言,只是喚醒對應(yīng)的線程,不保證執(zhí)行的時機(jī),流程如圖所示。

          異步刷盤

          22.能說下 RocketMQ 的負(fù)載均衡是如何實(shí)現(xiàn)的?

          RocketMQ中的負(fù)載均衡都在Client端完成,具體來說的話,主要可以分為Producer端發(fā)送消息時候的負(fù)載均衡和Consumer端訂閱消息的負(fù)載均衡。

          Producer的負(fù)載均衡

          Producer端在發(fā)送消息的時候,會先根據(jù)Topic找到指定的TopicPublishInfo,在獲取了TopicPublishInfo路由信息后,RocketMQ的客戶端在默認(rèn)方式下selectOneMessageQueue()方法會從TopicPublishInfo中的messageQueueList中選擇一個隊列(MessageQueue)進(jìn)行發(fā)送消息。具這里有一個sendLatencyFaultEnable開關(guān)變量,如果開啟,在隨機(jī)遞增取模的基礎(chǔ)上,再過濾掉not available的Broker代理。

          所謂的"latencyFaultTolerance",是指對之前失敗的,按一定的時間做退避。例如,如果上次請求的latency超過550Lms,就退避3000Lms;超過1000L,就退避60000L;如果關(guān)閉,采用隨機(jī)遞增取模的方式選擇一個隊列(MessageQueue)來發(fā)送消息,latencyFaultTolerance機(jī)制是實(shí)現(xiàn)消息發(fā)送高可用的核心關(guān)鍵所在。

          Consumer的負(fù)載均衡

          在RocketMQ中,Consumer端的兩種消費(fèi)模式(Push/Pull)都是基于拉模式來獲取消息的,而在Push模式只是對pull模式的一種封裝,其本質(zhì)實(shí)現(xiàn)為消息拉取線程在從服務(wù)器拉取到一批消息后,然后提交到消息消費(fèi)線程池后,又“馬不停蹄”的繼續(xù)向服務(wù)器再次嘗試?yán)∠ⅰH绻蠢〉较ⅲ瑒t延遲一下又繼續(xù)拉取。在兩種基于拉模式的消費(fèi)方式(Push/Pull)中,均需要Consumer端知道從Broker端的哪一個消息隊列中去獲取消息。因此,有必要在Consumer端來做負(fù)載均衡,即Broker端中多個MessageQueue分配給同一個ConsumerGroup中的哪些Consumer消費(fèi)。

          1. Consumer端的心跳包發(fā)送

          在Consumer啟動后,它就會通過定時任務(wù)不斷地向RocketMQ集群中的所有Broker實(shí)例發(fā)送心跳包(其中包含了,消息消費(fèi)分組名稱、訂閱關(guān)系集合、消息通信模式和客戶端id的值等信息)。Broker端在收到Consumer的心跳消息后,會將它維護(hù)在ConsumerManager的本地緩存變量—consumerTable,同時并將封裝后的客戶端網(wǎng)絡(luò)通道信息保存在本地緩存變量—channelInfoTable中,為之后做Consumer端的負(fù)載均衡提供可以依據(jù)的元數(shù)據(jù)信息。

          1. Consumer端實(shí)現(xiàn)負(fù)載均衡的核心類—RebalanceImpl

          在Consumer實(shí)例的啟動流程中的啟動MQClientInstance實(shí)例部分,會完成負(fù)載均衡服務(wù)線程—RebalanceService的啟動(每隔20s執(zhí)行一次)。

          通過查看源碼可以發(fā)現(xiàn),RebalanceService線程的run()方法最終調(diào)用的是RebalanceImpl類的rebalanceByTopic()方法,這個方法是實(shí)現(xiàn)Consumer端負(fù)載均衡的核心。

          rebalanceByTopic()方法會根據(jù)消費(fèi)者通信類型為“廣播模式”還是“集群模式”做不同的邏輯處理。這里主要來看下集群模式下的主要處理流程:

          (1) 從rebalanceImpl實(shí)例的本地緩存變量—topicSubscribeInfoTable中,獲取該Topic主題下的消息消費(fèi)隊列集合(mqSet);

          (2) 根據(jù)topic和consumerGroup為參數(shù)調(diào)用mQClientFactory.findConsumerIdList()方法向Broker端發(fā)送通信請求,獲取該消費(fèi)組下消費(fèi)者Id列表;

          (3) 先對Topic下的消息消費(fèi)隊列、消費(fèi)者Id排序,然后用消息隊列分配策略算法(默認(rèn)為:消息隊列的平均分配算法),計算出待拉取的消息隊列。這里的平均分配算法,類似于分頁的算法,將所有MessageQueue排好序類似于記錄,將所有消費(fèi)端Consumer排好序類似頁數(shù),并求出每一頁需要包含的平均size和每個頁面記錄的范圍range,最后遍歷整個range而計算出當(dāng)前Consumer端應(yīng)該分配到的的MessageQueue。

          Cosumer分配

          (4) 然后,調(diào)用updateProcessQueueTableInRebalance()方法,具體的做法是,先將分配到的消息隊列集合(mqSet)與processQueueTable做一個過濾比對。

          • 上圖中processQueueTable標(biāo)注的紅色部分,表示與分配到的消息隊列集合mqSet互不包含。將這些隊列設(shè)置Dropped屬性為true,然后查看這些隊列是否可以移除出processQueueTable緩存變量,這里具體執(zhí)行removeUnnecessaryMessageQueue()方法,即每隔1s 查看是否可以獲取當(dāng)前消費(fèi)處理隊列的鎖,拿到的話返回true。如果等待1s后,仍然拿不到當(dāng)前消費(fèi)處理隊列的鎖則返回false。如果返回true,則從processQueueTable緩存變量中移除對應(yīng)的Entry;
          • 上圖中processQueueTable的綠色部分,表示與分配到的消息隊列集合mqSet的交集。判斷該P(yáng)rocessQueue是否已經(jīng)過期了,在Pull模式的不用管,如果是Push模式的,設(shè)置Dropped屬性為true,并且調(diào)用removeUnnecessaryMessageQueue()方法,像上面一樣嘗試移除Entry;
          • 最后,為過濾后的消息隊列集合(mqSet)中的每個MessageQueue創(chuàng)建一個ProcessQueue對象并存入RebalanceImpl的processQueueTable隊列中(其中調(diào)用RebalanceImpl實(shí)例的computePullFromWhere(MessageQueue mq)方法獲取該MessageQueue對象的下一個進(jìn)度消費(fèi)值offset,隨后填充至接下來要創(chuàng)建的pullRequest對象屬性中),并創(chuàng)建拉取請求對象—pullRequest添加到拉取列表—pullRequestList中,最后執(zhí)行dispatchPullRequest()方法,將Pull消息的請求對象PullRequest依次放入PullMessageService服務(wù)線程的阻塞隊列pullRequestQueue中,待該服務(wù)線程取出后向Broker端發(fā)起Pull消息的請求。其中,可以重點(diǎn)對比下,RebalancePushImpl和RebalancePullImpl兩個實(shí)現(xiàn)類的dispatchPullRequest()方法不同,RebalancePullImpl類里面的該方法為空。

          消息消費(fèi)隊列在同一消費(fèi)組不同消費(fèi)者之間的負(fù)載均衡,其核心設(shè)計理念是在一個消息消費(fèi)隊列在同一時間只允許被同一消費(fèi)組內(nèi)的一個消費(fèi)者消費(fèi),一個消息消費(fèi)者能同時消費(fèi)多個消息隊列。

          23.RocketMQ消息長輪詢了解嗎?

          所謂的長輪詢,就是Consumer 拉取消息,如果對應(yīng)的 Queue 如果沒有數(shù)據(jù),Broker 不會立即返回,而是把 PullReuqest hold起來,等待 queue 有了消息后,或者長輪詢阻塞時間到了,再重新處理該 queue 上的所有 PullRequest。

          長輪詢簡單示意圖
          • PullMessageProcessor#processRequest
          //如果沒有拉到數(shù)據(jù)
          case ResponseCode.PULL_NOT_FOUND:
          // broker 和 consumer 都允許 suspend,默認(rèn)開啟
          if (brokerAllowSuspend && hasSuspendFlag) {
              long pollingTimeMills = suspendTimeoutMillisLong;
              if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                  pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
              }

              String topic = requestHeader.getTopic();
              long offset = requestHeader.getQueueOffset();
              int queueId = requestHeader.getQueueId();
              //封裝一個PullRequest
              PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                      this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
              //把PullRequest掛起來
              this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
              response = null;
              break;
          }

          掛起的請求,有一個服務(wù)線程會不停地檢查,看queue中是否有數(shù)據(jù),或者超時。

          • PullRequestHoldService#run()
          @Override
          public void run() {
              log.info("{} service started", this.getServiceName());
              while (!this.isStopped()) {
                  try {
                      if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                          this.waitForRunning(5 * 1000);
                      } else {
                          this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
                      }

                      long beginLockTimestamp = this.systemClock.now();
                      //檢查hold住的請求
                      this.checkHoldRequest();
                      long costTime = this.systemClock.now() - beginLockTimestamp;
                      if (costTime > 5 * 1000) {
                          log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
                      }
                  } catch (Throwable e) {
                      log.warn(this.getServiceName() + " service has exception. ", e);
                  }
              }

              log.info("{} service end", this.getServiceName());
          }

          沒有什么使我停留——除了目的,縱然岸旁有玫瑰、有綠蔭、有寧靜的港灣,我是不系之舟

          推薦閱讀

          瀏覽 49
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  五月色丁香亚洲色综合 | 欧美激情综合五月色丁香 | 麻豆91久久久久久 | 变态另类成人AV一区二区 | 大香蕉在线视频看精品 |