從零到一編碼實現(xiàn)Redis分布式鎖
問: 不是有redission等現(xiàn)成工具嗎?咋不用?
答: 不,我就想自己寫一個!
陳建斌說 : 你這個男的怎么回事 ?!
有的同學(xué),就是這么尿性。也能理解,不自己弄一下,怎么能理解透徹,那就一起來搞一下唄!
使用場景和選型
分布式多節(jié)點的部署方式,使得共享變量有可能被同時操作,遇到有數(shù)據(jù)一致性要求的情況,就需要采取全局鎖定的措施來保障并發(fā)操作下的一致性要求,如,庫存扣減操作、同一個商品的上下架和更新操作等等。
常見的,分布式鎖采用Zookeeper和Redis來實現(xiàn)。怎么取舍呢?
zookeeper | redis | |
加鎖原理 | 創(chuàng)建節(jié)點,節(jié)點已存在時創(chuàng)建失敗 | 插入數(shù)據(jù),數(shù)據(jù)已存在則設(shè)置失敗 |
過期保護 | 節(jié)點類型為臨時節(jié)點,斷連刪除 | 設(shè)置過期時間,到期刪除 |
優(yōu)點 | 在加鎖失敗時,zk的注冊通知更優(yōu)雅 | 速度快,性能高 |
缺點 | 只有l(wèi)eader負(fù)責(zé)寫,然后通知flower,性能較差 | 搶鎖失敗時,需要自旋循環(huán)嘗試 |
生產(chǎn)環(huán)境下,性能往往被優(yōu)先考慮,相比較各自的優(yōu)缺點,綜合考慮,我們一般更傾向于redis。
從0到1 實現(xiàn)分布式鎖
step1: 加鎖 和 解鎖的基礎(chǔ)能力構(gòu)建
Jedis.set(key, value, params)?????
這個2.6之后新增的加強版set命令是真不錯,解決了加鎖時設(shè)置鎖超時時間的原子訴求,防止服務(wù)宕機導(dǎo)致的死鎖~
(1) 一個具有加鎖解鎖功能的分布式鎖對象,最少要有?jedis客戶端?、?對應(yīng)的redis key?、?鎖超時時間?:
//構(gòu)建分布式鎖對象
public?class?DistributedLock?{
????private?Jedis?jedis;
????private?String?lockName;
????private?long?lockExpireSecond;
????public?DistributedLock(Jedis?jedis,?String?lockName,?long?lockExpireSecond)?{
????????this.jedis?=?jedis;
????????this.lockName?=?lockName;
????????this.lockExpireSecond?=?lockExpireSecond;
????}
}
(2) 利用jedis提供的SetParams?,對NX?,?PX?在jedis.set操作中一次性的原子的完成設(shè)置:
public?void?lock()?throws?BizException?{
????String?lockResult?=?null;
????try?{
???????//設(shè)置?NX?PX?參數(shù)
???????SetParams?params?=?new?SetParams();
???????params.nx();
???????params.px(TimeUnit.SECONDS.toMillis(lockExpireSecond));
???????//執(zhí)行加鎖?,?value?暫定?為固定字符串
???????lockResult?=?this.jedis.set(this.lockName,?"lockValue",?params);
???}?catch?(Exception?e)?{
???????LOG.error("lock?error",e);
???}
???if?("OK".equals(lockResult))?{
???????LOG.debug("locked?success,lockName:{}",lockName);
???}?else?{
??????throw?new?BizException("Get?lock?failed.");
???}
}
(3) 用jedis.del命令完成解鎖:
?public?boolean?unlock()?{
????boolean?unlockResult=false;
???try?{
???????this.jedis.del(this.lockName);
???????unlockResult=true;
???}catch?(Exception?e){
??????LOG.error("unLock?error",e);
???}
????return?unlockResult;
}
step2: 加鎖失敗直接結(jié)束? 希望多試幾次
從上面的構(gòu)造函數(shù)和lock()實現(xiàn),發(fā)現(xiàn)當(dāng)前實現(xiàn)屬于一錘子買賣,不成功便成仁。這其實不太滿足我們的生產(chǎn)需求,很多場景下,業(yè)務(wù)執(zhí)行速度是很快的,只要稍微等一等,就可以。那怎么辦?
自定義重試次數(shù)和等待間隔,有限重試等待
//新增重試間隔屬性
private?long?retryIntervalTime;?
//通過構(gòu)造方法初始化重試間隔
public?DistributedLock(Jedis?jedis,?String?lockName,?long?lockExpireSecond,?long?retryIntervalTime)?{
???...略
???this.retryIntervalTime?=?retryIntervalTime;
}
//新增入?yún)?,加鎖超時時間
public?void?lock(long?timeout,TimeUnit?unit)?throws?TimeoutException?{
???String?lockResult?=?null;
???try?{
???????//設(shè)置?NX?PX?參數(shù)
???????SetParams?params?=?new?SetParams();
???????params.nx();
???????params.px(TimeUnit.SECONDS.toMillis(lockExpireSecond));
????????????
??????//加鎖開始時間
??????long?startTime=System.nanoTime();
????????????
??????//循環(huán)有限等待
??????while?(!"OK".equals(lockResult=this.jedis.set(this.lockName,?"lockValue",?params))&&!isTimeout(startTime,unit.toNanos(timeout))){
???????????Thread.sleep(retryIntervalTime);
?????}
??}?catch?(Exception?e)?{
??????LOG.error("lock?error",e);
??}
????????
??//修改拋出異常類型為超時異常
??if?("OK".equals(lockResult))?{
???????LOG.debug("locked?success,lockName:{}",lockName);
??}?else?{
??????throw?new?TimeoutException("Get?lock?failed?because?of?timeout.");
??}
}
step3: 只能解自己加的鎖,別人的鎖不能亂動
考慮一個問題:我們?yōu)榱朔乐辜渔i后機器宕機的情況,給鎖設(shè)置了過期時間,以此來保障鎖可以在服務(wù)節(jié)點宕機不能解鎖時,也可以給后續(xù)業(yè)務(wù)提供鎖操作。

