<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          【GoCN酷Go推薦】分布式任務(wù) + 消息隊(duì)列框架 go-queue

          共 6295字,需瀏覽 13分鐘

           ·

          2021-03-22 12:42

          目錄



          1.為什么寫這個(gè)庫


          2.應(yīng)用場景有哪些


          3.如何使用


          4.總結(jié)


           為什么要寫這個(gè)庫?


          在開始自研 go-queue 之前,針對以下我們調(diào)研目前的開源隊(duì)列方案:

          beanstalkd

          beanstalkd 有一些特殊好用功能:支持任務(wù) priority、延時(shí) (delay)、超時(shí)重發(fā) (time-to-run) 和預(yù)留 (buried),能夠很好的支持分布式的后臺任務(wù)和定時(shí)任務(wù)處理。如下是 beanstalkd 基本部分:

          • job:任務(wù)單元;
          • tube:任務(wù)隊(duì)列,存儲統(tǒng)一類型 job。producer 和 consumer 操作對象;
          • producerjob 生產(chǎn)者,通過 put 將 job 加入一個(gè) tube;
          • consumerjob 消費(fèi)者,通過 reserve/release/bury/delete 來獲取 job 或改變 job 的狀態(tài);

          很幸運(yùn)的是官方提供了 go client:https://github.com/beanstalkd/go-beanstalk。

          但是這對不熟悉 beanstalkd 操作的 go 開發(fā)者而言,需要學(xué)習(xí)成本。

          kafka

          類似基于 kafka 消息隊(duì)列作為存儲的方案,存儲單元是消息,如果要實(shí)現(xiàn)延時(shí)執(zhí)行,可以想到的方案是以延時(shí)執(zhí)行的時(shí)間作為 topic,這樣在大型的消息系統(tǒng)中,充斥大量一次性的 topicdq_1616324404788, dq_1616324417622),當(dāng)時(shí)間分散,會容易造成磁盤隨機(jī)寫的情況。

          而且在 go 生態(tài)中,

          同時(shí)考慮以下因素:

          • 支持延時(shí)任務(wù)
          • 高可用,保證數(shù)據(jù)不丟失
          • 可擴(kuò)展資源和性能

          所以我們自己基于以上兩個(gè)基礎(chǔ)組件開發(fā)了 go-queue

          1. 基于 beanstalkd 開發(fā)了 dq,支持定時(shí)和延時(shí)操作。同時(shí)加入 redis 保證消費(fèi)唯一性。
          2. 基于 kafka 開發(fā)了 kq,簡化生產(chǎn)者和消費(fèi)者的開發(fā) API,同時(shí)在寫入 kafka 使用批量寫,節(jié)省 IO。

          整體設(shè)計(jì)如下:

          應(yīng)用場景

          首先在消費(fèi)場景來說,一個(gè)是針對任務(wù)隊(duì)列,一個(gè)是消息隊(duì)列。而兩者最大的區(qū)別:

          • 任務(wù)是沒有順序約束;消息需要;
          • 任務(wù)在加入中,或者是等待中,可能存在狀態(tài)更新(或是取消);消息則是單一的存儲即可;

          所以在背后的基礎(chǔ)設(shè)施選型上,也是基于這種消費(fèi)場景。

          • dq:依賴于beanstalkd ,適合延時(shí)、定時(shí)任務(wù)執(zhí)行;
          • kq:依賴于 kafka ,適用于異步、批量任務(wù)執(zhí)行;

          而從其中 dq的 API 中也可以看出:

          // 延遲任務(wù)執(zhí)行
          - dq.Delay(msg, delayTime);

          // 定時(shí)任務(wù)執(zhí)行
          - dq.At(msg, atTime);

          而在我們內(nèi)部:

          • 如果是 異步消息消費(fèi)/推送 ,則會選擇使用kqkq.Push(msg)
          • 如果是 15 分鐘提醒/ 明天中午發(fā)送短信 等,則使用 dq

          如何使用

          分別介紹dqkq的使用方式:

          dq

          // [Producer]
          producer := dq.NewProducer([]dq.Beanstalk{
              {
                  Endpoint: "localhost:11300",
                  Tube:     "tube",
              },
              {
                  Endpoint: "localhost:11301",
                  Tube:     "tube",
              },
          })  

          for i := 1000; i < 1005; i++ {
              _, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second*5)
              if err != nil {
                  fmt.Println(err)
              }
          }
          // [Consumer]
          consumer := dq.NewConsumer(dq.DqConf{
            Beanstalks: []dq.Beanstalk{
              {
                Endpoint: "localhost:11300",
                Tube:     "tube",
              },
              {
                Endpoint: "localhost:11301",
                Tube:     "tube",
              },
            },
            Redis: redis.RedisConf{
              Host: "localhost:6379",
              Type: redis.NodeType,
            },
          })
          consumer.Consume(func(body []byte) {
            // your consume logic
            fmt.Println(string(body))
          })

          和普通的 生產(chǎn)者 - 消費(fèi)者 模型類似,開發(fā)者也只需要關(guān)注以下:

          1. 開發(fā)者只需要關(guān)注自己的任務(wù)類型「延時(shí)/定時(shí)」
          2. 消費(fèi)端的消費(fèi)邏輯

          kq

          producer.go:

          // message structure
          type message struct {
              Key     string `json:"key"`
              Value   string `json:"value"`
              Payload string `json:"message"`
          }

          pusher := kq.NewPusher([]string{
              "127.0.0.1:19092",
              "127.0.0.1:19093",
              "127.0.0.1:19094",
          }, "kq")

          ticker := time.NewTicker(time.Millisecond)
          for round := 0; round < 3; round++ {
              select {
              case <-ticker.C:
                  count := rand.Intn(100)
              // 準(zhǔn)備消息
                  m := message{
                      Key:     strconv.FormatInt(time.Now().UnixNano(), 10),
                      Value:   fmt.Sprintf("%d,%d", round, count),
                      Payload: fmt.Sprintf("%d,%d", round, count),
                  }
                  body, err := json.Marshal(m)
                  if err != nil {
                      log.Fatal(err)
                  }

                  fmt.Println(string(body))
              // push to kafka broker
                  if err := pusher.Push(string(body)); err != nil {
                      log.Fatal(err)
                  }
              }
          }

          config.yaml

          Name: kq
          Brokers:
            - 127.0.0.1:19092
            - 127.0.0.1:19092
            - 127.0.0.1:19092
          Group: adhoc
          Topic: kq
          Offset: first
          Consumers: 1

          consumer.go:

          var c kq.KqConf
          conf.MustLoad("config.yaml", &c)

          // WithHandle: 具體的處理msg的logic
          // 這也是開發(fā)者需要根據(jù)自己的業(yè)務(wù)定制化
          q := kq.MustNewQueue(c, kq.WithHandle(func(k, v string) error {
            fmt.Printf("=> %s\n", v)
            return nil
          }))
          defer q.Stop()
          q.Start()

          dq 不同的是:開發(fā)者不需要關(guān)注任務(wù)類型(在這里也沒有任務(wù)的概念,傳遞的都是 message data)。

          其他操作和 dq 類似,只是將 業(yè)務(wù)處理函數(shù) 當(dāng)成配置直接傳入消費(fèi)者中。

          總結(jié) 

          在我們目前的場景中,kq 大量使用在我們的異步消息服務(wù);而延時(shí)任務(wù),我們除了 dq,還可以使用內(nèi)存版的 TimingWheel「go-zero 生態(tài)組件」。

          關(guān)于 go-queue 更多的設(shè)計(jì)和實(shí)現(xiàn)文章,可以持續(xù)關(guān)注我們。歡迎大家去關(guān)注和使用。

          https://github.com/tal-tech/go-queue

          https://github.com/tal-tech/go-zero

          歡迎使用 go-zero 并 star 支持我們!

          還想了解更多嗎?

          更多請查看:https://github.com/tidwall/gjson

          歡迎加入我們GOLANG中國社區(qū):https://gocn.vip/


          《酷Go推薦》招募:


          各位Gopher同學(xué),最近我們社區(qū)打算推出一個(gè)類似GoCN每日新聞的新欄目《酷Go推薦》,主要是每周推薦一個(gè)庫或者好的項(xiàng)目,然后寫一點(diǎn)這個(gè)庫使用方法或者優(yōu)點(diǎn)之類的,這樣可以真正的幫助到大家能夠?qū)W習(xí)到新的庫,并且知道怎么用。


          大概規(guī)則和每日新聞類似,如果報(bào)名人多的話每個(gè)人一個(gè)月輪到一次,歡迎大家報(bào)名!


          點(diǎn)擊 閱讀原文 即刻報(bào)名




          瀏覽 36
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  欧美性爱乱 | 黄色做爱视频网站 | 精品无码一区二区三区的天堂 | 人人前后碰 | 欧美精品第一页 |