<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Python 爬蟲 | 任務(wù)調(diào)度之 Celery 從入門到進(jìn)階

          共 12764字,需瀏覽 26分鐘

           ·

          2021-05-12 18:32

          作者:wedo實(shí)驗君
          整理:Python 編程時光

          1.什么是celery

          celery是一個簡單,靈活、可靠的分布式任務(wù)執(zhí)行框架,可以支持大量任務(wù)的并發(fā)執(zhí)行。celery采用典型生產(chǎn)者和消費(fèi)者模型。生產(chǎn)者提交任務(wù)到任務(wù)隊列,眾多消費(fèi)者從任務(wù)隊列中取任務(wù)執(zhí)行。

          1.1 celery架構(gòu)

          Celery由以下三部分構(gòu)成:消息中間件(Broker)、任務(wù)執(zhí)行單元Worker、結(jié)果存儲(Backend)

          • 任務(wù)調(diào)用提交任務(wù)執(zhí)行請求給Broker隊列

          • 如果是異步任務(wù),worker會立即從隊列中取出任務(wù)并執(zhí)行,執(zhí)行結(jié)果保存在Backend中

          • 如果是定時任務(wù),任務(wù)由Celery Beat進(jìn)程周期性地將任務(wù)發(fā)往Broker隊列,Worker實(shí)時監(jiān)視消息隊列獲取隊列中的任務(wù)執(zhí)行

          1.2 應(yīng)用場景

          • 大量的長時間任務(wù)的異步執(zhí)行, 如上傳大文件

          • 大規(guī)模實(shí)時任務(wù)執(zhí)行,支持集群部署,如支持高并發(fā)的機(jī)器學(xué)習(xí)推理

          • 定時任務(wù)執(zhí)行,如定時發(fā)送郵件,定時掃描機(jī)器運(yùn)行情況

          2.安裝

          celery安裝非常簡單, 除了安裝celery,本文中使用redis作為消息隊列即Broker

          # celery 安裝
          pip install celery
          # celery 監(jiān)控 flower
          pip install flower
          pip install redis
          # redis 安裝
          yum install redis
          # redis啟動
          redis-server /etc/redis.conf

          3. 完整例子

          celery的應(yīng)用開發(fā)涉及四個部分

          • celery 實(shí)例初始化

          • 任務(wù)的定義(定時和實(shí)時任務(wù))

          • 任務(wù)worker的啟動

          • 任務(wù)的調(diào)用

          3.1 項目目錄

          # 項目目錄
          wedo
          .
          ├── config.py
          ├── __init__.py
          ├── period_task.py
          └── tasks.py

          3.2 celery 實(shí)例初始化

          celery的實(shí)例化,主要包括執(zhí)行Broker和backend的訪問方式,任務(wù)模塊的申明等

          # celery 實(shí)例初始化 
          # __init__.py
          from celery import Celery
          app = Celery('wedo')  # 創(chuàng)建 Celery 實(shí)例
          app.config_from_object('wedo.config') 

          # 配置 wedo.config
          # config.py
          BROKER_URL = 'redis://10.8.238.2:6379/0' # Broker配置,使用Redis作為消息中間件
          CELERY_RESULT_BACKEND = 'redis://10.8.238.2:6379/0' # BACKEND配置,這里使用redis
          CELERY_RESULT_SERIALIZER = 'json' # 結(jié)果序列化方案
          CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務(wù)過期時間
          CELERY_TIMEZONE='Asia/Shanghai'   # 時區(qū)配置
          CELERY_IMPORTS = (     # 指定導(dǎo)入的任務(wù)模塊,可以指定多個
              'wedo.tasks',
              'wedo.period_task'
          )

          3.3 任務(wù)的定義

          celery中通過@task的裝飾器來進(jìn)行申明celery任務(wù),其他操作無任何差別

          # 任務(wù)的定義
          # 簡單任務(wù)  tasks.py
          import celery
          import time
          from celery.utils.log import get_task_logger
          from wedo import app

          @app.task
          def sum(x, y):
              return x + y

          @app.task
          def mul(x, y):
              time.sleep(5)
              return x * y

          定時任務(wù)和實(shí)時任務(wù)的區(qū)別主要是要申明何時執(zhí)行任務(wù),任務(wù)本身也是通過task裝飾器來申明 何時執(zhí)行任務(wù)有2種

          • 指定頻率執(zhí)行:sender.add_periodic_task(時間頻率單位s, 任務(wù)函數(shù), name='to_string')

          • crontab方式:分鐘/小時/天/月/周粒度, 可以支持多種調(diào)度

          # 任務(wù)的定義
          # 定時任務(wù)  period_task.py
          from wedo import app
          from celery.schedules import crontab

          @app.on_after_configure.connect
          def setup_periodic_tasks(sender, **kwargs):
              sender.add_periodic_task(5.0, to_string.s("celery peroid task"), name='to_string'# 每5秒執(zhí)行add
              sender.add_periodic_task(
                  crontab(minute='*/10'),      #每10分鐘執(zhí)行一次
                  send_mail.s('hello, this is a celery'), name='send_mail'
              )

          @app.task
          def send_mail(content):
              print('send mail, content is %s' % content)

          @app.task
          def to_string(text):
              return 'this is a %s' % text

          3.4 任務(wù)worker的啟動

          任務(wù)啟動分為worker啟動和定時任務(wù)beat啟動

          # -A wedo為應(yīng)用模塊
          # -l為日志level
          # -c 為進(jìn)程數(shù)
          celery worker -A wedo  -l debug -c 4

          # 后臺啟動
          nohup celery worker -A wedo -l debug -c 4 > ./log.log  2>&1

          # 從下面的日志可以看出啟動了4個任務(wù)
          #   . wedo.period_task.send_mail
          #   . wedo.period_task.to_string
          #   . wedo.tasks.mul
          #   . wedo.tasks.sum

           -------------- celery@localhost.localdomain v4.4.2 (cliffs)
          --- ***** ----- 
          -- ******* ---- Linux-3.10.0-327.28.3.el7.x86_64-x86_64-with-centos-7.2.1511-Core 2020-04-25 23:35:26
          - *** --- * --- 
          - ** ---------- [config]
          - ** ---------- .> app:         wedo:0x7f05af30d320
          - ** ---------- .> transport:   redis://10.8.238.2:6379/0
          - ** ---------- .> results:     redis://10.8.238.2:6379/0
          - *** --- * --- .> concurrency: 4 (prefork)
          -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
          --- ***** ----- 
           -------------- [queues]
                          .> celery           exchange=celery(direct) key=celery


          [tasks]
            . celery.accumulate
            . celery.backend_cleanup
          ...
            . wedo.period_task.send_mail
            . wedo.period_task.to_string
            . wedo.tasks.mul
            . wedo.tasks.sum
          ...
          [2020-04-25 23:35:27,617: INFO/MainProcess] celery@localhost.localdomain ready.
          [2020-04-25 23:35:27,617: DEBUG/MainProcess] basic.qos: prefetch_count->16
          [2020-04-25 23:35:27,655: DEBUG/MainProcess] celery@12103675 joined the party
          celery beat -A wedo.period_task

          celery beat v4.4.2 (cliffs) is starting.
          __    -    ... __   -        _
          LocalTime -> 2020-04-25 23:37:08
          Configuration ->
              . broker -> redis://10.8.238.2:6379/0
              . loader -> celery.loaders.app.AppLoader
              . scheduler -> celery.beat.PersistentScheduler
              . db -> celerybeat-schedule
              . logfile -> [stderr]@%WARNING
              . maxinterval -> 5.00 minutes (300s)
          # worker啟動是4個進(jìn)程
          \_  /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4    
              \_  /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
              \_  /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
              \_  /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
              \_  /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4

          worker和beat的停止

          ps auxww | awk '/celery worker/ {print $2}' | xargs kill -9
          ps auxww | awk '/celery beat/ {print $2}' | xargs kill -9

          3.5 任務(wù)的調(diào)用

          任務(wù)worker已經(jīng)啟動好了,通過任務(wù)調(diào)用傳遞給broker(redis),并返回任務(wù)執(zhí)行結(jié)果 任務(wù)調(diào)用主要有兩種,本質(zhì)是一致的,delay是apply_async的封裝,apply_async可以支持更多的任務(wù)調(diào)用配置

          • task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})

          • task.delay(arg1, arg2, kwarg1='x', kwarg2='y')

          apply_async和delay會返回一個異步的任務(wù)結(jié)果,AsyncResult中存儲了任務(wù)的執(zhí)行狀態(tài)和結(jié)果,常用的操作

          value = result.get() # 任務(wù)返回值
          print(result.__dict__) # 結(jié)果信息
          print(result.successful()) # 是否成功
          print(result.fail()) # 是否失敗
          print(result.ready()) # 是否執(zhí)行完成
          print(result.state) # 狀態(tài) PENDING -> STARTED -> SUCCESS/FAIL

          常規(guī)任務(wù):

          from celery.utils.log import get_logger
          from wedo.tasks import sum, mul, post_file
          from celery import group, chain, chord
          logger = get_logger(__name__)
          try:
              result = mul.apply_async(args=(22))
              value = result.get() # 等待任務(wù)執(zhí)行完畢后,才會返回任務(wù)返回值
              print(value)
          except mul.OperationalError as exc: # 任務(wù)異常處理
              logger.exception('Sending task raised: %r', exc)

          組合任務(wù):

          • 多個任務(wù)并行執(zhí)行, group

          • 多個任務(wù)鏈?zhǔn)綀?zhí)行,chain:第一個任務(wù)的返回值作為第二個的輸入?yún)?shù),以此類推

          result = group(sum.s(i, i) for i in range(5))()
          result.get()
          # [0, 2, 4, 6, 8]
          result = chain(sum.s(1,2), sum.s(3), mul.s(3))()
          result.get()
          # ((1+2)+3)*3=18

          4. 分布式集群部署

          celery作為分布式的任務(wù)隊列框架,worker是可以執(zhí)行在不同的服務(wù)器上的。部署過程和單機(jī)上啟動是一樣。只要把項目代碼copy到其他服務(wù)器,使用相同命令就可以了。可以思考下,這個是怎么實(shí)現(xiàn)的?對了,就是通過共享Broker隊列。使用合適的隊列,如redis,單進(jìn)程單線程的方式可以有效的避免同個任務(wù)被不同worker同時執(zhí)行的情況。

          celery worker -A wedo  -l debug -c 4
          • 分布式集群如下:

          5. 進(jìn)階使用

          在前面已經(jīng)了解了celery的主要的功能了。celery還為一些特別的場景提供了需要擴(kuò)展的功能

          5.1 任務(wù)狀態(tài)跟蹤和日志

          有時候我們需要對任務(wù)的執(zhí)行情況做一些監(jiān)控,比如失敗后報警通知。

          • celery在裝飾器@app.task中提供了base參數(shù),傳入重寫的Task模塊,重新on_*函數(shù)就可以控制不同的任務(wù)結(jié)果

          • 在@app.task提供bind=True,可以通過self獲取Task中各種參數(shù)

          • -   self.request:任務(wù)的各種參數(shù)

            • self.update_state: 自定義任務(wù)狀態(tài), 原有的任務(wù)狀態(tài):PENDING -> STARTED -> SUCCESS, 如果你想了解STARTED -> SUCCESS之間的一個狀態(tài),比如執(zhí)行的百分比之類,可以通過自定義狀態(tài)來實(shí)現(xiàn)

            • self.retry: 重試

          import celery
          import time
          from celery.utils.log import get_task_logger
          from wedo import app

          logger = logger = get_task_logger(__name__)
          class TaskMonitor(celery.Task):
              def on_failure(self, exc, task_id, args, kwargs, einfo):
                  """failed callback"""
                  logger.info('task id: {0!r} failed: {1!r}'.format(task_id, exc))

              def on_success(self, retval, task_id, args, kwargs):
                  """success callback"""
                  logger.info('task id:{} , arg:{} , successful !'.format(task_id,args))

              def on_retry(self, exc, task_id, args, kwargs, einfo):
                  """retry callback"""
                  logger.info('task id:{} , arg:{} , retry !  einfo: {}'.format(task_id, args, exc))

          @app.task(base=TaskMonitor, bind=True, name='post_file')
          def post_file(self, file_names):
              logger.info(self.request.__dict__)
              try:
                  for i, file in enumerate(file_names):
                      print('the file %s is posted' % file)
                      if not self.request.called_directly:
                          self.update_state(state='PROGRESS',
                              meta={'current': i, 'total': len(file_names)})
                      time.sleep(2)
              except Exception as exec:
                  raise self.retry(exc=exec, countdown=3, max_retries=5)

          5.2 任務(wù)指定特定的worker執(zhí)行

          celery做為支持分布式,理論上可以無限擴(kuò)展worker。默認(rèn)情況下celery提交任務(wù)后,任務(wù)會放入名為celery的隊列,所有在線的worker都會從任務(wù)隊列中獲取任務(wù),任一個worker都有可能執(zhí)行這個任務(wù)。有時候,有時候任務(wù)的特殊性或者機(jī)器本身的限制,某些任務(wù)只能跑在某些worker上。celery提供了queue在區(qū)別不同的worker,很好的支持這種情況。

          • 啟動worker時,-Q 指定worker支持的任務(wù)列隊名, 可以支持多個隊列名哦

          celery worker -A wedo  -l debug -c 4 -Q celery,hipri
          • 任務(wù)調(diào)用時, queue=*來指定需要執(zhí)行worker

          result = mul.apply_async(args=(22), queue='hipri')

          6. 任務(wù)隊列監(jiān)控

          如果你想通過可視化的方式,查看celery的一切。flower提供可行的解決方案,十分的方便

          flower -A wedo --port=6006
          # web訪問 http://10.8.238.2:6006/

          7. 總結(jié)

          本文和大家了介紹了分布式的隊列celery, 妥妥的很全吧, 歡迎交流。總結(jié)下內(nèi)容:

          • celery為分布式隊列, 通過消息隊列連接任務(wù)提交和執(zhí)行者worker, 松耦合模式,可擴(kuò)展

          • celery消息隊列建議為redis

          • celery通過@app.task裝飾把普通任務(wù)變成celery Task

          • celery worker 通過不同queue支持特定的worker消費(fèi)特定的任務(wù)

          • @app.task中可以同步base和bind參數(shù)獲取更過的控制任務(wù)生命周期

          • flower監(jiān)控celery全過程

          • celery doc:https://docs.celeryproject.org/en/master/getting-started/index.html

          對了,看完記得一鍵四連,這個對我真的很重要。
          瀏覽 84
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  超碰在线免费 | 黄色一级大片在线免费看国产 | 老司机午夜影院 | 国产美女被日 | 视频黄在线观看免费 |