[零]java8 函数式编程入门官方文档中文版 java.util.stream 中文版 流处理的相关概念

前言

本文为java.util.stream 包文档的译文
极其个别部分可能为了更好理解,陈述略有改动,与原文几乎一致
原文可参考在线API文档
image_5b7910f0_2991

Package java.util.stream Description

一些用于支持流上函数式操做的类 ,例如在集合上的map-reduce转换。例如
int sum = widgets.stream()
.filter(b -> b.getColor() == RED)
.mapToInt(b -> b.getWeight())
.sum();
此处,咱们使用widgets, 他是一个 Collection<Widget>, 做为一个流的源,
而后在流上执行一个filter-map-reduce 来得到红色widgets重量的总和。(总和是一个归约(reduce)操做的例子)
 
这个包中引入的关键抽象是流。
类 Stream、IntStream、LongStream和DoubleStream分别是在对象Object和基本类型int、long和double类型上的流。
流与集合的不一样有如下几点:
  • 不存储数据    流不是存储元素的数据结构;相反,它经过一个哥哥计算操做组合而成的管道,从一个数据源,如数据结构、数组、生成器函数或i/o通道  来传递元素 
  • 函数特性       一个流上的操做产生一个结果,可是不会修改它的源。例如,过滤集合 得到的流会产生一个没有被过滤元素的新流,而不是从源集合中删除元素
  • 延迟搜索        许多流操做,如过滤、映射或重复删除,均可以延迟实现,从而提供出优化的机会。 
  •                      例如,“找到带有三个连续元音的第一个字符串”不须要检查全部的输入字符串。
  •                      流操做分为中间(流生成)操做和终端(值或反作用生成)操做。许多的中间操做, 如filter,map等,都是延迟执行。
  •                      中间操做老是惰性的的。
  • Stream多是无限的   虽然集合的大小是有限的,但流不须要。诸如limit(n)或findFirst()这样的短路操做能够容许在有限时间内完成无限流的计算。 
  • 消耗的          流的元素只在流的生命周期中访问一次。就像迭代器同样,必须生成一个新的流来从新访问源的相同元素 
 
流能够经过多种方式进行得到,好比
  • Collection 提供的stream   parallelStream  
  • 从数组 Arrays.stream(Object[]) 静态方法 
  • Stream类的静态工厂方法 好比  Stream.of(Object[]), IntStream.range(int, int), Stream.iterate(Object, UnaryOperator)   Stream.generate   
  • BufferedReader.lines(); 文件行
  • 获取文件路径的流: Files类的find(), lines(), list(), walk();
  • Random.ints()  随机数流
  • JDK中的许多其余流载方法,包括BitSet.stream(), Pattern.splitAsStream(java.lang.CharSequence), and JarFile.stream().
 
还能够从第三方类库的提供中建立其余一些流 ,详见 Low-level stream construction

Stream operations and pipelines流操做以及管道

流操做被划分为中间和终端操做,经过流管道组合起来。
  1. 一条流管道由一个源(如一个集合、一个数组、一个生成器函数或一个i/o通道)组成;
  2. 而后是零个或更多的中间操做,例如stream.filter 或者stream.map;
  3. 还有一个终端操做,如stream.forEach或Stream.reduce
中间操做返回一条新流,他们老是惰性的;
执行诸如filter()之类的中间操做实际上并不会当即执行任何过滤操做,而是建立了一个新流,当遍历时,它包含与给定谓词相匹配的初始流的元素。直到管道的终端操做被执行,管道源的遍历才会开始
 
终端操做,例如Stream.forEach 和 IntStream.sum,能够遍历流以产生结果或反作用。
在执行终端操做以后,流管道被认为是被消耗掉的,而且不能再被使用;
若是您须要再次遍历相同的数据源,您必须从新从数据源得到一条新流
在几乎全部状况下,终端操做都很迫切,在返回以前完成了数据源的遍历和管道的处理。只有终端操做iterator() 和 spliterator() 不是;
这些都是做为一个“逃生舱口”提供的,以便在现有操做不足以完成任务的状况下,启用任意客户控制的管道遍历
 
