Java線程池的實現(xiàn)原理,你清楚么?
來源:blog.csdn.net/u013332124/article/details/79587436
原理概述

其實java線程池的實現(xiàn)原理很簡單,說白了就是一個線程集合workerSet和一個阻塞隊列workQueue。當(dāng)用戶向線程池提交一個任務(wù)(也就是線程)時,線程池會先將任務(wù)放入workQueue中。
workerSet中的線程會不斷的從workQueue中獲取線程然后執(zhí)行。當(dāng)workQueue中沒有任務(wù)的時候,worker就會阻塞,直到隊列中有任務(wù)了就取出來繼續(xù)執(zhí)行。
線程池的幾個主要參數(shù)的作用
public?ThreadPoolExecutor(int?corePoolSize,
?????????????int?maximumPoolSize,
?????????????long?keepAliveTime,
?????????????TimeUnit?unit,
?????????????BlockingQueue?workQueue,
?????????????ThreadFactory?threadFactory,
?????????????RejectedExecutionHandler?handler)
corePoolSize: 規(guī)定線程池有幾個線程(worker)在運行。 maximumPoolSize: 當(dāng)workQueue滿了,不能添加任務(wù)的時候,這個參數(shù)才會生效。規(guī)定線程池最多只能有多少個線程(worker)在執(zhí)行。 keepAliveTime: 超出corePoolSize大小的那些線程的生存時間,這些線程如果長時間沒有執(zhí)行任務(wù)并且超過了keepAliveTime設(shè)定的時間,就會消亡。 unit: 生存時間對于的單位 workQueue: 存放任務(wù)的隊列 threadFactory: 創(chuàng)建線程的工廠 handler: 當(dāng)workQueue已經(jīng)滿了,并且線程池線程數(shù)已經(jīng)達到maximumPoolSize,將執(zhí)行拒絕策略。
任務(wù)提交后的流程分析
用戶通過submit提交一個任務(wù)。線程池會執(zhí)行如下流程:
判斷當(dāng)前運行的worker數(shù)量是否超過corePoolSize,如果不超過corePoolSize。就創(chuàng)建一個worker直接執(zhí)行該任務(wù)?!?線程池最開始是沒有worker在運行的 如果正在運行的worker數(shù)量超過或者等于corePoolSize,那么就將該任務(wù)加入到workQueue隊列中去。 如果workQueue隊列滿了,也就是offer方法返回false的話,就檢查當(dāng)前運行的worker數(shù)量是否小于maximumPoolSize,如果小于就創(chuàng)建一個worker直接執(zhí)行該任務(wù)。 如果當(dāng)前運行的worker數(shù)量是否大于等于maximumPoolSize,那么就執(zhí)行RejectedExecutionHandler來拒絕這個任務(wù)的提交。
源碼解析
我們先來看一下ThreadPoolExecutor中的幾個關(guān)鍵屬性。
//這個屬性是用來存放?當(dāng)前運行的worker數(shù)量以及線程池狀態(tài)的
//int是32位的,這里把int的高3位拿來充當(dāng)線程池狀態(tài)的標志位,后29位拿來充當(dāng)當(dāng)前運行worker的數(shù)量
private?final?AtomicInteger?ctl?=?new?AtomicInteger(ctlOf(RUNNING,?0));
//存放任務(wù)的阻塞隊列
private?final?BlockingQueue?workQueue;
//worker的集合,用set來存放
private?final?HashSet?workers?=?new?HashSet();
//歷史達到的worker數(shù)最大值
private?int?largestPoolSize;
//當(dāng)隊列滿了并且worker的數(shù)量達到maxSize的時候,執(zhí)行具體的拒絕策略
private?volatile?RejectedExecutionHandler?handler;
//超出coreSize的worker的生存時間
private?volatile?long?keepAliveTime;
//常駐worker的數(shù)量
private?volatile?int?corePoolSize;
//最大worker的數(shù)量,一般當(dāng)workQueue滿了才會用到這個參數(shù)
private?volatile?int?maximumPoolSize;
1. 提交任務(wù)相關(guān)源碼
下面是execute方法的源碼
public?void?execute(Runnable?command)?{
????????if?(command?==?null)
????????????throw?new?NullPointerException();
????????int?c?=?ctl.get();
????????//workerCountOf(c)會獲取當(dāng)前正在運行的worker數(shù)量
????????if?(workerCountOf(c)?????????????//如果workerCount小于corePoolSize,就創(chuàng)建一個worker然后直接執(zhí)行該任務(wù)
????????????if?(addWorker(command,?true))
????????????????return;
????????????c?=?ctl.get();
????????}
????????//isRunning(c)是判斷線程池是否在運行中,如果線程池被關(guān)閉了就不會再接受任務(wù)
????????//后面將任務(wù)加入到隊列中
????????if?(isRunning(c)?&&?workQueue.offer(command))?{
????????????//如果添加到隊列成功了,會再檢查一次線程池的狀態(tài)
????????????int?recheck?=?ctl.get();
????????????//如果線程池關(guān)閉了,就將剛才添加的任務(wù)從隊列中移除
????????????if?(!?isRunning(recheck)?&&?remove(command))
????????????????//執(zhí)行拒絕策略
????????????????reject(command);
????????????else?if?(workerCountOf(recheck)?==?0)
????????????????addWorker(null,?false);
????????}
????????//如果加入隊列失敗,就嘗試直接創(chuàng)建worker來執(zhí)行任務(wù)
????????else?if?(!addWorker(command,?false))
????????????//如果創(chuàng)建worker失敗,就執(zhí)行拒絕策略
????????????reject(command);
}
添加worker的方法addWorker源碼
private?boolean?addWorker(Runnable?firstTask,?boolean?core)?{
????????retry:
????????//使用自旋+cas失敗重試來保證線程競爭問題
????????for?(;;)?{
????????????//先獲取線程池的狀態(tài)
????????????int?c?=?ctl.get();
????????????int?rs?=?runStateOf(c);
????????????//?如果線程池是關(guān)閉的,或者workQueue隊列非空,就直接返回false,不做任何處理
????????????if?(rs?>=?SHUTDOWN?&&
????????????????!?(rs?==?SHUTDOWN?&&
???????????????????firstTask?==?null?&&
???????????????????!?workQueue.isEmpty()))
????????????????return?false;
????????????for?(;;)?{
????????????????int?wc?=?workerCountOf(c);
????????????????//根據(jù)入?yún)ore?來判斷可以創(chuàng)建的worker數(shù)量是否達到上限,如果達到上限了就拒絕創(chuàng)建worker
????????????????if?(wc?>=?CAPACITY?||
????????????????????wc?>=?(core???corePoolSize?:?maximumPoolSize))
????????????????????return?false;
????????????????//沒有的話就嘗試修改ctl添加workerCount的值。這里用了cas操作,如果失敗了下一個循環(huán)會繼續(xù)重試,直到設(shè)置成功
????????????????if?(compareAndIncrementWorkerCount(c))
????????????????????//如果設(shè)置成功了就跳出外層的那個for循環(huán)
????????????????????break?retry;
????????????????//重讀一次ctl,判斷如果線程池的狀態(tài)改變了,會再重新循環(huán)一次
????????????????c?=?ctl.get();??//?Re-read?ctl
????????????????if?(runStateOf(c)?!=?rs)
????????????????????continue?retry;
????????????}
????????}
????????boolean?workerStarted?=?false;
????????boolean?workerAdded?=?false;
????????Worker?w?=?null;
????????try?{
????????????final?ReentrantLock?mainLock?=?this.mainLock;
????????????//創(chuàng)建一個worker,將提交上來的任務(wù)直接交給worker
????????????w?=?new?Worker(firstTask);
????????????final?Thread?t?=?w.thread;
????????????if?(t?!=?null)?{
????????????????//加鎖,防止競爭
????????????????mainLock.lock();
????????????????try?{
????????????????????int?c?=?ctl.get();
????????????????????int?rs?=?runStateOf(c);
????????????????????//還是判斷線程池的狀態(tài)
????????????????????if?(rs?????????????????????????(rs?==?SHUTDOWN?&&?firstTask?==?null))?{
????????????????????????//如果worker的線程已經(jīng)啟動了,會拋出異常
????????????????????????if?(t.isAlive())?
??????????????????????????????throw?new?IllegalThreadStateException();
????????????????????????//添加新建的worker到線程池中
????????????????????????workers.add(w);
????????????????????????int?s?=?workers.size();
????????????????????????//更新歷史worker數(shù)量的最大值
????????????????????????if?(s?>?largestPoolSize)
????????????????????????????largestPoolSize?=?s;
????????????????????????//設(shè)置新增標志位
????????????????????????workerAdded?=?true;
????????????????????}
????????????????}?finally?{
????????????????????mainLock.unlock();
????????????????}
????????????????//如果worker是新增的,就啟動該線程
????????????????if?(workerAdded)?{
????????????????????t.start();
?????????????????????//成功啟動了線程,設(shè)置對應(yīng)的標志位
????????????????????workerStarted?=?true;
????????????????}
????????????}
????????}?finally?{
????????????//如果啟動失敗了,會觸發(fā)執(zhí)行相應(yīng)的方法
????????????if?(!?workerStarted)
????????????????addWorkerFailed(w);
????????}
????????return?workerStarted;
}
2. Worker的結(jié)構(gòu)
Worker是ThreadPoolExecutor內(nèi)部定義的一個內(nèi)部類。我們先看一下Worker的繼承關(guān)系
private?final?class?Worker?extends?AbstractQueuedSynchronizer?implements?Runnable
它實現(xiàn)了Runnable接口,所以可以拿來當(dāng)線程用。同時它還繼承了AbstractQueuedSynchronizer同步器類,主要用來實現(xiàn)一個不可重入的鎖。
一些屬性還有構(gòu)造方法:
//運行的線程,前面addWorker方法中就是直接通過啟動這個線程來啟動這個worker
final?Thread?thread;
//當(dāng)一個worker剛創(chuàng)建的時候,就先嘗試執(zhí)行這個任務(wù)
Runnable?firstTask;
//記錄完成任務(wù)的數(shù)量
volatile?long?completedTasks;
Worker(Runnable?firstTask)?{
????????????setState(-1);?//?inhibit?interrupts?until?runWorker
????????????this.firstTask?=?firstTask;
????????????//創(chuàng)建一個Thread,將自己設(shè)置給他,后面這個thread啟動的時候,也就是執(zhí)行worker的run方法
????????????this.thread?=?getThreadFactory().newThread(this);
}
worker的run方法
public?void?run()?{
????????????//這里調(diào)用了ThreadPoolExecutor的runWorker方法
????????????runWorker(this);
}
ThreadPoolExecutor的runWorker方法
final?void?runWorker(Worker?w)?{
????????//獲取當(dāng)前線程
????????Thread?wt?=?Thread.currentThread();
????????Runnable?task?=?w.firstTask;
????????w.firstTask?=?null;
????????//執(zhí)行unlock方法,允許其他線程來中斷自己
????????w.unlock();?//?allow?interrupts
????????boolean?completedAbruptly?=?true;
????????try?{
????????????//如果前面的firstTask有值,就直接執(zhí)行這個任務(wù)
????????????//如果沒有具體的任務(wù),就執(zhí)行g(shù)etTask()方法從隊列中獲取任務(wù)
????????????//這里會不斷執(zhí)行循環(huán)體,除非線程中斷或者getTask()返回null才會跳出這個循環(huán)
????????????while?(task?!=?null?||?(task?=?getTask())?!=?null)?{
????????????????//執(zhí)行任務(wù)前先鎖住,這里主要的作用就是給shutdown方法判斷worker是否在執(zhí)行中的
????????????????//shutdown方法里面會嘗試給這個線程加鎖,如果這個線程在執(zhí)行,就不會中斷它
????????????????w.lock();
???????????????//判斷線程池狀態(tài),如果線程池被強制關(guān)閉了,就馬上退出
????????????????if?((runStateAtLeast(ctl.get(),?STOP)?||
?????????????????????(Thread.interrupted()?&&
??????????????????????runStateAtLeast(ctl.get(),?STOP)))?&&
????????????????????!wt.isInterrupted())
????????????????????wt.interrupt();
????????????????try?{
????????????????????//執(zhí)行任務(wù)前調(diào)用。預(yù)留的方法,可擴展
????????????????????beforeExecute(wt,?task);
????????????????????Throwable?thrown?=?null;
????????????????????try?{
????????????????????????//真正的執(zhí)行任務(wù)
????????????????????????task.run();
????????????????????}?catch?(RuntimeException?x)?{
????????????????????????thrown?=?x;?throw?x;
????????????????????}?catch?(Error?x)?{
????????????????????????thrown?=?x;?throw?x;
????????????????????}?catch?(Throwable?x)?{
????????????????????????thrown?=?x;?throw?new?Error(x);
????????????????????}?finally?{
???????????????????????//執(zhí)行任務(wù)后調(diào)用。預(yù)留的方法,可擴展
????????????????????????afterExecute(task,?thrown);
????????????????????}
????????????????}?finally?{
????????????????????task?=?null;
????????????????????//記錄完成的任務(wù)數(shù)量
????????????????????w.completedTasks++;
????????????????????w.unlock();
????????????????}
????????????}
????????????completedAbruptly?=?false;
????????}?finally?{
????????????processWorkerExit(w,?completedAbruptly);
????????}
}
下面來看一下getTask()方法,這里面涉及到keepAliveTime的使用,從這個方法我們可以看出先吃池是怎么讓超過corePoolSize的那部分worker銷毀的。
private?Runnable?getTask()?{
????????boolean?timedOut?=?false;?
????????for?(;;)?{
????????????int?c?=?ctl.get();
????????????int?rs?=?runStateOf(c);
????????????//?如果線程池已經(jīng)關(guān)閉了,就直接返回null,
????????????//如果這里返回null,調(diào)用的那個worker就會跳出while循環(huán),然后執(zhí)行完銷毀線程
????????????//SHUTDOWN狀態(tài)表示執(zhí)行了shutdown()方法
????????????//STOP表示執(zhí)行了shutdownNow()方法
????????????if?(rs?>=?SHUTDOWN?&&?(rs?>=?STOP?||?workQueue.isEmpty()))?{
????????????????decrementWorkerCount();
????????????????return?null;
????????????}
????????????//獲取當(dāng)前正在運行中的worker數(shù)量
????????????int?wc?=?workerCountOf(c);
????????????//?如果設(shè)置了核心worker也會超時或者當(dāng)前正在運行的worker數(shù)量超過了corePoolSize,就要根據(jù)時間判斷是否要銷毀線程了
????????????//其實就是從隊列獲取任務(wù)的時候要不要設(shè)置超時間時間,如果超過這個時間隊列還沒有任務(wù)進來,就會返回null
????????????boolean?timed?=?allowCoreThreadTimeOut?||?wc?>?corePoolSize;
????????????
????????????//如果上一次循環(huán)從隊列獲取到的未null,這時候timedOut就會為true了
????????????if?((wc?>?maximumPoolSize?||?(timed?&&?timedOut))
????????????????&&?(wc?>?1?||?workQueue.isEmpty()))?{
????????????????//通過cas來設(shè)置WorkerCount,如果多個線程競爭,只有一個可以設(shè)置成功
????????????????//最后如果沒設(shè)置成功,就進入下一次循環(huán),說不定下一次worker的數(shù)量就沒有超過corePoolSize了,也就不用銷毀worker了
????????????????if?(compareAndDecrementWorkerCount(c))
????????????????????return?null;
????????????????continue;
????????????}
????????????try?{
????????????????//如果要設(shè)置超時時間,就設(shè)置一下咯
????????????????//過了這個keepAliveTime時間還沒有任務(wù)進隊列就會返回null,那worker就會銷毀
????????????????Runnable?r?=?timed??
????????????????????workQueue.poll(keepAliveTime,?TimeUnit.NANOSECONDS)?:
????????????????????workQueue.take();
????????????????if?(r?!=?null)
????????????????????return?r;
????????????????//如果r為null,就設(shè)置timedOut為true
????????????????timedOut?=?true;
????????????}?catch?(InterruptedException?retry)?{
????????????????timedOut?=?false;
????????????}
????????}
}
3. 添加Callable任務(wù)的實現(xiàn)源碼
public??Future?submit(Callable?task) ? {
????????if?(task?==?null)?throw?new?NullPointerException();
????????RunnableFuture?ftask?=?newTaskFor(task);
????????execute(ftask);
????????return?ftask;
}
要添加一個有返回值的任務(wù)的實現(xiàn)也很簡單。其實就是對任務(wù)做了一層封裝,將其封裝成Future,然后提交給線程池執(zhí)行,最后返回這個future。
這里的 newTaskFor(task) 方法會將其封裝成一個FutureTask類。
外部的線程拿到這個future,執(zhí)行g(shù)et()方法的時候,如果任務(wù)本身沒有執(zhí)行完,執(zhí)行線程就會被阻塞,直到任務(wù)執(zhí)行完。
下面是FutureTask的get方法
public?V?get()?throws?InterruptedException,?ExecutionException?{
????????int?s?=?state;
????????//判斷狀態(tài),如果任務(wù)還沒執(zhí)行完,就進入休眠,等待喚醒
????????if?(s?<=?COMPLETING)
????????????s?=?awaitDone(false,?0L);
????????//返回值
????????return?report(s);
}
FutureTask中通過一個state狀態(tài)來判斷任務(wù)是否完成。當(dāng)run方法執(zhí)行完后,會將state狀態(tài)置為完成,同時喚醒所有正在等待的線程。我們可以看一下FutureTask的run方法
public?void?run()?{
????????//判斷線程的狀態(tài)
????????if?(state?!=?NEW?||
????????????!UNSAFE.compareAndSwapObject(this,?runnerOffset,
?????????????????????????????????????????null,?Thread.currentThread()))
????????????return;
????????try?{
????????????Callable?c?=?callable;
????????????if?(c?!=?null?&&?state?==?NEW)?{
????????????????V?result;
????????????????boolean?ran;
????????????????try?{
????????????????????//執(zhí)行call方法
????????????????????result?=?c.call();
????????????????????ran?=?true;
????????????????}?catch?(Throwable?ex)?{
????????????????????result?=?null;
????????????????????ran?=?false;
????????????????????setException(ex);
????????????????}
????????????????if?(ran)
????????????????????//這個方法里面會設(shè)置返回內(nèi)容,并且喚醒所以等待中的線程
????????????????????set(result);
????????????}
????????}?finally?{
????????????runner?=?null;
????????????int?s?=?state;
????????????if?(s?>=?INTERRUPTING)
????????????????handlePossibleCancellationInterrupt(s);
????????}
}
4. shutdown和shutdownNow方法的實現(xiàn)
shutdown方法會將線程池的狀態(tài)設(shè)置為SHUTDOWN,線程池進入這個狀態(tài)后,就拒絕再接受任務(wù),然后會將剩余的任務(wù)全部執(zhí)行完
public?void?shutdown()?{
????????final?ReentrantLock?mainLock?=?this.mainLock;
????????mainLock.lock();
????????try?{
????????????//檢查是否可以關(guān)閉線程
????????????checkShutdownAccess();
????????????//設(shè)置線程池狀態(tài)
????????????advanceRunState(SHUTDOWN);
????????????//嘗試中斷worker
????????????interruptIdleWorkers();
?????????????//預(yù)留方法,留給子類實現(xiàn)
????????????onShutdown();?//?hook?for?ScheduledThreadPoolExecutor
????????}?finally?{
????????????mainLock.unlock();
????????}
????????tryTerminate();
}
private?void?interruptIdleWorkers()?{
????????interruptIdleWorkers(false);
}
private?void?interruptIdleWorkers(boolean?onlyOne)?{
????????final?ReentrantLock?mainLock?=?this.mainLock;
????????mainLock.lock();
????????try?{
????????????//遍歷所有的worker
????????????for?(Worker?w?:?workers)?{
????????????????Thread?t?=?w.thread;
????????????????//先嘗試調(diào)用w.tryLock(),如果獲取到鎖,就說明worker是空閑的,就可以直接中斷它
????????????????//注意的是,worker自己本身實現(xiàn)了AQS同步框架,然后實現(xiàn)的類似鎖的功能
????????????????//它實現(xiàn)的鎖是不可重入的,所以如果worker在執(zhí)行任務(wù)的時候,會先進行加鎖,這里tryLock()就會返回false
????????????????if?(!t.isInterrupted()?&&?w.tryLock())?{
????????????????????try?{
????????????????????????t.interrupt();
????????????????????}?catch?(SecurityException?ignore)?{
????????????????????}?finally?{
????????????????????????w.unlock();
????????????????????}
????????????????}
????????????????if?(onlyOne)
????????????????????break;
????????????}
????????}?finally?{
????????????mainLock.unlock();
????????}
}
shutdownNow做的比較絕,它先將線程池狀態(tài)設(shè)置為STOP,然后拒絕所有提交的任務(wù)。最后中斷左右正在運行中的worker,然后清空任務(wù)隊列。
public?List?shutdownNow()? {
????????List?tasks;
????????final?ReentrantLock?mainLock?=?this.mainLock;
????????mainLock.lock();
????????try?{
????????????checkShutdownAccess();
????????????//檢測權(quán)限
????????????advanceRunState(STOP);
????????????//中斷所有的worker
????????????interruptWorkers();
????????????//清空任務(wù)隊列
????????????tasks?=?drainQueue();
????????}?finally?{
????????????mainLock.unlock();
????????}
????????tryTerminate();
????????return?tasks;
}
private?void?interruptWorkers()?{
????????final?ReentrantLock?mainLock?=?this.mainLock;
????????mainLock.lock();
????????try?{
????????????//遍歷所有worker,然后調(diào)用中斷方法
????????????for?(Worker?w?:?workers)
????????????????w.interruptIfStarted();
????????}?finally?{
????????????mainLock.unlock();
????????}
}
總結(jié)
java線程池的實現(xiàn)原理還是挺簡單的。但是有一些細節(jié)還是需要去看源碼才能得出答案。另外,附送學(xué)習(xí)資源:Java進階視頻資源
本文也沒辦法把所有的源碼都講解一遍,只列了比較重要的一些源碼。有興趣的同學(xué)可以自己打開源碼好好看一下,肯定會對實現(xiàn)原理了解的更加深刻。
最后,如果本文有哪里說的不對或者遺漏的地方,也煩請指出,感激不盡。
程序汪資料鏈接
堪稱神級的Spring Boot手冊,從基礎(chǔ)入門到實戰(zhàn)進階
臥槽!字節(jié)跳動《算法中文手冊》火了,完整版 PDF 開放下載!
臥槽!阿里大佬總結(jié)的《圖解Java》火了,完整版PDF開放下載!
字節(jié)跳動總結(jié)的設(shè)計模式 PDF 火了,完整版開放下載!
歡迎添加程序汪個人微信 itwang008? 進粉絲群或圍觀朋友圈
