《Java 8 in Action》Chapter 7:并行数据处理与性能

在Java 7以前,并行处理数据集合很是麻烦。第一,你得明确地把包含数据的数据结构分红若干子部分。第二,你要给每一个子部分分配一个独立的线程。第三,你须要在恰当的时候对它们进行同步来避免不但愿出现的竞争条件,等待全部线程完成,最后把这些部分结果合并起来。Java 7引入了一个叫做分支/合并的框架,让这些操做更稳定、更不易出错。
Stream接口让你不用太费力气就能对数据集执行并行操做。它容许你声明性地将顺序流变为并行流。此外,你将看到Java是如何变戏法的,或者更实际地来讲, 流是如何在幕后应用Java 7引入的分支/合并框架的。java

1. 并行流

并行流就是一个把内容分红多个数据块,并用不一样的线程分别处理每一个数据块的流。算法

public static long sequentialSum(long n) {
             return Stream.iterate(1L, i -> i + 1)
                          .limit(n)
                          .reduce(0L, Long::sum);
}
传统写法:
public static long iterativeSum(long n) {
        long result = 0;
        for (long i = 1L; i <= n; i++) {
            result += i;
        }
        return result;
}复制代码

1.1 将顺序流转换为并行流

能够把流转换成并行流,从而让前面的函数归约过程(也就是求和)并行运行——对顺序流调用parallel方法:编程

public static long parallelSum(long n) {
        return Stream.iterate(1L, i -> i + 1)
                     .limit(n)
                     .parallel()
                     .reduce(0L, Long::sum);
}复制代码

在现实中,对顺序流调用parallel方法并不意味着流自己有任何实际的变化。它在内部实际上就是设了一个boolean标志,表示你想让调用parallel以后进行的全部操做都并行执行。相似地,你只须要对并行流调用sequential方法就能够把它变成顺序流。请注意,你可能觉得把这两个方法结合起来,就能够更细化地控制在遍历流时哪些操做要并行执行,哪些要顺序执行。数组

配置并行流使用的线程池数据结构

看看流的parallel方法,你可能会想,并行流用的线程是从哪来的?有多少个?怎么自定义这个过程呢?架构

并行流内部使用了默认的ForkJoinPool,它默认的线程数量就是你的处理器数量,这个值是由Runtime.getRuntime().available- Processors()获得的。框架

可是你能够经过系统属性 java.util.concurrent.ForkJoinPool.common.parallelism来改变线程池大小,以下所示:ide

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");函数

这是一个全局设置,所以它将影响代码中全部的并行流。反过来讲,目前还没法专为某个并行流指定这个值。通常而言,让ForkJoinPool的大小等于处理器数量是个不错的默认值,性能

除非你有很好的理由,不然咱们强烈建议你不要修改它。

1.2 测量流性能

并行编程可能很复杂,有时候甚至有点违反直觉。若是用得不对(好比采用了一 个不易并行化的操做,如iterate),它甚至可能让程序的总体性能更差,因此在调用那个看似神奇的parallel操做时,了解背后到底发生了什么是颇有必要的。并行化并非没有代价的。并行化过程自己须要对流作递归划分,把每一个子流的概括操做分配到不一样的线程,而后把这些操做的结果合并成一个值。但在多个内核之间移动数据的代价也可能比你想的要大,因此很重要的一点是要保证在内核中并行执行工做的时间比在内核之间传输数据的时间长。总而言之,不少状况下不可能或不方便并行化。然而,在使用 并行Stream加速代码以前,你必须确保用得对;若是结果错了,算得快就毫无心义了。

1.3 正确使用并行流

错用并行流而产生错误的首要缘由,就是使用的算法改变了某些共享状态。下面是另外一种实现对前n个天然数求和的方法,但这会改变一个共享累加器:

public static long sideEffectSum(long n) {
    Accumulator accumulator = new Accumulator();
    LongStream.rangeClosed(1, n).forEach(accumulator::add)
    return accumulator.total;
}
public class Accumulator {
    public long total = 0;
    public void add(long value) { total += value; }
}复制代码

这段代码自己上就是顺序的,由于每次访问total都会出现数据竞争。接下来将这段代码改成并行:

public static long sideEffectParallelSum(long n) {
    Accumulator accumulator = new Accumulator();
    LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
    return accumulator.total;}
