<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          使用 Asynq 實(shí)現(xiàn) Go 異步任務(wù)處理

          共 4483字,需瀏覽 9分鐘

           ·

          2023-05-16 22:14

          # 1. 介紹

          Asynq 是一個(gè) Go 庫,用于對(duì)任務(wù)進(jìn)行排隊(duì)并與工作人員異步處理它們。

          它的工作原理:

          • 客戶端將任務(wù)放入隊(duì)列

          • 服務(wù)器從隊(duì)列中拉出任務(wù)并為每個(gè)任務(wù)啟動(dòng)一個(gè)工作 goroutine

          • 多個(gè)工作人員同時(shí)處理任務(wù)

          倉庫鏈接:https://github.com/hibiken/asynq)

          # 2. 快速開始

          ?2.1 準(zhǔn)備工作

          確保已安裝并運(yùn)行了redis

          redis-server

          安裝asynq軟件包

          go?get?-u?github.com/hibiken/asynq

          創(chuàng)建項(xiàng)目asynq_task,目錄結(jié)構(gòu):

          8ef0ceae3579374df568a1fe83bb3c81.webp

          ?2.2 Redis連接項(xiàng)

          Asynq 使用 Redis 作為消息代理。
          client.go 和 main.go 都需要連接到 Redis 進(jìn)行寫入和讀取。
          我們將使用 RedisClientOpt 指定如何連接到本地 Redis 實(shí)例。

          asynq.RedisClientOpt{
          ????Addr:?????"127.0.0.1:6379",
          ????Password:?"",
          ????DB:???????2,
          }

          ?2.3 Task任務(wù)

          type?Task?struct?{
          ????//?一個(gè)簡(jiǎn)單的字符串值,表示要執(zhí)行的任務(wù)的類型.
          ????typename?string

          ????//?有效載荷保存執(zhí)行任務(wù)所需的數(shù)據(jù),有效負(fù)載值必須是可序列化的.
          ????payload?[]byte

          ????//?保存任務(wù)的選項(xiàng).
          ????opts?[]Option

          ????//?任務(wù)的結(jié)果編寫器.
          ????w?*ResultWriter
          }

          ?2.4 編寫程序

          test_delivery.go : 一個(gè)封裝任務(wù)創(chuàng)建和任務(wù)處理的包

          package?test_delivery

          import?(
          ????"context"
          ????"encoding/json"
          ????"fmt"
          ????"github.com/hibiken/asynq"
          ????"log"
          )

          const?(
          ????TypeEmailDelivery?=?"email:deliver"
          )

          type?EmailDeliveryPayload?struct?{
          ????UserID?????int
          ????TemplateID?string
          }

          func?NewEmailDeliveryTask(userID?int,?tmplID?string)?(*asynq.Task,?error)?{
          ????payload,?err?:=?json.Marshal(EmailDeliveryPayload{UserID:?userID,?TemplateID:?tmplID})
          ????if?err?!=?nil?{
          ????????fmt.Println(err)
          ????????return?nil,?err
          ????}
          ????return?asynq.NewTask(TypeEmailDelivery,?payload),?nil
          }

          func?HandleEmailDeliveryTask(ctx?context.Context,?t?*asynq.Task)?error?{
          ????var?p?EmailDeliveryPayload
          ????if?err?:=?json.Unmarshal(t.Payload(),?&p);?err?!=?nil?{
          ????????return?fmt.Errorf("json.Unmarshal?failed:?%v:?%w",?err,?asynq.SkipRetry)
          ????}
          ????//邏輯處理start...
          ????log.Printf("Sending?Email?to?User:?user_id=%d,?template_id=%s",?p.UserID,?p.TemplateID)
          ????return?nil
          }

          client.go: 在應(yīng)用程序代碼中,導(dǎo)入上述包并用于Client將任務(wù)放入隊(duì)列中。

          package?client

          import?(
          ????"asynq_task/test_delivery"
          ????"github.com/hibiken/asynq"
          ????"log"
          )

          func?EmailDeliveryTaskAdd()?{
          ????client?:=?asynq.NewClient(asynq.RedisClientOpt{
          ????????Addr:?????"127.0.0.1:6379",
          ????????Password:?"",
          ????????DB:???????2,
          ????})
          ????defer?client.Close()

          ????task,?err?:=?test_delivery.NewEmailDeliveryTask(42,?"some:template:id")
          ????if?err?!=?nil?{
          ????????log.Fatalf("could?not?create?task:?%v",?err)
          ????}
          ????info,?err?:=?client.Enqueue(task)
          ????if?err?!=?nil?{
          ????????log.Fatalf("could?not?enqueue?task:?%v",?err)
          ????}
          ????log.Printf("enqueued?task:?id=%s?queue=%s",?info.ID,?info.Queue)
          }

          main.go: ?異步任務(wù)服務(wù)入口文件

          接下來,啟動(dòng)一個(gè)工作服務(wù)器以在后臺(tái)處理這些任務(wù)。要啟動(dòng)后臺(tái)工作人員,使用Server并提供您Handler來處理任務(wù)。可以選擇使用ServeMux來創(chuàng)建處理程序,就像使用net/httpHandler 一樣。

          package?main

          import?(
          ????"asynq_task/test_delivery"
          ????"github.com/hibiken/asynq"
          ????"log"
          )

          func?main()?{
          ????srv?:=?asynq.NewServer(
          ????????asynq.RedisClientOpt{
          ????????????Addr:?????"127.0.0.1:6379",
          ????????????Password:?"",
          ????????????DB:???????2,
          ????????},
          ????????asynq.Config{
          ????????????//?每個(gè)進(jìn)程并發(fā)執(zhí)行的worker數(shù)量
          ????????????Concurrency:?5,
          ????????????//?Optionally?specify?multiple?queues?with?different?priority.
          ????????????Queues:?map[string]int{
          ????????????????"critical":?6,
          ????????????????"default":??3,
          ????????????????"low":??????1,
          ????????????},
          ????????????//?See?the?godoc?for?other?configuration?options
          ????????},
          ????)

          ????mux?:=?asynq.NewServeMux()
          ????mux.HandleFunc(test_delivery.TypeEmailDelivery,?test_delivery.HandleEmailDeliveryTask)

          ????if?err?:=?srv.Run(mux);?err?!=?nil?{
          ????????log.Fatalf("could?not?run?server:?%v",?err)
          ????}
          }

          Asynq 是一個(gè) Go 庫(https://github.com/hibiken/asynq),用于對(duì)任務(wù)進(jìn)行排隊(duì)并與工作人員異步處理它們。用來分發(fā)異步任務(wù)

          package?main

          import?(
          ????"asynq_task/test_delivery/client"
          ????"time"
          )

          func?main()?{
          ????for?i?:=?0;?i?<?3;?i++?{
          ????????client.EmailDeliveryTaskAdd()
          ????????time.Sleep(time.Second?*?3)
          ????}
          }

          ?5. 運(yùn)行查看結(jié)果

          1. 首先,我們要先把異步任務(wù)啟動(dòng)起來準(zhǔn)備好接收,也就是啟動(dòng)cmd/main.go

          2. 啟動(dòng)test.go文件向異步任務(wù)服務(wù)添加任務(wù)隊(duì)列

          結(jié)果如下:

          go run main.go

          7b7222d4d27f4eba43290588c248db66.webpgo run test.go

          1c4d0d4cab2028a8e241a77afbb9b965.webp

          # 3. 細(xì)節(jié)

          ?3.1 關(guān)于asynq的優(yōu)雅退出

          如果異步服務(wù)突然被暫停,正在執(zhí)行的異步任務(wù)會(huì)push到隊(duì)列中,下次啟動(dòng)的時(shí)候自動(dòng)執(zhí)行。

          我們可以將一個(gè)異步任務(wù)中途sleep幾秒,發(fā)送一個(gè)異步任務(wù),任務(wù)沒執(zhí)行完中途停掉任務(wù)測(cè)試出結(jié)果:

          f4694ef8307b16684375796a33b49732.webp

          再次啟動(dòng)異步任務(wù)服務(wù),發(fā)現(xiàn)這個(gè)任務(wù)被重新執(zhí)行。

          ?3.2 client中 client.Enqueue 的使用

          1) 立即處理任務(wù)

          client.Enqueue(t1,?time.Now())

          2)延時(shí)處理任務(wù), 兩小時(shí)后處理

          client.Enqueue(t2,?asynq.ProcessIn(time.Now().Add(2?*?time.Hour)))

          3) 任務(wù)重試,最大重試次數(shù)為25次。

          client.Enqueue(task,?asynq.MaxRetry(5))

          4)確保任務(wù)的唯一性

          4-1:使用TaskID選項(xiàng):自行生成唯一的任務(wù) ID

          _,?err?:=?client.Enqueue(task,?asynq.TaskID("mytaskid"))

          //?Second?task?will?fail,?err?is?ErrTaskIDConflict?(assuming?that?the?first?task?didn't?get?processed?yet)
          _,?err?=?client.Enqueue(task,?asynq.TaskID("mytaskid"))

          4-2:使用Unique選項(xiàng):讓 Asynq 為任務(wù)創(chuàng)建唯一性鎖

          err?:=?c.Enqueue(t1,?asynq.Unique(time.Hour))

          另外,asynq異步任務(wù)提供了命令行工具和Asynqmon用于監(jiān)控和管理Asynq異步任務(wù)和隊(duì)列。WebUI可以通過傳遞兩個(gè)標(biāo)志來啟用與 Prometheus 的集成。

          作者:sweey_lff原文鏈接:https://huaweicloud.csdn.net/637ef508df016f70ae4ca586.html?

          ? ?

          0271cff4d475d6182e3be8db6e4c2843.webp
          喜歡明哥文章的同學(xué)歡迎長(zhǎng)按下圖訂閱!

          ???

          瀏覽 111
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲AV一二三区 | 成人91久久 | 青娱乐在线播放视频 | 一级无线免费视频 | 亚洲精品一二三四区 |