RabbitMQ五種工作模式
點擊上方藍色字體,選擇“標(biāo)星公眾號”
優(yōu)質(zhì)文章,第一時間送達
? 作者?|? howard4
來源 |? urlify.cn/UB3miu
在SpringBoot環(huán)境下做的代碼測試,RabbitMQ的包是用SpringBoot的starter-amqp包引入的。
1、簡單隊列

一個生產(chǎn)者對應(yīng)一個消費者?。?!
1、pom文件
SpringBoot導(dǎo)入rabbitmq 啟動包
????org.springframework.boot
????spring-boot-starter-amqp
2、工具類
/**
?*?〈簡述〉
?*?〈連接RabbitMQ的工具類〉
?*
?*?@create?2020/7/1
?*?@since?1.0.0
?*/
public?class?ConnectionUtil?{
????public?static?Connection?getConnection()?throws?Exception?{
????????return?getConnection(new?Properties());
????}
????private?static?Connection?getConnection(Properties?properties)?throws?Exception?{
????????return?getConnection(properties.getHost(),
????????????????properties.getPort(),
????????????????properties.getvHost(),
????????????????properties.getUserName(),
????????????????properties.getPassWord());
????}
????public?static?Connection?getConnection(String?host,?int?port,?String?vHost,?String?userName,?String?passWord)?throws?Exception?{
????????//1、定義連接工廠
????????ConnectionFactory?factory?=?new?ConnectionFactory();
????????//2、設(shè)置服務(wù)器地址
????????factory.setHost(host);
????????//3、設(shè)置端口
????????factory.setPort(port);
????????//4、設(shè)置虛擬主機、用戶名、密碼
????????factory.setVirtualHost(vHost);
????????factory.setUsername(userName);
????????factory.setPassword(passWord);
????????//5、通過連接工廠獲取連接
????????Connection?connection?=?factory.newConnection();
????????return?connection;
????}
????public?static?class?Properties?implements?Serializable?{
????????String?host?=?"192.168.1.103";
????????int?port?=?5672;
????????String?vHost?=??"/";
????????String?userName?=?"guest";
????????String?passWord?=?"guest";
????????public?Properties()?{
????????}
????????public?Properties(String?host,?int?port,?String?vHost,?String?userName,?String?passWord)?{
????????????this.host?=?host;
????????????this.port?=?port;
????????????this.vHost?=?vHost;
????????????this.userName?=?userName;
????????????this.passWord?=?passWord;
????????}
????????public?String?getHost()?{
????????????return?host;
????????}
????????public?Properties?setHost(String?host)?{
????????????this.host?=?host;
????????????return?self();
????????}
????????public?int?getPort()?{
????????????return?port;
????????}
????????public?Properties?setPort(int?port)?{
????????????this.port?=?port;
????????????return?self();
????????}
????????public?String?getvHost()?{
????????????return?vHost;
????????}
????????public?Properties?setvHost(String?vHost)?{
????????????this.vHost?=?vHost;
????????????return?self();
????????}
????????public?String?getUserName()?{
????????????return?userName;
????????}
????????public?Properties?setUserName(String?userName)?{
????????????this.userName?=?userName;
????????????return?self();
????????}
????????public?String?getPassWord()?{
????????????return?passWord;
????????}
????????public?Properties?setPassWord(String?passWord)?{
????????????this.passWord?=?passWord;
????????????return?self();
????????}
????????private?Properties?self(){
????????????return?this;
????????}
????}
}
3、生產(chǎn)者 Producer
/**
?*?〈簡述〉
?
?*?〈簡單隊列——消息生產(chǎn)者〉
?*
?*?@create?2020/7/1
?*?@since?1.0.0
?*/
public?class?Producer?{
????private?final?static?String?QUEUE_NAME?=?QueueName.test_simple_queue.toString();
????public?static?void?main(String[]?args)?throws?Exception?{
????????sendMessage();
????}
????public?static?void?sendMessage()?throws?Exception?{
????????//1、獲取連接
????????Connection?connection?=?ConnectionUtil.getConnection();
????????//2、聲明信道
????????Channel?channel?=?connection.createChannel();
????????//3、聲明(創(chuàng)建)隊列
????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
????????//4、定義消息內(nèi)容
????????String?message?=?"hello?rabbitmq?";
????????//5、發(fā)布消息
????????channel.basicPublish("",?QUEUE_NAME,?null,?message.getBytes());
????????System.out.println("[x]?Sent'"?+?message?+?"'");
????????//6、關(guān)閉通道
????????channel.close();
????????//7、關(guān)閉連接
????????connection.close();
????}
}
4、消費者Consumer
/**
?*?〈簡述〉
?*?〈消息消費者〉
?*
?*?@create?2020/7/1
?*?@since?1.0.0
?*/
public?class?Customer?{
????private?final?static?String?QUEUE_NAME?=?QueueName.test_simple_queue.toString();
????public?static?void?main(String[]?args)?throws?Exception?{
????????getMessage();
????}
????public?static?void?getMessage()?throws?Exception?{
????????//1、獲取連接
????????Connection?connection?=?ConnectionUtil.getConnection();
????????//2、聲明通道
????????Channel?channel?=?connection.createChannel();
????????//3、聲明隊列
????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
????????//4、定義隊列的消費者
????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,
???????????????????????????????????????AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????String?msgString?=?new?String(body,?"utf-8");
????????????????System.out.println("接收的消息:"?+?msgString);
????????????}
????????};
????????//5、監(jiān)聽隊列
?????/*
???true:表示自動確認,只要消息從隊列中獲取,無論消費者獲取到消息后是否成功消費,都會認為消息已經(jīng)成功消費
???false:表示手動確認,消費者獲取消息后,服務(wù)器會將該消息標(biāo)記為不可用狀態(tài),等待消費者的反饋,
????如果消費者一直沒有反饋,那么該消息將一直處于不可用狀態(tài),并且服務(wù)器會認為該消費者已經(jīng)掛掉,不會再給其
????發(fā)送消息,直到該消費者反饋。
???*/
????????channel.basicConsume(QUEUE_NAME,?true,?consumer);
????}
}
注意這里消費者有自動確認消息和手動確認消息兩種模式。
2、work 模式

