<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          使用 Curator 操作 ZooKeeper

          共 6726字,需瀏覽 14分鐘

           ·

          2019-05-24 19:00

          Curator是Netflix公司開源的一個ZooKeeper client library,用于簡化ZooKeeper客戶端編程。它包含如下模塊:

          Framework:Framework是ZooKeeper API的High-Level的封裝,它讓訪問ZooKeeper更加簡單。它基于ZooKeeper添加了一些新的特性,同時屏蔽了訪問ZooKeeper集群在管理連接和重試操作方面的復雜度。

          Recipes:在Framework的基礎上,實現(xiàn)了一些通用的功能,稱之為“菜單”。

          Utilities:訪問ZooKeeper時候的一些公用方法。

          Client:一個Low-Level的ZooKeeper客戶端,并有一些公用方法。

          Errors:Curator的異常處理,包括連接問題,異常恢復等等。

          Extensions:

          連接ZooKeeper

          RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
          CuratorFramework _client = CuratorFrameworkFactory.newClient("10.23.22.237:2181", retryPolicy);
          _client.start();

          + View Code

          Curator通過CuratorFrameworkFactory來創(chuàng)建客戶端。new出來的客戶端可以保存并且重用。在使用之前需要start一下,絕大部分Curator的操作都必須先start。

          在new函數中需要傳入RetryPolicy接口,重連的策略。當和ZooKeeper發(fā)生連接異常或者操作異常的時候,就會使用重連策略。ExponentialBackoffRetry是其中一種重連策略。Curator支持很多種重連策略:RetryNTimes(重連N次策略)、RetryForever(永遠重試策略)、ExponentialBackoffRetry(基于backoff的重連策略)、BoundedExponentialBackoffRetry(有邊界的基于backoff的重連策略,即,設定最大sleep時間)等等。

          ??

          下面是官方例子中,ExponentialBackoffRetry的代碼片段。

          + View Code

          long sleepMs = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)));
           if ( sleepMs > maxSleepMs )
          {
          log.warn(String.format("Sleep extension too large (%d). Pinning to %d", sleepMs, maxSleepMs));
          sleepMs = maxSleepMs;
          }
          return sleepMs;


          可以看出ExponentialBackoffRetry 重連的時間間隔一般是隨著重試的次數遞增的,如果時間間隔計算出來大于默認的最大sleep時間的話,則去最大sleep時間。ExponentialBackoffRetry 除了時間的限制以外,還有最大重連次數的限制。而BoundedExponentialBackoffRetry策略只是讓用戶設置最大sleep時間而已。默認的最大時間是Integer.MAX_VALUE毫秒。

          ZooKeeper節(jié)點操作

          ZooKeeper 節(jié)點優(yōu)點像文件系統(tǒng)的文件夾,每個節(jié)點都可以包含數據。但是ZooKeeper的節(jié)點是有生命周期的,這取決于節(jié)點的類型。在 ZooKeeper 中,節(jié)點類型可以分為持久節(jié)點(PERSISTENT )、臨時節(jié)點(EPHEMERAL),以及時序節(jié)點(SEQUENTIAL ),具體在節(jié)點創(chuàng)建過程中,一般是組合使用,可以生成以下 4 種節(jié)點類型。不同的組合可以應用到不同的業(yè)務場景中。

          ?

          1. 持久化節(jié)點

          持久化節(jié)點創(chuàng)建后,就一直存在,除非有刪除操作主動來刪除這個節(jié)點,持久化節(jié)點不會因為創(chuàng)建該節(jié)點的客戶端會話失效而消失。如果重復創(chuàng)建,客戶端會拋出NodeExistsException異常。

          byte[] data = { 1, 2, 3 };
          _client.create().withMode(CreateMode.PERSISTENT).forPath("/zktest/p1", data);

          ?

          2. 臨時節(jié)點

          創(chuàng)建臨時節(jié)點后,如果客戶端會話失效,那么這個節(jié)點會自動被ZooKeeper刪除。這里是客戶端失效,并不是客戶端斷開連接。因為ZooKeeper服務端和客戶端是用心跳維持狀態(tài),會話留一點時間,這個時間是在創(chuàng)建連接的時候可以設置sessionTimeoutMs參數:

          CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);

          創(chuàng)建臨時節(jié)點的代碼如下:

          _client.create().withMode(CreateMode.EPHEMERAL).forPath("/zktest/e1", data);

          ?

          3. 持久化時序節(jié)點

          _client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/zktest/ps1", data);

          上述代碼執(zhí)行兩次,你會發(fā)現(xiàn)客戶端不會報NodeExistsException異常,ZooKeeper會為你創(chuàng)建2個節(jié)點,ZooKeeper在每個父節(jié)點會為他的第一級子節(jié)點維護一份時序,會記錄每個子節(jié)點創(chuàng)建的先后順序。在創(chuàng)建子節(jié)點的時候,可以設置這個屬性,那么在創(chuàng)建節(jié)點過程中,ZooKeeper會自動為給定節(jié)點名加上一個數字后綴,作為新的節(jié)點名。

          Clipboard Image.png

          ?

          4. 臨時時序節(jié)點

          持久化時序節(jié)點不同的就是節(jié)點會在會話失效的時候回消失。

          _client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/zktest/es1", data);

          ?

          5. 設置和獲取節(jié)點數據

          //設置節(jié)點數據
          _client.setData().forPath("/zktest/ps1", data);
          //獲取節(jié)點數據
          byte[] data2 = _client.getData().forPath("/zktest/ps1");

          ?

          分布式鎖

          使用數據庫、Redis、文件系統(tǒng)都可以實現(xiàn)分布式鎖,同樣ZooKeeper也可以用來實現(xiàn)分布式鎖。Curator提供了InterProcessMutex類來幫助我們實現(xiàn)分布式鎖,其內部就是使用的EPHEMERAL_SEQUENTIAL類型節(jié)點。

          ?

          public void test() throws Exception {
              RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 3);
          
              _client = CuratorFrameworkFactory.newClient("10.23.22.237:2181", retryPolicy);
              _client.start();
          
              ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
          
              for (int i = 0; i < 5; i++) {
                  fixedThreadPool.submit(new Runnable() {
          
                      @Override
                      public void run() {
          
                          while (true) {
                              try {
                                  dowork();
                              } catch (Exception e) {
                                  // TODO Auto-generated catch block
                                  e.printStackTrace();
                              }
                          }
                      }
                  });
              }
          }
          
          private void dowork() throws Exception {
          
              InterProcessMutex ipm = new InterProcessMutex(_client, "/zktest/distributed_lock");
          
              try {
                  ipm.acquire();
          
                  _logger.info("Thread ID:" + Thread.currentThread().getId() + " acquire the lock");
          
                  Thread.sleep(1000);
          
                  _logger.info("Thread ID:" + Thread.currentThread().getId() + " release the lock");
              } catch (Exception e) {
          
              } finally {
                  ipm.release();
              }
          }


          執(zhí)行結果如下圖:

          Clipboard Image.png

          ?

          acquire()方法,會在給定的路徑下面創(chuàng)建臨時時序節(jié)點的時序節(jié)點。然后它會和父節(jié)點下面的其他節(jié)點比較時序。如果客戶端創(chuàng)建的臨時時序節(jié)點的數字后綴最小的話,則獲得該鎖,函數成功返回。如果沒有獲得到,即,創(chuàng)建的臨時節(jié)點數字后綴不是最小的,則啟動一個watch監(jiān)聽上一個(排在前面一個的節(jié)點)。主線程使用object.wait()進行等待,等待watch觸發(fā)的線程notifyAll(),一旦上一個節(jié)點有事件產生馬上再次出發(fā)時序最小節(jié)點的判斷。

          release()方法就是釋放鎖,內部實現(xiàn)就是刪除創(chuàng)建的EPHEMERAL_SEQUENTIAL節(jié)點。

          Leader選舉

          選舉可以用來實現(xiàn)Master-Slave模式,也可以用來實現(xiàn)主備切換等功能。Curator提供兩種方式實現(xiàn)選舉:LeaderSelector 和 LeaderLatch。兩種方法都可以使用,LeaderLatch語法較為簡單一點,LeaderSelector控制度更高一些。

          使用LeaderSelector:

          public void test() {
              RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 3);
          
              _client = CuratorFrameworkFactory.newClient("10.23.22.237:2181", retryPolicy);
              _client.start();
          
              dowork();
          
          }
          
          private void dowork() {
          
              LeaderSelectorListener listener = new
          
              LeaderSelectorListenerAdapter() {
                  public void takeLeadership(CuratorFramework client) throws Exception {
                      logger.info("Take the lead.");
          
                      Thread.sleep(10000);
          
                      logger.info("Relinquish the lead.");
                  }
          
              };
          
              LeaderSelector selector = new LeaderSelector(_client, "/zktest/leader", listener);
              selector.autoRequeue();
              selector.start();
          }


          LeaderSelector的內部使用分布式鎖InterProcessMutex實現(xiàn), 并且在LeaderSelector中添加一個Listener,當獲取到鎖的時候執(zhí)行回調函數takeLeadership。函數執(zhí)行完成之后就調用InterProcessMutex.release()釋放鎖,也就是放棄Leader的角色。

          ?

          使用LeaderLatch:

          public void test() {
              RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 3);
          
              _client = CuratorFrameworkFactory.newClient("10.23.22.237:2181", retryPolicy);
              _client.start();
          
              dowork();
          
          }
          
          private void dowork() {
              LeaderLatch leader = new LeaderLatch(_client, "/zktest/leader");
              leader.addListener(new LeaderLatchListener() {
          
                  @Override
                  public void isLeader() {
                      // TODO Auto-generated method stub
                      logger.info("Take the lead.");
          
                      try {
                          Thread.sleep(10000);
                      } catch (InterruptedException e) {
                          // TODO Auto-generated catch block
                          e.printStackTrace();
                      }
          
                      logger.info("Relinquish the lead.");
                  }
          
                  @Override
                  public void notLeader() {
                      // TODO Auto-generated method stub
                      logger.info("I am not Leader");
                  }
              });
          
              try {
                  leader.start();
              } catch (Exception e) {
                  // TODO Auto-generated catch block
                  e.printStackTrace();
              }
          }


          同樣是實現(xiàn)Leader選舉的LeaderLatch并沒有通過InterProcessMutex實現(xiàn),它使用了原生的創(chuàng)建EPHEMERAL_SEQUENTIAL節(jié)點的功能再次實現(xiàn)了一遍。同樣的在isLeader方法中需要實現(xiàn)Leader的業(yè)務需求,但是一旦isLeader方法返回,就相當于Leader角色放棄了,重新進入選舉過程。



          作者:nick hao

          原文鏈接:https://www.cnblogs.com/haoxinyue/p/6561896.html

          本文轉自博客園網,版權歸原作者所有。

          瀏覽 45
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  精品人妻无码在线 | 三级国产视频 | 国产精品久久久久久妇女6080 | av免费网站 | 亚州成人娱乐网 |