<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          NioEventLoopGroup源碼解析

          共 8796字,需瀏覽 18分鐘

           ·

          2021-07-04 00:21

          有道無(wú)術(shù),術(shù)尚可求也!有術(shù)無(wú)道,止于術(shù)!


          NioEventLoopGroup的初始化源碼

          一、尋找源碼的過(guò)程

          我們前面說(shuō)到過(guò),NioEventLoopGroup我們可以近乎把它看作是一個(gè)線程池,該線程池會(huì)執(zhí)行一個(gè)一個(gè)的任務(wù),我們常用的NioEventLoopGroup大概有兩種,NioEventLoopGroup(int nThreads),NioEventLoopGroup(),即一個(gè)是指定線程數(shù)量的,一個(gè)是默認(rèn)指定線程數(shù)量的!這里我們以無(wú)參構(gòu)造為入口進(jìn)行分析!

          EventLoopGroup work = new NioEventLoopGroup();
          public NioEventLoopGroup() {
          this(0);
          }

          當(dāng)我們使用默認(rèn)的數(shù)量的時(shí)候,他會(huì)傳遞一個(gè)0,我們繼續(xù)往下跟!

          public NioEventLoopGroup(int nThreads) {
          this(nThreads, (Executor) null);
          }

          注意這里傳遞的參數(shù)是:0,null

          public NioEventLoopGroup(int nThreads, Executor executor) {
          //每個(gè) group維護(hù)一個(gè) SelectorProvider 主要用它獲取selector選擇器
          this(nThreads, executor, SelectorProvider.provider());
          }

          這里面多傳遞了一個(gè) SelectorProvider.provider(),該方法是JDK NIO提供的API主要可以獲取NIO選擇器或者Channel,如下圖:

          image-20210424220821596

          我們回歸主線繼續(xù)跟:

          public NioEventLoopGroup(
          int nThreads, Executor executor, final SelectorProvider selectorProvider)
          {
          this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
          }

          這里多傳遞了一個(gè) DefaultSelectStrategy選擇策略,這在后面講解NioEventLoop會(huì)具體講解,不做闡述!

          public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
          final SelectStrategyFactory selectStrategyFactory)
          {
          super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
          }

          我們會(huì)發(fā)現(xiàn)這里還會(huì)默認(rèn)傳遞一個(gè)拒絕策略RejectedExecutionHandlers.reject(),這個(gè)拒絕策略是干嘛的呢?

          @Override
          public void rejected(Runnable task, SingleThreadEventExecutor executor) {
          throw new RejectedExecutionException();
          }

          我們得到一個(gè)結(jié)論,當(dāng)某些條件觸發(fā)這個(gè)拒絕策略,那么他會(huì)拋出一個(gè)RejectedExecutionException異常,具體什么時(shí)候觸發(fā),后續(xù)也會(huì)詳細(xì)說(shuō)明,這里只需要記住就OK了!

          我們繼續(xù)回到主線, 這里我們開始調(diào)用父類,還記得上一節(jié)課我們分析的NioEventLoopGroup的父類是誰(shuí)嗎?沒錯(cuò)是:MultithreadEventLoopGroup, 我們會(huì)進(jìn)入到MultithreadEventLoopGroup里面:

          protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
          //線程數(shù)量為0時(shí) 使用默認(rèn)的cpu * 2
          super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
          }

          nThreads還記得是幾嗎?是0對(duì)不對(duì),這里有個(gè)判斷,當(dāng)你的線程數(shù)量為0的時(shí)候,會(huì)使用DEFAULT_EVENT_LOOP_THREADS當(dāng)作線程池的數(shù)量,DEFAULT_EVENT_LOOP_THREADS是多少呢?

          DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

          默認(rèn)是CPU的兩倍,所以我們現(xiàn)在得到一個(gè)結(jié)論,當(dāng)我們使用默認(rèn)的NioEventLoopGroup的時(shí)候,系統(tǒng)會(huì)默認(rèn)使用系統(tǒng)CPU核數(shù)*2當(dāng)作線程池的數(shù)量!

          我們上一步傳遞過(guò)來(lái)的selectorProvider、拒絕策略、selectStrategyFactory被封裝為數(shù)組,并放在args[0],args[1], args[2]的位置!

          我們繼續(xù)回到主線,這里又再次調(diào)用到父類,MultithreadEventExecutorGroup:

          protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
          this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
          }

          注意,這里又再次多傳遞了一個(gè)參數(shù):DefaultEventExecutorChooserFactory一個(gè)選擇器工廠,這里會(huì)返回一個(gè)選擇器,他是DefaultEventExecutorChooserFactory類型的,具體分析后面會(huì)分析!我們繼續(xù)回到主線:

          protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
          ..........后續(xù)源碼補(bǔ)充..........
          }

          到這里我們終于看到了一大段代碼,這里是EventLoopGroup的主要邏輯,我們逐行分析:

          二、構(gòu)建線程執(zhí)行器

          1. 源碼解析

          //newDefaultThreadFactory  構(gòu)建線程工廠
          if (executor == null) {
          //創(chuàng)建并保存線程執(zhí)行器 執(zhí)行器 執(zhí)行任務(wù)的 默認(rèn)是 DefaultThreadFactory 線程池
          executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
          }

          這里會(huì)判斷我們傳入的執(zhí)行器是否為空,否則就新建一個(gè),我們還記得executor是什么值嗎?是null,對(duì)不對(duì),所以它會(huì)進(jìn)入到這里的邏輯我們進(jìn)入到newDefaultThreadFactory源碼里面看一下:

          newDefaultThreadFactory()主要邏輯

          protected ThreadFactory newDefaultThreadFactory() {
          return new DefaultThreadFactory(getClass());
          }

          可以看到,這里向執(zhí)行器里面?zhèn)魅肓艘粋€(gè) DefaultThreadFactory一個(gè)默認(rèn)的線程工廠!

          ThreadPerTaskExecutor主要邏輯

          /**
          * io.netty.util.concurrent.DefaultThreadFactory#newThread(java.lang.Runnable)
          *
          * 執(zhí)行任務(wù) 每次執(zhí)行任務(wù)都會(huì)創(chuàng)建一個(gè)線程實(shí)體對(duì)象
          * @param command 線程
          */

          @Override
          public void execute(Runnable command) {
          //執(zhí)行任務(wù)
          threadFactory.newThread(command).start();
          }

          我們發(fā)現(xiàn),這里調(diào)用了一個(gè)我們傳入的線程工廠,創(chuàng)建了一個(gè)新的線程并調(diào)用start方法啟動(dòng)了起來(lái),那么他是如何創(chuàng)建的呢? 我們進(jìn)入到newThread源碼里面查看,由于我們默認(rèn)使用的線程工廠是 DefaultThreadFactory, 所以,我們會(huì)進(jìn)入到 DefaultThreadFactory#newThread

          @Override
          public Thread newThread(Runnable r) {
          //創(chuàng)建一個(gè)線程 每次執(zhí)行任務(wù)的時(shí)候都會(huì)創(chuàng)建一個(gè)線程實(shí)體
          Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
          try {
          if (t.isDaemon() != daemon) {
          t.setDaemon(daemon);
          }

          if (t.getPriority() != priority) {
          t.setPriority(priority);
          }
          } catch (Exception ignored) {
          // Doesn't matter even if failed to set.
          }
          return t;
          }

          這里沒有太多的操作,只是會(huì)將一個(gè) Runnable封裝為一個(gè) Thread進(jìn)行返回,我們重點(diǎn)關(guān)注一下這個(gè)Thread,它和我們傳統(tǒng)使用的Thread是一樣的嗎?  我們跟進(jìn)到 newThread方法看一下:

          protected Thread newThread(Runnable r, String name) {
          //Netty自己封裝的線程
          return new FastThreadLocalThread(threadGroup, r, name);
          }

          邏輯很簡(jiǎn)單,就是將一個(gè)Thread包裝為Netty自定義的 FastThreadLocalThread,至于為什么,我們暫時(shí)不往下多做解釋,后續(xù)章節(jié)會(huì)很詳細(xì)的解釋它!

          2. 線程執(zhí)行器總結(jié)

          這里我們會(huì)創(chuàng)建一個(gè)線程執(zhí)行器 ThreadPerTaskExecutor,使用默認(rèn)的線程工廠DefaultThreadFactory,線程執(zhí)行器會(huì)將一個(gè)任務(wù)包裝為一個(gè) FastThreadLocalThread對(duì)象,然后調(diào)用start方法開啟一個(gè)新的線程執(zhí)行任務(wù)!

          三、創(chuàng)建對(duì)應(yīng)數(shù)量的執(zhí)行器

          //創(chuàng)建執(zhí)行器數(shù)組  數(shù)量和預(yù)設(shè)線程數(shù)量一致
          children = new EventExecutor[nThreads];

          for (int i = 0; i < nThreads; i ++) {
          boolean success = false;
          try {
          //創(chuàng)建執(zhí)行器 開始創(chuàng)建執(zhí)行器 這里的執(zhí)行機(jī)估計(jì)就會(huì)EventLoop 是NioEventLoop
          children[i] = newChild(executor, args);
          success = true;
          } catch (Exception e) {
          .....省略不必要代碼
          } finally {
          .....省略不必要代碼
          }
          }

          1. 源碼解析

          children = new EventExecutor[nThreads];

          首先他會(huì)創(chuàng)建一個(gè)空的EventExecutor執(zhí)行器數(shù)組,然后遍歷填充!

          還記得 nThreads是幾嗎?  默認(rèn)是CPU*2的大小,所以這里會(huì)創(chuàng)建 CPU * 2數(shù)量的執(zhí)行器! 我們發(fā)現(xiàn),for循環(huán)中填充的主要邏輯是newChild,所以,我們進(jìn)入到 newChild方法, 這里提示一點(diǎn),我們創(chuàng)建的Group對(duì)象是一個(gè)什么對(duì)象? 是NioEventLoopGroup對(duì)象對(duì)不對(duì),所以我們這里會(huì)進(jìn)入到  NioEventLoopGroup#newChild方法:

          @Override
          protected EventLoop newChild(Executor executor, Object... args) throws Exception {
          EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
          return new NioEventLoop(this, executor, (SelectorProvider) args[0],
          ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
          }

          我們傳遞過(guò)來(lái) 的args長(zhǎng)度為3,前面做過(guò)解析 :

          args[0]為 selectorProvider、args[1]為拒絕策略、args[2]為selectStrategyFactory

          所以 queueFactory為null, 然后我們?cè)僦攸c(diǎn)關(guān)注 NioEventLoop對(duì)象,可以看出,newChild方法返回的是 NioEventLoop,那么我們初步就可以確定,EventExecutor數(shù)組里面存在的是NioEventLoop對(duì)象!至此,我們就不深究了,NioEventLoop的初始化源碼分析我會(huì)放到下一節(jié)課分析,這里我們可以確定一件事, EventExecutor數(shù)組里面存在的是NioEventLoop對(duì)象!我們繼續(xù)回到主線:

          2. 執(zhí)行器數(shù)組總結(jié)

          for循環(huán)完畢之后,此時(shí)的EventExecutor[nThreads];數(shù)組就被填充滿了,里面的每一個(gè)元素都是NioEventLoop對(duì)象,每一個(gè)NioEventLoop對(duì)象都包含一個(gè) ThreadPerTaskExecutor線程執(zhí)行器對(duì)象!

          四、創(chuàng)建一個(gè)執(zhí)行器選擇器

          1. 源碼解析

          chooser = chooserFactory.newChooser(children);

          還記得 chooserFactory是什么類型的嗎? 是DefaultEventExecutorChooserFactory類型的,忘了的可以往上翻一下尋找源碼的過(guò)程中的代碼或者調(diào)試一下!

          我們進(jìn)入到  DefaultEventExecutorChooserFactory#newChooser 源碼邏輯中,并傳入剛剛我們循環(huán)填充好的數(shù)組:

          @Override
          public EventExecutorChooser newChooser(EventExecutor[] executors) {
          //判斷2的冪 isPowerOfTwo
          if (isPowerOfTwo(executors.length)) {
          return new PowerOfTwoEventExecutorChooser(executors);
          } else {
          //簡(jiǎn)單的
          return new GenericEventExecutorChooser(executors);
          }
          }

          可以看到,這里似乎有兩種情況,返回的是不同的策略對(duì)象,當(dāng)你的數(shù)組長(zhǎng)度是2的冪等次方的時(shí)候,返回的是 PowerOfTwoEventExecutorChooser對(duì)象,否則返回  GenericEventExecutorChooser對(duì)象,我們就兩種情況全部分析一下:

          I、PowerOfTwoEventExecutorChooser
          PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
          this.executors = executors;
          }

          @Override
          public EventExecutor next() {
          //2的冪等性 實(shí)現(xiàn)這個(gè) 也能實(shí)現(xiàn)循環(huán)取數(shù)的
          //executors 就是NioEventLoop數(shù)組 按照2次冪求本次獲取的EventLoop是個(gè)啥
          return executors[idx.getAndIncrement() & executors.length - 1];
          }

          這段代碼的主要邏輯是,取一個(gè)自增的CAS類,與數(shù)組長(zhǎng)度做&運(yùn)算,最終會(huì)出現(xiàn)循環(huán)取數(shù)的結(jié)果:

          從上面的圖片可以基本看出來(lái), 該功能可以實(shí)現(xiàn)一個(gè)循環(huán)取數(shù)的功能,每次達(dá)到數(shù)組的尾部部都會(huì)重新回到頭部重新獲取!

          代碼案例:

          public static void main(String[] args) {
          String[] strings = {"第一個(gè)", "第二個(gè)", "第三個(gè)", "第四個(gè)"};
          AtomicInteger idx = new AtomicInteger();
          for (int i = 0; i < 9; i++) {
          System.out.println(strings[idx.getAndIncrement() & strings.length -1]);
          }

          }

          結(jié)果集

          第一個(gè)
          第二個(gè)
          第三個(gè)
          第四個(gè)
          第一個(gè)
          第二個(gè)
          第三個(gè)
          第四個(gè)
          第一個(gè)
          II、GenericEventExecutorChooser

          當(dāng)你的線程數(shù)量不是2的冪次方的時(shí)候,會(huì)走一個(gè)通用的選擇器,具體實(shí)現(xiàn)源碼如下:

          GenericEventExecutorChooser(EventExecutor[] executors) {
          this.executors = executors;
          }

          @Override
          public EventExecutor next() {
          //自增 取模 以達(dá)到循環(huán)的目的
          //假設(shè)executors 長(zhǎng)度為5 那么 不斷的循環(huán)就會(huì)不斷的得到 0 1 2 3 4 0 1 2 3 4。。。
          return executors[Math.abs(idx.getAndIncrement() % executors.length)];
          }

          這個(gè)代碼就不用了我做演示了吧,他的功能和上面那個(gè)功能是一樣的能  能夠達(dá)到一個(gè)循環(huán)取數(shù)的功能

          思考

          為什么Netty要分為兩個(gè)策略類來(lái)實(shí)現(xiàn)呢,直接用第二種不行嗎?

          Netty官方對(duì)性能的要求達(dá)到了極致,大家要知道位運(yùn)算速度要高于直接取模運(yùn)算的,所以Netty官方即使是這一點(diǎn)也做了一個(gè)優(yōu)化!

          2. 執(zhí)行器選擇器總結(jié)

          我們通過(guò)上述可以了解到,這里會(huì)通過(guò)一個(gè)選擇器工廠創(chuàng)建一個(gè)選擇器,并保存在NioEvenetLoopGroup中,調(diào)用該選擇器的next方法會(huì)返回一個(gè)NioEventLoop對(duì)象,其中的獲取方式是不斷的循環(huán),依次獲取NioEventLoop對(duì)象,這也是一個(gè)NioEventLoop對(duì)SocketChannel為一對(duì)多的基礎(chǔ)!這都是后話!

          NioEventLoopGroup源碼總結(jié)

          1. 創(chuàng)建一個(gè)線程執(zhí)行器,當(dāng)調(diào)用該線程執(zhí)行器的execute方法的時(shí)候,會(huì)講一個(gè)Runable對(duì)象包裝為Thread對(duì)象,再將Thread對(duì)象包裝為FastThreadLocalThread對(duì)象,然后啟動(dòng)起來(lái)!    簡(jiǎn)單來(lái)說(shuō),每調(diào)用一次execute方法,都去創(chuàng)建并啟動(dòng)一條新線程執(zhí)行任務(wù)!
          2. 創(chuàng)建一個(gè)執(zhí)行器數(shù)組,數(shù)組長(zhǎng)度與我們傳遞的數(shù)量有關(guān),默認(rèn)為CPU*2個(gè)數(shù)量,然后再循環(huán)填充這個(gè)空數(shù)組,數(shù)組里面的元素是一個(gè)NioEventLoop對(duì)象,每一個(gè)NioEventLoop對(duì)會(huì)持有一個(gè)線程執(zhí)行器的引用!
          3. 創(chuàng)建一個(gè)執(zhí)行器選擇器,調(diào)用該執(zhí)行器選擇器的next方法可以返回一個(gè)NioEventLoop對(duì)象,內(nèi)部是進(jìn)行循環(huán)取數(shù)的,每一個(gè)NioEventLoop都可能會(huì)被多次獲取!

          才疏學(xué)淺,如果文章中理解有誤,歡迎大佬們私聊指正!歡迎關(guān)注作者的公眾號(hào),一起進(jìn)步,一起學(xué)習(xí)!



          ??「轉(zhuǎn)發(fā)」「在看」,是對(duì)我最大的支持??



          瀏覽 83
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  91蜜臀在线视频免费 | www.18av | 中文无码播放 | 国产乱伦毛片 | 欧美性爱在线网站 |