Stream 消息驅(qū)動(dòng)
點(diǎn)擊上方“程序員大白”,選擇“星標(biāo)”公眾號(hào)
重磅干貨,第一時(shí)間送達(dá)

一、什么是Spring Cloud Stream?
官方定義Spring Cloud Stream是一個(gè)構(gòu)建消息驅(qū)動(dòng)微服務(wù)的框架。
應(yīng)用程序通過inputs或者 outputs 來與Spring Cloud Stream中binder對(duì)象交互。
通過我們配置來binding(綁定),而Spring Cloud Stream 的binder對(duì)象負(fù)責(zé)與消息中間件交互。所以,我們只需要搞清楚如何與Spring Cloud Stream交互就可以方便使用消息驅(qū)動(dòng)的方式。
通過使用Spring Integration來連接消息代理中間件以實(shí)現(xiàn)消息事件驅(qū)動(dòng)。
Spring Cloud Stream為一些供應(yīng)商的消息中間件產(chǎn)品提供了個(gè)性化的自動(dòng)化配置實(shí)現(xiàn),引用了發(fā)布-訂閱、消費(fèi)組、分區(qū)的三個(gè)核心概念。
目前僅支持RabbitMQ、 Kafka。
二、Stream的設(shè)計(jì)思想
1、標(biāo)準(zhǔn)MQ

生產(chǎn)者/消費(fèi)者之間靠消息媒介傳遞信息內(nèi)容
消息必須走特定的通道 - 消息通道 Message Channel
消息通道里的消息如何被消費(fèi)呢,誰負(fù)責(zé)收發(fā)處理 - 消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息處理器所訂閱。
2、為什么用Cloud Stream?
比方說我們用到了RabbitMQ和Kafka,由于這兩個(gè)消息中間件的架構(gòu)上的不同,像RabbitMQ有exchange,kafka有Topic和Partitions分區(qū)。

這些中間件的差異性導(dǎo)致我們實(shí)際項(xiàng)目開發(fā)給我們?cè)斐闪艘欢ǖ睦_,我們?nèi)绻昧藘蓚€(gè)消息隊(duì)列的其中一種,后面的業(yè)務(wù)需求,我想往另外一種消息隊(duì)列進(jìn)行遷移,這時(shí)候無疑就是一個(gè)災(zāi)難性的,一大堆東西都要重新推倒重新做,因?yàn)樗覀兊南到y(tǒng)耦合了,這時(shí)候Spring Cloud Stream給我們提供了—種解耦合的方式。
3、Stream憑什么可以統(tǒng)一底層差異?
在沒有綁定器這個(gè)概念的情況下,我們的SpringBoot應(yīng)用要直接與消息中間件進(jìn)行信息交互的時(shí)候,由于各消息中間件構(gòu)建的初衷不同,它們的實(shí)現(xiàn)細(xì)節(jié)上會(huì)有較大的差異性通過定義綁定器作為中間層,完美地實(shí)現(xiàn)了應(yīng)用程序與消息中間件細(xì)節(jié)之間的隔離。通過向應(yīng)用程序暴露統(tǒng)一的Channel通道,使得應(yīng)用程序不需要再考慮各種不同的消息中間件實(shí)現(xiàn)。
4、通過定義綁定器Binder作為中間層,實(shí)現(xiàn)了應(yīng)用程序與消息中間件細(xì)節(jié)之間的隔離。
Binder:
INPUT對(duì)應(yīng)于消費(fèi)者
OUTPUT對(duì)應(yīng)于生產(chǎn)者

Stream中的消息通信方式遵循了發(fā)布-訂閱模式
Topic主題進(jìn)行廣播
在RabbitMQ就是Exchange
在Kakfa中就是Topic
三、Stream編碼常用注解簡(jiǎn)介
1. Spring Cloud Stream標(biāo)準(zhǔn)流程套路


Binder - 很方便的連接中間件,屏蔽差異。
Channel - 通道,是隊(duì)列Queue的一種抽象,在消息通訊系統(tǒng)中就是實(shí)現(xiàn)存儲(chǔ)和轉(zhuǎn)發(fā)的媒介,通過Channel對(duì)隊(duì)列進(jìn)行配置。
Source和Sink - 簡(jiǎn)單的可理解為參照對(duì)象是Spring Cloud Stream自身,從Stream發(fā)布消息就是輸出,接受消息就是輸入。
2. 編碼API和常用注解

四、Stream之消息重復(fù)消費(fèi)
運(yùn)行后有兩個(gè)問題
有重復(fù)消費(fèi)問題
消息持久化問題
消費(fèi)
http://localhost:8801/sendMessage
目前是8802/8803同時(shí)都收到了,存在重復(fù)消費(fèi)問題
如何解決:分組和持久化屬性group(重要)
生產(chǎn)實(shí)際案例
比如在如下場(chǎng)景中,訂單系統(tǒng)我們做集群部署,都會(huì)從RabbitMQ中獲取訂單信息,那如果一個(gè)訂單同時(shí)被兩個(gè)服務(wù)獲取到,那么就會(huì)造成數(shù)據(jù)錯(cuò)誤,我們得避免這種情況。這時(shí)我們就可以使用Stream中的消息分組來解決。

注意在Stream中處于同一個(gè)group中的多個(gè)消費(fèi)者是競(jìng)爭(zhēng)關(guān)系,就能夠保證消息只會(huì)被其中一個(gè)應(yīng)用消費(fèi)一次。不同組是可以全面消費(fèi)的(重復(fù)消費(fèi))。
五、Stream之group解決消息重復(fù)消費(fèi)
1. 原理
微服務(wù)應(yīng)用放置于同一個(gè)group中,就能夠保證消息只會(huì)被其中一個(gè)應(yīng)用消費(fèi)一次。
不同的組是可以重復(fù)消費(fèi)的,同一個(gè)組內(nèi)會(huì)發(fā)生競(jìng)爭(zhēng)關(guān)系,只有其中一個(gè)可以消費(fèi)。
8802/8803都變成不同組,group兩個(gè)不同
group: A_Group、B_Group
六、Stream之消息持久化
通過上述,解決了重復(fù)消費(fèi)問題,再看看持久化。
停止8802/8803并去除掉8802的分組group: A_Group,8803的分組group: A_Group沒有去掉。
8801先發(fā)送4條消息到RabbitMq。
先啟動(dòng)8802,無分組屬性配置,后臺(tái)沒有打出來消息。
再啟動(dòng)8803,有分組屬性配置,后臺(tái)打出來了MQ上的消息。(消息持久化體現(xiàn))
source:https://www.yuque.com/yanzipang-wf7ur/hkyrfw/vbkxz8
推薦閱讀
關(guān)于程序員大白
程序員大白是一群哈工大,東北大學(xué),西湖大學(xué)和上海交通大學(xué)的碩士博士運(yùn)營(yíng)維護(hù)的號(hào),大家樂于分享高質(zhì)量文章,喜歡總結(jié)知識(shí),歡迎關(guān)注[程序員大白],大家一起學(xué)習(xí)進(jìn)步!
