<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Go官方設(shè)計(jì)了一個(gè)信號(hào)量庫,真好用!

          共 10879字,需瀏覽 22分鐘

           ·

          2021-09-19 12:46

          # 前言

          在寫上一篇文章請(qǐng)勿濫用goroutine時(shí),發(fā)現(xiàn)Go語言擴(kuò)展包提供了一個(gè)帶權(quán)重的信號(hào)量庫Semaphore,使用信號(hào)量我們可以實(shí)現(xiàn)一個(gè)"工作池"控制一定數(shù)量的goroutine并發(fā)工作。因?yàn)閷?duì)源碼抱有好奇的態(tài)度,所以在周末仔細(xì)看了一下這個(gè)庫并進(jìn)行了解析,在這里記錄一下。

          # 何為信號(hào)量

          要想知道一個(gè)東西是什么,我都愛去百度百科上搜一搜,輸入"信號(hào)量",這答案不就來了。

          百度百科解釋:

          信號(hào)量(Semaphore),有時(shí)被稱為信號(hào)燈,是[多線程環(huán)境下使用的一種設(shè)施,是可以用來保證兩個(gè)或多個(gè)關(guān)鍵代碼段不被并發(fā)調(diào)用。在進(jìn)入一個(gè)關(guān)鍵代碼段之前,線程必須獲取一個(gè)信號(hào)量;一旦該關(guān)鍵代碼段完成了,那么該線程必須釋放信號(hào)量。其它想進(jìn)入該關(guān)鍵代碼段的線程必須等待直到第一個(gè)線程釋放信號(hào)量。為了完成這個(gè)過程,需要?jiǎng)?chuàng)建一個(gè)信號(hào)量VI,然后將Acquire Semaphore VI以及Release Semaphore VI分別放置在每個(gè)關(guān)鍵代碼段的首末端。確認(rèn)這些信號(hào)量VI引用的是初始創(chuàng)建的信號(hào)量。

          通過這段解釋我們可以得知什么是信號(hào)量,其實(shí)信號(hào)量就是一種變量或者抽象數(shù)據(jù)類型,用于控制并發(fā)系統(tǒng)中多個(gè)進(jìn)程對(duì)公共資源的訪問,訪問具有原子性。信號(hào)量主要分為兩類:

          • 二值信號(hào)量:顧名思義,其值只有兩種0或者1,相當(dāng)于互斥量,當(dāng)值為1時(shí)資源可用,當(dāng)值為0時(shí),資源被鎖住,進(jìn)程阻塞無法繼續(xù)執(zhí)行。

          • 計(jì)數(shù)信號(hào)量:信號(hào)量是一個(gè)任意的整數(shù),起始時(shí),如果計(jì)數(shù)器的計(jì)數(shù)值為0,那么創(chuàng)建出來的信號(hào)量就是不可獲得的狀態(tài),如果計(jì)數(shù)器的計(jì)數(shù)值大于0,那么創(chuàng)建出來的信號(hào)量就是可獲得的狀態(tài),并且總共獲取的次數(shù)等于計(jì)數(shù)器的值。

          # 信號(hào)量工作原理

          信號(hào)量是由操作系統(tǒng)來維護(hù)的,信號(hào)量只能進(jìn)行兩種操作等待和發(fā)送信號(hào),操作總結(jié)來說,核心就是PV操作:

          • P原語:P是荷蘭語Proberen(測(cè)試)的首字母。為阻塞原語,負(fù)責(zé)把當(dāng)前進(jìn)程由運(yùn)行狀態(tài)轉(zhuǎn)換為阻塞狀態(tài),直到另外一個(gè)進(jìn)程喚醒它。操作為:申請(qǐng)一個(gè)空閑資源(把信號(hào)量減1),若成功,則退出;若失敗,則該進(jìn)程被阻塞;

          • V原語:V是荷蘭語Verhogen(增加)的首字母。為喚醒原語,負(fù)責(zé)把一個(gè)被阻塞的進(jìn)程喚醒,它有一個(gè)參數(shù)表,存放著等待被喚醒的進(jìn)程信息。操作為:釋放一個(gè)被占用的資源(把信號(hào)量加1),如果發(fā)現(xiàn)有被阻塞的進(jìn)程,則選擇一個(gè)喚醒之。

          在信號(hào)量進(jìn)行PV操作時(shí)都為原子操作,并且在PV原語執(zhí)行期間不允許有中斷的發(fā)生。

          PV原語對(duì)信號(hào)量的操作可以分為三種情況:

          • 把信號(hào)量視為某種類型的共享資源的剩余個(gè)數(shù),實(shí)現(xiàn)對(duì)一類共享資源的訪問

          • 把信號(hào)量用作進(jìn)程間的同步

          • 視信號(hào)量為一個(gè)加鎖標(biāo)志,實(shí)現(xiàn)對(duì)一個(gè)共享變量的訪問

          具體在什么場(chǎng)景使用本文就不在繼續(xù)分析,接下來我們重點(diǎn)來看一下Go語言提供的擴(kuò)展包Semaphore,看看它是怎樣實(shí)現(xiàn)的。

          # 官方擴(kuò)展包`Semaphore`

          我們之前在分析Go語言源碼時(shí)總會(huì)看到這幾個(gè)函數(shù):

          func runtime_Semacquire(s *uint32)
          func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
          func runtime_Semrelease(s *uint32, handoff bool, skipframes int)

          這幾個(gè)函數(shù)就是信號(hào)量的PV操作,不過他們都是給Go內(nèi)部使用的,如果想使用信號(hào)量,那就可以使用官方的擴(kuò)展包:Semaphore,這是一個(gè)帶權(quán)重的信號(hào)量,接下來我們就重點(diǎn)分析一下這個(gè)庫。

          安裝方法:go get -u golang.org/x/sync

           數(shù)據(jù)結(jié)構(gòu)

          type Weighted struct {
           size    int64 // 設(shè)置一個(gè)最大權(quán)值
           cur     int64 // 標(biāo)識(shí)當(dāng)前已被使用的資源數(shù)
           mu      sync.Mutex // 提供臨界區(qū)保護(hù)
           waiters list.List // 阻塞等待的調(diào)用者列表
          }

          semaphore庫核心結(jié)構(gòu)就是Weighted,主要有4個(gè)字段:

          • size:這個(gè)代表的是最大權(quán)值,在創(chuàng)建Weighted對(duì)象指定

          • cur:相當(dāng)于一個(gè)游標(biāo),來記錄當(dāng)前已使用的權(quán)值

          • mu:互斥鎖,并發(fā)情況下做臨界區(qū)保護(hù)

          • waiters:阻塞等待的調(diào)用者列表,使用鏈表數(shù)據(jù)結(jié)構(gòu)保證先進(jìn)先出的順序,存儲(chǔ)的數(shù)據(jù)是waiter對(duì)象,waiter數(shù)據(jù)結(jié)構(gòu)如下:

          type waiter struct {
           n     int64 // 等待調(diào)用者權(quán)重值
           ready chan<- struct{} // close channel就是喚醒
          }

          這里只有兩個(gè)字段:

          • n:這個(gè)就是等待調(diào)用者的權(quán)重值

          • ready:這就是一個(gè)channel,利用channelclose機(jī)制實(shí)現(xiàn)喚醒

          semaphore還提供了一個(gè)創(chuàng)建Weighted對(duì)象的方法,在初始化時(shí)需要給定最大權(quán)值:

          // NewWeighted為并發(fā)訪問創(chuàng)建一個(gè)新的加權(quán)信號(hào)量,該信號(hào)量具有給定的最大權(quán)值。
          func NewWeighted(n int64) *Weighted {
           w := &Weighted{size: n}
           return w
          }

           阻塞獲取權(quán)值的方法 - Acquire

          先直接看代碼吧:

          func (s *Weighted) Acquire(ctx context.Context, n int64) error {
           s.mu.Lock() // 加鎖保護(hù)臨界區(qū)
           // 有資源可用并且沒有等待獲取權(quán)值的goroutine
           if s.size-s.cur >= n && s.waiters.Len() == 0 {
            s.cur += n // 加權(quán)
            s.mu.Unlock() // 釋放鎖
            return nil
           }
           // 要獲取的權(quán)值n大于最大的權(quán)值了
           if n > s.size {
            // 先釋放鎖,確保其他goroutine調(diào)用Acquire的地方不被阻塞
            s.mu.Unlock()
            // 阻塞等待context的返回
            <-ctx.Done()
            return ctx.Err()
           }
           // 走到這里就說明現(xiàn)在沒有資源可用了
           // 創(chuàng)建一個(gè)channel用來做通知喚醒
           ready := make(chan struct{})
           // 創(chuàng)建waiter對(duì)象
           w := waiter{n: n, ready: ready}
           // waiter按順序入隊(duì)
           elem := s.waiters.PushBack(w)
           // 釋放鎖,等待喚醒,別阻塞其他goroutine
           s.mu.Unlock()

           // 阻塞等待喚醒
           select {
           // context關(guān)閉
           case <-ctx.Done():
            err := ctx.Err() // 先獲取context的錯(cuò)誤信息
            s.mu.Lock()
            select {
            case <-ready:
             // 在context被關(guān)閉后被喚醒了,那么試圖修復(fù)隊(duì)列,假裝我們沒有取消
             err = nil
            default:
             // 判斷是否是第一個(gè)元素
             isFront := s.waiters.Front() == elem
             // 移除第一個(gè)元素
             s.waiters.Remove(elem)
             // 如果是第一個(gè)元素且有資源可用通知其他waiter
             if isFront && s.size > s.cur {
              s.notifyWaiters()
             }
            }
            s.mu.Unlock()
            return err
           // 被喚醒了
           case <-ready:
            return nil
           }
          }

          注釋已經(jīng)加到代碼中了,總結(jié)一下這個(gè)方法主要有三個(gè)流程:

          • 流程一:有資源可用時(shí)并且沒有等待權(quán)值的goroutine,走正常加權(quán)流程;

          • 流程二:想要獲取的權(quán)值n大于初始化時(shí)設(shè)置最大的權(quán)值了,這個(gè)goroutine永遠(yuǎn)不會(huì)獲取到信號(hào)量,所以阻塞等待context的關(guān)閉;

          • 流程三:前兩步都沒問題的話,就說明現(xiàn)在系統(tǒng)沒有資源可用了,這時(shí)就需要阻塞等待喚醒,在阻塞等待喚醒這里有特殊邏輯;

          • -   特殊邏輯二:context關(guān)閉后,則根據(jù)是否有可用資源決定通知后面等待喚醒的調(diào)用者,這樣做的目的其實(shí)是為了避免當(dāng)不同的context控制不同的goroutine時(shí),未關(guān)閉的goroutine不會(huì)被阻塞住,依然執(zhí)行,來看這樣一個(gè)例子(因?yàn)?code style="line-height: inherit;overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(226, 36, 70);background: rgb(248, 248, 248);">goroutine的搶占式調(diào)度,所以這個(gè)例子也會(huì)具有偶然性):

            • 特殊邏輯一:如果在context被關(guān)閉后被喚醒了,那么就先忽略掉這個(gè)cancel,試圖修復(fù)隊(duì)列。

              func main()  {
               s := semaphore.NewWeighted(3)
               ctx,cancel := context.WithTimeout(context.Background(), time.Second * 2)
               defer cancel()

               for i :=0; i < 3; i++{
                 if i != 0{
                  go func(num int) {
                   if err := s.Acquire(ctx,3); err != nil{
                    fmt.Printf("goroutine: %d, err is %s\n", num, err.Error())
                    return
                   }
                   time.Sleep(2 * time.Second)
                   fmt.Printf("goroutine: %d run over\n",num)
                   s.Release(3)
              }(i)
                 }else {
                  go func(num int) {
                   ct,cancel := context.WithTimeout(context.Background(), time.Second * 3)
                   defer cancel()
                   if err := s.Acquire(ct,3); err != nil{
                    fmt.Printf("goroutine: %d, err is %s\n", num, err.Error())
                    return
                   }
                   time.Sleep(3 * time.Second)
                   fmt.Printf("goroutine: %d run over\n",num)
                   s.Release(3)
                  }(i)
                 }

               }
               time.Sleep(10 * time.Second)
              }

              上面的例子中goroutine:0 使用ct對(duì)象來做控制,超時(shí)時(shí)間為3s,goroutine:1goroutine:2對(duì)象使用ctx對(duì)象來做控制,超時(shí)時(shí)間為2s,這三個(gè)goroutine占用的資源都等于最大資源數(shù),也就是說只能有一個(gè)goruotine運(yùn)行成功,另外兩個(gè)goroutine都會(huì)被阻塞,因?yàn)?code style="line-height: inherit;overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(226, 36, 70);background: rgb(248, 248, 248);">goroutine是搶占式調(diào)度,所以我們不能確定哪個(gè)gouroutine會(huì)第一個(gè)被執(zhí)行,這里我們假設(shè)第一個(gè)獲取到信號(hào)量的是gouroutine:2,阻塞等待的調(diào)用者列表順序是:goroutine:1 -> goroutine:0,因?yàn)樵?code style="line-height: inherit;overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(226, 36, 70);background: rgb(248, 248, 248);">goroutine:2中有一個(gè)2s的延時(shí),所以會(huì)觸發(fā)ctx的超時(shí),ctx會(huì)下發(fā)Done信號(hào),因?yàn)?code style="line-height: inherit;overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(226, 36, 70);background: rgb(248, 248, 248);">goroutine:2和goroutine:1都是被ctx控制的,所以就會(huì)把goroutine:1從等待者隊(duì)列中取消,但是因?yàn)?code style="line-height: inherit;overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(226, 36, 70);background: rgb(248, 248, 248);">goroutine:1屬于隊(duì)列的第一個(gè)隊(duì)員,并且因?yàn)?code style="line-height: inherit;overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(226, 36, 70);background: rgb(248, 248, 248);">goroutine:2已經(jīng)釋放資源,那么就會(huì)喚醒goroutine:0繼續(xù)執(zhí)行,畫個(gè)圖表示一下:

              圖片

              使用這種方式可以避免goroutine永久失眠。

           不阻塞獲取權(quán)值的方法 - TryAcquir

          func (s *Weighted) TryAcquire(n int64) bool {
           s.mu.Lock() // 加鎖
           // 有資源可用并且沒有等待獲取資源的goroutine
           success := s.size-s.cur >= n && s.waiters.Len() == 0
           if success {
            s.cur += n
           }
           s.mu.Unlock()
           return success
          }

          這個(gè)方法就簡(jiǎn)單很多了,不阻塞地獲取權(quán)重為n的信號(hào)量,成功時(shí)返回true,失敗時(shí)返回false并保持信號(hào)量不變。

           釋放權(quán)重

          func (s *Weighted) Release(n int64) {
           s.mu.Lock()
           // 釋放資源
           s.cur -= n
           // 釋放資源大于持有的資源,則會(huì)發(fā)生panic
           if s.cur < 0 {
            s.mu.Unlock()
            panic("semaphore: released more than held")
           }
           // 通知其他等待的調(diào)用者
           s.notifyWaiters()
           s.mu.Unlock()
          }

          這里就是很常規(guī)的操作,主要就是資源釋放,同時(shí)進(jìn)行安全性判斷,如果釋放資源大于持有的資源,則會(huì)發(fā)生panic。

           喚醒waiter

          AcquireRelease方法中都調(diào)用了notifyWaiters,我們來分析一下這個(gè)方法:

          func (s *Weighted) notifyWaiters() {
           for {
            // 獲取等待調(diào)用者隊(duì)列中的隊(duì)員
            next := s.waiters.Front()
            // 沒有要通知的調(diào)用者了
            if next == nil {
             break // No more waiters blocked.
            }

            // 斷言出waiter信息
            w := next.Value.(waiter)
            if s.size-s.cur < w.n {
             // 沒有足夠資源為下一個(gè)調(diào)用者使用時(shí),繼續(xù)阻塞該調(diào)用者,遵循先進(jìn)先出的原則,
             // 避免需要資源數(shù)比較大的waiter被餓死
             //
             // 考慮一個(gè)場(chǎng)景,使用信號(hào)量作為讀寫鎖,現(xiàn)有N個(gè)令牌,N個(gè)reader和一個(gè)writer
             // 每個(gè)reader都可以通過Acquire(1)獲取讀鎖,writer寫入可以通過Acquire(N)獲得寫鎖定
             // 但不包括所有的reader,如果我們?cè)试Sreader在隊(duì)列中前進(jìn),writer將會(huì)餓死-總是有一個(gè)令牌可供每個(gè)reader
             break
            }

            // 獲取資源
            s.cur += w.n
            // 從waiter列表中移除
            s.waiters.Remove(next)
            // 使用channel的close機(jī)制喚醒waiter
            close(w.ready)
           }
          }

          這里只需要注意一個(gè)點(diǎn):?jiǎn)拘?code style="font-size: inherit;line-height: inherit;overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(226, 36, 70);background: rgb(248, 248, 248);">waiter采用先進(jìn)先出的原則,避免需要資源數(shù)比較大的waiter被餓死。

           何時(shí)使用Semaphore

          到這里我們就把Semaphore的源代碼看了一篇,代碼行數(shù)不多,封裝的也很巧妙,那么我們?cè)撌裁磿r(shí)候選擇使用它呢?

          目前能想到一個(gè)場(chǎng)景就是Semaphore配合上errgroup實(shí)現(xiàn)一個(gè)"工作池",使用Semaphore限制goroutine的數(shù)量,配合上errgroup做并發(fā)控制,示例如下:

          const (
           limit = 2


          func main()  {
           serviceName := []string{
            "cart",
            "order",
            "account",
            "item",
            "menu",
           }
           eg,ctx := errgroup.WithContext(context.Background())
           s := semaphore.NewWeighted(limit)
           for index := range serviceName{
            name := serviceName[index]
            if err := s.Acquire(ctx,1); err != nil{
             fmt.Printf("Acquire failed and err is %s\n", err.Error())
             break
            }
            eg.Go(func() error {
             defer s.Release(1)
             return callService(name)
            })
           }

           if err := eg.Wait(); err != nil{
            fmt.Printf("err is %s\n", err.Error())
            return
           }
           fmt.Printf("run success\n")
          }

          func callService(name string) error {
           fmt.Println("call ",name)
           time.Sleep(1 * time.Second)
           return nil
          }

          結(jié)果如下:

          call  order
          call  cart
          call  account
          call  item
          call  menu
          run success

          # 總結(jié)

          本文我們主要賞析了Go官方擴(kuò)展庫Semaphore的實(shí)現(xiàn),他的設(shè)計(jì)思路簡(jiǎn)單,僅僅用幾十行就完成了完美的封裝,值得我們借鑒學(xué)習(xí)。不過在實(shí)際業(yè)務(wù)場(chǎng)景中,我們使用信號(hào)量的場(chǎng)景并不多,大多數(shù)場(chǎng)景我們都可以使用channel來替代,但是有些場(chǎng)景使用Semaphore來實(shí)現(xiàn)會(huì)更好,比如上篇文章【[警惕] 請(qǐng)勿濫用goroutine】我們使用channel+sync來控制goroutine數(shù)量,這種實(shí)現(xiàn)方式并不好,因?yàn)閷?shí)際已經(jīng)起來了多個(gè)goroutine,只不過控制了工作的goroutine數(shù)量,如果改用semaphore實(shí)現(xiàn)才是真正的控制了goroutine數(shù)量。


             


          喜歡明哥文章的同學(xué)
          歡迎長(zhǎng)按下圖訂閱!

          ???

          瀏覽 62
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日本韩国一级 | 欧美综合一级 | 一区二区三区四区无码 | 看无码一区二区三区 | 美女黄网站 |