<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          聊聊分布式鎖——Redis和Redisson的方式

          共 45405字,需瀏覽 91分鐘

           ·

          2021-09-27 17:08

          點(diǎn)擊藍(lán)色“JavaKeeper”關(guān)注我喲
          加個(gè)“星標(biāo)”,一起成長(zhǎng),做牛逼閃閃的技術(shù)人

          Keeper導(dǎo)讀:分布式鎖的文章其實(shí)早就爛大街了,但有些“菜鳥”寫的太淺,或者自己估計(jì)都沒搞明白,沒用過,看完后我更懵逼了,有些“大牛”寫的吧,又太高級(jí),只能看懂前半部分,后邊就開始講論文了,也比較懵逼,所以還得我這個(gè)中不溜的來總結(jié)下。文章攏共分為幾個(gè)部分:

          • 什么是分布式鎖

          • 分布式鎖的實(shí)現(xiàn)要求

          • 基于 Redisson 實(shí)現(xiàn)的 Redis 分布式鎖

          一、什么是分布式鎖

          分布式~~鎖,要這么念,首先得是『分布式』,然后才是『鎖』

          • 分布式:這里的分布式指的是分布式系統(tǒng),涉及到好多技術(shù)和理論,包括CAP 理論、分布式存儲(chǔ)、分布式事務(wù)、分布式鎖...

            分布式系統(tǒng)是由一組通過網(wǎng)絡(luò)進(jìn)行通信、為了完成共同的任務(wù)而協(xié)調(diào)工作的計(jì)算機(jī)節(jié)點(diǎn)組成的系統(tǒng)。

            分布式系統(tǒng)的出現(xiàn)是為了用廉價(jià)的、普通的機(jī)器完成單個(gè)計(jì)算機(jī)無法完成的計(jì)算、存儲(chǔ)任務(wù)。其目的是利用更多的機(jī)器,處理更多的數(shù)據(jù)

          • 鎖:對(duì)對(duì),就是你想的那個(gè),Javer 學(xué)的第一個(gè)鎖應(yīng)該就是 synchronized

            Java 初級(jí)面試問題,來拼寫下 賽克瑞納挨日的

            從鎖的使用場(chǎng)景有來看下邊這 3 種鎖:

            線程鎖synchronized 是用在方法或代碼塊中的,我們把它叫『線程鎖』,線程鎖的實(shí)現(xiàn)其實(shí)是靠線程之間共享內(nèi)存實(shí)現(xiàn)的,說白了就是內(nèi)存中的一個(gè)整型數(shù),有空閑、上鎖這類狀態(tài),比如 synchronized 是在對(duì)象頭中的 Mark Word 有個(gè)鎖狀態(tài)標(biāo)志,Lock 的實(shí)現(xiàn)類大部分都有個(gè)叫 volatile int state 的共享變量來做狀態(tài)標(biāo)志。

            進(jìn)程鎖:為了控制同一操作系統(tǒng)中多個(gè)進(jìn)程訪問某個(gè)共享資源,因?yàn)檫M(jìn)程具有獨(dú)立性,各個(gè)進(jìn)程無法訪問其他進(jìn)程的資源,因此無法通過 synchronized 等線程鎖實(shí)現(xiàn)進(jìn)程鎖。比如說,我們的同一個(gè) linux 服務(wù)器,部署了好幾個(gè) Java 項(xiàng)目,有可能同時(shí)訪問或操作服務(wù)器上的相同數(shù)據(jù),這就需要進(jìn)程鎖,一般可以用『文件鎖』來達(dá)到進(jìn)程互斥。

            分布式鎖:隨著用戶越來越多,我們上了好多服務(wù)器,原本有個(gè)定時(shí)給客戶發(fā)郵件的任務(wù),如果不加以控制的話,到點(diǎn)后每臺(tái)機(jī)器跑一次任務(wù),客戶就會(huì)收到 N 條郵件,這就需要通過分布式鎖來互斥了。

            書面解釋:分布式鎖是控制分布式系統(tǒng)或不同系統(tǒng)之間共同訪問共享資源的一種鎖實(shí)現(xiàn),如果不同的系統(tǒng)或同一個(gè)系統(tǒng)的不同主機(jī)之間共享了某個(gè)資源時(shí),往往需要互斥來防止彼此干擾來保證一致性。

          知道了什么是分布式鎖,接下來就到了技術(shù)選型環(huán)節(jié)

          二、分布式鎖要怎么搞

          要實(shí)現(xiàn)一個(gè)分布式鎖,我們一般選擇集群機(jī)器都可以操作的外部系統(tǒng),然后各個(gè)機(jī)器都去這個(gè)外部系統(tǒng)申請(qǐng)鎖。

          這個(gè)外部系統(tǒng)一般需要滿足如下要求才能勝任:

          1. 互斥:在任意時(shí)刻,只能有一個(gè)客戶端能持有鎖。
          2. 防止死鎖:即使有一個(gè)客戶端在持有鎖的期間崩潰而沒有主動(dòng)解鎖,也能保證后續(xù)其他客戶端能加鎖。所以鎖一般要有一個(gè)過期時(shí)間。
          3. 獨(dú)占性:解鈴還須系鈴人,加鎖和解鎖必須是同一個(gè)客戶端,一把鎖只能有一把鑰匙,客戶端自己的鎖不能被別人給解開,當(dāng)然也不能去開別人的鎖。
          4. 容錯(cuò):外部系統(tǒng)不能太“脆弱”,要保證外部系統(tǒng)的正常運(yùn)行,客戶端才可以加鎖和解鎖。

          我覺得可以這么類比:

          好多商販要租用某個(gè)倉(cāng)庫,同一時(shí)刻,只能給一個(gè)商販租用,且只能有一把鑰匙,還得有固定的“租期”,到期后要回收的,當(dāng)然最重要的是倉(cāng)庫門不能壞了,要不鎖都鎖不住。這不就是分布式鎖嗎?

          感慨自己真是個(gè)愛技術(shù)愛生活的程序猿~~

          其實(shí)鎖,本質(zhì)上就是用來進(jìn)行防重操作的(數(shù)據(jù)一致性),像查詢這種冪等操作,就不需要費(fèi)這勁

          直接上結(jié)論:

          分布式鎖一般有三種實(shí)現(xiàn)方式:1. 數(shù)據(jù)庫樂觀鎖;2. 基于 Redis 的分布式鎖;3. 基于 ZooKeeper 的分布式鎖。

          但為了追求更好的性能,我們通常會(huì)選擇使用 Redis 或 Zookeeper 來做。

          想必也有喜歡問為什么的同學(xué),那數(shù)據(jù)庫客觀鎖怎么就性能不好了?

          使用數(shù)據(jù)庫樂觀鎖,包括主鍵防重,版本號(hào)控制。但是這兩種方法各有利弊。

          • 使用主鍵沖突的策略進(jìn)行防重,在并發(fā)量非常高的情況下對(duì)數(shù)據(jù)庫性能會(huì)有影響,尤其是應(yīng)用數(shù)據(jù)表和主鍵沖突表在一個(gè)庫的時(shí)候,表現(xiàn)更加明顯。還有就是在 MySQL 數(shù)據(jù)庫中采用主鍵沖突防重,在大并發(fā)情況下有可能會(huì)造成鎖表現(xiàn)象,比較好的辦法是在程序中生產(chǎn)主鍵進(jìn)行防重。

          • 使用版本號(hào)策略

            這個(gè)策略源于 MySQL 的  MVCC 機(jī)制,使用這個(gè)策略其實(shí)本身沒有什么問題,唯一的問題就是對(duì)數(shù)據(jù)表侵入較大,我們要為每個(gè)表設(shè)計(jì)一個(gè)版本號(hào)字段,然后寫一條判斷 SQL 每次進(jìn)行判斷。

          第三趴,編碼

          三、基于 Redis 的分布式鎖

          其實(shí) Redis 官網(wǎng)已經(jīng)給出了實(shí)現(xiàn):https://redis.io/topics/distlock,說各種書籍和博客用了各種手段去用 Redis 實(shí)現(xiàn)分布式鎖,建議用 Redlock 實(shí)現(xiàn),這樣更規(guī)范、更安全。我們循序漸進(jìn)來看

          我們默認(rèn)指定大家用的是 Redis 2.6.12 及更高的版本,就不再去講 setnxexpire 這種了,直接 set 命令加鎖

          set key value[expiration EX seconds|PX milliseconds] [NX|XX]

          eg:

          SET resource_name my_random_value NX PX 30000

          SET 命令的行為可以通過一系列參數(shù)來修改

          • EX second :設(shè)置鍵的過期時(shí)間為 second 秒。SET key value EX second 效果等同于 SETEX key second value
          • PX millisecond :設(shè)置鍵的過期時(shí)間為 millisecond 毫秒。SET key value PX millisecond 效果等同于 PSETEX key millisecond value
          • NX :只在鍵不存在時(shí),才對(duì)鍵進(jìn)行設(shè)置操作。SET key value NX 效果等同于 SETNX key value
          • XX :只在鍵已經(jīng)存在時(shí),才對(duì)鍵進(jìn)行設(shè)置操作。

          這條指令的意思:當(dāng) key——resource_name 不存在時(shí)創(chuàng)建這樣的key,設(shè)值為 my_random_value,并設(shè)置過期時(shí)間 30000 毫秒。

          別看這干了兩件事,因?yàn)?Redis 是單線程的,這一條指令不會(huì)被打斷,所以是原子性的操作。

          Redis 實(shí)現(xiàn)分布式鎖的主要步驟:

          1. 指定一個(gè) key 作為鎖標(biāo)記,存入 Redis 中,指定一個(gè) 唯一的標(biāo)識(shí) 作為 value。
          2. 當(dāng) key 不存在時(shí)才能設(shè)置值,確保同一時(shí)間只有一個(gè)客戶端進(jìn)程獲得鎖,滿足 互斥性 特性。
          3. 設(shè)置一個(gè)過期時(shí)間,防止因系統(tǒng)異常導(dǎo)致沒能刪除這個(gè) key,滿足 防死鎖 特性。
          4. 當(dāng)處理完業(yè)務(wù)之后需要清除這個(gè) key 來釋放鎖,清除 key 時(shí)需要校驗(yàn) value 值,需要滿足 解鈴還須系鈴人

          設(shè)置一個(gè)隨機(jī)值的意思是在解鎖時(shí)候判斷 key 的值和我們存儲(chǔ)的隨機(jī)數(shù)是不是一樣,一樣的話,才是自己的鎖,直接 del 解鎖就行。

          當(dāng)然這個(gè)兩個(gè)操作要保證原子性,所以 Redis 給出了一段 lua 腳本(Redis 服務(wù)器會(huì)單線程原子性執(zhí)行 lua 腳本,保證 lua 腳本在處理的過程中不會(huì)被任意其它請(qǐng)求打斷。):

          if redis.call("get",KEYS[1]) == ARGV[1then
              return redis.call("del",KEYS[1])
          else
              return 0
          end

          問題:

          我們先拋出兩個(gè)問題思考:

          1. 獲取鎖時(shí),過期時(shí)間要設(shè)置多少合適呢?

            預(yù)估一個(gè)合適的時(shí)間,其實(shí)沒那么容易,比如操作資源的時(shí)間最慢可能要 10 s,而我們只設(shè)置了 5 s  就過期,那就存在鎖提前過期的風(fēng)險(xiǎn)。這個(gè)問題先記下,我們先看下 Javaer 要怎么在代碼中用 Redis 鎖。

          2. 容錯(cuò)性如何保證呢?

            Redis 掛了怎么辦,你可能會(huì)說上主從、上集群,但也會(huì)出現(xiàn)這樣的極端情況,當(dāng)我們上鎖后,主節(jié)點(diǎn)就掛了,這個(gè)時(shí)候還沒來的急同步到從節(jié)點(diǎn),主從切換后鎖還是丟了

          帶著這兩個(gè)問題,我們接著看

          Redisson 實(shí)現(xiàn)代碼

          redisson 是 Redis 官方的分布式鎖組件。GitHub 地址:https://github.com/redisson/redisson

          Redisson 是一個(gè)在 Redis 的基礎(chǔ)上實(shí)現(xiàn)的 Java 駐內(nèi)存數(shù)據(jù)網(wǎng)格(In-Memory Data Grid)。它不僅提供了一系列的分布式的 Java 常用對(duì)象,還實(shí)現(xiàn)了可重入鎖(Reentrant Lock)、公平鎖(Fair Lock、聯(lián)鎖(MultiLock)、 紅鎖(RedLock)、 讀寫鎖(ReadWriteLock)等,還提供了許多分布式服務(wù)。Redisson 提供了使用 Redis 的最簡(jiǎn)單和最便捷的方法。Redisson 的宗旨是促進(jìn)使用者對(duì) Redis 的關(guān)注分離(Separation of Concern),從而讓使用者能夠?qū)⒕Ω械胤旁谔幚順I(yè)務(wù)邏輯上。

          redisson 現(xiàn)在已經(jīng)很強(qiáng)大了,github 的 wiki 也很詳細(xì),分布式鎖的介紹直接戳 Distributed locks and synchronizers

          Redisson 支持單點(diǎn)模式、主從模式、哨兵模式、集群模式,只是配置的不同,我們以單點(diǎn)模式來看下怎么使用,代碼很簡(jiǎn)單,都已經(jīng)為我們封裝好了,直接拿來用就好,詳細(xì)的demo,我放在了 github: starfish-learn-redisson 上,這里就不一步步來了

          RLock lock = redisson.getLock("myLock");

          RLock 提供了各種鎖方法,我們來解讀下這個(gè)接口方法,

          注:代碼為 3.16.2 版本,可以看到繼承自 JDK 的 Lock 接口,和 Reddsion 的異步鎖接口 RLockAsync(這個(gè)我們先不研究)

          RLock

          public interface RLock extends LockRLockAsync {

              /**
               * 獲取鎖的名字
               */

              String getName();
              
              /**
               * 這個(gè)叫終端鎖操作,表示該鎖可以被中斷 假如A和B同時(shí)調(diào)這個(gè)方法,A獲取鎖,B為獲取鎖,那么B線程可以通過
               * Thread.currentThread().interrupt(); 方法真正中斷該線程
               */

              void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException;

              /**
               * 這個(gè)應(yīng)該是最常用的,嘗試獲取鎖
               * waitTimeout 嘗試獲取鎖的最大等待時(shí)間,超過這個(gè)值,則認(rèn)為獲取鎖失敗
               * leaseTime   鎖的持有時(shí)間,超過這個(gè)時(shí)間鎖會(huì)自動(dòng)失效(值應(yīng)設(shè)置為大于業(yè)務(wù)處理的時(shí)間,確保在鎖有效期內(nèi)業(yè)務(wù)能處理完)
               */

              boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;

              /**
               * 鎖的有效期設(shè)置為 leaseTime,過期后自動(dòng)失效
               * 如果 leaseTime 設(shè)置為 -1, 表示不主動(dòng)過期
               */

              void lock(long leaseTime, TimeUnit unit);

              /**
               * Unlocks the lock independently of its state
               */

              boolean forceUnlock();

              /**
               * 檢查是否被另一個(gè)線程鎖住
               */

              boolean isLocked();

              /**
               * 檢查當(dāng)前線線程是否持有該鎖
               */

              boolean isHeldByCurrentThread();
            
               /**
               *  這個(gè)就明了了,檢查指定線程是否持有鎖
               */

              boolean isHeldByThread(long threadId);

              /**
               * 返回當(dāng)前線程持有鎖的次數(shù)
               */

              int getHoldCount();

              /**
               * 返回鎖的剩余時(shí)間
               * @return time in milliseconds
               *          -2 if the lock does not exist.
               *          -1 if the lock exists but has no associated expire.
               */

              long remainTimeToLive();
              
          }

          Demo

          Config config = new Config();
          config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("").setDatabase(1);
          RedissonClient redissonClient = Redisson.create(config);
          RLock disLock = redissonClient.getLock("mylock");
          boolean isLock;
          try {
            /**
             * 嘗試獲取鎖的最大等待時(shí)間是 100 秒,超過這個(gè)值還沒獲取到,就認(rèn)為獲取失敗
             * 鎖的持有時(shí)間是 10 秒
             */

            isLock = disLock.tryLock(10010, TimeUnit.MILLISECONDS);
            if (isLock) {
              //做自己的業(yè)務(wù)
              Thread.sleep(10000);
            }
          catch (Exception e) {
             e.printStackTrace();
          finally {
             disLock.unlock();
          }

          就是這么簡(jiǎn)單,Redisson 已經(jīng)做好了封裝,使用起來 so easy,如果使用主從、哨兵、集群這種也只是配置不同。

          原理

          看源碼小 tips,最好是 fork 到自己的倉(cāng)庫,然后拉到本地,邊看邊注釋,然后提交到自己的倉(cāng)庫,也方便之后再看,不想這么麻煩的,也可以直接看我的 Jstarfish/redisson

          先看下 RLock 的類關(guān)系

          跟著源碼,可以發(fā)現(xiàn) RedissonLock 是 RLock 的直接實(shí)現(xiàn),也是我們加鎖、解鎖操作的核心類

          加鎖

          主要的加鎖方法就下邊這兩個(gè),區(qū)別也很簡(jiǎn)單,一個(gè)有等待時(shí)間,一個(gè)沒有,所以我們挑個(gè)復(fù)雜的看(源碼包含了另一個(gè)的絕大部分)

          boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
          void lock(long leaseTime, TimeUnit unit);

          RedissonLock.tryLock

          @Override
          public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
              // 獲取等鎖的最長(zhǎng)時(shí)間
              long time = unit.toMillis(waitTime);
              long current = System.currentTimeMillis();
              //取得當(dāng)前線程id(判斷是否可重入鎖的關(guān)鍵)
              long threadId = Thread.currentThread().getId();
              // 【核心點(diǎn)1】嘗試獲取鎖,若返回值為null,則表示已獲取到鎖,返回的ttl就是key的剩余存活時(shí)間
              Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
              if (ttl == null) {
                  return true;
              }
              // 還可以容忍的等待時(shí)長(zhǎng) = 獲取鎖能容忍的最大等待時(shí)長(zhǎng) - 執(zhí)行完上述操作流程的時(shí)間
              time -= System.currentTimeMillis() - current;
              if (time <= 0) {
                  //等不到了,直接返回失敗
                  acquireFailed(waitTime, unit, threadId);
                  return false;
              }

              current = System.currentTimeMillis();
              /**
               * 【核心點(diǎn)2】
               * 訂閱解鎖消息 redisson_lock__channel:{$KEY},并通過await方法阻塞等待鎖釋放,解決了無效的鎖申請(qǐng)浪費(fèi)資源的問題:
               * 基于信息量,當(dāng)鎖被其它資源占用時(shí),當(dāng)前線程通過 Redis 的 channel 訂閱鎖的釋放事件,一旦鎖釋放會(huì)發(fā)消息通知待等待的線程進(jìn)行競(jìng)爭(zhēng)
               * 當(dāng) this.await返回false,說明等待時(shí)間已經(jīng)超出獲取鎖最大等待時(shí)間,取消訂閱并返回獲取鎖失敗
               * 當(dāng) this.await返回true,進(jìn)入循環(huán)嘗試獲取鎖
               */

              RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
              //await 方法內(nèi)部是用CountDownLatch來實(shí)現(xiàn)阻塞,獲取subscribe異步執(zhí)行的結(jié)果(應(yīng)用了Netty 的 Future)
              if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
                  if (!subscribeFuture.cancel(false)) {
                      subscribeFuture.onComplete((res, e) -> {
                          if (e == null) {
                              unsubscribe(subscribeFuture, threadId);
                          }
                      });
                  }
                  acquireFailed(waitTime, unit, threadId);
                  return false;
              }

              // ttl 不為空,表示已經(jīng)有這樣的key了,只能阻塞等待
              try {
                  time -= System.currentTimeMillis() - current;
                  if (time <= 0) {
                      acquireFailed(waitTime, unit, threadId);
                      return false;
                  }

                  // 來個(gè)死循環(huán),繼續(xù)嘗試著獲取鎖
                  while (true) {
                      long currentTime = System.currentTimeMillis();
                      ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
                      if (ttl == null) {
                          return true;
                      }

                      time -= System.currentTimeMillis() - currentTime;
                      if (time <= 0) {
                          acquireFailed(waitTime, unit, threadId);
                          return false;
                      }

                      currentTime = System.currentTimeMillis();

                     /**
                      * 【核心點(diǎn)3】根據(jù)鎖TTL,調(diào)整阻塞等待時(shí)長(zhǎng);
                      * 1、latch其實(shí)是個(gè)信號(hào)量Semaphore,調(diào)用其tryAcquire方法會(huì)讓當(dāng)前線程阻塞一段時(shí)間,避免在while循環(huán)中頻繁請(qǐng)求獲鎖;
                      *  當(dāng)其他線程釋放了占用的鎖,會(huì)廣播解鎖消息,監(jiān)聽器接收解鎖消息,并釋放信號(hào)量,最終會(huì)喚醒阻塞在這里的線程
                      * 2、該Semaphore的release方法,會(huì)在訂閱解鎖消息的監(jiān)聽器消息處理方法org.redisson.pubsub.LockPubSub#onMessage調(diào)用;
                      */

                      //調(diào)用信號(hào)量的方法來阻塞線程,時(shí)長(zhǎng)為鎖等待時(shí)間和租期時(shí)間中較小的那個(gè)
                      if (ttl >= 0 && ttl < time) {
                          subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                      } else {
                          subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                      }

                      time -= System.currentTimeMillis() - currentTime;
                      if (time <= 0) {
                          acquireFailed(waitTime, unit, threadId);
                          return false;
                      }
                  }
              } finally {
                  // 獲取到鎖或者拋出中斷異常,退訂redisson_lock__channel:{$KEY},不再關(guān)注解鎖事件
                  unsubscribe(subscribeFuture, threadId);
              }
          }

          接著看注釋中提到的 3 個(gè)核心點(diǎn)

          核心點(diǎn)1-嘗試加鎖:RedissonLock.tryAcquireAsync

          private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
              RFuture<Long> ttlRemainingFuture;
              // leaseTime != -1 說明沒過期
              if (leaseTime != -1) {
                  // 實(shí)質(zhì)是異步執(zhí)行加鎖Lua腳本
                  ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
              } else {
                  // 否則,已經(jīng)過期了,傳參變?yōu)樾碌臅r(shí)間(續(xù)期后)
                  ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                          TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
              }
              ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
                  if (e != null) {
                      return;
                  }

                  // lock acquired
                  if (ttlRemaining == null) {
                      if (leaseTime != -1) {
                          internalLockLeaseTime = unit.toMillis(leaseTime);
                      } else {
                          // 續(xù)期
                          scheduleExpirationRenewal(threadId);
                      }
                  }
              });
              return ttlRemainingFuture;
          }

          異步執(zhí)行加鎖 Lua 腳本:RedissonLock.tryLockInnerAsync

          <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
              return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                      // 1.如果緩存中的key不存在,則執(zhí)行 hincrby 命令(hincrby key UUID+threadId 1), 設(shè)值重入次數(shù)1
                      // 然后通過 pexpire 命令設(shè)置鎖的過期時(shí)間(即鎖的租約時(shí)間)
                      // 返回空值 nil ,表示獲取鎖成功
                      "if (redis.call('exists', KEYS[1]) == 0) then " +
                              "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                              "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                              "return nil; " +
                              "end; " +
                              // 如果key已經(jīng)存在,并且value也匹配,表示是當(dāng)前線程持有的鎖,則執(zhí)行 hincrby 命令,重入次數(shù)加1,并且設(shè)置失效時(shí)間
                              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                              "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                              "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                              "return nil; " +
                              "end; " +
                              //如果key已經(jīng)存在,但是value不匹配,說明鎖已經(jīng)被其他線程持有,通過 pttl 命令獲取鎖的剩余存活時(shí)間并返回,至此獲取鎖失敗
                              "return redis.call('pttl', KEYS[1]);",
                      Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
          }
          • KEYS[1] 就是 Collections.singletonList(getName()),表示分布式鎖的key;
          • ARGV[1] 就是internalLockLeaseTime,即鎖的租約時(shí)間(持有鎖的有效時(shí)間),默認(rèn)30s;
          • ARGV[2] 就是getLockName(threadId),是獲取鎖時(shí)set的唯一值 value,即UUID+threadId

          看門狗續(xù)期:RedissonBaseLock.scheduleExpirationRenewal

          // 基于線程ID定時(shí)調(diào)度和續(xù)期
          protected void scheduleExpirationRenewal(long threadId) {
              // 新建一個(gè)ExpirationEntry記錄線程重入計(jì)數(shù)
              ExpirationEntry entry = new ExpirationEntry();
              ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
              if (oldEntry != null) {
                  // 當(dāng)前進(jìn)行的當(dāng)前線程重入加鎖
                  oldEntry.addThreadId(threadId);
              } else {
                  // 當(dāng)前進(jìn)行的當(dāng)前線程首次加鎖
                  entry.addThreadId(threadId);
                  // 首次新建ExpirationEntry需要觸發(fā)續(xù)期方法,記錄續(xù)期的任務(wù)句柄
                  renewExpiration();
              }
          }

          // 處理續(xù)期
          private void renewExpiration() {
            // 根據(jù)entryName獲取ExpirationEntry實(shí)例,如果為空,說明在cancelExpirationRenewal()方法已經(jīng)被移除,一般是解鎖的時(shí)候觸發(fā)
            ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ee == null) {
              return;
            }

            // 新建一個(gè)定時(shí)任務(wù),這個(gè)就是看門狗的實(shí)現(xiàn),io.netty.util.Timeout是Netty結(jié)合時(shí)間輪使用的定時(shí)任務(wù)實(shí)例
            Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
              @Override
              public void run(Timeout timeout) throws Exception {
                // 這里是重復(fù)外面的那個(gè)邏輯,
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                  return;
                }
                // 獲取ExpirationEntry中首個(gè)線程ID,如果為空說明調(diào)用過cancelExpirationRenewal()方法清空持有的線程重入計(jì)數(shù),一般是鎖已經(jīng)釋放的場(chǎng)景
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                  return;
                }
                // 向Redis異步發(fā)送續(xù)期的命令
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                  // 拋出異常,續(xù)期失敗,只打印日志和直接終止任務(wù)
                  if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                  }
                  // 返回true證明續(xù)期成功,則遞歸調(diào)用續(xù)期方法(重新調(diào)度自己),續(xù)期失敗說明對(duì)應(yīng)的鎖已經(jīng)不存在,直接返回,不再遞歸
                  if (res) {
                    // reschedule itself
                    renewExpiration();
                  } else {
                    cancelExpirationRenewal(null);
                  }
                });
              }// 這里的執(zhí)行頻率為leaseTime轉(zhuǎn)換為ms單位下的三分之一,由于leaseTime初始值為-1的情況下才會(huì)進(jìn)入續(xù)期邏輯,那么這里的執(zhí)行頻率為lockWatchdogTimeout的三分之一
            }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
            // ExpirationEntry實(shí)例持有調(diào)度任務(wù)實(shí)例
            ee.setTimeout(task);
          }

          核心點(diǎn)2-訂閱解鎖消息:RedissonLock.subscribe

          protected final LockPubSub pubSub;

          public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
            super(commandExecutor, name);
            this.commandExecutor = commandExecutor;
            this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
            //在構(gòu)造器中初始化pubSub,跟著這幾個(gè)get方法會(huì)發(fā)現(xiàn)他們都是在構(gòu)造器中初始化的,在PublishSubscribeService中會(huì)有
            // private final AsyncSemaphore[] locks = new AsyncSemaphore[50]; 這樣一段代碼,初始化了一組信號(hào)量
            this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
          }

          protected RFuture<RedissonLockEntry> subscribe(long threadId) {
            return pubSub.subscribe(getEntryName(), getChannelName());
          }

          // 在LockPubSub中注冊(cè)一個(gè)entryName -> RedissonLockEntry的哈希映射,RedissonLockEntry實(shí)例中存放著RPromise<RedissonLockEntry>結(jié)果,一個(gè)信號(hào)量形式的鎖和訂閱方法重入計(jì)數(shù)器
          public RFuture<E> subscribe(String entryName, String channelName) {
            AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
            RPromise<E> newPromise = new RedissonPromise<>();
            semaphore.acquire(() -> {
              if (!newPromise.setUncancellable()) {
                semaphore.release();
                return;
              }

              E entry = entries.get(entryName);
              if (entry != null) {
                entry.acquire();
                semaphore.release();
                entry.getPromise().onComplete(new TransferListener<E>(newPromise));
                return;
              }

              E value = createEntry(newPromise);
              value.acquire();

              E oldValue = entries.putIfAbsent(entryName, value);
              if (oldValue != null) {
                oldValue.acquire();
                semaphore.release();
                oldValue.getPromise().onComplete(new TransferListener<E>(newPromise));
                return;
              }

              RedisPubSubListener<Object> listener = createListener(channelName, value);
              service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
            });

            return newPromise;
          }

          核心點(diǎn) 3 比較簡(jiǎn)單,就不說了

          解鎖

          RedissonLock.unlock()

          @Override
          public void unlock() {
            try {
              // 獲取當(dāng)前調(diào)用解鎖操作的線程ID
              get(unlockAsync(Thread.currentThread().getId()));
            } catch (RedisException e) {
              // IllegalMonitorStateException一般是A線程加鎖,B線程解鎖,內(nèi)部判斷線程狀態(tài)不一致拋出的
              if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException) e.getCause();
              } else {
                throw e;
              }
            }
          }

          RedissonBaseLock.unlockAsync

          @Override
          public RFuture<Void> unlockAsync(long threadId) {
            // 構(gòu)建一個(gè)結(jié)果RedissonPromise
            RPromise<Void> result = new RedissonPromise<>();
            // 返回的RFuture如果持有的結(jié)果為true,說明解鎖成功,返回NULL說明線程ID異常,加鎖和解鎖的客戶端線程不是同一個(gè)線程
            RFuture<Boolean> future = unlockInnerAsync(threadId);

            future.onComplete((opStatus, e) -> {
              // 取消看門狗的續(xù)期任務(wù)
              cancelExpirationRenewal(threadId);

              if (e != null) {
                result.tryFailure(e);
                return;
              }

              if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                                                                                      + id + " thread-id: " + threadId);
                result.tryFailure(cause);
                return;
              }

              result.trySuccess(null);
            });

            return result;
          }

          RedissonLock.unlockInnerAsync

          // 真正的內(nèi)部解鎖的方法,執(zhí)行解鎖的Lua腳本
          protected RFuture<Boolean> unlockInnerAsync(long threadId) {
            return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                                  //如果分布式鎖存在,但是value不匹配,表示鎖已經(jīng)被其他線程占用,無權(quán)釋放鎖,那么直接返回空值(解鈴還須系鈴人)
                                  "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                                  "return nil;" +
                                  "end; " +
                                  //如果value匹配,則就是當(dāng)前線程占有分布式鎖,那么將重入次數(shù)減1
                                  "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                                  //重入次數(shù)減1后的值如果大于0,表示分布式鎖有重入過,那么只能更新失效時(shí)間,還不能刪除
                                  "if (counter > 0) then " +
                                  "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                                  "return 0; " +
                                  "else " +
                                  //重入次數(shù)減1后的值如果為0,這時(shí)就可以刪除這個(gè)KEY,并發(fā)布解鎖消息,返回1
                                  "redis.call('del', KEYS[1]); " +
                                  "redis.call('publish', KEYS[2], ARGV[1]); " +
                                  "return 1; " +
                                  "end; " +
                                  "return nil;",
                                  //這5個(gè)參數(shù)分別對(duì)應(yīng)KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3]
                                  Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
          }

          我只列出了一小部分代碼,更多的內(nèi)容還是得自己動(dòng)手

          從源碼中,我們可以看到 Redisson 幫我們解決了拋出的第一個(gè)問題:失效時(shí)間設(shè)置多長(zhǎng)時(shí)間為好?

          Redisson 提供了看門狗,每獲得一個(gè)鎖時(shí),只設(shè)置一個(gè)很短的超時(shí)時(shí)間,同時(shí)起一個(gè)線程在每次快要到超時(shí)時(shí)間時(shí)去刷新鎖的超時(shí)時(shí)間。在釋放鎖的同時(shí)結(jié)束這個(gè)線程。

          但是沒有解決節(jié)點(diǎn)掛掉,丟失鎖的問題,接著來~

          四、RedLock

          我們上邊介紹的分布式鎖,在某些極端情況下仍然是有缺陷的

          1. 客戶端長(zhǎng)時(shí)間內(nèi)阻塞導(dǎo)致鎖失效

            客戶端 1 得到了鎖,因?yàn)榫W(wǎng)絡(luò)問題或者 GC 等原因?qū)е麻L(zhǎng)時(shí)間阻塞,然后業(yè)務(wù)程序還沒執(zhí)行完鎖就過期了,這時(shí)候客戶端 2 也能正常拿到鎖,可能會(huì)導(dǎo)致線程安全的問題。

          2. Redis 服務(wù)器時(shí)鐘漂移

            如果 Redis 服務(wù)器的機(jī)器時(shí)間發(fā)生了向前跳躍,就會(huì)導(dǎo)致這個(gè) key 過早超時(shí)失效,比如說客戶端 1 拿到鎖后,key 還沒有到過期時(shí)間,但是 Redis 服務(wù)器的時(shí)間比客戶端快了 2 分鐘,導(dǎo)致 key 提前就失效了,這時(shí)候,如果客戶端 1 還沒有釋放鎖的話,就可能導(dǎo)致多個(gè)客戶端同時(shí)持有同一把鎖的問題。

          3. 單點(diǎn)實(shí)例安全問題

            如果 Redis 是單機(jī)模式的,如果掛了的話,那所有的客戶端都獲取不到鎖了,假設(shè)你是主從模式,但 Redis 的主從同步是異步進(jìn)行的,如果 Redis 主宕機(jī)了,這個(gè)時(shí)候從機(jī)并沒有同步到這一把鎖,那么機(jī)器 B 再次申請(qǐng)的時(shí)候就會(huì)再次申請(qǐng)到這把鎖,這也是問題

          為了解決這些個(gè)問題 Redis 作者提出了 RedLock 紅鎖的算法,在 Redission 中也對(duì) RedLock 進(jìn)行了實(shí)現(xiàn)。

          Redis 官網(wǎng)對(duì) redLock 算法的介紹大致如下:The Redlock algorithm

          在分布式版本的算法里我們假設(shè)我們有 N 個(gè) Redis master 節(jié)點(diǎn),這些節(jié)點(diǎn)都是完全獨(dú)立的,我們不用任何復(fù)制或者其他隱含的分布式協(xié)調(diào)機(jī)制。之前我們已經(jīng)描述了在 Redis 單實(shí)例下怎么安全地獲取和釋放鎖。我們確保將在每(N) 個(gè)實(shí)例上使用此方法獲取和釋放鎖。在我們的例子里面我們?cè)O(shè)置 N=5,這是一個(gè)比較合理的設(shè)置,所以我們需要在 5 臺(tái)機(jī)器或者虛擬機(jī)上面運(yùn)行這些實(shí)例,這樣保證他們不會(huì)同時(shí)都宕掉。為了取到鎖,客戶端應(yīng)該執(zhí)行以下操作:

          1. 獲取當(dāng)前 Unix 時(shí)間,以毫秒為單位。
          2. 依次嘗試從 5 個(gè)實(shí)例,使用相同的 key 和具有唯一性的 value(例如UUID)獲取鎖。當(dāng)向 Redis 請(qǐng)求獲取鎖時(shí),客戶端應(yīng)該設(shè)置一個(gè)嘗試從某個(gè) Reids 實(shí)例獲取鎖的最大等待時(shí)間(超過這個(gè)時(shí)間,則立馬詢問下一個(gè)實(shí)例),這個(gè)超時(shí)時(shí)間應(yīng)該小于鎖的失效時(shí)間。例如你的鎖自動(dòng)失效時(shí)間為 10 秒,則超時(shí)時(shí)間應(yīng)該在 5-50 毫秒之間。這樣可以避免服務(wù)器端 Redis 已經(jīng)掛掉的情況下,客戶端還在死死地等待響應(yīng)結(jié)果。如果服務(wù)器端沒有在規(guī)定時(shí)間內(nèi)響應(yīng),客戶端應(yīng)該盡快嘗試去另外一個(gè) Redis 實(shí)例請(qǐng)求獲取鎖。
          3. 客戶端使用當(dāng)前時(shí)間減去開始獲取鎖時(shí)間(步驟1記錄的時(shí)間)就得到獲取鎖消耗的時(shí)間。當(dāng)且僅當(dāng)從大多數(shù)(N/2+1,這里是3個(gè)節(jié)點(diǎn))的 Redis 節(jié)點(diǎn)都取到鎖,并且使用的總耗時(shí)小于鎖失效時(shí)間時(shí),鎖才算獲取成功。
          4. 如果取到了鎖,key 的真正有效時(shí)間 = 有效時(shí)間(獲取鎖時(shí)設(shè)置的 key 的自動(dòng)超時(shí)時(shí)間) - 獲取鎖的總耗時(shí)(詢問各個(gè) Redis 實(shí)例的總耗時(shí)之和)(步驟 3 計(jì)算的結(jié)果)。
          5. 如果因?yàn)槟承┰颍罱K獲取鎖失敗(即沒有在至少 “N/2+1 ”個(gè) Redis 實(shí)例取到鎖或者“獲取鎖的總耗時(shí)”超過了“有效時(shí)間”),客戶端應(yīng)該在所有的 Redis 實(shí)例上進(jìn)行解鎖(即便某些 Redis 實(shí)例根本就沒有加鎖成功,這樣可以防止某些節(jié)點(diǎn)獲取到鎖但是客戶端沒有得到響應(yīng)而導(dǎo)致接下來的一段時(shí)間不能被重新獲取鎖)。

          總結(jié)下就是:

          1. 客戶端在多個(gè) Redis 實(shí)例上申請(qǐng)加鎖,必須保證大多數(shù)節(jié)點(diǎn)加鎖成功

            解決容錯(cuò)性問題,部分實(shí)例異常,剩下的還能加鎖成功

          2. 大多數(shù)節(jié)點(diǎn)加鎖的總耗時(shí),要小于鎖設(shè)置的過期時(shí)間

            多實(shí)例操作,可能存在網(wǎng)絡(luò)延遲、丟包、超時(shí)等問題,所以就算是大多數(shù)節(jié)點(diǎn)加鎖成功,如果加鎖的累積耗時(shí)超過了鎖的過期時(shí)間,那有些節(jié)點(diǎn)上的鎖可能也已經(jīng)失效了,還是沒有意義的

          3. 釋放鎖,要向全部節(jié)點(diǎn)發(fā)起釋放鎖請(qǐng)求

            如果部分節(jié)點(diǎn)加鎖成功,但最后由于異常導(dǎo)致大部分節(jié)點(diǎn)沒加鎖成功,就要釋放掉所有的,各節(jié)點(diǎn)要保持一致

          關(guān)于 RedLock,兩位分布式大佬,Antirez 和 Martin 還進(jìn)行過一場(chǎng)爭(zhēng)論,感興趣的也可以看看

          Config config1 = new Config();
          config1.useSingleServer().setAddress("127.0.0.1:6379");
          RedissonClient redissonClient1 = Redisson.create(config1);

          Config config2 = new Config();
          config2.useSingleServer().setAddress("127.0.0.1:5378");
          RedissonClient redissonClient2 = Redisson.create(config2);

          Config config3 = new Config();
          config3.useSingleServer().setAddress("127.0.0.1:5379");
          RedissonClient redissonClient3 = Redisson.create(config3);

          /**
           * 獲取多個(gè) RLock 對(duì)象
           */

          RLock lock1 = redissonClient1.getLock(lockKey);
          RLock lock2 = redissonClient2.getLock(lockKey);
          RLock lock3 = redissonClient3.getLock(lockKey);

          /**
           * 根據(jù)多個(gè) RLock 對(duì)象構(gòu)建 RedissonRedLock (最核心的差別就在這里)
           */

          RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);

          try {
              /**
               * 4.嘗試獲取鎖
               * waitTimeout 嘗試獲取鎖的最大等待時(shí)間,超過這個(gè)值,則認(rèn)為獲取鎖失敗
               * leaseTime   鎖的持有時(shí)間,超過這個(gè)時(shí)間鎖會(huì)自動(dòng)失效(值應(yīng)設(shè)置為大于業(yè)務(wù)處理的時(shí)間,確保在鎖有效期內(nèi)業(yè)務(wù)能處理完)
               */

              boolean res = redLock.tryLock(10010, TimeUnit.SECONDS);
              if (res) {
                  //成功獲得鎖,在這里處理業(yè)務(wù)
              }
          catch (Exception e) {
              throw new RuntimeException("aquire lock fail");
          }finally{
              //無論如何, 最后都要解鎖
              redLock.unlock();
          }

          最核心的變化就是需要構(gòu)建多個(gè) RLock ,然后根據(jù)多個(gè) RLock 構(gòu)建成一個(gè) RedissonRedLock,因?yàn)?redLock 算法是建立在多個(gè)互相獨(dú)立的 Redis 環(huán)境之上的(為了區(qū)分可以叫為 Redission node),Redission node 節(jié)點(diǎn)既可以是單機(jī)模式(single),也可以是主從模式(master/salve),哨兵模式(sentinal),或者集群模式(cluster)。這就意味著,不能跟以往這樣只搭建 1個(gè) cluster、或 1個(gè) sentinel 集群,或是1套主從架構(gòu)就了事了,需要為 RedissonRedLock 額外搭建多幾套獨(dú)立的 Redission 節(jié)點(diǎn)。

          RedissonMultiLock.tryLock

          @Override
          public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
            //        try {
            //            return tryLockAsync(waitTime, leaseTime, unit).get();
            //        } catch (ExecutionException e) {
            //            throw new IllegalStateException(e);
            //        }
            long newLeaseTime = -1;
            if (leaseTime != -1) {
              if (waitTime == -1) {
                newLeaseTime = unit.toMillis(leaseTime);
              } else {
                newLeaseTime = unit.toMillis(waitTime)*2;
              }
            }

            long time = System.currentTimeMillis();
            long remainTime = -1;
            if (waitTime != -1) {
              remainTime = unit.toMillis(waitTime);
            }
            long lockWaitTime = calcLockWaitTime(remainTime);

            //允許加鎖失敗節(jié)點(diǎn)個(gè)數(shù)限制(N-(N/2+1))
            int failedLocksLimit = failedLocksLimit();
            List<RLock> acquiredLocks = new ArrayList<>(locks.size());
            // 遍歷所有節(jié)點(diǎn)通過EVAL命令執(zhí)行l(wèi)ua加鎖
            for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
              RLock lock = iterator.next();
              boolean lockAcquired;
              try {
                // 對(duì)節(jié)點(diǎn)嘗試加鎖
                if (waitTime == -1 && leaseTime == -1) {
                  lockAcquired = lock.tryLock();
                } else {
                  long awaitTime = Math.min(lockWaitTime, remainTime);
                  lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
                }
              } catch (RedisResponseTimeoutException e) {
                // 如果拋出這類異常,為了防止加鎖成功,但是響應(yīng)失敗,需要解鎖所有節(jié)點(diǎn)
                unlockInner(Arrays.asList(lock));
                lockAcquired = false;
              } catch (Exception e) {
                lockAcquired = false;
              }

              if (lockAcquired) {
                acquiredLocks.add(lock);
              } else {
                /*
                 *  計(jì)算已經(jīng)申請(qǐng)鎖失敗的節(jié)點(diǎn)是否已經(jīng)到達(dá) 允許加鎖失敗節(jié)點(diǎn)個(gè)數(shù)限制 (N-(N/2+1))
                 * 如果已經(jīng)到達(dá), 就認(rèn)定最終申請(qǐng)鎖失敗,則沒有必要繼續(xù)從后面的節(jié)點(diǎn)申請(qǐng)了
                 * 因?yàn)?nbsp;Redlock 算法要求至少N/2+1 個(gè)節(jié)點(diǎn)都加鎖成功,才算最終的鎖申請(qǐng)成功
                 */

                if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
                  break;
                }

                if (failedLocksLimit == 0) {
                  unlockInner(acquiredLocks);
                  if (waitTime == -1) {
                    return false;
                  }
                  failedLocksLimit = failedLocksLimit();
                  acquiredLocks.clear();
                  // reset iterator
                  while (iterator.hasPrevious()) {
                    iterator.previous();
                  }
                } else {
                  failedLocksLimit--;
                }
              }
              //計(jì)算 目前從各個(gè)節(jié)點(diǎn)獲取鎖已經(jīng)消耗的總時(shí)間,如果已經(jīng)等于最大等待時(shí)間,則認(rèn)定最終申請(qǐng)鎖失敗,返回false
              if (remainTime != -1) {
                remainTime -= System.currentTimeMillis() - time;
                time = System.currentTimeMillis();
                if (remainTime <= 0) {
                  unlockInner(acquiredLocks);
                  return false;
                }
              }
            }

            if (leaseTime != -1) {
              acquiredLocks.stream()
                .map(l -> (RedissonLock) l)
                .map(l -> l.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS))
                .forEach(f -> f.syncUninterruptibly());
            }

            return true;
          }
          參考與感謝
          • 《Redis —— Distributed locks with Redis》
          • 《Redisson —— Distributed locks and synchronizers》
          • 慢談 Redis 實(shí)現(xiàn)分布式鎖 以及 Redisson 源碼解析
          • 理解Redisson中分布式鎖的實(shí)現(xiàn)

          推薦閱讀:
          面霸篇:高頻 Java 基礎(chǔ)問題(核心卷一)
          MYSQL 那點(diǎn)破事!
          Redis緩存那點(diǎn)破事
          面試官問:如何保證 MQ消息是有序的?
          MySQL 開源工具集合


          關(guān)號(hào)互聯(lián)網(wǎng)全棧架構(gòu)價(jià)

          瀏覽 68
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  艹逼网站 | 黑人大鸡巴视频 | 五月婷婷第四色 | 黄色影院在线观看 | 黄片大全在线 |