<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          一個非常不錯的 Go 練手項目

          共 17901字,需瀏覽 36分鐘

           ·

          2021-06-25 00:46

          作者:HDT3213


          今天給大家?guī)淼拈_源項目是 Godis:一個用 Go 語言實現(xiàn)的 Redis 服務(wù)器。支持:

          • 5 種數(shù)據(jù)結(jié)構(gòu)(string、list、hash、set、sortedset)
          • 自動過期(TTL)
          • 發(fā)布訂閱、地理位置、持久化等功能

          你或許不需要自己實現(xiàn) Redis 服務(wù),但你是否厭煩了每天都是寫增刪改查的業(yè)務(wù)代碼,想提高編程水平試圖從零寫個項目打開 IDE 卻發(fā)現(xiàn)無從下手?

          動手造輪子一定是提高編程能力的好辦法,下面就帶大家用 Go 從零開始寫一個 Redis 服務(wù)器(Godis),從中你將學到:

          • 如何編寫 Go 語言 TCP 服務(wù)器
          • 設(shè)計并實現(xiàn)安全可靠的通信協(xié)議(redis 協(xié)議)
          • 如何使用 Go 語言開發(fā)高并發(fā)程序
          • 設(shè)計和實現(xiàn)分布式集群以及分布式事務(wù)
          • 熟悉鏈表、哈希表、跳表以及時間輪等常用數(shù)據(jù)結(jié)構(gòu)

          千萬不要擔心內(nèi)容太難!!雖然示例代碼是 Go,但就算你不會 Go 語言也不會影響你理解 Redis 的原理和底層協(xié)議以及高性能的秘密。而且作者為了照顧到廣大讀者,對技術(shù)的講解做了優(yōu)化。示例代碼在原項目基礎(chǔ)上做了簡化,并逐行地加了注釋。如果是高級玩家,請直接訪問項目閱讀源碼:

          https://github.com/HDT3213/godis

          下面讓我們一起撥開 Redis 的迷霧。

          一、寫個 TCP 服務(wù)器

          眾所周知 Redis 是 C/S 模型,使用 TCP 協(xié)議進行通信。接下來就從實現(xiàn) TCP 服務(wù)端開始。作為廣泛用于服務(wù)端的編程語言 Golang 提供了非常簡潔的 TCP 接口,所以實現(xiàn)起來十分方便。示例代碼:

          func ListenAndServe(address string) {
              // 綁定監(jiān)聽地址
              listener, err := net.Listen("tcp", address)
              if err != nil {
                  log.Fatal(fmt.Sprintf("listen err: %v", err))
              }
              defer listener.Close()
              log.Println(fmt.Sprintf("bind: %s, start listening...", address))

              for {
                  // Accept 會一直阻塞直到有新的連接建立或者listen中斷才會返回
                  conn, err := listener.Accept()
                  if err != nil {
                      // 通常是由于listener被關(guān)閉無法繼續(xù)監(jiān)聽導致的錯誤
                      log.Fatal(fmt.Sprintf("accept err: %v", err))
                  }
                  // 開啟新的 goroutine 處理該連接
                  go Handle(conn)
              }
          }

          func Handle(conn net.Conn) {
              reader := bufio.NewReader(conn)
              for {
                  // ReadString 會一直阻塞直到遇到分隔符 '\n'
                  // 遇到分隔符后 ReadString 會返回上次遇到分隔符到現(xiàn)在收到的所有數(shù)據(jù)
                  // 若在遇到分隔符之前發(fā)生異常, ReadString 會返回已收到的數(shù)據(jù)和錯誤信息
                  msg, err := reader.ReadString('\n')
                  if err != nil {
                      // 通常遇到的錯誤是連接中斷或被關(guān)閉,用io.EOF表示
                      if err == io.EOF {
                          log.Println("connection close")
                      } else {
                          log.Println(err)
                      }
                      return
                  }
                  b := []byte(msg)
                  // 將收到的信息發(fā)送給客戶端
                  conn.Write(b)
              }
          }

          func main() {
              ListenAndServe(":8000")
          }

          ?? 至此只用了 40 行代碼就搞定服務(wù)端啦!啟動上面的 TCP 服務(wù)后,在終端中輸入 telnet 127.0.0.1 8000 就可以連接到剛寫好的服務(wù)器,它會將你發(fā)送的消息原樣返回給你(所以請不要罵它):

          這個 TCP 服務(wù)器的非常簡單,主協(xié)程調(diào)用 accept 函數(shù)來監(jiān)聽端口,接受新連接后開啟一個 Goroutine 來處理它。這種簡單的阻塞 IO 模型有些類似于早期的 Tomcat/Apache 服務(wù)器。

          阻塞 IO 模型是使用一個線程處理一個連接,在沒有收到新數(shù)據(jù)時監(jiān)聽線程處于阻塞狀態(tài),直到數(shù)據(jù)就緒后線程被喚醒進行處理。因為阻塞 IO 模型需要開啟大量線程并且頻繁地進行上下文切換,所以它的效率很低。而 Redis 使用的 epoll 技術(shù)(IO 多路復用)用一個線程處理大量連接,極大地提高了吞吐量。那么我們的 TCP 服務(wù)器會比 Redis 慢很多嗎?

          當然不會,Golang 利用 Goroutine 調(diào)度開銷遠遠小于線程調(diào)度開銷的優(yōu)勢封裝出 goroutine-per-connection 風格的極簡接口,而且 net/tcp 庫將 epoll 封裝成了阻塞 IO 的樣子,在享受 epoll 高性能的同時避免了原生 epoll 接口所需的復雜異步代碼。

          在作者的電腦上 Redis 每秒可以響應 10.6k 個 PING 命令,而 Godis(完整代碼) 的吞吐量為 9.2 kqps 相差并不大。想了解更多 Golang 高性能的秘?密,可以搜索 go netpoller 或者 go 語言 網(wǎng)絡(luò)輪詢器 關(guān)鍵字

          另外,合格的 TCP 的服務(wù)器在關(guān)閉的時候不應該一停了之,而需要完成響應已接收的請求、釋放 TCP 連接等必要的清理工作。這個功能我們一般稱為 優(yōu)雅關(guān)閉 或者 graceful shutdown,優(yōu)雅關(guān)閉步驟:

          • 首先,關(guān)閉 listener 停止接受新連接
          • 然后,遍歷所有存活連接逐個關(guān)閉

          優(yōu)雅關(guān)閉的代碼比較多,這里就不完整貼出了。

          二、透視 Redis 協(xié)議

          在解決完通信后,下一步就是搞清楚 Redis 的協(xié)議,其實就是一套序列化協(xié)議類似 JSON、Protocol Buffers,你看底層其實也就是一些基礎(chǔ)的知識。

          自 Redis 2.0 以后的通信統(tǒng)一為 RESP 協(xié)議(REdis Serialization Protocol),該協(xié)議易于實現(xiàn)不僅可以高效的被程序解析,還能夠被人類讀懂容易調(diào)試。

          RESP 是一個二進制安全的文本協(xié)議,工作于 TCP 協(xié)議上。RESP 以行作為單位,客戶端和服務(wù)器發(fā)送的命令或數(shù)據(jù)一律以 \r\n(CRLF)作為換行符。

          二進制安全是指允許協(xié)議中出現(xiàn)任意字符而不會導致故障。比如 C 語言的字符串以 \0 作為結(jié)尾不允許字符串中間出現(xiàn) \0,而 Go 語言的 string 則允許出現(xiàn) \0,我們說 Go 語言的 string 是二進制安全的,而 C 語言字符串不是二進制安全的。

          RESP 的二進制安全性允許我們在 key 或者 value 中包含 \r 或者 \n 這樣的特殊字符。在使用 Redis 存儲 protobuf、msgpack 等二進制數(shù)據(jù)時,二進制安全性尤為重要。

          RESP 定義了 5 種格式:

          • 簡單字符串(Simple String):服務(wù)器用來返回簡單的結(jié)果,比如 "OK" 非二進制安全,且不允許換行
          • 錯誤信息(Error):服務(wù)器用來返回簡單的錯誤信息,比如 "ERR Invalid Synatx" 非二進制安全,且不允許換行
          • 整數(shù)(Integer):llen、scard 等命令的返回值,64 位有符號整數(shù)
          • 字符串(Bulk String):二進制安全字符串,比如 get 等命令的返回值
          • 數(shù)組(Array,又稱 Multi Bulk Strings):Bulk String 數(shù)組,客戶端發(fā)送指令以及 lrange 等命令響應的格式

          RESP 通過第一個字符來表示格式:

          • 簡單字符串:以"+" 開始, 如:"+OK\r\n"
          • 錯誤:以"-" 開始,如:"-ERR Invalid Synatx\r\n"
          • 整數(shù):以":"開始,如:":1\r\n"
          • 字符串:以 $ 開始
          • 數(shù)組:以 * 開始

          下面讓我們通過一些實際例子來理解協(xié)議。

          2.1 字符串

          字符串(Bulk String)有兩行,第一行為 $+正文長度,第二行為實際內(nèi)容。如:

          $3\r\nSET\r\n

          字符串(Bulk String)是二進制安全的,就是說可以在 Bulk String 內(nèi)部包含 "\r\n" 字符(行尾的 CRLF 被隱藏):

          $4
          a\r\nb

          2.2 空

          $-1 表示 nil,比如使用 get 命令查詢一個不存在的 key 時,響應即為 $-1

          2.3 數(shù)組

          數(shù)組(Array)格式第一行為 "*"+數(shù)組長度,其后是相應數(shù)量的 字符串(Bulk String)。比如 ["foo", "bar"] 的報文(傳輸時的內(nèi)容):

          *2
          $3
          foo
          $3
          bar

          客戶端也使用 數(shù)組(Array)格式向服務(wù)端發(fā)送指令。命令本身將作為第一個參數(shù),比如 SET key value 指令的 RESP 報文:

          *3
          $3
          SET
          $3
          key
          $5
          value

          將換行符打印出來:

          *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n

          2.4 解析預備

          知道常用的 RESP 報文內(nèi)容后,就可以開始著手解析了。但需要注意的是 RESP 是 二進制安全 的協(xié)議,它允許在正文中使用 \r\n 字符。舉例來說 Redis 可以正確接收并執(zhí)行 SET "a\r\nb" hellogithub 指令,這條指令的正確報文是這樣的:

          *3  
          $3
          SET
          $4
          a\r\nb 
          $11
          hellogithub

          ReadBytes 讀取到第五行 "a\r\nb\r\n" 時會將其誤認為兩行:

          *3  
          $3
          SET
          $4
          a  // 錯誤的分行
          b // 錯誤的分行
          $11
          hellogithub

          因此當讀取到第四行 $4 后,不應該繼續(xù)使用 ReadBytes('\n') 讀取下一行,應使用 io.ReadFull(reader, msg) 方法來讀取指定長度的內(nèi)容。

          msg = make([]byte4 + 2// 正文長度4 + 換行符長度2
          _, err = io.ReadFull(reader, msg)

          2.5 編寫 RESP 協(xié)議解析器

          解決完上面內(nèi)容包含 "\r\n" 的問題,我們就可以開始放手編寫 Redis 協(xié)議解析器啦!

          type Payload struct {
           Data redis.Reply
           Err  error
          }

          // ParseStream 通過 io.Reader 讀取數(shù)據(jù)并將結(jié)果通過 channel 將結(jié)果返回給調(diào)用者
          // 流式處理的接口適合供客戶端/服務(wù)端使用
          func ParseStream(reader io.Reader) <-chan *Payload {
           ch := make(chan *Payload)
           go parse0(reader, ch)
           return ch
          }

          由于解析器的代碼比較多,這里只簡單地介紹一下核心流程。

          func parse0(reader io.Reader, ch chan<- *Payload) {
              // 初始化讀取狀態(tài)
              readingMultiLine := false
              expectedArgsCount := 0
              var args [][]byte
              var bulkLen int64
              for {
                  // 上文中我們提到 RESP 是以行為單位的
                  // 因為行分為簡單字符串和二進制安全的 BulkString,我們需要封裝一個 readLine 函數(shù)來兼容
                  line, err = readLine(reader, bulkLen)
                  if err != nil { 
                      // 處理錯誤
                      return
                  }
                  // 接下來我們對剛剛讀取的行進行解析
                  // 我們簡單的將 Reply 分為兩類:
                  // 單行: StatusReply, IntReply, ErrorReply
                  // 多行: BulkReply, MultiBulkReply

                  if !readingMultiLine {
                      if isMulitBulkHeader(line) {
                          // 我們收到了 MulitBulkReply 的第一行
                          // 獲得 MulitBulkReply 中 BulkString 的個數(shù)
                          expectedArgsCount = parseMulitBulkHeader(line)
                          // 等待 MulitBulkReply 后續(xù)行
                          readingMultiLine = true
                      } else if isBulkHeader(line) {
                          // 我們收到了 BulkReply 的第一行
                          // 獲得 BulkReply 第二行的長度, 通過 bulkLen 告訴 readLine 函數(shù)下一行 BulkString 的長度
                          bulkLen = parseBulkHeader()
                          // 這個 Reply 中一共有 1 個 BulkString
                          expectedArgsCount = 1 
                          // 等待 BulkReply 后續(xù)行
                          readingMultiLine = true
                      } else {
                          // 處理 StatusReply, IntReply, ErrorReply 等單行 Reply
                          reply := parseSingleLineReply(line)
                          // 通過 ch 返回結(jié)果
                          emitReply(ch)
                      }
                  } else {
                      // 進入此分支說明我們正在等待 MulitBulkReply 或 BulkReply 的后續(xù)行
                      // MulitBulkReply 的后續(xù)行有兩種,BulkHeader 或者 BulkString
                      if isBulkHeader(line) {
                          bulkLen = parseBulkHeader()
                      } else {
                          // 我們正在讀取一個 BulkString, 它可能是 MulitBulkReply 或 BulkReply 
                          args = append(args, line)
                      }
                      if len(args) == expectedArgsCount { // 我們已經(jīng)讀取了所有后續(xù)行
                          // 通過 ch 返回結(jié)果
                          emitReply(ch)
                          // 重置狀態(tài), 準備解析下一條 Reply
                          readingMultiLine = false
                          expectedArgsCount = 0
                          args = nil
                          bulkLen = 0
                      }
                  }
              }
          }

          三、實現(xiàn)內(nèi)存數(shù)據(jù)庫

          至此我們已經(jīng)搞定數(shù)據(jù)接收和解析的部分了,剩下就是我們應該把數(shù)據(jù)存在哪里了?

          拋開持久化部分,作為基于內(nèi)存的 KV 數(shù)據(jù)庫 Redis 的所有數(shù)據(jù)需要都存儲在內(nèi)存中的哈希表,而這個哈希表就是我們今天需要編寫的最后一個組件。

          與單線程的 Redis 不同我們實現(xiàn)的 Redis(godis)是并行工作的,所以我們必須考慮各種并發(fā)安全問題。常見的并發(fā)安全哈希表設(shè)計有幾種:

          • sync.map:Golang 官方提供的并發(fā)哈希表,適合讀多寫少的場景。但是在 m.dirty 剛被提升后會將 m.read 復制到新的 m.dirty 中,在數(shù)據(jù)量較大的情況下復制操作會阻塞所有協(xié)程,存在較大的隱患。

          • juc.ConcurrentHashMap:Java 的并發(fā)哈希表采用分段鎖實現(xiàn)。在進行擴容時訪問哈希表線程都將協(xié)助進行 rehash 操作,在 rehash 結(jié)束前所有的讀寫操作都會阻塞。因為緩存數(shù)據(jù)庫中鍵值對數(shù)量巨大且對讀寫操作響應時間要求較高,使用 juc 的策略是不合適的。

          • memcached hashtable:在后臺線程進行 rehash 操作時,主線程會判斷要訪問的哈希槽是否已被 rehash 從而決定操作 old_hashtable 還是操作 new_hashtable。這種設(shè)計被稱為漸進式 rehash 它的優(yōu)點是 rehash 操作基本不會阻塞主線程的讀寫,是最理想的的方案。

          但漸進式 rehash 的實現(xiàn)非常復雜,所以 godis 采用 Golang 社區(qū)廣泛使用的分段鎖策略(非上面的三種),就是將 key 分散到固定數(shù)量的 shard 中避免進行整體 rehash 操作。shard 是有鎖保護的 map,當 shard 進行 rehash 時會阻塞 shard 內(nèi)的讀寫,但不會對其他 shard 造成影響。

          代碼如下:

          type ConcurrentDict struct {
              table []*Shard
              count int32
          }

          type Shard struct {
              m     map[string]interface{}
              mutex sync.RWMutex
          }

          func (dict *ConcurrentDict) spread(hashCode uint32) uint32 {
           tableSize := uint32(len(dict.table))
           return (tableSize - 1) & uint32(hashCode)
          }

          func (dict *ConcurrentDict) getShard(index uint32) *Shard {
           return dict.table[index]
          }

          func (dict *ConcurrentDict) Get(key string) (val interface{}, exists bool) {
           hashCode := fnv32(key)
           index := dict.spread(hashCode)
           shard := dict.getShard(index)
           shard.mutex.RLock()
           defer shard.mutex.RUnlock()
           val, exists = shard.m[key]
           return
          }

          func (dict *ConcurrentDict) Put(key string, val interface{}) (result int) {
           if dict == nil {
            panic("dict is nil")
           }
           hashCode := fnv32(key)
           index := dict.spread(hashCode)
           shard := dict.getShard(index)
           shard.mutex.Lock()
           defer shard.mutex.Unlock()

           if _, ok := shard.m[key]; ok {
            shard.m[key] = val
            return 0
           } else {
            shard.m[key] = val
            dict.addCount()
            return 1
           }
          }

          ConcurrentDict 可以保證對單個 key 操作的并發(fā)安全性,但是仍然無法滿足并發(fā)安全的需求,舉例來說:

          1. Incr 命令需要完成:讀取 -> 做加法 -> 寫入 三步操作,讀取和寫入兩步操作不是原子性的
          2. MSETNX 命令當且僅當所有給定鍵都不存在時所有給定鍵設(shè)置值,我們需要保證「檢查多個key是否存在」以及「寫入多個key」這兩個操作的原子性

          因此我們需要實現(xiàn) db.Locker 用于鎖定一個或一組 key 直到我們完成所有操作后再釋放。

          實現(xiàn) db.Locker 最直接的想法是使用一個 map[string]*sync.RWMutex

          • 加鎖過程分為兩步:初始化 mutex -> 加鎖
          • 解鎖過程也分為兩步: 解鎖 -> 釋放mutex

          那么存在一個無法解決的并發(fā)問題:

          時間協(xié)程A協(xié)程B
          1
          locker["a"].Unlock()
          2locker["a"] = &sync.RWMutex{}
          3
          delete(locker["a"])
          4locker["a"].Lock()

          由于 t3 時協(xié)程 B 釋放了鎖,t4 時協(xié)程 A 試圖加鎖會失敗。若協(xié)程B在解鎖時不執(zhí)行 delete(locker["a"]) 就可以避免該異常的發(fā)生,但是這樣會造成嚴重的內(nèi)存泄露。

          我們注意到哈希槽的數(shù)量遠少于 key 的數(shù)量,反過來說多個鍵可以共用一個哈希槽。所以我們不再直接對 key 進行加鎖而是鎖定 key 所在的哈希槽也可以保證安全,另一方面哈希槽數(shù)量較少即使不釋放也不會消耗太多內(nèi)存。

          type Locks struct {
              table []*sync.RWMutex
          }

          func Make(tableSize int) *Locks {
              table := make([]*sync.RWMutex, tableSize)
              for i := 0; i < tableSize; i++ {
                  table[i] = &sync.RWMutex{}
              }
              return &Locks{
                  table: table,
              }
          }

          func (locks *Locks)Lock(key string) {
              index := locks.spread(fnv32(key))
              mu := locks.table[index]
              mu.Lock()
          }

          func (locks *Locks)UnLock(key string) {
              index := locks.spread(fnv32(key))
              mu := locks.table[index]
              mu.Unlock()
          }

          在鎖定多個 key 時需要注意,若 協(xié)程A 持有 鍵a 的鎖試圖獲得 鍵b 的鎖,此時 協(xié)程B 持有 鍵b 的鎖試圖獲得 鍵a 的鎖則會形成死鎖。

          解決方法是所有協(xié)程都按照相同順序加鎖,若兩個協(xié)程都想獲得 鍵a 和 鍵b 的鎖,那么必須先獲取 鍵a 的鎖后獲取 鍵b 的鎖,這樣就可以避免循環(huán)等待。

          到目前為止構(gòu)建 Redis 服務(wù)器所需的基本組件已經(jīng)備齊,只需要將 TCP 服務(wù)器、協(xié)議解析器與哈希表組裝起來我們的 Redis 服務(wù)器就可以開始工作啦。

          最后,以上代碼均簡化自我寫的 Godis:一個開源僅用 Go 語言實現(xiàn)的 Redis 服務(wù)器。期待您的關(guān)注和 Star:

          項目地址:https://github.com/HDT3213/godis

          四、結(jié)束

          很多朋友的日常工作主要是編寫業(yè)務(wù)代碼,對于框架、數(shù)據(jù)庫、中間件這些“架構(gòu)”、“底層代碼” 有一些恐懼感。

          但本文我們只寫了 3 個組件,共計幾百行代碼就實現(xiàn)了一個基本的 Redis 服務(wù)器。所以底層的技術(shù)并不難,只要你對技術(shù)感興趣由淺入深、從簡到繁,“底層代碼”也并不神秘。


             


          喜歡明哥文章的同學
          歡迎長按下圖訂閱!

          ???

          瀏覽 97
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  九九九视频在线 | 在线操b视频 | 豆花av在线 | 成人无码电影333 | 不用播放器看av 成人AV一AV二 |