<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          在 net/http 標準庫做了點手腳

          共 1898字,需瀏覽 4分鐘

           ·

          2021-01-09 21:25

          Golang標準庫搭建的http服務端會為每一個請求創(chuàng)建一個協程去處理,雖然每個協程占有的棧空間很小,但是如果萬一來個數百萬千萬的請求(當然,這種可能性有點極端),服務端只能對每一條請求乖乖創(chuàng)建一個協程,這時候,該go進程就存在大量的goroutine,占用服務器資源不說,還會增大gc壓力。這時候就想給該機制加一個限制,搞一個協程池限制一下最大處理請求的協程數量。


          瀏覽一下標準庫該部分的源碼實現


          func?(srv *Server)?Serve(l net.Listener)?error?{
          ??.....

          ??ctx := context.WithValue(baseCtx, ServerContextKey, srv)
          ??for?{
          ? ? //等待建立連接,沒有請求則會阻塞住
          ????rw, err := l.Accept()
          ????if?err !=?nil?{......}
          ????connCtx := ctx
          ????if?cc := srv.ConnContext; cc !=?nil?{......}
          ? ? ......
          ? ??//開啟協程處理請求,主要需要改造的就在此處????go?c.serve(connCtx)
          ??}
          }


          主要需要針對創(chuàng)建協程加一個限制條件,如果小于協程池規(guī)定的數量就允許創(chuàng)建,否則等待協程池有空閑位再創(chuàng)建。



          01

          大致思路



          總體使用生產者消費者模式。使用兩個有緩沖區(qū)的channel來實現協程的并發(fā)控制,一個sigChannel通過緩沖空間限制最大的協程數量,另一個jobChannel則用于傳遞請求的數據(包括請求函數以及參數),該jobChannel對于是否緩沖沒有要求。


          流程


          (1)首先當請求到來之后,往sigChannel中寫入標志位數據,如果此時有空閑位置,則不會阻塞在此處;

          (2)之后往jobChannel中寫入要執(zhí)行的函數以及參數;

          (3)后臺監(jiān)聽jobChannel的函數worker(該函數要源源不斷讀取管道數據)則會取出管道中的數據;

          (4)worker創(chuàng)建goroutine執(zhí)行請求函數;

          (5)該請求函數執(zhí)行完成后,goroutine再去取出sigChannel管道中的標志數據,騰出來位置;

          注:如果開始時候sigChannel寫數據寫入不了,則說明該池子滿了,則需要阻塞等待。這樣就實現了使用sigChannel控制并發(fā)量的功能。


          02

          代碼實現



          接下來使用代碼實現這種思想


          1、首先把net/http包中的代碼給保存一份,防止被搞壞。直接在目錄下搞了一個git倉庫,先把源碼commit一次,再搞一個分支自己瞎搞著玩,。在net/http下建了一個放協程池函數的文件夾,創(chuàng)建一個go文件。


          2、首先定義兩個channel,一個用來存放信號,一個存放函數以及參數,結合到http處理這里


          type?Info?struct?{? //函數名稱,對應http中c.serve()函數
          ??ParamFunc?func(ctx context.Context)
          ? //函數的參數,對應c.serve()的connCtx參數
          ??Param?context.Context
          }

          type?Task?struct
          ?{
          ? //用于傳遞函數以及參數的管道,對應jobChannel
          ??taskLet?chan?Info
          ? //用于傳遞信號量的管道
          ??taskCmp?chan?int64
          }

          type?Pool?struct?{
          ??//兩個管道對應的結構體
          ??tasks *Task?
          ??//協程池容量
          ??taskNum?int64?
          }


          3、創(chuàng)建一個協程池對象,也就是初始化這兩個管道


          func?NewPool(n?int64)?*Pool?{
          ??taskc :=?make(chan?Info,n)
          ??workc :=?make(chan?int64,n)
          ??return?&Pool{
          ????tasks: &Task{
          ??????taskLet: taskc,
          ??????taskCmp: workc,
          ????},
          ????taskNum: n,
          ??}
          }


          4、創(chuàng)建一個put函數,用于往兩個channel中塞數據,即生產者


          func?(p *Pool)?Put(a Info)??{
          ? //在sigChannel中塞數據,如果阻塞說明沒有空閑
          ? p.tasks.taskCmp <-?1??//在jobChannel中塞數據??? p.tasks.taskLet <- a
          }


          5、創(chuàng)建一個run函數,用于監(jiān)聽管道并取出數據,即消費者


          func?(p *Pool)?Run()??{
          ?//持續(xù)監(jiān)聽jobChannel管道,只要有數據監(jiān)聽到則說明已經有空閑位了,
          ?//需要創(chuàng)建goroutine執(zhí)行傳來的函數以及參數
          ??for?{
          ????select?{
          ????case?let := <- p.tasks.taskLet:
          ??????go?p.Work(let)
          ????}
          ??}
          }

          func?(p *Pool)?Work(f Info)??{
          ??//執(zhí)行傳入的函數
          ? f.ParamFunc(f.Param)??//執(zhí)行完函數后把sigChannel中標志位取出??<- p.tasks.taskCmp
          }


          6、修改源碼,需要修改的代碼加到server.go中


          func?(srv *Server)?Serve(l net.Listener)?error?{
          ??.....
          ??//初始化一個連接池
          ? po := currencyctl.NewPool(srv.CorrencyNum)
          ??//異步開啟這個池子,否則會阻塞
          ? go po.Run()
          ??ctx := context.WithValue(baseCtx, ServerContextKey, srv)
          ??for?{
          ????//等待建立連接,沒有請求則會阻塞住
          ????rw, err := l.Accept()
          ????if?err !=?nil?{......}
          ????connCtx := ctx
          ????if?cc := srv.ConnContext; cc !=?nil?{......}
          ????......
          ????//go c.serve(connCtx)
          ? ? //改造成協程池
          ? ? po.Put(currencyctl.Info{ParamFunc:c.serve,Param:connCtx})
          ??}
          }


          我將處理并發(fā)數量的參數放到了server結構體中,通過http.ListenAndServe()方法傳遞并在下一次賦值。


          03

          測試階段



          接下來跑一個測試用例:


          測試代碼很簡單,如下:


          package?main
          import?(
          ??"fmt"
          ??"net/http"
          ??_"net/http/pprof"
          ??"time"
          )
          func?main()?{
          ??go?func()?{//使用pprof跟蹤
          ????http.ListenAndServe(":6060",nil,10)
          ??}()
          ??http.HandleFunc("/",?func(writer http.ResponseWriter, request *http.Request)?{
          ????fmt.Println("收到請求。。。")
          ????time.Sleep(time.Second*1)
          ????writer.Write([]byte("hello http"))
          ??})
          ??http.ListenAndServe(":8000",?nil,100)//限制最大并發(fā)量100
          }


          啟動項目,做一個壓力測試(這里我是用了go-stress-testing工具):


          使用并發(fā)請求量為1000時候,查看pprof工具,查看系統協程數,控制在了100左右。


          設置協程池協程量為200時候,使用1000并發(fā)請求,看到協程量控制在200



          經過驗證,該協程池在net/http標準庫上的應用基本成功了,但是只是測試了一個簡單的接口,沒有經過復雜的業(yè)務驗證,可能存在好多未知問題。

          所以,又乖乖git checkout切到了原始分支。



          瀏覽 56
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  99综合97 | 大香蕉首页 | 国产黄色免费小视频 | 中文字幕+乱码+中文ktv | 色婷激情五月 |