一個生產(chǎn)者對應(yīng)多個消費者,但是一條消息只能有一個消費者獲得消息?。?!
2.1輪詢分發(fā)
1、生產(chǎn)者
/**
?*?〈簡述〉
?*?〈輪詢分發(fā)——生產(chǎn)者〉
?*
?*?@create?2020/7/3
?*?@since?1.0.0
?*/
public?class?Send?{
????private?static?final?String?QUEUE_NAME?=?QueueName.test_work_queue.toString();
????public?static?void?main(String?args[])?throws?Exception?{
????????//獲取連接
????????Connection?connection?=?ConnectionUtil.getConnection();
????????//獲取channel
????????Channel?channel?=?connection.createChannel();
????????//聲明隊列
????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
????????for?(int?i?=?0;?i?50;?i++)?{
????????????String?msg?=?"hello?"?+?i;
????????????System.out.println("[mq]?send:"?+?msg);
????????????channel.basicPublish("",?QUEUE_NAME,?null,?msg.getBytes());
????????????Thread.sleep(i?*?20);
????????}
????????channel.close();
????????connection.close();
????}
}
2、消費者
這里創(chuàng)建兩個消費者
消費者1:每接收一條消息后休眠1秒
/**
?*?〈簡述〉
?*?〈接收者〉
?*
?*?@create?2020/7/3
?*?@since?1.0.0
?*/
public?class?Receive1?{
????private?static?final?String?QUEUE_NAME?=?QueueName.test_work_queue.toString();
????public?static?void?main(String?args[])?throws?Exception?{
????????//獲取連接
????????Connection?connection?=?ConnectionUtil.getConnection();
????????//獲取channel、
????????Channel?channel?=?connection.createChannel();
????????//聲明隊列
????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
????????//定義一個消費這
????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????String?msg?=?new?String(body,?"utf-8");
????????????????System.out.println("[1]?Receive1?msg:"?+?msg);
????????????????try?{
????????????????????Thread.sleep(1000);
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}?finally?{
????????????????????System.out.println("[1]?done");
????????????????}
????????????}
????????};
????????boolean?autoAck?=?false;
????????channel.basicConsume(QUEUE_NAME,?autoAck,?consumer);
????}
}
消費者2:每接收一條消息后休眠2秒
/**
?*?〈簡述〉
?*?〈接收者〉
?*
?*?@create?2020/7/3
?*?@since?1.0.0
?*/
public?class?Receive2?{
????private?static?final?String?QUEUE_NAME?=?QueueName.test_work_queue.toString();
????public?static?void?main(String?args[])?throws?Exception?{
????????//獲取連接
????????Connection?connection?=?ConnectionUtil.getConnection();
????????//獲取channel
????????Channel?channel?=?connection.createChannel();
????????//聲明隊列
????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
????????//定義一個消費這
????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????String?msg?=?new?String(body,?"utf-8");
????????????????System.out.println("[2]?Receive2?msg:"?+?msg);
????????????????try?{
????????????????????Thread.sleep(2000);
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}?finally?{
????????????????????System.out.println("[2]?done");
????????????????}
????????????}
????????};
????????boolean?autoAck?=?false;
????????channel.basicConsume(QUEUE_NAME,?autoAck,?consumer);
????}
}
3、測試結(jié)果
生產(chǎn)者從0-49依次發(fā)送消息

