<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          聊聊分布式鎖——Redis和Redisson的方式

          共 45307字,需瀏覽 91分鐘

           ·

          2021-09-17 15:26

          點(diǎn)擊藍(lán)色“JavaKeeper”關(guān)注我喲
          加個“星標(biāo)”,一起成長,做牛逼閃閃的技術(shù)人

          Keeper導(dǎo)讀:分布式鎖的文章其實(shí)早就爛大街了,但有些“菜鳥”寫的太淺,或者自己估計(jì)都沒搞明白,沒用過,看完后我更懵逼了,有些“大牛”寫的吧,又太高級,只能看懂前半部分,后邊就開始講論文了,也比較懵逼,所以還得我這個中不溜的來總結(jié)下。文章攏共分為幾個部分:

          • 什么是分布式鎖

          • 分布式鎖的實(shí)現(xiàn)要求

          • 基于 Redisson 實(shí)現(xiàn)的 Redis 分布式鎖

          一、什么是分布式鎖

          分布式~~鎖,要這么念,首先得是『分布式』,然后才是『鎖』

          • 分布式:這里的分布式指的是分布式系統(tǒng),涉及到好多技術(shù)和理論,包括CAP 理論、分布式存儲、分布式事務(wù)、分布式鎖...

            分布式系統(tǒng)是由一組通過網(wǎng)絡(luò)進(jìn)行通信、為了完成共同的任務(wù)而協(xié)調(diào)工作的計(jì)算機(jī)節(jié)點(diǎn)組成的系統(tǒng)。

            分布式系統(tǒng)的出現(xiàn)是為了用廉價的、普通的機(jī)器完成單個計(jì)算機(jī)無法完成的計(jì)算、存儲任務(wù)。其目的是利用更多的機(jī)器,處理更多的數(shù)據(jù)

          • 鎖:對對,就是你想的那個,Javer 學(xué)的第一個鎖應(yīng)該就是 synchronized

            Java 初級面試問題,來拼寫下 賽克瑞納挨日的

            從鎖的使用場景有來看下邊這 3 種鎖:

            線程鎖synchronized 是用在方法或代碼塊中的,我們把它叫『線程鎖』,線程鎖的實(shí)現(xiàn)其實(shí)是靠線程之間共享內(nèi)存實(shí)現(xiàn)的,說白了就是內(nèi)存中的一個整型數(shù),有空閑、上鎖這類狀態(tài),比如 synchronized 是在對象頭中的 Mark Word 有個鎖狀態(tài)標(biāo)志,Lock 的實(shí)現(xiàn)類大部分都有個叫 volatile int state 的共享變量來做狀態(tài)標(biāo)志。

            進(jìn)程鎖:為了控制同一操作系統(tǒng)中多個進(jìn)程訪問某個共享資源,因?yàn)檫M(jìn)程具有獨(dú)立性,各個進(jìn)程無法訪問其他進(jìn)程的資源,因此無法通過 synchronized 等線程鎖實(shí)現(xiàn)進(jìn)程鎖。比如說,我們的同一個 linux 服務(wù)器,部署了好幾個 Java 項(xiàng)目,有可能同時訪問或操作服務(wù)器上的相同數(shù)據(jù),這就需要進(jìn)程鎖,一般可以用『文件鎖』來達(dá)到進(jìn)程互斥。

            分布式鎖:隨著用戶越來越多,我們上了好多服務(wù)器,原本有個定時給客戶發(fā)郵件的任務(wù),如果不加以控制的話,到點(diǎn)后每臺機(jī)器跑一次任務(wù),客戶就會收到 N 條郵件,這就需要通過分布式鎖來互斥了。

            書面解釋:分布式鎖是控制分布式系統(tǒng)或不同系統(tǒng)之間共同訪問共享資源的一種鎖實(shí)現(xiàn),如果不同的系統(tǒng)或同一個系統(tǒng)的不同主機(jī)之間共享了某個資源時,往往需要互斥來防止彼此干擾來保證一致性。

          知道了什么是分布式鎖,接下來就到了技術(shù)選型環(huán)節(jié)

          二、分布式鎖要怎么搞

          要實(shí)現(xiàn)一個分布式鎖,我們一般選擇集群機(jī)器都可以操作的外部系統(tǒng),然后各個機(jī)器都去這個外部系統(tǒng)申請鎖。

          這個外部系統(tǒng)一般需要滿足如下要求才能勝任:

          1. 互斥:在任意時刻,只能有一個客戶端能持有鎖。
          2. 防止死鎖:即使有一個客戶端在持有鎖的期間崩潰而沒有主動解鎖,也能保證后續(xù)其他客戶端能加鎖。所以鎖一般要有一個過期時間。
          3. 獨(dú)占性:解鈴還須系鈴人,加鎖和解鎖必須是同一個客戶端,一把鎖只能有一把鑰匙,客戶端自己的鎖不能被別人給解開,當(dāng)然也不能去開別人的鎖。
          4. 容錯:外部系統(tǒng)不能太“脆弱”,要保證外部系統(tǒng)的正常運(yùn)行,客戶端才可以加鎖和解鎖。

          我覺得可以這么類比:

          好多商販要租用某個倉庫,同一時刻,只能給一個商販租用,且只能有一把鑰匙,還得有固定的“租期”,到期后要回收的,當(dāng)然最重要的是倉庫門不能壞了,要不鎖都鎖不住。這不就是分布式鎖嗎?

          感慨自己真是個愛技術(shù)愛生活的程序猿~~

          其實(shí)鎖,本質(zhì)上就是用來進(jìn)行防重操作的(數(shù)據(jù)一致性),像查詢這種冪等操作,就不需要費(fèi)這勁

          直接上結(jié)論:

          分布式鎖一般有三種實(shí)現(xiàn)方式:1. 數(shù)據(jù)庫樂觀鎖;2. 基于 Redis 的分布式鎖;3. 基于 ZooKeeper 的分布式鎖。

          但為了追求更好的性能,我們通常會選擇使用 Redis 或 Zookeeper 來做。

          想必也有喜歡問為什么的同學(xué),那數(shù)據(jù)庫客觀鎖怎么就性能不好了?

          使用數(shù)據(jù)庫樂觀鎖,包括主鍵防重,版本號控制。但是這兩種方法各有利弊。

          • 使用主鍵沖突的策略進(jìn)行防重,在并發(fā)量非常高的情況下對數(shù)據(jù)庫性能會有影響,尤其是應(yīng)用數(shù)據(jù)表和主鍵沖突表在一個庫的時候,表現(xiàn)更加明顯。還有就是在 MySQL 數(shù)據(jù)庫中采用主鍵沖突防重,在大并發(fā)情況下有可能會造成鎖表現(xiàn)象,比較好的辦法是在程序中生產(chǎn)主鍵進(jìn)行防重。

          • 使用版本號策略

            這個策略源于 MySQL 的  MVCC 機(jī)制,使用這個策略其實(shí)本身沒有什么問題,唯一的問題就是對數(shù)據(jù)表侵入較大,我們要為每個表設(shè)計(jì)一個版本號字段,然后寫一條判斷 SQL 每次進(jìn)行判斷。

          第三趴,編碼

          三、基于 Redis 的分布式鎖

          其實(shí) Redis 官網(wǎng)已經(jīng)給出了實(shí)現(xiàn):https://redis.io/topics/distlock,說各種書籍和博客用了各種手段去用 Redis 實(shí)現(xiàn)分布式鎖,建議用 Redlock 實(shí)現(xiàn),這樣更規(guī)范、更安全。我們循序漸進(jìn)來看

          我們默認(rèn)指定大家用的是 Redis 2.6.12 及更高的版本,就不再去講 setnxexpire 這種了,直接 set 命令加鎖

          set key value[expiration EX seconds|PX milliseconds] [NX|XX]

          eg:

          SET resource_name my_random_value NX PX 30000

          SET 命令的行為可以通過一系列參數(shù)來修改

          • EX second :設(shè)置鍵的過期時間為 second 秒。SET key value EX second 效果等同于 SETEX key second value
          • PX millisecond :設(shè)置鍵的過期時間為 millisecond 毫秒。SET key value PX millisecond 效果等同于 PSETEX key millisecond value
          • NX :只在鍵不存在時,才對鍵進(jìn)行設(shè)置操作。SET key value NX 效果等同于 SETNX key value
          • XX :只在鍵已經(jīng)存在時,才對鍵進(jìn)行設(shè)置操作。

          這條指令的意思:當(dāng) key——resource_name 不存在時創(chuàng)建這樣的key,設(shè)值為 my_random_value,并設(shè)置過期時間 30000 毫秒。

          別看這干了兩件事,因?yàn)?Redis 是單線程的,這一條指令不會被打斷,所以是原子性的操作。

          Redis 實(shí)現(xiàn)分布式鎖的主要步驟:

          1. 指定一個 key 作為鎖標(biāo)記,存入 Redis 中,指定一個 唯一的標(biāo)識 作為 value。
          2. 當(dāng) key 不存在時才能設(shè)置值,確保同一時間只有一個客戶端進(jìn)程獲得鎖,滿足 互斥性 特性。
          3. 設(shè)置一個過期時間,防止因系統(tǒng)異常導(dǎo)致沒能刪除這個 key,滿足 防死鎖 特性。
          4. 當(dāng)處理完業(yè)務(wù)之后需要清除這個 key 來釋放鎖,清除 key 時需要校驗(yàn) value 值,需要滿足 解鈴還須系鈴人

          設(shè)置一個隨機(jī)值的意思是在解鎖時候判斷 key 的值和我們存儲的隨機(jī)數(shù)是不是一樣,一樣的話,才是自己的鎖,直接 del 解鎖就行。

          當(dāng)然這個兩個操作要保證原子性,所以 Redis 給出了一段 lua 腳本(Redis 服務(wù)器會單線程原子性執(zhí)行 lua 腳本,保證 lua 腳本在處理的過程中不會被任意其它請求打斷。):

          if redis.call("get",KEYS[1]) == ARGV[1then
              return redis.call("del",KEYS[1])
          else
              return 0
          end

          問題:

          我們先拋出兩個問題思考:

          1. 獲取鎖時,過期時間要設(shè)置多少合適呢?

            預(yù)估一個合適的時間,其實(shí)沒那么容易,比如操作資源的時間最慢可能要 10 s,而我們只設(shè)置了 5 s  就過期,那就存在鎖提前過期的風(fēng)險。這個問題先記下,我們先看下 Javaer 要怎么在代碼中用 Redis 鎖。

          2. 容錯性如何保證呢?

            Redis 掛了怎么辦,你可能會說上主從、上集群,但也會出現(xiàn)這樣的極端情況,當(dāng)我們上鎖后,主節(jié)點(diǎn)就掛了,這個時候還沒來的急同步到從節(jié)點(diǎn),主從切換后鎖還是丟了

          帶著這兩個問題,我們接著看

          Redisson 實(shí)現(xiàn)代碼

          redisson 是 Redis 官方的分布式鎖組件。GitHub 地址:https://github.com/redisson/redisson

          Redisson 是一個在 Redis 的基礎(chǔ)上實(shí)現(xiàn)的 Java 駐內(nèi)存數(shù)據(jù)網(wǎng)格(In-Memory Data Grid)。它不僅提供了一系列的分布式的 Java 常用對象,還實(shí)現(xiàn)了可重入鎖(Reentrant Lock)、公平鎖(Fair Lock、聯(lián)鎖(MultiLock)、 紅鎖(RedLock)、 讀寫鎖(ReadWriteLock)等,還提供了許多分布式服務(wù)。Redisson 提供了使用 Redis 的最簡單和最便捷的方法。Redisson 的宗旨是促進(jìn)使用者對 Redis 的關(guān)注分離(Separation of Concern),從而讓使用者能夠?qū)⒕Ω械胤旁谔幚順I(yè)務(wù)邏輯上。

          redisson 現(xiàn)在已經(jīng)很強(qiáng)大了,github 的 wiki 也很詳細(xì),分布式鎖的介紹直接戳 Distributed locks and synchronizers

          Redisson 支持單點(diǎn)模式、主從模式、哨兵模式、集群模式,只是配置的不同,我們以單點(diǎn)模式來看下怎么使用,代碼很簡單,都已經(jīng)為我們封裝好了,直接拿來用就好,詳細(xì)的demo,我放在了 github: starfish-learn-redisson 上,這里就不一步步來了

          RLock lock = redisson.getLock("myLock");

          RLock 提供了各種鎖方法,我們來解讀下這個接口方法,

          注:代碼為 3.16.2 版本,可以看到繼承自 JDK 的 Lock 接口,和 Reddsion 的異步鎖接口 RLockAsync(這個我們先不研究)

          RLock

          public interface RLock extends LockRLockAsync {

              /**
               * 獲取鎖的名字
               */

              String getName();
              
              /**
               * 這個叫終端鎖操作,表示該鎖可以被中斷 假如A和B同時調(diào)這個方法,A獲取鎖,B為獲取鎖,那么B線程可以通過
               * Thread.currentThread().interrupt(); 方法真正中斷該線程
               */

              void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException;

              /**
               * 這個應(yīng)該是最常用的,嘗試獲取鎖
               * waitTimeout 嘗試獲取鎖的最大等待時間,超過這個值,則認(rèn)為獲取鎖失敗
               * leaseTime   鎖的持有時間,超過這個時間鎖會自動失效(值應(yīng)設(shè)置為大于業(yè)務(wù)處理的時間,確保在鎖有效期內(nèi)業(yè)務(wù)能處理完)
               */

              boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;

              /**
               * 鎖的有效期設(shè)置為 leaseTime,過期后自動失效
               * 如果 leaseTime 設(shè)置為 -1, 表示不主動過期
               */

              void lock(long leaseTime, TimeUnit unit);

              /**
               * Unlocks the lock independently of its state
               */

              boolean forceUnlock();

              /**
               * 檢查是否被另一個線程鎖住
               */

              boolean isLocked();

              /**
               * 檢查當(dāng)前線線程是否持有該鎖
               */

              boolean isHeldByCurrentThread();
            
               /**
               *  這個就明了了,檢查指定線程是否持有鎖
               */

              boolean isHeldByThread(long threadId);

              /**
               * 返回當(dāng)前線程持有鎖的次數(shù)
               */

              int getHoldCount();

              /**
               * 返回鎖的剩余時間
               * @return time in milliseconds
               *          -2 if the lock does not exist.
               *          -1 if the lock exists but has no associated expire.
               */

              long remainTimeToLive();
              
          }

          Demo

          Config config = new Config();
          config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("").setDatabase(1);
          RedissonClient redissonClient = Redisson.create(config);
          RLock disLock = redissonClient.getLock("mylock");
          boolean isLock;
          try {
            /**
             * 嘗試獲取鎖的最大等待時間是 100 秒,超過這個值還沒獲取到,就認(rèn)為獲取失敗
             * 鎖的持有時間是 10 秒
             */

            isLock = disLock.tryLock(10010, TimeUnit.MILLISECONDS);
            if (isLock) {
              //做自己的業(yè)務(wù)
              Thread.sleep(10000);
            }
          catch (Exception e) {
             e.printStackTrace();
          finally {
             disLock.unlock();
          }

          就是這么簡單,Redisson 已經(jīng)做好了封裝,使用起來 so easy,如果使用主從、哨兵、集群這種也只是配置不同。

          原理

          看源碼小 tips,最好是 fork 到自己的倉庫,然后拉到本地,邊看邊注釋,然后提交到自己的倉庫,也方便之后再看,不想這么麻煩的,也可以直接看我的 Jstarfish/redisson

          先看下 RLock 的類關(guān)系

          跟著源碼,可以發(fā)現(xiàn) RedissonLock 是 RLock 的直接實(shí)現(xiàn),也是我們加鎖、解鎖操作的核心類

          加鎖

          主要的加鎖方法就下邊這兩個,區(qū)別也很簡單,一個有等待時間,一個沒有,所以我們挑個復(fù)雜的看(源碼包含了另一個的絕大部分)

          boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
          void lock(long leaseTime, TimeUnit unit);

          RedissonLock.tryLock

          @Override
          public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
              // 獲取等鎖的最長時間
              long time = unit.toMillis(waitTime);
              long current = System.currentTimeMillis();
              //取得當(dāng)前線程id(判斷是否可重入鎖的關(guān)鍵)
              long threadId = Thread.currentThread().getId();
              // 【核心點(diǎn)1】嘗試獲取鎖,若返回值為null,則表示已獲取到鎖,返回的ttl就是key的剩余存活時間
              Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
              if (ttl == null) {
                  return true;
              }
              // 還可以容忍的等待時長 = 獲取鎖能容忍的最大等待時長 - 執(zhí)行完上述操作流程的時間
              time -= System.currentTimeMillis() - current;
              if (time <= 0) {
                  //等不到了,直接返回失敗
                  acquireFailed(waitTime, unit, threadId);
                  return false;
              }

              current = System.currentTimeMillis();
              /**
               * 【核心點(diǎn)2】
               * 訂閱解鎖消息 redisson_lock__channel:{$KEY},并通過await方法阻塞等待鎖釋放,解決了無效的鎖申請浪費(fèi)資源的問題:
               * 基于信息量,當(dāng)鎖被其它資源占用時,當(dāng)前線程通過 Redis 的 channel 訂閱鎖的釋放事件,一旦鎖釋放會發(fā)消息通知待等待的線程進(jìn)行競爭
               * 當(dāng) this.await返回false,說明等待時間已經(jīng)超出獲取鎖最大等待時間,取消訂閱并返回獲取鎖失敗
               * 當(dāng) this.await返回true,進(jìn)入循環(huán)嘗試獲取鎖
               */

              RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
              //await 方法內(nèi)部是用CountDownLatch來實(shí)現(xiàn)阻塞,獲取subscribe異步執(zhí)行的結(jié)果(應(yīng)用了Netty 的 Future)
              if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
                  if (!subscribeFuture.cancel(false)) {
                      subscribeFuture.onComplete((res, e) -> {
                          if (e == null) {
                              unsubscribe(subscribeFuture, threadId);
                          }
                      });
                  }
                  acquireFailed(waitTime, unit, threadId);
                  return false;
              }

              // ttl 不為空,表示已經(jīng)有這樣的key了,只能阻塞等待
              try {
                  time -= System.currentTimeMillis() - current;
                  if (time <= 0) {
                      acquireFailed(waitTime, unit, threadId);
                      return false;
                  }

                  // 來個死循環(huán),繼續(xù)嘗試著獲取鎖
                  while (true) {
                      long currentTime = System.currentTimeMillis();
                      ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
                      if (ttl == null) {
                          return true;
                      }

                      time -= System.currentTimeMillis() - currentTime;
                      if (time <= 0) {
                          acquireFailed(waitTime, unit, threadId);
                          return false;
                      }

                      currentTime = System.currentTimeMillis();

                     /**
                      * 【核心點(diǎn)3】根據(jù)鎖TTL,調(diào)整阻塞等待時長;
                      * 1、latch其實(shí)是個信號量Semaphore,調(diào)用其tryAcquire方法會讓當(dāng)前線程阻塞一段時間,避免在while循環(huán)中頻繁請求獲鎖;
                      *  當(dāng)其他線程釋放了占用的鎖,會廣播解鎖消息,監(jiān)聽器接收解鎖消息,并釋放信號量,最終會喚醒阻塞在這里的線程
                      * 2、該Semaphore的release方法,會在訂閱解鎖消息的監(jiān)聽器消息處理方法org.redisson.pubsub.LockPubSub#onMessage調(diào)用;
                      */

                      //調(diào)用信號量的方法來阻塞線程,時長為鎖等待時間和租期時間中較小的那個
                      if (ttl >= 0 && ttl < time) {
                          subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                      } else {
                          subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                      }

                      time -= System.currentTimeMillis() - currentTime;
                      if (time <= 0) {
                          acquireFailed(waitTime, unit, threadId);
                          return false;
                      }
                  }
              } finally {
                  // 獲取到鎖或者拋出中斷異常,退訂redisson_lock__channel:{$KEY},不再關(guān)注解鎖事件
                  unsubscribe(subscribeFuture, threadId);
              }
          }

          接著看注釋中提到的 3 個核心點(diǎn)

          核心點(diǎn)1-嘗試加鎖:RedissonLock.tryAcquireAsync

          private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
              RFuture<Long> ttlRemainingFuture;
              // leaseTime != -1 說明沒過期
              if (leaseTime != -1) {
                  // 實(shí)質(zhì)是異步執(zhí)行加鎖Lua腳本
                  ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
              } else {
                  // 否則,已經(jīng)過期了,傳參變?yōu)樾碌臅r間(續(xù)期后)
                  ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                          TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
              }
              ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
                  if (e != null) {
                      return;
                  }

                  // lock acquired
                  if (ttlRemaining == null) {
                      if (leaseTime != -1) {
                          internalLockLeaseTime = unit.toMillis(leaseTime);
                      } else {
                          // 續(xù)期
                          scheduleExpirationRenewal(threadId);
                      }
                  }
              });
              return ttlRemainingFuture;
          }

          異步執(zhí)行加鎖 Lua 腳本:RedissonLock.tryLockInnerAsync

          <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
              return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                      // 1.如果緩存中的key不存在,則執(zhí)行 hincrby 命令(hincrby key UUID+threadId 1), 設(shè)值重入次數(shù)1
                      // 然后通過 pexpire 命令設(shè)置鎖的過期時間(即鎖的租約時間)
                      // 返回空值 nil ,表示獲取鎖成功
                      "if (redis.call('exists', KEYS[1]) == 0) then " +
                              "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                              "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                              "return nil; " +
                              "end; " +
                              // 如果key已經(jīng)存在,并且value也匹配,表示是當(dāng)前線程持有的鎖,則執(zhí)行 hincrby 命令,重入次數(shù)加1,并且設(shè)置失效時間
                              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                              "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                              "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                              "return nil; " +
                              "end; " +
                              //如果key已經(jīng)存在,但是value不匹配,說明鎖已經(jīng)被其他線程持有,通過 pttl 命令獲取鎖的剩余存活時間并返回,至此獲取鎖失敗
                              "return redis.call('pttl', KEYS[1]);",
                      Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
          }
          • KEYS[1] 就是 Collections.singletonList(getName()),表示分布式鎖的key;
          • ARGV[1] 就是internalLockLeaseTime,即鎖的租約時間(持有鎖的有效時間),默認(rèn)30s;
          • ARGV[2] 就是getLockName(threadId),是獲取鎖時set的唯一值 value,即UUID+threadId

          看門狗續(xù)期:RedissonBaseLock.scheduleExpirationRenewal

          // 基于線程ID定時調(diào)度和續(xù)期
          protected void scheduleExpirationRenewal(long threadId) {
              // 新建一個ExpirationEntry記錄線程重入計(jì)數(shù)
              ExpirationEntry entry = new ExpirationEntry();
              ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
              if (oldEntry != null) {
                  // 當(dāng)前進(jìn)行的當(dāng)前線程重入加鎖
                  oldEntry.addThreadId(threadId);
              } else {
                  // 當(dāng)前進(jìn)行的當(dāng)前線程首次加鎖
                  entry.addThreadId(threadId);
                  // 首次新建ExpirationEntry需要觸發(fā)續(xù)期方法,記錄續(xù)期的任務(wù)句柄
                  renewExpiration();
              }
          }

          // 處理續(xù)期
          private void renewExpiration() {
            // 根據(jù)entryName獲取ExpirationEntry實(shí)例,如果為空,說明在cancelExpirationRenewal()方法已經(jīng)被移除,一般是解鎖的時候觸發(fā)
            ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ee == null) {
              return;
            }

            // 新建一個定時任務(wù),這個就是看門狗的實(shí)現(xiàn),io.netty.util.Timeout是Netty結(jié)合時間輪使用的定時任務(wù)實(shí)例
            Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
              @Override
              public void run(Timeout timeout) throws Exception {
                // 這里是重復(fù)外面的那個邏輯,
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                  return;
                }
                // 獲取ExpirationEntry中首個線程ID,如果為空說明調(diào)用過cancelExpirationRenewal()方法清空持有的線程重入計(jì)數(shù),一般是鎖已經(jīng)釋放的場景
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                  return;
                }
                // 向Redis異步發(fā)送續(xù)期的命令
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                  // 拋出異常,續(xù)期失敗,只打印日志和直接終止任務(wù)
                  if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                  }
                  // 返回true證明續(xù)期成功,則遞歸調(diào)用續(xù)期方法(重新調(diào)度自己),續(xù)期失敗說明對應(yīng)的鎖已經(jīng)不存在,直接返回,不再遞歸
                  if (res) {
                    // reschedule itself
                    renewExpiration();
                  } else {
                    cancelExpirationRenewal(null);
                  }
                });
              }// 這里的執(zhí)行頻率為leaseTime轉(zhuǎn)換為ms單位下的三分之一,由于leaseTime初始值為-1的情況下才會進(jìn)入續(xù)期邏輯,那么這里的執(zhí)行頻率為lockWatchdogTimeout的三分之一
            }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
            // ExpirationEntry實(shí)例持有調(diào)度任務(wù)實(shí)例
            ee.setTimeout(task);
          }

          核心點(diǎn)2-訂閱解鎖消息:RedissonLock.subscribe

          protected final LockPubSub pubSub;

          public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
            super(commandExecutor, name);
            this.commandExecutor = commandExecutor;
            this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
            //在構(gòu)造器中初始化pubSub,跟著這幾個get方法會發(fā)現(xiàn)他們都是在構(gòu)造器中初始化的,在PublishSubscribeService中會有
            // private final AsyncSemaphore[] locks = new AsyncSemaphore[50]; 這樣一段代碼,初始化了一組信號量
            this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
          }

          protected RFuture<RedissonLockEntry> subscribe(long threadId) {
            return pubSub.subscribe(getEntryName(), getChannelName());
          }

          // 在LockPubSub中注冊一個entryName -> RedissonLockEntry的哈希映射,RedissonLockEntry實(shí)例中存放著RPromise<RedissonLockEntry>結(jié)果,一個信號量形式的鎖和訂閱方法重入計(jì)數(shù)器
          public RFuture<E> subscribe(String entryName, String channelName) {
            AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
            RPromise<E> newPromise = new RedissonPromise<>();
            semaphore.acquire(() -> {
              if (!newPromise.setUncancellable()) {
                semaphore.release();
                return;
              }

              E entry = entries.get(entryName);
              if (entry != null) {
                entry.acquire();
                semaphore.release();
                entry.getPromise().onComplete(new TransferListener<E>(newPromise));
                return;
              }

              E value = createEntry(newPromise);
              value.acquire();

              E oldValue = entries.putIfAbsent(entryName, value);
              if (oldValue != null) {
                oldValue.acquire();
                semaphore.release();
                oldValue.getPromise().onComplete(new TransferListener<E>(newPromise));
                return;
              }

              RedisPubSubListener<Object> listener = createListener(channelName, value);
              service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
            });

            return newPromise;
          }

          核心點(diǎn) 3 比較簡單,就不說了

          解鎖

          RedissonLock.unlock()

          @Override
          public void unlock() {
            try {
              // 獲取當(dāng)前調(diào)用解鎖操作的線程ID
              get(unlockAsync(Thread.currentThread().getId()));
            } catch (RedisException e) {
              // IllegalMonitorStateException一般是A線程加鎖,B線程解鎖,內(nèi)部判斷線程狀態(tài)不一致拋出的
              if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException) e.getCause();
              } else {
                throw e;
              }
            }
          }

          RedissonBaseLock.unlockAsync

          @Override
          public RFuture<Void> unlockAsync(long threadId) {
            // 構(gòu)建一個結(jié)果RedissonPromise
            RPromise<Void> result = new RedissonPromise<>();
            // 返回的RFuture如果持有的結(jié)果為true,說明解鎖成功,返回NULL說明線程ID異常,加鎖和解鎖的客戶端線程不是同一個線程
            RFuture<Boolean> future = unlockInnerAsync(threadId);

            future.onComplete((opStatus, e) -> {
              // 取消看門狗的續(xù)期任務(wù)
              cancelExpirationRenewal(threadId);

              if (e != null) {
                result.tryFailure(e);
                return;
              }

              if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                                                                                      + id + " thread-id: " + threadId);
                result.tryFailure(cause);
                return;
              }

              result.trySuccess(null);
            });

            return result;
          }

          RedissonLock.unlockInnerAsync

          // 真正的內(nèi)部解鎖的方法,執(zhí)行解鎖的Lua腳本
          protected RFuture<Boolean> unlockInnerAsync(long threadId) {
            return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                                  //如果分布式鎖存在,但是value不匹配,表示鎖已經(jīng)被其他線程占用,無權(quán)釋放鎖,那么直接返回空值(解鈴還須系鈴人)
                                  "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                                  "return nil;" +
                                  "end; " +
                                  //如果value匹配,則就是當(dāng)前線程占有分布式鎖,那么將重入次數(shù)減1
                                  "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                                  //重入次數(shù)減1后的值如果大于0,表示分布式鎖有重入過,那么只能更新失效時間,還不能刪除
                                  "if (counter > 0) then " +
                                  "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                                  "return 0; " +
                                  "else " +
                                  //重入次數(shù)減1后的值如果為0,這時就可以刪除這個KEY,并發(fā)布解鎖消息,返回1
                                  "redis.call('del', KEYS[1]); " +
                                  "redis.call('publish', KEYS[2], ARGV[1]); " +
                                  "return 1; " +
                                  "end; " +
                                  "return nil;",
                                  //這5個參數(shù)分別對應(yīng)KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3]
                                  Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
          }

          我只列出了一小部分代碼,更多的內(nèi)容還是得自己動手

          從源碼中,我們可以看到 Redisson 幫我們解決了拋出的第一個問題:失效時間設(shè)置多長時間為好?

          Redisson 提供了看門狗,每獲得一個鎖時,只設(shè)置一個很短的超時時間,同時起一個線程在每次快要到超時時間時去刷新鎖的超時時間。在釋放鎖的同時結(jié)束這個線程。

          但是沒有解決節(jié)點(diǎn)掛掉,丟失鎖的問題,接著來~

          四、RedLock

          我們上邊介紹的分布式鎖,在某些極端情況下仍然是有缺陷的

          1. 客戶端長時間內(nèi)阻塞導(dǎo)致鎖失效

            客戶端 1 得到了鎖,因?yàn)榫W(wǎng)絡(luò)問題或者 GC 等原因?qū)е麻L時間阻塞,然后業(yè)務(wù)程序還沒執(zhí)行完鎖就過期了,這時候客戶端 2 也能正常拿到鎖,可能會導(dǎo)致線程安全的問題。

          2. Redis 服務(wù)器時鐘漂移

            如果 Redis 服務(wù)器的機(jī)器時間發(fā)生了向前跳躍,就會導(dǎo)致這個 key 過早超時失效,比如說客戶端 1 拿到鎖后,key 還沒有到過期時間,但是 Redis 服務(wù)器的時間比客戶端快了 2 分鐘,導(dǎo)致 key 提前就失效了,這時候,如果客戶端 1 還沒有釋放鎖的話,就可能導(dǎo)致多個客戶端同時持有同一把鎖的問題。

          3. 單點(diǎn)實(shí)例安全問題

            如果 Redis 是單機(jī)模式的,如果掛了的話,那所有的客戶端都獲取不到鎖了,假設(shè)你是主從模式,但 Redis 的主從同步是異步進(jìn)行的,如果 Redis 主宕機(jī)了,這個時候從機(jī)并沒有同步到這一把鎖,那么機(jī)器 B 再次申請的時候就會再次申請到這把鎖,這也是問題

          為了解決這些個問題 Redis 作者提出了 RedLock 紅鎖的算法,在 Redission 中也對 RedLock 進(jìn)行了實(shí)現(xiàn)。

          Redis 官網(wǎng)對 redLock 算法的介紹大致如下:The Redlock algorithm

          在分布式版本的算法里我們假設(shè)我們有 N 個 Redis master 節(jié)點(diǎn),這些節(jié)點(diǎn)都是完全獨(dú)立的,我們不用任何復(fù)制或者其他隱含的分布式協(xié)調(diào)機(jī)制。之前我們已經(jīng)描述了在 Redis 單實(shí)例下怎么安全地獲取和釋放鎖。我們確保將在每(N) 個實(shí)例上使用此方法獲取和釋放鎖。在我們的例子里面我們設(shè)置 N=5,這是一個比較合理的設(shè)置,所以我們需要在 5 臺機(jī)器或者虛擬機(jī)上面運(yùn)行這些實(shí)例,這樣保證他們不會同時都宕掉。為了取到鎖,客戶端應(yīng)該執(zhí)行以下操作:

          1. 獲取當(dāng)前 Unix 時間,以毫秒為單位。
          2. 依次嘗試從 5 個實(shí)例,使用相同的 key 和具有唯一性的 value(例如UUID)獲取鎖。當(dāng)向 Redis 請求獲取鎖時,客戶端應(yīng)該設(shè)置一個嘗試從某個 Reids 實(shí)例獲取鎖的最大等待時間(超過這個時間,則立馬詢問下一個實(shí)例),這個超時時間應(yīng)該小于鎖的失效時間。例如你的鎖自動失效時間為 10 秒,則超時時間應(yīng)該在 5-50 毫秒之間。這樣可以避免服務(wù)器端 Redis 已經(jīng)掛掉的情況下,客戶端還在死死地等待響應(yīng)結(jié)果。如果服務(wù)器端沒有在規(guī)定時間內(nèi)響應(yīng),客戶端應(yīng)該盡快嘗試去另外一個 Redis 實(shí)例請求獲取鎖。
          3. 客戶端使用當(dāng)前時間減去開始獲取鎖時間(步驟1記錄的時間)就得到獲取鎖消耗的時間。當(dāng)且僅當(dāng)從大多數(shù)(N/2+1,這里是3個節(jié)點(diǎn))的 Redis 節(jié)點(diǎn)都取到鎖,并且使用的總耗時小于鎖失效時間時,鎖才算獲取成功。
          4. 如果取到了鎖,key 的真正有效時間 = 有效時間(獲取鎖時設(shè)置的 key 的自動超時時間) - 獲取鎖的總耗時(詢問各個 Redis 實(shí)例的總耗時之和)(步驟 3 計(jì)算的結(jié)果)。
          5. 如果因?yàn)槟承┰颍罱K獲取鎖失敗(即沒有在至少 “N/2+1 ”個 Redis 實(shí)例取到鎖或者“獲取鎖的總耗時”超過了“有效時間”),客戶端應(yīng)該在所有的 Redis 實(shí)例上進(jìn)行解鎖(即便某些 Redis 實(shí)例根本就沒有加鎖成功,這樣可以防止某些節(jié)點(diǎn)獲取到鎖但是客戶端沒有得到響應(yīng)而導(dǎo)致接下來的一段時間不能被重新獲取鎖)。

          總結(jié)下就是:

          1. 客戶端在多個 Redis 實(shí)例上申請加鎖,必須保證大多數(shù)節(jié)點(diǎn)加鎖成功

            解決容錯性問題,部分實(shí)例異常,剩下的還能加鎖成功

          2. 大多數(shù)節(jié)點(diǎn)加鎖的總耗時,要小于鎖設(shè)置的過期時間

            多實(shí)例操作,可能存在網(wǎng)絡(luò)延遲、丟包、超時等問題,所以就算是大多數(shù)節(jié)點(diǎn)加鎖成功,如果加鎖的累積耗時超過了鎖的過期時間,那有些節(jié)點(diǎn)上的鎖可能也已經(jīng)失效了,還是沒有意義的

          3. 釋放鎖,要向全部節(jié)點(diǎn)發(fā)起釋放鎖請求

            如果部分節(jié)點(diǎn)加鎖成功,但最后由于異常導(dǎo)致大部分節(jié)點(diǎn)沒加鎖成功,就要釋放掉所有的,各節(jié)點(diǎn)要保持一致

          關(guān)于 RedLock,兩位分布式大佬,Antirez 和 Martin 還進(jìn)行過一場爭論,感興趣的也可以看看

          Config config1 = new Config();
          config1.useSingleServer().setAddress("127.0.0.1:6379");
          RedissonClient redissonClient1 = Redisson.create(config1);

          Config config2 = new Config();
          config2.useSingleServer().setAddress("127.0.0.1:5378");
          RedissonClient redissonClient2 = Redisson.create(config2);

          Config config3 = new Config();
          config3.useSingleServer().setAddress("127.0.0.1:5379");
          RedissonClient redissonClient3 = Redisson.create(config3);

          /**
           * 獲取多個 RLock 對象
           */

          RLock lock1 = redissonClient1.getLock(lockKey);
          RLock lock2 = redissonClient2.getLock(lockKey);
          RLock lock3 = redissonClient3.getLock(lockKey);

          /**
           * 根據(jù)多個 RLock 對象構(gòu)建 RedissonRedLock (最核心的差別就在這里)
           */

          RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);

          try {
              /**
               * 4.嘗試獲取鎖
               * waitTimeout 嘗試獲取鎖的最大等待時間,超過這個值,則認(rèn)為獲取鎖失敗
               * leaseTime   鎖的持有時間,超過這個時間鎖會自動失效(值應(yīng)設(shè)置為大于業(yè)務(wù)處理的時間,確保在鎖有效期內(nèi)業(yè)務(wù)能處理完)
               */

              boolean res = redLock.tryLock(10010, TimeUnit.SECONDS);
              if (res) {
                  //成功獲得鎖,在這里處理業(yè)務(wù)
              }
          catch (Exception e) {
              throw new RuntimeException("aquire lock fail");
          }finally{
              //無論如何, 最后都要解鎖
              redLock.unlock();
          }

          最核心的變化就是需要構(gòu)建多個 RLock ,然后根據(jù)多個 RLock 構(gòu)建成一個 RedissonRedLock,因?yàn)?redLock 算法是建立在多個互相獨(dú)立的 Redis 環(huán)境之上的(為了區(qū)分可以叫為 Redission node),Redission node 節(jié)點(diǎn)既可以是單機(jī)模式(single),也可以是主從模式(master/salve),哨兵模式(sentinal),或者集群模式(cluster)。這就意味著,不能跟以往這樣只搭建 1個 cluster、或 1個 sentinel 集群,或是1套主從架構(gòu)就了事了,需要為 RedissonRedLock 額外搭建多幾套獨(dú)立的 Redission 節(jié)點(diǎn)。

          RedissonMultiLock.tryLock

          @Override
          public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
            //        try {
            //            return tryLockAsync(waitTime, leaseTime, unit).get();
            //        } catch (ExecutionException e) {
            //            throw new IllegalStateException(e);
            //        }
            long newLeaseTime = -1;
            if (leaseTime != -1) {
              if (waitTime == -1) {
                newLeaseTime = unit.toMillis(leaseTime);
              } else {
                newLeaseTime = unit.toMillis(waitTime)*2;
              }
            }

            long time = System.currentTimeMillis();
            long remainTime = -1;
            if (waitTime != -1) {
              remainTime = unit.toMillis(waitTime);
            }
            long lockWaitTime = calcLockWaitTime(remainTime);

            //允許加鎖失敗節(jié)點(diǎn)個數(shù)限制(N-(N/2+1))
            int failedLocksLimit = failedLocksLimit();
            List<RLock> acquiredLocks = new ArrayList<>(locks.size());
            // 遍歷所有節(jié)點(diǎn)通過EVAL命令執(zhí)行l(wèi)ua加鎖
            for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
              RLock lock = iterator.next();
              boolean lockAcquired;
              try {
                // 對節(jié)點(diǎn)嘗試加鎖
                if (waitTime == -1 && leaseTime == -1) {
                  lockAcquired = lock.tryLock();
                } else {
                  long awaitTime = Math.min(lockWaitTime, remainTime);
                  lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
                }
              } catch (RedisResponseTimeoutException e) {
                // 如果拋出這類異常,為了防止加鎖成功,但是響應(yīng)失敗,需要解鎖所有節(jié)點(diǎn)
                unlockInner(Arrays.asList(lock));
                lockAcquired = false;
              } catch (Exception e) {
                lockAcquired = false;
              }

              if (lockAcquired) {
                acquiredLocks.add(lock);
              } else {
                /*
                 *  計(jì)算已經(jīng)申請鎖失敗的節(jié)點(diǎn)是否已經(jīng)到達(dá) 允許加鎖失敗節(jié)點(diǎn)個數(shù)限制 (N-(N/2+1))
                 * 如果已經(jīng)到達(dá), 就認(rèn)定最終申請鎖失敗,則沒有必要繼續(xù)從后面的節(jié)點(diǎn)申請了
                 * 因?yàn)?nbsp;Redlock 算法要求至少N/2+1 個節(jié)點(diǎn)都加鎖成功,才算最終的鎖申請成功
                 */

                if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
                  break;
                }

                if (failedLocksLimit == 0) {
                  unlockInner(acquiredLocks);
                  if (waitTime == -1) {
                    return false;
                  }
                  failedLocksLimit = failedLocksLimit();
                  acquiredLocks.clear();
                  // reset iterator
                  while (iterator.hasPrevious()) {
                    iterator.previous();
                  }
                } else {
                  failedLocksLimit--;
                }
              }
              //計(jì)算 目前從各個節(jié)點(diǎn)獲取鎖已經(jīng)消耗的總時間,如果已經(jīng)等于最大等待時間,則認(rèn)定最終申請鎖失敗,返回false
              if (remainTime != -1) {
                remainTime -= System.currentTimeMillis() - time;
                time = System.currentTimeMillis();
                if (remainTime <= 0) {
                  unlockInner(acquiredLocks);
                  return false;
                }
              }
            }

            if (leaseTime != -1) {
              acquiredLocks.stream()
                .map(l -> (RedissonLock) l)
                .map(l -> l.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS))
                .forEach(f -> f.syncUninterruptibly());
            }

            return true;
          }
          參考與感謝
          • 《Redis —— Distributed locks with Redis》
          • 《Redisson —— Distributed locks and synchronizers》
          • 慢談 Redis 實(shí)現(xiàn)分布式鎖 以及 Redisson 源碼解析
          • 理解Redisson中分布式鎖的實(shí)現(xiàn)

          圖解 git 常用命令



          瀏覽 42
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  午夜造逼 | 青青草综合网 | 肏屄视频在线 | 国产精品美女 | 国产女人水真多18精品 |