浅谈ForkJoinPool

ForkJoinPool是什么?

谈到线程池,不少人会想到Executors提供的一些预设的线程池,好比单线程线程池SingleThreadExecutor,固定大小的线程池FixedThreadPool,可是不多有人会注意到其中还提供了一种特殊的线程池:WorkStealingPool,咱们点进这个方法,会看到和其余方法不一样的是,这种线程池并非经过ThreadPoolExecutor来建立的,而是ForkJoinPool来建立的:算法

public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
复制代码

这两种线程池之间并非继承关系,而是平级关系:数组

ThreadPoolExecutor应该都很了解了,就是一个基本的存储线程的线程池,须要执行任务的时候就从线程池中拿一个线程来执行。而ForkJoinPool则不单单是这么简单,一样也不是ThreadPoolExecutor的代替品,这种线程池是为了实现“ 分治法”这一思想而建立的,经过把大任务拆分红小任务,而后再把小任务的结果汇总起来就是最终的结果,和MapReduce的思想很相似

举个例子,咱们要统计1-100的累加和,若是使用ForkJoinPool来实现的话,就能够将1-100每5位划分一段,划分出20段,看成20个任务,每一个任务只计算本身区间内的结果,最后将这20个任务的结果汇总起来就是1-100的累加和bash

ForkJoinPool怎么使用?

ForkJoinPool的本质就是两点:并发

  1. 若是任务很小:直接计算得出结果
  2. 若是任务很大
    • 拆分红N个子任务
    • 调用子任务的fork()进行计算
    • 调用子任务的join()合并结果

接来下咱们来作一个1-100的累加例子:异步

  1. 首先定义咱们须要执行的任务:
class Task extends RecursiveTask<Integer> {

    private int start;

    private int end;
    private int mid;

    public Task(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        if (end - start < 6) {
            // 当任务很小时,直接进行计算
            for (int i = start; i <= end; i++) {
                sum += i;
            }
            System.out.println(Thread.currentThread().getName() + " count sum: " + sum);
        } else {
            // 不然,将任务进行拆分
            mid = (end - start) / 2 + start;
            Task left = new Task(start, mid);
            Task right = new Task(mid + 1, end);

            // 执行上一步拆分的子任务
            left.fork();
            right.fork();

            // 拿到子任务的执行结果
            sum += left.join();
            sum += right.join();
        }

        return sum;
    }
}
复制代码

这里的RecursiveTaskForkJoinTask的子类,ForkJoinTask又是Future的子类,不了解Future类的能够认为是一个异步执行,而且能够有返回值的Runnable类ide

咱们首先在Task类中定义了任务须要的一些数据,好比开始位置和结束位置。重点是其中的compute方法,在其中实现了咱们刚才说到的步骤,若是任务很小(经过任务数据来判断),就进行计算,不然将任务拆分,使用fork()执行,并经过join()拿到计算结果高并发

  1. 将任务提交到线程池

刚才咱们定义了任务类,接下来就须要把这个任务提交到线程池:ui

public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();

        Task countTask = new Task(1, 100);
        ForkJoinTask<Integer> result = forkJoinPool.submit(countTask);

        System.out.println("result: " + result.get());

        forkJoinPool.shutdown();
    }
复制代码

注意,这里ForkJoinPool初始化能够传入一个并行参数,若是不传入该参数的话会默认使用处理器个数来做为并行参数this

建立任务对象和线程池以后,使用submit方法来提交任务,该方法会返回一个ForkJoinTask<T>类型的对象,调用其get方法便可拿到执行结果spa

同时要注意,该线程池也须要调用shutdown方法来关闭

ForkJoinPool的原理

ForkJoinPool中有三个重要角色:

  • ForkJoinWorkerThread:工做线程,在内部对Thread进行的封装
  • WorkQueue:任务队列
  • ForkJoinTask:任务,继承自Future,在含义上分为submission和task两种

在线程池中,任务队列使用数组来保存,其中保存了全部提交进来的任务:

  1. 奇数位置保存submission
  2. 偶数位置保存task

submission指的是本地提交的任务,如submit、execute提交的任务;而task则是经过fork方法添加的子任务。这两种任务仅仅在含义上有所区别,因此一同保存在任务队列中,经过位置进行区分

ForkJoinPool的核心

想理解ForkJoinPool的原理,就要理解其核心,一共有两点,其一是分治法,其二就是工做窃取算法。分治法相信就不用多说了,就是经过把大任务拆分红小任务来提升并发度。重点要说的就是工做窃取算法,该算法的原理:

全部线程均尝试找到并执行已提交的任务,或是经过其余任务建立的子任务

依赖于这种特性,来尽可能避免一个线程执行完本身的任务后“无所事事”的状况。同时,窃取顺序是FIFO的

相关文章
相关标签/搜索