Java 8 (6) Stream 流 - 并行数据处理与性能

在Java 7以前,并行处理集合很是麻烦。首先你要明确的把包含数据的数据结构分红若干子部分,而后你要把每一个子部分分配一个独立的线程。而后,你须要在恰当的时候对他们进行同步来避免竞争,等待全部线程完成。最后,把这些部分结果合并起来。Java 7中引入了一个叫作 分支/合并的框架,让这些操做更稳定,更不容易出错。java

并行流算法

  使用Stream接口能够方便的处理它的元素,能够经过对收集源调用parallelStream方法来把集合转换为并行流。并行流就是一个把内容分红多个数据块,并用不一样的线程分别处理每一个数据块的流。这样就能够把给定操做的工做负荷分配给多核处理器的全部内核,让它们都忙起来。编程

例如求和:1到10000之间的和。数组

    //求和
    public static long getSum(long n){
        return Stream.iterate(1L,i->i+1).limit(n).reduce(0L,Long::sum);
    }

这段代码等价于传统Java:数据结构

    //求和
    public static long getSum(long n){
        long sum = 0;
        for(long i = 1L;i<=10000;i++){
            sum += i;
        }
        return sum;
    }

将顺序流转换为并行流app

  只须要对顺序流调用parallel方法便可转换为并行流:框架

    public static long getSum(long n){
        return Stream.iterate(1L,i->i+1).limit(n).parallel().reduce(0L,Long::sum);
    }

这段代码在内部将Stream分红了几块,所以能够对不一样的块独立进行概括操做。最后,同一个概括操做会将各个子流的部分概括结果合并起来,获得整个原始流结果。异步

对顺序流执行parallel方法并不意味着流自己有任何实际的变化,它内部就是一个布尔值,表示parallel以后进行的操做都并行执行,只须要对并行流调用sequential方法就能够变回顺序流。这两个方法能够结合起来,在须要并行的时候并行,须要串行的时候串行。ide

    Stream.parallel()
          .filter(...)
          .sequential()
          .map(...)
          .parallel()
          .reduce();

可是 最后一次parallel或sequential调用会影响整个流水线,上面的例子流水线会并行执行,由于最后调用的是它。函数

 

并行流内部使用了默认的ForkJoinPool,它默认的线程数量就是你的处理器数量,这个值是由Runtime.getRuntime().availableProcessors()获得的,能够经过系统属性java.util.concurrent.ForkJoinPool.common.parallelism来改变线程池的大小,例如:System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12")。这是一个全局属性,意味着全部的并行操做都会受影响。通常不建议修改它。

 

对这三个方法进行测量:

编写一个测量方法,这个方法接受一个函数和一个long参数,他会对传给方法的long应用函数10次,记录每次执行的时间(毫秒),并返回最短的一次执行时间:

    public static long measureSumPerf(Function<Long, Long> adder, long n) {
        long fastest = Long.MAX_VALUE;
        for (int i = 0; i < 10; i++) {
            long start = System.nanoTime();
            long sum = adder.apply(n);
            long duration = (System.nanoTime() - start) / 1_000_000;
            System.out.println("Result: " + sum);
            if (duration < fastest) fastest = duration;
        }
        return fastest;
    }

 

    //iterate
    public static long getSum(long n){
        return Stream.iterate(1L,i->i+1).limit(n).reduce(0L,Long::sum);
    }
    //iterate Parallel
    public static long getSumParallel(long n){
        return Stream.iterate(1L,i->i+1).limit(n).parallel().reduce(0L,Long::sum);
    }
    //Java Old
    public static long getSumOldJava(long n){
        long sum = 0;
        for(int i = 0;i<=n;i++){
            sum += i;
        }
        return sum;
    }
System.out.println(measureSumPerf(Main::getSum,10000000)); //105
System.out.println(measureSumPerf(Main::getSumParallel,10000000)); //147
System.out.println(measureSumPerf(Main::getSumOldJava,10000000)); //5

用传统for循环的方式是最快的,由于它更为底层,更重要的是不须要对原始类型进行任何装箱或拆箱操做。他才5毫秒便可完成。

顺序化执行结果为105毫秒,

用并行化进行测试,结果竟然是最慢的 147毫秒,由于iterate生成的是装箱的对象,必须拆箱成数字才能求和,而且咱们很难把iterate分红多个独立块来进行并行执行。

这意味着 并行化编程可能很复杂,若是用的不对,它甚至会让程序的总体性能更差。

LongStream.rangeClosed方法与iterate相比有两个优势:

1.LongStream.rangeClosed直接产生原始类型的long数字,没有装箱和拆箱。

2.LongStream.rangeClosed会生成数字范围,很容易拆分为独立的小块。

    //5
    public static long GetRangeClosedSum(long n){
        return LongStream.rangeClosed(1,n).reduce(0L,Long::sum);
    }

顺序化的LongStream.rangeClosed 只花费了5毫秒,他比iterate顺序化要快得多,由于他没有装箱和拆箱。再来看看并行化:

    //1
    public static long GetRangeClosedSumParallel(long n){
        return LongStream.rangeClosed(1,n).parallel().reduce(0L,Long::sum);
    }

LongStream.rangeClosed 调用parallel方法后,执行只使用了1毫秒,终于能够像上面图中同样并行了,并行化过程自己须要对流作递归划分,把每一个子流的概括操做分配到不一样的线程,而后把这些操做的结果合并成一个值。

 

正确使用并行流
  错用并行流而产生错误的首要缘由,就是使用的算法改变了某些共享状态,例如 另外一种实现对n个天然数求和的方法,但这会改变一个共享累加器:

public class Accumulator {
    public long total = 0;
    public void add(long value){
        total += value;
    }
}
    public static long sideEffectSum(long n){
        Accumulator accumulator = new Accumulator();
        LongStream.rangeClosed(1,n).forEach(Accumulator::add);
        return accumulator.total;
    }

这种代码本质就是顺序的,每次访问total都会出现数据竞争。若是你尝试用同步来修复,那就彻底失去并行的意义了。咱们试着在forEach前加入parallel方法使其并行化:

public static long sideEffectSum(long n){
        Accumulator accumulator = new Accumulator();
        LongStream.rangeClosed(1,n).parallel().forEach(Accumulator::add);
        return accumulator.total;
    }

调用上面的测试方法:

System.out.println(measureSumPerf(Main::sideEffectSum,10000000));
Result: 10140890625203
Result: 9544849565325
Result: 6438093946815
Result: 11805543046590
Result: 6658954367405
Result: 4642751863823
Result: 5948081550315
Result: 7219270279482
Result: 7258008360508
Result: 4898539133022
1

性能可有可无了,由于结果都是错误的,每次执行都会返回不一样的结果,都离正确值差很远。这是因为多个线程在同时访问累加器,执行total+=value;foreach中调用的方法会改变多个线程共享对象的可变状态。 共享可变状态会影响并行流以及并行计算。

 

如何使用并行流

  1.测量,把顺序流转换成并行流很容易,但不必定性能会提高。并行流不必定老是比顺序流快,因此使用并行流时对其和顺序流进行测量。

  2.留意装箱。自动装箱和拆箱操做会大大下降性能,Java 8中又原始类型流(IntStream、LongStream、DoubleStream)来避免这些操做。

  3.有些操做自己在并行流上的性能就比顺序流差。特别是limit何findFirst等依赖于元素顺序的操做,他们在并行流上执行的代价很是大。例如,findAny会比findFrist性能好,由于它不必定要按照顺序来执行。

  4.对于小数据量,不建议使用并行流。

  5.要考虑流背后的数据结构是否易于分解。例如,ArrayList的拆分效率比LinkedList高的多,由于ArrayList用不着遍历就能够拆分,而LinkedList必须遍历。另外,用range方法建立的原始类型流也能够快速分解。

 

分支/合并框架

  分支/合并框架的目的是以递归的方式将能够并行的任务拆分红更小的任务,而后将每一个子任务结果合并起来成为总体结果。它是ExecutorService接口的一个实现,它把子任务分配给线程池(ForkJoinPool)中的工做线程。

1.使用RecursiveTask

  要把任务提交到这个池,必须建立RecursiveTask<R>的一个子类,其中R是并行化任务(以及全部子任务)产生的结果类型,或者若是任务不返回结果,则是RecursiveAction类型(它可能会更新其余非局部机构)。要定义RecursiveTask,只需实现它惟一的抽象方法compute,这个方法同时定义了将任务拆分红子任务的逻辑,以及没法再拆分或不方便再拆分时,生成单个子任务结果的逻辑。相似如下伪代码:

        if(任务足够小或不可分) {
            顺序计算该任务
        }else{
            将任务拆分为两个子任务 
            递归调用本方法,拆分每一个子任务,等待全部子任务完成 
            合并每一个子任务的结果
        }

通常来讲没有确切的标准来绝对一个任务是否能够被拆分,可是有几种试探方法能够查看是否能够拆分。

 之前面的求和例子为基础,咱们试着用这个框架为一个数字范围long[]数组求和,首选须要为RecursiveTask类作一个实现,ForkJoinSumCalculator

