JUC線程池原理與實踐
點擊上方藍色字體,選擇“標星公眾號”
優(yōu)質(zhì)文章,第一時間送達
JUC的線程池架構(gòu)
1.Executor
Executor是Java異步任務(wù)的執(zhí)行者接口,目標是執(zhí)行目標任務(wù)。Executor作為執(zhí)行者角色,目的是提供一種將“任務(wù)提交者”與“任務(wù)執(zhí)行者”分離的機制。它只有一個函數(shù)式方法:
public?interface?Executor?{
????void?execute(Runnable?command);
}
2.ExecutorService
ExecutorService繼承于Executor。它對外提供異步任務(wù)的接收服務(wù)。ExecutorService提供了“接受異步任務(wù)并轉(zhuǎn)交給執(zhí)行者”的方法,比如submit、invoke方法等。具體如下:
public?interface?ExecutorService?extends?Executor?{
????void?shutdown();
????List?shutdownNow();
????boolean?isShutdown();
????boolean?isTerminated();
????boolean?awaitTermination(long?timeout,?TimeUnit?unit)
????????throws?InterruptedException;
?????Future?submit(Callable?task);
?????Future?submit(Runnable?task,?T?result);
????Future>?submit(Runnable?task);
?????List>?invokeAll(Collection?extends?Callable>?tasks)
????????throws?InterruptedException;
?????List>?invokeAll(Collection?extends?Callable>?tasks,
??????????????????????????????????long?timeout,?TimeUnit?unit)
????????throws?InterruptedException;
?????T?invokeAny(Collection?extends?Callable>?tasks)
????????throws?InterruptedException,?ExecutionException;
?????T?invokeAny(Collection?extends?Callable>?tasks,
????????????????????long?timeout,?TimeUnit?unit)
????????throws?InterruptedException,?ExecutionException,?TimeoutException;
}
3.AbstractExecutorService
AbstractExecutorService是一個抽象類,它實現(xiàn)了ExecutorService接口。AbstractExecutorService存在的目的是為ExecutorService中的接口提供默認實現(xiàn)。(模板模式)
4.ThreadPoolExecutor
大名鼎鼎的線程池實現(xiàn)類,繼承于AbstractExecutorService。它是核心實現(xiàn)類,它可以預(yù)先提供指定數(shù)量的可重用線程,可以對線程進行管理和監(jiān)控。
5.ScheduledExecutorService
她繼承于ExecutorService。是一個完成延時和周期性任務(wù)的接口。
6.Executors
是一個靜態(tài)工廠類,內(nèi)置的靜態(tài)工廠方法可以理解為快捷創(chuàng)建線程池的方法。

