使用Go實(shí)現(xiàn)可用select監(jiān)聽的隊列

1. 背景與選型
和《基于Redis Cluster的分布式鎖實(shí)現(xiàn)以互斥方式操作共享資源》一文一樣,今天要說的Go隊列方案也是有一定項(xiàng)目背景的。
5G消息方興未艾[1]!前一段時間從事了一段時間5G消息網(wǎng)關(guān)的研發(fā),但凡涉及類似消息業(yè)務(wù)的網(wǎng)關(guān),我們一般都離不開隊列這種數(shù)據(jù)結(jié)構(gòu)的支持。這個5G消息網(wǎng)關(guān)項(xiàng)目采用的是Go技術(shù)棧開發(fā),那么我們應(yīng)該如何為它選擇一個與業(yè)務(wù)模型匹配且性能不差的實(shí)現(xiàn)呢?
如今一提到消息隊列,大家第一個想到的一定是kafka[2],kafka的確是一款優(yōu)秀的分布式隊列中間件,但對于我們這個系統(tǒng)來說,它有些“重”,部署和運(yùn)維都有門檻,并且項(xiàng)目組里也沒有能很好維護(hù)它的專家,畢竟“可控”是技術(shù)選擇的一個重要因素。除此之外,我們更想在Go技術(shù)棧的生態(tài)中挑選,但kafka是Java實(shí)現(xiàn)的。
Go圈里在性能上能與kafka“掰掰手腕”的成熟選手不多,nats[3]以及其主持持久化的子項(xiàng)目nats-streaming[4]算是其中兩個。不過nats的消息送達(dá)模型是:At-least-once-delivery,即至少送一次(而沒有kafka的精確送一次的送達(dá)模型)。一旦消費(fèi)者性能下降,給nats server返回的應(yīng)答超時,nats就會做消息的重發(fā)處理:即將消息重新加入到隊列中。這與我們的業(yè)務(wù)模型不符,即便nats提供了發(fā)送超時的設(shè)定,但我們還是無法給出適當(dāng)?shù)膖imeout時間。Go圈里的另一個高性能分布式消息隊列nsq[5]采用的也是“至少送一次”的消息送達(dá)模型[6],因此也無法滿足我們的業(yè)務(wù)需求。
我們的業(yè)務(wù)決定了我們需要的隊列要支持“多生產(chǎn)者多消費(fèi)者”模型,Go語言內(nèi)置的channel也是一個不錯的候選。經(jīng)過多個Go版本的打磨和優(yōu)化,channel的send和recv操作性能在一定數(shù)量goroutine的情況下已經(jīng)可以滿足很多業(yè)務(wù)場景的需求了。但channel還是不完全滿足我們的業(yè)務(wù)需求。我們的系統(tǒng)要求盡可能將來自客戶端的消息接收下來并緩存在隊列中。即便下游發(fā)送性能變慢,也要將客戶消息先收下來,而不是拒收或延遲響應(yīng)。而channel本質(zhì)上是一個具有“靜態(tài)大小”的隊列并且Go的channel操作語義會在channel buffer滿的情況下阻塞對channel的繼續(xù)send,這就與我們的場景要求有背離,即便我們使用buffered channel,我們也很難選擇一個合適的len值,并且一旦buffer滿,它與unbuffered channel行為無異。
這樣一來,我們便選擇自己實(shí)現(xiàn)一個簡單的、高性能的滿足業(yè)務(wù)要求的隊列,并且最好能像channel那樣可以被select監(jiān)聽到數(shù)據(jù)ready,而不是給消費(fèi)者帶去“心智負(fù)擔(dān)” :消費(fèi)者采用輪詢的方式查看隊列中是否有數(shù)據(jù)。
2. 設(shè)計與實(shí)現(xiàn)方案
要設(shè)計和實(shí)現(xiàn)這樣一個隊列結(jié)構(gòu),我們需要解決三個問題:
實(shí)現(xiàn)隊列這個數(shù)據(jù)結(jié)構(gòu); 實(shí)現(xiàn)多goroutine并發(fā)訪問隊列時對消費(fèi)者和生產(chǎn)者的協(xié)調(diào); 解決消費(fèi)者使用select監(jiān)聽隊列的問題。
我們逐一來看!
1) 基礎(chǔ)隊列結(jié)構(gòu)實(shí)現(xiàn)來自一個未被Go項(xiàng)目采納的技術(shù)提案
隊列是最基礎(chǔ)的數(shù)據(jù)結(jié)構(gòu),實(shí)現(xiàn)一個“先進(jìn)先出(FIFO)”的練手queue十分容易,但實(shí)現(xiàn)一份能加入標(biāo)準(zhǔn)庫、資源占用小且性能良好的queue并不容易。Christian Petrin[7]在2018年10月份曾發(fā)起一份關(guān)于Go標(biāo)準(zhǔn)庫加入queue實(shí)現(xiàn)的技術(shù)提案[8],提案對基于array和鏈表的多種queue實(shí)現(xiàn)[9]進(jìn)行詳細(xì)的比對,并最終給出結(jié)論:impl7[10]是最為適宜和有競爭力的標(biāo)準(zhǔn)庫queue的候選者。雖然該技術(shù)提案目前尚未得到accept,但impl7足可以作為我們的內(nèi)存隊列的基礎(chǔ)實(shí)現(xiàn)。
2) 為impl7添加并發(fā)支持
在性能敏感的領(lǐng)域,我們可以直接使用sync包提供的諸多同步原語來實(shí)現(xiàn)goroutine并發(fā)安全訪問,這里也不例外,一個最簡單的讓impl7隊列實(shí)現(xiàn)支持并發(fā)的方法就是使用sync.Mutex實(shí)現(xiàn)對隊列的互斥訪問。由于impl7并未作為一個獨(dú)立的repo存在,我們將其代碼copy到我們的實(shí)現(xiàn)中(queueimpl7.go),并將其包名由queueimpl7改名為queue:
// github.com/bigwhite/experiments/blob/master/queue-with-select/safe-queue1/queueimpl7.go
// Package queueimpl7 implements an unbounded, dynamically growing FIFO queue.
// Internally, queue store the values in fixed sized slices that are linked using
// a singly linked list.
// This implementation tests the queue performance when performing lazy creation of
// the internal slice as well as starting with a 1 sized slice, allowing it to grow
// up to 16 by using the builtin append function. Subsequent slices are created with
// 128 fixed size.
package queue
// Keeping below as var so it is possible to run the slice size bench tests with no coding changes.
var (
// firstSliceSize holds the size of the first slice.
firstSliceSize = 1
// maxFirstSliceSize holds the maximum size of the first slice.
maxFirstSliceSize = 16
// maxInternalSliceSize holds the maximum size of each internal slice.
maxInternalSliceSize = 128
)
... ...
下面我們就來為以queueimpl7為底層實(shí)現(xiàn)的queue增加并發(fā)訪問支持:
// github.com/bigwhite/experiments/blob/master/queue-with-select/safe-queue1/safe-queue.go
package queue
import (
"sync"
)
type SafeQueue struct {
q *Queueimpl7
sync.Mutex
}
func NewSafe() *SafeQueue {
sq := &SafeQueue{
q: New(),
}
return sq
}
func (s *SafeQueue) Len() int {
s.Lock()
n := s.q.Len()
s.Unlock()
return n
}
func (s *SafeQueue) Push(v interface{}) {
s.Lock()
defer s.Unlock()
s.q.Push(v)
}
func (s *SafeQueue) Pop() (interface{}, bool) {
s.Lock()
defer s.Unlock()
return s.q.Pop()
}
func (s *SafeQueue) Front() (interface{}, bool) {
s.Lock()
defer s.Unlock()
return s.q.Front()
}
我們建立一個新結(jié)構(gòu)體SafeQueue,用于表示支持并發(fā)訪問的Queue,該結(jié)構(gòu)只是在queueimpl7的Queue的基礎(chǔ)上嵌入了sync.Mutex。
3) 支持select監(jiān)聽
到這里支持并發(fā)的queue雖然實(shí)現(xiàn)了,但在使用上還存在一些問題,尤其是對消費(fèi)者而言,它只能通過輪詢的方式來檢查隊列中是否有消息。而Go并發(fā)范式中,select扮演著重要角色,如果能讓SafeQueue像普通channel那樣能支持select監(jiān)聽,那么消費(fèi)者在使用時的心智負(fù)擔(dān)將大大降低。于是我們得到了下面第二版的SafeQueue實(shí)現(xiàn):
// github.com/bigwhite/experiments/blob/master/queue-with-select/safe-queue2/safe-queue.go
package queue
import (
"sync"
"time"
)
const (
signalInterval = 200
signalChanSize = 10
)
type SafeQueue struct {
q *Queueimpl7
sync.Mutex
C chan struct{}
}
func NewSafe() *SafeQueue {
sq := &SafeQueue{
q: New(),
C: make(chan struct{}, signalChanSize),
}
go func() {
ticker := time.NewTicker(time.Millisecond * signalInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if sq.q.Len() > 0 {
// send signal to indicate there are message waiting to be handled
select {
case sq.C <- struct{}{}:
//signaled
default:
// not block this goroutine
}
}
}
}
}()
return sq
}
func (s *SafeQueue) Len() int {
s.Lock()
n := s.q.Len()
s.Unlock()
return n
}
func (s *SafeQueue) Push(v interface{}) {
s.Lock()
defer s.Unlock()
s.q.Push(v)
}
func (s *SafeQueue) Pop() (interface{}, bool) {
s.Lock()
defer s.Unlock()
return s.q.Pop()
}
func (s *SafeQueue) Front() (interface{}, bool) {
s.Lock()
defer s.Unlock()
return s.q.Front()
}
從上面代碼看到,每個SafeQueue的實(shí)例會伴隨一個goroutine,該goroutine會定期(signalInterval)掃描其所綁定的隊列實(shí)例中當(dāng)前消息數(shù),如果大于0,則會向SafeQueue結(jié)構(gòu)中新增的channel發(fā)送一條數(shù)據(jù),作為一個“事件”。SafeQueue的消費(fèi)者則可以通過select來監(jiān)聽該channel,待收到“事件”后調(diào)用SafeQueue的Pop方法獲取隊列數(shù)據(jù)。下面是一個SafeQueue的簡單使用示例:
// github.com/bigwhite/experiments/blob/master/queue-with-select/main.go
package main
import (
"fmt"
"sync"
"time"
queue "github.com/bigwhite/safe-queue/safe-queue2"
)
func main() {
var q = queue.NewSafe()
var wg sync.WaitGroup
wg.Add(2)
// 生產(chǎn)者
go func() {
for i := 0; i < 1000; i++ {
time.Sleep(time.Second)
q.Push(i + 1)
}
wg.Done()
}()
// 消費(fèi)者
go func() {
LOOP:
for {
select {
case <-q.C:
for {
i, ok := q.Pop()
if !ok {
// no msg available
continue LOOP
}
fmt.Printf("%d\n", i.(int))
}
}
}
}()
wg.Wait()
}
從支持SafeQueue的原理可以看到,當(dāng)有多個消費(fèi)者時,只有一個消費(fèi)者能得到“事件”并開始消費(fèi)。如果隊列消息較少,只有一個消費(fèi)者可以啟動消費(fèi),這個機(jī)制也不會導(dǎo)致“驚群”;當(dāng)隊列中有源源不斷的消費(fèi)產(chǎn)生時,與SafeQueue綁定的goroutine可能會連續(xù)發(fā)送“事件”,多個消費(fèi)者都會收到事件并啟動消費(fèi)行為。在這樣的實(shí)現(xiàn)下,建議消費(fèi)者在收到“事件”后持續(xù)消費(fèi),直到Pop的第二個返回值返回false(代表隊列為空),就像上面示例中的那樣。
這個SafeQueue的性能“中規(guī)中矩”,比buffered channel略好(Go 1.16 darwin下跑的benchmark):
$go test -bench .
goos: darwin
goarch: amd64
pkg: github.com/bigwhite/safe-queue/safe-queue2
cpu: Intel(R) Core(TM) i5-8257U CPU @ 1.40GHz
BenchmarkParallelQueuePush-8 10687545 110.9 ns/op 32 B/op 1 allocs/op
BenchmarkParallelQueuePop-8 18185744 55.58 ns/op 0 B/op 0 allocs/op
BenchmarkParallelPushBufferredChan-8 10275184 127.1 ns/op 16 B/op 1 allocs/op
BenchmarkParallelPopBufferedChan-8 10168750 128.8 ns/op 16 B/op 1 allocs/op
BenchmarkParallelPushUnBufferredChan-8 3005150 414.9 ns/op 16 B/op 1 allocs/op
BenchmarkParallelPopUnBufferedChan-8 2987301 402.9 ns/op 16 B/op 1 allocs/op
PASS
ok github.com/bigwhite/safe-queue/safe-queue2 11.209s
注:BenchmarkParallelQueuePop-8因?yàn)槭亲x取空隊列,所以沒有分配內(nèi)存,實(shí)際情況是會有內(nèi)存分配的。另外并發(fā)goroutine的模擬差異可能導(dǎo)致有結(jié)果差異。
3. 擴(kuò)展與問題
上面實(shí)現(xiàn)的SafeQueue是一個純內(nèi)存隊列,一旦程序停止/重啟,未處理的消息都將消失。一個傳統(tǒng)的解決方法是采用wal(write ahead log)在推隊列之前將消息持久化后寫入文件,在消息出隊列后將消息狀態(tài)也寫入wal文件中。這樣重啟程序時,從wal中恢復(fù)消息到各個隊列即可。我們也可以將wal封裝到SafeQueue的實(shí)現(xiàn)中,在SafeQueue的Push和Pop時自動操作wal,并對SafeQueue的使用者透明,不過這里有一個前提,那就是隊列消息的可序列化(比如使用protobuf)。另外SafeQueue還需提供一個對外的wal消息恢復(fù)接口。大家可以考慮一下如何實(shí)現(xiàn)這些。
另外在上述的SafeQueue實(shí)現(xiàn)中,我們在給SafeQueue增加select監(jiān)聽時引入兩個const:
const (
signalInterval = 200
signalChanSize = 10
)
對于SafeQueue的使用者而言,這兩個默認(rèn)值可能不滿足需求,那么我們可以將SafeQueue的New方法做一些改造,采用“功能選項(xiàng)(functional option)”的模式[11]為用戶提供設(shè)置這兩個值的可選接口,這個“作業(yè)”也留給大家了^_^。
本文所有示例代碼可以在這里[12]下載 - https://github.com/bigwhite/experiments/tree/master/queue-with-select。
參考資料
5G消息方興未艾: https://51smspush.com
[2]kafka: https://kafka.apache.org/
[3]nats: https://github.com/nats-io/nats-server
[4]nats-streaming: https://github.com/nats-io/nats-streaming-server
[5]nsq: https://github.com/nsqio/nsq
[6]“至少送一次”的消息送達(dá)模型: https://nsq.io/overview/features_and_guarantees.html
[7]Christian Petrin: https://github.com/christianrpetrin
[8]關(guān)于Go標(biāo)準(zhǔn)庫加入queue實(shí)現(xiàn)的技術(shù)提案: https://github.com/golang/proposal/blob/master/design/27935-unbounded-queue-package.md
[9]多種queue實(shí)現(xiàn): https://github.com/christianrpetrin/queue-tests
[10]impl7: https://github.com/christianrpetrin/queue-tests/tree/master/queueimpl7/queueimpl7.go
[11]“功能選項(xiàng)(functional option)”的模式: https://www.imooc.com/read/87/article/2424
[12]這里: https://github.com/bigwhite/experiments/tree/master/queue-with-select
[13]改善Go語?編程質(zhì)量的50個有效實(shí)踐: https://www.imooc.com/read/87
[14]Kubernetes實(shí)戰(zhàn):高可用集群搭建、配置、運(yùn)維與應(yīng)用: https://coding.imooc.com/class/284.html
[15]我愛發(fā)短信: https://51smspush.com/
[16]鏈接地址: https://m.do.co/c/bff6eed92687
推薦閱讀
