<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Python 神器 Celery 源碼閱讀(1)

          共 23098字,需瀏覽 47分鐘

           ·

          2021-09-28 11:27

          花下貓語(yǔ):今天分享的是@肖恩同學(xué)的解讀源碼系列,主角是大名鼎鼎的Celery。肖同學(xué)的系列文章在公眾號(hào)平臺(tái)里很罕見(jiàn),他還曾分享過(guò)Flask、Werkzeug、SQLAlchemy、Requests、Gunicorn等知名庫(kù)的源碼解讀。如果你想在技術(shù)上更進(jìn)一步的話,這些內(nèi)容是不錯(cuò)的閱讀材料。因此,我誠(chéng)心推薦你關(guān)注【游戲不存在】!

          Celery是一款非常簡(jiǎn)單、靈活、可靠的分布式系統(tǒng),可用于處理大量消息,并且提供了一整套操作此系統(tǒng)的工具。Celery 是一款消息隊(duì)列工具,可用于處理實(shí)時(shí)數(shù)據(jù)以及任務(wù)調(diào)度。

          Celery在github上有18k的star和4.1k的fork,非常受歡迎;截止當(dāng)前總共發(fā)布了216個(gè)版本,最近一個(gè)版本是19天前,更新非?;钴S。如果你需要一個(gè)python實(shí)現(xiàn)的任務(wù)調(diào)度框架,首推就是它了。

          從這周開(kāi)始,我們一起閱讀celery的源碼,學(xué)習(xí)如何使用celery,了解分布式任務(wù)調(diào)度框架是如何構(gòu)建,深入celery的實(shí)現(xiàn)細(xì)節(jié)。celery代碼量比較大,預(yù)計(jì)需要3~4周的時(shí)間吧。話不多說(shuō),一起開(kāi)始,本周內(nèi)容主要包括下面幾個(gè)部分:

          • 任務(wù)應(yīng)用場(chǎng)景
          • celery的項(xiàng)目結(jié)構(gòu)
          • promise庫(kù)的實(shí)現(xiàn)
          • 小結(jié)
          • 小技巧

          另外非常重要的一點(diǎn)是,考慮到之前的文章,在源碼和解讀上做的不夠好,這次做一個(gè)小小的改進(jìn)。就是我會(huì)把項(xiàng)目源碼增加我自己的注釋?zhuān)蟼鞯絞ithub上。這樣想了解細(xì)節(jié)實(shí)現(xiàn)的小伙伴請(qǐng)使用【閱讀原文】。我的源碼閱讀項(xiàng)目 yuanmahui,也歡迎大家在github上點(diǎn)贊支持?。

          celery應(yīng)用場(chǎng)景

          我們有這樣一個(gè)flask實(shí)現(xiàn)的web服務(wù),其中hello接口,需要進(jìn)行外部調(diào)用(比如發(fā)送發(fā)送短信驗(yàn)證碼之類(lèi))。這里我們使用 time.sleep 模擬這個(gè)耗時(shí)的操作:

          import time

          from flask import Flask

          app = Flask(__name__)


          @app.route('/')
          def hello():
              time.sleep(1)  # 模擬一下耗時(shí)操作
              return 'Hello, World!'

          可以使用ab來(lái)驗(yàn)證這個(gè)接口的耗時(shí)情況:

          # ab -n 10 -c 5 http://127.0.0.1:5000/
          ...
          Requests per second:    3.30 [#/sec] (mean)
          Time per request:       1513.100 [ms] (mean)
          Time per request:       302.620 [ms] (mean, across all concurrent requests)
          Transfer rate:          0.54 [Kbytes/sec] received

          Connection Times (ms)
                        min  mean[+/-sd] median   max
          Connect:        0    0   0.1      0       0
          Processing:  1007 1008   1.2   1008    1010
          Waiting:     1005 1007   1.0   1007    1008
          Total:       1007 1008   1.2   1009    1010
          ...

          測(cè)試顯示這個(gè)接口的平均耗時(shí)需要1.5秒左右,響應(yīng)緩慢。同時(shí),這樣的接口還會(huì)導(dǎo)致前端頁(yè)面的卡頓。要解決這個(gè)問(wèn)題,就可以使用任務(wù)調(diào)度的方式,把這個(gè)耗時(shí)操作轉(zhuǎn)換成背景任務(wù),同時(shí)及時(shí)返回http響應(yīng), 調(diào)整方法如下:

          ....

          @app.route('/')
          def hello():
              # time.sleep(1)
              do_task(id(request))  # 啟動(dòng)背景任務(wù)
              return 'Hello, World!'


          def do_task(index):
              t = threading.Thread(target=lambda idx: time.sleep(1), args=(index,))
              t.start()

          do_task中新開(kāi)了一個(gè)任務(wù)線程去執(zhí)行這個(gè) time.sleep 操作,當(dāng)前的線程在啟動(dòng)任務(wù)線程后立即返回。再次使用ab對(duì)接口耗時(shí)進(jìn)行測(cè)試:

          ...
          Time per request:       6.304 [ms] (mean)
          ...

          可以發(fā)現(xiàn)使用任務(wù)方式后,hello接口的響應(yīng)效率有了巨大的提升。當(dāng)然這個(gè)簡(jiǎn)單的任務(wù)調(diào)度還有2個(gè)問(wèn)題:

          1. 任務(wù)執(zhí)行的結(jié)果沒(méi)法返回給前端
          2. 任務(wù)和web服務(wù)在一個(gè)進(jìn)程里執(zhí)行,效率不會(huì)太高 Celery的分布式任務(wù)調(diào)度就可以比較好的解決這2個(gè)問(wèn)題。

          celery的項(xiàng)目結(jié)構(gòu)

          celery我們選用 5.0.5 版本,首先requirements/default.txt文件描述主要依賴(lài)下面幾個(gè)庫(kù):

          • billiard celery項(xiàng)目提供的一個(gè)多進(jìn)程池的實(shí)現(xiàn)
          • kombu celery項(xiàng)目提供的一個(gè)消息庫(kù),可以對(duì)接不同的消息隊(duì)列,比如RabbitMQ,Redis
          • vine celery項(xiàng)目提供的一個(gè)promise實(shí)現(xiàn),可以處理任務(wù)的組合和pipline等

          celery支持下面3種工作模式:

          • beat 使用定時(shí)心跳的方式啟動(dòng)
          • multi 使用集群方式啟動(dòng),會(huì)形成多個(gè)工作進(jìn)程
          • worker 普通的工作進(jìn)程方式啟動(dòng)

          celery任務(wù)執(zhí)行的結(jié)果也支持多種存儲(chǔ)方式:

          • Mongo
          • Redis
          • Elasticsearch
          • ...

          celery的并發(fā)也支持多種實(shí)現(xiàn)

          • 多進(jìn)程的fork
          • gevent
          • 多線程
          • eventlet
          • ...

          celery支持工作流

          • 可以根據(jù)函數(shù)簽名進(jìn)行調(diào)度
          • 可以支持鏈?zhǔn)饺蝿?wù)
          • 可以支持分組,和弦...

          celery的項(xiàng)目結(jié)構(gòu)就簡(jiǎn)單介紹這些,后續(xù)章節(jié)再進(jìn)行詳細(xì)介紹。

          promise庫(kù)的實(shí)現(xiàn)

          promise在異步任務(wù)中非常重要,所以celery有個(gè)vine項(xiàng)目實(shí)現(xiàn)了promise功能,在開(kāi)始celery之前,我們先掃清這些外圍障礙。

          promise 簡(jiǎn)介

          Promise是一個(gè)對(duì)象,它代表了一個(gè)異步操作的最終完成或者失敗。我覺(jué)得MDN中的介紹非常好,我們先了解它,再對(duì)比看看Python中如何實(shí)現(xiàn)它。比如有一個(gè)創(chuàng)建音頻文件的操作,成功和失敗的時(shí)候使用不同的輸出:

          // 成功的回調(diào)函數(shù)
          function successCallback(result) {
            console.log("音頻文件創(chuàng)建成功: " + result);
          }

          // 失敗的回調(diào)函數(shù)
          function failureCallback(error) {
            console.log("音頻文件創(chuàng)建失敗: " + error);
          }

          createAudioFileAsync(audioSettings, successCallback, failureCallback)

          傳統(tǒng)的方式就是使用callback方式調(diào)用,把正確和錯(cuò)誤的回調(diào)傳入執(zhí)行函數(shù)createAudioFileAsync中。如果使用Promise方式就會(huì)變成:

          # 創(chuàng)建一個(gè)Promise對(duì)象
          const promise = createAudioFileAsync(audioSettings);
          # 執(zhí)行這個(gè)Promise對(duì)象
          promise.then(successCallback, failureCallback);

          雖然上面代碼使用的是JavaScript,我相信熟悉python的你也可以正確理解。就上面的例子,還不容易看出Promise的優(yōu)點(diǎn)。繼續(xù)看下面的例子:

          doSomething(function(result) {
            doSomethingElse(result, function(newResult) {
              doThirdThing(newResult, function(finalResult) {
                console.log('Got the final result: ' + finalResult);
              }, failureCallback);
            }, failureCallback);
          }, failureCallback);

          這里是一個(gè)多重回調(diào)的實(shí)現(xiàn),先處理doSomething,收到結(jié)果后再處理doSomethingElse,最后再執(zhí)行doThirdThing。僅僅3層回調(diào)還可以接收,如果回調(diào)多了以后就形成回調(diào)地獄,代碼丑陋且難用。如果使用Promise實(shí)現(xiàn),就會(huì)變成:

          doSomething().then(function(result) {
            return doSomethingElse(result);
          })
          .then(function(newResult) {
            return doThirdThing(newResult);
          })
          .then(function(finalResult) {
            console.log('Got the final result: ' + finalResult);
          })
          .catch(failureCallback);

          可以發(fā)現(xiàn)使用Promise方式后,代碼會(huì)變扁平,非常清爽。這里展示了Promise的2個(gè)特點(diǎn):

          • then函數(shù)執(zhí)行后返回的是一個(gè)Promise對(duì)象
          • Promise可以進(jìn)行鏈?zhǔn)?chain)調(diào)用

          簡(jiǎn)單理解Promise后,我們?cè)倩仡^看doSomething,doSomethingElse和doThirdThing,如果這是3個(gè)任務(wù),需要按順序調(diào)度執(zhí)行,后者需要前者的執(zhí)行結(jié)果作為參數(shù)?所以可以知道,從邏輯上講,Promise功能對(duì)于任務(wù)調(diào)度,非常重要。這種實(shí)現(xiàn)方式,和語(yǔ)言其實(shí)無(wú)關(guān)。

          Promise的實(shí)現(xiàn)

          先單元測(cè)試用例看從promise的使用:

          def test_signal(self):
              # mock一個(gè)函數(shù)
              callback = Mock(name='callback')
              # 創(chuàng)建一個(gè)Promise對(duì)象
              a = promise()
              # 使用then函數(shù)添加callback
              a.then(callback)
              # 執(zhí)行Promise對(duì)象
              a(42)
              # 函數(shù)被調(diào)用,參數(shù)為42
              callback.assert_called_once_with(42)

          不難看出python版本的Promise使用和JavaScript版本沒(méi)有太大區(qū)別。promise的構(gòu)造函數(shù)如下:

          class promise:
              ...
              def __init__(self, fun=None, args=None, kwargs=None,
                       callback=None, on_error=None, weak=False,
                       ignore_result=False):
              self.weak = weak
              self.ignore_result = ignore_result
              # 要執(zhí)行的函數(shù)
              self.fun = self._get_fun_or_weakref(fun=fun, weak=weak)
              # 注意位置參數(shù)是元祖,這樣才可以在call里疊加
              self.args = args or ()
              # 關(guān)鍵字參數(shù)是字典
              self.kwargs = kwargs or {}
              # ready,failed,cancelled 三個(gè)狀態(tài),默認(rèn)都是false
              self.ready = False
              self.failed = False
              self.value = None
              self.reason = None
              # Optimization
              # Most promises will only have one callback, so we optimize for this
              # case by using a list only when there are multiple callbacks.
              #   s(calar) pending / l(ist) pending
              # 單個(gè)callback/多個(gè)callback
              self._svpending = None
              self._lvpending = None
              self.on_error = on_error
              self.cancelled = False
              # 可見(jiàn)callback可以通過(guò)參數(shù)傳遞,也可以通過(guò)then函數(shù)傳遞
              if callback is not None:
                  self.then(callback)

          每個(gè)Promise有3個(gè)狀態(tài)位:ready,cancelled和failed,默認(rèn)都是false。

          重點(diǎn)就是then函數(shù):

          def then(self, callback, on_error=None):
              # callback是普通函數(shù),就用promise再嵌套一下
              if not isinstance(callback, Thenable):
                  callback = promise(callback, on_error=on_error)
              if self.cancelled:
                  callback.cancel()
                  return callback
              if self.failed:
                  callback.throw(self.reason)
              elif self.ready:
                  args, kwargs = self.value
                  callback(*args, **kwargs)
              if self._lvpending is None:
                  svpending = self._svpending
                  if svpending is not None:
                      self._svpending, self._lvpending = None, deque([svpending])
                  else:
                      # 初始復(fù)制callback給_svpending
                      # 就是一種遞歸
                      self._svpending = callback
                      return callback
              # 添加到右側(cè)
              self._lvpending.append(callback)
              # 返回的是一個(gè)promise可以繼續(xù)then,實(shí)現(xiàn)a.then(fun_x).then(fun_y).then(fun_z)這樣的鏈?zhǔn)秸{(diào)用
              return callback

          在JavaScript版本已經(jīng)介紹過(guò)Promise的2個(gè)特性就是,then返回一個(gè)新的Promise對(duì)象,又由于返回的是Promise對(duì)象,又可以繼續(xù)執(zhí)行then函數(shù)添加新的callback鏈?zhǔn)秸{(diào)用。

          Promise對(duì)象的執(zhí)行就在魔法函數(shù) *__call__:

          def __call__(self, *args, **kwargs):
              retval = None
              if self.cancelled:
                  return
              # 疊加參數(shù)
              final_args = self.args + args if args else self.args
              final_kwargs = dict(self.kwargs, **kwargs) if kwargs else self.kwargs
              # self.fun may be a weakref
              fun = self._fun_is_alive(self.fun)
              if fun is not None:
                  try:
                      if self.ignore_result:
                          fun(*final_args, **final_kwargs)
                          ca = ()
                          ck = {}
                      else:
                          # 執(zhí)行函數(shù)
                          retval = fun(*final_args, **final_kwargs)
                          self.value = (ca, ck) = (retval,), {}
                  except Exception:
                      # 異常
                      return self.throw()
              else:
                  self.value = (ca, ck) = final_args, final_kwargs
              # 更改ready狀態(tài)
              self.ready = True
              svpending = self._svpending
              # 執(zhí)行callback,把fun執(zhí)行的結(jié)果往callback里傳入這樣形成pipeline
              if svpending is not None:
                  try:
                      svpending(*ca, **ck)
                  finally:
                      self._svpending = None
              else:
                  lvpending = self._lvpending
                  try:
                      while lvpending:
                          # 從左開(kāi)始執(zhí)行
                          p = lvpending.popleft()
                          p(*ca, **ck)
                  finally:
                      self._lvpending = None
              return retval

          上述代碼主要步驟有:

          • 合并Promise對(duì)象的參數(shù)和函數(shù)調(diào)用參數(shù)
          • 如果有初始函數(shù),則執(zhí)行初始函數(shù)
          • 更改Promise對(duì)象的ready狀態(tài)
          • 執(zhí)行callback

          鏈?zhǔn)秸{(diào)用的示例,請(qǐng)看:

          def test_chained(self):

              def add(x, y):
                  return x + y

              def pow2(x):
                  return x ** 2

              adder = Mock(name='adder')
              adder.side_effect = add

              power = Mock(name='multiplier')
              power.side_effect = pow2

              final = Mock(name='final')

              p = promise()
              # 鏈?zhǔn)秸{(diào)用(注意是有序的)
              p.then(adder).then(power).then(final)

              p(42, 42)
              assert p.value == ((42, 42), {})
              adder.assert_called_with(42, 42)
              power.assert_called_with(84)
              final.assert_called_with(7056)

          Barrier的實(shí)現(xiàn)

          vine還提供了一個(gè)叫做barrier的實(shí)現(xiàn),處理多個(gè)Promise對(duì)象的串行化,下面是單元測(cè)試:

          class test_barrier:

              def setup(self):
                  self.m1, self.m2, self.m3 = Mock(), Mock(), Mock()
                  self.ps = [promise(self.m1), promise(self.m2), promise(self.m3)]

              def test_evaluate(self):
                  # 需要執(zhí)行4才才變成ready
                  x = barrier(self.ps)
                  x()
                  assert not x.ready
                  x()
                  assert not x.ready
                  x.add(promise())
                  x()
                  assert not x.ready
                  x()
                  assert x.ready
                  x()
                  x()
                  # 已經(jīng)執(zhí)行完成繼續(xù)添加會(huì)報(bào)錯(cuò)
                  with pytest.raises(ValueError):
                      x.add(promise())

          使用barrier后,4個(gè)Promise對(duì)象的調(diào)用可以串行化,并且可以單步執(zhí)行。執(zhí)行一次x()消耗一個(gè)Promise。

          barrier的構(gòu)造函數(shù):

          class barrier:
              
              def __init__(self, promises=None, args=None, kwargs=None,
                           callback=None, size=None):
                  # Promise的實(shí)現(xiàn)
                  self.p = promise()
                  self.args = args or ()
                  self.kwargs = kwargs or {}
                  self._value = 0
                  self.size = size or 0
                  if not self.size and promises:
                      # iter(l) calls len(l) so generator wrappers
                      # can only return NotImplemented in the case the
                      # generator is not fully consumed yet.
                      plen = promises.__len__()
                      if plen is not NotImplemented:
                          self.size = plen
                  self.ready = self.failed = False
                  self.reason = None
                  self.cancelled = False
                  self.finalized = False
                  # 列表推導(dǎo)式
                  [self.add_noincr(p) for p in promises or []]
                  self.finalized = bool(promises or self.size)
                  if callback:
                      self.then(callback)

          barrier重點(diǎn)是默認(rèn)有一個(gè)promise實(shí)現(xiàn),用來(lái)作為整個(gè)批處理的尾部。參數(shù)中的promises列表通過(guò)add_noincr函數(shù)形成調(diào)用鏈:

          def add_noincr(self, p):
              if not self.cancelled:
                  # 已經(jīng)完成了就不能夠再添加了
                  if self.ready:
                      raise ValueError('Cannot add promise to full barrier')
                  # 其實(shí)就是then().then().then() 添加到自己之前,自己主要執(zhí)行最開(kāi)始定義的callback
                  p.then(self)

          每執(zhí)行一次進(jìn)行計(jì)數(shù),直到執(zhí)行完成后更改狀態(tài)和執(zhí)行自身(尾部)的Promise對(duì)象

          def __call__(self, *args, **kwargs):
              # 判斷是否已經(jīng)執(zhí)行完成:ready和cancelled
              if not self.ready and not self.cancelled:
                  self._value += 1
                  if self.finalized and self._value >= self.size:
                      self.ready = True
                      self.p(*self.args, **self.kwargs)

          小結(jié)

          我們了解的celery是python實(shí)現(xiàn)的一個(gè)任務(wù)調(diào)度系統(tǒng),在github上廣受歡迎,更新活躍。學(xué)習(xí)可以使用任務(wù)調(diào)度方式,幫助我們處理web服務(wù)中一些耗時(shí)任務(wù)。簡(jiǎn)單了解celery項(xiàng)目的一些特點(diǎn),從celery的依賴(lài)項(xiàng)目vine開(kāi)始,了解Promise在任務(wù)調(diào)度系統(tǒng)中的應(yīng)用。最后從vine項(xiàng)目源碼中學(xué)習(xí),如何創(chuàng)建一個(gè)Promise系統(tǒng)。

          小技巧

          對(duì)于抽象的實(shí)現(xiàn),在Python中除了可以使用繼承方式,還可以使用組合Mixin。比如下面:

          class Thenable(Callable, metaclass=abc.ABCMeta):  # pragma: no cover
              ...
              @abc.abstractmethod
              def then(self, on_success, on_error=None):
                  """成功和失敗的2個(gè)回調(diào)"""
                  raise NotImplementedError()
              
          class CanThen:

              def then(self, x, y):
                  pass
                  
          assert isinstance(CanThen(), Thenable)

          可以看到CanThen實(shí)現(xiàn)then函數(shù)后,就可以被認(rèn)定為T(mén)henable的實(shí)現(xiàn),但是CanThen并未繼承自Thenable。這個(gè)魔法主要是由ABCMeta的register和__subclasshook__兩個(gè)方法實(shí)現(xiàn):

          class Thenable(Callable, metaclass=abc.ABCMeta): 
              
              ...
              
              @classmethod
              def __subclasshook__(cls, C):
                  # 也由ABCMeta提供
                  if cls is Thenable:
                      if any('then' in B.__dict__ for B in C.__mro__):
                          return True
                  return NotImplemented

              @classmethod
              def register(cls, other):
                  # overide to return other so `register` can be used as a decorator
                  # 這個(gè)register方法是由ABCMeta提供,其實(shí)現(xiàn)類(lèi)使用裝飾器方式
                  # https://docs.python.org/zh-cn/3/library/abc.html
                  type(cls).register(cls, other)
                  return other

          @Thenable.register
          class promise:
              pass

          assert isinstance(promise(lambda x: x), Thenable)

          promise類(lèi)經(jīng)過(guò)Thenable.register類(lèi)裝飾圈注釋一下后,就可以被認(rèn)定位Thenable的實(shí)現(xiàn),并不需要顯示的編寫(xiě)繼承。

          雖然遲到了,還是厚臉請(qǐng)大家多多點(diǎn)贊和分享支持,愛(ài)你喲??

          參考鏈接

          • Celery中文手冊(cè) https://www.celerycn.io/
          • JavaScript的promise實(shí)現(xiàn) https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Guide/Using_promises
          • 源碼匯 https://github.com/game404/yuanmahui
          Python貓技術(shù)交流群開(kāi)放啦!群里既有國(guó)內(nèi)一二線大廠在職員工,也有國(guó)內(nèi)外高校在讀學(xué)生,既有十多年碼齡的編程老鳥(niǎo),也有中小學(xué)剛剛?cè)腴T(mén)的新人,學(xué)習(xí)氛圍良好!想入群的同學(xué),請(qǐng)?jiān)诠?hào)內(nèi)回復(fù)『交流群』,獲取貓哥的微信(謝絕廣告黨,非誠(chéng)勿擾!)~


          還不過(guò)癮?試試它們




          任務(wù)隊(duì)列神器:Celery 入門(mén)到進(jìn)階指南

          基于 Redis 配置異步 Celery

          Django 3.0+Redis 3.4+Celery 4.4 應(yīng)用開(kāi)發(fā)(附源碼)

          Schedule—比 Celery 更輕量級(jí)的周期任務(wù)調(diào)度工具

          別再問(wèn)了,萬(wàn)字長(zhǎng)文教你用 Celery 執(zhí)行和周期任務(wù)(多圖)

          Python 實(shí)現(xiàn)定時(shí)任務(wù)的八種方案!


          如果你覺(jué)得本文有幫助
          請(qǐng)慷慨分享點(diǎn)贊,感謝啦!
          瀏覽 50
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  av在线资源站 | 亚洲一级毛 | 亚洲韩国在线 | 国产美女被鸡巴操 | 国产做受 高潮游戏视频 |