<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Redis 分布式鎖如何自動(dòng)續(xù)期

          共 1885字,需瀏覽 4分鐘

           ·

          2022-01-09 00:35

          點(diǎn)擊下方“IT牧場(chǎng)”,選擇“設(shè)為星標(biāo)”

          來(lái)源:blog.csdn.net/upstream480/article/details/121578638

          Redis 實(shí)現(xiàn)分布式鎖

          • 指定一個(gè) key 作為鎖標(biāo)記,存入 Redis 中,指定一個(gè) 唯一的用戶標(biāo)識(shí)作為 value。

          • 當(dāng) key 不存在時(shí)才能設(shè)置值,確保同一時(shí)間只有一個(gè)客戶端進(jìn)程獲得鎖,滿足互斥性特性。

          • 設(shè)置一個(gè)過(guò)期時(shí)間,防止因系統(tǒng)異常導(dǎo)致沒(méi)能刪除這個(gè) key,滿足防死鎖特性。

          • 當(dāng)處理完業(yè)務(wù)之后需要清除這個(gè) key 來(lái)釋放鎖,清除 key 時(shí)需要校驗(yàn) value 值,需要滿足只有加鎖的人才能釋放鎖 。

          問(wèn)題

          如果這個(gè)鎖的過(guò)期時(shí)間是30秒,但是業(yè)務(wù)運(yùn)行超過(guò)了30秒,比如40秒,當(dāng)業(yè)務(wù)運(yùn)行到30秒的時(shí)候,鎖過(guò)期了,其他客戶端拿到了這個(gè)鎖,怎么辦

          我們可以設(shè)置一個(gè)合理的過(guò)期時(shí)間,讓業(yè)務(wù)能夠在這個(gè)時(shí)間內(nèi)完成業(yè)務(wù)邏輯,但LockTime的設(shè)置原本就很不容易。

          • LockTime設(shè)置過(guò)小,鎖自動(dòng)超時(shí)的概率就會(huì)增加,鎖異常失效的概率也就會(huì)增加;

          • LockTime設(shè)置過(guò)大,萬(wàn)一服務(wù)出現(xiàn)異常無(wú)法正常釋放鎖,那么出現(xiàn)這種異常鎖的時(shí)間也就越長(zhǎng)。

          我們只能通過(guò)經(jīng)驗(yàn)去配置,一個(gè)可以接受的值,基本上是這個(gè)服務(wù)歷史上的平均耗時(shí)再增加一定的buff。總體來(lái)說(shuō),設(shè)置一個(gè)合理的過(guò)期時(shí)間并不容易

          我們也可以不設(shè)置過(guò)期時(shí)間,讓業(yè)務(wù)運(yùn)行結(jié)束后解鎖,但是如果客戶端出現(xiàn)了異常結(jié)束了或宕機(jī)了,那么這個(gè)鎖就無(wú)法解鎖,變成死鎖;

          自動(dòng)續(xù)期

          我們可以先給鎖設(shè)置一個(gè)LockTime,然后啟動(dòng)一個(gè)守護(hù)線程,讓守護(hù)線程在一段時(shí)間后,重新去設(shè)置這個(gè)鎖的LockTime。

          看起來(lái)很簡(jiǎn)單,但實(shí)現(xiàn)起來(lái)并不容易

          • 和釋放鎖的情況一樣,我們需要先判斷持有鎖客戶端是否有變化。否則會(huì)造成無(wú)論誰(shuí)持有鎖,守護(hù)線程都會(huì)去重新設(shè)置鎖的LockTime。

          • 守護(hù)線程要在合理的時(shí)間再去重新設(shè)置鎖的LockTime,否則會(huì)造成資源的浪費(fèi)。不能動(dòng)不動(dòng)就去續(xù)。

          • 如果持有鎖的線程已經(jīng)處理完業(yè)務(wù)了,那么守護(hù)線程也應(yīng)該被銷毀。不能業(yè)務(wù)運(yùn)行結(jié)束了,守護(hù)者還在那里繼續(xù)運(yùn)行,浪費(fèi)資源。

          看門(mén)狗

          Redisson的看門(mén)狗機(jī)制就是這種機(jī)制實(shí)現(xiàn)自動(dòng)續(xù)期的

          Redissson tryLock

          public?boolean?tryLock(long?waitTime,?long?leaseTime,?TimeUnit?unit)?throws?InterruptedException?{
          ????????long?time?=?unit.toMillis(waitTime);
          ????????long?current?=?System.currentTimeMillis();
          ????????long?threadId?=?Thread.currentThread().getId();
          ????????//?1.嘗試獲取鎖
          ????????Long?ttl?=?tryAcquire(leaseTime,?unit,?threadId);
          ????????//?lock?acquired
          ????????if?(ttl?==?null)?{
          ????????????return?true;
          ????????}

          ????????//?申請(qǐng)鎖的耗時(shí)如果大于等于最大等待時(shí)間,則申請(qǐng)鎖失敗.
          ????????time?-=?System.currentTimeMillis()?-?current;
          ????????if?(time?<=?0)?{
          ????????????acquireFailed(threadId);
          ????????????return?false;
          ????????}

          ????????current?=?System.currentTimeMillis();

          ????????/**
          ?????????* 2.訂閱鎖釋放事件,并通過(guò) await 方法阻塞等待鎖釋放,有效的解決了無(wú)效的鎖申請(qǐng)浪費(fèi)資源的問(wèn)題:
          ?????????*?基于信息量,當(dāng)鎖被其它資源占用時(shí),當(dāng)前線程通過(guò)?Redis?的?channel?訂閱鎖的釋放事件,一旦鎖釋放會(huì)發(fā)消息通知待等待的線程進(jìn)行競(jìng)爭(zhēng).
          ?????????*
          ?????????*?當(dāng)?this.await?返回?false,說(shuō)明等待時(shí)間已經(jīng)超出獲取鎖最大等待時(shí)間,取消訂閱并返回獲取鎖失敗.
          ?????????*?當(dāng)?this.await?返回?true,進(jìn)入循環(huán)嘗試獲取鎖.
          ?????????*/

          ????????RFuture?subscribeFuture?=?subscribe(threadId);
          ????????//?await?方法內(nèi)部是用?CountDownLatch?來(lái)實(shí)現(xiàn)阻塞,獲取?subscribe?異步執(zhí)行的結(jié)果(應(yīng)用了?Netty?的?Future)
          ????????if?(!subscribeFuture.await(time,?TimeUnit.MILLISECONDS))?{
          ????????????if?(!subscribeFuture.cancel(false))?{
          ????????????????subscribeFuture.onComplete((res,?e)?->?{
          ????????????????????if?(e?==?null)?{
          ????????????????????????unsubscribe(subscribeFuture,?threadId);
          ????????????????????}
          ????????????????});
          ????????????}
          ????????????acquireFailed(threadId);
          ????????????return?false;
          ????????}

          ????????try?{
          ????????????//?計(jì)算獲取鎖的總耗時(shí),如果大于等于最大等待時(shí)間,則獲取鎖失敗.
          ????????????time?-=?System.currentTimeMillis()?-?current;
          ????????????if?(time?<=?0)?{
          ????????????????acquireFailed(threadId);
          ????????????????return?false;

          ??????????????}

          ????????????/**
          ?????????????*?3.收到鎖釋放的信號(hào)后,在最大等待時(shí)間之內(nèi),循環(huán)一次接著一次的嘗試獲取鎖
          ?????????????*?獲取鎖成功,則立馬返回?true,
          ?????????????*?若在最大等待時(shí)間之內(nèi)還沒(méi)獲取到鎖,則認(rèn)為獲取鎖失敗,返回?false?結(jié)束循環(huán)
          ?????????????*/

          ????????????while?(true)?{
          ????????????????long?currentTime?=?System.currentTimeMillis();

          ????????????????//?再次嘗試獲取鎖
          ????????????????ttl?=?tryAcquire(leaseTime,?unit,?threadId);
          ????????????????//?lock?acquired
          ????????????????if?(ttl?==?null)?{
          ????????????????????return?true;
          ????????????????}
          ????????????????//?超過(guò)最大等待時(shí)間則返回?false?結(jié)束循環(huán),獲取鎖失敗
          ????????????????time?-=?System.currentTimeMillis()?-?currentTime;
          ????????????????if?(time?<=?0)?{
          ????????????????????acquireFailed(threadId);
          ????????????????????return?false;
          ????????????????}

          ????????????????/**
          ?????????????????* 6.阻塞等待鎖(通過(guò)信號(hào)量(共享鎖)阻塞,等待解鎖消息):
          ?????????????????*/

          ????????????????currentTime?=?System.currentTimeMillis();
          ????????????????if?(ttl?>=?0?&&?ttl?????????????????????//如果剩余時(shí)間(ttl)小于wait time ,就在 ttl 時(shí)間內(nèi),從Entry的信號(hào)量獲取一個(gè)許可(除非被中斷或者一直沒(méi)有可用的許可)。
          ????????????????????getEntry(threadId).getLatch().tryAcquire(ttl,?TimeUnit.MILLISECONDS);
          ????????????????}?else?{
          ????????????????????//則就在wait?time?時(shí)間范圍內(nèi)等待可以通過(guò)信號(hào)量
          ????????????????????getEntry(threadId).getLatch().tryAcquire(time,?TimeUnit.MILLISECONDS);
          ????????????????}

          ????????????????//?更新剩余的等待時(shí)間(最大等待時(shí)間-已經(jīng)消耗的阻塞時(shí)間)
          ????????????????time?-=?System.currentTimeMillis()?-?currentTime;
          ????????????????if?(time?<=?0)?{
          ????????????????????acquireFailed(threadId);
          ????????????????????return?false;
          ????????????????}
          ????????????}
          ????????}?finally?{
          ????????????//?7.無(wú)論是否獲得鎖,都要取消訂閱解鎖消息
          ????????????unsubscribe(subscribeFuture,?threadId);
          ????????}
          ????????return?get(tryLockAsync(waitTime,?leaseTime,?unit));
          ????}
          • 嘗試獲取鎖,返回 null 則說(shuō)明加鎖成功,返回一個(gè)數(shù)值,則說(shuō)明已經(jīng)存在該鎖,ttl 為鎖的剩余存活時(shí)間。

          • 如果此時(shí)客戶端 2 進(jìn)程獲取鎖失敗,那么使用客戶端 2 的線程 id(其實(shí)本質(zhì)上就是進(jìn)程 id)通過(guò) Redis 的 channel 訂閱鎖釋放的事件。如果等待的過(guò)程中一直未等到鎖的釋放事件通知,當(dāng)超過(guò)最大等待時(shí)間則獲取鎖失敗,返回 false,也就是第 39 行代碼。如果等到了鎖的釋放事件的通知,則開(kāi)始進(jìn)入一個(gè)不斷重試獲取鎖的循環(huán)。

          • 循環(huán)中每次都先試著獲取鎖,并得到已存在的鎖的剩余存活時(shí)間。如果在重試中拿到了鎖,則直接返回。如果鎖當(dāng)前還是被占用的,那么等待釋放鎖的消息,具體實(shí)現(xiàn)使用了信號(hào)量 Semaphore 來(lái)阻塞線程,當(dāng)鎖釋放并發(fā)布釋放鎖的消息后,信號(hào)量的 release() 方法會(huì)被調(diào)用,此時(shí)被信號(hào)量阻塞的等待隊(duì)列中的一個(gè)線程就可以繼續(xù)嘗試獲取鎖了。

          • 當(dāng)鎖正在被占用時(shí),等待獲取鎖的進(jìn)程并不是通過(guò)一個(gè) while(true) 死循環(huán)去獲取鎖,而是利用了 Redis 的發(fā)布訂閱機(jī)制,通過(guò) await 方法阻塞等待鎖的進(jìn)程,有效的解決了無(wú)效的鎖申請(qǐng)浪費(fèi)資源的問(wèn)題。

          看門(mén)狗如何自動(dòng)續(xù)期

          Redisson看門(mén)狗機(jī)制, 只要客戶端加鎖成功,就會(huì)啟動(dòng)一個(gè) Watch Dog。

          private??RFuture?tryAcquireAsync(long?leaseTime,?TimeUnit?unit,?long?threadId)?{
          ????if?(leaseTime?!=?-1)?{
          ????????return?tryLockInnerAsync(leaseTime,?unit,?threadId,?RedisCommands.EVAL_LONG);
          ????}
          ????RFuture?ttlRemainingFuture?=?tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),?TimeUnit.MILLISECONDS,?threadId,?RedisCommands.EVAL_LONG);
          ????ttlRemainingFuture.onComplete((ttlRemaining,?e)?->?{
          ????????if?(e?!=?null)?{
          ????????????return;
          ????????}

          ????????//?lock?acquired
          ????????if?(ttlRemaining?==?null)?{
          ????????????scheduleExpirationRenewal(threadId);
          ????????}
          ????});
          ????return?ttlRemainingFuture;
          }
          • leaseTime 必須是 -1 才會(huì)開(kāi)啟 Watch Dog 機(jī)制,如果需要開(kāi)啟 Watch Dog 機(jī)制就必須使用默認(rèn)的加鎖時(shí)間為 30s。
          • 如果你自己自定義時(shí)間,超過(guò)這個(gè)時(shí)間,鎖就會(huì)自定釋放,并不會(huì)自動(dòng)續(xù)期。

          續(xù)期原理

          續(xù)期原理其實(shí)就是用lua腳本,將鎖的時(shí)間重置為30s

          private?void?scheduleExpirationRenewal(long?threadId)?{
          ????ExpirationEntry?entry?=?new?ExpirationEntry();
          ????ExpirationEntry?oldEntry?=?EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(),?entry);
          ????if?(oldEntry?!=?null)?{
          ????????oldEntry.addThreadId(threadId);
          ????}?else?{
          ????????entry.addThreadId(threadId);
          ????????renewExpiration();
          ????}
          }

          protected?RFuture?renewExpirationAsync(long?threadId)?{
          ????return?commandExecutor.evalWriteAsync(getName(),?LongCodec.INSTANCE,?RedisCommands.EVAL_BOOLEAN,
          ????????????"if?(redis.call('hexists',?KEYS[1],?ARGV[2])?==?1)?then?"?+
          ????????????????"redis.call('pexpire',?KEYS[1],?ARGV[1]);?"?+
          ????????????????"return?1;?"?+
          ????????????"end;?"?+
          ????????????"return?0;",
          ????????Collections.singletonList(getName()),
          ????????internalLockLeaseTime,?getLockName(threadId));
          }

          Watch Dog 機(jī)制其實(shí)就是一個(gè)后臺(tái)定時(shí)任務(wù)線程,獲取鎖成功之后,會(huì)將持有鎖的線程放入到一個(gè)?RedissonLock.EXPIRATION_RENEWAL_MAP里面,然后每隔 10 秒 (internalLockLeaseTime / 3) 檢查一下,如果客戶端 還持有鎖 key(判斷客戶端是否還持有 key,其實(shí)就是遍歷?EXPIRATION_RENEWAL_MAP?里面線程 id 然后根據(jù)線程 id 去 Redis 中查,如果存在就會(huì)延長(zhǎng) key 的時(shí)間),那么就會(huì)不斷的延長(zhǎng)鎖 key 的生存時(shí)間。

          如果服務(wù)宕機(jī)了,Watch Dog 機(jī)制線程也就沒(méi)有了,此時(shí)就不會(huì)延長(zhǎng) key 的過(guò)期時(shí)間,到了 30s 之后就會(huì)自動(dòng)過(guò)期了,其他線程就可以獲取到鎖。

          干貨分享

          最近將個(gè)人學(xué)習(xí)筆記整理成冊(cè),使用PDF分享。關(guān)注我,回復(fù)如下代碼,即可獲得百度盤(pán)地址,無(wú)套路領(lǐng)取!

          ?001:《Java并發(fā)與高并發(fā)解決方案》學(xué)習(xí)筆記;?002:《深入JVM內(nèi)核——原理、診斷與優(yōu)化》學(xué)習(xí)筆記;?003:《Java面試寶典》?004:《Docker開(kāi)源書(shū)》?005:《Kubernetes開(kāi)源書(shū)》?006:《DDD速成(領(lǐng)域驅(qū)動(dòng)設(shè)計(jì)速成)》?007:全部?008:加技術(shù)群討論

          加個(gè)關(guān)注不迷路

          喜歡就點(diǎn)個(gè)"在看"唄^_^

          瀏覽 37
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                    <th id="afajh"><progress id="afajh"></progress></th>
                    久久诱惑 | 久久亚洲大家都在搜 | 亚洲电影网站 | 18禁一区 | 嫩草综合网 |