上一节咱们介绍了 Stream 相关的 API,map,filter 等方法都是中间操做。这一节咱们介绍终止操做。终止操做(terminal operation)的做用是产生结果。java
forEach 和 forEachOrderedsql
forEach 和 forEachOrdered 都是对流中的每个元素执行对应的操做。须要传入 Consumer 类型的函数式接口(一个输入参数,没有返回值)。编程
若是咱们使用串行流时这两个方法并无什么区别,可是若是使用并行流(后面会有专门的章节去将并行流)的时候 forEach 并不能保证元素的处理顺序,而 forEachOrdered 是严格按照顺序去执行的。数组
void forEach(Consumer<? super T> action); void forEachOrdered(Consumer<? super T> action);
这两个方法没有返回值,因此通常使用这两个方法进行打印等操做。安全
Arrays.asList(4,2,6,8,1,9,3).stream().forEach(System.out::println);
toArray多线程
toArray 方法的做用是将 Stream 流转换为数组。toArray 方法是一个终止操做。toArray 方法有两个实现,第一个实现不须要传入参数,返回 Object 类型的数组。第二个实现咱们能够返回特定类型的数组,须要传入 IntFunction 类型的函数式接口。并发
Object[] toArray(); <A> A[] toArray(IntFunction<A[]> generator);
例子:app
因为 toArray() 方法返回的是 Object 类型的数组,因此使用起来不若有参数的 toArray 方法灵活。框架
Object[] a = Arrays.asList(4,2,6,8,1,9,3).stream().filter(i->i>3).toArray(); for(Object object:a){ System.out.println(object); }
若是咱们想返回特定类型的数组,咱们就可使用带参数的 toArray 方法。IntFunction 中须要传入一个 int 类型的数组做为数组的长度,返回的 A[] 是咱们须要返回的数组类型。还会将流中的元素放入返回的数组中。ide
例子:
//Lambda 表达式 Integer[]res1 = Arrays.asList(4,2,6,8,1,9,3).stream().filter(i->i>3).toArray(i->new Integer[i]); //方法引用 Integer[]res2 = Arrays.asList(4,2,6,8,1,9,3).stream().filter(i->i>3).toArray(Integer[]::new);
咱们在来看看 JDK 中给咱们的示例,将过滤后的元素放入到 Person 数组中:
Person[] men = people.stream() .filter(p -> p.getGender() == MALE) .toArray(Person[]::new);
reduce
聚合操做。就是将流中的元素汇集到一块儿计算出一个结果,好比求和等操做。
T reduce(T identity, BinaryOperator<T> accumulator); Optional<T> reduce(BinaryOperator<T> accumulator); <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner);
来看第一个方法的例子:
这个方法须要咱们传入一个初始值,而后经过函数式接口 BinaryOperator 的实例计算出流中数据的和。
Stream<Integer> stream = Arrays.stream(new Integer[]{1,2,3,4,5,6,7,8,9}); Integer result = stream.reduce(0,(i,j)->i+j); System.out.println(result);
把这个方法改为 for 循环的方式方便你们理解:
其实直观的看函数式编程的方式更加易于理解,stream 流至关于 for 循环的操做。
T res = identity; for(int i=0;i<n;i++){ res = accumulator.apply(res,a[i]); } return res;
若是在上面的方法,咱们不给初始值,返回的就是 Optional 类型。也就是 reduce 方法的第二个实现。
Stream<Integer> str = Arrays.stream(new Integer[]{1,2,3,4,5,6,7,8,9}); Optional<Integer> re = str.reduce((i, j)->{ return i < j?i:j; }); re.ifPresent(System.out::println); //上面的方法也能够改为方法引用的方式来实现 //reduce(Integer::min)
第三个函数实现等到咱们讲并行流的时候再讲。
Collect
收集器, 做用是将输入元素累积到一个可变的结果容器中(到 StringBuilder 中,到 List中,获取汇总信息等等)。它会将全部的元素处理完毕后,将积累的结果转换为一个最终的表示,它支持串行与并行两种方式执行。
咱们再来看第一个方法实现:
//参数的做用和上面相同 <R> R collect(Supplier<R> supplier, //提供一个 Container 容器 BiConsumer<R, ? super T> accumulator, //累加器 BiConsumer<R, R> combiner); //并行聚合
例子:
咱们须要接收一个字符串流而后返回 List 集合。
Supplier
BiConsumer<R, ? super T> accumulator,这个函数式接口的做用是将流中的元素放入到前面建立的容器中,可使用 (thelist, item)->thelist.add(item) 或者 ArrayList::add 方法来实现 BiConsumer 函数式接口。
BiConsumer<R, R> combiner,是将全部并行操做的结果聚集到一块儿,就是当全部容器的结果聚集到一块儿就能够了。咱们可使用 (theList1,theList2)-> theList1.addAll(thelist2),或者 ArrayList::addAll 来实现 BiConsumer 函数式接口。
//方法引用方式 List<String> asList = Arrays.asList("hello","world","java8","lambda").stream().collect(ArrayList::new, ArrayList::add, ArrayList::addAll); //Lambda 表达式方式 List<String> asList1 = Arrays.asList("hello","world","java8","lambda").stream().collect(()->new ArrayList<>(), (thelist, item)-> thelist.add(item), (theList1,theList2)-> theList1.addAll(theList2));
下面的例子会接收一个字符串的流而后组成一个字符串:
Supplier
BiConsumer<R, ? super T> accumulator,这个函数式接口的做用是将流中的元素放入到前面建立的容器中,咱们可使用 StringBuilder 的 append 方法来将流中的字符串数据追加到前一步建立的 StringBuilder 中。
BiConsumer<R, R> combiner,是将全部并行操做的结果聚集到一块儿。咱们仍是使用 append 方法将全部的 StringBuilder 聚集到一块儿。
StringBuilder concat = Arrays.asList("hello","world","java8","lambda").stream().collect(StringBuilder::new, StringBuilder::append,StringBuilder::append); StringBuilder concat1 = Arrays.asList("hello","world","java8","lambda").stream().collect(()->new StringBuilder(),(stringBuilder, s) -> stringBuilder.append(s),(stringBuilder1, stringBuilder2) -> stringBuilder1.append(stringBuilder2));
先看第二个方法实现:
<R, A> R collect(Collector<? super T, A, R> collector);
Collector<T,A,R> 泛型接口,可变的汇聚操做,
Collector 并非一个函数式接口,咱们看看它都定义了哪些抽象方法,以及这些抽象方法的做用:
//T 是 Stream 里提供的 input 类型 //A 是累加器,结果会被放入累加器中 //R 返回值,不必定全部状况都返回 Collection public interface Collector<T,A,R>{ Supplier<A> supplier(); //提供一个 Container BiConsumer<A,T> accumulator(); //累加器 Function<A,R> finisher(); //返回的结果 BinaryOperator<A> combiner(); //并行聚合,并行操做时,好比有四个线程同时去执行,那么就会生成4个部分结果,使用combiner 聚合到一块儿 Set<Characteristics> characteristics(); //Collector的特征 // Concurrent, unordered,identity_finish }
Collector 的实现类须要实现上面的方法而后将结果聚集到一块儿。Collectors 自己提供了关于 Collector 的常见汇聚实现,好比汇聚到 List,Collection 等等。Collectors 自己实际是一个工厂。
先来看一些使用的例子:
toCollection 能够汇聚成一些常规的集合,好比 ArrayList,LinkedList,TreeSet 等等。
Stream<String> stream = Stream.of("hello","world","helloworld"); List<String> list = stream.collect(Collectors.toCollection(ArrayList::new)); //若是想要换成 LinkedList 或者 TreeSet,只须要换掉 toCollection 的参数就能够了
固然也能够直接使用 toList(将元素汇聚到 ArrayList 中),toSet(将元素汇聚到 HashSet 中)中。
List<String> list1 = Arrays.asList("hello","world","helloworld","test"); List<String> r = list1.stream().map(String::toUpperCase).collect(Collectors.toList()); Set<String> s = list1.stream().map(String::toUpperCase).collect(Collectors.toSet());
咱们经过 toList 的源码在了解一下 JDK 的实现者们如何实现 Collector 接口的。
Supplier 接口的实现直接返回一个 new ArrayList,累加器的实现使用 ArrayList 接口的 add 方法,聚合操做则是使用 addAll 方法。
public static <T> Collector<T, ?, List<T>> toList() { return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add, (left, right) -> { left.addAll(right); return left; }, CH_ID); }
咱们再来看看 toSet 方法的源码。
Supplier 接口的实现直接返回一个 new HashSet,累加器的实现使用 HashSet 的 add 方法,聚合操做一样使用 addAll 方法。
public static <T> Collector<T, ?, Set<T>> toSet() { return new CollectorImpl<>((Supplier<Set<T>>) HashSet::new, Set::add, (left, right) -> { left.addAll(right); return left; }, CH_UNORDERED_ID); }
咱们已经基本了解了 Collectors 中聚合的方法,相信咱们在实际工做中也能根据本身的须要实现一个本身的聚合方法。
咱们还能够在 collect 方法执行以后在进行一些操做,好比将结果的集合变为一个不可变的集合。
List<String> t = list1.stream().collect(Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList));
collectors 还为提供了许多求值或者计算的方法,好比,最大值(maxBy),最小值(minBy),个数(counting),求和(summingInt,summingLong,summingDouble)等等,由于比较简单就不一一介绍了。
Collectors 的分组和分区
分组,至关于 sql 的 groupBy。好比咱们将 List
之前的编程思想:
流的方式:
Map<String,List<Student>> map = students.stream().collect(Collectors.groupingBy(Student::getName));
若是咱们想返回姓名和姓名相同的 Student 的个数该怎么办哪?能够在 groupingBy 的第二个参数放入咱们须要放入每一个组内的数据。
//返回 name,count(*) Map<String,Long> map =students.stream().collect(Collectors.groupingBy(Student::getName,Collectors.counting()));
若是咱们想按照姓名分组,获取相同姓名的同窗的平均分数改怎么办那?
Map<String,Double> map = students.stream().collect(Collectors.groupingBy(Student::getName,Collectors.avaeragingDouble(Student::getSocre));
分区,partition by,能够认为分组的特殊状况。只会有两组。至关于 True 和 False 两个组。
咱们把学生分区,大于 90 分的是一个分区,小于 90 分的是一个分区。
//大于 90 的一个分区,小于 90 的一个分区 Map<Boolean,List<Student>> s = students.stream().collect(Collectors.partitioningBy(student->student.getScore()>=90));
allMatch/noneMatch/anyMatch
这三个方法都须要传入 Predicate 类型的函数式接口,allMatch 只有全部元素都符合咱们在 Predicate 实例中设置的条件则返回 true,不然返回 false。anyMatch 则是任意一个符合就返回 true,都不符合则返回 false。noneMatch 是全部都不符合则返回 true,不然返回 false。
findAny/findFirst
findAny 返回流中的某一个元素,findFirst 返回流中第一个元素。两个函数的返回值都是 Optional 类型的,若是找不到元素,则返回一个包含空值的 Optioanl 对象。
咱们常用 Collection 接口下的 stream() 方法来将咱们须要操做的集合转化为流。其实 Collection 接口还有另外一个方法来生成流,这个方法用于生成并行流,parallelStream()。所谓并行流就是用多线程的方式对流中的数据进行操做。
并行流带来的好处固然是加快处理数据的速度,可是使用并行流并不老是很快,有时反而很是慢。并且使用 parallelStream 还要注意线程安全问题。这两个问题咱们稍后给你们讲解,咱们先来看看 parallelStream 的底层原理。
底层原理
咱们经过 parallelStream 的执行来看看后台到底有哪些线程。咱们经过 sleep 方法让它执行的时间长一点,方便咱们查看。
public static void main(String[] args) { List<String> lists = Arrays.asList("hello","world","java8","fork","join","string","main"); lists.parallelStream().forEach(s->{ try { Thread.sleep(100000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(s); }); }
使用 jconsole 命令,选择咱们正在执行的程序。
注意后台中有三个 ForkJoinPool 名字的线程,这说明 parallelStream 使用了 fork/join 框架来执行多线程任务。
最后来看上面那段代码的执行结果,咱们发现使用 parallelStream 以后 forEach 方法中的元素执行顺序出现了问题。
main hello join world string java8 fork
Fork/Join
为了不有的同窗对于这个框架不熟悉,咱们来简单介绍一下这个框架的思想和使用方法。
Fork/Join 是 JAVA7 中引入的并行操做框架,注意这里并非并发而是并行。并发指的是在系统中有多个线程运行去执行任务,它们同时等待一个 CPU 分配时间片去执行,某一时刻只有一个线程执行任务。而并行在这里指的是利用多核 CPU,将子任务划分到不一样 CPU 中,达到同时执行的效果,某一时刻可能有互相不影响多个线程同时执行子任务。
Fork/Join 从名字上来看就是采用分而治之的思想,将任务 fork 成多个子任务,执行完毕后在将结果 join 到一块儿。
Fork/Join 框架有两个重要的类,ForkJoinPool,继承了 ExecutorService 和线程池同一个父类,至关于一个特殊的线程池,里面线程的数量通常为 CPU 的核数。线程池里面的 Task 类为 ForkJoinTask,咱们通常使用它的子类来建立 task,RecursiveAction 适用于没有返回值的 task,RecursiveTask 适用于有返回值的 task。
线程安全
经过上面两个章节咱们已经了解了 parallelStream 的建立,底层原理了。parallelStream 的使用也和 stream 方法同样,只不过 parallelStream 使用并行的方式处理流中的数据,并行带来的好处是可以加快数据处理的速度,可是此时咱们必定要注意线程安全的问题,就如同上面的 forEach 方法同样,使用 parallelStream 后就出现了数据处理顺序的问题。
在使用并行流时,有些方法是很难保持稳定性的,而且维持稳定性的成本很高,若是须要严格的顺序或者高性能时就不要使用 parallelStream,使用这些方法时咱们仍是推荐你们使用串行流。好比 distinct,limit,skip。
有些方法须要咱们本身提供同步的机制,好比 forEach。有些不须要额外同步的方法,reduce,collect,咱们也推荐你们使用 reduce 和 collect。
咱们就 forEach 的线程不安全问题来给你们一个演示:
咱们经过并行流的 forEach 方法将流中元素加入到新的容器中。
List<String> lists = Arrays.asList("hello","world","java8","fork","join","string","main"); List<String> res = new ArrayList<>(); lists.parallelStream().forEach(res::add); res.stream().forEach(System.out::println);
结果,新容器中的数据顺序不但变了,并且还出现了 null 这样严重的线程安全问题。
null main hello java8 string fork join
解决办法,经过 synchronizedList 方法将咱们要经过并行流操做的容器变为线程安全的。
List<String> lists = Arrays.asList("hello","world","java8","fork","join","string","main"); List<String> res = new ArrayList<>(); List<String> newRes = Collections.synchronizedList(res); lists.parallelStream().forEach(newRes::add); res.stream().forEach(System.out::println);
结果,虽然顺序仍是变了,可是元素一个也很多。
join main world java8 string hello fork
性能
并发流 parallelStream 的做用是使用多线程的方式来加快流处理的速度。那么并行流的性能到底怎么样哪?
咱们经过加法来计算一下并行流和串行流和 for 循环的速度,这个结果彻底是基于个人机器,在不一样的机器上时间可能会出现偏差,可是不会影响结果。
第一个方法,for 循环。
private static long forMethod(long limit){ long result= 0; for(long i=1l;i< limit;i++){ result = result+i; } return result; }
第二个方法,使用 iteate 方法来生成一个无限流,经过 limit 限制流的个数,而后经过 reduce 来计算。
private static long iterateStream(long limit){ return Stream.iterate(1L,i->i+1). limit(limit).reduce(0L,Long::sum); }
第三个方法,使用 iteate 方法来生成一个无限流,经过 limit 限制流的个数,使用并行的方式来计算。
private static long iterateStreamparallel(long limit){ return Stream.iterate(1L,i->i+1). limit(limit).parallel().reduce(0L,Long::sum); }
第四个方法,为了不自动拆装箱带来的性能损耗,咱们使用 mapToLong 将其转化为 LongStream。
private static long iterateStream2(long limit){ return Stream.iterate(1L,i->i+1).mapToLong(Long::longValue). limit(limit).reduce(0L,Long::sum); }
第五个方法,为了不自动拆装箱带来的性能损耗,咱们使用 mapToLong 将其转化为 LongStream,使用并行的方式来计算。
private static long iterateStream2parallel(long limit){ return Stream.iterate(1L, i->i+1).mapToLong(Long::longValue).parallel(). limit(limit).reduce(0L,Long::sum); }
第六个方法,直接使用 LongStream 的 rangeClosed 来生成一个有限流。
private static long iterateStream3(long limit){ return LongStream.rangeClosed(1,limit).parallel().reduce(0L,Long::sum); }
第七个方法直接使用 LongStream 的 rangeClosed 来生成一个有限流,经过并行来计算。
private static long iterateStream3(long limit){ return LongStream.rangeClosed(1,limit).parallel.reduce(0L,Long::sum); }
测试方法:
private static long functionTest(Function<Long,Long> add, long limit){ long start = System.currentTimeMillis(); long result = add.apply(limit); long end = System.currentTimeMillis(); long cost = end - start; return cost; }
咱们将 limit 设置成 10000000,来看看各个方法的执行时间到底有什么差别。
forMethod cost time: 7 iterateStream cost time: 241 iterateStreamparallel cost time: 1790 iterateStream2 cost time: 107 iterateStream2parallel cost time: 367 iterateStream3 cost time: 37 iterateStream3parallel cost time: 28
咱们的结论是基于 for 循环,iterate 和 rangeClosed 的方法来统计的。
最后给出前人总结的并行流处理速度的总结:
方法 | 处理速度 |
---|---|
ArrayList | 优秀 |
LinkedList | 差 |
*Stream.range | 优秀 |
Stream.iterate | 差 |
HashSet | 好 |
TreeSet | 好 |