圖文結(jié)合!Redis延遲隊列g(shù)olang高效實踐
導(dǎo)語?|?
本文主要講述如何使用golang基于Redis實現(xiàn)延遲消息隊列組件。希望對有需求的同學(xué)有所幫助。
一、背景
業(yè)務(wù)中經(jīng)常會有這樣的場景:
-
到期后自動執(zhí)行指定操作;
-
查詢某個任務(wù)是否完成,未完成等待一定時間再次查詢;
-
回調(diào)通知,當回調(diào)失敗時,等待后重試;等等還有其他很多類似的場景。
很多時候我們會直接通過一個本地定時器來幫我們完成這個任務(wù)。如果我們的系統(tǒng)是多實例分布式的,本地定時器就會面臨很多問題,如:怎么保證重復(fù)處理的問題;統(tǒng)一管控的問題等 等。面對本地定時器遇到的問題,我們可以使用分布式延遲隊列來實現(xiàn)。
這里介紹一種使用golang基于redis實現(xiàn)延遲隊列的具體實踐。
二、實現(xiàn)原理
1、使用redis可以通過List類型來實現(xiàn)隊列的功能,通過LPOP,RPUSH來保證先進先出的特性。

2、針對需要延遲處理的消息可以通過SortedSet有序集合類型來存儲, 消息到期時期使用時間戳,作為member score的值。

3、定時輪訓(xùn)sortedset,使用到期時間戳作為score,通過ZRANGEBYSCORE排序獲取到期的消息,將到期的消息遷移到List隊列中即可。