Executors的4種快捷創(chuàng)建線程池的方法
newSingleThreadExecutor 創(chuàng)建只有一個線程的線程池
newFixedThreadPool 創(chuàng)建固定大小的線程池
newCachedThreadPool 創(chuàng)建一個不限制線程數(shù)量的線程池,任何提交的任務(wù)都立即執(zhí)行,空閑線程會及時回收
newScheduledThreadPool 創(chuàng)建一個可定期或延時執(zhí)行任務(wù)的線程池
newSingleThreadExecutor
public?static?void?main(String[]?args)?{
???????final?AtomicInteger?integer?=?new?AtomicInteger(0);
???????ExecutorService?pool?=?Executors.newSingleThreadExecutor();
???????for?(int?i?=?0;?i?5;?i++)?{
???????????pool.execute(()?->?{
???????????????System.out.println(Thread.currentThread()?+?"?:doing"?+?"-"?+?integer.incrementAndGet());
???????????????try?{
???????????????????Thread.sleep(500);
???????????????}?catch?(InterruptedException?e)?{
???????????????????e.printStackTrace();
???????????????}
???????????});
???????}
???????pool.shutdown();
}
Thread[pool-1-thread-1,5,main]?:doing-1
Thread[pool-1-thread-1,5,main]?:doing-2
Thread[pool-1-thread-1,5,main]?:doing-3
Thread[pool-1-thread-1,5,main]?:doing-4
Thread[pool-1-thread-1,5,main]?:doing-5
場景:任務(wù)按照提交順序,一個任務(wù)一個任務(wù)逐個執(zhí)行。
以上代碼最后調(diào)用shutdown來關(guān)閉線程池。執(zhí)行shutdown方法后,線程池狀態(tài)變?yōu)閟hutdown,線程池將拒絕新任務(wù),不能再往線程池中添加新任務(wù)。此時,線程池不會立刻退出,直到線程池中的任務(wù)處理完成后才會退出。還有一個shutdownNow方法,執(zhí)行這個后,線程狀態(tài)變?yōu)閟top,試圖停止所有正在執(zhí)行的線程,并且不再處理阻塞隊列中等待的任務(wù),會返回那些未執(zhí)行的任務(wù)。
newFixedThreadPool
ExecutorService?pool?=?Executors.newFixedThreadPool(3);
Thread[pool-1-thread-1,5,main]?:doing-1
Thread[pool-1-thread-3,5,main]?:doing-2
Thread[pool-1-thread-2,5,main]?:doing-3
Thread[pool-1-thread-3,5,main]?:doing-4
Thread[pool-1-thread-1,5,main]?:doing-5
適用場景:需要任務(wù)長期執(zhí)行的場景。“固定數(shù)量的線程池”能穩(wěn)定的保證一個數(shù),避免頻繁 回收和創(chuàng)建線程,適用于CPU密集型的任務(wù),在CPU被線程長期占用的情況下,能確保少分配線程。
弊端:內(nèi)部使用無界隊列存放任務(wù),當(dāng)有大量任務(wù),隊列無限增大,服務(wù)器資源迅速耗盡。
newFixedThreadPool工廠方法返回一個ThreadPoolExecutor實例,該線程池實例的corePoolSize數(shù)量為參數(shù)nThread,其maximumPoolSize數(shù)量也為參數(shù)nThread,其workQueue屬性的值為LinkedBlockingQueue
newCachedThreadPool
線程池內(nèi)的某些線程無事可干成為空閑線程,可以靈活回收這些空閑線程。
ExecutorService?pool?=?Executors.newCachedThreadPool();
Thread[pool-1-thread-5,5,main]?:doing-5
Thread[pool-1-thread-1,5,main]?:doing-1
Thread[pool-1-thread-2,5,main]?:doing-2
Thread[pool-1-thread-3,5,main]?:doing-3
Thread[pool-1-thread-4,5,main]?:doing-4
特點:在執(zhí)行任務(wù)時,如果池內(nèi)所有線程忙,則會添加新線程來處理。不會限制線程的大小,完全依賴于操作系統(tǒng)能夠創(chuàng)建的最大線程大小。如果存量線程超過了處理任務(wù)數(shù)量,就會回收線程。、
適用場景:快速處理突發(fā)性強、耗時短的任務(wù)場景,如Netty的NIO處理場景、REST API接口的瞬時削峰場景。
弊端:沒有最大線程數(shù)量限制,如果大量的異步任務(wù)提交,服務(wù)器資源可能耗盡。
newScheduledThreadPool
public?static?void?main(String[]?args)?{
????????final?AtomicInteger?integer?=?new?AtomicInteger(0);
????????ScheduledExecutorService?pool?=?Executors.newScheduledThreadPool(5);
????????for?(int?i?=?0;?i?5;?i++)?{
????????????pool.scheduleAtFixedRate(
????????????????????()?->?{
????????????????????????System.out.println(Thread.currentThread()?+?"?:doing"?+?"-"?+?integer.incrementAndGet());
????????????????????},?0,?500,?TimeUnit.MILLISECONDS);
????????????//?0表示首次執(zhí)行任務(wù)的執(zhí)行時間,500表示每次執(zhí)行任務(wù)的間隔時間
????????}
//????????pool.shutdown();
}
因為可以周期性執(zhí)行任務(wù),所以不shutdown。
適用場景:周期性執(zhí)行任務(wù)的場景。
線程池的標準創(chuàng)建方式
使用ThreadPoolExecutor構(gòu)造方法創(chuàng)建,一個比較重要呃構(gòu)造器如下:
public?ThreadPoolExecutor(int?corePoolSize,核心線程數(shù)
??????????????????????????????int?maximumPoolSize,?最大線程數(shù)
??????????????????????????????long?keepAliveTime,?TimeUnit?unit,?空閑時間
??????????????????????????????BlockingQueue?workQueue,?阻塞隊列
??????????????????????????????ThreadFactory?threadFactory,?線程工廠(線程產(chǎn)生方式)
??????????????????????????????RejectedExecutionHandler?handler?拒絕策略)?{
????...
}
1.核心和最大線程數(shù)量
接收新任務(wù)時,并且當(dāng)前工作線程池數(shù)少于核心線程數(shù)量,即使有工作線程是空閑的,它也會創(chuàng)建新線程處理任務(wù),直到達到核心線程數(shù)。
2.BlockingQueue
阻塞隊列用于暫時接收任務(wù)。
3.KeepAliveTime
設(shè)置線程最大空閑時長,如果超過這個時間,非核心線程會被回收。當(dāng)然,也可以調(diào)用allowCoreThreadTimeOut方法將超時策略應(yīng)用到核心線程。
線程池的任務(wù)調(diào)度流程
工作線程數(shù)量小于核心線程數(shù)量,執(zhí)行新任務(wù)時會優(yōu)先創(chuàng)建線程,而不是獲取空閑線程。
任務(wù)數(shù)量大于核心線程數(shù)量,新任務(wù)將被加入阻塞隊列中。執(zhí)行任務(wù)時,也是先從阻塞隊列中獲取任務(wù)。
在核心線程用完,阻塞隊列已滿的情況下,會創(chuàng)建非核心線程處理新任務(wù)。
在如果線程池總數(shù)超過maximumPoolSize,線程池會拒絕接收任務(wù),為新任務(wù)執(zhí)行拒絕策略。

