全網(wǎng)最詳細的線程池 ThreadPoolExecutor 解讀!
答應我, 不要再用 if (obj != null) 判空了 20個示例!詳解 Java8 Stream 用法,從此告別shi山(垃圾代碼) 利用Java8新特征,重構傳統(tǒng)設計模式,你學會了嗎? 竟然有一半的人不知道 for 與 foreach 的區(qū)別??? 利用多線程批量拆分 List 導入數(shù)據(jù)庫,效率杠杠的!
一、ThreadPoolExecutor類講解
1、線程池狀態(tài):
五種狀態(tài):

線程池的 shutdown()方法,將線程池由 RUNNING(運行狀態(tài))轉換為 SHUTDOWN狀態(tài)線程池的 shutdownNow()方法,將線程池由RUNNING 或 SHUTDOWN 狀態(tài)轉換為 STOP 狀態(tài)。
注:
SHUTDOWN狀態(tài) 和 STOP 狀態(tài) 先會轉變?yōu)?TIDYING狀態(tài),最終都會變?yōu)?TERMINATED
2、ThreadPoolExecutor構造函數(shù):
ThreadPoolExecutor繼承自AbstractExecutorService,而AbstractExecutorService實現(xiàn)了ExecutorService接口。

接下來我們分別講解這些參數(shù)的含義。
2.1)線程池工作原理:
corePoolSize:線程池中核心線程數(shù)的最大值maximumPoolSize:線程池中能擁有最多線程數(shù)workQueue:用于緩存任務的阻塞隊列
當調(diào)用線程池execute() 方法添加一個任務時,線程池會做如下判斷:
如果有空閑線程,則直接執(zhí)行該任務; 如果沒有空閑線程,且當前運行的線程數(shù)少于 corePoolSize,則創(chuàng)建新的線程執(zhí)行該任務;如果沒有空閑線程,且當前的線程數(shù)等于 corePoolSize,同時阻塞隊列未滿,則將任務入隊列,而不添加新的線程;如果沒有空閑線程,且阻塞隊列已滿,同時池中的線程數(shù)小于 maximumPoolSize,則創(chuàng)建新的線程執(zhí)行任務;如果沒有空閑線程,且阻塞隊列已滿,同時池中的線程數(shù)等于 maximumPoolSize,則根據(jù)構造函數(shù)中的 handler 指定的策略來拒絕新的任務。

2.2)KeepAliveTime:
keepAliveTime:表示空閑線程的存活時間TimeUnit unit:表示keepAliveTime的單位
當一個線程無事可做,超過一定的時間(keepAliveTime)時,線程池會判斷,如果當前運行的線程數(shù)大于 corePoolSize,那么這個線程就被停掉。所以線程池的所有任務完成后,它最終會收縮到 corePoolSize 的大小。
注:如果線程池設置了
allowCoreThreadTimeout參數(shù)為true(默認false),那么當空閑線程超過keepaliveTime后直接停掉。(不會判斷線程數(shù)是否大于corePoolSize)即:最終線程數(shù)會變?yōu)?。
2.3)workQueue 任務隊列:
workQueue:它決定了緩存任務的排隊策略ThreadPoolExecutor線程池推薦了三種等待隊列,它們是:SynchronousQueue、LinkedBlockingQueue和ArrayBlockingQueue。
1)有界隊列:
SynchronousQueue:一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調(diào)用移除操作,否則插入操作一直處于 阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQueue,靜態(tài)工廠方法Executors.newCachedThreadPool使用了這個隊列。ArrayBlockingQueue:一個由數(shù)組支持的有界阻塞隊列。此隊列按 FIFO(先進先出)原則對元素進行排序。一旦創(chuàng)建了這樣的緩存區(qū),就不能再增加其容量。試圖向已滿隊列中放入元素會導致操作受阻塞;試圖從空隊列中提取元素將導致類似阻塞。
2)無界隊列:
LinkedBlockingQueue:基于鏈表結構的無界阻塞隊列,它可以指定容量也可以不指定容量(實際上任何無限容量的隊列/棧都是有容量的,這個容量就是Integer.MAX_VALUE)PriorityBlockingQueue:是一個按照優(yōu)先級進行內(nèi)部元素排序的無界阻塞隊列。隊列中的元素必須實現(xiàn) Comparable 接口,這樣才能通過實現(xiàn)compareTo()方法進行排序。優(yōu)先級最高的元素將始終排在隊列的頭部;PriorityBlockingQueue不會保證優(yōu)先級一樣的元素的排序。
注意:
keepAliveTime和maximumPoolSize及BlockingQueue的類型均有關系。如果BlockingQueue是無界的,那么永遠不會觸發(fā)maximumPoolSize,自然keepAliveTime也就沒有了意義。
2.4)threadFactory:
threadFactory :指定創(chuàng)建線程的工廠。(可以不指定)
如果不指定線程工廠時,ThreadPoolExecutor 會使用ThreadPoolExecutor.defaultThreadFactory 創(chuàng)建線程。默認工廠創(chuàng)建的線程:同屬于相同的線程組,具有同為 Thread.NORM_PRIORITY 的優(yōu)先級,以及名為 “pool-XXX-thread-” 的線程名(XXX為創(chuàng)建線程時順序序號),且創(chuàng)建的線程都是非守護進程。
2.5)handler 拒絕策略:
handler :表示當 workQueue 已滿,且池中的線程數(shù)達到 maximumPoolSize 時,線程池拒絕添加新任務時采取的策略。(可以不指定)

