分布式鎖(Redisson)-從零開始,深入理解與不斷優(yōu)化
作者:大程子的技術(shù)成長路
鏈接:https://www.jianshu.com/p/bc4ff4694cf3
分布式鎖場景
互聯(lián)網(wǎng)秒殺
搶優(yōu)惠卷
接口冪等性校驗
案例1
如下代碼模擬了下單減庫存的場景,我們分析下在高并發(fā)場景下會存在什么問題
package com.wangcp.redisson;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class IndexController {
@Autowired
private StringRedisTemplate stringRedisTemplate;
/**
* 模擬下單減庫存的場景
* @return
*/
@RequestMapping(value = "/duduct_stock")
public String deductStock(){
// 從redis 中拿當前庫存的值
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if(stock > 0){
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock",realStock + "");
System.out.println("扣減成功,剩余庫存:" + realStock);
}else{
System.out.println("扣減失敗,庫存不足");
}
return "end";
}
}
假設(shè)在redis中庫存(stock)初始值是100。
現(xiàn)在有5個客戶端同時請求該接口,可能就會存在同時執(zhí)行
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
案例2-使用synchronized 實現(xiàn)單機鎖
在遇到案例1的問題后,大部分人的第一反應(yīng)都會想到加鎖來控制事務(wù)的原子性,如下代碼所示:
@RequestMapping(value = "/duduct_stock")
public String deductStock(){
synchronized (this){
// 從redis 中拿當前庫存的值
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if(stock > 0){
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock",realStock + "");
System.out.println("扣減成功,剩余庫存:" + realStock);
}else{
System.out.println("扣減失敗,庫存不足");
}
}
return "end";
}
現(xiàn)在當有多個請求訪問該接口時,同一時刻只有一個請求可進入方法體中進行庫存的扣減,其余請求等候。
但我們都知道,synchronized 鎖是屬于JVM級別的,也就是我們俗稱的“單機鎖”。但現(xiàn)在基本大部分公司使用的都是集群部署,現(xiàn)在我們思考下以上代碼在集群部署的情況下還能保證庫存數(shù)據(jù)的一致性嗎?

