rabbitMq工作模式特性及整合springboot
點擊上方藍色字體,選擇“標(biāo)星公眾號”
優(yōu)質(zhì)文章,第一時間送達
因為公司項目后面需要用到mq做數(shù)據(jù)的同步,所以學(xué)習(xí)mq并在此記錄,這里的是rabbitMq
mq(message queue)消息隊列
官網(wǎng):www.rabbitmq.com
使用消息隊列的優(yōu)點:
1、異步可加快訪問速度 (以前一個訂單接口需要做下單、庫存、付款、快遞等相關(guān)操作,有了mq只需要給相關(guān)信息傳入隊列,下單、庫存、付款、快遞等相關(guān)操作會自動從隊列中收到信息進行異步操作)
2、解耦下游服務(wù)或其他服務(wù)或語言可接入
3、削峰高并發(fā)訪問量可分攤多個隊列分攤
缺點:
1、系統(tǒng)可用性降低(一旦mq掛了系統(tǒng)就宕機了)
2、系統(tǒng)復(fù)雜性增大 (增加了mq模塊需要考慮更多)
RabbitMQ的高級特性
消費端限流
TTL 全稱time to live(存活時間/過期時間) - 當(dāng)消息到達存活時間后還沒被消費會被丟棄 ttl+死信隊列可實現(xiàn)延遲隊列效果
死信隊列
延遲隊列
消息可靠性投遞
Consumer ACK
rabbitMq為了確保消息投遞的可靠性提供了兩種方式 confirm和return
rabbitmq整個消息投遞的路徑為
producer--->rabbitmq broker--->exchange--->queue--->consumer
1.消息從producer到exchange則會返回一個confirmCallback.
2.消息從exchange到queue投遞失敗則會返回一個returnCallBack.
我們將利用這兩個callback控制消息的可靠性投遞
Consumer ACK
ack指acknowledge,確認。表示消費者端接收到消息后的確認方式
有三種方式確認:
自動確認:acknowledge="none"
手動確認:acknowledge="manual"
根據(jù)異常情況確認:acknowledge="auto"
自動確認指,當(dāng)消息一旦被消費者接收到,則自動確認收到,并將相應(yīng)的message從mq的消息緩存中移除。
但是在實際業(yè)務(wù)處理中,很可能消息接收到,業(yè)務(wù)處理出現(xiàn)異常,那么該消息就會丟失。
如果設(shè)置了手動確認模式,則需要在業(yè)務(wù)處理成功后,調(diào)用channel.basicAck(),手動簽收,如果出現(xiàn)異常,
則調(diào)用channel.basicNack()方法,讓其自動重新發(fā)送消息。
我這里學(xué)習(xí)了前面五種
1:簡單模式
2:工作隊列模式
3:發(fā)布訂閱模式
4:路由模式
5:主題模式
簡單模式:即一條線一個發(fā)送到隊列,隊列發(fā)送到接收者
工作隊列模式:即有一個發(fā)送者發(fā)送信息到隊列,隊列發(fā)給多個接收者,比如群發(fā)
發(fā)布訂閱模式:這個是使用的最多的,發(fā)布者需要先發(fā)送到交換機,交換機再發(fā)送到與之綁定的隊列, 然后隊列在發(fā)送到與之綁定隊列的接收者
路由模式:路由模式在發(fā)布訂閱上增加了條件篩選,在消息到達交換機后發(fā)送隊列時進行條件匹配,匹配成功才能發(fā)送給對應(yīng)綁定的隊列,最后再發(fā)送給接收者
主題模式:主題模式在路由模式上面進行升級,條件可進行模糊匹配,通配符規(guī)則 #可以匹配多個詞 * 只能匹配一個詞 如:test.# 匹配 test.one.tow test.one.q.wqe / test.* 匹配 test.one test.two
先安裝rabbitMq,不同的環(huán)境可安裝相關(guān)的版本,我這里已經(jīng)安裝好了

然后運行sbin下面的rabbitmq-server.bat

然后網(wǎng)頁localhost:15672,如下頁面即安裝成功


然后去rabbitmq的官網(wǎng)


左邊是下載右邊是文檔
文檔中也會有一些代碼案例,點擊文檔可以看到mq有七種方式

