10問10答:你真的了解線程池嗎?

《Java開發(fā)手冊》中強調(diào),線程資源必須通過線程池提供,而創(chuàng)建線程池必須使用ThreadPoolExecutor。手冊主要強調(diào)利用線程池避免兩個問題,一是線程過渡切換,二是避免請求過多時造成OOM。但是如果參數(shù)配置錯誤,還是會引發(fā)上面的兩個問題。所以本節(jié)我們主要是討論ThreadPoolExecutor的一些技術細節(jié),并且給出幾個常用的最佳實踐建議。
我在查找資料的過程中,發(fā)現(xiàn)有些問題存在爭議。后面發(fā)現(xiàn),一部分原因是因為不同JDK版本的現(xiàn)實是有差異的。因此,下面的分析是基于當下最常用的版本JDK1.8,并且對于存在爭議的問題,我們分析源碼,源碼才是最準確的。
1 corePoolSize=0會怎么樣
這是一個爭議點。我發(fā)現(xiàn)大部分博文,不論是國內(nèi)的還是國外的,都是這樣回答這個問題的:
提交任務后,先判斷當前池中線程數(shù)是否小于corePoolSize,如果小于,則創(chuàng)建新線程執(zhí)行這個任務。
否則,判斷等待隊列是否已滿,如果沒有滿,則添加到等待隊列。
否則,判斷當前池中線程數(shù)是否大于maximumPoolSize,如果大于則拒絕。
否則,創(chuàng)建一個新的線程執(zhí)行這個任務。
int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);// 注意這一行代碼,添加到等待隊列成功后,判斷當前池內(nèi)線程數(shù)是否為0,如果是則創(chuàng)建一個firstTask為null的worker,這個worker會從等待隊列中獲取任務并執(zhí)行。else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);
線程池提交任務后,首先判斷當前池中線程數(shù)是否小于corePoolSize。
如果小于則嘗試創(chuàng)建新的線程執(zhí)行該任務;否則嘗試添加到等待隊列。
如果添加隊列成功,判斷當前池內(nèi)線程數(shù)是否為0,如果是則創(chuàng)建一個firstTask為null的worker,這個worker會從等待隊列中獲取任務并執(zhí)行。
如果添加到等待隊列失敗,一般是隊列已滿,才會再嘗試創(chuàng)建新的線程。
但在創(chuàng)建之前需要與maximumPoolSize比較,如果小于則創(chuàng)建成功。
否則執(zhí)行拒絕策略。
prestartCoreThread:Starts a core thread, causing it to idly wait for work. This overrides the default policy of starting core threads only when new tasks are executed.
prestartAllCoreThreads:Starts all core threads.
corePoolSize=0:在一般情況下只使用一個線程消費任務,只有當并發(fā)請求特別多、等待隊列都滿了之后,才開始用多線程。
allowsCoreThreadTimeOut=true && corePoolSize>1:在一般情況下就開始使用多線程(corePoolSize個),當并發(fā)請求特別多,等待隊列都滿了之后,繼續(xù)加大線程數(shù)。但是當請求沒有的時候,允許核心線程也終止。
在這個while條件中,有個getTask()方法是核心中的核心,它所做的事情就是從等待隊列中取出任務來執(zhí)行:
如果沒有達到corePoolSize,則創(chuàng)建的Worker在執(zhí)行完它承接的任務后,會用workQueue.take()取任務、注意,這個接口是阻塞接口,如果取不到任務,Worker線程一直阻塞。
如果超過了corePoolSize,或者allowCoreThreadTimeOut,一個Worker在空閑了之后,會用workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)取任務。注意,這個接口只阻塞等待keepAliveTime時間,超過這個時間返回null,則Worker的while循環(huán)執(zhí)行結束,則被終止了。
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// 看這里,核心邏輯在這里while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 注意,核心中的核心在這里Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
虛擬機棧 本地方法棧 程序計數(shù)器
ThreadLocal:業(yè)務代碼是否使用了ThreadLocal?就算沒有,Spring框架中也大量使用了ThreadLocal,你所在公司的框架可能也是一樣。
局部變量:線程處于阻塞狀態(tài),肯定還有棧幀沒有出棧,棧幀中有局部變量表,凡是被局部變量表引用的內(nèi)存都不能回收。所以如果這個線程創(chuàng)建了比較大的局部變量,那么這一部分內(nèi)存無法GC。
TLAB機制:如果你的應用線程數(shù)處于高位,那么新的線程初始化可能因為Eden沒有足夠的空間分配TLAB而觸發(fā)YoungGC。
線程池保持空閑的核心線程是它的默認配置,一般來講是沒有問題的,因為它占用的內(nèi)存一般不大。怕的就是業(yè)務代碼中使用ThreadLocal緩存的數(shù)據(jù)過大又不清理。
如果你的應用線程數(shù)處于高位,那么需要觀察一下YoungGC的情況,估算一下Eden大小是否足夠。如果不夠的話,可能要謹慎地創(chuàng)建新線程,并且讓空閑的線程終止;必要的時候,可能需要對JVM進行調(diào)參。
如果我們使用execute()提交任務,我們一般要在Runable任務的代碼加上try-catch進行異常處理。
如果我們使用submit()提交任務,我們一般要在主線程中,對Future.get()進行try-catch進行異常處理。
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task, result);execute(ftask);return ftask;}public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;}
public V get() throws InterruptedException, ExecutionException {int s = state;if (s <= COMPLETING)// 核心代碼s = awaitDone(false, 0L);return report(s);}private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;// 死循環(huán)for (;;) {if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}int s = state;// 只有任務的狀態(tài)是’已完成‘,才會跳出死循環(huán)if (s > COMPLETING) {if (q != null)q.thread = null;return s;}else if (s == COMPLETING) // cannot time out yetThread.yield();else if (q == null)q = new WaitNode();else if (!queued)queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);else if (timed) {nanos = deadline - System.nanoTime();if (nanos <= 0L) {removeWaiter(q);return state;}LockSupport.parkNanos(this, nanos);}elseLockSupport.park(this);}}
public void run() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}if (ran)set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}// get方法中依賴的,報告執(zhí)行結果private V report(int s) throws ExecutionException {Object x = outcome;if (s == NORMAL)return (V)x;if (s >= CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);}
不論是用execute還是submit,都可以自己在業(yè)務代碼上加try-catch進行異常處理。我一般喜歡使用這種方式,因為我喜歡對不同業(yè)務場景的異常進行差異化處理,至少打不一樣的日志吧。
如果是execute,還可以自定義線程池,繼承ThreadPoolExecutor并復寫其afterExecute(Runnable r, Throwable t)方法。
或者實現(xiàn)Thread.UncaughtExceptionHandler接口,實現(xiàn)void uncaughtException(Thread t, Throwable e);方法,并將該handler傳遞給線程池的ThreadFactory。
但是注意,afterExecute和UncaughtExceptionHandler都不適用submit。因為通過上面的FutureTask.run()不難發(fā)現(xiàn),它自己對Throwable進行了try-catch,封裝到了outcome屬性,所以底層方法execute的Worker是拿不到異常信息的。
shutdown => 平緩關閉,等待所有已添加到線程池中的任務執(zhí)行完再關閉。
shutdownNow => 立刻關閉,停止正在執(zhí)行的任務,并返回隊列中未執(zhí)行的任務。
/*** Executes the given command at some time in the future. The command* may execute in a new thread, in a pooled thread, or in the calling* thread, at the discretion of the {@code Executor} implementation.*/
【強制】使用ThreadPoolExecutor的構造函數(shù)聲明線程池,避免使用Executors類的 newFixedThreadPool和newCachedThreadPool。
【強制】 創(chuàng)建線程或線程池時請指定有意義的線程名稱,方便出錯時回溯。即threadFactory參數(shù)要構造好。
【建議】建議不同類別的業(yè)務用不同的線程池。
【建議】CPU密集型任務(N+1):這種任務消耗的主要是CPU資源,可以將線程數(shù)設置為N(CPU核心數(shù))+1,比CPU核心數(shù)多出來的一個線程是為了防止線程偶發(fā)的缺頁中斷,或者其它原因導致的任務暫停而帶來的影響。一旦任務暫停,CPU就會處于空閑狀態(tài),而在這種情況下多出來的一個線程就可以充分利用CPU的空閑時間。
【建議】I/O密集型任務(2N):這種任務應用起來,系統(tǒng)會用大部分的時間來處理I/O交互,而線程在處理I/O的時間段內(nèi)不會占用CPU來處理,這時就可以將CPU交出給其它線程使用。因此在I/O密集型任務的應用中,我們可以多配置一些線程,具體的計算方法是2N。
【建議】workQueue不要使用無界隊列,盡量使用有界隊列。避免大量任務等待,造成OOM。
【建議】如果是資源緊張的應用,使用allowsCoreThreadTimeOut可以提高資源利用率。
【建議】雖然使用線程池有多種異常處理的方式,但在任務代碼中,使用try-catch最通用,也能給不同任務的異常處理做精細化。
【建議】對于資源緊張的應用,如果擔心線程池資源使用不當,可以利用ThreadPoolExecutor的API實現(xiàn)簡單的監(jiān)控,然后進行分析和優(yōu)化。

private static final ThreadPoolExecutor pool;static {ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("po-detail-pool-%d").build();pool = new ThreadPoolExecutor(4, 8, 60L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(512), threadFactory, new ThreadPoolExecutor.AbortPolicy());pool.allowCoreThreadTimeOut(true);}
threadFactory:給出帶業(yè)務語義的線程命名。
corePoolSize:快速啟動4個線程處理該業(yè)務,是足夠的。
maximumPoolSize:IO密集型業(yè)務,我的服務器是4C8G的,所以4*2=8。
keepAliveTime:服務器資源緊張,讓空閑的線程快速釋放。
pool.allowCoreThreadTimeOut(true):也是為了在可以的時候,讓線程釋放,釋放資源。
workQueue:一個任務的執(zhí)行時長在100~300ms,業(yè)務高峰期8個線程,按照10s超時(已經(jīng)很高了)。10s鐘,8個線程,可以處理10 * 1000ms / 200ms * 8 = 400個任務左右,往上再取一點,512已經(jīng)很多了。
handler:極端情況下,一些任務只能丟棄,保護服務端。
有道無術,術可成;有術無道,止于術
歡迎大家關注Java之道公眾號
好文章,我在看??
評論
圖片
表情
