Python 神器 Celery 源碼解析(4)

Celery是一款非常簡單、靈活、可靠的分布式系統(tǒng),可用于處理大量消息,并且提供了一整套操作此系統(tǒng)的工具。Celery 也是一款消息隊列工具,可用于處理實時數(shù)據(jù)以及任務(wù)調(diào)度。
本文是是celery源碼解析的第四篇,在前3篇里分別介紹了vine, py-amqp和kombu:
神器 celery 源碼解析- vine實現(xiàn)Promise功能 神器 celery 源碼解析- py-amqp實現(xiàn)AMQP協(xié)議 神器 celery 源碼解析- kombu,一個python實現(xiàn)的消息庫
本篇我們繼續(xù)聊聊kombu這個python實現(xiàn)的消息庫中的一些常用算法實現(xiàn),和各種排序算法不一樣,都是解決一些具體的業(yè)務(wù)問題,非常有用。本文包括下面幾個部分:
LRU緩存淘汰算法 令牌桶限流算法 Round-Robin調(diào)度算法 LamportClock時間戳算法 LaxBoundedSemaphore有限信號量算法
LRU緩存淘汰算法
緩存,顧名思義,就是將計算結(jié)果暫時存起來,以供后期使用,這樣可以省去重復(fù)計算的工作。比如我們計算斐波那契數(shù)列的遞歸算法:
#?根據(jù)定義遞歸求解
def?fib(n):
????if?n?<=?1:?
????????return?n
????return?fib(n?-?1)?+?fib(n?-?2)
我們求n為5的數(shù),展開數(shù)學公式大概如下(這里簡化python函數(shù)fib名稱為數(shù)學函數(shù)f):
f(5)=f(4)????????????????????+f(3)
????=f(3)??????????+f(2)?????+f(2)?????+f(1)
????=f(2)?????+f(1)+f(1)+f(0)+f(1)+f(0)+f(1)
????=f(1)+f(0)+f(1)+f(1)+f(0)+f(1)+f(0)+f(1)
????=5
根據(jù)數(shù)學公式,我們可以知道,在執(zhí)行f(5)過程中,重復(fù)執(zhí)行了5次f(1), 3次f(0)。要提高執(zhí)行效率,就可以用到緩存。最簡單的實現(xiàn)版本:
#?根據(jù)定義遞歸求解
cache?=?{}
def?fib_v1(n):
????if?n?in?cache:
????????return?cache[n]
????if?n?<=?1:?
????????result?=?n
????else:
????????result?=??fib(n?-?1)?+?fib(n?-?2)
????cache[n]?=?result
????return?result
這種實現(xiàn)方式有2個弊端,一個是依賴一個外部的cache變量,另一個是cache功能和fib函數(shù)綁定,還需要修改fib函數(shù)。我們可以通過一個裝飾器實現(xiàn)這個cache,而不用改動fib函數(shù):
def?cache_decorator(fun):
????_cache?=?{}
????def?wrapper(*args,?**kwargs):
????????if?args?in?_cache:
????????????return?_cache[args]
????????else:
????????????ret?=?fun(*args,?**kwargs)
????????????_cache[args]?=?ret
????????????return?ret
????return?wrapper
使用的時候可以直接給fib函數(shù)添加上裝飾器:
@cache_decorator
def?fib(n):
????...
這種緩存實現(xiàn)實現(xiàn)方式,還是會有問題:無法進行清理,內(nèi)存會持續(xù)增長。編程中有一句話是: 命名和緩存失效是計算機科學里面最難應(yīng)對的兩件事。關(guān)于緩存淘汰有各種算法,請見參考鏈接,我這里重點介紹一下LRU和LFU。
LRU(Least recently used)最早使用淘汰算法,核心特點是:?最早的數(shù)先淘汰 LFU(Least-frequently used)最少使用淘汰算法, 核心特點是:?最少的數(shù)先淘汰
關(guān)于LRU,在我之前介紹tinydb時候有過介紹。其中的實現(xiàn)如下:
class?LRUCache(abc.MutableMapping,?Generic[K,?V]):
????def?__init__(self,?capacity=None):
????????self.capacity?=?capacity??#?緩存容量
????????self.cache?=?OrderedDict()??#?有序字典
????
????def?get(self,?key:?K,?default:?D?=?None)?->?Optional[Union[V,?D]]:
????????value?=?self.cache.get(key)??#?從換成獲取
????????if?value?is?not?None:
????????????del?self.cache[key]
????????????self.cache[key]?=?value??#?更新緩存順序
????????????return?value
????????return?default
????def?set(self,?key:?K,?value:?V):
????????if?self.cache.get(key):
????????????del?self.cache[key]??
????????????self.cache[key]?=?value?#?更新緩存順序及值
????????else:
????????????self.cache[key]?=?value
????????????if?self.capacity?is?not?None?and?self.length?>?self.capacity:
????????????????self.cache.popitem(last=False)??#?淘汰最古老的數(shù)據(jù)
LRU的特點只要保持緩存數(shù)據(jù)是有序的, 我們甚至不需要自己實現(xiàn),使用系統(tǒng)functools中的實現(xiàn):
from?functools?import?lru_cache
@lru_cache()
def?fib(n):
????...
kombu中給我們提供了一個線程安全的版本, 主要實現(xiàn):
#?kombu-5.0.0/kombu/utils/functional.py
class?LRUCache(UserDict):
????"""LRU?Cache?implementation?using?a?doubly?linked?list?to?track?access.
????"""
????def?__init__(self,?limit=None):
????????self.limit?=?limit
????????self.mutex?=?threading.RLock()
????????self.data?=?OrderedDict()
????
????def?__getitem__(self,?key):
????????with?self.mutex:
????????????value?=?self[key]?=?self.data.pop(key)
????????????return?value
????
????def?__setitem__(self,?key,?value):
????????#?remove?least?recently?used?key.
????????with?self.mutex:
????????????if?self.limit?and?len(self.data)?>=?self.limit:
????????????????self.data.pop(next(iter(self.data)))
????????????self.data[key]?=?value
????...
上面代碼在設(shè)置和獲取數(shù)據(jù)時候都先獲取鎖,然后再進行數(shù)據(jù)操作。
關(guān)于緩存使用,除了通過業(yè)務(wù)場景判斷適用那種淘汰算法外,還可以使用具體的緩存命中率指標進行分析:
def?memoize(maxsize=None,?keyfun=None,?Cache=LRUCache):
????"""Decorator?to?cache?function?return?value."""
????def?_memoize(fun):
????????mutex?=?threading.Lock()
????????cache?=?Cache(limit=maxsize)
????????@wraps(fun)
????????def?_M(*args,?**kwargs):
????????????if?keyfun:
????????????????key?=?keyfun(args,?kwargs)
????????????else:
????????????????key?=?args?+?(KEYWORD_MARK,)?+?tuple(sorted(kwargs.items()))
????????????try:
????????????????with?mutex:
????????????????????value?=?cache[key]
????????????except?KeyError:
????????????????value?=?fun(*args,?**kwargs)
????????????????#?未命中需要執(zhí)行函數(shù)
????????????????_M.misses?+=?1
????????????????with?mutex:
????????????????????cache[key]?=?value
????????????else:
????????????????#?命中率增加
????????????????_M.hits?+=?1
????????????return?value
????????def?clear():
????????????"""Clear?the?cache?and?reset?cache?statistics."""
????????????#?清理緩存及統(tǒng)計
????????????cache.clear()
????????????_M.hits?=?_M.misses?=?0
????????#?統(tǒng)計信息
????????_M.hits?=?_M.misses?=?0
????????_M.clear?=?clear
????????_M.original_func?=?fun
????????return?_M
????return?_memoize
memoize的實現(xiàn)并不復(fù)雜,增加了hits/misses數(shù)據(jù),可以統(tǒng)計分析緩存的命中率,幫助正確使用LRU緩存。還添加了clear接口,可以在需要的時候?qū)彺嬷苯舆M行清理。
注意memoize使用了一個鎖,在LRUCache還是使用了一個鎖,這個鎖的使用,我們以后再講。
令牌桶限流算法
限流是指在系統(tǒng)面臨高并發(fā)、大流量請求的情況下,限制新的流量對系統(tǒng)的訪問,從而保證系統(tǒng)服務(wù)的安全性。常用的限流算法有計數(shù)器、漏斗算法和令牌桶算法。其中計數(shù)器算法又分固定窗口算法、滑動窗口算法,后者我們在TCP協(xié)議中經(jīng)常會碰到。
算法中存在一個令牌桶,以恒定的速率向令牌桶中放入令牌。當請求來時,會首先到令牌桶中去拿令牌,如果拿到了令牌,則該請求會被處理,并消耗掉令牌;如果拿不到令牌,則該請求會被丟棄。當然令牌桶也有一定的容量,如果滿了令牌就無法放進去了,這樣算法就有限流作用。又因為令牌產(chǎn)生的速率是很定的,如果消費速率較低,桶里會額外緩存一部分令牌,用于應(yīng)對流量突發(fā)時候的消耗。下面是算法的示意圖:

我們具體看看kombu中提供的實現(xiàn)。TokenBucket類:
class?TokenBucket:
????#:?The?rate?in?tokens/second?that?the?bucket?will?be?refilled.
????fill_rate?=?None
????#:?Maximum?number?of?tokens?in?the?bucket.
????capacity?=?1
????#:?Timestamp?of?the?last?time?a?token?was?taken?out?of?the?bucket.
????timestamp?=?None
????def?__init__(self,?fill_rate,?capacity=1):
????????#?容量上限
????????self.capacity?=?float(capacity)
????????#?剩余令牌數(shù),初始等于容量上限
????????self._tokens?=?capacity
????????#?填充率
????????self.fill_rate?=?float(fill_rate)
????????self.timestamp?=?monotonic()
????????#?數(shù)據(jù)容器
????????self.contents?=?deque()
????def?add(self,?item):
????????self.contents.append(item)
????def?pop(self):
????????#?先進先出
????????return?self.contents.popleft()
代碼包括:
令牌速率fill_rate 桶的容量上限 一個時間戳 剩余令牌數(shù) 算法提供了一個基于雙端隊列的數(shù)據(jù)容器,可以對容器進行先進先出操作
令牌桶是否可用的判斷:
def?can_consume(self,?tokens=1):
????"""Check?if?one?or?more?tokens?can?be?consumed.
????Returns:
????????bool:?true?if?the?number?of?tokens?can?be?consumed
????????????from?the?bucket.??If?they?can?be?consumed,?a?call?will?also
????????????consume?the?requested?number?of?tokens?from?the?bucket.
????????????Calls?will?only?consume?`tokens`?(the?number?requested)
????????????or?zero?tokens?--?it?will?never?consume?a?partial?number
????????????of?tokens.
????"""
????if?tokens?<=?self._get_tokens():
????????#?消費n個令牌
????????self._tokens?-=?tokens
????????return?True
????return?False
def?_get_tokens(self):
????if?self._tokens?????????#?記錄當前時間
????????now?=?monotonic()
????????#?計算已經(jīng)流失的令牌數(shù)量
????????delta?=?self.fill_rate?*?(now?-?self.timestamp)
????????#?更新容量上限或者剩余令牌和流失數(shù)量之和
????????self._tokens?=?min(self.capacity,?self._tokens?+?delta)
????????self.timestamp?=?now
????return?self._tokens
我們可以看到,算法在進行令牌消費判斷的同時,還會對桶的剩余流量進行自校正,很巧妙。
TokenBucket的使用在ConsumerMixin的run方法中。創(chuàng)建了一個速率為1的令牌桶,然后持續(xù)的進行消費。如果有令牌則消費消費者上的消息;如果沒有令牌則進行休眠
#ch23-celery/kombu-5.0.0/kombu/mixins.py:240
class?ConsumerMixin:
????def?run(self,?_tokens=1,?**kwargs):
????????restart_limit?=?TokenBucket(1)
????????...
????????#?無限循環(huán)
????????while?not?self.should_stop:
????????????try:
????????????????#?有令牌消費
????????????????if?restart_limit.can_consume(_tokens):??#?pragma:?no?cover
????????????????????for?_?in?self.consume(limit=None,?**kwargs):
????????????????????????pass
????????????????else:
????????????????????#?沒浪費休眠
????????????????????sleep(restart_limit.expected_time(_tokens))
????????????except?errors:
????????????????...
其中的休眠時間,是由令牌桶根據(jù)期望值計算得來:
def?expected_time(self,?tokens=1):
????"""Return?estimated?time?of?token?availability.
????Returns:
????????float:?the?time?in?seconds.
????"""
????_tokens?=?self._get_tokens()
????tokens?=?max(tokens,?_tokens)
????return?(tokens?-?_tokens)?/?self.fill_rate
Round-Robin調(diào)度算法
Round-Robin調(diào)度算法,最常見的大概是在nginx。Round-Robin方式可讓nginx將請求按順序輪流地分配到后端服務(wù)器上,它均衡地對待后端的每一臺服務(wù)器,而不關(guān)心服務(wù)器實際的連接數(shù)和當前的系統(tǒng)負載,循環(huán)往復(fù)。在kombu中也提供了幾種類似的調(diào)度算法:
輪詢調(diào)度 公平調(diào)度
我們先看Round-Robin方式:
class?round_robin_cycle:
????"""Iterator?that?cycles?between?items?in?round-robin."""
????"""輪詢調(diào)度算法"""
????def?__init__(self,?it=None):
????????self.items?=?it?if?it?is?not?None?else?[]
????def?update(self,?it):
????????"""Update?items?from?iterable."""
????????"""更新列表"""
????????self.items[:]?=?it
????def?consume(self,?n):
????????"""Consume?n?items."""
????????"""消費n個元素"""
????????return?self.items[:n]
????def?rotate(self,?last_used):
????????"""Move?most?recently?used?item?to?end?of?list."""
????????"""旋轉(zhuǎn):把最后一個元素放到列表某尾"""
????????items?=?self.items
????????try:
????????????items.append(items.pop(items.index(last_used)))
????????except?ValueError:
????????????pass
????????return?last_used
算法實現(xiàn)很簡單,就是一個有序隊列,可以每次消費前n個有序元素,并且可以將最近使用的元素旋轉(zhuǎn)到隊尾。下面是旋轉(zhuǎn)的單元測試:
def?test_round_robin_cycle():
????it?=?cycle_by_name('round_robin')(['A',?'B',?'C'])
????assert?it.consume(3)?==?['A',?'B',?'C']
????it.rotate('B')
????assert?it.consume(3)?==?['A',?'C',?'B']
????it.rotate('A')
????assert?it.consume(3)?==?['C',?'B',?'A']
????it.rotate('A')
????assert?it.consume(3)?==?['C',?'B',?'A']
????it.rotate('C')
????assert?it.consume(3)?==?['B',?'A',?'C']
還有一種公平循環(huán)的調(diào)度算法:
class?FairCycle:
????"""Cycle?between?resources.
????Consume?from?a?set?of?resources,?where?each?resource?gets
????an?equal?chance?to?be?consumed?from.
????Arguments:
????????fun?(Callable):?Callback?to?call.
????????resources?(Sequence[Any]):?List?of?resources.
????????predicate?(type):?Exception?predicate.
????"""
????
????def?__init__(self,?fun,?resources,?predicate=Exception):
????????self.fun?=?fun
????????self.resources?=?resources
????????self.predicate?=?predicate
????????#??初始位置
????????self.pos?=?0
FairCycle是一種資源之間公平循環(huán)的調(diào)度算法, 構(gòu)造函數(shù)中:
利用資源的函數(shù) 多個資源的集合
使用的方式是使用get方法傳入回調(diào):
def?_next(self):
????while?1:
????????try:
????????????resource?=?self.resources[self.pos]
????????????#?位置加1
????????????self.pos?+=?1
????????????return?resource
????????except?IndexError:
????????????#?到尾部后,重置位置
????????????self.pos?=?0
????????????if?not?self.resources:
????????????????raise?self.predicate()
def?get(self,?callback,?**kwargs):
????"""Get?from?next?resource."""
????#?無限重試
????for?tried?in?count(0):??#?for?infinity
????????#?獲取資源
????????resource?=?self._next()
????????try:
????????????#?利用資源
????????????return?self.fun(resource,?callback,?**kwargs)
????????except?self.predicate:
????????????#?reraise?when?retries?exchausted.
????????????#?容錯上限
????????????if?tried?>=?len(self.resources)?-?1:
????????????????raise
調(diào)度主要體現(xiàn)再獲取資源的next函數(shù)上,沒次獲取資源后位置標志進行后移,到尾部后在重置到0,繼續(xù)下一輪循環(huán)。算法還可以對資源進行容錯,也就是如果獲取到的資源無法正常使用,還可以嘗試使用下一個資源進行重試。
LamportClock算法
蘭波特時間戳算法(LamportClock),使用邏輯時間戳作為值的版本以允許跨服務(wù)器對值進行排序,是解決分布式系統(tǒng)時間一致的重要算法。
服務(wù)器上的系統(tǒng)時間,使用物理的晶體振蕩測量,會有不準的情況。我們會經(jīng)常遇到服務(wù)器或者快或者慢的情況,一般使用NTP服務(wù),來和互聯(lián)網(wǎng)上的某個時間源進行同步。如果本地時間提前了,進行聯(lián)網(wǎng)校時后,會出現(xiàn)本地時間倒退的問題。而對于兩臺不同的服務(wù)器上,要進行時間統(tǒng)一,就更不能使用系統(tǒng)時間。
蘭波特時間戳算法,原理如下:
維護一個數(shù)字來表示時間戳,并且在每個集群節(jié)點都維護一個 Lamport 時鐘的實例。 如果事件在節(jié)點內(nèi)發(fā)生,時間戳加1 事件要發(fā)送到遠端,則在消息總帶上時間戳 接收到遠端的消息,時間戳 = Max(本地時間戳,消息中的時間戳) + 1(進行校正跳躍)
這個過程,可以看下面的圖示:

從圖中可以看到下面兩點:
對于每個節(jié)點的事件時間,都是遞增有序的,比如A是[4,5,7,10], B節(jié)點是[2,3,4,6,7], C節(jié)點是[1,5,6,8,9] 時間戳不是全局唯一,不同節(jié)點之間會存在序號重復(fù),比如4號消息在A和B節(jié)點都存在,5號消息在A和C節(jié)點存在
了解算法的場景和原理后,我們再來看算法的實現(xiàn)。
class?LamportClock:
????#:?The?clocks?current?value.
????value?=?0
????def?__init__(self,?initial_value=0,?Lock=Lock):
????????self.value?=?initial_value
????????self.mutex?=?Lock()
????def?adjust(self,?other):
????????with?self.mutex:
????????????value?=?self.value?=?max(self.value,?other)?+?1
????????????return?value
????def?forward(self):
????????with?self.mutex:
????????????self.value?+=?1
????????????return?self.value
算法的實現(xiàn)其實非常簡單,就是轉(zhuǎn)發(fā)的時候時間戳+1;收到消息后進行校正,這個過程中使用線程鎖,保證本地的有序。
LaxBoundedSemaphore有限信號量算法
前面講的幾種算法,都是基于線程鎖實現(xiàn)。使用鎖會降低效率,如果在協(xié)程中,可以使用無鎖的方案,會更高效。kombu的LaxBoundedSemaphore實現(xiàn),可以作為一種參考。
我們先看使用示例:
?>>>?from?future?import?print_statement?as?printf
#?^?ignore:?just?fooling?stupid?pyflakes
>>>?x?=?LaxBoundedSemaphore(2)
>>>?x.acquire(printf,?'HELLO?1')
HELLO?1
>>>?x.acquire(printf,?'HELLO?2')
HELLO?2
>>>?x.acquire(printf,?'HELLO?3')
>>>?x._waiters???#?private,?do?not?access?directly
[print,?('HELLO?3',)]
>>>?x.release()
HELLO?3
示例展示了幾步:
創(chuàng)建一個大小為2的LaxBoundedSemaphore信號量 申請信號,并且執(zhí)行print函數(shù),可以立即執(zhí)行 繼續(xù)申請信號執(zhí)行print函數(shù),也可以立即執(zhí)行 再申請信號執(zhí)行print函數(shù),這時候由于信號超標,函數(shù)不會立即執(zhí)行 手工釋放信號量,最后一次申請的print函數(shù)自動執(zhí)行
下面是具體的實現(xiàn),LaxBoundedSemaphore的構(gòu)造函數(shù):
class?LaxBoundedSemaphore:
????def?__init__(self,?value):
????????#?信號容量
????????self.initial_value?=?self.value?=?value
????????#?使用雙端隊列,F(xiàn)IFO
????????self._waiting?=?deque()
????????self._add_waiter?=?self._waiting.append
????????self._pop_waiter?=?self._waiting.popleft
申請執(zhí)行回調(diào)函數(shù),會進行信號判斷,信號充足會執(zhí)行行回調(diào)并消減一次信號量;信號量不足則將函數(shù)及參數(shù)放入代辦的隊列:
def?acquire(self,?callback,?*partial_args,?**partial_kwargs):
????"""Acquire?semaphore.
????This?will?immediately?apply?``callback``?if
????the?resource?is?available,?otherwise?the?callback?is?suspended
????until?the?semaphore?is?released.
????Arguments:
????????callback?(Callable):?The?callback?to?apply.
????????*partial_args?(Any):?partial?arguments?to?callback.
????"""
????value?=?self.value
????if?value?<=?0:
????????#?容量不夠的時候先暫存執(zhí)行函數(shù),并不更改可用數(shù)量
????????self._add_waiter((callback,?partial_args,?partial_kwargs))
????????return?False
????else:
????????#?可用數(shù)量-1
????????self.value?=?max(value?-?1,?0)
????????#?直接執(zhí)行函數(shù)
????????callback(*partial_args,?**partial_kwargs)
????????return?True
使用release時候會取出頭部的代辦函數(shù),并進行執(zhí)行,此時信號量不增不減。如果代辦全部執(zhí)行完成后,則逐步恢復(fù)信號量到默認值:
def?release(self):
????"""Release?semaphore.
????Note:
????????If?there?are?any?waiters?this?will?apply?the?first?waiter
????????that?is?waiting?for?the?resource?(FIFO?order).
????"""
????try:
????????waiter,?args,?kwargs?=?self._pop_waiter()
????except?IndexError:
????????#?無緩存則只增加可用數(shù)量
????????self.value?=?min(self.value?+?1,?self.initial_value)
????else:
????????#?有緩存則執(zhí)行第一個緩存,可用數(shù)量不變還是小于0
????????waiter(*args,?**kwargs)
小結(jié)
本篇文章,我們學習了5種實用的業(yè)務(wù)算法。LRU緩存淘汰算法,可以對緩存中最早的數(shù)據(jù)進行淘汰。令牌桶限流算法,可以協(xié)助進行服務(wù)流量限流,較好的保護后端服務(wù),避免突發(fā)流量的到時的崩潰。Round-Robin調(diào)度算法,可以進行負載的均衡,保障資源的平衡使用。LamportClock時間戳算法,可以在分布式系統(tǒng)中,進行不同服務(wù)之間的有序時間戳同步。LaxBoundedSemaphore有限信號量算法,是一種無鎖算法,可高效的提供資源使用控制。
小技巧
kombu中提供了一個自動重試算法,可以作為重試算法的模版:
#?kombu-5.0.0/kombu/utils/functional.py
def?retry_over_time(fun,?catch,?args=None,?kwargs=None,?errback=None,
????????????????????max_retries=None,?interval_start=2,?interval_step=2,
????????????????????interval_max=30,?callback=None,?timeout=None):
????kwargs?=?{}?if?not?kwargs?else?kwargs
????args?=?[]?if?not?args?else?args
????interval_range?=?fxrange(interval_start,
?????????????????????????????interval_max?+?interval_start,
?????????????????????????????interval_step,?repeatlast=True)
????#?超時時間
????end?=?time()?+?timeout?if?timeout?else?None
????for?retries?in?count():
????????try:
????????????return?fun(*args,?**kwargs)
????????except?catch?as?exc:
????????????#?超過次數(shù)
????????????if?max_retries?is?not?None?and?retries?>=?max_retries:
????????????????raise
????????????#?超過時間
????????????if?end?and?time()?>?end:
????????????????raise
????????????...
????????????#?休眠
????????????sleep(1.0)
從模版可以看到重試時候使用次數(shù)和超時時間兩個維度進行跳出(不可能無限重試):
使用count()進行無限循環(huán) 使用time()進行超時限定 使用max_retries容錯上限次數(shù)限定 每次錯誤后,都休眠一段時間,給被調(diào)用方機會,提高下一次成功的概率。
實際上關(guān)于休眠時間,也有一些更復(fù)雜的算法,比如線性遞增之類,這里使用了固定間隔的休眠
參考鏈接
緩存淘汰算法 https://en.wikipedia.org/wiki/Cache_replacement_policies#Least_recently_used_(LRU) 令牌桶算法 https://en.wikipedia.org/wiki/Token_Bucket what-is-token-bucket-and-leaky-bucket-algorithms https://hansliu.com/posts/2020/11/what-is-token-bucket-and-leaky-bucket-algorithms.html 波特時間戳算法 https://en.wikipedia.org/wiki/Lamport_timestamps lamport面包店算法簡介 https://segmentfault.com/a/1190000011066299 https://martinfowler.com/articles/patterns-of-distributed-systems/lamport-clock.html

還不過癮?試試它們
▲Python進階:websocket構(gòu)建實時日志跟蹤器
▲Python重大變化:是match-case,不是switch-case
▲Python 3.9 性能優(yōu)化:更快的 list()、dict() 和 range() 等內(nèi)置類型
