<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          【GoCN酷Go推薦】異步任務(wù)隊列Asynq

          共 7687字,需瀏覽 16分鐘

           ·

          2021-06-18 00:10

          什么是Asynq

          Asynq是一個go語言實現(xiàn)的分布式任務(wù)隊列和異步處理庫,基于redis,類似sidekiqcelery,他具有以下特點:

          • 保證至少執(zhí)行一次任務(wù)
          • 持久化
          • 失敗重試
          • worker崩潰自動恢復(fù)
          • 優(yōu)先隊列
          • 暫停隊列
          • 支持中間件
          • 允許唯一任務(wù)
          • 支持Redis Cluster實現(xiàn)自動分片
          • 支持Redis Sentinels實現(xiàn)高可用
          • 提供web ui管理
          • 提供cli管理

          安裝
          go get -u github.com/hibiken/asynq
          // 命令行工具:
          go get -u github.com/hibiken/asynq/tools/asynq

          使用

          前提需要保證redis可用

          main.go Asynq服務(wù)端worker.go 處理程序asynq_test.go 模擬客戶端使用

          Asynq Server

          package main

          import (
           "context"
           "fmt"
           "log"
           "os"
           "os/signal"
           "time"

           "github.com/hibiken/asynq"
           "golang.org/x/sys/unix"
          )

          func main() {
           // asynq server
           srv := asynq.NewServer(
            asynq.RedisClientOpt{
             Addr:     ":6379",
             Password: "Your password",
             DB:       0,
            },
            asynq.Config{Concurrency: 20},
           )

           mux := asynq.NewServeMux()

           // some middlewares
           mux.Use(func(next asynq.Handler) asynq.Handler {
            return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
             // just record a log
             fmt.Println(fmt.Printf("[%s] log - %+v", time.Now().Format("2006-01-02 15:04:05"), t.Payload))

             return next.ProcessTask(ctx, t)
            })
           })

           // some workers
           mux.HandleFunc("msg", HandleMsg)

           // start server
           if err := srv.Start(mux); err != nil {
            log.Fatalf("could not start server: %v", err)
           }

           // Wait for termination signal.
           sigs := make(chan os.Signal, 1)
           signal.Notify(sigs, unix.SIGTERM, unix.SIGINT, unix.SIGTSTP)
           for {
            s := <-sigs
            if s == unix.SIGTSTP {
             srv.Quiet() // Stop processing new tasks
             continue
            }
            break
           }

           // Stop worker server.
           srv.Stop()
          }

          Asynq Workers

          package main

          import (
           "context"
           "fmt"

           "github.com/hibiken/asynq"
          )

          // HandleMsg 處理msg
          func HandleMsg(ctx context.Context, t *asynq.Task) (err error) {
           fmt.Println("------HandleMsg start------")

           message, _ := t.Payload.GetString("message")
           userid, _ := t.Payload.GetInt("user_id")

           fmt.Println(fmt.Printf("{message: \"%s\", user_id: %d}", message, userid))
           return
          }

          模擬使用

          package main

          import (
           "fmt"
           "os"
           "testing"
           "time"

           "github.com/hibiken/asynq"
          )

          var c *asynq.Client

          func TestMain(m *testing.M) {
           r := asynq.RedisClientOpt{
            Addr:     ":6379",
            Password: "Your password",
            DB:       0,
           }
           c = asynq.NewClient(r)
           ret := m.Run()
           c.Close()
           os.Exit(ret)
          }

          // 即時消費
          func Test_Enqueue(t *testing.T) {
           payload := map[string]interface{}{"user_id"1"message""i'm immediately message"}
           task := asynq.NewTask("msg", payload)
           res, err := c.Enqueue(task)
           if err != nil {
            t.Errorf("could not enqueue task: %v", err)
            t.FailNow()
           }
           fmt.Printf("Enqueued Result: %+v\n", res)
          }

          // 延時消費
          func Test_EnqueueDelay(t *testing.T) {
           payload := map[string]interface{}{"user_id"1"message""i'm delay 5 seconds message"}
           task := asynq.NewTask("msg", payload)
           res, err := c.Enqueue(task, asynq.ProcessIn(5*time.Second))
           // res, err := c.Enqueue(task, asynq.ProcessAt(time.Now().Add(5*time.Second)))
           if err != nil {
            t.Errorf("could not enqueue task: %v", err)
            t.FailNow()
           }
           fmt.Printf("Enqueued Result: %+v\n", res)
          }

          // 超時、重試、過期
          func Test_EnqueueOther(t *testing.T) {
           payload := map[string]interface{}{"user_id"1"message""i'm delay 5 seconds message"}
           task := asynq.NewTask("msg", payload)
           // 10秒超時,最多重試3次,20秒后過期
           res, err := c.Enqueue(task, asynq.MaxRetry(3), asynq.Timeout(10*time.Second), asynq.Deadline(time.Now().Add(20*time.Second)))
           if err != nil {
            t.Errorf("could not enqueue task: %v", err)
            t.FailNow()
           }
           fmt.Printf("Enqueued Result: %+v\n", res)
          }

          如何測試

          先將服務(wù)運行起來

          $ go version
          go version go1.16 darwin/amd64
          $ go run .

          運行指定測試

          $ go test -timeout 30s -run ^Test_Enqueue$ asynq_test -v -count=1

          === RUN   Test_Enqueue
          Enqueued Result: &{ID:683d8f36-f8c5-49c0-88b4-f1aefa7686de EnqueuedAt:2021-06-11 10:41:49.018475 +0000 UTC ProcessAt:2021-06-11 18:41:49.017778 +0800 CST m=+0.000892619 Retry:25 Queue:default Timeout:30m0s Deadline:1970-01-01 08:00:00 +0800 CST}
          --- PASS: Test_Enqueue (0.00s)
          PASS
          ok   asynq_test 0.009s

          隊列管理

          Asynq提供了webui 和 命令行工具asynq

          webui

          Asynqmon webui在這個倉庫里

          $ ./asynqmon --port=3000 --redis-addr=localhost:6380

          img

          asynq命令行

          $ asynq -p Yourpassword stats
          Task Count by State
          active      pending   scheduled  retry  archived
          ----------  --------  ---------  -----  ----
          0           0         0          0      0

          Task Count by Queue
          default
          -------
          0

          Daily Stats 2021-06-11 UTC
          processed  failed  error rate
          ---------  ------  ----------
          4          0       0.00%

          Redis Info
          version  uptime  connections  memory usage  peak memory usage
          -------  ------  -----------  ------------  -----------------
          6.2.0    0 days  5            16.04MB       16.14MB

          更多閱讀

          完整代碼官方文檔:https://github.com/hibiken/asynq


          還想了解更多嗎?

          更多請查看:https://github.com/hibiken/asynq 

          歡迎加入我們GOLANG中國社區(qū):https://gocn.vip/


          《酷Go推薦》招募:


          各位Gopher同學,最近我們社區(qū)打算推出一個類似GoCN每日新聞的新欄目《酷Go推薦》,主要是每周推薦一個庫或者好的項目,然后寫一點這個庫使用方法或者優(yōu)點之類的,這樣可以真正的幫助到大家能夠?qū)W習到

          新的庫,并且知道怎么用。


          大概規(guī)則和每日新聞類似,如果報名人多的話每個人一個月輪到一次,歡迎大家報名!(報名地址:https://wj.qq.com/s2/7734329/3f51)


          掃碼也可以加入 GoCN 的大家族喲~


           Gopher China2021大會日程詳情來了!



          點擊下方「閱讀原文」即可報名參加大會


          瀏覽 210
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  色哟哟av网址 | 97精品超碰一区二区三区 | 亚洲黄色视频网址 | 综合久久黄色 | 精品九九九九九九九九九屄 |