<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Go:并發(fā) IO 優(yōu)化

          共 2104字,需瀏覽 5分鐘

           ·

          2022-04-18 02:55

          1. 背景

          有的時(shí)候我們會(huì)遇到并發(fā) IO 的情況,例如,并發(fā)爬蟲下載網(wǎng)絡(luò)上的圖片。如果并發(fā)度過高或者下載的內(nèi)容過大,會(huì)導(dǎo)致網(wǎng)絡(luò) IO 耗時(shí)急劇上升。這時(shí)候就需要優(yōu)化一下每次網(wǎng)絡(luò)IO 的耗時(shí)。

          2. 網(wǎng)絡(luò)下載圖片用例

          以下載網(wǎng)絡(luò)數(shù)據(jù)為例,下面是典型的代碼。

          func?TestHttpGet(t?*testing.T)?{
          ????rsp,?err?:=?http.Get("http://xxx.com")
          ????if?err?!=?nil?{
          ????????t.Errorf("get?err:%v",?err)
          ????????return
          ????}
          ????defer?rsp.Body.Close()
          ????body,?err?:=?ioutil.ReadAll(rsp.Body)
          ????t.Logf("body?len:%v,?read?err:%v",?len(body),?err)
          }

          在代碼中,首先通過 http.Get 獲取網(wǎng)絡(luò)上的資源,這段耗時(shí)不容易在業(yè)務(wù)層面優(yōu)化。因此想要優(yōu)化整體耗時(shí),只有從讀取響應(yīng) rsp.Body 入手。

          3. ioutil.ReadAll

          3.1. 源碼分析

          ioutil.ReadAll 中其實(shí)是調(diào)用了 bytes.Buffer.ReadFrom 函數(shù),buf 的初始容量是 bytes.MinRead = 512

          //?readAll?reads?from?r?until?an?error?or?EOF?and?returns?the?data?it?read
          //?from?the?internal?buffer?allocated?with?a?specified?capacity.
          func?readAll(r?io.Reader,?capacity?int64)?(b?[]byte,?err?error)?{
          ????var?buf?bytes.Buffer
          ????if?int64(int(capacity))?==?capacity?{
          ????????buf.Grow(int(capacity))
          ????}
          ????_,?err?=?buf.ReadFrom(r)
          ????return?buf.Bytes(),?err
          }

          //?ReadAll?reads?from?r?until?an?error?or?EOF?and?returns?the?data?it?read.
          //?A?successful?call?returns?err?==?nil,?not?err?==?EOF.?Because?ReadAll?is
          //?defined?to?read?from?src?until?EOF,?it?does?not?treat?an?EOF?from?Read
          //?as?an?error?to?be?reported.
          func?ReadAll(r?io.Reader)?([]byte,?error)?{
          ????return?readAll(r,?bytes.MinRead)
          }

          buf.ReadFrom(r) 中,首先將 b.buf 擴(kuò)容 MinRead = 512 字節(jié),然后從 r 中一輪一輪讀取數(shù)據(jù),直到 b.buf 填完。

          //?MinRead?is?the?minimum?slice?size?passed?to?a?Read?call?by
          //?Buffer.ReadFrom.?As?long?as?the?Buffer?has?at?least?MinRead?bytes?beyond
          //?what?is?required?to?hold?the?contents?of?r,?ReadFrom?will?not?grow?the
          //?underlying?buffer.
          const?MinRead?=?512

          //?ReadFrom?reads?data?from?r?until?EOF?and?appends?it?to?the?buffer,?growing
          //?the?buffer?as?needed.?The?return?value?n?is?the?number?of?bytes?read.?Any
          //?error?except?io.EOF?encountered?during?the?read?is?also?returned.?If?the
          //?buffer?becomes?too?large,?ReadFrom?will?panic?with?ErrTooLarge.
          func?(b?*Buffer)?ReadFrom(r?io.Reader)?(n?int64,?err?error)?{
          ????b.lastRead?=?opInvalid
          ????for?{
          ????????i?:=?b.grow(MinRead)
          ????????b.buf?=?b.buf[:i]
          ????????m,?e?:=?r.Read(b.buf[i:cap(b.buf)])
          ????????if?m?0?{
          ????????????panic(errNegativeRead)
          ????????}

          ????????b.buf?=?b.buf[:i+m]
          ????????n?+=?int64(m)
          ????????if?e?==?io.EOF?{
          ????????????return?n,?nil?//?e?is?EOF,?so?return?nil?explicitly
          ????????}
          ????????if?e?!=?nil?{
          ????????????return?n,?e
          ????????}
          ????}
          }

          b.grow(n) 函數(shù)用來將 bytes.Buffer 擴(kuò)容,以便容納下 n 個(gè) byte,如果 b 已經(jīng)無法擴(kuò)容了,則會(huì)產(chǎn)生一個(gè) panic,攜帶 ErrTooLarge error。

          bytes.Buffer 的定義如下:

          //?A?Buffer?is?a?variable-sized?buffer?of?bytes?with?Read?and?Write?methods.
          //?The?zero?value?for?Buffer?is?an?empty?buffer?ready?to?use.
          type?Buffer?struct?{
          ????buf??????[]byte?//?contents?are?the?bytes?buf[off?:?len(buf)]
          ????off??????int????//?read?at?&buf[off],?write?at?&buf[len(buf)]
          ????lastRead?readOp?//?last?read?operation,?so?that?Unread*?can?work?correctly.
          }

          b.grow(n) 函數(shù)的邏輯為:

          1. 如果 b.buf 的長(zhǎng)度 len + n 小于等于 b.buf 的容量:cap(b.buf)。則直接返回長(zhǎng)度 len。

          2. 如果 b.buf = nil 并且 n < 64時(shí),則會(huì)新建一個(gè)長(zhǎng)度為n,容量為 64 的[]byte 數(shù)組并返回。

          3. 如果b.buf的長(zhǎng)度 len + n小于等于 b.buf 一半容量:cap(b.buf)/2,就把b.buf[b.offset:]部分的數(shù)據(jù)移到b.buf開頭,b.offset是 bytes.Buffer 開始讀的位置,這樣就是把 b.buf中可用的數(shù)據(jù)向前移到開頭。

          4. 如果b.buf的長(zhǎng)度 len + n大于 b.buf 一半容量:cap(b.buf)/2,則會(huì)調(diào)用 makeSlice分配一個(gè)新的 []byte,長(zhǎng)度為當(dāng)前容量的二倍加n:cap(b.buf)*2+n,然后把原來的數(shù)據(jù)復(fù)制到新 buf 中:copy(buf, b.buf[b.off:])

          所以說,如果bytes.Buffer初始的 buf 容量不夠大,而需要讀取的數(shù)據(jù)太大的話,會(huì)頻繁的進(jìn)行內(nèi)存分配,這是耗時(shí)增加的原因。

          而在readall函數(shù)中,bytes.Buffer 的初始容量是 512 字節(jié),之后會(huì)成倍增加直到滿足數(shù)據(jù)大小。

          3.2. 資源分配分析

          為了避免網(wǎng)絡(luò) IO 測(cè)試對(duì)外產(chǎn)生的影響,使用磁盤 IO 來替代網(wǎng)絡(luò) IO,分析 ioutil.ReadAll 的內(nèi)存分配。磁盤 IO 使用一個(gè) 72MB 的測(cè)試文件:test.data.rar

          寫一個(gè)下面的單測(cè)代碼:

          func?TestReadAll(t?*testing.T)?{
          ????file,?err?:=?os.Open(testName)
          ????if?err?!=?nil?{
          ????????t.Errorf("open?err:%v",?err)
          ????????return
          ????}
          ????_,?err?=?ioutil.ReadAll(file)
          ????if?err?!=?nil?{
          ????????t.Errorf("readall?err:%v",?err)
          ????????return
          ????}
          }

          執(zhí)行單元測(cè)試,并儲(chǔ)存內(nèi)存和cpu概要信息。

          go?test?--run?TestReadAll$?-v?-memprofile?readall.mem?-memprofilerate?1?-cpuprofile?readall.cpu

          接下來使用 pprof 分析內(nèi)存和cpu 的概要文件。

          3.2.1. cpu 分析

          首先分析 cpu 概要文件。在 bash 中輸入:

          go?tool?pprof?-http=:8080?readall.cpu

          則會(huì)在打開一個(gè)頁面,里面就是各個(gè)函數(shù)的耗時(shí),例如,TestReadAll 就花了 90ms。

          3.2.2. 內(nèi)存分析

          接下來是內(nèi)存概要文件分析。在 bash 中輸入:

          go?tool?pprof?-http=:8080?readall.mem

          可以看到在 ioutil.ReadAll 進(jìn)行了多次內(nèi)存分配。這是因?yàn)樵?ioutil.ReadAll 內(nèi)部會(huì)多次調(diào)用 bytes.BufferGrow(n) 函數(shù)進(jìn)行擴(kuò)容,最后一次擴(kuò)容產(chǎn)生了一個(gè) 128 MB 的切片。

          128MB 正好是測(cè)試文件大小向上取整的512字節(jié)的整數(shù)倍。

          4. io.Copy

          前面說到,使用 ioutil.ReadAll 讀取大文件時(shí)會(huì)出現(xiàn)頻繁的內(nèi)存分配,增加大量不必要的耗時(shí)。

          那我們會(huì)想,可以直接避免內(nèi)存頻繁分配嗎?反正內(nèi)存也不會(huì)省,那我們?cè)谥爸苯右淮畏峙鋲蛄耍缶筒粫?huì)有額外的內(nèi)存分配耗時(shí)了。

          io.Copy 就可以實(shí)現(xiàn)這個(gè)功能。

          4.1. 預(yù)分配文件大小內(nèi)存

          func?TestIOCopy(t?*testing.T)?{
          ????file,?err?:=?os.Open(testName)
          ????if?err?!=?nil?{
          ????????t.Errorf("open?err:%v",?err)
          ????????return
          ????}
          ????data?:=?make([]byte,?0,?74077894)
          ????buf?:=?bytes.NewBuffer(data)
          ????_,?err?=?io.Copy(buf,?file)
          ????if?err?!=?nil?{
          ????????t.Errorf("readall?err:%v",?err)
          ????????return
          ????}
          }

          在上面代碼中,預(yù)分配文件大小的內(nèi)存,然后調(diào)用 io.Copy復(fù)制數(shù)據(jù)。

          io.Copy 函數(shù)中會(huì)直接調(diào)用 buf.ReadFrom 讀取 file 中的數(shù)據(jù)。

          //?ReadFrom?reads?data?from?r?until?EOF?and?appends?it?to?the?buffer,?growing
          //?the?buffer?as?needed.?The?return?value?n?is?the?number?of?bytes?read.?Any
          //?error?except?io.EOF?encountered?during?the?read?is?also?returned.?If?the
          //?buffer?becomes?too?large,?ReadFrom?will?panic?with?ErrTooLarge.
          func?(b?*Buffer)?ReadFrom(r?io.Reader)?(n?int64,?err?error)?{
          ????b.lastRead?=?opInvalid
          ????for?{
          ????????i?:=?b.grow(MinRead)
          ????????b.buf?=?b.buf[:i]
          ????????m,?e?:=?r.Read(b.buf[i:cap(b.buf)])
          ????????if?m?0?{
          ????????????panic(errNegativeRead)
          ????????}

          ????????b.buf?=?b.buf[:i+m]
          ????????n?+=?int64(m)
          ????????if?e?==?io.EOF?{
          ????????????return?n,?nil?//?e?is?EOF,?so?return?nil?explicitly
          ????????}
          ????????if?e?!=?nil?{
          ????????????return?n,?e
          ????????}
          ????}
          }

          執(zhí)行單測(cè)生成 cpu 和內(nèi)存概要文件:

          go?test?--run?TestIOCopy?-v?-memprofile?iocopy.mem?-memprofilerate?1?-cpuprofile?iocopy.cpu

          分析 cpu 時(shí)間如下,可以看到只花了 40ms,比之前的 ioutil.ReadAll 低 50ms。但是還是調(diào)用了 buffer.grow 函數(shù),說明在這個(gè)單測(cè)中還是存在額外的內(nèi)存分配。


          分析內(nèi)存概要文件如下,可以發(fā)現(xiàn)的確有額外的內(nèi)存分配,并且分配的內(nèi)存是文件大小的兩倍。這說明耗時(shí)還有進(jìn)一步下降的空間。

          4.2. 預(yù)分配雙倍文件大小內(nèi)存

          在代碼中預(yù)先分配雙倍文件大小的內(nèi)存:

          func?TestIOCopy(t?*testing.T)?{
          ????file,?err?:=?os.Open(testName)
          ????if?err?!=?nil?{
          ????????t.Errorf("open?err:%v",?err)
          ????????return
          ????}
          ????data?:=?make([]byte,?0,?74077894*2)
          ????buf?:=?bytes.NewBuffer(data)
          ????_,?err?=?io.Copy(buf,?file)
          ????if?err?!=?nil?{
          ????????t.Errorf("readall?err:%v",?err)
          ????????return
          ????}
          }

          執(zhí)行單測(cè),分析 cpu 和內(nèi)存概要文件。

          分析 cpu 耗時(shí),可以看到只花了 10ms,比最開始使用 ioutil.ReadAll 減少80ms。


          內(nèi)存概要分析如下,可以看到除了最開始的內(nèi)存分配,代碼內(nèi)部沒有額外的內(nèi)存分配了,這也是耗時(shí)進(jìn)一步下降的原因。

          5. 并發(fā)壓測(cè)

          前面的測(cè)試只是運(yùn)行一次,比較cpu 耗時(shí)和內(nèi)存分配次數(shù)。但是在實(shí)際業(yè)務(wù)中,可能存在并發(fā) IO 的情形,這種情況下,io.copyioutil.ReadAll 能提高多少性能呢?

          下面的單測(cè)中,分別運(yùn)行 100 次并發(fā)示例進(jìn)行比較,在 readAllDataiocpoyData 函數(shù)中并發(fā)度控制在 10,計(jì)算量為 100。執(zhí)行單元測(cè)試,統(tǒng)計(jì)總的 cpu 耗時(shí)和內(nèi)存分布。

          注意,下面的 iocpoyData 函數(shù)中,預(yù)分配的內(nèi)存大小是雙倍的文件大小。

          func?TestReadAllIOCopy(t?*testing.T)?{
          ????for?i?:=?0;?i?100;?i++?{
          ????????readmax,?readtotal?:=?readAllData(t,?testName)
          ????????copymax,?copytotal?:=?iocopyData(t,?testName)
          ????????t.Logf("Max?copy/read:%v,?total?copy/read:%v",
          ????????????float64(copymax)/float64(readmax),?float64(copytotal)/float64(readtotal))
          ????}
          }
          func?readAllData(t?*testing.T,?fileName?string)?(int64,?int64)?{
          ????mu?:=?&sync.Mutex{}
          ????var?max?int64
          ????var?total?int64
          ????ctrl?:=?make(chan?struct{},?10)
          ????wg?:=?&sync.WaitGroup{}
          ????for?i?:=?0;?i?100;?i++?{
          ????????ctrl?<-?struct{}{}
          ????????wg.Add(1)
          ????????go?func()?{
          ????????????defer?func()?{
          ????????????????<-ctrl
          ????????????????wg.Done()
          ????????????}()
          ????????????start?:=?time.Now()
          ????????????file,?err?:=?os.Open(fileName)
          ????????????if?err?!=?nil?{
          ????????????????t.Errorf("open?err:%v",?err)
          ????????????????return
          ????????????}
          ????????????_,?err?=?ioutil.ReadAll(file)
          ????????????if?err?!=?nil?{
          ????????????????t.Errorf("readall?err:%v",?err)
          ????????????????return
          ????????????}
          ????????????cost?:=?time.Since(start).Milliseconds()
          ????????????atomic.AddInt64(&total,?cost)
          ????????????mu.Lock()
          ????????????if?cost?>?max?{
          ????????????????max?=?cost
          ????????????}
          ????????????mu.Unlock()
          ????????}()
          ????}
          ????wg.Wait()
          ????return?max,?total
          }

          func?iocopyData(t?*testing.T,?fileName?string)?(int64,?int64)?{
          ????mu?:=?&sync.Mutex{}
          ????var?max?int64
          ????var?total?int64
          ????wg?:=?&sync.WaitGroup{}
          ????ctrl?:=?make(chan?struct{},?10)
          ????for?i?:=?0;?i?100;?i++?{
          ????????ctrl?<-?struct{}{}
          ????????wg.Add(1)
          ????????go?func()?{
          ????????????defer?func()?{
          ????????????????<-ctrl
          ????????????????wg.Done()
          ????????????}()
          ????????????start?:=?time.Now()
          ????????????file,?err?:=?os.Open(fileName)
          ????????????if?err?!=?nil?{
          ????????????????t.Errorf("open?err:%v",?err)
          ????????????????return
          ????????????}
          ????????????fileInfo,?er?:=?os.Stat(fileName)
          ????????????if?er?!=?nil?{
          ????????????????t.Errorf("state?err:%v",?err)
          ????????????????return
          ????????????}
          ????????????data?:=?make([]byte,?0,?fileInfo.Size()*2)
          ????????????buf?:=?bytes.NewBuffer(data)
          ????????????_,?err?=?io.Copy(buf,?file)
          ????????????if?err?!=?nil?{
          ????????????????t.Errorf("copy?err:%v",?err)
          ????????????????return
          ????????????}
          ????????????cost?:=?time.Since(start).Milliseconds()
          ????????????atomic.AddInt64(&total,?cost)
          ????????????mu.Lock()
          ????????????if?cost?>?max?{
          ????????????????max?=?cost
          ????????????}
          ????????????mu.Unlock()
          ????????}()
          ????}
          ????wg.Wait()
          ????return?max,?total
          }

          5.1. cpu 分析

          下圖是 cpu 時(shí)間的分析,可以看到 readAllData 花了 704.03s,iocopyData 只花了 161s,后者是前者比例的 22.8%。

          兩個(gè)函數(shù)都會(huì)調(diào)用 runtime.makeSlice 進(jìn)行內(nèi)存分配,不同的是 readAllData 花費(fèi)了 248.8s 在調(diào)用這個(gè)函數(shù)上面,而 readAllData 只花了 131.6s,后者是前者的 52.8%,這個(gè)結(jié)果也是和代碼實(shí)現(xiàn)相吻合的。

          5.2. 內(nèi)存分析

          接下來看一下兩者的內(nèi)存分析。

          readAllData 在內(nèi)部多次分配內(nèi)存,所以內(nèi)存消耗也要比 iocopyData 大很多。readAllData 執(zhí)行的時(shí)候花了 2.44TB 的內(nèi)存,幾乎全部用在了 bytes.makeSlice 上面;而 iocopyData 則只在最開始手動(dòng)進(jìn)行了內(nèi)存分配,共花了 1.35TB 內(nèi)存了;后者是前者內(nèi)存消耗的 55.3%。這個(gè)比例與前面內(nèi)存分配消耗的時(shí)間比例也是吻合的。

          總結(jié)

          綜上所述,在涉及頻繁 IO 的情況下,盡可能使用 io.Copy 并且分配指定內(nèi)存可以降低代碼運(yùn)行時(shí)間,并且提高內(nèi)存效率。當(dāng)指定的內(nèi)存大小是需要讀取的數(shù)據(jù)大小的兩倍時(shí),效率達(dá)到最高。



          推薦閱讀


          福利

          我為大家整理了一份從入門到進(jìn)階的Go學(xué)習(xí)資料禮包,包含學(xué)習(xí)建議:入門看什么,進(jìn)階看什么。關(guān)注公眾號(hào) 「polarisxu」,回復(fù)?ebook?獲取;還可以回復(fù)「進(jìn)群」,和數(shù)萬 Gopher 交流學(xué)習(xí)。

          瀏覽 8
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  九九九九国产 | 欧美三级欧美一级 | 日韩一区影院 | 大香蕉天天日 | 麻豆一区在线 |