<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          SpringCloud 整合Stream實(shí)戰(zhàn)

          共 54439字,需瀏覽 109分鐘

           ·

          2021-03-08 19:20

          點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號(hào)”

          優(yōu)質(zhì)文章,第一時(shí)間送達(dá)

          76套java從入門到精通實(shí)戰(zhàn)課程分享

          前言

          2-4 Stream快速入門-集成MQ消費(fèi)

          • 創(chuàng)建 stream-sample 項(xiàng)目, 引入依賴

          • 創(chuàng)建監(jiān)聽器 (聲明和綁定信道)

          • 從RabbitMQ觸發(fā)消息

          RabbitMQ 界面操作

            1、http://192.168.8.240:15672 (guest/guest)

            2、查看 Queue input.anonymous.Z5LyfIlEQtqw3hcJTAhHfA

            3、發(fā)送消息

                  1、下方 Publish message

                  2、Payload: 自定義內(nèi)容

                  3、點(diǎn)擊按鈕 Publish message

                  4、IDEA控制臺(tái)就會(huì)提示:“message consumed successfully, payload=自定義內(nèi)容”

          2-8 基于發(fā)布訂閱實(shí)現(xiàn)廣播功能

          • 創(chuàng)建消息生產(chǎn)者Producer服務(wù), 配置消息主題

          • 啟動(dòng)多個(gè)消費(fèi)者Consumer節(jié)點(diǎn)測(cè)試消息廣播

          • RabbitMQ界面查看廣播組(Exchanges)

          自定義主題 (Topic)

          com.example.springcloud.topic.MyTopic

          public interface MyTopic {

           /**
            * Input channel name.
            */
           String INPUT = "myTopic-consumer";

           /**
            * Output channel name.
            */
           String OUTPUT = "myTopic-producer";

           /**
            * input=消費(fèi)者
            */
           @Input(INPUT)
           SubscribableChannel input();

           /**
            * output=生產(chǎn)者
            */
           @Output(OUTPUT)
           MessageChannel output();

          }

          添加消費(fèi)者

          com.example.springcloud.biz.StreamConsumer

          @Slf4j
          // 綁定信道
          @EnableBinding(
                  value = {
                          Sink.class,
              MyTopic.class
                  }
          )
          public class StreamConsumer {

              @StreamListener(Sink.INPUT)
           public void consume(Object payload) {
            log.info("message consumed successfully, payload={}", payload);
           }

           @StreamListener(MyTopic.INPUT)
           public void consumeMyMessage(Object payload) {
            log.info("my message consumed successfully, payload={}", payload);
           }

          }

          使用配置文件, 綁定生產(chǎn)者和消費(fèi)者的通道

          application.yml

          # 綁定 Channel 到 broadcast
          spring:
            cloud:
              stream:
                bindings:
                  myTopic-consumer:  # 消費(fèi)者綁定
                    destination: broadcast # rabbitMq界面顯示 Exchange
                  myTopic-producer:  # 生產(chǎn)者綁定
                    destination: broadcast

          啟動(dòng)與測(cè)試

          (1) 按不同端口啟動(dòng)

          • StreamApplication (63000) :63000/

          • StreamApplication (63001) :63001/


          (2) Postman (demo - 最簡單的生產(chǎn)者消費(fèi)者)

          • POST localhost:63000/send

          • Body (x-www-form-urfencoded)

                 body:hello broadcast

          • 63000、630001 控制臺(tái)打印:

                 my message consumed successfully, payload=hello broadcast


          (3) RabbitMQ WEB

          • 打開 http://192.168.8.240:15672

          • 查看頂部 Exchanges 下面是否存在 “broadcast”

          • 查看 Bindings (每一個(gè)Queues都對(duì)應(yīng)后臺(tái)一個(gè)監(jiān)聽隊(duì)列)

                      broadcast.anonymous.DIILcrP3SvaGEUv6dfiAqQ

                      broadcast.anonymous.UnlUchdPQnaavW5uBIQjEA

          • 查看頂部 Queues 是否存在對(duì)應(yīng) Bindings

                      點(diǎn)擊 broadcast.anonymous.DIILcrP3SvaGEUv6dfiAqQ

                     進(jìn)入后, 點(diǎn)擊 Publish message 輸入"queues test"

                     返回 IDEA控制臺(tái) 就會(huì)顯示該條 Message

                             my message consumed successfully, payload=queues test


          2-10 消費(fèi)組和消息分區(qū)詳解

          消費(fèi)組

          前面我們接觸的都是廣播場(chǎng)景,話說這個(gè)廣播模式簡直就是個(gè)圍觀模式,所有訂閱相同主題的消費(fèi)者都眼巴巴看著生產(chǎn)者發(fā)布的消息,一個(gè)消息在所有節(jié)點(diǎn)都要被消費(fèi)一遍。如果我只想挑一個(gè)節(jié)點(diǎn)來消費(fèi)消息,而且又不能只逮著一只羊來薅羊毛,必須利用負(fù)載均衡來分發(fā)請(qǐng)求。這個(gè)Stream能不能辦到呢?

          這不就是單播模式嗎,那自然不在話下,Stream里的消費(fèi)組就是專門解決這個(gè)問題的。讓我們來用一個(gè)案例說明它的工作模式:



          在上面這個(gè)例子中,“商品發(fā)布”就是一個(gè)消息,它被放到了對(duì)應(yīng)的消息隊(duì)列中,有兩撥人馬同時(shí)盯著這個(gè)Topic,這兩撥人馬各自組成了一個(gè)Group,每次有新消息到來的時(shí)候,每個(gè)Group就派出一個(gè)代表去響應(yīng),而且是從這個(gè)Group中輪流挑選代表(負(fù)載均衡),這里的Group也就是我們說的消費(fèi)者。

          在Stream里配置一個(gè)消費(fèi)組非常簡單,下一小節(jié)我就帶大家去做一個(gè)Demo。在這里我就先小劇透一點(diǎn)內(nèi)容好了:


          spring.cloud.stream.bindings.group-producer.group=Group-A


          看破不說破,這里面是什么含義,且聽下節(jié)分享。


          消費(fèi)分區(qū)

          消費(fèi)分區(qū)消費(fèi)組,傻傻分不清楚。這兩個(gè)名字聽起來很像,其實(shí)并不是一碼事,消費(fèi)組相當(dāng)于是每組派一個(gè)代表去辦事兒,而消費(fèi)分區(qū)相當(dāng)于是專事專辦,也就是說,所有消息都會(huì)根據(jù)分區(qū)Key進(jìn)行劃分,帶有相同Key的消息只能被同一個(gè)消費(fèi)者處理。

          我們來看下面的消息分區(qū)例子:


          [外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機(jī)制,建議將圖片保存下來直接上傳(img-PmRdO4BG-1614935805027)(https://raw.githubusercontent.com/eddie-code/springcloud-demo-dec/develop/stream/.README_images/319e2d7e.png)]


          消息分區(qū)有一個(gè)預(yù)定義的分區(qū)Key,它是一個(gè)SpEL表達(dá)式(想想前面哪一章節(jié)講過SpEL?提示換一下,Key Resolver)。我們需要在配置文件中指定分區(qū)的總個(gè)數(shù)N,Stream就會(huì)為我們創(chuàng)建N個(gè)分區(qū),這里面每個(gè)分區(qū)就是一個(gè)Queue(可以在RabbitMQ管理界面中看到所有的分區(qū)隊(duì)列)。

          當(dāng)商品發(fā)布的消息被生產(chǎn)者發(fā)布時(shí),Stream會(huì)計(jì)算得出分區(qū)Key,從而決定這個(gè)消息應(yīng)該加入到哪個(gè)Queue里面。在這個(gè)過程中,每個(gè)消費(fèi)組/消費(fèi)者僅會(huì)連接到一個(gè)Queue,這個(gè)Queue中對(duì)應(yīng)的消息只能被特定的消費(fèi)組/消費(fèi)者來處理。


          2-11 基于消費(fèi)組實(shí)現(xiàn)輪循單播功能

          • 創(chuàng)建 Producer和Consumer

          • 配置消費(fèi)組, 啟動(dòng)兩個(gè)節(jié)點(diǎn)

          • RabbitMQ界面單播和廣播在Exchange中的不同

          • 消費(fèi)分區(qū)的配置項(xiàng)

          創(chuàng)建 GroupTopic

          com.example.springcloud.topic.GroupTopic

          public interface GroupTopic {

           /**
            * Input channel name.
            */
           String INPUT = "group-consumer";

           /**
            * Output channel name.
            */
           String OUTPUT = "group-producer";

           /**
            * input=消費(fèi)者
            */
           @Input(INPUT)
           SubscribableChannel input();

           /**
            * output=生產(chǎn)者
            */
           @Output(OUTPUT)
           MessageChannel output();

          }

          com.example.springcloud.biz.controller.DemoController

          @Autowired
          private GroupTopic groupTopicProducer;  // StreamConsumer 沒有綁定前是找不到 標(biāo)記紅色波浪線

          @PostMapping("sendToGroup")
          public void sendMessageToGroup(@RequestParam(value = "body") String body) {
              groupTopicProducer.output().send(MessageBuilder.withPayload(body).build());
          }

          com.example.springcloud.biz.StreamConsumer

          @Slf4j
          @EnableBinding(
                  value = { GroupTopic.class }
          )
          public class StreamConsumer {

           @StreamListener(GroupTopic.INPUT)
           public void consumeGroupMessage(Object payload) {
            log.info("Gourp message consumed successfully, payload={}", payload);
           }

          }

          ---
          # 消息分組示例
          spring:
            cloud:
              stream:
                bindings:
                  group-consumer:  # 消費(fèi)者綁定
                    destination: group-topic
                    group: Group-A
                  group-producer:  # 生產(chǎn)者綁定
                    destination: group-topic

          ---
          # 消費(fèi)分區(qū)配置
          spring:
            cloud:
              stream:
                bindings:
                  group-consumer: # com.example.springcloud.topic.GroupTopic
                    consumer:
                      partitioned: true # 打開消費(fèi)者的消費(fèi)分區(qū)功能
                  group-producer:
                    producer:
                      partition-count: 2 # 兩個(gè)消息分區(qū)
                      # SpEL (Key resolver) 可以定義復(fù)雜表達(dá)式生成Key
                      # 我們這里用最簡化的配置,只有索引參數(shù)為 1 的節(jié)點(diǎn)(消費(fèi)者),才能消費(fèi)消息 ***
                      partition-key-expression: "1"
                instanceCount: 2 # 當(dāng)前消費(fèi)者實(shí)例總數(shù)
                instanceIndex: 1 # 最大值 instanceCount-1,當(dāng)前實(shí)例的索引號(hào) ***

          啟動(dòng)與測(cè)試

          1、StreamApplication (63000) : Group-A-0

                 修改 “spring.cloud.stream.instanceIndex=0”

          2、StreamApplication (63001) : Group-A-1

                 修改 “spring.cloud.stream.instanceIndex=1”

          3、使用PostMan測(cè)試

                1、localhost:63000/sendToGroup

                2、Body (x-www-form-urfencoded)

                3、body:Test 測(cè)試 1234

          如何指定:
          通過消息分區(qū)實(shí)現(xiàn):
          請(qǐng)求后 Group-A-1 的控制臺(tái)會(huì)出現(xiàn)打印信息 "Test 測(cè)試 1234"
          無論請(qǐng)求多少次都會(huì)在 Group-A-1 打印,
          為什么呢? 
          因?yàn)樵O(shè)置了 "partition-key-expression: "1"" 指定消費(fèi)

          TIPS: 比如已經(jīng)指定了 Group-A-1 端口 63000, 再啟動(dòng)多個(gè) Group-A-1 端口 63001, 然后再次請(qǐng)求, 會(huì)發(fā)現(xiàn)他們是依次輪詢打印到控制臺(tái)

          spring.cloud.stream.bindings.group-consumer.group=Group-A 重點(diǎn)是這個(gè)分組配置來區(qū)分


          2-13 Stream+ MQ插件實(shí)現(xiàn)延遲消息

          配置插件, 重啟RabbitMQ

          創(chuàng)建 Producer 和 Consumer, 配置exchange-type

          添加Message Header傳遞延遲時(shí)間

          啟動(dòng)查看效果

          RabbitMQ部分

          部署與安裝插件

          • Docker - rabbitmq:3.6.15 部署

          • Docker - rabbitmq:3.6.15 部署 (備份地址)

          • 延遲消息 - 官方插件版本

          • 參考資料 - 安裝插件

          (1)   終端直接下載 (部署的版本是:3.6.15)

          [root@k8s-master1 opt]# wget https://dl.bintray.com/rabbitmq/community-plugins/3.6.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171215-3.6.x.zip
          --2021-03-01 22:28:32--  https://dl.bintray.com/rabbitmq/community-plugins/3.6.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171215-3.6.x.zip
          Resolving dl.bintray.com (dl.bintray.com)... 44.239.142.179, 52.10.12.153, 52.32.247.225, ...
          Connecting to dl.bintray.com (dl.bintray.com)|44.239.142.179|:443... connected.
          HTTP request sent, awaiting response... 200 OK
          Length: 29853 (29K) [application/zip]
          Saving to: ‘rabbitmq_delayed_message_exchange-20171215-3.6.x.zip’

          100%[==============================================================================================================================================================================================>] 29,853      73.7KB/s   in 0.4s   

          (2)   解壓

          [root@k8s-master1 opt]# unzip rabbitmq_delayed_message_exchange-20171215-3.6.x.zip
          Archive:  rabbitmq_delayed_message_exchange-20171215-3.6.x.zip
            inflating: rabbitmq_delayed_message_exchange-20171215-3.6.x.ez  

          (3)   拷貝到容器里

          [root@k8s-master1 ~]# docker cp /opt/rabbitmq_delayed_message_exchange-20171215-3.6.x.ez myrabbit1:/opt

          (4)   進(jìn)入容器

          [root@k8s-master1 opt]# docker exec -it myrabbit1 bash

          root@rabbit1:/# cp /opt/rabbitmq_delayed_message_exchange-20171215-3.6.x.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.15/plugins

          (5)   從 opt 到插件 plugins 里

          root@rabbit1:/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.15/plugins# cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.15/sbin

          root@rabbit1:/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.15/sbin# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
          The following plugins have been enabled:
            rabbitmq_delayed_message_exchange

          Applying plugin configuration to rabbit@rabbit1... started 1 plugin.

          (6)   查看 rabbitmq_delayed_message_exchange 是否安裝成功

          root@rabbit1:/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.15/sbin# rabbitmq-plugins list
           Configured: E = explicitly enabled; e = implicitly enabled
           | Status:   * = running on rabbit@rabbit1
           |/
          [e*] amqp_client                       3.6.15
          [e*] cowboy                            1.0.4
          [e*] cowlib                            1.0.2
          [  ] rabbitmq_amqp1_0                  3.6.15
          [  ] rabbitmq_auth_backend_ldap        3.6.15
          [  ] rabbitmq_auth_mechanism_ssl       3.6.15
          [  ] rabbitmq_consistent_hash_exchange 3.6.15
          [E*] rabbitmq_delayed_message_exchange 20171215-3.6.x
          [  ] rabbitmq_event_exchange           3.6.15
          [  ] rabbitmq_federation               3.6.15
          [  ] rabbitmq_federation_management    3.6.15
          [  ] rabbitmq_jms_topic_exchange       3.6.15
          [E*] rabbitmq_management               3.6.15
          [e*] rabbitmq_management_agent         3.6.15
          [  ] rabbitmq_management_visualiser    3.6.15
          [  ] rabbitmq_mqtt                     3.6.15
          [  ] rabbitmq_random_exchange          3.6.15
          [  ] rabbitmq_recent_history_exchange  3.6.15
          [  ] rabbitmq_sharding                 3.6.15
          [  ] rabbitmq_shovel                   3.6.15
          [  ] rabbitmq_shovel_management        3.6.15
          [  ] rabbitmq_stomp                    3.6.15
          [  ] rabbitmq_top                      3.6.15
          [  ] rabbitmq_tracing                  3.6.15
          [  ] rabbitmq_trust_store              3.6.15
          [e*] rabbitmq_web_dispatch             3.6.15
          [  ] rabbitmq_web_mqtt                 3.6.15
          [  ] rabbitmq_web_mqtt_examples        3.6.15
          [  ] rabbitmq_web_stomp                3.6.15
          [  ] rabbitmq_web_stomp_examples       3.6.15
          [  ] sockjs                            0.3.4

          (7)   Reboot Rabbitmq

          方式一
          docker restart myrabbit1 myrabbit2 myrabbit3

          方式二
          docker exec -it myrabbit1 bash
          rabbitmqctl stop
          rabbitmq-server

          (8)   訪問 WEB UI

          http://192.168.8.240:15672

          [外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機(jī)制,建議將圖片保存下來直接上傳(img-ywYxkyFJ-1614935805029)(.README_images/8967dfec.png)]


          Quick start

          (1)   創(chuàng)建Topic

          public interface DelayedTopic {

           /**
            * Input channel name.
            */
           String INPUT = "delayed-consumer";

           /**
            * Output channel name.
            */
           String OUTPUT = "delayed-producer";

           /**
            * input=消費(fèi)者
            */
           @Input(INPUT)
           SubscribableChannel input();

           /**
            * output=生產(chǎn)者
            */
           @Output(OUTPUT)
           MessageChannel output();

          }

          (2)   創(chuàng)建請(qǐng)求接口


          com.example.springcloud.biz.controller.DemoController

          @PostMapping("sendDM")
          public void sendDelayedMessage(@RequestParam(value = "body") String body,
                  @RequestParam(value = "seconds") Integer seconds) {

              MessageBean msg = new MessageBean();
              msg.setPayload(body);

              log.info("[{}]秒后準(zhǔn)備發(fā)送延遲消息",seconds);

              delayedTopicProducer.output().send(
                      MessageBuilder.withPayload(msg)
                              .setHeader("x-delay", seconds * 1000)
                              .build()
              );
          }

          (3)   消費(fèi)者創(chuàng)建打印MessageBaen信息


          com.example.springcloud.biz.StreamConsumer

          @StreamListener(DelayedTopic.INPUT)
          public void consumeDelayedMessage(MessageBean bean) {
              log.info("Delayed message consumed successfully, payload={}", bean.getPayload());
          }

          (4)   application.yml

          # 延遲消息配置
          spring:
            cloud:
              stream:
                bindings:
                  delayed-consumer:
                    destination: delayed-topic
                  delayed-producer:
                    destination: delayed-topic
                rabbit:
                  bindings:
                    delayed-producer:
                      producer:
                        delayed-exchange: true # 延遲隊(duì)列

          (4)   PostMan請(qǐng)求測(cè)試



          控制臺(tái)打印 (38-23) 剛好 15s

          2021-03-03 14:29:23.172  INFO 16512 --- [io-63000-exec-1] c.e.s.biz.controller.DemoController      : [15]秒后準(zhǔn)備發(fā)送延遲消息
          2021-03-03 14:29:23.184  INFO 16512 --- [io-63000-exec-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.8.240:5672]
          2021-03-03 14:29:23.194  INFO 16512 --- [io-63000-exec-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory.publisher#3d78cf08:0/SimpleConnection@4898ec6c [delegate=amqp://[email protected]:5672/, localPort= 11225]
          2021-03-03 14:29:23.197  INFO 16512 --- [io-63000-exec-1] o.s.amqp.rabbit.core.RabbitAdmin         : Auto-declaring a non-durable, auto-delete, or exclusive Queue (input.anonymous.FLQqBEtsQ_-ti45RQP4C5g) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
          2021-03-03 14:29:23.197  INFO 16512 --- [io-63000-exec-1] o.s.amqp.rabbit.core.RabbitAdmin         : Auto-declaring a non-durable, auto-delete, or exclusive Queue (broadcast.anonymous.sVMurJRTTPmzbJeiv2_YCA) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
          2021-03-03 14:29:23.197  INFO 16512 --- [io-63000-exec-1] o.s.amqp.rabbit.core.RabbitAdmin         : Auto-declaring a non-durable, auto-delete, or exclusive Queue (delayed-topic.anonymous.zRO1l6z8R8yoRe-iHDcfMA) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.

          2021-03-03 14:29:38.244 INFO 16512 --- [8yoRe-iHDcfMA-1] c.e.springcloud.biz.StreamConsumer       : Delayed message consumed successfully, payload=歡迎關(guān)注:https://blog.csdn.net/eddielee9217

          2-14 Stream本地重試功能

          • 創(chuàng)建Producer和Consumer, 在Consumer中拋出異常

          • 設(shè)置重試次數(shù)

          • 重試成功和失敗的表現(xiàn)


          異常重試(單機(jī)版)

          (1)   創(chuàng)建 ErrorTopic

          com.example.springcloud.topic.ErrorTopic

          public interface ErrorTopic {

           /**
            * Input channel name.
            */
           String INPUT = "error-consumer";

           /**
            * Output channel name.
            */
           String OUTPUT = "error-producer";

           /**
            * input=消費(fèi)者
            */
           @Input(INPUT)
           SubscribableChannel input();

           /**
            * output=生產(chǎn)者
            */
           @Output(OUTPUT)
           MessageChannel output();

          }

          (2)   創(chuàng)建入口


          com.example.springcloud.biz.controller.DemoController

          @PostMapping("sendError")
          public void sendErrorMessage(@RequestParam(value = "body") String body) {
              MessageBean msg = new MessageBean();
              msg.setPayload(body);
              errorTopicProducer.output().send(
                      MessageBuilder.withPayload(msg).build()
              );
          }

          (3)   創(chuàng)建消費(fèi)


          com.example.springcloud.biz.StreamConsumer

          @StreamListener(ErrorTopic.INPUT)
          public void consumeErrorMessage(MessageBean bean) {
              log.info("你還好嗎?");
              // 每次都自增一 當(dāng)你被三整除就放行
              if (count.incrementAndGet() % 3 == 0) {
                  log.info("很好,謝謝。你呢?");
                  // 成功消費(fèi)以后, 就會(huì)清零
                  count.set(0);
              } else {
                  log.info("你怎么回事啊?");
                  throw new RuntimeException("我不好~");
              }
          }

          (4)   application.yml

          spring:
            cloud:
              stream:
                bindings:
                  error-consumer: # com.example.springcloud.topic.ErrorTopic
                    destination: error-out-topic
                    # 重試次數(shù)(本機(jī)重試)
                    # 次數(shù)=1 相當(dāng)于不重試 (不生效), 至少等于=2 才生效
                    consumer:
                      max-attempts: 2
                  error-producer:
                    destination: error-out-topic

          (5)   PostMan測(cè)試

          POST localhost:63000/sendError

          body:歡迎關(guān)注:https://blog.csdn.net/eddielee9217

          第一次請(qǐng)求控制臺(tái)打印:

          2021-03-03 22:32:01.928  INFO 3232 --- [fWtFNuaMvhNrQ-1] c.e.springcloud.biz.StreamConsumer       : 你還好嗎?
          2021-03-03 22:32:01.928 INFO 3232 --- [fWtFNuaMvhNrQ-1] c.e.springcloud.biz.StreamConsumer       : 你怎么回事啊?
          2021-03-03 22:32:02.931 INFO 3232 --- [fWtFNuaMvhNrQ-1] c.e.springcloud.biz.StreamConsumer       : 你還好嗎?
          2021-03-03 22:32:02.931 INFO 3232 --- [fWtFNuaMvhNrQ-1] c.e.springcloud.biz.StreamConsumer       : 很好,謝謝。你呢?

          第二次請(qǐng)求控制臺(tái)打印:

          2021-03-03 22:46:32.802  INFO 3232 --- [fWtFNuaMvhNrQ-1] c.e.springcloud.biz.StreamConsumer       : 你還好嗎?
          2021-03-03 22:46:32.803 INFO 3232 --- [fWtFNuaMvhNrQ-1] c.e.springcloud.biz.StreamConsumer       : 你怎么回事啊?
          2021-03-03 22:46:33.804 INFO 3232 --- [fWtFNuaMvhNrQ-1] c.e.springcloud.biz.StreamConsumer       : 你還好嗎?
          2021-03-03 22:46:33.804 INFO 3232 --- [fWtFNuaMvhNrQ-1] c.e.springcloud.biz.StreamConsumer       : 你怎么回事啊?
          2021-03-03 22:46:33.807 ERROR 3232 --- [fWtFNuaMvhNrQ-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Exception thrown while invoking com.example.springcloud.biz.StreamConsumer#consumeErrorMessage[1 args]; nested exception is java.lang.RuntimeException: 我不好~, failedMessage=GenericMessage [payload=byte[63], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=error-out-topic, amqp_deliveryTag=2, deliveryAttempt=2, amqp_consumerQueue=error-out-topic.anonymous._hDU9IsTSfWtFNuaMvhNrQ, amqp_redelivered=false, amqp_receivedRoutingKey=error-out-topic, amqp_timestamp=Wed Mar 03 22:46:32 CST 2021, amqp_messageId=6aa4565e-5b6b-ac90-d68e-58d3ec0b0800, id=e372c416-0c2f-6bc6-1509-53dac5f87167, amqp_consumerTag=amq.ctag-WsxmwGdQJ4yHUbdoHvQtdw, contentType=application/json, timestamp=1614782792802}]
           at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:64)
           at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
           at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
           at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
           at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
           at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
           at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
           at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
           at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
           at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
           at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
           at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
           at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
           at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
           at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1100(AmqpInboundChannelAdapter.java:57)
           at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:211)
           at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
           at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180)
           at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:208)
           at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1552)
           at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1478)
           at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1466)
           at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1461)
           at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1410)
           at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870)
           at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:854)
           at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78)
           at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1137)
           at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1043)
           at java.lang.Thread.run(Thread.java:748)
          Caused by: java.lang.RuntimeException: 我不好~
           at com.example.springcloud.biz.StreamConsumer.consumeErrorMessage(StreamConsumer.java:88)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
           at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
           at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
           ... 29 more

          分析:

          第一次:  3 % 3 = 0

          2021-03-03 22:57:59.771 INFO 6548 --- [zmiYknwj6_Azw-1] c.e.springcloud.biz.StreamConsumer       : 你還好嗎?
          2021-03-03 22:58:02.970  INFO 6548 --- [zmiYknwj6_Azw-1] c.e.springcloud.biz.StreamConsumer       : 很好,謝謝。你呢?


          第二次:  1 % 3 = 1

          2021-03-03 22:58:49.909 INFO 6548 --- [zmiYknwj6_Azw-1] c.e.springcloud.biz.StreamConsumer       : 你還好嗎?
          false
          2021-03-03 22:59:39.875 INFO 6548 --- [zmiYknwj6_Azw-1] c.e.springcloud.biz.StreamConsumer       : 你怎么回事啊?
          2021-03-03 22:59:40.876 INFO 6548 --- [zmiYknwj6_Azw-1] c.e.springcloud.biz.StreamConsumer       : 你還好嗎?

          ********
          第二次請(qǐng)求,也是不成功 1 不等于 0 就會(huì)自動(dòng)重試機(jī)制, 就會(huì)打印下面的報(bào)錯(cuò)信息

          為什么是第二次呢?  因?yàn)榕渲茫簊pring.cloud.stream.bindings.error-consumer.consumer.max-attempts=2
          ********

          false
          2021-03-03 23:00:43.011 INFO 6548 --- [zmiYknwj6_Azw-1] c.e.springcloud.biz.StreamConsumer       : 你怎么回事啊?
          2021-03-03 23:00:43.012 ERROR 6548 --- [zmiYknwj6_Azw-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Exception thrown while invoking com.example.springcloud.biz.StreamConsumer#consumeErrorMessage[1 args]; nested exception is java.lang.RuntimeException: 我不好~, failedMessage=GenericMessage [payload=byte[63], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=error-out-topic, amqp_deliveryTag=6, deliveryAttempt=2, amqp_consumerQueue=error-out-topic.anonymous.dt-kEM12TzmiYknwj6_Azw, amqp_redelivered=false, amqp_receivedRoutingKey=error-out-topic, amqp_timestamp=Wed Mar 03 22:58:49 CST 2021, amqp_messageId=2c3f4678-6690-0a84-c5e7-fcf916bdf39c, id=a98a82f7-261d-ba23-d365-a4e1af91a390, amqp_consumerTag=amq.ctag-AatP0-GItUPPvBi92sxuPg, contentType=application/json, timestamp=1614783529909}]


          2-16 Stream實(shí)現(xiàn)Requeue操作

          re-queue(重新入隊(duì)): 是指失敗的消息, 放回到 RabbitMQ 當(dāng)中, 然后讓消費(fèi)者的集群重新拉取消息.


          • 創(chuàng)建Producer和Consumer

          • 開啟 Re-queue功能 ( retry配置有沖突 )

          • 側(cè)首 Re-queue在不同節(jié)點(diǎn)的消費(fèi)情況

          創(chuàng)建主題

          com.example.springcloud.topic.RequeueTopic

          public interface RequeueTopic {

           /**
            * Input channel name.
            */
           String INPUT = "requeue-consumer";

           /**
            * Output channel name.
            */
           String OUTPUT = "requeue-producer";

           /**
            * input=消費(fèi)者
            */
           @Input(INPUT)
           SubscribableChannel input();

           /**
            * output=生產(chǎn)者
            */
           @Output(OUTPUT)
           MessageChannel output();

          }

          創(chuàng)建生產(chǎn)者 (Producer)

          @PostMapping("requeue")
          public void sendErrorMessageToMq(@RequestParam(value = "body") String body) {
              MessageBean msg = new MessageBean();
              msg.setPayload(body);
              requeueTopicProducer.output().send(MessageBuilder.withPayload(msg).build());
          }

          創(chuàng)建消費(fèi)者(Consumer)

          @StreamListener(RequeueTopic.INPUT)
          public void requeueErrorMessage(MessageBean bean) {
              log.info("Are you OK?");
              try {
                  Thread.sleep(3000L);
              } catch (Exception e) {
              }
               throw new RuntimeException("I'm not OK");
          }

          配置 Re-queue功能

          # 異常消息(re-queue重試)
          #
          spring:
            cloud:
              stream:
                bindings:
                  requeue-consumer:
                    destination: requeue-topic
                    group: requeue-group
                    consumer:
                      max-attempts: 1 # 強(qiáng)制 retry 次數(shù)指定=1 不讓你在原地 retry 把失敗消息退回到 rabbit 里在消費(fèi)
                  requeue-producer:
                    destination: requeue-topic
                rabbit:
                  bindings:
                    requeue-consumer:
                      consumer:
                        requeueRejected: true # 僅對(duì)當(dāng)前requeue-consumer,開啟requeue

          ---
          # 默認(rèn)全局開啟requeue
          #spring:
          #  rabbitmq:
          #    listener:
          #      default-requeue-rejected: true

          測(cè)試

          本次Demo是無限循環(huán)來測(cè)試, 每隔三秒一次, 也會(huì)在兩個(gè)服務(wù)之間輪詢打印(在負(fù)載均衡環(huán)境下也是同理效果)


          啟動(dòng)服務(wù)


          StreamApplication (63000) :63000/

          StreamApplication (63001) :63001/

          PostMan

          POST localhost:63000/requeue

          body:歡迎關(guān)注:https://blog.csdn.net/eddielee9217


          2-18 借助死信隊(duì)列實(shí)現(xiàn)異常處理

          • 死信隊(duì)列介紹

          • 使用 rabbitmq-plugins enable 命令開啟RabbitMQ插件

                        rabbitmq_shovel

                        rabbitmq_shovel_management

          • 創(chuàng)建Producer和Consumer, 配置死信隊(duì)列

          • 啟動(dòng)應(yīng)用, 查看RabbitMQ界面的死信隊(duì)列

          • 死信隊(duì)列消息重新消費(fèi)

          死信隊(duì)列 (DLQ)

          (1)   介紹

          • 死信隊(duì)列:DLX,dead-letter-exchange

          • 利用DLX,當(dāng)消息在一個(gè)隊(duì)列中變成死信 (dead message) 之后,它能被重新publish到另一個(gè)Exchange,這個(gè)Exchange就是DLX


          (2)   消息變成死信有以下幾種情況

          • 消息被拒絕(basic.reject / basic.nack),并且requeue = false

          • 消息TTL過期

          • 隊(duì)列達(dá)到最大長度


          (3)   死信處理過程

          • DLX也是一個(gè)正常的Exchange,和一般的Exchange沒有區(qū)別,它能在任何的隊(duì)列上被指定,實(shí)際上就是設(shè)置某個(gè)隊(duì)列的屬性。

          • 當(dāng)這個(gè)隊(duì)列中有死信時(shí),RabbitMQ就會(huì)自動(dòng)的將這個(gè)消息重新發(fā)布到設(shè)置的Exchange上去,進(jìn)而被路由到另一個(gè)隊(duì)列。

          • 可以監(jiān)聽這個(gè)隊(duì)列中的消息做相應(yīng)的處理。


          (4)  

          開啟插件

          基本默認(rèn)已經(jīng)安裝,只要開啟就可以


          (1)   進(jìn)入容器, 查看插件狀態(tài)

          [root@k8s-master1 ~]# docker exec -it myrabbit1 bash
          root@rabbit1:/# rabbitmq-plugins list |grep 'rabbitmq_shovel'
          [  ] rabbitmq_shovel                   3.6.15
          [  ] rabbitmq_shovel_management        3.6.15

          (2)   開啟插件

          root@rabbit1:/# rabbitmq-plugins enable rabbitmq_shovel
          The following plugins have been enabled:
            rabbitmq_shovel

          Applying plugin configuration to rabbit@rabbit1... started 1 plugin.

          root@rabbit1:/# rabbitmq-plugins enable rabbitmq_shovel_management 
          The following plugins have been enabled:
            rabbitmq_shovel_management

          Applying plugin configuration to rabbit@rabbit1... started 1 plugin.

          (3)   確認(rèn)是否開啟插件

          root@rabbit1:/# rabbitmq-plugins list |grep 'rabbitmq_shovel'
          [E*] rabbitmq_shovel                   3.6.15
          [E*] rabbitmq_shovel_management        3.6.15

          主題

          com.example.springcloud.topic.DlqTopic

          public interface DlqTopic {

           /**
            * Input channel name.
            */
           String INPUT = "dlq-consumer";

           /**
            * Output channel name.
            */
           String OUTPUT = "dlq-producer";

           /**
            * input=消費(fèi)者
            */
           @Input(INPUT)
           SubscribableChannel input();

           /**
            * output=生產(chǎn)者
            */
           @Output(OUTPUT)
           MessageChannel output();

          }

          Producer

          com.example.springcloud.biz.controller.DemoController

          @PostMapping("dlq")
          public void sendMessageToDlq(@RequestParam(value = "body") String body) {
              MessageBean msg = new MessageBean();
              msg.setPayload(body);
              dlqTopicProducer.output().send(MessageBuilder.withPayload(msg).build());
          }

          Consumer

          com.example.springcloud.biz.StreamConsumer

          @StreamListener(DlqTopic.INPUT)
          public void consumeDlqMessage(MessageBean bean) {
              log.info("Dlq - 你還好嗎?");
              if (count.incrementAndGet() % 3 == 0) {
                  log.info("Dlq - 很好,謝謝。你呢?");
              } else {
                  log.info("Dlq - 你怎么回事啊?");
                  throw new RuntimeException("我不好~");
              }
          }

          死信隊(duì)列配置

          spring:
            cloud:
              stream:
                bindings:
                  dlq-consumer:
                    destination: dlq-topic
                    consumer:
                      max-attempts: 2
                    group: dlq-group
                  dlq-producer:
                    destination: dlq-topic
                rabbit:
                  bindings:
                    dlq-consumer:
                      consumer:
                        auto-bind-dlq: true # 開啟死信隊(duì)列(默認(rèn) topic.dlq)
                                            # 參數(shù)還有很多,比如:指定某個(gè)Queue 而不是使用自動(dòng)創(chuàng)建的等等...

          測(cè)試環(huán)節(jié)

          • 啟動(dòng)服務(wù)

                   StreamApplication (63000) :63000/

                   訪問網(wǎng)頁 http://192.168.8.240:15672/#/queues

          [外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機(jī)制,建議將圖片保存下來直接上傳(img-xZuks5bH-1614935805037)(.README_images/539a4508.png)]


          • PostMan

                    POST localhost:63000/dlq

          控制臺(tái)打印

          2021-03-04 16:31:55.013  INFO 22640 --- [io-63000-exec-2] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.8.240:5672]
          2021-03-04 16:31:55.022  INFO 22640 --- [io-63000-exec-2] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory.publisher#1d503bab:0/SimpleConnection@ce71016 [delegate=amqp://[email protected]:5672/, localPort= 2986]
          2021-03-04 16:31:55.025  INFO 22640 --- [io-63000-exec-2] o.s.amqp.rabbit.core.RabbitAdmin         : Auto-declaring a non-durable, auto-delete, or exclusive Queue (input.anonymous.wFTQGtlIT72X_ay6ogInmQ) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
          2021-03-04 16:31:55.025  INFO 22640 --- [io-63000-exec-2] o.s.amqp.rabbit.core.RabbitAdmin         : Auto-declaring a non-durable, auto-delete, or exclusive Queue (broadcast.anonymous.9qkp5d-qSpCdWhEJHzuojQ) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
          2021-03-04 16:31:55.025  INFO 22640 --- [io-63000-exec-2] o.s.amqp.rabbit.core.RabbitAdmin         : Auto-declaring a non-durable, auto-delete, or exclusive Queue (delayed-topic.anonymous.l1w0SIs3RbadrUSwJYkcQw) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
          2021-03-04 16:31:55.025  INFO 22640 --- [io-63000-exec-2] o.s.amqp.rabbit.core.RabbitAdmin         : Auto-declaring a non-durable, auto-delete, or exclusive Queue (error-out-topic.anonymous.gOKMzyg-SKCYIv4ZvRNM7A) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
          2021-03-04 16:31:55.094  INFO 22640 --- [pic.dlq-group-1] c.e.springcloud.biz.StreamConsumer       : Dlq - 你還好嗎?
          2021-03-04 16:31:55.094  INFO 22640 --- [pic.dlq-group-1] c.e.springcloud.biz.StreamConsumer       : Dlq - 你怎么回事啊?
          2021-03-04 16:31:56.097  INFO 22640 --- [pic.dlq-group-1] c.e.springcloud.biz.StreamConsumer       : Dlq - 你還好嗎?
          2021-03-04 16:31:56.097 INFO 22640 --- [pic.dlq-group-1] c.e.springcloud.biz.StreamConsumer       : Dlq - 很好,謝謝。你呢?

          • 再次請(qǐng)求

                    POST localhost:63000/dlq

          控制臺(tái)打印

          2021-03-04 16:33:32.370  INFO 22640 --- [pic.dlq-group-1] c.e.springcloud.biz.StreamConsumer       : Dlq - 你還好嗎?
          2021-03-04 16:33:32.371  INFO 22640 --- [pic.dlq-group-1] c.e.springcloud.biz.StreamConsumer       : Dlq - 你怎么回事啊?
          2021-03-04 16:33:33.372  INFO 22640 --- [pic.dlq-group-1] c.e.springcloud.biz.StreamConsumer       : Dlq - 你還好嗎?
          2021-03-04 16:33:33.372  INFO 22640 --- [pic.dlq-group-1] c.e.springcloud.biz.StreamConsumer       : Dlq - 你怎么回事啊?
          2021-03-04 16:33:33.375 ERROR 22640 --- [pic.dlq-group-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Exception thrown while invoking com.example.springcloud.biz.StreamConsumer#consumeDlqMessage[1 args]; nested exception is java.lang.RuntimeException: 我不好~, failedMessage=GenericMessage [payload=byte[63], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=dlq-topic, amqp_deliveryTag=2, deliveryAttempt=2, amqp_consumerQueue=dlq-topic.dlq-group, amqp_redelivered=false, amqp_receivedRoutingKey=dlq-topic, amqp_timestamp=Thu Mar 04 16:33:32 CST 2021, amqp_messageId=a3b4012e-4060-12f7-a022-7f640dbf2a58, id=60e3aeda-c236-79b1-c100-fc3ff6a8d540, amqp_consumerTag=amq.ctag-kGeRaKUKk_7lJjOYrlTIeA, contentType=application/json, timestamp=1614846812370}]
           at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:64)
              ...
          Caused by: java.lang.RuntimeException: 我不好~
           at com.example.springcloud.biz.StreamConsumer.consumeDlqMessage(StreamConsumer.java:121) ~[classes/:na]
              ...

          會(huì)發(fā)現(xiàn)這次就拋出異常了.


          然后在查看頁面, 會(huì)發(fā)現(xiàn) Queue dlq-topic.dlq-group.dlq 已經(jīng)有變化了



          將這死信隊(duì)列里面的消息, 重新激活消費(fèi), 可以復(fù)制Queue里面的名稱進(jìn)行重新激活消費(fèi)



          點(diǎn)擊 Move Message 后控制臺(tái)會(huì)打印正常消息

          2021-03-04 17:29:42.034  INFO 22640 --- [pic.dlq-group-1] c.e.springcloud.biz.StreamConsumer       : Dlq - 你還好嗎?
          2021-03-04 17:29:42.038 INFO 22640 --- [pic.dlq-group-1] c.e.springcloud.biz.StreamConsumer       : Dlq - 很好,謝謝。你呢?

          Queue dlq-topic.dlq-group.dlq 的總數(shù)也歸零, 證明消費(fèi)了!


          2-19 消息驅(qū)動(dòng)中的降級(jí)和接口升版

          • 借助spring-integration實(shí)現(xiàn)Fallback邏輯

          • Consumer升級(jí)版的玩法

          (1)   主題

          public interface FallbackTopic {

           /**
            * Input channel name.
            */
           String INPUT = "fallback-consumer";

           /**
            * Output channel name.
            */
           String OUTPUT = "fallback-producer";

           /**
            * input=消費(fèi)者
            */
           @Input(INPUT)
           SubscribableChannel input();

           /**
            * output=生產(chǎn)者
            */
           @Output(OUTPUT)
           MessageChannel output();

          }

          (2)   生產(chǎn)者

          @PostMapping("fallback")
          public void sendMessageToFallback(
                  @RequestParam(value = "body") String body,
                  @RequestParam(value = "version", defaultValue = "1.0") String version) {

              MessageBean msg = new MessageBean();
              msg.setPayload(body);

              fallbackTopicProducer.output().send(
                      MessageBuilder.withPayload(msg)
                              .setHeader("version", version)
                              .build()
              );
          }

          (3)   消費(fèi)者

          /**
           * Fallback + 升級(jí)版本
           * @param bean
           * @param version
           */
          @StreamListener(FallbackTopic.INPUT)
          public void goodbyeBadGuy(MessageBean bean,
                                    @Header("version") String version) {
              log.info("Fallback - 你還好嗎?");

              if ("1.0".equalsIgnoreCase(version)) {
                  log.info("Fallback - 很好,謝謝。你呢");

              } else if ("2.0".equalsIgnoreCase(version)) {
                  log.info("Fallback - 不支持的版本");
                  throw new RuntimeException("我不好");
              } else {
                  log.info("Fallback - 版本={}", version);
              }
          }

          /**
           * 降級(jí)流程
           *
           * input channel -> fallback-topic.fallback-group.errors
           *
           * 對(duì)應(yīng) application.yml 里面參數(shù)
           *
           * 如果出現(xiàn)異常和重試次數(shù)達(dá)到一定就會(huì)跳到這個(gè)方法
           * 
           * @param message
           */
          @ServiceActivator(inputChannel = "fallback-topic.fallback-group.errors")
          public void fallback(Message<?> message) {
              log.info("fallback - 已回退");
              // 可以寫自己邏輯, 或者流程~ 
          }

          (4)   配置

          # Fallback配置
          # input channel -> fallback-topic.fallback-group.errors
          spring:
            cloud:
              stream:
                bindings:
                  fallback-consumer:
                    destination: fallback-topic
                    consumer:
                      max-attempts: 2
                    group: fallback-group
                  fallback-producer:
                    destination: fallback-topic

          (5)   測(cè)試


          第一次請(qǐng)求

          POST localhost:63000/fallback

          body:歡迎關(guān)注:https://blog.csdn.net/eddielee9217
          version:1.0

          2021-03-05 15:39:02.824  INFO 15472 --- [allback-group-1] c.e.springcloud.biz.StreamConsumer       : Fallback - 你還好嗎?
          2021-03-05 15:39:02.824 INFO 15472 --- [allback-group-1] c.e.springcloud.biz.StreamConsumer       : Fallback - 很好,謝謝。你呢

          第二次請(qǐng)求

          POST localhost:63000/fallback

          body:歡迎關(guān)注:https://blog.csdn.net/eddielee9217
          version:2.0

          2021-03-05 15:39:13.131  INFO 15472 --- [allback-group-1] c.e.springcloud.biz.StreamConsumer       : Fallback - 你還好嗎?
          2021-03-05 15:39:13.131  INFO 15472 --- [allback-group-1] c.e.springcloud.biz.StreamConsumer       : Fallback - 不支持的版本
          2021-03-05 15:39:14.135  INFO 15472 --- [allback-group-1] c.e.springcloud.biz.StreamConsumer       : Fallback - 你還好嗎?
          2021-03-05 15:39:14.135  INFO 15472 --- [allback-group-1] c.e.springcloud.biz.StreamConsumer       : Fallback - 不支持的版本
          2021-03-05 15:39:14.139  INFO 15472 --- [allback-group-1] c.e.springcloud.biz.StreamConsumer       : fallback - 已回退

          Code Download

          ————————————————

          版權(quán)聲明:本文為CSDN博主「eddie_k2」的原創(chuàng)文章,遵循CC 4.0 BY-SA版權(quán)協(xié)議,轉(zhuǎn)載請(qǐng)附上原文出處鏈接及本聲明。

          原文鏈接:

          https://blog.csdn.net/eddielee9217/article/details/114404174





          粉絲福利:Java從入門到入土學(xué)習(xí)路線圖

          ??????

          ??長按上方微信二維碼 2 秒


          感謝點(diǎn)贊支持下哈 

          瀏覽 73
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  97成人人妻一区二区三区 | 大香蕉肏屄 | 成人视频网站18 | 欧美成人视频免费网站 | 亚洲精品456在线播放 |