<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Nacos8# 集群中節(jié)點之間健康檢查

          共 21971字,需瀏覽 44分鐘

           ·

          2021-07-13 10:41

          引言

          當新的節(jié)點加入集群或者集群中有節(jié)點下線了,集群之間可以通過健康檢查發(fā)現(xiàn)。健康檢查的頻率是怎么樣的?節(jié)點的狀態(tài)又是如何變動的?狀態(tài)的變動又會觸發(fā)什么動作。帶著這些問題本文捋一捋。

          一、內(nèi)容提要

          內(nèi)容提要

          健康檢查

          • Nacos節(jié)點會向集群其他節(jié)點發(fā)送健康檢查心跳,每一輪頻率為2秒
          • 當健康檢查異常時設置為不信任「SUSPICIOUS」狀態(tài),超過失敗最大次數(shù)3次設置為下線「DOWN」狀態(tài)
          • 健康檢查成功設置該節(jié)點為科通信「UP」狀態(tài)
          • 無論成功還是失敗當節(jié)點狀態(tài)變更時均發(fā)布MembersChangeEvent事件

          成員變更事件

          • 當集群節(jié)點成員變更時,MemberChangeListener會收到該事件
          • 例如回調(diào)ClusterRpcClientProxy#onEvent觸發(fā)refresh
          • 刷新本節(jié)點與集群中其他節(jié)點的RPC狀態(tài),關閉無效的或者增加新的RPC連接

          二、健康檢查

          代碼翻到ServerMemberManager#onApplicationEvent,在Nacos啟動的時候會啟動一個定時任務,第一次延遲5秒執(zhí)行,該定時任務即負責節(jié)點之間的心跳。

          @Override
          public void onApplicationEvent(WebServerInitializedEvent event) {
              getSelf().setState(NodeState.UP);
              if (!EnvUtil.getStandaloneMode()) { // 注解@1
                  GlobalExecutor.scheduleByCommon(this.infoReportTask, 5_000L);
              }
              EnvUtil.setPort(event.getWebServer().getPort());
              EnvUtil.setLocalAddress(this.localAddress);
              Loggers.CLUSTER.info("This node is ready to provide external services");
          }

          注解@1 非單機模式延遲5秒執(zhí)行,執(zhí)行的infoReportTask為MemberInfoReportTask。

          public abstract class Task implements Runnable {
              
              protected volatile boolean shutdown = false;
              
              @Override
              public void run() // 注解@2
                  if (shutdown) {
                      return;
                  }
                  try {
                      executeBody();
                  } catch (Throwable t) {
                      Loggers.CORE.error("this task execute has error : {}", ExceptionUtil.getStackTrace(t));
                  } finally {
                      if (!shutdown) {
                          after();
                      }
                  }
              }
            
          }

          注解@2 看下這個Task執(zhí)行邏輯,先執(zhí)行 executeBody(),執(zhí)行結束后執(zhí)行after()。

          class MemberInfoReportTask extends Task {

              private final GenericType<RestResult<String>> reference = new GenericType<RestResult<String>>() {
              };

              private int cursor = 0;

              @Override
              protected void executeBody() {
                  // ----------注解@1 start---------------
                 // 獲取集群中除了自身以外的其他節(jié)點列表
                  List<Member> members = ServerMemberManager.this.allMembersWithoutSelf();
                  if (members.isEmpty()) {
                      return;
                  }
                  // 定義一個游標
                  this.cursor = (this.cursor + 1) % members.size();
                  // 獲取每個節(jié)信息
                  Member target = members.get(cursor);
              //-----------注解@1 end-----------------
                  Loggers.CLUSTER.debug("report the metadata to the node : {}", target.getAddress());
                  // 注解@2
                  final String url = HttpUtils
                          .buildUrl(false, target.getAddress(), EnvUtil.getContextPath(), Commons.NACOS_CORE_CONTEXT,
                                  "/cluster/report");
                  try {
                     // 注解@3
                      asyncRestTemplate
                              .post(url, Header.newInstance().addParam(Constants.NACOS_SERVER_HEADER, VersionUtils.version),
                                      Query.EMPTY, getSelf(), reference.getType(), new Callback<String>() { 
                                          @Override
                                          public void onReceive(RestResult<String> result) // 注解@4
                                              // 注解@5 返回版本不一致
                                              if (result.getCode() == HttpStatus.NOT_IMPLEMENTED.value()
                                                      || result.getCode() == HttpStatus.NOT_FOUND.value()) {
                                                  // ...
                                                  Member memberNew = target.copy();
                                                  if (memberNew.getAbilities() != null
                                                          && memberNew.getAbilities().getRemoteAbility() != null && memberNew
                                                          .getAbilities().getRemoteAbility().isSupportRemoteConnection()) {
                                                      memberNew.getAbilities().getRemoteAbility()
                                                              .setSupportRemoteConnection(false);
                                                      update(memberNew); // 更新節(jié)點屬性
                                                  }
                                                  return;
                                              }
                                              // 注解@6
                                              if (result.ok()) {
                                                  MemberUtil.onSuccess(ServerMemberManager.this, target);
                                              } else {
                                               // 注解@7 處理失敗上報
                                                  MemberUtil.onFail(ServerMemberManager.this, target);
                                              }
                                          }

                                          @Override
                                          public void onError(Throwable throwable) {
                                             // 注解@8 處理失敗上報
                                               MemberUtil.onFail(ServerMemberManager.this, target, throwable);
                                          }

                                          @Override
                                          public void onCancel() {

                                          }
                                      });
                  } catch (Throwable ex) {
                      // ...
                  }
              }

              @Override
              protected void after() {
                  GlobalExecutor.scheduleByCommon(this2_000L); // 注解@9
              }
          }

          注解@1 獲取集群中除了自身以外的其他節(jié)點列表,通過游標循環(huán)每個節(jié)點。

          注解@2 構造每個節(jié)點的上報url請求路徑為「/cluster/report」

          注解@3 發(fā)起Post健康檢查請求,請求內(nèi)容為自身信息Member

          注解@4 處理健康檢查返回結果,有以下三種類型

          注解@5 版本過低錯誤,這個可能在集群中版本不一致出現(xiàn)

          注解@6 處理成功上報,更新該節(jié)點member的狀態(tài)為UP表示科通信,設置失敗次數(shù)為0,并發(fā)布成員變更事件

          public static void onSuccess(final ServerMemberManager manager, final Member member) {
              final NodeState old = member.getState();
              manager.getMemberAddressInfos().add(member.getAddress());
              member.setState(NodeState.UP); // 狀態(tài)為UP可通信狀態(tài)
              member.setFailAccessCnt(0); // 失敗次數(shù)為0
              if (!Objects.equals(old, member.getState())) {
                  manager.notifyMemberChange(); // 發(fā)布成員變更事件
              }
          }

          注解@7&注解@8 均為處理失敗的上報,例如:集群中一個節(jié)點被kill -9 殺掉后。在nacos-cluster.log日志文件中會打印如下日志,并發(fā)布成員變更事件

          2021-07-0x 16:30:24,994 ERROR failed to report new info to target node : x.x.x.x:8848, error : caused: Connection refused;

          :2021-07-0x 16:30:30,995 ERROR failed to report new info to target node : x.x.x.x:8848, error : caused: Connection refused;
          public static void onFail(final ServerMemberManager manager, final Member member, Throwable ex) {
              manager.getMemberAddressInfos().remove(member.getAddress());
              final NodeState old = member.getState();

              // 設置該節(jié)點為「不信任」
              member.setState(NodeState.SUSPICIOUS);
              // 失敗次數(shù)遞增+1
              member.setFailAccessCnt(member.getFailAccessCnt() + 1);
              // 默認最大失敗重試次數(shù)為3
              int maxFailAccessCnt = EnvUtil.getProperty("nacos.core.member.fail-access-cnt", Integer.class, 3);

              // If the number of consecutive failures to access the target node reaches
              // a maximum, or the link request is rejected, the state is directly down
              // 超過重試次數(shù)設置節(jié)點狀態(tài)為「下線」
              if (member.getFailAccessCnt() > maxFailAccessCnt || StringUtils
                      .containsIgnoreCase(ex.getMessage(), TARGET_MEMBER_CONNECT_REFUSE_ERRMSG)) {
                  member.setState(NodeState.DOWN);
              }

              if (!Objects.equals(old, member.getState())) {
                  manager.notifyMemberChange(); // 發(fā)布成員變更事件
              }
          }

          被kill -9 殺掉的節(jié)點顯示狀態(tài)為下線DOWN

          注解@9 執(zhí)行完executeBody后延遲2秒繼續(xù)執(zhí)行executeBody,也就是檢查健康檢查的心跳頻率為2秒,一輪全部節(jié)點檢查結束后延遲2秒接著下一輪

          無論檢查成功還是失敗,當節(jié)點狀態(tài)變更時,發(fā)布成員變更事件。

          if (!Objects.equals(old, member.getState())) {
              manager.notifyMemberChange();
          }

          void notifyMemberChange() {
              NotifyCenter.publishEvent(MembersChangeEvent.builder().members(allMembers()).build());
          }

          小結: Nacos節(jié)點會向集群其他節(jié)點發(fā)送健康檢查心跳,每一輪頻率為2秒;當健康檢查異常時設置為不信任「SUSPICIOUS」狀態(tài),超過失敗最大次數(shù)3次設置為下線「DOWN」狀態(tài);健康檢查成功設置該節(jié)點為科通信「UP」狀態(tài);無論成功還是失敗當節(jié)點狀態(tài)變更時均發(fā)布MembersChangeEvent事件。


          三、成員變更事件
          當集群中有節(jié)點下線或者新節(jié)點上線都會通過心跳健康檢查探測對節(jié)點狀態(tài)進行改變。而狀態(tài)的變更均會觸發(fā)成員變更事件MembersChangeEvent。那訂閱到這個事件干啥呢?

          ClusterRpcClientProxy繼承了MemberChangeListener,當有MembersChangeEvent事件時會回調(diào)其onEvent方法。

          @Override
          public void onEvent(MembersChangeEvent event) {
              try {
                  List<Member> members = serverMemberManager.allMembersWithoutSelf();
                  refresh(members);
              } catch (NacosException e) {
                  // ...
              }
          }

          那接著看refresh方法。

          private void refresh(List<Member> members) throws NacosException {

              for (Member member : members) {
                  if (MemberUtil.isSupportedLongCon(member)) {
                      // 注解@10
                      createRpcClientAndStart(member, ConnectionType.GRPC);
                  }
              }
              Set<Map.Entry<String, RpcClient>> allClientEntrys = RpcClientFactory.getAllClientEntries();
              Iterator<Map.Entry<String, RpcClient>> iterator = allClientEntrys.iterator();
              List<String> newMemberKeys = members.stream().filter(a -> MemberUtil.isSupportedLongCon(a))
                      .map(a -> memberClientKey(a)).collect(Collectors.toList());
              // 關閉舊的grpc連接
              while (iterator.hasNext()) {
                  Map.Entry<String, RpcClient> next1 = iterator.next();
                  if (next1.getKey().startsWith("Cluster-") && !newMemberKeys.contains(next1.getKey())) {
                      Loggers.CLUSTER.info("member leave,destroy client of member - > : {}", next1.getKey());
                      RpcClientFactory.getClient(next1.getKey()).shutdown();
                      iterator.remove();
                  }
              }

          }

          注解@10 為集群中每個節(jié)點member創(chuàng)建rcp client,在client啟動時會先目標節(jié)點發(fā)送HealthCheckRequest,如果非健康節(jié)點將會被移除。見RpcClient類部分代碼。

          boolean isHealthy = healthCheck();
          // 非健康節(jié)點
          if (!isHealthy) {
              if (currentConnection == null) {
                  continue;
              }
              LoggerUtils.printIfInfoEnabled(LOGGER,
                      "[{}]Server healthy check fail,currentConnection={}", name,
                      currentConnection.getConnectionId());
              // 標記客戶端狀態(tài)為unhealthy
              rpcClientStatus.set(RpcClientStatus.UNHEALTHY);
              // 重置ReconnectContext移除serverInfo
              reconnectContext = new ReconnectContext(nullfalse);

          這個意味著如果集群中有節(jié)點下線,與下線節(jié)點的rpc將會失效;同樣如果集群中有新節(jié)點加入將會建立新的rpc通道。

          小結: 當集群節(jié)點成員變更時,MemberChangeListener會收到該事件。例如回調(diào)ClusterRpcClientProxy#onEvent觸發(fā)refresh。刷新本節(jié)點與集群中其他節(jié)點的RPC狀態(tài),關閉無效的或者增加新的RPC連接。

          瀏覽 280
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  无码人妻 一区二区三区 | 豆花视频综合 | 黄色片网站在线免费观看 | 啊啊啊啊啊在线免费观看 | 欧美性爱男人的天堂 |