別用ZkClient了,Curator才是ZooKeeper的好伴侶.
zookeeper作為分布式系統(tǒng)中重要的協(xié)調(diào)組件,在后端開發(fā)中是難以繞開的一個(gè)重要知識領(lǐng)域。
可以說,只要在后端領(lǐng)域,比如說Java開發(fā)、大數(shù)據(jù)開發(fā)中待過三年及以上的工程師,或多或少都接觸過或者直接使用過zookeeper。
因此筆者開啟本系列,作為自己學(xué)習(xí)zookeeper(后文均稱為zk)的記錄,如果能夠啟發(fā)讀者那就更好不過了。
zookeeper概述
?ZooKeeper是一個(gè)分布式的,開放源碼的分布式應(yīng)用程序協(xié)調(diào)服務(wù),是Google的Chubby一個(gè)開源的實(shí)現(xiàn),是Hadoop和Hbase的重要組件。
它是一個(gè)為分布式應(yīng)用提供一致性服務(wù)的軟件,提供的功能包括:配置維護(hù)、域名服務(wù)、分布式同步、組服務(wù)等。
?
度娘如是說。
zookeeper,直譯過來就是動(dòng)物園管理員,之所以這么說,主要是因?yàn)樗诖髷?shù)據(jù)技術(shù)棧中扮演了重要的協(xié)調(diào)角色。
在hadoop技術(shù)棧中,各種技術(shù)的logo都是小動(dòng)物,而zookeeper的官方logo也是一位園丁模樣的男士。這也直觀地告訴了使用者,zk的用途和角色。
一言蔽之,zk就是在分布式系統(tǒng)中,對應(yīng)用提供一致性保證,分布式選主,通過各種機(jī)制,對應(yīng)用進(jìn)行協(xié)調(diào),從而使分布式系統(tǒng)對外提供某種特定的服務(wù)。

關(guān)于如何搭建zookeeper,網(wǎng)絡(luò)上文章有很多,本文就不進(jìn)行展開,感興趣的可以自行查找相關(guān)資料。
https://zookeeper.apache.org/
curator概述
?Apache Curator是Netflix公司開源的一套zookeeper客戶端框架,并貢獻(xiàn)給了apache社區(qū)。
它封裝了Zookeeper客戶端底層的api,提供了流式風(fēng)格的api,提供了很多開箱即用的高級特性,如:分布式選主、分布式鎖、path監(jiān)控、node監(jiān)控、更加易用的節(jié)點(diǎn)CRUD操作、分布式隊(duì)列等。
Patrixck Hunt(Zookeeper的commiter)以一句“Guava is to Java that Curator to Zookeeper”給予Curator高度評價(jià)。
?
目前主流的zookeeper客戶端共有三種:
- 官方zookeeper客戶端
- zkClient
- curator
其中,官方的客戶端提供的api都比較底層,開發(fā)者直接拿來用需要進(jìn)行一定的封裝,否則直接使用會顯得過于復(fù)雜和繁瑣;zkclient雖然使用起來比較方便,但是文檔較少,社區(qū)也不太活躍;而curator則是apache頂級項(xiàng)目,擁有活躍的開源社區(qū),且擁有較多的成熟api和高級特性。
因此在zk的客戶端這個(gè)領(lǐng)域,curator大受推崇。
http://curator.apache.org/index.html

