Redis 分布式鎖如何自動續(xù)期
閱讀本文大概需要 6?分鐘。
來自:blog.csdn.net/upstream480/article/details/121578638
Redis 實現(xiàn)分布式鎖
指定一個 key 作為鎖標(biāo)記,存入 Redis 中,指定一個 唯一的用戶標(biāo)識作為 value。
當(dāng) key 不存在時才能設(shè)置值,確保同一時間只有一個客戶端進程獲得鎖,滿足互斥性特性。
設(shè)置一個過期時間,防止因系統(tǒng)異常導(dǎo)致沒能刪除這個 key,滿足防死鎖特性。
當(dāng)處理完業(yè)務(wù)之后需要清除這個 key 來釋放鎖,清除 key 時需要校驗 value 值,需要滿足只有加鎖的人才能釋放鎖 。
問題
LockTime設(shè)置過小,鎖自動超時的概率就會增加,鎖異常失效的概率也就會增加;
LockTime設(shè)置過大,萬一服務(wù)出現(xiàn)異常無法正常釋放鎖,那么出現(xiàn)這種異常鎖的時間也就越長。
自動續(xù)期
和釋放鎖的情況一樣,我們需要先判斷持有鎖客戶端是否有變化。否則會造成無論誰持有鎖,守護線程都會去重新設(shè)置鎖的LockTime。
守護線程要在合理的時間再去重新設(shè)置鎖的LockTime,否則會造成資源的浪費。不能動不動就去續(xù)。
如果持有鎖的線程已經(jīng)處理完業(yè)務(wù)了,那么守護線程也應(yīng)該被銷毀。不能業(yè)務(wù)運行結(jié)束了,守護者還在那里繼續(xù)運行,浪費資源。
看門狗

