Zookeeper 最典型的應(yīng)用場(chǎng)景(理論 + 實(shí)戰(zhàn))
300本計(jì)算機(jī)編程的經(jīng)典書籍下載
AI全套:Python3+TensorFlow打造人臉識(shí)別智能小程序
最新人工智能資料-Google工程師親授 Tensorflow-入門到進(jìn)階
黑馬頭條項(xiàng)目 - Java Springboot2.0(視頻、資料、代碼和講義)14天完整版
1.前言
2.具體應(yīng)用
2.1.一致性配置管理
我們?cè)陂_發(fā)的時(shí)候,有時(shí)候需要獲取一些公共的配置,比如數(shù)據(jù)庫(kù)連接信息等,并且偶然可能需要更新配置。如果我們的服務(wù)器有N多臺(tái)的話,那修改起來會(huì)特別的麻煩,并且還需要重新啟動(dòng)。這里Zookeeper就可以很方便的實(shí)現(xiàn)類似的功能。
2.1.1.思路
將公共的配置存放在Zookeeper的節(jié)點(diǎn)中
應(yīng)用程序可以連接到Zookeeper中并對(duì)Zookeeper中配置節(jié)點(diǎn)進(jìn)行讀取或者修改(對(duì)于寫操作可以進(jìn)行權(quán)限驗(yàn)證設(shè)置),下面是具體的流程圖:
2.1.2.事例
數(shù)據(jù)庫(kù)配置信息一致性的維護(hù)
配置類:
public class CommonConfig implements Serializable{
// 數(shù)據(jù)庫(kù)連接配置
private String dbUrl;
private String username;
private String password;
private String driverClass;
public CommonConfig() {}
public CommonConfig(String dbUrl, String username, String password, String driverClass) {
super();
this.dbUrl = dbUrl;
this.username = username;
this.password = password;
this.driverClass = driverClass;
}
public String getDbUrl() {
return dbUrl;
}
public void setDbUrl(String dbUrl) {
this.dbUrl = dbUrl;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getDriverClass() {
return driverClass;
}
public void setDriverClass(String driverClass) {
this.driverClass = driverClass;
}
@Override
public String toString() {
return "CommonConfig:{dbUrl:" + this.dbUrl +
", username:" + this.username +
", password:" + this.password +
", driverClass:" + this.driverClass + "}";
}
}
獲取本地配置信息 修改配置,并同步
同步配置信息到Zookeeper服務(wù)器
public class ZkConfigMng {
private String nodePath = "/commConfig";
private CommonConfig commonConfig;
private ZkClient zkClient;
public CommonConfig initConfig(CommonConfig commonConfig) {
if(commonConfig == null) {
this.commonConfig = new CommonConfig("jdbc:mysql://127.0.0.1:3306/mydata?useUnicode=true&characterEncoding=utf-8",
"root", "root", "com.mysql.jdbc.Driver");
} else {
this.commonConfig = commonConfig;
}
return this.commonConfig;
}
/**
* 更新配置
*
* @param commonConfig
* @return
*/
public CommonConfig update(CommonConfig commonConfig) {
if(commonConfig != null) {
this.commonConfig = commonConfig;
}
syncConfigToZookeeper();
return this.commonConfig;
}
public void syncConfigToZookeeper() {
if(zkClient == null) {
zkClient = new ZkClient("127.0.0.1:2181");
}
if(!zkClient.exists(nodePath)) {
zkClient.createPersistent(nodePath);
}
zkClient.writeData(nodePath, commonConfig);
}
}
public class ZkConfigClient implements Runnable {
private String nodePath = "/commConfig";
private CommonConfig commonConfig;
@Override
public void run() {
ZkClient zkClient = new ZkClient(new ZkConnection("127.0.0.1:2181", 5000));
while (!zkClient.exists(nodePath)) {
System.out.println("配置節(jié)點(diǎn)不存在!");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 獲取節(jié)點(diǎn)
commonConfig = (CommonConfig)zkClient.readData(nodePath);
System.out.println(commonConfig.toString());
zkClient.subscribeDataChanges(nodePath, new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
if(dataPath.equals(nodePath)) {
System.out.println("節(jié)點(diǎn):" + dataPath + "被刪除了!");
}
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
if(dataPath.equals(nodePath)) {
System.out.println("節(jié)點(diǎn):" + dataPath + ", 數(shù)據(jù):" + data + " - 更新");
commonConfig = (CommonConfig) data;
}
}
});
}
}
配置管理服務(wù)啟動(dòng)
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(ZookeeperApiDemoApplication.class, args);
ZkConfigMng zkConfigMng = new ZkConfigMng();
zkConfigMng.initConfig(null);
zkConfigMng.syncConfigToZookeeper();
TimeUnit.SECONDS.sleep(10);
// 修改值
zkConfigMng.update(new CommonConfig("jdbc:mysql://192.168.1.122:3306/mydata?useUnicode=true&characterEncoding=utf-8",
"root", "wxh", "com.mysql.jdbc.Driver"));
}
}
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(ZookeeperApiDemoApplication.class, args);
ExecutorService executorService = Executors.newFixedThreadPool(3);
// 模擬多個(gè)客戶端獲取配置
executorService.submit(new ZkConfigClient());
executorService.submit(new ZkConfigClient());
executorService.submit(new ZkConfigClient());
}
}
2.2.分布式鎖
在我們?nèi)粘5拈_發(fā)中,如果是單個(gè)進(jìn)程中對(duì)共享資源的訪問,我們只需要用synchronized或者lock就能實(shí)現(xiàn)互斥操作。但是對(duì)于跨進(jìn)程、跨主機(jī)、跨網(wǎng)絡(luò)的共享資源似乎就無能為力了。
2.1.1.思路
首先zookeeper中我們可以創(chuàng)建一個(gè)
/distributed_lock持久化節(jié)點(diǎn)然后再在
/distributed_lock節(jié)點(diǎn)下創(chuàng)建自己的臨時(shí)順序節(jié)點(diǎn),比如:/distributed_lock/task_00000000008獲取所有的
/distributed_lock下的所有子節(jié)點(diǎn),并排序判讀自己創(chuàng)建的節(jié)點(diǎn)是否最小值(第一位)
如果是,則獲取得到鎖,執(zhí)行自己的業(yè)務(wù)邏輯,最后刪除這個(gè)臨時(shí)節(jié)點(diǎn)。
如果不是最小值,則需要監(jiān)聽自己創(chuàng)建節(jié)點(diǎn)前一位節(jié)點(diǎn)的數(shù)據(jù)變化,并阻塞。
當(dāng)前一位節(jié)點(diǎn)被刪除時(shí),我們需要通過遞歸來判斷自己創(chuàng)建的節(jié)點(diǎn)是否在是最小的,如果是則執(zhí)行5);如果不是則執(zhí)行6)(就是遞歸循環(huán)的判斷)
下面是具體的流程圖:

2.1.3.事例
public class DistributedLock {
// 常亮
static class Constant {
private static final int SESSION_TIMEOUT = 10000;
private static final String CONNECTION_STRING = "127.0.0.1:2181";
private static final String LOCK_NODE = "/distributed_lock";
private static final String CHILDREN_NODE = "/task_";
}
private ZkClient zkClient;
public DistributedLock() {
// 連接到Zookeeper
zkClient = new ZkClient(new ZkConnection(Constant.CONNECTION_STRING));
if(!zkClient.exists(Constant.LOCK_NODE)) {
zkClient.create(Constant.LOCK_NODE, "分布式鎖節(jié)點(diǎn)", CreateMode.PERSISTENT);
}
}
public String getLock() {
try {
// 1。在Zookeeper指定節(jié)點(diǎn)下創(chuàng)建臨時(shí)順序節(jié)點(diǎn)
String lockName = zkClient.createEphemeralSequential(Constant.LOCK_NODE + Constant.CHILDREN_NODE, "");
// 嘗試獲取鎖
acquireLock(lockName);
return lockName;
} catch(Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 獲取鎖
* @throws InterruptedException
*/
public Boolean acquireLock(String lockName) throws InterruptedException {
// 2.獲取lock節(jié)點(diǎn)下的所有子節(jié)點(diǎn)
List<String> childrenList = zkClient.getChildren(Constant.LOCK_NODE);
// 3.對(duì)子節(jié)點(diǎn)進(jìn)行排序,獲取最小值
Collections.sort(childrenList, new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
return Integer.parseInt(o1.split("_")[1]) - Integer.parseInt(o2.split("_")[1]);
}
});
// 4.判斷當(dāng)前創(chuàng)建的節(jié)點(diǎn)是否在第一位
int lockPostion = childrenList.indexOf(lockName.split("/")[lockName.split("/").length - 1]);
if(lockPostion < 0) {
// 不存在該節(jié)點(diǎn)
throw new ZkNodeExistsException("不存在的節(jié)點(diǎn):" + lockName);
} else if (lockPostion == 0) {
// 獲取到鎖
System.out.println("獲取到鎖:" + lockName);
return true;
} else if (lockPostion > 0) {
// 未獲取到鎖,阻塞
System.out.println("...... 未獲取到鎖,阻塞等待 。。。。。。");
// 5.如果未獲取得到鎖,監(jiān)聽當(dāng)前創(chuàng)建的節(jié)點(diǎn)前一位的節(jié)點(diǎn)
final CountDownLatch latch = new CountDownLatch(1);
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
// 6.前一個(gè)節(jié)點(diǎn)被刪除,當(dāng)不保證輪到自己
System.out.println("。。。。。。前一個(gè)節(jié)點(diǎn)被刪除 。。。。。。");
acquireLock(lockName);
latch.countDown();
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
// 不用理會(huì)
}
};
try {
zkClient.subscribeDataChanges(Constant.LOCK_NODE + "/" + childrenList.get(lockPostion - 1), listener);
latch.await();
} finally {
zkClient.unsubscribeDataChanges(Constant.LOCK_NODE + "/" + childrenList.get(lockPostion - 1), listener);
}
}
return false;
}
/**
* 釋放鎖(刪除節(jié)點(diǎn))
*
* @param lockName
*/
public void releaseLock(String lockName) {
zkClient.delete(lockName);
}
public void closeZkClient() {
zkClient.close();
}
}
@SpringBootApplication
public class ZookeeperDemoApplication {
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(ZookeeperDemoApplication.class, args);
DistributedLock lock = new DistributedLock();
String lockName = lock.getLock();
/**
* 執(zhí)行我們的業(yè)務(wù)邏輯
*/
if(lockName != null) {
lock.releaseLock(lockName);
}
lock.closeZkClient();
}
}
2.3.分布式隊(duì)列
在日常使用中,特別是像生產(chǎn)者消費(fèi)者模式中,經(jīng)常會(huì)使用BlockingQueue來充當(dāng)緩沖區(qū)的角色。但是在分布式系統(tǒng)中這種方式就不能使用BlockingQueue來實(shí)現(xiàn)了,但是Zookeeper可以實(shí)現(xiàn)。
2.1.1.思路
首先利用Zookeeper中臨時(shí)順序節(jié)點(diǎn)的特點(diǎn) 當(dāng)生產(chǎn)者創(chuàng)建節(jié)點(diǎn)生產(chǎn)時(shí),需要判斷父節(jié)點(diǎn)下臨時(shí)順序子節(jié)點(diǎn)的個(gè)數(shù),如果達(dá)到了上限,則阻塞等待;如果沒有達(dá)到,就創(chuàng)建節(jié)點(diǎn)。 當(dāng)消費(fèi)者獲取節(jié)點(diǎn)時(shí),如果父節(jié)點(diǎn)中不存在臨時(shí)順序子節(jié)點(diǎn),則阻塞等待;如果有子節(jié)點(diǎn),則獲取執(zhí)行自己的業(yè)務(wù),執(zhí)行完畢后刪除該節(jié)點(diǎn)即可。 獲取時(shí)獲取最小值,保證FIFO特性。
2.1.2.事例
這個(gè)是一個(gè)消費(fèi)者對(duì)一個(gè)生產(chǎn)者,如果是多個(gè)消費(fèi)者對(duì)多個(gè)生產(chǎn)者,對(duì)代碼需要調(diào)整。
public interface AppConstant {
static String ZK_CONNECT_STR = "127.0.0.1:2181";
static String NODE_PATH = "/mailbox";
static String CHILD_NODE_PATH = "/mail_";
static int MAILBOX_SIZE = 10;
}
public class MailConsumer implements Runnable, AppConstant{
private ZkClient zkClient;
private Lock lock;
private Condition condition;
public MailConsumer() {
lock = new ReentrantLock();
condition = lock.newCondition();
zkClient = new ZkClient(new ZkConnection(ZK_CONNECT_STR));
System.out.println("sucess connected to zookeeper server!");
// 不存在就創(chuàng)建mailbox節(jié)點(diǎn)
if(!zkClient.exists(NODE_PATH)) {
zkClient.create(NODE_PATH, "this is mailbox", CreateMode.PERSISTENT);
}
}
@Override
public void run() {
IZkChildListener listener = new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println("Znode["+parentPath + "] size:" + currentChilds.size());
// 還是要判斷郵箱是否為空
if(currentChilds.size() > 0) {
// 喚醒等待的線程
try {
lock.lock();
condition.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
};
// 監(jiān)視子節(jié)點(diǎn)的改變,不用放用while循環(huán)中,監(jiān)聽一次就行了,不需要重復(fù)綁定
zkClient.subscribeChildChanges(NODE_PATH, listener);
try {
//循環(huán)隨機(jī)發(fā)送郵件模擬真是情況
while(true) {
// 判斷是否可以發(fā)送郵件
checkMailReceive();
// 接受郵件
List<String> mailList = zkClient.getChildren(NODE_PATH);
// 如果mailsize==0,也沒有關(guān)系;可以直接循環(huán)獲取就行了
if(mailList.size() > 0) {
Collections.sort(mailList, new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
return Integer.parseInt(o1.split("_")[1]) - Integer.parseInt(o2.split("_")[1]);
}
});
// 模擬郵件處理(0-1S)
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
zkClient.delete(NODE_PATH + "/" + mailList.get(0));
System.out.println("mail has been received:" + NODE_PATH + "/" + mailList.get(0));
}
}
}catch (Exception e) {
e.printStackTrace();
} finally {
zkClient.unsubscribeChildChanges(NODE_PATH, listener);
}
}
private void checkMailReceive() {
try {
lock.lock();
// 判斷郵箱是為空
List<String> mailList = zkClient.getChildren(NODE_PATH);
System.out.println("mailbox size: " + mailList.size());
if(mailList.size() == 0) {
// 郵箱為空,阻塞消費(fèi)者,直到郵箱有郵件
System.out.println("mailbox is empty, please wait 。。。");
condition.await();
// checkMailReceive();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class MailProducer implements Runnable, AppConstant{
private ZkClient zkClient;
private Lock lock;
private Condition condition;
/**
* 初始化狀態(tài)
*/
public MailProducer() {
lock = new ReentrantLock();
condition = lock.newCondition();
zkClient = new ZkClient(new ZkConnection(ZK_CONNECT_STR));
System.out.println("sucess connected to zookeeper server!");
// 不存在就創(chuàng)建mailbox節(jié)點(diǎn)
if(!zkClient.exists(NODE_PATH)) {
zkClient.create(NODE_PATH, "this is mailbox", CreateMode.PERSISTENT);
}
}
@Override
public void run() {
IZkChildListener listener = new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println("Znode["+parentPath + "] size:" + currentChilds.size());
// 還是要判斷郵箱是否已滿
if(currentChilds.size() < MAILBOX_SIZE) {
// 喚醒等待的線程
try {
lock.lock();
condition.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
};
// 監(jiān)視子節(jié)點(diǎn)的改變,不用放用while循環(huán)中,監(jiān)聽一次就行了,不需要重復(fù)綁定
zkClient.subscribeChildChanges(NODE_PATH, listener);
try {
//循環(huán)隨機(jī)發(fā)送郵件模擬真是情況
while(true) {
// 判斷是否可以發(fā)送郵件
checkMailSend();
// 發(fā)送郵件
String cretePath = zkClient.createEphemeralSequential(NODE_PATH + CHILD_NODE_PATH, "your mail");
System.out.println("your mail has been send:" + cretePath);
// 模擬隨機(jī)間隔的發(fā)送郵件(0-10S)
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
}
}catch (Exception e) {
e.printStackTrace();
} finally {
zkClient.unsubscribeChildChanges(NODE_PATH, listener);
}
}
private void checkMailSend() {
try {
lock.lock();
// 判斷郵箱是否已滿
List<String> mailList = zkClient.getChildren(NODE_PATH);
System.out.println("mailbox size: " + mailList.size());
if(mailList.size() >= MAILBOX_SIZE) {
// 郵箱已滿,阻塞生產(chǎn)者,直到郵箱有空間
System.out.println("mailbox is full, please wait 。。。");
condition.await();
checkMailSend();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
2.4.均衡負(fù)載
首先我們需要簡(jiǎn)單的理解分布式和集群,通俗點(diǎn)說:分布式就是將一個(gè)系統(tǒng)拆分到多個(gè)獨(dú)立運(yùn)行的應(yīng)用中(有可能在同一臺(tái)主機(jī)也有可能在不同的主機(jī)上),集群就是將單個(gè)獨(dú)立的應(yīng)用復(fù)制多分放在不同的主機(jī)上來減輕服務(wù)器的壓力。
而Zookeeper不僅僅可以作為分布式集群的服務(wù)注冊(cè)調(diào)度中心(例如dubbo),也可以實(shí)現(xiàn)集群的負(fù)載均衡。
2.4.1.思路
首先我們要理解,如果是一個(gè)集群,那么他就會(huì)有多臺(tái)主機(jī)。所以,他在Zookeeper中信息的存在應(yīng)該是如下所示:
如上的結(jié)構(gòu),當(dāng)服務(wù)調(diào)用方調(diào)用服務(wù)時(shí),就可以根據(jù)特定的均衡負(fù)載算法來實(shí)現(xiàn)對(duì)服務(wù)的調(diào)用(調(diào)用前需要監(jiān)聽/service/serviceXXX節(jié)點(diǎn),以更新列表數(shù)據(jù))
2.4.2.事例
/**
* 服務(wù)提供者
*
* @author Administrator
*
*/
public class ServiceProvider {
// 靜態(tài)常量
static String ZK_CONNECT_STR = "127.0.0.1:2181";
static String NODE_PATH = "/service";
static String SERIVCE_NAME = "/myService";
private ZkClient zkClient;
public ServiceProvider() {
zkClient = new ZkClient(new ZkConnection(ZK_CONNECT_STR));
System.out.println("sucess connected to zookeeper server!");
// 不存在就創(chuàng)建NODE_PATH節(jié)點(diǎn)
if(!zkClient.exists(NODE_PATH)) {
zkClient.create(NODE_PATH, "this is mailbox", CreateMode.PERSISTENT);
}
}
public void registryService(String localIp, Object obj) {
if(!zkClient.exists(NODE_PATH + SERIVCE_NAME)) {
zkClient.create(NODE_PATH + SERIVCE_NAME, "provider services list", CreateMode.PERSISTENT);
}
// 對(duì)自己的服務(wù)進(jìn)行注冊(cè)
zkClient.createEphemeral(NODE_PATH + SERIVCE_NAME + "/" + localIp, obj);
System.out.println("注冊(cè)成功![" + localIp + "]");
}
}
/**
* 消費(fèi)者,通過某種均衡負(fù)載算法選擇某一個(gè)提供者
*
* @author Administrator
*
*/
public class ServiceConsumer {
// 靜態(tài)常量
static String ZK_CONNECT_STR = "127.0.0.1:2181";
static String NODE_PATH = "/service";
static String SERIVCE_NAME = "/myService";
private List<String> serviceList = new ArrayList<String>();
private ZkClient zkClient;
public ServiceConsumer() {
zkClient = new ZkClient(new ZkConnection(ZK_CONNECT_STR));
System.out.println("sucess connected to zookeeper server!");
// 不存在就創(chuàng)建NODE_PATH節(jié)點(diǎn)
if(!zkClient.exists(NODE_PATH)) {
zkClient.create(NODE_PATH, "this is mailbox", CreateMode.PERSISTENT);
}
}
/**
* 訂閱服務(wù)
*/
public void subscribeSerivce() {
serviceList = zkClient.getChildren(NODE_PATH + SERIVCE_NAME);
zkClient.subscribeChildChanges(NODE_PATH + SERIVCE_NAME, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
serviceList = currentChilds;
}
});
}
/**
* 模擬調(diào)用服務(wù)
*/
public void consume() {
//負(fù)載均衡算法獲取某臺(tái)機(jī)器調(diào)用服務(wù)
int index = new Random().nextInt(serviceList.size());
System.out.println("調(diào)用[" + NODE_PATH + SERIVCE_NAME + "]服務(wù):" + serviceList.get(index));
}
}
3.總結(jié)
Zookeeper是一個(gè)功能非常強(qiáng)大的應(yīng)用,除了上面幾種應(yīng)用外,還有命名服務(wù)、分布式協(xié)調(diào)通知等也是常用的場(chǎng)景。
原文鏈接:https://blog.csdn.net/u013468915/article/details/80955110
看完本文有收獲?請(qǐng)轉(zhuǎn)發(fā)分享給更多人
往期資源:
