<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          消息隊(duì)列之RabbitMQ

          共 7847字,需瀏覽 16分鐘

           ·

          2021-01-19 08:55

          點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號”

          優(yōu)質(zhì)文章,第一時間送達(dá)

          ? 作者?|? pluto_charon

          來源 |? urlify.cn/VFrYvm

          76套java從入門到精通實(shí)戰(zhàn)課程分享

          1.rabbitMQ介紹

          rabbitMQ是由erlang語言開發(fā)的,基于AMQP協(xié)議實(shí)現(xiàn)的消息隊(duì)列。他是一種應(yīng)用程序之間的通信方法,在分布式系統(tǒng)開發(fā)中應(yīng)用非常廣泛。

          rabbitMq的有點(diǎn):

          1. 使用簡單,功能強(qiáng)大

          2. 基于AMQP協(xié)議

          3. 社區(qū)活躍,文檔完善

          4. 高并發(fā)性能好,erlang語言是專門用于開發(fā)高并發(fā)程序的

          5. springBoot默認(rèn)集成rabbitMq

          AMQP(advanced Message Queuing Protocol),是一個提供統(tǒng)一消息服務(wù)的應(yīng)用標(biāo)準(zhǔn)高級消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì),基于此協(xié)議的客戶端與消息中間件可傳遞消息,并不受客戶端中間件的產(chǎn)品不同和開發(fā)語言不同的限制。JMS和AMQP的區(qū)別在于:JMS是java語言專屬的消息服務(wù)標(biāo)準(zhǔn),他是在api層定義標(biāo)準(zhǔn),并且只能用于java應(yīng)用,而AMQP是在協(xié)議層定義的標(biāo)準(zhǔn),是可以跨語言的。

          2.工作流程

          發(fā)送消息:

          1. 生產(chǎn)者和broker建立TCP連接

          2. 生產(chǎn)者和broker建立通道

          3. 生產(chǎn)者通過通道消息發(fā)送給broker,由exchange將消息轉(zhuǎn)發(fā)

          4. exchange將消息轉(zhuǎn)發(fā)給指定的queue

          接受消息:

          1. 消費(fèi)者和broker建立TCP連接

          2. 消費(fèi)者和broker建立通道

          3. 消費(fèi)者監(jiān)聽指定的queue

          4. 當(dāng)有消息到達(dá)queue的時候broker默認(rèn)將消息推送給消費(fèi)者

          5. 消費(fèi)者接受到消息并消費(fèi)

          3.安裝

          如果不想自己下載,需要我這里的軟件的,可以在下面評論郵箱,我私發(fā)給你。

          1.安裝erlang的環(huán)境,雙擊otp的運(yùn)行程序,然后一路點(diǎn)擊下一步(next)。

          配置環(huán)境變量

          在path中添加erlang的路徑

          2.安裝rabbitMq,雙擊rabbitmq的運(yùn)行程序

          安裝完成之后在菜單頁面可以看到

          安裝完RabbitMQ如果想要訪問管理頁面需要在rabbitmq的sbin目錄中使用cmd執(zhí)行:rabbitmq-plugins.bat enable rabbitmq_management(管理員身份運(yùn)行此命令)添加可視化插件。

          點(diǎn)擊上圖中的start/stop來開啟/停止服務(wù)。然后在瀏覽器上輸入地址查看,rabbitMq的默認(rèn)端口是15672。默認(rèn)的用戶名和密碼都是guest

          如果安裝失敗,需要卸載重裝的時候或者出現(xiàn)rabbitMq服務(wù)注冊失敗時,此時需要進(jìn)入注冊表清理erlang(搜索rabbitMQ,erlsrv將對應(yīng)的項(xiàng)刪除)

          4.代碼實(shí)現(xiàn)

          1.添加依賴



          ?com.rabbitmq
          ?amqp-client
          ?5.7.3

          2.生產(chǎn)者代碼實(shí)現(xiàn)

          package?rabbitmq;

          import?com.rabbitmq.client.Channel;
          import?com.rabbitmq.client.Connection;
          import?com.rabbitmq.client.ConnectionFactory;

          import?java.io.IOException;
          import?java.net.ConnectException;
          import?java.util.concurrent.TimeoutException;

          /**
          ?*?@className:?producer
          ?*?@description:?rabbitmq的生產(chǎn)者代碼實(shí)現(xiàn)
          ?*?@author:?charon
          ?*?@create:?2021-01-03?23:10
          ?*/
          public?class?Producer?{
          ????/**
          ?????*?聲明隊(duì)列名
          ?????*/
          ????private?static?final?String?QUEUE?=?"hello?charon";

          ????public?static?void?main(String[]?args)?{
          ????????//?創(chuàng)建連接工廠
          ????????ConnectionFactory?connectionFactory?=?new?ConnectionFactory();
          ????????//?設(shè)置ip,端口,因?yàn)槭潜緳C(jī),所以直接設(shè)置為127.0.0.1
          ????????connectionFactory.setHost("127.0.0.1");
          ????????//?web端口默認(rèn)為15672,通信端口為5672
          ????????connectionFactory.setPort(5672);
          ????????//?設(shè)置用戶名和密碼
          ????????connectionFactory.setUsername("guest");
          ????????connectionFactory.setPassword("guest");
          ????????//?設(shè)置虛擬ip,默認(rèn)為/,一個rabbitmq的服務(wù)可以設(shè)置多個虛擬機(jī),每個虛擬機(jī)就相當(dāng)于一個獨(dú)立的mq
          ????????connectionFactory.setVirtualHost("/");
          ????????Connection?connection?=?null;
          ????????Channel?channel?=?null;
          ????????try?{
          ????????????connection?=?connectionFactory.newConnection();
          ????????????//?創(chuàng)建通道
          ????????????channel?=?connection.createChannel();
          ????????????//?聲明隊(duì)列(隊(duì)列名稱,是否持久化,是否排它,是否自動刪除,隊(duì)列的擴(kuò)展參數(shù)比如設(shè)置存活時間等)
          ????????????channel.queueDeclare(QUEUE,?true,?false,?false,?null);
          ????????????String?message?=?"hello?charon?good?evening";
          ????????????//?發(fā)布消息(交換機(jī),RoutingKey即隊(duì)列名,額外的消息屬性,消息內(nèi)容)
          ????????????channel.basicPublish("",?QUEUE,?null,?message.getBytes());
          ????????????System.out.println("發(fā)送消息給mq:"?+?message);
          ????????}?catch?(Exception?e)?{
          ????????????e.printStackTrace();
          ????????}finally?{
          ????????????//?關(guān)閉資源
          ????????????try?{
          ????????????????channel.close();
          ????????????}?catch?(IOException?e)?{
          ????????????????e.printStackTrace();
          ????????????}?catch?(TimeoutException?e)?{
          ????????????????e.printStackTrace();
          ????????????}
          ????????????try?{
          ????????????????connection.close();
          ????????????}?catch?(IOException?e)?{
          ????????????????e.printStackTrace();
          ????????????}
          ????????}
          ????}
          }

          3.消費(fèi)者代碼實(shí)現(xiàn)

          package?rabbitmq;

          import?com.rabbitmq.client.AMQP;
          import?com.rabbitmq.client.Channel;
          import?com.rabbitmq.client.Connection;
          import?com.rabbitmq.client.ConnectionFactory;
          import?com.rabbitmq.client.DefaultConsumer;
          import?com.rabbitmq.client.Envelope;

          import?java.io.IOException;
          import?java.util.concurrent.TimeoutException;

          /**
          ?*?@className:?Consumer
          ?*?@description:?消費(fèi)者的代碼實(shí)現(xiàn)
          ?*?@author:?charon
          ?*?@create:?2021-01-05?08:28
          ?*/
          public?class?Consumer?{
          ????/**
          ?????*?聲明隊(duì)列名
          ?????*/
          ????private?static?final?String?QUEUE?=?"hello?charon";

          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
          ????????//?創(chuàng)建連接工廠
          ????????ConnectionFactory?connectionFactory?=?new?ConnectionFactory();
          ????????//?設(shè)置ip,端口,因?yàn)槭潜緳C(jī),所以直接設(shè)置為127.0.0.1
          ????????connectionFactory.setHost("127.0.0.1");
          ????????//?web端口默認(rèn)為15672,通信端口為5672
          ????????connectionFactory.setPort(5672);
          ????????//?設(shè)置用戶名和密碼
          ????????connectionFactory.setUsername("guest");
          ????????connectionFactory.setPassword("guest");
          ????????//?設(shè)置虛擬ip,默認(rèn)為/,一個rabbitmq的服務(wù)可以設(shè)置多個虛擬機(jī),每個虛擬機(jī)就相當(dāng)于一個獨(dú)立的mq
          ????????connectionFactory.setVirtualHost("/");
          ????????Connection?connection?=?connectionFactory.newConnection();
          ????????//?創(chuàng)建通道
          ????????Channel?channel?=?connection.createChannel();
          ????????//?聲明隊(duì)列(隊(duì)列名稱,是否持久化,是否排它,是否自動刪除,隊(duì)列的擴(kuò)展參數(shù)比如設(shè)置存活時間等)
          ????????channel.queueDeclare(QUEUE,?true,?false,?false,?null);
          ????????//?實(shí)現(xiàn)消費(fèi)方法
          ????????DefaultConsumer?defaultConsumer?=?new?DefaultConsumer(channel){
          ????????????/**
          ?????????????*
          ?????????????*?@param?consumerTag?消費(fèi)者標(biāo)簽
          ?????????????*?@param?envelope?信封,可以獲取交換機(jī)等信息
          ?????????????*?@param?properties?消息屬性
          ?????????????*?@param?body?消費(fèi)內(nèi)容,字節(jié)數(shù)組,可以轉(zhuǎn)成字符串
          ?????????????*?@throws?IOException
          ?????????????*/
          ????????????@Override
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          //????????????????String?exchange?=?envelope.getExchange();
          //????????????????long?deliveryTag?=?envelope.getDeliveryTag();
          ????????????????String?message?=?new?String(body,"utf-8");
          ????????????????System.out.println("收到的消息是:"+message);
          ????????????}
          ????????};
          ????????//?消費(fèi)消息(隊(duì)列名,是否自動確認(rèn),消費(fèi)方法)
          ????????channel.basicConsume(QUEUE,true,defaultConsumer);
          ????}
          }

          5.rabbitMq的工作模式

          1. Work queues 工作隊(duì)列(資源競爭)

          生產(chǎn)者將消息放入到隊(duì)列中,消費(fèi)者可以有多個,同時監(jiān)聽同一個隊(duì)列。如上圖,消費(fèi)者c1,c2共同爭搶當(dāng)前消息隊(duì)列的內(nèi)容,誰先拿到誰負(fù)責(zé)消費(fèi)消息,缺點(diǎn)是在高并發(fā)的情況下,默認(rèn)會產(chǎn)品一個消息被多個消費(fèi)者共同使用,可以設(shè)置一個鎖開關(guān),保證一條消息只能被一個消費(fèi)者使用。

          上面的代碼,可以再添加一個消費(fèi)者,這樣就可以實(shí)現(xiàn)工作隊(duì)列的工作模式。

          2.Publish/Subscribe 發(fā)布訂閱(共享資源)

          X代表rabbitMq內(nèi)部組件交換機(jī),生產(chǎn)者將消息放入交換機(jī),交換機(jī)發(fā)布訂閱把消息發(fā)送到所有消息隊(duì)列中,對應(yīng)的消費(fèi)者拿到消息進(jìn)行消費(fèi),對比工作隊(duì)列而言,發(fā)布訂閱可以實(shí)現(xiàn)工作隊(duì)列的功能,但是比工作隊(duì)列更強(qiáng)大。

          特點(diǎn):
          1.每個消費(fèi)者監(jiān)聽自己的隊(duì)列
          2.生產(chǎn)者將消息發(fā)送給Broker,由交換機(jī)將消息轉(zhuǎn)發(fā)到綁定的此交換機(jī)的每個隊(duì)列,每個綁定交換機(jī)的隊(duì)列都將接收到消息;

          生產(chǎn)者:

          package?rabbitmq.publish;

          import?com.rabbitmq.client.BuiltinExchangeType;
          import?com.rabbitmq.client.Channel;
          import?com.rabbitmq.client.Connection;
          import?com.rabbitmq.client.ConnectionFactory;

          import?java.io.IOException;
          import?java.util.concurrent.TimeoutException;

          /**
          ?*?@className:?Producer
          ?*?@description:?發(fā)布訂閱的生產(chǎn)者
          ?*?@author:?charon
          ?*?@create:?2021-01-07?22:02
          ?*/
          public?class?Producer?{

          ????/**郵件的隊(duì)列*/
          ????public?static?final?String?QUEUE_INFORM_EMAIL?=?"queue_inform_email";

          ????/**短信的隊(duì)列*/
          ????public?static?final?String?QUEUE_INFORM_SMS?=?"queue_inform_sms";

          ????/**交換機(jī)*/
          ????public?static?final?String?EXCHANGE_FANOUT_INFORM?=?"exchange_fanout_inform";

          ????public?static?void?main(String[]?args)?{
          ????????//?創(chuàng)建連接工廠
          ????????ConnectionFactory?connectionFactory?=?new?ConnectionFactory();
          ????????//?設(shè)置ip,端口,因?yàn)槭潜緳C(jī),所以直接設(shè)置為127.0.0.1
          ????????connectionFactory.setHost("127.0.0.1");
          ????????//?web端口默認(rèn)為15672,通信端口為5672
          ????????connectionFactory.setPort(5672);
          ????????//?設(shè)置用戶名和密碼
          ????????connectionFactory.setUsername("guest");
          ????????connectionFactory.setPassword("guest");
          ????????//?設(shè)置虛擬ip,默認(rèn)為/,一個rabbitmq的服務(wù)可以設(shè)置多個虛擬機(jī),每個虛擬機(jī)就相當(dāng)于一個獨(dú)立的mq
          ????????connectionFactory.setVirtualHost("/");
          ????????Connection?connection?=?null;
          ????????Channel?channel?=?null;
          ????????try?{
          ????????????connection?=?connectionFactory.newConnection();
          ????????????//?創(chuàng)建通道
          ????????????channel?=?connection.createChannel();
          ????????????//?聲明隊(duì)列(隊(duì)列名稱,是否持久化,是否排它,是否自動刪除,隊(duì)列的擴(kuò)展參數(shù)比如設(shè)置存活時間等)
          ????????????channel.queueDeclare(QUEUE_INFORM_EMAIL,?true,?false,?false,?null);
          ????????????channel.queueDeclare(QUEUE_INFORM_SMS,?true,?false,?false,?null);
          ????????????//?交換機(jī)(交換機(jī)名稱,交換機(jī)類型(fanout:發(fā)布訂閱,direct:routing,topic:主題,headers:header模式))
          ????????????channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM,?BuiltinExchangeType.FANOUT);
          ????????????//?綁定交換機(jī)(隊(duì)列名稱,交換機(jī)名稱,routingKey(發(fā)布訂閱設(shè)置為空))
          ????????????channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");
          ????????????channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");
          ????????????//?發(fā)送多條消息
          ????????????for?(int?i?=?0;?i?????????????????String?message?=?"hello?charon?good?evening?by?publish";
          ????????????????//?指定交換機(jī)(交換機(jī),RoutingKey即隊(duì)列名,額外的消息屬性,消息內(nèi)容)
          ????????????????channel.basicPublish(EXCHANGE_FANOUT_INFORM,?"",?null,?message.getBytes());
          ????????????????System.out.println("發(fā)送消息給mq:"?+?message);
          ????????????}
          ????????}?catch?(Exception?e)?{
          ????????????e.printStackTrace();
          ????????}finally?{
          ????????????//?關(guān)閉資源
          ????????????try?{
          ????????????????channel.close();
          ????????????}?catch?(IOException?e)?{
          ????????????????e.printStackTrace();
          ????????????}?catch?(TimeoutException?e)?{
          ????????????????e.printStackTrace();
          ????????????}
          ????????????try?{
          ????????????????connection.close();
          ????????????}?catch?(IOException?e)?{
          ????????????????e.printStackTrace();
          ????????????}
          ????????}
          ????}
          }

          消費(fèi)email的消費(fèi)者:

          package?rabbitmq.publish;

          import?com.rabbitmq.client.AMQP;
          import?com.rabbitmq.client.BuiltinExchangeType;
          import?com.rabbitmq.client.Channel;
          import?com.rabbitmq.client.Connection;
          import?com.rabbitmq.client.ConnectionFactory;
          import?com.rabbitmq.client.DefaultConsumer;
          import?com.rabbitmq.client.Envelope;

          import?java.io.IOException;
          import?java.util.concurrent.TimeoutException;

          /**
          ?*?@className:?EmailConsumer
          ?*?@description:?郵件的消息消費(fèi)者
          ?*?@author:?charon
          ?*?@create:?2021-01-07?22:14
          ?*/
          public?class?EmailConsumer?{

          ????/**郵件的隊(duì)列*/
          ????public?static?final?String?QUEUE_INFORM_EMAIL?=?"queue_inform_email";

          ????/**短信的隊(duì)列*/
          ????public?static?final?String?QUEUE_INFORM_SMS?=?"queue_inform_sms";

          ????/**交換機(jī)*/
          ????public?static?final?String?EXCHANGE_FANOUT_INFORM?=?"exchange_fanout_inform";

          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
          ????????//?創(chuàng)建連接工廠
          ????????ConnectionFactory?connectionFactory?=?new?ConnectionFactory();
          ????????//?設(shè)置ip,端口,因?yàn)槭潜緳C(jī),所以直接設(shè)置為127.0.0.1
          ????????connectionFactory.setHost("127.0.0.1");
          ????????//?web端口默認(rèn)為15672,通信端口為5672
          ????????connectionFactory.setPort(5672);
          ????????//?設(shè)置用戶名和密碼
          ????????connectionFactory.setUsername("guest");
          ????????connectionFactory.setPassword("guest");
          ????????//?設(shè)置虛擬ip,默認(rèn)為/,一個rabbitmq的服務(wù)可以設(shè)置多個虛擬機(jī),每個虛擬機(jī)就相當(dāng)于一個獨(dú)立的mq
          ????????connectionFactory.setVirtualHost("/");
          ????????Connection?connection?=?connectionFactory.newConnection();
          ????????//?創(chuàng)建通道
          ????????Channel?channel?=?connection.createChannel();
          ????????//?聲明隊(duì)列(隊(duì)列名稱,是否持久化,是否排它,是否自動刪除,隊(duì)列的擴(kuò)展參數(shù)比如設(shè)置存活時間等)
          ????????channel.queueDeclare(QUEUE_INFORM_EMAIL,?true,?false,?false,?null);
          ????????channel.queueDeclare(QUEUE_INFORM_SMS,?true,?false,?false,?null);
          ????????channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM,?BuiltinExchangeType.FANOUT);
          ????????//?實(shí)現(xiàn)消費(fèi)方法
          ????????DefaultConsumer?defaultConsumer?=?new?DefaultConsumer(channel){
          ????????????/**
          ?????????????*
          ?????????????*?@param?consumerTag?消費(fèi)者標(biāo)簽
          ?????????????*?@param?envelope?信封,可以獲取交換機(jī)等信息
          ?????????????*?@param?properties?消息屬性
          ?????????????*?@param?body?消費(fèi)內(nèi)容,字節(jié)數(shù)組,可以轉(zhuǎn)成字符串
          ?????????????*?@throws?IOException
          ?????????????*/
          ????????????@Override
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          //????????????????String?exchange?=?envelope.getExchange();
          //????????????????long?deliveryTag?=?envelope.getDeliveryTag();
          ????????????????String?message?=?new?String(body,"utf-8");
          ????????????????System.out.println("收到的email消息是:"+message);
          ????????????}
          ????????};
          ????????//?消費(fèi)消息(隊(duì)列名,是否自動確認(rèn),消費(fèi)方法)
          ????????channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);
          ????}
          }

          消費(fèi)短信的消費(fèi)者:

          package?rabbitmq.publish;

          import?com.rabbitmq.client.AMQP;
          import?com.rabbitmq.client.BuiltinExchangeType;
          import?com.rabbitmq.client.Channel;
          import?com.rabbitmq.client.Connection;
          import?com.rabbitmq.client.ConnectionFactory;
          import?com.rabbitmq.client.DefaultConsumer;
          import?com.rabbitmq.client.Envelope;

          import?java.io.IOException;
          import?java.util.concurrent.TimeoutException;

          /**
          ?*?@className:?SmsConsumer
          ?*?@description:
          ?*?@author:?charon
          ?*?@create:?2021-01-07?22:17
          ?*/
          public?class?SmsConsumer?{


          ????/**郵件的隊(duì)列*/
          ????public?static?final?String?QUEUE_INFORM_EMAIL?=?"queue_inform_email";

          ????/**短信的隊(duì)列*/
          ????public?static?final?String?QUEUE_INFORM_SMS?=?"queue_inform_sms";

          ????/**交換機(jī)*/
          ????public?static?final?String?EXCHANGE_FANOUT_INFORM?=?"exchange_fanout_inform";

          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
          ????????//?創(chuàng)建連接工廠
          ????????ConnectionFactory?connectionFactory?=?new?ConnectionFactory();
          ????????//?設(shè)置ip,端口,因?yàn)槭潜緳C(jī),所以直接設(shè)置為127.0.0.1
          ????????connectionFactory.setHost("127.0.0.1");
          ????????//?web端口默認(rèn)為15672,通信端口為5672
          ????????connectionFactory.setPort(5672);
          ????????//?設(shè)置用戶名和密碼
          ????????connectionFactory.setUsername("guest");
          ????????connectionFactory.setPassword("guest");
          ????????//?設(shè)置虛擬ip,默認(rèn)為/,一個rabbitmq的服務(wù)可以設(shè)置多個虛擬機(jī),每個虛擬機(jī)就相當(dāng)于一個獨(dú)立的mq
          ????????connectionFactory.setVirtualHost("/");
          ????????Connection?connection?=?connectionFactory.newConnection();
          ????????//?創(chuàng)建通道
          ????????Channel?channel?=?connection.createChannel();
          ????????//?聲明隊(duì)列(隊(duì)列名稱,是否持久化,是否排它,是否自動刪除,隊(duì)列的擴(kuò)展參數(shù)比如設(shè)置存活時間等)
          ????????channel.queueDeclare(QUEUE_INFORM_EMAIL,?true,?false,?false,?null);
          ????????channel.queueDeclare(QUEUE_INFORM_SMS,?true,?false,?false,?null);
          ????????channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM,?BuiltinExchangeType.FANOUT);
          ????????//?實(shí)現(xiàn)消費(fèi)方法
          ????????DefaultConsumer?defaultConsumer?=?new?DefaultConsumer(channel){
          ????????????/**
          ?????????????*
          ?????????????*?@param?consumerTag?消費(fèi)者標(biāo)簽
          ?????????????*?@param?envelope?信封,可以獲取交換機(jī)等信息
          ?????????????*?@param?properties?消息屬性
          ?????????????*?@param?body?消費(fèi)內(nèi)容,字節(jié)數(shù)組,可以轉(zhuǎn)成字符串
          ?????????????*?@throws?IOException
          ?????????????*/
          ????????????@Override
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????String?message?=?new?String(body,"utf-8");
          ????????????????System.out.println("收到的短信消息是:"+message);
          ????????????}
          ????????};
          ????????//?消費(fèi)消息(隊(duì)列名,是否自動確認(rèn),消費(fèi)方法)
          ????????channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);
          ????}
          }

          3.Routing 路由模式

          生產(chǎn)者將消息發(fā)送給交換機(jī)按照路由判斷,交換機(jī)根據(jù)路由的key,只能匹配上路由key的對應(yīng)的消息隊(duì)列,對應(yīng)的消費(fèi)者才能消費(fèi)消息。

          如上圖,rabbitMq根據(jù)對應(yīng)的key,將消息發(fā)送到對應(yīng)的隊(duì)列中,error通知將發(fā)送到amqp.gen-S9b上,由消費(fèi)者c1消費(fèi)。error,info,warning通知將發(fā)送到amqp.gen-Ag1上,由消費(fèi)者c2消費(fèi)。

          特點(diǎn):
          1.每個消費(fèi)者監(jiān)聽自己的隊(duì)列,并且設(shè)置路由key
          2.生產(chǎn)者將消息發(fā)送給交換機(jī),由交換機(jī)根據(jù)路由key來轉(zhuǎn)發(fā)消息到指定的隊(duì)列

          生產(chǎn)者:

          package?rabbitmq.routing;

          import?com.rabbitmq.client.BuiltinExchangeType;
          import?com.rabbitmq.client.Channel;
          import?com.rabbitmq.client.Connection;
          import?com.rabbitmq.client.ConnectionFactory;

          import?java.io.IOException;
          import?java.util.concurrent.TimeoutException;

          /**
          ?*?@className:?Producer
          ?*?@description:?路由模式下的生成者
          ?*?@author:?charon
          ?*?@create:?2021-01-07?22:34
          ?*/
          public?class?Producer?{

          ????/**郵件的隊(duì)列*/
          ????public?static?final?String?QUEUE_ROUTING_EMAIL?=?"queue_routing_email";

          ????/**短信的隊(duì)列*/
          ????public?static?final?String?QUEUE_ROUTING_SMS?=?"queue_routing_sms";

          ????/**交換機(jī)*/
          ????public?static?final?String?EXCHANGE_ROUTING_INFORM?=?"exchange_routing_inform";

          ????/**?設(shè)置email的路由key?*/
          ????public?static?final?String?ROUTING_EMAIL?=?"routing_email";

          ????/**?設(shè)置sms的路由key?*/
          ????public?static?final?String?ROUTING_SMS?=?"routing_sms";

          ????public?static?void?main(String[]?args)?{
          ????????//?創(chuàng)建連接工廠
          ????????ConnectionFactory?connectionFactory?=?new?ConnectionFactory();
          ????????//?設(shè)置ip,端口,因?yàn)槭潜緳C(jī),所以直接設(shè)置為127.0.0.1
          ????????connectionFactory.setHost("127.0.0.1");
          ????????//?web端口默認(rèn)為15672,通信端口為5672
          ????????connectionFactory.setPort(5672);
          ????????//?設(shè)置用戶名和密碼
          ????????connectionFactory.setUsername("guest");
          ????????connectionFactory.setPassword("guest");
          ????????//?設(shè)置虛擬ip,默認(rèn)為/,一個rabbitmq的服務(wù)可以設(shè)置多個虛擬機(jī),每個虛擬機(jī)就相當(dāng)于一個獨(dú)立的mq
          ????????connectionFactory.setVirtualHost("/");
          ????????Connection?connection?=?null;
          ????????Channel?channel?=?null;
          ????????try?{
          ????????????connection?=?connectionFactory.newConnection();
          ????????????//?創(chuàng)建通道
          ????????????channel?=?connection.createChannel();
          ????????????//?聲明隊(duì)列(隊(duì)列名稱,是否持久化,是否排它,是否自動刪除,隊(duì)列的擴(kuò)展參數(shù)比如設(shè)置存活時間等)
          ????????????channel.queueDeclare(QUEUE_ROUTING_EMAIL,?true,?false,?false,?null);
          ????????????channel.queueDeclare(QUEUE_ROUTING_SMS,?true,?false,?false,?null);
          ????????????//?交換機(jī)(交換機(jī)名稱,交換機(jī)類型(fanout:發(fā)布訂閱,direct:routing,topic:主題,headers:header模式))
          ????????????channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM,?BuiltinExchangeType.DIRECT);
          ????????????//?綁定交換機(jī)(隊(duì)列名稱,交換機(jī)名稱,routingKey)
          ????????????channel.queueBind(QUEUE_ROUTING_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTING_EMAIL);
          ????????????channel.queueBind(QUEUE_ROUTING_SMS,EXCHANGE_ROUTING_INFORM,ROUTING_SMS);
          ????????????//?發(fā)送多條消息
          ????????????for?(int?i?=?0;?i?????????????????String?message?=?"hello?charon?good?evening?by?routing?--email";
          ????????????????//?指定交換機(jī)(交換機(jī),RoutingKey,額外的消息屬性,消息內(nèi)容)
          ????????????????channel.basicPublish(EXCHANGE_ROUTING_INFORM,?ROUTING_EMAIL,?null,?message.getBytes());
          ????????????????System.out.println("發(fā)送消息給mq:"?+?message);
          ????????????}
          ????????????//?發(fā)送多條消息
          ????????????for?(int?i?=?0;?i?????????????????String?message?=?"hello?charon?good?evening?by?routing?--sms";
          ????????????????//?指定交換機(jī)(交換機(jī),RoutingKey,額外的消息屬性,消息內(nèi)容)
          ????????????????channel.basicPublish(EXCHANGE_ROUTING_INFORM,?ROUTING_SMS,?null,?message.getBytes());
          ????????????????System.out.println("發(fā)送消息給mq:"?+?message);
          ????????????}
          ????????}?catch?(Exception?e)?{
          ????????????e.printStackTrace();
          ????????}finally?{
          ????????????//?關(guān)閉資源
          ????????????try?{
          ????????????????channel.close();
          ????????????}?catch?(IOException?e)?{
          ????????????????e.printStackTrace();
          ????????????}?catch?(TimeoutException?e)?{
          ????????????????e.printStackTrace();
          ????????????}
          ????????????try?{
          ????????????????connection.close();
          ????????????}?catch?(IOException?e)?{
          ????????????????e.printStackTrace();
          ????????????}
          ????????}
          ????}
          }

          消費(fèi)email的消費(fèi)者:

          package?rabbitmq.routing;

          import?com.rabbitmq.client.AMQP;
          import?com.rabbitmq.client.BuiltinExchangeType;
          import?com.rabbitmq.client.Channel;
          import?com.rabbitmq.client.Connection;
          import?com.rabbitmq.client.ConnectionFactory;
          import?com.rabbitmq.client.DefaultConsumer;
          import?com.rabbitmq.client.Envelope;

          import?java.io.IOException;
          import?java.util.concurrent.TimeoutException;

          /**
          ?*?@className:?EmailConsumer
          ?*?@description:?路由模式下的email消費(fèi)者
          ?*?@author:?charon
          ?*?@create:?2021-01-07?22:40
          ?*/
          public?class?EmailConsumer?{
          ????/**郵件的隊(duì)列*/
          ????public?static?final?String?QUEUE_ROUTING_EMAIL?=?"queue_routing_email";

          ????/**交換機(jī)*/
          ????public?static?final?String?EXCHANGE_ROUTING_INFORM?=?"exchange_routing_inform";

          ????/**?設(shè)置email的路由key?*/
          ????public?static?final?String?ROUTING_EMAIL?=?"routing_email";

          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
          ????????//?創(chuàng)建連接工廠
          ????????ConnectionFactory?connectionFactory?=?new?ConnectionFactory();
          ????????//?設(shè)置ip,端口,因?yàn)槭潜緳C(jī),所以直接設(shè)置為127.0.0.1
          ????????connectionFactory.setHost("127.0.0.1");
          ????????//?web端口默認(rèn)為15672,通信端口為5672
          ????????connectionFactory.setPort(5672);
          ????????//?設(shè)置用戶名和密碼
          ????????connectionFactory.setUsername("guest");
          ????????connectionFactory.setPassword("guest");
          ????????//?設(shè)置虛擬ip,默認(rèn)為/,一個rabbitmq的服務(wù)可以設(shè)置多個虛擬機(jī),每個虛擬機(jī)就相當(dāng)于一個獨(dú)立的mq
          ????????connectionFactory.setVirtualHost("/");
          ????????Connection?connection?=?connectionFactory.newConnection();
          ????????//?創(chuàng)建通道
          ????????Channel?channel?=?connection.createChannel();
          ????????//?聲明隊(duì)列(隊(duì)列名稱,是否持久化,是否排它,是否自動刪除,隊(duì)列的擴(kuò)展參數(shù)比如設(shè)置存活時間等)
          ????????channel.queueDeclare(QUEUE_ROUTING_EMAIL,?true,?false,?false,?null);
          ????????channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM,?BuiltinExchangeType.DIRECT);
          ????????//?綁定隊(duì)列并指明路由key
          ????????channel.queueBind(QUEUE_ROUTING_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTING_EMAIL);
          ????????//?實(shí)現(xiàn)消費(fèi)方法
          ????????DefaultConsumer?defaultConsumer?=?new?DefaultConsumer(channel){
          ????????????/**
          ?????????????*
          ?????????????*?@param?consumerTag?消費(fèi)者標(biāo)簽
          ?????????????*?@param?envelope?信封,可以獲取交換機(jī)等信息
          ?????????????*?@param?properties?消息屬性
          ?????????????*?@param?body?消費(fèi)內(nèi)容,字節(jié)數(shù)組,可以轉(zhuǎn)成字符串
          ?????????????*?@throws?IOException
          ?????????????*/
          ????????????@Override
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????String?message?=?new?String(body,"utf-8");
          ????????????????System.out.println("收到的email消息是:"+message);
          ????????????}
          ????????};
          ????????//?消費(fèi)消息(隊(duì)列名,是否自動確認(rèn),消費(fèi)方法)
          ????????channel.basicConsume(QUEUE_ROUTING_EMAIL,true,defaultConsumer);
          ????}
          }

          消費(fèi)短信的消費(fèi)者:

          package?rabbitmq.routing;

          import?com.rabbitmq.client.AMQP;
          import?com.rabbitmq.client.BuiltinExchangeType;
          import?com.rabbitmq.client.Channel;
          import?com.rabbitmq.client.Connection;
          import?com.rabbitmq.client.ConnectionFactory;
          import?com.rabbitmq.client.DefaultConsumer;
          import?com.rabbitmq.client.Envelope;

          import?java.io.IOException;
          import?java.util.concurrent.TimeoutException;

          /**
          ?*?@className:?EmailConsumer
          ?*?@description:?路由模式下的email消費(fèi)者
          ?*?@author:?charon
          ?*?@create:?2021-01-07?22:40
          ?*/
          public?class?SmsConsumer?{
          ????/**郵件的隊(duì)列*/
          ????public?static?final?String?QUEUE_ROUTING_SMS?=?"queue_routing_sms";

          ????/**交換機(jī)*/
          ????public?static?final?String?EXCHANGE_ROUTING_INFORM?=?"exchange_routing_inform";

          ????/**?設(shè)置email的路由key?*/
          ????public?static?final?String?ROUTING_SMS?=?"routing_sms";

          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
          ????????//?創(chuàng)建連接工廠
          ????????ConnectionFactory?connectionFactory?=?new?ConnectionFactory();
          ????????//?設(shè)置ip,端口,因?yàn)槭潜緳C(jī),所以直接設(shè)置為127.0.0.1
          ????????connectionFactory.setHost("127.0.0.1");
          ????????//?web端口默認(rèn)為15672,通信端口為5672
          ????????connectionFactory.setPort(5672);
          ????????//?設(shè)置用戶名和密碼
          ????????connectionFactory.setUsername("guest");
          ????????connectionFactory.setPassword("guest");
          ????????//?設(shè)置虛擬ip,默認(rèn)為/,一個rabbitmq的服務(wù)可以設(shè)置多個虛擬機(jī),每個虛擬機(jī)就相當(dāng)于一個獨(dú)立的mq
          ????????connectionFactory.setVirtualHost("/");
          ????????Connection?connection?=?connectionFactory.newConnection();
          ????????//?創(chuàng)建通道
          ????????Channel?channel?=?connection.createChannel();
          ????????//?聲明隊(duì)列(隊(duì)列名稱,是否持久化,是否排它,是否自動刪除,隊(duì)列的擴(kuò)展參數(shù)比如設(shè)置存活時間等)
          ????????channel.queueDeclare(QUEUE_ROUTING_SMS,?true,?false,?false,?null);
          ????????channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM,?BuiltinExchangeType.DIRECT);
          ????????//?綁定隊(duì)列并指明路由key
          ????????channel.queueBind(QUEUE_ROUTING_SMS,EXCHANGE_ROUTING_INFORM,ROUTING_SMS);
          ????????//?實(shí)現(xiàn)消費(fèi)方法
          ????????DefaultConsumer?defaultConsumer?=?new?DefaultConsumer(channel){
          ????????????/**
          ?????????????*
          ?????????????*?@param?consumerTag?消費(fèi)者標(biāo)簽
          ?????????????*?@param?envelope?信封,可以獲取交換機(jī)等信息
          ?????????????*?@param?properties?消息屬性
          ?????????????*?@param?body?消費(fèi)內(nèi)容,字節(jié)數(shù)組,可以轉(zhuǎn)成字符串
          ?????????????*?@throws?IOException
          ?????????????*/
          ????????????@Override
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????String?message?=?new?String(body,"utf-8");
          ????????????????System.out.println("收到的短信消息是:"+message);
          ????????????}
          ????????};
          ????????//?消費(fèi)消息(隊(duì)列名,是否自動確認(rèn),消費(fèi)方法)
          ????????channel.basicConsume(QUEUE_ROUTING_SMS,true,defaultConsumer);
          ????}
          }

          4.Topic 主題模式

          1. 星號井號代表通配符

          2. 星號代表一個單詞,井號代表一個或多個單詞

          3. 路由功能添加模糊匹配

          4. 消息產(chǎn)生者產(chǎn)生消息,把消息交給交換機(jī)

          5. 交換機(jī)根據(jù)key的規(guī)則模糊匹配到對應(yīng)的隊(duì)列,由隊(duì)列的監(jiān)聽消費(fèi)者接收消息消費(fèi)

          特點(diǎn):
          1.每個消費(fèi)者監(jiān)聽自己的隊(duì)列,并且設(shè)置帶通配符的routingkey
          2.生產(chǎn)者將消息發(fā)送給broker,由交換機(jī)及根據(jù)路由key來轉(zhuǎn)發(fā)消息到指定的隊(duì)列

          5.Header 轉(zhuǎn)發(fā)器

          取消了路由key,使用header中的key/value(鍵值對)來匹配隊(duì)列。

          6.RPC 遠(yuǎn)程調(diào)用

          基于direct類型交換機(jī)實(shí)現(xiàn)。生產(chǎn)者將消息遠(yuǎn)程發(fā)送給rpc隊(duì)列,消費(fèi)者監(jiān)聽rpc消息隊(duì)列的消息并消息,然后將返回結(jié)果放入到響應(yīng)隊(duì)列中,生產(chǎn)者監(jiān)聽響應(yīng)隊(duì)列中的消息,拿到消費(fèi)者的處理結(jié)果,實(shí)現(xiàn)遠(yuǎn)程RPC遠(yuǎn)程調(diào)用。

          參考文件:

          https://www.cnblogs.com/Jeely/p/10784013.html
          https://lovnx.blog.csdn.net/article/details/70991021

          本文版權(quán)歸Charon和博客園共有,原創(chuàng)文章,歡迎轉(zhuǎn)載,但未經(jīng)作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責(zé)任的權(quán)利。





          粉絲福利:Java從入門到入土學(xué)習(xí)路線圖

          ??????

          ??長按上方微信二維碼?2 秒


          感謝點(diǎn)贊支持下哈?

          瀏覽 56
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日韩成人无码人妻 | 欧美成人三级在线 | 岛国成人电影在线网站 | 婷婷丁香五月花 | 日韩av手机在线 日韩va在线观看 |