public class ForkJoinSumCalculator extends java.util.concurrent.RecursiveTask<Long> {

    //要求和的数组
    private final long[] numbers;
    //子任务处理的数组的开始位置。
    private final int start;
    //子任务处理的数组的终止位置
    private final int end;
    //不可将任务分解为子任务的数组大小
    public static final long THRESHOLD = 10000;

    //共用构造函数用于建立主任务
    public ForkJoinSumCalculator(long[] numbers) {
        this(numbers, 0, numbers.length);
    }

    //私有构造方法用于以递归方式为主任务建立子任务
    private ForkJoinSumCalculator(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        //该任务负责求和的部分的大小
        int length = end - start;
        if (length <= THRESHOLD) {
            return computeSequentially(); //若是大小小于或等于阀值,顺序计算结果
        }
        //建立一个子任务来为数组的前一半进行求和
        ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2);
        //利用另外一个ForkJoinPool线程异步执行新建立的子任务
        leftTask.fork();
        //建立一个任务为数组的后一半进行求和
        ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end);

        //同步执行第二个子任务,有可能容许进一步递归划分
        Long rightResult = rightTask.compute();
        //读取第一个子任务的结果,若是还没有完成就等待
        Long leftResult = leftTask.join();
        //合并两个任务的结果
        return leftResult + rightResult;
    }

    //在子任务不可再分时计算结果
    private long computeSequentially() {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }
}

如今就能够经过调用构造函数来求和了:

        long[] numbers = LongStream.rangeClosed(1,10000000).toArray();
        ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
        System.out.println(new ForkJoinPool().invoke(task));

这里使用LongStream.rangeClosed生成了一个long 数组,而后建立了一个ForkJoinTask的父类,并把数组传递给ForkJoinSumCalculator的公共构造函数,最后建立爱你了一个新的ForkJoinPool,并把任务传给它调用方法。在ForkJoinPool中执行时,最后一个方法返回的值就是ForkJoinSumCalculator类定义的任务结果。

在实际应用时,使用多个ForkJoinPool是没有意义的,通常来讲把它实例化一次,而后把实力保存在静态字段中,使之成为单例。这样就能够在任何地方方便的重用了。

运行ForkJoinSumCalculator

  当把ForkJoinSumCalculator任务传给ForkJoinPool时,这个任务就由池中的一个线程执行,这个线程会调用任务的compute方法。该方法会检查任务是否小到足以顺序执行,若是不够小则会把要求和的数组分红两半,分给两个新的ForkJoinSumCalculator,而它们也由ForkJoinPool安排执行。所以,这一过程能够递归重复,把原任务分为更小的任务,直到知足不方便或不可能再进一步拆分的条件。这时会顺序计算每一个任务的结果,而后由分支过程建立的任务二叉树遍历回到它的根。接下来回合并每一个子任务的部分结果,从而获得总任务的结果。

System.out.println(measureSumPerf(Main::forkJoinSum,10000000)); // 79

执行速度为79毫秒,是由于必须先把整个数字流都放进一个long数组,以后才能在ForkJoinSumCalculator任务中使用它。

 

使用分支/合并框架的最佳作法

1.对一个任务调用join方法会阻塞调用方,直到该任务作出结果。所以,有必要在两个字任务的计算都开始后再调用它。不然,你获得的版本会比原始的顺序算法更慢更复杂,由于每一个子任务都必须等待另外一个子任务完成才能启动。

2.不该该在RecursiveTask内部使用ForkJoinPool的invoke方法。相反,你应该始终直接调用compute或fork方法,只有顺序代码才应该用invoke来启动并行计算。

3.对于子任务调用fork方法能够把它排进ForkJoinPool。同时对左边和右边的子任务调用它彷佛很天然,但这样的效率要比直接对其中一个调用compute低。这样作你能够为其中一个子任务重用同一个线程,从而避免在线程池中多分配一个任务形成的开销。

4.对于分支/合并拆分策略你必须选择一个标准,来决定任务是要进一步拆分仍是已到能够顺序求值。

 

