注冊中心選型以及Spring Cloud 是如何實現(xiàn)服務發(fā)現(xiàn)的
注冊中心
CAP原則
CAP原則又稱CAP定理,指的是在一個分布式系統(tǒng)中,一致性(Consistency)、可用性(Availability)、分區(qū)容錯性(Partition tolerance),這三個要素最多只能同時實現(xiàn)兩點,不可能三者兼顧。
| CAP | 適用場景 | 解釋 |
|---|---|---|
| CA | 幾乎不存在 | 在分布式系統(tǒng)中,P必然存在,除非適用單機,要提升分區(qū)可靠性,需要通過提升基礎設施的可靠性實現(xiàn) |
| CP | 分布式數(shù)據(jù)庫(Redis、HBase、zk、etcd) | 分布式數(shù)據(jù)庫極端情況下優(yōu)先保證數(shù)據(jù)一致性 |
| AP | 大部分對數(shù)據(jù)一致性沒有嚴格要求的場景 | 優(yōu)先保證服務可用 |
BASE
BASE是Basically Available(基本可用)、Soft state(軟狀態(tài))和 Eventually consistent(最終一致性)三個短語的簡寫。
BASE是對 CAP 中一致性和可用性權衡的結果,其來源于對大規(guī)模互聯(lián)網(wǎng)系統(tǒng)分布式實踐的結論,是基于CAP定理逐步演化而來的,其核心思想是即使無法做到強一致性(Strong consistency),但每個應用都可以根據(jù)自身的業(yè)務特點,采用適當?shù)姆绞絹硎瓜到y(tǒng)達到最終一致性(Eventual consistency)。接下來我們著重對BASE中的三要素進行詳細講解?;究捎茫褐阜植际较到y(tǒng)在出現(xiàn)不可預知故障的時候,允許損失部分可用性。
常見注冊中心比較
| Zookeeper | Eureka | Consul | Nacos | Etcd | |
|---|---|---|---|---|---|
| 數(shù)據(jù)一致性 | CP | AP | CP | AP/CP | CP |
| 健康檢查 | Keep Alive | ClientBeat | TCP/HTTP/grpc/Cmd | TCP/HTTP/MySql/ClientBeat | ClientBeat |
| 負載均衡策略 | Ribbon | Fabio | 權重/metadata/Selector | ||
| 雪崩保護 | 無 | 有 | 無 | 有 | |
| 自動注銷實例 | √ | × | √ | ||
| 訪問協(xié)議 | TCP | HTTP | HTTP/DNS | HTTP/DNS | HTTP |
| 監(jiān)聽支持 | √ | √ | √ | √ | √ |
| 多數(shù)據(jù)中心 | × | √ | √ | √ | |
| 跨注冊中心同步 | × | × | √ | √ | |
| Spring Cloud集成 | √ | √ | √ | √ | |
| Dubbo集成 | √ | × | × | √ | |
| K8s集成 | × | × | √ | √ | √ |
| 部署難度 | 4 | 1 | 3 | 2 | 4 |
| 開發(fā)語言 | Java | Java | Go | Java | Go |
| 功能 | 分布式數(shù)據(jù)協(xié)同 | 基于 HTTP 協(xié)議的服務發(fā)現(xiàn) | 多種機制的服務發(fā)現(xiàn)和 KV 存儲 | 多種機制的服務發(fā)現(xiàn)、KV 存儲、配置中心、大而全的功能 | 分布式數(shù)據(jù)協(xié)同 |
| 時效性 | 秒級 | 取決于具體配置。默認 30s 更新服務實例信息,90s 才會去剔除失效的節(jié)點,在這種配置下可能 2 分鐘才能獲取到最新的配置 | 看具體配置 | 正常情況下秒級,異常情況取決于具體配置。默認 15s |
一個基本的注冊中心需要以下 4 個基本的功能:
注冊服務實例信息
心跳機制
剔除失敗的服務實例信息
查詢服務實例信息操作
zookeeper
zk 本身并不是為了做注冊中心的,不過其提供的通用樹狀存儲結構和 znode 機可以間接完成服務發(fā)現(xiàn)的必要功能。比如我們有 2 個服務 a 和 b
/
├ a
┆ ├ a1
┆ └ a2
└ b
└ b1
這樣存儲,可以通過查詢 a 節(jié)點,獲取服務 a 下面的實例信息。
在 zk 中,可以在使用臨時節(jié)點創(chuàng)建 a1、a2、b1 這樣的用來存儲服務實例信息的節(jié)點,當服務實例關閉或者通信異常時,zookeeper 可以自動刪除這些臨時節(jié)點,這樣就實現(xiàn)了剔除機制。
zk,一旦服務掛掉,zk感知到以及通知其他服務的時效性,服務注冊到zk之后通知到其他服務的時效性,leader掛掉之后可用性是否會出現(xiàn)短暫的問題,為了去換取一致性
注冊機制:客戶端主動創(chuàng)建臨時節(jié)點
心跳機制:因為創(chuàng)建的是臨時節(jié)點,依靠 zk 本身的會話機制
剔除機制:會話失效后,臨時節(jié)點自動剔除
查詢機制:使用 zk 協(xié)議去查詢節(jié)點
Eureka

相比于 zookeeper 來說,Eureka 是專門用來做注冊中心的,本身提供了注冊中心需要的所有的功能。其提供了 SDK 和 HTTP 接口來訪問 Eureka Server.
其部分 API 如下,更多的查看 Eureka REST operations
| Operation | HTTP action | Description |
|---|---|---|
| Register new application instance | POST /eureka/v2/apps/appID | Input: JSON/XML payload HTTP Code: 204 on success |
| De-register application instance | DELETE /eureka/v2/apps/appID/instanceID | HTTP Code: 200 on success |
| Send application instance heartbeat | PUT /eureka/v2/apps/appID/instanceID | HTTP Code: * 200 on success * 404 if instanceID doesn’t exist |
| Query for all instances | GET /eureka/v2/apps | HTTP Code: 200 on success Output: JSON/XML |
| Query for all appID instances | GET /eureka/v2/apps/appID | HTTP Code: 200 on success Output: JSON/XML |
| Query for a specific appID/instanceID | GET /eureka/v2/apps/appID/instanceID | HTTP Code: 200 on success Output: JSON/XML |
如果不想使用 HTTP 接口,也可以直接使用 Eureka 提供的 Java SDK
Eureka 更側重于 AP,其通過自我保護機制,可以在網(wǎng)絡異常的情況下,保留大部分節(jié)點信息,來防止雪崩的情況
如果 Eureka 服務器檢測到比預期數(shù)量多的注冊客戶端以不合適的方式終止了它們的連接,并且同時等待驅逐,它們將進入自我保護模式。這樣做是為了確保災難性網(wǎng)絡事件不會清除 eureka 注冊表數(shù)據(jù),并將其向下傳播到所有客戶端。
自我保護機制的工作機制是:如果在15分鐘內(nèi)超過85%的客戶端節(jié)點都沒有正常的心跳,那么Eureka就認為客戶端與注冊中心出現(xiàn)了網(wǎng)絡故障,Eureka Server自動進入自我保護機制,此時會出現(xiàn)以下幾種情況:
Eureka Server不再從注冊列表中移除因為長時間沒收到心跳而應該過期的服務。
Eureka Server仍然能夠接受新服務的注冊和查詢請求,但是不會被同步到其它節(jié)點上,保證當前節(jié)點依然可用。
當網(wǎng)絡穩(wěn)定時,當前Eureka Server新的注冊信息會被同步到其它節(jié)點中。
具體可見 Server Self Preservation Mode
當 Eureka 進入自我保護機制的情況下,會造成服務實例無法剔除的情況,Client 在查詢的時候可能查詢到已經(jīng)掛掉的實例信息。
Eureka 是 peer-to-peer 模式,可能還沒同步數(shù)據(jù)過去,結果自己就死了,此時還是可以繼續(xù)從別的機器上拉取注冊表,但是看到的就不是最新的數(shù)據(jù)了,但是保證了可用性,強一致,最終一致性
注冊機制:客戶端主動創(chuàng)建節(jié)點信息(使用 SDK 或者 HTTP 接口)
心跳機制:客戶端主動維持上報(使用 SDK 或者 HTTP 接口,默認 30s 上報一次)
剔除機制:未收到客戶端 3 次心跳后,服務端主動刪除
查詢機制:客戶端主動查詢節(jié)點信息(使用 SDK 或者 HTTP 接口)
Consul
關于 Consul 和其他注冊中心的對比,因為 Consul 本身出了文檔這里不在贅敘 Consul VS Other

