<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Netty的異步任務處理與Socket事件處理

          共 16741字,需瀏覽 34分鐘

           ·

          2021-07-26 17:33

          有道無術,術尚可求也!有術無道,止于術!

          經(jīng)過前面幾章的學習,我們基本是明白了Netty通道的創(chuàng)建、注冊、與綁定與JDK NIO的對應關系,如果我們使用的是JDK NIO的方式去開發(fā)一個Socket服務端的時候,此時還缺少了一個重要的環(huán)節(jié),就是循環(huán)處理IO事件!

          我們前面不只一次的見到Netty的異步事件,因為我們某些知識還沒有學習到,所以我們都按照同步的方式去獲取的,所以我們本章節(jié)將帶你學習,Netty對于IO事件的處理與異步事件的處理!

          我們以綁定為出發(fā)點,由點到面進行分析!

          一、源碼入口

          我們直接進入到綁定的源碼分析:

          private static void doBind0(
          final ChannelFuture regFuture, final Channel channel,
          final SocketAddress localAddress, final ChannelPromise promise)
          {

          // 在觸發(fā)channelRegistered()之前調(diào)用此方法。給用戶處理程序一個設置的機會
          // 其channelRegistered()實現(xiàn)中的管道。
          channel.eventLoop().execute(() -> {
          if (regFuture.isSuccess()) {
          channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
          } else {
          promise.setFailure(regFuture.cause());
          }
          });
          }

          我們上節(jié)課直接分析的channel.bind方法,而忽略上上面的異步方法,這里我們開始分析異步方法,我們進入到channel.eventLoop().execute()方法:

          image-20210430145227945

          二、源碼分析

          我們前面分析過,每個Channel綁定一個NioEventLoop,而EventLoop又是SingleThreadEventExecutor的子類,所以我們進入到io.netty.util.concurrent.SingleThreadEventExecutor#execute(java.lang.Runnable):

          @Override
          public void execute(Runnable task) {
          ObjectUtil.checkNotNull(task, "task");
          execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
          }

          ---------------------------分界線------------------------------------

          //繼續(xù)往下追 execute
          private void execute(Runnable task, boolean immediate) {
          //判斷當前執(zhí)行的線程是不是 NIoEventLoopGroup的線程 這里是false
          boolean inEventLoop = inEventLoop();
          //將任務加入到隊列
          addTask(task);
          //這里永遠只能啟動一次 一個eventLoop
          if (!inEventLoop) {
          //啟動線程
          startThread();
          .....................................
          }
          //io.netty.channel.nio.NioEventLoop.selector
          if (!addTaskWakesUp && immediate) {
          wakeup(inEventLoop);
          }
          }

          我們這里可以分為兩部分:

          1. 添加任務

          addTask(task);

          ----------------------------------分界線---------------------------

          protected void addTask(Runnable task) {
          ObjectUtil.checkNotNull(task, "task");
          if (!offerTask(task)) {
          reject(task);
          }
          }

          基礎好一點的同學我估計已經(jīng)有點猜到了,單看這個 offerTask有沒有像和隊列相關的操作,我們進入到offerTask方法:

          final boolean offerTask(Runnable task) {
          ...............忽略.................
          return taskQueue.offer(task);
          }

          果不其然,果然是入隊操作,taskQueue是什么呢?

          image-20210430152558414

          我們再初始化NioEventLoop的源碼分析學習的時候,學習到,我們會創(chuàng)建兩個MpscQ隊列(多生產(chǎn)者,單消費者),這個taskQueue就是當時我們創(chuàng)建的一個任務隊列,這里面將我們提交的異步任務追加到隊列里面!

          返回異步任務是不是被追加到隊列里面了,如果隊列滿了,或者其他原因追加失敗的話,會返回false,就會執(zhí)行reject方法:

          protected final void reject(Runnable task) {
          rejectedExecutionHandler.rejected(task, this);
          }

          這個拒絕策略同樣是我們再創(chuàng)建NioEventLoop的時候創(chuàng)建保存的,給大家留一個作業(yè),去追一下這個拒絕策略,判斷一下當發(fā)生了添加異步任務失敗之后,會發(fā)生什么呢?

          2. 啟動消費線程

          startThread();

          -----------------------------分割線-------------------------
          /**
          * 啟動線程
          */

          private void startThread() {
          if (state == ST_NOT_STARTED) {
          if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
          boolean success = false;
          try {
          //啟動線程
          doStartThread();
          success = true;
          } finally {
          if (!success) {
          STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
          }
          }
          }
          }
          }

          注意,這里有個CAS操作 STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED); 判斷消費線程是不是已經(jīng)啟動,如果已經(jīng)啟動就不進入這個邏輯,如果沒啟動就進入這個邏輯!我們第一次調(diào)用,肯定沒啟動,進入這個邏輯:

          doStartThread();
          ----------------------------分割線---------------------------

          private void doStartThread() {
          assert thread == null;
          //創(chuàng)建一條線程并啟動
          //這個線程又EventLoop
          executor.execute(new Runnable() {
          @Override
          public void run() {
          //保存當前線程 給線程賦值的就是這里
          thread = Thread.currentThread();
          ...........................忽略........................
          try {
          //進行實際的啟動
          //io.netty.channel.nio.NioEventLoop.run
          SingleThreadEventExecutor.this.run();
          success = true;
          } catch (Throwable t) {
          logger.warn("Unexpected exception from an event executor: ", t);
          } finally {
          ...........................忽略........................
          }
          }
          ...........................忽略........................
          }
          ...........................忽略........................
          }

          代碼比較長,我們只分析主線邏輯:

          thread = Thread.currentThread();

          首先保存了一下當前線程到成員變量,這個分支不是很重要,后面有時間進行分析!

          SingleThreadEventExecutor.this.run();

          這個就是處理異步任務的代碼,我們進入到run方法查看:

          image-20210501112253211
          @Override
          protected void run() {
          int selectCnt = 0;
          for (;;) {
          try {
          int strategy;
          try {
          //存在任務就返回IO時間的數(shù)量,不存在任務就返回select阻塞等待事件發(fā)生
          strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
          switch (strategy) {
          case SelectStrategy.CONTINUE:
          continue;
          case SelectStrategy.BUSY_WAIT:
          //如果不存在異步任務 就進行事件選擇
          case SelectStrategy.SELECT:
          //下一個定時任務的截至時間 當不存在任務的時候就返回-1
          long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
          if (curDeadlineNanos == -1L) {
          curDeadlineNanos = NONE;
          }
          nextWakeupNanos.set(curDeadlineNanos);
          try {
          //不存在任務就去阻塞獲取IO事件
          if (!hasTasks()) {
          strategy = select(curDeadlineNanos);
          }
          } finally {
          nextWakeupNanos.lazySet(AWAKE);
          }
          default:
          }
          } catch (IOException e) {
          //替換一個選擇器
          rebuildSelector0();
          //選擇次數(shù)重置為0
          selectCnt = 0;
          //處理循環(huán)異常 主要處理方式就是睡眠一會讓程序主動釋放CPU
          handleLoopException(e);
          continue;
          }
          //本次循環(huán)次數(shù)+1
          selectCnt++;
          cancelledKeys = 0;
          needsToSelectAgain = false;
          //這里是默認值 50
          final int ioRatio = this.ioRatio;
          boolean ranTasks;
          //不會進這個分支
          if (ioRatio == 100) {
          try {
          if (strategy > 0) {
          processSelectedKeys();
          }
          } finally {
          // Ensure we always run tasks.
          ranTasks = runAllTasks();
          }
          //當存在I/O事件的時候
          } else if (strategy > 0) {
          //記錄一下當前的時間
          final long ioStartTime = System.nanoTime();
          try {
          //處理IO事件
          processSelectedKeys();
          } finally {
          //計算處理IO事件耗費的事件
          final long ioTime = System.nanoTime() - ioStartTime;
          //里面的時間是計算處理異步任務的時間盡量保持為1:1
          ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
          }
          } else {
          //沒有IO事件的話就處理異步任務
          ranTasks = runAllTasks(0);
          }

          if (ranTasks || strategy > 0) {
          if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
          logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
          selectCnt - 1, selector);
          }
          //沒有空輪詢的話三次一清空
          selectCnt = 0;
          //如果空輪詢的次數(shù)超過默認的512次 就處理空輪詢BUG的選擇器
          } else if (unexpectedSelectorWakeup(selectCnt)) {
          //空輪詢被處理后清空 輪詢次數(shù)
          selectCnt = 0;
          }
          } catch (CancelledKeyException e) {
          ...................忽略........................
          } finally {
          ...................忽略........................
          }
          }
          }

          這主線邏輯分為三個:如何解決IO事件、如何處理異步任務、如何解決空輪詢BUG!!分支代碼關注一下注釋,這里分析下主線代碼:

          I. I/O事件的處理

          processSelectedKeys();

          private void processSelectedKeys() {
          if (selectedKeys != null) {
          processSelectedKeysOptimized();
          } else {
          processSelectedKeysPlain(selector.selectedKeys());
          }
          }

          selectedKeys是我們在創(chuàng)建NIOEventLoop的時候,會創(chuàng)建一個優(yōu)化后的的SelectorKeySet集合,使用數(shù)組來實現(xiàn)的,大家忘記的話,可以會看一下NioEventLoop的初始化源碼篇!

          當你沒有禁用優(yōu)化的時候,就會進入到if分支,我們查看if內(nèi)部代碼的源碼:

          private void processSelectedKeysOptimized() {
          //開始遍歷所有的主鍵
          for (int i = 0; i < selectedKeys.size; ++i) {
          //獲取事件
          final SelectionKey k = selectedKeys.keys[i];
          //將該位置的數(shù)據(jù)制空
          selectedKeys.keys[i] = null;
          //獲取之間注冊NioServerSocketChannel的時候,綁定的Channel對象
          final Object a = k.attachment();

          if (a instanceof AbstractNioChannel) {
          //開始進行IO事件處理
          processSelectedKey(k, (AbstractNioChannel) a);
          } else {
          .........................忽略............................
          }
          .........................忽略............................
          }
          }

          獲取事件集合中的每一個key,同時獲取之前綁定的NioServerSocketChannel,然后調(diào)用processSelectedKey處理這個事件:

          private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
          final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
          if (!k.isValid()) {
          //當key失效之后,就關閉通道
          ....................忽略....................
          }

          try {
          //獲取當前事件的key 掩碼
          int readyOps = k.readyOps();
          //是否包含連接事件
          if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
          //獲取包含的事件
          int ops = k.interestOps();
          //剔除OP_CONNECT事件
          ops &= ~SelectionKey.OP_CONNECT;
          //重新更新關注的事件
          k.interestOps(ops);
          //傳播 connect事件
          unsafe.finishConnect();
          }
          //如果當前返回的關注事件的掩碼包含 OP_WRITE的話
          if ((readyOps & SelectionKey.OP_WRITE) != 0) {
          //開始向通道內(nèi)刷新數(shù)據(jù)
          ch.unsafe().forceFlush();
          }
          //如果當前的事件掩碼包含讀、新連接接入事件 或者 不關注任何事件的時候 傳播read事件
          if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { //傳播read事件 可能是新連接接入也可能有數(shù)據(jù)可讀
          unsafe.read();
          }
          } catch (CancelledKeyException ignored) {
          //發(fā)生異常關閉通道
          unsafe.close(unsafe.voidPromise());
          }
          }

          大家可以看到,里面的處理基本和我們對于JDK NIO的處理一致,就是判斷各種事件然后進行對應的處理!

          II、異步任務的處理

          runAllTasks();
          protected boolean runAllTasks() {
          assert inEventLoop();
          boolean fetchedAll;
          boolean ranAtLeastOne = false;

          do {
          //合并任務 將定時任務的隊列里面的任務拉去出來,和異步任務的隊列進行合并
          fetchedAll = fetchFromScheduledTaskQueue();
          //開始執(zhí)行全部的任務
          if (runAllTasksFrom(taskQueue)) {
          ranAtLeastOne = true;
          }
          } while (!fetchedAll);

          if (ranAtLeastOne) {
          lastExecutionTime = ScheduledFutureTask.nanoTime();
          }
          afterRunningAllTasks();
          return ranAtLeastOne;
          }

          這里就是異步任務的被執(zhí)行的地方,這里分為兩個步驟:1. 合并任務   2.執(zhí)行taskQueue異步任務  3.執(zhí)行tailQueue異步任務!

          1. 合并任務

            fetchedAll = fetchFromScheduledTaskQueue();

            Netty在我們學習中已經(jīng)知道了兩種隊列,一種是taskQueue隊列,一種是tailQueue隊列,現(xiàn)在又出現(xiàn)了第三種隊列:scheduledTaskQueue,他是一個專門存放定時任務的對隊列,這里的合并任務就是將即將要執(zhí)行的任務合并到taskQueue中等待執(zhí)行!

            這行代碼執(zhí)行完畢后,所有即將要執(zhí)行的任務都被添加在了taskQueue隊列中,等待后續(xù)的執(zhí)行!

          2. 執(zhí)行taskQueue異步任務

            //注意這里傳入的是合并完成后額taskQueue
            runAllTasksFrom(taskQueue)

            上述代碼將對應的任務全部集中到了taskQueue隊列中后們這里開始消費taskQueue隊列進行執(zhí)行!我們可以適當?shù)目匆幌略创a:

            protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
            //從taskQueue隊列中彈出一個任務
            Runnable task = pollTaskFrom(taskQueue);
            if (task == null) {
            return false;
            }
            for (;;) {
            //執(zhí)行任務 調(diào)用run方法
            safeExecute(task);
            //繼續(xù)彈出任務
            task = pollTaskFrom(taskQueue);
            //如果彈出的任務為空
            if (task == null) {
            //直接返回
            return true;
            }
            }
            }
          3. 執(zhí)行tailQueue異步任務

            afterRunningAllTasks();

            這里開始執(zhí)行tailQueue節(jié)點的任務,可以看到,tailQueue節(jié)點的任務執(zhí)行優(yōu)先級低于上述兩種隊列!

            image-20210503101059511
            @Override
            protected void afterRunningAllTasks() {
            //注意這里傳入的是 tailQueue
            runAllTasksFrom(tailTasks);
            }

            //繼續(xù)往下看源碼
            protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
            //彈出任務
            Runnable task = pollTaskFrom(taskQueue);
            if (task == null) {
            return false;
            }
            for (;;) {
            //執(zhí)行任務
            safeExecute(task);
            //再次彈出任務
            task = pollTaskFrom(taskQueue);
            if (task == null) {
            //任務執(zhí)行完畢 返回true
            return true;
            }
            }
            }

            這里就不作過多講解了,這里和上面的邏輯基本一致,只是執(zhí)行的qeueb不是一個!

          III、解決臭名昭著的JDK空輪詢BUG

          可能大家大家都知道,JDK NIO在事件循環(huán)判斷的時候可能會出現(xiàn)空輪詢的BUG,導致CPU100%,雖然Oracle官方宣稱空輪詢的BUG已經(jīng)解決了,但是后續(xù)經(jīng)過一些公司實際的業(yè)務上證明并沒有解決,只是出現(xiàn)幾率小了點,Netty事實上并沒有解決這個空輪詢BUG只是用另外一種比較巧妙的方法規(guī)避開了,我們一起學習下:

          首先,我們先想一下,我們?nèi)绾螖喽ㄎ覀兊某绦蚩赡馨l(fā)生了空輪詢的BUG,學習過NIO的都知道,我們會調(diào)用一個selector.select()進行阻塞等待有完成的事件發(fā)生,當selet方法阻塞解除的時候,就證明一定有我么感興趣的事件發(fā)生,但是當我們發(fā)現(xiàn)select方法解除了阻塞,但是事件數(shù)量卻為0的時候,我們就認為可能出現(xiàn)了空輪詢的BUG!

          但是IO數(shù)量為0并不是一定出現(xiàn)了空輪詢的BUG,也可能外部調(diào)用了markUp方法,所以我們不能每一次出現(xiàn)事件數(shù)量為0的時候都認為程序出現(xiàn)了空輪詢BUG,所以我們就需要有一個記錄它出現(xiàn)該類異常情況發(fā)生的次數(shù),當發(fā)生的次數(shù)達到了我們設置的閾值,就證明它可能發(fā)生了空輪詢的BUG,這個時候需要處理這個空輪詢的BUG!

          那么如何處理呢? 我們?nèi)蝿瞻l(fā)生空輪詢問題是因為(JDK官方認為,這個Linux Epoll告訴JDK有事件了,但是JDK獲取事件的時候獲取了一個空,所以JDK只能返回一個0)所以就發(fā)生了空輪詢:

          JDK官方給出的解決方案

          Netty是使用的第三種,拋棄舊的選擇器,重建一個新的選擇器,然后替換舊的選擇器,我們一起看下源碼!

          我們看看Netty是如何做的,我們回到io.netty.channel.nio.NioEventLoop#run源碼:

          我還是,為了方便講解,把這段代碼貼出來省略和空輪詢無關的代碼(完整代碼見上):

          @Override
          protected void run() {
          int selectCnt = 0;
          for (;;) {
          ........................忽略進行事件選擇的代碼...................
          //本次循環(huán)次數(shù)+1
          selectCnt++;
          ....................忽略事件處理和異步任務執(zhí)行的代碼................
          //當處理的異步任務或者IO事件的數(shù)量大于0,證明沒有發(fā)生空輪詢
          if (ranTasks || strategy > 0) {
          //每隔三次打印一次日志
          if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
          logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
          selectCnt - 1, selector);
          }
          //沒有空輪詢的話清空
          selectCnt = 0;
          //如果出現(xiàn)異步任務為空 IO事件為空的話就會進入到這個邏輯
          } else if (unexpectedSelectorWakeup(selectCnt)) {
          //空輪詢被處理后清空 輪詢次數(shù)
          selectCnt = 0;
          }
          } catch (CancelledKeyException e) {
          ...................忽略........................
          } finally {
          ...................忽略........................
          }
          }

          可以仔細的看一下 上述代碼的注釋,我們進入到 unexpectedSelectorWakeup(selectCnt) 方法:

          private boolean unexpectedSelectorWakeup(int selectCnt) {
          ..............忽略日志打印................
          if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
          //判斷異常情況的次數(shù)是不是超過了預設的512次
          selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
          //開始重新構建一個selector
          rebuildSelector();
          return true;
          }
          return false;
          }

          我們讀源碼到這里,可以知道,當異常執(zhí)行的次數(shù)超過了閾值 512次,就會調(diào)用一個  rebuildSelector方法,我們點進去看一下:

          public void rebuildSelector() {
          if (!inEventLoop()) {
          execute(new Runnable() {
          @Override
          public void run() {
          rebuildSelector0();
          }
          });
          return;
          }
          rebuildSelector0();
          }

          我們按照慣例,按照同步方法調(diào)用 rebuildSelector0();

          private void rebuildSelector0() {
          //獲取原始的選擇器
          final Selector oldSelector = selector;
          //聲明一個新的選擇器
          final SelectorTuple newSelectorTuple;

          if (oldSelector == null) {
          return;
          }

          try {
          //創(chuàng)建一個新的選擇器,賦值給新的選擇器變量
          newSelectorTuple = openSelector();
          } catch (Exception e) {
          logger.warn("Failed to create a new Selector.", e);
          return;
          }

          int nChannels = 0;
          //開始遍歷舊的選擇器,將舊選擇器的IO事件的key,綁定到新創(chuàng)建的選擇器上
          for (SelectionKey key: oldSelector.keys()) {
          //獲取舊選擇器的管道
          Object a = key.attachment();
          try {
          //如果key失效了,就跳過!
          if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
          continue;
          }
          //獲取對應關注的事件掩碼
          int interestOps = key.interestOps();
          //將舊key置為失效
          key.cancel();
          //重新將管道綁定到新的選擇器上
          SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
          //替換管道里面保存的選擇器事件主鍵
          if (a instanceof AbstractNioChannel) {
          // Update SelectionKey
          ((AbstractNioChannel) a).selectionKey = newKey;
          }
          nChannels ++;
          } catch (Exception e) {
          ...............省略...............
          }
          }
          //重新保存新的優(yōu)化后的選擇器和原始選擇器
          selector = newSelectorTuple.selector;
          unwrappedSelector = newSelectorTuple.unwrappedSelector;

          try {
          //關閉舊的選擇器
          oldSelector.close();
          } catch (Throwable t) {
          if (logger.isWarnEnabled()) {
          ...............省略..................
          }
          }
          ...............省略..................
          }

          我們從上述代碼可以看到,Netty處理空輪詢的問題的策略是,當發(fā)現(xiàn)你可能發(fā)生空輪詢的次數(shù)超過了512次的時候,就直接重新獲取一個新的選擇器,然后將舊的選擇器直接替換掉,這樣空輪詢的BUG也就很輕易的解決了!

          三、總結

          1. 每一個EventLoop都會啟動一條永久運行的線程,用于處理異步任務和IO事件,我們稱之為Reactor線程。
          2. 如果存在IO事件的話,會先處理IO事件!
          3. Reactor線程會先將定時任務里面的任務合并到taskqueue里面,然后執(zhí)行!taskQueue執(zhí)行完畢后執(zhí)行tailQueue隊列的任務!
          4. 如果空輪詢的次數(shù)發(fā)生了512次,就認為發(fā)生了空輪詢的BUG,就會拋棄原來的選擇器,重建一個新的選擇器,將舊選擇器上的事件全部綁定到新的選擇器上,然后將舊選擇器刪除!

          才疏學淺,如果文章中理解有誤,歡迎大佬們私聊指正!歡迎關注作者的公眾號,一起進步,一起學習!



          ??「轉發(fā)」「在看」,是對我最大的支持??



          瀏覽 36
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  天堂网2020 | 操逼视频网址 | 国产一区二区三区四区五区在线 | 色婷婷综合在线 | 欧美pmⅴ |