Go:并發(fā) IO 優(yōu)化
1. 背景
有的時(shí)候我們會(huì)遇到并發(fā) IO 的情況,例如,并發(fā)爬蟲下載網(wǎng)絡(luò)上的圖片。如果并發(fā)度過高或者下載的內(nèi)容過大,會(huì)導(dǎo)致網(wǎng)絡(luò) IO 耗時(shí)急劇上升。這時(shí)候就需要優(yōu)化一下每次網(wǎng)絡(luò)IO 的耗時(shí)。
2. 網(wǎng)絡(luò)下載圖片用例
以下載網(wǎng)絡(luò)數(shù)據(jù)為例,下面是典型的代碼。
func?TestHttpGet(t?*testing.T)?{
????rsp,?err?:=?http.Get("http://xxx.com")
????if?err?!=?nil?{
????????t.Errorf("get?err:%v",?err)
????????return
????}
????defer?rsp.Body.Close()
????body,?err?:=?ioutil.ReadAll(rsp.Body)
????t.Logf("body?len:%v,?read?err:%v",?len(body),?err)
}
在代碼中,首先通過 http.Get 獲取網(wǎng)絡(luò)上的資源,這段耗時(shí)不容易在業(yè)務(wù)層面優(yōu)化。因此想要優(yōu)化整體耗時(shí),只有從讀取響應(yīng) rsp.Body 入手。
3. ioutil.ReadAll
3.1. 源碼分析
ioutil.ReadAll 中其實(shí)是調(diào)用了 bytes.Buffer.ReadFrom 函數(shù),buf 的初始容量是 bytes.MinRead = 512。
//?readAll?reads?from?r?until?an?error?or?EOF?and?returns?the?data?it?read
//?from?the?internal?buffer?allocated?with?a?specified?capacity.
func?readAll(r?io.Reader,?capacity?int64)?(b?[]byte,?err?error)?{
????var?buf?bytes.Buffer
????if?int64(int(capacity))?==?capacity?{
????????buf.Grow(int(capacity))
????}
????_,?err?=?buf.ReadFrom(r)
????return?buf.Bytes(),?err
}
//?ReadAll?reads?from?r?until?an?error?or?EOF?and?returns?the?data?it?read.
//?A?successful?call?returns?err?==?nil,?not?err?==?EOF.?Because?ReadAll?is
//?defined?to?read?from?src?until?EOF,?it?does?not?treat?an?EOF?from?Read
//?as?an?error?to?be?reported.
func?ReadAll(r?io.Reader)?([]byte,?error)?{
????return?readAll(r,?bytes.MinRead)
}
在 buf.ReadFrom(r) 中,首先將 b.buf 擴(kuò)容 MinRead = 512 字節(jié),然后從 r 中一輪一輪讀取數(shù)據(jù),直到 b.buf 填完。
//?MinRead?is?the?minimum?slice?size?passed?to?a?Read?call?by
//?Buffer.ReadFrom.?As?long?as?the?Buffer?has?at?least?MinRead?bytes?beyond
//?what?is?required?to?hold?the?contents?of?r,?ReadFrom?will?not?grow?the
//?underlying?buffer.
const?MinRead?=?512
//?ReadFrom?reads?data?from?r?until?EOF?and?appends?it?to?the?buffer,?growing
//?the?buffer?as?needed.?The?return?value?n?is?the?number?of?bytes?read.?Any
//?error?except?io.EOF?encountered?during?the?read?is?also?returned.?If?the
//?buffer?becomes?too?large,?ReadFrom?will?panic?with?ErrTooLarge.
func?(b?*Buffer)?ReadFrom(r?io.Reader)?(n?int64,?err?error)?{
????b.lastRead?=?opInvalid
????for?{
????????i?:=?b.grow(MinRead)
????????b.buf?=?b.buf[:i]
????????m,?e?:=?r.Read(b.buf[i:cap(b.buf)])
????????if?m?0?{
????????????panic(errNegativeRead)
????????}
????????b.buf?=?b.buf[:i+m]
????????n?+=?int64(m)
????????if?e?==?io.EOF?{
????????????return?n,?nil?//?e?is?EOF,?so?return?nil?explicitly
????????}
????????if?e?!=?nil?{
????????????return?n,?e
????????}
????}
}
在 b.grow(n) 函數(shù)用來將 bytes.Buffer 擴(kuò)容,以便容納下 n 個(gè) byte,如果 b 已經(jīng)無法擴(kuò)容了,則會(huì)產(chǎn)生一個(gè) panic,攜帶 ErrTooLarge error。
bytes.Buffer 的定義如下:
//?A?Buffer?is?a?variable-sized?buffer?of?bytes?with?Read?and?Write?methods.
//?The?zero?value?for?Buffer?is?an?empty?buffer?ready?to?use.
type?Buffer?struct?{
????buf??????[]byte?//?contents?are?the?bytes?buf[off?:?len(buf)]
????off??????int????//?read?at?&buf[off],?write?at?&buf[len(buf)]
????lastRead?readOp?//?last?read?operation,?so?that?Unread*?can?work?correctly.
}
b.grow(n) 函數(shù)的邏輯為:
如果
b.buf的長(zhǎng)度 len + n 小于等于b.buf的容量:cap(b.buf)。則直接返回長(zhǎng)度 len。如果
b.buf = nil并且n < 64時(shí),則會(huì)新建一個(gè)長(zhǎng)度為n,容量為 64 的[]byte 數(shù)組并返回。如果
b.buf的長(zhǎng)度len + n小于等于b.buf一半容量:cap(b.buf)/2,就把b.buf[b.offset:]部分的數(shù)據(jù)移到b.buf開頭,b.offset是 bytes.Buffer 開始讀的位置,這樣就是把b.buf中可用的數(shù)據(jù)向前移到開頭。如果
b.buf的長(zhǎng)度len + n大于b.buf一半容量:cap(b.buf)/2,則會(huì)調(diào)用makeSlice分配一個(gè)新的 []byte,長(zhǎng)度為當(dāng)前容量的二倍加n:cap(b.buf)*2+n,然后把原來的數(shù)據(jù)復(fù)制到新 buf 中:copy(buf, b.buf[b.off:])。
所以說,如果bytes.Buffer初始的 buf 容量不夠大,而需要讀取的數(shù)據(jù)太大的話,會(huì)頻繁的進(jìn)行內(nèi)存分配,這是耗時(shí)增加的原因。
而在readall函數(shù)中,bytes.Buffer 的初始容量是 512 字節(jié),之后會(huì)成倍增加直到滿足數(shù)據(jù)大小。
3.2. 資源分配分析
為了避免網(wǎng)絡(luò) IO 測(cè)試對(duì)外產(chǎn)生的影響,使用磁盤 IO 來替代網(wǎng)絡(luò) IO,分析 ioutil.ReadAll 的內(nèi)存分配。磁盤 IO 使用一個(gè) 72MB 的測(cè)試文件:test.data.rar。

