Stream使用这么久,它是如何提升遍历集合效率?

对于List 集合类,我想你们确定很了解了,那我想必定也知道集合的顶端接口 Collection。在 Java8 中,Collection 新增了两个流方法,分别是 Stream() 和 parallelStream()数据库


经过英文名不难猜想,这两个方法确定和 Stream 有关,那进一步猜想,是否是和咱们熟悉的 InputStream 和 OutputStream 也有关系呢?集合类中新增的两个 Stream 方法到底有什么做用?框架


什么是 Stream?less

在 Java8 以前,咱们一般是经过 for 循环或者 Iterator 迭代来从新排序合并数据,又或者经过从新定义 Collections.sorts 的 Comparator 方法来实现,这两种方式对于大数据量系统来讲,效率并非很理想。ide


Java8 中添加了一个新的接口类 Stream,他和咱们以前接触的字节流概念不太同样,Java8 集合中的 Stream 至关于高级版的 Iterator,他能够经过 Lambda 表达式对集合进行各类很是便利、高效的聚合操做(Aggregate Operation),或者大批量数据操做 (Bulk Data Operation)。学习


Stream 的聚合操做与数据库 SQL 的聚合操做 sorted、filter、map 等相似。咱们在应用层就能够高效地实现相似数据库 SQL 的聚合操做了,而在数据操做方面,Stream 不只能够经过串行的方式实现数据操做,还能够经过并行的方式处理大批量数据,提升数据的处理效率。大数据


接下来咱们就用一个简单的例子来体验下 Stream 的简洁与强大。优化

这个 Demo 的需求是过滤分组一所中学里身高在 160cm 以上的男女同窗,咱们先用传统的迭代方式来实现,代码以下:ui













