SpringCloud 整合Stream實(shí)戰(zhàn)
點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號(hào)”
優(yōu)質(zhì)文章,第一時(shí)間送達(dá)
76套java從入門到精通實(shí)戰(zhàn)課程分享
前言
2-4 Stream快速入門-集成MQ消費(fèi)
創(chuàng)建 stream-sample 項(xiàng)目, 引入依賴
創(chuàng)建監(jiān)聽器 (聲明和綁定信道)
從RabbitMQ觸發(fā)消息
RabbitMQ 界面操作
1、http://192.168.8.240:15672 (guest/guest)
2、查看 Queue input.anonymous.Z5LyfIlEQtqw3hcJTAhHfA
3、發(fā)送消息
1、下方 Publish message
2、Payload: 自定義內(nèi)容
3、點(diǎn)擊按鈕 Publish message
4、IDEA控制臺(tái)就會(huì)提示:“message consumed successfully, payload=自定義內(nèi)容”
2-8 基于發(fā)布訂閱實(shí)現(xiàn)廣播功能
創(chuàng)建消息生產(chǎn)者Producer服務(wù), 配置消息主題
啟動(dòng)多個(gè)消費(fèi)者Consumer節(jié)點(diǎn)測(cè)試消息廣播
RabbitMQ界面查看廣播組(Exchanges)
自定義主題 (Topic)
com.example.springcloud.topic.MyTopic
public interface MyTopic {
/**
* Input channel name.
*/
String INPUT = "myTopic-consumer";
/**
* Output channel name.
*/
String OUTPUT = "myTopic-producer";
/**
* input=消費(fèi)者
*/
@Input(INPUT)
SubscribableChannel input();
/**
* output=生產(chǎn)者
*/
@Output(OUTPUT)
MessageChannel output();
}
添加消費(fèi)者
com.example.springcloud.biz.StreamConsumer
@Slf4j
// 綁定信道
@EnableBinding(
value = {
Sink.class,
MyTopic.class
}
)
public class StreamConsumer {
@StreamListener(Sink.INPUT)
public void consume(Object payload) {
log.info("message consumed successfully, payload={}", payload);
}
@StreamListener(MyTopic.INPUT)
public void consumeMyMessage(Object payload) {
log.info("my message consumed successfully, payload={}", payload);
}
}
使用配置文件, 綁定生產(chǎn)者和消費(fèi)者的通道
application.yml
# 綁定 Channel 到 broadcast
spring:
cloud:
stream:
bindings:
myTopic-consumer: # 消費(fèi)者綁定
destination: broadcast # rabbitMq界面顯示 Exchange
myTopic-producer: # 生產(chǎn)者綁定
destination: broadcast
啟動(dòng)與測(cè)試
(1) 按不同端口啟動(dòng)
StreamApplication (63000) :63000/
StreamApplication (63001) :63001/
(2) Postman (demo - 最簡單的生產(chǎn)者消費(fèi)者)
POST localhost:63000/send
Body (x-www-form-urfencoded)
body:hello broadcast
63000、630001 控制臺(tái)打印:
my message consumed successfully, payload=hello broadcast
(3) RabbitMQ WEB
打開 http://192.168.8.240:15672
查看頂部 Exchanges 下面是否存在 “broadcast”
查看 Bindings (每一個(gè)Queues都對(duì)應(yīng)后臺(tái)一個(gè)監(jiān)聽隊(duì)列)
broadcast.anonymous.DIILcrP3SvaGEUv6dfiAqQ
broadcast.anonymous.UnlUchdPQnaavW5uBIQjEA
查看頂部 Queues 是否存在對(duì)應(yīng) Bindings
點(diǎn)擊 broadcast.anonymous.DIILcrP3SvaGEUv6dfiAqQ
進(jìn)入后, 點(diǎn)擊 Publish message 輸入"queues test"
返回 IDEA控制臺(tái) 就會(huì)顯示該條 Message
my message consumed successfully, payload=queues test



2-10 消費(fèi)組和消息分區(qū)詳解
消費(fèi)組
前面我們接觸的都是廣播場(chǎng)景,話說這個(gè)廣播模式簡直就是個(gè)圍觀模式,所有訂閱相同主題的消費(fèi)者都眼巴巴看著生產(chǎn)者發(fā)布的消息,一個(gè)消息在所有節(jié)點(diǎn)都要被消費(fèi)一遍。如果我只想挑一個(gè)節(jié)點(diǎn)來消費(fèi)消息,而且又不能只逮著一只羊來薅羊毛,必須利用負(fù)載均衡來分發(fā)請(qǐng)求。這個(gè)Stream能不能辦到呢?
這不就是單播模式嗎,那自然不在話下,Stream里的消費(fèi)組就是專門解決這個(gè)問題的。讓我們來用一個(gè)案例說明它的工作模式:

