<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Netty的ChannelPipline傳播源碼解析

          共 14729字,需瀏覽 30分鐘

           ·

          2021-07-26 17:33


          有道無術(shù),術(shù)尚可求也!有術(shù)無道,止于術(shù)!

          上一章節(jié) ,我們基本講述了Netty對于新連接接入與管道數(shù)據(jù)處理的源碼解析,大家可能發(fā)現(xiàn),在我們前面學(xué)習(xí)中,涉及到了很多的有關(guān)pipeline的操作,在前面介紹這些的時候,我為了保證主線邏輯的清晰,基本都是一概而過,本章節(jié)將對pipeline做一個詳細的講解!

          一、基礎(chǔ)鋪墊

          1. JAVA中的基本位運算符

          運算符描述
          &
          |
          ~
          ^異或
          <<左移
          >>右移

          2. 位運算解釋與實例

          &(與)

          十進制二進制
          30 0 1 1
          50 1 0 1
          & 后結(jié)果:10 0 0 1

          即:對應(yīng)位都為 1 時,才為 1,否則全為 0。

          |(或)

          十進制二進制
          30 0 1 1
          50 1 0 1
          | 后結(jié)果 :70 1 1 1

          即:對應(yīng)位只要有 1 時,即為 1,否則全為 0。

          ~(非)

          十進制二進制
          30 0 1 1
          ~ 后結(jié)果:121 1 0 0

          即:對應(yīng)位取反。

          異或 ^

          十進制二進制
          30 0 1 1
          50 1 0 1
          ^ 后結(jié)果:60 1 1 0

          即:只要對應(yīng)為不同即為 1。

          3. 配合Netty實例

          我們在以往學(xué)習(xí)Netty中見到過類似于以下代碼:

          selectionKey.interestOps(interestOps | readInterestOp);

          我們重點關(guān)注位運算:interestOps | readInterestOp

          該行代碼的意思是位運算計算一個數(shù)字,該數(shù)字包含 | 前后的數(shù)字!

          //初始化一個值
          int interestOps = 0;
          //給當前這個值增加一個可讀事件
          interestOps |= OP_READ;
          //給當前的值增加一個可寫的事件
          interestOps |= OP_WRITE;
          //判斷當前的事件是不是包含可讀事件 true
          boolean isRead = (interestOps & OP_READ) == OP_READ;
          //判斷當前的事件是不是不包含可讀事件 false
          boolean isRead = (interestOps & OP_READ) == 0;
          //剔除可讀事件
          interestOps &= ~OP_READ;
          //剔除可寫事件
          interestOps &= ~OP_WRITE;

          二、源碼解析

          1. 創(chuàng)建管道

          io.netty.channel.DefaultChannelPipeline#DefaultChannelPipeline

          protected DefaultChannelPipeline(Channel channel) {
          this.channel = ObjectUtil.checkNotNull(channel, "channel");
          succeededFuture = new SucceededChannelFuture(channel, null);
          voidPromise = new VoidChannelPromise(channel, true);
          //創(chuàng)建一個管道上下文 尾部節(jié)點
          tail = new TailContext(this);
          //創(chuàng)建一個管道上下文 頭部節(jié)點
          head = new HeadContext(this);
          //頭部節(jié)點的下一個節(jié)點設(shè)置為尾部節(jié)點
          head.next = tail;
          //尾部節(jié)點的上一個節(jié)點設(shè)置為頭部節(jié)點
          tail.prev = head;
          }

          可以看到,這里初始化管道的時候,管道內(nèi)部存在兩個Handler tail和head節(jié)點,兩個節(jié)點組成雙向鏈表!

          image-20210505233350257

          2. 向通道內(nèi)添加一個Handler處理器

          ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
          @Override
          public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
          System.out.println("channelRegistered");
          super.channelRegistered(ctx);
          }
          });

          上述代碼再一個Netty開發(fā)中是很常見的一個代碼,這里向通道內(nèi)添加了一個 ChannelInboundHandlerAdapter,我們進入到addLast方法:

          @Override
          public final ChannelPipeline addLast(ChannelHandler... handlers) {
          return addLast(null, handlers);
          }

          //進入到 addLast
          @Override
          public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
          ObjectUtil.checkNotNull(handlers, "handlers");

          for (ChannelHandler h: handlers) {
          if (h == null) {
          break;
          }
          addLast(executor, null, h);
          }

          return this;
          }

          //進入到 addLast(executor, null, h);
          @Override
          public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
          final AbstractChannelHandlerContext newCtx;
          synchronized (this) {
          //驗證是否重復(fù)添加改handler
          checkMultiplicity(handler);
          //將handler封裝為上下文對象
          newCtx = newContext(group, filterName(name, handler), handler);
          //將該節(jié)點添加到雙向鏈表中
          addLast0(newCtx);
          ........................忽略其他代碼..............
          }
          ........................忽略其他代碼..............
          return this;
          }

          這里總共分為兩步:

          1. 驗證Handler是否被重復(fù)添加

            checkMultiplicity(handler);
            private static void checkMultiplicity(ChannelHandler handler) {
            //驗證是不是 ChannelHandlerAdapter 類型的,如果不是直接忽略
            if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            //如果不是可共享的而且是已經(jīng)添加過的直接報錯
            if (!h.isSharable() && h.added) {
            throw new ChannelPipelineException(
            h.getClass().getName() +
            " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            //如果是可共享的或者未添加的,將該handler內(nèi)的 added屬性設(shè)置為true證明該handler已經(jīng)被添加
            h.added = true;
            }
            }

            他是如何判斷是否被添加過的呢?

            每一個Handler中都存在一個 added屬性,當這個屬性為true的時候,證明這個Handler已經(jīng)被添加過了,Netty常規(guī)情況下為了考慮線程安全問題,是不允許一個Handler被重復(fù)的使用的!

            但是我們有時候會有這樣一個需求,Handler的功能比較類似,而且我們通過代碼手段,避免了線程安全問題,所以又想重復(fù)添加Handler,Netty提供了一個注解 @Sharable注解,當存在該注解的時候,證明這個Handler是可以被復(fù)用的,可以被重復(fù)添加!

            所以,checkMultiplicity方法通過判斷類是否增加了@Sharable注解和added屬性是否為空來驗證Handle是否違規(guī)重復(fù)添加了!

            當驗證通過之后,將added設(shè)置為true,證明這個Handler已經(jīng)被添加過了!

          2. 將Handler封裝為包裝對象

            newCtx = newContext(group, filterName(name, handler), handler);

            這里比較難理解的就是這個,我們進入到newContext方法里面:

            private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
            return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
            }

            進入到 DefaultChannelHandlerContext類的源碼里面:

            DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
            //調(diào)用父類進行掩碼計算
            super(pipeline, executor, name, handler.getClass());
            //保存一個handler
            this.handler = handler;
            }

            這里除了會保存一個handler還會調(diào)用父類,我們介入到父類里面:

            AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
            String name, Class<? extends ChannelHandler> handlerClass) {
            this.name = ObjectUtil.checkNotNull(name, "name");
            this.pipeline = pipeline;
            this.executor = executor;
            //標識 是in還是out
            this.executionMask = mask(handlerClass);
            // 如果由EventLoop或給定的Executor驅(qū)動的驅(qū)動程序是OrderedEventExecutor的實例,則其順序為。
            ordered = executor == null || executor instanceof OrderedEventExecutor;
            }

            這里會保存一些屬性,這些屬性都是我們前面講過的,大家自行分析下,我們重點關(guān)注掩碼的計算:

            this.executionMask = mask(handlerClass);
            static int mask(Class<? extends ChannelHandler> clazz) {
            //直接再緩存中取出
            Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
            Integer mask = cache.get(clazz);
            //緩存中不存在
            if (mask == null) {
            mask = mask0(clazz);
            cache.put(clazz, mask);
            }
            return mask;
            }

            先從緩存中取出,如果不存在就調(diào)用 mask0(clazz); 方法計算,然后再放進緩存,我們進入到mask0(clazz);方法:

            private static int mask0(Class<? extends ChannelHandler> handlerType) {
            int mask = MASK_EXCEPTION_CAUGHT;
            try {
            if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
            // 如果是 ChannelInboundHandler 實例,所有 Inbound 事件置為 1
            mask |= MASK_ALL_INBOUND;
            //判斷是否存在Skip注解 如果催你在這個跳過的注解 就移除這個
            if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
            mask &= ~MASK_CHANNEL_REGISTERED;
            }
            ..................忽略類似的代碼.....................
            }

            if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
            mask |= MASK_ALL_OUTBOUND;

            if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
            SocketAddress.class, ChannelPromise.class))
            {
            mask &= ~MASK_BIND;
            }
            ..................忽略類似的代碼.....................
            }
            } catch (Exception e) {
            ..................忽略異常的代碼.....................
            }

            return mask;
            }

            這會區(qū)分兩種情況,一種是ChannelInboundHandler類型的,一種是ChannelOutboundHandler類型的,二者邏輯相同,我們以ChannelInboundHandler為例:

            首先,再ChannelHandlerMask類里面定義了很多的預(yù)設(shè)掩碼值:

            /**
            * 以下是方法代表的掩碼值
            */

            static final int MASK_EXCEPTION_CAUGHT = 1;
            /**
            * channelRegistered方法的掩碼
            */

            static final int MASK_CHANNEL_REGISTERED = 1 << 1;
            /**
            * channelUnregistered方法的掩碼
            */

            static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
            /**
            * 后面的以此類推
            */

            static final int MASK_CHANNEL_ACTIVE = 1 << 3;
            static final int MASK_CHANNEL_INACTIVE = 1 << 4;
            static final int MASK_CHANNEL_READ = 1 << 5;
            static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
            static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
            static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
            /**
            * bind方法的掩碼
            */

            static final int MASK_BIND = 1 << 9;
            /**
            * connect方法的掩碼
            */

            static final int MASK_CONNECT = 1 << 10;
            /**
            * 后面的以此類推
            */

            static final int MASK_DISCONNECT = 1 << 11;
            static final int MASK_CLOSE = 1 << 12;
            static final int MASK_DEREGISTER = 1 << 13;
            static final int MASK_READ = 1 << 14;
            static final int MASK_WRITE = 1 << 15;
            static final int MASK_FLUSH = 1 << 16;

            /**
            * 包含全部 Inbound方法的掩碼
            */

            private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_CHANNEL_REGISTERED |
            MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
            MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;

            /**
            * 包含全部 outbound方法的掩碼
            */

            private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
            MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;

            我們回到 mask0方法:

            mask |= MASK_ALL_INBOUND;

            一開始,我們會直接將一個handler的掩碼計算為擁有全部方法的掩碼!

            if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
            mask &= ~MASK_CHANNEL_REGISTERED;
            }

            判斷該方法是否存在 @Skip注解,如果存在就排除掉這個掩碼!

            整個邏輯執(zhí)行完畢后,這個掩碼就只會包含handler中沒有被@Sikp注解注解的方法掩碼!

            有同學(xué)可能疑問,我在書寫handler的時候并沒有增加@Sikp注解呀!  我們都知道,實現(xiàn)一個Handler就必定需要繼承 ChannelInboundHandlerAdapter或者ChannelOutboundHandlerAdapter, 我們隨便挑一個類進去看:

            image-20210506002314359

            可以看到,這些方法其實都是被默認添加了的,只不過我們重寫之后沒添加!現(xiàn)在我們明白,handler是如何區(qū)分你實現(xiàn)了那些方法的了!

            這里會將handler包裝為HandlerContext對象,類似于tailContext和HeadContext一樣,此時上下文對象的結(jié)構(gòu)如下:

            image-20210506002420080
          3. 將HandlerContext添加進pipeline中:

            addLast0(newCtx);
            private void addLast0(AbstractChannelHandlerContext newCtx) {
            AbstractChannelHandlerContext prev = tail.prev;
            newCtx.prev = prev;
            newCtx.next = tail;
            prev.next = newCtx;
            tail.prev = newCtx;
            }
            image-20210506000354375

          整個過程如上,無非就是指針指向地址的變換,比較簡單,不做深入分析!

          3. 刪除一個處理器

          ch.pipeline().remove("xxxxxx")
          @Override
          public final ChannelPipeline remove(ChannelHandler handler) {
          remove(getContextOrDie(handler));
          return this;
          }
          1. 尋找處理器Handler的上下文

            getContextOrDie(handler)
            private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
            //尋找handler
            AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
            if (ctx == null) {
            throw new NoSuchElementException(handler.getClass().getName());
            } else {
            return ctx;
            }
            }

            //context(handler);
            @Override
            public final ChannelHandlerContext context(ChannelHandler handler) {
            ObjectUtil.checkNotNull(handler, "handler");

            AbstractChannelHandlerContext ctx = head.next;
            for (;;) {

            if (ctx == null) {
            return null;
            }
            //循環(huán)迭代 判斷是否尋找到這個handler
            if (ctx.handler() == handler) {
            //返回這個handler的上下文對象
            return ctx;
            }

            ctx = ctx.next;
            }
            }
          2. 刪除這個處理器

            remove(getContextOrDie(handler));
            private <T extends ChannelHandler> T removeIfExists(ChannelHandlerContext ctx) {
            if (ctx == null) {
            return null;
            }
            return (T) remove((AbstractChannelHandlerContext) ctx).handler();
            }

            //直接進入到 刪除Handler的主要邏輯
            //(T) remove((AbstractChannelHandlerContext) ctx).handler();
            private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
            //首先刪除的handler不是tail和尾節(jié)點
            assert ctx != head && ctx != tail;

            synchronized (this) {
            //刪除上下文對象
            atomicRemoveFromHandlerList(ctx);
            ................忽略....................

            EventExecutor executor = ctx.executor();
            if (!executor.inEventLoop()) {
            executor.execute(new Runnable() {
            @Override
            public void run() {
            //回調(diào)handlerRemoved方法
            callHandlerRemoved0(ctx);
            }
            });
            return ctx;
            }
            }
            callHandlerRemoved0(ctx);
            return ctx;
            }

            首先我們關(guān)注 atomicRemoveFromHandlerList(ctx);

            private synchronized void atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx) {
            //獲取該節(jié)點的上級節(jié)點
            AbstractChannelHandlerContext prev = ctx.prev;
            //獲取該節(jié)點的下級節(jié)點
            AbstractChannelHandlerContext next = ctx.next;
            //重建指針位置
            prev.next = next;
            next.prev = prev;
            }

            指針位置重建之后,我們回調(diào)handlerRemoved方法

            callHandlerRemoved0(ctx);

          至此我們就完成了pipeline的創(chuàng)建、添加、刪除的源碼解析!

          4. 管道事件傳播

          我們前面見到過很多的事件傳播代碼,我們以 channelRegistered 方法的事件回調(diào)為例:

          io.netty.channel.AbstractChannel.AbstractUnsafe#register0

          //通知管道  傳播channelRegistered事件
          // 觸發(fā) channelRegistered 事件
          pipeline.fireChannelRegistered();

          我們進入到改行代碼的源碼:

          @Override
          public final ChannelPipeline fireChannelRegistered() {
          //執(zhí)行注冊方法 從head方法
          AbstractChannelHandlerContext.invokeChannelRegistered(head);
          return this;
          }
          static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
          EventExecutor executor = next.executor();
          if (executor.inEventLoop()) {
          next.invokeChannelRegistered();
          } else {
          executor.execute(new Runnable() {
          @Override
          public void run() {
          next.invokeChannelRegistered();
          }
          });
          }
          }

          我們可以看到,這里使用了 next.invokeChannelRegistered();方法  我們依舊按照同步方法進行分析!

          private void invokeChannelRegistered() {
          if (invokeHandler()) {
          try {
          //現(xiàn)在調(diào)用的HeadContext的handler
          ((ChannelInboundHandler) handler()).channelRegistered(this);
          } catch (Throwable t) {
          notifyHandlerException(t);
          }
          } else {
          fireChannelRegistered();
          }
          }

          我們現(xiàn)在進入到了headContext,所以我們進入到: io.netty.channel.DefaultChannelPipeline.HeadContext#channelRegistered

          @Override
          public void channelRegistered(ChannelHandlerContext ctx) {
          invokeHandlerAddedIfNeeded();
          //向下傳播事件
          ctx.fireChannelRegistered();
          }

          這一段代碼除了執(zhí)行Head的invokeHandlerAddedIfNeeded方法之外,還又一次傳播了channelRegistered事件,我們進入到 ctx.fireChannelRegistered();:

          @Override
          public ChannelHandlerContext fireChannelRegistered() {
          invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
          return this;
          }

          我們?nèi)绻胍蛳聜鞑ィ覀兪紫葢?yīng)該找到下一個節(jié)點是誰才能傳播,Netty這里調(diào)用了findContextInbound(MASK_CHANNEL_REGISTERED)查找下一個節(jié)點,我我們先關(guān)注以下參數(shù) MASK_CHANNEL_REGISTERED, 他是channelRegistered方法的掩碼, 我們進入到  findContextInbound方法源碼:

          private AbstractChannelHandlerContext findContextInbound(int mask) {
          AbstractChannelHandlerContext ctx = this;
          do {
          //獲取下一個inbun事件
          ctx = ctx.next;
          //只要和掩碼&運算后不為0的都是 inbunt事件
          } while ((ctx.executionMask & mask) == 0);
          return ctx;
          }

          從當前節(jié)點向下尋找,只要 掩碼計算包含這個方法,就證明該context包含channelRegistered方法,就直接返回!

          尋找到了handler之后,就開始調(diào)用了:

          invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
          static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
          EventExecutor executor = next.executor();
          if (executor.inEventLoop()) {
          next.invokeChannelRegistered();
          } else {
          executor.execute(new Runnable() {
          @Override
          public void run() {
          next.invokeChannelRegistered();
          }
          });
          }
          }
          next.invokeChannelRegistered();

          具體邏輯就和上面分析的一致了,調(diào)用該handler的ChannelRegistered方法!

          傳播某一個事件,就會使用哪個事件的掩碼,從當前節(jié)點向下尋找,知道對應(yīng)的Handler之后,回調(diào)對應(yīng)的方法!

          關(guān)于管道的傳播,你明白了嗎?

          提一個問題, 觀察以下兩種傳播方式有何不同:

          ctx.fireChannelRegistered();
          ctx.pipeline().fireChannelRegistered();

          才疏學(xué)淺,如果文章中理解有誤,歡迎大佬們私聊指正!歡迎關(guān)注作者的公眾號,一起進步,一起學(xué)習(xí)!



          ??「轉(zhuǎn)發(fā)」「在看」,是對我最大的支持??


          瀏覽 52
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  MFYD-013 肉食人妻女上司が部下を誘惑し | 国家一级黄色片 | 久久视频网站 | 午夜成人免费视频 | 国产精品成人午夜福利 |