ForkJoinPool 的使用以及原理

ForkJoinPool 是Java 1.7 引入的一种新的并发框架—— ForkJoin Framework。如下是重要的几点特性:java

  1. ForkJoinPool 不是为了替代 ExecutorService,而是它的补充,在某些应用场景下性能比 ExecutorService 更好。(见 Java Tip: When to use ForkJoinPool vs ExecutorService )
  2. ForkJoinPool 主要用于实现“分而治之”的算法,特别是分治以后递归调用的函数,例如 quick sort 等。
  3. ForkJoinPool 最适合的是计算密集型的任务,若是存在 I/O,线程间同步,sleep() 等会形成线程长时间阻塞的状况时,最好配合使用 ManagedBlocker。

 

ForkJoinPool & ForkJoinTask 概述:算法

  • ForkJoinTask:咱们要使用 ForkJoin 框架,必须首先建立一个 ForkJoin 任务。它提供在任务中执行 fork() 和 join() 操做的机制,一般状况下咱们不须要直接继承 ForkJoinTask 类,而只须要继承它的子类,ForkJoin 框架提供了如下两个子类:
    • RecursiveAction:用于没有返回结果的任务。
    • RecursiveTask :用于有返回结果的任务。
  • ForkJoinPool :ForkJoinTask 须要经过 ForkJoinPool 来执行,任务分割出的子任务会添加到当前工做线程所维护的双端队列中,进入队列的头部。当一个工做线程的队列里暂时没有任务时,它会随机从其余工做线程的队列的尾部获取一个任务。

引用:https://www.infoq.cn/article/fork-join-introduction并发

 

ForkJoinPool 的使用

好比计算1至1000的正整数之和,使用ForkJoinPool 如何进行并行的计算。框架

package common.forkjoinpool;

public interface Calculator {
    long sumUp(long[] numbers);
}
package common.forkjoinpool;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinCalculator implements Calculator {
    private ForkJoinPool pool;

    public ForkJoinCalculator() {
        // 也可使用公用的 ForkJoinPool:
        // pool = ForkJoinPool.commonPool()
        pool = new ForkJoinPool();
    }

    @Override
    public long sumUp(long[] numbers) {
        return pool.invoke(new SumTask(numbers, 0, numbers.length - 1));
    }

    private static class SumTask extends RecursiveTask<Long> {
        private long[] numbers;
        private int from;
        private int to;

        public SumTask(long[] numbers, int from, int to) {
            this.numbers = numbers;
            this.from = from;
            this.to = to;
        }


        @Override
        protected Long compute() {

            // 当须要计算的数字小于6时,直接计算结果
            if (to - from < 6) {
                long total = 0;
                for (int i = from; i <= to; i++) {
                    total += numbers[i];
                }
                return total;
                // 不然,把任务一分为二,递归计算
            } else {
                int middle = (from + to) / 2;
                SumTask taskLeft = new SumTask(numbers, from, middle);
                SumTask taskRight = new SumTask(numbers, middle + 1, to);
                taskLeft.fork();
                taskRight.fork();
                return taskLeft.join() + taskRight.join();
            }
        }
    }
}
package common.forkjoinpool;

import java.util.stream.LongStream;

public class Main {

    public static void main(String[] args) {
        long[] numbers = LongStream.rangeClosed(1, 1000).toArray();
        Calculator calculator = new ForkJoinCalculator();
        System.out.println(calculator.sumUp(numbers)); // 打印结果500500
    }
}

这段代码的核心方法是 less

@Override
protected Long compute() {

    // 当须要计算的数字小于6时,直接计算结果
    if (to - from < 6) {
        long total = 0;
        for (int i = from; i <= to; i++) {
            total += numbers[i];
        }
        return total;
        // 不然,把任务一分为二,递归计算
    } else {
        int middle = (from + to) / 2;
        SumTask taskLeft = new SumTask(numbers, from, middle);
        SumTask taskRight = new SumTask(numbers, middle + 1, to);
        taskLeft.fork();
        taskRight.fork();
        return taskLeft.join() + taskRight.join();
    }
}

经过 compute() 方法,分解任务,分而治之。async

 

ForkJoinTask 两种实现类型

ForkJoinTask 继承关系以下,ide

RecursiveTask 和 RecursiveAction 的区别:函数

RecursiveAction性能

它是一种没有任何返回值的任务。只是作一些工做,好比写数据到磁盘,而后就退出了。 一个RecursiveAction能够把本身的工做分割成更小的几块, 这样它们能够由独立的线程或者CPU执行。 咱们能够经过继承来实现一个RecursiveAction。ui

RecursiveTask

它是一种会返回结果的任务。能够将本身的工做分割为若干更小任务,并将这些子任务的执行合并到一个集体结果。 能够有几个水平的分割和合并。

RecursiveAction以下使用方法,

package common.forkjointask;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

public class MyRecursiveAction extends RecursiveAction {

    /**
     * 每一个"小任务"最多只打印20个数
     */
    private static final int MAX = 20;

    private int start;
    private int end;

    public MyRecursiveAction(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        //当end-start的值小于MAX时,开始打印
        if ((end - start) < MAX) {
            for (int i = start; i < end; i++) {
                System.out.println(Thread.currentThread().getName() + "-i的值" + i);
            }
        } else {
            // 将大任务分解成两个小任务
            int middle = (start + end) / 2;
            MyRecursiveAction left = new MyRecursiveAction(start, middle);
            MyRecursiveAction right = new MyRecursiveAction(middle, end);
            left.fork();
            right.fork();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 建立包含Runtime.getRuntime().availableProcessors()返回值做为个数的并行线程的ForkJoinPool
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // 提交可分解的PrintTask任务
        forkJoinPool.submit(new MyRecursiveAction(0, 1000));

        while (!forkJoinPool.isTerminated()) {
            forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);
        }
        // 关闭线程池
        forkJoinPool.shutdown();
    }

}

这段代码虽然打印了0-999这一千个数字,可是并非连续打印的,这是由于程序将这个打印任务进行了分解,分解后的任务会并行执行,因此不会按顺序打印。

 

ForkJoinPool 简单分析

commonPool的建立 - ForkJoinPool.commPool() 

翻看源码,ForkJoinPool 有这么一个 字段,

/**
     * Common (static) pool. Non-null for public use unless a static
     * construction exception, but internal usages null-check on use
     * to paranoically avoid potential initialization circularities
     * as well as to simplify generated code.
     */
    static final ForkJoinPool common;

这就是 commonPool ,是ForkJoinPool 在类加载时候建立的,

/**
     * Creates and returns the common pool, respecting user settings
     * specified via system properties.
     */
    private static ForkJoinPool makeCommonPool() {
        int parallelism = -1;
        ForkJoinWorkerThreadFactory factory = null;
        UncaughtExceptionHandler handler = null;
        try {  // ignore exceptions in accessing/parsing properties
            String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");
            String fp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
            String hp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
            if (pp != null)
                parallelism = Integer.parseInt(pp);
            if (fp != null)
                factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
                           getSystemClassLoader().loadClass(fp).newInstance());
            if (hp != null)
                handler = ((UncaughtExceptionHandler)ClassLoader.
                           getSystemClassLoader().loadClass(hp).newInstance());
        } catch (Exception ignore) {
        }
        if (factory == null) {
            if (System.getSecurityManager() == null)
                factory = defaultForkJoinWorkerThreadFactory;
            else // use security-managed default
                factory = new InnocuousForkJoinWorkerThreadFactory();
        }
        if (parallelism < 0 && // default 1 less than #cores
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
            parallelism = 1;
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;
        return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                                "ForkJoinPool.commonPool-worker-");
    }

在makeCommonPool 方法中建立commonPool的时候 调用的构造函数以下,

/**
     * Creates a {@code ForkJoinPool} with the given parameters, without
     * any security checks or parameter validation.  Invoked directly by
     * makeCommonPool.
     */
    private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix) {
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        this.config = (parallelism & SMASK) | mode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }

重要参数解释(咱们仍是结合英文注释来看):

1. parallelism:并行度( the parallelism level),默认状况下跟咱们机器的cpu个数保持一致,使用 Runtime.getRuntime().availableProcessors()能够获得咱们机器运行时可用的CPU个数

2. factory:建立新线程的工厂( the factory for creating new threads)。默认状况下使用ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory。

3. handler:线程异常状况下的处理器,该处理器在线程执行任务时因为某些没法预料到的错误而致使任务线程中断时进行一些处理,默认状况为null。

4. asyncMode:这个参数要注意,在ForkJoinPool中,每个工做线程都有一个独立的任务队列,asyncMode表示工做线程内的任务队列是采用何种方式进行调度,能够是先进先出FIFO,也能够是后进先出LIFO。若是为true,则线程池中的工做线程则使用先进先出方式进行任务调度,默认状况下是false。

ForkJoinPool 有一个 Async Mode ,效果是工做线程在处理本地任务时也使用 FIFO 顺序。这种模式下的 ForkJoinPool 更接近因而一个消息队列,而不是用来处理递归式的任务。

 

ForkJoinPool work stealing 算法

  1. ForkJoinPool 的每一个工做线程都维护着一个工做队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。
  2. 每一个工做线程在运行中产生新的任务(一般是由于调用了 fork())时,会放入工做队列的队尾,而且工做线程在处理本身的工做队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。
  3. 每一个工做线程在处理本身的工做队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其余工做线程的工做队列),窃取的任务位于其余线程的工做队列的队首,也就是说工做线程在窃取其余工做线程的任务时,使用的是 FIFO 方式。
  4. 在遇到 join() 时,若是须要 join 的任务还没有完成,则会先处理其余任务,并等待其完成。
  5. 在既没有本身的任务,也没有能够窃取的任务时,进入休眠。

 

ForkJoinTask fork 方法

fork() 作的工做只有一件事,既是把任务推入当前工做线程的工做队列里。能够参看如下的源代码:

public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }

 

ForkJoinTask join 方法

join() 的工做则复杂得多,也是 join() 可使得线程免于被阻塞的缘由——不像同名的 Thread.join()

  1. 检查调用 join() 的线程是不是 ForkJoinThread 线程。若是不是(例如 main 线程),则阻塞当前线程,等待任务完成。若是是,则不阻塞。
  2. 查看任务的完成状态,若是已经完成,直接返回结果。
  3. 若是任务还没有完成,但处于本身的工做队列内,则完成它。
  4. 若是任务已经被其余的工做线程偷走,则窃取这个小偷的工做队列内的任务(以 FIFO 方式),执行,以期帮助它早日完成欲 join 的任务。
  5. 若是偷走任务的小偷也已经把本身的任务所有作完,正在等待须要 join 的任务时,则找到小偷的小偷,帮助它完成它的任务。
  6. 递归地执行第5步。

将上述流程画成序列图的话就是这个样子:

 

ForkJoinPool.submit 方法

public static void main(String[] args) throws InterruptedException {
        // 建立包含Runtime.getRuntime().availableProcessors()返回值做为个数的并行线程的ForkJoinPool
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // 提交可分解的PrintTask任务
        forkJoinPool.submit(new MyRecursiveAction(0, 1000));

        while (!forkJoinPool.isTerminated()) {
            forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);
        }
        // 关闭线程池
        forkJoinPool.shutdown();
    }

其实除了前面介绍过的每一个工做线程本身拥有的工做队列之外,ForkJoinPool 自身也拥有工做队列,这些工做队列的做用是用来接收由外部线程(非 ForkJoinThread 线程)提交过来的任务,而这些工做队列被称为 submitting queue 。

submit() 和 fork() 其实没有本质区别,只是提交对象变成了 submitting queue 而已(还有一些同步,初始化的操做)。submitting queue 和其余 work queue 同样,是工做线程”窃取“的对象,所以当其中的任务被一个工做线程成功窃取时,就意味着提交的任务真正开始进入执行阶段。

参考:http://blog.dyngr.com/blog/2016/09/15/java-forkjoinpool-internals/

=======END=======

相关文章
相关标签/搜索