Python 中 Redis 庫分布式鎖簡單分析

作者:ayuliao
來源:懶編程
簡介
我們常會遇到某段邏輯在相同時間段內(nèi)只希望被單個實例執(zhí)行,而在微服務(wù)架構(gòu)中,一個程序可能會存在多個實例,此時就需要通過分布式鎖來實現(xiàn)串行執(zhí)行。
最簡單的分布式鎖無非就是找到對于多個程序?qū)嵗詥我坏拇嬖?,比如MySQL數(shù)據(jù)只有一個或Redis只有一個,此時都可以利用這單一的存在構(gòu)建一個鎖,多個程序?qū)嵗獔?zhí)行某段邏輯前必須先獲得這個鎖,然后才能執(zhí)行。
因為某些原因,上班的時候我和同事一起研究了一下Python redis庫中分布式鎖的實現(xiàn)源碼,這里簡單分享一下。
通過pip可以安裝這個庫。
pip?install?redis==2.10.6
這里以這個庫的2.10.6版本為例,對它Redis分布式鎖源碼進(jìn)行簡單的分析。
代碼分析
實例化StrictRedis對象后,使用其中的lock方法便可獲得一個分布式鎖。
首先看一下lock方法對應(yīng)的源碼。
def?lock(self,?name,?timeout=None,?sleep=0.1,?blocking_timeout=None,
?????????????lock_class=None,?thread_local=True):
????????if?lock_class?is?None:
????????????if?self._use_lua_lock?is?None:
????????????????#?the?first?time?.lock()?is?called,?determine?if?we?can?use
????????????????#?Lua?by?attempting?to?register?the?necessary?scripts
????????????????try:
????????????????????LuaLock.register_scripts(self)
????????????????????self._use_lua_lock?=?True
????????????????except?ResponseError:
????????????????????self._use_lua_lock?=?False
????????????lock_class?=?self._use_lua_lock?and?LuaLock?or?Lock
????????return?lock_class(self,?name,?timeout=timeout,?sleep=sleep,
??????????????????????????blocking_timeout=blocking_timeout,
??????????????????????????thread_local=thread_local)
該方法提供了多個參數(shù),其中:
name用于指定鎖名 timeout用于指定鎖的超時時間 sleep用于指定線程睡眠時間,線程爭奪鎖的過程本質(zhì)就是一個循環(huán),每過sleep秒,就會嘗試去獲取鎖對象 blocking_timeout用于指定阻塞超時時間,當(dāng)多個實例爭奪鎖時,這個時間就是實例等待鎖的最長時間 lock_class表示使用鎖的類對象 thread_local表示是否線程安全
方法中最關(guān)鍵的一句代碼為lock_class = self._use_lua_lock and LuaLock or Lock,確定了lock_class后,便實例化該lock_class即可。
lock_class可以為LuaLock也可為Lock,經(jīng)過簡單分析,Lock類才是關(guān)鍵,LuaLock類繼承自Lock,通過Lua代碼實現(xiàn)Redis的一些操作,這里著重看Lock類。
首先看到該類的__init__方法。
class?Lock(object):
????def?__init__(self,?redis,?name,?timeout=None,?sleep=0.1,
?????????????????blocking=True,?blocking_timeout=None,?thread_local=True):
????????self.redis?=?redis
????????self.name?=?name
????????self.timeout?=?timeout
????????self.sleep?=?sleep
????????self.blocking?=?blocking
????????self.blocking_timeout?=?blocking_timeout
????????self.thread_local?=?bool(thread_local)
????????self.local?=?threading.local()?if?self.thread_local?else?dummy()
????????self.local.token?=?None
????????if?self.timeout?and?self.sleep?>?self.timeout:
????????????raise?LockError("'sleep'?must?be?less?than?'timeout'")
__init__方法初始化不同的屬性,其中self.local為線程的本地字段,用于存儲該線程特有的數(shù)據(jù),不與其他線程進(jìn)行共享。
此外,在__init__方法中對timeout與sleep進(jìn)行的判斷,如果線程等待鎖時的睡眠時間大于鎖的超時時間,則直接返回錯誤。
接著重點看Lock類中的acquire方法,該方法代碼如下。
import?time?as?mod_time
class?Lock(object):
????def?acquire(self,?blocking=None,?blocking_timeout=None):
????????sleep?=?self.sleep
????????token?=?b(uuid.uuid1().hex)
????????if?blocking?is?None:
????????????blocking?=?self.blocking
????????if?blocking_timeout?is?None:
????????????blocking_timeout?=?self.blocking_timeout
????????stop_trying_at?=?None
????????if?blocking_timeout?is?not?None:
????????????stop_trying_at?=?mod_time.time()?+?blocking_timeout
????????while?1:
????????????if?self.do_acquire(token):
????????????????self.local.token?=?token
????????????????return?True
????????????if?not?blocking:
????????????????return?False
????????????if?stop_trying_at?is?not?None?and?mod_time.time()?>?stop_trying_at:
????????????????return?False
????????????mod_time.sleep(sleep)
acquire方法的主邏輯就是一個死循環(huán),在死循環(huán)中調(diào)用do_acquire方法獲取Redis分布式鎖,如果成功獲得鎖,則將token存儲到當(dāng)前線程的local對象中,如果沒有獲得,則判斷blocking,如果blocking為Flase,則不再阻塞,直接返回結(jié)果,反之,則判斷當(dāng)前時間是否超過blocking_timeout,超過,同樣返回False,反之,通過sleep方法讓當(dāng)前線程睡眠sleep秒。
進(jìn)一步分析do_acquire方法,代碼如下:
????def?do_acquire(self,?token):
????????if?self.redis.setnx(self.name,?token):
????????????if?self.timeout:
????????????????#?convert?to?milliseconds
????????????????timeout?=?int(self.timeout?*?1000)?#?轉(zhuǎn)成毫秒
????????????????self.redis.pexpire(self.name,?timeout)
????????????return?True
????????return?False
do_acquire方法中,一開始通過redis的setnx方法將name對著作為key,token作為value,setnx方法只有在key不存的情況下,才能正常的將value存入Redis中,若key依存,該方法不做任何操作,此時就相當(dāng)于沒有獲取到鎖。
將token成功插入后,則判斷有無超時時間,如果設(shè)置了timeout,則通過pexpire方法將redis中name這個key的超時設(shè)置一下,因為pexpire方法是以毫秒為單位的,所以需要先將timeout轉(zhuǎn)換成毫秒單位。
如果沒有設(shè)置timeout,那么name這個key只能通過do_release方法中的邏輯清除。
至此,我們清楚的知道了,Redis分布式鎖的本質(zhì)其實就是Redis中的一個key-value,非常簡單...
理清鎖的獲取邏輯后,來看一下相應(yīng)的釋放邏輯,主要關(guān)注release方法,該方法代碼如下。
????def?release(self):
????????"Releases?the?already?acquired?lock"
????????expected_token?=?self.local.token
????????if?expected_token?is?None:
????????????raise?LockError("Cannot?release?an?unlocked?lock")
????????self.local.token?=?None
????????self.do_release(expected_token)
release方法中先將線程中的token取出,并將其置為None,然后調(diào)用do_release方法實現(xiàn)鎖的釋放,do_release方法代碼如下。
????def?do_release(self,?expected_token):
????????name?=?self.name
????????def?execute_release(pipe):
????????????lock_value?=?pipe.get(name)
????????????if?lock_value?!=?expected_token:
????????????????raise?LockError("Cannot?release?a?lock?that's?no?longer?owned")
????????????pipe.delete(name)
????????self.redis.transaction(execute_release,?name)
do_release方法的邏輯非常簡單,其主要邏輯在execute_release方法,通過Redis的transaction方法開啟一個事務(wù)來執(zhí)行execute_release方法中邏輯。
在execute_release中,首先通過get方法獲取name這個key對應(yīng)的value,獲得后,通過delete方法將其刪除,實現(xiàn)Redis分布式鎖的釋放。
blocking屬性
觀察到acquire方法的這段代碼。
????????while?1:
????????????if?self.do_acquire(token):
????????????????self.local.token?=?token
????????????????return?True
????????????if?not?blocking:
????????????????return?False
????????????if?stop_trying_at?is?not?None?and?mod_time.time()?>?stop_trying_at:
????????????????return?False
????????????mod_time.sleep(sleep)
如果blocking為True,獲取不到鎖,則執(zhí)行后面的邏輯,讓線程睡眠,阻塞等待其他線程將鎖釋放;如果blocking為False,獲取不到鎖,則直接返回獲取鎖失敗。
這就會引出幾種情況,假設(shè)現(xiàn)在有線程A與線程B都需要執(zhí)行相同的邏輯,執(zhí)行前需要獲取鎖。
如果線程A在執(zhí)行的過程中,線程B也要執(zhí)行了,如果blocking為True,此時線程B會被阻塞,等待線程A是否Redis鎖;如果blocking為False,線程B此時獲取不到鎖,不執(zhí)行相同的邏輯。
如果線程A執(zhí)行完了,此時線程B到來,如果blocking為True或False,此時線程B都不會被阻塞并成功拿到鎖,執(zhí)行相同的邏輯。
一個簡單的結(jié)論是,blocking無法保證邏輯是否被單次執(zhí)行,如果希望通過Redis分布式鎖讓邏輯只執(zhí)行一次,依舊需要從業(yè)務(wù)層面做控制,比如MySQL中的業(yè)務(wù)數(shù)據(jù)是否被修改或Redis中是否記錄這業(yè)務(wù)數(shù)據(jù)等。
結(jié)尾
現(xiàn)在很多業(yè)務(wù)都離不開Redis,它已經(jīng)成為互聯(lián)網(wǎng)中的基礎(chǔ)設(shè)施了,Redis有很多有趣的內(nèi)容可以跟大家分享。
前段時間看見Redis之父退居二線,說已經(jīng)為Redis工作了10年了,每天都要revice、merge他人的代碼,這種工作讓他沒有創(chuàng)造東西的快樂,所以決定退居二線,將Redis交由社區(qū)運營,這讓我有些感慨,軟件工程是創(chuàng)造性的工作,適當(dāng)?shù)姆趴铡㈤喿x與行業(yè)無關(guān)的書籍其實有助于激發(fā)創(chuàng)造力。
最后感謝你的閱讀,我們下篇文章見。

近期熱門文章推薦:

