<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Java線程池源碼解析及高質(zhì)量代碼案例

          共 68826字,需瀏覽 138分鐘

           ·

          2021-05-30 08:51

          點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號(hào)”

          優(yōu)質(zhì)文章,第一時(shí)間送達(dá)

            作者 |  Star先生

          來源 |  urlify.cn/EvIrIr

          引言

          本文為Java高級(jí)編程中的一些知識(shí)總結(jié),其中第一章對Jdk 1.7.0_25中的多線程架構(gòu)中的線程池ThreadPoolExecutor源碼進(jìn)行架構(gòu)原理介紹以及源碼解析。第二章則分析了幾個(gè)違反Java高質(zhì)量代碼案例以及相應(yīng)解決辦法。如有總結(jié)的不好的地方,歡迎大家提出寶貴的意見和建議。 

          Java線程池架構(gòu)原理及源碼解析

          ThreadPoolExecutor是一個(gè) ExecutorService,它使用可能的幾個(gè)池線程之一執(zhí)行每個(gè)提交的任務(wù),通常使用 Executors 工廠方法配置。線程池可以解決兩個(gè)不同問題:由于減少了每個(gè)任務(wù)調(diào)用的開銷,它們通常可以在執(zhí)行大量異步任務(wù)時(shí)提供增強(qiáng)的性能,并且還可以提供綁定和管理資源(包括執(zhí)行任務(wù)集時(shí)使用的線程)的方法。每個(gè) ThreadPoolExecutor 還維護(hù)著一些基本的統(tǒng)計(jì)數(shù)據(jù),如完成的任務(wù)數(shù)。

          構(gòu)建參數(shù)源碼

          public ThreadPoolExecutor(int corePoolSize,
                                    int maximumPoolSize,
                                    long keepAliveTime,
                                    TimeUnit unit,
                                    BlockingQueue<Runnable> workQueue,
                                    RejectedExecutionHandler handler)
          {
              this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                   Executors.defaultThreadFactory(), handler);
          }

          參數(shù)解釋

          • corePoolSize:核心線程數(shù),會(huì)一直存活,即使沒有任務(wù),線程池也會(huì)維護(hù)線程的最少數(shù)量。

          • maximumPoolSize:線程池維護(hù)線程的最大數(shù)量。

          • keepAliveTime:線程池維護(hù)線程所允許的空閑時(shí)間,當(dāng)線程空閑時(shí)間達(dá)到keepAliveTime,該線程會(huì)退出,直到線程數(shù)量等于corePoolSize。如果allowCoreThreadTimeout設(shè)置為 
            true,則所有線程均會(huì)退出直到線程數(shù)量為0。 
            unit:線程池維護(hù)線程所允許的空閑時(shí)間的單位、可選參數(shù)值為:TimeUnit中的幾個(gè)靜態(tài)屬性:NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。

          • workQueue:線程池所使用的緩沖隊(duì)列,常用的是:java.util.concurrent.ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue。

          • handler:線程池中的數(shù)量大于maximumPoolSize,對拒絕任務(wù)的處理策略,默認(rèn)值ThreadPoolExecutor.AbortPolicy()。

          源碼詳細(xì)解析

          excute源碼

          public void execute(Runnable command)
          {
              if (command == null)
                  throw new NullPointerException();
              if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
              {
                  if (runState == RUNNING && workQueue.offer(command))
                  {
                      if (runState != RUNNING || poolSize == 0)
                          ensureQueuedTaskHandled(command);
                  }
                  else if (!addIfUnderMaximumPoolSize(command))
                      reject(command); // is shutdown or saturated
              }
          }

          一個(gè)任務(wù)通過 execute(Runnable)方法被添加到線程池,任務(wù)就是一個(gè)Runnable類型的對象,任務(wù)的執(zhí)行方法就是run()方法,如果傳入的為null,側(cè)拋出NullPointerException。 
          首先第一個(gè)判定空操作就不用說了,下面判定的poolSize >= corePoolSize成立時(shí)候會(huì)進(jìn)入if的區(qū)域,當(dāng)然它不成立也有可能會(huì)進(jìn)入,他會(huì)判定addIfUnderCorePoolSize是否返回false,如果返回false就會(huì)進(jìn)去。 
          如果當(dāng)前線程數(shù)小于corePoolSize,調(diào)用addIfUnderCorePoolSize方法,addIfUnderCorePoolSize方法首先調(diào)用mainLock加鎖,再次判斷當(dāng)前線程數(shù)小于corePoolSize并且線程池處于RUNNING狀態(tài),則調(diào)用addThread增加線程。 


           
          圖一:ThreadPoolExecutor運(yùn)行狀態(tài)圖 


          addIfUnderCorePoolSize源碼

          private boolean addIfUnderCorePoolSize(Runnable firstTask)
          {
              Thread t = null;
              final ReentrantLock mainLock = this.mainLock;
              mainLock.lock();
              try
              {
                  if (poolSize < corePoolSize && runState == RUNNING)
                      t = addThread(firstTask);
              }
              finally
              {
                  mainLock.unlock();
              }
              if (t == null)
                  return false;
              t.start();
              return true;
          }

          addThread方法首先創(chuàng)建Work對象,然后調(diào)用threadFactory創(chuàng)建新的線程,如果創(chuàng)建的線程不為null,將Work對象的 thread屬性設(shè)置為此創(chuàng)建出來的線程,并將此Work對象放入workers中,然后在增加當(dāng)前線程池的中線程數(shù),增加后回到 addIfUnderCorePoolSize方法 ,釋放mainLock,最后啟動(dòng)這個(gè)新創(chuàng)建的線程來執(zhí)行新傳入的任務(wù)。 
          可以發(fā)現(xiàn),這段源碼是如果發(fā)現(xiàn)小于corePoolSize就會(huì)創(chuàng)建一個(gè)新的線程,并且調(diào)用線程的start()方法將線程運(yùn)行起來:這個(gè)addThread()方法,我們先不考慮細(xì)節(jié),因?yàn)槲覀冞€要先看到前面是怎么進(jìn)去的,這里可以發(fā)信啊,只有沒有創(chuàng)建成功Thread才會(huì)返回false,也就是當(dāng)當(dāng)前的poolSize > corePoolSize的時(shí)候,或線程池已經(jīng)不是在running狀態(tài)的時(shí)候才會(huì)出現(xiàn)。 
          注意:這里在外部判定一次poolSize和corePoolSize只是初步判定,內(nèi)部是加鎖后判定的,以得到更為準(zhǔn)確的結(jié)果,而外部初步判定如果是大于了,就沒有必要進(jìn)入這段有鎖的代碼了。

          addThread源碼

          private Thread addThread(Runnable firstTask)
          {
              Worker w = new Worker(firstTask);
              Thread t = threadFactory.newThread(w);
              < span style = "color:#ff0000;" > < / span >
                             if (t != null)
              {
                  w.thread = t;
                  workers.add(w);
                  int nt = ++poolSize;
                  if (nt > largestPoolSize)
                      largestPoolSize = nt;
              }
              return t;
          }

          ThreadFactory接口默認(rèn)實(shí)現(xiàn)DefaultThreadFactory

          public Thread newThread(Runnable r)
          {
              Thread t = new Thread(group, r,
                                    namePrefix + threadNumber.getAndIncrement(),
                                    0);
              if (t.isDaemon())
                  t.setDaemon(false);
              if (t.getPriority() != Thread.NORM_PRIORITY)
                  t.setPriority(Thread.NORM_PRIORITY);
              return t;
          }

          這里創(chuàng)建了一個(gè)Work,其余的操作,就是講poolSize疊加,然后將將其放入workers的運(yùn)行隊(duì)列等操作; 
          我們主要關(guān)心Worker是干什么的,因?yàn)檫@個(gè)threadFactory對我們用途不大,只是做了Thread的命名處理;而Worker你會(huì)發(fā)現(xiàn)它的定義也是一個(gè)Runnable,外部開始在代碼段中發(fā)現(xiàn)了調(diào)用哪個(gè)這個(gè)Worker的start()方法,也就是線程的啟動(dòng)方法,其實(shí)也就是調(diào)用了Worker的run()方法,那么我們重點(diǎn)要關(guān)心run方法是如何處理的。

          Worker的run方法

          public void run()
          {
              try
              {
                  Runnable task = firstTask;
                  firstTask = null;
                  while (task != null || (task = getTask()) != null)
                  {
                      runTask(task);
                      task = null;
                  }
              }
              finally
              {
                  workerDone(this);
              }
          }

          從以上方法可以看出,Worker所在的線程啟動(dòng)后,首先執(zhí)行創(chuàng)建其時(shí)傳入的Runnable任務(wù),執(zhí)行完成后,循環(huán)調(diào)用getTask來獲取新的任務(wù),在沒有任務(wù)的情況下,退出此線程。FirstTask其實(shí)就是開始在創(chuàng)建work的時(shí)候,由外部傳入的Runnable對象,也就是你自己的Thread,你會(huì)發(fā)現(xiàn)它如果發(fā)現(xiàn)task為空,就會(huì)調(diào)用getTask()方法再判定,直到兩者為空,并且是一個(gè)while循環(huán)體。

          getTask源碼

          Runnable getTask()
          {
              for (;;)
              {
                  try
                  {
                      int state = runState;
                      if (state > SHUTDOWN)
                          return null;
                      Runnable r;
                      if (state == SHUTDOWN)  // Help drain queue
                          r = workQueue.poll();
                      else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                          r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                      else
                          r = workQueue.take();
                      if (r != null)
                          return r;
                      if (workerCanExit())
                      {
                          if (runState >= SHUTDOWN) // Wake up others
                              interruptIdleWorkers();
                          return null;
                      }
                      // Else retry
                  }
                  catch (InterruptedException ie)
                  {
                      // On interruption, re-check runState
                  }
              }
          }

          你會(huì)發(fā)現(xiàn)它是從workQueue隊(duì)列中,也就是等待隊(duì)列中獲取一個(gè)元素出來并返回!當(dāng)前線程運(yùn)行完后,在到workQueue中去獲取一個(gè)task出來,繼續(xù)運(yùn)行,這樣就保證了線程池中有一定的線程一直在運(yùn)行;此時(shí)若跳出了while循 環(huán),只有workQueue隊(duì)列為空才會(huì)出現(xiàn)或出現(xiàn)了類似于shutdown的操作,自然運(yùn)行隊(duì)列會(huì)減少1,當(dāng)再有新的線程進(jìn)來的時(shí)候,就又開始向 worker里面放數(shù)據(jù)了,這樣以此類推,實(shí)現(xiàn)了線程池的功能。

          execute方法部分實(shí)現(xiàn)

          if (runState == RUNNING && workQueue.offer(command))
          {
              if (runState != RUNNING || poolSize == 0)
                  ensureQueuedTaskHandled(command);
          }
          else if (!addIfUnderMaximumPoolSize(command))
              reject(command); // is shutdown or saturated
          如果當(dāng)前線程池?cái)?shù)量大于corePoolSize或addIfUnderCorePoolSize方法執(zhí)行失敗,則執(zhí)行后續(xù)操作;如果線程池處于運(yùn)行狀態(tài) 并且workQueue中成功加入任務(wù),再次判斷如果線程池的狀態(tài)不為運(yùn)行狀態(tài)或當(dāng)前線程池?cái)?shù)為0,則調(diào)用 ensureQueuedTaskHandled方法

          ensureQueuedTaskHandled源碼

          private void ensureQueuedTaskHandled(Runnable command)
          {
              final ReentrantLock mainLock = this.mainLock;
              mainLock.lock();
              boolean reject = false;
              Thread t = null;
              try
              {
                  int state = runState;
                  if (state != RUNNING && workQueue.remove(command))
                      reject = true;
                  else if (state < STOP &&
                           poolSize < Math.max(corePoolSize, 1) &&
                           !workQueue.isEmpty())
                      t = addThread(null);
              }
              finally
              {
                  mainLock.unlock();
              }
              if (reject)
                  reject(command);
              else if (t != null)
                  t.start();
          }

          第一個(gè)if,也就是當(dāng)當(dāng)前狀態(tài)為running的時(shí)候,就會(huì)去執(zhí)行workQueue.offer(command),這個(gè)workQueue其實(shí)就是一 個(gè)BlockingQueue,offer()操作就是在隊(duì)列的尾部寫入一個(gè)對象,此時(shí)寫入的對象為線程的對象而已;所以你可以認(rèn)為只有線程池在 RUNNING狀態(tài),才會(huì)在隊(duì)列尾部插入數(shù)據(jù),否則就執(zhí)行else if,其實(shí)else if可以看出是要做一個(gè)是否大于MaximumPoolSize的判定,如果大于這個(gè)值,就會(huì)做reject的操作。ensureQueuedTaskHandled方法判斷線程池運(yùn)行,如果狀態(tài)不為運(yùn)行狀態(tài),從workQueue中刪除,并調(diào)用reject做拒絕處理。

          reject源碼

          void reject(Runnable command)
          {
              handler.rejectedExecution(command, this);
          }

          再次回到execute方法

          if (runState == RUNNING && workQueue.offer(command))
          {
              if (runState != RUNNING || poolSize == 0)
                  ensureQueuedTaskHandled(command);
          }
          else if (!addIfUnderMaximumPoolSize(command))
              reject(command); // is shutdown or saturated

          如線程池workQueue offer失敗或不處于運(yùn)行狀態(tài),調(diào)用addIfUnderMaximumPoolSize, addIfUnderMaximumPoolSize方法基本和addIfUnderCorePoolSize實(shí)現(xiàn)類似,不同點(diǎn)在于根據(jù)最大線程數(shù)(maximumPoolSize)進(jìn)行比較,如果超過最大線程數(shù),返回false,調(diào)用reject方法。

          addIfUnderMaximumPoolSize源碼

          private boolean addIfUnderMaximumPoolSize(Runnable firstTask)
          {
              Thread t = null;
              final ReentrantLock mainLock = this.mainLock;
              mainLock.lock();
              try
              {
                  if (poolSize < maximumPoolSize && runState == RUNNING)
                      t = addThread(firstTask);
              }
              finally
              {
                  mainLock.unlock();
              }
              if (t == null)
                  return false;
              t.start();
              return true;
          }

          也就是如果線程池滿了,而且線程池調(diào)用了shutdown后,還在調(diào)用execute方法時(shí),就會(huì)拋出上面說明的異常:RejectedExecutionException。

          workerDone源碼

          void workerDone(Worker w)
          {
              final ReentrantLock mainLock = this.mainLock;
              mainLock.lock();
              try
              {
                  completedTaskCount += w.completedTasks;
                  workers.remove(w);
                  if (--poolSize == 0)
                      tryTerminate();
              }
              finally
              {
                  mainLock.unlock();
              }
          }

          注意這里將workers.remove(w)掉,并且調(diào)用了—poolSize來做操作。至于tryTerminate是做了更多關(guān)于回收方面的操作。

          runTask(task)源碼

          private void runTask(Runnable task)
          {
              final ReentrantLock runLock = this.runLock;
              runLock.lock();
              try
              {
                  if (runState < STOP &&
                          Thread.interrupted() &&
                          runState >= STOP)
                      thread.interrupt();
                  boolean ran = false;
                  beforeExecute(thread, task);
                  try
                  {
                      task.run();
                      ran = true;
                      afterExecute(task, null);
                      ++completedTasks;
                  }
                  catch (RuntimeException ex)
                  {
                      if (!ran)
                          afterExecute(task, ex);
                      throw ex;
                  }
              }
              finally
              {
                  runLock.unlock();
              }
          }

          你可以看到,這里面的task為傳入的task信息,調(diào)用的不是start方法,而是run方法,因?yàn)閞un方法直接調(diào)用不會(huì)啟動(dòng)新的線程,也是因?yàn)檫@樣,導(dǎo)致了你無法獲取到你自己的線程的狀態(tài),因?yàn)榫€程池是直接調(diào)用的run方法,而不是start方法來運(yùn)行。 
          這里有個(gè)beforeExecute和afterExecute方法,分別代表在執(zhí)行前和執(zhí)行后,你可以做一段操作,在這個(gè)類中,這兩個(gè)方法都是空的,因?yàn)槠胀ň€程池?zé)o需做更多的操作。 
          如果你要實(shí)現(xiàn)類似暫停等待通知的或其他的操作,可以自己extends后進(jìn)行重寫構(gòu)造。

          添加任務(wù)處理流程

          AbortPolicy()

          public static class AbortPolicy implements RejectedExecutionHandler
          {
              /**
               * Creates an {@code AbortPolicy}.
               */
              public AbortPolicy() { }

              /**
               * Always throws RejectedExecutionException.
               *
               * @param r the runnable task requested to be executed
               * @param e the executor attempting to execute this task
               * @throws RejectedExecutionException always.
               */
              public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
              {
                  throw new RejectedExecutionException("Task " + r.toString() +
                                                       " rejected from " +
                                                       e.toString());
              }
          }
          /*當(dāng)線程池中的數(shù)量等于最大線程數(shù)時(shí),直接拋出拋出java.util.concurrent.RejectedExecutionException異常。*/

          CallerRunsPolicy()

          public static class CallerRunsPolicy implements RejectedExecutionHandler
          {
              /**
               * Creates a {@code CallerRunsPolicy}.
               */
              public CallerRunsPolicy() { }

              /**
               * Executes task r in the caller's thread, unless the executor
               * has been shut down, in which case the task is discarded.
               *
               * @param r the runnable task requested to be executed
               * @param e the executor attempting to execute this task
               */
              public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
              {
                  if (!e.isShutdown())
                  {
                      r.run();
                  }
              }
          }

          當(dāng)線程池中的數(shù)量等于最大線程數(shù)時(shí)、重試執(zhí)行當(dāng)前的任務(wù),交由調(diào)用者線程來執(zhí)行任務(wù)。

          DiscardOldestPolicy()

          public static class DiscardOldestPolicy implements RejectedExecutionHandler
          {
              /**
               * Creates a {@code DiscardOldestPolicy} for the given executor.
               */
              public DiscardOldestPolicy() { }

              /**
               * Obtains and ignores the next task that the executor
               * would otherwise execute, if one is immediately available,
               * and then retries execution of task r, unless the executor
               * is shut down, in which case task r is instead discarded.
               *
               * @param r the runnable task requested to be executed
               * @param e the executor attempting to execute this task
               */
              public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
              {
                  if (!e.isShutdown())
                  {
                      e.getQueue().poll();
                      e.execute(r);
                  }
              }
          }

          當(dāng)線程池中的數(shù)量等于最大線程數(shù)時(shí)、拋棄線程池中最后一個(gè)要執(zhí)行的任務(wù),并執(zhí)行新傳入的任務(wù)。

          DiscardPolicy()

          public static class DiscardPolicy implements RejectedExecutionHandler
          {
              /**
               * Creates a {@code DiscardPolicy}.
               */
              public DiscardPolicy() { }
              /**
               * Does nothing, which has the effect of discarding task r.
               *
               * @param r the runnable task requested to be executed
               * @param e the executor attempting to execute this task
               */
              public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
              {
              }
          }

          當(dāng)線程池中的數(shù)量等于最大線程數(shù)時(shí),不做任何動(dòng)作。 
          通常你得到線程池后,會(huì)調(diào)用其中的:submit方法或execute方法去操作;其實(shí)你會(huì)發(fā)現(xiàn),submit方法最終會(huì)調(diào)用execute方法來進(jìn)行操 作,只是他提供了一個(gè)Future來托管返回值的處理而已,當(dāng)你調(diào)用需要有返回值的信息時(shí),你用它來處理是比較好的;這個(gè)Future會(huì)包裝對 Callable信息,并定義一個(gè)Sync對象,當(dāng)你發(fā)生讀取返回值的操作的時(shí)候,會(huì)通過Sync對象進(jìn)入鎖,直到有返回值的數(shù)據(jù)通知。

          違反Java高質(zhì)量代碼案例

          異步運(yùn)算使用Callable接口

          Callable接口代碼如下:

          public interface Callable<V>{
              v call() throws Exception;
          }

          實(shí)現(xiàn)Callable接口,只是表明它是一個(gè)可調(diào)用的任務(wù),并不表示它具有多線程運(yùn)算的能力,還是要執(zhí)行器來執(zhí)行。代碼如下:

          class TaxCalculator implements Callable<Integer>{

              private int seedMoney;
              public TaxCalculator(int _seedMoney){
                  seedMoney=_seedMoney;
              }
              @Override
              public Integer call() throws Exception {
                 TimeUnit.MILLISECONDS.sleep(10000);
                  return seedMoney/10;
              }

          }

          這里模擬稅款計(jì)算器運(yùn)算,可能花費(fèi)10秒鐘時(shí)間。用戶輸入即有輸出,若耗時(shí)較長,則顯示運(yùn)算進(jìn)度。如果我們直接計(jì)算,就只有一個(gè)main線程,是不可能友好提示的,如果稅金不計(jì)算完畢,也不會(huì)執(zhí)行后續(xù)動(dòng)作,所以最好的辦法就是重啟一個(gè)線程來運(yùn)算,讓main線程做進(jìn)度提示

          public static void main(String[] args) throws Exception{
                  ExecutorService es=Executors.newSingleThreadExecutor();
                  Future<Integer> future=es.submit(new TaxCalculator(100));
                  while(!future.isDone()){
                      TimeUnit.MILLISECONDS.sleep(200);
                      System.out.println("#");
                  }
                  System.out.println("\n 計(jì)算完成,稅金是:"+future.get()+"元");
                  es.shutdown();

              }

          Executors是一個(gè)靜態(tài)工具類,提供了異步執(zhí)行器的創(chuàng)建能力,如單線程執(zhí)行newSingleThreadExcutor、固定線程數(shù)量的執(zhí)行器newFixedThreadPool等,一般是異步計(jì)算的入口類。

          優(yōu)先選擇線程池

          線程的狀態(tài)只能由新建狀態(tài)轉(zhuǎn)變?yōu)檫\(yùn)行態(tài)后才可能被阻塞或等待,最后終結(jié),不可能產(chǎn)生本末倒置的情況,代碼如下:

          public static void main(String[] args) throws Exception{

              Thread t=new Thread(new Runnable() {

                  @Override
                  public void run() {
                      System.out.println("線程在運(yùn)行");

                  }
              });
              t.start();
              while(!t.getState().equals(Thread.State.TERMINATED)){
                  TimeUnit.MILLISECONDS.sleep(10);
              }
              t.start();
          }

          此時(shí)程序運(yùn)行會(huì)報(bào)IllegalThreadStateException異常,原因就是不能從結(jié)束狀態(tài)直接轉(zhuǎn)換為可運(yùn)行狀態(tài)。這時(shí)可以引入線程池,當(dāng)系統(tǒng)需要時(shí)直接從線程池中獲得線程,運(yùn)算出結(jié)果,再把線程返回到線程池中,代碼如下:

          public static void main(String[] args) {
                  ExecutorService es = Executors.newFixedThreadPool(2);
                  for (int i = 0; i < 4; i++) {
                      es.submit(new Runnable() {

                          @Override
                          public void run() {
                              System.out.println(Thread.currentThread().getName());

                          }

                      });
                  }
                  es.shutdown();
              }

          線程死鎖

          Java是單線程語言,一旦線程死鎖,只能借助外部進(jìn)程重啟應(yīng)用才能解決。

          static class A {
                  public synchronized void a1(B b) {
                      String name = Thread.currentThread().getName();
                      System.out.println(name + "進(jìn)入A.a1()");
                      try {
                          Thread.sleep(1000);
                      } catch (Exception e) {
                          // TODO: handle exception
                      }
                      System.out.println(name + "試圖訪問B.b2()");
                      b.b2();
                  }

                  public synchronized void a2() {
                      System.out.println("進(jìn)入 a.a2()");
                  }
              }

              static class B {
                  public synchronized void b1(A a) {
                      String name = Thread.currentThread().getName();
                      System.out.println(name + "進(jìn)入B.b1()");
                      try {
                          Thread.sleep(1000);
                      } catch (Exception e) {
                          // TODO: handle exception
                      }
                      System.out.println(name + "試圖訪問A.a2()");
                      a.a2();
                  }

                  public synchronized void b2() {
                      System.out.println("進(jìn)入 B.b2()");
                  }
              }

              public static void main(String[] args) {
                  final A a = new A();
                  final B b = new B();
                  new Thread(new Runnable() {

                      @Override
                      public void run() {
                          a.a1(b);
                      }
                  }, "線程A").start();
                  ;
                  new Thread(new Runnable() {

                      @Override
                      public void run() {
                          b.b1(a);
                      }
                  }, "線程B").start();
                  ;
              }

          此段程序定義了兩個(gè)資源A和B,然后在兩個(gè)線程A、B中使用了該資源,由于兩個(gè)資源之間有交互操作,并且都是同步方法,因此在線程A休眠1秒鐘后,它會(huì)試圖訪問資源B的b2方法,但是線程B持有該類的鎖,并同時(shí)在等待A線程釋放其鎖資源,所以此時(shí)就出現(xiàn)了兩個(gè)線程在互相等待釋放資源的情況,也就是死鎖??梢允褂米孕i改進(jìn),代碼如下:

          public  void b2()
          {
              try
              {
                  if(Lock.trylock(2, TimeUnit.SECONDS))
                  {
                      System.out.println("進(jìn)入 B.b2()");
                  }
              }
              catch (InterruptedException e)
              {
                  // TODO: handle exception
              }
              finally
              {
                  Lock.unlock();
              }


          }

          它原理和互斥鎖一樣,如果一個(gè)執(zhí)行單元要想訪問被自旋鎖保護(hù)的共享資源,則必須先得到鎖,在訪問完共享資源后,也必須釋放鎖。

          忽略設(shè)置阻塞隊(duì)列長度

          BlockingQueue是一種集合,實(shí)現(xiàn)了Collection接口,容量是不可以自行管理的,代碼如下:

          public static void main(String[] args) throws Exception {
                  BlockingDeque<String> bq = (BlockingDeque<String>) new ArrayBlockingQueue<String>(
                          5);
                  for (int i = 0; i < 10; i++) {
                      bq.add("");
                  }
              }

          阻塞隊(duì)列容量是固定的,非阻塞隊(duì)列則是變長的。阻塞隊(duì)列可以在聲明是指定隊(duì)列的容量,若指定的容量,則元素的數(shù)量不可超過該容量,若不指定,隊(duì)列的容量為Integer的最大值

          public  class ArrayBlockingQueue<E> extends AbstractQueue<E> implements
              BlockingDeque<E>, java.io.Serializable
          {
              public final E[] items;
              private int count;

              public boolean add(E e)
              {
                  if (offer(e))
                      return true;
                  else
                      throw new IllegalStateException("Queue full");
              }

              public boolean offer(E e)
              {
                  final ReentrantLock lock = this.lock;
                  lock.lock();
                  try
                  {
                      if (count == items.length)
                          ;
                      else
                      {
                          insert(e);
                          return true;
                      }
                  }
                  finally
                  {
                      lock.unlock();
                  }
              }

          }

          上面在加入元素時(shí),如果判斷當(dāng)前隊(duì)列已滿,則返回false,表示插入失敗,之后再包裝成隊(duì)列滿異常。

          使用stop方法停止線程

          stop方法會(huì)破壞原子邏輯,代碼如下:

          class MutiThread implements Runnable {
              int a = 0;

              @Override
              public void run() {
                  // TODO Auto-generated method stub
                  synchronized ("") {
                      a++;
                      try {
                          Thread.sleep(100);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      a--;
                      String tn = Thread.currentThread().getName();
                      System.out.println(tn + ":a=" + a);

                  }
              }

              public static void main(String[] args) {
                  MutiThread t = new MutiThread();
                  Thread t1 = new Thread(t);
                  t1.start();
                  for (int i = 0; i < 5; i++) {
                      new Thread(t).start();
                  }
                  t1.stop();
              }
          }

          所有線程共享了一個(gè)MutilThread的實(shí)例變量t,由于在run方法中加入了同步代碼塊,所以只能有一個(gè)線程進(jìn)入到synchronized塊中,可以自定義標(biāo)志位來決定線程執(zhí)行情況,代碼如下:

          class SafeStopThread extends Thread{
              private volatile boolean stop=false;
              @Override
              public void run()
              {//判斷線程體是否運(yùn)行
                  while(stop)
                  {}
              }
              //線程終止
              public void terminate(){
                  stop=true;
              }
          }

          在線程體中判斷是否需要停止運(yùn)行,即可保證線程體的邏輯完整性,而且也不會(huì)破壞原子邏輯。

          覆寫start方法

          代碼:

          class MutiThread implements Thread
          {


              @Override
              public void start()
              {
                  //調(diào)用線程體
                  run();
              }
          }
          @Override
          public void run()
          {

          }
          }
          public static void main(String[] args)
          {
              MutiThread t = new MutiThread();
              t.start();
          }

          }

          main方法根本就沒有啟動(dòng)一個(gè)子線程,整個(gè)應(yīng)用程序中只有一個(gè)主線程在運(yùn)行,并不會(huì)創(chuàng)建其他的線程。改進(jìn)后代碼如下:

          class MutiThread implements Thread
          {


              @Override
              public void start()
              {
                  /*線程啟動(dòng)前的業(yè)務(wù)處理*/
                  super.start();
                  /*線程啟動(dòng)后的業(yè)務(wù)處理*/
              }
          }
          @Override
          public void run()
          {

          }
          }

          start方法調(diào)用父類的start方法,沒有主動(dòng)調(diào)用run方法,由JVM自行調(diào)用,不用我們的顯式實(shí)現(xiàn)。

          使用過多線程優(yōu)先級(jí)

          Java線程有10個(gè)基本,級(jí)別為0代表JVM 
          代碼如下:

          class MutiThread implements Runnable {
              public void start(int _priority) {
                  Thread t = new Thread(this);
                  t.setPriority(_priority);
                  t.start();
              }

              @Override
              public void run() {
                  for (int i = 0; i < 10000; i++) {
                      Math.hypot(Math.pow(924526789, i), Math.cos(i));
                  }
                  System.out.println("Priority:"+Thread.currentThread().getPriority());
              }
              public static void main(String[] args) {
                  for(int i=0;i<20;i++)
                  {
                      new MutiThread().start(i%10+1);
                  }
              }
          }

          Java優(yōu)先級(jí)只是代表搶占CPU機(jī)會(huì)大小,優(yōu)先級(jí)越高,搶占CPU機(jī)會(huì)越大,被優(yōu)先執(zhí)行的可能性越高,優(yōu)先級(jí)相差不大,則搶占CPU機(jī)會(huì)差別也不大。導(dǎo)致優(yōu)先級(jí)為9的線程比優(yōu)先級(jí)為10的線程先運(yùn)行。于是在Thread類中設(shè)置三個(gè)優(yōu)先級(jí),建議使用優(yōu)先級(jí)常量,而不是1到10的隨機(jī)數(shù)字,代碼如下:

            public final static int MIN_PRIORITY = 1;

          /**
            * The default priority that is assigned to a thread.
            */
          public final static int NORM_PRIORITY = 5;

          /**
           * The maximum priority that a thread can have.
           */
          public final static int MAX_PRIORITY = 10;

          /**
           * Returns a reference to the currently executing thread object.
           *
           * @return  the currently executing thread.
           */
          }

          Lock與synchronized

          Lock為顯式鎖,synchronized為內(nèi)部鎖,代碼如下:

          class  Task
          {
              public void dosomething(){
                  try {
                      Thread.sleep(2000);
                  } catch (Exception e) {
                      // TODO: handle exception
                  }
                  StringBuffer sb=new StringBuffer();
                  sb.append("線程名:"+Thread.currentThread().getName());
                  sb.append(",線程時(shí)間:"+Calendar.getInstance().get(13)+"s");
                  System.out.println(sb);
              }
          }
          //顯示鎖任務(wù)
          class TaskWithLock extends Task implements Runnable{
          private final Lock lock=new ReentrantLock();
              @Override
              public void run() {
                  try {
                      lock.lock();
                      dosomething();
                  } finally
                  {
                      lock.unlock();
                  }

              }};
              //內(nèi)部鎖任務(wù)
              class TaskWithSync extends Task implements Runnable{

                  @Override
                  public void run() {

                          synchronized ("A") {
                              dosomething();

                          }


                  }};

          對于同步資源來說,顯式鎖時(shí)對象級(jí)別的鎖,而內(nèi)部鎖時(shí)類級(jí)別的鎖,也就是說lock鎖時(shí)跟隨對象的,synchronized鎖時(shí)跟隨類 
          改進(jìn)方法:把Lock定義為所有線程的共享變量。

          public static void main(String[] args) {
                  //多個(gè)線程共享鎖
                  final Lock lock=new ReentrantLock();
                  ……
              }

          線程池異常處理

          Java中線程執(zhí)行的任務(wù)接口java.lang.Runnable 要求不拋出Checked異常,

          public interface Runnable {   

              public abstract void run();   
          }

          那么如果 run() 方法中拋出了RuntimeException,將會(huì)怎么處理了? 
          通常java.lang.Thread對象運(yùn)行設(shè)置一個(gè)默認(rèn)的異常處理方法:

          java.lang.Thread.setDefaultUncaughtExceptionHandler(UncaughtExceptionHandler)  

          而這個(gè)默認(rèn)的靜態(tài)全局的異常捕獲方法時(shí)輸出堆棧。當(dāng)然,我們可以覆蓋此默認(rèn)實(shí)現(xiàn),只需要一個(gè)自定義的java.lang.Thread.UncaughtExceptionHandler接口實(shí)現(xiàn)即可。

          public interface UncaughtExceptionHandler {   

              void uncaughtException(Thread t, Throwable e);   
          }

          而在線程池中卻比較特殊。默認(rèn)情況下,線程池 java.util.concurrent.ThreadPoolExecutor 會(huì)Catch住所有異常, 當(dāng)任務(wù)執(zhí)行完成(java.util.concurrent.ExecutorService.submit(Callable))獲取其結(jié)果 時(shí)(java.util.concurrent.Future.get())會(huì)拋出此RuntimeException。

          /**   
           * Waits if necessary for the computation to complete, and then   
           * retrieves its result.   
           *   
           * @return the computed result   
           * @throws CancellationException if the computation was cancelled   
           * @throws ExecutionException if the computation threw an exception   
           * @throws InterruptedException if the current thread was interrupted while waiting   
           */   
          V get() throws InterruptedException, ExecutionException;

          其中 ExecutionException 異常即是java.lang.Runnable 或者 java.util.concurrent.Callable 拋出的異常。

          也就是說,線程池在執(zhí)行任務(wù)時(shí)捕獲了所有異常,并將此異常加入結(jié)果中。這樣一來線程池中的所有線程都將無法捕獲到拋出的異常。從而無法通過設(shè)置線程的默認(rèn)捕獲方法攔截的錯(cuò)誤異常。也不同通過 自定義線程來完成異常的攔截。好在java.util.concurrent.ThreadPoolExecutor 預(yù)留了一個(gè)方法,運(yùn)行在任務(wù)執(zhí)行完畢進(jìn)行擴(kuò)展(當(dāng)然也預(yù)留一個(gè)protected方法beforeExecute(Thread t, Runnable r)):

          protected void afterExecute(Runnable r, Throwable t) { } 

          此方法的默認(rèn)實(shí)現(xiàn)為空,這樣我們就可以通過繼承或者覆蓋ThreadPoolExecutor 來達(dá)到自定義的錯(cuò)誤處理。

          解決辦法如下:

          ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(11, 100, 1, TimeUnit.MINUTES, //   
                  new ArrayBlockingQueue<Runnable>(10000),//   
                  new DefaultThreadFactory()) {   

              protected void afterExecute(Runnable r, Throwable t) {   
                  super.afterExecute(r, t);   
                  printException(r, t);   
              }   
          };   

          private static void printException(Runnable r, Throwable t) {   
              if (t == null && r instanceof Future<?>) {   
                  try {   
                      Future<?> future = (Future<?>) r;   
                      if (future.isDone())   
                          future.get();   
                  } catch (CancellationException ce) {   
                      t = ce;   
                  } catch (ExecutionException ee) {   
                      t = ee.getCause();   
                  } catch (InterruptedException ie) {   
                      Thread.currentThread().interrupt(); // ignore/reset   
                  }   
              }   
              if (t != null)   
                  log.error(t.getMessage(), t);   
          }

          使用SimpleThread類

          TestThreadPool類是一個(gè)測試程序,用來模擬客戶端的請求,當(dāng)你運(yùn)行它時(shí),系統(tǒng)首先會(huì)顯示線程池的初始化信息,然后提示你從鍵盤上輸入字符串,并按下回車鍵,這時(shí)你會(huì)發(fā)現(xiàn)屏幕上顯示信息,告訴你某個(gè)線程正在處理你的請求,如果你快速地輸入一行行字符串,那么你會(huì)發(fā)現(xiàn)線程池中不斷有線程被喚醒,來處理你的請求,在本例中,我創(chuàng)建了一個(gè)擁有10個(gè)線程的線程池,如果線程池中沒有可用線程了,系統(tǒng)會(huì)提示你相應(yīng)的警告信息,但如果你稍等片刻,那你會(huì)發(fā)現(xiàn)屏幕上會(huì)陸陸續(xù)續(xù)提示有線程進(jìn)入了睡眠狀態(tài),這時(shí)你又可以發(fā)送新的請求了。 
          代碼如下:

          //TestThreadPool.java

          import java.io.*;
          public class TestThreadPool
          {
              public static void main(String[] args)
              {
                  try
                  {
                      BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
                      String s;
                      ThreadPoolManager manager = new ThreadPoolManager(10);
                      while((s = br.readLine()) != null)
                      {
                          manager.process(s);
                      }
                  }
                  catch(IOException e) {}
              }
          }

          ThreadPoolManager類,顧名思義,它是一個(gè)用于管理線程池的類,它的主要職責(zé)是初始化線程池,并為客戶端的請求分配不同的線程來進(jìn)行處理,如果線程池滿了,它會(huì)對你發(fā)出警告信息。 
          代碼如下:

          import java.util.*;
            
            class ThreadPoolManager
               
          {
               
                 private int maxThread;
                 public Vector vector;
                 public void setMaxThread(int threadCount)
                
              {
                     maxThread = threadCount;
                    
              }
                
                 public ThreadPoolManager(int threadCount)
                
              {
                     setMaxThread(threadCount);
                    System.out.println("Starting thread pool...");
                     vector = new Vector();
                     for(int i = 1; i <= 10; i++)
                         {
                         SimpleThread thread = new SimpleThread(i);
                         vector.addElement(thread);
                         thread.start();
                        
                  }
                    
              }
                
                 public void process(String argument)
                
              {
                     int i;
                     for(i = 0; i < vector.size(); i++)
                        {
                        SimpleThread currentThread = (SimpleThread)vector.elementAt(i);
                         if(!currentThread.isRunning())
                             {
                            System.out.println("Thread " + (i + 1) + " is processing:" +
                            argument);
                           currentThread.setArgument(argument);
                             currentThread.setRunning(true);
                             return;
                           
                      }
                        
                  }
                    if(i == vector.size())
                         {
                         System.out.println("pool is full, try in another time.");
                        
                  }
                    
              }
                
          }//end of class ThreadPoolManager

          我們先關(guān)注一下這個(gè)類的構(gòu)造函數(shù),然后再看它的process()方法。第16-24行是它的構(gòu)造函數(shù),首先它給ThreadPoolManager類的成員變量maxThread賦值,maxThread表示用于控制線程池中最大線程的數(shù)量。第18行初始化一個(gè)數(shù)組vector,它用來存放所有的SimpleThread類,這時(shí)候就充分體現(xiàn)了JAVA語言的優(yōu)越性與藝術(shù)性:如果你用C語言的話,至少要寫100行以上的代碼來完成vector的功能,而且C語言數(shù)組只能容納類型統(tǒng)一的基本數(shù)據(jù)類型,無法容納對象。好了,閑話少說,第19-24行的循環(huán)完成這樣一個(gè)功能:先創(chuàng)建一個(gè)新的SimpleThread類,然后將它放入vector中去,最后用thread.start()來啟動(dòng)這個(gè)線程,為什么要用start()方法來啟動(dòng)線程呢?因?yàn)檫@是JAVA語言中所規(guī)定的,如果你不用的話,那這些線程將永遠(yuǎn)得不到激活,從而導(dǎo)致本示例程序根本無法運(yùn)行。

          process()方法,第30-40行的循環(huán)依次從vector數(shù)組中選取SimpleThread線程,并檢查它是否處于激活狀態(tài)(所謂激活狀態(tài)是指此線程是否正在處理客戶端的請求),如果處于激活狀態(tài)的話,那繼續(xù)查找vector數(shù)組的下一項(xiàng),如果vector數(shù)組中所有的線程都處于激活狀態(tài)的話,那它會(huì)打印出一條信息,提示用戶稍候再試。相反如果找到了一個(gè)睡眠線程的話,那第35-38行會(huì)對此進(jìn)行處理,它先告訴客戶端是哪一個(gè)線程來處理這個(gè)請求,然后將客戶端的請求,即字符串a(chǎn)rgument轉(zhuǎn)發(fā)給SimpleThread類的setArgument()方法進(jìn)行處理,并調(diào)用SimpleThread類的setRunning()方法來喚醒當(dāng)前線程,來對客戶端請求進(jìn)行處理。

          解決辦法是引入SimpleThread類,它是Thread類的一個(gè)子類,它才真正對客戶端的請求進(jìn)行處理,SimpleThread在示例程序初始化時(shí)都處于睡眠狀態(tài),但如果它接受到了ThreadPoolManager類發(fā)過來的調(diào)度信息,則會(huì)將自己喚醒,并對請求進(jìn)行處理。 
          代碼如下:

          class SimpleThread extends Thread
               
          {
                 private boolean runningFlag;
                 private String argument;
                 public boolean isRunning()
                
              {
                     return runningFlag;
                    
              }
                public synchronized void setRunning(boolean flag)
                
              {
                     runningFlag = flag;
                     if(flag)
                         this.notify();
                    
              }
                
                 public String getArgument()
                
              {
                     return this.argument;
                    
              }
                 public void setArgument(String string)
                
              {
                     argument = string;
                    
              }
                
                 public SimpleThread(int threadNumber)
                
              {
                     runningFlag = false;
                     System.out.println("thread " + threadNumber + "started.");
                    
              }
                
                 public synchronized void run()
                
              {
                     try{
                         while(true)
                             {
                             if(!runningFlag)
                                 {
                                 this.wait();
                                
                          }
                             else
                                 {
                                 System.out.println("processing " + getArgument() + "... done.");
                                 sleep(5000);
                                 System.out.println("Thread is sleeping...");
                                 setRunning(false);
                                
                          }
                            
                      }
                        
                  }
                  catch(InterruptedException e)
                  {
                         System.out.println("Interrupt");
                        
                  }
                    
              }//end of run()

                
          }//end of class SimpleThread

          線程使用不當(dāng)導(dǎo)致內(nèi)存溢出

          代碼如下:

          class IndexCallable implements Callable
          {
              private List<?> t;
              @override
              public object call()
              {
                  ……
              }
          }

          程序是這樣的,有一個(gè)線程會(huì)往List中插入對象,線程池中的多個(gè)線程叢List中取數(shù)據(jù),然后進(jìn)行處理,處理完以后把對象從List中刪除。outofmemory有幾種可能:

          1.線程池中的處理線程在處理完以后沒有從List中刪掉元素

          2.向List中插入元素的速度高于從List中刪除元素的速度,造成List中積累的元素?cái)?shù)量不斷攀升,可以隨時(shí)打印一下List中的元素?cái)?shù)量,看是否是一支攀升。

          3.ArrayList和LinkedList都不是線程安全的,把List換成Vector或者保證List變量通過Synchronized同步訪問。

          4.在程序的其他地方還持有List中的對象句柄,雖然從List中刪掉了,如果別的地方還保存著該對象的句柄,那么也不會(huì)被垃圾回收。

          5.JVM的應(yīng)用程序最大可用內(nèi)存參數(shù)(-Xmx)配置過低

          如:

          JAVA_OPTS="-server -Xms800m -Xmx800m -XX:PermSize=64M -XX:MaxNewSize=256m -XX:MaxPermSize=128m -Djava.awt.headless=true "

          工作隊(duì)列

          是同一組固定的工作線程相結(jié)合的工作隊(duì)列,它使用 wait() 和 notify() 來通知等待線程新的工作已經(jīng)到達(dá)了。該工作隊(duì)列通常被實(shí)現(xiàn)成具有相關(guān)監(jiān)視器對象的某種鏈表,下邊的代碼顯示了簡單的合用工作隊(duì)列的示例。盡管 Thread API 沒有對使用 Runnable 接口強(qiáng)加特殊要求,但使用 Runnable 對象隊(duì)列的這種模式是調(diào)度程序和工作隊(duì)列的公共約定。

          public class WorkQueue
          {

              private final int nThreads;

              private final PoolWorker[] threads;

              private final LinkedList queue;

              public WorkQueue(int nThreads)
              {

                  this.nThreads = nThreads;

                  queue = new LinkedList();

                  threads = new PoolWorker[nThreads];

                  for (int i = 0; i

                          threads[i] = new PoolWorker();

                          threads[i].start();

              }

          }

                  public void execute(Runnable r)
          {

              synchronized(queue)
              {

                  queue.addLast(r);

                  queue.notify();

              }

          }

          private class PoolWorker extends Thread
          {

              public void run()
              {

                  Runnable r;

                  while (true)
                  {

                      synchronized(queue)
                      {

                          while (queue.isEmpty())
                          {

                              try

                              {

                                  queue.wait();

                              }

                              catch (InterruptedException ignored)

                              {

                              }

                          }

                          r = (Runnable) queue.removeFirst();

                      }

                      // If we don't catch RuntimeException,

                      // the pool could leak threads

                      try
                      {

                          r.run();

                      }

                      catch (RuntimeException e)
                      {

                          // You might want to log something here

                      }

                  }

              }

          }

          }

          實(shí)現(xiàn)使用的是 notify() 而不是 notifyAll() 。大多數(shù)專家建議使用 notifyAll() 而不是 notify() ,而且理由很充分:使用 notify() 具有難以捉摸的風(fēng)險(xiǎn),只有在某些特定條件下使用該方法才是合適的。另一方面,如果使用得當(dāng), notify() 具有比 notifyAll() 更可取的性能特征;特別是,notify() 引起的環(huán)境切換要少得多,這一點(diǎn)在服務(wù)器應(yīng)用程序中是很重要的。






          瀏覽 63
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日皮网站在线观看 | 天堂成人网站 | 欧美成人高清在线 | 天天草天天干 | 翔田千里久久 |