寫(xiě)給go開(kāi)發(fā)者的gRPC教程-攔截器
本篇為【寫(xiě)給go開(kāi)發(fā)者的gRPC教程】系列第三篇
第一篇:protobuf基礎(chǔ)
第二篇:通信模式
第三篇:攔截器
gRPC的攔截器和其他框架的攔截器(也稱middleware)作用是一樣的。利用攔截器我們可以在不侵入業(yè)務(wù)邏輯的前提下修改或者記錄服務(wù)端或客戶端的請(qǐng)求與響應(yīng),利用攔截器我們可以實(shí)現(xiàn)諸如日志記錄、權(quán)限認(rèn)證、限流等諸多功能
上一篇提到gRPC的通信模式分為unary和streaming幾種模式,攔截器也分為兩種:unary interceptors和streaming interceptors ,兩種攔截器可以分別應(yīng)用在服務(wù)端和客戶端,所以gRPC總共為我們提供了四種攔截器。它們已經(jīng)被定義成了go中的接口,我們創(chuàng)建的攔截器只要實(shí)現(xiàn)這些接口即可
gRPC四種攔截器一覽服務(wù)端攔截器
服務(wù)端的攔截器從請(qǐng)求開(kāi)始按順序執(zhí)行攔截器,在執(zhí)行完對(duì)應(yīng)RPC的邏輯之后,再按反向的順序執(zhí)行攔截器中對(duì)響應(yīng)的處理邏輯
服務(wù)端攔截器unary interceptors
對(duì)于unary服務(wù)的攔截器只需實(shí)現(xiàn)UnaryServerInterceptor接口即可
func(ctx?context.Context,?req?interface{},?
?????info?*UnaryServerInfo,?handler?UnaryHandler)?(resp?interface{},?err?error)
-
ctx context.Context:?jiǎn)蝹€(gè)請(qǐng)求的上下文 -
req interface{}:RPC服務(wù)的請(qǐng)求結(jié)構(gòu)體 -
info *UnaryServerInfo:RPC的服務(wù)信息 -
handler UnaryHandler:它包裝了服務(wù)實(shí)現(xiàn),通過(guò)調(diào)用它我們可以完成RPC并獲取到響應(yīng)
參數(shù)看不懂沒(méi)關(guān)系,我們來(lái)看一個(gè)例子
示例
//?實(shí)現(xiàn)?unary?interceptors
func?orderUnaryServerInterceptor(ctx?context.Context,?req?interface{},?info?*grpc.UnaryServerInfo,?handler?grpc.UnaryHandler)?(interface{},?error)?{
?//?Pre-processing?logic
?s?:=?time.Now()
?//?Invoking?the?handler?to?complete?the?normal?execution?of?a?unary?RPC.
?m,?err?:=?handler(ctx,?req)
?//?Post?processing?logic
?log.Printf("Method:?%s,?req:?%s,?resp:?%s,?latency:?%s\n",
??info.FullMethod,?req,?m,?time.Now().Sub(s))
??
?return?m,?err
}
func?main()?{
?s?:=?grpc.NewServer(
????//?使用?unary?interceptors
??grpc.UnaryInterceptor(orderUnaryServerInterceptor),
?)
?
??pb.RegisterOrderManagementServer(s,?&OrderManagementImpl{})
??
?//?...
}
完整代碼參考: https://github.com/liangwt/grpc-example/tree /main/06-interceptors/server。下同
假設(shè)我們的客戶端請(qǐng)求了GetOrder,根據(jù)示例再重新看下攔截器接口的每一個(gè)參數(shù)
?? req interface{}
RPC服務(wù)的請(qǐng)求結(jié)構(gòu)體,對(duì)于GetOrder來(lái)說(shuō)就是orderId *wrapperspb.StringValue
?? info *UnaryServerInfo包含兩個(gè)字段:
FullMethod是請(qǐng)求的method名字(例如/ecommerce.OrderManagement/getOrder);
Server就是服務(wù)實(shí)現(xiàn)(就是示例RegisterOrderManagementServer中的&OrderManagementImpl{})
?? handler包裝了服務(wù)實(shí)現(xiàn)
所以在調(diào)用它之前我們可以進(jìn)行改寫(xiě)req或ctx、記錄邏輯開(kāi)始時(shí)間等操作
調(diào)用完handler即完成了RPC并獲取到響應(yīng),我們不僅可以記錄響應(yīng)還可以改寫(xiě)響應(yīng)
總結(jié)
這張圖大致展示了UnaryServerInterceptor接口的每個(gè)參數(shù)的含義

