<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Go 每日一庫(kù)之 ants 源碼賞析

          共 26895字,需瀏覽 54分鐘

           ·

          2021-07-13 22:22

          簡(jiǎn)介

          繼上一篇Go 每日一庫(kù)之 ants,這篇文章我們來(lái)一起看看ants的源碼。

          Pool

          通過(guò)上篇文章,我們知道ants池有兩種創(chuàng)建方式:

          • p, _ := ants.NewPool(cap):這種方式創(chuàng)建的池子對(duì)象需要調(diào)用p.Submit(task)提交任務(wù),任務(wù)是一個(gè)無(wú)參數(shù)無(wú)返回值的函數(shù);
          • p, _ := ants.NewPoolWithFunc(cap, func(interface{})):這種方式創(chuàng)建的池子對(duì)象需要指定池函數(shù),并且使用p.Invoke(arg)調(diào)用池函數(shù)。arg就是傳給池函數(shù)func(interface{})的參數(shù)。

          ants中這兩種池子使用不同的結(jié)構(gòu)來(lái)表示:ants.Poolants.PoolWithFunc。我們先來(lái)介紹Pool。PoolWithFunc結(jié)構(gòu)也是類似的,介紹完Pool之后,我們?cè)俸?jiǎn)單比較一下它們。

          Pool結(jié)構(gòu)定義在文件pool.go中:

          // src/github.com/panjf2000/ants/pool.go
          type Pool struct {
            capacity int32
            running int32
            workers workerArray
            state int32
            lock sync.Locker
            cond *sync.Cond
            workerCache sync.Pool
            blockingNum int
            options *Options
          }

          各個(gè)字段含義如下:

          • capacity:池容量,表示ants最多能創(chuàng)建的 goroutine 數(shù)量。如果為負(fù)數(shù),表示容量無(wú)限制;
          • running:已經(jīng)創(chuàng)建的 worker goroutine 的數(shù)量;
          • workers:存放一組 worker 對(duì)象,workerArray只是一個(gè)接口,表示一個(gè) worker 容器,后面詳述;
          • state:記錄池子當(dāng)前的狀態(tài),是否已關(guān)閉(CLOSED);
          • lock:鎖。ants自己實(shí)現(xiàn)了一個(gè)自旋鎖。用于同步并發(fā)操作;
          • cond:條件變量。處理任務(wù)等待和喚醒;
          • workerCache:使用sync.Pool對(duì)象池管理和創(chuàng)建worker對(duì)象,提升性能;
          • blockingNum:阻塞等待的任務(wù)數(shù)量;
          • options:選項(xiàng)。上一篇文章已經(jīng)詳細(xì)介紹過(guò)了。

          這里明確一個(gè)概念,ants中為每個(gè)任務(wù)都是由 worker 對(duì)象來(lái)處理的,每個(gè) worker 對(duì)象會(huì)對(duì)應(yīng)創(chuàng)建一個(gè) goroutine 來(lái)處理任務(wù)。ants中使用goWorker表示 worker:

          // src/github.com/panjf2000/ants/worker.go
          type goWorker struct {
            pool *Pool
            task chan func()
            recycleTime time.Time
          }

          后文詳細(xì)介紹這一塊內(nèi)容,現(xiàn)在我們只需要知道Pool.workers字段就是存放goWorker對(duì)象的容器。

          Pool創(chuàng)建

          創(chuàng)建Pool對(duì)象需調(diào)用ants.NewPool(size, options)函數(shù)。省略了一些處理選項(xiàng)的代碼,最終代碼如下:

          // src/github.com/panjf2000/ants/pool.go
          func NewPool(size int, options ...Option) (*Pool, error) {
            // ...
            p := &Pool{
              capacity: int32(size),
              lock:     internal.NewSpinLock(),
              options:  opts,
            }
            p.workerCache.New = func() interface{} {
              return &goWorker{
                pool: p,
                task: make(chan func()workerChanCap),
              }
            }
            if p.options.PreAlloc {
              if size == -1 {
                return nil, ErrInvalidPreAllocSize
              }
              p.workers = newWorkerArray(loopQueueType, size)
            } else {
              p.workers = newWorkerArray(stackType, 0)
            }

            p.cond = sync.NewCond(p.lock)

            go p.purgePeriodically()
            return p, nil
          }

          代碼不難理解:

          • 創(chuàng)建Pool對(duì)象,設(shè)置容量,創(chuàng)建一個(gè)自旋鎖來(lái)初始化lock字段,設(shè)置選項(xiàng);
          • 設(shè)置workerCache這個(gè)sync.Pool對(duì)象的New方法,在調(diào)用sync.Pool對(duì)象的Get()方法時(shí),如果它沒(méi)有緩存的 worker 對(duì)象了,則調(diào)用這個(gè)方法創(chuàng)建一個(gè);
          • 根據(jù)是否設(shè)置了預(yù)分配選項(xiàng),創(chuàng)建不同類型的 workers;
          • 使用p.lock鎖創(chuàng)建一個(gè)條件變量;
          • 最后啟動(dòng)一個(gè) goroutine 用于定期清理過(guò)期的 worker。

          Pool.workers字段為workerArray類型,這實(shí)際上是一個(gè)接口,表示一個(gè) worker 容器:

          type workerArray interface {
            len() int
            isEmpty() bool
            insert(worker *goWorker) error
            detach() *goWorker
            retrieveExpiry(duration time.Duration) []*goWorker
            reset()
          }

          每個(gè)方法從名字上很好理解含義:

          • len() int:worker 數(shù)量;
          • isEmpty() bool:worker 數(shù)量是否為 0;
          • insert(worker *goWorker) error:goroutine 任務(wù)執(zhí)行結(jié)束后,將相應(yīng)的 worker 放回workerArray中;
          • detach() *goWorker:從workerArray中取出一個(gè) worker;
          • retrieveExpiry(duration time.Duration) []*goWorker:取出所有的過(guò)期 worker;
          • reset():重置容器。

          workerArrayants中有兩種實(shí)現(xiàn),即workerStackloopQueue。

          workerStack

          我們先來(lái)介紹一下workerStack,它位于文件worker_stack.go中:

          // src/github.com/panjf2000/ants/worker_stack.go
          type workerStack struct {
            items  []*goWorker
            expiry []*goWorker
            size   int
          }

          func newWorkerStack(size int) *workerStack {
            return &workerStack{
              items: make([]*goWorker, 0, size),
              size:  size,
            }
          }
          • items:空閑的worker;
          • expiry:過(guò)期的worker

          goroutine 完成任務(wù)之后,Pool池會(huì)將相應(yīng)的 worker 放回workerStack,調(diào)用workerStack.insert()直接appenditems中即可:

          func (wq *workerStack) insert(worker *goWorker) error {
            wq.items = append(wq.items, worker)
            return nil
          }

          新任務(wù)到來(lái)時(shí),會(huì)調(diào)用workerStack.detach()從容器中取出一個(gè)空閑的 worker:

          func (wq *workerStack) detach() *goWorker {
            l := wq.len()
            if l == 0 {
              return nil
            }

            w := wq.items[l-1]
            wq.items[l-1] = nil // avoid memory leaks
            wq.items = wq.items[:l-1]

            return w
          }

          這里總是返回最后一個(gè) worker,每次insert()也是append到最后,符合棧后進(jìn)先出的特點(diǎn),故稱為workerStack

          這里有一個(gè)細(xì)節(jié),由于切片的底層結(jié)構(gòu)是數(shù)組,只要有引用數(shù)組的指針,數(shù)組中的元素就不會(huì)釋放。這里取出切片最后一個(gè)元素后,將對(duì)應(yīng)數(shù)組元素的指針設(shè)置為nil,主動(dòng)釋放這個(gè)引用。

          上面說(shuō)過(guò)新建Pool對(duì)象時(shí)會(huì)創(chuàng)建一個(gè) goroutine 定期檢查和清理過(guò)期的 worker。通過(guò)調(diào)用workerArray.retrieveExpiry()獲取過(guò)期的 worker 列表。workerStack實(shí)現(xiàn)如下:

          func (wq *workerStack) retrieveExpiry(duration time.Duration) []*goWorker {
            n := wq.len()
            if n == 0 {
              return nil
            }

            expiryTime := time.Now().Add(-duration)
            index := wq.binarySearch(0, n-1, expiryTime)

            wq.expiry = wq.expiry[:0]
            if index != -1 {
              wq.expiry = append(wq.expiry, wq.items[:index+1]...)
              m := copy(wq.items, wq.items[index+1:])
              for i := m; i < n; i++ {
                wq.items[i] = nil
              }
              wq.items = wq.items[:m]
            }
            return wq.expiry
          }

          實(shí)現(xiàn)使用二分查找法找到已過(guò)期的最近一個(gè) worker。由于過(guò)期時(shí)間是按照 goroutine 執(zhí)行任務(wù)后的空閑時(shí)間計(jì)算的,而workerStack.insert()入隊(duì)順序決定了,它們的過(guò)期時(shí)間是從早到晚的。所以可以使用二分查找:

          func (wq *workerStack) binarySearch(l, r int, expiryTime time.Time) int {
            var mid int
            for l <= r {
              mid = (l + r) / 2
              if expiryTime.Before(wq.items[mid].recycleTime) {
                r = mid - 1
              } else {
                l = mid + 1
              }
            }
            return r
          }

          二分查找的是最近過(guò)期的 worker,即將過(guò)期的 worker 的前一個(gè)。它和在它之前的 worker 已經(jīng)全部過(guò)期了。

          如果找到索引index,將items從開(kāi)頭到index(包括)的所有 worker 復(fù)制到expiry字段中。然后將index之后的所有未過(guò)期 worker 復(fù)制到切片頭部,這里使用了copy函數(shù)。copy返回實(shí)際復(fù)制的數(shù)量,即未過(guò)期的 worker 數(shù)量m。然后將切片itemsm開(kāi)始所有的元素置為nil,避免內(nèi)存泄漏,因?yàn)樗鼈円呀?jīng)被復(fù)制到頭部了。最后裁剪items切片,返回過(guò)期 worker 切片。

          loopQueue

          loopQueue實(shí)現(xiàn)基于循環(huán)隊(duì)列,結(jié)構(gòu)定義在文件worker_loop_queue中:

          type loopQueue struct {
            items  []*goWorker
            expiry []*goWorker
            head   int
            tail   int
            size   int
            isFull bool
          }

          func newWorkerLoopQueue(size int) *loopQueue {
            return &loopQueue{
              items: make([]*goWorker, size),
              size:  size,
            }
          }

          由于是循環(huán)隊(duì)列,這里先創(chuàng)建好了一個(gè)長(zhǎng)度為size的切片。循環(huán)隊(duì)列有一個(gè)隊(duì)列頭指針head,指向第一個(gè)有元素的位置,一個(gè)隊(duì)列尾指針tail,指向下一個(gè)可以存放元素的位置。所以一開(kāi)始狀態(tài)如下:

          tail處添加元素,添加后tail指針后移。在head處取出元素,取出后head指針也后移。進(jìn)行一段時(shí)間操作后,隊(duì)列狀態(tài)如下:

          headtail指針到隊(duì)列尾了,需要回繞。所以可能出現(xiàn)這種情況:

          當(dāng)tail指針趕上head指針了,說(shuō)明隊(duì)列就滿了:

          當(dāng)head指針趕上tail指針了,隊(duì)列再次為空:

          根據(jù)示意圖,我們?cè)賮?lái)看loopQueue的操作方法就很簡(jiǎn)單了。

          由于headtail相等的情況有可能是隊(duì)列空,也有可能是隊(duì)列滿,所以loopQueue中增加一個(gè)isFull字段以示區(qū)分。goroutine 完成任務(wù)之后,會(huì)將對(duì)應(yīng)的 worker 對(duì)象放回loopQueue,執(zhí)行的是insert()方法:

          func (wq *loopQueue) insert(worker *goWorker) error {
            if wq.size == 0 {
              return errQueueIsReleased
            }

            if wq.isFull {
              return errQueueIsFull
            }
            wq.items[wq.tail] = worker
            wq.tail++

            if wq.tail == wq.size {
              wq.tail = 0
            }
            if wq.tail == wq.head {
              wq.isFull = true
            }

            return nil
          }

          這個(gè)方法執(zhí)行的就是循環(huán)隊(duì)列的入隊(duì)流程,注意如果插入后tail==head了,說(shuō)明隊(duì)列滿了,設(shè)置isFull字段。

          新任務(wù)到來(lái)調(diào)用loopQueeue.detach()方法獲取一個(gè)空閑的 worker 結(jié)構(gòu):

          func (wq *loopQueue) detach() *goWorker {
            if wq.isEmpty() {
              return nil
            }

            w := wq.items[wq.head]
            wq.items[wq.head] = nil
            wq.head++
            if wq.head == wq.size {
              wq.head = 0
            }
            wq.isFull = false

            return w
          }

          這個(gè)方法對(duì)應(yīng)的是循環(huán)隊(duì)列的出隊(duì)流程,注意每次出隊(duì)后,隊(duì)列肯定不滿了,isFull要重置為false。

          workerStack結(jié)構(gòu)一樣,先入的 worker 對(duì)象過(guò)期時(shí)間早,后入的晚,獲取過(guò)期 worker 的方法與workerStack中類似,只是沒(méi)有使用二分查找了。這里就不贅述了。

          再看Pool創(chuàng)建

          介紹完兩種workerArray的實(shí)現(xiàn)之后,再來(lái)看Pool的創(chuàng)建函數(shù)中workers字段的設(shè)置:

          if p.options.PreAlloc {
            if size == -1 {
              return nil, ErrInvalidPreAllocSize
            }
            p.workers = newWorkerArray(loopQueueType, size)
          else {
            p.workers = newWorkerArray(stackType, 0)
          }

          newWorkerArray()定義在文件worker_array.go中:

          type arrayType int

          const (
            stackType arrayType = 1 << iota
            loopQueueType
          )

          func newWorkerArray(aType arrayType, size int) workerArray {
            switch aType {
            case stackType:
              return newWorkerStack(size)
            case loopQueueType:
              return newWorkerLoopQueue(size)
            default:
              return newWorkerStack(size)
            }
          }

          即如果設(shè)置了預(yù)分配選項(xiàng),就采用loopQueue結(jié)構(gòu)。否則就采用stack的結(jié)構(gòu)。

          worker 結(jié)構(gòu)

          介紹完Pool的創(chuàng)建和結(jié)構(gòu),我們來(lái)看看 worker 的結(jié)構(gòu)。在ants中 worker 用結(jié)構(gòu)體goWorker表示,定義在文件worker.go中。它的結(jié)構(gòu)非常簡(jiǎn)單:

          // src/github.com/panjf2000/ants/worker.go
          type goWorker struct {
            pool *Pool
            task chan func()
            recycleTime time.Time
          }

          具體字段含義很明顯:

          • pool:持有 goroutine 池的引用;
          • task:任務(wù)通道,通過(guò)這個(gè)通道將類型為func ()的函數(shù)作為任務(wù)發(fā)送給goWorker;
          • recyleTime:這個(gè)字段記錄goWorker什么時(shí)候被放回池中(即什么時(shí)候開(kāi)始空閑)。其完成任務(wù)后,在將其放回 goroutine 池的時(shí)候設(shè)置。

          goWorker創(chuàng)建時(shí)會(huì)調(diào)用run()方法,run()方法中啟動(dòng)一個(gè)新 goroutine 處理任務(wù)。run()主體流程非常簡(jiǎn)單:

          func (w *goWorker) run() {
            go func() {
              for f := range w.task {
                if f == nil {
                  return
                }
                f()
                if ok := w.pool.revertWorker(w); !ok {
                  return
                }
              }
            }()
          }

          這個(gè)方法啟動(dòng)一個(gè)新的 goroutine,然后不停地從task通道中接收任務(wù),然后執(zhí)行任務(wù),任務(wù)執(zhí)行完成之后調(diào)用池對(duì)象的revertWorker()方法將該goWorker對(duì)象放回池中,以便下次取出處理新的任務(wù)。revertWorker()方法后面會(huì)詳細(xì)分析。

          這里注意,實(shí)際上for f := range w.task這個(gè)循環(huán)直到通道task關(guān)閉或取出為nil的任務(wù)才會(huì)終止。所以這個(gè) goroutine 一直在運(yùn)行,這正是ants高性能的關(guān)鍵所在。每個(gè)goWorker只會(huì)啟動(dòng)一次 goroutine, 后續(xù)重復(fù)利用這個(gè) goroutine。goroutine 每次只執(zhí)行一個(gè)任務(wù)就會(huì)被放回池中。

          還有一個(gè)細(xì)節(jié),如果放回操作失敗,則會(huì)調(diào)用return,這會(huì)讓 goroutine 運(yùn)行結(jié)束,防止 goroutine 泄漏。

          這里f == nil為 true 時(shí)return,也是一個(gè)細(xì)節(jié)點(diǎn),我們后面講池關(guān)閉的時(shí)候會(huì)詳細(xì)介紹。

          下面我們看看run()方法的異常處理:

          defer func() {
            w.pool.workerCache.Put(w)
            if p := recover(); p != nil {
              if ph := w.pool.options.PanicHandler; ph != nil {
                ph(p)
              } else {
                w.pool.options.Logger.Printf("worker exits from a panic: %v\n", p)
                var buf [4096]byte
                n := runtime.Stack(buf[:], false)
                w.pool.options.Logger.Printf("worker exits from panic: %s\n"string(buf[:n]))
              }
            }
            w.pool.cond.Signal()
          }()

          簡(jiǎn)單來(lái)說(shuō),就是在defer中通過(guò)recover()函數(shù)捕獲任務(wù)執(zhí)行過(guò)程中拋出的panic。這時(shí)任務(wù)執(zhí)行失敗,goroutine 也結(jié)束了。但是goWorker對(duì)象還是可以重復(fù)利用,所以defer函數(shù)一開(kāi)始調(diào)用w.pool.workerCache.Put(w)goWorker對(duì)象放回sync.Pool池中。

          接著就是處理panic,如果選項(xiàng)中指定了panic處理器,直接調(diào)用這個(gè)處理器。否則,ants調(diào)用選項(xiàng)中設(shè)置的Logger記錄一些日志,如堆棧,panic信息等。

          最后需要調(diào)用w.pool.cond.Signal()通知現(xiàn)在有空閑的goWorker了。因?yàn)槲覀儗?shí)際運(yùn)行的goWorker數(shù)量由于panic少了一個(gè),而池中可能有其他任務(wù)在等待處理。

          提交任務(wù)

          接下來(lái),通過(guò)提交任務(wù)就可以串起整個(gè)流程。由上一篇文章我們知道,可以調(diào)用池對(duì)象的Submit()方法提交任務(wù):

          func (p *Pool) Submit(task func()error {
            if p.IsClosed() {
              return ErrPoolClosed
            }
            var w *goWorker
            if w = p.retrieveWorker(); w == nil {
              return ErrPoolOverload
            }
            w.task <- task
            return nil
          }

          首先判斷池是否已關(guān)閉,然后調(diào)用retrieveWorker()方法獲取一個(gè)空閑的 worker,然后將任務(wù)task發(fā)送到 worker 的任務(wù)通道。下面是retrieveWorker()實(shí)現(xiàn):

          func (p *Pool) retrieveWorker() (w *goWorker) {
            p.lock.Lock()

            w = p.workers.detach()
            if w != nil {
              p.lock.Unlock()
            } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
              p.lock.Unlock()
              spawnWorker()
            } else {
              if p.options.Nonblocking {
                p.lock.Unlock()
                return
              }
            Reentry:
              if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {
                p.lock.Unlock()
                return
              }
              p.blockingNum++
              p.cond.Wait()
              p.blockingNum--
              var nw int
              if nw = p.Running(); nw == 0 {
                p.lock.Unlock()
                if !p.IsClosed() {
                  spawnWorker()
                }
                return
              }
              if w = p.workers.detach(); w == nil {
                if nw < capacity {
                  p.lock.Unlock()
                  spawnWorker()
                  return
                }
                goto Reentry
              }

              p.lock.Unlock()
            }
            return
          }

          這個(gè)方法稍微有點(diǎn)復(fù)雜,我們一點(diǎn)點(diǎn)來(lái)看。首先調(diào)用p.workers.detach()獲取goWorker對(duì)象。p.workersloopQueue或者workerStack對(duì)象,它們都實(shí)現(xiàn)了detach()方法,前面已經(jīng)介紹過(guò)了。

          如果返回了一個(gè)goWorker對(duì)象,說(shuō)明有空閑 goroutine,直接返回。

          否則,池容量還沒(méi)用完(即容量大于正在工作的goWorker數(shù)量),則調(diào)用spawnWorker()新建一個(gè)goWorker,執(zhí)行其run()方法:

          spawnWorker := func() {
            w = p.workerCache.Get().(*goWorker)
            w.run()
          }

          否則,池容量已用完。如果設(shè)置了非阻塞選項(xiàng),則直接返回。否則,如果設(shè)置了最大阻塞隊(duì)列長(zhǎng)度上限,且當(dāng)前阻塞等待的任務(wù)數(shù)量已經(jīng)達(dá)到這個(gè)上限,直接返回。否則,阻塞等待數(shù)量 +1,調(diào)用p.cond.Wait()等待。

          然后goWorker.run()完成一個(gè)任務(wù)后,調(diào)用池的revertWorker()方法放回goWorker

          func (p *Pool) revertWorker(worker *goWorker) bool {
            if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {
              return false
            }
            worker.recycleTime = time.Now()
            p.lock.Lock()

            if p.IsClosed() {
              p.lock.Unlock()
              return false
            }

            err := p.workers.insert(worker)
            if err != nil {
              p.lock.Unlock()
              return false
            }

            p.cond.Signal()
            p.lock.Unlock()
            return true
          }

          這里設(shè)置了goWorkerrecycleTime字段,用于判定過(guò)期。然后將goWorker放回池。workersinsert()方法前面也已經(jīng)分析過(guò)了。

          接著調(diào)用p.cond.Signal()喚醒之前retrieveWorker()方法中的等待。retrieveWorker()方法繼續(xù)執(zhí)行,阻塞等待數(shù)量 -1,這里判斷當(dāng)前goWorker的數(shù)量(也即 goroutine 數(shù)量)。如果數(shù)量等于 0,很有可能池子剛剛執(zhí)行了Release()關(guān)閉,這時(shí)需要判斷池是否處于關(guān)閉狀態(tài),如果是則直接返回。否則,調(diào)用spawnWorker()創(chuàng)建一個(gè)新的goWorker并執(zhí)行其run()方法。

          如果當(dāng)前goWorker數(shù)量不為 0,則調(diào)用p.workers.detach()取出一個(gè)空閑的goWorker返回。這個(gè)操作有可能失敗,因?yàn)榭赡芡瑫r(shí)有多個(gè) goroutine 在等待,喚醒的時(shí)候只有部分 goroutine 能獲取到goWorker。如果失敗了,其容量還未用完,直接創(chuàng)建新的goWorker,反之重新執(zhí)行阻塞等待邏輯。

          這里有很多加鎖和解鎖的邏輯,再加上和信號(hào)量混在一起很難看明白。其實(shí)只需要知道一點(diǎn)就很簡(jiǎn)單了,那就是p.cond.Wait()內(nèi)部會(huì)將當(dāng)前 goroutine 掛起,然后解開(kāi)它持有的鎖,即會(huì)調(diào)用p.lock.Unlock()。這也是為什么revertWorker()p.lock.Lock()加鎖能成功的原因。然后p.cond.Signal()p.cond.Broadcast()會(huì)喚醒因?yàn)?code style="font-size: 14px;overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;background-color: rgba(27, 31, 35, 0.05);font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;color: rgb(239, 112, 96);">p.cond.Wait()而掛起的 goroutine,但是需要Signal()/Broadcast()所在 goroutine 調(diào)用解鎖方法。

          最后,放上整體流程圖:

          清理過(guò)期goWorker

          NewPool()函數(shù)中會(huì)啟動(dòng)一個(gè) goroutine 定期清理過(guò)期的goWorker

          func (p *Pool) purgePeriodically() {
            heartbeat := time.NewTicker(p.options.ExpiryDuration)
            defer heartbeat.Stop()

            for range heartbeat.C {
              if p.IsClosed() {
                break
              }

              p.lock.Lock()
              expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration)
              p.lock.Unlock()

              for i := range expiredWorkers {
                expiredWorkers[i].task <- nil
                expiredWorkers[i] = nil
              }

              if p.Running() == 0 {
                p.cond.Broadcast()
              }
            }
          }

          如果池子已關(guān)閉,直接退出 goroutine。由選項(xiàng)ExpiryDuration來(lái)設(shè)置清理的間隔,如果沒(méi)有設(shè)置該選項(xiàng),采用默認(rèn)值 1s:

          // src/github.com/panjf2000/ants/pool.go
          func NewPool(size int, options ...Option) (*Pool, error) {
            if expiry := opts.ExpiryDuration; expiry < 0 {
              return nil, ErrInvalidPoolExpiry
            } else if expiry == 0 {
              opts.ExpiryDuration = DefaultCleanIntervalTime
            }
          }

          // src/github.com/panjf2000/ants/pool.go
          const (
            DefaultCleanIntervalTime = time.Second
          )

          然后就是每個(gè)清理周期,調(diào)用p.workers.retrieveExpiry()方法,取出過(guò)期的goWorker。因?yàn)橛蛇@些goWorker啟動(dòng)的 goroutine 還阻塞在通道task上,所以要向該通道發(fā)送一個(gè)nil值,而goWorker.run()方法中接收到一個(gè)值為nil的任務(wù)會(huì)return,結(jié)束 goroutine,避免了 goroutine 泄漏。

          如果所有goWorker都被清理掉了,可能這時(shí)還有 goroutine 阻塞在retrieveWorker()方法中的p.cond.Wait()上,所以這里需要調(diào)用p.cond.Broadcast()喚醒這些 goroutine。

          容量動(dòng)態(tài)修改

          在運(yùn)行過(guò)程中,可以動(dòng)態(tài)修改池的容量。調(diào)用p.Tune(size int)方法:

          func (p *Pool) Tune(size int) {
            if capacity := p.Cap(); capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc {
              return
            }
            atomic.StoreInt32(&p.capacity, int32(size))
          }

          這里只是簡(jiǎn)單設(shè)置了一下新的容量,不影響當(dāng)前正在執(zhí)行的goWorker,而且如果設(shè)置了預(yù)分配選項(xiàng),容量不能再次設(shè)置。

          下次執(zhí)行revertWorker()的時(shí)候就會(huì)以新的容量判斷是否能放回,下次執(zhí)行retrieveWorker()的時(shí)候也會(huì)以新容量判斷是否能創(chuàng)建新goWorker。

          關(guān)閉和重新啟動(dòng)Pool

          使用完成之后,需要關(guān)閉Pool,避免 goroutine 泄漏。調(diào)用池對(duì)象的Release()方法關(guān)閉:

          func (p *Pool) Release() {
            atomic.StoreInt32(&p.state, CLOSED)
            p.lock.Lock()
            p.workers.reset()
            p.lock.Unlock()
            p.cond.Broadcast()
          }

          調(diào)用p.workers.reset()結(jié)束loopQueuewokerStack中的 goroutine,做一些清理工作,同時(shí)為了防止有 goroutine 阻塞在p.cond.Wait()上,執(zhí)行一次p.cond.Broadcast()。

          workerStackloopQueuereset()基本相同,即發(fā)送niltask通道從而結(jié)束 goroutine,然后重置各個(gè)字段:

          // loopQueue 版本
          func (wq *loopQueue) reset() {
            if wq.isEmpty() {
              return
            }

          Releasing:
            if w := wq.detach(); w != nil {
              w.task <- nil
              goto Releasing
            }
            wq.items = wq.items[:0]
            wq.size = 0
            wq.head = 0
            wq.tail = 0
          }

          // stack 版本
          func (wq *workerStack) reset() {
            for i := 0; i < wq.len(); i++ {
              wq.items[i].task <- nil
              wq.items[i] = nil
            }
            wq.items = wq.items[:0]
          }

          池關(guān)閉后還可以調(diào)用Reboot()重啟:

          func (p *Pool) Reboot() {
            if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
              go p.purgePeriodically()
            }
          }

          由于p.purgePeriodically()p.Release()之后檢測(cè)到池關(guān)閉就直接退出了,這里需要重新開(kāi)啟一個(gè) goroutine 定期清理。

          PoolWithFuncWorkWithFunc

          上一篇文章中我們還介紹了另一種方式創(chuàng)建Pool,即NewPoolWithFunc(),指定一個(gè)函數(shù)。后面提交任務(wù)時(shí)調(diào)用p.Invoke()提供參數(shù)就可以執(zhí)行該函數(shù)了。這種方式創(chuàng)建的 Pool 和 Woker 結(jié)構(gòu)如下:

          type PoolWithFunc struct {
            workers []*goWorkerWithFunc
            poolFunc func(interface{})
          }

          type goWorkerWithFunc struct {
            pool *PoolWithFunc
            args chan interface{}
            recycleTime time.Time
          }

          與前面介紹的PoolgoWorker大體相似,只是PoolWithFunc保存了傳入的函數(shù)對(duì)象,使用數(shù)組保存 worker。goWorkerWithFuncinterface{}args通道的數(shù)據(jù)類型,其實(shí)也好理解,因?yàn)橐呀?jīng)有函數(shù)了,只需要傳入數(shù)據(jù)作為參數(shù)就可以運(yùn)行了:

          func (w *goWorkerWithFunc) run() {
            go func() {
              for args := range w.args {
                if args == nil {
                  return
                }
                w.pool.poolFunc(args)
                if ok := w.pool.revertWorker(w); !ok {
                  return
                }
              }
            }()
          }

          從通道接收函數(shù)參數(shù),執(zhí)行池中保存的函數(shù)對(duì)象。

          其他細(xì)節(jié)

          task緩沖通道

          還記得創(chuàng)建p.workerCache這個(gè)sync.Pool對(duì)象的代碼么:

          p.workerCache.New = func() interface{} {
            return &goWorker{
              pool: p,
              task: make(chan func()workerChanCap),
            }
          }

          sync.Pool中沒(méi)有goWorker對(duì)象時(shí),調(diào)用New()方法創(chuàng)建一個(gè),注意到這里創(chuàng)建的task通道使用workerChanCap作為容量。這個(gè)變量定義在ants.go文件中:

          var (
            // workerChanCap determines whether the channel of a worker should be a buffered channel
            // to get the best performance. Inspired by fasthttp at
            // https://github.com/valyala/fasthttp/blob/master/workerpool.go#L139
            workerChanCap = func() int {
              // Use blocking channel if GOMAXPROCS=1.
              // This switches context from sender to receiver immediately,
              // which results in higher performance (under go1.5 at least).
              if runtime.GOMAXPROCS(0) == 1 {
                return 0
              }

              // Use non-blocking workerChan if GOMAXPROCS>1,
              // since otherwise the sender might be dragged down if the receiver is CPU-bound.
              return 1
            }()
          )

          為了方便對(duì)照,我把注釋也放上來(lái)了。ants參考了著名的 Web 框架fasthttp的實(shí)現(xiàn)。當(dāng)GOMAXPROCS為 1 時(shí)(即操作系統(tǒng)線程數(shù)為 1),向通道task發(fā)送會(huì)掛起發(fā)送 goroutine,將執(zhí)行流程轉(zhuǎn)向接收 goroutine,這能提升接收處理性能。如果GOMAXPROCS大于 1,ants使用帶緩沖的通道,為了防止接收 goroutine 是 CPU 密集的,導(dǎo)致發(fā)送 goroutine 被阻塞。下面是fasthttp中的相關(guān)代碼:

          // src/github.com/valyala/fasthttp/workerpool.go
          var workerChanCap = func() int {
            // Use blocking workerChan if GOMAXPROCS=1.
            // This immediately switches Serve to WorkerFunc, which results
            // in higher performance (under go1.5 at least).
            if runtime.GOMAXPROCS(0) == 1 {
              return 0
            }

            // Use non-blocking workerChan if GOMAXPROCS>1,
            // since otherwise the Serve caller (Acceptor) may lag accepting
            // new connections if WorkerFunc is CPU-bound.
            return 1
          }()

          自旋鎖

          ants利用atomic.CompareAndSwapUint32()這個(gè)原子操作實(shí)現(xiàn)了一個(gè)自旋鎖。與其他類型的鎖不同,自旋鎖在加鎖失敗之后不會(huì)立刻進(jìn)入等待,而是會(huì)繼續(xù)嘗試。這對(duì)于很快就能獲得鎖的應(yīng)用來(lái)說(shuō)能極大提升性能,因?yàn)槟鼙苊饧渔i和解鎖導(dǎo)致的線程切換:

          type spinLock uint32

          func (sl *spinLock) Lock() {
            backoff := 1
            for !atomic.CompareAndSwapUint32((*uint32)(sl), 01) {
              for i := 0; i < backoff; i++ {
                runtime.Gosched()
              }
              backoff <<= 1
            }
          }

          func (sl *spinLock) Unlock() {
            atomic.StoreUint32((*uint32)(sl), 0)
          }

          // NewSpinLock instantiates a spin-lock.
          func NewSpinLock() sync.Locker {
            return new(spinLock)
          }

          另外這里使用了指數(shù)退避,先等 1 個(gè)循環(huán)周期,通過(guò)runtime.Gosched()告訴運(yùn)行時(shí)切換其他 goroutine 運(yùn)行。如果還是獲取不到鎖,就再等 2 個(gè)周期。如果還是不行,再等 4,8,16...以此類推。這可以防止短時(shí)間內(nèi)獲取不到鎖,導(dǎo)致 CPU 時(shí)間的浪費(fèi)。

          總結(jié)

          ants源碼短小精悍,沒(méi)有引用其他任何第三方庫(kù)。各種細(xì)節(jié)處理,各種性能優(yōu)化的點(diǎn)都是值得我們細(xì)細(xì)品味的。強(qiáng)烈建議大家讀一讀源碼。閱讀優(yōu)秀的源碼,能極大地提高自身的編碼素養(yǎng)。

          大家如果發(fā)現(xiàn)好玩、好用的 Go 語(yǔ)言庫(kù),歡迎到 Go 每日一庫(kù) GitHub 上提交 issue??

          參考

          1. ants GitHub:github.com/panjf2000/ants
          2. Go 每日一庫(kù) GitHub:https://github.com/darjun/go-daily-lib


          推薦閱讀


          福利

          我為大家整理了一份從入門(mén)到進(jìn)階的Go學(xué)習(xí)資料禮包,包含學(xué)習(xí)建議:入門(mén)看什么,進(jìn)階看什么。關(guān)注公眾號(hào) 「polarisxu」,回復(fù) ebook 獲?。贿€可以回復(fù)「進(jìn)群」,和數(shù)萬(wàn) Gopher 交流學(xué)習(xí)。

          瀏覽 100
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日韩色网 | 中文日韩欧美 | 日皮太爽了我要看免费视频 | 美女乱伦免费 | 天天操夜夜操天天射天天干 |