在 Java7 以前,若是想要并行处理一个集合,咱们须要如下几步 1. 手动分红几部分 2. 为每部分建立线程 3. 在适当的时候合并。而且还须要关注多个线程之间共享变量的修改问题。而 Java8 为咱们提供了并行流,能够一键开启并行模式。是否是很酷呢?让咱们来看看吧前端
声明:本文首发于博客园,做者:后青春期的Keats;地址:https://www.cnblogs.com/keatsCoder/ 转载请注明,谢谢!java
什么是并行流:并行流就是将一个流的内容分红多个数据块,并用不一样的线程分别处理每一个不一样数据块的流。例若有这么一个需求:算法
有一个 List
List<Apple> appleList = new ArrayList<>(); // 伪装数据是从库里查出来的 for (Apple apple : appleList) { apple.setPrice(5.0 * apple.getWeight() / 1000); }
咱们经过迭代器遍历 list 中的 apple 对象,完成了每一个 apple 价格的计算。而这个算法的时间复杂度是 O(list.size()) 随着 list 大小的增长,耗时也会跟着线性增长。并行流网络
能够大大缩短这个时间。并行流处理该集合的方法以下:多线程
appleList.parallelStream().forEach(apple -> apple.setPrice(5.0 * apple.getWeight() / 1000));
和普通流的区别是这里调用的 parallelStream()
方法。固然也能够经过 stream.parallel() 将普通流转换成并行流。并行流也能经过 sequential() 方法转换为顺序流,但要注意:流的并行和顺序转换不会对流自己作任何实际的变化,仅仅是打了个标记而已。而且在一条流水线上对流进行屡次并行 / 顺序的转换,生效的是最后一次的方法调用app
并行流如此方便,它的线程从那里来呢?有多少个?怎么配置呢?框架
并行流内部使用了默认的 ForkJoinPool 线程池。默认的线程数量就是处理器的核心数,而配置系统核心属性: java.util.concurrent.ForkJoinPool.common.parallelism 能够改变线程池大小。不过该值是全局变量。改变他会影响全部并行流。目前还没法为每一个流配置专属的线程数。通常来讲采用处理器核心数是不错的选择ide
为了更容易的测试性能,咱们在每次计算完苹果价格后,让线程睡 1s,表示在这期间执行了其余 IO 相关的操做,并输出程序执行耗时,顺序执行的耗时:函数
public static void main(String[] args) throws InterruptedException { List<Apple> appleList = initAppleList(); Date begin = new Date(); for (Apple apple : appleList) { apple.setPrice(5.0 * apple.getWeight() / 1000); Thread.sleep(1000); } Date end = new Date(); log.info("苹果数量:{}个, 耗时:{}s", appleList.size(), (end.getTime() - begin.getTime()) /1000); }
并行版本
List<Apple> appleList = initAppleList(); Date begin = new Date(); appleList.parallelStream().forEach(apple -> { apple.setPrice(5.0 * apple.getWeight() / 1000); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } ); Date end = new Date(); log.info("苹果数量:{}个, 耗时:{}s", appleList.size(), (end.getTime() - begin.getTime()) /1000);
耗时状况
跟咱们的预测一致,个人电脑是 四核I5 处理器,开启并行后四个处理器每人执行一个线程,最后 1s 完成了任务!
经过上面的测试,有的人会轻易获得一个结论:并行流很快,咱们能够彻底放弃 foreach/fori/iter 外部迭代,使用 Stream 提供的内部迭代来实现了。事实真的是这样吗?并行流真的如此完美吗?答案固然是否认的。你们能够复制下面的代码,在本身的电脑上测试。测试完后能够发现,并行流并不老是最快的处理方式。
对于 iterate 方法来处理的前 n 个数字来讲,无论并行与否,它老是慢于循环的,非并行版本能够理解为流化操做没有循环更偏向底层致使的慢。可并行版本是为何慢呢?这里有两个须要注意的点:
iterate 生成的是装箱的对象,必须拆箱成数字才能求和
咱们很难把 iterate 分红多个独立的块来并行执行
这个问题颇有意思,咱们必须意识到某些流操做比其余操做更容易并行化。对于 iterate 来讲,每次应用这个函数都要依赖于前一次应用的结果。所以在这种状况下,咱们不只不能有效的将流划分红小块处理。反而还由于并行化再次增长了开支。
而对于 LongStream.rangeClosed() 方法来讲,就不存在 iterate 的第两个痛点了。它生成的是基本类型的值,不用拆装箱操做,另外它能够直接将要生成的数字 1 - n 拆分红 1 - n/4, 1n/4 - 2n/4, ... 3n/4 - n 这样四部分。所以并行状态下的 rangeClosed() 是快于 for 循环外部迭代的
package lambdasinaction.chap7; import java.util.stream.*; public class ParallelStreams { public static long iterativeSum(long n) { long result = 0; for (long i = 0; i <= n; i++) { result += i; } return result; } public static long sequentialSum(long n) { return Stream.iterate(1L, i -> i + 1).limit(n).reduce(Long::sum).get(); } public static long parallelSum(long n) { return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(Long::sum).get(); } public static long rangedSum(long n) { return LongStream.rangeClosed(1, n).reduce(Long::sum).getAsLong(); } public static long parallelRangedSum(long n) { return LongStream.rangeClosed(1, n).parallel().reduce(Long::sum).getAsLong(); } }
package lambdasinaction.chap7; import java.util.concurrent.*; import java.util.function.*; public class ParallelStreamsHarness { public static final ForkJoinPool FORK_JOIN_POOL = new ForkJoinPool(); public static void main(String[] args) { System.out.println("Iterative Sum done in: " + measurePerf(ParallelStreams::iterativeSum, 10_000_000L) + " msecs"); System.out.println("Sequential Sum done in: " + measurePerf(ParallelStreams::sequentialSum, 10_000_000L) + " msecs"); System.out.println("Parallel forkJoinSum done in: " + measurePerf(ParallelStreams::parallelSum, 10_000_000L) + " msecs" ); System.out.println("Range forkJoinSum done in: " + measurePerf(ParallelStreams::rangedSum, 10_000_000L) + " msecs"); System.out.println("Parallel range forkJoinSum done in: " + measurePerf(ParallelStreams::parallelRangedSum, 10_000_000L) + " msecs" ); } public static <T, R> long measurePerf(Function<T, R> f, T input) { long fastest = Long.MAX_VALUE; for (int i = 0; i < 10; i++) { long start = System.nanoTime(); R result = f.apply(input); long duration = (System.nanoTime() - start) / 1_000_000; System.out.println("Result: " + result); if (duration < fastest) fastest = duration; } return fastest; } }
并行流虽然轻易的实现了多线程,可是仍未解决多线程中共享变量的修改问题。下面代码中存在共享变量 total,分别使用顺序流和并行流计算前n个天然数的和
public static long sideEffectSum(long n) { Accumulator accumulator = new Accumulator(); LongStream.rangeClosed(1, n).forEach(accumulator::add); return accumulator.total; } public static long sideEffectParallelSum(long n) { Accumulator accumulator = new Accumulator(); LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add); return accumulator.total; } public static class Accumulator { private long total = 0; public void add(long value) { total += value; } }
顺序执行每次输出的结果都是:50000005000000,而并行执行的结果却五花八门了。这是由于每次访问 totle 都会存在数据竞争,关于数据竞争的缘由,你们能够看看关于 volatile 的博客。所以当代码中存在修改共享变量的操做时,是不建议使用并行流的。
在并行流的使用上有下面几点须要注意:
尽可能使用 LongStream / IntStream / DoubleStream 等原始数据流代替 Stream 来处理数字,以免频繁拆装箱带来的额外开销
要考虑流的操做流水线的总计算成本,假设 N 是要操做的任务总数,Q 是每次操做的时间。N * Q 就是操做的总时间,Q 值越大就意味着使用并行流带来收益的可能性越大
例如:前端传来几种类型的资源,须要存储到数据库。每种资源对应不一样的表。咱们能够视做类型数为 N,存储数据库的网络耗时 + 插入操做耗时为 Q。通常状况下网络耗时都是比较大的。所以该操做就比较适合并行处理。固然当类型数目大于核心数时,该操做的性能提高就会打必定的折扣了。更好的优化方法在往后的博客会为你们奉上
对于较少的数据量,不建议使用并行流
容易拆分红块的流数据,建议使用并行流
如下是一些常见的集合框架对应流的可拆分性能表
源 | 可拆分性 |
---|---|
ArrayList | 极佳 |
LinkedList | 差 |
IntStream.range | 极佳 |
Stream.iterate | 差 |
HashSet | 好 |
TreeSet | 好 |
码字不易,若是你以为读完之后有收获,不妨点个推荐让更多的人看到吧!