微服務(wù)架構(gòu)開發(fā)實(shí)戰(zhàn):SpringCloudBus的設(shè)計(jì)原理
Spring Cloud Bus 設(shè)計(jì)原理
本節(jié)將介紹Spring Cloud Bus的設(shè)計(jì)原理。理解原理有利于更好地基于Spring Cloud Bus來進(jìn)行二次開發(fā)。

基于Spring Cloud Stream
Spring Cloud Bus是基于Spring Cloud Stream基礎(chǔ)之上而做的封裝。Spring Cloud Stream是Spring Cloud家族中一個(gè)構(gòu)建消息驅(qū)動(dòng)微服務(wù)的框架。
圖16-3所示的是來自官方的Spring Cloud Stream應(yīng)用模型。

在該應(yīng)用模型中可以發(fā)現(xiàn)Spring Cloud Stream的幾個(gè)核心概念。
1.Spring Cloud Stream Application
Application通過inputs或outputs來與SpringCloud Stream中的 Binder交互,通過配置來binding,而Spring Cloud Stream的 Binder負(fù)責(zé)與中間件交互。所以只需要搞清楚如何與Spring Cloud Stream交互就可以方便使用消息驅(qū)動(dòng)的方式。
2.Binder
Binder是Spring Cloud Stream 的一個(gè)抽象概念,是應(yīng)用與消息中間件之間的黏合劑。目前Spring Cloud Stream實(shí)現(xiàn)了Kafka和 Rabbit等消息中間件的 Binder。
通過Binder,可以很方便地連接消息中間件,可以動(dòng)態(tài)地改變消息的destinations(對應(yīng)于Kaf-ka 的 topic,Rabbit 的exchanges),這些都可以通過外部配置項(xiàng)做到。通過配置,不需要修改一行代碼,就能實(shí)現(xiàn)消息中間件的更換。
3.訂閱/發(fā)布
消息的發(fā)布(Publish)和訂閱(Subscribe)是事件驅(qū)動(dòng)的經(jīng)典模式,如圖16-4所示。SpringCloud Stream的數(shù)據(jù)交互也是基于這個(gè)思想。生產(chǎn)者把消息通過某個(gè)topic廣播出去(Spring CloudStream 中的destinations)。其他的微服務(wù)通過訂閱特定topic來獲取廣播出來的消息,以觸發(fā)業(yè)務(wù)的進(jìn)行。

這種模式極大地降低了生產(chǎn)者與消費(fèi)者之間的耦合。即使有新的應(yīng)用引入,也不需要破壞當(dāng)前系統(tǒng)的整體結(jié)構(gòu)。
4.消費(fèi)者分組
Spring Cloud Stream的意思基本與Kafka一致。為了防止同一個(gè)事件被重復(fù)消費(fèi),只要把這些應(yīng)用放置于同一個(gè)“group”中,就能夠保證消息只會(huì)被其中一個(gè)應(yīng)用消費(fèi)一次。
每個(gè)binding 都可以使用
spring.cloud.stream.bindings.

圖16-5展示了Stream 的消費(fèi)者分組設(shè)置,屬性值分別設(shè)置為
spring.cloud.stream.bind-ings.
5.持久化
消息事件的持久化是必不可少的。Spring Cloud Stream可以動(dòng)態(tài)地選擇一個(gè)消息隊(duì)列是否需要持久化。
6.Binding
Binding 是通過配置把應(yīng)用與Spring Cloud Stream的 Binder綁定在一起的,之后只需要修改Binding 的配置來達(dá)到動(dòng)態(tài)修改topic、exchange、type等一系列信息,而不需要修改一行代碼。
7.分區(qū)支持
Spring Cloud Stream支持在給定應(yīng)用程序的多個(gè)實(shí)例之間對數(shù)據(jù)進(jìn)行分區(qū)。在分區(qū)方案中,物理通信介質(zhì)(如topic)被視為多個(gè)分區(qū)。
Spring Cloud Stream為統(tǒng)一實(shí)現(xiàn)分區(qū)處理用例提供了一個(gè)通用抽象。無論代理本身是自然分區(qū)(如Kafka)還是非自然分區(qū)(如RabbitMQ),都可以使用分區(qū)。

Spring Cloud Bus的編程模型
當(dāng)微服務(wù)之間需要通信時(shí),先將消息傳遞給消息總線,而其他微服務(wù)實(shí)現(xiàn)接收消息總線分發(fā)信息。Spring Cloud Bus提供了簡化微服務(wù)發(fā)送和接收消息總線指令的能力。
1.AbstractBusEndpoint及其子類
通過這個(gè)接口來實(shí)現(xiàn)用戶的訪問,都需要繼承AbstractBusEndpoint。
以下是AbstractBusEndpoint.java的核心代碼。
package org.springframework.cloud.bus.endpoint;
import org.springframework.boot.actuate.endpoint.Endpoint;
import org.springframework.boot.actuate.endpoint.mvc.MvcEndpoint;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
public class AbstractBusEndpoint implements MvcEndpoint
private ApplicationEventPublisher context;
private BusEndpoint delegate;
private string appId;
public AbstractBusEndpoint(ApplicationEventPublisher context,String
appId,
BusEndpoint busEndpoint){
this.context =context;
this.apprd = appId;
this.delegate = busEndpoint;
}
protected string getInstanceId() {
return this.appld;
protected void publish(ApplicationEvent event)
context.publishEvent (event);
}
@override
public String getPath()
return "/"+this.delegate.getld();
}
override
public boolean issensitive({
return this.delegate.isSensitive(;
}
@override
@Suppresswarnings("rawtypes")
public Class getEndpointType()f
return this.delegate.getClass();
}
}最常用的AbstractBusEndpoint 的子類,莫過于EnvironmentBusEndpoint和RefreshBusEndpoint。
這兩個(gè)類分別實(shí)現(xiàn)了/bus/env和/bus/refresh的HTTP接口。
以下是
EnvironmentBusEndpoint.java的源碼。
package org.springframework.cloud.bus.endpoint;
import java.util.Map;
import org.springframework.cloud.bus.event.EnvironmentChangeRemote
ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.jmx.export.annotation.Managed0peration;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
@ManagedResource
public class EnvironmentBusEndpoint extends AbstractBusEndpoint {
public EnvironmentBusEndpoint (ApplicationEventPublisher context,
string id,
BusEndpoint delegate) {
super(context, id,delegate);
}
@RequestMapping(value = "env", method = RequestMethod.POST)
@ResponseBody
ManagedOperation
public void env(@RequestParam Map params,
@RequestParam(value = "destination",required = false)
String destination){
publish(new EnvironmentChangeRemoteApplicationEvent(this,
getInstancerd(,
destination,params));
}
} 以下是RefreshBusEndpoint.java的源碼。
package org.springframework.cloud.bus.endpoint;
import org.springframework.cloud.bus.event.RefreshRemoteApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.jmx.export.annotation.Managed0peration;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
@ManagedResource
public class RefreshBusEndpoint extends AbstractBusEndpoint
public RefreshBusEndpoint (ApplicationEventPublisher context,Stringid,
BusEndpoint delegate)f
super(context, id, delegate);
RequestMapping(value = "refresh",method = RequestMethod.POST)
ResponseBody
@Managedoperation
public void refresh(
@RequestParam(value = "destination", required = false)
String destination){
publish(new RefreshRemoteApplicationEvent(this, getInstanceId(),
destination));
}
}2.RemoteApplicationEvent及其子類
RemoteApplicationEvent用來定義被傳輸?shù)南⑹录?/p>
以下是
RemoteApplicationEvent.java的源碼。
package org.springframework.cloud.bus.event;
import java.util.UUID;
import org.springframework.context.ApplicationEvent;
import org.springframework.util.StringUtils;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
@suppresswarnings( "serial")
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME,property = "type")
@JsonIgnoreProperties( "source")
public abstract class RemoteApplicationEvent extends ApplicationEvent(
private static final 0bject TRANSTENT_SOURCE= new object();
private final string originService;
private final String destinationService;
private final String id;
protected RemoteApplicationEvent({
//for serialization libs like jackson
this(TRANSIENT_SOURCE,null,null);
}
protected RemoteApplicationEvent(Object source,String origin
service,
String destinationService){
super(source);
this.originservice = originService;
if(destinationService -=null)
destinationService ="**";
)
1/ If the destinationService is not already a wildcard,match
everything that follows
// if there at most two path elements,and last element is not
a global wildcard already
if(!"**".equals(destinationService)){
if (StringUtils.countoccurrencesof(destinationService,":")
<= 1
&& !StringUtils.endsWithIgnoreCase (destination
Service,":**")) {
//All instances of the destination unless specifically
requested
destinationService = destinationService +":**;
}
this.destinationService = destinationService;
this.id= UUID.randomUUID().toString(;
protected RemoteApplicationEvent (Object source,String origin
Service){
this(source,originservice,null);
}
//省略 getter/setter方法
}最常用的RemoteApplicationEvent的子類,莫過于
EnvironmentChangeRemoteApplicationEvent和RefreshRemoteApplicationEvent。
以下是
EnvironmentChangeRemoteApplicationEvent.java的源碼。
package org.springframework.cloud.bus.event;
import java.util.Map;
@SuppressWarnings( "serial")
public class EnvironmentChangeRemoteApplicationEvent extends Remote
ApplicationEvent {
private final Map<String,String> values;
@SuppressWarnings("unused")
private EnvironmentChangeRemoteApplicationEvent() {
//for serializers
values = null;
public EnvironmentChangeRemoteApplicationEvent (Object source,String
originService,
String destinationService,Map<String,String> values)
super(source,originService,destinationService);
this.values =values;
//省略 getter/setter 方法
}以下是
RefreshRemoteApplicationEvent.java的源碼。
package org.springframework.cloud.bus.event;
@suppressWarnings("serial")
public class RefreshRemoteApplicationEvent extends RemoteApplication
Event {
SuppressWarnings ("unused")
private RefreshRemoteApplicationEvent( {
//for serializers
}
public RefreshRemoteApplicationEvent(Object source,String origin
service,
String destinationservice){
super(source, originService,destinationService);
}
}3.ApplicationListener及其子類
ApplicationListener是用來處理消息事件的監(jiān)聽器,是Spring框架的核心接口。該接口只有一個(gè)方法。
public interface ApplicationListenerE extends ApplicationEvent> extends
EventListener {
★★
*Handle an application event.
* @param event the event to respond to
*/
void onApplicationEvent(E event) ;
}Spring Cloud Bus中的監(jiān)聽器都需要實(shí)現(xiàn)該接口。EnvironmentChangeListener及RefreshListener是其中兩個(gè)常用的實(shí)現(xiàn)類。
以下是
EnvironmentChangeListener.java的源碼。
package org.springframework.cloud.bus.event;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.context.environment.EnvironmentManager;
import org.springframework.context.ApplicationListener;
public class EnvironmentChangeListener
implements ApplicationListener<EnvironmentChangeRemote
ApplicationEvent> {
private static Log log = LogFactory.getLog(EnvironmentChangeListener.class);
@Autowired
private EnvironmentManager env;
aoverride
public void onApplicationEvent(EnvironmentChangeRemoteApplication
Event event){
Map<String,String> values = event.getValues();
log.info ("Received remote environment change request. Keys/
values to update "
+values);
for (Map.Entry<String,String> entry : values.entrySet()){
env.setProperty(entry.getKey(,entry.getvalue());
}
}
}以下是RefreshListener.java的源碼。
package org.springframework.cloud.bus.event;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.context.refresh.ContextRefresher;
import org.springframework.context.ApplicationListener;
public class RefreshListener
implements ApplicationListener<RefreshRemoteApplicationEvent>{
private static Log log = LogFactory.getLog (RefreshListener.class);
private ContextRefresher contextRefresher;
public RefreshListener(ContextRefresher contextRefresher)
this.contextRefresher = contextRefresher;
@override
public void onApplicationEvent(RefreshRemoteApplicationEvent event)
Set keys = contextRefresher.refresh();
log.info ("Received remote refresh request.Keys refreshed " +keys);
}
}本篇文章內(nèi)容給大家講解的是SpringCloudBus 設(shè)計(jì)原理
下篇文章給大家講解的是如何集成 BuS;
覺得文章不錯(cuò)的朋友可以轉(zhuǎn)發(fā)此文關(guān)注小編;
感謝大家的支持!
本文就是愿天堂沒有BUG給大家分享的內(nèi)容,大家有收獲的話可以分享下,想學(xué)習(xí)更多的話可以到微信公眾號里找我,我等你哦。
