<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Rocketmq源碼分析02:NameServer 啟動(dòng)流程

          共 16124字,需瀏覽 33分鐘

           ·

          2021-04-04 08:42

          注:本系列源碼分析基于RocketMq 4.8.0,gitee倉庫鏈接:https://gitee.com/funcy/rocketmq.git.

          本文我們來分析NameServer相關(guān)代碼,在正式分析源碼前,我們先來回憶下NameServer的功能:

          NameServer是一個(gè)非常簡(jiǎn)單的Topic路由注冊(cè)中心,其角色類似Dubbo中的zookeeper,支持Broker的動(dòng)態(tài)注冊(cè)與發(fā)現(xiàn)。主要包括兩個(gè)功能:

          • Broker管理,NameServer接受Broker集群的注冊(cè)信息并且保存下來作為路由信息的基本數(shù)據(jù)。然后提供心跳檢測(cè)機(jī)制,檢查Broker是否還存活;

          • 路由信息管理,每個(gè)NameServer將保存關(guān)于Broker集群的整個(gè)路由信息和用于客戶端查詢的隊(duì)列信息。然后ProducerConumser通過NameServer就可以知道整個(gè)Broker集群的路由信息,從而進(jìn)行消息的投遞和消費(fèi)。

          本文我們將通過源碼來分析NameServer的啟動(dòng)流程。

          1. 主方法:NamesrvStartup#main

          NameServer位于RocketMq項(xiàng)目的namesrv模塊下,主類是org.apache.rocketmq.namesrv.NamesrvStartup,代碼如下:

          public?class?NamesrvStartup?{

          ????...

          ????public?static?void?main(String[]?args)?{
          ????????main0(args);
          ????}

          ????public?static?NamesrvController?main0(String[]?args)?{
          ????????try?{
          ????????????//?創(chuàng)建?controller
          ????????????NamesrvController?controller?=?createNamesrvController(args);
          ????????????//?啟動(dòng)
          ????????????start(controller);
          ????????????String?tip?=?"The?Name?Server?boot?success.?serializeType="?
          ????????????????????+?RemotingCommand.getSerializeTypeConfigInThisServer();
          ????????????log.info(tip);
          ????????????System.out.printf("%s%n",?tip);
          ????????????return?controller;
          ????????}?catch?(Throwable?e)?{
          ????????????e.printStackTrace();
          ????????????System.exit(-1);
          ????????}

          ????????return?null;
          ????}

          ????...
          }

          可以看到,main()方法里的代碼還是相當(dāng)簡(jiǎn)單的,主要包含了兩個(gè)方法:

          • createNamesrvController(...):創(chuàng)建 controller
          • start(...):?jiǎn)?dòng)nameServer

          接下來我們就來分析這兩個(gè)方法了。

          2. 創(chuàng)建controllerNamesrvStartup#createNamesrvController

          public?static?NamesrvController?createNamesrvController(String[]?args)?throws?IOException,?JoranException?{
          ????//?省略解析命令行代碼
          ????...

          ????//?nameServer的相關(guān)配置
          ????final?NamesrvConfig?namesrvConfig?=?new?NamesrvConfig();
          ????//??nettyServer的相關(guān)配置
          ????final?NettyServerConfig?nettyServerConfig?=?new?NettyServerConfig();
          ????//?端口寫死了。。。
          ????nettyServerConfig.setListenPort(9876);
          ????if?(commandLine.hasOption('c'))?{
          ????????//?處理配置文件
          ????????String?file?=?commandLine.getOptionValue('c');
          ????????if?(file?!=?null)?{
          ????????????//?讀取配置文件,并將其加載到?properties?中
          ????????????InputStream?in?=?new?BufferedInputStream(new?FileInputStream(file));
          ????????????properties?=?new?Properties();
          ????????????properties.load(in);
          ????????????//?將?properties?里的屬性賦值到?namesrvConfig?與?nettyServerConfig
          ????????????MixAll.properties2Object(properties,?namesrvConfig);
          ????????????MixAll.properties2Object(properties,?nettyServerConfig);

          ????????????namesrvConfig.setConfigStorePath(file);

          ????????????System.out.printf("load?config?properties?file?OK,?%s%n",?file);
          ????????????in.close();
          ????????}
          ????}

          ????//?處理?-p?參數(shù),該參數(shù)用于打印nameServer、nettyServer配置,省略
          ????...

          ????//?將?commandLine?的所有配置設(shè)置到?namesrvConfig?中
          ????MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine),?namesrvConfig);
          ????//?檢查環(huán)境變量:ROCKETMQ_HOME
          ????if?(null?==?namesrvConfig.getRocketmqHome())?{
          ????????//?如果不設(shè)置?ROCKETMQ_HOME,就會(huì)在這里報(bào)錯(cuò)
          ????????System.out.printf("Please?set?the?%s?variable?in?your?environment?to?match?
          ????????????????the?location?of?the?RocketMQ?installation%n"
          ,?MixAll.ROCKETMQ_HOME_ENV);
          ????????System.exit(-2);
          ????}

          ????//?省略日志配置
          ????...

          ????//?創(chuàng)建一個(gè)controller
          ????final?NamesrvController?controller?=?
          ????????????new?NamesrvController(namesrvConfig,?nettyServerConfig);

          ????//?將當(dāng)前?properties?合并到項(xiàng)目的配置中,并且當(dāng)前?properties?會(huì)覆蓋項(xiàng)目中的配置
          ????controller.getConfiguration().registerConfig(properties);

          ????return?controller;
          }

          這個(gè)方法有點(diǎn)長(zhǎng),不過所做的事就兩件:

          1. 處理配置
          2. 創(chuàng)建NamesrvController實(shí)例

          2.1 處理配置

          咱們先簡(jiǎn)單地看下配置的處理。在我們啟動(dòng)項(xiàng)目中,可以使用-c /xxx/xxx.conf指定配置文件的位置,然后在createNamesrvController(...)方法中,通過如下代碼

          InputStream?in?=?new?BufferedInputStream(new?FileInputStream(file));
          properties?=?new?Properties();
          properties.load(in);

          將配置文件的內(nèi)容加載到properties對(duì)象中,然后調(diào)用MixAll.properties2Object(properties, namesrvConfig)方法將properties的屬性賦值給namesrvConfig,``MixAll.properties2Object(...)`代碼如下:

          public?static?void?properties2Object(final?Properties?p,?final?Object?object)?{
          ????Method[]?methods?=?object.getClass().getMethods();
          ????for?(Method?method?:?methods)?{
          ????????String?mn?=?method.getName();
          ????????if?(mn.startsWith("set"))?{
          ????????????try?{
          ????????????????String?tmp?=?mn.substring(4);
          ????????????????String?first?=?mn.substring(3,?4);
          ????????????????//?首字母小寫
          ????????????????String?key?=?first.toLowerCase()?+?tmp;
          ????????????????//?從Properties中獲取對(duì)應(yīng)的值
          ????????????????String?property?=?p.getProperty(key);
          ????????????????if?(property?!=?null)?{
          ????????????????????//?獲取值,并進(jìn)行相應(yīng)的類型轉(zhuǎn)換
          ????????????????????Class<?>[]?pt?=?method.getParameterTypes();
          ????????????????????if?(pt?!=?null?&&?pt.length?>?0)?{
          ????????????????????????String?cn?=?pt[0].getSimpleName();
          ????????????????????????Object?arg?=?null;
          ????????????????????????//?轉(zhuǎn)換成int
          ????????????????????????if?(cn.equals("int")?||?cn.equals("Integer"))?{
          ????????????????????????????arg?=?Integer.parseInt(property);
          ????????????????????????//?其他類型如long,double,float,boolean都是這樣轉(zhuǎn)換的,這里就省略了????
          ????????????????????????}?else?if?(...)?{
          ????????????????????????????...
          ????????????????????????}?else?{
          ????????????????????????????continue;
          ????????????????????????}
          ????????????????????????//?反射調(diào)用
          ????????????????????????method.invoke(object,?arg);
          ????????????????????}
          ????????????????}
          ????????????}?catch?(Throwable?ignored)?{
          ????????????}
          ????????}
          ????}
          }

          這個(gè)方法非常簡(jiǎn)單:

          1. 先獲取到object中的所有setXxx(...)方法
          2. 得到setXxx(...)中的Xxx
          3. 首字母小寫得到xxx
          4. properties獲取xxx屬性對(duì)應(yīng)的值,并根據(jù)setXxx(...)方法的參數(shù)類型進(jìn)行轉(zhuǎn)換
          5. 反射調(diào)用setXxx(...)方法進(jìn)行賦值

          這里之后,namesrvConfignettyServerConfig就賦值成功了。

          2.2 創(chuàng)建NamesrvController實(shí)例

          我們?cè)賮砜纯?code style="font-size:14px;color:rgb(30,107,184);background-color:rgba(27,31,35,.05);font-family:'Operator Mono', Consolas, Monaco, Menlo, monospace;">createNamesrvController(...)方法的第二個(gè)重要功能:創(chuàng)建NamesrvController實(shí)例.

          創(chuàng)建NamesrvController實(shí)例的代碼如下:

          final?NamesrvController?controller?=?new?NamesrvController(namesrvConfig,?nettyServerConfig);

          我們直接進(jìn)入NamesrvController的構(gòu)造方法:

          /**
          ?*?構(gòu)造方法,一系列的賦值操作
          ?*/

          public?NamesrvController(NamesrvConfig?namesrvConfig,?NettyServerConfig?nettyServerConfig)?{
          ????this.namesrvConfig?=?namesrvConfig;
          ????this.nettyServerConfig?=?nettyServerConfig;
          ????this.kvConfigManager?=?new?KVConfigManager(this);
          ????this.routeInfoManager?=?new?RouteInfoManager();
          ????this.brokerHousekeepingService?=?new?BrokerHousekeepingService(this);
          ????this.configuration?=?new?Configuration(log,?this.namesrvConfig,?this.nettyServerConfig);
          ????this.configuration.setStorePathFromConfig(this.namesrvConfig,?"configStorePath");
          }

          構(gòu)造方法里只是一系列的賦值操作,沒做什么實(shí)質(zhì)性的工作,就先不管了。

          3. 啟動(dòng)nameServerNamesrvStartup#start

          讓我們回到一開始的NamesrvStartup#main0方法,

          public?static?NamesrvController?main0(String[]?args)?{

          ????try?{
          ????????NamesrvController?controller?=?createNamesrvController(args);
          ????????start(controller);
          ????????...
          ????}?catch?(Throwable?e)?{
          ????????e.printStackTrace();
          ????????System.exit(-1);
          ????}

          ????return?null;
          }

          接下來我們來看看start(controller)方法中做了什么,進(jìn)入NamesrvStartup#start方法:

          public?static?NamesrvController?start(final?NamesrvController?controller)?throws?Exception?{
          ????if?(null?==?controller)?{
          ????????throw?new?IllegalArgumentException("NamesrvController?is?null");
          ????}
          ????//?初始化
          ????boolean?initResult?=?controller.initialize();
          ????if?(!initResult)?{
          ????????controller.shutdown();
          ????????System.exit(-3);
          ????}
          ????//?關(guān)閉鉤子,可以在關(guān)閉前進(jìn)行一些操作
          ????Runtime.getRuntime().addShutdownHook(new?ShutdownHookThread(log,?new?Callable<Void>()?{
          ????????@Override
          ????????public?Void?call()?throws?Exception?{
          ????????????controller.shutdown();
          ????????????return?null;
          ????????}
          ????}));
          ????//?啟動(dòng)
          ????controller.start();

          ????return?controller;
          }

          start(...)方法的邏輯也十分簡(jiǎn)潔,主要包含3個(gè)操作:

          1. 初始化,想必是做一些啟動(dòng)前的操作
          2. 添加關(guān)閉鉤子,所謂的關(guān)閉鉤子,可以理解為一個(gè)線程,可以用來監(jiān)聽jvm的關(guān)閉事件,在jvm真正關(guān)閉前,可以進(jìn)行一些處理操作,這里的關(guān)閉前的處理操作就是controller.shutdown()方法所做的事了,所做的事也很容易想到,無非就是關(guān)閉線程池、關(guān)閉已經(jīng)打開的資源等,這里我們就不深究了
          3. 啟動(dòng)操作,這應(yīng)該就是真正啟動(dòng)nameServer服務(wù)了

          接下來我們主要來探索初始化與啟動(dòng)操作流程。

          3.1 初始化:NamesrvController#initialize

          初始化的處理方法是NamesrvController#initialize,代碼如下:

          public?boolean?initialize()?{
          ????//?加載?kv?配置
          ????this.kvConfigManager.load();
          ????//?創(chuàng)建?netty?遠(yuǎn)程服務(wù)
          ????this.remotingServer?=?new?NettyRemotingServer(this.nettyServerConfig,?
          ????????????this.brokerHousekeepingService);
          ????//?netty?遠(yuǎn)程服務(wù)線程
          ????this.remotingExecutor?=?Executors.newFixedThreadPool(
          ????????????nettyServerConfig.getServerWorkerThreads(),?
          ????????????new?ThreadFactoryImpl("RemotingExecutorThread_"));
          ????//?注冊(cè),就是把?remotingExecutor?注冊(cè)到?remotingServer
          ????this.registerProcessor();

          ????//?開啟定時(shí)任務(wù),每隔10s掃描一次broker,移除不活躍的broker
          ????this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{
          ????????@Override
          ????????public?void?run()?{
          ????????????NamesrvController.this.routeInfoManager.scanNotActiveBroker();
          ????????}
          ????},?5,?10,?TimeUnit.SECONDS);

          ????//?省略打印kv配置的定時(shí)任務(wù)
          ????...

          ????//?Tls安全傳輸,我們不關(guān)注
          ????if?(TlsSystemConfig.tlsMode?!=?TlsMode.DISABLED)?{
          ????????...
          ????}

          ????return?true;
          }

          這個(gè)方法所做的事很明了,代碼中都已經(jīng)注釋了,代碼看著多,實(shí)際干的就兩件事:

          1. 處理netty相關(guān):創(chuàng)建遠(yuǎn)程服務(wù)與工作線程
          2. 開啟定時(shí)任務(wù):移除不活躍的broker

          什么是NettyRemotingServer呢?在本文開篇介紹NamerServer的功能時(shí),提到NameServer是一個(gè)簡(jiǎn)單的注冊(cè)中心,這個(gè)NettyRemotingServer就是對(duì)外開放的入口,用來接收broker的注冊(cè)消息的,當(dāng)然還會(huì)處理一些其他消息,我們后面會(huì)分析到。

          1. 創(chuàng)建NettyRemotingServer

          我們先來看看NettyRemotingServer的創(chuàng)建過程:

          public?NettyRemotingServer(final?NettyServerConfig?nettyServerConfig,
          ????????final?ChannelEventListener?channelEventListener)
          ?
          {
          ????super(nettyServerConfig.getServerOnewaySemaphoreValue(),?
          ????????????nettyServerConfig.getServerAsyncSemaphoreValue());
          ????this.serverBootstrap?=?new?ServerBootstrap();
          ????this.nettyServerConfig?=?nettyServerConfig;
          ????this.channelEventListener?=?channelEventListener;

          ????int?publicThreadNums?=?nettyServerConfig.getServerCallbackExecutorThreads();
          ????if?(publicThreadNums?<=?0)?{
          ????????publicThreadNums?=?4;
          ????}

          ????//?創(chuàng)建?publicExecutor
          ????this.publicExecutor?=?Executors.newFixedThreadPool(publicThreadNums,?new?ThreadFactory()?{
          ????????private?AtomicInteger?threadIndex?=?new?AtomicInteger(0);
          ????????@Override
          ????????public?Thread?newThread(Runnable?r)?{
          ????????????return?new?Thread(r,?"NettyServerPublicExecutor_"?
          ????????????????????+?this.threadIndex.incrementAndGet());
          ????????}
          ????});

          ????//?判斷是否使用?epoll
          ????if?(useEpoll())?{
          ????????//?boss
          ????????this.eventLoopGroupBoss?=?new?EpollEventLoopGroup(1,?new?ThreadFactory()?{
          ????????????private?AtomicInteger?threadIndex?=?new?AtomicInteger(0);
          ????????????@Override
          ????????????public?Thread?newThread(Runnable?r)?{
          ????????????????return?new?Thread(r,?String.format("NettyEPOLLBoss_%d",?
          ????????????????????this.threadIndex.incrementAndGet()));
          ????????????}
          ????????});
          ????????//?worker
          ????????this.eventLoopGroupSelector?=?new?EpollEventLoopGroup(
          ????????????????nettyServerConfig.getServerSelectorThreads(),?new?ThreadFactory()?{
          ????????????private?AtomicInteger?threadIndex?=?new?AtomicInteger(0);
          ????????????private?int?threadTotal?=?nettyServerConfig.getServerSelectorThreads();

          ????????????@Override
          ????????????public?Thread?newThread(Runnable?r)?{
          ????????????????return?new?Thread(r,?String.format("NettyServerEPOLLSelector_%d_%d",?
          ????????????????????threadTotal,?this.threadIndex.incrementAndGet()));
          ????????????}
          ????????});
          ????}?else?{
          ????????//?這里也是創(chuàng)建了兩個(gè)線程
          ????????...
          ????}
          ????//?加載ssl上下文
          ????loadSslContext();
          }

          整個(gè)方法下來,其實(shí)就是做了一些賦值操作,我們挑重點(diǎn)講:

          1. serverBootstrap:熟悉netty的小伙伴應(yīng)該對(duì)這個(gè)很熟悉了,這個(gè)就是netty服務(wù)端的啟動(dòng)類
          2. publicExecutor:這里創(chuàng)建了一個(gè)名為publicExecutor線程池,暫時(shí)并不知道這個(gè)線程有啥作用,先混個(gè)臉熟吧
          3. eventLoopGroupBosseventLoopGroupSelector線程組:熟悉netty的小伙伴應(yīng)該對(duì)這兩個(gè)線程很熟悉了,這就是netty用來處理連接事件與讀寫事件的線程了,eventLoopGroupBoss對(duì)應(yīng)的是netty的boss線程組,eventLoopGroupSelector對(duì)應(yīng)的是worker線程組

          到這里,netty服務(wù)的準(zhǔn)備工作本完成了。

          2. 創(chuàng)建netty服務(wù)線程池

          讓我們?cè)倩氐?code style="font-size:14px;color:rgb(30,107,184);background-color:rgba(27,31,35,.05);font-family:'Operator Mono', Consolas, Monaco, Menlo, monospace;">NamesrvController#initialize方法,NettyRemotingServer創(chuàng)建完成后,接著就是netty遠(yuǎn)程服務(wù)線程池了:

          this.remotingExecutor?=?Executors.newFixedThreadPool(
          ????nettyServerConfig.getServerWorkerThreads(),?
          ????new?ThreadFactoryImpl("RemotingExecutorThread_"));

          創(chuàng)建完成線程池后,接著就是注冊(cè)了,也就是registerProcessor方法所做的工作:

          this.registerProcessor();

          registerProcessor()中 ,會(huì)把當(dāng)前的 NamesrvController 注冊(cè)到 remotingServer中:

          private?void?registerProcessor()?{
          ????if?(namesrvConfig.isClusterTest())?{
          ????????this.remotingServer.registerDefaultProcessor(
          ????????????new?ClusterTestRequestProcessor(this,?namesrvConfig.getProductEnvName()),
          ????????????this.remotingExecutor);
          ????}?else?{
          ????????//?注冊(cè)操作
          ????????this.remotingServer.registerDefaultProcessor(
          ????????????new?DefaultRequestProcessor(this),?this.remotingExecutor);
          ????}
          }

          最終注冊(cè)到為NettyRemotingServerdefaultRequestProcessor屬性:

          @Override
          public?void?registerDefaultProcessor(NettyRequestProcessor?processor,?ExecutorService?executor)?{
          ????this.defaultRequestProcessor?
          ????????????=?new?Pair<NettyRequestProcessor,?ExecutorService>(processor,?executor);
          }

          好了,到這里NettyRemotingServer相關(guān)的配置就準(zhǔn)備完成了,這個(gè)過程中一共準(zhǔn)備了4個(gè)線程池:

          1. publicExecutor:暫時(shí)不知道做啥的,后面遇到了再分析
          2. eventLoopGroupBoss:處理netty連接事件的線程組
          3. eventLoopGroupSelector:處理netty讀寫事件的線程池
          4. remotingExecutor:暫時(shí)不知道做啥的,后面遇到了再分析
          3. 創(chuàng)建定時(shí)任務(wù)

          準(zhǔn)備完netty相關(guān)配置后,接著代碼中啟動(dòng)了一個(gè)定時(shí)任務(wù):

          this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{
          ????@Override
          ????public?void?run()?{
          ????????NamesrvController.this.routeInfoManager.scanNotActiveBroker();
          ????}
          },?5,?10,?TimeUnit.SECONDS);

          這個(gè)定時(shí)任務(wù)位于NamesrvController#initialize方法中,每10s執(zhí)行一次,任務(wù)內(nèi)容由RouteInfoManager#scanNotActiveBroker提供,它所做的主要工作是監(jiān)聽broker的上報(bào)信息,及時(shí)移除不活躍的broker,關(guān)于源碼的具體分析,我們后面再詳細(xì)分析。

          3.2 啟動(dòng):NamesrvController#start

          分析完NamesrvController的初始化流程后,讓我們回到NamesrvStartup#start方法:

          public?static?NamesrvController?start(final?NamesrvController?controller)?throws?Exception?{

          ????...
          ????
          ????//?啟動(dòng)
          ????controller.start();

          ????return?controller;
          }

          接下來,我們來看看NamesrvController的啟動(dòng)流程:

          public?void?start()?throws?Exception?{
          ????//?啟動(dòng)nettyServer
          ????this.remotingServer.start();
          ????//?監(jiān)聽tls配置文件的變化,不關(guān)注
          ????if?(this.fileWatchService?!=?null)?{
          ????????this.fileWatchService.start();
          ????}
          }

          這個(gè)方法主要調(diào)用了NettyRemotingServer#start,我們跟進(jìn)去:

          public?void?start()?{
          ????...

          ????ServerBootstrap?childHandler?=
          ????????//?在?NettyRemotingServer#init?中準(zhǔn)備的兩個(gè)線程組
          ????????this.serverBootstrap.group(this.eventLoopGroupBoss,?this.eventLoopGroupSelector)
          ????????????.channel(useEpoll()???EpollServerSocketChannel.class?:?NioServerSocketChannel.class)

          ????????????//?省略?option(...)與childOption(...)方法的配置
          ????????????...
          ????????????//?綁定ip與端口
          ????????????.localAddress(new?InetSocketAddress(this.nettyServerConfig.getListenPort()))
          ????????????.childHandler(new?ChannelInitializer<SocketChannel>()?
          {
          ????????????????@Override
          ????????????????public?void?initChannel(SocketChannel?ch)?throws?Exception?{
          ????????????????????ch.pipeline()
          ????????????????????????.addLast(defaultEventExecutorGroup,?
          ????????????????????????????HANDSHAKE_HANDLER_NAME,?handshakeHandler)
          ????????????????????????.addLast(defaultEventExecutorGroup,
          ????????????????????????????encoder,
          ????????????????????????????new?NettyDecoder(),
          ????????????????????????????new?IdleStateHandler(0,?0,?
          ????????????????????????????????nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
          ????????????????????????????connectionManageHandler,
          ????????????????????????????serverHandler
          ????????????????????????);
          ????????????????}
          ????????????});

          ????if?(nettyServerConfig.isServerPooledByteBufAllocatorEnable())?{
          ????????childHandler.childOption(ChannelOption.ALLOCATOR,?PooledByteBufAllocator.DEFAULT);
          ????}

          ????try?{
          ????????ChannelFuture?sync?=?this.serverBootstrap.bind().sync();
          ????????InetSocketAddress?addr?=?(InetSocketAddress)?sync.channel().localAddress();
          ????????this.port?=?addr.getPort();
          ????}?catch?(InterruptedException?e1)?{
          ????????throw?new?RuntimeException("this.serverBootstrap.bind().sync()?InterruptedException",?e1);
          ????}

          ????...
          }

          這個(gè)方法中,主要處理了NettyRemotingServer的啟動(dòng),關(guān)于其他一些操作并非我們關(guān)注的重點(diǎn),就先忽略了。

          可以看到,這個(gè)方法里就是處理了一個(gè)netty的啟動(dòng)流程,關(guān)于netty的相關(guān)操作,非本文重點(diǎn),這里就不多作說明了。這里需要指出的是,在netty中,如果Channel是出現(xiàn)了連接/讀/寫等事件,這些事件會(huì)經(jīng)過Pipeline上的ChannelHandler上進(jìn)行流轉(zhuǎn),NettyRemotingServer添加的ChannelHandler如下:

          ch.pipeline()
          ????.addLast(defaultEventExecutorGroup,?
          ????????HANDSHAKE_HANDLER_NAME,?handshakeHandler)
          ????.addLast(defaultEventExecutorGroup,
          ????????encoder,
          ????????new?NettyDecoder(),
          ????????new?IdleStateHandler(0,?0,?
          ????????????nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
          ????????connectionManageHandler,
          ????????serverHandler
          ????);

          這些ChannelHandler只要分為幾類:

          1. handshakeHandler:處理握手操作,用來判斷tls的開啟狀態(tài)
          2. encoder/NettyDecoder:處理報(bào)文的編解碼操作
          3. IdleStateHandler:處理心跳
          4. connectionManageHandler:處理連接請(qǐng)求
          5. serverHandler:處理讀寫請(qǐng)求

          這里我們重點(diǎn)關(guān)注的是serverHandler,這個(gè)ChannelHandler就是用來處理broker注冊(cè)消息、producer/consumer獲取topic消息的,這也是我們接下來要分析的重點(diǎn)。

          執(zhí)行完NamesrvController#startNameServer就可以對(duì)外提供連接服務(wù)了。

          4. 總結(jié)

          本文主要分析了NameServer的啟動(dòng)流程,整個(gè)啟動(dòng)流程分為3步:

          1. 創(chuàng)建controller:這一步主要是解析nameServer的配置并完成賦值操作
          2. 初始化controller:主要?jiǎng)?chuàng)建了NettyRemotingServer對(duì)象、netty服務(wù)線程池、定時(shí)任務(wù)
          3. 啟動(dòng)controller:就是啟動(dòng)netty 服務(wù)

          好了,本文的分析就到這里了,下篇文章我們繼續(xù)分析NameServer


          限于作者個(gè)人水平,文中難免有錯(cuò)誤之處,歡迎指正!原創(chuàng)不易,商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請(qǐng)注明出處。

          本文首發(fā)于微信公眾號(hào) Java技術(shù)探秘,如果您喜歡本文,歡迎關(guān)注該公眾號(hào),讓我們一起在技術(shù)的世界里探秘吧!


          瀏覽 91
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  天天干在线观看 | 免费高清在线观看黄色视频 | 91av成人网站 | 日韩欧美mv国产 | 婷五月丁香乱伦电影网站 |