本文共 22720 字,大约阅读时间需要 75 分钟。
image.png
和传统的线程池使用AQS的实现逻辑不同,ForkJoin
引入全新的结构来标识:
ForkJoinTask
任务的执行池,不再是传统执行池 Worker+Queue 的组合模式,而是维护了一个队列数组WorkQueue
,这样在提交任务和线程任务的时候大幅度的减少碰撞。WorkQueue
用于自己的执行线程Thread
,线程默认将会从top端选取任务用来执行 - LIFO。因为只有owner的Thread才能从top端取任务,所以在设置变量时, int top;
不需要使用 volatile
。fork
/join
方法用于分割任务以及聚合结果。ForkJoinPool变量基本说明
作为框架的提交入口,ForkJoinPool
管理着线程池中线程和任务队列,标识线程池是否还接收任务,显示现在的线程运行状态。本节,对这些控制量进行解释。
disrupter
这种高效率队列的开源实现,大家肯定会对cache line记忆犹新,他们通常的做法自己设置伪变量来填充,jdk1.8�中官网为我们带来了sun.misc.Contended,所以你如果阅读ForkJoinPool
源码可以发现该类也被sun.misc.Contended
标识。 几个重要变量: Pool
运行状态,使用bit位来标识不同状态,比如 // runState bits: SHUTDOWN must be negative, others arbitrary powers of two private static final int RSLOCK = 1; private static final int RSIGNAL = 1 << 1; private static final int STARTED = 1 << 2; private static final int STOP = 1 << 29; private static final int TERMINATED = 1 << 30; private static final int SHUTDOWN = 1 << 31;如果执行
runState & RSLOCK ==0
就能直接说明,目前的运行状态没有被锁住,其他情况一样。static final int MAX_CAP = 0x7fff; // max #workers - 1
也就是说他最大就占16位
Pool
的控制变量,类型是long - 说明有64位,每个部分都有不同的作用。我们使用十六进制来标识ctl,依次说明不同部分的作用。 long np = (long)(-parallelism); // offset ctl countsthis.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);0x xxxx-1 xxxx-2 xxxx-3 xxxx-4我为每个部分使用了数字来标识 - 1,2,3,4。
wait
或者park
,原因我们后面会提到,这个时候,虽然我们开启的线程数量是和并行数相同,但是实际真正执行的却不是这么多。TC 记录了我们一共开启了多少线程,而AC则记录了没有挂起的线程。active
的还是inactive
的,其他为是版本标识。WorkQueue[]
数组中的index。这里需要说明的是,ctl的后32位其实只能表示一个idle workers,那么我们如果有很多个idle worker要怎么办呢?老爷子使用的是stack
的概念来保存这些信息。后32位标识的是top的那个,我们能从top中的变量stackPred
追踪到下一个idle workerWorkQueue变量基本说明
WorkQueue
是一个双向列表,用于task的排队。
scanState: // versioned, <0: inactive; odd:scanning
如果WorkQueue
没有属于自己的owner
(下标为偶数的都没有),该值为 inactive 也就是一个负数。如果有自己的owner
,该值的初始值为其在WorkQueue[]
数组中的下标,也肯定是个奇数。
static final int SCANNING = 1; // false when running tasks static final int INACTIVE = 1 << 31; // must be negative
stackPred: 记录前任的 idle worker
config:index | mode。 如果下标为偶数的WorkQueue
,则其mode是共享类型。如果有自己的owner
默认是 LIFO
什么时候应该设置成 FIFO,注释中这么给的建议:
establishes local first-in-first-out scheduling mode for forked tasks that are never joined. This mode may be more appropriate than default locally stack-based mode in applications in which worker threads only process event-style asynchronous tasks. For default value, use {@code false}
qlock: 锁标识,在多线程往队列中添加数据,会有竞争,使用此标识抢占锁。
base:worker steal的偏移量,因为其他的线程都可以偷该队列的任务,所有base使用volatile
标识。
top:owner
执行任务的偏移量。
parker:如果 owner
挂起,则使用该变量做记录。
currentJoin: 当前正在join等待结果的任务。
currentSteal:当前执行的任务是steal过来的任务,该变量做记录。
ForkJoinTask变量基本说明
((s >>> 16) != 0)
表示需要signal
其他线程ForkJoinPool
提供的提交接口很多,不管提交的是Callable
、Runnable
、ForkJoinTask
最终都会转换成ForkJoinTask
类型的任务,调用方法externalPush(ForkJoinTask<?> task)
来进行提交逻辑。让我们来看看提交的过程:
如果第一次提交(或者是hash之后的队列还未初始化),调用externalSubmit
WorkQueue[n]
,这里的n是power of 2; 3. runState设置为开始状态。ThreadLocalRandom.getProbe()
hash后的数组中相应位置的WorkQueue
未初始化): 初始化WorkQueue
,通过这种方式创立的WorkQueue
均是SHARED_QUEUE
,scanState
为INACTIVE
WorkQueue
,lock住队列,将数据塞到array
top位置。如果添加成功,就用调用接下来要摊开讲的重要的方法signalWork
。如果hash之后的队列已经存在
signalWork
signalWork
signalWork
是fork/join框架中重要的方法之一,用于创建或者激活工作线程。本小节主要看它的源码实现,文章后面我们会总结它的使用场景。
final void signalWork(WorkQueue[] ws, WorkQueue q) { long c; int sp, i; WorkQueue v; Thread p; while ((c = ctl) < 0L) { // too few active if ((sp = (int)c) == 0) { // no idle workers if ((c & ADD_WORKER) != 0L) // too few workers tryAddWorker(c); break; } if (ws == null) // unstarted/terminated break; if (ws.length <= (i = sp & SMASK)) // terminated break; if ((v = ws[i]) == null) // terminating break; int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState int d = sp - v.scanState; // screen CAS long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred); if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) { v.scanState = vs; // activate v if ((p = v.parker) != null) U.unpark(p); break; } if (q != null && q.base == q.top) // no more work break; } }
在上面的章节我们已经具体的分析了ForkJoinPool
中的ctl的标识含义,我们知道当ctl<0
意味着active的线程还没有到达阈值,只有ctl<0
我们才会去讨论要不要创建或者激活新的线程。(int)ctl
很巧妙的拿到了ctl
的低16位
我们知道ctl
代表的是idle worker当低16位为0的时候,意味着此刻没有已经启动但是空闲的线程,如果在没有空闲的线程的情况下
(c & ADD_WORKER) != 0Lprivate static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign
意味着我们再增加一个线程,也不能使ac为非负数(只要ctl的最高位为1,表明ac仍是负数) 。我们调用方法tryAddWorker
来创建工作线程。
ctl
来记录我们增加的线程, ctl
编号-1的16位和编号-2的16位均需要加1,表示active的worker加一,总的worker加一。成功后我们将调用createWorker
。ForkJoinWorkerThreadFactory
来产生一个ForkJoinWorkerThread
类型的线程,该线程将会把自己注册到Pool上,怎么注册的呢?实现在方法registerWorker
,前文我们已经提及,拥有线程的WorkQueue
只能出现在数组的奇数下标处。所以线程 首先,创建一个新的WorkQueue
,其次在数组WorkQueue[]
寻找奇数下标尚未初始化的位置,如果循环的次数大于数组长度,还可能需要对数组进行扩容,然后,设置这个WorkQueue
的 config 为 index | mode (下标和模式),scanState为 index (下标>0)。最后启动这个线程。线程的处理我们接下来的章节介绍。当我们发现我们还有idle worker
(即(int)ctl!=0L
),我们需要active其中的一个。
ctl
的编号-3的16位,标记inactive和版本控制,我们将编号-3设置为激活状态并且版本加一。编号-4的16位我们之前也说过,放置了top的挂起的线程的index
所以我们可以根据这个index
拿到WorkQueue
--- 意味着就是这个WorkQueue
的Owner
线程被挂起了。index
信息在这个挂起的线程的stackPred
int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState int d = sp - v.scanState; // screen CAS long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred); if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) { v.scanState = vs; // activate v if ((p = v.parker) != null) U.unpark(p); break; }更新完
ctl
后,我们将唤醒之前挂起的线程。通过上述的简单介绍,用户的任务的提交所经历的步骤就介绍完了。
ForkJoinWorkerThread
启动之后会调用pool
的runWorker
来获取任务执行。
final void runWorker(WorkQueue w) { w.growArray(); // allocate queue int seed = w.hint; // initially holds randomization hint int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift for (ForkJoinTask t;;) { if ((t = scan(w, r)) != null) w.runTask(t); else if (!awaitWork(w, r)) break; r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift } }
代码很短,但是我们很容易读懂其中的意思。调用scan
尝试去偷取一个任务,然后调用runTask
或者awaitWork
,这里的scan
是框架的重要的实现,我们将详述上面的三个方法。
scan
代码如下:
private ForkJoinTask scan(WorkQueue w, int r) { WorkQueue[] ws; int m; if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) { int ss = w.scanState; // initially non-negative for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) { WorkQueue q; ForkJoinTask [] a; ForkJoinTask t; int b, n; long c; if ((q = ws[k]) != null) { if ((n = (b = q.base) - q.top) < 0 && (a = q.array) != null) { // non-empty long i = (((a.length - 1) & b) << ASHIFT) + ABASE; if ((t = ((ForkJoinTask ) U.getObjectVolatile(a, i))) != null && q.base == b) { if (ss >= 0) { if (U.compareAndSwapObject(a, i, t, null)) { q.base = b + 1; if (n < -1) // signal others signalWork(ws, q); return t; } } else if (oldSum == 0 && // try to activate w.scanState < 0) tryRelease(c = ctl, ws[m & (int)c], AC_UNIT); } if (ss < 0) // refresh ss = w.scanState; r ^= r << 1; r ^= r >>> 3; r ^= r << 10; origin = k = r & m; // move and rescan oldSum = checkSum = 0; continue; } checkSum += b; } if ((k = (k + 1) & m) == origin) { // continue until stable if ((ss >= 0 || (ss == (ss = w.scanState))) && oldSum == (oldSum = checkSum)) { if (ss < 0 || w.qlock < 0) // already inactive break; int ns = ss | INACTIVE; // try to inactivate long nc = ((SP_MASK & ns) | (UC_MASK & ((c = ctl) - AC_UNIT))); w.stackPred = (int)c; // hold prev stack top U.putInt(w, QSCANSTATE, ns); if (U.compareAndSwapLong(this, CTL, c, nc)) ss = ns; else w.scanState = ss; // back out } checkSum = 0; } } } return null; }
因为我们的WorkQueue
是有owner
线程的队列,我们可以知道以下信息:
我们首先通过random的r
来找到一个我们准备偷取的队列。
base
位置取到任务返回((k = (k + 1) & m) == origin)
)都没有偷到,我们就认为当前的active 线程过剩了,我们准备将当前的线程(即owner
)挂起,我们首先 index | INACTIVE
形成 ctl
的后32位;并行将ac减一。其次,将原来的挂起的top的index
记录到stackPred
中。tryRelease
激活。runTask
如果我们通过scan
偷到了任务,这个时候我们将进入执行状态:
final void runTask(ForkJoinTask task) { if (task != null) { scanState &= ~SCANNING; // mark as busy (currentSteal = task).doExec(); U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC execLocalTasks(); ForkJoinWorkerThread thread = owner; if (++nsteals < 0) // collect on overflow transferStealCount(pool); scanState |= SCANNING; if (thread != null) thread.afterTopLevelExec(); } }
首先scanState &= ~SCANNING;
标识该线程处于繁忙状态。
execLocalTasks
对线程所属的WorkQueue
内的任务进行LIFO执行。awaitWork
如果我们通过scan
一无所获,这个时候我们将进入执行状态:
private boolean awaitWork(WorkQueue w, int r) { if (w == null || w.qlock < 0) // w is terminating return false; for (int pred = w.stackPred, spins = SPINS, ss;;) { if ((ss = w.scanState) >= 0) break; else if (spins > 0) { r ^= r << 6; r ^= r >>> 21; r ^= r << 7; if (r >= 0 && --spins == 0) { // randomize spins WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc; if (pred != 0 && (ws = workQueues) != null && (j = pred & SMASK) < ws.length && (v = ws[j]) != null && // see if pred parking (v.parker == null || v.scanState >= 0)) spins = SPINS; // continue spinning } } else if (w.qlock < 0) // recheck after spins return false; else if (!Thread.interrupted()) { long c, prevctl, parkTime, deadline; int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK); if ((ac <= 0 && tryTerminate(false, false)) || (runState & STOP) != 0) // pool terminating return false; if (ac <= 0 && ss == (int)c) { // is last waiter prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred); int t = (short)(c >>> TC_SHIFT); // shrink excess spares if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl)) return false; // else use timed wait parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t); deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP; } else prevctl = parkTime = deadline = 0L; Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); // emulate LockSupport w.parker = wt; if (w.scanState < 0 && ctl == c) // recheck before park U.park(false, parkTime); U.putOrderedObject(w, QPARKER, null); U.putObject(wt, PARKBLOCKER, null); if (w.scanState >= 0) break; if (parkTime != 0L && ctl == c && deadline - System.nanoTime() <= 0L && U.compareAndSwapLong(this, CTL, c, prevctl)) return false; // shrink pool } } return true; }
active
状态(w.scanState >= 0
),则线程继续回去scan任务,如果发现自己是因为时间到了自动醒来,但是自己还是inactive
状态,也许,自己真的是多余的。线程也会执行结束,不再循环执行任务。从上节我们知道,我们获取任务之后,将调用任务的doExec
来具体执行任务:
final int doExec() { int s; boolean completed; if ((s = status) >= 0) { try { completed = exec(); } catch (Throwable rex) { return setExceptionalCompletion(rex); } if (completed) s = setCompletion(NORMAL); } return s; }
我们以RecursiveTask
为例:
protected final boolean exec() { result = compute(); return true; }
最终调用的是 compute
,我们举个example来看。
demo
求整数数组所有元素之和
public class ForkJoinCalculator implements Calculator { private ForkJoinPool pool; private static class SumTask extends RecursiveTask{ private long[] numbers; private int from; private int to; public SumTask(long[] numbers, int from, int to) { this.numbers = numbers; this.from = from; this.to = to; } @Override protected Long compute() { // 当需要计算的数字小于6时,直接计算结果 if (to - from < 6) { long total = 0; for (int i = from; i <= to; i++) { total += numbers[i]; } return total; // 否则,把任务一分为二,递归计算 } else { int middle = (from + to) / 2; SumTask taskLeft = new SumTask(numbers, from, middle); SumTask taskRight = new SumTask(numbers, middle+1, to); taskLeft.fork(); taskRight.fork(); return taskLeft.join() + taskRight.join(); } } } public ForkJoinCalculator() { // 也可以使用公用的 ForkJoinPool: // pool = ForkJoinPool.commonPool() pool = new ForkJoinPool(); } @Override public long sumUp(long[] numbers) { return pool.invoke(new SumTask(numbers, 0, numbers.length-1)); }}
这里需要注意的是其中的compute
fork
方法,获取join
方法的返回值。很明显,这里任务的 fork
、join
就是我们框架的核心,那么,它们都做了什么呢?
fork
fork的逻辑很简单。
public final ForkJoinTaskfork() { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); else ForkJoinPool.common.externalPush(this); return this; }
push
到自己所拥有的队列的top
位置。join
join
是一个等待结果的方法:
public final V join() { int s; if ((s = doJoin() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); }
那么,线程在doJoin
做了什么呢?
private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; return (s = status) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : externalAwaitDone(); }
externalAwaitDone
,首先,设置任务的status为signal状态,这样该任务执行结束之后会调用notifyAll
来唤醒自己;其次,阻塞自己,知道任务执行完成后把自己唤醒。awaitJoin
方法: currentJoin
表明自己正在等待该任务;w.base == w.top
或者 tryRemoveAndExec
返回 true说明自己所属的队列为空,也说明我们通过fork将本线程的任务已经被别的线程偷走,该线程也不会闲着,将会helpStealer
帮助帮助自己执行任务的线程执行任务(互惠互利,你来我往)tryCompensate
为 true,则阻塞本线程,等待任务执行结束的唤醒tryRemoveAndExec
如果join的任务还没有被执行,我们去自己的队列中去查找,看看任务是否不在top位置但是还是在队列中
while ((n = (s = top) - (b = base)) > 0) { for (ForkJoinTask t;;) { // traverse from s to b long j = ((--s & m) << ASHIFT) + ABASE; if ((t = (ForkJoinTask )U.getObject(a, j)) == null) return s + 1 == top; // shorter than expected else if (t == task) { boolean removed = false; if (s + 1 == top) { // pop if (U.compareAndSwapObject(a, j, task, null)) { U.putOrderedInt(this, QTOP, s); removed = true; } } else if (base == b) // replace with proxy removed = U.compareAndSwapObject( a, j, task, new EmptyTask()); if (removed) task.doExec(); break; } else if (t.status < 0 && s + 1 == top) { if (U.compareAndSwapObject(a, j, t, null)) U.putOrderedInt(this, QTOP, s); break; // was cancelled } if (--n == 0) return false; } if (task.status < 0) return false; }
EmptyTask
来占位,将任务取出来执行。helpStealer
,helpStealer
helpStealer
的原则是你帮助我执行任务,我也帮你执行任务。
奇数
下标,如果发现队列对象currentSteal
放置的刚好是自己要找的任务,则说明自己的任务被该队列A的owner
线程偷来执行base
)取出执行;
do { U.putOrderedObject(w, QCURRENTSTEAL, t); t.doExec(); // clear local tasks too } while (task.status >= 0 && w.top != top && (t = w.pop()) != null);
这是什么意思呢? 因为你在执行这个任务的时候,这个任务也可能fork出什么子任务push到当前线程,我们应该记录原来队列top的位置,然后在执行结束后,还回到top原来的位置。
w.base != w.top
),在不再帮助执行任务了。tryCompensate
如果自己等待的任务被偷走执行还没结束,自己的队列还有任务,我们需要做一些补偿
private boolean tryCompensate(WorkQueue w) { boolean canBlock; WorkQueue[] ws; long c; int m, pc, sp; if (w == null || w.qlock < 0 || // caller terminating (ws = workQueues) == null || (m = ws.length - 1) <= 0 || (pc = config & SMASK) == 0) // parallelism disabled canBlock = false; else if ((sp = (int)(c = ctl)) != 0) // release idle worker canBlock = tryRelease(c, ws[sp & m], 0L); else { int ac = (int)(c >> AC_SHIFT) + pc; int tc = (short)(c >> TC_SHIFT) + pc; int nbusy = 0; // validate saturation for (int i = 0; i <= m; ++i) { // two passes of odd indices WorkQueue v; if ((v = ws[((i << 1) | 1) & m]) != null) { if ((v.scanState & SCANNING) != 0) break; ++nbusy; } } if (nbusy != (tc << 1) || ctl != c) canBlock = false; // unstable or stale else if (tc >= pc && ac > 1 && w.isEmpty()) { long nc = ((AC_MASK & (c - AC_UNIT)) | (~AC_MASK & c)); // uncompensated canBlock = U.compareAndSwapLong(this, CTL, c, nc); } else if (tc >= MAX_CAP || (this == common && tc >= pc + commonMaxSpares)) throw new RejectedExecutionException( "Thread limit exceeded replacing blocked worker"); else { // similar to tryAddWorker boolean add = false; int rs; // CAS within lock long nc = ((AC_MASK & c) | (TC_MASK & (c + TC_UNIT))); if (((rs = lockRunState()) & STOP) == 0) add = U.compareAndSwapLong(this, CTL, c, nc); unlockRunState(rs, rs & ~RSLOCK); canBlock = add && createWorker(); // throws on exception } } return canBlock; }
((sp = (int)(c = ctl)) != 0)
说明还有 idle worker
则可以选择唤醒一个线程替代自己,挂起自己等待任务来唤醒自己。idle worker
则额外创建一个新的工作线程替代自己,挂起自己等待任务来唤醒自己。在了解了 Fork/Join Framework 的工作原理之后,相信很多使用上的注意事项就可以从原理中找到原因
针对最近很多人都在面试,我这边也整理了相当多的面试专题资料(spring、mybatis、jvm。。。带多了可以看附上的图片)和多家公司的面试真题。
上述面试题答案都整理成文档笔记。 也还整理了一些面试资料&最新2020收集的一些大厂的面试真题(都整理成文档,小部分截图),有需要的可以。
有人可能会说了:面试真题你全部放上来就好了。。。
你知道我最近整理了多少吗?
希望对大家有所帮助,有用的话点赞给我支持!
转载地址:http://axfvi.baihongyu.com/