<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          redission 分布式鎖

          共 6573字,需瀏覽 14分鐘

           ·

          2020-09-30 12:13

          點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號(hào)”

          優(yōu)質(zhì)文章,第一時(shí)間送達(dá)

          ? 作者?|??穆穆兔兔

          來(lái)源 |? urlify.cn/RrAR3u

          66套java從入門到精通實(shí)戰(zhàn)課程分享

          概述

          分布式系統(tǒng)有一個(gè)著名的理論CAP,指在一個(gè)分布式系統(tǒng)中,最多只能同時(shí)滿足一致性(Consistency)、可用性(Availability)和分區(qū)容錯(cuò)性(Partition tolerance)這三項(xiàng)中的兩項(xiàng)。所以在設(shè)計(jì)系統(tǒng)時(shí),往往需要權(quán)衡,在CAP中作選擇。當(dāng)然,這個(gè)理論也并不一定完美,不同系統(tǒng)對(duì)CAP的要求級(jí)別不一樣,選擇需要考慮方方面面。

          在微服務(wù)系統(tǒng)中,一個(gè)請(qǐng)求存在多級(jí)跨服務(wù)調(diào)用,往往需要犧牲強(qiáng)一致性老保證系統(tǒng)高可用,比如通過(guò)分布式事務(wù),異步消息等手段完成。但還是有的場(chǎng)景,需要阻塞所有節(jié)點(diǎn)的所有線程,對(duì)共享資源的訪問(wèn)。比如并發(fā)時(shí)“超賣”和“余額減為負(fù)數(shù)”等情況。

          本地鎖可以通過(guò)語(yǔ)言本身支持,要實(shí)現(xiàn)分布式鎖,就必須依賴中間件,數(shù)據(jù)庫(kù)、redis、zookeeper等。

          分布式鎖特性

          不管使用什么中間件,有幾點(diǎn)是實(shí)現(xiàn)分布式鎖必須要考慮到的。

          1. 互斥:互斥好像是必須的,否則怎么叫鎖。

          2. 死鎖: 如果一個(gè)線程獲得鎖,然后掛了,并沒(méi)有釋放鎖,致使其他節(jié)點(diǎn)(線程)永遠(yuǎn)無(wú)法獲取鎖,這就是死鎖。分布式鎖必須做到避免死鎖。

          3. 性能: 高并發(fā)分布式系統(tǒng)中,線程互斥等待會(huì)成為性能瓶頸,需要好的中間件和實(shí)現(xiàn)來(lái)保證性能。

          4. 鎖特性:考慮到復(fù)雜的場(chǎng)景,分布式鎖不能只是加鎖,然后一直等待。最好實(shí)現(xiàn)如Java Lock的一些功能如:鎖判斷,超時(shí)設(shè)置,可重入性等。

          Redis實(shí)現(xiàn)之Redisson原理

          redission實(shí)現(xiàn)了JDK中的Lock接口,所以使用方式一樣,只是Redssion的鎖是分布式的。如下:

          RLock?lock?=?redisson.getLock("className");
          lock.lock();
          try?{
          //?do?sth.
          }?finally?{
          lock.unlock();
          }

          好,Lock主要實(shí)現(xiàn)是RedissionLock。

          先來(lái)看常用的Lock方法實(shí)現(xiàn)。

          @Override
          public?void?lock()?{
          try?{
          lockInterruptibly();
          }?catch?(InterruptedException?e)?{
          Thread.currentThread().interrupt();
          }
          }
          @Override
          public?void?lockInterruptibly()?throws?InterruptedException?{
          lockInterruptibly(-1,?null);
          }

          再看lockInterruptibly方法:

          @Override
          public?void?lockInterruptibly(long?leaseTime,?TimeUnit?unit)?throws?InterruptedException?{
          long?threadId?=?Thread.currentThread().getId();
          //?獲取鎖
          Long?ttl?=?tryAcquire(leaseTime,?unit,?threadId);
          if?(ttl?==?null)?{?//?獲取成功
          return;
          }
          ?
          //?異步訂閱redis?chennel
          RFuture?future?=?subscribe(threadId);
          commandExecutor.syncSubscription(future);?//?阻塞獲取訂閱結(jié)果
          ?
          try?{
          while?(true)?{//?循環(huán)判斷知道獲取鎖
          ttl?=?tryAcquire(leaseTime,?unit,?threadId);
          //?lock?acquired
          if?(ttl?==?null)?{
          break;
          }
          ?
          //?waiting?for?message
          if?(ttl?>=?0)?{
          getEntry(threadId).getLatch().tryAcquire(ttl,?TimeUnit.MILLISECONDS);
          }?else?{
          getEntry(threadId).getLatch().acquire();
          }
          }
          }?finally?{
          unsubscribe(future,?threadId);//?取消訂閱
          }
          }

          總結(jié)lockInterruptibly:獲取鎖,不成功則訂閱釋放鎖的消息,獲得消息前阻塞。得到釋放通知后再去循環(huán)獲取鎖。

          下面重點(diǎn)看看如何獲取鎖:Long ttl = tryAcquire(leaseTime, unit, threadId)

          private?Long?tryAcquire(long?leaseTime,?TimeUnit?unit,?long?threadId)?{
          return?get(tryAcquireAsync(leaseTime,?unit,?threadId));//?通過(guò)異步獲取鎖,但get(future)實(shí)現(xiàn)同步
          }
          private??RFuture?tryAcquireAsync(long?leaseTime,?TimeUnit?unit,?final?long?threadId)?{
          if?(leaseTime?!=?-1)?{?//1?如果設(shè)置了超時(shí)時(shí)間,直接調(diào)用?tryLockInnerAsync
          return?tryLockInnerAsync(leaseTime,?unit,?threadId,?RedisCommands.EVAL_LONG);
          }
          //2?如果leaseTime==-1,則默認(rèn)超時(shí)時(shí)間為30s
          RFuture?ttlRemainingFuture?=?tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS,?TimeUnit.SECONDS,?threadId,?RedisCommands.EVAL_LONG);
          //3?監(jiān)聽Future,獲取Future返回值ttlRemaining(剩余超時(shí)時(shí)間),獲取鎖成功,但是ttlRemaining,則刷新過(guò)期時(shí)間
          ttlRemainingFuture.addListener(new?FutureListener()?{
          @Override
          public?void?operationComplete(Future?future)?throws?Exception?{
          if?(!future.isSuccess())?{
          return;
          }
          ?
          Long?ttlRemaining?=?future.getNow();
          //?lock?acquired
          if?(ttlRemaining?==?null)?{
          scheduleExpirationRenewal(threadId);
          }
          }
          });
          return?ttlRemainingFuture;
          }

          已經(jīng)在注釋中解釋了,需要注意的是,此處用到了Netty的Future-listen模型,可以看看我的另一篇對(duì)Future的簡(jiǎn)單講解:給Future一個(gè)Promise。

          下面就是最重要的redis獲取鎖的方法tryLockInnerAsync:

          ?RFuture?tryLockInnerAsync(long?leaseTime,?TimeUnit?unit,?long?threadId,?RedisStrictCommand?command)?{
          internalLockLeaseTime?=?unit.toMillis(leaseTime);
          return?commandExecutor.evalWriteAsync(
          getName(),
          LongCodec.INSTANCE,
          command,
          "if?(redis.call('exists',?KEYS[1])?==?0)?then?"?+
          "redis.call('hset',?KEYS[1],?ARGV[2],?1);?"?+
          "redis.call('pexpire',?KEYS[1],?ARGV[1]);?"?+
          "return?nil;?"?+
          "end;?"?+
          "if?(redis.call('hexists',?KEYS[1],?ARGV[2])?==?1)?then?"?+
          "redis.call('hincrby',?KEYS[1],?ARGV[2],?1);?"?+
          "redis.call('pexpire',?KEYS[1],?ARGV[1]);?"?+
          "return?nil;?"?+
          "end;?"?+
          "return?redis.call('pttl',?KEYS[1]);",
          Collections.singletonList(getName()),?internalLockLeaseTime,?getLockName(threadId));
          }

          這個(gè)方法主要就是調(diào)用redis執(zhí)行eval lua,為什么使用eval,因?yàn)閞edis對(duì)lua腳本執(zhí)行具有原子性。把這個(gè)方法翻譯一下:

          --?1.?沒(méi)被鎖{key不存在}
          eval?"return?redis.call('exists',?KEYS[1])"?1?myLock
          --?(1)?設(shè)置Lock為key,uuid:threadId為filed,?filed值為1
          eval?"return?redis.call('hset',?KEYS[1],?ARGV[2],?1)"?1?myLock?3000?3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
          --?(2)?設(shè)置key過(guò)期時(shí)間{防止獲取鎖后線程掛掉導(dǎo)致死鎖}
          eval?"return?redis.call('pexpire',?KEYS[1],?ARGV[1])"?1?myLock?3000?3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
          ?
          --?2.?已經(jīng)被同線程獲得鎖{key存在并且field存在}
          eval?"return?redis.call('hexists',?KEYS[1],?ARGV[2])"?1?myLock?3000?3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
          --?(1)?可重入,但filed字段+1
          eval?"return?redis.call('hincrby',?KEYS[1],?ARGV[2],1)"?1?myLock?3000?3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
          --?(2)?刷新過(guò)去時(shí)間
          eval?"return?redis.call('pexpire',?KEYS[1],?ARGV[1])"?1?myLock?3000?3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
          ?
          -- 3. 已經(jīng)被其他線程鎖住{key存在,但是field不存在}:以毫秒為單位返回 key 的剩余超時(shí)時(shí)間
          eval?"return?redis.call('pttl',?KEYS[1])"?1?myLock

          這就是核心獲取鎖的方式,下面直接釋放鎖方法unlockInnerAsync

          --?1.?key不存在
          eval?"return?redis.call('exists',?KEYS[1])"?2?myLock?redisson_lock__channel_lock?0?3000?3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
          --?(1)?發(fā)送釋放鎖的消息,返回1,釋放成功
          eval?"return?redis.call('publish',?KEYS[2],?ARGV[1])"?2?myLock?redisson_lock__channel_lock?0?3000?3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
          ?
          --?2.?key存在,但field不存在,說(shuō)明自己不是鎖持有者,無(wú)權(quán)釋放,直接return?nil
          eval?"return?redis.call('hexists',?KEYS[1],?ARGV[3])"?2?myLock?redisson_lock__channel_lock?0?3000?3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
          eval?"return?nil"
          ?
          --?3.?filed存在,說(shuō)明是本線程在鎖,但有可能其他地方重入鎖,不能直接釋放,應(yīng)該-1
          eval?"return?redis.call('hincrby',?KEYS[1],?ARGV[3],-1)"?2?myLock?redisson_lock__channel_lock?0?3000?3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
          ?
          -- 4. 如果減1后大于0,說(shuō)明還有其他重入鎖,刷新過(guò)期時(shí)間,返回0。
          eval?"return?redis.call('pexpire',?KEYS[1],?ARGV[2])"?2?myLock?redisson_lock__channel_lock?0?3000?3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
          ?
          --?5.?如果不大于0,說(shuō)明最后一把鎖,需要釋放
          --?刪除key
          eval?"return?redis.call('del',?KEYS[1])"?2?myLock?redisson_lock__channel_lock?0?3000?3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
          --?發(fā)釋放消息
          eval?"return?redis.call('publish',?KEYS[2],?ARGV[1])"?2?myLock?redisson_lock__channel_lock?0?3000?3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
          --?返回1,釋放成功

          從釋放鎖代碼中看到,刪除key后會(huì)發(fā)送消息,所以上文提到獲取鎖失敗后,阻塞訂閱此消息。

          另外,上文提到刷新過(guò)期時(shí)間方法scheduleExpirationRenewal,指線程獲取鎖后需要不斷刷新失效時(shí)間,避免未執(zhí)行完鎖就失效。這個(gè)方法的實(shí)現(xiàn)原理也類似,只是使用了Netty的TimerTask,每到過(guò)期時(shí)間1/3就去重新刷一次,如果key不存在則停止刷新。Timer實(shí)現(xiàn)大概如下:

          private?static?void?nettyTimer()?{
          final?int?expireTime?=?6;
          EventExecutorGroup?group?=?new?DefaultEventExecutorGroup(1);
          final?Timer?timer?=?new?HashedWheelTimer();
          timer.newTimeout(timerTask?->?{
          Future?future?=?group.submit(()?->?{
          System.out.println("刷新key的失效時(shí)間為"+expireTime?+"秒");
          return?false;//?但key不存在時(shí),返回true
          });
          future.addListener(future1?->?{
          if?(!future.getNow())?{
          nettyTimer();
          }
          });
          },?expireTime/3,?TimeUnit.SECONDS);
          }



          參考列表:

          • 一分鐘實(shí)現(xiàn)分布式鎖



          粉絲福利:Java從入門到入土學(xué)習(xí)路線圖

          ???

          ?長(zhǎng)按上方微信二維碼?2 秒


          感謝點(diǎn)贊支持下哈?

          瀏覽 52
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                    <th id="afajh"><progress id="afajh"></progress></th>
                    天天色天天| 久久精品国产亚洲7777 | www蜜桃久久 | 嫩逼网站 | 91亚洲国产精品 |