Nacos7# Distro協(xié)議增量同步
引言
本文接著擼Distro協(xié)議,上文中分析了在Nacos server啟動(dòng)時(shí)會(huì)進(jìn)行全量數(shù)據(jù)同步和數(shù)據(jù)校驗(yàn),具體數(shù)據(jù)即客戶端注冊(cè)節(jié)點(diǎn)信息含命名空間、分組名稱、服務(wù)名稱、節(jié)點(diǎn)Instance信息等。什么時(shí)候會(huì)觸發(fā)增量同步?增量同步都干了些啥,下文接著擼擼增量數(shù)據(jù)同步。
增量數(shù)據(jù)同步
在Nacos節(jié)點(diǎn)啟動(dòng)時(shí)通過(guò)事件驅(qū)動(dòng)模式訂閱了ClientChangedEvent、ClientDisconnectEvent和ClientVerifyFailedEvent事件 當(dāng)節(jié)點(diǎn)收到ClientChangedEvent事件時(shí),會(huì)向集群中其他節(jié)點(diǎn)發(fā)送更新Client信息請(qǐng)求,其他節(jié)點(diǎn)收到后更新緩存 當(dāng)節(jié)點(diǎn)收到ClientVerifyFailedEvent事件時(shí),向該Event指定的目標(biāo)節(jié)點(diǎn)發(fā)起新增該Event指定的Client信息請(qǐng)求,目標(biāo)節(jié)點(diǎn)收到后更新到自己緩存中 當(dāng)節(jié)點(diǎn)收到ClientDisconnectEvent事件時(shí),會(huì)向集群中其他節(jié)點(diǎn)發(fā)送刪除Client信息請(qǐng)求,其他節(jié)點(diǎn)收到后將該Client緩存刪除
增量事件觸發(fā)
當(dāng)有服務(wù)注冊(cè)或者注銷時(shí)會(huì)觸發(fā)ClientEvent.ClientChangedEvent事件,即客戶端調(diào)用naming.registerInstance或者naming.deregisterInstance 定時(shí)任務(wù)每隔3秒鐘定時(shí)檢查緩存中的所有連接,如果超過(guò)保鮮期20秒則再次發(fā)起連接請(qǐng)求,連接未成功則注銷關(guān)閉該連接并發(fā)布ClientEvent.ClientDisconnectEvent事件 Nacos集群之間通過(guò)每5秒發(fā)送心跳校驗(yàn)數(shù)據(jù)請(qǐng)求(具體為本節(jié)點(diǎn)負(fù)責(zé)Client信息),其他節(jié)點(diǎn)接受到校驗(yàn)請(qǐng)求,如果緩存中存在該client表示校驗(yàn)成功,同時(shí)更新保鮮時(shí)間;否則校驗(yàn)失敗,回調(diào)返回失敗Response,請(qǐng)求節(jié)點(diǎn)收到失敗的Response后會(huì)發(fā)布ClientVerifyFailedEvent事件
將代碼翻到DistroClientDataProcessor類中,該類繼承了SmartSubscriber,遵循Subscriber/Notify模式,即事件驅(qū)動(dòng)模式。該模式前面文章中分析過(guò),當(dāng)有訂閱的事件時(shí)會(huì)進(jìn)行回調(diào)通知。
訂閱的事件
DistroClientDataProcessor訂閱了ClientChangedEvent、ClientDisconnectEvent和ClientVerifyFailedEvent事件。
@Override
public List<Class<? extends Event>> subscribeTypes() {
List<Class<? extends Event>> result = new LinkedList<>();
result.add(ClientEvent.ClientChangedEvent.class);
result.add(ClientEvent.ClientDisconnectEvent.class);
result.add(ClientEvent.ClientVerifyFailedEvent.class);
return result;
}
當(dāng)有上述三個(gè)事件產(chǎn)生時(shí),DefaultPublisher回調(diào)onEvent方法。
public void onEvent(Event event) {
if (EnvUtil.getStandaloneMode()) {
return;
}
if (!upgradeJudgement.isUseGrpcFeatures()) {
return;
}
if (event instanceof ClientEvent.ClientVerifyFailedEvent) {
// 注解@1
syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
} else {
// 注解@2
syncToAllServer((ClientEvent) event);
}
}
注解@1 將ClientVerifyFailedEvent同步給校驗(yàn)失敗的節(jié)點(diǎn),操作類型為ADD
注解@2 將同步給集群中的其他節(jié)
private void syncToAllServer(ClientEvent event) {
Client client = event.getClient();
// Only ephemeral data sync by Distro, persist client should sync by raft.
if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
return;
}
if (event instanceof ClientEvent.ClientDisconnectEvent) {
// 注解@3
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
distroProtocol.sync(distroKey, DataOperation.DELETE);
} else if (event instanceof ClientEvent.ClientChangedEvent) {
// 注解@4
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
distroProtocol.sync(distroKey, DataOperation.CHANGE);
}
}
注解@3 當(dāng)客戶端斷開(kāi)連接事件ClientDisconnectEvent時(shí),向其他節(jié)點(diǎn)同步DELETE操作
注解@4 當(dāng)客戶端變更事件ClientChangedEvent時(shí),向其他節(jié)點(diǎn)同步CHANGE操作
接著看下不同操作類型的處理
@Override
public boolean process(NacosTask task) {
if (!(task instanceof DistroDelayTask)) {
return true;
}
DistroDelayTask distroDelayTask = (DistroDelayTask) task;
DistroKey distroKey = distroDelayTask.getDistroKey();
switch (distroDelayTask.getAction()) {
case DELETE: // 刪除操作
DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);
distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);
return true;
case CHANGE:
case ADD: // 更新和新增操作
DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
return true;
default:
return false;
}
}
向指定的集群節(jié)點(diǎn)同步更新數(shù)據(jù)
@Override
public boolean syncData(DistroData data, String targetServer) {
if (isNoExistTarget(targetServer)) {
return true;
}
// 構(gòu)造請(qǐng)求數(shù)據(jù)并設(shè)置數(shù)據(jù)類型
DistroDataRequest request = new DistroDataRequest(data, data.getType());
// 查找目標(biāo)節(jié)點(diǎn)緩存數(shù)據(jù)
Member member = memberManager.find(targetServer);
// 節(jié)點(diǎn)狀態(tài)檢查需UP狀態(tài),即:可通信狀態(tài)
if (checkTargetServerStatusUnhealthy(member)) {
Loggers.DISTRO.warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy", targetServer);
return false;
}
try {
// 向目標(biāo)節(jié)點(diǎn)發(fā)送數(shù)據(jù)
Response response = clusterRpcClientProxy.sendRequest(member, request);
return checkResponse(response);
} catch (NacosException e) {
Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! ", e);
}
return false;
}
異步更新操作
@Override
public void syncData(DistroData data, String targetServer, DistroCallback callback) {
if (isNoExistTarget(targetServer)) {
callback.onSuccess();
}
DistroDataRequest request = new DistroDataRequest(data, data.getType());
Member member = memberManager.find(targetServer);
try {
// 異步更新操作
clusterRpcClientProxy.asyncRequest(member, request, new DistroRpcCallbackWrapper(callback, member));
} catch (NacosException nacosException) {
callback.onFailed(nacosException);
}
}
節(jié)點(diǎn)收到這些操作請(qǐng)求如何處理呢?
代碼翻到DistroDataRequestHandler#handle(),集群中節(jié)點(diǎn)收到請(qǐng)求后處理邏輯在這里:
@Override
public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
try {
switch (request.getDataOperation()) {
case VERIFY:
return handleVerify(request.getDistroData(), meta);
case SNAPSHOT:
return handleSnapshot();
case ADD:
case CHANGE:
case DELETE:
return handleSyncData(request.getDistroData());
case QUERY:
return handleQueryData(request.getDistroData());
default:
return new DistroDataResponse();
}
} catch (Exception e) {
Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);
DistroDataResponse result = new DistroDataResponse();
result.setErrorCode(ResponseCode.FAIL.getCode());
result.setMessage("handle distro request with exception");
return result;
}
}
可以看出ADD、CHANGE和DELETE均由handleSyncData處理。
private DistroDataResponse handleSyncData(DistroData distroData) {
DistroDataResponse result = new DistroDataResponse();
if (!distroProtocol.onReceive(distroData)) {
result.setErrorCode(ResponseCode.FAIL.getCode());
result.setMessage("[DISTRO-FAILED] distro data handle failed");
}
return result;
}
@Override
public boolean processData(DistroData distroData) {
switch (distroData.getType()) {
case ADD:
case CHANGE:
ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class)
.deserialize(distroData.getContent(), ClientSyncData.class);
handlerClientSyncData(clientSyncData); // 注解@5
return true;
case DELETE:
String deleteClientId = distroData.getDistroKey().getResourceKey();
Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
clientManager.clientDisconnected(deleteClientId); // 注解@6
return true;
default:
return false;
}
}
注解@5 將同步過(guò)來(lái)的Client信息進(jìn)行緩存
private void handlerClientSyncData(ClientSyncData clientSyncData) {
Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());
clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
Client client = clientManager.getClient(clientSyncData.getClientId());
// 注解@5.1
upgradeClient(client, clientSyncData);
}
需要的是從其他節(jié)點(diǎn)通過(guò)過(guò)來(lái)的Client信息,ConnectionBasedClient屬性isNative為false表示該連接時(shí)從其他節(jié)點(diǎn)同步過(guò)來(lái)的;true表示該連接客戶端直接連接的。
public boolean syncClientConnected(String clientId, ClientSyncAttributes attributes) {
String type = attributes.getClientAttribute(ClientConstants.CONNECTION_TYPE);
ClientFactory clientFactory = ClientFactoryHolder.getInstance().findClientFactory(type);
return clientConnected(clientFactory.newSyncedClient(clientId, attributes));
}
@Override
public ConnectionBasedClient newSyncedClient(String clientId, ClientSyncAttributes attributes) {
return new ConnectionBasedClient(clientId, false); // false表示從其他節(jié)點(diǎn)同步過(guò)來(lái)的client
}
@Override
public boolean clientConnected(Client client) {
Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId());
if (!clients.containsKey(client.getClientId())) {
clients.putIfAbsent(client.getClientId(), (ConnectionBasedClient) client); // 緩存client
}
return true;
}
注解@5.1 更新Client的Service以及Instance信息。
private void upgradeClient(Client client, ClientSyncData clientSyncData) {
List<String> namespaces = clientSyncData.getNamespaces();
List<String> groupNames = clientSyncData.getGroupNames();
List<String> serviceNames = clientSyncData.getServiceNames();
List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();
Set<Service> syncedService = new HashSet<>();
for (int i = 0; i < namespaces.size(); i++) {
Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
Service singleton = ServiceManager.getInstance().getSingleton(service);
syncedService.add(singleton);
InstancePublishInfo instancePublishInfo = instances.get(i);
if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
client.addServiceInstance(singleton, instancePublishInfo);
NotifyCenter.publishEvent(
new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
}
}
for (Service each : client.getAllPublishedService()) {
if (!syncedService.contains(each)) {
client.removeServiceInstance(each);
NotifyCenter.publishEvent(
new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
}
}
}
注解@6 響應(yīng)刪除操作,從clients緩存中移除。
@Override
public boolean clientDisconnected(String clientId) {
Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
ConnectionBasedClient client = clients.remove(clientId);
if (null == client) {
return true;
}
client.release();
NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
return true;
}
小結(jié): 增量同步的邏輯如下:當(dāng)本節(jié)點(diǎn)DistroClientDataProcessor收到ClientChangedEvent、ClientDisconnectEvent和ClientVerifyFailedEvent事件時(shí),會(huì)向Nacos集群的其他節(jié)點(diǎn)同步Client信息;集群中其他節(jié)點(diǎn)收到同步信息后更新或者刪除本地緩存的Client信息;通過(guò)增量同步的Client信息isNative為false表示不是由客戶端直連的。
在Nacos server啟動(dòng)時(shí)從運(yùn)行時(shí)內(nèi)存信息可以看出,總共緩存了17個(gè)事件類型。當(dāng)然也包括ClientChangedEvent、ClientDisconnectEvent和ClientVerifyFailedEvent。

ClientChangedEvent事件觸發(fā)
當(dāng)處理服務(wù)注冊(cè)和注銷事件時(shí)會(huì)觸發(fā)ClientChangeEvent事件,詳見(jiàn)InstanceRequestHandler#handle處理邏輯。
public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
Service service = Service
.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
switch (request.getType()) {
// 注解@7
case NamingRemoteConstants.REGISTER_INSTANCE:
return registerInstance(service, request, meta);
// 注解@8
case NamingRemoteConstants.DE_REGISTER_INSTANCE:
return deregisterInstance(service, request, meta);
default:
throw new NacosException(NacosException.INVALID_PARAM,
String.format("Unsupported request type %s", request.getType()));
}
}
注解@7 處理注冊(cè)請(qǐng)求,會(huì)調(diào)用到addServiceInstance方法,該方法中發(fā)布了ClientEvent.ClientChangedEvent事件。
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
if (null == publishers.put(service, instancePublishInfo)) {
MetricsMonitor.incrementInstanceCount();
}
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
return true;
}
注解@8 處理注銷請(qǐng)求,會(huì)調(diào)用到removeServiceInstance方法,該方法中發(fā)布了ClientEvent.ClientChangedEvent事件
public InstancePublishInfo removeServiceInstance(Service service) {
InstancePublishInfo result = publishers.remove(service);
if (null != result) {
MetricsMonitor.decrementInstanceCount();
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
}
Loggers.SRV_LOG.info("Client remove for service {}, {}", service, getClientId());
return result;
}
小結(jié): 當(dāng)有服務(wù)注冊(cè)或者注銷時(shí)會(huì)觸發(fā)ClientEvent.ClientChangedEvent事件。
ClientDisconnectEvent事件觸發(fā)
下面一段代碼通過(guò)檢測(cè)連接是否超過(guò)保鮮期,超過(guò)保鮮期的會(huì)被注銷關(guān)閉,翻到代碼ConnectionManager#start()。
@PostConstruct
public void start() {
// 定時(shí)任務(wù)每3秒執(zhí)行一次
RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
// 獲取緩存連接
int totalCount = connections.size();
Loggers.REMOTE_DIGEST.info("Connection check task start");
MetricsMonitor.getLongConnectionMonitor().set(totalCount);
// 所有連接集合
Set<Map.Entry<String, Connection>> entries = connections.entrySet();
// 獲取通過(guò)SDK連接的數(shù)量
int currentSdkClientCount = currentSdkClientCount();
boolean isLoaderClient = loadClient >= 0;
int currentMaxClient = isLoaderClient ? loadClient : connectionLimitRule.countLimit;
int expelCount = currentMaxClient < 0 ? 0 : Math.max(currentSdkClientCount - currentMaxClient, 0);
List<String> expelClient = new LinkedList<>();
Map<String, AtomicInteger> expelForIp = new HashMap<>(16);
// 1. calculate expel count of ip.
// 加載Connection ConnectionLimitRule
// 默認(rèn)路徑為 ${usr.home}/nacos/data/loader/limitRule
for (Map.Entry<String, Connection> entry : entries) {
Connection client = entry.getValue();
String appName = client.getMetaInfo().getAppName();
String clientIp = client.getMetaInfo().getClientIp();
if (client.getMetaInfo().isSdkSource() && !expelForIp.containsKey(clientIp)) {
//get limit for current ip.
// 默認(rèn)無(wú)limit限制
int countLimitOfIp = connectionLimitRule.getCountLimitOfIp(clientIp);
// 默認(rèn)無(wú)limit限制
if (countLimitOfIp < 0) {
int countLimitOfApp = connectionLimitRule.getCountLimitOfApp(appName);
countLimitOfIp = countLimitOfApp < 0 ? countLimitOfIp : countLimitOfApp;
}
if (countLimitOfIp < 0) { // 默認(rèn)無(wú)限制
countLimitOfIp = connectionLimitRule.getCountLimitPerClientIpDefault();
}
if (countLimitOfIp >= 0 && connectionForClientIp.containsKey(clientIp)) {
AtomicInteger currentCountIp = connectionForClientIp.get(clientIp);
if (currentCountIp != null && currentCountIp.get() > countLimitOfIp) {
expelForIp.put(clientIp, new AtomicInteger(currentCountIp.get() - countLimitOfIp));
}
}
}
}
if (expelForIp.size() > 0) { // 默認(rèn)等于0
Loggers.REMOTE_DIGEST.info("Over limit ip expel info,", expelForIp);
}
Set<String> outDatedConnections = new HashSet<>();
long now = System.currentTimeMillis();
// 2.get expel connection for ip limit.
//
for (Map.Entry<String, Connection> entry : entries) {
Connection client = entry.getValue();
String clientIp = client.getMetaInfo().getClientIp();
AtomicInteger integer = expelForIp.get(clientIp);
if (integer != null && integer.intValue() > 0) {
integer.decrementAndGet();
expelClient.add(client.getMetaInfo().getConnectionId());
expelCount--;
} else if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) { // 保鮮時(shí)間超過(guò)20秒放入outDatedConnections集合
outDatedConnections.add(client.getMetaInfo().getConnectionId());
}
}
// 3. if total count is still over limit.
// expelCount 默認(rèn)為0
if (expelCount > 0) {
for (Map.Entry<String, Connection> entry : entries) {
Connection client = entry.getValue();
if (!expelForIp.containsKey(client.getMetaInfo().clientIp) && client.getMetaInfo()
.isSdkSource() && expelCount > 0) {
expelClient.add(client.getMetaInfo().getConnectionId());
expelCount--;
outDatedConnections.remove(client.getMetaInfo().getConnectionId());
}
}
}
String serverIp = null;
String serverPort = null;
if (StringUtils.isNotBlank(redirectAddress) && redirectAddress.contains(Constants.COLON)) {
String[] split = redirectAddress.split(Constants.COLON);
serverIp = split[0];
serverPort = split[1];
}
for (String expelledClientId : expelClient) { // 默認(rèn)空集合
try {
Connection connection = getConnection(expelledClientId);
if (connection != null) {
ConnectResetRequest connectResetRequest = new ConnectResetRequest();
connectResetRequest.setServerIp(serverIp);
connectResetRequest.setServerPort(serverPort);
connection.asyncRequest(connectResetRequest, null);
}
} catch (ConnectionAlreadyClosedException e) {
unregister(expelledClientId);
} catch (Exception e) {
Loggers.REMOTE_DIGEST.error("Error occurs when expel connection :", expelledClientId, e);
}
}
//4.client active detection.
Loggers.REMOTE_DIGEST.info("Out dated connection ,size={}", outDatedConnections.size());
// 超過(guò)保鮮期的鏈接集合
if (CollectionUtils.isNotEmpty(outDatedConnections)) {
Set<String> successConnections = new HashSet<>();
final CountDownLatch latch = new CountDownLatch(outDatedConnections.size());
for (String outDateConnectionId : outDatedConnections) {
try {
Connection connection = getConnection(outDateConnectionId);
if (connection != null) {
ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest();
// 超過(guò)保鮮時(shí)間的連接,重新異步發(fā)起連接
connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public long getTimeout() {
return 1000L;
}
@Override
public void onResponse(Response response) {
latch.countDown();
if (response != null && response.isSuccess()) {
// 刷新激活時(shí)間
connection.freshActiveTime();
successConnections.add(outDateConnectionId);
}
}
@Override
public void onException(Throwable e) {
latch.countDown();
}
});
} else {
latch.countDown();
}
} catch (ConnectionAlreadyClosedException e) {
latch.countDown();
} catch (Exception e) {
// ...
latch.countDown();
}
}
latch.await(3000L, TimeUnit.MILLISECONDS);
Loggers.REMOTE_DIGEST
.info("Out dated connection check successCount={}", successConnections.size());
// 無(wú)效連接集合
for (String outDateConnectionId : outDatedConnections) {
if (!successConnections.contains(outDateConnectionId)) {
Loggers.REMOTE_DIGEST
.info("[{}]Unregister Out dated connection....", outDateConnectionId);
// 注銷關(guān)閉connection
unregister(outDateConnectionId);
}
}
}
if (isLoaderClient) { // 重置
loadClient = -1;
redirectAddress = null;
}
} catch (Throwable e) {
}
}
}, 1000L, 3000L, TimeUnit.MILLISECONDS);
}
public synchronized void unregister(String connectionId) {
Connection remove = this.connections.remove(connectionId);
if (remove != null) {
String clientIp = remove.getMetaInfo().clientIp;
AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
if (atomicInteger != null) {
int count = atomicInteger.decrementAndGet();
if (count <= 0) {
connectionForClientIp.remove(clientIp);
}
}
remove.close();
Loggers.REMOTE_DIGEST.info("[{}]Connection unregistered successfully. ", connectionId);
clientConnectionEventListenerRegistry.notifyClientDisConnected(remove); // 異步
}
}
public void notifyClientDisConnected(final Connection connection) {
for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) {
try {
clientConnectionEventListener.clientDisConnected(connection);
} catch (Throwable throwable) {
Loggers.REMOTE.info("[NotifyClientDisConnected] failed for listener {}",
clientConnectionEventListener.getName(), throwable);
}
}
}
@Override
public boolean clientDisconnected(String clientId) {
Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
ConnectionBasedClient client = clients.remove(clientId);
if (null == client) {
return true;
}
client.release();
NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client)); // 發(fā)布ClientDisconnectEvent事件
return true;
}
小結(jié): 連接可以配置限制規(guī)則具體在${usr.home}/nacos/data/loader/limitRule文件配置,默認(rèn)無(wú)限制;通過(guò)定時(shí)任務(wù)每隔3秒鐘定時(shí)檢查緩存中的所有連接包括通過(guò)來(lái)源sdk的連接和集群的連接;如果連接超過(guò)保鮮期20秒,并再次發(fā)起連接請(qǐng)求,未能連接成功則注銷關(guān)閉該連接;注銷關(guān)閉時(shí)發(fā)布ClientEvent.ClientDisconnectEvent事件。
ClientVerifyFailedEvent事件觸發(fā)
上一篇文章中梳理了Nacos集群中,每個(gè)節(jié)點(diǎn)會(huì)對(duì)集群中其他節(jié)點(diǎn)每隔5秒發(fā)送校驗(yàn)數(shù)據(jù),也就是心跳。當(dāng)校驗(yàn)的結(jié)果會(huì)進(jìn)行回調(diào)(gRPC為例),我們翻著看看這部分。
public void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback) {
if (isNoExistTarget(targetServer)) {
callback.onSuccess();
}
DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);
Member member = memberManager.find(targetServer);
try {
DistroVerifyCallbackWrapper wrapper = new DistroVerifyCallbackWrapper(targetServer,
verifyData.getDistroKey().getResourceKey(), callback, member);
clusterRpcClientProxy.asyncRequest(member, request, wrapper); // 向其他節(jié)點(diǎn)發(fā)送本節(jié)點(diǎn)負(fù)責(zé)的cleintId信息
} catch (NacosException nacosException) {
callback.onFailed(nacosException);
}
}
重點(diǎn)看下DistroVerifyCallbackWrapper部分,校驗(yàn)失敗發(fā)布ClientVerifyFailedEvent事件。
@Override
public void onResponse(Response response) {
if (checkResponse(response)) {
NamingTpsMonitor.distroVerifySuccess(member.getAddress(), member.getIp());
distroCallback.onSuccess();
} else {
Loggers.DISTRO.info("Target {} verify client {} failed, sync new client", targetServer, clientId);
// 校驗(yàn)失敗發(fā)布ClientVerifyFailedEvent事件
NotifyCenter.publishEvent(new ClientEvent.ClientVerifyFailedEvent(clientId, targetServer));
NamingTpsMonitor.distroVerifyFail(member.getAddress(), member.getIp());
distroCallback.onFailed(null);
}
}
最后看下ClientVerifyFailedEvent這個(gè)類,關(guān)注下成員變量包含了clientId和targetServer。當(dāng)收到ClientVerifyFailedEvent時(shí)用于向targetServer目標(biāo)節(jié)點(diǎn)添加客戶端clientId信息。
public static class ClientVerifyFailedEvent extends ClientEvent {
private static final long serialVersionUID = 2023951686223780851L;
private final String clientId;
private final String targetServer;
public ClientVerifyFailedEvent(String clientId, String targetServer) {
super(null);
this.clientId = clientId;
this.targetServer = targetServer;
}
public String getClientId() {
return clientId;
}
public String getTargetServer() {
return targetServer;
}
}
小結(jié): Nacos集群之間通過(guò)每5秒發(fā)送心跳校驗(yàn)數(shù)據(jù)請(qǐng)求(具體為本節(jié)點(diǎn)負(fù)責(zé)Client信息),其他節(jié)點(diǎn)接受到校驗(yàn)請(qǐng)求,如果緩存中存在該client表示校驗(yàn)成功,同時(shí)更新保鮮時(shí)間;否則校驗(yàn)失敗,回調(diào)返回失敗Response,請(qǐng)求節(jié)點(diǎn)收到失敗的Response后會(huì)發(fā)布ClientVerifyFailedEvent事件。
