<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          最強(qiáng)分布式鎖工具:Redisson

          共 44260字,需瀏覽 89分鐘

           ·

          2022-08-11 16:47

          點(diǎn)擊關(guān)注公眾號(hào),利用碎片時(shí)間學(xué)習(xí)

          一、Redisson概述

          什么是Redisson?

          Redisson是一個(gè)在Redis的基礎(chǔ)上實(shí)現(xiàn)的Java駐內(nèi)存數(shù)據(jù)網(wǎng)格(In-Memory Data Grid)。它不僅提供了一系列的分布式的Java常用對(duì)象,還提供了許多分布式服務(wù)。


          其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最簡(jiǎn)單和最便捷的方法。


          Redisson的宗旨是促進(jìn)使用者對(duì)Redis的關(guān)注分離(Separation of Concern),從而讓使用者能夠?qū)⒕Ω械胤旁谔幚順I(yè)務(wù)邏輯上。

          一個(gè)基于Redis實(shí)現(xiàn)的分布式工具,有基本分布式對(duì)象和高級(jí)又抽象的分布式服務(wù),為每個(gè)試圖再造分布式輪子的程序員帶來了大部分分布式問題的解決辦法。

          Redisson和Jedis、Lettuce有什么區(qū)別?倒也不是雷鋒和雷鋒塔

          Redisson和它倆的區(qū)別就像一個(gè)用鼠標(biāo)操作圖形化界面,一個(gè)用命令行操作文件。Redisson是更高層的抽象,Jedis和Lettuce是Redis命令的封裝。

          • Jedis是Redis官方推出的用于通過Java連接Redis客戶端的一個(gè)工具包,提供了Redis的各種命令支持

          • Lettuce是一種可擴(kuò)展的線程安全的 Redis 客戶端,通訊框架基于Netty,支持高級(jí)的 Redis 特性,比如哨兵,集群,管道,自動(dòng)重新連接和Redis數(shù)據(jù)模型。Spring Boot 2.x 開始 Lettuce 已取代 Jedis 成為首選 Redis 的客戶端。

          • Redisson是架設(shè)在Redis基礎(chǔ)上,通訊基于Netty的綜合的、新型的中間件,企業(yè)級(jí)開發(fā)中使用Redis的最佳范本

          Jedis把Redis命令封裝好,Lettuce則進(jìn)一步有了更豐富的Api,也支持集群等模式。但是兩者也都點(diǎn)到為止,只給了你操作Redis數(shù)據(jù)庫的腳手架,而Redisson則是基于Redis、Lua和Netty建立起了成熟的分布式解決方案,甚至redis官方都推薦的一種工具集。

          二、分布式鎖

          分布式鎖怎么實(shí)現(xiàn)?

          分布式鎖是并發(fā)業(yè)務(wù)下的剛需,雖然實(shí)現(xiàn)五花八門:ZooKeeper有Znode順序節(jié)點(diǎn),數(shù)據(jù)庫有表級(jí)鎖和樂/悲觀鎖,Redis有setNx,但是殊途同歸,最終還是要回到互斥上來,本篇介紹Redisson,那就以redis為例。

          怎么寫一個(gè)簡(jiǎn)單的Redis分布式鎖?

          以Spring Data Redis為例,用RedisTemplate來操作Redis(setIfAbsent已經(jīng)是setNx + expire的合并命令),如下

          // 加鎖
          public Boolean tryLock(String key, String value, long timeout, TimeUnit unit) {
              return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit);
          }
          // 解鎖,防止刪錯(cuò)別人的鎖,以u(píng)uid為value校驗(yàn)是否自己的鎖
          public void unlock(String lockName, String uuid) {
              if(uuid.equals(redisTemplate.opsForValue().get(lockName)){        redisTemplate.opsForValue().del(lockName);    }
          }

          // 結(jié)構(gòu)
          if(tryLock){
              // todo
          }finally{
              unlock;
          }

          簡(jiǎn)單1.0版本完成,聰明的小張一眼看出,這是鎖沒錯(cuò),但get和del操作非原子性,并發(fā)一旦大了,無法保證進(jìn)程安全。于是小張?zhí)嶙h,用Lua腳本

          Lua腳本是什么?

          Lua腳本是redis已經(jīng)內(nèi)置的一種輕量小巧語言,其執(zhí)行是通過redis的eval/evalsha命令來運(yùn)行,把操作封裝成一個(gè)Lua腳本,如論如何都是一次執(zhí)行的原子操作。

          于是2.0版本通過Lua腳本刪除

          lockDel.lua如下

          if redis.call('get', KEYS[1]) == ARGV[1] 
              then 
           -- 執(zhí)行刪除操作
                  return redis.call('del', KEYS[1]) 
              else 
           -- 不成功,返回0
                  return 0 
          end

          delete操作時(shí)執(zhí)行Lua命令

          // 解鎖腳本
          DefaultRedisScript<Object> unlockScript = new DefaultRedisScript();
          unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lockDel.lua")));

          // 執(zhí)行l(wèi)ua腳本解鎖
          redisTemplate.execute(unlockScript, Collections.singletonList(keyName), value);

          2.0似乎更像一把鎖,但好像又缺少了什么,小張一拍腦袋,synchronized和ReentrantLock都很絲滑,因?yàn)樗麄兌际强芍厝腈i,一個(gè)線程多次拿鎖也不會(huì)死鎖,我們需要可重入。

          怎么保證可重入?

          重入就是,同一個(gè)線程多次獲取同一把鎖是允許的,不會(huì)造成死鎖,這一點(diǎn)synchronized偏向鎖提供了很好的思路,synchronized的實(shí)現(xiàn)重入是在JVM層面,JAVA對(duì)象頭MARK WORD中便藏有線程ID和計(jì)數(shù)器來對(duì)當(dāng)前線程做重入判斷,避免每次CAS。

          當(dāng)一個(gè)線程訪問同步塊并獲取鎖時(shí),會(huì)在對(duì)象頭和棧幀中的鎖記錄里存儲(chǔ)偏向的線程ID,以后該線程在進(jìn)入和退出同步塊時(shí)不需要進(jìn)行CAS操作來加鎖和解鎖,只需簡(jiǎn)單測(cè)試一下對(duì)象頭的Mark Word里是否存儲(chǔ)著指向當(dāng)前線程的偏向鎖。如果測(cè)試成功,表示線程已經(jīng)獲得了鎖。如果測(cè)試失敗,則需要再測(cè)試一下Mark Word中偏向鎖標(biāo)志是否設(shè)置成1:沒有則CAS競(jìng)爭(zhēng);設(shè)置了,則CAS將對(duì)象頭偏向鎖指向當(dāng)前線程。

          再維護(hù)一個(gè)計(jì)數(shù)器,同個(gè)線程進(jìn)入則自增1,離開再減1,直到為0才能釋放

          可重入鎖

          仿造該方案,我們需改造Lua腳本:

          1.需要存儲(chǔ) 鎖名稱lockName、獲得該鎖的線程id和對(duì)應(yīng)線程的進(jìn)入次數(shù)count

          2.加鎖

          每次線程獲取鎖時(shí),判斷是否已存在該鎖

          • 不存在

          • 設(shè)置hash的key為線程id,value初始化為1

          • 設(shè)置過期時(shí)間

          • 返回獲取鎖成功true

          • 存在

          • 繼續(xù)判斷是否存在當(dāng)前線程id的hash key

          • 存在,線程key的value + 1,重入次數(shù)增加1,設(shè)置過期時(shí)間

          • 不存在,返回加鎖失敗

          3.解鎖

          每次線程來解鎖時(shí),判斷是否已存在該鎖

          • 存在

          • 是否有該線程的id的hash key,有則減1,無則返回解鎖失敗

          • 減1后,判斷剩余count是否為0,為0則說明不再需要這把鎖,執(zhí)行del命令刪除

          1.存儲(chǔ)結(jié)構(gòu)

          為了方便維護(hù)這個(gè)對(duì)象,我們用Hash結(jié)構(gòu)來存儲(chǔ)這些字段。Redis的Hash類似Java的HashMap,適合存儲(chǔ)對(duì)象。

          hset lockname1 threadId 1

          設(shè)置一個(gè)名字為lockname1的hash結(jié)構(gòu),該hash結(jié)構(gòu)key為threadId,值value為1

          hget lockname1 threadId

          獲取lockname1的threadId的值

          存儲(chǔ)結(jié)構(gòu)為

          lockname 鎖名稱
              key1:   threadId   唯一鍵,線程id
              value1:  count     計(jì)數(shù)器,記錄該線程獲取鎖的次數(shù)

          redis中的結(jié)構(gòu)

          2.計(jì)數(shù)器的加減

          當(dāng)同一個(gè)線程獲取同一把鎖時(shí),我們需要對(duì)對(duì)應(yīng)線程的計(jì)數(shù)器count做加減

          判斷一個(gè)redis key是否存在,可以用exists,而判斷一個(gè)hash的key是否存在,可以用hexists

          而redis也有hash自增的命令hincrby

          每次自增1時(shí) hincrby lockname1 threadId 1,自減1時(shí) hincrby lockname1 threadId -1

          3.解鎖的判斷

          當(dāng)一把鎖不再被需要了,每次解鎖一次,count減1,直到為0時(shí),執(zhí)行刪除

          綜合上述的存儲(chǔ)結(jié)構(gòu)和判斷流程,加鎖和解鎖Lua如下

          加鎖 lock.lua

          local key = KEYS[1];
          local threadId = ARGV[1];
          local releaseTime = ARGV[2];

          -- lockname不存在
          if(redis.call('exists', key) == 0) then
              redis.call('hset', key, threadId, '1');
              redis.call('expire', key, releaseTime);
              return 1;
          end;

          -- 當(dāng)前線程已id存在
          if(redis.call('hexists', key, threadId) == 1) then
              redis.call('hincrby', key, threadId, '1');
              redis.call('expire', key, releaseTime);
              return 1;
          end;
          return 0;

          解鎖 unlock.lua

          local key = KEYS[1];
          local threadId = ARGV[1];

          -- lockname、threadId不存在
          if (redis.call('hexists', key, threadId) == 0) then
              return nil;
          end;

          -- 計(jì)數(shù)器-1
          local count = redis.call('hincrby', key, threadId, -1);

          -- 刪除lock
          if (count == 0) then
              redis.call('del', key);
              return nil;
          end;

          代碼

          /**
           * @description 原生redis實(shí)現(xiàn)分布式鎖
           **/

          @Getter
          @Setter
          public class RedisLock {

              private RedisTemplate redisTemplate;
              private DefaultRedisScript<Long> lockScript;
              private DefaultRedisScript<Object> unlockScript;

              public RedisLock(RedisTemplate redisTemplate) {
                  this.redisTemplate = redisTemplate;
                  // 加載加鎖的腳本
                  lockScript = new DefaultRedisScript<>();
                  this.lockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lock.lua")));
                  this.lockScript.setResultType(Long.class);
                  // 加載釋放鎖的腳本
                  unlockScript = new DefaultRedisScript<>();
                  this.unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("unlock.lua")));
              }

              /**
               * 獲取鎖
               */

              public String tryLock(String lockName, long releaseTime) {
                  // 存入的線程信息的前綴
                  String key = UUID.randomUUID().toString();

                  // 執(zhí)行腳本
                  Long result = (Long) redisTemplate.execute(
                          lockScript,
                          Collections.singletonList(lockName),
                          key + Thread.currentThread().getId(),
                          releaseTime);

                  if (result != null && result.intValue() == 1) {
                      return key;
                  } else {
                      return null;
                  }
              }

              /**
               * 解鎖
               * @param lockName
               * @param key
               */

              public void unlock(String lockName, String key) {
                  redisTemplate.execute(unlockScript,
                          Collections.singletonList(lockName),
                          key + Thread.currentThread().getId()
                          );
              }
          }

          至此已經(jīng)完成了一把分布式鎖,符合互斥、可重入、防死鎖的基本特點(diǎn)。

          嚴(yán)謹(jǐn)?shù)男堄X得雖然當(dāng)個(gè)普通互斥鎖,已經(jīng)穩(wěn)穩(wěn)夠用,可是業(yè)務(wù)里總是又很多特殊情況的,比如A進(jìn)程在獲取到鎖的時(shí)候,因業(yè)務(wù)操作時(shí)間太長(zhǎng),鎖釋放了但是業(yè)務(wù)還在執(zhí)行,而此刻B進(jìn)程又可以正常拿到鎖做業(yè)務(wù)操作,兩個(gè)進(jìn)程操作就會(huì)存在依舊有共享資源的問題

          而且如果負(fù)責(zé)儲(chǔ)存這個(gè)分布式鎖的Redis節(jié)點(diǎn)宕機(jī)以后,而且這個(gè)鎖正好處于鎖住的狀態(tài)時(shí),這個(gè)鎖會(huì)出現(xiàn)鎖死的狀態(tài)

          小張不是杠精,因?yàn)閹齑娌僮骺傆羞@樣那樣的特殊。

          所以我們希望在這種情況時(shí),可以延長(zhǎng)鎖的releaseTime延遲釋放鎖來直到完成業(yè)務(wù)期望結(jié)果,這種不斷延長(zhǎng)鎖過期時(shí)間來保證業(yè)務(wù)執(zhí)行完成的操作就是鎖續(xù)約。

          讀寫分離也是常見,一個(gè)讀多寫少的業(yè)務(wù)為了性能,常常是有讀鎖和寫鎖的。

          而此刻的擴(kuò)展已經(jīng)超出了一把簡(jiǎn)單輪子的復(fù)雜程度,光是處理續(xù)約,就夠小張喝一壺,何況在性能(鎖的最大等待時(shí)間)、優(yōu)雅(無效鎖申請(qǐng))、重試(失敗重試機(jī)制)等方面還要下功夫研究。

          在小張苦思冥想時(shí),旁邊的小白湊過來看了看小張,很好奇,都2021年了,為什么不直接用redisson呢?

          Redisson就有這把你要的鎖。

          三、Redisson分布式鎖

          號(hào)稱簡(jiǎn)單的Redisson分布式鎖的使用姿勢(shì)是什么?

          1.依賴

          <!-- 原生,本章使用-->
          <dependency>
              <groupId>org.redisson</groupId>
              <artifactId>redisson</artifactId>
              <version>3.13.6</version>
          </dependency>

          <!-- 另一種Spring集成starter,本章未使用 -->
          <dependency>
              <groupId>org.redisson</groupId>
              <artifactId>redisson-spring-boot-starter</artifactId>
              <version>3.13.6</version>
          </dependency>

          2.配置

          @Configuration
          public class RedissionConfig {
              @Value("${spring.redis.host}")
              private String redisHost;

              @Value("${spring.redis.password}")
              private String password;

              private int port = 6379;

              @Bean
              public RedissonClient getRedisson() {
                  Config config = new Config();
                  config.useSingleServer().
                          setAddress("redis://" + redisHost + ":" + port).
                          setPassword(password);
                  config.setCodec(new JsonJacksonCodec());
                  return Redisson.create(config);
              }
          }

          3.啟用分布式鎖

          @Resource
          private RedissonClient redissonClient;

          RLock rLock = redissonClient.getLock(lockName);
          try {
              boolean isLocked = rLock.tryLock(expireTime, TimeUnit.MILLISECONDS);
              if (isLocked) {
                  // TODO
                          }
              } catch (Exception e) {
                      rLock.unlock();
              }

          簡(jiǎn)潔明了,只需要一個(gè)RLock,既然推薦Redisson,就往里面看看他是怎么實(shí)現(xiàn)的。

          四、RLock

          RLock是Redisson分布式鎖的最核心接口,繼承了concurrent包的Lock接口和自己的RLockAsync接口,RLockAsync的返回值都是RFuture,是Redisson執(zhí)行異步實(shí)現(xiàn)的核心邏輯,也是Netty發(fā)揮的主要陣地。

          RLock如何加鎖?

          從RLock進(jìn)入,找到RedissonLock類,找到tryLock方法再遞進(jìn)到干事的tryAcquireOnceAsync方法,這是加鎖的主要代碼(版本不一此處實(shí)現(xiàn)有差別,和最新3.15.x有一定出入,但是核心邏輯依然未變。此處以3.13.6為例)

          private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
                  if (leaseTime != -1L) {
                      return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
                  } else {
                      RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
                      ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
                          if (e == null) {
                              if (ttlRemaining) {
                                  this.scheduleExpirationRenewal(threadId);
                              }

                          }
                      });
                      return ttlRemainingFuture;
                  }
              }

          此處出現(xiàn)leaseTime時(shí)間判斷的2個(gè)分支,實(shí)際上就是加鎖時(shí)是否設(shè)置過期時(shí)間,未設(shè)置過期時(shí)間(-1)時(shí)則會(huì)有watchDog鎖續(xù)約(下文),一個(gè)注冊(cè)了加鎖事件的續(xù)約任務(wù)。我們先來看有過期時(shí)間tryLockInnerAsync部分,

          evalWriteAsync是eval命令執(zhí)行l(wèi)ua的入口

          <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
                  this.internalLockLeaseTime = unit.toMillis(leaseTime);
                  return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
              }

          這里揭開真面目,eval命令執(zhí)行Lua腳本的地方,此處的Lua腳本展開

          -- 不存在該key時(shí)
          if (redis.call('exists', KEYS[1]) == 0then 
            -- 新增該鎖并且hash中該線程id對(duì)應(yīng)的count置1
            redis.call('hincrby', KEYS[1], ARGV[2], 1); 
            -- 設(shè)置過期時(shí)間
            redis.call('pexpire', KEYS[1], ARGV[1]); 
            return nil
          end

          -- 存在該key 并且 hash中線程id的key也存在
          if (redis.call('hexists', KEYS[1], ARGV[2]) == 1then 
            -- 線程重入次數(shù)++
            redis.call('hincrby', KEYS[1], ARGV[2], 1); 
            redis.call('pexpire', KEYS[1], ARGV[1]); 
            return nil
          end
          return redis.call('pttl', KEYS[1]);

          和前面我們寫自定義的分布式鎖的腳本幾乎一致,看來redisson也是一樣的實(shí)現(xiàn),具體參數(shù)分析:

          // keyName
          KEYS[1] = Collections.singletonList(this.getName())
          // leaseTime
          ARGV[1] = this.internalLockLeaseTime
          // uuid+threadId組合的唯一值
          ARGV[2] = this.getLockName(threadId)

          總共3個(gè)參數(shù)完成了一段邏輯:

          判斷該鎖是否已經(jīng)有對(duì)應(yīng)hash表存在,

          ? 沒有對(duì)應(yīng)的hash表:則set該hash表中一個(gè)entry的key為鎖名稱,value為1,之后設(shè)置該hash表失效時(shí)間為leaseTime

          ? 存在對(duì)應(yīng)的hash表:則將該lockName的value執(zhí)行+1操作,也就是計(jì)算進(jìn)入次數(shù),再設(shè)置失效時(shí)間leaseTime

          ? 最后返回這把鎖的ttl剩余時(shí)間

          也和上述自定義鎖沒有區(qū)別

          既然如此,那解鎖的步驟也肯定有對(duì)應(yīng)的-1操作,再看unlock方法,同樣查找方法名,一路到

          protected RFuture<Boolean> unlockInnerAsync(long threadId) {
                  return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.getName(), this.getChannelName()), new Object[]{LockPubSub.unlockMessage, this.internalLockLeaseTime, this.getLockName(threadId)});
              }

          掏出Lua部分

          -- 不存在key
          if (redis.call('hexists', KEYS[1], ARGV[3]) == 0then 
            return nil;
          end;
          -- 計(jì)數(shù)器 -1
          local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); 
          if (counter > 0then 
            -- 過期時(shí)間重設(shè)
            redis.call('pexpire', KEYS[1], ARGV[2]); 
            return 0
          else
            -- 刪除并發(fā)布解鎖消息
            redis.call('del', KEYS[1]); 
            redis.call('publish', KEYS[2], ARGV[1]); 
            return 1;
          end
          return nil;

          該Lua KEYS有2個(gè)Arrays.asList(getName(), getChannelName())

          name 鎖名稱
          channelName,用于pubSub發(fā)布消息的channel名稱

          ARGV變量有三個(gè)LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)

          LockPubSub.UNLOCK_MESSAGE,channel發(fā)送消息的類別,此處解鎖為0
          internalLockLeaseTime,watchDog配置的超時(shí)時(shí)間,默認(rèn)為30s
          lockName 這里的lockName指的是uuid和threadId組合的唯一值

          步驟如下:

          1.如果該鎖不存在則返回nil;

          2.如果該鎖存在則將其線程的hash key計(jì)數(shù)器-1,

          3.計(jì)數(shù)器counter>0,重置下失效時(shí)間,返回0;否則,刪除該鎖,發(fā)布解鎖消息unlockMessage,返回1;

          其中unLock的時(shí)候使用到了Redis發(fā)布訂閱PubSub完成消息通知。

          而訂閱的步驟就在RedissonLock的加鎖入口的lock方法里

          long threadId = Thread.currentThread().getId();
                  Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
                  if (ttl != null) {
                      // 訂閱
                      RFuture<RedissonLockEntry> future = this.subscribe(threadId);
                      if (interruptibly) {
                          this.commandExecutor.syncSubscriptionInterrupted(future);
                      } else {
                          this.commandExecutor.syncSubscription(future);
                      }
                      // 省略

          當(dāng)鎖被其他線程占用時(shí),通過監(jiān)聽鎖的釋放通知(在其他線程通過RedissonLock釋放鎖時(shí),會(huì)通過發(fā)布訂閱pub/sub功能發(fā)起通知),等待鎖被其他線程釋放,也是為了避免自旋的一種常用效率手段。

          1.解鎖消息

          為了一探究竟通知了什么,通知后又做了什么,進(jìn)入LockPubSub。

          這里只有一個(gè)明顯的監(jiān)聽方法onMessage,其訂閱和信號(hào)量的釋放都在父類PublishSubscribe,我們只關(guān)注監(jiān)聽事件的實(shí)際操作

          protected void onMessage(RedissonLockEntry value, Long message) {
                  Runnable runnableToExecute;
                  if (message.equals(unlockMessage)) {
                      // 從監(jiān)聽器隊(duì)列取監(jiān)聽線程執(zhí)行監(jiān)聽回調(diào)
                      runnableToExecute = (Runnable)value.getListeners().poll();
                      if (runnableToExecute != null) {
                          runnableToExecute.run();
                      }
                      // getLatch()返回的是Semaphore,信號(hào)量,此處是釋放信號(hào)量
                      // 釋放信號(hào)量后會(huì)喚醒等待的entry.getLatch().tryAcquire去再次嘗試申請(qǐng)鎖
                      value.getLatch().release();
                  } else if (message.equals(readUnlockMessage)) {
                      while(true) {
                          runnableToExecute = (Runnable)value.getListeners().poll();
                          if (runnableToExecute == null) {
                              value.getLatch().release(value.getLatch().getQueueLength());
                              break;
                          }
                          runnableToExecute.run();
                      }
                  }
              }

          發(fā)現(xiàn)一個(gè)是默認(rèn)解鎖消息,一個(gè)是讀鎖解鎖消息,因?yàn)閞edisson是有提供讀寫鎖的,而讀寫鎖讀讀情況和讀寫、寫寫情況互斥情況不同,我們只看上面的默認(rèn)解鎖消息unlockMessage分支

          LockPubSub監(jiān)聽最終執(zhí)行了2件事

          1. runnableToExecute.run() 執(zhí)行監(jiān)聽回調(diào)

          2. value.getLatch().release(); 釋放信號(hào)量

          Redisson通過LockPubSub監(jiān)聽解鎖消息,執(zhí)行監(jiān)聽回調(diào)和釋放信號(hào)量通知等待線程可以重新?lián)屾i。

          這時(shí)再回來看tryAcquireOnceAsync另一分支

          private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
                  if (leaseTime != -1L) {
                      return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
                  } else {
                      RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
                      ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
                          if (e == null) {
                              if (ttlRemaining) {
                                  this.scheduleExpirationRenewal(threadId);
                              }

                          }
                      });
                      return ttlRemainingFuture;
                  }
              }

          可以看到,無超時(shí)時(shí)間時(shí),在執(zhí)行加鎖操作后,還執(zhí)行了一段費(fèi)解的邏輯

          ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
                          if (e == null) {
                              if (ttlRemaining) {
                                  this.scheduleExpirationRenewal(threadId);
                              }

                          }
                      })                   }                 }             }) 復(fù)制代碼

          此處涉及到Netty的Future/Promise-Listener模型,Redisson中幾乎全部以這種方式通信(所以說Redisson是基于Netty通信機(jī)制實(shí)現(xiàn)的),理解這段邏輯可以試著先理解

          在 Java 的 Future 中,業(yè)務(wù)邏輯為一個(gè) Callable 或 Runnable 實(shí)現(xiàn)類,該類的 call()或 run()執(zhí)行完畢意味著業(yè)務(wù)邏輯的完結(jié),在 Promise 機(jī)制中,可以在業(yè)務(wù)邏輯中人工設(shè)置業(yè)務(wù)邏輯的成功與失敗,這樣更加方便的監(jiān)控自己的業(yè)務(wù)邏輯。

          這塊代碼的表面意義就是,在執(zhí)行異步加鎖的操作后,加鎖成功則根據(jù)加鎖完成返回的ttl是否過期來確認(rèn)是否執(zhí)行一段定時(shí)任務(wù)。

          這段定時(shí)任務(wù)的就是watchDog的核心。

          2.鎖續(xù)約

          查看RedissonLock.this.scheduleExpirationRenewal(threadId)

          private void scheduleExpirationRenewal(long threadId) {
                  RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
                  RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
                  if (oldEntry != null) {
                      oldEntry.addThreadId(threadId);
                  } else {
                      entry.addThreadId(threadId);
                      this.renewExpiration();
                  }

              }

          private void renewExpiration() {
                  RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
                  if (ee != null) {
                      Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
                          public void run(Timeout timeout) throws Exception {
                              RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
                              if (ent != null) {
                                  Long threadId = ent.getFirstThreadId();
                                  if (threadId != null) {
                                      RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
                                      future.onComplete((res, e) -> {
                                          if (e != null) {
                                              RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
                                          } else {
                                              if (res) {
                                                  RedissonLock.this.renewExpiration();
                                              }

                                          }
                                      });
                                  }
                              }
                          }
                      }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
                      ee.setTimeout(task);
                  }
              }

          拆分來看,這段連續(xù)嵌套且冗長(zhǎng)的代碼實(shí)際上做了幾步

          ? 添加一個(gè)netty的Timeout回調(diào)任務(wù),每(internalLockLeaseTime / 3)毫秒執(zhí)行一次,執(zhí)行的方法是renewExpirationAsync

          ? renewExpirationAsync重置了鎖超時(shí)時(shí)間,又注冊(cè)一個(gè)監(jiān)聽器,監(jiān)聽回調(diào)又執(zhí)行了renewExpiration

          renewExpirationAsync 的Lua如下

          protected RFuture<Boolean> renewExpirationAsync(long threadId) {
                  return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
              }

          if (redis.call('hexists', KEYS[1], ARGV[2]) == 1then 
            redis.call('pexpire', KEYS[1], ARGV[1]); 
            return 1
          end
          return 0;

          重新設(shè)置了超時(shí)時(shí)間。

          Redisson加這段邏輯的目的是什么?

          目的是為了某種場(chǎng)景下保證業(yè)務(wù)不影響,如任務(wù)執(zhí)行超時(shí)但未結(jié)束,鎖已經(jīng)釋放的問題。

          當(dāng)一個(gè)線程持有了一把鎖,由于并未設(shè)置超時(shí)時(shí)間leaseTime,Redisson默認(rèn)配置了30S,開啟watchDog,每10S對(duì)該鎖進(jìn)行一次續(xù)約,維持30S的超時(shí)時(shí)間,直到任務(wù)完成再刪除鎖。

          這就是Redisson的鎖續(xù)約,也就是WatchDog實(shí)現(xiàn)的基本思路。

          3.流程概括

          通過整體的介紹,流程簡(jiǎn)單概括:

          1. A、B線程爭(zhēng)搶一把鎖,A獲取到后,B阻塞

          2. B線程阻塞時(shí)并非主動(dòng)CAS,而是PubSub方式訂閱該鎖的廣播消息

          3. A操作完成釋放了鎖,B線程收到訂閱消息通知

          4. B被喚醒開始繼續(xù)搶鎖,拿到鎖

          詳細(xì)加鎖解鎖流程總結(jié)如下圖:

          五、公平鎖

          以上介紹的可重入鎖是非公平鎖,Redisson還基于Redis的隊(duì)列(List)和ZSet實(shí)現(xiàn)了公平鎖

          公平的定義是什么?

          公平就是按照客戶端的請(qǐng)求先來后到排隊(duì)來獲取鎖,先到先得,也就是FIFO,所以隊(duì)列和容器順序編排必不可少

          FairSync

          回顧JUC的ReentrantLock公平鎖的實(shí)現(xiàn)

          /**
           * Sync object for fair locks
           */

          static final class FairSync extends Sync {
              private static final long serialVersionUID = -3000897897090466540L;

              final void lock() {
                  acquire(1);
              }

              /**
               * Fair version of tryAcquire.  Don't grant access unless
               * recursive call or no waiters or is first.
               */

              protected final boolean tryAcquire(int acquires) {
                  final Thread current = Thread.currentThread();
                  int c = getState();
                  if (c == 0) {
                      if (!hasQueuedPredecessors() &&
                          compareAndSetState(0, acquires)) {
                          setExclusiveOwnerThread(current);
                          return true;
                      }
                  }
                  else if (current == getExclusiveOwnerThread()) {
                      int nextc = c + acquires;
                      if (nextc < 0)
                          throw new Error("Maximum lock count exceeded");
                      setState(nextc);
                      return true;
                  }
                  return false;
              }
          }

          AQS已經(jīng)提供了整個(gè)實(shí)現(xiàn),是否公平取決于實(shí)現(xiàn)類取出節(jié)點(diǎn)邏輯是否順序取

          AbstractQueuedSynchronizer是用來構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架,通過內(nèi)置FIFO隊(duì)列來完成資源獲取線程的排隊(duì)工作,他自身沒有實(shí)現(xiàn)同步接口,僅僅定義了若干同步狀態(tài)獲取和釋放的方法來供自定義同步組件使用(上圖),支持獨(dú)占和共享獲取,這是基于模版方法模式的一種設(shè)計(jì),給公平/非公平提供了土壤。

          我們用2張圖來簡(jiǎn)單解釋AQS的等待流程(出自《JAVA并發(fā)編程的藝術(shù)》)

          一張是同步隊(duì)列(FIFO雙向隊(duì)列)管理 獲取同步狀態(tài)失敗(搶鎖失敗)的線程引用、等待狀態(tài)和前驅(qū)后繼節(jié)點(diǎn)的流程圖

          一張是獨(dú)占式獲取同步狀態(tài)的總流程,核心acquire(int arg)方法調(diào)用流程

          可以看出鎖的獲取流程

          AQS維護(hù)一個(gè)同步隊(duì)列,獲取狀態(tài)失敗的線程都會(huì)加入到隊(duì)列中進(jìn)行自旋,移出隊(duì)列或停止自旋的條件是前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn)切成功獲取了同步狀態(tài)。

          而比較另一段非公平鎖類NonfairSync可以發(fā)現(xiàn),控制公平和非公平的關(guān)鍵代碼,在于hasQueuedPredecessors方法。

          static final class NonfairSync extends Sync {
              private static final long serialVersionUID = 7316153563782823691L;

              /**
               * Performs lock.  Try immediate barge, backing up to normal
               * acquire on failure.
               */

              final void lock() {
                  if (compareAndSetState(01))
                      setExclusiveOwnerThread(Thread.currentThread());
                  else
                      acquire(1);
              }

              protected final boolean tryAcquire(int acquires) {
                  return nonfairTryAcquire(acquires);
              }
          }

          NonfairSync減少了了hasQueuedPredecessors判斷條件,該方法的作用就是

          查看同步隊(duì)列中當(dāng)前節(jié)點(diǎn)是否有前驅(qū)節(jié)點(diǎn),如果有比當(dāng)前線程更早請(qǐng)求獲取鎖則返回true。

          保證每次都取隊(duì)列的第一個(gè)節(jié)點(diǎn)(線程)來獲取鎖,這就是公平規(guī)則

          為什么JUC以默認(rèn)非公平鎖呢?

          因?yàn)楫?dāng)一個(gè)線程請(qǐng)求鎖時(shí),只要獲取來同步狀態(tài)即成功獲取。在此前提下,剛釋放的線程再次獲取同步狀態(tài)的幾率會(huì)非常大,使得其他線程只能在同步隊(duì)列中等待。但這樣帶來的好處是,非公平鎖大大減少了系統(tǒng)線程上下文的切換開銷。

          可見公平的代價(jià)是性能與吞吐量。

          Redis里沒有AQS,但是有List和zSet,看看Redisson是怎么實(shí)現(xiàn)公平的。

          RedissonFairLock

          RedissonFairLock 用法依然很簡(jiǎn)單

          RLock fairLock = redissonClient.getFairLock(lockName);

          fairLock.lock();

          RedissonFairLock繼承自RedissonLock,同樣一路向下找到加鎖實(shí)現(xiàn)方法tryLockInnerAsync

          這里有2段冗長(zhǎng)的Lua,但是Debug發(fā)現(xiàn),公平鎖的入口在 command == RedisCommands.EVAL_LONG 之后,此段Lua較長(zhǎng),參數(shù)也多,我們著重分析Lua的實(shí)現(xiàn)規(guī)則

          參數(shù)

          -- lua中的幾個(gè)參數(shù)
          KEYS = Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName)
          KEYS[1]: lock_name, 鎖名稱                   
          KEYS[2]: "redisson_lock_queue:{xxx}"  線程隊(duì)列
          KEYS[3]: "redisson_lock_timeout:{xxx}"  線程id對(duì)應(yīng)的超時(shí)集合

          ARGV =  internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime, currentTime
          ARGV[1]: "{leaseTime}" 過期時(shí)間
          ARGV[2]: "{Redisson.UUID}:{threadId}"   
          ARGV[3] = 當(dāng)前時(shí)間 + 線程等待時(shí)間:(10:00:00) + 5000毫秒 = 10:00:05
          ARGV[4] = 當(dāng)前時(shí)間(10:00:00)  部署服務(wù)器時(shí)間,非redis-server服務(wù)器時(shí)間

          公平鎖實(shí)現(xiàn)的Lua腳本

          -- 1.死循環(huán)清除過期key
          while true do 
            -- 獲取頭節(jié)點(diǎn)
              local firstThreadId2 = redis.call('lindex', KEYS[2], 0);
              -- 首次獲取必空跳出循環(huán)
            if firstThreadId2 == false then 
              break;
            end;
            -- 清除過期key
            local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));
            if timeout <= tonumber(ARGV[4]) then
              redis.call('zrem', KEYS[3], firstThreadId2);
              redis.call('lpop', KEYS[2]);
            else
              break;
            end;
          end;

          -- 2.不存在該鎖 && (不存在線程等待隊(duì)列 || 存在線程等待隊(duì)列而且第一個(gè)節(jié)點(diǎn)就是此線程ID),加鎖部分主要邏輯
          if (redis.call('exists', KEYS[1]) == 0and 
            ((redis.call('exists', KEYS[2]) == 0)  or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then
            -- 彈出隊(duì)列中線程id元素,刪除Zset中該線程id對(duì)應(yīng)的元素
            redis.call('lpop', KEYS[2]);
            redis.call('zrem', KEYS[3], ARGV[2]);
            local keys = redis.call('zrange', KEYS[3], 0-1);
            -- 遍歷zSet所有key,將key的超時(shí)時(shí)間(score) - 當(dāng)前時(shí)間ms
            for i = 1, #keys, 1 do 
              redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);
            end;
              -- 加鎖設(shè)置鎖過期時(shí)間
            redis.call('hset', KEYS[1], ARGV[2], 1);
            redis.call('pexpire', KEYS[1], ARGV[1]);
            return nil;
          end;

          -- 3.線程存在,重入判斷
          if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then
            redis.call('hincrby', KEYS[1], ARGV[2],1);
            redis.call('pexpire', KEYS[1], ARGV[1]);
            return nil;
          end;

          -- 4.返回當(dāng)前線程剩余存活時(shí)間
          local timeout = redis.call('zscore', KEYS[3], ARGV[2]);
              if timeout ~= false then
            -- 過期時(shí)間timeout的值在下方設(shè)置,此處的減法算出的依舊是當(dāng)前線程的ttl
            return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);
          end;

          -- 5.尾節(jié)點(diǎn)剩余存活時(shí)間
          local lastThreadId = redis.call('lindex', KEYS[2], -1);
          local ttl;
          -- 尾節(jié)點(diǎn)不空 && 尾節(jié)點(diǎn)非當(dāng)前線程
          if lastThreadId ~= false and lastThreadId ~= ARGV[2then
            -- 計(jì)算隊(duì)尾節(jié)點(diǎn)剩余存活時(shí)間
            ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);
          else
            -- 獲取lock_name剩余存活時(shí)間
            ttl = redis.call('pttl', KEYS[1]);
          end;

          -- 6.末尾排隊(duì)
          -- zSet 超時(shí)時(shí)間(score),尾節(jié)點(diǎn)ttl + 當(dāng)前時(shí)間 + 5000ms + 當(dāng)前時(shí)間,無則新增,有則更新
          -- 線程id放入隊(duì)列尾部排隊(duì),無則插入,有則不再插入
          local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);
          if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then
            redis.call('rpush', KEYS[2], ARGV[2]);
          end;
          return ttl;

          1.公平鎖加鎖步驟

          通過以上Lua,可以發(fā)現(xiàn),lua操作的關(guān)鍵結(jié)構(gòu)是列表(list)和有序集合(zSet)。

          其中l(wèi)ist維護(hù)了一個(gè)等待的線程隊(duì)列redisson_lock_queue:{xxx},zSet維護(hù)了一個(gè)線程超時(shí)情況的有序集合redisson_lock_timeout:{xxx},盡管lua較長(zhǎng),但是可以拆分為6個(gè)步驟

          1.隊(duì)列清理

          • 保證隊(duì)列中只有未過期的等待線程

          2.首次加鎖

          • hset加鎖,pexpire過期時(shí)間

          3.重入判斷

          • 此處同可重入鎖lua

          4.返回ttl

          5.計(jì)算尾節(jié)點(diǎn)ttl

          • 初始值為鎖的剩余過期時(shí)間

          6.末尾排隊(duì)

          • ttl + 2 * currentTime + waitTime是score的默認(rèn)值計(jì)算公式

          2.模擬

          如果模擬以下順序,就會(huì)明了redisson公平鎖整個(gè)加鎖流程

          假設(shè) t1 10:00:00 < t2 10:00:10 < t3 10:00:20

          t1:當(dāng)線程1初次獲取鎖

          1.等待隊(duì)列無頭節(jié)點(diǎn),跳出死循環(huán)->2

          2.不存在該鎖 && 不存在線程等待隊(duì)列 成立

          2.1 lpop和zerm、zincrby都是無效操作,只有加鎖生效,說明是首次加鎖,加鎖后返回nil

          加鎖成功,線程1獲取到鎖,結(jié)束

          t2:線程2嘗試獲取鎖(線程1未釋放鎖)

          1.等待隊(duì)列無頭節(jié)點(diǎn),跳出死循環(huán)->2

          2.不存在該鎖 不成立->3

          3.非重入線程 ->4

          4.score無值 ->5

          5.尾節(jié)點(diǎn)為空,設(shè)置ttl初始值為lock_name的ttl -> 6

          6.按照ttl + waitTime + currentTime + currentTime 來設(shè)置zSet超時(shí)時(shí)間score,并且加入等待隊(duì)列,線程2為頭節(jié)點(diǎn)

          score = 20S + 5000ms + 10:00:10 + 10:00:10 = 10:00:35 + 10:00:10

          t3:線程3嘗試獲取鎖(線程1未釋放鎖)

          1.等待隊(duì)列有頭節(jié)點(diǎn)

          1.1未過期->2

          2.不存在該鎖不成立->3

          3.非重入線程->4

          4.score無值 ->5

          5.尾節(jié)點(diǎn)不為空 && 尾節(jié)點(diǎn)線程為2,非當(dāng)前線程

          5.1取出之前設(shè)置的score,減去當(dāng)前時(shí)間:ttl = score - currentTime ->6

          6.按照ttl + waitTime + currentTime + currentTime 來設(shè)置zSet超時(shí)時(shí)間score,并且加入等待隊(duì)列

          score = 10S + 5000ms + 10:00:20 + 10:00:20 = 10:00:35 + 10:00:20

          如此一來,三個(gè)需要搶奪一把鎖的線程,完成了一次排隊(duì),在list中排列他們等待線程id,在zSet中存放過期時(shí)間(便于排列優(yōu)先級(jí))。其中返回ttl的線程2客戶端、線程3客戶端將會(huì)一直按一定間隔自旋重復(fù)執(zhí)行該段Lua,嘗試加鎖,如此一來便和AQS有了異曲同工之處。

          而當(dāng)線程1釋放鎖之后(這里依舊有通過Pub/Sub發(fā)布解鎖消息,通知其他線程獲取)

          10:00:30 線程2嘗試獲取鎖(線程1已釋放鎖)

          1.等待隊(duì)列有頭節(jié)點(diǎn),未過期->2

          2.不存在該鎖 & 等待隊(duì)列頭節(jié)點(diǎn)是當(dāng)前線程 成立

          2.1刪除當(dāng)前線程的隊(duì)列信息和zSet信息,超時(shí)時(shí)間為:

          線程2 10:00:35 + 10:00:10 - 10:00:30 = 10:00:15

          線程3 10:00:35 + 10:00:20 - 10:00:30 = 10:00:25

          2.2線程2獲取到鎖,重新設(shè)置過期時(shí)間

          加鎖成功,線程2獲取到鎖,結(jié)束

          排隊(duì)結(jié)構(gòu)如圖

          公平鎖的釋放腳本和重入鎖類似,多了一步加鎖開頭的清理過期key的while true邏輯,在此不再展開篇幅描述。

          由上可以看出,Redisson公平鎖的玩法類似于延遲隊(duì)列的玩法,核心都在Redis的List和zSet結(jié)構(gòu)的搭配,但又借鑒了AQS實(shí)現(xiàn),在定時(shí)判斷頭節(jié)點(diǎn)上如出一轍(watchDog),保證了鎖的競(jìng)爭(zhēng)公平和互斥。并發(fā)場(chǎng)景下,lua腳本里,zSet的score很好地解決了順序插入的問題,排列好優(yōu)先級(jí)。

          并且為了防止因異常而退出的線程無法清理,每次請(qǐng)求都會(huì)判斷頭節(jié)點(diǎn)的過期情況給予清理,最后釋放時(shí)通過CHANNEL通知訂閱線程可以來獲取鎖,重復(fù)一開始的步驟,順利交接到下一個(gè)順序線程。

          六、總結(jié)

          Redisson整體實(shí)現(xiàn)分布式加解鎖流程的實(shí)現(xiàn)稍顯復(fù)雜,作者Rui Gu對(duì)Netty和JUC、Redis研究深入,利用了很多高級(jí)特性和語義,值得深入學(xué)習(xí),本次介紹也只是單機(jī)Redis下鎖實(shí)現(xiàn)。

          Redisson也提供了多機(jī)情況下的聯(lián)鎖MultiLock:

          https://github.com/redisson/redisson/wiki/8.-分布式鎖和同步器#81-可重入鎖reentrant-lock

          和官方推薦的紅鎖RedLock:

          https://github.com/redisson/redisson/wiki/8.-分布式鎖和同步器#84-紅鎖redlock

          所以,當(dāng)你真的需要分布式鎖時(shí),不妨先來Redisson里找找。

          來源:juejin.cn/post/6961380552519712798


          瀏覽 49
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日韩国产精品毛片 | 国产在线观看一区 | 色噜噜在线国产 | 欧美成人一区二区三区高清 | 欧美成人三级网站 |