Java8 新特性之流式数据处理

一. 流式处理简介

在我接触到java8流式处理的时候,个人第一感受是流式处理让集合操做变得简洁了许多,一般咱们须要多行代码才能完成的操做,借助于流式处理能够在一行中实现。好比咱们但愿对一个包含整数的集合中筛选出全部的偶数,并将其封装成为一个新的List返回,那么在java8以前,咱们须要经过以下代码实现:java

List<Integer> evens = new ArrayList<>();
for (final Integer num : nums) {
    if (num % 2 == 0) {
        evens.add(num);
    }
}

经过java8的流式处理,咱们能够将代码简化为:数据库

List<Integer> evens = nums.stream().filter(num -> num % 2 == 0).collect(Collectors.toList());

先简单解释一下上面这行语句的含义,stream()操做将集合转换成一个流,filter()执行咱们自定义的筛选处理,这里是经过lambda表达式筛选出全部偶数,最后咱们经过collect()对结果进行封装处理,并经过Collectors.toList()指定其封装成为一个List集合返回。数组

由上面的例子能够看出,java8的流式处理极大的简化了对于集合的操做,实际上不光是集合,包括数组、文件等,只要是能够转换成流,咱们均可以借助流式处理,相似于咱们写SQL语句同样对其进行操做。java8经过内部迭代来实现对流的处理,一个流式处理能够分为三个部分:转换成流、中间操做、终端操做。以下图:安全

以集合为例,一个流式处理的操做咱们首先须要调用stream()函数将其转换成流,而后再调用相应的中间操做达到咱们须要对集合进行的操做,好比筛选、转换等,最后经过终端操做对前面的结果进行封装,返回咱们须要的形式。多线程

二. 中间操做

咱们定义一个简单的学生实体类,用于后面的例子演示:app

public class Student {

    /** 学号 */
    private long id;

    private String name;

    private int age;

    /** 年级 */
    private int grade;

    /** 专业 */
    private String major;

    /** 学校 */
    private String school;

    // 省略getter和setter
}
// 初始化
List<Student> students = new ArrayList<Student>() {
    {
        add(new Student(20160001, "孔明", 20, 1, "土木工程", "武汉大学"));
        add(new Student(20160002, "伯约", 21, 2, "信息安全", "武汉大学"));
        add(new Student(20160003, "玄德", 22, 3, "经济管理", "武汉大学"));
        add(new Student(20160004, "云长", 21, 2, "信息安全", "武汉大学"));
        add(new Student(20161001, "翼德", 21, 2, "机械与自动化", "华中科技大学"));
        add(new Student(20161002, "元直", 23, 4, "土木工程", "华中科技大学"));
        add(new Student(20161003, "奉孝", 23, 4, "计算机科学", "华中科技大学"));
        add(new Student(20162001, "仲谋", 22, 3, "土木工程", "浙江大学"));
        add(new Student(20162002, "鲁肃", 23, 4, "计算机科学", "浙江大学"));
        add(new Student(20163001, "丁奉", 24, 5, "土木工程", "南京大学"));
    }
};

2.1 过滤

过滤,顾名思义就是按照给定的要求对集合进行筛选知足条件的元素,java8提供的筛选操做包括:filter、distinct、limit、skip。框架

filter
在前面的例子中咱们已经演示了如何使用filter,其定义为:Stream<T> filter(Predicate<? super T> predicate),filter接受一个谓词Predicate,咱们能够经过这个谓词定义筛选条件,在介绍lambda表达式时咱们介绍过Predicate是一个函数式接口,其包含一个test(T t)方法,该方法返回boolean。如今咱们但愿从集合students中筛选出全部武汉大学的学生,那么咱们能够经过filter来实现,并将筛选操做做为参数传递给filter:jvm

List<Student> whuStudents = students.stream()
                                    .filter(student -> "武汉大学".equals(student.getSchool()))
                         
           .collect(Collectors.toList());

distinct
distinct操做相似于咱们在写SQL语句时,添加的DISTINCT关键字,用于去重处理,distinct基于Object.equals(Object)实现,回到最开始的例子,假设咱们但愿筛选出全部不重复的偶数,那么能够添加distinct操做:ide

List<Integer> evens = nums.stream()
                        .filter(num -> num % 2 == 0).distinct()
                        .collect(Collectors.toList());

limit
limit操做也相似于SQL语句中的LIMIT关键字,不过相对功能较弱,limit返回包含前n个元素的流,当集合大小小于n时,则返回实际长度,好比下面的例子返回前两个专业为土木工程专业的学生:函数

List<Student> civilStudents = students.stream()
                                    .filter(student -> "土木工程".equals(student.getMajor())).limit(2)
                                    .collect(Collectors.toList());

