<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Dubbo 原理,服務(wù)是如何注冊(cè)的?

          共 23890字,需瀏覽 48分鐘

           ·

          2022-07-31 21:35


          之前的文章我們講解了服務(wù)在發(fā)布時(shí)有三個(gè)階段:準(zhǔn)備、發(fā)布服務(wù)、注冊(cè)服務(wù)。之前兩個(gè)階段已經(jīng)詳細(xì)講解過(guò),這篇文章將講解最后一個(gè)階段服務(wù)的注冊(cè)。前置知識(shí)不懂的請(qǐng)移步這篇文章。


          Dubbo原理,服務(wù)暴露的前置工作|原創(chuàng)


          「后臺(tái)回復(fù)"dubbo源碼"獲得源碼筆記倉(cāng)庫(kù)地址」

          服務(wù)注冊(cè)

          我們都知道從理論上來(lái)說(shuō),其實(shí)當(dāng)provider暴露服務(wù)之后comsumer就可以直連然后開(kāi)始調(diào)用服務(wù),注冊(cè)中心并不是必須的。但是這樣的方式不利于服務(wù)治理,所以僅在測(cè)試環(huán)境中可以這樣操作,在生產(chǎn)中,注冊(cè)中心實(shí)際上是必不可少的,我們需要依賴注冊(cè)中心來(lái)管理服務(wù)的上下線和拉取。

          在Dubbo中默認(rèn)使用Zookeeper作為注冊(cè)中心,其他類型的注冊(cè)中心原理也類似。

          回到RegistryProtocol.export()方法中,可以看到注冊(cè)中心的邏輯主要分為兩部分,一是獲得注冊(cè)中心實(shí)例,二是向注冊(cè)中心注冊(cè)服務(wù)。

          // 根據(jù) URL 加載 Registry 實(shí)現(xiàn)類,比如 ZookeeperRegistry
          final Registry registry = getRegistry(originInvoker);
          // 獲取已注冊(cè)的服務(wù)提供者 URL
          final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);

          // decide if we need to delay publish
          boolean register = providerUrl.getParameter(REGISTER_KEY, true);
          // 根據(jù) register 的值決定是否注冊(cè)服務(wù)
          if (register) {
              // 重點(diǎn)??!注冊(cè)邏輯
              registry.register(registeredProviderUrl);
          }

          創(chuàng)建注冊(cè)中心

          getRegistry()中,首先我們會(huì)根據(jù)url中的參數(shù),獲得自適應(yīng)擴(kuò)展實(shí)例RegistryFactory$Adaptive,然后獲得對(duì)應(yīng)具體實(shí)現(xiàn)。接著調(diào)用RegistryFactory.getRegistry() 方法,最終走到默認(rèn)實(shí)現(xiàn) AbstractRegistryFactory 中。

          AbstractRegistryFactory中,會(huì)首先構(gòu)建注冊(cè)服務(wù)需要的URL和key,然后根據(jù)key查詢本地緩存中服務(wù)是否存在,存在說(shuō)明已經(jīng)暴露,直接返回,不存在則調(diào)用子類實(shí)現(xiàn)createRegistry(url)創(chuàng)建注冊(cè)中心服務(wù),這里再次使用了模板方法的設(shè)計(jì)模式。

          protected Registry getRegistry(final Invoker<?> originInvoker) {
              URL registryUrl = getRegistryUrl(originInvoker);
              return getRegistry(registryUrl);
          }
          @Override
          public Registry getRegistry(URL url) {

              Registry defaultNopRegistry = getDefaultNopRegistryIfDestroyed();
              if (null != defaultNopRegistry) {
                  return defaultNopRegistry;
              }

              //  構(gòu)建注冊(cè)u(píng)rl
              url = URLBuilder.from(url)
                      .setPath(RegistryService.class.getName())
                      .addParameter(INTERFACE_KEYRegistryService.class.getName())
                      .removeParameters(EXPORT_KEYREFER_KEY)
                      .build()
          ;
              String key = createRegistryCacheKey(url);
              // Lock the registry access process to ensure a single instance of the registry
              LOCK.lock();
              try {
                  // double check
                  // fix https://github.com/apache/dubbo/issues/7265.
                  defaultNopRegistry = getDefaultNopRegistryIfDestroyed();
                  if (null != defaultNopRegistry) {
                      return defaultNopRegistry;
                  }
                  // 緩存中獲取key = zookeeper://houduankaifa.club:2181/org.apache.dubbo.registry.RegistryService
                  Registry registry = REGISTRIES.get(key);
                  if (registry != null) {
                      return registry;
                  }
                  //create registry by spi/ioc
                  // 緩存未命中,創(chuàng)建 Registry 實(shí)例,模板方法,子類實(shí)現(xiàn)
                  registry = createRegistry(url);
                  if (registry == null) {
                      throw new IllegalStateException("Can not create registry " + url);
                  }
                  REGISTRIES.put(key, registry);
                  return registry;
              } finally {
                  // Release the lock
                  LOCK.unlock();
              }
          }

          protected abstract Registry createRegistry(URL url);

          進(jìn)入ZookeeperRegistryFactory后,便會(huì)開(kāi)始新建一個(gè)ZookeeperRegistry,這里沒(méi)什么邏輯。

          // zookeeperTransporter 由 SPI 在運(yùn)行時(shí)注入,類型為 ZookeeperTransporter$Adaptive
          private ZookeeperTransporter zookeeperTransporter;

          public ZookeeperRegistryFactory() {
              this.zookeeperTransporter = ZookeeperTransporter.getExtension();
          }

          /**
           * Invisible injection of zookeeper client via IOC/SPI
           *
           * @param zookeeperTransporter
           */

          @DisableInject
          public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
              this.zookeeperTransporter = zookeeperTransporter;
          }

          @Override
          public Registry createRegistry(URL url) {
              // 創(chuàng)建 ZookeeperRegistry
              return new ZookeeperRegistry(url, zookeeperTransporter);
          }

          在下面的代碼代碼中,主要?jiǎng)?chuàng)建了新的Zk客戶端,并且添加了監(jiān)聽(tīng)器對(duì)各種事件進(jìn)行處理。

          我們重點(diǎn)關(guān)注 ZookeeperTransporter 的 connect() 方法調(diào)用,這個(gè)方法用于創(chuàng)建 Zookeeper 客戶端。創(chuàng)建好 Zookeeper 客戶端,意味著注冊(cè)中心的創(chuàng)建過(guò)程就結(jié)束了。接下來(lái),再來(lái)分析一下 Zookeeper 客戶端的創(chuàng)建過(guò)程。

          public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
              super(url);
              if (url.isAnyHost()) {
                  throw new IllegalStateException("registry address == null");
              }
              // 獲取組名,默認(rèn)為 dubbo
              String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
              if (!group.startsWith(PATH_SEPARATOR)) {
                  group = PATH_SEPARATOR + group;
              }
              this.root = group;
              // 創(chuàng)建 Zookeeper 客戶端,默認(rèn)為 CuratorZookeeperTransporter
              zkClient = zookeeperTransporter.connect(url);
              // 添加狀態(tài)監(jiān)聽(tīng)器
              zkClient.addStateListener((state) -> {
                  if (state == StateListener.RECONNECTED) {
                      logger.warn("Trying to fetch the latest urls, in case there're provider changes during connection loss.\n" +
                              " Since ephemeral ZNode will not get deleted for a connection lose, " +
                              "there's no need to re-register url of this instance.");
                      ZookeeperRegistry.this.fetchLatestAddresses();
                  } else if (state == StateListener.NEW_SESSION_CREATED) {
                      logger.warn("Trying to re-register urls and re-subscribe listeners of this instance to registry...");
                      try {
                          ZookeeperRegistry.this.recover();
                      } catch (Exception e) {
                          logger.error(e.getMessage(), e);
                      }
                  } else if (state == StateListener.SESSION_LOST) {
                     ……無(wú)處理
              });
          }

          接著跟蹤,會(huì)進(jìn)入AbstractZookeeperTransporterconnect()方法中,這里重點(diǎn)關(guān)注createZookeeperClient()方法,其他都是一些緩存判斷邏輯。

          public ZookeeperClient connect(URL url) {
              ZookeeperClient zookeeperClient;
              // address format: {[username:password@]address}
              List<String> addressList = getURLBackupAddress(url);
              // The field define the zookeeper server , including protocol, host, port, username, password
              if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
                  logger.info("find valid zookeeper client from the cache for address: " + url);
                  return zookeeperClient;
              }
              // avoid creating too many connections, so add lock
              synchronized (zookeeperClientMap) {
                  if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
                      logger.info("find valid zookeeper client from the cache for address: " + url);
                      return zookeeperClient;
                  }

                  // 重點(diǎn)!創(chuàng)建zkclient
                  zookeeperClient = createZookeeperClient(url);
                  logger.info("No valid zookeeper client found from cache, therefore create a new client for url. " + url);
                  writeToClientMap(addressList, zookeeperClient);
              }
              return zookeeperClient;
          }

          前面說(shuō)過(guò),這里的 zookeeperTransporter 類型為自適應(yīng)拓展類,因此 connect 方法會(huì)在被調(diào)用時(shí)決定加載什么類型的 ZookeeperTransporter 拓展,默認(rèn)為 CuratorZookeeperTransporter。下面我們到 CuratorZookeeperTransporter 中看一看。

          //CuratorZookeeperTransporter
          public ZookeeperClient connect(URL url) {
              // 創(chuàng)建 CuratorZookeeperClient
              return new CuratorZookeeperClient(url);
          }

          繼續(xù)向下看。CuratorZookeeperClient 構(gòu)造方法主要用于創(chuàng)建和啟動(dòng) CuratorFramework 實(shí)例。以上基本上都是 Curator 框架的代碼,大家如果對(duì) Curator 框架不是很了解,可以參考 Curator 官方文檔。在這個(gè)構(gòu)造方法中,最終會(huì)啟動(dòng)zk客戶端。

          public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatcher{

              private final CuratorFramework client;
              
              public CuratorZookeeperClient(URL url) {
                  super(url);
                  try {
                      // 創(chuàng)建 CuratorFramework 構(gòu)造器
                      CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                              .connectString(url.getBackupAddress())
                              .retryPolicy(new RetryNTimes(11000))
                              .connectionTimeoutMs(5000);
                      String authority = url.getAuthority();
                      if (authority != null && authority.length() > 0) {
                          builder = builder.authorization("digest", authority.getBytes());
                      }
                      // 構(gòu)建 CuratorFramework 實(shí)例
                      client = builder.build();
                      // 添加監(jiān)聽(tīng)器
                      client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
                          @Override
                          public void stateChanged(CuratorFramework client, ConnectionState state) {
                              if (state == ConnectionState.LOST) {
                                  CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
                              } else if (state == ConnectionState.CONNECTED) {
                                  CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
                              } else if (state == ConnectionState.RECONNECTED) {
                                  CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
                              }
                          }
                      });
                      
                      // 啟動(dòng)客戶端
                      client.start();
                  } catch (Exception e) {
                      throw new IllegalStateException(e.getMessage(), e);
                  }
              }
          }

          到此為止注冊(cè)中心實(shí)例就創(chuàng)建好了,接下來(lái)要做的事情是向注冊(cè)中心注冊(cè)服務(wù)。

          節(jié)點(diǎn)創(chuàng)建

          以 Zookeeper 為例,所謂的服務(wù)注冊(cè),本質(zhì)上是將服務(wù)配置數(shù)據(jù)寫入到 Zookeeper 的某個(gè)路徑的節(jié)點(diǎn)下。每一個(gè)服務(wù)就對(duì)應(yīng)一個(gè)zk中的 node。為了讓大家有一個(gè)直觀的了解,下面我們將 Dubbo 的 demo 跑起來(lái),然后通過(guò) Zookeeper 可視化客戶端 ZooInspector 查看節(jié)點(diǎn)數(shù)據(jù)。如下:

          從上圖中可以看到 com.alibaba.dubbo.demo.DemoService 這個(gè)服務(wù)對(duì)應(yīng)的配置信息(存儲(chǔ)在 URL 中)最終被注冊(cè)到了 /dubbo/com.alibaba.dubbo.demo.DemoService/providers/ 節(jié)點(diǎn)下。搞懂了服務(wù)注冊(cè)的本質(zhì),那么接下來(lái)我們就可以去閱讀服務(wù)注冊(cè)的代碼了。服務(wù)注冊(cè)的接口為 register(URL),這個(gè)方法定義在 FailbackRegistry 抽象類中。代碼如下:

          public void register(URL url) {
              super.register(url);
              failedRegistered.remove(url);
              failedUnregistered.remove(url);
              try {
                  // 模板方法,由子類實(shí)現(xiàn)
                  doRegister(url);
              } catch (Exception e) {
                  Throwable t = e;

                  // 獲取 check 參數(shù),若 check = true 將會(huì)直接拋出異常
                  boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                          && url.getParameter(Constants.CHECK_KEY, true)
                          && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
                  boolean skipFailback = t instanceof SkipFailbackWrapperException;
                  if (check || skipFailback) {
                      if (skipFailback) {
                          t = t.getCause();
                      }
                      throw new IllegalStateException("Failed to register");
                  } else {
                      logger.error("Failed to register");
                  }

                  // 記錄注冊(cè)失敗的鏈接
                  failedRegistered.add(url);
              }
          }

          protected abstract void doRegister(URL url);

          如上,我們重點(diǎn)關(guān)注 doRegister 方法調(diào)用即可,其他的代碼先忽略。doRegister 方法是一個(gè)模板方法,因此我們到 FailbackRegistry 子類 ZookeeperRegistry 中進(jìn)行分析。如下:

          public void doRegister(URL url) {
              try {
                  // 通過(guò) Zookeeper 客戶端創(chuàng)建節(jié)點(diǎn),節(jié)點(diǎn)路徑由 toUrlPath 方法生成,路徑格式如下:
                  //   /${group}/${serviceInterface}/providers/${url}
                  // 比如
                  //   /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......
                  zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
              } catch (Throwable e) {
                  throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
              }
          }

          如上,ZookeeperRegistry 在 doRegister 中調(diào)用了 Zookeeper 客戶端創(chuàng)建服務(wù)節(jié)點(diǎn)。節(jié)點(diǎn)路徑由 toUrlPath 方法生成,該方法邏輯不難理解,就不分析了。接下來(lái)分析 create 方法,如下:

          public void create(String path, boolean ephemeral) {
              if (!ephemeral) {
                  if (persistentExistNodePath.contains(path)) {
                      return;
                  }
                  if (checkExists(path)) {
                      // 如果要?jiǎng)?chuàng)建的節(jié)點(diǎn)類型非臨時(shí)節(jié)點(diǎn),那么這里要檢測(cè)節(jié)點(diǎn)是否存在
                      persistentExistNodePath.add(path);
                      return;
                  }
              }
              int i = path.lastIndexOf('/');
              if (i > 0) {
                  // 遞歸創(chuàng)建上一級(jí)路徑
                  create(path.substring(0, i), false);
              }
              // 根據(jù) ephemeral 的值創(chuàng)建臨時(shí)或持久節(jié)點(diǎn)
              if (ephemeral) {
                  createEphemeral(path);
              } else {
                  createPersistent(path);
                  persistentExistNodePath.add(path);
              }
          }

          上面方法先是通過(guò)遞歸創(chuàng)建當(dāng)前節(jié)點(diǎn)的上一級(jí)路徑,然后再根據(jù) ephemeral 的值決定創(chuàng)建臨時(shí)還是持久節(jié)點(diǎn)。createEphemeral 和 createPersistent 這兩個(gè)方法都比較簡(jiǎn)單,這里簡(jiǎn)單分析其中的一個(gè)。如下:

          public void createEphemeral(String path) {
              try {
                  // 通過(guò) Curator 框架創(chuàng)建節(jié)點(diǎn)
                  client.create().withMode(CreateMode.EPHEMERAL).forPath(path);
              } catch (NodeExistsException e) {
                  logger.warn("ZNode " + path + " already exists, since we will only try to recreate a node on a session expiration" +
                          ", this duplication might be caused by a delete delay from the zk server, which means the old expired session" +
                          " may still holds this ZNode and the server just hasn't got time to do the deletion. In this case, " +
                          "we can just try to delete and create again.", e);
                  deletePath(path);
                  createEphemeral(path);
              } catch (Exception e) {
                  throw new IllegalStateException(e.getMessage(), e);
              }
          }

          好了,到此關(guān)于服務(wù)注冊(cè)的過(guò)程就分析完了。

          整個(gè)過(guò)程可簡(jiǎn)單總結(jié)為:先創(chuàng)建注冊(cè)中心實(shí)例,之后再通過(guò)注冊(cè)中心實(shí)例注冊(cè)服務(wù)。

          瀏覽 64
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  香蕉国产视频2024 | 日本黄色免费看 | 欧美性爱日韩精品 | 国内毛片儿 | 成人黄A片 |