你真的了解 sync.Once 嗎

來這家公司轉(zhuǎn)型做go大概一個多月了吧,工作中也是邊寫邊學(xué),最近也是在極客時間學(xué)習(xí)一些go相關(guān)課程,現(xiàn)學(xué)現(xiàn)用,源碼在我 github 上。
是什么
引用官方描述的一段話,Once is?a?object?that will?perform exactly?one action,即它是一個對象,它提供了保證某個動作只被執(zhí)行一次的功能。最典型的場景當(dāng)然就是單例對象的初始化操作。
咋么做
Once?的代碼很簡潔,從頭到尾加注釋不超過 70 行代碼。對外暴露了一個唯一接口 Do(f func())?,使用起來也是非常簡單。
package mainimport ("fmt""sync")func main() {var once sync.Oncefun1 := func() {fmt.Println("第一次打印")}??once.Do(fun1)??fun2 := func() {fmt.Println("第二次打印")??}once.Do(fun2)}
在運行上面這段代碼之后,從結(jié)果中你會發(fā)現(xiàn)只運行了?fun1。這樣看好像沒什么問題,但是這段代碼并不是并發(fā)的調(diào)用 Do()?,那就稍微調(diào)整一下代碼:
package mainimport ("fmt""sync""time")func main() {var once sync.Oncefor i := 0; i < 5; i++ {go func(i int) {fun1 := func() {????????fmt.Printf("i:=%d\n",?i)}once.Do(fun1)}(i)}??//?為了防止主goroutine直接運行完了,啥都看不到time.Sleep(50 * time.Millisecond)}
我們開啟了5個并發(fā)的?goroutine?,不管你咋么運行,始終只打印一次,至于?i 是多少,就看先執(zhí)行的是哪個?g?了。Once 保證只有第一次調(diào)用?Do()?方法時,傳遞的 f?(無參數(shù)無返回值的函數(shù))?才會執(zhí)行,并且之后不管調(diào)用的參數(shù)是否改變了,也不再執(zhí)行。
咋么實現(xiàn)
在看一個功能的同時,其實我們本身也可以站在技術(shù)的角度上來思考,如果是你,你會咋么實現(xiàn)這個?Once。我覺得這是件很有意思的事情。
第一時間想到的就是 go 中開箱即用的 sync.Mutex 的? Lock()?方法的第一段:
// Lock locks m.// If the lock is already in use, the calling goroutine// blocks until the mutex is available.func (m *Mutex) Lock() {// Fast path: grab unlocked mutex.??if?atomic.CompareAndSwapInt32(&m.state,?0,?mutexLocked)?{? ?......return}......}
利用?atomic?的原子操作來實現(xiàn)這個需求。這確實可以保證只執(zhí)行一次。但是也存在一個巨大的坑,我們來驗證下:
package mainimport ("fmt""net""sync/atomic""time")type OnceA struct {done uint32}func (o *OnceA) Do(f func()) {if !atomic.CompareAndSwapUint32(&o.done, 0, 1) {return}f()}func main() {var once OnceAvar conn net.Conngo func() {????fun1?:=?func()?{??????time.Sleep(5?*?time.Second)?//模擬初始化的速度很慢conn, _ = net.DialTimeout("tcp", "baidu.com:80", time.Second)}once.Do(fun1)}()??time.Sleep(500?*?time.Millisecond)fun2 := func() {fmt.Println("執(zhí)行fun2")conn, _ = net.DialTimeout("tcp", "baidu.com:80", time.Second)}??//再調(diào)用do已經(jīng)檢查到done為1了??once.Do(fun2)_, err := conn.Write([]byte("\"GET / HTTP/1.1\\r\\nHost: baidu.com\\r\\n Accept: */*\\r\\n\\r\\n\""))if err != nil {fmt.Println("err:", err)}}
conn?是一個?net.Conn?的接口類型變量,這里為了達到效果,通過?sleep?模擬了初始化資源的耗時?,當(dāng)?fun2()?想要進行初始化的時候,已然發(fā)現(xiàn) done?的值是 1 了,但是?fun1 初始化速度很慢,導(dǎo)致接下來操作 conn.Write 的時候,因為此時?conn?還是一個空資源,最終運行時拋出空指針的?panic 了。
這個問題的原因在于真正使用資源的時候,資源初始化還沒到位,真是尷尬?。
那么 Go 是如何避免這種問題的呢?
// Copyright 2009 The Go Authors. All rights reserved.// Use of this source code is governed by a BSD-style// license that can be found in the LICENSE file.package syncimport ("sync/atomic")// Once is an object that will perform exactly one action.type?Once?struct?{done uint32m Mutex}func (o *Once) Do(f func()) {// Note: Here is an incorrect implementation of Do://// if atomic.CompareAndSwapUint32(&o.done, 0, 1) {// f()// }//// Do guarantees that when it returns, f has finished.// This implementation would not implement that guarantee:// given two simultaneous calls, the winner of the cas would// call f, and the second would return immediately, without// waiting for the first's call to f to complete.// This is why the slow path falls back to a mutex, and why// the atomic.StoreUint32 must be delayed until after f returns.if atomic.LoadUint32(&o.done) == 0 {// Outlined slow-path to allow inlining of the fast-path.o.doSlow(f)}}func (o *Once) doSlow(f func()) {o.m.Lock()defer o.m.Unlock()if o.done == 0 {defer atomic.StoreUint32(&o.done, 1)f()}}
你看大佬都直接注釋貼心的告訴你?if atomic.CompareAndSwapUint32(&o.done, 0, 1) 這個不是正確的實現(xiàn)。并發(fā)的情況下,勝者獲得調(diào)用?f?,但是第二個會直接返回,沒有等待第一個初始化結(jié)束。
所以 Once?實現(xiàn)使用了一個互斥鎖,互斥鎖保證了只有一個?g?初始化,同時采取的是雙檢查的機制,再次判斷?Once.done?是否為0,如果為 0,代表第一次初始化,等到初始化結(jié)束之后,再釋放鎖。并發(fā)情況下,其他的 g?就會被阻塞在 o.m.Lock()。
如何避坑
說是避坑,但是絕大多數(shù)的坑都是由于程序員自身代碼問題所導(dǎo)致的,雖然有點尷尬,但確實如此。?Once 的“坑” 還算少的,不像?sync.Mutex 和 Channel?那樣,稍微姿勢不注意點就?panic?了。這一塊后續(xù)再寫文章介紹下。除了上面需要注意的使用資源的時候資源還未初始化完成的問題,在?Once?中還需要避免的是死鎖問題
//?由于嵌套調(diào)用?Do 里面的 lock導(dǎo)致死鎖func ErrOne() {??var?o?sync.Onceo.Do(func() {o.Do(func() {fmt.Println("初始化")})})}
?Do?調(diào)用了?f,f 里面又調(diào)用了?Do,最終導(dǎo)致死鎖。我把上面的代碼簡化成下面這樣
package mainimport "sync"func main() {var mu sync.Mutexmu.Lock()mu.Lock()}
避免這種錯誤也很簡單,不要在?f?函數(shù)中再次調(diào)用當(dāng)前的?Once?即可。
延伸
上面有提到過,Once.Do?由于某些原因?qū)е鲁跏蓟。窃膯栴}在于,后續(xù)再也沒有機會執(zhí)行同一個?Once.Do 了,發(fā)生這樣的情況,理想的處理是,只有真正初始化成功,才設(shè)置?Done?的值,并且如果初始化失敗,理應(yīng)通知到上游服務(wù),這樣上游服務(wù)可以做一些重試機制或者異常處理等操作。
package mainimport ("fmt""io""net""os""sync""sync/atomic""time")type Once struct {done uint32m sync.Mutex}//?傳入的f?有返回值,如果初始化失敗,返回對應(yīng)error,//?Do方法再把這個err返回給上游服務(wù)func?(o?*Once)?Do(f?func()?error)?error?{if atomic.LoadUint32(&o.done) == 1 { //fast pathreturn nil}??return?o.doSlow(f)}func?(o?*Once)?doSlow(f?func()?error)?error?{o.m.Lock()defer o.m.Unlock()var err error??if?o.done?==?0?{?//雙檢查,還沒有初始化????err?=?f()????if?err?==?nil?{?//?只有真正初始化成功才把 done 的值改成1atomic.StoreUint32(&o.done, 1)}}return err}
我們改變了f?函數(shù),增加了一個返回值,在初始化失敗之后返回給?Do?函數(shù),由?Do?函數(shù)再把錯誤返回給上游的調(diào)用方,把控制權(quán)交還給調(diào)用方做失敗的處理。另外改動的一點是,只有真正初始化成功之后才把?Done 的值改成1。那么我們可以簡單的把上面的業(yè)務(wù)代碼改造一下:
package mainimport ("fmt""io""net""os""sync""sync/atomic""time")type Once struct {done uint32m sync.Mutex}// 傳入的f 有返回值,如果初始化失敗,返回對應(yīng)error,// Do方法再把這個err返回給上游服務(wù)func (o *Once) Do(fn func() error) error {if atomic.LoadUint32(&o.done) == 1 {return nil}return o.doSlow(fn)}func (o *Once) doSlow(fn func() error) error {o.m.Lock()defer o.m.Unlock()var err errorif o.done == 0 { /雙檢查,還沒有初始化err = fn()????if?err?==?nil?{?//?只有真正初始化成功才把 done 的值改成1atomic.StoreUint32(&o.done, 1)}}return err}func main() {urls := []string{"127.0.0.1:3453","127.0.0.1:9002","127.0.0.1:9003","baidu.com:80",}var conn net.Connvar o Oncecount := 0var err errorfor _, url := range urls {err := o.Do(func() error {count++fmt.Printf("初始化%d次\n", count)conn, err = net.DialTimeout("tcp", url, time.Second)fmt.Println(err)return err})if err == nil {break}if count == 3 {fmt.Println("初始化失敗,不再重試")break}}if conn != nil {_, _ = conn.Write([]byte("GET / HTTP/1.1\r\nHost: google.com\r\n Accept: */*\r\n\r\n"))_, _ = io.Copy(os.Stdout, conn)}}
當(dāng)我們在使用一些開源工具時,只要業(yè)務(wù)需要,你可以改造各種你想要的東西。有時候,阻塞住你的,往往就是一身空想罷了。共勉
推薦閱讀