消費者1:接收到偶數(shù)

消費者2:接收到奇數(shù)

4、結(jié)論
輪詢分發(fā)就是將消息隊列中的消息,依次發(fā)送給所有消費者。一個消息只能被一個消費者獲取。
2.2公平分發(fā)
消費者關(guān)閉自動應(yīng)答,開啟手動回執(zhí)
/**
?*?〈簡述〉
?*?〈接收者〉
?*
?*?@create?2020/7/3
?*?@since?1.0.0
?*/
public?class?Receive2?{
????private?static?final?String?QUEUE_NAME?=?QueueName.test_work_queue.toString();
????public?static?void?main(String?args[])?throws?Exception?{
????????//獲取連接
????????Connection?connection?=?ConnectionUtil.getConnection();
????????//獲取channel
????????Channel?channel?=?connection.createChannel();
????????//聲明隊列
????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
????????channel.basicQos(1);//保證一次只分發(fā)一個消息
????????//定義一個消費這
????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????String?msg?=?new?String(body,?"utf-8");
????????????????System.out.println("[2]?Receive2?msg:"?+?msg);
????????????????try?{
????????????????????Thread.sleep(2000);
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}?finally?{
????????????????????System.out.println("[2]?done");
????????????????????//手動回執(zhí)
????????????????????channel.basicAck(envelope.getDeliveryTag(),false);
????????????????}
????????????}
????????};
????????boolean?autoAck?=?false;//自動應(yīng)答
????????channel.basicConsume(QUEUE_NAME,?autoAck,?consumer);
????}
}
手動回執(zhí):消費者完成業(yè)務(wù)接口方法后可以告知消息隊列處理完成,消息隊列從隊列中取一條消息發(fā)送給消費者。


能者多勞:效率高的消費者消費消息多。
3、發(fā)布/訂閱模式

