<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          使用Go實(shí)現(xiàn)可用select監(jiān)聽的隊列

          共 12508字,需瀏覽 26分鐘

           ·

          2021-04-10 10:21

          1. 背景與選型

          《基于Redis Cluster的分布式鎖實(shí)現(xiàn)以互斥方式操作共享資源》一文一樣,今天要說的Go隊列方案也是有一定項(xiàng)目背景的。

          5G消息方興未艾[1]!前一段時間從事了一段時間5G消息網(wǎng)關(guān)的研發(fā),但凡涉及類似消息業(yè)務(wù)的網(wǎng)關(guān),我們一般都離不開隊列這種數(shù)據(jù)結(jié)構(gòu)的支持。這個5G消息網(wǎng)關(guān)項(xiàng)目采用的是Go技術(shù)棧開發(fā),那么我們應(yīng)該如何為它選擇一個與業(yè)務(wù)模型匹配且性能不差的實(shí)現(xiàn)呢?

          如今一提到消息隊列,大家第一個想到的一定是kafka[2],kafka的確是一款優(yōu)秀的分布式隊列中間件,但對于我們這個系統(tǒng)來說,它有些“重”,部署和運(yùn)維都有門檻,并且項(xiàng)目組里也沒有能很好維護(hù)它的專家,畢竟“可控”是技術(shù)選擇的一個重要因素。除此之外,我們更想在Go技術(shù)棧的生態(tài)中挑選,但kafka是Java實(shí)現(xiàn)的。

          Go圈里在性能上能與kafka“掰掰手腕”的成熟選手不多,nats[3]以及其主持持久化的子項(xiàng)目nats-streaming[4]算是其中兩個。不過nats的消息送達(dá)模型是:At-least-once-delivery,即至少送一次(而沒有kafka的精確送一次的送達(dá)模型)。一旦消費(fèi)者性能下降,給nats server返回的應(yīng)答超時,nats就會做消息的重發(fā)處理:即將消息重新加入到隊列中。這與我們的業(yè)務(wù)模型不符,即便nats提供了發(fā)送超時的設(shè)定,但我們還是無法給出適當(dāng)?shù)膖imeout時間。Go圈里的另一個高性能分布式消息隊列nsq[5]采用的也是“至少送一次”的消息送達(dá)模型[6],因此也無法滿足我們的業(yè)務(wù)需求。

          我們的業(yè)務(wù)決定了我們需要的隊列要支持“多生產(chǎn)者多消費(fèi)者”模型,Go語言內(nèi)置的channel也是一個不錯的候選。經(jīng)過多個Go版本的打磨和優(yōu)化,channel的send和recv操作性能在一定數(shù)量goroutine的情況下已經(jīng)可以滿足很多業(yè)務(wù)場景的需求了。但channel還是不完全滿足我們的業(yè)務(wù)需求。我們的系統(tǒng)要求盡可能將來自客戶端的消息接收下來并緩存在隊列中。即便下游發(fā)送性能變慢,也要將客戶消息先收下來,而不是拒收或延遲響應(yīng)。而channel本質(zhì)上是一個具有“靜態(tài)大小”的隊列并且Go的channel操作語義會在channel buffer滿的情況下阻塞對channel的繼續(xù)send,這就與我們的場景要求有背離,即便我們使用buffered channel,我們也很難選擇一個合適的len值,并且一旦buffer滿,它與unbuffered channel行為無異。

          這樣一來,我們便選擇自己實(shí)現(xiàn)一個簡單的、高性能的滿足業(yè)務(wù)要求的隊列,并且最好能像channel那樣可以被select監(jiān)聽到數(shù)據(jù)ready,而不是給消費(fèi)者帶去“心智負(fù)擔(dān)” :消費(fèi)者采用輪詢的方式查看隊列中是否有數(shù)據(jù)。

          2. 設(shè)計與實(shí)現(xiàn)方案

          要設(shè)計和實(shí)現(xiàn)這樣一個隊列結(jié)構(gòu),我們需要解決三個問題:

          • 實(shí)現(xiàn)隊列這個數(shù)據(jù)結(jié)構(gòu);
          • 實(shí)現(xiàn)多goroutine并發(fā)訪問隊列時對消費(fèi)者和生產(chǎn)者的協(xié)調(diào);
          • 解決消費(fèi)者使用select監(jiān)聽隊列的問題。

          我們逐一來看!

          1) 基礎(chǔ)隊列結(jié)構(gòu)實(shí)現(xiàn)來自一個未被Go項(xiàng)目采納的技術(shù)提案

          隊列是最基礎(chǔ)的數(shù)據(jù)結(jié)構(gòu),實(shí)現(xiàn)一個“先進(jìn)先出(FIFO)”的練手queue十分容易,但實(shí)現(xiàn)一份能加入標(biāo)準(zhǔn)庫、資源占用小且性能良好的queue并不容易。Christian Petrin[7]在2018年10月份曾發(fā)起一份關(guān)于Go標(biāo)準(zhǔn)庫加入queue實(shí)現(xiàn)的技術(shù)提案[8],提案對基于array和鏈表的多種queue實(shí)現(xiàn)[9]進(jìn)行詳細(xì)的比對,并最終給出結(jié)論:impl7[10]是最為適宜和有競爭力的標(biāo)準(zhǔn)庫queue的候選者。雖然該技術(shù)提案目前尚未得到accept,但impl7足可以作為我們的內(nèi)存隊列的基礎(chǔ)實(shí)現(xiàn)。

          2) 為impl7添加并發(fā)支持

          在性能敏感的領(lǐng)域,我們可以直接使用sync包提供的諸多同步原語來實(shí)現(xiàn)goroutine并發(fā)安全訪問,這里也不例外,一個最簡單的讓impl7隊列實(shí)現(xiàn)支持并發(fā)的方法就是使用sync.Mutex實(shí)現(xiàn)對隊列的互斥訪問。由于impl7并未作為一個獨(dú)立的repo存在,我們將其代碼copy到我們的實(shí)現(xiàn)中(queueimpl7.go),并將其包名由queueimpl7改名為queue:

          // github.com/bigwhite/experiments/blob/master/queue-with-select/safe-queue1/queueimpl7.go

          // Package queueimpl7 implements an unbounded, dynamically growing FIFO queue.
          // Internally, queue store the values in fixed sized slices that are linked using
          // a singly linked list.
          // This implementation tests the queue performance when performing lazy creation of
          // the internal slice as well as starting with a 1 sized slice, allowing it to grow
          // up to 16 by using the builtin append function. Subsequent slices are created with
          // 128 fixed size.
          package queue

          // Keeping below as var so it is possible to run the slice size bench tests with no coding changes.
          var (
                  // firstSliceSize holds the size of the first slice.
                  firstSliceSize = 1

                  // maxFirstSliceSize holds the maximum size of the first slice.
                  maxFirstSliceSize = 16

                  // maxInternalSliceSize holds the maximum size of each internal slice.
                  maxInternalSliceSize = 128
          )
          ... ...

          下面我們就來為以queueimpl7為底層實(shí)現(xiàn)的queue增加并發(fā)訪問支持:

          // github.com/bigwhite/experiments/blob/master/queue-with-select/safe-queue1/safe-queue.go

          package queue

          import (
           "sync"
          )

          type SafeQueue struct {
           q *Queueimpl7
           sync.Mutex
          }

          func NewSafe() *SafeQueue {
           sq := &SafeQueue{
            q: New(),
           }

           return sq
          }

          func (s *SafeQueue) Len() int {
           s.Lock()
           n := s.q.Len()
           s.Unlock()
           return n
          }

          func (s *SafeQueue) Push(v interface{}) {
           s.Lock()
           defer s.Unlock()

           s.q.Push(v)
          }

          func (s *SafeQueue) Pop() (interface{}, bool) {
           s.Lock()
           defer s.Unlock()
           return s.q.Pop()
          }

          func (s *SafeQueue) Front() (interface{}, bool) {
           s.Lock()
           defer s.Unlock()
           return s.q.Front()
          }

          我們建立一個新結(jié)構(gòu)體SafeQueue,用于表示支持并發(fā)訪問的Queue,該結(jié)構(gòu)只是在queueimpl7的Queue的基礎(chǔ)上嵌入了sync.Mutex。

          3) 支持select監(jiān)聽

          到這里支持并發(fā)的queue雖然實(shí)現(xiàn)了,但在使用上還存在一些問題,尤其是對消費(fèi)者而言,它只能通過輪詢的方式來檢查隊列中是否有消息。而Go并發(fā)范式中,select扮演著重要角色,如果能讓SafeQueue像普通channel那樣能支持select監(jiān)聽,那么消費(fèi)者在使用時的心智負(fù)擔(dān)將大大降低。于是我們得到了下面第二版的SafeQueue實(shí)現(xiàn):

          // github.com/bigwhite/experiments/blob/master/queue-with-select/safe-queue2/safe-queue.go

          package queue

          import (
           "sync"
           "time"
          )

          const (
           signalInterval = 200
           signalChanSize = 10
          )

          type SafeQueue struct {
           q *Queueimpl7
           sync.Mutex
           C chan struct{}
          }

          func NewSafe() *SafeQueue {
           sq := &SafeQueue{
            q: New(),
            C: make(chan struct{}, signalChanSize),
           }

           go func() {
            ticker := time.NewTicker(time.Millisecond * signalInterval)
            defer ticker.Stop()
            for {
             select {
             case <-ticker.C:
              if sq.q.Len() > 0 {
               // send signal to indicate there are message waiting to be handled
               select {
               case sq.C <- struct{}{}:
                //signaled
               default:
                // not block this goroutine
               }
              }
             }
            }

           }()

           return sq
          }

          func (s *SafeQueue) Len() int {
           s.Lock()
           n := s.q.Len()
           s.Unlock()
           return n
          }

          func (s *SafeQueue) Push(v interface{}) {
           s.Lock()
           defer s.Unlock()

           s.q.Push(v)
          }

          func (s *SafeQueue) Pop() (interface{}, bool) {
           s.Lock()
           defer s.Unlock()
           return s.q.Pop()
          }

          func (s *SafeQueue) Front() (interface{}, bool) {
           s.Lock()
           defer s.Unlock()
           return s.q.Front()
          }

          從上面代碼看到,每個SafeQueue的實(shí)例會伴隨一個goroutine,該goroutine會定期(signalInterval)掃描其所綁定的隊列實(shí)例中當(dāng)前消息數(shù),如果大于0,則會向SafeQueue結(jié)構(gòu)中新增的channel發(fā)送一條數(shù)據(jù),作為一個“事件”。SafeQueue的消費(fèi)者則可以通過select來監(jiān)聽該channel,待收到“事件”后調(diào)用SafeQueue的Pop方法獲取隊列數(shù)據(jù)。下面是一個SafeQueue的簡單使用示例:

          // github.com/bigwhite/experiments/blob/master/queue-with-select/main.go
          package main

          import (
           "fmt"
           "sync"
           "time"

           queue "github.com/bigwhite/safe-queue/safe-queue2"
          )

          func main() {
           var q = queue.NewSafe()
           var wg sync.WaitGroup

           wg.Add(2)
           // 生產(chǎn)者
           go func() {
            for i := 0; i < 1000; i++ {
             time.Sleep(time.Second)
             q.Push(i + 1)

            }
            wg.Done()
           }()

           // 消費(fèi)者
           go func() {
           LOOP:
            for {
             select {
             case <-q.C:
              for {
               i, ok := q.Pop()
               if !ok {
                // no msg available
                continue LOOP
               }

               fmt.Printf("%d\n", i.(int))
              }
             }

            }

           }()

           wg.Wait()
          }

          從支持SafeQueue的原理可以看到,當(dāng)有多個消費(fèi)者時,只有一個消費(fèi)者能得到“事件”并開始消費(fèi)。如果隊列消息較少,只有一個消費(fèi)者可以啟動消費(fèi),這個機(jī)制也不會導(dǎo)致“驚群”;當(dāng)隊列中有源源不斷的消費(fèi)產(chǎn)生時,與SafeQueue綁定的goroutine可能會連續(xù)發(fā)送“事件”,多個消費(fèi)者都會收到事件并啟動消費(fèi)行為。在這樣的實(shí)現(xiàn)下,建議消費(fèi)者在收到“事件”后持續(xù)消費(fèi),直到Pop的第二個返回值返回false(代表隊列為空),就像上面示例中的那樣。

          這個SafeQueue的性能“中規(guī)中矩”,比buffered channel略好(Go 1.16 darwin下跑的benchmark):

          $go test -bench .
          goos: darwin
          goarch: amd64
          pkg: github.com/bigwhite/safe-queue/safe-queue2
          cpu: Intel(R) Core(TM) i5-8257U CPU @ 1.40GHz
          BenchmarkParallelQueuePush-8              10687545        110.9 ns/op       32 B/op        1 allocs/op
          BenchmarkParallelQueuePop-8               18185744         55.58 ns/op        0 B/op        0 allocs/op
          BenchmarkParallelPushBufferredChan-8      10275184        127.1 ns/op       16 B/op        1 allocs/op
          BenchmarkParallelPopBufferedChan-8        10168750        128.8 ns/op       16 B/op        1 allocs/op
          BenchmarkParallelPushUnBufferredChan-8     3005150        414.9 ns/op       16 B/op        1 allocs/op
          BenchmarkParallelPopUnBufferedChan-8       2987301        402.9 ns/op       16 B/op        1 allocs/op
          PASS
          ok   github.com/bigwhite/safe-queue/safe-queue2 11.209s

          注:BenchmarkParallelQueuePop-8因?yàn)槭亲x取空隊列,所以沒有分配內(nèi)存,實(shí)際情況是會有內(nèi)存分配的。另外并發(fā)goroutine的模擬差異可能導(dǎo)致有結(jié)果差異。

          3. 擴(kuò)展與問題

          上面實(shí)現(xiàn)的SafeQueue是一個純內(nèi)存隊列,一旦程序停止/重啟,未處理的消息都將消失。一個傳統(tǒng)的解決方法是采用wal(write ahead log)在推隊列之前將消息持久化后寫入文件,在消息出隊列后將消息狀態(tài)也寫入wal文件中。這樣重啟程序時,從wal中恢復(fù)消息到各個隊列即可。我們也可以將wal封裝到SafeQueue的實(shí)現(xiàn)中,在SafeQueue的Push和Pop時自動操作wal,并對SafeQueue的使用者透明,不過這里有一個前提,那就是隊列消息的可序列化(比如使用protobuf)。另外SafeQueue還需提供一個對外的wal消息恢復(fù)接口。大家可以考慮一下如何實(shí)現(xiàn)這些。

          另外在上述的SafeQueue實(shí)現(xiàn)中,我們在給SafeQueue增加select監(jiān)聽時引入兩個const:

          const (
           signalInterval = 200
           signalChanSize = 10
          )

          對于SafeQueue的使用者而言,這兩個默認(rèn)值可能不滿足需求,那么我們可以將SafeQueue的New方法做一些改造,采用“功能選項(xiàng)(functional option)”的模式[11]為用戶提供設(shè)置這兩個值的可選接口,這個“作業(yè)”也留給大家了^_^。

          本文所有示例代碼可以在這里[12]下載 - https://github.com/bigwhite/experiments/tree/master/queue-with-select。


          參考資料

          [1] 

          5G消息方興未艾: https://51smspush.com

          [2] 

          kafka: https://kafka.apache.org/

          [3] 

          nats: https://github.com/nats-io/nats-server

          [4] 

          nats-streaming: https://github.com/nats-io/nats-streaming-server

          [5] 

          nsq: https://github.com/nsqio/nsq

          [6] 

          “至少送一次”的消息送達(dá)模型: https://nsq.io/overview/features_and_guarantees.html

          [7] 

          Christian Petrin: https://github.com/christianrpetrin

          [8] 

          關(guān)于Go標(biāo)準(zhǔn)庫加入queue實(shí)現(xiàn)的技術(shù)提案: https://github.com/golang/proposal/blob/master/design/27935-unbounded-queue-package.md

          [9] 

          多種queue實(shí)現(xiàn): https://github.com/christianrpetrin/queue-tests

          [10] 

          impl7: https://github.com/christianrpetrin/queue-tests/tree/master/queueimpl7/queueimpl7.go

          [11] 

          “功能選項(xiàng)(functional option)”的模式: https://www.imooc.com/read/87/article/2424

          [12] 

          這里: https://github.com/bigwhite/experiments/tree/master/queue-with-select

          [13] 

          改善Go語?編程質(zhì)量的50個有效實(shí)踐: https://www.imooc.com/read/87

          [14] 

          Kubernetes實(shí)戰(zhàn):高可用集群搭建、配置、運(yùn)維與應(yīng)用: https://coding.imooc.com/class/284.html

          [15] 

          我愛發(fā)短信: https://51smspush.com/

          [16] 

          鏈接地址: https://m.do.co/c/bff6eed92687



          推薦閱讀


          福利

          我為大家整理了一份從入門到進(jìn)階的Go學(xué)習(xí)資料禮包,包含學(xué)習(xí)建議:入門看什么,進(jìn)階看什么。關(guān)注公眾號 「polarisxu」,回復(fù) ebook 獲??;還可以回復(fù)「進(jìn)群」,和數(shù)萬 Gopher 交流學(xué)習(xí)。

          瀏覽 36
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  欧美国产免费A视频 | 色就是亚洲 | 日本a视频免费 | www.最全三级在线 | 成人视频一区 |