<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          一個 demo 學會 workerPool

          共 6459字,需瀏覽 13分鐘

           ·

          2021-08-10 00:32

          via:https://www.pixelstech.net/article/1611483826-Demo-on-creating-worker-pool-in-GoLang

          作者:sonic0002

          今天給大家分享一篇關于 workPool 的文章,這個平時大家應該用的比較多,一起來看下。

          原文如下:


          工作池是這樣一個池子,會創(chuàng)建指定數(shù)量的 worker,這些 worker 能獲取任務并處理。允許多個任務同時處理,但是需要維持固定數(shù)量的 worker 避免系統(tǒng)資源被過度使用。

          通常有兩種方式創(chuàng)建任務池:

          • 一種是預先創(chuàng)建固定數(shù)量的 worker;
          • 另外一種是當有需要的時候才會創(chuàng)建 worker,當然也會有數(shù)量限制;

          本文將與大家一起討論第一種方式。當我們預先知道有許多任務需要同時運行,并且很大概率會用上最大數(shù)量的 worker,通常會采用這種方式。

          為了演示,我們先創(chuàng)建 Worker 結構體,它獲取任務并執(zhí)行。

          import (
           "fmt"
          )

          // Worker ...
          type Worker struct {
           ID       int
           Name     string
           StopChan chan bool
          }

          // Start ...
          func (w *Worker) Start(jobQueue chan Job) {
           w.StopChan = make(chan bool)
           successChan := make(chan bool)

           go func() {
            successChan <- true
            for {
             // take job
             job := <-jobQueue
             if job != nil {
              job.Start(w)
             } else {
              fmt.Printf("worker %s to be stopped\n", w.Name)
              w.StopChan <- true
              break
             }
            }
           }()

           // wait for the worker to start
           <-successChan
          }

          // Stop ...
          func (w *Worker) Stop() {
           // wait for the worker to stop, blocking
           _ = <-w.StopChan
           fmt.Printf("worker %s stopped\n", w.Name)
          }

          Worker 有一些屬性保存當前的狀態(tài),另外還聲明了兩個方法分別用于啟動、停止 worker。

          在 Start() 方法里,創(chuàng)建了兩個 channel 分別用于 worker 的啟動和停止。最重要的是 for 循環(huán)里面,worker 會一直等待獲取 job 并可執(zhí)行的直到任務隊列關閉。

          Job 是包含單個方法 Start() 的接口,所以只要實現(xiàn) Start() 方法就可以有不同類型的 job。

          // Job ...
          type Job interface {
           Start(worker *Worker) error
          }

          一旦 Worker 確定之后,接下來就是創(chuàng)建 pool 來管理 workers。

          import (
           "fmt"
           "sync"
          )

          // Pool ...
          type Pool struct {
           Name string

           Size    int
           Workers []*Worker

           QueueSize int
           Queue     chan Job
          }

          // Initiualize ...
          func (p *Pool) Initialize() {
           // maintain minimum 1 worker
           if p.Size < 1 {
            p.Size = 1
           }
           p.Workers = []*Worker{}
           for i := 1; i <= p.Size; i++ {
            worker := &Worker{
             ID:   i - 1,
             Name: fmt.Sprintf("%s-worker-%d", p.Name, i-1),
            }
            p.Workers = append(p.Workers, worker)
           }

           // maintain min queue size as 1
           if p.QueueSize < 1 {
            p.QueueSize = 1
           }
           p.Queue = make(chan Job, p.QueueSize)
          }

          // Start ...
          func (p *Pool) Start() {
           for _, worker := range p.Workers {
            worker.Start(p.Queue)
           }
           fmt.Println("all workers started")
          }

          // Stop ...
          func (p *Pool) Stop() {
           close(p.Queue) // close the queue channel

           var wg sync.WaitGroup
           for _, worker := range p.Workers {
            wg.Add(1)
            go func(w *Worker) {
             defer wg.Done()

             w.Stop()
            }(worker)
           }
           wg.Wait()
           fmt.Println("all workers stopped")
          }

          Pool 包含 worker 切片和用于保存 job 的隊列。worker 的數(shù)量在初始化的時候是可以自定義。

          關鍵點在 Stop() 的邏輯,當它被調用時,會先關閉 job 隊列,worker 便會從 job 隊列讀到 nil,接著就會關閉對應的 worker。接著在 for 循環(huán)里,等待 worker 并發(fā)地停止直到最后一個 worker 停止。

          為了演示整體邏輯,下面的例子展示了一個僅僅輸出值的 job。

          import "fmt"

          func main() {
           pool := &Pool{
            Name:      "test",
            Size:      5,
            QueueSize: 20,
           }
           pool.Initialize()
           pool.Start()
                  defer pool.Stop()

           for i := 1; i <= 100; i++ {
            job := &PrintJob{
             Index: i,
            }
            pool.Queue <- job
           }
          }

          // PrintJob ...
          type PrintJob struct {
           Index int
          }

          func (pj *PrintJob) Start(worker *Worker) error {
           fmt.Printf("job %s - %d\n", worker.Name, pj.Index)
           return nil
          }

          如果你看了上面的代碼邏輯,就會發(fā)現(xiàn)很簡單,創(chuàng)建了有 5 個 worker 的工作池并且 job 隊列的大小是 20。

          接著,模擬 job 創(chuàng)建和處理過程:一旦 job 被創(chuàng)建就會 push 到任務隊列里,等待著的 worker 便會從隊列里取出 job 并處理。

          類似下面這樣的輸出:

          all workers started
          job test-worker-3 - 4
          job test-worker-3 - 6
          job test-worker-3 - 7
          job test-worker-3 - 8
          job test-worker-3 - 9
          job test-worker-3 - 10
          job test-worker-3 - 11
          job test-worker-3 - 12
          job test-worker-3 - 13
          job test-worker-3 - 14
          job test-worker-3 - 15
          job test-worker-3 - 16
          job test-worker-3 - 17
          job test-worker-3 - 18
          job test-worker-3 - 19
          job test-worker-3 - 20
          worker test-worker-3 to be stopped
          job test-worker-4 - 5
          job test-worker-0 - 1
          worker test-worker-3 stopped
          job test-worker-2 - 3
          worker test-worker-2 to be stopped
          worker test-worker-2 stopped
          worker test-worker-4 to be stopped
          worker test-worker-4 stopped
          worker test-worker-0 to be stopped
          worker test-worker-0 stopped
          job test-worker-1 - 2
          worker test-worker-1 to be stopped
          worker test-worker-1 stopped
          all workers stopped




          推薦閱讀


          福利

          我為大家整理了一份從入門到進階的Go學習資料禮包,包含學習建議:入門看什么,進階看什么。關注公眾號 「polarisxu」,回復 ebook 獲??;還可以回復「進群」,和數(shù)萬 Gopher 交流學習。

          瀏覽 91
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日韩免费黄色视频 | 成人天堂一区二区三区精华液功效 | 久久久成人片 | 日日干视屏 | 一级免费爱爱 |