gRPC入門(mén)指南 — 服務(wù)端流式RPC(二)
前言
前一篇文章,我們學(xué)習(xí)了簡(jiǎn)單模式 RPC(Simple RPC),gRPC 服務(wù)端和客戶(hù)端在通信時(shí)始終只有一個(gè)請(qǐng)求和一個(gè)響應(yīng)。實(shí)際的應(yīng)用場(chǎng)景中,我們會(huì)遇到需要不斷傳輸數(shù)據(jù)的時(shí)候,比如當(dāng)數(shù)據(jù)量很大時(shí)。這個(gè)時(shí)候我們就可以使用流式 RPC,可以實(shí)現(xiàn)邊傳輸、邊處理數(shù)據(jù)。這篇文章先介紹下服務(wù)端流式 RPC:客戶(hù)端一次請(qǐng)求,服務(wù)端通過(guò) stream 的方式響應(yīng)多條數(shù)據(jù),如下圖所示:

創(chuàng)建并編譯proto文件
新建文件 server_stream.proto
syntax = "proto3";
package proto;
// 定義發(fā)送請(qǐng)求信息
message SimpleRequest{
// 參數(shù)類(lèi)型 參數(shù)名稱(chēng) 標(biāo)識(shí)號(hào)
string data = 1;
}
// 定義流式響應(yīng)信息
message StreamResponse{
int32 code = 1;
string value = 2;
}
// 定義我們的服務(wù)(可以定義多個(gè)服務(wù),每個(gè)服務(wù)可以定義多個(gè)接口)
service StreamService{
// 服務(wù)端流式RPC,需要在響應(yīng)數(shù)據(jù)前加stream
rpc List(SimpleRequest) returns (stream StreamResponse){};
}
服務(wù)端流式 RPC,定義方法時(shí)需要在返回值之前加上 stream。
進(jìn)入 server_stream.proto 所在的目錄,使用如下命令編譯文件
protoc --go_out=plugins=grpc:. server_stream.proto
執(zhí)行完成之后會(huì)生成 server_stream.pb.go 文件。
創(chuàng)建server端
package main
import (
pb "go-grpc-example/2-server_stream_rpc/proto"
"google.golang.org/grpc"
"log"
"net"
"time"
)
const (
Address string = ":8000"
Network string = "tcp"
)
// 定義我們的服務(wù)
type StreamService struct{}
// 實(shí)現(xiàn)List方法
func (s *StreamService) List(req *pb.SimpleRequest, srv pb.StreamService_ListServer) error {
for i := 0; i < 5; i++ {
// 向流中發(fā)送消息,默認(rèn)每次發(fā)送消息大小為 math.MaxInt32 byte
err := srv.Send(&pb.StreamResponse{
Code: int32(i),
Value: req.Data,
})
if err != nil {
return err
}
time.Sleep(1 * time.Second)
}
return nil
}
func main() {
// 1.監(jiān)聽(tīng)端口
listener, err := net.Listen(Network, Address)
if err != nil {
log.Fatalf("listener err: %v", err)
}
log.Println(Address + " net.Listing...")
// 2.實(shí)例化gRPC服務(wù)端
// 默認(rèn)單次接受消息大小為 1024*1024*4 字節(jié)(4M),發(fā)送大小為 math.MaxInt32 字節(jié)
grpcServer := grpc.NewServer()
// 3.注冊(cè)我們實(shí)現(xiàn)的服務(wù) StreamService
pb.RegisterStreamServiceServer(grpcServer, &StreamService{})
// 4.啟動(dòng)gRPC服務(wù)端
err = grpcServer.Serve(listener)
if err != nil {
log.Fatalf("grpc server err: %v", err)
}
}
通過(guò)追源代碼可以看到,實(shí)例化 gRPC 服務(wù)端時(shí),默認(rèn)情況下默認(rèn)單次接受消息大小為 1024 * 1024 * 4 字節(jié)(4M),發(fā)送大小為 math.MaxInt32 字節(jié)。
const (
defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
defaultServerMaxSendMessageSize = math.MaxInt32
)
運(yùn)行服務(wù)端:
go run server.go
輸出:
:8000 net listening...
創(chuàng)建client端
package main
import (
"context"
pb "go-grpc-example/2-server_stream_rpc/proto"
"google.golang.org/grpc"
"io"
"log"
)
const Address string = ":8000"
func main() {
// 1.連接服務(wù)端
conn, err := grpc.Dial(Address, grpc.WithInsecure())
if err != nil {
log.Fatalf("grpc conn err: %v", err)
}
defer conn.Close()
// 2.創(chuàng)建grpc客戶(hù)端
grpcClient := pb.NewStreamServiceClient(conn)
// 3.調(diào)用服務(wù)端提供的服務(wù)
req := &pb.SimpleRequest{
Data: "Hello,Server",
}
stream, err := grpcClient.List(context.Background(), req)
if err != nil {
log.Fatalf("call server err,%v", err)
}
for {
// 4.處理服務(wù)端發(fā)送過(guò)來(lái)的流信息
resp, err := stream.Recv()
if err == io.EOF { // 流是否結(jié)束
break
}
if err != nil {
log.Fatalf("client get stream err:%v", err)
}
log.Printf("get from stream server,code:%v,value:%v", resp.GetCode(), resp.GetValue())
}
}
從代碼可以看出,客戶(hù)端發(fā)送一次請(qǐng)求,之后在 for 循環(huán)里面不停地從服務(wù)端接受信息,只到服務(wù)端結(jié)束發(fā)送。
運(yùn)行客戶(hù)端:
go run client.go
get from stream server,code:0,value:Hello,Server
get from stream server,code:1,value:Hello,Server
get from stream server,code:2,value:Hello,Server
get from stream server,code:3,value:Hello,Server
get from stream server,code:4,value:Hello,Server
總結(jié)
這篇文章主要介紹了服務(wù)端流式 RPC的簡(jiǎn)單使用,該模式下客戶(hù)端發(fā)送一次請(qǐng)求,服務(wù)端會(huì)發(fā)送多次響應(yīng)數(shù)據(jù)。
下篇文章我們介紹下客戶(hù)端流式 RPC。
推薦閱讀