System.out.println("SideEffect parallel sum done in: " + measurePerf(ParallelStreams::sideEffectParallelSum, 10_000_000L) +" msecs" );
Result: 5959989000692
Result: 7425264100768
Result: 6827235020033
Result: 7192970417739
Result: 6714157975331
Result: 7715125932481
SideEffect parallel sum done in: 49 msecs复制代码

这回方法的性能可有可无了,惟一要紧的是每次执行都会返回不一样的结果,都离正确值50000005000000差很远。这是因为多个线程在同时访问累加器,执行total += value,而这一句􏱵然看似简单,却不是一个原子操做。问题的根源在于,forEach中调用的方法有反作用,它会改变多个线程共享的对象的可变状态。要是你想用并行Stream又不想引起相似的意外,就必须避免这种状况。如今你知道了,共享可变状态会影响并行流以及并行计算。

1.4 高效使用并行流

  • 若是有疑问,测量。把顺序流转成并行流垂手可得,但却不必定是好事。咱们在本节中已经指出,并行流并不老是比顺序流快。此外,并行流有时候会和你的直觉不一致,因此在考虑选择顺序流仍是并行流时,第一个也是最重要的建议就是用适当的基准来检查其性能。
  • 留意装箱。自动装箱和拆箱操做会大大下降性能。Java 8中有原始类型流(IntStream、 LongStream、DoubleStream)来避免这种操做,但凡是有可能都应该用这些流。
  • 有些操做自己在并行流上的性能就比顺序流差。特别是limit和findFirst等依赖于元素顺序的操做,它们在并行流上执行的代价很是大。例如,findAny会比findFirst性能好,由于它不必定要按顺序来执行。你老是能够调用unordered方法来把有序流变成无序流。那么,若是你须要流中的n个元素而不是专门要前n个的话,对无序并行流调用 limit可能会比单个有序流(好比数据源是一个List)更高效。
  • 还要考虑流的操做流水线的总计算成本。设N是要处理的元素的总数,Q是一个元素经过 流水线的大体处理成本,则N*Q就是这个对成本的一个粗略的定性估计。Q值较高就意味着使用并行流时性能好的可能性比较大。
  • 对于较小的数据量,选择并行流几乎历来都不是一个好的决定。并行处理少数几个元素的好处还抵不上并行化形成的额外开销。
  • 要考虑流背后的数据结构是否易于分解。例如,ArrayList的拆分效􏶲比LinkedList 高得多,由于前者用不着遍历就能够平均拆分,然后者则必须遍历。另外,用range工厂方法建立的原始类型流也能够快速分解。最后,你将在7.3节中学到,你能够本身实现Spliterator来彻底掌握分解过程。
  • 流自身的特色,以及流水线中的中间操做修改流的方式,均可能会改变分解过程的性能。例如,一个SIZED流能够分红大小相等的两部分,这样每一个部分均可以比较高效地并行处理,但筛选操做可能丢弃的元素个数却没法预测,致使流自己的大小未知。
  • 还要考虑终􏲧操做中合并步骤的代价是大是小(例如Collector中的combiner方法)。 若是这一步代价很大,那么组合每一个子流产生的部分结果所付出的代价就可能会超出经过并行流获得的性能提高。

并行流背后使用的基础架构是Java 7中引入的分支/合并框架。

2. 分支/合并框架

分支/合并框架的目的是以递归方式将能够并行的任务拆分红更小的任务,而后将每一个子任务的结果合并起来生成总体结果。它是ExecutorService接口的一个实现,它把子任务分配给线程池(称为ForkJoinPool)中的工做线程。

2.1 使用RecursiveTask

要把任务提交到这个池,必须建立RecursiveTask 的一个子类,其中R是并行化任务(以 及全部子任务)产生的结果类型,或者若是任务不返回结果,则是RecursiveAction类型(固然它可能会更新其余非局部机构)。要定义RecursiveTask,只需实现它惟一的抽象方法 compute:

protected abstract R compute();复制代码

这个方法同时定义了将任务拆分红子任务的逻辑,以及没法再拆分或不方便再拆分时,生成单个子任务结果的逻辑。下图表示了递归任务的拆分过程:

让咱们试着用这个框架为一个数字范围(这里用一个 long[]数组表示)求和。如前所述,你须要先为RecursiveTask类作一个实现,就是下面代码清单中的ForkJoinSumCalculator。

public class ForkJoinSumCalculator extends RecursiveTask<Long> {
    private final long[] numbers;
    private final int start;
    private final int end;

    public static final long THRESHOLD = 10_000;

    public ForkJoinSumCalculator(long[] numbers) {
        this(numbers, 0, numbers.length);
    }

    public ForkJoinSumCalculator(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        int length = end - start;
        if (length <= THRESHOLD) {
            return computeSequentially();
        }
        ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2);
        leftTask.fork();

        ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end);
        Long rightResult = rightTask.compute();
        Long leftResult = leftTask.join();
        return leftResult + rightResult;
    }

    private long computeSequentially() {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }
}复制代码

这里用了一个LongStream来生成包含前n个天然数的数组,而后建立一个ForkJoinTask (RecursiveTask的父类),并把数组传递给代码清单7-2所示ForkJoinSumCalculator的公共构造函数。最后,你建立了一个新的ForkJoinPool,并把任务传给它的调用方法 。在ForkJoinPool中执行时,最后一个方法返回的值就是ForkJoinSumCalculator类定义的任务结果。请注意在实际应用时,使用多个ForkJoinPool是没有什么意义的。正是出于这个缘由,通常来讲把它实例化一次,而后把实例保存在静态字段中,使之成为单例,这样就能够在软件中任何部分方便地重用了。这里建立时用了其默认的无参数构造函数,这意味着想让线程池使用JVM可以使用的全部处理器。更确切地说,该构造函数将使用Runtime.availableProcessors的返回值来决定线程􏶈使用的线程数。请注意availableProcessors方法虽然看起来是处理器, 但它实际上返回的是可用内核的数量,包括超线程生成的虚拟内核。当把ForkJoinSumCalculator任务传给ForkJoinPool时,这个任务就由􏶈中的一个线程 执行,这个线程会调用任务的compute方法。该方法会检查任务是否小到足以顺序执行,若是不够小则会把要求和的数组分红两半,分给两个新的ForkJoinSumCalculator,而它们也由ForkJoinPool安排执行。所以,这一过程能够递归重复,把原任务分为更小的任务,直到知足不方便或不可能再进一步拆分的条件(本例中是求和的项目数小于等于10000)。这时会顺序计算每一个任务的结果,而后由分支过程建立的(隐含的)任务二叉树遍历回到它的根。接下来会合并每一个子任务的部分结果,从而获得总任务的结果。这一过程以下图所示。

2.2 使用分支/合并框架的最佳作法

  • 对一个任务调用join方法会阻塞调用方,直到该任务作出结果。所以,有必要在两个子任务的计算都开始以后再调用它。不然,你获得的版本会比原始的顺序算法更慢更复杂,由于每一个子任务都必须等待另外一个子任务完成才能启动。
  • 不该该在RecursiveTask内部使用ForkJoinPool的invoke方法。相反,你应该始终直接调用compute或fork方法,只有顺序代码才应该用invoke来启动并行计算。
  • 对子任务调用fork方法能够把它排进ForkJoinPool。同时对左边和右边的子任务调用它彷佛很天然,但这样作的效􏶲要比直接对其中一个调用compute低。这样作你能够为其中一个子任务重用同一线程,从而避免在线程池中多分配一个任务形成的开销。
  • 调试使用分支/合并框架的并行计算可能有点棘手。特别是你日常都在你喜欢的IDE里面看栈跟踪(stack trace)来找问题,但放在分支-合并并计算上就不行了,由于调用compute的线程并非概念上的调用方,后者是调用fork的那个。
  • 和并行流同样,你不该理所固然地认为在多核处理器上使用分支/合并框架就比顺序计算快。咱们已经说过,一个任务能够分解成多个独立的子任务,才能让性能在并行化时有所提高。全部这些子任务的运行时间都应该比分出新任务所花的时间长;一个惯用方法是把输入/输出放在一个子任务里,计算放在另外一个里,这样计算就能够和输入/输出同时进行。此外,在比较同一算法的顺序和并行版本的性能时还有别的因素要考虑。就像任何其余Java代码同样,分支/合并框架须要“预热”或者说要执行几遍才会被JIT编译器优化。这就是为何在测量性能以前跑几遍程序很重要,咱们的测试框架就是这么作的。同时还要知道,编译器内置的优化可能会为顺序版本带来一些优􏲵(例如执行死码分析——删去从未被使用的计算)。

2.3 工做窃取

