<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          實(shí)戰(zhàn):如何設(shè)計(jì)一個(gè)高性能網(wǎng)關(guān)

          共 57386字,需瀏覽 115分鐘

           ·

          2021-03-20 09:32

          你知道的越多,不知道的就越多,業(yè)余的像一棵小草!

          你來(lái),我們一起精進(jìn)!你不來(lái),我和你的競(jìng)爭(zhēng)對(duì)手一起精進(jìn)!

          編輯:業(yè)余草

          cnblogs.com/2YSP/p/14223892.html

          推薦:https://www.xttblog.com/?p=5165


          01


          背景


          最近看了soul網(wǎng)關(guān)的設(shè)計(jì),突然就來(lái)了興趣準(zhǔn)備自己從零開始寫一個(gè)高性能的網(wǎng)關(guān)。經(jīng)過(guò)兩周時(shí)間的開發(fā),我的網(wǎng)關(guān)ship-gate核心功能基本都已完成,最大的缺陷就是前端功底太差沒有管理后臺(tái)??。

          02


          設(shè)計(jì)

          2.1 技術(shù)選型

          網(wǎng)關(guān)是所有請(qǐng)求的入口,所以要求有很高的吞吐量,為了實(shí)現(xiàn)這點(diǎn)可以使用請(qǐng)求異步化來(lái)解決。目前一般有以下兩種方案:
          • Tomcat/Jetty+NIO+Servlet3
          Servlet3已經(jīng)支持異步,這種方案使用比較多,京東,有贊和Zuul,都用的是這種方案。
          • Netty+NIO
          Netty為高并發(fā)而生,目前唯品會(huì)的網(wǎng)關(guān)使用這個(gè)策略,在唯品會(huì)的技術(shù)文章中在相同的情況下Netty是每秒30w+的吞吐量,Tomcat是13w+,可以看出是有一定的差距的,但是Netty需要自己處理HTTP協(xié)議,這一塊比較麻煩。
          后面發(fā)現(xiàn)Soul網(wǎng)關(guān)是基于Spring WebFlux(底層Netty)的,不用太關(guān)心HTTP協(xié)議的處理,于是決定也用Spring WebFlux。
          網(wǎng)關(guān)的第二個(gè)特點(diǎn)是具備可擴(kuò)展性,比如Netflix Zuul有preFilters,postFilters等在不同的階段方便處理不同的業(yè)務(wù),基于責(zé)任鏈模式將請(qǐng)求進(jìn)行鏈?zhǔn)教幚砑纯蓪?shí)現(xiàn)。
          在微服務(wù)架構(gòu)下,服務(wù)都會(huì)進(jìn)行多實(shí)例部署來(lái)保證高可用,請(qǐng)求到達(dá)網(wǎng)關(guān)時(shí),網(wǎng)關(guān)需要根據(jù)URL找到所有可用的實(shí)例,這時(shí)就需要服務(wù)注冊(cè)和發(fā)現(xiàn)功能,即注冊(cè)中心。
          現(xiàn)在流行的注冊(cè)中心有Apache的Zookeeper和阿里的Nacos兩種(consul有點(diǎn)小眾),因?yàn)橹皩慠PC框架時(shí)已經(jīng)用過(guò)了Zookeeper,所以這次就選擇了Nacos。

          2.2 需求清單

          首先要明確目標(biāo),即開發(fā)一個(gè)具備哪些特性的網(wǎng)關(guān),總結(jié)下后如下:
          自定義路由規(guī)則
          可基于version的路由規(guī)則設(shè)置,路由對(duì)象包括DEFAUL,HEADER和QUERY三種,匹配方式包括=、regex、like三種。
          跨語(yǔ)言
          HTTP協(xié)議天生跨語(yǔ)言
          高性能
          Netty本身就是一款高性能的通信框架,同時(shí)server將一些路由規(guī)則等數(shù)據(jù)緩存到JVM內(nèi)存避免請(qǐng)求admin服務(wù)。
          高可用
          支持集群模式防止單節(jié)點(diǎn)故障,無(wú)狀態(tài)。
          灰度發(fā)布
          灰度發(fā)布(又名金絲雀發(fā)布)是指在黑與白之間,能夠平滑過(guò)渡的一種發(fā)布方式。在其上可以進(jìn)行A/B testing,即讓一部分用戶繼續(xù)用產(chǎn)品特性A,一部分用戶開始用產(chǎn)品特性B,如果用戶對(duì)B沒有什么反對(duì)意見,那么逐步擴(kuò)大范圍,把所有用戶都遷移到B上面來(lái)。通過(guò)特性一可以實(shí)現(xiàn)。
          接口鑒權(quán)
          基于責(zé)任鏈模式,用戶開發(fā)自己的鑒權(quán)插件即可。
          負(fù)載均衡
          支持多種負(fù)載均衡算法,如隨機(jī),輪詢,加權(quán)輪詢等。利用SPI機(jī)制可以根據(jù)配置進(jìn)行動(dòng)態(tài)加載。

          2.3 架構(gòu)設(shè)計(jì)

          在參考了一些優(yōu)秀的網(wǎng)關(guān)Zuul,Spring Cloud Gateway,Soul后,將項(xiàng)目劃分為以下幾個(gè)模塊。

          它們之間的關(guān)系如圖:

          網(wǎng)關(guān)設(shè)計(jì)

          注意: 這張圖與實(shí)際實(shí)現(xiàn)有點(diǎn)出入,Nacos push到本地緩存的那個(gè)環(huán)節(jié)沒有實(shí)現(xiàn),目前只有ship-sever定時(shí)輪詢pull的過(guò)程。ship-admin從Nacos獲取注冊(cè)服務(wù)信息的過(guò)程,也改成了ServiceA啟動(dòng)時(shí)主動(dòng)發(fā)生HTTP請(qǐng)求通知ship-admin。

          2.4 表結(jié)構(gòu)設(shè)計(jì)



          03


          編碼


          3.1 ship-client-spring-boot-starter
          首先創(chuàng)建一個(gè)spring-boot-starter命名為ship-client-spring-boot-starter,不知道如何自定義starter的可以看我以前寫的《開發(fā)自己的starter》。
          其核心類 AutoRegisterListener 就是在項(xiàng)目啟動(dòng)時(shí)做了兩件事:
          1.將服務(wù)信息注冊(cè)到Nacos注冊(cè)中心
          2.通知ship-admin服務(wù)上線了并注冊(cè)下線hook。
          代碼如下:
          * Created by 2YSP on 2020/12/21
          */
          public class AutoRegisterListener implements ApplicationListener<ContextRefreshedEvent{

             private final static Logger LOGGER = LoggerFactory.getLogger(AutoRegisterListener.class);

             private volatile AtomicBoolean registered = new AtomicBoolean(false);

             private final ClientConfigProperties properties;

             @NacosInjected
             private NamingService namingService;

             @Autowired
             private RequestMappingHandlerMapping handlerMapping;

             private final ExecutorService pool;

             /**
          * url list to ignore
          */

             private static List<String> ignoreUrlList = new LinkedList<>();

             static {
                 ignoreUrlList.add("/error");
             }

             public AutoRegisterListener(ClientConfigProperties properties) {
                 if (!check(properties)) {
                     LOGGER.error("client config port,contextPath,appName adminUrl and version can't be empty!");
                     throw new ShipException("client config port,contextPath,appName adminUrl and version can't be empty!");
                 }
                 this.properties = properties;
                 pool = new ThreadPoolExecutor(140, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
             }

             /**
          * check the ClientConfigProperties
          *
          @param properties
          @return
          */

             private boolean check(ClientConfigProperties properties) {
                 if (properties.getPort() == null| properties.getContextPath() == null
                        | properties.getVersion() == null| properties.getAppName() == null
                        | properties.getAdminUrl() == null) {
                     return false;
                 }
                 return true;
             }


             @Override
             public void onApplicationEvent(ContextRefreshedEvent event) {
                 if (!registered.compareAndSet(falsetrue)) {
                     return;
                 }
                 doRegister();
                 registerShutDownHook();
             }

             /**
          * send unregister request to admin when jvm shutdown
          */

             private void registerShutDownHook() {
                 final String url = "http://" + properties.getAdminUrl() + AdminConstants.UNREGISTER_PATH;
                 final UnregisterAppDTO unregisterAppDTO = new UnregisterAppDTO();
                 unregisterAppDTO.setAppName(properties.getAppName());
                 unregisterAppDTO.setVersion(properties.getVersion());
                 unregisterAppDTO.setIp(IpUtil.getLocalIpAddress());
                 unregisterAppDTO.setPort(properties.getPort());
                 Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                     OkhttpTool.doPost(url, unregisterAppDTO);
                     LOGGER.info("[{}:{}] unregister from ship-admin success!", unregisterAppDTO.getAppName(), unregisterAppDTO.getVersion());
                 }));
             }

             /**
          * register all interface info to register center
          */

             private void doRegister() {
                 Instance instance = new Instance();
                 instance.setIp(IpUtil.getLocalIpAddress());
                 instance.setPort(properties.getPort());
                 instance.setEphemeral(true);
                 Map<String, String> metadataMap = new HashMap<>();
                 metadataMap.put("version", properties.getVersion());
                 metadataMap.put("appName", properties.getAppName());
                 instance.setMetadata(metadataMap);
                 try {
                     namingService.registerInstance(properties.getAppName(), NacosConstants.APP_GROUP_NAME, instance);
                 } catch (NacosException e) {
                     LOGGER.error("register to nacos fail", e);
                     throw new ShipException(e.getErrCode(), e.getErrMsg());
                 }
                 LOGGER.info("register interface info to nacos success!");
                 // send register request to ship-admin
                 String url = "http://" + properties.getAdminUrl() + AdminConstants.REGISTER_PATH;
                 RegisterAppDTO registerAppDTO = buildRegisterAppDTO(instance);
                 OkhttpTool.doPost(url, registerAppDTO);
                 LOGGER.info("register to ship-admin success!");
             }


             private RegisterAppDTO buildRegisterAppDTO(Instance instance) {
                 RegisterAppDTO registerAppDTO = new RegisterAppDTO();
                 registerAppDTO.setAppName(properties.getAppName());
                 registerAppDTO.setContextPath(properties.getContextPath());
                 registerAppDTO.setIp(instance.getIp());
                 registerAppDTO.setPort(instance.getPort());
                 registerAppDTO.setVersion(properties.getVersion());
                 return registerAppDTO;
             }
          }
          3.2 ship-server
          ship-sever項(xiàng)目主要包括了兩個(gè)部分內(nèi)容, 1.請(qǐng)求動(dòng)態(tài)路由的主流程 2.本地緩存數(shù)據(jù)和ship-admin及nacos同步,這部分在后面3.3再講。
          ship-server實(shí)現(xiàn)動(dòng)態(tài)路由的原理是利用WebFilter攔截請(qǐng)求,然后將請(qǐng)求教給plugin chain去鏈?zhǔn)教幚怼?/span>
          PluginFilter根據(jù)URL解析出appName,然后將啟用的plugin組裝成plugin chain。
          public class PluginFilter implements WebFilter {

             private ServerConfigProperties properties;

             public PluginFilter(ServerConfigProperties properties) {
                 this.properties = properties;
             }

             @Override
             public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
                 String appName = parseAppName(exchange);
                 if (CollectionUtils.isEmpty(ServiceCache.getAllInstances(appName))) {
                     throw new ShipException(ShipExceptionEnum.SERVICE_NOT_FIND);
                 }
                 PluginChain pluginChain = new PluginChain(properties, appName);
                 pluginChain.addPlugin(new DynamicRoutePlugin(properties));
                 pluginChain.addPlugin(new AuthPlugin(properties));
                 return pluginChain.execute(exchange, pluginChain);
             }

             private String parseAppName(ServerWebExchange exchange) {
                 RequestPath path = exchange.getRequest().getPath();
                 String appName = path.value().split("/")[1];
                 return appName;
             }
          }```

          PluginChain繼承了AbstractShipPlugin并持有所有要執(zhí)行的插件。

          ```java
          @Author: Ship
          @Description:
          @Date: Created in 2020/12/25
          */
          public class PluginChain extends AbstractShipPlugin {
             /**
          * the pos point to current plugin
          */

             private int pos;
             /**
          * the plugins of chain
          */

             private List<ShipPlugin> plugins;

             private final String appName;

             public PluginChain(ServerConfigProperties properties, String appName) {
                 super(properties);
                 this.appName = appName;
             }

             /**
          * add enabled plugin to chain
          *
          @param shipPlugin
          */

             public void addPlugin(ShipPlugin shipPlugin) {
                 if (plugins == null) {
                     plugins = new ArrayList<>();
                 }
                 if (!PluginCache.isEnabled(appName, shipPlugin.name())) {
                     return;
                 }
                 plugins.add(shipPlugin);
                 // order by the plugin's order
                 plugins.sort(Comparator.comparing(ShipPlugin::order));
             }

             @Override
             public Integer order() {
                 return null;
             }

             @Override
             public String name() {
                 return null;
             }

             @Override
             public Mono<Void> execute(ServerWebExchange exchange, PluginChain pluginChain) {
                 if (pos == plugins.size()) {
                     return exchange.getResponse().setComplete();
                 }
                 return pluginChain.plugins.get(pos++).execute(exchange, pluginChain);
             }

             public String getAppName() {
                 return appName;
             }

          }
          AbstractShipPlugin實(shí)現(xiàn)了ShipPlugin接口,并持有ServerConfigProperties配置對(duì)象。
          public abstract class AbstractShipPlugin implements ShipPlugin {

             protected ServerConfigProperties properties;

             public AbstractShipPlugin(ServerConfigProperties properties) {
                 this.properties = properties;
             }
          }```

          ShipPlugin接口定義了所有插件必須實(shí)現(xiàn)的三個(gè)方法order(),name()和execute()。

          ```java
          public interface ShipPlugin {
             /**
          * lower values have higher priority
          *
          @return
          */

             Integer order();

             /**
          * return current plugin name
          *
          @return
          */

             String name();

             Mono<Void> execute(ServerWebExchange exchange,PluginChain pluginChain);

          }```

          DynamicRoutePlugin繼承了抽象類AbstractShipPlugin,包含了動(dòng)態(tài)路由的主要業(yè)務(wù)邏輯。

          ```java
          @Author: Ship
          @Description:
          @Date: Created in 2020/12/25
          */
          public class DynamicRoutePlugin extends AbstractShipPlugin {

             private final static Logger LOGGER = LoggerFactory.getLogger(DynamicRoutePlugin.class);

             private static WebClient webClient;

             private static final Gson gson = new GsonBuilder().create();

             static {
                 HttpClient httpClient = HttpClient.create()
                         .tcpConfiguration(client ->
                                 client.doOnConnected(conn ->
                                         conn.addHandlerLast(new ReadTimeoutHandler(3))
                                                 .addHandlerLast(new WriteTimeoutHandler(3)))
                                         .option(ChannelOption.TCP_NODELAY, true)
                         );
                 webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient))
                         .build();
             }

             public DynamicRoutePlugin(ServerConfigProperties properties) {
                 super(properties);
             }

             @Override
             public Integer order() {
                 return ShipPluginEnum.DYNAMIC_ROUTE.getOrder();
             }

             @Override
             public String name() {
                 return ShipPluginEnum.DYNAMIC_ROUTE.getName();
             }

             @Override
             public Mono<Void> execute(ServerWebExchange exchange, PluginChain pluginChain) {
                 String appName = pluginChain.getAppName();
                 ServiceInstance serviceInstance = chooseInstance(appName, exchange.getRequest());
          //        LOGGER.info("selected instance is [{}]", gson.toJson(serviceInstance));
                 // request service
                 String url = buildUrl(exchange, serviceInstance);
                 return forward(exchange, url);
             }

             /**
          * forward request to backend service
          *
          @param exchange
          @param url
          @return
          */

             private Mono<Void> forward(ServerWebExchange exchange, String url) {
                 ServerHttpRequest request = exchange.getRequest();
                 ServerHttpResponse response = exchange.getResponse();
                 HttpMethod method = request.getMethod();

                 WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(url).headers((headers) -> {
                     headers.addAll(request.getHeaders());
                 });

                 WebClient.RequestHeadersSpec<?> reqHeadersSpec;
                 if (requireHttpBody(method)) {
                     reqHeadersSpec = requestBodySpec.body(BodyInserters.fromDataBuffers(request.getBody()));
                 } else {
                     reqHeadersSpec = requestBodySpec;
                 }
                 // nio->callback->nio
                 return reqHeadersSpec.exchange().timeout(Duration.ofMillis(properties.getTimeOutMillis()))
                         .onErrorResume(ex -> {
                             return Mono.defer(() -> {
                                 String errorResultJson = "";
                                 if (ex instanceof TimeoutException) {
                                     errorResultJson = "{\"code\":5001,\"message\":\"network timeout\"}";
                                 } else {
                                     errorResultJson = "{\"code\":5000,\"message\":\"system error\"}";
                                 }
                                 return ShipResponseUtil.doResponse(exchange, errorResultJson);
                             }).then(Mono.empty());
                         }).flatMap(backendResponse -> {
                             response.setStatusCode(backendResponse.statusCode());
                             response.getHeaders().putAll(backendResponse.headers().asHttpHeaders());
                             return response.writeWith(backendResponse.bodyToFlux(DataBuffer.class));
                         });
             }

             /**
          * weather the http method need http body
          *
          @param method
          @return
          */

             private boolean requireHttpBody(HttpMethod method) {
                 if (method.equals(HttpMethod.POST)| method.equals(HttpMethod.PUT)| method.equals(HttpMethod.PATCH)) {
                     return true;
                 }
                 return false;
             }

             private String buildUrl(ServerWebExchange exchange, ServiceInstance serviceInstance) {
                 ServerHttpRequest request = exchange.getRequest();
                 String query = request.getURI().getQuery();
                 String path = request.getPath().value().replaceFirst("/" + serviceInstance.getAppName(), "");
                 String url = "http://" + serviceInstance.getIp() + ":" + serviceInstance.getPort() + path;
                 if (!StringUtils.isEmpty(query)) {
                     url = url + "?" + query;
                 }
                 return url;
             }


             /**
          * choose an ServiceInstance according to route rule config and load balancing algorithm
          *
          @param appName
          @param request
          @return
          */

             private ServiceInstance chooseInstance(String appName, ServerHttpRequest request) {
                 List<ServiceInstance> serviceInstances = ServiceCache.getAllInstances(appName);
                 if (CollectionUtils.isEmpty(serviceInstances)) {
                     LOGGER.error("service instance of {} not find", appName);
                     throw new ShipException(ShipExceptionEnum.SERVICE_NOT_FIND);
                 }
                 String version = matchAppVersion(appName, request);
                 if (StringUtils.isEmpty(version)) {
                     throw new ShipException("match app version error");
                 }
                 // filter serviceInstances by version
                 List<ServiceInstance> instances = serviceInstances.stream().filter(i -> i.getVersion().equals(version)).collect(Collectors.toList());
                 //Select an instance based on the load balancing algorithm
                 LoadBalance loadBalance = LoadBalanceFactory.getInstance(properties.getLoadBalance(), appName, version);
                 ServiceInstance serviceInstance = loadBalance.chooseOne(instances);
                 return serviceInstance;
             }


             private String matchAppVersion(String appName, ServerHttpRequest request) {
                 List<AppRuleDTO> rules = RouteRuleCache.getRules(appName);
                 rules.sort(Comparator.comparing(AppRuleDTO::getPriority).reversed());
                 for (AppRuleDTO rule : rules) {
                     if (match(rule, request)) {
                         return rule.getVersion();
                     }
                 }
                 return null;
             }


             private boolean match(AppRuleDTO rule, ServerHttpRequest request) {
                 String matchObject = rule.getMatchObject();
                 String matchKey = rule.getMatchKey();
                 String matchRule = rule.getMatchRule();
                 Byte matchMethod = rule.getMatchMethod();
                 if (MatchObjectEnum.DEFAULT.getCode().equals(matchObject)) {
                     return true;
                 } else if (MatchObjectEnum.QUERY.getCode().equals(matchObject)) {
                     String param = request.getQueryParams().getFirst(matchKey);
                     if (!StringUtils.isEmpty(param)) {
                         return StringTools.match(param, matchMethod, matchRule);
                     }
                 } else if (MatchObjectEnum.HEADER.getCode().equals(matchObject)) {
                     HttpHeaders headers = request.getHeaders();
                     String headerValue = headers.getFirst(matchKey);
                     if (!StringUtils.isEmpty(headerValue)) {
                         return StringTools.match(headerValue, matchMethod, matchRule);
                     }
                 }
                 return false;
             }

          }

          3.3 數(shù)據(jù)同步

          app數(shù)據(jù)同步
          后臺(tái)服務(wù)(如訂單服務(wù))啟動(dòng)時(shí),只將服務(wù)名,版本,ip地址和端口號(hào)注冊(cè)到了Nacos,并沒有實(shí)例的權(quán)重和啟用的插件信息怎么辦?
          一般在線的實(shí)例權(quán)重和插件列表都是在管理界面配置,然后動(dòng)態(tài)生效的,所以需要ship-admin定時(shí)更新實(shí)例的權(quán)重和插件信息到注冊(cè)中心。
          對(duì)應(yīng)代碼ship-admin的NacosSyncListener
          @Author: Ship
          @Description:
          @Date: Created in 2020/12/30
          */
          @Configuration
          public class NacosSyncListener implements ApplicationListener<ContextRefreshedEvent{

             private static final Logger LOGGER = LoggerFactory.getLogger(NacosSyncListener.class);

             private static ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1,
                     new ShipThreadFactory("nacos-sync"true).create());

             @NacosInjected
             private NamingService namingService;

             @Value("${nacos.discovery.server-addr}")
             private String baseUrl;

             @Resource
             private AppService appService;

             @Override
             public void onApplicationEvent(ContextRefreshedEvent event) {
                 if (event.getApplicationContext().getParent() != null) {
                     return;
                 }
                 String url = "http://" + baseUrl + NacosConstants.INSTANCE_UPDATE_PATH;
                 scheduledPool.scheduleWithFixedDelay(new NacosSyncTask(namingService, url, appService), 030L, TimeUnit.SECONDS);
             }

             class NacosSyncTask implements Runnable {

                 private NamingService namingService;

                 private String url;

                 private AppService appService;

                 private Gson gson = new GsonBuilder().create();

                 public NacosSyncTask(NamingService namingService, String url, AppService appService) {
                     this.namingService = namingService;
                     this.url = url;
                     this.appService = appService;
                 }

                 /**
          * Regular update weight,enabled plugins to nacos instance
          */

                 @Override
                 public void run() {
                     try {
                         // get all app names
                         ListView<String> services = namingService.getServicesOfServer(1, Integer.MAX_VALUE, NacosConstants.APP_GROUP_NAME);
                         if (CollectionUtils.isEmpty(services.getData())) {
                             return;
                         }
                         List<String> appNames = services.getData();
                         List<AppInfoDTO> appInfos = appService.getAppInfos(appNames);
                         for (AppInfoDTO appInfo : appInfos) {
                             if (CollectionUtils.isEmpty(appInfo.getInstances())) {
                                 continue;
                             }
                             for (ServiceInstance instance : appInfo.getInstances()) {
                                 Map<String, Object> queryMap = buildQueryMap(appInfo, instance);
                                 String resp = OkhttpTool.doPut(url, queryMap, "");
                                 LOGGER.debug("response :{}", resp);
                             }
                         }

                     } catch (Exception e) {
                         LOGGER.error("nacos sync task error", e);
                     }
                 }

                 private Map<String, Object> buildQueryMap(AppInfoDTO appInfo, ServiceInstance instance) {
                     Map<String, Object> map = new HashMap<>();
                     map.put("serviceName", appInfo.getAppName());
                     map.put("groupName", NacosConstants.APP_GROUP_NAME);
                     map.put("ip", instance.getIp());
                     map.put("port", instance.getPort());
                     map.put("weight", instance.getWeight().doubleValue());
                     NacosMetadata metadata = new NacosMetadata();
                     metadata.setAppName(appInfo.getAppName());
                     metadata.setVersion(instance.getVersion());
                     metadata.setPlugins(String.join(",", appInfo.getEnabledPlugins()));
                     map.put("metadata", StringTools.urlEncode(gson.toJson(metadata)));
                     map.put("ephemeral"true);
                     return map;
                 }
             }
          }
          ship-server再定時(shí)從Nacos拉取app數(shù)據(jù)更新到本地Map緩存。
          @Author: Ship
          @Description: sync data to local cache
          @Date: Created in 2020/12/25
          */
          @Configuration
          public class DataSyncTaskListener implements ApplicationListener<ContextRefreshedEvent{

             private static ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1,
                     new ShipThreadFactory("service-sync"true).create());

             @NacosInjected
             private NamingService namingService;

             @Autowired
             private ServerConfigProperties properties;

             @Override
             public void onApplicationEvent(ContextRefreshedEvent event) {
                 if (event.getApplicationContext().getParent() != null) {
                     return;
                 }
                 scheduledPool.scheduleWithFixedDelay(new DataSyncTask(namingService)
                         , 0L, properties.getCacheRefreshInterval(), TimeUnit.SECONDS);
                 WebsocketSyncCacheServer websocketSyncCacheServer = new WebsocketSyncCacheServer(properties.getWebSocketPort());
                 websocketSyncCacheServer.start();
             }


             class DataSyncTask implements Runnable {

                 private NamingService namingService;

                 public DataSyncTask(NamingService namingService) {
                     this.namingService = namingService;
                 }

                 @Override
                 public void run() {
                     try {
                         // get all app names
                         ListView<String> services = namingService.getServicesOfServer(1, Integer.MAX_VALUE, NacosConstants.APP_GROUP_NAME);
                         if (CollectionUtils.isEmpty(services.getData())) {
                             return;
                         }
                         List<String> appNames = services.getData();
                         // get all instances
                         for (String appName : appNames) {
                             List<Instance> instanceList = namingService.getAllInstances(appName, NacosConstants.APP_GROUP_NAME);
                             if (CollectionUtils.isEmpty(instanceList)) {
                                 continue;
                             }
                             ServiceCache.add(appName, buildServiceInstances(instanceList));
                             List<String> pluginNames = getEnabledPlugins(instanceList);
                             PluginCache.add(appName, pluginNames);
                         }
                         ServiceCache.removeExpired(appNames);
                         PluginCache.removeExpired(appNames);

                     } catch (NacosException e) {
                         e.printStackTrace();
                     }
                 }

                 private List<String> getEnabledPlugins(List<Instance> instanceList) {
                     Instance instance = instanceList.get(0);
                     Map<String, String> metadata = instance.getMetadata();
                     // plugins: DynamicRoute,Auth
                     String plugins = metadata.getOrDefault("plugins", ShipPluginEnum.DYNAMIC_ROUTE.getName());
                     return Arrays.stream(plugins.split(",")).collect(Collectors.toList());
                 }

                 private List<ServiceInstance> buildServiceInstances(List<Instance> instanceList) {
                     List<ServiceInstance> list = new LinkedList<>();
                     instanceList.forEach(instance -> {
                         Map<String, String> metadata = instance.getMetadata();
                         ServiceInstance serviceInstance = new ServiceInstance();
                         serviceInstance.setAppName(metadata.get("appName"));
                         serviceInstance.setIp(instance.getIp());
                         serviceInstance.setPort(instance.getPort());
                         serviceInstance.setVersion(metadata.get("version"));
                         serviceInstance.setWeight((int) instance.getWeight());
                         list.add(serviceInstance);
                     });
                     return list;
                 }
             }
          }
          路由規(guī)則數(shù)據(jù)同步
          同時(shí),如果用戶在管理后臺(tái)更新了路由規(guī)則,ship-admin需要推送規(guī)則數(shù)據(jù)到ship-server,這里參考了soul網(wǎng)關(guān)的做法利用websocket在第一次建立連接后進(jìn)行全量同步,此后路由規(guī)則發(fā)生變更就只作增量同步。
          服務(wù)端WebsocketSyncCacheServer:
          @Author: Ship
          @Description:
          @Date: Created in 2020/12/28
          */
          public class WebsocketSyncCacheServer extends WebSocketServer {

             private final static Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncCacheServer.class);

             private Gson gson = new GsonBuilder().create();

             private MessageHandler messageHandler;

             public WebsocketSyncCacheServer(Integer port) {
                 super(new InetSocketAddress(port));
                 this.messageHandler = new MessageHandler();
             }


             @Override
             public void onOpen(WebSocket webSocket, ClientHandshake clientHandshake) {
                 LOGGER.info("server is open");
             }

             @Override
             public void onClose(WebSocket webSocket, int i, String s, boolean b) {
                 LOGGER.info("websocket server close...");
             }

             @Override
             public void onMessage(WebSocket webSocket, String message) {
                 LOGGER.info("websocket server receive message:\n[{}]", message);
                 this.messageHandler.handler(message);
             }

             @Override
             public void onError(WebSocket webSocket, Exception e) {

             }

             @Override
             public void onStart() {
                 LOGGER.info("websocket server start...");
             }


             class MessageHandler {

                 public void handler(String message) {
                     RouteRuleOperationDTO operationDTO = gson.fromJson(message, RouteRuleOperationDTO.class);
                     if (CollectionUtils.isEmpty(operationDTO.getRuleList())) {
                         return;
                     }
                     Map<String, List<AppRuleDTO>> map = operationDTO.getRuleList()
                             .stream().collect(Collectors.groupingBy(AppRuleDTO::getAppName));
                     if (OperationTypeEnum.INSERT.getCode().equals(operationDTO.getOperationType())
                            | OperationTypeEnum.UPDATE.getCode().equals(operationDTO.getOperationType())) {
                         RouteRuleCache.add(map);
                     } else if (OperationTypeEnum.DELETE.getCode().equals(operationDTO.getOperationType())) {
                         RouteRuleCache.remove(map);
                     }
                 }
             }
          }
          客戶端WebsocketSyncCacheClient:
          @Author: Ship
          @Description:
          @Date: Created in 2020/12/28
          */
          @Component
          public class WebsocketSyncCacheClient {

             private final static Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncCacheClient.class);

             private WebSocketClient client;

             private RuleService ruleService;

             private Gson gson = new GsonBuilder().create();

             public WebsocketSyncCacheClient(@Value("${ship.server-web-socket-url}") String serverWebSocketUrl,
          RuleService ruleService) 
          {
                 if (StringUtils.isEmpty(serverWebSocketUrl)) {
                     throw new ShipException(ShipExceptionEnum.CONFIG_ERROR);
                 }
                 this.ruleService = ruleService;
                 ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1,
                         new ShipThreadFactory("websocket-connect"true).create());
                 try {
                     client = new WebSocketClient(new URI(serverWebSocketUrl)) {
                         @Override
                         public void onOpen(ServerHandshake serverHandshake) {
                             LOGGER.info("client is open");
                             List<AppRuleDTO> list = ruleService.getEnabledRule();
                             String msg = gson.toJson(new RouteRuleOperationDTO(OperationTypeEnum.INSERT, list));
                             send(msg);
                         }

                         @Override
                         public void onMessage(String s) {
                         }

                         @Override
                         public void onClose(int i, String s, boolean b) {
                         }

                         @Override
                         public void onError(Exception e) {
                             LOGGER.error("websocket client error", e);
                         }
                     };

                     client.connectBlocking();
                     //使用調(diào)度線程池進(jìn)行斷線重連,30秒進(jìn)行一次
                     executor.scheduleAtFixedRate(() -> {
                         if (client != null && client.isClosed()) {
                             try {
                                 client.reconnectBlocking();
                             } catch (InterruptedException e) {
                                 LOGGER.error("reconnect server fail", e);
                             }
                         }
                     }, 1030, TimeUnit.SECONDS);

                 } catch (Exception e) {
                     LOGGER.error("websocket sync cache exception", e);
                     throw new ShipException(e.getMessage());
                 }
             }

             public <T> void send(T t) {
                 while (!client.getReadyState().equals(ReadyState.OPEN)) {
                     LOGGER.debug("connecting ...please wait");
                 }
                 client.send(gson.toJson(t));
             }
          }



          04


          測(cè)試

          4.1動(dòng)態(tài)路由測(cè)試

          1、本地啟動(dòng)nacos ,sh startup.sh -m standalone
          2、啟動(dòng)ship-admin
          3、本地啟動(dòng)兩個(gè)ship-example實(shí)例。
          實(shí)例1配置:
          ship:
           http:
             app-name: order
             version: gray_1.0
             context-path: /order
             port: 8081
             admin-url: 127.0.0.1:9001

           server:
           port: 8081

           nacos:
           discovery:
             server-addr: 127.0.0.1:8848
          實(shí)例2配置:
          ship:
           http:
             app-name: order
             version: prod_1.0
             context-path: /order
             port: 8082
             admin-url: 127.0.0.1:9001

           server:
           port: 8082

           nacos:
           discovery:
             server-addr: 127.0.0.1:8848
          4、在數(shù)據(jù)庫(kù)添加路由規(guī)則配置,該規(guī)則表示當(dāng)http header 中的name=ship時(shí)請(qǐng)求路由到gray_1.0版本的節(jié)點(diǎn)。

          5、啟動(dòng)ship-server,看到以下日志時(shí)則可以進(jìn)行測(cè)試了。
          2021-01-02 19:57:09.159  INFO 30413 --- [SocketWorker-29] cn.sp.sync.WebsocketSyncCacheServer      : websocket server receive message:
           [{"operationType":"INSERT","ruleList":[{"id":1,"appId":5,"appName":"order","version":"gray_1.0","matchObject":"HEADER","matchKey":"name","matchMethod":1,"matchRule":"ship","priority":50}]}]
          6、用Postman請(qǐng)求http://localhost:9000/order/user/add,POST方式,header設(shè)置name=ship,可以看到只有實(shí)例1有日志顯示。
          ==========add user,version:gray_1.0

          4.2性能壓測(cè)

          壓測(cè)環(huán)境:
          • MacBook Pro 13英寸
          • 處理器 2.3 GHz 四核Intel Core i7
          • 內(nèi)存 16 GB 3733 MHz LPDDR4X
          • 后端節(jié)點(diǎn)個(gè)數(shù)一個(gè)
          • 壓測(cè)工具:wrk
          • 壓測(cè)結(jié)果:20個(gè)線程,500個(gè)連接數(shù),吞吐量大概每秒9400個(gè)請(qǐng)求。

          壓測(cè)結(jié)果


          05


          總結(jié)


          千里之行始于足下,開始以為寫一個(gè)網(wǎng)關(guān)會(huì)很難,但當(dāng)你實(shí)際開始行動(dòng)時(shí)就會(huì)發(fā)現(xiàn)其實(shí)沒那么難,所以邁出第一步很重要。過(guò)程中也許你會(huì)遇到了很多問題,但你會(huì)收獲的更多!需要示例源碼的可以加我微信:dart996,免費(fèi)發(fā)!

          瀏覽 39
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产男女免费啪啪 | 日本黄色视频在线免费 | www.4438AV | 一级A片一毛片大全 | av黄色电影一区天堂一区二区三区 |