gRPC入門指南 — 客戶端流式RPC(三)
前言
前一篇文章我們學(xué)習(xí)了服務(wù)端流式RPC,客戶端發(fā)送一次請求,通過流的方式多次從服務(wù)端收到信息。這一節(jié)我們來學(xué)習(xí)下客戶端流式RPC,該模式與服務(wù)端流式RPC正好相反,客戶端不斷向服務(wù)端發(fā)送數(shù)據(jù),結(jié)束之后,服務(wù)端返回一個響應(yīng),如下:

新建并編譯 proto 文件
新建 client_stream.proto 文件:
syntax = "proto3";
package proto;
// 定義流式請求信息
message StreamRequest{
// 參數(shù)類型 參數(shù)名稱 標(biāo)識號
string data = 1;
}
// 定義響應(yīng)信息
message SimpleResponse{
int32 code = 1;
string value = 2;
}
// 定義我們的服務(wù)(可以定義多個服務(wù),每個服務(wù)可以定義多個接口)
service StreamService{
// 客戶端流式RPC,需要在請求數(shù)據(jù)前加stream
rpc Record(stream StreamRequest) returns (SimpleResponse){};
}
客戶端流式 RPC,定義方法時需要在請求值之前加上 stream。
進(jìn)入 client_stream.proto 所在的目錄,使用如下命令編譯文件
protoc --go_out=plugins=grpc:. client_stream.proto
執(zhí)行完成之后會生成 client_stream.pb.go 文件。
創(chuàng)建server端
package main
import (
pb "go-grpc-example/3-client_stream_rpc/proto"
"google.golang.org/grpc"
"io"
"log"
"net"
)
const (
Address string = ":8000"
Network string = "tcp"
)
// 定義我們的服務(wù)
type StreamService struct{}
// 實(shí)現(xiàn) Record 方法
func (s *StreamService) Record(srv pb.StreamService_RecordServer) error {
for {
// 從流中獲取消息
req, err := srv.Recv()
if err == io.EOF {
// 發(fā)送數(shù)據(jù)并關(guān)閉
return srv.SendAndClose(&pb.SimpleResponse{
Code: 1,
Value: "ok",
})
}
if err != nil {
return err
}
log.Printf("get from client:%v", req.Data)
}
}
func main() {
// 1.監(jiān)聽端口
listener, err := net.Listen(Network, Address)
if err != nil {
log.Fatalf("listener err: %v", err)
}
log.Println(Address + " net.Listing...")
// 2.創(chuàng)建gRPC服務(wù)端實(shí)例
grpcServer := grpc.NewServer()
// 3.注冊我們實(shí)現(xiàn)的服務(wù) StreamService
pb.RegisterStreamServiceServer(grpcServer, &StreamService{})
// 4.啟動gRPC服務(wù)端
err = grpcServer.Serve(listener)
if err != nil {
log.Fatalf("grpc server err: %v", err)
}
}
在實(shí)現(xiàn)的 Record() 方法中,可以看到 server 端在 for 循環(huán)中不斷從客戶端接收消息,知道接收完畢,服務(wù)端返回一個響應(yīng)。
運(yùn)行服務(wù)端:
go run server.go
輸出:
:8000 net listening...
創(chuàng)建client端
package main
import (
"context"
pb "go-grpc-example/3-client_stream_rpc/proto"
"google.golang.org/grpc"
"log"
"strconv"
"time"
)
const Address string = ":8000"
func main() {
// 1.連接服務(wù)端
conn, err := grpc.Dial(Address, grpc.WithInsecure())
if err != nil {
log.Fatalf("grpc conn err: %v", err)
}
defer conn.Close()
// 2.建立gRPC連接
streamClient := pb.NewStreamServiceClient(conn)
// 3.調(diào)用record,獲取流
stream, err := streamClient.Record(context.Background())
if err != nil {
log.Fatalf("call record err: %v", err)
}
for i := 0; i < 5; i++ {
// 4.向流中發(fā)送數(shù)據(jù)
err := stream.Send(&pb.StreamRequest{Data: strconv.Itoa(i)})
if err != nil {
log.Fatalf("stream request err: %v", err)
}
time.Sleep(1 * time.Second)
}
// 5.關(guān)閉流并獲取返回的消息
resp, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("client stream close err: %v", err)
}
log.Printf("get from server,code:%v,value:%v", resp.GetCode(), resp.GetValue())
}
客戶端代碼,在 for 循環(huán)里面向服務(wù)端發(fā)送了 5 次消息,接著調(diào)用 CloseAndRecv() 關(guān)閉流并接收服務(wù)端返回的數(shù)據(jù)。
運(yùn)行客戶端:
go run client.go
服務(wù)端輸出:
get from client:0
get from client:1
get from client:2
get from client:3
get from client:4
服務(wù)端輸出之后,客戶端輸出:
get from server,code:1,value:ok
總結(jié)
這篇文章主要介紹了客戶端流式 RPC的簡單使用,該模式下客戶端可以多次向服務(wù)端發(fā)送數(shù)據(jù),數(shù)據(jù)發(fā)送完畢之后,服務(wù)端會返回一次響應(yīng)。下篇文章我們會介紹雙向流式 RPC。
推薦閱讀
評論
圖片
表情
