java8新特性(六):Stream多线程并行数据处理

转:http://blog.csdn.net/sunjin9418/article/details/53143588java

将一个顺序执行的流转变成一个并发的流只要调用 parallel()方法
public static long parallelSum( long n){
     return Stream.iterate(1L, i -> i +1).limit(n).parallel().reduce(0L,Long::sum);
}
并行流就是一个把内容分红多个数据块,并用不不一样的线程分别处理每一个数据块的流。最后合并每一个数据块的计算结果。
将一个并发流转成顺序的流只要调用sequential()方法
stream.parallel() .filter(...) .sequential() .map(...) .parallel() .reduce();
 
这两个方法能够屡次调用, 只有最后一个调用决定这个流是顺序的仍是并发的。
 
并发流使用的默认线程数等于你机器的处理器核心数。
 
经过这个方法能够修改这个值,这是全局属性。
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12");
 
并不是使用多线程并行流处理数据的性能必定高于单线程顺序流的性能,由于性能受到多种因素的影响。
如何高效使用并发流的一些建议:
1. 若是不肯定, 就本身测试。
2. 尽可能使用基本类型的流  IntStream, LongStream, and DoubleStream
3. 有些操做使用并发流的性能会比顺序流的性能更差,好比limit,findFirst , 依赖元素顺序的操做在并发流中是极其消耗性能的 。findAny的性能就会好不少,应为不依赖顺序。
4. 考虑流中计算的性能(Q)和操做的性能(N)的对比, Q表示单个处理所需的时间, N表示须要处理的数量,若是Q的值越大, 使用并发流的性能就会越高。
5. 数据量不大时使用并发流,性能得不到提高。
6.考虑数据结构:并发流须要对数据进行分解,不一样的数据结构被分解的性能时不同的。
 
流的数据源和可分解性
可分解性
ArrayList 很是好
LinkedList
IntStream.range 很是好
Stream.iterate
HashSet
TreeSet
 
 
7. 流的特性以及中间操做对流的修改都会对数据对分解性能形成影响。 好比固定大小的流在任务分解的时候就能够平均分配,可是若是有filter操做,那么流就不能预先知道在这个操做后还会剩余多少元素。
 
8. 考虑最终操做的性能:若是最终操做在合并并发流的计算结果时的性能消耗太大,那么使用并发流提高的性能就会得不偿失。
 
9.须要理解并发流实现机制:
 
fork/join 框架
 
fork/join框架是jdk1.7引入的,java8的stream多线程并不是流的正是以这个框架为基础的,因此想要深刻理解并发流就要学习fork/join框架。
fork/join框架的目的是以递归方式将能够并行的任务拆分红更小的任务,而后将每一个子任务的结果合并起来生成总体结果。它是ExecutorService接口的一个实现,它把子任务分配线程池(ForkJoinPool)中的工做线程。要把任务提交到这个线程池,必须建立RecursiveTask<R>的一个子类,若是任务不返回结果则是RecursiveAction的子类。
 
fork/join框架流程示意图:
 
 
废话很少说,上代码:
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

/**
 * Created by sunjin on 2016/7/5.
 * 继承RecursiveTask来建立能够用于分支/合并的框架任务
 */
public class ForkJoinSumCalculator  extends RecursiveTask<Long> {
     //要求和的数组
     private final long[]  numbers;
     //子任务处理的数组开始和终止的位置
     private final int  start;
     private final int  end;
     //不在将任务分解成子任务的阀值大小
     public static final int  THRESHOLD = 10000;

     //用于建立组任务的构造函数
     public ForkJoinSumCalculator( long[] numbers){
         this(numbers, 0, numbers. length);
    }

     //用于递归建立子任务的构造函数
     public ForkJoinSumCalculator( long[] numbers, int start, int end){
         this. numbers = numbers;
         this. start = start;
         this. end = end;
    }

     //重写接口的方法
    @Override
     protected Long compute() {
         //当前任务负责求和的部分的大小
         int length =  end start;
         //若是小于等于阀值就顺序执行计算结果
         if(length <=  THRESHOLD){
             return computeSequentially();
        }
         //建立子任务来为数组的前一半求和
        ForkJoinSumCalculator leftTask =  new ForkJoinSumCalculator( numbersstartstart + length/2);
         //将子任务拆分出去,丢到ForkJoinPool线程池异步执行。
        leftTask.fork();
         //建立子任务来为数组的后一半求和
        ForkJoinSumCalculator rightTask =  new ForkJoinSumCalculator( numbersstart + length/2,  end);
         //第二个任务直接使用当前线程计算而再也不开启新的线程。
         long rightResult = rightTask.compute();
         //读取第一个子任务的结果,若是没有完成则等待。
         long leftResult = leftTask.join();
         //合并两个子任务的计算结果
         return rightResult + leftResult;
    }

     //顺序执行计算的简单算法
     private long computeSequentially(){
         long sum = 0;
         for( int i = start; i<  end; i++){
            sum +=  numbers[i];
        }
         return sum;
    }
     //提供给外部使用的入口方法
     public static long forkJoinSum( long n) {
         long[] numbers = LongStream.rangeClosed(1, n).toArray();
        ForkJoinTask<Long> task =  new ForkJoinSumCalculator(numbers);
         return new ForkJoinPool().invoke(task);
    }
}
注意事项:
1. 调用join 方法要等到调用这个方法的线程的本身的任务完成以后。
2. 不要直接去调用ForkJoinPool的invoke方法 ,只须要调用RecursiveTask的fork或者compute。
3. 拆解任务时只须要调用一次fork执行其中一个子任务, 另外一个子任务直接利用当前线程计算。应为fork方法只是在ForkJoinPool中计划一个任务。
4.任务拆分的粒度不宜太细,不否得不偿失。
 
 
工做盗取
因为各类因素,即使任务拆分是平均的,也不能保证全部子任务能同时执行结束, 大部分状况是某些子任务已经结束, 其余子任务还有不少, 在这个时候就会有不少资源空闲, 因此fork/join框架经过工做盗取机制来保证资源利用最大化, 让空闲的线程去偷取正在忙碌的线程的任务。
在没有任务线程中的任务存在一个队列当中, 线程每次会从头部获取一个任务执行,执行完了再从queue的头部获取一个任务,直到队列中的全部任务执行完,这个线程偷取别的线程队列中的任务时会从队列到尾部获取任务,而且执行,直到全部任务执行结束。
从这个角度分析,任务的粒度越小, 资源利用越充分。
 
 
工做盗取示意图
page222image11664
 
 
可拆分迭代器Spliterator
 
它和Iterator同样也是用于遍历数据源中的元素,可是他是为并行执行而设计的。 java8 全部数据结构都实现了 这个接口, 通常状况不须要本身写实现代码。可是了解它的实现方式会让你对并行流的工做原理有更深的了解。(未完待续)
相关文章
相关标签/搜索