<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Go中的HTTP請求之——HTTP1.1請求流程分析

          共 7859字,需瀏覽 16分鐘

           ·

          2020-09-10 20:45

          前言

          http是目前應用最為廣泛, 也是程序員接觸最多的協(xié)議之一。今天筆者站在GoPher的角度對http1.1的請求流程進行全面的分析。希望讀者讀完此文后, 能夠有以下幾個收獲:

          1. 對http1.1的請求流程有一個大概的了解

          2. 在平時的開發(fā)中能夠更好地重用底層TCP連接

          3. 對http1.1的線頭阻塞能有一個更清楚的認識

          HTTP1.1流程

          今天內(nèi)容較多, 廢話不多說, 直接上干貨。

          接下來, 筆者將根據(jù)流程圖,對除了NewRequest以外的函數(shù)進行逐步的展開和分析

          (*Client).do

          (*Client).do方法的核心代碼是一個沒有結(jié)束條件的for循環(huán)。

          for {
          // For all but the first request, create the next
          // request hop and replace req.
          if len(reqs) > 0 {
          loc := resp.Header.Get("Location")
          // ...此處省略代碼...
          err = c.checkRedirect(req, reqs)
          // ...此處省略很多代碼...
          }

          reqs = append(reqs, req)
          var err error
          var didTimeout func() bool
          if resp, didTimeout, err = c.send(req, deadline); err != nil
          {
          // c.send() always closes req.Body
          reqBodyClosed = true
          // ...此處省略代碼...
          return nil, uerr(err)
          }

          var shouldRedirect bool
          redirectMethod, shouldRedirect, includeBody = redirectBehavior(req.Method, resp, reqs[0])
          if !shouldRedirect {
          return resp, nil
          }

          req.closeBody()
          }

          上面的代碼中, 請求第一次進入會調(diào)用c.send, 得到響應后會判斷請求是否需要重定向, 如果需要重定向則繼續(xù)循環(huán), 否則返回響應。

          進入重定向流程后, 這里筆者簡單介紹一下checkRedirect函數(shù):

          func defaultCheckRedirect(req *Request, via []*Request) error {
          if len(via) >= 10 {
          return errors.New("stopped after 10 redirects")
          }
          return nil
          }
          // ...
          func (c *Client) checkRedirect(req *Request, via []*Request) error {
          fn := c.CheckRedirect
          if fn == nil {
          fn = defaultCheckRedirect
          }
          return fn(req, via)
          }

          由上可知, 用戶可以自己定義重定向的檢查規(guī)則。如果用戶沒有自定義檢查規(guī)則, 則重定向次數(shù)不能超過10次。

          (*Client).send

          (*Client).send方法邏輯較為簡單, 主要看用戶有沒有為http.Client的Jar字段實現(xiàn)CookieJar接口。主要流程如下:

          1. 如果實現(xiàn)了CookieJar接口, 為Request添加保存的cookie信息。

          2. 調(diào)用send函數(shù)。

          3. 如果實現(xiàn)了CookieJar接口, 將Response中的cookie信息保存下來。

          // didTimeout is non-nil only if err != nil.
          func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
          if c.Jar != nil {
          for _, cookie := range c.Jar.Cookies(req.URL) {
          req.AddCookie(cookie)
          }
          }
          resp, didTimeout, err = send(req, c.transport(), deadline)
          if err != nil {
          return nil, didTimeout, err
          }
          if c.Jar != nil {
          if rc := resp.Cookies(); len(rc) > 0 {
          c.Jar.SetCookies(req.URL, rc)
          }
          }
          return resp, nil, nil
          }

          另外, 我們還需要關(guān)注c.transport()的調(diào)用。如果用戶未對http.Client指定Transport則會使用go默認的DefaultTransport。

          該Transport實現(xiàn)RoundTripper接口。在go中RoundTripper的定義為“執(zhí)行單個HTTP事務的能力,獲取給定請求的響應”。

          func (c *Client) transport() RoundTripper {
          if c.Transport != nil {
          return c.Transport
          }
          return DefaultTransport
          }

          send

          send函數(shù)會檢查request的URL,以及參數(shù)的rt, 和header值。如果URL和rt為nil則直接返回錯誤。同時, 如果請求中設置了用戶信息, 還會檢查并設置basic的驗證頭信息,最后調(diào)用rt.RoundTrip得到請求的響應。

          func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
          req := ireq // req is either the original request, or a modified fork
          // ...此處省略代碼...
          if u := req.URL.User; u != nil && req.Header.Get("Authorization") == "" {
          username := u.Username()
          password, _ := u.Password()
          forkReq()
          req.Header = cloneOrMakeHeader(ireq.Header)
          req.Header.Set("Authorization", "Basic "+basicAuth(username, password))
          }

          if !deadline.IsZero() {
          forkReq()
          }
          stopTimer, didTimeout := setRequestCancel(req, rt, deadline)

          resp, err = rt.RoundTrip(req)
          if err != nil {
          // ...此處省略代碼...
          return nil, didTimeout, err
          }
          // ...此處省略代碼...
          return resp, nil, nil
          }

          (*Transport).RoundTrip

          (*Transport).RoundTrip的邏輯很簡單,它會調(diào)用(*Transport).roundTrip方法,因此本節(jié)實際上是對(*Transport).roundTrip方法的分析。

          func (t *Transport) RoundTrip(req *Request) (*Response, error) {
          return t.roundTrip(req)
          }
          func (t *Transport) roundTrip(req *Request) (*Response, error) {
          // ...此處省略校驗header頭和headervalue的代碼以及其他代碼...

          for {
          select {
          case <-ctx.Done():
          req.closeBody()
          return nil, ctx.Err()
          default:
          }

          // treq gets modified by roundTrip, so we need to recreate for each retry.
          treq := &transportRequest{Request: req, trace: trace}
          cm, err := t.connectMethodForRequest(treq)
          // ...此處省略代碼...
          pconn, err := t.getConn(treq, cm)
          if err != nil {
          t.setReqCanceler(req, nil)
          req.closeBody()
          return nil, err
          }

          var resp *Response
          if pconn.alt != nil {
          // HTTP/2 path.
          t.setReqCanceler(req, nil) // not cancelable with CancelRequest
          resp, err = pconn.alt.RoundTrip(req)
          } else {
          resp, err = pconn.roundTrip(treq)
          }
          if err == nil {
          return resp, nil
          }

          // ...此處省略判斷是否重試請求的代碼邏輯...
          }
          }

          由上可知, 每次for循環(huán), 會判斷請求上下文是否已經(jīng)取消, 如果沒有取消則繼續(xù)進行后續(xù)的流程。

          1. 先調(diào)用t.getConn方法獲取一個persistConn。

          2. 因為本篇主旨是http1.1,所以我們直接看http1.1的執(zhí)行分支。根據(jù)源碼中的注釋和實際的debug結(jié)果,獲取到連接后, 會繼續(xù)調(diào)用pconn.roundTrip。

          (*Transport).getConn

          筆者認為這一步在http請求中是非常核心的一個步驟,因為只有和server端建立連接后才能進行后續(xù)的通信。

          func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
          req := treq.Request
          trace := treq.trace
          ctx := req.Context()
          // ...此處省略代碼...
          w := &wantConn{
          cm: cm,
          key: cm.key(),
          ctx: ctx,
          ready: make(chan struct{}, 1),
          beforeDial: testHookPrePendingDial,
          afterDial: testHookPostPendingDial,
          }
          // ...此處省略代碼...
          // Queue for idle connection.
          if delivered := t.queueForIdleConn(w); delivered {
          pc := w.pc
          // ...此處省略代碼...
          return pc, nil
          }

          cancelc := make(chan error, 1)
          t.setReqCanceler(req, func(err error) { cancelc <- err })

          // Queue for permission to dial.
          t.queueForDial(w)

          // Wait for completion or cancellation.
          select {
          case <-w.ready:
          // Trace success but only for HTTP/1.
          // HTTP/2 calls trace.GotConn itself.
          if w.pc != nil && w.pc.alt == nil && trace != nil && trace.GotConn != nil {
          trace.GotConn(httptrace.GotConnInfo{Conn: w.pc.conn, Reused: w.pc.isReused()})
          }
          // ...此處省略代碼...
          return w.pc, w.err
          case <-req.Cancel:
          return nil, errRequestCanceledConn
          case <-req.Context().Done():
          return nil, req.Context().Err()
          case err := <-cancelc:
          if err == errRequestCanceled {
          err = errRequestCanceledConn
          }
          return nil, err
          }
          }

          由上能夠清楚的知道, 獲取連接分為以下幾個步驟:

          1. 調(diào)用t.queueForIdleConn獲取一個空閑且可復用的連接,如果獲取成功則直接返回該連接。

          2. 如果未獲取到空閑連接則調(diào)用t.queueForDial開始新建一個連接。

          3. 等待w.ready關(guān)閉,則可以返回新的連接。

          (*Transport).queueForIdleConn

          (*Transport).queueForIdleConn方法會根據(jù)請求的connectMethodKey從t.idleConn獲取一個[]*persistConn切片, 并從切片中,根據(jù)算法獲取一個有效的空閑連接。如果未獲取到空閑連接,則將wantConn結(jié)構(gòu)體變量放入t.idleConnWait[w.key]等待隊列,此處wantConn結(jié)構(gòu)體變量就是前面提到的w。

          connectMethodKey定義和queueForIdleConn部分關(guān)鍵代碼如下:

          type connectMethodKey struct {
          proxy, scheme, addr string
          onlyH1 bool
          }

          func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
          // ...此處省略代碼...
          // Look for most recently-used idle connection.
          if list, ok := t.idleConn[w.key]; ok {
          stop := false
          delivered := false
          for len(list) > 0 && !stop {
          pconn := list[len(list)-1]

          // See whether this connection has been idle too long, considering
          // only the wall time (the Round(0)), in case this is a laptop or VM
          // coming out of suspend with previously cached idle connections.
          tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
          // ...此處省略代碼...
          delivered = w.tryDeliver(pconn, nil)
          if delivered {
          // ...此處省略代碼...
          }
          stop = true
          }
          if len(list) > 0 {
          t.idleConn[w.key] = list
          } else {
          delete(t.idleConn, w.key)
          }
          if stop {
          return delivered
          }
          }

          // Register to receive next connection that becomes idle.
          if t.idleConnWait == nil {
          t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
          }
          q := t.idleConnWait[w.key]
          q.cleanFront()
          q.pushBack(w)
          t.idleConnWait[w.key] = q
          return false
          }

          其中w.tryDeliver方法主要作用是將連接協(xié)程安全的賦值給w.pc,并關(guān)閉w.ready管道。此時我們便可以和(*Transport).getConn中調(diào)用queueForIdleConn成功后的返回值對應上。

          (*Transport).queueForDial

          (*Transport).queueForDial方法包含三個步驟:

          1. 如果t.MaxConnsPerHost小于等于0,執(zhí)行go t.dialConnFor(w)并返回。其中MaxConnsPerHost代表著每個host的最大連接數(shù),小于等于0表示不限制。

          2. 如果當前host的連接數(shù)不超過t.MaxConnsPerHost,對當前host的連接數(shù)+1,然后執(zhí)行go t.dialConnFor(w)并返回。

          3. 如果當前host的連接數(shù)等于t.MaxConnsPerHost,則將wantConn結(jié)構(gòu)體變量放入t.connsPerHostWait[w.key]等待隊列,此處wantConn結(jié)構(gòu)體變量就是前面提到的w。另外在放入等待隊列前會先清除隊列中已經(jīng)失效或者不再等待的變量。

          func (t *Transport) queueForDial(w *wantConn) {
          w.beforeDial()
          if t.MaxConnsPerHost <= 0 {
          go t.dialConnFor(w)
          return
          }

          t.connsPerHostMu.Lock()
          defer t.connsPerHostMu.Unlock()

          if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
          if t.connsPerHost == nil {
          t.connsPerHost = make(map[connectMethodKey]int)
          }
          t.connsPerHost[w.key] = n + 1
          go t.dialConnFor(w)
          return
          }

          if t.connsPerHostWait == nil {
          t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
          }
          q := t.connsPerHostWait[w.key]
          q.cleanFront()
          q.pushBack(w)
          t.connsPerHostWait[w.key] = q
          }
          (*Transport).dialConnFor

          (*Transport).dialConnFor方法調(diào)用t.dialConn獲取一個真正的*persistConn。并將這個連接傳遞給w, 如果w已經(jīng)獲取到了連接,則會傳遞失敗,此時調(diào)用t.putOrCloseIdleConn將連接放回空閑連接池。

          如果連接獲取錯誤則會調(diào)用t.decConnsPerHost減少當前host的連接數(shù)。

          func (t *Transport) dialConnFor(w *wantConn) {
          defer w.afterDial()

          pc, err := t.dialConn(w.ctx, w.cm)
          delivered := w.tryDeliver(pc, err)
          if err == nil && (!delivered || pc.alt != nil) {
          // pconn was not passed to w,
          // or it is HTTP/2 and can be shared.
          // Add to the idle connection pool.
          t.putOrCloseIdleConn(pc)
          }
          if err != nil {
          t.decConnsPerHost(w.key)
          }
          }
          • (*Transport).putOrCloseIdleConn方法

          func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
          if err := t.tryPutIdleConn(pconn); err != nil {
          pconn.close(err)
          }
          }
          func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
          if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
          return errKeepAlivesDisabled
          }
          // ...此處省略代碼...
          t.idleMu.Lock()
          defer t.idleMu.Unlock()
          // ...此處省略代碼...

          // Deliver pconn to goroutine waiting for idle connection, if any.
          // (They may be actively dialing, but this conn is ready first.
          // Chrome calls this socket late binding.
          // See https://insouciant.org/tech/connection-management-in-chromium/.)
          key := pconn.cacheKey
          if q, ok := t.idleConnWait[key]; ok {
          done := false
          if pconn.alt == nil {
          // HTTP/1.
          // Loop over the waiting list until we find a w that isn't done already, and hand it pconn.
          for q.len() > 0 {
          w := q.popFront()
          if w.tryDeliver(pconn, nil) {
          done = true
          break
          }
          }
          } else {
          // HTTP/2.
          // Can hand the same pconn to everyone in the waiting list,
          // and we still won't be done: we want to put it in the idle
          // list unconditionally, for any future clients too.
          for q.len() > 0 {
          w := q.popFront()
          w.tryDeliver(pconn, nil)
          }
          }
          if q.len() == 0 {
          delete(t.idleConnWait, key)
          } else {
          t.idleConnWait[key] = q
          }
          if done {
          return nil
          }
          }

          if t.closeIdle {
          return errCloseIdle
          }
          if t.idleConn == nil {
          t.idleConn = make(map[connectMethodKey][]*persistConn)
          }
          idles := t.idleConn[key]
          if len(idles) >= t.maxIdleConnsPerHost() {
          return errTooManyIdleHost
          }
          // ...此處省略代碼...
          t.idleConn[key] = append(idles, pconn)
          t.idleLRU.add(pconn)
          // ...此處省略代碼...
          // Set idle timer, but only for HTTP/1 (pconn.alt == nil).
          // The HTTP/2 implementation manages the idle timer itself
          // (see idleConnTimeout in h2_bundle.go).
          if t.IdleConnTimeout > 0 && pconn.alt == nil {
          if pconn.idleTimer != nil {
          pconn.idleTimer.Reset(t.IdleConnTimeout)
          } else {
          pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
          }
          }
          pconn.idleAt = time.Now()
          return nil
          }
          func (t *Transport) maxIdleConnsPerHost() int {
          if v := t.MaxIdleConnsPerHost; v != 0 {
          return v
          }
          return DefaultMaxIdleConnsPerHost // 2
          }

          由上可知,將連接放入t.idleConn前,先檢查t.idleConnWait的數(shù)量。如果有請求在等待空閑連接, 則將連接復用,沒有空閑連接時,才將連接放入t.idleConn。連接放入t.idleConn后,還會重置連接的可空閑時間。

          另外在t.putOrCloseIdleConn函數(shù)中還需要注意兩點:

          1. 如果用戶自定義了http.client,且將DisableKeepAlives設置為true,或者將MaxIdleConnsPerHost設置為負數(shù),則連接不會放入t.idleConn即連接不能復用。

          2. 在判斷已有空閑連接數(shù)量時, 如果MaxIdleConnsPerHost 不等于0, 則返回用戶設置的數(shù)量,否則返回默認值2,詳見上面的(*Transport).maxIdleConnsPerHost?函數(shù)。

          綜上, 我們知道對于部分有連接數(shù)限制的業(yè)務, 我們可以為http.Client自定義一個Transport, 并設置Transport的MaxConnsPerHost,MaxIdleConnsPerHost,IdleConnTimeoutDisableKeepAlives從而達到即限制連接數(shù)量,又能保證一定的并發(fā)。

          • (*Transport).decConnsPerHost方法

          func (t *Transport) decConnsPerHost(key connectMethodKey) {
          // ...此處省略代碼...
          t.connsPerHostMu.Lock()
          defer t.connsPerHostMu.Unlock()
          n := t.connsPerHost[key]
          // ...此處省略代碼...

          // Can we hand this count to a goroutine still waiting to dial?
          // (Some goroutines on the wait list may have timed out or
          // gotten a connection another way. If they're all gone,
          // we don't want to kick off any spurious dial operations.)
          if q := t.connsPerHostWait[key]; q.len() > 0 {
          done := false
          for q.len() > 0 {
          w := q.popFront()
          if w.waiting() {
          go t.dialConnFor(w)
          done = true
          break
          }
          }
          if q.len() == 0 {
          delete(t.connsPerHostWait, key)
          } else {
          // q is a value (like a slice), so we have to store
          // the updated q back into the map.
          t.connsPerHostWait[key] = q
          }
          if done {
          return
          }
          }

          // Otherwise, decrement the recorded count.
          if n--; n == 0 {
          delete(t.connsPerHost, key)
          } else {
          t.connsPerHost[key] = n
          }
          }

          由上可知, decConnsPerHost方法主要干了兩件事:

          1. 判斷是否有請求在等待撥號, 如果有則執(zhí)行go t.dialConnFor(w)。

          2. 如果沒有請求在等待撥號, 則減少當前host的連接數(shù)量。

          (*Transport).dialConn

          根據(jù)http.Client的默認配置和實際的debug結(jié)果,(*Transport).dialConn方法主要邏輯如下:

          1. 調(diào)用t.dial(ctx, "tcp", cm.addr())創(chuàng)建TCP連接。

          2. 如果是https的請求, 則對請求建立安全的tls傳輸通道。

          3. 為persistConn創(chuàng)建讀寫buffer, 如果用戶沒有自定義讀寫buffer的大小, 根據(jù)writeBufferSize和readBufferSize方法可知, 讀寫bufffer的大小默認為4096。

          4. 執(zhí)行go pconn.readLoop()go pconn.writeLoop()開啟讀寫循環(huán)然后返回連接。

          func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
          pconn = &persistConn{
          t: t,
          cacheKey: cm.key(),
          reqch: make(chan requestAndChan, 1),
          writech: make(chan writeRequest, 1),
          closech: make(chan struct{}),
          writeErrCh: make(chan error, 1),
          writeLoopDone: make(chan struct{}),
          }
          // ...此處省略代碼...
          if cm.scheme() == "https" && t.hasCustomTLSDialer() {
          // ...此處省略代碼...
          } else {
          conn, err := t.dial(ctx, "tcp", cm.addr())
          if err != nil {
          return nil, wrapErr(err)
          }
          pconn.conn = conn
          if cm.scheme() == "https" {
          var firstTLSHost string
          if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
          return nil, wrapErr(err)
          }
          if err = pconn.addTLS(firstTLSHost, trace); err != nil {
          return nil, wrapErr(err)
          }
          }
          }

          // Proxy setup.
          switch { // ...此處省略代碼... }

          if cm.proxyURL != nil && cm.targetScheme == "https" {
          // ...此處省略代碼...
          }

          if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
          // ...此處省略代碼...
          }

          pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
          pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())

          go pconn.readLoop()
          go pconn.writeLoop()
          return pconn, nil
          }
          func (t *Transport) writeBufferSize() int {
          if t.WriteBufferSize > 0 {
          return t.WriteBufferSize
          }
          return 4 << 10
          }

          func (t *Transport) readBufferSize() int {
          if t.ReadBufferSize > 0 {
          return t.ReadBufferSize
          }
          return 4 << 10
          }

          (*persistConn).roundTrip

          (*persistConn).roundTrip方法是http1.1請求的核心之一,該方法在這里獲取真實的Response并返回給上層。

          func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
          // ...此處省略代碼...

          gone := make(chan struct{})
          defer close(gone)
          // ...此處省略代碼...
          const debugRoundTrip = false

          // Write the request concurrently with waiting for a response,
          // in case the server decides to reply before reading our full
          // request body.
          startBytesWritten := pc.nwrite
          writeErrCh := make(chan error, 1)
          pc.writech <- writeRequest{req, writeErrCh, continueCh}

          resc := make(chan responseAndError)
          pc.reqch <- requestAndChan{
          req: req.Request,
          ch: resc,
          addedGzip: requestedGzip,
          continueCh: continueCh,
          callerGone: gone,
          }

          var respHeaderTimer <-chan time.Time
          cancelChan := req.Request.Cancel
          ctxDoneChan := req.Context().Done()
          for {
          testHookWaitResLoop()
          select {
          case err := <-writeErrCh:
          // ...此處省略代碼...
          if err != nil {
          pc.close(fmt.Errorf("write error: %v", err))
          return nil, pc.mapRoundTripError(req, startBytesWritten, err)
          }
          // ...此處省略代碼...
          case <-pc.closech:
          // ...此處省略代碼...
          return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
          case <-respHeaderTimer:
          // ...此處省略代碼...
          return nil, errTimeout
          case re := <-resc:
          if (re.res == nil) == (re.err == nil) {
          panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
          }
          if debugRoundTrip {
          req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
          }
          if re.err != nil {
          return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
          }
          return re.res, nil
          case <-cancelChan:
          pc.t.CancelRequest(req.Request)
          cancelChan = nil
          case <-ctxDoneChan:
          pc.t.cancelRequest(req.Request, req.Context().Err())
          cancelChan = nil
          ctxDoneChan = nil
          }
          }
          }

          由上可知, (*persistConn).roundTrip方法可以分為三步:

          1. 向連接的writech寫入writeRequest:?pc.writech <- writeRequest{req, writeErrCh, continueCh}, 參考(*Transport).dialConn可知pc.writech是一個緩沖大小為1的管道,所以會立馬寫入成功。

          2. 向連接的reqch寫入requestAndChan:?pc.reqch <- requestAndChan, pc.reqch和pc.writech一樣都是緩沖大小為1的管道。其中requestAndChan.ch是一個無緩沖的responseAndError管道,(*persistConn).roundTrip就通過這個管道讀取到真實的響應。

          3. 開啟for循環(huán)select, 等待響應或者超時等信息。

          • (*persistConn).writeLoop 寫循環(huán)

          (*persistConn).writeLoop方法主體邏輯相對簡單,把用戶的請求寫入連接的寫緩存buffer, 最后再flush就可以了。

          func (pc *persistConn) writeLoop() {
          defer close(pc.writeLoopDone)
          for {
          select {
          case wr := <-pc.writech:
          startBytesWritten := pc.nwrite
          err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
          if bre, ok := err.(requestBodyReadError); ok {
          err = bre.error
          wr.req.setError(err)
          }
          if err == nil {
          err = pc.bw.Flush()
          }
          if err != nil {
          wr.req.Request.closeBody()
          if pc.nwrite == startBytesWritten {
          err = nothingWrittenError{err}
          }
          }
          pc.writeErrCh <- err // to the body reader, which might recycle us
          wr.ch <- err // to the roundTrip function
          if err != nil {
          pc.close(err)
          return
          }
          case <-pc.closech:
          return
          }
          }
          }
          • (*persistConn).readLoop 讀循環(huán)

          (*persistConn).readLoop有較多的細節(jié), 我們先看代碼, 然后再逐步分析。

          func (pc *persistConn) readLoop() {
          closeErr := errReadLoopExiting // default value, if not changed below
          defer func() {
          pc.close(closeErr)
          pc.t.removeIdleConn(pc)
          }()

          tryPutIdleConn := func(trace *httptrace.ClientTrace) bool {
          if err := pc.t.tryPutIdleConn(pc); err != nil {
          // ...此處省略代碼...
          }
          // ...此處省略代碼...
          return true
          }
          // ...此處省略代碼...
          alive := true
          for alive {
          // ...此處省略代碼...
          rc := <-pc.reqch
          trace := httptrace.ContextClientTrace(rc.req.Context())

          var resp *Response
          if err == nil {
          resp, err = pc.readResponse(rc, trace)
          } else {
          err = transportReadFromServerError{err}
          closeErr = err
          }

          // ...此處省略代碼...
          bodyWritable := resp.bodyIsWritable()
          hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0

          if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable {
          // Don't do keep-alive on error if either party requested a close
          // or we get an unexpected informational (1xx) response.
          // StatusCode 100 is already handled above.
          alive = false
          }

          if !hasBody || bodyWritable {
          // ...此處省略代碼...
          continue
          }

          waitForBodyRead := make(chan bool, 2)
          body := &bodyEOFSignal{
          body: resp.Body,
          earlyCloseFn: func() error {
          waitForBodyRead <- false
          <-eofc // will be closed by deferred call at the end of the function
          return nil

          },
          fn: func(err error) error {
          isEOF := err == io.EOF
          waitForBodyRead <- isEOF
          if isEOF {
          <-eofc // see comment above eofc declaration
          } else if err != nil {
          if cerr := pc.canceled(); cerr != nil {
          return cerr
          }
          }
          return err
          },
          }

          resp.Body = body
          // ...此處省略代碼...

          select {
          case rc.ch <- responseAndError{res: resp}:
          case <-rc.callerGone:
          return
          }

          // Before looping back to the top of this function and peeking on
          // the bufio.Reader, wait for the caller goroutine to finish
          // reading the response body. (or for cancellation or death)
          select {
          case bodyEOF := <-waitForBodyRead:
          pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool
          alive = alive &&
          bodyEOF &&
          !pc.sawEOF &&
          pc.wroteRequest() &&
          tryPutIdleConn(trace)
          if bodyEOF {
          eofc <- struct{}{}
          }
          case <-rc.req.Cancel:
          alive = false
          pc.t.CancelRequest(rc.req)
          case <-rc.req.Context().Done():
          alive = false
          pc.t.cancelRequest(rc.req, rc.req.Context().Err())
          case <-pc.closech:
          alive = false
          }

          testHookReadLoopBeforeNextRead()
          }
          }

          由上可知, 只要連接處于活躍狀態(tài), 則這個讀循環(huán)會一直開啟, 直到 連接不活躍或者產(chǎn)生其他錯誤才會結(jié)束讀循環(huán)。

          在上述源碼中,pc.readResponse(rc,trace)會從連接的讀buffer中獲取一個請求對應的Response。

          讀到響應之后判斷請求是否是HEAD請求或者響應內(nèi)容為空,如果是HEAD請求或者響應內(nèi)容為空則將響應寫入rc.ch,并將連接放入idleConn(此處因為篇幅的原因省略了源碼內(nèi)容, 正常請求的邏輯也有寫響應和將連接放入idleConn兩個步驟)。

          如果不是HEAD請求并且響應內(nèi)容不為空即!hasBody || bodyWritable為false:

          1. 創(chuàng)建一個緩沖大小為2的等待響應被讀取的管道waitForBodyRead:?waitForBodyRead := make(chan bool, 2)

          2. 將響應的Body修改為bodyEOFSignal結(jié)構(gòu)體。通過上面的源碼我們可以知道,此時的resp.Body中有earlyCloseFnfn兩個函數(shù)。earlyCloseFn函數(shù)會向waitForBodyRead管道寫入false, fn函數(shù)會判斷響應是否讀完, 如果已經(jīng)讀完則向waitForBodyRead寫入true否則寫入false。

          3. 將修改后的響應寫入rc.ch。其中rc.chrc := <-pc.reqch獲取,而pc.reqch正是前面(*persistConn).roundTrip函數(shù)寫入的requestAndChanrequestAndChan.ch是一個無緩沖的responseAndError管道,(*persistConn).roundTrip通過這個管道讀取到真實的響應。

          4. select 讀取 waitForBodyRead被寫入的值。如果讀到到的是true則可以調(diào)用tryPutIdleConn(此方法會調(diào)用前面提到的(*Transport).tryPutIdleConn方法)將連接放入idleConn從而復用連接。

          waitForBodyRead寫入true的原因我們已經(jīng)知道了,但是被寫入true的時機我們尚不明確。

          func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
          // ...此處省略代碼...
          n, err = es.body.Read(p)
          if err != nil {
          es.mu.Lock()
          defer es.mu.Unlock()
          if es.rerr == nil {
          es.rerr = err
          }
          err = es.condfn(err)
          }
          return
          }

          func (es *bodyEOFSignal) Close() error {
          es.mu.Lock()
          defer es.mu.Unlock()
          if es.closed {
          return nil
          }
          es.closed = true
          if es.earlyCloseFn != nil && es.rerr != io.EOF {
          return es.earlyCloseFn()
          }
          err := es.body.Close()
          return es.condfn(err)
          }

          // caller must hold es.mu.
          func (es *bodyEOFSignal) condfn(err error) error {
          if es.fn == nil {
          return err
          }
          err = es.fn(err)
          es.fn = nil
          return err
          }

          由上述源碼可知, 只有當調(diào)用方完整的讀取了響應,該連接才能夠被復用。因此在http1.1中,一個連接上的請求,只有等前一個請求處理完之后才能繼續(xù)下一個請求。如果前面的請求處理較慢, 則后面的請求必須等待, 這就是http1.1中的線頭阻塞。

          根據(jù)上面的邏輯, 我們GoPher在平時的開發(fā)中如果遇到了不關(guān)心響應的請求, 也一定要記得把響應body讀完以保證連接的復用性。筆者在這里給出一個demo:

          io.CopyN(ioutil.Discard, resp.Body, 2 << 10)
          resp.Body.Close()

          以上,就是筆者整理的HTTP1.1的請求流程。

          注意

          筆者本著嚴謹?shù)膽B(tài)度, 特此提醒:

          上述流程中筆者對很多細節(jié)并未詳細提及或者僅一筆帶過,希望讀者酌情參考。

          總結(jié)

          1. 在go中發(fā)起http1.1的請求時, 如果遇到不關(guān)心響應的請求,請務必完整讀取響應內(nèi)容以保證連接的復用性。

          2. 如果遇到對連接數(shù)有限制的業(yè)務,可以通過自定義http.Client的Transport, 并設置Transport的MaxConnsPerHost,MaxIdleConnsPerHost,IdleConnTimeoutDisableKeepAlives的值,來控制連接數(shù)。

          3. 如果對于重定向業(yè)務邏輯有需求,可以自定義http.Client的CheckRedirect。

          4. 在http1.1,中一個連接上的請求,只有等前一個請求處理完之后才能繼續(xù)下一個請求。如果前面的請求處理較慢, 則后面的請求必須等待, 這就是http1.1中的線頭阻塞。

          注: 寫本文時, 筆者所用go版本為: go1.14.2




          推薦閱讀



          學習交流 Go 語言,掃碼回復「進群」即可


          站長 polarisxu

          自己的原創(chuàng)文章

          不限于 Go 技術(shù)

          職場和創(chuàng)業(yè)經(jīng)驗


          Go語言中文網(wǎng)

          每天為你

          分享 Go 知識

          Go愛好者值得關(guān)注

          瀏覽 61
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  四虎成人精品永久免费AV九九 | 久久久久久三级片 | 黄色一级片免费看 | 推荐中文字幕无码专区 | 香蕉大香蕉久 |