Go基于I/O多路復(fù)用的TCP協(xié)議流解析實(shí)踐
在《Go經(jīng)典阻塞式TCP協(xié)議流解析的實(shí)踐》一文中,我們基于Go經(jīng)典的阻塞I/O模型實(shí)現(xiàn)了一個(gè)基于TCP流的自定義協(xié)議的解析。這種one-connection-per-goroutine模型的優(yōu)點(diǎn)就是簡單、好寫以及好理解,降低開發(fā)者心智負(fù)擔(dān)。但一旦連接數(shù)上來,goroutine的數(shù)量就會(huì)線性增加。當(dāng)面對海量連接的場景,這種模型將力不從心:系統(tǒng)中將存在大量goroutine,goroutine調(diào)度和切換的開銷過多。
那么面對海量連接場景,應(yīng)該如何解決呢?業(yè)界成熟方案:使用I/O多路復(fù)用模型。了解Go net包實(shí)現(xiàn)的朋友想必都知曉Go在運(yùn)行時(shí)底層使用的也是I/O多路復(fù)用,其實(shí)現(xiàn)為runtime中的netpoll[1]。goroutine層面獲得的net.Conn(無論是Accept的,還是Dial得到的)都展現(xiàn)出“阻塞”的特征,但這些net.Conn底層實(shí)現(xiàn)的fd(文件描述符)在netpoll中都是non-blocking(非阻塞)的,Go運(yùn)行時(shí)負(fù)責(zé)調(diào)用epoll等多路復(fù)用機(jī)制監(jiān)視這些fd是否可讀或可寫,并適時(shí)喚醒goroutine繼續(xù)網(wǎng)絡(luò)I/O操作,這種方式減少了系統(tǒng)調(diào)用,也減少了運(yùn)行Goroutine的M(操作系統(tǒng)線程)因系統(tǒng)調(diào)用陷入內(nèi)核態(tài)等待的頻率以及因阻塞失去M而不得不去創(chuàng)建新線程的數(shù)量。
那么在用戶層面建立自己的I/O多路復(fù)用的不足在哪里呢?復(fù)雜,不好寫,不好理解。但似乎也沒有其他更好的辦法。除非換語言,否則就得硬著頭皮上^_^。好在,Go社區(qū)已經(jīng)有幾個(gè)不錯(cuò)的Go用戶層面非阻塞I/O多路復(fù)用的開發(fā)框架庫可供選擇,比如:evio[2]、gnet[3]、easygo[4]等。我們選擇gnet。但注意:選擇不代表推薦,這里僅是來做這個(gè)實(shí)踐而已,是否使用gnet開發(fā)上生產(chǎn)的程序,需要你自己評估確定。
1. 基于gnet開發(fā)TCP流協(xié)議解析程序
用框架的一個(gè)門檻就是你要去學(xué)習(xí)框架本身。好在gnet提供了幾個(gè)很典型的examples[5],我們可以基于其中的custom_codec[6]來快速開發(fā)我們的TCP流協(xié)議解析程序。
下面是基于gnet框架實(shí)現(xiàn)custom codec的一個(gè)關(guān)鍵循環(huán),了解這個(gè)循環(huán),我們就知道在什么位置調(diào)用Frame編解碼以及packet編解碼了,這樣決定了后續(xù)demo程序的結(jié)構(gòu):

