Go 每日一庫之 bytebufferpool
簡介
在編程開發(fā)中,我們經(jīng)常會需要頻繁創(chuàng)建和銷毀同類對象的情形。這樣的操作很可能會對性能造成影響。這時(shí),常用的優(yōu)化手段就是使用對象池(object pool)。需要?jiǎng)?chuàng)建對象時(shí),我們先從對象池中查找。如果有空閑對象,則從池中移除這個(gè)對象并將其返回給調(diào)用者使用。只有在池中無空閑對象時(shí),才會真正創(chuàng)建一個(gè)新對象。另一方面,對象使用完之后,我們并不進(jìn)行銷毀。而是將它放回到對象池以供后續(xù)使用。使用對象池在頻繁創(chuàng)建和銷毀對象的情形下,能大幅度提升性能。同時(shí),為了避免對象池中的對象占用過多的內(nèi)存。對象池一般還配有特定的清理策略。Go 標(biāo)準(zhǔn)庫sync.Pool就是這樣一個(gè)例子。sync.Pool中的對象會被垃圾回收清理掉。
在這類對象中,比較特殊的一類是字節(jié)緩沖(底層一般是字節(jié)切片)。在做字符串拼接時(shí),為了拼接的高效,我們通常將中間結(jié)果存放在一個(gè)字節(jié)緩沖。在拼接完成之后,再從字節(jié)緩沖中生成結(jié)果字符串。在收發(fā)網(wǎng)絡(luò)包時(shí),也需要將不完整的包暫時(shí)存放在字節(jié)緩沖中。
Go 標(biāo)準(zhǔn)庫中的類型bytes.Buffer封裝字節(jié)切片,提供一些使用接口。我們知道切片的容量是有限的,容量不足時(shí)需要進(jìn)行擴(kuò)容。而頻繁的擴(kuò)容容易造成性能抖動(dòng)。bytebufferpool實(shí)現(xiàn)了自己的Buffer類型,并使用一個(gè)簡單的算法降低擴(kuò)容帶來的性能損失。bytebufferpool已經(jīng)在大名鼎鼎的 Web 框架fasthttp和靈活的 Go 模塊庫quicktemplate得到了應(yīng)用。實(shí)際上,這 3 個(gè)庫是同一個(gè)作者:valyala??。
快速使用
本文代碼使用 Go Modules。
創(chuàng)建目錄并初始化:
$ mkdir bytebufferpool && cd bytebufferpool
$ go mod init github.com/darjun/go-daily-lib/bytebufferpool
安裝bytebufferpool庫:
$ go get -u github.com/PuerkitoBio/bytebufferpool
典型的使用方式先通過bytebufferpool提供的Get()方法獲取一個(gè)bytebufferpool.Buffer對象,然后調(diào)用這個(gè)對象的方法寫入數(shù)據(jù),使用完成之后再調(diào)用bytebufferpool.Put()將對象放回對象池中。例:
package main
import (
"fmt"
"github.com/valyala/bytebufferpool"
)
func main() {
b := bytebufferpool.Get()
b.WriteString("hello")
b.WriteByte(',')
b.WriteString(" world!")
fmt.Println(b.String())
bytebufferpool.Put(b)
}
直接調(diào)用bytebufferpool包的Get()和Put()方法,底層操作的是包中默認(rèn)的對象池:
// bytebufferpool/pool.go
var defaultPool Pool
func Get() *ByteBuffer { return defaultPool.Get() }
func Put(b *ByteBuffer) { defaultPool.Put(b) }
我們當(dāng)然可以根據(jù)實(shí)際需要?jiǎng)?chuàng)建新的對象池,將相同用處的對象放在一起(比如我們可以創(chuàng)建一個(gè)對象池用于輔助接收網(wǎng)絡(luò)包,一個(gè)用于輔助拼接字符串):
func main() {
joinPool := new(bytebufferpool.Pool)
b := joinPool.Get()
b.WriteString("hello")
b.WriteByte(',')
b.WriteString(" world!")
fmt.Println(b.String())
joinPool.Put(b)
}
bytebufferpool沒有提供具體的創(chuàng)建函數(shù),不過可以使用new創(chuàng)建。
優(yōu)化細(xì)節(jié)
在將對象放回池中時(shí),會根據(jù)當(dāng)前切片的容量進(jìn)行相應(yīng)的處理。bytebufferpool將大小分為 20 個(gè)區(qū)間:
| < 2^6 | 2^6 ~ 2^7-1 | ... | > 2^25 |
如果容量小于 2^6,則屬于第一個(gè)區(qū)間。如果處于 2^6 和 2^7-1 之間,則落在第二個(gè)區(qū)間。以此類推。執(zhí)行足夠多的放回次數(shù)后,bytebufferpool會重新校準(zhǔn),計(jì)算處于哪個(gè)區(qū)間容量的對象最多。將defaultSize設(shè)置為該區(qū)間的上限容量,第一個(gè)區(qū)間的上限容量為 2^6,第二區(qū)間為 2^7,最后一個(gè)區(qū)間為 2^26。后續(xù)通過Get()請求對象時(shí),若池中無空閑對象,創(chuàng)建一個(gè)新對象時(shí),直接將容量設(shè)置為defaultSize。這樣基本可以避免在使用過程中的切片擴(kuò)容,從而提升性能。下面結(jié)合代碼來理解:
// bytebufferpool/pool.go
const (
minBitSize = 6 // 2**6=64 is a CPU cache line size
steps = 20
minSize = 1 << minBitSize
maxSize = 1 << (minBitSize + steps - 1)
calibrateCallsThreshold = 42000
maxPercentile = 0.95
)
type Pool struct {
calls [steps]uint64
calibrating uint64
defaultSize uint64
maxSize uint64
pool sync.Pool
}
我們可以看到,bytebufferpool內(nèi)部使用了標(biāo)準(zhǔn)庫中的對象sync.Pool。
這里的steps就是上面所說的區(qū)間,一共 20 份。calls數(shù)組記錄放回的對象容量落在各個(gè)區(qū)間的次數(shù)。
調(diào)用Pool.Get()將對象放回時(shí),首先計(jì)算切片容量落在哪個(gè)區(qū)間,增加calls數(shù)組中相應(yīng)元素的值:
// bytebufferpool/pool.go
func (p *Pool) Put(b *ByteBuffer) {
idx := index(len(b.B))
if atomic.AddUint64(&p.calls[idx], 1) > calibrateCallsThreshold {
p.calibrate()
}
maxSize := int(atomic.LoadUint64(&p.maxSize))
if maxSize == 0 || cap(b.B) <= maxSize {
b.Reset()
p.pool.Put(b)
}
}
如果calls數(shù)組該元素超過指定值calibrateCallsThreshold=42000(說明距離上次校準(zhǔn),放回對象到該區(qū)間的次數(shù)已經(jīng)達(dá)到閾值了,42000 應(yīng)該就是個(gè)經(jīng)驗(yàn)數(shù)字),則調(diào)用Pool.calibrate()執(zhí)行校準(zhǔn)操作:
// bytebufferpool/pool.go
func (p *Pool) calibrate() {
// 避免并發(fā)放回對象觸發(fā) `calibrate`
if !atomic.CompareAndSwapUint64(&p.calibrating, 0, 1) {
return
}
// step 1.統(tǒng)計(jì)并排序
a := make(callSizes, 0, steps)
var callsSum uint64
for i := uint64(0); i < steps; i++ {
calls := atomic.SwapUint64(&p.calls[i], 0)
callsSum += calls
a = append(a, callSize{
calls: calls,
size: minSize << i,
})
}
sort.Sort(a)
// step 2.計(jì)算 defaultSize 和 maxSize
defaultSize := a[0].size
maxSize := defaultSize
maxSum := uint64(float64(callsSum) * maxPercentile)
callsSum = 0
for i := 0; i < steps; i++ {
if callsSum > maxSum {
break
}
callsSum += a[i].calls
size := a[i].size
if size > maxSize {
maxSize = size
}
}
// step 3.保存對應(yīng)值
atomic.StoreUint64(&p.defaultSize, defaultSize)
atomic.StoreUint64(&p.maxSize, maxSize)
atomic.StoreUint64(&p.calibrating, 0)
}
step 1.統(tǒng)計(jì)并排序
calls數(shù)組記錄了放回對象到對應(yīng)區(qū)間的次數(shù)。按照這個(gè)次數(shù)從大到小排序。注意:minSize << i表示區(qū)間i的上限容量。
step 2.計(jì)算defaultSize和maxSize
defaultSize很好理解,取排序后的第一個(gè)size即可。maxSize值記錄放回次數(shù)超過 95% 的多個(gè)對象容量的最大值。它的作用是防止將使用較少的大容量對象放回對象池,從而占用太多內(nèi)存。這里就可以理解Pool.Put()方法后半部分的邏輯了:
// 如果要放回的對象容量大于 maxSize,則不放回
maxSize := int(atomic.LoadUint64(&p.maxSize))
if maxSize == 0 || cap(b.B) <= maxSize {
b.Reset()
p.pool.Put(b)
}
step 3.保存對應(yīng)值
后續(xù)通過Pool.Get()獲取對象時(shí),若池中無空閑對象,新創(chuàng)建的對象默認(rèn)容量為defaultSize。這樣的容量能滿足絕大多數(shù)情況下的使用,避免使用過程中的切片擴(kuò)容。
// bytebufferpool/pool.go
func (p *Pool) Get() *ByteBuffer {
v := p.pool.Get()
if v != nil {
return v.(*ByteBuffer)
}
return &ByteBuffer{
B: make([]byte, 0, atomic.LoadUint64(&p.defaultSize)),
}
}
其他一些細(xì)節(jié):
容量最小值取 2^6 = 64,因?yàn)檫@就是 64 位計(jì)算機(jī)上 CPU 緩存行的大小。這個(gè)大小的數(shù)據(jù)可以一次性被加載到 CPU 緩存行中,再小就無意義了。 代碼中多次使用 atomic原子操作,避免加鎖導(dǎo)致性能損失。
當(dāng)然這個(gè)庫缺點(diǎn)也很明顯,由于大部分使用的容量都小于defaultSize,會有部分內(nèi)存浪費(fèi)。
總結(jié)
去掉注釋,空行,bytebufferpool只用了 150 行左右的代碼就實(shí)現(xiàn)了一個(gè)高性能的Buffer對象池。其中細(xì)節(jié)值得細(xì)細(xì)品味。閱讀高質(zhì)量的代碼,學(xué)習(xí)編碼細(xì)節(jié)有助于提升自己的編碼能力。強(qiáng)烈建議細(xì)細(xì)品讀!!!
大家如果發(fā)現(xiàn)好玩、好用的 Go 語言庫,歡迎到 Go 每日一庫 GitHub 上提交 issue??
參考
bytebufferpool GitHub:https://github.com/valyala/bytebufferpool Go 每日一庫 GitHub:https://github.com/darjun/go-daily-lib
推薦閱讀
