Java8并行流使用注意事项

对于从事Java开发的童鞋来讲,相信对于Java8的并行流并不陌生,没错,咱们经常用它来执行并行任务,可是因为并行流(parallel stream)采用的是享线程池,可能会对咱们的性能形成严重影响,那怎么处理呢?java

问题异步

首先咱们来看看具体的问题。在开发中,咱们经常经过如下方法,实现并行流执行并行任务:性能

myList.parallelStream.map(obj -> longRunningOperation())网站

可是这存在一个严重的问题:在 JVM 的后台,使用通用的 fork/join 池来完成上述功能,该池是全部并行流共享的。默认状况,fork/join 池会为每一个处理器分配一个线程。假设你有一台16核的机器,这样你就只能建立16个线程。对 CPU 密集型的任务来讲,这样是有意义的,由于你的机器确实只能执行16个线程。可是真实状况下,不是全部的任务都是 CPU 密集型的。例如:ui

myList.parallelStream this

  .map(this::retrieveFromA)spa

  .map(this::processUsingB)线程

  .forEach(this::saveToC)blog

 

myList.parallelStream 开发

  .map(this::retrieveFromD)

  .map(this::processUsingE)

  .forEach(this::saveToD)

这两个流很大程度上是受限于IO操做,因此会等待其余系统。但这两个流使用相同的(小)线程池,所以会相互等待而被阻塞,很是不友好。好比:

 

final List<Integer> firstRange = buildIntRange(); 

   firstRange.parallelStream().forEach((number) -> {

      try {

         // do something slow

         Thread.sleep(5);

      } catch (InterruptedException e) { }

});

 

在执行期间,我获取了一份线程dump的文件。这是相关的线程:

 

ForkJoinPool.commonPool-worker-1 

ForkJoinPool.commonPool-worker-2 

ForkJoinPool.commonPool-worker-3 

ForkJoinPool.commonPool-worker-4

如今,我要并行的执行这两个并行流:

Runnable firstTask = () -> { 

  firstRange.parallelStream().forEach((number) -> {

    try {

      // do something slow

      Thread.sleep(5);

    } catch (InterruptedException e) { }

  });

};

Runnable secondTask = () -> { 

  secondRange.parallelStream().forEach((number) -> {

    try {

      // do something slow

      Thread.sleep(5);

    } catch (InterruptedException e) { }

  });

};

// run threads

 

此次咱们再看一下线程dump文件:

 

ForkJoinPool.commonPool-worker-1 

ForkJoinPool.commonPool-worker-2 

ForkJoinPool.commonPool-worker-3 

ForkJoinPool.commonPool-worker-4

正如你所见,结果是同样的。咱们只使用了4个线程。

 

解决办法

对于上面的问题,咱们能够在JVM 后台使用 fork/join 池,在 ForkJoinTask 的文档中,咱们能够看到:

若是合适,安排一个异步执行的任务到当前正在运行的池中。若是任务不在inForkJoinPool()中,也能够调用ForkJoinPool.commonPool()获取新的池来执行,好比:

 

ForkJoinPool forkJoinPool = new ForkJoinPool(3); 

forkJoinPool.submit(() -> { 

  firstRange.parallelStream().forEach((number) -> {

    try {

      Thread.sleep(5);

    } catch (InterruptedException e) { }

  });

});

ForkJoinPool forkJoinPool2 = new ForkJoinPool(3); 

forkJoinPool2.submit(() -> { 

  secondRange.parallelStream().forEach((number) -> {

    try {

      Thread.sleep(5);

    } catch (InterruptedException e) {

    }

  });

});

 

如今,咱们再次查看线程池:

 

ForkJoinPool-1-worker-1 

ForkJoinPool-1-worker-2 

ForkJoinPool-1-worker-3 

ForkJoinPool-1-worker-4 

ForkJoinPool-2-worker-1 

ForkJoinPool-2-worker-2 

ForkJoinPool-2-worker-3 

ForkJoinPool-1-worker-4

上面这种方法为何又正确显示了呢?由于咱们建立本身的线程池,因此能够避免共享线程池,若是有须要,甚至能够分配比处理机数量更多的线程。

ForkJoinPool forkJoinPool = new ForkJoinPool(<numThreads>);

 

以上就是Java8并行流在使用中所存在的一些问题及解决办法,部份内容参考自一个Java教学网站,但愿对Java初学者有所帮助。

相关文章
相关标签/搜索