<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          使用線程池時一定要注意的五個點

          共 20825字,需瀏覽 42分鐘

           ·

          2021-05-15 23:34

          一、使用線程池在流量突發(fā)期間能夠平滑地服務(wù)降級

          很多場景下應(yīng)用程序必須能夠處理一系列傳入請求,簡單的處理方式是通過一個線程順序的處理這些請求,如下圖:

          單線程策略的優(yōu)勢和劣勢都非常明顯:

          優(yōu)勢:設(shè)計和實現(xiàn)簡單;劣勢:這種方式會帶來處理效率的問題,單線程的處理能力是有限,不能發(fā)揮多核處理器優(yōu)勢。

          在這種場景下我們就需要考慮并發(fā),一個簡單的并發(fā)策略就是Thread-Per-Message模式,即為每個請求使用一個新的線程。Thread-Per-Message策略的優(yōu)勢和劣勢也非常明顯:

          優(yōu)勢:設(shè)計和實現(xiàn)比較簡單,能夠同時處理多個請求,提升響應(yīng)效率;

          劣勢:主要在兩個方面

          1.資源消耗 引入了在串行執(zhí)行中所沒有的開銷,包括線程創(chuàng)建和調(diào)度,任務(wù)處理,資源分配和回收以及頻繁上下文切換所需的時間和資源。2.安全

          • 攻擊者可以通過一次進行大量請求使系統(tǒng)癱瘓并且拒絕服務(wù) (DoS),從而導(dǎo)致系統(tǒng)立即不響應(yīng)而不是平滑地退出。
          • 從安全角度來看,一個組件可能由于連續(xù)的錯誤而耗盡所有資源,因此使所有其他組件無法獲得資源。

          有沒有一種方式可以并發(fā)執(zhí)行又可以克服Thread-Per-Message的問題?

          采用線程池的策略,線程池通過控制并發(fā)執(zhí)行的工作線程的最大數(shù)量來解決Thread-Per-Message帶來的問題。可見下圖,請求來臨時先放入線程池的隊列

          線程池可以接受一個RunnableCallable<T>任務(wù),并將其存儲在臨時隊列中,當(dāng)有空閑線程時可以從隊列中拿到一個任務(wù)并執(zhí)行。

          反例(使用 Thread-Per-Message 策略)

          class Helper {
              public void handle(Socket socket) {
                  // do something
              }
          }

          final class RequestHandler {
              private final Helper helper = new Helper();

              //......
              private RequestHandler(int port) throws IOException {
                  //do something
              }

              public void handleRequest() {
                  new Thread(new Runnable() {
                      public void run() {
                          try {
                              helper.handle(server.accept());
                          } catch (IOException e) {
                              // Forward to handler
                          }
                      }
                  }).start();
              }
          }

          正例(使用 線程池 策略)

          class Helper {
              public void handle(Socket socket) {
                  // do something
              }
          }

          final class RequestHandler {
              private final Helper helper = new Helper();
              private final ServerSocket server;
              private final ExecutorService exec;

              private RequestHandler(int port, int poolSize) throws IOException {
                  server = new ServerSocket(port);
                  exec = Executors.newFixedThreadPool(poolSize);
              }

              public static RequestHandler newInstance(int poolSize) throws IOException {
                  return new RequestHandler(0, poolSize);
              }

              public void handleRequest() {
                  Future<?> future = exec.submit(new Runnable() {
                      @Override
                      public void run() {
                          try {
                              helper.handle(server.accept());
                          } catch (IOException e) {
                              // Forward to handler
                          }
                      }
                  });
              }
              // ... Other methods such as shutting down the thread pool
              // and task cancellation ...
          }

          JAVA 中(JDK 1.5+)線程池的種類:

          • newFixedThreadPool()

          • newCachedThreadPool()

          • newSingleThreadExecutor()

          • newScheduledThreadPool()

            線程池的詳細使用方法可參見Java API文檔

          二、不要在有界線程池中執(zhí)行相互依賴的任務(wù)

          程序不能使用來自有界線程池的線程來執(zhí)行依賴于線程池中其他任務(wù)的任務(wù)。

          有兩個場景:

          1. 當(dāng)線程池中正在執(zhí)行的線程阻塞在依賴于線程池中其他任務(wù)的完成上,這樣就會出現(xiàn)稱為線程饑餓(threadstarvation)死鎖的死鎖形式。
          2. 線程饑餓死鎖還會發(fā)生在當(dāng)前執(zhí)行的任務(wù)向線程池提交其他任務(wù)并等待這些任務(wù)完成的時候,然而此時線程池缺乏一次容納所有任務(wù)的能力。

          要緩解上面兩個場景產(chǎn)生的問題有兩個簡單的辦法:

          1. 擴大線程池中的線程數(shù),以容納更多的任務(wù),但 決定一個線程池合適的大小可能是困難的甚至不可能的。
          2. 線程池中的隊列改為無界,由于系統(tǒng)資源有限,無界隊列只能說是盡可能容納任務(wù) 但饑餓死鎖的現(xiàn)象無法消除。

          真正解決此類方法還是需要梳理線程池執(zhí)行業(yè)務(wù)流程,不要在有界線程池中執(zhí)行相互依賴的任務(wù),防止出現(xiàn)競爭和死鎖。

          三、確保提交到線程池的任務(wù)可中斷

          向線程池提交的任務(wù)需要支持中斷。從而保證線程可以中斷,線程池可以關(guān)閉。線程池支持 java.util.concurrent.ExecutorService.shutdownNow() 方法,該方法嘗試停止所有正在執(zhí)行的任務(wù),停止等待任務(wù)的處理,并返回等待執(zhí)行的任務(wù)的列表。

          但是 shutdownNow() 除了盡力嘗試停止處理主動執(zhí)行的任務(wù)之外不能保證一定能夠停止。例如,典型的實現(xiàn)是通過Thread.interrupt()來停止,因此任何未能響應(yīng)中斷的任務(wù)可能永遠不會終止,也就造成線程池?zé)o法真正的關(guān)閉。

          反例:

          public final class Worker implements Runnable // Thread‐safe class
              private AtomBoolean flag = new AtomBoolean(true);

              public Worker() throws IOException {
                  //do something
              }

              // Only one thread can use the socket at a particular time
              @Override
              public void run() {
                  try {
                      while (flag.get()) {
                          // do something
                      }
                  } catch (IOException ie) {
                      // Forward to handler
                  }
              }

              public void shutdown() {
                  this.flag.set(false);
              }
          }

          正例:

          public final class Worker implements Runnable // Thread‐safe class
              public Worker() throws IOException {
                  //do something
              }

              // Only one thread can use the socket at a particular time
              @Override
              public void run() {
                  try {
                      while (!Thread.interrupted()) {
                          // do something
                      }
                  } catch (IOException ie) {
                      // Forward to handler
                  }
              }
          }

          四、確保在線程池中執(zhí)行的任務(wù)不能悄無聲息地失敗

          線程池中的所有任務(wù)必須提供機制,如果它們異常終止,則需要通知應(yīng)用程序.

          如果不這樣做不會導(dǎo)致資源泄漏,但由于池中的線程仍然被會重復(fù)使用,使故障診斷非常困難或不可能。

          在應(yīng)用程序級別處理異常的最好方法是使用異常處理。異常處理可以執(zhí)行診斷操作,清理和關(guān)閉Java虛擬機,或者只是記錄故障的詳細信息。

          也就是說在線程池里執(zhí)行的任務(wù)也需要能夠拋出異常并被捕獲處理。

          任務(wù)恢復(fù)或清除操作可以通過重寫 java.util.concurrent.ThreadPoolExecutor 類的 afterExecute() 鉤子來執(zhí)行。

          當(dāng)任務(wù)通過執(zhí)行其 run() 方法中的所有語句并且成功結(jié)束任務(wù),或者由于異常而導(dǎo)致任務(wù)停止時,將調(diào)用此鉤子。

          可以通過自定義 ThreadPoolExecutor 服務(wù)來重載 afterExecute() 鉤子。

          還可以通過重載 terminated() 方法來釋放線程池獲取的資源,就像一個finally塊。

          反例:

          final class PoolService {
              private final ExecutorService pool = Executors.newFixedThreadPool(10);

              public void doSomething() {
                  pool.execute(new Task());
              }
          }

          final class Task implements Runnable {
              @Override
              public void run() {
                  // do something
                  throw new NullPointerException();
              }
          }

          任務(wù)意外終止時作為一個運行時異常,無法通知應(yīng)用程序。此外,它缺乏恢復(fù)機制。因此,如果Task拋出一個NullPointerException ,異常將被忽略。

          正例:

          class CustomThreadPoolExecutor extends ThreadPoolExecutor {
              // ... Constructor ...
              public CustomThreadPoolExecutor(
                  int corePoolSize, int maximumPoolSize, long keepAliveTime,
                          TimeUnit unit, BlockingQueue<Runnable> workQueue)
           
          {
                              super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
                          }
              @Override
              public void afterExecute(Runnable r, Throwable t) {
                  super.afterExecute(r, t);
                  if (t != null) {
                      // Exception occurred, forward to handler
                  }
                      // ... Perform task‐specific cleanup actions
                  }
              @Override
              public void terminated() {
                  super.terminated();
                  // ... Perform final clean‐up actions
              }
          }

          另外一種方式是使用 ExecutorService.submit() 方法(代替 execute() 方法)將任務(wù)提交到線程池并獲取 Future 對象。

          當(dāng)通過 ExecutorService.submit() 提交任務(wù)時,拋出的異常并未到達未捕獲的異常處理機制,因為拋出的異常被認為是返回狀態(tài)的一部分,因此被包裝在ExecutionException ,并由Future.get() 返回。

          Future<?> future = threadPool.submit(new Task());
          try {
              future.get();
          catch (InterruptedException e) {
              Thread.currentThread().interrupt();  // Reset interrupted status
          catch (ExecutionException e) {
              Throwable exception = e.getCause();
              // Forward to exception reporter
          }

          五、確保在使用線程池時重新初始化ThreadLocal變量

          java.lang.ThreadLocal 類提供線程內(nèi)的本地變量。根據(jù)Java API

          這些變量與其它正常變量不同,每個線程訪問(通過其get或set方法)都有其屬于各自線程的,獨立初始化的變量拷貝。ThreadLocal實例通常是一些希望將狀態(tài)與線程(例如,用戶ID或事務(wù)ID)相關(guān)聯(lián)的類中的私有靜態(tài)字段。

          ThreadLocal對象需要關(guān)注那些對象被線程池中的多個線程執(zhí)行的類。

          線程池緩存技術(shù)允許線程重用以減少線程創(chuàng)建開銷,或者當(dāng)創(chuàng)建無限數(shù)量的線程時可以降低系統(tǒng)的可靠性。

          當(dāng) ThreadLocal 對象在一個線程中被修改,隨后變得可重用時,在重用的線程上執(zhí)行的下一個任務(wù)將能看到該線程上執(zhí)行過的上一個任務(wù)修改的ThreadLocal 對象的狀態(tài)。

          所以要在使用線程池時重新初始化的ThreadLocal對象實例。

          反例:

          public enum Day {
              MONDAY, TUESDAY, WEDNESDAY, THURSDAY, FRIDAY, SATURDAY, SUNDAY;
          }

          public final class Diary {
              private static final ThreadLocal<Day> days = new ThreadLocal<Day>() {
                  // Initialize to Monday
                  protected Day initialValue() {
                      return Day.MONDAY;
                  }
              };
              private static Day currentDay() {
                  return days.get();
              }
              public static void setDay(Day newDay) {
                  days.set(newDay);
              }
              // Performs some thread‐specific task
              public void threadSpecificTask() {
                  // Do task ...
              }
          }

          public final class DiaryPool {
              final int numOfThreads = 2// Maximum number of threads allowed in pool
              final Executor exec;
              final Diary diary;

              DiaryPool() {
                  exec = (Executor) Executors.newFixedThreadPool(numOfThreads);
                  diary = new Diary();
              }

              public void doSomething1() {
                  exec.execute(new Runnable() {
                      @Override
                      public void run() {
                          diary.setDay(Day.FRIDAY);
                          diary.threadSpecificTask();
                      }
                  });
              }

              public void doSomething2() {
                  exec.execute(new Runnable() {
                      @Override
                      public void run() {
                          diary.threadSpecificTask();
                      }
                  });
              }

              public static void main(String[] args) {
                  DiaryPool dp = new DiaryPool();
                  dp.doSomething1(); // Thread 1, requires current day as Friday
                  dp.doSomething2(); // Thread 2, requires current day as Monday
                  dp.doSomething2(); // Thread 3, requires current day as Monday
              }
          }

          DiaryPool類創(chuàng)建了一個線程池,它可以通過一個共享的無界的隊列來重用固定數(shù)量的線程。

          在任何時候,不超過numOfThreads個線程正在處理任務(wù)。如果在所有線程都處于活動狀態(tài)時提交其他任務(wù),則 它們在隊列中等待,直到線程可用。

          當(dāng)線程循環(huán)時,線程的線程局部狀態(tài)仍然存在。

          下表顯示了可能的執(zhí)行順序:

          時間任務(wù)線程池提交方法日期
          1t11doSomething1()星期五
          2t22doSomething2()星期一
          3t31doSomething3()星期五

          在這個執(zhí)行順序中,期望從doSomething2() 開始的兩個任務(wù)( t 2和t 3 doSomething2() 將當(dāng)天視為星 期一。然而,因為池線程1被重用,所以t 3觀察到星期五。

          解決方案(try-finally條款)

          符合規(guī)則的方案removeDay() 方法添加到Diary類,并在try‐finally 塊中的實現(xiàn)doSomething1() 類的doSomething1() 方法的語句。finally 塊通過刪除當(dāng)前線程中的值來恢復(fù)threadlocal類型的days對象的初始狀態(tài)。

          public final class Diary {
              // ...
              public static void removeDay() {
                  days.remove();
              }
          }

          public final class DiaryPool {
              // ...
              public void doSomething1() {
                  exec.execute(new Runnable() {
                      @Override
                      public void run() {
                          try {
                              Diary.setDay(Day.FRIDAY);
                              diary.threadSpecificTask();
                          } finally {
                              Diary.removeDay(); // Diary.setDay(Day.MONDAY)
                              // can also be used
                          }
                      }
                  });
              }
          // ...
          }

          如果threadlocal變量再次被同一個線程讀取,它將使用initialValue()方法重新初始化 ,除非任務(wù)已經(jīng)明確設(shè)置了變量的值。這個解決方案將維護的責(zé)任轉(zhuǎn)移到客戶端( DiaryPool ),但是當(dāng)Diary類不能被修改時是一個好的選擇。

          解決方案(beforeExecute())

          使用一個自定義ThreadPoolExecutor 來擴展 ThreadPoolExecutor 并覆蓋beforeExecute() 方法。beforeExecute() 方法在 Runnable 任務(wù)在指定線程中執(zhí)行之前被調(diào)用。該方法在線程 “t” 執(zhí)行任務(wù) “r” 之前重新初始化 threadlocal 變量。

          class CustomThreadPoolExecutor extends ThreadPoolExecutor {
              public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                                              long keepAliveTime, TimeUnit unit,
                                              BlockingQueue<Runnable> workQueue)
           
          {
                  super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
              }

              @Override
              public void beforeExecute(Thread t, Runnable r) {
                  if (t == null || r == null) {
                      throw new NullPointerException();
                  }
                  Diary.setDay(Day.MONDAY);
                  super.beforeExecute(t, r);
              }
          }

          public final class DiaryPool {
              // ...
              DiaryPool() {
                  exec = new CustomThreadPoolExecutor(NumOfthreads, NumOfthreads,
                          10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10));
                  diary = new Diary();
              }
              // ...
          }
          瀏覽 52
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  www国产亚洲精品久久网站 | 三级av免费电影 三级a片在线观看 | 尻屄视频网址 | 美女尻屄 | 日韩免费毛片 |