Java8函数式编程(二):类比Spark RDD算子的Stream流操做

1 Stream流

对集合进行迭代时,可调用其iterator方法,返回一个iterator对象,以后即可以经过该iterator对象遍历集合中的元素,这被称为外部迭代(for循环自己正是封装了其的语法糖),其示意图以下:java

Java8函数式编程(二):类比Spark RDD算子的Stream流操做

除此以外,还有内部迭代方法,这正是这里要说明的集合的stream()方法返回的Stream对象的一系列操做,好比,要统计一个数字列表的偶数元素个数,当使用Stream对象的操做时,以下:编程

List<Integer> list = new ArrayList<Integer>(){{
    add(1);
    add(2);
    add(3);
}};

long count = list.stream().filter(num -> num % 2 == 0).count();
System.out.println(count);  // 1

其示意图以下:分布式

Java8函数式编程(二):类比Spark RDD算子的Stream流操做

上面提供的例子,好比filter,其参数为一个lambda表达式,因此Stream实际上是用函数式编程方式在集合类上进行复杂操做的工具。ide

2 Stream流操做与Spark RDD算子

其实有Spark经验的人开始使用Stream流操做时,会有似曾相识的感受,好像一切都那么熟悉。函数式编程

参考Spark RDD算子介绍的文章:《Spark RDD算子实战》https://blog.51cto.com/xpleaf/2108481函数

下面从操做对象(名词)和对象操做(动词)两个角度来简单对比一下。工具

2.1 操做对象

Spark RDD算子的操做对象是RDD,中文意思是弹性分布式数据集,对用户而言,它就是相似集合同样的对象,里面存的是数据,只是底层它的数据可能分布于各个节点的各个partition,但无论怎样,其本质仍是数据集。性能

Stream流操做的操做对象是集合,集合本质也是一种数据集,只是相比RDD,它是单机的。翻译

2.2 对象操做

Spark RDD算子有两种类型,分别是Transformation算子和Action算子,前者是延迟计算的,它仅仅记住了数据的逻辑操做,并无真正执行,后者是真正触发Transformation算子的计算。code

Stream流操做也有两种类型,分别是惰性求值和及早求值(我的以为这翻译很差),前者也只是记录了惰性求值的逻辑操做,后者才是真正触发操做。

能够看到其二者是很是类似的,一个是对分布式数据进行的各类操做,一个是单机数据进行的各类操做,把计算分为延迟计算和触发计算两种,好处是显而易见的:当对数据集进行屡次逻辑操做时,有可能迭代只须要一次就可能完成,这样真正触发计算时,一次迭代带来的性能提高是显著的,好比对于过滤和计算这两个操做(前面计算偶数的操做),在一次迭代中就可以完成。

固然,不只类型类似,其自己提供的操做的名称而言,都是类似的,有些东西真的是通用的。

3 经常使用Stream流操做

每一个操做都用一个通俗易懂的例子来进行说明。

3.1 及早求值操做

3.1.1 collect(toList())

其做用是将Stream流中的元素收集起来,造成List、Set或Map等。

List<Integer> list = Stream.of(1, 2, 3).collect(Collectors.toList());

System.out.println(list);   // [1, 2, 3]

1.Stream.of()方法用于方便地生成Stream流;

2.Collectors还有toSet()、toMap()等方法,详见其API。

3.1.2 forEach(Consumer)

对集合中的每一个元素进行操做,其参数是Consumer<T>函数接口。

Consumer<Integer> printNum = System.out::print;
Stream.of(1, 2, 3).forEach(printNum);   // 123

System.out::print表示使用System.out类中的print方法,至关于lambda表达式:element -> System.out.print(element);

上面的例子也能够一步到位:

Stream.of(1, 2, 3).forEach(System.out::print);  // 123

3.1.3 max和min

其参数为Comparator对象,返回一个Optional对象,Optional说明其结果可能有,也可能没有(好比对空值的Stream流操做时)。

// 计算数值流中的最大值
Optional<Integer> maxOptional = Stream.of(1, 2, 3).max(Comparator.comparing(num -> num));
System.out.println(maxOptional.get());  // 3

// 找出字符串流中长度最小的字符串
Optional<String> minOptional = Stream.of("a", "ab", "abc").min(Comparator.comparing(String::length));
System.out.println(minOptional.get());  // a

另外,其确实是及早求值操做,能够验证一下:

Stream.of(1, 2, 3).max(Comparator.comparing(num -> {
    System.out.println(num);
    return num;
}));

输出:

1
2
2
3

3.2 惰性求值操做

3.2.1 map

其参数为Function&lt;T,R&gt;,用于将Stream流中的值转换为另一种流。

// 将字母转换为大写
Stream.of("a", "b", "hello")
    .map(String::toUpperCase)
    .forEach(element -> System.out.print(element + " "));  // A B HELLO

3.2.2 filter

其参数为Predicate&lt;T&gt;,过滤Stream流中的元素。

// 找出偶数
List<Integer> list = Stream.of(1, 2, 3).filter(num -> num % 2 == 0).collect(Collectors.toList());

System.out.println(list);   // [2]

3.2.3 flatMap

其参数为Function&lt;T,R&gt;,只是此时R限定为Stream,将Stream流中的值转换为更多的流。

// 找出字符串中的单词
List<String> list = Stream.of("hello you", "hello me")
    .flatMap(line -> Arrays.stream(line.split(" "))).collect(Collectors.toList());

System.out.println(list);   // [hello, you, hello, me]

是否是感受跟Spark的wordcount例子有点像。

3.2.4 reduce

其参数为BinaryOperator&lt;T&gt;,返回一个Optional对象,Optional说明其结果可能有,也可能没有(好比对空值的Stream流操做时,而且没有指定初始值),用于归约操做。

// 求和
Integer res = Stream.of(1, 2, 3).reduce((acc, element) -> acc + element).get();

// 指定初始值6后,Stream的reduce操做结果确定有值的,所以其返回的不是Optional,而直接是6所属的类型,即Integer
Integer res2 = Stream.of(1, 2, 3).reduce(6, (acc, element) -> acc + element);

System.out.println(String.format("res: %s, res2: %s", res, res2));  // res: 6, res2: 12

4 参考

Java 8 Lambdas,Richard Warburton著(O’Reilly,2014)》

相关文章
相关标签/搜索