三、消息遷移的原子性
針對到期消息的往list的遷移需要三個動作:
-
查詢到期消息;
-
從sortedset取出到期消息;
-
將到期消息push到list隊列中。
那如何保證這個三個操作的 原子性 (要么都成功,要么都失?。?。在redis中有兩種處理方式可以保證多操作間的原子性。
(一)Transaction
-
MULTI標記一個事務(wù)塊的開始。事務(wù)塊內(nèi)的多條命令會按照先后順序被放進一個隊列當中,最后由EXEC命令原子性(atomic)地執(zhí)行。
-
EXEC執(zhí)行所有事務(wù)塊內(nèi)的命令。
(二)LuaScript
Redis使用單個Lua解釋器去運行所有腳本,并且,Redis也保證腳本會以原子性(atomic)的方式執(zhí)行:當某個腳本正在運行的時候,不會有其他腳本或Redis命令被執(zhí)行。這和使用MULTI/EXEC包圍的事務(wù)很類似。在其他別的客戶端看來,腳本的效果(effect)要么是不可見的(not visible),要么就是已完成的(already completed)。注意:編寫的script不能很慢,因為會阻塞其他命令的執(zhí)行。
Transaction能夠保證多個操作的原子性。LuaScript不僅保證了多操作間的原子性,可以處理更復(fù)雜的邏輯,如保障get、set操作的原子性。這里針對到期消息的遷移是先查詢到期的消息,然后兩個寫操作完成。所以這里使用LuaScript實現(xiàn)更為簡單方便。否則還需要加一個鎖來避免同一個到期消息多次處理的問題。如果遷移到期消息的模塊是單實例(非并發(fā))處理的,不需要加鎖處理。
四、List,SortedSet性能
網(wǎng)上針對redis的壓測很多,這里我們使用memtier_benchmark將與延遲隊列使用到相關(guān)的操作進行壓測。使用redis6.0,8核16Glinux服務(wù)器。
(一)List讀寫性能 LPOP,RPUSH時間復(fù)雜度為O(1)


(二)sortedset 相關(guān)操作的讀寫性能
-
zadd O(M*log(N)), N是有序集的基數(shù),M為成功添加的新成員的數(shù)量。

-
ZREMRANGEBYRANKS O(log(N)+M), N為有序集的基數(shù),而M為被移除成員的數(shù)量。

在熟悉了基于redis實現(xiàn)的延遲隊列的基本方法后,接下來看下使用golang具體的實現(xiàn)。
五、消息協(xié)議定義
定義一個消息結(jié)構(gòu)來保存消息:
// Job
type Job struct {
Id string `msgpack:"1"` // 任務(wù)id
Topic string `msgpack:"2"` // 消息名
Delay int64 `msgpack:"3"` // 延遲時間
Playload []byte `msgpack:"4"` // 消息體
Timestamp int64 `msgpack:"5"` // 消息投遞時間
}
這里使用msgpack實現(xiàn)消息的序列化。messagepack是一個高效的二進制序列化協(xié)議。相比json編碼后的數(shù)據(jù)的體積更小,編解碼的速度更快。redis script也支持messagepack。
benchmark性能測試:

其他編解碼性能對比參考:
https://github.com/alecthomas/go_serialization_benchmarks
六、延遲隊列的核心-redis
基于redis實現(xiàn)分布式延遲隊列,其核心是使用List類型實現(xiàn)隊列功能;使用sortedset實現(xiàn)延遲消息的管理,并且輪詢sortedset將到期的消息遷移到List隊列中,再啟用consumer實例處理List隊列中的消息,就完成了整個延遲隊列的核心處理流程。先來看下針對redis操作的相關(guān)實現(xiàn),這里操作redis的庫使用的是go-redis庫。
(一)獲取延時消息數(shù)
// zcard
func zcard(rdb *redis.Client, key string) *redis.IntCmd {
return rdb.ZCard(context.Background(), key)
}
(二)獲取等待執(zhí)行的消息數(shù)
// list 長度
func llen(rdb *redis.Client, key string) *redis.IntCmd {
return rdb.LLen(context.Background(), key)
}
這兩個方法屬于metric方法,可以幫助我們了解當前延時隊列的消息積壓情況,為我們對consumer實例的水平擴展提供參考指標。
(三)發(fā)送可執(zhí)行消息
// 推送新的job到隊列
func lpush(rdb *redis.Client, key string, value interface{}) error {
return rdb.LPush(context.Background(), key, value).Err()
}
(四)發(fā)送延時消息
// 增加延遲job
func zadd(rdb *redis.Client, key string, value interface{}, delay int) error {
return rdb.ZAdd(
context.Background(),
key,
&redis.Z{Score: float64(delay), Member: value}).Err()
}
(五)獲取可執(zhí)行消息
// 從ready隊列取消息
func rpop(rdb *redis.Client, key string) *redis.StringCmd {
return rdb.RPop(context.Background(), key)
}
(六)到期消息遷移到待執(zhí)行隊列 ,這里使用redis script實現(xiàn)。每次都取指定數(shù)量(limit 0 num)的到期消息,時間花費相對穩(wěn)定。也不至于在到期消息突增時,導(dǎo)致redis內(nèi)存占用突增。每次執(zhí)行僅對客戶端返回消息數(shù),從而降低網(wǎng)絡(luò)傳輸。
// 將到期的job遷移到ready隊列等待執(zhí)行,這里使用redis script實現(xiàn)
func migrateExpiredJobs(rdb *redis.Client, delaykey, readyKey string) error {
script := redis.NewScript(`
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1], 'limit', 0, 20)
if(next(val) ~= nil) then
redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)
redis.call('rpush', KEYS[2], unpack(val, 1, #val))
end
return #val
`)
return script.Run(context.Background(), rdb, []string{delaykey, readyKey}, time.Now().Unix()).Err()
}
七、Product實現(xiàn)
product功能比較單一,僅實現(xiàn)消息的投遞。這里是對Redis兩個方法RPUSH、ZADD的的封裝。
// QueueClient
type QueueClient struct {
queue *queue
}
QueueClient提供了 兩個操作方法 :
-
發(fā)送即時消息,針對非延遲執(zhí)行的消息直接投遞到ready隊列中,等待執(zhí)行;
// Dispatch
func (c *QueueClient) Dispatch(topic string, payload []byte) error {
return c.queue.Push(&Job{
Topic: topic,
Playload: payload,
Delay: 0,
Timestamp: time.Now().Unix(),
})
}
-
發(fā)起延遲消息,針對需要延遲執(zhí)行的消息這里支持秒級的延遲消息。這里底層使用的ZADD。
// DispatchDelaySecond
func (c *QueueClient) DispatchDelaySecond(topic string, payload []byte, delaySec int) error {
return c.queue.DelayJob(&Job{
Topic: topic,
Playload: payload,
Delay: int64(delaySec),
Timestamp: time.Now().Unix(),
})
}
八、consumer實現(xiàn)
在完成消息投遞的相關(guān)方法的實現(xiàn)后,我們來看下如何實現(xiàn)一個穩(wěn)定高效的消息處理框架。
consumer完成 兩個核心操作 :
-
將到期的消息,遷移到可執(zhí)行隊列中;
-
從可執(zhí)行隊列中取出消息完成相應(yīng)的處理。
接下來看下consumer queueserver的實現(xiàn)的主要方法。

1、QueueServer開始執(zhí)行的topic worker處理消息。一個topic一個協(xié)程的處理方式,保證不同的topic之間不相互影響;
// QueueServer
type QueueServer struct {
queueOption *QueueOption
topicwokers []*TopicWorker
queue *queue
stopCh chan struct{}
close uint32
}
// Run 開始處理消息直到收到退出命令
func (s *QueueServer) Run(ctx context.Context) error {
// 一個topic 一個協(xié)程處理
for _, topicWorker := range s.topicwokers {
go s.processJob(ctx, topicWorker)
}
// 監(jiān)聽系統(tǒng)信號
go s.watchSystemSignal(ctx)
// 等待退出
<-s.stopCh
ctxTimeOut, cancalFunc := context.WithTimeout(ctx, time.Second*time.Duration(s.queueOption.CloseWaitTime))
defer cancalFunc()
return s.Close(ctxTimeOut)
}
2、定義統(tǒng)一消息處理接口JobHandler,定義了兩個方法:Topic返回隊列名;Execute完成消息的處理,可以實現(xiàn)業(yè)務(wù)處理邏輯。
// JobHandler
type JobHandler interface {
Topic() string // 返回topic名稱
Execute(context.Context, []byte) error // 處理消息
}
通過topic worker來管理每個消息隊列的處理。
// TopicWorker
type TopicWorker struct {
TopicName string // topic 名稱
Handler JobHandler // 處理job的方法
WorkerCount int // 并行任務(wù)數(shù)
WorkerPool *semaphore.Weighted // 通過信號量控制并發(fā)協(xié)程數(shù)
}
通過信號量semaphore庫實現(xiàn)協(xié)程的并發(fā)數(shù)控制。如果消息間有順序要求,可以設(shè)置workerCount為1。當server退出的時候,獲取與workerCount相等數(shù)量的信號量等待所有處理消息的協(xié)程執(zhí)行完成。s.queue.GetReadyJob(topic.TopicName) 這個方法的內(nèi)部出了獲取可執(zhí)行消息外,還執(zhí)行了到期消息遷移的方法。
// GetReadyJob 遷移到期消息,返回可執(zhí)行消息
func (q *queue) GetReadyJob(topic string) (*Job, error) {
migrateExpiredJobs(q.rdb,
fmt.Sprintf("%s:%s", topic, DelayJobType),
fmt.Sprintf("%s:%s", topic, ReadyJobType))
return q.Pop(topic)
}
// processJob 處理消息
func (s *QueueServer) processJob(ctx context.Context, topic *TopicWorker) error {
// 循環(huán)獲取消息直到server退出
for {
// 判斷server是否退出
if atomic.LoadUint32(&s.close) == closed {
break
}
// 通過信號量控制并發(fā)的協(xié)程數(shù),正在運行的協(xié)程達到上限就等待
if err := topic.WorkerPool.Acquire(ctx, 1); err != nil {
return err
}
job, err := s.queue.GetReadyJob(topic.TopicName)
if err != nil && err != redis.Nil {
topic.WorkerPool.Release(1)
continue
}
// 沒有要執(zhí)行的job
if job == nil {
topic.WorkerPool.Release(1)
time.Sleep(time.Second * time.Duration(s.queueOption.WaitTime))
continue
}
go func() {
topic.Handler.Execute(ctx, job.Playload)
topic.WorkerPool.Release(1)
}()
}
// 等待所有處理消息的協(xié)程執(zhí)行完成
if err := topic.WorkerPool.Acquire(ctx, int64(topic.WorkerCount)); err != nil {
return err
}
return nil
}
九、擴展
(一)Job錯誤重試
如果想要給上面的Job處理加上錯誤重試的機制。我們給Job struct加上TryCount字段,當JobHandler 執(zhí)行返回error時,把job放入可執(zhí)行隊列或是延遲集合(等待指定時候后重試)。通過TryCount來限定重試的次數(shù)。超過指定次數(shù)后丟棄消息。
(二)Job超時重試
如果想要給上面的Job處理加上超時重試的機制。我們給Job struct加上TryTimeOut字段,當讀取消息的同時把job放入延遲集合(等待到達超時時間后重試)。通過TryCount來限定重試的次數(shù)。超過指定次數(shù)后丟棄消息。執(zhí)行成功時需要從set集合刪除。
十、總結(jié)
使用golang基于redis實現(xiàn)延遲隊列的方法如上所述,實現(xiàn)方式很多,但核心基本相同,可能在某些實現(xiàn)細節(jié)上略有差異。比如:
-
使用單獨的協(xié)程來完成到期消息到可執(zhí)行隊列的遷移;
-
使用redis stream來實現(xiàn)隊列。
熟悉php laravel框架的應(yīng)該覺得這個方案相似,本文的實現(xiàn)方案跟laravel里的queue庫實現(xiàn)方案類似,它支持更多的消息驅(qū)動:本地、文件、mysql、redis等。但我們借助golang可以實現(xiàn)的更高效消息處理框架。使用這種方式需要考慮消息丟失時的補償機制。
?作者簡介

王曉林
騰訊應(yīng)用開發(fā)工程師
騰訊應(yīng)用開發(fā)工程師,目前負責(zé)游可愛平臺(yka.qq.com)的后臺開發(fā)工作。
推薦閱讀
我為大家整理了一份 從入門到進階的Go學(xué)習(xí)資料禮包 ,包含學(xué)習(xí)建議:入門看什么,進階看什么。 關(guān)注公眾號 「polarisxu」,回復(fù)? ebook ?獲?。贿€可以回復(fù)「進群」,和數(shù)萬 Gopher 交流學(xué)習(xí)。
