<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Python實現(xiàn)定時任務(wù)的八種方案!

          共 25609字,需瀏覽 52分鐘

           ·

          2023-10-10 09:17

          在日常工作中,我們常常會用到需要周期性執(zhí)行的任務(wù),一種方式是采用 Linux 系統(tǒng)自帶的 crond[1] 結(jié)合命令行實現(xiàn)。另外一種方式是直接使用 Python。接下里整理的是常見的 Python 定時任務(wù)的實現(xiàn)方式。

          利用 while True: + sleep() 實現(xiàn)定時任務(wù)

          位于 time 模塊中的 sleep(secs) 函數(shù),可以實現(xiàn)令當(dāng)前執(zhí)行的線程暫停 secs 秒后再繼續(xù)執(zhí)行。所謂暫停,即令當(dāng)前線程進(jìn)入阻塞狀態(tài),當(dāng)達(dá)到 sleep() 函數(shù)規(guī)定的時間后,再由阻塞狀態(tài)轉(zhuǎn)為就緒狀態(tài),等待 CPU 調(diào)度。

          基于這樣的特性我們可以通過 while 死循環(huán)+sleep() 的方式實現(xiàn)簡單的定時任務(wù)。

          代碼示例:

          import datetime

          import time

          def time_printer():

              now = datetime.datetime.now()

              ts = now.strftime('%Y-%m-%d %H:%M:%S')

              print('do func time :', ts)

          def loop_monitor():

              while True:

                  time_printer()

                  time.sleep(5)  # 暫停 5 秒

          if __name__ == "__main__":

              loop_monitor()

          主要缺點:

          • 只能設(shè)定間隔,不能指定具體的時間,比如每天早上 8:00

          • sleep 是一個阻塞函數(shù),也就是說 sleep 這一段時間,程序什么也不能操作 。使用 Timeloop 庫運(yùn)行定時任務(wù)

          Timeloop[2] 是一個庫,可用于運(yùn)行多周期任務(wù)。這是一個簡單的庫,它使用 decorator 模式在線程中運(yùn)行標(biāo)記函數(shù)。

          示例代碼:

          import time

          from timeloop import Timeloop

          from datetime import timedelta

          tl = Timeloop()

          @tl.job(interval=timedelta(seconds=2))

          def sample_job_every_2s():

              print "2s job current time : {}".format(time.ctime())

          @tl.job(interval=timedelta(seconds=5))

          def sample_job_every_5s():

              print "5s job current time : {}".format(time.ctime())

          @tl.job(interval=timedelta(seconds=10))

          def sample_job_every_10s():

              print "10s job current time : {}".format(time.ctime())

          利用 threading.Timer 實現(xiàn)定時任務(wù)

          threading 模塊中的 Timer 是一個非阻塞函數(shù),比 sleep 稍好一點,timer 最基本理解就是定時器,我們可以啟動多個定時任務(wù),這些定時器任務(wù)是異步執(zhí)行,所以不存在等待順序執(zhí)行問題。


          Timer(interval, function, args=[ ], kwargs={ })

          • interval: 指定的時間

          • function: 要執(zhí)行的方法

          • args/kwargs: 方法的參數(shù) 代碼示例:

          代碼示例:

          import datetime

          from threading import Timer

          def time_printer():

              now = datetime.datetime.now()

              ts = now.strftime('%Y-%m-%d %H:%M:%S')

              print('do func time :', ts)

              loop_monitor()

          def loop_monitor():

              t = Timer(5, time_printer)

              t.start()

          if __name__ == "__main__":

              loop_monitor()

          備注:Timer 只能執(zhí)行一次,這里需要循環(huán)調(diào)用,否則只能執(zhí)行一次

          利用內(nèi)置模塊 sched 實現(xiàn)定時任務(wù)

          sched 模塊實現(xiàn)了一個通用事件調(diào)度器,在調(diào)度器類使用一個延遲函數(shù)等待特定的時間,執(zhí)行任務(wù)。同時支持多線程應(yīng)用程序,在每個任務(wù)執(zhí)行后會立刻調(diào)用延時函數(shù),以確保其他線程也能執(zhí)行。

          class sched.scheduler(timefunc, delayfunc) 這個類定義了調(diào)度事件的通用接口,它需要外部傳入兩個參數(shù),timefunc 是一個沒有參數(shù)的返回時間類型數(shù)字的函數(shù)(常用使用的如 time 模塊里面的 time),delayfunc 應(yīng)該是一個需要一個參數(shù)來調(diào)用、與 timefunc 的輸出兼容、并且作用為延遲多個時間單位的函數(shù)(常用的如 time 模塊的 sleep)。

          代碼示例:

          import datetime

          import time

          import sched

          def time_printer():

              now = datetime.datetime.now()

              ts = now.strftime('%Y-%m-%d %H:%M:%S')

              print('do func time :', ts)

              loop_monitor()

          def loop_monitor():

              s = sched.scheduler(time.time, time.sleep)  # 生成調(diào)度器

              s.enter(5, 1, time_printer, ())

              s.run()

          if __name__ == "__main__":

              loop_monitor()

          scheduler 對象主要方法:

          • enter(delay, priority, action, argument),安排一個事件來延遲 delay 個時間單位。

          • cancel(event):從隊列中刪除事件。如果事件不是當(dāng)前隊列中的事件,則該方法將跑出一個 ValueError。

          • run():運(yùn)行所有預(yù)定的事件。這個函數(shù)將等待(使用傳遞給構(gòu)造函數(shù)的 delayfunc() 函數(shù)),然后執(zhí)行事件,直到不再有預(yù)定的事件。

          個人點評:比 threading.Timer 更好,不需要循環(huán)調(diào)用。

          利用調(diào)度模塊 schedule 實現(xiàn)定時任務(wù)

          schedule[3] 是一個第三方輕量級的任務(wù)調(diào)度模塊,可以按照秒,分,小時,日期或者自定義事件執(zhí)行時間。schedule[4] 允許用戶使用簡單、人性化的語法以預(yù)定的時間間隔定期運(yùn)行 Python 函數(shù)(或其它可調(diào)用函數(shù))。

          先來看代碼,是不是不看文檔就能明白什么意思?

          import schedule

          import time

          def job():

              print("I'm working...")

          schedule.every(10).seconds.do(job)

          schedule.every(10).minutes.do(job)

          schedule.every().hour.do(job)

          schedule.every().day.at("10:30").do(job)

          schedule.every(5).to(10).minutes.do(job)

          schedule.every().monday.do(job)

          schedule.every().wednesday.at("13:15").do(job)

          schedule.every().minute.at(":17").do(job)

          while True:

              schedule.run_pending()

              time.sleep(1)

          裝飾器:通過 @repeat() 裝飾靜態(tài)方法

          import time

          from schedule import every, repeat, run_pending

          @repeat(every().second)

          def job():

              print('working...')

          while True:

              run_pending()

              time.sleep(1)

          傳遞參數(shù):

          import schedule

          def greet(name):

              print('Hello', name)

          schedule.every(2).seconds.do(greet, name='Alice')

          schedule.every(4).seconds.do(greet, name='Bob')

          while True:

              schedule.run_pending()

          裝飾器同樣能傳遞參數(shù):

          from schedule import every, repeat, run_pending

          @repeat(every().second, 'World')

          @repeat(every().minute, 'Mars')

          def hello(planet):

              print('Hello', planet)

          while True:

              run_pending()

          取消任務(wù):

          import schedule

          i = 0

          def some_task():

              global i

              i += 1

              print(i)

              if i == 10:

                  schedule.cancel_job(job)

                  print('cancel job')

                  exit(0)

          job = schedule.every().second.do(some_task)

          while True:

              schedule.run_pending()

          運(yùn)行一次任務(wù):

          import time

          import schedule

          def job_that_executes_once():

              print('Hello')

              return schedule.CancelJob

          schedule.every().minute.at(':34').do(job_that_executes_once)

          while True:

              schedule.run_pending()

              time.sleep(1)

          根據(jù)標(biāo)簽檢索任務(wù):

          # 檢索所有任務(wù):schedule.get_jobs()

          import schedule

          def greet(name):

              print('Hello {}'.format(name))

          schedule.every().day.do(greet, 'Andrea').tag('daily-tasks''friend')

          schedule.every().hour.do(greet, 'John').tag('hourly-tasks''friend')

          schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks''customer')

          schedule.every().day.do(greet, 'Derek').tag('daily-tasks''guest')

          friends = schedule.get_jobs('friend')

          print(friends)

          根據(jù)標(biāo)簽取消任務(wù):

          # 取消所有任務(wù):schedule.clear()

          import schedule

          def greet(name):

              print('Hello {}'.format(name))

              if name == 'Cancel':

                  schedule.clear('second-tasks')

                  print('cancel second-tasks')

          schedule.every().second.do(greet, 'Andrea').tag('second-tasks''friend')

          schedule.every().second.do(greet, 'John').tag('second-tasks''friend')

          schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks''customer')

          schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks''guest')

          while True:

              schedule.run_pending()

          運(yùn)行任務(wù)到某時間:

          import schedule

          from datetime import datetime, timedelta, time

          def job():

              print('working...')

          schedule.every().second.until('23:59').do(job)  # 今天 23:59 停止

          schedule.every().second.until('2030-01-01 18:30').do(job)  # 2030-01-01 18:30 停止

          schedule.every().second.until(timedelta(hours=8)).do(job)  # 8 小時后停止

          schedule.every().second.until(time(23, 59, 59)).do(job)  # 今天 23:59:59 停止

          schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job)  # 2030-01-01 18:30 停止

          while True:

              schedule.run_pending()

          馬上運(yùn)行所有任務(wù)(主要用于測試):

          import schedule

          def job():

              print('working...')

          def job1():

              print('Hello...')

          schedule.every().monday.at('12:40').do(job)

          schedule.every().tuesday.at('16:40').do(job1)

          schedule.run_all()

          schedule.run_all(delay_seconds=3)  # 任務(wù)間延遲 3 秒

          并行運(yùn)行:使用 Python 內(nèi)置隊列實現(xiàn):

          import threading

          import time

          import schedule

          def job1():

              print("I'm running on thread %s" % threading.current_thread())

          def job2():

              print("I'm running on thread %s" % threading.current_thread())

          def job3():

              print("I'm running on thread %s" % threading.current_thread())

          def run_threaded(job_func):

              job_thread = threading.Thread(target=job_func)

              job_thread.start()

          schedule.every(10).seconds.do(run_threaded, job1)

          schedule.every(10).seconds.do(run_threaded, job2)

          schedule.every(10).seconds.do(run_threaded, job3)

          while True:

              schedule.run_pending()

              time.sleep(1)

          利用任務(wù)框架 APScheduler 實現(xiàn)定時任務(wù)

          APScheduler[5](advanceded python scheduler)基于 Quartz 的一個 Python 定時任務(wù)框架,實現(xiàn)了 Quartz 的所有功能,使用起來十分方便。提供了基于日期、固定時間間隔以及 crontab 類型的任務(wù),并且可以持久化任務(wù)。基于這些功能,我們可以很方便的實現(xiàn)一個 Python 定時任務(wù)系統(tǒng)。

          它有以下三個特點:

          • 類似于 Liunx Cron 的調(diào)度程序(可選的開始/結(jié)束時間)

          • 基于時間間隔的執(zhí)行調(diào)度(周期性調(diào)度,可選的開始/結(jié)束時間)

          • 一次性執(zhí)行任務(wù)(在設(shè)定的日期/時間運(yùn)行一次任務(wù))

          APScheduler 有四種組成部分:

          • 觸發(fā)器 (trigger) 包含調(diào)度邏輯,每一個作業(yè)有它自己的觸發(fā)器,用于決定接下來哪一個作業(yè)會運(yùn)行。除了他們自己初始配置意外,觸發(fā)器完全是無狀態(tài)的。

          • 作業(yè)存儲 (job store) 存儲被調(diào)度的作業(yè),默認(rèn)的作業(yè)存儲是簡單地把作業(yè)保存在內(nèi)存中,其他的作業(yè)存儲是將作業(yè)保存在數(shù)據(jù)庫中。一個作業(yè)的數(shù)據(jù)講在保存在持久化作業(yè)存儲時被序列化,并在加載時被反序列化。調(diào)度器不能分享同一個作業(yè)存儲。

          • 執(zhí)行器 (executor) 處理作業(yè)的運(yùn)行,他們通常通過在作業(yè)中提交制定的可調(diào)用對象到一個線程或者進(jìn)城池來進(jìn)行。當(dāng)作業(yè)完成時,執(zhí)行器將會通知調(diào)度器。

          • 調(diào)度器 (scheduler) 是其他的組成部分。你通常在應(yīng)用只有一個調(diào)度器,應(yīng)用的開發(fā)者通常不會直接處理作業(yè)存儲、調(diào)度器和觸發(fā)器,相反,調(diào)度器提供了處理這些的合適的接口。配置作業(yè)存儲和執(zhí)行器可以在調(diào)度器中完成,例如添加、修改和移除作業(yè)。通過配置 executor、jobstore、trigger,使用線程池 (ThreadPoolExecutor 默認(rèn)值 20) 或進(jìn)程池 (ProcessPoolExecutor 默認(rèn)值 5) 并且默認(rèn)最多 3 個 (max_instances) 任務(wù)實例同時運(yùn)行,實現(xiàn)對 job 的增刪改查等調(diào)度控制

          示例代碼:

          from apscheduler.schedulers.blocking import BlockingScheduler

          from datetime import datetime

          # 輸出時間

          def job():

              print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

          # BlockingScheduler

          sched = BlockingScheduler()

          sched.add_job(my_job, 'interval', seconds=5, id='my_job_id')

          sched.start()

          APScheduler 中的重要概念

          Job 作業(yè)

          Job 作為 APScheduler 最小執(zhí)行單位。創(chuàng)建 Job 時指定執(zhí)行的函數(shù),函數(shù)中所需參數(shù),Job 執(zhí)行時的一些設(shè)置信息。

          構(gòu)建說明:

          • id:指定作業(yè)的唯一 ID

          • name:指定作業(yè)的名字

          • trigger:apscheduler 定義的觸發(fā)器,用于確定 Job 的執(zhí)行時間,根據(jù)設(shè)置的 trigger 規(guī)則,計算得到下次執(zhí)行此 job 的時間, 滿足時將會執(zhí)行

          • executor:apscheduler 定義的執(zhí)行器,job 創(chuàng)建時設(shè)置執(zhí)行器的名字,根據(jù)字符串你名字到 scheduler 獲取到執(zhí)行此 job 的 執(zhí)行器,執(zhí)行 job 指定的函數(shù)

          • max_instances:執(zhí)行此 job 的最大實例數(shù),executor 執(zhí)行 job 時,根據(jù) job 的 id 來計算執(zhí)行次數(shù),根據(jù)設(shè)置的最大實例數(shù)來確定是否可執(zhí)行

          • next_run_time:Job 下次的執(zhí)行時間,創(chuàng)建 Job 時可以指定一個時間 [datetime], 不指定的話則默認(rèn)根據(jù) trigger 獲取觸發(fā)時間

          • misfire_grace_time:Job 的延遲執(zhí)行時間,例如 Job 的計劃執(zhí)行時間是 21:00:00,但因服務(wù)重啟或其他原因?qū)е?21:00:31 才執(zhí)行,如果設(shè)置此 key 為 40, 則該 job 會繼續(xù)執(zhí)行,否則將會丟棄此 job

          • coalesce:Job 是否合并執(zhí)行,是一個 bool 值。例如 scheduler 停止 20s 后重啟啟動,而 job 的觸發(fā)器設(shè)置為 5s 執(zhí)行一次,因此此 job 錯過了 4 個執(zhí)行時間,如果設(shè)置為是,則會合并到一次執(zhí)行,否則會逐個執(zhí)行

          • func:Job 執(zhí)行的函數(shù)

          • args:Job 執(zhí)行函數(shù)需要的位置參數(shù)

          • kwargs:Job 執(zhí)行函數(shù)需要的關(guān)鍵字參數(shù)

          Trigger 觸發(fā)器

          Trigger 綁定到 Job,在 scheduler 調(diào)度篩選 Job 時,根據(jù)觸發(fā)器的規(guī)則計算出 Job 的觸發(fā)時間,然后與當(dāng)前時間比較確定此 Job 是否會被執(zhí)行,總之就是根據(jù) trigger 規(guī)則計算出下一個執(zhí)行時間。

          目前 APScheduler 支持觸發(fā)器:

          • 指定時間的 DateTrigger

          • 指定間隔時間的 IntervalTrigger

          • 像 Linux 的 crontab 一樣的 CronTrigger。

          觸發(fā)器參數(shù):date

          date 定時,作業(yè)只執(zhí)行一次。

          • run_date (datetime|str) – the date/time to run the job at

          • timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already

          sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text'])

          sched.add_job(my_job, 'date', run_date=datetime(2019, 7, 6, 16, 30, 5), args=['text'])

          觸發(fā)器參數(shù):interval

          interval 間隔調(diào)度

          • weeks (int) – 間隔幾周

          • days (int) – 間隔幾天

          • hours (int) – 間隔幾小時

          • minutes (int) – 間隔幾分鐘

          • seconds (int) – 間隔多少秒

          • start_date (datetime|str) – 開始日期

          • end_date (datetime|str) – 結(jié)束日期

          • timezone (datetime.tzinfo|str) – 時區(qū)

          sched.add_job(job_function, 'interval', hours=2)

          觸發(fā)器參數(shù):cron

          cron 調(diào)度

          • (int|str) 表示參數(shù)既可以是 int 類型,也可以是 str 類型

          • (datetime | str) 表示參數(shù)既可以是 datetime 類型,也可以是 str 類型

          • year (int|str) – 4-digit year -(表示四位數(shù)的年份,如 2008 年)

          • month (int|str) – month (1-12) -(表示取值范圍為 1-12 月)

          • day (int|str) – day of the (1-31) -(表示取值范圍為 1-31 日)

          • week (int|str) – ISO week (1-53) -(格里歷 2006 年 12 月 31 日可以寫成 2006 年-W52-7(擴(kuò)展形式)或 2006W527(緊湊形式))

          • day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) – (表示一周中的第幾天,既可以用 0-6 表示也可以用其英語縮寫表示)

          • hour (int|str) – hour (0-23) – (表示取值范圍為 0-23 時)

          • minute (int|str) – minute (0-59) – (表示取值范圍為 0-59 分)

          • second (int|str) – second (0-59) – (表示取值范圍為 0-59 秒)

          • start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) – (表示開始時間)

          • end_date (datetime|str) – latest possible date/time to trigger on (inclusive) – (表示結(jié)束時間)

          • timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone) -(表示時區(qū)取值)

          CronTrigger 可用的表達(dá)式:

          表達(dá)式 參數(shù)類型 描述
          * 所有 通配符。例:minutes=*即每分鐘觸發(fā)
          * / a 所有 每隔時長 a 執(zhí)行一次。例:minutes=”* / 3″ 即每隔 3 分鐘執(zhí)行一次
          a – b 所有 a – b 的范圍內(nèi)觸發(fā)。例:minutes=“2-5”。即 2 到 5 分鐘內(nèi)每分鐘執(zhí)行一次
          a – b / c 所有 a – b 范圍內(nèi),每隔時長 c 執(zhí)行一次。
          xth y 第幾個星期幾觸發(fā)。x 為第幾個,y 為星期幾
          last x 一個月中,最后一個星期的星期幾觸發(fā)
          last 一個月中的最后一天觸發(fā)
          x, y, z 所有 組合表達(dá)式,可以組合確定值或上述表達(dá)式

          # 6-8,11-12 月第三個周五 00:00, 01:00, 02:00, 03:00 運(yùn)行


          sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')

          # 每周一到周五運(yùn)行 直到 2024-05-30 00:00:00

          sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2024-05-30'

          Executor 執(zhí)行器

          Executor 在 scheduler 中初始化,另外也可通過 scheduler 的 add_executor 動態(tài)添加 Executor。每個 executor 都會綁定一個 alias,這個作為唯一標(biāo)識綁定到 Job,在實際執(zhí)行時會根據(jù) Job 綁定的 executor 找到實際的執(zhí)行器對象,然后根據(jù)執(zhí)行器對象執(zhí)行 Job。

          Executor 的種類會根據(jù)不同的調(diào)度來選擇,如果選擇 AsyncIO 作為調(diào)度的庫,那么選擇 AsyncIOExecutor,如果選擇 tornado 作為調(diào)度的庫,選擇 TornadoExecutor,如果選擇啟動進(jìn)程作為調(diào)度,選擇 ThreadPoolExecutor 或者 ProcessPoolExecutor 都可以。

          Executor 的選擇需要根據(jù)實際的 scheduler 來選擇不同的執(zhí)行器。目前 APScheduler 支持的 Executor:

          • executors.asyncio:同步 io,阻塞

          • executors.gevent:io 多路復(fù) 用,非阻塞

          • executors.pool: 線程 ThreadPoolExecutor 和進(jìn)程 ProcessPoolExecutor

          • executors.twisted:基于事件驅(qū)動

          Jobstore 作業(yè)存儲

          Jobstore 在 scheduler 中初始化,另外也可通過 scheduler 的 add_jobstore 動態(tài)添加 Jobstore。每個 jobstore 都會綁定一個 alias,scheduler 在 Add Job 時,根據(jù)指定的 jobstore 在 scheduler 中找到相應(yīng)的 jobstore,并將 job 添加到 jobstore 中。作業(yè)存儲器決定任務(wù)的保存方式, 默認(rèn)存儲在內(nèi)存中(MemoryJobStore),重啟后就沒有了。APScheduler 支持的任務(wù)存儲器有:

          • jobstores.memory:內(nèi)存

          • jobstores.mongodb:存儲在 mongodb

          • jobstores.redis:存儲在 redis

          • jobstores.rethinkdb:存儲在 rethinkdb

          • jobstores.sqlalchemy:支持 sqlalchemy 的數(shù)據(jù)庫如 mysql,sqlite 等

          • jobstores.zookeeper:zookeeper

          不同的任務(wù)存儲器可以在調(diào)度器的配置中進(jìn)行配置(見調(diào)度器)

          Event 事件

          Event 是 APScheduler 在進(jìn)行某些操作時觸發(fā)相應(yīng)的事件,用戶可以自定義一些函數(shù)來監(jiān)聽這些事件,當(dāng)觸發(fā)某些 Event 時,做一些具體的操作。常見的比如。Job 執(zhí)行異常事件 EVENT_JOB_ERROR。Job 執(zhí)行時間錯過事件 EVENT_JOB_MISSED。

          目前 APScheduler 定義的 Event:

          • EVENT_SCHEDULER_STARTED

          • EVENT_SCHEDULER_START

          • EVENT_SCHEDULER_SHUTDOWN

          • EVENT_SCHEDULER_PAUSED

          • EVENT_SCHEDULER_RESUMED

          • EVENT_EXECUTOR_ADDED

          • EVENT_EXECUTOR_REMOVED

          • EVENT_JOBSTORE_ADDED

          • EVENT_JOBSTORE_REMOVED

          • EVENT_ALL_JOBS_REMOVED

          • EVENT_JOB_ADDED

          • EVENT_JOB_REMOVED

          • EVENT_JOB_MODIFIED

          • EVENT_JOB_EXECUTED

          • EVENT_JOB_ERROR

          • EVENT_JOB_MISSED

          • EVENT_JOB_SUBMITTED

          • EVENT_JOB_MAX_INSTANCES

          Listener 表示用戶自定義監(jiān)聽的一些 Event,比如當(dāng) Job 觸發(fā)了 EVENT_JOB_MISSED 事件時可以根據(jù)需求做一些其他處理。

          調(diào)度器

          Scheduler 是 APScheduler 的核心,所有相關(guān)組件通過其定義。scheduler 啟動之后,將開始按照配置的任務(wù)進(jìn)行調(diào)度。除了依據(jù)所有定義 Job 的 trigger 生成的將要調(diào)度時間喚醒調(diào)度之外。當(dāng)發(fā)生 Job 信息變更時也會觸發(fā)調(diào)度。

          APScheduler 支持的調(diào)度器方式如下,比較常用的為 BlockingScheduler 和 BackgroundScheduler

          • BlockingScheduler:適用于調(diào)度程序是進(jìn)程中唯一運(yùn)行的進(jìn)程,調(diào)用 start 函數(shù)會阻塞當(dāng)前線程,不能立即返回。

          • BackgroundScheduler:適用于調(diào)度程序在應(yīng)用程序的后臺運(yùn)行,調(diào)用 start 后主線程不會阻塞。

          • AsyncIOScheduler:適用于使用了 asyncio 模塊的應(yīng)用程序。

          • GeventScheduler:適用于使用 gevent 模塊的應(yīng)用程序。

          • TwistedScheduler:適用于構(gòu)建 Twisted 的應(yīng)用程序。

          • QtScheduler:適用于構(gòu)建 Qt 的應(yīng)用程序。

          Scheduler 的工作流程

          Scheduler 添加 job 流程:

          Scheduler 調(diào)度流程:

          使用分布式消息系統(tǒng) Celery 實現(xiàn)定時任務(wù)

          Celery[6] 是一個簡單,靈活,可靠的分布式系統(tǒng),用于處理大量消息,同時為操作提供維護(hù)此類系統(tǒng)所需的工具,也可用于任務(wù)調(diào)度。Celery 的配置比較麻煩,如果你只是需要一個輕量級的調(diào)度工具,Celery 不會是一個好選擇。

          Celery 是一個強(qiáng)大的分布式任務(wù)隊列,它可以讓任務(wù)的執(zhí)行完全脫離主程序,甚至可以被分配到其他主機(jī)上運(yùn)行。我們通常使用它來實現(xiàn)異步任務(wù)(async task)和定時任務(wù)(crontab)。異步任務(wù)比如是發(fā)送郵件、或者文件上傳,圖像處理等等一些比較耗時的操作 ,定時任務(wù)是需要在特定時間執(zhí)行的任務(wù)。

          需要注意,celery 本身并不具備任務(wù)的存儲功能,在調(diào)度任務(wù)的時候肯定是要把任務(wù)存起來的,因此在使用 celery 的時候還需要搭配一些具備存儲、訪問功能的工具,比如:消息隊列、Redis 緩存、數(shù)據(jù)庫等。官方推薦的是消息隊列 RabbitMQ,有些時候使用 Redis 也是不錯的選擇。

          它的架構(gòu)組成如下圖:

          Celery 架構(gòu),它采用典型的生產(chǎn)者-消費(fèi)者模式,主要由以下部分組成:

          • Celery Beat,任務(wù)調(diào)度器,Beat 進(jìn)程會讀取配置文件的內(nèi)容,周期性地將配置中到期需要執(zhí)行的任務(wù)發(fā)送給任務(wù)隊列。

          • Producer:需要在隊列中進(jìn)行的任務(wù),一般由用戶、觸發(fā)器或其他操作將任務(wù)入隊,然后交由 workers 進(jìn)行處理。調(diào)用了 Celery 提供的 API、函數(shù)或者裝飾器而產(chǎn)生任務(wù)并交給任務(wù)隊列處理的都是任務(wù)生產(chǎn)者。

          • Broker,即消息中間件,在這指任務(wù)隊列本身,Celery 扮演生產(chǎn)者和消費(fèi)者的角色,brokers 就是生產(chǎn)者和消費(fèi)者存放/獲取產(chǎn)品的地方(隊列)。

          • Celery Worker,執(zhí)行任務(wù)的消費(fèi)者,從隊列中取出任務(wù)并執(zhí)行。通常會在多臺服務(wù)器運(yùn)行多個消費(fèi)者來提高執(zhí)行效率。

          • Result Backend:任務(wù)處理完后保存狀態(tài)信息和結(jié)果,以供查詢。Celery 默認(rèn)已支持 Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy 等方式。

          實際應(yīng)用中,用戶從 Web 前端發(fā)起一個請求,我們只需要將請求所要處理的任務(wù)丟入任務(wù)隊列 broker 中,由空閑的 worker 去處理任務(wù)即可,處理的結(jié)果會暫存在后臺數(shù)據(jù)庫 backend 中。我們可以在一臺機(jī)器或多臺機(jī)器上同時起多個 worker 進(jìn)程來實現(xiàn)分布式地并行處理任務(wù)。

          Celery 定時任務(wù)實例:

          • Python Celery & RabbitMQ Tutorial[7]

          • Celery 配置實踐筆記[8]

          使用數(shù)據(jù)流工具 Apache Airflow 實現(xiàn)定時任務(wù)

          Apache Airflow[9] 是 Airbnb 開源的一款數(shù)據(jù)流程工具,目前是 Apache 孵化項目。以非常靈活的方式來支持?jǐn)?shù)據(jù)的 ETL 過程,同時還支持非常多的插件來完成諸如 HDFS 監(jiān)控、郵件通知等功能。Airflow 支持單機(jī)和分布式兩種模式,支持 Master-Slave 模式,支持 Mesos 等資源調(diào)度,有非常好的擴(kuò)展性。被大量公司采用。

          Airflow 使用 Python 開發(fā),它通過 DAGs(Directed Acyclic Graph, 有向無環(huán)圖)來表達(dá)一個工作流中所要執(zhí)行的任務(wù),以及任務(wù)之間的關(guān)系和依賴。比如,如下的工作流中,任務(wù) T1 執(zhí)行完成,T2 和 T3 才能開始執(zhí)行,T2 和 T3 都執(zhí)行完成,T4 才能開始執(zhí)行。

          Airflow 提供了各種 Operator 實現(xiàn),可以完成各種任務(wù)實現(xiàn):

          • BashOperator – 執(zhí)行 bash 命令或腳本。

          • SSHOperator – 執(zhí)行遠(yuǎn)程 bash 命令或腳本(原理同 paramiko 模塊)。

          • PythonOperator – 執(zhí)行 Python 函數(shù)。

          • EmailOperator – 發(fā)送 Email。

          • HTTPOperator – 發(fā)送一個 HTTP 請求。

          • MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, 等,執(zhí)行 SQL 任務(wù)。

          • DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator…

          除了以上這些 Operators 還可以方便的自定義 Operators 滿足個性化的任務(wù)需求。

          一些情況下,我們需要根據(jù)執(zhí)行結(jié)果執(zhí)行不同的任務(wù),這樣工作流會產(chǎn)生分支。

          如:

          這種需求可以使用 BranchPythonOperator 來實現(xiàn)。

          Airflow 產(chǎn)生的背景

          通常,在一個運(yùn)維系統(tǒng),數(shù)據(jù)分析系統(tǒng),或測試系統(tǒng)等大型系統(tǒng)中,我們會有各種各樣的依賴需求。包括但不限于:

          • 時間依賴:任務(wù)需要等待某一個時間點觸發(fā)。

          • 外部系統(tǒng)依賴:任務(wù)依賴外部系統(tǒng)需要調(diào)用接口去訪問。

          • 任務(wù)間依賴:任務(wù) A 需要在任務(wù) B 完成后啟動,兩個任務(wù)互相間會產(chǎn)生影響。

          • 資源環(huán)境依賴:任務(wù)消耗資源非常多, 或者只能在特定的機(jī)器上執(zhí)行。

          crontab 可以很好地處理定時執(zhí)行任務(wù)的需求,但僅能管理時間上的依賴。Airflow 的核心概念 DAG(有向無環(huán)圖)—— 來表現(xiàn)工作流。

          • Airflow 是一種 WMS,即:它將任務(wù)以及它們的依賴看作代碼,按照那些計劃規(guī)范任務(wù)執(zhí)行,并在實際工作進(jìn)程之間分發(fā)需執(zhí)行的任務(wù)。

          • Airflow 提供了一個用于顯示當(dāng)前活動任務(wù)和過去任務(wù)狀態(tài)的優(yōu)秀 UI,并允許用戶手動管理任務(wù)的執(zhí)行和狀態(tài)。

          • Airflow 中的工作流是具有方向性依賴的任務(wù)集合。

          • DAG 中的每個節(jié)點都是一個任務(wù),DAG 中的邊表示的是任務(wù)之間的依賴(強(qiáng)制為有向無環(huán),因此不會出現(xiàn)循環(huán)依賴,從而導(dǎo)致無限執(zhí)行循環(huán))。

          Airflow 核心概念

          • DAGs:即有向無環(huán)圖 (Directed Acyclic Graph),將所有需要運(yùn)行的 tasks 按照依賴關(guān)系組織起來,描述的是所有 tasks 執(zhí)行順序。

          • Operators:可以簡單理解為一個 class,描述了 DAG 中某個的 task 具體要做的事。其中,airflow 內(nèi)置了很多 operators,如 BashOperator 執(zhí)行一個 bash 命令,PythonOperator 調(diào)用任意的 Python 函數(shù),EmailOperator 用于發(fā)送郵件,HTTPOperator 用于發(fā)送 HTTP 請求, SqlOperator 用于執(zhí)行 SQL 命令等等,同時,用戶可以自定義 Operator,這給用戶提供了極大的便利性。

          • Tasks:Task 是 Operator 的一個實例,也就是 DAGs 中的一個 node。

          • Task Instance:task 的一次運(yùn)行。Web 界面中可以看到 task instance 有自己的狀態(tài),包括”running”, “success”, “failed”, “skipped”, “up for retry”等。

          • Task Relationships:DAGs 中的不同 Tasks 之間可以有依賴關(guān)系,如 Task1 >> Task2,表明 Task2 依賴于 Task2 了。通過將 DAGs 和 Operators 結(jié)合起來,用戶就可以創(chuàng)建各種復(fù)雜的 工作流(workflow)。

          Airflow 的架構(gòu)

          在一個可擴(kuò)展的生產(chǎn)環(huán)境中,Airflow 含有以下組件:

          • 元數(shù)據(jù)庫:這個數(shù)據(jù)庫存儲有關(guān)任務(wù)狀態(tài)的信息。

          • 調(diào)度器:Scheduler 是一種使用 DAG 定義結(jié)合元數(shù)據(jù)中的任務(wù)狀態(tài)來決定哪些任務(wù)需要被執(zhí)行以及任務(wù)執(zhí)行優(yōu)先級的過程。調(diào)度器通常作為服務(wù)運(yùn)行。

          • 執(zhí)行器:Executor 是一個消息隊列進(jìn)程,它被綁定到調(diào)度器中,用于確定實際執(zhí)行每個任務(wù)計劃的工作進(jìn)程。有不同類型的執(zhí)行器,每個執(zhí)行器都使用一個指定工作進(jìn)程的類來執(zhí)行任務(wù)。例如,LocalExecutor 使用與調(diào)度器進(jìn)程在同一臺機(jī)器上運(yùn)行的并行進(jìn)程執(zhí)行任務(wù)。其他像 CeleryExecutor 的執(zhí)行器使用存在于獨立的工作機(jī)器集群中的工作進(jìn)程執(zhí)行任務(wù)。

          • Workers:這些是實際執(zhí)行任務(wù)邏輯的進(jìn)程,由正在使用的執(zhí)行器確定。

          Worker 的具體實現(xiàn)由配置文件中的 executor 來指定,airflow 支持多種 Executor:

          • SequentialExecutor: 單進(jìn)程順序執(zhí)行,一般只用來測試

          • LocalExecutor: 本地多進(jìn)程執(zhí)行

          • CeleryExecutor: 使用 Celery 進(jìn)行分布式任務(wù)調(diào)度

          • DaskExecutor:使用 Dask[10] 進(jìn)行分布式任務(wù)調(diào)度

          • KubernetesExecutor: 1.10.0 新增,創(chuàng)建臨時 POD 執(zhí)行每次任務(wù)

          生產(chǎn)環(huán)境一般使用 CeleryExecutor 和 KubernetesExecutor。

          使用 CeleryExecutor 的架構(gòu)如圖:

          使用 KubernetesExecutor 的架構(gòu)如圖:

          其他參考:

          • Getting started with Apache Airflow[11]
          • Understanding Apache Airflow’s key concepts[12]

          參考資料

          [1]

          Linux 系統(tǒng)自帶的 crond: https://www.biaodianfu.com/crontab.html

          [2]

          Timeloop: https://github.com/sankalpjonn/timeloop

          [3]

          schedule: https://github.com/dbader/schedule

          [4]

          schedule: https://schedule.readthedocs.io/en/stable/

          [5]

          APScheduler: https://github.com/agronholm/apscheduler

          [6]

          Celery: https://github.com/celery/celery

          [7]

          Python Celery & RabbitMQ Tutorial: https://tests4geeks.com/blog/python-celery-rabbitmq-tutorial/

          [8]

          Celery 配置實踐筆記: https://github.com/biaodianfu/celery-demo

          [9]

          Apache Airflow: https://airflow.apache.org/

          [10]

          Dask: https://distributed.dask.org/en/latest/

          [11]

          Getting started with Apache Airflow: https://towardsdatascience.com/getting-started-with-apache-airflow-df1aa77d7b1b

          [12]

          Understanding Apache Airflow’s key concepts: https://medium.com/@dustinstansbury/understanding-apache-airflows-key-concepts-a96efed52b1a


          轉(zhuǎn)自:www.biaodianfu.com/python-schedule.html

          瀏覽 429
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  操逼免费的视频 | 91九色91蝌蚪91窝成人 | 国产操逼在线看 | 婷婷六月天在线 | 中日亚洲国产特级黄片 |