RabbitMQ 如何對消費端限流?
點擊關(guān)注公眾號,Java干貨及時送達
1. 為什么要對消費端限流
假設(shè)一個場景,首先,我們 RabbitMQ 服務(wù)器積壓了有上萬條未處理的消息,我們隨便打開一個消費者客戶端,會出現(xiàn)這樣情況: 巨量的消息瞬間全部推送過來,但是我們單個客戶端無法同時處理這么多數(shù)據(jù)!
當數(shù)據(jù)量特別大的時候,我們對生產(chǎn)端限流肯定是不科學的,因為有時候并發(fā)量就是特別大,有時候并發(fā)量又特別少,我們無法約束生產(chǎn)端,這是用戶的行為。
所以我們應該對消費端限流,用于保持消費端的穩(wěn)定,當消息數(shù)量激增的時候很有可能造成資源耗盡,以及影響服務(wù)的性能,導致系統(tǒng)的卡頓甚至直接崩潰。
2.限流的 API 講解
RabbitMQ 提供了一種 qos (服務(wù)質(zhì)量保證)功能,即在非自動確認消息的前提下,如果一定數(shù)目的消息(通過基于 consume 或者 channel 設(shè)置 Qos 的值)未被確認前,不進行消費新的消息。
/**
* Request specific "quality of service" settings.
* These settings impose limits on the amount of data the server
* will deliver to consumers before requiring acknowledgements.
* Thus they provide a means of consumer-initiated flow control.
* @param prefetchSize maximum amount of content (measured in
* octets) that the server will deliver, 0 if unlimited
* @param prefetchCount maximum number of messages that the server
* will deliver, 0 if unlimited
* @param global true if the settings should be applied to the
* entire channel rather than each consumer
* @throws java.io.IOException if an error is encountered
*/
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
prefetchSize:0,單條消息大小限制,0代表不限制 prefetchCount:一次性消費的消息數(shù)量。會告訴 RabbitMQ 不要同時給一個消費者推送多于 N 個消息,即一旦有 N 個消息還沒有 ack,則該 consumer 將 block 掉,直到有消息 ack。 global:true、false 是否將上面設(shè)置應用于 channel,簡單點說,就是上面限制是 channel 級別的還是 consumer 級別。當我們設(shè)置為 false 的時候生效,設(shè)置為 true 的時候沒有了限流功能,因為 channel 級別尚未實現(xiàn)。 注意:prefetchSize 和 global 這兩項,rabbitmq 沒有實現(xiàn),暫且不研究。特別注意一點,prefetchCount 在 no_ask=false 的情況下才生效,即在自動應答的情況下這兩個值是不生效的。
3.如何對消費端進行限流
首先第一步,我們既然要使用消費端限流,我們需要關(guān)閉自動 ack,將 autoAck 設(shè)置為 false channel.basicConsume(queueName, false, consumer);第二步我們來設(shè)置具體的限流大小以及數(shù)量。 channel.basicQos(0, 15, false);第三步在消費者的 handleDelivery 消費方法中手動 ack,并且設(shè)置批量處理 ack 回應為 true channel.basicAck(envelope.getDeliveryTag(), true);
這是生產(chǎn)端代碼,與前幾章的生產(chǎn)端代碼沒有做任何改變,主要的操作集中在消費端。RabbitMQ 系列面試題我都整理好了,關(guān)注公眾號Java技術(shù)棧,回復:面試,免費獲取哦。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class QosProducer {
public static void main(String[] args) throws Exception {
//1. 創(chuàng)建一個 ConnectionFactory 并進行設(shè)置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//2. 通過連接工廠來創(chuàng)建連接
Connection connection = factory.newConnection();
//3. 通過 Connection 來創(chuàng)建 Channel
Channel channel = connection.createChannel();
//4. 聲明
String exchangeName = "test_qos_exchange";
String routingKey = "item.add";
//5. 發(fā)送
String msg = "this is qos msg";
for (int i = 0; i < 10; i++) {
String tem = msg + " : " + i;
channel.basicPublish(exchangeName, routingKey, null, tem.getBytes());
System.out.println("Send message : " + tem);
}
//6. 關(guān)閉連接
channel.close();
connection.close();
}
}
這里我們創(chuàng)建一個消費者,通過以下代碼來驗證限流效果以及 global 參數(shù)設(shè)置為 true 時不起作用。我們通過Thread.sleep(5000); 來讓 ack 即處理消息的過程慢一些,這樣我們就可以從后臺管理工具中清晰觀察到限流情況。
import com.rabbitmq.client.*;
import java.io.IOException;
public class QosConsumer {
public static void main(String[] args) throws Exception {
//1. 創(chuàng)建一個 ConnectionFactory 并進行設(shè)置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(3000);
//2. 通過連接工廠來創(chuàng)建連接
Connection connection = factory.newConnection();
//3. 通過 Connection 來創(chuàng)建 Channel
final Channel channel = connection.createChannel();
//4. 聲明
String exchangeName = "test_qos_exchange";
String queueName = "test_qos_queue";
String routingKey = "item.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.basicQos(0, 3, false);
//一般不用代碼綁定,在管理界面手動綁定
channel.queueBind(queueName, exchangeName, routingKey);
//5. 創(chuàng)建消費者并接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(body, "UTF-8");
System.out.println("[x] Received '" + message + "'");
channel.basicAck(envelope.getDeliveryTag(), true);
}
};
//6. 設(shè)置 Channel 消費者綁定隊列
channel.basicConsume(queueName, false, consumer);
channel.basicConsume(queueName, false, consumer1);
}
}
我們從下圖中發(fā)現(xiàn) Unacked值一直都是 3 ,每過 5 秒 消費一條消息即 Ready 和 Total 都減少 3,而 Unacked的值在這里代表消費者正在處理的消息,通過我們的實驗發(fā)現(xiàn)了消費者一次性最多處理 3 條消息,達到了消費者限流的預期功能。

當我們將void basicQos(int prefetchSize, int prefetchCount, boolean global)中的 global 設(shè)置為 true的時候我們發(fā)現(xiàn)并沒有了限流的作用。






關(guān)注Java技術(shù)棧看更多干貨


