<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          它來了,關于Golang并發(fā)編程的超詳細教程!

          共 967字,需瀏覽 2分鐘

           ·

          2021-12-23 10:18


          導語?|?本文主要對go并發(fā)基礎庫、擴展以及三方庫的一些使用和技巧進行介紹, 并且指出一些常見問題,以及對一些并發(fā)庫的選擇和優(yōu)化進行分析和探討,為讀者提供一些相關經驗和交流分享。


          go原生/擴展庫


          提倡的原則


          不要通過共享內存進行通信;相反,通過通信來共享內存。


          [如何貫徹這個原則的demo1.3.5](#有鎖的地方就去用channel優(yōu)化)



          Goroutine


          (一)goroutine并發(fā)模型


          • 調度器主要結構


          主要調度器結構是M、P、G:


          • M,內核級別線程,goroutine基于M之上,代表執(zhí)行者,底層線程,物理線程。


          • P,處理器,用來執(zhí)行goroutine,因此維護了一個goroutine隊列,里面存儲了所有要執(zhí)行的goroutine,將等待執(zhí)行的G與M對接,它的數(shù)目也代表了真正的并發(fā)度( 即有多少個goroutine可以同時進行)。


          • G,goroutine實現(xiàn)的核心結構,相當于輕量級線程,里面包含了goroutine需要的棧,程序計數(shù)器,以及所在M的信息。


          P的數(shù)量由環(huán)境變量中的GOMAXPROCS決定,通常來說和核心數(shù)對應。


          • 映射關系


          用戶空間線程和內核空間線程映射關系有如下三種:


          • N:1

          • 1:1

          • M:N


          • 調度圖


          關系如圖,灰色的G則是暫時還未運行的,處于就緒態(tài),等待被調度,這個隊列被P維護



          注:?簡單調度圖如上,有關于P再多個M中切換,公共goroutine隊列,M從線程緩存中創(chuàng)建等步驟沒有體現(xiàn)。



          (二)goroutine使用


          • demo1


          go list.Sort()


          • demo2


          func Announce(message string, delay time.Duration) {  go func() {        time.Sleep(delay)        fmt.println(message)    }() }



          channel


          (一)channel特性


          創(chuàng)建


          // 創(chuàng)建 channela := make(chan int)b := make(chan int, 10)// 單向 channelc := make(chan<- int)d := make(<-chan int)


          存入/讀取/關閉



          tip:


          v, ok := <-a  // 檢查是否成功關閉(ok = false:已關閉)



          (二)channel使用/基礎


          • use channel


          ci := make(chan int)           cj := make(chan int, 0)         cs := make(chan *os.File, 100)


          c := make(chan int) go func() {    list.Sort()    c <- 1 }()doSomethingForValue<- c


          func Server(queue chan *Request) {  for req := range queue {      sem <- 1        go func() {            process(req)             <- sem         }()    }}


          func Server(queue chan *Requet) {    for req := range queue {      sem <- 1        go func(req *Request) {          process(req)            <- sem        }(req)    }}


          func Serve(queue chan *Request) {    for req := range queue {        req := req         sem <- 1      go func() {            process(req)          <-sem        }()    }}



          (三)channel使用/技巧


          等待一個事件,也可以通過close一個channel就足夠了


          c := make(chan bool)go func() {    // close 的 channel 會讀到一個零值    close(c)}()<-c


          阻塞程序


          開源項目【是一個支持集群的im及實時推送服務】里面的基準測試的案例



          取最快結果


          func main() {  ret := make(chan string, 3)  for i := 0; i < cap(ret); i++ {    go call(ret)  }    fmt.Println(<-ret)}func call(ret chan<- string) {  // do something  // ...  ret <- "result"}


          協(xié)同多個goroutines


          注:?協(xié)同多個goroutines方案很多,這里只展示channel的一種


          limits := make(chan struct{}, 2)for i := 0; i < 10; i++ {  go func() {        // 緩沖區(qū)滿了就會阻塞在這    limits <- struct{}{}    do()    <-limits  }()}


          搭配select操作


          for {      select {    case a := <- testChanA:        // todo a    case b, ok := testChanB:        // todo b, 通過 ok 判斷 tesChanB 的關閉情況    default:        // 默認分支    }}


          main go routinue確認worker goroutinue真正退出的方式


          func worker(testChan chan bool) {    for {      select {        // todo some    // case ...        case <- testChan:          testChan <- true          return      }  }}
          func main() { testChan := make(chan bool) go worker(testChan) testChan <- true <- testChan}


          關閉的channel不會被阻塞


          testChan := make(chan bool)close(testChan)
          zeroValue := <- testChanfmt.Println(zeroValue) // false
          testChan <- true // panic: send on closed channel


          注:?如果是buffered channel,即使被close,也可以讀到之前存入的值,讀取完畢后開始讀零值,寫入則會觸發(fā)panic


          nil channel讀取和存入都不會阻塞,close會panic


          range遍歷channel


          for rangec := make(chan int, 20)go func() {  for i := 0; i < 10; i++ {    c <- i  }  close(c)}()// 當 c 被關閉后,取完里面的元素就會跳出循環(huán)for x := range c {  fmt.Println(x)}


          例: 唯一id


          func newUniqueIdService() <-chan string {  id := make(chan string)  go func() {    var counter int64 = 0    for {      id <- fmt.Sprintf("%x", counter)      counter += 1    }  }()  return id}func newUniqueIdServerMain()  {  id := newUniqueIdService()  for i := 0; i < 10; i++ {    fmt.Println(<- id)  }}


          帶緩沖隊列構造


          超時timeout和心跳heart beat


          超時控制


          func main() {  done := do()  select {  case <-done:    // logic  case <-time.After(3 * time.Second):    // timeout  }}


          demo


          開源im/goim項目中的應用



          心跳


          done := make(chan bool)  defer func() {    close(done)  }()  ticker := time.NewTicker(10 * time.Second)  go func() {    for {      select {      case <-done:        ticker.Stop()        return      case <-ticker.C:        message.Touch()      }    }  }()}


          多個goroutine同步響應


          func main() {  c := make(chan struct{})  for i := 0; i < 5; i++ {    go do(c)  }  close(c)}func do(c <-chan struct{}) {    // 會阻塞直到收到 close  <-c  fmt.Println("hello")}


          利用channel阻塞的特性和帶緩沖的channel來實現(xiàn)控制并發(fā)數(shù)量


          func channel() {    count := 10 // 最大并發(fā)  sum := 100  // 總數(shù)
          c := make(chan struct{}, count) sc := make(chan struct{}, sum) defer close(c) defer close(sc)
          for i:=0; i c <- struct{} go func(j int) { fmt.Println(j) <- c // 執(zhí)行完畢,釋放資源 sc <- struct {}{} // 記錄到執(zhí)行總數(shù) } }
          for i:=sum; i>0; i++ { <- sc } }



          go并發(fā)編程(基礎庫)


          注:這塊東西為什么放到channel之后,因為這里包含了一些低級庫,實際業(yè)務代碼中除了context之外用到都較少(比如一些鎖mutex,或者一些原子庫atomic),實際并發(fā)編程代碼中可以用channel就用channel,這也是go一直比較推崇得做法Share memory by communicating;don’t communicate by sharing memory。


          (一)Mutex/RWMutex


          鎖,使用簡單,保護臨界區(qū)數(shù)據。使用的時候注意鎖粒度,每次加鎖后都要記得解鎖。



          • Mutex demo


          package main
          import ( "fmt" "sync" "time")
          func main() { var mutex sync.Mutex wait := sync.WaitGroup{}
          now := time.Now() for i := 1; i <= 3; i++ { wait.Add(1) go func(i int) { mutex.Lock() time.Sleep(time.Second) mutex.Unlock() defer wait.Done() }(i) } wait.Wait() duration := time.Since(now) fmt.Print(duration)}


          結果:?可以看到整個執(zhí)行持續(xù)了3s多,內部多個協(xié)程已經被 “鎖” 住了。




          • RWMutex demo


          注意:?這東西可以并發(fā)讀,不可以并發(fā)讀寫/并發(fā)寫寫,不過現(xiàn)在即便場景是讀多寫少也很少用到這,一般集群環(huán)境都得分布式鎖了。


          package main
          import ( "fmt" "sync" "time")
          var m *sync.RWMutex
          func init() { m = new(sync.RWMutex)}
          func main() { go read() go read() go write()
          time.Sleep(time.Second * 3)}
          func read() { m.RLock() fmt.Println("startR") time.Sleep(time.Second) fmt.Println("endR") m.RUnlock()}func write() { m.Lock() fmt.Println("startW") time.Sleep(time.Second) fmt.Println("endW") m.Unlock()}


          輸出:




          (二)Atomic


          • 可以對簡單類型進行原子操作


          • int32

          • int64

          • uint32

          • uint64

          • uintptr

          • unsafe.Pointer


          • 可以進行得原子操作如下:


          • 增/減

          • 比較并且交換假定被操作的值未曾被改變, 并一旦確定這個假設的真實性就立即進行值替換

          • 載入為了原子的讀取某個值(防止寫操作未完成就發(fā)生了一個讀操作)

          • 存儲原子的值存儲函數(shù)

          • 交換原子交換


          demo: 增


          ??package?main
          import ( "fmt" "sync" "sync/atomic")
          func main() { var sum uint64
          var wg sync.WaitGroup
          for i := 0; i < 100; i++ { wg.Add(1) go func() { for c := 0; c < 100; c++ { atomic.AddUint64(&sum, 1) } defer wg.Done() }() }
          wg.Wait() fmt.Println(sum)}


          結果:




          (三)WaitGroup/ErrGroup


          waitGroup是一個waitGroup對象可以等待一組goroutinue結束,但是他對錯誤傳遞,goroutinue出錯時不再等待其他goroutinue(減少資源浪費) 都不能很好的解決,那么errGroup可以解決這部分問題。


          注意


          • errGroup中如果多個goroutinue錯誤,只會獲取第一個出錯的goroutinue的錯誤信息,后面的則不會被感知到。


          • errGroup里面沒有做panic處理,代碼要保持健壯。


          demo: errGroup


          package main
          import ( "golang.org/x/sync/errgroup" "log" "net/http")
          func main() { var g errgroup.Group var urls = []string{????"https://github.com/", "errUrl", } for _, url := range urls { url := url g.Go(func() error { resp, err := http.Get(url) if err == nil { _ = resp.Body.Close() } return err }) } err := g.Wait() if err != nil { log.Fatal("getErr", err) return }}


          結果:




          (四)once


          保證了傳入的函數(shù)只會執(zhí)行一次,這常用在單例模式,配置文件加載,初始化這些場景下。


          demo:


          times := 10  var (    o  sync.Once    wg sync.WaitGroup  )  wg.Add(times)  for i := 0; i < times; i++ {    go func(i int) {      defer wg.Done()      o.Do(func() {        fmt.Println(i)      })    }(i)  }??wg.Wait()


          結果:




          (五)Context


          go開發(fā)已經對他了解了太多,可以再多個goroutinue設置截止日期,同步信號,傳遞相關請求值。


          對他的說明文章太多了,本文對此不作多說明。


          這邊列一個遇到的問題


          grpc多服務調用,級聯(lián)cancel

          A->B->C

          A調用B,B調用C,當A不依賴B請求C得結果時,B請求C之后直接返回A,那么A,B間context被cancel,而C得context也是繼承于前面,C請求直接掛掉,只需要重新搞個context向下傳就好,記得帶上reqId, logId等必要信息。



          并行


          某些計算可以再CPU之間并行化,如果計算可以被劃分為不同的可獨立執(zhí)行的部分,那么他就是可并行化的,任務可以通過一個channel發(fā)送結束信號。


          假如我們可以再數(shù)組上進行一個比較耗時的操作,操作的值在每個數(shù)據上獨立,如下:


          type vector []float64
          func (v vector) DoSome(i, n int, u Vector, c chan int) { for ; i < n; i ++ { v[i] += u.Op(v[i]) } c <- 1 }


          我們可以再每個CPU上進行循環(huán)無關的迭代計算,我們僅需要創(chuàng)建完所有的goroutine后,從channel中讀取結束信號進行計數(shù)即可。


          (一)并發(fā)編程/工作流方案


          這部分如需自己開發(fā),內容其實可以分為兩部分能力去做:


          • 并發(fā)編程增強方案


          • 工作流解決方案


          需要去解決一些基礎問題


          并發(fā)編程:


          • 啟動goroutine時,增加防止程序panic能力


          • 去封裝一些更簡單的錯誤處理方案,比如支持多個錯誤返回


          • 限定任務的goroutine數(shù)量


          工作流:


          • 在每個工作流執(zhí)行到下一步前先去判斷上一步的結果


          • 工作流內嵌入一些攔截器



          (二)singlelFlight(go官方擴展同步包)


          一般系統(tǒng)重要的查詢增加了緩存后,如果遇到緩存擊穿,那么可以通過任務計劃,加索等方式去解決這個問題,singleflight這個庫也可以很不錯的應對這種問題。


          它可以獲取第一次請求得結果去返回給相同得請求。核心方法Do執(zhí)行和返回給定函數(shù)的值,確保某一個時間只有一個方法被執(zhí)行。如果一個重復的請求進入,則重復的請求會等待前一個執(zhí)行完畢并獲取相同的數(shù)據,返回值shared標識返回值v是否是傳遞給重復的調用請求。


          一句話形容他的功能,它可以用來歸并請求,但是最好加上超時重試等機制,防止第一個執(zhí)行得請求出現(xiàn)超時等異常情況導致同時間大量請求不可用。


          場景:?數(shù)據變化量小(key變化不頻繁,重復率高),但是請求量大的場景。


          demo


          package main
          import ( "golang.org/x/sync/singleflight" "log" "math/rand" "sync" "time")
          var ( g singleflight.Group)
          const ( funcKey = "key" times = 5 randomNum = 100)
          func init() { rand.Seed(time.Now().UnixNano())}
          func main() { var wg sync.WaitGroup wg.Add(times)
          for i := 0; i < times; i++ { go func() { defer wg.Done() num, err := run(funcKey) if err != nil { log.Fatal(err) return } log.Println(num) }() } wg.Wait()}
          func run(key string) (num int, err error) { v, err, isShare := g.Do(key, func() (interface{}, error) { time.Sleep(time.Second * 5) num = rand.Intn(randomNum) //[0,100) return num, nil }) if err != nil { log.Fatal(err) return 0, err } data := v.(int) log.Println(isShare) return data, nil}


          連續(xù)執(zhí)行3次,返回結果如下,全部取了共享得結果:



          但是注釋掉time.Sleep(time.Second*5)?再嘗試一次看看



          這次全部取得真實值。


          實踐:伙伴部門高峰期可以減少20%的Redis調用,大大減少了Redis的負載。



          實踐


          (一)開發(fā)案例


          注:?下面用到的方案因為開發(fā)時間較早,并不一定是以上多種方案中最優(yōu)的,選擇有很多種,使用那種方案只有有所考慮可以自圓其說即可。


          建議:?項目中逐漸形成統(tǒng)一解決方案,從混亂到統(tǒng)一,逐漸小團隊內對此類邏輯形成統(tǒng)一的一個解決標準,而不是大家對需求之外的控制代碼寫出各式各樣的控制邏輯。


          • 批量三要素校驗


          • 場景


          三要素批量校驗接口限頻單賬戶最高100qps/s,整個系統(tǒng)多個校驗場景公有一個賬戶。


          限頻需要限制批量校驗最高為50~80qps/s(需要預留令牌供其他場景使用,否則頻繁調用批量接口時候其他場景均會失敗限頻)


          • 設計


          • 使用go routine來并發(fā)進行三要素校驗,因為go routinue,所以每次開啟50~80 go routine同時進行單次三要素校驗。


          • 每輪校驗耗時1s,如果所有routinue校驗后與校驗開始時間間隔不滿一秒,則需要主動程序睡眠至1s,然后開始下輪校驗。


          • 因為只是校驗場景,如果某次校驗失敗,最容易的原因其實是校驗方異常,或者被其他校驗場景再當前1s內消耗過多令牌。


          那么整個批量接口返回err,運營同學重新發(fā)起就好。


          • 代碼


          代碼需要進行的優(yōu)化點


          • 加鎖(推薦使用,最多不到100的競爭者數(shù)目,使用鎖性能影響微乎其微);


          • 給每個傳入routine的element數(shù)組包裝,增加一個key屬性,每個返回的result包含key通過key映射可以得到需要的一個順序。


          sleep 1s這個操作可以從調用前開始計時,調用完成后不滿1s補充至1s,而不是每次最長調用時間elapsedTime+1s;


          通道中獲取的三要素校驗結果順序和入參數(shù)據數(shù)組順序不對應,這里通過兩種方案


          分組調用getElementResponseConcurrent方法時,傳入切片可以省略部分計算,直接使用切片表達式。



          elementNum := len(elements)m := elementNum / 80n := elementNum % 80if m < 1 {        if results, err := getElementResponseConcurrent(ctx, elements, conn, caller); err != nil {                return nil, err        } else {                response.Results = results                return response, nil        }} else {        results := make([]int64, 0)        if n != 0 {                m = m + 1        }        var result []int64        for i := 1; i <= m; i++ {                if i == m {                        result, err = getElementResponseConcurrent(ctx, elements[(i-1)*80:(i-1)*80+n], conn, caller)                } else {                        result, err = getElementResponseConcurrent(ctx, elements[(i-1)*80:i*80], conn, caller)                }                if err != nil {                        return nil, err                }                results = append(results, result...)        }        response.Results = results}
          // getElementResponseConcurrentfunc getElementResponseConcurrent(ctx context.Context, elements []*api.ThreeElements, conn *grpc.ClientConn, caller *api.Caller) ([]int64, error) { results := make([]int64, 0)
          var chResult = make(chan int64) chanErr := make(chan error) defer close(chanErr) wg := sync.WaitGroup{}
          faceIdClient := api.NewFaceIdClient(conn) for _, element := range elements { wg.Add(1) go func(element *api.ThreeElements) { param := element.Param verificationRequest := &api.CheckMobileVerificationRequest{ Caller: caller, Param: param, } if verification, err := faceIdClient.CheckMobileVerification(ctx, verificationRequest); err != nil { chanErr <- err return } else { result := verification.Result chanErr <- nil chResult <- result } defer wg.Done() }(element) }
          for i := 0; i < len(elements); i++ { if err := <-chanErr; err != nil { return nil, err } var result = <-chResult results = append(results, result) } wg.Wait() time.Sleep(time.Second) return results, nil }


          • 歷史數(shù)據批量標簽


          場景:產品上線一年,逐步開始做數(shù)據分析和統(tǒng)計需求提供給運營使用,接入Tdw之前是直接采用接口讀歷史表進行的數(shù)據分析,涉及全量用戶的分析給用戶記錄打標簽,數(shù)據效率較低,所以采用并發(fā)分組的思想,考慮協(xié)程比較輕量,從開始上線時間節(jié)點截止當前時間分共100組,代碼較為簡單。



          問題本次接口不是上線最終版,核心分析方法僅測試環(huán)境少量數(shù)據就會有N多條慢查詢,依賴得外部分析方法涉及多條查詢且沒走索引,加了索引后,線上數(shù)據預估也在1h之內跑完,所以線上最終還是串行,防止線上數(shù)據量較大還有慢查詢存在cpu打滿。


          func (s ServiceOnceJob) CompensatingHistoricalLabel(ctx context.Context,        request *api.CompensatingHistoricalLabelRequest) (response *api.CompensatingHistoricalLabelResponse, err error) {        if request.Key != interfaceKey {                return nil, transform.Simple("err")        }    ctx, cancelFunc := context.WithCancel(ctx)var (        wg = new(sync.WaitGroup)        userRegisterDb = new(datareportdb.DataReportUserRegisteredRecords)        startNum = int64(0))wg.Add(1)
          countHistory, err := userRegisterDb.GetUserRegisteredCountForHistory(ctx, historyStartTime, historyEndTime)if err != nil { return nil, err}
          div := decimal.NewFromFloat(float64(countHistory)).Div(decimal.NewFromFloat(float64(theNumberOfConcurrent)))f, _ := div.Float64()num := int64(math.Ceil(f))
          for i := 0; i < theNumberOfConcurrent; i++ { go func(startNum int64) { defer wg.Done() for { select { case <- ctx.Done(): return default: userDataArr, err := userRegisterDb.GetUserRegisteredDataHistory(ctx, startNum, num) if err != nil { cancelFunc() } for _, userData := range userDataArr { if err := analyseUserAction(userData); err != nil { cancelFunc() } } } } }(startNum) startNum = startNum + num}wg.Wait()
          return response, nil}


          • 批量發(fā)起/批量簽署


          實現(xiàn)思路和上面其實差不多,都是需要支持批量的特性,基本上現(xiàn)在業(yè)務中統(tǒng)一使用多協(xié)程處理。



          思考


          (一)golang協(xié)程很牛,協(xié)程的數(shù)目最大到底多大合適,有什么衡量指標么?


          衡量指標,協(xié)程數(shù)目衡量


          這邊收集碼客等平臺的回答基本上可以這樣理解這件事:


          • 不要一個請求spawn出太多請求,指數(shù)級增長。這一點,在第二點會受到加強。


          • 當你生成goroutines,需要明確他們何時退出以及是否退出,良好管理每個goroutines。


          盡量保持并發(fā)代碼足夠簡單,這樣grroutines得生命周期就很明顯了,如果沒做到,那么要記錄下異常goroutine退出的時間和原因。


          • 數(shù)目的話應該需要多少搞多少,擴增服務而不是限制,限制一般或多或少都會不合理,不僅delay更會造成擁堵。


          • 注意協(xié)程泄露問題,關注服務的指標。



          (二)使用鎖時候正確釋放鎖的方式


          任何情況使用鎖一定要切記鎖的釋放,任何情況!任何情況!任何情況!


          即便是panic時也要記得鎖的釋放,否則可以有下面的情況:


          • 代碼庫提供給他人使用,出現(xiàn)panic時候被外部recover,這時候就會導致鎖沒釋放



          (三)goroutine泄露預防與排查


          一個goroutine啟動后沒有正常退出,而是直到整個服務結束才退出,這種情況下,goroutine無法釋放,內存會飆高,嚴重可能會導致服務不可用。


          goroutine的退出其實只有以下幾種方式可以做到:


          • main函數(shù)退出

          • context通知退出

          • goroutine panic退出

          • goroutine 正常執(zhí)行完畢退出


          大多數(shù)引起goroutine泄露的原因基本上都是如下情況:


          • channel阻塞,導致協(xié)程永遠沒有機會退出

          • 異常的程序邏輯(比如循環(huán)沒有退出條件)


          杜絕:


          想要杜絕這種出現(xiàn)泄露的情況,需要清楚的了解channel再goroutine中的使用,循環(huán)是否有正確的跳出邏輯。


          排查:


          • go pprof工具

          • runtime.NumGoroutine()判斷實時協(xié)程數(shù)

          • 第三方庫


          案例:


          package main
          import ( "fmt" "net/http" _ "net/http/pprof" "runtime" "time")
          func toLeak() { c := make(chan int) go func() { <-c }()}
          func main() { go toLeak()
          go func() { _ = http.ListenAndServe("0.0.0.0:8080", nil) }()
          c := time.Tick(time.Second) for range c { fmt.Printf("goroutine [nums]: %d\n", runtime.NumGoroutine()) }}


          輸出:



          pprof:


          http://127.0.0.1:8080/debug/pprof/goroutine?debug=1



          復雜情況也可以用其他的可視化工具:


          go tool pprof -http=:8001?http://127.0.0.1:8080/debug/pprof/goroutine?debug=1




          (四)父協(xié)程捕獲子協(xié)程panic


          使用方便,支持鏈式調用


          父協(xié)程捕獲子協(xié)程panic(https://taoshu.in/go/safe-goroutine.html)



          (五)有鎖的地方就去用channel優(yōu)化


          有鎖的地方就去用channel優(yōu)化,這句話可能有點絕對,肯定不是所有場景都可以做到,但是大多數(shù)場景絕X是可以的,干掉鎖去使用channel優(yōu)化代碼進行解耦絕對是一個有趣的事情。


          分享一個很不錯的優(yōu)化demo:


          場景


          • 一個簡單的即時聊天室,支持連接成功的用戶收發(fā)消息,使用socket


          • 客戶端發(fā)送消息到服務端,服務端可以發(fā)送消息到每一個客戶端


          分析


          • 需要一個鏈接池保存每一個客戶端


          • 客戶端發(fā)送消息到服務端,服務端遍歷鏈接池發(fā)送給各個客戶端(用戶斷開鏈接,需要移除鏈接池的對應鏈接,否則會發(fā)送發(fā)錯;遍歷發(fā)送消息,需要再goroutine中發(fā)送,不應該被阻塞)

          問題


          上述有個針對鏈接池的并發(fā)操作


          解決


          引入鎖


          增加鎖機制,解決針對鏈接池的并發(fā)問題


          發(fā)送消息也需要去加鎖因為要防止出現(xiàn)panic: concurrent write to websocket connection


          導致的問題


          假設網絡延時,用戶新增時候還有消息再發(fā)送中,新加入的用戶就無法獲得鎖了,后面其他的相關操作都會被阻塞導致問題


          使用channel優(yōu)化:


          引入channel


          • 新增客戶端集合,包含三個通道


          鏈接新增通道registerChan,鏈接移除通道unregisterChan,發(fā)送消息通道m(xù)essageChan


          • 使用通道


          • 新增鏈接,鏈接丟入registerChan

          • 移除鏈接,鏈接丟入unregisterChan

          • 消息發(fā)送,消息丟入messageChan


          • 通道消息方法,代碼來自于開源項目簡單聊天架構演變


          // 處理所有管道任務func (room *Room) ProcessTask() {  log := zap.S()  log.Info("啟動處理任務")  for {    select {    case c := <-room.register:      log.Info("當前有客戶端進行注冊")      room.clientsPool[c] = true    case c := <-room.unregister:      log.Info("當前有客戶端離開")      if room.clientsPool[c] {        close(c.send)        delete(room.clientsPool, c)      }    case m := <-room.send:      for c := range room.clientsPool {        select {        case c.send <- m:        default:          break        }      }    }  }}


          結果:成功使用channel替換了鎖。


          參考資料:

          1.父協(xié)程捕獲子協(xié)程 panic

          2.啟發(fā)代碼 1: 微服務框架?啟發(fā)代碼 2: 同步/異步工具包

          3.goroutine 如何實現(xiàn)

          4.從簡單的即時聊天來看架構演變(simple-chatroom)



          ?作者簡介


          國利鵬

          騰訊電子簽開放平臺中心后臺工程師

          騰訊電子簽開放平臺中心后臺工程師,主要負責騰訊電子簽后端開發(fā)工作,有豐富的電子簽署相關工作經驗。



          ?推薦閱讀


          有的放矢,遠程操控中實時音視頻的優(yōu)化之道

          TVP三周年:聚力成長,共赴新篇!

          代碼質量第5層-只是實現(xiàn)了功能

          萬物智聯(lián)下,騰訊云IoT的差異化發(fā)展之路“新”在何處?



          瀏覽 98
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  操逼黄网| 久久国产精彩视频 | 蘑菇视频在线观看隐藏线路 | 亚洲无码中文字幕在线播放 | 狠狠干天天日 |