ForkJoinPool 主要用于实现“分而治之”的算法,特别是分治以后递归调用的函数,例如 quick sort 等。
ForkJoinPool 最适合的是计算密集型的任务,如果存在 I/O、线程间同步、sleep() 等会造成线程长时间阻塞的情况,最好配合使用 ManagedBlocker。
static { // initialize field offsets for CAS etc try { U = sun.misc.Unsafe.getUnsafe(); Class<?> k = ForkJoinPool.class; CTL = U.objectFieldOffset (k.getDeclaredField("ctl")); RUNSTATE = U.objectFieldOffset (k.getDeclaredField("runState")); STEALCOUNTER = U.objectFieldOffset (k.getDeclaredField("stealCounter")); Class<?> tk = Thread.class; PARKBLOCKER = U.objectFieldOffset (tk.getDeclaredField("parkBlocker")); Class<?> wk = WorkQueue.class; QTOP = U.objectFieldOffset (wk.getDeclaredField("top")); QLOCK = U.objectFieldOffset (wk.getDeclaredField("qlock")); QSCANSTATE = U.objectFieldOffset (wk.getDeclaredField("scanState")); QPARKER = U.objectFieldOffset (wk.getDeclaredField("parker")); QCURRENTSTEAL = U.objectFieldOffset (wk.getDeclaredField("currentSteal")); QCURRENTJOIN = U.objectFieldOffset (wk.getDeclaredField("currentJoin")); Class<?> ak = ForkJoinTask[].class; ABASE = U.arrayBaseOffset(ak); int scale = U.arrayIndexScale(ak); if ((scale & (scale - 1)) != 0) throw new Error("data type scale not a power of two"); ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); } catch (Exception e) { throw new Error(e); } commonMaxSpares = DEFAULT_COMMON_MAX_SPARES; defaultForkJoinWorkerThreadFactory = new DefaultForkJoinWorkerThreadFactory(); modifyThreadPermission = new RuntimePermission("modifyThread"); common = java.security.AccessController.doPrivileged (new java.security.PrivilegedAction<ForkJoinPool>() { public ForkJoinPool run() { return makeCommonPool(); }}); int par = common.config & SMASK; // report 1 even if threads disabled commonParallelism = par > 0 ? par : 1; }
/** * Creates and returns the common pool, respecting user settings * specified via system properties. */ private static ForkJoinPool makeCommonPool() { int parallelism = -1; ForkJoinWorkerThreadFactory factory = null; UncaughtExceptionHandler handler = null; try { // ignore exceptions in accessing/parsing properties String pp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.parallelism"); String fp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.threadFactory"); String hp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.exceptionHandler"); if (pp != null) parallelism = Integer.parseInt(pp); if (fp != null) factory = ((ForkJoinWorkerThreadFactory)ClassLoader. getSystemClassLoader().loadClass(fp).newInstance()); if (hp != null) handler = ((UncaughtExceptionHandler)ClassLoader. getSystemClassLoader().loadClass(hp).newInstance()); } catch (Exception ignore) { } if (factory == null) { if (System.getSecurityManager() == null) factory = defaultForkJoinWorkerThreadFactory; else // use security-managed default factory = new InnocuousForkJoinWorkerThreadFactory(); } if (parallelism < 0 && // default 1 less than #cores (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) parallelism = 1; if (parallelism > MAX_CAP) parallelism = MAX_CAP; return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE, "ForkJoinPool.commonPool-worker-"); }
通过代码指定时,必须在 commonPool 初始化之前(parallel stream 被调用之前,一般可在系统启动后设置)注入进去,否则无法生效。
通过启动参数指定则无此限制,较为安全。
parallelism(即配置线程池的并行度,也就是工作线程个数)
可以通过 java.util.concurrent.ForkJoinPool.common.parallelism 进行配置,最大值不能超过 MAX_CAP,即 32767。
// Upper bound on parallelism: makeCommonPool() clamps any larger value to this.
static final int MAX_CAP = 0x7fff; //32767
如果没有指定,则默认为 Runtime.getRuntime().availableProcessors() - 1(该结果不大于 0 时取 1)。
代码指定(必须在 commonPool 初始化之前注入进去,否则无法生效)
// Only effective if executed before ForkJoinPool's static initializer builds the common pool.
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");
或者参数指定
-Djava.util.concurrent.ForkJoinPool.common.parallelism=8
threadFactory
没有 SecurityManager 时,默认为 defaultForkJoinWorkerThreadFactory;存在 SecurityManager 时,则使用 InnocuousForkJoinWorkerThreadFactory。
/**
 * The stock {@code ForkJoinWorkerThreadFactory}: each call hands back a
 * freshly constructed {@code ForkJoinWorkerThread} for the given pool.
 */
static final class DefaultForkJoinWorkerThreadFactory
    implements ForkJoinWorkerThreadFactory {

    /**
     * Creates a new worker thread for {@code pool}.
     *
     * @param pool the pool the new thread will work in
     * @return the new worker thread
     */
    @Override
    public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        ForkJoinWorkerThread worker = new ForkJoinWorkerThread(pool);
        return worker;
    }
}
代码指定(必须在 commonPool 初始化之前注入进去,否则无法生效)
// Only effective if executed before the common pool is built. The named class is loaded by the
// system class loader and created with newInstance(), so it needs an accessible no-arg constructor.
System.setProperty("java.util.concurrent.ForkJoinPool.common.threadFactory",YourForkJoinWorkerThreadFactory.class.getName());
参数指定
-Djava.util.concurrent.ForkJoinPool.common.threadFactory=com.xxx.xxx.YourForkJoinWorkerThreadFactory
exceptionHandler
如果没有设置,默认为 null。
/** * Callback from ForkJoinWorkerThread constructor to establish and * record its WorkQueue. * * @param wt the worker thread * @return the worker's queue */ final WorkQueue registerWorker(ForkJoinWorkerThread wt) { UncaughtExceptionHandler handler; wt.setDaemon(true); // configure thread if ((handler = ueh) != null) wt.setUncaughtExceptionHandler(handler); WorkQueue w = new WorkQueue(this, wt); int i = 0; // assign a pool index int mode = config & MODE_MASK; int rs = lockRunState(); try { WorkQueue[] ws; int n; // skip if no array if ((ws = workQueues) != null && (n = ws.length) > 0) { int s = indexSeed += SEED_INCREMENT; // unlikely to collide int m = n - 1; i = ((s << 1) | 1) & m; // odd-numbered indices if (ws[i] != null) { // collision int probes = 0; // step by approx half n int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2; while (ws[i = (i + step) & m] != null) { if (++probes >= n) { workQueues = ws = Arrays.copyOf(ws, n <<= 1); m = n - 1; probes = 0; } } } w.hint = s; // use as random seed w.config = i | mode; w.scanState = i; // publication fence ws[i] = w; } } finally { unlockRunState(rs, rs & ~RSLOCK); } wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1))); return w; }
代码指定(必须在 commonPool 初始化之前注入进去,否则无法生效)
// Only effective if executed before the common pool is built. The named class is loaded by the
// system class loader and created with newInstance(), so it needs an accessible no-arg constructor.
System.setProperty("java.util.concurrent.ForkJoinPool.common.exceptionHandler",YourUncaughtExceptionHandler.class.getName());
参数指定
-Djava.util.concurrent.ForkJoinPool.common.exceptionHandler=com.xxx.xxx.YourUncaughtExceptionHandler
// Mode bits for ForkJoinPool.config and WorkQueue.config static final int MODE_MASK = 0xffff << 16; // top half of int static final int LIFO_QUEUE = 0; static final int FIFO_QUEUE = 1 << 16; static final int SHARED_QUEUE = 1 << 31; // must be negative
控制是 FIFO 还是 LIFO
/**
 * Returns the next local task, if there is one, honoring the queue mode:
 * {@code poll()} when the FIFO_QUEUE bit is set in {@code config},
 * {@code pop()} otherwise.
 */
final ForkJoinTask<?> nextLocalTask() {
    if ((config & FIFO_QUEUE) != 0) {
        return poll();
    }
    return pop();
}
ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。
每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。
每个工作线程在处理自己的工作队列的同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。
queue capacity
/**
 * Capacity of work-stealing queue array upon initialization.
 * Must be a power of two; at least 4, but should be larger to
 * reduce or eliminate cacheline sharing among queues.
 * Currently, it is much larger, as a partial workaround for
 * the fact that JVMs often place arrays in locations that
 * share GC bookkeeping (especially cardmarks) such that
 * per-write accesses encounter serious memory contention.
 */
static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

/**
 * Maximum size for queue arrays. Must be a power of two less
 * than or equal to 1 << (31 - width of array entry) to ensure
 * lack of wraparound of index calculations, but defined to a
 * value a bit less than this to help users trap runaway
 * programs before saturating systems.
 */
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
超出最大容量(MAXIMUM_QUEUE_CAPACITY)时抛出 RejectedExecutionException:
/** * Initializes or doubles the capacity of array. Call either * by owner or with lock held -- it is OK for base, but not * top, to move while resizings are in progress. */ final ForkJoinTask<?>[] growArray() { ForkJoinTask<?>[] oldA = array; int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY; if (size > MAXIMUM_QUEUE_CAPACITY) throw new RejectedExecutionException("Queue capacity exceeded"); int oldMask, t, b; ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size]; if (oldA != null && (oldMask = oldA.length - 1) >= 0 && (t = top) - (b = base) > 0) { int mask = size - 1; do { // emulate poll from old array, push to new array ForkJoinTask<?> x; int oldj = ((b & oldMask) << ASHIFT) + ABASE; int j = ((b & mask) << ASHIFT) + ABASE; x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj); if (x != null && U.compareAndSwapObject(oldA, oldj, x, null)) U.putObjectVolatile(a, j, x); } while (++b != t); } return a; }