5分鐘快速掌握 Python 定時(shí)任務(wù)框架

來源 | Python中文社區(qū)
一、APScheduler 簡介
schedule:第三方模塊,該模塊適合比較輕量級的一些調(diào)度任務(wù),但卻不適用于復(fù)雜時(shí)間的調(diào)度APScheduler:第三方定時(shí)任務(wù)框架,是對 Java 第三方定時(shí)任務(wù)框架Quartz的模仿與移植,能提供比schedule更復(fù)雜的應(yīng)用場景,并且各種組件都是模塊化,易于使用與二次開發(fā)。Celery Beat:屬于celery這分布式任務(wù)隊(duì)列第三方庫下的一個(gè)定時(shí)任務(wù)組件,如果使用需要配合 RabbitMQ 或 Redis 這類的消息隊(duì)列套件,需要花費(fèi)一定的時(shí)間在環(huán)境搭建上,但在高版本中已經(jīng)不支持 Windows。
APScheduler 來對我們的調(diào)度任務(wù)或定時(shí)任務(wù)進(jìn)行管理是個(gè)性價(jià)比極高的選擇。而本文主要會(huì)帶你快速上手有關(guān) APScheduler 的使用。二、APScheduler 概念與組件
APScheduler 之前,我們需要對這個(gè)框架的一些概念簡單了解,主要有那么以下幾個(gè):觸發(fā)器(trigger) 任務(wù)持久化(job stores) 執(zhí)行器(executor) 調(diào)度器(scheduler)
1.觸發(fā)器(trigger)
APScheduler 中主要是指時(shí)間觸發(fā)器,并且主要有三類時(shí)間觸發(fā)器可供使用:date:日期觸發(fā)器。日期觸發(fā)器主要是在某一日期時(shí)間點(diǎn)上運(yùn)行任務(wù)時(shí)調(diào)用,是APScheduler里面最簡單的一種觸發(fā)器。所以通常也適用于一次性的任務(wù)或作業(yè)調(diào)度。interval:間隔觸發(fā)器。間隔觸發(fā)器是在日期觸發(fā)器基礎(chǔ)上擴(kuò)展了對時(shí)間部分,比如時(shí)、分、秒、天、周這幾個(gè)部分的設(shè)定。是我們用以對重復(fù)性任務(wù)進(jìn)行設(shè)定或調(diào)度的一個(gè)常用調(diào)度器。設(shè)定了時(shí)間部分之后,從起始日期開始(默認(rèn)是當(dāng)前)會(huì)按照設(shè)定的時(shí)間去執(zhí)行任務(wù)。cron:cron表達(dá)式觸發(fā)器。cron表達(dá)式觸發(fā)器就等價(jià)于我們 Linux 上的 crontab,它主要用于更復(fù)雜的日期時(shí)間進(jìn)行設(shè)定。但需要注意的是,APScheduler不支持 6 位及以上的 cron 表達(dá)式,最多只支持到 5 位。
2.任務(wù)持久化(job stores)
APScheduler 就會(huì)根據(jù)對存儲(chǔ)好的調(diào)度任務(wù)結(jié)果進(jìn)行判斷,如果出現(xiàn)已經(jīng)過期但未執(zhí)行的情況會(huì)進(jìn)行相應(yīng)的操作。APScheduler 為我們提供了多種持久化任務(wù)的途徑,默認(rèn)是使用 memory 也就是內(nèi)存的形式,但內(nèi)存并不是持久化最好的方式。最好的方式則是通過像數(shù)據(jù)庫這樣的載體來將我們的定時(shí)任務(wù)寫入到磁盤當(dāng)中,只要磁盤沒有損壞就能將數(shù)據(jù)給恢復(fù)。APScheduler 支持的且常用的數(shù)據(jù)庫主要有:sqlalchemy形式的數(shù)據(jù)庫,這里就主要是指各種傳統(tǒng)的關(guān)系型數(shù)據(jù)庫,如 MySQL、PostgreSQL、SQLite 等。mongodb非結(jié)構(gòu)化的 Mongodb 數(shù)據(jù)庫,該類型數(shù)據(jù)庫經(jīng)常用于對非結(jié)構(gòu)化或版結(jié)構(gòu)化數(shù)據(jù)的存儲(chǔ)或操作,如 JSON。redis內(nèi)存數(shù)據(jù)庫,通常用作數(shù)據(jù)緩存來使用,當(dāng)然通過一些主從復(fù)制等方式也能實(shí)現(xiàn)當(dāng)中數(shù)據(jù)的持久化或保存。
Scheduler 實(shí)例時(shí)創(chuàng)建,或是單獨(dú)為任務(wù)指定。配置的方式相對簡單,我們只需要指定對應(yīng)的數(shù)據(jù)庫鏈接即可。3.執(zhí)行器(executor)
APScheduler 里的執(zhí)行器通常就是 ThreadPoolExecutor 或 ProcessPoolExecutor 這樣的線程池和進(jìn)程池兩種。AsyncIOExecutor、TwistedExecutor 和 GeventExecutor 三種執(zhí)行器。4.調(diào)度器(scheduler)
APScheduler 的用途。根據(jù)用途的不同,APScheduler 又提供了以下幾種調(diào)度器:BlockingScheduler:阻塞調(diào)度器,當(dāng)程序中沒有任何存在主進(jìn)程之中運(yùn)行東西時(shí),就則使用該調(diào)度器。BackgroundScheduler:后臺(tái)調(diào)度器,在不使用后面任何的調(diào)度器且希望在應(yīng)用程序內(nèi)部運(yùn)行時(shí)的后臺(tái)啟動(dòng)時(shí)才進(jìn)行使用,如當(dāng)前你已經(jīng)開啟了一個(gè) Django 或 Flask 服務(wù)。AsyncIOScheduler:AsyncIO調(diào)度器,如果代碼是通過asyncio模塊進(jìn)行異步操作,使用該調(diào)度器。GeventScheduler:Gevent調(diào)度器,如果代碼是通過gevent模塊進(jìn)行協(xié)程操作,使用該調(diào)度器TornadoScheduler:Tornado調(diào)度器,在Tornado框架中使用TwistedScheduler:Twisted調(diào)度器,在基于Twisted的框架或應(yīng)用程序中使用QtScheduler:Qt調(diào)度器,在構(gòu)建Qt應(yīng)用中進(jìn)行使用。
BlockingScheduler 調(diào)度器來進(jìn)行操作,它會(huì)在當(dāng)前進(jìn)程中啟動(dòng)相應(yīng)的線程來進(jìn)行任務(wù)調(diào)度與處理;反之,如果是和 Web 項(xiàng)目或應(yīng)用共存,那么需要選擇 BackgroundScheduler 調(diào)度器,因?yàn)樗粫?huì)干擾當(dāng)前應(yīng)用的線程或進(jìn)程狀況。APScheduler 的運(yùn)行流程:設(shè)定調(diào)度器(scheduler)用以對任務(wù)的調(diào)度與安排進(jìn)行全局統(tǒng)籌 對相應(yīng)的函數(shù)或方法上設(shè)定相應(yīng)的觸發(fā)器(trigger),并添加到調(diào)度器中 如有任務(wù)持久化(job stores)需要?jiǎng)t需要設(shè)定對應(yīng)的持久化層,否則默認(rèn)使用內(nèi)存存儲(chǔ)任務(wù) 當(dāng)觸發(fā)器被觸發(fā)時(shí),就將任務(wù)交由執(zhí)行器(executor)進(jìn)行執(zhí)行
三、APScheduler 快速上手
APScheduler 里面的概念和組件看起來有點(diǎn)多,但在使用上并不算很復(fù)雜,我們可以通過本節(jié)的示例就能夠很快使用。1.選擇對應(yīng)的 scheduler
scheduler 對象,所有的 scheduler 對象都被放在了 apscheduler.schedulers 模塊下,我們可以直接通過查看 API 文檔或者借助 IDE 補(bǔ)全的提示來獲取相應(yīng)的 scheduler 對象。BlockingScheduler:#?main.py
from?apscheduler.schedulers.blocking?import?BlockingScheduler
scheduler?=?BlockingScheduler()
2.配置 scheduler
scheduler 的一些配置我們可以直接在實(shí)例化對象時(shí)就進(jìn)行配置,當(dāng)然也可以在創(chuàng)建實(shí)例化對象之后再進(jìn)行配置。#?main.py
from?datetime?import?datetime
from?apscheduler.executors.pool?import?ThreadPoolExecutor
from?apscheduler.jobstores.sqlalchemy?import?SQLAlchemyJobStore
from?apscheduler.schedulers.blocking?import?BlockingScheduler
#?任務(wù)持久化?使用?SQLite
jobstores?=?{
????'default':?SQLAlchemyJobStore(url?=?'sqlite:///jobs.db')
}
#?執(zhí)行器配置
executors?=?{
????'default':?ThreadPoolExecutor(20),
}
#?關(guān)于?Job?的相關(guān)配置,見官方文檔?API
job_defaults?=?{
????'coalesce':?False,
????'next_run_time':?datetime.now()
}
scheduler?=?BlockingScheduler(
??jobstores?=?jobstores,
??executors?=?executors,
??job_defaults?=?job_defaults,
??timezone?=?'Asia/Shanghai'
)
scheduler.configure 方法進(jìn)行同樣的操作:scheduler?=?BlockingScheduler()
scheduler.configure(jobstores=jobstores,?executors=executors,?job_defaults=job_defaults,?timezone='Asia/Shanghai')
3.添加并執(zhí)行你的任務(wù)
scheduler 對象之后,我們需要調(diào)用其下的 add_job() 或是 scheduled_job() 方法來將我們需要執(zhí)行的函數(shù)進(jìn)行注冊。前者是以傳參的形式指定對應(yīng)的函數(shù)名,而后者則是以裝飾器的形式直接對我們要執(zhí)行的函數(shù)進(jìn)行修飾。now():from?datetime?import?datetime
def?now(trigger):
????print(f"trigger:{trigger}?->?{datetime.now()}")
add_job() 可以這樣寫:if?__name__?==?'__main__':
????scheduler.add_job(now,?trigger?=?"interval",?args?=?("interval",),?seconds?=?5)
????scheduler.start()
start() 方法之后調(diào)度器就會(huì)開始執(zhí)行,并在控制臺(tái)上看到對應(yīng)的結(jié)果了:trigger:interval?->?2021-01-16?21:19:43.356674
trigger:interval?->?2021-01-16?21:19:46.679849
trigger:interval?->?2021-01-16?21:19:48.356595
@scheduled_job 的方式來裝飾我們的任務(wù)或許會(huì)更加自由一些,于是上面的例子就可以寫成這樣:@scheduler.scheduled_job(trigger?=?"interval",?args?=?("interval",),?seconds?=?5)
def?now(trigger):
????print(f"trigger:{trigger}?->?{datetime.now()}")
if?__name__?==?'__main__':
????scheduler.start()
start() 方法執(zhí)行前調(diào)用,否則會(huì)找不到任務(wù)或是拋出異常。四、將 APScheduler 集成到 Web 項(xiàng)目中
APScheduler 由于多樣的調(diào)度器,我們能夠?qū)⑵浜臀覀兊捻?xiàng)目結(jié)合到一起。Flask,那么 Flask-APScheduler 這一別人寫好的第三方包裝庫就很適合你,雖然它沒有相關(guān)的文檔,但只要你了解了前面我所介紹的有關(guān)于 APScheduler 的概念和組件,你就能很輕易地看懂這個(gè)第三方庫倉庫里的示例代碼。APScheduler 本身也提供了一些對任務(wù)或作業(yè)的增刪改查操作,我們可以自己編寫一套合適的 API。temp-scheduler
├──?config.py???????#?配置項(xiàng)
├──?main.py?????????#?API?文件
└──?scheduler.py????#?APScheduler?相關(guān)設(shè)置
1.安裝依賴
這里我們需要的依賴不多,只需要簡單幾個(gè)即可:
pip?install?fastapi?apscheduler?sqlalchemy?uvicorn
2.配置項(xiàng)
config.py 我們主要像 Flask 的配置那樣簡單設(shè)定:from?apscheduler.executors.pool?import?ThreadPoolExecutor
from?apscheduler.jobstores.sqlalchemy?import?SQLAlchemyJobStore
from?apscheduler.schedulers.blocking?import?BlockingScheduler
class?SchedulerConfig:
????JOBSTORES?=?{"default":?SQLAlchemyJobStore(url="sqlite:///job.db")}
????EXECUTORS?=?{"default":?ThreadPoolExecutor(20)}
????JOB_DEFAULTS?=?{"coalesce":?False}
????@classmethod
????def?to_dict(cls):
????????return?{
????????????"jobstores":?cls.JOBSTORES,
????????????"executors":?cls.EXECUTORS,
????????????"job_defaults":?cls.JOB_DEFAULTS,
????????}
SchedulerConfig 配置項(xiàng)中我們可以自己實(shí)現(xiàn)一個(gè) to_dict() 類方法,以便我們后續(xù)傳參時(shí)通過解包的方式直接傳入配置參數(shù)即可。3.Scheduler 相關(guān)設(shè)置
scheduler.py 模塊的設(shè)定也比較簡單,即設(shè)定對應(yīng)的 scheduler 調(diào)度器即可。由于是演示 demo 我還將要定期執(zhí)行的任務(wù)也放在了這個(gè)模塊當(dāng)中:import?logging
from?datetime?import?datetime
from?apscheduler.schedulers.background?import?BackgroundScheduler
from?config?import?SchedulerConfig
scheduler?=?BackgroundScheduler()
logger?=?logging.getLogger(__name__)
def?init_scheduler()?->?None:
????#?config?scheduler
????scheduler.configure(**SchedulerConfig.to_dict())
????logger.info("scheduler?is?running...")
????#?schedule?test
????scheduler.add_job(
????????func=mytask,
????????trigger="date",
????????args=("APScheduler?Initialize.",),
????????next_run_time=datetime.now(),
????)
????scheduler.start()
def?mytask(message:?str)?->?None:
????print(f"[{datetime.now()}]?message:?{message}")
init_scheduler()方法主要用于在 API 服務(wù)啟動(dòng)時(shí)被調(diào)用,然后對scheduler對象的配置以及測試mytask()則是我們要定期執(zhí)行的任務(wù),后續(xù)我們可以通過 APScheduler 提供的方法來自行添加任務(wù)
4.API 設(shè)置
main.py 模塊就主要存放著我們由 FastAPI 所構(gòu)建的相關(guān) API。如果在后續(xù)開發(fā)時(shí)存在多個(gè)接口,此時(shí)就需要將不同接口放在不同模塊文件中,以達(dá)到路由的分發(fā)與管理,類似于 Flask 的藍(lán)圖模式。import?logging
import?uuid
from?datetime?import?datetime
from?typing?import?Any,?Dict,?Optional,?Sequence,?Union
from?fastapi?import?FastAPI
from?pydantic?import?BaseModel
from?scheduler?import?init_scheduler,?mytask,?scheduler
logger?=?logging.getLogger(__name__)
app?=?FastAPI(title="APScheduler?API")
app.add_event_handler("startup",?init_scheduler)
class?Job(BaseModel):
????id:?Union[int,?str,?uuid.UUID]
????name:?Optional[str]?=?None
????func:?Optional[str]?=?None
????args:?Optional[Sequence[Optional[str]]]?=?None
????kwargs:?Optional[Dict[str,?Any]]?=?None
????executor:?Optional[str]?=?None
????misfire_grace_time:?Optional[str]?=?None
????coalesce:?Optional[bool]?=?None
????max_instances:?Optional[int]?=?None
????next_run_time:?Optional[Union[str,?datetime]]?=?None
@app.post("/add")
def?add_job(
????message:?str,
????trigger:?str,
????trigger_args:?Optional[dict],
????id:?Union[str,?int,?uuid.UUID],
):
????try:
????????scheduler.add_job(
????????????func=mytask,
????????????trigger=trigger,
????????????kwargs={"message":?message},
????????????id=id,
????????????**trigger_args,
????????)
????except?Exception?as?e:
????????logger.exception(e.args)
????????return?{"status_code":?0,?"message":?"添加失敗"}
????return?{"status_code":?1,?"message":?"添加成功"}
@app.delete("/delete/{id}")
def?delete_job(id:?Union[str,?int,?uuid.UUID]):
????"""delete?exist?job?by?id"""
????try:
????????scheduler.remove_job(job_id=id)
????except?Exception:
????????return?dict(
????????????message="刪除失敗",
????????????status_code=0,
????????)
????return?dict(
????????message="刪除成功",
????????status_code=1,
????)
@app.put("/reschedule/{id}")
def?reschedule_job(
????id:?Union[str,?int,?uuid.UUID],?trigger:?str,?trigger_args:?Optional[dict]
):
????try:
????????scheduler.reschedule_job(job_id=id,?trigger=trigger,?**trigger_args)
????except?Exception?as?e:
????????logger.exception(e.args)
????????return?dict(
????????????message="修改失敗",
????????????status_code=0,
????????)
????return?dict(
????????message="修改成功",
????????status_code=1,
????)
@app.get("/job")
def?get_all_jobs():
????jobs?=?None
????try:
????????job_list?=?scheduler.get_jobs()
????????if?job_list:
????????????jobs?=?[Job(**task.__getstate__())?for?task?in?job_list]
????except?Exception?as?e:
????????logger.exception(e.args)
????????return?dict(
????????????message="查詢失敗",
????????????status_code=0,
????????????jobs=jobs,
????????)
????return?dict(
????????message="查詢成功",
????????status_code=1,
????????jobs=jobs,
????)
@app.get("/job/{id}")
def?get_job_by_id(id:?Union[int,?str,?uuid.UUID]):
????jobs?=?[]
????try:
????????job?=?scheduler.get_job(job_id=id)
????????if?job:
????????????jobs?=?[Job(**job.__getstate__())]
????except?Exception?as?e:
????????logger.exception(e.args)
????????return?dict(
????????????message="查詢失敗",
????????????status_code=0,
????????????jobs=jobs,
????????)
????return?dict(
????????message="查詢成功",
????????status_code=1,
????????jobs=jobs,
????)
FastAPI 對象 app的初始化。這里用到的add_event_handler()方法就有點(diǎn)像 Flask 中的before_first_request,會(huì)在 Web 服務(wù)請求伊始進(jìn)行操作,理解為初始化相關(guān)的操作即可。API 接口路由。路由通過 app對象下的對應(yīng) HTTP 方法來實(shí)現(xiàn),如GET、POST、PUT等。這里的裝飾器用法其實(shí)也和 Flask 很類似,就不多贅述。scheduler對象的增刪改查。從scheduler.py模塊中引入我們創(chuàng)建好的scheduler對象之后就可以直接用來做增刪改查的操作:增:使用 add_job()方法,其主要的參數(shù)是要運(yùn)行的函數(shù)(或方法)、觸發(fā)器以及觸發(fā)器參數(shù)等刪:使用 delete_job()方法,我們需要傳入一個(gè)對應(yīng)任務(wù)的id參數(shù),用以能夠查找到對應(yīng)的任務(wù)改:使用 reschedule_job()方法,這里也需要一個(gè)對應(yīng)任務(wù)的id參數(shù),以及需要重新修改的觸發(fā)器及其參數(shù)查:使用 get_jobs()和get_job()兩個(gè)方法,前者是直接獲取到當(dāng)前調(diào)度的所有任務(wù),返回的是一個(gè)包含了APScheduler.job.Job對象的列表,而后者是通過id參數(shù)來查找對應(yīng)的任務(wù)對象;這里我通過底層源碼使用__getstate__()來獲取到任務(wù)的相關(guān)信息,這些信息我們通過事先設(shè)定好的Job對象來對其進(jìn)行序列化,最后將信息從接口中返回。
5.運(yùn)行
uvicorn?main:app?
http://127.0.0.1:8000/docs ?中看到關(guān)于全部接口的 Swagger 文檔頁面了:
五、結(jié)尾
APScheduler 框架的概念及其用法,并進(jìn)行了簡單的實(shí)踐。APScheduler 的模塊化設(shè)計(jì)才可以讓我們更方便地去理解、使用它,并將其運(yùn)用到我們實(shí)際的開發(fā)過程中。評論
圖片
表情