寫一個(gè)下面的單測(cè)代碼:
func?TestReadAll(t?*testing.T)?{
????file,?err?:=?os.Open(testName)
????if?err?!=?nil?{
????????t.Errorf("open?err:%v",?err)
????????return
????}
????_,?err?=?ioutil.ReadAll(file)
????if?err?!=?nil?{
????????t.Errorf("readall?err:%v",?err)
????????return
????}
}
執(zhí)行單元測(cè)試,并儲(chǔ)存內(nèi)存和cpu概要信息。
go?test?--run?TestReadAll$?-v?-memprofile?readall.mem?-memprofilerate?1?-cpuprofile?readall.cpu
接下來使用 pprof 分析內(nèi)存和cpu 的概要文件。
3.2.1. cpu 分析
首先分析 cpu 概要文件。在 bash 中輸入:
go?tool?pprof?-http=:8080?readall.cpu
則會(huì)在打開一個(gè)頁面,里面就是各個(gè)函數(shù)的耗時(shí),例如,TestReadAll 就花了 90ms。

3.2.2. 內(nèi)存分析
接下來是內(nèi)存概要文件分析。在 bash 中輸入:
go?tool?pprof?-http=:8080?readall.mem
可以看到在 ioutil.ReadAll 進(jìn)行了多次內(nèi)存分配。這是因?yàn)樵?ioutil.ReadAll 內(nèi)部會(huì)多次調(diào)用 bytes.Buffer 的 Grow(n) 函數(shù)進(jìn)行擴(kuò)容,最后一次擴(kuò)容產(chǎn)生了一個(gè) 128 MB 的切片。
128MB 正好是測(cè)試文件大小向上取整的512字節(jié)的整數(shù)倍。

