Go 每日一庫(kù)之 ants 源碼賞析
簡(jiǎn)介
繼上一篇Go 每日一庫(kù)之 ants,這篇文章我們來(lái)一起看看ants的源碼。
Pool
通過(guò)上篇文章,我們知道ants池有兩種創(chuàng)建方式:
p, _ := ants.NewPool(cap):這種方式創(chuàng)建的池子對(duì)象需要調(diào)用p.Submit(task)提交任務(wù),任務(wù)是一個(gè)無(wú)參數(shù)無(wú)返回值的函數(shù);p, _ := ants.NewPoolWithFunc(cap, func(interface{})):這種方式創(chuàng)建的池子對(duì)象需要指定池函數(shù),并且使用p.Invoke(arg)調(diào)用池函數(shù)。arg就是傳給池函數(shù)func(interface{})的參數(shù)。
在ants中這兩種池子使用不同的結(jié)構(gòu)來(lái)表示:ants.Pool和ants.PoolWithFunc。我們先來(lái)介紹Pool。PoolWithFunc結(jié)構(gòu)也是類似的,介紹完Pool之后,我們?cè)俸?jiǎn)單比較一下它們。
Pool結(jié)構(gòu)定義在文件pool.go中:
// src/github.com/panjf2000/ants/pool.go
type Pool struct {
capacity int32
running int32
workers workerArray
state int32
lock sync.Locker
cond *sync.Cond
workerCache sync.Pool
blockingNum int
options *Options
}
各個(gè)字段含義如下:
capacity:池容量,表示ants最多能創(chuàng)建的 goroutine 數(shù)量。如果為負(fù)數(shù),表示容量無(wú)限制;running:已經(jīng)創(chuàng)建的 worker goroutine 的數(shù)量;workers:存放一組 worker 對(duì)象,workerArray只是一個(gè)接口,表示一個(gè) worker 容器,后面詳述;state:記錄池子當(dāng)前的狀態(tài),是否已關(guān)閉(CLOSED);lock:鎖。ants自己實(shí)現(xiàn)了一個(gè)自旋鎖。用于同步并發(fā)操作;cond:條件變量。處理任務(wù)等待和喚醒;workerCache:使用sync.Pool對(duì)象池管理和創(chuàng)建worker對(duì)象,提升性能;blockingNum:阻塞等待的任務(wù)數(shù)量;options:選項(xiàng)。上一篇文章已經(jīng)詳細(xì)介紹過(guò)了。
這里明確一個(gè)概念,ants中為每個(gè)任務(wù)都是由 worker 對(duì)象來(lái)處理的,每個(gè) worker 對(duì)象會(huì)對(duì)應(yīng)創(chuàng)建一個(gè) goroutine 來(lái)處理任務(wù)。ants中使用goWorker表示 worker:
// src/github.com/panjf2000/ants/worker.go
type goWorker struct {
pool *Pool
task chan func()
recycleTime time.Time
}
后文詳細(xì)介紹這一塊內(nèi)容,現(xiàn)在我們只需要知道Pool.workers字段就是存放goWorker對(duì)象的容器。
Pool創(chuàng)建
創(chuàng)建Pool對(duì)象需調(diào)用ants.NewPool(size, options)函數(shù)。省略了一些處理選項(xiàng)的代碼,最終代碼如下:
// src/github.com/panjf2000/ants/pool.go
func NewPool(size int, options ...Option) (*Pool, error) {
// ...
p := &Pool{
capacity: int32(size),
lock: internal.NewSpinLock(),
options: opts,
}
p.workerCache.New = func() interface{} {
return &goWorker{
pool: p,
task: make(chan func(), workerChanCap),
}
}
if p.options.PreAlloc {
if size == -1 {
return nil, ErrInvalidPreAllocSize
}
p.workers = newWorkerArray(loopQueueType, size)
} else {
p.workers = newWorkerArray(stackType, 0)
}
p.cond = sync.NewCond(p.lock)
go p.purgePeriodically()
return p, nil
}
代碼不難理解:
創(chuàng)建 Pool對(duì)象,設(shè)置容量,創(chuàng)建一個(gè)自旋鎖來(lái)初始化lock字段,設(shè)置選項(xiàng);設(shè)置 workerCache這個(gè)sync.Pool對(duì)象的New方法,在調(diào)用sync.Pool對(duì)象的Get()方法時(shí),如果它沒(méi)有緩存的 worker 對(duì)象了,則調(diào)用這個(gè)方法創(chuàng)建一個(gè);根據(jù)是否設(shè)置了預(yù)分配選項(xiàng),創(chuàng)建不同類型的 workers; 使用 p.lock鎖創(chuàng)建一個(gè)條件變量;最后啟動(dòng)一個(gè) goroutine 用于定期清理過(guò)期的 worker。
Pool.workers字段為workerArray類型,這實(shí)際上是一個(gè)接口,表示一個(gè) worker 容器:
type workerArray interface {
len() int
isEmpty() bool
insert(worker *goWorker) error
detach() *goWorker
retrieveExpiry(duration time.Duration) []*goWorker
reset()
}
每個(gè)方法從名字上很好理解含義:
len() int:worker 數(shù)量;isEmpty() bool:worker 數(shù)量是否為 0;insert(worker *goWorker) error:goroutine 任務(wù)執(zhí)行結(jié)束后,將相應(yīng)的 worker 放回workerArray中;detach() *goWorker:從workerArray中取出一個(gè) worker;retrieveExpiry(duration time.Duration) []*goWorker:取出所有的過(guò)期 worker;reset():重置容器。
workerArray在ants中有兩種實(shí)現(xiàn),即workerStack和loopQueue。
workerStack
我們先來(lái)介紹一下workerStack,它位于文件worker_stack.go中:
// src/github.com/panjf2000/ants/worker_stack.go
type workerStack struct {
items []*goWorker
expiry []*goWorker
size int
}
func newWorkerStack(size int) *workerStack {
return &workerStack{
items: make([]*goWorker, 0, size),
size: size,
}
}
items:空閑的worker;expiry:過(guò)期的worker。
goroutine 完成任務(wù)之后,Pool池會(huì)將相應(yīng)的 worker 放回workerStack,調(diào)用workerStack.insert()直接append到items中即可:
func (wq *workerStack) insert(worker *goWorker) error {
wq.items = append(wq.items, worker)
return nil
}
新任務(wù)到來(lái)時(shí),會(huì)調(diào)用workerStack.detach()從容器中取出一個(gè)空閑的 worker:
func (wq *workerStack) detach() *goWorker {
l := wq.len()
if l == 0 {
return nil
}
w := wq.items[l-1]
wq.items[l-1] = nil // avoid memory leaks
wq.items = wq.items[:l-1]
return w
}
這里總是返回最后一個(gè) worker,每次insert()也是append到最后,符合棧后進(jìn)先出的特點(diǎn),故稱為workerStack。
這里有一個(gè)細(xì)節(jié),由于切片的底層結(jié)構(gòu)是數(shù)組,只要有引用數(shù)組的指針,數(shù)組中的元素就不會(huì)釋放。這里取出切片最后一個(gè)元素后,將對(duì)應(yīng)數(shù)組元素的指針設(shè)置為nil,主動(dòng)釋放這個(gè)引用。
上面說(shuō)過(guò)新建Pool對(duì)象時(shí)會(huì)創(chuàng)建一個(gè) goroutine 定期檢查和清理過(guò)期的 worker。通過(guò)調(diào)用workerArray.retrieveExpiry()獲取過(guò)期的 worker 列表。workerStack實(shí)現(xiàn)如下:
func (wq *workerStack) retrieveExpiry(duration time.Duration) []*goWorker {
n := wq.len()
if n == 0 {
return nil
}
expiryTime := time.Now().Add(-duration)
index := wq.binarySearch(0, n-1, expiryTime)
wq.expiry = wq.expiry[:0]
if index != -1 {
wq.expiry = append(wq.expiry, wq.items[:index+1]...)
m := copy(wq.items, wq.items[index+1:])
for i := m; i < n; i++ {
wq.items[i] = nil
}
wq.items = wq.items[:m]
}
return wq.expiry
}
實(shí)現(xiàn)使用二分查找法找到已過(guò)期的最近一個(gè) worker。由于過(guò)期時(shí)間是按照 goroutine 執(zhí)行任務(wù)后的空閑時(shí)間計(jì)算的,而workerStack.insert()入隊(duì)順序決定了,它們的過(guò)期時(shí)間是從早到晚的。所以可以使用二分查找:
func (wq *workerStack) binarySearch(l, r int, expiryTime time.Time) int {
var mid int
for l <= r {
mid = (l + r) / 2
if expiryTime.Before(wq.items[mid].recycleTime) {
r = mid - 1
} else {
l = mid + 1
}
}
return r
}
二分查找的是最近過(guò)期的 worker,即將過(guò)期的 worker 的前一個(gè)。它和在它之前的 worker 已經(jīng)全部過(guò)期了。
如果找到索引index,將items從開(kāi)頭到index(包括)的所有 worker 復(fù)制到expiry字段中。然后將index之后的所有未過(guò)期 worker 復(fù)制到切片頭部,這里使用了copy函數(shù)。copy返回實(shí)際復(fù)制的數(shù)量,即未過(guò)期的 worker 數(shù)量m。然后將切片items從m開(kāi)始所有的元素置為nil,避免內(nèi)存泄漏,因?yàn)樗鼈円呀?jīng)被復(fù)制到頭部了。最后裁剪items切片,返回過(guò)期 worker 切片。
loopQueue
loopQueue實(shí)現(xiàn)基于循環(huán)隊(duì)列,結(jié)構(gòu)定義在文件worker_loop_queue中:
type loopQueue struct {
items []*goWorker
expiry []*goWorker
head int
tail int
size int
isFull bool
}
func newWorkerLoopQueue(size int) *loopQueue {
return &loopQueue{
items: make([]*goWorker, size),
size: size,
}
}
由于是循環(huán)隊(duì)列,這里先創(chuàng)建好了一個(gè)長(zhǎng)度為size的切片。循環(huán)隊(duì)列有一個(gè)隊(duì)列頭指針head,指向第一個(gè)有元素的位置,一個(gè)隊(duì)列尾指針tail,指向下一個(gè)可以存放元素的位置。所以一開(kāi)始狀態(tài)如下:

