Redisson 是如何實(shí)現(xiàn)分布式鎖的?
作者 |?bravoban
針對(duì)項(xiàng)目中使用的分布式鎖進(jìn)行簡(jiǎn)單的示例配置以及源碼解析,并列舉源碼中使用到的一些基礎(chǔ)知識(shí)點(diǎn),但是沒有對(duì)redisson中使用到的netty知識(shí)進(jìn)行解析。
本篇主要是對(duì)以下幾個(gè)方面進(jìn)行了探索
Maven配置
RedissonLock簡(jiǎn)單示例
源碼中使用到的Redis命令
源碼中使用到的lua腳本語義
源碼分析
Maven配置
<dependency>
????<groupId>org.redissongroupId>
????<artifactId>redissonartifactId>
????<version>2.2.12version>
dependency>
<dependency>
????<groupId>com.fasterxml.jackson.coregroupId>
????<artifactId>jackson-annotationsartifactId>
????<version>2.6.0version>
dependency>
RedissonLock簡(jiǎn)單示例
redission支持4種連接redis方式,分別為單機(jī)、主從、Sentinel、Cluster 集群,項(xiàng)目中使用的連接方式是Sentinel。
redis服務(wù)器不在本地的同學(xué)請(qǐng)注意權(quán)限問題。
Sentinel配置
Config?config?=?new?Config();
config.useSentinelServers().addSentinelAddress("127.0.0.1:6479",?"127.0.0.1:6489").setMasterName("master").setPassword("password").setDatabase(0);
RedissonClient?redisson?=?Redisson.create(config);
簡(jiǎn)單使用
RLock?lock?=?redisson.getLock("test_lock");
try{
????boolean?isLock=lock.tryLock();
????if(isLock){
????????doBusiness();
????}
}catch(exception?e){
}finally{
????lock.unlock();
}
源碼中使用到的Redis命令
分布式鎖主要需要以下redis命令,這里列舉一下。在源碼分析部分可以繼續(xù)參照命令的操作含義。
EXISTS key :當(dāng) key 存在,返回1;若給定的 key 不存在,返回0。
GETSET key value:將給定 key 的值設(shè)為 value ,并返回 key 的舊值 (old value),當(dāng) key 存在但不是字符串類型時(shí),返回一個(gè)錯(cuò)誤,當(dāng)key不存在時(shí),返回nil。
GET key:返回 key 所關(guān)聯(lián)的字符串值,如果 key 不存在那么返回 nil。
DEL key [KEY …]:刪除給定的一個(gè)或多個(gè) key ,不存在的 key 會(huì)被忽略,返回實(shí)際刪除的key的個(gè)數(shù)(integer)。
HSET key field value:給一個(gè)key 設(shè)置一個(gè){field=value}的組合值,如果key沒有就直接賦值并返回1,如果field已有,那么就更新value的值,并返回0.
HEXISTS key field:當(dāng)key中存儲(chǔ)著field的時(shí)候返回1,如果key或者field至少有一個(gè)不存在返回0。
HINCRBY key field increment:將存儲(chǔ)在key中的哈希(Hash)對(duì)象中的指定字段field的值加上增量increment。如果鍵key不存在,一個(gè)保存了哈希對(duì)象的新建將被創(chuàng)建。如果字段field不存在,在進(jìn)行當(dāng)前操作前,其將被創(chuàng)建,且對(duì)應(yīng)的值被置為0,返回值是增量之后的值
PEXPIRE key milliseconds:設(shè)置存活時(shí)間,單位是毫秒。expire操作單位是秒。
PUBLISH channel message:向channel post一個(gè)message內(nèi)容的消息,返回接收消息的客戶端數(shù)。
源碼中使用到的lua腳本語義
Redisson源碼中,執(zhí)行redis命令的是lua腳本,其中主要用到如下幾個(gè)概念。
redis.call() 是執(zhí)行redis命令.
KEYS[1] 是指腳本中第1個(gè)參數(shù)
ARGV[1] 是指腳本中第一個(gè)參數(shù)的值
返回值中nil與false同一個(gè)意思。
需要注意的是,在redis執(zhí)行l(wèi)ua腳本時(shí),相當(dāng)于一個(gè)redis級(jí)別的鎖,不能執(zhí)行其他操作,類似于原子操作,也是redisson實(shí)現(xiàn)的一個(gè)關(guān)鍵點(diǎn)。
另外,如果lua腳本執(zhí)行過程中出現(xiàn)了異常或者redis服務(wù)器直接宕掉了,執(zhí)行redis的根據(jù)日志回復(fù)的命令,會(huì)將腳本中已經(jīng)執(zhí)行的命令在日志中刪除。
源碼分析
RLOCK結(jié)構(gòu)
public?interface?RLock?extends?Lock,?RExpirable?{
????void?lockInterruptibly(long?leaseTime,?TimeUnit?unit)?throws?InterruptedException;
????boolean?tryLock(long?waitTime,?long?leaseTime,?TimeUnit?unit)?throws?InterruptedException;
????void?lock(long?leaseTime,?TimeUnit?unit);
????void?forceUnlock();
????boolean?isLocked();
????boolean?isHeldByCurrentThread();
????int?getHoldCount();
????Future?unlockAsync() ;
????Future?tryLockAsync() ;
????Future?lockAsync() ;
????Future?lockAsync(long?leaseTime,?TimeUnit?unit) ;
????Future?tryLockAsync(long?waitTime,?TimeUnit?unit) ;
????Future?tryLockAsync(long?waitTime,?long?leaseTime,?TimeUnit?unit) ;
}
該接口主要繼承了Lock接口, 并擴(kuò)展了部分方法, 比如:boolean tryLock(long waitTime, long leaseTime, TimeUnit unit)新加入的leaseTime主要是用來設(shè)置鎖的過期時(shí)間, 如果超過leaseTime還沒有解鎖的話, redis就強(qiáng)制解鎖. leaseTime的默認(rèn)時(shí)間是30s
RedissonLock獲取鎖 tryLock源碼
Future?tryLockInnerAsync(long?leaseTime,?TimeUnit?unit,?long?threadId)? {
???????internalLockLeaseTime?=?unit.toMillis(leaseTime);
???????return?commandExecutor.evalWriteAsync(getName(),?LongCodec.INSTANCE,?RedisCommands.EVAL_LONG,
?????????????????"if?(redis.call('exists',?KEYS[1])?==?0)?then?"?+
?????????????????????"redis.call('hset',?KEYS[1],?ARGV[2],?1);?"?+
?????????????????????"redis.call('pexpire',?KEYS[1],?ARGV[1]);?"?+
?????????????????????"return?nil;?"?+
?????????????????"end;?"?+
?????????????????"if?(redis.call('hexists',?KEYS[1],?ARGV[2])?==?1)?then?"?+
?????????????????????"redis.call('hincrby',?KEYS[1],?ARGV[2],?1);?"?+
?????????????????????"redis.call('pexpire',?KEYS[1],?ARGV[1]);?"?+
?????????????????????"return?nil;?"?+
?????????????????"end;?"?+
?????????????????"return?redis.call('pttl',?KEYS[1]);",
???????????????????Collections.其中:
KEYS[1] 表示的是 getName() ,代表的是鎖名 test_lock
ARGV[1] 表示的是 internalLockLeaseTime 默認(rèn)值是30s
ARGV[2] 表示的是 getLockName(threadId) 代表的是 id:threadId 用鎖對(duì)象id+線程id, 表示當(dāng)前訪問線程,用于區(qū)分不同服務(wù)器上的線程。
逐句分析:
if?(redis.call('exists',?KEYS[1])?==?0)?then?
?????????redis.call('hset',?KEYS[1],?ARGV[2],?1);?
?????????redis.call('pexpire',?KEYS[1],?ARGV[1]);?
?????????return?nil;
?????????end;if?(redis.call('exists',?KEYS[1])?==?0)?then?
?????????redis.call('hset',?KEYS[1],?ARGV[2],?1);?
?????????redis.call('pexpire',?KEYS[1],?ARGV[1]);?
?????????return?nil;
?????????end;
if (redis.call(‘exists’, KEYS[1]) == 0)?如果鎖名稱不存在
then redis.call(‘hset’, KEYS[1], ARGV[2],1)?則向redis中添加一個(gè)key為test_lock的set,并且向set中添加一個(gè)field為線程id,值=1的鍵值對(duì),表示此線程的重入次數(shù)為1
redis.call(‘pexpire’, KEYS[1], ARGV[1])?設(shè)置set的過期時(shí)間,防止當(dāng)前服務(wù)器出問題后導(dǎo)致死鎖,return nil; end;返回nil 結(jié)束
if?(redis.call('hexists',?KEYS[1],?ARGV[2])?==?1)?then?
?????????redis.call('hincrby',?KEYS[1],?ARGV[2],?1);?
?????????redis.call('pexpire',?KEYS[1],?ARGV[1]);
?????????return?nil;?
?????????end;
if (redis.call(‘hexists’, KEYS[1], ARGV[2]) == 1)?如果鎖是存在的,檢測(cè)是否是當(dāng)前線程持有鎖,如果是當(dāng)前線程持有鎖
then redis.call(‘hincrby’, KEYS[1], ARGV[2], 1)則將該線程重入的次數(shù)++
redis.call(‘pexpire’, KEYS[1], ARGV[1])?并且重新設(shè)置該鎖的有效時(shí)間
return nil; end;返回nil,結(jié)束
return?redis.call('pttl',?KEYS[1]);
鎖存在, 但不是當(dāng)前線程加的鎖,則返回鎖的過期時(shí)間。
RedissonLock解鎖 unlock源碼
@Override
????public?void?unlock()?{
????????Boolean?opStatus?=?commandExecutor.evalWrite(getName(),?LongCodec.INSTANCE,?RedisCommands.EVAL_BOOLEAN,
????????????????????????"if?(redis.call('exists',?KEYS[1])?==?0)?then?"?+
????????????????????????????"redis.call('publish',?KEYS[2],?ARGV[1]);?"?+
????????????????????????????"return?1;?"?+
????????????????????????"end;"?+
????????????????????????"if?(redis.call('hexists',?KEYS[1],?ARGV[3])?==?0)?then?"?+
????????????????????????????"return?nil;"?+
????????????????????????"end;?"?+
????????????????????????"local?counter?=?redis.call('hincrby',?KEYS[1],?ARGV[3],?-1);?"?+
????????????????????????"if?(counter?>?0)?then?"?+
????????????????????????????"redis.call('pexpire',?KEYS[1],?ARGV[2]);?"?+
????????????????????????????"return?0;?"?+
????????????????????????"else?"?+
????????????????????????????"redis.call('del',?KEYS[1]);?"?+
????????????????????????????"redis.call('publish',?KEYS[2],?ARGV[1]);?"?+
????????????????????????????"return?1;?"+
????????????????????????"end;?"?+
????????????????????????"return?nil;",
????????????????????????Arrays.其中:
KEYS[1] 表示的是getName() 代表鎖名test_lock
KEYS[2] 表示getChanelName() 表示的是發(fā)布訂閱過程中使用的Chanel
ARGV[1] 表示的是LockPubSub.unLockMessage 是解鎖消息,實(shí)際代表的是數(shù)字 0,代表解鎖消息
ARGV[2] 表示的是internalLockLeaseTime 默認(rèn)的有效時(shí)間 30s
ARGV[3] 表示的是getLockName(thread.currentThread().getId()),是當(dāng)前鎖id+線程id
語義分析:
if?(redis.call('exists',?KEYS[1])?==?0)?then
?????????redis.call('publish',?KEYS[2],?ARGV[1]);
?????????return?1;
?????????end;
if (redis.call(‘exists’, KEYS[1]) == 0)?如果鎖已經(jīng)不存在(可能是因?yàn)檫^期導(dǎo)致不存在,也可能是因?yàn)橐呀?jīng)解鎖)
then redis.call(‘publish’, KEYS[2], ARGV[1])?則發(fā)布鎖解除的消息
return 1; end?返回1結(jié)束
if?(redis.call('hexists',?KEYS[1],?ARGV[3])?==?0)?then?
?????????return?nil;
?????????end;
if (redis.call(‘hexists’, KEYS[1], ARGV[3]) == 0)?如果鎖存在,但是若果當(dāng)前線程不是加鎖的線
then return nil;end則直接返回nil 結(jié)束
local?counter?=?redis.call('hincrby',?KEYS[1],?ARGV[3],?-1);
if?(counter?>?0)?then
?????????redis.call('pexpire',?KEYS[1],?ARGV[2]);?
?????????return?0;
else
?????????redis.call('del',?KEYS[1]);
?????????redis.call('publish',?KEYS[2],?ARGV[1]);
?????????return?1;
end;
local counter = redis.call(‘hincrby’, KEYS[1], ARGV[3], -1)?如果是鎖是當(dāng)前線程所添加,定義變量counter,表示當(dāng)前線程的重入次數(shù)-1,即直接將重入次數(shù)-1
if (counter > 0)如果重入次數(shù)大于0,表示該線程還有其他任務(wù)需要執(zhí)行
then redis.call(‘pexpire’, KEYS[1], ARGV[2])?則重新設(shè)置該鎖的有效時(shí)間
return 0?返回0結(jié)束
else redis.call(‘del’, KEYS[1])否則表示該線程執(zhí)行結(jié)束,刪除該鎖
redis.call(‘publish’, KEYS[2], ARGV[1])并且發(fā)布該鎖解除的消息
return 1; end;返回1結(jié)束
return?nil;
其他情況返回nil并結(jié)束
if?(opStatus?==?null)?{
????????????throw?new?IllegalMonitorStateException("attempt?to?unlock?lock,?not?locked?by?current?thread?by?node?id:?"
????????????????????+?id?+?"?thread-id:?"?+?Thread.currentThread().getId());
????????}
腳本執(zhí)行結(jié)束之后,如果返回值不是0或1,即當(dāng)前線程去解鎖其他線程的加鎖時(shí),拋出異常。
RedissonLock強(qiáng)制解鎖源碼
@Override
????public?void?forceUnlock()?{
????????get(forceUnlockAsync());
????}
????Future?forceUnlockAsync()?{
????????cancelExpirationRenewal();
????????return?commandExecutor.evalWriteAsync(getName(),?LongCodec.INSTANCE,?RedisCommands.EVAL_BOOLEAN,
????????????????"if?(redis.call('del',?KEYS[1])?==?1)?then?"
????????????????+?"redis.call('publish',?KEYS[2],?ARGV[1]);?"
????????????????+?"return?1?"
????????????????+?"else?"
????????????????+?"return?0?"
????????????????+?"end",
????????????????Arrays. 以上是強(qiáng)制解鎖的源碼,在源碼中并沒有找到forceUnlock()被調(diào)用的痕跡(也有可能是我沒有找對(duì)),但是forceUnlockAsync()方法被調(diào)用的地方很多,大多都是在清理資源時(shí)刪除鎖。此部分比較簡(jiǎn)單粗暴,刪除鎖成功則并發(fā)布鎖被刪除的消息,返回1結(jié)束,否則返回0結(jié)束。
總結(jié)
這里只是簡(jiǎn)單的一個(gè)redisson分布式鎖的測(cè)試用例,并分析了執(zhí)行l(wèi)ua腳本這部分,如果要繼續(xù)分析執(zhí)行結(jié)束之后的操作,需要進(jìn)行netty源碼分析 ,redisson使用了netty完成異步和同步的處理。