Redissson tryLock
public?boolean?tryLock(long?waitTime,?long?leaseTime,?TimeUnit?unit)?throws?InterruptedException?{
????????long?time?=?unit.toMillis(waitTime);
????????long?current?=?System.currentTimeMillis();
????????long?threadId?=?Thread.currentThread().getId();
????????//?1.嘗試獲取鎖
????????Long?ttl?=?tryAcquire(leaseTime,?unit,?threadId);
????????//?lock?acquired
????????if?(ttl?==?null)?{
????????????return?true;
????????}
????????//?申請鎖的耗時如果大于等于最大等待時間,則申請鎖失敗.
????????time?-=?System.currentTimeMillis()?-?current;
????????if?(time?<=?0)?{
????????????acquireFailed(threadId);
????????????return?false;
????????}
????????current?=?System.currentTimeMillis();
????????/**
?????????* 2.訂閱鎖釋放事件,并通過 await 方法阻塞等待鎖釋放,有效的解決了無效的鎖申請浪費資源的問題:
?????????*?基于信息量,當(dāng)鎖被其它資源占用時,當(dāng)前線程通過?Redis?的?channel?訂閱鎖的釋放事件,一旦鎖釋放會發(fā)消息通知待等待的線程進行競爭.
?????????*
?????????*?當(dāng)?this.await?返回?false,說明等待時間已經(jīng)超出獲取鎖最大等待時間,取消訂閱并返回獲取鎖失敗.
?????????*?當(dāng)?this.await?返回?true,進入循環(huán)嘗試獲取鎖.
?????????*/
????????RFuture?subscribeFuture?=?subscribe(threadId);
????????//?await?方法內(nèi)部是用?CountDownLatch?來實現(xiàn)阻塞,獲取?subscribe?異步執(zhí)行的結(jié)果(應(yīng)用了?Netty?的?Future)
????????if?(!subscribeFuture.await(time,?TimeUnit.MILLISECONDS))?{
????????????if?(!subscribeFuture.cancel(false))?{
????????????????subscribeFuture.onComplete((res,?e)?->?{
????????????????????if?(e?==?null)?{
????????????????????????unsubscribe(subscribeFuture,?threadId);
????????????????????}
????????????????});
????????????}
????????????acquireFailed(threadId);
????????????return?false;
????????}
????????try?{
????????????//?計算獲取鎖的總耗時,如果大于等于最大等待時間,則獲取鎖失敗.
????????????time?-=?System.currentTimeMillis()?-?current;
????????????if?(time?<=?0)?{
????????????????acquireFailed(threadId);
????????????????return?false;
??????????????}
????????????/**
?????????????*?3.收到鎖釋放的信號后,在最大等待時間之內(nèi),循環(huán)一次接著一次的嘗試獲取鎖
?????????????*?獲取鎖成功,則立馬返回?true,
?????????????*?若在最大等待時間之內(nèi)還沒獲取到鎖,則認(rèn)為獲取鎖失敗,返回?false?結(jié)束循環(huán)
?????????????*/
????????????while?(true)?{
????????????????long?currentTime?=?System.currentTimeMillis();
????????????????//?再次嘗試獲取鎖
????????????????ttl?=?tryAcquire(leaseTime,?unit,?threadId);
????????????????//?lock?acquired
????????????????if?(ttl?==?null)?{
????????????????????return?true;
????????????????}
????????????????//?超過最大等待時間則返回?false?結(jié)束循環(huán),獲取鎖失敗
????????????????time?-=?System.currentTimeMillis()?-?currentTime;
????????????????if?(time?<=?0)?{
????????????????????acquireFailed(threadId);
????????????????????return?false;
????????????????}
????????????????/**
?????????????????* 6.阻塞等待鎖(通過信號量(共享鎖)阻塞,等待解鎖消息):
?????????????????*/
????????????????currentTime?=?System.currentTimeMillis();
????????????????if?(ttl?>=?0?&&?ttl?????????????????????//如果剩余時間(ttl)小于wait time ,就在 ttl 時間內(nèi),從Entry的信號量獲取一個許可(除非被中斷或者一直沒有可用的許可)。
????????????????????getEntry(threadId).getLatch().tryAcquire(ttl,?TimeUnit.MILLISECONDS);
????????????????}?else?{
????????????????????//則就在wait?time?時間范圍內(nèi)等待可以通過信號量
????????????????????getEntry(threadId).getLatch().tryAcquire(time,?TimeUnit.MILLISECONDS);
????????????????}
????????????????//?更新剩余的等待時間(最大等待時間-已經(jīng)消耗的阻塞時間)
????????????????time?-=?System.currentTimeMillis()?-?currentTime;
????????????????if?(time?<=?0)?{
????????????????????acquireFailed(threadId);
????????????????????return?false;
????????????????}
????????????}
????????}?finally?{
????????????//?7.無論是否獲得鎖,都要取消訂閱解鎖消息
????????????unsubscribe(subscribeFuture,?threadId);
????????}
????????return?get(tryLockAsync(waitTime,?leaseTime,?unit));
????}
嘗試獲取鎖,返回 null 則說明加鎖成功,返回一個數(shù)值,則說明已經(jīng)存在該鎖,ttl 為鎖的剩余存活時間。
如果此時客戶端 2 進程獲取鎖失敗,那么使用客戶端 2 的線程 id(其實本質(zhì)上就是進程 id)通過 Redis 的 channel 訂閱鎖釋放的事件。如果等待的過程中一直未等到鎖的釋放事件通知,當(dāng)超過最大等待時間則獲取鎖失敗,返回 false,也就是第 39 行代碼。如果等到了鎖的釋放事件的通知,則開始進入一個不斷重試獲取鎖的循環(huán)。
循環(huán)中每次都先試著獲取鎖,并得到已存在的鎖的剩余存活時間。如果在重試中拿到了鎖,則直接返回。如果鎖當(dāng)前還是被占用的,那么等待釋放鎖的消息,具體實現(xiàn)使用了信號量 Semaphore 來阻塞線程,當(dāng)鎖釋放并發(fā)布釋放鎖的消息后,信號量的 release() 方法會被調(diào)用,此時被信號量阻塞的等待隊列中的一個線程就可以繼續(xù)嘗試獲取鎖了。
當(dāng)鎖正在被占用時,等待獲取鎖的進程并不是通過一個 while(true) 死循環(huán)去獲取鎖,而是利用了 Redis 的發(fā)布訂閱機制,通過 await 方法阻塞等待鎖的進程,有效的解決了無效的鎖申請浪費資源的問題。
看門狗如何自動續(xù)期
private? ?RFuture ?tryAcquireAsync(long?leaseTime,?TimeUnit?unit,?long?threadId)? {
????if?(leaseTime?!=?-1)?{
????????return?tryLockInnerAsync(leaseTime,?unit,?threadId,?RedisCommands.EVAL_LONG);
????}
????RFuture?ttlRemainingFuture?=?tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),?TimeUnit.MILLISECONDS,?threadId,?RedisCommands.EVAL_LONG);
????ttlRemainingFuture.onComplete((ttlRemaining,?e)?->?{
????????if?(e?!=?null)?{
????????????return;
????????}
????????//?lock?acquired
????????if?(ttlRemaining?==?null)?{
????????????scheduleExpirationRenewal(threadId);
????????}
????});
????return?ttlRemainingFuture;
}
leaseTime 必須是 -1 才會開啟 Watch Dog 機制,如果需要開啟 Watch Dog 機制就必須使用默認(rèn)的加鎖時間為 30s。 如果你自己自定義時間,超過這個時間,鎖就會自定釋放,并不會自動續(xù)期。
續(xù)期原理
private?void?scheduleExpirationRenewal(long?threadId)?{
????ExpirationEntry?entry?=?new?ExpirationEntry();
????ExpirationEntry?oldEntry?=?EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(),?entry);
????if?(oldEntry?!=?null)?{
????????oldEntry.addThreadId(threadId);
????}?else?{
????????entry.addThreadId(threadId);
????????renewExpiration();
????}
}
protected?RFuture?renewExpirationAsync(long?threadId)? {
????return?commandExecutor.evalWriteAsync(getName(),?LongCodec.INSTANCE,?RedisCommands.EVAL_BOOLEAN,
????????????"if?(redis.call('hexists',?KEYS[1],?ARGV[2])?==?1)?then?"?+
????????????????"redis.call('pexpire',?KEYS[1],?ARGV[1]);?"?+
????????????????"return?1;?"?+
????????????"end;?"?+
????????????"return?0;",
????????Collections.
RedissonLock.EXPIRATION_RENEWAL_MAP里面,然后每隔 10 秒 (internalLockLeaseTime / 3) 檢查一下,如果客戶端 還持有鎖 key(判斷客戶端是否還持有 key,其實就是遍歷?EXPIRATION_RENEWAL_MAP?里面線程 id 然后根據(jù)線程 id 去 Redis 中查,如果存在就會延長 key 的時間),那么就會不斷的延長鎖 key 的生存時間。推薦閱讀:
學(xué)會 IDEA 的這個功能,閱讀源碼簡直太簡單了!!!
Java有陷阱,用時需謹(jǐn)慎-慎用入?yún)⒆龇祷刂?/a>
內(nèi)容包含Java基礎(chǔ)、JavaWeb、MySQL性能優(yōu)化、JVM、鎖、百萬并發(fā)、消息隊列、高性能緩存、反射、Spring全家桶原理、微服務(wù)、Zookeeper、數(shù)據(jù)結(jié)構(gòu)、限流熔斷降級......等技術(shù)棧!
?戳閱讀原文領(lǐng)取!? ? ? ? ? ? ? ??? ??? ? ? ? ? ? ? ? ? ?朕已閱?

