<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          RabbitMQ 如何對消費(fèi)端限流?

          共 9946字,需瀏覽 20分鐘

           ·

          2021-03-26 14:30

          作者:海向
          出處:www.cnblogs.com/haixiang/p/10905189.html

          1. 為什么要對消費(fèi)端限流

          假設(shè)一個場景,首先,我們 RabbitMQ 服務(wù)器積壓了有上萬條未處理的消息,我們隨便打開一個消費(fèi)者客戶端,會出現(xiàn)這樣情況: 巨量的消息瞬間全部推送過來,但是我們單個客戶端無法同時處理這么多數(shù)據(jù)!

          當(dāng)數(shù)據(jù)量特別大的時候,我們對生產(chǎn)端限流肯定是不科學(xué)的,因?yàn)橛袝r候并發(fā)量就是特別大,有時候并發(fā)量又特別少,我們無法約束生產(chǎn)端,這是用戶的行為。

          所以我們應(yīng)該對消費(fèi)端限流,用于保持消費(fèi)端的穩(wěn)定,當(dāng)消息數(shù)量激增的時候很有可能造成資源耗盡,以及影響服務(wù)的性能,導(dǎo)致系統(tǒng)的卡頓甚至直接崩潰。

          2.限流的 API 講解

          RabbitMQ 提供了一種 qos (服務(wù)質(zhì)量保證)功能,即在非自動確認(rèn)消息的前提下,如果一定數(shù)目的消息(通過基于 consume 或者 channel 設(shè)置 Qos 的值)未被確認(rèn)前,不進(jìn)行消費(fèi)新的消息。

          /**
          * Request specific "quality of service" settings.
          * These settings impose limits on the amount of data the server
          * will deliver to consumers before requiring acknowledgements.
          * Thus they provide a means of consumer-initiated flow control.
          @param prefetchSize maximum amount of content (measured in
          * octets) that the server will deliver, 0 if unlimited
          @param prefetchCount maximum number of messages that the server
          * will deliver, 0 if unlimited
          @param global true if the settings should be applied to the
          * entire channel rather than each consumer
          @throws java.io.IOException if an error is encountered
          */

          void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
          • prefetchSize:0,單條消息大小限制,0代表不限制
          • prefetchCount:一次性消費(fèi)的消息數(shù)量。會告訴 RabbitMQ 不要同時給一個消費(fèi)者推送多于 N 個消息,即一旦有 N 個消息還沒有 ack,則該 consumer 將 block 掉,直到有消息 ack。
          • global:true、false 是否將上面設(shè)置應(yīng)用于 channel,簡單點(diǎn)說,就是上面限制是 channel 級別的還是 consumer 級別。當(dāng)我們設(shè)置為 false 的時候生效,設(shè)置為 true 的時候沒有了限流功能,因?yàn)?channel 級別尚未實(shí)現(xiàn)。
          • 注意:prefetchSize 和 global 這兩項(xiàng),rabbitmq 沒有實(shí)現(xiàn),暫且不研究。特別注意一點(diǎn),prefetchCount 在 no_ask=false 的情況下才生效,即在自動應(yīng)答的情況下這兩個值是不生效的。

          3.如何對消費(fèi)端進(jìn)行限流

          • 首先第一步,我們既然要使用消費(fèi)端限流,我們需要關(guān)閉自動 ack,將 autoAck 設(shè)置為 falsechannel.basicConsume(queueName, false, consumer);
          • 第二步我們來設(shè)置具體的限流大小以及數(shù)量。channel.basicQos(0, 15, false);
          • 第三步在消費(fèi)者的 handleDelivery 消費(fèi)方法中手動 ack,并且設(shè)置批量處理 ack 回應(yīng)為 truechannel.basicAck(envelope.getDeliveryTag(), true);

          這是生產(chǎn)端代碼,與前幾章的生產(chǎn)端代碼沒有做任何改變,主要的操作集中在消費(fèi)端。RabbitMQ 系列面試題我都整理好了,關(guān)注公眾號Java技術(shù)棧,回復(fù):面試,免費(fèi)獲取哦。

          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.Connection;
          import com.rabbitmq.client.ConnectionFactory;

          public class QosProducer {
              public static void main(String[] args) throws Exception {
                  //1. 創(chuàng)建一個 ConnectionFactory 并進(jìn)行設(shè)置
                  ConnectionFactory factory = new ConnectionFactory();
                  factory.setHost("localhost");
                  factory.setVirtualHost("/");
                  factory.setUsername("guest");
                  factory.setPassword("guest");

                  //2. 通過連接工廠來創(chuàng)建連接
                  Connection connection = factory.newConnection();

                  //3. 通過 Connection 來創(chuàng)建 Channel
                  Channel channel = connection.createChannel();

                  //4. 聲明
                  String exchangeName = "test_qos_exchange";
                  String routingKey = "item.add";

                  //5. 發(fā)送
                  String msg = "this is qos msg";
                  for (int i = 0; i < 10; i++) {
                      String tem = msg + " : " + i;
                      channel.basicPublish(exchangeName, routingKey, null, tem.getBytes());
                      System.out.println("Send message : " + tem);
                  }

                  //6. 關(guān)閉連接
                  channel.close();
                  connection.close();
              }
          }

          這里我們創(chuàng)建一個消費(fèi)者,通過以下代碼來驗(yàn)證限流效果以及 global 參數(shù)設(shè)置為 true 時不起作用。我們通過Thread.sleep(5000); 來讓 ack 即處理消息的過程慢一些,這樣我們就可以從后臺管理工具中清晰觀察到限流情況。

          import com.rabbitmq.client.*;
          import java.io.IOException;
          public class QosConsumer {
              public static void main(String[] args) throws Exception {
                  //1. 創(chuàng)建一個 ConnectionFactory 并進(jìn)行設(shè)置
                  ConnectionFactory factory = new ConnectionFactory();
                  factory.setHost("localhost");
                  factory.setVirtualHost("/");
                  factory.setUsername("guest");
                  factory.setPassword("guest");
                  factory.setAutomaticRecoveryEnabled(true);
                  factory.setNetworkRecoveryInterval(3000);

                  //2. 通過連接工廠來創(chuàng)建連接
                  Connection connection = factory.newConnection();

                  //3. 通過 Connection 來創(chuàng)建 Channel
                  final Channel channel = connection.createChannel();

                  //4. 聲明
                  String exchangeName = "test_qos_exchange";
                  String queueName = "test_qos_queue";
                  String routingKey = "item.#";
                  channel.exchangeDeclare(exchangeName, "topic"truefalsenull);
                  channel.queueDeclare(queueName, truefalsefalsenull);

                  channel.basicQos(03false);

                  //一般不用代碼綁定,在管理界面手動綁定
                  channel.queueBind(queueName, exchangeName, routingKey);

                  //5. 創(chuàng)建消費(fèi)者并接收消息
                  Consumer consumer = new DefaultConsumer(channel) {
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope,
                                                 AMQP.BasicProperties properties, byte[] body)

                              throws IOException 
          {
                          try {
                              Thread.sleep(5000);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                          String message = new String(body, "UTF-8");
                          System.out.println("[x] Received '" + message + "'");

                          channel.basicAck(envelope.getDeliveryTag(), true);
                      }
                  };
                  //6. 設(shè)置 Channel 消費(fèi)者綁定隊(duì)列
                  channel.basicConsume(queueName, false, consumer);
                  channel.basicConsume(queueName, false, consumer1);
              }
          }

          我們從下圖中發(fā)現(xiàn) Unacked值一直都是 3 ,每過 5 秒 消費(fèi)一條消息即 Ready 和 Total 都減少 3,而 Unacked的值在這里代表消費(fèi)者正在處理的消息,通過我們的實(shí)驗(yàn)發(fā)現(xiàn)了消費(fèi)者一次性最多處理 3 條消息,達(dá)到了消費(fèi)者限流的預(yù)期功能。

          當(dāng)我們將void basicQos(int prefetchSize, int prefetchCount, boolean global)中的 global 設(shè)置為 true的時候我們發(fā)現(xiàn)并沒有了限流的作用。

          獲取更多優(yōu)質(zhì)文章,點(diǎn)擊關(guān)注

          ??????


          瀏覽 60
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  www.水蜜桃视频 | 无码无套少妇毛多18PXXXX | 中国操逼视频网站 | 中文字幕国产 | 性一交一乱一A片久久99蜜桃 |