聊聊分布式鎖——Redis和Redisson的方式

Keeper導(dǎo)讀:分布式鎖的文章其實(shí)早就爛大街了,但有些“菜鳥”寫的太淺,或者自己估計(jì)都沒搞明白,沒用過,看完后我更懵逼了,有些“大牛”寫的吧,又太高級(jí),只能看懂前半部分,后邊就開始講論文了,也比較懵逼,所以還得我這個(gè)中不溜的來總結(jié)下。文章攏共分為幾個(gè)部分:
什么是分布式鎖
分布式鎖的實(shí)現(xiàn)要求
基于 Redisson 實(shí)現(xiàn)的 Redis 分布式鎖
一、什么是分布式鎖
分布式~~鎖,要這么念,首先得是『分布式』,然后才是『鎖』
分布式:這里的分布式指的是分布式系統(tǒng),涉及到好多技術(shù)和理論,包括CAP 理論、分布式存儲(chǔ)、分布式事務(wù)、分布式鎖...
分布式系統(tǒng)是由一組通過網(wǎng)絡(luò)進(jìn)行通信、為了完成共同的任務(wù)而協(xié)調(diào)工作的計(jì)算機(jī)節(jié)點(diǎn)組成的系統(tǒng)。
分布式系統(tǒng)的出現(xiàn)是為了用廉價(jià)的、普通的機(jī)器完成單個(gè)計(jì)算機(jī)無法完成的計(jì)算、存儲(chǔ)任務(wù)。其目的是利用更多的機(jī)器,處理更多的數(shù)據(jù)。
鎖:對(duì)對(duì),就是你想的那個(gè),Javer 學(xué)的第一個(gè)鎖應(yīng)該就是
synchronizedJava 初級(jí)面試問題,來拼寫下 賽克瑞納挨日的
從鎖的使用場(chǎng)景有來看下邊這 3 種鎖:
線程鎖:
synchronized是用在方法或代碼塊中的,我們把它叫『線程鎖』,線程鎖的實(shí)現(xiàn)其實(shí)是靠線程之間共享內(nèi)存實(shí)現(xiàn)的,說白了就是內(nèi)存中的一個(gè)整型數(shù),有空閑、上鎖這類狀態(tài),比如 synchronized 是在對(duì)象頭中的 Mark Word 有個(gè)鎖狀態(tài)標(biāo)志,Lock 的實(shí)現(xiàn)類大部分都有個(gè)叫volatile int state的共享變量來做狀態(tài)標(biāo)志。進(jìn)程鎖:為了控制同一操作系統(tǒng)中多個(gè)進(jìn)程訪問某個(gè)共享資源,因?yàn)檫M(jìn)程具有獨(dú)立性,各個(gè)進(jìn)程無法訪問其他進(jìn)程的資源,因此無法通過 synchronized 等線程鎖實(shí)現(xiàn)進(jìn)程鎖。比如說,我們的同一個(gè) linux 服務(wù)器,部署了好幾個(gè) Java 項(xiàng)目,有可能同時(shí)訪問或操作服務(wù)器上的相同數(shù)據(jù),這就需要進(jìn)程鎖,一般可以用『文件鎖』來達(dá)到進(jìn)程互斥。
分布式鎖:隨著用戶越來越多,我們上了好多服務(wù)器,原本有個(gè)定時(shí)給客戶發(fā)郵件的任務(wù),如果不加以控制的話,到點(diǎn)后每臺(tái)機(jī)器跑一次任務(wù),客戶就會(huì)收到 N 條郵件,這就需要通過分布式鎖來互斥了。
書面解釋:分布式鎖是控制分布式系統(tǒng)或不同系統(tǒng)之間共同訪問共享資源的一種鎖實(shí)現(xiàn),如果不同的系統(tǒng)或同一個(gè)系統(tǒng)的不同主機(jī)之間共享了某個(gè)資源時(shí),往往需要互斥來防止彼此干擾來保證一致性。
知道了什么是分布式鎖,接下來就到了技術(shù)選型環(huán)節(jié)
二、分布式鎖要怎么搞
要實(shí)現(xiàn)一個(gè)分布式鎖,我們一般選擇集群機(jī)器都可以操作的外部系統(tǒng),然后各個(gè)機(jī)器都去這個(gè)外部系統(tǒng)申請(qǐng)鎖。
這個(gè)外部系統(tǒng)一般需要滿足如下要求才能勝任:
互斥:在任意時(shí)刻,只能有一個(gè)客戶端能持有鎖。 防止死鎖:即使有一個(gè)客戶端在持有鎖的期間崩潰而沒有主動(dòng)解鎖,也能保證后續(xù)其他客戶端能加鎖。所以鎖一般要有一個(gè)過期時(shí)間。 獨(dú)占性:解鈴還須系鈴人,加鎖和解鎖必須是同一個(gè)客戶端,一把鎖只能有一把鑰匙,客戶端自己的鎖不能被別人給解開,當(dāng)然也不能去開別人的鎖。 容錯(cuò):外部系統(tǒng)不能太“脆弱”,要保證外部系統(tǒng)的正常運(yùn)行,客戶端才可以加鎖和解鎖。
我覺得可以這么類比:
好多商販要租用某個(gè)倉(cāng)庫,同一時(shí)刻,只能給一個(gè)商販租用,且只能有一把鑰匙,還得有固定的“租期”,到期后要回收的,當(dāng)然最重要的是倉(cāng)庫門不能壞了,要不鎖都鎖不住。這不就是分布式鎖嗎?
感慨自己真是個(gè)愛技術(shù)愛生活的程序猿~~
其實(shí)鎖,本質(zhì)上就是用來進(jìn)行防重操作的(數(shù)據(jù)一致性),像查詢這種冪等操作,就不需要費(fèi)這勁
直接上結(jié)論:
分布式鎖一般有三種實(shí)現(xiàn)方式:1. 數(shù)據(jù)庫樂觀鎖;2. 基于 Redis 的分布式鎖;3. 基于 ZooKeeper 的分布式鎖。
但為了追求更好的性能,我們通常會(huì)選擇使用 Redis 或 Zookeeper 來做。
想必也有喜歡問為什么的同學(xué),那數(shù)據(jù)庫客觀鎖怎么就性能不好了?
使用數(shù)據(jù)庫樂觀鎖,包括主鍵防重,版本號(hào)控制。但是這兩種方法各有利弊。
使用主鍵沖突的策略進(jìn)行防重,在并發(fā)量非常高的情況下對(duì)數(shù)據(jù)庫性能會(huì)有影響,尤其是應(yīng)用數(shù)據(jù)表和主鍵沖突表在一個(gè)庫的時(shí)候,表現(xiàn)更加明顯。還有就是在 MySQL 數(shù)據(jù)庫中采用主鍵沖突防重,在大并發(fā)情況下有可能會(huì)造成鎖表現(xiàn)象,比較好的辦法是在程序中生產(chǎn)主鍵進(jìn)行防重。
使用版本號(hào)策略
這個(gè)策略源于 MySQL 的 MVCC 機(jī)制,使用這個(gè)策略其實(shí)本身沒有什么問題,唯一的問題就是對(duì)數(shù)據(jù)表侵入較大,我們要為每個(gè)表設(shè)計(jì)一個(gè)版本號(hào)字段,然后寫一條判斷 SQL 每次進(jìn)行判斷。
第三趴,編碼
三、基于 Redis 的分布式鎖
其實(shí) Redis 官網(wǎng)已經(jīng)給出了實(shí)現(xiàn):https://redis.io/topics/distlock,說各種書籍和博客用了各種手段去用 Redis 實(shí)現(xiàn)分布式鎖,建議用 Redlock 實(shí)現(xiàn),這樣更規(guī)范、更安全。我們循序漸進(jìn)來看
我們默認(rèn)指定大家用的是 Redis 2.6.12 及更高的版本,就不再去講 setnx、expire 這種了,直接 set 命令加鎖
set key value[expiration EX seconds|PX milliseconds] [NX|XX]
eg:
SET resource_name my_random_value NX PX 30000
SET 命令的行為可以通過一系列參數(shù)來修改
EX second:設(shè)置鍵的過期時(shí)間為second秒。SET key value EX second效果等同于SETEX key second value。PX millisecond:設(shè)置鍵的過期時(shí)間為millisecond毫秒。SET key value PX millisecond效果等同于PSETEX key millisecond value。NX:只在鍵不存在時(shí),才對(duì)鍵進(jìn)行設(shè)置操作。SET key value NX效果等同于SETNX key value。XX:只在鍵已經(jīng)存在時(shí),才對(duì)鍵進(jìn)行設(shè)置操作。
這條指令的意思:當(dāng) key——resource_name 不存在時(shí)創(chuàng)建這樣的key,設(shè)值為 my_random_value,并設(shè)置過期時(shí)間 30000 毫秒。
別看這干了兩件事,因?yàn)?Redis 是單線程的,這一條指令不會(huì)被打斷,所以是原子性的操作。
Redis 實(shí)現(xiàn)分布式鎖的主要步驟:
指定一個(gè) key 作為鎖標(biāo)記,存入 Redis 中,指定一個(gè) 唯一的標(biāo)識(shí) 作為 value。 當(dāng) key 不存在時(shí)才能設(shè)置值,確保同一時(shí)間只有一個(gè)客戶端進(jìn)程獲得鎖,滿足 互斥性 特性。 設(shè)置一個(gè)過期時(shí)間,防止因系統(tǒng)異常導(dǎo)致沒能刪除這個(gè) key,滿足 防死鎖 特性。 當(dāng)處理完業(yè)務(wù)之后需要清除這個(gè) key 來釋放鎖,清除 key 時(shí)需要校驗(yàn) value 值,需要滿足 解鈴還須系鈴人 。
設(shè)置一個(gè)隨機(jī)值的意思是在解鎖時(shí)候判斷 key 的值和我們存儲(chǔ)的隨機(jī)數(shù)是不是一樣,一樣的話,才是自己的鎖,直接 del 解鎖就行。
當(dāng)然這個(gè)兩個(gè)操作要保證原子性,所以 Redis 給出了一段 lua 腳本(Redis 服務(wù)器會(huì)單線程原子性執(zhí)行 lua 腳本,保證 lua 腳本在處理的過程中不會(huì)被任意其它請(qǐng)求打斷。):
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
問題:
我們先拋出兩個(gè)問題思考:
獲取鎖時(shí),過期時(shí)間要設(shè)置多少合適呢?
預(yù)估一個(gè)合適的時(shí)間,其實(shí)沒那么容易,比如操作資源的時(shí)間最慢可能要 10 s,而我們只設(shè)置了 5 s 就過期,那就存在鎖提前過期的風(fēng)險(xiǎn)。這個(gè)問題先記下,我們先看下 Javaer 要怎么在代碼中用 Redis 鎖。
容錯(cuò)性如何保證呢?
Redis 掛了怎么辦,你可能會(huì)說上主從、上集群,但也會(huì)出現(xiàn)這樣的極端情況,當(dāng)我們上鎖后,主節(jié)點(diǎn)就掛了,這個(gè)時(shí)候還沒來的急同步到從節(jié)點(diǎn),主從切換后鎖還是丟了
帶著這兩個(gè)問題,我們接著看
Redisson 實(shí)現(xiàn)代碼
redisson 是 Redis 官方的分布式鎖組件。GitHub 地址:https://github.com/redisson/redisson
Redisson 是一個(gè)在 Redis 的基礎(chǔ)上實(shí)現(xiàn)的 Java 駐內(nèi)存數(shù)據(jù)網(wǎng)格(In-Memory Data Grid)。它不僅提供了一系列的分布式的 Java 常用對(duì)象,還實(shí)現(xiàn)了可重入鎖(Reentrant Lock)、公平鎖(Fair Lock、聯(lián)鎖(MultiLock)、 紅鎖(RedLock)、 讀寫鎖(ReadWriteLock)等,還提供了許多分布式服務(wù)。Redisson 提供了使用 Redis 的最簡(jiǎn)單和最便捷的方法。Redisson 的宗旨是促進(jìn)使用者對(duì) Redis 的關(guān)注分離(Separation of Concern),從而讓使用者能夠?qū)⒕Ω械胤旁谔幚順I(yè)務(wù)邏輯上。
redisson 現(xiàn)在已經(jīng)很強(qiáng)大了,github 的 wiki 也很詳細(xì),分布式鎖的介紹直接戳 Distributed locks and synchronizers
Redisson 支持單點(diǎn)模式、主從模式、哨兵模式、集群模式,只是配置的不同,我們以單點(diǎn)模式來看下怎么使用,代碼很簡(jiǎn)單,都已經(jīng)為我們封裝好了,直接拿來用就好,詳細(xì)的demo,我放在了 github: starfish-learn-redisson 上,這里就不一步步來了
RLock lock = redisson.getLock("myLock");
RLock 提供了各種鎖方法,我們來解讀下這個(gè)接口方法,
注:代碼為 3.16.2 版本,可以看到繼承自 JDK 的 Lock 接口,和 Reddsion 的異步鎖接口 RLockAsync(這個(gè)我們先不研究)
RLock

public interface RLock extends Lock, RLockAsync {
/**
* 獲取鎖的名字
*/
String getName();
/**
* 這個(gè)叫終端鎖操作,表示該鎖可以被中斷 假如A和B同時(shí)調(diào)這個(gè)方法,A獲取鎖,B為獲取鎖,那么B線程可以通過
* Thread.currentThread().interrupt(); 方法真正中斷該線程
*/
void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException;
/**
* 這個(gè)應(yīng)該是最常用的,嘗試獲取鎖
* waitTimeout 嘗試獲取鎖的最大等待時(shí)間,超過這個(gè)值,則認(rèn)為獲取鎖失敗
* leaseTime 鎖的持有時(shí)間,超過這個(gè)時(shí)間鎖會(huì)自動(dòng)失效(值應(yīng)設(shè)置為大于業(yè)務(wù)處理的時(shí)間,確保在鎖有效期內(nèi)業(yè)務(wù)能處理完)
*/
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
/**
* 鎖的有效期設(shè)置為 leaseTime,過期后自動(dòng)失效
* 如果 leaseTime 設(shè)置為 -1, 表示不主動(dòng)過期
*/
void lock(long leaseTime, TimeUnit unit);
/**
* Unlocks the lock independently of its state
*/
boolean forceUnlock();
/**
* 檢查是否被另一個(gè)線程鎖住
*/
boolean isLocked();
/**
* 檢查當(dāng)前線線程是否持有該鎖
*/
boolean isHeldByCurrentThread();
/**
* 這個(gè)就明了了,檢查指定線程是否持有鎖
*/
boolean isHeldByThread(long threadId);
/**
* 返回當(dāng)前線程持有鎖的次數(shù)
*/
int getHoldCount();
/**
* 返回鎖的剩余時(shí)間
* @return time in milliseconds
* -2 if the lock does not exist.
* -1 if the lock exists but has no associated expire.
*/
long remainTimeToLive();
}
Demo
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("").setDatabase(1);
RedissonClient redissonClient = Redisson.create(config);
RLock disLock = redissonClient.getLock("mylock");
boolean isLock;
try {
/**
* 嘗試獲取鎖的最大等待時(shí)間是 100 秒,超過這個(gè)值還沒獲取到,就認(rèn)為獲取失敗
* 鎖的持有時(shí)間是 10 秒
*/
isLock = disLock.tryLock(100, 10, TimeUnit.MILLISECONDS);
if (isLock) {
//做自己的業(yè)務(wù)
Thread.sleep(10000);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
disLock.unlock();
}
就是這么簡(jiǎn)單,Redisson 已經(jīng)做好了封裝,使用起來 so easy,如果使用主從、哨兵、集群這種也只是配置不同。
原理
看源碼小 tips,最好是 fork 到自己的倉(cāng)庫,然后拉到本地,邊看邊注釋,然后提交到自己的倉(cāng)庫,也方便之后再看,不想這么麻煩的,也可以直接看我的 Jstarfish/redisson
先看下 RLock 的類關(guān)系

跟著源碼,可以發(fā)現(xiàn) RedissonLock 是 RLock 的直接實(shí)現(xiàn),也是我們加鎖、解鎖操作的核心類
加鎖
主要的加鎖方法就下邊這兩個(gè),區(qū)別也很簡(jiǎn)單,一個(gè)有等待時(shí)間,一個(gè)沒有,所以我們挑個(gè)復(fù)雜的看(源碼包含了另一個(gè)的絕大部分)
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
void lock(long leaseTime, TimeUnit unit);
RedissonLock.tryLock
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// 獲取等鎖的最長(zhǎng)時(shí)間
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
//取得當(dāng)前線程id(判斷是否可重入鎖的關(guān)鍵)
long threadId = Thread.currentThread().getId();
// 【核心點(diǎn)1】嘗試獲取鎖,若返回值為null,則表示已獲取到鎖,返回的ttl就是key的剩余存活時(shí)間
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
return true;
}
// 還可以容忍的等待時(shí)長(zhǎng) = 獲取鎖能容忍的最大等待時(shí)長(zhǎng) - 執(zhí)行完上述操作流程的時(shí)間
time -= System.currentTimeMillis() - current;
if (time <= 0) {
//等不到了,直接返回失敗
acquireFailed(waitTime, unit, threadId);
return false;
}
current = System.currentTimeMillis();
/**
* 【核心點(diǎn)2】
* 訂閱解鎖消息 redisson_lock__channel:{$KEY},并通過await方法阻塞等待鎖釋放,解決了無效的鎖申請(qǐng)浪費(fèi)資源的問題:
* 基于信息量,當(dāng)鎖被其它資源占用時(shí),當(dāng)前線程通過 Redis 的 channel 訂閱鎖的釋放事件,一旦鎖釋放會(huì)發(fā)消息通知待等待的線程進(jìn)行競(jìng)爭(zhēng)
* 當(dāng) this.await返回false,說明等待時(shí)間已經(jīng)超出獲取鎖最大等待時(shí)間,取消訂閱并返回獲取鎖失敗
* 當(dāng) this.await返回true,進(jìn)入循環(huán)嘗試獲取鎖
*/
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
//await 方法內(nèi)部是用CountDownLatch來實(shí)現(xiàn)阻塞,獲取subscribe異步執(zhí)行的結(jié)果(應(yīng)用了Netty 的 Future)
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
}
// ttl 不為空,表示已經(jīng)有這樣的key了,只能阻塞等待
try {
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// 來個(gè)死循環(huán),繼續(xù)嘗試著獲取鎖
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
currentTime = System.currentTimeMillis();
/**
* 【核心點(diǎn)3】根據(jù)鎖TTL,調(diào)整阻塞等待時(shí)長(zhǎng);
* 1、latch其實(shí)是個(gè)信號(hào)量Semaphore,調(diào)用其tryAcquire方法會(huì)讓當(dāng)前線程阻塞一段時(shí)間,避免在while循環(huán)中頻繁請(qǐng)求獲鎖;
* 當(dāng)其他線程釋放了占用的鎖,會(huì)廣播解鎖消息,監(jiān)聽器接收解鎖消息,并釋放信號(hào)量,最終會(huì)喚醒阻塞在這里的線程
* 2、該Semaphore的release方法,會(huì)在訂閱解鎖消息的監(jiān)聽器消息處理方法org.redisson.pubsub.LockPubSub#onMessage調(diào)用;
*/
//調(diào)用信號(hào)量的方法來阻塞線程,時(shí)長(zhǎng)為鎖等待時(shí)間和租期時(shí)間中較小的那個(gè)
if (ttl >= 0 && ttl < time) {
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
// 獲取到鎖或者拋出中斷異常,退訂redisson_lock__channel:{$KEY},不再關(guān)注解鎖事件
unsubscribe(subscribeFuture, threadId);
}
}
接著看注釋中提到的 3 個(gè)核心點(diǎn)
核心點(diǎn)1-嘗試加鎖:RedissonLock.tryAcquireAsync
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
// leaseTime != -1 說明沒過期
if (leaseTime != -1) {
// 實(shí)質(zhì)是異步執(zhí)行加鎖Lua腳本
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// 否則,已經(jīng)過期了,傳參變?yōu)樾碌臅r(shí)間(續(xù)期后)
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
if (leaseTime != -1) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
// 續(xù)期
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
異步執(zhí)行加鎖 Lua 腳本:RedissonLock.tryLockInnerAsync
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
// 1.如果緩存中的key不存在,則執(zhí)行 hincrby 命令(hincrby key UUID+threadId 1), 設(shè)值重入次數(shù)1
// 然后通過 pexpire 命令設(shè)置鎖的過期時(shí)間(即鎖的租約時(shí)間)
// 返回空值 nil ,表示獲取鎖成功
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 如果key已經(jīng)存在,并且value也匹配,表示是當(dāng)前線程持有的鎖,則執(zhí)行 hincrby 命令,重入次數(shù)加1,并且設(shè)置失效時(shí)間
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//如果key已經(jīng)存在,但是value不匹配,說明鎖已經(jīng)被其他線程持有,通過 pttl 命令獲取鎖的剩余存活時(shí)間并返回,至此獲取鎖失敗
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
KEYS[1] 就是 Collections.singletonList(getName()),表示分布式鎖的key; ARGV[1] 就是internalLockLeaseTime,即鎖的租約時(shí)間(持有鎖的有效時(shí)間),默認(rèn)30s; ARGV[2] 就是getLockName(threadId),是獲取鎖時(shí)set的唯一值 value,即UUID+threadId
看門狗續(xù)期:RedissonBaseLock.scheduleExpirationRenewal
// 基于線程ID定時(shí)調(diào)度和續(xù)期
protected void scheduleExpirationRenewal(long threadId) {
// 新建一個(gè)ExpirationEntry記錄線程重入計(jì)數(shù)
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
// 當(dāng)前進(jìn)行的當(dāng)前線程重入加鎖
oldEntry.addThreadId(threadId);
} else {
// 當(dāng)前進(jìn)行的當(dāng)前線程首次加鎖
entry.addThreadId(threadId);
// 首次新建ExpirationEntry需要觸發(fā)續(xù)期方法,記錄續(xù)期的任務(wù)句柄
renewExpiration();
}
}
// 處理續(xù)期
private void renewExpiration() {
// 根據(jù)entryName獲取ExpirationEntry實(shí)例,如果為空,說明在cancelExpirationRenewal()方法已經(jīng)被移除,一般是解鎖的時(shí)候觸發(fā)
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
// 新建一個(gè)定時(shí)任務(wù),這個(gè)就是看門狗的實(shí)現(xiàn),io.netty.util.Timeout是Netty結(jié)合時(shí)間輪使用的定時(shí)任務(wù)實(shí)例
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// 這里是重復(fù)外面的那個(gè)邏輯,
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
// 獲取ExpirationEntry中首個(gè)線程ID,如果為空說明調(diào)用過cancelExpirationRenewal()方法清空持有的線程重入計(jì)數(shù),一般是鎖已經(jīng)釋放的場(chǎng)景
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// 向Redis異步發(fā)送續(xù)期的命令
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
// 拋出異常,續(xù)期失敗,只打印日志和直接終止任務(wù)
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
// 返回true證明續(xù)期成功,則遞歸調(diào)用續(xù)期方法(重新調(diào)度自己),續(xù)期失敗說明對(duì)應(yīng)的鎖已經(jīng)不存在,直接返回,不再遞歸
if (res) {
// reschedule itself
renewExpiration();
} else {
cancelExpirationRenewal(null);
}
});
}// 這里的執(zhí)行頻率為leaseTime轉(zhuǎn)換為ms單位下的三分之一,由于leaseTime初始值為-1的情況下才會(huì)進(jìn)入續(xù)期邏輯,那么這里的執(zhí)行頻率為lockWatchdogTimeout的三分之一
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
// ExpirationEntry實(shí)例持有調(diào)度任務(wù)實(shí)例
ee.setTimeout(task);
}
核心點(diǎn)2-訂閱解鎖消息:RedissonLock.subscribe
protected final LockPubSub pubSub;
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
//在構(gòu)造器中初始化pubSub,跟著這幾個(gè)get方法會(huì)發(fā)現(xiàn)他們都是在構(gòu)造器中初始化的,在PublishSubscribeService中會(huì)有
// private final AsyncSemaphore[] locks = new AsyncSemaphore[50]; 這樣一段代碼,初始化了一組信號(hào)量
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
protected RFuture<RedissonLockEntry> subscribe(long threadId) {
return pubSub.subscribe(getEntryName(), getChannelName());
}
// 在LockPubSub中注冊(cè)一個(gè)entryName -> RedissonLockEntry的哈希映射,RedissonLockEntry實(shí)例中存放著RPromise<RedissonLockEntry>結(jié)果,一個(gè)信號(hào)量形式的鎖和訂閱方法重入計(jì)數(shù)器
public RFuture<E> subscribe(String entryName, String channelName) {
AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
RPromise<E> newPromise = new RedissonPromise<>();
semaphore.acquire(() -> {
if (!newPromise.setUncancellable()) {
semaphore.release();
return;
}
E entry = entries.get(entryName);
if (entry != null) {
entry.acquire();
semaphore.release();
entry.getPromise().onComplete(new TransferListener<E>(newPromise));
return;
}
E value = createEntry(newPromise);
value.acquire();
E oldValue = entries.putIfAbsent(entryName, value);
if (oldValue != null) {
oldValue.acquire();
semaphore.release();
oldValue.getPromise().onComplete(new TransferListener<E>(newPromise));
return;
}
RedisPubSubListener<Object> listener = createListener(channelName, value);
service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
});
return newPromise;
}
核心點(diǎn) 3 比較簡(jiǎn)單,就不說了
解鎖
RedissonLock.unlock()
@Override
public void unlock() {
try {
// 獲取當(dāng)前調(diào)用解鎖操作的線程ID
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
// IllegalMonitorStateException一般是A線程加鎖,B線程解鎖,內(nèi)部判斷線程狀態(tài)不一致拋出的
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}
RedissonBaseLock.unlockAsync
@Override
public RFuture<Void> unlockAsync(long threadId) {
// 構(gòu)建一個(gè)結(jié)果RedissonPromise
RPromise<Void> result = new RedissonPromise<>();
// 返回的RFuture如果持有的結(jié)果為true,說明解鎖成功,返回NULL說明線程ID異常,加鎖和解鎖的客戶端線程不是同一個(gè)線程
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
// 取消看門狗的續(xù)期任務(wù)
cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
return;
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
result.trySuccess(null);
});
return result;
}
RedissonLock.unlockInnerAsync
// 真正的內(nèi)部解鎖的方法,執(zhí)行解鎖的Lua腳本
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//如果分布式鎖存在,但是value不匹配,表示鎖已經(jīng)被其他線程占用,無權(quán)釋放鎖,那么直接返回空值(解鈴還須系鈴人)
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
//如果value匹配,則就是當(dāng)前線程占有分布式鎖,那么將重入次數(shù)減1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
//重入次數(shù)減1后的值如果大于0,表示分布式鎖有重入過,那么只能更新失效時(shí)間,還不能刪除
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
//重入次數(shù)減1后的值如果為0,這時(shí)就可以刪除這個(gè)KEY,并發(fā)布解鎖消息,返回1
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
//這5個(gè)參數(shù)分別對(duì)應(yīng)KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3]
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
我只列出了一小部分代碼,更多的內(nèi)容還是得自己動(dòng)手
從源碼中,我們可以看到 Redisson 幫我們解決了拋出的第一個(gè)問題:失效時(shí)間設(shè)置多長(zhǎng)時(shí)間為好?
Redisson 提供了看門狗,每獲得一個(gè)鎖時(shí),只設(shè)置一個(gè)很短的超時(shí)時(shí)間,同時(shí)起一個(gè)線程在每次快要到超時(shí)時(shí)間時(shí)去刷新鎖的超時(shí)時(shí)間。在釋放鎖的同時(shí)結(jié)束這個(gè)線程。
但是沒有解決節(jié)點(diǎn)掛掉,丟失鎖的問題,接著來~

