<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Netty 源碼深度解析-EventLoop(1):EventLoop的構(gòu)造

          共 21805字,需瀏覽 44分鐘

           ·

          2021-05-23 09:18


          -     前言    -


          本文源碼地址:netty-source-code-analysis

          本文所使用的netty版本4.1.6.Final:帶注釋的netty源碼

          EventLoop在netty中發(fā)揮著驅(qū)動引擎的作用,本文我們以NioEventLoopGroupNioEventLoop為例著重分析一下EventLoopGroupEventLoop的創(chuàng)建、一些重要的數(shù)據(jù)結(jié)構(gòu)和netty的一些優(yōu)化。



          -     NioEventLoopGroup    -


          咱們以NioEventLoopGroup為例進(jìn)行分析。NioEventLoopGroup有很多構(gòu)造方法,咱們不再一一貼出,只貼出2個關(guān)鍵的構(gòu)造方法。

          NioEventLoopGroup( int nThreads, Executor executor, final SelectorProvider selectorProvider)該構(gòu)造方法給出了SelectStrategyFactory的默認(rèn)值為DefaultSelectStrategyFactory.INSTANCEEventLoop在每次循環(huán)時需要調(diào)用該類的calculateStrategy方法來決定循環(huán)的策略,具體咱們后邊講。

          我們通過new NioEventLoopGroup()或者new NioEventLoopGroup(32)創(chuàng)建EventLoopGroup時最終都會調(diào)用到這個構(gòu)造方法。接著又調(diào)用了另一個構(gòu)造方法,咱們跟下去。

          public NioEventLoopGroup(
                  int nThreads, Executor executor, final SelectorProvider selectorProvider)
           
          {
              this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
          }

          這里調(diào)用了父類 MultithreadEventLoopGroup的構(gòu)造方法,咱們繼續(xù)跟下去。

          public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                                   final SelectStrategyFactory selectStrategyFactory) {
              super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
          }

          MultithreadEventLoopGroup的構(gòu)造方法如下,這里給出了nThreads的默認(rèn)值,為MultithreadEventLoopGroup的靜態(tài)屬性DEFAULT_EVENT_LOOP_THREADS。接著調(diào)用父類MultithreadEventExecutorGroup的構(gòu)造方法。

          protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
              super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
          }

          DEFAULT_EVENT_LOOP_THREADS的賦值在MultithreadEventLoopGroup的靜態(tài)代碼塊中,我們看到該值默認(rèn)為cpu核數(shù)×2。

          static {
              DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                      "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
              

          咱們接著跟到MultithreadEventExecutorGroup的構(gòu)造方法,這里繼續(xù)調(diào)用本類的構(gòu)造方法MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args),并且為executor賦默認(rèn)值。

          protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
              this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
          }

          executor的默認(rèn)值為ThreadPerTaskExecutor,顧名思義就是為每一個任務(wù)創(chuàng)建一個線程,我們看一下它的實現(xiàn),代碼如下,execute方法中為每一個任務(wù)創(chuàng)建了一個線程。

          public final class ThreadPerTaskExecutor implements Executor {

              @Override
              public void execute(Runnable command) {
                  threadFactory.newThread(command).start();
              }
          }

          我們繼續(xù)跟到MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args),這里接著調(diào)用另一個構(gòu)造方法MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args)并且為chooserFacotry賦默認(rèn)值,chooserFactory咱們在“服務(wù)端的啟動流程”和“客戶端的啟動流程”中均有涉及,作用是在為Channel綁定EventLoop時輪詢地從EventLoopGroup里選擇EventLoop,這里不再贅述。

          protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
              this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
          }

          接著看下一個構(gòu)造方法MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args)這里就是創(chuàng)建EventLoopGroup的核心邏輯了

          首先創(chuàng)建了一個EventExecutor數(shù)組,并賦值給children屬性,數(shù)據(jù)長度和nThreads相等,很容易想到,這里應(yīng)該就是保存EventLoop的數(shù)組了。

          緊接著循環(huán)調(diào)用newChild方法為數(shù)組的每一個元素賦值。

          最后調(diào)用chooserFactory.newChooser(children)chooser賦值。

           protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                      EventExecutorChooserFactory chooserFactory, Object... args) {

                  children = new EventExecutor[nThreads];

                  for (int i = 0; i < nThreads; i ++) {
                      boolean success = false;
                      try {
                          children[i] = newChild(executor, args);
                          success = true;
                      } catch (Exception e) {
                        
                      } finally {
                      }
                  }
                  chooser = chooserFactory.newChooser(children);
          }

          我們跟著看一下newChild方法,該方法為抽象的,這里newChild方法的實現(xiàn)在NioEventLoopGroup中。好了,到這里咱們看到了NioEventLoop的構(gòu)造方法,繼續(xù)跟進(jìn)去。

          protected EventLoop newChild(Executor executor, Object... args) throws Exception {
              return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                  ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
          }



          -     NioEventLoop    -


          來看NioEventLoop的構(gòu)造方法,這里NioEventLoop保存了3個屬性。

          • providerselector的工廠類,從provider可以得到selector
          • selector: 調(diào)用provider.openSelector()得到的selector,這里netty做了優(yōu)化,咱們后邊講。
          • selectStrategy:這個的calculateStrategy方法返回值,決定了EventLoop的下一步動作,咱們也是后邊講。
          NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                       SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
              super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
              if (selectorProvider == null) {
                  throw new NullPointerException("selectorProvider");
              }
              if (strategy == null) {
                  throw new NullPointerException("selectStrategy");
              }
              provider = selectorProvider;
              selector = openSelector();
              selectStrategy = strategy;
          }

          繼續(xù)跟到父類SingleThreadEventLoop的構(gòu)造方法,這里初始化了一個屬性tailTasks,具體有什么用,咱們一會兒說,先記住這里有一個任務(wù)隊列叫tailTasks

          protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                          boolean addTaskWakesUp, int maxPendingTasks,
                                          RejectedExecutionHandler rejectedExecutionHandler) {
              super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
              tailTasks = newTaskQueue(maxPendingTasks);
          }

          繼續(xù)跟到父類SingleThreadEventExecutor的構(gòu)造方法,這里有幾個屬性賦值。

          • addTaskWakesUp:這個是用來喚醒阻塞在taskQueue上的EventLoop線程的,在NioEventLoopEventLoop不會阻塞在taskQueue上,這里用處不是很大,可以先不研究它。
          • maxPendingTasks:后面緊接著就用到了,任務(wù)隊列的最大長度。
          • executor:真正生成線程的類,默認(rèn)是ThreadPerTaskExecutor
          • taskQueue任務(wù)隊列,還記得上邊已經(jīng)有一個tailTask隊列了嗎,這里是另外一個隊列
          • rejectedExecutionHandler:拒絕策略。
          protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                              boolean addTaskWakesUp, int maxPendingTasks,
                                              RejectedExecutionHandler rejectedHandler) {
              super(parent);
              this.addTaskWakesUp = addTaskWakesUp;
              this.maxPendingTasks = Math.max(16, maxPendingTasks);
              this.executor = ObjectUtil.checkNotNull(executor, "executor");
              taskQueue = newTaskQueue(this.maxPendingTasks);
              rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
          }

          繼續(xù)跟到父類AbstractScheduledEventExecutor的構(gòu)造方法,這里什么也沒干,又繼續(xù)調(diào)用父類的構(gòu)造方法,但是我們注意到該類中也有一個隊列scheduledTaskQueue,顧名思義是定時任務(wù)隊列,只是該隊列沒有在構(gòu)造方法中初始化,等到有定時任務(wù)加入時才初始化。

          public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {

              Queue<ScheduledFutureTask<?>> scheduledTaskQueue;

              protected AbstractScheduledEventExecutor(EventExecutorGroup parent) {
                  super(parent);
              }
          }

          繼續(xù)跟到父類AbstractEventExecutor的構(gòu)造方法,這里為parent賦值,即該EventLoop所屬的EventLoopGroup

          protected AbstractEventExecutor(EventExecutorGroup parent) {
              this.parent = parent;
          }



          -     Netty 的優(yōu)化    -


          3.1 對selector的優(yōu)化

          我們回到NioEventLoop類中的openSelector方法,這個方法在干什么呢,它在通過反射替換sun.nio.ch.SelectorImpl類的兩個屬性selectedKeyspublicSelectedKeys,將這兩個屬性都替換成了SelectedSelectionKeySet類的實例。為什么要這么換呢,咱們接著往下看。

           private Selector openSelector() {
                  final Selector selector;
                  try {
                      selector = provider.openSelector();
                  } catch (IOException e) {
                  }

                  final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

                  Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
                      @Override
                      public Object run() {
                          try {
                              return Class.forName(
                                      "sun.nio.ch.SelectorImpl",
                                      false,
                                      PlatformDependent.getSystemClassLoader());
                          } catch (ClassNotFoundException e) {
                          } catch (SecurityException e) {
                          }
                      }
                  });

                  final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;

                  Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
                      @Override
                      public Object run() {
                          try {
                              Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                              Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                              selectedKeysField.setAccessible(true);
                              publicSelectedKeysField.setAccessible(true);

                              selectedKeysField.set(selector, selectedKeySet);
                              publicSelectedKeysField.set(selector, selectedKeySet);
                              return null;
                          } catch (NoSuchFieldException e) {
           
                          } catch (IllegalAccessException e) {

                          } catch (RuntimeException e) {
                             
                          }
                      }
                  });
                  if (maybeException instanceof Exception) {

                  } else {
                      //將selectedKeySet保存到selectedKeys屬性中
                      selectedKeys = selectedKeySet;
                  }

                  return selector;
              }

          sun.nio.ch.SelectorImpl類中的selectedKeyspublicSelectedKeys的作用是保存有興趣事件發(fā)生的Selectionkey,其中publicSelectedKeysselectedKeys的包裝類,Util.ungrowableSet(this.selectedKeys),看方法名ungrowableSet表示包裝之后這個set就不能添加元素了,但是可以刪除元素。也就是說jdk內(nèi)部使用selectedKeys(注意這是個protected字段)進(jìn)行添加和刪除操作,而暴露給用戶的是publicSelectedKeys只能進(jìn)行刪除操作(看selectedKeys方法)。

          默認(rèn)實現(xiàn)用的是HashSet

          public abstract class SelectorImpl extends AbstractSelector {
              protected Set<SelectionKey> selectedKeys = new HashSet();
              protected HashSet<SelectionKey> keys = new HashSet();
              private Set<SelectionKey> publicKeys;
              private Set<SelectionKey> publicSelectedKeys;

              protected SelectorImpl(SelectorProvider var1) {
                  super(var1);
                  if (Util.atBugLevel("1.4")) {
                      this.publicKeys = this.keys;
                      this.publicSelectedKeys = this.selectedKeys;
                  } else {
                      this.publicKeys = Collections.unmodifiableSet(this.keys);
                      this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys);
                  }

              }
              
              public Set<SelectionKey> selectedKeys() {
                  if (!this.isOpen() && !Util.atBugLevel("1.4")) {
                      throw new ClosedSelectorException();
                  } else {
                      return this.publicSelectedKeys;
                  }
              }
          }

          再來看看SelectedSelectionKeySet,這里用的數(shù)據(jù)結(jié)構(gòu)是數(shù)組,很顯然netty這里的優(yōu)化是因為數(shù)組的元素添加和遍歷操作比HashSet更快。有同學(xué)對這里有兩個數(shù)組的存在表示疑問,大家不用操心這個問題了,因為我也沒看懂為什么會有兩個數(shù)組,還要交替使用。netty在后來的版本中做了優(yōu)化,只用一個數(shù)組就實現(xiàn)了。所以這里只要知道netty用數(shù)組進(jìn)行優(yōu)化就可以了。

          那么又一個問題來了,為什么jdk里邊不用數(shù)組或者List,而用HashSet呢,那是因為selectorselect方法每次調(diào)用都會把已經(jīng)準(zhǔn)備好的SelectionKey放入selectedKeys中,如果用戶在第一次調(diào)用select方法之后沒有處理相應(yīng)Channel的事件的,也沒有刪除selectedKeys中的元素,那么再次調(diào)用select之后,相同的SelectionKey會再次加入selectedKeys中,如果selectedKeys使用數(shù)組或者List實現(xiàn)將起不到去重的效果。

          另一方面,如果使用List或者數(shù)組,刪除成本也比較高。

          而netty的實現(xiàn)中保證不會在連續(xù)調(diào)用兩次select方法之間不刪除selectedKeys中的元素,而且netty直接將selectedKeys暴露出來,在刪除的時候可以直接將數(shù)據(jù)中對應(yīng)索引的元素設(shè)置為null。

          final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

              private SelectionKey[] keysA;
              private int keysASize;
              private SelectionKey[] keysB;
              private int keysBSize;
              private boolean isA = true;
          }

          3.2 對任務(wù)隊列的優(yōu)化

          SingleThreadEventExecutor的構(gòu)造方法中taskQueue屬性通過調(diào)用newTaskQueue方法產(chǎn)生,newTaskQueue方法在NioEventLoop中被覆蓋了,我們看一下。

          這里被優(yōu)化成了MpscQueue,全稱是MultiProducerSingleConsumerQueue,即多生產(chǎn)者單消費(fèi)者隊列,感興趣的同學(xué)自己去看一下,既然單獨(dú)做一個這樣的隊列出來,在當(dāng)前場景下自然是比BlockingQueue性能更好了。

          為什么可以做這樣的優(yōu)化呢,很顯然,這里的隊列消費(fèi)者只有一個那就是EventLoop,而生產(chǎn)者會有多個。并且,這里有一行注釋This event loop never calls takeTask(),這就說明這個MultiProducerSingleConsumerQueue并沒有阻塞的take方法,而正好NioEventLoop也不需要調(diào)用阻塞的take方法,正好適合。

          @Override
          protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
              // This event loop never calls takeTask()
              return PlatformDependent.newMpscQueue(maxPendingTasks);
          }

          同樣的,tailTask這個隊列也被優(yōu)化成了MpscQueue。我們看一下scheduledTaskQueue是什么隊列,答案在AbstractScheduledEventExecutor#scheduledTaskQueue方法中,我們看到scheduledTaskQueue僅僅是一個普通的優(yōu)先級隊列,甚至都不是一個線程安全的阻塞隊列。

          Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
              if (scheduledTaskQueue == null) {
                  scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
              }
              return scheduledTaskQueue;
          }

          為什么呢,為什么tailTaskstaskQueueMpscQueue,而scheduledTaskQueue是非線程安全的隊列呢?可能是因為沒有的帶優(yōu)先級功能的MpsQueue吧。

          scheduledTaskQueue是非線程安全的隊列,會不會有多線程安全問題呢,答案是不會。我們看一下AbstractScheduledEventExecutor#schedule(io.netty.util.concurrent.ScheduledFutureTask<V>)方法,這里在添加定時任務(wù)時,如果是非EventLoop線程調(diào)用,則會發(fā)起一個異步調(diào)用,最終往scheduledTaskQueue添加定時任務(wù)的還是EventLoop線程。所以呢,這里又有另一個優(yōu)化,那就是可以延遲初始化scheduledTaskQueue



          -     總結(jié)    -


          畫重點(diǎn)來了,本文的重點(diǎn)就這兩個。

          • 每個EventLoop中有3個隊列,分別是tailTaskstaskQueuescheduledTaskQueue。而且tailTaskstaskQueue隊列在NioEventLoop中的被優(yōu)化為MultiProducerSingleConsumerQueue
          • netty對selector中的selectedKeys做了優(yōu)化,從HashSet替換為SelectedSelectionKeySetSelectedSelectionKeySet是用數(shù)組實現(xiàn)的Set,添加元素和遍歷效率更高。


          作者:王建新,轉(zhuǎn)轉(zhuǎn)架構(gòu)部資深Java工程師,主要負(fù)責(zé)服務(wù)治理、RPC框架、分布式調(diào)用跟蹤、監(jiān)控系統(tǒng)等。愛技術(shù)、愛學(xué)習(xí),歡迎聯(lián)系交流。

          來源公眾號:種代碼

          瀏覽 35
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲人视频在线观看 | 青青蜜臀| 99在线精品视频免费观看软件 | 99久久精品无码一区二区 | 欧美www|