<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          談?wù)刦ork/join實現(xiàn)原理

          共 108936字,需瀏覽 218分鐘

           ·

          2021-07-08 07:20

          點擊上方藍(lán)色字體,選擇“標(biāo)星公眾號”

          優(yōu)質(zhì)文章,第一時間送達(dá)

            作者 |  等你歸去來

          來源 |  urlify.cn/nEFRra

          害,又是一個炒冷飯的時間。fork/join是在jdk1.7中出現(xiàn)的一個并發(fā)工作包,其特點是可以將一個大的任務(wù)拆分成多個子任務(wù)進(jìn)行并行處理,最后將子任務(wù)結(jié)果合并成最后的計算結(jié)果,并進(jìn)行輸出。從而達(dá)到多線程分發(fā)任務(wù),達(dá)到高效處理的目的。

          1. 關(guān)于fork/join的一點想法

            以上說法,也許大家沒什么感覺。但換個說法可能會更讓人體會深切。總體上,相當(dāng)于一個map階段數(shù)據(jù)拆分,一個reduce階段數(shù)據(jù)收集。即一個mapreduce過程,是不是有大數(shù)據(jù)的思想在了。只不過這fork/join的拆分難度可見性更大(自己手動拆,mapreduce由shuffle組件自動拆),另外fork/join是在一個機(jī)器上運(yùn)行,而大數(shù)據(jù)的框架,則是在分布式系統(tǒng)中運(yùn)行的。

            從這個點說來,好像研究fork/join就顯得有些意義了。

            只是,按照fork/join的語義解釋,是將任務(wù)拆分,然后處理,然后再合并結(jié)果。如果沒有了合并結(jié)果這一步,那么,它就等同于線程池了,這也就是有人說它與線程池有啥差別的疑惑所在了。再說有需要收集結(jié)果的這一語義,其實我們也是可以通過線程池去執(zhí)行任務(wù),然后再用get()得到結(jié)果,然后在外部做合并,也是一樣咯。

          2. fork/join的幾個核心類

            fork/join被稱作執(zhí)行框架,自然不會是一個單一組件問題了。

            首先,它會有一個 ForkJoinPool, 相當(dāng)于線程池, 所有的任務(wù)都要通過它來進(jìn)行提交,然后由其進(jìn)行統(tǒng)一調(diào)度。

            然后,每個任務(wù)都會有許多相同的代碼,只有業(yè)務(wù)實現(xiàn)是不一樣的,所以它會有一個基類:RecursiveTask . 實現(xiàn)上還有一個無返回結(jié)果的類:RecursiveAction, 只是沒有返回結(jié)果時,往往又可能可以使用普通線程池執(zhí)行替代了。(沒有絕對)

            ForkJoinWorkerThreadFactory, 是fork/join框架的線程工廠類,原本含義與普通的線程工廠類一致,只是它的入?yún)⒉辉偈且粋€個 Runnable 任務(wù),而是 ForkJoinPool, 因為它們所處的上下文是不一樣的。

            ForkJoinWorkerThread, 執(zhí)行fork/join的具體線程,它可能在執(zhí)行過程中,再去主動添加task。而它自身擁有一個隊列,它的主要任務(wù)就是獲取隊列任務(wù),然后執(zhí)行。但當(dāng)其自身的隊列完成時,它可以通過work-steal算法竊取其他線程的隊列任務(wù)。這也是fork/join的核心所在。

            sun.misc.Unsafe, 之所以要提到這個jdk類,是因為在fork/join框架中,對于隊列的管理,不是通過普通的list或數(shù)組來實現(xiàn),而是通過 U.putOrderedObject(a, j, task); 來存放,雖然效果與數(shù)組是一樣的,但它會更簡單地實現(xiàn)線程安全的操作。只是,其中有許多的位操作,值得學(xué)習(xí)的同時,也顯得有些麻煩了。

          3. fork/join使用樣例

            我們通過對一個數(shù)組的排序過程,使用fork/join來實現(xiàn)看看如何使用這框架。尤其對于大數(shù)組的排序,顯得還是有用的。這種大數(shù)組的排序,一般都會使用快速排序或者歸并排序來處理。此處使用fork/join框架來處理,也是暗合了歸并排序的道理了。

          import java.util.Arrays;
          import java.util.Random;
          import java.util.concurrent.ExecutionException;
          import java.util.concurrent.ForkJoinPool;
          import java.util.concurrent.ForkJoinTask;
          import java.util.concurrent.RecursiveTask;

          /**
           * Fork/join框架測試
           */
          public class TestForkJoinFramework {

              public static void main(String[] args) {
                  long beginTime = System.currentTimeMillis();
                  ForkJoinPool pool = new ForkJoinPool();
                  int mockArrLen = 1000_0000;
                  int[] arr = new int[mockArrLen];
                  Random r = new Random();
                  for (int index = 1; index <= mockArrLen; index++) {
                      arr[index - 1] = r.nextInt(1000_0000);
                  }
                  FJOrderTask task = new FJOrderTask(arr);
                  ForkJoinTask<int[]> taskResult = pool.submit(task);
                  try {
                      // 等待結(jié)果完成
                      taskResult.get();
                  } catch (InterruptedException | ExecutionException e) {
                      e.printStackTrace();
                  }
                  long endTime = System.currentTimeMillis();
                  System.out.println("耗時=" + (endTime - beginTime));
              }

              /**
               * 單個排序的子任務(wù)
               */
              private static class FJOrderTask extends RecursiveTask<int[]> {

                  /**
                   * 當(dāng)前排序的數(shù)組值
                   */
                  private final int[] source;

                  public FJOrderTask(int[] source) {
                      this.source = source;
                  }

                  /**
                   * 真正的業(yè)務(wù)計算邏輯
                   *
                   * @see java.util.concurrent.RecursiveTask#compute()
                   */
                  @Override
                  protected int[] compute() {
                      int sourceLen = source.length;
                      // 如果條件成立,說明任務(wù)中要進(jìn)行排序的集合還不夠小
                      System.out.println(Thread.currentThread());
                      if (sourceLen > 2) {
                          int midIndex = sourceLen / 2;
                          // 拆分成兩個子任務(wù), 0 -> mid - 1, mid -> len
                          FJOrderTask task1 = new FJOrderTask(
                                  Arrays.copyOf(source, midIndex));
                          task1.fork();
                          FJOrderTask task2 = new FJOrderTask(
                                  Arrays.copyOfRange(source, midIndex, sourceLen));
                          task2.fork();
                          // 將兩個有序的數(shù)組,合并成一個有序的數(shù)組
                          int[] result1 = task1.join();
                          int[] result2 = task2.join();
                          return insertMerge(result1, result2);
                      }
                      // 否則說明集合中只有一個或者兩個元素,可以進(jìn)行這兩個元素的比較排序了
                      else {
                          // 如果條件成立,說明數(shù)組中只有一個元素,或者是數(shù)組中的元素都已經(jīng)排列好位置了
                          if (sourceLen == 1
                                  || source[0] <= source[1]) {
                              return source;
                          } else {
                              int[] orderedArr = new int[sourceLen];
                              orderedArr[0] = source[1];
                              orderedArr[1] = source[0];
                              return orderedArr;
                          }
                      }
                  }

                  /**
                   * 使用插入排序,將兩個有序數(shù)組合并起來
                   *
                   * @param arr1 有序數(shù)組1
                   * @param arr2 有序數(shù)組2
                   * @return 合并后的有序數(shù)組
                   */
                  private int[] insertMerge(int[] arr1, int[] arr2) {
                      int[] result = new int[arr1.length + arr2.length];
                      int arr1Len = arr1.length;
                      int arr2Len = arr2.length;
                      int destLen = result.length;
                      // 簡單插入排序
                      for (int i = 0, array1Index = 0, array2Index = 0; i < destLen; i++) {
                          int value1 = array1Index >= arr1Len
                                  ? Integer.MAX_VALUE : arr1[array1Index];
                          int value2 = array2Index >= arr2Len
                                  ? Integer.MAX_VALUE : arr2[array2Index];
                          if (value1 < value2) {
                              array1Index++;
                              result[i] = value1;
                          }
                          else {
                              array2Index++;
                              result[i] = value2;
                          }
                      }
                      return result;
                  }

              }
          }

            思路很簡單,就是將數(shù)組一直拆分,直到最后一個或者兩個時,從最下面來開始排序,然后依次往上回溯,使用插入排序歸并結(jié)果集,最終返回排好序的值。如果除去任務(wù)拆分的過程,則時間復(fù)雜度還是非常好的 O(nlog(n)), 只是這任務(wù)拆分的過程,需要大量的空間復(fù)雜度,也不見得是什么好事。且不管它。

           

          4. fork/join框架的實現(xiàn)原理


            我們以上面的demo為出發(fā)點,觀察fork/join的工作過程,不知道100%,也八九不離十了。上面主要有幾個動作,一ForkJoinPool實例化,submit一個Task, get()等待最終結(jié)果完成。這三個看得見的動作好辦,只是其核心也許還在背后。

           

          4.1. ForkJoinPool構(gòu)造器

            每個要調(diào)用框架的應(yīng)用,必先初始化一個pool實例,這是自然。如上使用無參構(gòu)造器,實際上是使用了框架的各種默認(rèn)值而已, 這種默認(rèn)值往往是能夠滿足大部分的場景的,從而體現(xiàn)其易用性。

          // java.util.concurrent.ForkJoinPool#ForkJoinPool()
              /**
               * Creates a {@code ForkJoinPool} with parallelism equal to {@link
               * java.lang.Runtime#availableProcessors}, using the {@linkplain
               * #defaultForkJoinWorkerThreadFactory default thread factory},
               * no UncaughtExceptionHandler, and non-async LIFO processing mode.
               *
               * @throws SecurityException if a security manager exists and
               *         the caller is not permitted to modify threads
               *         because it does not hold {@link
               *         java.lang.RuntimePermission}{@code ("modifyThread")}
               */
              public ForkJoinPool() {
                  // 并行度默認(rèn)是cpu的核數(shù)
                  this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
                       defaultForkJoinWorkerThreadFactory, null, false);
              }
              /**
               * Creates a {@code ForkJoinPool} with the given parameters.
               *
               * @param parallelism the parallelism level. For default value,
               * use {@link java.lang.Runtime#availableProcessors}.
               * @param factory the factory for creating new threads. For default value,
               * use {@link #defaultForkJoinWorkerThreadFactory}.
               * @param handler the handler for internal worker threads that
               * terminate due to unrecoverable errors encountered while executing
               * tasks. For default value, use {@code null}.
               * @param asyncMode if true,
               * establishes local first-in-first-out scheduling mode for forked
               * tasks that are never joined. This mode may be more appropriate
               * than default locally stack-based mode in applications in which
               * worker threads only process event-style asynchronous tasks.
               * For default value, use {@code false}.
               * @throws IllegalArgumentException if parallelism less than or
               *         equal to zero, or greater than implementation limit
               * @throws NullPointerException if the factory is null
               * @throws SecurityException if a security manager exists and
               *         the caller is not permitted to modify threads
               *         because it does not hold {@link
               *         java.lang.RuntimePermission}{@code ("modifyThread")}
               */
              public ForkJoinPool(int parallelism,
                                  ForkJoinWorkerThreadFactory factory,
                                  UncaughtExceptionHandler handler,
                                  boolean asyncMode) {
                  this(checkParallelism(parallelism),
                       checkFactory(factory),
                       handler,
                       // FIFO_QUEUE = 1 << 16, LIFO_QUEUE = 0
                       asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
                       "ForkJoinPool-" + nextPoolId() + "-worker-");
                  checkPermission();
              }
              /**
               * Creates a {@code ForkJoinPool} with the given parameters, without
               * any security checks or parameter validation.  Invoked directly by
               * makeCommonPool.
               */
              private ForkJoinPool(int parallelism,
                                   ForkJoinWorkerThreadFactory factory,
                                   UncaughtExceptionHandler handler,
                                   int mode,
                                   String workerNamePrefix) {
                  this.workerNamePrefix = workerNamePrefix;
                  this.factory = factory;
                  this.ueh = handler;
                  this.config = (parallelism & SMASK) | mode;
                  long np = (long)(-parallelism); // offset ctl counts
                  this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
              }

            構(gòu)造器自然沒啥好說的,就是設(shè)置幾個并行度,初始化線程工廠,標(biāo)識等等。為下文做準(zhǔn)備。

           

          4.2. 任務(wù)submit過程


            上面的例子中,submit只有一次調(diào)用,而實際應(yīng)用中則不一定。但即使如此,一次submit, 其實背后也是有許多的動作的。因為這一個task里,又會生出許多task來。

          // java.util.concurrent.ForkJoinPool#submit
              /**
               * Submits a ForkJoinTask for execution.
               *
               * @param task the task to submit
               * @param <T> the type of the task's result
               * @return the task
               * @throws NullPointerException if the task is null
               * @throws RejectedExecutionException if the task cannot be
               *         scheduled for execution
               */
              public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
                  if (task == null)
                      throw new NullPointerException();
                  // submit主要是向pool中加入任務(wù)隊列
                  externalPush(task);
                  return task;
              }
              /**
               * Tries to add the given task to a submission queue at
               * submitter'
          s current queue. Only the (vastly) most common path
               * is directly handled in this method, while screening for need
               * for externalSubmit.
               *
               * @param task the task. Caller must ensure non-null.
               */
              final void externalPush(ForkJoinTask<?> task) {
                  WorkQueue[] ws; WorkQueue q; int m;
                  int r = ThreadLocalRandom.getProbe();
                  int rs = runState;
                  // 如果線程不是第一次進(jìn)入,且獲得鎖,則直接放隊列即可
                  // 否則走普通加入隊列邏輯
                  if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
                      (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
                      U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                      ForkJoinTask<?>[] a; int am, n, s;
                      if ((a = q.array) != null &&
                          (am = a.length - 1) > (n = (s = q.top) - q.base)) {
                          int j = ((am & s) << ASHIFT) + ABASE;
                          // 通過 putOrderedObject 添加任務(wù)到隊列中
                          U.putOrderedObject(a, j, task);
                          U.putOrderedInt(q, QTOP, s + 1);
                          U.putIntVolatile(q, QLOCK, 0);
                          if (n <= 1)
                              signalWork(ws, q);
                          return;
                      }
                      U.compareAndSwapInt(q, QLOCK, 1, 0);
                  }
                  // 初始化時的submit或者通用 submit
                  externalSubmit(task);
              }
              
              /**
               * Full version of externalPush, handling uncommon cases, as well
               * as performing secondary initialization upon the first
               * submission of the first task to the pool.  It also detects
               * first submission by an external thread and creates a new shared
               * queue if the one at index if empty or contended.
               *
               * @param task the task. Caller must ensure non-null.
               */
              private void externalSubmit(ForkJoinTask<?> task) {
                  int r;                                    // initialize caller's probe
                  if ((r = ThreadLocalRandom.getProbe()) == 0) {
                      ThreadLocalRandom.localInit();
                      r = ThreadLocalRandom.getProbe();
                  }
                  for (;;) {
                      WorkQueue[] ws; WorkQueue q; int rs, m, k;
                      boolean move = false;
                      // 停止運(yùn)行
                      if ((rs = runState) < 0) {
                          tryTerminate(false, false);     // help terminate
                          throw new RejectedExecutionException();
                      }
                      // 未被初始化,先執(zhí)行初始化
                      else if ((rs & STARTED) == 0 ||     // initialize
                               ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
                          int ns = 0;
                          // 上鎖初始化
                          rs = lockRunState();
                          try {
                              if ((rs & STARTED) == 0) {
                                  U.compareAndSwapObject(this, STEALCOUNTER, null,
                                                         new AtomicLong());
                                  // create workQueues array with size a power of two
                                  int p = config & SMASK; // ensure at least 2 slots
                                  int n = (p > 1) ? p - 1 : 1;
                                  n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                                  n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                                  // 隊列數(shù)量初始化
                                  workQueues = new WorkQueue[n];
                                  ns = STARTED;
                              }
                          } finally {
                              unlockRunState(rs, (rs & ~RSLOCK) | ns);
                          }
                      }
                      // 當(dāng)前線程已添加過隊列
                      else if ((q = ws[k = r & m & SQMASK]) != null) {
                          // 上鎖添加到隊列中
                          if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                              ForkJoinTask<?>[] a = q.array;
                              // 取出棧頂指針,向其中放入任務(wù)
                              int s = q.top;
                              boolean submitted = false; // initial submission or resizing
                              try {                      // locked version of push
                                  if ((a != null && a.length > s + 1 - q.base) ||
                                      (a = q.growArray()) != null) {
                                      int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                                      U.putOrderedObject(a, j, task);
                                      U.putOrderedInt(q, QTOP, s + 1);
                                      submitted = true;
                                  }
                              } finally {
                                  U.compareAndSwapInt(q, QLOCK, 1, 0);
                              }
                              // 如果隊列添加成功,則喚醒一個 worker, 返回
                              // 否則進(jìn)入下一次嘗試添加過程
                              if (submitted) {
                                  signalWork(ws, q);
                                  return;
                              }
                          }
                          move = true;                   // move on failure
                      }
                      else if (((rs = runState) & RSLOCK) == 0) { // create new queue
                          q = new WorkQueue(this, null);
                          q.hint = r;
                          q.config = k | SHARED_QUEUE;
                          q.scanState = INACTIVE;
                          rs = lockRunState();           // publish index
                          if (rs > 0 &&  (ws = workQueues) != null &&
                              k < ws.length && ws[k] == null)
                              ws[k] = q;                 // else terminated
                          unlockRunState(rs, rs & ~RSLOCK);
                      }
                      else
                          move = true;                   // move if busy
                      // 如有必要,為當(dāng)前線程生成新的標(biāo)識
                      if (move)
                          r = ThreadLocalRandom.advanceProbe(r);
                  }
              }

            由上可知,submit主要初始化隊列以及向隊列中添加任務(wù),并在喚醒worker處理任務(wù)。但實際上,worker Thread 我們還沒有看到被激活,只是看到有隊workQueue的初始化。那么,worker又是在哪進(jìn)行初始化的呢?只可能是在 signal 的時候了。

           

          4.3. worker的初始化

            worker是真正執(zhí)行任務(wù)的線程,前面光看到添加隊列,以及喚醒worker了。只是這時還未見worker被初始化,實際上它是在被喚醒的邏輯中進(jìn)行初始化的。

          // java.util.concurrent.ForkJoinPool#signalWork
              /**
               * Tries to create or activate a worker if too few are active.
               *
               * @param ws the worker array to use to find signallees
               * @param q a WorkQueue --if non-null, don't retry if now empty
               */
              final void signalWork(WorkQueue[] ws, WorkQueue q) {
                  long c; int sp, i; WorkQueue v; Thread p;
                  while ((c = ctl) < 0L) {                       // too few active,一個標(biāo)識,分兩段使用,低位為0代表worker還可以添加
                      if ((sp = (int)c) == 0) {                  // no idle workers
                          if ((c & ADD_WORKER) != 0L)            // too few workers
                              tryAddWorker(c);
                          break;
                      }
                      if (ws == null)                            // unstarted/terminated
                          break;
                      if (ws.length <= (i = sp & SMASK))         // terminated
                          break;
                      if ((v = ws[i]) == null)                   // terminating
                          break;
                      int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
                      int d = sp - v.scanState;                  // screen CAS
                      long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
                      if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
                          v.scanState = vs;                      // activate v
                          if ((p = v.parker) != null)
                              U.unpark(p);
                          break;
                      }
                      if (q != null && q.base == q.top)          // no more work
                          break;
                  }
              }

              /**
               * Tries to add one worker, incrementing ctl counts before doing
               * so, relying on createWorker to back out on failure.
               *
               * @param c incoming ctl value, with total count negative and no
               * idle workers.  On CAS failure, c is refreshed and retried if
               * this holds (otherwise, a new worker is not needed).
               */
              private void tryAddWorker(long c) {
                  boolean add = false;
                  do {
                      long nc = ((AC_MASK & (c + AC_UNIT)) |
                                 (TC_MASK & (c + TC_UNIT)));
                      if (ctl == c) {
                          int rs, stop;                 // check if terminating
                          if ((stop = (rs = lockRunState()) & STOP) == 0)
                              add = U.compareAndSwapLong(this, CTL, c, nc);
                          unlockRunState(rs, rs & ~RSLOCK);
                          if (stop != 0)
                              break;
                          // 添加標(biāo)識成功,再創(chuàng)建worker
                          if (add) {
                              createWorker();
                              break;
                          }
                      }
                  } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
              }

              /**
               * Tries to construct and start one worker. Assumes that total
               * count has already been incremented as a reservation.  Invokes
               * deregisterWorker on any failure.
               *
               * @return true if successful
               */
              private boolean createWorker() {
                  ForkJoinWorkerThreadFactory fac = factory;
                  Throwable ex = null;
                  ForkJoinWorkerThread wt = null;
                  try {
                      // 調(diào)用線程工廠創(chuàng)建新的worker, 并立即啟動worker
                      if (fac != null && (wt = fac.newThread(this)) != null) {
                          wt.start();
                          return true;
                      }
                  } catch (Throwable rex) {
                      ex = rex;
                  }
                  // 創(chuàng)建失敗,處理異常
                  deregisterWorker(wt, ex);
                  return false;
              }
              /**
               * Default ForkJoinWorkerThreadFactory implementation; creates a
               * new ForkJoinWorkerThread.
               */
              static final class DefaultForkJoinWorkerThreadFactory
                  implements ForkJoinWorkerThreadFactory {
                  public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
                      return new ForkJoinWorkerThread(pool);
                  }
              }

            果然在signal時,創(chuàng)建worker。值得一提的,為了實現(xiàn)安全地添加worker,它會先更新成功ctl,然后再執(zhí)行真正的create操作。避免多創(chuàng)建出worker來。

           

          4.4. worker的工作原理

            前面看到worker創(chuàng)建過程,傳入了pool的實例,即當(dāng)前上下文都是被worker可見的。所以,它能很好地復(fù)用當(dāng)前的配置信息,而它自身是一個異步線程,在創(chuàng)建之后,立即被啟動起來了。那它后續(xù)則必然嘗試從隊列獲取任務(wù),進(jìn)行執(zhí)行了。具體如何?

          1. WorkerThread 構(gòu)造方法

          // java.util.concurrent.ForkJoinWorkerThread#ForkJoinWorkerThread
              /**
               * Creates a ForkJoinWorkerThread operating in the given pool.
               *
               * @param pool the pool this thread works in
               * @throws NullPointerException if pool is null
               */
              protected ForkJoinWorkerThread(ForkJoinPool pool) {
                  // Use a placeholder until a useful name can be set in registerWorker
                  super("aForkJoinWorkerThread");
                  this.pool = pool;
                  // workQueue 臨時向 pool 中進(jìn)行注冊所得
                  this.workQueue = pool.registerWorker(this);
              }
              
              /**
               * Callback from ForkJoinWorkerThread constructor to establish and
               * record its WorkQueue.
               *
               * @param wt the worker thread
               * @return the worker's queue
               */
              final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
                  UncaughtExceptionHandler handler;
                  wt.setDaemon(true);                           // configure thread
                  if ((handler = ueh) != null)
                      wt.setUncaughtExceptionHandler(handler);
                  WorkQueue w = new WorkQueue(this, wt);
                  int i = 0;                                    // assign a pool index
                  int mode = config & MODE_MASK;
                  int rs = lockRunState();
                  try {
                      WorkQueue[] ws; int n;                    // skip if no array
                      if ((ws = workQueues) != null && (n = ws.length) > 0) {
                          int s = indexSeed += SEED_INCREMENT;  // unlikely to collide
                          int m = n - 1;
                          i = ((s << 1) | 1) & m;               // odd-numbered indices
                          if (ws[i] != null) {                  // collision
                              int probes = 0;                   // step by approx half n
                              int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                              while (ws[i = (i + step) & m] != null) {
                                  if (++probes >= n) {
                                      workQueues = ws = Arrays.copyOf(ws, n <<= 1);
                                      m = n - 1;
                                      probes = 0;
                                  }
                              }
                          }
                          w.hint = s;                           // use as random seed
                          w.config = i | mode;
                          w.scanState = i;                      // publication fence
                          ws[i] = w;
                      }
                  } finally {
                      unlockRunState(rs, rs & ~RSLOCK);
                  }
                  wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
                  return w;
              }

            重點則是在 pool 中注冊自身,得到一個 workQueue. 而其具體業(yè)務(wù),則是在run方法中實現(xiàn)。

          // java.util.concurrent.ForkJoinWorkerThread#run
              /**
               * This method is required to be public, but should never be
               * called explicitly. It performs the main run loop to execute
               * {@link ForkJoinTask}s.
               */
              public void run() {
                  if (workQueue.array == null) { // only run once
                      Throwable exception = null;
                      try {
                          onStart();
                          pool.runWorker(workQueue);
                      } catch (Throwable ex) {
                          exception = ex;
                      } finally {
                          try {
                              onTermination(exception);
                          } catch (Throwable ex) {
                              if (exception == null)
                                  exception = ex;
                          } finally {
                              pool.deregisterWorker(this, exception);
                          }
                      }
                  }
              }
              // java.util.concurrent.ForkJoinPool#runWorker
              /**
               * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
               */
              final void runWorker(WorkQueue w) {
                  w.growArray();                   // allocate queue
                  int seed = w.hint;               // initially holds randomization hint
                  int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShift
                  for (ForkJoinTask<?> t;;) {
                      // 取任務(wù),執(zhí)行
                      if ((t = scan(w, r)) != null)
                          w.runTask(t);
                      else if (!awaitWork(w, r))
                          break;
                      r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
                  }
              }

                  /**
                   * Executes the given task and any remaining local tasks.
                   */
                  final void runTask(ForkJoinTask<?> task) {
                      if (task != null) {
                          scanState &= ~SCANNING; // mark as busy
                          (currentSteal = task).doExec();
                          U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
                          execLocalTasks();
                          ForkJoinWorkerThread thread = owner;
                          if (++nsteals < 0)      // collect on overflow
                              transferStealCount(pool);
                          scanState |= SCANNING;
                          if (thread != null)
                              thread.afterTopLevelExec();
                      }
                  }
              // java.util.concurrent.ForkJoinTask#doExec
              /**
               * Primary execution method for stolen tasks. Unless done, calls
               * exec and records status if completed, but doesn't wait for
               * completion otherwise.
               *
               * @return status on exit from this method
               */
              final int doExec() {
                  int s; boolean completed;
                  if ((s = status) >= 0) {
                      try {
                          completed = exec();
                      } catch (Throwable rex) {
                          return setExceptionalCompletion(rex);
                      }
                      if (completed)
                          s = setCompletion(NORMAL);
                  }
                  return s;
              }
              // java.util.concurrent.RecursiveTask#exec
              /**
               * Implements execution conventions for RecursiveTask.
               */
              protected final boolean exec() {
                  // 即調(diào)用具體業(yè)務(wù)類的 compute 方法
                  result = compute();
                  return true;
              }

            咱們草草看了 worker 如何運(yùn)行任務(wù)。這和線程池沒多少差別,大致仍是從隊列獲取任務(wù),然后執(zhí)行業(yè)務(wù)方法compute . 我們暫時略去了如何獲取任務(wù),以及如何執(zhí)行work-steal了。且看下節(jié)。

           

          4.5. 任務(wù)獲取實現(xiàn)

            主要是通過scan處理。

          // java.util.concurrent.ForkJoinPool#scan
              /**
               * Scans for and tries to steal a top-level task. Scans start at a
               * random location, randomly moving on apparent contention,
               * otherwise continuing linearly until reaching two consecutive
               * empty passes over all queues with the same checksum (summing
               * each base index of each queue, that moves on each steal), at
               * which point the worker tries to inactivate and then re-scans,
               * attempting to re-activate (itself or some other worker) if
               * finding a task; otherwise returning null to await work.  Scans
               * otherwise touch as little memory as possible, to reduce
               * disruption on other scanning threads.
               *
               * @param w the worker (via its WorkQueue)
               * @param r a random seed
               * @return a task, or null if none found
               */
              private ForkJoinTask<?> scan(WorkQueue w, int r) {
                  WorkQueue[] ws; int m;
                  if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
                      int ss = w.scanState;                     // initially non-negative
                      for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
                          WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
                          int b, n; long c;
                          // 首次獲取時,是從自身隊列中獲取
                          if ((q = ws[k]) != null) {
                              if ((n = (b = q.base) - q.top) < 0 &&
                                  (a = q.array) != null) {      // non-empty
                                  long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                                  if ((t = ((ForkJoinTask<?>)
                                            U.getObjectVolatile(a, i))) != null &&
                                      q.base == b) {
                                      if (ss >= 0) {
                                          if (U.compareAndSwapObject(a, i, t, null)) {
                                              q.base = b + 1;
                                              if (n < -1)       // signal others
                                                  signalWork(ws, q);
                                              return t;
                                          }
                                      }
                                      else if (oldSum == 0 &&   // try to activate
                                               w.scanState < 0)
                                          tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
                                  }
                                  if (ss < 0)                   // refresh
                                      ss = w.scanState;
                                  r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
                                  origin = k = r & m;           // move and rescan
                                  oldSum = checkSum = 0;
                                  continue;
                              }
                              checkSum += b;
                          }
                          if ((k = (k + 1) & m) == origin) {    // continue until stable
                              if ((ss >= 0 || (ss == (ss = w.scanState))) &&
                                  oldSum == (oldSum = checkSum)) {
                                  if (ss < 0 || w.qlock < 0)    // already inactive
                                      break;
                                  int ns = ss | INACTIVE;       // try to inactivate
                                  long nc = ((SP_MASK & ns) |
                                             (UC_MASK & ((c = ctl) - AC_UNIT)));
                                  w.stackPred = (int)c;         // hold prev stack top
                                  U.putInt(w, QSCANSTATE, ns);
                                  if (U.compareAndSwapLong(this, CTL, c, nc))
                                      ss = ns;
                                  else
                                      w.scanState = ss;         // back out
                              }
                              checkSum = 0;
                          }
                      }
                  }
                  return null;
              }

            要安全高效地實現(xiàn)一個獲取隊列還是不易啊。

           

          4.6. task.fork 實現(xiàn)

            一般地,能用上fork一詞的場景,一般是對于當(dāng)前環(huán)境的一個copy. 難道這里的fork也是這樣嗎?新開一個線程?不然又是如何找到需要處理的隊列的呢?

          // java.util.concurrent.ForkJoinTask#fork
              /**
               * Arranges to asynchronously execute this task in the pool the
               * current task is running inif applicable, or using the {@link
               * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}.  While
               * it is not necessarily enforced, it is a usage error to fork a
               * task more than once unless it has completed and been
               * reinitialized.  Subsequent modifications to the state of this
               * task or any data it operates on are not necessarily
               * consistently observable by any thread other than the one
               * executing it unless preceded by a call to {@link #join} or
               * related methods, or a call to {@link #isDone} returning {@code
               * true}.
               *
               * @return {@code this}, to simplify usage
               */
              public final ForkJoinTask<V> fork() {
                  Thread t;
                  // ForkJoinWorkerThread 中持有workQueue實例,可直接向其添加任務(wù)
                  if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
                      ((ForkJoinWorkerThread)t).workQueue.push(this);
                  else
                      // 如果是外部線程,則添加到一共享pool中即可,后續(xù)將其各空閑線程處理
                      ForkJoinPool.common.externalPush(this);
                  return this;
              }
                  // java.util.concurrent.ForkJoinPool.WorkQueue#push
                  /**
                   * Pushes a task. Call only by owner in unshared queues.  (The
                   * shared-queue version is embedded in method externalPush.)
                   *
                   * @param task the task. Caller must ensure non-null.
                   * @throws RejectedExecutionException if array cannot be resized
                   */
                  final void push(ForkJoinTask<?> task) {
                      ForkJoinTask<?>[] a; ForkJoinPool p;
                      int b = base, s = top, n;
                      if ((a = array) != null) {    // ignore if queue removed
                          int m = a.length - 1;     // fenced write for task visibility
                          U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
                          U.putOrderedInt(this, QTOP, s + 1);
                          if ((n = s - b) <= 1) {
                              if ((p = pool) != null)
                                  p.signalWork(p.workQueues, this);
                          }
                          else if (n >= m)
                              growArray();
                      }
                  }

          /**
           * A thread managed by a {@link ForkJoinPool}, which executes
           * {@link ForkJoinTask}s.
           * This class is subclassable solely for the sake of adding
           * functionality -- there are no overridable methods dealing with
           * scheduling or execution.  However, you can override initialization
           * and termination methods surrounding the main task processing loop.
           * If you do create such a subclass, you will also need to supply a
           * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to
           * {@linkplain ForkJoinPool#ForkJoinPool use it} in a {@code ForkJoinPool}.
           *
           * @since 1.7
           * @author Doug Lea
           */
          public class ForkJoinWorkerThread extends Thread {
              /*
               * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
               * ForkJoinTasks. For explanation, see the internal documentation
               * of class ForkJoinPool.
               *
               * This class just maintains links to its pool and WorkQueue.  The
               * pool field is set immediately upon construction, but the
               * workQueue field is not set until a call to registerWorker
               * completes. This leads to a visibility race, that is tolerated
               * by requiring that the workQueue field is only accessed by the
               * owning thread.
               *
               * Support for (non-public) subclass InnocuousForkJoinWorkerThread
               * requires that we break quite a lot of encapsulation (via Unsafe)
               * both here and in the subclass to access and set Thread fields.
               */

              final ForkJoinPool pool;                // the pool this thread works in
              final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
              ...
          }

            可見,fork的過程,即是向當(dāng)前線程中添加當(dāng)前任務(wù)而已,并沒有所謂的上下文copy過程。

           

          4.7. task.join 實現(xiàn)

            join的語義是,等待任務(wù)完成后返回。與 Thread.join()一致。只是有一個問題,即如果某個線程阻塞等待結(jié)果去了,那當(dāng)前線程自然就相當(dāng)于無法再被利用了。那后續(xù)的任務(wù)又何從談起呢?想來只有遞歸能夠解決這個問題了。但是遞歸往往又是在單線程中完成的,這豈不無法利用并發(fā)特性了?

            實際上,之所以被分作fork/join兩個步驟,意義就是在這。上一節(jié)我們看到,fork的過程是向隊列中添加了任務(wù),隨后就返回了。這時,如果當(dāng)前worker比較繁忙(在做任務(wù)拆分),則這些任務(wù)就會被其他worker竊取過去處理了。而其他任務(wù)在處理時,又會遇到自己的遞歸,從而將一個單線程的遞歸變?yōu)槎嗑€程的遞歸了。

            下面我們主要看一個線程的遞歸過程。join的本義只是等待當(dāng)前任務(wù)完成,但是當(dāng)前任務(wù)完成又要依賴于其子任務(wù)完成join, 子任務(wù)又要等待其子任務(wù)join, 因此形成遞歸。而join()返回的表象是compute()完成,所以這過程其實是伴隨著compute的運(yùn)算的。

          // java.util.concurrent.ForkJoinTask#join
              /**
               * Returns the result of the computation when it {@link #isDone is
               * done}.  This method differs from {@link #get()} in that
               * abnormal completion results in {@code RuntimeException} or
               * {@code Error}, not {@code ExecutionException}, and that
               * interrupts of the calling thread do <em>not</em> cause the
               * method to abruptly return by throwing {@code
               * InterruptedException}.
               *
               * @return the computed result
               */
              public final V join() {
                  int s;
                  if ((s = doJoin() & DONE_MASK) != NORMAL)
                      reportException(s);
                  // 任務(wù)完成后,主動獲取結(jié)果
                  return getRawResult();
              }
              /**
               * Throws exception, if any, associated with the given status.
               */
              private void reportException(int s) {
                  if (s == CANCELLED)
                      throw new CancellationException();
                  if (s == EXCEPTIONAL)
                      rethrow(getThrowableException());
              }
              // java.util.concurrent.RecursiveTask#getRawResult
              public final V getRawResult() {
                  return result;
              }


              /**
               * Implementation for join, get, quietlyJoin. Directly handles
               * only cases of already-completed, external wait, and
               * unfork+exec.  Others are relayed to ForkJoinPool.awaitJoin.
               *
               * @return status upon completion
               */
              private int doJoin() {
                  int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
                  return (s = status) < 0 ? s :
                      ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
                      // 取當(dāng)前任務(wù)執(zhí)行, doExec 執(zhí)行任務(wù),awaitJoin 等待執(zhí)行完成
                      (w = (wt = (ForkJoinWorkerThread)t).workQueue).
                      tryUnpush(this) && (s = doExec()) < 0 ? s :
                      wt.pool.awaitJoin(w, this, 0L) :
                      externalAwaitDone();
              }

              // java.util.concurrent.ForkJoinPool#awaitJoin
              /**
               * Helps and/or blocks until the given task is done or timeout.
               *
               * @param w caller
               * @param task the task
               * @param deadline for timed waits, if nonzero
               * @return task status on exit
               */
              final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
                  int s = 0;
                  if (task != null && w != null) {
                      ForkJoinTask<?> prevJoin = w.currentJoin;
                      U.putOrderedObject(w, QCURRENTJOIN, task);
                      CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
                          (CountedCompleter<?>)task : null;
                      for (;;) {
                          if ((s = task.status) < 0)
                              break;
                          if (cc != null)
                              helpComplete(w, cc, 0);
                          // 遞歸添加任務(wù)等待完成
                          else if (w.base == w.top || w.tryRemoveAndExec(task))
                              helpStealer(w, task);
                          if ((s = task.status) < 0)
                              break;
                          long ms, ns;
                          if (deadline == 0L)
                              ms = 0L;
                          else if ((ns = deadline - System.nanoTime()) <= 0L)
                              break;
                          else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                              ms = 1L;
                          if (tryCompensate(w)) {
                              task.internalWait(ms);
                              U.getAndAddLong(this, CTL, AC_UNIT);
                          }
                      }
                      U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
                  }
                  return s;
              }
                  // java.util.concurrent.ForkJoinPool.WorkQueue#tryRemoveAndExec
                  /**
                   * If present, removes from queue and executes the given task,
                   * or any other cancelled task. Used only by awaitJoin.
                   *
                   * @return true if queue empty and task not known to be done
                   */
                  final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
                      ForkJoinTask<?>[] a; int m, s, b, n;
                      if ((a = array) != null && (m = a.length - 1) >= 0 &&
                          task != null) {
                          while ((n = (s = top) - (b = base)) > 0) {
                              for (ForkJoinTask<?> t;;) {      // traverse from s to b
                                  long j = ((--s & m) << ASHIFT) + ABASE;
                                  if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
                                      return s + 1 == top;     // shorter than expected
                                  else if (t == task) {
                                      boolean removed = false;
                                      if (s + 1 == top) {      // pop
                                          if (U.compareAndSwapObject(a, j, task, null)) {
                                              U.putOrderedInt(this, QTOP, s);
                                              removed = true;
                                          }
                                      }
                                      else if (base == b)      // replace with proxy
                                          removed = U.compareAndSwapObject(
                                              a, j, task, new EmptyTask());
                                      // 執(zhí)行子任務(wù)
                                      if (removed)
                                          task.doExec();
                                      break;
                                  }
                                  else if (t.status < 0 && s + 1 == top) {
                                      if (U.compareAndSwapObject(a, j, t, null))
                                          U.putOrderedInt(this, QTOP, s);
                                      break;                  // was cancelled
                                  }
                                  if (--n == 0)
                                      return false;
                              }
                              if (task.status < 0)
                                  return false;
                          }
                      }
                      return true;
                  }

            可見,最終fork/join還是使用遞歸完成join任務(wù)等待。差別在于其利用了多線程的優(yōu)勢,同時執(zhí)行多個任務(wù)。這有兩個好處,一是減輕了單線程的任務(wù)處理壓力,二是讓遞歸的深度也分擔(dān)到了多個點上。避免了棧早早溢出的可能。

           

            只是每個線程被分配的任務(wù)數(shù)是多少,join需要等待的結(jié)果有多少,就不太好說了。比如最上層的線程如果任務(wù)被別的線程搶走,則它就只需一直在等結(jié)果就行了。而最下面的線程,則需要承擔(dān)最深的遞歸深度,以保證程序的最終出口。其實從這個點,我們自己可以做個猜想,如果沒有做好控制,讓線程之間任意執(zhí)行任務(wù),是否會造成死鎖呢?這恐怕是個問題。












          瀏覽 69
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日本黄色电影一区 | 亚洲欧美在线观看久99一区 | 靠逼网站在线看 | 人人干人人操人人模 | 日日操夜夜操天天操 |