Go 工程師必學(xué) -- 信號量的原理與使用
# 什么是信號量
信號量是并發(fā)編程中常見的一種同步機(jī)制,在需要控制訪問資源的線程數(shù)量時就會用到信號量,關(guān)于什么是信號量這個問題,我引用一下維基百科對信號量的解釋,大家就明白了。
信號量的概念是計算機(jī)科學(xué)家 Dijkstra (Dijkstra算法的發(fā)明者)提出來的,廣泛應(yīng)用在不同的操作系統(tǒng)中。系統(tǒng)中,會給每一個進(jìn)程一個信號量,代表每個進(jìn)程當(dāng)前的狀態(tài),未得到控制權(quán)的進(jìn)程,會在特定的地方被迫停下來,等待可以繼續(xù)進(jìn)行的信號到來。
如果信號量是一個任意的整數(shù),通常被稱為計數(shù)信號量(Counting semaphore),或一般信號量(general semaphore);如果信號量只有二進(jìn)制的0或1,稱為二進(jìn)制信號量(binary semaphore)。在linux系統(tǒng)中,二進(jìn)制信號量(binary semaphore)又稱互斥鎖(Mutex)
計數(shù)信號量具備兩種操作動作,稱為V(signal())與P(wait())(即部分參考書常稱的“PV操作”)。V操作會增加信號量S的數(shù)值,P操作會減少它。
運(yùn)行方式:
初始化信號量,給與它一個非負(fù)數(shù)的整數(shù)值。
運(yùn)行P( wait()),信號量S的值將被減少。企圖進(jìn)入臨界區(qū)的進(jìn)程,需要先運(yùn)行P(wait())。當(dāng)信號量S減為負(fù)值時,進(jìn)程會被阻塞住,不能繼續(xù);當(dāng)信號量S不為負(fù)值時,進(jìn)程可以獲準(zhǔn)進(jìn)入臨界區(qū)。
運(yùn)行V(signal()),信號量S的值會被增加。結(jié)束離開臨界區(qū)的進(jìn)程,將會運(yùn)行V(signal())。當(dāng)信號量S不為負(fù)值時,先前被阻塞住的其他進(jìn)程,將可獲準(zhǔn)進(jìn)入臨界區(qū)。
我們一般用信號量保護(hù)一組資源,比如數(shù)據(jù)庫連接池、一組客戶端的連接等等。每次獲取資源時都會將信號量中的計數(shù)器減去對應(yīng)的數(shù)值,在釋放資源時重新加回來。當(dāng)遇到信號量資源不夠時嘗試獲取的線程就會進(jìn)入休眠,等待其他線程釋放歸還信號量。如果信號量是只有0和1的二進(jìn)位信號量,那么,它的 P/V 就和互斥鎖的 Lock/Unlock 一樣了。
# Go語言中的信號量表示
Go 內(nèi)部使用信號量來控制goroutine的阻塞和喚醒,比如互斥鎖sync.Mutex結(jié)構(gòu)體定義的第二個字段就是一個信號量。
type?Mutex?struct?{
????state?int32
????sema??uint32
}
信號量的PV操作在Go內(nèi)部是通過下面這幾個底層函數(shù)實(shí)現(xiàn)的
func?runtime_Semacquire(s?*uint32)
func?runtime_SemacquireMutex(s?*uint32,?lifo?bool,?skipframes?int)
func?runtime_Semrelease(s?*uint32,?handoff?bool,?skipframes?int)
上面幾個函數(shù)都是Go語言內(nèi)部使用的,我們不能在編程時直接使用。不過Go 語言的擴(kuò)展并發(fā)原語包中提供了帶權(quán)重的信號量 semaphore.Weighted
使用信號量前,需先在項(xiàng)目里安裝golang.org/x/sync/包
安裝方法:go get -u golang.org/x/sync
我們可以按照不同的權(quán)重對資源的訪問進(jìn)行管理,這個結(jié)構(gòu)體對外提供了四個方法:
semaphore.NewWeighted 用于創(chuàng)建新的信號量,通過參數(shù)(n int64) 指定信號量的初始值。
semaphore.Weighted.Acquire 阻塞地獲取指定權(quán)重的資源,如果當(dāng)前沒有空閑資源,就會陷入休眠等待;相當(dāng)于 P 操作,你可以一次獲取多個資源,如果沒有足夠多的資源,調(diào)用者就會被阻塞。它的第一個參數(shù)是 Context,這就意味著,你可以通過 Context 增加超時或者 cancel 的機(jī)制。如果是正常獲取了資源,就返回 ?nil;否則,就返回 ctx.Err(),信號量不改變。
semaphore.Weighted.Release 用于釋放指定權(quán)重的資源;相當(dāng)于 V 操作,可以將 n 個資源釋放,返還給信號量。
semaphore.Weighted.TryAcquire 非阻塞地獲取指定權(quán)重的資源,如果當(dāng)前沒有空閑資源,就會直接返回 false;
# 在Go編程里使用信號量
在實(shí)際應(yīng)用Go語言開發(fā)程序時,有哪些場景適合使用信號量呢?在需要控制訪問資源的線程數(shù)量時就會需要信號量,我來舉個例子幫助你理解。假設(shè)我們有一組要抓取的頁面,資源有限最多允許我們同時執(zhí)行三個抓取任務(wù),當(dāng)同時有三個抓取任務(wù)在執(zhí)行時,在執(zhí)行完一個抓取任務(wù)后才能執(zhí)行下一個排隊等待的任務(wù)。當(dāng)然這個問題用Channel也能解決,不過這次我們使用Go提供的信號量原語來解決這個問題,代碼如下:
package?main
import?(
????"context"
????"fmt"
????"sync"
????"time"
????"golang.org/x/sync/semaphore"
)
func?doSomething(u?string)?{//?模擬抓取任務(wù)的執(zhí)行
????fmt.Println(u)
????time.Sleep(2?*?time.Second)
}
const?(
????Limit??=?3?//?同時并行運(yùn)行的goroutine上限
????Weight?=?1?//?每個goroutine獲取信號量資源的權(quán)重
)
func?main()?{
????urls?:=?[]string{
????????"http://www.example.com",
????????"http://www.example.net",
????????"http://www.example.net/foo",
????????"http://www.example.net/bar",
????????"http://www.example.net/baz",
????}
????s?:=?semaphore.NewWeighted(Limit)
????var?w?sync.WaitGroup
????for?_,?u?:=?range?urls?{
????????w.Add(1)
????????go?func(u?string)?{
????????????s.Acquire(context.Background(),?Weight)
????????????doSomething(u)
????????????s.Release(Weight)
????????????w.Done()
????????}(u)
????}
????w.Wait()
????fmt.Println("All?Done")
}
# Go語言信號量的實(shí)現(xiàn)原理
Go語言擴(kuò)展庫中的信號量是使用互斥鎖和List 實(shí)現(xiàn)的。互斥鎖實(shí)現(xiàn)其它字段的保護(hù),而 List 實(shí)現(xiàn)了一個等待隊列,等待者的通知是通過 Channel 的通知機(jī)制實(shí)現(xiàn)的。
?信號量的數(shù)據(jù)結(jié)構(gòu)
我們來看一下信號量semaphore.Weighted的數(shù)據(jù)結(jié)構(gòu):
type?Weighted?struct?{
????size????int64?????????//?最大資源數(shù)
????cur?????int64?????????//?當(dāng)前已被使用的資源
????mu??????sync.Mutex????//?互斥鎖,對字段的保護(hù)
????waiters?list.List?????//?等待隊列
}
size字段用來記錄信號量擁有的最大資源數(shù)。
cur標(biāo)識當(dāng)前已被使用的資源數(shù)。
mu是一個互斥鎖用來提供對其他字段的臨界區(qū)保護(hù)。
waiters表示申請資源時由于可使用資源不夠而陷入阻塞等待的調(diào)用者列表。
?Acquire請求信號量資源
Acquire方法會監(jiān)控資源是否可用,而且還要檢測傳遞進(jìn)來的context.Context對象是否發(fā)送了超時過期或者取消的信號,我們來看一下它的代碼實(shí)現(xiàn):
func?(s?*Weighted)?Acquire(ctx?context.Context,?n?int64)?error?{
????s.mu.Lock()
????//?如果恰好有足夠的資源,也沒有排隊等待獲取資源的goroutine,
????//?將cur加上n后直接返回
????if?s.size-s.cur?>=?n?&&?s.waiters.Len()?==?0?{
??????s.cur?+=?n
??????s.mu.Unlock()
??????return?nil
????}
????//?請求的資源數(shù)大于能提供的最大的資源數(shù)
????//?這個任務(wù)處理不了,走錯誤處理邏輯
????if?n?>?s.size?{
??????s.mu.Unlock()
??????//?依賴ctx的狀態(tài)返回,否則一直等待
??????<-ctx.Done()
??????return?ctx.Err()
????}
????//?現(xiàn)存資源不夠,?需要把調(diào)用者加入到等待隊列中
????//?創(chuàng)建了一個ready?chan,以便被通知喚醒
????ready?:=?make(chan?struct{})
????w?:=?waiter{n:?n,?ready:?ready}
????elem?:=?s.waiters.PushBack(w)
????s.mu.Unlock()
????//?等待
????select?{
????case?<-ctx.Done():?//?context的Done被關(guān)閉
??????err?:=?ctx.Err()
??????s.mu.Lock()
??????select?{
??????case?<-ready:?//?如果被喚醒了,忽略ctx的狀態(tài)
????????err?=?nil
??????default:?//?通知waiter
????????isFront?:=?s.waiters.Front()?==?elem
????????s.waiters.Remove(elem)
????????//?通知其它的waiters,檢查是否有足夠的資源
????????if?isFront?&&?s.size?>?s.cur?{
??????????s.notifyWaiters()
????????}
??????}
??????s.mu.Unlock()
??????return?err
????case?<-ready:?//?等待者被喚醒了
??????return?nil
????}
??}
如果調(diào)用者請求不到信號量的資源就會被加入等待者列表里,這里等待者列表的結(jié)構(gòu)體定義是:
type?waiter?struct?{
?n?????int64
?ready?chan<-?struct{}?//?當(dāng)調(diào)用者可以獲取到信號量資源時,?close調(diào)這個chan
}
包含了兩個字段,調(diào)用者請求的資源數(shù),以及一個ready 通道。ready通道會在調(diào)用者可以被重新喚醒的時候被close調(diào),從而起到通知正在阻塞讀取ready通道的等待者的作用。
?NotifyWaiters 通知等待者
notifyWaiters方法會逐個檢查隊列里等待的調(diào)用者,如果現(xiàn)存資源夠等待者請求的數(shù)量n,或者是沒有等待者了,就返回:
func?(s?*Weighted)?notifyWaiters()?{
????for?{
??????next?:=?s.waiters.Front()
??????if?next?==?nil?{
????????break?//?沒有等待者了,直接返回
??????}
??????w?:=?next.Value.(waiter)
??????if?s.size-s.cur?????????//?如果現(xiàn)有資源不夠隊列頭調(diào)用者請求的資源數(shù),就退出所有等待者會繼續(xù)等待
????????//?這里還是按照先入先出的方式處理是為了避免饑餓
????????break
??????}
??????s.cur?+=?w.n
??????s.waiters.Remove(next)
??????close(w.ready)
????}
??}
notifyWaiters方法是按照先入先出的方式喚醒調(diào)用者。當(dāng)釋放 100 個資源的時候,如果第一個等待者需要 101 個資源,那么,隊列中的所有等待者都會繼續(xù)等待,即使隊列后面有的等待者只需要 1 個資源。這樣做的目的是避免饑餓,否則的話,資源可能總是被那些請求資源數(shù)小的調(diào)用者獲取,這樣一來,請求資源數(shù)巨大的調(diào)用者,就沒有機(jī)會獲得資源了。
?Release歸還信號量資源
Release方法就很簡單了,它將當(dāng)前計數(shù)值減去釋放的資源數(shù) n,并調(diào)用notifyWaiters方法,嘗試喚醒等待隊列中的調(diào)用者,看是否有足夠的資源被獲取。
func?(s?*Weighted)?Release(n?int64)?{
????s.mu.Lock()
????s.cur?-=?n
????if?s.cur?0?{
??????s.mu.Unlock()
??????panic("semaphore:?released?more?than?held")
????}
????s.notifyWaiters()
????s.mu.Unlock()
}
# 總結(jié)
在Go語言中信號量有時候也會被Channel類型所取代,因?yàn)橐粋€ buffered chan 也可以代表 n 個資源。不過既然Go語言通過golang.orgx/sync擴(kuò)展庫對外提供了semaphore.Weight這一種信號量實(shí)現(xiàn),遇到使用信號量的場景時還是盡量使用官方提供的實(shí)現(xiàn)。在使用的過程中我們需要注意以下的幾個問題:
Acquire 和 TryAcquire方法都可以用于獲取資源,前者會阻塞地獲取信號量。后者會非阻塞地獲取信號量,如果獲取不到就返回false。
Release歸還信號量后,會以先進(jìn)先出的順序喚醒等待隊列中的調(diào)用者。如果現(xiàn)有資源不夠處于等待隊列前面的調(diào)用者請求的資源數(shù),所有等待者會繼續(xù)等待。
如果一個goroutine申請較多的資源,由于上面說的歸還后喚醒等待者的策略,它可能會等待比較長的時間。
好啦,以上就是今天的推薦。
這里不得不吹一下本文作者 KevinYan 大佬,輸出能力非常強(qiáng)悍,接近日更的存在,關(guān)鍵是文章質(zhì)量有保證,令人折服。
如果你喜歡他的文章風(fēng)格,歡迎大家點(diǎn)擊下面小卡片,去關(guān)注他。
