Go 項(xiàng)目實(shí)戰(zhàn):實(shí)現(xiàn)一個(gè) Redis (1) 之編寫(xiě) TCP 服務(wù)器
Golang 作為廣泛用于服務(wù)端和云計(jì)算領(lǐng)域的編程語(yǔ)言,tcp socket 是其中至關(guān)重要的功能。無(wú)論是 WEB 服務(wù)器還是各類中間件都離不開(kāi) tcp socket 的支持。
Echo 服務(wù)器 拆包與粘包 優(yōu)雅關(guān)閉
您可以在 Github:HDT3213/Godis[1] 項(xiàng)目中看到本文所述 TCP 服務(wù)器的完整代碼及其應(yīng)用。
IO 模型是網(wǎng)絡(luò)編程中最基礎(chǔ)的話題。早期的 Tomcat/Apache 服務(wù)器使用 BlockingIO 模型,使用一個(gè)線程 listen 端口當(dāng)建立新連接后將其交給另一個(gè)線程處理。因?yàn)槊總€(gè)連接都需要一個(gè)線程,因此內(nèi)存占用及上下文切換帶來(lái)的開(kāi)銷極大。
此后的 Nginx/Netty/Tornado 使用 select/poll/epoll 等多路 IO 復(fù)用模型它使用一個(gè)線程監(jiān)聽(tīng)多個(gè)連接,當(dāng)任一連接收到數(shù)據(jù)后監(jiān)聽(tīng)線程都會(huì)收到一個(gè)事件并處理它,從而實(shí)現(xiàn)在一個(gè)線程中交替處理多個(gè)連接的 IO 操作。因?yàn)槎鄠€(gè)連接“復(fù)用”了一個(gè)線程,所以需要的線程數(shù)少很多開(kāi)銷也小的多。有得必有失,由于需要在一個(gè)線程中處理多個(gè)連接,開(kāi)發(fā)多路 IO 復(fù)用程序要難得多。
因?yàn)?Golang 擁有內(nèi)存占用和調(diào)度開(kāi)銷都很小的 goroutine, 因此我們可以讓 BlockingIO 煥發(fā)第二春。得益于輕量級(jí)的 goroutine 我們可以為每個(gè)連接分配一個(gè)協(xié)程,在降低開(kāi)發(fā)難度同時(shí)獲得不錯(cuò)的性能。
Echo 服務(wù)器
作為開(kāi)始,我們來(lái)實(shí)現(xiàn)一個(gè)簡(jiǎn)單的 Echo 服務(wù)器。它會(huì)接受客戶端連接并將客戶端發(fā)送的內(nèi)容原樣傳回客戶端。
package?main
import?(
????"fmt"
????"net"
????"io"
????"log"
????"bufio"
)
func?ListenAndServe(address?string)?{
????//?綁定監(jiān)聽(tīng)地址
????listener,?err?:=?net.Listen("tcp",?address)
????if?err?!=?nil?{
????????log.Fatal(fmt.Sprintf("listen?err:?%v",?err))
????}
????defer?listener.Close()
????log.Println(fmt.Sprintf("bind:?%s,?start?listening...",?address))
????for?{
????????//?Accept?會(huì)一直阻塞直到有新的連接建立或者listen中斷才會(huì)返回
????????conn,?err?:=?listener.Accept()
????????if?err?!=?nil?{
????????????//?通常是由于listener被關(guān)閉無(wú)法繼續(xù)監(jiān)聽(tīng)導(dǎo)致的錯(cuò)誤
????????????log.Fatal(fmt.Sprintf("accept?err:?%v",?err))
????????}
????????//?開(kāi)啟新的?goroutine?處理該連接
????????go?Handle(conn)
????}
}
func?Handle(conn?net.Conn)?{
????//?使用?bufio?標(biāo)準(zhǔn)庫(kù)提供的緩沖區(qū)功能
????reader?:=?bufio.NewReader(conn)
????for?{
????????//?ReadString?會(huì)一直阻塞直到遇到分隔符?'\n'
????????//?遇到分隔符后會(huì)返回上次遇到分隔符或連接建立后收到的所有數(shù)據(jù),?包括分隔符本身
????????//?若在遇到分隔符之前遇到異常,?ReadString?會(huì)返回已收到的數(shù)據(jù)和錯(cuò)誤信息
????????msg,?err?:=?reader.ReadString('\n')
????????if?err?!=?nil?{
????????????//?通常遇到的錯(cuò)誤是連接中斷或被關(guān)閉,用io.EOF表示
????????????if?err?==?io.EOF?{
????????????????log.Println("connection?close")
????????????}?else?{
????????????????log.Println(err)
????????????}
????????????return
????????}
????????b?:=?[]byte(msg)
????????//?將收到的信息發(fā)送給客戶端
????????conn.Write(b)
????}
}
func?main()?{
????ListenAndServe(":8000")
}
使用 telnet 工具測(cè)試我們編寫(xiě)的 Echo 服務(wù)器:
$?telnet?127.0.0.1?8000
Trying?127.0.0.1...
Connected?to?127.0.0.1.
Escape?character?is?'^]'.
>?a
a
>?b
b
Connection?closed?by?foreign?host.
拆包與粘包問(wèn)題
某些朋友可能看到"拆包與粘包"后表示極度震驚,并再三強(qiáng)調(diào): TCP 是個(gè)字節(jié)流協(xié)議不存在粘包問(wèn)題。
TCP 協(xié)議確實(shí)是面向字節(jié)流的協(xié)議,當(dāng)我們開(kāi)發(fā) TCP 服務(wù)器(其實(shí)是基于 TCP 協(xié)議的應(yīng)用層服務(wù)器)時(shí)有義務(wù)正確地從字節(jié)流中解析出應(yīng)用層消息。
大多數(shù)語(yǔ)言提供的 TCP 接口允許我們通過(guò) read() 函數(shù)讀取新收到的一段數(shù)據(jù),當(dāng)然這段數(shù)據(jù)并不一定對(duì)應(yīng)一個(gè) TCP 包。
舉例來(lái)說(shuō),在 Echo 服務(wù)器的示例中,我們定義用\n表示消息結(jié)束。我們可能遇到下列幾種情況:
收到兩段數(shù)據(jù): "abc", "def\n", 應(yīng)發(fā)出一條響應(yīng) "abcdef\n", 這是拆包的情況 收到一段數(shù)據(jù): "abc\ndef\n", 應(yīng)發(fā)出兩條響應(yīng) "abc\n", "def\n", 這是粘包的情況
上層協(xié)議通常采用下列幾種思路之一來(lái)定義消息,以保證完整地進(jìn)行讀取:
定長(zhǎng)消息 在消息尾部添加特殊分隔符,如示例中的 Echo 協(xié)議和 FTP 控制協(xié)議。bufio 標(biāo)準(zhǔn)庫(kù)會(huì)緩存收到的數(shù)據(jù)直到遇到分隔符才會(huì)返回,它可以幫助我們正確地分割字節(jié)流。 將消息分為 header 和 body, 并在 header 中提供 body 總長(zhǎng)度,這種分包方式被稱為 LTV(length,type,value) 包。這是應(yīng)用最廣泛的策略,如 HTTP 協(xié)議。當(dāng)從 header 中獲得 body 長(zhǎng)度后, io.ReadFull 函數(shù)會(huì)讀取指定長(zhǎng)度字節(jié)流,從而解析應(yīng)用層消息。
在沒(méi)有具體應(yīng)用層協(xié)議的情況下,我們很難詳細(xì)地討論拆包與粘包問(wèn)題。在本系列的第二篇文章: 實(shí)現(xiàn) Redis 協(xié)議解析器[2] 中我們可以看到 Redis 序列化協(xié)議(RESP)對(duì)分隔符和 LTV 包的結(jié)合應(yīng)用,以及兩種分包方式的具體解析代碼。
優(yōu)雅關(guān)閉
在生產(chǎn)環(huán)境下需要保證 TCP 服務(wù)器關(guān)閉前完成必要的清理工作,包括將完成正在進(jìn)行的數(shù)據(jù)傳輸,關(guān)閉 TCP 連接等。這種關(guān)閉模式稱為優(yōu)雅關(guān)閉,可以避免資源泄露以及客戶端未收到完整數(shù)據(jù)造成異常。
TCP 服務(wù)器的優(yōu)雅關(guān)閉模式通常為: 先關(guān)閉 listener 阻止新連接進(jìn)入,然后遍歷所有連接逐個(gè)進(jìn)行關(guān)閉。
首先修改一下 TCP 服務(wù)器:
//?handler?是應(yīng)用層服務(wù)器的抽象
type?Handler?interface?{
????Handle(ctx?context.Context,?conn?net.Conn)
????Close()error
}
func?ListenAndServe(cfg?*Config,?handler?tcp.Handler)?{
????listener,?err?:=?net.Listen("tcp",?cfg.Address)
????if?err?!=?nil?{
????????logger.Fatal(fmt.Sprintf("listen?err:?%v",?err))
????}
????//?監(jiān)聽(tīng)中斷信號(hào)
????//?atomic.AtomicBool?是作者寫(xiě)的封裝:?https://github.com/HDT3213/godis/blob/master/src/lib/sync/atomic/bool.go
????var?closing?atomic.AtomicBool
????sigCh?:=?make(chan?os.Signal,?1)
????signal.Notify(sigCh,?syscall.SIGHUP,?syscall.SIGQUIT,?syscall.SIGTERM,?syscall.SIGINT)
????go?func()?{
????????sig?:=?<-sigCh
????????switch?sig?{
????????case?syscall.SIGHUP,?syscall.SIGQUIT,?syscall.SIGTERM,?syscall.SIGINT:
????????????//?收到中斷信號(hào)后開(kāi)始關(guān)閉流程
????????????logger.Info("shuting?down...")
????????????//?設(shè)置標(biāo)志位為關(guān)閉中,?使用原子操作保證線程可見(jiàn)性
????????????closing.Set(true)
????????????//?先關(guān)閉?listener?阻止新連接進(jìn)入
????????????//?listener?關(guān)閉后?listener.Accept()?會(huì)立即返回錯(cuò)誤
????????????_?=?listener.Close()
????????????//?逐個(gè)關(guān)閉已建立鏈接
????????????_?=?handler.Close()
????????}
????}()
????logger.Info(fmt.Sprintf("bind:?%s,?start?listening...",?cfg.Address))
????defer?func()?{
????????//?在出現(xiàn)未知錯(cuò)誤或panic后保證正常關(guān)閉
????????//?這里存在一個(gè)問(wèn)題是:?當(dāng)應(yīng)用正常關(guān)閉后會(huì)再次執(zhí)行關(guān)閉操作
????????_?=?listener.Close()
????????_?=?handler.Close()
????}()
????ctx,?_?:=?context.WithCancel(context.Background())
????//?waitGroup?的計(jì)數(shù)是當(dāng)前仍存在的連接數(shù)
????//?進(jìn)入關(guān)閉流程時(shí),主協(xié)程應(yīng)該等待所有連接都關(guān)閉后再退出
????var?waitDone?sync.WaitGroup
????for?{
????????conn,?err?:=?listener.Accept()
????????if?err?!=?nil?{
????????????if?closing.Get()?{
????????????????//?收到關(guān)閉信號(hào)后進(jìn)入此流程,此時(shí)listener已被監(jiān)聽(tīng)系統(tǒng)信號(hào)的?goroutine?關(guān)閉
????????????????logger.Info("waiting?disconnect...")
????????????????//?主協(xié)程應(yīng)等待應(yīng)用層服務(wù)器完成工作并關(guān)閉鏈接
????????????????waitDone.Wait()
????????????????return
????????????}
????????????logger.Error(fmt.Sprintf("accept?err:?%v",?err))
????????????continue
????????}
????????//?創(chuàng)建一個(gè)新協(xié)程處理鏈接
????????logger.Info("accept?link")
????????go?func()?{
????????????defer?func()?{
????????????????waitDone.Done()
????????????}()
????????????waitDone.Add(1)
????????????handler.Handle(ctx,?conn)
????????}()
????}
}
接下來(lái)修改應(yīng)用層服務(wù)器:
//?客戶端連接的抽象
type?Client?struct?{
????//?tcp?連接
????Conn?net.Conn
????//?當(dāng)服務(wù)端開(kāi)始發(fā)送數(shù)據(jù)時(shí)進(jìn)入waiting,?阻止其它goroutine關(guān)閉連接
????//?wait.Wait是作者編寫(xiě)的帶有最大等待時(shí)間的封裝:
????//?https://github.com/HDT3213/godis/blob/master/src/lib/sync/wait/wait.go
????Waiting?wait.Wait
}
type?EchoHandler?struct?{
????//?保存所有工作狀態(tài)client的集合(把map當(dāng)set用)
????//?需使用并發(fā)安全的容器
????activeConn?sync.Map
????//?和?tcp?server?中作用相同的關(guān)閉狀態(tài)標(biāo)識(shí)位
????closing?atomic.AtomicBool
}
func?MakeEchoHandler()(*EchoHandler)?{
????return?&EchoHandler{
????}
}
//?關(guān)閉客戶端連接
func?(c?*Client)Close()error?{
????//?等待數(shù)據(jù)發(fā)送完成或超時(shí)
????c.Waiting.WaitWithTimeout(10?*?time.Second)
????c.Conn.Close()
????return?nil
}
func?(h?*EchoHandler)Handle(ctx?context.Context,?conn?net.Conn)?{
????if?h.closing.Get()?{
????????//?closing?handler?refuse?new?connection
????????conn.Close()
????}
????client?:=?&Client?{
????????Conn:?conn,
????}
????h.activeConn.Store(client,?1)
????reader?:=?bufio.NewReader(conn)
????for?{
????????msg,?err?:=?reader.ReadString('\n')
????????if?err?!=?nil?{
????????????if?err?==?io.EOF?{
????????????????logger.Info("connection?close")
????????????????h.activeConn.Delete(conn)
????????????}?else?{
????????????????logger.Warn(err)
????????????}
????????????return
????????}
????????//?發(fā)送數(shù)據(jù)前先置為waiting狀態(tài)
????????client.Waiting.Add(1)
????????//?模擬關(guān)閉時(shí)未完成發(fā)送的情況
????????//logger.Info("sleeping")
????????//time.Sleep(10?*?time.Second)
????????b?:=?[]byte(msg)
????????conn.Write(b)
????????//?發(fā)送完畢,?結(jié)束waiting
????????client.Waiting.Done()
????}
}
func?(h?*EchoHandler)Close()error?{
????logger.Info("handler?shuting?down...")
????h.closing.Set(true)
????//?TODO:?concurrent?wait
????h.activeConn.Range(func(key?interface{},?val?interface{})bool?{
????????client?:=?key.(*Client)
????????client.Close()
????????return?true
????})
????return?nil
}
作者:finley
出處:https://www.cnblogs.com/Finley/p/11070669.html
版權(quán):本作品采用「署名-非商業(yè)性使用-相同方式共享 4.0 國(guó)際[3]」許可協(xié)議進(jìn)行許可。
參考資料
Github:HDT3213/Godis: https://github.com/HDT3213/godis/tree/master/src/tcp
[2]實(shí)現(xiàn) Redis 協(xié)議解析器: https://www.cnblogs.com/Finley/p/11923168.html
[3]署名-非商業(yè)性使用-相同方式共享 4.0 國(guó)際: https://creativecommons.org/licenses/by-nc-sa/4.0/
推薦閱讀