延迟处理流能够显著提升效率;
在像上面的filer-map-sum例子这样的管道中,过滤、映射和求和能够被融合到数据的单个传递中,而且具备最小的中间状态。
惰性还容许在没有必要的状况下避免检查全部数据;对于诸如“查找第一个超过1000个字符的字符串”这样的操做,只须要检查足够的字符串,就能够找到具备所需特征的字符串,而不须要检查源的全部字符串。(当输入流是无限的而不只仅是大的时候,这种行为就变得更加剧要了。)
 
中间操做被进一步划分为无状态和有状态操做。
无状态操做,如filter和map,在处理新元素时不保留之前处理的元素的状态——每一个元素均可以独立于其余元素的操做处理。
有状态的操做,例如distinct和sorted,则须要考虑从先前看处处理的元素中合并状态。
 
有状态操做可能须要在产生结果以前处理整个输入。
例如,直到一我的看到了流的全部元素以前  他没办法完成对流的排序
所以,在并行计算下,一些包含有状态中间操做的管道可能须要对数据进行屡次传递,或者可能须要缓冲重要数据。
包含彻底无状态的中间操做的管道能够在单次传递过程当中进行处理,不管是顺序的仍是并行的,只有最少的数据缓冲
 
此外,一些操做被认为是短路操做。一个中间操做,若是在提供无限流输入时,它可能会产生一个有限的流,那么他就是短路的。
若是在无限流做为输入时,它可能在有限的时间内终止,这个终端操做是短路的。
在管道中进行短路操做是处理无限流在有限时间内正常终止的必要条件,但不是充分条件 

Parallelism并行

经过显式的for循环处理元素本质上是串行的
流经过将计算从新定义为聚合操做的管道,而不是在每一个单独元素上当即执行操做,从而促进并行执行。
全部的流操做均可以串行或并行执行
JDK中流的实现建立的都是串行流, 除非显式的设置为并行
例如,Collection有方法Collection.stream()和Collection.parallelstream(),它们分别产生串行和并行流;
其余的流方法好比  IntStream.range(int, int) 产生串行的流,可是能够经过调用BaseStream.parallel()方法设置为 并行化
想要计算全部 widgets的重量之和 只须要
image_5b7910f0_7b5e
 
 
这个例子的串行和并行版本的惟一区别是初始时建立流,使用parallelStream()而不是stream()
当启动终端操做时,流管道是按顺序或并行执行的,这取决于它被调用的流的策略模式。
一个流是否能够串行或并行执行,能够用isParallel()方法来得到,
能够用BaseStream.sequential() 和 BaseStream.parallel() 操做修改。
当启动终端操做时,流管道是按顺序或并行执行的,这取决于它被调用的流的模式。
 
除了被肯定为显式非肯定性的操做以外,如findAny(),不管是顺序执行仍是并行执行,都不该该改变计算的结果。
 
大多数流操做接受描述用户指定行为的参数,这些参数一般是lambda表达式。
为了保持正确的行为,这些行为参数必须是不干涉non-interfering的,而且在大多数状况下必须是无状态的。
这些参数始终是函数式接口的实例,例如Function,一般是lambda表达式或方法引用

Non-interference  无干扰的 非干涉的

Streams容许您在各类数据源上执行可能并行的聚合操做,甚至包括ArrayList之类的非线程安全集合。
只有当咱们可以在流管道的执行过程当中防止对数据源的干扰时这才是可能的。
除了逃脱舱口iterator()和spliterator()以外,都是在调用终端操做时开始执行,并在终端操做完成时结束。
对于大多数数据源来讲,防止干扰意味着确保在流管道的执行过程当中根本没有修改数据源。
这方面的一个显著的例外是源是并发集合的流,它们是专门设计用来处理并发修改的。并发流源是那些Spliterator 设置了并发特性(CONCURRENT characteristic)
 
所以,在流管道中,源不是并发的行为参数,永远不该该修改流的数据源。
一个行为参数将被称之为干扰的(interfere) 若是对于一个非并发数据源来讲若是它修改或致使被修改数据源被修改.
不只仅是并行的管道须要,全部的管道都须要是非干扰的(non-interference)
除非流数据源是并发的,不然在执行流管道时修改stream的数据源可能会致使异常、错误的答案或不一致的行为。
对于表现良好的stream,数据源是能够修改的,只要是在终端操做开始以前,而且全部的修改都会包含在内
好比
image_5b7910f0_6a66
 
首先建立一个列表,由两个字符串组成:“one”;和“two”。
而后,从该列表中建立一条stream。接下来,经过添加第三个字符串:“three”来修改列表。
最后,流的元素被collect 以及joining在一块儿。因为该列表在终端收集操做开始以前被修改,结果将是一串“one two three”。
从JDK集合返回的全部流,以及大多数其余JDK类,都像这样表现良好;
对于其余库生成的流,请参阅 Low-level stream construction,以知足构建行为良好的流的需求。

Stateless behaviors无状态行为

若是流操做的行为参数是有状态的,那么流管道的结果多是不肯定的或不正确的。
有状态的lambda(或实现适当的功函数接口的其余对象)是一个其结果依赖于任何可能在流水线执行过程当中发生变化的状态。
有状态lambda的一个例子是map()的参数:
image_5b7910f0_374
在这里,若是映射操做是并行执行的,那么相同输入的结果可能因线程调度差别而变化,而对于无状态lambda表达式,结果老是相同的
 
还要注意的是,试图从行为参数访问可变状态时,在安全性和性能方面是您一个错误的选择;
若是你不一样步访问那个状态,你就有了数据竞争,所以你的代码可能出现问题,
可是若是你对那个状态进行同步访问,你就有可能会破坏你想要从并行性中获得的受益。
最好的方法是在流操做中彻底地避免有状态的行为参数; 一般总会有种方法能够重构流以免状态性

Side-effects反作用

 
通常来讲,对流操做的行为参数的反作用是不鼓励的,由于它们一般会致使不知情的违反无状态要求的行为,以及其余线程安全隐患
 
若是行为参数确实有反作用,除非显式地声明,不然就没法保证这些反作用对其余线程的可见性,也不能保证在同一条管道内的“相同”元素上的不一样操做在相同的线程中执行。此外,这些影响的排序可能出乎意料。即便管道被限制生成一个与stream源的处理顺序一致的结果(例如,IntStream.range(0,5).parallel().map(x -> x*2).toArray() 必须生成0、二、四、六、8),对于将mapper函数应用于个别元素的顺序,或者对于给定元素执行任何行为参数的顺序,都没有保证
 
对许多可能会被尝试使用于反作用的计算中,能够替换为无反作用的,更安全更有效的表达,好比使用归约而不是可变的累积器。
然而,使用println()来进行调试的反作用一般是无害的。少部分的流操做,如forEach()和peek(),用的就是他们的反作用;这些应该当心使用。
 
下面的例子演示,如何从一个使用反作用的计算转变为不适用反作用
下面的代码搜索一个字符串流,以匹配给定的正则表达式,并将匹配放在列表中
image_5b7910f0_b8a
 
这段代码没必要要地使用了反作用。若是并行执行,ArrayList的非线程安全将致使不正确的结果,而且添加所需的同步将致使竞争,从而破坏并行性的好处。
此外,在这里使用反作用是彻底没有必要的;forEach()能够简单地被替换为更安全、更高效、更适合并行化的reduce操做。
image_5b7910f0_2879

Ordering 排序

 
流可能有也可能没有定义好的顺序。流是否有顺序取决于源和中间操做。(所谓定义好的顺序,就是说原始数据源是否有序)
某些流源(如列表或数组)本质上是有序的,而其余的(如HashSet)则不是。
一些中间操做,好比sorted(),能够在无序的流中强加一个顺序,而其余的操做可能会使一个有序的流变成无序,例如BaseStream.unordered(). 
此外,一些终端操做可能会忽略顺序,好比forEach()。
 
