<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          微服務(wù)架構(gòu)開發(fā)實(shí)戰(zhàn):SpringCloudBus的設(shè)計(jì)原理

          共 2696字,需瀏覽 6分鐘

           ·

          2022-05-10 13:07

          Spring Cloud Bus 設(shè)計(jì)原理

          本節(jié)將介紹Spring Cloud Bus的設(shè)計(jì)原理。理解原理有利于更好地基于Spring Cloud Bus來進(jìn)行二次開發(fā)。


          基于Spring Cloud Stream

          Spring Cloud Bus是基于Spring Cloud Stream基礎(chǔ)之上而做的封裝。Spring Cloud Stream是Spring Cloud家族中一個(gè)構(gòu)建消息驅(qū)動(dòng)微服務(wù)的框架。

          圖16-3所示的是來自官方的Spring Cloud Stream應(yīng)用模型。


          在該應(yīng)用模型中可以發(fā)現(xiàn)Spring Cloud Stream的幾個(gè)核心概念。

          1.Spring Cloud Stream Application

          Application通過inputs或outputs來與SpringCloud Stream中的 Binder交互,通過配置來binding,而Spring Cloud Stream的 Binder負(fù)責(zé)與中間件交互。所以只需要搞清楚如何與Spring Cloud Stream交互就可以方便使用消息驅(qū)動(dòng)的方式。

          2.Binder

          Binder是Spring Cloud Stream 的一個(gè)抽象概念,是應(yīng)用與消息中間件之間的黏合劑。目前Spring Cloud Stream實(shí)現(xiàn)了Kafka和 Rabbit等消息中間件的 Binder。

          通過Binder,可以很方便地連接消息中間件,可以動(dòng)態(tài)地改變消息的destinations(對應(yīng)于Kaf-ka 的 topic,Rabbit 的exchanges),這些都可以通過外部配置項(xiàng)做到。通過配置,不需要修改一行代碼,就能實(shí)現(xiàn)消息中間件的更換。

          3.訂閱/發(fā)布

          消息的發(fā)布(Publish)和訂閱(Subscribe)是事件驅(qū)動(dòng)的經(jīng)典模式,如圖16-4所示。SpringCloud Stream的數(shù)據(jù)交互也是基于這個(gè)思想。生產(chǎn)者把消息通過某個(gè)topic廣播出去(Spring CloudStream 中的destinations)。其他的微服務(wù)通過訂閱特定topic來獲取廣播出來的消息,以觸發(fā)業(yè)務(wù)的進(jìn)行。


          這種模式極大地降低了生產(chǎn)者與消費(fèi)者之間的耦合。即使有新的應(yīng)用引入,也不需要破壞當(dāng)前系統(tǒng)的整體結(jié)構(gòu)。

          4.消費(fèi)者分組

          Spring Cloud Stream的意思基本與Kafka一致。為了防止同一個(gè)事件被重復(fù)消費(fèi),只要把這些應(yīng)用放置于同一個(gè)“group”中,就能夠保證消息只會(huì)被其中一個(gè)應(yīng)用消費(fèi)一次。

          每個(gè)binding 都可以使用
          spring.cloud.stream.bindings..group來指定分組的名稱,如圖16-5所示。


          圖16-5展示了Stream 的消費(fèi)者分組設(shè)置,屬性值分別設(shè)置為
          spring.cloud.stream.bind-ings..group=hdfsWrite和?spring.cloud.stream.bindings..group=average.

          5.持久化

          消息事件的持久化是必不可少的。Spring Cloud Stream可以動(dòng)態(tài)地選擇一個(gè)消息隊(duì)列是否需要持久化。

          6.Binding

          Binding 是通過配置把應(yīng)用與Spring Cloud Stream的 Binder綁定在一起的,之后只需要修改Binding 的配置來達(dá)到動(dòng)態(tài)修改topic、exchange、type等一系列信息,而不需要修改一行代碼。

          7.分區(qū)支持

          Spring Cloud Stream支持在給定應(yīng)用程序的多個(gè)實(shí)例之間對數(shù)據(jù)進(jìn)行分區(qū)。在分區(qū)方案中,物理通信介質(zhì)(如topic)被視為多個(gè)分區(qū)。

          Spring Cloud Stream為統(tǒng)一實(shí)現(xiàn)分區(qū)處理用例提供了一個(gè)通用抽象。無論代理本身是自然分區(qū)(如Kafka)還是非自然分區(qū)(如RabbitMQ),都可以使用分區(qū)。


          Spring Cloud Bus的編程模型

          當(dāng)微服務(wù)之間需要通信時(shí),先將消息傳遞給消息總線,而其他微服務(wù)實(shí)現(xiàn)接收消息總線分發(fā)信息。Spring Cloud Bus提供了簡化微服務(wù)發(fā)送和接收消息總線指令的能力。

          1.AbstractBusEndpoint及其子類

          通過這個(gè)接口來實(shí)現(xiàn)用戶的訪問,都需要繼承AbstractBusEndpoint。

          以下是AbstractBusEndpoint.java的核心代碼。

          package org.springframework.cloud.bus.endpoint;
          import org.springframework.boot.actuate.endpoint.Endpoint;
          import org.springframework.boot.actuate.endpoint.mvc.MvcEndpoint;
          import org.springframework.context.ApplicationEvent;
          import org.springframework.context.ApplicationEventPublisher;
          public class AbstractBusEndpoint implements MvcEndpoint
          private ApplicationEventPublisher context;
          private BusEndpoint delegate;
          private string appId;
          public AbstractBusEndpoint(ApplicationEventPublisher context,String
          appId,
          BusEndpoint busEndpoint){
          this.context =context;
          this.apprd = appId;
          this.delegate = busEndpoint;
          }
          protected string getInstanceId() {
          return this.appld;
          protected void publish(ApplicationEvent event)
          context.publishEvent (event);
          }
          @override
          public String getPath()
          return "/"+this.delegate.getld();
          }
          override
          public boolean issensitive({
          return this.delegate.isSensitive(;
          }
          @override
          @Suppresswarnings("rawtypes"
          public Class getEndpointType()
          f
          return this.delegate.getClass();
          }
          }

          最常用的AbstractBusEndpoint 的子類,莫過于EnvironmentBusEndpoint和RefreshBusEndpoint。

          這兩個(gè)類分別實(shí)現(xiàn)了/bus/env和/bus/refresh的HTTP接口。

          以下是
          EnvironmentBusEndpoint.java的源碼。

          package org.springframework.cloud.bus.endpoint;
          import java.util.Map;
          import org.springframework.cloud.bus.event.EnvironmentChangeRemote
          ApplicationEvent;
          import org.springframework.context.ApplicationEventPublisher;
          import org.springframework.jmx.export.annotation.Managed0peration;
          import org.springframework.jmx.export.annotation.ManagedResource;
          import org.springframework.web.bind.annotation.RequestMapping;
          import org.springframework.web.bind.annotation.RequestMethod;
          import org.springframework.web.bind.annotation.RequestParam;
          import org.springframework.web.bind.annotation.ResponseBody;
          @ManagedResource
          public class EnvironmentBusEndpoint extends AbstractBusEndpoint {
          public EnvironmentBusEndpoint (ApplicationEventPublisher context,
          string id,
          BusEndpoint delegate) {
          super(context, id,delegate);
          }
          @RequestMapping(value = "env", method = RequestMethod.POST)
          @ResponseBody
          ManagedOperation
          public void env(@RequestParam Map params,
          @RequestParam(value = "destination",required = false)
          String destination){
          publish(new EnvironmentChangeRemoteApplicationEvent(this,
          getInstancerd(,
          destination,params));
          }
          }

          以下是RefreshBusEndpoint.java的源碼。

          package org.springframework.cloud.bus.endpoint;
          import org.springframework.cloud.bus.event.RefreshRemoteApplicationEvent;
          import org.springframework.context.ApplicationEventPublisher;
          import org.springframework.jmx.export.annotation.Managed0peration;
          import org.springframework.jmx.export.annotation.ManagedResource;
          import org.springframework.web.bind.annotation.RequestMapping;
          import org.springframework.web.bind.annotation.RequestMethod;
          import org.springframework.web.bind.annotation.RequestParam;
          import org.springframework.web.bind.annotation.ResponseBody;
          @ManagedResource
          public class RefreshBusEndpoint extends AbstractBusEndpoint
          public RefreshBusEndpoint (ApplicationEventPublisher context,Stringid,
          BusEndpoint delegate)f
          super(context, id, delegate);
          RequestMapping(value = "refresh",method = RequestMethod.POST)
          ResponseBody
          @Managedoperation
          public void refresh(
          @RequestParam(value = "destination", required = false)
          String destination){
          publish(new RefreshRemoteApplicationEvent(this, getInstanceId(),
          destination));
          }
          }

          2.RemoteApplicationEvent及其子類

          RemoteApplicationEvent用來定義被傳輸?shù)南⑹录?/p>

          以下是
          RemoteApplicationEvent.java的源碼。

          package org.springframework.cloud.bus.event;
          import java.util.UUID;
          import org.springframework.context.ApplicationEvent;
          import org.springframework.util.StringUtils;
          import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
          import com.fasterxml.jackson.annotation.JsonTypeInfo;
          @suppresswarnings( "serial")
          @JsonTypeInfo(use = JsonTypeInfo.Id.NAME,property = "type")
          @JsonIgnoreProperties( "source")
          public abstract class RemoteApplicationEvent extends ApplicationEvent(
          private static final 0bject TRANSTENT_SOURCE= new object();
          private final string originService;
          private final String destinationService;
          private final String id;
          protected RemoteApplicationEvent({
          //for serialization libs like jackson
          this(TRANSIENT_SOURCE,null,null);
          }
          protected RemoteApplicationEvent(Object source,String origin
          service,
          String destinationService){
          super(source);
          this.originservice = originService;
          if(destinationService -=null
          destinationService ="**";
          )
          1/ If the destinationService is not already a wildcard,match
          everything that follows
          // if there at most two path elements,and last element is not
          a global wildcard already
          if(!"**".equals(destinationService)){
          if (StringUtils.countoccurrencesof(destinationService,":")
          <= 1
          && !StringUtils.endsWithIgnoreCase (destination
          Service,":**")) {
          //All instances of the destination unless specifically
          requested
          destinationService = destinationService +":**;
          }
          this.destinationService = destinationService;
          this.id= UUID.randomUUID().toString(;
          protected RemoteApplicationEvent (Object source,String origin
          Service){
          this(source,originservice,null);
          }
          //省略 getter/setter方法
          }

          最常用的RemoteApplicationEvent的子類,莫過于
          EnvironmentChangeRemoteApplicationEvent和RefreshRemoteApplicationEvent。

          以下是
          EnvironmentChangeRemoteApplicationEvent.java的源碼。

          package org.springframework.cloud.bus.event;
          import java.util.Map;
          @SuppressWarnings( "serial")
          public class EnvironmentChangeRemoteApplicationEvent extends Remote
          ApplicationEvent {
          private final Map<String,String> values;
          @SuppressWarnings("unused")
          private EnvironmentChangeRemoteApplicationEvent() {
          //for serializers
          values = null;
          public EnvironmentChangeRemoteApplicationEvent (Object source,String
          originService,
          String destinationService,Map<String,String> values)
          super(source,originService,destinationService);
          this.values =values;
          //省略 getter/setter 方法
          }

          以下是
          RefreshRemoteApplicationEvent.java的源碼。

          package org.springframework.cloud.bus.event;
          @suppressWarnings("serial")
          public class RefreshRemoteApplicationEvent extends RemoteApplication
          Event
          {
          SuppressWarnings ("unused")
          private RefreshRemoteApplicationEvent( {
          //for serializers
          }
          public RefreshRemoteApplicationEvent(Object source,String origin
          service,
          String destinationservice)
          {
          super(source, originService,destinationService);
          }
          }

          3.ApplicationListener及其子類

          ApplicationListener是用來處理消息事件的監(jiān)聽器,是Spring框架的核心接口。該接口只有一個(gè)方法。

          public interface ApplicationListenerE extends ApplicationEvent> extends
          EventListener
          {
          ★★
          *Handle an application event.
          * @param event the event to respond to
          */
          void onApplicationEvent(E event) ;
          }

          Spring Cloud Bus中的監(jiān)聽器都需要實(shí)現(xiàn)該接口。EnvironmentChangeListener及RefreshListener是其中兩個(gè)常用的實(shí)現(xiàn)類。

          以下是
          EnvironmentChangeListener.java的源碼。

          package org.springframework.cloud.bus.event;
          import java.util.Map;
          import org.apache.commons.logging.Log;
          import org.apache.commons.logging.LogFactory;
          import org.springframework.beans.factory.annotation.Autowired;
          import org.springframework.cloud.context.environment.EnvironmentManager;
          import org.springframework.context.ApplicationListener;
          public class EnvironmentChangeListener
          implements ApplicationListener<EnvironmentChangeRemote
          ApplicationEvent>
          {
          private static Log log = LogFactory.getLog(EnvironmentChangeListener.class);
          @Autowired
          private EnvironmentManager env;
          aoverride
          public void onApplicationEvent(EnvironmentChangeRemoteApplication
          Event event
          {
          Map<String,String> values = event.getValues();
          log.info ("Received remote environment change request. Keys/
          values to update "

          +values);
          for (Map.Entry<String,String> entry : values.entrySet()){
          env.setProperty(entry.getKey(,entry.getvalue());
          }
          }
          }

          以下是RefreshListener.java的源碼。

          package org.springframework.cloud.bus.event;
          import java.util.Set;
          import org.apache.commons.logging.Log;
          import org.apache.commons.logging.LogFactory;
          import org.springframework.cloud.context.refresh.ContextRefresher;
          import org.springframework.context.ApplicationListener;
          public class RefreshListener
          implements ApplicationListener<RefreshRemoteApplicationEvent>
          {
          private static Log log = LogFactory.getLog (RefreshListener.class);
          private ContextRefresher contextRefresher;
          public RefreshListener(ContextRefresher contextRefresher)
          this.contextRefresher
          = contextRefresher;
          @override
          public void onApplicationEvent(RefreshRemoteApplicationEvent event)
          Set keys
          = contextRefresher.refresh();
          log.info ("Received remote refresh request.Keys refreshed " +keys);
          }
          }

          本篇文章內(nèi)容給大家講解的是SpringCloudBus 設(shè)計(jì)原理

          1. 下篇文章給大家講解的是如何集成 BuS;

          2. 覺得文章不錯(cuò)的朋友可以轉(zhuǎn)發(fā)此文關(guān)注小編;

          3. 感謝大家的支持!


          本文就是愿天堂沒有BUG給大家分享的內(nèi)容,大家有收獲的話可以分享下,想學(xué)習(xí)更多的話可以到微信公眾號里找我,我等你哦。

          瀏覽 71
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲欧美国产精品久久久久久久 | 日撸夜撸在线看 | 国产亚洲中文字幕在线观看 | 人人看人人干人人过人人 | 怡红院成人视频 |