怎樣實現(xiàn)一個分布式的公平鎖?
在并發(fā)的場景下,很多時候,我們的業(yè)務開發(fā)中會有加鎖的操作,以此來保證執(zhí)行的互斥,保障業(yè)務邏輯。比如在 Java 里就有多種基于 AQS 的組件,方便使用。創(chuàng)建鎖的時候,還可以特別的指定一下,當前這個鎖,是否需要公平。
/*** Creates an instance of {@code ReentrantLock} with the* given fairness policy.** @param fair {@code true} if this lock should use a fair ordering policy*/public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}
對于需要公平的場景,和我們真實生活一樣,這里的 FairSync 會通過一個 CLH 隊列將請求線程排隊。
在單實例應用中,本機的 CLH 排隊就足夠了,我們現(xiàn)在切換到分布式的場景。
在分布式場景下,為了實現(xiàn)鎖的功能,就出現(xiàn)了各種分布式鎖。相比單實例場景下的鎖只能鎖定自己的實例,分布式的鎖由于統(tǒng)一的外部中間件的介入,將鎖的信息提取到獨立的外部,所以可以將多個應用實例做到互斥。
那分布式的場景下,怎么樣能保證公平呢?
和我們從單實例到分布式加鎖的思路一樣,要公平,就排隊,在統(tǒng)一的第三方處進行排隊。
來第三方這里排隊,也有一些需要注意的點。比如你在判斷當前隊列里有沒有等待,如果沒有就取鎖成功,執(zhí)行,有等待就入隊,而判斷的這個邏輯,仍然可能是并發(fā)操作,也需要做到加鎖處理。就好像你看了一眼某個飯店沒什么人,開心的去買了杯奶茶,回來一看滿了。
對于分布式鎖,基于 Redis 的 Redission 用得不少。如果換成Redis 分布式公平鎖,那基本就只有 Redission 了。
下面我通過兩段代碼,以及部分文字,描述下 Redission 的公平鎖的實現(xiàn)。
簡要概括下:
Redission 的公平鎖,是通過「 Redis + Lua?腳本」來實現(xiàn)的。在拿到一個 Redission 的 Redis 連接之后,通過?「eval()」可以執(zhí)行一段 Lua script,同時傳入一些 key 和 args。因為不管有多少 Lua 的邏輯,都是在同一個連接內,所以不會存在買完奶茶發(fā)現(xiàn)人滿了的情況。這里應用到了 Redis 的 pub/sub 功能,等待的線程,會在輪到自己時收到 Redis 的提醒,前提是需要訂閱了相應的通知。
來看加鎖的 Lua 邏輯,代碼寫的比較清楚,我也加了些對應的Redis操作以及參數(shù)的注釋。通過 list 來存儲排隊信息,同時每個等待線程都有一個超時時間,超時退出隊列。?所以eval 執(zhí)行這個的時候,返回的是一個 ttl Long 類型。表示過期時間。
--[[用于 lock 操作。KEYS[1] = lockNameKEYS[2] = waitQueueName????KEYS[3]?=?timeoutName?????ARGV[1] = waitTimeARGV[2] = lockNameARGV[3] = leaseTime????ARGV[4]?=?currentTime--]]while true dolocal firstThreadId2 = redis.call("lindex", KEYS[2], 0)if firstThreadId2 == false thenbreakendlocal timeout = tonumber(redis.call("zscore", KEYS[3], firstThreadId2))if timeout <= tonumber(ARGV[4]) thenredis.call("zrem", KEYS[3], firstThreadId2)redis.call("lpop", KEYS[2])elsebreakendendif(redis.call("exists", KEYS[1]) == 0) and((redis.call("exists", KEYS[2]) == 0) or (redis.call("lindex", KEYS[2], 0) == ARGV[2]))thenredis.call("lpop", KEYS[2])redis.call("zrem", KEYS[3], ARGV[2]) -- 移除有序集合中的一個或多個成員redis.call("hset", KEYS[1], ARGV[2], 1) -- 將哈希表 key 中的字段 field 的值設為 value 。redis.call("pexpire", KEYS[1], ARGV[1])return nilendif (redis.call("hexists", KEYS[1], ARGV[2]) == 1) then -- HEXISTS key field 查看哈希表 key 中,指定的字段是否存在。redis.call("hincrby", KEYS[1], ARGV[2], 1) -- HINCRBY key field increment 為哈希表 key 中的指定字段的整數(shù)值加上增量 increment 。redis.call("pexpire", KEYS[1], ARGV[1]) -- Redis PEXPIRE 命令和 EXPIRE 命令的作用類似,但是它以毫秒為單位設置 key 的生存時間,而不像 EXPIRE 命令那樣,以秒為單位。return nilendlocal firstThreadId = redis.call("lindex", KEYS[2], 0)local ttlif firstThreadId ~= false and firstThreadId ~= ARGV[2] thenttl = tonumber(redis.call("zscore", KEYS[3], firstThreadId)) - tonumber(ARGV[4])elsettl = redis.call("pttl", KEYS[1]) -- Redis Pttl 命令以毫秒為單位返回 key 的剩余過期時間。endlocal?timeout?=?ttl??+?tonumber(ARGV[3])?+?tonumber(ARGV[4])if redis.call("zadd", KEYS[3], timeout, ARGV[2]) == 1 thenredis.call("rpush", KEYS[2], ARGV[2])endreturn?ttl
再來看解鎖的邏輯。我前面加鎖的一些內容對應著看,重點在于?「publish」這里,在輪到某個線程時,nextThreadId 這個 channel 會收到通知。
這里的 threadId 是我們在加鎖和解鎖的時候都需要傳入的。如果你留意過 Java 的線程 Id 就會發(fā)現(xiàn),不同實例之間有很大概率會重復的。為了避免,各個 Client 在傳入 ThradId 的時候,除了真實的 id 外,還需要加入各個 client 對應的信息加以區(qū)分。
--[[用于 unlock 操作。KEYS[1] = lockNameKEYS[2] = waitQueueNameKEYS[3] = timeoutNameKEYS[4] = channelNameARGV[1] = messageARGV[2] = leaseTimeARGV[3] = lockNameARGV[3] = currentTime--]]while true dolocal firstThreadId2 = redis.call("lindex", KEYS[2], 0)if firstThreadId2 == false thenbreakendlocal timeout = tonumber(redis.call("zscore", KEYS[3], firstThreadId2))if timeout <= tonumber(ARGV[4]) thenredis.call("zrem", KEYS[3], firstThreadId2)redis.call("lpop", KEYS[2])elsebreakendendif (redis.call("exists", KEYS[1]) == 0) thenlocal nextThreadId = redis.call("lindex", KEYS[2], 0)if nextThreadId ~= false thenredis.call("publish", KEYS[4] .. ":" .. nextThreadId, ARGV[1])endreturn 1endif (redis.call("hexists", KEYS[1], ARGV[3]) == 0) thenreturn nilendlocal counter = redis.call("hincrby", KEYS[1], ARGV[3], -1)if (counter > 0) thenredis.call("pexpire", KEYS[1], ARGV[2])return 0endredis.call("del", KEYS[1])local nextThreadId = redis.call("lindex", KEYS[2], 0)if nextThreadId ~= false thenredis.call("publish", KEYS[4] .. ":" .. nextThreadId, ARGV[1])endreturn 1
看過了解鎖邏輯后,外面eval 執(zhí)行加鎖的時候,需要對應有 sub ,才會收到這里解鎖的 pub 信息,否則就卡住了。
這里需要注意下,sub 這個功能是個阻塞操作,需要單獨的線程里執(zhí)行,通過一個 Future 來實現(xiàn)一定等待時間的 sub 功能,超時再 unsub。這塊邏輯 Redission 封裝的比較多,感興趣的可以到源碼里點點。(都不用下載代碼,直接在 GitHub 上 按個. 就行在線閱讀源碼的新方法)
相關閱讀
