本系列文章经补充和完善,已修订整理成书《Java编程的逻辑》(马俊昌著),由机械工业出版社华章分社出版,于2018年1月上市热销,读者好评如潮!各大网店和书店有售,欢迎购买:京东自营连接 html
![]()
上节咱们介绍了Lambda表达式和函数式接口,本节探讨它们的应用,函数式数据处理,针对常见的集合数据处理,Java 8引入了一套新的类库,位于包java.util.stream下,称之为Stream API,这套API操做数据的思路,不一样于咱们在38节到55节介绍的容器类API,它们是函数式的,很是简洁、灵活、易读,具体有什么不一样呢?因为内容较多,咱们分为两节来介绍,本节先介绍一些基本的API,下节讨论一些高级功能。java
接口Stream相似于一个迭代器,但提供了更为丰富的操做,Stream API的主要操做就定义在该接口中。 Java 8给Collection接口增长了两个默认方法,它们能够返回一个Stream,以下所示:nginx
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}
default Stream<E> parallelStream() {
return StreamSupport.stream(spliterator(), true);
}
复制代码
stream()返回的是一个顺序流,parallelStream()返回的是一个并行流。顺序流就是由一个线程执行操做。而并行流背后可能有多个线程并行执行,与以前介绍的并发技术不一样,使用并行流不须要显式管理线程,使用方法与顺序流是同样的。git
下面,咱们主要针对顺序流,学习Stream接口,包括其用法和基本原理,随后咱们再介绍下并行流。先来看一些简单的示例。程序员
上节演示时使用了学生类Student和学生列表List<Student> lists
,本节继续使用它们。github
返回学生列表中90分以上的,传统上的代码通常是这样的:数据库
List<Student> above90List = new ArrayList<>();
for (Student t : students) {
if (t.getScore() > 90) {
above90List.add(t);
}
}
复制代码
使用Stream API,代码能够这样:编程
List<Student> above90List = students.stream()
.filter(t->t.getScore()>90)
.collect(Collectors.toList());
复制代码
先经过stream()获得一个Stream对象,而后调用Stream上的方法,filter()过滤获得90分以上的,它的返回值依然是一个Stream,为了转换为List,调用了collect方法并传递了一个Collectors.toList(),表示将结果收集到一个List中。swift
代码更为简洁易读了,这种数据处理方式被称为函数式数据处理,与传统代码相比,它的特色是:数组
根据学生列表返回名称列表,传统上的代码通常是这样:
List<String> nameList = new ArrayList<>(students.size());
for (Student t : students) {
nameList.add(t.getName());
}
复制代码
使用Stream API,代码能够这样:
List<String> nameList = students.stream()
.map(Student::getName)
.collect(Collectors.toList());
复制代码
这里使用了Stream的map函数,它的参数是一个Function函数式接口,这里传递了方法引用。
返回90分以上的学生名称列表,传统上的代码通常是这样:
List<String> nameList = new ArrayList<>();
for (Student t : students) {
if (t.getScore() > 90) {
nameList.add(t.getName());
}
}
复制代码
使用函数式数据处理的思路,能够将这个问题分解为由两个基本函数实现:
使用Stream API,能够将基本函数filter()和map()结合起来,代码能够这样:
List<String> above90Names = students.stream()
.filter(t->t.getScore()>90)
.map(Student::getName)
.collect(Collectors.toList());
复制代码
这种组合利用基本函数、声明式实现集合数据处理功能的编程风格,就是函数式数据处理。
代码更为直观易读了,但你可能会担忧它的性能有问题。filter()和map()都须要对流中的每一个元素操做一次,一块儿使用会不会就须要遍历两次呢?答案是否认的,只须要一次。实际上,调用filter()和map()都不会执行任何实际的操做,它们只是在构建操做的流水线,调用collect才会触发实际的遍历执行,在一次遍历中完成过滤、转换以及收集结果的任务。
像filter和map这种不实际触发执行、用于构建流水线、返回Stream的操做被称为中间操做(intermediate operation),而像collect这种触发实际执行、返回具体结果的操做被称为终端操做(terminal operation)。Stream API中还有更多的中间和终端操做,下面咱们具体来看下。
除了filter和map,Stream API的中间操做还有distinct, sorted, skip, limit, peek, mapToLong, mapToInt, mapToDouble, flatMap等,咱们逐个来看下。
distinct返回一个新的Stream,过滤重复的元素,只留下惟一的元素,是否重复是根据equals方法来比较的,distinct能够与其余函数如filter, map结合使用。
好比,返回字符串列表中长度小于3的字符串、转换为小写、只保留惟一的,代码能够为:
List<String> list = Arrays.asList(new String[]{"abc","def","hello","Abc"});
List<String> retList = list.stream()
.filter(s->s.length()<=3)
.map(String::toLowerCase)
.distinct()
.collect(Collectors.toList());
System.out.println(retList);
复制代码
输出为:
[abc, def]
复制代码
虽然都是中间操做,但distinct与filter和map是不一样的,filter和map都是无状态的,对于流中的每个元素,它的处理都是独立的,处理后即交给流水线中的下一个操做,但distinct不一样,它是有状态的,在处理过程当中,它须要在内部记录以前出现过的元素,若是已经出现过,即重复元素,它就会过滤掉,不传递给流水线中的下一个操做。
对于顺序流,内部实现时,distinct操做会使用HashSet记录出现过的元素,若是流是有顺序的,须要保留顺序,会使用LinkedHashSet。
有两个sorted方法:
Stream<T> sorted() Stream<T> sorted(Comparator<? super T> comparator) 复制代码
它们都对流中的元素排序,都返回一个排序后的Stream,第一个方法假定元素实现了Comparable接口,第二个方法接受一个自定义的Comparator。
好比,过滤获得90分以上的学生,而后按分数从高到低排序,分数同样的,按名称排序,代码能够为:
List<Student> list = students.stream()
.filter(t->t.getScore()>90)
.sorted(Comparator.comparing(Student::getScore)
.reversed()
.thenComparing(Student::getName))
.collect(Collectors.toList());
复制代码
这里,使用了Comparator的comparing, reversed和thenComparing构建了Comparator。
与distinct同样,sorted也是一个有状态的中间操做,在处理过程当中,须要在内部记录出现过的元素,与distinct不一样的是,每碰到流中的一个元素,distinct都能当即作出处理,要么过滤,要么立刻传递给下一个操做,但sorted不能,它须要先排序,为了排序,它须要先在内部数组中保存碰到的每个元素,到流结尾时,再对数组排序,而后再将排序后的元素逐个传递给流水线中的下一个操做。
它们的定义为:
Stream<T> skip(long n) Stream<T> limit(long maxSize) 复制代码
skip跳过流中的n个元素,若是流中元素不足n个,返回一个空流,limit限制流的长度为maxSize。
好比,将学生列表按照分数排序,返回第3名到第5名,代码能够为:
List<Student> list = students.stream()
.sorted(Comparator.comparing(
Student::getScore).reversed())
.skip(2)
.limit(3)
.collect(Collectors.toList());
复制代码
skip和limit都是有状态的中间操做。对前n个元素,skip的操做就是过滤,对后面的元素,skip就是传递给流水线中的下一个操做。limit的一个特色是,它不须要处理流中的全部元素,只要处理的元素个数达到maxSize,后面的元素就不须要处理了,这种能够提早结束的操做被称为短路操做。
peek的定义为:
Stream<T> peek(Consumer<? super T> action) 复制代码
它返回的流与以前的流是同样的,没有变化,但它提供了一个Consumer,会将流中的每个元素传给该Consumer。这个方法的主要目的是支持调试,可使用该方法观察在流水线中流转的元素,好比:
List<String> above90Names = students.stream()
.filter(t->t.getScore()>90)
.peek(System.out::println)
.map(Student::getName)
.collect(Collectors.toList());
复制代码
map函数接受的参数是一个Function<T, R>
,为避免装箱/拆箱,提升性能,Stream还有以下返回基本类型特定流的方法:
DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) IntStream mapToInt(ToIntFunction<? super T> mapper) LongStream mapToLong(ToLongFunction<? super T> mapper) 复制代码
DoubleStream/IntStream/LongStream是基本类型特定的流,有一些专门的更为高效的方法。好比,求学生列表的分数总和,代码能够为:
double sum = students.stream()
.mapToDouble(Student::getScore)
.sum();
复制代码
flatMap的定义为:
<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper) 复制代码
它接受一个函数mapper,对流中的每个元素,mapper会将该元素转换为一个流Stream,而后把新生成流的每个元素传递给下一个操做。好比:
List<String> lines = Arrays.asList(new String[]{
"hello abc",
"老马 编程"
});
List<String> words = lines.stream()
.flatMap(line -> Arrays.stream(line.split("\\s+")))
.collect(Collectors.toList());
System.out.println(words);
复制代码
这里的mapper将一行字符串按空白符分隔为了一个单词流,Arrays.stream能够将一个数组转换为一个流,输出为:
[hello, abc, 老马, 编程]
复制代码
能够看出,实际上,flatMap完成了一个1到n的映射。
针对基本类型,flatMap还有以下相似方法:
DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper) IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper) LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper) 复制代码
中间操做不触发实际的执行,返回值是Stream,而终端操做触发执行,返回一个具体的值,除了collect,Stream API的终端操做还有max, min, count, allMatch, anyMatch, noneMatch, findFirst, findAny, forEach, toArray, reduce等,咱们逐个来看下。
max/min的定义为:
Optional<T> max(Comparator<? super T> comparator) Optional<T> min(Comparator<? super T> comparator) 复制代码
它们返回流中的最大值/最小值,值的注意的是,它的返回值类型是Optional<T>
,而不是T。
java.util.Optional是Java 8引入的一个新类,它是一个泛型容器类,内部只有一个类型为T的单一变量value,可能为null,也可能不为null。Optional有什么用呢?它用于准确地传递程序的语义,它清楚地代表,其表明的值可能为null,程序员应该进行适当的处理。
Optional定义了一些方法,好比:
// value不为null时返回true
public boolean isPresent() // 返回实际的值,若是为null,抛出异常NoSuchElementException public T get() // 若是value不为null,返回value,不然返回other public T orElse(T other) // 构建一个空的Optional,value为null public static<T> Optional<T> empty() // 构建一个非空的Optional, 参数value不能为null public static <T> Optional<T> of(T value) // 构建一个Optional,参数value能够为null,也能够不为null public static <T> Optional<T> ofNullable(T value) 复制代码
在max/min的例子中,经过声明返回值为Optional,咱们就知道,具体的返回值不必定存在,这发生在流中不含任何元素的状况下。
看个简单的例子,返回分数最高的学生,代码能够为:
Student student = students.stream()
.max(Comparator.comparing(Student::getScore).reversed())
.get();
复制代码
这里,假定students不为空。
count很简单,就是返回流中元素的个数。好比,统计大于90分的学生个数,代码能够为:
long above90Count = students.stream()
.filter(t->t.getScore()>90)
.count();
复制代码
这几个函数都接受一个谓词Predicate,返回一个boolean值,用于断定流中的元素是否知足必定的条件,它们的区别是:
若是流为空,这几个函数的返回值都是true。
好比,判断是否是全部学生都及格了(不小于60分),代码能够为:
boolean allPass = students.stream()
.allMatch(t->t.getScore()>=60);
复制代码
这几个操做都是短路操做,都不必定须要处理全部元素就能得出结果,好比,对于allMatch,只要有一个元素不知足条件,就能返回false。
它们的定义为:
Optional<T> findFirst() Optional<T> findAny() 复制代码
它们的返回类型都是Optional,若是流为空,返回Optional.empty()。findFirst返回第一个元素,而findAny返回任一元素,它们都是短路操做。
随便找一个不及格的学生,代码能够为:
Optional<Student> student = students.stream()
.filter(t->t.getScore()<60)
.findAny();
if(student.isPresent()){
// 不及格的学生....
}
复制代码
有两个foreach方法:
void forEach(Consumer<? super T> action) void forEachOrdered(Consumer<? super T> action) 复制代码
它们都接受一个Consumer,对流中的每个元素,传递元素给Consumer,区别在于,在并行流中,forEach不保证处理的顺序,而forEachOrdered会保证按照流中元素的出现顺序进行处理。
好比,逐行打印大于90分的学生,代码能够为:
students.stream()
.filter(t->t.getScore()>90)
.forEach(System.out::println);
复制代码
toArray将流转换为数组,有两个方法:
Object[] toArray()
<A> A[] toArray(IntFunction<A[]> generator)
复制代码
不带参数的toArray返回的数组类型为Object[],这常常不是指望的结果,若是但愿获得正确类型的数组,须要传递一个类型为IntFunction的generator,IntFunction的定义为:
public interface IntFunction<R> {
R apply(int value);
}
复制代码
generator接受的参数是流的元素个数,它应该返回对应大小的正确类型的数组。
好比,获取90分以上的学生数组,代码能够为:
Student[] above90Arr = students.stream()
.filter(t->t.getScore()>90)
.toArray(Student[]::new);
复制代码
Student[]::new
就是一个类型为IntFunction<Student[]>
的generator。
reduce表明归约或者叫折叠,它是max/min/count的更为通用的函数,将流中的元素归约为一个值,有三个reduce函数:
Optional<T> reduce(BinaryOperator<T> accumulator);
T reduce(T identity, BinaryOperator<T> accumulator);
<U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner);
复制代码
第一个基本等同于调用:
boolean foundAny = false;
T result = null;
for (T element : this stream) {
if (!foundAny) {
foundAny = true;
result = element;
}
else
result = accumulator.apply(result, element);
}
return foundAny ? Optional.of(result) : Optional.empty();
复制代码
好比,使用reduce求分数最高的学生,代码能够为:
Student topStudent = students.stream().reduce((accu, t) -> {
if (accu.getScore() >= t.getScore()) {
return accu;
} else {
return t;
}
}).get();
复制代码
第二个reduce函数多了一个identity参数,表示初始值,它基本等同于调用:
T result = identity;
for (T element : this stream)
result = accumulator.apply(result, element)
return result;
复制代码
第一个和第二个reduce的返回类型只能是流中元素的类型,而第三个更为通用,它的归约类型能够自定义,另外,它多了一个combiner参数,combiner用在并行流中,用于合并子线程的结果,对于顺序流,它基本等同于调用:
U result = identity;
for (T element : this stream)
result = accumulator.apply(result, element)
return result;
复制代码
注意与第二个reduce函数相区分,它的结果类型不是T,而是U。好比,使用reduce函数计算学生分数的和,代码能够为:
double sumScore = students.stream().reduce(0d,
(sum, t) -> sum += t.getScore(),
(sum1, sum2) -> sum1 += sum2
);
复制代码
以上,能够看出,reduce虽然更为通用,但比较费解,难以使用,通常状况,应该优先使用其余函数。
collect函数比reduce更为通用、强大和易用,关于它,咱们下节再详细介绍。
前面咱们提到,能够经过Collection接口的stream/parallelStream获取流,还有一些其余的方式能够获取流。
Arrays有一些stream方法,能够将数组或子数组转换为流,好比:
public static IntStream stream(int[] array) public static DoubleStream stream(double[] array, int startInclusive, int endExclusive) public static <T> Stream<T> stream(T[] array) 复制代码
好比,输出当前目录下全部普通文件的名字,代码能够为:
File[] files = new File(".").listFiles();
Arrays.stream(files)
.filter(File::isFile)
.map(File::getName)
.forEach(System.out::println);
复制代码
Stream也有一些静态方法,能够构建流:
//返回一个空流
public static<T> Stream<T> empty() //返回只包含一个元素t的流 public static<T> Stream<T> of(T t) //返回包含多个元素values的流 public static<T> Stream<T> of(T... values) //经过Supplier生成流,流的元素个数是无限的 public static<T> Stream<T> generate(Supplier<T> s) //一样生成无限流,第一个元素为seed,第二个为f(seed),第三个为f(f(seed)),依次类推 public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f) 复制代码
好比,输出10个随机数,代码能够为:
Stream.generate(()->Math.random())
.limit(10)
.forEach(System.out::println);
复制代码
输出100个递增的奇数,代码能够为:
Stream.iterate(1, t->t+2)
.limit(100)
.forEach(System.out::println);
复制代码
前面咱们主要使用的是Collection的stream()方法,换作parallelStream()方法,就会使用并行流,接口方法都是通用的。但并行流内部会使用多线程,线程个数通常与系统的CPU核数同样,以充分利用CPU的计算能力。
进一步来讲,并行流内部会使用Java 7引入的fork/join框架,简单来讲,处理由fork和join两个阶段组成,fork就是将要处理的数据拆分为小块,多线程按小块进行并行计算,join就是将小块的计算结果进行合并,具体咱们就不探讨了。使用并行流,不须要任何线程管理的代码,就能实现并行。
看的出来,使用Stream API处理数据集合,与直接使用容器类API处理数据的思路是彻底不同的。
流定义了不少数据处理的基本函数,对于一个具体的数据处理问题,解决的主要思路就是组合利用这些基本函数,实现指望的功能,这种思路就是函数式数据处理思惟,相比直接利用容器类API的命令式思惟,思考的层次更高。
Stream API的这种思路也不是新发明,它与数据库查询语言SQL是很像的,都是声明式地操做集合数据,不少函数都能在SQL中找到对应,好比filter对应SQL的where,sorted对应order by等。SQL通常都支持分组(group by)功能,Stream API也支持,但关于分组,咱们下节再介绍。
Stream API也与各类基于Unix系统的管道命令相似,熟悉Unix系统的都知道,Unix有不少命令,大部分命令只是专一于完成一件事情,但能够经过管道的方式将多个命令连接起来,完成一些复杂的功能,好比:
cat nginx_access.log | awk '{print $1}' | sort | uniq -c | sort -rnk 1 | head -n 20
复制代码
以上命令能够分析nginx访问日志,统计出访问次数最多的前20个IP地址及其访问次数。具体来讲,cat命令输出nginx访问日志到流,一行为一个元素,awk输出行的第一列,这里为IP地址,sort按IP进行排序,"uniq -c"按IP统计计数,"sort -rnk 1"按计数从高到低排序,"head -n 20"输出前20行。
本节初步介绍了Java 8引入的函数式数据处理类库,Stream API,它相似于Unix的管道命令,也相似于数据库查询语言SQL,经过组合利用基本函数,能够在更高的层次上思考问题,以声明式的方式简洁地实现指望的功能。
对于collect方法,本节只是演示了最基本的应用,它还有不少高级功能,好比实现相似SQL的group by功能,具体怎么实现?实现的原理是什么呢?
(与其余章节同样,本节全部代码位于 github.com/swiftma/pro…,位于包shuo.laoma.java8.c92下)
未完待续,查看最新文章,敬请关注微信公众号“老马说编程”(扫描下方二维码),从入门到高级,深刻浅出,老马和你一块儿探索Java编程及计算机技术的本质。用心原创,保留全部版权。