消息隊(duì)列之RabbitMQ
點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號”
優(yōu)質(zhì)文章,第一時間送達(dá)
? 作者?|? pluto_charon
來源 |? urlify.cn/VFrYvm
76套java從入門到精通實(shí)戰(zhàn)課程分享
1.rabbitMQ介紹
rabbitMQ是由erlang語言開發(fā)的,基于AMQP協(xié)議實(shí)現(xiàn)的消息隊(duì)列。他是一種應(yīng)用程序之間的通信方法,在分布式系統(tǒng)開發(fā)中應(yīng)用非常廣泛。
rabbitMq的有點(diǎn):
使用簡單,功能強(qiáng)大
基于AMQP協(xié)議
社區(qū)活躍,文檔完善
高并發(fā)性能好,erlang語言是專門用于開發(fā)高并發(fā)程序的
springBoot默認(rèn)集成rabbitMq
AMQP(advanced Message Queuing Protocol),是一個提供統(tǒng)一消息服務(wù)的應(yīng)用標(biāo)準(zhǔn)高級消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì),基于此協(xié)議的客戶端與消息中間件可傳遞消息,并不受客戶端中間件的產(chǎn)品不同和開發(fā)語言不同的限制。JMS和AMQP的區(qū)別在于:JMS是java語言專屬的消息服務(wù)標(biāo)準(zhǔn),他是在api層定義標(biāo)準(zhǔn),并且只能用于java應(yīng)用,而AMQP是在協(xié)議層定義的標(biāo)準(zhǔn),是可以跨語言的。
2.工作流程
發(fā)送消息:
生產(chǎn)者和broker建立TCP連接
生產(chǎn)者和broker建立通道
生產(chǎn)者通過通道消息發(fā)送給broker,由exchange將消息轉(zhuǎn)發(fā)
exchange將消息轉(zhuǎn)發(fā)給指定的queue
接受消息:
消費(fèi)者和broker建立TCP連接
消費(fèi)者和broker建立通道
消費(fèi)者監(jiān)聽指定的queue
當(dāng)有消息到達(dá)queue的時候broker默認(rèn)將消息推送給消費(fèi)者
消費(fèi)者接受到消息并消費(fèi)
3.安裝

如果不想自己下載,需要我這里的軟件的,可以在下面評論郵箱,我私發(fā)給你。
1.安裝erlang的環(huán)境,雙擊otp的運(yùn)行程序,然后一路點(diǎn)擊下一步(next)。

配置環(huán)境變量

在path中添加erlang的路徑

2.安裝rabbitMq,雙擊rabbitmq的運(yùn)行程序


安裝完成之后在菜單頁面可以看到

安裝完RabbitMQ如果想要訪問管理頁面需要在rabbitmq的sbin目錄中使用cmd執(zhí)行:rabbitmq-plugins.bat enable rabbitmq_management(管理員身份運(yùn)行此命令)添加可視化插件。
點(diǎn)擊上圖中的start/stop來開啟/停止服務(wù)。然后在瀏覽器上輸入地址查看,rabbitMq的默認(rèn)端口是15672。默認(rèn)的用戶名和密碼都是guest

