<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Python 神器 Celery 源碼解析(5)

          共 19477字,需瀏覽 39分鐘

           ·

          2021-11-27 21:01

          Celery是一款非常簡單、靈活、可靠的分布式系統(tǒng),可用于處理大量消息,并且提供了一整套操作此系統(tǒng)的工具。Celery 也是一款消息隊列工具,可用于處理實(shí)時數(shù)據(jù)以及任務(wù)調(diào)度。

          本文是是celery源碼解析的第篇,在前4篇里分別介紹了vine, py-amqp和kombu:

          1. 神器 celery 源碼解析- vine實(shí)現(xiàn)Promise功能
          2. 神器 celery 源碼解析- py-amqp實(shí)現(xiàn)AMQP協(xié)議
          3. 神器 celery 源碼解析- kombu,一個python實(shí)現(xiàn)的消息庫
          4. 神器 celery 源碼解析- kombu的企業(yè)級算法

          基本掃清celery的基礎(chǔ)庫后,我們正式進(jìn)入celery的源碼解析,本文包括下面幾個部分:

          • celery應(yīng)用示例
          • celery項(xiàng)目概述
          • worker啟動流程跟蹤
          • client啟動流程跟蹤
          • celery的app
          • worker模式啟動流程
          • 小結(jié)

          celery應(yīng)用示例

          啟動celery之前,我們先使用docker啟動一個redis服務(wù),作為broker:

          $?docker?run?-p?6379:6379?--name?redis?-d?redis:6.2.3-alpine

          使用telnet監(jiān)控redis服務(wù),觀測任務(wù)調(diào)度情況:

          $?telnet?127.0.0.1?6379
          Trying?127.0.0.1...
          Connected?to?localhost.
          Escape?character?is?'^]'.
          monitor
          +OK

          下面是我們的celery服務(wù)代碼?myapp.py?:

          #?myapp.py
          from?celery?import?Celery

          app?=?Celery(
          ????'myapp',
          ????broker='redis://localhost:6379/0',
          ????result_backend='redis://localhost:6379/0'
          )

          @app.task
          def?add(x,?y):
          ????print("add",?x,?y)
          ????return?x?+?y

          if?__name__?==?'__main__':
          ????app.start()

          打開一個新的終端,使用下面的命令啟動celery的worker服務(wù):

          $?python?myapp.py?worker?-l?DEBUG

          正常情況下,可以看到worker正常啟動。啟動的時候會顯示一些banner信息,包括AMQP的實(shí)現(xiàn)協(xié)議,任務(wù)等:

          $?celery?-A?myapp?worker?-l?DEBUG
          ?
          ?--------------?celery@bogon?v5.1.2?(sun-harmonics)
          ---?*****?-----?
          --?*******?----?macOS-10.16-x86_64-i386-64bit?2021-09-08?20:33:45
          -?***?---?*?---?
          -?**?----------?[config]
          -?**?----------?.>?app:?????????myapp:0x7f855079e730
          -?**?----------?.>?transport:???redis://localhost:6379/0
          -?**?----------?.>?results:?????disabled://
          -?***?---?*?---?.>?concurrency:?12?(prefork)
          --?*******?----?.>?task?events:?OFF?(enable?-E?to?monitor?tasks?in?this?worker)
          ---?*****?-----?
          ?--------------?[queues]
          ????????????????.>?celery???????????exchange=celery(direct)?key=celery
          ????????????????

          [tasks]
          ??.?myapp.add

          [2021-09-08?20:33:46,220:?INFO/MainProcess]?Connected?to?redis://localhost:6379/0
          [2021-09-08?20:33:46,234:?INFO/MainProcess]?mingle:?searching?for?neighbors
          [2021-09-08?20:33:47,279:?INFO/MainProcess]?mingle:?all?alone
          [2021-09-08?20:33:47,315:?INFO/MainProcess]?celery@bogon?ready.

          再開啟一個終端窗口,作為client執(zhí)行下面的代碼, 可以看到add函數(shù)正確的執(zhí)行,獲取到計算?16+16?的結(jié)果?32。注意: 這個過程是遠(yuǎn)程執(zhí)行的,使用的是delay方法,函數(shù)的打印print("add", x, y)并沒有輸出:

          $?python
          >>>?from?myapp?import?add
          >>>?task?=?add.delay(16,16)
          >>>?task

          >>>?task.get()
          32

          在celery的worker服務(wù)窗口,可以看到類似下面的輸出。收到一個執(zhí)行任務(wù)?myapp.add?的請求, 請求的uuid是?5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b?,參數(shù)數(shù)組是?[16, 16]?正常執(zhí)行后返回結(jié)果32。

          [2021-11-11?20:13:48,040:?INFO/MainProcess]?Task?myapp.add[5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b]?received
          [2021-11-11?20:13:48,040:?DEBUG/MainProcess]?TaskPool:?Apply?<function?fast_trace_task?at?0x7fda086baa60>?(args:('myapp.add',?'5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b',?{'lang':?'py',?'task':?'myapp.add',?'id':?'5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b',?'shadow':?None,?'eta':?None,?'expires':?None,?'group':?None,?'group_index':?None,?'retries':?0,?'timelimit':?[None,?None],?'root_id':?'5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b',?'parent_id':?None,?'argsrepr':?'(16,?16)',?'kwargsrepr':?'{}',?'origin':?'gen63119@localhost',?'ignore_result':?False,?'reply_to':?'97a3e117-c8cf-3d4c-97c0-c0a76aaf9a16',?'correlation_id':?'5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b',?'hostname':?'celery@localhost',?'delivery_info':?{'exchange':?'',?'routing_key':?'celery',?'priority':?0,?'redelivered':?None},?'args':?[16,?16],?'kwargs':?{}},?b'[[16,?16],?{},?{"callbacks":?null,?"errbacks":?null,?"chain":?null,?"chord":?null}]',?'application/json',?'utf-8')?kwargs:{})
          [2021-11-11?20:13:49,059:?INFO/ForkPoolWorker-8]?Task?myapp.add[5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b]?succeeded?in?1.0166977809999995s:?32

          在redis的monitor窗口,也可以可以看到類似的輸出,展示了過程中一些對redis的操作命令:

          +1636632828.304020?[0?172.16.0.117:51127]?"SUBSCRIBE"?"celery-task-meta-5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b"
          +1636632828.304447?[0?172.16.0.117:51129]?"PING"
          +1636632828.305448?[0?172.16.0.117:51129]?"LPUSH"?"celery"?"{\"body\":?\"W1sxNiwgMTZdLCB7fSwgeyJjYWxsYmFja3MiOiBudWxsLCAiZXJyYmFja3MiOiBudWxsLCAiY2hhaW4iOiBudWxsLCAiY2hvcmQiOiBudWxsfV0=\",?\"content-encoding\":?\"utf-8\",?\"content-type\":?\"application/json\",?\"headers\":?{\"lang\":?\"py\",?\"task\":?\"myapp.add\",?\"id\":?\"5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b\",?\"shadow\":?null,?\"eta\":?null,?\"expires\":?null,?\"group\":?null,?\"group_index\":?null,?\"retries\":?0,?\"timelimit\":?[null,?null],?\"root_id\":?\"5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b\",?\"parent_id\":?null,?\"argsrepr\":?\"(16,?16)\",?\"kwargsrepr\":?\"{}\",?\"origin\":?\"gen63119@localhost\",?\"ignore_result\":?false},?\"properties\":?{\"correlation_id\":?\"5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b\",?\"reply_to\":?\"97a3e117-c8cf-3d4c-97c0-c0a76aaf9a16\",?\"delivery_mode\":?2,?\"delivery_info\":?{\"exchange\":?\"\",?\"routing_key\":?\"celery\"},?\"priority\":?0,?\"body_encoding\":?\"base64\",?\"delivery_tag\":?\"20dbd584-b669-4ef0-8a3b-41d19b354690\"}}"
          +1636632828.307040?[0?172.16.0.117:52014]?"MULTI"
          +1636632828.307075?[0?172.16.0.117:52014]?"ZADD"?"unacked_index"?"1636632828.038743"?"20dbd584-b669-4ef0-8a3b-41d19b354690"
          +1636632828.307088?[0?172.16.0.117:52014]?"HSET"?"unacked"?"20dbd584-b669-4ef0-8a3b-41d19b354690"?"[{\"body\":?\"W1sxNiwgMTZdLCB7fSwgeyJjYWxsYmFja3MiOiBudWxsLCAiZXJyYmFja3MiOiBudWxsLCAiY2hhaW4iOiBudWxsLCAiY2hvcmQiOiBudWxsfV0=\",?\"content-encoding\":?\"utf-8\",?\"content-type\":?\"application/json\",?\"headers\":?{\"lang\":?\"py\",?\"task\":?\"myapp.add\",?\"id\":?\"5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b\",?\"shadow\":?null,?\"eta\":?null,?\"expires\":?null,?\"group\":?null,?\"group_index\":?null,?\"retries\":?0,?\"timelimit\":?[null,?null],?\"root_id\":?\"5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b\",?\"parent_id\":?null,?\"argsrepr\":?\"(16,?16)\",?\"kwargsrepr\":?\"{}\",?\"origin\":?\"gen63119@localhost\",?\"ignore_result\":?false},?\"properties\":?{\"correlation_id\":?\"5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b\",?\"reply_to\":?\"97a3e117-c8cf-3d4c-97c0-c0a76aaf9a16\",?\"delivery_mode\":?2,?\"delivery_info\":?{\"exchange\":?\"\",?\"routing_key\":?\"celery\"},?\"priority\":?0,?\"body_encoding\":?\"base64\",?\"delivery_tag\":?\"20dbd584-b669-4ef0-8a3b-41d19b354690\"}},?\"\",?\"celery\"]"
          ...

          我們再一次回顧下圖,對比一下示例,加強(qiáng)理解:



          hello-world-example-routing
          • 我們先啟動一個celery的worker服務(wù)作為消費(fèi)者
          • 再啟動一個窗口作為生產(chǎn)者執(zhí)行task
          • 使用redis作為broker,負(fù)責(zé)生產(chǎn)者和消費(fèi)者之間的消息通訊
          • 最終生成者的task,作為消息發(fā)送到遠(yuǎn)程的消費(fèi)者上執(zhí)行,執(zhí)行的結(jié)果又通過網(wǎng)絡(luò)回傳給生產(chǎn)者

          上面示例展示了celery作為一個分布式任務(wù)調(diào)度系統(tǒng)的執(zhí)行過程,本地的任務(wù)調(diào)用,通過AMQP協(xié)議的包裝,作為消息發(fā)送到遠(yuǎn)程的消費(fèi)者執(zhí)行。


          celery項(xiàng)目概述

          解析celery采用的代碼版本5.0.5, 主要模塊結(jié)構(gòu):

          模塊描述
          appcelery的app實(shí)現(xiàn)
          appscelery服務(wù)的三種主要模式,worker,beat和multi
          backends任務(wù)結(jié)果存儲
          bin命令行工具實(shí)現(xiàn)
          concurrency各種并發(fā)實(shí)現(xiàn),包括線程,gevent,asyncpool等
          events事件實(shí)現(xiàn)
          worker服務(wù)啟動環(huán)節(jié)實(shí)現(xiàn)
          beat.py&&schedules.py定時和調(diào)度實(shí)現(xiàn)
          result.py任務(wù)結(jié)果實(shí)現(xiàn)
          signals.py一些信號定義
          status.py一些狀態(tài)定義

          從項(xiàng)目結(jié)構(gòu)看,模塊較多,功能復(fù)雜。不過我們已經(jīng)搞定了vine, py-amqp和kombu三個庫,接下來只需要理解worker,beat和multi三種服務(wù)模型,就可以較好的了解celery這個分布式系統(tǒng)如何構(gòu)建。


          worker啟動流程跟蹤

          worker的啟動命令?celery -A myapp worker -l DEBUG?使celery作為一個模塊,入口在main文件的main函數(shù):

          #?ch23-celery/celery-5.0.5/celery/__main__.py
          def?main():
          ????"""Entrypoint?to?the?``celery``?umbrella?command."""
          ????"""celery命令入口"""
          ????...
          ????#?具體執(zhí)行的main函數(shù)
          ????from?celery.bin.celery?import?main?as?_main
          ????sys.exit(_main())

          celery命令作為主命令,加載celery-app的同時,還會啟動worker子命令:

          #?ch23-celery/celery-5.0.5/celery/bin/celery.py
          def?celery(ctx,?app,?broker,?result_backend,?loader,?config,?workdir,
          ???????????no_color,?quiet,?version):
          ????"""Celery?command?entrypoint."""
          ????...
          ????ctx.obj?=?CLIContext(app=app,?no_color=no_color,?workdir=workdir,
          ?????????????????????????quiet=quiet)
          ????#?worker/beat/events三個主要子命令參數(shù)
          ????#?User?options
          ????worker.params.extend(ctx.obj.app.user_options.get('worker',?[]))
          ????beat.params.extend(ctx.obj.app.user_options.get('beat',?[]))
          ????events.params.extend(ctx.obj.app.user_options.get('events',?[]))

          def?main()?->?int:
          ????"""Start?celery?umbrella?command.

          ????This?function?is?the?main?entrypoint?for?the?CLI.

          ????:return:?The?exit?code?of?the?CLI.
          ????"
          ""
          ????return?celery(auto_envvar_prefix="CELERY")

          在worker子命令中創(chuàng)建worker并啟動:

          #?ch23-celery/celery-5.0.5/celery/bin/worker.py
          def?worker(ctx,?hostname=None,?pool_cls=None,?app=None,?uid=None,?gid=None,
          ???????????loglevel=None,?logfile=None,?pidfile=None,?statedb=None,
          ???????????**kwargs):
          ????#?創(chuàng)建和啟動worker
          ????worker?=?app.Worker(
          ????????hostname=hostname,?pool_cls=pool_cls,?loglevel=loglevel,
          ????????logfile=logfile,??#?node?format?handled?by?celery.app.log.setup
          ????????pidfile=node_format(pidfile,?hostname),
          ????????statedb=node_format(statedb,?hostname),
          ????????no_color=ctx.obj.no_color,
          ????????quiet=ctx.obj.quiet,
          ????????**kwargs)
          ????worker.start()

          下面是創(chuàng)建worker的方式,創(chuàng)一個?celery.apps.worker:Worker?對象:

          #?ch23-celery/celery-5.0.5/celery/app/base.py
          def?Worker(self):
          ????#?創(chuàng)建worker
          ????return?self.subclass_with_self('celery.apps.worker:Worker')

          服務(wù)啟動過程中,調(diào)用鏈路如下:

          ?????????????????????????????????+----------+
          ?????????????????????????????+--->app.celery|
          ?????????????????????????????|???+----------+
          +---------+???+----------+???|
          |main.main+--->bin.celery+---+
          +---------+???+----------+???|
          ?????????????????????????????|???+----------+???+-----------+
          ?????????????????????????????+--->bin.worker+--->apps.worker|
          ?????????????????????????????????+----------+???+-----------+

          在這個服務(wù)啟動過程中,創(chuàng)建了celery-application和worker-application兩個應(yīng)用程序。至于具體的啟動流程,我們暫時跳過,先看看客戶端的流程。


          client啟動流程分析

          示例client的啟動過程包括下面4步: 1 創(chuàng)建celery-application, 2 創(chuàng)建task 3 調(diào)用task的delay方法執(zhí)行任務(wù)得到一個異步結(jié)果 4 最后使用異步結(jié)果的get方法獲取真實(shí)結(jié)果

          task是通過app創(chuàng)建的裝飾器創(chuàng)建的Promise對象:

          #?ch23-celery/celery-5.0.5/celery/app/base.py
          task_cls?=?'celery.app.task:Task'

          def?task(self,?*args,?**opts):
          ????"""Decorator?to?create?a?task?class?out?of?any?callable.
          ????"
          ""
          ????def?inner_create_task_cls(shared=True,?filter=None,?lazy=True,?**opts):
          ????????
          ????????def?_create_task_cls(fun):
          ????????????
          ????????????ret?=?PromiseProxy(self._task_from_fun,?(fun,),?opts,
          ???????????????????????????????????????__doc__=fun.__doc__)
          ????????????return?ret

          ????????return?_create_task_cls
          ????return?inner_create_task_cls(**opts)

          task實(shí)際上是一個由Task基類動態(tài)創(chuàng)建的子類:

          def?_task_from_fun(self,?fun,?name=None,?base=None,?bind=False,?**options):
          ????base?=?base?or?self.Task
          ????task?=?type(fun.__name__,?(base,),?dict({
          ????????????????'app':?self,
          ????????????????'name':?name,
          ????????????????'run':?run,
          ????????????????'_decorated':?True,
          ????????????????'__doc__':?fun.__doc__,
          ????????????????'__module__':?fun.__module__,
          ????????????????'__annotations__':?fun.__annotations__,
          ????????????????'__header__':?staticmethod(head_from_fun(fun,?bound=bind)),
          ????????????????'__wrapped__':?run},?**options))
          ????add_autoretry_behaviour(task,?**options)
          ????#?增加task
          ????self._tasks[task.name]?=?task
          ????task.bind(self)??#?connects?task?to?this?app
          ????add_autoretry_behaviour(task,?**options)
          ????return?task

          任務(wù)的執(zhí)行使用app的send_task方法進(jìn)行:

          #?ch23-celery/celery-5.0.5/celery/app/task.py
          def?delay(self,?*args,?**kwargs):
          ????...
          ????return?app.send_task(
          ????????????????self.name,?args,?kwargs,?task_id=task_id,?producer=producer,
          ????????????????link=link,?link_error=link_error,?result_cls=self.AsyncResult,
          ????????????????shadow=shadow,?task_type=self,
          ????????????????**options
          ????????????)

          可以看到,client作為生產(chǎn)者啟動任務(wù),也需要創(chuàng)建celery-application,下面我們就先看celery-application的實(shí)現(xiàn)。


          celery的app兩大功能

          Celery的構(gòu)造函數(shù):

          class?Celery:
          ????
          ????#?協(xié)議類
          ????amqp_cls?=?'celery.app.amqp:AMQP'
          ????backend_cls?=?None
          ????#?事件類
          ????events_cls?=?'celery.app.events:Events'
          ????loader_cls?=?None
          ????log_cls?=?'celery.app.log:Logging'
          ????#?控制類
          ????control_cls?=?'celery.app.control:Control'
          ????#?任務(wù)類
          ????task_cls?=?'celery.app.task:Task'
          ????#?任務(wù)注冊中心
          ????registry_cls?=?'celery.app.registry:TaskRegistry'
          ????...
          ????
          ????def?__init__(self,?main=None,?loader=None,?backend=None,
          ?????????????????amqp=None,?events=None,?log=None,?control=None,
          ?????????????????set_as_current=True,?tasks=None,?broker=None,?include=None,
          ?????????????????changes=None,?config_source=None,?fixups=None,?task_cls=None,
          ?????????????????autofinalize=True,?namespace=None,?strict_typing=True,
          ?????????????????**kwargs):
          ????????#?啟動步驟
          ????????self.steps?=?defaultdict(set)
          ????????#?待執(zhí)行的task
          ????????self._pending?=?deque()
          ????????#?所有任務(wù)
          ????????self._tasks?=?self.registry_cls(self._tasks?or?{})
          ????????...
          ????????self.__autoset('broker_url',?broker)
          ????????self.__autoset('result_backend',?backend)
          ????????...
          ????????self.on_init()
          ????????_register_app(self)

          可以看到celery類提供了一些默認(rèn)模塊類的名稱,可以根據(jù)這些類名動態(tài)創(chuàng)建對象。app對象任務(wù)的處理使用一個隊列作為pending狀態(tài)的任務(wù)容器,使用TaskRegistry來管理任務(wù)的注冊。

          任務(wù)通過task裝飾器,記錄到celery的TaskRegistry中:

          def?task(self,?*args,?**opts):
          ????...
          ????#?增加task
          ????self._tasks[task.name]?=?task
          ????task.bind(self)??#?connects?task?to?this?app
          ????add_autoretry_behaviour(task,?**options)
          ????...

          celery另外一個核心功能是提供到broker的連接:

          def?_connection(self,?url,?userid=None,?password=None,
          ????????????????virtual_host=None,?port=None,?ssl=None,
          ????????????????connect_timeout=None,?transport=None,
          ????????????????transport_options=None,?heartbeat=None,
          ????????????????login_method=None,?failover_strategy=None,?**kwargs):
          ????conf?=?self.conf
          ????return?self.amqp.Connection(
          ????????url,
          ????????userid?or?conf.broker_user,
          ????????password?or?conf.broker_password,
          ????????virtual_host?or?conf.broker_vhost,
          ????????port?or?conf.broker_port,
          ????????transport=transport?or?conf.broker_transport,
          ????????ssl=self.either('broker_use_ssl',?ssl),
          ????????heartbeat=heartbeat,
          ????????login_method=login_method?or?conf.broker_login_method,
          ????????failover_strategy=(
          ????????????failover_strategy?or?conf.broker_failover_strategy
          ????????),
          ????????transport_options=dict(
          ????????????conf.broker_transport_options,?**transport_options?or?{}
          ????????),
          ????????connect_timeout=self.either(
          ????????????'broker_connection_timeout',?connect_timeout
          ????????),
          ????)
          broker_connection?=?connection

          @cached_property
          def?amqp(self):
          ????"""AMQP?related?functionality:?:class:`~@amqp`."""
          ????return?instantiate(self.amqp_cls,?app=self)

          AMQP的實(shí)現(xiàn),是依賴kombu提供的AMQP協(xié)議封裝:

          from?kombu?import?Connection,?Consumer,?Exchange,?Producer,?Queue,?pools

          class?AMQP:
          ????"""App?AMQP?API:?app.amqp."""

          ????Connection?=?Connection

          然后使用我們熟悉的Queue,Consumer,Producer進(jìn)行消息的生成和消費(fèi):

          def?Queues(self,?queues,?create_missing=None,
          ???????????autoexchange=None,?max_priority=None):
          ????...
          ????return?self.Queues(
          ????????????queues,?self.default_exchange,?create_missing,
          ????????????autoexchange,?max_priority,?default_routing_key,
          ????????)
          ????????
          def?TaskConsumer(self,?channel,?queues=None,?accept=None,?**kw):
          ????...
          ????return?self.Consumer(
          ????????channel,?accept=accept,
          ????????queues=queues?or?list(self.queues.consume_from.values()),
          ????????**kw
          ????)

          def?_create_task_sender(self):
          ????...
          ????producer.publish(
          ????????????????body,
          ????????????????exchange=exchange,
          ????????????????routing_key=routing_key,
          ????????????????serializer=serializer?or?default_serializer,
          ????????????????compression=compression?or?default_compressor,
          ????????????????retry=retry,?retry_policy=_rp,
          ????????????????delivery_mode=delivery_mode,?declare=declare,
          ????????????????headers=headers2,
          ????????????????**properties
          ????????????)
          ????...

          celery-app的兩大功能,管理task和管理AMQP連接,我們有一個大概的了解。


          worker模式啟動流程

          worker模式啟動在WorkController中,將服務(wù)分成不同的階段,然后將各個階段組裝成一個叫做藍(lán)圖(Blueprint)的方式進(jìn)行管理:

          class?WorkController:
          ????#?內(nèi)部類
          ????class?Blueprint(bootsteps.Blueprint):
          ????????"""Worker?bootstep?blueprint."""

          ????????name?=?'Worker'
          ????????default_steps?=?{
          ????????????'celery.worker.components:Hub',
          ????????????'celery.worker.components:Pool',
          ????????????'celery.worker.components:Beat',
          ????????????'celery.worker.components:Timer',
          ????????????'celery.worker.components:StateDB',
          ????????????'celery.worker.components:Consumer',
          ????????????'celery.worker.autoscale:WorkerComponent',
          ????????}
          ????
          ????def?__init__(self,?app=None,?hostname=None,?**kwargs):
          ????????self.blueprint?=?self.Blueprint(
          ????????????steps=self.app.steps['worker'],
          ????????????on_start=self.on_start,
          ????????????on_close=self.on_close,
          ????????????on_stopped=self.on_stopped,
          ????????)
          ????????self.blueprint.apply(self,?**kwargs)

          啟動藍(lán)圖:

          def?start(self):
          ????try:
          ????????#?啟動worker
          ????????self.blueprint.start(self)
          ????except?WorkerTerminate:
          ????????self.terminate()
          ????except?Exception?as?exc:
          ????????logger.critical('Unrecoverable?error:?%r',?exc,?exc_info=True)
          ????????self.stop(exitcode=EX_FAILURE)
          ????except?SystemExit?as?exc:
          ????????self.stop(exitcode=exc.code)
          ????except?KeyboardInterrupt:
          ????????self.stop(exitcode=EX_FAILURE)

          啟動步驟,比較簡單,大概代碼如下:

          class?StepType(type):
          ????"""Meta-class?for?steps."""

          ????name?=?None
          ????requires?=?None

          class?Step(metaclass=StepType):
          ????...
          ????
          ????def?instantiate(self,?name,?*args,?**kwargs):
          ????????return?symbol_by_name(name)(*args,?**kwargs)
          ????
          ????def?include_if(self,?parent):
          ????????return?self.enabled
          ????????
          ????def?_should_include(self,?parent):
          ????????if?self.include_if(parent):
          ????????????return?True,?self.create(parent)
          ????????return?False,?None

          ????def?create(self,?parent):
          ????????"""Create?the?step."""

          從Step大概可以看出:

          • 每個步驟,可以有依賴requires
          • 每個步驟,可以有具體的動作instantiate
          • 步驟具有樹狀的父子結(jié)構(gòu),可以自動創(chuàng)建上級步驟

          比如一個消費(fèi)者步驟, 依賴Connection步驟。啟動的時候?qū)onnection進(jìn)行消費(fèi)。兩者代碼如下:

          class?ConsumerStep(StartStopStep):
          ????"""Bootstep?that?starts?a?message?consumer."""

          ????requires?=?('celery.worker.consumer:Connection',)
          ????consumers?=?None

          ????def?start(self,?c):
          ????????channel?=?c.connection.channel()
          ????????self.consumers?=?self.get_consumers(channel)
          ????????for?consumer?in?self.consumers?or?[]:
          ????????????consumer.consume()

          class?Connection(bootsteps.StartStopStep):
          ????"""Service?managing?the?consumer?broker?connection."""

          ????def?__init__(self,?c,?**kwargs):
          ????????c.connection?=?None
          ????????super().__init__(c,?**kwargs)

          ????def?start(self,?c):
          ????????c.connection?=?c.connect()
          ????????info('Connected?to?%s',?c.connection.as_uri())

          在Blueprint中創(chuàng)建和管理這些step:

          class?Blueprint:
          ????
          ????def?__init__(self,?steps=None,?name=None,
          ?????????????????on_start=None,?on_close=None,?on_stopped=None):
          ????????self.name?=?name?or?self.name?or?qualname(type(self))
          ????????#?并集
          ????????self.types?=?set(steps?or?[])?|?set(self.default_steps)
          ????????...
          ????????self.steps?=?{}

          ????def?apply(self,?parent,?**kwargs):
          ????????steps?=?self.steps?=?dict(symbol_by_name(step)?for?step?in?self.types)

          ????????self._debug('Building?graph...')
          ????????for?S?in?self._finalize_steps(steps):
          ????????????step?=?S(parent,?**kwargs)
          ????????????steps[step.name]?=?step
          ????????????order.append(step)
          ????????self._debug('New?boot?order:?{%s}',
          ????????????????????',?'.join(s.alias?for?s?in?self.order))
          ????????for?step?in?order:
          ????????????step.include(parent)
          ????????return?self

          啟動Blueprint:

          def?start(self,?parent):
          ????self.state?=?RUN
          ????if?self.on_start:
          ????????self.on_start()
          ????for?i,?step?in?enumerate(s?for?s?in?parent.steps?if?s?is?not?None):
          ????????self._debug('Starting?%s',?step.alias)
          ????????self.started?=?i?+?1
          ????????step.start(parent)
          ????????logger.debug('^--?substep?ok')

          通過將啟動過程拆分成多個step單元,然后組合單元構(gòu)建成graph,逐一啟動。


          小結(jié)

          本篇我們正式學(xué)習(xí)了一下celery的使用流程,了解celery如果使用redis作為broker,利用服務(wù)作為消費(fèi)者,使用客戶端作為生成者,完成一次遠(yuǎn)程任務(wù)的執(zhí)行。簡單探索worker服務(wù)模式的啟動流程,重點(diǎn)分析celery-application的管理task和管理連接兩大功能實(shí)現(xiàn)。

          小技巧

          celery中展示了一種動態(tài)創(chuàng)建類和對象的方法:

          task?=?type(fun.__name__,?(Task,),?dict({
          ????????????????'app':?self,
          ????????????????'name':?name,
          ????????????????'run':?run,
          ????????????????'_decorated':?True,
          ????????????????'__doc__':?fun.__doc__,
          ????????????????'__module__':?fun.__module__,
          ????????????????'__annotations__':?fun.__annotations__,
          ????????????????'__header__':?staticmethod(head_from_fun(fun,?bound=bind)),
          ????????????????'__wrapped__':?run},?**options))()

          通過type函數(shù)創(chuàng)了一個動態(tài)的task子類,然后執(zhí)行?()?實(shí)例化一個task子對象。

          參考鏈接

          • 以編程方式定義類 https://python3-cookbook.readthedocs.io/zh_CN/latest/c09/p18_define_classes_programmatically.html
          Python貓技術(shù)交流群開放啦!群里既有國內(nèi)一二線大廠在職員工,也有國內(nèi)外高校在讀學(xué)生,既有十多年碼齡的編程老鳥,也有中小學(xué)剛剛?cè)腴T的新人,學(xué)習(xí)氛圍良好!想入群的同學(xué),請在公號內(nèi)回復(fù)『交流群』,獲取貓哥的微信(謝絕廣告黨,非誠勿擾!)~


          還不過癮?試試它們




          分享幾款超好用的 REST API 工具

          Python進(jìn)階:自定義對象實(shí)現(xiàn)切片功能

          len(x) 擊敗 x.len(),從內(nèi)置函數(shù)看 Python 的設(shè)計思想

          當(dāng)談?wù)摰鲿r,我談些什么?

          面向?qū)ο缶幊淌欠褡呦蛄讼觯?/a>

          別再問了,萬字長文教你用 Celery 執(zhí)行和周期任務(wù)(多圖)


          如果你覺得本文有幫助
          請慷慨分享點(diǎn)贊,感謝啦
          瀏覽 128
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  天天干天天干天天干 | 偷拍黄色小视频 | 国产在线小视频 | 天天干天天操天天谢谢 | 美穴在线|