翻譯翻譯, 什么叫g(shù)RPC雙向流?
今天來(lái)實(shí)戰(zhàn)一下gRPC的特性,雙向流。
首先認(rèn)識(shí)一下,什么是雙向流?
?所謂雙向流式 RPC ,是由客戶端調(diào)用方法來(lái)初始化,而服務(wù)端接收到客戶端的元數(shù)據(jù),方法名和截止時(shí)間。
并且服務(wù)端可以選擇發(fā)送回它的初始元數(shù)據(jù)或等待客戶端發(fā)送請(qǐng)求的一種通信方式。
雙向流,特點(diǎn)在于雙向,也就是請(qǐng)求響應(yīng)如何起作用是完全取決于應(yīng)用怎么處理,因?yàn)榭蛻舳撕头?wù)端能在任意順序上讀寫 ,也就是說(shuō)這些流的操作是完全獨(dú)立的。
例如服務(wù)端可以一直等待,直到它接收到所有客戶端的消息才寫應(yīng)答,或者服務(wù)端和客戶端可以像"乒乓球"一樣:服務(wù)端后得到一個(gè)請(qǐng)求就回送一個(gè)應(yīng)答,接著客戶端根據(jù)應(yīng)答來(lái)發(fā)送另一個(gè)請(qǐng)求,以此類推。
?
通俗地說(shuō),客戶端發(fā)送了 N 個(gè)請(qǐng)求,服務(wù)端返回 N 個(gè)或者 M 個(gè)響應(yīng),該特性能夠充分利用 HTTP/2.0 的多路復(fù)用功能。
某個(gè)時(shí)刻,HTTP/2.0 鏈路上可以既有請(qǐng)求也有響應(yīng),實(shí)現(xiàn)了全雙工通信(對(duì)比單行道和雙向車道),雙向流式RPC通信用一個(gè)簡(jiǎn)單的圖表示如下:

如何定義雙向流呢?
?一個(gè) 雙向流式 RPC 是雙方使用讀寫流去發(fā)送一個(gè)消息序列。兩個(gè)流獨(dú)立操作,因此客戶端和服務(wù)器 可以以任意喜歡的順序讀寫:
比如, 服務(wù)器可以在寫入響應(yīng)前等待接收所有的客戶端消息,或者可以交替地讀取和寫入消息,或者其他讀寫的組合。每個(gè)流中的消息順序被預(yù)留。開(kāi)發(fā)者可以通過(guò)在請(qǐng)求和響應(yīng)前加
?stream關(guān)鍵字去制定方法的類型。
我們可以在服務(wù)的IDL定義文件proto中按照如下方式聲明雙向流式RPC接口。
??//?Accepts?a?stream?of?RouteNotes?sent?while?a?route?is?being?traversed,
??//?while?receiving?other?RouteNotes?(e.g.?from?other?users).
??rpc?RouteChat(stream?RouteNote)?returns?(stream?RouteNote)?{}
聲明方式為在請(qǐng)求和響應(yīng)之前都添加關(guān)鍵字 stream。
實(shí)操一把
有了基本的概念,我們還是直接上手實(shí)操一下。
案例介紹
?本案例為一個(gè)簡(jiǎn)單的應(yīng)答接口,客戶端向服務(wù)端發(fā)送問(wèn)候語(yǔ),服務(wù)端接受問(wèn)候并返回服務(wù)端的問(wèn)候。
由于是雙向流,所以我們可以在一次接口調(diào)用中,發(fā)送多次問(wèn)候。
運(yùn)行的效果就是客戶端與服務(wù)端的日志是交替打印的,也就是說(shuō)服務(wù)端在客戶端調(diào)用接口的過(guò)程中就可以逐步發(fā)送響應(yīng)結(jié)果給客戶端,而不是像阻塞式請(qǐng)求一樣,等待客戶端請(qǐng)求發(fā)送完畢,再統(tǒng)一一次性返回接口。
?
這種雙向流的接口處理模式的好處是顯而易見(jiàn)的:
如果傳輸?shù)臄?shù)據(jù)包過(guò)大,客戶端可以將請(qǐng)求包拆分為多個(gè)小包發(fā)送至服務(wù)端。服務(wù)端依次處理小包,發(fā)送過(guò)程與處理過(guò)程互不干擾,互不依賴。 服務(wù)端不需要等待客戶端包全部發(fā)送,才能處理以及響應(yīng)。
服務(wù)IDL定義
?首先還是需要編寫定義proto接口定義
?
syntax?=?"proto3";
option?java_multiple_files?=?true;
option?java_package?=?"com.snowalker.grpc.sdk.stream";
option?java_outer_classname?=?"DoubleStreamProto";
//?服務(wù)IDL定義
service?DoubleStreamService?{
??rpc?chat?(stream?ChatRequest)?returns?(stream?ChatResponse)?{
??}
}
//?請(qǐng)求
message?ChatRequest?{
??int32?userId?=?1;
??string?msg?=?2;
}
//?響應(yīng)
message?ChatResponse?{
??int32?userId?=?1;
??string?msg?=?2;
}
重點(diǎn)關(guān)注chat接口定義,同時(shí)聲明請(qǐng)求與響應(yīng)為stream類型,標(biāo)記這是一個(gè)雙向流RPC。
對(duì)該proto文件進(jìn)行編譯,所在工程grpc-demo-sdk根路徑下執(zhí)行:
?mvn clean compile -DskipTests
?
編寫服務(wù)端--實(shí)現(xiàn)服務(wù)邏輯
服務(wù)端編寫與普通的RPC接口相同,也是需要繼承g(shù)RPC生成的XXXXGrpc.XXXXXImplBase,具體代碼如下:
public?class?DoubleStreamServiceImpl?extends?DoubleStreamServiceGrpc.DoubleStreamServiceImplBase?{
重寫chat方法,實(shí)現(xiàn)服務(wù)端邏輯,可以看到,業(yè)務(wù)邏輯同阻塞式接口不同,這里將業(yè)務(wù)邏輯寫在了onNext回調(diào)方法中,因此我們可以知道這是一種異步回調(diào)機(jī)制。
?/**
??*?@param?responseObserver
??*?@return
??*/
?@Override
?public?StreamObserver?chat(StreamObserver?responseObserver)?{
??return?new?StreamObserver()?{
???@Override
???public?void?onNext(ChatRequest?chatRequest)?{
????int?userId?=?chatRequest.getUserId();
????String?msg?=?chatRequest.getMsg();
????logger.info("[DoubleStreamServiceImpl]?服務(wù)端處理開(kāi)始....");
????logger.info("[DoubleStreamServiceImpl]?客戶端說(shuō):?["?+?msg?+?"]");
????responseObserver.onNext(ChatResponse.newBuilder()
??????.setUserId(chatRequest.getUserId())
??????.setMsg("這是一條來(lái)自[服務(wù)端]的消息:?你好,收到了-"?+?userId?+?"?的消息.?"?+?new?SimpleDateFormat("yyyy-MM-dd?HH:mm:ssSSS").format(new?Date())?+?"\n")
??????.build());
???}
???@Override
???public?void?onError(Throwable?throwable)?{
????logger.warning("[DoubleStreamServiceImpl]?gRPC?dealing?error");
???}
???@Override
???public?void?onCompleted()?{
????responseObserver.onCompleted();
???}
??};
?}
構(gòu)造 Streaming 響應(yīng)對(duì)象 StreamObserver并實(shí)現(xiàn) onNext 等接口,由于服務(wù)端也是 Streaming模式,因此響應(yīng)是多個(gè)的,也就是說(shuō) onNext 會(huì)被調(diào)用多次。
在onNext方法中編寫服務(wù)端業(yè)務(wù)邏輯,這里主要做的就是取出請(qǐng)求體ChatRequest中的userId,以及msg,打印輸出,構(gòu)造響應(yīng)對(duì)象ChatResponse,并return。
實(shí)現(xiàn) onCompleted 方法,調(diào)用 「responseObserver.onCompleted()」 將請(qǐng)求返回客戶端。
編寫服務(wù)端--綁定服務(wù)實(shí)現(xiàn)類
同一般的阻塞式RPC接口相同,想要使雙向流式RPC生效,還需要注冊(cè)服務(wù)實(shí)現(xiàn)類到服務(wù)端ServerBuilder中。
?@SneakyThrows
?private?void?startServer()?{
??int?serverPort?=?10881;
??server?=?ServerBuilder.forPort(serverPort)
????//?上文中的報(bào)價(jià)服務(wù)實(shí)現(xiàn)類
????.addService(new?OrderServiceImpl())
????//?添加雙向流式RPC實(shí)現(xiàn)
????.addService(new?DoubleStreamServiceImpl())
????.build();
??server.start();
??logger.info("OrderServerBoot?started,?listening?on:"?+?serverPort);
??//?優(yōu)雅停機(jī)
??addGracefulShowdownHook();
?}
編寫客戶端--實(shí)現(xiàn)客戶端邏輯
?接著編寫客戶端邏輯。
?
雙向流式RPC的客戶端實(shí)現(xiàn)方式與傳統(tǒng)阻塞式客戶端也是不同的。
?首先定義接口ExtendResponseObserver,繼承StreamObserver,用于返回文本格式的響應(yīng)體,方便觀察。
?
public?interface?ExtendResponseObserver?extends?StreamObserver?{
????String?getExtra();
}
?接著編寫客戶端邏輯。
?
/**
?*?@author?snowalker
?*?@version?1.0
?*?@date?2022/3/16?23:49
?*?@className
?*?@desc
?*/
public?class?DoubleStreamClient?{
?private?static?final?Logger?logger?=?Logger.getLogger(DoubleStreamClient.class.getName());
?private?final?DoubleStreamServiceGrpc.DoubleStreamServiceStub?doubleStreamServiceStub;
?public?DoubleStreamClient(ManagedChannel?channel)?{
??doubleStreamServiceStub?=?DoubleStreamServiceGrpc.newStub(channel);
?}
?public?String?chat(String?msg,?int?user,?int?count)?{
??ExtendResponseObserver?chatResponseStreamObserver?=?new?ExtendResponseObserver()?{
???@Override
???public?String?getExtra()?{
????return?stringBuilder.toString();
???}
???//?用stringBuilder保存所有來(lái)自服務(wù)端的響應(yīng)
???private?StringBuilder?stringBuilder?=?new?StringBuilder();
???@Override
???public?void?onNext(ChatResponse?chatResponse)?{
????logger.info("[DoubleStreamClient]?onNext.....");
????//?放入匿名類的成員變量中
????System.out.println(chatResponse.getMsg());
????stringBuilder.append(String.format("服務(wù)端響應(yīng):%s
,?用戶:%d"?,?chatResponse.getMsg(),?chatResponse.getUserId()));
???}
???@Override
???public?void?onError(Throwable?throwable)?{
????logger.warning("[DoubleStreamClient]?gRPC?request?error");
????stringBuilder.append("[DoubleStreamClient]chat?gRPC?error,?"?+?throwable.getMessage());
???}
???@Override
???public?void?onCompleted()?{
????logger.info("[DoubleStreamClient]?onCompleted");
???}
??};
??//?重點(diǎn)!!!!RPC調(diào)用發(fā)起
??StreamObserver?chatRequestStreamObserver?=?doubleStreamServiceStub.chat(chatResponseStreamObserver);
??for(int?i?=?0;?i????//?每次執(zhí)行onNext都會(huì)發(fā)送一筆數(shù)據(jù)到服務(wù)端,
???//?服務(wù)端的onNext方法都會(huì)被執(zhí)行一次
???ChatRequest?chatRequest?=?ChatRequest.newBuilder()
?????.setUserId(user)
?????.setMsg("這是一條來(lái)自客戶端的消息:?你好,"?+?user?+?new?SimpleDateFormat("yyyy-MM-dd?HH:mm:ssSSS").format(new?Date()))
?????.buildPartial();
???LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
???chatRequestStreamObserver.onNext(chatRequest);
??}
??//?客戶端告訴服務(wù)端:數(shù)據(jù)已經(jīng)發(fā)完了
??chatRequestStreamObserver.onCompleted();
??logger.info("service?finish");
??return?chatResponseStreamObserver.getExtra();
?}
}
解釋下代碼:
客戶端首先定義ExtendResponseObserver實(shí)例,并實(shí)現(xiàn)onNext、onError、onCompleted、getExtra等回調(diào)方法,用于處理服務(wù)端響應(yīng)、異常情況、請(qǐng)求完成、返回字符形式的響應(yīng);
客戶端通過(guò)在循環(huán)中調(diào)用 requestObserver 的 onNext 方法,發(fā)送請(qǐng)求消息到服務(wù)端;
??//?重點(diǎn)!!!!RPC調(diào)用發(fā)起
??StreamObserver?chatRequestStreamObserver?=?doubleStreamServiceStub.chat(chatResponseStreamObserver); 當(dāng)RPC請(qǐng)求發(fā)送完成之后,通過(guò)調(diào)用 「onCompleted()」 通知服務(wù)端數(shù)據(jù)已經(jīng)發(fā)送完成;
「需要特別注意」,客戶端發(fā)送請(qǐng)求的stub為流式stub為「DoubleStreamServiceGrpc.newStub(channel);」 而非newBlockingStub。
案例運(yùn)行
案例運(yùn)行完畢,我們運(yùn)行看看效果。
本文開(kāi)始,運(yùn)行案例相關(guān)展示嘗試通過(guò)gif動(dòng)態(tài)展示:

先后啟動(dòng)了服務(wù)端與客戶端, 客戶端在一次調(diào)用中循環(huán)發(fā)送chat請(qǐng)求到服務(wù)端 服務(wù)端邊處理請(qǐng)求邊響應(yīng)給客戶端 可以看到服務(wù)端請(qǐng)求日志在一次調(diào)用中是邊處理邊打印的,客戶端日志也是逐步輸出的 表明雙向流式RPC是異步的、高效的、非阻塞的。
客戶端流式RPC機(jī)理
稍微翻看源碼,可以發(fā)現(xiàn),實(shí)際上客戶端底層 onNext 方法調(diào)用了 ClientCall 的消息發(fā)送方法,代碼如下(CallToStreamObserverAdapter 類):
private?static?class?CallToStreamObserverAdapter?extends?ClientCallStreamObserver?{
??private?boolean?frozen;
???private?final?ClientCall?call;
???private?Runnable?onReadyHandler;
???private?boolean?autoFlowControlEnabled?=?true;
?
???public?CallToStreamObserverAdapter(ClientCall?call)?{
???this.call?=?call;
???}
?
???private?void?freeze()?{
????this.frozen?=?true;
???}
?
???@Override
????public?void?onNext(T?value)?{
???call.sendMessage(value);
???}
特別注意的是,對(duì)于雙向 Streaming 模式,只支持異步調(diào)用方式。
總結(jié)
本文我們主要了解了gRPC的雙向流式調(diào)用,對(duì)于這種流模式調(diào)用,可以充分利用 HTTP/2.0 協(xié)議的多路復(fù)用功能,實(shí)現(xiàn)在在一條 HTTP 鏈路上并行雙向傳輸數(shù)據(jù)(全雙工),它可以有效解決 HTTP/1.X 的數(shù)據(jù)單向傳輸問(wèn)題,在大幅減少 HTTP 連接的情況下充分利用單條鏈路的性能,其性能可以媲美傳統(tǒng)的 RPC 私有長(zhǎng)連接協(xié)議:即通過(guò)更少的鏈路,實(shí)現(xiàn)更高的性能。
下篇文章,我們將為我們的grpc通信加入基于Nacos的服務(wù)注冊(cè)發(fā)現(xiàn)能力,不見(jiàn)不散。
