redission 分布式鎖
點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號(hào)”
優(yōu)質(zhì)文章,第一時(shí)間送達(dá)
? 作者?|??穆穆兔兔
來(lái)源 |? urlify.cn/RrAR3u
66套java從入門到精通實(shí)戰(zhàn)課程分享
概述
分布式系統(tǒng)有一個(gè)著名的理論CAP,指在一個(gè)分布式系統(tǒng)中,最多只能同時(shí)滿足一致性(Consistency)、可用性(Availability)和分區(qū)容錯(cuò)性(Partition tolerance)這三項(xiàng)中的兩項(xiàng)。所以在設(shè)計(jì)系統(tǒng)時(shí),往往需要權(quán)衡,在CAP中作選擇。當(dāng)然,這個(gè)理論也并不一定完美,不同系統(tǒng)對(duì)CAP的要求級(jí)別不一樣,選擇需要考慮方方面面。
在微服務(wù)系統(tǒng)中,一個(gè)請(qǐng)求存在多級(jí)跨服務(wù)調(diào)用,往往需要犧牲強(qiáng)一致性老保證系統(tǒng)高可用,比如通過(guò)分布式事務(wù),異步消息等手段完成。但還是有的場(chǎng)景,需要阻塞所有節(jié)點(diǎn)的所有線程,對(duì)共享資源的訪問(wèn)。比如并發(fā)時(shí)“超賣”和“余額減為負(fù)數(shù)”等情況。
本地鎖可以通過(guò)語(yǔ)言本身支持,要實(shí)現(xiàn)分布式鎖,就必須依賴中間件,數(shù)據(jù)庫(kù)、redis、zookeeper等。
分布式鎖特性
不管使用什么中間件,有幾點(diǎn)是實(shí)現(xiàn)分布式鎖必須要考慮到的。
互斥:互斥好像是必須的,否則怎么叫鎖。
死鎖: 如果一個(gè)線程獲得鎖,然后掛了,并沒(méi)有釋放鎖,致使其他節(jié)點(diǎn)(線程)永遠(yuǎn)無(wú)法獲取鎖,這就是死鎖。分布式鎖必須做到避免死鎖。
性能: 高并發(fā)分布式系統(tǒng)中,線程互斥等待會(huì)成為性能瓶頸,需要好的中間件和實(shí)現(xiàn)來(lái)保證性能。
鎖特性:考慮到復(fù)雜的場(chǎng)景,分布式鎖不能只是加鎖,然后一直等待。最好實(shí)現(xiàn)如Java Lock的一些功能如:鎖判斷,超時(shí)設(shè)置,可重入性等。
Redis實(shí)現(xiàn)之Redisson原理
redission實(shí)現(xiàn)了JDK中的Lock接口,所以使用方式一樣,只是Redssion的鎖是分布式的。如下:
RLock?lock?=?redisson.getLock("className");
lock.lock();
try?{
//?do?sth.
}?finally?{
lock.unlock();
}好,Lock主要實(shí)現(xiàn)是RedissionLock。