实际中,每一个子任务所花的时间可能天差地别,要么是由于划分策略效率低,要么是有不可预知的缘由,好比磁盘访问慢,或是须要和外部任务协调执行。分支/合并框架工程用一种称为工做窃取(work stealing)的技术来解决这个问题。在实际应用中,这意味着这些任务差很少被平均分配到ForkJoinPool中的全部线程上。每一个线程都为分配给它的任务保存一个双向链式队列,每完成一个任务,就会从队列头上取出下一个任务开始执行。基于前面所述的缘由,某个线程可能早早完成了分配给它的全部任务,也就是它的队列已经空了,而其余的线程还很忙。这时,这个线程并无闲下来,而是随机选了一个别的线程,从队列的尾巴上“偷走”一个任务。这个过程一直继续下去,直到全部的任务都执行完毕,全部的队列都清空。这就是为何要划成许多小任务而不是少数几个大任务,这有助于更好地在工做线程之间平衡负载。通常来讲,这种工做窃取算法用于在池中的工做线程之间从新分配和平衡任务。

3. Spliterator

Spliterator是Java 8中加入的另外一个新接口;这个名字表明“可分迭代器”(splitable iterator)。和Iterator同样,Spliterator也用于遍历数据源中的元素,但它是为了并行执行而设计的。

public interface Spliterator<T> {
        boolean tryAdvance(Consumer<? super T> action);
        Spliterator<T> trySplit();
        long estimateSize();
        int characteristics();
}复制代码

与往常同样,T是Spliterator遍历的元素的类型。tryAdvance方法的行为相似于普通的 Iterator,由于它会按顺序一个一个使用Spliterator中的元素,而且若是还有其余元素要遍历就返回true。但trySplit是专为Spliterator接口设计的,由于它能够把一些元素划出去分给第二个Spliterator(由该方法返回),让它们两个并行处理。Spliterator还可经过 estimateSize方法估计还剩下多少元素要遍历,由于即便不那么确切,能快速算出来是一个值也有助于让拆分均匀一点。

3.1 拆分过程

将Stream拆分红多个部分的算法是一个递􏰒过程,如图7-6所示。第一步是对第一个 Spliterator调用trySplit,生成第二个Spliterator。第二步对这两个Spliterator调用 trysplit,这样总共就有了四个Spliterator。这个框架不断对Spliterator调用trySplit直到它返回null,代表它处理的数据结构不能再分割,如第三步所示。最后,这个递归拆分过程到第四步就终止了,这时全部的Spliterator在调用trySplit时都返回了null。

Spliterator的特性
    Spliterator接口声明的最后一个抽象方法是characteristics,它将返回一个int,代 表Spliterator自己特性集的编码。
    使用Spliterator的客户能够用这些特性来更好地控制和优化它的使用。
    表7-2总结了这些特性。(不幸的是,虽然它们在概念上与收集器的特性有重叠,编码却不同。)
![](https://user-gold-cdn.xitu.io/2019/8/28/16cd56026a7081e8?w=1260&h=456&f=png&s=67011)复制代码

3.2 实现自定义Spliterator

4. 小结

在本章中,你了解了如下内容。

  • 内部迭代让你能够并行处理一个流,而无需在代码中显式使用和􏷡调不一样的线程。
  • 虽然并行处理一个流很容易,却不能保证程序在全部状况下都运行得更快。并行软件的行为和性能有时是违反直觉的,所以必定要测量,确保你并无把程序拖得更慢。
  • 像并行流那样对一个数据集并行执行操做能够提高性能,特别是要处理的元素数量庞大,或处理单个元素特别耗时的时候。
  • 从性能角度来看,使用正确的数据结构,如尽量利用原始流而不是通常化的流,几乎老是比尝试并行化某些操做更为重要。
  • 分支/合并框架让你得以用递归方式将能够并行的任务拆分红更小的任务,在不一样的线程上执行,而后将各个子任务的结果合并起来生成总体结果。
  • Spliterator定义了并行流如何拆分它要遍历的数据。

资源获取

  • 公众号回复 : Java8 便可获取《Java 8 in Action》中英文版!

Tips

  • 欢迎收藏和转发,感谢你的支持!(๑•̀ㅂ•́)و✧
  • 欢迎关注个人公众号:庄里程序猿,读书笔记教程资源第一时间得到!

相关文章
相关标签/搜索