最科學的的還是 AbortPolicy 提供的處理方式:拋出異常,由開發(fā)人員進行處理。
3、常用方法:
除了在創(chuàng)建線程池時指定上述參數(shù)的值外,還可在線程池創(chuàng)建以后通過如下方法進行設置。

此外,還有一些方法:
getCorePoolSize():返回線程池的核心線程數(shù),這個值是一直不變的,返回在構造函數(shù)中設置的coreSize大小;getMaximumPoolSize():返回線程池的最大線程數(shù),這個值是一直不變的,返回在構造函數(shù)中設置的coreSize大小;getLargestPoolSize():記錄了曾經(jīng)出現(xiàn)的最大線程個數(shù)(水位線);getPoolSize():線程池中當前線程的數(shù)量;getActiveCount():Returns the approximate(近似) number of threads that are actively executing tasks;prestartAllCoreThreads():會啟動所有核心線程,無論是否有待執(zhí)行的任務,線程池都會創(chuàng)建新的線程,直到池中線程數(shù)量達到 corePoolSize;prestartCoreThread():會啟動一個核心線程(同上);allowCoreThreadTimeOut(true):允許核心線程在KeepAliveTime時間后,退出;
4、Executors類:
Executors類的底層實現(xiàn)便是ThreadPoolExecutor!Executors 工廠方法有:
Executors.newCachedThreadPool():無界線程池,可以進行自動線程回收Executors.newFixedThreadPool(int):固定大小線程池Executors.newSingleThreadExecutor():單個后臺線程
它們均為大多數(shù)使用場景預定義了設置。不過在阿里java文檔中說明,盡量不要用該類創(chuàng)建線程池。
二、線程池相關接口介紹:
1、ExecutorService接口:
該接口是真正的線程池接口。上面的ThreadPoolExecutor以及下面的ScheduledThreadPoolExecutor都是該接口的實現(xiàn)類。改接口常用方法:
Future> submit(Runnable task):提交Runnable任務到線程池,返回Future對象,由于Runnable沒有返回值,也就是說調(diào)用Future對象get()方法返回null;:提交Callable任務到線程池,返回Future對象,調(diào)用Future對象get()方法可以獲取Callable的返回值;Future submit(Callable task) :提交Runnable任務到線程池,返回Future對象,調(diào)用Future對象get()方法可以獲取Runnable的參數(shù)值;Future submit(Runnable task,T result) invokeAll(collection of tasks)/invokeAll(collection of tasks, long timeout, TimeUnit unit):invokeAll會按照任務集合中的順序將所有的Future添加到返回的集合中,該方法是一個阻塞的方法。只有當所有的任務都執(zhí)行完畢時,或者調(diào)用線程被中斷,又或者超出指定時限時,invokeAll方法才會返回。當invokeAll返回之后每個任務要么返回,要么取消,此時客戶端可以調(diào)用get/isCancelled來判斷具體是什么情況。invokeAny(collection of tasks)/invokeAny(collection of tasks, long timeout, TimeUnit unit):阻塞的方法,不會返回 Future 對象,而是返回集合中某一個Callable 對象的結果,而且無法保證調(diào)用之后返回的結果是哪一個 Callable,如果一個任務運行完畢或者拋出異常,方法會取消其它的 Callable 的執(zhí)行。和invokeAll區(qū)別是只要有一個任務執(zhí)行完了,就把結果返回,并取消其他未執(zhí)行完的任務;同樣,也帶有超時功能;shutdown():在完成已提交的任務后關閉服務,不再接受新任;shutdownNow():停止所有正在執(zhí)行的任務并關閉服務;isTerminated():測試是否所有任務都執(zhí)行完畢了;isShutdown():測試是否該ExecutorService已被關閉。
1.1)submit方法示例:
我們知道,線程池接口中有以下三個主要方法,接下來我們看一下具體示例:

1)Callable:
public?static?ThreadPoolExecutor?threadPool?=?new?ThreadPoolExecutor(5,?50,?300,?TimeUnit.SECONDS,?
???new?ArrayBlockingQueue(50),??
???new?ThreadFactory(){?public?Thread?newThread(Runnable?r)?{
????????????????return?new?Thread(r,?"schema_task_pool_"?+?r.hashCode());
????????????}},?new?ThreadPoolExecutor.DiscardOldestPolicy());
?
public?static?void?callableTest()?{
?int?a?=?1;
?//callable
?Future?future?=?threadPool.submit(new?Callable(){
??@Override
??public?Boolean?call()?throws?Exception?{
???int?b?=?a?+?100;
???System.out.println(b);
???return?true;
??}
?});
?try?{
??System.out.println("feature.get");
??Boolean?boolean1?=?future.get();
??System.out.println(boolean1);
?}?catch?(InterruptedException?e)?{
??System.out.println("InterruptedException...");
??e.printStackTrace();
?}?catch?(ExecutionException?e)?{
??System.out.println("execute?exception...");
??e.printStackTrace();
?}?
}
2)Runnable:
public?static?void?runnableTest()?{
?int?a?=?1;
?//runnable
?Future>?future1?=?threadPool.submit(new?Runnable(){
??@Override
??public?void?run()?{
???int?b?=?a?+?100;
???System.out.println(b);
??}
?});
?try?{
??System.out.println("feature.get");
??Object?x?=?future1.get(900,TimeUnit.MILLISECONDS);
??System.out.println(x);//null
?}?catch?(InterruptedException?e)?{
??e.printStackTrace();
?}?catch?(ExecutionException?e)?{
??System.out.println("execute?exception...");
??e.printStackTrace();
?}?catch?(TimeoutException?e)?{
??e.printStackTrace();
?}
}
3)Runnable+result:
class?RunnableTask?implements?Runnable?{
?Person?p;
?RunnableTask(Person?p)?{
??this.p?=?p;
?}
?
?@Override
?public?void?run()?{
??p.setId(1);
??p.setName("Runnable?Task...");
?}
}
class?Person?{
?private?Integer?id;
?private?String?name;
?
?public?Person(Integer?id,?String?name)?{
??super();
??this.id?=?id;
??this.name?=?name;
?}
?public?Integer?getId()?{
??return?id;
?}
?public?void?setId(Integer?id)?{
??this.id?=?id;
?}
?public?String?getName()?{
??return?name;
?}
?public?void?setName(String?name)?{
??this.name?=?name;
?}
?@Override
?public?String?toString()?{
??return?"Person?[id="?+?id?+?",?name="?+?name?+?"]";
?}
}
?
public?static?void?runnableTest2()?{
?//runnable?+?result
?Person?p?=?new?Person(0,"person");
?Future?future2?=?threadPool.submit(new?RunnableTask(p),p);
?try?{
??System.out.println("feature.get");
??Person?person?=?future2.get();
??System.out.println(person);
?}?catch?(InterruptedException?e)?{
??e.printStackTrace();
?}?catch?(ExecutionException?e)?{
??e.printStackTrace();
?}
}
1.2)線程池執(zhí)行時,Callable的call方法(Runnable的run方法)拋出異常后,會出現(xiàn)什么?
在上面的例子中我們可以看到,線程池無論是執(zhí)行Callable還是Runnable,調(diào)用返回的Future對象get()方法時需要處理兩種異常(如果是調(diào)用get(timeout)方法,需要處理三種異常),如下:
//在線程池上運行
Future如果get方法被打斷,進入 InterruptedException異常;如果線程執(zhí)行過程(call、run方法)中拋出異常,進入 ExecutionException異常;如果get方法超時,進入 TimeoutException異常;
1.3)submit()和execute()方法區(qū)別:
ExecutorService、ScheduledExecutorService接口的submit()和execute()方法都是把任務提交到線程池中,但二者的區(qū)別是
接收的參數(shù)不一樣, execute只能接收Runnable類型、submit可以接收Runnable和Callable兩種類型;submit有返回值,而execute沒有返回值;submit方便Exception處理;
1)submit方法內(nèi)部實現(xiàn):
其實submit方法也沒有什么神秘的,就是將我們的任務封裝成了RunnableFuture接口(繼承了Runnable、Future接口),再調(diào)用execute方法,我們看源碼:
????public?Future>?submit(Runnable?task)?{
????????if?(task?==?null)?throw?new?NullPointerException();
????????RunnableFuture?ftask?=?newTaskFor(task,?null);??//轉成?RunnableFuture,傳的result是null
????????execute(ftask);
????????return?ftask;
????}
?
????public??Future?submit(Runnable?task,?T?result)? {
????????if?(task?==?null)?throw?new?NullPointerException();
????????RunnableFuture?ftask?=?newTaskFor(task,?result);
????????execute(ftask);
????????return?ftask;
????}
?
????public??Future?submit(Callable?task) ? {
????????if?(task?==?null)?throw?new?NullPointerException();
????????RunnableFuture?ftask?=?newTaskFor(task);
????????execute(ftask);
????????return?ftask;
????}
2)newTaskFor方法內(nèi)部實現(xiàn):
newTaskFor方法是new了一個FutureTask返回,所以三個方法其實都是把task轉成FutureTask,如果task是Callable,就直接賦值,如果是Runnable 就轉為Callable再賦值。
當submit參數(shù)是Callable 時:
????protected??RunnableFuture?newTaskFor(Callable?callable) ? {
????????return?new?FutureTask(callable);
????}
????public?FutureTask(Callable?callable) ?{
????????if?(callable?==?null)
????????????throw?new?NullPointerException();
????????this.callable?=?callable;
????????this.state?=?NEW;??????
????}
當submit參數(shù)是Runnable時:
???//?按順序看,層層調(diào)用
????protected??RunnableFuture?newTaskFor(Runnable?runnable,?T?value)? {
????????return?new?FutureTask(runnable,?value);
????}
????public?FutureTask(Runnable?runnable,?V?result)?{
????????this.callable?=?Executors.callable(runnable,?result);??//轉?runnable?為?callable?
????????this.state?=?NEW;?
????}
???//?以下為Executors中的方法
????public?static??Callable?callable(Runnable?task,?T?result)? {
????????if?(task?==?null)
????????????throw?new?NullPointerException();
????????return?new?RunnableAdapter(task,?result);
????}
????static?final?class?RunnableAdapter<T>?implements?Callable<T>?{??//適配器
????????final?Runnable?task;
????????final?T?result;
????????RunnableAdapter(Runnable?task,?T?result)?{
????????????this.task?=?task;
????????????this.result?=?result;
????????}
????????public?T?call()?{???
????????????task.run();
????????????return?result;
????????}
????}
看了源碼就揭開了神秘面紗了,就是因為Future需要返回結果,所以內(nèi)部task必須是Callable,如果task是Runnable 就偷天換日,在Runnable 外面包個Callable馬甲,返回的結果在構造時就寫好。
參考:https://blog.csdn.net/liuxiao723846/article/details/108024212
1.4)ScheduledExecutorService接口:
繼承ExecutorService,并且提供了按時間安排執(zhí)行任務的功能,它提供的方法主要有:
schedule(task, initDelay): 安排所提交的Callable或Runnable任務在initDelay指定的時間后執(zhí)行;scheduleAtFixedRate():安排所提交的Runnable任務按指定的間隔重復執(zhí)行;scheduleWithFixedDelay():安排所提交的Runnable任務在每次執(zhí)行完后,等待delay所指定的時間后重復執(zhí)行;
注:該接口的實現(xiàn)類是
ScheduledThreadPoolExecutor。
2、Callable接口:
jdk1.5以后創(chuàng)建線程可以通過一下方式:
繼承 Thread類,實現(xiàn)void run()方法;實現(xiàn) Runnable接口,實現(xiàn)void run()方法;實現(xiàn) Callable接口,實現(xiàn)V call() Throws Exception方法
1)Callable和Runnale接口區(qū)別:
Callable可以拋出異常,和Future、FutureTask配合可以用來獲取異步執(zhí)行的結果;Runnable沒有返回結果,異常只能內(nèi)部消化;
2)執(zhí)行Callable的線程的方法可以通過以下兩種方式:
借助 FutureTask,使用Thread的start方法來執(zhí)行;加入到線程池中,使用線程池的 execute或submit執(zhí)行;
注:
Callable無法直接使用Thread來執(zhí)行;
我們都知道,Callable帶有返回值的,如果我們不需要返回值,卻又想用Callable該如何做?
jdk中有個Void類型(大寫V),但必須也要return null。
threadpool.submit(new?Callable()?{
????@Override
????public?Void?call()?{
????????//...
????????return?null;
????}
});
3)通過Executors工具類可以把Runnable接口轉換成Callable接口:
Executors中的callable方法可以將Runnable轉成Callable,如下:
public?static??Callable?callable(Runnable?task,?T?result)? {
????????if?(task?==?null)
????????????throw?new?NullPointerException();
????????return?new?RunnableAdapter(task,?result);
}
RunnableAdapter類在上面已經(jīng)看過源碼,原理就是將返回值result作為成員變量,通過參數(shù)傳遞進去,進而實現(xiàn)了Runnable可以返回值。
示例:
public?static?void?test5()?{
?????Person?p?=?new?Person(0,"person");
?????RunnableTask?runnableTask?=?new?RunnableTask(p);//創(chuàng)建runnable
?????Callable?callable?=?Executors.callable(runnableTask,p);//轉換
?????Future?future1?=?threadPool.submit(callable);//在線程池上執(zhí)行Callable
?????try?{
???Person?person?=?future1.get();
???System.out.println(person);
??}?catch?(InterruptedException?|?ExecutionException?e)?{
???e.printStackTrace();
??}
?????
?????Runnable?runnable?=?new?Runnable()?{//創(chuàng)建Runnable
???@Override
???public?void?run()?{
????
???}
?????};
?????Callable 3、Future接口:
3.1)Future是用來獲取異步計算結果的接口,常用方法:
boolean cancel(boolean mayInterruptIfRunning):試圖取消對此任務的執(zhí)行。如果任務已完成、或已取消,或者由于某些其他原因而無法取消,則此嘗試將失敗。當調(diào)用 cancel 時,如果調(diào)用成功,而此任務尚未啟動,則此任務將永不運行。如果任務已經(jīng)啟動,則mayInterruptIfRunning參數(shù)確定是否應該以試圖停止任務的方式來中斷執(zhí)行此任務的線程。此方法返回后,對isDone()的后續(xù)調(diào)用將始終返回 true。如果此方法返回 true,則對isCancelled()的后續(xù)調(diào)用將始終返回 true。boolean isCancelled():如果在任務正常完成前將其取消,則返回 true。boolean isDone():如果任務已完成,則返回 true,可能由于正常終止、異常或取消而完成,在所有這些情況中,此方法都將返回 true。V get()throws InterruptedException,ExecutionException:獲取異步結果,此方法會一直阻塞等到計算完成;V get(long timeout,TimeUnit unit) throws InterruptedException,ExecutionException,TimeoutException:獲取異步結果,此方法會在指定時間內(nèi)一直阻塞等到計算完成,超時后會拋出超時異常。
通過方法分析我們也知道實際上Future提供了3種功能:
能夠中斷執(zhí)行中的任務; 判斷任務是否執(zhí)行完成; 獲取任務執(zhí)行完成后額結果。
但是Future只是一個接口,我們無法直接創(chuàng)建對象,因此就需要其實現(xiàn)類FutureTask登場啦。
3.2)FutureTask類:
1)FutureTask類的實現(xiàn):
public?class?FutureTask<V>?implements?RunnableFuture<V>?{
//...
}
?
public?interface?RunnableFuture<V>?extends?Runnable,?Future<V>?{
????/**
?????*?Sets?this?Future?to?the?result?of?its?computation
?????*?unless?it?has?been?cancelled.
?????*/
????void?run();
}
FutureTask實現(xiàn)了Runnable、Future兩個接口。由于FutureTask實現(xiàn)了Runnable,因此它既可以通過Thread包裝來直接執(zhí)行,也可以提交給ExecuteService來執(zhí)行。并且還可以直接通過get()函數(shù)獲取執(zhí)行結果,該函數(shù)會阻塞,直到結果返回。
因此FutureTask既是Future、Runnable,又是包裝了Callable( 如果是Runnable最終也會被轉換為Callable ), 它是這兩者的合體。
2)FutureTask的構造函數(shù):
public?FutureTask(Callable?callable) ?{
?
}
?
public?FutureTask(Runnable?runnable,?V?result)?{
?
}
3.3)示例:(FutureTask兩種構造函數(shù)、以及在Thread和線程池上運行)
1)FutureTask包裝過的Callable在Thread、線程池上執(zhí)行:
public?static?void?test3()?{
??int?a?=?1,b?=?2;
??Callable?callable?=?new?Callable()?{
???@Override
???public?Integer?call()?throws?Exception?{
????return?a?+?b;
???}
??};
??//通過futureTask來執(zhí)行Callable
??FutureTask?futureTask?=?new?FutureTask<>(callable);
??
??//1.使用Thread執(zhí)行線程
??new?Thread(futureTask).start();
??try?{
???Integer?integer?=?futureTask.get();
???System.out.println(integer);
??}?catch?(InterruptedException?e)?{
???e.printStackTrace();
??}?catch?(ExecutionException?e)?{
???e.printStackTrace();
??}
??
??//2.使用線程池執(zhí)行線程
??Executors.newFixedThreadPool(1).submit(futureTask);
??threadPool.shutdown();
??try?{
???Integer?integer?=?futureTask.get();
???System.out.println(integer);
??}?catch?(InterruptedException?|?ExecutionException?e)?{
???e.printStackTrace();
??}?
?}
2)FutureTask包裝過的Runnable在Thread、線程池上執(zhí)行:
public?static?void?test4()?{
??Person?p?=?new?Person(0,"person");
??RunnableTask?runnableTask?=?new?RunnableTask(p);
??
??//創(chuàng)建futureTask來執(zhí)行Runnable
??FutureTask?futureTask?=?new?FutureTask<>(runnableTask,p);
??
??//1.使用Thread執(zhí)行線程
??new?Thread(futureTask).start();
??try?{
???Person?x?=?futureTask.get();
???System.out.println(x);
??}?catch?(InterruptedException?|?ExecutionException?e)?{
???e.printStackTrace();
??}?
??
??//2.使用線程池執(zhí)行線程
??threadPool.submit(futureTask);
??threadPool.shutdown();
??try?{
???Person?y?=?futureTask.get();
???System.out.println(y);
??}?catch?(InterruptedException?|?ExecutionException?e)?{
???e.printStackTrace();
??}
?}
Person、RunnableTask類同上面的示例中。
最后,再給大家推薦一個GitHub項目,該項目整理了上千本常用技術PDF,技術書籍都可以在這里找到。
GitHub地址:https://github.com/hello-go-maker/cs-books
電子書已經(jīng)更新好了,拿走不謝,記得點一個star,持續(xù)更新中...

