
          The 3 Most Commonly Used RabbitMQ Modes!


          2020-09-19 05:12

          Java技术栈 (Java Tech Stack)

          www.javastack.cn




          Author: 海向

          Source: www.cnblogs.com/haixiang/p/10864339.html

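          Direct mode

          • Every message sent to a Direct Exchange is forwarded to the Queue specified by its RouteKey.

          • Direct mode can use the Exchange that ships with RabbitMQ, the default Exchange. Every queue is automatically bound to it under its own name, so no binding operation is needed on the Exchange.

          • During delivery, the RouteKey must match exactly for a queue to receive the message; otherwise the message is discarded.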
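
The exact-match rule in the bullets above can be sketched without a broker. The following is a minimal standalone model, not RabbitMQ's implementation and not part of its client API: the class `DirectRoutingSketch` and its methods `bind` and `route` are hypothetical names introduced only for illustration. It shows a message reaching a queue when the routing key equals the binding key, and reaching no queue otherwise.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Standalone model of direct-exchange routing; it never talks to a broker.
class DirectRoutingSketch {
    // binding key -> names of the queues bound with exactly that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    // Record that a queue is bound to this exchange under the given key.
    void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queueName);
    }

    // Return the queues that would receive a message published with routingKey.
    // An empty list models a message that no queue accepts.
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptyList());
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        exchange.bind("test_direct_queue", "item.direct");

        System.out.println(exchange.route("item.direct")); // exact match: one queue
        System.out.println(exchange.route("item.other"));  // no match: empty list
    }
}
```

A hash lookup is used here because direct routing needs only string equality; nothing in the key is interpreted as a pattern.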

          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.Connection;
          import com.rabbitmq.client.ConnectionFactory;
          import java.nio.charset.StandardCharsets;

          public class DirectProducer {
              public static void main(String[] args) throws Exception {
                  // 1. Create a ConnectionFactory and configure it
                  ConnectionFactory factory = new ConnectionFactory();
                  factory.setHost("localhost");
                  factory.setVirtualHost("/");
                  factory.setUsername("guest");
                  factory.setPassword("guest");

                  // 2. Create a connection through the connection factory
                  Connection connection = factory.newConnection();

                  // 3. Create a Channel through the Connection
                  Channel channel = connection.createChannel();

                  // 4. Declare the names. The exchange itself is declared by DirectConsumer,
                  //    so start the consumer before running this producer.
                  String exchangeName = "test_direct_exchange";
                  String routingKey = "item.direct";

                  // 5. Send (encode explicitly as UTF-8 to match the consumer's decoding)
                  String msg = "this is direct msg";
                  channel.basicPublish(exchangeName, routingKey, null, msg.getBytes(StandardCharsets.UTF_8));
                  System.out.println("Send message : " + msg);

                  // 6. Close the connection
                  channel.close();
                  connection.close();
              }
          }

          import com.rabbitmq.client.*;
          import java.io.IOException;
          import java.nio.charset.StandardCharsets;

          public class DirectConsumer {

              public static void main(String[] args) throws Exception {
                  // 1. Create a ConnectionFactory and configure it
                  ConnectionFactory factory = new ConnectionFactory();
                  factory.setHost("localhost");
                  factory.setVirtualHost("/");
                  factory.setUsername("guest");
                  factory.setPassword("guest");
                  factory.setAutomaticRecoveryEnabled(true);
                  factory.setNetworkRecoveryInterval(3000);

                  // 2. Create a connection through the connection factory
                  Connection connection = factory.newConnection();

                  // 3. Create a Channel through the Connection
                  Channel channel = connection.createChannel();

                  // 4. Declare the exchange (durable) and the queue (non-durable)
                  String exchangeName = "test_direct_exchange";
                  String queueName = "test_direct_queue";
                  String routingKey = "item.direct";
                  channel.exchangeDeclare(exchangeName, "direct", true, false, null);
                  channel.queueDeclare(queueName, false, false, false, null);

                  // Binding is usually done by hand in the management UI rather than in code
                  channel.queueBind(queueName, exchangeName, routingKey);

                  // 5. Create a consumer and receive messages
                  Consumer consumer = new DefaultConsumer(channel) {
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope,
                                                 AMQP.BasicProperties properties, byte[] body)
                              throws IOException {
                          String message = new String(body, StandardCharsets.UTF_8);
                          System.out.println(" [x] Received '" + message + "'");
                      }
                  };

                  // 6. Attach the consumer to the queue (autoAck = true)
                  channel.basicConsume(queueName, true, consumer);

              }
          }

          Send message : this is direct msg

           [x] Received 'this is direct msg'

          Topic mode

          Wildcards can be used for fuzzy matching:

          • The symbol "#" matches zero or more words

          • The symbol "*" matches exactly one word

          For example:

          • "log.#" matches "log.info.oa"

          • "log.*" matches "log.error" but not "log.info.oa"
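
The wildcard rules above can be checked with a small standalone matcher. This is only a sketch of the matching semantics, assuming words are separated by dots and that "#" may stand for zero or more words, as RabbitMQ documents. The class `TopicMatchSketch` and its `matches` method are hypothetical names, not part of the RabbitMQ client, and the broker's own algorithm may be implemented quite differently.

```java
// Standalone sketch of topic-pattern matching; it never talks to a broker.
class TopicMatchSketch {

    // Returns true if the dot-separated routingKey satisfies the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(p, 0, k, 0);
    }

    // Compare pattern words from index i against key words from index j.
    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;            // pattern used up: key must be used up too
        }
        if (p[i].equals("#")) {
            if (match(p, i + 1, k, j)) {     // "#" absorbs zero words
                return true;
            }
            return j < k.length && match(p, i, k, j + 1); // or absorbs one more word
        }
        if (j == k.length) {
            return false;                    // pattern still needs a word, key has none
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1); // "*" or a literal consumes exactly one word
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("log.#", "log.info.oa")); // true
        System.out.println(matches("log.*", "log.error"));   // true
        System.out.println(matches("log.*", "log.info.oa")); // false
    }
}
```

With the binding key "item.#" used by the consumer in this section, `matches` accepts "item.update" and "item.delete" and rejects "user.add", which is consistent with two of the three published messages being received.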

          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.Connection;
          import com.rabbitmq.client.ConnectionFactory;
          import java.nio.charset.StandardCharsets;

          public class TopicProducer {

              public static void main(String[] args) throws Exception {
                  // 1. Create a ConnectionFactory and configure it
                  ConnectionFactory factory = new ConnectionFactory();
                  factory.setHost("localhost");
                  factory.setVirtualHost("/");
                  factory.setUsername("guest");
                  factory.setPassword("guest");

                  // 2. Create a connection through the connection factory
                  Connection connection = factory.newConnection();

                  // 3. Create a Channel through the Connection
                  Channel channel = connection.createChannel();

                  // 4. Declare the names. The exchange itself is declared by TopicConsumer,
                  //    so start the consumer before running this producer.
                  String exchangeName = "test_topic_exchange";
                  String routingKey1 = "item.update";
                  String routingKey2 = "item.delete";
                  String routingKey3 = "user.add";

                  // 5. Send the same message under three different routing keys
                  String msg = "this is topic msg";
                  byte[] body = msg.getBytes(StandardCharsets.UTF_8);
                  channel.basicPublish(exchangeName, routingKey1, null, body);
                  channel.basicPublish(exchangeName, routingKey2, null, body);
                  channel.basicPublish(exchangeName, routingKey3, null, body);
                  System.out.println("Send message : " + msg);

                  // 6. Close the connection
                  channel.close();
                  connection.close();
              }
          }

          import com.rabbitmq.client.*;
          import java.io.IOException;
          import java.nio.charset.StandardCharsets;

          public class TopicConsumer {

              public static void main(String[] args) throws Exception {
                  // 1. Create a ConnectionFactory and configure it
                  ConnectionFactory factory = new ConnectionFactory();
                  factory.setHost("localhost");
                  factory.setVirtualHost("/");
                  factory.setUsername("guest");
                  factory.setPassword("guest");
                  factory.setAutomaticRecoveryEnabled(true);
                  factory.setNetworkRecoveryInterval(3000);

                  // 2. Create a connection through the connection factory
                  Connection connection = factory.newConnection();

                  // 3. Create a Channel through the Connection
                  Channel channel = connection.createChannel();

                  // 4. Declare the exchange and the queue; "item.#" matches every key starting with "item."
                  String exchangeName = "test_topic_exchange";
                  String queueName = "test_topic_queue";
                  String routingKey = "item.#";
                  channel.exchangeDeclare(exchangeName, "topic", true, false, null);
                  channel.queueDeclare(queueName, false, false, false, null);

                  // Binding is usually done by hand in the management UI rather than in code
                  channel.queueBind(queueName, exchangeName, routingKey);

                  // 5. Create a consumer and receive messages
                  Consumer consumer = new DefaultConsumer(channel) {
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope,
                                                 AMQP.BasicProperties properties, byte[] body)
                              throws IOException {
                          String message = new String(body, StandardCharsets.UTF_8);
                          System.out.println(" [x] Received '" + message + "'");
                      }
                  };

                  // 6. Attach the consumer to the queue (autoAck = true)
                  channel.basicConsume(queueName, true, consumer);

              }
          }

          Send message : this is topic msg

           [x] Received 'this is topic msg'
           [x] Received 'this is topic msg'

          Fanout mode

          Routing keys are not processed. Queues are simply bound to the exchange, and every message sent to the exchange is forwarded to all queues bound to it.

          Of the three modes, the Fanout exchange forwards messages the fastest.
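
The broadcast behaviour described above can be sketched as a standalone model. It is not RabbitMQ code: the class `FanoutRoutingSketch`, its methods, and the queue names `queue_a` and `queue_b` are hypothetical and exist only for illustration. The point it shows is that the routing key is accepted but never consulted, so every bound queue gets a copy of every message.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Standalone model of fanout-exchange routing; it never talks to a broker.
class FanoutRoutingSketch {
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    // Bind a queue to the exchange; a fanout binding needs no key.
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Copy the message into every bound queue. The routingKey parameter is
    // deliberately unused. Returns the number of queues that received a copy.
    int publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
        return queues.size();
    }

    // Messages currently held by the named queue (empty if it was never bound).
    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, Collections.emptyList());
    }

    public static void main(String[] args) {
        FanoutRoutingSketch exchange = new FanoutRoutingSketch();
        exchange.bind("queue_a");
        exchange.bind("queue_b");

        // Three publishes with unrelated routing keys, as in the producer of this section
        exchange.publish("item.update", "this is fanout msg");
        exchange.publish("", "this is fanout msg");
        exchange.publish("ookjkjjkhjhk", "this is fanout msg");

        System.out.println(exchange.messagesIn("queue_a").size()); // 3
        System.out.println(exchange.messagesIn("queue_b").size()); // 3
    }
}
```

Because no key comparison or pattern matching happens in `publish`, the model also hints at why fanout is the cheapest of the three routing strategies.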

          import com.rabbitmq.client.*;
          import java.io.IOException;
          import java.nio.charset.StandardCharsets;

          public class FanoutConsumer {
              public static void main(String[] args) throws Exception {
                  // 1. Create a ConnectionFactory and configure it
                  ConnectionFactory factory = new ConnectionFactory();
                  factory.setHost("localhost");
                  factory.setVirtualHost("/");
                  factory.setUsername("guest");
                  factory.setPassword("guest");
                  factory.setAutomaticRecoveryEnabled(true);
                  factory.setNetworkRecoveryInterval(3000);

                  // 2. Create a connection through the connection factory
                  Connection connection = factory.newConnection();

                  // 3. Create a Channel through the Connection
                  Channel channel = connection.createChannel();

                  // 4. Declare the exchange and the queue
                  String exchangeName = "test_fanout_exchange";
                  String queueName = "test_fanout_queue";
                  String routingKey = "item.#"; // ignored by a fanout exchange
                  channel.exchangeDeclare(exchangeName, "fanout", true, false, null);
                  channel.queueDeclare(queueName, false, false, false, null);

                  // Binding is usually done by hand in the management UI rather than in code
                  channel.queueBind(queueName, exchangeName, routingKey);

                  // 5. Create a consumer and receive messages
                  Consumer consumer = new DefaultConsumer(channel) {
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope,
                                                 AMQP.BasicProperties properties, byte[] body)
                              throws IOException {
                          String message = new String(body, StandardCharsets.UTF_8);
                          System.out.println(" [x] Received '" + message + "'");
                      }
                  };

                  // 6. Attach the consumer to the queue (autoAck = true)
                  channel.basicConsume(queueName, true, consumer);
              }
          }

          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.Connection;
          import com.rabbitmq.client.ConnectionFactory;
          import java.nio.charset.StandardCharsets;

          public class FanoutProducer {

              public static void main(String[] args) throws Exception {
                  // 1. Create a ConnectionFactory and configure it
                  ConnectionFactory factory = new ConnectionFactory();
                  factory.setHost("localhost");
                  factory.setVirtualHost("/");
                  factory.setUsername("guest");
                  factory.setPassword("guest");

                  // 2. Create a connection through the connection factory
                  Connection connection = factory.newConnection();

                  // 3. Create a Channel through the Connection
                  Channel channel = connection.createChannel();

                  // 4. Declare the names. The exchange itself is declared by FanoutConsumer,
                  //    so start the consumer before running this producer.
                  String exchangeName = "test_fanout_exchange";
                  String routingKey1 = "item.update";
                  String routingKey2 = "";
                  String routingKey3 = "ookjkjjkhjhk"; // any routing key works

                  // 5. Send the same message under three different routing keys
                  String msg = "this is fanout msg";
                  byte[] body = msg.getBytes(StandardCharsets.UTF_8);
                  channel.basicPublish(exchangeName, routingKey1, null, body);
                  channel.basicPublish(exchangeName, routingKey2, null, body);
                  channel.basicPublish(exchangeName, routingKey3, null, body);
                  System.out.println("Send message : " + msg);

                  // 6. Close the connection
                  channel.close();
                  connection.close();
              }
          }

          Send message : this is fanout msg

           [x] Received 'this is fanout msg'
           [x] Received 'this is fanout msg'
           [x] Received 'this is fanout msg'



