難以駕馭的 Go timer,一文帶你參透計(jì)時(shí)器的奧秘
大家好,我是煎魚(yú)。今天的男主角是計(jì)時(shí)器 timer。
在實(shí)際的應(yīng)用工程中,我們常常會(huì)需要多久后,或定時(shí)去做某個(gè)事情。甚至在分析標(biāo)準(zhǔn)庫(kù) context 的父子級(jí)傳播時(shí),都能見(jiàn)到等待多久后自動(dòng)觸發(fā)取消事件的蹤影。
而在 Go 語(yǔ)言中,能夠完成這類運(yùn)行的功能訴求就是標(biāo)準(zhǔn)庫(kù) time,在具體的功能范疇上我們稱其為 “計(jì)時(shí)器“,是一個(gè)非常具有價(jià)值的一個(gè)模塊。在這篇文章中我們將對(duì)其做進(jìn)一步的分析和研討。
什么是 timer
可以控制時(shí)間,確保應(yīng)用程序中的某段代碼在某個(gè)時(shí)刻運(yùn)行。在 Go 語(yǔ)言中可以單次執(zhí)行,也可以循環(huán)執(zhí)行。
最常見(jiàn)的方式就是引用標(biāo)準(zhǔn)庫(kù) time 去做一些事情,普通開(kāi)發(fā)者經(jīng)常使用到的標(biāo)準(zhǔn)庫(kù)代碼是:
time.Now().Unix()
上述代碼可用于獲取當(dāng)前時(shí)間的 Unix 時(shí)間戳,而在內(nèi)部的具體實(shí)現(xiàn)上提供了 Time、Timer 以及 Ticker 的各類配套方法。
timer 基本特性
Timer
演示代碼:
func main() {
timer := time.NewTimer(2 * time.Second)
<-timer.C
fmt.Println("我的腦子真的進(jìn)煎魚(yú)了!")
}
輸出結(jié)果:
// 等待兩秒...
我的腦子真的進(jìn)煎魚(yú)了!
我們可以通過(guò) time.NewTimer 方法定時(shí)在 2 秒進(jìn)行程序的執(zhí)行。而其還有個(gè)變種的用法,在做 channel 的源碼剖析時(shí)有發(fā)現(xiàn)
func main() {
v := make(chan struct{})
timer := time.AfterFunc(2*time.Second, func() {
fmt.Println("我想在這個(gè)點(diǎn)吃煎魚(yú)!")
v <- struct{}{}
})
defer timer.Stop()
<-v
}
在等待 2 秒后,會(huì)立即調(diào)用 time.AfterFunc 所對(duì)應(yīng)的匿名方法。在時(shí)間上我們也可以指定對(duì)應(yīng)的具體時(shí)間,達(dá)到異步的定時(shí)執(zhí)行等訴求。
Ticker
演示代碼:
func main() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
done := make(chan bool)
go func() {
time.Sleep(10 * time.Second)
done <- true
}()
for {
select {
case <-done:
fmt.Println("Done!")
return
case t := <-ticker.C:
fmt.Println("炸煎魚(yú): ", t.Unix())
}
}
}
輸出結(jié)果:
// 每隔一秒輸出一次
炸煎魚(yú): 1611666168
炸煎魚(yú): 1611666169
炸煎魚(yú): 1611666170
炸煎魚(yú): 1611666171
...
我們通過(guò) time.NewTicker 方法設(shè)定每 1 秒執(zhí)行一次方法,因此在 for-select 中,我們會(huì)每 1 秒就可以自動(dòng) “炸一條煎魚(yú)”,真是快樂(lè)極了。
而由于我們?cè)?goroutine 中通過(guò) sleep 方法的設(shè)定了 done 變量的輸入,因此在 10 秒后就會(huì)結(jié)束炸煎魚(yú)的循環(huán)輸出,最終退出。
最小堆:四叉堆
在 Go 語(yǔ)言中,內(nèi)置計(jì)時(shí)器的數(shù)據(jù)結(jié)構(gòu)都會(huì)涉及到最小四叉堆,如下圖所示:

整體來(lái)講就是父節(jié)點(diǎn)一定比其子節(jié)點(diǎn)小,子節(jié)點(diǎn)之間沒(méi)有任何關(guān)系和大小的要求。
數(shù)據(jù)結(jié)構(gòu)
在 Go 語(yǔ)言中每個(gè)計(jì)時(shí)器運(yùn)行時(shí)的基本單元是 runtime.timer:
type timer struct {
pp puintptr
when int64
period int64
f func(interface{}, uintptr)
arg interface{}
seq uintptr
nextwhen int64
status uint32
}
pp:計(jì)時(shí)器所在的處理器 P 的指針地址。 when:計(jì)時(shí)器被喚醒的時(shí)間。 period:計(jì)時(shí)器再次被喚醒的時(shí)間(when+period)。 f:回調(diào)函數(shù),每次在計(jì)時(shí)器被喚醒時(shí)都會(huì)調(diào)用。 arg:回調(diào)函數(shù)的參數(shù),每次在計(jì)時(shí)器被喚醒時(shí)會(huì)將該參數(shù)項(xiàng)傳入回調(diào)函數(shù) f中。seq:回調(diào)函數(shù)的參數(shù),該參數(shù)僅在 netpoll的應(yīng)用場(chǎng)景下使用。nextwhen:當(dāng)計(jì)時(shí)器狀態(tài)為 timerModifiedXX 時(shí),將會(huì)使用 nextwhen的值設(shè)置到where字段上。status:計(jì)時(shí)器的當(dāng)前狀態(tài)值,計(jì)時(shí)器本身包含大量的枚舉標(biāo)識(shí),這塊會(huì)在后面介紹。
但這類基本單元都不會(huì)是對(duì)用戶端暴露的結(jié)構(gòu)體,在對(duì)外上我們直觀見(jiàn)的最多的是 time.NewTimer 所創(chuàng)建的 Timer 結(jié)構(gòu)體:
type Timer struct {
C <-chan Time
r runtimeTimer
}
C:用于接收 Timer所觸發(fā)的事件,當(dāng)計(jì)時(shí)器的消息事件(例如:到期)發(fā)生時(shí),該 channel 會(huì)接收到通知。r:與 runtime.timer作用類似,內(nèi)在屬性保持一致。
同時(shí)在計(jì)時(shí)器運(yùn)行模式上自 Go1.14 起發(fā)生了變更,runtime.timer 改為將每個(gè) timer 均存儲(chǔ)在對(duì)應(yīng)的處理器 P 中
type p struct {
...
timersLock mutex
timers []*timer
...
}
在處理器 P 上,timers 字段就是一個(gè)以最小四叉堆形式存儲(chǔ)的媒介。在時(shí)序上,需要立刻執(zhí)行,或說(shuō)需要越早執(zhí)行的,就越排在堆的越上面:

實(shí)現(xiàn)原理
在了解了計(jì)時(shí)器的基本特性和數(shù)據(jù)結(jié)構(gòu)后,我們進(jìn)一步展開(kāi),一層層剖析其原理,看看其是何物。在 Go 語(yǔ)言中,計(jì)時(shí)器在運(yùn)行時(shí)涉及十種狀態(tài)處理,分別涉及增、刪、改以及重置等操作。
計(jì)時(shí)器所包含的狀態(tài)如下:
| 狀態(tài)名 | 含義 |
|---|---|
| timerNoStatus | 計(jì)時(shí)器尚未設(shè)置狀態(tài) |
| timerWaiting | 等待計(jì)時(shí)器啟動(dòng) |
| timerRunning | 運(yùn)行計(jì)時(shí)器的回調(diào)方法 |
| timerDeleted | 計(jì)時(shí)器已經(jīng)被刪除,但仍然在某些 P 的堆中 |
| timerRemoving | 計(jì)時(shí)器即將被刪除 |
| timerRemoved | 計(jì)時(shí)器已經(jīng)停止,且不在任何 P 的堆中 |
| timerModifying | 計(jì)時(shí)器正在被修改 |
| timerModifiedEarlier | 計(jì)時(shí)器已被修改為更早的時(shí)間 |
| timerModifiedLater | 計(jì)時(shí)器已被修改為更晚的時(shí)間 |
| timerMoving | 計(jì)時(shí)器已經(jīng)被修改,正在被移動(dòng) |
這時(shí)候可能就會(huì)有小伙伴疑惑,各種啟動(dòng)、刪除、停止、啟動(dòng)是指代的是什么意思?為什么會(huì)涉及到 P 的管理?
創(chuàng)建計(jì)時(shí)器
接下來(lái)我們依然是從 NewTimer 和 NewTicker 方法開(kāi)始入手:
func NewTimer(d Duration) *Timer {
c := make(chan Time, 1)
t := &Timer{
C: c,
r: runtimeTimer{
when: when(d),
f: sendTime,
arg: c,
},
}
startTimer(&t.r)
return t
}
在該方法中,其主要包含如下動(dòng)作:
創(chuàng)建 Timer對(duì)象,主要是C和r屬性,含義與前面所表述的一致。調(diào)用 startTimer方法,啟動(dòng)計(jì)時(shí)器。
NewTicker 方法與 NewTimer 類似,主要是增加了 period 字段:
func NewTicker(d Duration) *Ticker {
c := make(chan Time, 1)
t := &Ticker{
C: c,
r: runtimeTimer{
when: when(d),
period: int64(d),
f: sendTime,
arg: c,
},
}
startTimer(&t.r)
return t
}
在 Ticker 結(jié)構(gòu)體中,period 字段用于表示計(jì)時(shí)器再次被喚醒的時(shí)間,可以便于做輪詢觸發(fā)。
啟動(dòng)計(jì)時(shí)器
在前面調(diào)用 NewTimer、NewTicker 方法時(shí),會(huì)將新創(chuàng)建的新計(jì)時(shí)器 timer 加入到創(chuàng)建 timer 的 P 的最小堆中:
func addtimer(t *timer) {
if t.when < 0 {
t.when = maxWhen
}
if t.status != timerNoStatus {
throw("addtimer called with initialized timer")
}
t.status = timerWaiting
when := t.when
pp := getg().m.p.ptr()
lock(&pp.timersLock)
cleantimers(pp)
doaddtimer(pp, t)
unlock(&pp.timersLock)
wakeNetPoller(when)
}
檢查是否滿足基本條件:新增計(jì)時(shí)器的邊界處理, timerNoStatus狀態(tài)判斷排除。調(diào)用 cleantimers方法:清理處理器 P 中的計(jì)時(shí)器隊(duì)列,可以加快創(chuàng)建和刪除計(jì)時(shí)器的程序的速度。調(diào)用 doaddtimer方法:將當(dāng)前所新創(chuàng)建的timer新增到當(dāng)前處理器 P 的堆中。調(diào)用 wakeNetPoller方法:?jiǎn)拘丫W(wǎng)絡(luò)輪詢器中休眠的線程,檢查計(jì)時(shí)器被喚醒的時(shí)間(when)是否在當(dāng)前輪詢預(yù)期運(yùn)行的時(shí)間(pollerPollUntil)內(nèi),若是喚醒。
停止計(jì)時(shí)器
在計(jì)時(shí)器的運(yùn)轉(zhuǎn)中,一般會(huì)調(diào)用 timer.Stop() 方法來(lái)停止/終止/刪除計(jì)時(shí)器。雖然說(shuō)法多樣。但大家的真實(shí)目的是一樣的,就是讓這個(gè) timer 從輪詢器中消失,也就是從處理器 P 的堆中移除 timer:
func deltimer(t *timer) bool {
for {
switch s := atomic.Load(&t.status); s {
case timerWaiting, timerModifiedLater:
// timerWaiting/timerModifiedLater -> timerDeleted
...
case timerModifiedEarlier:
// timerModifiedEarlier -> timerModifying -> timerDeleted
...
case timerDeleted, timerRemoving, timerRemoved:
// timerDeleted/timerRemoving/timerRemoved
return false
case timerRunning, timerMoving:
// timerRunning/timerMoving
osyield()
case timerNoStatus:
return false
case timerModifying:
osyield()
default:
badTimer()
}
}
}
但移除也不是直接一個(gè) delete 就完事的,其在真正的刪除方法 deltimer 中遵循了基本的規(guī)則處理:
timerWaiting/timerModifiedLater -> timerDeleted。 timerModifiedEarlier -> timerModifying -> timerDeleted。 timerDeleted/timerRemoving/timerRemoved -> 無(wú)需變更,已經(jīng)滿足條件。 timerRunning/timerMoving/timerModifying -> 正在執(zhí)行、移動(dòng)中,無(wú)法停止,等待下一次狀態(tài)檢查再處理。 timerNoStatus -> 無(wú)法停止,不滿足條件。
上述五個(gè)基本流轉(zhuǎn)邏輯就覆蓋了 runtimer.deltimer 方法了,若有進(jìn)一步需求的可通過(guò)傳送門詳細(xì)閱讀。
修改/重置計(jì)時(shí)器
在應(yīng)用程序的調(diào)度中,有時(shí)候因?yàn)檫壿嫯a(chǎn)生了變更,我們需要重置計(jì)時(shí)器。這時(shí)候一般會(huì)調(diào)用 timer.Reset() 方法來(lái)重新設(shè)置 Duration 值。
其表面對(duì)應(yīng)的是 resetTimer 方法,但實(shí)際與修改計(jì)時(shí)器的 modtimer 方法是共用的:
func resettimer(t *timer, when int64) bool {
return modtimer(t, when, t.period, t.f, t.arg, t.seq)
}
因此在這節(jié)中我們可以將重置和修改計(jì)時(shí)器放在一起分析。修改計(jì)時(shí)器,本質(zhì)上是需要變更現(xiàn)有計(jì)時(shí)器,而在 Go 語(yǔ)言的計(jì)時(shí)器中是需要遵循基本規(guī)則,因此 modtimer 遵循下述規(guī)則處理:
timerWaiting -> timerModifying -> timerModifiedXX timerModifiedXX -> timerModifying -> timerModifiedYY timerNoStatus -> timerModifying -> timerWaiting timerRemoved -> timerModifying -> timerWaiting timerDeleted -> timerModifying -> timerModifiedXX timerRunning -> 等待狀態(tài)改變,才可以進(jìn)行下一步 timerMoving -> 等待狀態(tài)改變,才可以進(jìn)行下一步 timerRemoving -> 等待狀態(tài)改變,才可以進(jìn)行下一步 timerModifying -> 等待狀態(tài)改變,才可以進(jìn)行下一步
func modtimer(t *timer, when, period int64, f func(interface{}, uintptr), arg interface{}, seq uintptr) bool {
...
if wasRemoved {
t.when = when
pp := getg().m.p.ptr()
lock(&pp.timersLock)
doaddtimer(pp, t)
unlock(&pp.timersLock)
releasem(mp)
wakeNetPoller(when)
} else {
t.nextwhen = when
newStatus := uint32(timerModifiedLater)
if when < t.when {
newStatus = timerModifiedEarlier
}
...
releasem(mp)
if newStatus == timerModifiedEarlier {
wakeNetPoller(when)
}
}
return pending
}
在完成了計(jì)時(shí)器的狀態(tài)處理后,會(huì)分為兩種情況處理:
待修改的計(jì)時(shí)器已經(jīng)被刪除:由于既有的計(jì)時(shí)器已經(jīng)沒(méi)有了,因此會(huì)調(diào)用 doaddtimer方法創(chuàng)建一個(gè)新的計(jì)時(shí)器,并將原本的timer屬性賦值過(guò)去,再調(diào)用wakeNetPoller方法在預(yù)定時(shí)間喚醒網(wǎng)絡(luò)輪詢。正常邏輯處理:如果修改后的計(jì)時(shí)器的觸發(fā)時(shí)間小于原本的觸發(fā)時(shí)間,則修改該計(jì)時(shí)器的狀態(tài)為 timerModifiedEarlier,并且調(diào)用wakeNetPoller方法在預(yù)定時(shí)間喚醒網(wǎng)絡(luò)輪詢。
觸發(fā)計(jì)時(shí)器
在前面有提到 Go1.14 后,Go Timer 都已經(jīng)歸屬到各個(gè)處理器 P 中去了,因此計(jì)時(shí)器的觸發(fā)分為了兩個(gè)部分:
通過(guò)調(diào)度器在調(diào)度時(shí)進(jìn)行計(jì)時(shí)器的觸發(fā)。 通過(guò)系統(tǒng)監(jiān)控檢查并觸發(fā)計(jì)時(shí)器(到期未執(zhí)行)。
調(diào)度器觸發(fā)
調(diào)度器的觸發(fā)一共分兩種情況,一種是在調(diào)度循環(huán)的時(shí)候調(diào)用 checkTimers 方法進(jìn)行計(jì)時(shí)器的觸發(fā):
func schedule() {
_g_ := getg()
top:
pp := _g_.m.p.ptr()
pp.preempt = false
// 處理調(diào)度時(shí)的計(jì)時(shí)器觸發(fā)
checkTimers(pp, 0)
...
execute(gp, inheritTime)
}
另外一種是當(dāng)前處理器 P 沒(méi)有可執(zhí)行的 Timer,且沒(méi)有可執(zhí)行的 G。那么按照調(diào)度模型,就會(huì)去竊取其他計(jì)時(shí)器和 G:
func findrunnable() (gp *g, inheritTime bool) {
_g_ := getg()
top:
_p_ := _g_.m.p.ptr()
...
now, pollUntil, _ := checkTimers(_p_, 0)
...
}
調(diào)度系統(tǒng)在計(jì)時(shí)器處不深究,我們進(jìn)一步剖析具體觸發(fā)計(jì)時(shí)器的 checkTimers 方法:
func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
if atomic.Load(&pp.adjustTimers) == 0 {
next := int64(atomic.Load64(&pp.timer0When))
if next == 0 {
return now, 0, false
}
if now == 0 {
now = nanotime()
}
if now < next {
if pp != getg().m.p.ptr() || int(atomic.Load(&pp.deletedTimers)) <= int(atomic.Load(&pp.numTimers)/4) {
return now, next, false
}
}
}
lock(&pp.timersLock)
adjusttimers(pp)
...
}
起始先通過(guò) pp.adjustTimers檢查當(dāng)前處理器 P 中是否有需要處理的計(jì)時(shí)器。若無(wú)需執(zhí)行的計(jì)時(shí)器,則直接返回。 若有,則判斷下一個(gè)計(jì)時(shí)器待刪除的計(jì)時(shí)器和處理器 P 上的計(jì)時(shí)器數(shù)量,若前者小于后者 1/4 則直接返回。 確定需要處理計(jì)時(shí)器后,通過(guò)調(diào)用 adjusttimers方法重新根據(jù)時(shí)間將timers切片中timer的先后順序重新排列(相當(dāng)于 resort)。
func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
...
rnow = now
if len(pp.timers) > 0 {
if rnow == 0 {
rnow = nanotime()
}
for len(pp.timers) > 0 {
if tw := runtimer(pp, rnow); tw != 0 {
if tw > 0 {
pollUntil = tw
}
break
}
ran = true
}
}
...
}
在前面調(diào)整了 timers 切片中的最小堆的排序后,將會(huì)調(diào)用 runtimer 方法去真正運(yùn)行所需要執(zhí)行的 timer,完成觸計(jì)時(shí)器的發(fā)。
func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
...
if pp == getg().m.p.ptr() && int(atomic.Load(&pp.deletedTimers)) > len(pp.timers)/4 {
clearDeletedTimers(pp)
}
unlock(&pp.timersLock)
return rnow, pollUntil, ran
}
在最后掃尾階段,如果當(dāng)前 G 的處理器與調(diào)用 checkTimers 方法所傳入的處理器一致,并且處理器中 timerDeleted 狀態(tài)的計(jì)時(shí)器數(shù)量是處理器 P 堆中的計(jì)時(shí)器的 1/4 以上,則調(diào)用 clearDeletedTimers 方法對(duì)已為刪除狀態(tài)的的計(jì)時(shí)器進(jìn)行清理。
系統(tǒng)監(jiān)控觸發(fā)
即使是通過(guò)每次調(diào)度器調(diào)度和竊取的時(shí)候觸發(fā),但畢竟是具有一定的隨機(jī)和不確定性。
因此系統(tǒng)監(jiān)控觸發(fā)依然是一個(gè)兜底保障,在 Go 語(yǔ)言中 runtime.sysmon 方法承擔(dān)了這一個(gè)責(zé)任,存在觸發(fā)計(jì)時(shí)器的邏輯:
func sysmon() {
...
for {
...
next, _ := timeSleepUntil()
if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) {
lock(&sched.lock)
if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {
if next > now {
...
next, _ = timeSleepUntil()
lock(&sched.lock)
atomic.Store(&sched.sysmonwait, 0)
noteclear(&sched.sysmonnote)
}
idle = 0
delay = 20
}
unlock(&sched.lock)
}
...
}
}
在每次進(jìn)行系統(tǒng)監(jiān)控時(shí),都會(huì)在流程上調(diào)用 timeSleepUntil 方法去獲取下一個(gè)計(jì)時(shí)器應(yīng)觸發(fā)的時(shí)間,以及保存該計(jì)時(shí)器已打開(kāi)的計(jì)時(shí)器堆的 P。
在獲取完畢后會(huì)馬上檢查當(dāng)前是否存在 GC,若是正在 STW 則獲取調(diào)度互斥鎖。若發(fā)現(xiàn)下一個(gè)計(jì)時(shí)器的觸發(fā)時(shí)間已經(jīng)過(guò)去,則重新調(diào)用 timeSleepUntil 獲取下一個(gè)計(jì)時(shí)器的時(shí)間和相應(yīng) P 的地址。
func sysmon() {
...
for {
...
lock(&sched.sysmonlock)
{
now1 := nanotime()
if now1-now > 50*1000 /* 50μs */ {
next, _ = timeSleepUntil()
}
now = now1
}
...
}
}
檢查 sched.sysmonlock 所花費(fèi)的時(shí)間是否超過(guò) 50μs。若是,則有可能前面所獲取的下一個(gè)計(jì)時(shí)器觸發(fā)時(shí)間已過(guò)期,因此重新調(diào)用 timeSleepUntil 方法再次獲取。
func sysmon() {
...
for {
...
lastpoll := int64(atomic.Load64(&sched.lastpoll))
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
list := netpoll(0) // non-blocking - returns list of goroutines
if !list.empty() {
incidlelocked(-1)
injectglist(&list)
incidlelocked(1)
}
}
if next < now {
startm(nil, false)
}
}
}
如果發(fā)現(xiàn)超過(guò) 10ms 的時(shí)間沒(méi)有進(jìn)行 netpoll 網(wǎng)絡(luò)輪詢,則主動(dòng)調(diào)用 netpoll 方法觸發(fā)輪詢。
同時(shí)如果存在不可搶占的處理器 P,則調(diào)用 startm 方法來(lái)運(yùn)行那些應(yīng)該運(yùn)行,但沒(méi)有在運(yùn)行的計(jì)時(shí)器。
運(yùn)行計(jì)時(shí)器
runtimer 方法主要承擔(dān)計(jì)時(shí)器的具體運(yùn)行,同時(shí)也會(huì)針對(duì)計(jì)時(shí)器的不同狀態(tài)(含刪除、修改、等待等)都進(jìn)行了對(duì)應(yīng)的處理,也相當(dāng)于是個(gè)大的集中處理中樞了。例如在 timerDeleted 狀態(tài)下的計(jì)時(shí)器將會(huì)進(jìn)行刪除。
其遵循下述規(guī)則處理
timerNoStatus -> 恐慌:計(jì)時(shí)器未初始化 timerWaiting -> timerWaiting timerWaiting -> timerRunning -> timerNoStatus timerWaiting -> timerRunning -> timerWaiting timerModifying -> 等待狀態(tài)改變,才可以進(jìn)行下一步 timerModifiedXX -> timerMoving -> timerWaiting timerDeleted -> timerRemoving -> timerRemoved timerRunning -> 恐慌:并發(fā)調(diào)用 timerRemoved -> 恐慌:計(jì)時(shí)器堆不一致 timerRemoving -> 恐慌:計(jì)時(shí)器堆不一致 timerMoving -> 恐慌:計(jì)時(shí)器堆不一致
我們?cè)俑鶕?jù)時(shí)間狀態(tài)機(jī),去針對(duì)性的看看源碼是如何實(shí)現(xiàn)的:
func runtimer(pp *p, now int64) int64 {
for {
t := pp.timers[0]
switch s := atomic.Load(&t.status); s {
case timerWaiting:
if t.when > now {
return t.when
}
runOneTimer(pp, t, now)
return 0
case timerDeleted:
...
case timerModifiedEarlier, timerModifiedLater:
...
case timerModifying:
osyield()
case timerNoStatus, timerRemoved:
badTimer()
case timerRunning, timerRemoving, timerMoving:
badTimer()
default:
badTimer()
}
}
}
我們主要關(guān)注運(yùn)行計(jì)時(shí)器,也就是 timerWaiting 狀態(tài)下的處理,其首先會(huì)對(duì)觸發(fā)時(shí)間(when)進(jìn)行判定,若大于當(dāng)前時(shí)間則直接返回(因?yàn)樗栌|發(fā)的時(shí)間未到)。否則將會(huì)調(diào)用 runOneTimer 方法去執(zhí)行本次觸發(fā):
func runOneTimer(pp *p, t *timer, now int64) {
f := t.f
arg := t.arg
seq := t.seq
if t.period > 0 {
delta := t.when - now
t.when += t.period * (1 + -delta/t.period)
siftdownTimer(pp.timers, 0)
if !atomic.Cas(&t.status, timerRunning, timerWaiting) {
badTimer()
}
updateTimer0When(pp)
} else {
dodeltimer0(pp)
}
unlock(&pp.timersLock)
f(arg, seq)
lock(&pp.timersLock)
}
如果 period大于 0,說(shuō)明當(dāng)前是 ticker,需要再次觸發(fā),因此還需要調(diào)整計(jì)時(shí)器的狀態(tài)。重新計(jì)算下一次的觸發(fā)時(shí)間,并且更新其在最小堆的位置。 調(diào)用 atomic.Cas方法該計(jì)時(shí)器的狀態(tài)從timerRunning原子修改為timerWaiting狀態(tài)。調(diào)用 updateTimer0When方法設(shè)置處理器 P 的timer0When字段。如果 period等于 0,說(shuō)明當(dāng)前是 timer,只需要單次觸發(fā)就可以了。
在完成計(jì)時(shí)器的運(yùn)行屬性更新后,上互斥鎖,調(diào)用計(jì)時(shí)器的回調(diào)方法 f,完成本次完整的觸發(fā)流程。
總結(jié)
Go 語(yǔ)言的 Timer 其實(shí)已經(jīng)改過(guò)了好幾版,在 Go1.14 的正式大改版后。目前來(lái)看已經(jīng)初步的到了一個(gè)新的階段。其設(shè)計(jì)的模式主要圍繞三塊:
在各個(gè)處理器 P 中,Timer 以最小四叉堆的存儲(chǔ)方式在 timers 中。 在調(diào)度器的每輪調(diào)度中都會(huì)對(duì)計(jì)時(shí)器進(jìn)行觸發(fā)和檢查。 在系統(tǒng)監(jiān)聽(tīng)上 netpoll會(huì)定時(shí)進(jìn)行計(jì)時(shí)器的觸發(fā)和檢查。在計(jì)時(shí)器的處理中,十個(gè)狀態(tài)的流轉(zhuǎn)和對(duì)應(yīng)處理非常重要。
?? 點(diǎn)擊關(guān)注煎魚(yú),在知識(shí)的海洋里遨游
