<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Netty 線程模型

          共 31787字,需瀏覽 64分鐘

           ·

          2021-06-18 07:31

          公眾號關注 “GitHub今日熱榜
          設為 “星標”,帶你挖掘更多開發(fā)神器!










          1.前言

           

          Netty為什么會高效?回答就是良好的線程模型,和內(nèi)存管理。在Java的NIO例子中就我將客戶端的操作單獨放在一個線程中處理了,這么做的原因在于如果將客戶端連接串起來,后來的連接就要等前一個處理完,當然這并不意味著多線程比單線程有優(yōu)勢,而是在于每個客戶端都需要進行讀取準備好的緩存數(shù)據(jù),再執(zhí)行一些業(yè)務邏輯。如果業(yè)務邏輯耗時很久,那么順序執(zhí)行的方式?jīng)]有多線程優(yōu)勢大。另一個方面目前多核CPU很常見了,多線程是個不錯的選擇。這些在第一節(jié)就說明過,也提到過NIO并不是提升了IO操作的速度,而是減少了CPU的浪費時間,這些概念不能搞混。


          本節(jié)不涉及內(nèi)存管理,只介紹相關的線程模型。


          2.核心類


           

          上圖就是我們需要關注的體系內(nèi)容了,主要從EventExecutorGroup開始往下看,再上層的父接口是JDK提供的并發(fā)包內(nèi)的內(nèi)容,基礎是線程池中可以執(zhí)行周期任務的線程池服務。所以從這我們可以知道Netty可以實現(xiàn)周期任務,比如心跳檢測。接口定義下面將逐一介紹。


          2.1 EventExecutorGroup


            

          • isShuttingDown():是否正在關閉,或者是已經(jīng)關閉。

          • shutdownGracefully():優(yōu)雅停機,等待所有執(zhí)行中的任務執(zhí)行完成,并不再接收新的任務。

          • terminationFuture():返回一個該線程池管理的所有線程都terminated的時候觸發(fā)的future。

          • shutdown():廢棄了的關閉方法,被shutdownGracefully取代。

          • next():返回一個被該Group管理的EventExecutor。

          • iterator():所有管理的EventExecutor的迭代器。

          • submit():提交一個線程任務。

          • schedule():周期執(zhí)行一個任務。

           

          上述方法基本上是對周期線程池的一個封裝,但是擴展了EventExecuotr概念,即分了若干個小組,處理事件。另外一個比較實用的就是優(yōu)雅停機了。


          2.2 EventLoopGroup


           

          EventLoopGroup中的方法很少,其主要是和channel結(jié)合了,就多了一個將channel注冊到線程池中的方法。


          2.3 EventExecutor


           

          EventExecutor繼承自EventExecutorGroup,這個之前也提到過該類,相當于Group中的一個子集。


          • next():就是找group中下一個子集

          • parent():就是所屬group

          • inEventLoop():當前線程是否是在該子集中

          • newXXX():這個是下一節(jié)內(nèi)容,此處不介紹。


          2.4 EventLoop


          該接口就一個方法,就是parent();EventLoop和EventLoopGroup與EventExecutor和EventExecutorGroup是一組相似的概念。了解這些就可以了。


          3 實現(xiàn)細節(jié)

           

          EventLoop和EventLoopGroup的實現(xiàn)十分簡單,簡單看下就可以了,這里介紹幾個重要的實現(xiàn)類。


          3.1 AbstractEventExecutor

           

          該類繼承自上節(jié)說過的AbstractExecutorService,其最重要的是execute方法未實現(xiàn)。該類是對AbstractExecutorService的一個進一步加工,添加了group的概念,和不同的Future創(chuàng)建方法。這里不要被之前的Java線程池模型所干擾,其不一定是線程池。回到上一節(jié)線程池的介紹,最終的樣子都是Execute方法決定的。


          3.2 AbstractScheduledEventExecutor

           

          該類是對AbstractEventExecutor的一個進一步實現(xiàn),其實現(xiàn)了周期任務的執(zhí)行。原理是內(nèi)部持有一個優(yōu)先隊列ScheduledFutureTask。所有周期任務都添加到這個隊列中,也實現(xiàn)了取出周期任務的方法,但是該抽象類并沒有具體執(zhí)行周期任務的實現(xiàn)。


          3.3 SingleThreadEventExecutor

           

          該類是對AbstractScheduledEventExecutor的一個實現(xiàn),其基本上是我們最終的一個EventLoop的雛形了,很多不同協(xié)議的EventLoop都是基于它實現(xiàn)的。


           

          雖然名字叫做單線程執(zhí)行器,但是其不一定是單個線程。Executor默認使用的是ThreadPerTaskExecutor,其executor會為每一個任務創(chuàng)建一個線程并執(zhí)行,當然你也可以傳入自己的executor。Queue使用的是LinkedBlockingQueue,無容量限制的任務隊列。其提供了添加任務到任務隊列,從任務隊列中獲取任務的方法。


          protected boolean runAllTasks() {
              assert inEventLoop();
              boolean fetchedAll;
              boolean ranAtLeastOne = false;
           
              do {
                  fetchedAll = fetchFromScheduledTaskQueue();
                  if (runAllTasksFrom(taskQueue)) {
                      ranAtLeastOne = true;
                  }
              } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
           
              if (ranAtLeastOne) {
                  lastExecutionTime = ScheduledFutureTask.nanoTime();
              }
              afterRunningAllTasks();
              return ranAtLeastOne;
          }
           
          protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
              Runnable task = pollTaskFrom(taskQueue);
              if (task == null) {
                  return false;
              }
              for (;;) {
                  safeExecute(task);
                  task = pollTaskFrom(taskQueue);
                  if (task == null) {
                      return true;
                  }
              }
          }


          執(zhí)行過程如上:1.先獲取所有的周期任務,放入taskQueue;2.不斷的執(zhí)行taskQueue中的任務;3.afterRunningAllTasks就是一個自由發(fā)揮的方法。safeExecute就是直接執(zhí)行run方法。


          private void doStartThread() {
              assert thread == null;
              executor.execute(new Runnable() {
                  @Override
                  public void run() {
                      thread = Thread.currentThread();
                      if (interrupted) {
                          thread.interrupt();
                      }
           
                      boolean success = false;
                      updateLastExecutionTime();
                      try {
                          SingleThreadEventExecutor.this.run();
                          success = true;
                      } catch (Throwable t) {
                          logger.warn("Unexpected exception from an event executor: ", t);
                      } finally {
                          for (;;) {
                              int oldState = state;
                              if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                      SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                                  break;
                              }
                          }
           
                          // Check if confirmShutdown() was called at the end of the loop.
                          if (success && gracefulShutdownStartTime == 0) {
                              logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                      SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                                      "before run() implementation terminates.");
                          }
           
                          try {
                              // Run all remaining tasks and shutdown hooks.
                              for (;;) {
                                  if (confirmShutdown()) {
                                      break;
                                  }
                              }
                          } finally {
                              try {
                                  cleanup();
                              } finally {
                                  STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                                  threadLock.release();
                                  if (!taskQueue.isEmpty()) {
                                      logger.warn(
                                              "An event executor terminated with " +
                                                      "non-empty task queue (" + taskQueue.size() + ')');
                                  }
           
                                  terminationFuture.setSuccess(null);
                              }
                          }
                      }
                  }
              });
          }


          上面是該Executor初始化過程,run方法又是交給子類進行初始化了。


          public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
              if (quietPeriod < 0) {
                  throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)");
              }
              if (timeout < quietPeriod) {
                  throw new IllegalArgumentException(
                          "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
              }
              if (unit == null) {
                  throw new NullPointerException("unit");
              }
           
              if (isShuttingDown()) {
                  return terminationFuture();
              }
           
              boolean inEventLoop = inEventLoop();
              boolean wakeup;
              int oldState;
              for (;;) {
                  if (isShuttingDown()) {
                      return terminationFuture();
                  }
                  int newState;
                  wakeup = true;
                  oldState = state;
                  if (inEventLoop) {
                      newState = ST_SHUTTING_DOWN;
                  } else {
                      switch (oldState) {
                          case ST_NOT_STARTED:
                          case ST_STARTED:
                              newState = ST_SHUTTING_DOWN;
                              break;
                          default:
                              newState = oldState;
                              wakeup = false;
                      }
                  }
                  if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
                      break;
                  }
              }
              gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
              gracefulShutdownTimeout = unit.toNanos(timeout);
           
              if (oldState == ST_NOT_STARTED) {
                  try {
                      doStartThread();
                  } catch (Throwable cause) {
                      STATE_UPDATER.set(this, ST_TERMINATED);
                      terminationFuture.tryFailure(cause);
           
                      if (!(cause instanceof Exception)) {
                          // Also rethrow as it may be an OOME for example
                          PlatformDependent.throwException(cause);
                      }
                      return terminationFuture;
                  }
              }
           
              if (wakeup) {
                  wakeup(inEventLoop);
              }
           
              return terminationFuture();
          }


          上面是一個優(yōu)雅停機的過程,改變該Executor的狀態(tài)成ST_SHUTTING_DOWN,這里要注意addTask的時候只有shutdown狀態(tài)才會拒絕,所以此時這里的邏輯還不會拒絕新任務添加,然后返回了一個terminationFuture,這里不做介紹。


          3.4 SingleThreadEventLoop

           

          此類繼承自上面講解的SingleThreadEventExecutor,這里多了一個tailTask隊列,用于每次事件循環(huán)后置任務處理,暫且不管。重要的在于很早提到了register方法,將channel注冊到線程中。


          public ChannelFuture register(Channel channel) {
              return register(new DefaultChannelPromise(channel, this));
          }
           
          public ChannelFuture register(final ChannelPromise promise) {
              ObjectUtil.checkNotNull(promise, "promise");
              promise.channel().unsafe().register(this, promise);
              return promise;
          }


          實際上就是生成了一個DefaultChannelPromise,將channel和線程綁定,最后都放入了unsafe對象中。


          3.5 NioEventLoop


          上面講了一些雜亂無章的內(nèi)容,這里借助NioEventLoop好好梳理一下整個設計流程。NioEventLoop繼承自SingleThreadEventLoop,先對之前的相關研究進行總結(jié):


          1.EventExecutorGroup接口是繼承自Java的周期任務接口,是一個事件處理器組的概念,其相關方法有:是否正在關閉;優(yōu)雅停機;獲取一個事件處理器;提交一個任務;提交一個周期任務

            

          2.EventExecutor接口是事件處理器,其繼承自EventExecutorGroup,目的并不是說每一個事件處理器都是一個事件處理器組,而是為了復用接口定義的方法。每個處理器都應該具備優(yōu)雅停機,提交任務,判斷是否關閉的方法,其它方法有:獲取處理器組;獲取下一個處理器(覆蓋了父接口的next方法);判斷是否在事件循環(huán)內(nèi);創(chuàng)建Promise、ProgressivePromise、SucceededFuture、FailedFuture

            

          3.EventLoopGroup繼承自EventExecutorGroup,更新了父接口的含義,EventExecutor 的定位是處理單個事件,group就是處理事件組了。EventLoop的定位是處理一個連接的生命周期過程中的周期事件,group是多個EventLoop的集合了。這里又有一個尷尬的地方,group按照定義本不需要定義其它方法,但是由于Server端的設計(之前說過服務端的channel也是一個線程),使用的是group,所以group必須承擔單個EventLoop的職責。最終添加了額外的方法:獲取下一個EventLoop;注冊channel;

            

          4.EventLoop,事件循環(huán),其也是一個處理器,最終繼承自EventExecutor和EventLoopGroup,方法只有一個:獲取父事件循環(huán)組EventLoopGroup。

           

          上述接口光看名稱很容易陷入誤解,實際上定義是想將單個loop和group分離,但是實現(xiàn)上由于Server端包含一個服務端監(jiān)聽連接線程,一個客戶端連接線程,其Group承擔了單個的職責,所以定義了一些本該由單個執(zhí)行器處理的方法,又為了復用方法,導致loop繼承了group,這樣看起來怪怪的,接口理解起來就混亂了。結(jié)合上面的描述,再看一遍繼承圖就更清楚了:


           

          理解了上面的設定,我們再來看看客戶端的事件處理是如何設計的,即總結(jié)上訴抽象類做了哪些事情。


          1. AbstractEventExecutor:入?yún)⒕鸵粋€parent,該類完成了一個基本處理:


          a.將next設置成自己(上面說過繼承的group,這個操作就和group區(qū)分開了)。

          b.優(yōu)雅停機調(diào)用的是帶有超時的停機方案,超時為15秒

          c.覆蓋了Java提供的newTask包裝成FutureTask的方法,使用了自己的PromiseTask

          d.提供安全執(zhí)行方法:safeExecute,直接調(diào)用的run方法

             

          該類是最基礎的一個抽象類,基本作用就是與group在定義混亂上做了一個區(qū)分。提供了執(zhí)行器與Future關聯(lián)方法和一個基本的執(zhí)行任務的方法。

            

          2.AbstractScheduledEventExecutor:入?yún)⒁彩且粋€parent,該類對AbstractEventExecutor未處理的周期任務提供了具體的完成方法:

              

          a.提供計算當前距服務啟動的時間

          b.提供存儲ScheduledFutureTask的優(yōu)先隊列

          c.提供了取消所有周期任務的方法

          d.提供了獲取一個符合的周期任務的方法,要滿足時間,并獲取后移除

          e.提供了獲取距最近一個周期任務的時間多久

          f.提供了移除一個周期任務的方法

          g.提供添加周期任務的方法

             

          該類提供了周期任務執(zhí)行的一些基本方法,涉及添加周期任務,移除,獲取等方法。

            

          3??.SingleThreadEventExecutor:入?yún)╬arent,addTaskWakesUp標志,maxPendingTasks最大任務隊列數(shù)(16和io.netty.eventexecutor.maxPendingTasks(default Integer.MAX_VALUE)參數(shù)更大的那個值),executor執(zhí)行器(默認是ThreadPerTaskExecutor,每個任務創(chuàng)建一個線程執(zhí)行),taskQueue任務隊列(默認是LinkedBlockingQueue),rejectedExecutionHandler拒絕任務的處理類(默認直接拋出RejectedExecutionException)。該類主要完成了一個單線程的EventExecutor的基本操作:


          a.創(chuàng)建一個taskQueue

          b.中斷線程

          c.從任務隊列中獲取一個任務,takeTask連同周期任務也會獲取

          d.添加任務到任務隊列

          e.移除任務隊列中指定任務

          f.運行所有任務,會先將周期任務存入taskQueue,再使用safeExecute方法執(zhí)行任務

          g.實現(xiàn)了execute方法,會添加任務到任務隊列,如果當前線程不是事件循環(huán)線程,開啟一個線程。通過的就是持有的executor來開啟的線程任務。execute方法調(diào)用了run方法,該類沒有實現(xiàn)run方法。任務的添加都不是通過execute直接執(zhí)行了,而是走的添加任務到taskQueue,由未實現(xiàn)的run線程來處理這些事件。

          h.優(yōu)雅停機

             

          這樣就有了一個基礎的單線程模型了,開啟線程,保存,取出任務的方法都有了,只有在開啟線程中執(zhí)行任務的run()方法還未實現(xiàn)。

            

          4.SingleThreadEventLoop:入?yún)⒑蚐ingleThreadEventExecutor一致,不同的是多了一個tailTasks。該類主要是針對netty自身的事件循環(huán)的定義來實現(xiàn)方法了:

              

          a.注冊channel,實際上是生成了一個DefaultChannelPromise對象,持有了channel,和運行該channel的EventExecutor,然后將該對象交給最底層的unsafe處理。

          b.添加一個事件周期結(jié)束后執(zhí)行的尾任務tailTasks

          c.執(zhí)行尾任務

          d.刪除指定尾任務

             

          該類就很簡單,沒有過多的內(nèi)容,只是增加了一個每個事件周期后執(zhí)行的任務而已。

           

          回顧完了,上面4個父類構(gòu)建了一個基本的帶定時任務,普通任務,事件循環(huán)后置任務的EventLoop,每個channel綁定了一個線程執(zhí)行器,通過DefaultChannelPromise持有兩者,最終交給Unsafe操作。子類只需要實現(xiàn)run方法,處理任務隊列中的任務。下面就是重頭戲NioEventLoop這個客戶端的線程是如何設計的了:


          NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                       SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
              super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
              if (selectorProvider == null) {
                  throw new NullPointerException("selectorProvider");
              }
              if (strategy == null) {
                  throw new NullPointerException("selectStrategy");
              }
              provider = selectorProvider;
              final SelectorTuple selectorTuple = openSelector();
              selector = selectorTuple.selector;
              unwrappedSelector = selectorTuple.unwrappedSelector;
              selectStrategy = strategy;
          }


          調(diào)用的就是父類方法,不過在創(chuàng)建EventLoop的時候創(chuàng)建了selector,這個是NIO中也提到過的。該EventLoop是在Group中newChild創(chuàng)建的。


          protected void run() {
              for (;;) {
                  try {
                      switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                          case SelectStrategy.CONTINUE:
                              continue;
                          case SelectStrategy.SELECT:
                              select(wakenUp.getAndSet(false));
                              if (wakenUp.get()) {
                                  selector.wakeup();
                              }
                          default:
                      }
                      cancelledKeys = 0;
                      needsToSelectAgain = false;
                      final int ioRatio = this.ioRatio;
                      if (ioRatio == 100) {
                          try {
                              processSelectedKeys();
                          } finally {
                              runAllTasks();
                          }
                      } else {
                          final long ioStartTime = System.nanoTime();
                          try {
                              processSelectedKeys();
                          } finally {
                              final long ioTime = System.nanoTime() - ioStartTime;
                              runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                          }
                      }
                  } catch (Throwable t) {
                      handleLoopException(t);
                  }
                  try {
                      if (isShuttingDown()) {
                          closeAll();
                          if (confirmShutdown()) {
                              return;
                          }
                      }
                  } catch (Throwable t) {
                      handleLoopException(t);
                  }
              }
          }


          上面是我們需要關注的run方法,該方法被單獨的線程執(zhí)行。通過一個策略來判斷執(zhí)行哪個,默認的策略是任務隊列中有任務,select執(zhí)行一下,返回當前的事件數(shù),沒任務返回SelectStrategy.SELECT。簡單的說就是有任務就補充新到來的事件執(zhí)行所有的任務,沒任務就執(zhí)行新到的事件。先處理IO讀寫任務,再處理其他任務,ioRation設置的耗時比例是IO任務占一個執(zhí)行周期的百分比,默認50,意思是IO執(zhí)行了50秒,其他任務也會得到50秒的執(zhí)行時間。后續(xù)操作就是獲取所有select的key,執(zhí)行所有的任務了。這里就有一個判斷如果是停機狀態(tài),就會closeAll(),之前說優(yōu)雅停機的時候就是設置了一個這個標志,最后是在執(zhí)行任務之后判斷。processSelectedKey環(huán)節(jié)都交給unsafe類完成了,這里就掛上了handler相關觸發(fā),handler的執(zhí)行也就說明都在該線程內(nèi)了。

           

          上面的描述雖然把整個過程都關聯(lián)上了,但是最主要的問題還是混亂的:如何做到一個channel創(chuàng)建一個線程的?上面只是說明了channel和EventExecutor是綁定在DefaultChannelPromise并交給了Unsafe類,并沒有看到是如何創(chuàng)建線程的。而且另一個問題在于,processSelectedKey是選擇了所有的key,這不是所有的channel共享了一個線程嗎?

           

          要解決該問題要回到Bootstrap的channel建立過程:initAndRegister()方法中,通過channelFactory創(chuàng)建了一個channel對象,而后ChannelFuture regFuture = config().group().register(channel);主要就是觀察register方法,該類是設置的線程池NioEventLoopGroup提供的方法,其繼承的是MultithreadEventLoopGroup,是調(diào)用了next方法獲取的EventLoop,最后接上上面channel和eventloop綁定的內(nèi)容。next中獲取的EventLoop早在類初始化的時候就生成了,在構(gòu)造方法中MultithreadEventExecutorGroup,children就是Eventloop,next不過是挑選了一個線程池而已,默認數(shù)量是CPU核數(shù)的2倍。這個也就是前面說的,線程數(shù)量不是越多越好。

           

          這樣我們明白了,客戶端注冊的時候是分配了一個線程給它。客戶端并不需要多線程,但是還是繼續(xù)看后面的內(nèi)容:AbstractUnsafe的register方法給出了相關解答。channel持有了該EventLoop,此時線程還是未運行狀態(tài),只是有了這么一個對象而已。


          if (eventLoop.inEventLoop()) {
              register0(promise);
          } else {
              try {
                  eventLoop.execute(new Runnable() {
                      @Override
                      public void run() {
                          register0(promise);
                      }
                  });
              } catch (Throwable t) {
                  logger.warn(
                          "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                          AbstractChannel.this, t);
                  closeForcibly();
                  closeFuture.setClosed();
                  safeSetFailure(promise, t);
              }
          }


          這里execute方法,這個是之前我們講過的一個方法,其會判斷當前線程是否是該channel的線程,目前都沒有初始化線程肯定不是,將任務放入任務隊列,開啟一個線程(線程處于未啟動狀態(tài)才會開啟,否則不執(zhí)行),這個設計使得execute的時候只會開啟一次線程,而所有的任務都會被放入任務隊列,由這個線程執(zhí)行。再回到run方法,這個是channel線程執(zhí)行的方法,目前每個NioEventLoop都執(zhí)行了Select等方法啊,這不是處理了所有的channel的工作嗎?并沒有達到一個channel生命周期控制在一個線程中啊。

           

          這里實際上是JAVA NIO的例子帶來的誤解,認為必須一個線程來使用select,然后遍歷事件分配線程給channel執(zhí)行讀寫操作。實際上在Netty中不一樣,Netty所有線程都在執(zhí)行select并獲取相關事件,但是實際上其并沒有執(zhí)行所有的事件。


          private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
              final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
              if (!k.isValid()) {
                  final EventLoop eventLoop;
                  try {
                      eventLoop = ch.eventLoop();
                  } catch (Throwable ignored) {
                      return;
                  }
                  
                  if (eventLoop != this || eventLoop == null) {
                      return;
                  }
                   
                  unsafe.close(unsafe.voidPromise());
                  return;
              }
           
              try {
                  int readyOps = k.readyOps();
                   
                  if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                      int ops = k.interestOps();
                      ops &= ~SelectionKey.OP_CONNECT;
                      k.interestOps(ops);
           
                      unsafe.finishConnect();
                  }
           
                  if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                      ch.unsafe().forceFlush();
                  }
           
                  if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                      unsafe.read();
                  }
              } catch (CancelledKeyException ignored) {
                  unsafe.close(unsafe.voidPromise());
              }


          看這里,if (eventLoop != this || eventLoop == null) ,如果channel的eventLoop不是當前的eventLoop就不執(zhí)行,這個就一小段代碼,但是就直接決定了EventLoop只對自己所綁定的channel感興趣,最終達到了只處理自己相關的任務的目的。


          3.6 NioEventLoopGroup


          該類沒有太多需要說明的內(nèi)容,前面已經(jīng)講解了很多。


          1.AbstractEventExecutorGroup,實現(xiàn)了基本的方法,所有方法都是調(diào)用next()即挑選出一個線程執(zhí)行器完成的。

            

          2.MultithreadEventExecutorGroup,實現(xiàn)了一個基本的線程池,持有子線程,主要工作是初始化了子線程數(shù)組,提供了next方法。

            

          3.MultithreadEventLoopGroup,實現(xiàn)了Netty的channel線程池,提供了register方法,雖然也是調(diào)用next的register方法。

            

          4.NioEventLoopGroup,實現(xiàn)了創(chuàng)建子線程數(shù)組時newChild方法,所有的EventLoop都是這個方法創(chuàng)建的。

           

          group就是兩個重點,一個next()挑選事件執(zhí)行器,一個newChild()創(chuàng)建線程執(zhí)行器對象。


          4.總結(jié)


          本節(jié)耗費了大量的篇幅講解了Netty的線程模型的設計思路,主要看點如下:


          1.解釋了EventLoop、EventLoopGroup、EventExecutor、EventExecutorGroup這四者之間的關系,和這么復雜混亂的繼承關系的原因。

          2.解釋了group是如何初始化線程,并綁定channel的,next().register()

          3.解釋了eventloop為什么和channel綁定了,execute()開啟線程,以及每個eventloop都在獲取IO事件,但是通過channel的eventloop是否等于當前過濾掉其它的事件,只處理自己綁定的channel事件。

           

          由于每個線程都在獲取IO事件,所以這段邏輯變的非常復雜,這也就是我之前說的寫好很IO很困難。

           

          最后附上一張對前面所有內(nèi)容總結(jié)的一個圖,清醒一下頭腦,從復雜的代碼中脫身:


           

          這個圖就是一個基本的執(zhí)行過程圖了,可能有遺漏的地方,但是大體情況如圖所示。



          出處:cnblogs.com/lighten/p/8967630.html










          關注GitHub今日熱榜,專注挖掘好用的開發(fā)工具,致力于分享優(yōu)質(zhì)高效的工具、資源、插件等,助力開發(fā)者成長!







          點個在看,你最好看


          瀏覽 63
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产avtt | 91日逼视频 | 久久夜天天天天 | 大香五月婷婷 | 国内精品久久久久久久星 |