
          Nacos7# Distro Protocol: Incremental Synchronization

          2021-07-08 18:44

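          Introduction

          This article continues the walkthrough of the Distro protocol. The previous article showed that a Nacos server performs a full data sync and data verification at startup; the data in question is the registration information of clients, including the namespace, group name, service name, and Instance details. When is an incremental sync triggered, and what does it actually do? The sections below walk through incremental data synchronization.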

          1. Overview

          Incremental data sync

          • At startup, a Nacos node subscribes, through the event-driven model, to the ClientChangedEvent, ClientDisconnectEvent, and ClientVerifyFailedEvent events.
          • On ClientChangedEvent, the node sends an update-Client request to the other nodes in the cluster, which update their caches on receipt.
          • On ClientVerifyFailedEvent, the node sends an add-Client request for the Client named in the event to the target node named in the event; the target node stores it in its own cache.
          • On ClientDisconnectEvent, the node sends a delete-Client request to the other nodes in the cluster, which remove that Client from their caches.

          Triggers for incremental events

          • Registering or deregistering a service instance fires ClientEvent.ClientChangedEvent, i.e. when a client calls naming.registerInstance or naming.deregisterInstance.
          • A scheduled task checks all cached connections every 3 seconds. A connection that has been inactive for the 20-second keep-alive window is sent a detection request; if that probe does not succeed, the connection is unregistered and closed, and ClientEvent.ClientDisconnectEvent is published.
          • Every 5 seconds, each node in the Nacos cluster sends verification (heartbeat) requests covering the Clients it is responsible for. A receiving node that has the client in its cache reports success and refreshes the keep-alive time; otherwise verification fails and a failure Response is returned, and the requesting node publishes ClientVerifyFailedEvent when it receives that Response.

          2. Incremental Data Sync

          Open the DistroClientDataProcessor class. It extends SmartSubscriber and follows the Subscriber/Notify pattern, i.e. the event-driven model analyzed in an earlier article: when a subscribed event is published, the subscriber is called back.
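The Subscriber/Notify pattern can be sketched in a few lines. This is a simplified stand-in and not the Nacos NotifyCenter: MiniNotifyCenter, MiniSubscriber, and the three event classes are hypothetical names invented for illustration, and delivery here is a plain synchronous loop.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified event bus illustrating the Subscriber/Notify pattern. */
final class MiniNotifyCenter {

    /** Marker type for events. */
    interface Event { }

    static final class ChangedEvent implements Event { }

    static final class DisconnectEvent implements Event { }

    static final class UnrelatedEvent implements Event { }

    /** A subscriber declares the event types it wants and is called back for them. */
    interface MiniSubscriber {
        List<Class<? extends Event>> subscribeTypes();

        void onEvent(Event event);
    }

    private final List<MiniSubscriber> subscribers = new ArrayList<>();

    void register(MiniSubscriber subscriber) {
        subscribers.add(subscriber);
    }

    /** Delivers the event to every subscriber that listed its class; returns the delivery count. */
    int publish(Event event) {
        int delivered = 0;
        for (MiniSubscriber subscriber : subscribers) {
            if (subscriber.subscribeTypes().contains(event.getClass())) {
                subscriber.onEvent(event);
                delivered++;
            }
        }
        return delivered;
    }
}
```

A subscriber that lists ChangedEvent and DisconnectEvent is called back for those two and never sees UnrelatedEvent, which is the same contract subscribeTypes() and onEvent() express in DistroClientDataProcessor.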

          Subscribed events

          DistroClientDataProcessor subscribes to ClientChangedEvent, ClientDisconnectEvent, and ClientVerifyFailedEvent.

          @Override
          public List<Class<? extends Event>> subscribeTypes() {
            List<Class<? extends Event>> result = new LinkedList<>();
            result.add(ClientEvent.ClientChangedEvent.class);
            result.add(ClientEvent.ClientDisconnectEvent.class);
            result.add(ClientEvent.ClientVerifyFailedEvent.class);
            return result;
          }

          When any of these three events is published, DefaultPublisher calls back the onEvent method.

          public void onEvent(Event event) {
              if (EnvUtil.getStandaloneMode()) {
                  return;
              }
              if (!upgradeJudgement.isUseGrpcFeatures()) {
                  return;
              }
              if (event instanceof ClientEvent.ClientVerifyFailedEvent) {
                  // Note @1
                  syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
              } else {
                  // Note @2
                  syncToAllServer((ClientEvent) event);
              }
          }

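          Note @1: for a ClientVerifyFailedEvent, sync the client to the node that failed verification, with operation type ADD.

          Note @2: every other event is synced to the other nodes in the cluster.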

          private void syncToAllServer(ClientEvent event) {
              Client client = event.getClient();
              // Only ephemeral data sync by Distro, persist client should sync by raft.
              if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
                  return;
              }
              if (event instanceof ClientEvent.ClientDisconnectEvent) {
                  // Note @3
                  DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
                  distroProtocol.sync(distroKey, DataOperation.DELETE);
              } else if (event instanceof ClientEvent.ClientChangedEvent) {
                  // Note @4
                  DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
                  distroProtocol.sync(distroKey, DataOperation.CHANGE);
              }
          }

          Note @3: on a client disconnect event (ClientDisconnectEvent), a DELETE operation is synced to the other nodes.

          Note @4: on a client change event (ClientChangedEvent), a CHANGE operation is synced to the other nodes.
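The routing in Notes @1 to @4 boils down to a small decision table. The sketch below restates it with plain enums; SyncRouting, EventKind, Operation, and Plan are hypothetical names, not Nacos classes. One assumption is labeled in the code: the ephemeral/responsible guard is applied to the verify-failed path as well, although the article only shows that guard in syncToAllServer.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the routing in onEvent/syncToAllServer; not the Nacos classes. */
final class SyncRouting {

    enum EventKind { CHANGED, DISCONNECT, VERIFY_FAILED }

    enum Operation { ADD, CHANGE, DELETE }

    /** One planned sync: which operation goes to which server. */
    static final class Plan {
        final Operation operation;
        final String targetServer;

        Plan(Operation operation, String targetServer) {
            this.operation = operation;
            this.targetServer = targetServer;
        }
    }

    /**
     * @param kind         the event that arrived
     * @param failedTarget the server named by a VERIFY_FAILED event (ignored otherwise)
     * @param otherServers all cluster members except this node
     * @param ephemeral    whether the client holds ephemeral data
     * @param responsible  whether this node is responsible for the client
     */
    static List<Plan> plan(EventKind kind, String failedTarget, List<String> otherServers,
            boolean ephemeral, boolean responsible) {
        List<Plan> plans = new ArrayList<>();
        // Distro only syncs ephemeral clients that this node is responsible for.
        // Assumption: the verify-failed path uses the same guard (not shown in the article).
        if (!ephemeral || !responsible) {
            return plans;
        }
        if (kind == EventKind.VERIFY_FAILED) {
            // Only the node that failed verification is re-sent the client, as an ADD.
            plans.add(new Plan(Operation.ADD, failedTarget));
            return plans;
        }
        Operation operation = kind == EventKind.DISCONNECT ? Operation.DELETE : Operation.CHANGE;
        for (String server : otherServers) {
            plans.add(new Plan(operation, server));
        }
        return plans;
    }
}
```

For a three-node cluster, a DISCONNECT on this node yields two DELETE plans (one per peer), while a VERIFY_FAILED naming one peer yields a single ADD plan for that peer.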

          Next, look at how the different operation types are processed.

          @Override
          public boolean process(NacosTask task) {
              if (!(task instanceof DistroDelayTask)) {
                  return true;
              }
              DistroDelayTask distroDelayTask = (DistroDelayTask) task;
              DistroKey distroKey = distroDelayTask.getDistroKey();
              switch (distroDelayTask.getAction()) {
                  case DELETE: // delete operation
                      DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);
                      distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);
                      return true;
                  case CHANGE:
                  case ADD: // change and add operations
                      DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
                      distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
                      return true;
                  default:
                      return false;
              }
          }

          Synchronously sending updated data to a given cluster node

          @Override
          public boolean syncData(DistroData data, String targetServer) {
              if (isNoExistTarget(targetServer)) {
                  return true;
              }
              // build the request and set the data type
              DistroDataRequest request = new DistroDataRequest(data, data.getType());
              // look up the target node in the member cache
              Member member = memberManager.find(targetServer);
              // the target node must be UP, i.e. reachable
              if (checkTargetServerStatusUnhealthy(member)) {
                  Loggers.DISTRO.warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy", targetServer);
                  return false;
              }
              try {
                  // send the data to the target node
                  Response response = clusterRpcClientProxy.sendRequest(member, request);
                  return checkResponse(response);
              } catch (NacosException e) {
                  Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! ", e);
              }
              return false;
          }

          Asynchronous update

          @Override
          public void syncData(DistroData data, String targetServer, DistroCallback callback) {
              if (isNoExistTarget(targetServer)) {
                  callback.onSuccess();
                  // stop here: there is no member to send the request to
                  return;
              }
              DistroDataRequest request = new DistroDataRequest(data, data.getType());
              Member member = memberManager.find(targetServer);
              try {
                  // asynchronous update
                  clusterRpcClientProxy.asyncRequest(member, request, new DistroRpcCallbackWrapper(callback, member));
              } catch (NacosException nacosException) {
                  callback.onFailed(nacosException);
              }
          }

          How does a node handle these requests when it receives them?

          Open DistroDataRequestHandler#handle(); this is where a cluster node processes an incoming request:

          @Override
          public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
              try {
                  switch (request.getDataOperation()) {
                      case VERIFY:
                          return handleVerify(request.getDistroData(), meta);
                      case SNAPSHOT:
                          return handleSnapshot();
                      case ADD:
                      case CHANGE:
                      case DELETE:
                          return handleSyncData(request.getDistroData());
                      case QUERY:
                          return handleQueryData(request.getDistroData());
                      default:
                          return new DistroDataResponse();
                  }
              } catch (Exception e) {
                  Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);
                  DistroDataResponse result = new DistroDataResponse();
                  result.setErrorCode(ResponseCode.FAIL.getCode());
                  result.setMessage("handle distro request with exception");
                  return result;
              }
          }

          As the code shows, ADD, CHANGE, and DELETE are all handled by handleSyncData.

          private DistroDataResponse handleSyncData(DistroData distroData) {
              DistroDataResponse result = new DistroDataResponse();
              if (!distroProtocol.onReceive(distroData)) {
                  result.setErrorCode(ResponseCode.FAIL.getCode());
                  result.setMessage("[DISTRO-FAILED] distro data handle failed");
              }
              return result;
          }
          @Override
          public boolean processData(DistroData distroData) {
              switch (distroData.getType()) {
                  case ADD:
                  case CHANGE:
                      ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class)
                              .deserialize(distroData.getContent(), ClientSyncData.class);
                      handlerClientSyncData(clientSyncData); // Note @5
                      return true;
                  case DELETE:
                      String deleteClientId = distroData.getDistroKey().getResourceKey();
                      Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
                      clientManager.clientDisconnected(deleteClientId); // Note @6
                      return true;
                  default:
                      return false;
              }
          }

          Note @5: cache the Client information that was synced over.

          private void handlerClientSyncData(ClientSyncData clientSyncData) {
              Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());
              clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
              Client client = clientManager.getClient(clientSyncData.getClientId());
              // Note @5.1
              upgradeClient(client, clientSyncData);
          }

          Note that for Client information synced over from another node, the ConnectionBasedClient attribute isNative is false, meaning the client was synced from another node; true means the client is connected directly to this node.

          public boolean syncClientConnected(String clientId, ClientSyncAttributes attributes) {
              String type = attributes.getClientAttribute(ClientConstants.CONNECTION_TYPE);
              ClientFactory clientFactory = ClientFactoryHolder.getInstance().findClientFactory(type);
              return clientConnected(clientFactory.newSyncedClient(clientId, attributes));
          }

          @Override
          public ConnectionBasedClient newSyncedClient(String clientId, ClientSyncAttributes attributes) {
            return new ConnectionBasedClient(clientId, false); // false: this client was synced from another node
          }

          @Override
          public boolean clientConnected(Client client) {
            Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId());
            if (!clients.containsKey(client.getClientId())) {
              clients.putIfAbsent(client.getClientId(), (ConnectionBasedClient) client); // cache the client
            }
            return true;
          }
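To make the isNative distinction concrete, here is a minimal sketch of a client cache that records how each client arrived. ClientRegistrySketch and CachedClient are hypothetical names. Two things differ from the code above and are assumptions for illustration: the connect methods return whether the entry was newly cached (the Nacos method shown always returns true), and isResponsible treats "native" as "this node is responsible", which the article implies but does not show.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical client cache that records whether each client is native or synced. */
final class ClientRegistrySketch {

    static final class CachedClient {
        final String clientId;
        final boolean isNative;

        CachedClient(String clientId, boolean isNative) {
            this.clientId = clientId;
            this.isNative = isNative;
        }
    }

    private final ConcurrentMap<String, CachedClient> clients = new ConcurrentHashMap<>();

    /** Called when a client connects directly to this node. Returns true if newly cached. */
    boolean nativeConnected(String clientId) {
        // putIfAbsent is atomic on its own, so no separate containsKey check is needed.
        return clients.putIfAbsent(clientId, new CachedClient(clientId, true)) == null;
    }

    /** Called when another node syncs a client to this node. Returns true if newly cached. */
    boolean syncConnected(String clientId) {
        return clients.putIfAbsent(clientId, new CachedClient(clientId, false)) == null;
    }

    /** Assumption: a node is responsible only for clients that connected to it directly. */
    boolean isResponsible(String clientId) {
        CachedClient client = clients.get(clientId);
        return client != null && client.isNative;
    }
}
```

Under this reading, a synced client is cached for lookups but never re-broadcast by the receiving node, which is what keeps a CHANGE from bouncing between nodes.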

          Note @5.1: update the Client's Service and Instance information.

          private void upgradeClient(Client client, ClientSyncData clientSyncData) {

              List<String> namespaces = clientSyncData.getNamespaces();
              List<String> groupNames = clientSyncData.getGroupNames();
              List<String> serviceNames = clientSyncData.getServiceNames();
              List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();
              Set<Service> syncedService = new HashSet<>();
              for (int i = 0; i < namespaces.size(); i++) {
                  Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
                  Service singleton = ServiceManager.getInstance().getSingleton(service);
                  syncedService.add(singleton);
                  InstancePublishInfo instancePublishInfo = instances.get(i);
                  if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
                      client.addServiceInstance(singleton, instancePublishInfo);
                      NotifyCenter.publishEvent(
                              new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
                  }
              }
              for (Service each : client.getAllPublishedService()) {
                  if (!syncedService.contains(each)) {
                      client.removeServiceInstance(each);
                      NotifyCenter.publishEvent(
                              new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
                  }
              }
          }
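The two loops in upgradeClient are a reconcile step: first add or replace whatever the synced data contains, then drop whatever the local client still publishes that the synced data no longer lists. A minimal sketch of the same idea, with services and instances reduced to strings (ReconcileSketch and the "register:"/"deregister:" event strings are hypothetical stand-ins for the Nacos types and events):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the two-pass reconcile performed by upgradeClient. */
final class ReconcileSketch {

    /**
     * Brings {@code current} (service -> instance) in line with {@code synced} and
     * returns the events that would be published: registrations first, then deregistrations.
     */
    static List<String> reconcile(Map<String, String> current, Map<String, String> synced) {
        List<String> events = new ArrayList<>();
        // Pass 1: add or replace instances that differ from what is cached.
        for (Map.Entry<String, String> entry : synced.entrySet()) {
            if (!entry.getValue().equals(current.get(entry.getKey()))) {
                current.put(entry.getKey(), entry.getValue());
                events.add("register:" + entry.getKey());
            }
        }
        // Pass 2: remove services that are no longer part of the synced data.
        // Iterate over a copy of the keys so removal does not disturb the iteration.
        for (String service : new ArrayList<>(current.keySet())) {
            if (!synced.containsKey(service)) {
                current.remove(service);
                events.add("deregister:" + service);
            }
        }
        return events;
    }
}
```

Unchanged services produce no event at all, so a CHANGE sync that only touches one service does not cause a notification storm on the receiving node.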

          Note @6: handle the delete operation by removing the client from the clients cache.

          @Override
          public boolean clientDisconnected(String clientId) {
              Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
              ConnectionBasedClient client = clients.remove(clientId);
              if (null == client) {
                  return true;
              }
              client.release();
              NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
              return true;
          }

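          Summary: incremental sync works as follows. When this node's DistroClientDataProcessor receives a ClientChangedEvent, ClientDisconnectEvent, or ClientVerifyFailedEvent, it syncs the Client information to other nodes in the Nacos cluster; those nodes update or delete their locally cached Client on receipt. A Client received through incremental sync has isNative set to false, meaning it is not directly connected to that node.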

          3. Triggers for Incremental Events

          The runtime memory of a Nacos server at startup shows 17 cached event types in total, which of course include ClientChangedEvent, ClientDisconnectEvent, and ClientVerifyFailedEvent.

          Triggering ClientChangedEvent

          Handling a service registration or deregistration request fires ClientChangedEvent; see the logic in InstanceRequestHandler#handle.

          public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
              Service service = Service
                      .newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
              switch (request.getType()) {
                  // Note @7
                  case NamingRemoteConstants.REGISTER_INSTANCE:
                      return registerInstance(service, request, meta);
                  // Note @8
                  case NamingRemoteConstants.DE_REGISTER_INSTANCE:
                      return deregisterInstance(service, request, meta);
                  default:
                      throw new NacosException(NacosException.INVALID_PARAM,
                              String.format("Unsupported request type %s", request.getType()));
              }
          }

          Note @7: handles the registration request. This ends up calling addServiceInstance, which publishes ClientEvent.ClientChangedEvent.

          public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
              if (null == publishers.put(service, instancePublishInfo)) {
                  MetricsMonitor.incrementInstanceCount();
              }
              NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
              Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
              return true;
          }

          Note @8: handles the deregistration request. This ends up calling removeServiceInstance, which publishes ClientEvent.ClientChangedEvent when an instance was actually removed.

          public InstancePublishInfo removeServiceInstance(Service service) {
                  InstancePublishInfo result = publishers.remove(service);
                  if (null != result) {
                      MetricsMonitor.decrementInstanceCount();
                      NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
                  }
                  Loggers.SRV_LOG.info("Client remove for service {}, {}", service, getClientId());
                  return result;
          }

          Summary: registering or deregistering a service instance fires ClientEvent.ClientChangedEvent.
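One detail in the two methods above is easy to miss: the add path publishes an event on every call, while the remove path publishes only when something was actually removed. A minimal sketch of that asymmetry, using a counter in place of NotifyCenter (PublisherSketch and changedEvents are hypothetical names):

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of when a "client changed" event is published. */
final class PublisherSketch {

    private final Map<String, String> publishers = new HashMap<>();

    /** Stand-in for NotifyCenter: counts how many change events were published. */
    int changedEvents;

    boolean addServiceInstance(String service, String instance) {
        publishers.put(service, instance);
        // Published on every add, including when an existing instance is replaced.
        changedEvents++;
        return true;
    }

    String removeServiceInstance(String service) {
        String removed = publishers.remove(service);
        if (removed != null) {
            // Published only if the service really had an instance to remove.
            changedEvents++;
        }
        return removed;
    }
}
```

Deregistering a service that was never registered is therefore a silent no-op and triggers no incremental sync.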

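          Triggering ClientDisconnectEvent

          The code below checks whether each connection has exceeded its keep-alive window; connections that have, and that then fail a detection request, are unregistered and closed. See ConnectionManager#start().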

          @PostConstruct
          public void start() {
              // scheduled task, runs every 3 seconds
              RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() {
                  @Override
                  public void run() {
                      try {
                          // number of cached connections
                          int totalCount = connections.size();
                          Loggers.REMOTE_DIGEST.info("Connection check task start");
                          MetricsMonitor.getLongConnectionMonitor().set(totalCount);
                          // all cached connections
                          Set<Map.Entry<String, Connection>> entries = connections.entrySet();
                          // number of connections opened through the SDK
                          int currentSdkClientCount = currentSdkClientCount();
                          boolean isLoaderClient = loadClient >= 0;
                          int currentMaxClient = isLoaderClient ? loadClient : connectionLimitRule.countLimit;

                          int expelCount = currentMaxClient < 0 ? 0 : Math.max(currentSdkClientCount - currentMaxClient, 0);

                          List<String> expelClient = new LinkedList<>();

                          Map<String, AtomicInteger> expelForIp = new HashMap<>(16);

                          // 1. calculate expel count  of ip.
                          // uses the loaded ConnectionLimitRule
                          // default path: ${user.home}/nacos/data/loader/limitRule
                          for (Map.Entry<String, Connection> entry : entries) {

                              Connection client = entry.getValue();
                              String appName = client.getMetaInfo().getAppName();
                              String clientIp = client.getMetaInfo().getClientIp();
                              if (client.getMetaInfo().isSdkSource() && !expelForIp.containsKey(clientIp)) {
                                  //get limit for current ip.
                                  // no limit by default
                                  int countLimitOfIp = connectionLimitRule.getCountLimitOfIp(clientIp);
                                  // no limit by default
                                  if (countLimitOfIp < 0) {
                                      int countLimitOfApp = connectionLimitRule.getCountLimitOfApp(appName);
                                      countLimitOfIp = countLimitOfApp < 0 ? countLimitOfIp : countLimitOfApp;
                                  }
                                  if (countLimitOfIp < 0) { // no limit by default
                                      countLimitOfIp = connectionLimitRule.getCountLimitPerClientIpDefault();
                                  }

                                  if (countLimitOfIp >= 0 && connectionForClientIp.containsKey(clientIp)) {
                                      AtomicInteger currentCountIp = connectionForClientIp.get(clientIp);
                                      if (currentCountIp != null && currentCountIp.get() > countLimitOfIp) {
                                          expelForIp.put(clientIp, new AtomicInteger(currentCountIp.get() - countLimitOfIp));
                                      }
                                  }
                              }
                          }

                          if (expelForIp.size() > 0) { // empty by default
                              Loggers.REMOTE_DIGEST.info("Over limit ip expel info, {}", expelForIp);
                          }

                          Set<String> outDatedConnections = new HashSet<>();
                          long now = System.currentTimeMillis();
                          // 2.get expel connection for ip limit.
                          //
                          for (Map.Entry<String, Connection> entry : entries) {
                              Connection client = entry.getValue();
                              String clientIp = client.getMetaInfo().getClientIp();
                              AtomicInteger integer = expelForIp.get(clientIp);
                              if (integer != null && integer.intValue() > 0) {
                                  integer.decrementAndGet();
                                  expelClient.add(client.getMetaInfo().getConnectionId());
                                  expelCount--;
                              } else if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) {
                                  // inactive for the 20-second keep-alive window: add to outDatedConnections
                                  outDatedConnections.add(client.getMetaInfo().getConnectionId());
                              }

                          }

                          // 3. if total count is still over limit.
                          // expelCount is 0 by default
                          if (expelCount > 0) {
                              for (Map.Entry<String, Connection> entry : entries) {
                                  Connection client = entry.getValue();
                                  if (!expelForIp.containsKey(client.getMetaInfo().clientIp) && client.getMetaInfo()
                                          .isSdkSource() && expelCount > 0) {
                                      expelClient.add(client.getMetaInfo().getConnectionId());
                                      expelCount--;
                                      outDatedConnections.remove(client.getMetaInfo().getConnectionId());
                                  }
                              }
                          }

                          String serverIp = null;
                          String serverPort = null;
                          if (StringUtils.isNotBlank(redirectAddress) && redirectAddress.contains(Constants.COLON)) {
                              String[] split = redirectAddress.split(Constants.COLON);
                              serverIp = split[0];
                              serverPort = split[1];
                          }

                          for (String expelledClientId : expelClient) { // empty by default
                              try {
                                  Connection connection = getConnection(expelledClientId);
                                  if (connection != null) {
                                      ConnectResetRequest connectResetRequest = new ConnectResetRequest();
                                      connectResetRequest.setServerIp(serverIp);
                                      connectResetRequest.setServerPort(serverPort);
                                      connection.asyncRequest(connectResetRequest, null);
                                  }

                              } catch (ConnectionAlreadyClosedException e) {
                                  unregister(expelledClientId);
                              } catch (Exception e) {
                                  Loggers.REMOTE_DIGEST.error("Error occurs when expel connection, expelledClientId: {}", expelledClientId, e);
                              }
                          }

                          //4.client active detection.
                          Loggers.REMOTE_DIGEST.info("Out dated connection ,size={}", outDatedConnections.size());
                          // connections past the keep-alive window
                          if (CollectionUtils.isNotEmpty(outDatedConnections)) {
                              Set<String> successConnections = new HashSet<>();
                              final CountDownLatch latch = new CountDownLatch(outDatedConnections.size());
                              for (String outDateConnectionId : outDatedConnections) {
                                  try {
                                      Connection connection = getConnection(outDateConnectionId);
                                      if (connection != null) {
                                          ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest();
                                          // send an asynchronous detection request to the connection that is past its keep-alive window
                                          connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {
                                              @Override
                                              public Executor getExecutor() {
                                                  return null;
                                              }

                                              @Override
                                              public long getTimeout() {
                                                  return 1000L;
                                              }

                                              @Override
                                              public void onResponse(Response response) {
                                                  latch.countDown();
                                                  if (response != null && response.isSuccess()) {
                                                      // refresh the last-active time
                                                      connection.freshActiveTime();
                                                      successConnections.add(outDateConnectionId);
                                                  }
                                              }

                                              @Override
                                              public void onException(Throwable e) {
                                                  latch.countDown();
                                              }
                                          });
                 
                                      } else {
                                          latch.countDown();
                                      }

                                  } catch (ConnectionAlreadyClosedException e) {
                                      latch.countDown();
                                  } catch (Exception e) {
                                      // ... 
                                      latch.countDown();
                                  }
                              }

                              latch.await(3000L, TimeUnit.MILLISECONDS);
                              Loggers.REMOTE_DIGEST
                                      .info("Out dated connection check successCount={}", successConnections.size());

                              // connections that did not answer the detection request
                              for (String outDateConnectionId : outDatedConnections) {
                                  if (!successConnections.contains(outDateConnectionId)) {
                                      Loggers.REMOTE_DIGEST
                                              .info("[{}]Unregister Out dated connection....", outDateConnectionId);
                                      // unregister and close the connection
                                      unregister(outDateConnectionId);
                                  }
                              }
                          }

                          if (isLoaderClient) {  // reset
                              loadClient = -1;
                              redirectAddress = null;
                          }

                      } catch (Throwable e) {
                         
                      }
                  }
              }, 1000L, 3000L, TimeUnit.MILLISECONDS);

          }

          public synchronized void unregister(String connectionId) {
              Connection remove = this.connections.remove(connectionId);
              if (remove != null) {
                  String clientIp = remove.getMetaInfo().clientIp;
                  AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
                  if (atomicInteger != null) {
                      int count = atomicInteger.decrementAndGet();
                      if (count <= 0) {
                          connectionForClientIp.remove(clientIp);
                      }
                  }
                  remove.close();
                  Loggers.REMOTE_DIGEST.info("[{}]Connection unregistered successfully. ", connectionId);
                  clientConnectionEventListenerRegistry.notifyClientDisConnected(remove); // notify the disconnect listeners
              }
          }

          public void notifyClientDisConnected(final Connection connection) {
              for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) {
                  try {
                      clientConnectionEventListener.clientDisConnected(connection);
                  } catch (Throwable throwable) {
                      Loggers.REMOTE.info("[NotifyClientDisConnected] failed for listener {}",
                              clientConnectionEventListener.getName(), throwable);
                  }
              }
          }

          @Override
          public boolean clientDisconnected(String clientId) {
            Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
            ConnectionBasedClient client = clients.remove(clientId);
            if (null == client) {
              return true;
            }
            client.release();
            NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client)); // publish ClientDisconnectEvent
            return true;
          }

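          Summary: connection limit rules can be configured in the ${user.home}/nacos/data/loader/limitRule file; by default there is no limit. A scheduled task checks every cached connection every 3 seconds, covering both SDK connections and cluster connections. A connection that has been inactive for the 20-second keep-alive window is sent a detection request; if that request does not succeed, the connection is unregistered and closed, and ClientEvent.ClientDisconnectEvent is published.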
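Stripped of the limit-rule handling, the keep-alive part of the task is two set operations: pick the connections that have been inactive for at least the keep-alive window, then unregister those whose detection request did not succeed. The sketch below shows just that logic with timestamps passed in explicitly. KeepAliveSketch and its method names are hypothetical; the 20-second constant mirrors the value described above.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of the keep-alive check, without networking or limit rules. */
final class KeepAliveSketch {

    static final long KEEP_ALIVE_TIME_MS = 20_000L;

    /** Returns the ids of connections inactive for at least the keep-alive window. */
    static Set<String> findOutdated(Map<String, Long> lastActiveTimes, long now) {
        Set<String> outdated = new TreeSet<>();
        for (Map.Entry<String, Long> entry : lastActiveTimes.entrySet()) {
            if (now - entry.getValue() >= KEEP_ALIVE_TIME_MS) {
                outdated.add(entry.getKey());
            }
        }
        return outdated;
    }

    /** Connections to unregister: the outdated ones whose probe did not succeed. */
    static Set<String> toUnregister(Set<String> outdated, Set<String> probeSucceeded) {
        Set<String> result = new TreeSet<>(outdated);
        result.removeAll(probeSucceeded);
        return result;
    }
}
```

Passing `now` as a parameter instead of calling System.currentTimeMillis() inside the method is a choice made for this sketch so the boundary case (exactly 20 seconds idle counts as outdated, matching the `>=` in the original) is easy to check.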

          Triggering ClientVerifyFailedEvent

          The previous article showed that every node in a Nacos cluster sends verification data, in effect a heartbeat, to the other nodes every 5 seconds. The verification result comes back through a callback (using gRPC as the example); let's look at that part.

          public void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback) {
              if (isNoExistTarget(targetServer)) {
                  callback.onSuccess();
                  // stop here: there is no member to send the request to
                  return;
              }
              DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);
              Member member = memberManager.find(targetServer);
              try {
                  DistroVerifyCallbackWrapper wrapper = new DistroVerifyCallbackWrapper(targetServer,
                          verifyData.getDistroKey().getResourceKey(), callback, member);
                  clusterRpcClientProxy.asyncRequest(member, request, wrapper); // send the clientId information this node is responsible for to the other node
              } catch (NacosException nacosException) {
                  callback.onFailed(nacosException);
              }
          }

          The key part is DistroVerifyCallbackWrapper, which publishes ClientVerifyFailedEvent when verification fails.

          @Override
          public void onResponse(Response response) {
              if (checkResponse(response)) {
                  NamingTpsMonitor.distroVerifySuccess(member.getAddress(), member.getIp());
                  distroCallback.onSuccess();
              } else {
                  Loggers.DISTRO.info("Target {} verify client {} failed, sync new client", targetServer, clientId);
                  // verification failed: publish ClientVerifyFailedEvent
                  NotifyCenter.publishEvent(new ClientEvent.ClientVerifyFailedEvent(clientId, targetServer));
                  NamingTpsMonitor.distroVerifyFail(member.getAddress(), member.getIp());
                  distroCallback.onFailed(null);
              }
          }

          Finally, look at the ClientVerifyFailedEvent class itself. Its fields are clientId and targetServer: when the event is received, the client identified by clientId is added to the targetServer node.

          public static class ClientVerifyFailedEvent extends ClientEvent {

              private static final long serialVersionUID = 2023951686223780851L;

              private final String clientId;
              
              private final String targetServer;
              
              public ClientVerifyFailedEvent(String clientId, String targetServer) {
                  super(null);
                  this.clientId = clientId;
                  this.targetServer = targetServer;
              }
              
              public String getClientId() {
                  return clientId;
              }
              
              public String getTargetServer() {
                  return targetServer;
              }
          }

          Summary: every 5 seconds, each node in the Nacos cluster sends verification (heartbeat) requests covering the Clients it is responsible for. A receiving node that has the client in its cache reports success and refreshes the keep-alive time; otherwise verification fails and a failure Response is returned, and the requesting node publishes ClientVerifyFailedEvent when it receives that Response.
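Putting the two sides of verification together: the receiver either refreshes a client it already knows or reports failure, and the sender reacts to a failure by re-sending that client as an ADD. The sketch below collapses the RPC and the ClientVerifyFailedEvent hop into direct method calls; VerifySketch, Receiver, and verifyAndRepair are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of verify-then-repair between two nodes, without RPC. */
final class VerifySketch {

    /** Receiving side: synced clients mapped to the time they were last refreshed. */
    static final class Receiver {
        final Map<String, Long> syncedClients = new HashMap<>();

        /** Returns true and refreshes the timestamp if the client is cached; false otherwise. */
        boolean verify(String clientId, long now) {
            if (!syncedClients.containsKey(clientId)) {
                return false;
            }
            syncedClients.put(clientId, now);
            return true;
        }

        /** Handles an ADD sync by caching the client. */
        void add(String clientId, long now) {
            syncedClients.put(clientId, now);
        }
    }

    /** Sending side: verify, and on failure re-send the client. Returns true if a repair happened. */
    static boolean verifyAndRepair(Receiver target, String clientId, long now) {
        if (target.verify(clientId, now)) {
            return false;
        }
        // In Nacos this step is decoupled through ClientVerifyFailedEvent; here it is a direct call.
        target.add(clientId, now);
        return true;
    }
}
```

The first verification of an unknown client triggers a repair; every later one only refreshes the timestamp, so a node that missed an earlier sync converges within one verification round.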
