在Java7以前想要并行处理大量数据是很困难的,首先把数据拆分红不少个部分,而后把这这些子部分放入到每一个线程中去执行计算逻辑,最后在把每一个线程返回的计算结果进行合并操做;在Java7中提供了一个处理大数据的fork/join框架,屏蔽掉了线程之间交互的处理,更加专一于数据的处理。java
Fork/Join框架采用的是思想就是分而治之,把大的任务拆分红小的任务,而后放入到独立的线程中去计算,同时为了最大限度的利用多核CPU,采用了一个种工做窃取
的算法来运行任务,也就是说当某个线程处理完本身工做队列中的任务后,尝试当其余线程的工做队列中窃取一个任务来执行,直到全部任务处理完毕。因此为了减小线程之间的竞争,一般会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行;在百度找了一张图算法
RecursiveTask
《2020最新Java基础精讲视频教程和学习路线!》
使用Fork/Join框架首先须要建立本身的任务,须要继承RecursiveTask
,实现抽象方法安全
protected abstract V compute();
实现类须要在该方法中实现任务的拆分,计算,合并;伪代码能够表示成这样:网络
if(任务已经不可拆分){ return 顺序计算结果; } else { 1.任务拆分红两个子任务 2.递归调用本方法,拆分子任务 3.等待子任务执行完成 4.合并子任务的结果 }
任务:完成对一亿个天然数求和框架
咱们先使用串行的方式实现,代码以下:ide
long result = LongStream.rangeClosed(1, 100000000) .reduce(0, Long::sum); System.out.println("result:" + result); 复制代码
使用Fork/Join框架实现,代码以下:post
public class SumRecursiveTask extends RecursiveTask<Long> { private long[] numbers; private int start; private int end; public SumRecursiveTask(long[] numbers) { this.numbers = numbers; this.start = 0; this.end = numbers.length; } public SumRecursiveTask(long[] numbers, int start, int end) { this.numbers = numbers; this.start = start; this.end = end; } @Override protected Long compute() { int length = end - start; if (length < 20000) { //小于20000个就不在进行拆分 return sum(); } SumRecursiveTask leftTask = new SumRecursiveTask(numbers, start, start + length / 2); //进行任务拆分 SumRecursiveTask rightTask = new SumRecursiveTask(numbers, start + (length / 2), end); //进行任务拆分 leftTask.fork(); //把该子任务交友ForkJoinPoll线程池去执行 rightTask.fork(); //把该子任务交友ForkJoinPoll线程池去执行 return leftTask.join() + rightTask.join(); //把子任务的结果相加 } private long sum() { int sum = 0; for (int i = start; i < end; i++) { sum += numbers[i]; } return sum; } public static void main(String[] args) { long[] numbers = LongStream.rangeClosed(1, 100000000).toArray(); Long result = new ForkJoinPool().invoke(new SumRecursiveTask(numbers)); System.out.println("result:" +result); } } 复制代码
Fork/Join默认的线程数量就是你的处理器数量,这个值是由Runtime.getRuntime().available- Processors()
获得的。 可是你能够经过系统属性java.util.concurrent.ForkJoinPool.common. parallelism
来改变线程池大小,以下所示:System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");
这是一个全局设置,所以它将影响代码中全部的并行流。目前还没法专为某个 并行流指定这个值。由于会影响到全部的并行流,因此在任务中经历避免网络/IO操做,不然可能会拖慢其余并行流的运行速度
以上咱们说到的都是在Java7中使用并行流的操做,Java8并无止步于此,为咱们提供更加便利的方式,那就是parallelStream
;parallelStream
底层仍是经过Fork/Join框架来实现的。学习
1.串行流转化成并行流测试
LongStream.rangeClosed(1,1000) .parallel() .forEach(System.out::println); 复制代码
2.直接生成并行流大数据
List<Integer> values = new ArrayList<>(); for (int i = 0; i < 10000; i++) { values.add(i); } values.parallelStream() .forEach(System.out::println); 复制代码
咱们使用parallelStream
来实现上面的累加例子看看效果,代码以下:
public static void main(String[] args) { Summer summer = new Summer(); LongStream.rangeClosed(1, 100000000) .parallel() .forEach(summer::add); System.out.println("result:" + summer.sum); } static class Summer { public long sum = 0; public void add(long value) { sum += value; } } 复制代码
运行结果以下:
运行以后,咱们发现运行的结果不正确,而且每次运行的结果都不同,这是为何呢? 这里其实就是错用parallelStream
常见的状况,parallelStream
是非线程安全的,在这个里面中使用多个线程去修改了共享变量sum, 执行了sum += value
操做,这个操做自己是非原子性的,因此在使用并行流时应该避免去修改共享变量。
修改上面的例子,正确使用parallelStream
来实现,代码以下:
long result = LongStream.rangeClosed(1, 100000000) .parallel() .reduce(0, Long::sum); System.out.println("result:" + result); 复制代码
在前面咱们已经说过了fork/join的操做流程是:拆子部分,计算,合并结果;由于parallelStream
底层使用的也是fork/join框架,因此这些步骤也是须要作的,可是从上面的代码,咱们看到Long::sum
作了计算,reduce
作了合并结果,咱们并无去作任务的拆分,因此这个过程确定是parallelStream
已经帮咱们实现了,这个时候就必须的说说Spliterator
Spliterator
是Java8加入的新接口,是为了并行执行任务而设计的。
public interface Spliterator<T> { boolean tryAdvance(Consumer<? super T> action); Spliterator<T> trySplit(); long estimateSize(); int characteristics(); } 复制代码
tryAdvance: 遍历全部的元素,若是还有能够遍历的就返回ture,不然返回false
trySplit: 对全部的元素进行拆分红小的子部分,若是已经不能拆分就返回null
estimateSize: 当前拆分里面还剩余多少个元素
characteristics: 返回当前Spliterator特性集的编码
感谢你们能够耐心地读到这里。