Java 8 Stream原理解析

提及 Java 8,咱们知道 Java 8 大改动之一就是增长函数式编程,而 Stream API 即是函数编程的主角,Stream API 是一种流式的处理数据风格,也就是将要处理的数据看成流,在管道中进行传输,并在管道中的每一个节点对数据进行处理,如过滤、排序、转换等。编程

首先咱们先看一个使用Stream API的示例,具体代码以下:数组

 

code1 Stream exampleapp

这是个很简单的一个Stream使用例子,咱们过滤掉空字符串后,转成int类型并计算出最大值,这其中包括了三个操做:filter、mapToInt、sum。相信大多数人再刚使用Stream API的时候都会有个疑问,Stream是指怎么实现的,是每一次函数调用就执行一次迭代吗?答案确定是否,由于若是真的是每一次函数调用就执行一次迭代,这个效率是很难接受的,Stream也不会那么受欢迎。less

其实Stream内部是经过流水线(Pipeline)的方式来实现的,基本思想是在迭代的时候顺着流水线尽量的执行更多的操做,从而避免屡次迭代。为了对Stream的操做有更清晰的认识,咱们汇总了Stream的全部操做。函数式编程

从上表能够看出Stream将全部操做分为两类:中间操做和终止操做。其中中间操做分为无状态和有状态,终止操做分为非短路操做和短路操做,下面是针对这几个操做的含义说明:函数

一、中间操做:中间操做只是一种标记,只有结束操做才会触发实际计算spa

  • 无状态:指元素的处理不受前面元素的影响;
  • 有状态:有状态的中间操做必须等到全部元素处理以后才知道最终结果,好比排序是有状态操做,在读取全部元素以前并不能肯定排序结果。

二、终止操做:顾名思义,就是得出最后计算结果的操做设计

  • 短路操做:指不用处理所有元素就能够返回结果;
  • 非短路操做:指必须处理全部元素才能获得最终结果。

Stream流水线解决方案

 经过上面的介绍,咱们了解到Stream在执行中间操做时仅仅是记录,当用户调用终止操做时,会在一个迭代里将已经记录的操做顺着流水线所有执行掉。沿着这个思路,有几个问题须要解决:code

  1. 用户的操做如何记录?
  2. 操做如何叠加?
  3. 叠加以后的操做如何执行?

一、操做如何记录

图1-1对象

关于操做如何记录,在JDK源码注释中屡次用(操做)stage来标识用户的每一次操做,而一般状况下Stream的操做又须要一个回调函数,因此一个完整的操做是由数据来源、操做、回调函数组成的三元组来表示。而在具体实现中,使用实例化的ReferencePipeline来表示,即图1-1中的Head、StatelessOp、StatefulOp的实例。接下来咱们来看下Stream几个经常使用方法的源码。

 code2 Collection.Stream()

code3 StreamSupport.stream()

code4 ReferencePipeline.map()

从上面源码中能够看出来,咱们调用stream()方法时最终会建立一个Head实例来表示流操做的头,当调用map()方法时则会建立无状态的中间操做实例StatelessOp,一样调用其余操做对应的方法也会生成一个ReferencePipeline实例,在这里就不一一列举。在用户调用一系列操做后,最终会造成一个双向链表,以下图所示:

图1-2

二、操做如何叠加

上面咱们说明了Stream是经过stage记录操做,但stage只保存当前操做,它并不知道下个stage如何操做,须要什么操做。因此要执行的话还须要某种协议将各个stage关联起来。jdk中就是使用Slink接口来实现的,Slink接口定义begin()、end()、cancellationRequested()、accept()四个方法,以下表所示。

往回看code3 ReferencePipeline.map()的方法,咱们会发现咱们在建立一个ReferencePipeline实例的时候,须要重写opWrapSink方法来生成对应Sink实例。并且经过阅读源码会发现经常使用的操做都会建立一个ChainedReference实例。咱们能够看下code5 ChainedReference抽象类的源码实现,由于ChainedReference只是个抽象实现,不携带具体操做的特性,因此是更能体现做者的设计理念。

经过查看源码能够发现ChainedReference会持有下一个操做的Slink,并在调用begin、end、cancellationRequested方法会调用下一个操做的Slink的相应方法,以此来达到叠加的效果。

code5 ChainedReference

三、叠加以后的操做如何执行

Sink完美封装了Stream每一步操做,并给出了[处理->转发]的模式来叠加操做。这一连串的齿轮已经咬合,就差最后一步拨动齿轮启动执行。是什么启动这一连串的操做呢?也许你已经想到了启动的原始动力就是结束操做(Terminal Operation),一旦调用某个结束操做,就会触发整个流水线的执行。

结束操做以后不能再有别的操做,因此结束操做不会建立新的流水线阶段(Stage),直观的说就是流水线的链表不会在日后延伸了。结束操做会建立一个包装了本身操做的Sink,这也是流水线中最后一个Sink,这个Sink只须要处理数据而不须要将结果传递给下游的Sink(由于没有下游)。对于Sink的[处理->转发]模型,结束操做的Sink就是调用链的出口。

咱们再来考察一下上游的Sink是如何找到下游Sink的。一种可选的方案是在PipelineHelper中设置一个Sink字段,在流水线中找到下游Stage并访问Sink字段便可。但Stream类库的设计者没有这么作,而是设置了一个Sink AbstractPipeline.opWrapSink(int flags, Sink downstream)方法来获得Sink,该方法的做用是返回一个新的包含了当前Stage表明的操做以及可以将结果传递给downstream的Sink对象。为何要产生一个新对象而不是返回一个Sink字段?这是由于使用opWrapSink()能够将当前操做与下游Sink(上文中的downstream参数)结合成新Sink。试想只要从流水线的最后一个Stage开始,不断调用上一个Stage的opWrapSink()方法直到最开始(不包括stage0,由于stage0表明数据源,不包含操做),就能够获得一个表明了流水线上全部操做的Sink,用代码表示就是这样:

code6 AbstractPipeline.wrapSink

如今流水线上从开始到结束的全部的操做都被包装到了一个Sink里,执行这个Sink就至关于执行整个流水线,执行Sink的代码以下:

code7 AbstractPipeline.copyInto

上述代码首先调用wrappedSink.begin()方法告诉Sink数据即将到来,而后调用spliterator.forEachRemaining()方法对数据进行迭代,最后调用wrappedSink.end()方法通知Sink数据处理结束。逻辑如此清晰。


做者:Huang Rongpeng
相关文章
相关标签/搜索