在前面三章中,咱们已经看到了新的 Stream 接口可让你以声明性方式处理数据集。咱们还解释了将外部迭代换为内部迭代可以让原生Java库控制流元素的处理。这种方法让Java程序员无需显式实现优化来为数据集的处理加速。到目前为止,最重要的好处是能够对这些集合执行操做流水线,可以自动利用计算机上的多个内核。java
例如,在Java 7以前,并行处理数据集合很是麻烦。第一,你得明确地把包含数据的数据结构分红若干子部分。第二,你要给每一个子部分分配一个独立的线程。第三,你须要在恰当的时候对它们进行同步来避免不但愿出现的竞争条件,等待全部线程完成,最后把这些部分结果合并起来。Java 7引入了一个叫做分支/合并的框架,让这些操做更稳定、更不易出错。git
在本章中,咱们将了解 Stream 接口如何让你不用太费力气就能对数据集执行并行操做。它容许你声明性地将顺序流变为并行流。此外,你将看到Java是如何变戏法的,或者更实际地来讲,流是如何在幕后应用Java 7引入的分支/合并框架的。你还会发现,了解并行流内部是如何工做的很重要,由于若是你忽视这一方面,就可能因误用而获得意外的(极可能是错的)结果。程序员
咱们会特别演示,在并行处理数据块以前,并行流被划分为数据块的方式在某些状况下偏偏是这些错误且没法解释的结果的根源。所以,咱们将会学习如何经过实现和使用你本身的Spliterator 来控制这个划分过程。github
在第4章的笔记中,咱们简要地了解到了 Stream 接口可让你很是方便地处理它的元素:能够经过对收集源调用 parallelStream 方法来把集合转换为并行流。并行流就是一个把内容分红多个数据块,并用不一样的线程分别处理每一个数据块的流。这样一来,你就能够自动把给定操做的工做负荷分配给多核处理器的全部内核,让它们都忙起来。让咱们用一个简单的例子来试验一下这个思想。算法
假设你须要写一个方法,接受数字n做为参数,并返回从1到给定参数的全部数字的和。一个直接(也许有点土)的方法是生成一个无穷大的数字流,把它限制到给定的数目,而后用对两个数字求和的 BinaryOperator 来归约这个流,以下所示:编程
public static long sequentialSum(long n) {
// 生成天然数无限流
return Stream.iterate(1L, i -> i + 1)
// 限制到前n个数
.limit(n)
// 对全部数字求和来概括流
.reduce(0L, Long::sum);
}
复制代码
用更为传统的Java术语来讲,这段代码与下面的迭代等价:数组
public static long iterativeSum(long n) {
long result = 0;
for (long i = 0; i <= n; i++) {
result += i;
}
return result;
}
复制代码
这彷佛是利用并行处理的好机会,特别是n很大的时候。那怎么入手呢?你要对结果变量进行同步吗?用多少个线程呢?谁负责生成数呢?谁来作加法呢?根本用不着担忧啦。用并行流的话,这问题就简单多了!数据结构
咱们能够把流转换成并行流,从而让前面的函数归约过程(也就是求和)并行运行——对顺序流调用 parallel 方法:架构
public static long parallelSum(long n) {
// 生成天然数无限流
return Stream.iterate(1L, i -> i + 1)
// 限制到前n个数
.limit(n)
// 将流转为并行流
.parallel()
// 对全部数字求和来概括流
.reduce(0L, Long::sum);
}
复制代码
并行流的执行过程:app
请注意,在现实中,对顺序流调用 parallel 方法并不意味着流自己有任何实际的变化。它在内部实际上就是设了一个 boolean 标志,表示你想让调用 parallel 以后进行的全部操做都并行执行。相似地,你只须要对并行流调用 sequential 方法就能够把它变成顺序流。请注意,你可能觉得把这两个方法结合起来,就能够更细化地控制在遍历流时哪些操做要并行执行,哪些要顺序执行。例如,你能够这样作:
stream.parallel()
.filter(...)
.sequential()
.map(...)
.parallel()
.reduce();
复制代码
但最后一次 parallel 或 sequential 调用会影响整个流水线。在本例中,流水线会并行执行,由于最后调用的是它。
回到咱们的数字求和练习,咱们说过,在多核处理器上运行并行版本时,会有显著的性能提高。如今你有三个方法,用三种不一样的方式(迭代式、顺序概括和并行概括)作彻底相同的操做,让咱们看看谁最快吧!
咱们声称并行求和方法应该比顺序和迭代方法性能好。然而在软件工程上,靠猜绝对不是什么好办法!特别是在优化性能时,你应该始终遵循三个黄金规则:测量,测量,再测量。
public static long measurePerf(Function<Long, Long> adder, long n) {
long fastest = Long.MAX_VALUE;
for (int i = 0; i < 10; i++) {
long start = System.nanoTime();
long sum = adder.apply(n);
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("Result: " + sum);
if (duration < fastest) {
fastest = duration;
}
}
return fastest;
}
复制代码
这个方法接受一个函数和一个 long 做为参数。它会对传给方法的 long 应用函数10次,记录每次执行的时间(以毫秒为单位),并返回最短的一次执行时间。假设你把先前开发的全部方法都放进了一个名为 ParallelStreams 的类,你就能够用这个框架来测试顺序加法器函数对前一千万个天然数求和要用多久:
System.out.println("Sequential sum done in:" +
measurePerf(ParallelStreams::sequentialSum, 10_000_000) + " msecs");
复制代码
请注意,咱们对这个结果应持保留态度。影响执行时间的因素有不少,好比你的电脑支持多少个内核。你能够在本身的机器上跑一下这些代码。在一台i5 6200U 的笔记本上运行它,输出是这样的:
Sequential sum done in:110 msecs
复制代码
用传统 for 循环的迭代版本执行起来应该会快不少,由于它更为底层,更重要的是不须要对原始类型作任何装箱或拆箱操做。若是你试着测量它的性能:
System.out.println("Iterative sum done in:" +
measurePerf(ParallelStreams::iterativeSum, 10_000_000) + " msecs");
复制代码
将获得:
Iterative sum done in:4 msecs
复制代码
如今咱们来对函数的并行版本作测试:
System.out.println("Parallel sum done in: " +
measurePerf(ParallelStreams::parallelSum, 10_000_000) + " msecs");
复制代码
看看会出现什么状况:
Parallel sum done in: 525 msecs
复制代码
这至关使人失望,求和方法的并行版本比顺序版本要慢不少。你如何解释这个意外的结果呢?这里实际上有两个问题:
第二个问题更有意思一点,由于你必须意识到某些流操做比其余操做更容易并行化。具体来讲, iterate 很难分割成可以独立执行的小块,由于每次应用这个函数都要依赖前一次应用的结果。
这意味着,在这个特定状况下,概括进程不是像上图那样进行的;整张数字列表在概括过程开始时没有准备好,于是没法有效地把流划分为小块来并行处理。把流标记成并行,你实际上是给顺序处理增长了开销,它还要把每次求和操做分到一个不一样的线程上。
这就说明了并行编程可能很复杂,有时候甚至有点违反直觉。若是用得不对(好比采用了一个不易并行化的操做,如 iterate ),它甚至可能让程序的总体性能更差,因此在调用那个看似神奇的 parallel 操做时,了解背后到底发生了什么是颇有必要的。
使用更有针对性的方法
那到底要怎么利用多核处理器,用流来高效地并行求和呢?咱们在第5章中讨论了一个叫LongStream.rangeClosed 的方法。这个方法与 iterate 相比有两个优势。
让咱们先看一下它用于顺序流时的性能如何,看看拆箱的开销到底要没关系:
public static long rangedSum(long n) {
return LongStream.rangeClosed(1, n)
.reduce(0L, Long::sum);
}
复制代码
这一次的输出是:
Ranged sum done in: 5 msecs
复制代码
这个数值流比前面那个用 iterate 工厂方法生成数字的顺序执行版本要快得多,由于数值流避免了非针对性流那些不必的自动装箱和拆箱操做。因而可知,选择适当的数据结构每每比并行化算法更重要。但要是对这个新版本应用并行流呢?
public static long parallelRangedSum(long n) {
return LongStream.rangeClosed(1, n)
.parallel()
.reduce(0L, Long::sum);
}
复制代码
如今把这个函数传给的测试方法:
System.out.println("Parallel range sum done in:" +
measurePerf(ParallelStreams::parallelRangedSum, 10_000_000) +
" msecs");
复制代码
你会获得:
Parallel range sum done in:2 msecs
复制代码
amazing!终于,咱们获得了一个比顺序执行更快的并行概括,由于这一次概括操做能够像并行流执行图那样执行了。这也代表,使用正确的数据结构而后使其并行工做可以保证最佳的性能。
尽管如此,请记住,并行化并非没有代价的。并行化过程自己须要对流作递归划分,把每一个子流的概括操做分配到不一样的线程,而后把这些操做的结果合并成一个值。但在多个内核之间移动数据的代价也可能比你想的要大,因此很重要的一点是要保证在内核中并行执行工做的时间比在内核之间传输数据的时间长。总而言之,不少状况下不可能或不方便并行化。然而,在使用并行 Stream 加速代码以前,你必须确保用得对;若是结果错了,算得快就毫无心义了。让咱们来看一个常见的陷阱。
错用并行流而产生错误的首要缘由,就是使用的算法改变了某些共享状态。下面是另外一种实现对前n个天然数求和的方法,但这会改变一个共享累加器:
public static long sideEffectSum(long n) {
Accumulator accumulator = new Accumulator();
LongStream.rangeClosed(1, n)
.forEach(accumulator::add);
return accumulator.total;
}
public static class Accumulator {
private long total = 0;
public void add(long value) {
total += value;
}
}
复制代码
这种代码很是广泛,特别是对那些熟悉指令式编程范式的程序员来讲。这段代码和你习惯的那种指令式迭代数字列表的方式很像:初始化一个累加器,一个个遍历列表中的元素,把它们和累加器相加。
那这种代码又有什么问题呢?不幸的是,它真的无可救药,由于它在本质上就是顺序的。每次访问 total 都会出现数据竞争。若是你尝试用同步来修复,那就彻底失去并行的意义了。为了说明这一点,让咱们试着把 Stream 变成并行的:
public static long sideEffectParallelSum(long n) {
Accumulator accumulator = new Accumulator();
LongStream.rangeClosed(1, n)
.parallel()
.forEach(accumulator::add);
return accumulator.total;
}
复制代码
执行测试方法,并打印每次执行的结果:
System.out.println("SideEffect parallel sum done in: " +
measurePerf(ParallelStreams::sideEffectParallelSum, 10_000_000L) + " msecs");
复制代码
你可能会获得相似于下面这种输出:
Result: 9869563545574
Result: 12405006536090
Result: 8268141260766
Result: 11208597038187
Result: 12358062322272
Result: 19218969315182
Result: 11255083226412
Result: 25746147125980
Result: 13327069088874
SideEffect parallel sum done in: 4 msecs
复制代码
这回方法的性能可有可无了,惟一要紧的是每次执行都会返回不一样的结果,都离正确值50000005000000 差很远。这是因为多个线程在同时访问累加器,执行 total += value ,而这一句虽然看似简单,却不是一个原子操做。问题的根源在于, forEach 中调用的方法有反作用,它会改变多个线程共享的对象的可变状态。要是你想用并行 Stream 又不想引起相似的意外,就必须避免这种状况。
如今你知道了,共享可变状态会影响并行流以及并行计算。如今,记住要避免共享可变状态,确保并行 Stream 获得正确的结果。接下来,咱们会看到一些实用建议,你能够由此判断何时能够利用并行流来提高性能。
通常而言,想给出任何关于何时该用并行流的定量建议都是不可能也毫无心义的,由于任何相似于“仅当至少有一千个(或一百万个或随便什么数字)元素的时候才用并行流)”的建议对于某台特定机器上的某个特定操做多是对的,但在略有差别的另外一种状况下可能就是大错特错。尽管如此,咱们至少能够提出一些定性意见,帮你决定某个特定状况下是否有必要使用并行流。
最后,咱们还要强调并行流背后使用的基础架构是Java 7中引入的分支/合并框架。并行汇总的示例证实了要想正确使用并行流,了解它的内部原理相当重要,因此咱们会在下一节仔细研究分支/合并框架。
分支/合并框架的目的是以递归方式将能够并行的任务拆分红更小的任务,而后将每一个子任务的结果合并起来生成总体结果。它是 ExecutorService 接口的一个实现,它把子任务分配给线程池(称为 ForkJoinPool )中的工做线程。首先来看看如何定义任务和子任务。
要把任务提交到这个池,必须建立 RecursiveTask 的一个子类,其中 R 是并行化任务(以及全部子任务)产生的结果类型,或者若是任务不返回结果,则是 RecursiveAction 类型(固然它可能会更新其余非局部机构)。要定义 RecursiveTask, 只需实现它惟一的抽象方法compute :
protected abstract R compute();
复制代码
这个方法同时定义了将任务拆分红子任务的逻辑,以及没法再拆分或不方便再拆分时,生成单个子任务结果的逻辑。正因为此,这个方法的实现相似于下面的伪代码:
if (任务足够小或不可分) {
顺序计算该任务
} else {
将任务分红两个子任务
递归调用本方法,拆分每一个子任务,等待全部子任务完成
合并每一个子任务的结果
}
复制代码
通常来讲并无确切的标准决定一个任务是否应该再拆分,但有几种试探方法能够帮助你作出这一决定。
你可能已经注意到,这只不过是著名的分治算法的并行版本而已。这里举一个用分支/合并框架的实际例子,还之前面的例子为基础,让咱们试着用这个框架为一个数字范围(这里用一个long[] 数组表示)求和。如前所述,你须要先为RecursiveTask类作一个实现,就是下面代码清单中的ForkJoinSumCalculator 。
public class ForkJoinSumCalculator extends RecursiveTask<Long> {
/** * 再也不将任务分解为子任务的数组大小 */
public static final long THRESHOLD = 10_000;
/** * 要求和的数组 */
private final long[] numbers;
/** * 子任务处理的数组的起始和终止位置 */
private final int start;
private final int end;
public ForkJoinSumCalculator(long[] numbers) {
this(numbers, 0, numbers.length);
}
private ForkJoinSumCalculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
// 该任务负责求和的部分的大小
int length = end - start;
// 若是大小小于或等于阈值,顺序计算结果
if (length <= THRESHOLD) {
return computeSequentially();
}
// 建立一个子任务来为数组的前一半求和
ForkJoinSumCalculator leftTask =
new ForkJoinSumCalculator(numbers, start, start + length / 2);
leftTask.fork();
// 利用另外一个ForkJoinPool线程异步执行新建立的子任务
ForkJoinSumCalculator rightTask =
new ForkJoinSumCalculator(numbers, start + length / 2, end);
// 同步执行第二个子任务,有可能容许进一步递归划分
Long rightResult = rightTask.compute();
// 读取第一个子任务的结果,若是还没有完成就等待
Long leftResult = leftTask.join();
// 该任务的结果是两个子任务结果的组合
return leftResult + rightResult;
}
private Long computeSequentially() {
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
}
复制代码
如今编写一个方法来并行对前n个天然数求和就很简单了。你只需把想要的数字数组传给ForkJoinSumCalculator 的构造函数:
public static long forkJoinSum(long n) {
long[] numbers = LongStream.rangeClosed(1, n).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
return new ForkJoinPool().invoke(task);
}
复制代码
这里用了一个 LongStream 来生成包含前n个天然数的数组,而后建立一个 ForkJoinTask( RecursiveTask 的父类),并把数组传递 ForkJoinSumCalculator 的公共构造函数。最后,你建立了一个新的 ForkJoinPool ,并把任务传给它的调用方法 。在ForkJoinPool 中执行时,最后一个方法返回的值就是 ForkJoinSumCalculator 类定义的任务结果。
请注意在实际应用时,使用多个 ForkJoinPool 是没有什么意义的。正是出于这个缘由,通常来讲把它实例化一次,而后把实例保存在静态字段中,使之成为单例,这样就能够在软件中任何部分方便地重用了。这里建立时用了其默认的无参数构造函数,这意味着想让线程池使用JVM可以使用的全部处理器。更确切地说,该构造函数将使用 Runtime.availableProcessors 的返回值来决定线程池使用的线程数。请注意 availableProcessors 方法虽然看起来是处理器,但它实际上返回的是可用内核的数量,包括超线程生成的虚拟内核。
当把 ForkJoinSumCalculator 任务传给 ForkJoinPool 时,这个任务就由池中的一个线程执行,这个线程会调用任务的 compute 方法。该方法会检查任务是否小到足以顺序执行,若是不够小则会把要求和的数组分红两半,分给两个新的 ForkJoinSumCalculator ,而它们也由ForkJoinPool 安排执行。所以,这一过程能够递归重复,把原任务分为更小的任务,直到知足不方便或不可能再进一步拆分的条件(本例中是求和的项目数小于等于10 000)。这时会顺序计算每一个任务的结果,而后由分支过程建立的(隐含的)任务二叉树遍历回到它的根。接下来会合并每一个子任务的部分结果,从而获得总任务的结果。
你能够再用一次本章开始时写的测试框架,来看看显式使用分支/合并框架的求和方法的性能:
System.out.println("ForkJoin sum done in: " + measurePerf(
ForkJoinSumCalculator::forkJoinSum, 10_000_000) + " msecs");
复制代码
它生成如下输出:
ForkJoin sum done in: 41 msecs
复制代码
这个性能看起来比用并行流的版本要差,但这只是由于必须先要把整个数字流都放进一个long[] ,以后才能在 ForkJoinSumCalculator 任务中使用它。
虽然分支/合并框架还算简单易用,不幸的是它也很容易被误用。如下是几个有效使用它的最佳作法。
对于分支/合并拆分策略还有最后一点补充:你必须选择一个标准,来决定任务是要进一步拆分仍是已小到能够顺序求值。
在 ForkJoinSumCalculator 的例子中,咱们决定在要求和的数组中最多包含10 000个项目时就再也不建立子任务了。这个选择是很随意的,但大多数状况下也很难找到一个好的启发式方法来肯定它,只能试几个不一样的值来尝试优化它。在咱们的测试案例中,咱们先用了一个有1000万项目的数组,意味着 ForkJoinSumCalculator 至少会分出1000个子任务来。这彷佛有点浪费资源,由于咱们用来运行它的机器上只有四个内核。在这个特定例子中可能确实是这样,由于全部的任务都受CPU约束,预计所花的时间也差很少。
但分出大量的小任务通常来讲都是一个好的选择。这是由于,理想状况下,划分并行任务时,应该让每一个任务都用彻底相同的时间完成,让全部的CPU内核都一样繁忙。不幸的是,实际中,每一个子任务所花的时间可能天差地别,要么是由于划分策略效率低,要么是有不可预知的缘由,好比磁盘访问慢,或是须要和外部服务协调执行。
分支/合并框架工程用一种称为工做窃取(work stealing)的技术来解决这个问题。在实际应用中,这意味着这些任务差很少被平均分配到 ForkJoinPool 中的全部线程上。每一个线程都为分配给它的任务保存一个双向链式队列,每完成一个任务,就会从队列头上取出下一个任务开始执行。基于前面所述的缘由,某个线程可能早早完成了分配给它的全部任务,也就是它的队列已经空了,而其余的线程还很忙。这时,这个线程并无闲下来,而是随机选了一个别的线程,从队列的尾巴上“偷走”一个任务。这个过程一直继续下去,直到全部的任务都执行完毕,全部的队列都清空。这就是为何要划成许多小任务而不是少数几个大任务,这有助于更好地在工做线程之间平衡负载。
通常来讲,这种工做窃取算法用于在池中的工做线程之间从新分配和平衡任务。当工做线程队列中有一个任务被分红两个子任务时,一个子任务就被闲置的工做线程“偷走”了。如前所述,这个过程能够不断递归,直到规定子任务应顺序执行的条件为真。
如今你应该清楚流如何使用分支/合并框架来并行处理它的项目了,不过还有一点没有讲。本节中咱们分析了一个例子,你明确地指定了将数字数组拆分红多个任务的逻辑。可是,使用本章前面讲的并行流时就用不着这么作了,这就意味着,确定有一种自动机制来为你拆分流。这种新的自动机制称为 Spliterator ,咱们会在下一节中讨论。
Spliterator 是Java 8中加入的另外一个新接口;这个名字表明“可分迭代器”(splitableiterator)。和 Iterator 同样, Spliterator 也用于遍历数据源中的元素,但它是为了并行执行而设计的。虽然在实践中可能用不着本身开发 Spliterator ,但了解一下它的实现方式会让你对并行流的工做原理有更深刻的了解。Java 8已经为集合框架中包含的全部数据结构提供了一个默认的 Spliterator 实现。集合实现了 Spliterator 接口,接口提供了一个 spliterator 方法。这个接口定义了若干方法,以下面的代码清单所示。
public interface Spliterator<T> {
boolean tryAdvance(Consumer<? super T> action);
Spliterator<T> trySplit();
long estimateSize();
int characteristics();
}
复制代码
与往常同样, T 是 Spliterator 遍历的元素的类型。 tryAdvance 方法的行为相似于普通的Iterator ,由于它会按顺序一个一个使用 Spliterator 中的元素,而且若是还有其余元素要遍历就返回 true 。但 trySplit 是专为 Spliterator 接口设计的,由于它能够把一些元素划出去分给第二个 Spliterator (由该方法返回),让它们两个并行处理。 Spliterator 还可经过estimateSize 方法估计还剩下多少元素要遍历,由于即便不那么确切,能快速算出来是一个值也有助于让拆分均匀一点。
重要的是,要了解这个拆分过程在内部是如何执行的,以便在须要时可以掌控它。所以,咱们会在下一节中详细地分析它。
将 Stream 拆分红多个部分的算法是一个递归过程。第一步是对第一个Spliterator 调用 trySplit ,生成第二个 Spliterator 。第二步对这两个 Spliterator 调用trysplit ,这样总共就有了四个 Spliterator 。这个框架不断对 Spliterator 调用 trySplit直到它返回 null ,代表它处理的数据结构不能再分割,如第三步所示。最后,这个递归拆分过程到第四步就终止了,这时全部的 Spliterator 在调用 trySplit 时都返回了 null 。
这个拆分过程也受 Spliterator 自己的特性影响,而特性是经过 characteristics 方法声明的。
让咱们来看一个可能须要你本身实现 Spliterator 的实际例子。咱们要开发一个简单的方法来数数一个 String 中的单词数。这个方法的一个迭代版本能够写成下面的样子。
public static int countWordsIteratively(String s) {
int counter = 0;
boolean lastSpace = true;
for (char c : s.toCharArray()) {
if (Character.isWhitespace(c)) {
lastSpace = true;
} else {
if (lastSpace) {
counter++;
}
lastSpace = Character.isWhitespace(c);
}
}
return counter;
}
复制代码
让咱们把这个方法用在但丁的《神曲》的《地狱篇》的第一句话上:
public static final String SENTENCE =
" Nel mezzo del cammin di nostra vita " +
"mi ritrovai in una selva oscura" +
" che la dritta via era smarrita ";
System.out.println("Found " + countWordsIteratively(SENTENCE) + " words");
复制代码
请注意,咱们在句子里添加了一些额外的随机空格,以演示这个迭代实现即便在两个词之间存在多个空格时也能正常工做。正如咱们所料,这段代码将打印如下内容:
Found 19 words
复制代码
理想状况下,你会想要用更为函数式的风格来实现它,由于就像咱们前面说过的,这样你就能够用并行 Stream 来并行化这个过程,而无需显式地处理线程和同步问题。
首先你须要把 String 转换成一个流。不幸的是,原始类型的流仅限于 int 、 long 和 double , 因此你只能用 Stream :
Stream<Character> stream = IntStream.range(0, SENTENCE.length())
.mapToObj(SENTENCE::charAt);
复制代码
你能够对这个流作归约来计算字数。在归约流时,你得保留由两个变量组成的状态:一个 int用来计算到目前为止数过的字数,还有一个 boolean 用来记得上一个遇到的 Character 是否是空格。由于Java没有元组(tuple,用来表示由异类元素组成的有序列表的结构,不须要包装对象),因此你必须建立一个新类 WordCounter 来把这个状态封装起来,以下所示。
private static class WordCounter {
private final int counter;
private final boolean lastSpace;
public WordCounter(int counter, boolean lastSpace) {
this.counter = counter;
this.lastSpace = lastSpace;
}
public WordCounter accumulate(Character c) {
if (Character.isWhitespace(c)) {
return lastSpace ?
this :
new WordCounter(counter, true);
} else {
return lastSpace ?
new WordCounter(counter + 1, false) :
this;
}
}
public WordCounter combine(WordCounter wordCounter) {
return new WordCounter(counter + wordCounter.counter,
wordCounter.lastSpace);
}
public int getCounter() {
return counter;
}
}
复制代码
在这个列表中, accumulate 方法定义了如何更改 WordCounter 的状态,或更确切地说是用哪一个状态来创建新的 WordCounter ,由于这个类是不可变的。每次遍历到 Stream 中的一个新的Character 时,就会调用 accumulate 方法。具体来讲,就像 countWordsIteratively 方法同样,当上一个字符是空格,新字符不是空格时,计数器就加一。
调用第二个方法 combine 时,会对做用于 Character 流的两个不一样子部分的两个WordCounter 的部分结果进行汇总,也就是把两个 WordCounter 内部的计数器加起来。
private static int countWords(Stream<Character> stream) {
WordCounter wordCounter = stream.reduce(new WordCounter(0, true),
WordCounter::accumulate,
WordCounter::combine);
return wordCounter.getCounter();
}
复制代码
如今你就能够试一试这个方法,给它由包含但丁的《神曲》中《地狱篇》第一句的 String建立的流:
Stream<Character> stream = IntStream.range(0, SENTENCE.length())
.mapToObj(SENTENCE::charAt);
System.out.println("Found " + countWords(stream) + " words");
复制代码
你能够和迭代版本比较一下输出:
Found 19 words
复制代码
到如今为止都很好,但咱们以函数式实现 WordCounter 的主要缘由之一就是能轻松地并行处理,让咱们来看看具体是如何实现的。
你能够尝试用并行流来加快字数统计,以下所示:
System.out.println("Found " + countWords(stream.parallel()) + " words");
复制代码
不幸的是,此次的输出是:
Found 25 words
复制代码
显然有什么不对,可究竟是哪里不对呢?问题的根源并不难找。由于原始的 String 在任意位置拆分,因此有时一个词会被分为两个词,而后数了两次。这就说明,拆分流会影响结果,而把顺序流换成并行流就可能使结果出错。
如何解决这个问题呢?解决方案就是要确保 String 不是在随机位置拆开的,而只能在词尾拆开。要作到这一点,你必须为 Character 实现一个 Spliterator ,它只能在两个词之间拆开String (以下所示),而后由此建立并行流。
private static class WordCounterSpliterator implements Spliterator<Character> {
private final String string;
private int currentChar = 0;
public WordCounterSpliterator(String string) {
this.string = string;
}
@Override
public boolean tryAdvance(Consumer<? super Character> action) {
action.accept(string.charAt(currentChar++));
return currentChar < string.length();
}
@Override
public Spliterator<Character> trySplit() {
int currentSize = string.length() - currentChar;
if (currentSize < 10) {
return null;
}
for (int splitPos = currentSize / 2 + currentChar;
splitPos < string.length(); splitPos++) {
if (Character.isWhitespace(string.charAt(splitPos))) {
Spliterator<Character> spliterator =
new WordCounterSpliterator(string.substring(currentChar,
splitPos));
currentChar = splitPos;
return spliterator;
}
}
return null;
}
@Override
public long estimateSize() {
return string.length() - currentChar;
}
@Override
public int characteristics() {
return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;
}
}
复制代码
这个 Spliterator 由要解析的 String 建立,并遍历了其中的 Character ,同时保存了当前正在遍历的字符位置。让咱们快速回顾一下实现了Spliterator接口的WordCounterSpliterator 中的各个函数。
如今就能够用这个新的 WordCounterSpliterator 来处理并行流了,以下所示:
Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE);
Stream<Character> stream = StreamSupport.stream(spliterator, true);
复制代码
传给 StreamSupport.stream 工厂方法的第二个布尔参数意味着你想建立一个并行流。把这个并行流传给 countWords 方法:
System.out.println("Found " + countWords(stream.parallel()) + " words");
复制代码
能够获得意料之中的正确输出:
Found 19 words
复制代码
你已经看到了 Spliterator 如何让你控制拆分数据结构的策略。 Spliterator 还有最后一个值得注意的功能,就是能够在第一次遍历、第一次拆分或第一次查询估计大小时绑定元素的数据源,而不是在建立时就绑定。这种状况下,它称为延迟绑定(late-binding)的 Spliterator 。