说到limit,不得不说起一下另一个流操做:sorted。该操做用于对流中元素进行排序,sorted要求待比较的元素必须实现Comparable接口,若是没有实现也没关系,咱们能够将比较器做为参数传递给sorted(Comparator<? super T> comparator),好比咱们但愿筛选出专业为土木工程的学生,并按年龄从小到大排序,筛选出年龄最小的两个学生,那么能够实现为:

List<Student> sortedCivilStudents = students.stream()
                                            .filter(student -> "土木工程".equals(student.getMajor())).sorted((s1, s2) -> s1.getAge() - s2.getAge())
                                            .limit(2)
                                            .collect(Collectors.toList());

skip
skip操做与limit操做相反,如同其字面意思同样,是跳过前n个元素,好比咱们但愿找出排序在2以后的土木工程专业的学生,那么能够实现为:

List<Student> civilStudents = students.stream()
                                    .filter(student -> "土木工程".equals(student.getMajor()))
                                    .skip(2)
                                    .collect(Collectors.toList());

经过skip,就会跳过前面两个元素,返回由后面全部元素构造的流,若是n大于知足条件的集合的长度,则会返回一个空的集合。

2.2 映射

在SQL中,借助SELECT关键字后面添加须要的字段名称,能够仅输出咱们须要的字段数据,而流式处理的映射操做也是实现这一目的,在java8的流式处理中,主要包含两类映射操做:map和flatMap。

map
举例说明,假设咱们但愿筛选出全部专业为计算机科学的学生姓名,那么咱们能够在filter筛选的基础之上,经过map将学生实体映射成为学生姓名字符串,具体实现以下:

List<String> names = students.stream()
                            .filter(student -> "计算机科学".equals(student.getMajor()))
                            .map(Student::getName).collect(Collectors.toList());

除了上面这类基础的map,java8还提供了mapToDouble(ToDoubleFunction<? super T> mapper)mapToInt(ToIntFunction<? super T> mapper)mapToLong(ToLongFunction<? super T> mapper),这些映射分别返回对应类型的流,java8为这些流设定了一些特殊的操做,好比咱们但愿计算全部专业为计算机科学学生的年龄之和,那么咱们能够实现以下:

int totalAge = students.stream()
                    .filter(student -> "计算机科学".equals(student.getMajor()))
                    .mapToInt(Student::getAge).sum();

经过将Student按照年龄直接映射为IntStream,咱们能够直接调用提供的sum()方法来达到目的,此外使用这些数值流的好处还在于能够避免jvm装箱操做所带来的性能消耗。

flatMap
flatMap与map的区别在于 flatMap是将一个流中的每一个值都转成一个个流,而后再将这些流扁平化成为一个流 。举例说明,假设咱们有一个字符串数组String[] strs = {"java8", "is", "easy", "to", "use"};,咱们但愿输出构成这一数组的全部非重复字符,那么咱们可能首先会想到以下实现:

List<String[]> distinctStrs = Arrays.stream(strs)
                                .map(str -> str.split(""))  // 映射成为Stream<String[]>
                                .distinct()
                                .collect(Collectors.toList());

在执行map操做之后,咱们获得是一个包含多个字符串(构成一个字符串的字符数组)的流,此时执行distinct操做是基于在这些字符串数组之间的对比,因此达不到咱们但愿的目的,此时的输出为:

[j, a, v, a, 8]
[i, s]
[e, a, s, y]
[t, o]
[u, s, e]

distinct只有对于一个包含多个字符的流进行操做才能达到咱们的目的,即对Stream<String>进行操做。此时flatMap就能够达到咱们的目的:

List<String> distinctStrs = Arrays.stream(strs)
                                .map(str -> str.split(""))  // 映射成为Stream<String[]>
                                .flatMap(Arrays::stream)  // 扁平化为Stream<String>
                                .distinct()
                                .collect(Collectors.toList());

flatMap将由map映射获得的Stream<String[]>,转换成由各个字符串数组映射成的流Stream<String>,再将这些小的流扁平化成为一个由全部字符串构成的大流Steam<String>,从而可以达到咱们的目的。
与map相似,flatMap也提供了针对特定类型的映射操做:flatMapToDouble(Function<? super T,? extends DoubleStream> mapper)flatMapToInt(Function<? super T,? extends IntStream> mapper)flatMapToLong(Function<? super T,? extends LongStream> mapper)

三. 终端操做

终端操做是流式处理的最后一步,咱们能够在终端操做中实现对流查找、归约等操做。

3.1 查找

allMatch
allMatch用于检测是否所有都知足指定的参数行为,若是所有知足则返回true,例如咱们但愿检测是否全部的学生都已满18周岁,那么能够实现为:

