<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          RocketMQ 消息丟失場景及解決辦法

          共 3073字,需瀏覽 7分鐘

           ·

          2020-09-02 02:15


          點擊上方藍色“程序猿DD”,選擇“設為星標”

          回復“資源”獲取獨家整理的學習資料!

          來源 |?blog.csdn.net/LO_YUN/article/details/103949317

          既然使用在項目中使用了MQ,那么就不可避免的需要考慮消息丟失問題。在一些涉及到了金錢交易的場景下,消息丟失還是很致命的。那么在RocketMQ中存在哪幾種消息丟失的場景呢?

          先來一張最簡單的消費流程圖:

          上圖中大致包含了這么幾種場景:

          • 生產(chǎn)者產(chǎn)生消息發(fā)送給RocketMQ
          • RocketMQ接收到了消息之后,必然需要存到磁盤中,否則斷電或宕機之后會造成數(shù)據(jù)的丟失
          • 消費者從RocketMQ中獲取消息消費,消費成功之后,整個流程結(jié)束

          這三種場景都可能會產(chǎn)生消息的丟失,如下圖所示:

          1、場景1中生產(chǎn)者將消息發(fā)送給Rocket MQ的時候,如果出現(xiàn)了網(wǎng)絡抖動或者通信異常等問題,消息就有可能會丟失

          2、場景2中消息需要持久化到磁盤中,這時會有兩種情況導致消息丟失

          • RocketMQ為了減少磁盤的IO,會先將消息寫入到os cache中,而不是直接寫入到磁盤中,消費者從os cache中獲取消息類似于直接從內(nèi)存中獲取消息,速度更快,過一段時間會由os線程異步的將消息刷入磁盤中,此時才算真正完成了消息的持久化。在這個過程中,如果消息還沒有完成異步刷盤,RocketMQ中的Broker宕機的話,就會導致消息丟失
          • 如果消息已經(jīng)被刷入了磁盤中,但是數(shù)據(jù)沒有做任何備份,一旦磁盤損壞,那么消息也會丟失

          3、消費者成功從RocketMQ中獲取到了消息,還沒有將消息完全消費完的時候,就通知RocketMQ我已經(jīng)將消息消費了,然后消費者宕機,但是RocketMQ認為消費者已經(jīng)成功消費了數(shù)據(jù),所以數(shù)據(jù)依舊丟失了。

          那么如何保證消息的零丟失呢?

          1、場景1中保證消息不丟失的方案是使用RocketMQ自帶的事務機制來發(fā)送消息,大致流程為

          • 首先生產(chǎn)者發(fā)送half消息到RocketMQ中,此時消費者是無法消費half消息的,若half消息就發(fā)送失敗了,則執(zhí)行相應的回滾邏輯
          • half消息發(fā)送成功之后,且RocketMQ返回成功響應,則執(zhí)行生產(chǎn)者的核心鏈路
          • 如果生產(chǎn)者自己的核心鏈路執(zhí)行失敗,則回滾,并通知RocketMQ刪除half消息
          • 如果生產(chǎn)者的核心鏈路執(zhí)行成功,則通知RocketMQ commit half消息,讓消費者可以消費這條數(shù)據(jù)

          其中還有一些RocketMQ長時間沒有收到生產(chǎn)者是要commit/rollback操作的響應,回調(diào)生產(chǎn)者接口的細節(jié),感興趣的可以參考:

          https://blog.csdn.net/LO_YUN/article/details/101673893

          在使用了RocketMQ事務將生產(chǎn)者的消息成功發(fā)送給RocketMQ,就可以保證在這個階段消息不會丟失

          2、在場景2中要保證消息不丟失,首先需要將os cache的異步刷盤策略改為同步刷盤,這一步需要修改Broker的配置文件,將flushDiskType改為SYNC_FLUSH同步刷盤策略,默認的是ASYNC_FLUSH異步刷盤。

          一旦同步刷盤返回成功,那么就一定保證消息已經(jīng)持久化到磁盤中了;為了保證磁盤損壞不會丟失數(shù)據(jù),我們需要對RocketMQ采用主從機構(gòu),集群部署,Leader中的數(shù)據(jù)在多個Follower中都存有備份,防止單點故障。

          搜索公眾號程序猿DD,回復“資源”,送你豐富的Java學習資料

          3、在場景3中,消息到達了消費者,RocketMQ在代碼中就能保證消息不會丟失

          //注冊消息監(jiān)聽器處理消息
          consumer.registerMessageListener(new?MessageListenerConcurrently()?{
          ???@Override
          ????public?ConsumeConcurrentlyStatus?consumeMessage(List?msgs,?ConsumeConcurrentlyContext?context){??????????????????????????????????
          ????????//對消息進行處理
          ????????return?ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
          ????}
          });

          上面這段代碼中,RocketMQ在消費者中注冊了一個監(jiān)聽器,當消費者獲取到了消息,就會去回調(diào)這個監(jiān)聽器函數(shù),去處理里面的消息

          當你的消息處理完畢之后,才會返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS 只有返回了CONSUME_SUCCESS,消費者才會告訴RocketMQ我已經(jīng)消費完了,此時如果消費者宕機,消息已經(jīng)處理完了,也就不會丟失消息了

          如果消費者還沒有返回CONSUME_SUCCESS時就宕機了,那么RocketMQ就會認為你這個消費者節(jié)點掛掉了,會自動故障轉(zhuǎn)移,將消息交給消費者組的其他消費者去消費這個消息,保證消息不會丟失

          為了保證消息不會丟失,在consumeMessage方法中就直接寫消息消費的業(yè)務邏輯就可以了,如果非要搞一些騷操作,比如下面的代碼

          //注冊消息監(jiān)聽器處理消息
          consumer.registerMessageListener(new?MessageListenerConcurrently()?{
          ???@Override
          ????public?ConsumeConcurrentlyStatus?consumeMessage(List?msgs,?ConsumeConcurrentlyContext?context){?
          ?????//開啟子線程異步處理消息
          ?????new?Thread()?{
          ???public?void?run()?{
          ????//對消息進行處理
          ???}
          ??}.start();?????????????????????????????????
          ????????return?ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
          ????}
          });

          如果新開子線程異步處理消息的話,就有可能出現(xiàn)消息還沒有被消費完,消費者告訴RocketMQ消息已經(jīng)被消費了,結(jié)果宕機丟失消息的情況。

          使用上面一整套的方案就可以在使用RocketMQ時保證消息零丟失,但是性能和吞吐量也將大幅下降

          • 使用事務機制傳輸消息,會比普通的消息傳輸多出很多步驟,耗費性能
          • 同步刷盤相比異步刷盤,一個是存儲在磁盤中,一個存儲在內(nèi)存中,速度完全不是一個數(shù)量級
          • 主從機構(gòu)的話,需要Leader將數(shù)據(jù)同步給Follower
          • 消費時無法異步消費,只能等待消費完成再通知RocketMQ消費完成

          消息零丟失是一把雙刃劍,要想用好,還是要視具體的業(yè)務場景而定,選擇合適的方案才是最好的


          往期推薦

          IntelliJ IDEA 2020.2.1 發(fā)布,Lombok插件可能被官方支持

          用戶密碼加密存儲十問十答,一文說透密碼安全存儲

          美國如果把根域名服務器封了,中國會從網(wǎng)絡上消失?

          用樹莓派打造世界上最小的“iMac”

          fastjson 的作者,在阿里內(nèi)網(wǎng)挨罵了?!


          星球限時拼團優(yōu)惠進行中


          我的星球是否適合你?

          點擊閱讀原文看看我們都聊過啥?

          瀏覽 34
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  乱子伦国产精品www | 亚洲欧美高清在线 | h片网站在线观看 | 2016亚洲天堂 | 逼特逼 |