【GoCN酷Go推薦】分布式任務(wù) + 消息隊(duì)列框架 go-queue
目錄
1.為什么寫這個(gè)庫
2.應(yīng)用場景有哪些
3.如何使用
4.總結(jié)
為什么要寫這個(gè)庫?
在開始自研 go-queue 之前,針對以下我們調(diào)研目前的開源隊(duì)列方案:
beanstalkd
beanstalkd 有一些特殊好用功能:支持任務(wù) priority、延時(shí) (delay)、超時(shí)重發(fā) (time-to-run) 和預(yù)留 (buried),能夠很好的支持分布式的后臺任務(wù)和定時(shí)任務(wù)處理。如下是 beanstalkd 基本部分:
job:任務(wù)單元;tube:任務(wù)隊(duì)列,存儲統(tǒng)一類型job。producer 和 consumer 操作對象;producer:job生產(chǎn)者,通過 put 將 job 加入一個(gè) tube;consumer:job消費(fèi)者,通過 reserve/release/bury/delete 來獲取 job 或改變 job 的狀態(tài);
很幸運(yùn)的是官方提供了 go client:https://github.com/beanstalkd/go-beanstalk。
但是這對不熟悉 beanstalkd 操作的 go 開發(fā)者而言,需要學(xué)習(xí)成本。
kafka
類似基于 kafka 消息隊(duì)列作為存儲的方案,存儲單元是消息,如果要實(shí)現(xiàn)延時(shí)執(zhí)行,可以想到的方案是以延時(shí)執(zhí)行的時(shí)間作為 topic,這樣在大型的消息系統(tǒng)中,充斥大量一次性的 topic(dq_1616324404788, dq_1616324417622),當(dāng)時(shí)間分散,會容易造成磁盤隨機(jī)寫的情況。
而且在 go 生態(tài)中,
同時(shí)考慮以下因素:
支持延時(shí)任務(wù) 高可用,保證數(shù)據(jù)不丟失 可擴(kuò)展資源和性能
所以我們自己基于以上兩個(gè)基礎(chǔ)組件開發(fā)了 go-queue:
基于 beanstalkd開發(fā)了dq,支持定時(shí)和延時(shí)操作。同時(shí)加入redis保證消費(fèi)唯一性。基于 kafka開發(fā)了kq,簡化生產(chǎn)者和消費(fèi)者的開發(fā) API,同時(shí)在寫入 kafka 使用批量寫,節(jié)省 IO。
整體設(shè)計(jì)如下:

應(yīng)用場景
首先在消費(fèi)場景來說,一個(gè)是針對任務(wù)隊(duì)列,一個(gè)是消息隊(duì)列。而兩者最大的區(qū)別:
任務(wù)是沒有順序約束;消息需要; 任務(wù)在加入中,或者是等待中,可能存在狀態(tài)更新(或是取消);消息則是單一的存儲即可;
所以在背后的基礎(chǔ)設(shè)施選型上,也是基于這種消費(fèi)場景。
dq:依賴于beanstalkd,適合延時(shí)、定時(shí)任務(wù)執(zhí)行;kq:依賴于kafka,適用于異步、批量任務(wù)執(zhí)行;
而從其中 dq的 API 中也可以看出:
// 延遲任務(wù)執(zhí)行
- dq.Delay(msg, delayTime);
// 定時(shí)任務(wù)執(zhí)行
- dq.At(msg, atTime);
而在我們內(nèi)部:
如果是 異步消息消費(fèi)/推送 ,則會選擇使用 kq:kq.Push(msg);如果是 15 分鐘提醒/ 明天中午發(fā)送短信 等,則使用 dq;
如何使用
分別介紹dq和 kq的使用方式:
dq
// [Producer]
producer := dq.NewProducer([]dq.Beanstalk{
{
Endpoint: "localhost:11300",
Tube: "tube",
},
{
Endpoint: "localhost:11301",
Tube: "tube",
},
})
for i := 1000; i < 1005; i++ {
_, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second*5)
if err != nil {
fmt.Println(err)
}
}
// [Consumer]
consumer := dq.NewConsumer(dq.DqConf{
Beanstalks: []dq.Beanstalk{
{
Endpoint: "localhost:11300",
Tube: "tube",
},
{
Endpoint: "localhost:11301",
Tube: "tube",
},
},
Redis: redis.RedisConf{
Host: "localhost:6379",
Type: redis.NodeType,
},
})
consumer.Consume(func(body []byte) {
// your consume logic
fmt.Println(string(body))
})
和普通的 生產(chǎn)者 - 消費(fèi)者 模型類似,開發(fā)者也只需要關(guān)注以下:
開發(fā)者只需要關(guān)注自己的任務(wù)類型「延時(shí)/定時(shí)」 消費(fèi)端的消費(fèi)邏輯
kq
producer.go:
// message structure
type message struct {
Key string `json:"key"`
Value string `json:"value"`
Payload string `json:"message"`
}
pusher := kq.NewPusher([]string{
"127.0.0.1:19092",
"127.0.0.1:19093",
"127.0.0.1:19094",
}, "kq")
ticker := time.NewTicker(time.Millisecond)
for round := 0; round < 3; round++ {
select {
case <-ticker.C:
count := rand.Intn(100)
// 準(zhǔn)備消息
m := message{
Key: strconv.FormatInt(time.Now().UnixNano(), 10),
Value: fmt.Sprintf("%d,%d", round, count),
Payload: fmt.Sprintf("%d,%d", round, count),
}
body, err := json.Marshal(m)
if err != nil {
log.Fatal(err)
}
fmt.Println(string(body))
// push to kafka broker
if err := pusher.Push(string(body)); err != nil {
log.Fatal(err)
}
}
}
config.yaml:
Name: kq
Brokers:
- 127.0.0.1:19092
- 127.0.0.1:19092
- 127.0.0.1:19092
Group: adhoc
Topic: kq
Offset: first
Consumers: 1
consumer.go:
var c kq.KqConf
conf.MustLoad("config.yaml", &c)
// WithHandle: 具體的處理msg的logic
// 這也是開發(fā)者需要根據(jù)自己的業(yè)務(wù)定制化
q := kq.MustNewQueue(c, kq.WithHandle(func(k, v string) error {
fmt.Printf("=> %s\n", v)
return nil
}))
defer q.Stop()
q.Start()
和 dq 不同的是:開發(fā)者不需要關(guān)注任務(wù)類型(在這里也沒有任務(wù)的概念,傳遞的都是 message data)。
其他操作和 dq 類似,只是將 業(yè)務(wù)處理函數(shù) 當(dāng)成配置直接傳入消費(fèi)者中。
總結(jié)
在我們目前的場景中,kq 大量使用在我們的異步消息服務(wù);而延時(shí)任務(wù),我們除了 dq,還可以使用內(nèi)存版的 TimingWheel「go-zero 生態(tài)組件」。
關(guān)于 go-queue 更多的設(shè)計(jì)和實(shí)現(xiàn)文章,可以持續(xù)關(guān)注我們。歡迎大家去關(guān)注和使用。
https://github.com/tal-tech/go-queue
https://github.com/tal-tech/go-zero
歡迎使用 go-zero 并 star 支持我們!
還想了解更多嗎?
更多請查看:https://github.com/tidwall/gjson
歡迎加入我們GOLANG中國社區(qū):https://gocn.vip/
《酷Go推薦》招募:
各位Gopher同學(xué),最近我們社區(qū)打算推出一個(gè)類似GoCN每日新聞的新欄目《酷Go推薦》,主要是每周推薦一個(gè)庫或者好的項(xiàng)目,然后寫一點(diǎn)這個(gè)庫使用方法或者優(yōu)點(diǎn)之類的,這樣可以真正的幫助到大家能夠?qū)W習(xí)到新的庫,并且知道怎么用。
大概規(guī)則和每日新聞類似,如果報(bào)名人多的話每個(gè)人一個(gè)月輪到一次,歡迎大家報(bào)名!
點(diǎn)擊 閱讀原文 即刻報(bào)名
