<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          面試官:引入RabbitMQ后,你如何保證全鏈路數(shù)據(jù)100%不丟失?

          共 3769字,需瀏覽 8分鐘

           ·

          2022-05-23 00:13


          來源:blog.csdn.net/hsz2568952354/

          article/details/86559470

          • 生產(chǎn)端可靠性投遞
          • 事務(wù)消息機制
          • confirm消息確認機制
          • 消息持久化
          • 消息入庫
          • 消費端消息不丟失

          我們都知道,消息從生產(chǎn)端到消費端消費要經(jīng)過3個步驟:

          1. 生產(chǎn)端發(fā)送消息到RabbitMQ;
          2. RabbitMQ發(fā)送消息到消費端;
          3. 消費端消費這條消息;

          這3個步驟中的每一步都有可能導(dǎo)致消息丟失,消息丟失不可怕,可怕的是丟失了我們還不知道,所以要有一些措施來保證系統(tǒng)的可靠性。這里的可靠并不是一定就100%不丟失了,磁盤損壞,機房爆炸等等都能導(dǎo)致數(shù)據(jù)丟失,當(dāng)然這種都是極小概率發(fā)生,能做到99.999999%消息不丟失,就是可靠的了。下面來具體分析一下問題以及解決方案。

          生產(chǎn)端可靠性投遞

          生產(chǎn)端可靠性投遞,即生產(chǎn)端要確保將消息正確投遞到RabbitMQ中。生產(chǎn)端投遞的消息丟失的原因有很多,比如消息在網(wǎng)絡(luò)傳輸?shù)倪^程中發(fā)生網(wǎng)絡(luò)故障消息丟失,或者消息投遞到RabbitMQ時RabbitMQ掛了,那消息也可能丟失,而我們根本不知道發(fā)生了什么。針對以上情況,RabbitMQ本身提供了一些機制。

          事務(wù)消息機制

          事務(wù)消息機制由于會嚴重降低性能,所以一般不采用這種方法,我就不介紹了,而采用另一種輕量級的解決方案——confirm消息確認機制。

          confirm消息確認機制

          什么是confirm消息確認機制?顧名思義,就是生產(chǎn)端投遞的消息一旦投遞到RabbitMQ后,RabbitMQ就會發(fā)送一個確認消息給生產(chǎn)端,讓生產(chǎn)端知道我已經(jīng)收到消息了,否則這條消息就可能已經(jīng)丟失了,需要生產(chǎn)端重新發(fā)送消息了。

          通過下面這句代碼來開啟確認模式:

          channel.confirmSelect();//?開啟發(fā)送方確認模式

          然后異步監(jiān)聽確認和未確認的消息:

          channel.addConfirmListener(new?ConfirmListener()?{
          ????//消息正確到達broker
          ????@Override
          ????public?void?handleAck(long?deliveryTag,?boolean?multiple)?throws?IOException?{
          ????????System.out.println("已收到消息");
          ????????//做一些其他處理
          ????}

          ????//RabbitMQ因為自身內(nèi)部錯誤導(dǎo)致消息丟失,就會發(fā)送一條nack消息
          ????@Override
          ????public?void?handleNack(long?deliveryTag,?boolean?multiple)?throws?IOException?{
          ????????System.out.println("未確認消息,標識:"?+?deliveryTag);
          ????????//做一些其他處理,比如消息重發(fā)等
          ????}
          });

          這樣就可以讓生產(chǎn)端感知到消息是否投遞到RabbitMQ中了,當(dāng)然這樣還不夠,稍后我會說一下極端情況。

          消息持久化

          那消息持久化呢?我們知道,RabbitMQ收到消息后將這個消息暫時存在了內(nèi)存中,那這就會有個問題,如果RabbitMQ掛了,那重啟后數(shù)據(jù)就丟失了,所以相關(guān)的數(shù)據(jù)應(yīng)該持久化到硬盤中,這樣就算RabbitMQ重啟后也可以到硬盤中取數(shù)據(jù)恢復(fù)。那如何持久化呢?

          message消息到達RabbitMQ后先是到exchange交換機中,然后路由給queue隊列,最后發(fā)送給消費端。

          所有需要給exchange、queue和message都進行持久化:

          exchange持久化:

          //第三個參數(shù)true表示這個exchange持久化
          channel.exchangeDeclare(EXCHANGE_NAME,?"direct",?true);

          queue持久化:

          //第二個參數(shù)true表示這個queue持久化
          channel.queueDeclare(QUEUE_NAME,?true,?false,?false,?null);

          message持久化:

          //第三個參數(shù)MessageProperties.PERSISTENT_TEXT_PLAIN表示這條消息持久化
          channel.basicPublish(EXCHANGE_NAME,?ROUTING_KEY,?MessageProperties.PERSISTENT_TEXT_PLAIN,?message.getBytes(StandardCharsets.UTF_8));

          這樣,如果RabbitMQ收到消息后掛了,重啟后會自行恢復(fù)消息。

          到此,RabbitMQ提供的幾種機制都介紹完了,但這樣還不足以保證消息可靠性投遞RabbitMQ中,上面我也提到了會有極端情況,比如RabbitMQ收到消息還沒來得及將消息持久化到硬盤時,RabbitMQ掛了,這樣消息還是丟失了,或者RabbitMQ在發(fā)送確認消息給生產(chǎn)端的過程中,由于網(wǎng)絡(luò)故障而導(dǎo)致生產(chǎn)端沒有收到確認消息,這樣生產(chǎn)端就不知道RabbitMQ到底有沒有收到消息,就不好做接下來的處理。

          所以除了RabbitMQ提供的一些機制外,我們自己也要做一些消息補償機制,以應(yīng)對一些極端情況。接下來我就介紹其中的一種解決方案——消息入庫。

          消息入庫

          消息入庫,顧名思義就是將要發(fā)送的消息保存到數(shù)據(jù)庫中。

          首先發(fā)送消息前先將消息保存到數(shù)據(jù)庫中,有一個狀態(tài)字段status=0,表示生產(chǎn)端將消息發(fā)送給了RabbitMQ但還沒收到確認;在生產(chǎn)端收到確認后將status設(shè)為1,表示RabbitMQ已收到消息。這里有可能會出現(xiàn)上面說的兩種情況,所以生產(chǎn)端這邊開一個定時器,定時檢索消息表,將status=0并且超過固定時間后(可能消息剛發(fā)出去還沒來得及確認這邊定時器剛好檢索到這條status=0的消息,所以給個時間)還沒收到確認的消息取出重發(fā)(第二種情況下這里會造成消息重復(fù),消費者端要做冪等性),可能重發(fā)還會失敗,所以可以做一個最大重發(fā)次數(shù),超過就做另外的處理。

          這樣消息就可以可靠性投遞到RabbitMQ中了,而生產(chǎn)端也可以感知到了。

          消費端消息不丟失

          既然已經(jīng)可以讓生產(chǎn)端100%可靠性投遞到RabbitMQ了,那接下來就改看看消費端的了,如何讓消費端不丟失消息。

          默認情況下,以下3種情況會導(dǎo)致消息丟失:

          • 在RabbitMQ將消息發(fā)出后,消費端還沒接收到消息之前,發(fā)生網(wǎng)絡(luò)故障,消費端與RabbitMQ斷開連接,此時消息會丟失;
          • 在RabbitMQ將消息發(fā)出后,消費端還沒接收到消息之前,消費端掛了,此時消息會丟失;
          • 消費端正確接收到消息,但在處理消息的過程中發(fā)生異常或宕機了,消息也會丟失。

          其實,上述3中情況導(dǎo)致消息丟失歸根結(jié)底是因為RabbitMQ的自動ack機制,即默認RabbitMQ在消息發(fā)出后就立即將這條消息刪除,而不管消費端是否接收到,是否處理完,導(dǎo)致消費端消息丟失時RabbitMQ自己又沒有這條消息了。

          所以就需要將自動ack機制改為手動ack機制。

          消費端手動確認消息:

          DeliverCallback?deliverCallback?=?(consumerTag,?delivery)?->?{
          ????try?{
          ????????//接收到消息,做處理
          ????????//手動確認
          ????????channel.basicAck(delivery.getEnvelope().getDeliveryTag(),?false);
          ????}?catch?(Exception?e)?{
          ????????//出錯處理,這里可以讓消息重回隊列重新發(fā)送或直接丟棄消息
          ????}
          };
          //第二個參數(shù)autoAck設(shè)為false表示關(guān)閉自動確認機制,需手動確認
          channel.basicConsume(QUEUE_NAME,?false,?deliverCallback,?consumerTag?->?{});

          這樣,當(dāng)autoAck參數(shù)置為false,對于RabbitMQ服務(wù)端而言,隊列中的消息分成了兩個部分:一部分是等待投遞給消費端的消息;一部分是已經(jīng)投遞給消費端,但是還沒有收到消費端確認信號的消息。如果RabbitMQ一直沒有收到消費端的確認信號,并且消費此消息的消費端已經(jīng)斷開連接或宕機(RabbitMQ會自己感知到),則RabbitMQ會安排該消息重新進入隊列(放在隊列頭部),等待投遞給下一個消費者,當(dāng)然也有能還是原來的那個消費端,當(dāng)然消費端也需要確保冪等性。

          好了,到此從生產(chǎn)端到RabbitMQ再到消費端的全鏈路,就可以保證數(shù)據(jù)的不丟失。

          由于個人水平有限,有些地方可能理解錯了或理解不到位的,請大家多多指出!Thanks

          程序汪資料鏈接

          程序汪接的7個私活都在這里,經(jīng)驗整理

          Java項目分享 ?最新整理全集,找項目不累啦 07版

          堪稱神級的Spring Boot手冊,從基礎(chǔ)入門到實戰(zhàn)進階

          臥槽!字節(jié)跳動《算法中文手冊》火了,完整版 PDF 開放下載!

          臥槽!阿里大佬總結(jié)的《圖解Java》火了,完整版PDF開放下載!

          字節(jié)跳動總結(jié)的設(shè)計模式 PDF 火了,完整版開放下載!


          歡迎添加程序汪個人微信 itwang009? 進粉絲群或圍觀朋友圈

          瀏覽 46
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  操大骚逼视频 | 69国产精品成人无码 | 人人天天爽 | 久操香蕉| 丁香婷婷网 |