基于 CRON 庫擴展的分布式 Crontab 的實現(xiàn)
作者:熊喵君,原文鏈接:https://pandaychen.github.io/2022/01/16/A-GOLANG-CRONTAB-V3-ANALYSIS/
0x00 前言
cron[1] 是一個用于管理定時任務(wù)的庫(單機),基于 Golang 實現(xiàn) Linux 中 crontab 的功能
0x01 使用
Linux 的 crontab
crontab 基本格式:
# 文件格式說明
# ┌──分鐘(0 - 59)
# │ ┌──小時(0 - 23)
# │ │ ┌──日(1 - 31)
# │ │ │ ┌─月(1 - 12)
# │ │ │ │ ┌─星期(0 - 6,表示從周日到周六)
# │ │ │ │ │
# * * * * * 被執(zhí)行的命令
基礎(chǔ)例子
用法極豐富,V3 版本也支持標(biāo)準(zhǔn)的 crontab 格式,具體用法細節(jié)可以參考 此文[2]:
func main() {
job := cron.New(
cron.WithSeconds(), // 添加秒級別支持,默認支持最小粒度為分鐘(如需秒級精度則必須設(shè)置)
)
// 每秒鐘執(zhí)行一次
job.AddFunc("* * * * * *", func() {
fmt.Printf("task run: %v\n", time.Now())
})
job.Run() // 啟動
}
其他典型的用法還有如下:
type cronJobDemo int
func (c cronJobDemo) Run() {
fmt.Println("5s func trigger")
return
}
func main() {
c := cron.New(
cron.WithSeconds(),
)
c.AddFunc("0 * * * *", func() { fmt.Println("Every hour on the half hour") })
c.AddFunc("30 3-6,20-23 * * *", func() { fmt.Println(".. in the range 3-6am, 8-11pm") })
c.AddFunc("CRON_TZ=Asia/Tokyo 30 04 * * *", func() { fmt.Println("Runs at 04:30 Tokyo time every day") })
c.AddFunc("@every 5m", func() { fmt.Println("every 5m, start 5m fron now") }) // 容易理解的格式
// 通過 AddJob 注冊
var cJob cronJobDemo
c.AddJob("@every 5s", cJob)
c.Start()
// c.Stop()
select {}
}
0x02 代碼分析
核心數(shù)據(jù)結(jié)構(gòu)
對于 cron 庫的整體邏輯,最關(guān)鍵的兩個數(shù)據(jù)結(jié)構(gòu)就是 Entry 和 Cron
1、Job:抽象一個定時任務(wù),cron 調(diào)度一個 Job,就去執(zhí)行 Job 的 Run() 方法
type Job interface {
Run()
}
FuncJob:FuncJob 實際就是一個 func() 類型,實現(xiàn)了 Run() 方法:
type FuncJob func()
func (f FuncJob) Run() {
f()
}
在實際應(yīng)用中,我們需要對 Job 結(jié)構(gòu)做一些擴展,于是就有了 JobWrapper,使用修飾器機制加工 Job(傳入一個 Job,返回一個 Job),有點像 gin 中間件,包裝器可以在執(zhí)行實際的 Job 前后添加一些邏輯,然后使用一個 Chain 將這些 JobWrapper 組合到一起。
比如給 Job 添加這樣一些屬性:
在 Job回調(diào)方法中捕獲panic異常如果 Job上次運行還未結(jié)束,推遲本次執(zhí)行如果 Job上次運行還未結(jié)束,跳過本次執(zhí)行記錄每個 Job的執(zhí)行情況
type JobWrapper func(Job) Job
type Chain struct {
wrappers []JobWrapper
}
func NewChain(c ...JobWrapper) Chain {
return Chain{c}
}
2、Chain 結(jié)構(gòu)Chain 是 JobWrapper 的數(shù)組,調(diào)用 Chain 對象的 Then(j Job) 方法應(yīng)用這些 JobWrapper,返回最終的 Job:
type Chain struct {
wrappers []JobWrapper
}
func NewChain(c ...JobWrapper) Chain {
return Chain{c}
}
func (c Chain) Then(j Job) Job {
for i := range c.wrappers {
// 注意:應(yīng)用 JobWrapper 的順序
j = c.wrappers[len(c.wrappers)-i-1](j "len(c.wrappers "len(c.wrappers)-i-1")-i-1")
}
return j
}
3、Schedule:描述一個 job 如何循環(huán)執(zhí)行的抽象,需要實現(xiàn)Next方法,此方法返回任務(wù)下次被調(diào)度的時間
// Schedule describes a job's duty cycle.
type Schedule interface {
// Next returns the next activation time, later than the given time.
// Next is invoked initially, and then each time the job is run.
Next(time.Time) time.Time
}
Scheduler 的實例化結(jié)構(gòu)有:
ConstantDelaySchedule:實現(xiàn)[3]SpecSchedule:實現(xiàn)[4],默認選擇,提供了對 Cron 表達式的解析能力
4、Entry 結(jié)構(gòu):抽象了一個 job
每當(dāng)使用 AddJob 注冊一個定時調(diào)用策略,就會為該策略生成唯一的 Entry,Entry 里會存儲被執(zhí)行的時間、需要被調(diào)度執(zhí)行的實體 Job
type Entry struct {
ID EntryID // job id,可以通過該 id 來刪除 job
Schedule Schedule // 用于計算 job 下次的執(zhí)行時間
Next time.Time // job 下次執(zhí)行時間
Prev time.Time // job 上次執(zhí)行時間,沒執(zhí)行過為 0
WrappedJob Job // 修飾器加工過的 job
Job Job // 未經(jīng)修飾的 job,可以理解為 AddFunc 的第二個參數(shù)
}
5、Cron結(jié)構(gòu)[5]:關(guān)于 Cron 結(jié)構(gòu),有一些細節(jié),entries 為何設(shè)計為一個指針 slice?
// Cron keeps track of any number of entries, invoking the associated func as
// specified by the schedule. It may be started, stopped, and the entries may
// be inspected while running.
type Cron struct {
entries []*Entry // 所有 Job 集合
chain Chain // 裝飾器鏈
stop chan struct{} // 停止信號
add chan *Entry // 用于異步增加 Entry
remove chan EntryID // 用于異步刪除 Entry
snapshot chan chan []Entry
running bool // 是否正在運行
logger Logger
runningMu sync.Mutex // 運行時鎖
location *time.Location // 時區(qū)相關(guān)
parser Parser // Cron 解析器
nextID EntryID
jobWaiter sync.WaitGroup // 并發(fā)控制,正在運行的 Job
}
entries 成員
剛才說到 entries 為何設(shè)計為指針 slice,原因在于 cron 核心邏輯中,每次循環(huán)開始時都會對 Cron.entries 進行排序,排序字段依賴于每個 Entry 結(jié)構(gòu)的 Next 成員,排序依賴于下面的原則:
按照觸發(fā)時間正向排序,越先觸發(fā)的越靠前 IsZero的任務(wù)向后面排由于可能存在相同周期的任務(wù) Job,所以排序是不穩(wěn)定的
// byTime is a wrapper for sorting the entry array by time
// (with zero time at the end).
type byTime []*Entry
func (s byTime) Len() int { return len(s) }
func (s byTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s byTime) Less(i, j int) bool {
// Two zero times should return false.
// Otherwise, zero is "greater" than any other time.
// (To sort it at the end of the list.)
if s[i].Next.IsZero() {
return false
}
if s[j].Next.IsZero() {
return true
}
// 排序的原則,s[i] 比 s[j] 先觸發(fā)
return s[i].Next.Before(s[j].Next)
}
0x03 內(nèi)置 JobWrapper 介紹
Recover:捕捉 panic,避免進程異常退出
此 wrapper 比較好理解,在執(zhí)行內(nèi)層的 Job 邏輯前,添加 recover() 調(diào)用。如果 Job.Run() 執(zhí)行過程中有 panic。這里的 recover() 會捕獲到,輸出調(diào)用堆棧
// cron.go
func Recover(logger Logger) JobWrapper {
return func(j Job) Job {
return FuncJob(func() {
defer func() {
if r := recover(); r != nil {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
err, ok := r.(error)
if !ok {
err = fmt.Errorf("%v", r)
}
logger.Error(err, "panic", "stack", "...\n"+string(buf))
}
}()
j.Run()
})
}
}
DelayIfStillRunning
實現(xiàn)了已有任務(wù)運行推遲的邏輯。核心是通過一個(任務(wù)共用的)互斥鎖 sync.Mutex,每次執(zhí)行任務(wù)前獲取鎖,執(zhí)行結(jié)束之后釋放鎖。所以在上一個任務(wù)結(jié)束前,下一個任務(wù)獲取鎖會阻塞,從而保證的任務(wù)的串行執(zhí)行。
// chain.go
func DelayIfStillRunning(logger Logger) JobWrapper {
return func(j Job) Job {
var mu sync.Mutex
return FuncJob(func() {
start := time.Now()
// 下一個任務(wù)阻塞等待獲取鎖
mu.Lock()
defer mu.Unlock()
if dur := time.Since(start); dur > time.Minute {
logger.Info("delay", "duration", dur)
}
j.Run()
})
}
}
SkipIfStillRunning
和 DelayIfStillRunning 機制不一樣,該方法是跳過執(zhí)行,通過無緩沖 channel 機制實現(xiàn)。執(zhí)行任務(wù)時,從通道中取值,如果成功,執(zhí)行,否則跳過。執(zhí)行完成之后再向通道中發(fā)送一個值,確保下一個任務(wù)能執(zhí)行。初始發(fā)送一個值到通道中,保證第一個任務(wù)的執(zhí)行。
func SkipIfStillRunning(logger Logger) JobWrapper {
return func(j Job) Job {
// 定義一個無緩沖 channel
var ch = make(chan struct{}, 1)
ch <- struct{}{}
return FuncJob(func() {
select {
case v := <-ch:
j.Run()
ch <- v
default:
logger.Info("skip")
}
})
}
}
0x04 核心方法分析
AddJob 方法
AddJob 方法通過兩種方法將任務(wù)節(jié)點 entry 添加到 Cron.entries 中:
初始化時,直接 append運行狀態(tài)下,通過 channel 方式異步添加,避免加鎖
// AddJob adds a Job to the Cron to be run on the given schedule.
// The spec is parsed using the time zone of this Cron instance as the default.
// An opaque ID is returned that can be used to later remove it.
func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
schedule, err := c.parser.Parse(spec)
if err != nil {
return 0, err
}
return c.Schedule(schedule, cmd), nil
}
// Schedule adds a Job to the Cron to be run on the given schedule.
// The job is wrapped with the configured Chain.
func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
c.runningMu.Lock()
defer c.runningMu.Unlock()
c.nextID++
entry := &Entry{
ID: c.nextID,
Schedule: schedule,
WrappedJob: c.chain.Then(cmd),
Job: cmd,
}
if !c.running {
// 直接加
c.entries = append(c.entries, entry)
} else {
// 異步
c.add <- entry
}
return entry.ID
}
run 方法
run 方法
cron 的核心 run() 方法的實現(xiàn)如下,這個是很經(jīng)典的 for-select 異步處理模型,避免的對 entries 加鎖,非常值得借鑒。其核心有如下幾點:
一個定時任務(wù)(集)的實現(xiàn),內(nèi)部采用排序數(shù)組,取數(shù)組首位元素的時間作為
timer觸發(fā)時間(感覺可以優(yōu)化為最小堆?)每個 entry都包含了該entry下一次執(zhí)行的絕對時間,本輪執(zhí)行完成后立即計算下一輪時間,等待下次循環(huán)時排序更新每次循環(huán)開始對 cron.entries按下次執(zhí)行時間升序排序,只需要對第一個entry啟動定時器即可定時器事件觸發(fā)時,輪詢 cron.entries里需要執(zhí)行的entries直到第一個不滿足條件的,由于數(shù)組是升序,后面無需再遍歷同時,第一個定時器處理結(jié)束開啟下次定時器時,也只需要更新執(zhí)行過的 cron.entries的Next(下次執(zhí)行時間),不需要更新所有的cron.entriesCron內(nèi)部數(shù)據(jù)結(jié)構(gòu)的維護,采用channel實現(xiàn)無鎖機制,缺點是可能會有誤差(ms級),不過在此項目是能夠容忍的,以Job異步添加為例(運行中添加
entry,走異步方式,有duration的延遲):某個 Job之間的delta差,可能多出了duration的延遲,可以容忍定時器實現(xiàn)里,會掃描所有當(dāng)前時間之前的 cron.entries來執(zhí)行,增加了容錯
func (c *Cron) run() {
c.logger.Info("start")
// 初始化,計算每個 Job 下次的執(zhí)行時間
now := c.now()
for _, entry := range c.entries {
entry.Next = entry.Schedule.Next(now)
c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
}
// 在 dead loop,進行任務(wù)調(diào)度
for {
// 根據(jù)下一次的執(zhí)行時間,對所有 Job 排序
sort.Sort(byTime(c.entries))
// 計時器,用于沒有任務(wù)可調(diào)度時的阻塞操作
var timer *time.Timer
if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
// 無任務(wù)可調(diào)度,設(shè)置計時器到一個很大的值,把下面的 for 阻塞住
timer = time.NewTimer(100000 * time.Hour)
} else {
// 有任務(wù)可調(diào)度了,計時器根據(jù)第一個可調(diào)度任務(wù)的下次執(zhí)行時間設(shè)置
// 排過序,所以第一個肯定是最先被執(zhí)行的
timer = time.NewTimer(c.entries[0].Next.Sub(now))
}
for {
select {
// 有 Job 到了執(zhí)行時間
case now = <-timer.C:
now = now.In(c.location)
c.logger.Info("wake", "now", now)
// 檢查所有 Job,執(zhí)行到時的任務(wù)
for _, e := range c.entries {
// 可能存在相同時間出發(fā)的任務(wù)
if e.Next.After(now) || e.Next.IsZero() {
// 后面都不需要遍歷了!
break
}
// 執(zhí)行 Job 的 func()
c.startJob(e.WrappedJob)
// 保存上次執(zhí)行時間
e.Prev = e.Next
// 設(shè)置 Job 下次的執(zhí)行時間
e.Next = e.Schedule.Next(now)
c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
}
// 添加新 Job
case newEntry := <-c.add:
timer.Stop() // 必須注意,這里停止定時器,避免內(nèi)存泄漏!
now = c.now()
newEntry.Next = newEntry.Schedule.Next(now)
c.entries = append(c.entries, newEntry)
c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
// 獲取所有 Job 的快照
case replyChan := <-c.snapshot:
replyChan <- c.entrySnapshot()
continue
// 停止調(diào)度
case <-c.stop:
timer.Stop()
c.logger.Info("stop")
return
// 根據(jù) entryId 刪除一個 Job
case id := <-c.remove:
timer.Stop()
now = c.now()
c.removeEntry(id)
c.logger.Info("removed", "entry", id)
}
break
}
}
}
上述的代碼的核心流程如下圖:

0x05 小結(jié)
本文分析了基于 Golang 實現(xiàn)的單機定時任務(wù)庫。
0x06 參考
golang cron v3 定時任務(wù)[6] v3-repo[7] Go 每日一庫之 cron[8] GO 編程模式:修飾器[9]
參考資料
cron: https://github.com/robfig/cron/
[2]此文: https://segmentfault.com/a/1190000023029219
[3]實現(xiàn): https://github.com/robfig/cron/blob/v3/constantdelay.go
[4]實現(xiàn): https://pandaychen.github.io/2021/10/05/A-GOLANG-CRONTAB-V3-BASIC-INTRO/
[5]結(jié)構(gòu): https://github.com/robfig/cron/blob/v3/cron.go#L13
[6]golang cron v3 定時任務(wù): https://blog.cugxuan.cn/2020/06/04/Go/golang-cron-v3/
[7]v3-repo: https://github.com/robfig/cron/tree/v3
[8]Go 每日一庫之 cron: https://segmentfault.com/a/1190000023029219
[9]GO 編程模式:修飾器: https://coolshell.cn/articles/17929.html
推薦閱讀
