在一块儿来学Java8(七)——Stream(中)
咱们学习了Stream.collect
的用法,今天咱们来学习下Stream.reduce
的用法。java
reduce操做能够理解成对Stream中元素累计处理,它有三个重载方法。微信
Optional<T> reduce(BinaryOperator<T> accumulator);
T reduce(T identity, BinaryOperator<T> accumulator);
<U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner);
先来看下重载1方法,这个方法须要咱们传入一个参数,参数名字定义为收集器
,顾名思义是须要咱们对元素进行收集。多线程
下面经过一个数值累加的例子来讲明reduce
的基本用法。app
Optional<Integer> opt = Stream.of(1, 2, 3, 4) .reduce((n1, n2) -> { int ret = n1 + n2; System.out.println(n1 + "(n1) + " + n2 + "(n2) = " + ret); return ret; }); int sum = opt.orElse(0); System.out.println("sum:" + sum);
打印:框架
1(n1) + 2(n2) = 3 3(n1) + 3(n2) = 6 6(n1) + 4(n2) = 10 sum:10
这个例子中对Stream例子中的三个数字进行相加,获得总和。最后返回一个Optional<Integer>
对象是由于考虑到Stream中没有元素的状况,所以返回结果是未知的,应该由开发者来肯定返回值。ide
在Lambda表达式中提供了两个参数n1,n2。从打印结果中能够看出,n1,n2最开始分别是Stream中第一,第二两个元素,把这两个数进行相加后返回,而后带着这个结果再次进入到Lambda表达式中,n1是前一次相加后的值,n2是下一个元素值。学习
这段代码效果等同于:ui
int[] arr = { 1, 2, 3, 4}; int sum = 0; for (int i : arr) { sum += i; }
再来看下重载3,这个方法有三个参数,每一个参数说明以下:this
若是Stream对象是串行的,那么只有accumulator生效,combiner是不生效的。线程
使用Stream.parallel()
的方法开启并行模式,使用Stream.sequential()
开启串行模式,默认开启的是串行模式。
并行模式能够简单理解为在多线程中执行,每一个线程中单独执行它的任务。串行则是在单一线程中顺序执行。
下面来看下重载3的例子:
int sum = Stream.of(1, 2, 3, 4) .reduce(0, (n1, n2) -> { int ret = n1 + n2; System.out.println(n1 + "(n1) + " + n2 + "(n2) = " + ret); return ret; }, (s1, s2) -> { int ret = s1 + s2; System.out.println(s1 + "(s1) + " + s2 + "(s2) = " + ret); return ret; }); System.out.println("sum:" + sum);
打印:
0(n1) + 1(n2) = 1 1(n1) + 2(n2) = 3 3(n1) + 3(n2) = 6 6(n1) + 4(n2) = 10 sum:10
能够看到,在串行模式下并没运行combiner参数,只运行了accumulator参数,从给定的初始值0开始累加。
这里已经指定了初始值(identity),所以返回类型就是初始值的类型。
咱们把例子改为并行模式,而后看下执行结果。
int sum = Stream.of(1, 2, 3, 4) .parallel() // 并行模式 .reduce(0, (n1, n2) -> { int ret = n1 + n2; System.out.println(n1 + "(n1) + " + n2 + "(n2) = " + ret); return ret; }, (s1, s2) -> { int ret = s1 + s2; System.out.println(s1 + "(s1) + " + s2 + "(s2) = " + ret); return ret; }); System.out.println("sum:" + sum);
打印:
0(n1) + 3(n2) = 3 0(n1) + 1(n2) = 1 0(n1) + 2(n2) = 2 1(s1) + 2(s2) = 3 0(n1) + 4(n2) = 4 3(s1) + 4(s2) = 7 3(s1) + 7(s2) = 10 sum:10
从打印的结果中咱们能够看到几个现象:
n1
参数始终是0由于是并行模式,前2个现象很好理解,那为何n1
参数始终是0?
由于开了并行模式后,运行reduce方法的底层是使用了ForkJoinPool
(分支/合并框架)。
分支/合并框架
的原理是将一个大任务拆分红多个子任务,这些子任务并行处理本身的事情,而后框架将这些子任务的结果合并起来,生成一个最终结果。
每一个子任务之间是没有关联的,它们的执行状态都是同样的,所以每一个子任务给到的初始值(identity)都是同样的,在本例中是0
同时须要一个合并方法用来合并每一个子任务的处理结果,而后最终返回,使用数学表达式即为:
(0+1) + (0+2) + (0+3) + (0+4) = 10
再来看下重载3这个方法签名,每一个参数的分工都明确了。
reduce(identity, accumulator, combiner)
查看reduce方法文档,发现有下面一段话:
Performs a reduction on the elements of this stream, using the provided identity value and an associative accumulation function, and returns the reduced value. This is equivalent to: T result = identity; for (T element : this stream) result = accumulator.apply(result, element) return result; The identity value must be an identity for the accumulator function. This means that for all t, accumulator.apply(identity, t) is equal to t. The accumulator function must be an associative function.
其中有一句重要的话:This means that for all t, accumulator.apply(identity, t) is equal to t.
简单来讲,必需要知足下面这个公式:
accumulator.apply(identity, t) == t
若是不知足的话,在并行模式下执行accumulator会有问题。
咱们把上一个例子中的初始值改为1,而后看看执行结果
int sum = Stream.of(1, 2, 3, 4) .parallel() // 这里改为了1 .reduce(1, (n1, n2) -> { int ret = n1 + n2; System.out.println(n1 + "(n1) + " + n2 + "(n2) = " + ret); return ret; }, (s1, s2) -> { int ret = s1 + s2; System.out.println(s1 + "(s1) + " + s2 + "(s2) = " + ret); return ret; }); System.out.println("sum:" + sum);
打印:
1(n1) + 3(n2) = 4 1(n1) + 4(n2) = 5 4(s1) + 5(s2) = 9 1(n1) + 2(n2) = 3 1(n1) + 1(n2) = 2 2(s1) + 3(s2) = 5 5(s1) + 9(s2) = 14 sum:14
理想中的结果应该是11才对,即1 + 1 + 2 + 3 + 4
。能够看到在并行模式下对identity的值是有要求的。 必须知足公式:accumulator.apply(identity, t) == t
。
这里accumulator.apply(identity, t) == t
即为:accumulator.apply(1, 1) == 1
,使用数学表达式表示:
1(identity) + 1 == 1
显然这个等式是不成立的,把identity改为0则公式成立:0 + 1 == 1
紧接着,对于combiner参数,须要知足另外一个公式:
combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)
在这个例子中,咱们取第一次执行combiner状况: 4(s1) + 5(s2) = 9
,套用公式即为:
combiner.apply(5, accumulator.apply(1, 4)) == accumulator.apply(5, 4)
在这里u=5,identity=1,t=4
转换成数学表达式为:5 + (1 + 4) == 5 + 4
显然这个等式是不成立的,把identity改为0,等式就成立了:5 + (0 + 4) == 5 + 4
总结一下
使用reduce(identity, accumulator, combiner)
方法时,必须同时知足下面两个公式:
accumulator.apply(identity, t) == t
combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)
这个方法实际上是reduce(identity, accumulator, combiner)的一种特殊形式,只不过是把combiner部分用accumulator来代替了,即
reduce(identity, accumulator)
等同于reduce(identity, accumulator, accumulator)
所以reduce(identity, accumulator)
的使用方式和注意事项是跟reduce(identity, accumulator, combiner)
同样的,这里再也不赘述。
本篇主要讲解了Stream.reduce的使用方法及注意事项,在并行模式下,reduce是使用分支/合并框架
实现的,在下一篇文章中咱们开始学习分支/合并框架
。
按期分享技术干货,一块儿学习,一块儿进步!微信公众号:猿敲月下码