Go發(fā)起HTTP2.0請求流程分析(中篇)——數(shù)據(jù)幀&流控制
回復“Go語言”即可獲贈從入門到進階共10本電子書
蜀僧抱綠綺,西下峨眉峰。
閱讀建議
這是HTTP2.0系列的第二篇,所以筆者推薦閱讀順序如下:
本篇主要分為三個部分:數(shù)據(jù)幀,流控制器以及通過分析源碼逐步了解流控制。
本有意將這三個部分拆成三篇文章,但它們之間又有聯(lián)系,所以最后依舊決定放在一篇文章里面。由于內容較多,筆者認為分三次分別閱讀三個部分較佳。
數(shù)據(jù)幀
HTTP2通信的最小單位是數(shù)據(jù)幀,每一個幀都包含兩部分:幀頭和Payload。不同數(shù)據(jù)流的幀可以交錯發(fā)送(同一個數(shù)據(jù)流的幀必須順序發(fā)送),然后再根據(jù)每個幀頭的數(shù)據(jù)流標識符重新組裝。
由于Payload中為有效數(shù)據(jù),故僅對幀頭進行分析描述。
幀頭
幀頭總長度為9個字節(jié),并包含四個部分,分別是:
Payload的長度,占用三個字節(jié)。
數(shù)據(jù)幀類型,占用一個字節(jié)。
數(shù)據(jù)幀標識符,占用一個字節(jié)。
數(shù)據(jù)流ID,占用四個字節(jié)。
用圖表示如下:

數(shù)據(jù)幀的格式和各部分的含義已經清楚了, 那么我們看看代碼中怎么讀取一個幀頭:
func http2readFrameHeader(buf []byte, r io.Reader) (http2FrameHeader, error) {
_, err := io.ReadFull(r, buf[:http2frameHeaderLen])
if err != nil {
return http2FrameHeader{}, err
}
return http2FrameHeader{
Length: (uint32(buf[0])<<16 | uint32(buf[1])<<8 | uint32(buf[2])),
Type: http2FrameType(buf[3]),
Flags: http2Flags(buf[4]),
StreamID: binary.BigEndian.Uint32(buf[5:]) & (1<<31 - 1),
valid: true,
}, nil
}
在上面的代碼中http2frameHeaderLen是一個常量,其值為9。
從io.Reader中讀取9個字節(jié)后,將前三個字節(jié)和后四個字節(jié)均轉為uint32的類型,從而得到Payload長度和數(shù)據(jù)流ID。另外需要理解的是幀頭的前三個字節(jié)和后四個字節(jié)存儲格式為大端(大小端筆者就不在這里解釋了,請尚不了解的讀者自行百度)。
數(shù)據(jù)幀類型
根據(jù)http://http2.github.io/http2-spec/#rfc.section.11.2描述,數(shù)據(jù)幀類型總共有10個。在go源碼中均有體現(xiàn):
const (
http2FrameData http2FrameType = 0x0
http2FrameHeaders http2FrameType = 0x1
http2FramePriority http2FrameType = 0x2
http2FrameRSTStream http2FrameType = 0x3
http2FrameSettings http2FrameType = 0x4
http2FramePushPromise http2FrameType = 0x5
http2FramePing http2FrameType = 0x6
http2FrameGoAway http2FrameType = 0x7
http2FrameWindowUpdate http2FrameType = 0x8
http2FrameContinuation http2FrameType = 0x9
)
http2FrameData:主要用于發(fā)送請求body和接收響應的數(shù)據(jù)幀。
http2FrameHeaders:主要用于發(fā)送請求header和接收響應header的數(shù)據(jù)幀。
http2FrameSettings:主要用于client和server交流設置相關的數(shù)據(jù)幀。
http2FrameWindowUpdate:主要用于流控制的數(shù)據(jù)幀。
其他數(shù)據(jù)幀類型因為本文不涉及,故不做描述。
數(shù)據(jù)幀標識符
由于數(shù)據(jù)幀標識符種類較多,筆者在這里僅介紹其中部分標識符,先看源碼:
const (
// Data Frame
http2FlagDataEndStream http2Flags = 0x1
// Headers Frame
http2FlagHeadersEndStream http2Flags = 0x1
// Settings Frame
http2FlagSettingsAck http2Flags = 0x1
// 此處省略定義其他數(shù)據(jù)幀標識符的代碼
)
http2FlagDataEndStream:在前篇中提到,調用(*http2ClientConn).newStream方法會創(chuàng)建一個數(shù)據(jù)流,那這個數(shù)據(jù)流什么時候結束呢,這就是http2FlagDataEndStream的作用。
當client收到有響應body的響應時(HEAD請求無響應body,301,302等響應也無響應body),一直讀到http2FrameData數(shù)據(jù)幀的標識符為http2FlagDataEndStream則意味著本次請求結束可以關閉當前數(shù)據(jù)流。
http2FlagHeadersEndStream:如果讀到的http2FrameHeaders數(shù)據(jù)幀有此標識符也意味著本次請求結束。
http2FlagSettingsAck:該標示符意味著對方確認收到http2FrameSettings數(shù)據(jù)幀。
流控制器
流控制是一種阻止發(fā)送方向接收方發(fā)送大量數(shù)據(jù)的機制,以免超出后者的需求或處理能力。Go中HTTP2通過http2flow結構體進行流控制:
type http2flow struct {
// n is the number of DATA bytes we're allowed to send.
// A flow is kept both on a conn and a per-stream.
n int32
// conn points to the shared connection-level flow that is
// shared by all streams on that conn. It is nil for the flow
// that's on the conn directly.
conn *http2flow
}
字段含義英文注釋已經描述的很清楚了,所以筆者不再翻譯。下面看一下和流控制有關的方法。
(*http2flow).available
此方法返回當前流控制可發(fā)送的最大字節(jié)數(shù):
func (f *http2flow) available() int32 {
n := f.n
if f.conn != nil && f.conn.n < n {
n = f.conn.n
}
return n
}
如果
f.conn為nil則意味著此控制器的控制級別為連接,那么可發(fā)送的最大字節(jié)數(shù)就是f.n。如果
f.conn不為nil則意味著此控制器的控制級別為數(shù)據(jù)流,且當前數(shù)據(jù)流可發(fā)送的最大字節(jié)數(shù)不能超過當前連接可發(fā)送的最大字節(jié)數(shù)。
(*http2flow).take
此方法用于消耗當前流控制器的可發(fā)送字節(jié)數(shù):
func (f *http2flow) take(n int32) {
if n > f.available() {
panic("internal error: took too much")
}
f.n -= n
if f.conn != nil {
f.conn.n -= n
}
}
通過實際需要傳遞一個參數(shù),告知當前流控制器想要發(fā)送的數(shù)據(jù)大小。如果發(fā)送的大小超過流控制器允許的大小,則panic,如果未超過流控制器允許的大小,則將當前數(shù)據(jù)流和當前連接的可發(fā)送字節(jié)數(shù)-n。
(*http2flow).add
有消耗就有新增,此方法用于增加流控制器可發(fā)送的最大字節(jié)數(shù):
func (f *http2flow) add(n int32) bool {
sum := f.n + n
if (sum > n) == (f.n > 0) {
f.n = sum
return true
}
return false
}
上面的代碼唯一需要注意的地方是,當sum超過int32正數(shù)最大值(2^31-1)時會返回false。
回顧:在前篇中提到的(*http2Transport).NewClientConn方法和(*http2ClientConn).newStream方法均通過(*http2flow).add初始化可發(fā)送數(shù)據(jù)窗口大小。
有了幀和流控制器的基本概念,下面我們結合源碼來分析總結流控制的具體實現(xiàn)。
(*http2ClientConn).readLoop
前篇分析(*http2Transport).newClientConn時止步于讀循環(huán),那么今天我們就從(*http2ClientConn).readLoop開始。
func (cc *http2ClientConn) readLoop() {
rl := &http2clientConnReadLoop{cc: cc}
defer rl.cleanup()
cc.readerErr = rl.run()
if ce, ok := cc.readerErr.(http2ConnectionError); ok {
cc.wmu.Lock()
cc.fr.WriteGoAway(0, http2ErrCode(ce), nil)
cc.wmu.Unlock()
}
}
由上可知,readLoop的邏輯比較簡單,其核心邏輯在(*http2clientConnReadLoop).run方法里。
func (rl *http2clientConnReadLoop) run() error {
cc := rl.cc
rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse
gotReply := false // ever saw a HEADERS reply
gotSettings := false
for {
f, err := cc.fr.ReadFrame()
// 此處省略代碼
maybeIdle := false // whether frame might transition us to idle
switch f := f.(type) {
case *http2MetaHeadersFrame:
err = rl.processHeaders(f)
maybeIdle = true
gotReply = true
case *http2DataFrame:
err = rl.processData(f)
maybeIdle = true
case *http2GoAwayFrame:
err = rl.processGoAway(f)
maybeIdle = true
case *http2RSTStreamFrame:
err = rl.processResetStream(f)
maybeIdle = true
case *http2SettingsFrame:
err = rl.processSettings(f)
case *http2PushPromiseFrame:
err = rl.processPushPromise(f)
case *http2WindowUpdateFrame:
err = rl.processWindowUpdate(f)
case *http2PingFrame:
err = rl.processPing(f)
default:
cc.logf("Transport: unhandled response frame type %T", f)
}
if err != nil {
if http2VerboseLogs {
cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, http2summarizeFrame(f), err)
}
return err
}
if rl.closeWhenIdle && gotReply && maybeIdle {
cc.closeIfIdle()
}
}
}
由上可知,(*http2clientConnReadLoop).run的核心邏輯是讀取數(shù)據(jù)幀然后對不同的數(shù)據(jù)幀進行不同的處理。
cc.fr.ReadFrame()會根據(jù)前面介紹的數(shù)據(jù)幀格式讀出數(shù)據(jù)幀。
前篇中提到使用了一個支持h2協(xié)議的圖片進行分析,本篇繼續(xù)復用該圖片對(*http2clientConnReadLoop).run方法進行debug。
收到http2FrameSettings數(shù)據(jù)幀
讀循環(huán)會最先讀到http2FrameSettings數(shù)據(jù)幀。讀到該數(shù)據(jù)幀后會調用(*http2clientConnReadLoop).processSettings方法。(*http2clientConnReadLoop).processSettings主要包含3個邏輯。
1、判斷是否是http2FrameSettings的ack信息,如果是直接返回,否則繼續(xù)后面的步驟。
if f.IsAck() {
if cc.wantSettingsAck {
cc.wantSettingsAck = false
return nil
}
return http2ConnectionError(http2ErrCodeProtocol)
}
2、處理不同http2FrameSettings的數(shù)據(jù)幀,并根據(jù)server傳遞的信息,修改maxConcurrentStreams等的值。
err := f.ForeachSetting(func(s http2Setting) error {
switch s.ID {
case http2SettingMaxFrameSize:
cc.maxFrameSize = s.Val
case http2SettingMaxConcurrentStreams:
cc.maxConcurrentStreams = s.Val
case http2SettingMaxHeaderListSize:
cc.peerMaxHeaderListSize = uint64(s.Val)
case http2SettingInitialWindowSize:
if s.Val > math.MaxInt32 {
return http2ConnectionError(http2ErrCodeFlowControl)
}
delta := int32(s.Val) - int32(cc.initialWindowSize)
for _, cs := range cc.streams {
cs.flow.add(delta)
}
cc.cond.Broadcast()
cc.initialWindowSize = s.Val
default:
// TODO(bradfitz): handle more settings? SETTINGS_HEADER_TABLE_SIZE probably.
cc.vlogf("Unhandled Setting: %v", s)
}
return nil
})
當收到ID為http2SettingInitialWindowSize的幀時,會調整當前連接中所有數(shù)據(jù)流的可發(fā)送數(shù)據(jù)窗口大小,并修改當前連接的initialWindowSize(每個新創(chuàng)建的數(shù)據(jù)流均會使用該值初始化可發(fā)送數(shù)據(jù)窗口大?。?span style="caret-color: rgb(36, 41, 46);color: rgb(36, 41, 46);font-family: -apple-system, BlinkMacSystemFont, 微軟雅黑, "PingFang SC", Helvetica, Arial, "Hiragino Sans GB", "Microsoft YaHei", SimSun, 宋體, Heiti, 黑體, sans-serif;font-size: 14px;text-align: start;text-size-adjust: auto;">為s.Val。
3、發(fā)送http2FrameSettings的ack信息給server。
cc.wmu.Lock()
defer cc.wmu.Unlock()
cc.fr.WriteSettingsAck()
cc.bw.Flush()
return cc.werr
收到http2WindowUpdateFrame數(shù)據(jù)幀
在筆者debug的過程中,處理完http2FrameSettings數(shù)據(jù)幀后,緊接著就收到了http2WindowUpdateFrame數(shù)據(jù)幀。收到該數(shù)據(jù)幀后會調用(*http2clientConnReadLoop).processWindowUpdate方法:
func (rl *http2clientConnReadLoop) processWindowUpdate(f *http2WindowUpdateFrame) error {
cc := rl.cc
cs := cc.streamByID(f.StreamID, false)
if f.StreamID != 0 && cs == nil {
return nil
}
cc.mu.Lock()
defer cc.mu.Unlock()
fl := &cc.flow
if cs != nil {
fl = &cs.flow
}
if !fl.add(int32(f.Increment)) {
return http2ConnectionError(http2ErrCodeFlowControl)
}
cc.cond.Broadcast()
return nil
}
上面的邏輯主要用于更新當前連接和數(shù)據(jù)流的可發(fā)送數(shù)據(jù)窗口大小。如果http2WindowUpdateFrame幀中的StreamID為0,則更新當前連接的可發(fā)送數(shù)據(jù)窗口大小,否則更新對應數(shù)據(jù)流可發(fā)送數(shù)據(jù)窗口大小。
注意:在debug的過程,收到http2WindowUpdateFrame數(shù)據(jù)幀后,又收到一次http2FrameSettings,且該數(shù)據(jù)幀標識符為http2FlagSettingsAck。
筆者在這里特意提醒,這是因為前篇中提到的(*http2Transport).NewClientConn方法,也向server發(fā)送了http2FrameSettings數(shù)據(jù)幀和http2WindowUpdateFrame數(shù)據(jù)幀。
另外,在處理http2FrameSettings和http2WindowUpdateFrame過程中,均出現(xiàn)了cc.cond.Broadcast()調用,該調用主要用于喚醒因為以下兩種情況而Wait的請求:
因當前連接處理的數(shù)據(jù)流已經達到
maxConcurrentStreams的上限(詳見前篇中(*http2ClientConn).awaitOpenSlotForRequest方法分析)。因發(fā)送數(shù)據(jù)流已達可發(fā)送數(shù)據(jù)窗口上限而等待可發(fā)送數(shù)據(jù)窗口更新的請求(后續(xù)會介紹)。
收到http2MetaHeadersFrame數(shù)據(jù)幀
收到此數(shù)據(jù)幀意味著某一個請求已經開始接收響應數(shù)據(jù)。此數(shù)據(jù)幀對應的處理函數(shù)為(*http2clientConnReadLoop).processHeaders:
func (rl *http2clientConnReadLoop) processHeaders(f *http2MetaHeadersFrame) error {
cc := rl.cc
cs := cc.streamByID(f.StreamID, false)
// 此處省略代碼
res, err := rl.handleResponse(cs, f)
if err != nil {
// 此處省略代碼
cs.resc <- http2resAndError{err: err}
return nil // return nil from process* funcs to keep conn alive
}
if res == nil {
// (nil, nil) special case. See handleResponse docs.
return nil
}
cs.resTrailer = &res.Trailer
cs.resc <- http2resAndError{res: res}
return nil
}
首先我們先看cs.resc <- http2resAndError{res: res}這一行代碼,向數(shù)據(jù)流寫入http2resAndError即本次請求的響應。在(*http2ClientConn).roundTrip方法中有這樣一行代碼readLoopResCh := cs.resc。
回顧:前篇(*http2ClientConn).roundTrip方法的第7點和本部分關聯(lián)起來就可以形成一個完整的請求鏈。
接下來我們對rl.handleResponse方法展開分析。
(*http2clientConnReadLoop).handleResponse
(*http2clientConnReadLoop).handleResponse的主要作用是構建一個Response變量,下面對該函數(shù)的關鍵步驟進行描述。
1、構建一個Response變量。
header := make(Header)
res := &Response{
Proto: "HTTP/2.0",
ProtoMajor: 2,
Header: header,
StatusCode: statusCode,
Status: status + " " + StatusText(statusCode),
}
2、構建header(本篇不對header進行展開分析)。
for _, hf := range f.RegularFields() {
key := CanonicalHeaderKey(hf.Name)
if key == "Trailer" {
t := res.Trailer
if t == nil {
t = make(Header)
res.Trailer = t
}
http2foreachHeaderElement(hf.Value, func(v string) {
t[CanonicalHeaderKey(v)] = nil
})
} else {
header[key] = append(header[key], hf.Value)
}
}
3、處理響應body的ContentLength。
streamEnded := f.StreamEnded()
isHead := cs.req.Method == "HEAD"
if !streamEnded || isHead {
res.ContentLength = -1
if clens := res.Header["Content-Length"]; len(clens) == 1 {
if clen64, err := strconv.ParseInt(clens[0], 10, 64); err == nil {
res.ContentLength = clen64
} else {
// TODO: care? unlike http/1, it won't mess up our framing, so it's
// more safe smuggling-wise to ignore.
}
} else if len(clens) > 1 {
// TODO: care? unlike http/1, it won't mess up our framing, so it's
// more safe smuggling-wise to ignore.
}
}
由上可知,當前數(shù)據(jù)流沒有結束或者是HEAD請求才讀取ContentLength。如果header中的ContentLength不合法則res.ContentLength的值為?-1。
4、構建res.Body。
cs.bufPipe = http2pipe{b: &http2dataBuffer{expected: res.ContentLength}}
cs.bytesRemain = res.ContentLength
res.Body = http2transportResponseBody{cs}
go cs.awaitRequestCancel(cs.req)
if cs.requestedGzip && res.Header.Get("Content-Encoding") == "gzip" {
res.Header.Del("Content-Encoding")
res.Header.Del("Content-Length")
res.ContentLength = -1
res.Body = &http2gzipReader{body: res.Body}
res.Uncompressed = true
}
根據(jù)Content-Encoding的編碼方式,會構建兩種不同的Body:
非gzip編碼時,構造的res.Body類型為
http2transportResponseBody。gzip編碼時,構造的res.Body類型為
http2gzipReader。
收到http2DataFrame數(shù)據(jù)幀
收到此數(shù)據(jù)幀意味著我們開始接收真實的響應,即平常開發(fā)中需要處理的業(yè)務數(shù)據(jù)。此數(shù)據(jù)幀對應的處理函數(shù)為(*http2clientConnReadLoop).processData。
因為server無法及時知道數(shù)據(jù)流在client端的狀態(tài),所以server可能會向client中一個已經不存在的數(shù)據(jù)流發(fā)送數(shù)據(jù):
cc := rl.cc
cs := cc.streamByID(f.StreamID, f.StreamEnded())
data := f.Data()
if cs == nil {
cc.mu.Lock()
neverSent := cc.nextStreamID
cc.mu.Unlock()
// 此處省略代碼
if f.Length > 0 {
cc.mu.Lock()
cc.inflow.add(int32(f.Length))
cc.mu.Unlock()
cc.wmu.Lock()
cc.fr.WriteWindowUpdate(0, uint32(f.Length))
cc.bw.Flush()
cc.wmu.Unlock()
}
return nil
}
接收到的數(shù)據(jù)幀在client沒有對應的數(shù)據(jù)流處理時,通過流控制器為當前連接可讀窗口大小增加f.Length,并且通過http2FrameWindowUpdate數(shù)據(jù)幀告知server將當前連接的可寫窗口大小增加f.Length。
如果client有對應的數(shù)據(jù)流且f.Length大于0:
1、如果是head請求結束當前數(shù)據(jù)流并返回。
if cs.req.Method == "HEAD" && len(data) > 0 {
cc.logf("protocol error: received DATA on a HEAD request")
rl.endStreamError(cs, http2StreamError{
StreamID: f.StreamID,
Code: http2ErrCodeProtocol,
})
return nil
}
2、檢查當前數(shù)據(jù)流能否處理f.Length長度的數(shù)據(jù)。
cc.mu.Lock()
if cs.inflow.available() >= int32(f.Length) {
cs.inflow.take(int32(f.Length))
} else {
cc.mu.Unlock()
return http2ConnectionError(http2ErrCodeFlowControl)
}
由上可知當前數(shù)據(jù)流如果能夠處理該數(shù)據(jù),通過流控制器調用cs.inflow.take減小當前數(shù)據(jù)流可接受窗口大小。
3、當前數(shù)據(jù)流被重置或者被關閉即cs.didReset為true時又或者數(shù)據(jù)幀有填充數(shù)據(jù)時需要調整流控制窗口。
var refund int
if pad := int(f.Length) - len(data); pad > 0 {
refund += pad
}
// Return len(data) now if the stream is already closed,
// since data will never be read.
didReset := cs.didReset
if didReset {
refund += len(data)
}
if refund > 0 {
cc.inflow.add(int32(refund))
cc.wmu.Lock()
cc.fr.WriteWindowUpdate(0, uint32(refund))
if !didReset {
cs.inflow.add(int32(refund))
cc.fr.WriteWindowUpdate(cs.ID, uint32(refund))
}
cc.bw.Flush()
cc.wmu.Unlock()
}
cc.mu.Unlock()
如果數(shù)據(jù)幀有填充數(shù)據(jù)則計算需要返還的填充數(shù)據(jù)長度。
如果數(shù)據(jù)流無效該數(shù)據(jù)幀的長度需要全部返還。
最后,根據(jù)計算的refund增加當前連接或者當前數(shù)據(jù)流的可接受窗口大小,并且同時告知server增加當前連接或者當前數(shù)據(jù)流的可寫窗口大小。
4、數(shù)據(jù)長度大于0且數(shù)據(jù)流正常則將數(shù)據(jù)寫入數(shù)據(jù)流緩沖區(qū)。
if len(data) > 0 && !didReset {
if _, err := cs.bufPipe.Write(data); err != nil {
rl.endStreamError(cs, err)
return err
}
}
回顧:前面的(*http2clientConnReadLoop).handleResponse方法中有這樣一行代碼res.Body = http2transportResponseBody{cs},所以在業(yè)務開發(fā)時能夠通過Response讀取到數(shù)據(jù)流中的緩沖數(shù)據(jù)。
(http2transportResponseBody).Read
在前面的內容里,如果數(shù)據(jù)流狀態(tài)正常且數(shù)據(jù)幀沒有填充數(shù)據(jù)則數(shù)據(jù)流和連接的可接收窗口會一直變小,而這部分內容就是增加數(shù)據(jù)流的可接受窗口大小。
因為篇幅和主旨的問題筆者僅分析描述該方法內和流控制有關的部分。
1、讀取響應數(shù)據(jù)后計算當前連接需要增加的可接受窗口大小。
cc.mu.Lock()
defer cc.mu.Unlock()
var connAdd, streamAdd int32
// Check the conn-level first, before the stream-level.
if v := cc.inflow.available(); v < http2transportDefaultConnFlow/2 {
connAdd = http2transportDefaultConnFlow - v
cc.inflow.add(connAdd)
}
如果當前連接可接受窗口的大小已經小于http2transportDefaultConnFlow(1G)的一半,則當前連接可接收窗口大小需要增加http2transportDefaultConnFlow - cc.inflow.available()。
回顧:http2transportDefaultConnFlow在前篇(*http2Transport).NewClientConn方法部分有提到,且連接剛建立時會通過http2WindowUpdateFrame數(shù)據(jù)幀告知server當前連接可發(fā)送窗口大小增加http2transportDefaultConnFlow。
2、讀取響應數(shù)據(jù)后計算當前數(shù)據(jù)流需要增加的可接受窗口大小。
if err == nil { // No need to refresh if the stream is over or failed.
// Consider any buffered body data (read from the conn but not
// consumed by the client) when computing flow control for this
// stream.
v := int(cs.inflow.available()) + cs.bufPipe.Len()
if v < http2transportDefaultStreamFlow-http2transportDefaultStreamMinRefresh {
streamAdd = int32(http2transportDefaultStreamFlow - v)
cs.inflow.add(streamAdd)
}
}
如果當前數(shù)據(jù)流可接受窗口大小加上當前數(shù)據(jù)流緩沖區(qū)剩余未讀數(shù)據(jù)的長度小于http2transportDefaultStreamFlow-http2transportDefaultStreamMinRefresh(4M-4KB),則當前數(shù)據(jù)流可接受窗口大小需要增加http2transportDefaultStreamFlow - v。
回顧:http2transportDefaultStreamFlow在前篇(*http2Transport).NewClientConn方法和(*http2ClientConn).newStream方法中均有提到。
連接剛建立時,發(fā)送http2FrameSettings數(shù)據(jù)幀,告知server每個數(shù)據(jù)流的可發(fā)送窗口大小為http2transportDefaultStreamFlow。
在newStream時,數(shù)據(jù)流默認的可接收窗口大小為http2transportDefaultStreamFlow。
3、將連接和數(shù)據(jù)流分別需要增加的窗口大小通過http2WindowUpdateFrame數(shù)據(jù)幀告知server。
if connAdd != 0 || streamAdd != 0 {
cc.wmu.Lock()
defer cc.wmu.Unlock()
if connAdd != 0 {
cc.fr.WriteWindowUpdate(0, http2mustUint31(connAdd))
}
if streamAdd != 0 {
cc.fr.WriteWindowUpdate(cs.ID, http2mustUint31(streamAdd))
}
cc.bw.Flush()
}
以上就是server向client發(fā)送數(shù)據(jù)的流控制邏輯。
(*http2clientStream).writeRequestBody
前篇中(*http2ClientConn).roundTrip未對(*http2clientStream).writeRequestBody進行分析,下面我們看看該方法的源碼:
func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (err error) {
cc := cs.cc
sentEnd := false // whether we sent the final DATA frame w/ END_STREAM
// 此處省略代碼
req := cs.req
hasTrailers := req.Trailer != nil
remainLen := http2actualContentLength(req)
hasContentLen := remainLen != -1
var sawEOF bool
for !sawEOF {
n, err := body.Read(buf[:len(buf)-1])
// 此處省略代碼
remain := buf[:n]
for len(remain) > 0 && err == nil {
var allowed int32
allowed, err = cs.awaitFlowControl(len(remain))
switch {
case err == http2errStopReqBodyWrite:
return err
case err == http2errStopReqBodyWriteAndCancel:
cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
return err
case err != nil:
return err
}
cc.wmu.Lock()
data := remain[:allowed]
remain = remain[allowed:]
sentEnd = sawEOF && len(remain) == 0 && !hasTrailers
err = cc.fr.WriteData(cs.ID, sentEnd, data)
if err == nil {
err = cc.bw.Flush()
}
cc.wmu.Unlock()
}
if err != nil {
return err
}
}
// 此處省略代碼
return err
}
上面的邏輯可簡單總結為:不停的讀取請求body然后將讀取的內容通過cc.fr.WriteData轉為http2FrameData數(shù)據(jù)幀發(fā)送給server,直到請求body讀完為止。其中和流控制有關的方法是awaitFlowControl,下面我們對該方法進行分析。
(*http2clientStream).awaitFlowControl
此方法的主要作用是等待當前數(shù)據(jù)流可寫窗口有容量能夠寫入數(shù)據(jù)。
func (cs *http2clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
cc := cs.cc
cc.mu.Lock()
defer cc.mu.Unlock()
for {
if cc.closed {
return 0, http2errClientConnClosed
}
if cs.stopReqBody != nil {
return 0, cs.stopReqBody
}
if err := cs.checkResetOrDone(); err != nil {
return 0, err
}
if a := cs.flow.available(); a > 0 {
take := a
if int(take) > maxBytes {
take = int32(maxBytes) // can't truncate int; take is int32
}
if take > int32(cc.maxFrameSize) {
take = int32(cc.maxFrameSize)
}
cs.flow.take(take)
return take, nil
}
cc.cond.Wait()
}
}
根據(jù)源碼可以知道,數(shù)據(jù)流被關閉或者停止發(fā)送請求body,則當前數(shù)據(jù)流無法寫入數(shù)據(jù)。當數(shù)據(jù)流狀態(tài)正常時,又分為兩種情況:
當前數(shù)據(jù)流可寫窗口剩余可寫數(shù)據(jù)大于0,則計算可寫字節(jié)數(shù),并將當前數(shù)據(jù)流可寫窗口大小消耗
take。當前數(shù)據(jù)流可寫窗口剩余可寫數(shù)據(jù)小于等于0,則會一直等待直到被喚醒并進入下一次檢查。
上面的第二種情況在收到http2WindowUpdateFrame數(shù)據(jù)幀這一節(jié)中提到過。
server讀取當前數(shù)據(jù)流的數(shù)據(jù)后會向client對應數(shù)據(jù)流發(fā)送http2WindowUpdateFrame數(shù)據(jù)幀,client收到該數(shù)據(jù)幀后會增大對應數(shù)據(jù)流可寫窗口,并執(zhí)行cc.cond.Broadcast()喚醒因發(fā)送數(shù)據(jù)已達流控制上限而等待的數(shù)據(jù)流繼續(xù)發(fā)送數(shù)據(jù)。
以上就是client向server發(fā)送數(shù)據(jù)的流控制邏輯。
總結
幀頭長度為9個字節(jié),并包含四個部分:Payload的長度、幀類型、幀標識符和數(shù)據(jù)流ID。
流控制可分為兩個步驟:
初始時,通過
http2FrameSettings數(shù)據(jù)幀和http2WindowUpdateFrame數(shù)據(jù)幀告知對方當前連接讀寫窗口大小以及連接中數(shù)據(jù)流讀寫窗口大小。在讀寫數(shù)據(jù)過程中,通過發(fā)送
http2WindowUpdateFrame數(shù)據(jù)幀控制另一端的寫窗口大小。
預告
前篇和中篇已經完成,下一期將對http2.0標頭壓縮進行分析。
最后,衷心希望本文能夠對各位讀者有一定的幫助。
-------------------?End?-------------------
往期精彩文章推薦:

歡迎大家點贊,留言,轉發(fā),轉載,感謝大家的相伴與支持
想加入Go學習群請在后臺回復【入群】
萬水千山總是情,點個【在看】行不行
