在ForkJoinPool中,有invoke,submit,execute三个接口能够提交任务运行,这三个接口有什么区别?分别适用什么场景?less
综述:
当前线程是FJ工做线程且属于当前FJPool,execute,submit方法会调用forkOrSubmit将task加入到工做线程的任务队列,而invoke直接调用FJTask的exec()方法;
当前线程非FJ工做线程或者不属于当前FJPool,则最终都是调用 addSubmission(ForkJoinTask<?>) 将任务加入FJPool的任务队列,并分配一个FJ工做线程。
其中,submit,invoke有返回值(返回指派的任务的计算结果),而execute没有返回值。dom
1、execute
提交任务后,任务将被异步执行。它有2个实现,可将Runnable接口封装成 ForkJoinTask<?> 使用。异步
public void execute(ForkJoinTask<?> task) {
if (task == null)
throw new NullPointerException();
forkOrSubmit(task);
}ide
public void execute(Runnable task) {
if (task == null)
throw new NullPointerException();
ForkJoinTask<?> job;
if (task instanceof ForkJoinTask<?>) // avoid re-wrap
job = (ForkJoinTask<?>) task;
else
job = ForkJoinTask.adapt(task, null); /*对Runnable接口进行封装*/
forkOrSubmit(job);
}oop
/**
* Unless terminating, forks task if within an ongoing FJ
* computation in the current pool, else submits as external task.
*/
private <T> void forkOrSubmit(ForkJoinTask<T> task) {
ForkJoinWorkerThread w;
Thread t = Thread.currentThread();
if (shutdown)
throw new RejectedExecutionException();
if ((t instanceof ForkJoinWorkerThread) &&
(w = (ForkJoinWorkerThread)t).pool == this)
w.pushTask(task);
/*若是当前调用该方法的线程是FJ工做线程,且属于当前FJPool,则将任务加入工做线程的执行队列;
这里与 ForkJoinTask.fork()相似,fork不判断是否属于当前FJPool*/
else
addSubmission(task); /*不然,加到提交任务队列,等待分配工做线程来执行*/
}ui
2、submit
提交一个任务,其本质与execute相同,区别是execute无返回值,submit会将提交给FJPool的ForkJoinTask<?> 返回给调用者;
由于调用的都是 forkOrSubmit 方法,所以submit的4个实现的实质是同样的。this
/**
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public <T> ForkJoinTask<T> submit(Callable<T> task) { /*Callable的返回值为泛型T,做为封装后的ForkJoinTask<T>的返回值*/
if (task == null)
throw new NullPointerException();
ForkJoinTask<T> job = ForkJoinTask.adapt(task);
forkOrSubmit(job);
return job;
}idea
/**
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public <T> ForkJoinTask<T> submit(Runnable task, T result) { /*Runnable不支持返回值,须要使用单独的参数指定返回值*/
if (task == null)
throw new NullPointerException();
ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
forkOrSubmit(job);
return job;
}.net
3、invoke线程
public <T> T invoke(ForkJoinTask<T> task) {
Thread t = Thread.currentThread();
if (task == null)
throw new NullPointerException();
if (shutdown)
throw new RejectedExecutionException();
if ((t instanceof ForkJoinWorkerThread) &&
((ForkJoinWorkerThread)t).pool == this) /*若是是当前FJPool的工做线程,则执行task,task.invoke将调用ForkJoinTask的exec()抽象方法,调用compute()方法*/
return task.invoke(); // bypass submit if in same pool
else {
addSubmission(task); /*将任务提交到队列,分配一个工做线程给它*/
return task.join();
}
}
/**
* Enqueues the given task in the submissionQueue. Same idea as
* ForkJoinWorkerThread.pushTask except for use of submissionLock.
*
* @param t the task
*/
private void addSubmission(ForkJoinTask<?> t) {
final ReentrantLock lock = this.submissionLock;
lock.lock();
try {
ForkJoinTask<?>[] q; int s, m;
if ((q = submissionQueue) != null) { // ignore if queue removed
long u = (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE;
UNSAFE.putOrderedObject(q, u, t);
queueTop = s + 1;
if (s - queueBase == m)
growSubmissionQueue();
}
} finally {
lock.unlock();
}
signalWork(); /*Wakes up or creare a worker*/
}
4、其它接口
ForkJoinPool:
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
【输入一组Callable对象,执行后返回一组Future<?>】
ForkJoinTask:
public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks)
【输入一组ForkJoinTask<?>对象,返回一组一样的对象】
public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
t2.fork();
t1.invoke();
t2.join();
}
【将t2加入任务队列,当即执行t1,并等待t2执行完毕】
public static void invokeAll(ForkJoinTask<?>... tasks)
【可变长入参个数,将任务加入到队列,待执行(异步)】
问题:
一、工做线程建立/分配后,如何获取待执行的任务?
public void run() {
Throwable exception = null;
try {
onStart();
pool.work(this); /*调用pool的work接口,将当前工做线程对象传入*/
} catch (Throwable ex) {
exception = ex;
} finally {
onTermination(exception);
}
}
final void work(ForkJoinWorkerThread w) {
boolean swept = false; // true on empty scans
long c;
while (!w.terminate && (int)(c = ctl) >= 0) {
int a; // active count
if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0)
swept = scan(w, a); /*进行scan,work-stealing algorithm 在这里实现*/
else if (tryAwaitWork(w, c))
swept = false;
}
}
/**
* Scans for and, if found, executes one task. Scans start at a
* random index of workers array, and randomly select the first
* (2*#workers)-1 probes, and then, if all empty, resort to 2
* circular sweeps, which is necessary to check quiescence. and
* taking a submission only if no stealable tasks were found. The
* steal code inside the loop is a specialized form of
* ForkJoinWorkerThread.deqTask, followed bookkeeping to support
* helpJoinTask and signal propagation. The code for submission
* queues is almost identical. On each steal, the worker completes
* not only the task, but also all local tasks that this task may
* have generated. On detecting staleness or contention when
* trying to take a task, this method returns without finishing
* sweep, which allows global state rechecks before retry.
*
* @param w the worker
* @param a the number of active workers
* @return true if swept all queues without finding a task
*/
private boolean scan(ForkJoinWorkerThread w, int a) {
int g = scanGuard; // mask 0 avoids useless scans if only one active
int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;
ForkJoinWorkerThread[] ws = workers;
if (ws == null || ws.length <= m) // staleness check
return false;
for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {
ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
ForkJoinWorkerThread v = ws[k & m]; 【从FJPool的workers取一个工做线程】
if (v != null && (b = v.queueBase) != v.queueTop &&
(q = v.queue) != null && (i = (q.length - 1) & b) >= 0) { 【获取该工做线程的任务队列】
long u = (i << ASHIFT) + ABASE;
if ((t = q[i]) != null && v.queueBase == b && 【从被偷的工做线程任务队列出取出一个任务】
UNSAFE.compareAndSwapObject(q, u, t, null)) {
int d = (v.queueBase = b + 1) - v.queueTop;
v.stealHint = w.poolIndex;
if (d != 0)
signalWork(); // propagate if nonempty
w.execTask(t); 【用指派的工做线程,执行上述偷来的任务】
}
r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5);
return false; // store next seed
}
else if (j < 0) { // xorshift
r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5;
}
else
++k;
}
if (scanGuard != g) // staleness check 【无效的检查】
return false;
else { // try to take submission
ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i; 【必然走到这里,从FJPool的 submissionQueue 中获取任务来执行】
if ((b = queueBase) != queueTop &&
(q = submissionQueue) != null &&
(i = (q.length - 1) & b) >= 0) {
long u = (i << ASHIFT) + ABASE;
if ((t = q[i]) != null && queueBase == b &&
UNSAFE.compareAndSwapObject(q, u, t, null)) {
queueBase = b + 1;
w.execTask(t);
}
return false;
}
return true; // all queues empty
}
}
二、工做线程的任务队列,与FJPool的任务队列之间有什么联系?
【同上】
三、ForkJoinTask.fork() ForkJoinWorkerThread.pushTask(ForkJoinTask<?>)
fork调用了后者,将调用fork的任务对象,加入到当前工做线程的任务列表。
/** * The work-stealing queue array. Size must be a power of two. * Initialized when started (as oposed to when constructed), to * improve memory locality. */ ForkJoinTask<?>[] queue;