<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          消息隊(duì)列之推還是拉,RocketMQ 和 Kafka是如何做的?

          共 4499字,需瀏覽 9分鐘

           ·

          2020-09-10 16:10

          今天我們就來談一談消息隊(duì)列的推拉模式,這也是一個(gè)面試熱點(diǎn),例如你在簡(jiǎn)歷里面寫了 RocketMQ ,基本上會(huì)問你 RocketMQ 采用的是推模式還是拉模式啊?是拉模式?不是有 PushConsumer 嗎?

          今天我們就來談?wù)勍评J剑⑶以賮砜纯?RocketMQ 和 Kafka 是如何做的。


          推拉模式

          首先明確一下推拉模式到底是在討論消息隊(duì)列的哪一個(gè)步驟,一般而言我們?cè)谡務(wù)?span style="font-family: Optima-Regular, PingFangTC-light;font-size: 15px;font-weight: 700;color: rgb(60, 112, 198);">推拉模式的時(shí)候指的是 Comsumer 和 Broker 之間的交互

          默認(rèn)的認(rèn)為 Producer 與 Broker 之間就是推的方式,即 Producer 將消息推送給 Broker,而不是 Broker 主動(dòng)去拉取消息。

          想象一下,如果需要 Broker 去拉取消息,那么 Producer 就必須在本地通過日志的形式保存消息來等待 Broker 的拉取,如果有很多生產(chǎn)者的話,那么消息的可靠性不僅僅靠 Broker 自身,還需要靠成百上千的 Producer。

          Broker 還能靠多副本等機(jī)制來保證消息的存儲(chǔ)可靠,而成百上千的 Producer 可靠性就有點(diǎn)難辦了,所以默認(rèn)的 Producer 都是推消息給 Broker。

          所以說有些情況分布式好,而有些時(shí)候還是集中管理好。


          推模式

          推模式指的是消息從 Broker 推向 Consumer,即 Consumer 被動(dòng)的接收消息,由 Broker 來主導(dǎo)消息的發(fā)送。

          我們來想一下推模式有什么好處?

          消息實(shí)時(shí)性高, Broker 接受完消息之后可以立馬推送給 Consumer。

          對(duì)于消費(fèi)者使用來說更簡(jiǎn)單,簡(jiǎn)單啊就等著,反正有消息來了就會(huì)推過來。

          推模式有什么缺點(diǎn)?

          推送速率難以適應(yīng)消費(fèi)速率,推模式的目標(biāo)就是以最快的速度推送消息,當(dāng)生產(chǎn)者往 Broker 發(fā)送消息的速率大于消費(fèi)者消費(fèi)消息的速率時(shí),隨著時(shí)間的增長消費(fèi)者那邊可能就“爆倉”了,因?yàn)楦鞠M(fèi)不過來啊。當(dāng)推送速率過快就像 DDos 攻擊一樣消費(fèi)者就傻了。

          并且不同的消費(fèi)者的消費(fèi)速率還不一樣,身為 Broker 很難平衡每個(gè)消費(fèi)者的推送速率,如果要實(shí)現(xiàn)自適應(yīng)的推送速率那就需要在推送的時(shí)候消費(fèi)者告訴 Broker ,我不行了你推慢點(diǎn)吧,然后 Broker 需要維護(hù)每個(gè)消費(fèi)者的狀態(tài)進(jìn)行推送速率的變更。

          這其實(shí)就增加了 Broker 自身的復(fù)雜度。

          所以說推模式難以根據(jù)消費(fèi)者的狀態(tài)控制推送速率,適用于消息量不大、消費(fèi)能力強(qiáng)要求實(shí)時(shí)性高的情況下。


          拉模式

          拉模式指的是 Consumer 主動(dòng)向 Broker 請(qǐng)求拉取消息,即 Broker 被動(dòng)的發(fā)送消息給 Consumer。

          我們來想一下拉模式有什么好處?

          拉模式主動(dòng)權(quán)就在消費(fèi)者身上了,消費(fèi)者可以根據(jù)自身的情況來發(fā)起拉取消息的請(qǐng)求。假設(shè)當(dāng)前消費(fèi)者覺得自己消費(fèi)不過來了,它可以根據(jù)一定的策略停止拉取,或者間隔拉取都行。

          拉模式下 Broker 就相對(duì)輕松了,它只管存生產(chǎn)者發(fā)來的消息,至于消費(fèi)的時(shí)候自然由消費(fèi)者主動(dòng)發(fā)起,來一個(gè)請(qǐng)求就給它消息唄,從哪開始拿消息,拿多少消費(fèi)者都告訴它,它就是一個(gè)沒有感情的工具人,消費(fèi)者要是沒來取也不關(guān)它的事。

          拉模式可以更合適的進(jìn)行消息的批量發(fā)送,基于推模式可以來一個(gè)消息就推送,也可以緩存一些消息之后再推送,但是推送的時(shí)候其實(shí)不知道消費(fèi)者到底能不能一次性處理這么多消息。而拉模式就更加合理,它可以參考消費(fèi)者請(qǐng)求的信息來決定緩存多少消息之后批量發(fā)送。

          拉模式有什么缺點(diǎn)?

          消息延遲,畢竟是消費(fèi)者去拉取消息,但是消費(fèi)者怎么知道消息到了呢?所以它只能不斷地拉取,但是又不能很頻繁地請(qǐng)求,太頻繁了就變成消費(fèi)者在攻擊 Broker 了。因此需要降低請(qǐng)求的頻率,比如隔個(gè) 2 秒請(qǐng)求一次,你看著消息就很有可能延遲 2 秒了。

          消息忙請(qǐng)求,忙請(qǐng)求就是比如消息隔了幾個(gè)小時(shí)才有,那么在幾個(gè)小時(shí)之內(nèi)消費(fèi)者的請(qǐng)求都是無效的,在做無用功。


          那到底是推還是拉

          可以看到推模式和拉模式各有優(yōu)缺點(diǎn),到底該如何選擇呢?

          RocketMQ 和 Kafka 都選擇了拉模式,當(dāng)然業(yè)界也有基于推模式的消息隊(duì)列如 ActiveMQ。

          我個(gè)人覺得拉模式更加的合適,因?yàn)楝F(xiàn)在的消息隊(duì)列都有持久化消息的需求,也就是說本身它就有個(gè)存儲(chǔ)功能,它的使命就是接受消息,保存好消息使得消費(fèi)者可以消費(fèi)消息即可。

          而消費(fèi)者各種各樣,身為 Broker 不應(yīng)該有依賴于消費(fèi)者的傾向,我已經(jīng)為你保存好消息了,你要就來拿好了。

          雖說一般而言 Broker 不會(huì)成為瓶頸,因?yàn)橄M(fèi)端有業(yè)務(wù)消耗比較慢,但是 Broker 畢竟是一個(gè)中心點(diǎn),能輕量就盡量輕量。

          那么竟然 RocketMQ 和 Kafka 都選擇了拉模式,它們就不怕拉模式的缺點(diǎn)么?怕,所以它們操作了一波,減輕了拉模式的缺點(diǎn)。


          長輪詢

          RocketMQ 和 Kafka 都是利用“長輪詢”來實(shí)現(xiàn)拉模式,我們就來看看它們是如何操作的。

          為了簡(jiǎn)單化,下面我把消息不滿足本次拉取的條數(shù)啊、總大小啊等等都統(tǒng)一描述成還沒有消息,反正都是不滿足條件。


          RocketMQ 中的長輪詢

          RocketMQ 中的 PushConsumer 其實(shí)是披著拉模式的方法,只是看起來像推模式而已

          因?yàn)?RocketMQ 在被背后偷偷的幫我們?nèi)?Broker 請(qǐng)求數(shù)據(jù)了。

          后臺(tái)會(huì)有個(gè) RebalanceService 線程,這個(gè)線程會(huì)根據(jù) topic 的隊(duì)列數(shù)量和當(dāng)前消費(fèi)組的消費(fèi)者個(gè)數(shù)做負(fù)載均衡,每個(gè)隊(duì)列產(chǎn)生的 pullRequest 放入阻塞隊(duì)列 pullRequestQueue 中。然后又有個(gè) PullMessageService 線程不斷的從阻塞隊(duì)列 pullRequestQueue 中獲取 pullRequest,然后通過網(wǎng)絡(luò)請(qǐng)求 broker,這樣實(shí)現(xiàn)的準(zhǔn)實(shí)時(shí)拉取消息。

          這一部分代碼我不截了,就是這么個(gè)事兒,稍后會(huì)用圖來展示。

          然后 Broker 的 PullMessageProcessor 里面的 processRequest 方法是用來處理拉消息請(qǐng)求的,有消息就直接返回,如果沒有消息怎么辦呢?我們來看一下代碼。

          我們?cè)賮砜聪?suspendPullRequest 方法做了什么。

          而 PullRequestHoldService 這個(gè)線程會(huì)每 5 秒從 pullRequestTable 取PullRequest請(qǐng)求,然后看看待拉取消息請(qǐng)求的偏移量是否小于當(dāng)前消費(fèi)隊(duì)列最大偏移量,如果條件成立則說明有新消息了,則會(huì)調(diào)用 notifyMessageArriving ,最終調(diào)用 PullMessageProcessor 的 executeRequestWhenWakeup() 方法重新嘗試處理這個(gè)消息的請(qǐng)求,也就是再來一次,整個(gè)長輪詢的時(shí)間默認(rèn) 30 秒。

          簡(jiǎn)單的說就是 5 秒會(huì)檢查一次消息時(shí)候到了,如果到了則調(diào)用 processRequest 再處理一次。這好像不太實(shí)時(shí)啊?5秒?

          別急,還有個(gè) ReputMessageService 線程,這個(gè)線程用來不斷地從 commitLog 中解析數(shù)據(jù)并分發(fā)請(qǐng)求,構(gòu)建出 ConsumeQueue 和 IndexFile 兩種類型的數(shù)據(jù),并且也會(huì)有喚醒請(qǐng)求的操作,來彌補(bǔ)每 5s 一次這么慢的延遲

          代碼我就不截了,就是消息寫入并且會(huì)調(diào)用 pullRequestHoldService#notifyMessageArriving。

          最后我再來畫個(gè)圖,描述一下整個(gè)流程。



          Kafka 中的長輪詢

          像 Kafka 在拉請(qǐng)求中有參數(shù),可以使得消費(fèi)者請(qǐng)求在 “長輪詢” 中阻塞等待。

          簡(jiǎn)單的說就是消費(fèi)者去 Broker 拉消息,定義了一個(gè)超時(shí)時(shí)間,也就是說消費(fèi)者去請(qǐng)求消息,如果有的話馬上返回消息,如果沒有的話消費(fèi)者等著直到超時(shí),然后再次發(fā)起拉消息請(qǐng)求。

          并且 Broker 也得配合,如果消費(fèi)者請(qǐng)求過來,有消息肯定馬上返回,沒有消息那就建立一個(gè)延遲操作,等條件滿足了再返回。

          我們來簡(jiǎn)單的看一下源碼,為了突出重點(diǎn),我會(huì)刪減一些代碼。

          先來看消費(fèi)者端的代碼。

          上面那個(gè) poll 接口想必大家都很熟悉,其實(shí)從注解直接就知道了確實(shí)是等待數(shù)據(jù)的到來或者超時(shí),我們?cè)俸?jiǎn)單的往下看。

          我們?cè)賮砜聪伦罱K client.poll 調(diào)用的是什么。

          最后調(diào)用的就是 Kafka 包裝過的 selector,而最終會(huì)調(diào)用 Java nio 的 select(timeout)

          現(xiàn)在消費(fèi)者端的代碼已經(jīng)清晰了,我們?cè)賮砜纯?Broker 如何做的

          Broker 處理所有請(qǐng)求的入口其實(shí)我在之前的文章介紹過,就在 KafkaApis.scala 文件的 handle 方法下,這次的主角就是 handleFetchRequest 。

          這個(gè)方法進(jìn)來,我截取最重要的部分。

          下面的圖片就是 fetchMessages 方法內(nèi)部實(shí)現(xiàn),源碼給的注釋已經(jīng)很清晰了,大家放大圖片看下即可。

          這個(gè)煉獄名字取得很有趣,簡(jiǎn)單的說就是利用我之前文章提到的時(shí)間輪,來執(zhí)行定時(shí)任務(wù),例如這里是delayedFetchPurgatory,專門用來處理延遲拉取操作。

          我們先簡(jiǎn)單想一下,這個(gè)延遲操作都需要實(shí)現(xiàn)哪些方法,首先構(gòu)建的延遲操作需要有檢查機(jī)制,來查看消息是否已經(jīng)到了,然后呢還得有個(gè)消息到了之后該執(zhí)行的方法,還需要有執(zhí)行完畢之后該干啥的方法,當(dāng)然還得有個(gè)超時(shí)之后得干啥的方法。

          這幾個(gè)方法其實(shí)對(duì)應(yīng)的就是代碼里的 DelayedFetch ,這個(gè)類繼承了 DelayedOperation 內(nèi)部有:

          • isCompleted 檢查條件是否滿足的方法

          • tryComplete 條件滿足之后執(zhí)行的方法

          • onComplete 執(zhí)行完畢之后調(diào)用的方法

          • onExpiration 過期之后需要執(zhí)行的方法

          判斷是否過期就是由時(shí)間輪來推動(dòng)判斷的,但是總不能等過期的時(shí)候再去看消息到了沒吧?

          這里 Kafka 和 RocketMQ 的機(jī)制一樣,也會(huì)在消息寫入的時(shí)候提醒這些延遲請(qǐng)求消息來了,具體代碼我不貼了, 在 ReplicaManager#appendRecords 方法內(nèi)部再深入個(gè)兩方法可以看到。

          不過雖說代碼不貼,圖還是要畫一下的。



          小結(jié)一下

          可以看到 RocketMQ ?和 Kafka 都是采用“長輪詢”的機(jī)制,具體的做法都是通過消費(fèi)者等待消息,當(dāng)有消息的時(shí)候 Broker 會(huì)直接返回消息,如果沒有消息都會(huì)采取延遲處理的策略,并且為了保證消息的及時(shí)性,在對(duì)應(yīng)隊(duì)列或者分區(qū)有新消息到來的時(shí)候都會(huì)提醒消息來了,及時(shí)返回消息。

          一句話說就是消費(fèi)者和 Broker 相互配合,拉取消息請(qǐng)求不滿足條件的時(shí)候 hold 住,避免了多次頻繁的拉取動(dòng)作,當(dāng)消息一到就提醒返回。

          ?

          最后

          總的而言推拉模式各有優(yōu)劣,而我個(gè)人覺得一般情況下拉模式更適合于消息隊(duì)列。


          有道無術(shù),術(shù)可成;有術(shù)無道,止于術(shù)

          歡迎大家關(guān)注Java之道公眾號(hào)


          好文章,我在看??

          瀏覽 39
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  奇米狠狠啦av | 中文字幕第45页 | 日韩操逼内射 | 偷拍av| 影音先锋AV啪啪资源 |