<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Nacos2# 服務(wù)注冊與發(fā)現(xiàn)客戶端示例與源碼解析(二)

          共 54898字,需瀏覽 110分鐘

           ·

          2021-05-23 23:32

          引言


          引言

          上一篇客戶端初始化沒有擼完,這篇繼續(xù)。Nacos從2.0以后增加了對grpc的支持,代碼中HTTP的代理初始化還有保留,我們注冊發(fā)現(xiàn)通常為臨時節(jié)點(diǎn),這部分已由gRPC接管。可以對比下新舊邏輯的實(shí)現(xiàn)差異。

          一、內(nèi)容提要

          HTTP代理初始化

          HTTP心跳檢測器

          • HTTP心跳檢測只適用于注冊的節(jié)點(diǎn)持久節(jié)點(diǎn),臨時節(jié)點(diǎn)會使用grpc代理(HTTP的心跳檢測默認(rèn)廢棄由grpc替代)
          • 在初始化時客戶端注冊代理NamingClientProxy時,初始化了一個HTTP心跳器用于向Nacos Server發(fā)起心跳
          • 在注冊節(jié)點(diǎn)時通過向心跳執(zhí)行器添加心跳任務(wù)addBeatInfo觸發(fā)
          • 心跳執(zhí)行器通過每隔五秒中向Nacos Server發(fā)起HTTP請求
          • 如果返回的server not found會向Nacos Server發(fā)起注冊請求重新注冊

          UDP接受服務(wù)端推送

          • Client通過UDP接受到nacos server推動的消息
          • 如果服務(wù)端推送的為服務(wù)信息通過processServiceInfo處理邏輯見上篇,主要實(shí)例變更時的通知機(jī)制
          • 如果dump類型,則客戶端發(fā)送服務(wù)信息serviceInfoMap的ack信息到服務(wù)端

          gRPC代理初始化

          gRPC初始化邏輯概覽

          • gRPC 客戶端代理的初始化主要邏輯為創(chuàng)建gRPC Client并啟動
          • 并注冊ServerRequestHandler用于處理Nacos Server推送的NotifySubscriberRequest請求
          • 注冊ConnectionListener用于處理gRPC建立和斷開連接事件
          • 請求超時時間可以通過「namingRequestTimeout」設(shè)置,默認(rèn)為3秒

          gRPC Client啟動邏輯

          • gRPC Client啟動邏輯主要在于建立與nacos server的grpc連接,其中兩個守護(hù)線程一直在運(yùn)行
          • 守護(hù)線程1用于處理grpc連接的建立和關(guān)閉事件
          • 守護(hù)線程2用于與nacos server的心跳保鮮,并負(fù)責(zé)異步建立grpc連接
          • 守護(hù)線程2同時負(fù)責(zé)當(dāng)nacos server的地址信息發(fā)生變更時重新與新server建立連接
          • nacos server的地址變更通過grpc通道由server推送ConnectResetRequest到client
          • grpc client只與nacos server集群中一臺建立grpc連接

          二、源碼分析
          public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, Properties properties,
                      InstancesChangeNotifier changeNotifier)
           throws NacosException 
          {
                  this.serviceInfoUpdateService = new ServiceInfoUpdateService(properties, serviceInfoHolder, this,
                          changeNotifier);
                this.serverListManager = new ServerListManager(properties);
                this.serviceInfoHolder = serviceInfoHolder;
                  this.securityProxy = new SecurityProxy(properties, NamingHttpClientManager.getInstance().getNacosRestTemplate());
                  initSecurityProxy();
                // @注解7.4
                  this.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties,serviceInfoHolder);
               // @注解7.5
                  this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties,serviceInfoHolder);
          }


          三、HTTP代理初始化

          @注解7.4 Http代理的初始化,該代理主要在nacos 2.0以前版本使用,2.0之后通過grpc與nacos server通信。

          public NamingHttpClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListManager serverListManager,
                      Properties properties, ServiceInfoHolder serviceInfoHolder)
           
          {
                  super(securityProxy, properties);
                  this.serverListManager = serverListManager;
                  this.setServerPort(DEFAULT_SERVER_PORT);
                  this.namespaceId = namespaceId;
               // @注解7.4.1
                  this.beatReactor = new BeatReactor(this, properties);
               // @注解7.4.2
                  this.pushReceiver = new PushReceiver(serviceInfoHolder);
               // @注解7.4.3
                  this.maxRetry = ConvertUtils.toInt(properties.getProperty(PropertyKeyConst.NAMING_REQUEST_DOMAIN_RETRY_COUNT,
                          String.valueOf(UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT)));
           }

          HTTP心跳檢測器

          @注解7.4.1 初始化BeatReactor,用于向nacos server發(fā)送心跳

          public BeatReactor(NamingHttpClientProxy serverProxy, Properties properties) {
                  this.serverProxy = serverProxy;
                  // 心跳線程池大小,默認(rèn)為核數(shù)的二分之一,最小為1,可通過properties參數(shù)「namingClientBeatThreadCount」設(shè)置
                  int threadCount = initClientBeatThreadCount(properties);
               // 初始化線程執(zhí)行器
                  this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
                      @Override
                      public Thread newThread(Runnable r) {
                          Thread thread = new Thread(r);
                          thread.setDaemon(true);
                          thread.setName("com.alibaba.nacos.naming.beat.sender");
                          return thread;
                      }
                  });
          }

          接著一下這個執(zhí)行器再做什么事情。

          public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
                  NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
                  String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
                  BeatInfo existBeat = null;
                  //fix #1733
                  if ((existBeat = dom2Beat.remove(key)) != null) {
                      existBeat.setStopped(true);
                  }
                  dom2Beat.put(key, beatInfo);
                  // 默認(rèn)延遲5秒
                  executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
                  MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
          }

          當(dāng)通過addBeatInfo增加一個心跳信息BeatInfo時,執(zhí)行器會創(chuàng)建BeatTask(Runnable)延遲5秒運(yùn)行。

          class BeatTask implements Runnable {
               BeatInfo beatInfo;
               public BeatTask(BeatInfo beatInfo) {
                      this.beatInfo = beatInfo;
                  }
               @Override
                  public void run() {
                      if (beatInfo.isStopped()) {
                          return;
                      }
                      long nextTime = beatInfo.getPeriod();
                      try {
                         // 向nacos server「/nacos/v1/ns/instance/beat」發(fā)送心跳
                          JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
                          long interval = result.get("clientBeatInterval").asLong();
                          boolean lightBeatEnabled = false;
                          if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
                              lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
                          }
                          BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
                          if (interval > 0) {
                              nextTime = interval;
                          }
                          int code = NamingResponseCode.OK;
                          if (result.has(CommonParams.CODE)) {
                              code = result.get(CommonParams.CODE).asInt();
                          }
                         // 如果nacos server返回NOT FOUND則重新發(fā)起注冊請求
                          if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                              Instance instance = new Instance();
                              instance.setPort(beatInfo.getPort());
                              instance.setIp(beatInfo.getIp());
                              instance.setWeight(beatInfo.getWeight());
                              instance.setMetadata(beatInfo.getMetadata());
                              instance.setClusterName(beatInfo.getCluster());
                              instance.setServiceName(beatInfo.getServiceName());
                              instance.setInstanceId(instance.getInstanceId());
                              instance.setEphemeral(true);
                              try {
                                  serverProxy.registerService(beatInfo.getServiceName(),
                                          NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                              } catch (Exception ignore) {
                              }
                          }
                      } catch (NacosException ex) {
                          NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                                  JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());

                      }
                     // 默認(rèn)為5秒,可以通過PreservedMetadataKeys.HEART_BEAT_INTERVAL設(shè)置
                      executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
                  }
          }

          addBeatInfo調(diào)用時機(jī),當(dāng)節(jié)點(diǎn)在注冊時如果實(shí)例為臨時節(jié)點(diǎn),則會創(chuàng)建心跳任務(wù)發(fā)起

          public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
                  
                  NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,instance);
                  String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
                  if (instance.isEphemeral()) {
                      BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
                     // 添加心跳任務(wù)
                      beatReactor.addBeatInfo(groupedServiceName, beatInfo);
                  }
                  final Map<String, String> params = new HashMap<String, String>(16);
                  params.put(CommonParams.NAMESPACE_ID, namespaceId);
                  params.put(CommonParams.SERVICE_NAME, groupedServiceName);
                  params.put(CommonParams.GROUP_NAME, groupName);
                  params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
                  params.put("ip", instance.getIp());
                  params.put("port", String.valueOf(instance.getPort()));
                  params.put("weight", String.valueOf(instance.getWeight()));
                  params.put("enable", String.valueOf(instance.isEnabled()));
                  params.put("healthy", String.valueOf(instance.isHealthy()));
                  params.put("ephemeral", String.valueOf(instance.isEphemeral()));
                  params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
                  
                  reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
                  
          }

          再跟蹤下注冊入口,判讀使用哪個ClientProxy

          @Override
          public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
              getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
          }

          private NamingClientProxy getExecuteClientProxy(Instance instance) {
                  // 是否為臨時節(jié)點(diǎn),臨時節(jié)點(diǎn)使用grpc,持久節(jié)點(diǎn)使用http;默認(rèn)為true,也就是默認(rèn)使用grpcClientProxy
                  return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
          }

          小結(jié): HTTP心跳檢測只適用于注冊的節(jié)點(diǎn)持久節(jié)點(diǎn),臨時節(jié)點(diǎn)會使用grpc代理,即HTTP的心跳檢測默認(rèn)廢棄由grpc替代;在初始化時客戶端注冊代理NamingClientProxy時,初始化了一個HTTP心跳器用于向Nacos Server發(fā)起心跳;在注冊節(jié)點(diǎn)時通過向心跳執(zhí)行器添加心跳任務(wù)addBeatInfo觸發(fā);心跳執(zhí)行器通過每隔五秒中向Nacos Server發(fā)起HTTP請求,如果返回的server not found會向Nacos Server發(fā)起注冊請求重新注冊;

          UDP接受服務(wù)端推送

          @注解7.4.2 初始化PushReceiver用于接受nacos server信息推送,使用UDP協(xié)議。

           public PushReceiver(ServiceInfoHolder serviceInfoHolder) {
                  try {
                      this.serviceInfoHolder = serviceInfoHolder;
                      this.udpSocket = new DatagramSocket();
                      this.executorService = new ScheduledThreadPoolExecutor(1new ThreadFactory() {
                          @Override
                          public Thread newThread(Runnable r) {
                              Thread thread = new Thread(r);
                              thread.setDaemon(true);
                              thread.setName("com.alibaba.nacos.naming.push.receiver");
                              return thread;
                          }
                      });
                      
                      this.executorService.execute(this);
                  } catch (Exception e) {
                      NAMING_LOGGER.error("[NA] init udp socket failed", e);
                  }
          }

          備注: PushReceiver實(shí)現(xiàn)Runnable接口,在構(gòu)造方法中通過守護(hù)線程運(yùn)行。

          public void run() {
            while (!closed) {
              try {
                // byte[] is initialized with 0 full filled by default
                byte[] buffer = new byte[UDP_MSS];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                // 接受nacos server推送
                udpSocket.receive(packet);
                // 將推送內(nèi)容轉(zhuǎn)換為json字符串
                String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
                NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
                PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
                String ack;
                // 推送類型服務(wù)信息(例如訂閱實(shí)例的變更)會通知訂閱者邏輯已在上篇分析
                if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                  serviceInfoHolder.processServiceInfo(pushPacket.data);
                  // send ack to server
                  ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
                    + "\"\"}";
                } else if ("dump".equals(pushPacket.type)) {
                  // dump data to server
                  ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
                    + "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(serviceInfoHolder.getServiceInfoMap()))
                    + "\"}";
                } else {
                  // do nothing send ack only
                  ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                    + "\", \"data\":" + "\"\"}";
                }
             // 向Server發(fā)送ack消息
                udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
                                                  packet.getSocketAddress()));
              } catch (Exception e) {
                if (closed) {
                  return;
                }
                NAMING_LOGGER.error("[NA] error while receiving push data", e);
              }
            }
          }

          小結(jié): Client通過UDP接受到nacos server推動的消息:@1如果推送的為服務(wù)信息通過processServiceInfo處理,邏輯見上篇;@2 如果dump類型,則客戶端發(fā)送服務(wù)信息serviceInfoMap的ack信息到服務(wù)端。

          HTTP重試次數(shù)

          @注解7.4.3  client通過HTTP向Nacos Server請求的重試次數(shù),默認(rèn)為3次??梢酝ㄟ^「namingRequestDomainMaxRetryCount」指定

           public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers,
                      String method)
           throws NacosException 
          {
             // ...
             if (serverListManager.isDomain()) {
                      String nacosDomain = serverListManager.getNacosDomain();
                      for (int i = 0; i < maxRetry; i++) { // 請求發(fā)送異常最大重試次數(shù)
                          try {
                              return callServer(api, params, body, nacosDomain, method);
                          } catch (NacosException e) {
                              exception = e;
                              if (NAMING_LOGGER.isDebugEnabled()) {
                                  NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
                              }
                          }
                      }
             }
             //....
           }


          四、gRPC代理初始化

          gRPC初始化邏輯概覽

          @注解7.5 下面接著gRPC 客戶端代理的初始化邏輯

          public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory,
                      Properties properties, ServiceInfoHolder serviceInfoHolder)
           throws NacosException 
          {
                  super(securityProxy, properties);
                  this.namespaceId = namespaceId;
                  this.uuid = UUID.randomUUID().toString();
                  // 設(shè)置請求超時時間,默認(rèn)為3秒??梢酝ㄟ^參數(shù)「namingRequestTimeout」設(shè)置
                  this.requestTimeout = Long.parseLong(properties.getProperty(CommonParams.NAMING_REQUEST_TIMEOUT, "-1"));
                  Map<String, String> labels = new HashMap<String, String>();
                  // 設(shè)置source=sdk,module=naming
                  labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
                  labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING);
                  // 創(chuàng)建gRPC Client:clientName=uuid,ConnectionType=GRPC
                  this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);
                  // 創(chuàng)建ConnectionEventListener用于建立和斷開gRPC連接時的事件響應(yīng)
                  this.namingGrpcConnectionEventListener = new NamingGrpcConnectionEventListener(this);
                // 啟動grpc client
                  start(serverListFactory, serviceInfoHolder);
          }
          private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {
                  rpcClient.serverListFactory(serverListFactory);
               // @注解7.5.1 gRPC Client啟動
                  rpcClient.start();
               // 注冊registerServerRequestHandler用于處理從Nacos Push到Client的請求
                  rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
               // 注冊連接事件Listener,當(dāng)連接建立和斷開時處理事件
                  rpcClient.registerConnectionListener(namingGrpcConnectionEventListener);
          }

          小結(jié): gRPC 客戶端代理的初始化主要邏輯為創(chuàng)建gRPC Client并啟動;并注冊ServerRequestHandler用于處理Nacos Server推送的NotifySubscriberRequest請求;注冊ConnectionListener用于處理gRPC建立和斷開連接事件;另外,請求超時時間可以通過「namingRequestTimeout」設(shè)置,默認(rèn)為3秒。

          gRPC Client啟動邏輯

          @注解7.5.1  gRPC Client啟動邏輯

          public final void start() throws NacosException {
            // 將Client狀態(tài)由INITIALIZED變更為STARTING
            boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);
            if (!success) {
              return;
            }
            // -------------------------@1 satart---------------------------------------------
            // 守護(hù)線程執(zhí)行器
            clientEventExecutor = new ScheduledThreadPoolExecutor(2new ThreadFactory() {
              @Override
              public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.remote.worker");
                t.setDaemon(true);
                return t;
              }
            });
           
            // 從BlockingQueue中不斷獲取連接Event,根據(jù)事件類型回調(diào)onConnected()/onDisConnect()事件
            clientEventExecutor.submit(new Runnable() {
              @Override
              public void run() {
                while (true) {
                  ConnectionEvent take = null;
                  try {
                    take = eventLinkedBlockingQueue.take();
                    if (take.isConnected()) {
                      notifyConnected();
                    } else if (take.isDisConnected()) {
                      notifyDisConnected();
                    }
                  } catch (Throwable e) {
                    //Do nothing
                  }
                }
              }
            });
           // -------------------------@1 end---------------------------------------------
            
            
            // -------------------------@2 start---------------------------------------------
            clientEventExecutor.submit(new Runnable() {
              @Override
              public void run() {
                while (true) {
                  try {
                    // 獲取重定向連接上下文,指重新連接到其他server節(jié)點(diǎn)
                    ReconnectContext reconnectContext = reconnectionSignal
                      .poll(keepAliveTime, TimeUnit.MILLISECONDS);
                    if (reconnectContext == null) {
                      // check alive time.
                      // client活動時間超過5秒鐘,向Nacos Server發(fā)起健康檢測
                      if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
                        // 發(fā)送健康檢查
                        boolean isHealthy = healthCheck();
                        // 非健康節(jié)點(diǎn)
                        if (!isHealthy) {
                          if (currentConnection == null) {
                            continue;
                          }
                          LoggerUtils.printIfInfoEnabled(LOGGER,
                                                         "[{}]Server healthy check fail,currentConnection={}", name,
                                                         currentConnection.getConnectionId());
                          // 標(biāo)記客戶端狀態(tài)為unhealthy
                          rpcClientStatus.set(RpcClientStatus.UNHEALTHY);
                          // 重置ReconnectContext移除serverInfo
                          reconnectContext = new ReconnectContext(nullfalse);

                        } else {
                          // 健康連接更新時間戳
                          lastActiveTimeStamp = System.currentTimeMillis();
                          continue;
                        }
                      } else {
                        // 心跳保鮮未過期,跳過本次檢測
                        continue;
                      }

                    }

                    if (reconnectContext.serverInfo != null) {
                      // clear recommend server if server is not in server list.
                      boolean serverExist = false;
                      // 判斷連接上下文的reconnectContext.serverInfo是否在我們推薦設(shè)置的列表中
                      for (String server : getServerListFactory().getServerList()) {
                        ServerInfo serverInfo = resolveServerInfo(server);
                        if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) {
                          serverExist = true;
                          reconnectContext.serverInfo.serverPort = serverInfo.serverPort;
                          break;
                        }
                      }
                      // 不再推薦的列表中則移除,改為隨機(jī)選擇
                      if (!serverExist) {
                        LoggerUtils.printIfInfoEnabled(LOGGER,
                                                       "[{}] Recommend server is not in server list ,ignore recommend server {}", name,
                                                       reconnectContext.serverInfo.getAddress());

                        reconnectContext.serverInfo = null;

                      }
                    }
                    // 發(fā)起重新連接
                    reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
                  } catch (Throwable throwable) {
                    //Do nothing
                  }
                }
              }
            });
            // -------------------------@2 end---------------------------------------------
            
            
            // -------------------------@3 start---------------------------------------------
            // 異步連接nacos server失敗,改為同步連接
            //connect to server ,try to connect to server sync once, async starting if fail.
            Connection connectToServer = null;
            rpcClientStatus.set(RpcClientStatus.STARTING);

            int startUpRetryTimes = RETRY_TIMES;
            while (startUpRetryTimes > 0 && connectToServer == null) {
              try {
                startUpRetryTimes--;
                ServerInfo serverInfo = nextRpcServer();

                LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Try to connect to server on start up, server: {}", name,
                                               serverInfo);

                connectToServer = connectToServer(serverInfo);
              } catch (Throwable e) {
                LoggerUtils.printIfWarnEnabled(LOGGER,
                                               "[{}]Fail to connect to server on start up, error message={}, start up retry times left: {}",
                                               name, e.getMessage(), startUpRetryTimes);
              }

            }

            // -------------------------@3 end---------------------------------------------
            
            
           // -------------------------@4 start---------------------------------------------
            
            if (connectToServer != null) {
              LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Success to connect to server [{}] on start up,connectionId={}",name, connectToServer.serverInfo.getAddress(),connectToServer.getConnectionId());
                this.currentConnection = connectToServer;
                rpcClientStatus.set(RpcClientStatus.RUNNING);
                // 連接成功添加ConnectionEvent
                eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
              } else {
               // 未成功建立連接重新發(fā)起異步建立連接需求
                switchServerAsync();
              }
            
           // 注冊ConnectResetRequestHandler用于處理nacos server推送的重置連接請求
            registerServerRequestHandler(new ConnectResetRequestHandler());

            //register client detection request.
            registerServerRequestHandler(new ServerRequestHandler() {
              @Override
              public Response requestReply(Request request) {
                if (request instanceof ClientDetectionRequest) {
                  return new ClientDetectionResponse();
                }

                return null;
              }
            });
            Runtime.getRuntime().addShutdownHook(new Thread() {
              @Override
              public void run() {
                try {
                  RpcClient.this.shutdown();
                } catch (NacosException e) {
                  LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]RpcClient shutdown exception, errorMessage ={}", name,
                                                  e.getMessage());
                }
              }
            });
            // -------------------------@4 end---------------------------------------------
          }

          備注: grpc client啟動時的邏輯:邏輯塊@1  守護(hù)線程不斷從阻塞隊(duì)列eventLinkedBlockingQueue獲取grpc連接/斷開事件,并調(diào)用上文中注冊的namingGrpcConnectionEventListener回調(diào)其onConnected/onDisConnect方法。其中事件添加時機(jī)為:

          grpc連接建立時,添加連接事件:

          // 連接成功添加ConnectionEvent
          eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));

          grpc連接關(guān)閉時,添加關(guān)閉事件:

          private void closeConnection(Connection connection) {
                  if (connection != null) {
                      connection.close();
                      // 斷開連接添加DISCONNECTED事件
                      eventLinkedBlockingQueue.add(new ConnectionEvent(ConnectionEvent.DISCONNECTED));
                  }
          }

          邏輯塊@2  守護(hù)線程不斷從阻塞隊(duì)列reconnectionSignal獲取重新連接事件(ReconnectContext)也就是更換nacos server的連接grpc通道:

          阻塞隊(duì)列沒有重新連接事件:則做心跳保鮮檢測,心跳頻率為5秒。當(dāng)超過5秒時會向Nacos Server發(fā)起健康檢查,當(dāng)返回不健康時,將grpc client標(biāo)記為unhealthy;返回健康則刷新心跳時間lastActiveTimeStamp。

          阻塞隊(duì)列有重新連接事件:重連事件上下文reconnectContext的的server ip在我們設(shè)置的nacos server 列表則使用,否則改為隨機(jī)選擇nacos server ip地址,并與新server建立連接

          protected void reconnect(final ServerInfo recommendServerInfo, boolean onRequestFail) {

            try {

              AtomicReference<ServerInfo> recommendServer = new AtomicReference<ServerInfo>(recommendServerInfo);
              // onRequestFail=true表示當(dāng)健康檢查失敗grpcClient被設(shè)置為unhealthy,重連時重新發(fā)起健康檢查,如果檢查通過則不再執(zhí)行重連
              if (onRequestFail && healthCheck()) {
                LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Server check success,currentServer is{} ", name,
                                               currentConnection.serverInfo.getAddress());
                rpcClientStatus.set(RpcClientStatus.RUNNING);
                return;
              }

              LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] try to re connect to a new server ,server is {}", name,
                                             recommendServerInfo == null ? " not appointed,will choose a random server."
                                             : (recommendServerInfo.getAddress() + ", will try it once."));

              // loop until start client success.
              boolean switchSuccess = false;

              int reConnectTimes = 0;
              int retryTurns = 0;
              Exception lastException = null;
              // 切換nacos server沒有成功則會一直重試
              while (!switchSuccess && !isShutdown()) {

                //1.get a new server
                ServerInfo serverInfo = null;
                try {
                  // 獲取需要重新連接的server地址
                  serverInfo = recommendServer.get() == null ? nextRpcServer() : recommendServer.get();
                  //2.create a new channel to new server
                  // 與新的server建立grpc連接,如果連接失敗返回null
                  Connection connectionNew = connectToServer(serverInfo);
                  // 關(guān)閉緩存的當(dāng)前連接并重定向到新的連接
                  if (connectionNew != null) {
                    LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] success to connect a server  [{}],connectionId={}",
                                                   name, serverInfo.getAddress(), connectionNew.getConnectionId());
                    //successfully create a new connect.
                    if (currentConnection != null) {
                      LoggerUtils.printIfInfoEnabled(LOGGER,"[{}] Abandon prev connection ,server is  {}, connectionId is {}", name,currentConnection.serverInfo.getAddress(), currentConnection.getConnectionId());
                      //set current connection to enable connection event.
                      currentConnection.setAbandon(true);
                      closeConnection(currentConnection);
                    }
                    currentConnection = connectionNew;
                    rpcClientStatus.set(RpcClientStatus.RUNNING);
                    switchSuccess = true;
                    // 添加連接成功時間到阻塞隊(duì)列
                    boolean s = eventLinkedBlockingQueue.add(new ConnectionEvent(ConnectionEvent.CONNECTED));
                    return;
                  }

                  //close connection if client is already shutdown.
                  if (isShutdown()) {
                    closeConnection(currentConnection);
                  }

                  lastException = null;

                } catch (Exception e) {
                  lastException = e;
                } finally {
                  // 清理本次重連請求
                  recommendServer.set(null);
                }

                // 執(zhí)行到這里表示上面沒有成功建立連接,打印重試次數(shù)日志
                if (reConnectTimes > 0
                    && reConnectTimes % RpcClient.this.serverListFactory.getServerList().size() == 0) {
                  LoggerUtils.printIfInfoEnabled(LOGGER,"[{}] fail to connect server,after trying {} times, last try server is {},error={}", name,reConnectTimes, serverInfo, lastException == null ? "unknown" : lastException);
                  if (Integer.MAX_VALUE == retryTurns) {
                    retryTurns = 50;
                  } else {
                    retryTurns++;
                  }
                }

                reConnectTimes++;
             // 重試時等待特定的時間
                try {
                  //sleep x milliseconds to switch next server.
                  if (!isRunning()) {
                    // first round ,try servers at a delay 100ms;second round ,200ms; max delays 5s. to be reconsidered.
                    Thread.sleep(Math.min(retryTurns + 150) * 100L);
                  }
                } catch (InterruptedException e) {
                  // Do  nothing.
                }
              }

              if (isShutdown()) {
                LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Client is shutdown ,stop reconnect to server", name);
              }

            } catch (Exception e) {
              LoggerUtils.printIfWarnEnabled(LOGGER, "[{}] Fail to  re connect to server ,error is {}", name, e);
            }
          }

          備注: 重新切換連接server邏輯:@1當(dāng)檢查失敗grpc client會被標(biāo)記為unhealthy這類型onRequestFail為true,重連時重新發(fā)起健康檢查,如果檢查成功,則退出本次重連。@2 獲取重連的server地址和端口,并建立grpc連接,關(guān)閉當(dāng)前緩存的舊連接并重定向到新連接,同時添加連接成功時間到阻塞隊(duì)列。@3 一直重試直到連接建立成功,每次重試等待一些時間(100ms,200ms...最大為5秒)。

          邏輯塊@3 當(dāng)異步與nacos server建立失敗時,改為嘗試同步建立連接。

          邏輯塊@4 如果連接建立成功添加連接事件到阻塞隊(duì)列;連接建立失敗發(fā)起異步建立連接請求;注冊ConnectResetRequestHandler用于處理nacos server推送的重置連接請求;jvm退出時通過hook關(guān)閉grpc client。

          小結(jié):  gRPC Client啟動邏輯主要在于建立與nacos server的grpc連接,其中兩個守護(hù)線程一直在運(yùn)行。一個用于處理grpc連接的建立和關(guān)閉事件;一個用于與nacos server的心跳保鮮,并負(fù)責(zé)異步建立grpc連接,當(dāng)nacos server的地址信息發(fā)生變更時負(fù)責(zé)重新與新server建立連接;grpc client只與nacos server集群中一臺建立grpc連接



          瀏覽 212
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  中文字幕av一区二区三区 | 国产视频1区 | 国产又粗又细又黄视频 | 高 h 小说 视频 成人 | 毛片电影在线香 |