四、RedLock
我們上邊介紹的分布式鎖,在某些極端情況下仍然是有缺陷的
客戶端長(zhǎng)時(shí)間內(nèi)阻塞導(dǎo)致鎖失效
客戶端 1 得到了鎖,因?yàn)榫W(wǎng)絡(luò)問題或者 GC 等原因?qū)е麻L(zhǎng)時(shí)間阻塞,然后業(yè)務(wù)程序還沒執(zhí)行完鎖就過期了,這時(shí)候客戶端 2 也能正常拿到鎖,可能會(huì)導(dǎo)致線程安全的問題。
Redis 服務(wù)器時(shí)鐘漂移
如果 Redis 服務(wù)器的機(jī)器時(shí)間發(fā)生了向前跳躍,就會(huì)導(dǎo)致這個(gè) key 過早超時(shí)失效,比如說客戶端 1 拿到鎖后,key 還沒有到過期時(shí)間,但是 Redis 服務(wù)器的時(shí)間比客戶端快了 2 分鐘,導(dǎo)致 key 提前就失效了,這時(shí)候,如果客戶端 1 還沒有釋放鎖的話,就可能導(dǎo)致多個(gè)客戶端同時(shí)持有同一把鎖的問題。
單點(diǎn)實(shí)例安全問題
如果 Redis 是單機(jī)模式的,如果掛了的話,那所有的客戶端都獲取不到鎖了,假設(shè)你是主從模式,但 Redis 的主從同步是異步進(jìn)行的,如果 Redis 主宕機(jī)了,這個(gè)時(shí)候從機(jī)并沒有同步到這一把鎖,那么機(jī)器 B 再次申請(qǐng)的時(shí)候就會(huì)再次申請(qǐng)到這把鎖,這也是問題
為了解決這些個(gè)問題 Redis 作者提出了 RedLock 紅鎖的算法,在 Redission 中也對(duì) RedLock 進(jìn)行了實(shí)現(xiàn)。
Redis 官網(wǎng)對(duì) redLock 算法的介紹大致如下:The Redlock algorithm
在分布式版本的算法里我們假設(shè)我們有 N 個(gè) Redis master 節(jié)點(diǎn),這些節(jié)點(diǎn)都是完全獨(dú)立的,我們不用任何復(fù)制或者其他隱含的分布式協(xié)調(diào)機(jī)制。之前我們已經(jīng)描述了在 Redis 單實(shí)例下怎么安全地獲取和釋放鎖。我們確保將在每(N) 個(gè)實(shí)例上使用此方法獲取和釋放鎖。在我們的例子里面我們?cè)O(shè)置 N=5,這是一個(gè)比較合理的設(shè)置,所以我們需要在 5 臺(tái)機(jī)器或者虛擬機(jī)上面運(yùn)行這些實(shí)例,這樣保證他們不會(huì)同時(shí)都宕掉。為了取到鎖,客戶端應(yīng)該執(zhí)行以下操作:
獲取當(dāng)前 Unix 時(shí)間,以毫秒為單位。 依次嘗試從 5 個(gè)實(shí)例,使用相同的 key 和具有唯一性的 value(例如UUID)獲取鎖。當(dāng)向 Redis 請(qǐng)求獲取鎖時(shí),客戶端應(yīng)該設(shè)置一個(gè)嘗試從某個(gè) Reids 實(shí)例獲取鎖的最大等待時(shí)間(超過這個(gè)時(shí)間,則立馬詢問下一個(gè)實(shí)例),這個(gè)超時(shí)時(shí)間應(yīng)該小于鎖的失效時(shí)間。例如你的鎖自動(dòng)失效時(shí)間為 10 秒,則超時(shí)時(shí)間應(yīng)該在 5-50 毫秒之間。這樣可以避免服務(wù)器端 Redis 已經(jīng)掛掉的情況下,客戶端還在死死地等待響應(yīng)結(jié)果。如果服務(wù)器端沒有在規(guī)定時(shí)間內(nèi)響應(yīng),客戶端應(yīng)該盡快嘗試去另外一個(gè) Redis 實(shí)例請(qǐng)求獲取鎖。 客戶端使用當(dāng)前時(shí)間減去開始獲取鎖時(shí)間(步驟1記錄的時(shí)間)就得到獲取鎖消耗的時(shí)間。當(dāng)且僅當(dāng)從大多數(shù)(N/2+1,這里是3個(gè)節(jié)點(diǎn))的 Redis 節(jié)點(diǎn)都取到鎖,并且使用的總耗時(shí)小于鎖失效時(shí)間時(shí),鎖才算獲取成功。 如果取到了鎖,key 的真正有效時(shí)間 = 有效時(shí)間(獲取鎖時(shí)設(shè)置的 key 的自動(dòng)超時(shí)時(shí)間) - 獲取鎖的總耗時(shí)(詢問各個(gè) Redis 實(shí)例的總耗時(shí)之和)(步驟 3 計(jì)算的結(jié)果)。 如果因?yàn)槟承┰颍罱K獲取鎖失敗(即沒有在至少 “N/2+1 ”個(gè) Redis 實(shí)例取到鎖或者“獲取鎖的總耗時(shí)”超過了“有效時(shí)間”),客戶端應(yīng)該在所有的 Redis 實(shí)例上進(jìn)行解鎖(即便某些 Redis 實(shí)例根本就沒有加鎖成功,這樣可以防止某些節(jié)點(diǎn)獲取到鎖但是客戶端沒有得到響應(yīng)而導(dǎo)致接下來的一段時(shí)間不能被重新獲取鎖)。
總結(jié)下就是:
客戶端在多個(gè) Redis 實(shí)例上申請(qǐng)加鎖,必須保證大多數(shù)節(jié)點(diǎn)加鎖成功
解決容錯(cuò)性問題,部分實(shí)例異常,剩下的還能加鎖成功
大多數(shù)節(jié)點(diǎn)加鎖的總耗時(shí),要小于鎖設(shè)置的過期時(shí)間
多實(shí)例操作,可能存在網(wǎng)絡(luò)延遲、丟包、超時(shí)等問題,所以就算是大多數(shù)節(jié)點(diǎn)加鎖成功,如果加鎖的累積耗時(shí)超過了鎖的過期時(shí)間,那有些節(jié)點(diǎn)上的鎖可能也已經(jīng)失效了,還是沒有意義的
釋放鎖,要向全部節(jié)點(diǎn)發(fā)起釋放鎖請(qǐng)求
如果部分節(jié)點(diǎn)加鎖成功,但最后由于異常導(dǎo)致大部分節(jié)點(diǎn)沒加鎖成功,就要釋放掉所有的,各節(jié)點(diǎn)要保持一致
關(guān)于 RedLock,兩位分布式大佬,Antirez 和 Martin 還進(jìn)行過一場(chǎng)爭(zhēng)論,感興趣的也可以看看
Config config1 = new Config();
config1.useSingleServer().setAddress("127.0.0.1:6379");
RedissonClient redissonClient1 = Redisson.create(config1);
Config config2 = new Config();
config2.useSingleServer().setAddress("127.0.0.1:5378");
RedissonClient redissonClient2 = Redisson.create(config2);
Config config3 = new Config();
config3.useSingleServer().setAddress("127.0.0.1:5379");
RedissonClient redissonClient3 = Redisson.create(config3);
/**
* 獲取多個(gè) RLock 對(duì)象
*/
RLock lock1 = redissonClient1.getLock(lockKey);
RLock lock2 = redissonClient2.getLock(lockKey);
RLock lock3 = redissonClient3.getLock(lockKey);
/**
* 根據(jù)多個(gè) RLock 對(duì)象構(gòu)建 RedissonRedLock (最核心的差別就在這里)
*/
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
try {
/**
* 4.嘗試獲取鎖
* waitTimeout 嘗試獲取鎖的最大等待時(shí)間,超過這個(gè)值,則認(rèn)為獲取鎖失敗
* leaseTime 鎖的持有時(shí)間,超過這個(gè)時(shí)間鎖會(huì)自動(dòng)失效(值應(yīng)設(shè)置為大于業(yè)務(wù)處理的時(shí)間,確保在鎖有效期內(nèi)業(yè)務(wù)能處理完)
*/
boolean res = redLock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
//成功獲得鎖,在這里處理業(yè)務(wù)
}
} catch (Exception e) {
throw new RuntimeException("aquire lock fail");
}finally{
//無論如何, 最后都要解鎖
redLock.unlock();
}
最核心的變化就是需要構(gòu)建多個(gè) RLock ,然后根據(jù)多個(gè) RLock 構(gòu)建成一個(gè) RedissonRedLock,因?yàn)?redLock 算法是建立在多個(gè)互相獨(dú)立的 Redis 環(huán)境之上的(為了區(qū)分可以叫為 Redission node),Redission node 節(jié)點(diǎn)既可以是單機(jī)模式(single),也可以是主從模式(master/salve),哨兵模式(sentinal),或者集群模式(cluster)。這就意味著,不能跟以往這樣只搭建 1個(gè) cluster、或 1個(gè) sentinel 集群,或是1套主從架構(gòu)就了事了,需要為 RedissonRedLock 額外搭建多幾套獨(dú)立的 Redission 節(jié)點(diǎn)。
RedissonMultiLock.tryLock
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// try {
// return tryLockAsync(waitTime, leaseTime, unit).get();
// } catch (ExecutionException e) {
// throw new IllegalStateException(e);
// }
long newLeaseTime = -1;
if (leaseTime != -1) {
if (waitTime == -1) {
newLeaseTime = unit.toMillis(leaseTime);
} else {
newLeaseTime = unit.toMillis(waitTime)*2;
}
}
long time = System.currentTimeMillis();
long remainTime = -1;
if (waitTime != -1) {
remainTime = unit.toMillis(waitTime);
}
long lockWaitTime = calcLockWaitTime(remainTime);
//允許加鎖失敗節(jié)點(diǎn)個(gè)數(shù)限制(N-(N/2+1))
int failedLocksLimit = failedLocksLimit();
List<RLock> acquiredLocks = new ArrayList<>(locks.size());
// 遍歷所有節(jié)點(diǎn)通過EVAL命令執(zhí)行l(wèi)ua加鎖
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
try {
// 對(duì)節(jié)點(diǎn)嘗試加鎖
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
} else {
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (RedisResponseTimeoutException e) {
// 如果拋出這類異常,為了防止加鎖成功,但是響應(yīng)失敗,需要解鎖所有節(jié)點(diǎn)
unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (Exception e) {
lockAcquired = false;
}
if (lockAcquired) {
acquiredLocks.add(lock);
} else {
/*
* 計(jì)算已經(jīng)申請(qǐng)鎖失敗的節(jié)點(diǎn)是否已經(jīng)到達(dá) 允許加鎖失敗節(jié)點(diǎn)個(gè)數(shù)限制 (N-(N/2+1))
* 如果已經(jīng)到達(dá), 就認(rèn)定最終申請(qǐng)鎖失敗,則沒有必要繼續(xù)從后面的節(jié)點(diǎn)申請(qǐng)了
* 因?yàn)?nbsp;Redlock 算法要求至少N/2+1 個(gè)節(jié)點(diǎn)都加鎖成功,才算最終的鎖申請(qǐng)成功
*/
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
break;
}
if (failedLocksLimit == 0) {
unlockInner(acquiredLocks);
if (waitTime == -1) {
return false;
}
failedLocksLimit = failedLocksLimit();
acquiredLocks.clear();
// reset iterator
while (iterator.hasPrevious()) {
iterator.previous();
}
} else {
failedLocksLimit--;
}
}
//計(jì)算 目前從各個(gè)節(jié)點(diǎn)獲取鎖已經(jīng)消耗的總時(shí)間,如果已經(jīng)等于最大等待時(shí)間,則認(rèn)定最終申請(qǐng)鎖失敗,返回false
if (remainTime != -1) {
remainTime -= System.currentTimeMillis() - time;
time = System.currentTimeMillis();
if (remainTime <= 0) {
unlockInner(acquiredLocks);
return false;
}
}
}
if (leaseTime != -1) {
acquiredLocks.stream()
.map(l -> (RedissonLock) l)
.map(l -> l.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS))
.forEach(f -> f.syncUninterruptibly());
}
return true;
}
參考與感謝
《Redis —— Distributed locks with Redis》 《Redisson —— Distributed locks and synchronizers》 慢談 Redis 實(shí)現(xiàn)分布式鎖 以及 Redisson 源碼解析 理解Redisson中分布式鎖的實(shí)現(xiàn)
歡迎關(guān)注微信公眾號(hào):互聯(lián)網(wǎng)全棧架構(gòu),收取更多有價(jià)值的信息。
