<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          翻譯翻譯, 什么叫g(shù)RPC雙向流?

          共 5326字,需瀏覽 11分鐘

           ·

          2022-03-18 23:23

          今天來(lái)實(shí)戰(zhàn)一下gRPC的特性,雙向流。

          首先認(rèn)識(shí)一下,什么是雙向流?

          ?

          所謂雙向流式 RPC ,是由客戶端調(diào)用方法來(lái)初始化,而服務(wù)端接收到客戶端的元數(shù)據(jù),方法名和截止時(shí)間。

          并且服務(wù)端可以選擇發(fā)送回它的初始元數(shù)據(jù)或等待客戶端發(fā)送請(qǐng)求的一種通信方式。

          雙向流,特點(diǎn)在于雙向,也就是請(qǐng)求響應(yīng)如何起作用是完全取決于應(yīng)用怎么處理,因?yàn)榭蛻舳撕头?wù)端能在任意順序上讀寫 ,也就是說(shuō)這些流的操作是完全獨(dú)立的。

          例如服務(wù)端可以一直等待,直到它接收到所有客戶端的消息才寫應(yīng)答,或者服務(wù)端和客戶端可以像"乒乓球"一樣:服務(wù)端后得到一個(gè)請(qǐng)求就回送一個(gè)應(yīng)答,接著客戶端根據(jù)應(yīng)答來(lái)發(fā)送另一個(gè)請(qǐng)求,以此類推。

          ?

          通俗地說(shuō),客戶端發(fā)送了 N 個(gè)請(qǐng)求,服務(wù)端返回 N 個(gè)或者 M 個(gè)響應(yīng),該特性能夠充分利用 HTTP/2.0 的多路復(fù)用功能。

          某個(gè)時(shí)刻,HTTP/2.0 鏈路上可以既有請(qǐng)求也有響應(yīng),實(shí)現(xiàn)了全雙工通信(對(duì)比單行道和雙向車道),雙向流式RPC通信用一個(gè)簡(jiǎn)單的圖表示如下:

          grpc雙向流.png

          如何定義雙向流呢?

          ?

          一個(gè) 雙向流式 RPC 是雙方使用讀寫流去發(fā)送一個(gè)消息序列。兩個(gè)流獨(dú)立操作,因此客戶端和服務(wù)器 可以以任意喜歡的順序讀寫:

          比如, 服務(wù)器可以在寫入響應(yīng)前等待接收所有的客戶端消息,或者可以交替地讀取和寫入消息,或者其他讀寫的組合。每個(gè)流中的消息順序被預(yù)留。開(kāi)發(fā)者可以通過(guò)在請(qǐng)求和響應(yīng)前加 stream 關(guān)鍵字去制定方法的類型。

          ?

          我們可以在服務(wù)的IDL定義文件proto中按照如下方式聲明雙向流式RPC接口。

          ??//?Accepts?a?stream?of?RouteNotes?sent?while?a?route?is?being?traversed,
          ??//?while?receiving?other?RouteNotes?(e.g.?from?other?users).
          ??rpc?RouteChat(stream?RouteNote)?returns?(stream?RouteNote)?{}

          聲明方式為在請(qǐng)求和響應(yīng)之前都添加關(guān)鍵字 stream

          實(shí)操一把

          有了基本的概念,我們還是直接上手實(shí)操一下。

          案例介紹

          ?

          本案例為一個(gè)簡(jiǎn)單的應(yīng)答接口,客戶端向服務(wù)端發(fā)送問(wèn)候語(yǔ),服務(wù)端接受問(wèn)候并返回服務(wù)端的問(wèn)候。

          由于是雙向流,所以我們可以在一次接口調(diào)用中,發(fā)送多次問(wèn)候。

          運(yùn)行的效果就是客戶端與服務(wù)端的日志是交替打印的,也就是說(shuō)服務(wù)端在客戶端調(diào)用接口的過(guò)程中就可以逐步發(fā)送響應(yīng)結(jié)果給客戶端,而不是像阻塞式請(qǐng)求一樣,等待客戶端請(qǐng)求發(fā)送完畢,再統(tǒng)一一次性返回接口。

          ?

          這種雙向流的接口處理模式的好處是顯而易見(jiàn)的:

          • 如果傳輸?shù)臄?shù)據(jù)包過(guò)大,客戶端可以將請(qǐng)求包拆分為多個(gè)小包發(fā)送至服務(wù)端。服務(wù)端依次處理小包,發(fā)送過(guò)程與處理過(guò)程互不干擾,互不依賴。
          • 服務(wù)端不需要等待客戶端包全部發(fā)送,才能處理以及響應(yīng)。

          服務(wù)IDL定義

          ?

          首先還是需要編寫定義proto接口定義

          ?
          syntax?=?"proto3";

          option?java_multiple_files?=?true;
          option?java_package?=?"com.snowalker.grpc.sdk.stream";
          option?java_outer_classname?=?"DoubleStreamProto";

          //?服務(wù)IDL定義
          service?DoubleStreamService?{
          ??rpc?chat?(stream?ChatRequest)?returns?(stream?ChatResponse)?{
          ??}
          }

          //?請(qǐng)求
          message?ChatRequest?{
          ??int32?userId?=?1;
          ??string?msg?=?2;
          }

          //?響應(yīng)
          message?ChatResponse?{
          ??int32?userId?=?1;
          ??string?msg?=?2;
          }

          重點(diǎn)關(guān)注chat接口定義,同時(shí)聲明請(qǐng)求與響應(yīng)為stream類型,標(biāo)記這是一個(gè)雙向流RPC。

          對(duì)該proto文件進(jìn)行編譯,所在工程grpc-demo-sdk根路徑下執(zhí)行:

          ?

          mvn clean compile -DskipTests

          ?

          編寫服務(wù)端--實(shí)現(xiàn)服務(wù)邏輯

          服務(wù)端編寫與普通的RPC接口相同,也是需要繼承g(shù)RPC生成的XXXXGrpc.XXXXXImplBase,具體代碼如下:

          public?class?DoubleStreamServiceImpl?extends?DoubleStreamServiceGrpc.DoubleStreamServiceImplBase?{

          重寫chat方法,實(shí)現(xiàn)服務(wù)端邏輯,可以看到,業(yè)務(wù)邏輯同阻塞式接口不同,這里將業(yè)務(wù)邏輯寫在了onNext回調(diào)方法中,因此我們可以知道這是一種異步回調(diào)機(jī)制。

          ?/**
          ??*?@param?responseObserver
          ??*?@return
          ??*/
          ?@Override
          ?public?StreamObserver?chat(StreamObserver?responseObserver)?{
          ??return?new?StreamObserver()?{
          ???@Override
          ???public?void?onNext(ChatRequest?chatRequest)?{

          ????int?userId?=?chatRequest.getUserId();
          ????String?msg?=?chatRequest.getMsg();

          ????logger.info("[DoubleStreamServiceImpl]?服務(wù)端處理開(kāi)始....");
          ????logger.info("[DoubleStreamServiceImpl]?客戶端說(shuō):?["?+?msg?+?"]");

          ????responseObserver.onNext(ChatResponse.newBuilder()
          ??????.setUserId(chatRequest.getUserId())
          ??????.setMsg("這是一條來(lái)自[服務(wù)端]的消息:?你好,收到了-"?+?userId?+?"?的消息.?"?+?new?SimpleDateFormat("yyyy-MM-dd?HH:mm:ssSSS").format(new?Date())?+?"\n")
          ??????.build());
          ???}

          ???@Override
          ???public?void?onError(Throwable?throwable)?{
          ????logger.warning("[DoubleStreamServiceImpl]?gRPC?dealing?error");
          ???}

          ???@Override
          ???public?void?onCompleted()?{
          ????responseObserver.onCompleted();
          ???}
          ??};
          ?}

          構(gòu)造 Streaming 響應(yīng)對(duì)象 StreamObserver并實(shí)現(xiàn) onNext 等接口,由于服務(wù)端也是 Streaming模式,因此響應(yīng)是多個(gè)的,也就是說(shuō) onNext 會(huì)被調(diào)用多次。

          在onNext方法中編寫服務(wù)端業(yè)務(wù)邏輯,這里主要做的就是取出請(qǐng)求體ChatRequest中的userId,以及msg,打印輸出,構(gòu)造響應(yīng)對(duì)象ChatResponse,并return。

          實(shí)現(xiàn) onCompleted 方法,調(diào)用 「responseObserver.onCompleted()」 將請(qǐng)求返回客戶端。

          編寫服務(wù)端--綁定服務(wù)實(shí)現(xiàn)類

          同一般的阻塞式RPC接口相同,想要使雙向流式RPC生效,還需要注冊(cè)服務(wù)實(shí)現(xiàn)類到服務(wù)端ServerBuilder中。

          ?@SneakyThrows
          ?private?void?startServer()?{
          ??int?serverPort?=?10881;
          ??server?=?ServerBuilder.forPort(serverPort)
          ????//?上文中的報(bào)價(jià)服務(wù)實(shí)現(xiàn)類
          ????.addService(new?OrderServiceImpl())
          ????//?添加雙向流式RPC實(shí)現(xiàn)
          ????.addService(new?DoubleStreamServiceImpl())
          ????.build();
          ??server.start();

          ??logger.info("OrderServerBoot?started,?listening?on:"?+?serverPort);

          ??//?優(yōu)雅停機(jī)
          ??addGracefulShowdownHook();
          ?}

          編寫客戶端--實(shí)現(xiàn)客戶端邏輯

          ?

          接著編寫客戶端邏輯。

          ?

          雙向流式RPC的客戶端實(shí)現(xiàn)方式與傳統(tǒng)阻塞式客戶端也是不同的。

          ?

          首先定義接口ExtendResponseObserver,繼承StreamObserver,用于返回文本格式的響應(yīng)體,方便觀察。

          ?
          public?interface?ExtendResponseObserver?extends?StreamObserver?{
          ????String?getExtra();
          }
          ?

          接著編寫客戶端邏輯。

          ?
          /**
          ?*?@author?snowalker
          ?*?@version?1.0
          ?*?@date?2022/3/16?23:49
          ?*?@className
          ?*?@desc
          ?*/
          public?class?DoubleStreamClient?{

          ?private?static?final?Logger?logger?=?Logger.getLogger(DoubleStreamClient.class.getName());

          ?private?final?DoubleStreamServiceGrpc.DoubleStreamServiceStub?doubleStreamServiceStub;

          ?public?DoubleStreamClient(ManagedChannel?channel)?{
          ??doubleStreamServiceStub?=?DoubleStreamServiceGrpc.newStub(channel);
          ?}

          ?public?String?chat(String?msg,?int?user,?int?count)?{
          ??ExtendResponseObserver?chatResponseStreamObserver?=?new?ExtendResponseObserver()?{

          ???@Override
          ???public?String?getExtra()?{
          ????return?stringBuilder.toString();
          ???}

          ???//?用stringBuilder保存所有來(lái)自服務(wù)端的響應(yīng)
          ???private?StringBuilder?stringBuilder?=?new?StringBuilder();

          ???@Override
          ???public?void?onNext(ChatResponse?chatResponse)?{
          ????logger.info("[DoubleStreamClient]?onNext.....");
          ????//?放入匿名類的成員變量中
          ????System.out.println(chatResponse.getMsg());
          ????stringBuilder.append(String.format("服務(wù)端響應(yīng):%s
          ,?用戶:%d"
          ?,?chatResponse.getMsg(),?chatResponse.getUserId()));
          ???}

          ???@Override
          ???public?void?onError(Throwable?throwable)?{
          ????logger.warning("[DoubleStreamClient]?gRPC?request?error");
          ????stringBuilder.append("[DoubleStreamClient]chat?gRPC?error,?"?+?throwable.getMessage());
          ???}

          ???@Override
          ???public?void?onCompleted()?{
          ????logger.info("[DoubleStreamClient]?onCompleted");
          ???}
          ??};

          ??//?重點(diǎn)!!!!RPC調(diào)用發(fā)起
          ??StreamObserver?chatRequestStreamObserver?=?doubleStreamServiceStub.chat(chatResponseStreamObserver);

          ??for(int?i?=?0;?i????//?每次執(zhí)行onNext都會(huì)發(fā)送一筆數(shù)據(jù)到服務(wù)端,
          ???//?服務(wù)端的onNext方法都會(huì)被執(zhí)行一次
          ???ChatRequest?chatRequest?=?ChatRequest.newBuilder()
          ?????.setUserId(user)
          ?????.setMsg("這是一條來(lái)自客戶端的消息:?你好,"?+?user?+?new?SimpleDateFormat("yyyy-MM-dd?HH:mm:ssSSS").format(new?Date()))
          ?????.buildPartial();
          ???LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
          ???chatRequestStreamObserver.onNext(chatRequest);
          ??}

          ??//?客戶端告訴服務(wù)端:數(shù)據(jù)已經(jīng)發(fā)完了
          ??chatRequestStreamObserver.onCompleted();

          ??logger.info("service?finish");

          ??return?chatResponseStreamObserver.getExtra();
          ?}
          }

          解釋下代碼:

          • 客戶端首先定義ExtendResponseObserver實(shí)例,并實(shí)現(xiàn)onNext、onError、onCompleted、getExtra等回調(diào)方法,用于處理服務(wù)端響應(yīng)、異常情況、請(qǐng)求完成、返回字符形式的響應(yīng);

          • 客戶端通過(guò)在循環(huán)中調(diào)用 requestObserver 的 onNext 方法,發(fā)送請(qǐng)求消息到服務(wù)端;

            ??//?重點(diǎn)!!!!RPC調(diào)用發(fā)起
            ??StreamObserver?chatRequestStreamObserver?=?doubleStreamServiceStub.chat(chatResponseStreamObserver);
          • 當(dāng)RPC請(qǐng)求發(fā)送完成之后,通過(guò)調(diào)用 「onCompleted()」 通知服務(wù)端數(shù)據(jù)已經(jīng)發(fā)送完成;

          • 「需要特別注意」,客戶端發(fā)送請(qǐng)求的stub為流式stub為「DoubleStreamServiceGrpc.newStub(channel);」 而非newBlockingStub。

          案例運(yùn)行

          案例運(yùn)行完畢,我們運(yùn)行看看效果。

          本文開(kāi)始,運(yùn)行案例相關(guān)展示嘗試通過(guò)gif動(dòng)態(tài)展示:

          run.gif
          • 先后啟動(dòng)了服務(wù)端與客戶端,
          • 客戶端在一次調(diào)用中循環(huán)發(fā)送chat請(qǐng)求到服務(wù)端
          • 服務(wù)端邊處理請(qǐng)求邊響應(yīng)給客戶端
          • 可以看到服務(wù)端請(qǐng)求日志在一次調(diào)用中是邊處理邊打印的,客戶端日志也是逐步輸出的
          • 表明雙向流式RPC是異步的、高效的、非阻塞的。

          客戶端流式RPC機(jī)理

          稍微翻看源碼,可以發(fā)現(xiàn),實(shí)際上客戶端底層 onNext 方法調(diào)用了 ClientCall 的消息發(fā)送方法,代碼如下(CallToStreamObserverAdapter 類):

          private?static?class?CallToStreamObserverAdapter?extends?ClientCallStreamObserver?{
          ??private?boolean?frozen;
          ???private?final?ClientCall?call;
          ???private?Runnable?onReadyHandler;
          ???private?boolean?autoFlowControlEnabled?=?true;
          ?
          ???public?CallToStreamObserverAdapter(ClientCall?call)?{
          ???this.call?=?call;
          ???}
          ?
          ???private?void?freeze()?{
          ????this.frozen?=?true;
          ???}
          ?
          ???@Override
          ????public?void?onNext(T?value)?{
          ???call.sendMessage(value);
          ???}

          特別注意的是,對(duì)于雙向 Streaming 模式,只支持異步調(diào)用方式。

          總結(jié)

          本文我們主要了解了gRPC的雙向流式調(diào)用,對(duì)于這種流模式調(diào)用,可以充分利用 HTTP/2.0 協(xié)議的多路復(fù)用功能,實(shí)現(xiàn)在在一條 HTTP 鏈路上并行雙向傳輸數(shù)據(jù)(全雙工),它可以有效解決 HTTP/1.X 的數(shù)據(jù)單向傳輸問(wèn)題,在大幅減少 HTTP 連接的情況下充分利用單條鏈路的性能,其性能可以媲美傳統(tǒng)的 RPC 私有長(zhǎng)連接協(xié)議:即通過(guò)更少的鏈路,實(shí)現(xiàn)更高的性能。

          下篇文章,我們將為我們的grpc通信加入基于Nacos的服務(wù)注冊(cè)發(fā)現(xiàn)能力,不見(jiàn)不散。


          瀏覽 76
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  久草色香蕉视频 | 亚洲无码AV电影 | 日韩丝袜足交视频网站 | 中国黄色视频网站 | 亚洲欧美成人 |