<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          分布式事務、分布式鎖、分布式session

          共 11774字,需瀏覽 24分鐘

           ·

          2020-09-05 01:29


          點擊上方藍色“程序猿DD”,選擇“設為星標”

          回復“資源”獲取獨家整理的學習資料!

          來源 |?cnblogs.com/heqiyoujing/p/10917102.html

          一、分布式session

          session 是啥?瀏覽器有個 cookie,在一段時間內這個 cookie 都存在,然后每次發(fā)請求過來都帶上一個特殊的?jsessionid cookie,就根據這個東西,在服務端可以維護一個對應的 session 域,里面可以放點數據。

          一般的話只要你沒關掉瀏覽器,cookie 還在,那么對應的那個 session 就在,但是如果 cookie 沒了,session 也就沒了。常見于什么購物車之類的東西,還有登錄狀態(tài)保存之類的。

          這個不多說了,懂 Java 的都該知道這個。

          單塊系統(tǒng)的時候這么玩兒 session 沒問題,但是你要是分布式系統(tǒng)呢,那么多的服務,session 狀態(tài)在哪兒維護啊?

          (1)完全不用 session

          使用 JWT Token 儲存用戶身份,然后再從數據庫或者 cache 中獲取其他的信息。這樣無論請求分配到哪個服務器都無所謂

          (2)tomcat + redis

          這個其實還挺方便的,就是使用 session 的代碼,跟以前一樣,還是基于 tomcat 原生的 session 支持即可,然后就是用一個叫做?Tomcat RedisSessionManager?的東西,讓所有我們部署的 tomcat 都將 session 數據存儲到 redis 即可。

          在 tomcat 的配置文件中配置:

          <Valve?className="com.orangefunction.tomcat.redissessions.RedisSessionHandlerValve"?/>

          <Manager?className="com.orangefunction.tomcat.redissessions.RedisSessionManager"
          ?????????host="{redis.host}"
          ?????????port="{redis.port}"
          ?????????database="{redis.dbnum}"
          ?????????maxInactiveInterval="60"/>

          然后指定 redis 的 host 和 port 就 ok 了。

          <Valve?className="com.orangefunction.tomcat.redissessions.RedisSessionHandlerValve"?/>

          <Manager?className="com.orangefunction.tomcat.redissessions.RedisSessionManager"
          ?????????sentinelMaster="mymaster"
          ?????????sentinels=":26379,:26379,:26379"
          ?????????maxInactiveInterval="60"/>

          還可以用上面這種方式基于 redis 哨兵支持的 redis 高可用集群來保存 session 數據,都是 ok 的。

          (3)spring?session + redis

          上面所說的第二種方式會與 tomcat 容器重耦合,如果我要將 web 容器遷移成 jetty,難道還要重新把 jetty 都配置一遍?

          因為上面那種 tomcat + redis 的方式好用,但是會嚴重依賴于web容器,不好將代碼移植到其他 web 容器上去,尤其是你要是換了技術棧咋整?比如換成了 spring cloud 或者是 spring boot 之類的呢?

          所以現在比較好的還是基于 Java 一站式解決方案,也就是 spring。人家 spring 基本上承包了大部分我們需要使用的框架,spirng cloud 做微服務,spring boot 做腳手架,所以用 sping session 是一個很好的選擇。

          在 pom.xml 中配置:

          <dependency>
          ????<groupId>org.springframework.sessiongroupId>
          ????<artifactId>spring-session-data-redisartifactId>
          ????<version>1.2.1.RELEASEversion>
          dependency>

          <dependency>
          ??????<groupId>redis.clientsgroupId>
          ??????<artifactId>jedisartifactId>
          ??????<version>2.8.1version>
          dependency>

            在 spring 配置文件中配置:

          <bean?id="redisHttpSessionConfiguration"
          class="org.springframework.session.data.redis.config.annotation.web.http.RedisHttpSessionConfiguration">

          ????<property?name="maxInactiveIntervalInSeconds"?value="600"/>
          bean>
          <bean?id="jedisPoolConfig"?class="redis.clients.jedis.JedisPoolConfig">
          ????<property?name="maxTotal"?value="100"?/>
          ????<property?name="maxIdle"?value="10"?/>
          bean>
          <bean?id="jedisConnectionFactory"
          class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"?destroy-method="destroy">

          ????<property?name="hostName"?value="${redis_hostname}"/>
          ????<property?name="port"?value="${redis_port}"/>
          ????<property?name="password"?value="${redis_pwd}"?/>
          ????<property?name="timeout"?value="3000"/>
          ????<property?name="usePool"?value="true"/>
          ????<property?name="poolConfig"?ref="jedisPoolConfig"/>
          bean>

          在 web.xml 中配置:

          <filter>
          ??<filter-name>springSessionRepositoryFilterfilter-name>
          ??<filter-class>org.springframework.web.filter.DelegatingFilterProxyfilter-class>
          filter>
          <filter-mapping>
          ??<filter-name>springSessionRepositoryFilterfilter-name>
          ??<url-pattern>/*url-pattern>
          filter-mapping>

          示例代碼:

          @RestController
          @RequestMapping("/test")
          public?class?TestController?{

          ??@RequestMapping("/putIntoSession")
          ??public?String?putIntoSession(HttpServletRequest?request,?String?username)?{
          ????request.getSession().setAttribute("name",??"leo");
          ????return?"ok";
          ??}
          ????
          @RequestMapping("/getFromSession")
          ??public?String?getFromSession(HttpServletRequest?request,?Model?model){
          ????String?name?=?request.getSession().getAttribute("name");
          ????return?name;
          ??}
          ??
          }

          上面的代碼就是 ok 的,給 sping session 配置基于 redis 來存儲 session 數據,然后配置了一個 spring session 的過濾器,這樣的話,session 相關操作都會交給 spring session 來管了。接著在代碼中,就用原生的 session 操作,就是直接基于 spring sesion 從 redis 中獲取數據了。

          實現分布式的會話有很多種方式,我說的只不過是比較常見的幾種方式,tomcat + redis 早期比較常用,但是會重耦合到 tomcat 中;近些年,通過 spring session 來實現。

          二、分布式事務

          當我們的單個數據庫的性能產生瓶頸的時候,我們可能會對數據庫進行分區(qū),這里所說的分區(qū)指的是物理分區(qū),分區(qū)之后可能不同的庫就處于不同的服務器上了,這個時候單個數據庫的ACID已經不能適應這種情況了,而在這種ACID的集群環(huán)境下,再想保證集群的ACID幾乎是很難達到,或者即使能達到那么效率和性能會大幅下降,最為關鍵的是再很難擴展新的分區(qū)了,這個時候如果再追求集群的ACID會導致我們的系統(tǒng)變得很差,這時我們就需要引入一個新的理論原則來適應這種集群的情況,就是 CAP 原則或者叫CAP定理,那么CAP定理指的是什么呢?  

          CAP定理

          CAP定理是由加州大學伯克利分校Eric Brewer教授提出來的,他指出WEB服務無法同時滿足一下3個屬性:

          • 一致性(Consistency) :客戶端知道一系列的操作都會同時發(fā)生(生效)

          • 可用性(Availability) :每個操作都必須以可預期的響應結束

          • 分區(qū)容錯性(Partition tolerance) :即使出現單個組件無法可用,操作依然可以完成

            具體地講在分布式系統(tǒng)中,在任何數據庫設計中,一個Web應用至多只能同時支持上面的兩個屬性。顯然,任何橫向擴展策略都要依賴于數據分區(qū)。因此,設計人員必須在一致性與可用性之間做出選擇。

          這個定理在迄今為止的分布式系統(tǒng)中都是適用的!?為什么這么說呢?

          這個時候有同學可能會把數據庫的2PC(兩階段提交)搬出來說話了。OK,我們就來看一下數據庫的兩階段提交。

          對數據庫分布式事務有了解的同學一定知道數據庫支持的2PC,又叫做 XA Transactions。

          其中,XA 是一個兩階段提交協議,該協議分為以下兩個階段:

          • 第一階段:事務協調器要求每個涉及到事務的數據庫預提交(precommit)此操作,并反映是否可以提交.

          • 第二階段:事務協調器要求每個數據庫提交數據。

          其中,如果有任何一個數據庫否決此次提交,那么所有數據庫都會被要求回滾它們在此事務中的那部分信息。這樣做的缺陷是什么呢? 咋看之下我們可以在數據庫分區(qū)之間獲得一致性。

          如果CAP 定理是對的,那么它一定會影響到可用性。

          如果說系統(tǒng)的可用性代表的是執(zhí)行某項操作相關所有組件的可用性的和。那么在兩階段提交的過程中,可用性就代表了涉及到的每一個數據庫中可用性的和。我們假設兩階段提交的過程中每一個數據庫都具有99.9%的可用性,那么如果兩階段提交涉及到兩個數據庫,這個結果就是99.8%。根據系統(tǒng)可用性計算公式,假設每個月43200分鐘,99.9%的可用性就是43157分鐘, 99.8%的可用性就是43114分鐘,相當于每個月的宕機時間增加了43分鐘。

          以上,可以驗證出來,CAP定理從理論上來講是正確的,CAP我們先看到這里,等會再接著說。

          在分布式系統(tǒng)中,要實現分布式事務,無外乎那幾種解決方案。

          分布式事務的實現主要有以下 5 種方案:

          • XA 方案

          • TCC 方案

          • 本地消息表

          • 可靠消息最終一致性方案

          • 最大努力通知方案

          兩階段提交方案/XA方案

          所謂的 XA 方案,即:兩階段提交,有一個事務管理器的概念,負責協調多個數據庫(資源管理器)的事務,事務管理器先問問各個數據庫你準備好了嗎?如果每個數據庫都回復 ok,那么就正式提交事務,在各個數據庫上執(zhí)行操作;如果任何其中一個數據庫回答不 ok,那么就回滾事務。

          這種分布式事務方案,比較適合單塊應用里,跨多個庫的分布式事務,而且因為嚴重依賴于數據庫層面來搞定復雜的事務,效率很低,絕對不適合高并發(fā)的場景。如果要玩兒,那么基于?Spring + JTA?就可以搞定,自己隨便搜個 demo 看看就知道了。

          這個方案,我們很少用,一般來說某個系統(tǒng)內部如果出現跨多個庫的這么一個操作,是不合規(guī)的。我可以給大家介紹一下, 現在微服務,一個大的系統(tǒng)分成幾十個甚至幾百個服務。一般來說,我們的規(guī)定和規(guī)范,是要求每個服務只能操作自己對應的一個數據庫。

          如果你要操作別的服務對應的庫,不允許直連別的服務的庫,違反微服務架構的規(guī)范,你隨便交叉胡亂訪問,幾百個服務的話,全體亂套,這樣的一套服務是沒法管理的,沒法治理的,可能會出現數據被別人改錯,自己的庫被別人寫掛等情況。

          如果你要操作別人的服務的庫,你必須是通過調用別的服務的接口來實現,絕對不允許交叉訪問別人的數據庫。

          TCC 方案

          TCC 的全稱是:TryConfirmCancel

          • Try 階段:這個階段說的是對各個服務的資源做檢測以及對資源進行鎖定或者預留。

          • Confirm 階段:這個階段說的是在各個服務中執(zhí)行實際的操作。

          • Cancel 階段:如果任何一個服務的業(yè)務方法執(zhí)行出錯,那么這里就需要進行補償,就是執(zhí)行已經執(zhí)行成功的業(yè)務邏輯的回滾操作。(把那些執(zhí)行成功的回滾)

          這種方案說實話幾乎很少人使用,我們用的也比較少,但是也有使用的場景。因為這個事務回滾實際上是嚴重依賴于你自己寫代碼來回滾和補償了,會造成補償代碼巨大,非常之惡心。

          比如說我們,一般來說跟錢相關的,跟錢打交道的,支付、交易相關的場景,我們會用 TCC,嚴格保證分布式事務要么全部成功,要么全部自動回滾,嚴格保證資金的正確性,保證在資金上不會出現問題。

          而且最好是你的各個業(yè)務執(zhí)行的時間都比較短。

          但是說實話,一般盡量別這么搞,自己手寫回滾邏輯,或者是補償邏輯,實在太惡心了,那個業(yè)務代碼是很難維護的。

          本地消息表

          本地消息表其實是國外的 ebay 搞出來的這么一套思想。

          這個大概意思是這樣的:

          1. A 系統(tǒng)在自己本地一個事務里操作同時,插入一條數據到消息表;

          2. 接著 A 系統(tǒng)將這個消息發(fā)送到 MQ 中去;

          3. B 系統(tǒng)接收到消息之后,在一個事務里,往自己本地消息表里插入一條數據,同時執(zhí)行其他的業(yè)務操作,如果這個消息已經被處理過了,那么此時這個事務會回滾,這樣保證不會重復處理消息;

          4. B 系統(tǒng)執(zhí)行成功之后,就會更新自己本地消息表的狀態(tài)以及 A 系統(tǒng)消息表的狀態(tài);

          5. 如果 B 系統(tǒng)處理失敗了,那么就不會更新消息表狀態(tài),那么此時 A 系統(tǒng)會定時掃描自己的消息表,如果有未處理的消息,會再次發(fā)送到 MQ 中去,讓 B 再次處理;

          6. 這個方案保證了最終一致性,哪怕 B 事務失敗了,但是 A 會不斷重發(fā)消息,直到 B 那邊成功為止。

          這個方案說實話最大的問題就在于嚴重依賴于數據庫的消息表來管理事務啥的,如果是高并發(fā)場景咋辦呢?咋擴展呢?所以一般確實很少用。

          可靠消息最終一致性方案

          這個的意思,就是干脆不要用本地的消息表了,直接基于 MQ 來實現事務。比如阿里的 RocketMQ 就支持消息事務。

          大概的意思就是:

          1. A 系統(tǒng)先發(fā)送一個 prepared 消息到 mq,如果這個 prepared 消息發(fā)送失敗那么就直接取消操作別執(zhí)行了;

          2. 如果這個消息發(fā)送成功過了,那么接著執(zhí)行本地事務,如果成功就告訴 mq 發(fā)送確認消息,如果失敗就告訴 mq 回滾消息;

          3. 如果發(fā)送了確認消息,那么此時 B 系統(tǒng)會接收到確認消息,然后執(zhí)行本地的事務;

          4. mq 會自動定時輪詢所有 prepared 消息回調你的接口,問你,這個消息是不是本地事務處理失敗了,所有沒發(fā)送確認的消息,是繼續(xù)重試還是回滾?一般來說這里你就可以查下數據庫看之前本地事務是否執(zhí)行,如果回滾了,那么這里也回滾吧。這個就是避免可能本地事務執(zhí)行成功了,而確認消息卻發(fā)送失敗了。

          5. 這個方案里,要是系統(tǒng) B 的事務失敗了咋辦?重試咯,自動不斷重試直到成功,如果實在是不行,要么就是針對重要的資金類業(yè)務進行回滾,比如 B 系統(tǒng)本地回滾后,想辦法通知系統(tǒng) A 也回滾;或者是發(fā)送報警由人工來手工回滾和補償。

          6. 這個還是比較合適的,目前國內互聯網公司大都是這么玩兒的,要不你舉用 RocketMQ 支持的,要不你就自己基于類似 ActiveMQ?RabbitMQ?自己封裝一套類似的邏輯出來,總之思路就是這樣子的。

          最大努力通知方案

          這個方案的大致意思就是:

          1. 系統(tǒng) A 本地事務執(zhí)行完之后,發(fā)送個消息到 MQ;

          2. 這里會有個專門消費 MQ 的最大努力通知服務,這個服務會消費 MQ 然后寫入數據庫中記錄下來,或者是放入個內存隊列也可以,接著調用系統(tǒng) B 的接口;

          3. 要是系統(tǒng) B 執(zhí)行成功就 ok 了;要是系統(tǒng) B 執(zhí)行失敗了,那么最大努力通知服務就定時嘗試重新調用系統(tǒng) B,反復 N 次,最后還是不行就放棄。

          你們公司是如何處理分布式事務的?

          如果你真的被問到,可以這么說,我們某某特別嚴格的場景,用的是 TCC 來保證強一致性;然后其他的一些場景基于阿里的 RocketMQ 來實現分布式事務。

          你找一個嚴格資金要求絕對不能錯的場景,你可以說你是用的 TCC 方案;如果是一般的分布式事務場景,訂單插入之后要調用庫存服務更新庫存,庫存數據沒有資金那么的敏感,可以用可靠消息最終一致性方案。

          友情提示一下,RocketMQ 3.2.6 之前的版本,是可以按照上面的思路來的,但是之后接口做了一些改變,我這里不再贅述了。

          當然如果你愿意,你可以參考可靠消息最終一致性方案來自己實現一套分布式事務,比如基于 RocketMQ 來玩兒。

          三、分布式鎖

          redis 分布式鎖

          redis 最普通的分布式鎖

          第一個最普通的實現方式,就是在 redis 里創(chuàng)建一個 key,這樣就算加鎖。

          SET my:lock 隨機值 NX PX 30000

          執(zhí)行這個命令就 ok。

          • NX:表示只有?key?不存在的時候才會設置成功。(如果此時 redis 中存在這個 key,那么設置失敗,返回?nil

          • PX 30000:意思是 30s 后鎖自動釋放。別人創(chuàng)建的時候如果發(fā)現已經有了就不能加鎖了。

          釋放鎖就是刪除 key ,但是一般可以用?lua?腳本刪除,判斷 value 一樣才刪除:

          -- 刪除鎖的時候,找到 key 對應的 value,跟自己傳過去的 value 做比較,如果是一樣的

          if?redis.call("get",KEYS[1])?==?ARGV[1]?then
          ??return?redis.call("del",KEYS[1])
          else
          ??return?0
          end


          為啥要用隨機值呢?因為如果某個客戶端獲取到了鎖,但是阻塞了很長時間才執(zhí)行完,比如說超過了 30s,此時可能已經自動釋放鎖了,此時可能別的客戶端已經獲取到了這個鎖,要是你這個時候直接刪除 key 的話會有問題,所以得用隨機值加上面的?lua?腳本來釋放鎖。

          但是這樣是肯定不行的。因為如果是普通的 redis 單實例,那就是單點故障。或者是 redis 普通主從,那 redis 主從異步復制,如果主節(jié)點掛了(key 就沒有了),key 還沒同步到從節(jié)點,此時從節(jié)點切換為主節(jié)點,別人就可以 set key,從而拿到鎖。

          RedLock 算法

          這個場景是假設有一個 redis cluster,有 5 個 redis master 實例。然后執(zhí)行如下步驟獲取一把鎖:

          1. 獲取當前時間戳,單位是毫秒;

          2. 跟上面類似,輪流嘗試在每個 master 節(jié)點上創(chuàng)建鎖,過期時間較短,一般就幾十毫秒;

          3. 嘗試在大多數節(jié)點上建立一個鎖,比如 5 個節(jié)點就要求是 3 個節(jié)點?n / 2 + 1

          4. 客戶端計算建立好鎖的時間,如果建立鎖的時間小于超時時間,就算建立成功了;

          5. 要是鎖建立失敗了,那么就依次之前建立過的鎖刪除;

          6. 只要別人建立了一把分布式鎖,你就得不斷輪詢去嘗試獲取鎖。

          ?

          zk 分布式鎖

          zk 分布式鎖,其實可以做的比較簡單,就是某個節(jié)點嘗試創(chuàng)建臨時 znode,此時創(chuàng)建成功了就獲取了這個鎖;這個時候別的客戶端來創(chuàng)建鎖會失敗,只能注冊個監(jiān)聽器監(jiān)聽這個鎖。釋放鎖就是刪除這個 znode,一旦釋放掉就會通知客戶端,然后有一個等待著的客戶端就可以再次重新加鎖。

            

          public?class?ZooKeeperSession?{

          private?static?CountDownLatch?connectedSemaphore?=?new?CountDownLatch(1);

          private?ZooKeeper?zookeeper;
          private?CountDownLatch?latch;

          public?ZooKeeperSession()?{
          try?{
          this.zookeeper?=?new?ZooKeeper("192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181",?50000,?new?ZooKeeperWatcher());
          try?{
          ????????????????connectedSemaphore.await();
          ????????????}?catch?(InterruptedException?e)?{
          ????????????????e.printStackTrace();
          ????????????}

          ????????????System.out.println("ZooKeeper?session?established......");
          ????????}?catch?(Exception?e)?{
          ????????????e.printStackTrace();
          ????????}
          ????}

          /**
          ?????*?獲取分布式鎖
          ?????*
          ?????*?@param?productId
          ?????*/

          public?Boolean?acquireDistributedLock(Long?productId)?{
          ????????String?path?=?"/product-lock-"?+?productId;

          try?{
          ????????????zookeeper.create(path,?"".getBytes(),?Ids.OPEN_ACL_UNSAFE,?CreateMode.EPHEMERAL);
          return?true;
          ????????}?catch?(Exception?e)?{
          while?(true)?{
          try?{
          //?相當于是給node注冊一個監(jiān)聽器,去看看這個監(jiān)聽器是否存在
          ????????????????????Stat?stat?=?zk.exists(path,?true);

          if?(stat?!=?null)?{
          this.latch?=?new?CountDownLatch(1);
          this.latch.await(waitTime,?TimeUnit.MILLISECONDS);
          this.latch?=?null;
          ????????????????????}
          ????????????????????zookeeper.create(path,?"".getBytes(),?Ids.OPEN_ACL_UNSAFE,?CreateMode.EPHEMERAL);
          return?true;
          ????????????????}?catch?(Exception?ee)?{
          continue;
          ????????????????}
          ????????????}

          ????????}
          return?true;
          ????}

          /**
          ?????*?釋放掉一個分布式鎖
          ?????*
          ?????*?@param?productId
          ?????*/

          public?void?releaseDistributedLock(Long?productId)?{
          ????????String?path?=?"/product-lock-"?+?productId;
          try?{
          ????????????zookeeper.delete(path,?-1);
          ????????????System.out.println("release?the?lock?for?product[id="?+?productId?+?"]......");
          ????????}?catch?(Exception?e)?{
          ????????????e.printStackTrace();
          ????????}
          ????}

          /**
          ?????*?建立zk?session的watcher
          ?????*
          ?????*?@author?bingo
          ?????*?@since?2018/11/29
          ?????*
          ?????*/

          private?class?ZooKeeperWatcher?implements?Watcher?{

          public?void?process(WatchedEvent?event)?{
          ????????????System.out.println("Receive?watched?event:?"?+?event.getState());

          if?(KeeperState.SyncConnected?==?event.getState())?{
          ????????????????connectedSemaphore.countDown();
          ????????????}

          if?(this.latch?!=?null)?{
          this.latch.countDown();
          ????????????}
          ????????}

          ????}

          /**
          ?????*?封裝單例的靜態(tài)內部類
          ?????*
          ?????*?@author?bingo
          ?????*?@since?2018/11/29
          ?????*
          ?????*/

          private?static?class?Singleton?{

          private?static?ZooKeeperSession?instance;

          static?{
          ????????????instance?=?new?ZooKeeperSession();
          ????????}

          public?static?ZooKeeperSession?getInstance()?{
          return?instance;
          ????????}

          ????}

          /**
          ?????*?獲取單例
          ?????*
          ?????*?@return
          ?????*/

          public?static?ZooKeeperSession?getInstance()?{
          return?Singleton.getInstance();
          ????}

          /**
          ?????*?初始化單例的便捷方法
          ?????*/

          public?static?void?init()?{
          ????????getInstance();
          ????}

          }

          也可以采用另一種方式,創(chuàng)建臨時順序節(jié)點:

          如果有一把鎖,被多個人給競爭,此時多個人會排隊,第一個拿到鎖的人會執(zhí)行,然后釋放鎖;后面的每個人都會去監(jiān)聽排在自己前面的那個人創(chuàng)建的 node 上,一旦某個人釋放了鎖,排在自己后面的人就會被 zookeeper 給通知,一旦被通知了之后,就 ok 了,自己就獲取到了鎖,就可以執(zhí)行代碼了。

          public?class?ZooKeeperDistributedLock?implements?Watcher?{

          private?ZooKeeper?zk;
          private?String?locksRoot?=?"/locks";
          private?String?productId;
          private?String?waitNode;
          private?String?lockNode;
          private?CountDownLatch?latch;
          private?CountDownLatch?connectedLatch?=?new?CountDownLatch(1);
          private?int?sessionTimeout?=?30000;

          public?ZooKeeperDistributedLock(String?productId)?{
          this.productId?=?productId;
          try?{
          ????????????String?address?=?"192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181";
          ????????????zk?=?new?ZooKeeper(address,?sessionTimeout,?this);
          ????????????connectedLatch.await();
          ????????}?catch?(IOException?e)?{
          throw?new?LockException(e);
          ????????}?catch?(KeeperException?e)?{
          throw?new?LockException(e);
          ????????}?catch?(InterruptedException?e)?{
          throw?new?LockException(e);
          ????????}
          ????}

          public?void?process(WatchedEvent?event)?{
          if?(event.getState()?==?KeeperState.SyncConnected)?{
          ????????????connectedLatch.countDown();
          return;
          ????????}

          if?(this.latch?!=?null)?{
          this.latch.countDown();
          ????????}
          ????}

          public?void?acquireDistributedLock()?{
          try?{
          if?(this.tryLock())?{
          return;
          ????????????}?else?{
          ????????????????waitForLock(waitNode,?sessionTimeout);
          ????????????}
          ????????}?catch?(KeeperException?e)?{
          throw?new?LockException(e);
          ????????}?catch?(InterruptedException?e)?{
          throw?new?LockException(e);
          ????????}
          ????}

          public?boolean?tryLock()?{
          try?{
          //?傳入進去的locksRoot?+?“/”?+?productId
          //?假設productId代表了一個商品id,比如說1
          //?locksRoot?=?locks
          //?/locks/10000000000,/locks/10000000001,/locks/10000000002
          ????????????lockNode?=?zk.create(locksRoot?+?"/"?+?productId,?new?byte[0],?ZooDefs.Ids.OPEN_ACL_UNSAFE,?CreateMode.EPHEMERAL_SEQUENTIAL);

          //?看看剛創(chuàng)建的節(jié)點是不是最小的節(jié)點
          // locks:10000000000,10000000001,10000000002
          ????????????List?locks?=?zk.getChildren(locksRoot,?false);
          ????????????Collections.sort(locks);

          if(lockNode.equals(locksRoot+"/"+?locks.get(0))){
          //如果是最小的節(jié)點,則表示取得鎖
          return?true;
          ????????????}

          //如果不是最小的節(jié)點,找到比自己小1的節(jié)點
          int?previousLockIndex?=?-1;
          for(int?i?=?0;?i?if(lockNode.equals(locksRoot?+?“/”?+?locks.get(i)))?{
          ?????????????????????previousLockIndex?=?i?-?1;
          break;
          ????????}
          ???????}

          this.waitNode?=?locks.get(previousLockIndex);
          ????????}?catch?(KeeperException?e)?{
          throw?new?LockException(e);
          ????????}?catch?(InterruptedException?e)?{
          throw?new?LockException(e);
          ????????}
          return?false;
          ????}

          private?boolean?waitForLock(String?waitNode,?long?waitTime)?throws?InterruptedException,?KeeperException?{
          ????????Stat?stat?=?zk.exists(locksRoot?+?"/"?+?waitNode,?true);
          if?(stat?!=?null)?{
          this.latch?=?new?CountDownLatch(1);
          this.latch.await(waitTime,?TimeUnit.MILLISECONDS);
          this.latch?=?null;
          ????????}
          return?true;
          ????}

          public?void?unlock()?{
          try?{
          //?刪除/locks/10000000000節(jié)點
          //?刪除/locks/10000000001節(jié)點
          ????????????System.out.println("unlock?"?+?lockNode);
          ????????????zk.delete(lockNode,?-1);
          ????????????lockNode?=?null;
          ????????????zk.close();
          ????????}?catch?(InterruptedException?e)?{
          ????????????e.printStackTrace();
          ????????}?catch?(KeeperException?e)?{
          ????????????e.printStackTrace();
          ????????}
          ????}

          public?class?LockException?extends?RuntimeException?{
          private?static?final?long?serialVersionUID?=?1L;

          public?LockException(String?e)?{
          ????????????super(e);
          ????????}

          public?LockException(Exception?e)?{
          ????????????super(e);
          ????????}
          ????}
          }

          redis 分布式鎖和 zk 分布式鎖的對比

          • redis 分布式鎖,其實需要自己不斷去嘗試獲取鎖,比較消耗性能。

          • zk 分布式鎖,獲取不到鎖,注冊個監(jiān)聽器即可,不需要不斷主動嘗試獲取鎖,性能開銷較小。

          另外一點就是,如果是 redis 獲取鎖的那個客戶端 出現 bug 掛了,那么只能等待超時時間之后才能釋放鎖;而 zk 的話,因為創(chuàng)建的是臨時 znode,只要客戶端掛了,znode 就沒了,此時就自動釋放鎖。

          redis 分布式鎖大家沒發(fā)現好麻煩嗎?遍歷上鎖,計算時間等等......zk 的分布式鎖語義清晰實現簡單。

          所以先不分析太多的東西,就說這兩點,我個人實踐認為 zk 的分布式鎖比 redis 的分布式鎖牢靠、而且模型簡單易用。?

          程序員的眼里,不止有代碼和bug,還有詩與遠方和妹子!!!


          往期推薦

          臥槽!竟然可以直接白嫖 Github Action 的 2C7G 服務器!

          阿里取消“P”序列職級顯示引熱議,網友:P3、P4流下了感動的淚水

          RocketMQ 消息丟失場景及解決辦法

          IntelliJ IDEA 2020.2.1 發(fā)布,Lombok插件可能被官方支持

          思科前員工離職后刪庫,直接損失達 240 萬美元


          離職后與大家在星球聊了很多,你不來看看?


          我的星球是否適合你?

          點擊閱讀原文看看我們都聊過啥?

          瀏覽 65
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  性爱视频大香蕉 | 婷婷色在线 | 国产又爽 又黄 免费视频两年半 | 日韩黄色电影免费看 | 免费黄色成人在线观看 |