Java8中提供了Stream对集合操做做出了极大的简化,学习了Stream以后,咱们之后不用使用for循环就能对集合做出很好的操做。
1、流的初始化与转换:
Java中的Stream的全部操做都是针对流的,因此,使用Stream必需要获得Stream对象:
一、初始化一个流:
Stream stream = Stream.of("a", "b", "c");
二、数组转换为一个流:
String [] strArray = new String[] {"a", "b", "c"};
stream = Stream.of(strArray);
或者
stream = Arrays.stream(strArray);
三、集合对象转换为一个流(Collections):
List<String> list = Arrays.asList(strArray);
stream = list.stream();
2、流的操做:html
流的操做能够归结为几种:java
一、遍历操做(map):编程
使用map操做能够遍历集合中的每一个对象,并对其进行操做,map以后,用.collect(Collectors.toList())会获得操做后的集合。数组
1.一、遍历转换为大写:
List<String> output = wordList.stream().
map(String::toUpperCase).数据结构
collect(Collectors.toList());app
1.二、平方数:
List<Integer> nums = Arrays.asList(1, 2, 3, 4);
List<Integer> squareNums = nums.stream().
map(n -> n * n).
collect(Collectors.toList());框架
二、过滤操做(filter):less
使用filter能够对象Stream中进行过滤,经过测试的元素将会留下来生成一个新的Stream。ide
2.一、获得其中不为空的String
List<String> filterLists = new ArrayList<>();
filterLists.add("");
filterLists.add("a");
filterLists.add("b");
List afterFilterLists = filterLists.stream()
.filter(s -> !s.isEmpty())函数式编程
.collect(Collectors.toList());
三、循环操做(forEach):
若是只是想对流中的每一个对象进行一些自定义的操做,可使用forEach:
List<String> forEachLists = new ArrayList<>();
forEachLists.add("a");
forEachLists.add("b");
forEachLists.add("c");
forEachLists.stream().forEach(s-> System.out.println(s));
四、返回特定的结果集合(limit/skip):
limit 返回 Stream 的前面 n 个元素;skip 则是扔掉前 n 个元素:
List<String> forEachLists = new ArrayList<>();
forEachLists.add("a");
forEachLists.add("b");
forEachLists.add("c");
forEachLists.add("d");
forEachLists.add("e");
forEachLists.add("f");
List<String> limitLists = forEachLists.stream().skip(2).limit(3).collect(Collectors.toList());
注意skip与limit是有顺序关系的,好比使用skip(2)会跳过集合的前两个,返回的为c、d、e、f,而后调用limit(3)会返回前3个,因此最后返回的c,d,e
五、排序(sort/min/max/distinct):
sort能够对集合中的全部元素进行排序。max,min能够寻找出流中最大或者最小的元素,而distinct能够寻找出不重复的元素:
5.一、对一个集合进行排序:
List<Integer> sortLists = new ArrayList<>();
sortLists.add(1);
sortLists.add(4);
sortLists.add(6);
sortLists.add(3);
sortLists.add(2);
List<Integer> afterSortLists = sortLists.stream().sorted((In1,In2)->
In1-In2).collect(Collectors.toList());
5.二、获得其中长度最大的元素:
List<String> maxLists = new ArrayList<>();
maxLists.add("a");
maxLists.add("b");
maxLists.add("c");
maxLists.add("d");
maxLists.add("e");
maxLists.add("f");
maxLists.add("hahaha");
int maxLength = maxLists.stream().mapToInt(s->s.length()).max().getAsInt();
System.out.println("字符串长度最长的长度为"+maxLength);
5.三、对一个集合进行查重:
List<String> distinctList = new ArrayList<>();
distinctList.add("a");
distinctList.add("a");
distinctList.add("c");
distinctList.add("d");
List<String> afterDistinctList = distinctList.stream().distinct().collect(Collectors.toList());
其中的distinct()方法能找出stream中元素equal(),即相同的元素,并将相同的去除,上述返回即为a,c,d。
六、匹配(Match方法):
有的时候,咱们只须要判断集合中是否所有知足条件,或者判断集合中是否有知足条件的元素,这时候就可使用match方法:
allMatch:Stream 中所有元素符合传入的 predicate,返回 true
anyMatch:Stream 中只要有一个元素符合传入的 predicate,返回 true
noneMatch:Stream 中没有一个元素符合传入的 predicate,返回 true
6.一、判断集合中没有有为‘c’的元素:
List<String> matchList = new ArrayList<>();
matchList.add("a");
matchList.add("a");
matchList.add("c");
matchList.add("d");
boolean isExits = matchList.stream().anyMatch(s -> s.equals("c"));
6.二、判断集合中是否全不为空:
List<String> matchList = new ArrayList<>();
matchList.add("a");
matchList.add("");
matchList.add("a");
matchList.add("c");
matchList.add("d");
boolean isNotEmpty = matchList.stream().noneMatch(s -> s.isEmpty());
则返回的为false
转自:https://blog.csdn.net/happyheng/article/details/52832313
在这篇文章中,将会对流的实现原理进行深度,解析,具体关于如何使用,请参考《Java8函数式编程》。
在深刻原理以前,咱们有必要知道关于Stream的一些基础知识,关于Stream的操做分类,如表1-1所示。
表1-1 Stream的经常使用操做分类(表格引自这里)
如表1-1中所示,Stream中的操做能够分为两大类:中间操做与结束操做,中间操做只是对操做进行了记录,只有结束操做才会触发实际的计算(即惰性求值),这也是Stream在迭代大集合时高效的缘由之一。中间操做又能够分为无状态(Stateless)操做与有状态(Stateful)操做,前者是指元素的处理不受以前元素的影响;后者是指该操做只有拿到全部元素以后才能继续下去。结束操做又能够分为短路与非短路操做,这个应该很好理解,前者是指遇到某些符合条件的元素就能够获得最终结果;然后者是指必须处理全部元素才能获得最终结果。
在探究Stream的执行原理以前,咱们先看以下两段代码(本文将以code_1为例进行说明):
code_1
public static void main(String[] args) { List<String> list = Lists.newArrayList( "bcd", "cde", "def", "abc"); List<String> result = list.stream() //.parallel() .filter(e -> e.length() >= 3) .map(e -> e.charAt(0)) //.peek(System.out :: println) //.sorted() //.peek(e -> System.out.println("++++" + e)) .map(e -> String.valueOf(e)) .collect(Collectors.toList()); System.out.println("----------------------------"); System.out.println(result); }
code_2
public void targetMethod() { List<String> list = Lists.newArrayList( "bcd", "cde", "def", "abc"); List<String> result = Lists.newArrayListWithCapacity(list.size()); for (String str : list) { if (str.length() >= 3) { char e = str.charAt(0); String tempStr = String.valueOf(e); result.add(tempStr); } } System.out.println("----------------------------"); System.out.println(result); }
很明显,在最终结果上而言,code_1与code_2是等价的。那么,Stream是怎么作的呢?显然不是每次操做都进行迭代,由于这对于执行时间与存储中间变量来讲都将是噩梦。
显然,若是code_2只对集合迭代了一次,也就是说至关高效。那么这么作有没有弊端?有!模板代码、中间变量、不利于并行都是其存在的问题。可是按着code_2的思路能够知道有如下几个问题须要解决:
那么Stream是如何解决的呢?所谓源码之下,无所遁形。那么,首先来看一下Stream包的结构(如图1-1所示)。
图1-1 Stream包的结构示意图
其中各个部分的主要功能为:
咱们单独把第二部分拎出来用于说明Stream的惰性求值实现,如图1-2所示,Java8针对Int、long、double进行了优化,主要用于频繁的拆装箱。咱们以引用类型进行介绍,在图中已经标为绿色。
图1-2
关于操做如何记录,在JDK源码注释中屡次用(操做)stage来标识用户的每一次操做,而一般状况下Stream的操做又须要一个回调函数,因此一个完整的操做是由数据来源、操做、回调函数组成的三元组来表示。而在具体实现中,使用实例化的ReferencePipeline来表示,即图1-2中的Head、StatelessOp、StatefulOp的实例。
如code_三、code_4所示为调用stream.map()的关键的两个方法,在用户
调用一系列操做后会造成如图1-3所示的双链表结构。
图1-3
code_3(ReferencePipeline.map())
@Override @SuppressWarnings("unchecked") public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) { Objects.requireNonNull(mapper); return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { return new Sink.ChainedReference<P_OUT, R>(sink) { @Override public void accept(P_OUT u) { downstream.accept(mapper.apply(u)); } }; } }; }
code_4(AbstractPipeline.AbstractPipeline())
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) { if (previousStage.linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); previousStage.linkedOrConsumed = true; previousStage.nextStage = this; this.previousStage = previousStage; this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK; this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags); this.sourceStage = previousStage.sourceStage; if (opIsStateful()) sourceStage.sourceAnyStateful = true; this.depth = previousStage.depth + 1; }
在上一步已经在stage中记录了每一步操做,此时并无执行。可是stage只是保存了当前的操做,并不能肯定下一个stage须要何种操做,何种数据,其实JDK为此定义了Sink接口,其中只有begin()、end()、cancellationRequested()、accept()四个接口(如表1-2所示,摘自这里),其中中间操做的子类中包含一个指向下游sink的指针。
表1-2
如今转向code_3,能够看出,在satge链中,每一步都包含了opWrapSink()。当调用终结操做时,将会触发code_5从最后一个stage(终结操做产生的satge)开始,递归产生图1-4所示的结构。
code_5(AbstractPipeline.wrapSink())
@Override @SuppressWarnings("unchecked") final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) { Objects.requireNonNull(sink); for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) { sink = p.opWrapSink(p.previousStage.combinedFlags, sink); } return (Sink<P_IN>) sink; }
图1-4
全部的操做已经造成了图1-4的结构,接下来就会触发code_6,此时结果就会产生对应的结果啦!
code_6(AbstractPipelie.copyInto())
@Override final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) { Objects.requireNonNull(wrappedSink); if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) { wrappedSink.begin(spliterator.getExactSizeIfKnown()); spliterator.forEachRemaining(wrappedSink); wrappedSink.end(); } else { copyIntoWithCancel(wrappedSink, spliterator); } }
那么,Stream是如何并行执行的呢?其实产生stage链的过程和串行并无区别,只是在最终执行时进行了相应的调整,咱们将code_1改变为code_7
code_7
public static void main(String[] args) { List<String> list = Lists.newArrayList( "bcd", "cde", "def", "abc"); List<String> result = list.stream() .parallel() .filter(e -> e.length() >= 3) //.map(e -> e.charAt(0)) //.peek(System.out :: println) .sorted() //.peek(e -> System.out.println("++++" + e)) .map(e -> String.valueOf(e)) .collect(Collectors.toList()); System.out.println("----------------------------"); System.out.println(result); }
那么最终产生的stage链与sink的结构如图1-5所示,由于此时stage链中有一个有状态操做(sorted()),也就是说在这里必须处理完全部元素才能进行下一步操做。那么此时不管是并行仍是串行,此时都会产生两个sink链,也就是表明了两次迭代,才产生了最终结果。
图1-5
那么,到底是如何并行的呢?其实当调用collect操做时会调用code_8,其中的evaluateParallel()如code_9所示。
code_8(AbstractPipeline.evaluate())
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) { assert getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; return isParallel() ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); }
code_9(ReduceOp.evaluateParallel())
@Override public <P_IN> R evaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) { return new ReduceTask<>(this, helper, spliterator).invoke().get(); }
其实Stream的并行处理是基于ForkJoin框架的,相关类与接口的结构如图1-6所示。其中AbstractShortCircuitTask用于处理短路操做,其余相关操做相似,会产生对应的Task。
图1-6
关于code_8中获取源Spliterator,如code_10所示,
code_10(AbstractPipeline.sourceSpliterator())
@SuppressWarnings("unchecked") private Spliterator<?> sourceSpliterator(int terminalFlags) { Spliterator<?> spliterator = null; if (sourceStage.sourceSpliterator != null) { spliterator = sourceStage.sourceSpliterator; sourceStage.sourceSpliterator = null; } else if (sourceStage.sourceSupplier != null) { spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get(); sourceStage.sourceSupplier = null; } else { throw new IllegalStateException(MSG_CONSUMED); } if (isParallel() && sourceStage.sourceAnyStateful) { //若是是并行流而且有stage包含stateful操做 //那么就会依次遍历stage,直到遇到stateful stage时 int depth = 1; for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this; u != e; u = p, p = p.nextStage) { int thisOpFlags = p.sourceOrOpFlags; if (p.opIsStateful()) { depth = 0; if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) { //若是有短路操做,则去除相应标记 thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT; } //尽可能以惰性求值的方式进行操做 spliterator = p.opEvaluateParallelLazy(u, spliterator); thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED) ? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED : (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED; } p.depth = depth++; p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags); } } if (terminalFlags != 0) { // Apply flags from the terminal operation to last pipeline stage combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags); } return spliterator; }
关于各个task就行是如何并行执行,其实最终调用的是code_11所示,对应的流程如图1-7所示,其中交替fork子节点是为了缓和数据分片不均形成的性能退化。
code_11(AbstractTask.compute())
@Override public void compute() { Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators long sizeEstimate = rs.estimateSize(); long sizeThreshold = getTargetSize(sizeEstimate); boolean forkRight = false; @SuppressWarnings("unchecked") K task = (K) this; while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) { K leftChild, rightChild, taskToFork; task.leftChild = leftChild = task.makeChild(ls); task.rightChild = rightChild = task.makeChild(rs); task.setPendingCount(1); if (forkRight) { forkRight = false; rs = ls; task = leftChild; taskToFork = rightChild; } else { forkRight = true; task = rightChild; taskToFork = leftChild; } taskToFork.fork(); sizeEstimate = rs.estimateSize(); } task.setLocalResult(task.doLeaf()); task.tryComplete(); }
图1-7
数据大小;源数据结构(分割越容易越好),arraylist、数组比较好,hashSet、treeSet次之,linked最差;装箱;核的数量(可以使用);单元处理开销(越大越好)
终结操做之外的操做,尽可能避免反作用,避免突变基于堆栈的引用,或者在执行过程当中进行任何I/O;传递给流操做的数据源应该是互不干扰(避免修改数据源)。
本文主要探究了Stream的实现原理,并无涉及到具体的流操做的用法(读者能够参考《java8函数式编程》),而且给出了使用Stream的部分建议。
深刻理解Java Stream流水线
Java 8 Stream探秘
java.util.stream 库简介
Dorae 转载注明出处 http://www.cnblogs.com/Dorae/