如果安裝失敗,需要卸載重裝的時候或者出現(xiàn)rabbitMq服務(wù)注冊失敗時,此時需要進(jìn)入注冊表清理erlang(搜索rabbitMQ,erlsrv將對應(yīng)的項(xiàng)刪除)
4.代碼實(shí)現(xiàn)
1.添加依賴
?com.rabbitmq
?amqp-client
?5.7.3
2.生產(chǎn)者代碼實(shí)現(xiàn)
package?rabbitmq;
import?com.rabbitmq.client.Channel;
import?com.rabbitmq.client.Connection;
import?com.rabbitmq.client.ConnectionFactory;
import?java.io.IOException;
import?java.net.ConnectException;
import?java.util.concurrent.TimeoutException;
/**
?*?@className:?producer
?*?@description:?rabbitmq的生產(chǎn)者代碼實(shí)現(xiàn)
?*?@author:?charon
?*?@create:?2021-01-03?23:10
?*/
public?class?Producer?{
????/**
?????*?聲明隊(duì)列名
?????*/
????private?static?final?String?QUEUE?=?"hello?charon";
????public?static?void?main(String[]?args)?{
????????//?創(chuàng)建連接工廠
????????ConnectionFactory?connectionFactory?=?new?ConnectionFactory();
????????//?設(shè)置ip,端口,因?yàn)槭潜緳C(jī),所以直接設(shè)置為127.0.0.1
????????connectionFactory.setHost("127.0.0.1");
????????//?web端口默認(rèn)為15672,通信端口為5672
????????connectionFactory.setPort(5672);
????????//?設(shè)置用戶名和密碼
????????connectionFactory.setUsername("guest");
????????connectionFactory.setPassword("guest");
????????//?設(shè)置虛擬ip,默認(rèn)為/,一個rabbitmq的服務(wù)可以設(shè)置多個虛擬機(jī),每個虛擬機(jī)就相當(dāng)于一個獨(dú)立的mq
????????connectionFactory.setVirtualHost("/");
????????Connection?connection?=?null;
????????Channel?channel?=?null;
????????try?{
????????????connection?=?connectionFactory.newConnection();
????????????//?創(chuàng)建通道
????????????channel?=?connection.createChannel();
????????????//?聲明隊(duì)列(隊(duì)列名稱,是否持久化,是否排它,是否自動刪除,隊(duì)列的擴(kuò)展參數(shù)比如設(shè)置存活時間等)
????????????channel.queueDeclare(QUEUE,?true,?false,?false,?null);
????????????String?message?=?"hello?charon?good?evening";
????????????//?發(fā)布消息(交換機(jī),RoutingKey即隊(duì)列名,額外的消息屬性,消息內(nèi)容)
????????????channel.basicPublish("",?QUEUE,?null,?message.getBytes());
????????????System.out.println("發(fā)送消息給mq:"?+?message);
????????}?catch?(Exception?e)?{
????????????e.printStackTrace();
????????}finally?{
????????????//?關(guān)閉資源
????????????try?{
????????????????channel.close();
????????????}?catch?(IOException?e)?{
????????????????e.printStackTrace();
????????????}?catch?(TimeoutException?e)?{
????????????????e.printStackTrace();
????????????}
????????????try?{
????????????????connection.close();
????????????}?catch?(IOException?e)?{
????????????????e.printStackTrace();
????????????}
????????}
????}
}
3.消費(fèi)者代碼實(shí)現(xiàn)
package?rabbitmq;
import?com.rabbitmq.client.AMQP;
import?com.rabbitmq.client.Channel;
import?com.rabbitmq.client.Connection;
import?com.rabbitmq.client.ConnectionFactory;
import?com.rabbitmq.client.DefaultConsumer;
import?com.rabbitmq.client.Envelope;
import?java.io.IOException;
import?java.util.concurrent.TimeoutException;
/**
?*?@className:?Consumer
?*?@description:?消費(fèi)者的代碼實(shí)現(xiàn)
?*?@author:?charon
?*?@create:?2021-01-05?08:28
?*/
public?class?Consumer?{
????/**
?????*?聲明隊(duì)列名
?????*/
????private?static?final?String?QUEUE?=?"hello?charon";
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????//?創(chuàng)建連接工廠
????????ConnectionFactory?connectionFactory?=?new?ConnectionFactory();
????????//?設(shè)置ip,端口,因?yàn)槭潜緳C(jī),所以直接設(shè)置為127.0.0.1
????????connectionFactory.setHost("127.0.0.1");
????????//?web端口默認(rèn)為15672,通信端口為5672
????????connectionFactory.setPort(5672);
????????//?設(shè)置用戶名和密碼
????????connectionFactory.setUsername("guest");
????????connectionFactory.setPassword("guest");
????????//?設(shè)置虛擬ip,默認(rèn)為/,一個rabbitmq的服務(wù)可以設(shè)置多個虛擬機(jī),每個虛擬機(jī)就相當(dāng)于一個獨(dú)立的mq
????????connectionFactory.setVirtualHost("/");
????????Connection?connection?=?connectionFactory.newConnection();
????????//?創(chuàng)建通道
????????Channel?channel?=?connection.createChannel();
????????//?聲明隊(duì)列(隊(duì)列名稱,是否持久化,是否排它,是否自動刪除,隊(duì)列的擴(kuò)展參數(shù)比如設(shè)置存活時間等)
????????channel.queueDeclare(QUEUE,?true,?false,?false,?null);
????????//?實(shí)現(xiàn)消費(fèi)方法
????????DefaultConsumer?defaultConsumer?=?new?DefaultConsumer(channel){
????????????/**
?????????????*
?????????????*?@param?consumerTag?消費(fèi)者標(biāo)簽
?????????????*?@param?envelope?信封,可以獲取交換機(jī)等信息
?????????????*?@param?properties?消息屬性
?????????????*?@param?body?消費(fèi)內(nèi)容,字節(jié)數(shù)組,可以轉(zhuǎn)成字符串
?????????????*?@throws?IOException
?????????????*/
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
//????????????????String?exchange?=?envelope.getExchange();
//????????????????long?deliveryTag?=?envelope.getDeliveryTag();
????????????????String?message?=?new?String(body,"utf-8");
????????????????System.out.println("收到的消息是:"+message);
????????????}
????????};
????????//?消費(fèi)消息(隊(duì)列名,是否自動確認(rèn),消費(fèi)方法)
????????channel.basicConsume(QUEUE,true,defaultConsumer);
????}
}
5.rabbitMq的工作模式
Work queues 工作隊(duì)列(資源競爭)

