C# Redis分布式鎖 - 單節(jié)點

為什么要用分布式鎖?
先上一張截圖,這是在瀏覽別人的博客時看到的.

?
在了解為什么要用分布式鎖之前,我們應該知道到底什么是分布式鎖.
鎖按照不同的維度,有多種分類.比如
1.悲觀鎖,樂觀鎖;
2.公平鎖,非公平鎖;
3.獨享鎖,共享鎖;
4.線程鎖,進程鎖;
等等.
我們平時用的鎖,比如 lock,它是線程鎖,主要用來給方法,代碼塊加鎖.由于進程的內(nèi)存單元是被其所有線程共享的,所以線程鎖控制的實際是多個線程對同一塊內(nèi)存區(qū)域的訪問.
有線程鎖,就必然有進程鎖.顧名思義,進程鎖的目的是控制多個進程對共享資源的訪問.因為進程之間彼此獨立,各個進程是無法控制其他進程對資源的訪問,所以只能通過操作系統(tǒng)來控制.比如?Mutex.
但是進程鎖有一個前提,那就是需要多個進程在同一個系統(tǒng)中,如果多個進程不在同一個系統(tǒng),那就只能使用分布式鎖來控制了.

分布式鎖是控制分布式系統(tǒng)中不同系統(tǒng)之間訪問共享資源的一種鎖實現(xiàn).它和線程鎖,進程鎖的作用都是一樣,只是范圍不一樣.
所以要實現(xiàn)分布式鎖,就必須依靠第三方存儲介質來存儲鎖的信息.因為各個進程之間彼此誰都不服誰,只能找一個帶頭大哥咯;
?
以下示例需引用NUGET: CSRedisCore
示例一
CSRedisClient redisClient = new CSRedis.CSRedisClient("127.0.0.1:6379,defaultDatabase=0");var lockKey = "lockKey";var stock = 5;//商品庫存var taskCount = 10;//線程數(shù)量redisClient.Del(lockKey);//測試前,先把鎖刪了.for (int i = 0; i < taskCount; i++){Task.Run(() =>{//獲取鎖do{//setnx : key不存在才會成功,存在則失敗.var success = redisClient.SetNx(lockKey, 1);if (success == true){break;}Thread.Sleep(TimeSpan.FromSeconds(1));//休息1秒再嘗試獲取鎖} while (true);Console.WriteLine($"線程:{Task.CurrentId} 拿到了鎖,開始消費");if (stock <= 0){Console.WriteLine($"庫存不足,線程:{Task.CurrentId} 搶購失敗!");redisClient.Del(lockKey);return;}stock--;//模擬處理業(yè)務Thread.Sleep(TimeSpan.FromSeconds(new Random().Next(1, 3)));Console.WriteLine($"線程:{Task.CurrentId} 消費完畢!剩余 {stock} 個");//業(yè)務處理完后,釋放鎖.redisClient.Del(lockKey);});}
運行結果:

?
看起來貌似沒毛病,實際上上述代碼有個致命的問題:
當某個線程拿到鎖之后,如果系統(tǒng)崩潰了,那么鎖永遠都不會被釋放.因此,我們應該給鎖加一個過期時間,當時間到了,還沒有被主動釋放,我們就讓redis釋放掉它,以保證其他消費者可以拿到鎖,進行消費.
這里給鎖加過期時間也有講究,不能拿到鎖后再加,比如:
//setnx : key不存在才會成功,存在則失敗.var success = redisClient.SetNx(lockKey, 1);if (success == true){redisClient.Set(lockKey, 1, expireSeconds: 5);break;}
這樣操作的話,獲取鎖和設置鎖的過期時間就不是原子操作,同樣會出現(xiàn)上面提到的問題.Redis 提供了一個合而為一的操作可以解決這個問題.
//set : key存在則失敗,不存在才會成功,并且過期時間5秒var success = redisClient.Set(lockKey, 1, expireSeconds: 5, exists: RedisExistence.Nx);
這個問題雖然解決了,但隨之產(chǎn)生了一個新的問題:
假設有3個線程A,B,C
當線程A拿到鎖后執(zhí)行業(yè)務的時候超時了,超過了鎖的過期時間還沒執(zhí)行完,這時候鎖被Redis釋放了,
于是線程B拿到了鎖并開始執(zhí)行業(yè)務邏輯.
當線程B的業(yè)務邏輯還沒執(zhí)行完的時候,線程A的業(yè)務邏輯執(zhí)行完了,于是乎就跑去釋放掉了鎖.
這時候線程C就可以拿到鎖開始執(zhí)行它的業(yè)務邏輯.
這不就亂套了么...

因此,線程在釋放鎖的時候應該判斷這個鎖還屬不屬于自己.
所以,在設置鎖的時候,redis的value值不能像上面代碼那樣,隨便給個1,而應該給一個隨機值,代表當前線程.
var id = Guid.NewGuid().ToString("N");//獲取鎖do{//set : key存在則失敗,不存在才會成功,并且過期時間5秒var success = redisClient.Set(lockKey, id, expireSeconds: 5, exists: RedisExistence.Nx);if (success == true){break;}Thread.Sleep(TimeSpan.FromSeconds(1));//休息1秒再嘗試獲取鎖} while (true);Console.WriteLine($"線程:{Task.CurrentId} 拿到了鎖,開始消費");.........//業(yè)務處理完后,釋放鎖.var value = redisClient.Get<string>(lockKey);if (value == id){redisClient.Del(lockKey);}
完美了嗎?
不完美.還是老生常談的問題,取value和刪除key 分了兩步走,不是原子操作.

并且,這里還不能用pipe,因為需要根據(jù)取到的value來決定下一個操作.上面設置過期時間倒是可以用pipe.
所以,這里只能用lua.
2020.10.09 補:將庫存放到redis
完整的代碼如下:
CSRedisClient redisClient = new CSRedis.CSRedisClient("127.0.0.1:6379,defaultDatabase=0");var lockKey = "lockKey";var stockKey = "stock";redisClient.Set(stockKey, 5);//商品庫存var releaseLockScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";//釋放鎖的redis腳本redisClient.Del(lockKey);//測試前,先把鎖刪了.Parallel.For(0, 10, i =>{var id = Guid.NewGuid().ToString("N");//獲取鎖do{//set : key存在則失敗,不存在才會成功,并且過期時間5秒var success = redisClient.Set(lockKey, id, expireSeconds: 5, exists: RedisExistence.Nx);if (success == true){break;}Thread.Sleep(TimeSpan.FromSeconds(1));//休息1秒再嘗試獲取鎖} while (true);Console.WriteLine($"線程:{Task.CurrentId} 拿到了鎖,開始消費");//扣減庫存var currentStock = redisClient.IncrBy(stockKey, -1);if (currentStock < 0){Console.WriteLine($"庫存不足,線程:{Task.CurrentId} 搶購失敗!");redisClient.Eval(releaseLockScript, lockKey, id);return;}//模擬處理業(yè)務,這里不考慮失敗的情況Thread.Sleep(TimeSpan.FromSeconds(new Random().Next(1, 3)));Console.WriteLine($"線程:{Task.CurrentId} 消費完畢!剩余 {currentStock} 個");//業(yè)務處理完后,釋放鎖.redisClient.Eval(releaseLockScript, lockKey, id);});
這篇文章只介紹了單節(jié)點Redis的分布式鎖,因為單節(jié)點,所以不是高可用.
多節(jié)點Redis則需要用官方介紹的RedLock,這玩意有點繞,我需要捋一捋.?

