速入! 3W字帶你迅速上手MQ
?高清思維導(dǎo)圖已同步Git:https://github.com/SoWhat1412/xmindfile,關(guān)注公眾號(hào)sowhat1412獲取海量資源
?

1. 消息隊(duì)列解決了什么問(wèn)題
消息中間件是目前比較流行的一個(gè)中間件,其中RabbitMQ更是占有一定的市場(chǎng)份額,主要用來(lái)做異步處理、應(yīng)用解耦、流量削峰、日志處理等等方面。
1. 異步處理
一個(gè)用戶(hù)登陸網(wǎng)址注冊(cè),然后系統(tǒng)發(fā)短信跟郵件告知注冊(cè)成功,一般有三種解決方法。
串行到依次執(zhí)行,問(wèn)題是用戶(hù)注冊(cè)后就可以使用了,沒(méi)必要等驗(yàn)證碼跟郵件。 注冊(cè)成功后,郵件跟驗(yàn)證碼用并行等方式執(zhí)行,問(wèn)題是郵件跟驗(yàn)證碼是非重要的任務(wù),系統(tǒng)注冊(cè)還要等這倆完成么? 基于異步MQ的處理,用戶(hù)注冊(cè)成功后直接把信息異步發(fā)送到MQ中,然后郵件系統(tǒng)跟驗(yàn)證碼系統(tǒng)主動(dòng)去拉取數(shù)據(jù)。 
2. 應(yīng)用解耦
比如我們有一個(gè)訂單系統(tǒng),還要一個(gè)庫(kù)存系統(tǒng),用戶(hù)下訂單了就要調(diào)用下庫(kù)存系統(tǒng)來(lái)處理,直接調(diào)用到話(huà)庫(kù)存系統(tǒng)出現(xiàn)問(wèn)題咋辦呢?
3. 流量削峰
舉辦一個(gè) 秒殺活動(dòng),如何較好到設(shè)計(jì)?服務(wù)層直接接受瞬間搞密度訪問(wèn)絕對(duì)不可以起碼要加入一個(gè)MQ。
4. 日志處理
用戶(hù)通過(guò)WebUI訪問(wèn)發(fā)送請(qǐng)求到時(shí)候后端如何接受跟處理呢一般?
2. RabbitMQ 安裝跟配置
官網(wǎng):https://www.rabbitmq.com/download.html
開(kāi)發(fā)語(yǔ)言:https://www.erlang.org/
正式到安裝跟允許需要Erlang跟RabbitMQ倆版本之間相互兼容!我這里圖省事直接用Docker 拉取鏡像了。下載:
開(kāi)啟:管理頁(yè)面 默認(rèn)賬號(hào):guest 默認(rèn)密碼:guest 。Docker啟動(dòng)時(shí)候可以指定賬號(hào)密碼對(duì)外端口以及
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=admin-e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672-p 61613:61613 -p 1883:1883 rabbitmq:management
啟動(dòng):
用戶(hù)添加:
vitrual hosts 相當(dāng)于mysql中的DB。創(chuàng)建一個(gè)virtual hosts,一般以/ 開(kāi)頭。
對(duì)用戶(hù)進(jìn)行授權(quán),點(diǎn)擊/vhost_mmr,
至于WebUI多點(diǎn)點(diǎn)即可了解。
3. 實(shí)戰(zhàn)
RabbitMQ 官網(wǎng)支持任務(wù)模式:https://www.rabbitmq.com/getstarted.htm
l創(chuàng)建Maven項(xiàng)目導(dǎo)入必要依賴(lài):
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.10</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
</dependency>
</dependencies>
0. 獲取MQ連接
package com.sowhat.mq.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConnectionUtils {
/**
* 連接器
* @return
* @throws IOException
* @throws TimeoutException
*/
public static Connection getConnection() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/vhost_mmr");
factory.setUsername("user_mmr");
factory.setPassword("sowhat");
Connection connection = factory.newConnection();
return connection;
}
}
1. 簡(jiǎn)單隊(duì)列
P:Producer 消息的生產(chǎn)者
中間:Queue消息隊(duì)列
C:Consumer 消息的消費(fèi)者
package com.sowhat.mq.simple;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
public static final String QUEUE_NAME = "test_simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 獲取一個(gè)連接
Connection connection = ConnectionUtils.getConnection();
// 從連接獲取一個(gè)通道
Channel channel = connection.createChannel();
// 創(chuàng)建隊(duì)列聲明
AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "hello Simple";
// exchange,隊(duì)列,參數(shù),消息字節(jié)體
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("--send msg:" + msg);
channel.close();
connection.close();
}
}
---
package com.sowhat.mq.simple;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消費(fèi)者獲取消息
*/
public class Recv {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
newApi();
oldApi();
}
private static void newApi() throws IOException, TimeoutException {
// 創(chuàng)建連接
Connection connection = ConnectionUtils.getConnection();
// 創(chuàng)建頻道
Channel channel = connection.createChannel();
// 隊(duì)列聲明 隊(duì)列名,是否持久化,是否獨(dú)占模式,無(wú)消息后是否自動(dòng)刪除,消息攜帶參數(shù)
channel.queueDeclare(Send.QUEUE_NAME,false,false,false,null);
// 定義消費(fèi)者
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override // 事件模型,消息來(lái)了會(huì)觸發(fā)該函數(shù)
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("---new api recv:" + s);
}
};
// 監(jiān)聽(tīng)隊(duì)列
channel.basicConsume(Send.QUEUE_NAME,true,defaultConsumer);
}
// 老方法 消費(fèi)者 MQ 在3。4以下 用次方法,
private static void oldApi() throws IOException, TimeoutException, InterruptedException {
// 創(chuàng)建連接
Connection connection = ConnectionUtils.getConnection();
// 創(chuàng)建頻道
Channel channel = connection.createChannel();
// 定義隊(duì)列消費(fèi)者
QueueingConsumer consumer = new QueueingConsumer(channel);
//監(jiān)聽(tīng)隊(duì)列
channel.basicConsume(Send.QUEUE_NAME, true, consumer);
while (true) {
// 發(fā)貨體
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
byte[] body = delivery.getBody();
String s = new String(body);
System.out.println("---Recv:" + s);
}
}
}
右上角有可以設(shè)置頁(yè)面刷新頻率,然后可以在UI界面直接手動(dòng)消費(fèi)掉,如下圖:
簡(jiǎn)單隊(duì)列的不足:耦合性過(guò)高,生產(chǎn)者一一對(duì)應(yīng)消費(fèi)者,如果有多個(gè)消費(fèi)者想消費(fèi)隊(duì)列中信息就無(wú)法實(shí)現(xiàn)了。
2. WorkQueue 工作隊(duì)列
Simple隊(duì)列中只能一一對(duì)應(yīng)的生產(chǎn)消費(fèi),實(shí)際開(kāi)發(fā)中生產(chǎn)者發(fā)消息很簡(jiǎn)單,而消費(fèi)者要跟業(yè)務(wù)結(jié)合,消費(fèi)者接受到消息后要處理從而會(huì)耗時(shí)。「可能會(huì)出現(xiàn)隊(duì)列中出現(xiàn)消息積壓」。所以如果多個(gè)消費(fèi)者可以加速消費(fèi)。
1. round robin 輪詢(xún)分發(fā)
代碼編程一個(gè)生產(chǎn)者兩個(gè)消費(fèi)者:
package com.sowhat.mq.work;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
public static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 獲取連接
Connection connection = ConnectionUtils.getConnection();
// 獲取 channel
Channel channel = connection.createChannel();
// 聲明隊(duì)列
AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i <50 ; i++) {
String msg = "hello-" + i;
System.out.println("WQ send " + msg);
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
Thread.sleep(i*20);
}
channel.close();
connection.close();
}
}
---
package com.sowhat.mq.work;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 獲取連接
Connection connection = ConnectionUtils.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 聲明隊(duì)列
channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);
//定義消費(fèi)者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件觸發(fā)機(jī)制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【1】:" + s);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【1】 done");
}
}
};
boolean autoAck = true;
channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);
}
}
---
package com.sowhat.mq.work;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 獲取連接
Connection connection = ConnectionUtils.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 聲明隊(duì)列
channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);
//定義消費(fèi)者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件觸發(fā)機(jī)制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【2】:" + s);
try {
Thread.sleep(1000 );
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【2】 done");
}
}
};
boolean autoAck = true;
channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);
}
}
現(xiàn)象:消費(fèi)者1 跟消費(fèi)者2 處理的數(shù)據(jù)量完全一樣的個(gè)數(shù):消費(fèi)者1:處理偶數(shù)
消費(fèi)者2:處理奇數(shù)
這種方式叫輪詢(xún)分發(fā)(round-robin)結(jié)果就是不管兩個(gè)消費(fèi)者誰(shuí)忙,「數(shù)據(jù)總是你一個(gè)我一個(gè)」,MQ 給兩個(gè)消費(fèi)發(fā)數(shù)據(jù)的時(shí)候是不知道消費(fèi)者性能的,默認(rèn)就是雨露均沾。此時(shí) autoAck = true。
2. 公平分發(fā) fair dipatch
如果要實(shí)現(xiàn)公平分發(fā),要讓消費(fèi)者消費(fèi)完畢一條數(shù)據(jù)后就告知MQ,再讓MQ發(fā)數(shù)據(jù)即可。自動(dòng)應(yīng)答要關(guān)閉!
package com.sowhat.mq.work;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
public static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 獲取連接
Connection connection = ConnectionUtils.getConnection();
// 獲取 channel
Channel channel = connection.createChannel();
// s聲明隊(duì)列
AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 每個(gè)消費(fèi)者發(fā)送確認(rèn)消息之前,消息隊(duì)列不發(fā)送下一個(gè)消息到消費(fèi)者,一次只發(fā)送一個(gè)消息
// 從而限制一次性發(fā)送給消費(fèi)者到消息不得超過(guò)1個(gè)。
int perfetchCount = 1;
channel.basicQos(perfetchCount);
for (int i = 0; i <50 ; i++) {
String msg = "hello-" + i;
System.out.println("WQ send " + msg);
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
Thread.sleep(i*20);
}
channel.close();
connection.close();
}
}
---
package com.sowhat.mq.work;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 獲取連接
Connection connection = ConnectionUtils.getConnection();
// 獲取通道
final Channel channel = connection.createChannel();
// 聲明隊(duì)列
channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);
// 保證一次只分發(fā)一個(gè)
channel.basicQos(1);
//定義消費(fèi)者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件觸發(fā)機(jī)制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【1】:" + s);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【1】 done");
// 手動(dòng)回執(zhí)
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
// 自動(dòng)應(yīng)答
boolean autoAck = false;
channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);
}
}
---
package com.sowhat.mq.work;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 獲取連接
Connection connection = ConnectionUtils.getConnection();
// 獲取通道
final Channel channel = connection.createChannel();
// 聲明隊(duì)列
channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);
// 保證一次只分發(fā)一個(gè)
channel.basicQos(1);
//定義消費(fèi)者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件觸發(fā)機(jī)制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【2】:" + s);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【2】 done");
// 手動(dòng)回執(zhí)
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
// 自動(dòng)應(yīng)答
boolean autoAck = false;
channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);
}
}
結(jié)果:實(shí)現(xiàn)了公平分發(fā),消費(fèi)者2 是消費(fèi)者1消費(fèi)數(shù)量的2倍。
3. publish/subscribe 發(fā)布訂閱模式
類(lèi)似公眾號(hào)的訂閱跟發(fā)布,無(wú)需指定routingKey:
解讀:
一個(gè)生產(chǎn)者多個(gè)消費(fèi)者 每一個(gè)消費(fèi)者都有一個(gè)自己的隊(duì)列 生產(chǎn)者沒(méi)有把消息直接發(fā)送到隊(duì)列而是發(fā)送到了 交換機(jī)轉(zhuǎn)化器(exchange)。每一個(gè)隊(duì)列都要綁定到交換機(jī)上。 生產(chǎn)者發(fā)送的消息經(jīng)過(guò)交換機(jī)到達(dá)隊(duì)列,從而實(shí)現(xiàn)一個(gè)消息被多個(gè)消費(fèi)者消費(fèi)。
生產(chǎn)者:
package com.sowhat.mq.ps;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
public static final String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//聲明交換機(jī)
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");// 分發(fā)= fanout
// 發(fā)送消息
String msg = "hello ps ";
channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
System.out.println("Send:" + msg);
channel.close();
connection.close();
}
}

