<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Zookeeper 最典型的應用場景(理論 + 實戰(zhàn))

          共 25245字,需瀏覽 51分鐘

           ·

          2021-08-24 19:13

          點擊關(guān)注公眾號,Java干貨及時送達

          1.前言

          之前自己寫了一些關(guān)于Zookeeper的基礎(chǔ)知識,Zookeeper作為一種協(xié)調(diào)分布式應用高性能的調(diào)度服務,實際的應用場景也非常的廣泛,這里主要通過幾個例子來具體的說明Zookeeper在特定場景下的使用方式(下面的這些功能估計consul和etcd也能實現(xiàn),以后學到了再說吧)。

          2.具體應用

          2.1.一致性配置管理

          我們在開發(fā)的時候,有時候需要獲取一些公共的配置,比如數(shù)據(jù)庫連接信息等,并且偶然可能需要更新配置。如果我們的服務器有N多臺的話,那修改起來會特別的麻煩,并且還需要重新啟動。這里Zookeeper就可以很方便的實現(xiàn)類似的功能。

          2.1.1.思路

          將公共的配置存放在Zookeeper的節(jié)點中

          應用程序可以連接到Zookeeper中并對Zookeeper中配置節(jié)點進行讀取或者修改(對于寫操作可以進行權(quán)限驗證設(shè)置),下面是具體的流程圖:

          2.1.2.事例

          數(shù)據(jù)庫配置信息一致性的維護

          配置類:

          public class CommonConfig implements Serializable{
           // 數(shù)據(jù)庫連接配置
           private String dbUrl;
           private String username;
           private String password;
           private String driverClass;

           public CommonConfig() {}

           public CommonConfig(String dbUrl, String username, String password, String driverClass) {
            super();
            this.dbUrl = dbUrl;
            this.username = username;
            this.password = password;
            this.driverClass = driverClass;
           }

           public String getDbUrl() {
            return dbUrl;
           }

           public void setDbUrl(String dbUrl) {
            this.dbUrl = dbUrl;
           }

           public String getUsername() {
            return username;
           }

           public void setUsername(String username) {
            this.username = username;
           }

           public String getPassword() {
            return password;
           }

           public void setPassword(String password) {
            this.password = password;
           }

           public String getDriverClass() {
            return driverClass;
           }

           public void setDriverClass(String driverClass) {
            this.driverClass = driverClass;
           }

           @Override
           public String toString() {
            return "CommonConfig:{dbUrl:" + this.dbUrl +
              ", username:" + this.username +
              ", password:" + this.password +
              ", driverClass:" + this.driverClass + "}";
           }
          }

          配置管理中心

          同步配置信息到Zookeeper服務器

          public class ZkConfigMng {
           private String nodePath = "/commConfig";
           private CommonConfig commonConfig;
           private ZkClient zkClient;

           public CommonConfig initConfig(CommonConfig commonConfig) {
            if(commonConfig == null) {
             this.commonConfig = new CommonConfig("jdbc:mysql://127.0.0.1:3306/mydata?useUnicode=true&characterEncoding=utf-8",
               "root""root""com.mysql.jdbc.Driver");
            } else {
             this.commonConfig = commonConfig;
            }
            return this.commonConfig;
           }

           /**
            * 更新配置
            *
            * @param commonConfig
            * @return
            */
           public CommonConfig update(CommonConfig commonConfig) {
            if(commonConfig != null) {
             this.commonConfig = commonConfig;
            }
            syncConfigToZookeeper();
            return this.commonConfig;
           }

           public void syncConfigToZookeeper() {
            if(zkClient == null) {
             zkClient = new ZkClient("127.0.0.1:2181");
            }
            if(!zkClient.exists(nodePath)) {
             zkClient.createPersistent(nodePath);
            }
            zkClient.writeData(nodePath, commonConfig);
           }
          }

          以上是提供者,下面我們需要一個客戶端獲取這些配置

          public class ZkConfigClient implements Runnable {

           private String nodePath = "/commConfig";

           private CommonConfig commonConfig;

           @Override
           public void run() {
            ZkClient zkClient = new ZkClient(new ZkConnection("127.0.0.1:2181", 5000));
            while (!zkClient.exists(nodePath)) {
             System.out.println("配置節(jié)點不存在!");
             try {
              TimeUnit.SECONDS.sleep(1);
             } catch (InterruptedException e) {
              e.printStackTrace();
             }
            }
            // 獲取節(jié)點
            commonConfig = (CommonConfig)zkClient.readData(nodePath);
            System.out.println(commonConfig.toString());
            zkClient.subscribeDataChanges(nodePath, new IZkDataListener() {

             @Override
             public void handleDataDeleted(String dataPath) throws Exception {
              if(dataPath.equals(nodePath)) {
               System.out.println("節(jié)點:" + dataPath + "被刪除了!");
              }
             }

             @Override
             public void handleDataChange(String dataPath, Object data) throws Exception {
              if(dataPath.equals(nodePath)) {
               System.out.println("節(jié)點:" + dataPath + ", 數(shù)據(jù):" + data + " - 更新");
               commonConfig = (CommonConfig) data;
              }
             }
            });
           }

          }

          下面啟動Main函數(shù)

          配置管理服務啟動

          public static void main(String[] args) throws InterruptedException {
            SpringApplication.run(ZookeeperApiDemoApplication.class, args);

            ZkConfigMng zkConfigMng = new ZkConfigMng();
            zkConfigMng.initConfig(null);
            zkConfigMng.syncConfigToZookeeper();
            TimeUnit.SECONDS.sleep(10);

            // 修改值
            zkConfigMng.update(new CommonConfig("jdbc:mysql://192.168.1.122:3306/mydata?useUnicode=true&characterEncoding=utf-8",
              "root""wxh""com.mysql.jdbc.Driver"));
           }
          }

          客戶端啟動:

          public static void main(String[] args) throws InterruptedException {
            SpringApplication.run(ZookeeperApiDemoApplication.class, args);

            ExecutorService executorService = Executors.newFixedThreadPool(3);
            // 模擬多個客戶端獲取配置
            executorService.submit(new ZkConfigClient());
            executorService.submit(new ZkConfigClient());
            executorService.submit(new ZkConfigClient());
           }
          }

          2.2.分布式鎖

          在我們?nèi)粘5拈_發(fā)中,如果是單個進程中對共享資源的訪問,我們只需要用synchronized或者lock就能實現(xiàn)互斥操作。但是對于跨進程、跨主機、跨網(wǎng)絡的共享資源似乎就無能為力了。

          另外,分布式系列面試題和答案全部整理好了,微信搜索Java技術(shù)棧,在后臺發(fā)送:面試,可以在線閱讀。

          2.1.1.思路
          • 首先zookeeper中我們可以創(chuàng)建一個/distributed_lock持久化節(jié)點
          • 然后再在/distributed_lock節(jié)點下創(chuàng)建自己的臨時順序節(jié)點,比如:/distributed_lock/task_00000000008
          • 獲取所有的/distributed_lock下的所有子節(jié)點,并排序
          • 判讀自己創(chuàng)建的節(jié)點是否最小值(第一位)
          • 如果是,則獲取得到鎖,執(zhí)行自己的業(yè)務邏輯,最后刪除這個臨時節(jié)點。
          • 如果不是最小值,則需要監(jiān)聽自己創(chuàng)建節(jié)點前一位節(jié)點的數(shù)據(jù)變化,并阻塞。
          • 當前一位節(jié)點被刪除時,我們需要通過遞歸來判斷自己創(chuàng)建的節(jié)點是否在是最小的,如果是則執(zhí)行5);如果不是則執(zhí)行6)(就是遞歸循環(huán)的判斷)

          下面是具體的流程圖:

          2.1.3.事例
          public class DistributedLock {

           // 常亮
           static class Constant {
            private static final int SESSION_TIMEOUT = 10000;
            private static final String CONNECTION_STRING = "127.0.0.1:2181";
            private static final String LOCK_NODE = "/distributed_lock";
            private static final String CHILDREN_NODE = "/task_";
           }

           private ZkClient zkClient;

           public DistributedLock() {
            // 連接到Zookeeper
            zkClient = new ZkClient(new ZkConnection(Constant.CONNECTION_STRING));
            if(!zkClient.exists(Constant.LOCK_NODE)) {
             zkClient.create(Constant.LOCK_NODE, "分布式鎖節(jié)點", CreateMode.PERSISTENT);
            }
           }

           public String getLock() {
            try {
             // 1。在Zookeeper指定節(jié)點下創(chuàng)建臨時順序節(jié)點
             String lockName = zkClient.createEphemeralSequential(Constant.LOCK_NODE + Constant.CHILDREN_NODE, "");
             // 嘗試獲取鎖
             acquireLock(lockName);
             return lockName;
            } catch(Exception e) {
             e.printStackTrace();
            }

            return null;
           }

           /**
            * 獲取鎖
            * @throws InterruptedException
            */
           public Boolean acquireLock(String lockName) throws InterruptedException {
            // 2.獲取lock節(jié)點下的所有子節(jié)點
            List<String> childrenList = zkClient.getChildren(Constant.LOCK_NODE);
            // 3.對子節(jié)點進行排序,獲取最小值
            Collections.sort(childrenList, new Comparator<String>() {
             @Override
             public int compare(String o1, String o2) {
              return Integer.parseInt(o1.split("_")[1]) - Integer.parseInt(o2.split("_")[1]);
             }

            });
            // 4.判斷當前創(chuàng)建的節(jié)點是否在第一位
            int lockPostion = childrenList.indexOf(lockName.split("/")[lockName.split("/").length - 1]);
            if(lockPostion < 0) {
             // 不存在該節(jié)點
             throw new ZkNodeExistsException("不存在的節(jié)點:" + lockName);
            } else if (lockPostion == 0) {
             // 獲取到鎖
             System.out.println("獲取到鎖:" + lockName);
             return true;
            } else if (lockPostion > 0) {
             // 未獲取到鎖,阻塞
             System.out.println("...... 未獲取到鎖,阻塞等待 。。。。。。");
             // 5.如果未獲取得到鎖,監(jiān)聽當前創(chuàng)建的節(jié)點前一位的節(jié)點
             final CountDownLatch latch = new CountDownLatch(1);
             IZkDataListener listener = new IZkDataListener() {

              @Override
              public void handleDataDeleted(String dataPath) throws Exception {
               // 6.前一個節(jié)點被刪除,當不保證輪到自己
               System.out.println("。。。。。。前一個節(jié)點被刪除  。。。。。。");
               acquireLock(lockName);
               latch.countDown();
              }

              @Override
              public void handleDataChange(String dataPath, Object data) throws Exception {
               // 不用理會
              }
             };
             try {
              zkClient.subscribeDataChanges(Constant.LOCK_NODE + "/" + childrenList.get(lockPostion - 1), listener);
              latch.await();
             } finally {
              zkClient.unsubscribeDataChanges(Constant.LOCK_NODE + "/" + childrenList.get(lockPostion - 1), listener);
             }
            }
            return false;
           }

           /**
            * 釋放鎖(刪除節(jié)點)
            *
            * @param lockName
            */
           public void releaseLock(String lockName) {
            zkClient.delete(lockName);
           }

           public void closeZkClient() {
            zkClient.close();
           }
          }

          @SpringBootApplication
          public class ZookeeperDemoApplication {

           public static void main(String[] args) throws InterruptedException {
            SpringApplication.run(ZookeeperDemoApplication.class, args);

            DistributedLock lock = new DistributedLock();
            String lockName = lock.getLock();
            /**
             * 執(zhí)行我們的業(yè)務邏輯
             */
            if(lockName != null) {
             lock.releaseLock(lockName);
            }

            lock.closeZkClient();
           }
          }

          2.3.分布式隊列

          在日常使用中,特別是像生產(chǎn)者消費者模式中,經(jīng)常會使用BlockingQueue來充當緩沖區(qū)的角色。但是在分布式系統(tǒng)中這種方式就不能使用BlockingQueue來實現(xiàn)了,但是Zookeeper可以實現(xiàn)。

          點擊關(guān)注公眾號,Java干貨及時送達

          2.1.1.思路
          • 首先利用Zookeeper中臨時順序節(jié)點的特點
          • 當生產(chǎn)者創(chuàng)建節(jié)點生產(chǎn)時,需要判斷父節(jié)點下臨時順序子節(jié)點的個數(shù),如果達到了上限,則阻塞等待;如果沒有達到,就創(chuàng)建節(jié)點。
          • 當消費者獲取節(jié)點時,如果父節(jié)點中不存在臨時順序子節(jié)點,則阻塞等待;如果有子節(jié)點,則獲取執(zhí)行自己的業(yè)務,執(zhí)行完畢后刪除該節(jié)點即可。
          • 獲取時獲取最小值,保證FIFO特性。
          2.1.2.事例

          這個是一個消費者對一個生產(chǎn)者,如果是多個消費者對多個生產(chǎn)者,對代碼需要調(diào)整。Spring Boot 學習筆記,分享給你。

          public interface AppConstant {
           static String ZK_CONNECT_STR = "127.0.0.1:2181";
           static String NODE_PATH = "/mailbox";
           static String CHILD_NODE_PATH = "/mail_";
           static int MAILBOX_SIZE = 10;
          }

          public class MailConsumer implements Runnable, AppConstant{

           private ZkClient zkClient;
           private Lock lock;
           private Condition condition;

           public MailConsumer() {
            lock = new ReentrantLock();
            condition = lock.newCondition();
            zkClient = new ZkClient(new ZkConnection(ZK_CONNECT_STR));
            System.out.println("sucess connected to zookeeper server!");
            // 不存在就創(chuàng)建mailbox節(jié)點
            if(!zkClient.exists(NODE_PATH)) {
             zkClient.create(NODE_PATH, "this is mailbox", CreateMode.PERSISTENT);
            }
           }

           @Override
           public void run() {
            IZkChildListener listener = new IZkChildListener() {
             @Override
             public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
              System.out.println("Znode["+parentPath + "] size:" + currentChilds.size());
              // 還是要判斷郵箱是否為空
              if(currentChilds.size() > 0) {
               // 喚醒等待的線程
               try {
                lock.lock();
                condition.signal();
               } catch (Exception e) {
                e.printStackTrace();
               } finally {
                lock.unlock();
               }
              }
             }
            };
            // 監(jiān)視子節(jié)點的改變,不用放用while循環(huán)中,監(jiān)聽一次就行了,不需要重復綁定
            zkClient.subscribeChildChanges(NODE_PATH, listener);
            try {
             //循環(huán)隨機發(fā)送郵件模擬真是情況
             while(true) {
              // 判斷是否可以發(fā)送郵件
              checkMailReceive();
              // 接受郵件
              List<String> mailList = zkClient.getChildren(NODE_PATH);
              // 如果mailsize==0,也沒有關(guān)系;可以直接循環(huán)獲取就行了
              if(mailList.size() > 0) {
               Collections.sort(mailList, new Comparator<String>() {
                @Override
                public int compare(String o1, String o2) {
                 return Integer.parseInt(o1.split("_")[1]) - Integer.parseInt(o2.split("_")[1]);
                }
               });
               // 模擬郵件處理(0-1S)
               TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
               zkClient.delete(NODE_PATH + "/" + mailList.get(0));
               System.out.println("mail has been received:" + NODE_PATH + "/" + mailList.get(0));
              }
             }
            }catch (Exception e) {
             e.printStackTrace();
            } finally {
             zkClient.unsubscribeChildChanges(NODE_PATH, listener);
            }
           }

           private void checkMailReceive() {
            try {
             lock.lock();
             // 判斷郵箱是為空
             List<String> mailList = zkClient.getChildren(NODE_PATH);
             System.out.println("mailbox size: " + mailList.size());
             if(mailList.size() == 0) {
              // 郵箱為空,阻塞消費者,直到郵箱有郵件
              System.out.println("mailbox is empty, please wait 。。。");
              condition.await();
              // checkMailReceive();
             }
            } catch (Exception e) {
             e.printStackTrace();
            } finally {
             lock.unlock();
            }
           }
          }

          public class MailProducer implements Runnable, AppConstant{

           private ZkClient zkClient;
           private Lock lock;
           private Condition condition;

           /**
            * 初始化狀態(tài)
            */
           public MailProducer() {
            lock = new ReentrantLock();
            condition = lock.newCondition();
            zkClient = new ZkClient(new ZkConnection(ZK_CONNECT_STR));
            System.out.println("sucess connected to zookeeper server!");
            // 不存在就創(chuàng)建mailbox節(jié)點
            if(!zkClient.exists(NODE_PATH)) {
             zkClient.create(NODE_PATH, "this is mailbox", CreateMode.PERSISTENT);
            }
           }

           @Override
           public void run() {
            IZkChildListener listener = new IZkChildListener() {
             @Override
             public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
              System.out.println("Znode["+parentPath + "] size:" + currentChilds.size());
              // 還是要判斷郵箱是否已滿
              if(currentChilds.size() < MAILBOX_SIZE) {
               // 喚醒等待的線程
               try {
                lock.lock();
                condition.signal();
               } catch (Exception e) {
                e.printStackTrace();
               } finally {
                lock.unlock();
               }
              }
             }
            };
            // 監(jiān)視子節(jié)點的改變,不用放用while循環(huán)中,監(jiān)聽一次就行了,不需要重復綁定
            zkClient.subscribeChildChanges(NODE_PATH, listener);
            try {
             //循環(huán)隨機發(fā)送郵件模擬真是情況
             while(true) {
              // 判斷是否可以發(fā)送郵件
              checkMailSend();
              // 發(fā)送郵件
              String cretePath = zkClient.createEphemeralSequential(NODE_PATH + CHILD_NODE_PATH, "your mail");
              System.out.println("your mail has been send:" + cretePath);
              // 模擬隨機間隔的發(fā)送郵件(0-10S)
              TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
             }
            }catch (Exception e) {
             e.printStackTrace();
            } finally {
             zkClient.unsubscribeChildChanges(NODE_PATH, listener);
            }
           }

           private void checkMailSend() {
            try {
             lock.lock();
             // 判斷郵箱是否已滿
             List<String> mailList = zkClient.getChildren(NODE_PATH);
             System.out.println("mailbox size: " + mailList.size());
             if(mailList.size() >= MAILBOX_SIZE) {
              // 郵箱已滿,阻塞生產(chǎn)者,直到郵箱有空間
              System.out.println("mailbox is full, please wait 。。。");
              condition.await();
              checkMailSend();
             }
            } catch (Exception e) {
             e.printStackTrace();
            } finally {
             lock.unlock();
            }
           }
          }

          2.4.均衡負載

          首先我們需要簡單的理解分布式和集群,通俗點說:分布式就是將一個系統(tǒng)拆分到多個獨立運行的應用中(有可能在同一臺主機也有可能在不同的主機上),集群就是將單個獨立的應用復制多分放在不同的主機上來減輕服務器的壓力。

          而Zookeeper不僅僅可以作為分布式集群的服務注冊調(diào)度中心(例如dubbo),也可以實現(xiàn)集群的負載均衡。另外,Zookeeper 系列面試題和答案全部整理好了,微信搜索Java技術(shù)棧,在后臺發(fā)送:面試,可以在線閱讀。

          2.4.1.思路

          首先我們要理解,如果是一個集群,那么他就會有多臺主機。所以,他在Zookeeper中信息的存在應該是如下所示:

          如上的結(jié)構(gòu),當服務調(diào)用方調(diào)用服務時,就可以根據(jù)特定的均衡負載算法來實現(xiàn)對服務的調(diào)用(調(diào)用前需要監(jiān)聽/service/serviceXXX節(jié)點,以更新列表數(shù)據(jù))

          2.4.2.事例
          /**
           * 服務提供者
           *
           * @author Administrator
           *
           */
          public class ServiceProvider {
           // 靜態(tài)常量
           static String ZK_CONNECT_STR = "127.0.0.1:2181";
           static String NODE_PATH = "/service";
           static String SERIVCE_NAME = "/myService";

           private ZkClient zkClient;

           public ServiceProvider() {
            zkClient = new ZkClient(new ZkConnection(ZK_CONNECT_STR));
            System.out.println("sucess connected to zookeeper server!");
            // 不存在就創(chuàng)建NODE_PATH節(jié)點
            if(!zkClient.exists(NODE_PATH)) {
             zkClient.create(NODE_PATH, "this is mailbox", CreateMode.PERSISTENT);
            }
           }

           public void registryService(String localIp, Object obj) {
            if(!zkClient.exists(NODE_PATH + SERIVCE_NAME)) {
             zkClient.create(NODE_PATH + SERIVCE_NAME, "provider services list", CreateMode.PERSISTENT);
            }
            // 對自己的服務進行注冊
            zkClient.createEphemeral(NODE_PATH + SERIVCE_NAME + "/" + localIp, obj);
            System.out.println("注冊成功![" + localIp + "]");
           }
          }

          /**
           * 消費者,通過某種均衡負載算法選擇某一個提供者
           *
           * @author Administrator
           *
           */
          public class ServiceConsumer {
           // 靜態(tài)常量
           static String ZK_CONNECT_STR = "127.0.0.1:2181";
           static String NODE_PATH = "/service";
           static String SERIVCE_NAME = "/myService";

           private List<String> serviceList = new ArrayList<String>();

           private ZkClient zkClient;

           public ServiceConsumer() {
            zkClient = new ZkClient(new ZkConnection(ZK_CONNECT_STR));
            System.out.println("sucess connected to zookeeper server!");
            // 不存在就創(chuàng)建NODE_PATH節(jié)點
            if(!zkClient.exists(NODE_PATH)) {
             zkClient.create(NODE_PATH, "this is mailbox", CreateMode.PERSISTENT);
            }
           }

           /**
            * 訂閱服務
            */
           public void subscribeSerivce() {
            serviceList = zkClient.getChildren(NODE_PATH + SERIVCE_NAME);
            zkClient.subscribeChildChanges(NODE_PATH + SERIVCE_NAME, new IZkChildListener() {
             @Override
             public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
              serviceList = currentChilds;
             }
            });
           }

           /**
            * 模擬調(diào)用服務
            */
           public void consume() {
            //負載均衡算法獲取某臺機器調(diào)用服務
            int index = new Random().nextInt(serviceList.size());
            System.out.println("調(diào)用[" + NODE_PATH + SERIVCE_NAME + "]服務:" + serviceList.get(index));
           }
          }

          3.總結(jié)

          Zookeeper是一個功能非常強大的應用,除了上面幾種應用外,還有命名服務、分布式協(xié)調(diào)通知等也是常用的場景。另外,關(guān)注公眾號Java技術(shù)棧,在后臺回復:面試,可以獲取我整理的 Java/ Zookeeper 系列面試題和答案,非常齊全。

          原文鏈接:https://blog.csdn.net/u013468915/article/details/80955110

          版權(quán)聲明:本文為CSDN博主「永遠_不會懂」的原創(chuàng)文章,遵循CC 4.0 BY-SA版權(quán)協(xié)議,轉(zhuǎn)載請附上原文出處鏈接及本聲明。






          關(guān)注Java技術(shù)??锤喔韶?/strong>



          獲取 Spring Boot 實戰(zhàn)筆記!
          瀏覽 49
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  一级电影毛片 | 人妻熟女一二三区夜夜爱 | 亚洲影院每日更新 | 国产精品午夜在线观看 | 做爱网站免费 |