生產(chǎn)者將消息放入到隊(duì)列中,消費(fèi)者可以有多個,同時監(jiān)聽同一個隊(duì)列。如上圖,消費(fèi)者c1,c2共同爭搶當(dāng)前消息隊(duì)列的內(nèi)容,誰先拿到誰負(fù)責(zé)消費(fèi)消息,缺點(diǎn)是在高并發(fā)的情況下,默認(rèn)會產(chǎn)品一個消息被多個消費(fèi)者共同使用,可以設(shè)置一個鎖開關(guān),保證一條消息只能被一個消費(fèi)者使用。
上面的代碼,可以再添加一個消費(fèi)者,這樣就可以實(shí)現(xiàn)工作隊(duì)列的工作模式。
2.Publish/Subscribe 發(fā)布訂閱(共享資源)

X代表rabbitMq內(nèi)部組件交換機(jī),生產(chǎn)者將消息放入交換機(jī),交換機(jī)發(fā)布訂閱把消息發(fā)送到所有消息隊(duì)列中,對應(yīng)的消費(fèi)者拿到消息進(jìn)行消費(fèi),對比工作隊(duì)列而言,發(fā)布訂閱可以實(shí)現(xiàn)工作隊(duì)列的功能,但是比工作隊(duì)列更強(qiáng)大。
特點(diǎn):
1.每個消費(fèi)者監(jiān)聽自己的隊(duì)列
2.生產(chǎn)者將消息發(fā)送給Broker,由交換機(jī)將消息轉(zhuǎn)發(fā)到綁定的此交換機(jī)的每個隊(duì)列,每個綁定交換機(jī)的隊(duì)列都將接收到消息;
生產(chǎn)者:
package?rabbitmq.publish;
import?com.rabbitmq.client.BuiltinExchangeType;
import?com.rabbitmq.client.Channel;
import?com.rabbitmq.client.Connection;
import?com.rabbitmq.client.ConnectionFactory;
import?java.io.IOException;
import?java.util.concurrent.TimeoutException;
/**
?*?@className:?Producer
?*?@description:?發(fā)布訂閱的生產(chǎn)者
?*?@author:?charon
?*?@create:?2021-01-07?22:02
?*/
public?class?Producer?{
????/**郵件的隊(duì)列*/
????public?static?final?String?QUEUE_INFORM_EMAIL?=?"queue_inform_email";
????/**短信的隊(duì)列*/
????public?static?final?String?QUEUE_INFORM_SMS?=?"queue_inform_sms";
????/**交換機(jī)*/
????public?static?final?String?EXCHANGE_FANOUT_INFORM?=?"exchange_fanout_inform";
????public?static?void?main(String[]?args)?{
????????//?創(chuàng)建連接工廠
????????ConnectionFactory?connectionFactory?=?new?ConnectionFactory();
????????//?設(shè)置ip,端口,因?yàn)槭潜緳C(jī),所以直接設(shè)置為127.0.0.1
????????connectionFactory.setHost("127.0.0.1");
????????//?web端口默認(rèn)為15672,通信端口為5672
????????connectionFactory.setPort(5672);
????????//?設(shè)置用戶名和密碼
????????connectionFactory.setUsername("guest");
????????connectionFactory.setPassword("guest");
????????//?設(shè)置虛擬ip,默認(rèn)為/,一個rabbitmq的服務(wù)可以設(shè)置多個虛擬機(jī),每個虛擬機(jī)就相當(dāng)于一個獨(dú)立的mq
????????connectionFactory.setVirtualHost("/");
????????Connection?connection?=?null;
????????Channel?channel?=?null;
????????try?{
????????????connection?=?connectionFactory.newConnection();
????????????//?創(chuàng)建通道
????????????channel?=?connection.createChannel();
????????????//?聲明隊(duì)列(隊(duì)列名稱,是否持久化,是否排它,是否自動刪除,隊(duì)列的擴(kuò)展參數(shù)比如設(shè)置存活時間等)
????????????channel.queueDeclare(QUEUE_INFORM_EMAIL,?true,?false,?false,?null);
????????????channel.queueDeclare(QUEUE_INFORM_SMS,?true,?false,?false,?null);
????????????//?交換機(jī)(交換機(jī)名稱,交換機(jī)類型(fanout:發(fā)布訂閱,direct:routing,topic:主題,headers:header模式))
????????????channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM,?BuiltinExchangeType.FANOUT);
????????????//?綁定交換機(jī)(隊(duì)列名稱,交換機(jī)名稱,routingKey(發(fā)布訂閱設(shè)置為空))
????????????channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");
????????????channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");
????????????//?發(fā)送多條消息
????????????for?(int?i?=?0;?i?5;?i++)?{
????????????????String?message?=?"hello?charon?good?evening?by?publish";
????????????????//?指定交換機(jī)(交換機(jī),RoutingKey即隊(duì)列名,額外的消息屬性,消息內(nèi)容)
????????????????channel.basicPublish(EXCHANGE_FANOUT_INFORM,?"",?null,?message.getBytes());
????????????????System.out.println("發(fā)送消息給mq:"?+?message);
????????????}
????????}?catch?(Exception?e)?{
????????????e.printStackTrace();
????????}finally?{
????????????//?關(guān)閉資源
????????????try?{
????????????????channel.close();
????????????}?catch?(IOException?e)?{
????????????????e.printStackTrace();
????????????}?catch?(TimeoutException?e)?{
????????????????e.printStackTrace();
????????????}
????????????try?{
????????????????connection.close();
????????????}?catch?(IOException?e)?{
????????????????e.printStackTrace();
????????????}
????????}
????}
}
消費(fèi)email的消費(fèi)者:
package?rabbitmq.publish;
import?com.rabbitmq.client.AMQP;
import?com.rabbitmq.client.BuiltinExchangeType;
import?com.rabbitmq.client.Channel;
import?com.rabbitmq.client.Connection;
import?com.rabbitmq.client.ConnectionFactory;
import?com.rabbitmq.client.DefaultConsumer;
import?com.rabbitmq.client.Envelope;
import?java.io.IOException;
import?java.util.concurrent.TimeoutException;
/**
?*?@className:?EmailConsumer
?*?@description:?郵件的消息消費(fèi)者
?*?@author:?charon
?*?@create:?2021-01-07?22:14
?*/
public?class?EmailConsumer?{
????/**郵件的隊(duì)列*/
????public?static?final?String?QUEUE_INFORM_EMAIL?=?"queue_inform_email";
????/**短信的隊(duì)列*/
????public?static?final?String?QUEUE_INFORM_SMS?=?"queue_inform_sms";
????/**交換機(jī)*/
????public?static?final?String?EXCHANGE_FANOUT_INFORM?=?"exchange_fanout_inform";
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????//?創(chuàng)建連接工廠
????????ConnectionFactory?connectionFactory?=?new?ConnectionFactory();
????????//?設(shè)置ip,端口,因?yàn)槭潜緳C(jī),所以直接設(shè)置為127.0.0.1
????????connectionFactory.setHost("127.0.0.1");
????????//?web端口默認(rèn)為15672,通信端口為5672
????????connectionFactory.setPort(5672);
????????//?設(shè)置用戶名和密碼
????????connectionFactory.setUsername("guest");
????????connectionFactory.setPassword("guest");
????????//?設(shè)置虛擬ip,默認(rèn)為/,一個rabbitmq的服務(wù)可以設(shè)置多個虛擬機(jī),每個虛擬機(jī)就相當(dāng)于一個獨(dú)立的mq
????????connectionFactory.setVirtualHost("/");
????????Connection?connection?=?connectionFactory.newConnection();
????????//?創(chuàng)建通道
????????Channel?channel?=?connection.createChannel();
????????//?聲明隊(duì)列(隊(duì)列名稱,是否持久化,是否排它,是否自動刪除,隊(duì)列的擴(kuò)展參數(shù)比如設(shè)置存活時間等)
????????channel.queueDeclare(QUEUE_INFORM_EMAIL,?true,?false,?false,?null);
????????channel.queueDeclare(QUEUE_INFORM_SMS,?true,?false,?false,?null);
????????channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM,?BuiltinExchangeType.FANOUT);
????????//?實(shí)現(xiàn)消費(fèi)方法
????????DefaultConsumer?defaultConsumer?=?new?DefaultConsumer(channel){
????????????/**
?????????????*
?????????????*?@param?consumerTag?消費(fèi)者標(biāo)簽
?????????????*?@param?envelope?信封,可以獲取交換機(jī)等信息
?????????????*?@param?properties?消息屬性
?????????????*?@param?body?消費(fèi)內(nèi)容,字節(jié)數(shù)組,可以轉(zhuǎn)成字符串
?????????????*?@throws?IOException
?????????????*/
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
//????????????????String?exchange?=?envelope.getExchange();
//????????????????long?deliveryTag?=?envelope.getDeliveryTag();
????????????????String?message?=?new?String(body,"utf-8");
????????????????System.out.println("收到的email消息是:"+message);
????????????}
????????};
????????//?消費(fèi)消息(隊(duì)列名,是否自動確認(rèn),消費(fèi)方法)
????????channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);
????}
}
消費(fèi)短信的消費(fèi)者:
package?rabbitmq.publish;
import?com.rabbitmq.client.AMQP;
import?com.rabbitmq.client.BuiltinExchangeType;
import?com.rabbitmq.client.Channel;
import?com.rabbitmq.client.Connection;
import?com.rabbitmq.client.ConnectionFactory;
import?com.rabbitmq.client.DefaultConsumer;
import?com.rabbitmq.client.Envelope;
import?java.io.IOException;
import?java.util.concurrent.TimeoutException;
/**
?*?@className:?SmsConsumer
?*?@description:
?*?@author:?charon
?*?@create:?2021-01-07?22:17
?*/
public?class?SmsConsumer?{
????/**郵件的隊(duì)列*/
????public?static?final?String?QUEUE_INFORM_EMAIL?=?"queue_inform_email";
????/**短信的隊(duì)列*/
????public?static?final?String?QUEUE_INFORM_SMS?=?"queue_inform_sms";
????/**交換機(jī)*/
????public?static?final?String?EXCHANGE_FANOUT_INFORM?=?"exchange_fanout_inform";
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????//?創(chuàng)建連接工廠
????????ConnectionFactory?connectionFactory?=?new?ConnectionFactory();
????????//?設(shè)置ip,端口,因?yàn)槭潜緳C(jī),所以直接設(shè)置為127.0.0.1
????????connectionFactory.setHost("127.0.0.1");
????????//?web端口默認(rèn)為15672,通信端口為5672
????????connectionFactory.setPort(5672);
????????//?設(shè)置用戶名和密碼
????????connectionFactory.setUsername("guest");
????????connectionFactory.setPassword("guest");
????????//?設(shè)置虛擬ip,默認(rèn)為/,一個rabbitmq的服務(wù)可以設(shè)置多個虛擬機(jī),每個虛擬機(jī)就相當(dāng)于一個獨(dú)立的mq
????????connectionFactory.setVirtualHost("/");
????????Connection?connection?=?connectionFactory.newConnection();
????????//?創(chuàng)建通道
????????Channel?channel?=?connection.createChannel();
????????//?聲明隊(duì)列(隊(duì)列名稱,是否持久化,是否排它,是否自動刪除,隊(duì)列的擴(kuò)展參數(shù)比如設(shè)置存活時間等)
????????channel.queueDeclare(QUEUE_INFORM_EMAIL,?true,?false,?false,?null);
????????channel.queueDeclare(QUEUE_INFORM_SMS,?true,?false,?false,?null);
????????channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM,?BuiltinExchangeType.FANOUT);
????????//?實(shí)現(xiàn)消費(fèi)方法
????????DefaultConsumer?defaultConsumer?=?new?DefaultConsumer(channel){
????????????/**
?????????????*
?????????????*?@param?consumerTag?消費(fèi)者標(biāo)簽
?????????????*?@param?envelope?信封,可以獲取交換機(jī)等信息
?????????????*?@param?properties?消息屬性
?????????????*?@param?body?消費(fèi)內(nèi)容,字節(jié)數(shù)組,可以轉(zhuǎn)成字符串
?????????????*?@throws?IOException
?????????????*/
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????String?message?=?new?String(body,"utf-8");
????????????????System.out.println("收到的短信消息是:"+message);
????????????}
????????};
????????//?消費(fèi)消息(隊(duì)列名,是否自動確認(rèn),消費(fèi)方法)
????????channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);
????}
}
3.Routing 路由模式

