RabbitMQ 的消息確認(rèn)機(jī)制(圖文+代碼)詳解!

Java技術(shù)棧
www.javastack.cn
關(guān)注閱讀更多優(yōu)質(zhì)文章
作者:海向?
出處:www.cnblogs.com/haixiang/p/10900005.html
生產(chǎn)端 Confirm 消息確認(rèn)機(jī)制
消息的確認(rèn),是指生產(chǎn)者投遞消息后,如果 Broker 收到消息,則會(huì)給我們生產(chǎn)者一個(gè)應(yīng)答。
生產(chǎn)者進(jìn)行接收應(yīng)答,用來(lái)確定這條消息是否正常的發(fā)送到 Broker ,這種方式也是消息的可靠性投遞的核心保障!
Confirm 確認(rèn)機(jī)制流程圖

如何實(shí)現(xiàn)Confirm確認(rèn)消息?
第一步:在 channel 上開(kāi)啟確認(rèn)模式:
channel.confirmSelect()第二步:在 channel 上添加監(jiān)聽(tīng):
channel.addConfirmListener(ConfirmListener listener);, 監(jiān)聽(tīng)成功和失敗的返回結(jié)果,根據(jù)具體的結(jié)果對(duì)消息進(jìn)行重新發(fā)送、或記錄日志等后續(xù)處理!
import?com.rabbitmq.client.Channel;
import?com.rabbitmq.client.ConfirmListener;
import?com.rabbitmq.client.Connection;
import?com.rabbitmq.client.ConnectionFactory;
import?java.io.IOException;
public?class?ConfirmProducer?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????ConnectionFactory?factory?=?new?ConnectionFactory();
????????factory.setHost("localhost");
????????factory.setVirtualHost("/");
????????factory.setUsername("guest");
????????factory.setPassword("guest");
????????Connection?connection?=?factory.newConnection();
????????Channel?channel?=?connection.createChannel();
????????String?exchangeName?=?"test_confirm_exchange";
????????String?routingKey?=?"item.update";
????????//指定消息的投遞模式:confirm 確認(rèn)模式
????????channel.confirmSelect();
????????//發(fā)送
????????final?long?start?=?System.currentTimeMillis();
????????for?(int?i?=?0;?i?5?;?i++)?{
????????????String?msg?=?"this?is?confirm?msg?";
????????????channel.basicPublish(exchangeName,?routingKey,?null,?msg.getBytes());
????????????System.out.println("Send?message?:?"?+?msg);
????????}
????????//添加一個(gè)確認(rèn)監(jiān)聽(tīng),?這里就不關(guān)閉連接了,為了能保證能收到監(jiān)聽(tīng)消息
????????channel.addConfirmListener(new?ConfirmListener()?{
????????????/**
?????????????*?返回成功的回調(diào)函數(shù)
?????????????*/
????????????public?void?handleAck(long?deliveryTag,?boolean?multiple)?throws?IOException?{
????????????????System.out.println("succuss?ack");
????????????????System.out.println(multiple);
????????????????System.out.println("耗時(shí):"?+?(System.currentTimeMillis()?-?start)?+?"ms");
????????????}
????????????/**
?????????????*?返回失敗的回調(diào)函數(shù)
?????????????*/
????????????public?void?handleNack(long?deliveryTag,?boolean?multiple)?throws?IOException?{
????????????????System.out.printf("defeat?ack");
????????????????System.out.println("耗時(shí):"?+?(System.currentTimeMillis()?-?start)?+?"ms");
????????????}
????????});
????}
}
import?com.rabbitmq.client.*;
import?java.io.IOException;
public?class?ConfirmConsumer?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????ConnectionFactory?factory?=?new?ConnectionFactory();
????????factory.setHost("localhost");
????????factory.setVirtualHost("/");
????????factory.setUsername("guest");
????????factory.setPassword("guest");
????????factory.setAutomaticRecoveryEnabled(true);
????????factory.setNetworkRecoveryInterval(3000);
????????Connection?connection?=?factory.newConnection();
????????Channel?channel?=?connection.createChannel();
??????
????????String?exchangeName?=?"test_confirm_exchange";
????????String?queueName?=?"test_confirm_queue";
????????String?routingKey?=?"item.#";
????????channel.exchangeDeclare(exchangeName,?"topic",?true,?false,?null);
????????channel.queueDeclare(queueName,?false,?false,?false,?null);
????????//一般不用代碼綁定,在管理界面手動(dòng)綁定
????????channel.queueBind(queueName,?exchangeName,?routingKey);
????????//創(chuàng)建消費(fèi)者并接收消息
????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,
???????????????????????????????????????AMQP.BasicProperties?properties,?byte[]?body)
????????????????????throws?IOException?{
????????????????String?message?=?new?String(body,?"UTF-8");
????????????????System.out.println("?[x]?Received?'"?+?message?+?"'");
????????????}
????????};
????????//設(shè)置?Channel?消費(fèi)者綁定隊(duì)列
????????channel.basicConsume(queueName,?true,?consumer);
????}
}
我們此處只關(guān)注生產(chǎn)端輸出消息
Send?message?:?this?is?confirm?msg?
Send?message?:?this?is?confirm?msg?
Send?message?:?this?is?confirm?msg?
Send?message?:?this?is?confirm?msg?
Send?message?:?this?is?confirm?msg?
succuss acktrue耗時(shí):3ms
succuss acktrue耗時(shí):4ms
注意事項(xiàng)
我們采用的是異步 confirm 模式:提供一個(gè)回調(diào)方法,服務(wù)端 confirm 了一條或者多條消息后 Client 端會(huì)回調(diào)這個(gè)方法。除此之外還有單條同步 confirm 模式、批量同步 confirm 模式,由于現(xiàn)實(shí)場(chǎng)景中很少使用我們?cè)诖瞬蛔鼋榻B,如有興趣直接參考官方文檔。關(guān)注公眾號(hào)Java技術(shù)棧可以獲取更多系列RabbitMQ教程。
我們運(yùn)行生產(chǎn)端會(huì)發(fā)現(xiàn)每次運(yùn)行結(jié)果都不一樣,會(huì)有多種情況出現(xiàn),因?yàn)?Broker 會(huì)進(jìn)行優(yōu)化,有時(shí)會(huì)批量一次性 confirm ,有時(shí)會(huì)分開(kāi)幾條 confirm。
succuss?ack??true
耗時(shí):3ms
succuss?ack??false
耗時(shí):4ms
或者
succuss?ack??true
耗時(shí):3ms
Return 消息機(jī)制
Return Listener 用于處理一-些不可路由的消息!
消息生產(chǎn)者,通過(guò)指定一個(gè)
Exchange和Routingkey,把消息送達(dá)到某一個(gè)隊(duì)列中去,然后我們的消費(fèi)者監(jiān)聽(tīng)隊(duì)列,進(jìn)行消費(fèi)處理操作!但是在某些情況下,如果我們?cè)诎l(fā)送消息的時(shí)候,當(dāng)前的 exchange 不存在或者指定的路由 key 路由不到,這個(gè)時(shí)候如果我們需要監(jiān)聽(tīng)這種不可達(dá)的消息,就要使用
Return Listener !在基礎(chǔ)API中有一個(gè)關(guān)鍵的配置項(xiàng):
Mandatory:如果為true,則監(jiān)聽(tīng)器會(huì)接收到路由不可達(dá)的消息,然后進(jìn)行后續(xù)處理,如果為false,那么 broker 端自動(dòng)刪除該消息!
Return 消息機(jī)制流程圖

