<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Nacos10# 健康檢查類型與場(chǎng)景

          共 21897字,需瀏覽 44分鐘

           ·

          2021-07-26 16:31

          引言

          Nacos支持眾多健康檢查類型,心跳、HTTP、TCP、MySQL等類型,這些都作用于什么場(chǎng)景?他們又是如何實(shí)現(xiàn)的呢?本文就擼一擼這個(gè)。

          一、內(nèi)容提要

          臨時(shí)節(jié)點(diǎn)續(xù)約

          • 臨時(shí)節(jié)點(diǎn)續(xù)約通過gRPC連接保鮮實(shí)現(xiàn)
          • 執(zhí)行頻率5秒一次
          • 檢查結(jié)果健康刷新保鮮時(shí)間
          • 檢查結(jié)果不可用標(biāo)記節(jié)點(diǎn)不健康
          • 當(dāng)節(jié)點(diǎn)不健康時(shí)重新連接時(shí)會(huì)從server列表選擇下一個(gè)節(jié)點(diǎn)連接

          持久節(jié)點(diǎn)心跳檢測(cè)

          • 心跳執(zhí)行器通過每隔五秒中向Nacos Server發(fā)起HTTP請(qǐng)求
          • 如果返回的server not found會(huì)向Nacos Server發(fā)起注冊(cè)請(qǐng)求重新注冊(cè)

          持久節(jié)點(diǎn)探活

          • Nacos探活只有在持久節(jié)點(diǎn)注冊(cè)時(shí)才會(huì)支持
          • 探活支持HTTP、TCP、Mysql三種探活類型
          • HTTP通過檢測(cè)返回200狀態(tài)碼標(biāo)記是否健康
          • TPC通過Channel連接方式標(biāo)記是否健康
          • Mysql則保證當(dāng)前節(jié)點(diǎn)為主節(jié)點(diǎn),可用于主從切換場(chǎng)景

          二、臨時(shí)節(jié)點(diǎn)續(xù)約

          在《Nacos2# 服務(wù)注冊(cè)與發(fā)現(xiàn)客戶端示例與源碼解析(二)》分析gRPC Client啟動(dòng)邏輯時(shí)有分析連接健康檢查邏輯。具體代碼在RpcClient#start()中,下面再次聚焦下。

          client連接心跳檢查

          clientEventExecutor.submit(new Runnable() {
              @Override
              public void run() {
                  while (true) {
                      try {
                          // 獲取重定向連接上下文,指重新連接到其他server節(jié)點(diǎn)
                          ReconnectContext reconnectContext = reconnectionSignal
                                  .poll(keepAliveTime, TimeUnit.MILLISECONDS);
                          if (reconnectContext == null) {
                              // check alive time.
                              // client活動(dòng)時(shí)間超過5秒鐘,向Nacos Server發(fā)起健康檢測(cè)
                              if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
                                  // 發(fā)送健康檢查
                                  boolean isHealthy = healthCheck();
                                  // 非健康節(jié)點(diǎn)
                                  if (!isHealthy) {
                                      if (currentConnection == null) {
                                          continue;
                                      }
                                      LoggerUtils.printIfInfoEnabled(LOGGER,
                                              "[{}]Server healthy check fail,currentConnection={}", name,
                                              currentConnection.getConnectionId());
                                      // 標(biāo)記客戶端狀態(tài)為unhealthy
                                      rpcClientStatus.set(RpcClientStatus.UNHEALTHY);
                                      // 重置ReconnectContext移除serverInfo
                                      reconnectContext = new ReconnectContext(nullfalse);

                                      // 健康連接更新時(shí)間戳
                                      lastActiveTimeStamp = System.currentTimeMillis();
                                      continue;
                                  }
                              } else {
                                  // 心跳保鮮未過期,跳過本次檢測(cè)
                                  continue;
                              }

                          }
                         // ...
                      } catch (Throwable throwable) {
                          //Do nothing
                      }
                  }
              }
          });

          服務(wù)端響應(yīng)

          代碼翻到GrpcRequestAcceptor#request部分,執(zhí)行RequestHandler邏輯。

          Response response = requestHandler.handleRequest(request, requestMeta);
          @TpsControl(pointName = "HealthCheck")
          public HealthCheckResponse handle(HealthCheckRequest request, RequestMeta meta) {
              return new HealthCheckResponse();
          }

          當(dāng)服務(wù)端收到健康檢查請(qǐng)求時(shí),通過HealthCheckRequestHandler#handle返回HealthCheckResponse。

          節(jié)點(diǎn)選擇

          while (startUpRetryTimes > 0 && connectToServer == null) {
              try {
                  startUpRetryTimes--;
                  ServerInfo serverInfo = nextRpcServer(); // 需連接的節(jié)點(diǎn)

                  LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Try to connect to server on start up, server: {}", name,
                          serverInfo);

                  connectToServer = connectToServer(serverInfo); // 發(fā)起rpc連接,連接到集群中其他節(jié)點(diǎn)
              } catch (Throwable e) {
                  LoggerUtils.printIfWarnEnabled(LOGGER,
                          "[{}]Fail to connect to server on start up, error message={}, start up retry times left: {}",
                          name, e.getMessage(), startUpRetryTimes);
              }

          }

          在選擇節(jié)點(diǎn)時(shí)從server地址列表中自增選擇下一個(gè)。

          @Override
          public String genNextServer() {
              int index = currentIndex.incrementAndGet() % getServerList().size(); // 獲取集群中下一個(gè)節(jié)點(diǎn)
              return getServerList().get(index);
          }

          清理無(wú)效連接

          詳見:RpcScheduledExecutor#start();定時(shí)任務(wù)會(huì)清理無(wú)效連接

          // 定時(shí)任務(wù)每3秒執(zhí)行一次
          RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() {
              @Override
              public void run() {
               // 超過保鮮時(shí)間的連接,重新異步發(fā)起連接
                  connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {
                  
                  // 刷新激活時(shí)間
                  connection.freshActiveTime();
                  successConnections.add(outDateConnectionId);
                    
                  //當(dāng)失效時(shí)(保鮮時(shí)間超過20秒),注銷關(guān)閉connection
                  unregister(outDateConnectionId);
              }
           }, 1000L3000L, TimeUnit.MILLISECONDS);

          小結(jié): 在臨時(shí)節(jié)點(diǎn)注冊(cè)時(shí),客戶端gRPC啟動(dòng)時(shí)會(huì)啟動(dòng)一個(gè)守護(hù)線程用戶健康檢查;健康檢查的頻率為5秒執(zhí)行一次;當(dāng)檢查結(jié)果健康則刷新保鮮時(shí)間;檢查結(jié)果不可用標(biāo)記gRPC客戶端狀態(tài)為unhealthy;不健康節(jié)點(diǎn)在發(fā)起連接時(shí)會(huì)從server地址列表中選擇下一個(gè)發(fā)起連接;服務(wù)端也會(huì)定時(shí)清理超過保鮮時(shí)間連接。

          三、持久節(jié)點(diǎn)心跳檢測(cè)

          永久節(jié)點(diǎn)心跳檢測(cè)在《Nacos2# 服務(wù)注冊(cè)與發(fā)現(xiàn)客戶端示例與源碼解析(二)》HTTP心跳檢測(cè)器有詳細(xì)分析,這里把內(nèi)容要點(diǎn)摘錄如下:

          • HTTP心跳檢測(cè)只適用于注冊(cè)的節(jié)點(diǎn)持久節(jié)點(diǎn),臨時(shí)節(jié)點(diǎn)會(huì)使用grpc代理
          • 心跳執(zhí)行器通過每隔五秒中向Nacos Server發(fā)起HTTP請(qǐng)求
          • 如果返回的server not found會(huì)向Nacos Server發(fā)起注冊(cè)請(qǐng)求重新注冊(cè)

          四、持久節(jié)點(diǎn)探活

          持久節(jié)點(diǎn)探活支持HTTP、TCP和Mysql幾種類型,下面以HTTP為例分析其運(yùn)行邏輯。

          探活示例

          當(dāng)注冊(cè)時(shí)節(jié)點(diǎn)設(shè)置為不健康即Healthy設(shè)置為false,當(dāng)服務(wù)端探活正常后將節(jié)點(diǎn)設(shè)置為健康即Healthy為true。

          @SpringBootApplication
          public class BootApplication {
          public static void main(String[] args) throws Exception {
            SpringApplication.run(BootApplication.classargs);
            System.setProperty("serverAddr""127.0.0.1:8848");
            System.setProperty("namespace""public");


            Properties properties = new Properties();
            properties.setProperty("serverAddr", System.getProperty("serverAddr"));
            properties.setProperty("namespace", System.getProperty("namespace"));

            Instance instance = new Instance();
            instance.setClusterName("clusterDemo3");
            instance.setHealthy(false);//默認(rèn)健康檢查通過后,才認(rèn)為是真正的健康
            instance.setIp(getIpAddress());
            instance.setPort(8282);
            instance.setWeight(100);
            instance.setServiceName("AppNacosDemo3");
            instance.setEphemeral(false);

            Map<String, String> map = new HashMap<String, String>();
            map.put("unit""shunit");
            instance.setMetadata(map);

            NamingService naming = NamingFactory.createNamingService(properties);
            naming.registerInstance("AppNacosDemo3", instance);

          }

          private static String getIpAddress() throws SocketException {
            Enumeration<NetworkInterface> allNetInterfaces = NetworkInterface.getNetworkInterfaces();
            while (allNetInterfaces.hasMoreElements()) {
              NetworkInterface netInterface = allNetInterfaces.nextElement();
              if (!netInterface.isLoopback() && !netInterface.isVirtual() && netInterface.isUp()) {
                Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
                while (addresses.hasMoreElements()) {
                  InetAddress ip = addresses.nextElement();
                  final String hostAddress = ip.getHostAddress();
                  if (ip instanceof Inet4Address) {
                    return hostAddress;
                  }
                }
              }
            }
            return "";
          }
          }

          @RestController
          public class HelloController {
              @RequestMapping(value = "/bike",method = RequestMethod.GET)
              public void hello(){
                  System.out.println("receive message from server.");
              }
          }

          探活地址設(shè)置

          探活輸出

          receive message from server.
          receive message from server.

          小結(jié): 啟動(dòng)時(shí)注冊(cè)節(jié)點(diǎn)為非健康節(jié)點(diǎn),Nacos通過檢查路徑請(qǐng)求返回200正確后將節(jié)點(diǎn)設(shè)置為健康狀態(tài)。

          源碼分析

          當(dāng)持久節(jié)點(diǎn)注冊(cè)時(shí),會(huì)請(qǐng)求到InstanceController#register方法。

          @Override
          public void registerInstance(String namespaceId, String serviceName, Instance instance){
            boolean ephemeral = instance.isEphemeral();
            String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);
            createIpPortClientIfAbsent(clientId, ephemeral); // 創(chuàng)建IpPortClient
            Service service = getService(namespaceId, serviceName, ephemeral);
            clientOperationService.registerInstance(service, instance, clientId);
          }
          public IpPortBasedClient(String clientId, boolean ephemeral) {
              this.ephemeral = ephemeral;
              this.clientId = clientId;
              this.responsibleId = getResponsibleTagFromId();
              if (ephemeral) {
                  beatCheckTask = new ClientBeatCheckTaskV2(this);
                  HealthCheckReactor.scheduleCheck(beatCheckTask);
              } else {
                 // 持久節(jié)點(diǎn)創(chuàng)建HealthCheckTaskV2并定時(shí)任務(wù)調(diào)度
                  healthCheckTaskV2 = new HealthCheckTaskV2(this);
                  HealthCheckReactor.scheduleCheck(healthCheckTaskV2);
              }
          }
          @Override
          public void doHealthCheck() {
            try {
              // 獲取Client的service列表
              for (Service each : client.getAllPublishedService()) {
                // 開啟健康檢查
                if (switchDomain.isHealthCheckEnabled(each.getGroupedServiceName())) {
                  // 注冊(cè)節(jié)點(diǎn)信息
                  InstancePublishInfo instancePublishInfo = client.getInstancePublishInfo(each);
                  ClusterMetadata metadata = getClusterMetadata(each, instancePublishInfo);
                  // 調(diào)用
                  ApplicationUtils.getBean(HealthCheckProcessorV2Delegate.class).process(thiseachmetadata);
                  // ...
                }
              }
            } catch (Throwable e) {
              
            } finally {
              if (!cancelled) {
                // 下一輪調(diào)度
                HealthCheckReactor.scheduleCheck(this);
                if (this.getCheckRtWorst() > 0) {
                    // ...
                    }
                  }
                }
              }
            }
          }

          備注:定時(shí)任務(wù)調(diào)度,不斷向服務(wù)的注冊(cè)節(jié)點(diǎn)發(fā)送探活請(qǐng)求。

          探活處理器選擇

          備注:由運(yùn)行時(shí)緩存情況可以看出,支持TPC、HTTP、MYSQL三種類型探活處理。

          HTTP探活

          代碼坐標(biāo)HttpHealthCheckProcessor#process

          @Override
          public void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {
              HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) task.getClient()
                      .getInstancePublishInfo(service);
              if (null == instance) {
                  return;
              }
              try {
                  // TODO handle marked(white list) logic like v1.x.
                  if (!instance.tryStartCheck()) {
                      // ...
                      return;
                  }
                  Http healthChecker = (Http) metadata.getHealthChecker();
                  // 默認(rèn)使用注冊(cè)實(shí)例端口
                  int ckPort = metadata.isUseInstancePortForCheck() ? instance.getPort() : metadata.getHealthyCheckPort();
                  // 組織url請(qǐng)求
                  URL host = new URL("http://" + instance.getIp() + ":" + ckPort);
                  URL target = new URL(host, healthChecker.getPath());
                  Map<String, String> customHeaders = healthChecker.getCustomHeaders();
                  Header header = Header.newInstance();
                  header.addAll(customHeaders);
                  // 發(fā)送HTTP請(qǐng)求
                  ASYNC_REST_TEMPLATE.get(target.toString(), header, Query.EMPTY, String.class,
                          new HttpHealthCheckCallback(instancetaskservice))
          ;
                  MetricsMonitor.getHttpHealthCheckMonitor().incrementAndGet();
              } catch (Throwable e) {
                  instance.setCheckRt(switchDomain.getHttpHealthParams().getMax());
                  healthCheckCommon.checkFail(task, service, "http:error:" + e.getMessage());
                  healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(), task,
                          switchDomain.getHttpHealthParams());
              }
          }
          @Override
          public void onReceive(RestResult<String> result) {
              instance.setCheckRt(System.currentTimeMillis() - startTime);
              int httpCode = result.getCode();
              // 返回200
              if (HttpURLConnection.HTTP_OK == httpCode) {
                  // 將節(jié)點(diǎn)變更為健康
                  healthCheckCommon.checkOk(task, service, "http:" + httpCode);
                  healthCheckCommon.reEvaluateCheckRT(System.currentTimeMillis() - startTime, task,
                          switchDomain.getHttpHealthParams());
              } else if (HttpURLConnection.HTTP_UNAVAILABLE == httpCode
                      || HttpURLConnection.HTTP_MOVED_TEMP == httpCode) {
                 
              } else {
                
              }
          }

           public void checkOk(HealthCheckTaskV2 task, Service service, String msg) {
             // 節(jié)點(diǎn)設(shè)置為健康狀態(tài)
             healthStatusSynchronizer.instanceHealthStatusChange(true, task.getClient(), service, instance);
           }

          備注:向節(jié)點(diǎn)發(fā)起HTTP請(qǐng)求,返回狀態(tài)碼為200表示,將節(jié)點(diǎn)標(biāo)志為健康狀態(tài);否則標(biāo)記非健康狀態(tài)。

          TCP探活

          TPC探活的代碼詳見TcpHealthCheckProcessor,不再詳細(xì)分析。大體邏輯為通過與注冊(cè)實(shí)例建立channel,不斷ping 注冊(cè)實(shí)例的端口是否可用,從而判斷服務(wù)是否健康。

          MYSQL探活

          備注:主要檢查當(dāng)前節(jié)點(diǎn)為主庫(kù),不能訪問到從庫(kù),可能在主從切換中使用。

          總結(jié): 本文就臨時(shí)節(jié)點(diǎn)續(xù)約、持久節(jié)點(diǎn)心跳、持久節(jié)點(diǎn)的探活代碼實(shí)現(xiàn)做了熟練。相信通過代碼走查,對(duì)其使用場(chǎng)景和實(shí)現(xiàn)不再陌生。

          瀏覽 231
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  91日韩在线视频 | 欧美日韩国产高清 | 黄视频免费 | 毛片大香蕉 | 黄色的视频网站 |