互聯(lián)網(wǎng)/程序員/技術(shù)/資料共享?來自:blog.csdn.net/weixin_42942532/article/details/88951915一、生產(chǎn)者
此時(shí)已經(jīng)可以保證消費(fèi)者出現(xiàn)宕機(jī),可以保證消息不丟失.Q: 當(dāng)訂單服務(wù)發(fā)送一條消息到rabbitMQ, rabbitMQ成功接收到了消息并保存在內(nèi)存中, 但是在倉儲(chǔ)服務(wù)沒有拿走此消息之前, rabbitMQ宕機(jī)了. 怎么辦?A:此問題需要考慮消息持久化(durable機(jī)制), 通過設(shè)置隊(duì)列的durable參數(shù)為true, 則當(dāng)rabbitMQ重啟之后, 會(huì)恢復(fù)之前的隊(duì)列. 它的工作原理是rabbitMQ會(huì)把隊(duì)列的相關(guān)信息持久化到磁盤. 代碼如下:/**
?*?queue?:?當(dāng)前操作的隊(duì)列.?設(shè)置隊(duì)列名稱即可
?*?durable:?當(dāng)前隊(duì)列是否開啟持久化.?如果為true.當(dāng)前mq服務(wù)重啟之后,隊(duì)列仍然存在
?*?exclusive:?當(dāng)前隊(duì)列是否獨(dú)占此連接
?*?autoDelete:?當(dāng)前隊(duì)列是否自動(dòng)刪除
?*?arguments:?隊(duì)列參數(shù)
?*/
channel.queueDeclare(QUEUE,true,false,false,null);
此時(shí)當(dāng)rabbitMQ重啟,則會(huì)恢復(fù)之前的存在的隊(duì)列.Q: 此時(shí)隊(duì)列中的消息會(huì)一并恢復(fù)么?A: 雖然隊(duì)列可以恢復(fù),但是按照當(dāng)前的設(shè)置,隊(duì)列中未消費(fèi)的消息是不會(huì)恢復(fù)的. 如果也要一并恢復(fù)消息,則需要設(shè)置隊(duì)列中的消息持久化. 代碼如下:/**
?*?exchange:?交換機(jī).?對(duì)于當(dāng)前操作使用默認(rèn)交換機(jī)?""
?*?routingKey:?路由key.?如果當(dāng)前使用默認(rèn)交換機(jī),?routingKey的值就是當(dāng)前隊(duì)列的名稱
?*?props:?參數(shù)
?*?body:?消息體
?*/
String?message?=?"hello?rabbitmq";
channel.basicPublish("",QUEUE,?MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
通過第三個(gè)參數(shù)的設(shè)置,可以對(duì)發(fā)送出去的消息進(jìn)行持久化設(shè)置. 工作方式是將消息寫入到磁盤.當(dāng)rabbitMQ重啟,則在恢復(fù)隊(duì)列的同時(shí)也會(huì)一并恢復(fù)隊(duì)列中之前未被消費(fèi)的消息.此時(shí)需要注意: 對(duì)于此種方式是不能保證消息的百分百不丟失的. 因?yàn)閞abbitMQ有可能在沒有來得及寫入磁盤的時(shí)候, 服務(wù)器就宕機(jī)了. 此時(shí)消息一樣也會(huì)丟失. 如果要完全100%保證寫入RabbitMQ的數(shù)據(jù)必須落地磁盤,不會(huì)丟失,需要依靠其他的機(jī)制。二、消費(fèi)者
1. 業(yè)務(wù)場(chǎng)景定義
如上圖,正常的消息隊(duì)列的工作方式, 是可以通過rabbitMQ的workQueues或者routing或者topics進(jìn)行實(shí)現(xiàn)的. 對(duì)于消息生產(chǎn)者采用集群部署,用于進(jìn)行高可用, 消費(fèi)方部署集群保證保證高可用的同時(shí),也可以提高系統(tǒng)的TPS與QPS. 這也是最基礎(chǔ)的使用2. 問題場(chǎng)景描述
2.1 消費(fèi)者服務(wù)宕機(jī)
Q: 倉儲(chǔ)服務(wù)在接收到一條訂單消息之后, 并對(duì)此條消息沒有處理完之前,突然宕機(jī)了. 換句話說, 倉儲(chǔ)服務(wù)在接收到訂單消息之后, 倉儲(chǔ)服務(wù)調(diào)用發(fā)貨系統(tǒng)之前, 倉儲(chǔ)服務(wù)宕機(jī)了. 這個(gè)時(shí)候應(yīng)該怎么辦?A: rabbitMQ默認(rèn)操作是當(dāng)消費(fèi)者成功接收到消息之后,rabbitMQ則會(huì)自動(dòng)的在隊(duì)列中將此條消息刪除. 這種操作稱為自動(dòng)ACK(自動(dòng)回復(fù)). 代碼設(shè)置如下:/**
?*?queue?:?隊(duì)列名稱
?*?autoAck:?是否自動(dòng)應(yīng)答
?*callback:?消費(fèi)者
?*/
channel.basicConsume(QUEUE_INFORM_EMAIL,true,consumer);
在此段代碼中, 第二個(gè)參數(shù):autoAck.則為設(shè)置自動(dòng)應(yīng)答方式. 如果為true.則會(huì)當(dāng)消費(fèi)者接收到消息后,自動(dòng)刪除消息隊(duì)列中的這條消息. 代表這條消息已經(jīng)投遞完畢了.但是此時(shí),如果按照這種工作方式, 當(dāng)消費(fèi)者(倉儲(chǔ)服務(wù))接收到消息(訂單消息),消息隊(duì)列自動(dòng)把這條消息(訂單消息)刪除了, 但是倉儲(chǔ)服務(wù)在還沒有調(diào)用發(fā)貨系統(tǒng)之前宕機(jī)了. 那很明顯,這條消息(訂單消息)就丟失了.?這是絕對(duì)不可以忍受的!!!!!!!舉例: 用戶在前端系統(tǒng)下了一個(gè)訂單, 這條訂單基于訂單服務(wù)保存成功,提示用戶已經(jīng)下單成功,用戶就等著收貨, 等了好幾天都不發(fā)貨. 因?yàn)槭裁? 因?yàn)閭}儲(chǔ)服務(wù)在接收到訂單消息之后, 突然宕機(jī)了. 那么消息隊(duì)列中沒有了這個(gè)訂單消息, 倉儲(chǔ)服務(wù)也沒有去調(diào)用發(fā)貨系統(tǒng). 所以這個(gè)訂單就卡這了.
那此時(shí)我們要對(duì)這個(gè)問題進(jìn)行解決. 核心痛點(diǎn)就在于autoAck這個(gè)參數(shù). 需要將此參數(shù)設(shè)置為false. 當(dāng)此參數(shù)設(shè)置為false. 那么當(dāng)消費(fèi)者接收到這個(gè)消息之后,消息隊(duì)列也不會(huì)馬上刪除這條消息. 對(duì)于我們開發(fā)人員要做的就是只有倉儲(chǔ)服務(wù)執(zhí)行完畢并調(diào)用成功發(fā)貨之后才會(huì)向消息對(duì)返回一條確認(rèn)消息,當(dāng)消息隊(duì)列接收到這條消息之后才刪除訂單消息. 核心代碼如下:DefaultConsumer?consumer?=?new?DefaultConsumer(finalChannel){
????@Override
????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
?
????????String?value?=?new?String(body,"utf-8");
????????System.out.println("開始調(diào)用發(fā)貨功能!!!!!");
????????System.out.println("根據(jù)發(fā)貨功能結(jié)果進(jìn)行判斷");
????????if?(true){
????????????//發(fā)貨成功
????????????//通知消息隊(duì)列刪除此消息
????????????finalChannel.basicAck(envelope.getDeliveryTag(),false);
????????}
????}
};
channel.basicConsume(QUEUE,false,consumer);
按照這個(gè)流程改造了之后, 可以確保倉儲(chǔ)服務(wù)在成功調(diào)用了發(fā)貨功能之后才會(huì)通知消息隊(duì)列刪除這條訂單消息, 從而確保了不會(huì)因?yàn)樯鲜雒枋龅膯栴}而導(dǎo)致訂單消息丟失.Q:如果一旦消費(fèi)者宕機(jī)了, 那么這個(gè)訂單消息不就卡在消息隊(duì)列了么?A: 對(duì)于當(dāng)前的架構(gòu)設(shè)計(jì). 倉儲(chǔ)服務(wù)是以集群方式部署. 會(huì)存在多個(gè)倉儲(chǔ)服務(wù)的實(shí)例. 對(duì)于rabbitMQ來說, 如果一旦發(fā)現(xiàn)某個(gè)倉儲(chǔ)服務(wù)宕機(jī)了. 那么就會(huì)將這個(gè)訂單消息發(fā)送給其他的倉儲(chǔ)服務(wù)實(shí)例去使用這條消息.Q:如果其他倉儲(chǔ)實(shí)例調(diào)用完了此訂單消息,但是剛才的倉儲(chǔ)服務(wù)又重新啟動(dòng)了,那因?yàn)樗鼊偛乓呀?jīng)接收到了消息,它又去根據(jù)這個(gè)訂單消息去調(diào)用發(fā)貨功能,但是其他倉儲(chǔ)服務(wù)已經(jīng)用完了這個(gè)訂單消息, 怎么辦?A: 此問題無需考慮. 因?yàn)閭}儲(chǔ)服務(wù)只是一個(gè)消費(fèi)者, 它只會(huì)去持續(xù)監(jiān)聽消息隊(duì)列,拿消息進(jìn)行使用.而不會(huì)對(duì)消息進(jìn)行存儲(chǔ).所以該問題不會(huì)發(fā)生.Q: rabbitMQ是如何感知到消費(fèi)者宕機(jī)的?A: 消費(fèi)者實(shí)例已經(jīng)注冊(cè)到了rabbitMQ, 所以rabbitMQ與消費(fèi)者實(shí)例是存在聯(lián)系的,當(dāng)消費(fèi)者實(shí)例宕機(jī),rabbitMQ必然會(huì)知道Q:當(dāng)rabbitMQ感知到某一個(gè)消費(fèi)者實(shí)例宕機(jī),它是如何進(jìn)行消息重發(fā)的?DefaultConsumer?consumer?=?new?DefaultConsumer(finalChannel){
????@Override
????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
?
????????String?value?=?new?String(body,"utf-8");
????????try{
????????????//調(diào)用發(fā)貨
????????}
????????catch(Exception?e){
????????????//異常處理
????????}
????????finally{
????????????//通知消息隊(duì)列刪除此消息
????????????finalChannel.basicAck(envelope.getDeliveryTag(),false);
????????}
????}
};
channel.basicConsume(QUEUE,false,consumer);
當(dāng)消費(fèi)者實(shí)例在處理消息的過程中, 出現(xiàn)了異常怎么辦? 這個(gè)時(shí)候是一定不能通知MQ服務(wù)消息消費(fèi)成功了, 否則消息不是就又丟了么!!. 在catch中,需要做的是通知MQ服務(wù)此條消息沒有處理成功,讓MQ將這個(gè)消息交給其他消費(fèi)者實(shí)例進(jìn)行處理. 具體實(shí)現(xiàn)如下:DefaultConsumer?consumer?=?new?DefaultConsumer(finalChannel){
????@Override
????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
?
????????String?value?=?new?String(body,"utf-8");
????????try{
????????????//調(diào)用發(fā)貨
????????}
????????catch(Exception?e){
????????????//異常處理
????????????//第一個(gè)參數(shù):?消息表示信息
????????????//第二個(gè)參數(shù):通知MQ服務(wù)當(dāng)前消費(fèi)者實(shí)例沒有處理成功,讓MQ服務(wù)將這個(gè)消息重新投遞給其他消費(fèi)者實(shí)例
????????????//如果設(shè)置為了false,會(huì)導(dǎo)致就算MQ服務(wù)知道當(dāng)前消費(fèi)者實(shí)例沒有處理成功,?但是依舊會(huì)刪除這個(gè)消息.
????????????channel.basicNack(envelope.getDeliveryTag(),true)
????????}
????????finally{
????????????//通知消息隊(duì)列刪除此消息
????????????finalChannel.basicAck(envelope.getDeliveryTag(),false);
????????}
????}
};
channel.basicConsume(QUEUE,false,consumer);
Q:基于ack機(jī)制,結(jié)合高并發(fā)場(chǎng)景會(huì)出現(xiàn)什么問題?A: 對(duì)于當(dāng)前的操作, 每一個(gè)channel都會(huì)存在若干的unack消息(未確認(rèn)消息). 比方說, rabbitMQ正在發(fā)送的消息 、 消費(fèi)者實(shí)例接收到消息之后但沒有處理完 、 執(zhí)行了ack但是因?yàn)閍ck是異步的也不會(huì)馬上變?yōu)閍ck信息 、 開始批量ack延遲時(shí)間會(huì)更長.對(duì)于這些場(chǎng)景,都會(huì)存在unack的消息. 此時(shí)如果rabbitMQ無限制的過多過快的向消費(fèi)者實(shí)例發(fā)送消息,就會(huì)導(dǎo)致龐大的unack消息積壓在消費(fèi)者實(shí)例的內(nèi)存中,如果繼續(xù)保持發(fā)與積壓的狀態(tài),最終會(huì)導(dǎo)致消費(fèi)者實(shí)例的oom!!.此時(shí)需要考慮消費(fèi)者實(shí)例的處理能力以及如何解決unack消息積壓的問題.rabbitMQ基于 prefetch count(預(yù)抓取總數(shù))控制每一個(gè)channel的unack消息的數(shù)量,代碼如下:channel.basicQos(10)//設(shè)置預(yù)抓取消息總數(shù)為10
這個(gè)方法一旦執(zhí)行,相當(dāng)于設(shè)置當(dāng)前的channel里,對(duì)于unack消息總數(shù)不能超過10條.( rabbitMQ正在發(fā)送的消息 、 消費(fèi)者實(shí)例接收到消息之后但沒有處理完 、 執(zhí)行了ack但是因?yàn)閍ck是異步的也不會(huì)馬上變?yōu)閍ck信息 、 開始批量ack延遲時(shí)間會(huì)更長等等這類unack消息的總數(shù)) , 當(dāng)一個(gè)channel中的unack消息超過十條之后, rabbitMQ則會(huì)停止向這個(gè)消費(fèi)者實(shí)例投遞消息, 等待unack消息總數(shù)小于10 或者 將消息轉(zhuǎn)發(fā)給其他的消費(fèi)者實(shí)例.此時(shí)需要結(jié)合高并發(fā)場(chǎng)景考慮prefetch count的值設(shè)置多大合適.當(dāng)前的這個(gè)值設(shè)置過大或者過小都會(huì)出問題. 過大可能導(dǎo)致系統(tǒng)雪崩, 過小導(dǎo)致系統(tǒng)吞吐量過低,響應(yīng)速度低.過大: 在高并發(fā)場(chǎng)景下, 可能每秒都會(huì)幾千上萬條消息. 如果仍舊把prefetch count 設(shè)置過大超出了消費(fèi)者實(shí)例內(nèi)存的處理能力, 消費(fèi)者實(shí)例可能瞬間就崩潰, 然后rabbitMQ感知到當(dāng)前消費(fèi)者實(shí)例宕機(jī),則會(huì)將這些消息交給其他消費(fèi)者實(shí)例,然后后面的消費(fèi)者實(shí)例也崩潰, 最終導(dǎo)致系統(tǒng)雪崩.舉個(gè)例子. 你給當(dāng)前消費(fèi)者實(shí)例的prefetch count設(shè)置為10W. 那么在消費(fèi)者實(shí)例中就可以存在10W條unack的消息,超出了消費(fèi)者實(shí)例的內(nèi)存容量, 直接OOM. 最終所有消費(fèi)者實(shí)例全部OOM過小: 如果設(shè)置過小,會(huì)導(dǎo)致系統(tǒng)的消息吞吐量降低,影響系統(tǒng)性能. 因?yàn)閳?zhí)行ack方法是異步的 . 舉例. 將prefetch count 設(shè)置為1. 則rabbitMQ最終投遞給消費(fèi)者實(shí)例一條unack消息. 當(dāng)消費(fèi)者實(shí)例消費(fèi)者這條信息,并執(zhí)行了ack方法, 因?yàn)樵摲椒ㄐ枰惒綀?zhí)行. 比方說需耗時(shí)300ms才能成功通知到rabbitMQ. 那么當(dāng)經(jīng)過了300ms之后, rabbitMQ才會(huì)再發(fā)出另外一條消息. 速度可想而知的慢!!! 這種操作可能導(dǎo)致當(dāng)前系統(tǒng)消息吞吐量下降千倍都是有可能的.對(duì)于上述問題. 官網(wǎng)給出的建議是設(shè)置在100~300之間. 但是在實(shí)際生產(chǎn)環(huán)境下, 具體環(huán)境需要具體確定.推薦閱讀:
通過 Java 技術(shù)手段,獲取女朋友定位地址 ...
Tomcat 組成與工作原理
最近面試BAT,整理一份面試資料《Java面試BATJ通關(guān)手冊(cè)》,覆蓋了Java核心技術(shù)、JVM、Java并發(fā)、SSM、微服務(wù)、數(shù)據(jù)庫、數(shù)據(jù)結(jié)構(gòu)等等。
獲取方式:點(diǎn)個(gè)「在看」,點(diǎn)擊上方小卡片,進(jìn)入公眾號(hào)后回復(fù)「面試題」領(lǐng)取,更多內(nèi)容陸續(xù)奉上。朕已閱?