一個消費者將消息首先發(fā)送到交換器,交換器綁定到多個隊列,然后被監(jiān)聽該隊列的消費者所接收并消費。
ps:X表示交換器,在RabbitMQ中,交換器主要有四種類型:direct、fanout、topic、headers,這里的交換器是 fanout。下面我們會詳細介紹這幾種交換器。
1、生產(chǎn)者
/**
?*?〈簡述〉
?*?〈訂閱模式——生產(chǎn)者〉
?*
?*?@create?2020/7/3
?*?@since?1.0.0
?*/
public?class?Send?{
????private?static?final?String?EXCHANGE_NAME?=?MqName.exchange_fanout.toString();
????public?static?void?main(String?args[])?throws?Exception?{
????????//獲取連接
????????Connection?connection?=?ConnectionUtil.getConnection();
????????//獲取channel
????????Channel?channel?=?connection.createChannel();
????????//聲明交換機
????????channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//分發(fā)
????????//發(fā)送消息
????????String?msg?=?"hello?exchange";
????????System.out.println("[mq]?send:"?+?msg);
????????channel.basicPublish(EXCHANGE_NAME,?"",?null,?msg.getBytes());
????????channel.close();
????????connection.close();
????}
}
2、消費者
注意:兩個消費者綁定不同的隊列,綁定相同的交換機;
消費者1:綁定隊列名=queue_fanout_email1
/**
?*?〈簡述〉
?*?〈接收者〉
?*
?*?@create?2020/7/3
?*?@since?1.0.0
?*/
public?class?Receive1?{
????private?static?final?String?QUEUE_NAME?=?MqName.queue_fanout_email.toString()?+?"1";
????private?static?final?String?EXCHANGE_NAME?=?MqName.exchange_fanout.toString();
????public?static?void?main(String?args[])?throws?Exception?{
????????//獲取連接
????????Connection?connection?=?ConnectionUtil.getConnection();
????????//獲取channel
????????Channel?channel?=?connection.createChannel();
????????//聲明隊列
????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
????????//綁定到交換機
????????channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?"");
????????//定義一個消費這
????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
????????????/**
?????????????*?No-op?implementation?of?{@link?Consumer#handleDelivery}.
?????????????*
?????????????*?@param?consumerTag
?????????????*?@param?envelope
?????????????*?@param?properties
?????????????*?@param?body
?????????????*/
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????String?msg?=?new?String(body,?"utf-8");
????????????????System.out.println("[1]?Receive1?msg:"?+?msg);
????????????????try?{
????????????????????Thread.sleep(1000);
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}?finally?{
????????????????????System.out.println("[1]?done");
????????????????}
????????????}
????????};
????????boolean?autoAck?=?false;
????????channel.basicConsume(QUEUE_NAME,?autoAck,?consumer);
????}
}
消費者2:綁定隊列名=queue_fanout_email2
/**
?*?〈簡述〉
?*?〈接收者〉
?*
?*?@create?2020/7/3
?*?@since?1.0.0
?*/
public?class?Receive2?{
????private?static?final?String?QUEUE_NAME?=?MqName.queue_fanout_email.toString()?+?"2";
????private?static?final?String?EXCHANGE_NAME?=?MqName.exchange_fanout.toString();
????public?static?void?main(String?args[])?throws?Exception?{
????????//獲取連接
????????Connection?connection?=?ConnectionUtil.getConnection();
????????//獲取channel
????????Channel?channel?=?connection.createChannel();
????????//聲明隊列
????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
????????//綁定到交換機
????????channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?"");
????????//定義一個消費這
????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
????????????/**
?????????????*?No-op?implementation?of?{@link?Consumer#handleDelivery}.
?????????????*
?????????????*?@param?consumerTag
?????????????*?@param?envelope
?????????????*?@param?properties
?????????????*?@param?body
?????????????*/
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????String?msg?=?new?String(body,?"utf-8");
????????????????System.out.println("[2]?Receive2?msg:"?+?msg);
????????????????try?{
????????????????????Thread.sleep(2000);
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}?finally?{
????????????????????System.out.println("[2]?done");
????????????????}
????????????}
????????};
????????boolean?autoAck?=?false;
????????channel.basicConsume(QUEUE_NAME,?autoAck,?consumer);
????}
}
3、測試結(jié)果