Consul 本身提供了 Go SDK 和 HTTP 接口, 其中包括服務注冊、健康檢查、服務查詢、kv 操作等功能的 API, 雖然沒有提供其他的語言的官方 SDK, 但也有一些個人去封裝了?;蛟S可以使用非官方的或者自己封裝 HTTP 接口。
相對于 Eureka,Consul 提供了多種心跳機制,包括:
Script + Interval
HTTP + Interval
TCP + Interval
Time to Live (TTL)
Docker + Interval
gRPC + Interval
H2ping + Interval
Alias
注冊機制:客戶端主動創(chuàng)建節(jié)點信息(使用 SDK 或者 HTTP 接口)
心跳機制:服務端根據(jù)你采用心跳機制對客戶端進行心跳測試(和 Eureka、zk 不同,這里是服務端向客戶端發(fā)起)
剔除機制:服務端未成功檢測到客戶端心跳反應后,服務端主動刪除
查詢機制:客戶端主動查詢節(jié)點信息(使用 SDK 或者 HTTP 接口)
Nacos
Nacos 支持基于 DNS 和基于 RPC 的服務發(fā)現(xiàn)。服務提供者使用 原生SDK、OpenAPI、或一個獨立的Agent TODO注冊 Service 后,服務消費者可以使用DNS TODO 或HTTP&API查找和發(fā)現(xiàn)服務。
Nacos 提供對服務的實時的健康檢查,阻止向不健康的主機或服務實例發(fā)送請求。Nacos 支持傳輸層 (PING 或 TCP)和應用層 (如 HTTP、MySQL、用戶自定義)的健康檢查。對于復雜的云環(huán)境和網(wǎng)絡拓撲環(huán)境中(如 VPC、邊緣網(wǎng)絡等)服務的健康檢查,Nacos 提供了 agent 上報模式和服務端主動檢測2種健康檢查模式。Nacos 還提供了統(tǒng)一的健康檢查儀表盤,幫助您根據(jù)健康狀態(tài)管理服務的可用性及流量。



