RabbitMQ 如何對消費(fèi)端限流?
1. 為什么要對消費(fèi)端限流
假設(shè)一個場景,首先,我們 RabbitMQ 服務(wù)器積壓了有上萬條未處理的消息,我們隨便打開一個消費(fèi)者客戶端,會出現(xiàn)這樣情況: 巨量的消息瞬間全部推送過來,但是我們單個客戶端無法同時處理這么多數(shù)據(jù)!
當(dāng)數(shù)據(jù)量特別大的時候,我們對生產(chǎn)端限流肯定是不科學(xué)的,因?yàn)橛袝r候并發(fā)量就是特別大,有時候并發(fā)量又特別少,我們無法約束生產(chǎn)端,這是用戶的行為。
所以我們應(yīng)該對消費(fèi)端限流,用于保持消費(fèi)端的穩(wěn)定,當(dāng)消息數(shù)量激增的時候很有可能造成資源耗盡,以及影響服務(wù)的性能,導(dǎo)致系統(tǒng)的卡頓甚至直接崩潰。
2.限流的 API 講解
RabbitMQ 提供了一種 qos (服務(wù)質(zhì)量保證)功能,即在非自動確認(rèn)消息的前提下,如果一定數(shù)目的消息(通過基于 consume 或者 channel 設(shè)置 Qos 的值)未被確認(rèn)前,不進(jìn)行消費(fèi)新的消息。
/**
* Request specific "quality of service" settings.
* These settings impose limits on the amount of data the server
* will deliver to consumers before requiring acknowledgements.
* Thus they provide a means of consumer-initiated flow control.
* @param prefetchSize maximum amount of content (measured in
* octets) that the server will deliver, 0 if unlimited
* @param prefetchCount maximum number of messages that the server
* will deliver, 0 if unlimited
* @param global true if the settings should be applied to the
* entire channel rather than each consumer
* @throws java.io.IOException if an error is encountered
*/
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
prefetchSize:0,單條消息大小限制,0代表不限制 prefetchCount:一次性消費(fèi)的消息數(shù)量。會告訴 RabbitMQ 不要同時給一個消費(fèi)者推送多于 N 個消息,即一旦有 N 個消息還沒有 ack,則該 consumer 將 block 掉,直到有消息 ack。 global:true、false 是否將上面設(shè)置應(yīng)用于 channel,簡單點(diǎn)說,就是上面限制是 channel 級別的還是 consumer 級別。當(dāng)我們設(shè)置為 false 的時候生效,設(shè)置為 true 的時候沒有了限流功能,因?yàn)?channel 級別尚未實(shí)現(xiàn)。 注意:prefetchSize 和 global 這兩項(xiàng),rabbitmq 沒有實(shí)現(xiàn),暫且不研究。特別注意一點(diǎn),prefetchCount 在 no_ask=false 的情況下才生效,即在自動應(yīng)答的情況下這兩個值是不生效的。
3.如何對消費(fèi)端進(jìn)行限流
首先第一步,我們既然要使用消費(fèi)端限流,我們需要關(guān)閉自動 ack,將 autoAck 設(shè)置為 false channel.basicConsume(queueName, false, consumer);第二步我們來設(shè)置具體的限流大小以及數(shù)量。 channel.basicQos(0, 15, false);第三步在消費(fèi)者的 handleDelivery 消費(fèi)方法中手動 ack,并且設(shè)置批量處理 ack 回應(yīng)為 true channel.basicAck(envelope.getDeliveryTag(), true);
這是生產(chǎn)端代碼,與前幾章的生產(chǎn)端代碼沒有做任何改變,主要的操作集中在消費(fèi)端。RabbitMQ 系列面試題我都整理好了,關(guān)注公眾號Java技術(shù)棧,回復(fù):面試,免費(fèi)獲取哦。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class QosProducer {
public static void main(String[] args) throws Exception {
//1. 創(chuàng)建一個 ConnectionFactory 并進(jìn)行設(shè)置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//2. 通過連接工廠來創(chuàng)建連接
Connection connection = factory.newConnection();
//3. 通過 Connection 來創(chuàng)建 Channel
Channel channel = connection.createChannel();
//4. 聲明
String exchangeName = "test_qos_exchange";
String routingKey = "item.add";
//5. 發(fā)送
String msg = "this is qos msg";
for (int i = 0; i < 10; i++) {
String tem = msg + " : " + i;
channel.basicPublish(exchangeName, routingKey, null, tem.getBytes());
System.out.println("Send message : " + tem);
}
//6. 關(guān)閉連接
channel.close();
connection.close();
}
}
這里我們創(chuàng)建一個消費(fèi)者,通過以下代碼來驗(yàn)證限流效果以及 global 參數(shù)設(shè)置為 true 時不起作用。我們通過Thread.sleep(5000); 來讓 ack 即處理消息的過程慢一些,這樣我們就可以從后臺管理工具中清晰觀察到限流情況。
import com.rabbitmq.client.*;
import java.io.IOException;
public class QosConsumer {
public static void main(String[] args) throws Exception {
//1. 創(chuàng)建一個 ConnectionFactory 并進(jìn)行設(shè)置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(3000);
//2. 通過連接工廠來創(chuàng)建連接
Connection connection = factory.newConnection();
//3. 通過 Connection 來創(chuàng)建 Channel
final Channel channel = connection.createChannel();
//4. 聲明
String exchangeName = "test_qos_exchange";
String queueName = "test_qos_queue";
String routingKey = "item.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.basicQos(0, 3, false);
//一般不用代碼綁定,在管理界面手動綁定
channel.queueBind(queueName, exchangeName, routingKey);
//5. 創(chuàng)建消費(fèi)者并接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(body, "UTF-8");
System.out.println("[x] Received '" + message + "'");
channel.basicAck(envelope.getDeliveryTag(), true);
}
};
//6. 設(shè)置 Channel 消費(fèi)者綁定隊(duì)列
channel.basicConsume(queueName, false, consumer);
channel.basicConsume(queueName, false, consumer1);
}
}
我們從下圖中發(fā)現(xiàn) Unacked值一直都是 3 ,每過 5 秒 消費(fèi)一條消息即 Ready 和 Total 都減少 3,而 Unacked的值在這里代表消費(fèi)者正在處理的消息,通過我們的實(shí)驗(yàn)發(fā)現(xiàn)了消費(fèi)者一次性最多處理 3 條消息,達(dá)到了消費(fèi)者限流的預(yù)期功能。

當(dāng)我們將void basicQos(int prefetchSize, int prefetchCount, boolean global)中的 global 設(shè)置為 true的時候我們發(fā)現(xiàn)并沒有了限流的作用。
獲取更多優(yōu)質(zhì)文章,點(diǎn)擊關(guān)注
??????