4. io.Copy
前面說到,使用 ioutil.ReadAll 讀取大文件時(shí)會(huì)出現(xiàn)頻繁的內(nèi)存分配,增加大量不必要的耗時(shí)。
那我們會(huì)想,可以直接避免內(nèi)存頻繁分配嗎?反正內(nèi)存也不會(huì)省,那我們?cè)谥爸苯右淮畏峙鋲蛄耍缶筒粫?huì)有額外的內(nèi)存分配耗時(shí)了。
io.Copy 就可以實(shí)現(xiàn)這個(gè)功能。
4.1. 預(yù)分配文件大小內(nèi)存
func?TestIOCopy(t?*testing.T)?{
????file,?err?:=?os.Open(testName)
????if?err?!=?nil?{
????????t.Errorf("open?err:%v",?err)
????????return
????}
????data?:=?make([]byte,?0,?74077894)
????buf?:=?bytes.NewBuffer(data)
????_,?err?=?io.Copy(buf,?file)
????if?err?!=?nil?{
????????t.Errorf("readall?err:%v",?err)
????????return
????}
}
在上面代碼中,預(yù)分配文件大小的內(nèi)存,然后調(diào)用 io.Copy復(fù)制數(shù)據(jù)。
在 io.Copy 函數(shù)中會(huì)直接調(diào)用 buf.ReadFrom 讀取 file 中的數(shù)據(jù)。
//?ReadFrom?reads?data?from?r?until?EOF?and?appends?it?to?the?buffer,?growing
//?the?buffer?as?needed.?The?return?value?n?is?the?number?of?bytes?read.?Any
//?error?except?io.EOF?encountered?during?the?read?is?also?returned.?If?the
//?buffer?becomes?too?large,?ReadFrom?will?panic?with?ErrTooLarge.
func?(b?*Buffer)?ReadFrom(r?io.Reader)?(n?int64,?err?error)?{
????b.lastRead?=?opInvalid
????for?{
????????i?:=?b.grow(MinRead)
????????b.buf?=?b.buf[:i]
????????m,?e?:=?r.Read(b.buf[i:cap(b.buf)])
????????if?m?0?{
????????????panic(errNegativeRead)
????????}
????????b.buf?=?b.buf[:i+m]
????????n?+=?int64(m)
????????if?e?==?io.EOF?{
????????????return?n,?nil?//?e?is?EOF,?so?return?nil?explicitly
????????}
????????if?e?!=?nil?{
????????????return?n,?e
????????}
????}
}
執(zhí)行單測(cè)生成 cpu 和內(nèi)存概要文件:
go?test?--run?TestIOCopy?-v?-memprofile?iocopy.mem?-memprofilerate?1?-cpuprofile?iocopy.cpu
分析 cpu 時(shí)間如下,可以看到只花了 40ms,比之前的 ioutil.ReadAll 低 50ms。但是還是調(diào)用了 buffer.grow 函數(shù),說明在這個(gè)單測(cè)中還是存在額外的內(nèi)存分配。

分析內(nèi)存概要文件如下,可以發(fā)現(xiàn)的確有額外的內(nèi)存分配,并且分配的內(nèi)存是文件大小的兩倍。這說明耗時(shí)還有進(jìn)一步下降的空間。

4.2. 預(yù)分配雙倍文件大小內(nèi)存
在代碼中預(yù)先分配雙倍文件大小的內(nèi)存:
func?TestIOCopy(t?*testing.T)?{
????file,?err?:=?os.Open(testName)
????if?err?!=?nil?{
????????t.Errorf("open?err:%v",?err)
????????return
????}
????data?:=?make([]byte,?0,?74077894*2)
????buf?:=?bytes.NewBuffer(data)
????_,?err?=?io.Copy(buf,?file)
????if?err?!=?nil?{
????????t.Errorf("readall?err:%v",?err)
????????return
????}
}
執(zhí)行單測(cè),分析 cpu 和內(nèi)存概要文件。
分析 cpu 耗時(shí),可以看到只花了 10ms,比最開始使用 ioutil.ReadAll 減少80ms。

內(nèi)存概要分析如下,可以看到除了最開始的內(nèi)存分配,代碼內(nèi)部沒有額外的內(nèi)存分配了,這也是耗時(shí)進(jìn)一步下降的原因。

