<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          還在用crontab? 分布式定時(shí)任務(wù)了解一下

          共 4762字,需瀏覽 10分鐘

           ·

          2021-03-18 22:19

          前言

          日常任務(wù)開(kāi)放中,我們會(huì)有很多異步、批量、定時(shí)、延遲任務(wù)要處理,go-zero中有 go-queue,推薦使用 go-queue 去處理,go-queue 本身也是基于 go-zero 開(kāi)發(fā)的,其本身是有兩種模式:

          • dq:依賴于beanstalkd ,適合延時(shí)、定時(shí)任務(wù)執(zhí)行;
          • kq:依賴于 kafka ,適用于異步、批量任務(wù)執(zhí)行;

          本篇就先從 dq 開(kāi)始,慢慢探究 go-queue 背后執(zhí)行的邏輯。

          dq 簡(jiǎn)介

          dq 封裝底層 beanstalkd 操作,分布式存儲(chǔ),延遲、定時(shí)設(shè)置。重啟服務(wù)可以重新執(zhí)行,但是消息不會(huì)丟失,因?yàn)橄⒌奶幚矶冀挥?beanstalkd 完成。

          可以看出使用非常簡(jiǎn)單,同時(shí) dq 中使用了 redis setnx 保證了每個(gè)消息只被消費(fèi)一次。但是在生產(chǎn)者端沒(méi)有使用 redis 做消息存儲(chǔ),這個(gè)和前面描述的一致。

          對(duì) dq 的整體架構(gòu)做了簡(jiǎn)單介紹,下面就開(kāi)始正式的探索 :hammer:

          生產(chǎn)者 example

          func main() {
           producer := dq.NewProducer([]dq.Beanstalk{
            {
             Endpoint: "localhost:11300",
             Tube:     "tube",
            },
            {
             Endpoint: "localhost:11301",
             Tube:     "tube",
            },
           })
           for i := 1000; i < 1005; i++ {
              // Delay:延遲執(zhí)行
            _, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second*5)
              // At:在某一個(gè)時(shí)刻執(zhí)行
            //_, err := producer.At([]byte(strconv.Itoa(i)), time.Now().Add(time.Second*5))
            if err != nil {
             fmt.Println(err)
            }
           }
          }

          從使用上,簡(jiǎn)單分為兩步:

          1. NewProducer(opts):將本地隊(duì)列的端口配置和主題配置傳入生產(chǎn)者;
          2. producer.Delay():使用剛創(chuàng)建好的 生產(chǎn)者,調(diào)用它的 Delay() 。將需要異步發(fā)送的消息傳入,Delay 還需要傳入延遲執(zhí)行的時(shí)間。

          需要說(shuō)明的是:創(chuàng)建的 producer 是一個(gè)接口,Delay() 只是接口其中的一個(gè)方法。后續(xù)會(huì)其他的方法和內(nèi)部設(shè)計(jì)。那我們就繼續(xù)往下探索吧~~~

          深入生產(chǎn)者執(zhí)行流程

          下面從 example 的代碼進(jìn)去,看整個(gè)函數(shù)的調(diào)用鏈。

          初始化

          dq.NewProducer([]dq.Beanstalk{{opt1}, {opt2}, ...}) // 初始化生產(chǎn)者
           |- NewProducerNode(endpoint, tube)        // endpoint,tube 來(lái)自傳入的配置數(shù)組

          緊接著就到 producerNode.go ,這個(gè)部分就會(huì)牽涉到 beanstalk 的初始化:

          NewProducerNode(endpoint, tube)
           |- conn: newConnection(endpoint, tube)
            |- return &connection{} 

          這就涉及到 beanstalkconnection.conn -> *beanstalk.Conn

          但是在 newConnection() 中并沒(méi)有對(duì) beanstalk.Conn 進(jìn)行初始化,這屬于 延遲初始化

          Delay

          首先是生產(chǎn)者端調(diào)用 producer.Delay(data, timesecond) ,就把消息插入到內(nèi)部隊(duì)列,timesecond 就是延遲執(zhí)行的時(shí)間。我們來(lái)看看 Delay() 到底做了什么?

          p.Delay(data, timesecond)
           |- p.wrap(data, time)   // 將 data 和 time 包裝到一塊
            |- p.insert(nodeFn)
             |- node.Delay()    // for rangre p.node 每一個(gè)node都執(zhí)行一遍 `Delay()`

          p.insert 就是將上一步封裝好的 data 傳遞給 p{cluster} 的每一個(gè)node去執(zhí)行 node.Delay。

          在前面的 初始化 說(shuō)過(guò),最開(kāi)始是沒(méi)有對(duì) conn 進(jìn)行初始化,那現(xiàn)在要插入數(shù)據(jù),總不能不初始化這個(gè) conn

          node.Delay()         // 配置中的每個(gè)node都執(zhí)行 `Delay()`
           |- node.conn.get()     // 獲取node中的conn【conn==nil,就初始化一個(gè)conn】
           |- _, err := conn.Put(data, deplay, opts...)
            |- node.conn.reset()    // 出現(xiàn)err情況下,如OOM/Timeout等情況 -> 關(guān)閉conn,防止泄漏

          所以最后 Delay 實(shí)際上是執(zhí)行 tube.Put(data, delay)

          tube.Put(data, delay)
           |- tube.Conn.cmd("put", ...)  // 生產(chǎn)者發(fā)布job

          這里就涉及到 beanstalkPut 操作:首先看看生產(chǎn)者 Put 指令參數(shù)說(shuō)明:

          put <pri> <delay> <ttr> <bytes> <data>
          • <pri> :優(yōu)先級(jí),值越小優(yōu)先級(jí)越高,默認(rèn)為1024;
          • <delay> :延遲 ready 秒數(shù),在這段時(shí)間 job 為 delayed 狀態(tài);
          • <ttr>time to run ,允許 worker 執(zhí)行的最大秒數(shù),如果 worker 在這段時(shí)間不能 delete,release,bury job,那么當(dāng) job 超時(shí),服務(wù)器將自動(dòng) release 此job;
          • <bytes>job body的長(zhǎng)度,不包含\r\n;
          • <data>:job body data;

          OK。那插入 job 成功,響應(yīng)什么呢?

          INSERTED <id>\r\n

          返回的 id 是插入 job 的任務(wù)ID。到此 Put 分析完畢,跟著代碼走一遍:

          tube.Put(data, priority, daley, ttr)
           |- tube.Conn.cmd("put", ...)
           |- tube.Conn.readResp("INSERTED id")
          |- return id, err   // 將id返回

          這樣我們?cè)?example 中直接可以看到的 生產(chǎn)者 執(zhí)行的操作就介紹完了。上圖,圖更好說(shuō)話:

          producer interface

          那么除了 example 中使用的 Delay() ,還有其余幾個(gè)方法:

          Producer interface {
            At(body []byte, at time.Time) (string, error)
            Close() error
            Delay(body []byte, delay time.Duration) (string, error)
            Revoke(ids string) error
          }
          • At:指定某個(gè)時(shí)間執(zhí)行【實(shí)質(zhì)也是執(zhí)行 Delay()
          • Close:關(guān)閉全部node的連接
          • Delay:延遲執(zhí)行。傳入延遲的時(shí)間。
          • Revoke:實(shí)質(zhì)上是當(dāng)出現(xiàn)最小寫(xiě)入節(jié)點(diǎn)<2時(shí),觸發(fā)添加失敗,將添加成功的job刪除掉。

          當(dāng)然,事實(shí)上 dq 使用上,開(kāi)發(fā)者只需要使用 At/Delay 就行了。也就是你只要知道你的任務(wù)是定時(shí)觸發(fā)還是延遲觸發(fā)即可。剩下的,dq 內(nèi)部的封裝都已經(jīng)幫你做好了。

          框架地址

          https://github.com/tal-tech/go-queue

          同時(shí)在 go-queue 也大量使用 go-zero 的流式處理庫(kù) fx。

          https://github.com/tal-tech/go-zero

          歡迎使用 go-queuestar 支持我們!一起構(gòu)建 go-zero 生態(tài)!??



          推薦閱讀


          福利

          我為大家整理了一份從入門(mén)到進(jìn)階的Go學(xué)習(xí)資料禮包,包含學(xué)習(xí)建議:入門(mén)看什么,進(jìn)階看什么。關(guān)注公眾號(hào) 「polarisxu」,回復(fù) ebook 獲??;還可以回復(fù)「進(jìn)群」,和數(shù)萬(wàn) Gopher 交流學(xué)習(xí)。

          瀏覽 41
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  久久精品96无码内射 | 欧美精品成人一区二区在线观看 | 逼特逼视频网 | 黄色毛片av成人免费 | 黄色片黄色片一级片不卡片 |