<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          rabbitMq工作模式特性及整合springboot

          共 52599字,需瀏覽 106分鐘

           ·

          2021-04-01 02:38

          點擊上方藍色字體,選擇“標(biāo)星公眾號”

          優(yōu)質(zhì)文章,第一時間送達

          因為公司項目后面需要用到mq做數(shù)據(jù)的同步,所以學(xué)習(xí)mq并在此記錄,這里的是rabbitMq


          mq(message queue)消息隊列

          官網(wǎng):www.rabbitmq.com
          使用消息隊列的優(yōu)點:
              1、異步可加快訪問速度 (以前一個訂單接口需要做下單、庫存、付款、快遞等相關(guān)操作,有了mq只需要給相關(guān)信息傳入隊列,下單、庫存、付款、快遞等相關(guān)操作會自動從隊列中收到信息進行異步操作)
              2、解耦下游服務(wù)或其他服務(wù)或語言可接入
              3、削峰高并發(fā)訪問量可分攤多個隊列分攤
          缺點:
              1、系統(tǒng)可用性降低(一旦mq掛了系統(tǒng)就宕機了)
              2、系統(tǒng)復(fù)雜性增大 (增加了mq模塊需要考慮更多)

          RabbitMQ的高級特性

          • 消費端限流

          • TTL 全稱time to live(存活時間/過期時間) - 當(dāng)消息到達存活時間后還沒被消費會被丟棄 ttl+死信隊列可實現(xiàn)延遲隊列效果

          • 死信隊列

          • 延遲隊列

          • 消息可靠性投遞

          • Consumer ACK

          rabbitMq為了確保消息投遞的可靠性提供了兩種方式 confirm和return

          rabbitmq整個消息投遞的路徑為
          producer--->rabbitmq broker--->exchange--->queue--->consumer
          1.消息從producer到exchange則會返回一個confirmCallback.
          2.消息從exchange到queue投遞失敗則會返回一個returnCallBack.
          我們將利用這兩個callback控制消息的可靠性投遞

          Consumer ACK

          ack指acknowledge,確認。表示消費者端接收到消息后的確認方式
          有三種方式確認:
              自動確認:acknowledge="none"
              手動確認:acknowledge="manual"
              根據(jù)異常情況確認:acknowledge="auto"
              
          自動確認指,當(dāng)消息一旦被消費者接收到,則自動確認收到,并將相應(yīng)的message從mq的消息緩存中移除。
          但是在實際業(yè)務(wù)處理中,很可能消息接收到,業(yè)務(wù)處理出現(xiàn)異常,那么該消息就會丟失。
          如果設(shè)置了手動確認模式,則需要在業(yè)務(wù)處理成功后,調(diào)用channel.basicAck(),手動簽收,如果出現(xiàn)異常,
          則調(diào)用channel.basicNack()方法,讓其自動重新發(fā)送消息。    

          我這里學(xué)習(xí)了前面五種

          1:簡單模式

          2:工作隊列模式

          3:發(fā)布訂閱模式

          4:路由模式

          5:主題模式


          簡單模式:即一條線一個發(fā)送到隊列,隊列發(fā)送到接收者

          工作隊列模式:即有一個發(fā)送者發(fā)送信息到隊列,隊列發(fā)給多個接收者,比如群發(fā)

          發(fā)布訂閱模式:這個是使用的最多的,發(fā)布者需要先發(fā)送到交換機,交換機再發(fā)送到與之綁定的隊列, 然后隊列在發(fā)送到與之綁定隊列的接收者

          路由模式:路由模式在發(fā)布訂閱上增加了條件篩選,在消息到達交換機后發(fā)送隊列時進行條件匹配,匹配成功才能發(fā)送給對應(yīng)綁定的隊列,最后再發(fā)送給接收者

          主題模式:主題模式在路由模式上面進行升級,條件可進行模糊匹配,通配符規(guī)則 #可以匹配多個詞 * 只能匹配一個詞 如:test.# 匹配 test.one.tow test.one.q.wqe / test.* 匹配 test.one test.two


          先安裝rabbitMq,不同的環(huán)境可安裝相關(guān)的版本,我這里已經(jīng)安裝好了

          然后運行sbin下面的rabbitmq-server.bat


          然后網(wǎng)頁localhost:15672,如下頁面即安裝成功


          然后去rabbitmq的官網(wǎng)


          左邊是下載右邊是文檔


          文檔中也會有一些代碼案例,點擊文檔可以看到mq有七種方式


          第一個是在測試的時候需要引入的包,第二個是在springboot上需要引入的包


          com.rabbitmq

          amqp-client

          5.3.0

              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-amqp</artifactId>
              </dependency>

          一:簡單模式

          我給mq的連接封裝在工具類里,一些隊列名放在常量類里了

          工具類代碼:

          package com.lansi.realtynavi.test.utils;

          import com.rabbitmq.client.Connection;
          import com.rabbitmq.client.ConnectionFactory;

          /**
           * @Description 描述
           * @Date 2021/3/23 11:22
           * @Created by huyao
           */
          public class RabbitUtils {

              public static ConnectionFactory factory = new ConnectionFactory();
              static {
                  factory.setHost("localhost");
              }

              public static Connection getConnection() throws Exception{
                  Connection connection = null;
                  try {
                      //獲取長連接
                      connection  = factory.newConnection();
                  }catch (Exception e){
                      e.printStackTrace();
                  }/*finally {
                      connection.close();
                  } */
                  return connection;
              }


          }


          常量類代碼:

          package com.lansi.realtynavi.test.constant;

          /**
           * @Description 描述
           * @Date 2021/3/23 11:01
           * @Created by huyao
           */
          public class MqConstant {

              public static final String MQ_HELLO_WORD = "helloWord";
              public static final String MQ_PUBLISH = "publish";
              public static final String MQ_ROUTING = "routing";
              public static final String MQ_TOPICS = "topics";
              public static final String MQ_WORK_QUEUES = "workQueues";


              public static final String MQ_QUEUE_BAIDU = "baidu";
              public static final String MQ_QUEUE_XINLANG = "xinlang";



              public static final String MQ_PUBLISH_JHJ = "jiaohuanji";
              public static final String MQ_ROUTING_JHJ = "jiaohuanjiRout";
              public static final String MQ_TOPIC_JHJ = "jiaohuanjiTopic";


          }


          生產(chǎn)者代碼

          package com.lansi.realtynavi.test.helloWord;

          import com.lansi.realtynavi.test.constant.MqConstant;
          import com.lansi.realtynavi.test.utils.RabbitUtils;
          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.Connection;

          /**
           * @Description 簡單模式
           * @Date 2021/3/22 17:19
           * @Created by huyao
           */
          public class Producer {


              public static void main(String[] args) throws Exception{


                  Channel channel = null;

                  Connection connection = null;
                  try {
                      //獲取長連接
                      connection = RabbitUtils.getConnection();
                      channel = connection.createChannel();

                      channel.queueDeclare(MqConstant.MQ_HELLO_WORD, falsefalsefalse, null);
                      String message = "這是我發(fā)送的第三個隊列消息";
                      //第一個參數(shù)是交換機信息   簡單隊列不需要交換機  第二個參數(shù)隊列名稱 ,第三個額外信息,第四個需要發(fā)布的信息
                      channel.basicPublish("", MqConstant.MQ_HELLO_WORD, null, message.getBytes());
                      System.out.println("[x] Send ‘" + message + "’");
                  }catch (Exception e){
                      e.printStackTrace();
                  }finally {
                      channel.close();
                      connection.close();
                  }

              }

          }


          消費者代碼:

          package com.lansi.realtynavi.test.helloWord;

          import com.lansi.realtynavi.test.constant.MqConstant;
          import com.lansi.realtynavi.test.utils.RabbitUtils;
          import com.rabbitmq.client.*;

          import java.io.IOException;

          /**
           * @Description 描述
           * @Date 2021/3/22 17:27
           * @Created by huyao
           */
          public class Consumer {

              public static void main(String[] argv) throws Exception {
                  //連接
                  Connection connection = RabbitUtils.getConnection();
                  Channel channel = connection.createChannel();
                  //聲明并創(chuàng)建一個隊列
                  //參數(shù)1 隊列ID
                  //參數(shù)2 是否持久化,false對應(yīng)不持久化數(shù)據(jù),mq停掉數(shù)據(jù)就會丟失
                  //參數(shù)3 是否隊列私有化,false則代表所有消費者都可以訪問,true代表只有第一次擁有它的消費者才能一直使用
                  //參數(shù)4 是否自動刪除, false代表連接停掉后不自動刪除這個隊列
                  // 其他額外的參數(shù),null
                  channel.queueDeclare(MqConstant.MQ_HELLO_WORD, falsefalsefalse, null);


                  //從MQ服務(wù)器中獲取數(shù)據(jù)

                  //創(chuàng)建一個消息消費者
                  //參數(shù)1:隊列ID
                  //參數(shù)2:代表是否自動確認收到消息,false代表手動編程來確認消息,這是mq的推薦做法
                  //參數(shù)3:參數(shù)要傳入的DefaultConsumer的實現(xiàn)類
                  channel.basicConsume(MqConstant.MQ_HELLO_WORD, false, new Reciver(channel));
              }
          }

          class Reciver extends DefaultConsumer {
              private Channel channel;
              //重寫構(gòu)造函數(shù),Channel通道對象需要從外層傳入,在handleDelivery中用到
              public Reciver(Channel channel) {
                  super(channel);
                  this.channel = channel;
              }

              @Override
              public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  String message = new String(body);
                  System.out.println("消費者接收到的消息:"+message);
                  System.out.println("消息的ID:"+envelope.getDeliveryTag());

                  //false只確認簽收當(dāng)前的消息,設(shè)置為true的時候則代表簽收該消費者所有未簽收的消息
                  channel.basicAck(envelope.getDeliveryTag(), false);

              }
          }


          測試的時候隊列需要手動去創(chuàng)建,不過springboot的話可以自動創(chuàng)建

          這里已經(jīng)手動創(chuàng)建好了

          運行接收者,運行啟動者

          這里接收者自動接收消息


          二:工作隊列模式

            一個隊列多個接收者


          生產(chǎn)者代碼:

          package com.lansi.realtynavi.test.workQueues;

          import com.google.gson.Gson;
          import com.lansi.realtynavi.test.constant.MqConstant;
          import com.lansi.realtynavi.test.utils.RabbitUtils;
          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.Connection;

          /**
           * @Description 工作隊列模式
           * @Date 2021/3/22 17:33
           * @Created by huyao
           */
          public class Producer {


              public static void main(String[] args) throws Exception{

                  Connection connection = RabbitUtils.getConnection();
                  Channel channel = connection.createChannel();
                  channel.queueDeclare(MqConstant.MQ_WORK_QUEUES, falsefalsefalse, null);

                  for(int i = 1; i<=20; i++){
                      SMS sms = new SMS("乘客" + i, "123456789""你的車票已預(yù)訂成功");
                      String message = new Gson().toJson(sms);
                      channel.basicPublish("", MqConstant.MQ_WORK_QUEUES, null, message.getBytes());
                  }

                  System.out.println("發(fā)送數(shù)據(jù)成功");
                  channel.close();
                  connection.close();
              }

          }


          封裝對象代碼:

          package com.lansi.realtynavi.test.workQueues;

          /**
           * @Description 描述
           * @Date 2021/3/23 11:28
           * @Created by huyao
           */
          public class SMS {

              private String name;
              private String mobile;
              private String content;

              public SMS(String name, String mobile, String content) {
                  this.name = name;
                  this.mobile = mobile;
                  this.content = content;
              }

              public String getName() {
                  return name;
              }

              public void setName(String name) {
                  this.name = name;
              }

              public String getMobile() {
                  return mobile;
              }

              public void setMobile(String mobile) {
                  this.mobile = mobile;
              }

              public String getContent() {
                  return content;
              }

              public void setContent(String content) {
                  this.content = content;
              }
          }


          三個接收者代碼


          接收者1

          package com.lansi.realtynavi.test.workQueues;

          import com.lansi.realtynavi.test.constant.MqConstant;
          import com.lansi.realtynavi.test.utils.RabbitUtils;
          import com.rabbitmq.client.*;

          import java.io.IOException;

          /**
           * @Description 描述
           * @Date 2021/3/23 11:33
           * @Created by huyao
           */
          public class Consumer1 {

              public static void main(String[] args) throws Exception{

                  Connection connection = RabbitUtils.getConnection();
                  Channel channel = connection.createChannel();
                  channel.queueDeclare(MqConstant.MQ_WORK_QUEUES, falsefalsefalse, null);

                  //如果不寫baiscQos(1) 則自動mq會將所有請求平均發(fā)送給所有消費者
                  //baiscQos,mq不再對消費者一次發(fā)送多個請求,而是消費者處理完一個消息后(確認后),再從隊列中獲取一個新的
                  channel.basicQos(1);//處理完一個取一個

                  channel.basicConsume(MqConstant.MQ_WORK_QUEUES, false, new DefaultConsumer(channel){
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          String message = new String(body);
                          System.out.println("smsConsumer1-短信發(fā)送成功:"+message);

                          //服務(wù)器好的話可以在這里睡眠   這里可動態(tài)配置開啟和設(shè)置睡眠時間
                          /*try {
                              Thread.sleep(10);
                          }catch (Exception e){
                              e.printStackTrace();
                          }*/

                          channel.basicAck(envelope.getDeliveryTag(), false);
                      }
                  });
              }
          }


          接收者2

          package com.lansi.realtynavi.test.workQueues;

          import com.lansi.realtynavi.test.constant.MqConstant;
          import com.lansi.realtynavi.test.utils.RabbitUtils;
          import com.rabbitmq.client.*;

          import java.io.IOException;

          /**
           * @Description 描述
           * @Date 2021/3/23 11:40
           * @Created by huyao
           */
          public class Consumer2 {

              public static void main(String[] args) throws Exception{

                  Connection connection = RabbitUtils.getConnection();
                  Channel channel = connection.createChannel();
                  channel.queueDeclare(MqConstant.MQ_WORK_QUEUES, falsefalsefalse, null);

                  channel.basicConsume(MqConstant.MQ_WORK_QUEUES, false, new DefaultConsumer(channel){
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          String message = new String(body);
                          System.out.println("smsConsumer2-短信發(fā)送成功:"+message);
                          channel.basicAck(envelope.getDeliveryTag(), false);
                      }
                  });
              }

          }


          接收者3

          package com.lansi.realtynavi.test.workQueues;

          import com.lansi.realtynavi.test.constant.MqConstant;
          import com.lansi.realtynavi.test.utils.RabbitUtils;
          import com.rabbitmq.client.*;

          import java.io.IOException;

          /**
           * @Description 描述
           * @Date 2021/3/23 11:41
           * @Created by huyao
           */
          public class Consumer3 {

              public static void main(String[] args) throws Exception{

                  Connection connection = RabbitUtils.getConnection();
                  Channel channel = connection.createChannel();
                  channel.queueDeclare(MqConstant.MQ_WORK_QUEUES, falsefalsefalse, null);

                  channel.basicConsume(MqConstant.MQ_WORK_QUEUES, false, new DefaultConsumer(channel){
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          String message = new String(body);
                          System.out.println("smsConsumer1-短信發(fā)送成功:"+message);
                          channel.basicAck(envelope.getDeliveryTag(), false);
                      }
                  });
              }

          }


          啟動三個接收類,啟動發(fā)送類





          三個接收都拿到了數(shù)據(jù),我學(xué)習(xí)的時候隊列是以輪詢的方式給三個消費者發(fā)送數(shù)據(jù),這里出現(xiàn)了接收數(shù)據(jù)不均衡的情況應(yīng)該是緩存沒用清理,給隊列刪掉重新創(chuàng)建就好了

          三:發(fā)布訂閱模式

          生成者代碼:

          這里和前面兩種模式不同,發(fā)送者綁定了交換機,沒用綁定隊列,需要消費者綁定交換機和隊列

          package com.lansi.realtynavi.test.publish;

          import com.lansi.realtynavi.test.constant.MqConstant;
          import com.lansi.realtynavi.test.utils.RabbitUtils;
          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.Connection;

          import java.util.Scanner;

          /**
           * @Description 發(fā)布訂閱模式
           * @Date 2021/3/23 13:31
           * @Created by huyao
           */
          public class Producer {

              public static void main(String[] args) throws Exception{

                  Connection connection = RabbitUtils.getConnection();
                  Channel channel = connection.createChannel();
                  channel.queueDeclare(MqConstant.MQ_PUBLISH, falsefalsefalse, null);

                  String input = new Scanner(System.in).next();


                  //第一個參數(shù)交換機名字,其他參數(shù)和之前一樣
                  channel.basicPublish(MqConstant.MQ_PUBLISH_JHJ, "", null, input.getBytes());

                  channel.close();
                  connection.close();

              }
          }


          接收者1代碼:

          package com.lansi.realtynavi.test.publish;

          import com.lansi.realtynavi.test.constant.MqConstant;
          import com.lansi.realtynavi.test.utils.RabbitUtils;
          import com.rabbitmq.client.*;

          import java.io.IOException;

          /**
           * @Description 消費者
           * @Date 2021/3/23 13:50
           * @Created by huyao
           */
          public class ConsumerXinLang {

              public static void main(String[] args) throws Exception{

                  Connection connection = RabbitUtils.getConnection();
                  Channel channel = connection.createChannel();
                  channel.queueDeclare(MqConstant.MQ_QUEUE_XINLANG, falsefalsefalse, null);


                  //隊列綁定交換機
                  //參數(shù)1:隊列名,參數(shù)2:交換機名,參數(shù)3:路由key(目前用不到,路由模式通配符模式使用)
                  channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_PUBLISH_JHJ, "");
                  channel.basicQos(1);

                  channel.basicConsume(MqConstant.MQ_QUEUE_XINLANG, false, new DefaultConsumer(channel){
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          System.out.println("消費者新浪收到消息:"+new String(body));
                          channel.basicAck(envelope.getDeliveryTag(), false);
                      }
                  });


              }



          }


          接收者2代碼:

          package com.lansi.realtynavi.test.publish;

          import com.lansi.realtynavi.test.constant.MqConstant;
          import com.lansi.realtynavi.test.utils.RabbitUtils;
          import com.rabbitmq.client.*;

          import java.io.IOException;

          /**
           * @Description 消費者
           * @Date 2021/3/23 13:50
           * @Created by huyao
           */
          public class ConsumerBaiDu {

              public static void main(String[] args) throws Exception{

                  Connection connection = RabbitUtils.getConnection();
                  Channel channel = connection.createChannel();
                  channel.queueDeclare(MqConstant.MQ_QUEUE_BAIDU, falsefalsefalse, null);


                  //隊列綁定交換機   目前交換機需要在rabbit也手動創(chuàng)建,在和spring整合的時候spring會自動幫我們創(chuàng)建
                  //參數(shù)1:隊列名,參數(shù)2:交換機名,參數(shù)3:路由key(目前用不到,路由模式通配符模式使用)
                  channel.queueBind(MqConstant.MQ_QUEUE_BAIDU, MqConstant.MQ_PUBLISH_JHJ, "");
                  channel.basicQos(1);

                  channel.basicConsume(MqConstant.MQ_QUEUE_BAIDU, false, new DefaultConsumer(channel){
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          System.out.println("消費者百度收到消息:"+new String(body));
                          channel.basicAck(envelope.getDeliveryTag(), false);
                      }
                  });


              }



          }


          啟動生產(chǎn)者消費者,在生產(chǎn)者控制臺輸入信息:



          兩個消費者都接收到了



          四 :路由模式

          路由模式發(fā)送需要攜帶路由key,用作接收者進行判斷

          生產(chǎn)者代碼:

          package com.lansi.realtynavi.test.routing;

          import com.lansi.realtynavi.test.constant.MqConstant;
          import com.lansi.realtynavi.test.utils.RabbitUtils;
          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.Connection;

          import java.util.Iterator;
          import java.util.LinkedHashMap;
          import java.util.Map;

          /**
           * @Description 路由模式
           * @Date 2021/3/23 13:31
           * @Created by huyao
           *
           *
           * 交換機類型:fanout廣播(發(fā)布訂閱)   direct轉(zhuǎn)發(fā)(路由)  topic通配符(通配模式)
           *
           */
          public class Producer {

              public static void main(String[] args) throws Exception{

                  Connection connection = RabbitUtils.getConnection();
                  Channel channel = connection.createChannel();
                  channel.queueDeclare(MqConstant.MQ_PUBLISH, falsefalsefalse, null);
                  LinkedHashMap<String, String> map = new LinkedHashMap<>();
                  map.put("test1","測試一數(shù)據(jù)");
                  map.put("test2","測試二數(shù)據(jù)");
                  map.put("test3","測試三數(shù)據(jù)");
                  map.put("test4","測試四數(shù)據(jù)");
                  map.put("test5","測試五數(shù)據(jù)");
                  map.put("test6","測試六數(shù)據(jù)");
                  map.put("test7","測試七數(shù)據(jù)");
                  Iterator<Map.Entry<String, String>> iterator = map.entrySet().iterator();
                  while (iterator.hasNext()){
                      Map.Entry<String, String> next = iterator.next();
                      //第一個參數(shù)交換機名字,第二個參數(shù)指定rout_key
                      channel.basicPublish(MqConstant.MQ_ROUTING_JHJ, next.getKey(), null, next.getValue().getBytes());
                  }



                  channel.close();
                  connection.close();

              }



          }


          接收者1:

          package com.lansi.realtynavi.test.routing;

          import com.lansi.realtynavi.test.constant.MqConstant;
          import com.lansi.realtynavi.test.utils.RabbitUtils;
          import com.rabbitmq.client.*;

          import java.io.IOException;

          /**
           * @Description 消費者
           * @Date 2021/3/23 13:50
           * @Created by huyao
           */
          public class ConsumerBaiDu {

              public static void main(String[] args) throws Exception{

                  Connection connection = RabbitUtils.getConnection();
                  Channel channel = connection.createChannel();
                  channel.queueDeclare(MqConstant.MQ_QUEUE_BAIDU, falsefalsefalse, null);


                  //隊列綁定交換機   目前交換機需要在rabbit也手動創(chuàng)建,在和spring整合的時候spring會自動幫我們創(chuàng)建
                  //參數(shù)1:隊列名,參數(shù)2:交換機名,參數(shù)3:路由key(目前用不到,路由模式通配符模式使用)
                  channel.queueBind(MqConstant.MQ_QUEUE_BAIDU, MqConstant.MQ_ROUTING_JHJ, "test1");
                  channel.queueBind(MqConstant.MQ_QUEUE_BAIDU, MqConstant.MQ_ROUTING_JHJ, "test2");
                  channel.basicQos(1);

                  channel.basicConsume(MqConstant.MQ_QUEUE_BAIDU, false, new DefaultConsumer(channel){
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          System.out.println("消費者百度收到消息:"+new String(body));
                          channel.basicAck(envelope.getDeliveryTag(), false);
                      }
                  });


              }



          }


          接收者二

          package com.lansi.realtynavi.test.routing;

          import com.lansi.realtynavi.test.constant.MqConstant;
          import com.lansi.realtynavi.test.utils.RabbitUtils;
          import com.rabbitmq.client.*;

          import java.io.IOException;

          /**
           * @Description 消費者
           * @Date 2021/3/23 13:50
           * @Created by huyao
           */
          public class ConsumerXinLang {

              public static void main(String[] args) throws Exception{

                  Connection connection = RabbitUtils.getConnection();
                  Channel channel = connection.createChannel();
                  channel.queueDeclare(MqConstant.MQ_QUEUE_XINLANG, falsefalsefalse, null);


                  //隊列綁定交換機
                  //參數(shù)1:隊列名,參數(shù)2:交換機名,參數(shù)3:路由key
                  channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_ROUTING_JHJ, "test10");
                  channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_ROUTING_JHJ, "test6");
                  channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_ROUTING_JHJ, "test5");
                  channel.basicQos(1);

                  channel.basicConsume(MqConstant.MQ_QUEUE_XINLANG, false, new DefaultConsumer(channel){
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          System.out.println("消費者新浪收到消息:"+new String(body));
                          channel.basicAck(envelope.getDeliveryTag(), false);
                      }
                  });


              }



          }


          在這里看到百度接收者只接受test1、test2,所以只接收到了1和2的數(shù)據(jù),新浪同理


          五 :主題模式

           在路由的基礎(chǔ)上增加了通配符匹配
           通配符規(guī)則  #可以匹配多個詞  * 只能匹配一個詞

          生產(chǎn)者代碼:

          package com.lansi.realtynavi.test.topics;

          import com.lansi.realtynavi.test.constant.MqConstant;
          import com.lansi.realtynavi.test.utils.RabbitUtils;
          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.Connection;

          import java.util.Iterator;
          import java.util.LinkedHashMap;
          import java.util.Map;

          /**
           * @Description 通配符模式
           * @Date 2021/3/23 13:31
           * @Created by huyao
           *
           *
           * 交換機類型:fanout廣播(發(fā)布訂閱)   direct轉(zhuǎn)發(fā)(路由)  topic通配符(通配模式)
           *
           *  通配符規(guī)則  #可以匹配多個詞  * 只能匹配一個詞
           *              test.#  test.one.tow  test.one.q.wqe  /  test.*   test.one test.two
           */
          public class Producer {

              public static void main(String[] args) throws Exception{

                  Connection connection = RabbitUtils.getConnection();
                  Channel channel = connection.createChannel();
                  channel.queueDeclare(MqConstant.MQ_TOPIC_JHJ, falsefalsefalse, null);
                  LinkedHashMap<String, String> map = new LinkedHashMap<>();
                  map.put("test.one","測試一數(shù)據(jù)");
                  map.put("test2.two.one","測試二數(shù)據(jù)");
                  map.put("test.wqe","測試三數(shù)據(jù)");
                  map.put("test4.com.hash.oqp","測試四數(shù)據(jù)");
                  map.put("test5.com.code.oqp","測試五數(shù)據(jù)");
                  map.put("test6.com.code.oqp","測試六數(shù)據(jù)");
                  Iterator<Map.Entry<String, String>> iterator = map.entrySet().iterator();
                  while (iterator.hasNext()){
                      Map.Entry<String, String> next = iterator.next();
                      //第一個參數(shù)交換機名字,第二個參數(shù)指定rout_key
                      channel.basicPublish(MqConstant.MQ_TOPIC_JHJ, next.getKey(), null, next.getValue().getBytes());
                  }
                  channel.close();
                  connection.close();

              }



          }


          接收者1代碼:

          package com.lansi.realtynavi.test.topics;

          import com.lansi.realtynavi.test.constant.MqConstant;
          import com.lansi.realtynavi.test.utils.RabbitUtils;
          import com.rabbitmq.client.*;

          import java.io.IOException;

          /**
           * @Description 消費者
           * @Date 2021/3/23 13:50
           * @Created by huyao
           */
          public class ConsumerBaiDu {

              public static void main(String[] args) throws Exception{

                  Connection connection = RabbitUtils.getConnection();
                  Channel channel = connection.createChannel();
                  channel.queueDeclare(MqConstant.MQ_QUEUE_BAIDU, falsefalsefalse, null);


                  //隊列綁定交換機   目前交換機需要在rabbit也手動創(chuàng)建,在和spring整合的時候spring會自動幫我們創(chuàng)建
                  //參數(shù)1:隊列名,參數(shù)2:交換機名,參數(shù)3:路由key(目前用不到,路由模式通配符模式使用)
                  channel.queueBind(MqConstant.MQ_QUEUE_BAIDU, MqConstant.MQ_TOPIC_JHJ, "*.*.*.oqp");
                  channel.basicQos(1);

                  channel.basicConsume(MqConstant.MQ_QUEUE_BAIDU, false, new DefaultConsumer(channel){
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          System.out.println("消費者百度收到消息:"+new String(body));
                          channel.basicAck(envelope.getDeliveryTag(), false);
                      }
                  });


              }



          }


          接收者2代碼

          package com.lansi.realtynavi.test.topics;

          import com.lansi.realtynavi.test.constant.MqConstant;
          import com.lansi.realtynavi.test.utils.RabbitUtils;
          import com.rabbitmq.client.*;

          import java.io.IOException;

          /**
           * @Description 消費者
           * @Date 2021/3/23 13:50
           * @Created by huyao
           */
          public class ConsumerXinLang {

              public static void main(String[] args) throws Exception{

                  Connection connection = RabbitUtils.getConnection();
                  Channel channel = connection.createChannel();
                  channel.queueDeclare(MqConstant.MQ_QUEUE_XINLANG, falsefalsefalse, null);


                  //隊列綁定交換機
                  //參數(shù)1:隊列名,參數(shù)2:交換機名,參數(shù)3:路由key
                  channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_TOPIC_JHJ, "test.#");
                  channel.basicQos(1);

                  channel.basicConsume(MqConstant.MQ_QUEUE_XINLANG, false, new DefaultConsumer(channel){
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          System.out.println("消費者新浪收到消息:"+new String(body));
                          channel.basicAck(envelope.getDeliveryTag(), false);
                      }
                  });


              }



          }



          最后就是springboot上整合rabbitmq

          需要用到的依賴

            <dependency>
                      <groupId>org.springframework.boot</groupId>
                      <artifactId>spring-boot-starter-amqp</artifactId>
                  </dependency>

          然后配置rabbitmq連接

          spring.rabbitmq.host=127.0.0.1
          spring.rabbitmq.port=5672
          spring.rabbitmq.username=admin
          spring.rabbitmq.password=111111
          #發(fā)送者開啟confirm確認機制
          spring.rabbitmq.publisher-confirms=true
          #發(fā)送者開啟return確認機制
          spring.rabbitmq.publisher-returns=true

          #開啟ack

          spring.rabbitmq.listener.type=simple
          spring.rabbitmq.listener.simple.acknowledge-mode=manual
          spring.rabbitmq.listener.simple.default-requeue-rejected=false

          接下來一個rabbitmq的配置

          package com.lansi.realtynavi.config;

          import org.springframework.amqp.core.*;
          import org.springframework.beans.factory.annotation.Qualifier;
          import org.springframework.context.annotation.Bean;
          import org.springframework.context.annotation.Configuration;

          /**
           * @Description mq的配置
           * @Date 2021/3/24 14:19
           * @Created by huyao
           */
          @Configuration
          public class RabbitMqConfig {

              //定義交換機的名字
              public static final String EXCHANGE_NAME = "boot_topic_exchange";

              public static final String QUEUE_NAME = "boot_queue";

              //1.聲明交換機
              @Bean("bootExchange")
              public Exchange bootExchange(){
                  return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
              }

              //2.聲明隊列
              @Bean("bootQueue")
              public Queue bootQueue(){
                  return QueueBuilder.durable(QUEUE_NAME).build();
              }

              //3.綁定
              @Bean
              public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){
                  return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
              }
          }


          接收者

          package com.lansi.realtynavi.config;

          import com.rabbitmq.client.Channel;
          import org.springframework.amqp.core.Message;
          import org.springframework.amqp.rabbit.annotation.RabbitListener;
          import org.springframework.stereotype.Component;

          /**
           * @Description mq監(jiān)聽/消費者手動簽收消息
           * @Date 2021/3/24 14:44
           * @Created by huyao
           *
           *rabbitmq給了兩種消息的可靠性  confirm和return
           *
           */
          @Component
          public class RabbitMqConsumer {


              //可監(jiān)聽分布式其他項目,只要mq連接的地址相同監(jiān)聽的隊列名存在即可
              //消費者
              @RabbitListener(queues = "boot_queue")
              public void ListenerQueue(Message message, Channel channel) throws Exception{

                  System.out.println("消費者接收到消息:"+new String(message.getBody()));

                  
                  try{
                      //開始業(yè)務(wù)處理

                      System.out.println("開始業(yè)務(wù)處理");

                      //int i = 5/0;

                      System.out.println("業(yè)務(wù)處理完成");
                      //業(yè)務(wù)處理完成確認收到消息  , 第二個參數(shù)為true支持多消息
                      channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);

                  }catch (Exception e){
                      System.out.println("業(yè)務(wù)處理異常");
                      //業(yè)務(wù)異常,拒收消息,請求重發(fā)    參數(shù)三為true則重回隊列發(fā)送
                      channel.basicNack(message.getMessageProperties().getDeliveryTag(), truetrue);
                  }


              }

          }


          這里的生產(chǎn)者我寫的一個controller中的列子(錯誤示范,只能調(diào)用一次)

          testTopic1 是測試mq的高級特性,這里只用到testTopic就可以

          package com.lansi.realtynavi.rabbitmq;

          import com.lansi.realtynavi.config.RabbitMqConfig;
          import com.lansi.realtynavi.dev.helloWord.HelloSender;
          import org.springframework.amqp.core.Message;
          import org.springframework.amqp.rabbit.core.RabbitTemplate;
          import org.springframework.amqp.rabbit.support.CorrelationData;
          import org.springframework.beans.factory.annotation.Autowired;
          import org.springframework.web.bind.annotation.GetMapping;
          import org.springframework.web.bind.annotation.RequestMapping;
          import org.springframework.web.bind.annotation.RestController;

          /**
           * @Description 描述
           * @Date 2021/3/24 13:46
           * @Created by huyao
           */
          @RestController
          @RequestMapping("api/rabbitMq")
          public class RabbitMqController {


              @Autowired
              private HelloSender helloSender;

              @Autowired
              private RabbitTemplate rabbitTemplate;

              @GetMapping("helloWorld")
              public void hello(){
                  helloSender.send();
              }

              @GetMapping("testTopic")
              public void testTopic(){
                  rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"boot.hhh""topic的mq.......");
              }

              //mq的可靠性機制,必須要在配置文件中開啟
              @GetMapping("testTopic1")
              public void testTopic1(){

                  rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                      @Override
                      public void confirm(CorrelationData correlationData, boolean b, String s) {
                          System.out.println("confirm方法被執(zhí)行了。。。");
                          if(b){
                              System.out.println("交換機確認成功??!");
                          } else {
                              System.out.println("交換機確認失?。?!");
                          }

                      }
                  });

                  //設(shè)置交換機處理失敗消息的模式,為true的時候,消息打到不了隊列時,會將消息重新返回給生產(chǎn)者
                  rabbitTemplate.setMandatory(true);
                  rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

                      /**
                       * @param message 消息對象
                       * @param returnCode 錯誤碼
                       * @param returnText 錯誤信息
                       * @param exchange 交換機
                       * @param routingKey 路由鍵
                       *
                       * */
                      @Override
                      public void returnedMessage(Message message, int returnCode, String returnText, String exchange,String routingKey) {
                          System.out.println("return被執(zhí)行了。。。");
                          System.out.println("message:"+new String(message.getBody()));
                          System.out.println("錯誤碼:"+returnCode);
                          System.out.println("錯誤信息:"+returnText);
                          System.out.println("交換機:"+exchange);
                          System.out.println("路由鍵:"+routingKey);
                      }
                  });
                  rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"abc.boot.hhh""topic的mq.......");
              }

          }


          運行后掉對應(yīng)的接口,消費者接收



          這樣rabbitmq就整合進springboot中了

          ————————————————

          版權(quán)聲明:本文為CSDN博主「oNuoyi」的原創(chuàng)文章,遵循CC 4.0 BY-SA版權(quán)協(xié)議,轉(zhuǎn)載請附上原文出處鏈接及本聲明。

          原文鏈接:

          https://blog.csdn.net/qq_41973632/article/details/115233999




          鋒哥最新SpringCloud分布式電商秒殺課程發(fā)布

          ??????

          ??長按上方微信二維碼 2 秒





          感謝點贊支持下哈 

          瀏覽 38
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  一区二区久久 | 无码中文字幕在线视频 | 免费黄色电影网站在线观看 | 国产一级a毛一级a看免费视频野外 | 亚洲午夜免费视频 |