答案是不能,如上圖所示,請求經(jīng)Nginx分發(fā)后,可能存在多個服務(wù)同時從Redis中獲取庫存數(shù)據(jù),此時只加synchronized (單機鎖)是無效的,并發(fā)越高,出現(xiàn)問題的幾率就越大。
案例3-使用SETNX實現(xiàn)分布式鎖
setnx:將 key 的值設(shè)為 value,當且僅當 key 不存在。
若給定 key 已經(jīng)存在,則 setnx 不做任何動作。
/**
* 模擬下單減庫存的場景
* @return
*/
@RequestMapping(value = "/duduct_stock")
public String deductStock(){
String lockKey = "product_001";
// 使用 setnx 添加分布式鎖
// 返回 true 代表之前redis中沒有key為 lockKey 的值,并已進行成功設(shè)置
// 返回 false 代表之前redis中已經(jīng)存在 lockKey 這個key了
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "wangcp");
if(!result){
// 代表已經(jīng)加鎖了
return "error_code";
}
// 從redis 中拿當前庫存的值
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if(stock > 0){
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock",realStock + "");
System.out.println("扣減成功,剩余庫存:" + realStock);
}else{
System.out.println("扣減失敗,庫存不足");
}
// 釋放鎖
stringRedisTemplate.delete(lockKey);
return "end";
}
我們知道 Redis 是單線程執(zhí)行,現(xiàn)在再看案例2中的流程圖時,哪怕高并發(fā)場景下多個請求都執(zhí)行到了setnx的代碼,redis會根據(jù)請求的先后順序進行排列,只有排列在隊頭的請求才能設(shè)置成功。其它請求只能返回“error_code”。
當setnx設(shè)置成功后,可執(zhí)行業(yè)務(wù)代碼對庫存扣減,執(zhí)行完成后對鎖進行釋放。
死鎖:假如第一個請求在setnx加鎖完成后,執(zhí)行業(yè)務(wù)代碼時出現(xiàn)了異常,那釋放鎖的代碼就無法執(zhí)行,后面所有的請求也都無法進行操作了。
針對死鎖的問題,我們對代碼再次進行優(yōu)化,添加try-finally,在finally中添加釋放鎖代碼,這樣無論如何都會執(zhí)行釋放鎖代碼,如下所示:
/**
* 模擬下單減庫存的場景
* @return
*/
@RequestMapping(value = "/duduct_stock")
public String deductStock(){
String lockKey = "product_001";
try{
// 使用 setnx 添加分布式鎖
// 返回 true 代表之前redis中沒有key為 lockKey 的值,并已進行成功設(shè)置
// 返回 false 代表之前redis中已經(jīng)存在 lockKey 這個key了
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "wangcp");
if(!result){
// 代表已經(jīng)加鎖了
return "error_code";
}
// 從redis 中拿當前庫存的值
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if(stock > 0){
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock",realStock + "");
System.out.println("扣減成功,剩余庫存:" + realStock);
}else{
System.out.println("扣減失敗,庫存不足");
}
}finally {
// 釋放鎖
stringRedisTemplate.delete(lockKey);
}
return "end";
}
案例4-加入過期時間
針對想到的問題,對代碼再次進行優(yōu)化,加入過期時間,這樣即便出現(xiàn)了上述的問題,在時間到期后鎖也會自動釋放掉,不會出現(xiàn)“死鎖”的情況。
@RequestMapping(value = "/duduct_stock")
public String deductStock(){
String lockKey = "product_001";
try{
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey,"wangcp",10,TimeUnit.SECONDS);
if(!result){
// 代表已經(jīng)加鎖了
return "error_code";
}
// 從redis 中拿當前庫存的值
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if(stock > 0){
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock",realStock + "");
System.out.println("扣減成功,剩余庫存:" + realStock);
}else{
System.out.println("扣減失敗,庫存不足");
}
}finally {
// 釋放鎖
stringRedisTemplate.delete(lockKey);
}
return "end";
}
現(xiàn)在我們再思考一下,給鎖加入過期時間后就可以了嗎?就可以完美運行不出問題了嗎?
超時時間設(shè)置的10s真的合適嗎?如果不合適設(shè)置多少秒合適呢?如下圖所示

假設(shè)同一時間有三個請求。
請求1首先加鎖后需執(zhí)行15秒,但在執(zhí)行到10秒時鎖失效釋放。
請求2進入后加鎖執(zhí)行,在請求2執(zhí)行到5秒時,請求1執(zhí)行完成進行鎖釋放,但此時釋放掉的是請求2的鎖。
請求3在請求2執(zhí)行5秒時開始執(zhí)行,但在執(zhí)行到3秒時請求2執(zhí)行完成將請求3的鎖進行釋放。
我們現(xiàn)在只是模擬3個請求便可看出問題,如果在真正高并發(fā)的場景下,可能鎖就會面臨“一直失效”或“永久失效”。
那么具體問題出在哪里呢?總結(jié)為以下幾點:
1.存在請求釋放鎖時釋放掉的并不是自己的鎖
2.超時時間過短,存在代碼未執(zhí)行完便自動釋放
針對問題我們思考對應(yīng)的解決方法:
案例5-Redisson分布式鎖
引入依賴
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.6.5</version>
</dependency>
初始化客戶端
@Bean
public RedissonClient redisson(){
// 單機模式
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.3.170:6379").setDatabase(0);
return Redisson.create(config);
}
Redisson實現(xiàn)分布式鎖
package com.wangcp.redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class IndexController {
@Autowired
private RedissonClient redisson;
@Autowired
private StringRedisTemplate stringRedisTemplate;
/**
* 模擬下單減庫存的場景
* @return
*/
@RequestMapping(value = "/duduct_stock")
public String deductStock(){
String lockKey = "product_001";
// 1.獲取鎖對象
RLock redissonLock = redisson.getLock(lockKey);
try{
// 2.加鎖
redissonLock.lock(); // 等價于 setIfAbsent(lockKey,"wangcp",10,TimeUnit.SECONDS);
// 從redis 中拿當前庫存的值
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if(stock > 0){
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock",realStock + "");
System.out.println("扣減成功,剩余庫存:" + realStock);
}else{
System.out.println("扣減失敗,庫存不足");
}
}finally {
// 3.釋放鎖
redissonLock.unlock();
}
return "end";
}
}
Redisson 分布式鎖實現(xiàn)原理圖

