Python 神器 Celery 源碼閱讀(1)

Celery是一款非常簡(jiǎn)單、靈活、可靠的分布式系統(tǒng),可用于處理大量消息,并且提供了一整套操作此系統(tǒng)的工具。Celery 是一款消息隊(duì)列工具,可用于處理實(shí)時(shí)數(shù)據(jù)以及任務(wù)調(diào)度。
Celery在github上有18k的star和4.1k的fork,非常受歡迎;截止當(dāng)前總共發(fā)布了216個(gè)版本,最近一個(gè)版本是19天前,更新非?;钴S。如果你需要一個(gè)python實(shí)現(xiàn)的任務(wù)調(diào)度框架,首推就是它了。
從這周開(kāi)始,我們一起閱讀celery的源碼,學(xué)習(xí)如何使用celery,了解分布式任務(wù)調(diào)度框架是如何構(gòu)建,深入celery的實(shí)現(xiàn)細(xì)節(jié)。celery代碼量比較大,預(yù)計(jì)需要3~4周的時(shí)間吧。話不多說(shuō),一起開(kāi)始,本周內(nèi)容主要包括下面幾個(gè)部分:
任務(wù)應(yīng)用場(chǎng)景 celery的項(xiàng)目結(jié)構(gòu) promise庫(kù)的實(shí)現(xiàn) 小結(jié) 小技巧
另外非常重要的一點(diǎn)是,考慮到之前的文章,在源碼和解讀上做的不夠好,這次做一個(gè)小小的改進(jìn)。就是我會(huì)把項(xiàng)目源碼增加我自己的注釋?zhuān)蟼鞯絞ithub上。這樣想了解細(xì)節(jié)實(shí)現(xiàn)的小伙伴請(qǐng)使用【閱讀原文】。我的源碼閱讀項(xiàng)目 yuanmahui,也歡迎大家在github上點(diǎn)贊支持?。
celery應(yīng)用場(chǎng)景
我們有這樣一個(gè)flask實(shí)現(xiàn)的web服務(wù),其中hello接口,需要進(jìn)行外部調(diào)用(比如發(fā)送發(fā)送短信驗(yàn)證碼之類(lèi))。這里我們使用 time.sleep 模擬這個(gè)耗時(shí)的操作:
import time
from flask import Flask
app = Flask(__name__)
@app.route('/')
def hello():
time.sleep(1) # 模擬一下耗時(shí)操作
return 'Hello, World!'
可以使用ab來(lái)驗(yàn)證這個(gè)接口的耗時(shí)情況:
# ab -n 10 -c 5 http://127.0.0.1:5000/
...
Requests per second: 3.30 [#/sec] (mean)
Time per request: 1513.100 [ms] (mean)
Time per request: 302.620 [ms] (mean, across all concurrent requests)
Transfer rate: 0.54 [Kbytes/sec] received
Connection Times (ms)
min mean[+/-sd] median max
Connect: 0 0 0.1 0 0
Processing: 1007 1008 1.2 1008 1010
Waiting: 1005 1007 1.0 1007 1008
Total: 1007 1008 1.2 1009 1010
...
測(cè)試顯示這個(gè)接口的平均耗時(shí)需要1.5秒左右,響應(yīng)緩慢。同時(shí),這樣的接口還會(huì)導(dǎo)致前端頁(yè)面的卡頓。要解決這個(gè)問(wèn)題,就可以使用任務(wù)調(diào)度的方式,把這個(gè)耗時(shí)操作轉(zhuǎn)換成背景任務(wù),同時(shí)及時(shí)返回http響應(yīng), 調(diào)整方法如下:
....
@app.route('/')
def hello():
# time.sleep(1)
do_task(id(request)) # 啟動(dòng)背景任務(wù)
return 'Hello, World!'
def do_task(index):
t = threading.Thread(target=lambda idx: time.sleep(1), args=(index,))
t.start()
do_task中新開(kāi)了一個(gè)任務(wù)線程去執(zhí)行這個(gè) time.sleep 操作,當(dāng)前的線程在啟動(dòng)任務(wù)線程后立即返回。再次使用ab對(duì)接口耗時(shí)進(jìn)行測(cè)試:
...
Time per request: 6.304 [ms] (mean)
...
可以發(fā)現(xiàn)使用任務(wù)方式后,hello接口的響應(yīng)效率有了巨大的提升。當(dāng)然這個(gè)簡(jiǎn)單的任務(wù)調(diào)度還有2個(gè)問(wèn)題:
任務(wù)執(zhí)行的結(jié)果沒(méi)法返回給前端 任務(wù)和web服務(wù)在一個(gè)進(jìn)程里執(zhí)行,效率不會(huì)太高 Celery的分布式任務(wù)調(diào)度就可以比較好的解決這2個(gè)問(wèn)題。
celery的項(xiàng)目結(jié)構(gòu)
celery我們選用 5.0.5 版本,首先requirements/default.txt文件描述主要依賴(lài)下面幾個(gè)庫(kù):
billiard celery項(xiàng)目提供的一個(gè)多進(jìn)程池的實(shí)現(xiàn) kombu celery項(xiàng)目提供的一個(gè)消息庫(kù),可以對(duì)接不同的消息隊(duì)列,比如RabbitMQ,Redis vine celery項(xiàng)目提供的一個(gè)promise實(shí)現(xiàn),可以處理任務(wù)的組合和pipline等
celery支持下面3種工作模式:
beat 使用定時(shí)心跳的方式啟動(dòng) multi 使用集群方式啟動(dòng),會(huì)形成多個(gè)工作進(jìn)程 worker 普通的工作進(jìn)程方式啟動(dòng)
celery任務(wù)執(zhí)行的結(jié)果也支持多種存儲(chǔ)方式:
Mongo Redis Elasticsearch ...
celery的并發(fā)也支持多種實(shí)現(xiàn)
多進(jìn)程的fork gevent 多線程 eventlet ...
celery支持工作流
可以根據(jù)函數(shù)簽名進(jìn)行調(diào)度 可以支持鏈?zhǔn)饺蝿?wù) 可以支持分組,和弦...
celery的項(xiàng)目結(jié)構(gòu)就簡(jiǎn)單介紹這些,后續(xù)章節(jié)再進(jìn)行詳細(xì)介紹。
promise庫(kù)的實(shí)現(xiàn)
promise在異步任務(wù)中非常重要,所以celery有個(gè)vine項(xiàng)目實(shí)現(xiàn)了promise功能,在開(kāi)始celery之前,我們先掃清這些外圍障礙。
promise 簡(jiǎn)介
Promise是一個(gè)對(duì)象,它代表了一個(gè)異步操作的最終完成或者失敗。我覺(jué)得MDN中的介紹非常好,我們先了解它,再對(duì)比看看Python中如何實(shí)現(xiàn)它。比如有一個(gè)創(chuàng)建音頻文件的操作,成功和失敗的時(shí)候使用不同的輸出:
// 成功的回調(diào)函數(shù)
function successCallback(result) {
console.log("音頻文件創(chuàng)建成功: " + result);
}
// 失敗的回調(diào)函數(shù)
function failureCallback(error) {
console.log("音頻文件創(chuàng)建失敗: " + error);
}
createAudioFileAsync(audioSettings, successCallback, failureCallback)
傳統(tǒng)的方式就是使用callback方式調(diào)用,把正確和錯(cuò)誤的回調(diào)傳入執(zhí)行函數(shù)createAudioFileAsync中。如果使用Promise方式就會(huì)變成:
# 創(chuàng)建一個(gè)Promise對(duì)象
const promise = createAudioFileAsync(audioSettings);
# 執(zhí)行這個(gè)Promise對(duì)象
promise.then(successCallback, failureCallback);
雖然上面代碼使用的是JavaScript,我相信熟悉python的你也可以正確理解。就上面的例子,還不容易看出Promise的優(yōu)點(diǎn)。繼續(xù)看下面的例子:
doSomething(function(result) {
doSomethingElse(result, function(newResult) {
doThirdThing(newResult, function(finalResult) {
console.log('Got the final result: ' + finalResult);
}, failureCallback);
}, failureCallback);
}, failureCallback);
這里是一個(gè)多重回調(diào)的實(shí)現(xiàn),先處理doSomething,收到結(jié)果后再處理doSomethingElse,最后再執(zhí)行doThirdThing。僅僅3層回調(diào)還可以接收,如果回調(diào)多了以后就形成回調(diào)地獄,代碼丑陋且難用。如果使用Promise實(shí)現(xiàn),就會(huì)變成:
doSomething().then(function(result) {
return doSomethingElse(result);
})
.then(function(newResult) {
return doThirdThing(newResult);
})
.then(function(finalResult) {
console.log('Got the final result: ' + finalResult);
})
.catch(failureCallback);
可以發(fā)現(xiàn)使用Promise方式后,代碼會(huì)變扁平,非常清爽。這里展示了Promise的2個(gè)特點(diǎn):
then函數(shù)執(zhí)行后返回的是一個(gè)Promise對(duì)象 Promise可以進(jìn)行鏈?zhǔn)?chain)調(diào)用
簡(jiǎn)單理解Promise后,我們?cè)倩仡^看doSomething,doSomethingElse和doThirdThing,如果這是3個(gè)任務(wù),需要按順序調(diào)度執(zhí)行,后者需要前者的執(zhí)行結(jié)果作為參數(shù)?所以可以知道,從邏輯上講,Promise功能對(duì)于任務(wù)調(diào)度,非常重要。這種實(shí)現(xiàn)方式,和語(yǔ)言其實(shí)無(wú)關(guān)。
Promise的實(shí)現(xiàn)
先單元測(cè)試用例看從promise的使用:
def test_signal(self):
# mock一個(gè)函數(shù)
callback = Mock(name='callback')
# 創(chuàng)建一個(gè)Promise對(duì)象
a = promise()
# 使用then函數(shù)添加callback
a.then(callback)
# 執(zhí)行Promise對(duì)象
a(42)
# 函數(shù)被調(diào)用,參數(shù)為42
callback.assert_called_once_with(42)
不難看出python版本的Promise使用和JavaScript版本沒(méi)有太大區(qū)別。promise的構(gòu)造函數(shù)如下:
class promise:
...
def __init__(self, fun=None, args=None, kwargs=None,
callback=None, on_error=None, weak=False,
ignore_result=False):
self.weak = weak
self.ignore_result = ignore_result
# 要執(zhí)行的函數(shù)
self.fun = self._get_fun_or_weakref(fun=fun, weak=weak)
# 注意位置參數(shù)是元祖,這樣才可以在call里疊加
self.args = args or ()
# 關(guān)鍵字參數(shù)是字典
self.kwargs = kwargs or {}
# ready,failed,cancelled 三個(gè)狀態(tài),默認(rèn)都是false
self.ready = False
self.failed = False
self.value = None
self.reason = None
# Optimization
# Most promises will only have one callback, so we optimize for this
# case by using a list only when there are multiple callbacks.
# s(calar) pending / l(ist) pending
# 單個(gè)callback/多個(gè)callback
self._svpending = None
self._lvpending = None
self.on_error = on_error
self.cancelled = False
# 可見(jiàn)callback可以通過(guò)參數(shù)傳遞,也可以通過(guò)then函數(shù)傳遞
if callback is not None:
self.then(callback)
每個(gè)Promise有3個(gè)狀態(tài)位:ready,cancelled和failed,默認(rèn)都是false。
重點(diǎn)就是then函數(shù):
def then(self, callback, on_error=None):
# callback是普通函數(shù),就用promise再嵌套一下
if not isinstance(callback, Thenable):
callback = promise(callback, on_error=on_error)
if self.cancelled:
callback.cancel()
return callback
if self.failed:
callback.throw(self.reason)
elif self.ready:
args, kwargs = self.value
callback(*args, **kwargs)
if self._lvpending is None:
svpending = self._svpending
if svpending is not None:
self._svpending, self._lvpending = None, deque([svpending])
else:
# 初始復(fù)制callback給_svpending
# 就是一種遞歸
self._svpending = callback
return callback
# 添加到右側(cè)
self._lvpending.append(callback)
# 返回的是一個(gè)promise可以繼續(xù)then,實(shí)現(xiàn)a.then(fun_x).then(fun_y).then(fun_z)這樣的鏈?zhǔn)秸{(diào)用
return callback
在JavaScript版本已經(jīng)介紹過(guò)Promise的2個(gè)特性就是,then返回一個(gè)新的Promise對(duì)象,又由于返回的是Promise對(duì)象,又可以繼續(xù)執(zhí)行then函數(shù)添加新的callback鏈?zhǔn)秸{(diào)用。
Promise對(duì)象的執(zhí)行就在魔法函數(shù) *__call__:
def __call__(self, *args, **kwargs):
retval = None
if self.cancelled:
return
# 疊加參數(shù)
final_args = self.args + args if args else self.args
final_kwargs = dict(self.kwargs, **kwargs) if kwargs else self.kwargs
# self.fun may be a weakref
fun = self._fun_is_alive(self.fun)
if fun is not None:
try:
if self.ignore_result:
fun(*final_args, **final_kwargs)
ca = ()
ck = {}
else:
# 執(zhí)行函數(shù)
retval = fun(*final_args, **final_kwargs)
self.value = (ca, ck) = (retval,), {}
except Exception:
# 異常
return self.throw()
else:
self.value = (ca, ck) = final_args, final_kwargs
# 更改ready狀態(tài)
self.ready = True
svpending = self._svpending
# 執(zhí)行callback,把fun執(zhí)行的結(jié)果往callback里傳入這樣形成pipeline
if svpending is not None:
try:
svpending(*ca, **ck)
finally:
self._svpending = None
else:
lvpending = self._lvpending
try:
while lvpending:
# 從左開(kāi)始執(zhí)行
p = lvpending.popleft()
p(*ca, **ck)
finally:
self._lvpending = None
return retval
上述代碼主要步驟有:
合并Promise對(duì)象的參數(shù)和函數(shù)調(diào)用參數(shù) 如果有初始函數(shù),則執(zhí)行初始函數(shù) 更改Promise對(duì)象的ready狀態(tài) 執(zhí)行callback
鏈?zhǔn)秸{(diào)用的示例,請(qǐng)看:
def test_chained(self):
def add(x, y):
return x + y
def pow2(x):
return x ** 2
adder = Mock(name='adder')
adder.side_effect = add
power = Mock(name='multiplier')
power.side_effect = pow2
final = Mock(name='final')
p = promise()
# 鏈?zhǔn)秸{(diào)用(注意是有序的)
p.then(adder).then(power).then(final)
p(42, 42)
assert p.value == ((42, 42), {})
adder.assert_called_with(42, 42)
power.assert_called_with(84)
final.assert_called_with(7056)
Barrier的實(shí)現(xiàn)
vine還提供了一個(gè)叫做barrier的實(shí)現(xiàn),處理多個(gè)Promise對(duì)象的串行化,下面是單元測(cè)試:
class test_barrier:
def setup(self):
self.m1, self.m2, self.m3 = Mock(), Mock(), Mock()
self.ps = [promise(self.m1), promise(self.m2), promise(self.m3)]
def test_evaluate(self):
# 需要執(zhí)行4才才變成ready
x = barrier(self.ps)
x()
assert not x.ready
x()
assert not x.ready
x.add(promise())
x()
assert not x.ready
x()
assert x.ready
x()
x()
# 已經(jīng)執(zhí)行完成繼續(xù)添加會(huì)報(bào)錯(cuò)
with pytest.raises(ValueError):
x.add(promise())
使用barrier后,4個(gè)Promise對(duì)象的調(diào)用可以串行化,并且可以單步執(zhí)行。執(zhí)行一次x()消耗一個(gè)Promise。
barrier的構(gòu)造函數(shù):
class barrier:
def __init__(self, promises=None, args=None, kwargs=None,
callback=None, size=None):
# Promise的實(shí)現(xiàn)
self.p = promise()
self.args = args or ()
self.kwargs = kwargs or {}
self._value = 0
self.size = size or 0
if not self.size and promises:
# iter(l) calls len(l) so generator wrappers
# can only return NotImplemented in the case the
# generator is not fully consumed yet.
plen = promises.__len__()
if plen is not NotImplemented:
self.size = plen
self.ready = self.failed = False
self.reason = None
self.cancelled = False
self.finalized = False
# 列表推導(dǎo)式
[self.add_noincr(p) for p in promises or []]
self.finalized = bool(promises or self.size)
if callback:
self.then(callback)
barrier重點(diǎn)是默認(rèn)有一個(gè)promise實(shí)現(xiàn),用來(lái)作為整個(gè)批處理的尾部。參數(shù)中的promises列表通過(guò)add_noincr函數(shù)形成調(diào)用鏈:
def add_noincr(self, p):
if not self.cancelled:
# 已經(jīng)完成了就不能夠再添加了
if self.ready:
raise ValueError('Cannot add promise to full barrier')
# 其實(shí)就是then().then().then() 添加到自己之前,自己主要執(zhí)行最開(kāi)始定義的callback
p.then(self)
每執(zhí)行一次進(jìn)行計(jì)數(shù),直到執(zhí)行完成后更改狀態(tài)和執(zhí)行自身(尾部)的Promise對(duì)象
def __call__(self, *args, **kwargs):
# 判斷是否已經(jīng)執(zhí)行完成:ready和cancelled
if not self.ready and not self.cancelled:
self._value += 1
if self.finalized and self._value >= self.size:
self.ready = True
self.p(*self.args, **self.kwargs)
小結(jié)
我們了解的celery是python實(shí)現(xiàn)的一個(gè)任務(wù)調(diào)度系統(tǒng),在github上廣受歡迎,更新活躍。學(xué)習(xí)可以使用任務(wù)調(diào)度方式,幫助我們處理web服務(wù)中一些耗時(shí)任務(wù)。簡(jiǎn)單了解celery項(xiàng)目的一些特點(diǎn),從celery的依賴(lài)項(xiàng)目vine開(kāi)始,了解Promise在任務(wù)調(diào)度系統(tǒng)中的應(yīng)用。最后從vine項(xiàng)目源碼中學(xué)習(xí),如何創(chuàng)建一個(gè)Promise系統(tǒng)。
小技巧
對(duì)于抽象的實(shí)現(xiàn),在Python中除了可以使用繼承方式,還可以使用組合Mixin。比如下面:
class Thenable(Callable, metaclass=abc.ABCMeta): # pragma: no cover
...
@abc.abstractmethod
def then(self, on_success, on_error=None):
"""成功和失敗的2個(gè)回調(diào)"""
raise NotImplementedError()
class CanThen:
def then(self, x, y):
pass
assert isinstance(CanThen(), Thenable)
可以看到CanThen實(shí)現(xiàn)then函數(shù)后,就可以被認(rèn)定為T(mén)henable的實(shí)現(xiàn),但是CanThen并未繼承自Thenable。這個(gè)魔法主要是由ABCMeta的register和__subclasshook__兩個(gè)方法實(shí)現(xiàn):
class Thenable(Callable, metaclass=abc.ABCMeta):
...
@classmethod
def __subclasshook__(cls, C):
# 也由ABCMeta提供
if cls is Thenable:
if any('then' in B.__dict__ for B in C.__mro__):
return True
return NotImplemented
@classmethod
def register(cls, other):
# overide to return other so `register` can be used as a decorator
# 這個(gè)register方法是由ABCMeta提供,其實(shí)現(xiàn)類(lèi)使用裝飾器方式
# https://docs.python.org/zh-cn/3/library/abc.html
type(cls).register(cls, other)
return other
@Thenable.register
class promise:
pass
assert isinstance(promise(lambda x: x), Thenable)
promise類(lèi)經(jīng)過(guò)Thenable.register類(lèi)裝飾圈注釋一下后,就可以被認(rèn)定位Thenable的實(shí)現(xiàn),并不需要顯示的編寫(xiě)繼承。
雖然遲到了,還是厚臉請(qǐng)大家多多點(diǎn)贊和分享支持,愛(ài)你喲??
參考鏈接
Celery中文手冊(cè) https://www.celerycn.io/ JavaScript的promise實(shí)現(xiàn) https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Guide/Using_promises 源碼匯 https://github.com/game404/yuanmahui

還不過(guò)癮?試試它們
▲任務(wù)隊(duì)列神器:Celery 入門(mén)到進(jìn)階指南
▲Django 3.0+Redis 3.4+Celery 4.4 應(yīng)用開(kāi)發(fā)(附源碼)
▲Schedule—比 Celery 更輕量級(jí)的周期任務(wù)調(diào)度工具
