<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Go 工程師必學(xué) -- 信號量的原理與使用

          共 3677字,需瀏覽 8分鐘

           ·

          2021-11-20 05:53

          # 什么是信號量

          信號量是并發(fā)編程中常見的一種同步機(jī)制,在需要控制訪問資源的線程數(shù)量時就會用到信號量,關(guān)于什么是信號量這個問題,我引用一下維基百科對信號量的解釋,大家就明白了。

          信號量的概念是計算機(jī)科學(xué)家 Dijkstra (Dijkstra算法的發(fā)明者)提出來的,廣泛應(yīng)用在不同的操作系統(tǒng)中。系統(tǒng)中,會給每一個進(jìn)程一個信號量,代表每個進(jìn)程當(dāng)前的狀態(tài),未得到控制權(quán)的進(jìn)程,會在特定的地方被迫停下來,等待可以繼續(xù)進(jìn)行的信號到來。

          如果信號量是一個任意的整數(shù),通常被稱為計數(shù)信號量(Counting semaphore),或一般信號量(general semaphore);如果信號量只有二進(jìn)制的0或1,稱為二進(jìn)制信號量(binary semaphore)。在linux系統(tǒng)中,二進(jìn)制信號量(binary semaphore)又稱互斥鎖(Mutex)

          計數(shù)信號量具備兩種操作動作,稱為V(signal())與P(wait())(即部分參考書常稱的“PV操作”)。V操作會增加信號量S的數(shù)值,P操作會減少它。

          運(yùn)行方式:

          1. 初始化信號量,給與它一個非負(fù)數(shù)的整數(shù)值。

          2. 運(yùn)行P( wait()),信號量S的值將被減少。企圖進(jìn)入臨界區(qū)的進(jìn)程,需要先運(yùn)行P(wait())。當(dāng)信號量S減為負(fù)值時,進(jìn)程會被阻塞住,不能繼續(xù);當(dāng)信號量S不為負(fù)值時,進(jìn)程可以獲準(zhǔn)進(jìn)入臨界區(qū)。

          3. 運(yùn)行V(signal()),信號量S的值會被增加。結(jié)束離開臨界區(qū)的進(jìn)程,將會運(yùn)行V(signal())。當(dāng)信號量S不為負(fù)值時,先前被阻塞住的其他進(jìn)程,將可獲準(zhǔn)進(jìn)入臨界區(qū)。

          我們一般用信號量保護(hù)一組資源,比如數(shù)據(jù)庫連接池、一組客戶端的連接等等。每次獲取資源時都會將信號量中的計數(shù)器減去對應(yīng)的數(shù)值,在釋放資源時重新加回來。當(dāng)遇到信號量資源不夠時嘗試獲取的線程就會進(jìn)入休眠,等待其他線程釋放歸還信號量。如果信號量是只有0和1的二進(jìn)位信號量,那么,它的 P/V 就和互斥鎖的 Lock/Unlock 一樣了。

          # Go語言中的信號量表示

          Go 內(nèi)部使用信號量來控制goroutine的阻塞和喚醒,比如互斥鎖sync.Mutex結(jié)構(gòu)體定義的第二個字段就是一個信號量。

          type?Mutex?struct?{
          ????state?int32
          ????sema??uint32
          }

          信號量的PV操作在Go內(nèi)部是通過下面這幾個底層函數(shù)實(shí)現(xiàn)的

          func?runtime_Semacquire(s?*uint32)
          func?runtime_SemacquireMutex(s?*uint32,?lifo?bool,?skipframes?int)
          func?runtime_Semrelease(s?*uint32,?handoff?bool,?skipframes?int)

          上面幾個函數(shù)都是Go語言內(nèi)部使用的,我們不能在編程時直接使用。不過Go 語言的擴(kuò)展并發(fā)原語包中提供了帶權(quán)重的信號量 semaphore.Weighted

          使用信號量前,需先在項(xiàng)目里安裝golang.org/x/sync/

          安裝方法:go get -u golang.org/x/sync

          我們可以按照不同的權(quán)重對資源的訪問進(jìn)行管理,這個結(jié)構(gòu)體對外提供了四個方法:

          • semaphore.NewWeighted 用于創(chuàng)建新的信號量,通過參數(shù)(n int64) 指定信號量的初始值。

          • semaphore.Weighted.Acquire 阻塞地獲取指定權(quán)重的資源,如果當(dāng)前沒有空閑資源,就會陷入休眠等待;相當(dāng)于 P 操作,你可以一次獲取多個資源,如果沒有足夠多的資源,調(diào)用者就會被阻塞。它的第一個參數(shù)是 Context,這就意味著,你可以通過 Context 增加超時或者 cancel 的機(jī)制。如果是正常獲取了資源,就返回 ?nil;否則,就返回 ctx.Err(),信號量不改變。

          • semaphore.Weighted.Release 用于釋放指定權(quán)重的資源;相當(dāng)于 V 操作,可以將 n 個資源釋放,返還給信號量。

          • semaphore.Weighted.TryAcquire 非阻塞地獲取指定權(quán)重的資源,如果當(dāng)前沒有空閑資源,就會直接返回 false;

          # 在Go編程里使用信號量

          在實(shí)際應(yīng)用Go語言開發(fā)程序時,有哪些場景適合使用信號量呢?在需要控制訪問資源的線程數(shù)量時就會需要信號量,我來舉個例子幫助你理解。假設(shè)我們有一組要抓取的頁面,資源有限最多允許我們同時執(zhí)行三個抓取任務(wù),當(dāng)同時有三個抓取任務(wù)在執(zhí)行時,在執(zhí)行完一個抓取任務(wù)后才能執(zhí)行下一個排隊等待的任務(wù)。當(dāng)然這個問題用Channel也能解決,不過這次我們使用Go提供的信號量原語來解決這個問題,代碼如下:

          package?main

          import?(
          ????"context"
          ????"fmt"
          ????"sync"
          ????"time"

          ????"golang.org/x/sync/semaphore"
          )

          func?doSomething(u?string)?{//?模擬抓取任務(wù)的執(zhí)行
          ????fmt.Println(u)
          ????time.Sleep(2?*?time.Second)
          }

          const?(
          ????Limit??=?3?//?同時并行運(yùn)行的goroutine上限
          ????Weight?=?1?//?每個goroutine獲取信號量資源的權(quán)重
          )

          func?main()?{
          ????urls?:=?[]string{
          ????????"http://www.example.com",
          ????????"http://www.example.net",
          ????????"http://www.example.net/foo",
          ????????"http://www.example.net/bar",
          ????????"http://www.example.net/baz",
          ????}
          ????s?:=?semaphore.NewWeighted(Limit)
          ????var?w?sync.WaitGroup
          ????for?_,?u?:=?range?urls?{
          ????????w.Add(1)
          ????????go?func(u?string)?{
          ????????????s.Acquire(context.Background(),?Weight)
          ????????????doSomething(u)
          ????????????s.Release(Weight)
          ????????????w.Done()
          ????????}(u)
          ????}
          ????w.Wait()

          ????fmt.Println("All?Done")
          }

          # Go語言信號量的實(shí)現(xiàn)原理

          Go語言擴(kuò)展庫中的信號量是使用互斥鎖和List 實(shí)現(xiàn)的。互斥鎖實(shí)現(xiàn)其它字段的保護(hù),而 List 實(shí)現(xiàn)了一個等待隊列,等待者的通知是通過 Channel 的通知機(jī)制實(shí)現(xiàn)的。

          ?信號量的數(shù)據(jù)結(jié)構(gòu)

          我們來看一下信號量semaphore.Weighted的數(shù)據(jù)結(jié)構(gòu):

          type?Weighted?struct?{
          ????size????int64?????????//?最大資源數(shù)
          ????cur?????int64?????????//?當(dāng)前已被使用的資源
          ????mu??????sync.Mutex????//?互斥鎖,對字段的保護(hù)
          ????waiters?list.List?????//?等待隊列
          }
          • size字段用來記錄信號量擁有的最大資源數(shù)。

          • cur標(biāo)識當(dāng)前已被使用的資源數(shù)。

          • mu是一個互斥鎖用來提供對其他字段的臨界區(qū)保護(hù)。

          • waiters表示申請資源時由于可使用資源不夠而陷入阻塞等待的調(diào)用者列表。

          ?Acquire請求信號量資源

          Acquire方法會監(jiān)控資源是否可用,而且還要檢測傳遞進(jìn)來的context.Context對象是否發(fā)送了超時過期或者取消的信號,我們來看一下它的代碼實(shí)現(xiàn):

          func?(s?*Weighted)?Acquire(ctx?context.Context,?n?int64)?error?{
          ????s.mu.Lock()
          ????//?如果恰好有足夠的資源,也沒有排隊等待獲取資源的goroutine,
          ????//?將cur加上n后直接返回
          ????if?s.size-s.cur?>=?n?&&?s.waiters.Len()?==?0?{
          ??????s.cur?+=?n
          ??????s.mu.Unlock()
          ??????return?nil
          ????}

          ????//?請求的資源數(shù)大于能提供的最大的資源數(shù)
          ????//?這個任務(wù)處理不了,走錯誤處理邏輯
          ????if?n?>?s.size?{
          ??????s.mu.Unlock()
          ??????//?依賴ctx的狀態(tài)返回,否則一直等待
          ??????<-ctx.Done()
          ??????return?ctx.Err()
          ????}
          ????//?現(xiàn)存資源不夠,?需要把調(diào)用者加入到等待隊列中
          ????//?創(chuàng)建了一個ready?chan,以便被通知喚醒
          ????ready?:=?make(chan?struct{})
          ????w?:=?waiter{n:?n,?ready:?ready}
          ????elem?:=?s.waiters.PushBack(w)
          ????s.mu.Unlock()


          ????//?等待
          ????select?{
          ????case?<-ctx.Done():?//?context的Done被關(guān)閉
          ??????err?:=?ctx.Err()
          ??????s.mu.Lock()
          ??????select?{
          ??????case?<-ready:?//?如果被喚醒了,忽略ctx的狀態(tài)
          ????????err?=?nil
          ??????default:?//?通知waiter
          ????????isFront?:=?s.waiters.Front()?==?elem
          ????????s.waiters.Remove(elem)
          ????????//?通知其它的waiters,檢查是否有足夠的資源
          ????????if?isFront?&&?s.size?>?s.cur?{
          ??????????s.notifyWaiters()
          ????????}
          ??????}
          ??????s.mu.Unlock()
          ??????return?err
          ????case?<-ready:?//?等待者被喚醒了
          ??????return?nil
          ????}
          ??}

          如果調(diào)用者請求不到信號量的資源就會被加入等待者列表里,這里等待者列表的結(jié)構(gòu)體定義是:

          type?waiter?struct?{
          ?n?????int64
          ?ready?chan<-?struct{}?//?當(dāng)調(diào)用者可以獲取到信號量資源時,?close調(diào)這個chan
          }

          包含了兩個字段,調(diào)用者請求的資源數(shù),以及一個ready 通道。ready通道會在調(diào)用者可以被重新喚醒的時候被close調(diào),從而起到通知正在阻塞讀取ready通道的等待者的作用。

          ?NotifyWaiters 通知等待者

          notifyWaiters方法會逐個檢查隊列里等待的調(diào)用者,如果現(xiàn)存資源夠等待者請求的數(shù)量n,或者是沒有等待者了,就返回:

          func?(s?*Weighted)?notifyWaiters()?{
          ????for?{
          ??????next?:=?s.waiters.Front()
          ??????if?next?==?nil?{
          ????????break?//?沒有等待者了,直接返回
          ??????}


          ??????w?:=?next.Value.(waiter)
          ??????if?s.size-s.cur?????????//?如果現(xiàn)有資源不夠隊列頭調(diào)用者請求的資源數(shù),就退出所有等待者會繼續(xù)等待
          ????????//?這里還是按照先入先出的方式處理是為了避免饑餓
          ????????break
          ??????}

          ??????s.cur?+=?w.n
          ??????s.waiters.Remove(next)
          ??????close(w.ready)
          ????}
          ??}

          notifyWaiters方法是按照先入先出的方式喚醒調(diào)用者。當(dāng)釋放 100 個資源的時候,如果第一個等待者需要 101 個資源,那么,隊列中的所有等待者都會繼續(xù)等待,即使隊列后面有的等待者只需要 1 個資源。這樣做的目的是避免饑餓,否則的話,資源可能總是被那些請求資源數(shù)小的調(diào)用者獲取,這樣一來,請求資源數(shù)巨大的調(diào)用者,就沒有機(jī)會獲得資源了。

          ?Release歸還信號量資源

          Release方法就很簡單了,它將當(dāng)前計數(shù)值減去釋放的資源數(shù) n,并調(diào)用notifyWaiters方法,嘗試喚醒等待隊列中的調(diào)用者,看是否有足夠的資源被獲取。

          func?(s?*Weighted)?Release(n?int64)?{
          ????s.mu.Lock()
          ????s.cur?-=?n
          ????if?s.cur?0?{
          ??????s.mu.Unlock()
          ??????panic("semaphore:?released?more?than?held")
          ????}
          ????s.notifyWaiters()
          ????s.mu.Unlock()
          }

          # 總結(jié)

          Go語言中信號量有時候也會被Channel類型所取代,因?yàn)橐粋€ buffered chan 也可以代表 n 個資源。不過既然Go語言通過golang.orgx/sync擴(kuò)展庫對外提供了semaphore.Weight這一種信號量實(shí)現(xiàn),遇到使用信號量的場景時還是盡量使用官方提供的實(shí)現(xiàn)。在使用的過程中我們需要注意以下的幾個問題:

          • Acquire 和 TryAcquire方法都可以用于獲取資源,前者會阻塞地獲取信號量。后者會非阻塞地獲取信號量,如果獲取不到就返回false。

          • Release歸還信號量后,會以先進(jìn)先出的順序喚醒等待隊列中的調(diào)用者。如果現(xiàn)有資源不夠處于等待隊列前面的調(diào)用者請求的資源數(shù),所有等待者會繼續(xù)等待。

          • 如果一個goroutine申請較多的資源,由于上面說的歸還后喚醒等待者的策略,它可能會等待比較長的時間。


          好啦,以上就是今天的推薦。

          這里不得不吹一下本文作者 KevinYan 大佬,輸出能力非常強(qiáng)悍,接近日更的存在,關(guān)鍵是文章質(zhì)量有保證,令人折服。

          如果你喜歡他的文章風(fēng)格,歡迎大家點(diǎn)擊下面小卡片,去關(guān)注他。

          瀏覽 42
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  五月天婷婷色网 | 日本精品一字幕 | 韩国一区二区无码视频 | 中文字幕第315页 | 北条麻妃视频一区 |