在tail處添加元素,添加后tail指針后移。在head處取出元素,取出后head指針也后移。進(jìn)行一段時(shí)間操作后,隊(duì)列狀態(tài)如下:

head或tail指針到隊(duì)列尾了,需要回繞。所以可能出現(xiàn)這種情況:

當(dāng)tail指針趕上head指針了,說(shuō)明隊(duì)列就滿了:

當(dāng)head指針趕上tail指針了,隊(duì)列再次為空:

根據(jù)示意圖,我們?cè)賮?lái)看loopQueue的操作方法就很簡(jiǎn)單了。
由于head和tail相等的情況有可能是隊(duì)列空,也有可能是隊(duì)列滿,所以loopQueue中增加一個(gè)isFull字段以示區(qū)分。goroutine 完成任務(wù)之后,會(huì)將對(duì)應(yīng)的 worker 對(duì)象放回loopQueue,執(zhí)行的是insert()方法:
func (wq *loopQueue) insert(worker *goWorker) error {
if wq.size == 0 {
return errQueueIsReleased
}
if wq.isFull {
return errQueueIsFull
}
wq.items[wq.tail] = worker
wq.tail++
if wq.tail == wq.size {
wq.tail = 0
}
if wq.tail == wq.head {
wq.isFull = true
}
return nil
}
這個(gè)方法執(zhí)行的就是循環(huán)隊(duì)列的入隊(duì)流程,注意如果插入后tail==head了,說(shuō)明隊(duì)列滿了,設(shè)置isFull字段。
新任務(wù)到來(lái)調(diào)用loopQueeue.detach()方法獲取一個(gè)空閑的 worker 結(jié)構(gòu):
func (wq *loopQueue) detach() *goWorker {
if wq.isEmpty() {
return nil
}
w := wq.items[wq.head]
wq.items[wq.head] = nil
wq.head++
if wq.head == wq.size {
wq.head = 0
}
wq.isFull = false
return w
}
這個(gè)方法對(duì)應(yīng)的是循環(huán)隊(duì)列的出隊(duì)流程,注意每次出隊(duì)后,隊(duì)列肯定不滿了,isFull要重置為false。
與workerStack結(jié)構(gòu)一樣,先入的 worker 對(duì)象過(guò)期時(shí)間早,后入的晚,獲取過(guò)期 worker 的方法與workerStack中類似,只是沒(méi)有使用二分查找了。這里就不贅述了。
再看Pool創(chuàng)建
介紹完兩種workerArray的實(shí)現(xiàn)之后,再來(lái)看Pool的創(chuàng)建函數(shù)中workers字段的設(shè)置:
if p.options.PreAlloc {
if size == -1 {
return nil, ErrInvalidPreAllocSize
}
p.workers = newWorkerArray(loopQueueType, size)
} else {
p.workers = newWorkerArray(stackType, 0)
}
newWorkerArray()定義在文件worker_array.go中:
type arrayType int
const (
stackType arrayType = 1 << iota
loopQueueType
)
func newWorkerArray(aType arrayType, size int) workerArray {
switch aType {
case stackType:
return newWorkerStack(size)
case loopQueueType:
return newWorkerLoopQueue(size)
default:
return newWorkerStack(size)
}
}
即如果設(shè)置了預(yù)分配選項(xiàng),就采用loopQueue結(jié)構(gòu)。否則就采用stack的結(jié)構(gòu)。
worker 結(jié)構(gòu)
介紹完Pool的創(chuàng)建和結(jié)構(gòu),我們來(lái)看看 worker 的結(jié)構(gòu)。在ants中 worker 用結(jié)構(gòu)體goWorker表示,定義在文件worker.go中。它的結(jié)構(gòu)非常簡(jiǎn)單:
// src/github.com/panjf2000/ants/worker.go
type goWorker struct {
pool *Pool
task chan func()
recycleTime time.Time
}
具體字段含義很明顯:
pool:持有 goroutine 池的引用;task:任務(wù)通道,通過(guò)這個(gè)通道將類型為func ()的函數(shù)作為任務(wù)發(fā)送給goWorker;recyleTime:這個(gè)字段記錄goWorker什么時(shí)候被放回池中(即什么時(shí)候開(kāi)始空閑)。其完成任務(wù)后,在將其放回 goroutine 池的時(shí)候設(shè)置。
goWorker創(chuàng)建時(shí)會(huì)調(diào)用run()方法,run()方法中啟動(dòng)一個(gè)新 goroutine 處理任務(wù)。run()主體流程非常簡(jiǎn)單:
func (w *goWorker) run() {
go func() {
for f := range w.task {
if f == nil {
return
}
f()
if ok := w.pool.revertWorker(w); !ok {
return
}
}
}()
}
這個(gè)方法啟動(dòng)一個(gè)新的 goroutine,然后不停地從task通道中接收任務(wù),然后執(zhí)行任務(wù),任務(wù)執(zhí)行完成之后調(diào)用池對(duì)象的revertWorker()方法將該goWorker對(duì)象放回池中,以便下次取出處理新的任務(wù)。revertWorker()方法后面會(huì)詳細(xì)分析。
這里注意,實(shí)際上for f := range w.task這個(gè)循環(huán)直到通道task關(guān)閉或取出為nil的任務(wù)才會(huì)終止。所以這個(gè) goroutine 一直在運(yùn)行,這正是ants高性能的關(guān)鍵所在。每個(gè)goWorker只會(huì)啟動(dòng)一次 goroutine, 后續(xù)重復(fù)利用這個(gè) goroutine。goroutine 每次只執(zhí)行一個(gè)任務(wù)就會(huì)被放回池中。
還有一個(gè)細(xì)節(jié),如果放回操作失敗,則會(huì)調(diào)用return,這會(huì)讓 goroutine 運(yùn)行結(jié)束,防止 goroutine 泄漏。
這里f == nil為 true 時(shí)return,也是一個(gè)細(xì)節(jié)點(diǎn),我們后面講池關(guān)閉的時(shí)候會(huì)詳細(xì)介紹。
下面我們看看run()方法的異常處理:
defer func() {
w.pool.workerCache.Put(w)
if p := recover(); p != nil {
if ph := w.pool.options.PanicHandler; ph != nil {
ph(p)
} else {
w.pool.options.Logger.Printf("worker exits from a panic: %v\n", p)
var buf [4096]byte
n := runtime.Stack(buf[:], false)
w.pool.options.Logger.Printf("worker exits from panic: %s\n", string(buf[:n]))
}
}
w.pool.cond.Signal()
}()
簡(jiǎn)單來(lái)說(shuō),就是在defer中通過(guò)recover()函數(shù)捕獲任務(wù)執(zhí)行過(guò)程中拋出的panic。這時(shí)任務(wù)執(zhí)行失敗,goroutine 也結(jié)束了。但是goWorker對(duì)象還是可以重復(fù)利用,所以defer函數(shù)一開(kāi)始調(diào)用w.pool.workerCache.Put(w)將goWorker對(duì)象放回sync.Pool池中。
接著就是處理panic,如果選項(xiàng)中指定了panic處理器,直接調(diào)用這個(gè)處理器。否則,ants調(diào)用選項(xiàng)中設(shè)置的Logger記錄一些日志,如堆棧,panic信息等。
最后需要調(diào)用w.pool.cond.Signal()通知現(xiàn)在有空閑的goWorker了。因?yàn)槲覀儗?shí)際運(yùn)行的goWorker數(shù)量由于panic少了一個(gè),而池中可能有其他任務(wù)在等待處理。
提交任務(wù)
接下來(lái),通過(guò)提交任務(wù)就可以串起整個(gè)流程。由上一篇文章我們知道,可以調(diào)用池對(duì)象的Submit()方法提交任務(wù):
func (p *Pool) Submit(task func()) error {
if p.IsClosed() {
return ErrPoolClosed
}
var w *goWorker
if w = p.retrieveWorker(); w == nil {
return ErrPoolOverload
}
w.task <- task
return nil
}
首先判斷池是否已關(guān)閉,然后調(diào)用retrieveWorker()方法獲取一個(gè)空閑的 worker,然后將任務(wù)task發(fā)送到 worker 的任務(wù)通道。下面是retrieveWorker()實(shí)現(xiàn):
func (p *Pool) retrieveWorker() (w *goWorker) {
p.lock.Lock()
w = p.workers.detach()
if w != nil {
p.lock.Unlock()
} else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
p.lock.Unlock()
spawnWorker()
} else {
if p.options.Nonblocking {
p.lock.Unlock()
return
}
Reentry:
if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {
p.lock.Unlock()
return
}
p.blockingNum++
p.cond.Wait()
p.blockingNum--
var nw int
if nw = p.Running(); nw == 0 {
p.lock.Unlock()
if !p.IsClosed() {
spawnWorker()
}
return
}
if w = p.workers.detach(); w == nil {
if nw < capacity {
p.lock.Unlock()
spawnWorker()
return
}
goto Reentry
}
p.lock.Unlock()
}
return
}
這個(gè)方法稍微有點(diǎn)復(fù)雜,我們一點(diǎn)點(diǎn)來(lái)看。首先調(diào)用p.workers.detach()獲取goWorker對(duì)象。p.workers是loopQueue或者workerStack對(duì)象,它們都實(shí)現(xiàn)了detach()方法,前面已經(jīng)介紹過(guò)了。
如果返回了一個(gè)goWorker對(duì)象,說(shuō)明有空閑 goroutine,直接返回。
否則,池容量還沒(méi)用完(即容量大于正在工作的goWorker數(shù)量),則調(diào)用spawnWorker()新建一個(gè)goWorker,執(zhí)行其run()方法:
spawnWorker := func() {
w = p.workerCache.Get().(*goWorker)
w.run()
}
否則,池容量已用完。如果設(shè)置了非阻塞選項(xiàng),則直接返回。否則,如果設(shè)置了最大阻塞隊(duì)列長(zhǎng)度上限,且當(dāng)前阻塞等待的任務(wù)數(shù)量已經(jīng)達(dá)到這個(gè)上限,直接返回。否則,阻塞等待數(shù)量 +1,調(diào)用p.cond.Wait()等待。
然后goWorker.run()完成一個(gè)任務(wù)后,調(diào)用池的revertWorker()方法放回goWorker:
func (p *Pool) revertWorker(worker *goWorker) bool {
if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {
return false
}
worker.recycleTime = time.Now()
p.lock.Lock()
if p.IsClosed() {
p.lock.Unlock()
return false
}
err := p.workers.insert(worker)
if err != nil {
p.lock.Unlock()
return false
}
p.cond.Signal()
p.lock.Unlock()
return true
}
這里設(shè)置了goWorker的recycleTime字段,用于判定過(guò)期。然后將goWorker放回池。workers的insert()方法前面也已經(jīng)分析過(guò)了。
接著調(diào)用p.cond.Signal()喚醒之前retrieveWorker()方法中的等待。retrieveWorker()方法繼續(xù)執(zhí)行,阻塞等待數(shù)量 -1,這里判斷當(dāng)前goWorker的數(shù)量(也即 goroutine 數(shù)量)。如果數(shù)量等于 0,很有可能池子剛剛執(zhí)行了Release()關(guān)閉,這時(shí)需要判斷池是否處于關(guān)閉狀態(tài),如果是則直接返回。否則,調(diào)用spawnWorker()創(chuàng)建一個(gè)新的goWorker并執(zhí)行其run()方法。
如果當(dāng)前goWorker數(shù)量不為 0,則調(diào)用p.workers.detach()取出一個(gè)空閑的goWorker返回。這個(gè)操作有可能失敗,因?yàn)榭赡芡瑫r(shí)有多個(gè) goroutine 在等待,喚醒的時(shí)候只有部分 goroutine 能獲取到goWorker。如果失敗了,其容量還未用完,直接創(chuàng)建新的goWorker,反之重新執(zhí)行阻塞等待邏輯。
這里有很多加鎖和解鎖的邏輯,再加上和信號(hào)量混在一起很難看明白。其實(shí)只需要知道一點(diǎn)就很簡(jiǎn)單了,那就是p.cond.Wait()內(nèi)部會(huì)將當(dāng)前 goroutine 掛起,然后解開(kāi)它持有的鎖,即會(huì)調(diào)用p.lock.Unlock()。這也是為什么revertWorker()中p.lock.Lock()加鎖能成功的原因。然后p.cond.Signal()或p.cond.Broadcast()會(huì)喚醒因?yàn)?code style="font-size: 14px;overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;background-color: rgba(27, 31, 35, 0.05);font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;color: rgb(239, 112, 96);">p.cond.Wait()而掛起的 goroutine,但是需要Signal()/Broadcast()所在 goroutine 調(diào)用解鎖方法。
最后,放上整體流程圖:

清理過(guò)期goWorker
在NewPool()函數(shù)中會(huì)啟動(dòng)一個(gè) goroutine 定期清理過(guò)期的goWorker:
func (p *Pool) purgePeriodically() {
heartbeat := time.NewTicker(p.options.ExpiryDuration)
defer heartbeat.Stop()
for range heartbeat.C {
if p.IsClosed() {
break
}
p.lock.Lock()
expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration)
p.lock.Unlock()
for i := range expiredWorkers {
expiredWorkers[i].task <- nil
expiredWorkers[i] = nil
}
if p.Running() == 0 {
p.cond.Broadcast()
}
}
}
如果池子已關(guān)閉,直接退出 goroutine。由選項(xiàng)ExpiryDuration來(lái)設(shè)置清理的間隔,如果沒(méi)有設(shè)置該選項(xiàng),采用默認(rèn)值 1s:
// src/github.com/panjf2000/ants/pool.go
func NewPool(size int, options ...Option) (*Pool, error) {
if expiry := opts.ExpiryDuration; expiry < 0 {
return nil, ErrInvalidPoolExpiry
} else if expiry == 0 {
opts.ExpiryDuration = DefaultCleanIntervalTime
}
}
// src/github.com/panjf2000/ants/pool.go
const (
DefaultCleanIntervalTime = time.Second
)
然后就是每個(gè)清理周期,調(diào)用p.workers.retrieveExpiry()方法,取出過(guò)期的goWorker。因?yàn)橛蛇@些goWorker啟動(dòng)的 goroutine 還阻塞在通道task上,所以要向該通道發(fā)送一個(gè)nil值,而goWorker.run()方法中接收到一個(gè)值為nil的任務(wù)會(huì)return,結(jié)束 goroutine,避免了 goroutine 泄漏。
如果所有goWorker都被清理掉了,可能這時(shí)還有 goroutine 阻塞在retrieveWorker()方法中的p.cond.Wait()上,所以這里需要調(diào)用p.cond.Broadcast()喚醒這些 goroutine。
容量動(dòng)態(tài)修改
在運(yùn)行過(guò)程中,可以動(dòng)態(tài)修改池的容量。調(diào)用p.Tune(size int)方法:
func (p *Pool) Tune(size int) {
if capacity := p.Cap(); capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc {
return
}
atomic.StoreInt32(&p.capacity, int32(size))
}
這里只是簡(jiǎn)單設(shè)置了一下新的容量,不影響當(dāng)前正在執(zhí)行的goWorker,而且如果設(shè)置了預(yù)分配選項(xiàng),容量不能再次設(shè)置。
下次執(zhí)行revertWorker()的時(shí)候就會(huì)以新的容量判斷是否能放回,下次執(zhí)行retrieveWorker()的時(shí)候也會(huì)以新容量判斷是否能創(chuàng)建新goWorker。
關(guān)閉和重新啟動(dòng)Pool
使用完成之后,需要關(guān)閉Pool,避免 goroutine 泄漏。調(diào)用池對(duì)象的Release()方法關(guān)閉:
func (p *Pool) Release() {
atomic.StoreInt32(&p.state, CLOSED)
p.lock.Lock()
p.workers.reset()
p.lock.Unlock()
p.cond.Broadcast()
}
調(diào)用p.workers.reset()結(jié)束loopQueue或wokerStack中的 goroutine,做一些清理工作,同時(shí)為了防止有 goroutine 阻塞在p.cond.Wait()上,執(zhí)行一次p.cond.Broadcast()。
workerStack與loopQueue的reset()基本相同,即發(fā)送nil到task通道從而結(jié)束 goroutine,然后重置各個(gè)字段:
// loopQueue 版本
func (wq *loopQueue) reset() {
if wq.isEmpty() {
return
}
Releasing:
if w := wq.detach(); w != nil {
w.task <- nil
goto Releasing
}
wq.items = wq.items[:0]
wq.size = 0
wq.head = 0
wq.tail = 0
}
// stack 版本
func (wq *workerStack) reset() {
for i := 0; i < wq.len(); i++ {
wq.items[i].task <- nil
wq.items[i] = nil
}
wq.items = wq.items[:0]
}
池關(guān)閉后還可以調(diào)用Reboot()重啟:
func (p *Pool) Reboot() {
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
go p.purgePeriodically()
}
}
由于p.purgePeriodically()在p.Release()之后檢測(cè)到池關(guān)閉就直接退出了,這里需要重新開(kāi)啟一個(gè) goroutine 定期清理。
PoolWithFunc和WorkWithFunc
上一篇文章中我們還介紹了另一種方式創(chuàng)建Pool,即NewPoolWithFunc(),指定一個(gè)函數(shù)。后面提交任務(wù)時(shí)調(diào)用p.Invoke()提供參數(shù)就可以執(zhí)行該函數(shù)了。這種方式創(chuàng)建的 Pool 和 Woker 結(jié)構(gòu)如下:
type PoolWithFunc struct {
workers []*goWorkerWithFunc
poolFunc func(interface{})
}
type goWorkerWithFunc struct {
pool *PoolWithFunc
args chan interface{}
recycleTime time.Time
}
與前面介紹的Pool和goWorker大體相似,只是PoolWithFunc保存了傳入的函數(shù)對(duì)象,使用數(shù)組保存 worker。goWorkerWithFunc以interface{}為args通道的數(shù)據(jù)類型,其實(shí)也好理解,因?yàn)橐呀?jīng)有函數(shù)了,只需要傳入數(shù)據(jù)作為參數(shù)就可以運(yùn)行了:
func (w *goWorkerWithFunc) run() {
go func() {
for args := range w.args {
if args == nil {
return
}
w.pool.poolFunc(args)
if ok := w.pool.revertWorker(w); !ok {
return
}
}
}()
}
從通道接收函數(shù)參數(shù),執(zhí)行池中保存的函數(shù)對(duì)象。
其他細(xì)節(jié)
task緩沖通道
還記得創(chuàng)建p.workerCache這個(gè)sync.Pool對(duì)象的代碼么:
p.workerCache.New = func() interface{} {
return &goWorker{
pool: p,
task: make(chan func(), workerChanCap),
}
}
在sync.Pool中沒(méi)有goWorker對(duì)象時(shí),調(diào)用New()方法創(chuàng)建一個(gè),注意到這里創(chuàng)建的task通道使用workerChanCap作為容量。這個(gè)變量定義在ants.go文件中:
var (
// workerChanCap determines whether the channel of a worker should be a buffered channel
// to get the best performance. Inspired by fasthttp at
// https://github.com/valyala/fasthttp/blob/master/workerpool.go#L139
workerChanCap = func() int {
// Use blocking channel if GOMAXPROCS=1.
// This switches context from sender to receiver immediately,
// which results in higher performance (under go1.5 at least).
if runtime.GOMAXPROCS(0) == 1 {
return 0
}
// Use non-blocking workerChan if GOMAXPROCS>1,
// since otherwise the sender might be dragged down if the receiver is CPU-bound.
return 1
}()
)
為了方便對(duì)照,我把注釋也放上來(lái)了。ants參考了著名的 Web 框架fasthttp的實(shí)現(xiàn)。當(dāng)GOMAXPROCS為 1 時(shí)(即操作系統(tǒng)線程數(shù)為 1),向通道task發(fā)送會(huì)掛起發(fā)送 goroutine,將執(zhí)行流程轉(zhuǎn)向接收 goroutine,這能提升接收處理性能。如果GOMAXPROCS大于 1,ants使用帶緩沖的通道,為了防止接收 goroutine 是 CPU 密集的,導(dǎo)致發(fā)送 goroutine 被阻塞。下面是fasthttp中的相關(guān)代碼:
// src/github.com/valyala/fasthttp/workerpool.go
var workerChanCap = func() int {
// Use blocking workerChan if GOMAXPROCS=1.
// This immediately switches Serve to WorkerFunc, which results
// in higher performance (under go1.5 at least).
if runtime.GOMAXPROCS(0) == 1 {
return 0
}
// Use non-blocking workerChan if GOMAXPROCS>1,
// since otherwise the Serve caller (Acceptor) may lag accepting
// new connections if WorkerFunc is CPU-bound.
return 1
}()
自旋鎖
ants利用atomic.CompareAndSwapUint32()這個(gè)原子操作實(shí)現(xiàn)了一個(gè)自旋鎖。與其他類型的鎖不同,自旋鎖在加鎖失敗之后不會(huì)立刻進(jìn)入等待,而是會(huì)繼續(xù)嘗試。這對(duì)于很快就能獲得鎖的應(yīng)用來(lái)說(shuō)能極大提升性能,因?yàn)槟鼙苊饧渔i和解鎖導(dǎo)致的線程切換:
type spinLock uint32
func (sl *spinLock) Lock() {
backoff := 1
for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
for i := 0; i < backoff; i++ {
runtime.Gosched()
}
backoff <<= 1
}
}
func (sl *spinLock) Unlock() {
atomic.StoreUint32((*uint32)(sl), 0)
}
// NewSpinLock instantiates a spin-lock.
func NewSpinLock() sync.Locker {
return new(spinLock)
}
另外這里使用了指數(shù)退避,先等 1 個(gè)循環(huán)周期,通過(guò)runtime.Gosched()告訴運(yùn)行時(shí)切換其他 goroutine 運(yùn)行。如果還是獲取不到鎖,就再等 2 個(gè)周期。如果還是不行,再等 4,8,16...以此類推。這可以防止短時(shí)間內(nèi)獲取不到鎖,導(dǎo)致 CPU 時(shí)間的浪費(fèi)。
總結(jié)
ants源碼短小精悍,沒(méi)有引用其他任何第三方庫(kù)。各種細(xì)節(jié)處理,各種性能優(yōu)化的點(diǎn)都是值得我們細(xì)細(xì)品味的。強(qiáng)烈建議大家讀一讀源碼。閱讀優(yōu)秀的源碼,能極大地提高自身的編碼素養(yǎng)。
大家如果發(fā)現(xiàn)好玩、好用的 Go 語(yǔ)言庫(kù),歡迎到 Go 每日一庫(kù) GitHub 上提交 issue??
參考
ants GitHub:github.com/panjf2000/ants Go 每日一庫(kù) GitHub:https://github.com/darjun/go-daily-lib
推薦閱讀
