任務(wù)隊(duì)列神器:Celery 入門到進(jìn)階指南

1.什么是celery
celery是一個(gè)簡(jiǎn)單,靈活、可靠的分布式任務(wù)執(zhí)行框架,可以支持大量任務(wù)的并發(fā)執(zhí)行。celery采用典型生產(chǎn)者和消費(fèi)者模型。生產(chǎn)者提交任務(wù)到任務(wù)隊(duì)列,眾多消費(fèi)者從任務(wù)隊(duì)列中取任務(wù)執(zhí)行。
1.1 celery架構(gòu)
Celery由以下三部分構(gòu)成:消息中間件(Broker)、任務(wù)執(zhí)行單元Worker、結(jié)果存儲(chǔ)(Backend)

任務(wù)調(diào)用提交任務(wù)執(zhí)行請(qǐng)求給Broker隊(duì)列 如果是異步任務(wù),worker會(huì)立即從隊(duì)列中取出任務(wù)并執(zhí)行,執(zhí)行結(jié)果保存在Backend中 如果是定時(shí)任務(wù),任務(wù)由Celery Beat進(jìn)程周期性地將任務(wù)發(fā)往Broker隊(duì)列,Worker實(shí)時(shí)監(jiān)視消息隊(duì)列獲取隊(duì)列中的任務(wù)執(zhí)行
1.2 應(yīng)用場(chǎng)景
大量的長(zhǎng)時(shí)間任務(wù)的異步執(zhí)行, 如上傳大文件 大規(guī)模實(shí)時(shí)任務(wù)執(zhí)行,支持集群部署,如支持高并發(fā)的機(jī)器學(xué)習(xí)推理 定時(shí)任務(wù)執(zhí)行,如定時(shí)發(fā)送郵件,定時(shí)掃描機(jī)器運(yùn)行情況
2.安裝
celery安裝非常簡(jiǎn)單, 除了安裝celery,本文中使用redis作為消息隊(duì)列即Broker
# celery 安裝
pip install celery
# celery 監(jiān)控 flower
pip install flower
pip install redis
# redis 安裝
yum install redis
# redis啟動(dòng)
redis-server /etc/redis.conf
3. 完整例子
celery的應(yīng)用開(kāi)發(fā)涉及四個(gè)部分
celery 實(shí)例初始化 任務(wù)的定義(定時(shí)和實(shí)時(shí)任務(wù)) 任務(wù)worker的啟動(dòng) 任務(wù)的調(diào)用
3.1 項(xiàng)目目錄
# 項(xiàng)目目錄
wedo
.
├── config.py
├── __init__.py
├── period_task.py
└── tasks.py
3.2 celery 實(shí)例初始化
celery的實(shí)例化,主要包括執(zhí)行Broker和backend的訪問(wèn)方式,任務(wù)模塊的申明等
# celery 實(shí)例初始化
# __init__.py
from celery import Celery
app = Celery('wedo') # 創(chuàng)建 Celery 實(shí)例
app.config_from_object('wedo.config')
# 配置 wedo.config
# config.py
BROKER_URL = 'redis://10.8.238.2:6379/0' # Broker配置,使用Redis作為消息中間件
CELERY_RESULT_BACKEND = 'redis://10.8.238.2:6379/0' # BACKEND配置,這里使用redis
CELERY_RESULT_SERIALIZER = 'json' # 結(jié)果序列化方案
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務(wù)過(guò)期時(shí)間
CELERY_TIMEZONE='Asia/Shanghai' # 時(shí)區(qū)配置
CELERY_IMPORTS = ( # 指定導(dǎo)入的任務(wù)模塊,可以指定多個(gè)
'wedo.tasks',
'wedo.period_task'
)
3.3 任務(wù)的定義
celery中通過(guò)@task的裝飾器來(lái)進(jìn)行申明celery任務(wù),其他操作無(wú)任何差別
# 任務(wù)的定義
# 簡(jiǎn)單任務(wù) tasks.py
import celery
import time
from celery.utils.log import get_task_logger
from wedo import app
@app.task
def sum(x, y):
return x + y
@app.task
def mul(x, y):
time.sleep(5)
return x * y
定時(shí)任務(wù)和實(shí)時(shí)任務(wù)的區(qū)別主要是要申明何時(shí)執(zhí)行任務(wù),任務(wù)本身也是通過(guò)task裝飾器來(lái)申明 何時(shí)執(zhí)行任務(wù)有2種
指定頻率執(zhí)行:sender.add_periodic_task(時(shí)間頻率單位s, 任務(wù)函數(shù), name='to_string') crontab方式:分鐘/小時(shí)/天/月/周粒度, 可以支持多種調(diào)度
# 任務(wù)的定義
# 定時(shí)任務(wù) period_task.py
from wedo import app
from celery.schedules import crontab
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(5.0, to_string.s("celery peroid task"), name='to_string') # 每5秒執(zhí)行add
sender.add_periodic_task(
crontab(minute='*/10'), #每10分鐘執(zhí)行一次
send_mail.s('hello, this is a celery'), name='send_mail'
)
@app.task
def send_mail(content):
print('send mail, content is %s' % content)
@app.task
def to_string(text):
return 'this is a %s' % text
3.4 任務(wù)worker的啟動(dòng)
任務(wù)啟動(dòng)分為worker啟動(dòng)和定時(shí)任務(wù)beat啟動(dòng)
# -A wedo為應(yīng)用模塊
# -l為日志level
# -c 為進(jìn)程數(shù)
celery worker -A wedo -l debug -c 4
# 后臺(tái)啟動(dòng)
nohup celery worker -A wedo -l debug -c 4 > ./log.log 2>&1
# 從下面的日志可以看出啟動(dòng)了4個(gè)任務(wù)
# . wedo.period_task.send_mail
# . wedo.period_task.to_string
# . wedo.tasks.mul
# . wedo.tasks.sum
-------------- [email protected] v4.4.2 (cliffs)
--- ***** -----
-- ******* ---- Linux-3.10.0-327.28.3.el7.x86_64-x86_64-with-centos-7.2.1511-Core 2020-04-25 23:35:26
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: wedo:0x7f05af30d320
- ** ---------- .> transport: redis://10.8.238.2:6379/0
- ** ---------- .> results: redis://10.8.238.2:6379/0
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. celery.accumulate
. celery.backend_cleanup
...
. wedo.period_task.send_mail
. wedo.period_task.to_string
. wedo.tasks.mul
. wedo.tasks.sum
...
[2020-04-25 23:35:27,617: INFO/MainProcess] [email protected] ready.
[2020-04-25 23:35:27,617: DEBUG/MainProcess] basic.qos: prefetch_count->16
[2020-04-25 23:35:27,655: DEBUG/MainProcess] celery@12103675 joined the party
celery beat -A wedo.period_task
celery beat v4.4.2 (cliffs) is starting.
__ - ... __ - _
LocalTime -> 2020-04-25 23:37:08
Configuration ->
. broker -> redis://10.8.238.2:6379/0
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%WARNING
. maxinterval -> 5.00 minutes (300s)
# worker啟動(dòng)是4個(gè)進(jìn)程
\_ /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
\_ /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
\_ /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
\_ /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
\_ /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
worker和beat的停止
ps auxww | awk '/celery worker/ {print $2}' | xargs kill -9
ps auxww | awk '/celery beat/ {print $2}' | xargs kill -9
3.5 任務(wù)的調(diào)用
任務(wù)worker已經(jīng)啟動(dòng)好了,通過(guò)任務(wù)調(diào)用傳遞給broker(redis),并返回任務(wù)執(zhí)行結(jié)果 任務(wù)調(diào)用主要有兩種,本質(zhì)是一致的,delay是apply_async的封裝,apply_async可以支持更多的任務(wù)調(diào)用配置
task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'}) task.delay(arg1, arg2, kwarg1='x', kwarg2='y')
apply_async和delay會(huì)返回一個(gè)異步的任務(wù)結(jié)果
value = result.get() # 任務(wù)返回值
print(result.__dict__) # 結(jié)果信息
print(result.successful()) # 是否成功
print(result.fail()) # 是否失敗
print(result.ready()) # 是否執(zhí)行完成
print(result.state) # 狀態(tài) PENDING -> STARTED -> SUCCESS/FAIL
常規(guī)任務(wù):
from celery.utils.log import get_logger
from wedo.tasks import sum, mul, post_file
from celery import group, chain, chord
logger = get_logger(__name__)
try:
result = mul.apply_async(args=(2, 2))
value = result.get() # 等待任務(wù)執(zhí)行完畢后,才會(huì)返回任務(wù)返回值
print(value)
except mul.OperationalError as exc: # 任務(wù)異常處理
logger.exception('Sending task raised: %r', exc)
組合任務(wù):
多個(gè)任務(wù)并行執(zhí)行, group 多個(gè)任務(wù)鏈?zhǔn)綀?zhí)行,chain:第一個(gè)任務(wù)的返回值作為第二個(gè)的輸入?yún)?shù),以此類推
result = group(sum.s(i, i) for i in range(5))()
result.get()
# [0, 2, 4, 6, 8]
result = chain(sum.s(1,2), sum.s(3), mul.s(3))()
result.get()
# ((1+2)+3)*3=18
4. 分布式集群部署
celery作為分布式的任務(wù)隊(duì)列框架,worker是可以執(zhí)行在不同的服務(wù)器上的。部署過(guò)程和單機(jī)上啟動(dòng)是一樣。只要把項(xiàng)目代碼copy到其他服務(wù)器,使用相同命令就可以了。可以思考下,這個(gè)是怎么實(shí)現(xiàn)的?對(duì)了,就是通過(guò)共享Broker隊(duì)列。使用合適的隊(duì)列,如redis,單進(jìn)程單線程的方式可以有效的避免同個(gè)任務(wù)被不同worker同時(shí)執(zhí)行的情況。
celery worker -A wedo -l debug -c 4
分布式集群如下:

5. 進(jìn)階使用
在前面已經(jīng)了解了celery的主要的功能了。celery還為一些特別的場(chǎng)景提供了需要擴(kuò)展的功能
5.1 任務(wù)狀態(tài)跟蹤和日志
有時(shí)候我們需要對(duì)任務(wù)的執(zhí)行情況做一些監(jiān)控,比如失敗后報(bào)警通知。
celery在裝飾器@app.task中提供了base參數(shù),傳入重寫的Task模塊,重新on_*函數(shù)就可以控制不同的任務(wù)結(jié)果 在@app.task提供bind=True,可以通過(guò)self獲取Task中各種參數(shù) self.request:任務(wù)的各種參數(shù) self.update_state: 自定義任務(wù)狀態(tài), 原有的任務(wù)狀態(tài):PENDING -> STARTED -> SUCCESS, 如果你想了解STARTED -> SUCCESS之間的一個(gè)狀態(tài),比如執(zhí)行的百分比之類,可以通過(guò)自定義狀態(tài)來(lái)實(shí)現(xiàn) self.retry: 重試
import celery
import time
from celery.utils.log import get_task_logger
from wedo import app
logger = logger = get_task_logger(__name__)
class TaskMonitor(celery.Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
"""failed callback"""
logger.info('task id: {0!r} failed: {1!r}'.format(task_id, exc))
def on_success(self, retval, task_id, args, kwargs):
"""success callback"""
logger.info('task id:{} , arg:{} , successful !'.format(task_id,args))
def on_retry(self, exc, task_id, args, kwargs, einfo):
"""retry callback"""
logger.info('task id:{} , arg:{} , retry ! einfo: {}'.format(task_id, args, exc))
@app.task(base=TaskMonitor, bind=True, name='post_file')
def post_file(self, file_names):
logger.info(self.request.__dict__)
try:
for i, file in enumerate(file_names):
print('the file %s is posted' % file)
if not self.request.called_directly:
self.update_state(state='PROGRESS',
meta={'current': i, 'total': len(file_names)})
time.sleep(2)
except Exception as exec:
raise self.retry(exc=exec, countdown=3, max_retries=5)
5.2 任務(wù)指定特定的worker執(zhí)行
celery做為支持分布式,理論上可以無(wú)限擴(kuò)展worker。默認(rèn)情況下celery提交任務(wù)后,任務(wù)會(huì)放入名為celery的隊(duì)列,所有在線的worker都會(huì)從任務(wù)隊(duì)列中獲取任務(wù),任一個(gè)worker都有可能執(zhí)行這個(gè)任務(wù)。有時(shí)候,有時(shí)候任務(wù)的特殊性或者機(jī)器本身的限制,某些任務(wù)只能跑在某些worker上。celery提供了queue在區(qū)別不同的worker,很好的支持這種情況。
啟動(dòng)worker時(shí),-Q 指定worker支持的任務(wù)列隊(duì)名, 可以支持多個(gè)隊(duì)列名哦
celery worker -A wedo -l debug -c 4 -Q celery,hipri
任務(wù)調(diào)用時(shí), queue=*來(lái)指定需要執(zhí)行worker
result = mul.apply_async(args=(2, 2), queue='hipri')
6. 任務(wù)隊(duì)列監(jiān)控
如果你想通過(guò)可視化的方式,查看celery的一切。flower提供可行的解決方案,十分的方便
flower -A wedo --port=6006
# web訪問(wèn) http://10.8.238.2:6006/


7. 總結(jié)
本文和大家了介紹了分布式的隊(duì)列celery, 妥妥的很全吧, 歡迎交流。總結(jié)下內(nèi)容:
celery為分布式隊(duì)列, 通過(guò)消息隊(duì)列連接任務(wù)提交和執(zhí)行者worker, 松耦合模式,可擴(kuò)展 celery消息隊(duì)列建議為redis celery通過(guò)@app.task裝飾把普通任務(wù)變成celery Task celery worker 通過(guò)不同queue支持特定的worker消費(fèi)特定的任務(wù) @app.task中可以同步base和bind參數(shù)獲取更過(guò)的控制任務(wù)生命周期 flower監(jiān)控celery全過(guò)程 celery doc:https://docs.celeryproject.org/en/master/getting-started/index.html

近期熱門文章推薦:

