面試官:說(shuō)說(shuō)RabbitMQ的消費(fèi)端限流、TTL、死信隊(duì)列
你知道的越多,不知道的就越多,業(yè)余的像一棵小草!
你來(lái),我們一起精進(jìn)!你不來(lái),我和你的競(jìng)爭(zhēng)對(duì)手一起精進(jìn)!
編輯:業(yè)余草
cnblogs.com/Jacian/p/14212401.html
推薦:https://www.xttblog.com/?p=5160
關(guān)于消息隊(duì)列,我們不僅要會(huì)用,還要能說(shuō)出它的設(shè)計(jì)實(shí)現(xiàn)原理,這也是在面試中經(jīng)常被問(wèn)到的一些經(jīng)典面試題。下面通過(guò)本文,我們一起來(lái)重新認(rèn)識(shí)一下RabbitMQ的消費(fèi)端限流、TTL、死信隊(duì)列。
消費(fèi)端限流
1. 為什么要對(duì)消費(fèi)端限流
2.限流的 api 講解
/**
* Request specific "quality of service" settings.
* These settings impose limits on the amount of data the server
* will deliver to consumers before requiring acknowledgements.
* Thus they provide a means of consumer-initiated flow control.
* @param prefetchSize maximum amount of content (measured in
* octets) that the server will deliver, 0 if unlimited
* @param prefetchCount maximum number of messages that the server
* will deliver, 0 if unlimited
* @param global true if the settings should be applied to the
* entire channel rather than each consumer
* @throws java.io.IOException if an error is encountered
*/
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;prefetchSize:0,單條消息大小限制,0代表不限制 prefetchCount:一次性消費(fèi)的消息數(shù)量。會(huì)告訴 RabbitMQ 不要同時(shí)給一個(gè)消費(fèi)者推送多于 N 個(gè)消息,即一旦有 N 個(gè)消息還沒(méi)有 ack,則該 consumer 將 block 掉,直到有消息 ack。 global:true、false 是否將上面設(shè)置應(yīng)用于 channel,簡(jiǎn)單點(diǎn)說(shuō),就是上面限制是 channel 級(jí)別的還是 consumer 級(jí)別。當(dāng)我們?cè)O(shè)置為 false 的時(shí)候生效,設(shè)置為 true 的時(shí)候沒(méi)有了限流功能,因?yàn)?channel 級(jí)別尚未實(shí)現(xiàn)。 注意:prefetchSize 和 global 這兩項(xiàng),rabbitmq 沒(méi)有實(shí)現(xiàn),暫且不研究。特別注意一點(diǎn),prefetchCount 在 no_ask=false 的情況下才生效,即在自動(dòng)應(yīng)答的情況下這兩個(gè)值是不生效的。
3.如何對(duì)消費(fèi)端進(jìn)行限流
首先第一步,我們既然要使用消費(fèi)端限流,我們需要關(guān)閉自動(dòng) ack,將 autoAck 設(shè)置為 falsechannel.basicConsume(queueName, false, consumer); 第二步我們來(lái)設(shè)置具體的限流大小以及數(shù)量。channel.basicQos(0, 15, false); 第三步在消費(fèi)者的 handleDelivery 消費(fèi)方法中手動(dòng) ack,并且設(shè)置批量處理 ack 回應(yīng)為 truechannel.basicAck(envelope.getDeliveryTag(), true);
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class QosProducer {
public static void main(String[] args) throws Exception {
//1. 創(chuàng)建一個(gè) ConnectionFactory 并進(jìn)行設(shè)置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//2. 通過(guò)連接工廠來(lái)創(chuàng)建連接
Connection connection = factory.newConnection();
//3. 通過(guò) Connection 來(lái)創(chuàng)建 Channel
Channel channel = connection.createChannel();
//4. 聲明
String exchangeName = "test_qos_exchange";
String routingKey = "item.add";
//5. 發(fā)送
String msg = "this is qos msg";
for (int i = 0; i < 10; i++) {
String tem = msg + " : " + i;
channel.basicPublish(exchangeName, routingKey, null, tem.getBytes());
System.out.println("Send message : " + tem);
}
//6. 關(guān)閉連接
channel.close();
connection.close();
}
}import com.rabbitmq.client.*;
import java.io.IOException;
public class QosConsumer {
public static void main(String[] args) throws Exception {
//1. 創(chuàng)建一個(gè) ConnectionFactory 并進(jìn)行設(shè)置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(3000);
//2. 通過(guò)連接工廠來(lái)創(chuàng)建連接
Connection connection = factory.newConnection();
//3. 通過(guò) Connection 來(lái)創(chuàng)建 Channel
final Channel channel = connection.createChannel();
//4. 聲明
String exchangeName = "test_qos_exchange";
String queueName = "test_qos_queue";
String routingKey = "item.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.basicQos(0, 3, false);
//一般不用代碼綁定,在管理界面手動(dòng)綁定
channel.queueBind(queueName, exchangeName, routingKey);
//5. 創(chuàng)建消費(fèi)者并接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(body, "UTF-8");
System.out.println("[x] Received '" + message + "'");
channel.basicAck(envelope.getDeliveryTag(), true);
}
};
//6. 設(shè)置 Channel 消費(fèi)者綁定隊(duì)列
channel.basicConsume(queueName, false, consumer);
channel.basicConsume(queueName, false, consumer1);
}
}
TTL
RabbitMQ支持隊(duì)列的過(guò)期時(shí)間,從消息入隊(duì)列開(kāi)始計(jì)算,只要超過(guò)了隊(duì)列的超時(shí)時(shí)間配置,那么消息會(huì)自動(dòng)的清除。
1.消息的 TTL
/**
* deliverMode 設(shè)置為 2 的時(shí)候代表持久化消息
* expiration 意思是設(shè)置消息的有效期,超過(guò)10秒沒(méi)有被消費(fèi)者接收后會(huì)被自動(dòng)刪除
* headers 自定義的一些屬性
* */
//5. 發(fā)送
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("myhead1", "111");
headers.put("myhead2", "222");
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("100000")
.headers(headers)
.build();
String msg = "test message";
channel.basicPublish("", queueName, properties, msg.getBytes());
2.隊(duì)列的 TTL