ThreadFactory(線程工廠)
創(chuàng)建線程方式
阻塞隊列
阻塞隊列與普通度列相比:阻塞隊列為空時,會阻塞當(dāng)前線程的元素獲取操作。當(dāng)隊列中有元素,被阻塞的線程會被自動喚醒。
BlockingQueue是JUC包的一個超級接口,比較常用的實現(xiàn)類有:
(1)ArrayBlockingQueue:數(shù)組隊列
(2)LinkedBlockingQueue:鏈表隊列
(3)PriorityBlockingQueue:優(yōu)先級隊列
(4)DelayQueue:延遲隊列
(5)SynchronousQueue:同步隊列
調(diào)度器的鉤子方法
ThreadPoolExecutor為每個任務(wù)執(zhí)行前后都提供了鉤子方法。
//?任務(wù)執(zhí)行之前的鉤子方法(前鉤子)
protected?void?beforeExecute(Thread?t,?Runnable?r)?{?}
//?之后(后鉤子)
protected?void?afterExecute(Runnable?r,?Throwable?t)?{?}
//?終止(停止鉤子)
protected?void?terminated()?{?}
beforeExecute:可用于重新初始化ThreadLocal線程本地變量實例、更新日志記錄、計時統(tǒng)計等。
afterExecute:更新日志記錄、計時統(tǒng)計等。
terminated:Executor終止時調(diào)用。
演示一下前鉤子。
public?class?TestMain?{
????public?static?void?main(String[]?args)?{
????????final?ThreadPoolExecutor?pool?=?new?ThreadPoolExecutor(
????????????????2,
????????????????4,
????????????????60,?TimeUnit.SECONDS,
????????????????new?LinkedBlockingQueue<>(2))?{
????????????@Override
????????????protected?void?beforeExecute(Thread?t,?Runnable?r)?{
????????????????System.out.println("前鉤子嗷?~?~?~?");
????????????}
????????};
????????for?(int?i?=?0;?i?5;?i++)?{
????????????pool.execute(()?->?{
????????????????System.out.println("你誰啊");
????????????});
????????}
????}
}
線程池拒絕策略
任務(wù)被拒絕有兩種情況:
線程池已經(jīng)關(guān)閉。
工作隊列已滿且最大線程數(shù)已滿。
拒絕策略有以下實現(xiàn):
AbortPolicy:拒絕策略。拋異常。
DiscardPolicy:拋棄策略。丟棄新來的任務(wù)。
DiscardOldestPolicy:拋棄最老任務(wù)策略。因為隊列是隊尾進對頭出,所以每次都是移除隊頭元素后再入隊。
CallerRunsPolicy:調(diào)用者執(zhí)行策略。提交任務(wù)線程自己執(zhí)行任務(wù),不使用線程池中的線程。
自定義策略。實現(xiàn)RejectExecutionHandler接口的rejectedExecution方法。
? 作者?|??bllbl
來源 |??cnblogs.com/bllbl/p/15417779.html