Redisson 底層源碼分析
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
沒錯,加鎖最終執(zhí)行的就是這段 lua 腳本語言。
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
腳本的主要邏輯為:
exists 判斷 key 是否存在
當判斷不存在則設(shè)置 key
然后給設(shè)置的key追加過期時間
這樣來看其實和我們前面案例中的實現(xiàn)方法好像沒什么區(qū)別,但實際上并不是。
這段lua腳本命令在Redis中執(zhí)行時,會被當成一條命令來執(zhí)行,能夠保證原子性,故要不都成功,要不都失敗。
我們在源碼中看到Redssion的許多方法實現(xiàn)中很多都用到了lua腳本,這樣能夠極大的保證命令執(zhí)行的原子性。
Redisson鎖自動“續(xù)命”源碼
private void scheduleExpirationRenewal(final long threadId) {
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}
if (future.getNow()) {
// reschedule itself
scheduleExpirationRenewal(threadId);
}
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
task.cancel();
}
}
這段代碼是在加鎖后開啟一個守護線程進行監(jiān)聽。Redisson超時時間默認設(shè)置30s,線程每10s調(diào)用一次判斷鎖還是否存在,如果存在則延長鎖的超時時間。
現(xiàn)在,我們再回過頭來看看案例5中的加鎖代碼與原理圖,其實完善到這種程度已經(jīng)可以滿足很多公司的使用了,并且很多公司也確實是這樣用的。但我們再思考下是否還存在問題呢?例如以下場景:
眾所周知 Redis 在實際部署使用時都是集群部署的,那在高并發(fā)場景下我們加鎖,當把key寫入到master節(jié)點后,master還未同步到slave節(jié)點時master宕機了,原有的slave節(jié)點經(jīng)過選舉變?yōu)榱诵碌膍aster節(jié)點,此時可能就會出現(xiàn)鎖失效問題。
通過分布式鎖的實現(xiàn)機制我們知道,高并發(fā)場景下只有加鎖成功的請求可以繼續(xù)處理業(yè)務(wù)邏輯。那就出現(xiàn)了大伙都來加鎖,但有且僅有一個加鎖成功了,剩余的都在等待。其實分布式鎖與高并發(fā)在語義上就是相違背的,我們的請求雖然都是并發(fā),但Redis幫我們把請求進行了排隊執(zhí)行,也就是把我們的并行轉(zhuǎn)為了串行。串行執(zhí)行的代碼肯定不存在并發(fā)問題了,但是程序的性能肯定也會因此受到影響。
針對這些問題,我們再次思考解決方案
正文結(jié)束
1.不認命,從10年流水線工人,到谷歌上班的程序媛,一位湖南妹子的勵志故事
3.從零開始搭建創(chuàng)業(yè)公司后臺技術(shù)棧
5.37歲程序員被裁,120天沒找到工作,無奈去小公司,結(jié)果懵了...
6.IntelliJ IDEA 2019.3 首個最新訪問版本發(fā)布,新特性搶先看
一個人學(xué)習、工作很迷茫?
點擊「閱讀原文」加入我們的小圈子!

