<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Spring Cloud Stream 的基本架構(gòu)

          共 5526字,需瀏覽 12分鐘

           ·

          2021-03-18 09:41

          點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號(hào)”

          優(yōu)質(zhì)文章,第一時(shí)間送達(dá)

          接著上篇內(nèi)容(事件驅(qū)動(dòng)架構(gòu)的基本原理,以及 Spring 中對(duì)消息傳遞機(jī)制的抽象和對(duì)應(yīng)的開發(fā)框架)繼續(xù)說,


          要想在 SpringHealth 案例系統(tǒng)中添加消息發(fā)送和接收的效果有很多種實(shí)現(xiàn)方法,完全可以直接使用諸如RocketMQ、RabbitMQ、Kafka 等消息中間件來實(shí)現(xiàn)消息傳遞,考慮不同框架的使用方式以及框架之間存在的功能差異性。

          而 Spring Cloud Stream ,它在內(nèi)部整合了多款主流的消息中間件,提供了一個(gè)平臺(tái)型解決方案,從而屏蔽各個(gè)消息中間件在技術(shù)實(shí)現(xiàn)上的差異。


          Spring Cloud Stream 的基本架構(gòu),并給出它與目前主流的各種消息中間件之間的整合機(jī)制。


          Spring Cloud Stream 構(gòu)建消息傳遞機(jī)制的基本工作流程。區(qū)別于直接使用 RabbitMQ、Kafka 、RocketMQ等消息中間件,Spring Cloud Stream 在消息生產(chǎn)者和消費(fèi)者之間添加了一種橋梁機(jī)制,所有的消息都將通過 Spring Cloud Stream 進(jìn)行發(fā)送和接收,如下圖所示:

          =Spring Cloud Stream 工作流程圖=

          圖中,看出 Spring Cloud Stream 具備四個(gè)核心組件,分別是 Binder、Channel、Source 和 Sink,其中 Binder 和 Channel 成對(duì)出現(xiàn),而 Source 和 Sink 分別面向消息的發(fā)布者和消費(fèi)者。


          Source 和 Sink

          在 Spring Cloud Stream 中,Source 組件是真正生成消息的組件,相當(dāng)于是一個(gè)輸出(Output)組件。而 Sink 則是真正消費(fèi)消息的組件,相當(dāng)于是一個(gè)輸入(Input)組件。根據(jù)對(duì)事件驅(qū)動(dòng)架構(gòu)的了解,對(duì)于同一個(gè) Source 組件而言,不同的微服務(wù)可能會(huì)實(shí)現(xiàn)不同的 Sink 組件,分別根據(jù)自身需求進(jìn)行業(yè)務(wù)上的處理。


          在 Spring Cloud Stream 中,Source 組件使用一個(gè)普通的 POJO 對(duì)象來充當(dāng)需要發(fā)布的消息,通過將該對(duì)象進(jìn)行序列化(默認(rèn)的序列化方式是 JSON)然后發(fā)布到 Channel 中。另一方面,Sink 組件監(jiān)聽 Channel 并等待消息的到來,一旦有可用消息,Sink 將該消息反序列化為一個(gè) POJO 對(duì)象并用于處理業(yè)務(wù)邏輯。


          Channel

          Channel 的概念比較容易理解,就是常見的通道,是對(duì)隊(duì)列的一種抽象。在消息傳遞系統(tǒng)中,隊(duì)列的作用就是實(shí)現(xiàn)存儲(chǔ)轉(zhuǎn)發(fā)的媒介,消息生產(chǎn)者所生成的消息都將保存在隊(duì)列中并由消息消費(fèi)者進(jìn)行消費(fèi)。通道的名稱對(duì)應(yīng)的往往就是隊(duì)列的名稱。

          Binder

          Spring Cloud Stream 中最重要的概念就是 Binder。所謂 Binder,顧名思義就是一種黏合劑,將業(yè)務(wù)服務(wù)與消息傳遞系統(tǒng)黏合在一起。通過 Binder,可以很方便地連接消息中間件,可以動(dòng)態(tài)的改變消息的目標(biāo)地址、發(fā)送方式而不需要了解其背后的各種消息中間件在實(shí)現(xiàn)上的差異。

          Spring Cloud Stream 集成 Spring 消息處理機(jī)制

          Spring Cloud Stream 中Source 和 Sink 都是接口,其中 Source 接口的定義如下:

          import org.springframework.cloud.stream.annotation.Output;
          import org.springframework.messaging.MessageChannel;
            
          public interface Source {
           
              String OUTPUT = "output";
           
              @Output(Source.OUTPUT)
              MessageChannel output();
          }

          注意到這里通過 MessageChannel 來發(fā)送消息,而 MessageChannel 類來自 Spring Messaging 組件。在 MessageChannel 上發(fā)現(xiàn)了一個(gè) @Output 注解,該注解定義了一個(gè)輸出通道。


          類似的,Sink 接口定義如下:

          import org.springframework.cloud.stream.annotation.Input;
          import org.springframework.messaging.SubscribableChannel;
            
          public interface Sink{
           
              String INPUT = "input";
           
              @Input(Sink.INPUT)
              SubscribableChannel input();
          }

          同樣,這里通過 Spring Messaging 中的 SubscribableChannel 來實(shí)現(xiàn)消息接收,而 @Input 注解定義了一個(gè)輸入通道。


          注意到 @Input 和 @Output 注解使用通道名稱作為參數(shù),如果沒有名稱,會(huì)使用帶注解的方法名字作為參數(shù),也就是默認(rèn)情況下分別使用“input”和“output”作為通道名稱。從這個(gè)角度講,一個(gè) Spring Cloud Stream 應(yīng)用程序中的 Input 和 Output 通道數(shù)量和名稱都是可以任意設(shè)置的,只需要在這些通道的定義上添加 @Input 和 @Output 注解即可。例如在如下所示的代碼中,定義了 SpringHealthChannel 接口并聲明了一個(gè) Input 通道和兩個(gè) Output 通道,說明使用該通道的服務(wù)會(huì)從外部的一個(gè)通道中獲取消息并向外部的兩個(gè)通道發(fā)送消息:

          public interface SpringHealthChannel {
            
               @Input
               SubscribableChannel input1();
            
               @Output
               MessageChannel output1();
            
               @Output
               MessageChannel output2();
          }

          可以看到上述接口定義中同時(shí)使用到了 Spring Messaging 中的 SubscribableChannel 和 MessageChannel。Spring Cloud Stream 對(duì) Spring Messaging 和 Spring Integration 提供了原生支持。在常規(guī)情況下,不需要使用這些框架中提供的API就能完成常見的開發(fā)需求。但如果確實(shí)有需要,也可以使用更為底層 API 直接操控消息發(fā)布和接收過程。


          Spring Cloud Stream 集成消息中間件


          Spring Cloud Stream ,最核心的無疑是 Binder 組件。


          Binder 組件是服務(wù)與消息中間件之間的一層抽象,各種消息中間件在消息傳遞機(jī)制的設(shè)計(jì)和實(shí)現(xiàn)上存在一定的差異性,我將梳理 Spring Cloud Stream 中的消息傳遞模型,并給出 Binder 與消息中間件如何進(jìn)行整合的過程。


          == =Spring Cloud Stream 中的消息傳遞模型= ==


          Stream 將消息發(fā)布和消費(fèi)抽象成如下三個(gè)核心概念,并結(jié)合目前主流的一些消息中間件對(duì)這些概念提供了統(tǒng)一的實(shí)現(xiàn)方式。


          1、發(fā)布-訂閱模型

          點(diǎn)對(duì)點(diǎn)模型和發(fā)布-訂閱模型是傳統(tǒng)消息傳遞系統(tǒng)的兩大基本模型,其中點(diǎn)對(duì)點(diǎn)模型實(shí)際上可以被視為發(fā)布-訂閱模型在訂閱者數(shù)量為 1 時(shí)的一種特例。Stream 中,統(tǒng)一通過發(fā)布-訂閱模型完成消息的發(fā)布和消費(fèi),如下所示:

          消息發(fā)布-訂閱模型示意圖


          2、消費(fèi)者組

          設(shè)計(jì)消費(fèi)者組(Consumer Group)的目的是應(yīng)對(duì)集群環(huán)境下的多服務(wù)實(shí)例問題。顯然,如果采用發(fā)布-訂閱模式就會(huì)導(dǎo)致一個(gè)服務(wù)的不同實(shí)例都消費(fèi)到了同一條消息。為了解決這個(gè)問題, Stream 中提供了消費(fèi)者組的概念。一旦使用了消費(fèi)組,一條消息就只能被同一個(gè)組中的某一個(gè)服務(wù)實(shí)例所消費(fèi)。消費(fèi)者的基本結(jié)構(gòu)如下圖所示(其中虛線表示不會(huì)發(fā)生的消費(fèi)場(chǎng)景):


          消費(fèi)者組結(jié)構(gòu)示意圖

          3. 消息分區(qū)


          假如希望相同的消息都被同一個(gè)微服務(wù)實(shí)例來處理,但又有多個(gè)服務(wù)實(shí)例組成了負(fù)載均衡結(jié)構(gòu),那么通過上述的消費(fèi)組概念仍然不能滿足要求。針對(duì)這一場(chǎng)景, Stream 又引入了消息分區(qū)(Partition)的概念。引入分區(qū)概念的意義在于,同一分區(qū)中的消息能夠確保始終是由同一個(gè)消費(fèi)者實(shí)例進(jìn)行消費(fèi)。盡管消息分區(qū)的應(yīng)用場(chǎng)景并沒有那么廣泛,但如果想要達(dá)到類似的效果, Stream 也為提供了一種簡(jiǎn)單的實(shí)現(xiàn)方案,消息分區(qū)的基本結(jié)構(gòu)如下圖所示:

          =消息分區(qū)結(jié)構(gòu)示意圖=


          Binder 與消息中間件


          Binder 組件本質(zhì)上是一個(gè)中間層,負(fù)責(zé)與各種消息中間件的交互。目前 Stream 中集成的消息中間件包括 RabbitMQ和Kafka。如何使用 Spring Cloud Stream 進(jìn)行消息發(fā)布和消費(fèi)之前,先來結(jié)合消息傳遞機(jī)制給出 Binder 對(duì)這兩種不同消息中間件的整合方式。


          • RabbitMQ

          RabbitMQ 是 AMQP(Advanced Message Queuing Protocol,高級(jí)消息隊(duì)列協(xié)議)協(xié)議的典型實(shí)現(xiàn)框架。在 RabbitMQ 中,核心概念是交換器(Exchange)。可以通過控制交換器與隊(duì)列之間的路由規(guī)則來實(shí)現(xiàn)對(duì)消息的存儲(chǔ)轉(zhuǎn)發(fā)、點(diǎn)對(duì)點(diǎn)、發(fā)布-訂閱等消息傳遞模型。在一個(gè) RabbitMQ 中可能會(huì)存在多個(gè)隊(duì)列,交換器如果想要把消息發(fā)送到具體某一個(gè)隊(duì)列,就需要通過兩者之間的綁定規(guī)則來設(shè)置路由信息。路由信息的設(shè)置是開發(fā)人員操控 RabbitMQ 的主要手段,而路由過程的執(zhí)行依賴于消息頭中的路由鍵(Routing Key)屬性。交換器會(huì)檢查路由鍵并結(jié)合路由算法來決定將消息路由到哪個(gè)隊(duì)列中去。下圖就是交換器與隊(duì)列之間的路由關(guān)系圖:

          =RabbitMQ 中交換器與隊(duì)列的路由關(guān)系圖=

          一條來自生產(chǎn)者的消息通過交換器中的路由算法可以發(fā)送給一個(gè)或多個(gè)隊(duì)列,從而分別實(shí)現(xiàn)點(diǎn)對(duì)點(diǎn)和發(fā)布訂閱功能。同時(shí),基于上圖也不難得出消費(fèi)者組的實(shí)現(xiàn)方案。因?yàn)?RabbitMQ 里每個(gè)隊(duì)列是被消費(fèi)者競(jìng)爭(zhēng)消費(fèi)的,所以指定同一個(gè)組的消費(fèi)者消費(fèi)同一個(gè)隊(duì)列就可以實(shí)現(xiàn)消費(fèi)者組。


          • Kafka

          在 Kafka 中,生產(chǎn)者使用推模式將消息發(fā)布到服務(wù)器,而消費(fèi)者使用拉模式從服務(wù)器訂閱消息。在 Kafka 中還使用到了 Zookeeper,其作用在于實(shí)現(xiàn)服務(wù)器與消費(fèi)者之間的負(fù)載均衡,所以啟動(dòng) Kafka 之前必須確保 Zookeeper 正常運(yùn)行。同時(shí),Kafka 也實(shí)現(xiàn)了消費(fèi)者組機(jī)制,如下圖所示:

          =Kafka 消費(fèi)者分組=


          多個(gè)消費(fèi)者構(gòu)成了一種組結(jié)構(gòu),消息只能傳輸給某個(gè)組中的某一個(gè)消費(fèi)者。也就是說,Kafka 中消息的消費(fèi)具有顯式的分布式特性,天生就內(nèi)置了 Spring Cloud Stream 中的消費(fèi)組概念。


          Spring Cloud Stream 該框架的核心優(yōu)勢(shì)在于在內(nèi)部集成了 RabbitMQ、Kafka 、RocketMQ等主流消息中間件.而對(duì)外則提供了統(tǒng)一的 API 接入層。


          消費(fèi)者組: 用于分布式多實(shí)例同一條消息只需要一個(gè)實(shí)例處理消息的場(chǎng)景, 同一消費(fèi)組像分工合作一起消費(fèi)消息,避免重復(fù)消費(fèi)。


          消費(fèi)分區(qū): 用于同一分區(qū)只能被同一實(shí)例處理的場(chǎng)景,一個(gè)分區(qū)只會(huì)被一個(gè)消費(fèi)者消費(fèi),

          所以分區(qū)還可以做分區(qū)內(nèi)消息有序。但同一消費(fèi)組無法做消息有序。


          同時(shí),針對(duì)消費(fèi)組實(shí)例數(shù),調(diào)整分區(qū)數(shù)還能提高一定并發(fā)量(具體可能結(jié)合生產(chǎn)者消費(fèi)者自己業(yè)務(wù)瓶頸考慮)。

          一個(gè)消費(fèi)者可以消費(fèi)多個(gè)分區(qū),但一個(gè)分區(qū)只能被一個(gè)消費(fèi)者消費(fèi)。

          ————————————————

          版權(quán)聲明:本文為CSDN博主「Ssssngnth 奕飛」的原創(chuàng)文章,遵循CC 4.0 BY-SA版權(quán)協(xié)議,轉(zhuǎn)載請(qǐng)附上原文出處鏈接及本聲明。

          原文鏈接:

          https://blog.csdn.net/Rinvay_Cui/article/details/114684962





          粉絲福利:Java從入門到入土學(xué)習(xí)路線圖

          ??????

          ??長(zhǎng)按上方微信二維碼 2 秒


          感謝點(diǎn)贊支持下哈 

          瀏覽 126
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  www.青青草 | 豆花AV一区二区无码免费看 | 二区三区免费视频 | 日本三级中国三级99 | gogo大胆无码无码免费衩频 |