<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Netty服務端的新連接接入源碼解析

          共 7756字,需瀏覽 16分鐘

           ·

          2021-07-26 17:33

          有道無術,術尚可求也!有術無道,止于術!

          經(jīng)過上一章節(jié)的學習,我們基本了解了Netty是如何對IO事件以及異步任務的處理了,今天我們就一起來學習一下,Netty是如何處理新連接接入與數(shù)據(jù)讀取的!

          一、源碼尋找

          我們上一章節(jié)學到了,當存在IO事件的時候,Netty的反應堆線程會監(jiān)聽這些事件,然后進行處理,忘記的,可以回顧一下上一章節(jié),,我們這里直接進入到:

          io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)

          這里的代碼,我們昨天只是做了一個大概的分析,并沒有深入的講解,這一章節(jié)具體分析一下新連接的接入和Channel數(shù)據(jù)的讀取。

          private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
          final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
          .........忽略有效性驗證................

          try {
          int readyOps = k.readyOps();
          ...................忽略其他 事件的處理邏輯...............
          if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
          unsafe.read();
          }
          } catch (CancelledKeyException ignored) {
          unsafe.close(unsafe.voidPromise());
          }
          }

          我們重點關注當事件存在讀事件或者新連接接入事件的時候,才會進入到這一判斷邏輯,那么由此可見,我們兵丁是要關注unsafe.read() 這一行代碼了!

          二、新連接接入源碼分析

          首選,我們聲明一下,我們現(xiàn)在一直是按照服務端啟動邏輯進行分析的,那么服務端邏輯分析,對照通道就是NioServerSocketChannel, 我們在創(chuàng)建NioServerSocketChannel的時候初始化過一個Unsafe對象,他是NioMessageUnsafe類型的,如果有疑問的同學可以回顧一下NioServerSocketChannel的初始化過程!

          所以,必然,我們這里的unsafe.read();  就必然進入的是NioMessageUnsafe的read方法:

          image-20210504115544746
          @Override
          public void read() {
          ..................忽略不必要代碼............
          try {
          try {
          do {
          //讀取數(shù)據(jù) 可能是數(shù)據(jù) 也可能是新連接
          int localRead = doReadMessages(readBuf);
          //如果沒數(shù)據(jù)就跳出
          if (localRead == 0) {
          break;
          }
          //-1 就是連接被關閉
          if (localRead < 0) {
          closed = true;
          break;
          }
          //讀取的連接數(shù)增加
          allocHandle.incMessagesRead(localRead);
          //每次默認讀取最大16個連接 剩余的后續(xù)去讀
          } while (allocHandle.continueReading());
          } catch (Throwable t) {
          exception = t;
          }
          //獲取連接數(shù)量或者讀取的數(shù)據(jù)的數(shù)量
          int size = readBuf.size();
          for (int i = 0; i < size; i ++) {
          readPending = false;
          //開始傳播channelRead屬性
          pipeline.fireChannelRead(readBuf.get(i));
          }
          //清空緩沖區(qū)
          readBuf.clear();
          allocHandle.readComplete();
          //傳播讀取完成事件
          pipeline.fireChannelReadComplete();
          ...................忽略不必要代碼.......................
          } finally {
          ...................忽略不必要代碼.......................
          }
          }

          1. 讀取新連接

          int localRead = doReadMessages(readBuf);

          這行代碼是讀取新連接的主要邏輯:

          image-20210504181959983
          @Override
          protected int doReadMessages(List<Object> buf) throws Exception {
          //調(diào)用JDK ServerSocketChannel獲取新連接 JDK SocketChannel
          SocketChannel ch = SocketUtils.accept(javaChannel());
          try {
          if (ch != null) {
          //將客戶端連接直接包裝為 Netty的管道包裝對象 NioSocketChannel
          buf.add(new NioSocketChannel(this, ch));
          return 1;
          }
          } catch (Throwable t) {
          ................忽略異常處理.............
          }
          return 0;
          }

          可以看到這里的邏輯比較簡單,首先,Netty會使用先前保存的JDK 的原生的SocketChannel調(diào)用accept方法進行獲取JDK新連接的管道!

          注意此時獲取的管道是JDK NIO的原生的管道對象,和Netty還沒有關系,然后再將JDK NIO原生的Channel包裝為Netty的NioSocketChannel放到緩沖區(qū)里面,注意此時放到緩沖區(qū)里面的對象就是Netty的包裝對象了!包裝完成之后直接返回 ,此時我們的緩沖區(qū)就存在數(shù)據(jù)量,這個數(shù)據(jù)是NioSocketChannel對象!

          我們回到主線 read方法,當調(diào)用完doMessage方法之后開始就要處理這個NioSocketChannel了呀!

          2. 處理新連接的管道

          pipeline.fireChannelRead(readBuf.get(i));

          從代碼上看,可以看到,他是把剛剛我們讀到的NioSocketChannel出來往下傳播,這個代碼是在通道內(nèi)傳播,我們前幾節(jié)課講過,此時Pipeline的結構是如圖所示的數(shù)據(jù)結構:

          image-20210504194847065

          我們看如下代碼:

          @Override
          public final ChannelPipeline fireChannelRead(Object msg) {
          AbstractChannelHandlerContext.invokeChannelRead(head, msg);
          return this;
          }

          他是從頭節(jié)點開始傳播的,channelRead的傳播是自上而下的,所以就勢必會傳播到 ServerBootstrapAcceptor的邏輯中,所以我們進入到ServerBootstrapAcceptor#channelRead方法:

          public void channelRead(ChannelHandlerContext ctx, Object msg) {
          //能進入都這一段邏輯的就必定是通道對象,因為只有服務端管道會存在該處理器
          final Channel child = (Channel) msg;
          //向服務端管道追加childHandler 在構建ServerBootStrap的時候傳入的
          child.pipeline().addLast(childHandler);
          //在構建ServerBootStrap的時候傳入的
          setChannelOptions(child, childOptions, logger);
          //在構建ServerBootStrap的時候傳入的
          setAttributes(child, childAttrs);
          try {
          //開始進行注冊,注冊邏輯同NioServerSocketChannel相同
          childGroup.register(child).addListener(new ChannelFutureListener() {
          @Override
          public void operationComplete(ChannelFuture future) throws Exception {
          if (!future.isSuccess()) {
          forceClose(child, future.cause());
          }
          }
          });
          } catch (Throwable t) {
          forceClose(child, t);
          }
          }

          我們可以看到這個,先是向通道內(nèi)注冊一些客戶端的參數(shù),然后開始進行注冊Channel, 注冊的時候同NioServerSocketChannel的注冊邏輯一樣,只不過NioSocketChannel的關注事件是OP_READ事件,這里留一個作業(yè),同學們可以自己分析一下NioSocketChannel的創(chuàng)建,分析一下它的注冊邏輯與反應堆邏輯!

          三、客戶端數(shù)據(jù)讀取源碼解析

          我們還是直接回到

          io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)

          private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
          final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
          .........忽略有效性驗證................

          try {
          int readyOps = k.readyOps();
          ...................忽略其他 事件的處理邏輯...............
          if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
          unsafe.read();
          }
          } catch (CancelledKeyException ignored) {
          unsafe.close(unsafe.voidPromise());
          }
          }

          上面分析過,負責客戶端新連接的通道是NioSocketChannel,大家自行分析一下內(nèi)部邏輯,與NioServerSocketChannel的相似度90%!

          1. 讀取通道數(shù)據(jù)

          NioSocketChannel的Unsafe是 NioByteUnsafe, 所以我們直接進入到:

          io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read

          查看他是如何進行數(shù)據(jù)讀取的:

          @Override
          public final void read() {
          ........................忽略........................
          //獲取客戶端通道管道
          final ChannelPipeline pipeline = pipeline();
          //獲取一個內(nèi)存分配器
          final ByteBufAllocator allocator = config.getAllocator();
          final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
          allocHandle.reset(config);

          ByteBuf byteBuf = null;
          boolean close = false;
          try {
          do {
          //分配一個ByteBuf緩沖區(qū)
          byteBuf = allocHandle.allocate(allocator);
          //開始向緩沖區(qū)內(nèi)寫入通道的數(shù)據(jù)
          allocHandle.lastBytesRead(doReadBytes(byteBuf));
          if (allocHandle.lastBytesRead() <= 0) {
          // 如果沒有讀取到緩沖區(qū),就釋放緩沖區(qū).
          byteBuf.release();
          byteBuf = null;
          //設置關閉標志
          close = allocHandle.lastBytesRead() < 0;
          if (close) {
          // There is nothing left to read as we received an EOF.
          readPending = false;
          }
          break;
          }

          allocHandle.incMessagesRead(1);
          readPending = false;
          //傳播一次readChnnel事件
          pipeline.fireChannelRead(byteBuf);
          byteBuf = null;
          } while (allocHandle.continueReading());

          allocHandle.readComplete();
          //傳播一次readChnnelComplete事件
          pipeline.fireChannelReadComplete();
          //關閉通道
          if (close) {
          closeOnRead(pipeline);
          }
          } catch (Throwable t) {
          ....................忽略不必要代碼.....................
          }
          }

          我們整體將邏輯分為以下幾個步驟:

          1. 獲取一個內(nèi)存分配器,Netty中存在一個專門用于分配ByteBuf的內(nèi)存分配器。這里是將它獲取出來!
          2. 使用上一步獲取的內(nèi)存分配器分配一塊緩沖區(qū),用域后續(xù)的使用!
          3. 開始讀取通道內(nèi)的數(shù)據(jù)寫入預先分配好的緩沖區(qū)!
          4. 讀取數(shù)據(jù)完畢后,將帶有數(shù)據(jù)的緩沖區(qū)調(diào)用pieline的傳播方法進行數(shù)據(jù)的傳播 readChannel方法!
          5. 當通道內(nèi)的數(shù)據(jù)被處理完后,傳播一次 channelReadComplete方法

          四、總結

          1. 在Netty中NioServerSocketChannel與NioSocketChannel的處理中,對于數(shù)據(jù)的讀取擁有不同的處理方法,NioServerSockerChannel主要用于處理新連接的,在初始化的時候就會在通道內(nèi)加入一個新連接接入器ServerBootstrapAcceptor

            NioServerSocketChannel對象在讀取到數(shù)據(jù)后將之包裝為NioSocketChannel對象,然后使用ServerBootstrapAcceptor進行NioSocketChannel的注冊與啟動反應堆線程!

          2. 當通道內(nèi)存在數(shù)據(jù)的時候,被NioSockerChannel探測到后,就會先分配一塊緩沖區(qū),將數(shù)據(jù)讀取進預先分配好的緩沖區(qū),然后進行數(shù)據(jù)的向下通道流轉(zhuǎn)(事件觸發(fā))!

          才疏學淺,如果文章中理解有誤,歡迎大佬們私聊指正!歡迎關注作者的公眾號,一起進步,一起學習!



          ??「轉(zhuǎn)發(fā)」「在看」,是對我最大的支持??



          瀏覽 52
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  成人影视久久久无码三区 | 老鸭窝在线观看视频 | 色色在线 | 午夜丁香综合 | 国产二区视频在线观看 |