[警惕] 請(qǐng)勿濫用goroutine
前言
Go語言中,goroutine的創(chuàng)建成本很低,調(diào)度效率很高,人稱可以開幾百幾千萬個(gè)goroutine,但是真正開幾百幾千萬個(gè)goroutine就不會(huì)有任何影響嗎?本文我們就一起來看一看goroutine是否有數(shù)量限制并介紹幾種正確使用goroutine的姿勢~。
現(xiàn)狀
在Go語言中,goroutine的創(chuàng)建成本很低,調(diào)度效率高,Go語言在設(shè)計(jì)時(shí)就是按以數(shù)萬個(gè)goroutine為規(guī)范進(jìn)行設(shè)計(jì)的,數(shù)十萬個(gè)并不意外,但是goroutine在內(nèi)存占用方面確實(shí)具有有限的成本,你不能創(chuàng)造無限數(shù)量的它們,比如這個(gè)例子:
ch := generate()
go func() {
for range ch { }
}()
這段代碼通過generate()方法獲得一個(gè)channel,然后啟動(dòng)一個(gè)goroutine一直去處理這個(gè)channel的數(shù)據(jù),這個(gè)goroutine什么時(shí)候會(huì)退出?答案是不確定,ch是由函數(shù)generate()來決定的,所以有可能這個(gè)goroutine永遠(yuǎn)都不會(huì)退出,這就有可能會(huì)引發(fā)內(nèi)存泄漏。
goroutine就是G-P-M調(diào)度模型中的G,我們可以把goroutine看成是一種協(xié)程,創(chuàng)建goroutine也是有開銷的,但是開銷很小,初始只需要2-4k的??臻g,當(dāng)goroutine數(shù)量越來越大時(shí),同時(shí)存在的goroutine也越來越多時(shí),程序就隱藏內(nèi)存泄漏的問題??匆粋€(gè)例子:
func main() {
for i := 0; i < math.MaxInt64; i++ {
go func(i int) {
time.Sleep(5 * time.Second)
}(i)
}
}
大家可以在自己的電腦上運(yùn)行一下這個(gè)程序,觀察一下CPU和內(nèi)存占用情況,我說下我運(yùn)行后的現(xiàn)象:


CPU使用率瘋狂上漲 內(nèi)存占用率也不斷上漲 運(yùn)行一段時(shí)間后主進(jìn)程崩潰了。。。
因此每次在編寫GO程序時(shí),都應(yīng)該仔細(xì)考慮一個(gè)問題:
您將要啟動(dòng)的
goroutine將如何以及在什么條件下結(jié)束?
接下來我們就來介紹幾種方式可以控制goroutine和goroutine的數(shù)量。
控制goroutine的方法
Context包
Go 語言中的每一個(gè)請(qǐng)求的都是通過一個(gè)單獨(dú)的 goroutine 進(jìn)行處理的,HTTP/RPC 請(qǐng)求的處理器往往都會(huì)啟動(dòng)新的Goroutine 訪問數(shù)據(jù)庫和 RPC 服務(wù),我們可能會(huì)創(chuàng)建多個(gè)goroutine 來處理一次請(qǐng)求,而 Context 的主要作用就是在不同的 goroutine 之間同步請(qǐng)求特定的數(shù)據(jù)、取消信號(hào)以及處理請(qǐng)求的截止日期。
Context包主要衍生了四個(gè)函數(shù):
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context
使用這四個(gè)函數(shù)我們對(duì)goroutine進(jìn)行控制,具體展開就不再本文說了,我們以WithCancel方法寫一個(gè)例子:
func main() {
ctx,cancel := context.WithCancel(context.Background())
go Speak(ctx)
time.Sleep(10*time.Second)
cancel()
time.Sleep(2 * time.Second)
fmt.Println("bye bye!")
}
func Speak(ctx context.Context) {
for range time.Tick(time.Second){
select {
case <- ctx.Done():
fmt.Println("asong哥,我收到信號(hào)了,要走了,拜拜!")
return
default:
fmt.Println("asong哥,你好帥呀~balabalabalabala")
}
}
}
運(yùn)行結(jié)果:
asong哥,你好帥呀~balabalabalabala
# ....... 省略部分
asong哥,我收到信號(hào)了,要走了,拜拜!
bye bye!
這里我們使用withCancel創(chuàng)建了一個(gè)基于Background的ctx,然后啟動(dòng)了一個(gè)goroutine每隔1s夸我一句,10s后在主goroutine中發(fā)送取消新信號(hào),那么啟動(dòng)的goroutine在檢測到信號(hào)后就會(huì)取消退出。
channel
我們知道channel是用于goroutine的數(shù)據(jù)通信,在Go中通過goroutine+channel的方式,可以簡單、高效地解決并發(fā)問題。上面我們介紹了使用context來達(dá)到對(duì)goroutine的控制,實(shí)際上context的內(nèi)部實(shí)現(xiàn)也是使用的channel,所以有時(shí)候?yàn)榱藢?shí)現(xiàn)方便,我們可以直接通過channel+select或者channel+close的方式來控制goroutine的退出,我們分別來一寫一個(gè)例子:
channel+select
func fibonacci(ch chan int, done chan struct{}) {
x, y := 0, 1
for {
select {
case ch <- x:
x, y = y, x+y
case <-done:
fmt.Println("over")
return
}
}
}
func main() {
ch := make(chan int)
done := make(chan struct{})
go func() {
for i := 0; i < 10; i++ {
fmt.Println(<-ch)
}
done <- struct{}{}
}()
fibonacci(ch, done)
}
上面的例子是計(jì)算斐波那契數(shù)列的結(jié)果,我們使用兩個(gè)channel,一個(gè)channel用來傳輸數(shù)據(jù),另外一個(gè)channel用來做結(jié)束信號(hào),這里我們使用的是select的阻塞式的收發(fā)操作,直到有一個(gè)channel發(fā)生狀態(tài)改變,我們也可以在select中使用default語句,那么select語句在執(zhí)行時(shí)會(huì)遇到這兩種情況:
當(dāng)存在可以收發(fā)的 Channel時(shí),直接處理該Channel對(duì)應(yīng)的case;當(dāng)不存在可以收發(fā)的 Channel時(shí),執(zhí)行default中的語句;
建議大家使用帶default的方式,因?yàn)樵谝粋€(gè)nil channel上的操作會(huì)一直被阻塞,如果沒有default case,只有nil channel的select會(huì)一直被阻塞。
channel+close
channel可以單個(gè)出隊(duì),也可以循環(huán)出隊(duì),因?yàn)槲覀兛梢允褂?code style="font-size: 14px;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(30, 107, 184);background-color: rgba(27, 31, 35, 0.05);font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;">for-range循環(huán)處理channel,range ch會(huì)一直迭代到channel被關(guān)閉,根據(jù)這個(gè)特性,我們也可做到對(duì)goroutine的控制:
func main() {
ch := make(chan int, 10)
go func() {
for i:=0; i<10;i++{
ch <- i
}
close(ch)
}()
go func() {
for val := range ch{
fmt.Println(val)
}
fmt.Println("receive data over")
}()
time.Sleep(5* time.Second)
fmt.Println("program over")
}
如果對(duì)channel不熟悉的朋友可以看一下我之前的文章:學(xué)習(xí)channel設(shè)計(jì):從入門到放棄
控制goroutine的數(shù)量
我們可以通過以下方式達(dá)到控制goroutine數(shù)量的目的,不過本身Go的goroutine就已經(jīng)很輕量了,所以控制goroutine的數(shù)量還是要根據(jù)具體場景分析,并不是所有場景都需要控制goroutine的數(shù)量的,一般在并發(fā)場景我們會(huì)考慮控制goroutine的數(shù)量,接下來我們來看一看如下幾種方式達(dá)到控制goroutine數(shù)量的目的。
協(xié)程池
寫 go 并發(fā)程序的時(shí)候如果程序會(huì)啟動(dòng)大量的goroutine ,勢必會(huì)消耗大量的系統(tǒng)資源(內(nèi)存,CPU),所以可以考慮使用goroutine池達(dá)到復(fù)用goroutine,節(jié)省資源,提升性能。也有一些開源的協(xié)程池庫,例如:ants、go-playground/pool、jeffail/tunny等,這里我們看ants的一個(gè)官方例子:
var sum int32
func myFunc(i interface{}) {
n := i.(int32)
atomic.AddInt32(&sum, n)
fmt.Printf("run with %d\n", n)
}
func demoFunc() {
time.Sleep(10 * time.Millisecond)
fmt.Println("Hello World!")
}
func main() {
defer ants.Release()
runTimes := 1000
// Use the common pool.
var wg sync.WaitGroup
syncCalculateSum := func() {
demoFunc()
wg.Done()
}
for i := 0; i < runTimes; i++ {
wg.Add(1)
_ = ants.Submit(syncCalculateSum)
}
wg.Wait()
fmt.Printf("running goroutines: %d\n", ants.Running())
fmt.Printf("finish all tasks.\n")
// Use the pool with a function,
// set 10 to the capacity of goroutine pool and 1 second for expired duration.
p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
myFunc(i)
wg.Done()
})
defer p.Release()
// Submit tasks one by one.
for i := 0; i < runTimes; i++ {
wg.Add(1)
_ = p.Invoke(int32(i))
}
wg.Wait()
fmt.Printf("running goroutines: %d\n", p.Running())
fmt.Printf("finish all tasks, result is %d\n", sum)
}
這個(gè)例子其實(shí)就是計(jì)算大量整數(shù)和的程序,這里通過ants.NewPoolWithFunc()創(chuàng)建了一個(gè) goroutine 池。第一個(gè)參數(shù)是池容量,即池中最多有 10 個(gè)goroutine。第二個(gè)參數(shù)為每次執(zhí)行任務(wù)的函數(shù)。當(dāng)我們調(diào)用p.Invoke(data)的時(shí)候,ants池會(huì)在其管理的 goroutine 中找出一個(gè)空閑的,讓它執(zhí)行函數(shù)taskFunc,并將data作為參數(shù)。
具體這個(gè)庫的設(shè)計(jì)就不詳細(xì)展開了,后面會(huì)專門寫一篇文章來介紹如何設(shè)計(jì)一個(gè)協(xié)程池。
信號(hào)量Semaphore
Go語言的官方擴(kuò)展包為我們提供了一個(gè)基于權(quán)重的信號(hào)量Semaphore,我可以根據(jù)信號(hào)量來控制一定數(shù)量的 goroutine 并發(fā)工作,官方也給提供了一個(gè)例子:workerPool,代碼有點(diǎn)長就不在這里貼了,我們來自己寫一個(gè)稍微簡單點(diǎn)的例子:
const (
Limit = 3 // 同時(shí)運(yùn)行的goroutine上限
Weight = 1 // 信號(hào)量的權(quán)重
)
func main() {
names := []string{
"asong1",
"asong2",
"asong3",
"asong4",
"asong5",
"asong6",
"asong7",
}
sem := semaphore.NewWeighted(Limit)
var w sync.WaitGroup
for _, name := range names {
w.Add(1)
go func(name string) {
sem.Acquire(context.Background(), Weight)
fmt.Println(name)
time.Sleep(2 * time.Second) // 延時(shí)能更好的體現(xiàn)出來控制
sem.Release(Weight)
w.Done()
}(name)
}
w.Wait()
fmt.Println("over--------")
}
上面的例子我們使用 NewWeighted() 函數(shù)創(chuàng)建一個(gè)并發(fā)訪問的最大資源數(shù),也就是同時(shí)運(yùn)行的goroutine上限為3,使用Acquire函數(shù)來獲取指定個(gè)數(shù)的資源,如果當(dāng)前沒有空閑資源可用,則當(dāng)前goroutine將陷入休眠狀態(tài),最后使用release函數(shù)釋放已使用資源數(shù)量(計(jì)數(shù)器)進(jìn)行更新減少,并通知其它 waiters。
channel+waitgroup實(shí)現(xiàn)
這個(gè)方法我是在煎魚大佬的一篇文章學(xué)到的:來,控制一下Goroutine的并發(fā)數(shù)量
主要實(shí)現(xiàn)原理是利用waitGroup做并發(fā)控制,利用channel可以在goroutine之間進(jìn)行數(shù)據(jù)通信,通過限制channel的隊(duì)列長度來控制同時(shí)運(yùn)行的goroutine數(shù)量,例子如下:
func main() {
count := 9 // 要運(yùn)行的goroutine數(shù)量
limit := 3 // 同時(shí)運(yùn)行的goroutine為3個(gè)
ch := make(chan bool, limit)
wg := sync.WaitGroup{}
wg.Add(count)
for i:=0; i < count; i++{
go func(num int) {
defer wg.Done()
ch <- true // 發(fā)送信號(hào)
fmt.Printf("%d 我在干活 at time %d\n",num,time.Now().Unix())
time.Sleep(2 * time.Second)
<- ch // 接收數(shù)據(jù)代表退出了
}(i)
}
wg.Wait()
}
這種實(shí)現(xiàn)方式真的妙,與信號(hào)量的實(shí)現(xiàn)方式基本相似,某些場景大家也可以考慮使用這種方式來達(dá)到控制goroutine的目的,不過最好封裝一下,要不有點(diǎn)丑陋,感興趣的可以看一下煎魚大佬是怎么封裝的:https://github.com/eddycjy/gsema/blob/master/sema.go
總結(jié)
本文主要目的是介紹控制goroutine的幾種方式、控制goroutine數(shù)量的幾種方式,goroutine的創(chuàng)建成本低、效率高帶來了很大優(yōu)勢,同時(shí)也會(huì)有一些弊端,這就需要我們?cè)趯?shí)際開發(fā)中根據(jù)具體場景選擇正確的方式使用goroutine,本文介紹的技術(shù)方案也可能是片面的,如果你有更好的方式可以在評(píng)論區(qū)中分享出來,我們大家一起學(xué)習(xí)學(xué)習(xí)~。
文中代碼已經(jīng)上傳github,歡迎 star:https://github.com/asong2020/Golang_Dream/tree/master/code_demo/goroutine_demo
推薦閱讀
