Python 神器 Celery 源碼解析(5)

Celery是一款非常簡單、靈活、可靠的分布式系統(tǒng),可用于處理大量消息,并且提供了一整套操作此系統(tǒng)的工具。Celery 也是一款消息隊列工具,可用于處理實(shí)時數(shù)據(jù)以及任務(wù)調(diào)度。
本文是是celery源碼解析的第五篇,在前4篇里分別介紹了vine, py-amqp和kombu:
神器 celery 源碼解析- vine實(shí)現(xiàn)Promise功能 神器 celery 源碼解析- py-amqp實(shí)現(xiàn)AMQP協(xié)議 神器 celery 源碼解析- kombu,一個python實(shí)現(xiàn)的消息庫 神器 celery 源碼解析- kombu的企業(yè)級算法
基本掃清celery的基礎(chǔ)庫后,我們正式進(jìn)入celery的源碼解析,本文包括下面幾個部分:
celery應(yīng)用示例 celery項(xiàng)目概述 worker啟動流程跟蹤 client啟動流程跟蹤 celery的app worker模式啟動流程 小結(jié)
celery應(yīng)用示例
啟動celery之前,我們先使用docker啟動一個redis服務(wù),作為broker:
$?docker?run?-p?6379:6379?--name?redis?-d?redis:6.2.3-alpine
使用telnet監(jiān)控redis服務(wù),觀測任務(wù)調(diào)度情況:
$?telnet?127.0.0.1?6379
Trying?127.0.0.1...
Connected?to?localhost.
Escape?character?is?'^]'.
monitor
+OK
下面是我們的celery服務(wù)代碼?myapp.py?:
#?myapp.py
from?celery?import?Celery
app?=?Celery(
????'myapp',
????broker='redis://localhost:6379/0',
????result_backend='redis://localhost:6379/0'
)
@app.task
def?add(x,?y):
????print("add",?x,?y)
????return?x?+?y
if?__name__?==?'__main__':
????app.start()
打開一個新的終端,使用下面的命令啟動celery的worker服務(wù):
$?python?myapp.py?worker?-l?DEBUG
正常情況下,可以看到worker正常啟動。啟動的時候會顯示一些banner信息,包括AMQP的實(shí)現(xiàn)協(xié)議,任務(wù)等:
$?celery?-A?myapp?worker?-l?DEBUG
?
?--------------?celery@bogon?v5.1.2?(sun-harmonics)
---?*****?-----?
--?*******?----?macOS-10.16-x86_64-i386-64bit?2021-09-08?20:33:45
-?***?---?*?---?
-?**?----------?[config]
-?**?----------?.>?app:?????????myapp:0x7f855079e730
-?**?----------?.>?transport:???redis://localhost:6379/0
-?**?----------?.>?results:?????disabled://
-?***?---?*?---?.>?concurrency:?12?(prefork)
--?*******?----?.>?task?events:?OFF?(enable?-E?to?monitor?tasks?in?this?worker)
---?*****?-----?
?--------------?[queues]
????????????????.>?celery???????????exchange=celery(direct)?key=celery
????????????????
[tasks]
??.?myapp.add
[2021-09-08?20:33:46,220:?INFO/MainProcess]?Connected?to?redis://localhost:6379/0
[2021-09-08?20:33:46,234:?INFO/MainProcess]?mingle:?searching?for?neighbors
[2021-09-08?20:33:47,279:?INFO/MainProcess]?mingle:?all?alone
[2021-09-08?20:33:47,315:?INFO/MainProcess]?celery@bogon?ready.
再開啟一個終端窗口,作為client執(zhí)行下面的代碼, 可以看到add函數(shù)正確的執(zhí)行,獲取到計算?16+16?的結(jié)果?32。注意: 這個過程是遠(yuǎn)程執(zhí)行的,使用的是delay方法,函數(shù)的打印print("add", x, y)并沒有輸出:
$?python
>>>?from?myapp?import?add
>>>?task?=?add.delay(16,16)
>>>?task
>>>?task.get()
32
在celery的worker服務(wù)窗口,可以看到類似下面的輸出。收到一個執(zhí)行任務(wù)?myapp.add?的請求, 請求的uuid是?5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b?,參數(shù)數(shù)組是?[16, 16]?正常執(zhí)行后返回結(jié)果32。
[2021-11-11?20:13:48,040:?INFO/MainProcess]?Task?myapp.add[5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b]?received
[2021-11-11?20:13:48,040:?DEBUG/MainProcess]?TaskPool:?Apply?<function?fast_trace_task?at?0x7fda086baa60>?(args:('myapp.add',?'5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b',?{'lang':?'py',?'task':?'myapp.add',?'id':?'5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b',?'shadow':?None,?'eta':?None,?'expires':?None,?'group':?None,?'group_index':?None,?'retries':?0,?'timelimit':?[None,?None],?'root_id':?'5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b',?'parent_id':?None,?'argsrepr':?'(16,?16)',?'kwargsrepr':?'{}',?'origin':?'gen63119@localhost',?'ignore_result':?False,?'reply_to':?'97a3e117-c8cf-3d4c-97c0-c0a76aaf9a16',?'correlation_id':?'5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b',?'hostname':?'celery@localhost',?'delivery_info':?{'exchange':?'',?'routing_key':?'celery',?'priority':?0,?'redelivered':?None},?'args':?[16,?16],?'kwargs':?{}},?b'[[16,?16],?{},?{"callbacks":?null,?"errbacks":?null,?"chain":?null,?"chord":?null}]',?'application/json',?'utf-8')?kwargs:{})
[2021-11-11?20:13:49,059:?INFO/ForkPoolWorker-8]?Task?myapp.add[5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b]?succeeded?in?1.0166977809999995s:?32
在redis的monitor窗口,也可以可以看到類似的輸出,展示了過程中一些對redis的操作命令:
+1636632828.304020?[0?172.16.0.117:51127]?"SUBSCRIBE"?"celery-task-meta-5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b"
+1636632828.304447?[0?172.16.0.117:51129]?"PING"
+1636632828.305448?[0?172.16.0.117:51129]?"LPUSH"?"celery"?"{\"body\":?\"W1sxNiwgMTZdLCB7fSwgeyJjYWxsYmFja3MiOiBudWxsLCAiZXJyYmFja3MiOiBudWxsLCAiY2hhaW4iOiBudWxsLCAiY2hvcmQiOiBudWxsfV0=\",?\"content-encoding\":?\"utf-8\",?\"content-type\":?\"application/json\",?\"headers\":?{\"lang\":?\"py\",?\"task\":?\"myapp.add\",?\"id\":?\"5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b\",?\"shadow\":?null,?\"eta\":?null,?\"expires\":?null,?\"group\":?null,?\"group_index\":?null,?\"retries\":?0,?\"timelimit\":?[null,?null],?\"root_id\":?\"5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b\",?\"parent_id\":?null,?\"argsrepr\":?\"(16,?16)\",?\"kwargsrepr\":?\"{}\",?\"origin\":?\"gen63119@localhost\",?\"ignore_result\":?false},?\"properties\":?{\"correlation_id\":?\"5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b\",?\"reply_to\":?\"97a3e117-c8cf-3d4c-97c0-c0a76aaf9a16\",?\"delivery_mode\":?2,?\"delivery_info\":?{\"exchange\":?\"\",?\"routing_key\":?\"celery\"},?\"priority\":?0,?\"body_encoding\":?\"base64\",?\"delivery_tag\":?\"20dbd584-b669-4ef0-8a3b-41d19b354690\"}}"
+1636632828.307040?[0?172.16.0.117:52014]?"MULTI"
+1636632828.307075?[0?172.16.0.117:52014]?"ZADD"?"unacked_index"?"1636632828.038743"?"20dbd584-b669-4ef0-8a3b-41d19b354690"
+1636632828.307088?[0?172.16.0.117:52014]?"HSET"?"unacked"?"20dbd584-b669-4ef0-8a3b-41d19b354690"?"[{\"body\":?\"W1sxNiwgMTZdLCB7fSwgeyJjYWxsYmFja3MiOiBudWxsLCAiZXJyYmFja3MiOiBudWxsLCAiY2hhaW4iOiBudWxsLCAiY2hvcmQiOiBudWxsfV0=\",?\"content-encoding\":?\"utf-8\",?\"content-type\":?\"application/json\",?\"headers\":?{\"lang\":?\"py\",?\"task\":?\"myapp.add\",?\"id\":?\"5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b\",?\"shadow\":?null,?\"eta\":?null,?\"expires\":?null,?\"group\":?null,?\"group_index\":?null,?\"retries\":?0,?\"timelimit\":?[null,?null],?\"root_id\":?\"5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b\",?\"parent_id\":?null,?\"argsrepr\":?\"(16,?16)\",?\"kwargsrepr\":?\"{}\",?\"origin\":?\"gen63119@localhost\",?\"ignore_result\":?false},?\"properties\":?{\"correlation_id\":?\"5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b\",?\"reply_to\":?\"97a3e117-c8cf-3d4c-97c0-c0a76aaf9a16\",?\"delivery_mode\":?2,?\"delivery_info\":?{\"exchange\":?\"\",?\"routing_key\":?\"celery\"},?\"priority\":?0,?\"body_encoding\":?\"base64\",?\"delivery_tag\":?\"20dbd584-b669-4ef0-8a3b-41d19b354690\"}},?\"\",?\"celery\"]"
...
我們再一次回顧下圖,對比一下示例,加強(qiáng)理解:

