<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          面試官:說(shuō)說(shuō)RabbitMQ的消費(fèi)端限流、TTL、死信隊(duì)列

          共 9814字,需瀏覽 20分鐘

           ·

          2021-03-01 11:14

          你知道的越多,不知道的就越多,業(yè)余的像一棵小草!

          你來(lái),我們一起精進(jìn)!你不來(lái),我和你的競(jìng)爭(zhēng)對(duì)手一起精進(jìn)!

          編輯:業(yè)余草

          cnblogs.com/Jacian/p/14212401.html

          推薦:https://www.xttblog.com/?p=5160


          關(guān)于消息隊(duì)列,我們不僅要會(huì)用,還要能說(shuō)出它的設(shè)計(jì)實(shí)現(xiàn)原理,這也是在面試中經(jīng)常被問(wèn)到的一些經(jīng)典面試題。下面通過(guò)本文,我們一起來(lái)重新認(rèn)識(shí)一下RabbitMQ的消費(fèi)端限流、TTL、死信隊(duì)列。


          消費(fèi)端限流


          1. 為什么要對(duì)消費(fèi)端限流


          假設(shè)一個(gè)場(chǎng)景,首先,我們 Rabbitmq 服務(wù)器積壓了有上萬(wàn)條未處理的消息,我們隨便打開(kāi)一個(gè)消費(fèi)者客戶(hù)端,會(huì)出現(xiàn)這樣情況: 巨量的消息瞬間全部推送過(guò)來(lái),但是我們單個(gè)客戶(hù)端無(wú)法同時(shí)處理這么多數(shù)據(jù)!

          當(dāng)數(shù)據(jù)量特別大的時(shí)候,我們對(duì)生產(chǎn)端限流肯定是不科學(xué)的,因?yàn)橛袝r(shí)候并發(fā)量就是特別大,有時(shí)候并發(fā)量又特別少,我們無(wú)法約束生產(chǎn)端,這是用戶(hù)的行為。所以我們應(yīng)該對(duì)消費(fèi)端限流,用于保持消費(fèi)端的穩(wěn)定,當(dāng)消息數(shù)量激增的時(shí)候很有可能造成資源耗盡,以及影響服務(wù)的性能,導(dǎo)致系統(tǒng)的卡頓甚至直接崩潰。

          2.限流的 api 講解


          RabbitMQ 提供了一種 qos (服務(wù)質(zhì)量保證)功能,即在非自動(dòng)確認(rèn)消息的前提下,如果一定數(shù)目的消息(通過(guò)基于 consume 或者 channel 設(shè)置 Qos 的值)未被確認(rèn)前,不進(jìn)行消費(fèi)新的消息。
          /**
          * Request specific "quality of service" settings.
          * These settings impose limits on the amount of data the server
          * will deliver to consumers before requiring acknowledgements.
          * Thus they provide a means of consumer-initiated flow control.
          * @param prefetchSize maximum amount of content (measured in
          * octets) that the server will deliver, 0 if unlimited
          * @param prefetchCount maximum number of messages that the server
          * will deliver, 0 if unlimited
          * @param global true if the settings should be applied to the
          * entire channel rather than each consumer
          * @throws java.io.IOException if an error is encountered
          */

          void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;


          • prefetchSize:0,單條消息大小限制,0代表不限制


          • prefetchCount:一次性消費(fèi)的消息數(shù)量。會(huì)告訴 RabbitMQ 不要同時(shí)給一個(gè)消費(fèi)者推送多于 N 個(gè)消息,即一旦有 N 個(gè)消息還沒(méi)有 ack,則該 consumer 將 block 掉,直到有消息 ack。


          • global:true、false 是否將上面設(shè)置應(yīng)用于 channel,簡(jiǎn)單點(diǎn)說(shuō),就是上面限制是 channel 級(jí)別的還是 consumer 級(jí)別。當(dāng)我們?cè)O(shè)置為 false 的時(shí)候生效,設(shè)置為 true 的時(shí)候沒(méi)有了限流功能,因?yàn)?channel 級(jí)別尚未實(shí)現(xiàn)。


          • 注意:prefetchSize 和 global 這兩項(xiàng),rabbitmq 沒(méi)有實(shí)現(xiàn),暫且不研究。特別注意一點(diǎn),prefetchCount 在 no_ask=false 的情況下才生效,即在自動(dòng)應(yīng)答的情況下這兩個(gè)值是不生效的。

          3.如何對(duì)消費(fèi)端進(jìn)行限流


          • 首先第一步,我們既然要使用消費(fèi)端限流,我們需要關(guān)閉自動(dòng) ack,將 autoAck 設(shè)置為 falsechannel.basicConsume(queueName, false, consumer);


          • 第二步我們來(lái)設(shè)置具體的限流大小以及數(shù)量。channel.basicQos(0, 15, false);


          • 第三步在消費(fèi)者的 handleDelivery 消費(fèi)方法中手動(dòng) ack,并且設(shè)置批量處理 ack 回應(yīng)為 truechannel.basicAck(envelope.getDeliveryTag(), true);

          這是生產(chǎn)端代碼,與前幾章的生產(chǎn)端代碼沒(méi)有做任何改變,主要的操作集中在消費(fèi)端。
          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.Connection;
          import com.rabbitmq.client.ConnectionFactory;

          public class QosProducer {
          public static void main(String[] args) throws Exception {
          //1. 創(chuàng)建一個(gè) ConnectionFactory 并進(jìn)行設(shè)置
          ConnectionFactory factory = new ConnectionFactory();
          factory.setHost("localhost");
          factory.setVirtualHost("/");
          factory.setUsername("guest");
          factory.setPassword("guest");

          //2. 通過(guò)連接工廠來(lái)創(chuàng)建連接
          Connection connection = factory.newConnection();

          //3. 通過(guò) Connection 來(lái)創(chuàng)建 Channel
          Channel channel = connection.createChannel();

          //4. 聲明
          String exchangeName = "test_qos_exchange";
          String routingKey = "item.add";

          //5. 發(fā)送
          String msg = "this is qos msg";
          for (int i = 0; i < 10; i++) {
          String tem = msg + " : " + i;
          channel.basicPublish(exchangeName, routingKey, null, tem.getBytes());
          System.out.println("Send message : " + tem);
          }

          //6. 關(guān)閉連接
          channel.close();
          connection.close();
          }
          }


          這里我們創(chuàng)建一個(gè)消費(fèi)者,通過(guò)以下代碼來(lái)驗(yàn)證限流效果以及 global 參數(shù)設(shè)置為 true 時(shí)不起作用.。我們通過(guò)Thread.sleep(5000); 來(lái)讓 ack 即處理消息的過(guò)程慢一些,這樣我們就可以從后臺(tái)管理工具中清晰觀察到限流情況。
          import com.rabbitmq.client.*;
          import java.io.IOException;
          public class QosConsumer {
          public static void main(String[] args) throws Exception {
          //1. 創(chuàng)建一個(gè) ConnectionFactory 并進(jìn)行設(shè)置
          ConnectionFactory factory = new ConnectionFactory();
          factory.setHost("localhost");
          factory.setVirtualHost("/");
          factory.setUsername("guest");
          factory.setPassword("guest");
          factory.setAutomaticRecoveryEnabled(true);
          factory.setNetworkRecoveryInterval(3000);

          //2. 通過(guò)連接工廠來(lái)創(chuàng)建連接
          Connection connection = factory.newConnection();

          //3. 通過(guò) Connection 來(lái)創(chuàng)建 Channel
          final Channel channel = connection.createChannel();

          //4. 聲明
          String exchangeName = "test_qos_exchange";
          String queueName = "test_qos_queue";
          String routingKey = "item.#";
          channel.exchangeDeclare(exchangeName, "topic", true, false, null);
          channel.queueDeclare(queueName, true, false, false, null);

          channel.basicQos(0, 3, false);

          //一般不用代碼綁定,在管理界面手動(dòng)綁定
          channel.queueBind(queueName, exchangeName, routingKey);

          //5. 創(chuàng)建消費(fèi)者并接收消息
          Consumer consumer = new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope,
          AMQP.BasicProperties properties, byte[] body)
          throws IOException {
          try {
          Thread.sleep(5000);
          } catch (InterruptedException e) {
          e.printStackTrace();
          }
          String message = new String(body, "UTF-8");
          System.out.println("[x] Received '" + message + "'");

          channel.basicAck(envelope.getDeliveryTag(), true);
          }
          };
          //6. 設(shè)置 Channel 消費(fèi)者綁定隊(duì)列
          channel.basicConsume(queueName, false, consumer);
          channel.basicConsume(queueName, false, consumer1);
          }
          }


          我們從下圖中發(fā)現(xiàn) Unacked值一直都是 3 ,每過(guò) 5 秒 消費(fèi)一條消息即 Ready 和 Total 都減少 3,而 Unacked的值在這里代表消費(fèi)者正在處理的消息,通過(guò)我們的實(shí)驗(yàn)發(fā)現(xiàn)了消費(fèi)者一次性最多處理 3 條消息,達(dá)到了消費(fèi)者限流的預(yù)期功能。

          當(dāng)我們將void basicQos(int prefetchSize, int prefetchCount, boolean global)中的 global 設(shè)置為 true的時(shí)候我們發(fā)現(xiàn)并沒(méi)有了限流的作用。

          TTL


          TTL是Time To Live的縮寫(xiě),也就是生存時(shí)間。RabbitMQ支持消息的過(guò)期時(shí)間,在消息發(fā)送時(shí)可以進(jìn)行指定。

          RabbitMQ支持隊(duì)列的過(guò)期時(shí)間,從消息入隊(duì)列開(kāi)始計(jì)算,只要超過(guò)了隊(duì)列的超時(shí)時(shí)間配置,那么消息會(huì)自動(dòng)的清除。

          這與 Redis 中的過(guò)期時(shí)間概念類(lèi)似。我們應(yīng)該合理使用 TTL 技術(shù),可以有效的處理過(guò)期垃圾消息,從而降低服務(wù)器的負(fù)載,最大化的發(fā)揮服務(wù)器的性能。

          RabbitMQ allows you to set TTL (time to live) for both messages and queues. This can be done using optional queue arguments or policies (the latter option is recommended). Message TTL can be enforced for a single queue, a group of queues or applied for individual messages.

          RabbitMQ允許您為消息和隊(duì)列設(shè)置TTL(生存時(shí)間)。 這可以使用可選的隊(duì)列參數(shù)或策略來(lái)完成(建議使用后一個(gè)選項(xiàng))。 可以對(duì)單個(gè)隊(duì)列,一組隊(duì)列強(qiáng)制執(zhí)行消息TTL,也可以為單個(gè)消息應(yīng)用消息TTL。


          ——摘自 RabbitMQ 官方文檔


          1.消息的 TTL


          我們?cè)谏a(chǎn)端發(fā)送消息的時(shí)候可以在 properties 中指定 expiration屬性來(lái)對(duì)消息過(guò)期時(shí)間進(jìn)行設(shè)置,單位為毫秒(ms)。
          /**
          * deliverMode 設(shè)置為 2 的時(shí)候代表持久化消息
          * expiration 意思是設(shè)置消息的有效期,超過(guò)10秒沒(méi)有被消費(fèi)者接收后會(huì)被自動(dòng)刪除
          * headers 自定義的一些屬性
          * */

          //5. 發(fā)送
          Map<String, Object> headers = new HashMap<String, Object>();
          headers.put("myhead1", "111");
          headers.put("myhead2", "222");

          AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
          .deliveryMode(2)
          .contentEncoding("UTF-8")
          .expiration("100000")
          .headers(headers)
          .build();
          String msg = "test message";
          channel.basicPublish("", queueName, properties, msg.getBytes());

          我們也可以后臺(tái)管理頁(yè)面中進(jìn)入 Exchange 發(fā)送消息指定expiration

          2.隊(duì)列的 TTL


          我們也可以在后臺(tái)管理界面中新增一個(gè) queue,創(chuàng)建時(shí)可以設(shè)置 ttl,對(duì)于隊(duì)列中超過(guò)該時(shí)間的消息將會(huì)被移除。

          死信隊(duì)列


          死信隊(duì)列:沒(méi)有被及時(shí)消費(fèi)的消息存放的隊(duì)列

          消息沒(méi)有被及時(shí)消費(fèi)的原因:

          • a.消息被拒絕(basic.reject/ basic.nack)并且不再重新投遞 requeue=false


          • b.TTL(time-to-live) 消息超時(shí)未消費(fèi)


          • c.達(dá)到最大隊(duì)列長(zhǎng)度

          實(shí)現(xiàn)死信隊(duì)列步驟


          • 首先需要設(shè)置死信隊(duì)列的 exchange 和 queue,然后進(jìn)行綁定:

          Exchange: dlx.exchange
          Queue: dlx.queue
          RoutingKey: # 代表接收所有路由 key

          • 然后我們進(jìn)行正常聲明交換機(jī)、隊(duì)列、綁定,只不過(guò)我們需要在普通隊(duì)列加上一個(gè)參數(shù)即可: arguments.put("x-dead-letter-exchange",' dlx.exchange' )


          • 這樣消息在過(guò)期、requeue失敗、 隊(duì)列在達(dá)到最大長(zhǎng)度時(shí),消息就可以直接路由到死信隊(duì)列!

          import com.rabbitmq.client.AMQP;
          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.Connection;
          import com.rabbitmq.client.ConnectionFactory;
          public class DlxProducer {
          public static void main(String[] args) throws Exception {
          //設(shè)置連接以及創(chuàng)建 channel 湖綠
          String exchangeName = "test_dlx_exchange";
          String routingKey = "item.update";

          String msg = "this is dlx msg";

          //我們?cè)O(shè)置消息過(guò)期時(shí)間,10秒后再消費(fèi) 讓消息進(jìn)入死信隊(duì)列
          AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
          .deliveryMode(2)
          .expiration("10000")
          .build();

          channel.basicPublish(exchangeName, routingKey, true, properties, msg.getBytes());
          System.out.println("Send message : " + msg);

          channel.close();
          connection.close();
          }
          }


          import com.rabbitmq.client.*;
          import java.io.IOException;
          import java.util.HashMap;
          import java.util.Map;

          public class DlxConsumer {
          public static void main(String[] args) throws Exception {
          //創(chuàng)建連接、創(chuàng)建channel忽略 內(nèi)容可以在上面代碼中獲取
          String exchangeName = "test_dlx_exchange";
          String queueName = "test_dlx_queue";
          String routingKey = "item.#";

          //必須設(shè)置參數(shù)到 arguments 中
          Map<String, Object> arguments = new HashMap<String, Object>();
          arguments.put("x-dead-letter-exchange", "dlx.exchange");

          channel.exchangeDeclare(exchangeName, "topic", true, false, null);
          //將 arguments 放入隊(duì)列的聲明中
          channel.queueDeclare(queueName, true, false, false, arguments);

          //一般不用代碼綁定,在管理界面手動(dòng)綁定
          channel.queueBind(queueName, exchangeName, routingKey);


          //聲明死信隊(duì)列
          channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
          channel.queueDeclare("dlx.queue", true, false, false, null);
          //路由鍵為 # 代表可以路由到所有消息
          channel.queueBind("dlx.queue", "dlx.exchange", "#");

          Consumer consumer = new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope,
          AMQP.BasicProperties properties, byte[] body)
          throws IOException {

          String message = new String(body, "UTF-8");
          System.out.println(" [x] Received '" + message + "'");

          }
          };

          //6. 設(shè)置 Channel 消費(fèi)者綁定隊(duì)列
          channel.basicConsume(queueName, true, consumer);
          }
          }


          總結(jié)


          DLX也是一個(gè)正常的 Exchange,和一般的 Exchange 沒(méi)有區(qū)別,它能在任何的隊(duì)列上被指定,實(shí)際上就是設(shè)置某個(gè)隊(duì)列的屬性。當(dāng)這個(gè)隊(duì)列中有死信時(shí),RabbitMQ 就會(huì)自動(dòng)的將這個(gè)消息重新發(fā)布到設(shè)置的 Exchange 上去,進(jìn)而被路由到另一個(gè)隊(duì)列。可以監(jiān)聽(tīng)這個(gè)隊(duì)列中消息做相應(yīng)的處理。
          瀏覽 64
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  太原操逼网站 | 色老板网站 | 18禁免费看网站 | 亚洲中文视频在线观看 | 久久色免费视频 |