Stream流是Java8新引入的一个特性, 它容许你以声明性方式处理数据集合, 而不是像之前的指令式编程那样须要编写具体怎么实现。html
好比炒菜, 用指令式编程须要编写具体的实现java
配菜(); 热锅(); 放油(); 翻炒(); 放调料(); 出锅();
而若是是Stream流这种声明式方式, 只须要一步操做 炒菜(); 就能够完成上面的炒菜功能。它关注的是我要作什么, 而不是我要怎么作。编程
与Collection集合使用外部迭代不一样, Stream 流使用内部迭代, 它帮你把迭代作了, 还把获得的流值存在了某个地方, 你只要给出一个函数说要作什么就能够了。segmentfault
同一个流只能被消费一次, 下面这段代码运行会抛异常 java.lang.IllegalStateException 数组
@Test public void test16() { List<Dish> menu = Arrays.asList( new Dish("pork", false, 800, Dish.Type.MEAT), new Dish("beef", false, 700, Dish.Type.MEAT)); Stream<Dish> stream = menu.stream(); stream.forEach(System.out::println); stream.forEach(System.out::println); //java.lang.IllegalStateException: stream has already been operated upon or closed }
诸如filter、map、limit、sorted、distinct等中间操做会返回另外一个流, 多个中间操做链接起来就造成了一条流水线。除非流水线上触发一个终端操做, 如forEach、count、collect, 不然中间操做不会执行任何处理。并发
由于Stream流的一个特性就是惰性求值, 只有在触发了终端操做时, 才会把前面全部的中间操做合并起来一次性所有处理完。app
在正式介绍Stream API以前, 先引入一些实体类和几组数据集合, 后面的代码示例会常常用到它们。框架
这里插入一个小技巧,使用IDEA插件 lombok 可让你不用重复的编写实体类的Getter/Setter、构造方法等等,你只须要在实体类上添加一个 @Data 注解便可,lombok插件会在编译期间自动帮你生成Getter/Setter方法、toString方法。dom
@Data public class Dish { private String name; private boolean vegetarian; private int calories; private Type type; public Dish(String name, boolean vegetarian, int calories, Type type) { this.name = name; this.vegetarian = vegetarian; this.calories = calories; this.type = type; } public enum Type { MEAT, FISH, OTHER } public enum CaloricLevel { DIET, NORMAL, FAT } @Override public String toString() { return name; } } @Data public class Transaction { private Trader trader; private int year; private int value; private String currency; public Transaction(Trader trader, int year, int value) { this.trader = trader; this.year = year; this.value = value; } } @Data public class Trader { private String name; private String city; public Trader(String name, String city) { this.name = name; this.city = city; } }
static List<Dish> menu; static List<Integer> nums; static List<Transaction> transactions; static { menu = Arrays.asList( new Dish("pork", false, 800, Dish.Type.MEAT), new Dish("beef", false, 700, Dish.Type.MEAT), new Dish("chicken", false, 400, Dish.Type.MEAT), new Dish("french fries", true, 530, Dish.Type.OTHER), new Dish("rice", true, 350, Dish.Type.OTHER), new Dish("season fruit", true, 120, Dish.Type.OTHER), new Dish("pizza", true, 550, Dish.Type.OTHER), new Dish("prawns", false, 300, Dish.Type.FISH), new Dish("salmon", false, 450, Dish.Type.FISH)); nums = Arrays.asList(1, 3, 5, 7, 9, 11, 13); Trader raoul = new Trader("Raoul", "Cambridge"); Trader mario = new Trader("Mario", "Milan"); Trader alan = new Trader("Alan", "Cambridge"); Trader brian = new Trader("Brian", "Cambridge"); transactions = Arrays.asList( new Transaction(brian, 2011, 300), new Transaction(raoul, 2012, 1000), new Transaction(raoul, 2011, 400), new Transaction(mario, 2012, 710), new Transaction(mario, 2012, 700), new Transaction(alan, 2012, 950) ); }
map()方法用于将流中的元素映射成一个新的元素。ide
@Test public void test17() { //获取每道菜的名称的长度 List<Integer> list = menu.stream() .map(Dish::getName) .map(String::length) .collect(Collectors.toList()); }
flatMap()方法会把一个流中的每一个值转换成另外一个流,而后把全部的流扁平化,链接起来造成一个新的流。
@Test public void test18() { List<String> words = Arrays.asList("hello", "world"); List<String> list = words.stream() .map(i -> i.split("")) .flatMap(Arrays::stream)//流扁平化,造成一个新的流 .distinct()//过滤重复的元素 .collect(Collectors.toList()); System.out.println(list);//result: [h, e, l, o, w, r, d] }
findFirst()用于返回流中的第一个元素,findAny() 返回流中任意一个元素。由于流多是空的,因此findFirst()和findAny()的返回类型都是Optional<T>, 当流没有元素时,就返回一个空的Optional。
对于findFirst()和findAny(),若是不关心返回的元素是哪一个,使用findAny()在并行流时限制更少。
@Test public void test19() { menu.stream() .filter(Dish::isVegetarian) .findAny() .ifPresent(i -> System.out.println(i.getName()));//会在Optional包含值的时候执行给定的代码块 }
你能够用 allMatch() 、noneMatch()和anyMatch()方法让流匹配给定的谓词Predicate<T>, 方法名就可见名知意, 分别对应 全部元素都要匹配、全部元素都不匹配、任意一个元素匹配。
经过reduce()方法能够对流进行归约操做。
所谓规约操做就是将流中全部元素反复结合起来, 最终获得一个值.
@Test public void test20() { Integer sum1 = nums.stream().reduce(0, Integer::sum); System.out.println(sum1); Optional<Integer> o1 = nums.stream().reduce(Integer::sum);//求和 System.out.println(o1.get()); Optional<Integer> o2 = nums.stream().reduce(Integer::max);//最大值 System.out.println(o2.get()); Integer count = menu.stream().map(d -> 1).reduce(0, Integer::sum);//计算流中元素的个数 menu.stream().count(); }
下面经过一段对交易员数据集合transactions进行处理的示例, 总结下经常使用的几种Stream API。
@Test public void test21() { //(1) 找出2011年发生的全部交易,并按交易额排序(从低到高)。 List<Transaction> list = transactions.stream().filter(i -> 2011 == i.getYear()).sorted(Comparator.comparing(Transaction::getValue)).collect(Collectors.toList()); //(2) 交易员都在哪些不一样的城市工做过? Set<String> cities = transactions.stream().map(Transaction::getTrader).map(Trader::getCity).collect(Collectors.toSet()); //(3) 查找全部来自于剑桥的交易员,并按姓名排序。 List<Trader> trades = transactions.stream().map(Transaction::getTrader).filter(i -> "Cambridge".equals(i.getCity())).distinct().sorted(Comparator.comparing(Trader::getName)).collect(Collectors.toList()); //(4) 返回全部交易员的姓名字符串,按字母顺序排序。 String names = transactions.stream().map(Transaction::getTrader).distinct().map(Trader::getName).sorted().reduce("", (a, b) -> a + b); //(5) 有没有交易员是在米兰工做的? boolean flag = transactions.stream().map(Transaction::getTrader).anyMatch(trader -> "Milan".equals(trader.getCity())); //(6) 打印生活在剑桥的交易员的全部交易的总额。 Integer sum = transactions.stream().filter(i -> "Cambridge".equals(i.getTrader().getCity())).map(Transaction::getValue).reduce(0, Integer::sum); //(7) 全部交易中,最高的交易额是多少? Integer max = transactions.stream().map(Transaction::getValue).reduce(0, Integer::max); //(8) 找到交易额最小的交易。 Optional<Transaction> first = transactions.stream().min(Comparator.comparingInt(Transaction::getValue)); System.out.println(first.get()); }
原始类型流特化: IntStream, LongStream, DoubleStream的简单使用以及和Stream流之间的相互转换。
@Test public void test22() { int calories = menu.stream().mapToInt(Dish::getCalories).sum(); //映射到数值流 mapToXxx IntStream intStream = menu.stream().mapToInt(Dish::getCalories); //转换回基本类型对应的对象流 Stream<Integer> stream = intStream.boxed(); //intStream.mapToObj(Integer::valueOf); //默认值OptionalInt List<Dish> list = new ArrayList<>(); OptionalInt optionalInt = list.stream().mapToInt(Dish::getCalories).max(); System.out.println(optionalInt.orElse(88)); //result: 88 // 数值范围 long count = IntStream.rangeClosed(1, 102).filter(i -> i % 3 == 0).count(); System.out.println(count);//result: 34 }
构建流的几种方式
由集合建立流, 根据数值范围建立数值流, 由值建立流, 由数组建立流, 由文件生成流, 由函数生成无限流。
@Test public void test24() { IntStream.rangeClosed(1, 100);//根据数值范围建立数值流 Stream<String> stream = Stream.of("java8", "盖聂", "少司命");//由值建立流 int sum = Arrays.stream(new int[]{1, 2, 3, 4}).sum();//由数组建立流 //由文件生成流 ===>下面示例Files.lines获得一个流,流中的每一个元素对应文件中的一行 try (Stream<String> lines = Files.lines(Paths.get("1.txt"), Charset.defaultCharset())) { long count = lines.flatMap(line -> Arrays.stream(line.split(" "))) .distinct() .count(); } catch (IOException ex) { } //由函数生成流: 建立无限流 Stream.iterate(0, n -> n + 1) .limit(10) .forEach(System.out::println); Stream.iterate(new int[]{0, 1}, arr -> new int[]{arr[1], arr[0] + arr[1]}) //建立一个斐波纳契元祖序列 .limit(10) .forEach(arr -> System.out.println("(" + arr[0] + ", " + arr[1] + ")")); Stream.generate(Math::random) .limit(5) .forEach(System.out::println); }
Collectors类中提供了一些静态工厂方法, 用于流的归约和汇总操做。
常见的有counting() 计算流中元素的个数,maxBy()和minBy() 取出流中某个属性值最大或最小的元素,joining() 将对流中每个对象应用 toString() 方法获得的全部字符串链接成一个字符串,reducing() 对流中的元素进行归约操做等等。
下面是简单的示例, 类中已经导入了Collectors类中的全部静态方法。
@Test public void test1() { Long count = menu.stream().collect(counting());//菜单里有多少种菜 Optional<Dish> optionalDish = menu.stream().collect(maxBy(comparingInt(Dish::getCalories)));//菜单里热量最高的菜 Integer totalCalories1 = menu.stream().collect(summingInt(Dish::getCalories));//菜单列表的总热量 Double averageCalories = menu.stream().collect(averagingInt(Dish::getCalories));//菜单列表的热量平均值 IntSummaryStatistics intSummaryStatistics = menu.stream().collect(summarizingInt(Dish::getCalories));//一次迭代,统计出菜单列表元素个数, 菜肴热量最大值、最小值、平均值、总和 System.out.println(intSummaryStatistics.toString()); //result: IntSummaryStatistics{count=9, sum=4200, min=120, average=466.666667, max=800} String names = menu.stream().map(Dish::getName).collect(joining(","));//链接字符串 Integer totalCalories2 = menu.stream().collect(reducing(0, Dish::getCalories, Integer::sum));//菜单列表的总热量 }
流的分组和分区操做 groupingBy(), partitioningBy()
所谓分组,就是将流中的元素按某个属性根据必定的规则分为不一样的小块。好比常见的考试评定班级学生成绩状况,分数<60 为不及格,60<=分数<80为良好,80<=分数为优秀,这个就是分组。
分区则比较特殊,它是根据一个谓词Predicate<T>做为分类函数,也就是分出来的只会有两种类型,对应的Map键就是布尔类型。
@Test public void test2() { //单级分组 Map<Type, List<Dish>> map1 = menu.stream().collect(groupingBy(Dish::getType)); //多级分组 result: {FISH={NORMAL=[salmon], DIET=[prawns]}, OTHER={NORMAL=[french fries, pizza], DIET=[rice, season fruit]}, MEAT={NORMAL=[chicken], FAT=[pork, beef]}} Map<Type, Map<CaloricLevel, List<Dish>>> map2 = menu.stream().collect(groupingBy(Dish::getType, groupingBy(dish -> { if (dish.getCalories() < 400) return DIET; else if (dish.getCalories() < 700) return NORMAL; else return FAT; }))); //菜单中每种类型的菜肴的数量 Map<Type, Long> map3 = menu.stream().collect(groupingBy(Dish::getType, counting()));//result: {FISH=2, OTHER=4, MEAT=3} //菜单中每种类型热量最高的菜肴 Map<Type, Optional<Dish>> map4 = menu.stream().collect(groupingBy(Dish::getType, maxBy(comparingInt(Dish::getCalories))));//result:{FISH=Optional[salmon], OTHER=Optional[pizza], MEAT=Optional[pork]} //上面分组操做后的Optional<Dish>是必定有值的,因此这个Optional包装没什么意义,能够经过collectingAndThen()方法把Dish直接提取出来 Map<Type, Dish> map5 = menu.stream().collect(groupingBy(Dish::getType, collectingAndThen(maxBy(comparingInt(Dish::getCalories)), Optional::get)));//result:{FISH=Optional[salmon], OTHER=Optional[pizza], MEAT=Optional[pork]} //根据菜肴类型分组,获取全部的菜肴名称 result: {MEAT=[chicken, beef, pork], OTHER=[season fruit, pizza, rice, french fries], FISH=[salmon, prawns]} LinkedHashMap<Type, Set<String>> map6 = menu.stream().collect(groupingBy(Dish::getType, LinkedHashMap::new, mapping(Dish::getName, toSet()))); //在上面的例子中, toSet()方法生成的收集器咱们是没法指定Set类型的, 可使用toCollection()工厂方法来指定集合类型, 好比LInkedHashSet LinkedHashMap<Type, LinkedHashSet<String>> menu7 = menu.stream().collect(groupingBy(Dish::getType, LinkedHashMap::new, mapping(Dish::getName, toCollection(LinkedHashSet::new)))); //按菜肴是否素食进行分区 result: {false=[chicken, salmon, prawns, beef, pork], true=[rice, french fries, pizza, season fruit]} Map<Boolean, HashSet<Dish>> map9 = menu.stream().collect(partitioningBy(Dish::isVegetarian, toCollection(HashSet::new))); //获取素食和非素食中热量最高的菜肴 result: {false=pork, true=pizza} Map<Boolean, Dish> map10 = menu.stream().collect(partitioningBy(Dish::isVegetarian, collectingAndThen(maxBy(comparingInt(Dish::getCalories)), Optional::get))); //将前20个天然数按质数和非质数分区 Map<Boolean, List<Integer>> map11 = IntStream.rangeClosed(2, 20).boxed().collect(partitioningBy(this::isPrime)); } private boolean isPrime(int candidate) { int sqrt = (int) Math.sqrt(candidate); return IntStream.rangeClosed(2, sqrt).noneMatch(i -> candidate % i == 0); }
自定义收集器的两种方式
IDENTITY_FINISH
特征(即对结果容器作最终类型转换的finisher()方法返回的是一个恒等函数)的收集器才能使用。@Test public void test3() { //粗糙的自定义收集器 List<Dish> list = menu.stream().collect(new ToListCollector<Dish>()); //对于IDENTITY_FINISH这种最终函数是恒等函数的收集操做,能够用Stream中的重载方法collect()实现一样的效果 HashSet<Object> hashset = menu.stream().collect(HashSet::new, HashSet::add, HashSet::addAll); } public class ToListCollector<T> implements Collector<T, List<T>, List<T>> { /** * 建立一个空的结果容器,供数据收集使用 */ @Override public Supplier<List<T>> supplier() { return ArrayList::new; } /** * 将元素添加到结果容器 */ @Override public BiConsumer<List<T>, T> accumulator() { return List::add; } /** * 此方法定义了在使用并行流时,从各个子流进行归约所得的结果容器要如何合并在一块儿 */ @Override public BinaryOperator<List<T>> combiner() { return (left, right) -> { left.addAll(right); return left; }; } /** * 对结果容器作最终类型转换 */ @Override public Function<List<T>, List<T>> finisher() { return Function.identity(); } /** * 定义收集器的一些行为特征,好比无序归约、并行归约、最终类型转换finisher()返回的函数是一个恒等函数 */ @Override public Set<Characteristics> characteristics() { return Collections.unmodifiableSet(EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.CONCURRENT)); } }
调用流的 sequential() 或 parallel() 方法能够指定流顺序/并行执行,其底层原理就是改变一个记录是否并行执行的标志的布尔变量的值来实现的。
并行流内部使用了默认的 ForkJoinPool 分支/合并框架,它默认的线程数就是当前机器的处理器数量,这个值是由 Runtime.getRuntime().availableProcessors() 获得的,能够经过下面的方式改变线程池的大小,但不建议,由于一旦线程数超过了处理器的数量,就可能会引起并发访问的共享资源竞争问题。
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "128");//全局设置
下面这段代码对原始迭代、并行流、顺序流的几种方式进行了测试,它们使用不一样的实现方式对 1~10000000 之间的天然数求和,你会看到,在某些场景下若是不恰当的使用了并行流,反而会大大下降性能,好比Stream类的iterate()方法生成的流使用并行反而会增长额外开销。
由于每次应用iterate()方法时都要依赖前一次应用的结果,所以没法有效的把流划分为多个小块来并行处理,这里把流标记成并行,实则给本来的顺序处理增长了额外的开销
@Test public void test1() { long sec1 = this.measureSumPerf(ParallelStream::iterativeSum, 1000_0000); System.out.println(sec1);//4毫秒 long sec2 = this.measureSumPerf(ParallelStream::sequentialSum, 1000_0000); System.out.println(sec2);//16毫秒 //每次应用iterate()方法时都要依赖前一次应用的结果,所以没法有效的把流划分为多个小块来并行处理,这里把流标记成并行,实则给本来的顺序处理增长了额外的开销 long sec3 = this.measureSumPerf(ParallelStream::parallelSum, 1000_0000); System.out.println(sec3);//241毫秒 } public long measureSumPerf(Function<Long, Long> adder, long n) { long fastest = Long.MAX_VALUE; for (int i = 0; i < 10; i++) { long start = System.nanoTime(); long sum = adder.apply(n); long duration = (System.nanoTime() - start) / 1_000_000; System.out.println("Result: " + sum); if (duration < fastest) fastest = duration; } return fastest; } public class ParallelStream { public static long sequentialSum(long n) { return LongStream.iterate(1, i -> i + 1) .limit(n) .sum(); // return LongStream.rangeClosed(1, n).reduce(0, Long::sum);//4毫秒 } public static long iterativeSum(long n) { long sum = 0; for (long i = 1; i < n + 1; i++) { sum += i; } return sum; } public static long parallelSum(long n) { return LongStream.iterate(1, i -> i + 1) .limit(n) .parallel() .sum(); // return LongStream.rangeClosed(1, n).parallel().reduce(0, Long::sum);//2毫秒 } }
同理, 相似limit和findFirst这种依赖于元素顺序的操做,在并行流上的性能通常会比顺序流差。
经过Stream类提供的peek()方法能够查看Stream流水线每一步中间操做的输出结果
@Test public void test9() { List<Integer> numbers = Arrays.asList(2, 3, 4, 5, 6, 7, 8, 9); List<Integer> result = numbers.stream() .peek(x -> System.out.println("from stream: " + x)) .map(x -> x + 17) .peek(x -> System.out.println("after map: " + x)) .filter(x -> x % 2 == 0) .peek(x -> System.out.println("after filter: " + x)) .limit(3) .peek(x -> System.out.println("after limit: " + x + "\n")) .collect(toList()); }
输出结果以下
from stream: 2 after map: 19 from stream: 3 after map: 20 after filter: 20 after limit: 20 from stream: 4 after map: 21 from stream: 5 after map: 22 after filter: 22 after limit: 22 from stream: 6 after map: 23 from stream: 7 after map: 24 after filter: 24 after limit: 24
做者:张小凡
出处:https://www.cnblogs.com/qingshanli/
本文版权归做者和博客园共有,欢迎转载,但未经做者赞成必须保留此段声明,且在文章页面明显位置给出原文链接,不然保留追究法律责任的权利。若是以为还有帮助的话,能够点一下右下角的【推荐】。
原文出处:https://www.cnblogs.com/qingshanli/p/11748380.html