<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          RabbitMQ如何保證全鏈路數(shù)據(jù)100%不丟失?

          共 4191字,需瀏覽 9分鐘

           ·

          2021-11-14 11:50


          我們都知道,消息從生產(chǎn)端到消費(fèi)端消費(fèi)要經(jīng)過3個(gè)步驟:

          1. 生產(chǎn)端發(fā)送消息到RabbitMQ;
          2. RabbitMQ發(fā)送消息到消費(fèi)端;
          3. 消費(fèi)端消費(fèi)這條消息;

          這3個(gè)步驟中的每一步都有可能導(dǎo)致消息丟失,消息丟失不可怕,可怕的是丟失了我們還不知道,所以要有一些措施來(lái)保證系統(tǒng)的可靠性。這里的可靠并不是一定就100%不丟失了,磁盤損壞,機(jī)房爆炸等等都能導(dǎo)致數(shù)據(jù)丟失,當(dāng)然這種都是極小概率發(fā)生,能做到99.999999%消息不丟失,就是可靠的了。下面來(lái)具體分析一下問題以及解決方案。

          生產(chǎn)端可靠性投遞

          生產(chǎn)端可靠性投遞,即生產(chǎn)端要確保將消息正確投遞到RabbitMQ中。生產(chǎn)端投遞的消息丟失的原因有很多,比如消息在網(wǎng)絡(luò)傳輸?shù)倪^程中發(fā)生網(wǎng)絡(luò)故障消息丟失,或者消息投遞到RabbitMQ時(shí)RabbitMQ掛了,那消息也可能丟失,而我們根本不知道發(fā)生了什么。針對(duì)以上情況,RabbitMQ本身提供了一些機(jī)制。

          事務(wù)消息機(jī)制

          事務(wù)消息機(jī)制由于會(huì)嚴(yán)重降低性能,所以一般不采用這種方法,我就不介紹了,而采用另一種輕量級(jí)的解決方案——confirm消息確認(rèn)機(jī)制。

          confirm消息確認(rèn)機(jī)制

          什么是confirm消息確認(rèn)機(jī)制?顧名思義,就是生產(chǎn)端投遞的消息一旦投遞到RabbitMQ后,RabbitMQ就會(huì)發(fā)送一個(gè)確認(rèn)消息給生產(chǎn)端,讓生產(chǎn)端知道我已經(jīng)收到消息了,否則這條消息就可能已經(jīng)丟失了,需要生產(chǎn)端重新發(fā)送消息了。

          通過下面這句代碼來(lái)開啟確認(rèn)模式:

          channel.confirmSelect();//?開啟發(fā)送方確認(rèn)模式

          然后異步監(jiān)聽確認(rèn)和未確認(rèn)的消息:

          channel.addConfirmListener(new?ConfirmListener()?{
          ????//消息正確到達(dá)broker
          ????@Override
          ????public?void?handleAck(long?deliveryTag,?boolean?multiple)?throws?IOException?{
          ????????System.out.println("已收到消息");
          ????????//做一些其他處理
          ????}

          ????//RabbitMQ因?yàn)樽陨韮?nèi)部錯(cuò)誤導(dǎo)致消息丟失,就會(huì)發(fā)送一條nack消息
          ????@Override
          ????public?void?handleNack(long?deliveryTag,?boolean?multiple)?throws?IOException?{
          ????????System.out.println("未確認(rèn)消息,標(biāo)識(shí):"?+?deliveryTag);
          ????????//做一些其他處理,比如消息重發(fā)等
          ????}
          });

          這樣就可以讓生產(chǎn)端感知到消息是否投遞到RabbitMQ中了,當(dāng)然這樣還不夠,稍后我會(huì)說(shuō)一下極端情況。

          消息持久化

          那消息持久化呢?我們知道,RabbitMQ收到消息后將這個(gè)消息暫時(shí)存在了內(nèi)存中,那這就會(huì)有個(gè)問題,如果RabbitMQ掛了,那重啟后數(shù)據(jù)就丟失了,所以相關(guān)的數(shù)據(jù)應(yīng)該持久化到硬盤中,這樣就算RabbitMQ重啟后也可以到硬盤中取數(shù)據(jù)恢復(fù)。那如何持久化呢?

          message消息到達(dá)RabbitMQ后先是到exchange交換機(jī)中,然后路由給queue隊(duì)列,最后發(fā)送給消費(fèi)端。

          所有需要給exchange、queue和message都進(jìn)行持久化:

          exchange持久化:

          //第三個(gè)參數(shù)true表示這個(gè)exchange持久化
          channel.exchangeDeclare(EXCHANGE_NAME,?"direct",?true);

          queue持久化:

          //第二個(gè)參數(shù)true表示這個(gè)queue持久化
          channel.queueDeclare(QUEUE_NAME,?true,?false,?false,?null);

          message持久化:

          //第三個(gè)參數(shù)MessageProperties.PERSISTENT_TEXT_PLAIN表示這條消息持久化
          channel.basicPublish(EXCHANGE_NAME,?ROUTING_KEY,?MessageProperties.PERSISTENT_TEXT_PLAIN,?message.getBytes(StandardCharsets.UTF_8));

          這樣,如果RabbitMQ收到消息后掛了,重啟后會(huì)自行恢復(fù)消息。

          到此,RabbitMQ提供的幾種機(jī)制都介紹完了,但這樣還不足以保證消息可靠性投遞RabbitMQ中,上面我也提到了會(huì)有極端情況,比如RabbitMQ收到消息還沒來(lái)得及將消息持久化到硬盤時(shí),RabbitMQ掛了,這樣消息還是丟失了,或者RabbitMQ在發(fā)送確認(rèn)消息給生產(chǎn)端的過程中,由于網(wǎng)絡(luò)故障而導(dǎo)致生產(chǎn)端沒有收到確認(rèn)消息,這樣生產(chǎn)端就不知道RabbitMQ到底有沒有收到消息,就不好做接下來(lái)的處理。

          所以除了RabbitMQ提供的一些機(jī)制外,我們自己也要做一些消息補(bǔ)償機(jī)制,以應(yīng)對(duì)一些極端情況。接下來(lái)我就介紹其中的一種解決方案——消息入庫(kù)。

          消息入庫(kù)

          消息入庫(kù),顧名思義就是將要發(fā)送的消息保存到數(shù)據(jù)庫(kù)中。

          首先發(fā)送消息前先將消息保存到數(shù)據(jù)庫(kù)中,有一個(gè)狀態(tài)字段status=0,表示生產(chǎn)端將消息發(fā)送給了RabbitMQ但還沒收到確認(rèn);在生產(chǎn)端收到確認(rèn)后將status設(shè)為1,表示RabbitMQ已收到消息。這里有可能會(huì)出現(xiàn)上面說(shuō)的兩種情況,所以生產(chǎn)端這邊開一個(gè)定時(shí)器,定時(shí)檢索消息表,將status=0并且超過固定時(shí)間后(可能消息剛發(fā)出去還沒來(lái)得及確認(rèn)這邊定時(shí)器剛好檢索到這條status=0的消息,所以給個(gè)時(shí)間)還沒收到確認(rèn)的消息取出重發(fā)(第二種情況下這里會(huì)造成消息重復(fù),消費(fèi)者端要做冪等性),可能重發(fā)還會(huì)失敗,所以可以做一個(gè)最大重發(fā)次數(shù),超過就做另外的處理。

          這樣消息就可以可靠性投遞到RabbitMQ中了,而生產(chǎn)端也可以感知到了。

          消費(fèi)端消息不丟失

          既然已經(jīng)可以讓生產(chǎn)端100%可靠性投遞到RabbitMQ了,那接下來(lái)就改看看消費(fèi)端的了,如何讓消費(fèi)端不丟失消息。

          默認(rèn)情況下,以下3種情況會(huì)導(dǎo)致消息丟失:

          • 在RabbitMQ將消息發(fā)出后,消費(fèi)端還沒接收到消息之前,發(fā)生網(wǎng)絡(luò)故障,消費(fèi)端與RabbitMQ斷開連接,此時(shí)消息會(huì)丟失;
          • 在RabbitMQ將消息發(fā)出后,消費(fèi)端還沒接收到消息之前,消費(fèi)端掛了,此時(shí)消息會(huì)丟失;
          • 消費(fèi)端正確接收到消息,但在處理消息的過程中發(fā)生異常或宕機(jī)了,消息也會(huì)丟失。

          其實(shí),上述3中情況導(dǎo)致消息丟失歸根結(jié)底是因?yàn)镽abbitMQ的自動(dòng)ack機(jī)制,即默認(rèn)RabbitMQ在消息發(fā)出后就立即將這條消息刪除,而不管消費(fèi)端是否接收到,是否處理完,導(dǎo)致消費(fèi)端消息丟失時(shí)RabbitMQ自己又沒有這條消息了。

          所以就需要將自動(dòng)ack機(jī)制改為手動(dòng)ack機(jī)制。

          消費(fèi)端手動(dòng)確認(rèn)消息:

          DeliverCallback?deliverCallback?=?(consumerTag,?delivery)?->?{
          ????try?{
          ????????//接收到消息,做處理
          ????????//手動(dòng)確認(rèn)
          ????????channel.basicAck(delivery.getEnvelope().getDeliveryTag(),?false);
          ????}?catch?(Exception?e)?{
          ????????//出錯(cuò)處理,這里可以讓消息重回隊(duì)列重新發(fā)送或直接丟棄消息
          ????}
          };
          //第二個(gè)參數(shù)autoAck設(shè)為false表示關(guān)閉自動(dòng)確認(rèn)機(jī)制,需手動(dòng)確認(rèn)
          channel.basicConsume(QUEUE_NAME,?false,?deliverCallback,?consumerTag?->?{});

          這樣,當(dāng)autoAck參數(shù)置為false,對(duì)于RabbitMQ服務(wù)端而言,隊(duì)列中的消息分成了兩個(gè)部分:一部分是等待投遞給消費(fèi)端的消息;一部分是已經(jīng)投遞給消費(fèi)端,但是還沒有收到消費(fèi)端確認(rèn)信號(hào)的消息。如果RabbitMQ一直沒有收到消費(fèi)端的確認(rèn)信號(hào),并且消費(fèi)此消息的消費(fèi)端已經(jīng)斷開連接或宕機(jī)(RabbitMQ會(huì)自己感知到),則RabbitMQ會(huì)安排該消息重新進(jìn)入隊(duì)列(放在隊(duì)列頭部),等待投遞給下一個(gè)消費(fèi)者,當(dāng)然也有能還是原來(lái)的那個(gè)消費(fèi)端,當(dāng)然消費(fèi)端也需要確保冪等性。

          好了,到此從生產(chǎn)端到RabbitMQ再到消費(fèi)端的全鏈路,就可以保證數(shù)據(jù)的不丟失。

          來(lái)源:https://blog.csdn.net/hsz2568952354/article/details/86559470



          往期推薦



          學(xué)會(huì) IDEA 的這個(gè)功能,輕松閱讀源碼!

          IDEA 進(jìn)行遠(yuǎn)程 Debug,太強(qiáng)了...

          mybatis-plus團(tuán)隊(duì)新作:mybatis-mate 輕松搞定數(shù)據(jù)權(quán)限

          Redis 如何高效實(shí)現(xiàn)點(diǎn)贊、取消點(diǎn)贊功能

          MongoDB和MySQL效率性能對(duì)比

          扔掉 Postman ,來(lái)試試神器ApiPost!


          瀏覽 61
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日韩一级视频网站 | 亚洲中文字幕在线观看 | 国产精品剧情亚洲二区 | 婷婷丁香五月社区亚洲 | 亚洲黄色无码 |