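Alibaba final-round interview: how do you design a high-performance gateway?
Original: https://www.cnblogs.com/2YSP/p/14223892.html
1. Preface
I recently read through the design of the soul gateway on GitHub and got interested enough to write a high-performance gateway of my own from scratch. After two weeks of development, the core features of my gateway, ship-gate, are basically complete. The biggest gap is that there is no admin console, because my front-end skills are too weak.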
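2. Design
2.1 Technology selection
A gateway is the entry point for all requests, so it needs very high throughput; making request handling asynchronous is one way to achieve that. There are currently two common approaches:
Tomcat/Jetty + NIO + Servlet 3
Servlet 3 already supports asynchronous processing. This approach is widely used: JD.com, Youzan, and Zuul all use it.
Netty + NIO
Netty was built for high concurrency, and Vipshop's gateway currently uses this approach. According to a Vipshop technical article, under the same conditions Netty reached a throughput of 300,000+ requests per second against 130,000+ for Tomcat, which is a noticeable gap. However, with Netty you have to handle the HTTP protocol yourself, which is rather tedious.
I later found that the Soul gateway is built on Spring WebFlux (which runs on Netty underneath), so there is little need to worry about HTTP protocol handling. I therefore decided to use Spring WebFlux as well.
The second characteristic of a gateway is extensibility. Netflix Zuul, for example, has preFilters, postFilters, and so on, which make it easy to handle different business logic at different stages. This can be implemented by processing requests as a chain, based on the chain-of-responsibility pattern.
In a microservice architecture, services are deployed as multiple instances to ensure high availability. When a request reaches the gateway, the gateway has to find all available instances based on the URL, which requires service registration and discovery, that is, a registry.
The popular registries today are Apache Zookeeper and Alibaba's Nacos (Consul is somewhat niche). Since I had already used Zookeeper when writing an RPC framework, I chose Nacos this time.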
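2.2 Requirements list
First, the goal needs to be clear: what features should the gateway have? In summary:
Custom routing rules
Routing rules can be set per version. The match object is one of DEFAULT, HEADER, or QUERY, and the match method is one of =, regex, or like.
Cross-language
HTTP is inherently language-agnostic.
High performance
Netty itself is a high-performance communication framework, and the server caches routing rules and other data in JVM memory to avoid calling the admin service on each request.
High availability
Cluster mode is supported to prevent single points of failure; the server is stateless.
Gray release
Gray release (also known as canary release) is a release approach that allows a smooth transition between black and white. It enables A/B testing: some users keep using product feature A while others start using feature B, and if users have no objections to B, the scope is gradually expanded until all users are migrated to B. This can be achieved with the first feature above, custom routing rules.
API authentication
Based on the chain-of-responsibility pattern; users simply develop their own authentication plugin.
Load balancing
Multiple load-balancing algorithms are supported, such as random, round robin, and weighted round robin. The SPI mechanism allows the algorithm to be loaded dynamically according to configuration.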
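To make the load-balancing requirement concrete, here is a minimal sketch of round robin and weighted round robin. All names (`Node`, `RoundRobin`, `WeightedRoundRobin`) are hypothetical and not taken from ship-gate. The weighted variant uses the "smooth" weighted round-robin scheme, which is an assumption about one reasonable choice and not necessarily what the project implements. SPI loading is left out to keep the example self-contained.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical instance model: an address plus a static weight.
final class Node {
    final String address;
    final int weight;
    int current; // running score used by smooth weighted round robin

    Node(String address, int weight) {
        this.address = address;
        this.weight = weight;
    }
}

// Plain round robin: cycle through the instances in order.
final class RoundRobin {
    private final AtomicInteger counter = new AtomicInteger();

    Node chooseOne(List<Node> nodes) {
        // floorMod keeps the index valid even after the counter overflows
        int index = Math.floorMod(counter.getAndIncrement(), nodes.size());
        return nodes.get(index);
    }
}

// Smooth weighted round robin: every pick adds each node's weight to its score,
// selects the highest score, then subtracts the total weight from the winner.
final class WeightedRoundRobin {
    synchronized Node chooseOne(List<Node> nodes) {
        int total = 0;
        Node best = null;
        for (Node node : nodes) {
            node.current += node.weight;
            total += node.weight;
            if (best == null || node.current > best.current) {
                best = node;
            }
        }
        best.current -= total;
        return best;
    }
}
```

With weights 3 and 1, four consecutive picks select the heavier node three times and the lighter node once, and the picks are interleaved instead of being sent in one burst.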
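2.3 Architecture design
After studying some excellent gateways (Zuul, Spring Cloud Gateway, and Soul), I divided the project into the following modules.
| Name | Description |
|---|---|
| ship-admin | Admin console for configuring routing rules, etc. |
| ship-server | Gateway server, the core functional module |
| ship-client-spring-boot-starter | Gateway client that automatically registers service information with the registry |
| ship-common | Shared code such as POJOs and constants |
The relationships between them are shown in the figure:

Note: the figure differs slightly from the actual implementation. The step where Nacos pushes to the local cache is not implemented; currently ship-server only pulls by periodic polling. The step where ship-admin fetches registered service information from Nacos has also been changed: ServiceA now sends an HTTP request to notify ship-admin when it starts.
2.4 Table structure design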

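3. Coding
3.1 ship-client-spring-boot-starter
First, create a spring-boot-starter named ship-client-spring-boot-starter. If you don't know how to write a custom starter, see my earlier post "Developing Your Own Starter".
Its core class, AutoRegisterListener, does two things when the project starts:
1. Registers the service information with the Nacos registry.
2. Notifies ship-admin that the service is online and registers a shutdown hook for going offline.
The code is as follows: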
/**
 * Created by 2YSP on 2020/12/21
 */
public class AutoRegisterListener implements ApplicationListener<ContextRefreshedEvent> {
    private final static Logger LOGGER = LoggerFactory.getLogger(AutoRegisterListener.class);
    // guards against registering twice if the context is refreshed more than once
    private final AtomicBoolean registered = new AtomicBoolean(false);
    private final ClientConfigProperties properties;
    @NacosInjected
    private NamingService namingService;
    @Autowired
    private RequestMappingHandlerMapping handlerMapping;
    private final ExecutorService pool;
    /**
     * url list to ignore
     */
    private static List<String> ignoreUrlList = new LinkedList<>();
    static {
        ignoreUrlList.add("/error");
    }
    public AutoRegisterListener(ClientConfigProperties properties) {
        if (!check(properties)) {
            LOGGER.error("client config port,contextPath,appName adminUrl and version can't be empty!");
            throw new ShipException("client config port,contextPath,appName adminUrl and version can't be empty!");
        }
        this.properties = properties;
        pool = new ThreadPoolExecutor(1, 4, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }
    /**
     * check the ClientConfigProperties
     *
     * @param properties
     * @return
     */
    private boolean check(ClientConfigProperties properties) {
        if (properties.getPort() == null || properties.getContextPath() == null
                || properties.getVersion() == null || properties.getAppName() == null
                || properties.getAdminUrl() == null) {
            return false;
        }
        return true;
    }
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (!registered.compareAndSet(false, true)) {
            return;
        }
        doRegister();
        registerShutDownHook();
    }
    /**
     * send unregister request to admin when jvm shutdown
     */
    private void registerShutDownHook() {
        final String url = "http://" + properties.getAdminUrl() + AdminConstants.UNREGISTER_PATH;
        final UnregisterAppDTO unregisterAppDTO = new UnregisterAppDTO();
        unregisterAppDTO.setAppName(properties.getAppName());
        unregisterAppDTO.setVersion(properties.getVersion());
        unregisterAppDTO.setIp(IpUtil.getLocalIpAddress());
        unregisterAppDTO.setPort(properties.getPort());
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            OkhttpTool.doPost(url, unregisterAppDTO);
            LOGGER.info("[{}:{}] unregister from ship-admin success!", unregisterAppDTO.getAppName(), unregisterAppDTO.getVersion());
        }));
    }
    /**
     * register all interface info to register center
     */
    private void doRegister() {
        Instance instance = new Instance();
        instance.setIp(IpUtil.getLocalIpAddress());
        instance.setPort(properties.getPort());
        instance.setEphemeral(true);
        Map<String, String> metadataMap = new HashMap<>();
        metadataMap.put("version", properties.getVersion());
        metadataMap.put("appName", properties.getAppName());
        instance.setMetadata(metadataMap);
        try {
            namingService.registerInstance(properties.getAppName(), NacosConstants.APP_GROUP_NAME, instance);
        } catch (NacosException e) {
            LOGGER.error("register to nacos fail", e);
            throw new ShipException(e.getErrCode(), e.getErrMsg());
        }
        LOGGER.info("register interface info to nacos success!");
        // send register request to ship-admin
        String url = "http://" + properties.getAdminUrl() + AdminConstants.REGISTER_PATH;
        RegisterAppDTO registerAppDTO = buildRegisterAppDTO(instance);
        OkhttpTool.doPost(url, registerAppDTO);
        LOGGER.info("register to ship-admin success!");
    }
    private RegisterAppDTO buildRegisterAppDTO(Instance instance) {
        RegisterAppDTO registerAppDTO = new RegisterAppDTO();
        registerAppDTO.setAppName(properties.getAppName());
        registerAppDTO.setContextPath(properties.getContextPath());
        registerAppDTO.setIp(instance.getIp());
        registerAppDTO.setPort(instance.getPort());
        registerAppDTO.setVersion(properties.getVersion());
        return registerAppDTO;
    }
}
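3.2 ship-server
The ship-server project mainly consists of two parts: 1. the main flow of dynamic request routing, and 2. synchronization of the local cache with ship-admin and Nacos, which is covered later in section 3.3.
ship-server implements dynamic routing by intercepting requests with a WebFilter and then handing them to the plugin chain for chained processing.
PluginFilter parses the appName from the URL and then assembles the enabled plugins into a plugin chain.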
public class PluginFilter implements WebFilter {
    private ServerConfigProperties properties;
    public PluginFilter(ServerConfigProperties properties) {
        this.properties = properties;
    }
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        String appName = parseAppName(exchange);
        if (CollectionUtils.isEmpty(ServiceCache.getAllInstances(appName))) {
            throw new ShipException(ShipExceptionEnum.SERVICE_NOT_FIND);
        }
        PluginChain pluginChain = new PluginChain(properties, appName);
        pluginChain.addPlugin(new DynamicRoutePlugin(properties));
        pluginChain.addPlugin(new AuthPlugin(properties));
        return pluginChain.execute(exchange, pluginChain);
    }
    private String parseAppName(ServerWebExchange exchange) {
        RequestPath path = exchange.getRequest().getPath();
        // the first path segment is the app name, e.g. /order/user/add -> order
        String appName = path.value().split("/")[1];
        return appName;
    }
}
PluginChain extends AbstractShipPlugin and holds all the plugins to be executed.
/**
 * @Author: Ship
 * @Description:
 * @Date: Created in 2020/12/25
 */
public class PluginChain extends AbstractShipPlugin {
    /**
     * the pos point to current plugin
     */
    private int pos;
    /**
     * the plugins of chain
     */
    private List<ShipPlugin> plugins;
    private final String appName;
    public PluginChain(ServerConfigProperties properties, String appName) {
        super(properties);
        this.appName = appName;
    }
    /**
     * add enabled plugin to chain
     *
     * @param shipPlugin
     */
    public void addPlugin(ShipPlugin shipPlugin) {
        if (plugins == null) {
            plugins = new ArrayList<>();
        }
        if (!PluginCache.isEnabled(appName, shipPlugin.name())) {
            return;
        }
        plugins.add(shipPlugin);
        // order by the plugin's order
        plugins.sort(Comparator.comparing(ShipPlugin::order));
    }
    @Override
    public Integer order() {
        return null;
    }
    @Override
    public String name() {
        return null;
    }
    @Override
    public Mono<Void> execute(ServerWebExchange exchange, PluginChain pluginChain) {
        // all plugins have run: complete the response
        if (pos == plugins.size()) {
            return exchange.getResponse().setComplete();
        }
        return pluginChain.plugins.get(pos++).execute(exchange, pluginChain);
    }
    public String getAppName() {
        return appName;
    }
}
AbstractShipPlugin implements the ShipPlugin interface and holds the ServerConfigProperties configuration object.
public abstract class AbstractShipPlugin implements ShipPlugin {
    protected ServerConfigProperties properties;
    public AbstractShipPlugin(ServerConfigProperties properties) {
        this.properties = properties;
    }
}
The ShipPlugin interface defines the three methods that every plugin must implement: order(), name(), and execute().
public interface ShipPlugin {
    /**
     * lower values have higher priority
     *
     * @return
     */
    Integer order();
    /**
     * return current plugin name
     *
     * @return
     */
    String name();
    Mono<Void> execute(ServerWebExchange exchange, PluginChain pluginChain);
}
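The way ShipPlugin and PluginChain cooperate can be modeled without any WebFlux types. The following is a simplified, synchronous sketch: `SketchPlugin`, `SketchChain`, and `NamedPlugin` are hypothetical names, a `StringBuilder` stands in for the `ServerWebExchange`, and the order values used in any example are assumptions, not the project's real ones.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simplified, synchronous stand-in for the ShipPlugin contract (no WebFlux types).
interface SketchPlugin {
    int order(); // lower values run first

    String name();

    void execute(StringBuilder trace, SketchChain chain);
}

// Holds the plugins sorted by order and hands the request to them one by one.
final class SketchChain {
    private final List<SketchPlugin> plugins = new ArrayList<>();
    private int pos; // index of the next plugin to run

    void addPlugin(SketchPlugin plugin) {
        plugins.add(plugin);
        plugins.sort(Comparator.comparingInt(SketchPlugin::order));
    }

    void execute(StringBuilder trace) {
        if (pos == plugins.size()) {
            return; // end of chain
        }
        plugins.get(pos++).execute(trace, this);
    }
}

// Hypothetical plugin that records its name and then continues the chain.
final class NamedPlugin implements SketchPlugin {
    private final String name;
    private final int order;

    NamedPlugin(String name, int order) {
        this.name = name;
        this.order = order;
    }

    @Override
    public int order() {
        return order;
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public void execute(StringBuilder trace, SketchChain chain) {
        trace.append(name).append(';');
        // a plugin that wants to stop the request simply does not call this
        chain.execute(trace);
    }
}
```

Because the chain sorts on every `addPlugin` call, the order in which plugins are added does not matter; only their `order()` values do. A terminal plugin such as a router would not call `chain.execute` and therefore has to carry the highest order value.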
DynamicRoutePlugin extends the abstract class AbstractShipPlugin and contains the main business logic of dynamic routing.
/**
 * @Author: Ship
 * @Description:
 * @Date: Created in 2020/12/25
 */
public class DynamicRoutePlugin extends AbstractShipPlugin {
    private final static Logger LOGGER = LoggerFactory.getLogger(DynamicRoutePlugin.class);
    private static WebClient webClient;
    private static final Gson gson = new GsonBuilder().create();
    static {
        HttpClient httpClient = HttpClient.create()
                .tcpConfiguration(client ->
                        client.doOnConnected(conn ->
                                conn.addHandlerLast(new ReadTimeoutHandler(3))
                                        .addHandlerLast(new WriteTimeoutHandler(3)))
                                .option(ChannelOption.TCP_NODELAY, true)
                );
        webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient))
                .build();
    }
    public DynamicRoutePlugin(ServerConfigProperties properties) {
        super(properties);
    }
    @Override
    public Integer order() {
        return ShipPluginEnum.DYNAMIC_ROUTE.getOrder();
    }
    @Override
    public String name() {
        return ShipPluginEnum.DYNAMIC_ROUTE.getName();
    }
    @Override
    public Mono<Void> execute(ServerWebExchange exchange, PluginChain pluginChain) {
        String appName = pluginChain.getAppName();
        ServiceInstance serviceInstance = chooseInstance(appName, exchange.getRequest());
//        LOGGER.info("selected instance is [{}]", gson.toJson(serviceInstance));
        // request service
        String url = buildUrl(exchange, serviceInstance);
        return forward(exchange, url);
    }
    /**
     * forward request to backend service
     *
     * @param exchange
     * @param url
     * @return
     */
    private Mono<Void> forward(ServerWebExchange exchange, String url) {
        ServerHttpRequest request = exchange.getRequest();
        ServerHttpResponse response = exchange.getResponse();
        HttpMethod method = request.getMethod();
        WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(url).headers((headers) -> {
            headers.addAll(request.getHeaders());
        });
        WebClient.RequestHeadersSpec<?> reqHeadersSpec;
        if (requireHttpBody(method)) {
            reqHeadersSpec = requestBodySpec.body(BodyInserters.fromDataBuffers(request.getBody()));
        } else {
            reqHeadersSpec = requestBodySpec;
        }
        // nio->callback->nio
        return reqHeadersSpec.exchange().timeout(Duration.ofMillis(properties.getTimeOutMillis()))
                .onErrorResume(ex -> {
                    // write a JSON error body, then end with an empty Mono so flatMap below is skipped
                    return Mono.defer(() -> {
                        String errorResultJson = "";
                        if (ex instanceof TimeoutException) {
                            errorResultJson = "{\"code\":5001,\"message\":\"network timeout\"}";
                        } else {
                            errorResultJson = "{\"code\":5000,\"message\":\"system error\"}";
                        }
                        return ShipResponseUtil.doResponse(exchange, errorResultJson);
                    }).then(Mono.empty());
                }).flatMap(backendResponse -> {
                    response.setStatusCode(backendResponse.statusCode());
                    response.getHeaders().putAll(backendResponse.headers().asHttpHeaders());
                    return response.writeWith(backendResponse.bodyToFlux(DataBuffer.class));
                });
    }
    /**
     * whether the http method needs an http body
     *
     * @param method
     * @return
     */
    private boolean requireHttpBody(HttpMethod method) {
        return HttpMethod.POST.equals(method) || HttpMethod.PUT.equals(method) || HttpMethod.PATCH.equals(method);
    }
    private String buildUrl(ServerWebExchange exchange, ServiceInstance serviceInstance) {
        ServerHttpRequest request = exchange.getRequest();
        String query = request.getURI().getQuery();
        // strip the leading /appName segment before forwarding
        String path = request.getPath().value().replaceFirst("/" + serviceInstance.getAppName(), "");
        String url = "http://" + serviceInstance.getIp() + ":" + serviceInstance.getPort() + path;
        if (!StringUtils.isEmpty(query)) {
            url = url + "?" + query;
        }
        return url;
    }
    /**
     * choose an ServiceInstance according to route rule config and load balancing algorithm
     *
     * @param appName
     * @param request
     * @return
     */
    private ServiceInstance chooseInstance(String appName, ServerHttpRequest request) {
        List<ServiceInstance> serviceInstances = ServiceCache.getAllInstances(appName);
        if (CollectionUtils.isEmpty(serviceInstances)) {
            LOGGER.error("service instance of {} not find", appName);
            throw new ShipException(ShipExceptionEnum.SERVICE_NOT_FIND);
        }
        String version = matchAppVersion(appName, request);
        if (StringUtils.isEmpty(version)) {
            throw new ShipException("match app version error");
        }
        // filter serviceInstances by version
        List<ServiceInstance> instances = serviceInstances.stream().filter(i -> i.getVersion().equals(version)).collect(Collectors.toList());
        //Select an instance based on the load balancing algorithm
        LoadBalance loadBalance = LoadBalanceFactory.getInstance(properties.getLoadBalance(), appName, version);
        ServiceInstance serviceInstance = loadBalance.chooseOne(instances);
        return serviceInstance;
    }
    private String matchAppVersion(String appName, ServerHttpRequest request) {
        List<AppRuleDTO> rules = RouteRuleCache.getRules(appName);
        // rules with a higher priority value are checked first
        rules.sort(Comparator.comparing(AppRuleDTO::getPriority).reversed());
        for (AppRuleDTO rule : rules) {
            if (match(rule, request)) {
                return rule.getVersion();
            }
        }
        return null;
    }
    private boolean match(AppRuleDTO rule, ServerHttpRequest request) {
        String matchObject = rule.getMatchObject();
        String matchKey = rule.getMatchKey();
        String matchRule = rule.getMatchRule();
        Byte matchMethod = rule.getMatchMethod();
        if (MatchObjectEnum.DEFAULT.getCode().equals(matchObject)) {
            return true;
        } else if (MatchObjectEnum.QUERY.getCode().equals(matchObject)) {
            String param = request.getQueryParams().getFirst(matchKey);
            if (!StringUtils.isEmpty(param)) {
                return StringTools.match(param, matchMethod, matchRule);
            }
        } else if (MatchObjectEnum.HEADER.getCode().equals(matchObject)) {
            HttpHeaders headers = request.getHeaders();
            String headerValue = headers.getFirst(matchKey);
            if (!StringUtils.isEmpty(headerValue)) {
                return StringTools.match(headerValue, matchMethod, matchRule);
            }
        }
        return false;
    }
}
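The `StringTools.match` helper called above is not shown in the article. A minimal sketch of what such a helper could look like for the three match methods (=, regex, like) follows. The class name `MatchSketch`, the numeric codes, and the reading of "like" as a substring test are all assumptions made for illustration.

```java
import java.util.regex.Pattern;

// Hypothetical stand-in for StringTools.match; the numeric codes are assumptions.
final class MatchSketch {
    static final byte EQUAL = 1;
    static final byte REGEX = 2;
    static final byte LIKE = 3;

    private MatchSketch() {
    }

    static boolean match(String value, byte matchMethod, String matchRule) {
        switch (matchMethod) {
            case EQUAL:
                return value.equals(matchRule);
            case REGEX:
                // the whole value must match the pattern
                return Pattern.matches(matchRule, value);
            case LIKE:
                // "like" is treated as a substring test here
                return value.contains(matchRule);
            default:
                // unknown match methods never match
                return false;
        }
    }
}
```

For example, `MatchSketch.match("ship", MatchSketch.EQUAL, "ship")` is true, so a HEADER rule with key `name` and rule `ship` would select its version for a request carrying `name: ship`.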
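3.3 Data synchronization
App data synchronization
When a backend service (such as an order service) starts, it registers only its service name, version, IP address, and port with Nacos. What about the instance weight and the enabled plugins, which are not registered?
For online instances, weights and plugin lists are usually configured in the admin console and then take effect dynamically, so ship-admin needs to periodically update instance weights and plugin information in the registry.
The corresponding code is NacosSyncListener in ship-admin: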
/**
 * @Author: Ship
 * @Description:
 * @Date: Created in 2020/12/30
 */
@Configuration
public class NacosSyncListener implements ApplicationListener<ContextRefreshedEvent> {
    private static final Logger LOGGER = LoggerFactory.getLogger(NacosSyncListener.class);
    private static ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1,
            new ShipThreadFactory("nacos-sync", true).create());
    @NacosInjected
    private NamingService namingService;
    @Value("${nacos.discovery.server-addr}")
    private String baseUrl;
    @Resource
    private AppService appService;
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // only react to the root application context
        if (event.getApplicationContext().getParent() != null) {
            return;
        }
        String url = "http://" + baseUrl + NacosConstants.INSTANCE_UPDATE_PATH;
        scheduledPool.scheduleWithFixedDelay(new NacosSyncTask(namingService, url, appService), 0, 30L, TimeUnit.SECONDS);
    }
    class NacosSyncTask implements Runnable {
        private NamingService namingService;
        private String url;
        private AppService appService;
        private Gson gson = new GsonBuilder().create();
        public NacosSyncTask(NamingService namingService, String url, AppService appService) {
            this.namingService = namingService;
            this.url = url;
            this.appService = appService;
        }
        /**
         * Regular update weight,enabled plugins to nacos instance
         */
        @Override
        public void run() {
            try {
                // get all app names
                ListView<String> services = namingService.getServicesOfServer(1, Integer.MAX_VALUE, NacosConstants.APP_GROUP_NAME);
                if (CollectionUtils.isEmpty(services.getData())) {
                    return;
                }
                List<String> appNames = services.getData();
                List<AppInfoDTO> appInfos = appService.getAppInfos(appNames);
                for (AppInfoDTO appInfo : appInfos) {
                    if (CollectionUtils.isEmpty(appInfo.getInstances())) {
                        continue;
                    }
                    for (ServiceInstance instance : appInfo.getInstances()) {
                        Map<String, Object> queryMap = buildQueryMap(appInfo, instance);
                        String resp = OkhttpTool.doPut(url, queryMap, "");
                        LOGGER.debug("response :{}", resp);
                    }
                }
            } catch (Exception e) {
                LOGGER.error("nacos sync task error", e);
            }
        }
        private Map<String, Object> buildQueryMap(AppInfoDTO appInfo, ServiceInstance instance) {
            Map<String, Object> map = new HashMap<>();
            map.put("serviceName", appInfo.getAppName());
            map.put("groupName", NacosConstants.APP_GROUP_NAME);
            map.put("ip", instance.getIp());
            map.put("port", instance.getPort());
            map.put("weight", instance.getWeight().doubleValue());
            NacosMetadata metadata = new NacosMetadata();
            metadata.setAppName(appInfo.getAppName());
            metadata.setVersion(instance.getVersion());
            metadata.setPlugins(String.join(",", appInfo.getEnabledPlugins()));
            map.put("metadata", StringTools.urlEncode(gson.toJson(metadata)));
            map.put("ephemeral", true);
            return map;
        }
    }
}
ship-server then periodically pulls the app data from Nacos and updates its local Map cache.
/**
 * @Author: Ship
 * @Description: sync data to local cache
 * @Date: Created in 2020/12/25
 */
@Configuration
public class DataSyncTaskListener implements ApplicationListener<ContextRefreshedEvent> {
    private static final Logger LOGGER = LoggerFactory.getLogger(DataSyncTaskListener.class);
    private static ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1,
            new ShipThreadFactory("service-sync", true).create());
    @NacosInjected
    private NamingService namingService;
    @Autowired
    private ServerConfigProperties properties;
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (event.getApplicationContext().getParent() != null) {
            return;
        }
        scheduledPool.scheduleWithFixedDelay(new DataSyncTask(namingService)
                , 0L, properties.getCacheRefreshInterval(), TimeUnit.SECONDS);
        WebsocketSyncCacheServer websocketSyncCacheServer = new WebsocketSyncCacheServer(properties.getWebSocketPort());
        websocketSyncCacheServer.start();
    }
    class DataSyncTask implements Runnable {
        private NamingService namingService;
        public DataSyncTask(NamingService namingService) {
            this.namingService = namingService;
        }
        @Override
        public void run() {
            try {
                // get all app names
                ListView<String> services = namingService.getServicesOfServer(1, Integer.MAX_VALUE, NacosConstants.APP_GROUP_NAME);
                if (CollectionUtils.isEmpty(services.getData())) {
                    return;
                }
                List<String> appNames = services.getData();
                // get all instances
                for (String appName : appNames) {
                    List<Instance> instanceList = namingService.getAllInstances(appName, NacosConstants.APP_GROUP_NAME);
                    if (CollectionUtils.isEmpty(instanceList)) {
                        continue;
                    }
                    ServiceCache.add(appName, buildServiceInstances(instanceList));
                    List<String> pluginNames = getEnabledPlugins(instanceList);
                    PluginCache.add(appName, pluginNames);
                }
                // drop cached apps that are no longer registered
                ServiceCache.removeExpired(appNames);
                PluginCache.removeExpired(appNames);
            } catch (NacosException e) {
                LOGGER.error("sync data from nacos error", e);
            }
        }
        private List<String> getEnabledPlugins(List<Instance> instanceList) {
            // plugins are configured per app, so the first instance's metadata is enough
            Instance instance = instanceList.get(0);
            Map<String, String> metadata = instance.getMetadata();
            // plugins: DynamicRoute,Auth
            String plugins = metadata.getOrDefault("plugins", ShipPluginEnum.DYNAMIC_ROUTE.getName());
            return Arrays.stream(plugins.split(",")).collect(Collectors.toList());
        }
        private List<ServiceInstance> buildServiceInstances(List<Instance> instanceList) {
            List<ServiceInstance> list = new LinkedList<>();
            instanceList.forEach(instance -> {
                Map<String, String> metadata = instance.getMetadata();
                ServiceInstance serviceInstance = new ServiceInstance();
                serviceInstance.setAppName(metadata.get("appName"));
                serviceInstance.setIp(instance.getIp());
                serviceInstance.setPort(instance.getPort());
                serviceInstance.setVersion(metadata.get("version"));
                serviceInstance.setWeight((int) instance.getWeight());
                list.add(serviceInstance);
            });
            return list;
        }
    }
}
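The `ServiceCache` used by the sync task is not listed in the article. A minimal sketch of a local cache with the same three operations (`add`, `getAllInstances`, `removeExpired`) might look like this. The class name `ServiceCacheSketch` is hypothetical, and instances are reduced to address strings for brevity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical local cache keyed by app name; values are instance addresses for brevity.
final class ServiceCacheSketch {
    private static final Map<String, List<String>> CACHE = new ConcurrentHashMap<>();

    private ServiceCacheSketch() {
    }

    // Replace the whole instance list of one app with an immutable snapshot.
    static void add(String appName, List<String> instances) {
        CACHE.put(appName, Collections.unmodifiableList(new ArrayList<>(instances)));
    }

    // Unknown apps yield an empty list instead of null.
    static List<String> getAllInstances(String appName) {
        return CACHE.getOrDefault(appName, Collections.emptyList());
    }

    // Drop apps that no longer appear in the registry's service list.
    static void removeExpired(List<String> liveAppNames) {
        CACHE.keySet().retainAll(liveAppNames);
    }
}
```

Swapping in a complete snapshot per app keeps request threads from ever observing a half-updated list, while the scheduled task stays the only writer.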
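Routing rule data synchronization
In addition, when a user updates routing rules in the admin console, ship-admin needs to push the rule data to ship-server. Following the soul gateway's approach, this uses WebSocket: a full synchronization is performed when the connection is first established, and after that only incremental synchronization is done when routing rules change.
Server side, WebsocketSyncCacheServer: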
/**
 * @Author: Ship
 * @Description:
 * @Date: Created in 2020/12/28
 */
public class WebsocketSyncCacheServer extends WebSocketServer {
    private final static Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncCacheServer.class);
    private Gson gson = new GsonBuilder().create();
    private MessageHandler messageHandler;
    public WebsocketSyncCacheServer(Integer port) {
        super(new InetSocketAddress(port));
        this.messageHandler = new MessageHandler();
    }
    @Override
    public void onOpen(WebSocket webSocket, ClientHandshake clientHandshake) {
        LOGGER.info("server is open");
    }
    @Override
    public void onClose(WebSocket webSocket, int i, String s, boolean b) {
        LOGGER.info("websocket server close...");
    }
    @Override
    public void onMessage(WebSocket webSocket, String message) {
        LOGGER.info("websocket server receive message:\n[{}]", message);
        this.messageHandler.handler(message);
    }
    @Override
    public void onError(WebSocket webSocket, Exception e) {
        LOGGER.error("websocket server error", e);
    }
    @Override
    public void onStart() {
        LOGGER.info("websocket server start...");
    }
    class MessageHandler {
        public void handler(String message) {
            RouteRuleOperationDTO operationDTO = gson.fromJson(message, RouteRuleOperationDTO.class);
            if (CollectionUtils.isEmpty(operationDTO.getRuleList())) {
                return;
            }
            // group the rules by app name before applying them to the cache
            Map<String, List<AppRuleDTO>> map = operationDTO.getRuleList()
                    .stream().collect(Collectors.groupingBy(AppRuleDTO::getAppName));
            if (OperationTypeEnum.INSERT.getCode().equals(operationDTO.getOperationType())
                    || OperationTypeEnum.UPDATE.getCode().equals(operationDTO.getOperationType())) {
                RouteRuleCache.add(map);
            } else if (OperationTypeEnum.DELETE.getCode().equals(operationDTO.getOperationType())) {
                RouteRuleCache.remove(map);
            }
        }
    }
}
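The message handler above delegates to `RouteRuleCache`, which the article does not list. One way such a cache could apply full and incremental updates is sketched below. `RuleSketch` and `RouteRuleCacheSketch` are hypothetical names, and keying the rules by id is an assumption that lets INSERT and UPDATE share a single upsert path.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical rule model; only the fields needed for the sketch.
final class RuleSketch {
    final int id;
    final String appName;
    final String version;

    RuleSketch(int id, String appName, String version) {
        this.id = id;
        this.appName = appName;
        this.version = version;
    }
}

// Rules are stored per app and per rule id, so INSERT and UPDATE are the same upsert.
final class RouteRuleCacheSketch {
    private static final Map<String, Map<Integer, RuleSketch>> CACHE = new ConcurrentHashMap<>();

    private RouteRuleCacheSketch() {
    }

    static void upsert(List<RuleSketch> rules) {
        for (RuleSketch rule : rules) {
            CACHE.computeIfAbsent(rule.appName, k -> new ConcurrentHashMap<>()).put(rule.id, rule);
        }
    }

    static void remove(List<RuleSketch> rules) {
        for (RuleSketch rule : rules) {
            Map<Integer, RuleSketch> byId = CACHE.get(rule.appName);
            if (byId != null) {
                byId.remove(rule.id);
            }
        }
    }

    // Returns a fresh list so callers can sort it without touching the cache.
    static List<RuleSketch> getRules(String appName) {
        Map<Integer, RuleSketch> byId = CACHE.get(appName);
        return byId == null ? new ArrayList<>() : new ArrayList<>(byId.values());
    }
}
```

Under this model the initial full synchronization is just an upsert of every enabled rule, and later changes only send the affected rules. Returning a copy from `getRules` matters because the routing code sorts the returned list in place on each request.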
Client side, WebsocketSyncCacheClient:
/**
 * @Author: Ship
 * @Description:
 * @Date: Created in 2020/12/28
 */
@Component
public class WebsocketSyncCacheClient {
    private final static Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncCacheClient.class);
    private WebSocketClient client;
    private RuleService ruleService;
    private Gson gson = new GsonBuilder().create();
    public WebsocketSyncCacheClient(@Value("${ship.server-web-socket-url}") String serverWebSocketUrl,
                                    RuleService ruleService) {
        if (StringUtils.isEmpty(serverWebSocketUrl)) {
            throw new ShipException(ShipExceptionEnum.CONFIG_ERROR);
        }
        this.ruleService = ruleService;
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1,
                new ShipThreadFactory("websocket-connect", true).create());
        try {
            client = new WebSocketClient(new URI(serverWebSocketUrl)) {
                @Override
                public void onOpen(ServerHandshake serverHandshake) {
                    LOGGER.info("client is open");
                    // full sync: push all enabled rules once the connection is open
                    List<AppRuleDTO> list = ruleService.getEnabledRule();
                    String msg = gson.toJson(new RouteRuleOperationDTO(OperationTypeEnum.INSERT, list));
                    send(msg);
                }
                @Override
                public void onMessage(String s) {
                }
                @Override
                public void onClose(int i, String s, boolean b) {
                }
                @Override
                public void onError(Exception e) {
                    LOGGER.error("websocket client error", e);
                }
            };
            client.connectBlocking();
            // use the scheduled thread pool to reconnect after a disconnect, checking every 30 seconds
            executor.scheduleAtFixedRate(() -> {
                if (client != null && client.isClosed()) {
                    try {
                        client.reconnectBlocking();
                    } catch (InterruptedException e) {
                        LOGGER.error("reconnect server fail", e);
                    }
                }
            }, 10, 30, TimeUnit.SECONDS);
        } catch (Exception e) {
            LOGGER.error("websocket sync cache exception", e);
            throw new ShipException(e.getMessage());
        }
    }
    public <T> void send(T t) {
        // note: this spins until the connection is open
        while (!client.getReadyState().equals(ReadyState.OPEN)) {
            LOGGER.debug("connecting ...please wait");
        }
        client.send(gson.toJson(t));
    }
}
4. Testing
4.1 Dynamic routing test
Start Nacos locally: sh startup.sh -m standalone
Start ship-admin.
Start two ship-example instances locally.
Instance 1 configuration:
ship:
  http:
    app-name: order
    version: gray_1.0
    context-path: /order
    port: 8081
    admin-url: 127.0.0.1:9001
server:
  port: 8081
nacos:
  discovery:
    server-addr: 127.0.0.1:8848
Instance 2 configuration:
ship:
  http:
    app-name: order
    version: prod_1.0
    context-path: /order
    port: 8082
    admin-url: 127.0.0.1:9001
server:
  port: 8082
nacos:
  discovery:
    server-addr: 127.0.0.1:8848
Add a routing rule configuration to the database. This rule means that when the HTTP header contains name=ship, the request is routed to nodes of version gray_1.0.

Start ship-server. Once you see the following log, you can begin testing.
2021-01-02 19:57:09.159  INFO 30413 --- [SocketWorker-29] cn.sp.sync.WebsocketSyncCacheServer      : websocket server receive message:
[{"operationType":"INSERT","ruleList":[{"id":1,"appId":5,"appName":"order","version":"gray_1.0","matchObject":"HEADER","matchKey":"name","matchMethod":1,"matchRule":"ship","priority":50}]}]
Use Postman to send a POST request to http://localhost:9000/order/user/add with the header name=ship. Only instance 1 prints a log line.
==========add user,version:gray_1.0
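4.2 Performance stress test
Test environment:
MacBook Pro 13-inch
Processor: 2.3 GHz quad-core Intel Core i7
Memory: 16 GB 3733 MHz LPDDR4X
Number of backend nodes: 1
Load-testing tool: wrk
Result: with 20 threads and 500 connections, throughput was roughly 9,400 requests per second.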

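5. Summary
A journey of a thousand miles begins with a single step. At first I thought writing a gateway would be hard, but once you actually start, you find it is not that difficult, so taking the first step matters. I ran into plenty of problems along the way and even filed two issues on GitHub against the soul and nacos open-source projects, only to find out later that the problems were my own, which was embarrassing. All the code for this article has been uploaded to GitHub. Finally, I hope this article helps you.
References: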
https://nacos.io/zh-cn/docs/quick-start.html
https://dromara.org/website/zh-cn/docs/soul/soul.html
https://docs.spring.io/spring-framework/docs/5.1.7.RELEASE/spring-framework-reference/web-reactive.html#webflux
https://github.com/TooTallNate/Java-WebSocket
