如何高效的使用并行流

在Java7以前想要并行处理大量数据是很困难的,首先把数据拆分红不少个部分,而后把这这些子部分放入到每一个线程中去执行计算逻辑,最后在把每一个线程返回的计算结果进行合并操做;在Java7中提供了一个处理大数据的fork/join框架,屏蔽掉了线程之间交互的处理,更加专一于数据的处理。java


Fork/Join框架

Fork/Join框架采用的是思想就是分而治之,把大的任务拆分红小的任务,而后放入到独立的线程中去计算,同时为了最大限度的利用多核CPU,采用了一个种工做窃取的算法来运行任务,也就是说当某个线程处理完本身工做队列中的任务后,尝试当其余线程的工做队列中窃取一个任务来执行,直到全部任务处理完毕。因此为了减小线程之间的竞争,一般会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行;在百度找了一张图算法

image

  • 使用RecursiveTask

《2020最新Java基础精讲视频教程和学习路线!》
使用Fork/Join框架首先须要建立本身的任务,须要继承RecursiveTask,实现抽象方法安全

protected abstract V compute();

实现类须要在该方法中实现任务的拆分,计算,合并;伪代码能够表示成这样:网络

if(任务已经不可拆分){
    return 顺序计算结果;
} else {
    1.任务拆分红两个子任务
    2.递归调用本方法,拆分子任务
    3.等待子任务执行完成
    4.合并子任务的结果
}
  • Fork/Join实战

任务:完成对一亿个天然数求和框架

咱们先使用串行的方式实现,代码以下:ide

long result = LongStream.rangeClosed(1, 100000000)
                .reduce(0, Long::sum);
System.out.println("result:" + result);
复制代码

使用Fork/Join框架实现,代码以下:post

public class SumRecursiveTask extends RecursiveTask<Long> {
    private long[] numbers;
    private int start;
    private int end;

    public SumRecursiveTask(long[] numbers) {
        this.numbers = numbers;
        this.start = 0;
        this.end = numbers.length;
    }

    public SumRecursiveTask(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        int length = end - start;
        if (length < 20000) {  //小于20000个就不在进行拆分
            return sum();
        }
        SumRecursiveTask leftTask = new SumRecursiveTask(numbers, start, start + length / 2); //进行任务拆分
        SumRecursiveTask rightTask = new SumRecursiveTask(numbers, start + (length / 2), end); //进行任务拆分
        leftTask.fork(); //把该子任务交友ForkJoinPoll线程池去执行
        rightTask.fork(); //把该子任务交友ForkJoinPoll线程池去执行
        return leftTask.join() + rightTask.join(); //把子任务的结果相加
    }


    private long sum() {
        int sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }


    public static void main(String[] args) {
        long[] numbers = LongStream.rangeClosed(1, 100000000).toArray();

        Long result = new ForkJoinPool().invoke(new SumRecursiveTask(numbers));
        System.out.println("result:" +result);
    }
}
复制代码
Fork/Join默认的线程数量就是你的处理器数量,这个值是由 Runtime.getRuntime().available- Processors()获得的。 可是你能够经过系统属性 java.util.concurrent.ForkJoinPool.common. parallelism来改变线程池大小,以下所示: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12"); 这是一个全局设置,所以它将影响代码中全部的并行流。目前还没法专为某个 并行流指定这个值。由于会影响到全部的并行流,因此在任务中经历避免网络/IO操做,不然可能会拖慢其余并行流的运行速度

parallelStream

以上咱们说到的都是在Java7中使用并行流的操做,Java8并无止步于此,为咱们提供更加便利的方式,那就是parallelStreamparallelStream底层仍是经过Fork/Join框架来实现的。学习

  • 常见的使用方式

1.串行流转化成并行流测试

LongStream.rangeClosed(1,1000)
                .parallel()
                .forEach(System.out::println);
复制代码

2.直接生成并行流大数据

List<Integer> values = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            values.add(i);
        }
        values.parallelStream()
                .forEach(System.out::println);
复制代码
  • 正确的使用parallelStream

咱们使用parallelStream来实现上面的累加例子看看效果,代码以下:

public static void main(String[] args) {
    Summer summer = new Summer();
    LongStream.rangeClosed(1, 100000000)
            .parallel()
            .forEach(summer::add);
    System.out.println("result:" + summer.sum);

}

static class Summer {
    public long sum = 0;

    public void add(long value) {
        sum += value;
    }
}
复制代码

运行结果以下:

result

运行以后,咱们发现运行的结果不正确,而且每次运行的结果都不同,这是为何呢? 这里其实就是错用parallelStream常见的状况,parallelStream是非线程安全的,在这个里面中使用多个线程去修改了共享变量sum, 执行了sum += value操做,这个操做自己是非原子性的,因此在使用并行流时应该避免去修改共享变量。

修改上面的例子,正确使用parallelStream来实现,代码以下:

long result = LongStream.rangeClosed(1, 100000000)
        .parallel()
        .reduce(0, Long::sum);
System.out.println("result:" + result);
复制代码

在前面咱们已经说过了fork/join的操做流程是:拆子部分,计算,合并结果;由于parallelStream底层使用的也是fork/join框架,因此这些步骤也是须要作的,可是从上面的代码,咱们看到Long::sum作了计算,reduce作了合并结果,咱们并无去作任务的拆分,因此这个过程确定是parallelStream已经帮咱们实现了,这个时候就必须的说说Spliterator

Spliterator是Java8加入的新接口,是为了并行执行任务而设计的。

public interface Spliterator<T> {
    boolean tryAdvance(Consumer<? super T> action);

    Spliterator<T> trySplit();

    long estimateSize();

    int characteristics();
}
复制代码

tryAdvance: 遍历全部的元素,若是还有能够遍历的就返回ture,不然返回false

trySplit: 对全部的元素进行拆分红小的子部分,若是已经不能拆分就返回null

estimateSize: 当前拆分里面还剩余多少个元素

characteristics: 返回当前Spliterator特性集的编码


总结

  1. 要证实并行处理比顺序处理效率高,只能经过测试,不能靠猜想(本文累加的例子在多台电脑上运行了屡次,也并不能证实采用并行来处理累加就必定比串行的快多少,因此只能经过多测试,环境不一样可能结果就会不一样)
  2. 数据量较少,而且计算逻辑简单,一般不建议使用并行流
  3. 须要考虑流的操做时间消耗
  4. 在有些状况下须要本身去实现拆分的逻辑,并行流才能高效
    • *
感谢你们能够耐心地读到这里。

原文连接:https://juejin.cn/post/690071...

相关文章
相关标签/搜索