Go 每日一庫(kù)之 ants
簡(jiǎn)介
處理大量并發(fā)是 Go 語(yǔ)言的一大優(yōu)勢(shì)。語(yǔ)言內(nèi)置了方便的并發(fā)語(yǔ)法,可以非常方便的創(chuàng)建很多個(gè)輕量級(jí)的 goroutine 并發(fā)處理任務(wù)。相比于創(chuàng)建多個(gè)線程,goroutine 更輕量、資源占用更少、切換速度更快、無(wú)線程上下文切換開(kāi)銷更少。但是受限于資源總量,系統(tǒng)中能夠創(chuàng)建的 goroutine 數(shù)量也是受限的。默認(rèn)每個(gè) goroutine 占用 8KB 內(nèi)存,一臺(tái) 8GB 內(nèi)存的機(jī)器滿打滿算也只能創(chuàng)建 8GB/8KB = 1000000 個(gè) goroutine,更何況系統(tǒng)還需要保留一部分內(nèi)存運(yùn)行日常管理任務(wù),go 運(yùn)行時(shí)需要內(nèi)存運(yùn)行 gc、處理 goroutine 切換等。使用的內(nèi)存超過(guò)機(jī)器內(nèi)存容量,系統(tǒng)會(huì)使用交換區(qū)(swap),導(dǎo)致性能急速下降。我們可以簡(jiǎn)單驗(yàn)證一下創(chuàng)建過(guò)多 goroutine 會(huì)發(fā)生什么:
func main() {
var wg sync.WaitGroup
wg.Add(10000000)
for i := 0; i < 10000000; i++ {
go func() {
time.Sleep(1 * time.Minute)
}()
}
wg.Wait()
}
在我的機(jī)器上(8G內(nèi)存)運(yùn)行上面的程序會(huì)報(bào)errno 1455,即Out of Memory錯(cuò)誤,這很好理解。謹(jǐn)慎運(yùn)行。
另一方面,goroutine 的管理也是一個(gè)問(wèn)題。goroutine 只能自己運(yùn)行結(jié)束,外部沒(méi)有任何手段可以強(qiáng)制j結(jié)束一個(gè) goroutine。如果一個(gè) goroutine 因?yàn)槟撤N原因沒(méi)有自行結(jié)束,就會(huì)出現(xiàn) goroutine 泄露。此外,頻繁創(chuàng)建 goroutine 也是一個(gè)開(kāi)銷。
鑒于上述原因,自然出現(xiàn)了與線程池一樣的需求,即 goroutine 池。一般的 goroutine 池自動(dòng)管理 goroutine 的生命周期,可以按需創(chuàng)建,動(dòng)態(tài)縮容。向 goroutine 池提交一個(gè)任務(wù),goroutine 池會(huì)自動(dòng)安排某個(gè) goroutine 來(lái)處理。
ants就是其中一個(gè)實(shí)現(xiàn) goroutine 池的庫(kù)。
快速使用
本文代碼使用 Go Modules。
創(chuàng)建目錄并初始化:
$ mkdir ants && cd ants
$ go mod init github.com/darjun/go-daily-lib/ants
安裝ants庫(kù),使用v2版本:
$ go get -u github.com/panjf2000/ants/v2
我們接下來(lái)要實(shí)現(xiàn)一個(gè)計(jì)算大量整數(shù)和的程序。首先創(chuàng)建基礎(chǔ)的任務(wù)結(jié)構(gòu),并實(shí)現(xiàn)其執(zhí)行任務(wù)方法:
type Task struct {
index int
nums []int
sum int
wg *sync.WaitGroup
}
func (t *Task) Do() {
for _, num := range t.nums {
t.sum += num
}
t.wg.Done()
}
很簡(jiǎn)單,就是將一個(gè)切片中的所有整數(shù)相加。
然后我們創(chuàng)建 goroutine 池,注意池使用完后需要手動(dòng)關(guān)閉,這里使用defer關(guān)閉:
p, _ := ants.NewPoolWithFunc(10, taskFunc)
defer p.Release()
func taskFunc(data interface{}) {
task := data.(*Task)
task.Do()
fmt.Printf("task:%d sum:%d\n", task.index, task.sum)
}
上面調(diào)用了ants.NewPoolWithFunc()創(chuàng)建了一個(gè) goroutine 池。第一個(gè)參數(shù)是池容量,即池中最多有 10 個(gè) goroutine。第二個(gè)參數(shù)為每次執(zhí)行任務(wù)的函數(shù)。當(dāng)我們調(diào)用p.Invoke(data)的時(shí)候,ants池會(huì)在其管理的 goroutine 中找出一個(gè)空閑的,讓它執(zhí)行函數(shù)taskFunc,并將data作為參數(shù)。
接著,我們模擬數(shù)據(jù),做數(shù)據(jù)切分,生成任務(wù),交給 ants 處理:
const (
DataSize = 10000
DataPerTask = 100
)
nums := make([]int, DataSize, DataSize)
for i := range nums {
nums[i] = rand.Intn(1000)
}
var wg sync.WaitGroup
wg.Add(DataSize / DataPerTask)
tasks := make([]*Task, 0, DataSize/DataPerTask)
for i := 0; i < DataSize/DataPerTask; i++ {
task := &Task{
index: i + 1,
nums: nums[i*DataPerTask : (i+1)*DataPerTask],
wg: &wg,
}
tasks = append(tasks, task)
p.Invoke(task)
}
wg.Wait()
fmt.Printf("running goroutines: %d\n", ants.Running())
隨機(jī)生成 10000 個(gè)整數(shù),將這些整數(shù)分為 100 份,每份 100 個(gè),生成Task結(jié)構(gòu),調(diào)用p.Invoke(task)處理。wg.Wait()等待處理完成,然后輸出ants正在運(yùn)行的 goroutine 數(shù)量,這時(shí)應(yīng)該是 0。
最后我們將結(jié)果匯總,并驗(yàn)證一下結(jié)果,與直接相加得到的結(jié)果做一個(gè)比較:
var sum int
for _, task := range tasks {
sum += task.sum
}
var expect int
for _, num := range nums {
expect += num
}
fmt.Printf("finish all tasks, result is %d expect:%d\n", sum, expect)
運(yùn)行:
$ go run main.go
...
task:96 sum:53275
task:88 sum:50090
task:62 sum:57114
task:45 sum:48041
task:82 sum:45269
running goroutines: 0
finish all tasks, result is 5010172 expect:5010172
確實(shí),任務(wù)完成之后,正在運(yùn)行的 goroutine 數(shù)量變?yōu)?0。而且我們驗(yàn)證了,結(jié)果沒(méi)有偏差。另外需要注意,goroutine 池中任務(wù)的執(zhí)行順序是隨機(jī)的,與提交任務(wù)的先后沒(méi)有關(guān)系。由上面運(yùn)行打印的任務(wù)標(biāo)識(shí)我們也能發(fā)現(xiàn)這一點(diǎn)。
函數(shù)作為任務(wù)
ants支持將一個(gè)不接受任何參數(shù)的函數(shù)作為任務(wù)提交給 goroutine 運(yùn)行。由于不接受參數(shù),我們提交的函數(shù)要么不需要外部數(shù)據(jù),只需要處理自身邏輯,否則就必須用某種方式將需要的數(shù)據(jù)傳遞進(jìn)去,例如閉包。
提交函數(shù)作為任務(wù)的 goroutine 池使用ants.NewPool()創(chuàng)建,它只接受一個(gè)參數(shù)表示池子的容量。調(diào)用池子對(duì)象的Submit()方法來(lái)提交任務(wù),將一個(gè)不接受任何參數(shù)的函數(shù)傳入。
最開(kāi)始的例子可以改寫(xiě)一下。增加一個(gè)任務(wù)包裝函數(shù),將任務(wù)需要的參數(shù)作為包裝函數(shù)的參數(shù)。包裝函數(shù)返回實(shí)際的任務(wù)函數(shù),該任務(wù)函數(shù)就可以通過(guò)閉包訪問(wèn)它需要的數(shù)據(jù)了:
type taskFunc func()
func taskFuncWrapper(nums []int, i int, sum *int, wg *sync.WaitGroup) taskFunc {
return func() {
for _, num := range nums[i*DataPerTask : (i+1)*DataPerTask] {
*sum += num
}
fmt.Printf("task:%d sum:%d\n", i+1, *sum)
wg.Done()
}
}
調(diào)用ants.NewPool(10)創(chuàng)建 goroutine 池,同樣池子用完需要釋放,這里使用defer:
p, _ := ants.NewPool(10)
defer p.Release()
生成模擬數(shù)據(jù),切分任務(wù)。提交任務(wù)給ants池執(zhí)行,這里使用taskFuncWrapper()包裝函數(shù)生成具體的任務(wù),然后調(diào)用p.Submit()提交:
nums := make([]int, DataSize, DataSize)
for i := range nums {
nums[i] = rand.Intn(1000)
}
var wg sync.WaitGroup
wg.Add(DataSize / DataPerTask)
partSums := make([]int, DataSize/DataPerTask, DataSize/DataPerTask)
for i := 0; i < DataSize/DataPerTask; i++ {
p.Submit(taskFuncWrapper(nums, i, &partSums[i], &wg))
}
wg.Wait()
匯總結(jié)果,驗(yàn)證:
var sum int
for _, partSum := range partSums {
sum += partSum
}
var expect int
for _, num := range nums {
expect += num
}
fmt.Printf("running goroutines: %d\n", ants.Running())
fmt.Printf("finish all tasks, result is %d expect is %d\n", sum, expect)
這個(gè)程序的功能與最開(kāi)始的完全相同。
執(zhí)行流程
GitHub 倉(cāng)庫(kù)中有個(gè)執(zhí)行流程圖,我重新繪制了一下:

執(zhí)行流程如下:
初始化 goroutine 池; 提交任務(wù)給 goroutine 池,檢查是否有空閑的 goroutine: 已到上限,檢查 goroutine 池是否是非阻塞的: 未到上限,創(chuàng)建一個(gè)新的 goroutine 處理任務(wù) 非阻塞,直接返回 nil表示執(zhí)行失敗阻塞,等待 goroutine 空閑 有,獲取空閑 goroutine 無(wú),檢查池中的 goroutine 數(shù)量是否已到池容量上限: 任務(wù)處理完成,將 goroutine 交還給池,以待處理下一個(gè)任務(wù)
選項(xiàng)
ants提供了一些選項(xiàng)可以定制 goroutine 池的行為。選項(xiàng)使用Options結(jié)構(gòu)定義:
// src/github.com/panjf2000/ants/options.go
type Options struct {
ExpiryDuration time.Duration
PreAlloc bool
MaxBlockingTasks int
Nonblocking bool
PanicHandler func(interface{})
Logger Logger
}
各個(gè)選項(xiàng)含義如下:
ExpiryDuration:過(guò)期時(shí)間。表示 goroutine 空閑多長(zhǎng)時(shí)間之后會(huì)被ants池回收PreAlloc:預(yù)分配。調(diào)用NewPool()/NewPoolWithFunc()之后預(yù)分配worker(管理一個(gè)工作 goroutine 的結(jié)構(gòu)體)切片。而且使用預(yù)分配與否會(huì)直接影響池中管理worker的結(jié)構(gòu)。見(jiàn)下面源碼MaxBlockingTasks:最大阻塞任務(wù)數(shù)量。即池中 goroutine 數(shù)量已到池容量,且所有 goroutine 都處理繁忙狀態(tài),這時(shí)到來(lái)的任務(wù)會(huì)在阻塞列表等待。這個(gè)選項(xiàng)設(shè)置的是列表的最大長(zhǎng)度。阻塞的任務(wù)數(shù)量達(dá)到這個(gè)值后,后續(xù)任務(wù)提交直接返回失敗Nonblocking:池是否阻塞,默認(rèn)阻塞。提交任務(wù)時(shí),如果ants池中 goroutine 已到上限且全部繁忙,阻塞的池會(huì)將任務(wù)添加的阻塞列表等待(當(dāng)然受限于阻塞列表長(zhǎng)度,見(jiàn)上一個(gè)選項(xiàng))。非阻塞的池直接返回失敗PanicHandler:panic 處理。遇到 panic 會(huì)調(diào)用這里設(shè)置的處理函數(shù)Logger:指定日志記錄器
NewPool()部分源碼:
if p.options.PreAlloc {
if size == -1 {
return nil, ErrInvalidPreAllocSize
}
p.workers = newWorkerArray(loopQueueType, size)
} else {
p.workers = newWorkerArray(stackType, 0)
}
使用預(yù)分配時(shí),創(chuàng)建loopQueueType類型的結(jié)構(gòu),反之創(chuàng)建stackType類型。這是ants定義的兩種管理worker的數(shù)據(jù)結(jié)構(gòu)。
ants定義了一些With*函數(shù)來(lái)設(shè)置這些選項(xiàng):
func WithOptions(options Options) Option {
return func(opts *Options) {
*opts = options
}
}
func WithExpiryDuration(expiryDuration time.Duration) Option {
return func(opts *Options) {
opts.ExpiryDuration = expiryDuration
}
}
func WithPreAlloc(preAlloc bool) Option {
return func(opts *Options) {
opts.PreAlloc = preAlloc
}
}
func WithMaxBlockingTasks(maxBlockingTasks int) Option {
return func(opts *Options) {
opts.MaxBlockingTasks = maxBlockingTasks
}
}
func WithNonblocking(nonblocking bool) Option {
return func(opts *Options) {
opts.Nonblocking = nonblocking
}
}
func WithPanicHandler(panicHandler func(interface{})) Option {
return func(opts *Options) {
opts.PanicHandler = panicHandler
}
}
func WithLogger(logger Logger) Option {
return func(opts *Options) {
opts.Logger = logger
}
}
這里使用了 Go 語(yǔ)言中非常常見(jiàn)的一種模式,我稱之為選項(xiàng)模式,非常方便地構(gòu)造有大量參數(shù),且大部分有默認(rèn)值或一般不需要顯式設(shè)置的對(duì)象。
我們來(lái)驗(yàn)證幾個(gè)選項(xiàng)。
最大等待隊(duì)列長(zhǎng)度
ants池設(shè)置容量之后,如果所有的 goroutine 都在處理任務(wù)。這時(shí)提交的任務(wù)默認(rèn)會(huì)進(jìn)入等待隊(duì)列,WithMaxBlockingTasks(maxBlockingTasks int)可以設(shè)置等待隊(duì)列的最大長(zhǎng)度。超過(guò)這個(gè)長(zhǎng)度,提交任務(wù)直接返回錯(cuò)誤:
func wrapper(i int, wg *sync.WaitGroup) func() {
return func() {
fmt.Printf("hello from task:%d\n", i)
time.Sleep(1 * time.Second)
wg.Done()
}
}
func main() {
p, _ := ants.NewPool(4, ants.WithMaxBlockingTasks(2))
defer p.Release()
var wg sync.WaitGroup
wg.Add(8)
for i := 1; i <= 8; i++ {
go func(i int) {
err := p.Submit(wrapper(i, &wg))
if err != nil {
fmt.Printf("task:%d err:%v\n", i, err)
wg.Done()
}
}(i)
}
wg.Wait()
}
上面代碼中,我們?cè)O(shè)置 goroutine 池的容量為 4,最大阻塞隊(duì)列長(zhǎng)度為 2。然后一個(gè) for 提交 8 個(gè)任務(wù),期望結(jié)果是:4 個(gè)任務(wù)在執(zhí)行,2 個(gè)任務(wù)在等待,2 個(gè)任務(wù)提交失敗。運(yùn)行結(jié)果:
hello from task:8
hello from task:5
hello from task:4
hello from task:6
task:7 err:too many goroutines blocked on submit or Nonblocking is set
task:3 err:too many goroutines blocked on submit or Nonblocking is set
hello from task:1
hello from task:2
我們看到提交任務(wù)失敗,打印too many goroutines blocked ...。
代碼中有 4 點(diǎn)需要注意:
提交任務(wù)必須并行進(jìn)行。如果是串行提交,第 5 個(gè)任務(wù)提交時(shí)由于池中沒(méi)有空閑的 goroutine 處理該任務(wù), Submit()方法會(huì)被阻塞,后續(xù)任務(wù)就都不能提交了。也就達(dá)不到驗(yàn)證的目的了由于任務(wù)可能提交失敗,失敗的任務(wù)不會(huì)實(shí)際執(zhí)行,所以實(shí)際上 wg.Done()次數(shù)會(huì)小于 8。因而在err != nil分支中我們需要調(diào)用一次wg.Done()。否則wg.Wait()會(huì)永遠(yuǎn)阻塞為了避免任務(wù)執(zhí)行過(guò)快,空出了 goroutine,觀察不到現(xiàn)象,每個(gè)任務(wù)中我使用 time.Sleep(1 * time.Second)休眠 1s由于 goroutine 之間的執(zhí)行順序未顯式同步,故每次執(zhí)行的順序不確定
由于簡(jiǎn)單起見(jiàn),前面的例子中Submit()方法的返回值都被我們忽略了。實(shí)際開(kāi)發(fā)中一定不要忽略。
非阻塞
ants池默認(rèn)是阻塞的,我們可以使用WithNonblocking(nonblocking bool)設(shè)置其為非阻塞。非阻塞的ants池中,在所有 goroutine 都在處理任務(wù)時(shí),提交新任務(wù)會(huì)直接返回錯(cuò)誤:
func main() {
p, _ := ants.NewPool(2, ants.WithNonblocking(true))
defer p.Release()
var wg sync.WaitGroup
wg.Add(3)
for i := 1; i <= 3; i++ {
err := p.Submit(wrapper(i, &wg))
if err != nil {
fmt.Printf("task:%d err:%v\n", i, err)
wg.Done()
}
}
wg.Wait()
}
使用上個(gè)例子中的wrapper()函數(shù),ants池容量設(shè)置為 2。連續(xù)提交 3 個(gè)任務(wù),期望結(jié)果前兩個(gè)任務(wù)正常執(zhí)行,第 3 個(gè)任務(wù)提交時(shí)返回錯(cuò)誤:
hello from task:2
task:3 err:too many goroutines blocked on submit or Nonblocking is set
hello from task:1
panic 處理器
一個(gè)魯棒性強(qiáng)的庫(kù)一定不會(huì)忽視錯(cuò)誤的處理,特別是宕機(jī)相關(guān)的錯(cuò)誤。在 Go 語(yǔ)言中就是 panic,也被稱為運(yùn)行時(shí)恐慌,在程序運(yùn)行的過(guò)程中產(chǎn)生的嚴(yán)重性錯(cuò)誤,例如索引越界,空指針解引用等,都會(huì)觸發(fā) panic。如果不處理 panic,程序會(huì)直接意外退出,可能造成數(shù)據(jù)丟失的嚴(yán)重后果。
ants中如果 goroutine 在執(zhí)行任務(wù)時(shí)發(fā)生panic,會(huì)終止當(dāng)前任務(wù)的執(zhí)行,將發(fā)生錯(cuò)誤的堆棧輸出到os.Stderr。注意,該 goroutine 還是會(huì)被放回池中,下次可以取出執(zhí)行新的任務(wù)。
func wrapper(i int, wg *sync.WaitGroup) func() {
return func() {
fmt.Printf("hello from task:%d\n", i)
if i%2 == 0 {
panic(fmt.Sprintf("panic from task:%d", i))
}
wg.Done()
}
}
func main() {
p, _ := ants.NewPool(2)
defer p.Release()
var wg sync.WaitGroup
wg.Add(3)
for i := 1; i <= 2; i++ {
p.Submit(wrapper(i, &wg))
}
time.Sleep(1 * time.Second)
p.Submit(wrapper(3, &wg))
p.Submit(wrapper(5, &wg))
wg.Wait()
}
我們讓偶數(shù)個(gè)任務(wù)觸發(fā)panic。提交兩個(gè)任務(wù),第二個(gè)任務(wù)一定會(huì)觸發(fā)panic。觸發(fā)panic之后,我們還可以繼續(xù)提交任務(wù) 3、5。注意這里沒(méi)有 4,提交任務(wù) 4 還是會(huì)觸發(fā)panic。
上面的程序需要注意 2 點(diǎn):
任務(wù)函數(shù)中 wg.Done()是在panic方法之后,如果觸發(fā)了panic,函數(shù)中的其他正常邏輯就不會(huì)再繼續(xù)執(zhí)行了。所以我們雖然wg.Add(3),但是一共提交了 4 個(gè)任務(wù),其中一個(gè)任務(wù)觸發(fā)了panic,wg.Done()沒(méi)有正確執(zhí)行。實(shí)際開(kāi)發(fā)中,我們一般使用defer語(yǔ)句來(lái)確保wg.Done()一定會(huì)執(zhí)行在 for 循環(huán)之后,我添加了一行代碼 time.Sleep(1 * time.Second)。如果沒(méi)有這一行,后續(xù)的兩條Submit()方法可以直接執(zhí)行,可能會(huì)導(dǎo)致任務(wù)很快就完成了,wg.Wait()直接返回了,這時(shí)panic的堆棧還沒(méi)有輸出。你可以嘗試注釋掉這行代碼運(yùn)行看看結(jié)果
除了ants提供的默認(rèn) panic 處理器,我們還可以使用WithPanicHandler(paincHandler func(interface{}))指定我們自己編寫(xiě)的 panic 處理器。處理器的參數(shù)就是傳給panic的值:
func panicHandler(err interface{}) {
fmt.Fprintln(os.Stderr, err)
}
p, _ := ants.NewPool(2, ants.WithPanicHandler(panicHandler))
defer p.Release()
其余代碼與上面的完全相同,指定了panicHandler后觸發(fā)panic就會(huì)執(zhí)行它。運(yùn)行:
hello from task:2
panic from task:2
hello from task:1
hello from task:5
hello from task:3
看到輸出了傳給panic函數(shù)的字符串(第二行輸出)。
默認(rèn)池
為了方便使用,很多 Go 庫(kù)都喜歡提供其核心功能類型的一個(gè)默認(rèn)實(shí)現(xiàn)。可以直接通過(guò)庫(kù)提供的接口調(diào)用。例如net/http,例如ants。ants庫(kù)中定義了一個(gè)默認(rèn)的池,默認(rèn)容量為MaxInt32。goroutine 池的各個(gè)方法都可以直接通過(guò)ants包直接訪問(wèn):
// src/github.com/panjf2000/ants/ants.go
defaultAntsPool, _ = NewPool(DefaultAntsPoolSize)
func Submit(task func()) error {
return defaultAntsPool.Submit(task)
}
func Running() int {
return defaultAntsPool.Running()
}
func Cap() int {
return defaultAntsPool.Cap()
}
func Free() int {
return defaultAntsPool.Free()
}
func Release() {
defaultAntsPool.Release()
}
func Reboot() {
defaultAntsPool.Reboot()
}
直接使用:
func main() {
defer ants.Release()
var wg sync.WaitGroup
wg.Add(2)
for i := 1; i <= 2; i++ {
ants.Submit(wrapper(i, &wg))
}
wg.Wait()
}
默認(rèn)池也需要Release()。
總結(jié)
本文介紹了 goroutine 池的由來(lái),并借由ants庫(kù)介紹了基本的使用方法,和一些細(xì)節(jié)。ants源碼不多,去掉測(cè)試的核心代碼只有 1k 行左右,建議有時(shí)間、感興趣的童鞋深入閱讀。
大家如果發(fā)現(xiàn)好玩、好用的 Go 語(yǔ)言庫(kù),歡迎到 Go 每日一庫(kù) GitHub 上提交 issue??
參考
ants GitHub:github.com/panjf2000/ants Go 每日一庫(kù) GitHub:https://github.com/darjun/go-daily-lib
推薦閱讀
