<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          寫(xiě)給go開(kāi)發(fā)者的gRPC教程-攔截器

          共 11745字,需瀏覽 24分鐘

           ·

          2023-04-18 12:02

          本篇為【寫(xiě)給go開(kāi)發(fā)者的gRPC教程】系列第三篇

          第一篇:protobuf基礎(chǔ)

          第二篇:通信模式

          第三篇:攔截器


          gRPC攔截器和其他框架的攔截器(也稱middleware)作用是一樣的。利用攔截器我們可以在不侵入業(yè)務(wù)邏輯的前提下修改或者記錄服務(wù)端或客戶端的請(qǐng)求與響應(yīng),利用攔截器我們可以實(shí)現(xiàn)諸如日志記錄、權(quán)限認(rèn)證、限流等諸多功能

          上一篇提到gRPC的通信模式分為unarystreaming幾種模式,攔截器也分為兩種:unary interceptorsstreaming interceptors ,兩種攔截器可以分別應(yīng)用在服務(wù)端和客戶端,所以gRPC總共為我們提供了四種攔截器。它們已經(jīng)被定義成了go中的接口,我們創(chuàng)建的攔截器只要實(shí)現(xiàn)這些接口即可

          008f98413e43219dbdb5a79ea9c784af.webpgRPC四種攔截器一覽

          服務(wù)端攔截器

          服務(wù)端的攔截器從請(qǐng)求開(kāi)始按順序執(zhí)行攔截器,在執(zhí)行完對(duì)應(yīng)RPC的邏輯之后,再按反向的順序執(zhí)行攔截器中對(duì)響應(yīng)的處理邏輯

          81327c107ba453066b49a2591d2ff50f.webp服務(wù)端攔截器

          unary interceptors

          對(duì)于unary服務(wù)的攔截器只需實(shí)現(xiàn)UnaryServerInterceptor接口即可

                
                  func(ctx?context.Context,?req?interface{},?
          ?????info?*UnaryServerInfo,?handler?UnaryHandler)
          ?(resp?interface{},?err?error)

          • ctx context.Context:?jiǎn)蝹€(gè)請(qǐng)求的上下文
          • req interface{}:RPC服務(wù)的請(qǐng)求結(jié)構(gòu)體
          • info *UnaryServerInfo:RPC的服務(wù)信息
          • handler UnaryHandler:它包裝了服務(wù)實(shí)現(xiàn),通過(guò)調(diào)用它我們可以完成RPC并獲取到響應(yīng)

          參數(shù)看不懂沒(méi)關(guān)系,我們來(lái)看一個(gè)例子

          示例

                //?實(shí)現(xiàn)?unary?interceptors
          func?orderUnaryServerInterceptor(ctx?context.Context,?req?interface{},?info?*grpc.UnaryServerInfo,?handler?grpc.UnaryHandler)?(interface{},?error)?{
          ?//?Pre-processing?logic
          ?s?:=?time.Now()

          ?//?Invoking?the?handler?to?complete?the?normal?execution?of?a?unary?RPC.
          ?m,?err?:=?handler(ctx,?req)

          ?//?Post?processing?logic
          ?log.Printf("Method:?%s,?req:?%s,?resp:?%s,?latency:?%s\n",
          ??info.FullMethod,?req,?m,?time.Now().Sub(s))
          ??
          ?return?m,?err
          }

          func?main()?{
          ?s?:=?grpc.NewServer(
          ????//?使用?unary?interceptors
          ??grpc.UnaryInterceptor(orderUnaryServerInterceptor),
          ?)
          ?
          ??pb.RegisterOrderManagementServer(s,?&OrderManagementImpl{})
          ??
          ?//?...
          }

          完整代碼參考: https://github.com/liangwt/grpc-example/tree /main/06-interceptors/server。下同

          假設(shè)我們的客戶端請(qǐng)求了GetOrder,根據(jù)示例再重新看下攔截器接口的每一個(gè)參數(shù)

          ?? req interface{}

          RPC服務(wù)的請(qǐng)求結(jié)構(gòu)體,對(duì)于GetOrder來(lái)說(shuō)就是orderId *wrapperspb.StringValue

          ?? info *UnaryServerInfo包含兩個(gè)字段:

          FullMethod是請(qǐng)求的method名字(例如/ecommerce.OrderManagement/getOrder);

          Server就是服務(wù)實(shí)現(xiàn)(就是示例RegisterOrderManagementServer中的&OrderManagementImpl{})

          ?? handler包裝了服務(wù)實(shí)現(xiàn)

          所以在調(diào)用它之前我們可以進(jìn)行改寫(xiě)reqctx、記錄邏輯開(kāi)始時(shí)間等操作

          調(diào)用完handler即完成了RPC并獲取到響應(yīng),我們不僅可以記錄響應(yīng)還可以改寫(xiě)響應(yīng)

          總結(jié)

          這張圖大致展示了UnaryServerInterceptor接口的每個(gè)參數(shù)的含義

          f5e8ffde93f07f589c549aa223a4a52d.webp

          streaming interceptors

          對(duì)于stream服務(wù)的攔截器只要實(shí)現(xiàn)StreamServerInterceptor接口即可。它適用于我們上一篇介紹的

          • 服務(wù)器端流式 RPC
          • 客戶端流式 RPC
          • 雙向流式 RPC
                
                  func(srv?interface{},?ss?ServerStream,?
          ?????info?*StreamServerInfo,?handler?StreamHandler)
          ?error

          • srv interface{}:服務(wù)實(shí)現(xiàn)
          • ss ServerStream:服務(wù)端視角的流。怎么理解呢?無(wú)論是哪一種流式RPC對(duì)于服務(wù)端來(lái)說(shuō)發(fā)送(SendMsg)就代表著響應(yīng)數(shù)據(jù),接收(RecvMsg)就代表著請(qǐng)求數(shù)據(jù),不同的流式RPC的區(qū)別就在于是多次發(fā)送數(shù)據(jù)(服務(wù)器端流式 RPC)還是多次接收數(shù)據(jù)(客戶端流式 RPC)或者兩者均有(雙向流式 RPC)。因此僅使用這一個(gè)抽象就代表了所有的流式RPC場(chǎng)景
          • info *StreamServerInfo:RPC的服務(wù)信息
          • handler StreamHandler:它包裝了服務(wù)實(shí)現(xiàn),通過(guò)調(diào)用它我們可以完成RPC

          示例

          我們來(lái)看一個(gè)例子

                func?orderStreamServerInterceptor(srv?interface{},
          ?ss?grpc.ServerStream,?info?*grpc.StreamServerInfo,?handler?grpc.StreamHandler)
          ?error
          ?{

          ?//?Pre-processing?logic
          ?s?:=?time.Now()

          ?//?Invoking?the?StreamHandler?to?complete?the?execution?of?RPC?invocation
          ?err?:=?handler(srv,?ss)

          ?//?Post?processing?logic
          ?log.Printf("Method:?%s,?latency:?%s\n",?info.FullMethod,?time.Now().Sub(s))

          ?return?err
          }

          func?main()?{
          ?s?:=?grpc.NewServer(
          ??grpc.StreamInterceptor(orderStreamServerInterceptor),
          ?)

          ?pb.RegisterOrderManagementServer(s,?&OrderManagementImpl{})

          ?//...
          }

          根據(jù)示例再重新看下攔截器接口的參數(shù)

          ?? srv interface{}

          服務(wù)實(shí)現(xiàn)(就是示例RegisterOrderManagementServer中的&OrderManagementImpl{})

          ?? ss grpc.ServerStream

          服務(wù)端發(fā)送和接收數(shù)據(jù)的接口,注意它是一個(gè)接口

          ?? info *grpc.StreamServerInfo包含三個(gè)字段:

          FullMethod是請(qǐng)求的method名字(例如/ecommerce.OrderManagement/updateOrders);

          IsClientStream 是否是客戶端流

          IsServerStream 是否是服務(wù)端流

          ?? handler包裝了服務(wù)實(shí)現(xiàn)

          所以在調(diào)用它之前我們可以進(jìn)行改寫(xiě)數(shù)據(jù)流、記錄邏輯開(kāi)始時(shí)間等操作

          調(diào)用完handler即完成了RPC,因?yàn)槭橇魇秸{(diào)用所以不會(huì)返回響應(yīng)數(shù)據(jù),只有error

          流式攔截器既沒(méi)有請(qǐng)求字段,handler也不會(huì)返回響應(yīng),該如何記錄、修改請(qǐng)求響應(yīng)呢?

          如果想劫持流數(shù)據(jù),答案就在ss ServerStream。再重復(fù)一遍它的含義:服務(wù)端視角的流,它是一個(gè)接口。無(wú)論是哪一種流式RPC對(duì)于服務(wù)端來(lái)說(shuō)發(fā)送(SendMsg)就代表著響應(yīng)數(shù)據(jù),接收(RecvMsg)就代表著請(qǐng)求數(shù)據(jù),不同的流式RPC的區(qū)別就在于是多次發(fā)送數(shù)據(jù)(服務(wù)器端流式 RPC)還是多次接收數(shù)據(jù)(客戶端流式 RPC)或者兩者均有(雙向流式 RPC)。因此可以對(duì)ss進(jìn)行包裝,只要傳入handler的類型實(shí)現(xiàn)ServerStream即可

                //?SendMsg?method?call.
          type?wrappedStream?struct?{
          ?Recv?[]interface{}
          ?Send?[]interface{}
          ?grpc.ServerStream
          }

          func?(w?*wrappedStream)?RecvMsg(m?interface{})?error?{
          ?err?:=?w.ServerStream.RecvMsg(m)

          ?w.Recv?=?append(w.Recv,?m)

          ?return?err
          }

          func?(w?*wrappedStream)?SendMsg(m?interface{})?error?{
          ?err?:=?w.ServerStream.SendMsg(m)

          ?w.Send?=?append(w.Send,?m)

          ?return?err
          }

          func?newWrappedStream(s?grpc.ServerStream)?*wrappedStream?{
          ?return?&wrappedStream{
          ??make([]interface{},?0),
          ??make([]interface{},?0),
          ??s,
          ?}
          }

          func?orderStreamServerInterceptor(srv?interface{},
          ?ss?grpc.ServerStream,?info?*grpc.StreamServerInfo,?handler?grpc.StreamHandler)
          ?error
          ?{

          ?//?Pre-processing?logic
          ?s?:=?time.Now()

          ?//?Invoking?the?StreamHandler?to?complete?the?execution?of?RPC?invocation
          ?nss?:=?newWrappedStream(ss)
          ?err?:=?handler(srv,?nss)

          ?//?Post?processing?logic
          ?log.Printf("Method:?%s,?req:?%+v,?resp:?%+v,?latency:?%s\n",
          ??info.FullMethod,?nss.Recv,?nss.Send,?time.Now().Sub(s))

          ?return?err
          }

          客戶端攔截器

          客戶端攔截器和服務(wù)端攔截器類似,從請(qǐng)求開(kāi)始按順序執(zhí)行攔截器,在獲取到服務(wù)端響應(yīng)之后,再按反向的順序執(zhí)行攔截器中對(duì)響應(yīng)的處理邏輯

          fb0d524f0f8f40e8f86f74fd9817d721.webp客戶端攔截器

          unary interceptors

          client端要實(shí)現(xiàn)UnaryClientInterceptor接口實(shí)現(xiàn)的接口如下

                
                  func(ctx?context.Context,?method?string,?req,?reply?interface{},?
          ?????cc?*ClientConn,?invoker?UnaryInvoker,?opts?...CallOption)
          ?error

          你可以在調(diào)用遠(yuǎn)程函數(shù)前攔截RPC,通過(guò)獲取RPC相關(guān)信息,如參數(shù),上下文,函數(shù)名,請(qǐng)求等,你甚至可以修改原始的遠(yuǎn)程調(diào)用

          • ctx context.Context:?jiǎn)蝹€(gè)請(qǐng)求的上下文
          • method string:請(qǐng)求的method名字(例如/ecommerce.OrderManagement/getOrder)
          • req, reply interface{}:請(qǐng)求和響應(yīng)數(shù)據(jù)
          • cc *ClientConn:客戶端與服務(wù)端的鏈接
          • invoker UnaryInvoker:通過(guò)調(diào)用它我們可以完成RPC并獲取到響應(yīng)
          • opts ...CallOption:RPC調(diào)用的所有配置項(xiàng),包含設(shè)置到conn上的,也包含配置在每一個(gè)調(diào)用上的

          示例

                func?orderUnaryClientInterceptor(ctx?context.Context,?method?string,?req,?reply?interface{},
          ?cc?*grpc.ClientConn,?invoker?grpc.UnaryInvoker,?opts?...grpc.CallOption)
          ?error
          ?{
          ?//?Pre-processor?phase
          ?s?:=?time.Now()

          ?//?Invoking?the?remote?method
          ?err?:=?invoker(ctx,?method,?req,?reply,?cc,?opts...)

          ?//?Post-processor?phase
          ?log.Printf("method:?%s,?req:?%s,?resp:?%s,?latency:?%s\n",
          ??method,?req,?reply,?time.Now().Sub(s))

          ?return?err
          }

          func?main()?{
          ?conn,?err?:=?grpc.Dial("127.0.0.1:8009",
          ??grpc.WithInsecure(),
          ??grpc.WithUnaryInterceptor(orderUnaryClientInterceptor),
          ?)
          ?if?err?!=?nil?{
          ??panic(err)
          ?}

          ?c?:=?pb.NewOrderManagementClient(conn)
          ??
          ??//?...
          }

          根據(jù)示例再重新看下攔截器接口的參數(shù)

          ?? cc *grpc.ClientConn客戶端與服務(wù)端的鏈接

          這里的cc就是示例代碼中c := pb.NewOrderManagementClient(conn)conn

          ?? invoker grpc.UnaryInvoker包裝了服務(wù)實(shí)現(xiàn)

          調(diào)用完invoker即完成了RPC,所以我們可以改寫(xiě)req或者在獲取到reply之后修改響應(yīng)

          5e07fd954553f42ed4fa5dbc6d82cd82.webp

          streaming interceptors

          要實(shí)現(xiàn)的接口StreamClientInterceptor

                
                  func(ctx?context.Context,?desc?*StreamDesc,?cc?*ClientConn,?
          ?????method?string,?streamer?Streamer,?opts?...CallOption)
          ?(ClientStream,?error)

          和serve端類似的參數(shù)類似,重點(diǎn)關(guān)注下面幾個(gè)參數(shù)

          • cs ClientStream:客戶端視角的流。類比服務(wù)端的ss ServerStream,無(wú)論是哪一種流式RPC對(duì)于客戶端來(lái)說(shuō)發(fā)送(SendMsg)就代表著請(qǐng)求數(shù)據(jù),接收(RecvMsg)就代表著響應(yīng)數(shù)據(jù)(正好和服務(wù)端是反過(guò)來(lái)的)
          • streamer Streamer:完成RPC請(qǐng)求的調(diào)用

          示例

          這里不再贅述,可以參考服務(wù)端攔截器

                func?orderStreamClientInterceptor(ctx?context.Context,?desc?*grpc.StreamDesc,
          ?cc?*grpc.ClientConn,?method?string,?streamer?grpc.Streamer,
          ?opts?...grpc.CallOption)
          ?(grpc.ClientStream,?error)
          ?{

          ?//?Pre-processing?logic
          ?s?:=?time.Now()

          ?cs,?err?:=?streamer(ctx,?desc,?cc,?method,?opts...)

          ?//?Post?processing?logic
          ?log.Printf("method:?%s,?latency:?%s\n",?method,?time.Now().Sub(s))

          ?return?cs,?err
          }

          func?main()?{
          ?conn,?err?:=?grpc.Dial("127.0.0.1:8009",
          ??grpc.WithInsecure(),
          ??grpc.WithStreamInterceptor(orderStreamClientInterceptor),
          ?)
          ?if?err?!=?nil?{
          ??panic(err)
          ?}

          ?c?:=?pb.NewOrderManagementClient(conn)
          ??
          ??//?...
          }

          如何記錄或者修改流攔截器的請(qǐng)求響應(yīng)數(shù)據(jù)?

          和服務(wù)端stream interceptor同樣的道理,通過(guò)包裝ClientStream即可做到

                //?SendMsg?method?call.
          type?wrappedStream?struct?{
          ?method?string
          ?grpc.ClientStream
          }

          func?(w?*wrappedStream)?RecvMsg(m?interface{})?error?{
          ?err?:=?w.ClientStream.RecvMsg(m)

          ?log.Printf("method:?%s,?res:?%s\n",?w.method,?m)

          ?return?err
          }

          func?(w?*wrappedStream)?SendMsg(m?interface{})?error?{
          ?err?:=?w.ClientStream.SendMsg(m)

          ?log.Printf("method:?%s,?req:?%s\n",?w.method,?m)

          ?return?err
          }

          func?newWrappedStream(method?string,?s?grpc.ClientStream)?*wrappedStream?{
          ?return?&wrappedStream{
          ??method,
          ??s,
          ?}
          }

          func?orderStreamClientInterceptor(ctx?context.Context,?desc?*grpc.StreamDesc,
          ?cc?*grpc.ClientConn,?method?string,?streamer?grpc.Streamer,
          ?opts?...grpc.CallOption)
          ?(grpc.ClientStream,?error)
          ?{

          ?//?Pre-processing?logic
          ?s?:=?time.Now()

          ?cs,?err?:=?streamer(ctx,?desc,?cc,?method,?opts...)

          ?//?Post?processing?logic
          ?log.Printf("method:?%s,?latency:?%s\n",?method,?time.Now().Sub(s))

          ?return?newWrappedStream(method,?cs),?err
          }

          攔截器鏈

          服務(wù)器只能配置一個(gè) unary interceptorstream interceptor,否則會(huì)報(bào)錯(cuò),客戶端也是,雖然不會(huì)報(bào)錯(cuò),但是只有最后一個(gè)才起作用。

                //?服務(wù)端攔截器
          s?:=?grpc.NewServer(
          ??grpc.UnaryInterceptor(orderUnaryServerInterceptor),
          ??grpc.StreamInterceptor(orderStreamServerInterceptor),
          )
                //?客戶端攔截器
          conn,?err?:=?grpc.Dial("127.0.0.1:8009",
          ??grpc.WithInsecure(),
          ??grpc.WithUnaryInterceptor(orderUnaryClientInterceptor),
          ??grpc.WithStreamInterceptor(orderStreamClientInterceptor),
          )

          如果你想配置多個(gè),可以使用攔截器鏈或者自己實(shí)現(xiàn)一個(gè)。

                //?服務(wù)端攔截器
          s?:=?grpc.NewServer(
          ??grpc.ChainUnaryInterceptor(
          ????orderUnaryServerInterceptor1,
          ????orderUnaryServerInterceptor2,
          ??),
          ??grpc.ChainStreamInterceptor(
          ????orderServerStreamInterceptor1,
          ????orderServerStreamInterceptor2,
          ??),
          )
                //?客戶端攔截器
          conn,?err?:=?grpc.Dial("127.0.0.1:8009",
          ??grpc.WithInsecure(),
          ??grpc.WithChainUnaryInterceptor(
          ???orderUnaryClientInterceptor1,
          ??????orderUnaryClientInterceptor2,
          ??),
          ??grpc.WithChainStreamInterceptor(
          ???orderStreamClientInterceptor1,
          ??????orderStreamClientInterceptor2,
          ??),
          )

          生態(tài)

          除了可以自己實(shí)現(xiàn)攔截器外,gRPC生態(tài)也提供了一系列的開(kāi)源的攔截器可供使用,覆蓋權(quán)限、日志、監(jiān)控等諸多方面

          https://github.com/grpc-ecosystem/go-grpc-middleware

          Auth

          • grpc_auth - a customizable (via AuthFunc) piece of auth middleware

          Logging

          • grpc_ctxtags - a library that adds a Tag map to context, with data populated from request body
          • grpc_zap - integration of zap logging library into gRPC handlers.
          • grpc_logrus - integration of logrus logging library into gRPC handlers.
          • grpc_kit - integration of go-kit/log logging library into gRPC handlers.
          • grpc_grpc_logsettable - a wrapper around grpclog.LoggerV2 that allows to replace loggers in runtime (thread-safe).

          Monitoring

          • grpc_prometheus? - Prometheus client-side and server-side monitoring middleware
          • otgrpc? - OpenTracing client-side and server-side interceptors
          • grpc_opentracing - OpenTracing client-side and server-side interceptors with support for streaming and handler-returned tags

          Client

          • grpc_retry - a generic gRPC response code retry mechanism, client-side middleware

          Server

          • grpc_validator - codegen inbound message validation from .proto options
          • grpc_recovery - turn panics into gRPC errors
          • ratelimit - grpc rate limiting by your own limiter

          示例代碼

          https://github.com/liangwt/grpc-example/tree/main/06-interceptors



          推薦閱讀


          福利
          我為大家整理了一份 從入門(mén)到進(jìn)階的Go學(xué)習(xí)資料禮包 ,包含學(xué)習(xí)建議:入門(mén)看什么,進(jìn)階看什么。 關(guān)注公眾號(hào) 「polarisxu」,回復(fù)? ebook ?獲??;還可以回復(fù)「進(jìn)群」,和數(shù)萬(wàn) Gopher 交流學(xué)習(xí)。


          瀏覽 139
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  伊人久久97 | 免费A片在线 | www18禁 | 吖v在线视频免费观看免费观看 | 国产十区 |