相信如今不少人都已经使用过java8提供的java.util.stream编程接口,用起来是如此的爽。有如这夏天里一缕清爽凉风,让你眼前一亮的女神。所以我就想试着去了解女神的心里,她为什么如此的美丽高雅。。。下面咱们就试着去了解Stream API。java
首先咱们看一下stream的基本使用方法:编程
ArrayList<String> list = Lists.newArrayList("America", "ABC", "CNN", "OK", "ASYNC"); List<String> strings = list.stream().filter(e -> e.startsWith("A")).map(e -> e + " nice").collect(Collectors.toList());
最终咱们会获得ArrayList中以A开头的字母加上“nice”的字符串List,若是放在jdk7里咱们会这样写:数据结构
ArrayList<String> strings = Lists.newArrayList(); for (String s : list) { if(s.startsWith("A")){ String newStr = s + "nice"; strings.add(newStr); } }
我试着去看源代码,发现Stream实质上就是这样执行咱们的需求的。下面就说说我看到了什么。app
打开java.util.stream
包,能够看到核心接口Stream类,顾名思义就是流水的意思,官方文档原话说的是less
A sequence of elements supporting sequential and parallel aggregate operations.
Stream就是一个支持串行和并行的汇集操做的一系列元素。
定义了一些中间操做(Intermediate operations)和结束操做(Terminal operations),
中间操做包括无状态(Stateless)操做好比:filter, map, flatMap等,有状态(Stateful)操做好比:distinct, sorted, limit等;
结束操做(Terminal operations)包括非短路操做(short-circuiting)好比:forEach, reduce, collect等和短路操做如:findFirst, findAny;ide
中间操做不是真正的操做而是一种操做的描述,只有执行到结束操做才会触发实际计算,在结束操做执行以前只是把中间操做记录了下来。无状态中间操做指元素的操做不受其余元素的影响,好比以某一Predicate去filter元素,元素和元素以前不互相影响。而有状态中间操做指的是元素和元素之间是有关联的,好比sorted,只有读取全部元素以后才能肯定排序结果。函数
短路结束操做指的是不用处理全部元素才能返回结果,好比findFirst,只要找到第一个符合条件的元素便可返回结果。非短路结束操做则必须处理完全部元素才能返回结果。学习
Stream继承了BaseStream,定义了一些Stream的基本操做。ui
以上所说的操做须要被按顺序记录下来,这里就须要管道流水线Pipeline的概念来实现。this
管道有一个基类PipelineHelper,他是执行Stream管道的一个helper,将Stream的全部信息收集到一个地方。
上面所说的操做其实都定义在PipelineHelper的一个子类ReferencePipeline中,包括Head(Source stage of a ReferencePipeline)、StatelessOp(Base class for a stateless intermediate stage of a Stream.)、StatefulOp(Base class for a stateful intermediate stage of a Stream.)静态内部类。
ReferencePipeline是描述中间操做管道流和源管道流的一个类,同时也实现了Stream接口
在Stream中使用stage(阶段)来描述一个完整的操做,而Head、StatelessOp、StatefulOp这三个操做都是实例化的PipelineHelper,也就是stage。能够把stage理解为带管道的流(Stream with Pipeline)
在本文一开始的例子中,咱们分析一下有几个stage,下图:
每一步Stream的方法调用都产生一个新的stage,在随后的分析中会发现,这些stage会以双向链表的方式连接,而每一个stage都记录了每个阶段的操做,这样咱们就能够依赖这种数据结构来保存对数据源的全部操做了。
stage的连接靠Sink来实现,咱们先看一下Sink的接口,咱们这里只看ChainedReference
ChainedReference包括:
每一个stage都把操做实如今Sink里,上游stage调用下游stage的accept方法,达到按顺序执行每一个操做的目的。
直接上代码
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) { Objects.requireNonNull(predicate); return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) { return new Sink.ChainedReference<P_OUT, P_OUT>(sink) { @Override public void begin(long size) { downstream.begin(-1); } @Override public void accept(P_OUT u) { if (predicate.test(u)) downstream.accept(u); } }; } }; }
上面代码是Stream的filter方法,fiter是一个无状态操做,返回一个新的stage,还实现了AbstractPipeline.opWrapSink来返回stage实现的sink。这里filter的参数是一个predicate,在predicate.test返回true时调用下游的stage的sink的accept方法,这样整个操做流就连续执行下去了。
在说Stream自动执行以前,有必要说一说每一个stage是怎么连接起来的。Stream在操做时产生的Operation类是如何用双向链表的结构来先后连接的?
在上面Stream.filter的源代码能够看到,filter返回了一个StatelessOp对象,构造函数接受了当前对象this为第一个参数,而后来看StatelessOp的代码:
abstract static class StatelessOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> { /** * Construct a new Stream by appending a stateless intermediate * operation to an existing stream. * * @param upstream The upstream pipeline stage * @param inputShape The stream shape for the upstream pipeline stage * @param opFlags Operation flags for the new stage */ StatelessOp(AbstractPipeline<?, E_IN, ?> upstream, StreamShape inputShape, int opFlags) { super(upstream, opFlags); assert upstream.getOutputShape() == inputShape; } @Override final boolean opIsStateful() { return false; } }
能够看到StatelessOp实现了ReferencePipeline接口,在构造函数里调用了super(upstream, opFlags),而这个upstream(上游流)参数就是上面传入的this,下游流StatelessOp的upstream就指向this了,这样就经过下游流的upstream连接上游流。目前每一个操做之间还只是单链表。
那有人就会想了,下游流保存了上游流的引用,那上游流是怎么保存下游流的引用呢?这就要看最后的结束操做了,咱们来看Stream.collect代码:
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) { A container; if (isParallel() && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT)) && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) { container = collector.supplier().get(); BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator(); forEach(u -> accumulator.accept(container, u)); } else { container = evaluate(ReduceOps.makeRef(collector)); } return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH) ? (R) container : collector.finisher().apply(container); }
这里咱们只看串行操做的分支。filter返回了一个结束操做的计算结果。咱们来看evaluate方法:
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) { assert getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; return isParallel() ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); }
AbstractPipeline.evaluate方法接收了一个结束操做对象,咱们只看串行操做:
public <P_IN> R evaluateSequential(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) { return helper.wrapAndCopyInto(makeSink(), spliterator).get(); }
继续看AbstractPipeline.wrapAndCopyInto:
@Override final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) { copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator); return sink; } @Override final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) { Objects.requireNonNull(wrappedSink); if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) { wrappedSink.begin(spliterator.getExactSizeIfKnown()); spliterator.forEachRemaining(wrappedSink); wrappedSink.end(); } else { copyIntoWithCancel(wrappedSink, spliterator); } }
AbstractPipeline.wrapAndCopyInto接收告终束操做的sink,继续看AbstractPipeline.wrapSink:
@Override @SuppressWarnings("unchecked") final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) { Objects.requireNonNull(sink); for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) { sink = p.opWrapSink(p.previousStage.combinedFlags, sink); } return (Sink<P_IN>) sink; }
从结束操做的sink开始,一层一层包装sink,最后第一个中间操做的sink在最外层,在每一个操做的opWrapSink方法里返回的sink都维护了一个downstream指向后一个操做,这样,双向链表的结构就完成了。这样,咱们在copyInto方法里调用begin、accept、end的时候就会经过downstream一层一层的调用下去,最终在结束操做执行实际计算。
结束Stream的基本原理就分析到这里,比较浅,并且思路也有点混乱,有不少问题在里面,但愿你们和我一块儿讨论学习。但愿看不明白的童鞋能够向我提问,看过源码的童鞋欢迎指出错误!你们一块儿学习!