<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Redis 分布式鎖如何自動續(xù)期

          共 1906字,需瀏覽 4分鐘

           ·

          2021-12-28 19:15

          程序員的成長之路
          互聯(lián)網(wǎng)/程序員/技術(shù)/資料共享?
          關(guān)注


          閱讀本文大概需要 6?分鐘。

          來自:blog.csdn.net/upstream480/article/details/121578638

          Redis 實現(xiàn)分布式鎖

          • 指定一個 key 作為鎖標(biāo)記,存入 Redis 中,指定一個 唯一的用戶標(biāo)識作為 value。

          • 當(dāng) key 不存在時才能設(shè)置值,確保同一時間只有一個客戶端進程獲得鎖,滿足互斥性特性。

          • 設(shè)置一個過期時間,防止因系統(tǒng)異常導(dǎo)致沒能刪除這個 key,滿足防死鎖特性。

          • 當(dāng)處理完業(yè)務(wù)之后需要清除這個 key 來釋放鎖,清除 key 時需要校驗 value 值,需要滿足只有加鎖的人才能釋放鎖 。

          問題

          如果這個鎖的過期時間是30秒,但是業(yè)務(wù)運行超過了30秒,比如40秒,當(dāng)業(yè)務(wù)運行到30秒的時候,鎖過期了,其他客戶端拿到了這個鎖,怎么辦
          我們可以設(shè)置一個合理的過期時間,讓業(yè)務(wù)能夠在這個時間內(nèi)完成業(yè)務(wù)邏輯,但LockTime的設(shè)置原本就很不容易。
          • LockTime設(shè)置過小,鎖自動超時的概率就會增加,鎖異常失效的概率也就會增加;

          • LockTime設(shè)置過大,萬一服務(wù)出現(xiàn)異常無法正常釋放鎖,那么出現(xiàn)這種異常鎖的時間也就越長。

          我們只能通過經(jīng)驗去配置,一個可以接受的值,基本上是這個服務(wù)歷史上的平均耗時再增加一定的buff。總體來說,設(shè)置一個合理的過期時間并不容易
          我們也可以不設(shè)置過期時間,讓業(yè)務(wù)運行結(jié)束后解鎖,但是如果客戶端出現(xiàn)了異常結(jié)束了或宕機了,那么這個鎖就無法解鎖,變成死鎖;

          自動續(xù)期

          我們可以先給鎖設(shè)置一個LockTime,然后啟動一個守護線程,讓守護線程在一段時間后,重新去設(shè)置這個鎖的LockTime。
          看起來很簡單,但實現(xiàn)起來并不容易
          • 和釋放鎖的情況一樣,我們需要先判斷持有鎖客戶端是否有變化。否則會造成無論誰持有鎖,守護線程都會去重新設(shè)置鎖的LockTime。

          • 守護線程要在合理的時間再去重新設(shè)置鎖的LockTime,否則會造成資源的浪費。不能動不動就去續(xù)。

          • 如果持有鎖的線程已經(jīng)處理完業(yè)務(wù)了,那么守護線程也應(yīng)該被銷毀。不能業(yè)務(wù)運行結(jié)束了,守護者還在那里繼續(xù)運行,浪費資源。

          看門狗

          Redisson的看門狗機制就是這種機制實現(xiàn)自動續(xù)期的

          Redissson tryLock

          public?boolean?tryLock(long?waitTime,?long?leaseTime,?TimeUnit?unit)?throws?InterruptedException?{
          ????????long?time?=?unit.toMillis(waitTime);
          ????????long?current?=?System.currentTimeMillis();
          ????????long?threadId?=?Thread.currentThread().getId();
          ????????//?1.嘗試獲取鎖
          ????????Long?ttl?=?tryAcquire(leaseTime,?unit,?threadId);
          ????????//?lock?acquired
          ????????if?(ttl?==?null)?{
          ????????????return?true;
          ????????}

          ????????//?申請鎖的耗時如果大于等于最大等待時間,則申請鎖失敗.
          ????????time?-=?System.currentTimeMillis()?-?current;
          ????????if?(time?<=?0)?{
          ????????????acquireFailed(threadId);
          ????????????return?false;
          ????????}

          ????????current?=?System.currentTimeMillis();

          ????????/**
          ?????????* 2.訂閱鎖釋放事件,并通過 await 方法阻塞等待鎖釋放,有效的解決了無效的鎖申請浪費資源的問題:
          ?????????*?基于信息量,當(dāng)鎖被其它資源占用時,當(dāng)前線程通過?Redis?的?channel?訂閱鎖的釋放事件,一旦鎖釋放會發(fā)消息通知待等待的線程進行競爭.
          ?????????*
          ?????????*?當(dāng)?this.await?返回?false,說明等待時間已經(jīng)超出獲取鎖最大等待時間,取消訂閱并返回獲取鎖失敗.
          ?????????*?當(dāng)?this.await?返回?true,進入循環(huán)嘗試獲取鎖.
          ?????????*/

          ????????RFuture?subscribeFuture?=?subscribe(threadId);
          ????????//?await?方法內(nèi)部是用?CountDownLatch?來實現(xiàn)阻塞,獲取?subscribe?異步執(zhí)行的結(jié)果(應(yīng)用了?Netty?的?Future)
          ????????if?(!subscribeFuture.await(time,?TimeUnit.MILLISECONDS))?{
          ????????????if?(!subscribeFuture.cancel(false))?{
          ????????????????subscribeFuture.onComplete((res,?e)?->?{
          ????????????????????if?(e?==?null)?{
          ????????????????????????unsubscribe(subscribeFuture,?threadId);
          ????????????????????}
          ????????????????});
          ????????????}
          ????????????acquireFailed(threadId);
          ????????????return?false;
          ????????}

          ????????try?{
          ????????????//?計算獲取鎖的總耗時,如果大于等于最大等待時間,則獲取鎖失敗.
          ????????????time?-=?System.currentTimeMillis()?-?current;
          ????????????if?(time?<=?0)?{
          ????????????????acquireFailed(threadId);
          ????????????????return?false;

          ??????????????}

          ????????????/**
          ?????????????*?3.收到鎖釋放的信號后,在最大等待時間之內(nèi),循環(huán)一次接著一次的嘗試獲取鎖
          ?????????????*?獲取鎖成功,則立馬返回?true,
          ?????????????*?若在最大等待時間之內(nèi)還沒獲取到鎖,則認(rèn)為獲取鎖失敗,返回?false?結(jié)束循環(huán)
          ?????????????*/

          ????????????while?(true)?{
          ????????????????long?currentTime?=?System.currentTimeMillis();

          ????????????????//?再次嘗試獲取鎖
          ????????????????ttl?=?tryAcquire(leaseTime,?unit,?threadId);
          ????????????????//?lock?acquired
          ????????????????if?(ttl?==?null)?{
          ????????????????????return?true;
          ????????????????}
          ????????????????//?超過最大等待時間則返回?false?結(jié)束循環(huán),獲取鎖失敗
          ????????????????time?-=?System.currentTimeMillis()?-?currentTime;
          ????????????????if?(time?<=?0)?{
          ????????????????????acquireFailed(threadId);
          ????????????????????return?false;
          ????????????????}

          ????????????????/**
          ?????????????????* 6.阻塞等待鎖(通過信號量(共享鎖)阻塞,等待解鎖消息):
          ?????????????????*/

          ????????????????currentTime?=?System.currentTimeMillis();
          ????????????????if?(ttl?>=?0?&&?ttl?????????????????????//如果剩余時間(ttl)小于wait time ,就在 ttl 時間內(nèi),從Entry的信號量獲取一個許可(除非被中斷或者一直沒有可用的許可)。
          ????????????????????getEntry(threadId).getLatch().tryAcquire(ttl,?TimeUnit.MILLISECONDS);
          ????????????????}?else?{
          ????????????????????//則就在wait?time?時間范圍內(nèi)等待可以通過信號量
          ????????????????????getEntry(threadId).getLatch().tryAcquire(time,?TimeUnit.MILLISECONDS);
          ????????????????}

          ????????????????//?更新剩余的等待時間(最大等待時間-已經(jīng)消耗的阻塞時間)
          ????????????????time?-=?System.currentTimeMillis()?-?currentTime;
          ????????????????if?(time?<=?0)?{
          ????????????????????acquireFailed(threadId);
          ????????????????????return?false;
          ????????????????}
          ????????????}
          ????????}?finally?{
          ????????????//?7.無論是否獲得鎖,都要取消訂閱解鎖消息
          ????????????unsubscribe(subscribeFuture,?threadId);
          ????????}
          ????????return?get(tryLockAsync(waitTime,?leaseTime,?unit));
          ????}
          • 嘗試獲取鎖,返回 null 則說明加鎖成功,返回一個數(shù)值,則說明已經(jīng)存在該鎖,ttl 為鎖的剩余存活時間。

          • 如果此時客戶端 2 進程獲取鎖失敗,那么使用客戶端 2 的線程 id(其實本質(zhì)上就是進程 id)通過 Redis 的 channel 訂閱鎖釋放的事件。如果等待的過程中一直未等到鎖的釋放事件通知,當(dāng)超過最大等待時間則獲取鎖失敗,返回 false,也就是第 39 行代碼。如果等到了鎖的釋放事件的通知,則開始進入一個不斷重試獲取鎖的循環(huán)。

          • 循環(huán)中每次都先試著獲取鎖,并得到已存在的鎖的剩余存活時間。如果在重試中拿到了鎖,則直接返回。如果鎖當(dāng)前還是被占用的,那么等待釋放鎖的消息,具體實現(xiàn)使用了信號量 Semaphore 來阻塞線程,當(dāng)鎖釋放并發(fā)布釋放鎖的消息后,信號量的 release() 方法會被調(diào)用,此時被信號量阻塞的等待隊列中的一個線程就可以繼續(xù)嘗試獲取鎖了。

          • 當(dāng)鎖正在被占用時,等待獲取鎖的進程并不是通過一個 while(true) 死循環(huán)去獲取鎖,而是利用了 Redis 的發(fā)布訂閱機制,通過 await 方法阻塞等待鎖的進程,有效的解決了無效的鎖申請浪費資源的問題。

          看門狗如何自動續(xù)期

          Redisson看門狗機制, 只要客戶端加鎖成功,就會啟動一個 Watch Dog。
          private??RFuture?tryAcquireAsync(long?leaseTime,?TimeUnit?unit,?long?threadId)?{
          ????if?(leaseTime?!=?-1)?{
          ????????return?tryLockInnerAsync(leaseTime,?unit,?threadId,?RedisCommands.EVAL_LONG);
          ????}
          ????RFuture?ttlRemainingFuture?=?tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),?TimeUnit.MILLISECONDS,?threadId,?RedisCommands.EVAL_LONG);
          ????ttlRemainingFuture.onComplete((ttlRemaining,?e)?->?{
          ????????if?(e?!=?null)?{
          ????????????return;
          ????????}

          ????????//?lock?acquired
          ????????if?(ttlRemaining?==?null)?{
          ????????????scheduleExpirationRenewal(threadId);
          ????????}
          ????});
          ????return?ttlRemainingFuture;
          }
          • leaseTime 必須是 -1 才會開啟 Watch Dog 機制,如果需要開啟 Watch Dog 機制就必須使用默認(rèn)的加鎖時間為 30s。
          • 如果你自己自定義時間,超過這個時間,鎖就會自定釋放,并不會自動續(xù)期。

          續(xù)期原理

          續(xù)期原理其實就是用lua腳本,將鎖的時間重置為30s
          private?void?scheduleExpirationRenewal(long?threadId)?{
          ????ExpirationEntry?entry?=?new?ExpirationEntry();
          ????ExpirationEntry?oldEntry?=?EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(),?entry);
          ????if?(oldEntry?!=?null)?{
          ????????oldEntry.addThreadId(threadId);
          ????}?else?{
          ????????entry.addThreadId(threadId);
          ????????renewExpiration();
          ????}
          }

          protected?RFuture?renewExpirationAsync(long?threadId)?{
          ????return?commandExecutor.evalWriteAsync(getName(),?LongCodec.INSTANCE,?RedisCommands.EVAL_BOOLEAN,
          ????????????"if?(redis.call('hexists',?KEYS[1],?ARGV[2])?==?1)?then?"?+
          ????????????????"redis.call('pexpire',?KEYS[1],?ARGV[1]);?"?+
          ????????????????"return?1;?"?+
          ????????????"end;?"?+
          ????????????"return?0;",
          ????????Collections.singletonList(getName()),
          ????????internalLockLeaseTime,?getLockName(threadId));
          }
          Watch Dog 機制其實就是一個后臺定時任務(wù)線程,獲取鎖成功之后,會將持有鎖的線程放入到一個?RedissonLock.EXPIRATION_RENEWAL_MAP里面,然后每隔 10 秒 (internalLockLeaseTime / 3) 檢查一下,如果客戶端 還持有鎖 key(判斷客戶端是否還持有 key,其實就是遍歷?EXPIRATION_RENEWAL_MAP?里面線程 id 然后根據(jù)線程 id 去 Redis 中查,如果存在就會延長 key 的時間),那么就會不斷的延長鎖 key 的生存時間。
          如果服務(wù)宕機了,Watch Dog 機制線程也就沒有了,此時就不會延長 key 的過期時間,到了 30s 之后就會自動過期了,其他線程就可以獲取到鎖。

          推薦閱讀:

          學(xué)會 IDEA 的這個功能,閱讀源碼簡直太簡單了!!!

          Java有陷阱,用時需謹(jǐn)慎-慎用入?yún)⒆龇祷刂?/a>

          互聯(lián)網(wǎng)初中高級大廠面試題(9個G)

          內(nèi)容包含Java基礎(chǔ)、JavaWeb、MySQL性能優(yōu)化、JVM、鎖、百萬并發(fā)、消息隊列、高性能緩存、反射、Spring全家桶原理、微服務(wù)、Zookeeper、數(shù)據(jù)結(jié)構(gòu)、限流熔斷降級......等技術(shù)棧!

          ?戳閱讀原文領(lǐng)取!? ? ? ? ? ? ? ??? ??? ? ? ? ? ? ? ? ? ?朕已閱?

          瀏覽 34
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                    <th id="afajh"><progress id="afajh"></progress></th>
                    91探花国产综合在线精品 | 色色婷婷五月 | 久久久久成人电 | 操逼精品视频 | 中文字幕一区第一页 |