<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Redisson 是如何實(shí)現(xiàn)分布式鎖的?

          共 8980字,需瀏覽 18分鐘

           ·

          2020-10-24 04:41

          作者 |?bravoban

          來源 |?tech.lede.com/2017/03/08/rd/server/Redisson

          針對(duì)項(xiàng)目中使用的分布式鎖進(jìn)行簡(jiǎn)單的示例配置以及源碼解析,并列舉源碼中使用到的一些基礎(chǔ)知識(shí)點(diǎn),但是沒有對(duì)redisson中使用到的netty知識(shí)進(jìn)行解析。

          本篇主要是對(duì)以下幾個(gè)方面進(jìn)行了探索

          • Maven配置

          • RedissonLock簡(jiǎn)單示例

          • 源碼中使用到的Redis命令

          • 源碼中使用到的lua腳本語義

          • 源碼分析

          Maven配置

          <dependency>
          ????<groupId>org.redissongroupId>
          ????<artifactId>redissonartifactId>
          ????<version>2.2.12version>
          dependency>
          <dependency>
          ????<groupId>com.fasterxml.jackson.coregroupId>
          ????<artifactId>jackson-annotationsartifactId>
          ????<version>2.6.0version>
          dependency>

          RedissonLock簡(jiǎn)單示例

          redission支持4種連接redis方式,分別為單機(jī)、主從、Sentinel、Cluster 集群,項(xiàng)目中使用的連接方式是Sentinel。

          redis服務(wù)器不在本地的同學(xué)請(qǐng)注意權(quán)限問題。

          Sentinel配置

          Config?config?=?new?Config();
          config.useSentinelServers().addSentinelAddress("127.0.0.1:6479",?"127.0.0.1:6489").setMasterName("master").setPassword("password").setDatabase(0);
          RedissonClient?redisson?=?Redisson.create(config);

          簡(jiǎn)單使用

          RLock?lock?=?redisson.getLock("test_lock");
          try{
          ????boolean?isLock=lock.tryLock();
          ????if(isLock){
          ????????doBusiness();
          ????}
          }catch(exception?e){
          }finally{
          ????lock.unlock();
          }

          源碼中使用到的Redis命令

          分布式鎖主要需要以下redis命令,這里列舉一下。在源碼分析部分可以繼續(xù)參照命令的操作含義。

          1. EXISTS key :當(dāng) key 存在,返回1;若給定的 key 不存在,返回0。

          2. GETSET key value:將給定 key 的值設(shè)為 value ,并返回 key 的舊值 (old value),當(dāng) key 存在但不是字符串類型時(shí),返回一個(gè)錯(cuò)誤,當(dāng)key不存在時(shí),返回nil。

          3. GET key:返回 key 所關(guān)聯(lián)的字符串值,如果 key 不存在那么返回 nil。

          4. DEL key [KEY …]:刪除給定的一個(gè)或多個(gè) key ,不存在的 key 會(huì)被忽略,返回實(shí)際刪除的key的個(gè)數(shù)(integer)。

          5. HSET key field value:給一個(gè)key 設(shè)置一個(gè){field=value}的組合值,如果key沒有就直接賦值并返回1,如果field已有,那么就更新value的值,并返回0.

          6. HEXISTS key field:當(dāng)key中存儲(chǔ)著field的時(shí)候返回1,如果key或者field至少有一個(gè)不存在返回0。

          7. HINCRBY key field increment:將存儲(chǔ)在key中的哈希(Hash)對(duì)象中的指定字段field的值加上增量increment。如果鍵key不存在,一個(gè)保存了哈希對(duì)象的新建將被創(chuàng)建。如果字段field不存在,在進(jìn)行當(dāng)前操作前,其將被創(chuàng)建,且對(duì)應(yīng)的值被置為0,返回值是增量之后的值

          8. PEXPIRE key milliseconds:設(shè)置存活時(shí)間,單位是毫秒。expire操作單位是秒。

          9. PUBLISH channel message:向channel post一個(gè)message內(nèi)容的消息,返回接收消息的客戶端數(shù)。

          源碼中使用到的lua腳本語義

          Redisson源碼中,執(zhí)行redis命令的是lua腳本,其中主要用到如下幾個(gè)概念。

          • redis.call() 是執(zhí)行redis命令.

          • KEYS[1] 是指腳本中第1個(gè)參數(shù)

          • ARGV[1] 是指腳本中第一個(gè)參數(shù)的值

          • 返回值中nil與false同一個(gè)意思。

          需要注意的是,在redis執(zhí)行l(wèi)ua腳本時(shí),相當(dāng)于一個(gè)redis級(jí)別的鎖,不能執(zhí)行其他操作,類似于原子操作,也是redisson實(shí)現(xiàn)的一個(gè)關(guān)鍵點(diǎn)。

          另外,如果lua腳本執(zhí)行過程中出現(xiàn)了異常或者redis服務(wù)器直接宕掉了,執(zhí)行redis的根據(jù)日志回復(fù)的命令,會(huì)將腳本中已經(jīng)執(zhí)行的命令在日志中刪除。

          源碼分析

          RLOCK結(jié)構(gòu)

          public?interface?RLock?extends?Lock,?RExpirable?{
          ????void?lockInterruptibly(long?leaseTime,?TimeUnit?unit)?throws?InterruptedException;
          ????boolean?tryLock(long?waitTime,?long?leaseTime,?TimeUnit?unit)?throws?InterruptedException;
          ????void?lock(long?leaseTime,?TimeUnit?unit);
          ????void?forceUnlock();
          ????boolean?isLocked();
          ????boolean?isHeldByCurrentThread();
          ????int?getHoldCount();
          ????Future?unlockAsync();
          ????Future?tryLockAsync();
          ????Future?lockAsync();
          ????Future?lockAsync(long?leaseTime,?TimeUnit?unit);
          ????Future?tryLockAsync(long?waitTime,?TimeUnit?unit);
          ????Future?tryLockAsync(long?waitTime,?long?leaseTime,?TimeUnit?unit);
          }

          該接口主要繼承了Lock接口, 并擴(kuò)展了部分方法, 比如:boolean tryLock(long waitTime, long leaseTime, TimeUnit unit)新加入的leaseTime主要是用來設(shè)置鎖的過期時(shí)間, 如果超過leaseTime還沒有解鎖的話, redis就強(qiáng)制解鎖. leaseTime的默認(rèn)時(shí)間是30s

          RedissonLock獲取鎖 tryLock源碼

          Future?tryLockInnerAsync(long?leaseTime,?TimeUnit?unit,?long?threadId)?{
          ???????internalLockLeaseTime?=?unit.toMillis(leaseTime);
          ???????return?commandExecutor.evalWriteAsync(getName(),?LongCodec.INSTANCE,?RedisCommands.EVAL_LONG,
          ?????????????????"if?(redis.call('exists',?KEYS[1])?==?0)?then?"?+
          ?????????????????????"redis.call('hset',?KEYS[1],?ARGV[2],?1);?"?+
          ?????????????????????"redis.call('pexpire',?KEYS[1],?ARGV[1]);?"?+
          ?????????????????????"return?nil;?"?+
          ?????????????????"end;?"?+
          ?????????????????"if?(redis.call('hexists',?KEYS[1],?ARGV[2])?==?1)?then?"?+
          ?????????????????????"redis.call('hincrby',?KEYS[1],?ARGV[2],?1);?"?+
          ?????????????????????"redis.call('pexpire',?KEYS[1],?ARGV[1]);?"?+
          ?????????????????????"return?nil;?"?+
          ?????????????????"end;?"?+
          ?????????????????"return?redis.call('pttl',?KEYS[1]);",
          ???????????????????Collections.singletonList(getName()),?internalLockLeaseTime,?getLockName(threadId));
          ???}

          其中:

          • KEYS[1] 表示的是 getName() ,代表的是鎖名 test_lock

          • ARGV[1] 表示的是 internalLockLeaseTime 默認(rèn)值是30s

          • ARGV[2] 表示的是 getLockName(threadId) 代表的是 id:threadId 用鎖對(duì)象id+線程id, 表示當(dāng)前訪問線程,用于區(qū)分不同服務(wù)器上的線程。

          逐句分析:

          if?(redis.call('exists',?KEYS[1])?==?0)?then?
          ?????????redis.call('hset',?KEYS[1],?ARGV[2],?1);?
          ?????????redis.call('pexpire',?KEYS[1],?ARGV[1]);?
          ?????????return?nil;
          ?????????end;if?(redis.call('exists',?KEYS[1])?==?0)?then?
          ?????????redis.call('hset',?KEYS[1],?ARGV[2],?1);?
          ?????????redis.call('pexpire',?KEYS[1],?ARGV[1]);?
          ?????????return?nil;
          ?????????end;

          if (redis.call(‘exists’, KEYS[1]) == 0)?如果鎖名稱不存在

          then redis.call(‘hset’, KEYS[1], ARGV[2],1)?則向redis中添加一個(gè)key為test_lock的set,并且向set中添加一個(gè)field為線程id,值=1的鍵值對(duì),表示此線程的重入次數(shù)為1

          redis.call(‘pexpire’, KEYS[1], ARGV[1])?設(shè)置set的過期時(shí)間,防止當(dāng)前服務(wù)器出問題后導(dǎo)致死鎖,return nil; end;返回nil 結(jié)束

          if?(redis.call('hexists',?KEYS[1],?ARGV[2])?==?1)?then?
          ?????????redis.call('hincrby',?KEYS[1],?ARGV[2],?1);?
          ?????????redis.call('pexpire',?KEYS[1],?ARGV[1]);
          ?????????return?nil;?
          ?????????end;

          if (redis.call(‘hexists’, KEYS[1], ARGV[2]) == 1)?如果鎖是存在的,檢測(cè)是否是當(dāng)前線程持有鎖,如果是當(dāng)前線程持有鎖

          then redis.call(‘hincrby’, KEYS[1], ARGV[2], 1)則將該線程重入的次數(shù)++

          redis.call(‘pexpire’, KEYS[1], ARGV[1])?并且重新設(shè)置該鎖的有效時(shí)間

          return nil; end;返回nil,結(jié)束

          return?redis.call('pttl',?KEYS[1]);

          鎖存在, 但不是當(dāng)前線程加的鎖,則返回鎖的過期時(shí)間。

          RedissonLock解鎖 unlock源碼

          @Override
          ????public?void?unlock()?{
          ????????Boolean?opStatus?=?commandExecutor.evalWrite(getName(),?LongCodec.INSTANCE,?RedisCommands.EVAL_BOOLEAN,
          ????????????????????????"if?(redis.call('exists',?KEYS[1])?==?0)?then?"?+
          ????????????????????????????"redis.call('publish',?KEYS[2],?ARGV[1]);?"?+
          ????????????????????????????"return?1;?"?+
          ????????????????????????"end;"?+
          ????????????????????????"if?(redis.call('hexists',?KEYS[1],?ARGV[3])?==?0)?then?"?+
          ????????????????????????????"return?nil;"?+
          ????????????????????????"end;?"?+
          ????????????????????????"local?counter?=?redis.call('hincrby',?KEYS[1],?ARGV[3],?-1);?"?+
          ????????????????????????"if?(counter?>?0)?then?"?+
          ????????????????????????????"redis.call('pexpire',?KEYS[1],?ARGV[2]);?"?+
          ????????????????????????????"return?0;?"?+
          ????????????????????????"else?"?+
          ????????????????????????????"redis.call('del',?KEYS[1]);?"?+
          ????????????????????????????"redis.call('publish',?KEYS[2],?ARGV[1]);?"?+
          ????????????????????????????"return?1;?"+
          ????????????????????????"end;?"?+
          ????????????????????????"return?nil;",
          ????????????????????????Arrays.asList(getName(),?getChannelName()),?LockPubSub.unlockMessage,?internalLockLeaseTime,?getLockName(Thread.currentThread().getId()));
          ????????if?(opStatus?==?null)?{
          ????????????throw?new?IllegalMonitorStateException("attempt?to?unlock?lock,?not?locked?by?current?thread?by?node?id:?"
          ????????????????????+?id?+?"?thread-id:?"?+?Thread.currentThread().getId());
          ????????}
          ????????if?(opStatus)?{
          ????????????cancelExpirationRenewal();
          ????????}
          ????}

          其中:

          • KEYS[1] 表示的是getName() 代表鎖名test_lock

          • KEYS[2] 表示getChanelName() 表示的是發(fā)布訂閱過程中使用的Chanel

          • ARGV[1] 表示的是LockPubSub.unLockMessage 是解鎖消息,實(shí)際代表的是數(shù)字 0,代表解鎖消息

          • ARGV[2] 表示的是internalLockLeaseTime 默認(rèn)的有效時(shí)間 30s

          • ARGV[3] 表示的是getLockName(thread.currentThread().getId()),是當(dāng)前鎖id+線程id

          語義分析:

          if?(redis.call('exists',?KEYS[1])?==?0)?then
          ?????????redis.call('publish',?KEYS[2],?ARGV[1]);
          ?????????return?1;
          ?????????end;

          if (redis.call(‘exists’, KEYS[1]) == 0)?如果鎖已經(jīng)不存在(可能是因?yàn)檫^期導(dǎo)致不存在,也可能是因?yàn)橐呀?jīng)解鎖)

          then redis.call(‘publish’, KEYS[2], ARGV[1])?則發(fā)布鎖解除的消息

          return 1; end?返回1結(jié)束

          if?(redis.call('hexists',?KEYS[1],?ARGV[3])?==?0)?then?
          ?????????return?nil;
          ?????????end;

          if (redis.call(‘hexists’, KEYS[1], ARGV[3]) == 0)?如果鎖存在,但是若果當(dāng)前線程不是加鎖的線

          then return nil;end則直接返回nil 結(jié)束

          local?counter?=?redis.call('hincrby',?KEYS[1],?ARGV[3],?-1);
          if?(counter?>?0)?then
          ?????????redis.call('pexpire',?KEYS[1],?ARGV[2]);?
          ?????????return?0;
          else
          ?????????redis.call('del',?KEYS[1]);
          ?????????redis.call('publish',?KEYS[2],?ARGV[1]);
          ?????????return?1;
          end;

          local counter = redis.call(‘hincrby’, KEYS[1], ARGV[3], -1)?如果是鎖是當(dāng)前線程所添加,定義變量counter,表示當(dāng)前線程的重入次數(shù)-1,即直接將重入次數(shù)-1

          if (counter > 0)如果重入次數(shù)大于0,表示該線程還有其他任務(wù)需要執(zhí)行

          then redis.call(‘pexpire’, KEYS[1], ARGV[2])?則重新設(shè)置該鎖的有效時(shí)間

          return 0?返回0結(jié)束

          else redis.call(‘del’, KEYS[1])否則表示該線程執(zhí)行結(jié)束,刪除該鎖

          redis.call(‘publish’, KEYS[2], ARGV[1])并且發(fā)布該鎖解除的消息

          return 1; end;返回1結(jié)束

          return?nil;

          其他情況返回nil并結(jié)束

          if?(opStatus?==?null)?{
          ????????????throw?new?IllegalMonitorStateException("attempt?to?unlock?lock,?not?locked?by?current?thread?by?node?id:?"
          ????????????????????+?id?+?"?thread-id:?"?+?Thread.currentThread().getId());
          ????????}

          腳本執(zhí)行結(jié)束之后,如果返回值不是0或1,即當(dāng)前線程去解鎖其他線程的加鎖時(shí),拋出異常。

          RedissonLock強(qiáng)制解鎖源碼

          @Override
          ????public?void?forceUnlock()?{
          ????????get(forceUnlockAsync());
          ????}
          ????Future?forceUnlockAsync()?{
          ????????cancelExpirationRenewal();
          ????????return?commandExecutor.evalWriteAsync(getName(),?LongCodec.INSTANCE,?RedisCommands.EVAL_BOOLEAN,
          ????????????????"if?(redis.call('del',?KEYS[1])?==?1)?then?"
          ????????????????+?"redis.call('publish',?KEYS[2],?ARGV[1]);?"
          ????????????????+?"return?1?"
          ????????????????+?"else?"
          ????????????????+?"return?0?"
          ????????????????+?"end",
          ????????????????Arrays.asList(getName(),?getChannelName()),?LockPubSub.unlockMessage);
          ????}

          以上是強(qiáng)制解鎖的源碼,在源碼中并沒有找到forceUnlock()被調(diào)用的痕跡(也有可能是我沒有找對(duì)),但是forceUnlockAsync()方法被調(diào)用的地方很多,大多都是在清理資源時(shí)刪除鎖。此部分比較簡(jiǎn)單粗暴,刪除鎖成功則并發(fā)布鎖被刪除的消息,返回1結(jié)束,否則返回0結(jié)束。

          總結(jié)

          這里只是簡(jiǎn)單的一個(gè)redisson分布式鎖的測(cè)試用例,并分析了執(zhí)行l(wèi)ua腳本這部分,如果要繼續(xù)分析執(zhí)行結(jié)束之后的操作,需要進(jìn)行netty源碼分析 ,redisson使用了netty完成異步和同步的處理。


          瀏覽 25
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                    <th id="afajh"><progress id="afajh"></progress></th>
                    九一免费观看网站 | 午夜男女爽爽 | 成人水蜜桃视频 | 青娱乐精品视频日美 | 欧美三级午夜理伦三级老人 |