<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          別用ZkClient了,Curator才是ZooKeeper的好伴侶.

          共 10033字,需瀏覽 21分鐘

           ·

          2022-04-01 20:43

          zookeeper作為分布式系統(tǒng)中重要的協(xié)調(diào)組件,在后端開發(fā)中是難以繞開的一個(gè)重要知識領(lǐng)域。

          可以說,只要在后端領(lǐng)域,比如說Java開發(fā)、大數(shù)據(jù)開發(fā)中待過三年及以上的工程師,或多或少都接觸過或者直接使用過zookeeper。

          因此筆者開啟本系列,作為自己學(xué)習(xí)zookeeper(后文均稱為zk)的記錄,如果能夠啟發(fā)讀者那就更好不過了。

          zookeeper概述

          ?

          ZooKeeper是一個(gè)分布式的,開放源碼的分布式應(yīng)用程序協(xié)調(diào)服務(wù),是Google的Chubby一個(gè)開源的實(shí)現(xiàn),是Hadoop和Hbase的重要組件。

          它是一個(gè)為分布式應(yīng)用提供一致性服務(wù)的軟件,提供的功能包括:配置維護(hù)、域名服務(wù)、分布式同步、組服務(wù)等。

          ?

          度娘如是說。

          zookeeper,直譯過來就是動(dòng)物園管理員,之所以這么說,主要是因?yàn)樗诖髷?shù)據(jù)技術(shù)棧中扮演了重要的協(xié)調(diào)角色。

          在hadoop技術(shù)棧中,各種技術(shù)的logo都是小動(dòng)物,而zookeeper的官方logo也是一位園丁模樣的男士。這也直觀地告訴了使用者,zk的用途和角色。

          一言蔽之,zk就是在分布式系統(tǒng)中,對應(yīng)用提供一致性保證,分布式選主,通過各種機(jī)制,對應(yīng)用進(jìn)行協(xié)調(diào),從而使分布式系統(tǒng)對外提供某種特定的服務(wù)。

          3c8180b69a99bef6e7ae03b075112675.webp

          zookeeper官方形象

          關(guān)于如何搭建zookeeper,網(wǎng)絡(luò)上文章有很多,本文就不進(jìn)行展開,感興趣的可以自行查找相關(guān)資料。

          https://zookeeper.apache.org/

          curator概述

          ?

          Apache Curator是Netflix公司開源的一套zookeeper客戶端框架,并貢獻(xiàn)給了apache社區(qū)。

          它封裝了Zookeeper客戶端底層的api,提供了流式風(fēng)格的api,提供了很多開箱即用的高級特性,如:分布式選主、分布式鎖、path監(jiān)控、node監(jiān)控、更加易用的節(jié)點(diǎn)CRUD操作、分布式隊(duì)列等。

          Patrixck Hunt(Zookeeper的commiter)以一句“Guava is to Java that Curator to Zookeeper”給予Curator高度評價(jià)。

          ?

          目前主流的zookeeper客戶端共有三種:

          • 官方zookeeper客戶端
          • zkClient
          • curator

          其中,官方的客戶端提供的api都比較底層,開發(fā)者直接拿來用需要進(jìn)行一定的封裝,否則直接使用會顯得過于復(fù)雜和繁瑣;zkclient雖然使用起來比較方便,但是文檔較少,社區(qū)也不太活躍;而curator則是apache頂級項(xiàng)目,擁有活躍的開源社區(qū),且擁有較多的成熟api和高級特性。

          因此在zk的客戶端這個(gè)領(lǐng)域,curator大受推崇。

          http://curator.apache.org/index.html

          f742ad55ac1389acd3ed09d54e292d40.webp

          curator基礎(chǔ)操作

          我們通過代碼來直接感受一下curator操作的快捷。

          首先需要在工程中引入curator的依賴:

          ????
          ????
          ??????org.apache.curator
          ??????curator-framework
          ??????4.3.0
          ????

          ????
          ????
          ??????org.apache.curator
          ??????curator-recipes
          ??????4.3.0
          ????

          「注意」 筆者使用的zookeeper服務(wù)端版本為3.5.6,因此使用4.0.0以上版本的curator是兼容的。

          對于較低版本的zookeeper服務(wù)端,如3.4.x,則需要依賴curator2.x版本,如:2.12.0。如果使用高版本的curator,需要將curator自身依賴的ZooKeeper在maven中exclude掉。并引入對應(yīng)的低版本zookeeper客戶端。

          關(guān)于curator與zookeeper具體的版本依賴,請參考官方的說明 ZooKeeper Version Compatibility

          創(chuàng)建客戶端實(shí)例

          使用Curator第一步是要?jiǎng)?chuàng)建一個(gè)客戶端實(shí)例,代碼如下:(后續(xù)的操作中,第一步均是創(chuàng)建客戶端實(shí)例。后續(xù)講解過程中只粘貼關(guān)鍵代碼,在文章的末尾會粘貼本文中所有的代碼案例的完整代碼)。

          ????RetryPolicy?retry?=?new?ExponentialBackoffRetry(1000,?3);
          ????CuratorFramework?client?=
          ????????????CuratorFrameworkFactory.newClient("127.0.0.1:2181",?retry);
          ????client.start();
          ????System.out.println("啟動(dòng)curator客戶端");

          簡單解釋下這段代碼的含義:

          1. 首先創(chuàng)建一個(gè)重試策略實(shí)例RetryPolicy,當(dāng)客戶端與zk服務(wù)端連接失敗或者超時(shí),curator會使用我們指定的 重試策略進(jìn)行重試。RetryPolicy有多個(gè)實(shí)現(xiàn),這里使用ExponentialBackoffRetry策略,重試三次,每次間隔1秒鐘。

          2. 通過CuratorFrameworkFactory創(chuàng)建一個(gè)CuratorFramework實(shí)例,傳入zk連接地址以及重試策略。示例代碼中為單機(jī)方式連接串,如果是多節(jié)點(diǎn)方式只需要通過半角逗號分割的方式進(jìn)行連接即可。

          ????多節(jié)點(diǎn)連接串
          ????ip0:port0,ip1:port1,ip2:port2
          1. 調(diào)用CuratorFramework.start()方法,與zk服務(wù)端建立連接。

          如此,我們的客戶端便能夠與zk服務(wù)端建立起長連接,從而為各種交互做準(zhǔn)備。

          節(jié)點(diǎn)操作

          連接建立好后,我們來學(xué)習(xí)一下curator如何對zk節(jié)點(diǎn)進(jìn)行操作。通俗地說就是通過curator對zk的node做增刪改查操作。

          ????????try?{
          ????????????//?增加
          ????????????client.create()
          ????????????????????.creatingParentsIfNeeded()
          ????????????????????.withMode(CreateMode.PERSISTENT)
          ????????????????????.forPath("/snowalker/path",?"100".getBytes());
          ????????????????????
          ????????????//?讀取node的值
          ????????????byte[]?dataBytes?=?client.getData().forPath("/snowalker/path");
          ????????????
          ????????????System.out.println(new?String(dataBytes));
          ????????????//?修改node對應(yīng)的值
          ????????????client.setData().forPath("/snowalker/path",?"120".getBytes());
          ????????????byte[]?dataBytes1?=?client.getData().forPath("/snowalker/path");
          ????????????
          ????????????System.out.println(new?String(dataBytes1));
          ????????????//?獲取子節(jié)點(diǎn)
          ????????????List?children?=?client.getChildren().forPath("/snowalker");
          ????????????System.out.println(children);
          ????????????
          ????????????//?刪除節(jié)點(diǎn)
          ????????????client.delete().forPath("/snowalker/path");
          ????????}?catch?(Exception?e)?{
          ????????????e.printStackTrace();
          ????????}?finally?{
          ????????????client.close();
          ????????}

          這段代碼基本上就涵蓋了curator對zk的node進(jìn)行增刪改查的主流操作了。介紹下代碼含義:

          1. 「增」 首先,通過create方法在zk上創(chuàng)建了 ”/snowalker/path“ 這樣一個(gè)持久化節(jié)點(diǎn)。方法 creatingParentsIfNeeded() 表示 「如果有必要?jiǎng)t創(chuàng)建父節(jié)點(diǎn)」,也就是遞歸地創(chuàng)建多個(gè)節(jié)點(diǎn)。
          2. 節(jié)點(diǎn)建立后,寫入value;value在zk的node上以字節(jié)形式進(jìn)行存儲;初始值為100,并進(jìn)行打印
          3. 「查」 通過CuratorFramework.getData().forPath("/snowalker/path") ?可以讀取對應(yīng)節(jié)點(diǎn)的value
          4. 「改」 接著我們通過CuratorFramework.setData() 修改”/snowalker/path“ 對應(yīng)的value為120,并進(jìn)行打印
          5. 如果想要獲取某個(gè)子節(jié)點(diǎn),我們可以通過CuratorFramework.getChildren().forPath(path) 方法獲取,返回一個(gè)list;也就是說,zk的子節(jié)點(diǎn)是一對多的(zk文件系統(tǒng)是樹形結(jié)構(gòu))
          6. 「刪」 通過執(zhí)行CuratorFramework.delete().forPath(path) 能夠?qū)⒅付╬ath進(jìn)行刪除

          運(yùn)行代碼,觀察到日志打印如下:

          ????啟動(dòng)curator客戶端
          ????100
          ????120
          ????[path]

          使用zkcli連接zookeeper服務(wù)端,ls看一下 /snowalker 目錄下的節(jié)點(diǎn):

          ????[zk:?localhost:2181(CONNECTED)?6]?ls?/
          ????[snowalker]
          ????[zk:?localhost:2181(CONNECTED)?7]?ls?/snowalker
          ????[]
          ????[zk:?localhost:2181(CONNECTED)?8]

          當(dāng)前只有 /snowalker 節(jié)點(diǎn)存在,子節(jié)點(diǎn)已經(jīng)被刪除。

          可以看到,通過curator,我們通過幾行代碼便實(shí)現(xiàn)了對zk node的增刪改查

          curator進(jìn)階操作

          zk提供了分布式選主、watcher動(dòng)態(tài)監(jiān)聽等機(jī)制,能夠?yàn)榉植际较到y(tǒng)提供分布式協(xié)調(diào),配置實(shí)時(shí)變更通知等能力。Curator當(dāng)然也提供了對應(yīng)的API供我們進(jìn)行調(diào)用。

          接下來我們就分別看一下如何使用Curator來使用這些能力。

          集群選主

          首先看一下如何利用Curator實(shí)現(xiàn)集群選主。

          Curator提供兩種方式進(jìn)行集群選主,分別為:

          • LeaderLatch方式
          • LeaderElection方式

          LeaderLatch方式

          首先觀察一下leaderLatch選主方式調(diào)用方式:

          ????public?class?LeaderLatchDemo?{
          ????
          ????????public?static?void?main(String[]?args)?throws?Exception?{
          ????????????new?LeaderLatchDemo().leaderLatch();
          ????????}
          ????
          ????????public?void?leaderLatch()?throws?Exception?{

          首先我們還是要實(shí)例化一個(gè)CuratorFramework客戶端,與zk服務(wù)端建立連接

          ????????????RetryPolicy?retry?=?new?ExponentialBackoffRetry(1000,?3);
          ????????????CuratorFramework?client?=
          ????????????????????CuratorFrameworkFactory.newClient(
          ????????????????????????????"127.0.0.1:2181",
          ????????????????????????????5000,
          ????????????????????????????3000,
          ????????????????????????????retry);
          ????????????client.start();
          ????????????System.out.println("啟動(dòng)curator客戶端");

          接著注冊一個(gè)連接狀態(tài)監(jiān)聽器,在回調(diào)方法中根據(jù)返回的連接狀態(tài)進(jìn)行對應(yīng)操作。

          當(dāng)連接狀態(tài)ConnectionState為LOST時(shí),表明客戶端到服務(wù)端的連接已經(jīng)斷開,如果當(dāng)前節(jié)點(diǎn)已經(jīng)是leader,那么我們就需要暫停leader身份下的一切事情。

          如果我們查看源碼的話,會發(fā)現(xiàn)curator內(nèi)部會將是否為leader的狀態(tài)設(shè)置為false(已經(jīng)不是leader了)

          ????????????client.getConnectionStateListenable().addListener(new?ConnectionStateListener()?{
          ????????????????@Override
          ????????????????public?void?stateChanged(CuratorFramework?client,?ConnectionState?newState)?{
          ????????????????????switch?(newState)?{
          ????????????????????????case?LOST:?{
          ????????????????????????????break;
          ????????????????????????}
          ????????????????????}
          ????????????????}
          ????????????});

          我們聲明一個(gè)路徑作為選主的依據(jù)。

          通過 「LeaderLatch leaderLatch = new LeaderLatch(client, latchPath);」 實(shí)例化一個(gè)LeaderLatch實(shí)例,通過它進(jìn)行l(wèi)eader選舉操作。

          ????????????//?latch
          ????????????String?latchPath?=?"/snowalker/leader_latch";
          ????????????LeaderLatch?leaderLatch?=?new?LeaderLatch(client,?latchPath);
          ????????????//?開啟leader選舉過程
          ????????????leaderLatch.start();
          ????????????//?判斷當(dāng)前節(jié)點(diǎn)是否為leader
          ????????????boolean?hasLeadershipBefore?=?leaderLatch.hasLeadership();
          ????????????System.out.println("是否成為leader:"?+?hasLeadershipBefore);
          ????
          ????????????leaderLatch.await();????

          當(dāng)通過 「leaderLatch.start()」 開啟leader選舉之后,我們需要調(diào)用 ?「leaderLatch.await()」

          如果當(dāng)前的客戶端未成為leader,則會進(jìn)行等待,(內(nèi)部源碼實(shí)現(xiàn)是通過Object.wait()進(jìn)行阻塞) 直到成為leader后,當(dāng)前客戶端線程會被喚醒,繼續(xù)執(zhí)行后續(xù)邏輯。

          這里說的后續(xù)邏輯,實(shí)際上就是客戶端作為leader節(jié)點(diǎn)需要執(zhí)行的業(yè)務(wù)邏輯。比如:hdfs中兩臺機(jī)器,當(dāng)其中一臺機(jī)器成為主節(jié)點(diǎn)就會以主節(jié)點(diǎn)的身份對外提供文件相關(guān)服務(wù),而另外一臺非leader機(jī)器則會await在這里,直到它成為leader才會作為主節(jié)點(diǎn)提供服務(wù)。(也就是說,只有被選為leader的節(jié)點(diǎn)才會真正的提供服務(wù),否則它看起來就好像 “假死” 了)

          ????????????boolean?hasLeadershipAfter?=?leaderLatch.hasLeadership();
          ????????????System.out.println("是否成為leader:"?+?hasLeadershipAfter);
          ????????????Thread.sleep(100000);
          ????????}
          ????}

          最后我們再打印一下節(jié)點(diǎn)的狀態(tài),看當(dāng)前節(jié)點(diǎn)是否成為leader。

          運(yùn)行效果

          我們同時(shí)啟動(dòng)兩個(gè)LeaderLatchDemo主進(jìn)程,模擬雙節(jié)點(diǎn)下的leader選舉過程。

          兩個(gè)客戶端控制臺打印如下:

          ?

          客戶端A

          ?
          啟動(dòng)curator客戶端
          是否成為leader:false
          ?

          客戶端B

          ?
          啟動(dòng)curator客戶端
          是否成為leader:false
          是否成為leader:true

          上述日志打印,表明開始階段,兩個(gè)客戶端均非leader。

          當(dāng)經(jīng)過競爭之后,客戶端B成為leader,而客戶端A則阻塞。我們嘗試關(guān)閉客戶端B進(jìn)程,觀察客戶端A的控制臺日志打印:

          ?

          客戶端A

          ?
          啟動(dòng)curator客戶端
          是否成為leader:false
          是否成為leader:true

          我們發(fā)現(xiàn),客戶端A成為了leader,從阻塞中喚醒。

          這個(gè)小demo直觀地為我們展現(xiàn)了通過leaderLatch進(jìn)行l(wèi)eader選舉的場景。

          LeaderElection方式

          curator為我們提供了一種更為簡潔的leader選舉方式,它就是 「LeaderElection」 方式。(「實(shí)際上」,LeaderElection與LeaderLatch在原理上幾乎沒有差別,他們的原理都是基于分布式鎖實(shí)現(xiàn)的,只不過LeaderElection方式在使用上更加簡潔,開發(fā)效率更高)

          話不多說,直接上代碼:

          ????public?class?LeaderElectionDemo?{
          ????
          ????????public?static?void?main(String[]?args)?throws?InterruptedException?{

          首先還是實(shí)例化一個(gè)CuratorFramework建立到zk服務(wù)端的連接。

          ????????????RetryPolicy?retry?=?new?ExponentialBackoffRetry(1000,?3);
          ????????????CuratorFramework?client?=
          ????????????????????CuratorFrameworkFactory.newClient(
          ????????????????????????????"127.0.0.1:2181",
          ????????????????????????????5000,
          ????????????????????????????3000,
          ????????????????????????????retry);
          ????????????client.start();
          ????????????System.out.println("啟動(dòng)curator客戶端");

          接著定義一個(gè)leader選舉節(jié)點(diǎn),這個(gè)操作和LeaderLatch相似。

          ????????????String?election?=?"/leader/election";

          這里就不同了,我們需要建立一個(gè)LeaderSelector實(shí)例,它接收CuratorFramework實(shí)例、選舉節(jié)點(diǎn)、以及一個(gè)LeaderSelectorListener ?Leader選舉監(jiān)聽器。

          我們需要實(shí)現(xiàn)LeaderSelectorListener的回調(diào)方法:

          • takeLeadership回調(diào)中需要開發(fā)者實(shí)現(xiàn)當(dāng)成為leader之后的業(yè)務(wù)邏輯。當(dāng)一個(gè)客戶端成為leader之后,便會回調(diào)takeLeadership方法,執(zhí)行l(wèi)eader角色的業(yè)務(wù)邏輯

          • stateChanged方法需要開發(fā)者實(shí)現(xiàn)當(dāng)連接狀態(tài)發(fā)生變化之后的業(yè)務(wù)邏輯。比如:我們可以直接拋出異常,阻止leader業(yè)務(wù)邏輯繼續(xù)進(jìn)行。待另外的節(jié)點(diǎn)成為leader后執(zhí)行takeLeadership方法

          ??????LeaderSelector?leaderSelector?=?new?LeaderSelector(
          ??????????????client,
          ??????????????election,
          ??????????????new?LeaderSelectorListener()?{
          ??????????????????@Override
          ??????????????????public?void?takeLeadership(CuratorFramework?curatorFramework)?throws?Exception?{
          ??????????????????????System.out.println("你已經(jīng)成為leader");
          ??????????????????????//?在?這里干leader的所有事情,此時(shí)方法不能退出
          ??????????????????????Thread.sleep(Integer.MAX_VALUE);
          ??????????????????}
          ??????
          ??????????????????@Override
          ??????????????????public?void?stateChanged(CuratorFramework?curatorFramework,?ConnectionState?connectionState)?{
          ??????????????????????System.out.println("你已經(jīng)不是leader,鏈接狀態(tài)發(fā)生變化,connectionState"?+?connectionState);
          ??????????????????????if?(connectionState.equals(ConnectionState.LOST))?{
          ??????????????????????????throw?new?CancelLeadershipException();
          ??????????????????????}
          ??????????????????}
          ??????????????});

          通過調(diào)用LeaderSelector.start之后,多個(gè)客戶端會在election節(jié)點(diǎn)下競爭leader角色。當(dāng)某個(gè)客戶端競爭leader成功,就會執(zhí)行takeLeadership回調(diào)方法通知當(dāng)前應(yīng)用節(jié)點(diǎn)已經(jīng)成為leader。接著執(zhí)行l(wèi)eader角色的邏輯即可

          ????????????leaderSelector.start();
          ????????????Thread.sleep(Integer.MAX_VALUE);
          ????????}
          ????}
          運(yùn)行代碼

          我們啟動(dòng)兩個(gè)LeaderElectionDemo客戶端,讓他們進(jìn)行l(wèi)eader角色的選舉操作,觀察控制臺輸出:

          客戶端A打印如下:

          ????啟動(dòng)curator客戶端
          ????你已經(jīng)成為leader

          客戶單B打印如下:

          ????啟動(dòng)curator客戶端

          這表明,客戶端A競爭leader成功,并成功執(zhí)行了回調(diào)方法takeLeadership。客戶端B競爭leader失敗,進(jìn)程阻塞。

          我們強(qiáng)制關(guān)閉客戶端A,此時(shí)客戶端B控制臺輸出如下:

          ????啟動(dòng)curator客戶端
          ????你已經(jīng)成為leader

          這表明,客戶端A釋放了leader角色,客戶端B競爭成功,并開始執(zhí)行l(wèi)eader角色的方法。

          「事實(shí)上」 LeaderElection方式內(nèi)部實(shí)現(xiàn)機(jī)制幾乎與LeaderLatch方式一模一樣,它本質(zhì)上也是通過分布式鎖競爭成為leader的。

          具體到細(xì)節(jié)就是,LeaderElection是通過使用curator實(shí)現(xiàn)的mutex鎖進(jìn)行l(wèi)eader競爭。如果獲取到的鎖就是leader。如果競爭leader的時(shí)候競爭鎖失敗,則會阻塞,并為上個(gè)節(jié)點(diǎn)添加watcher。

          當(dāng)上個(gè)節(jié)點(diǎn)對應(yīng)的客戶端down機(jī)或者長時(shí)間斷開連接,則順序臨時(shí)節(jié)點(diǎn)就消失了,此時(shí)watcher會通知后一個(gè)節(jié)點(diǎn)進(jìn)行加鎖。后面的節(jié)點(diǎn)加鎖成功便會成為leader角色。

          我們發(fā)現(xiàn),這其實(shí)就是Curator的分布式鎖實(shí)現(xiàn)機(jī)制啊。

          后續(xù)我們會對LeaderElection具體的代碼實(shí)現(xiàn)進(jìn)行展開講解。敬請期待。

          小結(jié)

          本文到此就告一段落了,我們對curator的基本使用以及重要的leader選舉特性進(jìn)行了全方位的講解和展示。

          這里做個(gè)預(yù)告,接下來我會帶領(lǐng)讀者朋友們繼續(xù)學(xué)習(xí)curator對zookeeper的watcher機(jī)制的封裝和增強(qiáng)。


          瀏覽 101
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲草逼| 亚洲内射电影 | 白白日韩在线观看 | 丁香婷婷亚洲 | 亚洲狼人影院 |