<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          任務(wù)隊(duì)列神器:Celery 入門到進(jìn)階指南

          共 13068字,需瀏覽 27分鐘

           ·

          2021-03-23 16:33


           △點(diǎn)擊上方Python貓”關(guān)注 ,回復(fù)“1”領(lǐng)取電子書

          作者:wedo實(shí)驗(yàn)君
          來(lái)源:Python中文社區(qū)

          1.什么是celery

          celery是一個(gè)簡(jiǎn)單,靈活、可靠的分布式任務(wù)執(zhí)行框架,可以支持大量任務(wù)的并發(fā)執(zhí)行。celery采用典型生產(chǎn)者和消費(fèi)者模型。生產(chǎn)者提交任務(wù)到任務(wù)隊(duì)列,眾多消費(fèi)者從任務(wù)隊(duì)列中取任務(wù)執(zhí)行。

          1.1 celery架構(gòu)

          Celery由以下三部分構(gòu)成:消息中間件(Broker)、任務(wù)執(zhí)行單元Worker、結(jié)果存儲(chǔ)(Backend)

          • 任務(wù)調(diào)用提交任務(wù)執(zhí)行請(qǐng)求給Broker隊(duì)列
          • 如果是異步任務(wù),worker會(huì)立即從隊(duì)列中取出任務(wù)并執(zhí)行,執(zhí)行結(jié)果保存在Backend中
          • 如果是定時(shí)任務(wù),任務(wù)由Celery Beat進(jìn)程周期性地將任務(wù)發(fā)往Broker隊(duì)列,Worker實(shí)時(shí)監(jiān)視消息隊(duì)列獲取隊(duì)列中的任務(wù)執(zhí)行

          1.2 應(yīng)用場(chǎng)景

          • 大量的長(zhǎng)時(shí)間任務(wù)的異步執(zhí)行, 如上傳大文件
          • 大規(guī)模實(shí)時(shí)任務(wù)執(zhí)行,支持集群部署,如支持高并發(fā)的機(jī)器學(xué)習(xí)推理
          • 定時(shí)任務(wù)執(zhí)行,如定時(shí)發(fā)送郵件,定時(shí)掃描機(jī)器運(yùn)行情況

          2.安裝

          celery安裝非常簡(jiǎn)單, 除了安裝celery,本文中使用redis作為消息隊(duì)列即Broker

          # celery 安裝
          pip install celery
          # celery 監(jiān)控 flower
          pip install flower
          pip install redis
          # redis 安裝
          yum install redis
          # redis啟動(dòng)
          redis-server /etc/redis.conf

          3. 完整例子

          celery的應(yīng)用開(kāi)發(fā)涉及四個(gè)部分

          • celery 實(shí)例初始化
          • 任務(wù)的定義(定時(shí)和實(shí)時(shí)任務(wù))
          • 任務(wù)worker的啟動(dòng)
          • 任務(wù)的調(diào)用

          3.1 項(xiàng)目目錄

          # 項(xiàng)目目錄
          wedo
          .
          ├── config.py
          ├── __init__.py
          ├── period_task.py
          └── tasks.py

          3.2 celery 實(shí)例初始化

          celery的實(shí)例化,主要包括執(zhí)行Broker和backend的訪問(wèn)方式,任務(wù)模塊的申明等

          # celery 實(shí)例初始化 
          # __init__.py
          from celery import Celery
          app = Celery('wedo')  # 創(chuàng)建 Celery 實(shí)例
          app.config_from_object('wedo.config'

          # 配置 wedo.config
          # config.py
          BROKER_URL = 'redis://10.8.238.2:6379/0' # Broker配置,使用Redis作為消息中間件
          CELERY_RESULT_BACKEND = 'redis://10.8.238.2:6379/0' # BACKEND配置,這里使用redis
          CELERY_RESULT_SERIALIZER = 'json' # 結(jié)果序列化方案
          CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務(wù)過(guò)期時(shí)間
          CELERY_TIMEZONE='Asia/Shanghai'   # 時(shí)區(qū)配置
          CELERY_IMPORTS = (     # 指定導(dǎo)入的任務(wù)模塊,可以指定多個(gè)
              'wedo.tasks',
              'wedo.period_task'
          )

          3.3 任務(wù)的定義

          celery中通過(guò)@task的裝飾器來(lái)進(jìn)行申明celery任務(wù),其他操作無(wú)任何差別

          # 任務(wù)的定義
          # 簡(jiǎn)單任務(wù)  tasks.py
          import celery
          import time
          from celery.utils.log import get_task_logger
          from wedo import app

          @app.task
          def sum(x, y):
              return x + y

          @app.task
          def mul(x, y):
              time.sleep(5)
              return x * y

          定時(shí)任務(wù)和實(shí)時(shí)任務(wù)的區(qū)別主要是要申明何時(shí)執(zhí)行任務(wù),任務(wù)本身也是通過(guò)task裝飾器來(lái)申明 何時(shí)執(zhí)行任務(wù)有2種

          • 指定頻率執(zhí)行:sender.add_periodic_task(時(shí)間頻率單位s, 任務(wù)函數(shù), name='to_string')
          • crontab方式:分鐘/小時(shí)/天/月/周粒度, 可以支持多種調(diào)度
          # 任務(wù)的定義
          # 定時(shí)任務(wù)  period_task.py
          from wedo import app
          from celery.schedules import crontab

          @app.on_after_configure.connect
          def setup_periodic_tasks(sender, **kwargs):
              sender.add_periodic_task(5.0, to_string.s("celery peroid task"), name='to_string'# 每5秒執(zhí)行add
              sender.add_periodic_task(
                  crontab(minute='*/10'),      #每10分鐘執(zhí)行一次
                  send_mail.s('hello, this is a celery'), name='send_mail'
              )

          @app.task
          def send_mail(content):
              print('send mail, content is %s' % content)

          @app.task
          def to_string(text):
              return 'this is a %s' % text

          3.4 任務(wù)worker的啟動(dòng)

          任務(wù)啟動(dòng)分為worker啟動(dòng)和定時(shí)任務(wù)beat啟動(dòng)

          # -A wedo為應(yīng)用模塊
          # -l為日志level
          # -c 為進(jìn)程數(shù)
          celery worker -A wedo  -l debug -c 4

          # 后臺(tái)啟動(dòng)
          nohup celery worker -A wedo -l debug -c 4 > ./log.log  2>&1

          # 從下面的日志可以看出啟動(dòng)了4個(gè)任務(wù)
          #   . wedo.period_task.send_mail
          #   . wedo.period_task.to_string
          #   . wedo.tasks.mul
          #   . wedo.tasks.sum

           -------------- [email protected] v4.4.2 (cliffs)
          --- ***** ----- 
          -- ******* ---- Linux-3.10.0-327.28.3.el7.x86_64-x86_64-with-centos-7.2.1511-Core 2020-04-25 23:35:26
          - *** --- * --- 
          - ** ---------- [config]
          - ** ---------- .> app:         wedo:0x7f05af30d320
          - ** ---------- .> transport:   redis://10.8.238.2:6379/0
          - ** ---------- .> results:     redis://10.8.238.2:6379/0
          - *** --- * --- .> concurrency: 4 (prefork)
          -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
          --- ***** ----- 
           -------------- [queues]
                          .> celery           exchange=celery(direct) key=celery
                          

          [tasks]
            . celery.accumulate
            . celery.backend_cleanup
          ...
            . wedo.period_task.send_mail
            . wedo.period_task.to_string
            . wedo.tasks.mul
            . wedo.tasks.sum
          ...
          [2020-04-25 23:35:27,617: INFO/MainProcess] [email protected] ready.
          [2020-04-25 23:35:27,617: DEBUG/MainProcess] basic.qos: prefetch_count->16
          [2020-04-25 23:35:27,655: DEBUG/MainProcess] celery@12103675 joined the party

          celery beat -A wedo.period_task

          celery beat v4.4.2 (cliffs) is starting.
          __    -    ... __   -        _
          LocalTime -> 2020-04-25 23:37:08
          Configuration ->
              . broker -> redis://10.8.238.2:6379/0
              . loader -> celery.loaders.app.AppLoader
              . scheduler -> celery.beat.PersistentScheduler
              . db -> celerybeat-schedule
              . logfile -> [stderr]@%WARNING
              . maxinterval -> 5.00 minutes (300s)
          # worker啟動(dòng)是4個(gè)進(jìn)程
          \_  /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4    
              \_  /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
              \_  /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
              \_  /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
              \_  /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4

          worker和beat的停止

          ps auxww | awk '/celery worker/ {print $2}' | xargs kill -9
          ps auxww | awk '/celery beat/ {print $2}' | xargs kill -9

          3.5 任務(wù)的調(diào)用

          任務(wù)worker已經(jīng)啟動(dòng)好了,通過(guò)任務(wù)調(diào)用傳遞給broker(redis),并返回任務(wù)執(zhí)行結(jié)果 任務(wù)調(diào)用主要有兩種,本質(zhì)是一致的,delay是apply_async的封裝,apply_async可以支持更多的任務(wù)調(diào)用配置

          • task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})
          • task.delay(arg1, arg2, kwarg1='x', kwarg2='y')

          apply_async和delay會(huì)返回一個(gè)異步的任務(wù)結(jié)果,AsyncResult中存儲(chǔ)了任務(wù)的執(zhí)行狀態(tài)和結(jié)果,常用的操作

          value = result.get() # 任務(wù)返回值
          print(result.__dict__) # 結(jié)果信息
          print(result.successful()) # 是否成功
          print(result.fail()) # 是否失敗
          print(result.ready()) # 是否執(zhí)行完成
          print(result.state) # 狀態(tài) PENDING -> STARTED -> SUCCESS/FAIL

          常規(guī)任務(wù):

          from celery.utils.log import get_logger
          from wedo.tasks import sum, mul, post_file
          from celery import group, chain, chord
          logger = get_logger(__name__)
          try:
              result = mul.apply_async(args=(22))
              value = result.get() # 等待任務(wù)執(zhí)行完畢后,才會(huì)返回任務(wù)返回值
              print(value)
          except mul.OperationalError as exc: # 任務(wù)異常處理
              logger.exception('Sending task raised: %r', exc)

          組合任務(wù):

          • 多個(gè)任務(wù)并行執(zhí)行, group
          • 多個(gè)任務(wù)鏈?zhǔn)綀?zhí)行,chain:第一個(gè)任務(wù)的返回值作為第二個(gè)的輸入?yún)?shù),以此類推
          result = group(sum.s(i, i) for i in range(5))()
          result.get()
          # [0, 2, 4, 6, 8]
          result = chain(sum.s(1,2), sum.s(3), mul.s(3))()
          result.get()
          # ((1+2)+3)*3=18

          4. 分布式集群部署

          celery作為分布式的任務(wù)隊(duì)列框架,worker是可以執(zhí)行在不同的服務(wù)器上的。部署過(guò)程和單機(jī)上啟動(dòng)是一樣。只要把項(xiàng)目代碼copy到其他服務(wù)器,使用相同命令就可以了。可以思考下,這個(gè)是怎么實(shí)現(xiàn)的?對(duì)了,就是通過(guò)共享Broker隊(duì)列。使用合適的隊(duì)列,如redis,單進(jìn)程單線程的方式可以有效的避免同個(gè)任務(wù)被不同worker同時(shí)執(zhí)行的情況。

          celery worker -A wedo  -l debug -c 4
          • 分布式集群如下:

          5. 進(jìn)階使用

          在前面已經(jīng)了解了celery的主要的功能了。celery還為一些特別的場(chǎng)景提供了需要擴(kuò)展的功能

          5.1 任務(wù)狀態(tài)跟蹤和日志

          有時(shí)候我們需要對(duì)任務(wù)的執(zhí)行情況做一些監(jiān)控,比如失敗后報(bào)警通知。

          • celery在裝飾器@app.task中提供了base參數(shù),傳入重寫的Task模塊,重新on_*函數(shù)就可以控制不同的任務(wù)結(jié)果
          • 在@app.task提供bind=True,可以通過(guò)self獲取Task中各種參數(shù)
            • self.request:任務(wù)的各種參數(shù)
            • self.update_state: 自定義任務(wù)狀態(tài), 原有的任務(wù)狀態(tài):PENDING -> STARTED -> SUCCESS, 如果你想了解STARTED -> SUCCESS之間的一個(gè)狀態(tài),比如執(zhí)行的百分比之類,可以通過(guò)自定義狀態(tài)來(lái)實(shí)現(xiàn)
            • self.retry: 重試
          import celery
          import time
          from celery.utils.log import get_task_logger
          from wedo import app

          logger = logger = get_task_logger(__name__)
          class TaskMonitor(celery.Task):
              def on_failure(self, exc, task_id, args, kwargs, einfo):
                  """failed callback"""
                  logger.info('task id: {0!r} failed: {1!r}'.format(task_id, exc))

              def on_success(self, retval, task_id, args, kwargs):
                  """success callback"""
                  logger.info('task id:{} , arg:{} , successful !'.format(task_id,args))

              def on_retry(self, exc, task_id, args, kwargs, einfo):
                  """retry callback"""
                  logger.info('task id:{} , arg:{} , retry !  einfo: {}'.format(task_id, args, exc))

          @app.task(base=TaskMonitor, bind=True, name='post_file')
          def post_file(self, file_names):
              logger.info(self.request.__dict__)
              try:
                  for i, file in enumerate(file_names):
                      print('the file %s is posted' % file)
                      if not self.request.called_directly:
                          self.update_state(state='PROGRESS',
                              meta={'current': i, 'total': len(file_names)})
                      time.sleep(2)
              except Exception as exec:
                  raise self.retry(exc=exec, countdown=3, max_retries=5)

          5.2 任務(wù)指定特定的worker執(zhí)行

          celery做為支持分布式,理論上可以無(wú)限擴(kuò)展worker。默認(rèn)情況下celery提交任務(wù)后,任務(wù)會(huì)放入名為celery的隊(duì)列,所有在線的worker都會(huì)從任務(wù)隊(duì)列中獲取任務(wù),任一個(gè)worker都有可能執(zhí)行這個(gè)任務(wù)。有時(shí)候,有時(shí)候任務(wù)的特殊性或者機(jī)器本身的限制,某些任務(wù)只能跑在某些worker上。celery提供了queue在區(qū)別不同的worker,很好的支持這種情況。

          • 啟動(dòng)worker時(shí),-Q 指定worker支持的任務(wù)列隊(duì)名, 可以支持多個(gè)隊(duì)列名哦
          celery worker -A wedo  -l debug -c 4 -Q celery,hipri
          • 任務(wù)調(diào)用時(shí), queue=*來(lái)指定需要執(zhí)行worker
          result = mul.apply_async(args=(22), queue='hipri')

          6. 任務(wù)隊(duì)列監(jiān)控

          如果你想通過(guò)可視化的方式,查看celery的一切。flower提供可行的解決方案,十分的方便

          flower -A wedo --port=6006
          # web訪問(wèn) http://10.8.238.2:6006/

          7. 總結(jié)

          本文和大家了介紹了分布式的隊(duì)列celery, 妥妥的很全吧, 歡迎交流。總結(jié)下內(nèi)容:

          • celery為分布式隊(duì)列, 通過(guò)消息隊(duì)列連接任務(wù)提交和執(zhí)行者worker, 松耦合模式,可擴(kuò)展
          • celery消息隊(duì)列建議為redis
          • celery通過(guò)@app.task裝飾把普通任務(wù)變成celery Task
          • celery worker 通過(guò)不同queue支持特定的worker消費(fèi)特定的任務(wù)
          • @app.task中可以同步base和bind參數(shù)獲取更過(guò)的控制任務(wù)生命周期
          • flower監(jiān)控celery全過(guò)程
          • celery doc:https://docs.celeryproject.org/en/master/getting-started/index.html
          Python貓技術(shù)交流群開(kāi)放啦!群里既有國(guó)內(nèi)一二線大廠在職員工,也有國(guó)內(nèi)外高校在讀學(xué)生,既有十多年碼齡的編程老鳥(niǎo),也有中小學(xué)剛剛?cè)腴T的新人,學(xué)習(xí)氛圍良好!想入群的同學(xué),請(qǐng)?jiān)诠?hào)內(nèi)回復(fù)『交流群』,獲取貓哥的微信(謝絕廣告黨,非誠(chéng)勿擾?。?/span>~

          近期熱門文章推薦:

          徹底掌握Python函數(shù)的5種參數(shù)
          餓了么交易系統(tǒng) 5 年演化史
          Python 為什么不支持 switch 語(yǔ)句?
          2021年,你應(yīng)該知道的Python打包指南

          感謝創(chuàng)作者的好文
          瀏覽 108
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  99久久久无码囯产精品 | 免费观看黄色录像 | 色老板免费网址 | 天天插一插 | 无码免费观看视频 |