Java ForkJoin 解析[精品短文]

本文主要想了解两个地方:如何窃取任务、task如何等待(join) 代码基于 OpenJDK 12git

窃取算法(work-stealing)

ForkJoin-Paper-DougLea中能够看出:github

  • 每一个队列建立一个单独的线程来执行队列里的任务,线程和队列一一对应。
  • 队列使用的是双端队列,支持LIFO、FIFO。
  • 子任务会被放到线程(不必定是当前线程)的队列中。
  • 工做线程按照LIFO的顺序处理本身队列中数据。
  • 当一个工做线程处理完本身队列中数据的时候,会随机挑选一个工做线程,并“窃取”的该工做线程队列队尾的task。

到了这里就能够知道,窃取任务从其余线程队列的尾部窃取的了。算法

窃取算法优缺点

工做窃取算法的优势:充分利用线程进行并行计算,减小了线程间的竞争。bash

工做窃取算法的缺点:在某些状况下仍是存在竞争,好比双端队列里只有一个任务时。而且该算法会消耗了更多的系统资源,好比建立多个线程和多个双端队列。微信

Task 等待(join)

Join方法的主要做用是阻塞当前线程并等待获取结果。具体代码以下:less

public final V join() {
    int s;
    if (((s = doJoin()) & ABNORMAL) != 0)
        reportException(s);
    return getRawResult();
}
复制代码

首先,它调用了doJoin()方法,经过doJoin()方法获得当前任务的状态来判断返回什么结果,任务状态有4种:已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出现异常(EXCEPTIONAL)。ui

  • 若是任务状态是已完成,则直接返回任务结果。
  • 若是任务状态是被取消,则直接抛出CancellationException。
  • 若是任务状态是抛出异常,则直接抛出对应的异常。

让咱们再来分析一下doJoin()方法的实现代码:this

/**
 * Implementation for join, get, quietlyJoin. Directly handles
 * only cases of already-completed, external wait, and
 * unfork+exec.  Others are relayed to ForkJoinPool.awaitJoin.
 *
 * @return status upon completion
 */
private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    return 
        //已完成,返回status
    	(s = status) < 0 ? s :
    	//未完成,若是当前线程是ForkJoinWorkerThread,从该线程中取出workQueue,并尝试将
        //当前task出队而后执行,执行的结果是完成则返回状态,不然使用当线程池所在的ForkJoinPool的awaitJoin方法等待
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) :
        //当前线程不是ForkJoinWorkerThread,调用externalAwaitDone方法
        //externalAwaitDone: Blocks a non-worker-thread until completion.
        externalAwaitDone();
}
/**
 * Pops the given task only if it is at the current top.
 */
final boolean tryUnpush(ForkJoinTask<?> task) {
    boolean popped = false;
    int s, cap; ForkJoinTask<?>[] a;
    if ((a = array) != null && (cap = a.length) > 0 &&
        (s = top) != base &&
        (popped = QA.compareAndSet(a, (cap - 1) & --s, task, null)))
        TOP.setOpaque(this, s);
    return popped;
}
/**
 * Primary execution method for stolen tasks. Unless done, calls
 * exec and records status if completed, but doesn't wait for * completion otherwise. * * @return status on exit from this method */ final int doExec() { int s; boolean completed; // 仅未完成的任务会运行,其余状况会忽略. if ((s = status) >= 0) { try { //exec是abstract方法 //调用ForkJoinTask子类中exec completed = exec(); } catch (Throwable rex) { completed = false; s = setExceptionalCompletion(rex); } if (completed) s = setDone(); } return s; } 复制代码

在doJoin()方法里,首先经过查看任务的状态,看任务是否已经执行完成,若是执行完成,则直接返回任务状态;若是没有执行完,则从任务队列中取出任务并执行。若是任务顺利执行完成,则设置任务状态为NORMAL,若是出现异常,则记录异常,并将任务状态设置为EXCEPTIONAL。spa

我的微信公众号:线程

我的github:

github.com/jiankunking

我的博客:

jiankunking.com

相关文章
相关标签/搜索