若是一个流有序,大多数操做都被限制在顺序的元素上操做;
若是流的源是包含一、二、3的列表,那么执行map(x-x 2)的结果必须是二、四、6。
然而,若是源没有定义的顺序,那么值二、四、6的任何排列都将是一个有效的结果。
 
对于串行流,顺序的存在与否并不影响性能,只影响肯定性。
若是一个流是有序的,在相同的源上重复执行相同的流管道将产生相同的结果;
若是没有排序,重复执行可能会产生不一样的结果
 
对于并行流,放松排序的限制有时能够实现更高效的执行。
若是元素的排序不是很重要,那么能够更有效地实现某些聚合操做,如过滤重复元素(distinct()  )或分组归约(Collectors.groupingBy())。
相似地,与顺序相关的操做,如limit(),可能须要缓冲以确保正确的排序,从而破坏并行性的好处。
在流有顺序的状况下,可是用户并不特别关心这个顺序,显式地经过unordered()方法调用取消排序, 可能会改善一些有状态或终端操做的并行性能。
然而,大多数的流管道,例如上面的“blocks的重量总和”,即便在排序约束下仍然有效地并行化。
 

Reduction operations归约操做

 
一个归约操做(也称为折叠)接受一系列的输入元素,并经过重复应用组合操做将它们组合成一个简单的结果,例如查找一组数字的总和或最大值,或者将元素累积到一个列表中。streams类有多种形式的通用归约reduce操做,称为reduce()和collect(),以及多个专门化的简化形式,如sum()、max()或count()
 
固然,这样的操做能够很容易用简单的顺序循环来实现,以下所示
image_5b7910f0_6543
 
然而,咱们有充分的理由倾向于reduce操做,而不是像上面这样的迭代累计运算。
它不只是一个“更抽象的”——它在流上把流做为一个总体运行而不是做用于单独的元素——可是一个适当构造的reduce操做本质上是可并行的,只要用于处理元素的函数(s)是结合的associative和无状态stateless的。举个例子,给定一个数字流,咱们想要找到和,咱们能够写:
image_5b7910f0_1a90
 
几乎不须要怎么修改,就能够以并行的方式运行
image_5b7910f0_458
 
之因此归约操做能够很好地并行,是由于实现能够并行地操做数据的子集,而后将中间结果组合在一块儿,获得最终的正确答案。(即便该语言有一个“"parallel for-each"”构造,迭代累计运算方法仍然须要开发人员提供对共享累积变量sum的线程安全更新以及所需的同步,这可能会消除并行性带来的任何性能收益。)
使用reduce()代替了归约操做的并行化的全部负担,而且库能够提供一个高效的并行实现,不须要额外的同步
 
前面展现的“widgets”示例展现了如何与其余操做相结合,以替换for循环。
若是widgets 是Widget 对象的集合,它有一个getWeight方法,咱们能够找到最重的widget:
image_5b7910f0_3e8  
 
在更通用的形式中   对类型为T的元素,而且返回结果类型为U的reduce操做   须要三个参数:
image_5b7910f0_b3
在这里,identity不只仅是归约的初始化结果值或者若是没有任何元素时的一个默认的返回值
迭代累计运算器接受部分结果和下一个元素,并产生一个新的中间结果。
组合函数结合了两个部分结果,产生了一个新的中间结果。
(在并行减小的状况下,组合是必要的,在这个过程当中,输入被分区,每一个分区都计算出部分的累积,而后将部分结果组合起来产生最终的结果。)
 
