<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Python 實現(xiàn)定時任務(wù)的八種方案!

          共 19483字,需瀏覽 39分鐘

           ·

          2021-09-10 13:26

          △點擊上方“Python貓”關(guān)注 ,回復(fù)“1”領(lǐng)取電子書

          作者:錢魏Way

          來源:https://www.biaodianfu.com/python-schedule.html

          在日常工作中,我們常常會用到需要周期性執(zhí)行的任務(wù),一種方式是采用 Linux 系統(tǒng)自帶的 crond 結(jié)合命令行實現(xiàn)。另外一種方式是直接使用Python。接下來整理的是常見的Python定時任務(wù)的實現(xiàn)方式。

          目錄

          • 利用while True: + sleep()實現(xiàn)定時任務(wù)
          • 使用Timeloop庫運行定時任務(wù)
          • 利用threading.Timer實現(xiàn)定時任務(wù)
          • 利用內(nèi)置模塊sched實現(xiàn)定時任務(wù)
          • 利用調(diào)度模塊schedule實現(xiàn)定時任務(wù)
          • 利用任務(wù)框架APScheduler實現(xiàn)定時任務(wù)
            • Job 作業(yè)
            • Trigger 觸發(fā)器
            • Executor 執(zhí)行器
            • Jobstore 作業(yè)存儲
            • Event 事件
            • 調(diào)度器
            • APScheduler中的重要概念
            • Scheduler的工作流程
          • 使用分布式消息系統(tǒng)Celery實現(xiàn)定時任務(wù)
          • 使用數(shù)據(jù)流工具Apache Airflow實現(xiàn)定時任務(wù)
            • Airflow 產(chǎn)生的背景
            • Airflow 核心概念
            • Airflow 的架構(gòu)

          利用while True: + sleep()實現(xiàn)定時任務(wù)

          位于 time 模塊中的 sleep(secs) 函數(shù),可以實現(xiàn)令當前執(zhí)行的線程暫停 secs 秒后再繼續(xù)執(zhí)行。所謂暫停,即令當前線程進入阻塞狀態(tài),當達到 sleep() 函數(shù)規(guī)定的時間后,再由阻塞狀態(tài)轉(zhuǎn)為就緒狀態(tài),等待 CPU 調(diào)度。

          基于這樣的特性我們可以通過while死循環(huán)+sleep()的方式實現(xiàn)簡單的定時任務(wù)。

          代碼示例:

          import datetime
          import time
          def time_printer():
              now = datetime.datetime.now()
              ts = now.strftime('%Y-%m-%d %H:%M:%S')
              print('do func time :', ts)
          def loop_monitor():
              while True:
                  time_printer()
                  time.sleep(5)  # 暫停5秒
          if __name__ == "__main__":
              loop_monitor()

          主要缺點:

          • 只能設(shè)定間隔,不能指定具體的時間,比如每天早上8:00
          • sleep 是一個阻塞函數(shù),也就是說 sleep 這一段時間,程序什么也不能操作。

          使用Timeloop庫運行定時任務(wù)

          Timeloop是一個庫,可用于運行多周期任務(wù)。這是一個簡單的庫,它使用decorator模式在線程中運行標記函數(shù)。

          示例代碼:

          import time
          from timeloop import Timeloop
          from datetime import timedelta
          tl = Timeloop()
          @tl.job(interval=timedelta(seconds=2))
          def sample_job_every_2s():
              print "2s job current time : {}".format(time.ctime())
          @tl.job(interval=timedelta(seconds=5))
          def sample_job_every_5s():
              print "5s job current time : {}".format(time.ctime())
          @tl.job(interval=timedelta(seconds=10))
          def sample_job_every_10s():
              print "10s job current time : {}".format(time.ctime())

          利用threading.Timer實現(xiàn)定時任務(wù)

          threading 模塊中的 Timer 是一個非阻塞函數(shù),比 sleep 稍好一點,timer最基本理解就是定時器,我們可以啟動多個定時任務(wù),這些定時器任務(wù)是異步執(zhí)行,所以不存在等待順序執(zhí)行問題。

          Timer(interval, function, args=[ ], kwargs={ })

          • interval: 指定的時間
          • function: 要執(zhí)行的方法
          • args/kwargs: 方法的參數(shù)

          代碼示例:


          備注:Timer只能執(zhí)行一次,這里需要循環(huán)調(diào)用,否則只能執(zhí)行一次

          利用內(nèi)置模塊sched實現(xiàn)定時任務(wù)

          sched模塊實現(xiàn)了一個通用事件調(diào)度器,在調(diào)度器類使用一個延遲函數(shù)等待特定的時間,執(zhí)行任務(wù)。同時支持多線程應(yīng)用程序,在每個任務(wù)執(zhí)行后會立刻調(diào)用延時函數(shù),以確保其他線程也能執(zhí)行。

          class sched.scheduler(timefunc, delayfunc)這個類定義了調(diào)度事件的通用接口,它需要外部傳入兩個參數(shù),timefunc是一個沒有參數(shù)的返回時間類型數(shù)字的函數(shù)(常用使用的如time模塊里面的time),delayfunc應(yīng)該是一個需要一個參數(shù)來調(diào)用、與timefunc的輸出兼容、并且作用為延遲多個時間單位的函數(shù)(常用的如time模塊的sleep)。

          代碼示例:

          import datetime
          import time
          import sched
          def time_printer():
              now = datetime.datetime.now()
              ts = now.strftime('%Y-%m-%d %H:%M:%S')
              print('do func time :', ts)
              loop_monitor()
          def loop_monitor():
              s = sched.scheduler(time.time, time.sleep)  # 生成調(diào)度器
              s.enter(51, time_printer, ())
              s.run()
          if __name__ == "__main__":
              loop_monitor()

          scheduler對象主要方法:

          • enter(delay, priority, action, argument),安排一個事件來延遲delay個時間單位。
          • cancel(event):從隊列中刪除事件。如果事件不是當前隊列中的事件,則該方法將跑出一個ValueError。
          • run():運行所有預(yù)定的事件。這個函數(shù)將等待(使用傳遞給構(gòu)造函數(shù)的delayfunc()函數(shù)),然后執(zhí)行事件,直到不再有預(yù)定的事件。

          個人點評:比threading.Timer更好,不需要循環(huán)調(diào)用。

          利用調(diào)度模塊schedule實現(xiàn)定時任務(wù)

          schedule是一個第三方輕量級的任務(wù)調(diào)度模塊,可以按照秒,分,小時,日期或者自定義事件執(zhí)行時間。schedule允許用戶使用簡單、人性化的語法以預(yù)定的時間間隔定期運行Python函數(shù)(或其它可調(diào)用函數(shù))。

          先來看代碼,是不是不看文檔就能明白什么意思?

          import schedule
          import time
          def job():
              print("I'm working...")
          schedule.every(10).seconds.do(job)
          schedule.every(10).minutes.do(job)
          schedule.every().hour.do(job)
          schedule.every().day.at("10:30").do(job)
          schedule.every(5).to(10).minutes.do(job)
          schedule.every().monday.do(job)
          schedule.every().wednesday.at("13:15").do(job)
          schedule.every().minute.at(":17").do(job)
          while True:
              schedule.run_pending()
              time.sleep(1)

          裝飾器:通過 @repeat() 裝飾靜態(tài)方法

          import time
          from schedule import every, repeat, run_pending
          @repeat(every().second)
          def job():
              print('working...')
          while True:
              run_pending()
              time.sleep(1)

          傳遞參數(shù):

          import schedule
          def greet(name):
              print('Hello', name)
          schedule.every(2).seconds.do(greet, name='Alice')
          schedule.every(4).seconds.do(greet, name='Bob')
          while True:
              schedule.run_pending()

          裝飾器同樣能傳遞參數(shù):

          from schedule import every, repeat, run_pending
          @repeat(every().second, 'World')
          @repeat(every().minute, 'Mars')
          def hello(planet):
              print('Hello', planet)
          while True:
              run_pending()

          取消任務(wù):

          import schedule
          i = 0
          def some_task():
              global i
              i += 1
              print(i)
              if i == 10:
                  schedule.cancel_job(job)
                  print('cancel job')
                  exit(0)
          job = schedule.every().second.do(some_task)
          while True:
              schedule.run_pending()

          運行一次任務(wù):

          import time
          import schedule
          def job_that_executes_once():
              print('Hello')
              return schedule.CancelJob
          schedule.every().minute.at(':34').do(job_that_executes_once)
          while True:
              schedule.run_pending()
              time.sleep(1)

          根據(jù)標簽檢索任務(wù):

          # 檢索所有任務(wù):schedule.get_jobs()
          import schedule
          def greet(name):
              print('Hello {}'.format(name))
          schedule.every().day.do(greet, 'Andrea').tag('daily-tasks''friend')
          schedule.every().hour.do(greet, 'John').tag('hourly-tasks''friend')
          schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks''customer')
          schedule.every().day.do(greet, 'Derek').tag('daily-tasks''guest')
          friends = schedule.get_jobs('friend')
          print(friends)

          根據(jù)標簽取消任務(wù):

          # 取消所有任務(wù):schedule.clear()
          import schedule
          def greet(name):
              print('Hello {}'.format(name))
              if name == 'Cancel':
                  schedule.clear('second-tasks')
                  print('cancel second-tasks')
          schedule.every().second.do(greet, 'Andrea').tag('second-tasks''friend')
          schedule.every().second.do(greet, 'John').tag('second-tasks''friend')
          schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks''customer')
          schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks''guest')
          while True:
              schedule.run_pending()

          運行任務(wù)到某時間:

          import schedule
          from datetime import datetime, timedelta, time
          def job():
              print('working...')
          schedule.every().second.until('23:59').do(job)  # 今天23:59停止
          schedule.every().second.until('2030-01-01 18:30').do(job)  # 2030-01-01 18:30停止
          schedule.every().second.until(timedelta(hours=8)).do(job)  # 8小時后停止
          schedule.every().second.until(time(235959)).do(job)  # 今天23:59:59停止
          schedule.every().second.until(datetime(20301118300)).do(job)  # 2030-01-01 18:30停止
          while True:
              schedule.run_pending()

          馬上運行所有任務(wù)(主要用于測試):

          import schedule
          def job():
              print('working...')
          def job1():
              print('Hello...')
          schedule.every().monday.at('12:40').do(job)
          schedule.every().tuesday.at('16:40').do(job1)
          schedule.run_all()
          schedule.run_all(delay_seconds=3)  # 任務(wù)間延遲3秒

          并行運行:使用 Python 內(nèi)置隊列實現(xiàn):

          import threading
          import time
          import schedule
          def job1():
              print("I'm running on thread %s" % threading.current_thread())
          def job2():
              print("I'm running on thread %s" % threading.current_thread())
          def job3():
              print("I'm running on thread %s" % threading.current_thread())
          def run_threaded(job_func):
              job_thread = threading.Thread(target=job_func)
              job_thread.start()
          schedule.every(10).seconds.do(run_threaded, job1)
          schedule.every(10).seconds.do(run_threaded, job2)
          schedule.every(10).seconds.do(run_threaded, job3)
          while True:
              schedule.run_pending()
              time.sleep(1)

          利用任務(wù)框架APScheduler實現(xiàn)定時任務(wù)

          APScheduler(advanceded python scheduler)基于Quartz的一個Python定時任務(wù)框架,實現(xiàn)了Quartz的所有功能,使用起來十分方便。提供了基于日期、固定時間間隔以及crontab類型的任務(wù),并且可以持久化任務(wù)。基于這些功能,我們可以很方便的實現(xiàn)一個Python定時任務(wù)系統(tǒng)。

          它有以下三個特點:

          • 類似于 Liunx Cron 的調(diào)度程序(可選的開始/結(jié)束時間)
          • 基于時間間隔的執(zhí)行調(diào)度(周期性調(diào)度,可選的開始/結(jié)束時間)
          • 一次性執(zhí)行任務(wù)(在設(shè)定的日期/時間運行一次任務(wù))

          APScheduler有四種組成部分:

          • 觸發(fā)器(trigger) 包含調(diào)度邏輯,每一個作業(yè)有它自己的觸發(fā)器,用于決定接下來哪一個作業(yè)會運行。除了他們自己初始配置意外,觸發(fā)器完全是無狀態(tài)的。
          • 作業(yè)存儲(job store) 存儲被調(diào)度的作業(yè),默認的作業(yè)存儲是簡單地把作業(yè)保存在內(nèi)存中,其他的作業(yè)存儲是將作業(yè)保存在數(shù)據(jù)庫中。一個作業(yè)的數(shù)據(jù)講在保存在持久化作業(yè)存儲時被序列化,并在加載時被反序列化。調(diào)度器不能分享同一個作業(yè)存儲。
          • 執(zhí)行器(executor) 處理作業(yè)的運行,他們通常通過在作業(yè)中提交制定的可調(diào)用對象到一個線程或者進城池來進行。當作業(yè)完成時,執(zhí)行器將會通知調(diào)度器。
          • 調(diào)度器(scheduler) 是其他的組成部分。你通常在應(yīng)用只有一個調(diào)度器,應(yīng)用的開發(fā)者通常不會直接處理作業(yè)存儲、調(diào)度器和觸發(fā)器,相反,調(diào)度器提供了處理這些的合適的接口。配置作業(yè)存儲和執(zhí)行器可以在調(diào)度器中完成,例如添加、修改和移除作業(yè)。通過配置executor、jobstore、trigger,使用線程池(ThreadPoolExecutor默認值20)或進程池(ProcessPoolExecutor 默認值5)并且默認最多3個(max_instances)任務(wù)實例同時運行,實現(xiàn)對job的增刪改查等調(diào)度控制

          示例代碼:

          from apscheduler.schedulers.blocking import BlockingScheduler
          from datetime import datetime
          # 輸出時間
          def job():
              print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
          # BlockingScheduler
          sched = BlockingScheduler()
          sched.add_job(my_job, 'interval', seconds=5, id='my_job_id')
          sched.start()

          APScheduler中的重要概念

          Job 作業(yè)

          Job作為APScheduler最小執(zhí)行單位。創(chuàng)建Job時指定執(zhí)行的函數(shù),函數(shù)中所需參數(shù),Job執(zhí)行時的一些設(shè)置信息。

          構(gòu)建說明:

          • id:指定作業(yè)的唯一ID
          • name:指定作業(yè)的名字
          • trigger:apscheduler定義的觸發(fā)器,用于確定Job的執(zhí)行時間,根據(jù)設(shè)置的trigger規(guī)則,計算得到下次執(zhí)行此job的時間, 滿足時將會執(zhí)行
          • executor:apscheduler定義的執(zhí)行器,job創(chuàng)建時設(shè)置執(zhí)行器的名字,根據(jù)字符串你名字到scheduler獲取到執(zhí)行此job的 執(zhí)行器,執(zhí)行job指定的函數(shù)
          • max_instances:執(zhí)行此job的最大實例數(shù),executor執(zhí)行job時,根據(jù)job的id來計算執(zhí)行次數(shù),根據(jù)設(shè)置的最大實例數(shù)來確定是否可執(zhí)行
          • next_run_time:Job下次的執(zhí)行時間,創(chuàng)建Job時可以指定一個時間[datetime],不指定的話則默認根據(jù)trigger獲取觸發(fā)時間
          • misfire_grace_time:Job的延遲執(zhí)行時間,例如Job的計劃執(zhí)行時間是21:00:00,但因服務(wù)重啟或其他原因?qū)е?1:00:31才執(zhí)行,如果設(shè)置此key為40,則該job會繼續(xù)執(zhí)行,否則將會丟棄此job
          • coalesce:Job是否合并執(zhí)行,是一個bool值。例如scheduler停止20s后重啟啟動,而job的觸發(fā)器設(shè)置為5s執(zhí)行一次,因此此job錯過了4個執(zhí)行時間,如果設(shè)置為是,則會合并到一次執(zhí)行,否則會逐個執(zhí)行
          • func:Job執(zhí)行的函數(shù)
          • args:Job執(zhí)行函數(shù)需要的位置參數(shù)
          • kwargs:Job執(zhí)行函數(shù)需要的關(guān)鍵字參數(shù)

          Trigger 觸發(fā)器

          Trigger綁定到Job,在scheduler調(diào)度篩選Job時,根據(jù)觸發(fā)器的規(guī)則計算出Job的觸發(fā)時間,然后與當前時間比較確定此Job是否會被執(zhí)行,總之就是根據(jù)trigger規(guī)則計算出下一個執(zhí)行時間。

          目前APScheduler支持觸發(fā)器:

          • 指定時間的DateTrigger
          • 指定間隔時間的IntervalTrigger
          • 像Linux的crontab一樣的CronTrigger。

          觸發(fā)器參數(shù):date

          date定時,作業(yè)只執(zhí)行一次。

          • run_date (datetime|str) – the date/time to run the job at
          • timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already
          sched.add_job(my_job, 'date', run_date=date(2009116), args=['text'])
          sched.add_job(my_job, 'date', run_date=datetime(20197616305), args=['text'])

          觸發(fā)器參數(shù):interval

          interval間隔調(diào)度

          • weeks (int) –  間隔幾周
          • days (int) –  間隔幾天
          • hours (int) –  間隔幾小時
          • minutes (int) –  間隔幾分鐘
          • seconds (int) –  間隔多少秒
          • start_date (datetime|str) –  開始日期
          • end_date (datetime|str) –  結(jié)束日期
          • timezone (datetime.tzinfo|str) –  時區(qū)
          sched.add_job(job_function, 'interval', hours=2)

          觸發(fā)器參數(shù):cron

          cron調(diào)度

          • (int|str) 表示參數(shù)既可以是int類型,也可以是str類型
          • (datetime | str) 表示參數(shù)既可以是datetime類型,也可以是str類型
          • year (int|str) – 4-digit year -(表示四位數(shù)的年份,如2008年)
          • month (int|str) – month (1-12) -(表示取值范圍為1-12月)
          • day (int|str) – day of the (1-31) -(表示取值范圍為1-31日)
          • week (int|str) – ISO week (1-53) -(格里歷2006年12月31日可以寫成2006年-W52-7(擴展形式)或2006W527(緊湊形式))
          • day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) – (表示一周中的第幾天,既可以用0-6表示也可以用其英語縮寫表示)
          • hour (int|str) – hour (0-23) – (表示取值范圍為0-23時)
          • minute (int|str) – minute (0-59) – (表示取值范圍為0-59分)
          • second (int|str) – second (0-59) – (表示取值范圍為0-59秒)
          • start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) – (表示開始時間)
          • end_date (datetime|str) – latest possible date/time to trigger on (inclusive) – (表示結(jié)束時間)
          • timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone) -(表示時區(qū)取值)

          CronTrigger可用的表達式:

          表達式參數(shù)類型描述
          *所有通配符。例:minutes=*即每分鐘觸發(fā)
          * / a所有每隔時長a執(zhí)行一次。例:minutes=”* / 3″ 即每隔3分鐘執(zhí)行一次
          a – b所有a – b的范圍內(nèi)觸發(fā)。例:minutes=“2-5”。即2到5分鐘內(nèi)每分鐘執(zhí)行一次
          a – b / c所有a – b范圍內(nèi),每隔時長c執(zhí)行一次。
          xth y第幾個星期幾觸發(fā)。x為第幾個,y為星期幾
          last x一個月中,最后一個星期的星期幾觸發(fā)
          last一個月中的最后一天觸發(fā)
          x, y, z所有組合表達式,可以組合確定值或上述表達式
          # 6-8,11-12月第三個周五 00:00, 01:00, 02:00, 03:00運行
          sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
          # 每周一到周五運行 直到2024-05-30 00:00:00
          sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2024-05-30'

          Executor 執(zhí)行器

          Executor在scheduler中初始化,另外也可通過scheduler的add_executor動態(tài)添加Executor。每個executor都會綁定一個alias,這個作為唯一標識綁定到Job,在實際執(zhí)行時會根據(jù)Job綁定的executor找到實際的執(zhí)行器對象,然后根據(jù)執(zhí)行器對象執(zhí)行Job。

          Executor的種類會根據(jù)不同的調(diào)度來選擇,如果選擇AsyncIO作為調(diào)度的庫,那么選擇AsyncIOExecutor,如果選擇tornado作為調(diào)度的庫,選擇TornadoExecutor,如果選擇啟動進程作為調(diào)度,選擇ThreadPoolExecutor或者ProcessPoolExecutor都可以。

          Executor的選擇需要根據(jù)實際的scheduler來選擇不同的執(zhí)行器。目前APScheduler支持的Executor:

          • executors.asyncio:同步io,阻塞
          • executors.gevent:io多路復(fù)用,非阻塞
          • executors.pool: 線程ThreadPoolExecutor和進程ProcessPoolExecutor
          • executors.twisted:基于事件驅(qū)動

          Jobstore 作業(yè)存儲

          Jobstore在scheduler中初始化,另外也可通過scheduler的add_jobstore動態(tài)添加Jobstore。每個jobstore都會綁定一個alias,scheduler在Add Job時,根據(jù)指定的jobstore在scheduler中找到相應(yīng)的jobstore,并將job添加到j(luò)obstore中。作業(yè)存儲器決定任務(wù)的保存方式, 默認存儲在內(nèi)存中(MemoryJobStore),重啟后就沒有了。APScheduler支持的任務(wù)存儲器有:

          • jobstores.memory:內(nèi)存
          • jobstores.mongodb:存儲在mongodb
          • jobstores.redis:存儲在redis
          • jobstores.rethinkdb:存儲在rethinkdb
          • jobstores.sqlalchemy:支持sqlalchemy的數(shù)據(jù)庫如mysql,sqlite等
          • jobstores.zookeeper:zookeeper

          不同的任務(wù)存儲器可以在調(diào)度器的配置中進行配置(見調(diào)度器)

          Event 事件

          Event是APScheduler在進行某些操作時觸發(fā)相應(yīng)的事件,用戶可以自定義一些函數(shù)來監(jiān)聽這些事件,當觸發(fā)某些Event時,做一些具體的操作。常見的比如。Job執(zhí)行異常事件 EVENT_JOB_ERROR。Job執(zhí)行時間錯過事件 EVENT_JOB_MISSED。

          目前APScheduler定義的Event:

          • EVENT_SCHEDULER_STARTED
          • EVENT_SCHEDULER_START
          • EVENT_SCHEDULER_SHUTDOWN
          • EVENT_SCHEDULER_PAUSED
          • EVENT_SCHEDULER_RESUMED
          • EVENT_EXECUTOR_ADDED
          • EVENT_EXECUTOR_REMOVED
          • EVENT_JOBSTORE_ADDED
          • EVENT_JOBSTORE_REMOVED
          • EVENT_ALL_JOBS_REMOVED
          • EVENT_JOB_ADDED
          • EVENT_JOB_REMOVED
          • EVENT_JOB_MODIFIED
          • EVENT_JOB_EXECUTED
          • EVENT_JOB_ERROR
          • EVENT_JOB_MISSED
          • EVENT_JOB_SUBMITTED
          • EVENT_JOB_MAX_INSTANCES

          Listener表示用戶自定義監(jiān)聽的一些Event,比如當Job觸發(fā)了EVENT_JOB_MISSED事件時可以根據(jù)需求做一些其他處理。

          調(diào)度器

          Scheduler是APScheduler的核心,所有相關(guān)組件通過其定義。scheduler啟動之后,將開始按照配置的任務(wù)進行調(diào)度。除了依據(jù)所有定義Job的trigger生成的將要調(diào)度時間喚醒調(diào)度之外。當發(fā)生Job信息變更時也會觸發(fā)調(diào)度。

          APScheduler支持的調(diào)度器方式如下,比較常用的為BlockingScheduler和BackgroundScheduler

          • BlockingScheduler:適用于調(diào)度程序是進程中唯一運行的進程,調(diào)用start函數(shù)會阻塞當前線程,不能立即返回。
          • BackgroundScheduler:適用于調(diào)度程序在應(yīng)用程序的后臺運行,調(diào)用start后主線程不會阻塞。
          • AsyncIOScheduler:適用于使用了asyncio模塊的應(yīng)用程序。
          • GeventScheduler:適用于使用gevent模塊的應(yīng)用程序。
          • TwistedScheduler:適用于構(gòu)建Twisted的應(yīng)用程序。
          • QtScheduler:適用于構(gòu)建Qt的應(yīng)用程序。

          Scheduler的工作流程

          Scheduler添加job流程:

          Scheduler調(diào)度流程:

          使用分布式消息系統(tǒng)Celery實現(xiàn)定時任務(wù)

          Celery是一個簡單,靈活,可靠的分布式系統(tǒng),用于處理大量消息,同時為操作提供維護此類系統(tǒng)所需的工具, 也可用于任務(wù)調(diào)度。Celery 的配置比較麻煩,如果你只是需要一個輕量級的調(diào)度工具,Celery 不會是一個好選擇。

          Celery 是一個強大的分布式任務(wù)隊列,它可以讓任務(wù)的執(zhí)行完全脫離主程序,甚至可以被分配到其他主機上運行。我們通常使用它來實現(xiàn)異步任務(wù)(async task)和定時任務(wù)(crontab)。異步任務(wù)比如是發(fā)送郵件、或者文件上傳, 圖像處理等等一些比較耗時的操作 ,定時任務(wù)是需要在特定時間執(zhí)行的任務(wù)。

          需要注意,celery本身并不具備任務(wù)的存儲功能,在調(diào)度任務(wù)的時候肯定是要把任務(wù)存起來的,因此在使用celery的時候還需要搭配一些具備存儲、訪問功能的工具,比如:消息隊列、Redis緩存、數(shù)據(jù)庫等。官方推薦的是消息隊列RabbitMQ,有些時候使用Redis也是不錯的選擇。

          它的架構(gòu)組成如下圖:

          Celery架構(gòu),它采用典型的生產(chǎn)者-消費者模式,主要由以下部分組成:

          • Celery Beat,任務(wù)調(diào)度器,Beat進程會讀取配置文件的內(nèi)容,周期性地將配置中到期需要執(zhí)行的任務(wù)發(fā)送給任務(wù)隊列。
          • Producer:需要在隊列中進行的任務(wù),一般由用戶、觸發(fā)器或其他操作將任務(wù)入隊,然后交由workers進行處理。調(diào)用了Celery提供的API、函數(shù)或者裝飾器而產(chǎn)生任務(wù)并交給任務(wù)隊列處理的都是任務(wù)生產(chǎn)者。
          • Broker,即消息中間件,在這指任務(wù)隊列本身,Celery扮演生產(chǎn)者和消費者的角色,brokers就是生產(chǎn)者和消費者存放/獲取產(chǎn)品的地方(隊列)。
          • Celery Worker,執(zhí)行任務(wù)的消費者,從隊列中取出任務(wù)并執(zhí)行。通常會在多臺服務(wù)器運行多個消費者來提高執(zhí)行效率。
          • Result Backend:任務(wù)處理完后保存狀態(tài)信息和結(jié)果,以供查詢。Celery默認已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。

          實際應(yīng)用中,用戶從Web前端發(fā)起一個請求,我們只需要將請求所要處理的任務(wù)丟入任務(wù)隊列broker中,由空閑的worker去處理任務(wù)即可,處理的結(jié)果會暫存在后臺數(shù)據(jù)庫backend中。我們可以在一臺機器或多臺機器上同時起多個worker進程來實現(xiàn)分布式地并行處理任務(wù)。

          Celery定時任務(wù)實例:

          • Python Celery & RabbitMQ Tutorial
          • Celery 配置實踐筆記

          使用數(shù)據(jù)流工具Apache Airflow實現(xiàn)定時任務(wù)

          Apache Airflow 是Airbnb開源的一款數(shù)據(jù)流程工具,目前是Apache孵化項目。以非常靈活的方式來支持數(shù)據(jù)的ETL過程,同時還支持非常多的插件來完成諸如HDFS監(jiān)控、郵件通知等功能。Airflow支持單機和分布式兩種模式,支持Master-Slave模式,支持Mesos等資源調(diào)度,有非常好的擴展性。被大量公司采用。

          Airflow使用Python開發(fā),它通過DAGs(Directed Acyclic Graph, 有向無環(huán)圖)來表達一個工作流中所要執(zhí)行的任務(wù),以及任務(wù)之間的關(guān)系和依賴。比如,如下的工作流中,任務(wù)T1執(zhí)行完成,T2和T3才能開始執(zhí)行,T2和T3都執(zhí)行完成,T4才能開始執(zhí)行。

          Airflow提供了各種Operator實現(xiàn),可以完成各種任務(wù)實現(xiàn):

          • BashOperator – 執(zhí)行 bash 命令或腳本。
          • SSHOperator – 執(zhí)行遠程 bash 命令或腳本(原理同 paramiko 模塊)。
          • PythonOperator – 執(zhí)行 Python 函數(shù)。
          • EmailOperator – 發(fā)送 Email。
          • HTTPOperator – 發(fā)送一個 HTTP 請求。
          • MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, 等,執(zhí)行 SQL 任務(wù)。
          • DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator…

          除了以上這些 Operators 還可以方便的自定義 Operators 滿足個性化的任務(wù)需求。

          一些情況下,我們需要根據(jù)執(zhí)行結(jié)果執(zhí)行不同的任務(wù),這樣工作流會產(chǎn)生分支。如:

          這種需求可以使用BranchPythonOperator來實現(xiàn)。

          Airflow 產(chǎn)生的背景

          通常,在一個運維系統(tǒng),數(shù)據(jù)分析系統(tǒng),或測試系統(tǒng)等大型系統(tǒng)中,我們會有各種各樣的依賴需求。包括但不限于:

          • 時間依賴:任務(wù)需要等待某一個時間點觸發(fā)。
          • 外部系統(tǒng)依賴:任務(wù)依賴外部系統(tǒng)需要調(diào)用接口去訪問。
          • 任務(wù)間依賴:任務(wù) A 需要在任務(wù) B 完成后啟動,兩個任務(wù)互相間會產(chǎn)生影響。
          • 資源環(huán)境依賴:任務(wù)消耗資源非常多, 或者只能在特定的機器上執(zhí)行。

          crontab 可以很好地處理定時執(zhí)行任務(wù)的需求,但僅能管理時間上的依賴。Airflow 的核心概念 DAG(有向無環(huán)圖)—— 來表現(xiàn)工作流。

          • Airflow 是一種 WMS,即:它將任務(wù)以及它們的依賴看作代碼,按照那些計劃規(guī)范任務(wù)執(zhí)行,并在實際工作進程之間分發(fā)需執(zhí)行的任務(wù)。
          • Airflow 提供了一個用于顯示當前活動任務(wù)和過去任務(wù)狀態(tài)的優(yōu)秀 UI,并允許用戶手動管理任務(wù)的執(zhí)行和狀態(tài)。
          • Airflow 中的工作流是具有方向性依賴的任務(wù)集合。
          • DAG 中的每個節(jié)點都是一個任務(wù),DAG 中的邊表示的是任務(wù)之間的依賴(強制為有向無環(huán),因此不會出現(xiàn)循環(huán)依賴,從而導(dǎo)致無限執(zhí)行循環(huán))。

          Airflow 核心概念

          • DAGs:即有向無環(huán)圖(Directed Acyclic Graph),將所有需要運行的tasks按照依賴關(guān)系組織起來,描述的是所有tasks執(zhí)行順序。
          • Operators:可以簡單理解為一個class,描述了DAG中某個的task具體要做的事。其中,airflow內(nèi)置了很多operators,如BashOperator 執(zhí)行一個bash 命令,PythonOperator 調(diào)用任意的Python 函數(shù),EmailOperator 用于發(fā)送郵件,HTTPOperator 用于發(fā)送HTTP請求, SqlOperator 用于執(zhí)行SQL命令等等,同時,用戶可以自定義Operator,這給用戶提供了極大的便利性。
          • Tasks:Task 是 Operator的一個實例,也就是DAGs中的一個node。
          • Task Instance:task的一次運行。Web 界面中可以看到task instance 有自己的狀態(tài),包括”running”, “success”, “failed”, “skipped”, “up for retry”等。
          • Task Relationships:DAGs中的不同Tasks之間可以有依賴關(guān)系,如 Task1 >> Task2,表明Task2依賴于Task2了。通過將DAGs和Operators結(jié)合起來,用戶就可以創(chuàng)建各種復(fù)雜的 工作流(workflow)。

          Airflow 的架構(gòu)

          在一個可擴展的生產(chǎn)環(huán)境中,Airflow 含有以下組件:

          • 元數(shù)據(jù)庫:這個數(shù)據(jù)庫存儲有關(guān)任務(wù)狀態(tài)的信息。
          • 調(diào)度器:Scheduler 是一種使用 DAG 定義結(jié)合元數(shù)據(jù)中的任務(wù)狀態(tài)來決定哪些任務(wù)需要被執(zhí)行以及任務(wù)執(zhí)行優(yōu)先級的過程。調(diào)度器通常作為服務(wù)運行。
          • 執(zhí)行器:Executor 是一個消息隊列進程,它被綁定到調(diào)度器中,用于確定實際執(zhí)行每個任務(wù)計劃的工作進程。有不同類型的執(zhí)行器,每個執(zhí)行器都使用一個指定工作進程的類來執(zhí)行任務(wù)。例如,LocalExecutor 使用與調(diào)度器進程在同一臺機器上運行的并行進程執(zhí)行任務(wù)。其他像 CeleryExecutor 的執(zhí)行器使用存在于獨立的工作機器集群中的工作進程執(zhí)行任務(wù)。
          • Workers:這些是實際執(zhí)行任務(wù)邏輯的進程,由正在使用的執(zhí)行器確定。

          Worker的具體實現(xiàn)由配置文件中的executor來指定,airflow支持多種Executor:

          • SequentialExecutor: 單進程順序執(zhí)行,一般只用來測試
          • LocalExecutor: 本地多進程執(zhí)行
          • CeleryExecutor: 使用Celery進行分布式任務(wù)調(diào)度
          • DaskExecutor:使用Dask進行分布式任務(wù)調(diào)度
          • KubernetesExecutor: 1.10.0新增, 創(chuàng)建臨時POD執(zhí)行每次任務(wù)

          生產(chǎn)環(huán)境一般使用CeleryExecutor和KubernetesExecutor。

          使用CeleryExecutor的架構(gòu)如圖:

          使用KubernetesExecutor的架構(gòu)如圖:

          其它參考:

          • Getting started with Apache Airflow
          • Understanding Apache Airflow’s key concepts
          Python貓技術(shù)交流群開放啦!群里既有國內(nèi)一二線大廠在職員工,也有國內(nèi)外高校在讀學生,既有十多年碼齡的編程老鳥,也有中小學剛剛?cè)腴T的新人,學習氛圍良好!想入群的同學,請在公號內(nèi)回復(fù)『交流群』,獲取貓哥的微信(謝絕廣告黨,非誠勿擾!)~


          還不過癮?試試它們




          把 Redis 當作隊列用,真的合適嗎?

          Python 中如何實現(xiàn)參數(shù)化測試?

          如何優(yōu)雅高效地節(jié)省內(nèi)存?官方給出了 12 個經(jīng)典用法

          Python 實用技:將 Office 文件轉(zhuǎn)為 PDF

          Python 面向切面編程 AOP 和裝飾器

          終于來了!!Pyston v2.0 發(fā)布,解決 Python 慢速的救星


          如果你覺得本文有幫助
          請慷慨分享點贊,感謝啦
          瀏覽 39
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  成人久久大香蕉 | 韩国免费一级a一片在线播放 | 北条麻妃91视频 | 伊人啪啪网 | 免费清高视频一黄色情 |