streaming interceptors
對(duì)于stream服務(wù)的攔截器只要實(shí)現(xiàn)StreamServerInterceptor接口即可。它適用于我們上一篇介紹的
- 服務(wù)器端流式 RPC
- 客戶端流式 RPC
- 雙向流式 RPC
func(srv?interface{},?ss?ServerStream,?
?????info?*StreamServerInfo,?handler?StreamHandler)?error
-
srv interface{}:服務(wù)實(shí)現(xiàn) -
ss ServerStream:服務(wù)端視角的流。怎么理解呢?無(wú)論是哪一種流式RPC對(duì)于服務(wù)端來(lái)說(shuō)發(fā)送(SendMsg)就代表著響應(yīng)數(shù)據(jù),接收(RecvMsg)就代表著請(qǐng)求數(shù)據(jù),不同的流式RPC的區(qū)別就在于是多次發(fā)送數(shù)據(jù)(服務(wù)器端流式 RPC)還是多次接收數(shù)據(jù)(客戶端流式 RPC)或者兩者均有(雙向流式 RPC)。因此僅使用這一個(gè)抽象就代表了所有的流式RPC場(chǎng)景 -
info *StreamServerInfo:RPC的服務(wù)信息 -
handler StreamHandler:它包裝了服務(wù)實(shí)現(xiàn),通過(guò)調(diào)用它我們可以完成RPC
示例
我們來(lái)看一個(gè)例子
func?orderStreamServerInterceptor(srv?interface{},
?ss?grpc.ServerStream,?info?*grpc.StreamServerInfo,?handler?grpc.StreamHandler)?error?{
?//?Pre-processing?logic
?s?:=?time.Now()
?//?Invoking?the?StreamHandler?to?complete?the?execution?of?RPC?invocation
?err?:=?handler(srv,?ss)
?//?Post?processing?logic
?log.Printf("Method:?%s,?latency:?%s\n",?info.FullMethod,?time.Now().Sub(s))
?return?err
}
func?main()?{
?s?:=?grpc.NewServer(
??grpc.StreamInterceptor(orderStreamServerInterceptor),
?)
?pb.RegisterOrderManagementServer(s,?&OrderManagementImpl{})
?//...
}
根據(jù)示例再重新看下攔截器接口的參數(shù)
?? srv interface{}
服務(wù)實(shí)現(xiàn)(就是示例RegisterOrderManagementServer中的&OrderManagementImpl{})
?? ss grpc.ServerStream
服務(wù)端發(fā)送和接收數(shù)據(jù)的接口,注意它是一個(gè)接口
?? info *grpc.StreamServerInfo包含三個(gè)字段:
FullMethod是請(qǐng)求的method名字(例如/ecommerce.OrderManagement/updateOrders);
IsClientStream 是否是客戶端流
IsServerStream 是否是服務(wù)端流
?? handler包裝了服務(wù)實(shí)現(xiàn)
所以在調(diào)用它之前我們可以進(jìn)行改寫(xiě)數(shù)據(jù)流、記錄邏輯開(kāi)始時(shí)間等操作
調(diào)用完handler即完成了RPC,因?yàn)槭橇魇秸{(diào)用所以不會(huì)返回響應(yīng)數(shù)據(jù),只有error
流式攔截器既沒(méi)有請(qǐng)求字段,handler也不會(huì)返回響應(yīng),該如何記錄、修改請(qǐng)求響應(yīng)呢?
如果想劫持流數(shù)據(jù),答案就在ss ServerStream。再重復(fù)一遍它的含義:服務(wù)端視角的流,它是一個(gè)接口。無(wú)論是哪一種流式RPC對(duì)于服務(wù)端來(lái)說(shuō)發(fā)送(SendMsg)就代表著響應(yīng)數(shù)據(jù),接收(RecvMsg)就代表著請(qǐng)求數(shù)據(jù),不同的流式RPC的區(qū)別就在于是多次發(fā)送數(shù)據(jù)(服務(wù)器端流式 RPC)還是多次接收數(shù)據(jù)(客戶端流式 RPC)或者兩者均有(雙向流式 RPC)。因此可以對(duì)ss進(jìn)行包裝,只要傳入handler的類型實(shí)現(xiàn)ServerStream即可
//?SendMsg?method?call.
type?wrappedStream?struct?{
?Recv?[]interface{}
?Send?[]interface{}
?grpc.ServerStream
}
func?(w?*wrappedStream)?RecvMsg(m?interface{})?error?{
?err?:=?w.ServerStream.RecvMsg(m)
?w.Recv?=?append(w.Recv,?m)
?return?err
}
func?(w?*wrappedStream)?SendMsg(m?interface{})?error?{
?err?:=?w.ServerStream.SendMsg(m)
?w.Send?=?append(w.Send,?m)
?return?err
}
func?newWrappedStream(s?grpc.ServerStream)?*wrappedStream?{
?return?&wrappedStream{
??make([]interface{},?0),
??make([]interface{},?0),
??s,
?}
}
func?orderStreamServerInterceptor(srv?interface{},
?ss?grpc.ServerStream,?info?*grpc.StreamServerInfo,?handler?grpc.StreamHandler)?error?{
?//?Pre-processing?logic
?s?:=?time.Now()
?//?Invoking?the?StreamHandler?to?complete?the?execution?of?RPC?invocation
?nss?:=?newWrappedStream(ss)
?err?:=?handler(srv,?nss)
?//?Post?processing?logic
?log.Printf("Method:?%s,?req:?%+v,?resp:?%+v,?latency:?%s\n",
??info.FullMethod,?nss.Recv,?nss.Send,?time.Now().Sub(s))
?return?err
}
客戶端攔截器
客戶端攔截器和服務(wù)端攔截器類似,從請(qǐng)求開(kāi)始按順序執(zhí)行攔截器,在獲取到服務(wù)端響應(yīng)之后,再按反向的順序執(zhí)行攔截器中對(duì)響應(yīng)的處理邏輯
客戶端攔截器unary interceptors
client端要實(shí)現(xiàn)UnaryClientInterceptor接口實(shí)現(xiàn)的接口如下
func(ctx?context.Context,?method?string,?req,?reply?interface{},?
?????cc?*ClientConn,?invoker?UnaryInvoker,?opts?...CallOption)?error
你可以在調(diào)用遠(yuǎn)程函數(shù)前攔截RPC,通過(guò)獲取RPC相關(guān)信息,如參數(shù),上下文,函數(shù)名,請(qǐng)求等,你甚至可以修改原始的遠(yuǎn)程調(diào)用
-
ctx context.Context:?jiǎn)蝹€(gè)請(qǐng)求的上下文 -
method string:請(qǐng)求的method名字(例如/ecommerce.OrderManagement/getOrder) -
req, reply interface{}:請(qǐng)求和響應(yīng)數(shù)據(jù) -
cc *ClientConn:客戶端與服務(wù)端的鏈接 -
invoker UnaryInvoker:通過(guò)調(diào)用它我們可以完成RPC并獲取到響應(yīng) -
opts ...CallOption:RPC調(diào)用的所有配置項(xiàng),包含設(shè)置到conn上的,也包含配置在每一個(gè)調(diào)用上的
示例
func?orderUnaryClientInterceptor(ctx?context.Context,?method?string,?req,?reply?interface{},
?cc?*grpc.ClientConn,?invoker?grpc.UnaryInvoker,?opts?...grpc.CallOption)?error?{
?//?Pre-processor?phase
?s?:=?time.Now()
?//?Invoking?the?remote?method
?err?:=?invoker(ctx,?method,?req,?reply,?cc,?opts...)
?//?Post-processor?phase
?log.Printf("method:?%s,?req:?%s,?resp:?%s,?latency:?%s\n",
??method,?req,?reply,?time.Now().Sub(s))
?return?err
}
func?main()?{
?conn,?err?:=?grpc.Dial("127.0.0.1:8009",
??grpc.WithInsecure(),
??grpc.WithUnaryInterceptor(orderUnaryClientInterceptor),
?)
?if?err?!=?nil?{
??panic(err)
?}
?c?:=?pb.NewOrderManagementClient(conn)
??
??//?...
}
根據(jù)示例再重新看下攔截器接口的參數(shù)
?? cc *grpc.ClientConn客戶端與服務(wù)端的鏈接
這里的cc就是示例代碼中c := pb.NewOrderManagementClient(conn)的conn
?? invoker grpc.UnaryInvoker包裝了服務(wù)實(shí)現(xiàn)
調(diào)用完invoker即完成了RPC,所以我們可以改寫(xiě)req或者在獲取到reply之后修改響應(yīng)

