Redisson 是如何實現(xiàn)分布式鎖的?
點擊上方藍色“小哈學(xué)Java”,選擇“設(shè)為星標(biāo)”
回復(fù)“資源”獲取獨家整理的學(xué)習(xí)資料!


來源:tech.lede.com/2017/03/08/rd/server/Redisson
Maven配置 RedissonLock簡單示例 源碼中使用到的Redis命令 源碼中使用到的lua腳本語義 源碼分析 總結(jié)
針對項目中使用的分布式鎖進行簡單的示例配置以及源碼解析,并列舉源碼中使用到的一些基礎(chǔ)知識點,但是沒有對redisson中使用到的netty知識進行解析。
本篇主要是對以下幾個方面進行了探索
Maven配置 RedissonLock簡單示例 源碼中使用到的Redis命令 源碼中使用到的lua腳本語義 源碼分析
Maven配置
<dependency>
????<groupId>org.redissongroupId>
????<artifactId>redissonartifactId>
????<version>2.2.12version>
dependency>
<dependency>
????<groupId>com.fasterxml.jackson.coregroupId>
????<artifactId>jackson-annotationsartifactId>
????<version>2.6.0version>
dependency>
RedissonLock簡單示例
redission支持4種連接redis方式,分別為單機、主從、Sentinel、Cluster 集群,項目中使用的連接方式是Sentinel。
redis服務(wù)器不在本地的同學(xué)請注意權(quán)限問題。
Sentinel配置
Config?config?=?new?Config();
config.useSentinelServers().addSentinelAddress("127.0.0.1:6479",?"127.0.0.1:6489").setMasterName("master").setPassword("password").setDatabase(0);
RedissonClient?redisson?=?Redisson.create(config);
簡單使用
RLock?lock?=?redisson.getLock("test_lock");
try{
????boolean?isLock=lock.tryLock();
????if(isLock){
????????doBusiness();
????}
}catch(exception?e){
}finally{
????lock.unlock();
}
源碼中使用到的Redis命令
分布式鎖主要需要以下redis命令,這里列舉一下。在源碼分析部分可以繼續(xù)參照命令的操作含義。
EXISTS key :當(dāng) key 存在,返回1;若給定的 key 不存在,返回0。 GETSET key value:將給定 key 的值設(shè)為 value ,并返回 key 的舊值 (old value),當(dāng) key 存在但不是字符串類型時,返回一個錯誤,當(dāng)key不存在時,返回nil。 GET key:返回 key 所關(guān)聯(lián)的字符串值,如果 key 不存在那么返回 nil。 DEL key [KEY …]:刪除給定的一個或多個 key ,不存在的 key 會被忽略,返回實際刪除的key的個數(shù)(integer)。 HSET key field value:給一個key 設(shè)置一個{field=value}的組合值,如果key沒有就直接賦值并返回1,如果field已有,那么就更新value的值,并返回0. HEXISTS key field:當(dāng)key中存儲著field的時候返回1,如果key或者field至少有一個不存在返回0。 HINCRBY key field increment:將存儲在key中的哈希(Hash)對象中的指定字段field的值加上增量increment。如果鍵key不存在,一個保存了哈希對象的新建將被創(chuàng)建。如果字段field不存在,在進行當(dāng)前操作前,其將被創(chuàng)建,且對應(yīng)的值被置為0,返回值是增量之后的值 PEXPIRE key milliseconds:設(shè)置存活時間,單位是毫秒。expire操作單位是秒。 PUBLISH channel message:向channel post一個message內(nèi)容的消息,返回接收消息的客戶端數(shù)。
源碼中使用到的lua腳本語義
Redisson源碼中,執(zhí)行redis命令的是lua腳本,其中主要用到如下幾個概念。
redis.call() 是執(zhí)行redis命令. KEYS[1] 是指腳本中第1個參數(shù) ARGV[1] 是指腳本中第一個參數(shù)的值 返回值中nil與false同一個意思。
需要注意的是,在redis執(zhí)行l(wèi)ua腳本時,相當(dāng)于一個redis級別的鎖,不能執(zhí)行其他操作,類似于原子操作,也是redisson實現(xiàn)的一個關(guān)鍵點。
另外,如果lua腳本執(zhí)行過程中出現(xiàn)了異常或者redis服務(wù)器直接宕掉了,執(zhí)行redis的根據(jù)日志回復(fù)的命令,會將腳本中已經(jīng)執(zhí)行的命令在日志中刪除。
源碼分析
RLOCK結(jié)構(gòu)
public?interface?RLock?extends?Lock,?RExpirable?{
????void?lockInterruptibly(long?leaseTime,?TimeUnit?unit)?throws?InterruptedException;
????boolean?tryLock(long?waitTime,?long?leaseTime,?TimeUnit?unit)?throws?InterruptedException;
????void?lock(long?leaseTime,?TimeUnit?unit);
????void?forceUnlock();
????boolean?isLocked();
????boolean?isHeldByCurrentThread();
????int?getHoldCount();
????Future?unlockAsync() ;
????Future?tryLockAsync() ;
????Future?lockAsync() ;
????Future?lockAsync(long?leaseTime,?TimeUnit?unit) ;
????Future?tryLockAsync(long?waitTime,?TimeUnit?unit) ;
????Future?tryLockAsync(long?waitTime,?long?leaseTime,?TimeUnit?unit) ;
}
該接口主要繼承了Lock接口, 并擴展了部分方法, 比如:boolean tryLock(long waitTime, long leaseTime, TimeUnit unit)新加入的leaseTime主要是用來設(shè)置鎖的過期時間, 如果超過leaseTime還沒有解鎖的話, redis就強制解鎖. leaseTime的默認時間是30s
RedissonLock獲取鎖 tryLock源碼
Future?tryLockInnerAsync(long?leaseTime,?TimeUnit?unit,?long?threadId)? {
???????internalLockLeaseTime?=?unit.toMillis(leaseTime);
???????return?commandExecutor.evalWriteAsync(getName(),?LongCodec.INSTANCE,?RedisCommands.EVAL_LONG,
?????????????????"if?(redis.call('exists',?KEYS[1])?==?0)?then?"?+
?????????????????????"redis.call('hset',?KEYS[1],?ARGV[2],?1);?"?+
?????????????????????"redis.call('pexpire',?KEYS[1],?ARGV[1]);?"?+
?????????????????????"return?nil;?"?+
?????????????????"end;?"?+
?????????????????"if?(redis.call('hexists',?KEYS[1],?ARGV[2])?==?1)?then?"?+
?????????????????????"redis.call('hincrby',?KEYS[1],?ARGV[2],?1);?"?+
?????????????????????"redis.call('pexpire',?KEYS[1],?ARGV[1]);?"?+
?????????????????????"return?nil;?"?+
?????????????????"end;?"?+
?????????????????"return?redis.call('pttl',?KEYS[1]);",
???????????????????Collections.其中:
KEYS[1] 表示的是 getName() ,代表的是鎖名 test_lock ARGV[1] 表示的是 internalLockLeaseTime 默認值是30s ARGV[2] 表示的是 getLockName(threadId) 代表的是 id:threadId 用鎖對象id+線程id, 表示當(dāng)前訪問線程,用于區(qū)分不同服務(wù)器上的線程。
逐句分析:
if?(redis.call('exists',?KEYS[1])?==?0)?then
?????????redis.call('hset',?KEYS[1],?ARGV[2],?1);
?????????redis.call('pexpire',?KEYS[1],?ARGV[1]);
?????????return?nil;
?????????end;if?(redis.call('exists',?KEYS[1])?==?0)?then
?????????redis.call('hset',?KEYS[1],?ARGV[2],?1);
?????????redis.call('pexpire',?KEYS[1],?ARGV[1]);
?????????return?nil;
?????????end;
if (redis.call(‘exists’, KEYS[1]) == 0) 如果鎖名稱不存在
then redis.call(‘hset’, KEYS[1], ARGV[2],1) 則向redis中添加一個key為test_lock的set,并且向set中添加一個field為線程id,值=1的鍵值對,表示此線程的重入次數(shù)為1
redis.call(‘pexpire’, KEYS[1], ARGV[1]) 設(shè)置set的過期時間,防止當(dāng)前服務(wù)器出問題后導(dǎo)致死鎖,return nil; end;返回nil 結(jié)束
if?(redis.call('hexists',?KEYS[1],?ARGV[2])?==?1)?then
?????????redis.call('hincrby',?KEYS[1],?ARGV[2],?1);
?????????redis.call('pexpire',?KEYS[1],?ARGV[1]);
?????????return?nil;
?????????end;
if (redis.call(‘hexists’, KEYS[1], ARGV[2]) == 1) 如果鎖是存在的,檢測是否是當(dāng)前線程持有鎖,如果是當(dāng)前線程持有鎖
then redis.call(‘hincrby’, KEYS[1], ARGV[2], 1)則將該線程重入的次數(shù)++
redis.call(‘pexpire’, KEYS[1], ARGV[1]) 并且重新設(shè)置該鎖的有效時間
return nil; end;返回nil,結(jié)束
return?redis.call('pttl',?KEYS[1]);
鎖存在, 但不是當(dāng)前線程加的鎖,則返回鎖的過期時間。
RedissonLock解鎖 unlock源碼
@Override
????public?void?unlock()?{
????????Boolean?opStatus?=?commandExecutor.evalWrite(getName(),?LongCodec.INSTANCE,?RedisCommands.EVAL_BOOLEAN,
????????????????????????"if?(redis.call('exists',?KEYS[1])?==?0)?then?"?+
????????????????????????????"redis.call('publish',?KEYS[2],?ARGV[1]);?"?+
????????????????????????????"return?1;?"?+
????????????????????????"end;"?+
????????????????????????"if?(redis.call('hexists',?KEYS[1],?ARGV[3])?==?0)?then?"?+
????????????????????????????"return?nil;"?+
????????????????????????"end;?"?+
????????????????????????"local?counter?=?redis.call('hincrby',?KEYS[1],?ARGV[3],?-1);?"?+
????????????????????????"if?(counter?>?0)?then?"?+
????????????????????????????"redis.call('pexpire',?KEYS[1],?ARGV[2]);?"?+
????????????????????????????"return?0;?"?+
????????????????????????"else?"?+
????????????????????????????"redis.call('del',?KEYS[1]);?"?+
????????????????????????????"redis.call('publish',?KEYS[2],?ARGV[1]);?"?+
????????????????????????????"return?1;?"+
????????????????????????"end;?"?+
????????????????????????"return?nil;",
????????????????????????Arrays.其中:
KEYS[1] 表示的是getName() 代表鎖名test_lock KEYS[2] 表示getChanelName() 表示的是發(fā)布訂閱過程中使用的Chanel ARGV[1] 表示的是LockPubSub.unLockMessage 是解鎖消息,實際代表的是數(shù)字 0,代表解鎖消息 ARGV[2] 表示的是internalLockLeaseTime 默認的有效時間 30s ARGV[3] 表示的是getLockName(thread.currentThread().getId()),是當(dāng)前鎖id+線程id
語義分析:
if?(redis.call('exists',?KEYS[1])?==?0)?then
?????????redis.call('publish',?KEYS[2],?ARGV[1]);
?????????return?1;
?????????end;
if (redis.call(‘exists’, KEYS[1]) == 0) 如果鎖已經(jīng)不存在(可能是因為過期導(dǎo)致不存在,也可能是因為已經(jīng)解鎖)
then redis.call(‘publish’, KEYS[2], ARGV[1]) 則發(fā)布鎖解除的消息
return 1; end 返回1結(jié)束
if?(redis.call('hexists',?KEYS[1],?ARGV[3])?==?0)?then
?????????return?nil;
?????????end;
if (redis.call(‘hexists’, KEYS[1], ARGV[3]) == 0) 如果鎖存在,但是若果當(dāng)前線程不是加鎖的線
then return nil;end則直接返回nil 結(jié)束
local?counter?=?redis.call('hincrby',?KEYS[1],?ARGV[3],?-1);
if?(counter?>?0)?then
?????????redis.call('pexpire',?KEYS[1],?ARGV[2]);
?????????return?0;
else
?????????redis.call('del',?KEYS[1]);
?????????redis.call('publish',?KEYS[2],?ARGV[1]);
?????????return?1;
end;
local counter = redis.call(‘hincrby’, KEYS[1], ARGV[3], -1) 如果是鎖是當(dāng)前線程所添加,定義變量counter,表示當(dāng)前線程的重入次數(shù)-1,即直接將重入次數(shù)-1
if (counter > 0)如果重入次數(shù)大于0,表示該線程還有其他任務(wù)需要執(zhí)行
then redis.call(‘pexpire’, KEYS[1], ARGV[2]) 則重新設(shè)置該鎖的有效時間
return 0 返回0結(jié)束
else redis.call(‘del’, KEYS[1])否則表示該線程執(zhí)行結(jié)束,刪除該鎖
redis.call(‘publish’, KEYS[2], ARGV[1])并且發(fā)布該鎖解除的消息
return 1; end;返回1結(jié)束
return?nil;
其他情況返回nil并結(jié)束
if?(opStatus?==?null)?{
????????????throw?new?IllegalMonitorStateException("attempt?to?unlock?lock,?not?locked?by?current?thread?by?node?id:?"
????????????????????+?id?+?"?thread-id:?"?+?Thread.currentThread().getId());
????????}
腳本執(zhí)行結(jié)束之后,如果返回值不是0或1,即當(dāng)前線程去解鎖其他線程的加鎖時,拋出異常。
RedissonLock強制解鎖源碼
@Override
????public?void?forceUnlock()?{
????????get(forceUnlockAsync());
????}
????Future?forceUnlockAsync()? {
????????cancelExpirationRenewal();
????????return?commandExecutor.evalWriteAsync(getName(),?LongCodec.INSTANCE,?RedisCommands.EVAL_BOOLEAN,
????????????????"if?(redis.call('del',?KEYS[1])?==?1)?then?"
????????????????+?"redis.call('publish',?KEYS[2],?ARGV[1]);?"
????????????????+?"return?1?"
????????????????+?"else?"
????????????????+?"return?0?"
????????????????+?"end",
????????????????Arrays.以上是強制解鎖的源碼,在源碼中并沒有找到forceUnlock()被調(diào)用的痕跡(也有可能是我沒有找對),但是forceUnlockAsync()方法被調(diào)用的地方很多,大多都是在清理資源時刪除鎖。此部分比較簡單粗暴,刪除鎖成功則并發(fā)布鎖被刪除的消息,返回1結(jié)束,否則返回0結(jié)束。
總結(jié)
這里只是簡單的一個redisson分布式鎖的測試用例,并分析了執(zhí)行l(wèi)ua腳本這部分,如果要繼續(xù)分析執(zhí)行結(jié)束之后的操作,需要進行netty源碼分析 ,redisson使用了netty完成異步和同步的處理。
END
有熱門推薦?
2.?Spring Validation最佳實踐及其實現(xiàn)原理,參數(shù)校驗沒那么簡單!
最近面試BAT,整理一份面試資料《Java面試BATJ通關(guān)手冊》,覆蓋了Java核心技術(shù)、JVM、Java并發(fā)、SSM、微服務(wù)、數(shù)據(jù)庫、數(shù)據(jù)結(jié)構(gòu)等等。
獲取方式:點“在看”,關(guān)注公眾號并回復(fù)?Java?領(lǐng)取,更多內(nèi)容陸續(xù)奉上。
文章有幫助的話,在看,轉(zhuǎn)發(fā)吧。
謝謝支持喲 (*^__^*)

