.NET領(lǐng)域最硬核的gRPC 核心能力一把梭

前言,本文定位為.NET方向 grpc核心能力一把梭,全篇是姿勢性和結(jié)論性的展示, 方便中高級程序員快速上手.NET Grpc。
有關(guān)grpc更深層次的前世今生、底層原理、困惑點釋疑請聽下回分解, 歡迎菜鳥老鳥們提出寶貴意見。
- grpc宏觀目標(biāo): 高性能rpc框架
- grpc框架實現(xiàn)宏觀目標(biāo)的底層3協(xié)議
- http2通信協(xié)議, 基礎(chǔ)能力
- proto buffer:打解包協(xié)議==> 二進制
- proto buffer:服務(wù)協(xié)議,IDL
- 調(diào)用管道: 池化tcp、 tcp探活
- 負載均衡
- 元數(shù)據(jù) metadata
- 攔截器
一. 宏觀目標(biāo)
gRPC是高性能的RPC框架, 有效地用于服務(wù)通信(不管是數(shù)據(jù)中心內(nèi)部還是跨數(shù)據(jù)中心)。
科普rpc:程序可以像調(diào)用本地函數(shù)和本地對象一樣, 達成調(diào)用遠程服務(wù)的效果,rpc屏蔽了底層的通信細節(jié)和打解包細節(jié)。跟許多rpc協(xié)議一樣, grpc也是基于IDL(interface define lauguage)來定義服務(wù)協(xié)議。
grpc是基于http/2協(xié)議的高性能的rpc框架。
二. grpc實現(xiàn)跨語言的rpc調(diào)用目標(biāo)
基于三協(xié)議:
- 底層傳輸協(xié)議:基于http2 (多路復(fù)用、雙向流式通信)
- 打解包協(xié)議:基于proto Buffer 打包成二進制格式傳輸
- 接口協(xié)議:基于契約優(yōu)先的開發(fā)方式(契約以proto buffer格式定義), 可以使用protoc 編譯器生產(chǎn)各種語言的本地代理類, 磨平了微服務(wù)平臺中各語言的編程隔閡。
下圖演示了C++ grpc服務(wù), 被跨語言客戶端調(diào)用, rpc服務(wù)提供方會在調(diào)用方產(chǎn)生服務(wù)代理stub, 客戶端就像調(diào)用本地服務(wù)一樣,產(chǎn)生遠程調(diào)用的效果。
在大規(guī)模微服務(wù)中,C++grpc服務(wù)也可能作為調(diào)用的客戶端, 于是這個服務(wù)上可能也存在其他服務(wù)提供方的服務(wù)代理stub, 上圖沒有體現(xiàn)。
三. 通過腳手架項目分析gRPC簡單一元通信
我們將從使用gRPC服務(wù)模板創(chuàng)建一個新的dotnet項目。
VS gRPC服務(wù)模板默認使用TLS 來創(chuàng)建gRRPC服務(wù), 實際上不管是HTTP1.1 還是HTTP2, 都不強制要求使用TLS 如果服務(wù)一開始同時支持HTTP1.1+ HTTP2 但是沒有TLS, 那么協(xié)商的結(jié)果將是 HTTP1.1+ TLS,這樣的話gRPC調(diào)用將會失敗。
3.1 The RPC Service Definition
protocol buffers既用作服務(wù)的接口定義語言(記錄服務(wù)定義和負載消息),又用作底層消息交換格式。這個說法語上面的3大底層協(xié)議2,3 呼應(yīng)。
① 使用protocol buffers在.proto文件中定義服務(wù)接口。在其中,定義可遠程調(diào)用的方法的入?yún)⒑头祷刂殿愋汀7?wù)器實現(xiàn)此接口并運行g(shù)RPC服務(wù)器以處理客戶端調(diào)用。
② 定義服務(wù)后,使用PB編譯器protoc從.proto文件生成指定語言的數(shù)據(jù)訪問/傳輸類stub,該文件包含服務(wù)接口中消息和方法的實現(xiàn)。
syntax = "proto3"; // `syntax`指示使用的protocol buffers的版本
option csharp_namespace = "GrpcAuthor"; // `csharp_namespace`指示未來生成的存根文件所在的`命名空間`, 這是對應(yīng)C#語言, java語言應(yīng)填 java_package
package greet;
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply); // 一元rpc調(diào)用
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
注釋一看就懂。
接下來使用protoc編譯器和C#插件來對proto文件生成服務(wù)器或客戶端代碼。
- ① 由客戶端和服務(wù)共享的強類型對象,表示消息的服務(wù)操作和數(shù)據(jù)元素, 這個是pb序列化協(xié)議的強類型對象。
- ②一個強類型基類,具有遠程 gRPC 服務(wù)可以繼承和擴展的所需網(wǎng)絡(luò)管道:Greeter.GreeterBase
- ③一個客戶端存根,其中包含調(diào)用遠程 gRPC 服務(wù)所需的管道:Greeter.GreeterClient 。運行時,每條消息都序列化為標(biāo)準(zhǔn) Protobuf 二進制表示形式,在客戶端和遠程服務(wù)之間交換。
3.2 實現(xiàn)服務(wù)定義
腳手架項目使用Grpc.AspNetCore NuGet包:所需的類由構(gòu)建過程自動生成, 你只需要在項目.csproj文件中添加配置節(jié):
<ItemGroup>
<Protobuf Include="Protos\greet.proto" GrpcServices="Server" />
</ItemGroup>
以下是繼承②強基類而實現(xiàn)的grpc服務(wù)
public class GreeterService : Greeter.GreeterBase
{
private readonly ILogger<GreeterService> _logger;
public GreeterService(ILogger<GreeterService> logger)
{
_logger = logger;
}
public override Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
{
return Task.FromResult(new HelloReply
{
Message = "Hello " + request.Name
});
}
}
最后在原h(huán)ttp服務(wù)進程上注冊Grpc端點
public void ConfigureServices(IServiceCollection services)
{
services.AddGrpc();
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
app.UseEndpoints(endpoints =>
{
endpoints.MapGrpcService<GreeterService>();
endpoints.MapGet("/", async context =>
{
await context.Response.WriteAsync("----http調(diào)用-------");
});
});
}
以上在localhost:5000端口同時支持了grpc調(diào)用和http調(diào)用。
--- 啟動服務(wù)---...
3.3. 創(chuàng)建gRPC .NET客戶端
Visual Studio創(chuàng)建一個名為GrpcAuthorClient的新控制臺項目。
安裝如下nuget包:
Install-Package Grpc.Net.Client // 包含.NET Core客戶端; Install-Package Google.Protobuf // 包含protobuf消息API; Install-Package Grpc.Tools // 對Protobuf文件進行編譯
① 拷貝服務(wù)端項目中的..proto文件
② 將選項csharp_namespace值修改為GrpcAuthorClient。
③ 更新.csproj文件的配置節(jié)
<ItemGroup>
<Protobuf Include="Protos\author.proto" GrpcServices="Client" />
</ItemGroup>
④ Client主文件:
static void Main(string[] args)
{
var serverAddress = "https://localhost:5001";
using var channel = GrpcChannel.ForAddress(serverAddress);
var client = new Greeter.GreeterClient(channel);
var reply = client.SayHello(new HelloRequest { Name = "宋小寶!" });
Console.WriteLine(reply.Message.ToString());
Console.WriteLine("Press any key to exit...");
Console.ReadKey();
}
使用服務(wù)器地址創(chuàng)建GrpcChannel,然后使用GrpcChannel對象實例化GreeterClient;然后使用SayHello同步方法; 服務(wù)器響應(yīng)時,打印結(jié)果。
腳手架例子就可以入門,下面聊一聊另外的核心功能
四. gRPC打乒乓球:雙向流式通信[1]
除了上面的一元rpc調(diào)用(Unary RPC), 還有
- Client streaming RPC:客戶端流式RPC,客戶端以流形式(一系列消息)向服務(wù)器發(fā)起請求,客戶端將等待服務(wù)器讀取消息并返回響應(yīng),gRPC服務(wù)端能保證了收到的單個RPC調(diào)用中的消息順序。
- Server streaming RPC :服務(wù)器流式RPC,客戶端向服務(wù)器發(fā)送請求,并獲取服務(wù)器流(一系列消息)。客戶端從返回的流(一系列消息)中讀取,直到?jīng)]有更多消息為止, gRPC客戶端能保證收到的單個RPC調(diào)用中的消息順序。
- Bidirectional streaming RPC:雙向流式RPC,雙方都使用讀寫流發(fā)送一系列消息。這兩個流是獨立運行的,因此客戶端和服務(wù)器可以按照自己喜歡的順序進行讀寫:例如,服務(wù)器可以在寫響應(yīng)之前等待接收所有客戶端消息,或者可以先讀取一條消息再寫入一條消息,或讀寫的其他組合,同樣每個流中的消息順序都會保留。
針對腳手架項目,稍作修改成打乒乓球,考察gRpc雙向流式通信、Cancellation機制、grpc元數(shù)據(jù)三個特性
雙向流式可以不管對方是否回復(fù),首先已方是可以持續(xù)發(fā)送的,己方可以等收到所有信息再回復(fù),也可以收到一次回復(fù)一次,也可以自定義收到幾次回復(fù)一次。
本次演示土乒乓球?qū)ィ?/p>
- 對攻用到 雙向流,收到一次,回復(fù)一次。
- 強制設(shè)置30s的回合對攻必須分出勝負, 使用Cancellation控制回合結(jié)束
- 對攻雙方是白云和黑土, 使用元數(shù)據(jù)約束
① 添加服務(wù)定義接口
rpc PingPongHello(stream Serve) returns (stream Catch);
② 服務(wù)器實現(xiàn)
public override async Task PingPongHello(IAsyncStreamReader<Serve> requestStream,IServerStreamWriter<Catch> responseStream, ServerCallContext context)
{
try
{
if ("baiyun" != context.RequestHeaders.Get("node").Value) // 接收請求頭 header
{
context.Status = new Status(StatusCode.PermissionDenied,"黑土只和白云打乒乓球"); // 設(shè)置響應(yīng)狀態(tài)碼
await Task.CompletedTask;
return;
}
await context.WriteResponseHeadersAsync(new Metadata{ // 發(fā)送響應(yīng)頭header
{ "node", "heitu" }
});
long round = 0L;
context.CancellationToken.Register(() => {
Console.WriteLine($"乒乓球回合制結(jié)束, {context.Peer} : {round}");
context.ResponseTrailers.Add("round", round.ToString()); // 統(tǒng)計一個回合里雙方有多少次對攻
context.Status = new Status(StatusCode.OK,""); // 設(shè)置響應(yīng)狀態(tài)碼
});
while (!context.CancellationToken.IsCancellationRequested)
{
var asyncRequests = requestStream.ReadAllAsync(context.CancellationToken);
await foreach (var req in asyncRequests)
{
var send = RandomDirect(); // ToDo 想要實現(xiàn)一個 隨時間衰減的概率算法,模擬對攻最后終止。
await responseStream.WriteAsync(new Catch
{
Direct = send,
Id = req.Id
});
Console.WriteLine($" {context.Peer} : 第{req.Id}次服務(wù)端收到 {req.Direct}, 第{req.Id + 1}次發(fā)送 {send}");
round++;
}
}
}
catch(Exception ex)
{
Console.WriteLine($"{ex.Message}");
}
finally
{
Console.WriteLine($"乒乓球回合制結(jié)束");
}
}
static Direction RandomDirect()
{
var ran = new Random();
var ix = ran.Next(0, 4);
var dir= new[] { "Front", "Back","Left", "Right", }[ix];
System.Enum.TryParse<Direction>(dir, out var direct);
return direct;
}
③ 客戶端
var serverAddress = "http://localhost:5000";
var handler = new SocketsHttpHandler
{
PooledConnectionIdleTimeout = Timeout.InfiniteTimeSpan,
KeepAlivePingDelay = TimeSpan.FromSeconds(60),
KeepAlivePingTimeout = TimeSpan.FromSeconds(30), // tcp心跳探活
EnableMultipleHttp2Connections = true // 啟用并發(fā)tcp連接
};
using var channel = GrpcChannel.ForAddress(serverAddress, new GrpcChannelOptions {
Credentials = ChannelCredentials.Insecure,
MaxReceiveMessageSize = 1024 * 1024 * 10,
MaxSendMessageSize = 1024 * 1024 * 10,
HttpHandler = handler
});
var client = new PingPong.PingPongClient(channel);
AsyncDuplexStreamingCall<Serve,Catch> duplexCall = null;
Console.WriteLine($"{DateTime.Now:yyyy-MM-dd HH:mm:ss.fff}, 白云先發(fā)球");
using (var cancellationTokenSource = new CancellationTokenSource(30*1000))
{
try
{
duplexCall = client.PingPongHello(new Metadata
{
{ "node", "baiyun" }
}, null, cancellationTokenSource.Token );
var headers = await duplexCall.ResponseHeadersAsync;
if ("heitu" != headers.Get("node").Value) // 接收響應(yīng)頭
{
throw new RpcException(new Status(StatusCode.PermissionDenied, "白云只和黑土打乒乓球"));
}
var direct = RandomDirect();
await duplexCall.RequestStream.WriteAsync(new Serve { Id= 1, Direct = direct }) ;
await foreach (var resp in duplexCall.ResponseStream.ReadAllAsync())
{
Console.WriteLine($"第{resp.Id}次攻防,客戶端發(fā)送{direct},客戶端收到 {resp.Direct}");
direct = RandomDirect();
await duplexCall.RequestStream.WriteAsync(new Serve { Id= resp.Id+1 ,Direct = direct });
}
Console.WriteLine($"{DateTime.Now:yyyy-MM-dd HH:mm:ss.fff}打乒乓球結(jié)束");
if (duplexCall != null)
{
var tr = duplexCall.GetTrailers(); // 接受響應(yīng)尾
var round = tr.Get("round").Value.ToString();
Console.Write($" 進行了 {round} 次攻防)");
}
}
catch (RpcException ex)
{
var trailers = ex.Trailers;
_ = trailers.GetValue("round");
}
catch(Exception ex)
{
Console.WriteLine($"{DateTime.Now:yyyy-MM-dd HH:mm:ss.fff}打乒乓球(30s回合制)結(jié)束 未分出勝負,{ex.Message}");
}
}
https://github.com/zaozaoniao/GrpcAuthor
五:grpc擴展點
grpc:是基于http2 多路復(fù)用能力,在單tcp連接上發(fā)起高效rpc調(diào)用的框架。根據(jù)grpc調(diào)用的生命周期:可在如下階段擴展能力
- 服務(wù)可尋址
- 附加在grpc header/trailer的元數(shù)據(jù)
- 連接/調(diào)用 憑證
- 連接/調(diào)用 重試機制----> 攔截器
- 調(diào)用狀態(tài)碼 :https://grpc.github.io/grpc/core/md_doc_statuscodes.html
下面挑選幾個核心的擴展點著重聊一聊。
5.1 負載均衡
哪些調(diào)用能做負載均衡?
只有[gRPC調(diào)用]能實現(xiàn)對多服務(wù)提供方節(jié)點的負載平衡, 一旦建立了gRPC流式調(diào)用,所有通過該流式調(diào)用發(fā)送的消息都將發(fā)送到一個端點。
grpc負載均衡的時機?
grpc誕生的初衷是點對點通信,現(xiàn)在常用于內(nèi)網(wǎng)服務(wù)之間的通信,在微服務(wù)背景下,服務(wù)調(diào)用也有負載均衡的問題,也正因為連接建立之后是“點對點通信”,所以不方便基于L4做負載均衡。
根據(jù)grpc的調(diào)用姿勢, grpc的負載均衡可在如下環(huán)節(jié):
① 客戶端負載均衡 :對于每次rpc call,選擇一個服務(wù)終結(jié)點,直接調(diào)用無延遲, 但客戶端需要周期性尋址 。
② L7做服務(wù)端負載均衡 :L7負載層能理解HTTP/2,并且能在一個HTTP/2連接上跨多個服務(wù)提供方節(jié)點將[多路復(fù)用的gRPC調(diào)用]分發(fā)給上游服務(wù)節(jié)點。使用代理比客戶端負載平衡更簡單,但會給gRPC調(diào)用增加額外的延遲。
常見的是客戶端負載均衡。
- https://grpc.io/blog/grpc-load-balancing/
5.2 調(diào)用通道
grpc 利用http2 使用單一tcp連接提供到指定主機端口上年的grpc調(diào)用,通道是與遠程服務(wù)器的長期tcp連接的抽象。
客戶端對象可以重用相同的通道,與rpc調(diào)用行為相比,創(chuàng)建通道是一項昂貴的操作,因此應(yīng)該為盡可能多的調(diào)用重復(fù)使用單個通道。
根據(jù)http2 上默認并發(fā)流的限制(100), .NET支持在單tcp連接并發(fā)流到達上限的時候,產(chǎn)生新的tcp連接, 故通道是一個池化的tcp并發(fā)流的概念, grpc通道具有狀態(tài),包括已連接和空閑.
像websockets這類長時間利用tcp連接的機制一樣,都需要心跳保活機制, 可以快速的進行g(shù)rpc調(diào)用,而不用等待tcp連接建立而延遲。
可以指定通道參數(shù)來修改gRPC的默認行為,例如打開或關(guān)閉消息壓縮, 添加連接憑據(jù)。
var handler = new SocketsHttpHandler
{
PooledConnectionIdleTimeout = Timeout.InfiniteTimeSpan,
KeepAlivePingDelay = TimeSpan.FromSeconds(60),
KeepAlivePingTimeout = TimeSpan.FromSeconds(30), // tcp心跳探活
EnableMultipleHttp2Connections = true // 啟用并發(fā)tcp連接
};
var channel = GrpcChannel.ForAddress("https://localhost:5001", new GrpcChannelOptions
{
Credentials = ChannelCredentials.Insecure, // 連接憑據(jù)
HttpHandler = handler
});
https://learn.microsoft.com/en-us/aspnet/core/grpc/performance?view=aspnetcore-7.0
5.3 Metadata
元數(shù)據(jù)是以鍵值對列表的形式提供的有關(guān)特定RPC調(diào)用的信息(身份認證信息、訪問令牌、代理信息),在grpc調(diào)用雙方,一般元數(shù)據(jù)存儲在header或trailer 中。
客戶端發(fā)起調(diào)用時會有metadata參數(shù)可供使用:
// 上例中的 proto被編譯之后產(chǎn)生了如下 sdk
public virtual HelloReply SayHello(HelloRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
{
return SayHello(request, new CallOptions(headers, deadline, cancellationToken));
}
對于身份認證元數(shù)據(jù),有更通用的方式:builder.Services.AddGrpcClient<Greeter.GreeterClient>().AddCallCredentials((x,y) =>{ })
grpc 服務(wù)端可發(fā)送的是 header 和trailer, trailer只能在服務(wù)端響應(yīng)完畢發(fā)送, 至于為什么有header,還有trailer,請看再談 gRPC 的 Trailers 設(shè)計[2], 總體而言grpc流式通信需要在調(diào)用結(jié)束 給客戶端傳遞一些之前給不了的信息。
await context.WriteResponseHeadersAsync(new Metadata{ // 發(fā)送響應(yīng)頭
{ "node", "B" }
});
context.ResponseTrailers.Add("count", cnt); // 發(fā)送響應(yīng)尾
context.Status = Status.DefaultSuccess; // 設(shè)置響應(yīng)狀態(tài)碼
5.4 自定義攔截器和可能使用到的HttpClient
攔截器與 .net httpclientDelegate 、 axio的請求攔截器類似,都是在發(fā)起調(diào)用的時候,做一些過濾或者追加的行為。https://learn.microsoft.com/en-us/aspnet/core/grpc/interceptors?view=aspnetcore-8.0
builder.Services
.AddGrpcClient<Greeter.GreeterClient>(o =>
{
o.Address = new Uri("https://localhost:5001");
})
.AddInterceptor<LoggingInterceptor>(); // 默認在客戶端之間共享
// 以下是一個客戶端日志攔截器,在一元異步調(diào)用時攔截
public class ClientLoggingInterceptor : Interceptor
{
private readonly ILogger _logger;
public ClientLoggingInterceptor(ILoggerFactory loggerFactory)
{
_logger = loggerFactory.CreateLogger<ClientLoggingInterceptor>();
}
public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
TRequest request,
ClientInterceptorContext<TRequest, TResponse> context,
AsyncUnaryCallContinuation<TRequest, TResponse> continuation)
{
_logger.LogInformation("Starting call. Type/Method: {Type} / {Method}",
context.Method.Type, context.Method.Name); // 攔截動作: 在continuation之前做日志記錄。
return continuation(request, context);
}
}
總結(jié)
gRPC是具有可插拔身份驗證和負載平衡功能的高性能RPC框架。
使用protocol buffers定義結(jié)構(gòu)化數(shù)據(jù); 針對不同語言編譯出的代理sdk屏蔽底層通信和打接包細節(jié), 完成了本地實現(xiàn)遠程調(diào)用的效果 (調(diào)用方不care是遠程通信)。
Additional Resources
? https://developers.google.com/protocol-buffers/docs/csharptutorial ? https://www.grpc.io/docs/what-is-grpc/core-concepts/ ? https://docs.microsoft.com/en-us/dotnet/architecture/grpc-for-wcf-developers/why-grpc
- https://thenewstack.io/grpc-a-deep-dive-into-the-communication-pattern/
雙向流式通信: https://thenewstack.io/grpc-a-deep-dive-into-the-communication-pattern/
[2]再談 gRPC 的 Trailers 設(shè)計: https://taoshu.in/grpc-trailers.html
全文原創(chuàng),希望得到各位反饋,歡迎斧正交流, 若有更多進展,會實時更新到[左下角閱讀原文]。
微信公眾號又又又又又改版了 推送規(guī)則也改變了,置為星標(biāo)不迷路!