注冊機制:客戶端主動創(chuàng)建節(jié)點信息(使用 SDK 或者 HTTP 接口)
心跳機制:客戶端主動維持上報(使用 SDK 或者 HTTP 接口,默認 30s 上報一次)
剔除機制:未收到客戶端 3 次心跳后,服務端主動刪除
查詢機制:客戶端主動查詢節(jié)點信息(使用 SDK 或者 HTTP 接口)
Spring Cloud 是如何實現(xiàn)服務治理的
Spring Cloud Commons 之服務治理淺析
Spring 在設計的時候,通常會考慮方便擴展和消除樣板代碼,在 Spring Clond 同樣存在這樣的設計。
在 Spring Cloud 體系中,Spring Cloud Commons 是最重要的一個項目,其中定義了服務注冊、服務發(fā)現(xiàn)、負載均衡相關的接口以及一些公共組件,通過看這個項目,我們可以簡單的理解一下 Spring Cloud 注冊發(fā)現(xiàn)的核心流程。
Spring Clond Commons 項目中提供了如下的項目結構(在這里省略了部分代碼文件和結構)
└── src
├── main
│ ├── java
│ │ └── org
│ │ └── springframework
│ │ └── cloud
│ │ ├── client
│ │ │ ├── DefaultServiceInstance.java
│ │ │ ├── ServiceInstance.java Spring Cloud 對服務實例信息的定義
│ │ │ ├── discovery 服務發(fā)現(xiàn)相關
│ │ │ │ ├── DiscoveryClient.java
│ │ │ │ ├── EnableDiscoveryClient.java
│ │ │ │ ├── EnableDiscoveryClientImportSelector.java
│ │ │ │ ├── ManagementServerPortUtils.java
│ │ │ │ ├── ReactiveDiscoveryClient.java
│ │ │ │ ├── composite
│ │ │ │ │ ├── CompositeDiscoveryClient.java
│ │ │ │ │ ├── CompositeDiscoveryClientAutoConfiguration.java
│ │ │ │ │ └── reactive
│ │ │ │ │ ├── ReactiveCompositeDiscoveryClient.java
│ │ │ │ │ └── ReactiveCompositeDiscoveryClientAutoConfiguration.java
│ │ │ │ ├── health 健康檢查相關
│ │ │ │ ├── DiscoveryClientHealthIndicator.java
│ │ │ │ ├── DiscoveryClientHealthIndicatorProperties.java
│ │ │ │ ├── DiscoveryCompositeHealthContributor.java
│ │ │ │ ├── DiscoveryHealthIndicator.java
│ │ │ │ └── reactive
│ │ │ │ ├── ReactiveDiscoveryClientHealthIndicator.java
│ │ │ │ ├── ReactiveDiscoveryCompositeHealthContributor.java
│ │ │ │ └── ReactiveDiscoveryHealthIndicator.java
│ │ │ ├── loadbalancer 這下面是負載均衡相關邏輯
│ │ │ └── serviceregistry 服務注冊相關
│ │ │ ├── AbstractAutoServiceRegistration.java
│ │ │ ├── AutoServiceRegistration.java
│ │ │ ├── AutoServiceRegistrationAutoConfiguration.java
│ │ │ ├── AutoServiceRegistrationConfiguration.java
│ │ │ ├── AutoServiceRegistrationProperties.java
│ │ │ ├── Registration.java
│ │ │ ├── ServiceRegistry.java
│ │ │ ├── ServiceRegistryAutoConfiguration.java
│ │ ├── commons
│ │ ├── httpclient http 工廠類,在配置中可以選擇使用 Apache Http 還是 OKHttp
│ │ │ ├── ApacheHttpClientFactory.java
│ │ │ └── OkHttpClientFactory.java
│ │ └── util
│ │ ├── IdUtils.java 通過這工具類來生成實例 id
│ │ └── InetUtils.java Spring Cloud 就是通過這個工具類是獲取服務項目的 ip 地址的
│ └── resources
│ └── META-INF
│ ├── additional-spring-configuration-metadata.json
│ └── spring.factories
└── test
├── java 測試相關代碼
在項目結構中可以看出各個部分對應的源碼,在服務治理中,首先是服務信息 ServiceInstance , 其中包括
服務名 ServiceId 這個就是我們類似的 xxx-server (spring.application.name)
服務實例唯一標識符 InstanceId
host
port
一些擴展信息 metadata, 這個主要用來提供給三方實現(xiàn)增加以下擴展信息
// 為了縮短篇幅,刪除了一些注釋
public interface ServiceInstance {
default String getInstanceId() {
return null;
}
String getServiceId();
String getHost();
int getPort();
boolean isSecure();
URI getUri();
Map<String, String> getMetadata();
default String getScheme() {
return null;
}
}
服務注冊
Registration 是 Spring Cloud 提供的一個注冊實現(xiàn)
public interface Registration extends ServiceInstance {
// 這里面是真沒有代碼
}
服務注冊的實際接口是 ServiceRegistry
public interface ServiceRegistry<R extends Registration> {
/**
* Registers the registration. A registration typically has information about an
* instance, such as its hostname and port.
* @param registration registration meta data
*/
void register(R registration);
/**
* Deregisters the registration.
* @param registration registration meta data
*/
void deregister(R registration);
/**
* Closes the ServiceRegistry. This is a lifecycle method.
*/
void close();
/**
* Sets the status of the registration. The status values are determined by the
* individual implementations.
* @param registration The registration to update.
* @param status The status to set.
* @see org.springframework.cloud.client.serviceregistry.endpoint.ServiceRegistryEndpoint
*/
void setStatus(R registration, String status);
/**
* Gets the status of a particular registration.
* @param registration The registration to query.
* @param <T> The type of the status.
* @return The status of the registration.
* @see org.springframework.cloud.client.serviceregistry.endpoint.ServiceRegistryEndpoint
*/
<T> T getStatus(R registration);
}
通過實現(xiàn) ServiceRegistry 即可完成一個簡單服務注冊功能
服務發(fā)現(xiàn)
在 discovery 下存在兩個服務發(fā)現(xiàn)定義接口 DiscoveryClient 和 ReactiveDiscoveryClient
其提供了如下功能:
獲取所有的服務名稱
根據(jù)服務名稱獲取對應的服務實例列表
public interface DiscoveryClient extends Ordered {
/**
* Default order of the discovery client.
*/
int DEFAULT_ORDER = 0;
/**
* A human-readable description of the implementation, used in HealthIndicator.
* @return The description.
*/
String description();
/**
* Gets all ServiceInstances associated with a particular serviceId.
* @param serviceId The serviceId to query.
* @return A List of ServiceInstance.
*/
List<ServiceInstance> getInstances(String serviceId);
/**
* @return All known service IDs.
*/
List<String> getServices();
/**
* Default implementation for getting order of discovery clients.
* @return order
*/
@Override
default int getOrder() {
return DEFAULT_ORDER;
}
}
通過實現(xiàn) DiscoveryClient 即可完成服務發(fā)現(xiàn)
健康檢測
ReactiveDiscoveryClientHealthIndicator 提供了健康檢測功能
從 DiscoveryClient 中獲取所有的服務名列表
根據(jù)服務名列表獲取對應的服務實例列表
對每個實例進行健康檢測,如果響應成功則 UP 否則為 DOWN
public class ReactiveDiscoveryClientHealthIndicator
implements ReactiveDiscoveryHealthIndicator, Ordered, ApplicationListener<InstanceRegisteredEvent<?>> {
private final ReactiveDiscoveryClient discoveryClient;
private final DiscoveryClientHealthIndicatorProperties properties;
private final Log log = LogFactory.getLog(ReactiveDiscoveryClientHealthIndicator.class);
private AtomicBoolean discoveryInitialized = new AtomicBoolean(false);
private int order = Ordered.HIGHEST_PRECEDENCE;
public ReactiveDiscoveryClientHealthIndicator(ReactiveDiscoveryClient discoveryClient,
DiscoveryClientHealthIndicatorProperties properties) {
this.discoveryClient = discoveryClient;
this.properties = properties;
}
@Override
public void onApplicationEvent(InstanceRegisteredEvent<?> event) {
if (this.discoveryInitialized.compareAndSet(false, true)) {
this.log.debug("Discovery Client has been initialized");
}
}
@Override
public Mono<Health> health() {
if (this.discoveryInitialized.get()) {
return doHealthCheck();
}
else {
return Mono.just(
Health.status(new Status(Status.UNKNOWN.getCode(), "Discovery Client not initialized")).build());
}
}
private Mono<Health> doHealthCheck() {
// @formatter:off
return Mono.justOrEmpty(this.discoveryClient)
.flatMapMany(ReactiveDiscoveryClient::getServices)
.collectList()
.defaultIfEmpty(emptyList())
.map(services -> {
ReactiveDiscoveryClient client = this.discoveryClient;
String description = (this.properties.isIncludeDescription())
? client.description() : "";
return Health.status(new Status("UP", description))
.withDetail("services", services).build();
})
.onErrorResume(exception -> {
this.log.error("Error", exception);
return Mono.just(Health.down().withException(exception).build());
});
// @formatter:on
}
@Override
public String getName() {
return discoveryClient.description();
}
@Override
public int getOrder() {
return this.order;
}
public void setOrder(int order) {
this.order = order;
}
}
通過上面的接口定義和自帶的健康檢測邏輯可以看出做一個服務治理需要實現(xiàn)的最簡單的邏輯
實現(xiàn) ServiceRegistry 功能
實現(xiàn) DiscoveryClient 功能
Spring Cloud Consul 實現(xiàn)
實現(xiàn) ServiceRegistry 功能
在 Spring Cloud Consul 中,首先自定義了 Registration 的實現(xiàn)
其中 NewService 為 Consul 定義的一些服務實例信息
public class ConsulRegistration implements Registration {
private final NewService service;
private ConsulDiscoveryProperties properties;
public ConsulRegistration(NewService service, ConsulDiscoveryProperties properties) {
this.service = service;
this.properties = properties;
}
public NewService getService() {
return this.service;
}
protected ConsulDiscoveryProperties getProperties() {
return this.properties;
}
public String getInstanceId() {
return getService().getId();
}
public String getServiceId() {
return getService().getName();
}
@Override
public String getHost() {
return getService().getAddress();
}
@Override
public int getPort() {
return getService().getPort();
}
@Override
public boolean isSecure() {
return this.properties.getScheme().equalsIgnoreCase("https");
}
@Override
public URI getUri() {
return DefaultServiceInstance.getUri(this);
}
@Override
public Map<String, String> getMetadata() {
return getService().getMeta();
}
}
NewService
其包含了服務的基本信息和 Consul 本身提供一些特有功能如:Tags、Check
// 刪除了通用的 getter、setter、toString 方法
public class NewService {
@SerializedName("ID")
private String id;
@SerializedName("Name")
private String name;
@SerializedName("Tags")
private List<String> tags;
@SerializedName("Address")
private String address;
@SerializedName("Meta")
private Map<String, String> meta;
@SerializedName("Port")
private Integer port;
@SerializedName("EnableTagOverride")
private Boolean enableTagOverride;
@SerializedName("Check")
private NewService.Check check;
@SerializedName("Checks")
private List<NewService.Check> checks;
public NewService() {
}
public static class Check {
@SerializedName("Script")
private String script;
@SerializedName("DockerContainerID")
private String dockerContainerID;
@SerializedName("Shell")
private String shell;
@SerializedName("Interval")
private String interval;
@SerializedName("TTL")
private String ttl;
@SerializedName("HTTP")
private String http;
@SerializedName("Method")
private String method;
@SerializedName("Header")
private Map<String, List<String>> header;
@SerializedName("TCP")
private String tcp;
@SerializedName("Timeout")
private String timeout;
@SerializedName("DeregisterCriticalServiceAfter")
private String deregisterCriticalServiceAfter;
@SerializedName("TLSSkipVerify")
private Boolean tlsSkipVerify;
@SerializedName("Status")
private String status;
@SerializedName("GRPC")
private String grpc;
@SerializedName("GRPCUseTLS")
private Boolean grpcUseTLS;
public Check() {
}
}
}
ConsulServiceRegistry 實現(xiàn) ServiceRegistry
public class ConsulServiceRegistry implements ServiceRegistry<ConsulRegistration> {
private static Log log = LogFactory.getLog(ConsulServiceRegistry.class);
private final ConsulClient client;
private final ConsulDiscoveryProperties properties;
private final TtlScheduler ttlScheduler;
private final HeartbeatProperties heartbeatProperties;
public ConsulServiceRegistry(ConsulClient client, ConsulDiscoveryProperties properties, TtlScheduler ttlScheduler,
HeartbeatProperties heartbeatProperties) {
this.client = client;
this.properties = properties;
this.ttlScheduler = ttlScheduler;
this.heartbeatProperties = heartbeatProperties;
}
@Override
public void register(ConsulRegistration reg) {
log.info("Registering service with consul: " + reg.getService());
try {
// 同樣是通過 consul 提供的 api 接口進行服務注冊
this.client.agentServiceRegister(reg.getService(), this.properties.getAclToken());
NewService service = reg.getService();
if (this.heartbeatProperties.isEnabled() && this.ttlScheduler != null && service.getCheck() != null
&& service.getCheck().getTtl() != null) {
this.ttlScheduler.add(reg.getInstanceId());
}
}
catch (ConsulException e) {
if (this.properties.isFailFast()) {
log.error("Error registering service with consul: " + reg.getService(), e);
ReflectionUtils.rethrowRuntimeException(e);
}
log.warn("Failfast is false. Error registering service with consul: " + reg.getService(), e);
}
}
@Override
public void deregister(ConsulRegistration reg) {
if (this.ttlScheduler != null) {
this.ttlScheduler.remove(reg.getInstanceId());
}
if (log.isInfoEnabled()) {
log.info("Deregistering service with consul: " + reg.getInstanceId());
}
this.client.agentServiceDeregister(reg.getInstanceId(), this.properties.getAclToken());
}
@Override
public void close() {
}
@Override
public void setStatus(ConsulRegistration registration, String status) {
if (status.equalsIgnoreCase(OUT_OF_SERVICE.getCode())) {
this.client.agentServiceSetMaintenance(registration.getInstanceId(), true);
}
else if (status.equalsIgnoreCase(UP.getCode())) {
this.client.agentServiceSetMaintenance(registration.getInstanceId(), false);
}
else {
throw new IllegalArgumentException("Unknown status: " + status);
}
}
// 服務實例狀態(tài)
@Override
public Object getStatus(ConsulRegistration registration) {
String serviceId = registration.getServiceId();
Response<List<Check>> response = this.client.getHealthChecksForService(serviceId,
HealthChecksForServiceRequest.newBuilder().setQueryParams(QueryParams.DEFAULT).build());
List<Check> checks = response.getValue();
for (Check check : checks) {
if (check.getServiceId().equals(registration.getInstanceId())) {
if (check.getName().equalsIgnoreCase("Service Maintenance Mode")) {
return OUT_OF_SERVICE.getCode();
}
}
}
return UP.getCode();
}
}
ConsulDiscoveryClient 實現(xiàn) DiscoveryClient
在發(fā)現(xiàn)邏輯中也是通過 consul 提供的 api 接口進行查詢
public class ConsulDiscoveryClient implements DiscoveryClient {
private final ConsulClient client;
private final ConsulDiscoveryProperties properties;
public ConsulDiscoveryClient(ConsulClient client, ConsulDiscoveryProperties properties) {
this.client = client;
this.properties = properties;
}
@Override
public String description() {
return "Spring Cloud Consul Discovery Client";
}
@Override
public List<ServiceInstance> getInstances(final String serviceId) {
return getInstances(serviceId, new QueryParams(this.properties.getConsistencyMode()));
}
public List<ServiceInstance> getInstances(final String serviceId, final QueryParams queryParams) {
List<ServiceInstance> instances = new ArrayList<>();
addInstancesToList(instances, serviceId, queryParams);
return instances;
}
private void addInstancesToList(List<ServiceInstance> instances, String serviceId, QueryParams queryParams) {
HealthServicesRequest.Builder requestBuilder = HealthServicesRequest.newBuilder()
.setPassing(this.properties.isQueryPassing()).setQueryParams(queryParams)
.setToken(this.properties.getAclToken());
String queryTag = this.properties.getQueryTagForService(serviceId);
if (queryTag != null) {
requestBuilder.setTag(queryTag);
}
HealthServicesRequest request = requestBuilder.build();
Response<List<HealthService>> services = this.client.getHealthServices(serviceId, request);
for (HealthService service : services.getValue()) {
instances.add(new ConsulServiceInstance(service, serviceId));
}
}
public List<ServiceInstance> getAllInstances() {
List<ServiceInstance> instances = new ArrayList<>();
Response<Map<String, List<String>>> services = this.client
.getCatalogServices(CatalogServicesRequest.newBuilder().setQueryParams(QueryParams.DEFAULT).build());
for (String serviceId : services.getValue().keySet()) {
addInstancesToList(instances, serviceId, QueryParams.DEFAULT);
}
return instances;
}
@Override
public List<String> getServices() {
CatalogServicesRequest request = CatalogServicesRequest.newBuilder().setQueryParams(QueryParams.DEFAULT)
.setToken(this.properties.getAclToken()).build();
return new ArrayList<>(this.client.getCatalogServices(request).getValue().keySet());
}
@Override
public int getOrder() {
return this.properties.getOrder();
}
}
總結
簡要的 Spring Cloud Consul 的服務治理邏輯大致如此,當然 Spring Cloud Consul 還要處理大量的細節(jié),代碼還是很多的
在 Spring Cloud 體系中 Consul 并不提供服務請求轉發(fā)的功能,只是提供對服務信息的保存、查詢、健康檢測剔除功能
參考
Consul 官方介紹 https://www.consul.io/docs/intro
Spring Cloud Consul https://github.com/spring-cloud/spring-cloud-consul
Spring Cloud Commons https://github.com/spring-cloud/spring-cloud-commons
Nacos 文檔 https://nacos.io
https://github.com/Netflix/eureka
