<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          如何使用 Go 每分鐘處理百萬請求?

          共 5132字,需瀏覽 11分鐘

           ·

          2021-04-29 17:13

          偶然間看到一篇寫于15年的文章,說實(shí)話,標(biāo)題確實(shí)吸引了我。 

          關(guān)于這篇文章,我就不直接翻譯了,原文地址我放在文章最后了。 

          項(xiàng)目的需求就是很簡單,客戶端發(fā)送請求,服務(wù)端接收請求處理數(shù)據(jù)(原文是把資源上傳至 Amazon S3 資源中)。本質(zhì)上就是這樣,

          我稍微改動了原文的業(yè)務(wù)代碼,但是并不影響核心模塊。在第一版中,每收到一個 Request,開啟一個 G 進(jìn)行處理,很常規(guī)的操作。

          初版

          package main
          import ( "fmt" "log" "net/http" "time")
          type Payload struct { // 傳啥不重要}
          func (p *Payload) UpdateToS3() error { //存儲邏輯,模擬操作耗時 time.Sleep(500 * time.Millisecond) fmt.Println("上傳成功") return nil}
          func payloadHandler(w http.ResponseWriter, r *http.Request) { // 業(yè)務(wù)過濾 // 請求body解析...... var p Payload go p.UpdateToS3() w.Write([]byte("操作成功"))}
          func main() { http.HandleFunc("/payload", payloadHandler) log.Fatal(http.ListenAndServe(":8099", nil))}


          這樣操作存在什么問題呢?一般情況下,沒什么問題。但是如果是高并發(fā)的場景下,不對 G 進(jìn)行控制,你的 CPU 使用率暴漲,內(nèi)存占用暴漲......,直至程序奔潰。


          如果此操作落地至數(shù)據(jù)庫,例如mysql。相應(yīng)的,你數(shù)據(jù)庫服務(wù)器的磁盤IO、網(wǎng)絡(luò)帶寬 、CPU負(fù)載、內(nèi)存消耗都會達(dá)到非常高的情況,一并奔潰。所以,一旦程序中出現(xiàn)不可控的事物,往往是危險的信號。

          中版

          package main
          import ( "fmt" "log" "net/http" "time")
          const MaxQueue = 400
          var Queue chan Payload
          func init() { Queue = make(chan Payload, MaxQueue)}
          type Payload struct { // 傳啥不重要}
          func (p *Payload) UpdateToS3() error { //存儲邏輯,模擬操作耗時 time.Sleep(500 * time.Millisecond) fmt.Println("上傳成功") return nil}
          func payloadHandler(w http.ResponseWriter, r *http.Request) { // 業(yè)務(wù)過濾 // 請求body解析...... var p Payload //go p.UpdateToS3() Queue <- p w.Write([]byte("操作成功"))}
          // 處理任務(wù)func StartProcessor() { for { select { case payload := <-Queue: payload.UpdateToS3() } }}
          func main() { http.HandleFunc("/payload", payloadHandler) //單獨(dú)開一個g接收與處理任務(wù) go StartProcessor() log.Fatal(http.ListenAndServe(":8099", nil))}

          這一版借助帶 buffered 的 channel 來完成這個功能,這樣控制住了無限制的G,但是依然沒有解決問題。

          原因是處理請求是一個同步的操作,每次只會處理一個任務(wù),然而高并發(fā)下請求進(jìn)來的速度會遠(yuǎn)遠(yuǎn)超過處理的速度。這種情況,一旦 channel 滿了之后, 后續(xù)的請求將會被阻塞等待。然后你會發(fā)現(xiàn),響應(yīng)的時間會大幅度的開始增加, 甚至不再有任何的響應(yīng)。

          終版

          package main
          import ("fmt""log""net/http""time")
          const ( MaxWorker = 100 //隨便設(shè)置值 MaxQueue = 200 // 隨便設(shè)置值)
          // 一個可以發(fā)送工作請求的緩沖 channelvar JobQueue chan Job
          func init() { JobQueue = make(chan Job, MaxQueue)}
          type Payload struct{}
          type Job struct { PayLoad Payload}
          type Worker struct { WorkerPool chan chan Job JobChannel chan Job quit chan bool}
          func NewWorker(workerPool chan chan Job) Worker { return Worker{ WorkerPool: workerPool, JobChannel: make(chan Job), quit: make(chan bool), }}
          // Start 方法開啟一個 worker 循環(huán),監(jiān)聽退出 channel,可按需停止這個循環(huán)func (w Worker) Start() { go func() { for { // 將當(dāng)前的 worker 注冊到 worker 隊列中 w.WorkerPool <- w.JobChannel select { case job := <-w.JobChannel: // 真正業(yè)務(wù)的地方 // 模擬操作耗時 time.Sleep(500 * time.Millisecond) fmt.Printf("上傳成功:%v\n", job) case <-w.quit: return } } }()}
          func (w Worker) stop() { go func() { w.quit <- true }()}
          // 初始化操作
          type Dispatcher struct { // 注冊到 dispatcher 的 worker channel 池 WorkerPool chan chan Job}
          func NewDispatcher(maxWorkers int) *Dispatcher { pool := make(chan chan Job, maxWorkers) return &Dispatcher{WorkerPool: pool}}
          func (d *Dispatcher) Run() { // 開始運(yùn)行 n 個 worker for i := 0; i < MaxWorker; i++ { worker := NewWorker(d.WorkerPool) worker.Start() } go d.dispatch()}
          func (d *Dispatcher) dispatch() { for { select { case job := <-JobQueue: go func(job Job) { // 嘗試獲取一個可用的 worker job channel,阻塞直到有可用的 worker jobChannel := <-d.WorkerPool // 分發(fā)任務(wù)到 worker job channel 中 jobChannel <- job }(job) } }}
          // 接收請求,把任務(wù)篩入JobQueue。func payloadHandler(w http.ResponseWriter, r *http.Request) { work := Job{PayLoad: Payload{}} JobQueue <- work _, _ = w.Write([]byte("操作成功"))}
          func main() { // 通過調(diào)度器創(chuàng)建worker,監(jiān)聽來自 JobQueue的任務(wù) d := NewDispatcher(MaxWorker) d.Run() http.HandleFunc("/payload", payloadHandler) log.Fatal(http.ListenAndServe(":8099", nil))}

          最終采用的是兩級 channel,一級是將用戶請求數(shù)據(jù)放入到 chan Job 中,這個 channel job 相當(dāng)于待處理的任務(wù)隊列。

          另一級用來存放可以處理任務(wù)的 work 緩存隊列,類型為 chan chan Job。調(diào)度器把待處理的任務(wù)放入一個空閑的緩存隊列當(dāng)中,work 會一直處理它的緩存隊列。通過這種方式,實(shí)現(xiàn)了一個 worker 池。大致畫了一個圖幫助理解,

          首先我們在接收到一個請求后,創(chuàng)建 Job 任務(wù),把它放入到任務(wù)隊列中等待 work 池處理

          func payloadHandler(w http.ResponseWriter, r *http.Request) {  job := Job{PayLoad: Payload{}}  JobQueue <- work  _, _ = w.Write([]byte("操作成功"))}

          調(diào)度器初始化work池后,在 dispatch 中,一旦我們接收到 JobQueue 的任務(wù),就去嘗試獲取一個可用的 worker,分發(fā)任務(wù)給 worker 的 job channel 中。 注意這個過程不是同步的,而是每接收到一個 job,就開啟一個 G 去處理。這樣可以保證 JobQueue 不需要進(jìn)行阻塞,對應(yīng)的往 JobQueue 理論上也不需要阻塞地寫入任務(wù)。

          func (d *Dispatcher) Run() {  // 開始運(yùn)行 n 個 worker  for i := 0; i < MaxWorker; i++ {    worker := NewWorker(d.WorkerPool)    worker.Start()  }  go d.dispatch()}
          func (d *Dispatcher) dispatch() { for { select { case job := <-JobQueue: go func(job Job) { // 嘗試獲取一個可用的 worker job channel,阻塞直到有可用的 worker jobChannel := <-d.WorkerPool // 分發(fā)任務(wù)到 worker job channel 中 jobChannel <- job }(job) } }}

          這里"不可控"的 G 和上面還是又所不同的。僅僅極短時間內(nèi)處于阻塞讀 Chan 狀態(tài), 當(dāng)有空閑的 worker 被喚醒,然后分發(fā)任務(wù),整個生命周期遠(yuǎn)遠(yuǎn)短于上面的操作


          附錄

          [1]http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/

             

          -- END --


          喜歡明哥文章的同學(xué)
          歡迎長按下圖訂閱!

          ???

          瀏覽 132
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  www.91熊猫成人网 | 午夜第一页 | 艹逼在线观看 | 免费在线观看网站性情淫乱做爱 | 九哥操逼王 |