boolean isAdult = students.stream().allMatch(student -> student.getAge() >= 18);

anyMatch
anyMatch则是检测是否存在一个或多个知足指定的参数行为,若是知足则返回true,例如咱们但愿检测是否有来自武汉大学的学生,那么能够实现为:

boolean hasWhu = students.stream().anyMatch(student -> "武汉大学".equals(student.getSchool()));

noneMathch
noneMatch用于检测是否不存在知足指定行为的元素,若是不存在则返回true,例如咱们但愿检测是否不存在专业为计算机科学的学生,能够实现以下:

boolean noneCs = students.stream().noneMatch(student -> "计算机科学".equals(student.getMajor()));

findFirst
findFirst用于返回知足条件的第一个元素,好比咱们但愿选出专业为土木工程的排在第一个学生,那么能够实现以下:

Optional<Student> optStu = students.stream().filter(student -> "土木工程".equals(student.getMajor())).findFirst();

findAny
findAny相对于findFirst的区别在于,findAny不必定返回第一个,而是返回任意一个,好比咱们但愿返回任意一个专业为土木工程的学生,能够实现以下:

Optional<Student> optStu = students.stream().filter(student -> "土木工程".equals(student.getMajor())).findAny();

实际上对于顺序流式处理而言,findFirst和findAny返回的结果是同样的,至于为何会这样设计,是由于在下一篇咱们介绍的并行流式处理,当咱们启用并行流式处理的时候,查找第一个元素每每会有不少限制,若是不是特别需求,在并行流式处理中使用findAny的性能要比findFirst好。

3.2 归约

前面的例子中咱们大部分都是经过collect(Collectors.toList())对数据封装返回,如个人目标不是返回一个新的集合,而是但愿对通过参数化操做后的集合进行进一步的运算,那么咱们可用对集合实施归约操做。java8的流式处理提供了reduce方法来达到这一目的。

前面咱们经过mapToInt将Stream<Student>映射成为IntStream,并经过IntStream的sum方法求得全部学生的年龄之和,实际上咱们经过归约操做,也能够达到这一目的,实现以下:

// 前面例子中的方法
int totalAge = students.stream()
                .filter(student -> "计算机科学".equals(student.getMajor()))
                .mapToInt(Student::getAge).sum();
// 归约操做
int totalAge = students.stream()
                .filter(student -> "计算机科学".equals(student.getMajor()))
                .map(Student::getAge)
                .reduce(0, (a, b) -> a + b);

// 进一步简化
int totalAge2 = students.stream()
                .filter(student -> "计算机科学".equals(student.getMajor()))
                .map(Student::getAge)
                .reduce(0, Integer::sum);

// 采用无初始值的重载版本,须要注意返回Optional
Optional<Integer> totalAge = students.stream()
                .filter(student -> "计算机科学".equals(student.getMajor()))
                .map(Student::getAge)
                .reduce(Integer::sum);  // 去掉初始值

3.3 收集

前面利用collect(Collectors.toList())是一个简单的收集操做,是对处理结果的封装,对应的还有toSettoMap,以知足咱们对于结果组织的需求。这些方法均来自于java.util.stream.Collectors,咱们能够称之为收集器。

3.3.1 归约

收集器也提供了相应的归约操做,可是与reduce在内部实现上是有区别的,收集器更加适用于可变容器上的归约操做,这些收集器广义上均基于Collectors.reducing()实现。

例1:求学生的总人数

long count = students.stream().collect(Collectors.counting());

// 进一步简化
long count = students.stream().count();

例2:求年龄的最大值和最小值

// 求最大年龄
Optional<Student> olderStudent = students.stream().collect(Collectors.maxBy((s1, s2) -> s1.getAge() - s2.getAge()));

// 进一步简化
Optional<Student> olderStudent2 = students.stream().collect(Collectors.maxBy(Comparator.comparing(Student::getAge)));

// 求最小年龄
Optional<Student> olderStudent3 = students.stream().collect(Collectors.minBy(Comparator.comparing(Student::getAge)));

例3:求年龄总和

int totalAge4 = students.stream().collect(Collectors.summingInt(Student::getAge));

对应的还有summingLongsummingDouble

例4:求年龄的平均值

double avgAge = students.stream().collect(Collectors.averagingInt(Student::getAge));

对应的还有averagingLongaveragingDouble

例5:一次性获得元素个数、总和、均值、最大值、最小值

IntSummaryStatistics statistics = students.stream().collect(Collectors.summarizingInt(Student::getAge));

输出:

IntSummaryStatistics{count=10, sum=220, min=20, average=22.000000, max=24}

对应的还有summarizingLongsummarizingDouble

