JAVA8中引入了lamda表达式和Stream接口。其丰富的API及强大的表达能力极大的简化代码,提高了效率,同时还经过parallelStream提供并发操做的支持,本文探讨parallelStream方法的使用。html
首先看下java doc中对parallelStream的定义。java
A sequence of elements supporting sequential and parallel aggregate operations. ... Stream pipelines may execute either sequentially or in parallel. This execution mode is a property of the stream.
Streams are created with an initial choice of sequential or parallel execution. (For example, Collection.stream()
creates a sequential stream, and Collection.parallelStream() creates a parallel one.)
This choice of execution mode may be modified by the BaseStream.sequential() or BaseStream.parallel() methods,
and may be queried with the BaseStream.isParallel() method.
既然能够并行的执行,废话很少说,先看一个例子。api
class Person { int id; String name; String sex; float height; public Person(int id, String name, String sex, float height) { this.id = id; this.name = name; this.sex = sex; this.height = height; } } /** * 构造数据 * * @return */ public List<Person> constructPersons() { List<Person> persons = new ArrayList<Person>(); for (int i = 0; i < 5; i++) { Person p = new Person(i, "name" + i, "sex" + i, i); persons.add(p); } return persons; } /** * for * * @param persons */ public void doFor(List<Person> persons) { long start = System.currentTimeMillis(); for (Person p : persons) { try { Thread.sleep(1000); } catch (InterruptedException e) { } System.out.println(p.name); } long end = System.currentTimeMillis(); System.out.println("doFor cost:" + (end - start)); } /** * 顺序流 * * @param persons */ public void doStream(List<Person> persons) { long start = System.currentTimeMillis(); persons.stream().forEach(x -> { try { Thread.sleep(1000); } catch (InterruptedException e) { } System.out.println(x.name); }); long end = System.currentTimeMillis(); System.out.println("doStream cost:" + (end - start)); } /** * 并行流 * * @param persons */ public void doParallelStream(List<Person> persons) { long start = System.currentTimeMillis(); persons.parallelStream().forEach(x -> { try { Thread.sleep(1000); } catch (InterruptedException e) { } System.out.println(x.name); }); long end = System.currentTimeMillis(); System.out.println("doParallelStream cost:" + (end - start)); }
执行结果:数组
name0 name1 name2 name3 name4 doFor cost:5021 name0 name1 name2 name3 name4 doStream cost:5076 name4 name0 name2 name3 name1 doParallelStream cost:1010
代码上 stream 和 parallelStream 语法差别较小,从执行结果来看,stream顺序输出,而parallelStream 无序输出;parallelStream 执行耗时是 stream 的五分之一。
能够看到在当前测试场景下,parallelStream 得到的相对较好的执行性能,那parallelStream背后究竟是什么呢?
要深刻了解parallelStream,首先要弄明白ForkJoin框架和ForkJoinPool。ForkJoin框架是java7中提供的并行执行框架,他的策略是分而治之。说白了,就是把一个大的任务切分红不少小的子任务,子任务执行完毕后,再把结果合并起来。安全
顺便说下ForkJoin框架和ThreadPoolExecutor的区别,ForkJoin框架可使用数量有限的线程数,执行大量任务,而且这些任务之间是有父子依赖的,必须是子任务执行完成后,父任务才能执行。ThreadPoolExecutor 显然是没法支持这种场景的。而ForkJoin框架,可让其中的线程建立新的任务,并挂起当前的任务,任务以及子任务会保留在一个内部队列中,此时线程就可以从队列中选择任务顺序执行。
Java 8为ForkJoinPool添加了一个通用线程池,这个线程池用来处理那些没有被显式提交到任何线程池的任务。它是ForkJoinPool类型上的一个静态元素,它拥有的默认线程数量等于运行计算机上的处理器数量。当调用Arrays类上添加的新方法时,自动并行化就会发生。好比用来排序一个数组的并行快速排序,用来对一个数组中的元素进行并行遍历。自动并行化也被运用在Java 8新添加的Stream API中。
上面的代码中,forEach方法会为每一个元素的操做建立一个任务,该任务会被前文中提到的ForkJoinPool中的通用线程池处理。以上的并行计算逻辑固然也可使用ThreadPoolExecutor完成,可是就代码的可读性和代码量而言,使用ForkJoinPool明显更胜一筹。
默认线程池的数量就是处理器的数量,特殊场景下可使用系统属性:-Djava.util.concurrent.ForkJoinPool.common.parallelism={N} 调整。
对上面例子作下调整,sleep时间变为2ms,并发
Thread.sleep(2);
执行结果以下:oracle
doFor cost:12 ======================= doParallelStream cost:62 ======================= doStream cost:13
doParallelStream耗时最多,可见并非并行执行就是性能最好的,要根据具体的应用场景测试分析。这个例子中,每一个子任务执行时间较短,而线程切换消耗了大量时间。
说到了并发,不得不提线程安全。先看一个例子:框架
public void doThreadUnSafe() { List<Integer> listFor = new ArrayList<>(); List<Integer> listParallel = new ArrayList<>(); IntStream.range(0, 1000).forEach(listFor::add); IntStream.range(0, 1000).parallel().forEach(listParallel::add); System.out.println("listFor size :" + listFor.size()); System.out.println("listParallel size :" + listParallel.size()); }
输出结果:性能
listFor size :1000 listParallel size :949
显而易见,stream.parallel.forEach()中执行的操做并不是线程安全。若是须要线程安全,能够把集合转换为同步集合,即:Collections.synchronizedList(new ArrayList<>())。
总结下来以下:测试
参考:
https://docs.oracle.com/javase/tutorial/collections/streams/parallelism.html
https://docs.oracle.com/javase/8/docs/api/index.html
https://blog.csdn.net/u011001723/article/details/52794455/