5. 并發(fā)壓測(cè)
前面的測(cè)試只是運(yùn)行一次,比較cpu 耗時(shí)和內(nèi)存分配次數(shù)。但是在實(shí)際業(yè)務(wù)中,可能存在并發(fā) IO 的情形,這種情況下,io.copy 比 ioutil.ReadAll 能提高多少性能呢?
下面的單測(cè)中,分別運(yùn)行 100 次并發(fā)示例進(jìn)行比較,在 readAllData 和 iocpoyData 函數(shù)中并發(fā)度控制在 10,計(jì)算量為 100。執(zhí)行單元測(cè)試,統(tǒng)計(jì)總的 cpu 耗時(shí)和內(nèi)存分布。
注意,下面的 iocpoyData 函數(shù)中,預(yù)分配的內(nèi)存大小是雙倍的文件大小。
func?TestReadAllIOCopy(t?*testing.T)?{
????for?i?:=?0;?i?100;?i++?{
????????readmax,?readtotal?:=?readAllData(t,?testName)
????????copymax,?copytotal?:=?iocopyData(t,?testName)
????????t.Logf("Max?copy/read:%v,?total?copy/read:%v",
????????????float64(copymax)/float64(readmax),?float64(copytotal)/float64(readtotal))
????}
}
func?readAllData(t?*testing.T,?fileName?string)?(int64,?int64)?{
????mu?:=?&sync.Mutex{}
????var?max?int64
????var?total?int64
????ctrl?:=?make(chan?struct{},?10)
????wg?:=?&sync.WaitGroup{}
????for?i?:=?0;?i?100;?i++?{
????????ctrl?<-?struct{}{}
????????wg.Add(1)
????????go?func()?{
????????????defer?func()?{
????????????????<-ctrl
????????????????wg.Done()
????????????}()
????????????start?:=?time.Now()
????????????file,?err?:=?os.Open(fileName)
????????????if?err?!=?nil?{
????????????????t.Errorf("open?err:%v",?err)
????????????????return
????????????}
????????????_,?err?=?ioutil.ReadAll(file)
????????????if?err?!=?nil?{
????????????????t.Errorf("readall?err:%v",?err)
????????????????return
????????????}
????????????cost?:=?time.Since(start).Milliseconds()
????????????atomic.AddInt64(&total,?cost)
????????????mu.Lock()
????????????if?cost?>?max?{
????????????????max?=?cost
????????????}
????????????mu.Unlock()
????????}()
????}
????wg.Wait()
????return?max,?total
}
func?iocopyData(t?*testing.T,?fileName?string)?(int64,?int64)?{
????mu?:=?&sync.Mutex{}
????var?max?int64
????var?total?int64
????wg?:=?&sync.WaitGroup{}
????ctrl?:=?make(chan?struct{},?10)
????for?i?:=?0;?i?100;?i++?{
????????ctrl?<-?struct{}{}
????????wg.Add(1)
????????go?func()?{
????????????defer?func()?{
????????????????<-ctrl
????????????????wg.Done()
????????????}()
????????????start?:=?time.Now()
????????????file,?err?:=?os.Open(fileName)
????????????if?err?!=?nil?{
????????????????t.Errorf("open?err:%v",?err)
????????????????return
????????????}
????????????fileInfo,?er?:=?os.Stat(fileName)
????????????if?er?!=?nil?{
????????????????t.Errorf("state?err:%v",?err)
????????????????return
????????????}
????????????data?:=?make([]byte,?0,?fileInfo.Size()*2)
????????????buf?:=?bytes.NewBuffer(data)
????????????_,?err?=?io.Copy(buf,?file)
????????????if?err?!=?nil?{
????????????????t.Errorf("copy?err:%v",?err)
????????????????return
????????????}
????????????cost?:=?time.Since(start).Milliseconds()
????????????atomic.AddInt64(&total,?cost)
????????????mu.Lock()
????????????if?cost?>?max?{
????????????????max?=?cost
????????????}
????????????mu.Unlock()
????????}()
????}
????wg.Wait()
????return?max,?total
}
5.1. cpu 分析
下圖是 cpu 時(shí)間的分析,可以看到 readAllData 花了 704.03s,iocopyData 只花了 161s,后者是前者比例的 22.8%。
兩個(gè)函數(shù)都會(huì)調(diào)用 runtime.makeSlice 進(jìn)行內(nèi)存分配,不同的是 readAllData 花費(fèi)了 248.8s 在調(diào)用這個(gè)函數(shù)上面,而 readAllData 只花了 131.6s,后者是前者的 52.8%,這個(gè)結(jié)果也是和代碼實(shí)現(xiàn)相吻合的。

5.2. 內(nèi)存分析
接下來看一下兩者的內(nèi)存分析。
readAllData 在內(nèi)部多次分配內(nèi)存,所以內(nèi)存消耗也要比 iocopyData 大很多。readAllData 執(zhí)行的時(shí)候花了 2.44TB 的內(nèi)存,幾乎全部用在了 bytes.makeSlice 上面;而 iocopyData 則只在最開始手動(dòng)進(jìn)行了內(nèi)存分配,共花了 1.35TB 內(nèi)存了;后者是前者內(nèi)存消耗的 55.3%。這個(gè)比例與前面內(nèi)存分配消耗的時(shí)間比例也是吻合的。

總結(jié)
綜上所述,在涉及頻繁 IO 的情況下,盡可能使用 io.Copy 并且分配指定內(nèi)存可以降低代碼運(yùn)行時(shí)間,并且提高內(nèi)存效率。當(dāng)指定的內(nèi)存大小是需要讀取的數(shù)據(jù)大小的兩倍時(shí),效率達(dá)到最高。
推薦閱讀
