【GoCN酷Go推薦】異步任務(wù)隊列Asynq
什么是Asynq
Asynq是一個go語言實現(xiàn)的分布式任務(wù)隊列和異步處理庫,基于redis,類似sidekiq和celery,他具有以下特點:
保證至少執(zhí)行一次任務(wù) 持久化 失敗重試 worker崩潰自動恢復(fù) 優(yōu)先隊列 暫停隊列 支持中間件 允許唯一任務(wù) 支持Redis Cluster實現(xiàn)自動分片 支持Redis Sentinels實現(xiàn)高可用 提供web ui管理 提供cli管理
go get -u github.com/hibiken/asynq
// 命令行工具:
go get -u github.com/hibiken/asynq/tools/asynq
使用
前提需要保證redis可用
main.go Asynq服務(wù)端worker.go 處理程序asynq_test.go 模擬客戶端使用
Asynq Server
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"time"
"github.com/hibiken/asynq"
"golang.org/x/sys/unix"
)
func main() {
// asynq server
srv := asynq.NewServer(
asynq.RedisClientOpt{
Addr: ":6379",
Password: "Your password",
DB: 0,
},
asynq.Config{Concurrency: 20},
)
mux := asynq.NewServeMux()
// some middlewares
mux.Use(func(next asynq.Handler) asynq.Handler {
return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
// just record a log
fmt.Println(fmt.Printf("[%s] log - %+v", time.Now().Format("2006-01-02 15:04:05"), t.Payload))
return next.ProcessTask(ctx, t)
})
})
// some workers
mux.HandleFunc("msg", HandleMsg)
// start server
if err := srv.Start(mux); err != nil {
log.Fatalf("could not start server: %v", err)
}
// Wait for termination signal.
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, unix.SIGTERM, unix.SIGINT, unix.SIGTSTP)
for {
s := <-sigs
if s == unix.SIGTSTP {
srv.Quiet() // Stop processing new tasks
continue
}
break
}
// Stop worker server.
srv.Stop()
}
Asynq Workers
package main
import (
"context"
"fmt"
"github.com/hibiken/asynq"
)
// HandleMsg 處理msg
func HandleMsg(ctx context.Context, t *asynq.Task) (err error) {
fmt.Println("------HandleMsg start------")
message, _ := t.Payload.GetString("message")
userid, _ := t.Payload.GetInt("user_id")
fmt.Println(fmt.Printf("{message: \"%s\", user_id: %d}", message, userid))
return
}
模擬使用
package main
import (
"fmt"
"os"
"testing"
"time"
"github.com/hibiken/asynq"
)
var c *asynq.Client
func TestMain(m *testing.M) {
r := asynq.RedisClientOpt{
Addr: ":6379",
Password: "Your password",
DB: 0,
}
c = asynq.NewClient(r)
ret := m.Run()
c.Close()
os.Exit(ret)
}
// 即時消費
func Test_Enqueue(t *testing.T) {
payload := map[string]interface{}{"user_id": 1, "message": "i'm immediately message"}
task := asynq.NewTask("msg", payload)
res, err := c.Enqueue(task)
if err != nil {
t.Errorf("could not enqueue task: %v", err)
t.FailNow()
}
fmt.Printf("Enqueued Result: %+v\n", res)
}
// 延時消費
func Test_EnqueueDelay(t *testing.T) {
payload := map[string]interface{}{"user_id": 1, "message": "i'm delay 5 seconds message"}
task := asynq.NewTask("msg", payload)
res, err := c.Enqueue(task, asynq.ProcessIn(5*time.Second))
// res, err := c.Enqueue(task, asynq.ProcessAt(time.Now().Add(5*time.Second)))
if err != nil {
t.Errorf("could not enqueue task: %v", err)
t.FailNow()
}
fmt.Printf("Enqueued Result: %+v\n", res)
}
// 超時、重試、過期
func Test_EnqueueOther(t *testing.T) {
payload := map[string]interface{}{"user_id": 1, "message": "i'm delay 5 seconds message"}
task := asynq.NewTask("msg", payload)
// 10秒超時,最多重試3次,20秒后過期
res, err := c.Enqueue(task, asynq.MaxRetry(3), asynq.Timeout(10*time.Second), asynq.Deadline(time.Now().Add(20*time.Second)))
if err != nil {
t.Errorf("could not enqueue task: %v", err)
t.FailNow()
}
fmt.Printf("Enqueued Result: %+v\n", res)
}
如何測試
先將服務(wù)運行起來
$ go version
go version go1.16 darwin/amd64
$ go run .
運行指定測試
$ go test -timeout 30s -run ^Test_Enqueue$ asynq_test -v -count=1
=== RUN Test_Enqueue
Enqueued Result: &{ID:683d8f36-f8c5-49c0-88b4-f1aefa7686de EnqueuedAt:2021-06-11 10:41:49.018475 +0000 UTC ProcessAt:2021-06-11 18:41:49.017778 +0800 CST m=+0.000892619 Retry:25 Queue:default Timeout:30m0s Deadline:1970-01-01 08:00:00 +0800 CST}
--- PASS: Test_Enqueue (0.00s)
PASS
ok asynq_test 0.009s
隊列管理
Asynq提供了webui 和 命令行工具asynq
webui
Asynqmon webui在這個倉庫里
$ ./asynqmon --port=3000 --redis-addr=localhost:6380

asynq命令行
$ asynq -p Yourpassword stats
Task Count by State
active pending scheduled retry archived
---------- -------- --------- ----- ----
0 0 0 0 0
Task Count by Queue
default
-------
0
Daily Stats 2021-06-11 UTC
processed failed error rate
--------- ------ ----------
4 0 0.00%
Redis Info
version uptime connections memory usage peak memory usage
------- ------ ----------- ------------ -----------------
6.2.0 0 days 5 16.04MB 16.14MB
完整代碼官方文檔:https://github.com/hibiken/asynq
更多請查看:https://github.com/hibiken/asynq
歡迎加入我們GOLANG中國社區(qū):https://gocn.vip/
《酷Go推薦》招募:
各位Gopher同學,最近我們社區(qū)打算推出一個類似GoCN每日新聞的新欄目《酷Go推薦》,主要是每周推薦一個庫或者好的項目,然后寫一點這個庫使用方法或者優(yōu)點之類的,這樣可以真正的幫助到大家能夠?qū)W習到
新的庫,并且知道怎么用。
大概規(guī)則和每日新聞類似,如果報名人多的話每個人一個月輪到一次,歡迎大家報名!(報名地址:https://wj.qq.com/s2/7734329/3f51)
掃碼也可以加入 GoCN 的大家族喲~
Gopher China2021大會日程詳情來了!
點擊下方「閱讀原文」即可報名參加大會
評論
圖片
表情
