作為 Gopher 你一定要懂的連接池
問題引入
作為一名Golang開發(fā)者,線上環(huán)境遇到過好幾次連接數暴增問題(mysql/redis/kafka等)。
糾其原因,Golang作為常駐進程,請求第三方服務或者資源完畢后,需要手動關閉連接,否則連接會一直存在。而很多時候,開發(fā)者不一定記得關閉這個連接。
這樣是不是很麻煩?于是有了連接池。顧名思義,連接池就是管理連接的;我們從連接池獲取連接,請求完畢后再將連接還給連接池;連接池幫我們做了連接的建立、復用以及回收工作。
在設計與實現連接池時,我們通常需要考慮以下幾個問題:
連接池的連接數目是否有限制,最大可以建立多少個連接? 當連接長時間沒有使用,需要回收該連接嗎? 業(yè)務請求需要獲取連接時,此時若連接池無空閑連接且無法新建連接,業(yè)務需要排隊等待嗎? 排隊的話又存在另外的問題,隊列長度有無限制,排隊時間呢?
Golang連接池實現原理
我們以Golang HTTP連接池為例,分析連接池的實現原理。
結構體Transport
Transport結構定義如下:
type?Transport?struct?{
????//操作空閑連接需要獲取鎖
????idleMu???????sync.Mutex
????//空閑連接池,key為協議目標地址等組合
????idleConn?????map[connectMethodKey][]*persistConn?//?most?recently?used?at?end
????//等待空閑連接的隊列,基于切片實現,隊列大小無限制
????idleConnWait?map[connectMethodKey]wantConnQueue??//?waiting?getConns
????
????//排隊等待建立連接需要獲取鎖
????connsPerHostMu???sync.Mutex
????//每個host建立的連接數
????connsPerHost?????map[connectMethodKey]int
????//等待建立連接的隊列,同樣基于切片實現,隊列大小無限制
????connsPerHostWait?map[connectMethodKey]wantConnQueue?//?waiting?getConns
????
????//最大空閑連接數
????MaxIdleConns?int
????//每個目標host最大空閑連接數;默認為2(注意默認值)
????MaxIdleConnsPerHost?int
????//每個host可建立的最大連接數
????MaxConnsPerHost?int
????//連接多少時間沒有使用則被關閉
????IdleConnTimeout?time.Duration
????
????//禁用長連接,使用短連接
????DisableKeepAlives?bool
}
可以看到,連接護著隊列,都是一個map結構,而key為協議目標地址等組合,即同一種協議與同一個目標host可建立的連接或者空閑連接是有限制的。
需要特別注意的是,MaxIdleConnsPerHost默認等于2,即與目標主機最多只維護兩個空閑連接。這會導致什么呢?
如果遇到突發(fā)流量,瞬間建立大量連接,但是回收連接時,由于最大空閑連接數的限制,該聯機不能進入空閑連接池,只能直接關閉。結果是,一直新建大量連接,又關閉大量連,業(yè)務機器的TIME_WAIT連接數隨之突增。
線上有些業(yè)務架構是這樣的:客戶端 ===> LVS ===> Nginx ===> 服務。LVS負載均衡方案采用DR模式,LVS與Nginx配置統一VIP。此時在客戶端看來,只有一個IP地址,只有一個Host。上述問題更為明顯。
最后,Transport也提供了配置DisableKeepAlives,禁用長連接,使用短連接訪問第三方資源或者服務。
連接獲取與回收
Transport結構提供下面兩個方法實現連接的獲取與回收操作。
func?(t?*Transport)?getConn(treq?*transportRequest,?cm?connectMethod)?(pc?*persistConn,?err?error)?{}
func?(t?*Transport)?tryPutIdleConn(pconn?*persistConn)?error?{}
連接的獲取主要分為兩步走:1)嘗試獲取空閑連接;2)嘗試新建連接:
//getConn方法內部實現
if?delivered?:=?t.queueForIdleConn(w);?delivered?{
????return?pc,?nil
}
????
t.queueForDial(w)
當然,可能獲取不到連接而需要排隊,此時怎么辦呢?當前會阻塞當前協程了,直到獲取連接為止,或者httpclient超時取消請求:
select?{
????case?<-w.ready:
????????return?w.pc,?w.err
????????
????//超時被取消
????case?<-req.Cancel:
????????return?nil,?errRequestCanceledConn
????……
}
var?errRequestCanceledConn?=?errors.New("net/http:?request?canceled?while?waiting?for?connection")?//?TODO:?unify?
排隊等待空閑連接的邏輯如下:
func?(t?*Transport)?queueForIdleConn(w?*wantConn)?(delivered?bool)?{
????//如果配置了空閑超時時間,獲取到連接需要檢測,超時則關閉連接
????if?t.IdleConnTimeout?>?0?{
????????oldTime?=?time.Now().Add(-t.IdleConnTimeout)
????}
????
????if?list,?ok?:=?t.idleConn[w.key];?ok?{
????????for?len(list)?>?0?&&?!stop?{
????????????pconn?:=?list[len(list)-1]
????????????tooOld?:=?!oldTime.IsZero()?&&?pconn.idleAt.Round(0).Before(oldTime)
????????????//超時了,關閉連接
????????????if?tooOld?{
????????????????go?pconn.closeConnIfStillIdle()
????????????}
????????????
????????????//分發(fā)連接到wantConn
????????????delivered?=?w.tryDeliver(pconn,?nil)
????????}
????}
????
????//排隊等待空閑連接
????q?:=?t.idleConnWait[w.key]
????q.pushBack(w)
????t.idleConnWait[w.key]?=?q
}
排隊等待新建連接的邏輯如下:
func?(t?*Transport)?queueForDial(w?*wantConn)?{
????//如果沒有限制最大連接數,直接建立連接
????if?t.MaxConnsPerHost?<=?0?{
????????go?t.dialConnFor(w)
????????return
????}
????
????//如果沒超過連接數限制,直接建立連接
????if?n?:=?t.connsPerHost[w.key];?n?????????go?t.dialConnFor(w)
????????return
????}
????
????//排隊等待連接建立
????q?:=?t.connsPerHostWait[w.key]
????q.pushBack(w)
????t.connsPerHostWait[w.key]?=?q
}
連接建立完成后,同樣會調用tryDeliver分發(fā)連接到wantConn,同時關閉通道w.ready,這樣主協程糾接觸阻塞了。
func?(w?*wantConn)?tryDeliver(pc?*persistConn,?err?error)?bool?{
????w.pc?=?pc
????close(w.ready)
}
請求處理完成后,通過tryPutIdleConn將連接放回連接池;這時候如果存在等待空閑連接的協程,則需要分發(fā)復用該連接。另外,在回收連接時,還需要校驗空閑連接數目是否超過限制:
func?(t?*Transport)?tryPutIdleConn(pconn?*persistConn)?error?{
????//禁用長連接;或者最大空閑連接數不合法
????if?t.DisableKeepAlives?||?t.MaxIdleConnsPerHost?0?{
????????return?errKeepAlivesDisabled
????}
????
????if?q,?ok?:=?t.idleConnWait[key];?ok?{
????????//如果等待隊列不為空,分發(fā)連接
????????for?q.len()?>?0?{
????????????w?:=?q.popFront()
????????????if?w.tryDeliver(pconn,?nil)?{
????????????????done?=?true
????????????????break
????????????}
????????}
????}
????
????//空閑連接數目超過限制,默認為DefaultMaxIdleConnsPerHost=2
????idles?:=?t.idleConn[key]
????if?len(idles)?>=?t.maxIdleConnsPerHost()?{
????????return?errTooManyIdleHost
????}
}
空閑連接超時關閉
Golang HTTP連接池如何實現空閑連接的超時關閉邏輯呢?從上述queueForIdleConn邏輯可以看到,每次在獲取到空閑連接時,都會檢測是否已經超時,超時則關閉連接。
那如果沒有業(yè)務請求到達,一直不需要獲取連接,空閑連接就不會超時關閉嗎?其實在將空閑連接添加到連接池時,Golang同時還設置了定時器,定時器到期后,自然會關閉該連接。
pconn.idleTimer?=?time.AfterFunc(t.IdleConnTimeout,?pconn.closeConnIfStillIdle)
排隊隊列怎么實現
怎么實現隊列模型呢?很簡單,可以基于切片:
queue????[]*wantConn
//入隊
queue?=?append(queue,?w)
//出隊
v?:=?queue[0]
queue[0]?=?nil
queue?=?queue[1:]
這樣有什么問題嗎?隨著頻繁的入隊與出隊操作,切片queue的底層數組,會有大量空間無法復用而造成浪費。除非該切片執(zhí)行了擴容操作。
Golang在實現隊列時,使用了兩個切片head和tail;head切片用于出隊操作,tail切片用于入隊操作;出隊時,如果head切片為空,則交換head與tail。通過這種方式,Golang實現了底層數組空間的復用。
func?(q?*wantConnQueue)?pushBack(w?*wantConn)?{
????q.tail?=?append(q.tail,?w)
}
func?(q?*wantConnQueue)?popFront()?*wantConn?{
????if?q.headPos?>=?len(q.head)?{
????????if?len(q.tail)?==?0?{
????????????return?nil
????????}
????????//?Pick?up?tail?as?new?head,?clear?tail.
????????q.head,?q.headPos,?q.tail?=?q.tail,?0,?q.head[:0]
????}
????w?:=?q.head[q.headPos]
????q.head[q.headPos]?=?nil
????q.headPos++
????return?w
}
本文作者:源代碼
原文鏈接:https://segmentfault.com/a/1190000023676010
推薦閱讀
站長 polarisxu
自己的原創(chuàng)文章
不限于 Go 技術
職場和創(chuàng)業(yè)經驗
Go語言中文網
每天為你
分享 Go 知識
Go愛好者值得關注
