SpringBoot整合RabbitMQ
點擊上方藍色字體,選擇“標星公眾號”
優(yōu)質文章,第一時間送達
? 作者?|??zpk-aaron?
來源 |? urlify.cn/qM7Bzm
基本配置
1. 創(chuàng)建項目導入依賴
創(chuàng)建SpringBoot項目, 并導入如下依賴:?手動導入
<dependency>
????<groupId>org.springframework.bootgroupId>
????<artifactId>spring-boot-starter-amqpartifactId>
dependency>2. application.yml
spring:
??rabbitmq:
????host: 192.168.64.140?# ip地址/域名
????username: admin # 用戶名
????password: admin # 密碼
????port: 5672?# 默認為5672端口, 可省略
????# virtualHost: /pd # 虛擬路徑為了方便測試, 我們可以刪掉自動生成的主啟動類, 下面每個模式都有一個啟動類, 不沖突
簡單模式
1. 主程序
@SpringBootApplication
public?class?Main?{
??public?static?void?main(String[] args)?{
????SpringApplication.run(Main.class, args);
??}
????// Queue的包: org.springframework.amqp.core.Queue
??@Bean
??public?Queue task_queue()?{
????/*
?????* 可用以下形式:
?????* new Queue("helloworld")
?????* 參數1: 隊列名, 參數2: 持久, 參數3: 非排他, 參數4: 非自動刪除
?????* new Queue("helloworld",false,false,false,null)
?????*/
????return?new?Queue("m1Queue",false);
??}
}2. 生產者
AmqpTemplate是rabbitmq客戶端API的一個封裝工具,提供了簡便的方法來執(zhí)行消息操作.
AmqpTemplate由自動配置類自動創(chuàng)建
package?cn.tedu.m1;
import?org.springframework.amqp.core.AmqpTemplate;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.stereotype.Component;
@Component
public?class?SimpleSender?{
??@Autowired
??AmqpTemplate t;
??
??public?void?send()?{
????// 這里向 helloworld 隊列發(fā)送消息
????t.convertAndSend("m1Queue", "Hello world!! "+ System.currentTimeMillis());
????System.out.println("消息已發(fā)送");
??}
}3. 消費者
通過@RabbitListener從指定的隊列接收消息, 使用@RebbitHandler注解的方法來處理消息
@RabbitListener注解也可以直接放在方法上, 這樣一個類中可以有多個方法進行監(jiān)聽消息
package?cn.tedu.m1;
import?org.springframework.amqp.rabbit.annotation.RabbitHandler;
import?org.springframework.amqp.rabbit.annotation.RabbitListener;
import?org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "m1Queue")
public class SimpleReceiver {
??@RabbitHandler
??public void receive(String msg) {
????System.out.println("收到: "+msg);
??}
}或者這樣寫
@Component
public?class?SimpleReceiver?{
??@RabbitListener(queues = "helloworld")
??public?void?receive(String msg)?{
????System.out.println("收到: "+msg);
??}
}另外,@RabbitListener?注解中也可以直接定義隊列:
@RabbitListener(queuesToDeclare = @Queue(name = "helloworld",durable = "false"))4. 測試類
在存放測試代碼的包中,創(chuàng)建測試類
package?cn.tedu.m1;
import?java.util.Scanner;
import?org.junit.jupiter.api.Test;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class?SimpleTests?{
??@Autowired
??SimpleSender simpleSender;
??@Test
??void?test1()?throws?Exception {
????simpleSender.send();
????????// 便于觀察效果, 加入此屬性
????new?Scanner(System.in).nextLine(); // 輸入任何字符或回車結束程序
??}
}工作模式
1. 主程序
package?cn.tedu.m2;
import?org.springframework.amqp.core.Queue;
import?org.springframework.boot.SpringApplication;
import?org.springframework.boot.autoconfigure.SpringBootApplication;
import?org.springframework.context.annotation.Bean;
@SpringBootApplication
public?class?Main?{
????public?static?void?main(String[] args)?{
????????SpringApplication.run(Main.class, args);
????}
????@Bean
????public?Queue task_queue()?{
????????return?new?Queue("m2Queue", false);
????}
}2. 生產者
package?cn.tedu.m2;
import?org.springframework.amqp.core.AmqpTemplate;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.stereotype.Component;
import?java.util.Scanner;
@Component
public?class?SimpleSender?{
????@Autowired
????private?AmqpTemplate t;
????public?void?send()?{
????????while?(true) {
????????????System.out.print("輸入:");
????????????String s = new?Scanner(System.in).nextLine();
????????????//spring 默認將消息的 DeliveryMode 設置為 PERSISTENT 持久化,
????????????t.convertAndSend("m2Queue", s);
????????}
????}
}spring boot封裝的 rabbitmq api 中, 發(fā)送的消息默認是持久化消息.
如果希望發(fā)送非持久化消息, 需要在發(fā)送消息時做以下設置:
使用 MessagePostProcessor 前置處理器參數
從消息中獲取消息的屬性對象
在屬性中把 DeliveryMode 設置為非持久化
//如果需要設置消息為非持久化,可以取得消息的屬性對象,修改它的deliveryMode屬性
??t.convertAndSend("task_queue", (Object) s, new?MessagePostProcessor() {
????@Override
????public?Message postProcessMessage(Message message)?throws?AmqpException {
??????MessageProperties props = message.getMessageProperties();
??????props.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
??????return?message;
????}
??});3. 消費者
package?cn.tedu.m2;
import?org.springframework.amqp.rabbit.annotation.RabbitHandler;
import?org.springframework.amqp.rabbit.annotation.RabbitListener;
import?org.springframework.stereotype.Component;
@Component
public class SimpleReceiver {
????@RabbitListener(queues?= "m2Queue")
????public void receive(String msg) {
????????System.out.println("收到1 " + msg);
????}
????@RabbitListener(queues?= "m2Queue")
????public void receive2(String msg) {
????????System.out.println("收到2 " + msg);
????}
}4. 測試類
package?cn.tedu.m2;
import?org.junit.jupiter.api.Test;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.boot.test.context.SpringBootTest;
import?java.util.Scanner;
@SpringBootTest
public class SimpleTests {
????@Autowired
????private SimpleSender simpleSender;
????@Test
????void test1() throws Exception {
????????simpleSender.send();
????????new?Scanner(System.in).nextLine();
????}
}ack模式
在 spring boot 中提供了三種確認模式:
NONE - 使用rabbitmq的自動確認
AUTO?- 使用rabbitmq的手動確認, springboot會自動發(fā)送確認回執(zhí)?(默認)
MANUAL - 使用rabbitmq的手動確認, 且必須手動執(zhí)行確認操作
默認的?AUTO?模式中, 處理消息的方法拋出異常, 則表示消息沒有被正確處理, 該消息會被重新發(fā)送.
設置 ack 模式
spring:
??rabbitmq:
????listener:
??????simple:
????????# acknowledgeMode: NONE # rabbitmq的自動確認
????????acknowledgeMode: AUTO # rabbitmq的手動確認, springboot會自動發(fā)送確認回執(zhí) (默認)
????????# acknowledgeMode: MANUAL # rabbitmq的手動確認, springboot不發(fā)送回執(zhí), 必須自己編碼發(fā)送回執(zhí)手動執(zhí)行確認操作
如果設置為?MANUAL?模式,必須手動執(zhí)行確認操作
@RabbitListener(queues="task_queue")
public void receive1(String s, Channel c, @Header(name=AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
????System.out.println("receiver1 - 收到: "+s);
????for?(int i = 0; i < s.length(); i++) {
????????if?(s.charAt(i) == '.') {
????????????Thread.sleep(1000);
????????}
????}
????// 手動發(fā)送確認回執(zhí)
????c.basicAck(tag, false);
}抓取數量(qos)
工作模式中, 為了合理地分發(fā)數據, 需要將 qos 設置成 1, 每次只接收一條消息, 處理完成后才接收下一條消息.
spring boot 中是通過?prefetch?屬性進行設置, 改屬性的默認值是 250.
spring:
??rabbitmq:
????listener:
??????simple:
????????prefetch: 1?# qos=1, 默認250發(fā)布和訂閱模式
1. 主程序
創(chuàng)建?FanoutExcnahge?實例, 封裝?fanout?類型交換機定義信息.
spring boot 的自動配置類會自動發(fā)現交換機實例, 并在 RabbitMQ 服務器中定義該交換機.
package?cn.tedu.m3;
import?org.springframework.amqp.core.FanoutExchange;
import?org.springframework.boot.SpringApplication;
import?org.springframework.boot.autoconfigure.SpringBootApplication;
import?org.springframework.context.annotation.Bean;
@SpringBootApplication
public?class?Main?{
??public?static?void?main(String[] args)?{
????SpringApplication.run(Main.class, args);
??}
??@Bean
??public?FanoutExchange fanoutExchange()?{
????return?new?FanoutExchange("logs");
??}
}2. 生產者
生產者向指定的交換機?logs?發(fā)送數據.
不需要指定隊列名或路由鍵, 即使指定也無效, 因為?fanout?交換機會向所有綁定的隊列發(fā)送數據, 而不是有選擇的發(fā)送.
package?cn.tedu.m3;
import?java.util.Scanner;
import?org.springframework.amqp.core.AmqpTemplate;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.stereotype.Component;
@Component
public?class?Publisher?{
??@Autowired
??AmqpTemplate t;
??
??public?void?send()?{
????while?(true) {
??????System.out.print("輸入:");
??????String s = new?Scanner(System.in).nextLine();
??????// 指定向 logs 交換機發(fā)送, 不指定隊列名或路由鍵
??????t.convertAndSend("logs","",s);
????}
??}
}3. 消費者
消費者需要執(zhí)行以下操作:
定義隨機隊列(隨機命名,非持久,排他,自動刪除)
定義交換機(可以省略, 已在主程序中定義)
將隊列綁定到交換機
spring boot 通過注解完成以上操作:
@RabbitListener(bindings = @QueueBinding( //這里進行綁定設置
??value = @Queue, //這里定義隨機隊列,默認屬性: 隨機命名,非持久,排他,自動刪除
??exchange = @Exchange(name = "logs", declare = "false") //指定 logs 交換機,因為主程序中已經定義,這里不進行定義
))package?cn.tedu.m3;
import?org.springframework.amqp.rabbit.annotation.Exchange;
import?org.springframework.amqp.rabbit.annotation.Queue;
import?org.springframework.amqp.rabbit.annotation.QueueBinding;
import?org.springframework.amqp.rabbit.annotation.RabbitListener;
import?org.springframework.stereotype.Component;
@Component
public?class?Subscriber?{
??@RabbitListener(bindings = @QueueBinding(value = @Queue, exchange = @Exchange(name = "logs", declare = "false")))
??public void receive1(String s) throws Exception {
????System.out.println("receiver1 - 收到: "+s);
??}
??@RabbitListener(bindings = @QueueBinding(value = @Queue, exchange = @Exchange(name = "logs", declare = "false")))
??public?void?receive2(String s) throws?Exception?{
????System.out.println("receiver2 - 收到: "+s);
??}
}4. 測試類
package?cn.tedu.m3;
import?org.junit.jupiter.api.Test;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class?PublishSubscribeTests?{
??@Autowired
??Publisher publisher;
??@Test
??void?test1()?throws?Exception {
????publisher.send();
????????Thread.sleep(3000); // 為了防止程序執(zhí)行太快看不到效果
??}
}路由模式
與發(fā)布和訂閱模式代碼類似, 只是做以下三點調整:
使用?
direct?交換機隊列和交換機綁定時, 設置綁定鍵
發(fā)送消息時, 指定路由鍵
1. 主程序
主程序中使用?DirectExcnahge?對象封裝交換機信息, spring boot 自動配置類會自動發(fā)現這個對象, 并在 RabbitMQ 服務器上定義這個交換機.
package?cn.tedu.m4;
import?org.springframework.amqp.core.DirectExchange;
import?org.springframework.boot.SpringApplication;
import?org.springframework.boot.autoconfigure.SpringBootApplication;
import?org.springframework.context.annotation.Bean;
@SpringBootApplication
public?class?Main?{
??public?static?void?main(String[] args)?{
????SpringApplication.run(Main.class, args);
??}
??@Bean
??public?DirectExchange fanoutExchange()?{
????return?new?DirectExchange("direct_logs");
??}
}2. 生產者
生產者向指定的交換機發(fā)送消息, 并指定路由鍵.
package?cn.tedu.m4;
import?java.util.Scanner;
import?org.springframework.amqp.core.AmqpTemplate;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.stereotype.Component;
@Component
public?class?RouteSender?{
??@Autowired
??AmqpTemplate t;
??
??public?void?send()?{
????while?(true) {
??????System.out.print("輸入消息:");
??????String s = new?Scanner(System.in).nextLine();
??????System.out.print("輸入路由鍵:");
??????String key = new?Scanner(System.in).nextLine();
??????// 第二個參數指定路由鍵
??????t.convertAndSend("direct_logs",key,s);
????}
??}
}3. 消費者
消費者通過注解來定義隨機隊列, 綁定到交換機, 并指定綁定鍵:
@RabbitListener(bindings = @QueueBinding( // 這里做綁定設置
??value = @Queue, // 定義隊列, 隨機命名,非持久,排他,自動刪除
??exchange = @Exchange(name = "direct_logs", declare = "false"), // 指定綁定的交換機,主程序中已經定義過隊列,這里不進行定義
??key = {"error","info","warning"} // 設置綁定鍵
))package?cn.tedu.m4;
import?org.springframework.amqp.rabbit.annotation.Exchange;
import?org.springframework.amqp.rabbit.annotation.Queue;
import?org.springframework.amqp.rabbit.annotation.QueueBinding;
import?org.springframework.amqp.rabbit.annotation.RabbitListener;
import?org.springframework.stereotype.Component;
@Component
public?class?RouteReceiver?{
??@RabbitListener(bindings = @QueueBinding(value = @Queue,exchange = @Exchange(name = "direct_logs", declare = "false"),key = {"error"}))
??public void receive1(String s) throws Exception {
????System.out.println("receiver1 - 收到: "+s);
??}
??@RabbitListener(bindings = @QueueBinding(value = @Queue, exchange = @Exchange(name = "direct_logs", declare = "false"),key = {"error","info","warning"}))
??public?void?receive2(String s) throws?Exception?{
????System.out.println("receiver2 - 收到: "+s);
??}
}4. 測試類
package?cn.tedu.m4;
import?org.junit.jupiter.api.Test;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class?RouteTests?{
??@Autowired
??RouteSender sender;
??@Test
??void?test1()?throws?Exception {
????sender.send();
????????Thread.sleep(3000); // 為了防止程序執(zhí)行太快看不到效果
??}
}主題模式
主題模式不過是具有特殊規(guī)則的路由模式, 代碼與路由模式基本相同, 只做如下調整:
使用?
topic?交換機使用特殊的綁定鍵和路由鍵規(guī)則
1. 主程序
package?cn.tedu.m5;
import?org.springframework.amqp.core.TopicExchange;
import?org.springframework.boot.SpringApplication;
import?org.springframework.boot.autoconfigure.SpringBootApplication;
import?org.springframework.context.annotation.Bean;
@SpringBootApplication
public?class?Main?{
??public?static?void?main(String[] args)?{
????SpringApplication.run(Main.class, args);
??}
??@Bean
??public?TopicExchange fanoutExchange()?{
????return?new?TopicExchange("topic_logs");
??}
}2. 生產者
package?cn.tedu.m5;
import?java.util.Scanner;
import?org.springframework.amqp.core.AmqpTemplate;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.stereotype.Component;
@Component
public?class?TopicSender?{
??@Autowired
??AmqpTemplate t;
??
??public?void?send()?{
????while?(true) {
??????System.out.print("輸入消息:");
??????String s = new?Scanner(System.in).nextLine();
??????System.out.print("輸入路由鍵:");
??????String key = new?Scanner(System.in).nextLine();
??????
??????t.convertAndSend("topic_logs",key,s);
????}
??}
}3. 消費者
package?cn.tedu.m5;
import?org.springframework.amqp.rabbit.annotation.Exchange;
import?org.springframework.amqp.rabbit.annotation.Queue;
import?org.springframework.amqp.rabbit.annotation.QueueBinding;
import?org.springframework.amqp.rabbit.annotation.RabbitListener;
import?org.springframework.stereotype.Component;
@Component
public?class?TopicReceiver?{
??@RabbitListener(bindings = @QueueBinding(value = @Queue,exchange = @Exchange(name = "topic_logs", declare = "false"),key = {"*.orange.*"}))
??public void receive1(String s) throws Exception {
????System.out.println("receiver1 - 收到: "+s);
??}
??@RabbitListener(bindings = @QueueBinding(value = @Queue, exchange = @Exchange(name = "topic_logs", declare = "false"),key = {"*.*.rabbit","lazy.#"}))
??public?void?receive2(String s) throws?Exception?{
????System.out.println("receiver2 - 收到: "+s);
??}
}4. 測試類
package?cn.tedu.m5;
import?org.junit.jupiter.api.Test;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class TopicTests {
??@Autowired
??TopicSender sender;
??@Test
??void test1() throws Exception {
????sender.send();
??}
}RPC異步調用
1. 主程序
主程序中定義兩個隊列
發(fā)送調用信息的隊列:?
rpc_queue返回結果的隊列: 隨機命名
package?cn.tedu.m6;
import?java.util.UUID;
import?org.springframework.amqp.core.Queue;
import?org.springframework.boot.SpringApplication;
import?org.springframework.boot.autoconfigure.SpringBootApplication;
import?org.springframework.context.annotation.Bean;
@SpringBootApplication
public?class?Main?{
??public?static?void?main(String[] args)?{
????SpringApplication.run(Main.class, args);
??}
??@Bean
??public?Queue sendQueue()?{
????return?new?Queue("rpc_queue",false);
??}
??@Bean
??public?Queue rndQueue()?{
????return?new?Queue(UUID.randomUUID().toString(), false);
??}
}2. 服務端
從rpc_queue接收調用數據, 執(zhí)行運算求斐波那契數,并返回計算結果.@Rabbitlistener注解對于具有返回值的方法:
會自動獲取?
replyTo?屬性自動獲取?
correlationId?屬性向?
replyTo?屬性指定的隊列發(fā)送計算結果, 并攜帶?correlationId?屬性
package?cn.tedu.m6;
import?org.springframework.amqp.rabbit.annotation.RabbitListener;
import?org.springframework.stereotype.Component;
@Component
public?class?RpcServer?{
??@RabbitListener(queues = "rpc_queue")
??public?long?getFbnq(int?n)?{
????return?f(n);
??}
??private?long?f(int?n)?{
????if?(n==1?|| n==2) {
??????return?1;
????}
????return?f(n-1) + f(n-2);
??}
}3. 客戶端
使用 SPEL 表達式獲取隨機隊列名:?"#{rndQueue.name}"
發(fā)送調用數據時, 攜帶隨機隊列名和correlationId
從隨機隊列接收調用結果, 并獲取correlationId
package?cn.tedu.m6;
import?java.util.UUID;
import?org.springframework.amqp.AmqpException;
import?org.springframework.amqp.core.AmqpTemplate;
import?org.springframework.amqp.core.Message;
import?org.springframework.amqp.core.MessagePostProcessor;
import?org.springframework.amqp.core.MessageProperties;
import?org.springframework.amqp.rabbit.annotation.RabbitListener;
import?org.springframework.amqp.support.AmqpHeaders;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.beans.factory.annotation.Value;
import?org.springframework.messaging.handler.annotation.Header;
import?org.springframework.stereotype.Component;
@Component
public?class?RpcClient?{
??@Autowired
??AmqpTemplate t;
??
??@Value("#{rndQueue.name}")
??String rndQueue;
??
??public?void?send(int?n)?{
????// 發(fā)送調用信息時, 通過前置消息處理器, 對消息屬性進行設置, 添加返回隊列名和關聯id
????t.convertAndSend("rpc_queue", (Object)n, new?MessagePostProcessor() {
??????@Override
??????public?Message postProcessMessage(Message message)?throws?AmqpException {
????????MessageProperties p = message.getMessageProperties();
????????p.setReplyTo(rndQueue);
????????p.setCorrelationId(UUID.randomUUID().toString());
????????return?message;
??????}
????});
??}
??
??//從隨機隊列接收計算結果
??@RabbitListener(queues = "#{rndQueue.name}")
??public?void?receive(long?r, @Header(name=AmqpHeaders.CORRELATION_ID)?String correlationId) {
????System.out.println("\n\n"+correlationId+" - 收到: "+r);
??}
??
}4. 測試類
package?cn.tedu.m6;
import?java.util.Scanner;
import?org.junit.jupiter.api.Test;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class?TopicTests?{
??@Autowired
??RpcClient client;
??@Test
??void?test1()?throws?Exception {
????while?(true) {
??????System.out.print("求第幾個斐波那契數: ");
??????int?n = new?Scanner(System.in).nextInt();
??????client.send(n);
????}
??}
}粉絲福利:108本java從入門到大神精選電子書領取
???
?長按上方鋒哥微信二維碼?2 秒 備注「1234」即可獲取資料以及 可以進入java1234官方微信群
感謝點贊支持下哈?
