<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Nacos3# 服務(wù)注冊與發(fā)現(xiàn)服務(wù)端啟動(dòng)源碼解析

          共 13236字,需瀏覽 27分鐘

           ·

          2021-06-07 18:07


          引言

          本文從gRPC的.proto文件解讀其暴露的服務(wù),由此生成gRPC的客戶端/服務(wù)端存根。進(jìn)而分析服務(wù)端加載啟動(dòng)過程。最近家里事情較多,本文短了點(diǎn),大伙隨便看看。

          一、內(nèi)容提要

          gRPC Service.proto解讀

          • 暴露用于服務(wù)端到客戶端流式RPC的服務(wù)RequestStream#requestStream
          • 暴露用于簡單RPC調(diào)用的服務(wù)Request#request
          • 暴露用于雙向流式RPC調(diào)用的服務(wù)BiRequestStream#requestBiStream
          • 三種方式入?yún)⒕鶠镻ayload

          Server啟動(dòng)流程

          • 定義了攔截器獲取客戶端的ip、port、connectId等
          • 裝配了.proto定義的兩種調(diào)用方式,用于接受客戶端請求 簡單調(diào)用方式Request#request和雙向流調(diào)用方式BiRequestStream#biRequestStream
          • 設(shè)置了服務(wù)啟動(dòng)端口、線程、接受消息的限制、壓縮/解壓縮類型

          二、gRPC Service .proto解讀

          客戶端和服務(wù)端通過gRPC通信,基于.proto生成響應(yīng)的通信代碼,那先看看.proto暴露了哪些服務(wù)。

          api/proto/nacos_grpc_service.proto

          syntax = "proto3"// 注解@1 

          // 注解@2
          import "google/protobuf/any.proto";
          import "google/protobuf/timestamp.proto";

          option java_multiple_files = true
          option java_package = "com.alibaba.nacos.api.grpc.auto"// 注解@3

          message Metadata { // 注解@4
            string type = 3;
            string clientIp = 8;
            map<string, string> headers = 7
          }

          message Payload { // 注解@5
            Metadata metadata = 2;
            google.protobuf.Any body = 3;
          }

          service RequestStream { // 注解@6
            // build a streamRequest
            rpc requestStream (Payload) returns (stream Payload) {
            }
          }

          service Request { // 注解@7
            // Sends a commonRequest
            rpc request (Payload) returns (Payload) {
            }
          }

          service BiRequestStream { // 注解@8
            // Sends a commonRequest
            rpc requestBiStream (stream Payload) returns (stream Payload) {
            }
          }

          注解@1  定義proto的版本

          注解@2 導(dǎo)入其他的.proto文件

          注解@3 option可選的;指java類生成所在的包

          注解@4  定義Metadata消息格式,生成對應(yīng)Metadata類,包含了字符串類型type和clientIp、map類型的headers

          注解@5 定義Payload消息格式,生成對應(yīng)Payload類,包含了Metadata的引用、Any類型(對應(yīng)java中Object)body

          注解@6 定義service RequestStream會生產(chǎn)客戶端和服務(wù)端存根用于grpc通信,暴露的服務(wù)為requestStream,類型為:服務(wù)端到客戶端流式RPC,接受Payload對象參數(shù),返回批量Payload數(shù)據(jù)

          注解@7  定義service Request會生產(chǎn)客戶端和服務(wù)端存根用于grpc通信,暴露的服務(wù)為request,類型為:簡單RPC調(diào)用,接受Payload參數(shù)返回Payload類型對象

          注解@8 定義service BiRequestStream會生產(chǎn)客戶端和服務(wù)端存根用于grpc通信,暴露的服務(wù)為requestBiStream,類型為:雙向流式RPC,接受批量Payload類型數(shù)據(jù),返回批量Payload類型數(shù)據(jù)

          小結(jié): 我們從.proto的描述中能夠發(fā)現(xiàn),nacos server將暴露三個(gè)服務(wù)。@1 RequestStream#requestStream用于服務(wù)端到客戶端流式RPC;@2 Request#request用于簡單RPC調(diào)用;@3 BiRequestStream#requestBiStream用于雙向流式RPC調(diào)用。三種的出入?yún)⒕鶠镻ayload。


          三、Server啟動(dòng)流程


          坐標(biāo)com.alibaba.nacos.core.remote.BaseRpcServer,在nacos啟動(dòng)時(shí)執(zhí)行

          @PostConstruct
          public void start() throws Exception {
            String serverName = getClass().getSimpleName();
             Loggers.REMOTE.info("Nacos {} Rpc server starting at port {}", serverName, getServicePort());

             startServer();
          }

          源碼解讀

          @Override
          public void startServer() throws Exception {
            final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();

            // 注解@9
            ServerInterceptor serverInterceptor = new ServerInterceptor() {
              @Override
              public <T, S> ServerCall.Listener<T> interceptCall(ServerCall<T, S> call, Metadata headers,
                                                                 ServerCallHandler<T, S> next)
           
          {
                Context ctx = Context.current()
                  .withValue(CONTEXT_KEY_CONN_ID, call.getAttributes().get(TRANS_KEY_CONN_ID))
                  .withValue(CONTEXT_KEY_CONN_REMOTE_IP, call.getAttributes().get(TRANS_KEY_REMOTE_IP))
                  .withValue(CONTEXT_KEY_CONN_REMOTE_PORT, call.getAttributes().get(TRANS_KEY_REMOTE_PORT))
                  .withValue(CONTEXT_KEY_CONN_LOCAL_PORT, call.getAttributes().get(TRANS_KEY_LOCAL_PORT));
                if (REQUEST_BI_STREAM_SERVICE_NAME.equals(call.getMethodDescriptor().getServiceName())) {
                  Channel internalChannel = getInternalChannel(call);
                  ctx = ctx.withValue(CONTEXT_KEY_CHANNEL, internalChannel);
                }
                return Contexts.interceptCall(ctx, call, headers, next);
              }
            };
            // 注解@10
            addServices(handlerRegistry, serverInterceptor);
            // 注解@11
            server = ServerBuilder.forPort(getServicePort()).executor(getRpcExecutor())
              .maxInboundMessageSize(getInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry)
              .compressorRegistry(CompressorRegistry.getDefaultInstance())
              .decompressorRegistry(DecompressorRegistry.getDefaultInstance())
              .addTransportFilter(new ServerTransportFilter() {
                @Override
                public Attributes transportReady(Attributes transportAttrs) {  // transport/connection 建立回調(diào)
                  InetSocketAddress remoteAddress = (InetSocketAddress) transportAttrs
                    .get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
                  InetSocketAddress localAddress = (InetSocketAddress) transportAttrs
                    .get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
                  int remotePort = remoteAddress.getPort();
                  int localPort = localAddress.getPort();
                  String remoteIp = remoteAddress.getAddress().getHostAddress();
                  Attributes attrWrapper = transportAttrs.toBuilder()
                    .set(TRANS_KEY_CONN_ID, System.currentTimeMillis() + "_" + remoteIp + "_" + remotePort)
                    .set(TRANS_KEY_REMOTE_IP, remoteIp).set(TRANS_KEY_REMOTE_PORT, remotePort)
                    .set(TRANS_KEY_LOCAL_PORT, localPort).build();
                  String connectionId = attrWrapper.get(TRANS_KEY_CONN_ID);
                  Loggers.REMOTE_DIGEST.info("Connection transportReady,connectionId = {} ", connectionId);
                  return attrWrapper;

                }

                @Override
                public void transportTerminated(Attributes transportAttrs) // transport/connection 關(guān)閉回調(diào)
                  String connectionId = null;
                  try {
                    connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
                  } catch (Exception e) {
                    // Ignore
                  }
                  if (StringUtils.isNotBlank(connectionId)) {
                    Loggers.REMOTE_DIGEST
                      .info("Connection transportTerminated,connectionId = {} ", connectionId);
                    connectionManager.unregister(connectionId);
                  }
                }
              }).build();
           
            
            // 注解@12
            server.start();
          }

          注解@9 定義server的攔截器,可以從請求中獲取connection id、ip、port等

          注解@10 添加處理服務(wù)

          private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) {

            // unary common call register.
            // 注解@10.1
            final MethodDescriptor<Payload, Payload> unaryPayloadMethod = MethodDescriptor.<Payload, Payload>newBuilder()
              .setType(MethodDescriptor.MethodType.UNARY)  // 服務(wù)調(diào)用方式UNARY
              .setFullMethodName(MethodDescriptor.generateFullMethodName(REQUEST_SERVICE_NAME, REQUEST_METHOD_NAME)) // 服務(wù)的接口名和方法名「request」
              .setRequestMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())) // 請求序列化類
              .setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build(); // 響應(yīng)序列化類

            // 注解@10.2
            final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls
              .asyncUnaryCall((request, responseObserver) -> {
                grpcCommonRequestAcceptor.request(request, responseObserver);
              });

            // 注解@10.3
            final ServerServiceDefinition serviceDefOfUnaryPayload = ServerServiceDefinition.builder(REQUEST_SERVICE_NAME)
              .addMethod(unaryPayloadMethod, payloadHandler).build();

            // 注解@10.4
            handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnaryPayload, serverInterceptor));

            // bi stream register.
            // 注解@10.5
            final ServerCallHandler<Payload, Payload> biStreamHandler = ServerCalls.asyncBidiStreamingCall(
              (responseObserver) -> grpcBiStreamRequestAcceptor.requestBiStream(responseObserver));

            // 注解@10.6
            final MethodDescriptor<Payload, Payload> biStreamMethod = MethodDescriptor.<Payload, Payload>newBuilder()
              .setType(MethodDescriptor.MethodType.BIDI_STREAMING).setFullMethodName(MethodDescriptor
                                                                                     .generateFullMethodName(REQUEST_BI_STREAM_SERVICE_NAME, REQUEST_BI_STREAM_METHOD_NAME))
              .setRequestMarshaller(ProtoUtils.marshaller(Payload.newBuilder().build()))
              .setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();

            // 注解@10.7
            final ServerServiceDefinition serviceDefOfBiStream = ServerServiceDefinition
              .builder(REQUEST_BI_STREAM_SERVICE_NAME).addMethod(biStreamMethod, biStreamHandler).build();


            // 注解@10.8
            handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfBiStream, serverInterceptor));

          }

          注解@10.1 構(gòu)造MethodDescriptor,包括:服務(wù)調(diào)用方式簡單RPC即UNARY、服務(wù)的接口名和方法名、請求序列化類、響應(yīng)序列化類

          注解@10.2 服務(wù)接口處理類,接受到request請求將調(diào)用執(zhí)行

          注解@10.3 構(gòu)建暴露的服務(wù)「Request」

          注解@10.4 注冊到內(nèi)部的注冊中心(Registry)中,可以根據(jù)服務(wù)定義信息查詢實(shí)現(xiàn)類(普通對象request/response調(diào)用)

          注解@10.5 服務(wù)接口處理類,接收到biRequestStream請求將調(diào)用執(zhí)行

          注解@10.6 構(gòu)造MethodDescriptor,包括:服務(wù)雙向流調(diào)用方式BIDI_STREAMING、服務(wù)的接口名和方法名、請求序列化類、響應(yīng)序列化類

          注解@10.7 構(gòu)建暴露的服務(wù)「BiRequestStream」

          注解@10.8 注冊到內(nèi)部的注冊中心(Registry)中,可以根據(jù)服務(wù)定義信息查詢實(shí)現(xiàn)類(雙向流調(diào)用)

          注解@11 設(shè)置server啟動(dòng)的端口(默認(rèn)為 8848 + 1001 = 9849),getRpcExecutor線程執(zhí)行器(線程數(shù)默認(rèn)為 = 處理器核數(shù)*16) ,maxInboundMessageSize最大限制為10M,壓縮解壓縮使用gzip。

          注解@12 注冊發(fā)現(xiàn)server啟動(dòng)(grpc)

          小結(jié): server啟動(dòng)過程中主要干了三件事 @1定義了攔截器獲取客戶端的ip、port、connectId等;@2裝配了.proto定義的兩種調(diào)用方式,簡單調(diào)用方式Request#request和雙向流調(diào)用方式BiRequestStream#biRequestStream;@3設(shè)置了服務(wù)啟動(dòng)端口、線程、接受消息的限制、壓縮/解壓縮類型。

          瀏覽 103
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  婷婷导航| 日韩麻豆 | 久久久av影院 | 天堂网2020 | 五月丁香婷婷色综合 |