Return 消息示例
首先我們需要發(fā)送三條消息,并且故意將第 0 條消息的
routing Key設(shè)置為錯(cuò)誤的,讓他無(wú)法正常路由到消費(fèi)端。mandatory設(shè)置為true路由不可達(dá)的消息會(huì)被監(jiān)聽(tīng)到,不會(huì)被自動(dòng)刪除.即channel.basicPublish(exchangeName, errRoutingKey, true,null, msg.getBytes());最后添加監(jiān)聽(tīng)即可監(jiān)聽(tīng)到不可路由到消費(fèi)端的消息
channel.addReturnListener(ReturnListener r))
import?com.rabbitmq.client.*;
import?java.io.IOException;
public?class?ReturnListeningProducer?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????ConnectionFactory?factory?=?new?ConnectionFactory();
????????factory.setHost("localhost");
????????factory.setVirtualHost("/");
????????factory.setUsername("guest");
????????factory.setPassword("guest");
??????
????????Connection?connection?=?factory.newConnection();
????????Channel?channel?=?connection.createChannel();
????????String?exchangeName?=?"test_return_exchange";
????????String?routingKey?=?"item.update";
????????String?errRoutingKey?=?"error.update";
????????//指定消息的投遞模式:confirm 確認(rèn)模式
????????channel.confirmSelect();
????????//發(fā)送
????????for?(int?i?=?0;?i?3?;?i++)?{
????????????String?msg?=?"this?is?return——listening?msg?";
????????????//@param?mandatory?設(shè)置為?true?路由不可達(dá)的消息會(huì)被監(jiān)聽(tīng)到,不會(huì)被自動(dòng)刪除
????????????if?(i?==?0)?{
????????????????channel.basicPublish(exchangeName,?errRoutingKey,?true,null,?msg.getBytes());
????????????}?else?{
????????????????channel.basicPublish(exchangeName,?routingKey,?true,?null,?msg.getBytes());
????????????}
????????????System.out.println("Send?message?:?"?+?msg);
????????}
????????//添加一個(gè)確認(rèn)監(jiān)聽(tīng),?這里就不關(guān)閉連接了,為了能保證能收到監(jiān)聽(tīng)消息
????????channel.addConfirmListener(new?ConfirmListener()?{
????????????/**
?????????????*?返回成功的回調(diào)函數(shù)
?????????????*/
????????????public?void?handleAck(long?deliveryTag,?boolean?multiple)?throws?IOException?{
????????????????System.out.println("succuss?ack");
????????????}
????????????/**
?????????????*?返回失敗的回調(diào)函數(shù)
?????????????*/
????????????public?void?handleNack(long?deliveryTag,?boolean?multiple)?throws?IOException?{
????????????????System.out.printf("defeat?ack");
????????????}
????????});
????????//添加一個(gè)?return?監(jiān)聽(tīng)
????????channel.addReturnListener(new?ReturnListener()?{
????????????public?void?handleReturn(int?replyCode,?String?replyText,?String?exchange,?String?routingKey,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????System.out.println("return?relyCode:?"?+?replyCode);
????????????????System.out.println("return?replyText:?"?+?replyText);
????????????????System.out.println("return?exchange:?"?+?exchange);
????????????????System.out.println("return?routingKey:?"?+?routingKey);
????????????????System.out.println("return?properties:?"?+?properties);
????????????????System.out.println("return?body:?"?+?new?String(body));
????????????}
????????});
????}
}
import?com.rabbitmq.client.*;
import?java.io.IOException;
public?class?ReturnListeningConsumer?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????//1.?創(chuàng)建一個(gè)?ConnectionFactory?并進(jìn)行設(shè)置
????????ConnectionFactory?factory?=?new?ConnectionFactory();
????????factory.setHost("localhost");
????????factory.setVirtualHost("/");
????????factory.setUsername("guest");
????????factory.setPassword("guest");
????????factory.setAutomaticRecoveryEnabled(true);
????????factory.setNetworkRecoveryInterval(3000);
????????//2.?通過(guò)連接工廠來(lái)創(chuàng)建連接
????????Connection?connection?=?factory.newConnection();
????????//3.?通過(guò)?Connection?來(lái)創(chuàng)建?Channel
????????Channel?channel?=?connection.createChannel();
????????//4.?聲明
????????String?exchangeName?=?"test_return_exchange";
????????String?queueName?=?"test_return_queue";
????????String?routingKey?=?"item.#";
????????channel.exchangeDeclare(exchangeName,?"topic",?true,?false,?null);
????????channel.queueDeclare(queueName,?false,?false,?false,?null);
????????//一般不用代碼綁定,在管理界面手動(dòng)綁定
????????channel.queueBind(queueName,?exchangeName,?routingKey);
????????//5.?創(chuàng)建消費(fèi)者并接收消息
????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,
???????????????????????????????????????AMQP.BasicProperties?properties,?byte[]?body)
????????????????????throws?IOException?{
????????????????String?message?=?new?String(body,?"UTF-8");
????????????????System.out.println("?[x]?Received?'"?+?message?+?"'");
????????????}
????????};
????????//6.?設(shè)置?Channel?消費(fèi)者綁定隊(duì)列
????????channel.basicConsume(queueName,?true,?consumer);
????}
}
我們只關(guān)注生產(chǎn)端結(jié)果,消費(fèi)端只收到兩條消息。
Send?message?:?this?is?return——listening?msg?
Send?message?:?this?is?return——listening?msg?
Send?message?:?this?is?return——listening?msg?
return?relyCode:?312
return?replyText:?NO_ROUTE
return?exchange:?test_return_exchange
return?routingKey:?error.update
return?properties:?#contentHeader(content-type=null,?content-encoding=null,?headers=null,?delivery-mode=null,?priority=null,?correlation-id=null,?reply-to=null,?expiration=null,?message-id=null,?timestamp=null,?type=null,?user-id=null,?app-id=null,?cluster-id=null)
return?body:?this?is?return——listening?msg?
succuss?ack
succuss?ack
succuss?ack
消費(fèi)端 Ack 和 Nack 機(jī)制
消費(fèi)端進(jìn)行消費(fèi)的時(shí)候,如果由于業(yè)務(wù)異常我們可以進(jìn)行日志的記錄,然后進(jìn)行補(bǔ)償!如果由于服務(wù)器宕機(jī)等嚴(yán)重問(wèn)題,那我們就需要手工進(jìn)行ACK保障消費(fèi)端消費(fèi)成功!消費(fèi)端重回隊(duì)列是為了對(duì)沒(méi)有處理成功的消息,把消息重新會(huì)遞給Broker!一般我們?cè)趯?shí)際應(yīng)用中,都會(huì)關(guān)閉重回隊(duì)列,也就是設(shè)置為False。
參考 api
void?basicNack(long?deliveryTag,?boolean?multiple,?boolean?requeue)?throws?IOException;`
?
void?basicAck(long?deliveryTag,?boolean?multiple)?throws?IOException;
如何設(shè)置手動(dòng) Ack 、Nack 以及重回隊(duì)列
首先我們發(fā)送五條消息,將每條消息對(duì)應(yīng)的循環(huán)下標(biāo) i 放入消息的
properties中作為標(biāo)記,以便于我們?cè)诤竺娴幕卣{(diào)方法中識(shí)別。其次, 我們將消費(fèi)端的 ·
channel.basicConsume(queueName, false, consumer);中的autoAck屬性設(shè)置為false,如果設(shè)置為true的話 將會(huì)正常輸出五條消息。我們通過(guò)
Thread.sleep(2000)來(lái)延時(shí)一秒,用以看清結(jié)果。我們獲取到properties中的num之后,通過(guò)channel.basicNack(envelope.getDeliveryTag(), false, true);將num為0的消息設(shè)置為 nack,即消費(fèi)失敗,并且將requeue屬性設(shè)置為true,即消費(fèi)失敗的消息重回隊(duì)列末端。
import?com.rabbitmq.client.*;
import?java.util.HashMap;
import?java.util.Map;
public?class?AckAndNackProducer?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????ConnectionFactory?factory?=?new?ConnectionFactory();
????????factory.setHost("localhost");
????????factory.setVirtualHost("/");
????????factory.setUsername("guest");
????????factory.setPassword("guest");
????????Connection?connection?=?factory.newConnection();
????????Channel?channel?=?connection.createChannel();
????????String?exchangeName?=?"test_ack_exchange";
????????String?routingKey?=?"item.update";
????????String?msg?=?"this?is?ack?msg";
????????for?(int?i?=?0;?i?5;?i++)?{
????????????Map?headers?=?new?HashMap();
????????????headers.put("num"?,i);
????????????AMQP.BasicProperties?properties?=?new?AMQP.BasicProperties().builder()
????????????????????.deliveryMode(2)
????????????????????.headers(headers)
????????????????????.build();
????????????String?tem?=?msg?+?":"?+?i;
????????????channel.basicPublish(exchangeName,?routingKey,?true,?properties,?tem.getBytes());
????????????System.out.println("Send?message?:?"?+?msg);
????????}
????????channel.close();
????????connection.close();
????}
}
import?com.rabbitmq.client.*;
import?java.io.IOException;
public?class?AckAndNackConsumer?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????ConnectionFactory?factory?=?new?ConnectionFactory();
????????factory.setHost("localhost");
????????factory.setVirtualHost("/");
????????factory.setUsername("guest");
????????factory.setPassword("guest");
????????factory.setAutomaticRecoveryEnabled(true);
????????factory.setNetworkRecoveryInterval(3000);
????????Connection?connection?=?factory.newConnection();
????????final?Channel?channel?=?connection.createChannel();
????????String?exchangeName?=?"test_ack_exchange";
????????String?queueName?=?"test_ack_queue";
????????String?routingKey?=?"item.#";
????????channel.exchangeDeclare(exchangeName,?"topic",?true,?false,?null);
????????channel.queueDeclare(queueName,?false,?false,?false,?null);
????????//一般不用代碼綁定,在管理界面手動(dòng)綁定
????????channel.queueBind(queueName,?exchangeName,?routingKey);
????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,
???????????????????????????????????????AMQP.BasicProperties?properties,?byte[]?body)
????????????????????throws?IOException?{
????????????????String?message?=?new?String(body,?"UTF-8");
????????????????System.out.println("?[x]?Received?'"?+?message?+?"'");
????????????????try?{
????????????????????Thread.sleep(2000);
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????????if?((Integer)?properties.getHeaders().get("num")?==?0)?{
????????????????????channel.basicNack(envelope.getDeliveryTag(),?false,?true);
????????????????}?else?{
????????????????????channel.basicAck(envelope.getDeliveryTag(),?false);
????????????????}
????????????}
????????};
????????//6.?設(shè)置?Channel?消費(fèi)者綁定隊(duì)列
????????channel.basicConsume(queueName,?false,?consumer);
????}
}
我們此處只關(guān)心消費(fèi)端輸出,可以看到第 0 條消費(fèi)失敗重新回到隊(duì)列尾部消費(fèi)。
?[x]?Received?'this?is?ack?msg:1'
?[x]?Received?'this?is?ack?msg:2'
?[x]?Received?'this?is?ack?msg:3'
?[x]?Received?'this?is?ack?msg:4'
?[x]?Received?'this?is?ack?msg:0'
?[x]?Received?'this?is?ack?msg:0'
?[x]?Received?'this?is?ack?msg:0'
?[x]?Received?'this?is?ack?msg:0'
?[x]?Received?'this?is?ack?msg:0'






關(guān)注Java技術(shù)??锤喔韶?/strong>


