<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          從零到一編碼實現(xiàn)Redis分布式鎖

          共 7732字,需瀏覽 16分鐘

           ·

          2021-12-18 11:47

          : 不是有redission等現(xiàn)成工具嗎?咋不用?
          : 不,我就想自己寫一個!

          陳建斌說 : 你這個男的怎么回事 ?!

          有的同學(xué),就是這么尿性。也能理解,不自己弄一下,怎么能理解透徹,那就一起來搞一下唄!

          使用場景和選型

          分布式多節(jié)點的部署方式,使得共享變量有可能被同時操作,遇到有數(shù)據(jù)一致性要求的情況,就需要采取全局鎖定的措施來保障并發(fā)操作下的一致性要求,如,庫存扣減操作、同一個商品的上下架和更新操作等等。

          常見的,分布式鎖采用ZookeeperRedis來實現(xiàn)。怎么取舍呢?


          zookeeper

          redis

          加鎖原理

          創(chuàng)建節(jié)點,節(jié)點已存在時創(chuàng)建失敗

          插入數(shù)據(jù),數(shù)據(jù)已存在則設(shè)置失敗

          過期保護

          節(jié)點類型為臨時節(jié)點,斷連刪除

          設(shè)置過期時間,到期刪除

          優(yōu)點

          在加鎖失敗時,zk的注冊通知更優(yōu)雅

          速度快,性能高

          缺點

          只有l(wèi)eader負(fù)責(zé)寫,然后通知flower,性能較差

          搶鎖失敗時,需要自旋循環(huán)嘗試

          生產(chǎn)環(huán)境下,性能往往被優(yōu)先考慮,相比較各自的優(yōu)缺點,綜合考慮,我們一般更傾向于redis。

          從0到1 實現(xiàn)分布式鎖

          step1: 加鎖 和 解鎖的基礎(chǔ)能力構(gòu)建

          Jedis.set(key, value, params)?????
          這個2.6之后新增的加強版set命令是真不錯,解決了加鎖時設(shè)置鎖超時時間的原子訴求,防止服務(wù)宕機導(dǎo)致的死鎖~

          (1) 一個具有加鎖解鎖功能的分布式鎖對象,最少要有?jedis客戶端?、?對應(yīng)的redis key?、?鎖超時時間?:

          //構(gòu)建分布式鎖對象
          public?class?DistributedLock?{
          ????private?Jedis?jedis;
          ????private?String?lockName;
          ????private?long?lockExpireSecond;

          ????public?DistributedLock(Jedis?jedis,?String?lockName,?long?lockExpireSecond)?{
          ????????this.jedis?=?jedis;
          ????????this.lockName?=?lockName;
          ????????this.lockExpireSecond?=?lockExpireSecond;
          ????}
          }

          (2) 利用jedis提供的SetParams?,對NX?,?PX?在jedis.set操作中一次性的原子的完成設(shè)置:

          public?void?lock()?throws?BizException?{
          ????String?lockResult?=?null;
          ????try?{
          ???????//設(shè)置?NX?PX?參數(shù)
          ???????SetParams?params?=?new?SetParams();
          ???????params.nx();
          ???????params.px(TimeUnit.SECONDS.toMillis(lockExpireSecond));
          ???????//執(zhí)行加鎖?,?value?暫定?為固定字符串
          ???????lockResult?=?this.jedis.set(this.lockName,?"lockValue",?params);

          ???}?catch?(Exception?e)?{
          ???????LOG.error("lock?error",e);
          ???}

          ???if?("OK".equals(lockResult))?{
          ???????LOG.debug("locked?success,lockName:{}",lockName);
          ???}?else?{
          ??????throw?new?BizException("Get?lock?failed.");
          ???}
          }

          (3) 用jedis.del命令完成解鎖:

          ?public?boolean?unlock()?{
          ????boolean?unlockResult=false;

          ???try?{
          ???????this.jedis.del(this.lockName);
          ???????unlockResult=true;
          ???}catch?(Exception?e){
          ??????LOG.error("unLock?error",e);
          ???}
          ????return?unlockResult;
          }

          step2: 加鎖失敗直接結(jié)束? 希望多試幾次

          從上面的構(gòu)造函數(shù)和lock()實現(xiàn),發(fā)現(xiàn)當(dāng)前實現(xiàn)屬于一錘子買賣,不成功便成仁。這其實不太滿足我們的生產(chǎn)需求,很多場景下,業(yè)務(wù)執(zhí)行速度是很快的,只要稍微等一等,就可以。那怎么辦?

          自定義重試次數(shù)和等待間隔,有限重試等待

          //新增重試間隔屬性
          private?long?retryIntervalTime;?

          //通過構(gòu)造方法初始化重試間隔
          public?DistributedLock(Jedis?jedis,?String?lockName,?long?lockExpireSecond,?long?retryIntervalTime)?{
          ???...略
          ???this.retryIntervalTime?=?retryIntervalTime;
          }

          //新增入?yún)?,加鎖超時時間
          public?void?lock(long?timeout,TimeUnit?unit)?throws?TimeoutException?{
          ???String?lockResult?=?null;
          ???try?{
          ???????//設(shè)置?NX?PX?參數(shù)
          ???????SetParams?params?=?new?SetParams();
          ???????params.nx();
          ???????params.px(TimeUnit.SECONDS.toMillis(lockExpireSecond));
          ????????????
          ??????//加鎖開始時間
          ??????long?startTime=System.nanoTime();
          ????????????
          ??????//循環(huán)有限等待
          ??????while?(!"OK".equals(lockResult=this.jedis.set(this.lockName,?"lockValue",?params))&&!isTimeout(startTime,unit.toNanos(timeout))){
          ???????????Thread.sleep(retryIntervalTime);
          ?????}

          ??}?catch?(Exception?e)?{
          ??????LOG.error("lock?error",e);
          ??}
          ????????
          ??//修改拋出異常類型為超時異常
          ??if?("OK".equals(lockResult))?{
          ???????LOG.debug("locked?success,lockName:{}",lockName);
          ??}?else?{
          ??????throw?new?TimeoutException("Get?lock?failed?because?of?timeout.");
          ??}
          }

          step3: 只能解自己加的鎖,別人的鎖不能亂動

          考慮一個問題:我們?yōu)榱朔乐辜渔i后機器宕機的情況,給鎖設(shè)置了過期時間,以此來保障鎖可以在服務(wù)節(jié)點宕機不能解鎖時,也可以給后續(xù)業(yè)務(wù)提供鎖操作。

          參考《How to do distributed locking》

          上圖中,因為業(yè)務(wù)執(zhí)行時間的不可控(或者遇到GC等不可預(yù)期的停頓),給分布式鎖帶來了使用問題。

          我們先看問題一:用戶線程1?把 線程2的鎖釋放了!怎么辦呢?

          加鎖保存線程標(biāo)識,解鎖校驗,非自己的鎖不釋放

          //其他屬性略,新增lockOwner標(biāo)識
          private?String?lockOwner;

          //通過構(gòu)造函數(shù)初始化lockOwner標(biāo)識?
          public?DistributedLock(Jedis?jedis,?String?lockName,?String?lockOwner,?long?lockExpireSecond,?long?retryIntervalTime)?{
          ????...略
          ????this.lockOwner?=?lockOwner;
          }

          public?void?lock(long?timeout,TimeUnit?unit)?throws?TimeoutException?{
          ???String?lockResult?=?null;
          ???try?{
          ??????//設(shè)置?NX?PX?參數(shù)
          ??????SetParams?params?=?new?SetParams();
          ??????params.nx();
          ??????params.px(TimeUnit.SECONDS.toMillis(lockExpireSecond));
          ????????????
          ??????//加鎖開始時間
          ??????long?startTime=System.nanoTime();
          ????????????
          ??????//?set時的value?改為?lockOwner
          ?????while?(!"OK".equals(lockResult=this.jedis.set(this.lockName,?this.lockOwner,?params))&&!isTimeout(startTime,unit.toNanos(timeout))){
          ?????????Thread.sleep(retryIntervalTime);
          ??????}
          ???}?catch?(Exception?e)?{
          ???????LOG.error("lock?error",e);
          ???}
          ???...略
          }
          ????
          public?boolean?unlock()?{
          ????boolean?unlockResult=false;
          ????try?{
          ???????//?先getValue?,并和當(dāng)前l(fā)ockOwner匹配,匹配上才去解鎖
          ????????if?(this.lockOwner.equals(this.jedis.get(this.lockName)))?{
          ???????????this.jedis.del(this.lockName);
          ???????????unlockResult?=?true;
          ???????}
          ???}catch?(Exception?e){
          ????????LOG.error("unLock?error",e);
          ??}
          ????return?unlockResult;
          }

          有的同學(xué)說,這個解鎖的地方,需要用lua包成原子操作。單從功能上來講,上面的實現(xiàn)也是OK的,因為只有g(shù)et到的結(jié)果和本身匹配,才會進行下述操作。包成lua腳本的目的,應(yīng)該主要是為了減少一次傳輸,提高執(zhí)行效率。

          step4: expire時間不夠產(chǎn)生并發(fā)沖突

          也就是之前的圖中的問題二:線程1 還在執(zhí)行中,鎖就過期釋放了,導(dǎo)致線程2也加鎖成功,這直接導(dǎo)致了線程間的業(yè)務(wù)沖突。怎么辦呢?

          鎖持有期內(nèi),根據(jù)需要,動態(tài)延長鎖的過期時間

          觸發(fā)鎖延期的方案選型,也是個大事,jdk原生timer、調(diào)度線程池、nettyTimer都可以實現(xiàn),選哪個好?

          綜合對比精度、資源消耗等方面,Netty中采用時間輪算法的Timer應(yīng)該是首選,都能管理成千上萬的連接、調(diào)度心跳檢測,拿來搞個鎖延期還不是手拿把掐?


          ?首先,需要構(gòu)建一個全局的Timer來存儲和調(diào)度任務(wù)?其次,在加鎖成功之后添加定時觸發(fā)任務(wù)?再次,延期操作時,需要校驗當(dāng)前線程是否還持有鎖?最后,在解鎖時,取消定時任務(wù)?注意點,任務(wù)需要循環(huán)注冊 , 考慮線程被中斷的情況


          構(gòu)建分布式鎖上下文,用于存儲全局時間輪調(diào)度器:

          public?class?LockContext?{

          ????private?HashedWheelTimer?timer;

          ????private?LockContext(){
          ????????//時間輪參數(shù)可以從業(yè)務(wù)自己的配置獲取
          ????????//?long?tickDuration=(Long)?config.get("tickDuration");
          ????????//?int?tickPerWheel=(int)?config.get("tickPerWheel");?//默認(rèn)1024
          ????????//?boolean?leakDetection=(Boolean)config.get("leakDetection");
          ????????timer?=?new?HashedWheelTimer(new?DefaultThreadFactory("distributedLock-timer",true),?10,?TimeUnit.MILLISECONDS,?1024,?false);
          ????}

          通過構(gòu)造函數(shù),將上下文及調(diào)度器傳入分布式鎖對象:

          public?class?DistributedLock?{
          ????//上下文
          ????private?LockContext?context;
          ????//當(dāng)前持有的?Timer?調(diào)度對象
          ????private?volatile?Timeout??lockTimeout;

          ????public?DistributedLock(Jedis?jedis,?String?lockName,?String?lockOwner,?long?lockExpireSecond,?long?retryIntervalTime,?LockContext?context)?{
          ?????????...其他屬性略
          ????????this.context?=?context;
          ????}

          加鎖成功之后,執(zhí)行調(diào)度器注冊操作:

          public?void?lock(long?timeout,?TimeUnit?unit)?throws?TimeoutException?{
          ????//...加鎖?略
          ????
          ???if?("OK".equals(lockResult))?{
          ???????LOGGER.info("locked?success,lockName:{}",lockName);
          ???????try?{
          ???????????//注冊循環(huán)延期事件
          ???????????registerLoopReExpire();
          ???????}finally?{
          ???????????if?(Thread.currentThread().isInterrupted()&&this.lockTimeout!=null){
          ???????????????LOGGER.warn("線程中斷,定時任務(wù)取消");
          ???????????????this.lockTimeout.cancel();
          ???????????}
          ???????}
          ???}?else?{
          ???????throw?new?TimeoutException("Get?lock?failed?because?of?timeout.");
          ????}
          }

          方法registerLoopReExpire()中是實際的任務(wù)注冊和延期操作:

          private?void?registerLoopReExpire()?{
          ????LOGGER.info("分布式鎖延期任務(wù)注冊");

          ????//每次注冊,都把timeout賦給當(dāng)前鎖對象,用于后續(xù)解鎖時取消
          ????this.lockTimeout?=?context.getTimer().newTimeout(new?TimerTask()?{
          ????????@Override
          ????????public?void?run(Timeout?timeout)?throws?Exception?{
          ????????
          ????????????//校驗是否還在持有鎖,并延長過期時間
          ????????????boolean?isReExpired=reExpireLock(lockName,lockOwner);
          ????????
          ????????????if?(isReExpired)?{
          ????????????????//自己調(diào)自己,循環(huán)注冊
          ????????????????registerLoopReExpire();
          ????????????}else?{
          ????????????????lockTimeout.cancel();
          ????????????}
          ????????}
          ????},?TimeUnit.SECONDS.toMillis(?lockExpireSecond)/2,?TimeUnit.MILLISECONDS);

          ????LOGGER.info("分布式鎖延期任務(wù)注冊完成");
          }

          這里有幾個點需要重點關(guān)注:
          ?newTimeout()操作會返回一個Timeout實體,我們需要依賴該實體來對當(dāng)前任務(wù)進行管理,所以需要賦值給鎖內(nèi)部對象。
          ?
          鎖延期,需要根據(jù)lockOwner?和?lockName來判斷,持有鎖才加鎖,需要使用lua方式來保證判斷和執(zhí)行的原子性。
          ?執(zhí)行完延期操作之后,需要根據(jù)結(jié)果進行后續(xù)處理,成功則繼續(xù)注冊,失敗則取消當(dāng)前任務(wù)。?定時任務(wù)的執(zhí)行時間,應(yīng)該要小于鎖的過期時間,取過期時間的1/21/3或自定義傳入。
          來驗證一下,我們設(shè)置 鎖的過期時間為3秒,業(yè)務(wù)執(zhí)行時間為10秒 ,執(zhí)行:
          可以看到,定時任務(wù)一共延期了6次,最后一次注冊成功了,但是業(yè)務(wù)執(zhí)行完隨著解鎖任務(wù)取消了。

          總結(jié)和回顧

          本文,我們從0到1的對分布式鎖進行了編碼實現(xiàn)。從基本能力,最后到生產(chǎn)環(huán)境的各種訴求基本都進行了填充和完善。
          值得一提的是,除了延期功能,上述大部分能力都是經(jīng)歷過生產(chǎn)環(huán)境考驗的。大家如果發(fā)現(xiàn)了延期功能的實現(xiàn)有什么問題,歡迎留言糾正,一起討論進步。
          當(dāng)然,上述內(nèi)容還有缺失,比如jedis?操作lua腳本的延期實現(xiàn),可重入鎖的改造,由于篇幅原因就不都貼出來了,有興趣的同學(xué)也可以按上述思路繼續(xù)完善。
          另外,我們的上述實現(xiàn)都是基于主從架構(gòu),因此,分布式鎖有可能會在主從切換或者其他宕機場景出現(xiàn)異常,但是個人認(rèn)為,用犧牲效率來保障穩(wěn)定的redLock,在大部分場景下其實沒有什么必要。關(guān)于這部分,其實有幾個號主講述的非常到位,可以搜來看看。
          最后,當(dāng)我們對照redisson的分布式鎖實現(xiàn),來回看我們自己的實現(xiàn),其實就會發(fā)現(xiàn),在主邏輯上的實現(xiàn),其實是大同小異的,只是redission的在可重入、效率兼顧(netty架構(gòu)的運用)等方面要更加完備。
          紙上得來終覺淺~ 共勉~

          有道無術(shù),術(shù)可成;有術(shù)無道,止于術(shù)

          歡迎大家關(guān)注Java之道公眾號


          好文章,我在看??

          瀏覽 75
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  大鸡吧在线 | 日韩无码传媒 | 欧美爱爱视频福利 | w超清无码在线观看 | 亚洲自拍手机在线 |