<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          基于 CRON 庫擴展的分布式 Crontab 的實現(xiàn)

          共 22638字,需瀏覽 46分鐘

           ·

          2022-07-17 17:31

          作者:熊喵君,原文鏈接:https://pandaychen.github.io/2022/01/16/A-GOLANG-CRONTAB-V3-ANALYSIS/

          0x00 前言

          cron[1] 是一個用于管理定時任務(wù)的庫(單機),基于 Golang 實現(xiàn) Linux 中 crontab 的功能

          0x01 使用

          Linux 的 crontab

          crontab 基本格式:

          # 文件格式說明
          # ┌──分鐘(0 - 59)
          # │ ┌──小時(0 - 23)
          # │ │ ┌──日(1 - 31)
          # │ │ │ ┌─月(1 - 12)
          # │ │ │ │ ┌─星期(0 - 6,表示從周日到周六)
          # │ │ │ │ │
          # * * * * * 被執(zhí)行的命令

          基礎(chǔ)例子

          用法極豐富,V3 版本也支持標(biāo)準(zhǔn)的 crontab 格式,具體用法細節(jié)可以參考 此文[2]

          func main() {
              job := cron.New(
                  cron.WithSeconds(), // 添加秒級別支持,默認支持最小粒度為分鐘(如需秒級精度則必須設(shè)置)
              )
              // 每秒鐘執(zhí)行一次
              job.AddFunc("* * * * * *"func() {
                  fmt.Printf("task run: %v\n", time.Now())
              })
              job.Run()   // 啟動
          }

          其他典型的用法還有如下:

          type cronJobDemo int

          func (c cronJobDemo) Run() {
                  fmt.Println("5s func trigger")
                  return
          }

          func main() {
              c := cron.New(
                      cron.WithSeconds(),
              )
              c.AddFunc("0 * * * *"func() { fmt.Println("Every hour on the half hour") })
              c.AddFunc("30 3-6,20-23 * * *"func() { fmt.Println(".. in the range 3-6am, 8-11pm") })
              c.AddFunc("CRON_TZ=Asia/Tokyo 30 04 * * *"func() { fmt.Println("Runs at 04:30 Tokyo time every day") })
              c.AddFunc("@every 5m"func() { fmt.Println("every 5m, start 5m fron now") }) // 容易理解的格式
              // 通過 AddJob 注冊
              var cJob cronJobDemo
              c.AddJob("@every 5s", cJob)
              c.Start()
              // c.Stop()

              select {}
          }

          0x02 代碼分析

          核心數(shù)據(jù)結(jié)構(gòu)

          對于 cron 庫的整體邏輯,最關(guān)鍵的兩個數(shù)據(jù)結(jié)構(gòu)就是 EntryCron

          1、Job:抽象一個定時任務(wù),cron 調(diào)度一個 Job,就去執(zhí)行 JobRun() 方法

          type Job interface {
              Run()
          }

          FuncJobFuncJob 實際就是一個 func() 類型,實現(xiàn)了 Run() 方法:

          type FuncJob func()
          func (f FuncJob) Run() { 
              f() 
          }

          在實際應(yīng)用中,我們需要對 Job 結(jié)構(gòu)做一些擴展,于是就有了 JobWrapper,使用修飾器機制加工 Job(傳入一個 Job,返回一個 Job),有點像 gin 中間件,包裝器可以在執(zhí)行實際的 Job 前后添加一些邏輯,然后使用一個 Chain 將這些 JobWrapper 組合到一起。

          比如給 Job 添加這樣一些屬性:

          • Job 回調(diào)方法中捕獲 panic 異常
          • 如果 Job 上次運行還未結(jié)束,推遲本次執(zhí)行
          • 如果 Job 上次運行還未結(jié)束,跳過本次執(zhí)行
          • 記錄每個 Job 的執(zhí)行情況
          type JobWrapper func(Job) Job

          type Chain struct {
            wrappers []JobWrapper
          }

          func NewChain(c ...JobWrapper) Chain {
            return Chain{c}
          }

          2、Chain 結(jié)構(gòu)ChainJobWrapper 的數(shù)組,調(diào)用 Chain 對象的 Then(j Job) 方法應(yīng)用這些 JobWrapper,返回最終的 Job

          type Chain struct {
            wrappers []JobWrapper
          }

          func NewChain(c ...JobWrapper) Chain {
            return Chain{c}
          }

          func (c Chain) Then(j Job) Job {
            for i := range c.wrappers {
                // 注意:應(yīng)用 JobWrapper 的順序
              j = c.wrappers[len(c.wrappers)-i-1](j "len(c.wrappers "len(c.wrappers)-i-1")-i-1")
            }
            return j
          }

          3、Schedule:描述一個 job 如何循環(huán)執(zhí)行的抽象,需要實現(xiàn)Next方法,此方法返回任務(wù)下次被調(diào)度的時間

          // Schedule describes a job's duty cycle.
          type Schedule interface {
           // Next returns the next activation time, later than the given time.
           // Next is invoked initially, and then each time the job is run.
           Next(time.Time) time.Time
          }

          Scheduler 的實例化結(jié)構(gòu)有:

          • ConstantDelaySchedule實現(xiàn)[3]
          • SpecSchedule實現(xiàn)[4],默認選擇,提供了對 Cron 表達式的解析能力

          4、Entry 結(jié)構(gòu):抽象了一個 job 每當(dāng)使用 AddJob 注冊一個定時調(diào)用策略,就會為該策略生成唯一的 EntryEntry 里會存儲被執(zhí)行的時間、需要被調(diào)度執(zhí)行的實體 Job

          type Entry struct {
              ID EntryID          // job id,可以通過該 id 來刪除 job
              Schedule Schedule   // 用于計算 job 下次的執(zhí)行時間
              Next time.Time      // job 下次執(zhí)行時間
              Prev time.Time      // job 上次執(zhí)行時間,沒執(zhí)行過為 0
              WrappedJob Job      // 修飾器加工過的 job
              Job Job             // 未經(jīng)修飾的 job,可以理解為 AddFunc 的第二個參數(shù)
          }

          5、Cron結(jié)構(gòu)[5]:關(guān)于 Cron 結(jié)構(gòu),有一些細節(jié),entries 為何設(shè)計為一個指針 slice

          // Cron keeps track of any number of entries, invoking the associated func as
          // specified by the schedule. It may be started, stopped, and the entries may
          // be inspected while running.
          type Cron struct {
              entries   []*Entry          // 所有 Job 集合
              chain     Chain             // 裝飾器鏈
              stop      chan struct{}     // 停止信號
              add       chan *Entry       // 用于異步增加 Entry
              remove    chan EntryID      // 用于異步刪除 Entry
              snapshot  chan chan []Entry
              running   bool              // 是否正在運行
              logger    Logger
              runningMu sync.Mutex        // 運行時鎖
              location  *time.Location    // 時區(qū)相關(guān)
              parser    Parser            // Cron 解析器
              nextID    EntryID
              jobWaiter sync.WaitGroup    // 并發(fā)控制,正在運行的 Job
          }

          entries 成員

          剛才說到 entries 為何設(shè)計為指針 slice,原因在于 cron 核心邏輯中,每次循環(huán)開始時都會對 Cron.entries 進行排序,排序字段依賴于每個 Entry 結(jié)構(gòu)的 Next 成員,排序依賴于下面的原則:

          1. 按照觸發(fā)時間正向排序,越先觸發(fā)的越靠前
          2. IsZero 的任務(wù)向后面排
          3. 由于可能存在相同周期的任務(wù) Job,所以排序是不穩(wěn)定的
          // byTime is a wrapper for sorting the entry array by time
          // (with zero time at the end).
          type byTime []*Entry

          func (s byTime) Len() int      { return len(s) }
          func (s byTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
          func (s byTime) Less(i, j int) bool {
           // Two zero times should return false.
           // Otherwise, zero is "greater" than any other time.
           // (To sort it at the end of the list.)
           if s[i].Next.IsZero() {
            return false
           }
           if s[j].Next.IsZero() {
            return true
           }
              // 排序的原則,s[i] 比 s[j] 先觸發(fā)
           return s[i].Next.Before(s[j].Next)
          }

          0x03 內(nèi)置 JobWrapper 介紹

          Recover:捕捉 panic,避免進程異常退出

          此 wrapper 比較好理解,在執(zhí)行內(nèi)層的 Job 邏輯前,添加 recover() 調(diào)用。如果 Job.Run() 執(zhí)行過程中有 panic。這里的 recover() 會捕獲到,輸出調(diào)用堆棧

          // cron.go
          func Recover(logger Logger) JobWrapper {
            return func(j Job) Job {
              return FuncJob(func() {
                defer func() {
                  if r := recover(); r != nil {
                    const size = 64 << 10
                    buf := make([]byte, size)
                    buf = buf[:runtime.Stack(buf, false)]
                    err, ok := r.(error)
                    if !ok {
                      err = fmt.Errorf("%v", r)
                    }
                    logger.Error(err, "panic""stack""...\n"+string(buf))
                  }
                }()
                j.Run()
              })
            }
          }

          DelayIfStillRunning

          實現(xiàn)了已有任務(wù)運行推遲的邏輯。核心是通過一個(任務(wù)共用的)互斥鎖 sync.Mutex,每次執(zhí)行任務(wù)前獲取鎖,執(zhí)行結(jié)束之后釋放鎖。所以在上一個任務(wù)結(jié)束前,下一個任務(wù)獲取鎖會阻塞,從而保證的任務(wù)的串行執(zhí)行。

          // chain.go
          func DelayIfStillRunning(logger Logger) JobWrapper {
            return func(j Job) Job {
              var mu sync.Mutex
              return FuncJob(func() {
                start := time.Now()
                // 下一個任務(wù)阻塞等待獲取鎖
                mu.Lock()
                defer mu.Unlock()
                if dur := time.Since(start); dur > time.Minute {
                  logger.Info("delay""duration", dur)
                }
                j.Run()
              })
            }
          }

          SkipIfStillRunning

          DelayIfStillRunning 機制不一樣,該方法是跳過執(zhí)行,通過無緩沖 channel 機制實現(xiàn)。執(zhí)行任務(wù)時,從通道中取值,如果成功,執(zhí)行,否則跳過。執(zhí)行完成之后再向通道中發(fā)送一個值,確保下一個任務(wù)能執(zhí)行。初始發(fā)送一個值到通道中,保證第一個任務(wù)的執(zhí)行。

          func SkipIfStillRunning(logger Logger) JobWrapper {
            return func(j Job) Job {
              // 定義一個無緩沖 channel
              var ch = make(chan struct{}, 1)
              ch <- struct{}{}
              return FuncJob(func() {
                select {
                case v := <-ch:
                  j.Run()
                  ch <- v
                default:
                  logger.Info("skip")
                }
              })
            }
          }

          0x04 核心方法分析

          AddJob 方法

          AddJob 方法通過兩種方法將任務(wù)節(jié)點 entry 添加到 Cron.entries 中:

          1. 初始化時,直接 append
          2. 運行狀態(tài)下,通過 channel 方式異步添加,避免加鎖
          // AddJob adds a Job to the Cron to be run on the given schedule.
          // The spec is parsed using the time zone of this Cron instance as the default.
          // An opaque ID is returned that can be used to later remove it.
          func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
           schedule, err := c.parser.Parse(spec)
           if err != nil {
            return 0, err
           }
           return c.Schedule(schedule, cmd), nil
          }

          // Schedule adds a Job to the Cron to be run on the given schedule.
          // The job is wrapped with the configured Chain.
          func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
           c.runningMu.Lock()
           defer c.runningMu.Unlock()
           c.nextID++
           entry := &Entry{
            ID:         c.nextID,
            Schedule:   schedule,
            WrappedJob: c.chain.Then(cmd),
            Job:        cmd,
           }
           if !c.running {
                  // 直接加
            c.entries = append(c.entries, entry)
           } else {
                  // 異步
            c.add <- entry
           }
           return entry.ID
          }

          run 方法

          run 方法

          cron 的核心 run() 方法的實現(xiàn)如下,這個是很經(jīng)典的 for-select 異步處理模型,避免的對 entries 加鎖,非常值得借鑒。其核心有如下幾點:

          1. 一個定時任務(wù)(集)的實現(xiàn),內(nèi)部采用排序數(shù)組,取數(shù)組首位元素的時間作為timer觸發(fā)時間(感覺可以優(yōu)化為最小堆?)

            • 每個 entry 都包含了該 entry 下一次執(zhí)行的絕對時間,本輪執(zhí)行完成后立即計算下一輪時間,等待下次循環(huán)時排序更新
            • 每次循環(huán)開始對 cron.entries 按下次執(zhí)行時間升序排序,只需要對第一個 entry 啟動定時器即可
            • 定時器事件觸發(fā)時,輪詢 cron.entries 里需要執(zhí)行的 entries 直到第一個不滿足條件的,由于數(shù)組是升序,后面無需再遍歷
            • 同時,第一個定時器處理結(jié)束開啟下次定時器時,也只需要更新執(zhí)行過的 cron.entriesNext(下次執(zhí)行時間),不需要更新所有的 cron.entries
          2. Cron內(nèi)部數(shù)據(jù)結(jié)構(gòu)的維護,采用channel實現(xiàn)無鎖機制,缺點是可能會有誤差(ms級),不過在此項目是能夠容忍的,以 Job

            異步添加為例(運行中添加entry,走異步方式,有duration的延遲):

            • 某個 Job 之間的 delta 差,可能多出了 duration 的延遲,可以容忍
            • 定時器實現(xiàn)里,會掃描所有當(dāng)前時間之前的 cron.entries 來執(zhí)行,增加了容錯
          func (c *Cron) run() {
              c.logger.Info("start")

              // 初始化,計算每個 Job 下次的執(zhí)行時間
              now := c.now()
              for _, entry := range c.entries {
                  entry.Next = entry.Schedule.Next(now)
                  c.logger.Info("schedule""now", now, "entry", entry.ID, "next", entry.Next)
              }

              // 在 dead loop,進行任務(wù)調(diào)度
              for {
                  // 根據(jù)下一次的執(zhí)行時間,對所有 Job 排序
                  sort.Sort(byTime(c.entries))

                  // 計時器,用于沒有任務(wù)可調(diào)度時的阻塞操作
                  var timer *time.Timer
                  if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
                      // 無任務(wù)可調(diào)度,設(shè)置計時器到一個很大的值,把下面的 for 阻塞住
                      timer = time.NewTimer(100000 * time.Hour)
                  } else {
                      // 有任務(wù)可調(diào)度了,計時器根據(jù)第一個可調(diào)度任務(wù)的下次執(zhí)行時間設(shè)置
                      // 排過序,所以第一個肯定是最先被執(zhí)行的
                      timer = time.NewTimer(c.entries[0].Next.Sub(now))
                  }

                  for {
                      select {
                      // 有 Job 到了執(zhí)行時間
                      case now = <-timer.C:
                          now = now.In(c.location)
                          c.logger.Info("wake""now", now)
                          // 檢查所有 Job,執(zhí)行到時的任務(wù)
                          for _, e := range c.entries {
                              // 可能存在相同時間出發(fā)的任務(wù)
                              if e.Next.After(now) || e.Next.IsZero() {
                                  // 后面都不需要遍歷了!
                                  break
                              }
                              // 執(zhí)行 Job 的 func()
                              c.startJob(e.WrappedJob)

                              // 保存上次執(zhí)行時間
                              e.Prev = e.Next
                              // 設(shè)置 Job 下次的執(zhí)行時間
                              e.Next = e.Schedule.Next(now)
                              c.logger.Info("run""now", now, "entry", e.ID, "next", e.Next)
                          }

                      // 添加新 Job
                      case newEntry := <-c.add:
                          timer.Stop()        // 必須注意,這里停止定時器,避免內(nèi)存泄漏!
                          now = c.now()
                          newEntry.Next = newEntry.Schedule.Next(now)
                          c.entries = append(c.entries, newEntry)
                          c.logger.Info("added""now", now, "entry", newEntry.ID, "next", newEntry.Next)

                      // 獲取所有 Job 的快照
                      case replyChan := <-c.snapshot:
                          replyChan <- c.entrySnapshot()
                          continue

                      // 停止調(diào)度
                      case <-c.stop:
                          timer.Stop()
                          c.logger.Info("stop")
                          return

                      // 根據(jù) entryId 刪除一個 Job
                      case id := <-c.remove:
                          timer.Stop()
                          now = c.now()
                          c.removeEntry(id)
                          c.logger.Info("removed""entry", id)
                      }

                      break
                  }
              }
          }

          上述的代碼的核心流程如下圖:

          image

          0x05 小結(jié)

          本文分析了基于 Golang 實現(xiàn)的單機定時任務(wù)庫。

          0x06 參考

          • golang cron v3 定時任務(wù)[6]
          • v3-repo[7]
          • Go 每日一庫之 cron[8]
          • GO 編程模式:修飾器[9]

          參考資料

          [1]

          cron: https://github.com/robfig/cron/

          [2]

          此文: https://segmentfault.com/a/1190000023029219

          [3]

          實現(xiàn): https://github.com/robfig/cron/blob/v3/constantdelay.go

          [4]

          實現(xiàn): https://pandaychen.github.io/2021/10/05/A-GOLANG-CRONTAB-V3-BASIC-INTRO/

          [5]

          結(jié)構(gòu): https://github.com/robfig/cron/blob/v3/cron.go#L13

          [6]

          golang cron v3 定時任務(wù): https://blog.cugxuan.cn/2020/06/04/Go/golang-cron-v3/

          [7]

          v3-repo: https://github.com/robfig/cron/tree/v3

          [8]

          Go 每日一庫之 cron: https://segmentfault.com/a/1190000023029219

          [9]

          GO 編程模式:修飾器: https://coolshell.cn/articles/17929.html



          推薦閱讀


          福利

          我為大家整理了一份從入門到進階的Go學(xué)習(xí)資料禮包,包含學(xué)習(xí)建議:入門看什么,進階看什么。關(guān)注公眾號 「polarisxu」,回復(fù) ebook 獲取;還可以回復(fù)「進群」,和數(shù)萬 Gopher 交流學(xué)習(xí)。

          瀏覽 9
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  www.caopeng | 日日爽夜夜爽 | 伊人大香蕉在线网 | 在线亚洲视屏 | 俺也来也去成人拍拍网 |