<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          速入! 3W字帶你迅速上手MQ

          共 84449字,需瀏覽 169分鐘

           ·

          2021-04-06 13:46

          ????關(guān)注后回復(fù) “進(jìn)群” ,拉你進(jìn)程序員交流群????
          作者丨SoWhat1412
          來(lái)源丨sowhat1412
          ?

          高清思維導(dǎo)圖已同步Git:https://github.com/SoWhat1412/xmindfile,關(guān)注公眾號(hào)sowhat1412獲取海量資源

          ?

          1. 消息隊(duì)列解決了什么問(wèn)題

          消息中間件是目前比較流行的一個(gè)中間件,其中RabbitMQ更是占有一定的市場(chǎng)份額,主要用來(lái)做異步處理、應(yīng)用解耦、流量削峰、日志處理等等方面。

          1. 異步處理

          一個(gè)用戶(hù)登陸網(wǎng)址注冊(cè),然后系統(tǒng)發(fā)短信跟郵件告知注冊(cè)成功,一般有三種解決方法。

          1. 串行到依次執(zhí)行,問(wèn)題是用戶(hù)注冊(cè)后就可以使用了,沒(méi)必要等驗(yàn)證碼跟郵件。
          2. 注冊(cè)成功后,郵件跟驗(yàn)證碼用并行等方式執(zhí)行,問(wèn)題是郵件跟驗(yàn)證碼是非重要的任務(wù),系統(tǒng)注冊(cè)還要等這倆完成么?
          3. 基于異步MQ的處理,用戶(hù)注冊(cè)成功后直接把信息異步發(fā)送到MQ中,然后郵件系統(tǒng)跟驗(yàn)證碼系統(tǒng)主動(dòng)去拉取數(shù)據(jù)。

          2. 應(yīng)用解耦

          比如我們有一個(gè)訂單系統(tǒng),還要一個(gè)庫(kù)存系統(tǒng),用戶(hù)下訂單了就要調(diào)用下庫(kù)存系統(tǒng)來(lái)處理,直接調(diào)用到話(huà)庫(kù)存系統(tǒng)出現(xiàn)問(wèn)題咋辦呢?

          3. 流量削峰

          舉辦一個(gè) 秒殺活動(dòng),如何較好到設(shè)計(jì)?服務(wù)層直接接受瞬間搞密度訪問(wèn)絕對(duì)不可以起碼要加入一個(gè)MQ。

          4. 日志處理

          用戶(hù)通過(guò)WebUI訪問(wèn)發(fā)送請(qǐng)求到時(shí)候后端如何接受跟處理呢一般?

          2. RabbitMQ 安裝跟配置

          官網(wǎng):https://www.rabbitmq.com/download.html

          開(kāi)發(fā)語(yǔ)言:https://www.erlang.org/

          正式到安裝跟允許需要Erlang跟RabbitMQ倆版本之間相互兼容!我這里圖省事直接用Docker 拉取鏡像了。下載:開(kāi)啟:管理頁(yè)面 默認(rèn)賬號(hào):guest  默認(rèn)密碼:guest 。Docker啟動(dòng)時(shí)候可以指定賬號(hào)密碼對(duì)外端口以及

          docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management 

          啟動(dòng):用戶(hù)添加:vitrual hosts 相當(dāng)于mysql中的DB。創(chuàng)建一個(gè)virtual hosts,一般以/ 開(kāi)頭。對(duì)用戶(hù)進(jìn)行授權(quán),點(diǎn)擊/vhost_mmr,至于WebUI多點(diǎn)點(diǎn)即可了解。

          3. 實(shí)戰(zhàn)

          RabbitMQ 官網(wǎng)支持任務(wù)模式:https://www.rabbitmq.com/getstarted.htm

          l創(chuàng)建Maven項(xiàng)目導(dǎo)入必要依賴(lài):

              <dependencies>
                  <dependency>
                      <groupId>com.rabbitmq</groupId>
                      <artifactId>amqp-client</artifactId>
                      <version>4.0.2</version>
                  </dependency>

                  <dependency>
                      <groupId>org.slf4j</groupId>
                      <artifactId>slf4j-api</artifactId>
                      <version>1.7.10</version>
                  </dependency>

                  <dependency>
                      <groupId>org.slf4j</groupId>
                      <artifactId>slf4j-log4j12</artifactId>
                      <version>1.7.5</version>
                  </dependency>

                  <dependency>
                      <groupId>log4j</groupId>
                      <artifactId>log4j</artifactId>
                      <version>1.2.17</version>
                  </dependency>

                  <dependency>
                      <groupId>junit</groupId>
                      <artifactId>junit</artifactId>
                      <version>4.11</version>
                  </dependency>
              </dependencies>

          0. 獲取MQ連接

          package com.sowhat.mq.util;

          import com.rabbitmq.client.Connection;
          import com.rabbitmq.client.ConnectionFactory;

          import java.io.IOException;
          import java.util.concurrent.TimeoutException;

          public class ConnectionUtils {
              /**
               * 連接器
               * @return
               * @throws IOException
               * @throws TimeoutException
               */

              public static Connection getConnection() throws IOException, TimeoutException {
                  ConnectionFactory factory = new ConnectionFactory();
                  factory.setHost("127.0.0.1");
                  factory.setPort(5672);
                  factory.setVirtualHost("/vhost_mmr");
                  factory.setUsername("user_mmr");
                  factory.setPassword("sowhat");
                  Connection connection = factory.newConnection();
                  return connection;
              }
          }

          1. 簡(jiǎn)單隊(duì)列

          P:Producer 消息的生產(chǎn)者 中間:Queue消息隊(duì)列 C:Consumer 消息的消費(fèi)者

          package com.sowhat.mq.simple;

          import com.rabbitmq.client.AMQP;
          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.Connection;
          import com.sowhat.mq.util.ConnectionUtils;

          import java.io.IOException;
          import java.util.concurrent.TimeoutException;

          public class Send {
              public static final String QUEUE_NAME = "test_simple_queue";

              public static void main(String[] args) throws IOException, TimeoutException {
                  // 獲取一個(gè)連接
                  Connection connection = ConnectionUtils.getConnection();
                  // 從連接獲取一個(gè)通道
                  Channel channel = connection.createChannel();
                  // 創(chuàng)建隊(duì)列聲明
                  AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);

                  String msg = "hello Simple";
                  // exchange,隊(duì)列,參數(shù),消息字節(jié)體
                  channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

                  System.out.println("--send msg:" + msg);

                  channel.close();

                  connection.close();

              }
          }
          ---
          package com.sowhat.mq.simple;

          import com.rabbitmq.client.*;
          import com.sowhat.mq.util.ConnectionUtils;

          import java.io.IOException;
          import java.util.concurrent.TimeoutException;

          /**
           * 消費(fèi)者獲取消息
           */

          public class Recv {
              public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
                  newApi();
                  oldApi();
              }

              private static void newApi() throws IOException, TimeoutException {
                  // 創(chuàng)建連接
                  Connection connection = ConnectionUtils.getConnection();
                  // 創(chuàng)建頻道
                  Channel channel = connection.createChannel();
                  // 隊(duì)列聲明  隊(duì)列名,是否持久化,是否獨(dú)占模式,無(wú)消息后是否自動(dòng)刪除,消息攜帶參數(shù)
                  channel.queueDeclare(Send.QUEUE_NAME,false,false,false,null);
                  // 定義消費(fèi)者
                  DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                      @Override  // 事件模型,消息來(lái)了會(huì)觸發(fā)該函數(shù)
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          String s = new String(body, "utf-8");
                          System.out.println("---new api recv:" + s);
                      }
                  };
                  // 監(jiān)聽(tīng)隊(duì)列
                  channel.basicConsume(Send.QUEUE_NAME,true,defaultConsumer);
              }

              // 老方法 消費(fèi)者 MQ 在3。4以下 用次方法,
              private static void oldApi() throws IOException, TimeoutException, InterruptedException {
                  // 創(chuàng)建連接
                  Connection connection = ConnectionUtils.getConnection();
                  // 創(chuàng)建頻道
                  Channel channel = connection.createChannel();
                  // 定義隊(duì)列消費(fèi)者
                  QueueingConsumer consumer = new QueueingConsumer(channel);
                  //監(jiān)聽(tīng)隊(duì)列
                  channel.basicConsume(Send.QUEUE_NAME, true, consumer);
                  while (true) {
                      // 發(fā)貨體
                      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                      byte[] body = delivery.getBody();
                      String s = new String(body);
                      System.out.println("---Recv:" + s);
                  }
              }
          }

          右上角有可以設(shè)置頁(yè)面刷新頻率,然后可以在UI界面直接手動(dòng)消費(fèi)掉,如下圖:簡(jiǎn)單隊(duì)列的不足:耦合性過(guò)高,生產(chǎn)者一一對(duì)應(yīng)消費(fèi)者,如果有多個(gè)消費(fèi)者想消費(fèi)隊(duì)列中信息就無(wú)法實(shí)現(xiàn)了。

          2. WorkQueue 工作隊(duì)列

          Simple隊(duì)列中只能一一對(duì)應(yīng)的生產(chǎn)消費(fèi),實(shí)際開(kāi)發(fā)中生產(chǎn)者發(fā)消息很簡(jiǎn)單,而消費(fèi)者要跟業(yè)務(wù)結(jié)合,消費(fèi)者接受到消息后要處理從而會(huì)耗時(shí)。「可能會(huì)出現(xiàn)隊(duì)列中出現(xiàn)消息積壓」。所以如果多個(gè)消費(fèi)者可以加速消費(fèi)。

          1. round robin 輪詢(xún)分發(fā)

          代碼編程一個(gè)生產(chǎn)者兩個(gè)消費(fèi)者:

          package com.sowhat.mq.work;

          import com.rabbitmq.client.AMQP;
          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.Connection;
          import com.sowhat.mq.util.ConnectionUtils;

          import java.io.IOException;
          import java.util.concurrent.TimeoutException;

          public class Send {
              public static final String  QUEUE_NAME = "test_work_queue";
              public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
                  // 獲取連接
                  Connection connection = ConnectionUtils.getConnection();
                  // 獲取 channel
                  Channel channel = connection.createChannel();
                  // 聲明隊(duì)列
                  AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);
                  for (int i = 0; i <50 ; i++) {
                      String msg = "hello-" + i;
                      System.out.println("WQ send " + msg);
                      channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
                      Thread.sleep(i*20);
                  }
                  channel.close();
                  connection.close();
              }
          }

          ---
          package com.sowhat.mq.work;

          import com.rabbitmq.client.*;
          import com.sowhat.mq.util.ConnectionUtils;

          import java.io.IOException;
          import java.util.concurrent.TimeoutException;

          public class Recv1 {
              public static void main(String[] args) throws IOException, TimeoutException {
                  // 獲取連接
                  Connection connection = ConnectionUtils.getConnection();
                  // 獲取通道
                  Channel channel = connection.createChannel();
                  // 聲明隊(duì)列
                  channel.queueDeclare(Send.QUEUE_NAME, falsefalsefalsenull);
                  //定義消費(fèi)者
                  DefaultConsumer consumer = new DefaultConsumer(channel) {

                      @Override // 事件觸發(fā)機(jī)制
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          String s = new String(body, "utf-8");
                          System.out.println("【1】:" + s);
                          try {
                              Thread.sleep(2000);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          } finally {
                              System.out.println("【1】 done");
                          }
                      }
                  };
                  boolean autoAck = true;
                  channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);
              }
          }
          ---
          package com.sowhat.mq.work;

          import com.rabbitmq.client.*;
          import com.sowhat.mq.util.ConnectionUtils;

          import java.io.IOException;
          import java.util.concurrent.TimeoutException;

          public class Recv2 {
              public static void main(String[] args) throws IOException, TimeoutException {
                  // 獲取連接
                  Connection connection = ConnectionUtils.getConnection();
                  // 獲取通道
                  Channel channel = connection.createChannel();
                  // 聲明隊(duì)列
                  channel.queueDeclare(Send.QUEUE_NAME, falsefalsefalsenull);
                  //定義消費(fèi)者
                  DefaultConsumer consumer = new DefaultConsumer(channel) {

                      @Override // 事件觸發(fā)機(jī)制
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          String s = new String(body, "utf-8");
                          System.out.println("【2】:" + s);
                          try {
                              Thread.sleep(1000 );
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          } finally {
                              System.out.println("【2】 done");
                          }
                      }
                  };
                  boolean autoAck = true;
                  channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);
              }
          }

          現(xiàn)象:消費(fèi)者1 跟消費(fèi)者2 處理的數(shù)據(jù)量完全一樣的個(gè)數(shù):消費(fèi)者1:處理偶數(shù) 消費(fèi)者2:處理奇數(shù) 這種方式叫輪詢(xún)分發(fā)(round-robin)結(jié)果就是不管兩個(gè)消費(fèi)者誰(shuí)忙,「數(shù)據(jù)總是你一個(gè)我一個(gè)」,MQ 給兩個(gè)消費(fèi)發(fā)數(shù)據(jù)的時(shí)候是不知道消費(fèi)者性能的,默認(rèn)就是雨露均沾。此時(shí) autoAck = true。

          2. 公平分發(fā) fair dipatch

          如果要實(shí)現(xiàn)公平分發(fā),要讓消費(fèi)者消費(fèi)完畢一條數(shù)據(jù)后就告知MQ,再讓MQ發(fā)數(shù)據(jù)即可。自動(dòng)應(yīng)答要關(guān)閉!

          package com.sowhat.mq.work;

          import com.rabbitmq.client.AMQP;
          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.Connection;
          import com.sowhat.mq.util.ConnectionUtils;

          import java.io.IOException;
          import java.util.concurrent.TimeoutException;

          public class Send {
              public static final String  QUEUE_NAME = "test_work_queue";
              public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
                  // 獲取連接
                  Connection connection = ConnectionUtils.getConnection();
                  // 獲取 channel
                  Channel channel = connection.createChannel();
                  // s聲明隊(duì)列
                  AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);

                  // 每個(gè)消費(fèi)者發(fā)送確認(rèn)消息之前,消息隊(duì)列不發(fā)送下一個(gè)消息到消費(fèi)者,一次只發(fā)送一個(gè)消息
                  // 從而限制一次性發(fā)送給消費(fèi)者到消息不得超過(guò)1個(gè)。
                  int perfetchCount = 1;
                  channel.basicQos(perfetchCount);

                  for (int i = 0; i <50 ; i++) {
                      String msg = "hello-" + i;
                      System.out.println("WQ send " + msg);
                      channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
                      Thread.sleep(i*20);
                  }
                  channel.close();
                  connection.close();
              }
          }
          ---
          package com.sowhat.mq.work;

          import com.rabbitmq.client.*;
          import com.sowhat.mq.util.ConnectionUtils;

          import java.io.IOException;
          import java.util.concurrent.TimeoutException;

          public class Recv1 {
              public static void main(String[] args) throws IOException, TimeoutException {
                  // 獲取連接
                  Connection connection = ConnectionUtils.getConnection();
                  // 獲取通道
                  final Channel channel = connection.createChannel();
                  // 聲明隊(duì)列
                  channel.queueDeclare(Send.QUEUE_NAME, falsefalsefalsenull);
                  // 保證一次只分發(fā)一個(gè)
                  channel.basicQos(1);
                  //定義消費(fèi)者
                  DefaultConsumer consumer = new DefaultConsumer(channel) {

                      @Override // 事件觸發(fā)機(jī)制
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          String s = new String(body, "utf-8");
                          System.out.println("【1】:" + s);
                          try {
                              Thread.sleep(2000);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          } finally {
                              System.out.println("【1】 done");
                              // 手動(dòng)回執(zhí)
                              channel.basicAck(envelope.getDeliveryTag(),false);
                          }
                      }
                  };
                  // 自動(dòng)應(yīng)答
                  boolean autoAck = false;
                  channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);
              }
          }
          ---
          package com.sowhat.mq.work;

          import com.rabbitmq.client.*;
          import com.sowhat.mq.util.ConnectionUtils;

          import java.io.IOException;
          import java.util.concurrent.TimeoutException;

          public class Recv2 {
              public static void main(String[] args) throws IOException, TimeoutException {
                  // 獲取連接
                  Connection connection = ConnectionUtils.getConnection();
                  // 獲取通道
                  final Channel channel = connection.createChannel();
                  // 聲明隊(duì)列
                  channel.queueDeclare(Send.QUEUE_NAME, falsefalsefalsenull);
                  // 保證一次只分發(fā)一個(gè)
                  channel.basicQos(1);
                  //定義消費(fèi)者
                  DefaultConsumer consumer = new DefaultConsumer(channel) {

                      @Override // 事件觸發(fā)機(jī)制
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          String s = new String(body, "utf-8");
                          System.out.println("【2】:" + s);
                          try {
                              Thread.sleep(1000);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          } finally {
                              System.out.println("【2】 done");
                              // 手動(dòng)回執(zhí)
                              channel.basicAck(envelope.getDeliveryTag(),false);
                          }
                      }
                  };
                  // 自動(dòng)應(yīng)答
                  boolean autoAck = false;
                  channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);
              }
          }

          結(jié)果:實(shí)現(xiàn)了公平分發(fā),消費(fèi)者2 是消費(fèi)者1消費(fèi)數(shù)量的2倍。

          3. publish/subscribe 發(fā)布訂閱模式

          類(lèi)似公眾號(hào)的訂閱跟發(fā)布,無(wú)需指定routingKey:

          解讀:

          1. 一個(gè)生產(chǎn)者多個(gè)消費(fèi)者
          2. 每一個(gè)消費(fèi)者都有一個(gè)自己的隊(duì)列
          3. 生產(chǎn)者沒(méi)有把消息直接發(fā)送到隊(duì)列而是發(fā)送到了交換機(jī)轉(zhuǎn)化器(exchange)。
          4. 每一個(gè)隊(duì)列都要綁定到交換機(jī)上。
          5. 生產(chǎn)者發(fā)送的消息經(jīng)過(guò)交換機(jī)到達(dá)隊(duì)列,從而實(shí)現(xiàn)一個(gè)消息被多個(gè)消費(fèi)者消費(fèi)。

          生產(chǎn)者:

          package com.sowhat.mq.ps;

          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.Connection;
          import com.sowhat.mq.util.ConnectionUtils;

          import java.io.IOException;
          import java.util.concurrent.TimeoutException;

          public class Send {
              public static final String EXCHANGE_NAME = "test_exchange_fanout";
              public static void main(String[] args) throws IOException, TimeoutException {
                  Connection connection = ConnectionUtils.getConnection();
                  Channel channel = connection.createChannel();

                  //聲明交換機(jī)
                  channel.exchangeDeclare(EXCHANGE_NAME,"fanout");// 分發(fā)= fanout

                  // 發(fā)送消息
                  String msg = "hello ps ";

                  channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
                  System.out.println("Send:" + msg);

                  channel.close();
                  connection.close();
              }
          }

          消息哪兒去了?丟失了,在RabbitMQ中只有隊(duì)列有存儲(chǔ)能力,「因?yàn)檫@個(gè)時(shí)候隊(duì)列還沒(méi)有綁定到交換機(jī) 所以消息丟失了」。消費(fèi)者:

          package com.sowhat.mq.ps;

          import com.rabbitmq.client.*;
          import com.sowhat.mq.util.ConnectionUtils;

          import java.io.IOException;
          import java.util.concurrent.TimeoutException;

          public class Recv1 {
              public static final String  QUEUE_NAME = "test_queue_fanout_email";
              public static final String EXCHANGE_NAME = "test_exchange_fanout";
              public static void main(String[] args) throws IOException, TimeoutException {
                  Connection connection = ConnectionUtils.getConnection();
                  final Channel channel = connection.createChannel();
                  // 隊(duì)列聲明
                  channel.queueDeclare(QUEUE_NAME,false,false,false,null);
                  // 綁定隊(duì)列到交換機(jī)轉(zhuǎn)發(fā)器
                  channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"" );

                  // 保證一次只分發(fā)一個(gè)
                  channel.basicQos(1);
                  //定義消費(fèi)者
                  DefaultConsumer consumer = new DefaultConsumer(channel) {

                      @Override // 事件觸發(fā)機(jī)制
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          String s = new String(body, "utf-8");
                          System.out.println("【1】:" + s);
                          try {
                              Thread.sleep(2000);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          } finally {
                              System.out.println("【1】 done");
                              // 手動(dòng)回執(zhí)
                              channel.basicAck(envelope.getDeliveryTag(),false);
                          }
                      }
                  };
                  // 自動(dòng)應(yīng)答
                  boolean autoAck = false;
                  channel.basicConsume(QUEUE_NAME, autoAck, consumer);
              }
          }
          ---
          package com.sowhat.mq.ps;

          import com.rabbitmq.client.*;
          import com.sowhat.mq.util.ConnectionUtils;

          import java.io.IOException;
          import java.util.concurrent.TimeoutException;

          public class Recv2 {
              public static final String  QUEUE_NAME = "test_queue_fanout_sms";
              public static final String EXCHANGE_NAME = "test_exchange_fanout";
              public static void main(String[] args) throws IOException, TimeoutException {
                  Connection connection = ConnectionUtils.getConnection();
                  final Channel channel = connection.createChannel();
                  // 隊(duì)列聲明
                  channel.queueDeclare(QUEUE_NAME,false,false,false,null);
                  // 綁定隊(duì)列到交換機(jī)轉(zhuǎn)發(fā)器
                  channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"" );
                  // 保證一次只分發(fā)一個(gè)
                  channel.basicQos(1);
                  //定義消費(fèi)者
                  DefaultConsumer consumer = new DefaultConsumer(channel) {
                      @Override // 事件觸發(fā)機(jī)制
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          String s = new String(body, "utf-8");
                          System.out.println("【2】:" + s);
                          try {
                              Thread.sleep(1000);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          } finally {
                              System.out.println("【2】 done");
                              // 手動(dòng)回執(zhí)
                              channel.basicAck(envelope.getDeliveryTag(),false);
                          }
                      }
                  };
                  // 自動(dòng)應(yīng)答
                  boolean autoAck = false;
                  channel.basicConsume(QUEUE_NAME, autoAck, consumer);
              }
          }

          「同時(shí)還可以自己手動(dòng)的添加一個(gè)隊(duì)列監(jiān)控到該exchange」

          4. routing 路由選擇 通配符模式

          Exchange(交換機(jī),轉(zhuǎn)發(fā)器):「一方面接受生產(chǎn)者消息,另一方面是向隊(duì)列推送消息」。匿名轉(zhuǎn)發(fā)用 ""  表示,比如前面到簡(jiǎn)單隊(duì)列跟WorkQueue。fanout:不處理路由鍵。「不需要指定routingKey」,我們只需要把隊(duì)列綁定到交換機(jī), 「消息就會(huì)被發(fā)送到所有到隊(duì)列中」direct:處理路由鍵,「需要指定routingKey」,此時(shí)生產(chǎn)者發(fā)送數(shù)據(jù)到時(shí)候會(huì)指定key,任務(wù)隊(duì)列也會(huì)指定key,只有key一樣消息才會(huì)被傳送到隊(duì)列中。如下圖

          package com.sowhat.mq.routing;

          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.Connection;
          import com.sowhat.mq.util.ConnectionUtils;

          import java.io.IOException;
          import java.util.concurrent.TimeoutException;

          public class Send {
              public static final String  EXCHANGE_NAME = "test_exchange_direct";
              public static void main(String[] args) throws IOException, TimeoutException {
                  Connection connection = ConnectionUtils.getConnection();
                  Channel channel = connection.createChannel();
                  // exchange
                  channel.exchangeDeclare(EXCHANGE_NAME,"direct");

                  String msg = "hello info!";

                  // 可以指定類(lèi)型
                  String routingKey = "info";
                  channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());
                  System.out.println("Send : " + msg);
                  channel.close();
                  connection.close();
              }
          }
          ---
          package com.sowhat.mq.routing;

          import com.rabbitmq.client.*;
          import com.sowhat.mq.util.ConnectionUtils;

          import java.io.IOException;
          import java.util.concurrent.TimeoutException;

          public class Recv1 {
              public static final String  EXCHANGE_NAME = "test_exchange_direct";
              public static final String QUEUE_NAME = "test_queue_direct_1";
              public static void main(String[] args) throws IOException, TimeoutException {
                  Connection connection = ConnectionUtils.getConnection();
                  final Channel channel = connection.createChannel();

                  channel.queueDeclare(QUEUE_NAME,false,false,false,null);
                  channel.basicQos(1);

                  channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");

                  //定義消費(fèi)者
                  DefaultConsumer consumer = new DefaultConsumer(channel) {

                      @Override // 事件觸發(fā)機(jī)制
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          String s = new String(body, "utf-8");
                          System.out.println("【1】:" + s);
                          try {
                              Thread.sleep(2000);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          } finally {
                              System.out.println("【1】 done");
                              // 手動(dòng)回執(zhí)
                              channel.basicAck(envelope.getDeliveryTag(),false);
                          }
                      }
                  };
                  // 自動(dòng)應(yīng)答
                  boolean autoAck = false;
                  channel.basicConsume(QUEUE_NAME, autoAck, consumer);
              }
          }
          ---
          package com.sowhat.mq.routing;

          import com.rabbitmq.client.*;
          import com.sowhat.mq.util.ConnectionUtils;

          import java.io.IOException;
          import java.util.concurrent.TimeoutException;

          public class Recv2 {
              public static final String EXCHANGE_NAME = "test_exchange_direct";
              public static final String QUEUE_NAME = "test_queue_direct_2";

              public static void main(String[] args) throws IOException, TimeoutException {
                  Connection connection = ConnectionUtils.getConnection();
                  final Channel channel = connection.createChannel();

                  channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);
                  channel.basicQos(1);

                  // 綁定種類(lèi)似 Key
                  channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
                  channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
                  channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");

                  //定義消費(fèi)者
                  DefaultConsumer consumer = new DefaultConsumer(channel) {
                      @Override // 事件觸發(fā)機(jī)制
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          String s = new String(body, "utf-8");
                          System.out.println("【2】:" + s);
                          try {
                              Thread.sleep(1000);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          } finally {
                              System.out.println("【2】 done");
                              // 手動(dòng)回執(zhí)
                              channel.basicAck(envelope.getDeliveryTag(), false);
                          }
                      }
                  };
                  // 自動(dòng)應(yīng)答
                  boolean autoAck = false;
                  channel.basicConsume(QUEUE_NAME, autoAck, consumer);

              }
          }

          WebUI:缺點(diǎn):路由key必須要明確,無(wú)法實(shí)現(xiàn)規(guī)則性模糊匹配。

          5. Topics 主題

          將路由鍵跟某個(gè)模式匹配,# 表示匹配 >=1個(gè)字符, *表示匹配一個(gè)。生產(chǎn)者會(huì)帶routingKey,但是消費(fèi)者的MQ會(huì)帶模糊routingKey。商品:發(fā)布、刪除、修改、查詢(xún)。

          package com.sowhat.mq.topic;

          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.Connection;
          import com.sowhat.mq.util.ConnectionUtils;

          import java.io.IOException;
          import java.util.concurrent.TimeoutException;

          public class Send {
              public static final String EXCHANGE_NAME = "test_exchange_topic";

              public static void main(String[] args) throws IOException, TimeoutException {
                  Connection connection = ConnectionUtils.getConnection();
                  Channel channel = connection.createChannel();
                  // exchange
                  channel.exchangeDeclare(EXCHANGE_NAME, "topic");

                  String msg = "商品!";

                  // 可以指定類(lèi)型
                  String routingKey = "goods.find";
                  channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
                  System.out.println("Send : " + msg);
                  channel.close();
                  connection.close();
              }
          }
          ---
          package com.sowhat.mq.topic;

          import com.rabbitmq.client.*;
          import com.sowhat.mq.util.ConnectionUtils;

          import java.io.IOException;
          import java.util.concurrent.TimeoutException;

          public class Recv1 {
              public static final String  EXCHANGE_NAME = "test_exchange_topic";
              public static final String QUEUE_NAME = "test_queue_topic_1";
              public static void main(String[] args) throws IOException, TimeoutException {
                  Connection connection = ConnectionUtils.getConnection();
                  final Channel channel = connection.createChannel();

                  channel.queueDeclare(QUEUE_NAME,false,false,false,null);
                  channel.basicQos(1);

                  channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.add");

                  //定義消費(fèi)者
                  DefaultConsumer consumer = new DefaultConsumer(channel) {

                      @Override // 事件觸發(fā)機(jī)制
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          String s = new String(body, "utf-8");
                          System.out.println("【1】:" + s);
                          try {
                              Thread.sleep(2000);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          } finally {
                              System.out.println("【1】 done");
                              // 手動(dòng)回執(zhí)
                              channel.basicAck(envelope.getDeliveryTag(),false);
                          }
                      }
                  };
                  // 自動(dòng)應(yīng)答
                  boolean autoAck = false;
                  channel.basicConsume(QUEUE_NAME, autoAck, consumer);
              }
          }
          ---
          package com.sowhat.mq.topic;

          import com.rabbitmq.client.*;
          import com.sowhat.mq.util.ConnectionUtils;

          import java.io.IOException;
          import java.util.concurrent.TimeoutException;

          public class Recv2 {
              public static final String EXCHANGE_NAME = "test_exchange_topic";
              public static final String QUEUE_NAME = "test_queue_topic_2";

              public static void main(String[] args) throws IOException, TimeoutException {
                  Connection connection = ConnectionUtils.getConnection();
                  final Channel channel = connection.createChannel();

                  channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);
                  channel.basicQos(1);
                  // 此乃重點(diǎn)
                  channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");

                  //定義消費(fèi)者
                  DefaultConsumer consumer = new DefaultConsumer(channel) {
                      @Override // 事件觸發(fā)機(jī)制
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          String s = new String(body, "utf-8");
                          System.out.println("【2】:" + s);
                          try {
                              Thread.sleep(1000);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          } finally {
                              System.out.println("【2】 done");
                              // 手動(dòng)回執(zhí)
                              channel.basicAck(envelope.getDeliveryTag(), false);
                          }
                      }
                  };
                  // 自動(dòng)應(yīng)答
                  boolean autoAck = false;
                  channel.basicConsume(QUEUE_NAME, autoAck, consumer);
              }
          }

          6. MQ的持久化跟非持久化

          因?yàn)橄⒃趦?nèi)存中,如果MQ掛了那么消息也丟失了,所以應(yīng)該考慮MQ的持久化。MQ是支持持久化的,

          // 聲明隊(duì)列
          channel.queueDeclare(Send.QUEUE_NAME, falsefalsefalsenull);
              /**
               * Declare a queue
               * @see com.rabbitmq.client.AMQP.Queue.Declare
               * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
               * @param queue the name of the queue
               * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
               * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
               * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
               * @param arguments other properties (construction arguments) for the queue
               * @return a declaration-confirm method to indicate the queue was successfully declared
               * @throws java.io.IOException if an error is encountered
               */

              Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                           Map<String, Object> arguments)
           throws IOException
          ;

          boolean durable就是表明是否可以持久化,如果我們將程序中的durable = false改為true是不可以的!因?yàn)槲覀円呀?jīng)定義過(guò)的test_work_queue,這個(gè)queue已聲明為未持久化的。結(jié)論:MQ 不允許修改一個(gè)已經(jīng)存在的隊(duì)列參數(shù)。

          7. 消費(fèi)者端手動(dòng)跟自動(dòng)確認(rèn)消息


                  // 自動(dòng)應(yīng)答
                  boolean autoAck = false;
                  channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);

          當(dāng)MQ發(fā)送數(shù)據(jù)個(gè)消費(fèi)者后,消費(fèi)者要對(duì)收到對(duì)信息應(yīng)答給MQ。

          如果autoAck = true 表示「自動(dòng)確認(rèn)模式」,一旦MQ把消息分發(fā)給消費(fèi)者就會(huì)把消息從內(nèi)存中刪除。如果消費(fèi)者收到消息但是還沒(méi)有消費(fèi)完而MQ中數(shù)據(jù)已刪除則會(huì)導(dǎo)致丟失了正在處理對(duì)消息。

          如果autoAck = false表示「手動(dòng)確認(rèn)模式」,如果有個(gè)消費(fèi)者掛了,MQ因?yàn)闆](méi)有收到回執(zhí)信息可以把該信息再發(fā)送給其他對(duì)消費(fèi)者。

          MQ支持消息應(yīng)答(Message acknowledgement),消費(fèi)者發(fā)送一個(gè)消息應(yīng)答告訴MQ這個(gè)消息已經(jīng)被消費(fèi)了,MQ才從內(nèi)存中刪除。消息應(yīng)答模式「默認(rèn)為 false」

          8. RabbitMQ生產(chǎn)者端消息確認(rèn)機(jī)制(事務(wù) + confirm)

          在RabbitMQ中我們可以通過(guò)持久化來(lái)解決MQ服務(wù)器異常的數(shù)據(jù)丟失問(wèn)題,但是「生產(chǎn)者如何確保數(shù)據(jù)發(fā)送到MQ了」?默認(rèn)情況下生產(chǎn)者也是不知道的。如何解決 呢?

          1. AMQP事務(wù)

          第一種方式AMQP實(shí)現(xiàn)了事務(wù)機(jī)制,類(lèi)似mysql的事務(wù)機(jī)制。txSelect:用戶(hù)將當(dāng)前channel設(shè)置為transition模式。txCommit:用于提交事務(wù)。txRollback:用于回滾事務(wù)。

          以上都是對(duì)生產(chǎn)者對(duì)操作。

          package com.sowhat.mq.tx;

          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.Connection;
          import com.sowhat.mq.util.ConnectionUtils;

          import java.io.IOException;
          import java.util.concurrent.TimeoutException;

          public class TxSend {
              public static final String QUEUE_NAME = "test_queue_tx";

              public static void main(String[] args) throws IOException, TimeoutException {
                  Connection connection = ConnectionUtils.getConnection();
                  Channel channel = connection.createChannel();
                  channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);

                  String msg = "hello tx message";

                  try {
                      //開(kāi)啟事務(wù)模式
                      channel.txSelect();
                      channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
                      int x = 1 / 0;
                      
                      // 提交事務(wù)
                      channel.txCommit();
                  } catch (IOException e) {
                      // 回滾
                      channel.txRollback();
                      System.out.println("send message rollback");
                  } finally {
                      channel.close();
                      connection.close();
                  }
              }
          }
          ---
          package com.sowhat.mq.tx;

          import com.rabbitmq.client.*;
          import com.sowhat.mq.util.ConnectionUtils;

          import java.io.IOException;
          import java.util.concurrent.TimeoutException;

          public class TxRecv {
              public static final String QUEUE_NAME = "test_queue_tx";

              public static void main(String[] args) throws IOException, TimeoutException {
                  Connection connection = ConnectionUtils.getConnection();
                  Channel channel = connection.createChannel();

                  channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);

                  String s = channel.basicConsume(QUEUE_NAME, truenew DefaultConsumer(channel) {
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          System.out.println("recv[tx] msg:" + new String(body, "utf-8"));
                      }
                  });
                  channel.close();
                  connection.close();
              }
          }

          缺點(diǎn)就是大量對(duì)請(qǐng)求嘗試然后失敗然后回滾,會(huì)降低MQ的吞吐量。

          2. Confirm模式。

          「生產(chǎn)者端confirm實(shí)現(xiàn)原理」生產(chǎn)者將信道設(shè)置為confirm模式,一旦信道進(jìn)入了confirm模式,所以該信道上發(fā)布的信息都會(huì)被派一個(gè)唯一的ID(從1開(kāi)始),一旦消息被投遞到所有的匹配隊(duì)列后,Broker就回發(fā)送一個(gè)確認(rèn)給生產(chǎn)者(包含消息唯一ID),這就使得生產(chǎn)者知道消息已經(jīng)正確到達(dá)目的隊(duì)列了,如果消息跟隊(duì)列是可持久化的,那么確認(rèn)消息會(huì)在消息寫(xiě)入到磁盤(pán)后才發(fā)出。broker回傳給生產(chǎn)者到確認(rèn)消息中deliver-tag域包含了確認(rèn)消息到序列號(hào),此外broker也可以設(shè)置basic.ack的multiple域,表示這個(gè)序列號(hào)之前所以信息都已經(jīng)得到處理。

          Confirm模式最大的好處在于是異步的。第一條消息發(fā)送后不用一直等待回復(fù)后才發(fā)第二條消息。

          開(kāi)啟confirm模式:channel.confimSelect()編程模式:

          1. 普通的發(fā)送一個(gè)消息后就 waitForConfirms()
          package com.sowhat.confirm;

          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.Connection;
          import com.sowhat.mq.util.ConnectionUtils;

          import java.io.IOException;
          import java.util.concurrent.TimeoutException;

          public class Send1 {
              public static final String QUEUE_NAME = "test_queue_confirm1";

              public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
                  Connection connection = ConnectionUtils.getConnection();
                  Channel channel = connection.createChannel();
                  channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);

                  // 將channel模式設(shè)置為 confirm模式,注意設(shè)置這個(gè)不能設(shè)置為事務(wù)模式。
                  channel.confirmSelect();

                  String msg = "hello confirm message";
                  channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
                  if (!channel.waitForConfirms()) {
                      System.out.println("消息發(fā)送失敗");
                  } else {
                      System.out.println("消息發(fā)送OK");
                  }
                  channel.close();
                  connection.close();
              }
          }
          ---
          package com.sowhat.confirm;

          import com.rabbitmq.client.*;
          import com.sowhat.mq.util.ConnectionUtils;

          import java.io.IOException;
          import java.util.concurrent.TimeoutException;

          public class Recv {
              public static final String QUEUE_NAME = "test_queue_confirm1";

              public static void main(String[] args) throws IOException, TimeoutException {
                  Connection connection = ConnectionUtils.getConnection();
                  Channel channel = connection.createChannel();

                  channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);

                  String s = channel.basicConsume(QUEUE_NAME, truenew DefaultConsumer(channel) {
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          System.out.println("recv[tx] msg:" + new String(body, "utf-8"));
                      }
                  });
              }
          }
          2. 批量的發(fā)一批數(shù)據(jù) waitForConfirms()
          package com.sowhat.confirm;

          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.Connection;
          import com.sowhat.mq.util.ConnectionUtils;

          import java.io.IOException;
          import java.util.concurrent.TimeoutException;

          public class Send2 {
              public static final String QUEUE_NAME = "test_queue_confirm1";

              public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
                  Connection connection = ConnectionUtils.getConnection();
                  Channel channel = connection.createChannel();
                  channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);

                  // 將channel模式設(shè)置為 confirm模式,注意設(shè)置這個(gè)不能設(shè)置為事務(wù)模式。
                  channel.confirmSelect();

                  String msg = "hello confirm message";
                  // 批量發(fā)送
                  for (int i = 0; i < 10; i++) {
                      channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
                  }
                  // 確認(rèn)
                  if (!channel.waitForConfirms()) {
                      System.out.println("消息發(fā)送失敗");
                  } else {
                      System.out.println("消息發(fā)送OK");
                  }
                  channel.close();
                  connection.close();
              }
          }
          ---
          接受信息跟上面一樣
          3. 異步confirm模式,提供一個(gè)回調(diào)方法。

          Channel對(duì)象提供的ConfirmListener()回調(diào)方法只包含deliveryTag(包含當(dāng)前發(fā)出消息序號(hào)),我們需要自己為每一個(gè)Channel維護(hù)一個(gè)unconfirm的消息序號(hào)集合,每publish一條數(shù)據(jù),集合中元素加1,每回調(diào)一次handleAck方法,unconfirm集合刪掉響應(yīng)的一條(multiple=false)或多條(multiple=true)記錄,從運(yùn)行效率來(lái)看,unconfirm集合最好采用有序集合SortedSet存儲(chǔ)結(jié)構(gòu)。

          package com.sowhat.mq.confirm;

          import com.rabbitmq.client.*;
          import com.sowhat.mq.util.ConnectionUtils;

          import java.io.IOException;
          import java.util.Collections;
          import java.util.SortedSet;
          import java.util.TreeSet;
          import java.util.concurrent.TimeoutException;

          public class Send3 {
              public static final String QUEUE_NAME = "test_queue_confirm3";

              public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
                  Connection connection = ConnectionUtils.getConnection();
                  Channel channel = connection.createChannel();
                  channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);

                  //生產(chǎn)者調(diào)用confirmSelect
                  channel.confirmSelect();

                  // 存放未確認(rèn)消息
                  final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());

                 // 添加監(jiān)聽(tīng)通道
                  channel.addConfirmListener(new ConfirmListener() {
                      // 回執(zhí)有問(wèn)題的
                      public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                          if (multiple) {
                              System.out.println("--handleNack---multiple");
                              confirmSet.headSet(deliveryTag + 1).clear();
                          } else {
                              System.out.println("--handleNack-- multiple false");
                              confirmSet.remove(deliveryTag);
                          }
                      }

                      // 沒(méi)有問(wèn)題的handleAck
                      public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                          if (multiple) {
                              System.out.println("--handleAck---multiple");
                              confirmSet.headSet(deliveryTag + 1).clear();
                          } else {
                              System.out.println("--handleAck--multiple false");
                              confirmSet.remove(deliveryTag);
                          }
                      }
                  });

                  // 一般情況下是先開(kāi)啟 消費(fèi)者,指定好 exchange跟routingkey,如果生產(chǎn)者等routingkey 就會(huì)觸發(fā)這個(gè)return 方法
                  channel.addReturnListener(new ReturnListener() {
                      public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          System.out.println("---- handle return----");
                          System.out.println("replyCode:" + replyCode );
                          System.out.println("replyText:" +replyText );
                          System.out.println("exchange:" + exchange);
                          System.out.println("routingKey:" + routingKey);
                          System.out.println("properties:" + properties);
                          System.out.println("body:" + new String(body));
                      }
                  });

                  String msgStr = "sssss";
                  while(true){
                      long nextPublishSeqNo = channel.getNextPublishSeqNo();
                      channel.basicPublish("",QUEUE_NAME,null,msgStr.getBytes());
                      confirmSet.add(nextPublishSeqNo);
                      Thread.sleep(1000);
                  }
              }
          }

          總結(jié):AMQP模式相對(duì)來(lái)說(shuō)沒(méi)Confirm模式性能好些,推薦使用后者。

          9. RabbitMQ延遲隊(duì)列 跟死信

          淘寶訂單付款,驗(yàn)證碼等限時(shí)類(lèi)型服務(wù)。

                  Map<String,Object> headers =  new HashMap<String,Object>();
                  headers.put("my1","111");
                  headers.put("my2","222");
                  AMQP.BasicProperties build = new AMQP.BasicProperties().builder().deliveryMode(2).contentEncoding("utf-8").expiration("10000").headers(headers).build();

          死信的處理:

          10. SpringBoot Tpoic Demo

          需求圖:新建SpringBoot 項(xiàng)目添加如下依賴(lài):

                 <dependency>
                      <groupId>org.springframework.boot</groupId>
                      <artifactId>spring-boot-starter-amqp</artifactId>
                  </dependency>
          1. 生產(chǎn)者

          application.yml

          spring:
            rabbitmq:
              host: 127.0.0.1
              username: admin
              password: admin

          測(cè)試用例:

          package com.sowhat.mqpublisher;

          import org.junit.jupiter.api.Test;
          import org.springframework.amqp.core.AmqpTemplate;
          import org.springframework.beans.factory.annotation.Autowired;
          import org.springframework.boot.test.context.SpringBootTest;

          @SpringBootTest
          class MqpublisherApplicationTests {
              @Autowired
              private AmqpTemplate amqpTemplate;

              @Test
              void userInfo() {
                  /**
                   * exchange,routingKey,message
                   */

                  this.amqpTemplate.convertAndSend("log.topic","user.log.error","Users...");
              }
          }
          2. 消費(fèi)者

          application.xml

          spring:
            rabbitmq:
              host: 127.0.0.1
              username: admin
              password: admin

          # 自定義配置
          mq:
            config:
              exchange_name: log.topic
              # 配置隊(duì)列名稱(chēng)
              queue_name:
                  info: log.info
                  error: log.error
                  logs: log.logs

          三個(gè)不同的消費(fèi)者:

          package com.sowhat.mqconsumer.service;

          import org.springframework.amqp.core.ExchangeTypes;
          import org.springframework.amqp.rabbit.annotation.Exchange;
          import org.springframework.amqp.rabbit.annotation.Queue;
          import org.springframework.amqp.rabbit.annotation.QueueBinding;
          import org.springframework.amqp.rabbit.annotation.RabbitListener;
          import org.springframework.stereotype.Service;

          /**
           * @QueueBinding value屬性:用于綁定一個(gè)隊(duì)列。@Queue去查找一個(gè)名字為value屬性中的值得隊(duì)列,如果沒(méi)有則創(chuàng)建,如果有則返回
           * type = ExchangeTypes.TOPIC 指定交換器類(lèi)型。默認(rèn)的direct交換器
           */

          @Service
          public class ErrorReceiverService {

              /**
                * 把一個(gè)方法跟一個(gè)隊(duì)列進(jìn)行綁定,收到消息后綁定給msg
               */

              @RabbitListener(bindings = @QueueBinding(
                      value = @Queue(value = "${mq.config.queue_name.error}"),
                      exchange = @Exchange(value = "${mq.config.exchange_name}", type = ExchangeTypes.TOPIC),
                      key = "*.log.error"
                  )
              )
              public void process(String msg) {
                  System.out.println(msg + " Logs...........");
              }
          }
          ---
          package com.sowhat.mqconsumer.service;

          import org.springframework.amqp.core.ExchangeTypes;
          import org.springframework.amqp.rabbit.annotation.Exchange;
          import org.springframework.amqp.rabbit.annotation.Queue;
          import org.springframework.amqp.rabbit.annotation.QueueBinding;
          import org.springframework.amqp.rabbit.annotation.RabbitListener;
          import org.springframework.stereotype.Service;

          /**
           * @QueueBinding value屬性:用于綁定一個(gè)隊(duì)列。
           * @Queue去查找一個(gè)名字為value屬性中的值得隊(duì)列,如果沒(méi)有則創(chuàng)建,如果有則返回
           */

          @Service
          public class InfoReceiverService {

              /**
               * 添加一個(gè)能夠處理消息的方法
               */

              @RabbitListener(bindings = @QueueBinding(
                      value = @Queue(value ="${mq.config.queue_name.info}"),
                      exchange = @Exchange(value = "${mq.config.exchange_name}",type = ExchangeTypes.TOPIC),
                      key = "*.log.info"
              ))
              public void process(String msg){
                  System.out.println(msg+" Info...........");

              }
          }
          --
          package com.sowhat.mqconsumer.service;

          import org.springframework.amqp.core.ExchangeTypes;
          import org.springframework.amqp.rabbit.annotation.Exchange;
          import org.springframework.amqp.rabbit.annotation.Queue;
          import org.springframework.amqp.rabbit.annotation.QueueBinding;
          import org.springframework.amqp.rabbit.annotation.RabbitListener;
          import org.springframework.stereotype.Service;

          /**
           * @QueueBinding value屬性:用于綁定一個(gè)隊(duì)列。
           * @Queue去查找一個(gè)名字為value屬性中的值得隊(duì)列,如果沒(méi)有則創(chuàng)建,如果有則返回
           */

          @Service
          public class LogsReceiverService {

              /**
               * 添加一個(gè)能夠處理消息的方法
               */

              @RabbitListener(bindings = @QueueBinding(
                      value = @Queue(value ="${mq.config.queue_name.logs}"),
                      exchange = @Exchange(value = "${mq.config.exchange_name}",type = ExchangeTypes.TOPIC),
                      key = "*.log.*"
              ))
              public void process(String msg){
                  System.out.println(msg+" Error...........");
              }
          }

          詳細(xì)安裝跟代碼看參考下載:

          總結(jié)

          如果需要指定模式一般是在消費(fèi)者端設(shè)置,靈活性調(diào)節(jié)。

          模式生產(chǎn)者Queue生產(chǎn)者exchange生產(chǎn)者routingKey消費(fèi)者exchange消費(fèi)者queueroutingKey
          Simple(簡(jiǎn)單模式少用)指定不指定不指定不指定指定不指定
          WorkQueue(多個(gè)消費(fèi)者少用)指定不指定不指定不指定指定不指定
          fanout(publish/subscribe模式)不指定指定不指定指定指定不指定
          direct(路由模式)不指定指定指定指定指定消費(fèi)者routingKey精確指定多個(gè)
          topic(主題模糊匹配)不指定指定指定指定指定消費(fèi)者routingKey可以進(jìn)行模糊匹配


          -End-


          最近有一些小伙伴,讓我?guī)兔φ乙恍?nbsp;面試題 資料,于是我翻遍了收藏的 5T 資料后,匯總整理出來(lái),可以說(shuō)是程序員面試必備!所有資料都整理到網(wǎng)盤(pán)了,歡迎下載!


          點(diǎn)擊??卡片,關(guān)注后回復(fù)【面試題】即可獲取

          在看點(diǎn)這里好文分享給更多人↓↓

          瀏覽 18
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  美女超碰在线 | 青青草男人天堂 | 狠狠操狠狠爱五月婷婷 | 激情综合激情网 | 丝袜网站 |