<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          遇到了消息堆積,但是問題不大

          共 3233字,需瀏覽 7分鐘

           ·

          2021-10-28 23:58

          b05a85ff348fa27bd770116f01365f21.webp


          大家好,我是captain,如果覺得下面的文章對你有所幫助,歡迎大家關(guān)注


          上一篇說過了如何保證消息不丟失,分別從producer、broker和consumer三個(gè)角色來保證消息的不丟失,其實(shí)說到底也不可能百分百保證消息不丟失,這種當(dāng)然是極端的情況下


          在這之前還有很多篇,傳送門在這里



          這一篇我們要說的話題是消息的堆積處理,其實(shí)這個(gè)話題還是挺大的,因?yàn)橄⒍逊e還是真的很令人頭疼的,當(dāng)堆積的量很大的時(shí)候,這真的是個(gè)很暴躁的問題,不過這時(shí)候真考驗(yàn)大家冷靜的處理問題的能力了


          ? ? ? ? ? ? ? ? ? ? ? ? ? ??6a226acca2f4ad3ae2bcc9efaa527a54.webp


          我們一起來分析分析有關(guān)問題吧


          大量的消息堆積在MQ中幾個(gè)小時(shí)還沒解決怎么辦呢


          一般這種比較著急的問題,最好的辦法就是臨時(shí)擴(kuò)容,用更快的速度來消費(fèi)數(shù)據(jù)


          ? ? ?1、臨時(shí)建立一個(gè)新的Topic,然后調(diào)整queue的數(shù)量為原來的10倍或者20倍,根據(jù)堆積情況來決定


          ? ? ?2、然后寫一個(gè)臨時(shí)分發(fā)消息的consumer程序,這個(gè)程序部署上去消費(fèi)積壓的消息,消費(fèi)的就是剛剛新建的Topic,消費(fèi)之后不做耗時(shí)的處理,只需要直接均勻的輪詢將這些消息輪詢的寫入到臨時(shí)創(chuàng)建的queue里面即可


          ? ? ?3、然后增加相應(yīng)倍數(shù)的機(jī)器來部署真正的consumer消費(fèi),注意這里的Topic,然后讓這些consumer去真正的消費(fèi)這些臨時(shí)的queue里面的消息


          不知道大家明白沒有,很簡單的道理,我給大家舉個(gè)形象的例子


          一個(gè)topic堵住了,新建一個(gè)topic去進(jìn)行分流,臨時(shí)將queue資源和consumer資源擴(kuò)大10倍,將消息平均分配到這些新增的queue資源和consumer資源上,以正常10倍的速度來消費(fèi)消息,等到這些堆積的消息消費(fèi)完了,便可以恢復(fù)到原來的部署架構(gòu)


          這種只是用于臨時(shí)解決一些異常情況導(dǎo)致的消息堆積的處理,如果消息經(jīng)常出現(xiàn)堵塞的情況,那該考慮一下徹底增強(qiáng)系統(tǒng)的部署架構(gòu)了


          消息設(shè)置了過期時(shí)間,過期就丟了怎么辦呢


          在rabbitmq中,可以設(shè)置過期時(shí)間TTL,和Redis的過期時(shí)間一樣,如果消息在queue中積壓超過一定時(shí)間就會被rabbitmq清理掉,這個(gè)數(shù)據(jù)就沒了


          這樣可能會造成大量的數(shù)據(jù)丟失


          這種情況下上面的解決方案就不太合適了,可以采取批量重導(dǎo)的方案來解決,在系統(tǒng)流量比較低的時(shí)候,用程序去查詢丟失的這部分?jǐn)?shù)據(jù),然后將消息重新發(fā)送到MQ中,把丟失的數(shù)據(jù)重新補(bǔ)回來


          這也算是一種補(bǔ)償任務(wù)吧,補(bǔ)償任務(wù)一般是用于對定時(shí)跑批的一種補(bǔ)償


          分析下RocketMQ中的消息堆積原因


          消息的堆積歸根到底就是生產(chǎn)者生產(chǎn)消息的速度和消費(fèi)者消費(fèi)的速度不匹配導(dǎo)致的,輸入的和消費(fèi)的速度不統(tǒng)一


          或許是突然搞了一波促銷,系統(tǒng)業(yè)務(wù)量暴增,導(dǎo)致生產(chǎn)者發(fā)消息暴增,消費(fèi)速度跟不上


          也有可能是消費(fèi)方出現(xiàn)失敗的情況,瘋狂重試,也或者就是消費(fèi)方的消費(fèi)能力太低了


          ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?e4baf9b6feb2bedf9cf1fb91a96124a6.webp


          RocketMQ是按照隊(duì)列進(jìn)行消息負(fù)載的,如果consumer中的一臺機(jī)器由于硬件各方面原因?qū)е略摍C(jī)器上的消息隊(duì)列不能及時(shí)處理,就會造成整個(gè)消息隊(duì)列的堆積


          RocketMQ分為發(fā)布方和訂閱方,雙方都有負(fù)載均衡策略,默認(rèn)都是采用平均分配,producer消息以輪詢方式發(fā)送到消息隊(duì)列queue中,broker將這些的queue再平均分配到屬于同一個(gè)group id的訂閱方集群


          • .如果消費(fèi)者consumer機(jī)器數(shù)量和消息隊(duì)列相等,則消息隊(duì)列平均分配到每一個(gè)consumer上

          • 如果consumer數(shù)量大于消息隊(duì)列數(shù)量,則超出消息隊(duì)列數(shù)量的機(jī)器沒有可以處理的消息隊(duì)列

          • 若消息隊(duì)列數(shù)量不是consumer的整數(shù)倍,則部分consumer會承擔(dān)跟多的消息隊(duì)列的消費(fèi)任務(wù)


          如果其中一臺機(jī)器處理變慢,可能是機(jī)器硬件、系統(tǒng)、遠(yuǎn)程 RPC 調(diào)用或 Java GC 等原因?qū)е路峙渲链藱C(jī)器上的 Queue 的消息不能及時(shí)處理


          消息隊(duì)列 RocketMQ 版的消息負(fù)載是按 Queue 為粒度維護(hù),所以,整個(gè) Queue 上的消息都會堆積


          那說一下解決思路吧


          我們知道了最根本原因是生產(chǎn)和消費(fèi)速度不匹配導(dǎo)致的,這種問題要是經(jīng)常出現(xiàn),就是系統(tǒng)架構(gòu)導(dǎo)致,這種需要考慮增加消費(fèi)方的數(shù)量了


          如果是搞促銷的這種臨時(shí)情況導(dǎo)致的,這種情況下系統(tǒng)應(yīng)該會比較快的消化掉,堆積時(shí)間不會很快,如果搞促銷時(shí)間很長,持續(xù)高流量時(shí)間很長,那沒得辦法,還是得加機(jī)器


          經(jīng)常出現(xiàn)這種消息堆積問題,需要先定位一下消費(fèi)滿的原因,也也可能是代碼bug,導(dǎo)致多次重試,如果是bug則處理bug,優(yōu)化下消費(fèi)的邏輯


          再者就要考慮水平擴(kuò)容,增加Topic的queue數(shù)量和消費(fèi)者的數(shù)量,這兩者增加的時(shí)候需要考慮兩邊的平衡,隊(duì)列數(shù)量一定要增加,不然新增加的消費(fèi)數(shù)量者會導(dǎo)致無消息消費(fèi)的尷尬場面,一個(gè)topic中的一個(gè)隊(duì)列只會分配給一個(gè)消費(fèi)者


          消費(fèi)者數(shù)量超過隊(duì)列數(shù)量的時(shí)候,超出的部分消費(fèi)者就無消息可以消費(fèi)了


          RocketMQ中消費(fèi)完的消息去了哪里呢


          消息的存儲是一直存在于CommitLog文件中的,大家都知道CommitLog是以文件為單位存在的,而且RocketMQ的設(shè)計(jì)是只允許順序?qū)懀簿鸵馕吨邢⒍际琼樞虻膶懭氲竭@個(gè)文件中的


          而每個(gè)消息的大小又不是定長的,所以這就決定了消息幾乎不可能按照消息為單位進(jìn)行刪除,邏輯極其復(fù)雜


          消息一旦被消費(fèi)了之后是不會被立即清除的,還是會存在于CommitLog文件中的,那問題來了,消息未刪除,RocketMQ是如何知道哪些消息已經(jīng)被消費(fèi)過,哪些還未消費(fèi)呢


          答案就是客戶端會維護(hù)一個(gè)消息的offset,客戶端拉取完消息之后,broker會隨著響應(yīng)體返回一個(gè)下一次拉取的位置,消費(fèi)者會更新自己的下一次的pull的位置


          CommitLog文件什么時(shí)候進(jìn)行清除


          消息存儲到該文件之后,也是會被清理的,但是這個(gè)清理只會在下面這些條件中,任一條件成立的時(shí)候才會批量的刪除CommitLog消息文件


          • 消息文件過期(默認(rèn)72小時(shí)),且到達(dá)清理時(shí)點(diǎn)(默認(rèn)是凌晨4點(diǎn)),刪除過期文件。

          • 消息文件過期(默認(rèn)72小時(shí)),且磁盤空間達(dá)到了水位線(默認(rèn)75%),刪除過期文件。

          • 磁盤已經(jīng)達(dá)到必須釋放的上限(85%水位線)的時(shí)候,則開始批量清理文件(無論是否過期),直到空間充足。



          注:若磁盤空間達(dá)到危險(xiǎn)水位線(默認(rèn)90%),出于保護(hù)自身的目的,broker會拒絕寫入服務(wù)。

          ?

          為什么這么設(shè)計(jì)呢


          CommitLog文件默認(rèn)大小是1GB,在清理的時(shí)候?qū)儆诖笪募僮髁耍琁O壓力也是有的,這樣設(shè)計(jì)該文件的優(yōu)點(diǎn)我大概說幾個(gè),當(dāng)然肯定還有些別的


          只需要保存一份消息文件:一個(gè)消息如果需要被多個(gè)消費(fèi)者組消費(fèi),消息只需要保存一份即可,消費(fèi)進(jìn)度單獨(dú)保存,這樣比較容易支撐強(qiáng)大的消息存儲能力


          支持回溯:把消息的消費(fèi)位置的決定權(quán)放在客戶端,只要消息還在,就可以消費(fèi),所以也就有了RocketMQ支持的回溯消費(fèi)


          像看視頻一樣,可以把鏡頭調(diào)到前面去,重新看一遍剛剛的視頻


          支持消息索引服務(wù):RocketMQ中有一個(gè)索引文件,消息只要還存在于CommitLog中,就可以被搜索出來,方便排查問題



          佛系求關(guān)注




          Captain希望有一天能夠靠寫作養(yǎng)活自己,現(xiàn)在還在磨練,這個(gè)時(shí)間可能會持續(xù)很久,但是,請看我漂亮的堅(jiān)持


          感謝大家能夠做我最初的讀者和傳播者,請大家相信,只要你給我一份愛,我終究會還你們一頁情的。


          Captain會持續(xù)更新技術(shù)文章,和生活中的暴躁文章,歡迎大家關(guān)注【Java賊船】,成為船長的學(xué)習(xí)小伙伴,和船長一起乘千里風(fēng)、破萬里浪


          哦對了,后續(xù)所有的遠(yuǎn)程文章都會更新到這里


          https://github.com/DayuMM2021/Java


          e4656741ba4c8d29f3c15c7bec0b4b91.webp


          瀏覽 61
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  五月四虎影院 | 欧美高清猛交xxx黑人猛交性乱 | 午夜爽爽 | 免费日皮视频在线观看 | 女公安一级毛片陆放 |