概念:java
这个Stream并不是是I/O流里的Stream,也不是集合元素,更不是数据结构,它是JDK1.8带来的新特性,是一种用函数式编程在集合类上进行复杂操做的工具。Stream就像工厂里的流水线同样,有输入和输出。Stream不能够重复遍历集合里面的数据,数据在Stream里面就像水在渠道里面同样,流过了就一去不复返。编程
简而言之,Stream是之内部迭代的方式处理集合数据的操做,内部迭代能够将更多的控制权交给集合类。Stream 和 Iterator 的功能相似,只不过 Iterator 是之外部迭代的形式处理集合的数据。数组
在JDK1.8之前,对集合的操做须要写出处理的过程,如在集合中筛选出知足条件的数据,须要一 一遍历集合中的每一个元素,再把每一个元素逐一判断是否知足条件,最后将知足条件的元素保存返回。而Stream 对集合筛选的操做提供了一种更为便捷的操做,只需将实现函数接口的筛选条件做为参数传递进来,Stream会自行操做并将合适的元素一样以 stream 的方式返回,最后进行接收便可。bash
内部迭代与外部迭代:数据结构
使用for循环等利用Iterator进行迭代操做的,咱们都叫作外部迭代,而使用stream流进行迭代操做的叫作内部迭代。内部迭代最明显的好处就是当数量很大的状况下,咱们不须要对数据进行拆分,而且能够经过调用指定函数实现并行遍历。多线程
外部迭代示例代码:dom
int[] nums = {1, 2, 3}; // 循环属于外部迭代 int sum = 0; for (int num : nums) { sum += num; } System.out.println("计算结果为:" + sum); // 计算结果为:6
使用stream内部迭代示例代码:ide
int[] nums = {1, 2, 3}; // 使用stream进行内部迭代 int sum = IntStream.of(nums).sum(); System.out.println("计算结果为:" + sum); // 计算结果为:6
使用stream流操做时的一些概念:函数式编程
如何区分中间操做和终止操做呢?能够根据操做的返回值类型判断,若是返回值是Stream,则该操做为中间操做。若是返回值不是Stream或者为空,则该操做是终止操做。函数
以下图所示,前两个操做是中间操做,只有最后一个操做是终止操做:
能够形象地理解Stream的操做是对一组粗糙的工艺品原型(即对应的 Stream 数据源)进行加工成颜色统一的工艺品(即最终获得的结果),第一步筛选出合适的原型(即对应Stream的 filter 的方法),第二步将这些筛选出来的原型工艺品上色(对应Stream的map方法),第三步取下这些上好色的工艺品(即对应Stream的 collect(toList())方法)。在取下工艺品以前进行的操做都是中间操做,能够有多个或者0个中间操做,但每一个Stream数据源只能有一次终止操做,不然程序会报错。
接下来,咱们经过一个简单的示例来演示以上所提到的几个概念,代码以下:
package org.zero01.example.demo; import java.util.stream.IntStream; public class StreamDemo { public static void main(String[] args) { int[] nums = {1, 2, 3}; // IntStream建立数字流,of则是输入一个数组,这里的map就是中间操做(返回stream流的操做),sum则是终止操做 int sum = IntStream.of(nums).map(i -> i * 2).sum(); System.out.println("计算结果为:" + sum); System.out.println("惰性求值就是终止操做没有调用的状况下,中间操做不会执行"); IntStream.of(nums).map(StreamDemo::doubleNum); } public static int doubleNum(int i) { System.out.println("doubleNum 方法执行了"); return i * 2; } }
运行以上代码,控制台输出结果以下,能够看到因为惰性求值的缘由,doubleNum方法没有被调用:
计算结果为:12 惰性求值就是终止操做没有调用的状况下,中间操做不会执行
对一个流完整的操做过程:流的建立 -> 中间操做 -> 终止操做。流的建立是第一步,而流的常见建立方式以下表:
代码示例以下:
public static void main(String[] args) { List<String> list = new ArrayList<>(); // 从集合建立流 list.stream(); // 从集合建立并行流 list.parallelStream(); // 从数组建立流 Arrays.stream(new int[]{1, 2, 3, 4, 5}); // 建立数字流 IntStream.of(1, 2, 3, 4, 5); // 建立1-10的数字流 IntStream.rangeClosed(1, 10); // 使用random建立一个无限流,须要调用limit来限制大小,不然会报错 new Random().ints().limit(10); // 本身建立流 Random random = new Random(); Stream.generate(() -> random.nextInt()).limit(20); }
而后咱们来看看流的中间操做,中间操做分类两类,一类是无状态操做,一类则是有状态操做。以下表:
无状态操做:
有状态操做:
共同点:
代码示例以下:
public static void main(String[] args) { String str = "my name is zero"; // 把每一个单词的长度打印出来 System.out.println("---------------map---------------"); Stream.of(str.split(" ")).map(String::length).forEach(System.out::println); // 只打印长度大于2的单词 System.out.println("---------------filter---------------"); Stream.of(str.split(" ")).filter(s -> s.length() > 2) .map(String::length).forEach(System.out::println); // flatMap 适合用于A元素下有B属性,而且这个B属性是个集合,最终获得全部的A元素里面的全部B属性集合 // 这里调用了 boxed() 方法的缘由是intStream\LongStream等数字流并不是是Stream的子类,因此须要装箱 System.out.println("---------------flatMap---------------"); Stream.of(str.split(" ")).flatMap(s -> s.chars().boxed()) .forEach(integer -> System.out.println((char) integer.intValue())); // peek 通常用于debug,相似于forEach,不一样的是peek是中间操做而forEach是终止操做 System.out.println("---------------peek---------------"); Stream.of(str.split(" ")).peek(System.out::println).forEach(System.out::println); // limit 主要用于限制无限流,咱们能够结合filter来产生特定区间的随机数 System.out.println("---------------limit---------------"); new Random().ints().filter(i -> i > 100 && i < 1000).limit(10).forEach(System.out::println); }
接下来咱们看看流的终止操做,一样的,终止操做也分类两类,分别是短路操做和非短路操做。以下表:
短路操做:
非短路操做:
具体代码及注释,请参考以下示例:
public static void main(String[] args) { String str = "my name is zero"; // 一般会在使用并行流的时候使用forEachOrdered,forEachOrdered能够在并行的状况下保证元素顺序的一致 str.chars().parallel().forEachOrdered(i -> System.out.print((char) i)); System.out.println(); // 而forEach则没法在并行的状况下保证元素顺序的一致 str.chars().parallel().forEach(i -> System.out.print((char) i)); System.out.println(); // collect属于收集器,使用能够将放入流里面的数据收集成集合类型 List<String> list = Stream.of(str.split(" ")).collect(Collectors.toList()); System.out.println(list); // reduce用于缩减、归约数据集,咱们可使用reduce来将数组拼接成一个字符串 Optional<String> letters = Stream.of(str.split(" ")).reduce((s1, s2) -> s1 + "|" + s2); System.out.println(letters.orElse("")); // 带初始化值的reduce,这样咱们就无需经过Optional去判断空值了 String reduce = Stream.of(str.split(" ")).reduce("", (s1, s2) -> s1 + "|" + s2); System.out.println(reduce); // 使用reduce计算字符串长度的总和 Integer lengthCount = Stream.of(str.split(" ")).map(String::length).reduce(0, (s1, s2) -> s1 + s2); System.out.println(lengthCount); // 使用max能够经过传入比较器在一组数据中取出最大值 Optional<String> max = Stream.of(str.split(" ")).max(Comparator.comparingInt(String::length)); System.out.println(max.orElse("")); // 使用findFirst拿出第一个元素 OptionalInt first = new Random().ints().findFirst(); System.out.println(first.orElse(0)); // 使用findAny随机拿出一个元素 OptionalInt any = new Random().ints().findAny(); System.out.println(any.orElse(0)); // 使用allMatch匹配全部的元素是否都为zero boolean is = Stream.of(str.split(" ")).allMatch("zero"::equals); System.out.println(is); }
以上的例子中大多数建立的都是单线程流,其实咱们能够建立多线程并行的Stream流,即并行流。使用并行流时,咱们并不需关心多线程执行以及任务拆分等问题,由于Stream都已经帮咱们管理好了,因此用起来也是很方便的。
咱们先来看一个不使用并行流的示例,如下代码会每隔3秒打印一行信息:
public static void main(String[] args) { // 不使用并行流 IntStream.range(1, 100).peek(ParallelStreamDemo::debug).sum(); } public static void debug(int i) { System.out.println("debug" + i); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } }
运行结果以下:
而使用并行流后,会发现同时打印了多行信息。代码以下:
public static void main(String[] args) { // 不使用并行流 IntStream.range(1, 100).parallel().peek(ParallelStreamDemo::debug).sum(); } public static void debug(int i) { System.out.println("debug" + i); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } }
至于同时会打印多少行,默认取决于cpu的核心数量,例如我电脑cpu有4个核心,因此会同时打印四行,也就是说开启了四个线程。运行结果以下:
经过以上的例子,咱们得知能够调用parallel方法来建立并行流。那么一样的,也能够调用相似的方法建立串行流,这个方法就是sequential。若是如今有一个需求:当进行第一步操做时需使用并行流,而第二步操做则需使用串行流。那么咱们能够经过结合这两个方法来实现这个需求吗?咱们来看一个简单的例子就知道了,代码以下:
public static void main(String[] args) { IntStream.range(1, 100) // 1.调用parallel产生并行流 .parallel().peek(ParallelStreamDemo::debug) // 2.调用sequential产生串行流 .sequential().peek(ParallelStreamDemo::debug2).sum(); } public static void debug(int i) { System.out.println("debug" + i); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } } public static void debug2(int i) { System.err.println("debug2" + i); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } }
运行结果以下:
从运行结果能够看到,运行过程始终是串行的,是一行行打印的。因此能够得出一个结论:屡次调用 parallel/sequential方法,会以最后一次调用的为准,天然就没法实现以上所提到的需求了。
接下来咱们看看并行流里线程相关的东西,在上文中,咱们提到了默认状况下,并行流开启的线程数量取决于cpu的核心数量。那么并行流使用的是哪一个线程池?如何设置开启的线程数量?
先来回答第一个问题,并行流里使用的线程池是java.util.concurrent.ForkJoinPool
,这一点能够直接在方法里打印线程名称得知,因此这里就不演示了。对ForkJoinPool感兴趣的话,能够查阅fork/join相关的概念。
关于第二个设置线程数量的问题,则是须要在建立并行流以前,设置ForkJoinPool里parallelism属性的值,例如我要开启20个线程,具体代码以下:
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
还有一点须要注意的就是,全部的并行流默认都将会使用同一个ForkJoinPool线程池,若咱们的并行任务比较多的话,可能会出现任务阻塞的状况。若是想要防止一些比较关键的任务出现阻塞的状况,则须要自行建立线程池去处理这些任务。以下示例:
public static void main(String[] args) { // 使用本身建立的线程池,不使用默认线程池,防止任务阻塞 ForkJoinPool forkJoinPool = new ForkJoinPool(20); forkJoinPool.submit(() -> IntStream.range(1, 100).parallel().peek(ParallelStreamDemo::debug).sum()); forkJoinPool.shutdown(); // 关闭线程池 // 防止主线程提早结束 synchronized (forkJoinPool) { try { forkJoinPool.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } }
本小节咱们来看一下收集器相关的东西,收集器就是将流处理完后的数据收集起来,例如将数据收集到一个集合里,或者对数据求和、拼接成字符串等行为都属于收集器。
如下使用一组例子来演示一下收集器的常见使用方式。首先定义一个Student类以及相关的枚举类,代码以下:
// ...省略getter/setter以及全参/无参构造函数... public class Student { private String name; private int age; private Gender gender; private Grade grade; } /** * 性别 */ enum Gender { MALE, FEMALE } /** * 班级 */ enum Grade { ONE, TWO, THREE, FOUR; }
使用收集器的示例代码以下:
public static void main(String[] args) { // 测试数据 List<Student> students = Arrays.asList( new Student("小明", 10, Gender.MALE, Grade.ONE), new Student("大明", 9, Gender.MALE, Grade.THREE), new Student("小白", 8, Gender.FEMALE, Grade.TWO), new Student("小黑", 13, Gender.FEMALE, Grade.FOUR), new Student("小红", 7, Gender.FEMALE, Grade.THREE), new Student("小黄", 13, Gender.MALE, Grade.ONE), new Student("小青", 13, Gender.FEMALE, Grade.THREE), new Student("小紫", 9, Gender.FEMALE, Grade.TWO), new Student("小王", 6, Gender.MALE, Grade.ONE), new Student("小李", 6, Gender.MALE, Grade.ONE), new Student("小马", 14, Gender.FEMALE, Grade.FOUR), new Student("小刘", 13, Gender.MALE, Grade.FOUR)); // 获得全部学生的年龄列表 List<Integer> ages = students.stream().map(Student::getAge).collect(Collectors.toList()); // 能够经过toCollection指定集合实现类型 // List<Integer> ages = students.stream().map(Student::getAge).collect(Collectors.toCollection(LinkedList::new)); System.out.println("全部学生的年龄列表: " + ages); // 统计汇总信息 IntSummaryStatistics ageSummaryStatistics = students.stream().collect(Collectors.summarizingInt(Student::getAge)); System.out.println("年龄汇总信息: " + ageSummaryStatistics); // 分块-按照规则把数据分红两块,这里按性别将学生数据分为两块 Map<Boolean, List<Student>> genders = students.stream().collect(Collectors.partitioningBy(s -> s.getGender() == Gender.MALE)); // 这里使用了Apache集合工具类进行打印 MapUtils.verbosePrint(System.out, "男女学生列表: ", genders); // 分组-按照规则把数据分为多组数据,这里按班级将学生数据进行分组 Map<Grade, List<Student>> grades = students.stream().collect(Collectors.groupingBy(Student::getGrade)); MapUtils.verbosePrint(System.out, "学生班级列表: ", grades); // 统计每一个分组里的数据-统计全部班级里学生的数量 Map<Grade, Long> gradesCount = students.stream().collect(Collectors.groupingBy(Student::getGrade, Collectors.counting())); MapUtils.verbosePrint(System.out, "全部班级学生人数列表: ", gradesCount); }
运行结果以下:
全部学生的年龄列表: [10, 9, 8, 13, 7, 13, 13, 9, 6, 6, 14, 13] 年龄汇总信息: IntSummaryStatistics{count=12, sum=121, min=6, average=10.083333, max=14} 男女学生列表: = { false = [Student(name=小白, age=8, gender=FEMALE, grade=TWO), Student(name=小黑, age=13, gender=FEMALE, grade=FOUR), Student(name=小红, age=7, gender=FEMALE, grade=THREE), Student(name=小青, age=13, gender=FEMALE, grade=THREE), Student(name=小紫, age=9, gender=FEMALE, grade=TWO), Student(name=小马, age=14, gender=FEMALE, grade=FOUR)] true = [Student(name=小明, age=10, gender=MALE, grade=ONE), Student(name=大明, age=9, gender=MALE, grade=THREE), Student(name=小黄, age=13, gender=MALE, grade=ONE), Student(name=小王, age=6, gender=MALE, grade=ONE), Student(name=小李, age=6, gender=MALE, grade=ONE), Student(name=小刘, age=13, gender=MALE, grade=FOUR)] } 学生班级列表: = { TWO = [Student(name=小白, age=8, gender=FEMALE, grade=TWO), Student(name=小紫, age=9, gender=FEMALE, grade=TWO)] FOUR = [Student(name=小黑, age=13, gender=FEMALE, grade=FOUR), Student(name=小马, age=14, gender=FEMALE, grade=FOUR), Student(name=小刘, age=13, gender=MALE, grade=FOUR)] ONE = [Student(name=小明, age=10, gender=MALE, grade=ONE), Student(name=小黄, age=13, gender=MALE, grade=ONE), Student(name=小王, age=6, gender=MALE, grade=ONE), Student(name=小李, age=6, gender=MALE, grade=ONE)] THREE = [Student(name=大明, age=9, gender=MALE, grade=THREE), Student(name=小红, age=7, gender=FEMALE, grade=THREE), Student(name=小青, age=13, gender=FEMALE, grade=THREE)] } 全部班级学生人数列表: = { TWO = 2 FOUR = 3 ONE = 4 THREE = 3 }
经过以上几个小节的内容,咱们已经掌握了流的基本操做。可是咱们对流的运行机制还不太清楚,因此本小节咱们将简单认识一下Stream的运行机制。
一样的,咱们首先来编写一段简单的Stream操做代码,以下:
public static void main(String[] args) { Random random = new Random(); // 随机产生数据 Stream<Integer> stream = Stream.generate(random::nextInt) // 产生500个 ( 无限流须要短路操做. ) .limit(500) // 第1个无状态操做 .peek(s -> print("peek: " + s)) // 第2个无状态操做 .filter(s -> { print("filter: " + s); return s > 1000000; }); // 终止操做 stream.count(); } public static void print(String s) { // 5毫秒打印一第二天志 System.out.println(Thread.currentThread().getName() + " > " + s); try { TimeUnit.MILLISECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } }
运行以上代码,控制台输出以下:
1.从运行结果中能够看到,peek和filter是交替执行的,也就是说全部操做都是链式调用,一个元素只迭代一次
2.既然是一个链式的调用,那么这条链是怎么样的呢?是如何维护的呢?咱们在终止操做上打上断点,经过debug运行。以下图,能够看到每个中间操做返回一个新的流,而流里面有一个属性sourceStage,它都指向同一个地方,就是链表的头Head:
3.而Head里指向了nextStage,nextStage里又指向了nextStage,一直指向到链尾的null值。就如:Head -> nextStage -> nextStage -> ... -> null
这就是Stream里实现链式调用所需的一个链表结构,是一条单链
以上的例子只有无状态操做,若是加入有状态操做,会发生什么变化呢?示例代码以下:
public static void main(String[] args) { Random random = new Random(); // 随机产生数据 Stream<Integer> stream = Stream.generate(random::nextInt) // 产生500个 ( 无限流须要短路操做. ) .limit(500) // 第1个无状态操做 .peek(s -> print("peek: " + s)) // 第2个无状态操做 .filter(s -> { print("filter: " + s); return s > 1000000; }) // 有状态操做 .sorted((i1, i2) -> { print("排序: " + i1 + ", " + i2); return i1.compareTo(i2); }) // 又一个无状态操做 .peek(s -> print("peek2: " + s)).parallel(); // 终止操做 stream.count(); }
运行以上代码,控制台输出以下:
main > peek: -1564323985 main > filter: -1564323985 main > peek: -779802182 main > filter: -779802182 main > peek: -498652682 main > filter: -498652682 main > 排序: 78555310, 50589406 main > 排序: 74439402, 50589406 main > 排序: 56492454, 50589406 main > 排序: 39808935, 50589406 main > 排序: 39808935, 39002482 main > peek2: 25284397 main > peek2: 29672249 main > peek2: 29800626 main > peek2: 32299397
从输出的日志信息能够发现,用于排序的中间操做截断了流,并无像无状态操做那样交替执行。因此咱们就能够得知有状态操做会把无状态操做截断,会单独进行处理而不会交替执行。
而后咱们再加上并行流,看看并行状况下又是怎样的,输出的日志信息以下:
ForkJoinPool.commonPool-worker-3 > peek: -332079048 ForkJoinPool.commonPool-worker-3 > filter: -332079048 ForkJoinPool.commonPool-worker-1 > filter: 1974510987 ForkJoinPool.commonPool-worker-4 > peek: -1727742841 ForkJoinPool.commonPool-worker-4 > filter: -1727742841 main > 排序: 58979900, 74247464 main > 排序: 58979900, 57671811 main > 排序: 53543451, 57671811 main > 排序: 53543451, 42862261 main > 排序: 43624983, 42862261 ForkJoinPool.commonPool-worker-0 > peek2: 1152454167 ForkJoinPool.commonPool-worker-2 > peek2: 1468420859 ForkJoinPool.commonPool-worker-5 > peek2: 736525554 ForkJoinPool.commonPool-worker-6 > peek2: 1×××50615
从日志打印能够看到,排序操做依旧是main线程执行的,而其余的操做则是线程池里的线程执行的。因此咱们经过这个例子能够得知即使在并行的环境下,有状态的中间操做不必定能并行操做。
顺带说明一下 parallel/ sequetial 这2个操做也是中间操做 (也是返回stream) ,可是区别在于它们不会建立流,,它们只修改 Head 的并行标志,由于这两个方法修改的是同一个地方,因此才会以最后一次调用的为准: