java8的Stream很重要,spring-reactor里面用到reactor-core,而java8的stream与之很类似,搞懂了再看reactor-core一定事半功倍。
先看一下它的强大,这里只是冰山一角:
从List<Student> 列表中取出name,将name组成一个List。
老代码java
List<String> nameList = new ArrayList(); if(null != list){ for(Student stu : list){ nameList.add(stu.getName()); } }
JAVA8react
List<String> nameList = Optional.ofNullable(list).orElse(Collections.emptyList()).stream() .map(Stu::getName).collect(Collectors.toList());
这里给你们演示一下经过Stream.of建立Stream。
常见的集合经过stream()方法均可以建立Stream。 其实他们最终都是调用如下方法建立的。算法
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) { Objects.requireNonNull(spliterator); return new ReferencePipeline.Head<>(spliterator, StreamOpFlag.fromCharacteristics(spliterator), parallel); }
Stream.of有两种建立Stream的方法。
第一种spring
Stream.of("a1")
第二种数组
Stream.of("a1","a2"); //这种经过Arrays.stream 构建
这里介绍两个相关的类:app
若是是单个元素,直接使用Spliterator进行构建。 若是是多个元素,会有一个优化,使用SpineBuffer构建。
若是是大数组,使用SpineBuffer,小数组是使用ArrayList。 如何使用SpineBuffer构建?less
Stream.builder().add("a1").add("a2").build();
stream的操做分为两种:
一种是中间操做,就是不须要结果,只须要记录这个过程,通常返回Stream对象都是属于这种
一种是终极操做,就是当即须要返回结果,通常返回非Stream对象,都是属于这种。
stream的状态分为三种:
第一种:Head,第一次建立的时候就是这种
第二种:Stateless,无状态,每一个对象的操做是独立的。
第三种:Stateful,有状态,须要联合多个象才能得出结果。
stream操做特性:
操做特性是指:该stream有固定大小,大小不固定,操做有序,数据有序等。ide
顾名思义:对 Stream进行filter,而后返回新的Stream。 由前一节咱们知道,stream的具体数据存储在Spliterator中。而它自己能够理解为只是一个算法。
filter只是一个中间操做,咱们只须要记录这一个过程就OK了。而后返回新的Stream。若是再次调用fileter,会再次返回一个新Stream。
上面是一个流程图,Sink是包装算子的一个类,好比调用filter,从Head里面拿到对象,通过第一个Sink,再通过第二个Sink的运算,最终获得结果。
下面是Strea.filter的源码实现:优化
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) { Objects.requireNonNull(predicate); return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) { return new Sink.ChainedReference<P_OUT, P_OUT>(sink) { @Override public void begin(long size) { downstream.begin(-1); } @Override public void accept(P_OUT u) { //若是经过当前filter,就进入下一个算子 if (predicate.test(u)) downstream.accept(u); } }; } }; }
这个方法能够理解为调试方法,它不对结果产生任何影响,将数据原封不动的传给下一个算子ui
public final Stream<P_OUT> peek(Consumer<? super P_OUT> action) { Objects.requireNonNull(action); return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, 0) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) { return new Sink.ChainedReference<P_OUT, P_OUT>(sink) { @Override public void accept(P_OUT u) { action.accept(u); downstream.accept(u); } }; } }; }
算子应该是经过一个对象映身成一个Stream,而后调用foreach,将每一个元素传递到下一个算子。
public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) { Objects.requireNonNull(mapper); // We can do better than this, by polling cancellationRequested when stream is infinite return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { return new Sink.ChainedReference<P_OUT, R>(sink) { @Override public void begin(long size) { downstream.begin(-1); } @Override public void accept(P_OUT u) { try (Stream<? extends R> result = mapper.apply(u)) { // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it if (result != null) result.sequential().forEach(downstream); } } }; } }; }
与上面的相似,只是映射成另外一个对象
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) { Objects.requireNonNull(mapper); return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { return new Sink.ChainedReference<P_OUT, R>(sink) { @Override public void accept(P_OUT u) { downstream.accept(mapper.apply(u)); } }; } }; }
这是一个有状态的操做,由于它返回必定数据的数据组成的Stream。 这里只贴一段核心算法:
Sink<T> opWrapSink(int flags, Sink<T> sink) { return new Sink.ChainedReference<T, T>(sink) { long n = skip; long m = limit >= 0 ? limit : Long.MAX_VALUE; @Override public void begin(long size) { downstream.begin(calcSize(size, skip, m)); } @Override public void accept(T t) { if (n == 0) { if (m > 0) { m--; downstream.accept(t); } } else { n--; } } @Override public boolean cancellationRequested() { return m == 0 || downstream.cancellationRequested(); } }; }
这个与Stram.limit相似,两个联合起来就能够分面查询了。
排序,若是没传比较器就用默认的。
若是有顺序,就不用排序了,若是给定大小了就用一个固定大小的数组来排序,不然用一个列来来排序。
public Sink<T> opWrapSink(int flags, Sink<T> sink) { Objects.requireNonNull(sink); // If the input is already naturally sorted and this operation // also naturally sorted then this is a no-op if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort) return sink; else if (StreamOpFlag.SIZED.isKnown(flags)) return new SizedRefSortingSink<>(sink, comparator); else return new RefSortingSink<>(sink, comparator); }
经过排序,分页,说明这个算子须要支持开始,结束方法。还须要一个取消方法,为何了,好比第一个Stream有20个对象,可是后面只须要第一个,因此我第一个算子给到你一个数据时,第一个算子就须要终止了。
下面看一个anyMatch是怎么实现的。
@Override public final boolean anyMatch(Predicate<? super P_OUT> predicate) { return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY)); }
第二步,主要是用当前stream,和原始的数据容器spliterator
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) { assert getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; return isParallel() ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); }
第三步,最后一个算子和原始容器
@Override public <S> Boolean evaluateSequential(PipelineHelper<T> helper, Spliterator<S> spliterator) { return helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).getAndClearState(); }
第四步 包装算子
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) { Objects.requireNonNull(sink); for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) { sink = p.opWrapSink(p.previousStage.combinedFlags, sink); } return (Sink<P_IN>) sink; }
第五步 数据传递
@Override final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) { copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator); return sink; } final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) { Objects.requireNonNull(wrappedSink); //知足要求后,是否须要中止计算 if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) { wrappedSink.begin(spliterator.getExactSizeIfKnown()); spliterator.forEachRemaining(wrappedSink); wrappedSink.end(); } else { //须要中止计算 copyIntoWithCancel(wrappedSink, spliterator); } }
只须要一个Sink,而后调用wrapSink,再copyInto就能够实现了
final <P_IN> Spliterator<P_OUT> wrap(PipelineHelper<P_OUT> ph, Supplier<Spliterator<P_IN>> supplier, boolean isParallel) { return new StreamSpliterators.WrappingSpliterator<>(ph, supplier, isParallel); }