streaming interceptors
要實(shí)現(xiàn)的接口StreamClientInterceptor
func(ctx?context.Context,?desc?*StreamDesc,?cc?*ClientConn,?
?????method?string,?streamer?Streamer,?opts?...CallOption)?(ClientStream,?error)
和serve端類似的參數(shù)類似,重點(diǎn)關(guān)注下面幾個(gè)參數(shù)
-
cs ClientStream:客戶端視角的流。類比服務(wù)端的ss ServerStream,無(wú)論是哪一種流式RPC對(duì)于客戶端來(lái)說(shuō)發(fā)送(SendMsg)就代表著請(qǐng)求數(shù)據(jù),接收(RecvMsg)就代表著響應(yīng)數(shù)據(jù)(正好和服務(wù)端是反過(guò)來(lái)的) -
streamer Streamer:完成RPC請(qǐng)求的調(diào)用
示例
這里不再贅述,可以參考服務(wù)端攔截器
func?orderStreamClientInterceptor(ctx?context.Context,?desc?*grpc.StreamDesc,
?cc?*grpc.ClientConn,?method?string,?streamer?grpc.Streamer,
?opts?...grpc.CallOption)?(grpc.ClientStream,?error)?{
?//?Pre-processing?logic
?s?:=?time.Now()
?cs,?err?:=?streamer(ctx,?desc,?cc,?method,?opts...)
?//?Post?processing?logic
?log.Printf("method:?%s,?latency:?%s\n",?method,?time.Now().Sub(s))
?return?cs,?err
}
func?main()?{
?conn,?err?:=?grpc.Dial("127.0.0.1:8009",
??grpc.WithInsecure(),
??grpc.WithStreamInterceptor(orderStreamClientInterceptor),
?)
?if?err?!=?nil?{
??panic(err)
?}
?c?:=?pb.NewOrderManagementClient(conn)
??
??//?...
}
如何記錄或者修改流攔截器的請(qǐng)求響應(yīng)數(shù)據(jù)?
和服務(wù)端stream interceptor同樣的道理,通過(guò)包裝ClientStream即可做到
//?SendMsg?method?call.
type?wrappedStream?struct?{
?method?string
?grpc.ClientStream
}
func?(w?*wrappedStream)?RecvMsg(m?interface{})?error?{
?err?:=?w.ClientStream.RecvMsg(m)
?log.Printf("method:?%s,?res:?%s\n",?w.method,?m)
?return?err
}
func?(w?*wrappedStream)?SendMsg(m?interface{})?error?{
?err?:=?w.ClientStream.SendMsg(m)
?log.Printf("method:?%s,?req:?%s\n",?w.method,?m)
?return?err
}
func?newWrappedStream(method?string,?s?grpc.ClientStream)?*wrappedStream?{
?return?&wrappedStream{
??method,
??s,
?}
}
func?orderStreamClientInterceptor(ctx?context.Context,?desc?*grpc.StreamDesc,
?cc?*grpc.ClientConn,?method?string,?streamer?grpc.Streamer,
?opts?...grpc.CallOption)?(grpc.ClientStream,?error)?{
?//?Pre-processing?logic
?s?:=?time.Now()
?cs,?err?:=?streamer(ctx,?desc,?cc,?method,?opts...)
?//?Post?processing?logic
?log.Printf("method:?%s,?latency:?%s\n",?method,?time.Now().Sub(s))
?return?newWrappedStream(method,?cs),?err
}
攔截器鏈
服務(wù)器只能配置一個(gè) unary interceptor和 stream interceptor,否則會(huì)報(bào)錯(cuò),客戶端也是,雖然不會(huì)報(bào)錯(cuò),但是只有最后一個(gè)才起作用。
//?服務(wù)端攔截器
s?:=?grpc.NewServer(
??grpc.UnaryInterceptor(orderUnaryServerInterceptor),
??grpc.StreamInterceptor(orderStreamServerInterceptor),
)
//?客戶端攔截器
conn,?err?:=?grpc.Dial("127.0.0.1:8009",
??grpc.WithInsecure(),
??grpc.WithUnaryInterceptor(orderUnaryClientInterceptor),
??grpc.WithStreamInterceptor(orderStreamClientInterceptor),
)
如果你想配置多個(gè),可以使用攔截器鏈或者自己實(shí)現(xiàn)一個(gè)。
//?服務(wù)端攔截器
s?:=?grpc.NewServer(
??grpc.ChainUnaryInterceptor(
????orderUnaryServerInterceptor1,
????orderUnaryServerInterceptor2,
??),
??grpc.ChainStreamInterceptor(
????orderServerStreamInterceptor1,
????orderServerStreamInterceptor2,
??),
)
//?客戶端攔截器
conn,?err?:=?grpc.Dial("127.0.0.1:8009",
??grpc.WithInsecure(),
??grpc.WithChainUnaryInterceptor(
???orderUnaryClientInterceptor1,
??????orderUnaryClientInterceptor2,
??),
??grpc.WithChainStreamInterceptor(
???orderStreamClientInterceptor1,
??????orderStreamClientInterceptor2,
??),
)
生態(tài)
除了可以自己實(shí)現(xiàn)攔截器外,gRPC生態(tài)也提供了一系列的開(kāi)源的攔截器可供使用,覆蓋權(quán)限、日志、監(jiān)控等諸多方面
https://github.com/grpc-ecosystem/go-grpc-middleware
Auth
-
grpc_auth- a customizable (viaAuthFunc) piece of auth middleware
Logging
-
grpc_ctxtags- a library that adds aTagmap to context, with data populated from request body -
grpc_zap- integration of zap logging library into gRPC handlers. -
grpc_logrus- integration of logrus logging library into gRPC handlers. -
grpc_kit- integration of go-kit/log logging library into gRPC handlers. -
grpc_grpc_logsettable- a wrapper aroundgrpclog.LoggerV2that allows to replace loggers in runtime (thread-safe).
Monitoring
-
grpc_prometheus? - Prometheus client-side and server-side monitoring middleware -
otgrpc? - OpenTracing client-side and server-side interceptors -
grpc_opentracing- OpenTracing client-side and server-side interceptors with support for streaming and handler-returned tags
Client
-
grpc_retry- a generic gRPC response code retry mechanism, client-side middleware
Server
-
grpc_validator- codegen inbound message validation from.protooptions -
grpc_recovery- turn panics into gRPC errors -
ratelimit- grpc rate limiting by your own limiter
示例代碼
https://github.com/liangwt/grpc-example/tree/main/06-interceptors
推薦閱讀
我為大家整理了一份 從入門(mén)到進(jìn)階的Go學(xué)習(xí)資料禮包 ,包含學(xué)習(xí)建議:入門(mén)看什么,進(jìn)階看什么。 關(guān)注公眾號(hào) 「polarisxu」,回復(fù)? ebook ?獲??;還可以回復(fù)「進(jìn)群」,和數(shù)萬(wàn) Gopher 交流學(xué)習(xí)。
