<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Go語言的并發(fā)與WorkerPool - 第一部分

          共 7727字,需瀏覽 16分鐘

           ·

          2021-09-02 20:33

          via:

          https://hackernoon.com/concurrency-in-golang-and-workerpool-part-1-e9n31ao
          作者:Hasan


          原文如下:


          現(xiàn)代編程語言中,并發(fā)已經(jīng)成為必不可少的特性。現(xiàn)在絕大多數(shù)編程語言都有一些方法實(shí)現(xiàn)并發(fā)。

          其中一些實(shí)現(xiàn)方式非常強(qiáng)大,能將負(fù)載轉(zhuǎn)移到不同的系統(tǒng)線程,比如 Java 等;一些則在同一線程上模擬這種行為,比如 Ruby 等。

          Golang 的并發(fā)模型非常強(qiáng)大,稱為 CSP(通信順序進(jìn)程),它將一個(gè)問題分解成更小的順序進(jìn)程,然后調(diào)度這些進(jìn)程的實(shí)例(稱為 Goroutine)。這些進(jìn)程通過 channel 傳遞信息實(shí)現(xiàn)通信。

          本文,我們將探討如何利用 golang 的并發(fā)性,以及如何在 workerPool 使用。系列文章的第二篇,我們將探討如何構(gòu)建一個(gè)強(qiáng)大的并發(fā)解決方案。

          一個(gè)簡單的例子

          假設(shè)我們需要調(diào)用一個(gè)外部 API 接口,整個(gè)過程需要花費(fèi) 100ms。如果我們需要同步地調(diào)用該接口 1000 次,則需要花費(fèi) 100s。

          //// model/data.go

          package model

          type SimpleData struct {
           ID int
          }

          //// basic/basic.go

          package basic

          import (
           "fmt"
           "github.com/Joker666/goworkerpool/model"
           "time"
          )

          func Work(allData []model.SimpleData) {
           start := time.Now()
           for i, _ := range allData {
            Process(allData[i])
           }
           elapsed := time.Since(start)
           fmt.Printf("Took ===============> %s\n", elapsed)
          }

          func Process(data model.SimpleData) {
           fmt.Printf("Start processing %d\n", data.ID)
           time.Sleep(100 * time.Millisecond)
           fmt.Printf("Finish processing %d\n", data.ID)
          }

          //// main.go

          package main

          import (
           "fmt"
           "github.com/Joker666/goworkerpool/basic"
           "github.com/Joker666/goworkerpool/model"
           "github.com/Joker666/goworkerpool/worker"
          )

          func main() {
           // Prepare the data
           var allData []model.SimpleData
           for i := 0; i < 1000; i++ {
            data := model.SimpleData{ ID: i }
            allData = append(allData, data)
           }
           fmt.Printf("Start processing all work \n")

           // Process
           basic.Work(allData)
          }
          Start processing all work
          Took ===============> 1m40.226679665s

          上面的代碼創(chuàng)建了 model 包,包里包含一個(gè)結(jié)構(gòu)體,這個(gè)結(jié)構(gòu)體只有一個(gè) int 類型的成員。我們同步地處理 data,這顯然不是最佳方案,因?yàn)榭梢圆l(fā)處理這些任務(wù)。我們換一種方案,使用 goroutine 和 channel 來處理。

          異步

          //// worker/notPooled.go

          func NotPooledWork(allData []model.SimpleData) {
           start := time.Now()
           var wg sync.WaitGroup

           dataCh := make(chan model.SimpleData, 100)

           wg.Add(1)
           go func() {
            defer wg.Done()
            for data := range dataCh {
             wg.Add(1)
             go func(data model.SimpleData) {
              defer wg.Done()
              basic.Process(data)
             }(data)
            }
           }()

           for i, _ := range allData {
            dataCh <- allData[i]
           }

           close(dataCh)
           wg.Wait()
           elapsed := time.Since(start)
           fmt.Printf("Took ===============> %s\n", elapsed)
          }

          //// main.go

          // Process
          worker.NotPooledWork(allData)
          Start processing all work
          Took ===============> 101.191534ms

          上面的代碼,我們創(chuàng)建了容量 100 的緩存 channel,并通過 NoPooledWork() 將數(shù)據(jù) push 到 channel 里。channel 長度滿 100 之后,我們是無法再向其中添加元素直到有元素被讀取走。使用 for range 讀取 channel,并生成 goroutine 處理。這里我們沒有限制生成 goroutine 的數(shù)量,這可以盡可能多地處理任務(wù)。從理論上來講,在給定所需資源的情況下,可以處理盡可能多的數(shù)據(jù)。執(zhí)行代碼,完成 1000 個(gè)任務(wù)只花費(fèi)了 100ms。很瘋狂吧!不全是,接著往下看。

          問題

          除非我們擁有地球上所有的資源,否則在特定時(shí)間內(nèi)能夠分配的資源是有限的。一個(gè) goroutine 占用的最小內(nèi)存是 2k,但也能達(dá)到 1G。上述并發(fā)執(zhí)行所有任務(wù)的解決方案中,假設(shè)有一百萬個(gè)任務(wù),就會(huì)很快耗盡機(jī)器的內(nèi)存和 CPU。我們要么升級機(jī)器的配置,要么就尋找其他更好的解決方案。

          計(jì)算機(jī)科學(xué)家很久之前就考慮過這個(gè)問題,并提出了出色的解決方案 - 使用 Thread Pool 或者 Worker Pool。這個(gè)方案是使用 worker 數(shù)量受限的工作池來處理任務(wù),workers 會(huì)按順序一個(gè)接一個(gè)處理任務(wù),這樣就避免了 CPU 和內(nèi)存使用急速增長。

          解決方案:Worker Pool

          我們通過實(shí)現(xiàn) worker pool 來修復(fù)之前遇到的問題。

          //// worker/pooled.go

          func PooledWork(allData []model.SimpleData) {
           start := time.Now()
           var wg sync.WaitGroup
           workerPoolSize := 100

           dataCh := make(chan model.SimpleData, workerPoolSize)

           for i := 0; i < workerPoolSize; i++ {
            wg.Add(1)
            go func() {
             defer wg.Done()

             for data := range dataCh {
              basic.Process(data)
             }
            }()
           }

           for i, _ := range allData {
            dataCh <- allData[i]
           }

           close(dataCh)
           wg.Wait()
           elapsed := time.Since(start)
           fmt.Printf("Took ===============> %s\n", elapsed)
          }

          //// main.go

          // Process
          worker.PooledWork(allData)
          Start processing all work
          Took ===============> 1.002972449s

          上面的代碼,worker 數(shù)量限制在 100,我們創(chuàng)建了相應(yīng)數(shù)量的 goroutine 來處理任務(wù)。我們可以把 channel 看作是隊(duì)列,worker goroutine 看作是消費(fèi)者。多個(gè) goroutine 可以監(jiān)聽同一個(gè) channel,但是 channel 里的每一個(gè)元素只會(huì)被處理一次。

          Go 語言的 channel 可以當(dāng)作隊(duì)列使用。

          這是一個(gè)比較好的解決方案,執(zhí)行代碼,我們看到完成所有任務(wù)花費(fèi) 1s。雖然沒有 100ms 這么快,但已經(jīng)能滿足業(yè)務(wù)需要,而且我們得到了一個(gè)更好的解決方案,能將負(fù)載均攤在不同的時(shí)間片上。

          處理錯(cuò)誤

          我們能做的還沒完。上面看起來是一個(gè)完整的解決方案,但卻不是的,我們沒有處理錯(cuò)誤情況。所以需要模擬出錯(cuò)的情形,并且看下我們需要怎么處理。

          //// worker/pooledError.go

          func PooledWorkError(allData []model.SimpleData) {
           start := time.Now()
           var wg sync.WaitGroup
           workerPoolSize := 100

           dataCh := make(chan model.SimpleData, workerPoolSize)
           errors := make(chan error, 1000)

           for i := 0; i < workerPoolSize; i++ {
            wg.Add(1)
            go func() {
             defer wg.Done()

             for data := range dataCh {
              process(data, errors)
             }
            }()
           }

           for i, _ := range allData {
            dataCh <- allData[i]
           }

           close(dataCh)

           wg.Add(1)
           go func() {
            defer wg.Done()
            for {
             select {
             case err := <-errors:
              fmt.Println("finished with error:", err.Error())
             case <-time.After(time.Second * 1):
              fmt.Println("Timeout: errors finished")
              return
             }
            }
           }()

           defer close(errors)
           wg.Wait()
           elapsed := time.Since(start)
           fmt.Printf("Took ===============> %s\n", elapsed)
          }

          func process(data model.SimpleData, errors chan<- error) {
           fmt.Printf("Start processing %d\n", data.ID)
           time.Sleep(100 * time.Millisecond)
           if data.ID % 29 == 0 {
            errors <- fmt.Errorf("error on job %v", data.ID)
           } else {
            fmt.Printf("Finish processing %d\n", data.ID)
           }
          }

          //// main.go

          // Process
          worker.PooledWorkError(allData)

          我們修改了 process() 函數(shù),處理一些隨機(jī)的錯(cuò)誤并將錯(cuò)誤 push 到 errors chnanel 里。所以,為了處理并發(fā)出現(xiàn)的錯(cuò)誤,我們可以使用 errors channel 保存錯(cuò)誤數(shù)據(jù)。在所有任務(wù)處理完成之后,可以檢查錯(cuò)誤 channel 是否有數(shù)據(jù)。錯(cuò)誤 channel 里的元素保存了任務(wù) ID,方便需要的時(shí)候再處理這些任務(wù)。

          比之前沒處理錯(cuò)誤,很明顯這是一個(gè)更好的解決方案。但我們還可以做得更好,

          我們將在下篇文章討論如何編寫一個(gè)強(qiáng)大的 worker pool 包,并且在 worker 數(shù)量受限的情況下處理并發(fā)任務(wù)。

          總結(jié)

          Go 語言的并發(fā)模型足夠強(qiáng)大給力,只需要構(gòu)建一個(gè) worker pool 就能很好地解決問題而無需做太多工作,這就是它沒有包含在標(biāo)準(zhǔn)庫中的原因。但是,我們自己可以構(gòu)建一個(gè)滿足自身需求的方案。很快,我會(huì)在下一篇文章中講到,敬請期待!

          點(diǎn)擊【閱讀原文】直達(dá)代碼倉庫[1]

          參考資料

          [1]

          代碼倉庫: https://github.com/Joker666/goworkerpool?ref=hackernoon.com



          推薦閱讀


          福利

          我為大家整理了一份從入門到進(jìn)階的Go學(xué)習(xí)資料禮包,包含學(xué)習(xí)建議:入門看什么,進(jìn)階看什么。關(guān)注公眾號 「polarisxu」,回復(fù) ebook 獲取;還可以回復(fù)「進(jìn)群」,和數(shù)萬 Gopher 交流學(xué)習(xí)。

          瀏覽 56
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲欧洲视频在线观看 | 思思热高清无码播放 | 人妖操女人 | 香蕉网站视频婷婷 | 99精品A√ |