Zookeeper 典型應(yīng)用場(chǎng)景場(chǎng)景
之前自己寫(xiě)了一些關(guān)于Zookeeper的基礎(chǔ)知識(shí),Zookeeper作為一種協(xié)調(diào)分布式應(yīng)用高性能的調(diào)度服務(wù),實(shí)際的應(yīng)用場(chǎng)景也非常的廣泛,這里主要通過(guò)幾個(gè)例子來(lái)具體的說(shuō)明Zookeeper在特定場(chǎng)景下的使用方式(下面的這些功能估計(jì)consul和etcd也能實(shí)現(xiàn),以后學(xué)到了再說(shuō)吧)。
2.1.一致性配置管理
我們?cè)陂_(kāi)發(fā)的時(shí)候,有時(shí)候需要獲取一些公共的配置,比如數(shù)據(jù)庫(kù)連接信息等,并且偶然可能需要更新配置。如果我們的服務(wù)器有N多臺(tái)的話(huà),那修改起來(lái)會(huì)特別的麻煩,并且還需要重新啟動(dòng)。這里Zookeeper就可以很方便的實(shí)現(xiàn)類(lèi)似的功能。
2.1.1.思路
1. 將公共的配置存放在Zookeeper的節(jié)點(diǎn)中
2. 應(yīng)用程序可以連接到Zookeeper中并對(duì)Zookeeper中配置節(jié)點(diǎn)進(jìn)行讀取或者修改(對(duì)于寫(xiě)操作可以進(jìn)行權(quán)限驗(yàn)證設(shè)置),下面是具體的流程圖:

2.1.2.事例
public?class?CommonConfig implements?Serializable{
??// 數(shù)據(jù)庫(kù)連接配置
??private?String?dbUrl;
??private?String?username;
??private?String?password;
??private?String?driverClass;
??
??public?CommonConfig() {}
??
??public?CommonConfig(String?dbUrl, String?username, String?password, String?driverClass) {
????super();
????this.dbUrl = dbUrl;
????this.username = username;
????this.password = password;
????this.driverClass = driverClass;
??}
?
??public?String?getDbUrl() {
????return?dbUrl;
??}
?
??public?void?setDbUrl(String?dbUrl) {
????this.dbUrl = dbUrl;
??}
?
??public?String?getUsername() {
????return?username;
??}
?
??public?void?setUsername(String?username) {
????this.username = username;
??}
?
??public?String?getPassword() {
????return?password;
??}
?
??public?void?setPassword(String?password) {
????this.password = password;
??}
?
??public?String?getDriverClass() {
????return?driverClass;
??}
?
??public?void?setDriverClass(String?driverClass) {
????this.driverClass = driverClass;
??}
??
??@Override
??public?String?toString() {
????return?"CommonConfig:{dbUrl:"?+ this.dbUrl +
????????", username:"?+ this.username +
????????", password:"?+ this.password +
????????", driverClass:"?+ this.driverClass + "}";
??}
}public?class?ZkConfigMng?{
??private?String nodePath = "/commConfig";
??private?CommonConfig commonConfig;
??private?ZkClient zkClient;
??
??public?CommonConfig initConfig(CommonConfig commonConfig)?{
????if(commonConfig == null) {
??????this.commonConfig = new?CommonConfig("jdbc:mysql://127.0.0.1:3306/mydata?useUnicode=true&characterEncoding=utf-8",
??????????"root", "root", "com.mysql.jdbc.Driver");
????} else?{
??????this.commonConfig = commonConfig;
????}
????return?this.commonConfig;
??}
??
??/**
???* 更新配置
???*
???* @param?commonConfig
???* @return
???*/
??public?CommonConfig update(CommonConfig commonConfig)?{
????if(commonConfig != null) {
??????this.commonConfig = commonConfig;
????}
????syncConfigToZookeeper();
????return?this.commonConfig;
??}
??
??public?void?syncConfigToZookeeper()?{
????if(zkClient == null) {
??????zkClient = new?ZkClient("127.0.0.1:2181");
????}
????if(!zkClient.exists(nodePath)) {
??????zkClient.createPersistent(nodePath);
????}
????zkClient.writeData(nodePath, commonConfig);
??}
}public?class?ZkConfigClient?implements?Runnable?{
??
??private?String nodePath = "/commConfig";
??
??private?CommonConfig commonConfig;
?
??@Override
??public?void?run()?{
????ZkClient zkClient = new?ZkClient(new?ZkConnection("127.0.0.1:2181", 5000));
????while?(!zkClient.exists(nodePath)) {
??????System.out.println("配置節(jié)點(diǎn)不存在!");
??????try?{
????????TimeUnit.SECONDS.sleep(1);
??????} catch?(InterruptedException e) {
????????e.printStackTrace();
??????}
????}
????// 獲取節(jié)點(diǎn)
????commonConfig = (CommonConfig)zkClient.readData(nodePath);
????System.out.println(commonConfig.toString());
????zkClient.subscribeDataChanges(nodePath, new?IZkDataListener() {
??????
??????@Override
??????public?void?handleDataDeleted(String dataPath)?throws?Exception {
????????if(dataPath.equals(nodePath)) {
??????????System.out.println("節(jié)點(diǎn):"?+ dataPath + "被刪除了!");
????????}
??????}
??????
??????@Override
??????public?void?handleDataChange(String dataPath, Object data)?throws?Exception {
????????if(dataPath.equals(nodePath)) {
??????????System.out.println("節(jié)點(diǎn):"?+ dataPath + ", 數(shù)據(jù):"?+ data + " - 更新");
??????????commonConfig = (CommonConfig) data;
????????}
??????}
????});
??}
?
}下面啟動(dòng)Main函數(shù)
配置管理服務(wù)啟動(dòng)
public?static?void?main(String[] args)?throws?InterruptedException {
????SpringApplication.run(ZookeeperApiDemoApplication.class, args);
????
????ZkConfigMng zkConfigMng = new?ZkConfigMng();
????zkConfigMng.initConfig(null);
????zkConfigMng.syncConfigToZookeeper();
????TimeUnit.SECONDS.sleep(10);
????
????// 修改值
????zkConfigMng.update(new?CommonConfig("jdbc:mysql://192.168.1.122:3306/mydata?useUnicode=true&characterEncoding=utf-8",
????????"root", "wxh", "com.mysql.jdbc.Driver"));
??}
}
public?static?void?main(String[] args)?throws?InterruptedException {
????SpringApplication.run(ZookeeperApiDemoApplication.class, args);
?
????ExecutorService executorService = Executors.newFixedThreadPool(3);
????// 模擬多個(gè)客戶(hù)端獲取配置
????executorService.submit(new?ZkConfigClient());
????executorService.submit(new?ZkConfigClient());
????executorService.submit(new?ZkConfigClient());
??}
}
public?class?DistributedLock?{
??
??// 常亮
??static?class?Constant?{
????private?static?final?int?SESSION_TIMEOUT = 10000;
????private?static?final?String CONNECTION_STRING = "127.0.0.1:2181";
????private?static?final?String LOCK_NODE = "/distributed_lock";
????private?static?final?String CHILDREN_NODE = "/task_";
??}
??
??private?ZkClient zkClient;
??
??public?DistributedLock()?{
????// 連接到Zookeeper
????zkClient = new?ZkClient(new?ZkConnection(Constant.CONNECTION_STRING));
????if(!zkClient.exists(Constant.LOCK_NODE)) {
??????zkClient.create(Constant.LOCK_NODE, "分布式鎖節(jié)點(diǎn)", CreateMode.PERSISTENT);
????}
??}
??
??public?String getLock()?{
????try?{
??????// 1。在Zookeeper指定節(jié)點(diǎn)下創(chuàng)建臨時(shí)順序節(jié)點(diǎn)
??????String lockName = zkClient.createEphemeralSequential(Constant.LOCK_NODE + Constant.CHILDREN_NODE, "");
??????// 嘗試獲取鎖
??????acquireLock(lockName);
??????return?lockName;
????} catch(Exception e) {
??????e.printStackTrace();
????}
????
????return?null;
??}
??
??/**
???* 獲取鎖
???* @throws?InterruptedException
???*/
??public?Boolean acquireLock(String lockName)?throws?InterruptedException {
????// 2.獲取lock節(jié)點(diǎn)下的所有子節(jié)點(diǎn)
????List childrenList = zkClient.getChildren(Constant.LOCK_NODE);
????// 3.對(duì)子節(jié)點(diǎn)進(jìn)行排序,獲取最小值
????Collections.sort(childrenList, new?Comparator() {
??????@Override
??????public?int?compare(String o1, String o2)?{
????????return?Integer.parseInt(o1.split("_")[1]) - Integer.parseInt(o2.split("_")[1]);
??????}
??????
????});
????// 4.判斷當(dāng)前創(chuàng)建的節(jié)點(diǎn)是否在第一位
????int?lockPostion = childrenList.indexOf(lockName.split("/")[lockName.split("/").length - 1]);
????if(lockPostion < 0) {
??????// 不存在該節(jié)點(diǎn)
??????throw?new?ZkNodeExistsException("不存在的節(jié)點(diǎn):"?+ lockName);
????} else?if?(lockPostion == 0) {
??????// 獲取到鎖
??????System.out.println("獲取到鎖:"?+ lockName);
??????return?true;
????} else?if?(lockPostion > 0) {
??????// 未獲取到鎖,阻塞
??????System.out.println("...... 未獲取到鎖,阻塞等待 。。。。。。");
??????// 5.如果未獲取得到鎖,監(jiān)聽(tīng)當(dāng)前創(chuàng)建的節(jié)點(diǎn)前一位的節(jié)點(diǎn)
??????final?CountDownLatch latch = new?CountDownLatch(1);
??????IZkDataListener listener = new?IZkDataListener() {
????????
????????@Override
????????public?void?handleDataDeleted(String dataPath)?throws?Exception {
??????????// 6.前一個(gè)節(jié)點(diǎn)被刪除,當(dāng)不保證輪到自己
??????????System.out.println("。。。。。。前一個(gè)節(jié)點(diǎn)被刪除 。。。。。。");
??????????acquireLock(lockName);
??????????latch.countDown();
????????}
????????
????????@Override
????????public?void?handleDataChange(String dataPath, Object data)?throws?Exception {
??????????// 不用理會(huì)
????????}
??????};
??????try?{
????????zkClient.subscribeDataChanges(Constant.LOCK_NODE + "/"?+ childrenList.get(lockPostion - 1), listener);
????????latch.await();
??????} finally?{
????????zkClient.unsubscribeDataChanges(Constant.LOCK_NODE + "/"?+ childrenList.get(lockPostion - 1), listener);
??????}
????}
????return?false;
??}
??
??/**
???* 釋放鎖(刪除節(jié)點(diǎn))
???*
???* @param?lockName
???*/
??public?void?releaseLock(String lockName)?{
????zkClient.delete(lockName);
??}
??
??public?void?closeZkClient()?{
????zkClient.close();
??}
}
?
@SpringBootApplication
public?class?ZookeeperDemoApplication?{
?
??public?static?void?main(String[] args)?throws?InterruptedException {
????SpringApplication.run(ZookeeperDemoApplication.class, args);
????
????DistributedLock lock = new?DistributedLock();
????String lockName = lock.getLock();
????/**
?????* 執(zhí)行我們的業(yè)務(wù)邏輯
?????*/
????if(lockName != null) {
??????lock.releaseLock(lockName);
????}
????
????lock.closeZkClient();
??}
} 1. 首先利用Zookeeper中臨時(shí)順序節(jié)點(diǎn)的特點(diǎn)
2. 當(dāng)生產(chǎn)者創(chuàng)建節(jié)點(diǎn)生產(chǎn)時(shí),需要判斷父節(jié)點(diǎn)下臨時(shí)順序子節(jié)點(diǎn)的個(gè)數(shù),如果達(dá)到了上限,則阻塞等待;如果沒(méi)有達(dá)到,就創(chuàng)建節(jié)點(diǎn)。
3. 當(dāng)消費(fèi)者獲取節(jié)點(diǎn)時(shí),如果父節(jié)點(diǎn)中不存在臨時(shí)順序子節(jié)點(diǎn),則阻塞等待;如果有子節(jié)點(diǎn),則獲取執(zhí)行自己的業(yè)務(wù),執(zhí)行完畢后刪除該節(jié)點(diǎn)即可。
4. 獲取時(shí)獲取最小值,保證FIFO特性。
public?interface?AppConstant?{
??static?String ZK_CONNECT_STR = "127.0.0.1:2181";
??static?String NODE_PATH = "/mailbox";
??static?String CHILD_NODE_PATH = "/mail_";
??static?int?MAILBOX_SIZE = 10;
}
?
public?class?MailConsumer?implements?Runnable, AppConstant{
?
??private?ZkClient zkClient;
??private?Lock lock;
??private?Condition condition;
??
??public?MailConsumer()?{
????lock = new?ReentrantLock();
????condition = lock.newCondition();
????zkClient = new?ZkClient(new?ZkConnection(ZK_CONNECT_STR));
????System.out.println("sucess connected to zookeeper server!");
????// 不存在就創(chuàng)建mailbox節(jié)點(diǎn)
????if(!zkClient.exists(NODE_PATH)) {
??????zkClient.create(NODE_PATH, "this is mailbox", CreateMode.PERSISTENT);
????}
??}
?
??@Override
??public?void?run()?{
????IZkChildListener listener = new?IZkChildListener() {
??????@Override
??????public?void?handleChildChange(String parentPath, List currentChilds) ?throws?Exception {
????????System.out.println("Znode["+parentPath + "] size:"?+ currentChilds.size());
????????// 還是要判斷郵箱是否為空
????????if(currentChilds.size() > 0) {
??????????// 喚醒等待的線(xiàn)程
??????????try?{
????????????lock.lock();
????????????condition.signal();
??????????} catch?(Exception e) {
????????????e.printStackTrace();
??????????} finally?{
????????????lock.unlock();
??????????}
????????}
??????}
????};
????// 監(jiān)視子節(jié)點(diǎn)的改變,不用放用while循環(huán)中,監(jiān)聽(tīng)一次就行了,不需要重復(fù)綁定
????zkClient.subscribeChildChanges(NODE_PATH, listener);
????try?{
??????//循環(huán)隨機(jī)發(fā)送郵件模擬真是情況
??????while(true) {
????????// 判斷是否可以發(fā)送郵件
????????checkMailReceive();
????????// 接受郵件
????????List mailList = zkClient.getChildren(NODE_PATH);
????????// 如果mailsize==0,也沒(méi)有關(guān)系;可以直接循環(huán)獲取就行了
????????if(mailList.size() > 0) {
??????????Collections.sort(mailList, new?Comparator() {
????????????@Override
????????????public?int?compare(String o1, String o2)?{
??????????????return?Integer.parseInt(o1.split("_")[1]) - Integer.parseInt(o2.split("_")[1]);
????????????}
??????????});
??????????// 模擬郵件處理(0-1S)
??????????TimeUnit.MILLISECONDS.sleep(new?Random().nextInt(1000));
??????????zkClient.delete(NODE_PATH + "/"?+ mailList.get(0));
??????????System.out.println("mail has been received:"?+ NODE_PATH + "/"?+ mailList.get(0));
????????}
??????}
????}catch?(Exception e) {
??????e.printStackTrace();
????} finally?{
??????zkClient.unsubscribeChildChanges(NODE_PATH, listener);
????}
??}
?
??private?void?checkMailReceive()?{
????try?{
??????lock.lock();
??????// 判斷郵箱是為空
??????List mailList = zkClient.getChildren(NODE_PATH);
??????System.out.println("mailbox size: "?+ mailList.size());
??????if(mailList.size() == 0) {
????????// 郵箱為空,阻塞消費(fèi)者,直到郵箱有郵件
????????System.out.println("mailbox is empty, please wait 。。。");
????????condition.await();
????????// checkMailReceive();
??????}
????} catch?(Exception e) {
??????e.printStackTrace();
????} finally?{
??????lock.unlock();
????}
??}
}
?
public?class?MailProducer?implements?Runnable, AppConstant{
??
??private?ZkClient zkClient;
??private?Lock lock;
??private?Condition condition;
??
??/**
???* 初始化狀態(tài)
???*/
??public?MailProducer()?{
????lock = new?ReentrantLock();
????condition = lock.newCondition();
????zkClient = new?ZkClient(new?ZkConnection(ZK_CONNECT_STR));
????System.out.println("sucess connected to zookeeper server!");
????// 不存在就創(chuàng)建mailbox節(jié)點(diǎn)
????if(!zkClient.exists(NODE_PATH)) {
??????zkClient.create(NODE_PATH, "this is mailbox", CreateMode.PERSISTENT);
????}
??}
?
??@Override
??public?void?run()?{
????IZkChildListener listener = new?IZkChildListener() {
??????@Override
??????public?void?handleChildChange(String parentPath, List currentChilds) ?throws?Exception {
????????System.out.println("Znode["+parentPath + "] size:"?+ currentChilds.size());
????????// 還是要判斷郵箱是否已滿(mǎn)
????????if(currentChilds.size() < MAILBOX_SIZE) {
??????????// 喚醒等待的線(xiàn)程
??????????try?{
????????????lock.lock();
????????????condition.signal();
??????????} catch?(Exception e) {
????????????e.printStackTrace();
??????????} finally?{
????????????lock.unlock();
??????????}
????????}
??????}
????};
????// 監(jiān)視子節(jié)點(diǎn)的改變,不用放用while循環(huán)中,監(jiān)聽(tīng)一次就行了,不需要重復(fù)綁定
????zkClient.subscribeChildChanges(NODE_PATH, listener);
????try?{
??????//循環(huán)隨機(jī)發(fā)送郵件模擬真是情況
??????while(true) {
????????// 判斷是否可以發(fā)送郵件
????????checkMailSend();
????????// 發(fā)送郵件
????????String cretePath = zkClient.createEphemeralSequential(NODE_PATH + CHILD_NODE_PATH, "your mail");
????????System.out.println("your mail has been send:"?+ cretePath);
????????// 模擬隨機(jī)間隔的發(fā)送郵件(0-10S)
????????TimeUnit.MILLISECONDS.sleep(new?Random().nextInt(1000));
??????}
????}catch?(Exception e) {
??????e.printStackTrace();
????} finally?{
??????zkClient.unsubscribeChildChanges(NODE_PATH, listener);
????}
??}
??
??private?void?checkMailSend()?{
????try?{
??????lock.lock();
??????// 判斷郵箱是否已滿(mǎn)
??????List mailList = zkClient.getChildren(NODE_PATH);
??????System.out.println("mailbox size: "?+ mailList.size());
??????if(mailList.size() >= MAILBOX_SIZE) {
????????// 郵箱已滿(mǎn),阻塞生產(chǎn)者,直到郵箱有空間
????????System.out.println("mailbox is full, please wait 。。。");
????????condition.await();
????????checkMailSend();
??????}
????} catch?(Exception e) {
??????e.printStackTrace();
????} finally?{
??????lock.unlock();
????}
??}
} 
/**
?* 服務(wù)提供者
?*
?* @author Administrator
?*
?*/
public?class?ServiceProvider?{
??// 靜態(tài)常量
??static?String ZK_CONNECT_STR = "127.0.0.1:2181";
??static?String NODE_PATH = "/service";
??static?String SERIVCE_NAME = "/myService";
??
??private?ZkClient zkClient;
??
??public?ServiceProvider() {
????zkClient = new?ZkClient(new?ZkConnection(ZK_CONNECT_STR));
????System.out.println("sucess connected to zookeeper server!");
????// 不存在就創(chuàng)建NODE_PATH節(jié)點(diǎn)
????if(!zkClient.exists(NODE_PATH)) {
??????zkClient.create(NODE_PATH, "this is mailbox", CreateMode.PERSISTENT);
????}
??}
??
??public?void?registryService(String localIp, Object obj) {
????if(!zkClient.exists(NODE_PATH + SERIVCE_NAME)) {
??????zkClient.create(NODE_PATH + SERIVCE_NAME, "provider services list", CreateMode.PERSISTENT);
????}
????// 對(duì)自己的服務(wù)進(jìn)行注冊(cè)
????zkClient.createEphemeral(NODE_PATH + SERIVCE_NAME + "/"?+ localIp, obj);
????System.out.println("注冊(cè)成功!["?+ localIp + "]");
??}
}
?
/**
?* 消費(fèi)者,通過(guò)某種均衡負(fù)載算法選擇某一個(gè)提供者
?*
?* @author Administrator
?*
?*/
public?class?ServiceConsumer?{
??// 靜態(tài)常量
??static?String ZK_CONNECT_STR = "127.0.0.1:2181";
??static?String NODE_PATH = "/service";
??static?String SERIVCE_NAME = "/myService";
??
??private?List serviceList = new?ArrayList();
??
??private?ZkClient zkClient;
??
??public?ServiceConsumer() {
????zkClient = new?ZkClient(new?ZkConnection(ZK_CONNECT_STR));
????System.out.println("sucess connected to zookeeper server!");
????// 不存在就創(chuàng)建NODE_PATH節(jié)點(diǎn)
????if(!zkClient.exists(NODE_PATH)) {
??????zkClient.create(NODE_PATH, "this is mailbox", CreateMode.PERSISTENT);
????}
??}
??
??/**
???* 訂閱服務(wù)
???*/
??public?void?subscribeSerivce() {
????serviceList = zkClient.getChildren(NODE_PATH + SERIVCE_NAME);
????zkClient.subscribeChildChanges(NODE_PATH + SERIVCE_NAME, new?IZkChildListener() {
??????@Override
??????public?void?handleChildChange(String parentPath, List currentChilds ) throws Exception {
????????serviceList = currentChilds;
??????}
????});
??}
??
??/**
???* 模擬調(diào)用服務(wù)
???*/
??public?void?consume() {
????//負(fù)載均衡算法獲取某臺(tái)機(jī)器調(diào)用服務(wù)
????int?index = new?Random().nextInt(serviceList.size());
????System.out.println("調(diào)用["?+ NODE_PATH + SERIVCE_NAME + "]服務(wù):"?+ serviceList.get(index));
??}
} 【END】
