MQ的路由模式(direct)| RabbitMQ系列
前言
模擬一個場景
現(xiàn)在有三種級別的日志 info、wearing、error
現(xiàn)在想把error日志給保存到本地, info和wearing直接丟棄掉(就是直接消費,不作任何處理,不消費也不行呀,就形成積壓了)
這樣一想,我們是不是可以定義一個消費者綁定專門處理保存error日志,另一個消費者綁定info和wearing直接消費,不作任何處理
一、生產(chǎn)者
public static void publishMessageIndividually() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(ChangeNameConstant.DIRECT_MODEL, BuiltinExchangeType.DIRECT);
//創(chuàng)建多個 bindingKey
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("info","普通 info 信息");
bindingKeyMap.put("warning","警告 warning 信息");
bindingKeyMap.put("error","錯誤 error 信息");
//debug 沒有消費這接收這個消息 所有就丟失了
bindingKeyMap.put("debug","調(diào)試 debug 信息");
for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){
String bindingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(ChangeNameConstant.DIRECT_MODEL,bindingKey, null,
message.getBytes("UTF-8"));
System.out.println("生產(chǎn)者發(fā)出消息:" + message);
}
}
復制代碼
可以看到:direct_pattern交換機上設(shè)置了三個路由
二、消費者
消費者A
/**
* 這是一個測試的消費者
*@author DingYongJun
*@date 2021/8/1
*/
public class DyConsumerTest_direct01 {
public static void main(String[] args) throws Exception{
//使用工具類來創(chuàng)建通道
Channel channel = RabbitMqUtils.getChannel();
String queueName = "disk";
channel.queueDeclare(queueName, false, false, false, null);
//這個專門處理error日志,將其保存至本地
channel.queueBind(queueName, ChangeNameConstant.DIRECT_MODEL, "error");
System.out.println("A等待接收消息,把接收到的消息打印在屏幕.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(message+"已經(jīng)保存到本地啦");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
System.out.println("消息中斷了~");
});
}
}
復制代碼消費者B
/**
* 這是一個測試的消費者
*@author DingYongJun
*@date 2021/8/1
*/
public class DyConsumerTest_direct02 {
public static void main(String[] args) throws Exception{
//使用工具類來創(chuàng)建通道
Channel channel = RabbitMqUtils.getChannel();
String queueName = "console";
channel.queueDeclare(queueName, false, false, false, null);
//這個專門處理error日志,將其保存至本地
channel.queueBind(queueName, ChangeNameConstant.DIRECT_MODEL, "warning");
channel.queueBind(queueName, ChangeNameConstant.DIRECT_MODEL, "info");
System.out.println("B等待接收消息,把接收到的消息打印在屏幕.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(message+"已經(jīng)消費完并丟棄了");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
System.out.println("消息中斷了~");
});
}
}
復制代碼
消費者AB都已準備好。
執(zhí)行結(jié)果
生產(chǎn)者

消費者A

消費者B

完美符合我們模擬的場景需求!vnice!
三、總結(jié)

多重綁定
在這種綁定情況下,生產(chǎn)者發(fā)布消息到 exchange 上,綁定鍵為 orange 的消息會被發(fā)布到隊列 Q1。
綁定鍵為 black、green 和的消息會被發(fā)布到隊列 Q2,其他消息類型的消息將被丟棄。
是不是比發(fā)布訂閱模式更加智能了呢?
當然如果 exchange 的綁定類型是 direct,但是它綁定的多個隊列的 key 如果都相同。
在這種情 況下雖然綁定類型是 direct 但是它表現(xiàn)的就和 fanout 有點類似了,就跟廣播差不多。
也就是這玩意是復雜模式可以向下兼容簡單模式!
路漫漫其修遠兮,吾必將上下求索~
如果你認為i博主寫的不錯!寫作不易,請點贊、關(guān)注、評論給博主一個鼓勵吧~hahah
作者:大魚丶
鏈接:https://juejin.cn/post/6995709405085827108
來源:掘金
著作權(quán)歸作者所有。商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請注明出處。