在上面這個(gè)例子中,“商品發(fā)布”就是一個(gè)消息,它被放到了對(duì)應(yīng)的消息隊(duì)列中,有兩撥人馬同時(shí)盯著這個(gè)Topic,這兩撥人馬各自組成了一個(gè)Group,每次有新消息到來的時(shí)候,每個(gè)Group就派出一個(gè)代表去響應(yīng),而且是從這個(gè)Group中輪流挑選代表(負(fù)載均衡),這里的Group也就是我們說的消費(fèi)者。
在Stream里配置一個(gè)消費(fèi)組非常簡單,下一小節(jié)我就帶大家去做一個(gè)Demo。在這里我就先小劇透一點(diǎn)內(nèi)容好了:
spring.cloud.stream.bindings.group-producer.group=Group-A
看破不說破,這里面是什么含義,且聽下節(jié)分享。
消費(fèi)分區(qū)
消費(fèi)分區(qū)消費(fèi)組,傻傻分不清楚。這兩個(gè)名字聽起來很像,其實(shí)并不是一碼事,消費(fèi)組相當(dāng)于是每組派一個(gè)代表去辦事兒,而消費(fèi)分區(qū)相當(dāng)于是專事專辦,也就是說,所有消息都會(huì)根據(jù)分區(qū)Key進(jìn)行劃分,帶有相同Key的消息只能被同一個(gè)消費(fèi)者處理。
我們來看下面的消息分區(qū)例子:
[外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機(jī)制,建議將圖片保存下來直接上傳(img-PmRdO4BG-1614935805027)(https://raw.githubusercontent.com/eddie-code/springcloud-demo-dec/develop/stream/.README_images/319e2d7e.png)]
消息分區(qū)有一個(gè)預(yù)定義的分區(qū)Key,它是一個(gè)SpEL表達(dá)式(想想前面哪一章節(jié)講過SpEL?提示換一下,Key Resolver)。我們需要在配置文件中指定分區(qū)的總個(gè)數(shù)N,Stream就會(huì)為我們創(chuàng)建N個(gè)分區(qū),這里面每個(gè)分區(qū)就是一個(gè)Queue(可以在RabbitMQ管理界面中看到所有的分區(qū)隊(duì)列)。
當(dāng)商品發(fā)布的消息被生產(chǎn)者發(fā)布時(shí),Stream會(huì)計(jì)算得出分區(qū)Key,從而決定這個(gè)消息應(yīng)該加入到哪個(gè)Queue里面。在這個(gè)過程中,每個(gè)消費(fèi)組/消費(fèi)者僅會(huì)連接到一個(gè)Queue,這個(gè)Queue中對(duì)應(yīng)的消息只能被特定的消費(fèi)組/消費(fèi)者來處理。
2-11 基于消費(fèi)組實(shí)現(xiàn)輪循單播功能
創(chuàng)建 Producer和Consumer
配置消費(fèi)組, 啟動(dòng)兩個(gè)節(jié)點(diǎn)
RabbitMQ界面單播和廣播在Exchange中的不同
消費(fèi)分區(qū)的配置項(xiàng)
創(chuàng)建 GroupTopic
com.example.springcloud.topic.GroupTopic
public interface GroupTopic {
/**
* Input channel name.
*/
String INPUT = "group-consumer";
/**
* Output channel name.
*/
String OUTPUT = "group-producer";
/**
* input=消費(fèi)者
*/
@Input(INPUT)
SubscribableChannel input();
/**
* output=生產(chǎn)者
*/
@Output(OUTPUT)
MessageChannel output();
}
com.example.springcloud.biz.controller.DemoController
@Autowired
private GroupTopic groupTopicProducer; // StreamConsumer 沒有綁定前是找不到 標(biāo)記紅色波浪線
@PostMapping("sendToGroup")
public void sendMessageToGroup(@RequestParam(value = "body") String body) {
groupTopicProducer.output().send(MessageBuilder.withPayload(body).build());
}
com.example.springcloud.biz.StreamConsumer
@Slf4j
@EnableBinding(
value = { GroupTopic.class }
)
public class StreamConsumer {
@StreamListener(GroupTopic.INPUT)
public void consumeGroupMessage(Object payload) {
log.info("Gourp message consumed successfully, payload={}", payload);
}
}
---
# 消息分組示例
spring:
cloud:
stream:
bindings:
group-consumer: # 消費(fèi)者綁定
destination: group-topic
group: Group-A
group-producer: # 生產(chǎn)者綁定
destination: group-topic
---
# 消費(fèi)分區(qū)配置
spring:
cloud:
stream:
bindings:
group-consumer: # com.example.springcloud.topic.GroupTopic
consumer:
partitioned: true # 打開消費(fèi)者的消費(fèi)分區(qū)功能
group-producer:
producer:
partition-count: 2 # 兩個(gè)消息分區(qū)
# SpEL (Key resolver) 可以定義復(fù)雜表達(dá)式生成Key
# 我們這里用最簡化的配置,只有索引參數(shù)為 1 的節(jié)點(diǎn)(消費(fèi)者),才能消費(fèi)消息 ***
partition-key-expression: "1"
instanceCount: 2 # 當(dāng)前消費(fèi)者實(shí)例總數(shù)
instanceIndex: 1 # 最大值 instanceCount-1,當(dāng)前實(shí)例的索引號(hào) ***
啟動(dòng)與測(cè)試
1、StreamApplication (63000) : Group-A-0
修改 “spring.cloud.stream.instanceIndex=0”
2、StreamApplication (63001) : Group-A-1
修改 “spring.cloud.stream.instanceIndex=1”
3、使用PostMan測(cè)試
1、localhost:63000/sendToGroup
2、Body (x-www-form-urfencoded)
3、body:Test 測(cè)試 1234
如何指定:
通過消息分區(qū)實(shí)現(xiàn):
請(qǐng)求后 Group-A-1 的控制臺(tái)會(huì)出現(xiàn)打印信息 "Test 測(cè)試 1234",
無論請(qǐng)求多少次都會(huì)在 Group-A-1 打印,
為什么呢?
因?yàn)樵O(shè)置了 "partition-key-expression: "1"" 指定消費(fèi)
TIPS: 比如已經(jīng)指定了 Group-A-1 端口 63000, 再啟動(dòng)多個(gè) Group-A-1 端口 63001, 然后再次請(qǐng)求, 會(huì)發(fā)現(xiàn)他們是依次輪詢打印到控制臺(tái)
spring.cloud.stream.bindings.group-consumer.group=Group-A 重點(diǎn)是這個(gè)分組配置來區(qū)分
2-13 Stream+ MQ插件實(shí)現(xiàn)延遲消息
配置插件, 重啟RabbitMQ
創(chuàng)建 Producer 和 Consumer, 配置exchange-type
添加Message Header傳遞延遲時(shí)間
啟動(dòng)查看效果
RabbitMQ部分
部署與安裝插件
Docker - rabbitmq:3.6.15 部署
Docker - rabbitmq:3.6.15 部署 (備份地址)
延遲消息 - 官方插件版本
參考資料 - 安裝插件
(1) 終端直接下載 (部署的版本是:3.6.15)
[root@k8s-master1 opt]# wget https://dl.bintray.com/rabbitmq/community-plugins/3.6.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171215-3.6.x.zip
--2021-03-01 22:28:32-- https://dl.bintray.com/rabbitmq/community-plugins/3.6.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171215-3.6.x.zip
Resolving dl.bintray.com (dl.bintray.com)... 44.239.142.179, 52.10.12.153, 52.32.247.225, ...
Connecting to dl.bintray.com (dl.bintray.com)|44.239.142.179|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 29853 (29K) [application/zip]
Saving to: ‘rabbitmq_delayed_message_exchange-20171215-3.6.x.zip’
100%[==============================================================================================================================================================================================>] 29,853 73.7KB/s in 0.4s
(2) 解壓
[root@k8s-master1 opt]# unzip rabbitmq_delayed_message_exchange-20171215-3.6.x.zip
Archive: rabbitmq_delayed_message_exchange-20171215-3.6.x.zip
inflating: rabbitmq_delayed_message_exchange-20171215-3.6.x.ez
(3) 拷貝到容器里
[root@k8s-master1 ~]# docker cp /opt/rabbitmq_delayed_message_exchange-20171215-3.6.x.ez myrabbit1:/opt
(4) 進(jìn)入容器
[root@k8s-master1 opt]# docker exec -it myrabbit1 bash
root@rabbit1:/# cp /opt/rabbitmq_delayed_message_exchange-20171215-3.6.x.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.15/plugins
(5) 從 opt 到插件 plugins 里
root@rabbit1:/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.15/plugins# cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.15/sbin
root@rabbit1:/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.15/sbin# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
The following plugins have been enabled:
rabbitmq_delayed_message_exchange
Applying plugin configuration to rabbit@rabbit1... started 1 plugin.
(6) 查看 rabbitmq_delayed_message_exchange 是否安裝成功
root@rabbit1:/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.15/sbin# rabbitmq-plugins list
Configured: E = explicitly enabled; e = implicitly enabled
| Status: * = running on rabbit@rabbit1
|/
[e*] amqp_client 3.6.15
[e*] cowboy 1.0.4
[e*] cowlib 1.0.2
[ ] rabbitmq_amqp1_0 3.6.15
[ ] rabbitmq_auth_backend_ldap 3.6.15
[ ] rabbitmq_auth_mechanism_ssl 3.6.15
[ ] rabbitmq_consistent_hash_exchange 3.6.15
[E*] rabbitmq_delayed_message_exchange 20171215-3.6.x
[ ] rabbitmq_event_exchange 3.6.15
[ ] rabbitmq_federation 3.6.15
[ ] rabbitmq_federation_management 3.6.15
[ ] rabbitmq_jms_topic_exchange 3.6.15
[E*] rabbitmq_management 3.6.15
[e*] rabbitmq_management_agent 3.6.15
[ ] rabbitmq_management_visualiser 3.6.15
[ ] rabbitmq_mqtt 3.6.15
[ ] rabbitmq_random_exchange 3.6.15
[ ] rabbitmq_recent_history_exchange 3.6.15
[ ] rabbitmq_sharding 3.6.15
[ ] rabbitmq_shovel 3.6.15
[ ] rabbitmq_shovel_management 3.6.15
[ ] rabbitmq_stomp 3.6.15
[ ] rabbitmq_top 3.6.15
[ ] rabbitmq_tracing 3.6.15
[ ] rabbitmq_trust_store 3.6.15
[e*] rabbitmq_web_dispatch 3.6.15
[ ] rabbitmq_web_mqtt 3.6.15
[ ] rabbitmq_web_mqtt_examples 3.6.15
[ ] rabbitmq_web_stomp 3.6.15
[ ] rabbitmq_web_stomp_examples 3.6.15
[ ] sockjs 0.3.4
(7) Reboot Rabbitmq
方式一
docker restart myrabbit1 myrabbit2 myrabbit3
方式二
docker exec -it myrabbit1 bash
rabbitmqctl stop
rabbitmq-server
(8) 訪問 WEB UI
http://192.168.8.240:15672
[外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機(jī)制,建議將圖片保存下來直接上傳(img-ywYxkyFJ-1614935805029)(.README_images/8967dfec.png)]
Quick start
(1) 創(chuàng)建Topic
public interface DelayedTopic {
/**
* Input channel name.
*/
String INPUT = "delayed-consumer";
/**
* Output channel name.
*/
String OUTPUT = "delayed-producer";
/**
* input=消費(fèi)者
*/
@Input(INPUT)
SubscribableChannel input();
/**
* output=生產(chǎn)者
*/
@Output(OUTPUT)
MessageChannel output();
}
(2) 創(chuàng)建請(qǐng)求接口
com.example.springcloud.biz.controller.DemoController
@PostMapping("sendDM")
public void sendDelayedMessage(@RequestParam(value = "body") String body,
@RequestParam(value = "seconds") Integer seconds) {
MessageBean msg = new MessageBean();
msg.setPayload(body);
log.info("[{}]秒后準(zhǔn)備發(fā)送延遲消息",seconds);
delayedTopicProducer.output().send(
MessageBuilder.withPayload(msg)
.setHeader("x-delay", seconds * 1000)
.build()
);
}
(3) 消費(fèi)者創(chuàng)建打印MessageBaen信息
com.example.springcloud.biz.StreamConsumer
@StreamListener(DelayedTopic.INPUT)
public void consumeDelayedMessage(MessageBean bean) {
log.info("Delayed message consumed successfully, payload={}", bean.getPayload());
}
(4) application.yml
# 延遲消息配置
spring:
cloud:
stream:
bindings:
delayed-consumer:
destination: delayed-topic
delayed-producer:
destination: delayed-topic
rabbit:
bindings:
delayed-producer:
producer:
delayed-exchange: true # 延遲隊(duì)列
(4) PostMan請(qǐng)求測(cè)試

控制臺(tái)打印 (38-23) 剛好 15s
2021-03-03 14:29:23.172 INFO 16512 --- [io-63000-exec-1] c.e.s.biz.controller.DemoController : [15]秒后準(zhǔn)備發(fā)送延遲消息
2021-03-03 14:29:23.184 INFO 16512 --- [io-63000-exec-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.8.240:5672]
2021-03-03 14:29:23.194 INFO 16512 --- [io-63000-exec-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory.publisher#3d78cf08:0/SimpleConnection@4898ec6c [delegate=amqp://[email protected]:5672/, localPort= 11225]
2021-03-03 14:29:23.197 INFO 16512 --- [io-63000-exec-1] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable, auto-delete, or exclusive Queue (input.anonymous.FLQqBEtsQ_-ti45RQP4C5g) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2021-03-03 14:29:23.197 INFO 16512 --- [io-63000-exec-1] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable, auto-delete, or exclusive Queue (broadcast.anonymous.sVMurJRTTPmzbJeiv2_YCA) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2021-03-03 14:29:23.197 INFO 16512 --- [io-63000-exec-1] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable, auto-delete, or exclusive Queue (delayed-topic.anonymous.zRO1l6z8R8yoRe-iHDcfMA) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2021-03-03 14:29:38.244 INFO 16512 --- [8yoRe-iHDcfMA-1] c.e.springcloud.biz.StreamConsumer : Delayed message consumed successfully, payload=歡迎關(guān)注:https://blog.csdn.net/eddielee9217

2-14 Stream本地重試功能
創(chuàng)建Producer和Consumer, 在Consumer中拋出異常
設(shè)置重試次數(shù)
重試成功和失敗的表現(xiàn)
異常重試(單機(jī)版)
(1) 創(chuàng)建 ErrorTopic
com.example.springcloud.topic.ErrorTopic
public interface ErrorTopic {
/**
* Input channel name.
*/
String INPUT = "error-consumer";
/**
* Output channel name.
*/
String OUTPUT = "error-producer";
/**
* input=消費(fèi)者
*/
@Input(INPUT)
SubscribableChannel input();
/**
* output=生產(chǎn)者
*/
@Output(OUTPUT)
MessageChannel output();
}
(2) 創(chuàng)建入口
com.example.springcloud.biz.controller.DemoController
@PostMapping("sendError")
public void sendErrorMessage(@RequestParam(value = "body") String body) {
MessageBean msg = new MessageBean();
msg.setPayload(body);
errorTopicProducer.output().send(
MessageBuilder.withPayload(msg).build()
);
}
(3) 創(chuàng)建消費(fèi)
com.example.springcloud.biz.StreamConsumer
@StreamListener(ErrorTopic.INPUT)
public void consumeErrorMessage(MessageBean bean) {
log.info("你還好嗎?");
// 每次都自增一 當(dāng)你被三整除就放行
if (count.incrementAndGet() % 3 == 0) {
log.info("很好,謝謝。你呢?");
// 成功消費(fèi)以后, 就會(huì)清零
count.set(0);
} else {
log.info("你怎么回事啊?");
throw new RuntimeException("我不好~");
}
}
(4) application.yml
spring:
cloud:
stream:
bindings:
error-consumer: # com.example.springcloud.topic.ErrorTopic
destination: error-out-topic
# 重試次數(shù)(本機(jī)重試)
# 次數(shù)=1 相當(dāng)于不重試 (不生效), 至少等于=2 才生效
consumer:
max-attempts: 2
error-producer:
destination: error-out-topic
(5) PostMan測(cè)試
POST localhost:63000/sendError
body:歡迎關(guān)注:https://blog.csdn.net/eddielee9217
第一次請(qǐng)求控制臺(tái)打印:
2021-03-03 22:32:01.928 INFO 3232 --- [fWtFNuaMvhNrQ-1] c.e.springcloud.biz.StreamConsumer : 你還好嗎?
2021-03-03 22:32:01.928 INFO 3232 --- [fWtFNuaMvhNrQ-1] c.e.springcloud.biz.StreamConsumer : 你怎么回事啊?
2021-03-03 22:32:02.931 INFO 3232 --- [fWtFNuaMvhNrQ-1] c.e.springcloud.biz.StreamConsumer : 你還好嗎?
2021-03-03 22:32:02.931 INFO 3232 --- [fWtFNuaMvhNrQ-1] c.e.springcloud.biz.StreamConsumer : 很好,謝謝。你呢?
第二次請(qǐng)求控制臺(tái)打印:
2021-03-03 22:46:32.802 INFO 3232 --- [fWtFNuaMvhNrQ-1] c.e.springcloud.biz.StreamConsumer : 你還好嗎?
2021-03-03 22:46:32.803 INFO 3232 --- [fWtFNuaMvhNrQ-1] c.e.springcloud.biz.StreamConsumer : 你怎么回事啊?
2021-03-03 22:46:33.804 INFO 3232 --- [fWtFNuaMvhNrQ-1] c.e.springcloud.biz.StreamConsumer : 你還好嗎?
2021-03-03 22:46:33.804 INFO 3232 --- [fWtFNuaMvhNrQ-1] c.e.springcloud.biz.StreamConsumer : 你怎么回事啊?
2021-03-03 22:46:33.807 ERROR 3232 --- [fWtFNuaMvhNrQ-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Exception thrown while invoking com.example.springcloud.biz.StreamConsumer#consumeErrorMessage[1 args]; nested exception is java.lang.RuntimeException: 我不好~, failedMessage=GenericMessage [payload=byte[63], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=error-out-topic, amqp_deliveryTag=2, deliveryAttempt=2, amqp_consumerQueue=error-out-topic.anonymous._hDU9IsTSfWtFNuaMvhNrQ, amqp_redelivered=false, amqp_receivedRoutingKey=error-out-topic, amqp_timestamp=Wed Mar 03 22:46:32 CST 2021, amqp_messageId=6aa4565e-5b6b-ac90-d68e-58d3ec0b0800, id=e372c416-0c2f-6bc6-1509-53dac5f87167, amqp_consumerTag=amq.ctag-WsxmwGdQJ4yHUbdoHvQtdw, contentType=application/json, timestamp=1614782792802}]
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:64)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1100(AmqpInboundChannelAdapter.java:57)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:211)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:208)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1552)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1478)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1466)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1461)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1410)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:854)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1137)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1043)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: 我不好~
at com.example.springcloud.biz.StreamConsumer.consumeErrorMessage(StreamConsumer.java:88)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
... 29 more
分析:
第一次: 3 % 3 = 0
2021-03-03 22:57:59.771 INFO 6548 --- [zmiYknwj6_Azw-1] c.e.springcloud.biz.StreamConsumer : 你還好嗎?
2021-03-03 22:58:02.970 INFO 6548 --- [zmiYknwj6_Azw-1] c.e.springcloud.biz.StreamConsumer : 很好,謝謝。你呢?
第二次: 1 % 3 = 1
2021-03-03 22:58:49.909 INFO 6548 --- [zmiYknwj6_Azw-1] c.e.springcloud.biz.StreamConsumer : 你還好嗎?
false
2021-03-03 22:59:39.875 INFO 6548 --- [zmiYknwj6_Azw-1] c.e.springcloud.biz.StreamConsumer : 你怎么回事啊?
2021-03-03 22:59:40.876 INFO 6548 --- [zmiYknwj6_Azw-1] c.e.springcloud.biz.StreamConsumer : 你還好嗎?
********
第二次請(qǐng)求,也是不成功 1 不等于 0 就會(huì)自動(dòng)重試機(jī)制, 就會(huì)打印下面的報(bào)錯(cuò)信息
為什么是第二次呢? 因?yàn)榕渲茫簊pring.cloud.stream.bindings.error-consumer.consumer.max-attempts=2
********
false
2021-03-03 23:00:43.011 INFO 6548 --- [zmiYknwj6_Azw-1] c.e.springcloud.biz.StreamConsumer : 你怎么回事啊?
2021-03-03 23:00:43.012 ERROR 6548 --- [zmiYknwj6_Azw-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Exception thrown while invoking com.example.springcloud.biz.StreamConsumer#consumeErrorMessage[1 args]; nested exception is java.lang.RuntimeException: 我不好~, failedMessage=GenericMessage [payload=byte[63], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=error-out-topic, amqp_deliveryTag=6, deliveryAttempt=2, amqp_consumerQueue=error-out-topic.anonymous.dt-kEM12TzmiYknwj6_Azw, amqp_redelivered=false, amqp_receivedRoutingKey=error-out-topic, amqp_timestamp=Wed Mar 03 22:58:49 CST 2021, amqp_messageId=2c3f4678-6690-0a84-c5e7-fcf916bdf39c, id=a98a82f7-261d-ba23-d365-a4e1af91a390, amqp_consumerTag=amq.ctag-AatP0-GItUPPvBi92sxuPg, contentType=application/json, timestamp=1614783529909}]
2-16 Stream實(shí)現(xiàn)Requeue操作
re-queue(重新入隊(duì)): 是指失敗的消息, 放回到 RabbitMQ 當(dāng)中, 然后讓消費(fèi)者的集群重新拉取消息.
創(chuàng)建Producer和Consumer
開啟 Re-queue功能 ( retry配置有沖突 )
側(cè)首 Re-queue在不同節(jié)點(diǎn)的消費(fèi)情況
創(chuàng)建主題
com.example.springcloud.topic.RequeueTopic
public interface RequeueTopic {
/**
* Input channel name.
*/
String INPUT = "requeue-consumer";
/**
* Output channel name.
*/
String OUTPUT = "requeue-producer";
/**
* input=消費(fèi)者
*/
@Input(INPUT)
SubscribableChannel input();
/**
* output=生產(chǎn)者
*/
@Output(OUTPUT)
MessageChannel output();
}
創(chuàng)建生產(chǎn)者 (Producer)
@PostMapping("requeue")
public void sendErrorMessageToMq(@RequestParam(value = "body") String body) {
MessageBean msg = new MessageBean();
msg.setPayload(body);
requeueTopicProducer.output().send(MessageBuilder.withPayload(msg).build());
}
創(chuàng)建消費(fèi)者(Consumer)
@StreamListener(RequeueTopic.INPUT)
public void requeueErrorMessage(MessageBean bean) {
log.info("Are you OK?");
try {
Thread.sleep(3000L);
} catch (Exception e) {
}
throw new RuntimeException("I'm not OK");
}
配置 Re-queue功能
# 異常消息(re-queue重試)
#
spring:
cloud:
stream:
bindings:
requeue-consumer:
destination: requeue-topic
group: requeue-group
consumer:
max-attempts: 1 # 強(qiáng)制 retry 次數(shù)指定=1 不讓你在原地 retry 把失敗消息退回到 rabbit 里在消費(fèi)
requeue-producer:
destination: requeue-topic
rabbit:
bindings:
requeue-consumer:
consumer:
requeueRejected: true # 僅對(duì)當(dāng)前requeue-consumer,開啟requeue
---
# 默認(rèn)全局開啟requeue
#spring:
# rabbitmq:
# listener:
# default-requeue-rejected: true
測(cè)試
本次Demo是無限循環(huán)來測(cè)試, 每隔三秒一次, 也會(huì)在兩個(gè)服務(wù)之間輪詢打印(在負(fù)載均衡環(huán)境下也是同理效果)
啟動(dòng)服務(wù)
StreamApplication (63000) :63000/
StreamApplication (63001) :63001/
PostMan
POST localhost:63000/requeue
body:歡迎關(guān)注:https://blog.csdn.net/eddielee9217


2-18 借助死信隊(duì)列實(shí)現(xiàn)異常處理
死信隊(duì)列介紹
使用 rabbitmq-plugins enable 命令開啟RabbitMQ插件
rabbitmq_shovel
rabbitmq_shovel_management
創(chuàng)建Producer和Consumer, 配置死信隊(duì)列
啟動(dòng)應(yīng)用, 查看RabbitMQ界面的死信隊(duì)列
死信隊(duì)列消息重新消費(fèi)
死信隊(duì)列 (DLQ)
(1) 介紹
死信隊(duì)列:DLX,dead-letter-exchange
利用DLX,當(dāng)消息在一個(gè)隊(duì)列中變成死信 (dead message) 之后,它能被重新publish到另一個(gè)Exchange,這個(gè)Exchange就是DLX
(2) 消息變成死信有以下幾種情況
消息被拒絕(basic.reject / basic.nack),并且requeue = false
消息TTL過期
隊(duì)列達(dá)到最大長度
(3) 死信處理過程
DLX也是一個(gè)正常的Exchange,和一般的Exchange沒有區(qū)別,它能在任何的隊(duì)列上被指定,實(shí)際上就是設(shè)置某個(gè)隊(duì)列的屬性。
當(dāng)這個(gè)隊(duì)列中有死信時(shí),RabbitMQ就會(huì)自動(dòng)的將這個(gè)消息重新發(fā)布到設(shè)置的Exchange上去,進(jìn)而被路由到另一個(gè)隊(duì)列。
可以監(jiān)聽這個(gè)隊(duì)列中的消息做相應(yīng)的處理。
(4)
開啟插件
基本默認(rèn)已經(jīng)安裝,只要開啟就可以
(1) 進(jìn)入容器, 查看插件狀態(tài)
[root@k8s-master1 ~]# docker exec -it myrabbit1 bash
root@rabbit1:/# rabbitmq-plugins list |grep 'rabbitmq_shovel'
[ ] rabbitmq_shovel 3.6.15
[ ] rabbitmq_shovel_management 3.6.15
(2) 開啟插件
root@rabbit1:/# rabbitmq-plugins enable rabbitmq_shovel
The following plugins have been enabled:
rabbitmq_shovel
Applying plugin configuration to rabbit@rabbit1... started 1 plugin.
root@rabbit1:/# rabbitmq-plugins enable rabbitmq_shovel_management
The following plugins have been enabled:
rabbitmq_shovel_management
Applying plugin configuration to rabbit@rabbit1... started 1 plugin.
(3) 確認(rèn)是否開啟插件
root@rabbit1:/# rabbitmq-plugins list |grep 'rabbitmq_shovel'
[E*] rabbitmq_shovel 3.6.15
[E*] rabbitmq_shovel_management 3.6.15
主題
com.example.springcloud.topic.DlqTopic
public interface DlqTopic {
/**
* Input channel name.
*/
String INPUT = "dlq-consumer";
/**
* Output channel name.
*/
String OUTPUT = "dlq-producer";
/**
* input=消費(fèi)者
*/
@Input(INPUT)
SubscribableChannel input();
/**
* output=生產(chǎn)者
*/
@Output(OUTPUT)
MessageChannel output();
}
Producer
com.example.springcloud.biz.controller.DemoController
@PostMapping("dlq")
public void sendMessageToDlq(@RequestParam(value = "body") String body) {
MessageBean msg = new MessageBean();
msg.setPayload(body);
dlqTopicProducer.output().send(MessageBuilder.withPayload(msg).build());
}
Consumer
com.example.springcloud.biz.StreamConsumer
@StreamListener(DlqTopic.INPUT)
public void consumeDlqMessage(MessageBean bean) {
log.info("Dlq - 你還好嗎?");
if (count.incrementAndGet() % 3 == 0) {
log.info("Dlq - 很好,謝謝。你呢?");
} else {
log.info("Dlq - 你怎么回事啊?");
throw new RuntimeException("我不好~");
}
}
死信隊(duì)列配置
spring:
cloud:
stream:
bindings:
dlq-consumer:
destination: dlq-topic
consumer:
max-attempts: 2
group: dlq-group
dlq-producer:
destination: dlq-topic
rabbit:
bindings:
dlq-consumer:
consumer:
auto-bind-dlq: true # 開啟死信隊(duì)列(默認(rèn) topic.dlq)
# 參數(shù)還有很多,比如:指定某個(gè)Queue 而不是使用自動(dòng)創(chuàng)建的等等...
測(cè)試環(huán)節(jié)
啟動(dòng)服務(wù)
StreamApplication (63000) :63000/
訪問網(wǎng)頁 http://192.168.8.240:15672/#/queues
[外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機(jī)制,建議將圖片保存下來直接上傳(img-xZuks5bH-1614935805037)(.README_images/539a4508.png)]
PostMan
POST localhost:63000/dlq
控制臺(tái)打印
2021-03-04 16:31:55.013 INFO 22640 --- [io-63000-exec-2] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.8.240:5672]
2021-03-04 16:31:55.022 INFO 22640 --- [io-63000-exec-2] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory.publisher#1d503bab:0/SimpleConnection@ce71016 [delegate=amqp://[email protected]:5672/, localPort= 2986]
2021-03-04 16:31:55.025 INFO 22640 --- [io-63000-exec-2] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable, auto-delete, or exclusive Queue (input.anonymous.wFTQGtlIT72X_ay6ogInmQ) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2021-03-04 16:31:55.025 INFO 22640 --- [io-63000-exec-2] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable, auto-delete, or exclusive Queue (broadcast.anonymous.9qkp5d-qSpCdWhEJHzuojQ) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2021-03-04 16:31:55.025 INFO 22640 --- [io-63000-exec-2] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable, auto-delete, or exclusive Queue (delayed-topic.anonymous.l1w0SIs3RbadrUSwJYkcQw) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2021-03-04 16:31:55.025 INFO 22640 --- [io-63000-exec-2] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable, auto-delete, or exclusive Queue (error-out-topic.anonymous.gOKMzyg-SKCYIv4ZvRNM7A) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2021-03-04 16:31:55.094 INFO 22640 --- [pic.dlq-group-1] c.e.springcloud.biz.StreamConsumer : Dlq - 你還好嗎?
2021-03-04 16:31:55.094 INFO 22640 --- [pic.dlq-group-1] c.e.springcloud.biz.StreamConsumer : Dlq - 你怎么回事啊?
2021-03-04 16:31:56.097 INFO 22640 --- [pic.dlq-group-1] c.e.springcloud.biz.StreamConsumer : Dlq - 你還好嗎?
2021-03-04 16:31:56.097 INFO 22640 --- [pic.dlq-group-1] c.e.springcloud.biz.StreamConsumer : Dlq - 很好,謝謝。你呢?
再次請(qǐng)求
POST localhost:63000/dlq
控制臺(tái)打印
2021-03-04 16:33:32.370 INFO 22640 --- [pic.dlq-group-1] c.e.springcloud.biz.StreamConsumer : Dlq - 你還好嗎?
2021-03-04 16:33:32.371 INFO 22640 --- [pic.dlq-group-1] c.e.springcloud.biz.StreamConsumer : Dlq - 你怎么回事啊?
2021-03-04 16:33:33.372 INFO 22640 --- [pic.dlq-group-1] c.e.springcloud.biz.StreamConsumer : Dlq - 你還好嗎?
2021-03-04 16:33:33.372 INFO 22640 --- [pic.dlq-group-1] c.e.springcloud.biz.StreamConsumer : Dlq - 你怎么回事啊?
2021-03-04 16:33:33.375 ERROR 22640 --- [pic.dlq-group-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Exception thrown while invoking com.example.springcloud.biz.StreamConsumer#consumeDlqMessage[1 args]; nested exception is java.lang.RuntimeException: 我不好~, failedMessage=GenericMessage [payload=byte[63], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=dlq-topic, amqp_deliveryTag=2, deliveryAttempt=2, amqp_consumerQueue=dlq-topic.dlq-group, amqp_redelivered=false, amqp_receivedRoutingKey=dlq-topic, amqp_timestamp=Thu Mar 04 16:33:32 CST 2021, amqp_messageId=a3b4012e-4060-12f7-a022-7f640dbf2a58, id=60e3aeda-c236-79b1-c100-fc3ff6a8d540, amqp_consumerTag=amq.ctag-kGeRaKUKk_7lJjOYrlTIeA, contentType=application/json, timestamp=1614846812370}]
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:64)
...
Caused by: java.lang.RuntimeException: 我不好~
at com.example.springcloud.biz.StreamConsumer.consumeDlqMessage(StreamConsumer.java:121) ~[classes/:na]
...
會(huì)發(fā)現(xiàn)這次就拋出異常了.
然后在查看頁面, 會(huì)發(fā)現(xiàn) Queue dlq-topic.dlq-group.dlq 已經(jīng)有變化了

