<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          面試官:你知道消息隊(duì)列如何保證數(shù)據(jù)不丟失嗎?

          共 6064字,需瀏覽 13分鐘

           ·

          2021-11-15 09:47

          程序員的成長之路
          互聯(lián)網(wǎng)/程序員/技術(shù)/資料共享?

          關(guān)注


          閱讀本文大概需要 8 分鐘。
          來自:blog.csdn.net/weixin_42942532/article/details/88951915

          一、生產(chǎn)者

          此時(shí)已經(jīng)可以保證消費(fèi)者出現(xiàn)宕機(jī),可以保證消息不丟失.
          Q: 當(dāng)訂單服務(wù)發(fā)送一條消息到rabbitMQ, rabbitMQ成功接收到了消息并保存在內(nèi)存中, 但是在倉儲(chǔ)服務(wù)沒有拿走此消息之前, rabbitMQ宕機(jī)了. 怎么辦?
          A:此問題需要考慮消息持久化(durable機(jī)制), 通過設(shè)置隊(duì)列的durable參數(shù)為true, 則當(dāng)rabbitMQ重啟之后, 會(huì)恢復(fù)之前的隊(duì)列. 它的工作原理是rabbitMQ會(huì)把隊(duì)列的相關(guān)信息持久化到磁盤. 代碼如下:
          /**
          ?*?queue?:?當(dāng)前操作的隊(duì)列.?設(shè)置隊(duì)列名稱即可
          ?*?durable:?當(dāng)前隊(duì)列是否開啟持久化.?如果為true.當(dāng)前mq服務(wù)重啟之后,隊(duì)列仍然存在
          ?*?exclusive:?當(dāng)前隊(duì)列是否獨(dú)占此連接
          ?*?autoDelete:?當(dāng)前隊(duì)列是否自動(dòng)刪除
          ?*?arguments:?隊(duì)列參數(shù)
          ?*/

          channel.queueDeclare(QUEUE,true,false,false,null);
          此時(shí)當(dāng)rabbitMQ重啟,則會(huì)恢復(fù)之前的存在的隊(duì)列.
          Q: 此時(shí)隊(duì)列中的消息會(huì)一并恢復(fù)么?
          A: 雖然隊(duì)列可以恢復(fù),但是按照當(dāng)前的設(shè)置,隊(duì)列中未消費(fèi)的消息是不會(huì)恢復(fù)的. 如果也要一并恢復(fù)消息,則需要設(shè)置隊(duì)列中的消息持久化. 代碼如下:
          /**
          ?*?exchange:?交換機(jī).?對(duì)于當(dāng)前操作使用默認(rèn)交換機(jī)?""
          ?*?routingKey:?路由key.?如果當(dāng)前使用默認(rèn)交換機(jī),?routingKey的值就是當(dāng)前隊(duì)列的名稱
          ?*?props:?參數(shù)
          ?*?body:?消息體
          ?*/

          String?message?=?"hello?rabbitmq";
          channel.basicPublish("",QUEUE,?MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
          通過第三個(gè)參數(shù)的設(shè)置,可以對(duì)發(fā)送出去的消息進(jìn)行持久化設(shè)置. 工作方式是將消息寫入到磁盤.當(dāng)rabbitMQ重啟,則在恢復(fù)隊(duì)列的同時(shí)也會(huì)一并恢復(fù)隊(duì)列中之前未被消費(fèi)的消息.
          此時(shí)需要注意: 對(duì)于此種方式是不能保證消息的百分百不丟失的. 因?yàn)閞abbitMQ有可能在沒有來得及寫入磁盤的時(shí)候, 服務(wù)器就宕機(jī)了. 此時(shí)消息一樣也會(huì)丟失. 如果要完全100%保證寫入RabbitMQ的數(shù)據(jù)必須落地磁盤,不會(huì)丟失,需要依靠其他的機(jī)制。

          二、消費(fèi)者

          1. 業(yè)務(wù)場(chǎng)景定義

          如上圖,正常的消息隊(duì)列的工作方式, 是可以通過rabbitMQ的workQueues或者routing或者topics進(jìn)行實(shí)現(xiàn)的. 對(duì)于消息生產(chǎn)者采用集群部署,用于進(jìn)行高可用, 消費(fèi)方部署集群保證保證高可用的同時(shí),也可以提高系統(tǒng)的TPS與QPS. 這也是最基礎(chǔ)的使用

          2. 問題場(chǎng)景描述

          2.1 消費(fèi)者服務(wù)宕機(jī)
          Q: 倉儲(chǔ)服務(wù)在接收到一條訂單消息之后, 并對(duì)此條消息沒有處理完之前,突然宕機(jī)了. 換句話說, 倉儲(chǔ)服務(wù)在接收到訂單消息之后, 倉儲(chǔ)服務(wù)調(diào)用發(fā)貨系統(tǒng)之前, 倉儲(chǔ)服務(wù)宕機(jī)了. 這個(gè)時(shí)候應(yīng)該怎么辦?
          A: rabbitMQ默認(rèn)操作是當(dāng)消費(fèi)者成功接收到消息之后,rabbitMQ則會(huì)自動(dòng)的在隊(duì)列中將此條消息刪除. 這種操作稱為自動(dòng)ACK(自動(dòng)回復(fù)). 代碼設(shè)置如下:
          /**
          ?*?queue?:?隊(duì)列名稱
          ?*?autoAck:?是否自動(dòng)應(yīng)答
          ?*callback:?消費(fèi)者
          ?*/

          channel.basicConsume(QUEUE_INFORM_EMAIL,true,consumer);
          在此段代碼中, 第二個(gè)參數(shù):autoAck.則為設(shè)置自動(dòng)應(yīng)答方式. 如果為true.則會(huì)當(dāng)消費(fèi)者接收到消息后,自動(dòng)刪除消息隊(duì)列中的這條消息. 代表這條消息已經(jīng)投遞完畢了.
          但是此時(shí),如果按照這種工作方式, 當(dāng)消費(fèi)者(倉儲(chǔ)服務(wù))接收到消息(訂單消息),消息隊(duì)列自動(dòng)把這條消息(訂單消息)刪除了, 但是倉儲(chǔ)服務(wù)在還沒有調(diào)用發(fā)貨系統(tǒng)之前宕機(jī)了. 那很明顯,這條消息(訂單消息)就丟失了.?這是絕對(duì)不可以忍受的!!!!!!!
          舉例: 用戶在前端系統(tǒng)下了一個(gè)訂單, 這條訂單基于訂單服務(wù)保存成功,提示用戶已經(jīng)下單成功,用戶就等著收貨, 等了好幾天都不發(fā)貨. 因?yàn)槭裁? 因?yàn)閭}儲(chǔ)服務(wù)在接收到訂單消息之后, 突然宕機(jī)了. 那么消息隊(duì)列中沒有了這個(gè)訂單消息, 倉儲(chǔ)服務(wù)也沒有去調(diào)用發(fā)貨系統(tǒng). 所以這個(gè)訂單就卡這了.
          那此時(shí)我們要對(duì)這個(gè)問題進(jìn)行解決. 核心痛點(diǎn)就在于autoAck這個(gè)參數(shù). 需要將此參數(shù)設(shè)置為false. 當(dāng)此參數(shù)設(shè)置為false. 那么當(dāng)消費(fèi)者接收到這個(gè)消息之后,消息隊(duì)列也不會(huì)馬上刪除這條消息. 對(duì)于我們開發(fā)人員要做的就是只有倉儲(chǔ)服務(wù)執(zhí)行完畢并調(diào)用成功發(fā)貨之后才會(huì)向消息對(duì)返回一條確認(rèn)消息,當(dāng)消息隊(duì)列接收到這條消息之后才刪除訂單消息. 核心代碼如下:
          DefaultConsumer?consumer?=?new?DefaultConsumer(finalChannel){
          ????@Override
          ????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ?
          ????????String?value?=?new?String(body,"utf-8");
          ????????System.out.println("開始調(diào)用發(fā)貨功能!!!!!");
          ????????System.out.println("根據(jù)發(fā)貨功能結(jié)果進(jìn)行判斷");
          ????????if?(true){
          ????????????//發(fā)貨成功
          ????????????//通知消息隊(duì)列刪除此消息
          ????????????finalChannel.basicAck(envelope.getDeliveryTag(),false);
          ????????}
          ????}
          };
          channel.basicConsume(QUEUE,false,consumer);
          按照這個(gè)流程改造了之后, 可以確保倉儲(chǔ)服務(wù)在成功調(diào)用了發(fā)貨功能之后才會(huì)通知消息隊(duì)列刪除這條訂單消息, 從而確保了不會(huì)因?yàn)樯鲜雒枋龅膯栴}而導(dǎo)致訂單消息丟失.
          Q:如果一旦消費(fèi)者宕機(jī)了, 那么這個(gè)訂單消息不就卡在消息隊(duì)列了么?
          A: 對(duì)于當(dāng)前的架構(gòu)設(shè)計(jì). 倉儲(chǔ)服務(wù)是以集群方式部署. 會(huì)存在多個(gè)倉儲(chǔ)服務(wù)的實(shí)例. 對(duì)于rabbitMQ來說, 如果一旦發(fā)現(xiàn)某個(gè)倉儲(chǔ)服務(wù)宕機(jī)了. 那么就會(huì)將這個(gè)訂單消息發(fā)送給其他的倉儲(chǔ)服務(wù)實(shí)例去使用這條消息.
          Q:如果其他倉儲(chǔ)實(shí)例調(diào)用完了此訂單消息,但是剛才的倉儲(chǔ)服務(wù)又重新啟動(dòng)了,那因?yàn)樗鼊偛乓呀?jīng)接收到了消息,它又去根據(jù)這個(gè)訂單消息去調(diào)用發(fā)貨功能,但是其他倉儲(chǔ)服務(wù)已經(jīng)用完了這個(gè)訂單消息, 怎么辦?
          A: 此問題無需考慮. 因?yàn)閭}儲(chǔ)服務(wù)只是一個(gè)消費(fèi)者, 它只會(huì)去持續(xù)監(jiān)聽消息隊(duì)列,拿消息進(jìn)行使用.而不會(huì)對(duì)消息進(jìn)行存儲(chǔ).所以該問題不會(huì)發(fā)生.
          Q: rabbitMQ是如何感知到消費(fèi)者宕機(jī)的?
          A: 消費(fèi)者實(shí)例已經(jīng)注冊(cè)到了rabbitMQ, 所以rabbitMQ與消費(fèi)者實(shí)例是存在聯(lián)系的,當(dāng)消費(fèi)者實(shí)例宕機(jī),rabbitMQ必然會(huì)知道
          Q:當(dāng)rabbitMQ感知到某一個(gè)消費(fèi)者實(shí)例宕機(jī),它是如何進(jìn)行消息重發(fā)的?
          DefaultConsumer?consumer?=?new?DefaultConsumer(finalChannel){
          ????@Override
          ????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ?
          ????????String?value?=?new?String(body,"utf-8");
          ????????try{
          ????????????//調(diào)用發(fā)貨
          ????????}
          ????????catch(Exception?e){
          ????????????//異常處理
          ????????}
          ????????finally{
          ????????????//通知消息隊(duì)列刪除此消息
          ????????????finalChannel.basicAck(envelope.getDeliveryTag(),false);
          ????????}
          ????}
          };
          channel.basicConsume(QUEUE,false,consumer);
          當(dāng)消費(fèi)者實(shí)例在處理消息的過程中, 出現(xiàn)了異常怎么辦? 這個(gè)時(shí)候是一定不能通知MQ服務(wù)消息消費(fèi)成功了, 否則消息不是就又丟了么!!. 在catch中,需要做的是通知MQ服務(wù)此條消息沒有處理成功,讓MQ將這個(gè)消息交給其他消費(fèi)者實(shí)例進(jìn)行處理. 具體實(shí)現(xiàn)如下:
          DefaultConsumer?consumer?=?new?DefaultConsumer(finalChannel){
          ????@Override
          ????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ?
          ????????String?value?=?new?String(body,"utf-8");
          ????????try{
          ????????????//調(diào)用發(fā)貨
          ????????}
          ????????catch(Exception?e){
          ????????????//異常處理
          ????????????//第一個(gè)參數(shù):?消息表示信息
          ????????????//第二個(gè)參數(shù):通知MQ服務(wù)當(dāng)前消費(fèi)者實(shí)例沒有處理成功,讓MQ服務(wù)將這個(gè)消息重新投遞給其他消費(fèi)者實(shí)例
          ????????????//如果設(shè)置為了false,會(huì)導(dǎo)致就算MQ服務(wù)知道當(dāng)前消費(fèi)者實(shí)例沒有處理成功,?但是依舊會(huì)刪除這個(gè)消息.
          ????????????channel.basicNack(envelope.getDeliveryTag(),true)
          ????????}
          ????????finally{
          ????????????//通知消息隊(duì)列刪除此消息
          ????????????finalChannel.basicAck(envelope.getDeliveryTag(),false);
          ????????}
          ????}
          };
          channel.basicConsume(QUEUE,false,consumer);
          Q:基于ack機(jī)制,結(jié)合高并發(fā)場(chǎng)景會(huì)出現(xiàn)什么問題?
          A: 對(duì)于當(dāng)前的操作, 每一個(gè)channel都會(huì)存在若干的unack消息(未確認(rèn)消息). 比方說, rabbitMQ正在發(fā)送的消息 、 消費(fèi)者實(shí)例接收到消息之后但沒有處理完 、 執(zhí)行了ack但是因?yàn)閍ck是異步的也不會(huì)馬上變?yōu)閍ck信息 、 開始批量ack延遲時(shí)間會(huì)更長.
          對(duì)于這些場(chǎng)景,都會(huì)存在unack的消息. 此時(shí)如果rabbitMQ無限制的過多過快的向消費(fèi)者實(shí)例發(fā)送消息,就會(huì)導(dǎo)致龐大的unack消息積壓在消費(fèi)者實(shí)例的內(nèi)存中,如果繼續(xù)保持發(fā)與積壓的狀態(tài),最終會(huì)導(dǎo)致消費(fèi)者實(shí)例的oom!!.
          此時(shí)需要考慮消費(fèi)者實(shí)例的處理能力以及如何解決unack消息積壓的問題.
          rabbitMQ基于 prefetch count(預(yù)抓取總數(shù))控制每一個(gè)channel的unack消息的數(shù)量,代碼如下:
          channel.basicQos(10)//設(shè)置預(yù)抓取消息總數(shù)為10
          這個(gè)方法一旦執(zhí)行,相當(dāng)于設(shè)置當(dāng)前的channel里,對(duì)于unack消息總數(shù)不能超過10條.( rabbitMQ正在發(fā)送的消息 、 消費(fèi)者實(shí)例接收到消息之后但沒有處理完 、 執(zhí)行了ack但是因?yàn)閍ck是異步的也不會(huì)馬上變?yōu)閍ck信息 、 開始批量ack延遲時(shí)間會(huì)更長等等這類unack消息的總數(shù)) , 當(dāng)一個(gè)channel中的unack消息超過十條之后, rabbitMQ則會(huì)停止向這個(gè)消費(fèi)者實(shí)例投遞消息, 等待unack消息總數(shù)小于10 或者 將消息轉(zhuǎn)發(fā)給其他的消費(fèi)者實(shí)例.
          此時(shí)需要結(jié)合高并發(fā)場(chǎng)景考慮prefetch count的值設(shè)置多大合適.
          當(dāng)前的這個(gè)值設(shè)置過大或者過小都會(huì)出問題. 過大可能導(dǎo)致系統(tǒng)雪崩, 過小導(dǎo)致系統(tǒng)吞吐量過低,響應(yīng)速度低.
          過大: 在高并發(fā)場(chǎng)景下, 可能每秒都會(huì)幾千上萬條消息. 如果仍舊把prefetch count 設(shè)置過大超出了消費(fèi)者實(shí)例內(nèi)存的處理能力, 消費(fèi)者實(shí)例可能瞬間就崩潰, 然后rabbitMQ感知到當(dāng)前消費(fèi)者實(shí)例宕機(jī),則會(huì)將這些消息交給其他消費(fèi)者實(shí)例,然后后面的消費(fèi)者實(shí)例也崩潰, 最終導(dǎo)致系統(tǒng)雪崩.
          舉個(gè)例子. 你給當(dāng)前消費(fèi)者實(shí)例的prefetch count設(shè)置為10W. 那么在消費(fèi)者實(shí)例中就可以存在10W條unack的消息,超出了消費(fèi)者實(shí)例的內(nèi)存容量, 直接OOM. 最終所有消費(fèi)者實(shí)例全部OOM
          過小: 如果設(shè)置過小,會(huì)導(dǎo)致系統(tǒng)的消息吞吐量降低,影響系統(tǒng)性能. 因?yàn)閳?zhí)行ack方法是異步的 . 舉例. 將prefetch count 設(shè)置為1. 則rabbitMQ最終投遞給消費(fèi)者實(shí)例一條unack消息. 當(dāng)消費(fèi)者實(shí)例消費(fèi)者這條信息,并執(zhí)行了ack方法, 因?yàn)樵摲椒ㄐ枰惒綀?zhí)行. 比方說需耗時(shí)300ms才能成功通知到rabbitMQ. 那么當(dāng)經(jīng)過了300ms之后, rabbitMQ才會(huì)再發(fā)出另外一條消息. 速度可想而知的慢!!! 這種操作可能導(dǎo)致當(dāng)前系統(tǒng)消息吞吐量下降千倍都是有可能的.
          對(duì)于上述問題. 官網(wǎng)給出的建議是設(shè)置在100~300之間. 但是在實(shí)際生產(chǎn)環(huán)境下, 具體環(huán)境需要具體確定.

          推薦閱讀:

          通過 Java 技術(shù)手段,獲取女朋友定位地址 ...

          Tomcat 組成與工作原理

          最近面試BAT,整理一份面試資料《Java面試BATJ通關(guān)手冊(cè)》,覆蓋了Java核心技術(shù)、JVM、Java并發(fā)、SSM、微服務(wù)、數(shù)據(jù)庫、數(shù)據(jù)結(jié)構(gòu)等等。

          獲取方式:點(diǎn)個(gè)「在看」,點(diǎn)擊上方小卡片,進(jìn)入公眾號(hào)后回復(fù)「面試題」領(lǐng)取,更多內(nèi)容陸續(xù)奉上。

          朕已閱?

          瀏覽 71
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  性色国产成人久久久精品 | 蜜桃臀美女被操 | 日本AⅤ在线 | 青青草原免费在线视频 | 黄色黄片网站 |