上面圖中右邊虛框中的frame編解碼、packet編解碼以及React是用戶需要自己實(shí)現(xiàn)的,gnet框架的eventloop.loopRead方法會(huì)循環(huán)調(diào)用frame編解碼和React以實(shí)現(xiàn)TCP流的處理以及響應(yīng)的返回。有了這樣一張“地圖”,我們就可以明確demo程序中各個(gè)包的大致位置了。
我們的demo改自gnet的例子custom_codec[7],其main包結(jié)構(gòu)來自于custom_codec:
// github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo4/cmd/server/main.go
type customCodecServer struct {
*gnet.EventServer
addr string
multicore bool
async bool
codec gnet.ICodec
workerPool *goroutine.Pool
}
func (cs *customCodecServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
log.Printf("custom codec server is listening on %s (multi-cores: %t, loops: %d)\n",
srv.Addr.String(), srv.Multicore, srv.NumEventLoop)
return
}
func customCodecServe(addr string, multicore, async bool, codec gnet.ICodec) {
var err error
codec = frame.Frame{}
cs := &customCodecServer{addr: addr, multicore: multicore, async: async, codec: codec, workerPool: goroutine.Default()}
err = gnet.Serve(cs, addr, gnet.WithMulticore(multicore), gnet.WithTCPKeepAlive(time.Minute*5), gnet.WithCodec(codec))
if err != nil {
panic(err)
}
}
func main() {
var port int
var multicore bool
// Example command: go run server.go --port 8888 --multicore=true
flag.IntVar(&port, "port", 8888, "server port")
flag.BoolVar(&multicore, "multicore", true, "multicore")
flag.Parse()
addr := fmt.Sprintf("tcp://:%d", port)
customCodecServe(addr, multicore, false, nil)
}
針對上面代碼,有兩點(diǎn)要注意:
customCodecServe的第三個(gè)參數(shù)我們傳入了false,即我們選擇同步回復(fù)應(yīng)答,而不是異步回復(fù)。 我們將自定義的frame編解碼器(實(shí)現(xiàn)了gnet.ICodec接口)實(shí)例傳給了customCodecServer實(shí)例,這樣后續(xù)gnet loopRead調(diào)用的就是我們自定義的frame編解碼器了。
按上面流程圖的順序,gnet從conn讀取的字節(jié)流將傳遞給我們的frame解碼器,下面我們看看基于gnet的Frame解碼器的實(shí)現(xiàn)(我們的自定義協(xié)議定義可以參考《Go經(jīng)典阻塞式TCP協(xié)議流解析的實(shí)踐》一文):
// github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo4/pkg/frame/frame.go
type Frame []byte
func (cc Frame) Decode(c gnet.Conn) ([]byte, error) {
// read length
var frameLength uint32
if n, header := c.ReadN(4); n == 4 {
byteBuffer := bytes.NewBuffer(header)
_ = binary.Read(byteBuffer, binary.BigEndian, &frameLength)
if frameLength > 100 {
c.ResetBuffer()
return nil, errors.New("length value is wrong")
}
if n, wholeFrame := c.ReadN(int(frameLength)); n == int(frameLength) {
c.ShiftN(int(frameLength)) // shift frame length
return wholeFrame[4:], nil // return frame payload
} else {
return nil, errors.New("not enough frame payload data")
}
}
return nil, errors.New("not enough frame length data")
}
上面Frame的Decode實(shí)現(xiàn)既負(fù)責(zé)frame解碼,同時(shí)也會(huì)對frame的當(dāng)前數(shù)據(jù)完整性進(jìn)行校驗(yàn),如果一個(gè)完整的frame尚未就緒,Decode會(huì)返回錯(cuò)誤,之后gnet還會(huì)在連接(conn)可讀時(shí)再次調(diào)用該Decode函數(shù)。這里實(shí)現(xiàn)的關(guān)鍵就是gnet.Conn.ReadN這個(gè)方法,這個(gè)方法本質(zhì)上是一個(gè)Peek操作(gnet稱之為lazyRead),即只預(yù)覽數(shù)據(jù), 不挪動(dòng)數(shù)據(jù)流中的“讀指針”的位置。frame未完全就緒時(shí),gnet在底層會(huì)使用RingBuffer存放已經(jīng)到位的frame的部分?jǐn)?shù)據(jù)。如果frame所有數(shù)據(jù)都就緒了,那么Decode會(huì)調(diào)用gnet.Conn.ShiftN方法來挪動(dòng)底層RingBuffer的“讀指針”的位置,表明這段數(shù)據(jù)已經(jīng)被上層讀取了。
如果預(yù)讀取到的frame長度過長(這里代碼中的100是一個(gè)魔數(shù),僅做demo演示之用,你可以根據(jù)實(shí)際情況使用frame可能的最大值),則會(huì)清空當(dāng)前緩存并返回錯(cuò)誤。(但gnet并沒有因此而斷開與客戶端的連接,這塊兒gnet的機(jī)制是否合理還有待商榷。)
如果解碼順利,根據(jù)我們自定義的協(xié)議spec,我們會(huì)將frame的payload返回,即從frame的第五個(gè)字節(jié)開始返回。
從上圖看到,frame Decode返回的payload將作為輸入數(shù)據(jù)傳給eventHandler.React方法,這個(gè)方法也是我們自己實(shí)現(xiàn)的:
// github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo4/cmd/server/main.go
func (cs *customCodecServer) React(framePayload []byte, c gnet.Conn) (out []byte, action gnet.Action) {
var p packet.Packet
var ackFramePayload []byte
p, err := packet.Decode(framePayload)
if err != nil {
fmt.Println("react: packet decode error:", err)
action = gnet.Close // close the connection
return
}
switch p.(type) {
case *packet.Submit:
submit := p.(*packet.Submit)
fmt.Printf("recv submit: id = %s, payload=%s\n", submit.ID, string(submit.Payload))
submitAck := &packet.SubmitAck{
ID: submit.ID,
Result: 0,
}
ackFramePayload, err = packet.Encode(submitAck)
if err != nil {
fmt.Println("handleConn: packet encode error:", err)
action = gnet.Close // close the connection
return
}
out = []byte(ackFramePayload)
return
default:
return nil, gnet.Close // close the connection
}
}
在React中,我們利用packet包對傳入的frame payload進(jìn)行Decode并處理得到的Packet,處理后將packet響應(yīng)進(jìn)行編碼(encode),編碼后得到的字節(jié)序列(ackFramePayload)將作為React的第一個(gè)返回值out返回。
frame會(huì)對React返回的ackFramePayload進(jìn)行Encode,編碼后的字節(jié)序列將被gnet寫入outbound的tcp流中去:
// github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo4/pkg/frame/frame.go
func (cc Frame) Encode(c gnet.Conn, framePayload []byte) ([]byte, error) {
result := make([]byte, 0)
buffer := bytes.NewBuffer(result)
// encode frame length(4+ framePayload length)
length := uint32(4 + len([]byte(framePayload)))
if err := binary.Write(buffer, binary.BigEndian, length); err != nil {
s := fmt.Sprintf("Pack length error , %v", err)
return nil, errors.New(s)
}
// encode frame payload
n, err := buffer.Write(framePayload)
if err != nil {
s := fmt.Sprintf("Pack frame payload error , %v", err)
return nil, errors.New(s)
}
if n != len(framePayload) {
s := fmt.Sprintf("Pack frame payload length error , %v", err)
return nil, errors.New(s)
}
return buffer.Bytes(), nil
}
這樣一個(gè)loopRead循環(huán)就完成了。我們可以使用《Go經(jīng)典阻塞式TCP協(xié)議流解析的實(shí)踐》一文中的client對該程序進(jìn)行測試:
// demo2的client
$./client
2021/07/25 16:35:34 dial ok
send submit id = 00000001, payload=full-bluestreak-207e
the result of submit ack[00000001] is 0
send submit id = 00000002, payload=cosmic-spider-ham-2985
the result of submit ack[00000002] is 0
send submit id = 00000003, payload=true-forge-3552
the result of submit ack[00000003] is 0
// demo4的server
$./server
2021/07/25 16:35:31 custom codec server is listening on :8888 (multi-cores: true, loops: 8)
recv submit: id = 00000001, payload=full-bluestreak-207e
recv submit: id = 00000002, payload=cosmic-spider-ham-2985
recv submit: id = 00000003, payload=true-forge-3552
2. 壓測對比
gnet針對內(nèi)存分配、緩存重用等做了很多優(yōu)化,我們來將其與阻塞I/O模型程序在性能上做一下簡單比較(由于資源有限,我們這里的壓測也和上一文中一樣,采用100個(gè)client連接盡力(best effort)發(fā)送,而不是海量連接)。
下面是demo1(阻塞I/O模型未優(yōu)化)、demo3(阻塞I/O模型優(yōu)化后)以及demo4(io多路復(fù)用模型)的性能對比:

粗略來看,采用gnet I/O多路復(fù)用模型的程序(demo4)在性能上平均比阻塞I/O模型優(yōu)化后的程序(demo3)高出15%~20%。
不僅如此,通過dstat采集的系統(tǒng)監(jiān)控?cái)?shù)據(jù)也表明跑demo4時(shí),cpu系統(tǒng)時(shí)間(sys)占用也比demo3少了5個(gè)點(diǎn)左右:
跑demo3時(shí)的dstat -tcdngym輸出:
----system---- ----total-cpu-usage---- -dsk/total- -net/total- ---paging-- ---system-- ------memory-usage-----
time |usr sys idl wai hiq siq| read writ| recv send| in out | int csw | used buff cach free
23-07 17:03:17| 2 1 97 0 0 0|3458B 19k| 0 0 | 0 0 | 535 2475 |1921M 225M 5354M 8386M
23-07 17:03:18| 40 45 5 0 0 11| 0 0 | 66B 54B| 0 0 | 11k 15k|1922M 225M 5354M 8384M
23-07 17:03:19| 39 46 6 0 0 9| 0 0 | 66B 1158B| 0 0 | 12k 18k|1922M 225M 5354M 8384M
23-07 17:03:20| 35 48 7 0 0 11| 0 0 | 66B 462B| 0 0 | 12k 22k|1922M 225M 5354M 8385M
23-07 17:03:21| 39 44 7 0 0 10| 0 12k| 66B 462B| 0 0 | 11k 16k|1922M 225M 5354M 8385M
23-07 17:03:22| 38 45 6 0 0 10| 0 0 | 66B 102B| 0 0 | 11k 16k|1923M 225M 5354M 8384M
23-07 17:03:23| 38 45 7 0 0 10| 0 0 | 66B 470B| 0 0 | 12k 20k|1923M 225M 5354M 8384M
23-07 17:03:24| 39 46 6 0 0 9| 0 0 | 66B 462B| 0 0 | 11k 19k|1923M 225M 5354M 8384M
跑demo4時(shí)的dstat -tcdngym輸出:
----system---- ----total-cpu-usage---- -dsk/total- -net/total- ---paging-- ---system-- ------memory-usage-----
time |usr sys idl wai hiq siq| read writ| recv send| in out | int csw | used buff cach free
24-07 20:28:38| 43 42 7 0 0 8| 0 20k|1050B 14k| 0 0 | 11k 18k|1954M 234M 5959M 7738M
24-07 20:28:39| 44 41 9 0 0 7| 0 16k| 396B 7626B| 0 0 | 11k 17k|1954M 234M 5959M 7739M
24-07 20:28:40| 43 42 6 0 0 8| 0 0 | 132B 7044B| 0 0 | 11k 16k|1954M 234M 5959M 7738M
24-07 20:28:41| 42 42 8 0 0 8| 0 0 | 630B 12k| 0 0 | 12k 20k|1955M 234M 5959M 7738M
24-07 20:28:42| 45 41 7 0 0 7| 0 0 | 726B 9980B| 0 0 | 11k 16k|1955M 234M 5959M 7738M
2. 異步回應(yīng)答
在上面的例子中,我們采用的是gnet同步回應(yīng)答的方式,gnet還支持異步回應(yīng)答的方式,即將React中得到的ackFramePayload提交給gnet創(chuàng)建的一個(gè)goroutine Worker池,由worker池中的某個(gè)空閑goroutine在后續(xù)將ackFramePayload編碼為一個(gè)完整的ackFrame后返回給client端。
要支持異步回應(yīng)答,我們需要對demo4做幾處修改(見demo5),主要修改點(diǎn)都在cmd/server/main.go中。
第一處:main函數(shù)調(diào)用customCodecServe時(shí),將第三個(gè)參數(shù)async設(shè)置為true:
// github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo5/cmd/server/main.go
func main() {
... ...
customCodecServe(addr, multicore, true, nil)
}
第二處:在customCodecServer的React方法中,我們得到編碼后的ackFramePayload后,不要立即將其賦值給out并返回,而是判斷是否要異步返回應(yīng)答。如果異步返回應(yīng)答,則將ackFramePayload提交給workerpool,workerPool后續(xù)會(huì)分配goroutine,并通過gnet.Conn的AsyncWrite將應(yīng)答寫回client。如果非異步,在將ackFramePayload賦值給out并返回。
// github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo5/cmd/server/main.go
func (cs *customCodecServer) React(framePayload []byte, c gnet.Conn) (out []byte, action gnet.Action) {
... ...
switch p.(type) {
case *packet.Submit:
submit := p.(*packet.Submit)
fmt.Printf("recv submit: id = %s, payload=%s\n", submit.ID, string(submit.Payload))
submitAck := &packet.SubmitAck{
ID: submit.ID,
Result: 0,
}
ackFramePayload, err = packet.Encode(submitAck)
if err != nil {
fmt.Println("handleConn: packet encode error:", err)
action = gnet.Close // close the connection
return
}
default:
return nil, gnet.Close // close the connection
}
if cs.async {
data := append([]byte{}, ackFramePayload...)
_ = cs.workerPool.Submit(func() {
fmt.Println("handleConn: async write ackFramePayload")
c.AsyncWrite(data)
})
return
}
out = ackFramePayload
return
}
除此之外,其他包的代碼不變。我們依然還做個(gè)壓測,看看異步回應(yīng)答的demo5性能究竟如何!

