<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Redisson 是如何實現(xiàn)分布式鎖的?

          共 9327字,需瀏覽 19分鐘

           ·

          2020-09-05 05:31

          點擊上方藍色“小哈學(xué)Java”,選擇“設(shè)為星標(biāo)

          回復(fù)“資源”獲取獨家整理的學(xué)習(xí)資料!

          來源:tech.lede.com/2017/03/08/rd/server/Redisson

          • Maven配置
          • RedissonLock簡單示例
          • 源碼中使用到的Redis命令
          • 源碼中使用到的lua腳本語義
          • 源碼分析
          • 總結(jié)

          針對項目中使用的分布式鎖進行簡單的示例配置以及源碼解析,并列舉源碼中使用到的一些基礎(chǔ)知識點,但是沒有對redisson中使用到的netty知識進行解析。

          本篇主要是對以下幾個方面進行了探索

          • Maven配置
          • RedissonLock簡單示例
          • 源碼中使用到的Redis命令
          • 源碼中使用到的lua腳本語義
          • 源碼分析

          Maven配置

          <dependency>
          ????<groupId>org.redissongroupId>
          ????<artifactId>redissonartifactId>
          ????<version>2.2.12version>
          dependency>
          <dependency>
          ????<groupId>com.fasterxml.jackson.coregroupId>
          ????<artifactId>jackson-annotationsartifactId>
          ????<version>2.6.0version>
          dependency>

          RedissonLock簡單示例

          redission支持4種連接redis方式,分別為單機、主從、Sentinel、Cluster 集群,項目中使用的連接方式是Sentinel。

          redis服務(wù)器不在本地的同學(xué)請注意權(quán)限問題。

          Sentinel配置

          Config?config?=?new?Config();
          config.useSentinelServers().addSentinelAddress("127.0.0.1:6479",?"127.0.0.1:6489").setMasterName("master").setPassword("password").setDatabase(0);
          RedissonClient?redisson?=?Redisson.create(config);

          簡單使用

          RLock?lock?=?redisson.getLock("test_lock");
          try{
          ????boolean?isLock=lock.tryLock();
          ????if(isLock){
          ????????doBusiness();
          ????}
          }catch(exception?e){
          }finally{
          ????lock.unlock();
          }

          源碼中使用到的Redis命令

          分布式鎖主要需要以下redis命令,這里列舉一下。在源碼分析部分可以繼續(xù)參照命令的操作含義。

          1. EXISTS key :當(dāng) key 存在,返回1;若給定的 key 不存在,返回0。
          2. GETSET key value:將給定 key 的值設(shè)為 value ,并返回 key 的舊值 (old value),當(dāng) key 存在但不是字符串類型時,返回一個錯誤,當(dāng)key不存在時,返回nil。
          3. GET key:返回 key 所關(guān)聯(lián)的字符串值,如果 key 不存在那么返回 nil。
          4. DEL key [KEY …]:刪除給定的一個或多個 key ,不存在的 key 會被忽略,返回實際刪除的key的個數(shù)(integer)。
          5. HSET key field value:給一個key 設(shè)置一個{field=value}的組合值,如果key沒有就直接賦值并返回1,如果field已有,那么就更新value的值,并返回0.
          6. HEXISTS key field:當(dāng)key中存儲著field的時候返回1,如果key或者field至少有一個不存在返回0。
          7. HINCRBY key field increment:將存儲在key中的哈希(Hash)對象中的指定字段field的值加上增量increment。如果鍵key不存在,一個保存了哈希對象的新建將被創(chuàng)建。如果字段field不存在,在進行當(dāng)前操作前,其將被創(chuàng)建,且對應(yīng)的值被置為0,返回值是增量之后的值
          8. PEXPIRE key milliseconds:設(shè)置存活時間,單位是毫秒。expire操作單位是秒。
          9. PUBLISH channel message:向channel post一個message內(nèi)容的消息,返回接收消息的客戶端數(shù)。

          源碼中使用到的lua腳本語義

          Redisson源碼中,執(zhí)行redis命令的是lua腳本,其中主要用到如下幾個概念。

          • redis.call() 是執(zhí)行redis命令.
          • KEYS[1] 是指腳本中第1個參數(shù)
          • ARGV[1] 是指腳本中第一個參數(shù)的值
          • 返回值中nil與false同一個意思。

          需要注意的是,在redis執(zhí)行l(wèi)ua腳本時,相當(dāng)于一個redis級別的鎖,不能執(zhí)行其他操作,類似于原子操作,也是redisson實現(xiàn)的一個關(guān)鍵點。

          另外,如果lua腳本執(zhí)行過程中出現(xiàn)了異常或者redis服務(wù)器直接宕掉了,執(zhí)行redis的根據(jù)日志回復(fù)的命令,會將腳本中已經(jīng)執(zhí)行的命令在日志中刪除。

          源碼分析

          RLOCK結(jié)構(gòu)

          public?interface?RLock?extends?Lock,?RExpirable?{
          ????void?lockInterruptibly(long?leaseTime,?TimeUnit?unit)?throws?InterruptedException;
          ????boolean?tryLock(long?waitTime,?long?leaseTime,?TimeUnit?unit)?throws?InterruptedException;
          ????void?lock(long?leaseTime,?TimeUnit?unit);
          ????void?forceUnlock();
          ????boolean?isLocked();
          ????boolean?isHeldByCurrentThread();
          ????int?getHoldCount();
          ????Future?unlockAsync();
          ????Future?tryLockAsync();
          ????Future?lockAsync();
          ????Future?lockAsync(long?leaseTime,?TimeUnit?unit);
          ????Future?tryLockAsync(long?waitTime,?TimeUnit?unit);
          ????Future?tryLockAsync(long?waitTime,?long?leaseTime,?TimeUnit?unit);
          }

          該接口主要繼承了Lock接口, 并擴展了部分方法, 比如:boolean tryLock(long waitTime, long leaseTime, TimeUnit unit)新加入的leaseTime主要是用來設(shè)置鎖的過期時間, 如果超過leaseTime還沒有解鎖的話, redis就強制解鎖. leaseTime的默認時間是30s

          RedissonLock獲取鎖 tryLock源碼

          Future?tryLockInnerAsync(long?leaseTime,?TimeUnit?unit,?long?threadId)?{
          ???????internalLockLeaseTime?=?unit.toMillis(leaseTime);
          ???????return?commandExecutor.evalWriteAsync(getName(),?LongCodec.INSTANCE,?RedisCommands.EVAL_LONG,
          ?????????????????"if?(redis.call('exists',?KEYS[1])?==?0)?then?"?+
          ?????????????????????"redis.call('hset',?KEYS[1],?ARGV[2],?1);?"?+
          ?????????????????????"redis.call('pexpire',?KEYS[1],?ARGV[1]);?"?+
          ?????????????????????"return?nil;?"?+
          ?????????????????"end;?"?+
          ?????????????????"if?(redis.call('hexists',?KEYS[1],?ARGV[2])?==?1)?then?"?+
          ?????????????????????"redis.call('hincrby',?KEYS[1],?ARGV[2],?1);?"?+
          ?????????????????????"redis.call('pexpire',?KEYS[1],?ARGV[1]);?"?+
          ?????????????????????"return?nil;?"?+
          ?????????????????"end;?"?+
          ?????????????????"return?redis.call('pttl',?KEYS[1]);",
          ???????????????????Collections.singletonList(getName()),?internalLockLeaseTime,?getLockName(threadId));
          ???}

          其中:

          • KEYS[1] 表示的是 getName() ,代表的是鎖名 test_lock
          • ARGV[1] 表示的是 internalLockLeaseTime 默認值是30s
          • ARGV[2] 表示的是 getLockName(threadId) 代表的是 id:threadId 用鎖對象id+線程id, 表示當(dāng)前訪問線程,用于區(qū)分不同服務(wù)器上的線程。

          逐句分析:

          if?(redis.call('exists',?KEYS[1])?==?0)?then
          ?????????redis.call('hset',?KEYS[1],?ARGV[2],?1);
          ?????????redis.call('pexpire',?KEYS[1],?ARGV[1]);
          ?????????return?nil;
          ?????????end;if?(redis.call('exists',?KEYS[1])?==?0)?then
          ?????????redis.call('hset',?KEYS[1],?ARGV[2],?1);
          ?????????redis.call('pexpire',?KEYS[1],?ARGV[1]);
          ?????????return?nil;
          ?????????end;

          if (redis.call(‘exists’, KEYS[1]) == 0) 如果鎖名稱不存在

          then redis.call(‘hset’, KEYS[1], ARGV[2],1) 則向redis中添加一個key為test_lock的set,并且向set中添加一個field為線程id,值=1的鍵值對,表示此線程的重入次數(shù)為1

          redis.call(‘pexpire’, KEYS[1], ARGV[1]) 設(shè)置set的過期時間,防止當(dāng)前服務(wù)器出問題后導(dǎo)致死鎖,return nil; end;返回nil 結(jié)束

          if?(redis.call('hexists',?KEYS[1],?ARGV[2])?==?1)?then
          ?????????redis.call('hincrby',?KEYS[1],?ARGV[2],?1);
          ?????????redis.call('pexpire',?KEYS[1],?ARGV[1]);
          ?????????return?nil;
          ?????????end;

          if (redis.call(‘hexists’, KEYS[1], ARGV[2]) == 1) 如果鎖是存在的,檢測是否是當(dāng)前線程持有鎖,如果是當(dāng)前線程持有鎖

          then redis.call(‘hincrby’, KEYS[1], ARGV[2], 1)則將該線程重入的次數(shù)++

          redis.call(‘pexpire’, KEYS[1], ARGV[1]) 并且重新設(shè)置該鎖的有效時間

          return nil; end;返回nil,結(jié)束

          return?redis.call('pttl',?KEYS[1]);

          鎖存在, 但不是當(dāng)前線程加的鎖,則返回鎖的過期時間。

          RedissonLock解鎖 unlock源碼

          @Override
          ????public?void?unlock()?{
          ????????Boolean?opStatus?=?commandExecutor.evalWrite(getName(),?LongCodec.INSTANCE,?RedisCommands.EVAL_BOOLEAN,
          ????????????????????????"if?(redis.call('exists',?KEYS[1])?==?0)?then?"?+
          ????????????????????????????"redis.call('publish',?KEYS[2],?ARGV[1]);?"?+
          ????????????????????????????"return?1;?"?+
          ????????????????????????"end;"?+
          ????????????????????????"if?(redis.call('hexists',?KEYS[1],?ARGV[3])?==?0)?then?"?+
          ????????????????????????????"return?nil;"?+
          ????????????????????????"end;?"?+
          ????????????????????????"local?counter?=?redis.call('hincrby',?KEYS[1],?ARGV[3],?-1);?"?+
          ????????????????????????"if?(counter?>?0)?then?"?+
          ????????????????????????????"redis.call('pexpire',?KEYS[1],?ARGV[2]);?"?+
          ????????????????????????????"return?0;?"?+
          ????????????????????????"else?"?+
          ????????????????????????????"redis.call('del',?KEYS[1]);?"?+
          ????????????????????????????"redis.call('publish',?KEYS[2],?ARGV[1]);?"?+
          ????????????????????????????"return?1;?"+
          ????????????????????????"end;?"?+
          ????????????????????????"return?nil;",
          ????????????????????????Arrays.asList(getName(),?getChannelName()),?LockPubSub.unlockMessage,?internalLockLeaseTime,?getLockName(Thread.currentThread().getId()));
          ????????if?(opStatus?==?null)?{
          ????????????throw?new?IllegalMonitorStateException("attempt?to?unlock?lock,?not?locked?by?current?thread?by?node?id:?"
          ????????????????????+?id?+?"?thread-id:?"?+?Thread.currentThread().getId());
          ????????}
          ????????if?(opStatus)?{
          ????????????cancelExpirationRenewal();
          ????????}
          ????}

          其中:

          • KEYS[1] 表示的是getName() 代表鎖名test_lock
          • KEYS[2] 表示getChanelName() 表示的是發(fā)布訂閱過程中使用的Chanel
          • ARGV[1] 表示的是LockPubSub.unLockMessage 是解鎖消息,實際代表的是數(shù)字 0,代表解鎖消息
          • ARGV[2] 表示的是internalLockLeaseTime 默認的有效時間 30s
          • ARGV[3] 表示的是getLockName(thread.currentThread().getId()),是當(dāng)前鎖id+線程id

          語義分析:

          if?(redis.call('exists',?KEYS[1])?==?0)?then
          ?????????redis.call('publish',?KEYS[2],?ARGV[1]);
          ?????????return?1;
          ?????????end;

          if (redis.call(‘exists’, KEYS[1]) == 0) 如果鎖已經(jīng)不存在(可能是因為過期導(dǎo)致不存在,也可能是因為已經(jīng)解鎖)

          then redis.call(‘publish’, KEYS[2], ARGV[1]) 則發(fā)布鎖解除的消息

          return 1; end 返回1結(jié)束

          if?(redis.call('hexists',?KEYS[1],?ARGV[3])?==?0)?then
          ?????????return?nil;
          ?????????end;

          if (redis.call(‘hexists’, KEYS[1], ARGV[3]) == 0) 如果鎖存在,但是若果當(dāng)前線程不是加鎖的線

          then return nil;end則直接返回nil 結(jié)束

          local?counter?=?redis.call('hincrby',?KEYS[1],?ARGV[3],?-1);
          if?(counter?>?0)?then
          ?????????redis.call('pexpire',?KEYS[1],?ARGV[2]);
          ?????????return?0;
          else
          ?????????redis.call('del',?KEYS[1]);
          ?????????redis.call('publish',?KEYS[2],?ARGV[1]);
          ?????????return?1;
          end;

          local counter = redis.call(‘hincrby’, KEYS[1], ARGV[3], -1) 如果是鎖是當(dāng)前線程所添加,定義變量counter,表示當(dāng)前線程的重入次數(shù)-1,即直接將重入次數(shù)-1

          if (counter > 0)如果重入次數(shù)大于0,表示該線程還有其他任務(wù)需要執(zhí)行

          then redis.call(‘pexpire’, KEYS[1], ARGV[2]) 則重新設(shè)置該鎖的有效時間

          return 0 返回0結(jié)束

          else redis.call(‘del’, KEYS[1])否則表示該線程執(zhí)行結(jié)束,刪除該鎖

          redis.call(‘publish’, KEYS[2], ARGV[1])并且發(fā)布該鎖解除的消息

          return 1; end;返回1結(jié)束

          return?nil;

          其他情況返回nil并結(jié)束

          if?(opStatus?==?null)?{
          ????????????throw?new?IllegalMonitorStateException("attempt?to?unlock?lock,?not?locked?by?current?thread?by?node?id:?"
          ????????????????????+?id?+?"?thread-id:?"?+?Thread.currentThread().getId());
          ????????}

          腳本執(zhí)行結(jié)束之后,如果返回值不是0或1,即當(dāng)前線程去解鎖其他線程的加鎖時,拋出異常。

          RedissonLock強制解鎖源碼

          @Override
          ????public?void?forceUnlock()?{
          ????????get(forceUnlockAsync());
          ????}
          ????Future?forceUnlockAsync()?{
          ????????cancelExpirationRenewal();
          ????????return?commandExecutor.evalWriteAsync(getName(),?LongCodec.INSTANCE,?RedisCommands.EVAL_BOOLEAN,
          ????????????????"if?(redis.call('del',?KEYS[1])?==?1)?then?"
          ????????????????+?"redis.call('publish',?KEYS[2],?ARGV[1]);?"
          ????????????????+?"return?1?"
          ????????????????+?"else?"
          ????????????????+?"return?0?"
          ????????????????+?"end",
          ????????????????Arrays.asList(getName(),?getChannelName()),?LockPubSub.unlockMessage);
          ????}

          以上是強制解鎖的源碼,在源碼中并沒有找到forceUnlock()被調(diào)用的痕跡(也有可能是我沒有找對),但是forceUnlockAsync()方法被調(diào)用的地方很多,大多都是在清理資源時刪除鎖。此部分比較簡單粗暴,刪除鎖成功則并發(fā)布鎖被刪除的消息,返回1結(jié)束,否則返回0結(jié)束。

          總結(jié)

          這里只是簡單的一個redisson分布式鎖的測試用例,并分析了執(zhí)行l(wèi)ua腳本這部分,如果要繼續(xù)分析執(zhí)行結(jié)束之后的操作,需要進行netty源碼分析 ,redisson使用了netty完成異步和同步的處理。

          END


          有熱門推薦?

          1.?監(jiān)控系統(tǒng)選型,這篇不可不讀!

          2.?Spring Validation最佳實踐及其實現(xiàn)原理,參數(shù)校驗沒那么簡單!

          3.?詳解:UML類圖符號、各種關(guān)系說明以及舉例

          4.?What?數(shù)據(jù)量巨大還不分庫分表? Sharding-JDBC 入門與項目實戰(zhàn)

          最近面試BAT,整理一份面試資料Java面試BATJ通關(guān)手冊,覆蓋了Java核心技術(shù)、JVM、Java并發(fā)、SSM、微服務(wù)、數(shù)據(jù)庫、數(shù)據(jù)結(jié)構(gòu)等等。

          獲取方式:點“在看”,關(guān)注公眾號并回復(fù)?Java?領(lǐng)取,更多內(nèi)容陸續(xù)奉上。

          文章有幫助的話,在看,轉(zhuǎn)發(fā)吧。

          謝謝支持喲 (*^__^*)

          瀏覽 55
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                    <th id="afajh"><progress id="afajh"></progress></th>
                    91狠狠色丁香婷婷综合久久 | 成人黄色网址大全 | 色综合天天操 | 乱伦天| 欧美操逼-百度日本亚洲 |