Map<String, List<Student>> stuMap = new HashMap<String, List<Student>>();    for (Student stu: studentsList) {        if (stu.getHeight() > 160) { //若是身高大于160             if (stuMap.get(stu.getSex()) == null) { //该性别还没分类                  List<Student> list = new ArrayList<Student>(); //新建该性别学生的列表                  list.add(stu);//将学生放进去列表                  stuMap.put(stu.getSex(), list);//将列表放到map中              } else { //该性别分类已存在                  stuMap.get(stu.getSex()).add(stu);//该性别分类已存在,则直接放进去便可              }          }     }



咱们再使用 Java8 中的 Stream API 进行实现:this

1. 串行实现lua


Map<String, List<Student>> stuMap = stuList.stream().filter((Student s) -> s.getHeight() > 160) .collect(Collectors.groupingBy(Student ::getSex));

2. 并行实现


Map<String, List<Student>> stuMap = stuList.parallelStream().filter((Student s) -> s.getHeight() > 160) .collect(Collectors.groupingBy(Student ::getSex));


经过上面两个简单的例子,咱们能够发现,Stream 结合 Lambda 表达式实现遍历筛选功能很是得简洁和便捷。


Stream 如何优化遍历?

上面咱们初步了解了 Java8 中的 Stream API,那 Stream 是如何作到优化迭代的呢?并行又是如何实现的?下面咱们就透过 Stream 源码剖析 Stream 的实现原理。


1.Stream 操做分类


官方将 Stream 中的操做分为两大类:中间操做(Intermediate operations)和终结操做(Terminal operations)。中间操做只对操做进行了记录,即只会返回一个流,不会进行计算操做,而终结操做是实现了计算操做。


终结操做又能够分为短路(Short-circuiting)与非短路(Unshort-circuiting)操做,前者是指遇到某些符合条件的元素就能够获得最终结果,后者是指必须处理完全部元素才能获得最终结果。操做分类详情以下图所示:


咱们一般还会将中间操做称为懒操做,也正是由这种懒操做结合终结操做、数据源构成的处理管道(Pipeline),实现了 Stream 的高效。


2.Stream 源码实现


在了解 Stream 如何工做以前,咱们先来了解下 Stream 包是由哪些主要结构类组合而成的,各个类的职责是什么。参照下图:

图片


BaseStream 和 Stream 为最顶端的接口类。BaseStream 主要定义了流的基本接口方法,例如,spliterator、isParallel 等;Stream 则定义了一些流的经常使用操做方法,例如,map、filter 等。


ReferencePipeline 是一个结构类,他经过定义内部类组装了各类操做流。他定义了 Head、StatelessOp、StatefulOp 三个内部类,实现了 BaseStream 与 Stream 的接口方法。


Sink 接口是定义每一个 Stream 操做之间关系的协议,他包含 begin()、end()、cancellationRequested()、accpt() 四个方法。ReferencePipeline 最终会将整个 Stream 流操做组装成一个调用链,而这条调用链上的各个 Stream 操做的上下关系就是经过 Sink 接口协议来定义实现的。


3.Stream 操做叠加


咱们知道,一个 Stream 的各个操做是由处理管道组装,并统一完成数据处理的。在 JDK 中每次的中断操做会以使用阶段(Stage)命名。


管道结构一般是由 ReferencePipeline 类实现的,前面讲解 Stream 包结构时,我提到过 ReferencePipeline 包含了 Head、StatelessOp、StatefulOp 三种内部类。


Head 类主要用来定义数据源操做,在咱们初次调用 names.stream() 方法时,会初次加载 Head 对象,此时为加载数据源操做;接着加载的是中间操做,分别为无状态中间操做 StatelessOp 对象和有状态操做 StatefulOp 对象,此时的 Stage 并无执行,而是经过 AbstractPipeline 生成了一个中间操做 Stage 链表;当咱们调用终结操做时,会生成一个最终的 Stage,经过这个 Stage 触发以前的中间操做,从最后一个 Stage 开始,递归产生一个 Sink 链。以下图所示:









List<String> names = Arrays.asList("张三", "李四", "王老五", "李三", "刘老四", "王小二", "张四", "张五六七");
String maxLenStartWithZ = names.stream()                  .filter(name -> name.startsWith("张"))                  .mapToInt(String::length)                  .max()                  .toString();



这个例子的需求是查找出一个长度最长,而且以张为姓氏的名字。从代码角度来看,你可能会认为是这样的操做流程:首先遍历一次集合,获得以“张”开头的全部名字;而后遍历一次 filter 获得的集合,将名字转换成数字长度;最后再从长度集合中找到最长的那个名字而且返回。



4.Stream 并行处理


Stream 处理数据的方式有两种,串行处理和并行处理。要实现并行处理,咱们只须要在例子的代码中新增一个 Parallel() 方法,代码以下所示:









List<String> names = Arrays.asList("张三", "李四", "王老五", "李三", "刘老四", "王小二", "张四", "张五六七");
String maxLenStartWithZ = names.stream()                    .parallel()                  .filter(name -> name.startsWith("张"))                  .mapToInt(String::length)                  .max()                  .toString();


Stream 的并行处理在执行终结操做以前,跟串行处理的实现是同样的。而在调用终结方法以后,实现的方式就有点不太同样,会调用 TerminalOp 的 evaluateParallel 方法进行并行处理。











final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
       assert getOutputShape() == terminalOp.inputShape();        if (linkedOrConsumed)            throw new IllegalStateException(MSG_STREAM_LINKED);        linkedOrConsumed = true;        return isParallel()               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));    }


这里的并行处理指的是,Stream 结合了 ForkJoin 框架,对 Stream 处理进行了分片,Splititerator 中的 estimateSize 方法会估算出分片的数据量。


总结


纵观 Stream 的设计实现,很是值得咱们学习。从大的设计方向上来讲,Stream 将整个操做分解为了链式结构,不只简化了遍历操做,还为实现了并行计算打下了基础


从小的分类方向上来讲,Stream 将遍历元素的操做和对元素的计算分为中间操做和终结操做,而中间操做又根据元素之间状态有无干扰分为有状态和无状态操做,实现了链结构中的不一样阶段。


在串行处理操做中,Stream 在执行每一步中间操做时,并不会作实际的数据操做处理,而是将这些中间操做串联起来,最终由终结操做触发,生成一个数据处理链表,经过 Java8 中的 Spliterator 迭代器进行数据处理;此时,每执行一次迭代,就对全部的无状态的中间操做进行数据处理,而对有状态的中间操做,就须要迭代处理完全部的数据,再进行处理操做;最后就是进行终结操做的数据处理。


在并行处理操做中,Stream 对中间操做基本跟串行处理方式是同样的,但在终结操做中,Stream 将结合 ForkJoin 框架对集合进行切片处理,ForkJoin 框架将每一个切片的处理结果 Join 合并起来。最后就是要注意 Stream 的使用场景。

相关文章
相关标签/搜索