<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          從青銅到王者,Python 實現(xiàn)定時任務的八種方案,最后一個是神級!

          共 17600字,需瀏覽 36分鐘

           ·

          2021-10-23 03:45

          作者:錢魏Way

          https://www.biaodianfu.com/python-schedule.html

          在日常工作中,我們常常會用到需要周期性執(zhí)行的任務,一種方式是采用 Linux 系統(tǒng)自帶的 crond 結(jié)合命令行實現(xiàn)。另外一種方式是直接使用Python。接下來整理的是常見的Python定時任務的實現(xiàn)方式。

          目錄

          • 利用while True: + sleep()實現(xiàn)定時任務
          • 使用Timeloop庫運行定時任務
          • 利用threading.Timer實現(xiàn)定時任務
          • 利用內(nèi)置模塊sched實現(xiàn)定時任務
          • 利用調(diào)度模塊schedule實現(xiàn)定時任務
          • 利用任務框架APScheduler實現(xiàn)定時任務
            • Job 作業(yè)
            • Trigger 觸發(fā)器
            • Executor 執(zhí)行器
            • Jobstore 作業(yè)存儲
            • Event 事件
            • 調(diào)度器
            • APScheduler中的重要概念
            • Scheduler的工作流程
          • 使用分布式消息系統(tǒng)Celery實現(xiàn)定時任務
          • 使用數(shù)據(jù)流工具Apache Airflow實現(xiàn)定時任務
            • Airflow 產(chǎn)生的背景
            • Airflow 核心概念
            • Airflow 的架構(gòu)

          利用while True: + sleep()實現(xiàn)定時任務

          位于 time 模塊中的 sleep(secs) 函數(shù),可以實現(xiàn)令當前執(zhí)行的線程暫停 secs 秒后再繼續(xù)執(zhí)行。所謂暫停,即令當前線程進入阻塞狀態(tài),當達到 sleep() 函數(shù)規(guī)定的時間后,再由阻塞狀態(tài)轉(zhuǎn)為就緒狀態(tài),等待 CPU 調(diào)度。

          基于這樣的特性我們可以通過while死循環(huán)+sleep()的方式實現(xiàn)簡單的定時任務。

          代碼示例:

          import?datetime
          import?time
          def?time_printer():
          ????now?=?datetime.datetime.now()
          ????ts?=?now.strftime('%Y-%m-%d?%H:%M:%S')
          ????print('do?func?time?:',?ts)
          def?loop_monitor():
          ????while?True:
          ????????time_printer()
          ????????time.sleep(5)??#?暫停5秒
          if?__name__?==?"__main__":
          ????loop_monitor()

          主要缺點:

          • 只能設(shè)定間隔,不能指定具體的時間,比如每天早上8:00
          • sleep 是一個阻塞函數(shù),也就是說 sleep 這一段時間,程序什么也不能操作。

          使用Timeloop庫運行定時任務

          Timeloop是一個庫,可用于運行多周期任務。這是一個簡單的庫,它使用decorator模式在線程中運行標記函數(shù)。

          示例代碼:

          import?time
          from?timeloop?import?Timeloop
          from?datetime?import?timedelta
          tl?=?Timeloop()
          @tl.job(interval=timedelta(seconds=2))
          def?sample_job_every_2s():
          ????print?"2s?job?current?time?:?{}".format(time.ctime())
          @tl.job(interval=timedelta(seconds=5))
          def?sample_job_every_5s():
          ????print?"5s?job?current?time?:?{}".format(time.ctime())
          @tl.job(interval=timedelta(seconds=10))
          def?sample_job_every_10s():
          ????print?"10s?job?current?time?:?{}".format(time.ctime())

          利用threading.Timer實現(xiàn)定時任務

          threading 模塊中的 Timer 是一個非阻塞函數(shù),比 sleep 稍好一點,timer最基本理解就是定時器,我們可以啟動多個定時任務,這些定時器任務是異步執(zhí)行,所以不存在等待順序執(zhí)行問題。

          Timer(interval, function, args=[ ], kwargs={ })

          • interval: 指定的時間
          • function: 要執(zhí)行的方法
          • args/kwargs: 方法的參數(shù)

          代碼示例:

          import?datetime
          from?threading?import?Timer
          def?time_printer():
          ????now?=?datetime.datetime.now()
          ????ts?=?now.strftime('%Y-%m-%d?%H:%M:%S')
          ????print('do?func?time?:',?ts)
          ????loop_monitor()
          def?loop_monitor():
          ????t?=?Timer(5,?time_printer)
          ????t.start()
          if?__name__?==?"__main__":
          ????loop_monitor()

          備注:Timer只能執(zhí)行一次,這里需要循環(huán)調(diào)用,否則只能執(zhí)行一次

          利用內(nèi)置模塊sched實現(xiàn)定時任務

          sched模塊實現(xiàn)了一個通用事件調(diào)度器,在調(diào)度器類使用一個延遲函數(shù)等待特定的時間,執(zhí)行任務。同時支持多線程應用程序,在每個任務執(zhí)行后會立刻調(diào)用延時函數(shù),以確保其他線程也能執(zhí)行。

          class sched.scheduler(timefunc, delayfunc)這個類定義了調(diào)度事件的通用接口,它需要外部傳入兩個參數(shù),timefunc是一個沒有參數(shù)的返回時間類型數(shù)字的函數(shù)(常用使用的如time模塊里面的time),delayfunc應該是一個需要一個參數(shù)來調(diào)用、與timefunc的輸出兼容、并且作用為延遲多個時間單位的函數(shù)(常用的如time模塊的sleep)。

          代碼示例:

          import?datetime
          import?time
          import?sched
          def?time_printer():
          ????now?=?datetime.datetime.now()
          ????ts?=?now.strftime('%Y-%m-%d?%H:%M:%S')
          ????print('do?func?time?:',?ts)
          ????loop_monitor()
          def?loop_monitor():
          ????s?=?sched.scheduler(time.time,?time.sleep)??#?生成調(diào)度器
          ????s.enter(5,?1,?time_printer,?())
          ????s.run()
          if?__name__?==?"__main__":
          ????loop_monitor()

          scheduler對象主要方法:

          • enter(delay, priority, action, argument),安排一個事件來延遲delay個時間單位。
          • cancel(event):從隊列中刪除事件。如果事件不是當前隊列中的事件,則該方法將跑出一個ValueError。
          • run():運行所有預定的事件。這個函數(shù)將等待(使用傳遞給構(gòu)造函數(shù)的delayfunc()函數(shù)),然后執(zhí)行事件,直到不再有預定的事件。

          個人點評:比threading.Timer更好,不需要循環(huán)調(diào)用。

          利用調(diào)度模塊schedule實現(xiàn)定時任務

          schedule是一個第三方輕量級的任務調(diào)度模塊,可以按照秒,分,小時,日期或者自定義事件執(zhí)行時間。schedule允許用戶使用簡單、人性化的語法以預定的時間間隔定期運行Python函數(shù)(或其它可調(diào)用函數(shù))。

          先來看代碼,是不是不看文檔就能明白什么意思?

          import?schedule
          import?time
          def?job():
          ????print("I'm?working...")
          schedule.every(10).seconds.do(job)
          schedule.every(10).minutes.do(job)
          schedule.every().hour.do(job)
          schedule.every().day.at("10:30").do(job)
          schedule.every(5).to(10).minutes.do(job)
          schedule.every().monday.do(job)
          schedule.every().wednesday.at("13:15").do(job)
          schedule.every().minute.at(":17").do(job)
          while?True:
          ????schedule.run_pending()
          ????time.sleep(1)

          裝飾器:通過 @repeat() 裝飾靜態(tài)方法

          import?time
          from?schedule?import?every,?repeat,?run_pending
          @repeat(every().second)
          def?job():
          ????print('working...')
          while?True:
          ????run_pending()
          ????time.sleep(1)

          傳遞參數(shù):

          import?schedule
          def?greet(name):
          ????print('Hello',?name)
          schedule.every(2).seconds.do(greet,?name='Alice')
          schedule.every(4).seconds.do(greet,?name='Bob')
          while?True:
          ????schedule.run_pending()

          裝飾器同樣能傳遞參數(shù):

          from?schedule?import?every,?repeat,?run_pending
          @repeat(every().second,?'World')
          @repeat(every().minute,?'Mars')
          def?hello(planet):
          ????print('Hello',?planet)
          while?True:
          ????run_pending()

          取消任務:

          import?schedule
          i?=?0
          def?some_task():
          ????global?i
          ????i?+=?1
          ????print(i)
          ????if?i?==?10:
          ????????schedule.cancel_job(job)
          ????????print('cancel?job')
          ????????exit(0)
          job?=?schedule.every().second.do(some_task)
          while?True:
          ????schedule.run_pending()

          運行一次任務:

          import?time
          import?schedule
          def?job_that_executes_once():
          ????print('Hello')
          ????return?schedule.CancelJob
          schedule.every().minute.at(':34').do(job_that_executes_once)
          while?True:
          ????schedule.run_pending()
          ????time.sleep(1)

          根據(jù)標簽檢索任務:

          #?檢索所有任務:schedule.get_jobs()
          import?schedule
          def?greet(name):
          ????print('Hello?{}'.format(name))
          schedule.every().day.do(greet,?'Andrea').tag('daily-tasks',?'friend')
          schedule.every().hour.do(greet,?'John').tag('hourly-tasks',?'friend')
          schedule.every().hour.do(greet,?'Monica').tag('hourly-tasks',?'customer')
          schedule.every().day.do(greet,?'Derek').tag('daily-tasks',?'guest')
          friends?=?schedule.get_jobs('friend')
          print(friends)

          根據(jù)標簽取消任務:

          #?取消所有任務:schedule.clear()
          import?schedule
          def?greet(name):
          ????print('Hello?{}'.format(name))
          ????if?name?==?'Cancel':
          ????????schedule.clear('second-tasks')
          ????????print('cancel?second-tasks')
          schedule.every().second.do(greet,?'Andrea').tag('second-tasks',?'friend')
          schedule.every().second.do(greet,?'John').tag('second-tasks',?'friend')
          schedule.every().hour.do(greet,?'Monica').tag('hourly-tasks',?'customer')
          schedule.every(5).seconds.do(greet,?'Cancel').tag('daily-tasks',?'guest')
          while?True:
          ????schedule.run_pending()

          運行任務到某時間:

          import?schedule
          from?datetime?import?datetime,?timedelta,?time
          def?job():
          ????print('working...')
          schedule.every().second.until('23:59').do(job)??#?今天23:59停止
          schedule.every().second.until('2030-01-01?18:30').do(job)??#?2030-01-01?18:30停止
          schedule.every().second.until(timedelta(hours=8)).do(job)??#?8小時后停止
          schedule.every().second.until(time(23,?59,?59)).do(job)??#?今天23:59:59停止
          schedule.every().second.until(datetime(2030,?1,?1,?18,?30,?0)).do(job)??#?2030-01-01?18:30停止
          while?True:
          ????schedule.run_pending()

          馬上運行所有任務(主要用于測試):

          import?schedule
          def?job():
          ????print('working...')
          def?job1():
          ????print('Hello...')
          schedule.every().monday.at('12:40').do(job)
          schedule.every().tuesday.at('16:40').do(job1)
          schedule.run_all()
          schedule.run_all(delay_seconds=3)??#?任務間延遲3秒

          并行運行:使用 Python 內(nèi)置隊列實現(xiàn):

          import?threading
          import?time
          import?schedule
          def?job1():
          ????print("I'm?running?on?thread?%s"?%?threading.current_thread())
          def?job2():
          ????print("I'm?running?on?thread?%s"?%?threading.current_thread())
          def?job3():
          ????print("I'm?running?on?thread?%s"?%?threading.current_thread())
          def?run_threaded(job_func):
          ????job_thread?=?threading.Thread(target=job_func)
          ????job_thread.start()
          schedule.every(10).seconds.do(run_threaded,?job1)
          schedule.every(10).seconds.do(run_threaded,?job2)
          schedule.every(10).seconds.do(run_threaded,?job3)
          while?True:
          ????schedule.run_pending()
          ????time.sleep(1)

          利用任務框架APScheduler實現(xiàn)定時任務

          APScheduler(advanceded python scheduler)基于Quartz的一個Python定時任務框架,實現(xiàn)了Quartz的所有功能,使用起來十分方便。提供了基于日期、固定時間間隔以及crontab類型的任務,并且可以持久化任務?;谶@些功能,我們可以很方便的實現(xiàn)一個Python定時任務系統(tǒng)。

          它有以下三個特點:

          • 類似于 Liunx Cron 的調(diào)度程序(可選的開始/結(jié)束時間)
          • 基于時間間隔的執(zhí)行調(diào)度(周期性調(diào)度,可選的開始/結(jié)束時間)
          • 一次性執(zhí)行任務(在設(shè)定的日期/時間運行一次任務)

          APScheduler有四種組成部分:

          • 觸發(fā)器(trigger) 包含調(diào)度邏輯,每一個作業(yè)有它自己的觸發(fā)器,用于決定接下來哪一個作業(yè)會運行。除了他們自己初始配置意外,觸發(fā)器完全是無狀態(tài)的。
          • 作業(yè)存儲(job store) 存儲被調(diào)度的作業(yè),默認的作業(yè)存儲是簡單地把作業(yè)保存在內(nèi)存中,其他的作業(yè)存儲是將作業(yè)保存在數(shù)據(jù)庫中。一個作業(yè)的數(shù)據(jù)講在保存在持久化作業(yè)存儲時被序列化,并在加載時被反序列化。調(diào)度器不能分享同一個作業(yè)存儲。
          • 執(zhí)行器(executor) 處理作業(yè)的運行,他們通常通過在作業(yè)中提交制定的可調(diào)用對象到一個線程或者進城池來進行。當作業(yè)完成時,執(zhí)行器將會通知調(diào)度器。
          • 調(diào)度器(scheduler) 是其他的組成部分。你通常在應用只有一個調(diào)度器,應用的開發(fā)者通常不會直接處理作業(yè)存儲、調(diào)度器和觸發(fā)器,相反,調(diào)度器提供了處理這些的合適的接口。配置作業(yè)存儲和執(zhí)行器可以在調(diào)度器中完成,例如添加、修改和移除作業(yè)。通過配置executor、jobstore、trigger,使用線程池(ThreadPoolExecutor默認值20)或進程池(ProcessPoolExecutor 默認值5)并且默認最多3個(max_instances)任務實例同時運行,實現(xiàn)對job的增刪改查等調(diào)度控制

          示例代碼:

          from?apscheduler.schedulers.blocking?import?BlockingScheduler
          from?datetime?import?datetime
          #?輸出時間
          def?job():
          ????print(datetime.now().strftime("%Y-%m-%d?%H:%M:%S"))
          #?BlockingScheduler
          sched?=?BlockingScheduler()
          sched.add_job(my_job,?'interval',?seconds=5,?id='my_job_id')
          sched.start()

          APScheduler中的重要概念

          Job 作業(yè)

          Job作為APScheduler最小執(zhí)行單位。創(chuàng)建Job時指定執(zhí)行的函數(shù),函數(shù)中所需參數(shù),Job執(zhí)行時的一些設(shè)置信息。

          構(gòu)建說明:

          • id:指定作業(yè)的唯一ID
          • name:指定作業(yè)的名字
          • trigger:apscheduler定義的觸發(fā)器,用于確定Job的執(zhí)行時間,根據(jù)設(shè)置的trigger規(guī)則,計算得到下次執(zhí)行此job的時間, 滿足時將會執(zhí)行
          • executor:apscheduler定義的執(zhí)行器,job創(chuàng)建時設(shè)置執(zhí)行器的名字,根據(jù)字符串你名字到scheduler獲取到執(zhí)行此job的 執(zhí)行器,執(zhí)行job指定的函數(shù)
          • max_instances:執(zhí)行此job的最大實例數(shù),executor執(zhí)行job時,根據(jù)job的id來計算執(zhí)行次數(shù),根據(jù)設(shè)置的最大實例數(shù)來確定是否可執(zhí)行
          • next_run_time:Job下次的執(zhí)行時間,創(chuàng)建Job時可以指定一個時間[datetime],不指定的話則默認根據(jù)trigger獲取觸發(fā)時間
          • misfire_grace_time:Job的延遲執(zhí)行時間,例如Job的計劃執(zhí)行時間是21:00:00,但因服務重啟或其他原因?qū)е?1:00:31才執(zhí)行,如果設(shè)置此key為40,則該job會繼續(xù)執(zhí)行,否則將會丟棄此job
          • coalesce:Job是否合并執(zhí)行,是一個bool值。例如scheduler停止20s后重啟啟動,而job的觸發(fā)器設(shè)置為5s執(zhí)行一次,因此此job錯過了4個執(zhí)行時間,如果設(shè)置為是,則會合并到一次執(zhí)行,否則會逐個執(zhí)行
          • func:Job執(zhí)行的函數(shù)
          • args:Job執(zhí)行函數(shù)需要的位置參數(shù)
          • kwargs:Job執(zhí)行函數(shù)需要的關(guān)鍵字參數(shù)

          Trigger 觸發(fā)器

          Trigger綁定到Job,在scheduler調(diào)度篩選Job時,根據(jù)觸發(fā)器的規(guī)則計算出Job的觸發(fā)時間,然后與當前時間比較確定此Job是否會被執(zhí)行,總之就是根據(jù)trigger規(guī)則計算出下一個執(zhí)行時間。

          目前APScheduler支持觸發(fā)器:

          • 指定時間的DateTrigger
          • 指定間隔時間的IntervalTrigger
          • 像Linux的crontab一樣的CronTrigger。

          觸發(fā)器參數(shù):date

          date定時,作業(yè)只執(zhí)行一次。

          • run_date (datetime|str) – the date/time to run the job at
          • timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already
          sched.add_job(my_job,?'date',?run_date=date(2009,?11,?6),?args=['text'])
          sched.add_job(my_job,?'date',?run_date=datetime(2019,?7,?6,?16,?30,?5),?args=['text'])

          觸發(fā)器參數(shù):interval

          interval間隔調(diào)度

          • weeks (int) – ?間隔幾周
          • days (int) – ?間隔幾天
          • hours (int) – ?間隔幾小時
          • minutes (int) – ?間隔幾分鐘
          • seconds (int) – ?間隔多少秒
          • start_date (datetime|str) – ?開始日期
          • end_date (datetime|str) – ?結(jié)束日期
          • timezone (datetime.tzinfo|str) – ?時區(qū)
          sched.add_job(job_function,?'interval',?hours=2)

          觸發(fā)器參數(shù):cron

          cron調(diào)度

          • (int|str) 表示參數(shù)既可以是int類型,也可以是str類型
          • (datetime | str) 表示參數(shù)既可以是datetime類型,也可以是str類型
          • year (int|str) – 4-digit year -(表示四位數(shù)的年份,如2008年)
          • month (int|str) – month (1-12) -(表示取值范圍為1-12月)
          • day (int|str) – day of the (1-31) -(表示取值范圍為1-31日)
          • week (int|str) – ISO week (1-53) -(格里歷2006年12月31日可以寫成2006年-W52-7(擴展形式)或2006W527(緊湊形式))
          • day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) – (表示一周中的第幾天,既可以用0-6表示也可以用其英語縮寫表示)
          • hour (int|str) – hour (0-23) – (表示取值范圍為0-23時)
          • minute (int|str) – minute (0-59) – (表示取值范圍為0-59分)
          • second (int|str) – second (0-59) – (表示取值范圍為0-59秒)
          • start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) – (表示開始時間)
          • end_date (datetime|str) – latest possible date/time to trigger on (inclusive) – (表示結(jié)束時間)
          • timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone) -(表示時區(qū)取值)

          CronTrigger可用的表達式:

          表達式參數(shù)類型描述
          *所有通配符。例:minutes=*即每分鐘觸發(fā)
          * / a所有每隔時長a執(zhí)行一次。例:minutes=”* / 3″ 即每隔3分鐘執(zhí)行一次
          a – b所有a – b的范圍內(nèi)觸發(fā)。例:minutes=“2-5”。即2到5分鐘內(nèi)每分鐘執(zhí)行一次
          a – b / c所有a – b范圍內(nèi),每隔時長c執(zhí)行一次。
          xth y第幾個星期幾觸發(fā)。x為第幾個,y為星期幾
          last x一個月中,最后一個星期的星期幾觸發(fā)
          last一個月中的最后一天觸發(fā)
          x, y, z所有組合表達式,可以組合確定值或上述表達式
          #?6-8,11-12月第三個周五?00:00,?01:00,?02:00,?03:00運行
          sched.add_job(job_function,?'cron',?month='6-8,11-12',?day='3rd?fri',?hour='0-3')
          #?每周一到周五運行?直到2024-05-30?00:00:00
          sched.add_job(job_function,?'cron',?day_of_week='mon-fri',?hour=5,?minute=30,?end_date='2024-05-30'

          Executor 執(zhí)行器

          Executor在scheduler中初始化,另外也可通過scheduler的add_executor動態(tài)添加Executor。每個executor都會綁定一個alias,這個作為唯一標識綁定到Job,在實際執(zhí)行時會根據(jù)Job綁定的executor找到實際的執(zhí)行器對象,然后根據(jù)執(zhí)行器對象執(zhí)行Job。

          Executor的種類會根據(jù)不同的調(diào)度來選擇,如果選擇AsyncIO作為調(diào)度的庫,那么選擇AsyncIOExecutor,如果選擇tornado作為調(diào)度的庫,選擇TornadoExecutor,如果選擇啟動進程作為調(diào)度,選擇ThreadPoolExecutor或者ProcessPoolExecutor都可以。

          Executor的選擇需要根據(jù)實際的scheduler來選擇不同的執(zhí)行器。目前APScheduler支持的Executor:

          • executors.asyncio:同步io,阻塞
          • executors.gevent:io多路復用,非阻塞
          • executors.pool: 線程ThreadPoolExecutor和進程ProcessPoolExecutor
          • executors.twisted:基于事件驅(qū)動

          Jobstore 作業(yè)存儲

          Jobstore在scheduler中初始化,另外也可通過scheduler的add_jobstore動態(tài)添加Jobstore。每個jobstore都會綁定一個alias,scheduler在Add Job時,根據(jù)指定的jobstore在scheduler中找到相應的jobstore,并將job添加到jobstore中。作業(yè)存儲器決定任務的保存方式, 默認存儲在內(nèi)存中(MemoryJobStore),重啟后就沒有了。APScheduler支持的任務存儲器有:

          • jobstores.memory:內(nèi)存
          • jobstores.mongodb:存儲在mongodb
          • jobstores.redis:存儲在redis
          • jobstores.rethinkdb:存儲在rethinkdb
          • jobstores.sqlalchemy:支持sqlalchemy的數(shù)據(jù)庫如mysql,sqlite等
          • jobstores.zookeeper:zookeeper

          不同的任務存儲器可以在調(diào)度器的配置中進行配置(見調(diào)度器)

          Event 事件

          Event是APScheduler在進行某些操作時觸發(fā)相應的事件,用戶可以自定義一些函數(shù)來監(jiān)聽這些事件,當觸發(fā)某些Event時,做一些具體的操作。常見的比如。Job執(zhí)行異常事件 EVENT_JOB_ERROR。Job執(zhí)行時間錯過事件 EVENT_JOB_MISSED。

          目前APScheduler定義的Event:

          • EVENT_SCHEDULER_STARTED
          • EVENT_SCHEDULER_START
          • EVENT_SCHEDULER_SHUTDOWN
          • EVENT_SCHEDULER_PAUSED
          • EVENT_SCHEDULER_RESUMED
          • EVENT_EXECUTOR_ADDED
          • EVENT_EXECUTOR_REMOVED
          • EVENT_JOBSTORE_ADDED
          • EVENT_JOBSTORE_REMOVED
          • EVENT_ALL_JOBS_REMOVED
          • EVENT_JOB_ADDED
          • EVENT_JOB_REMOVED
          • EVENT_JOB_MODIFIED
          • EVENT_JOB_EXECUTED
          • EVENT_JOB_ERROR
          • EVENT_JOB_MISSED
          • EVENT_JOB_SUBMITTED
          • EVENT_JOB_MAX_INSTANCES

          Listener表示用戶自定義監(jiān)聽的一些Event,比如當Job觸發(fā)了EVENT_JOB_MISSED事件時可以根據(jù)需求做一些其他處理。

          調(diào)度器

          Scheduler是APScheduler的核心,所有相關(guān)組件通過其定義。scheduler啟動之后,將開始按照配置的任務進行調(diào)度。除了依據(jù)所有定義Job的trigger生成的將要調(diào)度時間喚醒調(diào)度之外。當發(fā)生Job信息變更時也會觸發(fā)調(diào)度。

          APScheduler支持的調(diào)度器方式如下,比較常用的為BlockingScheduler和BackgroundScheduler

          • BlockingScheduler:適用于調(diào)度程序是進程中唯一運行的進程,調(diào)用start函數(shù)會阻塞當前線程,不能立即返回。
          • BackgroundScheduler:適用于調(diào)度程序在應用程序的后臺運行,調(diào)用start后主線程不會阻塞。
          • AsyncIOScheduler:適用于使用了asyncio模塊的應用程序。
          • GeventScheduler:適用于使用gevent模塊的應用程序。
          • TwistedScheduler:適用于構(gòu)建Twisted的應用程序。
          • QtScheduler:適用于構(gòu)建Qt的應用程序。

          Scheduler的工作流程

          Scheduler添加job流程:

          Scheduler調(diào)度流程:

          使用分布式消息系統(tǒng)Celery實現(xiàn)定時任務

          Celery是一個簡單,靈活,可靠的分布式系統(tǒng),用于處理大量消息,同時為操作提供維護此類系統(tǒng)所需的工具, 也可用于任務調(diào)度。Celery 的配置比較麻煩,如果你只是需要一個輕量級的調(diào)度工具,Celery 不會是一個好選擇。

          Celery 是一個強大的分布式任務隊列,它可以讓任務的執(zhí)行完全脫離主程序,甚至可以被分配到其他主機上運行。我們通常使用它來實現(xiàn)異步任務(async task)和定時任務(crontab)。異步任務比如是發(fā)送郵件、或者文件上傳, 圖像處理等等一些比較耗時的操作 ,定時任務是需要在特定時間執(zhí)行的任務。

          需要注意,celery本身并不具備任務的存儲功能,在調(diào)度任務的時候肯定是要把任務存起來的,因此在使用celery的時候還需要搭配一些具備存儲、訪問功能的工具,比如:消息隊列、Redis緩存、數(shù)據(jù)庫等。官方推薦的是消息隊列RabbitMQ,有些時候使用Redis也是不錯的選擇。

          它的架構(gòu)組成如下圖:

          Celery架構(gòu),它采用典型的生產(chǎn)者-消費者模式,主要由以下部分組成:

          • Celery Beat,任務調(diào)度器,Beat進程會讀取配置文件的內(nèi)容,周期性地將配置中到期需要執(zhí)行的任務發(fā)送給任務隊列。
          • Producer:需要在隊列中進行的任務,一般由用戶、觸發(fā)器或其他操作將任務入隊,然后交由workers進行處理。調(diào)用了Celery提供的API、函數(shù)或者裝飾器而產(chǎn)生任務并交給任務隊列處理的都是任務生產(chǎn)者。
          • Broker,即消息中間件,在這指任務隊列本身,Celery扮演生產(chǎn)者和消費者的角色,brokers就是生產(chǎn)者和消費者存放/獲取產(chǎn)品的地方(隊列)。
          • Celery Worker,執(zhí)行任務的消費者,從隊列中取出任務并執(zhí)行。通常會在多臺服務器運行多個消費者來提高執(zhí)行效率。
          • Result Backend:任務處理完后保存狀態(tài)信息和結(jié)果,以供查詢。Celery默認已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。

          實際應用中,用戶從Web前端發(fā)起一個請求,我們只需要將請求所要處理的任務丟入任務隊列broker中,由空閑的worker去處理任務即可,處理的結(jié)果會暫存在后臺數(shù)據(jù)庫backend中。我們可以在一臺機器或多臺機器上同時起多個worker進程來實現(xiàn)分布式地并行處理任務。

          Celery定時任務實例:

          • Python Celery & RabbitMQ Tutorial
          • Celery 配置實踐筆記

          使用數(shù)據(jù)流工具Apache Airflow實現(xiàn)定時任務

          Apache Airflow 是Airbnb開源的一款數(shù)據(jù)流程工具,目前是Apache孵化項目。以非常靈活的方式來支持數(shù)據(jù)的ETL過程,同時還支持非常多的插件來完成諸如HDFS監(jiān)控、郵件通知等功能。Airflow支持單機和分布式兩種模式,支持Master-Slave模式,支持Mesos等資源調(diào)度,有非常好的擴展性。被大量公司采用。

          Airflow使用Python開發(fā),它通過DAGs(Directed Acyclic Graph, 有向無環(huán)圖)來表達一個工作流中所要執(zhí)行的任務,以及任務之間的關(guān)系和依賴。比如,如下的工作流中,任務T1執(zhí)行完成,T2和T3才能開始執(zhí)行,T2和T3都執(zhí)行完成,T4才能開始執(zhí)行。

          Airflow提供了各種Operator實現(xiàn),可以完成各種任務實現(xiàn):

          • BashOperator – 執(zhí)行 bash 命令或腳本。
          • SSHOperator – 執(zhí)行遠程 bash 命令或腳本(原理同 paramiko 模塊)。
          • PythonOperator – 執(zhí)行 Python 函數(shù)。
          • EmailOperator – 發(fā)送 Email。
          • HTTPOperator – 發(fā)送一個 HTTP 請求。
          • MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, 等,執(zhí)行 SQL 任務。
          • DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator…

          除了以上這些 Operators 還可以方便的自定義 Operators 滿足個性化的任務需求。

          一些情況下,我們需要根據(jù)執(zhí)行結(jié)果執(zhí)行不同的任務,這樣工作流會產(chǎn)生分支。如:

          這種需求可以使用BranchPythonOperator來實現(xiàn)。

          Airflow 產(chǎn)生的背景

          通常,在一個運維系統(tǒng),數(shù)據(jù)分析系統(tǒng),或測試系統(tǒng)等大型系統(tǒng)中,我們會有各種各樣的依賴需求。包括但不限于:

          • 時間依賴:任務需要等待某一個時間點觸發(fā)。
          • 外部系統(tǒng)依賴:任務依賴外部系統(tǒng)需要調(diào)用接口去訪問。
          • 任務間依賴:任務 A 需要在任務 B 完成后啟動,兩個任務互相間會產(chǎn)生影響。
          • 資源環(huán)境依賴:任務消耗資源非常多, 或者只能在特定的機器上執(zhí)行。

          crontab 可以很好地處理定時執(zhí)行任務的需求,但僅能管理時間上的依賴。Airflow 的核心概念 DAG(有向無環(huán)圖)—— 來表現(xiàn)工作流。

          • Airflow 是一種 WMS,即:它將任務以及它們的依賴看作代碼,按照那些計劃規(guī)范任務執(zhí)行,并在實際工作進程之間分發(fā)需執(zhí)行的任務。
          • Airflow 提供了一個用于顯示當前活動任務和過去任務狀態(tài)的優(yōu)秀 UI,并允許用戶手動管理任務的執(zhí)行和狀態(tài)。
          • Airflow 中的工作流是具有方向性依賴的任務集合。
          • DAG 中的每個節(jié)點都是一個任務,DAG 中的邊表示的是任務之間的依賴(強制為有向無環(huán),因此不會出現(xiàn)循環(huán)依賴,從而導致無限執(zhí)行循環(huán))。

          Airflow 核心概念

          • DAGs:即有向無環(huán)圖(Directed Acyclic Graph),將所有需要運行的tasks按照依賴關(guān)系組織起來,描述的是所有tasks執(zhí)行順序。
          • Operators:可以簡單理解為一個class,描述了DAG中某個的task具體要做的事。其中,airflow內(nèi)置了很多operators,如BashOperator 執(zhí)行一個bash 命令,PythonOperator 調(diào)用任意的Python 函數(shù),EmailOperator 用于發(fā)送郵件,HTTPOperator 用于發(fā)送HTTP請求, SqlOperator 用于執(zhí)行SQL命令等等,同時,用戶可以自定義Operator,這給用戶提供了極大的便利性。
          • Tasks:Task 是 Operator的一個實例,也就是DAGs中的一個node。
          • Task Instance:task的一次運行。Web 界面中可以看到task instance 有自己的狀態(tài),包括”running”, “success”, “failed”, “skipped”, “up for retry”等。
          • Task Relationships:DAGs中的不同Tasks之間可以有依賴關(guān)系,如 Task1 >> Task2,表明Task2依賴于Task2了。通過將DAGs和Operators結(jié)合起來,用戶就可以創(chuàng)建各種復雜的 工作流(workflow)。

          Airflow 的架構(gòu)

          在一個可擴展的生產(chǎn)環(huán)境中,Airflow 含有以下組件:

          • 元數(shù)據(jù)庫:這個數(shù)據(jù)庫存儲有關(guān)任務狀態(tài)的信息。
          • 調(diào)度器:Scheduler 是一種使用 DAG 定義結(jié)合元數(shù)據(jù)中的任務狀態(tài)來決定哪些任務需要被執(zhí)行以及任務執(zhí)行優(yōu)先級的過程。調(diào)度器通常作為服務運行。
          • 執(zhí)行器:Executor 是一個消息隊列進程,它被綁定到調(diào)度器中,用于確定實際執(zhí)行每個任務計劃的工作進程。有不同類型的執(zhí)行器,每個執(zhí)行器都使用一個指定工作進程的類來執(zhí)行任務。例如,LocalExecutor 使用與調(diào)度器進程在同一臺機器上運行的并行進程執(zhí)行任務。其他像 CeleryExecutor 的執(zhí)行器使用存在于獨立的工作機器集群中的工作進程執(zhí)行任務。
          • Workers:這些是實際執(zhí)行任務邏輯的進程,由正在使用的執(zhí)行器確定。

          Worker的具體實現(xiàn)由配置文件中的executor來指定,airflow支持多種Executor:

          • SequentialExecutor: 單進程順序執(zhí)行,一般只用來測試
          • LocalExecutor: 本地多進程執(zhí)行
          • CeleryExecutor: 使用Celery進行分布式任務調(diào)度
          • DaskExecutor:使用Dask進行分布式任務調(diào)度
          • KubernetesExecutor: 1.10.0新增, 創(chuàng)建臨時POD執(zhí)行每次任務

          生產(chǎn)環(huán)境一般使用CeleryExecutor和KubernetesExecutor。

          使用CeleryExecutor的架構(gòu)如圖:

          使用KubernetesExecutor的架構(gòu)如圖:

          其它參考:

          • Getting started with Apache Airflow
          • Understanding Apache Airflow’s key concepts

          重要通知:

          菜鳥學Python團隊將開始拓展Go語言的內(nèi)容,歡迎關(guān)注我們的Go語言小號,我們將從Go的入門系列開始寫起,一步一步帶你研究Go語言,對go語言,云計算,大數(shù)據(jù),云原生有興趣的可以來支持一下。




          推薦閱讀:

          入門:?最全的零基礎(chǔ)學Python的問題? |?零基礎(chǔ)學了8個月的Python??|?實戰(zhàn)項目?|學Python就是這條捷徑


          量化:?定投基金到底能賺多少錢?? |?我用Python對去年800只基金的數(shù)據(jù)分析??


          干貨:爬取豆瓣短評,電影《后來的我們》?|?38年NBA最佳球員分析?|? ?從萬眾期待到口碑撲街!唐探3令人失望? |?笑看新倚天屠龍記?|?燈謎答題王?|用Python做個海量小姐姐素描圖?|碟中諜這么火,我用機器學習做個迷你推薦系統(tǒng)電影


          趣味:彈球游戲? |?九宮格? |?漂亮的花?|?兩百行Python《天天酷跑》游戲!


          AI:?會做詩的機器人?|?給圖片上色?|?預測收入?|?碟中諜這么火,我用機器學習做個迷你推薦系統(tǒng)電影


          小工具:?Pdf轉(zhuǎn)Word,輕松搞定表格和水??!?|?一鍵把html網(wǎng)頁保存為pdf!|??再見PDF提取收費!?|?用90行代碼打造最強PDF轉(zhuǎn)換器,word、PPT、excel、markdown、html一鍵轉(zhuǎn)換?|?制作一款釘釘?shù)蛢r機票提示器!?|60行代碼做了一個語音壁紙切換器天天看小姐姐!


          年度爆款文案


          點閱讀原文,看Python全套!

          瀏覽 50
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  久色网站| 欧美日在线 | AV色中色| 国产综合久久777777歌曲 | 婷婷爱五月天 |