先來(lái)看常用的Lock方法實(shí)現(xiàn)。
@Override
public?void?lock()?{
try?{
lockInterruptibly();
}?catch?(InterruptedException?e)?{
Thread.currentThread().interrupt();
}
}
@Override
public?void?lockInterruptibly()?throws?InterruptedException?{
lockInterruptibly(-1,?null);
}再看lockInterruptibly方法:
@Override
public?void?lockInterruptibly(long?leaseTime,?TimeUnit?unit)?throws?InterruptedException?{
long?threadId?=?Thread.currentThread().getId();
//?獲取鎖
Long?ttl?=?tryAcquire(leaseTime,?unit,?threadId);
if?(ttl?==?null)?{?//?獲取成功
return;
}
?
//?異步訂閱redis?chennel
RFuture?future?=?subscribe(threadId);
commandExecutor.syncSubscription(future);?//?阻塞獲取訂閱結(jié)果
?
try?{
while?(true)?{//?循環(huán)判斷知道獲取鎖
ttl?=?tryAcquire(leaseTime,?unit,?threadId);
//?lock?acquired
if?(ttl?==?null)?{
break;
}
?
//?waiting?for?message
if?(ttl?>=?0)?{
getEntry(threadId).getLatch().tryAcquire(ttl,?TimeUnit.MILLISECONDS);
}?else?{
getEntry(threadId).getLatch().acquire();
}
}
}?finally?{
unsubscribe(future,?threadId);//?取消訂閱
}
} 總結(jié)lockInterruptibly:獲取鎖,不成功則訂閱釋放鎖的消息,獲得消息前阻塞。得到釋放通知后再去循環(huán)獲取鎖。
下面重點(diǎn)看看如何獲取鎖:Long ttl = tryAcquire(leaseTime, unit, threadId)
private?Long?tryAcquire(long?leaseTime,?TimeUnit?unit,?long?threadId)?{
return?get(tryAcquireAsync(leaseTime,?unit,?threadId));//?通過(guò)異步獲取鎖,但get(future)實(shí)現(xiàn)同步
}
private??RFuture?tryAcquireAsync(long?leaseTime,?TimeUnit?unit,?final?long?threadId)?{
if?(leaseTime?!=?-1)?{?//1?如果設(shè)置了超時(shí)時(shí)間,直接調(diào)用?tryLockInnerAsync
return?tryLockInnerAsync(leaseTime,?unit,?threadId,?RedisCommands.EVAL_LONG);
}
//2?如果leaseTime==-1,則默認(rèn)超時(shí)時(shí)間為30s
RFuture?ttlRemainingFuture?=?tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS,?TimeUnit.SECONDS,?threadId,?RedisCommands.EVAL_LONG);
//3?監(jiān)聽Future,獲取Future返回值ttlRemaining(剩余超時(shí)時(shí)間),獲取鎖成功,但是ttlRemaining,則刷新過(guò)期時(shí)間
ttlRemainingFuture.addListener(new?FutureListener()?{
@Override
public?void?operationComplete(Future?future)?throws?Exception?{
if?(!future.isSuccess())?{
return;
}
?
Long?ttlRemaining?=?future.getNow();
//?lock?acquired
if?(ttlRemaining?==?null)?{
scheduleExpirationRenewal(threadId);
}
}
});
return?ttlRemainingFuture;
}
已經(jīng)在注釋中解釋了,需要注意的是,此處用到了Netty的Future-listen模型,可以看看我的另一篇對(duì)Future的簡(jiǎn)單講解:給Future一個(gè)Promise。
下面就是最重要的redis獲取鎖的方法tryLockInnerAsync:
?RFuture?tryLockInnerAsync(long?leaseTime,?TimeUnit?unit,?long?threadId,?RedisStrictCommand?command)?{
internalLockLeaseTime?=?unit.toMillis(leaseTime);
return?commandExecutor.evalWriteAsync(
getName(),
LongCodec.INSTANCE,
command,
"if?(redis.call('exists',?KEYS[1])?==?0)?then?"?+
"redis.call('hset',?KEYS[1],?ARGV[2],?1);?"?+
"redis.call('pexpire',?KEYS[1],?ARGV[1]);?"?+
"return?nil;?"?+
"end;?"?+
"if?(redis.call('hexists',?KEYS[1],?ARGV[2])?==?1)?then?"?+
"redis.call('hincrby',?KEYS[1],?ARGV[2],?1);?"?+
"redis.call('pexpire',?KEYS[1],?ARGV[1]);?"?+
"return?nil;?"?+
"end;?"?+
"return?redis.call('pttl',?KEYS[1]);",
Collections. 這個(gè)方法主要就是調(diào)用redis執(zhí)行eval lua,為什么使用eval,因?yàn)閞edis對(duì)lua腳本執(zhí)行具有原子性。把這個(gè)方法翻譯一下:
--?1.?沒(méi)被鎖{key不存在}
eval?"return?redis.call('exists',?KEYS[1])"?1?myLock
--?(1)?設(shè)置Lock為key,uuid:threadId為filed,?filed值為1
eval?"return?redis.call('hset',?KEYS[1],?ARGV[2],?1)"?1?myLock?3000?3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
--?(2)?設(shè)置key過(guò)期時(shí)間{防止獲取鎖后線程掛掉導(dǎo)致死鎖}
eval?"return?redis.call('pexpire',?KEYS[1],?ARGV[1])"?1?myLock?3000?3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
?
--?2.?已經(jīng)被同線程獲得鎖{key存在并且field存在}
eval?"return?redis.call('hexists',?KEYS[1],?ARGV[2])"?1?myLock?3000?3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
--?(1)?可重入,但filed字段+1
eval?"return?redis.call('hincrby',?KEYS[1],?ARGV[2],1)"?1?myLock?3000?3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
--?(2)?刷新過(guò)去時(shí)間
eval?"return?redis.call('pexpire',?KEYS[1],?ARGV[1])"?1?myLock?3000?3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
?
-- 3. 已經(jīng)被其他線程鎖住{key存在,但是field不存在}:以毫秒為單位返回 key 的剩余超時(shí)時(shí)間
eval?"return?redis.call('pttl',?KEYS[1])"?1?myLock這就是核心獲取鎖的方式,下面直接釋放鎖方法unlockInnerAsync:
--?1.?key不存在
eval?"return?redis.call('exists',?KEYS[1])"?2?myLock?redisson_lock__channel_lock?0?3000?3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
--?(1)?發(fā)送釋放鎖的消息,返回1,釋放成功
eval?"return?redis.call('publish',?KEYS[2],?ARGV[1])"?2?myLock?redisson_lock__channel_lock?0?3000?3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
?
--?2.?key存在,但field不存在,說(shuō)明自己不是鎖持有者,無(wú)權(quán)釋放,直接return?nil
eval?"return?redis.call('hexists',?KEYS[1],?ARGV[3])"?2?myLock?redisson_lock__channel_lock?0?3000?3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
eval?"return?nil"
?
--?3.?filed存在,說(shuō)明是本線程在鎖,但有可能其他地方重入鎖,不能直接釋放,應(yīng)該-1
eval?"return?redis.call('hincrby',?KEYS[1],?ARGV[3],-1)"?2?myLock?redisson_lock__channel_lock?0?3000?3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
?
-- 4. 如果減1后大于0,說(shuō)明還有其他重入鎖,刷新過(guò)期時(shí)間,返回0。
eval?"return?redis.call('pexpire',?KEYS[1],?ARGV[2])"?2?myLock?redisson_lock__channel_lock?0?3000?3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
?
--?5.?如果不大于0,說(shuō)明最后一把鎖,需要釋放
--?刪除key
eval?"return?redis.call('del',?KEYS[1])"?2?myLock?redisson_lock__channel_lock?0?3000?3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
--?發(fā)釋放消息
eval?"return?redis.call('publish',?KEYS[2],?ARGV[1])"?2?myLock?redisson_lock__channel_lock?0?3000?3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
--?返回1,釋放成功從釋放鎖代碼中看到,刪除key后會(huì)發(fā)送消息,所以上文提到獲取鎖失敗后,阻塞訂閱此消息。
另外,上文提到刷新過(guò)期時(shí)間方法scheduleExpirationRenewal,指線程獲取鎖后需要不斷刷新失效時(shí)間,避免未執(zhí)行完鎖就失效。這個(gè)方法的實(shí)現(xiàn)原理也類似,只是使用了Netty的TimerTask,每到過(guò)期時(shí)間1/3就去重新刷一次,如果key不存在則停止刷新。Timer實(shí)現(xiàn)大概如下:
private?static?void?nettyTimer()?{
final?int?expireTime?=?6;
EventExecutorGroup?group?=?new?DefaultEventExecutorGroup(1);
final?Timer?timer?=?new?HashedWheelTimer();
timer.newTimeout(timerTask?->?{
Future?future?=?group.submit(()?->?{
System.out.println("刷新key的失效時(shí)間為"+expireTime?+"秒");
return?false;//?但key不存在時(shí),返回true
});
future.addListener(future1?->?{
if?(!future.getNow())?{
nettyTimer();
}
});
},?expireTime/3,?TimeUnit.SECONDS);
}
參考列表:
一分鐘實(shí)現(xiàn)分布式鎖
粉絲福利:Java從入門到入土學(xué)習(xí)路線圖
???

?長(zhǎng)按上方微信二維碼?2 秒
感謝點(diǎn)贊支持下哈?
