<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Go 項(xiàng)目實(shí)戰(zhàn):實(shí)現(xiàn)一個(gè) Redis (1) 之編寫(xiě) TCP 服務(wù)器

          共 3716字,需瀏覽 8分鐘

           ·

          2020-12-01 16:43

          點(diǎn)擊上方藍(lán)色“Go語(yǔ)言中文網(wǎng)”關(guān)注,每天一起學(xué) Go

          Golang 作為廣泛用于服務(wù)端和云計(jì)算領(lǐng)域的編程語(yǔ)言,tcp socket 是其中至關(guān)重要的功能。無(wú)論是 WEB 服務(wù)器還是各類中間件都離不開(kāi) tcp socket 的支持。

          • Echo 服務(wù)器
          • 拆包與粘包
          • 優(yōu)雅關(guān)閉

          您可以在 Github:HDT3213/Godis[1] 項(xiàng)目中看到本文所述 TCP 服務(wù)器的完整代碼及其應(yīng)用。

          IO 模型是網(wǎng)絡(luò)編程中最基礎(chǔ)的話題。早期的 Tomcat/Apache 服務(wù)器使用 BlockingIO 模型,使用一個(gè)線程 listen 端口當(dāng)建立新連接后將其交給另一個(gè)線程處理。因?yàn)槊總€(gè)連接都需要一個(gè)線程,因此內(nèi)存占用及上下文切換帶來(lái)的開(kāi)銷極大。

          此后的 Nginx/Netty/Tornado 使用 select/poll/epoll 等多路 IO 復(fù)用模型它使用一個(gè)線程監(jiān)聽(tīng)多個(gè)連接,當(dāng)任一連接收到數(shù)據(jù)后監(jiān)聽(tīng)線程都會(huì)收到一個(gè)事件并處理它,從而實(shí)現(xiàn)在一個(gè)線程中交替處理多個(gè)連接的 IO 操作。因?yàn)槎鄠€(gè)連接“復(fù)用”了一個(gè)線程,所以需要的線程數(shù)少很多開(kāi)銷也小的多。有得必有失,由于需要在一個(gè)線程中處理多個(gè)連接,開(kāi)發(fā)多路 IO 復(fù)用程序要難得多。

          因?yàn)?Golang 擁有內(nèi)存占用和調(diào)度開(kāi)銷都很小的 goroutine, 因此我們可以讓 BlockingIO 煥發(fā)第二春。得益于輕量級(jí)的 goroutine 我們可以為每個(gè)連接分配一個(gè)協(xié)程,在降低開(kāi)發(fā)難度同時(shí)獲得不錯(cuò)的性能。

          Echo 服務(wù)器

          作為開(kāi)始,我們來(lái)實(shí)現(xiàn)一個(gè)簡(jiǎn)單的 Echo 服務(wù)器。它會(huì)接受客戶端連接并將客戶端發(fā)送的內(nèi)容原樣傳回客戶端。

          package?main

          import?(
          ????"fmt"
          ????"net"
          ????"io"
          ????"log"
          ????"bufio"
          )

          func?ListenAndServe(address?string)?{
          ????//?綁定監(jiān)聽(tīng)地址
          ????listener,?err?:=?net.Listen("tcp",?address)
          ????if?err?!=?nil?{
          ????????log.Fatal(fmt.Sprintf("listen?err:?%v",?err))
          ????}
          ????defer?listener.Close()
          ????log.Println(fmt.Sprintf("bind:?%s,?start?listening...",?address))

          ????for?{
          ????????//?Accept?會(huì)一直阻塞直到有新的連接建立或者listen中斷才會(huì)返回
          ????????conn,?err?:=?listener.Accept()
          ????????if?err?!=?nil?{
          ????????????//?通常是由于listener被關(guān)閉無(wú)法繼續(xù)監(jiān)聽(tīng)導(dǎo)致的錯(cuò)誤
          ????????????log.Fatal(fmt.Sprintf("accept?err:?%v",?err))
          ????????}
          ????????//?開(kāi)啟新的?goroutine?處理該連接
          ????????go?Handle(conn)
          ????}
          }

          func?Handle(conn?net.Conn)?{
          ????//?使用?bufio?標(biāo)準(zhǔn)庫(kù)提供的緩沖區(qū)功能
          ????reader?:=?bufio.NewReader(conn)
          ????for?{
          ????????//?ReadString?會(huì)一直阻塞直到遇到分隔符?'\n'
          ????????//?遇到分隔符后會(huì)返回上次遇到分隔符或連接建立后收到的所有數(shù)據(jù),?包括分隔符本身
          ????????//?若在遇到分隔符之前遇到異常,?ReadString?會(huì)返回已收到的數(shù)據(jù)和錯(cuò)誤信息
          ????????msg,?err?:=?reader.ReadString('\n')
          ????????if?err?!=?nil?{
          ????????????//?通常遇到的錯(cuò)誤是連接中斷或被關(guān)閉,用io.EOF表示
          ????????????if?err?==?io.EOF?{
          ????????????????log.Println("connection?close")
          ????????????}?else?{
          ????????????????log.Println(err)
          ????????????}
          ????????????return
          ????????}
          ????????b?:=?[]byte(msg)
          ????????//?將收到的信息發(fā)送給客戶端
          ????????conn.Write(b)
          ????}
          }

          func?main()?{
          ????ListenAndServe(":8000")
          }

          使用 telnet 工具測(cè)試我們編寫(xiě)的 Echo 服務(wù)器:

          $?telnet?127.0.0.1?8000
          Trying?127.0.0.1...
          Connected?to?127.0.0.1.
          Escape?character?is?'^]'.
          >?a
          a
          >?b
          b
          Connection?closed?by?foreign?host.

          拆包與粘包問(wèn)題

          某些朋友可能看到"拆包與粘包"后表示極度震驚,并再三強(qiáng)調(diào): TCP 是個(gè)字節(jié)流協(xié)議不存在粘包問(wèn)題。

          TCP 協(xié)議確實(shí)是面向字節(jié)流的協(xié)議,當(dāng)我們開(kāi)發(fā) TCP 服務(wù)器(其實(shí)是基于 TCP 協(xié)議的應(yīng)用層服務(wù)器)時(shí)有義務(wù)正確地從字節(jié)流中解析出應(yīng)用層消息。

          大多數(shù)語(yǔ)言提供的 TCP 接口允許我們通過(guò) read() 函數(shù)讀取新收到的一段數(shù)據(jù),當(dāng)然這段數(shù)據(jù)并不一定對(duì)應(yīng)一個(gè) TCP 包。

          舉例來(lái)說(shuō),在 Echo 服務(wù)器的示例中,我們定義用\n表示消息結(jié)束。我們可能遇到下列幾種情況:

          1. 收到兩段數(shù)據(jù): "abc", "def\n", 應(yīng)發(fā)出一條響應(yīng) "abcdef\n", 這是拆包的情況
          2. 收到一段數(shù)據(jù): "abc\ndef\n", 應(yīng)發(fā)出兩條響應(yīng) "abc\n", "def\n", 這是粘包的情況

          上層協(xié)議通常采用下列幾種思路之一來(lái)定義消息,以保證完整地進(jìn)行讀取:

          • 定長(zhǎng)消息
          • 在消息尾部添加特殊分隔符,如示例中的 Echo 協(xié)議和 FTP 控制協(xié)議。bufio 標(biāo)準(zhǔn)庫(kù)會(huì)緩存收到的數(shù)據(jù)直到遇到分隔符才會(huì)返回,它可以幫助我們正確地分割字節(jié)流。
          • 將消息分為 header 和 body, 并在 header 中提供 body 總長(zhǎng)度,這種分包方式被稱為 LTV(length,type,value) 包。這是應(yīng)用最廣泛的策略,如 HTTP 協(xié)議。當(dāng)從 header 中獲得 body 長(zhǎng)度后, io.ReadFull 函數(shù)會(huì)讀取指定長(zhǎng)度字節(jié)流,從而解析應(yīng)用層消息。

          在沒(méi)有具體應(yīng)用層協(xié)議的情況下,我們很難詳細(xì)地討論拆包與粘包問(wèn)題。在本系列的第二篇文章: 實(shí)現(xiàn) Redis 協(xié)議解析器[2] 中我們可以看到 Redis 序列化協(xié)議(RESP)對(duì)分隔符和 LTV 包的結(jié)合應(yīng)用,以及兩種分包方式的具體解析代碼。

          優(yōu)雅關(guān)閉

          在生產(chǎn)環(huán)境下需要保證 TCP 服務(wù)器關(guān)閉前完成必要的清理工作,包括將完成正在進(jìn)行的數(shù)據(jù)傳輸,關(guān)閉 TCP 連接等。這種關(guān)閉模式稱為優(yōu)雅關(guān)閉,可以避免資源泄露以及客戶端未收到完整數(shù)據(jù)造成異常。

          TCP 服務(wù)器的優(yōu)雅關(guān)閉模式通常為: 先關(guān)閉 listener 阻止新連接進(jìn)入,然后遍歷所有連接逐個(gè)進(jìn)行關(guān)閉。

          首先修改一下 TCP 服務(wù)器:

          //?handler?是應(yīng)用層服務(wù)器的抽象
          type?Handler?interface?{
          ????Handle(ctx?context.Context,?conn?net.Conn)
          ????Close()error
          }

          func?ListenAndServe(cfg?*Config,?handler?tcp.Handler)?{
          ????listener,?err?:=?net.Listen("tcp",?cfg.Address)
          ????if?err?!=?nil?{
          ????????logger.Fatal(fmt.Sprintf("listen?err:?%v",?err))
          ????}

          ????//?監(jiān)聽(tīng)中斷信號(hào)
          ????//?atomic.AtomicBool?是作者寫(xiě)的封裝:?https://github.com/HDT3213/godis/blob/master/src/lib/sync/atomic/bool.go
          ????var?closing?atomic.AtomicBool
          ????sigCh?:=?make(chan?os.Signal,?1)
          ????signal.Notify(sigCh,?syscall.SIGHUP,?syscall.SIGQUIT,?syscall.SIGTERM,?syscall.SIGINT)
          ????go?func()?{
          ????????sig?:=?<-sigCh
          ????????switch?sig?{
          ????????case?syscall.SIGHUP,?syscall.SIGQUIT,?syscall.SIGTERM,?syscall.SIGINT:
          ????????????//?收到中斷信號(hào)后開(kāi)始關(guān)閉流程
          ????????????logger.Info("shuting?down...")
          ????????????//?設(shè)置標(biāo)志位為關(guān)閉中,?使用原子操作保證線程可見(jiàn)性
          ????????????closing.Set(true)
          ????????????//?先關(guān)閉?listener?阻止新連接進(jìn)入
          ????????????//?listener?關(guān)閉后?listener.Accept()?會(huì)立即返回錯(cuò)誤
          ????????????_?=?listener.Close()
          ????????????//?逐個(gè)關(guān)閉已建立鏈接
          ????????????_?=?handler.Close()
          ????????}
          ????}()


          ????logger.Info(fmt.Sprintf("bind:?%s,?start?listening...",?cfg.Address))
          ????defer?func()?{
          ????????//?在出現(xiàn)未知錯(cuò)誤或panic后保證正常關(guān)閉
          ????????//?這里存在一個(gè)問(wèn)題是:?當(dāng)應(yīng)用正常關(guān)閉后會(huì)再次執(zhí)行關(guān)閉操作
          ????????_?=?listener.Close()
          ????????_?=?handler.Close()
          ????}()
          ????ctx,?_?:=?context.WithCancel(context.Background())
          ????//?waitGroup?的計(jì)數(shù)是當(dāng)前仍存在的連接數(shù)
          ????//?進(jìn)入關(guān)閉流程時(shí),主協(xié)程應(yīng)該等待所有連接都關(guān)閉后再退出
          ????var?waitDone?sync.WaitGroup
          ????for?{
          ????????conn,?err?:=?listener.Accept()
          ????????if?err?!=?nil?{
          ????????????if?closing.Get()?{
          ????????????????//?收到關(guān)閉信號(hào)后進(jìn)入此流程,此時(shí)listener已被監(jiān)聽(tīng)系統(tǒng)信號(hào)的?goroutine?關(guān)閉
          ????????????????logger.Info("waiting?disconnect...")
          ????????????????//?主協(xié)程應(yīng)等待應(yīng)用層服務(wù)器完成工作并關(guān)閉鏈接
          ????????????????waitDone.Wait()
          ????????????????return
          ????????????}
          ????????????logger.Error(fmt.Sprintf("accept?err:?%v",?err))
          ????????????continue
          ????????}
          ????????//?創(chuàng)建一個(gè)新協(xié)程處理鏈接
          ????????logger.Info("accept?link")
          ????????go?func()?{
          ????????????defer?func()?{
          ????????????????waitDone.Done()
          ????????????}()
          ????????????waitDone.Add(1)
          ????????????handler.Handle(ctx,?conn)
          ????????}()
          ????}
          }

          接下來(lái)修改應(yīng)用層服務(wù)器:

          //?客戶端連接的抽象
          type?Client?struct?{
          ????//?tcp?連接
          ????Conn?net.Conn
          ????//?當(dāng)服務(wù)端開(kāi)始發(fā)送數(shù)據(jù)時(shí)進(jìn)入waiting,?阻止其它goroutine關(guān)閉連接
          ????//?wait.Wait是作者編寫(xiě)的帶有最大等待時(shí)間的封裝:
          ????//?https://github.com/HDT3213/godis/blob/master/src/lib/sync/wait/wait.go
          ????Waiting?wait.Wait
          }

          type?EchoHandler?struct?{

          ????//?保存所有工作狀態(tài)client的集合(把map當(dāng)set用)
          ????//?需使用并發(fā)安全的容器
          ????activeConn?sync.Map

          ????//?和?tcp?server?中作用相同的關(guān)閉狀態(tài)標(biāo)識(shí)位
          ????closing?atomic.AtomicBool
          }

          func?MakeEchoHandler()(*EchoHandler)?{
          ????return?&EchoHandler{
          ????}
          }

          //?關(guān)閉客戶端連接
          func?(c?*Client)Close()error?{
          ????//?等待數(shù)據(jù)發(fā)送完成或超時(shí)
          ????c.Waiting.WaitWithTimeout(10?*?time.Second)
          ????c.Conn.Close()
          ????return?nil
          }

          func?(h?*EchoHandler)Handle(ctx?context.Context,?conn?net.Conn)?{
          ????if?h.closing.Get()?{
          ????????//?closing?handler?refuse?new?connection
          ????????conn.Close()
          ????}

          ????client?:=?&Client?{
          ????????Conn:?conn,
          ????}
          ????h.activeConn.Store(client,?1)

          ????reader?:=?bufio.NewReader(conn)
          ????for?{
          ????????msg,?err?:=?reader.ReadString('\n')
          ????????if?err?!=?nil?{
          ????????????if?err?==?io.EOF?{
          ????????????????logger.Info("connection?close")
          ????????????????h.activeConn.Delete(conn)
          ????????????}?else?{
          ????????????????logger.Warn(err)
          ????????????}
          ????????????return
          ????????}
          ????????//?發(fā)送數(shù)據(jù)前先置為waiting狀態(tài)
          ????????client.Waiting.Add(1)

          ????????//?模擬關(guān)閉時(shí)未完成發(fā)送的情況
          ????????//logger.Info("sleeping")
          ????????//time.Sleep(10?*?time.Second)

          ????????b?:=?[]byte(msg)
          ????????conn.Write(b)
          ????????//?發(fā)送完畢,?結(jié)束waiting
          ????????client.Waiting.Done()
          ????}
          }

          func?(h?*EchoHandler)Close()error?{
          ????logger.Info("handler?shuting?down...")
          ????h.closing.Set(true)
          ????//?TODO:?concurrent?wait
          ????h.activeConn.Range(func(key?interface{},?val?interface{})bool?{
          ????????client?:=?key.(*Client)
          ????????client.Close()
          ????????return?true
          ????})
          ????return?nil
          }

          作者:finley

          出處:https://www.cnblogs.com/Finley/p/11070669.html

          版權(quán):本作品采用「署名-非商業(yè)性使用-相同方式共享 4.0 國(guó)際[3]」許可協(xié)議進(jìn)行許可。

          參考資料

          [1]

          Github:HDT3213/Godis: https://github.com/HDT3213/godis/tree/master/src/tcp

          [2]

          實(shí)現(xiàn) Redis 協(xié)議解析器: https://www.cnblogs.com/Finley/p/11923168.html

          [3]

          署名-非商業(yè)性使用-相同方式共享 4.0 國(guó)際: https://creativecommons.org/licenses/by-nc-sa/4.0/



          推薦閱讀


          福利

          我為大家整理了一份從入門到進(jìn)階的Go學(xué)習(xí)資料禮包,包含學(xué)習(xí)建議:入門看什么,進(jìn)階看什么。關(guān)注公眾號(hào) 「polarisxu」,回復(fù) ebook 獲??;還可以回復(fù)「進(jìn)群」,和數(shù)萬(wàn) Gopher 交流學(xué)習(xí)。

          瀏覽 83
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  青娱乐精品在线视频 | 操逼操逼操逼开房操小姐逼 | 东京热高清无码 | 奇米影视四色中文字幕 | 成人毛片18女人毛片免费黑人看 |