<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          4 個維度搞懂 Nacos 注冊中心

          共 23244字,需瀏覽 47分鐘

           ·

          2023-03-11 04:26

          二哥的編程星球已經有 1800 多名 球友加入了,如果你也需要一個良好的學習氛圍,戳鏈接加入我們吧!這是一個編程學習指南+ Java 項目實戰(zhàn)+ LeetCode 刷題的私密圈子,你可以閱讀星球專欄、向二哥提問、幫你制定學習計劃、和球友一起打卡成長,沖沖沖。

          Nacos 可以作為配置中心和注冊中心,這篇文章從原理到源碼,給大家講解 Nacos 注冊中心的 4 個維度。from 星球嘉賓樓仔。

          不 BB,上文章目錄:

          01 什么是動態(tài)服務發(fā)現(xiàn)?

          服務發(fā)現(xiàn)是指使用一個注冊中心來記錄分布式系統(tǒng)中的全部服務的信息,以便其他服務能夠快速的找到這些已注冊的服務。

          在單體應用中,DNS+Nginx 可以滿足服務發(fā)現(xiàn)的要求,此時服務的IP列表配置在 nginx 上。在微服務架構中,由于服務粒度變的更細,服務的上下線更加頻繁,我們需要一款注冊中心來動態(tài)感知服務的上下線,并且推送IP列表變化給服務消費者,架構如下圖。

          02 Nacos 實現(xiàn)動態(tài)服務發(fā)現(xiàn)的原理

          Nacos實現(xiàn)動態(tài)服務發(fā)現(xiàn)的核心原理如下圖,我們接下來的內容將圍繞這個圖來進行。

          2.1 通訊協(xié)議

          整個服務注冊與發(fā)現(xiàn)過程,都離不開通訊協(xié)議,在1.x的 Nacos 版本中服務端只支持 http 協(xié)議,后來為了提升性能在2.x版本引入了谷歌的 grpc,grpc 是一款長連接協(xié)議,極大的減少了 http 請求頻繁的連接創(chuàng)建和銷毀過程,能大幅度提升性能,節(jié)約資源。

          據(jù)官方測試,Nacos服務端 grpc 版本,相比 http 版本的性能提升了9倍以上。

          2.2 Nacos 服務注冊

          簡單來講,服務注冊的目的就是客戶端將自己的ip端口等信息上報給 Nacos 服務端,過程如下:

          • 創(chuàng)建長連接:Nacos SDK 通過Nacos服務端域名解析出服務端ip列表,選擇其中一個ip創(chuàng)建 grpc 連接,并定時檢查連接狀態(tài),當連接斷開,則自動選擇服務端ip列表中的下一個ip進行重連。
          • 健康檢查請求:在正式發(fā)起注冊之前,Nacos SDK 向服務端發(fā)送一個空請求,服務端回應一個空請求,若Nacos SDK 未收到服務端回應,則認為服務端不健康,并進行一定次數(shù)重試,如果都未收到回應,則注冊失敗。
          • 發(fā)起注冊:當你查看Nacos java SDK的注冊方法時,你會發(fā)現(xiàn)沒有返回值,這是因為Nacos SDK做了補償機制,在真實給服務端上報數(shù)據(jù)之前,會先往緩存中插入一條記錄表示開始注冊,注冊成功之后再從緩存中標記這條記錄為注冊成功,當注冊失敗時,緩存中這條記錄是未注冊成功的狀態(tài),Nacos SDK開啟了一個定時任務,定時查詢異常的緩存數(shù)據(jù),重新發(fā)起注冊。

          Nacos SDK注冊失敗時的自動補償機制時序圖。

          相關源碼如下:

          @Override
          public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
              NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,
                      instance);
                  //添加redo日志
              redoService.cacheInstanceForRedo(serviceName, groupName, instance);

              doRegisterService(serviceName, groupName, instance);
          }
          public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
             //向服務端發(fā)起注冊
              InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
                      NamingRemoteConstants.REGISTER_INSTANCE, instance);
              requestToServer(request, Response.class);
              //標記注冊成功
              redoService.instanceRegistered(serviceName, groupName);
          }

          執(zhí)行補償定時任務RedoScheduledTask。

          @Override
          public void run() {
              if (!redoService.isConnected()) {
                  LogUtils.NAMING_LOGGER.warn("Grpc Connection is disconnect, skip current redo task");
                  return;
              }
              try {
                  redoForInstances();
                  redoForSubscribes();
              } catch (Exception e) {
                  LogUtils.NAMING_LOGGER.warn("Redo task run with unexpected exception: ", e);
              }
          }
            private void redoForInstances() {
              for (InstanceRedoData each : redoService.findInstanceRedoData()) {
                  try {
                      redoForInstance(each);
                  } catch (NacosException e) {
                      LogUtils.NAMING_LOGGER.error("Redo instance operation {} for {}@@{} failed. ", each.getRedoType(),
                              each.getGroupName(), each.getServiceName(), e);
                  }
              }
          }
          • 服務端數(shù)據(jù)同步(Distro協(xié)議):Nacos SDK只會與服務端某個節(jié)點建立長連接,當服務端接受到客戶端注冊的實例數(shù)據(jù)后,還需要將實例數(shù)據(jù)同步給其他節(jié)點。Nacos自己實現(xiàn)了一個一致性協(xié)議名為Distro,服務注冊的時候會觸發(fā)Distro一次同步,每個Nacos節(jié)點之間會定時互相發(fā)送Distro數(shù)據(jù),以此保證數(shù)據(jù)最終一致。
          • 服務實例上線推送:Nacos服務端收到服務實例數(shù)據(jù)后會將服務的最新實例列表通過grpc推送給該服務的所有訂閱者。
          • 服務注冊過程源碼時序圖:整理了一下服務注冊過程整體時序圖,對源碼實現(xiàn)感興趣的可以按照根據(jù)這個時序圖view一下源碼。

          2.3 Nacos 心跳機制

          目前主流的注冊中心,比如Consul、Eureka、Zk包括我們公司自研的Gsched,都是通過心跳機制來感知服務的下線。Nacos也是通過心跳機制來實現(xiàn)的。

          Nacos目前SDK維護了兩個分支的版本(1.x、2.x),這兩個版本心跳機制的實現(xiàn)不一樣。其中1.x版本的SDK通過http協(xié)議來定時向服務端發(fā)送心跳維持自己的健康狀態(tài),2.x版本的SDK則通過grpc自身的心跳機制來?;睿擭acos服務端接受不到服務實例的心跳,會認為實例下線。如下圖:

          grpc監(jiān)測到連接斷開事件,發(fā)送ClientDisconnectEvent。

          public class ConnectionBasedClientManager extends ClientConnectionEventListener implements ClientManager {
            //連接斷開,發(fā)送連接斷開事件
          public boolean clientDisconnected(String clientId) {
              Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
              ConnectionBasedClient client = clients.remove(clientId);
              if (null == client) {
                  return true;
              }
              client.release();
              NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
              return true;
          }
          }

          移除客戶端注冊的服務實例

          public class ClientServiceIndexesManager extends SmartSubscriber {

            @Override
              public void onEvent(Event event) {
              //接收失去連接事件
                  if (event instanceof ClientEvent.ClientDisconnectEvent) {
                      handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
                  } else if (event instanceof ClientOperationEvent) {
                      handleClientOperation((ClientOperationEvent) event);
                  }
              }
              private void handleClientDisconnect(ClientEvent.ClientDisconnectEvent event) {
                  Client client = event.getClient();
                  for (Service each : client.getAllSubscribeService()) {
                      removeSubscriberIndexes(each, client.getClientId());
                  }
                  //移除客戶端注冊的服務實例
                  for (Service each : client.getAllPublishedService()) {
                      removePublisherIndexes(each, client.getClientId());
                  }
              }
              
              //移除客戶端注冊的服務實例
              private void removePublisherIndexes(Service service, String clientId) {
                  if (!publisherIndexes.containsKey(service)) {
                      return;
                  }
                  publisherIndexes.get(service).remove(clientId);
                  NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
              }
          }

          2.4 Nacos 服務訂閱

          當一個服務發(fā)生上下線,Nacos如何知道要推送給哪些客戶端?

          Nacos SDK 提供了訂閱和取消訂閱方法,當客戶端向服務端發(fā)起訂閱請求,服務端會記錄發(fā)起調用的客戶端為該服務的訂閱者,同時將服務的最新實例列表返回。當客戶端發(fā)起了取消訂閱,服務端就會從該服務的訂閱者列表中把當前客戶端移除。

          當客戶端發(fā)起訂閱時,服務端除了會同步返回最新的服務實例列表,還會異步的通過grpc推送給該訂閱者最新的服務實例列表,這樣做的目的是為了異步更新客戶端本地緩存的服務數(shù)據(jù)。

          當客戶端訂閱的服務上下線,該服務所有的訂閱者會立刻收到最新的服務列表并且將服務最新的實例數(shù)據(jù)更新到內存。

          我們也看一下相關源碼,服務端接收到訂閱數(shù)據(jù),首先保存到內存中。

          @Override
          public void subscribeService(Service service, Subscriber subscriber, String clientId) {
              Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
              Client client = clientManager.getClient(clientId);
              //校驗長連接是否正常
              if (!clientIsLegal(client, clientId)) {
                  return;
              }
              //保存訂閱數(shù)據(jù)
              client.addServiceSubscriber(singleton, subscriber);
              client.setLastUpdatedTime();
              //發(fā)送訂閱事件
              NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
          }

              private void handleClientOperation(ClientOperationEvent event) {
              Service service = event.getService();
              String clientId = event.getClientId();
              if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
                  addPublisherIndexes(service, clientId);
              } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
                  removePublisherIndexes(service, clientId);
              } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
              //處理訂閱操作
                  addSubscriberIndexes(service, clientId);
              } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
                  removeSubscriberIndexes(service, clientId);
              }
          }

          然后發(fā)布訂閱事件。

          private void addSubscriberIndexes(Service service, String clientId) {
              //保存訂閱數(shù)據(jù)
              subscriberIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
              // Fix #5404, Only first time add need notify event.
              if (subscriberIndexes.get(service).add(clientId)) {
              //發(fā)布訂閱事件
                  NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId));
              }
          }

          服務端自己消費訂閱事件,并且推送給訂閱的客戶端最新的服務實例數(shù)據(jù)。

          @Override
          public void onEvent(Event event) {
              if (!upgradeJudgement.isUseGrpcFeatures()) {
                  return;
              }
              if (event instanceof ServiceEvent.ServiceChangedEvent) {
                  // If service changed, push to all subscribers.
                  ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
                  Service service = serviceChangedEvent.getService();
                  delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
              } else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
                  // If service is subscribed by one client, only push this client.
                  ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
                  Service service = subscribedEvent.getService();
                  delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(),
                          subscribedEvent.getClientId()));
              }
          }

          2.5 Nacos 推送

          推送方式

          前面說了服務的注冊和訂閱都會發(fā)生推送(服務端->客戶端),那推送到底是如何實現(xiàn)的呢?

          在早期的Nacos版本,當服務實例變化,服務端會通過udp協(xié)議將最新的數(shù)據(jù)發(fā)送給客戶端,后來發(fā)現(xiàn)udp推送有一定的丟包率,于是新版本的Nacos支持了grpc推送。Nacos服務端會自動判斷客戶端的版本來選擇哪種方式來進行推送,如果你使用1.4.2以前的SDK進行注冊,那Nacos服務端會使用udp協(xié)議來進行推送,反之則使用grpc。

          推送失敗重試

          當發(fā)送推送時,客戶端可能正在重啟,或者連接不穩(wěn)定導致推送失敗,這個時候Nacos會進行重試。Nacos將每個推送都封裝成一個任務對象,放入到隊列中,再開啟一個線程不停的從隊列取出任務執(zhí)行,執(zhí)行之前會先刪除該任務,如果執(zhí)行失敗則將任務重新添加到隊列,該線程會記錄任務執(zhí)行的時間,如果超過1秒,則會記錄到日志。

          推送部分源碼

          添加推送任務到執(zhí)行隊列中。

          private static class PushDelayTaskProcessor implements NacosTaskProcessor {

              private final PushDelayTaskExecuteEngine executeEngine;

              public PushDelayTaskProcessor(PushDelayTaskExecuteEngine executeEngine) {
                  this.executeEngine = executeEngine;
              }

              @Override
              public boolean process(NacosTask task) {
                  PushDelayTask pushDelayTask = (PushDelayTask) task;
                  Service service = pushDelayTask.getService();
                  NamingExecuteTaskDispatcher.getInstance()
                          .dispatchAndExecuteTask(service, new PushExecuteTask(service, executeEngine, pushDelayTask));
                  return true;
              }
          }

          推送任務PushExecuteTask 的執(zhí)行。

          public class PushExecuteTask extends AbstractExecuteTask {

          //..省略

          @Override
          public void run() {
              try {
                  //封裝要推送的服務實例數(shù)據(jù)
                  PushDataWrapper wrapper = generatePushData();
                  ClientManager clientManager = delayTaskEngine.getClientManager();
                  //如果是服務上下線導致的推送,獲取所有訂閱者
                  //如果是訂閱導致的推送,獲取訂閱者
                  for (String each : getTargetClientIds()) {
                      Client client = clientManager.getClient(each);
                      if (null == client) {
                          // means this client has disconnect
                          continue;
                      }
                      Subscriber subscriber = clientManager.getClient(each).getSubscriber(service);
                      //推送給訂閱者
                      delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,
                              new NamingPushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));
                  }
              } catch (Exception e) {
                  Loggers.PUSH.error("Push task for service" + service.getGroupedServiceName() + " execute failed ", e);
                  //當推送發(fā)生異常,重新將推送任務放入執(zhí)行隊列
                  delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L));
              }
          }

            //如果是服務上下線導致的推送,獲取所有訂閱者
                  //如果是訂閱導致的推送,獲取訂閱者
              private Collection<String> getTargetClientIds() {
              return delayTask.isPushToAll() ? delayTaskEngine.getIndexesManager().getAllClientsSubscribeService(service)
                      : delayTask.getTargetClients();
          }

          執(zhí)行推送任務線程InnerWorker 的執(zhí)行。

          /**

           * Inner execute worker.

           */
          private class InnerWorker extends Thread {

              InnerWorker(String name) {
                  setDaemon(false);
                  setName(name);
              }

              @Override
              public void run() {
                  while (!closed.get()) {
                      try {
                      //從隊列中取出任務PushExecuteTask 
                          Runnable task = queue.take();
                          long begin = System.currentTimeMillis();
                          //執(zhí)行PushExecuteTask 
                          task.run();
                          long duration = System.currentTimeMillis() - begin;
                          if (duration > 1000L) {
                              log.warn("task {} takes {}ms", task, duration);
                          }
                      } catch (Throwable e) {
                          log.error("[TASK-FAILED] " + e.toString(), e);
                      }
                  }
              }
          }

          2.6 Nacos SDK 查詢服務實例

          服務消費者首先需要調用Nacos SDK的接口來獲取最新的服務實例,然后才能從獲取到的實例列表中以加權輪詢的方式選擇出一個實例(包含ip,port等信息),最后再發(fā)起調用。

          前面已經提到Nacos服務發(fā)生上下線、訂閱的時候都會推送最新的服務實例列表到當客戶端,客戶端再更新本地內存中的緩沖數(shù)據(jù),所以調用Nacos SDK提供的查詢實例列表的接口時,不會直接請求服務端獲取數(shù)據(jù),而是會優(yōu)先使用內存中的服務數(shù)據(jù),只有內存中查不到的情況下才會發(fā)起訂閱請求服務端數(shù)據(jù)。

          Nacos SDK內存中的數(shù)據(jù)除了接受來自服務端的推送更新之外,自己本地也會有一個定時任務定時去獲取服務端數(shù)據(jù)來進行兜底。Nacos SDK在查詢的時候也了容災機制,即從磁盤獲取服務數(shù)據(jù),而這個磁盤的數(shù)據(jù)其實也是來自于內存,有一個定時任務定時從內存緩存中獲取然后加載到磁盤。Nacos SDK的容災機制默認關閉,可通過設置環(huán)境變量failover-mode=true來開啟。

          架構圖

          用戶查詢流程

          查詢服務實例部分源碼

          private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;
           @Override
          public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,

                  boolean subscribe) throws NacosException {
              ServiceInfo serviceInfo;
              String clusterString = StringUtils.join(clusters, ",");
              //這里默認傳過來是true
              if (subscribe) {
              //從本地內存獲取服務數(shù)據(jù),如果獲取不到則從磁盤獲取
                  serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
                  if (null == serviceInfo || !clientProxy.isSubscribed(serviceName, groupName, clusterString)) {
                //如果從本地獲取不到數(shù)據(jù),則調用訂閱方法
                      serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
                  }
              } else {
               //適用于不走訂閱,直接從服務端獲取數(shù)據(jù)的情況
                  serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
              }
              List<Instance> list;
              if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
                  return new ArrayList<Instance>();
              }
              return list;
          }
          }
            //從本地內存獲取服務數(shù)據(jù),如果開啟了故障轉移則直接從磁盤獲取,因為當服務端掛了,本地啟動時內存中也沒有數(shù)據(jù)
          public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {
              NAMING_LOGGER.debug("failover-mode: {}", failoverReactor.isFailoverSwitch());
              String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
              String key = ServiceInfo.getKey(groupedServiceName, clusters);
              //故障轉移則直接從磁盤獲取
              if (failoverReactor.isFailoverSwitch()) {
                  return failoverReactor.getService(key);
              }
              //返回內存中數(shù)據(jù)
              return serviceInfoMap.get(key);
          }

          3. 結語

          本篇文章向大家介紹 Nacos 服務發(fā)現(xiàn)的基本概念和核心能力以及實現(xiàn)的原理,旨在讓大家對 Nacos 的服務注冊與發(fā)現(xiàn)功能有更多的了解,做到心中有數(shù)。


          沒有什么使我停留——除了目的,縱然岸旁有玫瑰、有綠蔭、有寧靜的港灣,我是不系之舟。共勉 ??。

          瀏覽 44
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日韩 人妻 精品 无码 欧美 | 成人做爰黄AA片免费看三区动漫 | 九九九精品影视 | 五月777 | 青青草午夜 |