不會還有人不懂Stream源碼吧?10年架構師帶你一次性搞懂
Stream源碼解析
Spring Cloud Stream(簡稱SCS)提供了一系列預先定義的注解來聲明輸入型和輸出型Channel,業(yè)務系統(tǒng)基于這些Channel與消息中間件進行通信,而不是直接與具體的消息中間件進行通信。跟蹤SCS的源碼就會發(fā)現,Stream有很多外部依賴,最主要的就是Messaging和Integration兩個項目,所以在講解SCS源碼前,有必要先介紹一下Messaging和Integration與SCS體系的關系。
SCS的目標是建立一套統(tǒng)一的基于注解的消息發(fā)送機制,屏蔽開發(fā)人員直接與底層消息系統(tǒng)進行細節(jié)交互,而Messaging模塊正是Spring框架中用來做統(tǒng)一消息編程模型的,在Messaging中最關鍵的數據結構是Message,代碼如下:

在Messaging模塊中消息通道MessageChannel是一個接口類,用于發(fā)送Message消息,可以理解為Messaging模塊中的標準接口,類似于J2EE中的Servlet接口,具體實現類可以實現具體消息通道。下面是MessageChannel的代碼:

在Messaging模塊中,消息通道的子接口SubscribableChannel繼承了MessageHandler消息處理器:

由MessageHandler真正地消費/處理消息:

Integration基于Spring框架可以實現輕量級的消息傳遞,也是對Messaging的擴展實現,支持通過聲明適配器與SCS集成。它實現了消息 過 濾 、 消 息 轉 換 、 消 息 聚 合 和 消 息 分 割 等 功 能 , 提 供 了 對MessageChannel 和 MessageHandler 的 實 現 , 包 括 DirectChannel 、ExecutorChannel、PublishSubscribeChannel,以及MessageFilter、ServiceActivatingHandler、MethodInvokingSplitter等。下面介紹Integration 中 的 兩 種 消 息 分 發(fā) 器 :DirectChannel 和PublishSubscribeChannel。

從代碼可知,DirectChannel內部的UnicastingDispatcher類型分發(fā)器會發(fā)到對應消息通道的MessageChannel中,從名字也可以看出來,UnicastingDispatcher是一個單播的分發(fā)器,只能選擇一個消息通道。而PublishSubscribeChannel使用BroadcastingDispatcher作為廣播消息分發(fā)器,會把消息分發(fā)給所有的MessageHandler。
SCS在Integration的集成上進行了封裝,通過注解的方式和統(tǒng)一的API進行消息的發(fā)送和消費,底層消息中間件的實現細節(jié)由各個消息中間件的Binder完成,同時,通過與Spring Boot的ExternalizedConfiguration整合,SCS提供了BindingProperties等外部化配置類,這些具體的配置信息將綁定到具體的消息中間件的配置類中。
SCS的架構流程圖
下面是SCS的架構流程圖,我們會從幾個層次分別講解其中相關聯的源碼和它們之間的交互關系。

應用層
SCS為用戶提供了三個綁定消息通道的默認實現。
● Sink:通過指定消費消息的目標來標識消息消費者。
● Source:與Sink相反,用于標識消息生產者。
● Processor:集成了Sink和Source的功能,用于標識消息生產者和消費者。

對 應 用 而 言 , 想 要 啟 動 SCS 的 功 能 , 需 要 先 啟 動 注 解 。
@EnableBinding注解是Stream框架運轉的起點,通過這個注解可以實現動態(tài)注冊BeanDefinition,它會將消息通道綁定到自己修飾的目標實例上,從而讓這些實例具備與消息隊列進行交互的能力。下面我們看源碼:

●
BindingServiceConfiguration的 作 用 是 完 成BindingService、InputBindingLifecycle、OutputBindingLifecycle等重要Bean的初始化及相關配置文件加載。
● BindingBeansRegistrar的作用是注冊聲明通道的接口類的BeanDefinition,從而獲取這些接口類的實例,并使用這些實例進行消息的發(fā)送和接收,具體代碼實現如下:

registerBindingTargetBeanDefinitions方法會調用ReflectionUtils類完成掃描所有被注解@Input和@Output標注了的方法,然后注冊BeanDefinition。下面是代碼示例:


registerBindingTargetsQualifiedBeanDefinitions 是 在 注 冊registerBindingTargetBeanDefinitions 時 使 用 的 工 廠 類BeanDefinition,這個工廠類用來生成registerBindingTargetBeanDefinition注冊的Bean實例,如下所示:

Stream層
Stream 層 的 BindableProxyFactory 被 初 始 化 為 一 個rootBeanDefinition,并注冊為一個FactoryBean,這樣Spring容器就可 以 獲 得
registerBindingTargetBeanDefinitions 方 法 中 所 注 冊 的Bean實例(MessageChannel對象實例)。BindableProxyFactory可以說是SCS實現通道接口類聲明及相關類型的核心類,代碼如下:



afterPropertiesSet方法會處理所有被@Input和@Output注解的函數 , 并 將 生 成 函 數 返 回 類 型 實 例 存 儲 在 BoundTargetHolder 中 ,getBindingTargetName方法會返回
SubscribableChannelBindingTargetFactory 實 例 , 它 會 在createOutput方法中返回一個DirectChannel實例,該實例會被存儲起來供BindableProxyFactory使用。
名稱為output的BeanDefinition將BindableProxyFactory設置成其實例工廠類,并將outputMessagefunction方法設置成其實例的工廠函數(BeanFactoryMethod)。當Spring容器創(chuàng)建該實例時,會調用BindableProxyFactory 的 outputMessagefunction 方 法 , 由 于BindableProxyFactory實現了Methodlnterceptor接口,所以就調用了其invoke方法。invoke方法會從BindableProxyFactory緩存的Channel實例中匹配符合的實例方法,并反射調用。
BindingService是Stream層獲取綁定器和執(zhí)行綁定任務的一個重要類,首先我們看BindingService的bindProducer方法,代碼如下:

在 BindingService 實 現 中 , getBinder 方 法 最 終 會 調 用DefaultBinderFactory中的getBinder方法實現,我們可以看到,DefaultBinderFactory的作用就是獲取具體的Binder實現并提供給相應的MessageChannel實例。DefaultBinderFactory的初始化依賴于BinderTypeRegistry獲得的BinderType列表。DefaultBinderFactory的getBinder實現中會調用BinderConfiguration獲取對應的Binder實例 , 通 過 跟 蹤 BinderConfiguration 的 初 始 化 過 程 , 可 以 發(fā) 現BinderConfiguration 是 在
BinderFactoryConfiguration 執(zhí) 行getBinderConfiguration方法時將bindingServiceProperties變量中的BinderProperties與BinderTypeRegistry中的BinderType結合,封裝成BinderConfiguration對象。BinderProperties封裝了Stream從application.yml文件中讀取的關于Binder的配置信息,而BinderType則 是 具 體 Binder 的 實 現 類 信 息 。DefaultBinderFactory 的getBinderInstance實現如下:


這 里 的 getBinderInstance 方 法 中 會 生 成 一 個
ConfigurableApplicationContext 來 創(chuàng) 建 Binder 實 例 , 在 創(chuàng) 建ConfigurableApplicationContext實例時,它會將BinderConfiguration設置到SpringApplicationBuilder中。
ConfigurableApplicationContext調用getBinder方法時,會使用BinderConfiguration的屬性和配置生成BinderConfiguration中設置的具體類型的Binder實現。如果你使用的Binder是RabbitMQ,那么對應 的 RabbitServiceAutoConfiguration 會 自 動 初 始 化 并 加 載RabbitMessageChannelBinder實例。
在 Stream 層 對 Binder 實 例 的 初 始 化 工 作 都 完 成 后 , 再 回 到BindingService 的 bindProducer 方 法 實 現 , 它 會 調 用
AbstractMessagChannlBinder 的 doBindProducer 方 法 , 關 鍵 代 碼 如下:

從源碼可知,ProvisioningProvider是一個接口,不同的Binder實 現 可 以 根 據 接 口 實 現 各 自 不 同 的 ProducerDestination 和ConsumerDestination,代碼如下:

doBindProducer會調用
createProducerMessageHandler方法創(chuàng)建MessageHandler實例,MessageChannel會使用SendingHandler封裝后的MessageHandler實例,當有output消息時,將消息發(fā)送給最終的Binder實例。
通過上面的步驟,基本上在Stream層就完成了對生產者的綁定操作,消費者的綁定就是將SubscribableChannel與具體的消息隊列實現連接,doBindConsumer與doBindProducer流程類似。
首先通過ProvisioningProvider的
provisionConsumerDestination方法創(chuàng)建ConsumerDestination,然后調用createConsumerEndpoint方法創(chuàng)建MessageProducer實例,最后生成DefaultBinding實例,代碼如下:


Message/Integrate/消息中間件Binder層
從@Output注解可以看到,Stream框架會使用MessageChannel發(fā)送消 息 。通 過 BindingService 的 doBindProducer 方 法 創(chuàng) 建 并 綁 定SendingHandler對象,然后調用handleMessageInternal方法,它會將消息再發(fā)送給delegate對象處理。下面是SendingHandler對象的handleMessageInternal方法的代碼實現:

delegate是之前在BindingServer中抽象類
AbstractMessageChannelBinder執(zhí)行的createProducerMessageHandler方法返回的生產者MessageHandler實例。對于RabbitMQ Binder來說,就是rmqpOutboundEndpoint對象,該實 例 將 最 終 調 用 其 handlerMessage 方 法 , 該 方 法 進 一 步 調 用RabbitTemplate的send方法。消息發(fā)送流程如下圖所示。

消息的接收過程
消息的接收過程可以分為兩個階段:第一個階段是從RabbitMQ到SubscribableChannel的過程。我們從@Input注解可以看到,Stream框架 會 使 用 SubscribableChannel 接 收 消 息 。第 二 個 階 段 是 注 解@StreamListener告訴SubscribableChannel如何將消息發(fā)送給對應的Sink接收端對應的回調方法。
Spring的RabbitMQ使用InternalConsumer作為默認的消息消費方,當接收到對應消息后,會調用handleDelivery方法將RabbitMQ消息發(fā)送給BlockingQueueConsumer中的隊列。下面是handleDelivery的源碼實現。

AsyncMessageProcessingConsumer類是Runnable類型的,它會消費 阻 塞 隊 列 , 并 將 消 息 傳 給 AmqpInboundChannelAdapter 。
AmqpInboundChannelAdapter 實 例 是 在 BindingService 構 造createConsumerEndpoint時創(chuàng)建的consumerEndpoint,并將它與對應的Channel綁定。下面是AmqpInboundChannelAdapter的關鍵代碼,即processMessage方法,它會調用MessagingTemplate對象的send方法將消息發(fā)送給SubscribableChannel模塊。


下面就是消息處理的第二個階段,就是將SubscribableChannel中的 消 息 發(fā) 送 給 指 定 的 方 法 , 主 要 靠 @StreamListener 注 解 實 現 。
@StreamListener是注釋在消費方法上的注解,用來接收輸入型通道的消 息 , Stream 定 義 了
StreamListenerAnnotationBeanPostProcessor類,用來處理項目中的@SteamListener注解。
StreamListenerAnnotationBeanPostProcessor實現了BeanPostProcessor接口,用來在Bean初始化之前和之后兩個時間點對Bean實例進行處理。
postProcessAfterlnitialization是在Bean實例初始化之后被調用 的 方 法 , 它 會 遍 歷 Bean 實 例 中 的 所 有 函 數 , 處 理 那 些 被@StreamListener注解修飾的函數。
afterSingletonsInstantiated方法會遍歷mappedListenerMethods 對 應 的 所 有 Entry 對 象 , 為 每 一 個StreamListenerHandlerMethodMapping 創(chuàng) 建 一 個 MessageHandler 實例。然后根據條件生成DispatchingStreamListenerMessageHandler并注冊給SubscribableChannel。
下 面 是
StreamListenerAnnotationBeanPostProcessor 的 代 碼 實現:


當 SubscribableChannel 接 收 到 消 息 后 , 會 調 用
DispatchingStreamListenerMessageHandler類的handleRequestMessage方法,該方法會調用ConditionalStreamListenerHandler的handleMessage方法。
findMatchingHandlers方法根據
ConditionalStreamListenerHandler 的 Expression 實 例 來 判 斷ConditionalStreamListenerHandler是否適合處理當前這個消息,最終消息經過InvocableHandlerMethod傳遞給對應的函數。SCS消費消息的整體流程如下圖所示。

本文給大家講解的內容是MOM異步通信,Stream源碼解析
下篇文章給大家講解的內容是MOM異步通信,Stream應用進階
覺得文章不錯的朋友可以轉發(fā)此文關注小編;
感謝大家的支持!
本文就是愿天堂沒有BUG給大家分享的內容,大家有收獲的話可以分享下,想學習更多的話可以到微信公眾號里找我,我等你哦。
