<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Nacos6# Distro協(xié)議全量同步與校驗

          共 30747字,需瀏覽 62分鐘

           ·

          2021-06-28 08:20

          引言

          本文接著擼Distro協(xié)議,上文中分析了尋址模式。有了地址就要建立連接,有了連接就能通信了。集群之間都交互啥數(shù)據(jù)?本文就扒一扒全量同步和節(jié)點之間數(shù)據(jù)校驗。

          一、內(nèi)容提要

          節(jié)點間建立RCP連接

          • 訂閱了MembersChangeEvent事件,集群節(jié)點有變更能夠收到回調通知
          • 與集群中其他節(jié)點建立grpc連接并緩存到Map其中key格式為「Cluster-IP:Port」

          節(jié)點間校驗數(shù)據(jù)通信

          • 節(jié)點之間發(fā)送校驗數(shù)據(jù)是在全量同步后進行的
          • 發(fā)送校驗的頻率默認為5秒鐘一次
          • 校驗數(shù)據(jù)包括clientId和version,其中version為保留字段當前為0
          • 接受到校驗數(shù)據(jù)后如果緩存中存在該client表示校驗成功,同時更新保鮮時間,否則校驗失敗

          全量數(shù)據(jù)同步

          • 在節(jié)點啟動時會從集群中其他節(jié)點中的一個節(jié)點同步快照數(shù)據(jù)并緩存在Map中
          • 緩存的數(shù)據(jù)類型分類兩類分別為HTTP和gRPC
          • 具體數(shù)據(jù)即客戶端注冊節(jié)點信息含命名空間、分組名稱、服務名稱、節(jié)點Instance信息等
          • 集群中每個節(jié)點都擁有所有的快照數(shù)據(jù)

          二、節(jié)點間建立RPC連接

          節(jié)點之間要通信,需要建立連接。Nacos集群節(jié)點之間也不例外,下面看下Nacos是如何和集群之間建立連接的,以gRPC為例。

          Nacos中ClusterRpcClientProxy封裝了集群中節(jié)點之間的通道。

          @PostConstruct
          public void init() {
            try {
              // 注解@1
              NotifyCenter.registerSubscriber(this);
              // 注解@2
              List<Member> members = serverMemberManager.allMembersWithoutSelf(); 
              // 注解@3
              refresh(members);
              Loggers.CLUSTER
                .warn("[ClusterRpcClientProxy] success to refresh cluster rpc client on start up,members ={} ",
                      members);
            } catch (NacosException e) {
              Loggers.CLUSTER.warn("[ClusterRpcClientProxy] fail to refresh cluster rpc client,{} ", e.getMessage());
            }
          }

          注解@1 注冊自己訂閱MembersChangeEvent事件

          注解@2 獲取集群中的節(jié)點列表剔除自身節(jié)點

          注解@3 與各個節(jié)點建立rpc通道

          private void refresh(List<Member> members) throws NacosException {
             for (Member member : members) {

                  if (MemberUtil.isSupportedLongCon(member)) {
                      // 注解@3.1
                      createRpcClientAndStart(member, ConnectionType.GRPC);
                  }
              }
             Set<Map.Entry<String, RpcClient>> allClientEntrys = RpcClientFactory.getAllClientEntries();
              Iterator<Map.Entry<String, RpcClient>> iterator = allClientEntrys.iterator();
              List<String> newMemberKeys = members.stream().filter(a -> MemberUtil.isSupportedLongCon(a))
                      .map(a -> memberClientKey(a)).collect(Collectors.toList());
              // 注解@3.2
              while (iterator.hasNext()) {
                  Map.Entry<String, RpcClient> next1 = iterator.next();
                  if (next1.getKey().startsWith("Cluster-") && !newMemberKeys.contains(next1.getKey())) {
                      Loggers.CLUSTER.info("member leave,destroy client of member - > : {}", next1.getKey());
                      RpcClientFactory.getClient(next1.getKey()).shutdown();
                      iterator.remove();
                  }
              }

          }

          注解@3.1 為集群中每個節(jié)點member創(chuàng)建rcp client

          注解@3.2 關閉舊的grpc連接

          private void createRpcClientAndStart(Member member, ConnectionType type) throws NacosException {
              Map<String, String> labels = new HashMap<String, String>(2);
              labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_CLUSTER);
              // 注解@3.1.1
              String memberClientKey = memberClientKey(member);
              RpcClient client = RpcClientFactory.createClusterClient(memberClientKey, type, labels);
              if (!client.getConnectionType().equals(type)) {
                  Loggers.CLUSTER.info(",connection type changed,destroy client of member - > : {}", member);
                  RpcClientFactory.destroyClient(memberClientKey);
                  // 注解@3.1.2
                  client = RpcClientFactory.createClusterClient(memberClientKey, type, labels);
              }

              if (client.isWaitInitiated()) {
                  Loggers.CLUSTER.info("start a new rpc client to member - > : {}", member);

                  // 注解@3.1.3
                  client.serverListFactory(new ServerListFactory() {
                      @Override
                      public String genNextServer() {
                          return member.getAddress(); // 返回連接集群其他節(jié)點地址
                      }

                      @Override
                      public String getCurrentServer() {
                          return member.getAddress();
                      }

                      @Override
                      public List<String> getServerList() {
                          return Lists.newArrayList(member.getAddress());
                      }
                  });
                  // 注解@3.1.4
                  client.start(); 
              }
          }

          注解@3.1.1 memberClientKey由「Cluster-IP:Port」構成,例如:Cluster-1.2.3.4:2008

          注解@3.1.2 創(chuàng)建grpc client并緩存在 clientMap,key為memberClientKey 此時client的狀態(tài)為WAIT_INIT

          注解@3.1.3 集群中固定的某一臺節(jié)點

          注解@3.1.4  grpc連接集群中的member節(jié)點設置client的狀態(tài)RUNNING

          小結: 在與Nacos集群其他節(jié)點建立連接的過程中做了兩件事情:@1.訂閱了MembersChangeEvent事件 @2.與集群中其他節(jié)點建立grpc連接并緩存到Map其中key格式為「Cluster-IP:Port」

          三、節(jié)點間校驗數(shù)據(jù)通信

          節(jié)點之間建立rpc通道必然是為了互相之間能通信,其中一個通信是節(jié)點之間發(fā)送校驗數(shù)據(jù)。那為什么要發(fā)這些校驗數(shù)據(jù)?這些數(shù)據(jù)都是些什么內(nèi)容?下面咱就去扒一扒。

          在DistroProtocol的構造函數(shù)中的最后一個行有一個startDistroTask(),主要分析startVerifyTask的邏輯。

          public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,
                  DistroTaskEngineHolder distroTaskEngineHolder, DistroConfig distroConfig)
           
          {
              this.memberManager = memberManager;
              this.distroComponentHolder = distroComponentHolder;
              this.distroTaskEngineHolder = distroTaskEngineHolder;
              this.distroConfig = distroConfig;
              startDistroTask();
          }
          private void startDistroTask() {
              // 單機模式直接返回
              if (EnvUtil.getStandaloneMode()) {
                  isInitialized = true;
                  return;
              }
              startVerifyTask();
              startLoadTask();
          }
          private void startVerifyTask() {
             // 注解@4
              GlobalExecutor.schedulePartitionDataTimedSync(new DistroVerifyTimedTask(memberManager, distroComponentHolder,
                      distroTaskEngineHolder.getExecuteWorkersManager()), distroConfig.getVerifyIntervalMillis());
          }

          注解@4  每隔5秒執(zhí)行,也就是節(jié)點之間發(fā)送校驗時間的默認頻率是5秒。

          可以通過配置參數(shù)「nacos.core.protocol.distro.data.verify_interval_ms」自定義。

          接著看DistroVerifyTimedTask的run方法。

          @Override
          public void run() {
              try {
                  // 注解@5
                  List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
                  if (Loggers.DISTRO.isDebugEnabled()) {
                      Loggers.DISTRO.debug("server list is: {}", targetServer);
                  }

                  // 注解@6
                  for (String each : distroComponentHolder.getDataStorageTypes()) {
                      verifyForDataStorage(each, targetServer);
                  }
              } catch (Exception e) {
                  Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);
              }
          }

          注解@5 拿到集群中其他節(jié)點

          注解@6 在Nacos server啟動時初始化時兩種類型HTTP和gRPC,本文以gRPC為例進行分析。

          private void verifyForDataStorage(String type, List<Member> targetServer) {
              // 注解@7
              DistroDataStorage dataStorage = distroComponentHolder.findDataStorage(type);
              // 注解@8
              if (!dataStorage.isFinishInitial()) {  // 未完成全量數(shù)據(jù)同步退出
                  Loggers.DISTRO.warn("data storage {} has not finished initial step, do not send verify data",
                          dataStorage.getClass().getSimpleName());
                  return;
              }

              //注解@9
              List<DistroData> verifyData = dataStorage.getVerifyData();
              if (null == verifyData || verifyData.isEmpty()) {
                  return;
              }

              for (Member member : targetServer) {
                  DistroTransportAgent agent = distroComponentHolder.findTransportAgent(type);
                  if (null == agent) {
                      continue;
                  }
                 // 注解@10
                  executeTaskExecuteEngine.addTask(member.getAddress() + type,
                          new DistroVerifyExecuteTask(agent, verifyData, member.getAddress(), type));
              }
          }

          注解@7 Nacos啟動時緩存在dataStorageMap中兩種類型處理器分別用于處理gRPC和HTTP通信方式。

          「Nacos:Naming:v2:ClientData->DistroClientDataProcessor」和 「com.alibaba.nacos.naming.iplist.->DistroDataStorageImpl」

          注解@8 當從其他節(jié)點同步了全部數(shù)據(jù)后,則完成了初始化finished initial,全量數(shù)據(jù)同步下小節(jié)分析。

          注解@9  獲取校驗的數(shù)據(jù),數(shù)據(jù)為由本節(jié)點負責的clientId列表。

          @Override
          public List<DistroData> getVerifyData() {
              List<DistroData> result = new LinkedList<>(); // 一組DistroData
              for (String each : clientManager.allClientId()) {
                  Client client = clientManager.getClient(each);
                  if (null == client || !client.isEphemeral()) { // 無效client或者非臨時節(jié)點
                      continue;
                  }
                  // 注解@9.1
                  if (clientManager.isResponsibleClient(client)) {
                      // 注解@9.2
                      DistroClientVerifyInfo verifyData = new DistroClientVerifyInfo(client.getClientId(), 0);
                      DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
                      DistroData data = new DistroData(distroKey,
                              ApplicationUtils.getBean(Serializer.class).serialize(verifyData))// 序列化校驗數(shù)據(jù)
                      data.setType(DataOperation.VERIFY);
                      result.add(data);
                  }
              }
              return result;
          }

          注解@9.1 判斷client是否為本幾點負責的邏輯為ClientManagerDelegate#isResponsibleClient。即:屬于ConnectionBasedClient并且

          isNative為true表示該client是直連到該節(jié)點的。

          @Override
          public boolean isResponsibleClient(Client client) {
              return (client instanceof ConnectionBasedClient) && ((ConnectionBasedClient) client).isNative();
          }

          注解@9.2 構造Verify Data 主要信息為clientId,還有一個版本信息作為保留字段,目前都是0。

          注解@10 向集群其他節(jié)點發(fā)送校驗數(shù)據(jù)DistroVerifyExecuteTask#run

          @Override
          public void run() {
              for (DistroData each : verifyData) {
                  try {
                      if (transportAgent.supportCallbackTransport()) { // grpc支持回調
                          doSyncVerifyDataWithCallback(each);
                      } else { // http不支持回調使用同步
                          doSyncVerifyData(each);
                      }
                  } catch (Exception e) {
                    //...
                  }
              }
          }
          @Override
          public void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback) {
              if (isNoExistTarget(targetServer)) {
                  callback.onSuccess();
              }
              DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);
              Member member = memberManager.find(targetServer);
              try {
                  DistroVerifyCallbackWrapper wrapper = new DistroVerifyCallbackWrapper(targetServer,
                          verifyData.getDistroKey().getResourceKey(), callback, member);
                  // 注解@11         
                  clusterRpcClientProxy.asyncRequest(member, request, wrapper); 
              } catch (NacosException nacosException) {
                  callback.onFailed(nacosException);
              }
          }

          注解@11 向其他節(jié)點發(fā)送本節(jié)點負責的clientId信息

          那集群其他節(jié)點接收到校驗數(shù)據(jù)做什么處理呢?

          翻到DistroDataRequestHandler#handle,此處包含了處理校驗數(shù)據(jù)的邏輯。

          @Override
          public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
              try {
                  switch (request.getDataOperation()) {
                      case VERIFY:
                          return handleVerify(request.getDistroData(), meta);
                      case SNAPSHOT:
                          return handleSnapshot();
                      case ADD:
                      case CHANGE:
                      case DELETE:
                          return handleSyncData(request.getDistroData());
                      case QUERY:
                          return handleQueryData(request.getDistroData());
                      default:
                          return new DistroDataResponse();
                  }
              } catch (Exception e) {
                  // ...
              }
          }
          private DistroDataResponse handleVerify(DistroData distroData, RequestMeta meta) {
              DistroDataResponse result = new DistroDataResponse();
             // 注解@12
              if (!distroProtocol.onVerify(distroData, meta.getClientIp())) {
                  result.setErrorInfo(ResponseCode.FAIL.getCode(), "[DISTRO-FAILED] distro data verify failed");
              }
              return result;
          }

          注解@12 數(shù)據(jù)校驗,下面可以看到,如果緩存存在client則校驗成功,刷新client保鮮時間,否則校驗失敗。

          @Override
          public boolean verifyClient(String clientId) {
              ConnectionBasedClient client = clients.get(clientId);
              if (null != client) {
                  client.setLastRenewTime();
                  return true;
              }
              return false;
          }

          小結: 節(jié)點之間發(fā)送校驗數(shù)據(jù)是在全量同步后進行的;發(fā)送校驗的頻率默認為5秒鐘一次;校驗數(shù)據(jù)包括clientId和version,其中version為保留字段當前為0;接受到校驗數(shù)據(jù)后如果緩存中存在該client表示校驗成功,同時更新保鮮時間,否則校驗失敗。

          四、全量數(shù)據(jù)同步

          上文中提到在發(fā)送校驗數(shù)據(jù)之前需要先完成全量數(shù)據(jù)同步,先翻回DistroProtocol#startDistroTask()方法的startLoadTask()部分。

          private void startLoadTask() {
              DistroCallback loadCallback = new DistroCallback() {
                  @Override
                  public void onSuccess() {
                      isInitialized = true;
                  }
                  @Override
                  public void onFailed(Throwable throwable) {
                      isInitialized = false;
                  }
              };
              GlobalExecutor.submitLoadDataTask(
                      new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));
          }

          DistroLoadDataTask#run

          @Override
          public void run() {
            try {
              load(); // 注解@13
              if (!checkCompleted()) { // 注解@14
                GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
              } else {
                loadCallback.onSuccess();
                Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");
              }
            } catch (Exception e) {
              loadCallback.onFailed(e);
              Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);
            }
          }

          注解@13 從集群中其他節(jié)點全量加載數(shù)據(jù)

          注解@14 如果沒有加載成功延遲30秒鐘重新執(zhí)行一次,可以通過參數(shù)「nacos.core.protocol.distro.data.load_retry_delay_ms」指定

          private void load() throws Exception {
              while (memberManager.allMembersWithoutSelf().isEmpty()) {
                  Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
                  TimeUnit.SECONDS.sleep(1);
              }
              while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
                  Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
                  TimeUnit.SECONDS.sleep(1);
              }
              for (String each : distroComponentHolder.getDataStorageTypes()) { // 注解@15
                  if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
                      loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each)); // 加載快照
                  }
              }
          }

          注解@15 為不同的數(shù)據(jù)類型緩存快照,此處有gRPC和http兩類數(shù)據(jù)類型。即:Nacos:Naming:v2:ClientData和com.alibaba.nacos.naming.iplist.

          private boolean loadAllDataSnapshotFromRemote(String resourceType) {
              DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
              DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
              if (null == transportAgent || null == dataProcessor) {
                  return false;
              }
              for (Member each : memberManager.allMembersWithoutSelf()) { // 注解@16
                  try {
                     DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
                      boolean result = dataProcessor.processSnapshot(distroData);
                      if (result) {
                          distroComponentHolder.findDataStorage(resourceType).finishInitial(); // 設置為完成初始化
                          return true;
                      }
                  } catch (Exception e) {
                     
                  }
              }
              return false;
          }

          注解@16 獲取集群中除了本節(jié)點的其他節(jié)點,循環(huán)重試獲取快照,直到有成功節(jié)點返回快照,成功后設置狀態(tài)狀態(tài)完成初始化「finishInitial」。

          @Override
          public DistroData getDatumSnapshot(String targetServer) {
              Member member = memberManager.find(targetServer);
              if (checkTargetServerStatusUnhealthy(member)) {
                  throw new DistroException(
                          String.format("[DISTRO] Cancel get snapshot caused by target server %s unhealthy", targetServer));
              }
              DistroDataRequest request = new DistroDataRequest();
             // 設置請求操作為SNAPSHOT
              request.setDataOperation(DataOperation.SNAPSHOT); 
              try {
                 // 發(fā)起請求快照數(shù)據(jù)
                  Response response = clusterRpcClientProxy.sendRequest(member, request);
                  if (checkResponse(response)) {
                      return ((DistroDataResponse) response).getDistroData();
                  } else {
                      throw new DistroException(
                              String.format("[DISTRO-FAILED] Get snapshot request to %s failed, code: %d, message: %s",
                                      targetServer, response.getErrorCode(), response.getMessage()));
                  }
              } catch (NacosException e) {
                  throw new DistroException("[DISTRO-FAILED] Get distro snapshot failed! ", e);
              }
          }

          接下來看看其他節(jié)點收到快照請求如何響應的

          還是翻到DistroDataRequestHandler#handle,具體由handleSnapshot()方法來處理。

          private DistroDataResponse handleSnapshot() {
              DistroDataResponse result = new DistroDataResponse();
              DistroData distroData = distroProtocol.onSnapshot(DistroClientDataProcessor.TYPE);
              result.setDistroData(distroData);
              return result;
          }
          @Override
          public DistroData getDatumSnapshot() {
              List<ClientSyncData> datum = new LinkedList<>();
              // 把本節(jié)點的所有client數(shù)據(jù)全部封裝
              for (String each : clientManager.allClientId()) {
                  Client client = clientManager.getClient(each);
                  if (null == client || !client.isEphemeral()) {
                      continue;
                  }
                  datum.add(client.generateSyncData());
              }
              ClientSyncDatumSnapshot snapshot = new ClientSyncDatumSnapshot();
              snapshot.setClientSyncDataList(datum);
              byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(snapshot)// 序列化數(shù)據(jù)
              return new DistroData(new DistroKey(DataOperation.SNAPSHOT.name(), TYPE), data);
          }

          下面看下client數(shù)據(jù)信息,命名空間、分組名稱、服務名稱、節(jié)點Instance信息(IP、端口等等)。

          public ClientSyncData generateSyncData() {
              List<String> namespaces = new LinkedList<>();
              List<String> groupNames = new LinkedList<>();
              List<String> serviceNames = new LinkedList<>();
              List<InstancePublishInfo> instances = new LinkedList<>();
              for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) {
                  namespaces.add(entry.getKey().getNamespace());
                  groupNames.add(entry.getKey().getGroup());
                  serviceNames.add(entry.getKey().getName());
                  instances.add(entry.getValue());
              }
              return new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances);
          }

          小結: 集群中每個節(jié)點都擁有所有的快照數(shù)據(jù);在節(jié)點啟動時會從集群中其他節(jié)點中的一個節(jié)點同步快照數(shù)據(jù)并緩存在Map中;緩存的數(shù)據(jù)類型分類兩類分別為HTTP和gRPC;具體數(shù)據(jù)即客戶端注冊節(jié)點信息含命名空間、分組名稱、服務名稱、節(jié)點Instance信息等。


          瀏覽 52
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  婷婷五月亚洲 | 码人妻免费视频 | 无码性生活视频 | 国产精品视频导航 | AV大全免费看 |