Tomcat線程池的騷操作
“?寫給自己看,說給別人聽。你好,這是think123的第71篇原創(chuàng)文章”
在之前的《為什么阿里建議你不要使用Executors來創(chuàng)建線程池?》文章中深入分析了一下線程池,這里重溫下它的主要流程:
提交一個任務(wù),如果線程池中的工作線程數(shù)小于corePoolSize,則新建一個工作線程執(zhí)行任務(wù)
如果線程池當(dāng)前的工作線程已經(jīng)等于了corePoolSize,則將新的任務(wù)放入到工作隊列中正在執(zhí)行
如果工作隊列已經(jīng)滿了,并且工作線程數(shù)小于maximumPoolSize,則新建一個工作線程來執(zhí)行任務(wù)
如果當(dāng)前線程池中工作線程數(shù)已經(jīng)達到了maximumPoolSize,而新的任務(wù)無法放入到任務(wù)隊列中,則采用對應(yīng)的策略進行相應(yīng)的處理(默認(rèn)是拒絕策略)
當(dāng)線程數(shù)大于核心線程數(shù)時,線程等待 keepAliveTime 后還是沒有任務(wù)需要處理的話,收縮線程到核心線程數(shù)。(傳入 true 給 allowCoreThreadTimeOut 方法,來讓線程池在空閑的時候同樣回收核心線程。)
在JDK自帶的策略中有一個CallerRunsPolicy策略,很容易被忽視,它的意思是當(dāng)任務(wù)隊列已經(jīng)滿了的時候這個任務(wù)會被調(diào)用線程池的線程執(zhí)行。?
比如我們在tomcat線程中調(diào)用了線程池,當(dāng)線程池的隊列滿了之后,會將這個任務(wù)交給tomcat線程執(zhí)行。這個時候就會影響其他同步執(zhí)行的線程,甚至可能把線程池搞崩潰。
我們還可以通過一些手段來修改線程池的默認(rèn)行為,比如回收核心線程。或者聲明線程池后立即調(diào)用 prestartAllCoreThreads 方法,來啟動所有核心線程。
我們發(fā)現(xiàn)核心線程數(shù)只有在工作隊列滿了之后才會擴容,那么能不能先擴容核心線程,等到達到最大線程數(shù)之后再加入工作隊列呢?讓線程池更彈性,優(yōu)先開啟更多線程呢?
當(dāng)然可以,tomcat中的ThreadPoolExecutor就是就做了這樣的優(yōu)化。
tomcat的線程池在創(chuàng)建的時候會先啟動所有的核心線程(prestartAllCoreThreads),并且會優(yōu)先擴容線程數(shù)。
tomcat中的ThreadPoolExecutor繼承自java.util.concurrent.ThreadPoolExecutor,并且任務(wù)隊列使用的是TaskQueue(繼承自LinkedBlockingQueue)
public?class?ThreadPoolExecutor?extends?java.util.concurrent.ThreadPoolExecutor?{
public?ThreadPoolExecutor(int?corePoolSize,?int?maximumPoolSize,?long?keepAliveTime,?
??TimeUnit?unit,?BlockingQueue?workQueue,?RejectedExecutionHandler?handler) ?{
??????super(corePoolSize,?maximumPoolSize,?keepAliveTime,?unit,?workQueue,?handler);
??????//?調(diào)用父類的方法,先啟動所有的核心線程
??????prestartAllCoreThreads();
????}
??public?void?execute(Runnable?command)?{
????if?(command?==?null)
??????throw?new?NullPointerException();
????int?c?=?ctl.get();
????//?1.?如果工作線程數(shù)小于核心線程數(shù)(corePoolSize),則創(chuàng)建一個工作線程執(zhí)行任務(wù)
????if?(workerCountOf(c)???????if?(addWorker(command,?true))
??????????return;
??????c?=?ctl.get();
????}
????//?2.?如果當(dāng)前是running狀態(tài),并且任務(wù)隊列能夠添加任務(wù)
????if?(isRunning(c)?&&?workQueue.offer(command))?{
??????int?recheck?=?ctl.get();
??????//?2.1?如果不處于running狀態(tài)了(使用者可能調(diào)用了shutdown方法),
??????//?則將剛才添加到任務(wù)隊列的任務(wù)移除
??????if?(!?isRunning(recheck)?&&?remove(command))
??????????reject(command);
???????//?2.2?如果當(dāng)前沒有工作線程,
???????//?則新建一個工作線程來執(zhí)行任務(wù)(任務(wù)已經(jīng)被添加到了任務(wù)隊列)
??????else?if?(workerCountOf(recheck)?==?0)
??????????addWorker(null,?false);
????}
????//?3.?隊列已經(jīng)滿了的情況下,則新啟動一個工作線程來執(zhí)行任務(wù)
????else?if?(!addWorker(command,?false))
??????reject(command);
??}
}
public?class?TaskQueue?extends?LinkedBlockingQueue<Runnable>?{
??//?通過setParent方法設(shè)置線程池
??private?volatile?ThreadPoolExecutor?parent?=?null;
??@Override
??public?boolean?offer(Runnable?o)?{
????
????if?(parent==null)?return?super.offer(o);
????//?1.?線程數(shù)已經(jīng)擴容到了最大線程數(shù),此時正常加入隊列
????if?(parent.getPoolSize()?==?parent.getMaximumPoolSize())?return?super.offer(o);
????//?2.?存在空閑線程將其加入到隊列中
????if?(parent.getSubmittedCount()<=(parent.getPoolSize()))?return?super.offer(o);
????//?3.?核心線程數(shù)少于最大線程數(shù),不加入隊列,而是會創(chuàng)建一個新的工作線程
????if?(parent.getPoolSize()return?false;
????
????//?加入隊列
????return?super.offer(o);
??}
}
比如核心線程數(shù)設(shè)置為1,最大線程數(shù)設(shè)置為2,每個任務(wù)執(zhí)行時間是5s,當(dāng)?shù)谝粋€任務(wù)提交之后,submittedCount=1,會創(chuàng)建一個工作線程執(zhí)行任務(wù),poolSize變成1(查看execute方法的第一步)。
此時第二個任務(wù)提交了,submittedCount的值為2。不符合offer方法中第一和第二個判斷,但是符合第三個判斷,返回false,表示加入隊列失敗(表示隊列已滿)
此時在回到execute的第三個條件判斷,直接啟動一個新的工作線程來執(zhí)行任務(wù)。
這樣就做到了優(yōu)先擴容到最大線程數(shù)。來不及處理的多余任務(wù)才會放入到隊列中。
關(guān)注我不迷路
我是我,不一樣的煙火
作者:think123, 一個試圖把問題想簡單的程序員。
"三思而后行 , think23"