從上圖來看,在這個(gè)場景下通過異步回應(yīng)答的方式,性能反而下降很多,甚至還不如阻塞式I/O模型的程序。對此沒有做深究,但猜測可能是應(yīng)答過多且同時(shí)集中回復(fù)時(shí)workerpool創(chuàng)建了很多goroutine,不僅沒有起到池化的作用,還帶來的goroutine創(chuàng)建和調(diào)度的開銷。
3. 小結(jié)
在本文中,我們將阻塞式I/O模型換成了I/O多路復(fù)用模型,并基于gnet框架重新實(shí)現(xiàn)了自定義TCP流協(xié)議的解析程序。在同步回應(yīng)答的策略下,基于gnet開發(fā)TCP流協(xié)議解析程序相比于阻塞I/O模型程序的性能有一定提升。
本文涉及的所有代碼可以從這里下載[8]:https://github.com/bigwhite/experiments/tree/master/tcp-stream-proto
參考資料
netpoll: https://github.com/golang/go/tree/master/src/runtime/netpoll.go
[2]evio: https://github.com/tidwall/evio
[3]gnet: https://github.com/panjf2000/gnet
[4]easygo: https://github.com/mailru/easygo
[5]gnet提供了幾個(gè)很典型的examples: https://github.com/gnet-io/gnet-examples
[6]custom_codec: https://github.com/gnet-io/gnet-examples/tree/master/examples/custom_codec
[7]custom_codec: https://github.com/gnet-io/gnet-examples/tree/master/examples/custom_codec
[8]這里下載: https://github.com/bigwhite/experiments/tree/master/tcp-stream-proto
[9]改善Go語?編程質(zhì)量的50個(gè)有效實(shí)踐: https://www.imooc.com/read/87
[10]Kubernetes實(shí)戰(zhàn):高可用集群搭建、配置、運(yùn)維與應(yīng)用: https://coding.imooc.com/class/284.html
[11]我愛發(fā)短信: https://51smspush.com/
[12]鏈接地址: https://m.do.co/c/bff6eed92687
推薦閱讀
