SpringCloud Stream 沒人用?
簡介
SpringCloud Stream 是一個用于構(gòu)建與共享消息系統(tǒng)連接的高度可擴(kuò)展的事件驅(qū)動微服務(wù)組件。它提供了一個靈活的編程模型,基于Spring Boot 建立獨立的生產(chǎn)級 Spring 應(yīng)用程序,并使用 Spring Integration 提供與消息代理的連接可以讓我們在使用時幾乎無需關(guān)心具體的消息隊列實現(xiàn)。它屏蔽底層消息中間件的差異,降低切換成本,統(tǒng)一消息的編程模型,讓開發(fā)人員能夠更多的關(guān)注自己的業(yè)務(wù)。
架構(gòu)模型

或許我們也可以看一個更為簡潔的圖

我們可以看到,每個系統(tǒng)只依賴于自己的 Binder 和消息中間件或者說其他系統(tǒng)交互, Stream 隱藏了所有消息的發(fā)送細(xì)節(jié),對于它來說只關(guān)心三個核心模塊
Destination Binders:目標(biāo)綁定器,告訴 Stream 你需要綁定到哪個消息隊列服務(wù)的
Binder實現(xiàn)即可。例如RabbitMQ還是Kafka的Binder?這是它的核心構(gòu)建塊,負(fù)責(zé)支持和提供與我們擁有的外部系統(tǒng)或外部消息傳遞系統(tǒng)的集成Destination Bindings:目的地綁定,把消息生產(chǎn)者和消費者之間的橋梁提供給 Stream 。例如對于
RabbitMQ來說,你需要告訴 Stream 當(dāng)前系統(tǒng)發(fā)送消息所使用的的channel -> exchange -> routingKey -> queue分別是什么(當(dāng)然這些都是在配置文件中完成的)Message:就是我們需要發(fā)送的消息
對于任何消息來說,只需要提供上述三個核心模塊即可,我們無需去關(guān)心發(fā)送的細(xì)節(jié)。
直至 SpringCloud Stream 3.2.1 版本,它已經(jīng)支持了幾乎所有市面上流行的消息隊列產(chǎn)品。RabbitMQ、Kafka、RocketMQ、AWS SNS/SQS 等等,主要是因為這種一統(tǒng)江湖的趨勢讓不同的消息中間件廠商都開發(fā)了自己的綁定器 Binder 提供給 SpringCloud Stream。
初體驗
下面以 RabbitMQ 為例體驗一下 Stream 消息驅(qū)動開發(fā)。首先我們需要引入依賴
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>
接下來我們需要在配置文件中指定相關(guān)配置,在此之前請確保你對 RabbitMQ 中的組件有一個基本的認(rèn)識,否則請先閱讀 RabbitMQ 基礎(chǔ)篇
生產(chǎn)者
配置文件:
spring:#消息隊列地址rabbitmq:host: 129.204.178.49 #你的 rabbitmq 服務(wù)地址port: 5672username: guestpassword: guestcloud:stream: #SpringCloud Stream 配置bindings:output-channel-demo: # channel 消息輸出通道destination: demo-exchange # 交換機(jī)binder: demo-binder # 綁定器binders:demo-binder: #綁定器type: rabbit # rabbitmqrabbit:bindings:output-channel-demo: # channel 消息輸出通道producer: # 生產(chǎn)者routing-key-expression: '''demoRoutingKey'''
聲明輸出通道
/*** 聲明消息輸出通道 channel* */public interface MessageSource {@Output("output-channel-demo")MessageChannel output();}
定義一個通道綁定類
/*** 該注解用來指定一個或多個定義了 @Input 或 @Output 注解的接口,以此實現(xiàn)對消息通道(Channel)的綁定* */public class MessageSourceHandler {}
接下來我們寫一個集成測試發(fā)送消息即可
@AutowiredMessageSource messageSource;/** 發(fā)送消息測試 */@Testpublic void test() {messageSource.output().send(MessageBuilder.withPayload("測試消息").build());}
此時消息就成功的發(fā)送出去了,接下來我們來寫消費者
消費者
配置文件
spring:cloud:stream:binders:: #綁定器type: rabbit #rabbitmqrabbit:bindings:: #消息輸入通道 channelconsumer:: 'demoRoutingKey'bindings:: #消息輸入通道 channelgroup: someGroup #防止多個消費者實例重復(fù)接收消息,這樣一條消息只會發(fā)送給相同組的其中一個實例destination: demo-exchange #交換機(jī)binder: demo-binder #綁定器rabbitmq:host: 129.204.178.49port: 5672username: guestpassword: guest
聲明輸入通道
/*** 聲明消息輸入通道 channel* */public interface MessageSink {@Input("input-channel-demo")SubscribableChannel input();}
聲明綁定類
@EnableBinding(MessageSink.class)public class MessageSinkHandler {/*** 監(jiān)聽 input-channel-demo 通道的消息,該 @StreamListener 注解支持 SPEL 表達(dá)式,但是被標(biāo)注的方法不能有返回值* */@StreamListener("input-channel-demo")public void consume(String message){System.out.println("接受到消息:"+message);}}
這樣一個完整的 SpringCloud Stream 微服務(wù)消息驅(qū)動的 demo 就完成了,啟動應(yīng)用,消費者能成功的收到生產(chǎn)者發(fā)送的測試消息。要用好 SpringCloud Stream 你必須弄懂配置文件的內(nèi)容!
GitHub 源碼地址 https://github.com/yanzhisishui/springcloud-stream.git ,或者微信公眾號回復(fù) “SpringCloud Stream 入門案例源碼”。
發(fā)送延遲消息
在 SpringCloud Stream 中發(fā)送延遲消息非常簡單,首先我們需要在生產(chǎn)者、消費者的配置文件中指定交換機(jī)的類型是延遲交換機(jī)
rabbit:bindings:input-channel-demo: #消息輸入通道 channelconsumer:delayed-exchange: truebinding-routing-key: 'demoRoutingKey'
生產(chǎn)者一樣,這里省略。然后只需要在上面發(fā)送的代碼中加一個 header 即可
//設(shè)置消息30秒后發(fā)送到消費者messageSource.output().send(MessageBuilder.withPayload("測試消息").setHeader("x-delay",30 * 1000).build());
如果你發(fā)送延遲消息拋出 unknown exchange type 'x-delayed-message' 異常,那么是因為你的 RabbitMQ 服務(wù)沒有安裝延遲隊列插件。去官網(wǎng)安裝一下即可
這樣一個延遲消息的業(yè)務(wù)就實現(xiàn)了,看到這里你會發(fā)現(xiàn)使用 SpringCloud Stream 整合消息很簡單,例如實際上對于整合 RabbitMQ 來說,幾乎所有的配置都在 RabbitConsumerProperties、RabbitProductProperties 中,生產(chǎn)者和消費者共有的屬性在它們的父類 RabbitCommonProperties中。幾乎 RabbitMQ 的所有特性和功能都可以直接在配置文件中完成。作者能力有限,其他高級特性配置詳情可以參考官網(wǎng) RabbitMQ Consumer Properties (https://docs.spring.io/spring-cloud-stream-binder-rabbit/docs/3.2.1/reference/html/spring-cloud-stream-binder-rabbit.html#_rabbitmq_consumer_properties )。
但如果你真這么覺得那你就大錯特錯了,正如 SpringBoot ,用起來很簡單可能只需要花費 20% 的精力,但是想玩的好,可能要付出 200% 的精力。SpringCloud Stream 其實包含了一系列復(fù)雜技術(shù)體系,Spring Intergration、Spring Message、Spring AMQP 等等,其內(nèi)部原理實現(xiàn)、組件的集成非常復(fù)雜。
我想 SpringCloud Stream 出生這么久還不廣泛流行的原因之一就是,這一套技術(shù)體系涉及的東西太多了,萬一生產(chǎn)環(huán)境出現(xiàn)什么疑難雜癥,需要去閱讀源碼解決的話,這樣的技術(shù)工作量是很超出預(yù)期的。
Spring Message
Spring Message 是 Spring Framework 的一個子模塊,它定義了消息的統(tǒng)一編程模型,實際上 SpringCloud Stream 也是基于它實現(xiàn)的統(tǒng)一。

Spring Message 定義了上圖的消息編程模型,提出了通道 Channel 和 消息 Message 的抽象,所有的消息都由生產(chǎn)者發(fā)送到輸出通道 Output 中給消息中間件,然后所有的消費者都從輸入通道 Input 中獲取消息,而消息 Message 本身由兩部分組成,消息頭 header 和 消息體 payload。
在上述的 初體驗 中,我們涉及到的幾個核心注解正是該模型的體現(xiàn)
@Output:代表輸出通道,生產(chǎn)者從這發(fā)出消息
@Input:代表輸入通道,消費者從這讀取消息
@EnableBinding:將定義通道的接口綁定到某個
Bean以便于我們可以通過該Bean操作通道進(jìn)行發(fā)送和接收消息。@StreamListener:訂閱輸入通道中的消息
SpringCloud Function 函數(shù)式編程
在 SpringCloud Stream 3.1 版本之后,你會發(fā)現(xiàn) @EnableBinding 等幾個核心注解被官方標(biāo)注廢棄了,這是因為官方推出了更新的函數(shù)式編程模型 SpringCloud Function,試圖用這個組件將編程推向一個更高的層次。本篇文章不詳細(xì)介紹該組件,簡單介紹在 SpringCloud Stream 中如何結(jié)合 SpringCloud Function 進(jìn)行消息發(fā)送和消費。
在結(jié)合 SpringCloud Function 時消息的通道命名要遵循以下約定
輸入 :
<functionName> + -in- + <index>輸出 :
<functionName> + -out- + <index>
index 代表輸入或輸出綁定的索引,目前我們直接寫 0 即可。
任務(wù)型消息
參考官方文檔 Suppliers (Sources) (https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html#_suppliers_sources),我們開始寫一個生產(chǎn)者的發(fā)送消息方法。
public Supplier<String> source1() {return () -> "測試定時消息";}
然后根據(jù)通道規(guī)則在 application.yml 中配置通道名為 source1-out-0,再配置 spring.cloud.function.destination = source1 ,指定 function 的函數(shù)方法名。
接下來我們開始寫消費者,同樣我們需要一個消費方法。
public Consumer<String> sink1() {return message -> System.out.println("收到消息:" + message);}
然后根據(jù)通道規(guī)則把配置文件中的通道名改為 sink1-in-0。這樣一個簡單的定時消息的發(fā)送和接收就完成了,生產(chǎn)者會每秒給消費者發(fā)送一條消息,不得不說,SpringCloud Stream 和 SpringCloud Function 的集成真的是......太神奇了。
業(yè)務(wù)觸發(fā)型消息
但通常我們更多的應(yīng)用場景是業(yè)務(wù)觸發(fā)發(fā)送消息,所以 SpringCloud Stream 給我們提供了一個 StreamBridge 組件。使用它發(fā)送消息只要指定通道名即可
public void test() {streamBridge.send("source1-out-0","測試消息");}
這樣我們就已經(jīng)完成了消息的發(fā)送,消費者還是用上面的消費函數(shù)即可。
總結(jié)
不得不說集成 SpringCloud Function 之后,消息的發(fā)送和接收又邁進(jìn)了一個嶄新的階段,但 <functionName> + -in- + <index> 這樣的配置規(guī)約我覺得讓我有些難受......甚至目前我認(rèn)為 3.1 之前被廢棄的注解方式也許更適合我們開發(fā)使用。
結(jié)語
去年在生產(chǎn)項目中使用 SpringCloud Stream 的時候它才只支持 RabbitMQ 和 Kafka,但現(xiàn)在幾乎所有流行的消息中間件都開發(fā)了 Binder 去適配它,這也說明了它一統(tǒng)江湖的趨勢。
雖然我一直推崇技術(shù)的更新迭代,但這次我也要由衷的提醒,如果是新項目我們可以去嘗試引入使用,如果是老項目更新技術(shù)組件,還是要慎重,畢竟 SpringCloud Stream 涉及的一套技術(shù)體系太多,太復(fù)雜,本文僅僅是 SpringCloud Stream 的冰山一角。我們目前并不能很好的駕馭它,但我仍然相信它以后會成為消息中間件的對接主流!
我們能夠看到 SpringCloud Stream 的一套技術(shù)體系試圖把消息驅(qū)動推動到一個更高的層次,但就目前實際使用情況來看我覺得這個目標(biāo)還是有些遙遠(yuǎn)......
