<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          深入Java線程池:從設計思想到源碼解讀

          共 2620字,需瀏覽 6分鐘

           ·

          2022-01-23 17:15

          點擊關注公眾號,Java干貨及時送達??

          初識線程池

          我們知道,線程的創(chuàng)建和銷毀都需要映射到操作系統(tǒng),因此其代價是比較高昂的。出于避免頻繁創(chuàng)建、銷毀線程以及方便線程管理的需要,線程池應運而生。

          線程池優(yōu)勢

          • 降低資源消耗:線程池通常會維護一些線程(數(shù)量為 corePoolSize),這些線程被重復使用來執(zhí)行不同的任務,任務完成后不會銷毀。在待處理任務量很大的時候,通過對線程資源的復用,避免了線程的頻繁創(chuàng)建與銷毀,從而降低了系統(tǒng)資源消耗。
          • 提高響應速度:由于線程池維護了一批 alive 狀態(tài)的線程,當任務到達時,不需要再創(chuàng)建線程,而是直接由這些線程去執(zhí)行任務,從而減少了任務的等待時間。
          • 提高線程的可管理性:使用線程池可以對線程進行統(tǒng)一的分配,調優(yōu)和監(jiān)控。

          線程池設計思路

          有句話叫做藝術來源于生活,編程語言也是如此,很多設計思想能映射到日常生活中,比如面向對象思想、封裝、繼承,等等。今天我們要說的線程池,它同樣可以在現(xiàn)實世界找到對應的實體——工廠。

          先假想一個工廠的生產流程:

          工廠中有固定的一批工人,稱為正式工人,工廠接收的訂單由這些工人去完成。當訂單增加,正式工人已經忙不過來了,工廠會將生產原料暫時堆積在倉庫中,等有空閑的工人時再處理(因為工人空閑了也不會主動處理倉庫中的生產任務,所以需要調度員實時調度)。倉庫堆積滿了后,訂單還在增加怎么辦?

          工廠只能臨時擴招一批工人來應對生產高峰,而這批工人高峰結束后是要清退的,所以稱為臨時工。當時臨時工也以招滿后(受限于工位限制,臨時工數(shù)量有上限),后面的訂單只能忍痛拒絕了。

          我們做如下一番映射:

          • 工廠——線程池
          • 訂單——任務(Runnable)
          • 正式工人——核心線程
          • 臨時工——普通線程
          • 倉庫——任務隊列
          • 調度員——getTask()

          getTask()是一個方法,將任務隊列中的任務調度給空閑線程,在解讀線程池有詳細介紹

          映射后,形成線程池流程圖如下,兩者是不是有異曲同工之妙?

          這樣,線程池的工作原理或者說流程就很好理解了,提煉成一個簡圖:

          深入線程池

          那么接下來,問題來了,線程池是具體如何實現(xiàn)這套工作機制的呢?從Java線程池Executor框架體系可以看出:線程池的真正實現(xiàn)類是ThreadPoolExecutor,因此我們接下來重點研究這個類。

          構造方法

          研究一個類,先從它的構造方法開始。ThreadPoolExecutor提供了4個有參構造方法:

          public?ThreadPoolExecutor(int?corePoolSize,
          ??????????????????????????int?maximumPoolSize,
          ??????????????????????????long?keepAliveTime,
          ??????????????????????????TimeUnit?unit,
          ??????????????????????????BlockingQueue?workQueue)
          ?
          {
          ????this(corePoolSize,?maximumPoolSize,?keepAliveTime,?unit,?workQueue,
          ?????????????Executors.defaultThreadFactory(),?defaultHandler);
          }

          public?ThreadPoolExecutor(int?corePoolSize,
          ??????????????????????????int?maximumPoolSize,
          ??????????????????????????long?keepAliveTime,
          ??????????????????????????TimeUnit?unit,
          ??????????????????????????BlockingQueue?workQueue,
          ??????????????????????????ThreadFactory?threadFactory)
          ?
          {
          ????this(corePoolSize,?maximumPoolSize,?keepAliveTime,?unit,?workQueue,
          ?????????????threadFactory,?defaultHandler);
          }

          public?ThreadPoolExecutor(int?corePoolSize,
          ??????????????????????????int?maximumPoolSize,
          ??????????????????????????long?keepAliveTime,
          ??????????????????????????TimeUnit?unit,
          ??????????????????????????BlockingQueue?workQueue,
          ??????????????????????????RejectedExecutionHandler?handler)
          ?
          {
          ????this(corePoolSize,?maximumPoolSize,?keepAliveTime,?unit,?workQueue,
          ?????????????Executors.defaultThreadFactory(),?handler);
          }

          public?ThreadPoolExecutor(int?corePoolSize,
          ??????????????????????????int?maximumPoolSize,
          ??????????????????????????long?keepAliveTime,
          ??????????????????????????TimeUnit?unit,
          ??????????????????????????BlockingQueue?workQueue,
          ??????????????????????????ThreadFactory?threadFactory,
          ??????????????????????????RejectedExecutionHandler?handler)
          ?
          {
          ????if?(corePoolSize?0?||
          ????????maximumPoolSize?<=?0?||
          ????????maximumPoolSize?????????keepAliveTime?0)
          ????????throw?new?IllegalArgumentException();
          ????if?(workQueue?==?null?||?threadFactory?==?null?||?handler?==?null)
          ????????throw?new?NullPointerException();
          ????this.corePoolSize?=?corePoolSize;
          ????this.maximumPoolSize?=?maximumPoolSize;
          ????this.workQueue?=?workQueue;
          ????this.keepAliveTime?=?unit.toNanos(keepAliveTime);
          ????this.threadFactory?=?threadFactory;
          ????this.handler?=?handler;
          }

          解釋一下構造方法中涉及到的參數(shù):

          • corePoolSize(必需): 核心線程數(shù)。即池中一直保持存活的線程數(shù),即使這些線程處于空閑。但是將allowCoreThreadTimeOut參數(shù)設置為true后,核心線程處于空閑一段時間以上,也會被回收。
          • maximumPoolSize(必需): 池中允許的最大線程數(shù)。當核心線程全部繁忙且任務隊列打滿之后,線程池會臨時追加線程,直到總線程數(shù)達到maximumPoolSize這個上限。
          • keepAliveTime(必需): 線程空閑超時時間。當非核心線程處于空閑狀態(tài)的時間超過這個時間后,該線程將被回收。將allowCoreThreadTimeOut參數(shù)設置為true后,核心線程也會被回收。
          • unit(必需): keepAliveTime參數(shù)的時間單位。有:TimeUnit.DAYS(天)、TimeUnit.HOURS(小時)、TimeUnit.MINUTES(分鐘)、TimeUnit.SECONDS(秒)、TimeUnit.MILLISECONDS(毫秒)、TimeUnit.MICROSECONDS(微秒)、TimeUnit.NANOSECONDS(納秒)
          • workQueue(必需): 任務隊列,采用阻塞隊列實現(xiàn)。當核心線程全部繁忙時,后續(xù)由execute方法提交的Runnable將存放在任務隊列中,等待被線程處理。
          • threadFactory(可選): 線程工廠。指定線程池創(chuàng)建線程的方式。
          • handler(可選): 拒絕策略。當線程池中線程數(shù)達到maximumPoolSize且workQueue打滿時,后續(xù)提交的任務將被拒絕,handler可以指定用什么方式拒絕任務。

          放到一起再看一下:

          任務隊列

          使用ThreadPoolExecutor需要指定一個實現(xiàn)了BlockingQueue接口的任務等待隊列。在ThreadPoolExecutor線程池的API文檔中,一共推薦了三種等待隊列,它們是:SynchronousQueue、LinkedBlockingQueue和ArrayBlockingQueue;

          • SynchronousQueue: 同步隊列。這是一個內部沒有任何容量的阻塞隊列,任何一次插入操作的元素都要等待相對的刪除/讀取操作,否則進行插入操作的線程就要一直等待,反之亦然。
          • LinkedBlockingQueue: 無界隊列(嚴格來說并非無界,上限是Integer.MAX_VALUE),基于鏈表結構。使用無界隊列后,當核心線程都繁忙時,后續(xù)任務可以無限加入隊列,因此線程池中線程數(shù)不會超過核心線程數(shù)。這種隊列可以提高線程池吞吐量,但代價是犧牲內存空間,甚至會導致內存溢出。另外,使用它時可以指定容量,這樣它也就是一種有界隊列了。
          • ArrayBlockingQueue: 有界隊列,基于數(shù)組實現(xiàn)。在線程池初始化時,指定隊列的容量,后續(xù)無法再調整。這種有界隊列有利于防止資源耗盡,但可能更難調整和控制。

          另外,Java還提供了另外4種隊列:

          • PriorityBlockingQueue: 支持優(yōu)先級排序的無界阻塞隊列。存放在PriorityBlockingQueue中的元素必須實現(xiàn)Comparable接口,這樣才能通過實現(xiàn)compareTo()方法進行排序。優(yōu)先級最高的元素將始終排在隊列的頭部;PriorityBlockingQueue不會保證優(yōu)先級一樣的元素的排序,也不保證當前隊列中除了優(yōu)先級最高的元素以外的元素,隨時處于正確排序的位置。
          • DelayQueue: 延遲隊列。基于二叉堆實現(xiàn),同時具備:無界隊列、阻塞隊列、優(yōu)先隊列的特征。DelayQueue延遲隊列中存放的對象,必須是實現(xiàn)Delayed接口的類對象。通過執(zhí)行時延從隊列中提取任務,時間沒到任務取不出來。更多內容請見DelayQueue。
          • LinkedBlockingDeque: 雙端隊列。基于鏈表實現(xiàn),既可以從尾部插入/取出元素,還可以從頭部插入元素/取出元素。
          • LinkedTransferQueue: 由鏈表結構組成的無界阻塞隊列。這個隊列比較特別的時,采用一種預占模式,意思就是消費者線程取元素時,如果隊列不為空,則直接取走數(shù)據(jù),若隊列為空,那就生成一個節(jié)點(節(jié)點元素為null)入隊,然后消費者線程被等待在這個節(jié)點上,后面生產者線程入隊時發(fā)現(xiàn)有一個元素為null的節(jié)點,生產者線程就不入隊了,直接就將元素填充到該節(jié)點,并喚醒該節(jié)點等待的線程,被喚醒的消費者線程取走元素。

          拒絕策略

          線程池有一個重要的機制:拒絕策略。當線程池workQueue已滿且無法再創(chuàng)建新線程池時,就要拒絕后續(xù)任務了。拒絕策略需要實現(xiàn)RejectedExecutionHandler接口,不過Executors框架已經為我們實現(xiàn)了4種拒絕策略:

          • AbortPolicy(默認): 丟棄任務并拋出RejectedExecutionException異常。
          • CallerRunsPolicy: 直接運行這個任務的run方法,但并非是由線程池的線程處理,而是交由任務的調用線程處理。
          • DiscardPolicy: 直接丟棄任務,不拋出任何異常。
          • DiscardOldestPolicy: 將當前處于等待隊列列頭的等待任務強行取出,然后再試圖將當前被拒絕的任務提交到線程池執(zhí)行。

          線程工廠指定創(chuàng)建線程的方式,這個參數(shù)不是必選項,Executors類已經為我們非常貼心地提供了一個默認的線程工廠:

          /**
          ?*?The?default?thread?factory
          ?*/

          static?class?DefaultThreadFactory?implements?ThreadFactory?{
          ????private?static?final?AtomicInteger?poolNumber?=?new?AtomicInteger(1);
          ????private?final?ThreadGroup?group;
          ????private?final?AtomicInteger?threadNumber?=?new?AtomicInteger(1);
          ????private?final?String?namePrefix;

          ????DefaultThreadFactory()?{
          ????????SecurityManager?s?=?System.getSecurityManager();
          ????????group?=?(s?!=?null)???s.getThreadGroup()?:
          ??????????????????????????????Thread.currentThread().getThreadGroup();
          ????????namePrefix?=?"pool-"?+
          ??????????????????????poolNumber.getAndIncrement()?+
          ?????????????????????"-thread-";
          ????}

          ????public?Thread?newThread(Runnable?r)?{
          ????????Thread?t?=?new?Thread(group,?r,
          ??????????????????????????????namePrefix?+?threadNumber.getAndIncrement(),
          ??????????????????????????????0);
          ????????if?(t.isDaemon())
          ????????????t.setDaemon(false);
          ????????if?(t.getPriority()?!=?Thread.NORM_PRIORITY)
          ????????????t.setPriority(Thread.NORM_PRIORITY);
          ????????return?t;
          ????}
          }

          線程池狀態(tài)

          線程池有5種狀態(tài):

          volatile?int?runState;
          //?runState?is?stored?in?the?high-order?bits
          private?static?final?int?RUNNING????=?-1?<private?static?final?int?SHUTDOWN???=??0?<private?static?final?int?STOP???????=??1?<private?static?final?int?TIDYING????=??2?<private?static?final?int?TERMINATED?=??3?<

          runState表示當前線程池的狀態(tài),它是一個 volatile 變量用來保證線程之間的可見性。

          下面的幾個static final變量表示runState可能的幾個取值,有以下幾個狀態(tài):

          • RUNNING: 當創(chuàng)建線程池后,初始時,線程池處于RUNNING狀態(tài);
          • SHUTDOWN: 如果調用了shutdown()方法,則線程池處于SHUTDOWN狀態(tài),此時線程池不能夠接受新的任務,它會等待所有任務執(zhí)行完畢;
          • STOP: 如果調用了shutdownNow()方法,則線程池處于STOP狀態(tài),此時線程池不能接受新的任務,并且會去嘗試終止正在執(zhí)行的任務;
          • TERMINATED: 當線程池處于SHUTDOWN或STOP狀態(tài),并且所有工作線程已經銷毀,任務緩存隊列已經清空或執(zhí)行結束后,線程池被設置為TERMINATED狀態(tài)。

          初始化&容量調整&關閉

          1、線程初始化

          默認情況下,創(chuàng)建線程池之后,線程池中是沒有線程的,需要提交任務之后才會創(chuàng)建線程。

          在實際中如果需要線程池創(chuàng)建之后立即創(chuàng)建線程,可以通過以下兩個方法辦到:

          • prestartCoreThread():boolean prestartCoreThread(),初始化一個核心線程
          • prestartAllCoreThreads():int prestartAllCoreThreads(),初始化所有核心線程,并返回初始化的線程數(shù)
          public?boolean?prestartCoreThread()?{
          ????return?addIfUnderCorePoolSize(null);?//注意傳進去的參數(shù)是null
          }

          public?int?prestartAllCoreThreads()?{
          ????int?n?=?0;
          ????while?(addIfUnderCorePoolSize(null))//注意傳進去的參數(shù)是null
          ????????++n;
          ????return?n;
          }

          2、線程池關閉

          ThreadPoolExecutor提供了兩個方法,用于線程池的關閉:

          • shutdown():不會立即終止線程池,而是要等所有任務緩存隊列中的任務都執(zhí)行完后才終止,但再也不會接受新的任務
          • shutdownNow():立即終止線程池,并嘗試打斷正在執(zhí)行的任務,并且清空任務緩存隊列,返回尚未執(zhí)行的任務

          3、線程池容量調整

          ThreadPoolExecutor提供了動態(tài)調整線程池容量大小的方法:

          • setCorePoolSize:設置核心池大小
          • setMaximumPoolSize:設置線程池最大能創(chuàng)建的線程數(shù)目大小

          當上述參數(shù)從小變大時,ThreadPoolExecutor進行線程賦值,還可能立即創(chuàng)建新的線程來執(zhí)行任務。

          使用線程池

          ThreadPoolExecutor

          通過構造方法使用ThreadPoolExecutor是線程池最直接的使用方式,下面看一個實例:

          import?java.util.concurrent.ArrayBlockingQueue;
          import?java.util.concurrent.ThreadPoolExecutor;
          import?java.util.concurrent.TimeUnit;

          public?class?MyTest?{
          ?public?static?void?main(String[]?args)?{
          ??//?創(chuàng)建線程池
          ??ThreadPoolExecutor?threadPool?=?new?ThreadPoolExecutor(3,?5,?5,?TimeUnit.SECONDS,
          ????new?ArrayBlockingQueue(5));
          ??//?向線程池提交任務
          ??for?(int?i?=?0;?i????threadPool.execute(new?Runnable()?{
          ????@Override
          ????public?void?run()?{
          ?????for?(int?x?=?0;?x?2;?x++)?{
          ??????System.out.println(Thread.currentThread().getName()?+?":"?+?x);
          ??????try?{
          ???????Thread.sleep(2000);
          ??????}?catch?(InterruptedException?e)?{
          ???????e.printStackTrace();
          ??????}
          ?????}
          ????}
          ???});
          ??}

          ??//?關閉線程池
          ??threadPool.shutdown();?//?設置線程池的狀態(tài)為SHUTDOWN,然后中斷所有沒有正在執(zhí)行任務的線程
          ??//?threadPool.shutdownNow();?//?設置線程池的狀態(tài)為STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務的線程,并返回等待執(zhí)行任務的列表,該方法要慎用,容易造成不可控的后果
          ?}
          }

          運行結果:

          pool-1-thread-2:0
          pool-1-thread-1:0
          pool-1-thread-3:0
          pool-1-thread-2:1
          pool-1-thread-3:1
          pool-1-thread-1:1

          Executors封裝線程池

          另外,Executors封裝好了4種常見的功能線程池(還是那么地貼心):

          1、FixedThreadPool

          固定容量線程池。其特點是最大線程數(shù)就是核心線程數(shù),意味著線程池只能創(chuàng)建核心線程,keepAliveTime為0,即線程執(zhí)行完任務立即回收。任務隊列未指定容量,代表使用默認值Integer.MAX_VALUE。適用于需要控制并發(fā)線程的場景。

          //?使用默認線程工廠
          public?static?ExecutorService?newFixedThreadPool(int?nThreads)?{
          ????return?new?ThreadPoolExecutor(nThreads,?nThreads,
          ??????????????????????????????????0L,?TimeUnit.MILLISECONDS,
          ??????????????????????????????????new?LinkedBlockingQueue());
          }
          //?需要自定義線程工廠
          public?static?ExecutorService?newFixedThreadPool(int?nThreads,?ThreadFactory?threadFactory)?{
          ????return?new?ThreadPoolExecutor(nThreads,?nThreads,
          ??????????????????????????????????0L,?TimeUnit.MILLISECONDS,
          ??????????????????????????????????new?LinkedBlockingQueue(),
          ??????????????????????????????????threadFactory);
          }

          使用示例:

          //?1.?創(chuàng)建線程池對象,設置核心線程和最大線程數(shù)為5
          ExecutorService?fixedThreadPool?=?Executors.newFixedThreadPool(5);
          //?2.?創(chuàng)建Runnable(任務)
          Runnable?task?=new?Runnable(){
          ??public?void?run()?{
          ?????System.out.println(Thread.currentThread().getName()?+?"--->運行");
          ??}
          };
          //?3.?向線程池提交任務
          fixedThreadPool.execute(task);

          2、 SingleThreadExecutor

          單線程線程池。特點是線程池中只有一個線程(核心線程),線程執(zhí)行完任務立即回收,使用有界阻塞隊列(容量未指定,使用默認值Integer.MAX_VALUE

          public?static?ExecutorService?newSingleThreadExecutor()?{
          ????return?new?FinalizableDelegatedExecutorService
          ????????(new?ThreadPoolExecutor(1,?1,
          ????????????????????????????????0L,?TimeUnit.MILLISECONDS,
          ????????????????????????????????new?LinkedBlockingQueue()));
          }
          //?為節(jié)省篇幅,省略了自定義線程工廠方式的源碼

          使用示例:

          //?1.?創(chuàng)建單線程線程池
          ExecutorService?singleThreadExecutor?=?Executors.newSingleThreadExecutor();
          //?2.?創(chuàng)建Runnable(任務)
          Runnable?task?=?new?Runnable(){
          ??public?void?run()?{
          ?????System.out.println(Thread.currentThread().getName()?+?"--->運行");
          ??}
          };
          //?3.?向線程池提交任務
          singleThreadExecutor.execute(task);

          3、 ScheduledThreadPool

          定時線程池。指定核心線程數(shù)量,普通線程數(shù)量無限,線程執(zhí)行完任務立即回收,任務隊列為延時阻塞隊列。這是一個比較特別的線程池,適用于執(zhí)行定時或周期性的任務。

          public?static?ScheduledExecutorService?newScheduledThreadPool(int?corePoolSize)?{
          ????return?new?ScheduledThreadPoolExecutor(corePoolSize);
          }

          //?繼承了?ThreadPoolExecutor
          public?class?ScheduledThreadPoolExecutor?extends?ThreadPoolExecutor
          ????????implements?ScheduledExecutorService?
          {
          ????//?構造函數(shù),省略了自定義線程工廠的構造函數(shù)
          ?public?ScheduledThreadPoolExecutor(int?corePoolSize)?{
          ?????super(corePoolSize,?Integer.MAX_VALUE,?0,?NANOSECONDS,
          ???????????new?DelayedWorkQueue());
          ?}
          ?
          ?//?延時執(zhí)行任務
          ?public?ScheduledFuture?schedule(Runnable?command,
          ???????????????????????????????????????long?delay,
          ???????????????????????????????????????TimeUnit?unit)?{
          ????????...
          ????}
          ?//?定時執(zhí)行任務
          ?public?ScheduledFuture?scheduleAtFixedRate(Runnable?command,
          ??????????????????????????????????????????????????long?initialDelay,
          ??????????????????????????????????????????????????long?period,
          ??????????????????????????????????????????????????TimeUnit?unit)?{...}
          }

          使用示例:

          //?1.?創(chuàng)建定時線程池
          ExecutorService?scheduledThreadPool?=?Executors.newScheduledThreadPool(5);
          //?2.?創(chuàng)建Runnable(任務)
          Runnable?task?=?new?Runnable(){
          ??public?void?run()?{
          ?????System.out.println(Thread.currentThread().getName()?+?"--->運行");
          ??}
          };
          //?3.?向線程池提交任務
          scheduledThreadPool.schedule(task,?2,?TimeUnit.SECONDS);?//?延遲2s后執(zhí)行任務
          scheduledThreadPool.scheduleAtFixedRate(task,50,2000,TimeUnit.MILLISECONDS);//?延遲50ms后、每隔2000ms執(zhí)行任務

          4、CachedThreadPool

          緩存線程池。沒有核心線程,普通線程數(shù)量為Integer.MAX_VALUE(可以理解為無限),線程閑置60s后回收,任務隊列使用SynchronousQueue這種無容量的同步隊列。適用于任務量大但耗時低的場景。

          public?static?ExecutorService?newCachedThreadPool()?{
          ????return?new?ThreadPoolExecutor(0,?Integer.MAX_VALUE,
          ??????????????????????????????????60L,?TimeUnit.SECONDS,
          ??????????????????????????????????new?SynchronousQueue());
          }

          使用示例:

          //?1.?創(chuàng)建緩存線程池
          ExecutorService?cachedThreadPool?=?Executors.newCachedThreadPool();
          //?2.?創(chuàng)建Runnable(任務)
          Runnable?task?=?new?Runnable(){
          ??public?void?run()?{
          ?????System.out.println(Thread.currentThread().getName()?+?"--->運行");
          ??}
          };
          //?3.?向線程池提交任務
          cachedThreadPool.execute(task);

          解讀線程池

          OK,相信前面內容閱讀起來還算輕松愉悅吧,那么從這里開始就進入深水區(qū)了,如果后面內容能吃透,那么線程池知識就真的被你掌握了。

          我們知道,向線程池提交任務是用ThreadPoolExecutorexecute()方法,但在其內部,線程任務的處理其實是相當復雜的,涉及到ThreadPoolExecutorWorkerThread三個類的6個方法:

          execute()

          ThreadPoolExecutor類中,任務提交方法的入口是execute(Runnable command)方法(submit()方法也是調用了execute()),該方法其實只在嘗試做一件事:經過各種校驗之后,調用 addWorker(Runnable command,boolean core)方法為線程池創(chuàng)建一個線程并執(zhí)行任務,與之相對應,execute()的結果有兩個:

          參數(shù)說明:

          • Runnable command:待執(zhí)行的任務

          執(zhí)行流程:

          1、通過 ctl.get()得到線程池的當前線程數(shù),如果線程數(shù)小于corePoolSize,則調用 addWorker(commond,true)方法創(chuàng)建新的線程執(zhí)行任務,否則執(zhí)行步驟2;

          2、步驟1失敗,說明已經無法再創(chuàng)建新線程,那么考慮將任務放入阻塞隊列,等待執(zhí)行完任務的線程來處理。基于此,判斷線程池是否處于Running狀態(tài)(只有Running狀態(tài)的線程池可以接受新任務),如果任務添加到任務隊列成功則進入步驟3,失敗則進入步驟4;

          3、來到這一步需要說明任務已經加入任務隊列,這時要二次校驗線程池的狀態(tài),會有以下情形:

          • 線程池不再是Running狀態(tài)了,需要將任務從任務隊列中移除,如果移除成功則拒絕本次任務
          • 線程池是Running狀態(tài),則判斷線程池工作線程是否為0,是則調用 addWorker(commond,true)添加一個沒有初始任務的線程(這個線程將去獲取已經加入任務隊列的本次任務并執(zhí)行),否則進入步驟4;
          • 線程池不是Running狀態(tài),但從任務隊列移除任務失敗(可能已被某線程獲取?),進入步驟4;

          4、將線程池擴容至maximumPoolSize并調用 addWorker(commond,false)方法創(chuàng)建新的線程執(zhí)行任務,失敗則拒絕本次任務。

          流程圖:

          源碼詳讀:

          /**
          ?*?在將來的某個時候執(zhí)行給定的任務。任務可以在新線程中執(zhí)行,也可以在現(xiàn)有的池線程中執(zhí)行。
          ?*?如果由于此執(zhí)行器已關閉或已達到其容量而無法提交任務以供執(zhí)行,則由當前的{@code?RejectedExecutionHandler}處理該任務。
          ?*?
          ?*?@param?command?the?task?to?execute??待執(zhí)行的任務命令
          ?*/

          public?void?execute(Runnable?command)?{
          ????if?(command?==?null)
          ????????throw?new?NullPointerException();
          ????/*
          ?????*?Proceed?in?3?steps:
          ?????*?
          ?????* 1. 如果運行的線程少于corePoolSize,將嘗試以給定的命令作為第一個任務啟動新線程。
          ?????*
          ?????*?2.?如果一個任務可以成功排隊,那么我們仍然需要仔細檢查兩點,其一,我們是否應該添加一個線程
          ?????*?(因為自從上次檢查至今,一些存在的線程已經死亡),其二,線程池狀態(tài)此時已改變成非運行態(tài)。因此,我們重新檢查狀態(tài),如果檢查不通過,則移除已經入列的任務,如果檢查通過且線程池線程數(shù)為0,則啟動新線程。
          ?????*?
          ?????* 3. 如果無法將任務加入任務隊列,則將線程池擴容到極限容量并嘗試創(chuàng)建一個新線程,如果失敗則拒絕任務。
          ?????*/

          ????int?c?=?ctl.get();
          ???
          ????//?步驟1:判斷線程池當前線程數(shù)是否小于線程池大小
          ????if?(workerCountOf(c)?????????//?增加一個工作線程并添加任務,成功則返回,否則進行步驟2
          ????????//?true代表使用coreSize作為邊界約束,否則使用maximumPoolSize
          ????????if?(addWorker(command,?true))
          ????????????return;
          ????????c?=?ctl.get();
          ????}
          ????//?步驟2:不滿足workerCountOf(c)?< corePoolSize或addWorker失敗,進入步驟2
          ????//?校驗線程池是否是Running狀態(tài)且任務是否成功放入workQueue(阻塞隊列)
          ????if?(isRunning(c)?&&?workQueue.offer(command))?{
          ????????int?recheck?=?ctl.get();
          ????????//?再次校驗,如果線程池非Running且從任務隊列中移除任務成功,則拒絕該任務
          ????????if?(!?isRunning(recheck)?&&?remove(command))
          ????????????reject(command);
          ????????//?如果線程池工作線程數(shù)量為0,則新建一個空任務的線程
          ????????else?if?(workerCountOf(recheck)?==?0)
          ????????????//?如果線程池不是Running狀態(tài),是加入不進去的
          ????????????addWorker(null,?false);
          ????}
          ????//?步驟3:如果線程池不是Running狀態(tài)或任務入列失敗,嘗試擴容maxPoolSize后再次addWorker,失敗則拒絕任務
          ????else?if?(!addWorker(command,?false))
          ????????reject(command);
          }

          addWorker()

          addWorker(Runnable firstTask, boolean core)方法,顧名思義,向線程池添加一個帶有任務的工作線程。

          參數(shù)說明:

          • Runnable firstTask:新創(chuàng)建的線程應該首先運行的任務(如果沒有,則為空)。
          • boolean core:該參數(shù)決定了線程池容量的約束條件,即當前線程數(shù)量以何值為極限值。參數(shù)為 true 則使用corePollSize 作為約束值,否則使用maximumPoolSize。

          執(zhí)行流程:

          1、外層循環(huán)判斷線程池的狀態(tài)是否可以新增工作線程。這層校驗基于下面兩個原則:

          • 線程池為Running狀態(tài)時,既可以接受新任務也可以處理任務
          • 線程池為關閉狀態(tài)時只能新增空任務的工作線程(worker)處理任務隊列(workQueue)中的任務不能接受新任務

          2、內層循環(huán)向線程池添加工作線程并返回是否添加成功的結果。

          • 首先校驗線程數(shù)是否已經超限制,是則返回false,否則進入下一步
          • 通過CAS使工作線程數(shù)+1,成功則進入步驟3,失敗則再次校驗線程池是否是運行狀態(tài),是則繼續(xù)內層循環(huán),不是則返回外層循環(huán)

          3、核心線程數(shù)量+1成功的后續(xù)操作:添加到工作線程集合,并啟動工作線程

          • 首先獲取鎖之后,再次校驗線程池狀態(tài)(具體校驗規(guī)則見代碼注解),通過則進入下一步,未通過則添加線程失敗
          • 線程池狀態(tài)校驗通過后,再檢查線程是否已經啟動,是則拋出異常,否則嘗試將線程加入線程池
          • 檢查線程是否啟動成功,成功則返回true,失敗則進入 addWorkerFailed 方法

          流程圖:

          源碼詳讀:

          private?boolean?addWorker(Runnable?firstTask,?boolean?core)?{
          ????//?外層循環(huán):判斷線程池狀態(tài)
          ????retry:
          ????for?(;;)?{
          ????????int?c?=?ctl.get();
          ????????int?rs?=?runStateOf(c);

          ????????/**?
          ?????????*?1.線程池為非Running狀態(tài)(Running狀態(tài)則既可以新增核心線程也可以接受任務)
          ?????????*?2.線程為shutdown狀態(tài)且firstTask為空且隊列不為空
          ?????????*?3.滿足條件1且條件2不滿足,則返回false
          ?????????* 4.條件2解讀:線程池為shutdown狀態(tài)時且任務隊列不為空時,可以新增空任務的線程來處理隊列中的任務
          ?????????*/

          ????????if?(rs?>=?SHUTDOWN?&&
          ????????????!?(rs?==?SHUTDOWN?&&
          ???????????????firstTask?==?null?&&
          ???????????????!?workQueue.isEmpty()))
          ????????????return?false;

          ??//?內層循環(huán):線程池添加核心線程并返回是否添加成功的結果
          ????????for?(;;)?{
          ????????????int?wc?=?workerCountOf(c);
          ????????????//?校驗線程池已有線程數(shù)量是否超限:
          ????????????//?1.線程池最大上限CAPACITY?
          ????????????//?2.corePoolSize或maximumPoolSize(取決于入參core)
          ????????????if?(wc?>=?CAPACITY?||
          ????????????????wc?>=?(core???corePoolSize?:?maximumPoolSize))?
          ????????????????return?false;
          ????????????//?通過CAS操作使工作線程數(shù)+1,跳出外層循環(huán)
          ????????????if?(compareAndIncrementWorkerCount(c))?
          ????????????????break?retry;
          ????????????//?線程+1失敗,重讀ctl
          ????????????c?=?ctl.get();???//?Re-read?ctl
          ????????????//?如果此時線程池狀態(tài)不再是running,則重新進行外層循環(huán)
          ????????????if?(runStateOf(c)?!=?rs)
          ????????????????continue?retry;
          ????????????//?其他?CAS?失敗是因為工作線程數(shù)量改變了,繼續(xù)內層循環(huán)嘗試CAS對線程數(shù)+1
          ????????????//?else?CAS?failed?due?to?workerCount?change;?retry?inner?loop
          ????????}
          ????}

          ????/**
          ?????*?核心線程數(shù)量+1成功的后續(xù)操作:添加到工作線程集合,并啟動工作線程
          ?????*/

          ????boolean?workerStarted?=?false;
          ????boolean?workerAdded?=?false;
          ????Worker?w?=?null;
          ????try?{
          ????????final?ReentrantLock?mainLock?=?this.mainLock;
          ????????w?=?new?Worker(firstTask);
          ????????final?Thread?t?=?w.thread;
          ????????if?(t?!=?null)?{
          ????????????//?下面代碼需要加鎖:線程池主鎖
          ????????????mainLock.lock();?
          ????????????try?{
          ????????????????//?持鎖期間重新檢查,線程工廠創(chuàng)建線程失敗或獲取鎖之前關閉的情況發(fā)生時,退出
          ????????????????int?c?=?ctl.get();
          ????????????????int?rs?=?runStateOf(c);

          ????//?再次檢驗線程池是否是running狀態(tài)或線程池shutdown但線程任務為空
          ????????????????if?(rs?????????????????????(rs?==?SHUTDOWN?&&?firstTask?==?null))?{
          ????????????????????//?線程已經啟動,則拋出非法線程狀態(tài)異常
          ????????????????????//?為什么會存在這種狀態(tài)呢?未解決
          ????????????????????if?(t.isAlive())?//?precheck?that?t?is?startable
          ????????????????????????throw?new?IllegalThreadStateException();
          ????????????????????workers.add(w);?//加入線程池
          ????????????????????int?s?=?workers.size();
          ????????????????????//?如果當前工作線程數(shù)超過線程池曾經出現(xiàn)過的最大線程數(shù),刷新后者值
          ????????????????????if?(s?>?largestPoolSize)
          ????????????????????????largestPoolSize?=?s;?
          ????????????????????workerAdded?=?true;
          ????????????????}
          ????????????}?finally?{
          ????????????????mainLock.unlock();??//?釋放鎖
          ????????????}
          ????????????if?(workerAdded)?{?//?工作線程添加成功,啟動該線程
          ????????????????t.start();
          ????????????????workerStarted?=?true;
          ????????????}
          ????????}
          ????}?finally?{
          ????????//線程啟動失敗,則進入addWorkerFailed
          ????????if?(!?workerStarted)?
          ????????????addWorkerFailed(w);
          ????}
          ????return?workerStarted;
          }

          Worker類

          Worker類是內部類,既實現(xiàn)了Runnable,又繼承了AbstractQueuedSynchronizer(以下簡稱AQS),所以其既是一個可執(zhí)行的任務,又可以達到鎖的效果。

          Worker類主要維護正在運行任務的線程的中斷控制狀態(tài),以及其他次要的記錄。這個類適時地繼承了AbstractQueuedSynchronizer類,以簡化獲取和釋放鎖(該鎖作用于每個任務執(zhí)行代碼)的過程。這樣可以防止去中斷正在運行中的任務,只會中斷在等待從任務隊列中獲取任務的線程。

          我們實現(xiàn)了一個簡單的不可重入互斥鎖,而不是使用可重入鎖,因為我們不希望工作任務在調用setCorePoolSize之類的池控制方法時能夠重新獲取鎖。另外,為了在線程真正開始運行任務之前禁止中斷,我們將鎖狀態(tài)初始化為負值,并在啟動時清除它(在runWorker中)。

          private?final?class?Worker
          ????extends?AbstractQueuedSynchronizer
          ????implements?Runnable
          {
          ????/**
          ?????*?This?class?will?never?be?serialized,?but?we?provide?a
          ?????*?serialVersionUID?to?suppress?a?javac?warning.
          ?????*/

          ????private?static?final?long?serialVersionUID?=?6138294804551838833L;
          ?
          ????/**?Thread?this?worker?is?running?in.??Null?if?factory?fails.?*/
          ????final?Thread?thread;?
          ?????
          ????/**?Initial?task?to?run.??Possibly?null.?*/
          ????Runnable?firstTask;
          ?????
          ????/**?Per-thread?task?counter?*/
          ????volatile?long?completedTasks;
          ?
          ????/**
          ?????*?Creates?with?given?first?task?and?thread?from?ThreadFactory.
          ?????*?@param?firstTask?the?first?task?(null?if?none)
          ?????*/

          ????//?通過構造函數(shù)初始化,
          ????Worker(Runnable?firstTask)?{
          ????????//設置AQS的同步狀態(tài)
          ????????// state:鎖狀態(tài),-1為初始值,0為unlock狀態(tài),1為lock狀態(tài)
          ????????setState(-1);?//?inhibit?interrupts?until?runWorker??在調用runWorker前,禁止中斷
          ???????
          ????????this.firstTask?=?firstTask;
          ????????//?線程工廠創(chuàng)建一個線程
          ????????this.thread?=?getThreadFactory().newThread(this);?
          ????}
          ?
          ????/**?Delegates?main?run?loop?to?outer?runWorker??*/
          ????public?void?run()?{
          ????????runWorker(this);?//runWorker()是ThreadPoolExecutor的方法
          ????}
          ?
          ????//?Lock?methods
          ????//?The?value?0?represents?the?unlocked?state.?0代表“沒被鎖定”狀態(tài)
          ????//?The?value?1?represents?the?locked?state.?1代表“鎖定”狀態(tài)
          ?
          ????protected?boolean?isHeldExclusively()?{
          ????????return?getState()?!=?0;
          ????}
          ?
          ????/**
          ?????*?嘗試獲取鎖的方法
          ?????*?重寫AQS的tryAcquire(),AQS本來就是讓子類來實現(xiàn)的
          ?????*/

          ????protected?boolean?tryAcquire(int?unused)?{
          ????????//?判斷原值為0,且重置為1,所以state為-1時,鎖無法獲取。
          ????????//?每次都是0->1,保證了鎖的不可重入性
          ????????if?(compareAndSetState(0,?1))?{
          ????????????//?設置exclusiveOwnerThread=當前線程
          ????????????setExclusiveOwnerThread(Thread.currentThread());?
          ????????????return?true;
          ????????}
          ????????return?false;
          ????}
          ?
          ????/**
          ?????*?嘗試釋放鎖
          ?????*?不是state-1,而是置為0
          ?????*/

          ????protected?boolean?tryRelease(int?unused)?{
          ????????setExclusiveOwnerThread(null);?
          ????????setState(0);
          ????????return?true;
          ????}
          ?
          ????public?void?lock()????????{?acquire(1);?}
          ????public?boolean?tryLock()??{?return?tryAcquire(1);?}
          ????public?void?unlock()??????{?release(1);?}
          ????public?boolean?isLocked()?{?return?isHeldExclusively();?}
          ?
          ????/**
          ?????*?中斷(如果運行)
          ?????*?shutdownNow時會循環(huán)對worker線程執(zhí)行
          ?????*?且不需要獲取worker鎖,即使在worker運行時也可以中斷
          ?????*/

          ????void?interruptIfStarted()?{
          ????????Thread?t;
          ????????//如果state>=0、t!=null、且t沒有被中斷
          ????????//new?Worker()時state==-1,說明不能中斷
          ????????if?(getState()?>=?0?&&?(t?=?thread)?!=?null?&&?!t.isInterrupted())?{
          ????????????try?{
          ????????????????t.interrupt();
          ????????????}?catch?(SecurityException?ignore)?{
          ????????????}
          ????????}
          ????}
          }

          runWorker()

          可以說,runWorker(Worker w)是線程池中真正處理任務的方法,前面的execute()addWorker()都是在為該方法做準備和鋪墊。

          參數(shù)說明:

          • Worker w:封裝的Worker,攜帶了工作線程的諸多要素,包括Runnable(待處理任務)、lock(鎖)、completedTasks(記錄線程池已完成任務數(shù))

          執(zhí)行流程:

          1、判斷當前任務或者從任務隊列中獲取的任務是否不為空,都為空則進入步驟2,否則進入步驟3

          2、任務為空,則將completedAbruptly置為false(即線程不是突然終止),并執(zhí)行processWorkerExit(w,completedAbruptly)方法進入線程退出程序

          3、任務不為空,則進入循環(huán),并加鎖

          4、判斷是否為線程添加中斷標識,以下兩個條件滿足其一則添加中斷標識:

          • 線程池狀態(tài)>=STOP,即STOP或TERMINATED
          • 一開始判斷線程池狀態(tài)Thread.interrupted()為true,即線程已經被中斷,再次檢查線程池狀態(tài)是否>=STOP(以消除該瞬間shutdown方法生效,使線程池處于STOP或TERMINATED)

          5、執(zhí)行前置方法 beforeExecute(wt, task)(該方法為空方法,由子類實現(xiàn))后執(zhí)行task.run()方法執(zhí)行任務(執(zhí)行不成功拋出相應異常)

          6、執(zhí)行后置方法 afterExecute(task, thrown)(該方法為空方法,由子類實現(xiàn))后將線程池已完成的任務數(shù)+1,并釋放鎖。

          7、再次進行循環(huán)條件判斷。

          流程圖:

          源碼詳讀:

          final?void?runWorker(Worker?w)?{
          ????Thread?wt?=?Thread.currentThread();
          ????Runnable?task?=?w.firstTask;
          ????w.firstTask?=?null;
          ????//?allow?interrupts
          ????//?new?Worker()是state==-1,此處是調用Worker類的tryRelease()方法,將state置為0,而interruptIfStarted()中只有state>=0才允許調用中斷
          ????w.unlock();?
          ????????????
          ????//?線程退出的原因,true是任務導致,false是線程正常退出
          ????boolean?completedAbruptly?=?true;?
          ????try?{
          ????????//?當前任務和從任務隊列中獲取的任務都為空,方停止循環(huán)
          ????????while?(task?!=?null?||?(task?=?getTask())?!=?null)?{
          ????????????//上鎖可以防止在shutdown()時終止正在運行的worker,而不是應對并發(fā)
          ????????????w.lock();?
          ?????????????
          ????????????//?If?pool?is?stopping,?ensure?thread?is?interrupted;
          ????????????//?if?not,?ensure?thread?is?not?interrupted.??This
          ????????????//?requires?a?recheck?in?second?case?to?deal?with
          ????????????//?shutdownNow?race?while?clearing?interrupt
          ????????????/**
          ?????????????*?判斷1:確保只有在線程處于stop狀態(tài)且wt未中斷時,wt才會被設置中斷標識
          ?????????????*?條件1:線程池狀態(tài)>=STOP,即STOP或TERMINATED
          ?????????????*?條件2:一開始判斷線程池狀態(tài)=STOP(以消除該瞬間shutdown方法生效,使線程池處于STOP或TERMINATED),
          ?????????????*?條件1與條件2任意滿意一個,且wt不是中斷狀態(tài),則中斷wt,否則進入下一步
          ?????????????*/

          ????????????if?((runStateAtLeast(ctl.get(),?STOP)?||
          ?????????????????(Thread.interrupted()?&&
          ??????????????????runStateAtLeast(ctl.get(),?STOP)))?&&
          ????????????????!wt.isInterrupted())
          ????????????????wt.interrupt();?//當前線程調用interrupt()中斷
          ?????????????
          ????????????try?{
          ????????????????//執(zhí)行前(空方法,由子類重寫實現(xiàn))
          ????????????????beforeExecute(wt,?task);
          ?????????????????
          ????????????????Throwable?thrown?=?null;
          ????????????????try?{
          ????????????????????task.run();
          ????????????????}?
          ????????????????catch?(RuntimeException?x)?{
          ????????????????????thrown?=?x;?throw?x;
          ????????????????}?
          ????????????????catch?(Error?x)?{
          ????????????????????thrown?=?x;?throw?x;
          ????????????????}?
          ????????????????catch?(Throwable?x)?{
          ????????????????????thrown?=?x;?throw?new?Error(x);
          ????????????????}?
          ????????????????finally?{
          ????????????????????//執(zhí)行后(空方法,由子類重寫實現(xiàn))
          ????????????????????afterExecute(task,?thrown);?
          ????????????????}
          ????????????}?
          ????????????finally?{
          ????????????????task?=?null;?
          ????????????????w.completedTasks++;?//完成任務數(shù)+1
          ????????????????w.unlock();?//釋放鎖
          ????????????}
          ????????}
          ????????//?
          ????????completedAbruptly?=?false;
          ????}?
          ????finally?{
          ????????//處理worker的退出
          ????????processWorkerExit(w,?completedAbruptly);
          ????}
          }

          getTask()

          由函數(shù)調用關系圖可知,在ThreadPoolExecutor類的實現(xiàn)中,Runnable getTask()方法是為void runWorker(Worker w)方法服務的,它的作用就是在任務隊列(workQueue)中獲取 task(Runnable)。

          參數(shù)說明:無參數(shù)

          執(zhí)行流程:

          • 將timedOut(上次獲取任務是否超時)置為false(首次執(zhí)行方法,無上次,自然為false),進入一個無限循環(huán)
          • 如果線程池為Shutdown狀態(tài)且任務隊列為空(線程池shutdown狀態(tài)可以處理任務隊列中的任務,不再接受新任務,這個是重點)或者線程池為STOP或TERMINATED狀態(tài),則意味著線程池不必再獲取任務了,當前工作線程數(shù)量-1并返回null,否則進入步驟3
          • 如果線程池數(shù)量超限制或者時間超限且(任務隊列為空或當前線程數(shù)>1),則進入步驟4,否則進入步驟5。
          • 移除工作線程,成功則返回null,不成功則進入下輪循環(huán)。
          • 嘗試用poll() 或者 take()(具體用哪個取決于timed的值)獲取任務,如果任務不為空,則返回該任務。如果為空,則將timeOut 置為 true進入下一輪循環(huán)。如果獲取任務過程發(fā)生異常,則將 timeOut置為 false 后進入下一輪循環(huán)。

          流程圖:

          源碼詳讀:

          private?Runnable?getTask()?{
          ????//?最新一次poll是否超時
          ????boolean?timedOut?=?false;?//?Did?the?last?poll()?time?out?

          ????for?(;;)?{
          ????????int?c?=?ctl.get();
          ????????int?rs?=?runStateOf(c);

          ????????//?Check?if?queue?empty?only?if?necessary.
          ????????/**
          ?????????*?條件1:線程池狀態(tài)SHUTDOWN、STOP、TERMINATED狀態(tài)
          ?????????*?條件2:線程池STOP、TERMINATED狀態(tài)或workQueue為空
          ?????????*?條件1與條件2同時為true,則workerCount-1,并且返回null
          ?????????*?注:條件2是考慮到SHUTDOWN狀態(tài)的線程池不會接受任務,但仍會處理任務
          ?????????*/

          ????????if?(rs?>=?SHUTDOWN?&&?(rs?>=?STOP?||?workQueue.isEmpty()))?{
          ????????????decrementWorkerCount();
          ????????????return?null;
          ????????}

          ????????int?wc?=?workerCountOf(c);

          ????????//?Are?workers?subject?to?culling?
          ????????/**
          ?????????*?下列兩個條件滿足任意一個,則給當前正在嘗試獲取任務的工作線程設置阻塞時間限制(超時會被銷毀?不太確定這點),否則線程可以一直保持活躍狀態(tài)
          ?????????* 1.allowCoreThreadTimeOut:當前線程是否以keepAliveTime為超時時限等待任務
          ?????????*?2.當前線程數(shù)量已經超越了核心線程數(shù)
          ?????????*/

          ????????boolean?timed?=?allowCoreThreadTimeOut?||?wc?>?corePoolSize;
          ????????????
          ????????//?兩個條件全部為true,則通過CAS使工作線程數(shù)-1,即剔除工作線程
          ????????//?條件1:工作線程數(shù)大于maximumPoolSize,或(工作線程阻塞時間受限且上次在任務隊列拉取任務超時)
          ????????//?條件2:wc > 1或任務隊列為空
          ????????if?((wc?>?maximumPoolSize?||?(timed?&&?timedOut))
          ????????????&&?(wc?>?1?||?workQueue.isEmpty()))?{
          ????????????//?移除工作線程,成功則返回null,不成功則進入下輪循環(huán)
          ????????????if?(compareAndDecrementWorkerCount(c))
          ????????????????return?null;
          ????????????continue;
          ????????}

          ?????//?執(zhí)行到這里,說明已經經過前面重重校驗,開始真正獲取task了
          ????????try?{
          ????????????//?如果工作線程阻塞時間受限,則使用poll(),否則使用take()
          ????????????//?poll()設定阻塞時間,而take()無時間限制,直到拿到結果為止
          ????????????Runnable?r?=?timed??
          ????????????????workQueue.poll(keepAliveTime,?TimeUnit.NANOSECONDS)?:
          ????????????????workQueue.take();
          ????????????//?r不為空,則返回該Runnable
          ????????????if?(r?!=?null)
          ????????????????return?r;
          ????????????//?沒能獲取到Runable,則將最近獲取任務是否超時設置為true
          ????????????timedOut?=?true;
          ????????}?catch?(InterruptedException?retry)?{
          ????????????//?響應中斷,進入下一次循環(huán)前將最近獲取任務超時狀態(tài)置為false
          ????????????timedOut?=?false;
          ????????}
          ????}
          }

          processWorkerExit()

          processWorkerExit(Worker w, boolean completedAbruptly)執(zhí)行線程退出的方法

          參數(shù)說明:

          • Worker w:要結束的工作線程。
          • boolean completedAbruptly:是否突然完成(異常導致),如果工作線程因為用戶異常死亡,則completedAbruptly參數(shù)為 true。

          執(zhí)行流程:

          1、如果 completedAbruptly 為 true,即工作線程因為異常突然死亡,則執(zhí)行工作線程-1操作。

          2、主線程獲取鎖后,線程池已經完成的任務數(shù)追加 w(當前工作線程) 完成的任務數(shù),并從worker的set集合中移除當前worker。

          3、根據(jù)線程池狀態(tài)進行判斷是否執(zhí)行tryTerminate()結束線程池。

          4、是否需要增加工作線程,如果線程池還沒有完全終止,仍需要保持一定數(shù)量的線程。

          • 如果當前線程是突然終止的,調用addWorker()創(chuàng)建工作線程

          • 當前線程不是突然終止,但當前工作線程數(shù)量小于線程池需要維護的線程數(shù)量,則創(chuàng)建工作線程。需要維護的線程數(shù)量為corePoolSize(取決于成員變量 allowCoreThreadTimeOut是否為 false)或1。

          源碼詳讀:

          /**
          ?*?Performs?cleanup?and?bookkeeping?for?a?dying?worker.?Called
          ?*?only?from?worker?threads.?Unless?completedAbruptly?is?set,
          ?*?assumes?that?workerCount?has?already?been?adjusted?to?account
          ?*?for?exit.??This?method?removes?thread?from?worker?set,?and
          ?*?possibly?terminates?the?pool?or?replaces?the?worker?if?either
          ?*?it?exited?due?to?user?task?exception?or?if?fewer?than
          ?*?corePoolSize?workers?are?running?or?queue?is?non-empty?but
          ?*?there?are?no?workers.
          ?*
          ?*?@param?w?the?worker
          ?*?@param?completedAbruptly?if?the?worker?died?due?to?user?exception
          ?*/

          private?void?processWorkerExit(Worker?w,?boolean?completedAbruptly)?{
          ????/**
          ?????*?1.工作線程-1操作
          ?????*?1)如果completedAbruptly?為true,說明工作線程發(fā)生異常,那么將正在工作的線程數(shù)量-1
          ?????*?2)如果completedAbruptly?為false,說明工作線程無任務可以執(zhí)行,由getTask()執(zhí)行worker-1操作
          ?????*/

          ????if?(completedAbruptly)?//?If?abrupt,?then?workerCount?wasn't?adjusted
          ????????decrementWorkerCount();

          ????//?2.從線程set集合中移除工作線程,該過程需要加鎖
          ????final?ReentrantLock?mainLock?=?this.mainLock;
          ????mainLock.lock();
          ????try?{
          ????????//?將該worker已完成的任務數(shù)追加到線程池已完成的任務數(shù)
          ????????completedTaskCount?+=?w.completedTasks;
          ????????//?HashSet中移除該worker
          ????????workers.remove(w);
          ????}?finally?{
          ????????mainLock.unlock();
          ????}
          ????
          ?//?3.根據(jù)線程池狀態(tài)進行判斷是否結束線程池
          ????tryTerminate();
          ?
          ?/**
          ?????*?4.是否需要增加工作線程
          ?????*?線程池狀態(tài)是running?或?shutdown
          ?????*?如果當前線程是突然終止的,addWorker()
          ?????*?如果當前線程不是突然終止的,但當前線程數(shù)量??????*?故如果調用線程池shutdown(),直到workQueue為空前,線程池都會維持corePoolSize個線程,然后再逐漸銷毀這corePoolSize個線程
          ?????*/

          ????int?c?=?ctl.get();
          ????if?(runStateLessThan(c,?STOP))?{
          ???????if?(!completedAbruptly)?{
          ????????????int?min?=?allowCoreThreadTimeOut???0?:?corePoolSize;
          ????????????if?(min?==?0?&&?!?workQueue.isEmpty())
          ????????????????min?=?1;
          ????????????if?(workerCountOf(c)?>=?min)
          ????????????????return;?//?replacement?not?needed
          ????????}
          ????????addWorker(null,?false);
          ????}
          }

          好啦,以上就是Java線程池的全部內容啦,堅持讀完的伙伴兒們你們收獲如何?覺得有幫助的就順手點個贊吧。

          來源:blog.csdn.net/mu_wind/article/details/113806680

          1.?坑爹!Quartz 重復調度問題,你遇到過么?

          2.?實戰(zhàn)!阿里神器 Seata 實現(xiàn) TCC模式 解決分布式事務!

          3.?Nginx從安裝到高可用,一篇搞定!

          4.?SpringBoot + Elasticsearch7.6實現(xiàn)簡單查詢及高亮分詞查詢

          最近面試BAT,整理一份面試資料Java面試BATJ通關手冊,覆蓋了Java核心技術、JVM、Java并發(fā)、SSM、微服務、數(shù)據(jù)庫、數(shù)據(jù)結構等等。

          獲取方式:點“在看”,關注公眾號并回復?Java?領取,更多內容陸續(xù)奉上。

          PS:因公眾號平臺更改了推送規(guī)則,如果不想錯過內容,記得讀完點一下在看,加個星標,這樣每次新文章推送才會第一時間出現(xiàn)在你的訂閱列表里。

          “在看”支持小哈呀,謝謝啦??!

          瀏覽 56
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  黄色在线免看 | 天堂网国产资源 | 偷拍视频中文字幕资源 | 天天操天天操天天操天天 | 涩婷婷 |