阿里為何不允許用Executors創(chuàng)建線程池?
不點(diǎn)藍(lán)字,我們哪來故事?

每天 11 點(diǎn)更新文章,餓了點(diǎn)外賣,點(diǎn)擊 ??《無門檻外賣優(yōu)惠券,每天免費(fèi)領(lǐng)!》

作者:雪山上的蒲公英
cnblogs.com/zjfjava/p/11227456.html
1. 通過Executors創(chuàng)建線程池的弊端
在創(chuàng)建線程池的時(shí)候,大部分人還是會(huì)選擇使用Executors去創(chuàng)建。

下面是創(chuàng)建定長線程池(FixedThreadPool)的一個(gè)例子,嚴(yán)格來說,當(dāng)使用如下代碼創(chuàng)建線程池時(shí),是不符合編程規(guī)范的。
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
原因在于:(摘自阿里編碼規(guī)約)
線程池不允許使用Executors去創(chuàng)建,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學(xué)更加明確線程池的運(yùn)行規(guī)則,規(guī)避資源耗盡的風(fēng)險(xiǎn)。
說明:Executors各個(gè)方法的弊端:
1)newFixedThreadPool和newSingleThreadExecutor:
主要問題是堆積的請求處理隊(duì)列可能會(huì)耗費(fèi)非常大的內(nèi)存,甚至OOM。
2)newCachedThreadPool和newScheduledThreadPool:
主要問題是線程數(shù)最大數(shù)是Integer.MAX_VALUE,可能會(huì)創(chuàng)建數(shù)量非常多的線程,甚至OOM。
2. 通過ThreadPoolExecutor創(chuàng)建線程池
所以,針對(duì)上面的不規(guī)范代碼,重構(gòu)為通過ThreadPoolExecutor創(chuàng)建線程池的方式。
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default thread factory.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
ThreadPoolExecutor 是線程池的核心實(shí)現(xiàn)。線程的創(chuàng)建和終止需要很大的開銷,線程池中預(yù)先提供了指定數(shù)量的可重用線程,所以使用線程池會(huì)節(jié)省系統(tǒng)資源,并且每個(gè)線程池都維護(hù)了一些基礎(chǔ)的數(shù)據(jù)統(tǒng)計(jì),方便線程的管理和監(jiān)控。
3. ThreadPoolExecutor參數(shù)解釋
下面是對(duì)其參數(shù)的解釋,在創(chuàng)建線程池時(shí)需根據(jù)自己的情況來合理設(shè)置線程池。
corePoolSize & maximumPoolSize
核心線程數(shù)(corePoolSize)和最大線程數(shù)(maximumPoolSize)是線程池中非常重要的兩個(gè)概念,希望同學(xué)們能夠掌握。
當(dāng)一個(gè)新任務(wù)被提交到池中,如果當(dāng)前運(yùn)行線程小于核心線程數(shù)(corePoolSize),即使當(dāng)前有空閑線程,也會(huì)新建一個(gè)線程來處理新提交的任務(wù);如果當(dāng)前運(yùn)行線程數(shù)大于核心線程數(shù)(corePoolSize)并小于最大線程數(shù)(maximumPoolSize),只有當(dāng)?shù)却?duì)列已滿的情況下才會(huì)新建線程。
keepAliveTime & unit
keepAliveTime 為超過 corePoolSize 線程數(shù)量的線程最大空閑時(shí)間,unit 為時(shí)間單位。
等待隊(duì)列
任何阻塞隊(duì)列(BlockingQueue)都可以用來轉(zhuǎn)移或保存提交的任務(wù),線程池大小和阻塞隊(duì)列相互約束線程池:
如果運(yùn)行線程數(shù)小于
corePoolSize,提交新任務(wù)時(shí)就會(huì)新建一個(gè)線程來運(yùn)行;如果運(yùn)行線程數(shù)大于或等于
corePoolSize,新提交的任務(wù)就會(huì)入列等待;如果隊(duì)列已滿,并且運(yùn)行線程數(shù)小于maximumPoolSize,也將會(huì)新建一個(gè)線程來運(yùn)行;如果線程數(shù)大于
maximumPoolSize,新提交的任務(wù)將會(huì)根據(jù)拒絕策略來處理。
下面來看一下三種通用的入隊(duì)策略:
直接傳遞:通過 SynchronousQueue 直接把任務(wù)傳遞給線程。如果當(dāng)前沒可用線程,嘗試入隊(duì)操作會(huì)失敗,然后再創(chuàng)建一個(gè)新的線程。當(dāng)處理可能具有內(nèi)部依賴性的請求時(shí),該策略會(huì)避免請求被鎖定。直接傳遞通常需要無界的最大線程數(shù)(maximumPoolSize),避免拒絕新提交的任務(wù)。當(dāng)任務(wù)持續(xù)到達(dá)的平均速度超過可處理的速度時(shí),可能導(dǎo)致線程的無限增長。
無界隊(duì)列:使用無界隊(duì)列(如 LinkedBlockingQueue)作為等待隊(duì)列,當(dāng)所有的核心線程都在處理任務(wù)時(shí), 新提交的任務(wù)都會(huì)進(jìn)入隊(duì)列等待。因此,不會(huì)有大于 corePoolSize 的線程會(huì)被創(chuàng)建(maximumPoolSize 也將失去作用)。這種策略適合每個(gè)任務(wù)都完全獨(dú)立于其他任務(wù)的情況;例如網(wǎng)站服務(wù)器。這種類型的等待隊(duì)列可以使瞬間爆發(fā)的高頻請求變得平滑。當(dāng)任務(wù)持續(xù)到達(dá)的平均速度超過可處理速度時(shí),可能導(dǎo)致等待隊(duì)列無限增長。
有界隊(duì)列:當(dāng)使用有限的最大線程數(shù)時(shí),有界隊(duì)列(如 ArrayBlockingQueue)可以防止資源耗盡,但是難以調(diào)整和控制。隊(duì)列大小和線程池大小可以相互作用:使用大的隊(duì)列和小的線程數(shù)可以減少CPU使用率、系統(tǒng)資源和上下文切換的開銷,但是會(huì)導(dǎo)致吞吐量變低,如果任務(wù)頻繁地阻塞(例如被I/O限制),系統(tǒng)就能為更多的線程調(diào)度執(zhí)行時(shí)間。使用小的隊(duì)列通常需要更多的線程數(shù),這樣可以最大化CPU使用率,但可能會(huì)需要更大的調(diào)度開銷,從而降低吞吐量。
拒絕策略
當(dāng)線程池已經(jīng)關(guān)閉或達(dá)到飽和(最大線程和隊(duì)列都已滿)狀態(tài)時(shí),新提交的任務(wù)將會(huì)被拒絕。ThreadPoolExecutor 定義了四種拒絕策略:
AbortPolicy:默認(rèn)策略,在需要拒絕任務(wù)時(shí)拋出RejectedExecutionException;
CallerRunsPolicy:直接在 execute 方法的調(diào)用線程中運(yùn)行被拒絕的任務(wù),如果線程池已經(jīng)關(guān)閉,任務(wù)將被丟棄;
DiscardPolicy:直接丟棄任務(wù);
DiscardOldestPolicy:丟棄隊(duì)列中等待時(shí)間最長的任務(wù),并執(zhí)行當(dāng)前提交的任務(wù),如果線程池已經(jīng)關(guān)閉,任務(wù)將被丟棄。
我們也可以自定義拒絕策略,只需要實(shí)現(xiàn) RejectedExecutionHandler;需要注意的是,拒絕策略的運(yùn)行需要指定線程池和隊(duì)列的容量。
ps:Java面試刷題寶庫
4. ThreadPoolExecutor創(chuàng)建線程方式
通過下面的demo來了解ThreadPoolExecutor創(chuàng)建線程的過程。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 測試ThreadPoolExecutor對(duì)線程的執(zhí)行順序
**/
public class ThreadPoolSerialTest {
public static void main(String[] args) {
//核心線程數(shù)
int corePoolSize = 3;
//最大線程數(shù)
int maximumPoolSize = 6;
//超過 corePoolSize 線程數(shù)量的線程最大空閑時(shí)間
long keepAliveTime = 2;
//以秒為時(shí)間單位
TimeUnit unit = TimeUnit.SECONDS;
//創(chuàng)建工作隊(duì)列,用于存放提交的等待執(zhí)行任務(wù)
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(2);
ThreadPoolExecutor threadPoolExecutor = null;
try {
//創(chuàng)建線程池
threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
new ThreadPoolExecutor.AbortPolicy());
//循環(huán)提交任務(wù)
for (int i = 0; i < 8; i++) {
//提交任務(wù)的索引
final int index = (i + 1);
threadPoolExecutor.submit(() -> {
//線程打印輸出
System.out.println("大家好,我是線程:" + index);
try {
//模擬線程執(zhí)行時(shí)間,10s
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
//每個(gè)任務(wù)提交后休眠500ms再提交下一個(gè)任務(wù),用于保證提交順序
Thread.sleep(500);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
threadPoolExecutor.shutdown();
}
}
}
執(zhí)行結(jié)果:

這里描述一下執(zhí)行的流程:
首先通過 ThreadPoolExecutor 構(gòu)造函數(shù)創(chuàng)建線程池;
執(zhí)行 for 循環(huán),提交 8 個(gè)任務(wù)(恰好等于maximumPoolSize[最大線程數(shù)] + capacity[隊(duì)列大小]);
通過 threadPoolExecutor.submit 提交 Runnable 接口實(shí)現(xiàn)的執(zhí)行任務(wù);
提交第1個(gè)任務(wù)時(shí),由于當(dāng)前線程池中正在執(zhí)行的任務(wù)為 0 ,小于 3(corePoolSize 指定),所以會(huì)創(chuàng)建一個(gè)線程用來執(zhí)行提交的任務(wù)1;
提交第 2, 3 個(gè)任務(wù)的時(shí)候,由于當(dāng)前線程池中正在執(zhí)行的任務(wù)數(shù)量小于等于 3 (corePoolSize 指定),所以會(huì)為每一個(gè)提交的任務(wù)創(chuàng)建一個(gè)線程來執(zhí)行任務(wù);
當(dāng)提交第4個(gè)任務(wù)的時(shí)候,由于當(dāng)前正在執(zhí)行的任務(wù)數(shù)量為 3 (因?yàn)槊總€(gè)線程任務(wù)執(zhí)行時(shí)間為10s,所以提交第4個(gè)任務(wù)的時(shí)候,前面3個(gè)線程都還在執(zhí)行中),此時(shí)會(huì)將第4個(gè)任務(wù)存放到 workQueue 隊(duì)列中等待執(zhí)行;
由于 workQueue 隊(duì)列的大小為 2 ,所以該隊(duì)列中也就只能保存 2 個(gè)等待執(zhí)行的任務(wù),所以第5個(gè)任務(wù)也會(huì)保存到任務(wù)隊(duì)列中;
當(dāng)提交第6個(gè)任務(wù)的時(shí)候,因?yàn)楫?dāng)前線程池正在執(zhí)行的任務(wù)數(shù)量為3,workQueue 隊(duì)列中存儲(chǔ)的任務(wù)數(shù)量也滿了,這時(shí)會(huì)判斷當(dāng)前線程池中正在執(zhí)行的任務(wù)的數(shù)量是否小于6(maximumPoolSize指定);
如果小于 6 ,那么就會(huì)新創(chuàng)建一個(gè)線程來執(zhí)行提交的任務(wù) 6;
執(zhí)行第7,8個(gè)任務(wù)的時(shí)候,也要判斷當(dāng)前線程池中正在執(zhí)行的任務(wù)數(shù)是否小于6(maximumPoolSize指定),如果小于6,那么也會(huì)立即新建線程來執(zhí)行這些提交的任務(wù);
此時(shí),6個(gè)任務(wù)都已經(jīng)提交完畢,那 workQueue 隊(duì)列中的等待 任務(wù)4 和 任務(wù)5 什么時(shí)候執(zhí)行呢?
當(dāng)任務(wù)1執(zhí)行完畢后(10s后),執(zhí)行任務(wù)1的線程并沒有被銷毀掉,而是獲取 workQueue 中的任務(wù)4來執(zhí)行;
當(dāng)任務(wù)2執(zhí)行完畢后,執(zhí)行任務(wù)2的線程也沒有被銷毀,而是獲取 workQueue 中的任務(wù)5來執(zhí)行;
通過上面流程的分析,也就知道了之前案例的輸出結(jié)果的原因。其實(shí),線程池中會(huì)線程執(zhí)行完畢后,并不會(huì)被立刻銷毀,線程池中會(huì)保留 corePoolSize 數(shù)量的線程,當(dāng) workQueue 隊(duì)列中存在任務(wù)或者有新提交任務(wù)時(shí),那么會(huì)通過線程池中已有的線程來執(zhí)行任務(wù),避免了頻繁的線程創(chuàng)建與銷毀,而大于 corePoolSize 小于等于 maximumPoolSize 創(chuàng)建的線程,則會(huì)在空閑指定時(shí)間(keepAliveTime)后進(jìn)行回收。
5. ThreadPoolExecutor拒絕策略
在上面的測試中,我設(shè)置的執(zhí)行線程總數(shù)恰好等于maximumPoolSize[最大線程數(shù)] + capacity[隊(duì)列大小],因此沒有出現(xiàn)需要執(zhí)行拒絕策略的情況,因此在這里,我再增加一個(gè)線程,提交9個(gè)任務(wù),來演示不同的拒絕策略。
AbortPolicy

CallerRunsPolicy

DiscardPolicy

DiscardOldestPolicy

參考
https://www.jianshu.com/p/7be43712ef21 https://www.jianshu.com/p/6f82b738ac58
往期推薦
下方二維碼關(guān)注我

技術(shù)草根,堅(jiān)持分享 編程,算法,架構(gòu)

看完文章,餓了點(diǎn)外賣,點(diǎn)擊 ??《無門檻外賣優(yōu)惠券,每天免費(fèi)領(lǐng)!》