curator基礎(chǔ)操作
我們通過代碼來直接感受一下curator操作的快捷。
首先需要在工程中引入curator的依賴:
????
????
??????org.apache.curator
??????curator-framework
??????4.3.0
????
????
????
??????org.apache.curator
??????curator-recipes
??????4.3.0
????
「注意」 筆者使用的zookeeper服務(wù)端版本為3.5.6,因此使用4.0.0以上版本的curator是兼容的。
對于較低版本的zookeeper服務(wù)端,如3.4.x,則需要依賴curator2.x版本,如:2.12.0。如果使用高版本的curator,需要將curator自身依賴的ZooKeeper在maven中exclude掉。并引入對應(yīng)的低版本zookeeper客戶端。
關(guān)于curator與zookeeper具體的版本依賴,請參考官方的說明 ZooKeeper Version Compatibility
創(chuàng)建客戶端實(shí)例
使用Curator第一步是要?jiǎng)?chuàng)建一個(gè)客戶端實(shí)例,代碼如下:(后續(xù)的操作中,第一步均是創(chuàng)建客戶端實(shí)例。后續(xù)講解過程中只粘貼關(guān)鍵代碼,在文章的末尾會粘貼本文中所有的代碼案例的完整代碼)。
????RetryPolicy?retry?=?new?ExponentialBackoffRetry(1000,?3);
????CuratorFramework?client?=
????????????CuratorFrameworkFactory.newClient("127.0.0.1:2181",?retry);
????client.start();
????System.out.println("啟動(dòng)curator客戶端");
簡單解釋下這段代碼的含義:
首先創(chuàng)建一個(gè)重試策略實(shí)例RetryPolicy,當(dāng)客戶端與zk服務(wù)端連接失敗或者超時(shí),curator會使用我們指定的 重試策略進(jìn)行重試。RetryPolicy有多個(gè)實(shí)現(xiàn),這里使用ExponentialBackoffRetry策略,重試三次,每次間隔1秒鐘。
通過CuratorFrameworkFactory創(chuàng)建一個(gè)CuratorFramework實(shí)例,傳入zk連接地址以及重試策略。示例代碼中為單機(jī)方式連接串,如果是多節(jié)點(diǎn)方式只需要通過半角逗號分割的方式進(jìn)行連接即可。
????多節(jié)點(diǎn)連接串
????ip0:port0,ip1:port1,ip2:port2
- 調(diào)用CuratorFramework.start()方法,與zk服務(wù)端建立連接。
如此,我們的客戶端便能夠與zk服務(wù)端建立起長連接,從而為各種交互做準(zhǔn)備。
節(jié)點(diǎn)操作
連接建立好后,我們來學(xué)習(xí)一下curator如何對zk節(jié)點(diǎn)進(jìn)行操作。通俗地說就是通過curator對zk的node做增刪改查操作。
????????try?{
????????????//?增加
????????????client.create()
????????????????????.creatingParentsIfNeeded()
????????????????????.withMode(CreateMode.PERSISTENT)
????????????????????.forPath("/snowalker/path",?"100".getBytes());
????????????????????
????????????//?讀取node的值
????????????byte[]?dataBytes?=?client.getData().forPath("/snowalker/path");
????????????
????????????System.out.println(new?String(dataBytes));
????????????//?修改node對應(yīng)的值
????????????client.setData().forPath("/snowalker/path",?"120".getBytes());
????????????byte[]?dataBytes1?=?client.getData().forPath("/snowalker/path");
????????????
????????????System.out.println(new?String(dataBytes1));
????????????//?獲取子節(jié)點(diǎn)
????????????List?children?=?client.getChildren().forPath("/snowalker");
????????????System.out.println(children);
????????????
????????????//?刪除節(jié)點(diǎn)
????????????client.delete().forPath("/snowalker/path");
????????}?catch?(Exception?e)?{
????????????e.printStackTrace();
????????}?finally?{
????????????client.close();
????????}
這段代碼基本上就涵蓋了curator對zk的node進(jìn)行增刪改查的主流操作了。介紹下代碼含義:
- 「增」 首先,通過create方法在zk上創(chuàng)建了 ”/snowalker/path“ 這樣一個(gè)持久化節(jié)點(diǎn)。方法 creatingParentsIfNeeded() 表示 「如果有必要?jiǎng)t創(chuàng)建父節(jié)點(diǎn)」,也就是遞歸地創(chuàng)建多個(gè)節(jié)點(diǎn)。
- 節(jié)點(diǎn)建立后,寫入value;value在zk的node上以字節(jié)形式進(jìn)行存儲;初始值為100,并進(jìn)行打印
- 「查」 通過CuratorFramework.getData().forPath("/snowalker/path") ?可以讀取對應(yīng)節(jié)點(diǎn)的value
- 「改」 接著我們通過CuratorFramework.setData() 修改”/snowalker/path“ 對應(yīng)的value為120,并進(jìn)行打印
- 如果想要獲取某個(gè)子節(jié)點(diǎn),我們可以通過CuratorFramework.getChildren().forPath(path) 方法獲取,返回一個(gè)list;也就是說,zk的子節(jié)點(diǎn)是一對多的(zk文件系統(tǒng)是樹形結(jié)構(gòu))
- 「刪」 通過執(zhí)行CuratorFramework.delete().forPath(path) 能夠?qū)⒅付╬ath進(jìn)行刪除
運(yùn)行代碼,觀察到日志打印如下:
????啟動(dòng)curator客戶端
????100
????120
????[path]
使用zkcli連接zookeeper服務(wù)端,ls看一下 /snowalker 目錄下的節(jié)點(diǎn):
????[zk:?localhost:2181(CONNECTED)?6]?ls?/
????[snowalker]
????[zk:?localhost:2181(CONNECTED)?7]?ls?/snowalker
????[]
????[zk:?localhost:2181(CONNECTED)?8]
當(dāng)前只有 /snowalker 節(jié)點(diǎn)存在,子節(jié)點(diǎn)已經(jīng)被刪除。
可以看到,通過curator,我們通過幾行代碼便實(shí)現(xiàn)了對zk node的增刪改查
curator進(jìn)階操作
zk提供了分布式選主、watcher動(dòng)態(tài)監(jiān)聽等機(jī)制,能夠?yàn)榉植际较到y(tǒng)提供分布式協(xié)調(diào),配置實(shí)時(shí)變更通知等能力。Curator當(dāng)然也提供了對應(yīng)的API供我們進(jìn)行調(diào)用。
接下來我們就分別看一下如何使用Curator來使用這些能力。
集群選主
首先看一下如何利用Curator實(shí)現(xiàn)集群選主。
Curator提供兩種方式進(jìn)行集群選主,分別為:
- LeaderLatch方式
- LeaderElection方式
LeaderLatch方式
首先觀察一下leaderLatch選主方式調(diào)用方式:
????public?class?LeaderLatchDemo?{
????
????????public?static?void?main(String[]?args)?throws?Exception?{
????????????new?LeaderLatchDemo().leaderLatch();
????????}
????
????????public?void?leaderLatch()?throws?Exception?{
首先我們還是要實(shí)例化一個(gè)CuratorFramework客戶端,與zk服務(wù)端建立連接
????????????RetryPolicy?retry?=?new?ExponentialBackoffRetry(1000,?3);
????????????CuratorFramework?client?=
????????????????????CuratorFrameworkFactory.newClient(
????????????????????????????"127.0.0.1:2181",
????????????????????????????5000,
????????????????????????????3000,
????????????????????????????retry);
????????????client.start();
????????????System.out.println("啟動(dòng)curator客戶端");
接著注冊一個(gè)連接狀態(tài)監(jiān)聽器,在回調(diào)方法中根據(jù)返回的連接狀態(tài)進(jìn)行對應(yīng)操作。
當(dāng)連接狀態(tài)ConnectionState為LOST時(shí),表明客戶端到服務(wù)端的連接已經(jīng)斷開,如果當(dāng)前節(jié)點(diǎn)已經(jīng)是leader,那么我們就需要暫停leader身份下的一切事情。
如果我們查看源碼的話,會發(fā)現(xiàn)curator內(nèi)部會將是否為leader的狀態(tài)設(shè)置為false(已經(jīng)不是leader了)
????????????client.getConnectionStateListenable().addListener(new?ConnectionStateListener()?{
????????????????@Override
????????????????public?void?stateChanged(CuratorFramework?client,?ConnectionState?newState)?{
????????????????????switch?(newState)?{
????????????????????????case?LOST:?{
????????????????????????????break;
????????????????????????}
????????????????????}
????????????????}
????????????});
我們聲明一個(gè)路徑作為選主的依據(jù)。
通過 「LeaderLatch leaderLatch = new LeaderLatch(client, latchPath);」 實(shí)例化一個(gè)LeaderLatch實(shí)例,通過它進(jìn)行l(wèi)eader選舉操作。
????????????//?latch
????????????String?latchPath?=?"/snowalker/leader_latch";
????????????LeaderLatch?leaderLatch?=?new?LeaderLatch(client,?latchPath);
????????????//?開啟leader選舉過程
????????????leaderLatch.start();
????????????//?判斷當(dāng)前節(jié)點(diǎn)是否為leader
????????????boolean?hasLeadershipBefore?=?leaderLatch.hasLeadership();
????????????System.out.println("是否成為leader:"?+?hasLeadershipBefore);
????
????????????leaderLatch.await();????
當(dāng)通過 「leaderLatch.start()」 開啟leader選舉之后,我們需要調(diào)用 ?「leaderLatch.await()」 。
如果當(dāng)前的客戶端未成為leader,則會進(jìn)行等待,(內(nèi)部源碼實(shí)現(xiàn)是通過Object.wait()進(jìn)行阻塞) 直到成為leader后,當(dāng)前客戶端線程會被喚醒,繼續(xù)執(zhí)行后續(xù)邏輯。
這里說的后續(xù)邏輯,實(shí)際上就是客戶端作為leader節(jié)點(diǎn)需要執(zhí)行的業(yè)務(wù)邏輯。比如:hdfs中兩臺機(jī)器,當(dāng)其中一臺機(jī)器成為主節(jié)點(diǎn)就會以主節(jié)點(diǎn)的身份對外提供文件相關(guān)服務(wù),而另外一臺非leader機(jī)器則會await在這里,直到它成為leader才會作為主節(jié)點(diǎn)提供服務(wù)。(也就是說,只有被選為leader的節(jié)點(diǎn)才會真正的提供服務(wù),否則它看起來就好像 “假死” 了)
????????????boolean?hasLeadershipAfter?=?leaderLatch.hasLeadership();
????????????System.out.println("是否成為leader:"?+?hasLeadershipAfter);
????????????Thread.sleep(100000);
????????}
????}
最后我們再打印一下節(jié)點(diǎn)的狀態(tài),看當(dāng)前節(jié)點(diǎn)是否成為leader。
運(yùn)行效果
我們同時(shí)啟動(dòng)兩個(gè)LeaderLatchDemo主進(jìn)程,模擬雙節(jié)點(diǎn)下的leader選舉過程。
兩個(gè)客戶端控制臺打印如下:
?客戶端A
?
啟動(dòng)curator客戶端
是否成為leader:false
?客戶端B
?
啟動(dòng)curator客戶端
是否成為leader:false
是否成為leader:true
上述日志打印,表明開始階段,兩個(gè)客戶端均非leader。
當(dāng)經(jīng)過競爭之后,客戶端B成為leader,而客戶端A則阻塞。我們嘗試關(guān)閉客戶端B進(jìn)程,觀察客戶端A的控制臺日志打印:
?客戶端A
?
啟動(dòng)curator客戶端
是否成為leader:false
是否成為leader:true
我們發(fā)現(xiàn),客戶端A成為了leader,從阻塞中喚醒。
這個(gè)小demo直觀地為我們展現(xiàn)了通過leaderLatch進(jìn)行l(wèi)eader選舉的場景。
LeaderElection方式
curator為我們提供了一種更為簡潔的leader選舉方式,它就是 「LeaderElection」 方式。(「實(shí)際上」,LeaderElection與LeaderLatch在原理上幾乎沒有差別,他們的原理都是基于分布式鎖實(shí)現(xiàn)的,只不過LeaderElection方式在使用上更加簡潔,開發(fā)效率更高)
話不多說,直接上代碼:
????public?class?LeaderElectionDemo?{
????
????????public?static?void?main(String[]?args)?throws?InterruptedException?{
首先還是實(shí)例化一個(gè)CuratorFramework建立到zk服務(wù)端的連接。
????????????RetryPolicy?retry?=?new?ExponentialBackoffRetry(1000,?3);
????????????CuratorFramework?client?=
????????????????????CuratorFrameworkFactory.newClient(
????????????????????????????"127.0.0.1:2181",
????????????????????????????5000,
????????????????????????????3000,
????????????????????????????retry);
????????????client.start();
????????????System.out.println("啟動(dòng)curator客戶端");
接著定義一個(gè)leader選舉節(jié)點(diǎn),這個(gè)操作和LeaderLatch相似。
????????????String?election?=?"/leader/election";
這里就不同了,我們需要建立一個(gè)LeaderSelector實(shí)例,它接收CuratorFramework實(shí)例、選舉節(jié)點(diǎn)、以及一個(gè)LeaderSelectorListener ?Leader選舉監(jiān)聽器。
我們需要實(shí)現(xiàn)LeaderSelectorListener的回調(diào)方法:
takeLeadership回調(diào)中需要開發(fā)者實(shí)現(xiàn)當(dāng)成為leader之后的業(yè)務(wù)邏輯。當(dāng)一個(gè)客戶端成為leader之后,便會回調(diào)takeLeadership方法,執(zhí)行l(wèi)eader角色的業(yè)務(wù)邏輯
stateChanged方法需要開發(fā)者實(shí)現(xiàn)當(dāng)連接狀態(tài)發(fā)生變化之后的業(yè)務(wù)邏輯。比如:我們可以直接拋出異常,阻止leader業(yè)務(wù)邏輯繼續(xù)進(jìn)行。待另外的節(jié)點(diǎn)成為leader后執(zhí)行takeLeadership方法
??????LeaderSelector?leaderSelector?=?new?LeaderSelector(
??????????????client,
??????????????election,
??????????????new?LeaderSelectorListener()?{
??????????????????@Override
??????????????????public?void?takeLeadership(CuratorFramework?curatorFramework)?throws?Exception?{
??????????????????????System.out.println("你已經(jīng)成為leader");
??????????????????????//?在?這里干leader的所有事情,此時(shí)方法不能退出
??????????????????????Thread.sleep(Integer.MAX_VALUE);
??????????????????}
??????
??????????????????@Override
??????????????????public?void?stateChanged(CuratorFramework?curatorFramework,?ConnectionState?connectionState)?{
??????????????????????System.out.println("你已經(jīng)不是leader,鏈接狀態(tài)發(fā)生變化,connectionState"?+?connectionState);
??????????????????????if?(connectionState.equals(ConnectionState.LOST))?{
??????????????????????????throw?new?CancelLeadershipException();
??????????????????????}
??????????????????}
??????????????});
通過調(diào)用LeaderSelector.start之后,多個(gè)客戶端會在election節(jié)點(diǎn)下競爭leader角色。當(dāng)某個(gè)客戶端競爭leader成功,就會執(zhí)行takeLeadership回調(diào)方法通知當(dāng)前應(yīng)用節(jié)點(diǎn)已經(jīng)成為leader。接著執(zhí)行l(wèi)eader角色的邏輯即可
????????????leaderSelector.start();
????????????Thread.sleep(Integer.MAX_VALUE);
????????}
????}
運(yùn)行代碼
我們啟動(dòng)兩個(gè)LeaderElectionDemo客戶端,讓他們進(jìn)行l(wèi)eader角色的選舉操作,觀察控制臺輸出:
客戶端A打印如下:
????啟動(dòng)curator客戶端
????你已經(jīng)成為leader
客戶單B打印如下:
????啟動(dòng)curator客戶端
這表明,客戶端A競爭leader成功,并成功執(zhí)行了回調(diào)方法takeLeadership。客戶端B競爭leader失敗,進(jìn)程阻塞。
我們強(qiáng)制關(guān)閉客戶端A,此時(shí)客戶端B控制臺輸出如下:
????啟動(dòng)curator客戶端
????你已經(jīng)成為leader
這表明,客戶端A釋放了leader角色,客戶端B競爭成功,并開始執(zhí)行l(wèi)eader角色的方法。
「事實(shí)上」 LeaderElection方式內(nèi)部實(shí)現(xiàn)機(jī)制幾乎與LeaderLatch方式一模一樣,它本質(zhì)上也是通過分布式鎖競爭成為leader的。
具體到細(xì)節(jié)就是,LeaderElection是通過使用curator實(shí)現(xiàn)的mutex鎖進(jìn)行l(wèi)eader競爭。如果獲取到的鎖就是leader。如果競爭leader的時(shí)候競爭鎖失敗,則會阻塞,并為上個(gè)節(jié)點(diǎn)添加watcher。
當(dāng)上個(gè)節(jié)點(diǎn)對應(yīng)的客戶端down機(jī)或者長時(shí)間斷開連接,則順序臨時(shí)節(jié)點(diǎn)就消失了,此時(shí)watcher會通知后一個(gè)節(jié)點(diǎn)進(jìn)行加鎖。后面的節(jié)點(diǎn)加鎖成功便會成為leader角色。
我們發(fā)現(xiàn),這其實(shí)就是Curator的分布式鎖實(shí)現(xiàn)機(jī)制啊。
后續(xù)我們會對LeaderElection具體的代碼實(shí)現(xiàn)進(jìn)行展開講解。敬請期待。
小結(jié)
本文到此就告一段落了,我們對curator的基本使用以及重要的leader選舉特性進(jìn)行了全方位的講解和展示。
這里做個(gè)預(yù)告,接下來我會帶領(lǐng)讀者朋友們繼續(xù)學(xué)習(xí)curator對zookeeper的watcher機(jī)制的封裝和增強(qiáng)。