生產(chǎn)者將消息發(fā)送給交換機(jī)按照路由判斷,交換機(jī)根據(jù)路由的key,只能匹配上路由key的對應(yīng)的消息隊(duì)列,對應(yīng)的消費(fèi)者才能消費(fèi)消息。
如上圖,rabbitMq根據(jù)對應(yīng)的key,將消息發(fā)送到對應(yīng)的隊(duì)列中,error通知將發(fā)送到amqp.gen-S9b上,由消費(fèi)者c1消費(fèi)。error,info,warning通知將發(fā)送到amqp.gen-Ag1上,由消費(fèi)者c2消費(fèi)。
特點(diǎn):
1.每個消費(fèi)者監(jiān)聽自己的隊(duì)列,并且設(shè)置路由key
2.生產(chǎn)者將消息發(fā)送給交換機(jī),由交換機(jī)根據(jù)路由key來轉(zhuǎn)發(fā)消息到指定的隊(duì)列
生產(chǎn)者:
package?rabbitmq.routing;
import?com.rabbitmq.client.BuiltinExchangeType;
import?com.rabbitmq.client.Channel;
import?com.rabbitmq.client.Connection;
import?com.rabbitmq.client.ConnectionFactory;
import?java.io.IOException;
import?java.util.concurrent.TimeoutException;
/**
?*?@className:?Producer
?*?@description:?路由模式下的生成者
?*?@author:?charon
?*?@create:?2021-01-07?22:34
?*/
public?class?Producer?{
????/**郵件的隊(duì)列*/
????public?static?final?String?QUEUE_ROUTING_EMAIL?=?"queue_routing_email";
????/**短信的隊(duì)列*/
????public?static?final?String?QUEUE_ROUTING_SMS?=?"queue_routing_sms";
????/**交換機(jī)*/
????public?static?final?String?EXCHANGE_ROUTING_INFORM?=?"exchange_routing_inform";
????/**?設(shè)置email的路由key?*/
????public?static?final?String?ROUTING_EMAIL?=?"routing_email";
????/**?設(shè)置sms的路由key?*/
????public?static?final?String?ROUTING_SMS?=?"routing_sms";
????public?static?void?main(String[]?args)?{
????????//?創(chuàng)建連接工廠
????????ConnectionFactory?connectionFactory?=?new?ConnectionFactory();
????????//?設(shè)置ip,端口,因?yàn)槭潜緳C(jī),所以直接設(shè)置為127.0.0.1
????????connectionFactory.setHost("127.0.0.1");
????????//?web端口默認(rèn)為15672,通信端口為5672
????????connectionFactory.setPort(5672);
????????//?設(shè)置用戶名和密碼
????????connectionFactory.setUsername("guest");
????????connectionFactory.setPassword("guest");
????????//?設(shè)置虛擬ip,默認(rèn)為/,一個rabbitmq的服務(wù)可以設(shè)置多個虛擬機(jī),每個虛擬機(jī)就相當(dāng)于一個獨(dú)立的mq
????????connectionFactory.setVirtualHost("/");
????????Connection?connection?=?null;
????????Channel?channel?=?null;
????????try?{
????????????connection?=?connectionFactory.newConnection();
????????????//?創(chuàng)建通道
????????????channel?=?connection.createChannel();
????????????//?聲明隊(duì)列(隊(duì)列名稱,是否持久化,是否排它,是否自動刪除,隊(duì)列的擴(kuò)展參數(shù)比如設(shè)置存活時間等)
????????????channel.queueDeclare(QUEUE_ROUTING_EMAIL,?true,?false,?false,?null);
????????????channel.queueDeclare(QUEUE_ROUTING_SMS,?true,?false,?false,?null);
????????????//?交換機(jī)(交換機(jī)名稱,交換機(jī)類型(fanout:發(fā)布訂閱,direct:routing,topic:主題,headers:header模式))
????????????channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM,?BuiltinExchangeType.DIRECT);
????????????//?綁定交換機(jī)(隊(duì)列名稱,交換機(jī)名稱,routingKey)
????????????channel.queueBind(QUEUE_ROUTING_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTING_EMAIL);
????????????channel.queueBind(QUEUE_ROUTING_SMS,EXCHANGE_ROUTING_INFORM,ROUTING_SMS);
????????????//?發(fā)送多條消息
????????????for?(int?i?=?0;?i?5;?i++)?{
????????????????String?message?=?"hello?charon?good?evening?by?routing?--email";
????????????????//?指定交換機(jī)(交換機(jī),RoutingKey,額外的消息屬性,消息內(nèi)容)
????????????????channel.basicPublish(EXCHANGE_ROUTING_INFORM,?ROUTING_EMAIL,?null,?message.getBytes());
????????????????System.out.println("發(fā)送消息給mq:"?+?message);
????????????}
????????????//?發(fā)送多條消息
????????????for?(int?i?=?0;?i?5;?i++)?{
????????????????String?message?=?"hello?charon?good?evening?by?routing?--sms";
????????????????//?指定交換機(jī)(交換機(jī),RoutingKey,額外的消息屬性,消息內(nèi)容)
????????????????channel.basicPublish(EXCHANGE_ROUTING_INFORM,?ROUTING_SMS,?null,?message.getBytes());
????????????????System.out.println("發(fā)送消息給mq:"?+?message);
????????????}
????????}?catch?(Exception?e)?{
????????????e.printStackTrace();
????????}finally?{
????????????//?關(guān)閉資源
????????????try?{
????????????????channel.close();
????????????}?catch?(IOException?e)?{
????????????????e.printStackTrace();
????????????}?catch?(TimeoutException?e)?{
????????????????e.printStackTrace();
????????????}
????????????try?{
????????????????connection.close();
????????????}?catch?(IOException?e)?{
????????????????e.printStackTrace();
????????????}
????????}
????}
}
消費(fèi)email的消費(fèi)者:
package?rabbitmq.routing;
import?com.rabbitmq.client.AMQP;
import?com.rabbitmq.client.BuiltinExchangeType;
import?com.rabbitmq.client.Channel;
import?com.rabbitmq.client.Connection;
import?com.rabbitmq.client.ConnectionFactory;
import?com.rabbitmq.client.DefaultConsumer;
import?com.rabbitmq.client.Envelope;
import?java.io.IOException;
import?java.util.concurrent.TimeoutException;
/**
?*?@className:?EmailConsumer
?*?@description:?路由模式下的email消費(fèi)者
?*?@author:?charon
?*?@create:?2021-01-07?22:40
?*/
public?class?EmailConsumer?{
????/**郵件的隊(duì)列*/
????public?static?final?String?QUEUE_ROUTING_EMAIL?=?"queue_routing_email";
????/**交換機(jī)*/
????public?static?final?String?EXCHANGE_ROUTING_INFORM?=?"exchange_routing_inform";
????/**?設(shè)置email的路由key?*/
????public?static?final?String?ROUTING_EMAIL?=?"routing_email";
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????//?創(chuàng)建連接工廠
????????ConnectionFactory?connectionFactory?=?new?ConnectionFactory();
????????//?設(shè)置ip,端口,因?yàn)槭潜緳C(jī),所以直接設(shè)置為127.0.0.1
????????connectionFactory.setHost("127.0.0.1");
????????//?web端口默認(rèn)為15672,通信端口為5672
????????connectionFactory.setPort(5672);
????????//?設(shè)置用戶名和密碼
????????connectionFactory.setUsername("guest");
????????connectionFactory.setPassword("guest");
????????//?設(shè)置虛擬ip,默認(rèn)為/,一個rabbitmq的服務(wù)可以設(shè)置多個虛擬機(jī),每個虛擬機(jī)就相當(dāng)于一個獨(dú)立的mq
????????connectionFactory.setVirtualHost("/");
????????Connection?connection?=?connectionFactory.newConnection();
????????//?創(chuàng)建通道
????????Channel?channel?=?connection.createChannel();
????????//?聲明隊(duì)列(隊(duì)列名稱,是否持久化,是否排它,是否自動刪除,隊(duì)列的擴(kuò)展參數(shù)比如設(shè)置存活時間等)
????????channel.queueDeclare(QUEUE_ROUTING_EMAIL,?true,?false,?false,?null);
????????channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM,?BuiltinExchangeType.DIRECT);
????????//?綁定隊(duì)列并指明路由key
????????channel.queueBind(QUEUE_ROUTING_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTING_EMAIL);
????????//?實(shí)現(xiàn)消費(fèi)方法
????????DefaultConsumer?defaultConsumer?=?new?DefaultConsumer(channel){
????????????/**
?????????????*
?????????????*?@param?consumerTag?消費(fèi)者標(biāo)簽
?????????????*?@param?envelope?信封,可以獲取交換機(jī)等信息
?????????????*?@param?properties?消息屬性
?????????????*?@param?body?消費(fèi)內(nèi)容,字節(jié)數(shù)組,可以轉(zhuǎn)成字符串
?????????????*?@throws?IOException
?????????????*/
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????String?message?=?new?String(body,"utf-8");
????????????????System.out.println("收到的email消息是:"+message);
????????????}
????????};
????????//?消費(fèi)消息(隊(duì)列名,是否自動確認(rèn),消費(fèi)方法)
????????channel.basicConsume(QUEUE_ROUTING_EMAIL,true,defaultConsumer);
????}
}
消費(fèi)短信的消費(fèi)者:
package?rabbitmq.routing;
import?com.rabbitmq.client.AMQP;
import?com.rabbitmq.client.BuiltinExchangeType;
import?com.rabbitmq.client.Channel;
import?com.rabbitmq.client.Connection;
import?com.rabbitmq.client.ConnectionFactory;
import?com.rabbitmq.client.DefaultConsumer;
import?com.rabbitmq.client.Envelope;
import?java.io.IOException;
import?java.util.concurrent.TimeoutException;
/**
?*?@className:?EmailConsumer
?*?@description:?路由模式下的email消費(fèi)者
?*?@author:?charon
?*?@create:?2021-01-07?22:40
?*/
public?class?SmsConsumer?{
????/**郵件的隊(duì)列*/
????public?static?final?String?QUEUE_ROUTING_SMS?=?"queue_routing_sms";
????/**交換機(jī)*/
????public?static?final?String?EXCHANGE_ROUTING_INFORM?=?"exchange_routing_inform";
????/**?設(shè)置email的路由key?*/
????public?static?final?String?ROUTING_SMS?=?"routing_sms";
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????//?創(chuàng)建連接工廠
????????ConnectionFactory?connectionFactory?=?new?ConnectionFactory();
????????//?設(shè)置ip,端口,因?yàn)槭潜緳C(jī),所以直接設(shè)置為127.0.0.1
????????connectionFactory.setHost("127.0.0.1");
????????//?web端口默認(rèn)為15672,通信端口為5672
????????connectionFactory.setPort(5672);
????????//?設(shè)置用戶名和密碼
????????connectionFactory.setUsername("guest");
????????connectionFactory.setPassword("guest");
????????//?設(shè)置虛擬ip,默認(rèn)為/,一個rabbitmq的服務(wù)可以設(shè)置多個虛擬機(jī),每個虛擬機(jī)就相當(dāng)于一個獨(dú)立的mq
????????connectionFactory.setVirtualHost("/");
????????Connection?connection?=?connectionFactory.newConnection();
????????//?創(chuàng)建通道
????????Channel?channel?=?connection.createChannel();
????????//?聲明隊(duì)列(隊(duì)列名稱,是否持久化,是否排它,是否自動刪除,隊(duì)列的擴(kuò)展參數(shù)比如設(shè)置存活時間等)
????????channel.queueDeclare(QUEUE_ROUTING_SMS,?true,?false,?false,?null);
????????channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM,?BuiltinExchangeType.DIRECT);
????????//?綁定隊(duì)列并指明路由key
????????channel.queueBind(QUEUE_ROUTING_SMS,EXCHANGE_ROUTING_INFORM,ROUTING_SMS);
????????//?實(shí)現(xiàn)消費(fèi)方法
????????DefaultConsumer?defaultConsumer?=?new?DefaultConsumer(channel){
????????????/**
?????????????*
?????????????*?@param?consumerTag?消費(fèi)者標(biāo)簽
?????????????*?@param?envelope?信封,可以獲取交換機(jī)等信息
?????????????*?@param?properties?消息屬性
?????????????*?@param?body?消費(fèi)內(nèi)容,字節(jié)數(shù)組,可以轉(zhuǎn)成字符串
?????????????*?@throws?IOException
?????????????*/
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????String?message?=?new?String(body,"utf-8");
????????????????System.out.println("收到的短信消息是:"+message);
????????????}
????????};
????????//?消費(fèi)消息(隊(duì)列名,是否自動確認(rèn),消費(fèi)方法)
????????channel.basicConsume(QUEUE_ROUTING_SMS,true,defaultConsumer);
????}
}
4.Topic 主題模式

