使用線程池時一定要注意的五個點
很多場景下應(yīng)用程序必須能夠處理一系列傳入請求,簡單的處理方式是通過一個線程順序的處理這些請求,如下圖:
單線程策略的優(yōu)勢和劣勢都非常明顯:
優(yōu)勢:設(shè)計和實現(xiàn)簡單;劣勢:這種方式會帶來處理效率的問題,單線程的處理能力是有限,不能發(fā)揮多核處理器優(yōu)勢。
在這種場景下我們就需要考慮并發(fā),一個簡單的并發(fā)策略就是Thread-Per-Message模式,即為每個請求使用一個新的線程。
Thread-Per-Message策略的優(yōu)勢和劣勢也非常明顯:
優(yōu)勢:設(shè)計和實現(xiàn)比較簡單,能夠同時處理多個請求,提升響應(yīng)效率;
劣勢:主要在兩個方面
1.資源消耗 引入了在串行執(zhí)行中所沒有的開銷,包括線程創(chuàng)建和調(diào)度,任務(wù)處理,資源分配和回收以及頻繁上下文切換所需的時間和資源。2.安全
攻擊者可以通過一次進行大量請求使系統(tǒng)癱瘓并且拒絕服務(wù) (DoS),從而導(dǎo)致系統(tǒng)立即不響應(yīng)而不是平滑地退出。 從安全角度來看,一個組件可能由于連續(xù)的錯誤而耗盡所有資源,因此使所有其他組件無法獲得資源。
有沒有一種方式可以并發(fā)執(zhí)行又可以克服Thread-Per-Message的問題?
采用線程池的策略,線程池通過控制并發(fā)執(zhí)行的工作線程的最大數(shù)量來解決Thread-Per-Message帶來的問題。可見下圖,請求來臨時先放入線程池的隊列