例6:字符串拼接

String names = students.stream().map(Student::getName).collect(Collectors.joining());
// 输出:孔明伯约玄德云长翼德元直奉孝仲谋鲁肃丁奉
String names = students.stream().map(Student::getName).collect(Collectors.joining(", "));
// 输出:孔明, 伯约, 玄德, 云长, 翼德, 元直, 奉孝, 仲谋, 鲁肃, 丁奉

3.3.2 分组

在数据库操做中,咱们能够经过GROUP BY关键字对查询到的数据进行分组,java8的流式处理也为咱们提供了这样的功能Collectors.groupingBy来操做集合。好比咱们能够按学校对上面的学生进行分组:

Map<String, List<Student>> groups = students.stream().collect(Collectors.groupingBy(Student::getSchool));

groupingBy接收一个分类器Function<? super T, ? extends K> classifier,咱们能够自定义分类器来实现须要的分类效果。

上面演示的是一级分组,咱们还能够定义多个分类器实现 多级分组,好比咱们但愿在按学校分组的基础之上再按照专业进行分组,实现以下:

Map<String, Map<String, List<Student>>> groups2 = students.stream().collect(
                Collectors.groupingBy(Student::getSchool,  // 一级分组,按学校
                Collectors.groupingBy(Student::getMajor)));  // 二级分组,按专业

实际上在groupingBy的第二个参数不是只能传递groupingBy,还能够传递任意Collector类型,好比咱们能够传递一个Collector.counting,用以统计每一个组的个数:

Map<String, Long> groups = students.stream().collect(Collectors.groupingBy(Student::getSchool, Collectors.counting()));

若是咱们不添加第二个参数,则编译器会默认帮咱们添加一个Collectors.toList()

3.3.3 分区

分区能够看作是分组的一种特殊状况,在分区中key只有两种状况:true或false,目的是将待分区集合按照条件一分为二,java8的流式处理利用ollectors.partitioningBy()方法实现分区,该方法接收一个谓词,例如咱们但愿将学生分为武大学生和非武大学生,那么能够实现以下:

Map<Boolean, List<Student>> partition = students.stream().collect(Collectors.partitioningBy(student -> "武汉大学".equals(student.getSchool())));

分区相对分组的优点在于,咱们能够同时获得两类结果,在一些应用场景下能够一步获得咱们须要的全部结果,好比将数组分为奇数和偶数。

以上介绍的全部收集器均实现自接口java.util.stream.Collector,该接口的定义以下:

public interface Collector<T, A, R> {
    /**
     * A function that creates and returns a new mutable result container.
     *
     * @return a function which returns a new, mutable result container
     */
    Supplier<A> supplier();

    /**
     * A function that folds a value into a mutable result container.
     *
     * @return a function which folds a value into a mutable result container
     */
    BiConsumer<A, T> accumulator();

    /**
     * A function that accepts two partial results and merges them.  The
     * combiner function may fold state from one argument into the other and
     * return that, or may return a new result container.
     *
     * @return a function which combines two partial results into a combined
     * result
     */
    BinaryOperator<A> combiner();

    /**
     * Perform the final transformation from the intermediate accumulation type
     * {@code A} to the final result type {@code R}.
     *
     * <p>If the characteristic {@code IDENTITY_TRANSFORM} is
     * set, this function may be presumed to be an identity transform with an
     * unchecked cast from {@code A} to {@code R}.
     *
     * @return a function which transforms the intermediate result to the final
     * result
     */
    Function<A, R> finisher();

    /**
     * Returns a {@code Set} of {@code Collector.Characteristics} indicating
     * the characteristics of this Collector.  This set should be immutable.
     *
     * @return an immutable set of collector characteristics
     */
    Set<Characteristics> characteristics();

}

咱们也能够实现该接口来定义本身的收集器,此处再也不展开。

四. 并行流式数据处理

流式处理中的不少都适合采用 分而治之 的思想,从而在处理集合较大时,极大的提升代码的性能,java8的设计者也看到了这一点,因此提供了 并行流式处理。上面的例子中咱们都是调用stream()方法来启动流式处理,java8还提供了parallelStream()来启动并行流式处理,parallelStream()本质上基于java7的Fork-Join框架实现,其默认的线程数为宿主机的内核数。

启动并行流式处理虽然简单,只须要将stream()替换成parallelStream()便可,但既然是并行,就会涉及到多线程安全问题,因此在启用以前要先确认并行是否值得(并行的效率不必定高于顺序执行),另外就是要保证线程安全。此两项没法保证,那么并行毫无心义,毕竟结果比速度更加剧要,之后有时间再来详细分析一下并行流式数据处理的具体实现和最佳实践。

相关文章
相关标签/搜索