<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Go源碼分析:grpc context

          共 6267字,需瀏覽 13分鐘

           ·

          2021-12-09 13:11

          ? ? ? ??gRPC 是基于 HTTP/2 協(xié)議的。進程間傳輸定義了一個 metadata 對象,該對象放在 Request-Headers 內(nèi)所以通過 metadata 我們可以將上一個進程中的全局對象透傳到下一個被調(diào)用的進程。

          type MD map[string][]string

          進程內(nèi)部我們通過context來傳輸上下文數(shù)據(jù),進程間傳遞MD的時候,我們也可以從ctx,取出來,進行傳遞

          //set 數(shù)據(jù)到 metadatamd := metadata.Pairs("key", "val")// 新建一個有 metadata 的 contextctx := metadata.NewOutgoingContext(context.Background(), md)

          ????????為什么不直接把context里面的數(shù)據(jù)全取出來,傳遞給下游呢?這是出于可維護性和安全性兩方面的考慮,如果將ctx所有信息都傳遞下去,很有可能將一些內(nèi)部信息泄漏,另一方面,下游在取ctx的時候,不知道到底傳了哪些數(shù)據(jù)。所以grpc定義了兩個context:

          OutgoingContextIncomingContext

          OutgoingContext用于發(fā)送請求一方,包裝下游依賴的數(shù)據(jù),傳遞出去。IncomingContext用于服務(wù)端接受,客戶端傳遞來的context信息。context中間通過序列化成http2 header的方式進行傳輸。metadata/metadata.go,我們可以看到這兩個context雖然也是通過context.WithValue?設(shè)置數(shù)據(jù),通過context.Value來讀取數(shù)據(jù)。

          type mdIncomingKey struct{}type mdOutgoingKey struct{}
          // NewIncomingContext creates a new context with incoming md attached.func NewIncomingContext(ctx context.Context, md MD) context.Context { return context.WithValue(ctx, mdIncomingKey{}, md)}
          // NewOutgoingContext creates a new context with outgoing md attached. If used// in conjunction with AppendToOutgoingContext, NewOutgoingContext will// overwrite any previously-appended metadata.func NewOutgoingContext(ctx context.Context, md MD) context.Context { return context.WithValue(ctx, mdOutgoingKey{}, rawMD{md: md})}
          func FromIncomingContext(ctx context.Context) (MD, bool) {  md, ok := ctx.Value(mdIncomingKey{}).(MD)  if !ok {    return nil, false  }  out := MD{}  for k, v := range md {    // We need to manually convert all keys to lower case, because MD is a    // map, and there's no guarantee that the MD attached to the context is    // created using our helper functions.    key := strings.ToLower(k)    out[key] = v  }  return out, true}func FromOutgoingContext(ctx context.Context) (MD, bool) {  raw, ok := ctx.Value(mdOutgoingKey{}).(rawMD)  if !ok {    return nil, false  }
          out := MD{} for k, v := range raw.md { // We need to manually convert all keys to lower case, because MD is a // map, and there's no guarantee that the MD attached to the context is // created using our helper functions. key := strings.ToLower(k) out[key] = v } for _, added := range raw.added { if len(added)%2 == 1 { panic(fmt.Sprintf("metadata: FromOutgoingContext got an odd number of input pairs for metadata: %d", len(added))) }
          for i := 0; i < len(added); i += 2 { key := strings.ToLower(added[i]) out[key] = append(out[key], added[i+1]) } } return out, ok}

          但是,和普通context也是有差別的,MD的存儲的時候,key 是string,value是[]string,context為了盡可能地防止覆蓋,key 、value都是interface類型的,并且通過lint等方式,盡可能做到不讓修改,也就是說用戶自己存入的數(shù)據(jù)的key盡量要是新定義的類型,類型別名也不可以。

          ????????????直觀理解,客戶端在發(fā)送請求的時候,會初始化一個OutgoingContext,服務(wù)端在取的時候,用的是IncomingContext中間必然存在一個從OutgoingContext?取數(shù)據(jù),方讓http2 header,從http2 header 取數(shù)據(jù)存入IncomingContext?的過程。我們通過源碼來分析下:


          1,server端構(gòu)造IncomingContext?的過程:

          func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)         s.serveStreams(st)

          我們從server.go文件ServeHTTP?函數(shù)開始:

          func (s *Server) serveStreams(st transport.ServerTransport)         st.HandleStreams(func(stream *transport.Stream) 

          它調(diào)用了internal/transport/http2_server.go里面的函數(shù)

          func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) ?????case?*http2.MetaHeadersFrame:???????if?t.operateHeaders(frame,?handle,?traceCtx)?{
          func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool)   ctx = metadata.NewIncomingContext(ctx, ht.headerMD)

          可以看到,通過http2的header構(gòu)造了我們的IncomingContext


          2,client從?OutgoingContext取數(shù)據(jù)的過程

          客戶端的請求調(diào)用是從call.go的Invoke函數(shù)開始的

          func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error  
          func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error         cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)         err := cs.SendMsg(req); err != nil 
          func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption)        return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)          op := func(a *csAttempt) error { return a.newStream() }            s, err := a.t.NewStream(cs.ctx, cs.callHdr)

          最終調(diào)用啦a.t.NewStream

          實現(xiàn)在internal/transport/http2_client.go

          func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error)     headerFields, err := t.createHeaderFields(ctx, callHdr) 
          func?(t?*http2Client)?createHeaderFields(ctx?context.Context,?callHdr?*CallHdr)?([]hpack.HeaderField,?error)?????md,?added,?ok?:=?metadata.FromOutgoingContextRaw(ctx);?ok?

          至此,完成了,數(shù)據(jù)的轉(zhuǎn)換。那么問題來了,對于一個處于中游的grpc服務(wù),每個請求,我都去先獲取IncomingContext然后設(shè)置OutgoingContext是不是很麻煩我們有沒有相關(guān)的簡單方案呢?答案是middleware


          3,客戶端middleware

          ????在客戶端發(fā)起的請求連接的時候,我們可以在options里面添加攔截器unaryClientInterceptors

              conn, err := grpc.Dial(target, dialOptions...)    dialOptions := append([]grpc.DialOption{      grpc.WithUnaryInterceptor(grpcMiddleware.ChainUnaryClient(unaryClientInterceptors...)),

          客戶端的攔截器有很多,比如:

          clientinterceptors.UnaryTracingInterceptor,clientinterceptors.DurationInterceptor,clientinterceptors.PrometheusInterceptor,clientinterceptors.BreakerInterceptor,clientinterceptors.TimeoutInterceptor(cliOpts.Timeout),

          一個常見的客戶端攔截器可以這么寫,攔截器的入?yún)⒂形覀冃枰囊磺校?br>

          func DurationInterceptor(ctx context.Context, method string, req, reply interface{},  cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {   //do some thing  err := invoker(ctx, method, req, reply, cc, opts...)

          4,服務(wù)端middleware

          我們在注冊服務(wù)的時候,可以注冊unaryServerInterceptors

          server = grpc.NewServer(dialOptions...)    dialOptions := []grpc.ServerOption{      grpc_middleware.WithUnaryServerChain(unaryServerInterceptors...),

          常見的服務(wù)端攔截器長這樣:

          func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {  o := evaluateOptions(opts)??return?func(ctx?context.Context,?req?interface{},?info?*grpc.UnaryServerInfo,?handler?grpc.UnaryHandler)?(_?interface{},?err?error)?{??//do?some?thing??}



          推薦閱讀


          福利

          我為大家整理了一份從入門到進階的Go學(xué)習(xí)資料禮包,包含學(xué)習(xí)建議:入門看什么,進階看什么。關(guān)注公眾號 「polarisxu」,回復(fù)?ebook?獲取;還可以回復(fù)「進群」,和數(shù)萬 Gopher 交流學(xué)習(xí)。

          瀏覽 69
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  2024AV中文在线播放 | 在线观看你懂的网站 | 最近中文字幕免费mv第一季歌词在线观看 | 国产黄色毛片电影 | 国产黄色电影在线播放 |