<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          深入學(xué)習(xí)Java線程池

          共 24533字,需瀏覽 50分鐘

           ·

          2021-10-01 13:17

          在之前,我們都是通過new Thread來創(chuàng)建一個線程,由于線程的創(chuàng)建和銷毀都需要消耗一定的CPU資源,所以在高并發(fā)下這種創(chuàng)建線程的方式將嚴(yán)重影響代碼執(zhí)行效率。而線程池的作用就是讓一個線程執(zhí)行結(jié)束后不馬上銷毀,繼續(xù)執(zhí)行新的任務(wù),這樣就節(jié)省了不斷創(chuàng)建線程和銷毀線程的開銷。

          ThreadPoolExecutor

          創(chuàng)建Java線程池最為核心的類為ThreadPoolExecutor

          它提供了四種構(gòu)造函數(shù)來創(chuàng)建線程池,其中最為核心的構(gòu)造函數(shù)如下所示:

          public ThreadPoolExecutor(int corePoolSize,     // 核心線程數(shù)
          int maximumPoolSize, // 最大線程個數(shù)
          long keepAliveTime, // 等待時間
          TimeUnit unit, // 等待時間單位
          BlockingQueue<Runnable> workQueue, // 工作隊列
          ThreadFactory threadFactory, // 線程創(chuàng)建工廠
          RejectedExecutionHandler handler)
          // 拒絕策略
          復(fù)制代碼

          這7個參數(shù)的含義如下:

          1. corePoolSize 線程池核心線程數(shù)。即線程池中保留的線程個數(shù),即使這些線程是空閑的,也不會被銷毀,除非通過ThreadPoolExecutor的allowCoreThreadTimeOut(true)方法開啟了核心線程的超時策略;

          2. maximumPoolSize 線程池中允許的最大線程個數(shù);

          3. keepAliveTime 用于設(shè)置那些超出核心線程數(shù)量的線程的最大等待時間,超過這個時間還沒有新任務(wù)的話,超出的線程將被銷毀;

          4. unit 超時時間單位;

          5. workQueue 線程隊列。用于保存通過execute方法提交的,等待被執(zhí)行的任務(wù);

          6. threadFactory 線程創(chuàng)建工廠,即指定怎樣創(chuàng)建線程;

          7. handler 拒絕策略。即指定當(dāng)線程提交的數(shù)量超出了maximumPoolSize后,該使用什么策略處理超出的線程。

          在通過這個構(gòu)造方法創(chuàng)建線程池的時候,這幾個參數(shù)必須滿足以下條件,否則將拋出IllegalArgumentException異常:

          1. corePoolSize不能小于0;

          2. keepAliveTime不能小于0;

          3. maximumPoolSize 不能小于等于0;

          4. maximumPoolSize不能小于corePoolSize;

          此外,workQueue、threadFactory和handler不能為null,否則將拋出空指針異常。

          下面舉些例子來深入理解這幾個參數(shù)的含義。

          使用上面的構(gòu)造方法創(chuàng)建一個線程池:

          ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
          1, 2, 10,
          TimeUnit.SECONDS,
          new ArrayBlockingQueue<>(1),
          (ThreadFactory) Thread::new,
          new ThreadPoolExecutor.AbortPolicy());
          System.out.println("線程池創(chuàng)建完畢");

          int activeCount = -1;
          int queueSize = -1;
          while (true) {
          if (activeCount != threadPoolExecutor.getActiveCount()
          || queueSize != threadPoolExecutor.getQueue().size()) {
          System.out.println("活躍線程個數(shù) " + threadPoolExecutor.getActiveCount());
          System.out.println("核心線程個數(shù) " + threadPoolExecutor.getCorePoolSize());
          System.out.println("隊列線程個數(shù) " + threadPoolExecutor.getQueue().size());
          System.out.println("最大線程數(shù) " + threadPoolExecutor.getMaximumPoolSize());
          System.out.println("------------------------------------");
          activeCount = threadPoolExecutor.getActiveCount();
          queueSize = threadPoolExecutor.getQueue().size();
          }
          }
          復(fù)制代碼

          上面的代碼創(chuàng)建了一個核心線程數(shù)量為1,允許最大線程數(shù)量為2,最大活躍時間為10秒,線程隊列長度為1的線程池。

          假如我們通過execute方法向線程池提交1個任務(wù),看看結(jié)果如何:

          ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
          1, 2, 10,
          TimeUnit.SECONDS,
          new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
          new ThreadPoolExecutor.AbortPolicy());
          System.out.println("線程池創(chuàng)建完畢");

          threadPoolExecutor.execute(() -> sleep(100));

          int activeCount = -1;
          int queueSize = -1;
          while (true) {
          if (activeCount != threadPoolExecutor.getActiveCount()
          || queueSize != threadPoolExecutor.getQueue().size()) {
          System.out.println("活躍線程個數(shù) " + threadPoolExecutor.getActiveCount());
          System.out.println("核心線程個數(shù) " + threadPoolExecutor.getCorePoolSize());
          System.out.println("隊列線程個數(shù) " + threadPoolExecutor.getQueue().size());
          System.out.println("最大線程數(shù) " + threadPoolExecutor.getMaximumPoolSize());
          System.out.println("------------------------------------");
          activeCount = threadPoolExecutor.getActiveCount();
          queueSize = threadPoolExecutor.getQueue().size();
          }
          }
          復(fù)制代碼

          ThreadPoolExecutor的execute和submit方法都可以向線程池提交任務(wù),區(qū)別是,submit方法能夠返回執(zhí)行結(jié)果,返回值類型為Future。

          sleep方法代碼:

          private static void sleep(long value) {
          try {
          System.out.println(Thread.currentThread().getName() + "線程執(zhí)行sleep方法");
          TimeUnit.SECONDS.sleep(value);
          } catch (InterruptedException e) {
          e.printStackTrace();
          }
          }
          復(fù)制代碼

          線程池核心線程數(shù)量為1,通過execute提交了一個任務(wù)后,由于核心線程是空閑的,所以任務(wù)被執(zhí)行了。由于這個任務(wù)的邏輯是休眠100秒,所以在這100秒內(nèi),線程池的活躍線程數(shù)量為1。此外,因?yàn)樘峤坏娜蝿?wù)被核心線程執(zhí)行了,所以并沒有線程需要被放到線程隊列里等待,線程隊列長度為0。

          假如我們通過execute方法向線程池提交2個任務(wù),看看結(jié)果如何:

          threadPoolExecutor.execute(() -> sleep(100));
          threadPoolExecutor.execute(() -> sleep(100));
          復(fù)制代碼

          線程池核心線程數(shù)量為1,通過execute提交了2個任務(wù)后,一開始核心線程是空閑的,Thread-0被執(zhí)行。由于這個任務(wù)的邏輯是休眠100秒,所以在這100秒內(nèi),線程池的活躍線程數(shù)量為1。因?yàn)楹诵木€程數(shù)量為1,所以另外一個任務(wù)在這100秒內(nèi)不能被執(zhí)行,于是被放到線程隊列里等待,線程隊列長度為1。

          假如我們通過execute方法向線程池提交3個任務(wù),看看結(jié)果如何:

          threadPoolExecutor.execute(() -> sleep(100));
          threadPoolExecutor.execute(() -> sleep(100));
          threadPoolExecutor.execute(() -> sleep(100));
          復(fù)制代碼

          這三個任務(wù)都是休眠100秒,所以核心線程池中第一個任務(wù)正在被執(zhí)行,第二個任務(wù)被放入到了線程隊列。而當(dāng)?shù)谌齻€任務(wù)被提交進(jìn)來時,線程隊列滿了(我們定義的長度為1),由于該線程池允許的最大線程數(shù)量為2,所以線程池還可以再創(chuàng)建一個線程來執(zhí)行另外一個任務(wù),于是乎之前在線程隊列里的線程被取出執(zhí)行(FIFO),第三個任務(wù)被放入到了線程隊列。

          改變第二個和第三個任務(wù)的睡眠時間,觀察輸出:

          threadPoolExecutor.execute(() -> sleep(100));
          threadPoolExecutor.execute(() -> sleep(5));
          threadPoolExecutor.execute(() -> sleep(5));
          復(fù)制代碼

          第二個任務(wù)提交5秒后,任務(wù)執(zhí)行完畢,所以線程隊列里的任務(wù)被執(zhí)行,于是隊列線程個數(shù)為0,活躍線程數(shù)量為2(第一個和第三個任務(wù))。再過5秒后,第三個任務(wù)執(zhí)行完畢,于是活躍線程數(shù)量為1(第一個100秒還沒執(zhí)行完畢)。

          在第三個任務(wù)結(jié)束的瞬間,我們觀察線程快照:

          可以看到,線程池中有兩個線程,Thread-0在執(zhí)行第一個任務(wù)(休眠100秒,還沒結(jié)束),Thread-1執(zhí)行完第三個任務(wù)后并沒有馬上被銷毀。過段時間后(10秒鐘后)再觀察線程快照:

          可以看到,Thread-1這個線程被銷毀了,因?yàn)槲覀冊趧?chuàng)建線程池的時候,指定keepAliveTime 為10秒,10秒后,超出核心線程池線程外的那些線程將被銷毀。

          假如一次性提交4個任務(wù),看看會怎樣:

          threadPoolExecutor.execute(() -> sleep(100));
          threadPoolExecutor.execute(() -> sleep(100));
          threadPoolExecutor.execute(() -> sleep(100));
          threadPoolExecutor.execute(() -> sleep(100));
          復(fù)制代碼

          因?yàn)槲覀冊O(shè)置的拒絕策略為AbortPolicy,所以最后提交的那個任務(wù)直接被拒絕了。更多拒絕策略下面會介紹到。

          關(guān)閉線程池

          線程池包含以下幾個狀態(tài):

          當(dāng)線程池中所有任務(wù)都處理完畢后,線程并不會自己關(guān)閉。我們可以通過調(diào)用shutdownshutdownNow方法來關(guān)閉線程池。兩者的區(qū)別在于:

          1. shutdown方法將線程池置為shutdown狀態(tài),拒絕新的任務(wù)提交,但線程池并不會馬上關(guān)閉,而是等待所有正在執(zhí)行的和線程隊列里的任務(wù)都執(zhí)行完畢后,線程池才會被關(guān)閉。所以這個方法是平滑的關(guān)閉線程池。

          2. shutdownNow方法將線程池置為stop狀態(tài),拒絕新的任務(wù)提交,中斷正在執(zhí)行的那些任務(wù),并且清除線程隊列里的任務(wù)并返回。所以這個方法是比較“暴力”的。

          舉兩個例子觀察下兩者的區(qū)別:

          shutdown例子:

          public static void main(String[] args) {
          ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
          2, 4, 10,
          TimeUnit.SECONDS,
          new ArrayBlockingQueue<>(2), (ThreadFactory) Thread::new,
          new ThreadPoolExecutor.AbortPolicy());

          threadPoolExecutor.execute(new shortTask());
          threadPoolExecutor.execute(new longTask());
          threadPoolExecutor.execute(new longTask());
          threadPoolExecutor.execute(new shortTask());

          threadPoolExecutor.shutdown();
          System.out.println("已經(jīng)執(zhí)行了線程池shutdown方法");
          }

          static class shortTask implements Runnable {
          @Override
          public void run() {
          try {
          TimeUnit.SECONDS.sleep(1);
          System.out.println(Thread.currentThread().getName() + "執(zhí)行shortTask完畢");
          } catch (InterruptedException e) {
          System.err.println("shortTask執(zhí)行過程中被打斷" + e.getMessage());
          }
          }
          }

          static class longTask implements Runnable {
          @Override
          public void run() {
          try {
          TimeUnit.SECONDS.sleep(5);
          System.out.println(Thread.currentThread().getName() + "執(zhí)行l(wèi)ongTask完畢");
          } catch (InterruptedException e) {
          System.err.println("longTask執(zhí)行過程中被打斷" + e.getMessage());
          }
          }
          }
          復(fù)制代碼

          啟動程序,控制臺輸出如下:

          可以看到,雖然在任務(wù)都被提交后馬上執(zhí)行了shutdown方法,但是并不會馬上關(guān)閉線程池,而是等待所有被提交的任務(wù)都執(zhí)行完了才關(guān)閉。

          shutdownNow例子:

          public static void main(String[] args) {
          ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
          2, 4, 10,
          TimeUnit.SECONDS,
          new ArrayBlockingQueue<>(2), (ThreadFactory) Thread::new,
          new ThreadPoolExecutor.AbortPolicy());

          threadPoolExecutor.execute(new shortTask());
          threadPoolExecutor.execute(new longTask());
          threadPoolExecutor.execute(new longTask());
          threadPoolExecutor.execute(new shortTask());

          List<Runnable> runnables = threadPoolExecutor.shutdownNow(); // 馬上關(guān)閉,并返回還未被執(zhí)行的任務(wù)
          System.out.println(runnables);

          System.out.println("已經(jīng)執(zhí)行了線程池shutdownNow方法");
          }

          static class shortTask implements Runnable {
          @Override
          public void run() {
          try {
          TimeUnit.SECONDS.sleep(1);
          System.out.println(Thread.currentThread().getName() + "執(zhí)行shortTask完畢");
          } catch (InterruptedException e) {
          System.err.println("shortTask執(zhí)行過程中被打斷" + e.getMessage());
          }
          }
          }

          static class longTask implements Runnable {
          @Override
          public void run() {
          try {
          TimeUnit.SECONDS.sleep(5);
          System.out.println(Thread.currentThread().getName() + "執(zhí)行l(wèi)ongTask完畢");
          } catch (InterruptedException e) {
          System.err.println("longTask執(zhí)行過程中被打斷" + e.getMessage());
          }
          }
          }
          復(fù)制代碼

          啟動程序,控制臺輸出如下:

          可以看到,在執(zhí)行shutdownNow方法后,線程池馬上就被關(guān)閉了,正在執(zhí)行中的兩個任務(wù)被打斷,并且返回了線程隊列中等待被執(zhí)行的兩個任務(wù)。

          通過上面兩個例子我們還可以看到shutdownshutdownNow方法都不是阻塞的。常與shutdown搭配的方法有awaitTermination

          awaitTermination方法接收timeout和TimeUnit兩個參數(shù),用于設(shè)定超時時間及單位。當(dāng)?shù)却^設(shè)定時間時,會監(jiān)測ExecutorService是否已經(jīng)關(guān)閉,若關(guān)閉則返回true,否則返回false。該方法是阻塞的:

          public static void main(String[] args) throws InterruptedException {
          ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
          2, 4, 10,
          TimeUnit.SECONDS,
          new ArrayBlockingQueue<>(2), (ThreadFactory) Thread::new,
          new ThreadPoolExecutor.AbortPolicy());

          threadPoolExecutor.execute(new shortTask());
          threadPoolExecutor.execute(new longTask());
          threadPoolExecutor.execute(new longTask());
          threadPoolExecutor.execute(new shortTask());

          threadPoolExecutor.shutdown();
          boolean isShutdown = threadPoolExecutor.awaitTermination(3, TimeUnit.SECONDS);
          if (isShutdown) {
          System.out.println("線程池在3秒內(nèi)成功關(guān)閉");
          } else {
          System.out.println("等了3秒還沒關(guān)閉,不等了╰(‵□′)╯");
          }
          System.out.println("------------");
          }

          static class shortTask implements Runnable {
          @Override
          public void run() {
          try {
          TimeUnit.SECONDS.sleep(1);
          System.out.println(Thread.currentThread().getName() + "執(zhí)行shortTask完畢");
          } catch (InterruptedException e) {
          System.err.println("shortTask執(zhí)行過程中被打斷" + e.getMessage());
          }
          }
          }

          static class longTask implements Runnable {
          @Override
          public void run() {
          try {
          TimeUnit.SECONDS.sleep(5);
          System.out.println(Thread.currentThread().getName() + "執(zhí)行l(wèi)ongTask完畢");
          } catch (InterruptedException e) {
          System.err.println("longTask執(zhí)行過程中被打斷" + e.getMessage());
          }
          }
          }
          復(fù)制代碼

          啟動程序輸出如下:

          4大拒絕策略

          當(dāng)線程池?zé)o法再接收新的任務(wù)的時候,可采取如下四種策略:

          CallerRunsPolicy

          CallerRunsPolicy策略:由調(diào)用線程處理該任務(wù):

          public static void main(String[] args) throws InterruptedException {
          ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
          2, 3, 10,
          TimeUnit.SECONDS,
          new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
          new ThreadPoolExecutor.CallerRunsPolicy());

          threadPoolExecutor.execute(new shortTask("任務(wù)1"));
          threadPoolExecutor.execute(new longTask("任務(wù)2"));
          threadPoolExecutor.execute(new longTask("任務(wù)3"));
          threadPoolExecutor.execute(new shortTask("任務(wù)4"));
          threadPoolExecutor.execute(new shortTask("任務(wù)5"));

          threadPoolExecutor.shutdown();
          }

          static class shortTask implements Runnable {
          private String name;

          public shortTask(String name) {
          this.name = name;
          }

          @Override
          public void run() {
          try {
          TimeUnit.SECONDS.sleep(1);
          System.out.println(Thread.currentThread().getName() + "執(zhí)行shortTask-name-" + name + "完畢");
          } catch (InterruptedException e) {
          System.err.println("shortTask執(zhí)行過程中被打斷" + e.getMessage());
          }
          }
          }

          static class longTask implements Runnable {
          private String name;

          public longTask(String name) {
          this.name = name;
          }

          @Override
          public void run() {
          try {
          TimeUnit.SECONDS.sleep(5);
          System.out.println(Thread.currentThread().getName() + "執(zhí)行l(wèi)ongTask-name-" + name + "完畢");
          } catch (InterruptedException e) {
          System.err.println("longTask執(zhí)行過程中被打斷" + e.getMessage());
          }
          }
          }
          復(fù)制代碼

          上面的線程池最多只能一次性提交4個任務(wù),第5個任務(wù)提交后會被拒絕策略處理。啟動程序輸出如下:

          可以看到,第5個提交的任務(wù)由調(diào)用線程(即main線程)處理該任務(wù)。

          AbortPolicy

          AbortPolicy策略:丟棄任務(wù),并拋出RejectedExecutionException異常。前面的例子就是使用該策略,所以不再演示。

          DiscardOldestPolicy

          DiscardOldestPolicy策略:丟棄最早被放入到線程隊列的任務(wù),將新提交的任務(wù)放入到線程隊列末端:

          public static void main(String[] args) throws InterruptedException {
          ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
          2, 3, 10,
          TimeUnit.SECONDS,
          new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
          new ThreadPoolExecutor.DiscardOldestPolicy());

          threadPoolExecutor.execute(new shortTask("任務(wù)1"));
          threadPoolExecutor.execute(new longTask("任務(wù)2"));
          threadPoolExecutor.execute(new longTask("任務(wù)3"));
          threadPoolExecutor.execute(new shortTask("任務(wù)4"));
          threadPoolExecutor.execute(new shortTask("任務(wù)5"));

          threadPoolExecutor.shutdown();
          }

          static class shortTask implements Runnable {

          private String name;

          public shortTask(String name) {
          this.name = name;
          }

          @Override
          public void run() {
          try {
          TimeUnit.SECONDS.sleep(1);
          System.out.println(Thread.currentThread().getName() + "執(zhí)行shortTask-name-" + name + "完畢");
          } catch (InterruptedException e) {
          System.err.println("shortTask執(zhí)行過程中被打斷" + e.getMessage());
          }
          }
          }

          static class longTask implements Runnable {

          private String name;

          public longTask(String name) {
          this.name = name;
          }

          @Override
          public void run() {
          try {
          TimeUnit.SECONDS.sleep(5);
          System.out.println(Thread.currentThread().getName() + "執(zhí)行l(wèi)ongTask-name-" + name + "完畢");
          } catch (InterruptedException e) {
          System.err.println("longTask執(zhí)行過程中被打斷" + e.getMessage());
          }
          }
          }
          復(fù)制代碼

          啟動程序輸出如下:可以看到最后提交的任務(wù)被執(zhí)行了,而第3個任務(wù)是第一個被放到線程隊列的任務(wù),被丟棄了。

          DiscardPolicy

          DiscardPolicy策略:直接丟棄新的任務(wù),不拋異常:

          public static void main(String[] args) throws InterruptedException {
          ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
          2, 3, 10,
          TimeUnit.SECONDS,
          new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
          new ThreadPoolExecutor.DiscardPolicy());

          threadPoolExecutor.execute(new shortTask("任務(wù)1"));
          threadPoolExecutor.execute(new longTask("任務(wù)2"));
          threadPoolExecutor.execute(new longTask("任務(wù)3"));
          threadPoolExecutor.execute(new shortTask("任務(wù)4"));
          threadPoolExecutor.execute(new shortTask("任務(wù)5"));

          threadPoolExecutor.shutdown();
          }

          static class shortTask implements Runnable {

          private String name;

          public shortTask(String name) {
          this.name = name;
          }

          @Override
          public void run() {
          try {
          TimeUnit.SECONDS.sleep(1);
          System.out.println(Thread.currentThread().getName() + "執(zhí)行shortTask-name-" + name + "完畢");
          } catch (InterruptedException e) {
          System.err.println("shortTask執(zhí)行過程中被打斷" + e.getMessage());
          }
          }
          }

          static class longTask implements Runnable {

          private String name;

          public longTask(String name) {
          this.name = name;
          }

          @Override
          public void run() {
          try {
          TimeUnit.SECONDS.sleep(5);
          System.out.println(Thread.currentThread().getName() + "執(zhí)行l(wèi)ongTask-name-" + name + "完畢");
          } catch (InterruptedException e) {
          System.err.println("longTask執(zhí)行過程中被打斷" + e.getMessage());
          }
          }
          }
          復(fù)制代碼

          啟動程序,輸出如下:

          第5個任務(wù)直接被拒絕丟棄了,而沒有拋出任何異常。

          線程池工廠方法

          除了使用ThreadPoolExecutor的構(gòu)造方法創(chuàng)建線程池外,我們也可以使用Executors提供的工廠方法來創(chuàng)建不同類型的線程池:

          newFixedThreadPool

          查看newFixedThreadPool方法源碼:

          public static ExecutorService newFixedThreadPool(int nThreads) {
          return new ThreadPoolExecutor(nThreads, nThreads,
          0L, TimeUnit.MILLISECONDS,
          new LinkedBlockingQueue<Runnable>());
          }
          復(fù)制代碼

          可以看到,通過newFixedThreadPool創(chuàng)建的是一個固定大小的線程池,大小由nThreads參數(shù)指定,它具有如下幾個特點(diǎn):

          1. 因?yàn)閏orePoolSize和maximumPoolSize的值都為nThreads,所以線程池中線程數(shù)量永遠(yuǎn)等于nThreads,不可能新建除了核心線程數(shù)的線程來處理任務(wù),即keepAliveTime實(shí)際上在這里是無效的。

          2. LinkedBlockingQueue是一個無界隊列(最大長度為Integer.MAX_VALUE),所以這個線程池理論是可以無限的接收新的任務(wù),這就是為什么上面沒有指定拒絕策略的原因。

          newCachedThreadPool

          查看newCachedThreadPool方法源碼:

          public static ExecutorService newCachedThreadPool() {
          return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
          60L, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>());
          }
          復(fù)制代碼

          這是一個理論上無限大小的線程池:

          1. 核心線程數(shù)為0,SynchronousQueue隊列是沒有長度的隊列,所以當(dāng)有新的任務(wù)提交,如果有空閑的還未超時的(最大空閑時間60秒)線程則執(zhí)行該任務(wù),否則新增一個線程來處理該任務(wù)。

          2. 因?yàn)榫€程數(shù)量沒有限制,理論上可以接收無限個新任務(wù),所以這里也沒有指定拒絕策略。

          newSingleThreadExecutor

          查看newSingleThreadExecutor源碼:

          public static ExecutorService newSingleThreadExecutor() {
          return new FinalizableDelegatedExecutorService
          (new ThreadPoolExecutor(1, 1,
          0L, TimeUnit.MILLISECONDS,
          new LinkedBlockingQueue<Runnable>()));
          }
          復(fù)制代碼
          1. 核心線程數(shù)和最大線程數(shù)都為1,每次只能有一個線程處理任務(wù)。

          2. LinkedBlockingQueue隊列可以接收無限個新任務(wù)。

          newScheduledThreadPool

          查看newScheduledThreadPool源碼:

          public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
          return new ScheduledThreadPoolExecutor(corePoolSize);
          }
          ......

          public ScheduledThreadPoolExecutor(int corePoolSize) {
          super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
          }
          復(fù)制代碼

          所以newScheduledThreadPool理論是也是可以接收無限個任務(wù),DelayedWorkQueue也是一個無界隊列。

          使用newScheduledThreadPool創(chuàng)建的線程池除了可以處理普通的Runnable任務(wù)外,它還具有調(diào)度的功能:

          1.延遲指定時間后執(zhí)行:

          ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
          // 延遲5秒執(zhí)行
          executorService.schedule(() -> System.out.println("hello"), 5, TimeUnit.SECONDS);
          復(fù)制代碼

          2.按指定的速率執(zhí)行:

          ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
          // 延遲1秒執(zhí)行,然后每5秒執(zhí)行一次
          executorService.scheduleAtFixedRate(
          () -> System.out.println(LocalTime.now()), 1, 5, TimeUnit.SECONDS
          );
          復(fù)制代碼

          3.按指定的時延執(zhí)行:

          ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
          executorService.scheduleWithFixedDelay(
          () -> System.out.println(LocalTime.now()), 1, 5, TimeUnit.SECONDS
          );
          復(fù)制代碼

          乍一看,scheduleAtFixedRate和scheduleWithFixedDelay沒啥區(qū)別,實(shí)際它們還是有區(qū)別的:

          • scheduleAtFixedRate按照固定速率執(zhí)行任務(wù),比如每5秒執(zhí)行一個任務(wù),即使上一個任務(wù)沒有結(jié)束,5秒后也會開始處理新的任務(wù);

          • scheduleWithFixedDelay按照固定的時延處理任務(wù),比如每延遲5秒執(zhí)行一個任務(wù),無論上一個任務(wù)處理了1秒,1分鐘還是1小時,下一個任務(wù)總是在上一個任務(wù)執(zhí)行完畢后5秒鐘后開始執(zhí)行。

          對于這些線程池工廠方法的使用,阿里巴巴編程規(guī)程指出:

          因?yàn)檫@幾個線程池理論是都可以接收無限個任務(wù),所以這就有內(nèi)存溢出的風(fēng)險。實(shí)際上只要我們掌握了ThreadPoolExecutor構(gòu)造函數(shù)7個參數(shù)的含義,我們就可以根據(jù)不同的業(yè)務(wù)來創(chuàng)建出符合需求的線程池。一般線程池的創(chuàng)建可以參考如下規(guī)則:

          • IO密集型任務(wù):IO密集型任務(wù)線程并不是一直在執(zhí)行任務(wù),應(yīng)該配置盡可能多的線程,線程池線程數(shù)量推薦設(shè)置為2 * CPU核心數(shù);對于IO密集型任務(wù),網(wǎng)絡(luò)上也有另一種線程池數(shù)量計算公式:CPU核心數(shù)/(1 - 阻塞系數(shù)),阻塞系數(shù)取值0.8~0.9,至于這兩種公式使用哪一個,可以根據(jù)實(shí)際環(huán)境測試比較得出;

          • 計算密集型任務(wù):此類型需要CPU的大量運(yùn)算,所以盡可能的去壓榨CPU資源,線程池線程數(shù)量推薦設(shè)置為CPU核心數(shù) + 1。

          CPU核心數(shù)可以使用Runtime獲得:

          Runtime.getRuntime().availableProcessors()
          復(fù)制代碼

          一些API的用法

          ThreadPoolExecutor提供了幾個判斷線程池狀態(tài)的方法:

          public static void main(String[] args) throws InterruptedException {
          ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
          1, 2, 5, TimeUnit.SECONDS,
          new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
          new ThreadPoolExecutor.AbortPolicy()
          );

          threadPoolExecutor.execute(() -> {
          try {
          TimeUnit.SECONDS.sleep(5);
          } catch (InterruptedException e) {
          e.printStackTrace();
          }
          });

          threadPoolExecutor.shutdown();
          System.out.println("線程池為shutdown狀態(tài):" + threadPoolExecutor.isShutdown());
          System.out.println("線程池正在關(guān)閉:" + threadPoolExecutor.isTerminating());
          System.out.println("線程池已經(jīng)關(guān)閉:" + threadPoolExecutor.isTerminated());
          threadPoolExecutor.awaitTermination(6, TimeUnit.SECONDS);
          System.out.println("線程池已經(jīng)關(guān)閉" + threadPoolExecutor.isTerminated());
          }
          復(fù)制代碼

          程序輸出如下:

          前面我們提到,線程池核心線程即使是空閑狀態(tài)也不會被銷毀,除非使用allowCoreThreadTimeOut設(shè)置了允許核心線程超時:

          public static void main(String[] args) throws InterruptedException {
          ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
          1, 2, 3, TimeUnit.SECONDS,
          new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
          new ThreadPoolExecutor.AbortPolicy()
          );
          threadPoolExecutor.allowCoreThreadTimeOut(true);
          threadPoolExecutor.execute(() -> {
          try {
          TimeUnit.SECONDS.sleep(5);
          System.out.println("任務(wù)執(zhí)行完畢");
          } catch (InterruptedException e) {
          e.printStackTrace();
          }
          });
          }
          復(fù)制代碼

          程序輸出如下所示:

          5秒后任務(wù)執(zhí)行完畢,核心線程處于空閑的狀態(tài)。因?yàn)橥ㄟ^allowCoreThreadTimeOut方法設(shè)置了允許核心線程超時,所以3秒后(keepAliveTime設(shè)置為3秒),核心線程被銷毀。核心線程被銷毀后,線程池也就沒有作用了,于是就自動關(guān)閉了。

          值得注意的是,如果一個線程池調(diào)用了allowCoreThreadTimeOut(true)方法,那么它的keepAliveTime不能為0。

          ThreadPoolExecutor提供了一remove方法,查看其源碼:

          public boolean remove(Runnable task) {
          boolean removed = workQueue.remove(task);
          tryTerminate(); // In case SHUTDOWN and now empty
          return removed;
          }
          復(fù)制代碼

          可看到,它刪除的是線程隊列中的任務(wù),而非正在被執(zhí)行的任務(wù)。舉個例子:

          public static void main(String[] args) throws InterruptedException {
          ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
          1, 2, 3, TimeUnit.SECONDS,
          new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
          new ThreadPoolExecutor.AbortPolicy()
          );
          threadPoolExecutor.execute(() -> {
          try {
          TimeUnit.SECONDS.sleep(5);
          System.out.println("任務(wù)執(zhí)行完畢");
          } catch (InterruptedException e) {
          e.printStackTrace();
          }
          });

          Runnable r = () -> System.out.println("看看我是否會被刪除");
          threadPoolExecutor.execute(r);
          threadPoolExecutor.remove(r);

          threadPoolExecutor.shutdown();
          }
          復(fù)制代碼

          執(zhí)行程序,輸出如下:

          可看到任務(wù)并沒有被執(zhí)行,已經(jīng)被刪除,因?yàn)槲ㄒ灰粋€核心線程已經(jīng)在執(zhí)行任務(wù)了,所以后提交的這個任務(wù)被放到了線程隊列里,然后通過remove方法刪除。

          默認(rèn)情況下,只有當(dāng)往線程池里提交了任務(wù)后,線程池才會啟動核心線程處理任務(wù)。我們可以通過調(diào)用prestartCoreThread方法,讓核心線程即使沒有任務(wù)提交,也處于等待執(zhí)行任務(wù)的活躍狀態(tài):

          public static void main(String[] args) throws InterruptedException {
          ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
          2, 2, 3, TimeUnit.SECONDS,
          new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
          new ThreadPoolExecutor.AbortPolicy()
          );
          System.out.println("活躍線程數(shù): " + threadPoolExecutor.getActiveCount());
          threadPoolExecutor.prestartCoreThread();
          System.out.println("活躍線程數(shù): " + threadPoolExecutor.getActiveCount());
          threadPoolExecutor.prestartCoreThread();
          System.out.println("活躍線程數(shù): " + threadPoolExecutor.getActiveCount());
          threadPoolExecutor.prestartCoreThread();
          System.out.println("活躍線程數(shù): " + threadPoolExecutor.getActiveCount());
          }
          復(fù)制代碼

          程序輸出如下所示:

          該方法返回boolean類型值,如果所以核心線程都啟動了,返回false,反之返回true。

          還有一個和它類似的prestartAllCoreThreads方法,它的作用是一次性啟動所有核心線程,讓其處于活躍地等待執(zhí)行任務(wù)的狀態(tài)。

          ThreadPoolExecutor的invokeAny方法用于隨機(jī)執(zhí)行任務(wù)集合中的某個任務(wù),并返回執(zhí)行結(jié)果,該方法是同步方法:

          public static void main(String[] args) throws InterruptedException, ExecutionException {
          ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
          2, 5, 3, TimeUnit.SECONDS,
          new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
          new ThreadPoolExecutor.AbortPolicy()
          );

          // 任務(wù)集合
          List<Callable<Integer>> tasks = IntStream.range(0, 4).boxed().map(i -> (Callable<Integer>) () -> {
          TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
          return i;
          }).collect(Collectors.toList());
          // 隨機(jī)執(zhí)行結(jié)果
          Integer result = threadPoolExecutor.invokeAny(tasks);
          System.out.println("-------------------");
          System.out.println(result);
          threadPoolExecutor.shutdownNow();
          }
          復(fù)制代碼

          啟動程序,輸出如下:

          ThreadPoolExecutor的invokeAll則是執(zhí)行任務(wù)集合中的所有任務(wù),返回Future集合:

          public static void main(String[] args) throws InterruptedException, ExecutionException {
          ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
          2, 5, 3, TimeUnit.SECONDS,
          new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
          new ThreadPoolExecutor.AbortPolicy()
          );

          List<Callable<Integer>> tasks = IntStream.range(0, 4).boxed().map(i -> (Callable<Integer>) () -> {
          TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
          return i;
          }).collect(Collectors.toList());

          List<Future<Integer>> futureList = threadPoolExecutor.invokeAll(tasks);
          futureList.stream().map(f->{
          try {
          return f.get();
          } catch (InterruptedException | ExecutionException e) {
          return null;
          }
          }).forEach(System.out::println);

          threadPoolExecutor.shutdownNow();
          }
          復(fù)制代碼

          輸出如下:

          總結(jié)下這些方法:

          方法描述
          allowCoreThreadTimeOut(boolean value)是否允許核心線程空閑后超時,是的話超時后核心線程將銷毀,線程池自動關(guān)閉
          awaitTermination(long timeout, TimeUnit unit)阻塞當(dāng)前線程,等待線程池關(guān)閉,timeout用于指定等待時間。
          execute(Runnable command)向線程池提交任務(wù),沒有返回值
          submit(Runnable task)向線程池提交任務(wù),返回Future
          isShutdown()判斷線程池是否為shutdown狀態(tài)
          isTerminating()判斷線程池是否正在關(guān)閉
          isTerminated()判斷線程池是否已經(jīng)關(guān)閉
          remove(Runnable task)移除線程隊列中的指定任務(wù)
          prestartCoreThread()提前讓一個核心線程處于活躍狀態(tài),等待執(zhí)行任務(wù)
          prestartAllCoreThreads()提前讓所有核心線程處于活躍狀態(tài),等待執(zhí)行任務(wù)
          getActiveCount()獲取線程池活躍線程數(shù)
          getCorePoolSize()獲取線程池核心線程數(shù)
          threadPoolExecutor.getQueue()獲取線程池線程隊列
          getMaximumPoolSize()獲取線程池最大線程數(shù)
          shutdown()讓線程池處于shutdown狀態(tài),不再接收任務(wù),等待所有正在運(yùn)行中的任務(wù)結(jié)束后,關(guān)閉線程池。
          shutdownNow()讓線程池處于stop狀態(tài),不再接受任務(wù),嘗試打斷正在運(yùn)行中的任務(wù),并關(guān)閉線程池,返回線程隊列中的任務(wù)。


          作者:Dynasty
          鏈接:https://juejin.cn/post/7013213892067196936
          來源:稀土掘金
          著作權(quán)歸作者所有。商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請注明出處。



          瀏覽 93
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日韩无码三级 | 亚洲成人黄色影院 | 黄片黄片黄片黄片黄片黄片黄片 | 天天草天天射天天撸 | 麻豆三级片电影 |