<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          SpringCloud Stream 沒人用?

          共 7059字,需瀏覽 15分鐘

           ·

          2022-06-30 12:50

          簡介

          SpringCloud Stream 是一個用于構(gòu)建與共享消息系統(tǒng)連接的高度可擴(kuò)展的事件驅(qū)動微服務(wù)組件。它提供了一個靈活的編程模型,基于Spring Boot 建立獨立的生產(chǎn)級 Spring 應(yīng)用程序,并使用 Spring Integration 提供與消息代理的連接可以讓我們在使用時幾乎無需關(guān)心具體的消息隊列實現(xiàn)。它屏蔽底層消息中間件的差異,降低切換成本,統(tǒng)一消息的編程模型,讓開發(fā)人員能夠更多的關(guān)注自己的業(yè)務(wù)。

          架構(gòu)模型

          或許我們也可以看一個更為簡潔的圖

          我們可以看到,每個系統(tǒng)只依賴于自己的 Binder 和消息中間件或者說其他系統(tǒng)交互, Stream 隱藏了所有消息的發(fā)送細(xì)節(jié),對于它來說只關(guān)心三個核心模塊

          • Destination Binders:目標(biāo)綁定器,告訴 Stream 你需要綁定到哪個消息隊列服務(wù)的 Binder 實現(xiàn)即可。例如 RabbitMQ 還是 Kafka 的 Binder?這是它的核心構(gòu)建塊,負(fù)責(zé)支持和提供與我們擁有的外部系統(tǒng)或外部消息傳遞系統(tǒng)的集成

          • Destination Bindings:目的地綁定,把消息生產(chǎn)者和消費者之間的橋梁提供給 Stream 。例如對于 RabbitMQ 來說,你需要告訴 Stream 當(dāng)前系統(tǒng)發(fā)送消息所使用的的 channel -> exchange -> routingKey -> queue 分別是什么(當(dāng)然這些都是在配置文件中完成的)

          • Message:就是我們需要發(fā)送的消息

          對于任何消息來說,只需要提供上述三個核心模塊即可,我們無需去關(guān)心發(fā)送的細(xì)節(jié)。

          直至 SpringCloud Stream 3.2.1 版本,它已經(jīng)支持了幾乎所有市面上流行的消息隊列產(chǎn)品。RabbitMQ、Kafka、RocketMQ、AWS SNS/SQS 等等,主要是因為這種一統(tǒng)江湖的趨勢讓不同的消息中間件廠商都開發(fā)了自己的綁定器 Binder 提供給 SpringCloud Stream。

          初體驗

          下面以 RabbitMQ 為例體驗一下 Stream 消息驅(qū)動開發(fā)。首先我們需要引入依賴

          <dependency>    <groupId>org.springframework.cloud</groupId>    <artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>

          接下來我們需要在配置文件中指定相關(guān)配置,在此之前請確保你對 RabbitMQ 中的組件有一個基本的認(rèn)識,否則請先閱讀 RabbitMQ 基礎(chǔ)篇

          生產(chǎn)者

          配置文件:

          spring:  #消息隊列地址  rabbitmq:    host: 129.204.178.49 #你的 rabbitmq 服務(wù)地址    port: 5672    username: guest    password: guest      cloud:    stream: #SpringCloud Stream 配置      bindings:        output-channel-demo: # channel 消息輸出通道          destination: demo-exchange  # 交換機(jī)          binder: demo-binder  # 綁定器      binders:        demo-binder: #綁定器          type: rabbit # rabbitmq      rabbit:        bindings:          output-channel-demo: # channel 消息輸出通道            producer:          # 生產(chǎn)者              routing-key-expression: '''demoRoutingKey'''

          聲明輸出通道

          /** * 聲明消息輸出通道 channel * */public interface MessageSource {    @Output("output-channel-demo")    MessageChannel output();}

          定義一個通道綁定類

          /** * 該注解用來指定一個或多個定義了 @Input@Output 注解的接口,以此實現(xiàn)對消息通道(Channel)的綁定 * */@EnableBinding(MessageSource.class)public class MessageSourceHandler {
          }

          接下來我們寫一個集成測試發(fā)送消息即可

          @AutowiredMessageSource messageSource;
          /** 發(fā)送消息測試 */@Testpublic void test() { messageSource.output().send(MessageBuilder.withPayload("測試消息").build());}

          此時消息就成功的發(fā)送出去了,接下來我們來寫消費者

          消費者

          配置文件

          spring:  cloud:    stream:      binders:        demo-binder: #綁定器          type: rabbit  #rabbitmq      rabbit:        bindings:          input-channel-demo: #消息輸入通道 channel            consumer:              binding-routing-key: 'demoRoutingKey'      bindings:        input-channel-demo: #消息輸入通道 channel          group: someGroup #防止多個消費者實例重復(fù)接收消息,這樣一條消息只會發(fā)送給相同組的其中一個實例          destination: demo-exchange #交換機(jī)          binder: demo-binder #綁定器  rabbitmq:      host: 129.204.178.49      port: 5672      username: guest      password: guest

          聲明輸入通道

          /** * 聲明消息輸入通道 channel * */public interface MessageSink {
          @Input("input-channel-demo") SubscribableChannel input();}

          聲明綁定類

          @EnableBinding(MessageSink.class)public class MessageSinkHandler {
          /** * 監(jiān)聽 input-channel-demo 通道的消息,該 @StreamListener 注解支持 SPEL 表達(dá)式,但是被標(biāo)注的方法不能有返回值 * */ @StreamListener("input-channel-demo") public void consume(String message){ System.out.println("接受到消息:"+message); }}

          這樣一個完整的 SpringCloud Stream 微服務(wù)消息驅(qū)動的 demo 就完成了,啟動應(yīng)用,消費者能成功的收到生產(chǎn)者發(fā)送的測試消息。要用好 SpringCloud Stream 你必須弄懂配置文件的內(nèi)容!

          GitHub 源碼地址  https://github.com/yanzhisishui/springcloud-stream.git ,或者微信公眾號回復(fù) “SpringCloud Stream 入門案例源碼”。

          發(fā)送延遲消息

          在 SpringCloud Stream 中發(fā)送延遲消息非常簡單,首先我們需要在生產(chǎn)者、消費者的配置文件中指定交換機(jī)的類型是延遲交換機(jī)

          rabbit:  bindings:    input-channel-demo: #消息輸入通道 channel      consumer:        delayed-exchange: true        binding-routing-key: 'demoRoutingKey'

          生產(chǎn)者一樣,這里省略。然后只需要在上面發(fā)送的代碼中加一個 header 即可

          //設(shè)置消息30秒后發(fā)送到消費者messageSource.output().send(MessageBuilder.withPayload("測試消息")        .setHeader("x-delay",30 * 1000).build());

          如果你發(fā)送延遲消息拋出 unknown exchange type 'x-delayed-message' 異常,那么是因為你的 RabbitMQ 服務(wù)沒有安裝延遲隊列插件。去官網(wǎng)安裝一下即可

          這樣一個延遲消息的業(yè)務(wù)就實現(xiàn)了,看到這里你會發(fā)現(xiàn)使用 SpringCloud Stream 整合消息很簡單,例如實際上對于整合 RabbitMQ 來說,幾乎所有的配置都在 RabbitConsumerProperties、RabbitProductProperties 中,生產(chǎn)者和消費者共有的屬性在它們的父類 RabbitCommonProperties中。幾乎 RabbitMQ 的所有特性和功能都可以直接在配置文件中完成。作者能力有限,其他高級特性配置詳情可以參考官網(wǎng) RabbitMQ Consumer Properties  (https://docs.spring.io/spring-cloud-stream-binder-rabbit/docs/3.2.1/reference/html/spring-cloud-stream-binder-rabbit.html#_rabbitmq_consumer_properties )。

          但如果你真這么覺得那你就大錯特錯了,正如 SpringBoot ,用起來很簡單可能只需要花費 20% 的精力,但是想玩的好,可能要付出 200% 的精力。SpringCloud Stream 其實包含了一系列復(fù)雜技術(shù)體系,Spring Intergration、Spring Message、Spring AMQP 等等,其內(nèi)部原理實現(xiàn)、組件的集成非常復(fù)雜。

          我想 SpringCloud Stream 出生這么久還不廣泛流行的原因之一就是,這一套技術(shù)體系涉及的東西太多了,萬一生產(chǎn)環(huán)境出現(xiàn)什么疑難雜癥,需要去閱讀源碼解決的話,這樣的技術(shù)工作量是很超出預(yù)期的。

          Spring Message

          Spring Message 是 Spring Framework 的一個子模塊,它定義了消息的統(tǒng)一編程模型,實際上 SpringCloud Stream 也是基于它實現(xiàn)的統(tǒng)一。

          Spring Message 定義了上圖的消息編程模型,提出了通道 Channel 和 消息 Message 的抽象,所有的消息都由生產(chǎn)者發(fā)送到輸出通道 Output 中給消息中間件,然后所有的消費者都從輸入通道 Input 中獲取消息,而消息 Message 本身由兩部分組成,消息頭 header 和 消息體 payload

          在上述的 初體驗 中,我們涉及到的幾個核心注解正是該模型的體現(xiàn)

          • @Output:代表輸出通道,生產(chǎn)者從這發(fā)出消息

          • @Input:代表輸入通道,消費者從這讀取消息

          • @EnableBinding:將定義通道的接口綁定到某個 Bean 以便于我們可以通過該 Bean 操作通道進(jìn)行發(fā)送和接收消息。

          • @StreamListener:訂閱輸入通道中的消息

          SpringCloud Function 函數(shù)式編程

          在 SpringCloud Stream 3.1 版本之后,你會發(fā)現(xiàn) @EnableBinding 等幾個核心注解被官方標(biāo)注廢棄了,這是因為官方推出了更新的函數(shù)式編程模型 SpringCloud Function,試圖用這個組件將編程推向一個更高的層次。本篇文章不詳細(xì)介紹該組件,簡單介紹在 SpringCloud Stream 中如何結(jié)合 SpringCloud Function 進(jìn)行消息發(fā)送和消費。

          在結(jié)合 SpringCloud Function 時消息的通道命名要遵循以下約定

          • 輸入 : <functionName> + -in- + <index>

          • 輸出 : <functionName> + -out- + <index>

          index 代表輸入或輸出綁定的索引,目前我們直接寫 0 即可。

          任務(wù)型消息

          參考官方文檔 Suppliers (Sources) (https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html#_suppliers_sources),我們開始寫一個生產(chǎn)者的發(fā)送消息方法。

          @Beanpublic Supplier<String> source1() {    return () -> "測試定時消息";}

          然后根據(jù)通道規(guī)則在 application.yml 中配置通道名為 source1-out-0,再配置 spring.cloud.function.destination = source1 ,指定 function 的函數(shù)方法名。

          接下來我們開始寫消費者,同樣我們需要一個消費方法。

          @Beanpublic Consumer<String> sink1() {    return message -> System.out.println("收到消息:" + message);}

          然后根據(jù)通道規(guī)則把配置文件中的通道名改為 sink1-in-0。這樣一個簡單的定時消息的發(fā)送和接收就完成了,生產(chǎn)者會每秒給消費者發(fā)送一條消息,不得不說,SpringCloud Stream 和 SpringCloud Function 的集成真的是......太神奇了。

          業(yè)務(wù)觸發(fā)型消息

          但通常我們更多的應(yīng)用場景是業(yè)務(wù)觸發(fā)發(fā)送消息,所以 SpringCloud Stream 給我們提供了一個 StreamBridge 組件。使用它發(fā)送消息只要指定通道名即可

          @Testpublic void test() {  streamBridge.send("source1-out-0","測試消息");}

          這樣我們就已經(jīng)完成了消息的發(fā)送,消費者還是用上面的消費函數(shù)即可。

          總結(jié)

          不得不說集成 SpringCloud Function 之后,消息的發(fā)送和接收又邁進(jìn)了一個嶄新的階段,但 <functionName> + -in- + <index> 這樣的配置規(guī)約我覺得讓我有些難受......甚至目前我認(rèn)為 3.1 之前被廢棄的注解方式也許更適合我們開發(fā)使用。

          結(jié)語

          去年在生產(chǎn)項目中使用 SpringCloud Stream 的時候它才只支持 RabbitMQ 和 Kafka,但現(xiàn)在幾乎所有流行的消息中間件都開發(fā)了 Binder 去適配它,這也說明了它一統(tǒng)江湖的趨勢。

          雖然我一直推崇技術(shù)的更新迭代,但這次我也要由衷的提醒,如果是新項目我們可以去嘗試引入使用,如果是老項目更新技術(shù)組件,還是要慎重,畢竟 SpringCloud Stream 涉及的一套技術(shù)體系太多,太復(fù)雜,本文僅僅是 SpringCloud Stream 的冰山一角。我們目前并不能很好的駕馭它,但我仍然相信它以后會成為消息中間件的對接主流!

          我們能夠看到 SpringCloud Stream 的一套技術(shù)體系試圖把消息驅(qū)動推動到一個更高的層次,但就目前實際使用情況來看我覺得這個目標(biāo)還是有些遙遠(yuǎn)......


          瀏覽 59
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  抠骚逼| 人人摸人人爱 | 大香蕉在线大香蕉国产 | 婷婷五月天97 | 日韩日屄视频 |