參考《How to do distributed locking》
上圖中,因為業(yè)務(wù)執(zhí)行時間的不可控(或者遇到GC等不可預(yù)期的停頓),給分布式鎖帶來了使用問題。
我們先看問題一:用戶線程1?把 線程2的鎖釋放了!怎么辦呢?
加鎖保存線程標(biāo)識,解鎖校驗,非自己的鎖不釋放
//其他屬性略,新增lockOwner標(biāo)識
private?String?lockOwner;
//通過構(gòu)造函數(shù)初始化lockOwner標(biāo)識?
public?DistributedLock(Jedis?jedis,?String?lockName,?String?lockOwner,?long?lockExpireSecond,?long?retryIntervalTime)?{
????...略
????this.lockOwner?=?lockOwner;
}
public?void?lock(long?timeout,TimeUnit?unit)?throws?TimeoutException?{
???String?lockResult?=?null;
???try?{
??????//設(shè)置?NX?PX?參數(shù)
??????SetParams?params?=?new?SetParams();
??????params.nx();
??????params.px(TimeUnit.SECONDS.toMillis(lockExpireSecond));
????????????
??????//加鎖開始時間
??????long?startTime=System.nanoTime();
????????????
??????//?set時的value?改為?lockOwner
?????while?(!"OK".equals(lockResult=this.jedis.set(this.lockName,?this.lockOwner,?params))&&!isTimeout(startTime,unit.toNanos(timeout))){
?????????Thread.sleep(retryIntervalTime);
??????}
???}?catch?(Exception?e)?{
???????LOG.error("lock?error",e);
???}
???...略
}
????
public?boolean?unlock()?{
????boolean?unlockResult=false;
????try?{
???????//?先getValue?,并和當(dāng)前l(fā)ockOwner匹配,匹配上才去解鎖
????????if?(this.lockOwner.equals(this.jedis.get(this.lockName)))?{
???????????this.jedis.del(this.lockName);
???????????unlockResult?=?true;
???????}
???}catch?(Exception?e){
????????LOG.error("unLock?error",e);
??}
????return?unlockResult;
}
有的同學(xué)說,這個解鎖的地方,需要用lua包成原子操作。單從功能上來講,上面的實現(xiàn)也是OK的,因為只有g(shù)et到的結(jié)果和本身匹配,才會進行下述操作。包成lua腳本的目的,應(yīng)該主要是為了減少一次傳輸,提高執(zhí)行效率。
step4: expire時間不夠產(chǎn)生并發(fā)沖突

