<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          MQ 消息積壓?jiǎn)栴}與解決方案

          共 3185字,需瀏覽 7分鐘

           ·

          2022-02-15 15:04

          面試題

          如何解決消息隊(duì)列的延時(shí)以及過期失效問題?消息隊(duì)列滿了以后該怎么處理?有幾百萬消息持續(xù)積壓幾小時(shí),說說怎么解決?

          面試官心理分析

          你看這問法,其實(shí)本質(zhì)針對(duì)的場(chǎng)景,都是說,可能你的消費(fèi)端出了問題,不消費(fèi)了;或者消費(fèi)的速度極其慢。接著就坑爹了,可能你的消息隊(duì)列集群的磁盤都快寫滿了,都沒人消費(fèi),這個(gè)時(shí)候怎么辦?或者是這整個(gè)就積壓了幾個(gè)小時(shí),你這個(gè)時(shí)候怎么辦?或者是你積壓的時(shí)間太長(zhǎng)了,導(dǎo)致比如 RabbitMQ 設(shè)置了消息過期時(shí)間后就沒了怎么辦?

          所以就這事兒,其實(shí)線上挺常見的,一般不出,一出就是大 case。一般常見于,舉個(gè)例子,消費(fèi)端每次消費(fèi)之后要寫 mysql,結(jié)果 mysql 掛了,消費(fèi)端 hang 那兒了,不動(dòng)了;或者是消費(fèi)端出了個(gè)什么岔子,導(dǎo)致消費(fèi)速度極其慢。

          面試題剖析

          關(guān)于這個(gè)事兒,我們一個(gè)一個(gè)來梳理吧,先假設(shè)一個(gè)場(chǎng)景,我們現(xiàn)在消費(fèi)端出故障了,然后大量消息在 mq 里積壓,現(xiàn)在出事故了,慌了。

          大量消息在 mq 里積壓了幾個(gè)小時(shí)了還沒解決

          幾千萬條數(shù)據(jù)在 MQ 里積壓了七八個(gè)小時(shí),從下午 4 點(diǎn)多,積壓到了晚上 11 點(diǎn)多。這個(gè)是我們真實(shí)遇到過的一個(gè)場(chǎng)景,確實(shí)是線上故障了,這個(gè)時(shí)候要不然就是修復(fù) consumer 的問題,讓它恢復(fù)消費(fèi)速度,然后傻傻的等待幾個(gè)小時(shí)消費(fèi)完畢。這個(gè)肯定不能在面試的時(shí)候說吧。

          一個(gè)消費(fèi)者一秒是 1000 條,一秒 3 個(gè)消費(fèi)者是 3000 條,一分鐘就是 18 萬條。所以如果你積壓了幾百萬到上千萬的數(shù)據(jù),即使消費(fèi)者恢復(fù)了,也需要大概 1 小時(shí)的時(shí)間才能恢復(fù)過來。

          一般這個(gè)時(shí)候,只能臨時(shí)緊急擴(kuò)容了,具體操作步驟和思路如下:

          ?先修復(fù) consumer 的問題,確保其恢復(fù)消費(fèi)速度,然后將現(xiàn)有 consumer 都停掉。?新建一個(gè) topic,partition 是原來的 10 倍,臨時(shí)建立好原先 10 倍的 queue 數(shù)量。?然后寫一個(gè)臨時(shí)的分發(fā)數(shù)據(jù)的 consumer 程序,這個(gè)程序部署上去消費(fèi)積壓的數(shù)據(jù),消費(fèi)之后不做耗時(shí)的處理,直接均勻輪詢寫入臨時(shí)建立好的 10 倍數(shù)量的 queue。?接著臨時(shí)征用 10 倍的機(jī)器來部署 consumer,每一批 consumer 消費(fèi)一個(gè)臨時(shí) queue 的數(shù)據(jù)。這種做法相當(dāng)于是臨時(shí)將 queue 資源和 consumer 資源擴(kuò)大 10 倍,以正常的 10 倍速度來消費(fèi)數(shù)據(jù)。?等快速消費(fèi)完積壓數(shù)據(jù)之后,得恢復(fù)原先部署的架構(gòu)重新用原先的 consumer 機(jī)器來消費(fèi)消息。

          mq 中的消息過期失效了

          假設(shè)你用的是 RabbitMQ,RabbtiMQ 是可以設(shè)置過期時(shí)間的,也就是 TTL。如果消息在 queue 中積壓超過一定的時(shí)間就會(huì)被 RabbitMQ 給清理掉,這個(gè)數(shù)據(jù)就沒了。那這就是第二個(gè)坑了。這就不是說數(shù)據(jù)會(huì)大量積壓在 mq 里,而是大量的數(shù)據(jù)會(huì)直接搞丟

          這個(gè)情況下,就不是說要增加 consumer 消費(fèi)積壓的消息,因?yàn)閷?shí)際上沒啥積壓,而是丟了大量的消息。我們可以采取一個(gè)方案,就是批量重導(dǎo),這個(gè)我們之前線上也有類似的場(chǎng)景干過。就是大量積壓的時(shí)候,我們當(dāng)時(shí)就直接丟棄數(shù)據(jù)了,然后等過了高峰期以后,比如大家一起喝咖啡熬夜到晚上 12 點(diǎn)以后,用戶都睡覺了。這個(gè)時(shí)候我們就開始寫程序,將丟失的那批數(shù)據(jù),寫個(gè)臨時(shí)程序,一點(diǎn)一點(diǎn)的查出來,然后重新灌入 mq 里面去,把白天丟的數(shù)據(jù)給他補(bǔ)回來。也只能是這樣了。

          假設(shè) 1 萬個(gè)訂單積壓在 mq 里面,沒有處理,其中 1000 個(gè)訂單都丟了,你只能手動(dòng)寫程序把那 1000 個(gè)訂單給查出來,手動(dòng)發(fā)到 mq 里去再補(bǔ)一次。

          mq 都快寫滿了

          如果消息積壓在 mq 里,你很長(zhǎng)時(shí)間都沒有處理掉,此時(shí)導(dǎo)致 mq 都快寫滿了,咋辦?這個(gè)還有別的辦法嗎?沒有,誰讓你第一個(gè)方案執(zhí)行的太慢了,你臨時(shí)寫程序,接入數(shù)據(jù)來消費(fèi),消費(fèi)一個(gè)丟棄一個(gè),都不要了,快速消費(fèi)掉所有的消息。然后走第二個(gè)方案,到了晚上再補(bǔ)數(shù)據(jù)吧。


          對(duì)于 RocketMQ,官方針對(duì)消息積壓?jiǎn)栴},提供了解決方案。

          1. 提高消費(fèi)并行度

          絕大部分消息消費(fèi)行為都屬于 IO 密集型,即可能是操作數(shù)據(jù)庫,或者調(diào)用 RPC,這類消費(fèi)行為的消費(fèi)速度在于后端數(shù)據(jù)庫或者外系統(tǒng)的吞吐量,通過增加消費(fèi)并行度,可以提高總的消費(fèi)吞吐量,但是并行度增加到一定程度,反而會(huì)下降。所以,應(yīng)用必須要設(shè)置合理的并行度。如下有幾種修改消費(fèi)并行度的方法:

          同一個(gè) ConsumerGroup 下,通過增加 Consumer 實(shí)例數(shù)量來提高并行度(需要注意的是超過訂閱隊(duì)列數(shù)的 Consumer 實(shí)例無效)。可以通過加機(jī)器,或者在已有機(jī)器啟動(dòng)多個(gè)進(jìn)程的方式。提高單個(gè) Consumer 的消費(fèi)并行線程,通過修改參數(shù) consumeThreadMin、consumeThreadMax 實(shí)現(xiàn)。

          2. 批量方式消費(fèi)

          某些業(yè)務(wù)流程如果支持批量方式消費(fèi),則可以很大程度上提高消費(fèi)吞吐量,例如訂單扣款類應(yīng)用,一次處理一個(gè)訂單耗時(shí) 1 s,一次處理 10 個(gè)訂單可能也只耗時(shí) 2 s,這樣即可大幅度提高消費(fèi)的吞吐量,通過設(shè)置 consumer 的 consumeMessageBatchMaxSize 返個(gè)參數(shù),默認(rèn)是 1,即一次只消費(fèi)一條消息,例如設(shè)置為 N,那么每次消費(fèi)的消息數(shù)小于等于 N。

          3. 跳過非重要消息

          發(fā)生消息堆積時(shí),如果消費(fèi)速度一直追不上發(fā)送速度,如果業(yè)務(wù)對(duì)數(shù)據(jù)要求不高的話,可以選擇丟棄不重要的消息。例如,當(dāng)某個(gè)隊(duì)列的消息數(shù)堆積到 100000 條以上,則嘗試丟棄部分或全部消息,這樣就可以快速追上發(fā)送消息的速度。示例代碼如下:

          public?ConsumeConcurrentlyStatus?consumeMessage(
          ????????????List?msgs,
          ????????????ConsumeConcurrentlyContext?context)?{
          ????long?offset?=?msgs.get(0).getQueueOffset();
          ????String?maxOffset?=
          ????????????msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET);
          ????long?diff?=?Long.parseLong(maxOffset)?-?offset;
          ????if?(diff?>?100000)?{
          ????????//?TODO?消息堆積情況的特殊處理
          ????????return?ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
          ????}
          ????//?TODO?正常消費(fèi)過程
          ????return?ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
          }

          4. 優(yōu)化每條消息消費(fèi)過程

          舉例如下,某條消息的消費(fèi)過程如下:

          ?根據(jù)消息從 DB 查詢【數(shù)據(jù) 1】?根據(jù)消息從 DB 查詢【數(shù)據(jù) 2】?復(fù)雜的業(yè)務(wù)計(jì)算?向 DB 插入【數(shù)據(jù) 3】?向 DB 插入【數(shù)據(jù) 4】

          這條消息的消費(fèi)過程中有 4 次與 DB 的 交互,如果按照每次 5ms 計(jì)算,那么總共耗時(shí) 20ms,假設(shè)業(yè)務(wù)計(jì)算耗時(shí) 5ms,那么總過耗時(shí) 25ms,所以如果能把 4 次 DB 交互優(yōu)化為 2 次,那么總耗時(shí)就可以優(yōu)化到 15ms,即總體性能提高了 40%。所以應(yīng)用如果對(duì)時(shí)延敏感的話,可以把 DB 部署在 SSD 硬盤,相比于 SCSI 磁盤,前者的 RT 會(huì)小很多。

          瀏覽 116
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  777777欧美 | 做爱污污短视屏在线观看 | 美女骚逼丝袜野战视频 | 韩国不卡一区二区 | 亚洲三级先锋影音 |