工做窃取

  在ForkJoinSumCalculator的例子中,设置的阀值是10000,就是在数组为10000时就不会再建立子任务了。在测试案例中,咱们先有了一个1000万的数组,意味着ForkJoinSumCalculator至少会分出1000个子任务来。分出大量的小任务通常来讲是一个好的选择,理想的状况下,划分并行任务时,应该让每一个任务都用彻底相同时间完成,让全部的CPU都一样繁忙,但在实际中,每一个子任务所花费的时间可能天差地别,要么是由于划分策略效率低,要么是有不可预知的缘由,好比磁盘访问慢,或是须要和外部服务协调执行。

  分支/合并框架工程用一种称为工做窃取(work stealing)的技术来解决这个问题。在实际应用中,这意味着这些任务差很少被平均分配到ForkJoinPool中的全部线程上。每一个线程都为分配给它的任务保存一个双向链式队列,没完成一个任务,他就会从队列头上取出下一个任务开始执行。基于前面所述的缘由,某个线程可能早早完成了分配给它的任务,也就是它的队列已经空了,而其余的线程还很忙。这时,这个线程没有闲下来,而是随机选了一个别的线程,从队列的尾巴上偷走一个任务。这个过程一直继续下去,直到全部的任务都执行完毕,全部队列都清空。这就是为何划成许多小任务而不是少数几个大任务,这样有助于更好地在工做线程之间平衡负载。

如今应该清楚 流 是如何使用 分支/合并 框架来并行处理它的项目了。本例中咱们明确指定了将数组拆分红多少个任务的逻辑。可是,使用并行流时就不用这么作了,有一种机制来为你拆分流。

 

Spliterator

  Spliterator是Java 8 中加入的另外一个新接口,这个名字表明“可分迭代器”,和Iterator同样,Spliterator也用于遍历数据源中的元素,但它是为了并行执行而设计的。Java 8已经为集合框架中包含了全部数据结构提供了一个默认的Spliterator实现。 集合实现了Spliterator接口,接口提供了一个spliterator方法。这个接口定义了若干方法:

public interface Spliterator<T> {
    
    boolean tryAdvance(Consumer<? super T> action);

    Spliterator<T> trySplit();

    long estimateSize();

    int characteristics();

}

T是Spliterator遍历的元素类型,tryAdvance方法的行为相似于普通的Iterator,由于它会按顺序一个个地使用Spliterator中的元素,而且若是还有其余元素要遍历就返回true。但trySplit是专为Spliterator接口设计的,由于它能够把一些元素划出去分给第二个Spliterator,让它们两个并行处理。Spliterator还可经过estimateSize,让它们两个并行处理。Spliterator还可经过estimateSize方法估计还剩下多少元素要遍历,由于即便不那么确切,能快速算出来是一个值也有助于让拆分均匀一点。

拆分过程

  将Stream拆分红多个部分的算法是一个递归过程,如图所示,第一步是对第一个Spliterator调用trySplit,生成第二个Spliterator。第二步对这两个Spliterator调用trySplit直到它返回null,代表它处理的数据结构不能再分割,第三部,这个递归拆分过程到第四步就终止了,这时全部的Spliterator在调用trySplit时都返回了null。

这个拆分过程也受Spliterator自己特性影响,而特性是经过characteristics方法声明的。

Spliterator的特性

Spliterator接口声明的最后一个抽象方法是characteristics,它返回一个int,表明Spliterator自己特性集的编码。

  ORDERED : 元素有既定的顺序,所以Spliterator在遍历和划分时也遵循这一点

  DISTINCT : 对于任意一对遍历过的元素x和y,x.equals(y) 返回false

  SORTED : 遍历的元素按照一个预约义的顺序排序

  SIZED : 该Spliterator由一个已知大小的源创建,所以estimatedSize()返回的是 准确值

  NONNULL : 保证遍历的元素不会为null

  IMMUTABLE: Spliterator的数据源不能修改,这意味着在遍历时不能添加、删除、修改任何元素

  CONCURRENT : 该Spliterator的数据源能够被其余线程同时修改而无需同步

  SUBSIZED : 该Spliterator和全部从它拆分出来的Spliterator都是SIZED

 

小结:

  1.内部迭代让你能够并行处理一个流,而无需在代码中显示使用和协调不一样的线程。

  2.虽然并行处理一个流很容易,却不能保证程序在全部状况下都运行的更快。所以必定要测量,确保你并无把程序拖的更慢。

  3.像并行流那样昂对一个数据集并行执行操做能够提高性能,特别是要处理的元素数量庞大,或处理单个元素特别耗时时。

  4.尽可能使用原始特化流,来避免装箱和拆箱操做。

  5.分支/合并框架让你得以用递归方式将能够并行的任务拆分红功效的任务,在不一样的线程上执行,而后将各个子任务的结果合并起来生成总体结果。

  6.Spliterator定义了并行流如何拆分它要遍历的数据。

相关文章
相关标签/搜索