星號井號代表通配符
星號代表一個單詞,井號代表一個或多個單詞
路由功能添加模糊匹配
消息產(chǎn)生者產(chǎn)生消息,把消息交給交換機(jī)
交換機(jī)根據(jù)key的規(guī)則模糊匹配到對應(yīng)的隊(duì)列,由隊(duì)列的監(jiān)聽消費(fèi)者接收消息消費(fèi)
特點(diǎn):
1.每個消費(fèi)者監(jiān)聽自己的隊(duì)列,并且設(shè)置帶通配符的routingkey
2.生產(chǎn)者將消息發(fā)送給broker,由交換機(jī)及根據(jù)路由key來轉(zhuǎn)發(fā)消息到指定的隊(duì)列
5.Header 轉(zhuǎn)發(fā)器
取消了路由key,使用header中的key/value(鍵值對)來匹配隊(duì)列。
6.RPC 遠(yuǎn)程調(diào)用

基于direct類型交換機(jī)實(shí)現(xiàn)。生產(chǎn)者將消息遠(yuǎn)程發(fā)送給rpc隊(duì)列,消費(fèi)者監(jiān)聽rpc消息隊(duì)列的消息并消息,然后將返回結(jié)果放入到響應(yīng)隊(duì)列中,生產(chǎn)者監(jiān)聽響應(yīng)隊(duì)列中的消息,拿到消費(fèi)者的處理結(jié)果,實(shí)現(xiàn)遠(yuǎn)程RPC遠(yuǎn)程調(diào)用。
參考文件:
https://www.cnblogs.com/Jeely/p/10784013.html
https://lovnx.blog.csdn.net/article/details/70991021
本文版權(quán)歸Charon和博客園共有,原創(chuàng)文章,歡迎轉(zhuǎn)載,但未經(jīng)作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責(zé)任的權(quán)利。
粉絲福利:Java從入門到入土學(xué)習(xí)路線圖
??????

??長按上方微信二維碼?2 秒
感謝點(diǎn)贊支持下哈?