死信隊(duì)列
a.消息被拒絕(basic.reject/ basic.nack)并且不再重新投遞 requeue=false b.TTL(time-to-live) 消息超時(shí)未消費(fèi) c.達(dá)到最大隊(duì)列長(zhǎng)度
實(shí)現(xiàn)死信隊(duì)列步驟
首先需要設(shè)置死信隊(duì)列的 exchange 和 queue,然后進(jìn)行綁定:
Exchange: dlx.exchange
Queue: dlx.queue
RoutingKey: # 代表接收所有路由 key然后我們進(jìn)行正常聲明交換機(jī)、隊(duì)列、綁定,只不過(guò)我們需要在普通隊(duì)列加上一個(gè)參數(shù)即可: arguments.put("x-dead-letter-exchange",' dlx.exchange' ) 這樣消息在過(guò)期、requeue失敗、 隊(duì)列在達(dá)到最大長(zhǎng)度時(shí),消息就可以直接路由到死信隊(duì)列!
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class DlxProducer {
public static void main(String[] args) throws Exception {
//設(shè)置連接以及創(chuàng)建 channel 湖綠
String exchangeName = "test_dlx_exchange";
String routingKey = "item.update";
String msg = "this is dlx msg";
//我們?cè)O(shè)置消息過(guò)期時(shí)間,10秒后再消費(fèi) 讓消息進(jìn)入死信隊(duì)列
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.expiration("10000")
.build();
channel.basicPublish(exchangeName, routingKey, true, properties, msg.getBytes());
System.out.println("Send message : " + msg);
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class DlxConsumer {
public static void main(String[] args) throws Exception {
//創(chuàng)建連接、創(chuàng)建channel忽略 內(nèi)容可以在上面代碼中獲取
String exchangeName = "test_dlx_exchange";
String queueName = "test_dlx_queue";
String routingKey = "item.#";
//必須設(shè)置參數(shù)到 arguments 中
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", "dlx.exchange");
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
//將 arguments 放入隊(duì)列的聲明中
channel.queueDeclare(queueName, true, false, false, arguments);
//一般不用代碼綁定,在管理界面手動(dòng)綁定
channel.queueBind(queueName, exchangeName, routingKey);
//聲明死信隊(duì)列
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
//路由鍵為 # 代表可以路由到所有消息
channel.queueBind("dlx.queue", "dlx.exchange", "#");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
//6. 設(shè)置 Channel 消費(fèi)者綁定隊(duì)列
channel.basicConsume(queueName, true, consumer);
}
}總結(jié)
評(píng)論
圖片
表情