如上圖,兩個消費者獲得了同一條消息。即就是,一個消息從交換機同時發(fā)送給了兩個隊列中,監(jiān)聽這兩個隊列的消費者消費了這個消息;
如果沒有隊列綁定交換機,則消息將丟失。因為交換機沒有存儲能力,消息只能存儲在隊列中。
4、路由模式

生產(chǎn)者將消息發(fā)送到direct交換器,在綁定隊列和交換器的時候有一個路由key,生產(chǎn)者發(fā)送的消息會指定一個路由key,那么消息只會發(fā)送到相應(yīng)key相同的隊列,接著監(jiān)聽該隊列的消費者消費消息。
也就是讓消費者有選擇性的接收消息。
1、生產(chǎn)者
/**
?*?〈簡述〉
?
?*?〈路由模式-消息發(fā)送者〉
?*
?*?@create?2020/7/8
?*?@since?1.0.0
?*/
public?class?Send?{
????public?static?final?String?EXCHANGE_NAME?=?MqName.exchange_routing.toString();
????public?static?final?String?ROUTING_KEY?=?MqName.routing_world.toString();
????public?static?void?main(String?args[])?throws?Exception?{
????????//獲取連接
????????Connection?connection?=?ConnectionUtil.getConnection();
????????//獲取channel
????????Channel?channel?=?connection.createChannel();
????????//聲明交換機
????????channel.exchangeDeclare(EXCHANGE_NAME,?"direct");
????????String?msg?=?"route?message?->"?+?ROUTING_KEY;
????????System.out.println("對?"?+?ROUTING_KEY?+?"?發(fā)送消息:"?+?msg);
????????channel.basicPublish(EXCHANGE_NAME,?ROUTING_KEY,?null,?msg.getBytes());
????????//關(guān)閉連接
????????channel.close();
????????connection.close();
????}
}
2、消費者
注意:兩個消費者,綁定相同的交換機,不同的隊列,不一樣的路由
消費者1:路由=routing_hello
/**
?*?〈簡述〉
?*?〈接收消息1〉
?*
?*?@create?2020/7/8
?*?@since?1.0.0
?*/
public?class?Receive1?{
????public?static?final?String?QUEUE_NAME?=?MqName.queue_routing_001.toString();
????public?static?final?String?EXCHANGE_NAME?=?MqName.exchange_routing.toString();
????public?static?final?String?ROUTING_KEY_hello?=?MqName.routing_hello.toString();
????public?static?void?main(String?args[])?throws?Exception?{
????????//獲取連接
????????Connection?connection?=?ConnectionUtil.getConnection();
????????//獲取channel
????????Channel?channel?=?connection.createChannel();
????????//聲明隊列
????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
????????//設(shè)置預(yù)讀取數(shù)
????????channel.basicQos(1);
????????//綁定交換機
????????channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?ROUTING_KEY_hello);
????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
????????????/**
?????????????*?No-op?implementation?of?{@link?Consumer#handleDelivery}.
?????????????*
?????????????*?@param?consumerTag
?????????????*?@param?envelope
?????????????*?@param?properties
?????????????*?@param?body
?????????????*/
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????String?msg?=?new?String(body,?"utf-8");
????????????????System.out.println(envelope.getRoutingKey()?+?"?[1]?Receive1?msg:"?+?msg);
????????????}
????????};
????????channel.basicConsume(QUEUE_NAME,?true,?consumer);
????}
}
消費者2:路由1=routing_world 路由2=routing_hello
/**
?*?〈簡述〉
?*?〈接收消息2〉
?*
?*?@create?2020/7/8
?*?@since?1.0.0
?*/
public?class?Receive2?{
????public?static?final?String?QUEUE_NAME?=?MqName.queue_routing_002.toString();
????public?static?final?String?EXCHANGE_NAME?=?MqName.exchange_routing.toString();
????public?static?final?String?ROUTING_KEY_world?=?MqName.routing_world.toString();
????public?static?final?String?ROUTING_KEY_hello?=?MqName.routing_hello.toString();
????public?static?void?main(String?args[])?throws?Exception?{
????????//獲取連接
????????Connection?connection?=?ConnectionUtil.getConnection();
????????//獲取channel
????????Channel?channel?=?connection.createChannel();
????????//聲明隊列
????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
????????//設(shè)置預(yù)讀取數(shù)
????????channel.basicQos(1);
????????//綁定交換機和路由器,可以綁定多個路由
????????channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?ROUTING_KEY_world);
????????channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?ROUTING_KEY_hello);
????????//定義消息消費者
????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
????????????/**
?????????????*?No-op?implementation?of?{@link?Consumer#handleDelivery}.
?????????????*
?????????????*?@param?consumerTag
?????????????*?@param?envelope
?????????????*?@param?properties
?????????????*?@param?body
?????????????*/
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????String?msg?=?new?String(body,?"utf-8");
????????????????System.out.println(envelope.getRoutingKey()?+?"?[2]?Receive1?msg:"?+?msg);
????????????}
????????};
????????//接收消息
????????channel.basicConsume(QUEUE_NAME,?true,?consumer);
????}
}
3、測試結(jié)果
生產(chǎn)者發(fā)送:routing_world
消費者1:沒有接收到
消費者2:接收到了
生產(chǎn)者發(fā)送:routing_hello
消費者1:接收到了
消費者2:接收到了
路由模式,是以路由規(guī)則為導(dǎo)向,引導(dǎo)消息存入符合規(guī)則的隊列中。再由隊列的消費者進行消費的。
5、主題模式

