<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Go經(jīng)典阻塞式TCP協(xié)議流解析的實(shí)踐

          共 25034字,需瀏覽 51分鐘

           ·

          2021-09-07 12:36

          1. Go經(jīng)典阻塞I/O的TCP網(wǎng)絡(luò)編程模型

          Go語(yǔ)言誕生十多年來(lái)取得了飛速發(fā)展,并得到了全世界開(kāi)發(fā)者的廣泛接納和應(yīng)用,其應(yīng)用領(lǐng)域廣泛,包括:Web服務(wù)、數(shù)據(jù)庫(kù)、網(wǎng)絡(luò)編程、系統(tǒng)編程、DevOps、安全檢測(cè)與管控、數(shù)據(jù)科學(xué)以及人工智能等。下面是2020年Go官方開(kāi)發(fā)者調(diào)查的部分結(jié)果:

          圖:2020年Go官方開(kāi)發(fā)者調(diào)查之Go語(yǔ)言的應(yīng)用領(lǐng)域(對(duì)比2019)

          我們看到**“Web編程”“網(wǎng)絡(luò)編程”**分別位列第一名和第四名,這個(gè)應(yīng)用領(lǐng)域數(shù)據(jù)分布與Go語(yǔ)言最初的面向大規(guī)模分布式網(wǎng)絡(luò)服務(wù)的設(shè)計(jì)目標(biāo)十分契合。網(wǎng)絡(luò)通信這塊是服務(wù)端程序必不可少也是至關(guān)重要的一部分。Go標(biāo)準(zhǔn)庫(kù)的net包是在Go中進(jìn)行網(wǎng)絡(luò)編程的基礎(chǔ)。即便您沒(méi)有直接使用到net包中有關(guān)TCP Socket[1]方面的函數(shù)/方法或接口,但net/http包想必大家總是用過(guò)的,http包實(shí)現(xiàn)的是HTTP這個(gè)應(yīng)用層協(xié)議,其在傳輸層使用的依舊是TCP Socket。

          Go是自帶運(yùn)行時(shí)的跨平臺(tái)編程語(yǔ)言,由于Go運(yùn)行時(shí)調(diào)度的需要,Go基于I/O多路復(fù)用機(jī)制(linux上使用epoll,macOS和freebsd上使用kqueue)設(shè)計(jì)和實(shí)現(xiàn)了一套適合自己的TCP Socket網(wǎng)絡(luò)編程模型。并且,Go秉承了自己一貫的追求簡(jiǎn)單的設(shè)計(jì)哲學(xué)[2],Go向語(yǔ)言使用者暴露了簡(jiǎn)單的TCP Socket API接口,而將Go TCP socket網(wǎng)絡(luò)編程的“復(fù)雜性”留給了自己并隱藏在Go運(yùn)行時(shí)的實(shí)現(xiàn)中。這樣,大多數(shù)情況下,Go開(kāi)發(fā)者無(wú)需關(guān)心Socket是否是阻塞的,也無(wú)需親自將Socket文件描述符的回調(diào)函數(shù)注冊(cè)到類(lèi)似epoll這樣的系統(tǒng)調(diào)用中,而只需在每個(gè)連接對(duì)應(yīng)的goroutine中以最簡(jiǎn)單最易用的**“阻塞I/O模型”**的方式進(jìn)行Socket操作即可(像下圖所示),這種設(shè)計(jì)大大降低了網(wǎng)絡(luò)應(yīng)用開(kāi)發(fā)人員的心智負(fù)擔(dān)。

          這是經(jīng)典的Go tcp網(wǎng)絡(luò)編程模型。由于TCP是全雙工模型,每一端(peer)都可以單獨(dú)在已經(jīng)建立的連接上進(jìn)行讀寫(xiě),因此在Go中,我們常常針對(duì)一個(gè)已建立的TCP連接建立兩個(gè)goroutine,一個(gè)負(fù)責(zé)從連接上讀取數(shù)據(jù)(如需響應(yīng)(ack),也可以由該read goroutine直接回復(fù)),一個(gè)負(fù)責(zé)將新生成的業(yè)務(wù)數(shù)據(jù)寫(xiě)入連接。

          read goroutine為例,其典型的程序結(jié)構(gòu)如下:

          func handleConn(c net.Conn) {
              defer c.Close()
              for {
                  // read from the connection c
                  ... ...
                  // write ack to the connection c
                  ... ...
              }
          }

          func main() {
              l, err := net.Listen("tcp"":8888")
              if err != nil {
                  fmt.Println("listen error:", err)
                  return
              }

              for {
                  c, err := l.Accept()
                  if err != nil {
                      fmt.Println("accept error:", err)
                      break
                  }
                  // start a new goroutine to handle
                  // the new connection.
                  go handleConn(c) // start a read goroutine
              }
          }

          從上面代碼,我們看到,針對(duì)每一個(gè)向server建立成功的連接,程序都會(huì)啟動(dòng)一個(gè)reader goroutine負(fù)責(zé)從連接讀取數(shù)據(jù),并在處理后,返回(向連接寫(xiě)入)響應(yīng)(ack)。這樣的程序結(jié)構(gòu)已經(jīng)直白到無(wú)法再直白了,即便你是網(wǎng)絡(luò)編程小白,看懂這樣的程序想必也不會(huì)費(fèi)多少腦細(xì)胞。

          我們知道,TCP傳輸控制協(xié)議是一種面向連接的、可靠的、基于字節(jié)流的傳輸層通信協(xié)議,因此TCP socket編程多為流數(shù)據(jù)(streaming)處理。這種數(shù)據(jù)的特點(diǎn)是按序逐個(gè)字節(jié)傳輸,在傳輸層沒(méi)有明顯的數(shù)據(jù)邊界(只有應(yīng)用層能識(shí)別出協(xié)議數(shù)據(jù)的邊界,這個(gè)依賴(lài)應(yīng)用層協(xié)議的定義)。TCP發(fā)送端發(fā)送了1000個(gè)字節(jié),TCP接收端就會(huì)接收到1000個(gè)字節(jié)。發(fā)送端可能通過(guò)一次發(fā)送操作就發(fā)送了這1000個(gè)字節(jié),但接收端可能通過(guò)10次讀取操作才讀完這1000個(gè)字節(jié),也就是說(shuō)發(fā)送端的發(fā)送動(dòng)作與接收端的接收動(dòng)作并沒(méi)有嚴(yán)格的一一對(duì)應(yīng)關(guān)系。這與UDP協(xié)議基于數(shù)據(jù)報(bào)(diagram)形式的數(shù)據(jù)傳輸形式有本質(zhì)差別(更多關(guān)于tcp與udp差別的內(nèi)容可以詳見(jiàn)《TCP/IP詳解卷1:協(xié)議》[3]一書(shū))。

          本文我們就來(lái)了解一下基于經(jīng)典Go阻塞式網(wǎng)絡(luò)I/O模型對(duì)基于TCP流的自定義協(xié)議進(jìn)行解析的基本模式。

          2. 自定義協(xié)議簡(jiǎn)述

          為了便于后續(xù)內(nèi)容展開(kāi),我們現(xiàn)在這里說(shuō)明一下我們即將解析的自定義流協(xié)議。基于TCP的自定義應(yīng)用層流協(xié)議有兩種常見(jiàn)的定義模式:

          • 二進(jìn)制模式

          采用長(zhǎng)度字段分隔,常見(jiàn)的包括:mqtt(物聯(lián)網(wǎng)最常用的應(yīng)用層協(xié)議之一)、cmpp(中國(guó)移動(dòng)互聯(lián)網(wǎng)短信網(wǎng)關(guān)接口協(xié)議)[4]等。

          • 文本模式

          采用特定分隔符分割和識(shí)別,常見(jiàn)的包括http等。

          這里我們使用二進(jìn)制模式來(lái)定義我們即將解析的應(yīng)用層協(xié)議,下面是協(xié)議的定義:

          這是一個(gè)請(qǐng)求應(yīng)答協(xié)議,請(qǐng)求包和應(yīng)答包的第一個(gè)字段都是包總長(zhǎng)度,這也是在應(yīng)用層用于“分割包”的最重要字段。第二個(gè)字段則是用于標(biāo)識(shí)包類(lèi)型,這里我們定義四種類(lèi)型:

          onst (
              CommandConn   = iota + 0x01 // 0x01,連接請(qǐng)求包
              CommandSubmit               // 0x02,消息發(fā)送請(qǐng)求包
          )

          const (
              CommandConnAck   = iota + 0x80 // 0x81,連接請(qǐng)求的響應(yīng)包
              CommandSubmitAck               //0x82,消息發(fā)送請(qǐng)求的響應(yīng)包
          )

          ID是每個(gè)連接上請(qǐng)求的消息流水,多用于請(qǐng)求發(fā)送方后續(xù)匹配響應(yīng)包之用。請(qǐng)求包與響應(yīng)包唯一的不同之處在于最后一個(gè)字段,請(qǐng)求包定義了有效載荷(payload),而響應(yīng)包則定義了請(qǐng)求包的響應(yīng)狀態(tài)字段(result)。

          明確了應(yīng)用層協(xié)議包的定義后,我們就來(lái)看看如何解析這樣的一個(gè)流協(xié)議吧。

          3. 建立Frame和Packet抽象

          在真正開(kāi)始編寫(xiě)代碼前,我們先來(lái)針對(duì)上述應(yīng)用層協(xié)議建立兩個(gè)抽象概念:Frame和Packet。

          首先,我們?cè)O(shè)定無(wú)論是從client到server,還是server到client,數(shù)據(jù)流都是由一個(gè)接一個(gè)Frame組成的,上述的協(xié)議就封裝在這一個(gè)個(gè)的Frame中。我們可以通過(guò)特定的方法將Frame與Frame分割開(kāi)來(lái):

          每個(gè)Frame由一個(gè)totalLength和frame payload構(gòu)成,如下圖左側(cè)Frame結(jié)構(gòu)所示:

          這樣,我們通過(guò)Frame header: totalLength即可將Frame之間隔離開(kāi)來(lái)。我們將Frame payload定義為一個(gè)packet,每個(gè)Packet的結(jié)構(gòu)如上圖右側(cè)所示。每個(gè)packet包含commandID、ID和payload(packet payload)字段。

          這樣我們就將上述的協(xié)議轉(zhuǎn)換為由Frame和Packet兩個(gè)抽象組成的TCP流了。

          4. 阻塞式TCP流協(xié)議解析的基本程序結(jié)構(gòu)

          建立完抽象后,我們就要開(kāi)始解析這個(gè)協(xié)議了!下圖是該阻塞式TCP流協(xié)議解析的server流程圖:

          我們看到tcp流數(shù)據(jù)先后經(jīng)由frame decode和packet decode后得到應(yīng)用層所需的packet數(shù)據(jù),應(yīng)用層回復(fù)的響應(yīng)則先后經(jīng)過(guò)packet的encode與frame的encode后寫(xiě)入tcp響應(yīng)流中。

          下面我們就先來(lái)看看frame編解碼的代碼。我們首先定義frame編碼器的接口類(lèi)型:

          // github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo1/pkg/frame/frame.go

          type FramePayload []byte

          type StreamFrameCodec interface {
              Encode(io.Writer, FramePayload) error   // data -> frame,并寫(xiě)入io.Writer
              Decode(io.Reader) (FramePayload, error) // 從io.Reader中提取frame payload,并返回給上層
          }

          我們將流數(shù)據(jù)的輸入定義為io.Reader,將流數(shù)據(jù)輸出定義為io.Writer。和上圖中的設(shè)計(jì)意義,Decode方法返回framePayload,而Encode會(huì)將輸入的framePayload編碼為frame并寫(xiě)入outbound的tcp流。

          一旦確定好接口方法集,我們就來(lái)給出一個(gè)StreamFrameCodec接口的實(shí)現(xiàn):


          // github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo1/pkg/frame/frame.go

          type myFrameCodec struct{}

          func NewMyFrameCodec() StreamFrameCodec {
              return &myFrameCodec{}
          }

          func (p *myFrameCodec) Encode(w io.Writer, framePayload FramePayload) error {
              var f = framePayload
              var totalLen int32 = int32(len(framePayload)) + 4

              err := binary.Write(w, binary.BigEndian, &totalLen)
              if err != nil {
                  return err
              }

              // make sure all data will be written to outbound stream
              for {
                  n, err := w.Write([]byte(f)) // write the frame payload to outbound stream
                  if err != nil {
                      return err
                  }
                  if n >= len(f) {
                      break
                  }
                  if n < len(f) {
                      f = f[n:]
                  }
              }
              return nil
          }

          func (p *myFrameCodec) Decode(r io.Reader) (FramePayload, error) {
              var totalLen int32
              err := binary.Read(r, binary.BigEndian, &totalLen)
              if err != nil {
                  return nil, err
              }

              buf := make([]byte, totalLen-4)
              _, err = io.ReadFull(r, buf)
              if err != nil {
                  return nil, err
              }
              return FramePayload(buf), nil
          }

          在上面在這段實(shí)現(xiàn)中,有三點(diǎn)要注意:

          • 網(wǎng)絡(luò)字節(jié)序使用大端字節(jié)序(BigEndian)[5],因此無(wú)論是Encode還是Decode,我們都是用binary.BigEndian;
          • binary.Read或Write會(huì)根據(jù)參數(shù)的寬度讀取或?qū)懭雽?duì)應(yīng)的字節(jié)個(gè)數(shù)的字節(jié),這里totalLen使用int32,那么Read或Write只會(huì)操作流中的4個(gè)字節(jié);
          • 這里沒(méi)有設(shè)置deadline,因此io.ReadFull一般會(huì)讀滿(mǎn)你所需的字節(jié)數(shù),除非遇到EOF或ErrUnexpectedEOF。

          接下來(lái),我們?cè)倏纯碢acket的編解碼。和Frame不同,Packet有多種類(lèi)型(這里僅定義了Conn, submit,connack, submit ack)。因此我們首先抽象一下這些類(lèi)型需要遵循的共同接口:

          // github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo1/pkg/packet/packet.go

          type Packet interface {
              Decode([]byte) error     // []byte -> struct
              Encode() ([]byte, error) //  struct -> []byte
          }

          其中Decode是將一段字節(jié)流數(shù)據(jù)解碼為一個(gè)Packet類(lèi)型,可能是conn,可能是submit等(根據(jù)解碼出來(lái)的commandID判斷)。而Encode則是將一個(gè)Packet類(lèi)型編碼為一段字節(jié)流數(shù)據(jù)。下面是submit和submitack類(lèi)型的Packet接口實(shí)現(xiàn):

          // github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo1/pkg/packet/packet.go

          type Submit struct {
              ID      string
              Payload []byte
          }

          func (s *Submit) Decode(pktBody []byte) error {
              s.ID = string(pktBody[:8])
              s.Payload = pktBody[8:]
              return nil
          }

          func (s *Submit) Encode() ([]byte, error) {
              return bytes.Join([][]byte{[]byte(s.ID[:8]), s.Payload}, nil), nil
          }

          type SubmitAck struct {
              ID     string
              Result uint8
          }

          func (s *SubmitAck) Decode(pktBody []byte) error {
              s.ID = string(pktBody[0:8])
              s.Result = uint8(pktBody[8])
              return nil
          }

          func (s *SubmitAck) Encode() ([]byte, error) {
              return bytes.Join([][]byte{[]byte(s.ID[:8]), []byte{s.Result}}, nil), nil
          }

          不過(guò)上述各種類(lèi)型的編解碼被調(diào)用的前提是明確數(shù)據(jù)流是什么類(lèi)型的,因此我們需要在包級(jí)提供一個(gè)對(duì)外的函數(shù)Decode,該函數(shù)負(fù)責(zé)從字節(jié)流中解析出對(duì)應(yīng)的類(lèi)型(根據(jù)commandID),并調(diào)用對(duì)應(yīng)類(lèi)型的Decode方法:

          // github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo1/pkg/packet/packet.go
          func Decode(packet []byte) (Packet, error) {
           commandID := packet[0]
           pktBody := packet[1:]

           switch commandID {
           case CommandConn:
            return nil, nil
           case CommandConnAck:
            return nil, nil
           case CommandSubmit:
            s := Submit{}
            err := s.Decode(pktBody)
            if err != nil {
             return nil, err
            }
            return &s, nil
           case CommandSubmitAck:
            s := SubmitAck{}
            err := s.Decode(pktBody)
            if err != nil {
             return nil, err
            }
            return &s, nil
           default:
            return nil, fmt.Errorf("unknown commandID [%d]", commandID)
           }
          }

          同樣,我們也需要包級(jí)的Encode函數(shù),根據(jù)傳入的packet類(lèi)型調(diào)用對(duì)應(yīng)的Encode方法實(shí)現(xiàn)對(duì)象的編碼:

          // github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo1/pkg/packet/packet.go
          func Encode(p Packet) ([]byte, error) {
           var commandID uint8
           var pktBody []byte
           var err error

           switch t := p.(type) {
           case *Submit:
            commandID = CommandSubmit
            pktBody, err = p.Encode()
            if err != nil {
             return nil, err
            }
           case *SubmitAck:
            commandID = CommandSubmitAck
            pktBody, err = p.Encode()
            if err != nil {
             return nil, err
            }
           default:
            return nil, fmt.Errorf("unknown type [%s]", t)
           }
           return bytes.Join([][]byte{[]byte{commandID}, pktBody}, nil), nil
          }

          好了,萬(wàn)事俱備只欠東風(fēng)!下面我們就來(lái)編寫(xiě)程序結(jié)構(gòu),將tcp conn與Frame、Packet連接起來(lái):

          // github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo1/cmd/server/main.go

          package main

          import (
           "fmt"
           "net"

           "github.com/bigwhite/tcp-stream-proto/demo1/pkg/frame"
           "github.com/bigwhite/tcp-stream-proto/demo1/pkg/packet"
          )

          func handlePacket(framePayload []byte) (ackFramePayload []byte, err error) {
           var p packet.Packet
           p, err = packet.Decode(framePayload)
           if err != nil {
            fmt.Println("handleConn: packet decode error:", err)
            return
           }

           switch p.(type) {
           case *packet.Submit:
            submit := p.(*packet.Submit)
            fmt.Printf("recv submit: id = %s, payload=%s\n", submit.ID, string(submit.Payload))
            submitAck := &packet.SubmitAck{
             ID:     submit.ID,
             Result: 0,
            }
            ackFramePayload, err = packet.Encode(submitAck)
            if err != nil {
             fmt.Println("handleConn: packet encode error:", err)
             return nil, err
            }
            return ackFramePayload, nil
           default:
            return nil, fmt.Errorf("unknown packet type")
           }
          }

          func handleConn(c net.Conn) {
           defer c.Close()
           frameCodec := frame.NewMyFrameCodec()

           for {
            // read from the connection

            // decode the frame to get the payload
            // the payload is undecoded packet
            framePayload, err := frameCodec.Decode(c)
            if err != nil {
             fmt.Println("handleConn: frame decode error:", err)
             return
            }

            // do something with the packet
            ackFramePayload, err := handlePacket(framePayload)
            if err != nil {
             fmt.Println("handleConn: handle packet error:", err)
             return
            }

            // write ack frame to the connection
            err = frameCodec.Encode(c, ackFramePayload)
            if err != nil {
             fmt.Println("handleConn: frame encode error:", err)
             return
            }
           }
          }

          func main() {
           l, err := net.Listen("tcp"":8888")
           if err != nil {
            fmt.Println("listen error:", err)
            return
           }

           for {
            c, err := l.Accept()
            if err != nil {
             fmt.Println("accept error:", err)
             break
            }
            // start a new goroutine to handle
            // the new connection.
            go handleConn(c)
           }
          }

          在上面這個(gè)程序中,main函數(shù)是標(biāo)準(zhǔn)的“one connection per goroutine”的結(jié)構(gòu),重點(diǎn)邏輯都在handleConn中。在handleConn中,我們看到十分清晰的代碼結(jié)構(gòu):

          read conn
           ->frame decode
            -> handle packet
             -> packet decode
             -> packet(ack) encode
           ->frame(ack) encode
          write conn

          到這里,一個(gè)經(jīng)典阻塞式TCP流解析的demo就完成了(你可以將demo中提供的client和server run起來(lái)驗(yàn)證一下)。

          5. 可能的優(yōu)化點(diǎn)

          在上面的demo1中,我們直接將net.Conn實(shí)例傳給frame.Decode作為io.Reader參數(shù)的實(shí)參,這樣我們每次調(diào)用Read方法都是直接從Conn中讀取數(shù)據(jù)。不過(guò)Go runtime使用net poller將net.Conn.Read轉(zhuǎn)換為io多路復(fù)用的等待,避免了每次從net.Conn直接讀取都轉(zhuǎn)換為一次系統(tǒng)調(diào)用。但即便如此,也可能會(huì)多一次goroutine的上下文切換(在數(shù)據(jù)尚未ready的情況下)。雖然goroutine的上下文切換代價(jià)相較于線(xiàn)程切換要小許多,但畢竟這種切換并不是免費(fèi)的,我們要減少這種切換。我們可以通過(guò)緩存讀的方式來(lái)減少net.Conn.Read真實(shí)調(diào)用的頻率。我們可以像下面這樣改造demo1的例子:

          // github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo2/cmd/server/main.go

          func handleConn(c net.Conn) {
              defer c.Close()
              frameCodec := frame.NewMyFrameCodec()
              rbuf := bufio.NewReader(c) // 為io增加緩存

              for {
                  // read from the connection

                  // decode the frame to get the payload
                  // the payload is undecoded packet
                  framePayload, err := frameCodec.Decode(rbuf) // 使用bufio,減少直接read conn.Conn的次數(shù)
                  if err != nil {
                      fmt.Println("handleConn: frame decode error:", err)
                      return
                  }
                  ... ...
              }
              ... ...
          }

          bufio內(nèi)部每次從net.Conn嘗試讀取其內(nèi)部緩存(buf)大小的數(shù)據(jù),而不是用戶(hù)傳入的希望讀取的數(shù)據(jù)大小。這些數(shù)據(jù)緩存在內(nèi)存中,這樣后續(xù)Read就可以直接從內(nèi)存中得到數(shù)據(jù),而不是每次都從net.Conn讀取,從而降低goroutine上下文切換的頻率。

          除此之外,我們?cè)趂rame包中的frame Decode實(shí)現(xiàn)如下:

          // github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo2/pkg/frame/frame.go

          func (p *myFrameCodec) Decode(r io.Reader) (FramePayload, error) {
              var totalLen int32
              err := binary.Read(r, binary.BigEndian, &totalLen)
              if err != nil {
                  return nil, err
              }

              buf := make([]byte, totalLen-4)
              _, err = io.ReadFull(r, buf)
              if err != nil {
                  return nil, err
              }
              return FramePayload(buf), nil
          }

          我們看到每次調(diào)用這個(gè)方法都會(huì)分配一個(gè)buf,并且buf是不定長(zhǎng)的,這些在程序關(guān)鍵路徑上的堆內(nèi)存對(duì)象分配會(huì)給GC帶來(lái)壓力,我們要盡量避免或減小其頻度,一個(gè)可行的辦法是盡量重用對(duì)象,在Go中一提到重用內(nèi)存對(duì)象,我們就想到了sync.Pool,但這里還有一個(gè)問(wèn)題,那就是“不定長(zhǎng)”,這給sync.Pool的使用增加了難度。

          mcache[6]是字節(jié)技術(shù)團(tuán)隊(duì)開(kāi)源的多級(jí)sync.Pool包,它可以根據(jù)你所要分配的對(duì)象大小選擇不同的sync.Pool池,有些類(lèi)似tcmalloc的多級(jí)(class)內(nèi)存對(duì)象管理,與Go runtime的mcache也是類(lèi)似的,mcache一共分為46個(gè)等級(jí),每個(gè)等級(jí)一個(gè)sync.Pool:

          // github.com/bytedance/gopkg/tree/master/lang/mcache/mcache.go
          const maxSize = 46

          // index contains []byte which cap is 1<<index
          var caches [maxSize]sync.Pool

          我們可以從mcache中分配內(nèi)存來(lái)?yè)Q掉每次都申請(qǐng)一個(gè)[]byte的動(dòng)作以達(dá)到內(nèi)存對(duì)象重用,降低GC壓力的目的:

          // github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo3/pkg/frame/frame.go

          func (p *myFrameCodec) Decode(r io.Reader) (FramePayload, error) {
              var totalLen int32
              err := binary.Read(r, binary.BigEndian, &totalLen)
              if err != nil {
                  return nil, err
              }

              buf := mcache.Malloc(int(totalLen - 4))  // 這里我們重用mcache中的內(nèi)存對(duì)象
              _, err = io.ReadFull(r, buf)
              if err != nil {
                  return nil, err
              }
              return FramePayload(buf), nil
          }

          有了mcache.Malloc,我們就需要在特定位置調(diào)用mcache.Free歸還內(nèi)存對(duì)象,而packet中的Decode就是最好的位置:

          // github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo3/pkg/packet/packet.go

          func Decode(packet []byte) (Packet, error) {
              defer mcache.Free(packet) // 在decode結(jié)束后,釋放對(duì)象回mcache
              commandID := packet[0]
              pktBody := packet[1:]
              ... ...
          }

          上面是兩個(gè)在不動(dòng)用pprof這樣的工具的前提下就能識(shí)別出的較為明顯的可優(yōu)化的點(diǎn),可優(yōu)化的點(diǎn)可能還有很多,這里不一一列舉了。

          6. 簡(jiǎn)單的壓力測(cè)試

          既然給出了優(yōu)化的點(diǎn),我們就來(lái)粗略壓測(cè)一下優(yōu)化前和優(yōu)化后的程序。我們?yōu)閮蓚€(gè)版本程序添加上基于標(biāo)準(zhǔn)庫(kù)expvar的計(jì)數(shù)器(以?xún)?yōu)化前的demo1為例):

          // github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo1-with-metrics/cmd/server/main.go

          func handleConn(c net.Conn) {
              defer c.Close()
              frameCodec := frame.NewMyFrameCodec()
              
              for {
                  // read from the connection
                  ... ...
                  // write ack frame to the connection
                  err = frameCodec.Encode(c, ackFramePayload)
                  if err != nil {
                      fmt.Println("handleConn: frame encode error:", err)
                      return
                  }   
                  monitor.SubmitInTotal.Add(1) // 每處理完一條消息,計(jì)數(shù)器+1
              }   
          }   

          在monitor包中,我們每秒計(jì)算一下處理性能:

          // github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo1-with-metrics/pkg/monitor/monitor.go
          func init() {
              // register statistics index
              SubmitInTotal = expvar.NewInt("submitInTotal")
              submitInRate = expvar.NewInt("submitInRate")

              go func() {
                  var lastSubmitInTotal int64

                  ticker := time.NewTicker(time.Second)
                  defer ticker.Stop()
                  for {
                      select {
                      case <-ticker.C:
                          newSubmitInTotal := SubmitInTotal.Value()
                          submitInRate.Set(newSubmitInTotal - lastSubmitInTotal) // 兩秒處理的消息量之差作為處理速度
                          lastSubmitInTotal = newSubmitInTotal
                      }
                  }
              }()
          }

          有了基于expvar的計(jì)數(shù)器,我們就可以通過(guò)帶有導(dǎo)出csv功能的expvarmon工具獲取程序每秒的處理性能了(壓測(cè)客戶(hù)端可以使用demo1-with-metrics的client)。下面的性能對(duì)比圖是在一個(gè)4核8g的云主機(jī)上獲得的(條件有限,壓測(cè)client與server放在一臺(tái)機(jī)器上了,必然相互干擾):

          我們看到,優(yōu)化后的程序從趨勢(shì)上看略微好于優(yōu)化前的(雖然不是很穩(wěn)定)。

          如果你覺(jué)得采集瞬時(shí)值太夠?qū)I(yè)^_^,也可以在被測(cè)程序上添加基于go-metrics的metric,這個(gè)作業(yè)就留給大家了:)

          7. 小結(jié)

          在本文中,我們簡(jiǎn)單說(shuō)明了Go經(jīng)典阻塞I/O的TCP網(wǎng)絡(luò)編程模型,這種模型最大的好處就是簡(jiǎn)單,降低開(kāi)發(fā)人員在處理網(wǎng)絡(luò)I/O時(shí)的心智負(fù)擔(dān),將更多關(guān)注集中在業(yè)務(wù)層面。文中基于這種模型,給出了一個(gè)自定義流協(xié)議的解析實(shí)現(xiàn)框架,并說(shuō)明了一些可優(yōu)化的點(diǎn)。在非超大連接數(shù)量的場(chǎng)景下,這類(lèi)模型會(huì)有不錯(cuò)性能和開(kāi)發(fā)效率。一旦連接數(shù)量猛增,相應(yīng)的處理這些連接的goroutine數(shù)量就會(huì)線(xiàn)性增加,Goroutine調(diào)度的開(kāi)銷(xiāo)就會(huì)顯著增加,這個(gè)時(shí)候我們就要考慮是否使用其他模型應(yīng)對(duì)了,這個(gè)我們?cè)诤罄m(xù)篇章再說(shuō)。

          本文涉及的所有代碼可以從這里下載[7]:https://github.com/bigwhite/experiments/tree/master/tcp-stream-proto

          參考資料

          [1] 

          TCP Socket: https://tonybai.com/2015/11/17/tcp-programming-in-golang/

          [2] 

          追求簡(jiǎn)單的設(shè)計(jì)哲學(xué): https://www.imooc.com/read/87/article/2321

          [3] 

          《TCP/IP詳解卷1:協(xié)議》: https://book.douban.com/subject/26825411/

          [4] 

          cmpp(中國(guó)移動(dòng)互聯(lián)網(wǎng)短信網(wǎng)關(guān)接口協(xié)議): https://github.com/bigwhite/gocmpp

          [5] 

          大端字節(jié)序(BigEndian): https://tonybai.com/2011/01/21/encounter-byte-order-problem-again/

          [6] 

          mcache: https://github.com/bytedance/gopkg/tree/master/lang/mcache

          [7] 

          這里下載: https://github.com/bigwhite/experiments/tree/master/tcp-stream-proto

          [8] 

          改善Go語(yǔ)?編程質(zhì)量的50個(gè)有效實(shí)踐: https://www.imooc.com/read/87

          [9] 

          Kubernetes實(shí)戰(zhàn):高可用集群搭建、配置、運(yùn)維與應(yīng)用: https://coding.imooc.com/class/284.html

          [10] 

          我愛(ài)發(fā)短信: https://51smspush.com/

          [11] 

          鏈接地址: https://m.do.co/c/bff6eed92687



          推薦閱讀


          福利

          我為大家整理了一份從入門(mén)到進(jìn)階的Go學(xué)習(xí)資料禮包,包含學(xué)習(xí)建議:入門(mén)看什么,進(jìn)階看什么。關(guān)注公眾號(hào) 「polarisxu」,回復(fù) ebook 獲取;還可以回復(fù)「進(jìn)群」,和數(shù)萬(wàn) Gopher 交流學(xué)習(xí)。

          瀏覽 29
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  五月婷婷综合久久 | 日本黄色操逼 | 狠狠色网站 | 亚洲精品一区二区三区蜜桃 | 亚洲91视频 |