將這死信隊(duì)列里面的消息, 重新激活消費(fèi), 可以復(fù)制Queue里面的名稱進(jìn)行重新激活消費(fèi)

點(diǎn)擊 Move Message 后控制臺(tái)會(huì)打印正常消息
2021-03-04 17:29:42.034 INFO 22640 --- [pic.dlq-group-1] c.e.springcloud.biz.StreamConsumer : Dlq - 你還好嗎?
2021-03-04 17:29:42.038 INFO 22640 --- [pic.dlq-group-1] c.e.springcloud.biz.StreamConsumer : Dlq - 很好,謝謝。你呢?
Queue dlq-topic.dlq-group.dlq 的總數(shù)也歸零, 證明消費(fèi)了!
2-19 消息驅(qū)動(dòng)中的降級(jí)和接口升版
借助spring-integration實(shí)現(xiàn)Fallback邏輯
Consumer升級(jí)版的玩法
(1) 主題
public interface FallbackTopic {
/**
* Input channel name.
*/
String INPUT = "fallback-consumer";
/**
* Output channel name.
*/
String OUTPUT = "fallback-producer";
/**
* input=消費(fèi)者
*/
@Input(INPUT)
SubscribableChannel input();
/**
* output=生產(chǎn)者
*/
@Output(OUTPUT)
MessageChannel output();
}
(2) 生產(chǎn)者
@PostMapping("fallback")
public void sendMessageToFallback(
@RequestParam(value = "body") String body,
@RequestParam(value = "version", defaultValue = "1.0") String version) {
MessageBean msg = new MessageBean();
msg.setPayload(body);
fallbackTopicProducer.output().send(
MessageBuilder.withPayload(msg)
.setHeader("version", version)
.build()
);
}
(3) 消費(fèi)者
/**
* Fallback + 升級(jí)版本
* @param bean
* @param version
*/
@StreamListener(FallbackTopic.INPUT)
public void goodbyeBadGuy(MessageBean bean,
@Header("version") String version) {
log.info("Fallback - 你還好嗎?");
if ("1.0".equalsIgnoreCase(version)) {
log.info("Fallback - 很好,謝謝。你呢");
} else if ("2.0".equalsIgnoreCase(version)) {
log.info("Fallback - 不支持的版本");
throw new RuntimeException("我不好");
} else {
log.info("Fallback - 版本={}", version);
}
}
/**
* 降級(jí)流程
*
* input channel -> fallback-topic.fallback-group.errors
*
* 對(duì)應(yīng) application.yml 里面參數(shù)
*
* 如果出現(xiàn)異常和重試次數(shù)達(dá)到一定就會(huì)跳到這個(gè)方法
*
* @param message
*/
@ServiceActivator(inputChannel = "fallback-topic.fallback-group.errors")
public void fallback(Message<?> message) {
log.info("fallback - 已回退");
// 可以寫自己邏輯, 或者流程~
}
(4) 配置
# Fallback配置
# input channel -> fallback-topic.fallback-group.errors
spring:
cloud:
stream:
bindings:
fallback-consumer:
destination: fallback-topic
consumer:
max-attempts: 2
group: fallback-group
fallback-producer:
destination: fallback-topic
(5) 測(cè)試
第一次請(qǐng)求
POST localhost:63000/fallback
body:歡迎關(guān)注:https://blog.csdn.net/eddielee9217
version:1.0
2021-03-05 15:39:02.824 INFO 15472 --- [allback-group-1] c.e.springcloud.biz.StreamConsumer : Fallback - 你還好嗎?
2021-03-05 15:39:02.824 INFO 15472 --- [allback-group-1] c.e.springcloud.biz.StreamConsumer : Fallback - 很好,謝謝。你呢
第二次請(qǐng)求
POST localhost:63000/fallback
body:歡迎關(guān)注:https://blog.csdn.net/eddielee9217
version:2.0
2021-03-05 15:39:13.131 INFO 15472 --- [allback-group-1] c.e.springcloud.biz.StreamConsumer : Fallback - 你還好嗎?
2021-03-05 15:39:13.131 INFO 15472 --- [allback-group-1] c.e.springcloud.biz.StreamConsumer : Fallback - 不支持的版本
2021-03-05 15:39:14.135 INFO 15472 --- [allback-group-1] c.e.springcloud.biz.StreamConsumer : Fallback - 你還好嗎?
2021-03-05 15:39:14.135 INFO 15472 --- [allback-group-1] c.e.springcloud.biz.StreamConsumer : Fallback - 不支持的版本
2021-03-05 15:39:14.139 INFO 15472 --- [allback-group-1] c.e.springcloud.biz.StreamConsumer : fallback - 已回退
Code Download
————————————————
版權(quán)聲明:本文為CSDN博主「eddie_k2」的原創(chuàng)文章,遵循CC 4.0 BY-SA版權(quán)協(xié)議,轉(zhuǎn)載請(qǐng)附上原文出處鏈接及本聲明。
原文鏈接:
https://blog.csdn.net/eddielee9217/article/details/114404174
粉絲福利:Java從入門到入土學(xué)習(xí)路線圖
??????

??長按上方微信二維碼 2 秒
感謝點(diǎn)贊支持下哈 
