一個 demo 學會 workerPool
via:https://www.pixelstech.net/article/1611483826-Demo-on-creating-worker-pool-in-GoLang
作者:sonic0002
今天給大家分享一篇關于 workPool 的文章,這個平時大家應該用的比較多,一起來看下。
原文如下:
工作池是這樣一個池子,會創(chuàng)建指定數(shù)量的 worker,這些 worker 能獲取任務并處理。允許多個任務同時處理,但是需要維持固定數(shù)量的 worker 避免系統(tǒng)資源被過度使用。
通常有兩種方式創(chuàng)建任務池:
一種是預先創(chuàng)建固定數(shù)量的 worker; 另外一種是當有需要的時候才會創(chuàng)建 worker,當然也會有數(shù)量限制;
本文將與大家一起討論第一種方式。當我們預先知道有許多任務需要同時運行,并且很大概率會用上最大數(shù)量的 worker,通常會采用這種方式。
為了演示,我們先創(chuàng)建 Worker 結構體,它獲取任務并執(zhí)行。
import (
"fmt"
)
// Worker ...
type Worker struct {
ID int
Name string
StopChan chan bool
}
// Start ...
func (w *Worker) Start(jobQueue chan Job) {
w.StopChan = make(chan bool)
successChan := make(chan bool)
go func() {
successChan <- true
for {
// take job
job := <-jobQueue
if job != nil {
job.Start(w)
} else {
fmt.Printf("worker %s to be stopped\n", w.Name)
w.StopChan <- true
break
}
}
}()
// wait for the worker to start
<-successChan
}
// Stop ...
func (w *Worker) Stop() {
// wait for the worker to stop, blocking
_ = <-w.StopChan
fmt.Printf("worker %s stopped\n", w.Name)
}
Worker 有一些屬性保存當前的狀態(tài),另外還聲明了兩個方法分別用于啟動、停止 worker。
在 Start() 方法里,創(chuàng)建了兩個 channel 分別用于 worker 的啟動和停止。最重要的是 for 循環(huán)里面,worker 會一直等待獲取 job 并可執(zhí)行的直到任務隊列關閉。
Job 是包含單個方法 Start() 的接口,所以只要實現(xiàn) Start() 方法就可以有不同類型的 job。
// Job ...
type Job interface {
Start(worker *Worker) error
}
一旦 Worker 確定之后,接下來就是創(chuàng)建 pool 來管理 workers。
import (
"fmt"
"sync"
)
// Pool ...
type Pool struct {
Name string
Size int
Workers []*Worker
QueueSize int
Queue chan Job
}
// Initiualize ...
func (p *Pool) Initialize() {
// maintain minimum 1 worker
if p.Size < 1 {
p.Size = 1
}
p.Workers = []*Worker{}
for i := 1; i <= p.Size; i++ {
worker := &Worker{
ID: i - 1,
Name: fmt.Sprintf("%s-worker-%d", p.Name, i-1),
}
p.Workers = append(p.Workers, worker)
}
// maintain min queue size as 1
if p.QueueSize < 1 {
p.QueueSize = 1
}
p.Queue = make(chan Job, p.QueueSize)
}
// Start ...
func (p *Pool) Start() {
for _, worker := range p.Workers {
worker.Start(p.Queue)
}
fmt.Println("all workers started")
}
// Stop ...
func (p *Pool) Stop() {
close(p.Queue) // close the queue channel
var wg sync.WaitGroup
for _, worker := range p.Workers {
wg.Add(1)
go func(w *Worker) {
defer wg.Done()
w.Stop()
}(worker)
}
wg.Wait()
fmt.Println("all workers stopped")
}
Pool 包含 worker 切片和用于保存 job 的隊列。worker 的數(shù)量在初始化的時候是可以自定義。
關鍵點在 Stop() 的邏輯,當它被調用時,會先關閉 job 隊列,worker 便會從 job 隊列讀到 nil,接著就會關閉對應的 worker。接著在 for 循環(huán)里,等待 worker 并發(fā)地停止直到最后一個 worker 停止。
為了演示整體邏輯,下面的例子展示了一個僅僅輸出值的 job。
import "fmt"
func main() {
pool := &Pool{
Name: "test",
Size: 5,
QueueSize: 20,
}
pool.Initialize()
pool.Start()
defer pool.Stop()
for i := 1; i <= 100; i++ {
job := &PrintJob{
Index: i,
}
pool.Queue <- job
}
}
// PrintJob ...
type PrintJob struct {
Index int
}
func (pj *PrintJob) Start(worker *Worker) error {
fmt.Printf("job %s - %d\n", worker.Name, pj.Index)
return nil
}
如果你看了上面的代碼邏輯,就會發(fā)現(xiàn)很簡單,創(chuàng)建了有 5 個 worker 的工作池并且 job 隊列的大小是 20。
接著,模擬 job 創(chuàng)建和處理過程:一旦 job 被創(chuàng)建就會 push 到任務隊列里,等待著的 worker 便會從隊列里取出 job 并處理。
類似下面這樣的輸出:
all workers started
job test-worker-3 - 4
job test-worker-3 - 6
job test-worker-3 - 7
job test-worker-3 - 8
job test-worker-3 - 9
job test-worker-3 - 10
job test-worker-3 - 11
job test-worker-3 - 12
job test-worker-3 - 13
job test-worker-3 - 14
job test-worker-3 - 15
job test-worker-3 - 16
job test-worker-3 - 17
job test-worker-3 - 18
job test-worker-3 - 19
job test-worker-3 - 20
worker test-worker-3 to be stopped
job test-worker-4 - 5
job test-worker-0 - 1
worker test-worker-3 stopped
job test-worker-2 - 3
worker test-worker-2 to be stopped
worker test-worker-2 stopped
worker test-worker-4 to be stopped
worker test-worker-4 stopped
worker test-worker-0 to be stopped
worker test-worker-0 stopped
job test-worker-1 - 2
worker test-worker-1 to be stopped
worker test-worker-1 stopped
all workers stopped
推薦閱讀