第一個是在測試的時候需要引入的包,第二個是在springboot上需要引入的包
com.rabbitmq
amqp-client
5.3.0
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
一:簡單模式
我給mq的連接封裝在工具類里,一些隊列名放在常量類里了
工具類代碼:
package com.lansi.realtynavi.test.utils;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @Description 描述
* @Date 2021/3/23 11:22
* @Created by huyao
*/
public class RabbitUtils {
public static ConnectionFactory factory = new ConnectionFactory();
static {
factory.setHost("localhost");
}
public static Connection getConnection() throws Exception{
Connection connection = null;
try {
//獲取長連接
connection = factory.newConnection();
}catch (Exception e){
e.printStackTrace();
}/*finally {
connection.close();
} */
return connection;
}
}
常量類代碼:
package com.lansi.realtynavi.test.constant;
/**
* @Description 描述
* @Date 2021/3/23 11:01
* @Created by huyao
*/
public class MqConstant {
public static final String MQ_HELLO_WORD = "helloWord";
public static final String MQ_PUBLISH = "publish";
public static final String MQ_ROUTING = "routing";
public static final String MQ_TOPICS = "topics";
public static final String MQ_WORK_QUEUES = "workQueues";
public static final String MQ_QUEUE_BAIDU = "baidu";
public static final String MQ_QUEUE_XINLANG = "xinlang";
public static final String MQ_PUBLISH_JHJ = "jiaohuanji";
public static final String MQ_ROUTING_JHJ = "jiaohuanjiRout";
public static final String MQ_TOPIC_JHJ = "jiaohuanjiTopic";
}
生產(chǎn)者代碼
package com.lansi.realtynavi.test.helloWord;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @Description 簡單模式
* @Date 2021/3/22 17:19
* @Created by huyao
*/
public class Producer {
public static void main(String[] args) throws Exception{
Channel channel = null;
Connection connection = null;
try {
//獲取長連接
connection = RabbitUtils.getConnection();
channel = connection.createChannel();
channel.queueDeclare(MqConstant.MQ_HELLO_WORD, false, false, false, null);
String message = "這是我發(fā)送的第三個隊列消息";
//第一個參數(shù)是交換機信息 簡單隊列不需要交換機 第二個參數(shù)隊列名稱 ,第三個額外信息,第四個需要發(fā)布的信息
channel.basicPublish("", MqConstant.MQ_HELLO_WORD, null, message.getBytes());
System.out.println("[x] Send ‘" + message + "’");
}catch (Exception e){
e.printStackTrace();
}finally {
channel.close();
connection.close();
}
}
}
消費者代碼:
package com.lansi.realtynavi.test.helloWord;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @Description 描述
* @Date 2021/3/22 17:27
* @Created by huyao
*/
public class Consumer {
public static void main(String[] argv) throws Exception {
//連接
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
//聲明并創(chuàng)建一個隊列
//參數(shù)1 隊列ID
//參數(shù)2 是否持久化,false對應(yīng)不持久化數(shù)據(jù),mq停掉數(shù)據(jù)就會丟失
//參數(shù)3 是否隊列私有化,false則代表所有消費者都可以訪問,true代表只有第一次擁有它的消費者才能一直使用
//參數(shù)4 是否自動刪除, false代表連接停掉后不自動刪除這個隊列
// 其他額外的參數(shù),null
channel.queueDeclare(MqConstant.MQ_HELLO_WORD, false, false, false, null);
//從MQ服務(wù)器中獲取數(shù)據(jù)
//創(chuàng)建一個消息消費者
//參數(shù)1:隊列ID
//參數(shù)2:代表是否自動確認收到消息,false代表手動編程來確認消息,這是mq的推薦做法
//參數(shù)3:參數(shù)要傳入的DefaultConsumer的實現(xiàn)類
channel.basicConsume(MqConstant.MQ_HELLO_WORD, false, new Reciver(channel));
}
}
class Reciver extends DefaultConsumer {
private Channel channel;
//重寫構(gòu)造函數(shù),Channel通道對象需要從外層傳入,在handleDelivery中用到
public Reciver(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("消費者接收到的消息:"+message);
System.out.println("消息的ID:"+envelope.getDeliveryTag());
//false只確認簽收當(dāng)前的消息,設(shè)置為true的時候則代表簽收該消費者所有未簽收的消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
測試的時候隊列需要手動去創(chuàng)建,不過springboot的話可以自動創(chuàng)建

這里已經(jīng)手動創(chuàng)建好了
運行接收者,運行啟動者


這里接收者自動接收消息
二:工作隊列模式
一個隊列多個接收者
生產(chǎn)者代碼:
package com.lansi.realtynavi.test.workQueues;
import com.google.gson.Gson;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @Description 工作隊列模式
* @Date 2021/3/22 17:33
* @Created by huyao
*/
public class Producer {
public static void main(String[] args) throws Exception{
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MqConstant.MQ_WORK_QUEUES, false, false, false, null);
for(int i = 1; i<=20; i++){
SMS sms = new SMS("乘客" + i, "123456789", "你的車票已預(yù)訂成功");
String message = new Gson().toJson(sms);
channel.basicPublish("", MqConstant.MQ_WORK_QUEUES, null, message.getBytes());
}
System.out.println("發(fā)送數(shù)據(jù)成功");
channel.close();
connection.close();
}
}
封裝對象代碼:
package com.lansi.realtynavi.test.workQueues;
/**
* @Description 描述
* @Date 2021/3/23 11:28
* @Created by huyao
*/
public class SMS {
private String name;
private String mobile;
private String content;
public SMS(String name, String mobile, String content) {
this.name = name;
this.mobile = mobile;
this.content = content;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getMobile() {
return mobile;
}
public void setMobile(String mobile) {
this.mobile = mobile;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
}
三個接收者代碼
接收者1
package com.lansi.realtynavi.test.workQueues;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @Description 描述
* @Date 2021/3/23 11:33
* @Created by huyao
*/
public class Consumer1 {
public static void main(String[] args) throws Exception{
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MqConstant.MQ_WORK_QUEUES, false, false, false, null);
//如果不寫baiscQos(1) 則自動mq會將所有請求平均發(fā)送給所有消費者
//baiscQos,mq不再對消費者一次發(fā)送多個請求,而是消費者處理完一個消息后(確認后),再從隊列中獲取一個新的
channel.basicQos(1);//處理完一個取一個
channel.basicConsume(MqConstant.MQ_WORK_QUEUES, false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("smsConsumer1-短信發(fā)送成功:"+message);
//服務(wù)器好的話可以在這里睡眠 這里可動態(tài)配置開啟和設(shè)置睡眠時間
/*try {
Thread.sleep(10);
}catch (Exception e){
e.printStackTrace();
}*/
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
接收者2
package com.lansi.realtynavi.test.workQueues;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @Description 描述
* @Date 2021/3/23 11:40
* @Created by huyao
*/
public class Consumer2 {
public static void main(String[] args) throws Exception{
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MqConstant.MQ_WORK_QUEUES, false, false, false, null);
channel.basicConsume(MqConstant.MQ_WORK_QUEUES, false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("smsConsumer2-短信發(fā)送成功:"+message);
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
接收者3
package com.lansi.realtynavi.test.workQueues;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @Description 描述
* @Date 2021/3/23 11:41
* @Created by huyao
*/
public class Consumer3 {
public static void main(String[] args) throws Exception{
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MqConstant.MQ_WORK_QUEUES, false, false, false, null);
channel.basicConsume(MqConstant.MQ_WORK_QUEUES, false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("smsConsumer1-短信發(fā)送成功:"+message);
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
啟動三個接收類,啟動發(fā)送類




三個接收都拿到了數(shù)據(jù),我學(xué)習(xí)的時候隊列是以輪詢的方式給三個消費者發(fā)送數(shù)據(jù),這里出現(xiàn)了接收數(shù)據(jù)不均衡的情況應(yīng)該是緩存沒用清理,給隊列刪掉重新創(chuàng)建就好了
三:發(fā)布訂閱模式
生成者代碼:
這里和前面兩種模式不同,發(fā)送者綁定了交換機,沒用綁定隊列,需要消費者綁定交換機和隊列
package com.lansi.realtynavi.test.publish;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.util.Scanner;
/**
* @Description 發(fā)布訂閱模式
* @Date 2021/3/23 13:31
* @Created by huyao
*/
public class Producer {
public static void main(String[] args) throws Exception{
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MqConstant.MQ_PUBLISH, false, false, false, null);
String input = new Scanner(System.in).next();
//第一個參數(shù)交換機名字,其他參數(shù)和之前一樣
channel.basicPublish(MqConstant.MQ_PUBLISH_JHJ, "", null, input.getBytes());
channel.close();
connection.close();
}
}
接收者1代碼:
package com.lansi.realtynavi.test.publish;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @Description 消費者
* @Date 2021/3/23 13:50
* @Created by huyao
*/
public class ConsumerXinLang {
public static void main(String[] args) throws Exception{
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MqConstant.MQ_QUEUE_XINLANG, false, false, false, null);
//隊列綁定交換機
//參數(shù)1:隊列名,參數(shù)2:交換機名,參數(shù)3:路由key(目前用不到,路由模式通配符模式使用)
channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_PUBLISH_JHJ, "");
channel.basicQos(1);
channel.basicConsume(MqConstant.MQ_QUEUE_XINLANG, false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者新浪收到消息:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
接收者2代碼:
package com.lansi.realtynavi.test.publish;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @Description 消費者
* @Date 2021/3/23 13:50
* @Created by huyao
*/
public class ConsumerBaiDu {
public static void main(String[] args) throws Exception{
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MqConstant.MQ_QUEUE_BAIDU, false, false, false, null);
//隊列綁定交換機 目前交換機需要在rabbit也手動創(chuàng)建,在和spring整合的時候spring會自動幫我們創(chuàng)建
//參數(shù)1:隊列名,參數(shù)2:交換機名,參數(shù)3:路由key(目前用不到,路由模式通配符模式使用)
channel.queueBind(MqConstant.MQ_QUEUE_BAIDU, MqConstant.MQ_PUBLISH_JHJ, "");
channel.basicQos(1);
channel.basicConsume(MqConstant.MQ_QUEUE_BAIDU, false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者百度收到消息:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
啟動生產(chǎn)者消費者,在生產(chǎn)者控制臺輸入信息:



兩個消費者都接收到了


四 :路由模式
路由模式發(fā)送需要攜帶路由key,用作接收者進行判斷
生產(chǎn)者代碼:
package com.lansi.realtynavi.test.routing;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* @Description 路由模式
* @Date 2021/3/23 13:31
* @Created by huyao
*
*
* 交換機類型:fanout廣播(發(fā)布訂閱) direct轉(zhuǎn)發(fā)(路由) topic通配符(通配模式)
*
*/
public class Producer {
public static void main(String[] args) throws Exception{
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MqConstant.MQ_PUBLISH, false, false, false, null);
LinkedHashMap<String, String> map = new LinkedHashMap<>();
map.put("test1","測試一數(shù)據(jù)");
map.put("test2","測試二數(shù)據(jù)");
map.put("test3","測試三數(shù)據(jù)");
map.put("test4","測試四數(shù)據(jù)");
map.put("test5","測試五數(shù)據(jù)");
map.put("test6","測試六數(shù)據(jù)");
map.put("test7","測試七數(shù)據(jù)");
Iterator<Map.Entry<String, String>> iterator = map.entrySet().iterator();
while (iterator.hasNext()){
Map.Entry<String, String> next = iterator.next();
//第一個參數(shù)交換機名字,第二個參數(shù)指定rout_key
channel.basicPublish(MqConstant.MQ_ROUTING_JHJ, next.getKey(), null, next.getValue().getBytes());
}
channel.close();
connection.close();
}
}
接收者1:
package com.lansi.realtynavi.test.routing;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @Description 消費者
* @Date 2021/3/23 13:50
* @Created by huyao
*/
public class ConsumerBaiDu {
public static void main(String[] args) throws Exception{
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MqConstant.MQ_QUEUE_BAIDU, false, false, false, null);
//隊列綁定交換機 目前交換機需要在rabbit也手動創(chuàng)建,在和spring整合的時候spring會自動幫我們創(chuàng)建
//參數(shù)1:隊列名,參數(shù)2:交換機名,參數(shù)3:路由key(目前用不到,路由模式通配符模式使用)
channel.queueBind(MqConstant.MQ_QUEUE_BAIDU, MqConstant.MQ_ROUTING_JHJ, "test1");
channel.queueBind(MqConstant.MQ_QUEUE_BAIDU, MqConstant.MQ_ROUTING_JHJ, "test2");
channel.basicQos(1);
channel.basicConsume(MqConstant.MQ_QUEUE_BAIDU, false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者百度收到消息:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
接收者二
package com.lansi.realtynavi.test.routing;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @Description 消費者
* @Date 2021/3/23 13:50
* @Created by huyao
*/
public class ConsumerXinLang {
public static void main(String[] args) throws Exception{
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MqConstant.MQ_QUEUE_XINLANG, false, false, false, null);
//隊列綁定交換機
//參數(shù)1:隊列名,參數(shù)2:交換機名,參數(shù)3:路由key
channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_ROUTING_JHJ, "test10");
channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_ROUTING_JHJ, "test6");
channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_ROUTING_JHJ, "test5");
channel.basicQos(1);
channel.basicConsume(MqConstant.MQ_QUEUE_XINLANG, false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者新浪收到消息:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}

在這里看到百度接收者只接受test1、test2,所以只接收到了1和2的數(shù)據(jù),新浪同理

五 :主題模式
在路由的基礎(chǔ)上增加了通配符匹配
通配符規(guī)則 #可以匹配多個詞 * 只能匹配一個詞
生產(chǎn)者代碼:
package com.lansi.realtynavi.test.topics;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* @Description 通配符模式
* @Date 2021/3/23 13:31
* @Created by huyao
*
*
* 交換機類型:fanout廣播(發(fā)布訂閱) direct轉(zhuǎn)發(fā)(路由) topic通配符(通配模式)
*
* 通配符規(guī)則 #可以匹配多個詞 * 只能匹配一個詞
* test.# test.one.tow test.one.q.wqe / test.* test.one test.two
*/
public class Producer {
public static void main(String[] args) throws Exception{
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MqConstant.MQ_TOPIC_JHJ, false, false, false, null);
LinkedHashMap<String, String> map = new LinkedHashMap<>();
map.put("test.one","測試一數(shù)據(jù)");
map.put("test2.two.one","測試二數(shù)據(jù)");
map.put("test.wqe","測試三數(shù)據(jù)");
map.put("test4.com.hash.oqp","測試四數(shù)據(jù)");
map.put("test5.com.code.oqp","測試五數(shù)據(jù)");
map.put("test6.com.code.oqp","測試六數(shù)據(jù)");
Iterator<Map.Entry<String, String>> iterator = map.entrySet().iterator();
while (iterator.hasNext()){
Map.Entry<String, String> next = iterator.next();
//第一個參數(shù)交換機名字,第二個參數(shù)指定rout_key
channel.basicPublish(MqConstant.MQ_TOPIC_JHJ, next.getKey(), null, next.getValue().getBytes());
}
channel.close();
connection.close();
}
}
接收者1代碼:
package com.lansi.realtynavi.test.topics;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @Description 消費者
* @Date 2021/3/23 13:50
* @Created by huyao
*/
public class ConsumerBaiDu {
public static void main(String[] args) throws Exception{
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MqConstant.MQ_QUEUE_BAIDU, false, false, false, null);
//隊列綁定交換機 目前交換機需要在rabbit也手動創(chuàng)建,在和spring整合的時候spring會自動幫我們創(chuàng)建
//參數(shù)1:隊列名,參數(shù)2:交換機名,參數(shù)3:路由key(目前用不到,路由模式通配符模式使用)
channel.queueBind(MqConstant.MQ_QUEUE_BAIDU, MqConstant.MQ_TOPIC_JHJ, "*.*.*.oqp");
channel.basicQos(1);
channel.basicConsume(MqConstant.MQ_QUEUE_BAIDU, false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者百度收到消息:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
接收者2代碼
package com.lansi.realtynavi.test.topics;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @Description 消費者
* @Date 2021/3/23 13:50
* @Created by huyao
*/
public class ConsumerXinLang {
public static void main(String[] args) throws Exception{
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MqConstant.MQ_QUEUE_XINLANG, false, false, false, null);
//隊列綁定交換機
//參數(shù)1:隊列名,參數(shù)2:交換機名,參數(shù)3:路由key
channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_TOPIC_JHJ, "test.#");
channel.basicQos(1);
channel.basicConsume(MqConstant.MQ_QUEUE_XINLANG, false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者新浪收到消息:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}


最后就是springboot上整合rabbitmq
需要用到的依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
然后配置rabbitmq連接
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=111111
#發(fā)送者開啟confirm確認機制
spring.rabbitmq.publisher-confirms=true
#發(fā)送者開啟return確認機制
spring.rabbitmq.publisher-returns=true
#開啟ack
spring.rabbitmq.listener.type=simple
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.default-requeue-rejected=false
接下來一個rabbitmq的配置
package com.lansi.realtynavi.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Description mq的配置
* @Date 2021/3/24 14:19
* @Created by huyao
*/
@Configuration
public class RabbitMqConfig {
//定義交換機的名字
public static final String EXCHANGE_NAME = "boot_topic_exchange";
public static final String QUEUE_NAME = "boot_queue";
//1.聲明交換機
@Bean("bootExchange")
public Exchange bootExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
//2.聲明隊列
@Bean("bootQueue")
public Queue bootQueue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}
//3.綁定
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
}
接收者
package com.lansi.realtynavi.config;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @Description mq監(jiān)聽/消費者手動簽收消息
* @Date 2021/3/24 14:44
* @Created by huyao
*
*rabbitmq給了兩種消息的可靠性 confirm和return
*
*/
@Component
public class RabbitMqConsumer {
//可監(jiān)聽分布式其他項目,只要mq連接的地址相同監(jiān)聽的隊列名存在即可
//消費者
@RabbitListener(queues = "boot_queue")
public void ListenerQueue(Message message, Channel channel) throws Exception{
System.out.println("消費者接收到消息:"+new String(message.getBody()));
try{
//開始業(yè)務(wù)處理
System.out.println("開始業(yè)務(wù)處理");
//int i = 5/0;
System.out.println("業(yè)務(wù)處理完成");
//業(yè)務(wù)處理完成確認收到消息 , 第二個參數(shù)為true支持多消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}catch (Exception e){
System.out.println("業(yè)務(wù)處理異常");
//業(yè)務(wù)異常,拒收消息,請求重發(fā) 參數(shù)三為true則重回隊列發(fā)送
channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);
}
}
}
這里的生產(chǎn)者我寫的一個controller中的列子(錯誤示范,只能調(diào)用一次)
testTopic1 是測試mq的高級特性,這里只用到testTopic就可以
package com.lansi.realtynavi.rabbitmq;
import com.lansi.realtynavi.config.RabbitMqConfig;
import com.lansi.realtynavi.dev.helloWord.HelloSender;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @Description 描述
* @Date 2021/3/24 13:46
* @Created by huyao
*/
@RestController
@RequestMapping("api/rabbitMq")
public class RabbitMqController {
@Autowired
private HelloSender helloSender;
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("helloWorld")
public void hello(){
helloSender.send();
}
@GetMapping("testTopic")
public void testTopic(){
rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"boot.hhh", "topic的mq.......");
}
//mq的可靠性機制,必須要在配置文件中開啟
@GetMapping("testTopic1")
public void testTopic1(){
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("confirm方法被執(zhí)行了。。。");
if(b){
System.out.println("交換機確認成功??!");
} else {
System.out.println("交換機確認失?。?!");
}
}
});
//設(shè)置交換機處理失敗消息的模式,為true的時候,消息打到不了隊列時,會將消息重新返回給生產(chǎn)者
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* @param message 消息對象
* @param returnCode 錯誤碼
* @param returnText 錯誤信息
* @param exchange 交換機
* @param routingKey 路由鍵
*
* */
@Override
public void returnedMessage(Message message, int returnCode, String returnText, String exchange,String routingKey) {
System.out.println("return被執(zhí)行了。。。");
System.out.println("message:"+new String(message.getBody()));
System.out.println("錯誤碼:"+returnCode);
System.out.println("錯誤信息:"+returnText);
System.out.println("交換機:"+exchange);
System.out.println("路由鍵:"+routingKey);
}
});
rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"abc.boot.hhh", "topic的mq.......");
}
}
運行后掉對應(yīng)的接口,消費者接收


這樣rabbitmq就整合進springboot中了
————————————————
版權(quán)聲明:本文為CSDN博主「oNuoyi」的原創(chuàng)文章,遵循CC 4.0 BY-SA版權(quán)協(xié)議,轉(zhuǎn)載請附上原文出處鏈接及本聲明。
原文鏈接:
https://blog.csdn.net/qq_41973632/article/details/115233999
鋒哥最新SpringCloud分布式電商秒殺課程發(fā)布
??????
??長按上方微信二維碼 2 秒
感謝點贊支持下哈 