也就是之前的圖中的問題二:線程1 還在執(zhí)行中,鎖就過期釋放了,導(dǎo)致線程2也加鎖成功,這直接導(dǎo)致了線程間的業(yè)務(wù)沖突。怎么辦呢?
鎖持有期內(nèi),根據(jù)需要,動態(tài)延長鎖的過期時間
觸發(fā)鎖延期的方案選型,也是個大事,jdk原生timer、調(diào)度線程池、netty的Timer都可以實現(xiàn),選哪個好?
綜合對比精度、資源消耗等方面,Netty中采用時間輪算法的Timer應(yīng)該是首選,都能管理成千上萬的連接、調(diào)度心跳檢測,拿來搞個鎖延期還不是手拿把掐?
public?class?LockContext?{
????private?HashedWheelTimer?timer;
????private?LockContext(){
????????//時間輪參數(shù)可以從業(yè)務(wù)自己的配置獲取
????????//?long?tickDuration=(Long)?config.get("tickDuration");
????????//?int?tickPerWheel=(int)?config.get("tickPerWheel");?//默認(rèn)1024
????????//?boolean?leakDetection=(Boolean)config.get("leakDetection");
????????timer?=?new?HashedWheelTimer(new?DefaultThreadFactory("distributedLock-timer",true),?10,?TimeUnit.MILLISECONDS,?1024,?false);
????}
public?class?DistributedLock?{
????//上下文
????private?LockContext?context;
????//當(dāng)前持有的?Timer?調(diào)度對象
????private?volatile?Timeout??lockTimeout;
????public?DistributedLock(Jedis?jedis,?String?lockName,?String?lockOwner,?long?lockExpireSecond,?long?retryIntervalTime,?LockContext?context)?{
?????????...其他屬性略
????????this.context?=?context;
????}
public?void?lock(long?timeout,?TimeUnit?unit)?throws?TimeoutException?{
????//...加鎖?略
????
???if?("OK".equals(lockResult))?{
???????LOGGER.info("locked?success,lockName:{}",lockName);
???????try?{
???????????//注冊循環(huán)延期事件
???????????registerLoopReExpire();
???????}finally?{
???????????if?(Thread.currentThread().isInterrupted()&&this.lockTimeout!=null){
???????????????LOGGER.warn("線程中斷,定時任務(wù)取消");
???????????????this.lockTimeout.cancel();
???????????}
???????}
???}?else?{
???????throw?new?TimeoutException("Get?lock?failed?because?of?timeout.");
????}
}
registerLoopReExpire()中是實際的任務(wù)注冊和延期操作:
private?void?registerLoopReExpire()?{
????LOGGER.info("分布式鎖延期任務(wù)注冊");
????//每次注冊,都把timeout賦給當(dāng)前鎖對象,用于后續(xù)解鎖時取消
????this.lockTimeout?=?context.getTimer().newTimeout(new?TimerTask()?{
????????@Override
????????public?void?run(Timeout?timeout)?throws?Exception?{
????????
????????????//校驗是否還在持有鎖,并延長過期時間
????????????boolean?isReExpired=reExpireLock(lockName,lockOwner);
????????
????????????if?(isReExpired)?{
????????????????//自己調(diào)自己,循環(huán)注冊
????????????????registerLoopReExpire();
????????????}else?{
????????????????lockTimeout.cancel();
????????????}
????????}
????},?TimeUnit.SECONDS.toMillis(?lockExpireSecond)/2,?TimeUnit.MILLISECONDS);
????LOGGER.info("分布式鎖延期任務(wù)注冊完成");
}
newTimeout()操作會返回一個Timeout實體,我們需要依賴該實體來對當(dāng)前任務(wù)進行管理,所以需要賦值給鎖內(nèi)部對象。lockOwner?和?lockName來判斷,持有鎖才加鎖,需要使用lua方式來保證判斷和執(zhí)行的原子性。1/2或1/3或自定義傳入。
總結(jié)和回顧
jedis?操作lua腳本的延期實現(xiàn),可重入鎖的改造,由于篇幅原因就不都貼出來了,有興趣的同學(xué)也可以按上述思路繼續(xù)完善。有道無術(shù),術(shù)可成;有術(shù)無道,止于術(shù)
歡迎大家關(guān)注Java之道公眾號
好文章,我在看??
