<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Python 神器 Celery 源碼解析(4)

          共 7968字,需瀏覽 16分鐘

           ·

          2021-11-11 23:14

          Celery是一款非常簡單、靈活、可靠的分布式系統(tǒng),可用于處理大量消息,并且提供了一整套操作此系統(tǒng)的工具。Celery 也是一款消息隊列工具,可用于處理實時數(shù)據(jù)以及任務(wù)調(diào)度。

          本文是是celery源碼解析的第篇,在前3篇里分別介紹了vine, py-amqp和kombu:

          1. 神器 celery 源碼解析- vine實現(xiàn)Promise功能
          2. 神器 celery 源碼解析- py-amqp實現(xiàn)AMQP協(xié)議
          3. 神器 celery 源碼解析- kombu,一個python實現(xiàn)的消息庫

          本篇我們繼續(xù)聊聊kombu這個python實現(xiàn)的消息庫中的一些常用算法實現(xiàn),和各種排序算法不一樣,都是解決一些具體的業(yè)務(wù)問題,非常有用。本文包括下面幾個部分:

          • LRU緩存淘汰算法
          • 令牌桶限流算法
          • Round-Robin調(diào)度算法
          • LamportClock時間戳算法
          • LaxBoundedSemaphore有限信號量算法

          LRU緩存淘汰算法

          緩存,顧名思義,就是將計算結(jié)果暫時存起來,以供后期使用,這樣可以省去重復(fù)計算的工作。比如我們計算斐波那契數(shù)列的遞歸算法:

          #?根據(jù)定義遞歸求解
          def?fib(n):
          ????if?n?<=?1:?
          ????????return?n
          ????return?fib(n?-?1)?+?fib(n?-?2)

          我們求n為5的數(shù),展開數(shù)學公式大概如下(這里簡化python函數(shù)fib名稱為數(shù)學函數(shù)f):

          f(5)=f(4)????????????????????+f(3)
          ????=f(3)??????????+f(2)?????+f(2)?????+f(1)
          ????=f(2)?????+f(1)+f(1)+f(0)+f(1)+f(0)+f(1)
          ????=f(1)+f(0)+f(1)+f(1)+f(0)+f(1)+f(0)+f(1)
          ????=5

          根據(jù)數(shù)學公式,我們可以知道,在執(zhí)行f(5)過程中,重復(fù)執(zhí)行了5次f(1), 3次f(0)。要提高執(zhí)行效率,就可以用到緩存。最簡單的實現(xiàn)版本:

          #?根據(jù)定義遞歸求解
          cache?=?{}
          def?fib_v1(n):
          ????if?n?in?cache:
          ????????return?cache[n]
          ????if?n?<=?1:?
          ????????result?=?n
          ????else:
          ????????result?=??fib(n?-?1)?+?fib(n?-?2)
          ????cache[n]?=?result
          ????return?result

          這種實現(xiàn)方式有2個弊端,一個是依賴一個外部的cache變量,另一個是cache功能和fib函數(shù)綁定,還需要修改fib函數(shù)。我們可以通過一個裝飾器實現(xiàn)這個cache,而不用改動fib函數(shù):

          def?cache_decorator(fun):
          ????_cache?=?{}

          ????def?wrapper(*args,?**kwargs):
          ????????if?args?in?_cache:
          ????????????return?_cache[args]
          ????????else:
          ????????????ret?=?fun(*args,?**kwargs)
          ????????????_cache[args]?=?ret
          ????????????return?ret

          ????return?wrapper

          使用的時候可以直接給fib函數(shù)添加上裝飾器:

          @cache_decorator
          def?fib(n):
          ????...

          這種緩存實現(xiàn)實現(xiàn)方式,還是會有問題:無法進行清理,內(nèi)存會持續(xù)增長。編程中有一句話是: 命名和緩存失效是計算機科學里面最難應(yīng)對的兩件事。關(guān)于緩存淘汰有各種算法,請見參考鏈接,我這里重點介紹一下LRU和LFU。

          • LRU(Least recently used)最早使用淘汰算法,核心特點是:?最早的數(shù)先淘汰
          • LFU(Least-frequently used)最少使用淘汰算法, 核心特點是:?最少的數(shù)先淘汰

          關(guān)于LRU,在我之前介紹tinydb時候有過介紹。其中的實現(xiàn)如下:

          class?LRUCache(abc.MutableMapping,?Generic[K,?V]):
          ????def?__init__(self,?capacity=None):
          ????????self.capacity?=?capacity??#?緩存容量
          ????????self.cache?=?OrderedDict()??#?有序字典
          ????
          ????def?get(self,?key:?K,?default:?D?=?None)?->?Optional[Union[V,?D]]:
          ????????value?=?self.cache.get(key)??#?從換成獲取
          ????????if?value?is?not?None:
          ????????????del?self.cache[key]
          ????????????self.cache[key]?=?value??#?更新緩存順序
          ????????????return?value
          ????????return?default

          ????def?set(self,?key:?K,?value:?V):
          ????????if?self.cache.get(key):
          ????????????del?self.cache[key]??
          ????????????self.cache[key]?=?value?#?更新緩存順序及值
          ????????else:
          ????????????self.cache[key]?=?value
          ????????????if?self.capacity?is?not?None?and?self.length?>?self.capacity:
          ????????????????self.cache.popitem(last=False)??#?淘汰最古老的數(shù)據(jù)

          LRU的特點只要保持緩存數(shù)據(jù)是有序的, 我們甚至不需要自己實現(xiàn),使用系統(tǒng)functools中的實現(xiàn):

          from?functools?import?lru_cache

          @lru_cache()
          def?fib(n):
          ????...

          kombu中給我們提供了一個線程安全的版本, 主要實現(xiàn):

          #?kombu-5.0.0/kombu/utils/functional.py

          class?LRUCache(UserDict):
          ????"""LRU?Cache?implementation?using?a?doubly?linked?list?to?track?access.
          ????"
          ""
          ????def?__init__(self,?limit=None):
          ????????self.limit?=?limit
          ????????self.mutex?=?threading.RLock()
          ????????self.data?=?OrderedDict()
          ????
          ????def?__getitem__(self,?key):
          ????????with?self.mutex:
          ????????????value?=?self[key]?=?self.data.pop(key)
          ????????????return?value
          ????
          ????def?__setitem__(self,?key,?value):
          ????????#?remove?least?recently?used?key.
          ????????with?self.mutex:
          ????????????if?self.limit?and?len(self.data)?>=?self.limit:
          ????????????????self.data.pop(next(iter(self.data)))
          ????????????self.data[key]?=?value
          ????...

          上面代碼在設(shè)置和獲取數(shù)據(jù)時候都先獲取鎖,然后再進行數(shù)據(jù)操作。

          關(guān)于緩存使用,除了通過業(yè)務(wù)場景判斷適用那種淘汰算法外,還可以使用具體的緩存命中率指標進行分析:

          def?memoize(maxsize=None,?keyfun=None,?Cache=LRUCache):
          ????"""Decorator?to?cache?function?return?value."""
          ????def?_memoize(fun):
          ????????mutex?=?threading.Lock()
          ????????cache?=?Cache(limit=maxsize)

          ????????@wraps(fun)
          ????????def?_M(*args,?**kwargs):
          ????????????if?keyfun:
          ????????????????key?=?keyfun(args,?kwargs)
          ????????????else:
          ????????????????key?=?args?+?(KEYWORD_MARK,)?+?tuple(sorted(kwargs.items()))
          ????????????try:
          ????????????????with?mutex:
          ????????????????????value?=?cache[key]
          ????????????except?KeyError:
          ????????????????value?=?fun(*args,?**kwargs)
          ????????????????#?未命中需要執(zhí)行函數(shù)
          ????????????????_M.misses?+=?1
          ????????????????with?mutex:
          ????????????????????cache[key]?=?value
          ????????????else:
          ????????????????#?命中率增加
          ????????????????_M.hits?+=?1
          ????????????return?value

          ????????def?clear():
          ????????????"""Clear?the?cache?and?reset?cache?statistics."""
          ????????????#?清理緩存及統(tǒng)計
          ????????????cache.clear()
          ????????????_M.hits?=?_M.misses?=?0
          ????????#?統(tǒng)計信息
          ????????_M.hits?=?_M.misses?=?0
          ????????_M.clear?=?clear
          ????????_M.original_func?=?fun
          ????????return?_M

          ????return?_memoize

          memoize的實現(xiàn)并不復(fù)雜,增加了hits/misses數(shù)據(jù),可以統(tǒng)計分析緩存的命中率,幫助正確使用LRU緩存。還添加了clear接口,可以在需要的時候?qū)彺嬷苯舆M行清理。

          注意memoize使用了一個鎖,在LRUCache還是使用了一個鎖,這個鎖的使用,我們以后再講。

          令牌桶限流算法

          限流是指在系統(tǒng)面臨高并發(fā)、大流量請求的情況下,限制新的流量對系統(tǒng)的訪問,從而保證系統(tǒng)服務(wù)的安全性。常用的限流算法有計數(shù)器、漏斗算法和令牌桶算法。其中計數(shù)器算法又分固定窗口算法、滑動窗口算法,后者我們在TCP協(xié)議中經(jīng)常會碰到。

          算法中存在一個令牌桶,以恒定的速率向令牌桶中放入令牌。當請求來時,會首先到令牌桶中去拿令牌,如果拿到了令牌,則該請求會被處理,并消耗掉令牌;如果拿不到令牌,則該請求會被丟棄。當然令牌桶也有一定的容量,如果滿了令牌就無法放進去了,這樣算法就有限流作用。又因為令牌產(chǎn)生的速率是很定的,如果消費速率較低,桶里會額外緩存一部分令牌,用于應(yīng)對流量突發(fā)時候的消耗。下面是算法的示意圖:

          Token bucket Diagram

          我們具體看看kombu中提供的實現(xiàn)。TokenBucket類:

          class?TokenBucket:
          ????#:?The?rate?in?tokens/second?that?the?bucket?will?be?refilled.
          ????fill_rate?=?None

          ????#:?Maximum?number?of?tokens?in?the?bucket.
          ????capacity?=?1

          ????#:?Timestamp?of?the?last?time?a?token?was?taken?out?of?the?bucket.
          ????timestamp?=?None

          ????def?__init__(self,?fill_rate,?capacity=1):
          ????????#?容量上限
          ????????self.capacity?=?float(capacity)
          ????????#?剩余令牌數(shù),初始等于容量上限
          ????????self._tokens?=?capacity
          ????????#?填充率
          ????????self.fill_rate?=?float(fill_rate)
          ????????self.timestamp?=?monotonic()
          ????????#?數(shù)據(jù)容器
          ????????self.contents?=?deque()

          ????def?add(self,?item):
          ????????self.contents.append(item)

          ????def?pop(self):
          ????????#?先進先出
          ????????return?self.contents.popleft()

          代碼包括:

          • 令牌速率fill_rate
          • 桶的容量上限
          • 一個時間戳
          • 剩余令牌數(shù)
          • 算法提供了一個基于雙端隊列的數(shù)據(jù)容器,可以對容器進行先進先出操作

          令牌桶是否可用的判斷:

          def?can_consume(self,?tokens=1):
          ????"""Check?if?one?or?more?tokens?can?be?consumed.

          ????Returns:
          ????????bool:?true?if?the?number?of?tokens?can?be?consumed
          ????????????from?the?bucket.??If?they?can?be?consumed,?a?call?will?also
          ????????????consume?the?requested?number?of?tokens?from?the?bucket.
          ????????????Calls?will?only?consume?`tokens`?(the?number?requested)
          ????????????or?zero?tokens?--?it?will?never?consume?a?partial?number
          ????????????of?tokens.
          ????"
          ""
          ????if?tokens?<=?self._get_tokens():
          ????????#?消費n個令牌
          ????????self._tokens?-=?tokens
          ????????return?True
          ????return?False

          def?_get_tokens(self):
          ????if?self._tokens?????????#?記錄當前時間
          ????????now?=?monotonic()
          ????????#?計算已經(jīng)流失的令牌數(shù)量
          ????????delta?=?self.fill_rate?*?(now?-?self.timestamp)
          ????????#?更新容量上限或者剩余令牌和流失數(shù)量之和
          ????????self._tokens?=?min(self.capacity,?self._tokens?+?delta)
          ????????self.timestamp?=?now
          ????return?self._tokens

          我們可以看到,算法在進行令牌消費判斷的同時,還會對桶的剩余流量進行自校正,很巧妙。

          TokenBucket的使用在ConsumerMixin的run方法中。創(chuàng)建了一個速率為1的令牌桶,然后持續(xù)的進行消費。如果有令牌則消費消費者上的消息;如果沒有令牌則進行休眠

          #ch23-celery/kombu-5.0.0/kombu/mixins.py:240

          class?ConsumerMixin:

          ????def?run(self,?_tokens=1,?**kwargs):
          ????????restart_limit?=?TokenBucket(1)
          ????????...
          ????????#?無限循環(huán)
          ????????while?not?self.should_stop:
          ????????????try:
          ????????????????#?有令牌消費
          ????????????????if?restart_limit.can_consume(_tokens):??#?pragma:?no?cover
          ????????????????????for?_?in?self.consume(limit=None,?**kwargs):
          ????????????????????????pass
          ????????????????else:
          ????????????????????#?沒浪費休眠
          ????????????????????sleep(restart_limit.expected_time(_tokens))
          ????????????except?errors:
          ????????????????...

          其中的休眠時間,是由令牌桶根據(jù)期望值計算得來:

          def?expected_time(self,?tokens=1):
          ????"""Return?estimated?time?of?token?availability.

          ????Returns:
          ????????float:?the?time?in?seconds.
          ????"
          ""
          ????_tokens?=?self._get_tokens()
          ????tokens?=?max(tokens,?_tokens)
          ????return?(tokens?-?_tokens)?/?self.fill_rate

          Round-Robin調(diào)度算法

          Round-Robin調(diào)度算法,最常見的大概是在nginx。Round-Robin方式可讓nginx將請求按順序輪流地分配到后端服務(wù)器上,它均衡地對待后端的每一臺服務(wù)器,而不關(guān)心服務(wù)器實際的連接數(shù)和當前的系統(tǒng)負載,循環(huán)往復(fù)。在kombu中也提供了幾種類似的調(diào)度算法:

          • 輪詢調(diào)度
          • 公平調(diào)度

          我們先看Round-Robin方式:

          class?round_robin_cycle:
          ????"""Iterator?that?cycles?between?items?in?round-robin."""
          ????"""輪詢調(diào)度算法"""

          ????def?__init__(self,?it=None):
          ????????self.items?=?it?if?it?is?not?None?else?[]

          ????def?update(self,?it):
          ????????"""Update?items?from?iterable."""
          ????????"""更新列表"""
          ????????self.items[:]?=?it

          ????def?consume(self,?n):
          ????????"""Consume?n?items."""
          ????????"""消費n個元素"""
          ????????return?self.items[:n]

          ????def?rotate(self,?last_used):
          ????????"""Move?most?recently?used?item?to?end?of?list."""
          ????????"""旋轉(zhuǎn):把最后一個元素放到列表某尾"""
          ????????items?=?self.items
          ????????try:
          ????????????items.append(items.pop(items.index(last_used)))
          ????????except?ValueError:
          ????????????pass
          ????????return?last_used

          算法實現(xiàn)很簡單,就是一個有序隊列,可以每次消費前n個有序元素,并且可以將最近使用的元素旋轉(zhuǎn)到隊尾。下面是旋轉(zhuǎn)的單元測試:

          def?test_round_robin_cycle():
          ????it?=?cycle_by_name('round_robin')(['A',?'B',?'C'])
          ????assert?it.consume(3)?==?['A',?'B',?'C']
          ????it.rotate('B')
          ????assert?it.consume(3)?==?['A',?'C',?'B']
          ????it.rotate('A')
          ????assert?it.consume(3)?==?['C',?'B',?'A']
          ????it.rotate('A')
          ????assert?it.consume(3)?==?['C',?'B',?'A']
          ????it.rotate('C')
          ????assert?it.consume(3)?==?['B',?'A',?'C']

          還有一種公平循環(huán)的調(diào)度算法:

          class?FairCycle:
          ????"""Cycle?between?resources.

          ????Consume?from?a?set?of?resources,?where?each?resource?gets
          ????an?equal?chance?to?be?consumed?from.

          ????Arguments:
          ????????fun?(Callable):?Callback?to?call.
          ????????resources?(Sequence[Any]):?List?of?resources.
          ????????predicate?(type):?Exception?predicate.
          ????"
          ""
          ????
          ????def?__init__(self,?fun,?resources,?predicate=Exception):
          ????????self.fun?=?fun
          ????????self.resources?=?resources
          ????????self.predicate?=?predicate
          ????????#??初始位置
          ????????self.pos?=?0

          FairCycle是一種資源之間公平循環(huán)的調(diào)度算法, 構(gòu)造函數(shù)中:

          • 利用資源的函數(shù)
          • 多個資源的集合

          使用的方式是使用get方法傳入回調(diào):

          def?_next(self):
          ????while?1:
          ????????try:
          ????????????resource?=?self.resources[self.pos]
          ????????????#?位置加1
          ????????????self.pos?+=?1
          ????????????return?resource
          ????????except?IndexError:
          ????????????#?到尾部后,重置位置
          ????????????self.pos?=?0
          ????????????if?not?self.resources:
          ????????????????raise?self.predicate()

          def?get(self,?callback,?**kwargs):
          ????"""Get?from?next?resource."""
          ????#?無限重試
          ????for?tried?in?count(0):??#?for?infinity
          ????????#?獲取資源
          ????????resource?=?self._next()
          ????????try:
          ????????????#?利用資源
          ????????????return?self.fun(resource,?callback,?**kwargs)
          ????????except?self.predicate:
          ????????????#?reraise?when?retries?exchausted.
          ????????????#?容錯上限
          ????????????if?tried?>=?len(self.resources)?-?1:
          ????????????????raise

          調(diào)度主要體現(xiàn)再獲取資源的next函數(shù)上,沒次獲取資源后位置標志進行后移,到尾部后在重置到0,繼續(xù)下一輪循環(huán)。算法還可以對資源進行容錯,也就是如果獲取到的資源無法正常使用,還可以嘗試使用下一個資源進行重試。

          LamportClock算法

          蘭波特時間戳算法(LamportClock),使用邏輯時間戳作為值的版本以允許跨服務(wù)器對值進行排序,是解決分布式系統(tǒng)時間一致的重要算法。

          服務(wù)器上的系統(tǒng)時間,使用物理的晶體振蕩測量,會有不準的情況。我們會經(jīng)常遇到服務(wù)器或者快或者慢的情況,一般使用NTP服務(wù),來和互聯(lián)網(wǎng)上的某個時間源進行同步。如果本地時間提前了,進行聯(lián)網(wǎng)校時后,會出現(xiàn)本地時間倒退的問題。而對于兩臺不同的服務(wù)器上,要進行時間統(tǒng)一,就更不能使用系統(tǒng)時間。

          蘭波特時間戳算法,原理如下:

          • 維護一個數(shù)字來表示時間戳,并且在每個集群節(jié)點都維護一個 Lamport 時鐘的實例。
          • 如果事件在節(jié)點內(nèi)發(fā)生,時間戳加1
          • 事件要發(fā)送到遠端,則在消息總帶上時間戳
          • 接收到遠端的消息,時間戳 = Max(本地時間戳,消息中的時間戳) + 1(進行校正跳躍)

          這個過程,可以看下面的圖示:

          從圖中可以看到下面兩點:

          • 對于每個節(jié)點的事件時間,都是遞增有序的,比如A是[4,5,7,10], B節(jié)點是[2,3,4,6,7], C節(jié)點是[1,5,6,8,9]
          • 時間戳不是全局唯一,不同節(jié)點之間會存在序號重復(fù),比如4號消息在A和B節(jié)點都存在,5號消息在A和C節(jié)點存在

          了解算法的場景和原理后,我們再來看算法的實現(xiàn)。

          class?LamportClock:
          ????#:?The?clocks?current?value.
          ????value?=?0

          ????def?__init__(self,?initial_value=0,?Lock=Lock):
          ????????self.value?=?initial_value
          ????????self.mutex?=?Lock()

          ????def?adjust(self,?other):
          ????????with?self.mutex:
          ????????????value?=?self.value?=?max(self.value,?other)?+?1
          ????????????return?value

          ????def?forward(self):
          ????????with?self.mutex:
          ????????????self.value?+=?1
          ????????????return?self.value

          算法的實現(xiàn)其實非常簡單,就是轉(zhuǎn)發(fā)的時候時間戳+1;收到消息后進行校正,這個過程中使用線程鎖,保證本地的有序。

          LaxBoundedSemaphore有限信號量算法

          前面講的幾種算法,都是基于線程鎖實現(xiàn)。使用鎖會降低效率,如果在協(xié)程中,可以使用無鎖的方案,會更高效。kombu的LaxBoundedSemaphore實現(xiàn),可以作為一種參考。

          我們先看使用示例:

          ?>>>?from?future?import?print_statement?as?printf
          #?^?ignore:?just?fooling?stupid?pyflakes

          >>>?x?=?LaxBoundedSemaphore(2)

          >>>?x.acquire(printf,?'HELLO?1')
          HELLO?1

          >>>?x.acquire(printf,?'HELLO?2')
          HELLO?2

          >>>?x.acquire(printf,?'HELLO?3')
          >>>?x._waiters???#?private,?do?not?access?directly
          [print,?('HELLO?3',)]

          >>>?x.release()
          HELLO?3

          示例展示了幾步:

          1. 創(chuàng)建一個大小為2的LaxBoundedSemaphore信號量
          2. 申請信號,并且執(zhí)行print函數(shù),可以立即執(zhí)行
          3. 繼續(xù)申請信號執(zhí)行print函數(shù),也可以立即執(zhí)行
          4. 再申請信號執(zhí)行print函數(shù),這時候由于信號超標,函數(shù)不會立即執(zhí)行
          5. 手工釋放信號量,最后一次申請的print函數(shù)自動執(zhí)行

          下面是具體的實現(xiàn),LaxBoundedSemaphore的構(gòu)造函數(shù):

          class?LaxBoundedSemaphore:
          ????def?__init__(self,?value):
          ????????#?信號容量
          ????????self.initial_value?=?self.value?=?value
          ????????#?使用雙端隊列,F(xiàn)IFO
          ????????self._waiting?=?deque()
          ????????self._add_waiter?=?self._waiting.append
          ????????self._pop_waiter?=?self._waiting.popleft

          申請執(zhí)行回調(diào)函數(shù),會進行信號判斷,信號充足會執(zhí)行行回調(diào)并消減一次信號量;信號量不足則將函數(shù)及參數(shù)放入代辦的隊列:

          def?acquire(self,?callback,?*partial_args,?**partial_kwargs):
          ????"""Acquire?semaphore.

          ????This?will?immediately?apply?``callback``?if
          ????the?resource?is?available,?otherwise?the?callback?is?suspended
          ????until?the?semaphore?is?released.

          ????Arguments:
          ????????callback?(Callable):?The?callback?to?apply.
          ????????*partial_args?(Any):?partial?arguments?to?callback.
          ????"
          ""
          ????value?=?self.value
          ????if?value?<=?0:
          ????????#?容量不夠的時候先暫存執(zhí)行函數(shù),并不更改可用數(shù)量
          ????????self._add_waiter((callback,?partial_args,?partial_kwargs))
          ????????return?False
          ????else:
          ????????#?可用數(shù)量-1
          ????????self.value?=?max(value?-?1,?0)
          ????????#?直接執(zhí)行函數(shù)
          ????????callback(*partial_args,?**partial_kwargs)
          ????????return?True

          使用release時候會取出頭部的代辦函數(shù),并進行執(zhí)行,此時信號量不增不減。如果代辦全部執(zhí)行完成后,則逐步恢復(fù)信號量到默認值:

          def?release(self):
          ????"""Release?semaphore.

          ????Note:
          ????????If?there?are?any?waiters?this?will?apply?the?first?waiter
          ????????that?is?waiting?for?the?resource?(FIFO?order).
          ????"
          ""
          ????try:
          ????????waiter,?args,?kwargs?=?self._pop_waiter()
          ????except?IndexError:
          ????????#?無緩存則只增加可用數(shù)量
          ????????self.value?=?min(self.value?+?1,?self.initial_value)
          ????else:
          ????????#?有緩存則執(zhí)行第一個緩存,可用數(shù)量不變還是小于0
          ????????waiter(*args,?**kwargs)

          小結(jié)

          本篇文章,我們學習了5種實用的業(yè)務(wù)算法。LRU緩存淘汰算法,可以對緩存中最早的數(shù)據(jù)進行淘汰。令牌桶限流算法,可以協(xié)助進行服務(wù)流量限流,較好的保護后端服務(wù),避免突發(fā)流量的到時的崩潰。Round-Robin調(diào)度算法,可以進行負載的均衡,保障資源的平衡使用。LamportClock時間戳算法,可以在分布式系統(tǒng)中,進行不同服務(wù)之間的有序時間戳同步。LaxBoundedSemaphore有限信號量算法,是一種無鎖算法,可高效的提供資源使用控制。

          小技巧

          kombu中提供了一個自動重試算法,可以作為重試算法的模版:

          #?kombu-5.0.0/kombu/utils/functional.py

          def?retry_over_time(fun,?catch,?args=None,?kwargs=None,?errback=None,
          ????????????????????max_retries=None,?interval_start=2,?interval_step=2,
          ????????????????????interval_max=30,?callback=None,?timeout=None):
          ????kwargs?=?{}?if?not?kwargs?else?kwargs
          ????args?=?[]?if?not?args?else?args
          ????interval_range?=?fxrange(interval_start,
          ?????????????????????????????interval_max?+?interval_start,
          ?????????????????????????????interval_step,?repeatlast=True)
          ????#?超時時間
          ????end?=?time()?+?timeout?if?timeout?else?None
          ????for?retries?in?count():
          ????????try:
          ????????????return?fun(*args,?**kwargs)
          ????????except?catch?as?exc:
          ????????????#?超過次數(shù)
          ????????????if?max_retries?is?not?None?and?retries?>=?max_retries:
          ????????????????raise
          ????????????#?超過時間
          ????????????if?end?and?time()?>?end:
          ????????????????raise
          ????????????...
          ????????????#?休眠
          ????????????sleep(1.0)

          從模版可以看到重試時候使用次數(shù)和超時時間兩個維度進行跳出(不可能無限重試):

          • 使用count()進行無限循環(huán)
          • 使用time()進行超時限定
          • 使用max_retries容錯上限次數(shù)限定
          • 每次錯誤后,都休眠一段時間,給被調(diào)用方機會,提高下一次成功的概率。

          實際上關(guān)于休眠時間,也有一些更復(fù)雜的算法,比如線性遞增之類,這里使用了固定間隔的休眠

          參考鏈接

          • 緩存淘汰算法 https://en.wikipedia.org/wiki/Cache_replacement_policies#Least_recently_used_(LRU)
          • 令牌桶算法 https://en.wikipedia.org/wiki/Token_Bucket
          • what-is-token-bucket-and-leaky-bucket-algorithms https://hansliu.com/posts/2020/11/what-is-token-bucket-and-leaky-bucket-algorithms.html
          • 波特時間戳算法 https://en.wikipedia.org/wiki/Lamport_timestamps
          • lamport面包店算法簡介 https://segmentfault.com/a/1190000011066299
          • https://martinfowler.com/articles/patterns-of-distributed-systems/lamport-clock.html
          Python貓技術(shù)交流群開放啦!群里既有國內(nèi)一二線大廠在職員工,也有國內(nèi)外高校在讀學生,既有十多年碼齡的編程老鳥,也有中小學剛剛?cè)腴T的新人,學習氛圍良好!想入群的同學,請在公號內(nèi)回復(fù)『交流群』,獲取貓哥的微信(謝絕廣告黨,非誠勿擾!)~


          還不過癮?試試它們




          與 Python 之父聊天:更快的 Python!

          如何用 Python 實現(xiàn)分布式計算?

          Python進階:websocket構(gòu)建實時日志跟蹤器

          Python重大變化:是match-case,不是switch-case

          Python 3.9 性能優(yōu)化:更快的 list()、dict() 和 range() 等內(nèi)置類型

          Python到底是強類型語言,還是弱類型語言?


          如果你覺得本文有幫助
          請慷慨分享點贊,感謝啦
          瀏覽 96
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  操逼姐妹双飞 | 无码在线观看播放 | 操逼大鸡吧| 亚洲免费电影黄 | 《精品 模特私拍秘 泄密》学院派 |