更准确地说,identity必须是组合函数的恒等式。这意味着对全部的u,combiner.apply(identity, u)等于u,
另外,组合函数必须是结合的,必须与累加器函数兼容:
对全部u和t,
combiner.apply(identity, u) 必须等于accumulator.apply(u, t).
三参数形式是双参数形式的泛化,将映射步骤合并到累加步骤中。
咱们能够用更通常的形式从新改写这个简单的widgets重量的例子
image_5b7910f0_414e
尽管显式的map-reduce的形式更易于阅读,所以一般应该优先考虑。
通用的形式是为了  经过将映射和减小到单个函数,以重要的工做进行优化 这种场景

Mutable reduction 可变的归约

一个可变的归约操做在处理流中的元素时,将输入元素积累到一个可变的结果容器中,例如一个Collection或StringBuilder,
若是咱们想要获取一串字符串的流并将它们链接成一个长字符串,咱们能够经过普通的reduce来实现这个目标:
image_5b7910f0_1e36
 
咱们会获得想要的结果,它甚至能够并行工做,然而,可是咱们可能对性能不满意
这样的实现将会进行大量的字符串复制  时间复杂度O(n^2)
一种更有效的方法是将结果累积到StringBuilder中,这是一个用于累积字符串的可变容器
就如同咱们对普通的归约操做处理同样,咱们可使用相同的技术来处理可变的归约
 
可变归约操做称为collect()当它将指望的结果收集到一个结果容器中,例如一个集合
收集操做须要三个功能:
一个supplier 功能来构造结果容器的新实例,
一个累计运算器函数将一个输入元素合并到一个结果容器中,
一个组合函数将一个结果容器的内容合并到另外一个结果容器中。
 
它的形式与普通归约的通常形式很是类似
image_5b7910f0_2046
 
与reduce()相比,以这种抽象的方式表示收集的好处是它直接适合并行化:
咱们能够并行地累计运算部分结果,而后将它们组合起来,只要积累和组合功能知足适当的需求。
例如,为了收集流中的元素的字符串表示到ArrayList,咱们能够编写显式的for循环
image_5b7910f0_30f2
或者咱们可使用一个可并行的collect形式
image_5b7910f0_56c8
 
或者,从累加器函数中提取出来map操做,咱们能够更简洁地表达它:
image_5b7910f0_4867
 
在这里,咱们的supplier只是ArrayList的构造器,累加器将string   element元素添加到ArrayList中,组合器简单地使用addAll将字符串从一个容器复制到另外一个容器中
 
collect的三个部分——supplier, accumulator, 和combiner ——是紧密耦合的。
咱们可使用Collector来抽象的表达描述这三部分。
上面的例子能够将字符串collect到列表中,可使用一个标准收集器来重写:
image_5b7910f0_4a1c
 
将可变的归约打包成收集器有另外一个优势:可组合性。
类Collectors包含许多用于收集器的预约义工厂,包括将一个收集器转换为另外一个收集器的组合器。
例如,假设咱们有一个Collector,它计算员工流的薪水之和,以下所列
 
image_5b7910f0_175
 
 
(对于第二个类型的参数  ?  ,仅仅代表咱们不关心收集器所使用的中间类型。 )若是咱们想要建立一个收集器来按部门计算工资的总和,咱们可使用groupingBy来重用summingSalaries 薪水:
image_5b7910f0_256d
就像常规的reduce操做同样,只有知足适当的条件collect()  操做才可以并行化
对于任何部分累计运算的结果,将其与空结果容器相结合combiner  必须产生一个等效的结果
也就是说,对于任意一个部分累计运算的结果p,累计运算或者组合调用的结果,p必须等于   combiner.apply(p, supplier.get()).
 
并且,不管计算是否分割,它必须产生一个等价的结果。对于任何输入元素t1和t2,下面计算的结果r1和r2必须是等价的
image_5b7910f0_1d27
在这里,等价一般指的是Object.equals(Object).。但在某些状况下,等价性的要求可能会下降
 

Reduction, concurrency, and ordering 归约 并发与排序

 
经过一些复杂的reduce操做,例如生成map的collect(),例如
image_5b7910f0_604
并行执行操做可能实际上会产生反效果。这是由于组合步骤(经过键将一个Map合并到另外一个Map)对于某些Map实现来讲可能代价很大
 