上面的路由模式是根據(jù)路由key進行完整的匹配(完全相等才發(fā)送消息),這里的通配符模式通俗的來講就是模糊匹配。
符號“#”表示匹配一個或多個詞,符號“*”表示匹配一個詞。
1、生產(chǎn)者
/**
?*?〈簡述〉
?
?*?〈主題模式-消息發(fā)送者〉
?*
?*?@create?2020/7/8
?*?@since?1.0.0
?*/
public?class?Send?{
????public?static?final?String?EXCHANGE_NAME?=?MqName.exchange_topic.toString();
????public?static?final?String?ROUTING_KEY?=?MqName.routing_goods.toString();
????public?static?void?main(String?args[])?throws?Exception?{
????????//獲取連接
????????Connection?connection?=?ConnectionUtil.getConnection();
????????//獲取channel
????????Channel?channel?=?connection.createChannel();
????????//聲明交換機
????????channel.exchangeDeclare(EXCHANGE_NAME,?"topic");
//????????String?routingKey?=?ROUTING_KEY?+?".add";
????????String?routingKey?=?ROUTING_KEY?+?".publish";
//????????String?routingKey?=?ROUTING_KEY?+?".update";
????????String?msg?=?"route?message?->"?+?routingKey;
????????channel.basicPublish(EXCHANGE_NAME,?routingKey,?null,?msg.getBytes());
????????System.out.println("對?"?+?routingKey?+?"?發(fā)送消息:"?+?msg);
????????//關(guān)閉連接
????????channel.close();
????????connection.close();
????}
}
2、消費者
注意兩個消費者,路由的不同
消費者1:路由1=routing_goods.add 路由2=routing_goods.update
/**
?*?〈簡述〉
?*?〈接收消息1〉
?*
?*?@create?2020/7/8
?*?@since?1.0.0
?*/
public?class?Receive1?{
????public?static?final?String?QUEUE_NAME?=?MqName.queue_topic_001.toString();
????public?static?final?String?EXCHANGE_NAME?=?MqName.exchange_topic.toString();
????public?static?final?String?ROUTING_KEY?=?MqName.routing_goods.toString();
????public?static?void?main(String?args[])?throws?Exception?{
????????//獲取連接
????????Connection?connection?=?ConnectionUtil.getConnection();
????????//獲取channel
????????Channel?channel?=?connection.createChannel();
????????//聲明隊列
????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
????????//設(shè)置預(yù)讀取數(shù)
????????channel.basicQos(1);
????????//綁定交換機
????????channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?ROUTING_KEY?+?".add");
????????channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?ROUTING_KEY?+?".update");
????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
????????????/**
?????????????*?No-op?implementation?of?{@link?Consumer#handleDelivery}.
?????????????*
?????????????*?@param?consumerTag
?????????????*?@param?envelope
?????????????*?@param?properties
?????????????*?@param?body
?????????????*/
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????String?msg?=?new?String(body,?"utf-8");
????????????????System.out.println(envelope.getRoutingKey()?+?"?[1]?Receive1?msg:"?+?msg);
????????????}
????????};
????????channel.basicConsume(QUEUE_NAME,?true,?consumer);
????}
}
消費者2:路由=routing_goods.*
/**
?*?〈簡述〉
?*?〈接收消息2〉
?*
?*?@create?2020/7/8
?*?@since?1.0.0
?*/
public?class?Receive2?{
????public?static?final?String?QUEUE_NAME?=?MqName.queue_routing_002.toString();
????public?static?final?String?EXCHANGE_NAME?=?MqName.exchange_topic.toString();
????public?static?final?String?ROUTING_KEY?=?MqName.routing_goods.toString();
????public?static?void?main(String?args[])?throws?Exception?{
????????//獲取連接
????????Connection?connection?=?ConnectionUtil.getConnection();
????????//獲取channel
????????Channel?channel?=?connection.createChannel();
????????//聲明隊列
????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
????????//設(shè)置預(yù)讀取數(shù)
????????channel.basicQos(1);
????????//綁定交換機和路由器,可以綁定多個路由
????????channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?ROUTING_KEY?+?".*");
????????//定義消息消費者
????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
????????????/**
?????????????*?No-op?implementation?of?{@link?Consumer#handleDelivery}.
?????????????*
?????????????*?@param?consumerTag
?????????????*?@param?envelope
?????????????*?@param?properties
?????????????*?@param?body
?????????????*/
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????String?msg?=?new?String(body,?"utf-8");
????????????????System.out.println(envelope.getRoutingKey()?+?"?[2]?Receive1?msg:"?+?msg);
????????????}
????????};
????????//接收消息
????????channel.basicConsume(QUEUE_NAME,?true,?consumer);
????}
}
3、測試結(jié)果
消費者1只能接收到.add 和 .update的消息
消費者2可以接收到.add .publish 和 .update的消息
與路由模式相似,但是,主題模式是一種模糊的匹配方式。
6.工作模式總結(jié)
這五種工作模式,可以歸為三類:
生產(chǎn)者,消息隊列,一個消費者;
生產(chǎn)者,消息隊列,多個消費者;
生產(chǎn)者,交換機,多個消息隊列,多個消費者;
7、四種交換器
1、direct 如果路由鍵完全匹配的話,消息才會被投放到相應(yīng)的隊列?!?/span>
2、fanout 當(dāng)發(fā)送一條消息到fanout交換器上時,它會把消息投放到所有附加在此交換器上的隊列。
3、topic 設(shè)置模糊的綁定方式,“*”操作符將“.”視為分隔符,匹配單個字符;“#”操作符沒有分塊的概念,它將任意“.”均視為關(guān)鍵字的匹配部分,能夠匹配多個字符。
?4、header headers 交換器允許匹配 AMQP 消息的 header 而非路由鍵,除此之外,header 交換器和 direct 交換器完全一致,但是性能卻差很多,因此基本上不會用到該交換器
粉絲福利:Java從入門到入土學(xué)習(xí)路線圖
??????

??長按上方微信二維碼?2 秒
感謝點贊支持下哈?