我們先啟動一個celery的worker服務(wù)作為消費(fèi)者 再啟動一個窗口作為生產(chǎn)者執(zhí)行task 使用redis作為broker,負(fù)責(zé)生產(chǎn)者和消費(fèi)者之間的消息通訊 最終生成者的task,作為消息發(fā)送到遠(yuǎn)程的消費(fèi)者上執(zhí)行,執(zhí)行的結(jié)果又通過網(wǎng)絡(luò)回傳給生產(chǎn)者
上面示例展示了celery作為一個分布式任務(wù)調(diào)度系統(tǒng)的執(zhí)行過程,本地的任務(wù)調(diào)用,通過AMQP協(xié)議的包裝,作為消息發(fā)送到遠(yuǎn)程的消費(fèi)者執(zhí)行。
celery項(xiàng)目概述
解析celery采用的代碼版本5.0.5, 主要模塊結(jié)構(gòu):
| 模塊 | 描述 |
|---|---|
| app | celery的app實(shí)現(xiàn) |
| apps | celery服務(wù)的三種主要模式,worker,beat和multi |
| backends | 任務(wù)結(jié)果存儲 |
| bin | 命令行工具實(shí)現(xiàn) |
| concurrency | 各種并發(fā)實(shí)現(xiàn),包括線程,gevent,asyncpool等 |
| events | 事件實(shí)現(xiàn) |
| worker | 服務(wù)啟動環(huán)節(jié)實(shí)現(xiàn) |
| beat.py&&schedules.py | 定時和調(diào)度實(shí)現(xiàn) |
| result.py | 任務(wù)結(jié)果實(shí)現(xiàn) |
| signals.py | 一些信號定義 |
| status.py | 一些狀態(tài)定義 |
從項(xiàng)目結(jié)構(gòu)看,模塊較多,功能復(fù)雜。不過我們已經(jīng)搞定了vine, py-amqp和kombu三個庫,接下來只需要理解worker,beat和multi三種服務(wù)模型,就可以較好的了解celery這個分布式系統(tǒng)如何構(gòu)建。
worker啟動流程跟蹤
worker的啟動命令?celery -A myapp worker -l DEBUG?使celery作為一個模塊,入口在main文件的main函數(shù):
#?ch23-celery/celery-5.0.5/celery/__main__.py
def?main():
????"""Entrypoint?to?the?``celery``?umbrella?command."""
????"""celery命令入口"""
????...
????#?具體執(zhí)行的main函數(shù)
????from?celery.bin.celery?import?main?as?_main
????sys.exit(_main())
celery命令作為主命令,加載celery-app的同時,還會啟動worker子命令:
#?ch23-celery/celery-5.0.5/celery/bin/celery.py
def?celery(ctx,?app,?broker,?result_backend,?loader,?config,?workdir,
???????????no_color,?quiet,?version):
????"""Celery?command?entrypoint."""
????...
????ctx.obj?=?CLIContext(app=app,?no_color=no_color,?workdir=workdir,
?????????????????????????quiet=quiet)
????#?worker/beat/events三個主要子命令參數(shù)
????#?User?options
????worker.params.extend(ctx.obj.app.user_options.get('worker',?[]))
????beat.params.extend(ctx.obj.app.user_options.get('beat',?[]))
????events.params.extend(ctx.obj.app.user_options.get('events',?[]))
def?main()?->?int:
????"""Start?celery?umbrella?command.
????This?function?is?the?main?entrypoint?for?the?CLI.
????:return:?The?exit?code?of?the?CLI.
????"""
????return?celery(auto_envvar_prefix="CELERY")
在worker子命令中創(chuàng)建worker并啟動:
#?ch23-celery/celery-5.0.5/celery/bin/worker.py
def?worker(ctx,?hostname=None,?pool_cls=None,?app=None,?uid=None,?gid=None,
???????????loglevel=None,?logfile=None,?pidfile=None,?statedb=None,
???????????**kwargs):
????#?創(chuàng)建和啟動worker
????worker?=?app.Worker(
????????hostname=hostname,?pool_cls=pool_cls,?loglevel=loglevel,
????????logfile=logfile,??#?node?format?handled?by?celery.app.log.setup
????????pidfile=node_format(pidfile,?hostname),
????????statedb=node_format(statedb,?hostname),
????????no_color=ctx.obj.no_color,
????????quiet=ctx.obj.quiet,
????????**kwargs)
????worker.start()
下面是創(chuàng)建worker的方式,創(chuàng)一個?celery.apps.worker:Worker?對象:
#?ch23-celery/celery-5.0.5/celery/app/base.py
def?Worker(self):
????#?創(chuàng)建worker
????return?self.subclass_with_self('celery.apps.worker:Worker')
服務(wù)啟動過程中,調(diào)用鏈路如下:
?????????????????????????????????+----------+
?????????????????????????????+--->app.celery|
?????????????????????????????|???+----------+
+---------+???+----------+???|
|main.main+--->bin.celery+---+
+---------+???+----------+???|
?????????????????????????????|???+----------+???+-----------+
?????????????????????????????+--->bin.worker+--->apps.worker|
?????????????????????????????????+----------+???+-----------+
在這個服務(wù)啟動過程中,創(chuàng)建了celery-application和worker-application兩個應(yīng)用程序。至于具體的啟動流程,我們暫時跳過,先看看客戶端的流程。
client啟動流程分析
示例client的啟動過程包括下面4步: 1 創(chuàng)建celery-application, 2 創(chuàng)建task 3 調(diào)用task的delay方法執(zhí)行任務(wù)得到一個異步結(jié)果 4 最后使用異步結(jié)果的get方法獲取真實(shí)結(jié)果
task是通過app創(chuàng)建的裝飾器創(chuàng)建的Promise對象:
#?ch23-celery/celery-5.0.5/celery/app/base.py
task_cls?=?'celery.app.task:Task'
def?task(self,?*args,?**opts):
????"""Decorator?to?create?a?task?class?out?of?any?callable.
????"""
????def?inner_create_task_cls(shared=True,?filter=None,?lazy=True,?**opts):
????????
????????def?_create_task_cls(fun):
????????????
????????????ret?=?PromiseProxy(self._task_from_fun,?(fun,),?opts,
???????????????????????????????????????__doc__=fun.__doc__)
????????????return?ret
????????return?_create_task_cls
????return?inner_create_task_cls(**opts)
task實(shí)際上是一個由Task基類動態(tài)創(chuàng)建的子類:
def?_task_from_fun(self,?fun,?name=None,?base=None,?bind=False,?**options):
????base?=?base?or?self.Task
????task?=?type(fun.__name__,?(base,),?dict({
????????????????'app':?self,
????????????????'name':?name,
????????????????'run':?run,
????????????????'_decorated':?True,
????????????????'__doc__':?fun.__doc__,
????????????????'__module__':?fun.__module__,
????????????????'__annotations__':?fun.__annotations__,
????????????????'__header__':?staticmethod(head_from_fun(fun,?bound=bind)),
????????????????'__wrapped__':?run},?**options))
????add_autoretry_behaviour(task,?**options)
????#?增加task
????self._tasks[task.name]?=?task
????task.bind(self)??#?connects?task?to?this?app
????add_autoretry_behaviour(task,?**options)
????return?task
任務(wù)的執(zhí)行使用app的send_task方法進(jìn)行:
#?ch23-celery/celery-5.0.5/celery/app/task.py
def?delay(self,?*args,?**kwargs):
????...
????return?app.send_task(
????????????????self.name,?args,?kwargs,?task_id=task_id,?producer=producer,
????????????????link=link,?link_error=link_error,?result_cls=self.AsyncResult,
????????????????shadow=shadow,?task_type=self,
????????????????**options
????????????)
可以看到,client作為生產(chǎn)者啟動任務(wù),也需要創(chuàng)建celery-application,下面我們就先看celery-application的實(shí)現(xiàn)。
celery的app兩大功能
Celery的構(gòu)造函數(shù):
class?Celery:
????
????#?協(xié)議類
????amqp_cls?=?'celery.app.amqp:AMQP'
????backend_cls?=?None
????#?事件類
????events_cls?=?'celery.app.events:Events'
????loader_cls?=?None
????log_cls?=?'celery.app.log:Logging'
????#?控制類
????control_cls?=?'celery.app.control:Control'
????#?任務(wù)類
????task_cls?=?'celery.app.task:Task'
????#?任務(wù)注冊中心
????registry_cls?=?'celery.app.registry:TaskRegistry'
????...
????
????def?__init__(self,?main=None,?loader=None,?backend=None,
?????????????????amqp=None,?events=None,?log=None,?control=None,
?????????????????set_as_current=True,?tasks=None,?broker=None,?include=None,
?????????????????changes=None,?config_source=None,?fixups=None,?task_cls=None,
?????????????????autofinalize=True,?namespace=None,?strict_typing=True,
?????????????????**kwargs):
????????#?啟動步驟
????????self.steps?=?defaultdict(set)
????????#?待執(zhí)行的task
????????self._pending?=?deque()
????????#?所有任務(wù)
????????self._tasks?=?self.registry_cls(self._tasks?or?{})
????????...
????????self.__autoset('broker_url',?broker)
????????self.__autoset('result_backend',?backend)
????????...
????????self.on_init()
????????_register_app(self)
可以看到celery類提供了一些默認(rèn)模塊類的名稱,可以根據(jù)這些類名動態(tài)創(chuàng)建對象。app對象任務(wù)的處理使用一個隊列作為pending狀態(tài)的任務(wù)容器,使用TaskRegistry來管理任務(wù)的注冊。
任務(wù)通過task裝飾器,記錄到celery的TaskRegistry中:
def?task(self,?*args,?**opts):
????...
????#?增加task
????self._tasks[task.name]?=?task
????task.bind(self)??#?connects?task?to?this?app
????add_autoretry_behaviour(task,?**options)
????...
celery另外一個核心功能是提供到broker的連接:
def?_connection(self,?url,?userid=None,?password=None,
????????????????virtual_host=None,?port=None,?ssl=None,
????????????????connect_timeout=None,?transport=None,
????????????????transport_options=None,?heartbeat=None,
????????????????login_method=None,?failover_strategy=None,?**kwargs):
????conf?=?self.conf
????return?self.amqp.Connection(
????????url,
????????userid?or?conf.broker_user,
????????password?or?conf.broker_password,
????????virtual_host?or?conf.broker_vhost,
????????port?or?conf.broker_port,
????????transport=transport?or?conf.broker_transport,
????????ssl=self.either('broker_use_ssl',?ssl),
????????heartbeat=heartbeat,
????????login_method=login_method?or?conf.broker_login_method,
????????failover_strategy=(
????????????failover_strategy?or?conf.broker_failover_strategy
????????),
????????transport_options=dict(
????????????conf.broker_transport_options,?**transport_options?or?{}
????????),
????????connect_timeout=self.either(
????????????'broker_connection_timeout',?connect_timeout
????????),
????)
broker_connection?=?connection
@cached_property
def?amqp(self):
????"""AMQP?related?functionality:?:class:`~@amqp`."""
????return?instantiate(self.amqp_cls,?app=self)
AMQP的實(shí)現(xiàn),是依賴kombu提供的AMQP協(xié)議封裝:
from?kombu?import?Connection,?Consumer,?Exchange,?Producer,?Queue,?pools
class?AMQP:
????"""App?AMQP?API:?app.amqp."""
????Connection?=?Connection
然后使用我們熟悉的Queue,Consumer,Producer進(jìn)行消息的生成和消費(fèi):
def?Queues(self,?queues,?create_missing=None,
???????????autoexchange=None,?max_priority=None):
????...
????return?self.Queues(
????????????queues,?self.default_exchange,?create_missing,
????????????autoexchange,?max_priority,?default_routing_key,
????????)
????????
def?TaskConsumer(self,?channel,?queues=None,?accept=None,?**kw):
????...
????return?self.Consumer(
????????channel,?accept=accept,
????????queues=queues?or?list(self.queues.consume_from.values()),
????????**kw
????)
def?_create_task_sender(self):
????...
????producer.publish(
????????????????body,
????????????????exchange=exchange,
????????????????routing_key=routing_key,
????????????????serializer=serializer?or?default_serializer,
????????????????compression=compression?or?default_compressor,
????????????????retry=retry,?retry_policy=_rp,
????????????????delivery_mode=delivery_mode,?declare=declare,
????????????????headers=headers2,
????????????????**properties
????????????)
????...
celery-app的兩大功能,管理task和管理AMQP連接,我們有一個大概的了解。
worker模式啟動流程
worker模式啟動在WorkController中,將服務(wù)分成不同的階段,然后將各個階段組裝成一個叫做藍(lán)圖(Blueprint)的方式進(jìn)行管理:
class?WorkController:
????#?內(nèi)部類
????class?Blueprint(bootsteps.Blueprint):
????????"""Worker?bootstep?blueprint."""
????????name?=?'Worker'
????????default_steps?=?{
????????????'celery.worker.components:Hub',
????????????'celery.worker.components:Pool',
????????????'celery.worker.components:Beat',
????????????'celery.worker.components:Timer',
????????????'celery.worker.components:StateDB',
????????????'celery.worker.components:Consumer',
????????????'celery.worker.autoscale:WorkerComponent',
????????}
????
????def?__init__(self,?app=None,?hostname=None,?**kwargs):
????????self.blueprint?=?self.Blueprint(
????????????steps=self.app.steps['worker'],
????????????on_start=self.on_start,
????????????on_close=self.on_close,
????????????on_stopped=self.on_stopped,
????????)
????????self.blueprint.apply(self,?**kwargs)
啟動藍(lán)圖:
def?start(self):
????try:
????????#?啟動worker
????????self.blueprint.start(self)
????except?WorkerTerminate:
????????self.terminate()
????except?Exception?as?exc:
????????logger.critical('Unrecoverable?error:?%r',?exc,?exc_info=True)
????????self.stop(exitcode=EX_FAILURE)
????except?SystemExit?as?exc:
????????self.stop(exitcode=exc.code)
????except?KeyboardInterrupt:
????????self.stop(exitcode=EX_FAILURE)
啟動步驟,比較簡單,大概代碼如下:
class?StepType(type):
????"""Meta-class?for?steps."""
????name?=?None
????requires?=?None
class?Step(metaclass=StepType):
????...
????
????def?instantiate(self,?name,?*args,?**kwargs):
????????return?symbol_by_name(name)(*args,?**kwargs)
????
????def?include_if(self,?parent):
????????return?self.enabled
????????
????def?_should_include(self,?parent):
????????if?self.include_if(parent):
????????????return?True,?self.create(parent)
????????return?False,?None
????def?create(self,?parent):
????????"""Create?the?step."""
從Step大概可以看出:
每個步驟,可以有依賴requires 每個步驟,可以有具體的動作instantiate 步驟具有樹狀的父子結(jié)構(gòu),可以自動創(chuàng)建上級步驟
比如一個消費(fèi)者步驟, 依賴Connection步驟。啟動的時候?qū)onnection進(jìn)行消費(fèi)。兩者代碼如下:
class?ConsumerStep(StartStopStep):
????"""Bootstep?that?starts?a?message?consumer."""
????requires?=?('celery.worker.consumer:Connection',)
????consumers?=?None
????def?start(self,?c):
????????channel?=?c.connection.channel()
????????self.consumers?=?self.get_consumers(channel)
????????for?consumer?in?self.consumers?or?[]:
????????????consumer.consume()
class?Connection(bootsteps.StartStopStep):
????"""Service?managing?the?consumer?broker?connection."""
????def?__init__(self,?c,?**kwargs):
????????c.connection?=?None
????????super().__init__(c,?**kwargs)
????def?start(self,?c):
????????c.connection?=?c.connect()
????????info('Connected?to?%s',?c.connection.as_uri())
在Blueprint中創(chuàng)建和管理這些step:
class?Blueprint:
????
????def?__init__(self,?steps=None,?name=None,
?????????????????on_start=None,?on_close=None,?on_stopped=None):
????????self.name?=?name?or?self.name?or?qualname(type(self))
????????#?并集
????????self.types?=?set(steps?or?[])?|?set(self.default_steps)
????????...
????????self.steps?=?{}
????def?apply(self,?parent,?**kwargs):
????????steps?=?self.steps?=?dict(symbol_by_name(step)?for?step?in?self.types)
????????self._debug('Building?graph...')
????????for?S?in?self._finalize_steps(steps):
????????????step?=?S(parent,?**kwargs)
????????????steps[step.name]?=?step
????????????order.append(step)
????????self._debug('New?boot?order:?{%s}',
????????????????????',?'.join(s.alias?for?s?in?self.order))
????????for?step?in?order:
????????????step.include(parent)
????????return?self
啟動Blueprint:
def?start(self,?parent):
????self.state?=?RUN
????if?self.on_start:
????????self.on_start()
????for?i,?step?in?enumerate(s?for?s?in?parent.steps?if?s?is?not?None):
????????self._debug('Starting?%s',?step.alias)
????????self.started?=?i?+?1
????????step.start(parent)
????????logger.debug('^--?substep?ok')
通過將啟動過程拆分成多個step單元,然后組合單元構(gòu)建成graph,逐一啟動。
小結(jié)
本篇我們正式學(xué)習(xí)了一下celery的使用流程,了解celery如果使用redis作為broker,利用服務(wù)作為消費(fèi)者,使用客戶端作為生成者,完成一次遠(yuǎn)程任務(wù)的執(zhí)行。簡單探索worker服務(wù)模式的啟動流程,重點(diǎn)分析celery-application的管理task和管理連接兩大功能實(shí)現(xiàn)。
小技巧
celery中展示了一種動態(tài)創(chuàng)建類和對象的方法:
task?=?type(fun.__name__,?(Task,),?dict({
????????????????'app':?self,
????????????????'name':?name,
????????????????'run':?run,
????????????????'_decorated':?True,
????????????????'__doc__':?fun.__doc__,
????????????????'__module__':?fun.__module__,
????????????????'__annotations__':?fun.__annotations__,
????????????????'__header__':?staticmethod(head_from_fun(fun,?bound=bind)),
????????????????'__wrapped__':?run},?**options))()
通過type函數(shù)創(chuàng)了一個動態(tài)的task子類,然后執(zhí)行?()?實(shí)例化一個task子對象。
參考鏈接
以編程方式定義類 https://python3-cookbook.readthedocs.io/zh_CN/latest/c09/p18_define_classes_programmatically.html

還不過癮?試試它們
▲Python進(jìn)階:自定義對象實(shí)現(xiàn)切片功能
