<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          阿里終面:如何設(shè)計(jì)一個(gè)高性能網(wǎng)關(guān)?

          共 63603字,需瀏覽 128分鐘

           ·

          2021-03-17 14:16

          點(diǎn)擊關(guān)注公眾號(hào),Java干貨及時(shí)送達(dá)

          作者:煙味i
          鏈接:https://cnblogs.com/2YSP/p/14223892.html

          一、前言

          最近在github上看了soul網(wǎng)關(guān)的設(shè)計(jì),突然就來(lái)了興趣準(zhǔn)備自己從零開(kāi)始寫一個(gè)高性能的網(wǎng)關(guān)。

          經(jīng)過(guò)兩周時(shí)間的開(kāi)發(fā),我的網(wǎng)關(guān)ship-gate核心功能基本都已完成,最大的缺陷就是前端功底太差沒(méi)有管理后臺(tái)??。

          二、設(shè)計(jì)

          2.1 技術(shù)選型

          網(wǎng)關(guān)是所有請(qǐng)求的入口,所以要求有很高的吞吐量,為了實(shí)現(xiàn)這點(diǎn)可以使用請(qǐng)求異步化來(lái)解決。

          目前一般有以下兩種方案:

          • Tomcat/Jetty+NIO+Servlet3

          Servlet3已經(jīng)支持異步,這種方案使用比較多,京東,有贊和Zuul,都用的是這種方案。

          • Netty+NIO

          Netty為高并發(fā)而生,目前唯品會(huì)的網(wǎng)關(guān)使用這個(gè)策略,在唯品會(huì)的技術(shù)文章中在相同的情況下Netty是每秒30w+的吞吐量,Tomcat是13w+,可以看出是有一定的差距的,但是Netty需要自己處理HTTP協(xié)議,這一塊比較麻煩。

          后面發(fā)現(xiàn)Soul網(wǎng)關(guān)是基于Spring WebFlux(底層Netty)的,不用太關(guān)心HTTP協(xié)議的處理,于是決定也用Spring WebFlux。

          網(wǎng)關(guān)的第二個(gè)特點(diǎn)是具備可擴(kuò)展性,比如Netflix Zuul有preFilters,postFilters等在不同的階段方便處理不同的業(yè)務(wù),基于責(zé)任鏈模式將請(qǐng)求進(jìn)行鏈?zhǔn)教幚砑纯蓪?shí)現(xiàn)。

          在微服務(wù)架構(gòu)下,服務(wù)都會(huì)進(jìn)行多實(shí)例部署來(lái)保證高可用,請(qǐng)求到達(dá)網(wǎng)關(guān)時(shí),網(wǎng)關(guān)需要根據(jù)URL找到所有可用的實(shí)例,這時(shí)就需要服務(wù)注冊(cè)和發(fā)現(xiàn)功能,即注冊(cè)中心。

          現(xiàn)在流行的注冊(cè)中心有Apache的Zookeeper和阿里的Nacos兩種(consul有點(diǎn)小眾),因?yàn)橹皩慠PC框架時(shí)已經(jīng)用過(guò)了Zookeeper,所以這次就選擇了Nacos。

          2.2 需求清單

          首先要明確目標(biāo),即開(kāi)發(fā)一個(gè)具備哪些特性的網(wǎng)關(guān),總結(jié)下后如下:

          • 自定義路由規(guī)則

            可基于version的路由規(guī)則設(shè)置,路由對(duì)象包括DEFAUL,HEADER和QUERY三種,匹配方式包括=、regex、like三種。

          • 跨語(yǔ)言

            HTTP協(xié)議天生跨語(yǔ)言

          • 高性能

            Netty本身就是一款高性能的通信框架,同時(shí)server將一些路由規(guī)則等數(shù)據(jù)緩存到JVM內(nèi)存避免請(qǐng)求admin服務(wù)。

          • 高可用

            支持集群模式防止單節(jié)點(diǎn)故障,無(wú)狀態(tài)。

          • 灰度發(fā)布

            灰度發(fā)布(又名金絲雀發(fā)布)是指在黑與白之間,能夠平滑過(guò)渡的一種發(fā)布方式。在其上可以進(jìn)行A/B testing,即讓一部分用戶繼續(xù)用產(chǎn)品特性A,一部分用戶開(kāi)始用產(chǎn)品特性B,如果用戶對(duì)B沒(méi)有什么反對(duì)意見(jiàn),那么逐步擴(kuò)大范圍,把所有用戶都遷移到B上面來(lái)。通過(guò)特性一可以實(shí)現(xiàn)。

          • 接口鑒權(quán)

            基于責(zé)任鏈模式,用戶開(kāi)發(fā)自己的鑒權(quán)插件即可。

          • 負(fù)載均衡

            支持多種負(fù)載均衡算法,如隨機(jī),輪詢,加權(quán)輪詢等。利用SPI機(jī)制可以根據(jù)配置進(jìn)行動(dòng)態(tài)加載。

          2.3 架構(gòu)設(shè)計(jì)

          在參考了一些優(yōu)秀的網(wǎng)關(guān)Zuul,Spring Cloud Gateway,Soul后,將項(xiàng)目劃分為以下幾個(gè)模塊。

          名稱描述
          ship-admin后臺(tái)管理界面,配置路由規(guī)則等
          ship-server網(wǎng)關(guān)服務(wù)端,核心功能模塊
          ship-client-spring-boot-starter網(wǎng)關(guān)客戶端,自動(dòng)注冊(cè)服務(wù)信息到注冊(cè)中心
          ship-common一些公共的代碼,如pojo,常量等。

          它們之間的關(guān)系如圖:

          注意: 這張圖與實(shí)際實(shí)現(xiàn)有點(diǎn)出入,Nacos push到本地緩存的那個(gè)環(huán)節(jié)沒(méi)有實(shí)現(xiàn),目前只有ship-sever定時(shí)輪詢pull的過(guò)程。ship-admin從Nacos獲取注冊(cè)服務(wù)信息的過(guò)程,也改成了ServiceA啟動(dòng)時(shí)主動(dòng)發(fā)生HTTP請(qǐng)求通知ship-admin。

          2.4 表結(jié)構(gòu)設(shè)計(jì)

          三、編碼

          3.1 ship-client-spring-boot-starter

          首先創(chuàng)建一個(gè)spring-boot-starter命名為ship-client-spring-boot-starter,不知道如何自定義starter的可以看我以前寫的《開(kāi)發(fā)自己的starter》。

          更多 Spring Boot 教程推薦看這個(gè):

          https://github.com/javastacks/spring-boot-best-practice

          其核心類 AutoRegisterListener 就是在項(xiàng)目啟動(dòng)時(shí)做了兩件事:

          1.將服務(wù)信息注冊(cè)到Nacos注冊(cè)中心

          2.通知ship-admin服務(wù)上線了并注冊(cè)下線hook。

          代碼如下:

          /**
           * Created by 2YSP on 2020/12/21
           */

          public class AutoRegisterListener implements ApplicationListener<ContextRefreshedEvent{
           
              private final static Logger LOGGER = LoggerFactory.getLogger(AutoRegisterListener.class);
           
              private volatile AtomicBoolean registered = new AtomicBoolean(false);
           
              private final ClientConfigProperties properties;
           
              @NacosInjected
              private NamingService namingService;
           
              @Autowired
              private RequestMappingHandlerMapping handlerMapping;
           
              private final ExecutorService pool;
           
              /**
               * url list to ignore
               */

              private static List<String> ignoreUrlList = new LinkedList<>();
           
              static {
                  ignoreUrlList.add("/error");
              }
           
              public AutoRegisterListener(ClientConfigProperties properties) {
                  if (!check(properties)) {
                      LOGGER.error("client config port,contextPath,appName adminUrl and version can't be empty!");
                      throw new ShipException("client config port,contextPath,appName adminUrl and version can't be empty!");
                  }
                  this.properties = properties;
                  pool = new ThreadPoolExecutor(140, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
              }
           
              /**
               * check the ClientConfigProperties
               *
               * @param properties
               * @return
               */

              private boolean check(ClientConfigProperties properties) {
                  if (properties.getPort() == null || properties.getContextPath() == null
                          || properties.getVersion() == null || properties.getAppName() == null
                          || properties.getAdminUrl() == null) {
                      return false;
                  }
                  return true;
              }
           
           
              @Override
              public void onApplicationEvent(ContextRefreshedEvent event) {
                  if (!registered.compareAndSet(falsetrue)) {
                      return;
                  }
                  doRegister();
                  registerShutDownHook();
              }
           
              /**
               * send unregister request to admin when jvm shutdown
               */

              private void registerShutDownHook() {
                  final String url = "http://" + properties.getAdminUrl() + AdminConstants.UNREGISTER_PATH;
                  final UnregisterAppDTO unregisterAppDTO = new UnregisterAppDTO();
                  unregisterAppDTO.setAppName(properties.getAppName());
                  unregisterAppDTO.setVersion(properties.getVersion());
                  unregisterAppDTO.setIp(IpUtil.getLocalIpAddress());
                  unregisterAppDTO.setPort(properties.getPort());
                  Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                      OkhttpTool.doPost(url, unregisterAppDTO);
                      LOGGER.info("[{}:{}] unregister from ship-admin success!", unregisterAppDTO.getAppName(), unregisterAppDTO.getVersion());
                  }));
              }
           
              /**
               * register all interface info to register center
               */

              private void doRegister() {
                  Instance instance = new Instance();
                  instance.setIp(IpUtil.getLocalIpAddress());
                  instance.setPort(properties.getPort());
                  instance.setEphemeral(true);
                  Map<String, String> metadataMap = new HashMap<>();
                  metadataMap.put("version", properties.getVersion());
                  metadataMap.put("appName", properties.getAppName());
                  instance.setMetadata(metadataMap);
                  try {
                      namingService.registerInstance(properties.getAppName(), NacosConstants.APP_GROUP_NAME, instance);
                  } catch (NacosException e) {
                      LOGGER.error("register to nacos fail", e);
                      throw new ShipException(e.getErrCode(), e.getErrMsg());
                  }
                  LOGGER.info("register interface info to nacos success!");
                  // send register request to ship-admin
                  String url = "http://" + properties.getAdminUrl() + AdminConstants.REGISTER_PATH;
                  RegisterAppDTO registerAppDTO = buildRegisterAppDTO(instance);
                  OkhttpTool.doPost(url, registerAppDTO);
                  LOGGER.info("register to ship-admin success!");
              }
           
           
              private RegisterAppDTO buildRegisterAppDTO(Instance instance) {
                  RegisterAppDTO registerAppDTO = new RegisterAppDTO();
                  registerAppDTO.setAppName(properties.getAppName());
                  registerAppDTO.setContextPath(properties.getContextPath());
                  registerAppDTO.setIp(instance.getIp());
                  registerAppDTO.setPort(instance.getPort());
                  registerAppDTO.setVersion(properties.getVersion());
                  return registerAppDTO;
              }
          }
           

          3.2 ship-server

          ship-sever項(xiàng)目主要包括了兩個(gè)部分內(nèi)容:

          1.請(qǐng)求動(dòng)態(tài)路由的主流程

          2.本地緩存數(shù)據(jù)和ship-admin及nacos同步,這部分在后面3.3再講。

          ship-server實(shí)現(xiàn)動(dòng)態(tài)路由的原理是利用WebFilter攔截請(qǐng)求,然后將請(qǐng)求教給plugin chain去鏈?zhǔn)教幚怼?/p>

          PluginFilter根據(jù)URL解析出appName,然后將啟用的plugin組裝成plugin chain。

          最新 Java 核心技術(shù)教程,都在這了!

          public class PluginFilter implements WebFilter {
           
              private ServerConfigProperties properties;
           
              public PluginFilter(ServerConfigProperties properties) {
                  this.properties = properties;
              }
           
              @Override
              public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
                  String appName = parseAppName(exchange);
                  if (CollectionUtils.isEmpty(ServiceCache.getAllInstances(appName))) {
                      throw new ShipException(ShipExceptionEnum.SERVICE_NOT_FIND);
                  }
                  PluginChain pluginChain = new PluginChain(properties, appName);
                  pluginChain.addPlugin(new DynamicRoutePlugin(properties));
                  pluginChain.addPlugin(new AuthPlugin(properties));
                  return pluginChain.execute(exchange, pluginChain);
              }
           
              private String parseAppName(ServerWebExchange exchange) {
                  RequestPath path = exchange.getRequest().getPath();
                  String appName = path.value().split("/")[1];
                  return appName;
              }
          }

          PluginChain繼承了AbstractShipPlugin并持有所有要執(zhí)行的插件。

          /**
           * @Author: Ship
           * @Description:
           * @Date: Created in 2020/12/25
           */

          public class PluginChain extends AbstractShipPlugin {
              /**
               * the pos point to current plugin
               */

              private int pos;
              /**
               * the plugins of chain
               */

              private List<ShipPlugin> plugins;
           
              private final String appName;
           
              public PluginChain(ServerConfigProperties properties, String appName) {
                  super(properties);
                  this.appName = appName;
              }
           
              /**
               * add enabled plugin to chain
               *
               * @param shipPlugin
               */

              public void addPlugin(ShipPlugin shipPlugin) {
                  if (plugins == null) {
                      plugins = new ArrayList<>();
                  }
                  if (!PluginCache.isEnabled(appName, shipPlugin.name())) {
                      return;
                  }
                  plugins.add(shipPlugin);
                  // order by the plugin's order
                  plugins.sort(Comparator.comparing(ShipPlugin::order));
              }
           
              @Override
              public Integer order() {
                  return null;
              }
           
              @Override
              public String name() {
                  return null;
              }
           
              @Override
              public Mono<Void> execute(ServerWebExchange exchange, PluginChain pluginChain) {
                  if (pos == plugins.size()) {
                      return exchange.getResponse().setComplete();
                  }
                  return pluginChain.plugins.get(pos++).execute(exchange, pluginChain);
              }
           
              public String getAppName() {
                  return appName;
              }
           
          }
           

          AbstractShipPlugin實(shí)現(xiàn)了ShipPlugin接口,并持有ServerConfigProperties配置對(duì)象。

          public abstract class AbstractShipPlugin implements ShipPlugin {
           
              protected ServerConfigProperties properties;
           
              public AbstractShipPlugin(ServerConfigProperties properties) {
                  this.properties = properties;
              }
          }

          ShipPlugin接口定義了所有插件必須實(shí)現(xiàn)的三個(gè)方法order(),name()和execute()。

          public interface ShipPlugin {
              /**
               * lower values have higher priority
               *
               * @return
               */

              Integer order();
           
              /**
               * return current plugin name
               *
               * @return
               */

              String name();
           
              Mono<Void> execute(ServerWebExchange exchange,PluginChain pluginChain);
           
          }

          DynamicRoutePlugin繼承了抽象類AbstractShipPlugin,包含了動(dòng)態(tài)路由的主要業(yè)務(wù)邏輯。

          /**
           * @Author: Ship
           * @Description:
           * @Date: Created in 2020/12/25
           */

          public class DynamicRoutePlugin extends AbstractShipPlugin {
           
              private final static Logger LOGGER = LoggerFactory.getLogger(DynamicRoutePlugin.class);
           
              private static WebClient webClient;
           
              private static final Gson gson = new GsonBuilder().create();
           
              static {
                  HttpClient httpClient = HttpClient.create()
                          .tcpConfiguration(client ->
                                  client.doOnConnected(conn ->
                                          conn.addHandlerLast(new ReadTimeoutHandler(3))
                                                  .addHandlerLast(new WriteTimeoutHandler(3)))
                                          .option(ChannelOption.TCP_NODELAY, true)
                          );
                  webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient))
                          .build();
              }
           
              public DynamicRoutePlugin(ServerConfigProperties properties) {
                  super(properties);
              }
           
              @Override
              public Integer order() {
                  return ShipPluginEnum.DYNAMIC_ROUTE.getOrder();
              }
           
              @Override
              public String name() {
                  return ShipPluginEnum.DYNAMIC_ROUTE.getName();
              }
           
              @Override
              public Mono<Void> execute(ServerWebExchange exchange, PluginChain pluginChain) {
                  String appName = pluginChain.getAppName();
                  ServiceInstance serviceInstance = chooseInstance(appName, exchange.getRequest());
          //        LOGGER.info("selected instance is [{}]", gson.toJson(serviceInstance));
                  // request service
                  String url = buildUrl(exchange, serviceInstance);
                  return forward(exchange, url);
              }
           
              /**
               * forward request to backend service
               *
               * @param exchange
               * @param url
               * @return
               */

              private Mono<Void> forward(ServerWebExchange exchange, String url) {
                  ServerHttpRequest request = exchange.getRequest();
                  ServerHttpResponse response = exchange.getResponse();
                  HttpMethod method = request.getMethod();
           
                  WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(url).headers((headers) -> {
                      headers.addAll(request.getHeaders());
                  });
           
                  WebClient.RequestHeadersSpec<?> reqHeadersSpec;
                  if (requireHttpBody(method)) {
                      reqHeadersSpec = requestBodySpec.body(BodyInserters.fromDataBuffers(request.getBody()));
                  } else {
                      reqHeadersSpec = requestBodySpec;
                  }
                  // nio->callback->nio
                  return reqHeadersSpec.exchange().timeout(Duration.ofMillis(properties.getTimeOutMillis()))
                          .onErrorResume(ex -> {
                              return Mono.defer(() -> {
                                  String errorResultJson = "";
                                  if (ex instanceof TimeoutException) {
                                      errorResultJson = "{\"code\":5001,\"message\":\"network timeout\"}";
                                  } else {
                                      errorResultJson = "{\"code\":5000,\"message\":\"system error\"}";
                                  }
                                  return ShipResponseUtil.doResponse(exchange, errorResultJson);
                              }).then(Mono.empty());
                          }).flatMap(backendResponse -> {
                              response.setStatusCode(backendResponse.statusCode());
                              response.getHeaders().putAll(backendResponse.headers().asHttpHeaders());
                              return response.writeWith(backendResponse.bodyToFlux(DataBuffer.class));
                          });
              }
           
              /**
               * weather the http method need http body
               *
               * @param method
               * @return
               */

              private boolean requireHttpBody(HttpMethod method) {
                  if (method.equals(HttpMethod.POST) || method.equals(HttpMethod.PUT) || method.equals(HttpMethod.PATCH)) {
                      return true;
                  }
                  return false;
              }
           
              private String buildUrl(ServerWebExchange exchange, ServiceInstance serviceInstance) {
                  ServerHttpRequest request = exchange.getRequest();
                  String query = request.getURI().getQuery();
                  String path = request.getPath().value().replaceFirst("/" + serviceInstance.getAppName(), "");
                  String url = "http://" + serviceInstance.getIp() + ":" + serviceInstance.getPort() + path;
                  if (!StringUtils.isEmpty(query)) {
                      url = url + "?" + query;
                  }
                  return url;
              }
           
           
              /**
               * choose an ServiceInstance according to route rule config and load balancing algorithm
               *
               * @param appName
               * @param request
               * @return
               */

              private ServiceInstance chooseInstance(String appName, ServerHttpRequest request) {
                  List<ServiceInstance> serviceInstances = ServiceCache.getAllInstances(appName);
                  if (CollectionUtils.isEmpty(serviceInstances)) {
                      LOGGER.error("service instance of {} not find", appName);
                      throw new ShipException(ShipExceptionEnum.SERVICE_NOT_FIND);
                  }
                  String version = matchAppVersion(appName, request);
                  if (StringUtils.isEmpty(version)) {
                      throw new ShipException("match app version error");
                  }
                  // filter serviceInstances by version
                  List<ServiceInstance> instances = serviceInstances.stream().filter(i -> i.getVersion().equals(version)).collect(Collectors.toList());
                  //Select an instance based on the load balancing algorithm
                  LoadBalance loadBalance = LoadBalanceFactory.getInstance(properties.getLoadBalance(), appName, version);
                  ServiceInstance serviceInstance = loadBalance.chooseOne(instances);
                  return serviceInstance;
              }
           
           
              private String matchAppVersion(String appName, ServerHttpRequest request) {
                  List<AppRuleDTO> rules = RouteRuleCache.getRules(appName);
                  rules.sort(Comparator.comparing(AppRuleDTO::getPriority).reversed());
                  for (AppRuleDTO rule : rules) {
                      if (match(rule, request)) {
                          return rule.getVersion();
                      }
                  }
                  return null;
              }
           
           
              private boolean match(AppRuleDTO rule, ServerHttpRequest request) {
                  String matchObject = rule.getMatchObject();
                  String matchKey = rule.getMatchKey();
                  String matchRule = rule.getMatchRule();
                  Byte matchMethod = rule.getMatchMethod();
                  if (MatchObjectEnum.DEFAULT.getCode().equals(matchObject)) {
                      return true;
                  } else if (MatchObjectEnum.QUERY.getCode().equals(matchObject)) {
                      String param = request.getQueryParams().getFirst(matchKey);
                      if (!StringUtils.isEmpty(param)) {
                          return StringTools.match(param, matchMethod, matchRule);
                      }
                  } else if (MatchObjectEnum.HEADER.getCode().equals(matchObject)) {
                      HttpHeaders headers = request.getHeaders();
                      String headerValue = headers.getFirst(matchKey);
                      if (!StringUtils.isEmpty(headerValue)) {
                          return StringTools.match(headerValue, matchMethod, matchRule);
                      }
                  }
                  return false;
              }
           
          }
           

          3.3 數(shù)據(jù)同步

          app數(shù)據(jù)同步

          后臺(tái)服務(wù)(如訂單服務(wù))啟動(dòng)時(shí),只將服務(wù)名,版本,ip地址和端口號(hào)注冊(cè)到了Nacos,并沒(méi)有實(shí)例的權(quán)重和啟用的插件信息怎么辦?

          一般在線的實(shí)例權(quán)重和插件列表都是在管理界面配置,然后動(dòng)態(tài)生效的,所以需要ship-admin定時(shí)更新實(shí)例的權(quán)重和插件信息到注冊(cè)中心。

          對(duì)應(yīng)代碼ship-admin的NacosSyncListener

          /**
           * @Author: Ship
           * @Description:
           * @Date: Created in 2020/12/30
           */

          @Configuration
          public class NacosSyncListener implements ApplicationListener<ContextRefreshedEvent{
           
              private static final Logger LOGGER = LoggerFactory.getLogger(NacosSyncListener.class);
           
              private static ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1,
                      new ShipThreadFactory("nacos-sync"true).create());
           
              @NacosInjected
              private NamingService namingService;
           
              @Value("${nacos.discovery.server-addr}")
              private String baseUrl;
           
              @Resource
              private AppService appService;
           
              @Override
              public void onApplicationEvent(ContextRefreshedEvent event) {
                  if (event.getApplicationContext().getParent() != null) {
                      return;
                  }
                  String url = "http://" + baseUrl + NacosConstants.INSTANCE_UPDATE_PATH;
                  scheduledPool.scheduleWithFixedDelay(new NacosSyncTask(namingService, url, appService), 030L, TimeUnit.SECONDS);
              }
           
              class NacosSyncTask implements Runnable {
           
                  private NamingService namingService;
           
                  private String url;
           
                  private AppService appService;
           
                  private Gson gson = new GsonBuilder().create();
           
                  public NacosSyncTask(NamingService namingService, String url, AppService appService) {
                      this.namingService = namingService;
                      this.url = url;
                      this.appService = appService;
                  }
           
                  /**
                   * Regular update weight,enabled plugins to nacos instance
                   */

                  @Override
                  public void run() {
                      try {
                          // get all app names
                          ListView<String> services = namingService.getServicesOfServer(1, Integer.MAX_VALUE, NacosConstants.APP_GROUP_NAME);
                          if (CollectionUtils.isEmpty(services.getData())) {
                              return;
                          }
                          List<String> appNames = services.getData();
                          List<AppInfoDTO> appInfos = appService.getAppInfos(appNames);
                          for (AppInfoDTO appInfo : appInfos) {
                              if (CollectionUtils.isEmpty(appInfo.getInstances())) {
                                  continue;
                              }
                              for (ServiceInstance instance : appInfo.getInstances()) {
                                  Map<String, Object> queryMap = buildQueryMap(appInfo, instance);
                                  String resp = OkhttpTool.doPut(url, queryMap, "");
                                  LOGGER.debug("response :{}", resp);
                              }
                          }
           
                      } catch (Exception e) {
                          LOGGER.error("nacos sync task error", e);
                      }
                  }
           
                  private Map<String, Object> buildQueryMap(AppInfoDTO appInfo, ServiceInstance instance) {
                      Map<String, Object> map = new HashMap<>();
                      map.put("serviceName", appInfo.getAppName());
                      map.put("groupName", NacosConstants.APP_GROUP_NAME);
                      map.put("ip", instance.getIp());
                      map.put("port", instance.getPort());
                      map.put("weight", instance.getWeight().doubleValue());
                      NacosMetadata metadata = new NacosMetadata();
                      metadata.setAppName(appInfo.getAppName());
                      metadata.setVersion(instance.getVersion());
                      metadata.setPlugins(String.join(",", appInfo.getEnabledPlugins()));
                      map.put("metadata", StringTools.urlEncode(gson.toJson(metadata)));
                      map.put("ephemeral"true);
                      return map;
                  }
              }
          }
           

          ship-server再定時(shí)從Nacos拉取app數(shù)據(jù)更新到本地Map緩存。

          /**
           * @Author: Ship
           * @Description: sync data to local cache
           * @Date: Created in 2020/12/25
           */

          @Configuration
          public class DataSyncTaskListener implements ApplicationListener<ContextRefreshedEvent{
           
              private static ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1,
                      new ShipThreadFactory("service-sync"true).create());
           
              @NacosInjected
              private NamingService namingService;
           
              @Autowired
              private ServerConfigProperties properties;
           
              @Override
              public void onApplicationEvent(ContextRefreshedEvent event) {
                  if (event.getApplicationContext().getParent() != null) {
                      return;
                  }
                  scheduledPool.scheduleWithFixedDelay(new DataSyncTask(namingService)
                          , 0L, properties.getCacheRefreshInterval(), TimeUnit.SECONDS);
                  WebsocketSyncCacheServer websocketSyncCacheServer = new WebsocketSyncCacheServer(properties.getWebSocketPort());
                  websocketSyncCacheServer.start();
              }
           
           
              class DataSyncTask implements Runnable {
           
                  private NamingService namingService;
           
                  public DataSyncTask(NamingService namingService) {
                      this.namingService = namingService;
                  }
           
                  @Override
                  public void run() {
                      try {
                          // get all app names
                          ListView<String> services = namingService.getServicesOfServer(1, Integer.MAX_VALUE, NacosConstants.APP_GROUP_NAME);
                          if (CollectionUtils.isEmpty(services.getData())) {
                              return;
                          }
                          List<String> appNames = services.getData();
                          // get all instances
                          for (String appName : appNames) {
                              List<Instance> instanceList = namingService.getAllInstances(appName, NacosConstants.APP_GROUP_NAME);
                              if (CollectionUtils.isEmpty(instanceList)) {
                                  continue;
                              }
                              ServiceCache.add(appName, buildServiceInstances(instanceList));
                              List<String> pluginNames = getEnabledPlugins(instanceList);
                              PluginCache.add(appName, pluginNames);
                          }
                          ServiceCache.removeExpired(appNames);
                          PluginCache.removeExpired(appNames);
           
                      } catch (NacosException e) {
                          e.printStackTrace();
                      }
                  }
           
                  private List<String> getEnabledPlugins(List<Instance> instanceList) {
                      Instance instance = instanceList.get(0);
                      Map<String, String> metadata = instance.getMetadata();
                      // plugins: DynamicRoute,Auth
                      String plugins = metadata.getOrDefault("plugins", ShipPluginEnum.DYNAMIC_ROUTE.getName());
                      return Arrays.stream(plugins.split(",")).collect(Collectors.toList());
                  }
           
                  private List<ServiceInstance> buildServiceInstances(List<Instance> instanceList) {
                      List<ServiceInstance> list = new LinkedList<>();
                      instanceList.forEach(instance -> {
                          Map<String, String> metadata = instance.getMetadata();
                          ServiceInstance serviceInstance = new ServiceInstance();
                          serviceInstance.setAppName(metadata.get("appName"));
                          serviceInstance.setIp(instance.getIp());
                          serviceInstance.setPort(instance.getPort());
                          serviceInstance.setVersion(metadata.get("version"));
                          serviceInstance.setWeight((int) instance.getWeight());
                          list.add(serviceInstance);
                      });
                      return list;
                  }
              }
          }
           

          路由規(guī)則數(shù)據(jù)同步

          同時(shí),如果用戶在管理后臺(tái)更新了路由規(guī)則,ship-admin需要推送規(guī)則數(shù)據(jù)到ship-server,這里參考了soul網(wǎng)關(guān)的做法利用websocket在第一次建立連接后進(jìn)行全量同步,此后路由規(guī)則發(fā)生變更就只作增量同步。

          最新 Java 核心技術(shù)教程,都在這了!

          服務(wù)端WebsocketSyncCacheServer:

          /**
           * @Author: Ship
           * @Description:
           * @Date: Created in 2020/12/28
           */

          public class WebsocketSyncCacheServer extends WebSocketServer {
           
              private final static Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncCacheServer.class);
           
              private Gson gson = new GsonBuilder().create();
           
              private MessageHandler messageHandler;
           
              public WebsocketSyncCacheServer(Integer port) {
                  super(new InetSocketAddress(port));
                  this.messageHandler = new MessageHandler();
              }
           
           
              @Override
              public void onOpen(WebSocket webSocket, ClientHandshake clientHandshake) {
                  LOGGER.info("server is open");
              }
           
              @Override
              public void onClose(WebSocket webSocket, int i, String s, boolean b) {
                  LOGGER.info("websocket server close...");
              }
           
              @Override
              public void onMessage(WebSocket webSocket, String message) {
                  LOGGER.info("websocket server receive message:\n[{}]", message);
                  this.messageHandler.handler(message);
              }
           
              @Override
              public void onError(WebSocket webSocket, Exception e) {
           
              }
           
              @Override
              public void onStart() {
                  LOGGER.info("websocket server start...");
              }
           
           
              class MessageHandler {
           
                  public void handler(String message) {
                      RouteRuleOperationDTO operationDTO = gson.fromJson(message, RouteRuleOperationDTO.class);
                      if (CollectionUtils.isEmpty(operationDTO.getRuleList())) {
                          return;
                      }
                      Map<String, List<AppRuleDTO>> map = operationDTO.getRuleList()
                              .stream().collect(Collectors.groupingBy(AppRuleDTO::getAppName));
                      if (OperationTypeEnum.INSERT.getCode().equals(operationDTO.getOperationType())
                              || OperationTypeEnum.UPDATE.getCode().equals(operationDTO.getOperationType())) {
                          RouteRuleCache.add(map);
                      } else if (OperationTypeEnum.DELETE.getCode().equals(operationDTO.getOperationType())) {
                          RouteRuleCache.remove(map);
                      }
                  }
              }
          }
           

          客戶端WebsocketSyncCacheClient:

          /**
           * @Author: Ship
           * @Description:
           * @Date: Created in 2020/12/28
           */

          @Component
          public class WebsocketSyncCacheClient {
           
              private final static Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncCacheClient.class);
           
              private WebSocketClient client;
           
              private RuleService ruleService;
           
              private Gson gson = new GsonBuilder().create();
           
              public WebsocketSyncCacheClient(@Value("${ship.server-web-socket-url}") String serverWebSocketUrl,
                                              RuleService ruleService) 
          {
                  if (StringUtils.isEmpty(serverWebSocketUrl)) {
                      throw new ShipException(ShipExceptionEnum.CONFIG_ERROR);
                  }
                  this.ruleService = ruleService;
                  ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1,
                          new ShipThreadFactory("websocket-connect"true).create());
                  try {
                      client = new WebSocketClient(new URI(serverWebSocketUrl)) {
                          @Override
                          public void onOpen(ServerHandshake serverHandshake) {
                              LOGGER.info("client is open");
                              List<AppRuleDTO> list = ruleService.getEnabledRule();
                              String msg = gson.toJson(new RouteRuleOperationDTO(OperationTypeEnum.INSERT, list));
                              send(msg);
                          }
           
                          @Override
                          public void onMessage(String s) {
                          }
           
                          @Override
                          public void onClose(int i, String s, boolean b) {
                          }
           
                          @Override
                          public void onError(Exception e) {
                              LOGGER.error("websocket client error", e);
                          }
                      };
           
                      client.connectBlocking();
                      //使用調(diào)度線程池進(jìn)行斷線重連,30秒進(jìn)行一次
                      executor.scheduleAtFixedRate(() -> {
                          if (client != null && client.isClosed()) {
                              try {
                                  client.reconnectBlocking();
                              } catch (InterruptedException e) {
                                  LOGGER.error("reconnect server fail", e);
                              }
                          }
                      }, 1030, TimeUnit.SECONDS);
           
                  } catch (Exception e) {
                      LOGGER.error("websocket sync cache exception", e);
                      throw new ShipException(e.getMessage());
                  }
              }
           
              public <T> void send(T t) {
                  while (!client.getReadyState().equals(ReadyState.OPEN)) {
                      LOGGER.debug("connecting ...please wait");
                  }
                  client.send(gson.toJson(t));
              }
          }
           

          四、測(cè)試

          4.1 動(dòng)態(tài)路由測(cè)試

          1)本地啟動(dòng)nacos ,sh startup.sh -m standalone

          2)啟動(dòng)ship-admin

          3)本地啟動(dòng)兩個(gè)ship-example實(shí)例。

          實(shí)例1配置:

          ship:
           http:
             app-name: order
             version: gray_1.0
             context-path: /order
             port: 8081
             admin-url: 127.0.0.1:9001

          server:
           port: 8081

          nacos:
           discovery:
             server-addr: 127.0.0.1:8848

          實(shí)例2配置:

          ship:
            http:
              app-name: order
              version: prod_1.0
              context-path: /order
              port: 8082
              admin-url: 127.0.0.1:9001

          server:
            port: 8082

          nacos:
            discovery:
              server-addr: 127.0.0.1:8848
          4)在數(shù)據(jù)庫(kù)添加路由規(guī)則配置,該規(guī)則表示當(dāng)http header 中的name=ship時(shí)請(qǐng)求路由到gray_1.0版本的節(jié)點(diǎn)。
          啟動(dòng)ship-server,看到以下日志時(shí)則可以進(jìn)行測(cè)試了。
             2021-01-02 19:57:09.159  INFO 30413 --- [SocketWorker-29] cn.sp.sync.WebsocketSyncCacheServer      : websocket server receive message:
             [{"operationType":"INSERT","ruleList":[{"id":1,"appId":5,"appName":"order","version":"gray_1.0","matchObject":"HEADER","matchKey":"name","matchMethod":1,"matchRule":"ship","priority":50}]}]
          用Postman請(qǐng)求http://localhost:9000/order/user/add,POST方式,header設(shè)置name=ship,可以看到只有實(shí)例1有日志顯示。
          ==========add user,version:gray_1.0

          4.2 性能壓測(cè)

          壓測(cè)環(huán)境:

          MacBook Pro 13英寸

          處理器 2.3 GHz 四核Intel Core i7

          內(nèi)存 16 GB 3733 MHz LPDDR4X

          后端節(jié)點(diǎn)個(gè)數(shù)一個(gè)

          壓測(cè)工具:wrk

          壓測(cè)結(jié)果:20個(gè)線程,500個(gè)連接數(shù),吞吐量大概每秒9400個(gè)請(qǐng)求。

          五、總結(jié)

          千里之行始于足下,開(kāi)始以為寫一個(gè)網(wǎng)關(guān)會(huì)很難,但當(dāng)你實(shí)際開(kāi)始行動(dòng)時(shí)就會(huì)發(fā)現(xiàn)其實(shí)沒(méi)那么難,所以邁出第一步很重要。過(guò)程中也遇到了很多問(wèn)題,還在github上給soul和nacos這兩個(gè)開(kāi)源項(xiàng)目提了兩個(gè)issue,后來(lái)發(fā)現(xiàn)是自己的問(wèn)題,尷尬??。

          本文代碼已全部上傳到 github:https://github.com/2YSP/ship-gate,最后,希望此文對(duì)你有所幫助。

          參考資料:

          https://nacos.io/zh-cn/docs/quick-start.html
          https://dromara.org/website/zh-cn/docs/soul/soul.html
          https://docs.spring.io/spring-framework/docs/5.1.7.RELEASE/spring-framework-reference/web-reactive.html#webflux
          https://github.com/TooTallNate/Java-WebSocket






          關(guān)注Java技術(shù)??锤喔韶?/strong>



          獲取 Spring Boot 實(shí)戰(zhàn)筆記!
          瀏覽 36
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  好逼av| 是先锋男人的网站 | 少妇厨房偷情理伦 | 一级片在线免费看 | 中文无码视频直接看 |