線程池可以接受一個Runnable或Callable<T>任務(wù),并將其存儲在臨時隊列中,當(dāng)有空閑線程時可以從隊列中拿到一個任務(wù)并執(zhí)行。
反例(使用 Thread-Per-Message 策略)
class Helper {
public void handle(Socket socket) {
// do something
}
}
final class RequestHandler {
private final Helper helper = new Helper();
//......
private RequestHandler(int port) throws IOException {
//do something
}
public void handleRequest() {
new Thread(new Runnable() {
public void run() {
try {
helper.handle(server.accept());
} catch (IOException e) {
// Forward to handler
}
}
}).start();
}
}
正例(使用 線程池 策略)
class Helper {
public void handle(Socket socket) {
// do something
}
}
final class RequestHandler {
private final Helper helper = new Helper();
private final ServerSocket server;
private final ExecutorService exec;
private RequestHandler(int port, int poolSize) throws IOException {
server = new ServerSocket(port);
exec = Executors.newFixedThreadPool(poolSize);
}
public static RequestHandler newInstance(int poolSize) throws IOException {
return new RequestHandler(0, poolSize);
}
public void handleRequest() {
Future<?> future = exec.submit(new Runnable() {
@Override
public void run() {
try {
helper.handle(server.accept());
} catch (IOException e) {
// Forward to handler
}
}
});
}
// ... Other methods such as shutting down the thread pool
// and task cancellation ...
}
JAVA 中(JDK 1.5+)線程池的種類:
newFixedThreadPool()newCachedThreadPool()newSingleThreadExecutor()newScheduledThreadPool()線程池的詳細使用方法可參見Java API文檔
二、不要在有界線程池中執(zhí)行相互依賴的任務(wù)
程序不能使用來自有界線程池的線程來執(zhí)行依賴于線程池中其他任務(wù)的任務(wù)。
有兩個場景:
當(dāng)線程池中正在執(zhí)行的線程阻塞在依賴于線程池中其他任務(wù)的完成上,這樣就會出現(xiàn)稱為線程饑餓(threadstarvation)死鎖的死鎖形式。 線程饑餓死鎖還會發(fā)生在當(dāng)前執(zhí)行的任務(wù)向線程池提交其他任務(wù)并等待這些任務(wù)完成的時候,然而此時線程池缺乏一次容納所有任務(wù)的能力。
要緩解上面兩個場景產(chǎn)生的問題有兩個簡單的辦法:
擴大線程池中的線程數(shù),以容納更多的任務(wù),但 決定一個線程池合適的大小可能是困難的甚至不可能的。 線程池中的隊列改為無界,由于系統(tǒng)資源有限,無界隊列只能說是盡可能容納任務(wù) 但饑餓死鎖的現(xiàn)象無法消除。
真正解決此類方法還是需要梳理線程池執(zhí)行業(yè)務(wù)流程,不要在有界線程池中執(zhí)行相互依賴的任務(wù),防止出現(xiàn)競爭和死鎖。
三、確保提交到線程池的任務(wù)可中斷
向線程池提交的任務(wù)需要支持中斷。從而保證線程可以中斷,線程池可以關(guān)閉。線程池支持 java.util.concurrent.ExecutorService.shutdownNow() 方法,該方法嘗試停止所有正在執(zhí)行的任務(wù),停止等待任務(wù)的處理,并返回等待執(zhí)行的任務(wù)的列表。
但是 shutdownNow() 除了盡力嘗試停止處理主動執(zhí)行的任務(wù)之外不能保證一定能夠停止。例如,典型的實現(xiàn)是通過Thread.interrupt()來停止,因此任何未能響應(yīng)中斷的任務(wù)可能永遠不會終止,也就造成線程池?zé)o法真正的關(guān)閉。
反例:
public final class Worker implements Runnable { // Thread‐safe class
private AtomBoolean flag = new AtomBoolean(true);
public Worker() throws IOException {
//do something
}
// Only one thread can use the socket at a particular time
@Override
public void run() {
try {
while (flag.get()) {
// do something
}
} catch (IOException ie) {
// Forward to handler
}
}
public void shutdown() {
this.flag.set(false);
}
}
正例:
public final class Worker implements Runnable { // Thread‐safe class
public Worker() throws IOException {
//do something
}
// Only one thread can use the socket at a particular time
@Override
public void run() {
try {
while (!Thread.interrupted()) {
// do something
}
} catch (IOException ie) {
// Forward to handler
}
}
}
四、確保在線程池中執(zhí)行的任務(wù)不能悄無聲息地失敗
線程池中的所有任務(wù)必須提供機制,如果它們異常終止,則需要通知應(yīng)用程序.
如果不這樣做不會導(dǎo)致資源泄漏,但由于池中的線程仍然被會重復(fù)使用,使故障診斷非常困難或不可能。
在應(yīng)用程序級別處理異常的最好方法是使用異常處理。異常處理可以執(zhí)行診斷操作,清理和關(guān)閉Java虛擬機,或者只是記錄故障的詳細信息。
也就是說在線程池里執(zhí)行的任務(wù)也需要能夠拋出異常并被捕獲處理。
任務(wù)恢復(fù)或清除操作可以通過重寫 java.util.concurrent.ThreadPoolExecutor 類的 afterExecute() 鉤子來執(zhí)行。
當(dāng)任務(wù)通過執(zhí)行其 run() 方法中的所有語句并且成功結(jié)束任務(wù),或者由于異常而導(dǎo)致任務(wù)停止時,將調(diào)用此鉤子。
可以通過自定義 ThreadPoolExecutor 服務(wù)來重載 afterExecute() 鉤子。
還可以通過重載 terminated() 方法來釋放線程池獲取的資源,就像一個finally塊。
反例:
final class PoolService {
private final ExecutorService pool = Executors.newFixedThreadPool(10);
public void doSomething() {
pool.execute(new Task());
}
}
final class Task implements Runnable {
@Override
public void run() {
// do something
throw new NullPointerException();
}
}
任務(wù)意外終止時作為一個運行時異常,無法通知應(yīng)用程序。此外,它缺乏恢復(fù)機制。因此,如果Task拋出一個NullPointerException ,異常將被忽略。
正例:
class CustomThreadPoolExecutor extends ThreadPoolExecutor {
// ... Constructor ...
public CustomThreadPoolExecutor(
int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
public void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t != null) {
// Exception occurred, forward to handler
}
// ... Perform task‐specific cleanup actions
}
@Override
public void terminated() {
super.terminated();
// ... Perform final clean‐up actions
}
}
另外一種方式是使用 ExecutorService.submit() 方法(代替 execute() 方法)將任務(wù)提交到線程池并獲取 Future 對象。
當(dāng)通過 ExecutorService.submit() 提交任務(wù)時,拋出的異常并未到達未捕獲的異常處理機制,因為拋出的異常被認為是返回狀態(tài)的一部分,因此被包裝在ExecutionException ,并由Future.get() 返回。
Future<?> future = threadPool.submit(new Task());
try {
future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Reset interrupted status
} catch (ExecutionException e) {
Throwable exception = e.getCause();
// Forward to exception reporter
}
五、確保在使用線程池時重新初始化ThreadLocal變量
java.lang.ThreadLocal 類提供線程內(nèi)的本地變量。根據(jù)Java API
這些變量與其它正常變量不同,每個線程訪問(通過其get或set方法)都有其屬于各自線程的,獨立初始化的變量拷貝。ThreadLocal實例通常是一些希望將狀態(tài)與線程(例如,用戶ID或事務(wù)ID)相關(guān)聯(lián)的類中的私有靜態(tài)字段。
ThreadLocal對象需要關(guān)注那些對象被線程池中的多個線程執(zhí)行的類。
線程池緩存技術(shù)允許線程重用以減少線程創(chuàng)建開銷,或者當(dāng)創(chuàng)建無限數(shù)量的線程時可以降低系統(tǒng)的可靠性。
當(dāng) ThreadLocal 對象在一個線程中被修改,隨后變得可重用時,在重用的線程上執(zhí)行的下一個任務(wù)將能看到該線程上執(zhí)行過的上一個任務(wù)修改的ThreadLocal 對象的狀態(tài)。
所以要在使用線程池時重新初始化的ThreadLocal對象實例。
反例:
public enum Day {
MONDAY, TUESDAY, WEDNESDAY, THURSDAY, FRIDAY, SATURDAY, SUNDAY;
}
public final class Diary {
private static final ThreadLocal<Day> days = new ThreadLocal<Day>() {
// Initialize to Monday
protected Day initialValue() {
return Day.MONDAY;
}
};
private static Day currentDay() {
return days.get();
}
public static void setDay(Day newDay) {
days.set(newDay);
}
// Performs some thread‐specific task
public void threadSpecificTask() {
// Do task ...
}
}
public final class DiaryPool {
final int numOfThreads = 2; // Maximum number of threads allowed in pool
final Executor exec;
final Diary diary;
DiaryPool() {
exec = (Executor) Executors.newFixedThreadPool(numOfThreads);
diary = new Diary();
}
public void doSomething1() {
exec.execute(new Runnable() {
@Override
public void run() {
diary.setDay(Day.FRIDAY);
diary.threadSpecificTask();
}
});
}
public void doSomething2() {
exec.execute(new Runnable() {
@Override
public void run() {
diary.threadSpecificTask();
}
});
}
public static void main(String[] args) {
DiaryPool dp = new DiaryPool();
dp.doSomething1(); // Thread 1, requires current day as Friday
dp.doSomething2(); // Thread 2, requires current day as Monday
dp.doSomething2(); // Thread 3, requires current day as Monday
}
}
DiaryPool類創(chuàng)建了一個線程池,它可以通過一個共享的無界的隊列來重用固定數(shù)量的線程。
在任何時候,不超過numOfThreads個線程正在處理任務(wù)。如果在所有線程都處于活動狀態(tài)時提交其他任務(wù),則 它們在隊列中等待,直到線程可用。
當(dāng)線程循環(huán)時,線程的線程局部狀態(tài)仍然存在。
下表顯示了可能的執(zhí)行順序:
| 時間 | 任務(wù) | 線程池 | 提交方法 | 日期 |
|---|---|---|---|---|
| 1 | t1 | 1 | doSomething1() | 星期五 |
| 2 | t2 | 2 | doSomething2() | 星期一 |
| 3 | t3 | 1 | doSomething3() | 星期五 |
在這個執(zhí)行順序中,期望從doSomething2() 開始的兩個任務(wù)( t 2和t 3 doSomething2() 將當(dāng)天視為星 期一。然而,因為池線程1被重用,所以t 3觀察到星期五。
解決方案(try-finally條款)
符合規(guī)則的方案removeDay() 方法添加到Diary類,并在try‐finally 塊中的實現(xiàn)doSomething1() 類的doSomething1() 方法的語句。finally 塊通過刪除當(dāng)前線程中的值來恢復(fù)threadlocal類型的days對象的初始狀態(tài)。
public final class Diary {
// ...
public static void removeDay() {
days.remove();
}
}
public final class DiaryPool {
// ...
public void doSomething1() {
exec.execute(new Runnable() {
@Override
public void run() {
try {
Diary.setDay(Day.FRIDAY);
diary.threadSpecificTask();
} finally {
Diary.removeDay(); // Diary.setDay(Day.MONDAY)
// can also be used
}
}
});
}
// ...
}
如果threadlocal變量再次被同一個線程讀取,它將使用initialValue()方法重新初始化 ,除非任務(wù)已經(jīng)明確設(shè)置了變量的值。這個解決方案將維護的責(zé)任轉(zhuǎn)移到客戶端( DiaryPool ),但是當(dāng)Diary類不能被修改時是一個好的選擇。
解決方案(beforeExecute())
使用一個自定義ThreadPoolExecutor 來擴展 ThreadPoolExecutor 并覆蓋beforeExecute() 方法。beforeExecute() 方法在 Runnable 任務(wù)在指定線程中執(zhí)行之前被調(diào)用。該方法在線程 “t” 執(zhí)行任務(wù) “r” 之前重新初始化 threadlocal 變量。
class CustomThreadPoolExecutor extends ThreadPoolExecutor {
public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
public void beforeExecute(Thread t, Runnable r) {
if (t == null || r == null) {
throw new NullPointerException();
}
Diary.setDay(Day.MONDAY);
super.beforeExecute(t, r);
}
}
public final class DiaryPool {
// ...
DiaryPool() {
exec = new CustomThreadPoolExecutor(NumOfthreads, NumOfthreads,
10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10));
diary = new Diary();
}
// ...
}