消息哪兒去了?丟失了,在RabbitMQ中只有隊(duì)列有存儲(chǔ)能力,「因?yàn)檫@個(gè)時(shí)候隊(duì)列還沒(méi)有綁定到交換機(jī) 所以消息丟失了」。消費(fèi)者:
package com.sowhat.mq.ps;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv1 {
public static final String QUEUE_NAME = "test_queue_fanout_email";
public static final String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 隊(duì)列聲明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 綁定隊(duì)列到交換機(jī)轉(zhuǎn)發(fā)器
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"" );
// 保證一次只分發(fā)一個(gè)
channel.basicQos(1);
//定義消費(fèi)者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件觸發(fā)機(jī)制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【1】:" + s);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【1】 done");
// 手動(dòng)回執(zhí)
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
// 自動(dòng)應(yīng)答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
---
package com.sowhat.mq.ps;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv2 {
public static final String QUEUE_NAME = "test_queue_fanout_sms";
public static final String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 隊(duì)列聲明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 綁定隊(duì)列到交換機(jī)轉(zhuǎn)發(fā)器
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"" );
// 保證一次只分發(fā)一個(gè)
channel.basicQos(1);
//定義消費(fèi)者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件觸發(fā)機(jī)制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【2】:" + s);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【2】 done");
// 手動(dòng)回執(zhí)
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
// 自動(dòng)應(yīng)答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
「同時(shí)還可以自己手動(dòng)的添加一個(gè)隊(duì)列監(jiān)控到該exchange」
4. routing 路由選擇 通配符模式
Exchange(交換機(jī),轉(zhuǎn)發(fā)器):「一方面接受生產(chǎn)者消息,另一方面是向隊(duì)列推送消息」。匿名轉(zhuǎn)發(fā)用 "" 表示,比如前面到簡(jiǎn)單隊(duì)列跟WorkQueue。fanout:不處理路由鍵。「不需要指定routingKey」,我們只需要把隊(duì)列綁定到交換機(jī), 「消息就會(huì)被發(fā)送到所有到隊(duì)列中」。direct:處理路由鍵,「需要指定routingKey」,此時(shí)生產(chǎn)者發(fā)送數(shù)據(jù)到時(shí)候會(huì)指定key,任務(wù)隊(duì)列也會(huì)指定key,只有key一樣消息才會(huì)被傳送到隊(duì)列中。如下圖
package com.sowhat.mq.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
public static final String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// exchange
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
String msg = "hello info!";
// 可以指定類(lèi)型
String routingKey = "info";
channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());
System.out.println("Send : " + msg);
channel.close();
connection.close();
}
}
---
package com.sowhat.mq.routing;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv1 {
public static final String EXCHANGE_NAME = "test_exchange_direct";
public static final String QUEUE_NAME = "test_queue_direct_1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.basicQos(1);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
//定義消費(fèi)者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件觸發(fā)機(jī)制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【1】:" + s);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【1】 done");
// 手動(dòng)回執(zhí)
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
// 自動(dòng)應(yīng)答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
---
package com.sowhat.mq.routing;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv2 {
public static final String EXCHANGE_NAME = "test_exchange_direct";
public static final String QUEUE_NAME = "test_queue_direct_2";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
// 綁定種類(lèi)似 Key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
//定義消費(fèi)者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件觸發(fā)機(jī)制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【2】:" + s);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【2】 done");
// 手動(dòng)回執(zhí)
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 自動(dòng)應(yīng)答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
WebUI:
缺點(diǎn):路由key必須要明確,無(wú)法實(shí)現(xiàn)規(guī)則性模糊匹配。
5. Topics 主題
將路由鍵跟某個(gè)模式匹配,# 表示匹配 >=1個(gè)字符, *表示匹配一個(gè)。生產(chǎn)者會(huì)帶routingKey,但是消費(fèi)者的MQ會(huì)帶模糊routingKey。
商品:發(fā)布、刪除、修改、查詢(xún)。
package com.sowhat.mq.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
public static final String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String msg = "商品!";
// 可以指定類(lèi)型
String routingKey = "goods.find";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
System.out.println("Send : " + msg);
channel.close();
connection.close();
}
}
---
package com.sowhat.mq.topic;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv1 {
public static final String EXCHANGE_NAME = "test_exchange_topic";
public static final String QUEUE_NAME = "test_queue_topic_1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.basicQos(1);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.add");
//定義消費(fèi)者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件觸發(fā)機(jī)制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【1】:" + s);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【1】 done");
// 手動(dòng)回執(zhí)
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
// 自動(dòng)應(yīng)答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
---
package com.sowhat.mq.topic;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv2 {
public static final String EXCHANGE_NAME = "test_exchange_topic";
public static final String QUEUE_NAME = "test_queue_topic_2";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
// 此乃重點(diǎn)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");
//定義消費(fèi)者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件觸發(fā)機(jī)制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【2】:" + s);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【2】 done");
// 手動(dòng)回執(zhí)
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 自動(dòng)應(yīng)答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
6. MQ的持久化跟非持久化
因?yàn)橄⒃趦?nèi)存中,如果MQ掛了那么消息也丟失了,所以應(yīng)該考慮MQ的持久化。MQ是支持持久化的,
// 聲明隊(duì)列
channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);
/**
* Declare a queue
* @see com.rabbitmq.client.AMQP.Queue.Declare
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
* @param queue the name of the queue
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* @param arguments other properties (construction arguments) for the queue
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
boolean durable就是表明是否可以持久化,如果我們將程序中的durable = false改為true是不可以的!因?yàn)槲覀円呀?jīng)定義過(guò)的test_work_queue,這個(gè)queue已聲明為未持久化的。結(jié)論:MQ 不允許修改一個(gè)已經(jīng)存在的隊(duì)列參數(shù)。
7. 消費(fèi)者端手動(dòng)跟自動(dòng)確認(rèn)消息

// 自動(dòng)應(yīng)答
boolean autoAck = false;
channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);
當(dāng)MQ發(fā)送數(shù)據(jù)個(gè)消費(fèi)者后,消費(fèi)者要對(duì)收到對(duì)信息應(yīng)答給MQ。
如果autoAck = true 表示「自動(dòng)確認(rèn)模式」,一旦MQ把消息分發(fā)給消費(fèi)者就會(huì)把消息從內(nèi)存中刪除。如果消費(fèi)者收到消息但是還沒(méi)有消費(fèi)完而MQ中數(shù)據(jù)已刪除則會(huì)導(dǎo)致丟失了正在處理對(duì)消息。
如果autoAck = false表示「手動(dòng)確認(rèn)模式」,如果有個(gè)消費(fèi)者掛了,MQ因?yàn)闆](méi)有收到回執(zhí)信息可以把該信息再發(fā)送給其他對(duì)消費(fèi)者。
MQ支持消息應(yīng)答(Message acknowledgement),消費(fèi)者發(fā)送一個(gè)消息應(yīng)答告訴MQ這個(gè)消息已經(jīng)被消費(fèi)了,MQ才從內(nèi)存中刪除。消息應(yīng)答模式「默認(rèn)為 false」。
8. RabbitMQ生產(chǎn)者端消息確認(rèn)機(jī)制(事務(wù) + confirm)
在RabbitMQ中我們可以通過(guò)持久化來(lái)解決MQ服務(wù)器異常的數(shù)據(jù)丟失問(wèn)題,但是「生產(chǎn)者如何確保數(shù)據(jù)發(fā)送到MQ了」?默認(rèn)情況下生產(chǎn)者也是不知道的。如何解決 呢?
1. AMQP事務(wù)
第一種方式AMQP實(shí)現(xiàn)了事務(wù)機(jī)制,類(lèi)似mysql的事務(wù)機(jī)制。txSelect:用戶(hù)將當(dāng)前channel設(shè)置為transition模式。txCommit:用于提交事務(wù)。txRollback:用于回滾事務(wù)。
以上都是對(duì)生產(chǎn)者對(duì)操作。
package com.sowhat.mq.tx;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TxSend {
public static final String QUEUE_NAME = "test_queue_tx";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "hello tx message";
try {
//開(kāi)啟事務(wù)模式
channel.txSelect();
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
int x = 1 / 0;
// 提交事務(wù)
channel.txCommit();
} catch (IOException e) {
// 回滾
channel.txRollback();
System.out.println("send message rollback");
} finally {
channel.close();
connection.close();
}
}
}
---
package com.sowhat.mq.tx;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TxRecv {
public static final String QUEUE_NAME = "test_queue_tx";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String s = channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("recv[tx] msg:" + new String(body, "utf-8"));
}
});
channel.close();
connection.close();
}
}
缺點(diǎn)就是大量對(duì)請(qǐng)求嘗試然后失敗然后回滾,會(huì)降低MQ的吞吐量。
2. Confirm模式。
「生產(chǎn)者端confirm實(shí)現(xiàn)原理」生產(chǎn)者將信道設(shè)置為confirm模式,一旦信道進(jìn)入了confirm模式,所以該信道上發(fā)布的信息都會(huì)被派一個(gè)唯一的ID(從1開(kāi)始),一旦消息被投遞到所有的匹配隊(duì)列后,Broker就回發(fā)送一個(gè)確認(rèn)給生產(chǎn)者(包含消息唯一ID),這就使得生產(chǎn)者知道消息已經(jīng)正確到達(dá)目的隊(duì)列了,如果消息跟隊(duì)列是可持久化的,那么確認(rèn)消息會(huì)在消息寫(xiě)入到磁盤(pán)后才發(fā)出。broker回傳給生產(chǎn)者到確認(rèn)消息中deliver-tag域包含了確認(rèn)消息到序列號(hào),此外broker也可以設(shè)置basic.ack的multiple域,表示這個(gè)序列號(hào)之前所以信息都已經(jīng)得到處理。
Confirm模式最大的好處在于是異步的。第一條消息發(fā)送后不用一直等待回復(fù)后才發(fā)第二條消息。
開(kāi)啟confirm模式:channel.confimSelect()編程模式:
1. 普通的發(fā)送一個(gè)消息后就 waitForConfirms()
package com.sowhat.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send1 {
public static final String QUEUE_NAME = "test_queue_confirm1";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 將channel模式設(shè)置為 confirm模式,注意設(shè)置這個(gè)不能設(shè)置為事務(wù)模式。
channel.confirmSelect();
String msg = "hello confirm message";
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
if (!channel.waitForConfirms()) {
System.out.println("消息發(fā)送失敗");
} else {
System.out.println("消息發(fā)送OK");
}
channel.close();
connection.close();
}
}
---
package com.sowhat.confirm;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv {
public static final String QUEUE_NAME = "test_queue_confirm1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String s = channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("recv[tx] msg:" + new String(body, "utf-8"));
}
});
}
}
2. 批量的發(fā)一批數(shù)據(jù) waitForConfirms()
package com.sowhat.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send2 {
public static final String QUEUE_NAME = "test_queue_confirm1";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 將channel模式設(shè)置為 confirm模式,注意設(shè)置這個(gè)不能設(shè)置為事務(wù)模式。
channel.confirmSelect();
String msg = "hello confirm message";
// 批量發(fā)送
for (int i = 0; i < 10; i++) {
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
}
// 確認(rèn)
if (!channel.waitForConfirms()) {
System.out.println("消息發(fā)送失敗");
} else {
System.out.println("消息發(fā)送OK");
}
channel.close();
connection.close();
}
}
---
接受信息跟上面一樣
3. 異步confirm模式,提供一個(gè)回調(diào)方法。
Channel對(duì)象提供的ConfirmListener()回調(diào)方法只包含deliveryTag(包含當(dāng)前發(fā)出消息序號(hào)),我們需要自己為每一個(gè)Channel維護(hù)一個(gè)unconfirm的消息序號(hào)集合,每publish一條數(shù)據(jù),集合中元素加1,每回調(diào)一次handleAck方法,unconfirm集合刪掉響應(yīng)的一條(multiple=false)或多條(multiple=true)記錄,從運(yùn)行效率來(lái)看,unconfirm集合最好采用有序集合SortedSet存儲(chǔ)結(jié)構(gòu)。

package com.sowhat.mq.confirm;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;
public class Send3 {
public static final String QUEUE_NAME = "test_queue_confirm3";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//生產(chǎn)者調(diào)用confirmSelect
channel.confirmSelect();
// 存放未確認(rèn)消息
final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
// 添加監(jiān)聽(tīng)通道
channel.addConfirmListener(new ConfirmListener() {
// 回執(zhí)有問(wèn)題的
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
System.out.println("--handleNack---multiple");
confirmSet.headSet(deliveryTag + 1).clear();
} else {
System.out.println("--handleNack-- multiple false");
confirmSet.remove(deliveryTag);
}
}
// 沒(méi)有問(wèn)題的handleAck
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
System.out.println("--handleAck---multiple");
confirmSet.headSet(deliveryTag + 1).clear();
} else {
System.out.println("--handleAck--multiple false");
confirmSet.remove(deliveryTag);
}
}
});
// 一般情況下是先開(kāi)啟 消費(fèi)者,指定好 exchange跟routingkey,如果生產(chǎn)者等routingkey 就會(huì)觸發(fā)這個(gè)return 方法
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("---- handle return----");
System.out.println("replyCode:" + replyCode );
System.out.println("replyText:" +replyText );
System.out.println("exchange:" + exchange);
System.out.println("routingKey:" + routingKey);
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
}
});
String msgStr = "sssss";
while(true){
long nextPublishSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish("",QUEUE_NAME,null,msgStr.getBytes());
confirmSet.add(nextPublishSeqNo);
Thread.sleep(1000);
}
}
}
總結(jié):AMQP模式相對(duì)來(lái)說(shuō)沒(méi)Confirm模式性能好些,推薦使用后者。
9. RabbitMQ延遲隊(duì)列 跟死信
淘寶訂單付款,驗(yàn)證碼等限時(shí)類(lèi)型服務(wù)。
Map<String,Object> headers = new HashMap<String,Object>();
headers.put("my1","111");
headers.put("my2","222");
AMQP.BasicProperties build = new AMQP.BasicProperties().builder().deliveryMode(2).contentEncoding("utf-8").expiration("10000").headers(headers).build();
死信的處理:
10. SpringBoot Tpoic Demo
需求圖:
新建SpringBoot 項(xiàng)目添加如下依賴(lài):
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1. 生產(chǎn)者
application.yml
spring:
rabbitmq:
host: 127.0.0.1
username: admin
password: admin
測(cè)試用例:
package com.sowhat.mqpublisher;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class MqpublisherApplicationTests {
@Autowired
private AmqpTemplate amqpTemplate;
@Test
void userInfo() {
/**
* exchange,routingKey,message
*/
this.amqpTemplate.convertAndSend("log.topic","user.log.error","Users...");
}
}
2. 消費(fèi)者
application.xml
spring:
rabbitmq:
host: 127.0.0.1
username: admin
password: admin
# 自定義配置
mq:
config:
exchange_name: log.topic
# 配置隊(duì)列名稱(chēng)
queue_name:
info: log.info
error: log.error
logs: log.logs
三個(gè)不同的消費(fèi)者:
package com.sowhat.mqconsumer.service;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
* @QueueBinding value屬性:用于綁定一個(gè)隊(duì)列。@Queue去查找一個(gè)名字為value屬性中的值得隊(duì)列,如果沒(méi)有則創(chuàng)建,如果有則返回
* type = ExchangeTypes.TOPIC 指定交換器類(lèi)型。默認(rèn)的direct交換器
*/
@Service
public class ErrorReceiverService {
/**
* 把一個(gè)方法跟一個(gè)隊(duì)列進(jìn)行綁定,收到消息后綁定給msg
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue_name.error}"),
exchange = @Exchange(value = "${mq.config.exchange_name}", type = ExchangeTypes.TOPIC),
key = "*.log.error"
)
)
public void process(String msg) {
System.out.println(msg + " Logs...........");
}
}
---
package com.sowhat.mqconsumer.service;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
* @QueueBinding value屬性:用于綁定一個(gè)隊(duì)列。
* @Queue去查找一個(gè)名字為value屬性中的值得隊(duì)列,如果沒(méi)有則創(chuàng)建,如果有則返回
*/
@Service
public class InfoReceiverService {
/**
* 添加一個(gè)能夠處理消息的方法
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value ="${mq.config.queue_name.info}"),
exchange = @Exchange(value = "${mq.config.exchange_name}",type = ExchangeTypes.TOPIC),
key = "*.log.info"
))
public void process(String msg){
System.out.println(msg+" Info...........");
}
}
--
package com.sowhat.mqconsumer.service;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
* @QueueBinding value屬性:用于綁定一個(gè)隊(duì)列。
* @Queue去查找一個(gè)名字為value屬性中的值得隊(duì)列,如果沒(méi)有則創(chuàng)建,如果有則返回
*/
@Service
public class LogsReceiverService {
/**
* 添加一個(gè)能夠處理消息的方法
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value ="${mq.config.queue_name.logs}"),
exchange = @Exchange(value = "${mq.config.exchange_name}",type = ExchangeTypes.TOPIC),
key = "*.log.*"
))
public void process(String msg){
System.out.println(msg+" Error...........");
}
}
詳細(xì)安裝跟代碼看參考下載:
總結(jié)
如果需要指定模式一般是在消費(fèi)者端設(shè)置,靈活性調(diào)節(jié)。
| 模式 | 生產(chǎn)者Queue | 生產(chǎn)者exchange | 生產(chǎn)者routingKey | 消費(fèi)者exchange | 消費(fèi)者queue | routingKey |
|---|---|---|---|---|---|---|
| Simple(簡(jiǎn)單模式少用) | 指定 | 不指定 | 不指定 | 不指定 | 指定 | 不指定 |
| WorkQueue(多個(gè)消費(fèi)者少用) | 指定 | 不指定 | 不指定 | 不指定 | 指定 | 不指定 |
| fanout(publish/subscribe模式) | 不指定 | 指定 | 不指定 | 指定 | 指定 | 不指定 |
| direct(路由模式) | 不指定 | 指定 | 指定 | 指定 | 指定 | 消費(fèi)者routingKey精確指定多個(gè) |
| topic(主題模糊匹配) | 不指定 | 指定 | 指定 | 指定 | 指定 | 消費(fèi)者routingKey可以進(jìn)行模糊匹配 |
-End-
最近有一些小伙伴,讓我?guī)兔φ乙恍?nbsp;面試題 資料,于是我翻遍了收藏的 5T 資料后,匯總整理出來(lái),可以說(shuō)是程序員面試必備!所有資料都整理到網(wǎng)盤(pán)了,歡迎下載!

面試題】即可獲取