在 net/http 標準庫做了點手腳
Golang標準庫搭建的http服務端會為每一個請求創(chuàng)建一個協程去處理,雖然每個協程占有的棧空間很小,但是如果萬一來個數百萬千萬的請求(當然,這種可能性有點極端),服務端只能對每一條請求乖乖創(chuàng)建一個協程,這時候,該go進程就存在大量的goroutine,占用服務器資源不說,還會增大gc壓力。這時候就想給該機制加一個限制,搞一個協程池限制一下最大處理請求的協程數量。
瀏覽一下標準庫該部分的源碼實現
func?(srv *Server)?Serve(l net.Listener)?error?{
??.....
??ctx := context.WithValue(baseCtx, ServerContextKey, srv)
??for?{? ? //等待建立連接,沒有請求則會阻塞住
????rw, err := l.Accept()
????if?err !=?nil?{......}
????connCtx := ctx
????if?cc := srv.ConnContext; cc !=?nil?{......}
? ? ......? ??//開啟協程處理請求,主要需要改造的就在此處????go?c.serve(connCtx)
??}
}
主要需要針對創(chuàng)建協程加一個限制條件,如果小于協程池規(guī)定的數量就允許創(chuàng)建,否則等待協程池有空閑位再創(chuàng)建。
01
大致思路
總體使用生產者消費者模式。使用兩個有緩沖區(qū)的channel來實現協程的并發(fā)控制,一個sigChannel通過緩沖空間限制最大的協程數量,另一個jobChannel則用于傳遞請求的數據(包括請求函數以及參數),該jobChannel對于是否緩沖沒有要求。
流程
(1)首先當請求到來之后,往sigChannel中寫入標志位數據,如果此時有空閑位置,則不會阻塞在此處;
(2)之后往jobChannel中寫入要執(zhí)行的函數以及參數;
(3)后臺監(jiān)聽jobChannel的函數worker(該函數要源源不斷讀取管道數據)則會取出管道中的數據;
(4)worker創(chuàng)建goroutine執(zhí)行請求函數;
(5)該請求函數執(zhí)行完成后,goroutine再去取出sigChannel管道中的標志數據,騰出來位置;
注:如果開始時候sigChannel寫數據寫入不了,則說明該池子滿了,則需要阻塞等待。這樣就實現了使用sigChannel控制并發(fā)量的功能。

02
代碼實現
接下來使用代碼實現這種思想
1、首先把net/http包中的代碼給保存一份,防止被搞壞。直接在目錄下搞了一個git倉庫,先把源碼commit一次,再搞一個分支自己瞎搞著玩,。在net/http下建了一個放協程池函數的文件夾,創(chuàng)建一個go文件。
2、首先定義兩個channel,一個用來存放信號,一個存放函數以及參數,結合到http處理這里
type?Info?struct?{? //函數名稱,對應http中c.serve()函數
??ParamFunc?func(ctx context.Context)? //函數的參數,對應c.serve()的connCtx參數
??Param?context.Context
}
type?Task?struct?{? //用于傳遞函數以及參數的管道,對應jobChannel
??taskLet?chan?Info? //用于傳遞信號量的管道
??taskCmp?chan?int64
}
type?Pool?struct?{
??//兩個管道對應的結構體
??tasks *Task?
??//協程池容量
??taskNum?int64?
}
3、創(chuàng)建一個協程池對象,也就是初始化這兩個管道
func?NewPool(n?int64)?*Pool?{
??taskc :=?make(chan?Info,n)
??workc :=?make(chan?int64,n)
??return?&Pool{
????tasks: &Task{
??????taskLet: taskc,
??????taskCmp: workc,
????},
????taskNum: n,
??}
}4、創(chuàng)建一個put函數,用于往兩個channel中塞數據,即生產者
func?(p *Pool)?Put(a Info)??{
? //在sigChannel中塞數據,如果阻塞說明沒有空閑? p.tasks.taskCmp <-?1??//在jobChannel中塞數據??? p.tasks.taskLet <- a
}
5、創(chuàng)建一個run函數,用于監(jiān)聽管道并取出數據,即消費者
func?(p *Pool)?Run()??{
?//持續(xù)監(jiān)聽jobChannel管道,只要有數據監(jiān)聽到則說明已經有空閑位了,?//需要創(chuàng)建goroutine執(zhí)行傳來的函數以及參數
??for?{
????select?{
????case?let := <- p.tasks.taskLet:
??????go?p.Work(let)
????}
??}
}
func?(p *Pool)?Work(f Info)??{
??//執(zhí)行傳入的函數? f.ParamFunc(f.Param)??//執(zhí)行完函數后把sigChannel中標志位取出??<- p.tasks.taskCmp
}
6、修改源碼,需要修改的代碼加到server.go中
func?(srv *Server)?Serve(l net.Listener)?error?{
??.....
??//初始化一個連接池
? po := currencyctl.NewPool(srv.CorrencyNum)
??//異步開啟這個池子,否則會阻塞? go po.Run()
??ctx := context.WithValue(baseCtx, ServerContextKey, srv)
??for?{
????//等待建立連接,沒有請求則會阻塞住
????rw, err := l.Accept()
????if?err !=?nil?{......}
????connCtx := ctx
????if?cc := srv.ConnContext; cc !=?nil?{......}
????......
????//go c.serve(connCtx)
? ? //改造成協程池
? ? po.Put(currencyctl.Info{ParamFunc:c.serve,Param:connCtx})
??}
}
我將處理并發(fā)數量的參數放到了server結構體中,通過http.ListenAndServe()方法傳遞并在下一次賦值。
03
測試階段
接下來跑一個測試用例:
測試代碼很簡單,如下:
package?main
import?(
??"fmt"
??"net/http"
??_"net/http/pprof"
??"time"
)
func?main()?{
??go?func()?{//使用pprof跟蹤
????http.ListenAndServe(":6060",nil,10)
??}()
??http.HandleFunc("/",?func(writer http.ResponseWriter, request *http.Request)?{
????fmt.Println("收到請求。。。")
????time.Sleep(time.Second*1)
????writer.Write([]byte("hello http"))
??})
??http.ListenAndServe(":8000",?nil,100)//限制最大并發(fā)量100
}啟動項目,做一個壓力測試(這里我是用了go-stress-testing工具):
使用并發(fā)請求量為1000時候,查看pprof工具,查看系統協程數,控制在了100左右。

設置協程池協程量為200時候,使用1000并發(fā)請求,看到協程量控制在200

經過驗證,該協程池在net/http標準庫上的應用基本成功了,但是只是測試了一個簡單的接口,沒有經過復雜的業(yè)務驗證,可能存在好多未知問題。
所以,又乖乖git checkout切到了原始分支。
