<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Go中的HTTP請(qǐng)求之——HTTP1.1請(qǐng)求流程分析

          共 7831字,需瀏覽 16分鐘

           ·

          2022-05-17 10:46

          點(diǎn)擊上方“Go語言進(jìn)階學(xué)習(xí)”,進(jìn)行關(guān)注

          回復(fù)“Go語言”即可獲贈(zèng)Python從入門到進(jìn)階共10本電子書

          薄宦梗猶泛,故園蕪已平。

          前言

          http是目前應(yīng)用最為廣泛, 也是程序員接觸最多的協(xié)議之一。今天筆者站在GoPher的角度對(duì)http1.1的請(qǐng)求流程進(jìn)行全面的分析。希望讀者讀完此文后, 能夠有以下幾個(gè)收獲:

          1. 對(duì)http1.1的請(qǐng)求流程有一個(gè)大概的了解

          2. 在平時(shí)的開發(fā)中能夠更好地重用底層TCP連接

          3. 對(duì)http1.1的線頭阻塞能有一個(gè)更清楚的認(rèn)識(shí)

          HTTP1.1流程

          今天內(nèi)容較多, 廢話不多說, 直接上干貨。

          接下來, 筆者將根據(jù)流程圖,對(duì)除了NewRequest以外的函數(shù)進(jìn)行逐步的展開和分析

          (*Client).do

          (*Client).do方法的核心代碼是一個(gè)沒有結(jié)束條件的for循環(huán)。

          for {
          // For all but the first request, create the next
          // request hop and replace req.
          if len(reqs) > 0 {
          loc := resp.Header.Get("Location")
          // ...此處省略代碼...
          err = c.checkRedirect(req, reqs)
          // ...此處省略很多代碼...
          }

          reqs = append(reqs, req)
          var err error
          var didTimeout func() bool
          if resp, didTimeout, err = c.send(req, deadline); err != nil
          {
          // c.send() always closes req.Body
          reqBodyClosed = true
          // ...此處省略代碼...
          return nil, uerr(err)
          }

          var shouldRedirect bool
          redirectMethod, shouldRedirect, includeBody = redirectBehavior(req.Method, resp, reqs[0])
          if !shouldRedirect {
          return resp, nil
          }

          req.closeBody()
          }

          上面的代碼中, 請(qǐng)求第一次進(jìn)入會(huì)調(diào)用c.send, 得到響應(yīng)后會(huì)判斷請(qǐng)求是否需要重定向, 如果需要重定向則繼續(xù)循環(huán), 否則返回響應(yīng)。

          進(jìn)入重定向流程后, 這里筆者簡(jiǎn)單介紹一下checkRedirect函數(shù):

          func defaultCheckRedirect(req *Request, via []*Request) error {
          if len(via) >= 10 {
          return errors.New("stopped after 10 redirects")
          }
          return nil
          }
          // ...
          func (c *Client) checkRedirect(req *Request, via []*Request) error {
          fn := c.CheckRedirect
          if fn == nil {
          fn = defaultCheckRedirect
          }
          return fn(req, via)
          }

          由上可知, 用戶可以自己定義重定向的檢查規(guī)則。如果用戶沒有自定義檢查規(guī)則, 則重定向次數(shù)不能超過10次

          (*Client).send

          (*Client).send方法邏輯較為簡(jiǎn)單, 主要看用戶有沒有為http.Client的Jar字段實(shí)現(xiàn)CookieJar接口。主要流程如下:

          1. 如果實(shí)現(xiàn)了CookieJar接口, 為Request添加保存的cookie信息。

          2. 調(diào)用send函數(shù)。

          3. 如果實(shí)現(xiàn)了CookieJar接口, 將Response中的cookie信息保存下來。

          // didTimeout is non-nil only if err != nil.
          func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
          if c.Jar != nil {
          for _, cookie := range c.Jar.Cookies(req.URL) {
          req.AddCookie(cookie)
          }
          }
          resp, didTimeout, err = send(req, c.transport(), deadline)
          if err != nil {
          return nil, didTimeout, err
          }
          if c.Jar != nil {
          if rc := resp.Cookies(); len(rc) > 0 {
          c.Jar.SetCookies(req.URL, rc)
          }
          }
          return resp, nil, nil
          }

          另外, 我們還需要關(guān)注c.transport()的調(diào)用。如果用戶未對(duì)http.Client指定Transport則會(huì)使用go默認(rèn)的DefaultTransport。

          該Transport實(shí)現(xiàn)RoundTripper接口。在go中RoundTripper的定義為“執(zhí)行單個(gè)HTTP事務(wù)的能力,獲取給定請(qǐng)求的響應(yīng)”。

          func (c *Client) transport() RoundTripper {
          if c.Transport != nil {
          return c.Transport
          }
          return DefaultTransport
          }

          send

          send函數(shù)會(huì)檢查request的URL,以及參數(shù)的rt, 和header值。如果URL和rt為nil則直接返回錯(cuò)誤。同時(shí), 如果請(qǐng)求中設(shè)置了用戶信息, 還會(huì)檢查并設(shè)置basic的驗(yàn)證頭信息,最后調(diào)用rt.RoundTrip得到請(qǐng)求的響應(yīng)。

          func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
          req := ireq // req is either the original request, or a modified fork
          // ...此處省略代碼...
          if u := req.URL.User; u != nil && req.Header.Get("Authorization") == "" {
          username := u.Username()
          password, _ := u.Password()
          forkReq()
          req.Header = cloneOrMakeHeader(ireq.Header)
          req.Header.Set("Authorization", "Basic "+basicAuth(username, password))
          }

          if !deadline.IsZero() {
          forkReq()
          }
          stopTimer, didTimeout := setRequestCancel(req, rt, deadline)

          resp, err = rt.RoundTrip(req)
          if err != nil {
          // ...此處省略代碼...
          return nil, didTimeout, err
          }
          // ...此處省略代碼...
          return resp, nil, nil
          }

          (*Transport).RoundTrip

          (*Transport).RoundTrip的邏輯很簡(jiǎn)單,它會(huì)調(diào)用(*Transport).roundTrip方法,因此本節(jié)實(shí)際上是對(duì)(*Transport).roundTrip方法的分析。

          func (t *Transport) RoundTrip(req *Request) (*Response, error) {
          return t.roundTrip(req)
          }
          func (t *Transport) roundTrip(req *Request) (*Response, error) {
          // ...此處省略校驗(yàn)header頭和headervalue的代碼以及其他代碼...

          for {
          select {
          case <-ctx.Done():
          req.closeBody()
          return nil, ctx.Err()
          default:
          }

          // treq gets modified by roundTrip, so we need to recreate for each retry.
          treq := &transportRequest{Request: req, trace: trace}
          cm, err := t.connectMethodForRequest(treq)
          // ...此處省略代碼...
          pconn, err := t.getConn(treq, cm)
          if err != nil {
          t.setReqCanceler(req, nil)
          req.closeBody()
          return nil, err
          }

          var resp *Response
          if pconn.alt != nil {
          // HTTP/2 path.
          t.setReqCanceler(req, nil) // not cancelable with CancelRequest
          resp, err = pconn.alt.RoundTrip(req)
          } else {
          resp, err = pconn.roundTrip(treq)
          }
          if err == nil {
          return resp, nil
          }

          // ...此處省略判斷是否重試請(qǐng)求的代碼邏輯...
          }
          }

          由上可知, 每次for循環(huán), 會(huì)判斷請(qǐng)求上下文是否已經(jīng)取消, 如果沒有取消則繼續(xù)進(jìn)行后續(xù)的流程。

          1. 先調(diào)用t.getConn方法獲取一個(gè)persistConn。

          2. 因?yàn)楸酒髦际莌ttp1.1,所以我們直接看http1.1的執(zhí)行分支。根據(jù)源碼中的注釋和實(shí)際的debug結(jié)果,獲取到連接后, 會(huì)繼續(xù)調(diào)用pconn.roundTrip

          (*Transport).getConn

          筆者認(rèn)為這一步在http請(qǐng)求中是非常核心的一個(gè)步驟,因?yàn)橹挥泻蛃erver端建立連接后才能進(jìn)行后續(xù)的通信。

          func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
          req := treq.Request
          trace := treq.trace
          ctx := req.Context()
          // ...此處省略代碼...
          w := &wantConn{
          cm: cm,
          key: cm.key(),
          ctx: ctx,
          ready: make(chan struct{}, 1),
          beforeDial: testHookPrePendingDial,
          afterDial: testHookPostPendingDial,
          }
          // ...此處省略代碼...
          // Queue for idle connection.
          if delivered := t.queueForIdleConn(w); delivered {
          pc := w.pc
          // ...此處省略代碼...
          return pc, nil
          }

          cancelc := make(chan error, 1)
          t.setReqCanceler(req, func(err error) { cancelc <- err })

          // Queue for permission to dial.
          t.queueForDial(w)

          // Wait for completion or cancellation.
          select {
          case <-w.ready:
          // Trace success but only for HTTP/1.
          // HTTP/2 calls trace.GotConn itself.
          if w.pc != nil && w.pc.alt == nil && trace != nil && trace.GotConn != nil {
          trace.GotConn(httptrace.GotConnInfo{Conn: w.pc.conn, Reused: w.pc.isReused()})
          }
          // ...此處省略代碼...
          return w.pc, w.err
          case <-req.Cancel:
          return nil, errRequestCanceledConn
          case <-req.Context().Done():
          return nil, req.Context().Err()
          case err := <-cancelc:
          if err == errRequestCanceled {
          err = errRequestCanceledConn
          }
          return nil, err
          }
          }

          由上能夠清楚的知道, 獲取連接分為以下幾個(gè)步驟:

          1. 調(diào)用t.queueForIdleConn獲取一個(gè)空閑且可復(fù)用的連接,如果獲取成功則直接返回該連接。

          2. 如果未獲取到空閑連接則調(diào)用t.queueForDial開始新建一個(gè)連接。

          3. 等待w.ready關(guān)閉,則可以返回新的連接。

          (*Transport).queueForIdleConn

          (*Transport).queueForIdleConn方法會(huì)根據(jù)請(qǐng)求的connectMethodKey從t.idleConn獲取一個(gè)[]*persistConn切片, 并從切片中,根據(jù)算法獲取一個(gè)有效的空閑連接。如果未獲取到空閑連接,則將wantConn結(jié)構(gòu)體變量放入t.idleConnWait[w.key]等待隊(duì)列,此處wantConn結(jié)構(gòu)體變量就是前面提到的w

          connectMethodKey定義和queueForIdleConn部分關(guān)鍵代碼如下:

          type connectMethodKey struct {
          proxy, scheme, addr string
          onlyH1 bool
          }

          func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
          // ...此處省略代碼...
          // Look for most recently-used idle connection.
          if list, ok := t.idleConn[w.key]; ok {
          stop := false
          delivered := false
          for len(list) > 0 && !stop {
          pconn := list[len(list)-1]

          // See whether this connection has been idle too long, considering
          // only the wall time (the Round(0)), in case this is a laptop or VM
          // coming out of suspend with previously cached idle connections.
          tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
          // ...此處省略代碼...
          delivered = w.tryDeliver(pconn, nil)
          if delivered {
          // ...此處省略代碼...
          }
          stop = true
          }
          if len(list) > 0 {
          t.idleConn[w.key] = list
          } else {
          delete(t.idleConn, w.key)
          }
          if stop {
          return delivered
          }
          }

          // Register to receive next connection that becomes idle.
          if t.idleConnWait == nil {
          t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
          }
          q := t.idleConnWait[w.key]
          q.cleanFront()
          q.pushBack(w)
          t.idleConnWait[w.key] = q
          return false
          }

          其中w.tryDeliver方法主要作用是將連接協(xié)程安全的賦值給w.pc,并關(guān)閉w.ready管道。此時(shí)我們便可以和(*Transport).getConn中調(diào)用queueForIdleConn成功后的返回值對(duì)應(yīng)上。

          (*Transport).queueForDial

          (*Transport).queueForDial方法包含三個(gè)步驟:

          1. 如果t.MaxConnsPerHost小于等于0,執(zhí)行go t.dialConnFor(w)并返回。其中MaxConnsPerHost代表著每個(gè)host的最大連接數(shù),小于等于0表示不限制。

          2. 如果當(dāng)前host的連接數(shù)不超過t.MaxConnsPerHost,對(duì)當(dāng)前host的連接數(shù)+1,然后執(zhí)行go t.dialConnFor(w)并返回。

          3. 如果當(dāng)前host的連接數(shù)等于t.MaxConnsPerHost,則將wantConn結(jié)構(gòu)體變量放入t.connsPerHostWait[w.key]等待隊(duì)列,此處wantConn結(jié)構(gòu)體變量就是前面提到的w。另外在放入等待隊(duì)列前會(huì)先清除隊(duì)列中已經(jīng)失效或者不再等待的變量。

          func (t *Transport) queueForDial(w *wantConn) {
          w.beforeDial()
          if t.MaxConnsPerHost <= 0 {
          go t.dialConnFor(w)
          return
          }

          t.connsPerHostMu.Lock()
          defer t.connsPerHostMu.Unlock()

          if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
          if t.connsPerHost == nil {
          t.connsPerHost = make(map[connectMethodKey]int)
          }
          t.connsPerHost[w.key] = n + 1
          go t.dialConnFor(w)
          return
          }

          if t.connsPerHostWait == nil {
          t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
          }
          q := t.connsPerHostWait[w.key]
          q.cleanFront()
          q.pushBack(w)
          t.connsPerHostWait[w.key] = q
          }
          (*Transport).dialConnFor

          (*Transport).dialConnFor方法調(diào)用t.dialConn獲取一個(gè)真正的*persistConn。并將這個(gè)連接傳遞給w, 如果w已經(jīng)獲取到了連接,則會(huì)傳遞失敗,此時(shí)調(diào)用t.putOrCloseIdleConn將連接放回空閑連接池。

          如果連接獲取錯(cuò)誤則會(huì)調(diào)用t.decConnsPerHost減少當(dāng)前host的連接數(shù)。

          func (t *Transport) dialConnFor(w *wantConn) {
          defer w.afterDial()

          pc, err := t.dialConn(w.ctx, w.cm)
          delivered := w.tryDeliver(pc, err)
          if err == nil && (!delivered || pc.alt != nil) {
          // pconn was not passed to w,
          // or it is HTTP/2 and can be shared.
          // Add to the idle connection pool.
          t.putOrCloseIdleConn(pc)
          }
          if err != nil {
          t.decConnsPerHost(w.key)
          }
          }
          • (*Transport).putOrCloseIdleConn方法

          func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
          if err := t.tryPutIdleConn(pconn); err != nil {
          pconn.close(err)
          }
          }
          func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
          if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
          return errKeepAlivesDisabled
          }
          // ...此處省略代碼...
          t.idleMu.Lock()
          defer t.idleMu.Unlock()
          // ...此處省略代碼...

          // Deliver pconn to goroutine waiting for idle connection, if any.
          // (They may be actively dialing, but this conn is ready first.
          // Chrome calls this socket late binding.
          // See https://insouciant.org/tech/connection-management-in-chromium/.)
          key := pconn.cacheKey
          if q, ok := t.idleConnWait[key]; ok {
          done := false
          if pconn.alt == nil {
          // HTTP/1.
          // Loop over the waiting list until we find a w that isn't done already, and hand it pconn.
          for q.len() > 0 {
          w := q.popFront()
          if w.tryDeliver(pconn, nil) {
          done = true
          break
          }
          }
          } else {
          // HTTP/2.
          // Can hand the same pconn to everyone in the waiting list,
          // and we still won't be done: we want to put it in the idle
          // list unconditionally, for any future clients too.
          for q.len() > 0 {
          w := q.popFront()
          w.tryDeliver(pconn, nil)
          }
          }
          if q.len() == 0 {
          delete(t.idleConnWait, key)
          } else {
          t.idleConnWait[key] = q
          }
          if done {
          return nil
          }
          }

          if t.closeIdle {
          return errCloseIdle
          }
          if t.idleConn == nil {
          t.idleConn = make(map[connectMethodKey][]*persistConn)
          }
          idles := t.idleConn[key]
          if len(idles) >= t.maxIdleConnsPerHost() {
          return errTooManyIdleHost
          }
          // ...此處省略代碼...
          t.idleConn[key] = append(idles, pconn)
          t.idleLRU.add(pconn)
          // ...此處省略代碼...
          // Set idle timer, but only for HTTP/1 (pconn.alt == nil).
          // The HTTP/2 implementation manages the idle timer itself
          // (see idleConnTimeout in h2_bundle.go).
          if t.IdleConnTimeout > 0 && pconn.alt == nil {
          if pconn.idleTimer != nil {
          pconn.idleTimer.Reset(t.IdleConnTimeout)
          } else {
          pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
          }
          }
          pconn.idleAt = time.Now()
          return nil
          }
          func (t *Transport) maxIdleConnsPerHost() int {
          if v := t.MaxIdleConnsPerHost; v != 0 {
          return v
          }
          return DefaultMaxIdleConnsPerHost // 2
          }

          由上可知,將連接放入t.idleConn前,先檢查t.idleConnWait的數(shù)量。如果有請(qǐng)求在等待空閑連接, 則將連接復(fù)用,沒有空閑連接時(shí),才將連接放入t.idleConn。連接放入t.idleConn后,還會(huì)重置連接的可空閑時(shí)間。

          另外在t.putOrCloseIdleConn函數(shù)中還需要注意兩點(diǎn):

          1. 如果用戶自定義了http.client,且將DisableKeepAlives設(shè)置為true,或者將MaxIdleConnsPerHost設(shè)置為負(fù)數(shù),則連接不會(huì)放入t.idleConn即連接不能復(fù)用。

          2. 在判斷已有空閑連接數(shù)量時(shí), 如果MaxIdleConnsPerHost 不等于0, 則返回用戶設(shè)置的數(shù)量,否則返回默認(rèn)值2,詳見上面的(*Transport).maxIdleConnsPerHost?函數(shù)。

          綜上, 我們知道對(duì)于部分有連接數(shù)限制的業(yè)務(wù), 我們可以為http.Client自定義一個(gè)Transport, 并設(shè)置Transport的MaxConnsPerHostMaxIdleConnsPerHostIdleConnTimeoutDisableKeepAlives從而達(dá)到即限制連接數(shù)量,又能保證一定的并發(fā)。

          • (*Transport).decConnsPerHost方法

          func (t *Transport) decConnsPerHost(key connectMethodKey) {
          // ...此處省略代碼...
          t.connsPerHostMu.Lock()
          defer t.connsPerHostMu.Unlock()
          n := t.connsPerHost[key]
          // ...此處省略代碼...

          // Can we hand this count to a goroutine still waiting to dial?
          // (Some goroutines on the wait list may have timed out or
          // gotten a connection another way. If they're all gone,
          // we don't want to kick off any spurious dial operations.)
          if q := t.connsPerHostWait[key]; q.len() > 0 {
          done := false
          for q.len() > 0 {
          w := q.popFront()
          if w.waiting() {
          go t.dialConnFor(w)
          done = true
          break
          }
          }
          if q.len() == 0 {
          delete(t.connsPerHostWait, key)
          } else {
          // q is a value (like a slice), so we have to store
          // the updated q back into the map.
          t.connsPerHostWait[key] = q
          }
          if done {
          return
          }
          }

          // Otherwise, decrement the recorded count.
          if n--; n == 0 {
          delete(t.connsPerHost, key)
          } else {
          t.connsPerHost[key] = n
          }
          }

          由上可知, decConnsPerHost方法主要干了兩件事:

          1. 判斷是否有請(qǐng)求在等待撥號(hào), 如果有則執(zhí)行go t.dialConnFor(w)

          2. 如果沒有請(qǐng)求在等待撥號(hào), 則減少當(dāng)前host的連接數(shù)量。

          (*Transport).dialConn

          根據(jù)http.Client的默認(rèn)配置和實(shí)際的debug結(jié)果,(*Transport).dialConn方法主要邏輯如下:

          1. 調(diào)用t.dial(ctx, "tcp", cm.addr())創(chuàng)建TCP連接。

          2. 如果是https的請(qǐng)求, 則對(duì)請(qǐng)求建立安全的tls傳輸通道。

          3. 為persistConn創(chuàng)建讀寫buffer, 如果用戶沒有自定義讀寫buffer的大小, 根據(jù)writeBufferSize和readBufferSize方法可知, 讀寫bufffer的大小默認(rèn)為4096。

          4. 執(zhí)行go pconn.readLoop()go pconn.writeLoop()開啟讀寫循環(huán)然后返回連接。

          func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
          pconn = &persistConn{
          t: t,
          cacheKey: cm.key(),
          reqch: make(chan requestAndChan, 1),
          writech: make(chan writeRequest, 1),
          closech: make(chan struct{}),
          writeErrCh: make(chan error, 1),
          writeLoopDone: make(chan struct{}),
          }
          // ...此處省略代碼...
          if cm.scheme() == "https" && t.hasCustomTLSDialer() {
          // ...此處省略代碼...
          } else {
          conn, err := t.dial(ctx, "tcp", cm.addr())
          if err != nil {
          return nil, wrapErr(err)
          }
          pconn.conn = conn
          if cm.scheme() == "https" {
          var firstTLSHost string
          if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
          return nil, wrapErr(err)
          }
          if err = pconn.addTLS(firstTLSHost, trace); err != nil {
          return nil, wrapErr(err)
          }
          }
          }

          // Proxy setup.
          switch { // ...此處省略代碼... }

          if cm.proxyURL != nil && cm.targetScheme == "https" {
          // ...此處省略代碼...
          }

          if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
          // ...此處省略代碼...
          }

          pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
          pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())

          go pconn.readLoop()
          go pconn.writeLoop()
          return pconn, nil
          }
          func (t *Transport) writeBufferSize() int {
          if t.WriteBufferSize > 0 {
          return t.WriteBufferSize
          }
          return 4 << 10
          }

          func (t *Transport) readBufferSize() int {
          if t.ReadBufferSize > 0 {
          return t.ReadBufferSize
          }
          return 4 << 10
          }

          (*persistConn).roundTrip

          (*persistConn).roundTrip方法是http1.1請(qǐng)求的核心之一,該方法在這里獲取真實(shí)的Response并返回給上層。

          func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
          // ...此處省略代碼...

          gone := make(chan struct{})
          defer close(gone)
          // ...此處省略代碼...
          const debugRoundTrip = false

          // Write the request concurrently with waiting for a response,
          // in case the server decides to reply before reading our full
          // request body.
          startBytesWritten := pc.nwrite
          writeErrCh := make(chan error, 1)
          pc.writech <- writeRequest{req, writeErrCh, continueCh}

          resc := make(chan responseAndError)
          pc.reqch <- requestAndChan{
          req: req.Request,
          ch: resc,
          addedGzip: requestedGzip,
          continueCh: continueCh,
          callerGone: gone,
          }

          var respHeaderTimer <-chan time.Time
          cancelChan := req.Request.Cancel
          ctxDoneChan := req.Context().Done()
          for {
          testHookWaitResLoop()
          select {
          case err := <-writeErrCh:
          // ...此處省略代碼...
          if err != nil {
          pc.close(fmt.Errorf("write error: %v", err))
          return nil, pc.mapRoundTripError(req, startBytesWritten, err)
          }
          // ...此處省略代碼...
          case <-pc.closech:
          // ...此處省略代碼...
          return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
          case <-respHeaderTimer:
          // ...此處省略代碼...
          return nil, errTimeout
          case re := <-resc:
          if (re.res == nil) == (re.err == nil) {
          panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
          }
          if debugRoundTrip {
          req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
          }
          if re.err != nil {
          return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
          }
          return re.res, nil
          case <-cancelChan:
          pc.t.CancelRequest(req.Request)
          cancelChan = nil
          case <-ctxDoneChan:
          pc.t.cancelRequest(req.Request, req.Context().Err())
          cancelChan = nil
          ctxDoneChan = nil
          }
          }
          }

          由上可知, (*persistConn).roundTrip方法可以分為三步:

          1. 向連接的writech寫入writeRequest:?pc.writech <- writeRequest{req, writeErrCh, continueCh}, 參考(*Transport).dialConn可知pc.writech是一個(gè)緩沖大小為1的管道,所以會(huì)立馬寫入成功。

          2. 向連接的reqch寫入requestAndChan:?pc.reqch <- requestAndChan, pc.reqch和pc.writech一樣都是緩沖大小為1的管道。其中requestAndChan.ch是一個(gè)無緩沖的responseAndError管道,(*persistConn).roundTrip就通過這個(gè)管道讀取到真實(shí)的響應(yīng)。

          3. 開啟for循環(huán)select, 等待響應(yīng)或者超時(shí)等信息。

          • (*persistConn).writeLoop 寫循環(huán)

          (*persistConn).writeLoop方法主體邏輯相對(duì)簡(jiǎn)單,把用戶的請(qǐng)求寫入連接的寫緩存buffer, 最后再flush就可以了。

          func (pc *persistConn) writeLoop() {
          defer close(pc.writeLoopDone)
          for {
          select {
          case wr := <-pc.writech:
          startBytesWritten := pc.nwrite
          err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
          if bre, ok := err.(requestBodyReadError); ok {
          err = bre.error
          wr.req.setError(err)
          }
          if err == nil {
          err = pc.bw.Flush()
          }
          if err != nil {
          wr.req.Request.closeBody()
          if pc.nwrite == startBytesWritten {
          err = nothingWrittenError{err}
          }
          }
          pc.writeErrCh <- err // to the body reader, which might recycle us
          wr.ch <- err // to the roundTrip function
          if err != nil {
          pc.close(err)
          return
          }
          case <-pc.closech:
          return
          }
          }
          }
          • (*persistConn).readLoop 讀循環(huán)

          (*persistConn).readLoop有較多的細(xì)節(jié), 我們先看代碼, 然后再逐步分析。

          func (pc *persistConn) readLoop() {
          closeErr := errReadLoopExiting // default value, if not changed below
          defer func() {
          pc.close(closeErr)
          pc.t.removeIdleConn(pc)
          }()

          tryPutIdleConn := func(trace *httptrace.ClientTrace) bool {
          if err := pc.t.tryPutIdleConn(pc); err != nil {
          // ...此處省略代碼...
          }
          // ...此處省略代碼...
          return true
          }
          // ...此處省略代碼...
          alive := true
          for alive {
          // ...此處省略代碼...
          rc := <-pc.reqch
          trace := httptrace.ContextClientTrace(rc.req.Context())

          var resp *Response
          if err == nil {
          resp, err = pc.readResponse(rc, trace)
          } else {
          err = transportReadFromServerError{err}
          closeErr = err
          }

          // ...此處省略代碼...
          bodyWritable := resp.bodyIsWritable()
          hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0

          if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable {
          // Don't do keep-alive on error if either party requested a close
          // or we get an unexpected informational (1xx) response.
          // StatusCode 100 is already handled above.
          alive = false
          }

          if !hasBody || bodyWritable {
          // ...此處省略代碼...
          continue
          }

          waitForBodyRead := make(chan bool, 2)
          body := &bodyEOFSignal{
          body: resp.Body,
          earlyCloseFn: func() error {
          waitForBodyRead <- false
          <-eofc // will be closed by deferred call at the end of the function
          return nil

          },
          fn: func(err error) error {
          isEOF := err == io.EOF
          waitForBodyRead <- isEOF
          if isEOF {
          <-eofc // see comment above eofc declaration
          } else if err != nil {
          if cerr := pc.canceled(); cerr != nil {
          return cerr
          }
          }
          return err
          },
          }

          resp.Body = body
          // ...此處省略代碼...

          select {
          case rc.ch <- responseAndError{res: resp}:
          case <-rc.callerGone:
          return
          }

          // Before looping back to the top of this function and peeking on
          // the bufio.Reader, wait for the caller goroutine to finish
          // reading the response body. (or for cancellation or death)
          select {
          case bodyEOF := <-waitForBodyRead:
          pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool
          alive = alive &&
          bodyEOF &&
          !pc.sawEOF &&
          pc.wroteRequest() &&
          tryPutIdleConn(trace)
          if bodyEOF {
          eofc <- struct{}{}
          }
          case <-rc.req.Cancel:
          alive = false
          pc.t.CancelRequest(rc.req)
          case <-rc.req.Context().Done():
          alive = false
          pc.t.cancelRequest(rc.req, rc.req.Context().Err())
          case <-pc.closech:
          alive = false
          }

          testHookReadLoopBeforeNextRead()
          }
          }

          由上可知, 只要連接處于活躍狀態(tài), 則這個(gè)讀循環(huán)會(huì)一直開啟, 直到 連接不活躍或者產(chǎn)生其他錯(cuò)誤才會(huì)結(jié)束讀循環(huán)。

          在上述源碼中,pc.readResponse(rc,trace)會(huì)從連接的讀buffer中獲取一個(gè)請(qǐng)求對(duì)應(yīng)的Response。

          讀到響應(yīng)之后判斷請(qǐng)求是否是HEAD請(qǐng)求或者響應(yīng)內(nèi)容為空,如果是HEAD請(qǐng)求或者響應(yīng)內(nèi)容為空則將響應(yīng)寫入rc.ch,并將連接放入idleConn(此處因?yàn)槠脑蚴÷粤嗽创a內(nèi)容, 正常請(qǐng)求的邏輯也有寫響應(yīng)和將連接放入idleConn兩個(gè)步驟)。

          如果不是HEAD請(qǐng)求并且響應(yīng)內(nèi)容不為空即!hasBody || bodyWritable為false:

          1. 創(chuàng)建一個(gè)緩沖大小為2的等待響應(yīng)被讀取的管道waitForBodyRead:?waitForBodyRead := make(chan bool, 2)

          2. 將響應(yīng)的Body修改為bodyEOFSignal結(jié)構(gòu)體。通過上面的源碼我們可以知道,此時(shí)的resp.Body中有earlyCloseFnfn兩個(gè)函數(shù)。earlyCloseFn函數(shù)會(huì)向waitForBodyRead管道寫入false, fn函數(shù)會(huì)判斷響應(yīng)是否讀完, 如果已經(jīng)讀完則向waitForBodyRead寫入true否則寫入false

          3. 將修改后的響應(yīng)寫入rc.ch。其中rc.chrc := <-pc.reqch獲取,而pc.reqch正是前面(*persistConn).roundTrip函數(shù)寫入的requestAndChanrequestAndChan.ch是一個(gè)無緩沖的responseAndError管道,(*persistConn).roundTrip通過這個(gè)管道讀取到真實(shí)的響應(yīng)。

          4. select 讀取 waitForBodyRead被寫入的值。如果讀到到的是true則可以調(diào)用tryPutIdleConn(此方法會(huì)調(diào)用前面提到的(*Transport).tryPutIdleConn方法)將連接放入idleConn從而復(fù)用連接。

          waitForBodyRead寫入true的原因我們已經(jīng)知道了,但是被寫入true的時(shí)機(jī)我們尚不明確。

          func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
          // ...此處省略代碼...
          n, err = es.body.Read(p)
          if err != nil {
          es.mu.Lock()
          defer es.mu.Unlock()
          if es.rerr == nil {
          es.rerr = err
          }
          err = es.condfn(err)
          }
          return
          }

          func (es *bodyEOFSignal) Close() error {
          es.mu.Lock()
          defer es.mu.Unlock()
          if es.closed {
          return nil
          }
          es.closed = true
          if es.earlyCloseFn != nil && es.rerr != io.EOF {
          return es.earlyCloseFn()
          }
          err := es.body.Close()
          return es.condfn(err)
          }

          // caller must hold es.mu.
          func (es *bodyEOFSignal) condfn(err error) error {
          if es.fn == nil {
          return err
          }
          err = es.fn(err)
          es.fn = nil
          return err
          }

          由上述源碼可知, 只有當(dāng)調(diào)用方完整的讀取了響應(yīng),該連接才能夠被復(fù)用。因此在http1.1中,一個(gè)連接上的請(qǐng)求,只有等前一個(gè)請(qǐng)求處理完之后才能繼續(xù)下一個(gè)請(qǐng)求。如果前面的請(qǐng)求處理較慢, 則后面的請(qǐng)求必須等待, 這就是http1.1中的線頭阻塞。

          根據(jù)上面的邏輯, 我們GoPher在平時(shí)的開發(fā)中如果遇到了不關(guān)心響應(yīng)的請(qǐng)求, 也一定要記得把響應(yīng)body讀完以保證連接的復(fù)用性。筆者在這里給出一個(gè)demo:

          io.CopyN(ioutil.Discard, resp.Body, 2 << 10)
          resp.Body.Close()

          以上,就是筆者整理的HTTP1.1的請(qǐng)求流程。

          注意

          筆者本著嚴(yán)謹(jǐn)?shù)膽B(tài)度, 特此提醒:

          上述流程中筆者對(duì)很多細(xì)節(jié)并未詳細(xì)提及或者僅一筆帶過,希望讀者酌情參考。

          總結(jié)

          1. 在go中發(fā)起http1.1的請(qǐng)求時(shí), 如果遇到不關(guān)心響應(yīng)的請(qǐng)求,請(qǐng)務(wù)必完整讀取響應(yīng)內(nèi)容以保證連接的復(fù)用性。

          2. 如果遇到對(duì)連接數(shù)有限制的業(yè)務(wù),可以通過自定義http.Client的Transport, 并設(shè)置Transport的MaxConnsPerHostMaxIdleConnsPerHostIdleConnTimeoutDisableKeepAlives的值,來控制連接數(shù)。

          3. 如果對(duì)于重定向業(yè)務(wù)邏輯有需求,可以自定義http.Client的CheckRedirect

          4. 在http1.1,中一個(gè)連接上的請(qǐng)求,只有等前一個(gè)請(qǐng)求處理完之后才能繼續(xù)下一個(gè)請(qǐng)求。如果前面的請(qǐng)求處理較慢, 則后面的請(qǐng)求必須等待, 這就是http1.1中的線頭阻塞。

          注: 寫本文時(shí), 筆者所用go版本為: go1.14.2

          生命不息, 探索不止, 后續(xù)將持續(xù)更新有關(guān)于go的技術(shù)探索

          原創(chuàng)不易, 卑微求關(guān)注收藏二連。

          推薦閱讀:

          -------------------?End?-------------------

          歡迎大家點(diǎn)贊轉(zhuǎn)發(fā),轉(zhuǎn)載,感謝大家的相伴與支持

          想加入學(xué)習(xí)群請(qǐng)?jiān)诤笈_(tái)回復(fù)【入群

          萬水千山總是情,點(diǎn)個(gè)【在看】行不行

          瀏覽 83
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产经典操逼 | 成年人看的毛片 | 人人操在线 | 国产小说一区二区三区国产 | 淫香淫色综合 |