<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Go發(fā)起HTTP2.0請求流程分析(中篇)——數(shù)據(jù)幀&流控制

          共 11951字,需瀏覽 24分鐘

           ·

          2020-09-19 08:20

          閱讀建議

          這是HTTP2.0系列的第二篇,所以筆者推薦閱讀順序如下:

          1. Go中的HTTP請求之——HTTP1.1請求流程分析

          2. Go發(fā)起HTTP2.0請求流程分析(前篇)

          本篇主要分為三個部分:數(shù)據(jù)幀,流控制器以及通過分析源碼逐步了解流控制。

          本有意將這三個部分拆成三篇文章,但它們之間又有聯(lián)系,所以最后依舊決定放在一篇文章里面。由于內(nèi)容較多,筆者認(rèn)為分三次分別閱讀三個部分較佳。

          數(shù)據(jù)幀

          HTTP2通信的最小單位是數(shù)據(jù)幀,每一個幀都包含兩部分:幀頭Payload。不同數(shù)據(jù)流的幀可以交錯發(fā)送(同一個數(shù)據(jù)流的幀必須順序發(fā)送),然后再根據(jù)每個幀頭的數(shù)據(jù)流標(biāo)識符重新組裝。

          由于Payload中為有效數(shù)據(jù),故僅對幀頭進行分析描述。

          幀頭

          幀頭總長度為9個字節(jié),并包含四個部分,分別是:

          1. Payload的長度,占用三個字節(jié)。

          2. 數(shù)據(jù)幀類型,占用一個字節(jié)。

          3. 數(shù)據(jù)幀標(biāo)識符,占用一個字節(jié)。

          4. 數(shù)據(jù)流ID,占用四個字節(jié)。

          用圖表示如下:

          數(shù)據(jù)幀的格式和各部分的含義已經(jīng)清楚了, 那么我們看看代碼中怎么讀取一個幀頭:

          func http2readFrameHeader(buf []byte, r io.Reader) (http2FrameHeader, error) {
          _, err := io.ReadFull(r, buf[:http2frameHeaderLen])
          if err != nil {
          return http2FrameHeader{}, err
          }
          return http2FrameHeader{
          Length: (uint32(buf[0])<<16 | uint32(buf[1])<<8 | uint32(buf[2])),
          Type: http2FrameType(buf[3]),
          Flags: http2Flags(buf[4]),
          StreamID: binary.BigEndian.Uint32(buf[5:]) & (1<<31 - 1),
          valid: true,
          }, nil
          }

          在上面的代碼中http2frameHeaderLen是一個常量,其值為9。

          從io.Reader中讀取9個字節(jié)后,將前三個字節(jié)和后四個字節(jié)均轉(zhuǎn)為uint32的類型,從而得到Payload長度和數(shù)據(jù)流ID。另外需要理解的是幀頭的前三個字節(jié)和后四個字節(jié)存儲格式為大端(大小端筆者就不在這里解釋了,請尚不了解的讀者自行百度)。

          數(shù)據(jù)幀類型

          根據(jù)http://http2.github.io/http2-spec/#rfc.section.11.2描述,數(shù)據(jù)幀類型總共有10個。在go源碼中均有體現(xiàn):

          const (
          http2FrameData http2FrameType = 0x0
          http2FrameHeaders http2FrameType = 0x1
          http2FramePriority http2FrameType = 0x2
          http2FrameRSTStream http2FrameType = 0x3
          http2FrameSettings http2FrameType = 0x4
          http2FramePushPromise http2FrameType = 0x5
          http2FramePing http2FrameType = 0x6
          http2FrameGoAway http2FrameType = 0x7
          http2FrameWindowUpdate http2FrameType = 0x8
          http2FrameContinuation http2FrameType = 0x9
          )

          http2FrameData:主要用于發(fā)送請求body和接收響應(yīng)的數(shù)據(jù)幀。

          http2FrameHeaders:主要用于發(fā)送請求header和接收響應(yīng)header的數(shù)據(jù)幀。

          http2FrameSettings:主要用于client和server交流設(shè)置相關(guān)的數(shù)據(jù)幀。

          http2FrameWindowUpdate:主要用于流控制的數(shù)據(jù)幀。

          其他數(shù)據(jù)幀類型因為本文不涉及,故不做描述。

          數(shù)據(jù)幀標(biāo)識符

          由于數(shù)據(jù)幀標(biāo)識符種類較多,筆者在這里僅介紹其中部分標(biāo)識符,先看源碼:

          const (
          // Data Frame
          http2FlagDataEndStream http2Flags = 0x1

          // Headers Frame
          http2FlagHeadersEndStream http2Flags = 0x1

          // Settings Frame
          http2FlagSettingsAck http2Flags = 0x1
          // 此處省略定義其他數(shù)據(jù)幀標(biāo)識符的代碼
          )

          http2FlagDataEndStream:在前篇中提到,調(diào)用(*http2ClientConn).newStream方法會創(chuàng)建一個數(shù)據(jù)流,那這個數(shù)據(jù)流什么時候結(jié)束呢,這就是http2FlagDataEndStream的作用。

          當(dāng)client收到有響應(yīng)body的響應(yīng)時(HEAD請求無響應(yīng)body,301,302等響應(yīng)也無響應(yīng)body),一直讀到http2FrameData數(shù)據(jù)幀的標(biāo)識符為http2FlagDataEndStream則意味著本次請求結(jié)束可以關(guān)閉當(dāng)前數(shù)據(jù)流。

          http2FlagHeadersEndStream:如果讀到的http2FrameHeaders數(shù)據(jù)幀有此標(biāo)識符也意味著本次請求結(jié)束。

          http2FlagSettingsAck:該標(biāo)示符意味著對方確認(rèn)收到http2FrameSettings數(shù)據(jù)幀。

          流控制器

          流控制是一種阻止發(fā)送方向接收方發(fā)送大量數(shù)據(jù)的機制,以免超出后者的需求或處理能力。Go中HTTP2通過http2flow結(jié)構(gòu)體進行流控制:

          type http2flow struct {
          // n is the number of DATA bytes we're allowed to send.
          // A flow is kept both on a conn and a per-stream.
          n int32

          // conn points to the shared connection-level flow that is
          // shared by all streams on that conn. It is nil for the flow
          // that's on the conn directly.
          conn *http2flow
          }

          字段含義英文注釋已經(jīng)描述的很清楚了,所以筆者不再翻譯。下面看一下和流控制有關(guān)的方法。

          (*http2flow).available

          此方法返回當(dāng)前流控制可發(fā)送的最大字節(jié)數(shù):

          func (f *http2flow) available() int32 {
          n := f.n
          if f.conn != nil && f.conn.n < n {
          n = f.conn.n
          }
          return n
          }
          • 如果f.conn為nil則意味著此控制器的控制級別為連接,那么可發(fā)送的最大字節(jié)數(shù)就是f.n

          • 如果f.conn不為nil則意味著此控制器的控制級別為數(shù)據(jù)流,且當(dāng)前數(shù)據(jù)流可發(fā)送的最大字節(jié)數(shù)不能超過當(dāng)前連接可發(fā)送的最大字節(jié)數(shù)。

          (*http2flow).take

          此方法用于消耗當(dāng)前流控制器的可發(fā)送字節(jié)數(shù):

          func (f *http2flow) take(n int32) {
          if n > f.available() {
          panic("internal error: took too much")
          }
          f.n -= n
          if f.conn != nil {
          f.conn.n -= n
          }
          }

          通過實際需要傳遞一個參數(shù),告知當(dāng)前流控制器想要發(fā)送的數(shù)據(jù)大小。如果發(fā)送的大小超過流控制器允許的大小,則panic,如果未超過流控制器允許的大小,則將當(dāng)前數(shù)據(jù)流和當(dāng)前連接的可發(fā)送字節(jié)數(shù)-n。

          (*http2flow).add

          有消耗就有新增,此方法用于增加流控制器可發(fā)送的最大字節(jié)數(shù):

          func (f *http2flow) add(n int32) bool {
          sum := f.n + n
          if (sum > n) == (f.n > 0) {
          f.n = sum
          return true
          }
          return false
          }

          上面的代碼唯一需要注意的地方是,當(dāng)sum超過int32正數(shù)最大值(2^31-1)時會返回false。

          回顧:在前篇中提到的(*http2Transport).NewClientConn方法和(*http2ClientConn).newStream方法均通過(*http2flow).add初始化可發(fā)送數(shù)據(jù)窗口大小。

          有了幀和流控制器的基本概念,下面我們結(jié)合源碼來分析總結(jié)流控制的具體實現(xiàn)。

          (*http2ClientConn).readLoop

          前篇分析(*http2Transport).newClientConn時止步于讀循環(huán),那么今天我們就從(*http2ClientConn).readLoop開始。

          func (cc *http2ClientConn) readLoop() {
          rl := &http2clientConnReadLoop{cc: cc}
          defer rl.cleanup()
          cc.readerErr = rl.run()
          if ce, ok := cc.readerErr.(http2ConnectionError); ok {
          cc.wmu.Lock()
          cc.fr.WriteGoAway(0, http2ErrCode(ce), nil)
          cc.wmu.Unlock()
          }
          }

          由上可知,readLoop的邏輯比較簡單,其核心邏輯在(*http2clientConnReadLoop).run方法里。

          func (rl *http2clientConnReadLoop) run() error {
          cc := rl.cc
          rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse
          gotReply := false // ever saw a HEADERS reply
          gotSettings := false
          for {
          f, err := cc.fr.ReadFrame()
          // 此處省略代碼
          maybeIdle := false // whether frame might transition us to idle

          switch f := f.(type) {
          case *http2MetaHeadersFrame:
          err = rl.processHeaders(f)
          maybeIdle = true
          gotReply = true
          case *http2DataFrame:
          err = rl.processData(f)
          maybeIdle = true
          case *http2GoAwayFrame:
          err = rl.processGoAway(f)
          maybeIdle = true
          case *http2RSTStreamFrame:
          err = rl.processResetStream(f)
          maybeIdle = true
          case *http2SettingsFrame:
          err = rl.processSettings(f)
          case *http2PushPromiseFrame:
          err = rl.processPushPromise(f)
          case *http2WindowUpdateFrame:
          err = rl.processWindowUpdate(f)
          case *http2PingFrame:
          err = rl.processPing(f)
          default:
          cc.logf("Transport: unhandled response frame type %T", f)
          }
          if err != nil {
          if http2VerboseLogs {
          cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, http2summarizeFrame(f), err)
          }
          return err
          }
          if rl.closeWhenIdle && gotReply && maybeIdle {
          cc.closeIfIdle()
          }
          }
          }

          由上可知,(*http2clientConnReadLoop).run的核心邏輯是讀取數(shù)據(jù)幀然后對不同的數(shù)據(jù)幀進行不同的處理。

          cc.fr.ReadFrame()會根據(jù)前面介紹的數(shù)據(jù)幀格式讀出數(shù)據(jù)幀。

          前篇中提到使用了一個支持h2協(xié)議的圖片進行分析,本篇繼續(xù)復(fù)用該圖片對(*http2clientConnReadLoop).run方法進行debug。

          收到http2FrameSettings數(shù)據(jù)幀

          讀循環(huán)會最先讀到http2FrameSettings數(shù)據(jù)幀。讀到該數(shù)據(jù)幀后會調(diào)用(*http2clientConnReadLoop).processSettings方法。(*http2clientConnReadLoop).processSettings主要包含3個邏輯。

          1、判斷是否是http2FrameSettings的ack信息,如果是直接返回,否則繼續(xù)后面的步驟。

          if f.IsAck() {
          if cc.wantSettingsAck {
          cc.wantSettingsAck = false
          return nil
          }
          return http2ConnectionError(http2ErrCodeProtocol)
          }

          2、處理不同http2FrameSettings的數(shù)據(jù)幀,并根據(jù)server傳遞的信息,修改maxConcurrentStreams等的值。

          err := f.ForeachSetting(func(s http2Setting) error {
          switch s.ID {
          case http2SettingMaxFrameSize:
          cc.maxFrameSize = s.Val
          case http2SettingMaxConcurrentStreams:
          cc.maxConcurrentStreams = s.Val
          case http2SettingMaxHeaderListSize:
          cc.peerMaxHeaderListSize = uint64(s.Val)
          case http2SettingInitialWindowSize:
          if s.Val > math.MaxInt32 {
          return http2ConnectionError(http2ErrCodeFlowControl)
          }
          delta := int32(s.Val) - int32(cc.initialWindowSize)
          for _, cs := range cc.streams {
          cs.flow.add(delta)
          }
          cc.cond.Broadcast()
          cc.initialWindowSize = s.Val
          default:
          // TODO(bradfitz): handle more settings? SETTINGS_HEADER_TABLE_SIZE probably.
          cc.vlogf("Unhandled Setting: %v", s)
          }
          return nil
          })

          當(dāng)收到ID為http2SettingInitialWindowSize的幀時,會調(diào)整當(dāng)前連接中所有數(shù)據(jù)流的可發(fā)送數(shù)據(jù)窗口大小,并修改當(dāng)前連接的initialWindowSize(每個新創(chuàng)建的數(shù)據(jù)流均會使用該值初始化可發(fā)送數(shù)據(jù)窗口大小)s.Val。

          3、發(fā)送http2FrameSettings的ack信息給server。

          	cc.wmu.Lock()
          defer cc.wmu.Unlock()

          cc.fr.WriteSettingsAck()
          cc.bw.Flush()
          return cc.werr

          收到http2WindowUpdateFrame數(shù)據(jù)幀

          在筆者debug的過程中,處理完http2FrameSettings數(shù)據(jù)幀后,緊接著就收到了http2WindowUpdateFrame數(shù)據(jù)幀。收到該數(shù)據(jù)幀后會調(diào)用(*http2clientConnReadLoop).processWindowUpdate方法:

          func (rl *http2clientConnReadLoop) processWindowUpdate(f *http2WindowUpdateFrame) error {
          cc := rl.cc
          cs := cc.streamByID(f.StreamID, false)
          if f.StreamID != 0 && cs == nil {
          return nil
          }

          cc.mu.Lock()
          defer cc.mu.Unlock()

          fl := &cc.flow
          if cs != nil {
          fl = &cs.flow
          }
          if !fl.add(int32(f.Increment)) {
          return http2ConnectionError(http2ErrCodeFlowControl)
          }
          cc.cond.Broadcast()
          return nil
          }

          上面的邏輯主要用于更新當(dāng)前連接和數(shù)據(jù)流的可發(fā)送數(shù)據(jù)窗口大小。如果http2WindowUpdateFrame幀中的StreamID為0,則更新當(dāng)前連接的可發(fā)送數(shù)據(jù)窗口大小,否則更新對應(yīng)數(shù)據(jù)流可發(fā)送數(shù)據(jù)窗口大小。

          注意:在debug的過程,收到http2WindowUpdateFrame數(shù)據(jù)幀后,又收到一次http2FrameSettings,且該數(shù)據(jù)幀標(biāo)識符為http2FlagSettingsAck。

          筆者在這里特意提醒,這是因為前篇中提到的(*http2Transport).NewClientConn方法,也向server發(fā)送了http2FrameSettings數(shù)據(jù)幀和http2WindowUpdateFrame數(shù)據(jù)幀。

          另外,在處理http2FrameSettingshttp2WindowUpdateFrame過程中,均出現(xiàn)了cc.cond.Broadcast()調(diào)用,該調(diào)用主要用于喚醒因為以下兩種情況而Wait的請求:

          1. 因當(dāng)前連接處理的數(shù)據(jù)流已經(jīng)達到maxConcurrentStreams的上限(詳見前篇中(*http2ClientConn).awaitOpenSlotForRequest方法分析)。

          2. 因發(fā)送數(shù)據(jù)流已達可發(fā)送數(shù)據(jù)窗口上限而等待可發(fā)送數(shù)據(jù)窗口更新的請求(后續(xù)會介紹)。

          收到http2MetaHeadersFrame數(shù)據(jù)幀

          收到此數(shù)據(jù)幀意味著某一個請求已經(jīng)開始接收響應(yīng)數(shù)據(jù)。此數(shù)據(jù)幀對應(yīng)的處理函數(shù)為(*http2clientConnReadLoop).processHeaders

          func (rl *http2clientConnReadLoop) processHeaders(f *http2MetaHeadersFrame) error {
          cc := rl.cc
          cs := cc.streamByID(f.StreamID, false)
          // 此處省略代碼
          res, err := rl.handleResponse(cs, f)
          if err != nil {
          // 此處省略代碼
          cs.resc <- http2resAndError{err: err}
          return nil // return nil from process* funcs to keep conn alive
          }
          if res == nil {
          // (nil, nil) special case. See handleResponse docs.
          return nil
          }
          cs.resTrailer = &res.Trailer
          cs.resc <- http2resAndError{res: res}
          return nil
          }

          首先我們先看cs.resc <- http2resAndError{res: res}這一行代碼,向數(shù)據(jù)流寫入http2resAndError即本次請求的響應(yīng)。在(*http2ClientConn).roundTrip方法中有這樣一行代碼readLoopResCh := cs.resc

          回顧:前篇(*http2ClientConn).roundTrip方法的第7點和本部分關(guān)聯(lián)起來就可以形成一個完整的請求鏈。

          接下來我們對rl.handleResponse方法展開分析。

          (*http2clientConnReadLoop).handleResponse

          (*http2clientConnReadLoop).handleResponse的主要作用是構(gòu)建一個Response變量,下面對該函數(shù)的關(guān)鍵步驟進行描述。

          1、構(gòu)建一個Response變量。

          header := make(Header)
          res := &Response{
          Proto: "HTTP/2.0",
          ProtoMajor: 2,
          Header: header,
          StatusCode: statusCode,
          Status: status + " " + StatusText(statusCode),
          }

          2、構(gòu)建header(本篇不對header進行展開分析)。

          for _, hf := range f.RegularFields() {
          key := CanonicalHeaderKey(hf.Name)
          if key == "Trailer" {
          t := res.Trailer
          if t == nil {
          t = make(Header)
          res.Trailer = t
          }
          http2foreachHeaderElement(hf.Value, func(v string) {
          t[CanonicalHeaderKey(v)] = nil
          })
          } else {
          header[key] = append(header[key], hf.Value)
          }
          }

          3、處理響應(yīng)body的ContentLength。

          streamEnded := f.StreamEnded()
          isHead := cs.req.Method == "HEAD"
          if !streamEnded || isHead {
          res.ContentLength = -1
          if clens := res.Header["Content-Length"]; len(clens) == 1 {
          if clen64, err := strconv.ParseInt(clens[0], 10, 64); err == nil {
          res.ContentLength = clen64
          } else {
          // TODO: care? unlike http/1, it won't mess up our framing, so it's
          // more safe smuggling-wise to ignore.
          }
          } else if len(clens) > 1 {
          // TODO: care? unlike http/1, it won't mess up our framing, so it's
          // more safe smuggling-wise to ignore.
          }
          }

          由上可知,當(dāng)前數(shù)據(jù)流沒有結(jié)束或者是HEAD請求才讀取ContentLength。如果header中的ContentLength不合法則res.ContentLength的值為?-1

          4、構(gòu)建res.Body。

          cs.bufPipe = http2pipe{b: &http2dataBuffer{expected: res.ContentLength}}
          cs.bytesRemain = res.ContentLength
          res.Body = http2transportResponseBody{cs}
          go cs.awaitRequestCancel(cs.req)

          if cs.requestedGzip && res.Header.Get("Content-Encoding") == "gzip" {
          res.Header.Del("Content-Encoding")
          res.Header.Del("Content-Length")
          res.ContentLength = -1
          res.Body = &http2gzipReader{body: res.Body}
          res.Uncompressed = true
          }

          根據(jù)Content-Encoding的編碼方式,會構(gòu)建兩種不同的Body:

          1. 非gzip編碼時,構(gòu)造的res.Body類型為http2transportResponseBody

          2. gzip編碼時,構(gòu)造的res.Body類型為http2gzipReader。

          收到http2DataFrame數(shù)據(jù)幀

          收到此數(shù)據(jù)幀意味著我們開始接收真實的響應(yīng),即平常開發(fā)中需要處理的業(yè)務(wù)數(shù)據(jù)。此數(shù)據(jù)幀對應(yīng)的處理函數(shù)為(*http2clientConnReadLoop).processData

          因為server無法及時知道數(shù)據(jù)流在client端的狀態(tài),所以server可能會向client中一個已經(jīng)不存在的數(shù)據(jù)流發(fā)送數(shù)據(jù):

          cc := rl.cc
          cs := cc.streamByID(f.StreamID, f.StreamEnded())
          data := f.Data()
          if cs == nil {
          cc.mu.Lock()
          neverSent := cc.nextStreamID
          cc.mu.Unlock()
          // 此處省略代碼
          if f.Length > 0 {
          cc.mu.Lock()
          cc.inflow.add(int32(f.Length))
          cc.mu.Unlock()

          cc.wmu.Lock()
          cc.fr.WriteWindowUpdate(0, uint32(f.Length))
          cc.bw.Flush()
          cc.wmu.Unlock()
          }
          return nil
          }

          接收到的數(shù)據(jù)幀在client沒有對應(yīng)的數(shù)據(jù)流處理時,通過流控制器為當(dāng)前連接可讀窗口大小增加f.Length,并且通過http2FrameWindowUpdate數(shù)據(jù)幀告知server將當(dāng)前連接的可寫窗口大小增加f.Length

          如果client有對應(yīng)的數(shù)據(jù)流且f.Length大于0:

          1、如果是head請求結(jié)束當(dāng)前數(shù)據(jù)流并返回。

          if cs.req.Method == "HEAD" && len(data) > 0 {
          cc.logf("protocol error: received DATA on a HEAD request")
          rl.endStreamError(cs, http2StreamError{
          StreamID: f.StreamID,
          Code: http2ErrCodeProtocol,
          })
          return nil
          }

          2、檢查當(dāng)前數(shù)據(jù)流能否處理f.Length長度的數(shù)據(jù)。

          cc.mu.Lock()
          if cs.inflow.available() >= int32(f.Length) {
          cs.inflow.take(int32(f.Length))
          } else {
          cc.mu.Unlock()
          return http2ConnectionError(http2ErrCodeFlowControl)
          }

          由上可知當(dāng)前數(shù)據(jù)流如果能夠處理該數(shù)據(jù),通過流控制器調(diào)用cs.inflow.take減小當(dāng)前數(shù)據(jù)流可接受窗口大小。

          3、當(dāng)前數(shù)據(jù)流被重置或者被關(guān)閉即cs.didReset為true時又或者數(shù)據(jù)幀有填充數(shù)據(jù)時需要調(diào)整流控制窗口。

          var refund int
          if pad := int(f.Length) - len(data); pad > 0 {
          refund += pad
          }
          // Return len(data) now if the stream is already closed,
          // since data will never be read.
          didReset := cs.didReset
          if didReset {
          refund += len(data)
          }
          if refund > 0 {
          cc.inflow.add(int32(refund))
          cc.wmu.Lock()
          cc.fr.WriteWindowUpdate(0, uint32(refund))
          if !didReset {
          cs.inflow.add(int32(refund))
          cc.fr.WriteWindowUpdate(cs.ID, uint32(refund))
          }
          cc.bw.Flush()
          cc.wmu.Unlock()
          }
          cc.mu.Unlock()
          • 如果數(shù)據(jù)幀有填充數(shù)據(jù)則計算需要返還的填充數(shù)據(jù)長度。

          • 如果數(shù)據(jù)流無效該數(shù)據(jù)幀的長度需要全部返還。

          最后,根據(jù)計算的refund增加當(dāng)前連接或者當(dāng)前數(shù)據(jù)流的可接受窗口大小,并且同時告知server增加當(dāng)前連接或者當(dāng)前數(shù)據(jù)流的可寫窗口大小。

          4、數(shù)據(jù)長度大于0且數(shù)據(jù)流正常則將數(shù)據(jù)寫入數(shù)據(jù)流緩沖區(qū)。

          if len(data) > 0 && !didReset {
          if _, err := cs.bufPipe.Write(data); err != nil {
          rl.endStreamError(cs, err)
          return err
          }
          }

          回顧:前面的(*http2clientConnReadLoop).handleResponse方法中有這樣一行代碼res.Body = http2transportResponseBody{cs},所以在業(yè)務(wù)開發(fā)時能夠通過Response讀取到數(shù)據(jù)流中的緩沖數(shù)據(jù)。

          (http2transportResponseBody).Read

          在前面的內(nèi)容里,如果數(shù)據(jù)流狀態(tài)正常且數(shù)據(jù)幀沒有填充數(shù)據(jù)則數(shù)據(jù)流和連接的可接收窗口會一直變小,而這部分內(nèi)容就是增加數(shù)據(jù)流的可接受窗口大小。

          因為篇幅和主旨的問題筆者僅分析描述該方法內(nèi)和流控制有關(guān)的部分。

          1、讀取響應(yīng)數(shù)據(jù)后計算當(dāng)前連接需要增加的可接受窗口大小。

          cc.mu.Lock()
          defer cc.mu.Unlock()
          var connAdd, streamAdd int32
          // Check the conn-level first, before the stream-level.
          if v := cc.inflow.available(); v < http2transportDefaultConnFlow/2 {
          connAdd = http2transportDefaultConnFlow - v
          cc.inflow.add(connAdd)
          }

          如果當(dāng)前連接可接受窗口的大小已經(jīng)小于http2transportDefaultConnFlow(1G)的一半,則當(dāng)前連接可接收窗口大小需要增加http2transportDefaultConnFlow - cc.inflow.available()

          回顧http2transportDefaultConnFlow在前篇(*http2Transport).NewClientConn方法部分有提到,且連接剛建立時會通過http2WindowUpdateFrame數(shù)據(jù)幀告知server當(dāng)前連接可發(fā)送窗口大小增加http2transportDefaultConnFlow。

          2、讀取響應(yīng)數(shù)據(jù)后計算當(dāng)前數(shù)據(jù)流需要增加的可接受窗口大小。

          if err == nil { // No need to refresh if the stream is over or failed.
          // Consider any buffered body data (read from the conn but not
          // consumed by the client) when computing flow control for this
          // stream.
          v := int(cs.inflow.available()) + cs.bufPipe.Len()
          if v < http2transportDefaultStreamFlow-http2transportDefaultStreamMinRefresh {
          streamAdd = int32(http2transportDefaultStreamFlow - v)
          cs.inflow.add(streamAdd)
          }
          }

          如果當(dāng)前數(shù)據(jù)流可接受窗口大小加上當(dāng)前數(shù)據(jù)流緩沖區(qū)剩余未讀數(shù)據(jù)的長度小于http2transportDefaultStreamFlow-http2transportDefaultStreamMinRefresh(4M-4KB),則當(dāng)前數(shù)據(jù)流可接受窗口大小需要增加http2transportDefaultStreamFlow - v

          回顧http2transportDefaultStreamFlow在前篇(*http2Transport).NewClientConn方法和(*http2ClientConn).newStream方法中均有提到。

          連接剛建立時,發(fā)送http2FrameSettings數(shù)據(jù)幀,告知server每個數(shù)據(jù)流的可發(fā)送窗口大小為http2transportDefaultStreamFlow

          newStream時,數(shù)據(jù)流默認(rèn)的可接收窗口大小為http2transportDefaultStreamFlow。

          3、將連接和數(shù)據(jù)流分別需要增加的窗口大小通過http2WindowUpdateFrame數(shù)據(jù)幀告知server。

          if connAdd != 0 || streamAdd != 0 {
          cc.wmu.Lock()
          defer cc.wmu.Unlock()
          if connAdd != 0 {
          cc.fr.WriteWindowUpdate(0, http2mustUint31(connAdd))
          }
          if streamAdd != 0 {
          cc.fr.WriteWindowUpdate(cs.ID, http2mustUint31(streamAdd))
          }
          cc.bw.Flush()
          }

          以上就是server向client發(fā)送數(shù)據(jù)的流控制邏輯。

          (*http2clientStream).writeRequestBody

          前篇中(*http2ClientConn).roundTrip未對(*http2clientStream).writeRequestBody進行分析,下面我們看看該方法的源碼:

          func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (err error) {
          cc := cs.cc
          sentEnd := false // whether we sent the final DATA frame w/ END_STREAM
          // 此處省略代碼
          req := cs.req
          hasTrailers := req.Trailer != nil
          remainLen := http2actualContentLength(req)
          hasContentLen := remainLen != -1

          var sawEOF bool
          for !sawEOF {
          n, err := body.Read(buf[:len(buf)-1])
          // 此處省略代碼
          remain := buf[:n]
          for len(remain) > 0 && err == nil {
          var allowed int32
          allowed, err = cs.awaitFlowControl(len(remain))
          switch {
          case err == http2errStopReqBodyWrite:
          return err
          case err == http2errStopReqBodyWriteAndCancel:
          cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
          return err
          case err != nil:
          return err
          }
          cc.wmu.Lock()
          data := remain[:allowed]
          remain = remain[allowed:]
          sentEnd = sawEOF && len(remain) == 0 && !hasTrailers
          err = cc.fr.WriteData(cs.ID, sentEnd, data)
          if err == nil {
          err = cc.bw.Flush()
          }
          cc.wmu.Unlock()
          }
          if err != nil {
          return err
          }
          }
          // 此處省略代碼
          return err
          }

          上面的邏輯可簡單總結(jié)為:不停的讀取請求body然后將讀取的內(nèi)容通過cc.fr.WriteData轉(zhuǎn)為http2FrameData數(shù)據(jù)幀發(fā)送給server,直到請求body讀完為止。其中和流控制有關(guān)的方法是awaitFlowControl,下面我們對該方法進行分析。

          (*http2clientStream).awaitFlowControl

          此方法的主要作用是等待當(dāng)前數(shù)據(jù)流可寫窗口有容量能夠?qū)懭霐?shù)據(jù)。

          func (cs *http2clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
          cc := cs.cc
          cc.mu.Lock()
          defer cc.mu.Unlock()
          for {
          if cc.closed {
          return 0, http2errClientConnClosed
          }
          if cs.stopReqBody != nil {
          return 0, cs.stopReqBody
          }
          if err := cs.checkResetOrDone(); err != nil {
          return 0, err
          }
          if a := cs.flow.available(); a > 0 {
          take := a
          if int(take) > maxBytes {

          take = int32(maxBytes) // can't truncate int; take is int32
          }
          if take > int32(cc.maxFrameSize) {
          take = int32(cc.maxFrameSize)
          }
          cs.flow.take(take)
          return take, nil
          }
          cc.cond.Wait()
          }
          }

          根據(jù)源碼可以知道,數(shù)據(jù)流被關(guān)閉或者停止發(fā)送請求body,則當(dāng)前數(shù)據(jù)流無法寫入數(shù)據(jù)。當(dāng)數(shù)據(jù)流狀態(tài)正常時,又分為兩種情況:

          1. 當(dāng)前數(shù)據(jù)流可寫窗口剩余可寫數(shù)據(jù)大于0,則計算可寫字節(jié)數(shù),并將當(dāng)前數(shù)據(jù)流可寫窗口大小消耗take。

          2. 當(dāng)前數(shù)據(jù)流可寫窗口剩余可寫數(shù)據(jù)小于等于0,則會一直等待直到被喚醒并進入下一次檢查。

          上面的第二種情況在收到http2WindowUpdateFrame數(shù)據(jù)幀這一節(jié)中提到過。

          server讀取當(dāng)前數(shù)據(jù)流的數(shù)據(jù)后會向client對應(yīng)數(shù)據(jù)流發(fā)送http2WindowUpdateFrame數(shù)據(jù)幀,client收到該數(shù)據(jù)幀后會增大對應(yīng)數(shù)據(jù)流可寫窗口,并執(zhí)行cc.cond.Broadcast()喚醒因發(fā)送數(shù)據(jù)已達流控制上限而等待的數(shù)據(jù)流繼續(xù)發(fā)送數(shù)據(jù)。

          以上就是client向server發(fā)送數(shù)據(jù)的流控制邏輯。

          總結(jié)

          1. 幀頭長度為9個字節(jié),并包含四個部分:Payload的長度、幀類型、幀標(biāo)識符和數(shù)據(jù)流ID。

          2. 流控制可分為兩個步驟:

            • 初始時,通過http2FrameSettings數(shù)據(jù)幀和http2WindowUpdateFrame數(shù)據(jù)幀告知對方當(dāng)前連接讀寫窗口大小以及連接中數(shù)據(jù)流讀寫窗口大小。

            • 在讀寫數(shù)據(jù)過程中,通過發(fā)送http2WindowUpdateFrame數(shù)據(jù)幀控制另一端的寫窗口大小。

          預(yù)告

          前篇和中篇已經(jīng)完成,下一期將對http2.0標(biāo)頭壓縮進行分析。

          最后,衷心希望本文能夠?qū)Ω魑蛔x者有一定的幫助。

          :寫本文時, 筆者所用go版本為: go1.14.2




          推薦閱讀



          學(xué)習(xí)交流 Go 語言,掃碼回復(fù)「進群」即可


          站長 polarisxu

          自己的原創(chuàng)文章

          不限于 Go 技術(shù)

          職場和創(chuàng)業(yè)經(jīng)驗


          Go語言中文網(wǎng)

          每天為你

          分享 Go 知識

          Go愛好者值得關(guān)注



          瀏覽 47
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  高清无码一区二区三区 | 欧美在线 | 亚洲 | 成人无码无遮又大又爽 | 欧美日韩A∨ | 爱综合五月天 |