<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          NioServerSocketChannel的注冊源碼解析

          共 11659字,需瀏覽 24分鐘

           ·

          2021-07-13 11:55

          有道無術(shù),術(shù)尚可求也!有術(shù)無道,止于術(shù)!

          我們上一章分析了Netty中NioServerSocketChaennl的創(chuàng)建于初始化,本章節(jié)將繼續(xù)分析NioServerSocketChannel的分析,NioServerSocketChannel是Netty官方封裝的一個通道對象,旨用來代替或者包裝JDK原生的SocketChannel對象,那么他是如何講NioServerSocketChannel于JDK的NIO相關(guān)代碼關(guān)聯(lián)起來的呢?

          一、源碼入口尋找

          我們上一節(jié)課主要分析的源碼方法是initAndRegister方法,其實從名字可以看出來,這里是做通道的初始化于注冊的,我們繼續(xù)回到這個方法,該方法的尋找,參照上一章節(jié):

          AbstractBootstrap#initAndRegister

          我們跳過上節(jié)課已經(jīng)分析的代碼,直接來到注冊相關(guān)的邏輯:

          ChannelFuture regFuture = config().group().register(channel);

          我們逐個方法進行分析:

          config()
          image-20210429084738842

          現(xiàn)在我們創(chuàng)建的ServerBootstrap,所以為什么選這個我就不多說了:

          private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);

          我們可以看到,他返回的是這個對象,該對象是再創(chuàng)建ServerBootstrap的時候自動創(chuàng)建的,我們看,他構(gòu)造方法里面穿了一個this,證明他持有一個ServerBootstrap的引用,這代表著他可以通過這個對象,獲取ServerBootstrap內(nèi)所有的屬性和方法!獲取到這個類之后干嘛了呢?

          config().group()

          估計大家很多都已經(jīng)猜出來了,我們直接點進group里面去驗證一下:

          @SuppressWarnings("deprecation")
          public final EventLoopGroup group() {
          return bootstrap.group();
          }

          該代碼是獲取到了我們再構(gòu)建ServerBootstrap的時候設(shè)置的bossGroup對象,有興趣的可以追一下,這里比較簡單就不做太多的闡述了,我們繼續(xù)回到主線,

          config().group().register(channel);

          我們通過上述代碼的分析,知道了group方法返回的是NioEventLoopGroup,我們進入到register方法:

          image-20210429090641133

          我們發(fā)現(xiàn)這里并沒有NioEventLoopGroup,但是通過前幾章我們的學習,我們知道NioEventLoopGroup是MultithreadEventLoopGroup的子類,所以我們子類沒有往父類找,我們進入到MultithreadEventLoopGroup源碼里面:

          @Override
          public ChannelFuture register(Channel channel) {
          //一般來說這里獲取的NioEventLoop 他有繼承與 SingleThreadEventLoop
          return next().register(channel);
          }

          在這里,我們看到了一個我們前面分析過得代碼,next(),他調(diào)用的是chooser.next();, chooser是我們在構(gòu)建NioEventLoopGroup的時候創(chuàng)建的一個執(zhí)行器的選擇器,next方法的功能是輪訓的返回一個線程執(zhí)行器:NioEventLoop!記不太清的同學可以回頭看NioEventLoopGroup初始化源碼解析的那一章代碼!

          現(xiàn)在我們根據(jù)前幾章的基礎(chǔ),我們知道了next()方法返回的是一個NioEventLoop類,我們進入到register()方法查看:

          image-20210429095521189

          但是,我們發(fā)現(xiàn)NioEventLoop相關(guān)的實現(xiàn),但是我們根據(jù)前面所學,我們可以知道,NioEventLoop的父類是SingleThreadEventLoop,所以我們進入到 SingleThreadEventLoop#register(io.netty.channel.Channel):

          @Override
          public ChannelFuture register(Channel channel) {
          //調(diào)用本身的注冊方法
          return register(new DefaultChannelPromise(channel, this));
          }

          //沒什么可說的繼續(xù)往下追
          @Override
          public ChannelFuture register(final ChannelPromise promise) {
          ObjectUtil.checkNotNull(promise, "promise");
          promise.channel().unsafe().register(this, promise);
          return promise;
          }

          我們一定能夠猜到,這里的主要代碼是:promise.channel().unsafe().register(this, promise);

          我們上一章分析過 unsafe是 NioMessageUnsafe, 但是register卻沒有他的實現(xiàn):

          image-20210429100442771

          我們還是需要往父類追,進入到io.netty.channel.AbstractChannel.AbstractUnsafe#register(this, promise):

          我們這里先關(guān)注一下參數(shù) :

          this: 傳入的是他本身,他本身是個什么 NioEventLoop,也就是說,他傳入了一個執(zhí)行器

          promise:NioServerSocketChannel的包裝對象

          我們進入到 register方法中,分析主要代碼:

          @Override
          public final void register(EventLoop eventLoop, final ChannelPromise promise) {
          ......................暫時忽略不必要代碼.............................
          AbstractChannel.this.eventLoop = eventLoop;
          //注意此時的thread = null 所以返回false
          if (eventLoop.inEventLoop()) {
          //實際的注冊 注冊selector 觸發(fā) handlerAdded事件和 channelRegistered事件
          register0(promise);
          } else {
          .......................暫時忽略不必要代碼......................
          }
          }
          AbstractChannel.this.eventLoop = eventLoop;

          首先我們將上一步獲取的執(zhí)行器保存在NioServerSocketChannel中!  這行代碼有力的證明了,每一個Channel綁定一個NioEventLoop對象!

          if (eventLoop.inEventLoop()) {
          //實際的注冊 注冊selector 觸發(fā) handlerAdded事件和 channelRegistered事件
          register0(promise);
          }

          注意:這里我需要澄清一點,真實的調(diào)試過程中,并不會走這個分支,而是會走else分支異步進行注冊,這里為了更方便大家理解,我就依照if分支進行源碼分析,其實沒有太大變化,都是調(diào)用register0方法進行注冊,只不過一個同步一個異步,關(guān)于異步,是Netty中及其重要的一個知識點,我將放到后面單獨開一章進行講解!

          我們進入到register0源碼里面:

          private void register0(ChannelPromise promise) {
          try {
          ..............忽略代碼..................
          ]
          //實際的注冊 調(diào)用jdk底層的數(shù)據(jù)注冊selector
          // 調(diào)用 JDK 底層的 register() 進行注冊
          //io.netty.channel.nio.AbstractNioChannel.doRegister
          doRegister();
          neverRegistered = false;
          registered = true;

          //通知管道 傳播handlerAdded事件
          //觸發(fā) handlerAdded 事件 觸發(fā)任務(wù) add事件
          pipeline.invokeHandlerAddedIfNeeded();

          safeSetSuccess(promise);
          //通知管道 傳播channelRegistered事件
          // 觸發(fā) channelRegistered 事件
          pipeline.fireChannelRegistered();
          // 如果從未注冊過頻道,則僅觸發(fā)channelActive。
          // 如果取消注冊并重新注冊通道,則多個通道處于活動狀態(tài)。
          //isActive() 返回false
          // 此時 Channel 還未注冊綁定地址,所以處于非活躍狀態(tài)
          if (isActive()) {
          ....................忽略不必要代碼..................
          }
          } catch (Throwable t) {
          // 直接關(guān)閉通道以避免FD泄漏。
          closeForcibly();
          closeFuture.setClosed();
          safeSetFailure(promise, t);
          }
          }

          二、源碼解析

          doRegister();

          doRegister();

          真正的注冊方法,該方法是將Netty本身的NioServerSocket與JDK連接起來的最重要的一個類!

          image-20210429142545350
          selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

          javaChannel()方法是返回JDK原生的SocketChannel,他是再NioServerSocketChannel初始化的時候被保存的,還記得我們再講述NIO開發(fā)Socket的時候的流程嗎

          image-20210429143500583

          我們重點關(guān)注一下javaChannel().register的參數(shù):

          eventLoop().unwrappedSelector():NioEventLoop再創(chuàng)建的時候,會保存兩個選擇器,一個是JDK的原始的選擇器,一個是經(jīng)過Netty包裝的選擇器,這里返回的是原生的選擇器!

          0:不關(guān)注任何事件

          this:this代表著當前類,他是NioServerSocketChannel類型的,他將一個NioServerSocketChannel的對象,綁定到了JDK原生的選擇器,后續(xù)只需要通過SelectionKey.attachment(),就能獲取到NioServerSocketChannel,而一個NioServerSocketChannel里面又包含一個JDK原生的Channel對象,就可以基于該jdk原生的Channel來進行各種讀寫操作!

          到現(xiàn)在為止,我們就完成JDK中的NIO的將通道綁定到選擇器上,我們回到上一步:

          pipeline.invokeHandlerAddedIfNeeded

          pipeline.invokeHandlerAddedIfNeeded();

          開始回調(diào)pipeline通道里面添加自定義事件:

          final void invokeHandlerAddedIfNeeded() {
          assert channel.eventLoop().inEventLoop();
          if (firstRegistration) {
          firstRegistration = false;
          // 現(xiàn)在,我們已注冊到EventLoop?,F(xiàn)在該調(diào)用ChannelHandler的回調(diào)了,
          // 在完成注冊之前添加的內(nèi)容。
          callHandlerAddedForAllHandlers();
          }
          }

          //callHandlerAddedForAllHandlers
          private void callHandlerAddedForAllHandlers() {
          //task = PendingHandlerAddedTask
          PendingHandlerCallback task = pendingHandlerCallbackHead;
          while (task != null) {
          task.execute();
          task = task.next;
          }
          }

          需要注意的是 PendingHandlerCallback task 是PendingHandlerAddedTask類型的,他是什么時候加載的呢?實在我們初始化NioServerSocketChannel的時候調(diào)用addLast方法的時候被賦的值,有興趣的小伙伴可以自己去跟一下源碼,這里直接進入到:

          image-20210429155614576
          if (executor.inEventLoop()) {
          callHandlerAdded0(ctx);
          }

          //進入到 callHandlerAdded0源碼邏輯
          private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
          try{
          ctx.callHandlerAdded();
          }
          .......................
          }

          //進入到ctx.callHandlerAdded();
          final void callHandlerAdded() throws Exception {
          if (setAddComplete()) {
          handler().handlerAdded(this);
          }
          }

          還接記得handler()嗎,我再NioServerSocketChannel初始化的時候說過,當時程序向pipeline中添加了一個ChannelInitializer,這里返回的就是那個ChannelInitializer!  我們進入到ChannelInitializer#handlerAdded方法里面:

          @Override
          public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
          if (ctx.channel().isRegistered()) {
          if (initChannel(ctx)) {
          removeState(ctx);
          }
          }
          }

          首先我們重點關(guān)注一個 initChannel(ctx),

          private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
          // 防止再次進入。
          if (initMap.add(ctx)) {
          try {
          // 調(diào)用 ChannelInitializer 實現(xiàn)的 initChannel() 方法
          initChannel((C) ctx.channel());
          } catch (Throwable cause) {
          ................................
          } finally {
          ChannelPipeline pipeline = ctx.pipeline();
          if (pipeline.context(this) != null) {
          // 將 ChannelInitializer 自身從 Pipeline 中移出
          pipeline.remove(this);
          }
          }
          return true;
          }
          return false;
          }
          initChannel((C) ctx.channel());

          該方法會回調(diào)ChannelInitializer的抽象方法initChannel,該抽象方法在我們初始化的時候完成,我們就要找到實現(xiàn)這個抽象方法的地方,我們回到上一節(jié)課的代碼: io.netty.bootstrap.ServerBootstrap#init

          void init(Channel channel) {
          ..........................;
          p.addLast(new ChannelInitializer<Channel>() {
          @Override
          public void initChannel(final Channel ch) {
          final ChannelPipeline pipeline = ch.pipeline();
          //將用戶自定義的handler添加進管道 handler 是在構(gòu)建ServerBootStr的時候傳入的 handler
          ChannelHandler handler = config.handler();
          if (handler != null) {
          pipeline.addLast(handler);
          }

          ch.eventLoop().execute(() -> {
          pipeline.addLast(new ServerBootstrapAcceptor(
          ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
          });
          }
          }

          上一節(jié)課講的時候,我們將這一段邏輯略過了,只說是會向通道中添加一個ChannelInitializer實現(xiàn),現(xiàn)在開始回調(diào)他的initChannel方法了:

          ChannelHandler handler = config.handler();
          if (handler != null) {
          pipeline.addLast(handler);
          }

          這段代碼會將客戶再構(gòu)建ServerBootstrap的時候傳入的handler添加進通道,我們?yōu)榱朔奖憷斫?,假設(shè)用戶沒有設(shè)置handler,所以這個handler判斷不通過,跳過,我們繼續(xù)往下:

          ch.eventLoop().execute(() -> {
          pipeline.addLast(new ServerBootstrapAcceptor(
          ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
          });

          這里異步的向管道流注冊一個默認的Handler, 為什么說是異步的,我們暫且不說,我們暫且認為是同步的進行add,此時我們的通道如下:

          image-20210429162226540

          ServerBootstrapAcceptor的作用是專門用于新連接接入的,

          ServerBootstrapAcceptor(
          final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
          Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
          this.childGroup = childGroup;
          this.childHandler = childHandler;
          this.childOptions = childOptions;
          this.childAttrs = childAttrs;

          enableAutoReadTask = new Runnable() {
          @Override
          public void run() {
          channel.config().setAutoRead(true);
          }
          };
          }

          我們可以看到,他會保存一系列的參數(shù),包括WorkGroup、childHandler、childOptions、childAttrs這些參數(shù)都是我們再創(chuàng)建serverBootstrap的時候傳入的參數(shù),這也證明了,這些參數(shù)是作用于客戶端Socket連接的!

          有關(guān)ServerBootstrapAcceptor,后續(xù)會進行一個詳細的分析,我們接著說,這里需要重點講一下addLast方法,

          @Override
          public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
          ........................忽略.........................
          //通知添加方法回調(diào)
          callHandlerAdded0(newCtx);
          return this;
          }

          在進行添加的時候,他會回調(diào)內(nèi)部產(chǎn)生的handlerAdded方法,還記得,我們在介紹Netty的基本架構(gòu)的業(yè)務(wù)通道章節(jié)嗎?

          image-20210429163456014

          再調(diào)用addLast之后,該方法會被回調(diào)!

          這樣就講所有的方法注冊完畢了,我們繼續(xù)回到ChannelInitializer#handlerAdded方法,當**initChannel(ctx)**調(diào)用完了之后:

          private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
          // 防止再次進入。
          if (initMap.add(ctx)) {
          try {
          // 調(diào)用 ChannelInitializer 實現(xiàn)的 initChannel() 方法
          initChannel((C) ctx.channel());
          } catch (Throwable cause) {
          ................................
          } finally {
          ChannelPipeline pipeline = ctx.pipeline();
          if (pipeline.context(this) != null) {
          // 將 ChannelInitializer 自身從 Pipeline 中移出
          pipeline.remove(this);
          }
          }
          return true;
          }
          return false;
          }

          我們會進入到finally里面,我們會看到,此時會做一個操作,刪除當前的類,當前的類是誰,是ChannelInitializer,所以刪除完畢后,此時管道對象的結(jié)構(gòu)如圖所示:

          image-20210429173529722

          至此 invokeHandlerAddedIfNeeded 分析完畢

          pipeline.fireChannelRegistered();

          @Override
          public final ChannelPipeline fireChannelRegistered() {
          AbstractChannelHandlerContext.invokeChannelRegistered(head);
          return this;
          }

          這行代碼其實沒什么可說的,大家可以自己跟一下調(diào)試一下代碼,這個代碼的意義是從HeadContext節(jié)點開始傳播channelRegistered方法:

          image-20210429174602462

          至此,NioServerSocketChannel的注冊基本就分析完了,有的同學可能覺得少分析了一段:

          if (isActive()) {
          if (firstRegistration) {
          //Channel 當前狀態(tài)為活躍時,觸發(fā) channelActive 事件
          pipeline.fireChannelActive();
          } else if (config().isAutoRead()) {
          // 該通道已注冊,并已設(shè)置autoRead()。這意味著我們需要開始閱讀
          // 再次,以便我們處理入站數(shù)據(jù)。
          //
          // See https://github.com/netty/netty/issues/4805
          //開始讀事件
          beginRead();
          }
          }

          這段代碼再第一次啟動的時候并不會被調(diào)用,因為此時通道還沒有綁定端口正式啟動起了,所以這里isActive會返回false,有關(guān)邏輯,會在新連接接入講解的時候進行分析!

          三、總結(jié)

          1. Netty會調(diào)用JDK底層的注冊方法,同時將本身的NioServerSocketChannel作為att綁定到選擇事件上!
          2. 當注冊完成后會回調(diào) handlerAdded方法
          3. Netty會回調(diào)再初始化NioServerSocketChannel的時候注冊的Channelinitialization, 添加一個新連接接入器ServerBootstrapAcceptor,并刪除本身!
          4. 當注冊完成后會回調(diào)Channelregistered方法

          才疏學淺,如果文章中理解有誤,歡迎大佬們私聊指正!歡迎關(guān)注作者的公眾號,一起進步,一起學習!



          ??「轉(zhuǎn)發(fā)」「在看」,是對我最大的支持??



          瀏覽 55
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  人人摸在线精品视频 | 七区九区一区在线 | 久久永久免费 | 亚洲AV永久无码国产精品国产 | 做爱视频软件 |