然而,假设在这个reduce中使用的结果容器是一个可修改的集合——例如ConcurrentHashMap。在这种状况下,对迭代累计运算器的并行调用实际上能够将它们的结果并发地放到相同的共享结果容器中,从而将再也不须要组合器合并不一样的结果容器。这可能会促进并行执行性能的提高。咱们称之为并行reduce
 
支持并发reduce的收集器以Collector.Characteristics.CONCURRENT characteristic特性为标志。并发特性。然而,并发集合也有缺点。
若是多个线程将结果并发地存入一个共享容器,那么产生结果的顺序是不肯定的。
所以,只有在排序对正在处理的流不重要的状况下,才可能执行并发的reduce
下面这些条件下   Stream.collect(Collector) 的实现会并发reduce(归约)
  • 流是并行的;
  • 收集器有Collector.Characteristics.CONCURRENT 特性
  • 要么是无序的流,要么收集器拥有Collector.Characteristics.UNORDERED 特性
 
您能够经过使用BaseStream.unordered()方法来确保流是无序的。例如:
image_5b7910f0_2a7e
 
(Collectors.groupingByConcurrent(java.util.function.Function<? super T, ? extends K>) 等同于 groupingBy). 

Associativity 结合性

 
若是一个操做或者函数方法知足下面的形式,那么他就是结合的
image_5b7910f0_724c
若是咱们把这个问题扩大到四项,就能够看到这种结合性对于并行的重要性
image_5b7910f0_43e2
这样咱们就能够把(a op b)  和 (c op d) 进行并行计算  最后在对他们进行  op  运算
结合性操做的例子包括数字加法、min、max和字符串串联

Low-level stream construction  低级流构造器

 
到目前为止,全部的流示例都使用了Collection.stream()或Arrays.stream(Object)等方法来得到一个stream。这些处理流的方法是如何实现的?
 
类StreamSupport提供了许多用于建立流的低级方法,全部这些方法都使用某种形式的Spliterator。
一个Spliterator是迭代器的一个并行版本;
它描述了一个(多是无限的)元素集合,支持顺序前进、批量遍历,并将一部分输入分割成另外一个可并行处理的Spliterator。
在最低层,全部的流都由一个Spliterator驱动构造
 
在实现Spliterator时,有许多实现选择,几乎全部的实现都是在简单的实现和运行时性能之间进行权衡。
建立Spliterator的最简单、但最不高性能的方法是,使用 Spliterators.spliteratorUnknownSize(java.util.Iterator, int)从一个iterator中建立spliterator  。
虽然这样的spliterator 能够工做,但它可能会提供糟糕的并行性能,由于咱们已经丢失了容量信息(底层数据集有多大),以及被限制为一个简单的分割算法。
 
一个高质量的spliterator 将提供平衡的和已知大小的分割,精确的容量信息,以及一些可用于实现优化执行的spliterator 或数据的其余特征  (特征见spliterator characteristics)
 
可变数据源的Spliterators 有一个额外的挑战;绑定到数据的时间,由于数据可能在建立Spliterators 后和开始执行流管道的期间,发生变化。
理想状况下,一个流的spliterator将报告一个IMMUTABLE or CONCURRENT;若是不是,应该是后期绑定(late-binding)。
若是一个源不能直接提供一个推荐的spliterator,它可能会经过Supplier 间接地提供一个spliterator,并经过接收Supplier做为参数的stream()版本构造一个stream。只有在流管道的终端操做以后,才从Supplier处得到spliterator
 
这些要求极大地减小了流源的变化和流管道的执行之间的潜在的干扰。
基于具备所需特性的spliterators ,或者使用 Supplier-based 的工厂的形式的流,在终端操做开始以前对数据源的修改是不受影响的(若是流操做的行为参数知足不干涉和无状态的要求标准)。参见不干涉 Non-Interference的细节。
相关文章
相关标签/搜索