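Notes on How fork/join Is Implemented
Ah, time to rehash an old topic again. fork/join is a concurrency package introduced in JDK 1.7. Its defining feature is that it splits a large task into several subtasks that run in parallel, then merges the subtask results into the final result and returns it. Work is thereby spread across multiple threads and processed efficiently.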
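1. Some thoughts on fork/join
That description may not mean much at first. Put another way, it may land better: there is a map phase that splits the data and a reduce phase that collects it. In other words, it is a MapReduce process, which already smells of big-data thinking. There are two differences. First, in fork/join the splitting is much more visible, because you write it by hand, whereas MapReduce splits its input for you. Second, fork/join runs on a single machine, while big-data frameworks run on distributed systems.
Seen from this angle, studying fork/join starts to look worthwhile.
By its own semantics, however, fork/join splits a task, processes the pieces, and then merges the results. Without the merge step it would be no different from a thread pool, which is exactly why some people ask how the two differ. Even when results do need to be collected, we could run the tasks on a thread pool, fetch each result with get(), and merge them outside. The effect is the same.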
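To make that comparison concrete, here is a minimal sketch of the "thread pool + get() + merge outside" approach. It assumes a simple array sum as the workload, and the class name ThreadPoolSplitMerge is hypothetical. The split is fixed up front and the merge happens in the calling thread. There is no recursive splitting and no work stealing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadPoolSplitMerge {
    // Splits the array into fixed chunks, sums each chunk on a plain thread pool,
    // then merges the partial sums in the calling thread via Future.get()
    static long sum(int[] data, int chunks) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(chunks);
        try {
            List<Future<Long>> parts = new ArrayList<>();
            int step = (data.length + chunks - 1) / chunks;
            for (int start = 0; start < data.length; start += step) {
                final int lo = start, hi = Math.min(start + step, data.length);
                parts.add(pool.submit(() -> {
                    long s = 0;
                    for (int i = lo; i < hi; i++) s += data[i];
                    return s;
                }));
            }
            long total = 0;
            for (Future<Long> f : parts) total += f.get(); // the "merge" happens outside the tasks
            return total;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        int[] data = new int[1000];
        for (int i = 0; i < data.length; i++) data[i] = i + 1;
        System.out.println(sum(data, 4)); // 500500
    }
}
```

The difference fork/join adds, covered later, is that subtasks can keep splitting themselves. Idle workers then steal the pieces, instead of the caller fixing the split in advance.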
2. The core classes of fork/join
fork/join is called an execution framework, so it naturally consists of more than one component.
First, there is ForkJoinPool, which plays the role of a thread pool. Every task is submitted through it, and it schedules them all.
Next, every task shares much of the same code and only the business logic differs, so there is a base class, RecursiveTask. There is also a variant that returns no result, RecursiveAction. When no result is needed, though, an ordinary thread pool can often be used instead (not always).
ForkJoinWorkerThreadFactory is the framework's thread factory. Its purpose is the same as an ordinary thread factory, except that its parameter is the ForkJoinPool rather than a Runnable task, because the two operate in different contexts.
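A small sketch of a custom factory, showing that it receives the pool rather than a Runnable. ForkJoinWorkerThread's constructor is protected, so a factory that customizes its threads typically subclasses it. The class names and the "sorter-" naming scheme are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedFactoryDemo {
    private static final AtomicInteger COUNTER = new AtomicInteger();

    // Subclass needed because ForkJoinWorkerThread(ForkJoinPool) is protected
    static final class NamedWorker extends ForkJoinWorkerThread {
        NamedWorker(ForkJoinPool pool) {
            super(pool);
            setName("sorter-" + COUNTER.getAndIncrement()); // hypothetical naming scheme
        }
    }

    // The factory's single method is newThread(ForkJoinPool), not newThread(Runnable)
    static final ForkJoinPool.ForkJoinWorkerThreadFactory FACTORY = NamedWorker::new;

    // Runs one task in a pool built with the custom factory and reports the worker's name
    static String workerName() {
        ForkJoinPool pool = new ForkJoinPool(2, FACTORY, null, false);
        try {
            return pool.invoke(new RecursiveTask<String>() {
                @Override
                protected String compute() {
                    return Thread.currentThread().getName();
                }
            });
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(workerName()); // e.g. sorter-0
    }
}
```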
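ForkJoinWorkerThread is the thread that actually executes fork/join work, and it may add new tasks itself while running. Each one owns a queue, and its main job is to take tasks from that queue and execute them. When its own queue runs dry, it can use the work-stealing algorithm to take tasks from other threads' queues. This is the heart of fork/join.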
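The owner/thief discipline can be modelled with a deliberately simplified, lock-based deque. SimpleWorkDeque is hypothetical and is not the JDK's lock-free WorkQueue. The owner pushes and pops at the top (LIFO), while thieves take from the base (FIFO).

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical, lock-based model of one worker's queue (the real WorkQueue is lock-free). */
public class SimpleWorkDeque<T> {
    private final Deque<T> deque = new ArrayDeque<>();

    // Owner: push a newly forked task on top
    public synchronized void push(T task) { deque.addLast(task); }

    // Owner: take the most recently pushed task
    public synchronized T pop() { return deque.pollLast(); }

    // Thief: take the oldest task from the base end
    public synchronized T steal() { return deque.pollFirst(); }

    public static void main(String[] args) {
        SimpleWorkDeque<String> q = new SimpleWorkDeque<>();
        q.push("task-1"); q.push("task-2"); q.push("task-3");
        System.out.println(q.steal()); // task-1 (a thief gets the oldest)
        System.out.println(q.pop());   // task-3 (the owner gets the newest)
        System.out.println(q.pop());   // task-2
        System.out.println(q.steal()); // null (empty)
    }
}
```

In recursive decomposition the oldest task is usually a larger, not-yet-split piece. Letting thieves take from the base therefore tends to hand them bigger chunks and keeps them off the owner's end of the queue.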
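sun.misc.Unsafe is worth mentioning because fork/join does not manage its queues through a List or ordinary array assignment. Each queue is backed by a plain array, but tasks are stored with U.putOrderedObject(a, j, task). That is an ordered (lazy) write that publishes the task to other threads without a lock or a full volatile write, which keeps the thread-safe operations simple. There is a lot of bit manipulation involved, though, which is worth learning from but also makes the code rather tedious to read.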
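sun.misc.Unsafe is internal API. In JDK 8 the public AtomicReferenceArray.lazySet and AtomicInteger.lazySet are implemented with the same ordered writes, so they give a way to sketch the pattern. The class OrderedSlots is hypothetical. It has a single owner, no stealing and no resizing, and uses a power-of-two array indexed with `s & (length - 1)` as in WorkQueue.push.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

/** Hypothetical single-owner slot array mirroring WorkQueue.push's ordered writes. */
public class OrderedSlots<T> {
    private final AtomicReferenceArray<T> array;
    private final AtomicInteger top = new AtomicInteger(); // next free logical index
    private final int mask;

    public OrderedSlots(int capacityPowerOfTwo) {
        if (Integer.bitCount(capacityPowerOfTwo) != 1)
            throw new IllegalArgumentException("capacity must be a power of two");
        array = new AtomicReferenceArray<>(capacityPowerOfTwo);
        mask = capacityPowerOfTwo - 1;
    }

    // Like push(): store into slot (s & mask), then publish the new top
    public void push(T task) {
        int s = top.get();
        if (s > mask) throw new IllegalStateException("full (the real push grows the array)");
        array.lazySet(s & mask, task); // counterpart of U.putOrderedObject(a, j, task)
        top.lazySet(s + 1);            // counterpart of U.putOrderedInt(q, QTOP, s + 1)
    }

    // Owner-only pop from the top (LIFO); returns null when empty
    public T pop() {
        int s = top.get() - 1;
        if (s < 0) return null;
        T t = array.getAndSet(s & mask, null);
        top.lazySet(s);
        return t;
    }

    public static void main(String[] args) {
        OrderedSlots<String> q = new OrderedSlots<>(4);
        q.push("a"); q.push("b");
        System.out.println(q.pop()); // b
        System.out.println(q.pop()); // a
    }
}
```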
3. A fork/join usage example
Let's see how to use the framework by sorting an array with fork/join. This is genuinely useful for large arrays, which are usually sorted with quicksort or merge sort. Using fork/join here follows the same idea as merge sort.
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/**
 * Fork/join framework test
 */
public class TestForkJoinFramework {
    public static void main(String[] args) {
        long beginTime = System.currentTimeMillis();
        ForkJoinPool pool = new ForkJoinPool();
        int mockArrLen = 1000_0000;
        int[] arr = new int[mockArrLen];
        Random r = new Random();
        for (int index = 1; index <= mockArrLen; index++) {
            arr[index - 1] = r.nextInt(1000_0000);
        }
        FJOrderTask task = new FJOrderTask(arr);
        ForkJoinTask<int[]> taskResult = pool.submit(task);
        try {
            // Wait for the result to complete
            taskResult.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        long endTime = System.currentTimeMillis();
        System.out.println("elapsed ms=" + (endTime - beginTime));
    }

    /**
     * A single sorting subtask
     */
    private static class FJOrderTask extends RecursiveTask<int[]> {
        /**
         * The array this task sorts
         */
        private final int[] source;

        public FJOrderTask(int[] source) {
            this.source = source;
        }

        /**
         * The actual computation logic
         *
         * @see java.util.concurrent.RecursiveTask#compute()
         */
        @Override
        protected int[] compute() {
            int sourceLen = source.length;
            // Printing here would emit millions of lines for this input and
            // dominate the measured time, so it is left commented out
            // System.out.println(Thread.currentThread());
            // If true, the array to sort is not small enough yet
            if (sourceLen > 2) {
                int midIndex = sourceLen / 2;
                // Split into two subtasks: [0, mid) and [mid, len)
                FJOrderTask task1 = new FJOrderTask(Arrays.copyOf(source, midIndex));
                task1.fork();
                FJOrderTask task2 = new FJOrderTask(Arrays.copyOfRange(source, midIndex, sourceLen));
                task2.fork();
                // Merge the two sorted arrays into one sorted array
                int[] result1 = task1.join();
                int[] result2 = task2.join();
                return insertMerge(result1, result2);
            }
            // Otherwise the array holds at most two elements, which can be compared directly
            else {
                // If true, there is at most one element, or the two elements are already in order
                if (sourceLen <= 1 || source[0] <= source[1]) {
                    return source;
                } else {
                    int[] orderedArr = new int[sourceLen];
                    orderedArr[0] = source[1];
                    orderedArr[1] = source[0];
                    return orderedArr;
                }
            }
        }

        /**
         * Merges two sorted arrays into one with a simple two-pointer merge
         *
         * @param arr1 sorted array 1
         * @param arr2 sorted array 2
         * @return the merged, sorted array
         */
        private int[] insertMerge(int[] arr1, int[] arr2) {
            int[] result = new int[arr1.length + arr2.length];
            int arr1Len = arr1.length;
            int arr2Len = arr2.length;
            int destLen = result.length;
            // Integer.MAX_VALUE acts as a sentinel once one array is exhausted
            for (int i = 0, array1Index = 0, array2Index = 0; i < destLen; i++) {
                int value1 = array1Index >= arr1Len ? Integer.MAX_VALUE : arr1[array1Index];
                int value2 = array2Index >= arr2Len ? Integer.MAX_VALUE : arr2[array2Index];
                if (value1 < value2) {
                    array1Index++;
                    result[i] = value1;
                } else {
                    array2Index++;
                    result[i] = value2;
                }
            }
            return result;
        }
    }
}
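The idea is simple: keep splitting the array until each piece holds one or two elements, and sort from the bottom. Then walk back up level by level, merging the partial results, and finally return the sorted array. Setting the splitting overhead aside, the time complexity is a respectable O(n log n). The splitting itself, however, copies the sub-arrays at every level and creates an enormous number of tiny tasks. That costs a lot of memory, which is not necessarily a good thing. We'll leave that aside for now.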
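One way to cut that memory cost is sketched below. It is not the article's code and swaps in a different design. Tasks pass index ranges instead of copying sub-arrays, ranges below an assumed THRESHOLD are sorted sequentially with Arrays.sort, and merges go through one shared scratch buffer. RangeMergeSort and THRESHOLD are hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class RangeMergeSort {
    static final int THRESHOLD = 1 << 13; // assumed cutoff for sequential sorting

    static final class SortTask extends RecursiveAction {
        private final int[] a, buf; // data and a scratch buffer of the same length
        private final int lo, hi;   // half-open range [lo, hi)

        SortTask(int[] a, int[] buf, int lo, int hi) {
            this.a = a; this.buf = buf; this.lo = lo; this.hi = hi;
        }

        @Override
        protected void compute() {
            if (hi - lo <= THRESHOLD) {
                Arrays.sort(a, lo, hi); // small range: sort sequentially
                return;
            }
            int mid = (lo + hi) >>> 1;
            // Runs both halves and returns when both are done (in JDK 8 it forks the
            // second task and executes the first in the current thread)
            invokeAll(new SortTask(a, buf, lo, mid), new SortTask(a, buf, mid, hi));
            merge(mid);
        }

        // Two-way merge of a[lo, mid) and a[mid, hi), using buf[lo, hi) as scratch space
        private void merge(int mid) {
            System.arraycopy(a, lo, buf, lo, hi - lo);
            int i = lo, j = mid, k = lo;
            while (i < mid && j < hi) a[k++] = buf[i] <= buf[j] ? buf[i++] : buf[j++];
            while (i < mid) a[k++] = buf[i++];
            while (j < hi) a[k++] = buf[j++];
        }
    }

    public static void sort(int[] a) {
        if (a.length < 2) return;
        ForkJoinPool.commonPool().invoke(new SortTask(a, new int[a.length], 0, a.length));
    }

    public static void main(String[] args) {
        int[] data = new java.util.Random(42).ints(1_000_000, 0, 10_000_000).toArray();
        int[] expected = data.clone();
        Arrays.sort(expected);
        sort(data);
        System.out.println(Arrays.equals(data, expected)); // true
    }
}
```

Sibling tasks only touch disjoint ranges of both arrays, and a parent merges only after both children are done, so no extra synchronization is needed.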
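4. How the fork/join framework is implemented
Taking the demo above as a starting point, let's follow how fork/join works. It may not be 100% accurate, but it should be close. The demo performs three main actions: it instantiates a ForkJoinPool, submits a task, and calls get() to wait for the final result. These three visible actions are easy enough to follow; the real core probably lies behind them.
4.1. The ForkJoinPool constructor
Every application that uses the framework must first create a pool instance, naturally. The no-argument constructor used above simply applies the framework's defaults. These suit most scenarios, which is what makes the framework easy to use.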
// java.util.concurrent.ForkJoinPool#ForkJoinPool()
/**
 * Creates a {@code ForkJoinPool} with parallelism equal to {@link
 * java.lang.Runtime#availableProcessors}, using the {@linkplain
 * #defaultForkJoinWorkerThreadFactory default thread factory},
 * no UncaughtExceptionHandler, and non-async LIFO processing mode.
 *
 * @throws SecurityException if a security manager exists and
 *         the caller is not permitted to modify threads
 *         because it does not hold {@link
 *         java.lang.RuntimePermission}{@code ("modifyThread")}
 */
public ForkJoinPool() {
    // Parallelism defaults to the number of available CPU cores
    this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
         defaultForkJoinWorkerThreadFactory, null, false);
}

/**
 * Creates a {@code ForkJoinPool} with the given parameters.
 *
 * @param parallelism the parallelism level. For default value,
 * use {@link java.lang.Runtime#availableProcessors}.
 * @param factory the factory for creating new threads. For default value,
 * use {@link #defaultForkJoinWorkerThreadFactory}.
 * @param handler the handler for internal worker threads that
 * terminate due to unrecoverable errors encountered while executing
 * tasks. For default value, use {@code null}.
 * @param asyncMode if true,
 * establishes local first-in-first-out scheduling mode for forked
 * tasks that are never joined. This mode may be more appropriate
 * than default locally stack-based mode in applications in which
 * worker threads only process event-style asynchronous tasks.
 * For default value, use {@code false}.
 * @throws IllegalArgumentException if parallelism less than or
 *         equal to zero, or greater than implementation limit
 * @throws NullPointerException if the factory is null
 * @throws SecurityException if a security manager exists and
 *         the caller is not permitted to modify threads
 *         because it does not hold {@link
 *         java.lang.RuntimePermission}{@code ("modifyThread")}
 */
public ForkJoinPool(int parallelism,
                    ForkJoinWorkerThreadFactory factory,
                    UncaughtExceptionHandler handler,
                    boolean asyncMode) {
    this(checkParallelism(parallelism),
         checkFactory(factory),
         handler,
         // FIFO_QUEUE = 1 << 16, LIFO_QUEUE = 0
         asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
         "ForkJoinPool-" + nextPoolId() + "-worker-");
    checkPermission();
}

/**
 * Creates a {@code ForkJoinPool} with the given parameters, without
 * any security checks or parameter validation.  Invoked directly by
 * makeCommonPool.
 */
private ForkJoinPool(int parallelism,
                     ForkJoinWorkerThreadFactory factory,
                     UncaughtExceptionHandler handler,
                     int mode,
                     String workerNamePrefix) {
    this.workerNamePrefix = workerNamePrefix;
    this.factory = factory;
    this.ueh = handler;
    this.config = (parallelism & SMASK) | mode;
    long np = (long)(-parallelism); // offset ctl counts
    this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
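There isn't much to say about the constructors. They set the parallelism, the thread factory and the mode flags, and initialize ctl with the active and total worker counts offset by -parallelism, preparing for what follows.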
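From the caller's side, the defaults described above correspond to the public constructors roughly as follows. PoolConstruction is a hypothetical name. No worker threads exist until a task is submitted, so shutting the pools down right away is harmless.

```java
import java.util.concurrent.ForkJoinPool;

public class PoolConstruction {
    public static void main(String[] args) {
        // No-arg constructor: parallelism = available processors (capped by MAX_CAP)
        ForkJoinPool byDefault = new ForkJoinPool();

        // Explicit equivalent; asyncMode=false keeps LIFO processing of a worker's own tasks
        ForkJoinPool explicit = new ForkJoinPool(
                Runtime.getRuntime().availableProcessors(),
                ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                null,    // no UncaughtExceptionHandler
                false);  // asyncMode

        // asyncMode=true switches local scheduling of never-joined forked tasks to FIFO
        ForkJoinPool fifo = new ForkJoinPool(2, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);

        System.out.println(byDefault.getParallelism() == explicit.getParallelism()); // true
        System.out.println(fifo.getAsyncMode());                                    // true
        byDefault.shutdown();
        explicit.shutdown();
        fifo.shutdown();
    }
}
```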
4.2. The task submit process
In the example above submit is called only once, which is not necessarily the case in real applications. Even a single submit triggers a lot of activity behind the scenes, because that one task spawns many more tasks.
// java.util.concurrent.ForkJoinPool#submit
/**
 * Submits a ForkJoinTask for execution.
 *
 * @param task the task to submit
 * @param <T> the type of the task's result
 * @return the task
 * @throws NullPointerException if the task is null
 * @throws RejectedExecutionException if the task cannot be
 *         scheduled for execution
 */
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
    if (task == null)
        throw new NullPointerException();
    // submit essentially pushes the task onto one of the pool's queues
    externalPush(task);
    return task;
}

/**
 * Tries to add the given task to a submission queue at
 * submitter's current queue. Only the (vastly) most common path
 * is directly handled in this method, while screening for need
 * for externalSubmit.
 *
 * @param task the task. Caller must ensure non-null.
 */
final void externalPush(ForkJoinTask<?> task) {
    WorkQueue[] ws; WorkQueue q; int m;
    int r = ThreadLocalRandom.getProbe();
    int rs = runState;
    // Fast path: the queues exist, this thread already has a probe (not its
    // first entry) and the queue lock was acquired, so push directly;
    // otherwise fall back to the full externalSubmit logic
    if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
        (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
        U.compareAndSwapInt(q, QLOCK, 0, 1)) {
        ForkJoinTask<?>[] a; int am, n, s;
        if ((a = q.array) != null &&
            (am = a.length - 1) > (n = (s = q.top) - q.base)) {
            int j = ((am & s) << ASHIFT) + ABASE;
            // Store the task into its array slot with putOrderedObject
            U.putOrderedObject(a, j, task);
            U.putOrderedInt(q, QTOP, s + 1);
            U.putIntVolatile(q, QLOCK, 0);
            if (n <= 1)
                signalWork(ws, q);
            return;
        }
        U.compareAndSwapInt(q, QLOCK, 1, 0);
    }
    // First submission (initialization) or the general slow path
    externalSubmit(task);
}

/**
 * Full version of externalPush, handling uncommon cases, as well
 * as performing secondary initialization upon the first
 * submission of the first task to the pool.  It also detects
 * first submission by an external thread and creates a new shared
 * queue if the one at index if empty or contended.
 *
 * @param task the task. Caller must ensure non-null.
 */
private void externalSubmit(ForkJoinTask<?> task) {
    int r;                                    // initialize caller's probe
    if ((r = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();
        r = ThreadLocalRandom.getProbe();
    }
    for (;;) {
        WorkQueue[] ws; WorkQueue q; int rs, m, k;
        boolean move = false;
        // The pool is shutting down
        if ((rs = runState) < 0) {
            tryTerminate(false, false);     // help terminate
            throw new RejectedExecutionException();
        }
        // Not initialized yet: initialize first
        else if ((rs & STARTED) == 0 ||     // initialize
                 ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
            int ns = 0;
            // Initialize while holding the runState lock
            rs = lockRunState();
            try {
                if ((rs & STARTED) == 0) {
                    U.compareAndSwapObject(this, STEALCOUNTER, null,
                                           new AtomicLong());
                    // create workQueues array with size a power of two
                    int p = config & SMASK; // ensure at least 2 slots
                    int n = (p > 1) ? p - 1 : 1;
                    n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                    n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                    // Allocate the array of queues
                    workQueues = new WorkQueue[n];
                    ns = STARTED;
                }
            } finally {
                unlockRunState(rs, (rs & ~RSLOCK) | ns);
            }
        }
        // A shared queue already exists at this thread's slot
        else if ((q = ws[k = r & m & SQMASK]) != null) {
            // Lock the queue and push onto it
            if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                ForkJoinTask<?>[] a = q.array;
                // Read the top index and store the task there
                int s = q.top;
                boolean submitted = false; // initial submission or resizing
                try {                      // locked version of push
                    if ((a != null && a.length > s + 1 - q.base) ||
                        (a = q.growArray()) != null) {
                        int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                        U.putOrderedObject(a, j, task);
                        U.putOrderedInt(q, QTOP, s + 1);
                        submitted = true;
                    }
                } finally {
                    U.compareAndSwapInt(q, QLOCK, 1, 0);
                }
                // If the push succeeded, wake a worker and return;
                // otherwise go around and try again
                if (submitted) {
                    signalWork(ws, q);
                    return;
                }
            }
            move = true;                   // move on failure
        }
        else if (((rs = runState) & RSLOCK) == 0) { // create new queue
            q = new WorkQueue(this, null);
            q.hint = r;
            q.config = k | SHARED_QUEUE;
            q.scanState = INACTIVE;
            rs = lockRunState();           // publish index
            if (rs > 0 &&  (ws = workQueues) != null &&
                k < ws.length && ws[k] == null)
                ws[k] = q;                 // else terminated
            unlockRunState(rs, rs & ~RSLOCK);
        }
        else
            move = true;                   // move if busy
        // If needed, advance this thread's probe so that it tries another slot
        if (move)
            r = ThreadLocalRandom.advanceProbe(r);
    }
}
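So submit mainly initializes the queues, pushes the task onto a queue, and wakes a worker to process it. Yet no worker thread has been started so far; we have only seen the workQueues array being initialized. So where are workers created? It can only be when a worker is signalled.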
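The sizing arithmetic in externalSubmit and the slot choice in externalPush are easy to replay. This sketch assumes the JDK 8 value SQMASK = 0x007e, and QueueSlots is a hypothetical name. With parallelism 4 the array gets 8 slots. External submission queues can only land on the even slots 0, 2, 4 and 6, leaving the odd slots for worker queues, as registerWorker shows later.

```java
public class QueueSlots {
    // Assumed JDK 8 value of ForkJoinPool.SQMASK: bit 0 is clear, so only even slots
    static final int SQMASK = 0x007e;

    // Same arithmetic as externalSubmit: round (p - 1) up to 2^k - 1, then (n + 1) << 1
    static int workQueuesLength(int parallelism) {
        int n = (parallelism > 1) ? parallelism - 1 : 1;
        n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
        n |= n >>> 8; n |= n >>> 16;
        return (n + 1) << 1;
    }

    // Slot an external thread with the given probe would use: m & r & SQMASK
    static int externalSlot(int length, int probe) {
        return (length - 1) & probe & SQMASK;
    }

    public static void main(String[] args) {
        for (int p : new int[] {1, 4, 6, 8, 9}) {
            System.out.println("parallelism " + p + " -> length " + workQueuesLength(p));
        }
        // lengths 4, 8, 16, 16, 32
        System.out.println(externalSlot(8, 0x12345677)); // 6
    }
}
```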
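4.3. Worker initialization
Workers are the threads that actually execute tasks. So far we have only seen tasks being queued and workers being signalled, but no worker being created. In fact, workers are created inside the signalling logic.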
// java.util.concurrent.ForkJoinPool#signalWork
/**
 * Tries to create or activate a worker if too few are active.
 *
 * @param ws the worker array to use to find signallees
 * @param q a WorkQueue --if non-null, don't retry if now empty
 */
final void signalWork(WorkQueue[] ws, WorkQueue q) {
    long c; int sp, i; WorkQueue v; Thread p;
    // too few active: ctl packs several fields; its top 16 bits hold
    // (active workers - parallelism), so ctl < 0 while fewer than the target
    // are active, and its low 32 bits (sp) describe the top idle worker (0 = none)
    while ((c = ctl) < 0L) {
        if ((sp = (int)c) == 0) {                  // no idle workers
            if ((c & ADD_WORKER) != 0L)            // too few workers
                tryAddWorker(c);
            break;
        }
        if (ws == null)                            // unstarted/terminated
            break;
        if (ws.length <= (i = sp & SMASK))         // terminated
            break;
        if ((v = ws[i]) == null)                   // terminating
            break;
        int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
        int d = sp - v.scanState;                  // screen CAS
        long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
        if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
            v.scanState = vs;                      // activate v
            if ((p = v.parker) != null)
                U.unpark(p);
            break;
        }
        if (q != null && q.base == q.top)          // no more work
            break;
    }
}

/**
 * Tries to add one worker, incrementing ctl counts before doing
 * so, relying on createWorker to back out on failure.
 *
 * @param c incoming ctl value, with total count negative and no
 * idle workers.  On CAS failure, c is refreshed and retried if
 * this holds (otherwise, a new worker is not needed).
 */
private void tryAddWorker(long c) {
    boolean add = false;
    do {
        long nc = ((AC_MASK & (c + AC_UNIT)) |
                   (TC_MASK & (c + TC_UNIT)));
        if (ctl == c) {
            int rs, stop;                 // check if terminating
            if ((stop = (rs = lockRunState()) & STOP) == 0)
                add = U.compareAndSwapLong(this, CTL, c, nc);
            unlockRunState(rs, rs & ~RSLOCK);
            if (stop != 0)
                break;
            // The counts were reserved in ctl; only now create the worker
            if (add) {
                createWorker();
                break;
            }
        }
    } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
}

/**
 * Tries to construct and start one worker. Assumes that total
 * count has already been incremented as a reservation.  Invokes
 * deregisterWorker on any failure.
 *
 * @return true if successful
 */
private boolean createWorker() {
    ForkJoinWorkerThreadFactory fac = factory;
    Throwable ex = null;
    ForkJoinWorkerThread wt = null;
    try {
        // Ask the thread factory for a new worker and start it immediately
        if (fac != null && (wt = fac.newThread(this)) != null) {
            wt.start();
            return true;
        }
    } catch (Throwable rex) {
        ex = rex;
    }
    // Creation failed: deregisterWorker rolls back the ctl counts and handles the exception
    deregisterWorker(wt, ex);
    return false;
}

/**
 * Default ForkJoinWorkerThreadFactory implementation; creates a
 * new ForkJoinWorkerThread.
 */
static final class DefaultForkJoinWorkerThreadFactory
    implements ForkJoinWorkerThreadFactory {
    public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        return new ForkJoinWorkerThread(pool);
    }
}
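Sure enough, workers are created during signalling. Notably, to add workers safely, the pool first reserves the new worker by updating ctl with a CAS, and only then performs the actual creation. This keeps it from creating more workers than needed.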
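To make the ctl bookkeeping concrete, this sketch copies what we assume are the JDK 8 values of the private constants (AC_SHIFT, TC_SHIFT, their masks and units, ADD_WORKER). It then replays the constructor's initial value and tryAddWorker's reservation step for parallelism 4. CtlDemo is hypothetical. It only illustrates the arithmetic and ignores the idle-worker bits in the low 32 bits.

```java
public class CtlDemo {
    // Assumed JDK 8 values of ForkJoinPool's private constants
    static final int  AC_SHIFT   = 48;
    static final long AC_UNIT    = 0x0001L << AC_SHIFT;
    static final long AC_MASK    = 0xffffL << AC_SHIFT;
    static final int  TC_SHIFT   = 32;
    static final long TC_UNIT    = 0x0001L << TC_SHIFT;
    static final long TC_MASK    = 0xffffL << TC_SHIFT;
    static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign bit of the TC field

    // Initial value, as computed in the private constructor
    static long initialCtl(int parallelism) {
        long np = (long) (-parallelism);
        return ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }

    // The value tryAddWorker CASes in: active and total counts each go up by one
    static long reserveWorker(long c) {
        return (AC_MASK & (c + AC_UNIT)) | (TC_MASK & (c + TC_UNIT));
    }

    static int activeOffset(long c) { return (short) (c >>> AC_SHIFT); } // active - parallelism
    static int totalOffset(long c)  { return (short) (c >>> TC_SHIFT); } // total - parallelism

    public static void main(String[] args) {
        long c = initialCtl(4);
        System.out.printf("%016x%n", c); // fffcfffc00000000
        int added = 0;
        // Same guard signalWork/tryAddWorker use when there are no idle workers
        while (c < 0L && (int) c == 0 && (c & ADD_WORKER) != 0L) {
            c = reserveWorker(c);
            added++;
            System.out.println("AC=" + activeOffset(c) + " TC=" + totalOffset(c));
        }
        System.out.println("workers reserved: " + added); // 4
    }
}
```

Once all four workers are reserved, both fields wrap around to 0 and ctl is no longer negative, so signalWork stops trying to add workers.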
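4.4. How a worker works
When a worker is created, the pool instance is passed in, so the pool's context is visible to the worker and it can reuse the pool's configuration. The worker itself is an asynchronous thread that starts immediately after creation. From then on it must keep trying to take tasks from the queues and run them. How exactly does it do that?
1. The ForkJoinWorkerThread constructor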
// java.util.concurrent.ForkJoinWorkerThread#ForkJoinWorkerThread
/**
 * Creates a ForkJoinWorkerThread operating in the given pool.
 *
 * @param pool the pool this thread works in
 * @throws NullPointerException if pool is null
 */
protected ForkJoinWorkerThread(ForkJoinPool pool) {
    // Use a placeholder until a useful name can be set in registerWorker
    super("aForkJoinWorkerThread");
    this.pool = pool;
    // The workQueue is obtained by registering this thread with the pool
    this.workQueue = pool.registerWorker(this);
}

/**
 * Callback from ForkJoinWorkerThread constructor to establish and
 * record its WorkQueue.
 *
 * @param wt the worker thread
 * @return the worker's queue
 */
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
    UncaughtExceptionHandler handler;
    wt.setDaemon(true);                           // configure thread
    if ((handler = ueh) != null)
        wt.setUncaughtExceptionHandler(handler);
    WorkQueue w = new WorkQueue(this, wt);
    int i = 0;                                    // assign a pool index
    int mode = config & MODE_MASK;
    int rs = lockRunState();
    try {
        WorkQueue[] ws; int n;                    // skip if no array
        if ((ws = workQueues) != null && (n = ws.length) > 0) {
            int s = indexSeed += SEED_INCREMENT;  // unlikely to collide
            int m = n - 1;
            i = ((s << 1) | 1) & m;               // odd-numbered indices
            if (ws[i] != null) {                  // collision
                int probes = 0;                   // step by approx half n
                int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                while (ws[i = (i + step) & m] != null) {
                    if (++probes >= n) {
                        workQueues = ws = Arrays.copyOf(ws, n <<= 1);
                        m = n - 1;
                        probes = 0;
                    }
                }
            }
            w.hint = s;                           // use as random seed
            w.config = i | mode;
            w.scanState = i;                      // publication fence
            ws[i] = w;
        }
    } finally {
        unlockRunState(rs, rs & ~RSLOCK);
    }
    wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
    return w;
}
The key step is registering itself with the pool to obtain a workQueue. The actual work happens in the run method.
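registerWorker's slot choice can be replayed in isolation. The sketch assumes the JDK 8 value SEED_INCREMENT = 0x9e3779b9 and a workQueues array of length 8, which is what externalSubmit allocates for parallelism 4. It leaves out the collision-probing loop; the first four slots happen not to collide. It shows why workers land on odd slots and where the numeric suffix of the thread name, (i >>> 1), comes from. WorkerIndexDemo is hypothetical.

```java
public class WorkerIndexDemo {
    // Assumed JDK 8 value of ForkJoinPool.SEED_INCREMENT
    static final int SEED_INCREMENT = 0x9e3779b9;

    // Index registerWorker picks before any collision handling: always odd
    static int slotFor(int seed, int n) {
        return ((seed << 1) | 1) & (n - 1);
    }

    public static void main(String[] args) {
        int n = 8;         // workQueues length for parallelism 4
        int indexSeed = 0; // the pool's indexSeed field starts at 0
        for (int k = 0; k < 4; k++) {
            indexSeed += SEED_INCREMENT;
            int i = slotFor(indexSeed, n);
            // registerWorker names the thread workerNamePrefix + (i >>> 1)
            System.out.println("slot " + i + " -> name suffix " + (i >>> 1));
        }
        // slots 3, 5, 7, 1 -> suffixes 1, 2, 3, 0
    }
}
```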
// java.util.concurrent.ForkJoinWorkerThread#run
/**
 * This method is required to be public, but should never be
 * called explicitly. It performs the main run loop to execute
 * {@link ForkJoinTask}s.
 */
public void run() {
    if (workQueue.array == null) { // only run once
        Throwable exception = null;
        try {
            onStart();
            pool.runWorker(workQueue);
        } catch (Throwable ex) {
            exception = ex;
        } finally {
            try {
                onTermination(exception);
            } catch (Throwable ex) {
                if (exception == null)
                    exception = ex;
            } finally {
                pool.deregisterWorker(this, exception);
            }
        }
    }
}

// java.util.concurrent.ForkJoinPool#runWorker
/**
 * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
 */
final void runWorker(WorkQueue w) {
    w.growArray();                   // allocate queue
    int seed = w.hint;               // initially holds randomization hint
    int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShift
    for (ForkJoinTask<?> t;;) {
        // Find a task via scan, then run it
        if ((t = scan(w, r)) != null)
            w.runTask(t);
        else if (!awaitWork(w, r))
            break;
        r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
    }
}

// java.util.concurrent.ForkJoinPool.WorkQueue#runTask
/**
 * Executes the given task and any remaining local tasks.
 */
final void runTask(ForkJoinTask<?> task) {
    if (task != null) {
        scanState &= ~SCANNING; // mark as busy
        (currentSteal = task).doExec();
        U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
        execLocalTasks();
        ForkJoinWorkerThread thread = owner;
        if (++nsteals < 0)      // collect on overflow
            transferStealCount(pool);
        scanState |= SCANNING;
        if (thread != null)
            thread.afterTopLevelExec();
    }
}

// java.util.concurrent.ForkJoinTask#doExec
/**
 * Primary execution method for stolen tasks. Unless done, calls
 * exec and records status if completed, but doesn't wait for
 * completion otherwise.
 *
 * @return status on exit from this method
 */
final int doExec() {
    int s; boolean completed;
    if ((s = status) >= 0) {
        try {
            completed = exec();
        } catch (Throwable rex) {
            return setExceptionalCompletion(rex);
        }
        if (completed)
            s = setCompletion(NORMAL);
    }
    return s;
}

// java.util.concurrent.RecursiveTask#exec
/**
 * Implements execution conventions for RecursiveTask.
 */
protected final boolean exec() {
    // i.e. call the compute method of the user's task class
    result = compute();
    return true;
}
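We have taken a quick look at how a worker runs tasks. It is not much different from a thread pool: the worker takes a task from a queue and runs the business method compute. One detail is that after the task it found, runTask also drains the worker's own queue via execLocalTasks. For now we have skipped how tasks are obtained and how work stealing is done. That is the next section.
4.5. How tasks are obtained
This is mainly handled by scan.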
// java.util.concurrent.ForkJoinPool#scan
/**
 * Scans for and tries to steal a top-level task. Scans start at a
 * random location, randomly moving on apparent contention,
 * otherwise continuing linearly until reaching two consecutive
 * empty passes over all queues with the same checksum (summing
 * each base index of each queue, that moves on each steal), at
 * which point the worker tries to inactivate and then re-scans,
 * attempting to re-activate (itself or some other worker) if
 * finding a task; otherwise returning null to await work.  Scans
 * otherwise touch as little memory as possible, to reduce
 * disruption on other scanning threads.
 *
 * @param w the worker (via its WorkQueue)
 * @param r a random seed
 * @return a task, or null if none found
 */
private ForkJoinTask<?> scan(WorkQueue w, int r) {
    WorkQueue[] ws; int m;
    if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
        int ss = w.scanState;                     // initially non-negative
        for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
            WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
            int b, n; long c;
            // Examine the queue in slot k. The scan starts at a random origin,
            // not necessarily this worker's own queue, and takes tasks from the
            // base (FIFO) end, i.e. this is the stealing path
            if ((q = ws[k]) != null) {
                if ((n = (b = q.base) - q.top) < 0 &&
                    (a = q.array) != null) {      // non-empty
                    long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                    if ((t = ((ForkJoinTask<?>)
                              U.getObjectVolatile(a, i))) != null &&
                        q.base == b) {
                        if (ss >= 0) {
                            if (U.compareAndSwapObject(a, i, t, null)) {
                                q.base = b + 1;
                                if (n < -1)       // signal others
                                    signalWork(ws, q);
                                return t;
                            }
                        }
                        else if (oldSum == 0 &&   // try to activate
                                 w.scanState < 0)
                            tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
                    }
                    if (ss < 0)                   // refresh
                        ss = w.scanState;
                    r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
                    origin = k = r & m;           // move and rescan
                    oldSum = checkSum = 0;
                    continue;
                }
                checkSum += b;
            }
            if ((k = (k + 1) & m) == origin) {    // continue until stable
                if ((ss >= 0 || (ss == (ss = w.scanState))) &&
                    oldSum == (oldSum = checkSum)) {
                    if (ss < 0 || w.qlock < 0)    // already inactive
                        break;
                    int ns = ss | INACTIVE;       // try to inactivate
                    long nc = ((SP_MASK & ns) |
                               (UC_MASK & ((c = ctl) - AC_UNIT)));
                    w.stackPred = (int)c;         // hold prev stack top
                    U.putInt(w, QSCANSTATE, ns);
                    if (U.compareAndSwapLong(this, CTL, c, nc))
                        ss = ns;
                    else
                        w.scanState = ss;         // back out
                }
                checkSum = 0;
            }
        }
    }
    return null;
}
Implementing safe and efficient task acquisition is clearly no easy feat.
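Stripped of CAS, checksums and inactivation, scan's traversal order can be modelled in a few lines. In this single-threaded toy, ScanSketch is hypothetical and the queue array length is assumed to be a power of two. It starts at a random slot, walks linearly with wrap-around until it finds a non-empty queue, and takes from that queue's base end. It also includes the xorshift step runWorker applies to r between scans.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class ScanSketch {
    // Toy model of scan(): random origin, linear walk, take from the base (oldest) end.
    // No CAS, no checksum passes, no inactivation; a single full empty pass returns null.
    static String scan(Deque<String>[] queues, int r) {
        int m = queues.length - 1;      // length assumed to be a power of two
        int origin = r & m;
        int k = origin;
        do {
            Deque<String> q = queues[k];
            if (q != null && !q.isEmpty())
                return q.pollFirst();   // base end, as a thief takes it
            k = (k + 1) & m;
        } while (k != origin);
        return null;
    }

    // The xorshift step runWorker applies to r between scans
    static int next(int r) {
        r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
        return r;
    }

    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        Deque<String>[] queues = new Deque[8];
        queues[5] = new ArrayDeque<>();
        queues[5].addLast("oldest");               // pushed first (base)
        queues[5].addLast("newest");               // pushed last (top)
        System.out.println(scan(queues, 2));       // oldest: walks slots 2, 3, 4, 5
        System.out.println(scan(queues, next(1))); // newest
        System.out.println(scan(queues, 7));       // null: every queue is empty now
    }
}
```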
4.6. How task.fork is implemented
Usually, wherever the word fork appears, it means making a copy of the current environment. Does fork mean that here too? Does it start a new thread? And if not, how does it find the queue the task should go to?
// java.util.concurrent.ForkJoinTask#fork
/**
 * Arranges to asynchronously execute this task in the pool the
 * current task is running in, if applicable, or using the {@link
 * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}.  While
 * it is not necessarily enforced, it is a usage error to fork a
 * task more than once unless it has completed and been
 * reinitialized.  Subsequent modifications to the state of this
 * task or any data it operates on are not necessarily
 * consistently observable by any thread other than the one
 * executing it unless preceded by a call to {@link #join} or
 * related methods, or a call to {@link #isDone} returning {@code
 * true}.
 *
 * @return {@code this}, to simplify usage
 */
public final ForkJoinTask<V> fork() {
    Thread t;
    // A ForkJoinWorkerThread holds its own workQueue, so push straight onto it
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        // From an external thread, submit to the shared common pool,
        // where idle workers will pick it up
        ForkJoinPool.common.externalPush(this);
    return this;
}

// java.util.concurrent.ForkJoinPool.WorkQueue#push
/**
 * Pushes a task. Call only by owner in unshared queues.  (The
 * shared-queue version is embedded in method externalPush.)
 *
 * @param task the task. Caller must ensure non-null.
 * @throws RejectedExecutionException if array cannot be resized
 */
final void push(ForkJoinTask<?> task) {
    ForkJoinTask<?>[] a; ForkJoinPool p;
    int b = base, s = top, n;
    if ((a = array) != null) {    // ignore if queue removed
        int m = a.length - 1;     // fenced write for task visibility
        U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
        U.putOrderedInt(this, QTOP, s + 1);
        if ((n = s - b) <= 1) {
            if ((p = pool) != null)
                p.signalWork(p.workQueues, this);
        }
        else if (n >= m)
            growArray();
    }
}

// java.util.concurrent.ForkJoinWorkerThread
/**
 * A thread managed by a {@link ForkJoinPool}, which executes
 * {@link ForkJoinTask}s.
 * This class is subclassable solely for the sake of adding
 * functionality -- there are no overridable methods dealing with
 * scheduling or execution.  However, you can override initialization
 * and termination methods surrounding the main task processing loop.
 * If you do create such a subclass, you will also need to supply a
 * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to
 * {@linkplain ForkJoinPool#ForkJoinPool use it} in a {@code ForkJoinPool}.
 *
 * @since 1.7
 * @author Doug Lea
 */
public class ForkJoinWorkerThread extends Thread {
    /*
     * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
     * ForkJoinTasks. For explanation, see the internal documentation
     * of class ForkJoinPool.
     *
     * This class just maintains links to its pool and WorkQueue.  The
     * pool field is set immediately upon construction, but the
     * workQueue field is not set until a call to registerWorker
     * completes. This leads to a visibility race, that is tolerated
     * by requiring that the workQueue field is only accessed by the
     * owning thread.
     *
     * Support for (non-public) subclass InnocuousForkJoinWorkerThread
     * requires that we break quite a lot of encapsulation (via Unsafe)
     * both here and in the subclass to access and set Thread fields.
     */

    final ForkJoinPool pool;                // the pool this thread works in
    final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
    ...
}
So forking simply pushes the task onto the current worker thread's own queue, or onto the common pool when called from a non-worker thread. No context is copied.
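A quick way to observe that fork() is only a push is sketched below. It forks three subtasks inside a pool with a single worker, so nothing can be stolen, and then calls ForkJoinTask.getQueuedTaskCount(). That method returns an estimate of the tasks forked by the current worker thread but not yet executed. The expected count of 3 assumes no other thread touches that queue in between, which holds with parallelism 1. ForkPushDemo is hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

public class ForkPushDemo {
    // A trivial subtask; what it does is irrelevant here
    static final class Leaf extends RecursiveAction {
        @Override protected void compute() { }
    }

    // Forks three leaves and reports how many tasks sit in this worker's queue
    static final class Parent extends RecursiveTask<Integer> {
        @Override protected Integer compute() {
            Leaf a = new Leaf(), b = new Leaf(), c = new Leaf();
            a.fork(); b.fork(); c.fork();
            // fork() only pushed them onto the current worker's queue; none has run yet
            int queued = ForkJoinTask.getQueuedTaskCount();
            a.join(); b.join(); c.join();
            return queued;
        }
    }

    static int queuedAfterFork() {
        ForkJoinPool pool = new ForkJoinPool(1); // one worker: no thief can take the forked tasks
        try {
            return pool.invoke(new Parent());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(ForkJoinTask.inForkJoinPool()); // false: main is not a worker thread
        System.out.println(queuedAfterFork());              // 3
    }
}
```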
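4.7. How task.join is implemented
join means waiting for the task to finish and then returning, just like Thread.join(). The catch is that a thread blocked waiting for a result can no longer be used for anything else. How, then, would the remaining tasks ever get done? Recursion seems to be the only answer. But recursion normally runs inside a single thread, so wouldn't that throw away the concurrency?
This is exactly why the work is split into two steps, fork and join. As the previous section showed, fork merely adds the task to a queue and returns. If the current worker is busy at that point (for example, still splitting tasks), other workers steal those tasks and process them. While processing them they hit their own recursion, so a single-threaded recursion becomes a multi-threaded one.
Below we mainly follow the recursion inside a single thread. join by itself only waits for the current task to finish. But the current task can only finish after its subtasks' joins return, and those subtasks wait on their own subtasks, which forms a recursion. Since join() returning means compute() has finished, the whole process unfolds together with the computation in compute().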
// java.util.concurrent.ForkJoinTask#join
/**
 * Returns the result of the computation when it {@link #isDone is
 * done}.  This method differs from {@link #get()} in that
 * abnormal completion results in {@code RuntimeException} or
 * {@code Error}, not {@code ExecutionException}, and that
 * interrupts of the calling thread do <em>not</em> cause the
 * method to abruptly return by throwing {@code
 * InterruptedException}.
 *
 * @return the computed result
 */
public final V join() {
    int s;
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        reportException(s);
    // The task has completed: return the stored result
    return getRawResult();
}

/**
 * Throws exception, if any, associated with the given status.
 */
private void reportException(int s) {
    if (s == CANCELLED)
        throw new CancellationException();
    if (s == EXCEPTIONAL)
        rethrow(getThrowableException());
}

// java.util.concurrent.RecursiveTask#getRawResult
public final V getRawResult() {
    return result;
}

/**
 * Implementation for join, get, quietlyJoin. Directly handles
 * only cases of already-completed, external wait, and
 * unfork+exec.  Others are relayed to ForkJoinPool.awaitJoin.
 *
 * @return status upon completion
 */
private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    return (s = status) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        // If the task is still on top of this worker's queue, pop it (tryUnpush)
        // and run it inline with doExec; otherwise awaitJoin helps/waits until done
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        tryUnpush(this) && (s = doExec()) < 0 ? s :
        wt.pool.awaitJoin(w, this, 0L) :
        externalAwaitDone();
}

// java.util.concurrent.ForkJoinPool#awaitJoin
/**
 * Helps and/or blocks until the given task is done or timeout.
 *
 * @param w caller
 * @param task the task
 * @param deadline for timed waits, if nonzero
 * @return task status on exit
 */
final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
    int s = 0;
    if (task != null && w != null) {
        ForkJoinTask<?> prevJoin = w.currentJoin;
        U.putOrderedObject(w, QCURRENTJOIN, task);
        CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
            (CountedCompleter<?>)task : null;
        for (;;) {
            if ((s = task.status) < 0)
                break;
            if (cc != null)
                helpComplete(w, cc, 0);
            // Try to find and run the task in this worker's own queue; if the queue
            // is empty (or the task was not found), help the worker that stole it
            else if (w.base == w.top || w.tryRemoveAndExec(task))
                helpStealer(w, task);
            if ((s = task.status) < 0)
                break;
            long ms, ns;
            if (deadline == 0L)
                ms = 0L;
            else if ((ns = deadline - System.nanoTime()) <= 0L)
                break;
            else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                ms = 1L;
            if (tryCompensate(w)) {
                task.internalWait(ms);
                U.getAndAddLong(this, CTL, AC_UNIT);
            }
        }
        U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
    }
    return s;
}

// java.util.concurrent.ForkJoinPool.WorkQueue#tryRemoveAndExec
/**
 * If present, removes from queue and executes the given task,
 * or any other cancelled task. Used only by awaitJoin.
 *
 * @return true if queue empty and task not known to be done
 */
final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
    ForkJoinTask<?>[] a; int m, s, b, n;
    if ((a = array) != null && (m = a.length - 1) >= 0 &&
        task != null) {
        while ((n = (s = top) - (b = base)) > 0) {
            for (ForkJoinTask<?> t;;) {      // traverse from s to b
                long j = ((--s & m) << ASHIFT) + ABASE;
                if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
                    return s + 1 == top;     // shorter than expected
                else if (t == task) {
                    boolean removed = false;
                    if (s + 1 == top) {      // pop
                        if (U.compareAndSwapObject(a, j, task, null)) {
                            U.putOrderedInt(this, QTOP, s);
                            removed = true;
                        }
                    }
                    else if (base == b)      // replace with proxy
                        removed = U.compareAndSwapObject(
                            a, j, task, new EmptyTask());
                    // Run the joined task inline in this thread
                    if (removed)
                        task.doExec();
                    break;
                }
                else if (t.status < 0 && s + 1 == top) {
                    if (U.compareAndSwapObject(a, j, t, null))
                        U.putOrderedInt(this, QTOP, s);
                    break;                  // was cancelled
                }
                if (--n == 0)
                    return false;
            }
            if (task.status < 0)
                return false;
        }
    }
    return true;
}
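So in the end, fork/join still relies on recursion to wait in join. The difference is that it exploits multiple threads to execute several tasks at once. That brings two benefits. It lightens the load on any single thread, and it spreads the recursion depth across several threads, lowering the risk of an early stack overflow.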
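A common refinement of the demo's "fork both, then join both" pattern follows the same reasoning. Fork one half, compute the other half directly in the current thread, then join. The joining thread stays busy, and if nobody stole the forked half, join pops it and runs it in place. This is a sketch with an assumed threshold; SumTask is hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class SumTask extends RecursiveTask<Long> {
    static final int THRESHOLD = 10_000; // assumed cutoff, not taken from the article
    private final long[] data;
    private final int lo, hi;            // half-open range [lo, hi)

    public SumTask(long[] data, int lo, int hi) {
        this.data = data; this.lo = lo; this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {      // small enough: plain loop
            long sum = 0;
            for (int i = lo; i < hi; i++) sum += data[i];
            return sum;
        }
        int mid = (lo + hi) >>> 1;
        SumTask left = new SumTask(data, lo, mid);
        SumTask right = new SumTask(data, mid, hi);
        left.fork();                     // pushed onto this worker's queue; may be stolen
        long rightSum = right.compute(); // keep this thread busy instead of waiting
        long leftSum = left.join();      // runs left inline if nobody stole it
        return leftSum + rightSum;
    }

    public static void main(String[] args) {
        long[] data = new long[1_000_000];
        for (int i = 0; i < data.length; i++) data[i] = i;
        long total = ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
        System.out.println(total); // 499999500000
    }
}
```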
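How many tasks each thread ends up with, and how many results its join has to wait for, is hard to predict. For example, if the top-level thread's tasks are stolen by other threads, it mostly just waits for results. Before blocking, though, awaitJoin tries to help the thief via helpStealer, and tryCompensate may wake or create a spare worker. The bottom-most thread, on the other hand, carries the deepest recursion, which is what ultimately lets the program finish. This suggests a question: without proper control, if threads could pick up tasks arbitrarily, could they deadlock? That may well be a problem.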
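A small experiment related to that question (not a proof either way): the sketch runs the textbook fork/join Fibonacci, the same shape as the example in the RecursiveTask documentation, on a pool with parallelism 1. With no other worker to steal anything, every join must be satisfied by the joining thread running the forked task itself, through the tryUnpush/tryRemoveAndExec paths shown above. SingleWorkerFib is hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class SingleWorkerFib {
    // Textbook fork/join Fibonacci; deliberately no threshold, so every call forks
    static final class Fib extends RecursiveTask<Integer> {
        private final int n;
        Fib(int n) { this.n = n; }

        @Override
        protected Integer compute() {
            if (n <= 1) return n;
            Fib f1 = new Fib(n - 1);
            f1.fork();
            Fib f2 = new Fib(n - 2);
            return f2.compute() + f1.join();
        }
    }

    static int fibWithOneWorker(int n) {
        ForkJoinPool pool = new ForkJoinPool(1); // parallelism 1: no thief exists
        try {
            return pool.invoke(new Fib(n));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Completes because join() pops and runs the forked task in the joining thread
        System.out.println(fibWithOneWorker(20)); // 6765
    }
}
```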

Author: 等你歸去來
Source: https://www.cnblogs.com/yougewe/p/14943418.html
