<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          RabbitMQ五種工作模式

          共 5650字,需瀏覽 12分鐘

           ·

          2021-02-23 10:57

          點擊上方藍色字體,選擇“標(biāo)星公眾號”

          優(yōu)質(zhì)文章,第一時間送達

          ? 作者?|? howard4

          來源 |? urlify.cn/UB3miu

          76套java從入門到精通實戰(zhàn)課程分享

          在SpringBoot環(huán)境下做的代碼測試,RabbitMQ的包是用SpringBoot的starter-amqp包引入的。

          1、簡單隊列

            

            一個生產(chǎn)者對應(yīng)一個消費者?。?!

            1、pom文件

            SpringBoot導(dǎo)入rabbitmq 啟動包


          ????org.springframework.boot
          ????spring-boot-starter-amqp

            2、工具類

          /**
          ?*?〈簡述〉

          ?*?〈連接RabbitMQ的工具類〉
          ?*
          ?*?@create?2020/7/1
          ?*?@since?1.0.0
          ?*/
          public?class?ConnectionUtil?{
          ????public?static?Connection?getConnection()?throws?Exception?{
          ????????return?getConnection(new?Properties());
          ????}

          ????private?static?Connection?getConnection(Properties?properties)?throws?Exception?{
          ????????return?getConnection(properties.getHost(),
          ????????????????properties.getPort(),
          ????????????????properties.getvHost(),
          ????????????????properties.getUserName(),
          ????????????????properties.getPassWord());
          ????}

          ????public?static?Connection?getConnection(String?host,?int?port,?String?vHost,?String?userName,?String?passWord)?throws?Exception?{
          ????????//1、定義連接工廠
          ????????ConnectionFactory?factory?=?new?ConnectionFactory();
          ????????//2、設(shè)置服務(wù)器地址
          ????????factory.setHost(host);
          ????????//3、設(shè)置端口
          ????????factory.setPort(port);
          ????????//4、設(shè)置虛擬主機、用戶名、密碼
          ????????factory.setVirtualHost(vHost);
          ????????factory.setUsername(userName);
          ????????factory.setPassword(passWord);
          ????????//5、通過連接工廠獲取連接
          ????????Connection?connection?=?factory.newConnection();
          ????????return?connection;
          ????}

          ????public?static?class?Properties?implements?Serializable?{
          ????????String?host?=?"192.168.1.103";
          ????????int?port?=?5672;
          ????????String?vHost?=??"/";
          ????????String?userName?=?"guest";
          ????????String?passWord?=?"guest";

          ????????public?Properties()?{
          ????????}

          ????????public?Properties(String?host,?int?port,?String?vHost,?String?userName,?String?passWord)?{
          ????????????this.host?=?host;
          ????????????this.port?=?port;
          ????????????this.vHost?=?vHost;
          ????????????this.userName?=?userName;
          ????????????this.passWord?=?passWord;
          ????????}

          ????????public?String?getHost()?{
          ????????????return?host;
          ????????}

          ????????public?Properties?setHost(String?host)?{
          ????????????this.host?=?host;
          ????????????return?self();
          ????????}

          ????????public?int?getPort()?{
          ????????????return?port;
          ????????}

          ????????public?Properties?setPort(int?port)?{
          ????????????this.port?=?port;
          ????????????return?self();
          ????????}

          ????????public?String?getvHost()?{
          ????????????return?vHost;
          ????????}

          ????????public?Properties?setvHost(String?vHost)?{
          ????????????this.vHost?=?vHost;
          ????????????return?self();
          ????????}

          ????????public?String?getUserName()?{
          ????????????return?userName;
          ????????}

          ????????public?Properties?setUserName(String?userName)?{
          ????????????this.userName?=?userName;
          ????????????return?self();
          ????????}

          ????????public?String?getPassWord()?{
          ????????????return?passWord;
          ????????}

          ????????public?Properties?setPassWord(String?passWord)?{
          ????????????this.passWord?=?passWord;
          ????????????return?self();
          ????????}

          ????????private?Properties?self(){
          ????????????return?this;
          ????????}
          ????}
          }

            3、生產(chǎn)者 Producer

          /**
          ?*?〈簡述〉
          ?
          ?*?〈簡單隊列——消息生產(chǎn)者〉
          ?*
          ?*?@create?2020/7/1
          ?*?@since?1.0.0
          ?*/
          public?class?Producer?{
          ????private?final?static?String?QUEUE_NAME?=?QueueName.test_simple_queue.toString();

          ????public?static?void?main(String[]?args)?throws?Exception?{
          ????????sendMessage();
          ????}

          ????public?static?void?sendMessage()?throws?Exception?{
          ????????//1、獲取連接
          ????????Connection?connection?=?ConnectionUtil.getConnection();
          ????????//2、聲明信道
          ????????Channel?channel?=?connection.createChannel();
          ????????//3、聲明(創(chuàng)建)隊列
          ????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
          ????????//4、定義消息內(nèi)容
          ????????String?message?=?"hello?rabbitmq?";
          ????????//5、發(fā)布消息
          ????????channel.basicPublish("",?QUEUE_NAME,?null,?message.getBytes());
          ????????System.out.println("[x]?Sent'"?+?message?+?"'");
          ????????//6、關(guān)閉通道
          ????????channel.close();
          ????????//7、關(guān)閉連接
          ????????connection.close();
          ????}
          }

           4、消費者Consumer

          /**
          ?*?〈簡述〉

          ?*?〈消息消費者〉
          ?*
          ?*?@create?2020/7/1
          ?*?@since?1.0.0
          ?*/
          public?class?Customer?{

          ????private?final?static?String?QUEUE_NAME?=?QueueName.test_simple_queue.toString();

          ????public?static?void?main(String[]?args)?throws?Exception?{
          ????????getMessage();

          ????}

          ????public?static?void?getMessage()?throws?Exception?{
          ????????//1、獲取連接
          ????????Connection?connection?=?ConnectionUtil.getConnection();
          ????????//2、聲明通道
          ????????Channel?channel?=?connection.createChannel();
          ????????//3、聲明隊列
          ????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
          ????????//4、定義隊列的消費者
          ????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
          ????????????@Override
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,
          ???????????????????????????????????????AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????String?msgString?=?new?String(body,?"utf-8");
          ????????????????System.out.println("接收的消息:"?+?msgString);
          ????????????}
          ????????};
          ????????//5、監(jiān)聽隊列
          ?????/*
          ???true:表示自動確認,只要消息從隊列中獲取,無論消費者獲取到消息后是否成功消費,都會認為消息已經(jīng)成功消費
          ???false:表示手動確認,消費者獲取消息后,服務(wù)器會將該消息標(biāo)記為不可用狀態(tài),等待消費者的反饋,
          ????如果消費者一直沒有反饋,那么該消息將一直處于不可用狀態(tài),并且服務(wù)器會認為該消費者已經(jīng)掛掉,不會再給其
          ????發(fā)送消息,直到該消費者反饋。
          ???*/
          ????????channel.basicConsume(QUEUE_NAME,?true,?consumer);
          ????}
          }

            注意這里消費者有自動確認消息和手動確認消息兩種模式。

          2、work 模式

            一個生產(chǎn)者對應(yīng)多個消費者,但是一條消息只能有一個消費者獲得消息?。?!

          2.1輪詢分發(fā)

            1、生產(chǎn)者

          /**
          ?*?〈簡述〉

          ?*?〈輪詢分發(fā)——生產(chǎn)者〉
          ?*
          ?*?@create?2020/7/3
          ?*?@since?1.0.0
          ?*/
          public?class?Send?{
          ????private?static?final?String?QUEUE_NAME?=?QueueName.test_work_queue.toString();

          ????public?static?void?main(String?args[])?throws?Exception?{
          ????????//獲取連接
          ????????Connection?connection?=?ConnectionUtil.getConnection();
          ????????//獲取channel
          ????????Channel?channel?=?connection.createChannel();
          ????????//聲明隊列
          ????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
          ????????for?(int?i?=?0;?i?????????????String?msg?=?"hello?"?+?i;
          ????????????System.out.println("[mq]?send:"?+?msg);
          ????????????channel.basicPublish("",?QUEUE_NAME,?null,?msg.getBytes());
          ????????????Thread.sleep(i?*?20);
          ????????}
          ????????channel.close();
          ????????connection.close();
          ????}

          }

            2、消費者

            這里創(chuàng)建兩個消費者

            消費者1:每接收一條消息后休眠1秒

          /**
          ?*?〈簡述〉

          ?*?〈接收者〉
          ?*
          ?*?@create?2020/7/3
          ?*?@since?1.0.0
          ?*/
          public?class?Receive1?{
          ????private?static?final?String?QUEUE_NAME?=?QueueName.test_work_queue.toString();

          ????public?static?void?main(String?args[])?throws?Exception?{
          ????????//獲取連接
          ????????Connection?connection?=?ConnectionUtil.getConnection();
          ????????//獲取channel、
          ????????Channel?channel?=?connection.createChannel();
          ????????//聲明隊列
          ????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
          ????????//定義一個消費這
          ????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
          ????????????@Override
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????String?msg?=?new?String(body,?"utf-8");
          ????????????????System.out.println("[1]?Receive1?msg:"?+?msg);
          ????????????????try?{
          ????????????????????Thread.sleep(1000);
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}?finally?{
          ????????????????????System.out.println("[1]?done");
          ????????????????}
          ????????????}
          ????????};
          ????????boolean?autoAck?=?false;
          ????????channel.basicConsume(QUEUE_NAME,?autoAck,?consumer);
          ????}

          }

            消費者2:每接收一條消息后休眠2秒

          /**
          ?*?〈簡述〉

          ?*?〈接收者〉
          ?*
          ?*?@create?2020/7/3
          ?*?@since?1.0.0
          ?*/
          public?class?Receive2?{
          ????private?static?final?String?QUEUE_NAME?=?QueueName.test_work_queue.toString();

          ????public?static?void?main(String?args[])?throws?Exception?{
          ????????//獲取連接
          ????????Connection?connection?=?ConnectionUtil.getConnection();
          ????????//獲取channel
          ????????Channel?channel?=?connection.createChannel();
          ????????//聲明隊列
          ????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
          ????????//定義一個消費這
          ????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
          ????????????@Override
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????String?msg?=?new?String(body,?"utf-8");
          ????????????????System.out.println("[2]?Receive2?msg:"?+?msg);
          ????????????????try?{
          ????????????????????Thread.sleep(2000);
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}?finally?{
          ????????????????????System.out.println("[2]?done");
          ????????????????}
          ????????????}
          ????????};
          ????????boolean?autoAck?=?false;
          ????????channel.basicConsume(QUEUE_NAME,?autoAck,?consumer);
          ????}

          }

            3、測試結(jié)果

            生產(chǎn)者從0-49依次發(fā)送消息

          消費者1:接收到偶數(shù)

            

            消費者2:接收到奇數(shù)

            

            4、結(jié)論

            輪詢分發(fā)就是將消息隊列中的消息,依次發(fā)送給所有消費者。一個消息只能被一個消費者獲取。

          2.2公平分發(fā)

          消費者關(guān)閉自動應(yīng)答,開啟手動回執(zhí)

          /**
          ?*?〈簡述〉

          ?*?〈接收者〉
          ?*
          ?*?@create?2020/7/3
          ?*?@since?1.0.0
          ?*/
          public?class?Receive2?{
          ????private?static?final?String?QUEUE_NAME?=?QueueName.test_work_queue.toString();

          ????public?static?void?main(String?args[])?throws?Exception?{
          ????????//獲取連接
          ????????Connection?connection?=?ConnectionUtil.getConnection();
          ????????//獲取channel
          ????????Channel?channel?=?connection.createChannel();
          ????????//聲明隊列
          ????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
          ????????channel.basicQos(1);//保證一次只分發(fā)一個消息
          ????????//定義一個消費這
          ????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
          ????????????@Override
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????String?msg?=?new?String(body,?"utf-8");
          ????????????????System.out.println("[2]?Receive2?msg:"?+?msg);
          ????????????????try?{
          ????????????????????Thread.sleep(2000);
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}?finally?{
          ????????????????????System.out.println("[2]?done");
          ????????????????????//手動回執(zhí)
          ????????????????????channel.basicAck(envelope.getDeliveryTag(),false);
          ????????????????}
          ????????????}
          ????????};
          ????????boolean?autoAck?=?false;//自動應(yīng)答
          ????????channel.basicConsume(QUEUE_NAME,?autoAck,?consumer);
          ????}

          }

            手動回執(zhí):消費者完成業(yè)務(wù)接口方法后可以告知消息隊列處理完成,消息隊列從隊列中取一條消息發(fā)送給消費者。

            

            能者多勞:效率高的消費者消費消息多。

          3、發(fā)布/訂閱模式

            

            一個消費者將消息首先發(fā)送到交換器,交換器綁定到多個隊列,然后被監(jiān)聽該隊列的消費者所接收并消費。

            ps:X表示交換器,在RabbitMQ中,交換器主要有四種類型:direct、fanout、topic、headers,這里的交換器是 fanout。下面我們會詳細介紹這幾種交換器。

            1、生產(chǎn)者

          /**
          ?*?〈簡述〉

          ?*?〈訂閱模式——生產(chǎn)者〉
          ?*
          ?*?@create?2020/7/3
          ?*?@since?1.0.0
          ?*/
          public?class?Send?{
          ????private?static?final?String?EXCHANGE_NAME?=?MqName.exchange_fanout.toString();

          ????public?static?void?main(String?args[])?throws?Exception?{
          ????????//獲取連接
          ????????Connection?connection?=?ConnectionUtil.getConnection();
          ????????//獲取channel
          ????????Channel?channel?=?connection.createChannel();
          ????????//聲明交換機
          ????????channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//分發(fā)
          ????????//發(fā)送消息
          ????????String?msg?=?"hello?exchange";
          ????????System.out.println("[mq]?send:"?+?msg);
          ????????channel.basicPublish(EXCHANGE_NAME,?"",?null,?msg.getBytes());
          ????????channel.close();
          ????????connection.close();
          ????}

          }

            2、消費者

          注意:兩個消費者綁定不同的隊列,綁定相同的交換機;

            消費者1:綁定隊列名=queue_fanout_email1

          /**
          ?*?〈簡述〉

          ?*?〈接收者〉
          ?*
          ?*?@create?2020/7/3
          ?*?@since?1.0.0
          ?*/
          public?class?Receive1?{
          ????private?static?final?String?QUEUE_NAME?=?MqName.queue_fanout_email.toString()?+?"1";
          ????private?static?final?String?EXCHANGE_NAME?=?MqName.exchange_fanout.toString();

          ????public?static?void?main(String?args[])?throws?Exception?{
          ????????//獲取連接
          ????????Connection?connection?=?ConnectionUtil.getConnection();
          ????????//獲取channel
          ????????Channel?channel?=?connection.createChannel();
          ????????//聲明隊列
          ????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
          ????????//綁定到交換機
          ????????channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?"");
          ????????//定義一個消費這
          ????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
          ????????????/**
          ?????????????*?No-op?implementation?of?{@link?Consumer#handleDelivery}.
          ?????????????*
          ?????????????*?@param?consumerTag
          ?????????????*?@param?envelope
          ?????????????*?@param?properties
          ?????????????*?@param?body
          ?????????????*/
          ????????????@Override
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????String?msg?=?new?String(body,?"utf-8");
          ????????????????System.out.println("[1]?Receive1?msg:"?+?msg);
          ????????????????try?{
          ????????????????????Thread.sleep(1000);
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}?finally?{
          ????????????????????System.out.println("[1]?done");
          ????????????????}
          ????????????}
          ????????};
          ????????boolean?autoAck?=?false;
          ????????channel.basicConsume(QUEUE_NAME,?autoAck,?consumer);
          ????}

          }

            消費者2:綁定隊列名=queue_fanout_email2

          /**
          ?*?〈簡述〉

          ?*?〈接收者〉
          ?*
          ?*?@create?2020/7/3
          ?*?@since?1.0.0
          ?*/
          public?class?Receive2?{
          ????private?static?final?String?QUEUE_NAME?=?MqName.queue_fanout_email.toString()?+?"2";
          ????private?static?final?String?EXCHANGE_NAME?=?MqName.exchange_fanout.toString();

          ????public?static?void?main(String?args[])?throws?Exception?{
          ????????//獲取連接
          ????????Connection?connection?=?ConnectionUtil.getConnection();
          ????????//獲取channel
          ????????Channel?channel?=?connection.createChannel();
          ????????//聲明隊列
          ????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
          ????????//綁定到交換機
          ????????channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?"");
          ????????//定義一個消費這
          ????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
          ????????????/**
          ?????????????*?No-op?implementation?of?{@link?Consumer#handleDelivery}.
          ?????????????*
          ?????????????*?@param?consumerTag
          ?????????????*?@param?envelope
          ?????????????*?@param?properties
          ?????????????*?@param?body
          ?????????????*/
          ????????????@Override
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????String?msg?=?new?String(body,?"utf-8");
          ????????????????System.out.println("[2]?Receive2?msg:"?+?msg);
          ????????????????try?{
          ????????????????????Thread.sleep(2000);
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}?finally?{
          ????????????????????System.out.println("[2]?done");
          ????????????????}
          ????????????}
          ????????};
          ????????boolean?autoAck?=?false;
          ????????channel.basicConsume(QUEUE_NAME,?autoAck,?consumer);
          ????}

          }

            3、測試結(jié)果

          如上圖,兩個消費者獲得了同一條消息。即就是,一個消息從交換機同時發(fā)送給了兩個隊列中,監(jiān)聽這兩個隊列的消費者消費了這個消息;

          如果沒有隊列綁定交換機,則消息將丟失。因為交換機沒有存儲能力,消息只能存儲在隊列中。

          4、路由模式

            

            生產(chǎn)者將消息發(fā)送到direct交換器,在綁定隊列和交換器的時候有一個路由key,生產(chǎn)者發(fā)送的消息會指定一個路由key,那么消息只會發(fā)送到相應(yīng)key相同的隊列,接著監(jiān)聽該隊列的消費者消費消息。

            也就是讓消費者有選擇性的接收消息。

            1、生產(chǎn)者

          /**
          ?*?〈簡述〉
          ?
          ?*?〈路由模式-消息發(fā)送者〉
          ?*
          ?*?@create?2020/7/8
          ?*?@since?1.0.0
          ?*/
          public?class?Send?{

          ????public?static?final?String?EXCHANGE_NAME?=?MqName.exchange_routing.toString();
          ????public?static?final?String?ROUTING_KEY?=?MqName.routing_world.toString();

          ????public?static?void?main(String?args[])?throws?Exception?{
          ????????//獲取連接
          ????????Connection?connection?=?ConnectionUtil.getConnection();
          ????????//獲取channel
          ????????Channel?channel?=?connection.createChannel();
          ????????//聲明交換機
          ????????channel.exchangeDeclare(EXCHANGE_NAME,?"direct");
          ????????String?msg?=?"route?message?->"?+?ROUTING_KEY;
          ????????System.out.println("對?"?+?ROUTING_KEY?+?"?發(fā)送消息:"?+?msg);
          ????????channel.basicPublish(EXCHANGE_NAME,?ROUTING_KEY,?null,?msg.getBytes());
          ????????//關(guān)閉連接
          ????????channel.close();
          ????????connection.close();
          ????}

          }

            2、消費者

          注意:兩個消費者,綁定相同的交換機,不同的隊列,不一樣的路由

            消費者1:路由=routing_hello

          /**
          ?*?〈簡述〉

          ?*?〈接收消息1〉
          ?*
          ?*?@create?2020/7/8
          ?*?@since?1.0.0
          ?*/
          public?class?Receive1?{

          ????public?static?final?String?QUEUE_NAME?=?MqName.queue_routing_001.toString();
          ????public?static?final?String?EXCHANGE_NAME?=?MqName.exchange_routing.toString();
          ????public?static?final?String?ROUTING_KEY_hello?=?MqName.routing_hello.toString();

          ????public?static?void?main(String?args[])?throws?Exception?{
          ????????//獲取連接
          ????????Connection?connection?=?ConnectionUtil.getConnection();
          ????????//獲取channel
          ????????Channel?channel?=?connection.createChannel();
          ????????//聲明隊列
          ????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
          ????????//設(shè)置預(yù)讀取數(shù)
          ????????channel.basicQos(1);
          ????????//綁定交換機
          ????????channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?ROUTING_KEY_hello);
          ????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
          ????????????/**
          ?????????????*?No-op?implementation?of?{@link?Consumer#handleDelivery}.
          ?????????????*
          ?????????????*?@param?consumerTag
          ?????????????*?@param?envelope
          ?????????????*?@param?properties
          ?????????????*?@param?body
          ?????????????*/
          ????????????@Override
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????String?msg?=?new?String(body,?"utf-8");
          ????????????????System.out.println(envelope.getRoutingKey()?+?"?[1]?Receive1?msg:"?+?msg);
          ????????????}
          ????????};
          ????????channel.basicConsume(QUEUE_NAME,?true,?consumer);
          ????}

          }

            消費者2:路由1=routing_world 路由2=routing_hello

          /**
          ?*?〈簡述〉

          ?*?〈接收消息2〉
          ?*
          ?*?@create?2020/7/8
          ?*?@since?1.0.0
          ?*/
          public?class?Receive2?{

          ????public?static?final?String?QUEUE_NAME?=?MqName.queue_routing_002.toString();
          ????public?static?final?String?EXCHANGE_NAME?=?MqName.exchange_routing.toString();
          ????public?static?final?String?ROUTING_KEY_world?=?MqName.routing_world.toString();
          ????public?static?final?String?ROUTING_KEY_hello?=?MqName.routing_hello.toString();

          ????public?static?void?main(String?args[])?throws?Exception?{
          ????????//獲取連接
          ????????Connection?connection?=?ConnectionUtil.getConnection();
          ????????//獲取channel
          ????????Channel?channel?=?connection.createChannel();
          ????????//聲明隊列
          ????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
          ????????//設(shè)置預(yù)讀取數(shù)
          ????????channel.basicQos(1);
          ????????//綁定交換機和路由器,可以綁定多個路由
          ????????channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?ROUTING_KEY_world);
          ????????channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?ROUTING_KEY_hello);
          ????????//定義消息消費者
          ????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
          ????????????/**
          ?????????????*?No-op?implementation?of?{@link?Consumer#handleDelivery}.
          ?????????????*
          ?????????????*?@param?consumerTag
          ?????????????*?@param?envelope
          ?????????????*?@param?properties
          ?????????????*?@param?body
          ?????????????*/
          ????????????@Override
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????String?msg?=?new?String(body,?"utf-8");
          ????????????????System.out.println(envelope.getRoutingKey()?+?"?[2]?Receive1?msg:"?+?msg);
          ????????????}
          ????????};
          ????????//接收消息
          ????????channel.basicConsume(QUEUE_NAME,?true,?consumer);
          ????}

          }

            3、測試結(jié)果

          生產(chǎn)者發(fā)送:routing_world

          消費者1:沒有接收到

          消費者2:接收到了


          生產(chǎn)者發(fā)送:routing_hello

          消費者1:接收到了

          消費者2:接收到了

            路由模式,是以路由規(guī)則為導(dǎo)向,引導(dǎo)消息存入符合規(guī)則的隊列中。再由隊列的消費者進行消費的。

          5、主題模式

           

            上面的路由模式是根據(jù)路由key進行完整的匹配(完全相等才發(fā)送消息),這里的通配符模式通俗的來講就是模糊匹配。

            符號“#”表示匹配一個或多個詞,符號“*”表示匹配一個詞。

            1、生產(chǎn)者

          /**
          ?*?〈簡述〉
          ?
          ?*?〈主題模式-消息發(fā)送者〉
          ?*
          ?*?@create?2020/7/8
          ?*?@since?1.0.0
          ?*/
          public?class?Send?{

          ????public?static?final?String?EXCHANGE_NAME?=?MqName.exchange_topic.toString();
          ????public?static?final?String?ROUTING_KEY?=?MqName.routing_goods.toString();

          ????public?static?void?main(String?args[])?throws?Exception?{
          ????????//獲取連接
          ????????Connection?connection?=?ConnectionUtil.getConnection();
          ????????//獲取channel
          ????????Channel?channel?=?connection.createChannel();
          ????????//聲明交換機
          ????????channel.exchangeDeclare(EXCHANGE_NAME,?"topic");
          //????????String?routingKey?=?ROUTING_KEY?+?".add";
          ????????String?routingKey?=?ROUTING_KEY?+?".publish";
          //????????String?routingKey?=?ROUTING_KEY?+?".update";

          ????????String?msg?=?"route?message?->"?+?routingKey;
          ????????channel.basicPublish(EXCHANGE_NAME,?routingKey,?null,?msg.getBytes());
          ????????System.out.println("對?"?+?routingKey?+?"?發(fā)送消息:"?+?msg);
          ????????//關(guān)閉連接
          ????????channel.close();
          ????????connection.close();
          ????}

          }

            2、消費者

          注意兩個消費者,路由的不同

            消費者1:路由1=routing_goods.add 路由2=routing_goods.update

          /**
          ?*?〈簡述〉

          ?*?〈接收消息1〉
          ?*
          ?*?@create?2020/7/8
          ?*?@since?1.0.0
          ?*/
          public?class?Receive1?{

          ????public?static?final?String?QUEUE_NAME?=?MqName.queue_topic_001.toString();
          ????public?static?final?String?EXCHANGE_NAME?=?MqName.exchange_topic.toString();
          ????public?static?final?String?ROUTING_KEY?=?MqName.routing_goods.toString();

          ????public?static?void?main(String?args[])?throws?Exception?{
          ????????//獲取連接
          ????????Connection?connection?=?ConnectionUtil.getConnection();
          ????????//獲取channel
          ????????Channel?channel?=?connection.createChannel();
          ????????//聲明隊列
          ????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
          ????????//設(shè)置預(yù)讀取數(shù)
          ????????channel.basicQos(1);
          ????????//綁定交換機
          ????????channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?ROUTING_KEY?+?".add");
          ????????channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?ROUTING_KEY?+?".update");
          ????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
          ????????????/**
          ?????????????*?No-op?implementation?of?{@link?Consumer#handleDelivery}.
          ?????????????*
          ?????????????*?@param?consumerTag
          ?????????????*?@param?envelope
          ?????????????*?@param?properties
          ?????????????*?@param?body
          ?????????????*/
          ????????????@Override
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????String?msg?=?new?String(body,?"utf-8");
          ????????????????System.out.println(envelope.getRoutingKey()?+?"?[1]?Receive1?msg:"?+?msg);
          ????????????}
          ????????};
          ????????channel.basicConsume(QUEUE_NAME,?true,?consumer);
          ????}

          }

            消費者2:路由=routing_goods.*

          /**
          ?*?〈簡述〉

          ?*?〈接收消息2〉
          ?*
          ?*?@create?2020/7/8
          ?*?@since?1.0.0
          ?*/
          public?class?Receive2?{

          ????public?static?final?String?QUEUE_NAME?=?MqName.queue_routing_002.toString();
          ????public?static?final?String?EXCHANGE_NAME?=?MqName.exchange_topic.toString();
          ????public?static?final?String?ROUTING_KEY?=?MqName.routing_goods.toString();

          ????public?static?void?main(String?args[])?throws?Exception?{
          ????????//獲取連接
          ????????Connection?connection?=?ConnectionUtil.getConnection();
          ????????//獲取channel
          ????????Channel?channel?=?connection.createChannel();
          ????????//聲明隊列
          ????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
          ????????//設(shè)置預(yù)讀取數(shù)
          ????????channel.basicQos(1);
          ????????//綁定交換機和路由器,可以綁定多個路由
          ????????channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?ROUTING_KEY?+?".*");
          ????????//定義消息消費者
          ????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
          ????????????/**
          ?????????????*?No-op?implementation?of?{@link?Consumer#handleDelivery}.
          ?????????????*
          ?????????????*?@param?consumerTag
          ?????????????*?@param?envelope
          ?????????????*?@param?properties
          ?????????????*?@param?body
          ?????????????*/
          ????????????@Override
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????String?msg?=?new?String(body,?"utf-8");
          ????????????????System.out.println(envelope.getRoutingKey()?+?"?[2]?Receive1?msg:"?+?msg);
          ????????????}
          ????????};
          ????????//接收消息
          ????????channel.basicConsume(QUEUE_NAME,?true,?consumer);
          ????}

          }

            3、測試結(jié)果

          消費者1只能接收到.add 和 .update的消息

          消費者2可以接收到.add .publish 和 .update的消息

          與路由模式相似,但是,主題模式是一種模糊的匹配方式。

          6.工作模式總結(jié)

          這五種工作模式,可以歸為三類:

          1. 生產(chǎn)者,消息隊列,一個消費者;

          2. 生產(chǎn)者,消息隊列,多個消費者;

          3. 生產(chǎn)者,交換機,多個消息隊列,多個消費者;

          7、四種交換器

            1、direct 如果路由鍵完全匹配的話,消息才會被投放到相應(yīng)的隊列?!?/span>

            2、fanout 當(dāng)發(fā)送一條消息到fanout交換器上時,它會把消息投放到所有附加在此交換器上的隊列。

            3、topic 設(shè)置模糊的綁定方式,“*”操作符將“.”視為分隔符,匹配單個字符;“#”操作符沒有分塊的概念,它將任意“.”均視為關(guān)鍵字的匹配部分,能夠匹配多個字符。

          ?4、header headers 交換器允許匹配 AMQP 消息的 header 而非路由鍵,除此之外,header 交換器和 direct 交換器完全一致,但是性能卻差很多,因此基本上不會用到該交換器






          粉絲福利:Java從入門到入土學(xué)習(xí)路線圖

          ??????

          ??長按上方微信二維碼?2 秒


          感謝點贊支持下哈?

          瀏覽 43
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日韩无码AV一区二区三区 | www亚洲天堂 | 国产乱弄免费视频观看 | 插插插大香蕉 | 18禁免费网址 |