<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          深入淺出線程池

          共 28889字,需瀏覽 58分鐘

           ·

          2023-10-28 20:11



          一、線程

          1、什么是線程

          線程(thread)是操作系統(tǒng)能夠進(jìn)行運(yùn)算調(diào)度的最小單位。它被包含在進(jìn)程之中,是進(jìn)程中的實(shí)際 運(yùn)作單位。一條線程指的是進(jìn)程中一個(gè)單一順序的控制流,一個(gè)進(jìn)程中可以并發(fā)多個(gè)線程,每條線 程并行執(zhí)行不同的任務(wù)。

          2、如何創(chuàng)建線程

          2.1、JAVA中創(chuàng)建線程

             
             
          /** * 繼承Thread類,重寫run方法 */class MyThread extends Thread {    @Override    public void run() {        System.out.println("myThread..." + Thread.currentThread().getName());} }
          /** * 實(shí)現(xiàn)Runnable接口,實(shí)現(xiàn)run方法 */class MyRunnable implements Runnable { @Override public void run() { System.out.println("MyRunnable..." + Thread.currentThread().getName());} }
          /** * 實(shí)現(xiàn)Callable接口,指定返回類型,實(shí)現(xiàn)call方法 */class MyCallable implements Callable<String> { @Override public String call() throws Exception { return "MyCallable..." + Thread.currentThread().getName();} }

          2.2、測試一下

             
             
          public static void main(String[] args) throws Exception {    MyThread thread = new MyThread();    thread.run();   //myThread...main    thread.start(); //myThread...Thread-0        MyRunnable myRunnable = new MyRunnable();    Thread thread1 = new Thread(myRunnable);    myRunnable.run();   //MyRunnable...main    thread1.start();    //MyRunnable...Thread-1        MyCallable myCallable = new MyCallable();    FutureTask<String> futureTask = new FutureTask<>(myCallable);    Thread thread2 = new Thread(futureTask);    thread2.start();    System.out.println(myCallable.call());  //MyCallable...main    System.out.println(futureTask.get());   //MyCallable...Thread-2
          }

          2.3、問題

          既然我們創(chuàng)建了線程,那為何我們直接調(diào)用方法和我們調(diào)用start()方法的結(jié)果不同?new Thread() 是否真實(shí)創(chuàng)建了線程?

          2.4、問題分析

          我們直接調(diào)用方法,可以看到是執(zhí)行的主線程,而調(diào)用start()方法就是開啟了新線程,那說明new Thread()并沒有創(chuàng)建線程,而是在start()中創(chuàng)建了線程。
          那我們看下Thread類start()方法:
             
             
          class Thread implements Runnable { //Thread類實(shí)現(xiàn)了Runnalbe接口,實(shí)現(xiàn)了run()方法         private Runnable target;
          public synchronized void start() { ...
          boolean started = false; try { start0(); //可以看到,start()方法真實(shí)的調(diào)用時(shí)start0()方法 started = true; } finally { ... } } private native void start0(); //start0()是一個(gè)native方法,由JVM調(diào)用底層操作系統(tǒng),開啟一個(gè)線程,由操作系統(tǒng)過統(tǒng)一調(diào)度
          @Override public void run() { if (target != null) { target.run(); //操作系統(tǒng)在執(zhí)行新開啟的線程時(shí),回調(diào)Runnable接口的run()方法,執(zhí)行我們預(yù)設(shè)的線程任務(wù)
          } } }

          2.5、總結(jié)

          • JAVA不能直接創(chuàng)建線程執(zhí)行任務(wù),而是通過創(chuàng)建Thread對象調(diào)用操作系統(tǒng)開啟線程,在由操作系 統(tǒng)回調(diào)Runnable接口的run()方法執(zhí)行任務(wù);
          • 實(shí)現(xiàn)Runnable的方式,將線程實(shí)際要執(zhí)行的回調(diào)任務(wù)單獨(dú)提出來了,實(shí)現(xiàn)線程的啟動與回調(diào)任務(wù) 解耦;
          • 實(shí)現(xiàn)Callable的方式,通過Future模式不但將線程的啟動與回調(diào)任務(wù)解耦,而且可以在執(zhí)行完成后 獲取到執(zhí)行的結(jié)果;


          二、多線程

          1、什么是多線程

          多線程(multithreading),是指從軟件或者硬件上實(shí)現(xiàn)多個(gè)線程并發(fā)執(zhí)行的技術(shù)。同一個(gè)線程只能處理完一個(gè)任務(wù)再處理下一個(gè)任務(wù),有時(shí)我們需要多個(gè)任務(wù)同時(shí)處理,這時(shí),我們就需要?jiǎng)?chuàng)建多 個(gè)線程來同時(shí)處理任務(wù)。

          2、多線程有什么好處

          2.1、串行處理

             
             
          public static void main(String[] args) throws Exception {    System.out.println("start...");    long start = System.currentTimeMillis();    for (int i = 0; i < 5; i++) {        Thread.sleep(2000);  //每個(gè)任務(wù)執(zhí)行2秒         System.out.println("task done..."); //處理執(zhí)行結(jié)果    }    long end = System.currentTimeMillis();    System.out.println("end...,time = "  + (end - start));}//執(zhí)行結(jié)果start...task done...task done...task done...task done...task done... end...,time = 10043

          2.2、并行處理

             
             
          public static void main(String[] args) throws Exception {    System.out.println("start...");    long start = System.currentTimeMillis();    List<Future> list = new ArrayList<>();
          for (int i = 0; i < 5; i++) { Callable<String> callable = new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(2000); //每個(gè)任務(wù)執(zhí)行2秒 return "task done..."; }
          }; FutureTask task = new FutureTask(callable); list.add(task); new Thread(task).start();
          } list.forEach(future -> { try { System.out.println(future.get()); //處理執(zhí)行結(jié)果 } catch (Exception e) { } }); long end = System.currentTimeMillis(); System.out.println("end...,time = " + (end - start));
          } //執(zhí)行結(jié)果 start... task done... task done... task done... task done... task done... end...,time = 2005

          2.3、總結(jié)

          • 多線程可以把一個(gè)任務(wù)拆分為幾個(gè)子任務(wù),多個(gè)子任務(wù)可以并發(fā)執(zhí)行,每一個(gè)子任務(wù)就是一個(gè)線程。
          • 多線程是為了同步完成多項(xiàng)任務(wù),不是為了提高運(yùn)行效率,而是為了提高資源使用效率來提高系統(tǒng) 的效率。

          2.4、多線程的問題

          上面示例中我們可以看到,如果每來一個(gè)任務(wù),我們就創(chuàng)建一個(gè)線程,有很多任務(wù)的情況下,我們 會創(chuàng)建大量的線程,可能會導(dǎo)致系統(tǒng)資源的耗盡。同時(shí),我們知道線程的執(zhí)行是需要搶占CPU資源 的,那如果有太多的線程,就會導(dǎo)致大量時(shí)間用在線程切換的開銷上。
          再有,每來一個(gè)任務(wù)都需要?jiǎng)?chuàng)建一個(gè)線程,而創(chuàng)建一個(gè)線程需要調(diào)用操作系統(tǒng)底層方法,開銷較 大,而線程執(zhí)行完成后就被回收了。在需要大量線程的時(shí)候,創(chuàng)建線程的時(shí)間就花費(fèi)不少了。


          三、線程池

          1、如何設(shè)計(jì)一個(gè)線程池

          由于多線程的開發(fā)存在上述的一些問題,那我們是否可以設(shè)計(jì)一個(gè)東西來避免這些問題呢?當(dāng)然可以! 線程池就是為了解決這些問題而生的。那我們該如何設(shè)計(jì)一個(gè)線程池來解決這些問題呢?或者說,一個(gè)線程池該具備什么樣的功能?

          1.1、線程池基本功能

          • 多線程會創(chuàng)建大量的線程耗盡資源,那線程池應(yīng)該對線程數(shù)量有所限制,可以保證不會耗盡系統(tǒng)資 源;
          • 每次創(chuàng)建新的線程會增加創(chuàng)建時(shí)的開銷,那線程池應(yīng)該減少線程的創(chuàng)建,盡量復(fù)用已創(chuàng)建好的線 程;

          1.2、線程池面臨問題

          • 我們知道線程在執(zhí)行完自己的任務(wù)后就會被回收,那我們?nèi)绾螐?fù)用線程?
          • 我們指定了線程的最大數(shù)量,當(dāng)任務(wù)數(shù)超出線程數(shù)時(shí),我們該如何處理?

          1.3、創(chuàng)新源于生活

          先假設(shè)一個(gè)場景:假設(shè)我們是一個(gè)物流公司的管理人員,要配送的貨物就是我們的任務(wù),貨車就是 我們配送工具,我們當(dāng)然不能有多少貨物就準(zhǔn)備多少貨車。那當(dāng)顧客源源不斷的將貨物交給我們配送,我們該如何管理才能讓公司經(jīng)營的最好呢?
          • 最開始貨物來的時(shí)候,我們還沒有貨車,每批要運(yùn)輸?shù)呢浳镂覀兌家徺I一輛車來運(yùn)輸;
          • 當(dāng)貨車運(yùn)輸完成后,暫時(shí)還沒有下一批貨物到達(dá),那貨車就在倉庫停著,等有貨物來了立馬就可以 運(yùn)輸;
          • 當(dāng)我們有了一定數(shù)量的車后,我們認(rèn)為已經(jīng)夠用了,那后面就不再買車了,這時(shí)要是由新的貨物來 了,我們就會讓貨物先放倉庫,等有車回來再配送;
          • 當(dāng)618大促來襲,要配送的貨物太多,車都在路上,倉庫也都放滿了,那怎么辦呢?我們就選擇臨 時(shí)租一些車來幫忙配送,提高配送的效率;
          • 但是貨物還是太多,我們增加了臨時(shí)的貨車,依舊配送不過來,那這時(shí)我們就沒辦法了,只能讓發(fā)貨的客戶排隊(duì)等候或者干脆不接收了;
          • 大促圓滿完成后,累計(jì)的貨物已經(jīng)配送完成了,為了降低成本,我們就將臨時(shí)租的車都還了;

          1.4、技術(shù)源于創(chuàng)新

          基于上述場景,物流公司就是我們的線程池、貨物就是我們的線程任務(wù)、貨車就是我們的線程。我們?nèi)绾卧O(shè)計(jì)公司的管理貨車的流程,就應(yīng)該如何設(shè)計(jì)線程池管理線程的流程。
          • 當(dāng)任務(wù)進(jìn)來我們還沒有線程時(shí),我們就該創(chuàng)建線程執(zhí)行任務(wù);
          • 當(dāng)線程任務(wù)執(zhí)行完成后,線程不釋放,等著下一個(gè)任務(wù)進(jìn)來后接著執(zhí)行;
          • 當(dāng)創(chuàng)建的線程數(shù)量達(dá)到一定量后,新來的任務(wù)我們存起來等待空閑線程執(zhí)行,這就要求線程池有個(gè) 存任務(wù)的容器;
          • 當(dāng)容器存滿后,我們需要增加一些臨時(shí)的線程來提高處理效率;
          • 當(dāng)增加臨時(shí)線程后依舊處理不了的任務(wù),那就應(yīng)該將此任務(wù)拒絕;
          • 當(dāng)所有任務(wù)執(zhí)行完成后,就應(yīng)該將臨時(shí)的線程釋放掉,以免增加不必要的開銷;

          2、線程池具體分析

          上文中,我們講了該如何設(shè)計(jì)一個(gè)線程池,下面我們看看大神是如何設(shè)計(jì)的;

          2.1、 JAVA中的線程池是如何設(shè)計(jì)的

          2.1.1、 線程池設(shè)計(jì)

          看下線程池中的屬性,了解線程池的設(shè)計(jì)。
             
             
          public class ThreadPoolExecutor extends AbstractExecutorService {
          //線程池的打包控制狀態(tài),用高3位來表示線程池的運(yùn)行狀態(tài),低29位來表示線程池中工作線程的數(shù)量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //值為29,用來表示偏移量 private static final int COUNT_BITS = Integer.SIZE - 3;
          //線程池的最大容量 private static final int CAPACITY = (1 << COUNT_BITS) - 1;
          //線程池的運(yùn)行狀態(tài),總共有5個(gè)狀態(tài),用高3位來表示 private static final int RUNNING = -1 << COUNT_BITS; //接受新任務(wù)并處理阻塞隊(duì)列中的任務(wù)
          private static final int SHUTDOWN = 0 << COUNT_BITS; //不接受新任務(wù)但會處理阻塞隊(duì)列中的任務(wù)
          private static final int STOP = 1 << COUNT_BITS; //不會接受新任務(wù),也不會處理阻塞隊(duì)列中的任務(wù),并且中斷正在運(yùn)行的任務(wù)
          private static final int TIDYING = 2 << COUNT_BITS; //所有任務(wù)都已終止, 工作線程數(shù)量為0,即將要執(zhí)行terminated()鉤子方法
          private static final int TERMINATED = 3 << COUNT_BITS; // terminated()方法已經(jīng)執(zhí)行結(jié)束
          //任務(wù)緩存隊(duì)列,用來存放等待執(zhí)行的任務(wù) private final BlockingQueue<Runnable> workQueue;
          //全局鎖,對線程池狀態(tài)等屬性修改時(shí)需要使用這個(gè)鎖 private final ReentrantLock mainLock = new ReentrantLock();
          //線程池中工作線程的集合,訪問和修改需要持有全局鎖 private final HashSet<Worker> workers = new HashSet<Worker>();
          // 終止條件 private final Condition termination = mainLock.newCondition();
          //線程池中曾經(jīng)出現(xiàn)過的最大線程數(shù) private int largestPoolSize; //已完成任務(wù)的數(shù)量 private long completedTaskCount; //線程工廠 private volatile ThreadFactory threadFactory; //任務(wù)拒絕策略 private volatile RejectedExecutionHandler handler;
          //線程存活時(shí)間 private volatile long keepAliveTime;
          //是否允許核心線程超時(shí) private volatile boolean allowCoreThreadTimeOut;
          //核心池大小,若allowCoreThreadTimeOut被設(shè)置,核心線程全部空閑超時(shí)被回收的情況下會為0 private volatile int corePoolSize;
          //最大池大小,不得超過CAPACITY private volatile int maximumPoolSize; //默認(rèn)的任務(wù)拒絕策略 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
          //運(yùn)行權(quán)限相關(guān) private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
          ... }
          小結(jié)一下:以上線程池的設(shè)計(jì)可以看出,線程池的功能還是很完善的。
          • 提供了線程創(chuàng)建、數(shù)量及存活時(shí)間等的管理;
          • 提供了線程池狀態(tài)流轉(zhuǎn)的管理;
          • 提供了任務(wù)緩存的各種容器;
          • 提供了多余任務(wù)的處理機(jī)制;
          • 提供了簡單的統(tǒng)計(jì)功能;

          2.1.2、線程池構(gòu)造函數(shù)

             
             
          //構(gòu)造函數(shù) public ThreadPoolExecutor(int corePoolSize, //核心線程數(shù)                            int maximumPoolSize, //最大允許線程數(shù)                            long keepAliveTime, //線程存活時(shí)間                            TimeUnit unit, //存活時(shí)間單位                            BlockingQueue<Runnable> workQueue, //任務(wù)緩存隊(duì)列                           ThreadFactory threadFactory, //線程工廠                            RejectedExecutionHandler handler) { //拒絕策略     if (corePoolSize < 0 ||        maximumPoolSize <= 0 ||        maximumPoolSize < corePoolSize ||        keepAliveTime < 0)        throw new IllegalArgumentException();            if (workQueue == null || threadFactory == null || handler == null)        throw new NullPointerException();            this.corePoolSize = corePoolSize;    this.maximumPoolSize = maximumPoolSize;    this.workQueue = workQueue;    this.keepAliveTime = unit.toNanos(keepAliveTime);    this.threadFactory = threadFactory;    this.handler = handler;}
          小結(jié)一下:
          • 構(gòu)造函數(shù)告訴了我們可以怎樣去適用線程池,線程池的哪些特性是我們可以控制的;

          2.1.3、線程池執(zhí)行

          2.1.3.1、提交任務(wù)方法
          • public void execute(Runnable command);
          • Future<?> submit(Runnable task);
          • Future submit(Runnable task, T result);
          • Future submit(Callable task);
             
             
          public Future<?> submit(Runnable task) {        if (task == null) throw new NullPointerException();        RunnableFuture<Void> ftask = newTaskFor(task, null);        execute(ftask);        return ftask;}
          可以看到submit方法的底層調(diào)用的也是execute方法,所以我們這里只分析execute方法;
             
             
              public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();                int c = ctl.get();        //第一步:創(chuàng)建核心線程        if (workerCountOf(c) < corePoolSize) {  //worker數(shù)量小于corePoolSize            if (addWorker(command, true))       //創(chuàng)建worker                return;            c = ctl.get();        }        //第二步:加入緩存隊(duì)列        if (isRunning(c) && workQueue.offer(command)) { //線程池處于RUNNING狀態(tài),將任務(wù)加入workQueue任務(wù)緩存隊(duì)列            int recheck = ctl.get();                if (! isRunning(recheck) && remove(command))    //雙重檢查,若線程池狀態(tài)關(guān)閉了,移除任務(wù)                reject(command);            else if (workerCountOf(recheck) == 0)       //線程池狀態(tài)正常,但是沒有線程了,創(chuàng)建worker                addWorker(null, false);        }        //第三步:創(chuàng)建臨時(shí)線程        else if (!addWorker(command, false))            reject(command);    }
          小結(jié)一下:execute()方法主要功能:
          • 核心線程數(shù)量不足就創(chuàng)建核心線程;
          • 核心線程滿了就加入緩存隊(duì)列;
          • 緩存隊(duì)列滿了就增加非核心線程;
          • 非核心線程也滿了就拒絕任務(wù);
          2.1.3.2、創(chuàng)建線程
          private boolean addWorker(Runnable firstTask, boolean core) {        retry:        for (;;) {            int c = ctl.get();            int rs = runStateOf(c);
          //等價(jià)于:rs>=SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()) //線程池已關(guān)閉,并且無需執(zhí)行緩存隊(duì)列中的任務(wù),則不創(chuàng)建 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false;
          for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) //CAS增加線程數(shù) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } }
          //上面的流程走完,就可以真實(shí)開始創(chuàng)建線程了 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); //這里創(chuàng)建了線程 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get());
          if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); //這里將線程加入到線程池中 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); //添加成功,啟動線程 workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); //添加線程失敗操作 } return workerStarted; }
          小結(jié):addWorker()方法主要功能;
          • 增加線程數(shù);
          • 創(chuàng)建線程Worker實(shí)例加入線程池;
          • 加入完成開啟線程;
          • 啟動失敗則回滾增加流程;
          2.1.3.3、工作線程的實(shí)現(xiàn)
              private final class Worker  //Worker類是ThreadPoolExecutor的內(nèi)部類        extends AbstractQueuedSynchronizer          implements Runnable{                final Thread thread;    //持有實(shí)際線程        Runnable firstTask;     //worker所對應(yīng)的第一個(gè)任務(wù),可能為空        volatile long completedTasks;   //記錄執(zhí)行任務(wù)數(shù)
          Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); //當(dāng)前線程調(diào)用ThreadPoolExecutor中的runWorker方法,在這里實(shí)現(xiàn)的線程復(fù)用 }
          ...繼承AQS,實(shí)現(xiàn)了不可重入鎖... }
             
             
          小結(jié):工作線程Worker類主要功能;
          • 此類持有一個(gè)工作線程,不斷處理拿到的新任務(wù),持有的線程即為可復(fù)用的線程;
          • 此類可看作一個(gè)適配類,在run()方法中真實(shí)調(diào)用runWorker()方法不斷獲取新任務(wù),完成線程復(fù)用;
          2.1.3.4、線程的復(fù)用
          final void runWorker(Worker w) {    //ThreadPoolExecutor中的runWorker方法,在這里實(shí)現(xiàn)的線程復(fù)用        Thread wt = Thread.currentThread();        Runnable task = w.firstTask;        w.firstTask = null;        w.unlock(); // allow interrupts        boolean completedAbruptly = true;   //標(biāo)識線程是否異常終止        try {            while (task != null || (task = getTask()) != null) {    //這里會不斷從任務(wù)隊(duì)列獲取任務(wù)并執(zhí)行                w.lock();                                //線程是否需要中斷                if ((runStateAtLeast(ctl.get(), STOP) ||                         (Thread.interrupted() &&                      runStateAtLeast(ctl.get(), STOP))) &&                    !wt.isInterrupted())                    wt.interrupt();                try {                    beforeExecute(wt, task);    //執(zhí)行任務(wù)前的Hook方法,可自定義                    Throwable thrown = null;                    try {                        task.run();             //執(zhí)行實(shí)際的任務(wù)                    } catch (RuntimeException x) {                        thrown = x; throw x;                    } catch (Error x) {                        thrown = x; throw x;                    } catch (Throwable x) {                        thrown = x; throw new Error(x);                    } finally {                        afterExecute(task, thrown); //執(zhí)行任務(wù)后的Hook方法,可自定義                    }                } finally {                    task = null;    //執(zhí)行完成后,將當(dāng)前線程中的任務(wù)制空,準(zhǔn)備執(zhí)行下一個(gè)任務(wù)                    w.completedTasks++;                    w.unlock();                }            }            completedAbruptly = false;        } finally {            processWorkerExit(w, completedAbruptly);    //線程執(zhí)行完成后的清理工作        }    }
          小結(jié):runWorker()方法主要功能;
          • 循環(huán)從緩存隊(duì)列中獲取新的任務(wù),直到?jīng)]有任務(wù)為止;
          • 使用worker持有的線程真實(shí)執(zhí)行任務(wù);
          • 任務(wù)都執(zhí)行完成后的清理工作;
          2.1.3.5、隊(duì)列中獲取待執(zhí)行任務(wù)
             
             
          private Runnable getTask() {        boolean timedOut = false;   //標(biāo)識當(dāng)前線程是否超時(shí)未能獲取到task對象
          for (;;) { int c = ctl.get(); int rs = runStateOf(c);
          // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; }
          int wc = workerCountOf(c);
          // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
          if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) //若線程存活時(shí)間超時(shí),則CAS減去線程數(shù)量 return null; continue; }
          try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //允許超時(shí)回收則阻塞等待 workQueue.take(); //不允許則直接獲取,沒有就返回null if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
          小結(jié):getTask()方法主要功能;
          實(shí)際在緩存隊(duì)列中獲取待執(zhí)行的任務(wù);
          在這里管理線程是否要阻塞等待,控制線程的數(shù)量;
          2.1.3.6、清理工作
          private void processWorkerExit(Worker w, boolean completedAbruptly) {        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted            decrementWorkerCount();
          final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); //移除執(zhí)行完成的線程 } finally { mainLock.unlock(); }
          tryTerminate(); //每次回收完一個(gè)線程后都嘗試終止線程池
          int c = ctl.get(); if (runStateLessThan(c, STOP)) { //到這里說明線程池沒有終止 if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); //異常終止線程的話,需要在常見一個(gè)線程 } }
          小結(jié):processWorkerExit()方法主要功能;
          • 真實(shí)完成線程池線程的回收;
          • 調(diào)用嘗試終止線程池;
          • 保證線程池正常運(yùn)行;
          2.1.3.7、嘗試終止線程池
          final void tryTerminate() {        for (;;) {            int c = ctl.get();                        //若線程池正在執(zhí)行、線程池已終止、線程池還需要執(zhí)行緩存隊(duì)列中的任務(wù)時(shí),返回            if (isRunning(c) ||                runStateAtLeast(c, TIDYING) ||                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))                return;                            //執(zhí)行到這里,線程池為SHUTDOWN且無待執(zhí)行任務(wù) 或 STOP 狀態(tài)            if (workerCountOf(c) != 0) {                interruptIdleWorkers(ONLY_ONE);     //只中斷一個(gè)線程                return;            }
          //執(zhí)行到這里,線程池已經(jīng)沒有可用線程了,可以終止了 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { //CAS設(shè)置線程池終止 try { terminated(); //執(zhí)行鉤子方法 } finally { ctl.set(ctlOf(TERMINATED, 0)); //這里將線程池設(shè)為終態(tài) termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
          小結(jié):tryTerminate()方法主要功能;
          • 實(shí)際嘗試終止線程池;
          • 終止成功則調(diào)用鉤子方法,并且將線程池置為終態(tài)。

          2.2、JAVA線程池總結(jié)

          以上通過對JAVA線程池的具體分析我們可以看出,雖然流程看似復(fù)雜,但其實(shí)有很多內(nèi)容都是狀態(tài)重復(fù)校驗(yàn)、線程安全的保證等內(nèi)容,其主要的功能與我們前面所提出的設(shè)計(jì)功能一致,只是額外增加了一些擴(kuò)展,下面我們簡單整理下線程池的功能;
          2.2.1、主要功能
          • 線程數(shù)量及存活時(shí)間的管理;
          • 待處理任務(wù)的存儲功能;
          • 線程復(fù)用機(jī)制功能;
          • 任務(wù)超量的拒絕功能;
          2.2.2、擴(kuò)展功能
          • 簡單的執(zhí)行結(jié)果統(tǒng)計(jì)功能;
          • 提供線程執(zhí)行異常處理機(jī)制;
          • 執(zhí)行前后處理流程自定義;
          • 提供線程創(chuàng)建方式的自定義;
          2.2.3、流程總結(jié)
          以上通過對JAVA線程池任務(wù)提交流程的分析我們可以看出,線程池執(zhí)行的簡單流程如下圖所示;

          2.3、JAVA線程池使用

          線程池基本使用驗(yàn)證上述流程:
          public static void main(String[] args) throws Exception {                //創(chuàng)建線程池       ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(               5, 10, 100, TimeUnit.SECONDS, new ArrayBlockingQueue(5));                //加入4個(gè)任務(wù),小于核心線程,應(yīng)該只有4個(gè)核心線程,隊(duì)列為0        for (int i = 0; i < 4; i++) {            threadPoolExecutor.submit(new MyRunnable());        }        System.out.println("worker count = " + threadPoolExecutor.getPoolSize());   //worker count = 4        System.out.println("queue size = " + threadPoolExecutor.getQueue().size()); //queue size = 0                //再加4個(gè)任務(wù),超過核心線程,但是沒有超過核心線程 + 緩存隊(duì)列容量,應(yīng)該5個(gè)核心線程,隊(duì)列為3        for (int i = 0; i < 4; i++) {            threadPoolExecutor.submit(new MyRunnable());        }        System.out.println("worker count = " + threadPoolExecutor.getPoolSize());   //worker count = 5        System.out.println("queue size = " + threadPoolExecutor.getQueue().size()); //queue size = 3                //再加4個(gè)任務(wù),隊(duì)列滿了,應(yīng)該5個(gè)熱核心線程,隊(duì)列5個(gè),非核心線程2個(gè)        for (int i = 0; i < 4; i++) {            threadPoolExecutor.submit(new MyRunnable());        }        System.out.println("worker count = " + threadPoolExecutor.getPoolSize());   //worker count = 7        System.out.println("queue size = " + threadPoolExecutor.getQueue().size()); //queue size = 5                //再加4個(gè)任務(wù),核心線程滿了,應(yīng)該5個(gè)熱核心線程,隊(duì)列5個(gè),非核心線程5個(gè),最后一個(gè)拒絕        for (int i = 0; i < 4; i++) {            try {                threadPoolExecutor.submit(new MyRunnable());            } catch (Exception e) {                e.printStackTrace();    //java.util.concurrent.RejectedExecutionException            }        }        System.out.println("worker count = " + threadPoolExecutor.getPoolSize());   //worker count = 10        System.out.println("queue size = " + threadPoolExecutor.getQueue().size()); //queue size = 5        System.out.println(threadPoolExecutor.getTaskCount());  //共執(zhí)行15個(gè)任務(wù)                //執(zhí)行完成,休眠15秒,非核心線程釋放,應(yīng)該5個(gè)核心線程,隊(duì)列為0        Thread.sleep(1500);        System.out.println("worker count = " + threadPoolExecutor.getPoolSize());   //worker count = 5        System.out.println("queue size = " + threadPoolExecutor.getQueue().size()); //queue size = 0                //關(guān)閉線程池        threadPoolExecutor.shutdown();    }?
          -end-

          瀏覽 885
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  看屄屄视频| 99精品无码 | 日本欧美国产 | 欧美在线视频a | 国产精品日韩无码有码 |