<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          RabbitMQ 的消息確認(rèn)機(jī)制(圖文+代碼)詳解!

          共 1348字,需瀏覽 3分鐘

           ·

          2020-11-13 15:24

          Java技術(shù)棧

          www.javastack.cn

          關(guān)注閱讀更多優(yōu)質(zhì)文章



          作者:海向?

          出處:www.cnblogs.com/haixiang/p/10900005.html


          生產(chǎn)端 Confirm 消息確認(rèn)機(jī)制

          消息的確認(rèn),是指生產(chǎn)者投遞消息后,如果 Broker 收到消息,則會(huì)給我們生產(chǎn)者一個(gè)應(yīng)答。

          生產(chǎn)者進(jìn)行接收應(yīng)答,用來(lái)確定這條消息是否正常的發(fā)送到 Broker ,這種方式也是消息的可靠性投遞的核心保障!

          Confirm 確認(rèn)機(jī)制流程圖

          如何實(shí)現(xiàn)Confirm確認(rèn)消息?

          • 第一步:在 channel 上開(kāi)啟確認(rèn)模式: channel.confirmSelect()

          • 第二步:在 channel 上添加監(jiān)聽(tīng): channel.addConfirmListener(ConfirmListener listener);, 監(jiān)聽(tīng)成功和失敗的返回結(jié)果,根據(jù)具體的結(jié)果對(duì)消息進(jìn)行重新發(fā)送、或記錄日志等后續(xù)處理!

          import?com.rabbitmq.client.Channel;
          import?com.rabbitmq.client.ConfirmListener;
          import?com.rabbitmq.client.Connection;
          import?com.rabbitmq.client.ConnectionFactory;

          import?java.io.IOException;

          public?class?ConfirmProducer?{
          ????public?static?void?main(String[]?args)?throws?Exception?{
          ????????ConnectionFactory?factory?=?new?ConnectionFactory();
          ????????factory.setHost("localhost");
          ????????factory.setVirtualHost("/");
          ????????factory.setUsername("guest");
          ????????factory.setPassword("guest");

          ????????Connection?connection?=?factory.newConnection();

          ????????Channel?channel?=?connection.createChannel();

          ????????String?exchangeName?=?"test_confirm_exchange";
          ????????String?routingKey?=?"item.update";

          ????????//指定消息的投遞模式:confirm 確認(rèn)模式
          ????????channel.confirmSelect();

          ????????//發(fā)送
          ????????final?long?start?=?System.currentTimeMillis();
          ????????for?(int?i?=?0;?i?????????????String?msg?=?"this?is?confirm?msg?";
          ????????????channel.basicPublish(exchangeName,?routingKey,?null,?msg.getBytes());
          ????????????System.out.println("Send?message?:?"?+?msg);
          ????????}

          ????????//添加一個(gè)確認(rèn)監(jiān)聽(tīng),?這里就不關(guān)閉連接了,為了能保證能收到監(jiān)聽(tīng)消息
          ????????channel.addConfirmListener(new?ConfirmListener()?{
          ????????????/**
          ?????????????*?返回成功的回調(diào)函數(shù)
          ?????????????*/
          ????????????public?void?handleAck(long?deliveryTag,?boolean?multiple)?throws?IOException?{
          ????????????????System.out.println("succuss?ack");
          ????????????????System.out.println(multiple);
          ????????????????System.out.println("耗時(shí):"?+?(System.currentTimeMillis()?-?start)?+?"ms");
          ????????????}
          ????????????/**
          ?????????????*?返回失敗的回調(diào)函數(shù)
          ?????????????*/
          ????????????public?void?handleNack(long?deliveryTag,?boolean?multiple)?throws?IOException?{
          ????????????????System.out.printf("defeat?ack");
          ????????????????System.out.println("耗時(shí):"?+?(System.currentTimeMillis()?-?start)?+?"ms");
          ????????????}
          ????????});
          ????}
          }
          import?com.rabbitmq.client.*;
          import?java.io.IOException;

          public?class?ConfirmConsumer?{
          ????public?static?void?main(String[]?args)?throws?Exception?{
          ????????ConnectionFactory?factory?=?new?ConnectionFactory();
          ????????factory.setHost("localhost");
          ????????factory.setVirtualHost("/");
          ????????factory.setUsername("guest");
          ????????factory.setPassword("guest");
          ????????factory.setAutomaticRecoveryEnabled(true);
          ????????factory.setNetworkRecoveryInterval(3000);

          ????????Connection?connection?=?factory.newConnection();

          ????????Channel?channel?=?connection.createChannel();
          ??????
          ????????String?exchangeName?=?"test_confirm_exchange";
          ????????String?queueName?=?"test_confirm_queue";
          ????????String?routingKey?=?"item.#";
          ????????channel.exchangeDeclare(exchangeName,?"topic",?true,?false,?null);
          ????????channel.queueDeclare(queueName,?false,?false,?false,?null);

          ????????//一般不用代碼綁定,在管理界面手動(dòng)綁定
          ????????channel.queueBind(queueName,?exchangeName,?routingKey);

          ????????//創(chuàng)建消費(fèi)者并接收消息
          ????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
          ????????????@Override
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,
          ???????????????????????????????????????AMQP.BasicProperties?properties,?byte[]?body)
          ????????????????????throws?IOException?{
          ????????????????String?message?=?new?String(body,?"UTF-8");
          ????????????????System.out.println("?[x]?Received?'"?+?message?+?"'");
          ????????????}
          ????????};

          ????????//設(shè)置?Channel?消費(fèi)者綁定隊(duì)列
          ????????channel.basicConsume(queueName,?true,?consumer);

          ????}
          }

          我們此處只關(guān)注生產(chǎn)端輸出消息

          Send?message?:?this?is?confirm?msg?
          Send?message?:?this?is?confirm?msg?
          Send?message?:?this?is?confirm?msg?
          Send?message?:?this?is?confirm?msg?
          Send?message?:?this?is?confirm?msg?
          succuss acktrue耗時(shí):3ms
          succuss acktrue耗時(shí):4ms

          注意事項(xiàng)

          • 我們采用的是異步 confirm 模式:提供一個(gè)回調(diào)方法,服務(wù)端 confirm 了一條或者多條消息后 Client 端會(huì)回調(diào)這個(gè)方法。除此之外還有單條同步 confirm 模式、批量同步 confirm 模式,由于現(xiàn)實(shí)場(chǎng)景中很少使用我們?cè)诖瞬蛔鼋榻B,如有興趣直接參考官方文檔。關(guān)注公眾號(hào)Java技術(shù)棧可以獲取更多系列RabbitMQ教程。

          • 我們運(yùn)行生產(chǎn)端會(huì)發(fā)現(xiàn)每次運(yùn)行結(jié)果都不一樣,會(huì)有多種情況出現(xiàn),因?yàn)?Broker 會(huì)進(jìn)行優(yōu)化,有時(shí)會(huì)批量一次性 confirm ,有時(shí)會(huì)分開(kāi)幾條 confirm。

          succuss?ack??true
          耗時(shí):3ms
          succuss?ack??false
          耗時(shí):4ms

          或者
          succuss?ack??true
          耗時(shí):3ms

          Return 消息機(jī)制

          • Return Listener 用于處理一-些不可路由的消息!

          • 消息生產(chǎn)者,通過(guò)指定一個(gè) ExchangeRoutingkey,把消息送達(dá)到某一個(gè)隊(duì)列中去,然后我們的消費(fèi)者監(jiān)聽(tīng)隊(duì)列,進(jìn)行消費(fèi)處理操作!

          • 但是在某些情況下,如果我們?cè)诎l(fā)送消息的時(shí)候,當(dāng)前的 exchange 不存在或者指定的路由 key 路由不到,這個(gè)時(shí)候如果我們需要監(jiān)聽(tīng)這種不可達(dá)的消息,就要使用 Return Listener !

          • 在基礎(chǔ)API中有一個(gè)關(guān)鍵的配置項(xiàng):Mandatory:如果為 true,則監(jiān)聽(tīng)器會(huì)接收到路由不可達(dá)的消息,然后進(jìn)行后續(xù)處理,如果為 false,那么 broker 端自動(dòng)刪除該消息!

          Return 消息機(jī)制流程圖

          Return 消息示例

          • 首先我們需要發(fā)送三條消息,并且故意將第 0 條消息的 routing Key設(shè)置為錯(cuò)誤的,讓他無(wú)法正常路由到消費(fèi)端。

          • mandatory 設(shè)置為 true 路由不可達(dá)的消息會(huì)被監(jiān)聽(tīng)到,不會(huì)被自動(dòng)刪除.即channel.basicPublish(exchangeName, errRoutingKey, true,null, msg.getBytes());

          • 最后添加監(jiān)聽(tīng)即可監(jiān)聽(tīng)到不可路由到消費(fèi)端的消息channel.addReturnListener(ReturnListener r))

          import?com.rabbitmq.client.*;
          import?java.io.IOException;

          public?class?ReturnListeningProducer?{
          ????public?static?void?main(String[]?args)?throws?Exception?{
          ????????ConnectionFactory?factory?=?new?ConnectionFactory();
          ????????factory.setHost("localhost");
          ????????factory.setVirtualHost("/");
          ????????factory.setUsername("guest");
          ????????factory.setPassword("guest");
          ??????
          ????????Connection?connection?=?factory.newConnection();
          ????????Channel?channel?=?connection.createChannel();

          ????????String?exchangeName?=?"test_return_exchange";
          ????????String?routingKey?=?"item.update";
          ????????String?errRoutingKey?=?"error.update";

          ????????//指定消息的投遞模式:confirm 確認(rèn)模式
          ????????channel.confirmSelect();

          ????????//發(fā)送
          ????????for?(int?i?=?0;?i?????????????String?msg?=?"this?is?return——listening?msg?";
          ????????????//@param?mandatory?設(shè)置為?true?路由不可達(dá)的消息會(huì)被監(jiān)聽(tīng)到,不會(huì)被自動(dòng)刪除
          ????????????if?(i?==?0)?{
          ????????????????channel.basicPublish(exchangeName,?errRoutingKey,?true,null,?msg.getBytes());
          ????????????}?else?{
          ????????????????channel.basicPublish(exchangeName,?routingKey,?true,?null,?msg.getBytes());
          ????????????}
          ????????????System.out.println("Send?message?:?"?+?msg);
          ????????}

          ????????//添加一個(gè)確認(rèn)監(jiān)聽(tīng),?這里就不關(guān)閉連接了,為了能保證能收到監(jiān)聽(tīng)消息
          ????????channel.addConfirmListener(new?ConfirmListener()?{
          ????????????/**
          ?????????????*?返回成功的回調(diào)函數(shù)
          ?????????????*/
          ????????????public?void?handleAck(long?deliveryTag,?boolean?multiple)?throws?IOException?{
          ????????????????System.out.println("succuss?ack");
          ????????????}
          ????????????/**
          ?????????????*?返回失敗的回調(diào)函數(shù)
          ?????????????*/
          ????????????public?void?handleNack(long?deliveryTag,?boolean?multiple)?throws?IOException?{
          ????????????????System.out.printf("defeat?ack");
          ????????????}
          ????????});

          ????????//添加一個(gè)?return?監(jiān)聽(tīng)
          ????????channel.addReturnListener(new?ReturnListener()?{
          ????????????public?void?handleReturn(int?replyCode,?String?replyText,?String?exchange,?String?routingKey,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????System.out.println("return?relyCode:?"?+?replyCode);
          ????????????????System.out.println("return?replyText:?"?+?replyText);
          ????????????????System.out.println("return?exchange:?"?+?exchange);
          ????????????????System.out.println("return?routingKey:?"?+?routingKey);
          ????????????????System.out.println("return?properties:?"?+?properties);
          ????????????????System.out.println("return?body:?"?+?new?String(body));
          ????????????}
          ????????});

          ????}
          }
          import?com.rabbitmq.client.*;
          import?java.io.IOException;

          public?class?ReturnListeningConsumer?{
          ????public?static?void?main(String[]?args)?throws?Exception?{
          ????????//1.?創(chuàng)建一個(gè)?ConnectionFactory?并進(jìn)行設(shè)置
          ????????ConnectionFactory?factory?=?new?ConnectionFactory();
          ????????factory.setHost("localhost");
          ????????factory.setVirtualHost("/");
          ????????factory.setUsername("guest");
          ????????factory.setPassword("guest");
          ????????factory.setAutomaticRecoveryEnabled(true);
          ????????factory.setNetworkRecoveryInterval(3000);

          ????????//2.?通過(guò)連接工廠來(lái)創(chuàng)建連接
          ????????Connection?connection?=?factory.newConnection();

          ????????//3.?通過(guò)?Connection?來(lái)創(chuàng)建?Channel
          ????????Channel?channel?=?connection.createChannel();

          ????????//4.?聲明
          ????????String?exchangeName?=?"test_return_exchange";
          ????????String?queueName?=?"test_return_queue";
          ????????String?routingKey?=?"item.#";

          ????????channel.exchangeDeclare(exchangeName,?"topic",?true,?false,?null);
          ????????channel.queueDeclare(queueName,?false,?false,?false,?null);

          ????????//一般不用代碼綁定,在管理界面手動(dòng)綁定
          ????????channel.queueBind(queueName,?exchangeName,?routingKey);

          ????????//5.?創(chuàng)建消費(fèi)者并接收消息
          ????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
          ????????????@Override
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,
          ???????????????????????????????????????AMQP.BasicProperties?properties,?byte[]?body)
          ????????????????????throws?IOException?{
          ????????????????String?message?=?new?String(body,?"UTF-8");
          ????????????????System.out.println("?[x]?Received?'"?+?message?+?"'");
          ????????????}
          ????????};

          ????????//6.?設(shè)置?Channel?消費(fèi)者綁定隊(duì)列
          ????????channel.basicConsume(queueName,?true,?consumer);
          ????}
          }

          我們只關(guān)注生產(chǎn)端結(jié)果,消費(fèi)端只收到兩條消息。

          Send?message?:?this?is?return——listening?msg?
          Send?message?:?this?is?return——listening?msg?
          Send?message?:?this?is?return——listening?msg?
          return?relyCode:?312
          return?replyText:?NO_ROUTE
          return?exchange:?test_return_exchange
          return?routingKey:?error.update
          return?properties:?#contentHeader(content-type=null,?content-encoding=null,?headers=null,?delivery-mode=null,?priority=null,?correlation-id=null,?reply-to=null,?expiration=null,?message-id=null,?timestamp=null,?type=null,?user-id=null,?app-id=null,?cluster-id=null)
          return?body:?this?is?return——listening?msg?
          succuss?ack
          succuss?ack
          succuss?ack

          消費(fèi)端 Ack 和 Nack 機(jī)制

          消費(fèi)端進(jìn)行消費(fèi)的時(shí)候,如果由于業(yè)務(wù)異常我們可以進(jìn)行日志的記錄,然后進(jìn)行補(bǔ)償!如果由于服務(wù)器宕機(jī)等嚴(yán)重問(wèn)題,那我們就需要手工進(jìn)行ACK保障消費(fèi)端消費(fèi)成功!消費(fèi)端重回隊(duì)列是為了對(duì)沒(méi)有處理成功的消息,把消息重新會(huì)遞給Broker!一般我們?cè)趯?shí)際應(yīng)用中,都會(huì)關(guān)閉重回隊(duì)列,也就是設(shè)置為False。

          參考 api

          void?basicNack(long?deliveryTag,?boolean?multiple,?boolean?requeue)?throws?IOException;`
          ?
          void?basicAck(long?deliveryTag,?boolean?multiple)?throws?IOException;

          如何設(shè)置手動(dòng) Ack 、Nack 以及重回隊(duì)列

          • 首先我們發(fā)送五條消息,將每條消息對(duì)應(yīng)的循環(huán)下標(biāo) i 放入消息的 properties 中作為標(biāo)記,以便于我們?cè)诤竺娴幕卣{(diào)方法中識(shí)別。

          • 其次, 我們將消費(fèi)端的 ·channel.basicConsume(queueName, false, consumer); 中的 autoAck屬性設(shè)置為 false,如果設(shè)置為true的話 將會(huì)正常輸出五條消息。

          • 我們通過(guò) Thread.sleep(2000)來(lái)延時(shí)一秒,用以看清結(jié)果。我們獲取到properties中的num之后,通過(guò)channel.basicNack(envelope.getDeliveryTag(), false, true);num為0的消息設(shè)置為 nack,即消費(fèi)失敗,并且將 requeue屬性設(shè)置為true,即消費(fèi)失敗的消息重回隊(duì)列末端。

          import?com.rabbitmq.client.*;
          import?java.util.HashMap;
          import?java.util.Map;

          public?class?AckAndNackProducer?{
          ????public?static?void?main(String[]?args)?throws?Exception?{
          ????????ConnectionFactory?factory?=?new?ConnectionFactory();
          ????????factory.setHost("localhost");
          ????????factory.setVirtualHost("/");
          ????????factory.setUsername("guest");
          ????????factory.setPassword("guest");

          ????????Connection?connection?=?factory.newConnection();
          ????????Channel?channel?=?connection.createChannel();

          ????????String?exchangeName?=?"test_ack_exchange";
          ????????String?routingKey?=?"item.update";

          ????????String?msg?=?"this?is?ack?msg";
          ????????for?(int?i?=?0;?i?????????????Map?headers?=?new?HashMap();
          ????????????headers.put("num"?,i);

          ????????????AMQP.BasicProperties?properties?=?new?AMQP.BasicProperties().builder()
          ????????????????????.deliveryMode(2)
          ????????????????????.headers(headers)
          ????????????????????.build();

          ????????????String?tem?=?msg?+?":"?+?i;

          ????????????channel.basicPublish(exchangeName,?routingKey,?true,?properties,?tem.getBytes());
          ????????????System.out.println("Send?message?:?"?+?msg);
          ????????}

          ????????channel.close();
          ????????connection.close();
          ????}
          }
          import?com.rabbitmq.client.*;
          import?java.io.IOException;

          public?class?AckAndNackConsumer?{
          ????public?static?void?main(String[]?args)?throws?Exception?{
          ????????ConnectionFactory?factory?=?new?ConnectionFactory();
          ????????factory.setHost("localhost");
          ????????factory.setVirtualHost("/");
          ????????factory.setUsername("guest");
          ????????factory.setPassword("guest");
          ????????factory.setAutomaticRecoveryEnabled(true);
          ????????factory.setNetworkRecoveryInterval(3000);

          ????????Connection?connection?=?factory.newConnection();

          ????????final?Channel?channel?=?connection.createChannel();

          ????????String?exchangeName?=?"test_ack_exchange";
          ????????String?queueName?=?"test_ack_queue";
          ????????String?routingKey?=?"item.#";
          ????????channel.exchangeDeclare(exchangeName,?"topic",?true,?false,?null);
          ????????channel.queueDeclare(queueName,?false,?false,?false,?null);

          ????????//一般不用代碼綁定,在管理界面手動(dòng)綁定
          ????????channel.queueBind(queueName,?exchangeName,?routingKey);

          ????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
          ????????????@Override
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,
          ???????????????????????????????????????AMQP.BasicProperties?properties,?byte[]?body)
          ????????????????????throws?IOException?{

          ????????????????String?message?=?new?String(body,?"UTF-8");
          ????????????????System.out.println("?[x]?Received?'"?+?message?+?"'");

          ????????????????try?{
          ????????????????????Thread.sleep(2000);
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}

          ????????????????if?((Integer)?properties.getHeaders().get("num")?==?0)?{
          ????????????????????channel.basicNack(envelope.getDeliveryTag(),?false,?true);
          ????????????????}?else?{
          ????????????????????channel.basicAck(envelope.getDeliveryTag(),?false);
          ????????????????}
          ????????????}
          ????????};

          ????????//6.?設(shè)置?Channel?消費(fèi)者綁定隊(duì)列
          ????????channel.basicConsume(queueName,?false,?consumer);

          ????}
          }

          我們此處只關(guān)心消費(fèi)端輸出,可以看到第 0 條消費(fèi)失敗重新回到隊(duì)列尾部消費(fèi)。

          ?[x]?Received?'this?is?ack?msg:1'
          ?[x]?Received?'this?is?ack?msg:2'
          ?[x]?Received?'this?is?ack?msg:3'
          ?[x]?Received?'this?is?ack?msg:4'
          ?[x]?Received?'this?is?ack?msg:0'
          ?[x]?Received?'this?is?ack?msg:0'
          ?[x]?Received?'this?is?ack?msg:0'
          ?[x]?Received?'this?is?ack?msg:0'
          ?[x]?Received?'this?is?ack?msg:0'






          關(guān)注Java技術(shù)??锤喔韶?/strong>



          戳原文,獲取精選面試題!
          瀏覽 60
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  91五月婷婷华人网站 | 青青草wwwwwwwww | 在线天堂在线 | 大鸡吧日逼网站 | 麻豆影音先锋 |