Java Fork / Join进行并行编程

最近几年,计算机处理器领域发生了范式转变。 多年来,处理器制造商一直在提高时钟频率,因此开发人员享受到这样的事实,即他们的单线程软件执行得更快,而无需他们付出任何努力。 现在,处理器制造商青睐多核芯片设计,并且必须以多线程或多进程的方式编写软件才能充分利用硬件。 因此,软件开发人员追赶的唯一方法是编写利用并行性的应用程序,即使用多个CPU内核来处理所有任务,而不是使用单个更快的内核。 摩尔定律仍然适用,但适用范围有所不同。

并行计算或并行化是一种计算形式,其中许多计算同时进行,其原理是大问题通常可以分为较小的问题,然后并行解决(“并行”)。 从本质上讲,如果可以将CPU密集型问题划分为较小的独立任务,则可以将这些任务分配给不同的处理器。

关于多线程和并发,Java非常有趣。 它从一开始就支持Thread,而在过去,您可以使用带有interruptjoinsleep方法的低级方法来操纵线程的执行。 此外,所有对象继承的notifywait方法也可能会有所帮助。

可以通过这种方式控制应用程序的执行,但是过程有些繁琐。 然后是Java 1.5中的并发包 ,它提供了一个更高级别的框架,开发人员可以使用该框架以更简单,更容易和更不易出错的方式处理线程。 该程序包提供了一堆在并发编程中通常有用的实用程序类。

多年以来,对“启用并发”程序的需求变得越来越大,因此该平台采取了这一步骤,进一步引入了Java SE 7的新并发功能 功能之一是引入了Fork / Join框架,该框架原本打算包含在JDK 1.5版本中,但最终没有成功。

Fork / Join框架旨在使分而治之算法易于并行化。 这种类型的算法非常适合可以分为两个或多个相同类型的子问题的问题。 他们使用递归将问题分解为简单的任务,直到这些任务变得足够简单以至于可以直接解决。 然后将子问题的解决方案合并以给出原始问题的解决方案。 关于Fork / Join方法的出色介绍是文章“ Java SE中的Fork-Join开发 ”。

您可能已经注意到,Fork / Join方法类似于MapReduce ,因为它们都是并行化任务的算法。 但是,一个区别是,只有在必要时(如果太大),Fork / Join任务才将自己细分为较小的任务,而MapReduce算法将其作为工作的第一步将所有工作分为几部分。

Fork / Join Java框架起源于JSR-166,您可以在Doug Lea领导的并发JSR-166兴趣站点中找到更多信息。 实际上,如果您希望使用该框架并且没有JDK 7,则必须在这里进行操作。如站点中所述,我们可以使用JDK 1.5和1.6中的相关类,而不必安装最新的JDK。 为此,我们必须下载jsr166 jar并使用-Xbootclasspath / p:jsr166.jar选项启动JVM。 请注意,您可能需要在“ jsr166.jar”之前加上其完整文件路径。 之所以必须这样做,是因为JAR包含一些重写核心Java类的类(例如java.util包下的类)。 不要忘记为JSR-166 API文档添加书签。

因此,让我们看看如何使用该框架解决一个实际问题。 我们将创建一个程序来计算斐波纳契数 (一个经典的教学问题),并查看如何使用新类使我们尽可能快。 Java Fork / Join + Groovy文章中也解决了此问题。

首先,我们创建一个表示问题的类,如下所示:

package com.javacodegeeks.concurrency.forkjoin;

public class FibonacciProblem {
 
 public int n;

 public FibonacciProblem(int n) {
  this.n = n;
 }
 
 public long solve() {
  return fibonacci(n);
 }
 
 private long fibonacci(int n) {
  System.out.println("Thread: " +
   Thread.currentThread().getName() + " calculates " + n);
  if (n <= 1)
   return n;
  else 
   return fibonacci(n-1) + fibonacci(n-2);
 }

}

如您所见,我们使用该解决方案的递归版本,这是一个典型的实现。 (请注意,此实现效率很高,因为它会一遍又一遍地计算相同的值。在实际情况下,已经缓存的计算值应在以后的执行中进行缓存和检索)。 现在让我们看一下单线程方法的外观:

package com.javacodegeeks.concurrency.forkjoin;

import org.perf4j.StopWatch;

public class SillyWorker {
 
 public static void main(String[] args) throws Exception {
  
  int n = 10;
  
  StopWatch stopWatch = new StopWatch();
  FibonacciProblem bigProblem = new FibonacciProblem(n); 
  
  long result = bigProblem.solve();   
  stopWatch.stop();
  
  System.out.println("Computing Fib number: " + n);
  System.out.println("Computed Result: " + result);
  System.out.println("Elapsed Time: " + stopWatch.getElapsedTime());
  
 }

}

我们只是创建一个新的FibonacciProblem并执行其solve方法,该方法将递归调用fibonacci方法。 我还使用漂亮的Perf4J库来跟踪经过的时间。 输出将是这样的(我隔离了最后几行): …线程:main计算1线程:main计算0计算Fib数:10计算结果:55经过的时间:8如预期的那样,所有工作都已完成只有一个线程(主线程)。 让我们看看如何使用Fork / Join框架重写它。 请注意,在Fibonacci解决方案中,将发生以下情况: fibonacci(n-1)+ fibonacci(n-2)因此,我们可以将这两个任务中的每一个分配给一个新的工作程序(即新线程),然后工人已经完成处决,我们将加入结果。 考虑到这一点,我们引入了FibonacciTask类,该类本质上是将较大的Fibonacci问题划分为较小的问题的一种方法。

package com.javacodegeeks.concurrency.forkjoin;

import java.util.concurrent.RecursiveTask;

public class FibonacciTask extends RecursiveTask<Long> {

 private static final long serialVersionUID = 6136927121059165206L;
 
 private static final int THRESHOLD = 5;

 private FibonacciProblem problem;
 public long result;
 
 public FibonacciTask(FibonacciProblem problem) {
  this.problem = problem;
 }

 @Override
 public Long compute() {
  if (problem.n < THRESHOLD) { // easy problem, don't bother with parallelism
   result = problem.solve();
  }
  else {
   FibonacciTask worker1 = new FibonacciTask(new FibonacciProblem(problem.n-1));
   FibonacciTask worker2 = new FibonacciTask(new FibonacciProblem(problem.n-2));
   worker1.fork();
   result = worker2.compute() + worker1.join();

  }
  return result;
 }

}

注意:如果您没有使用JDK 7,而是手动包含了JSR-166库,则必须覆盖默认的Java核心类。 否则,您将遇到以下错误: java.lang.SecurityException:禁止的程序包名称:java.util.concurrent为避免这种情况,请使用以下参数将JVM设置为覆盖类: -Xbootclasspath / p:lib / jsr166.jar我使用了“ lib / jsr166.jar”值,因为该JAR驻留在我的Eclipse项目中一个名为“ lib”的文件夹中。 配置如下所示:

我们的任务扩展了RecursiveTask类,该类是递归结果的ForkJoinTask 我们将覆盖处理此任务执行的主要计算的计算方法。 在该方法中,我们首先检查是否必须使用并行性(通过与阈值进行比较)。 如果这是一个易于执行的任务,我们将直接调用solve方法,否则我们将创建两个较小的任务,然后分别执行每个任务。 执行发生在不同的线程中,然后将其结果合并。 这可以通过使用forkjoin方法来实现。 让我们测试一下实现:

package com.javacodegeeks.concurrency.forkjoin;

import java.util.concurrent.ForkJoinPool;

import org.perf4j.StopWatch;

public class ForkJoinWorker {
 
 public static void main(String[] args) {
  
  // Check the number of available processors
  int processors = Runtime.getRuntime().availableProcessors();
  System.out.println("No of processors: " + processors);
  
  int n = 10;
  
  StopWatch stopWatch = new StopWatch();   
  FibonacciProblem bigProblem = new FibonacciProblem(n);
  
  FibonacciTask task = new FibonacciTask(bigProblem);
  ForkJoinPool pool = new ForkJoinPool(processors);
  pool.invoke(task);
  
  long result = task.result;
  System.out.println("Computed Result: " + result);
  
  stopWatch.stop();
  System.out.println("Elapsed Time: " + stopWatch.getElapsedTime());
  
 }
 
}

我们首先检查系统中可用的处理器数量,然后创建具有相应并行度的新ForkJoinPool 我们使用invoke方法分配并执行任务。 这是输出(我已隔离了第一个和最后一个方法): 处理器数量:8线程:ForkJoinPool-1-worker-7计算4线程:ForkJoinPool-1-worker-6计算4线程:ForkJoinPool-1-worker- 4计算4…线程:ForkJoinPool-1-worker-2计算1线程:ForkJoinPool-1-worker-2计算0计算结果:55经过的时间:16请注意,现在将计算委托给多个工作线程,每个工作线程其中的一项任务比原来的任务要小。 您可能已经注意到,经过的时间大于上一个。 之所以发生这种矛盾,是因为我们使用了较低的阈值(5)和较低的n值(10)。 由于创建了大量线程,因此引入了不必要的延迟。 对于较大的阈值(大约20)和较高的n(40和更高),框架的真正力量将变得显而易见。 我对n> 40的值进行了一些快速压力测试,下面是带有结果的图表:

显然,Fork / Join框架的伸缩性比单线程方法好得多,并且可以在更短的时间内执行计算。 (如果您希望自己进行一些压力测试,请不要忘记删除FibonacciProblem类中的System.out调用。)查看Windows 7计算机( i7- 720QM(具有4个内核和超线程 ),同时使用了每种方法。


单线程: 在执行过程中,CPU的总使用率仍然很低(从未超过16%)。 如您所见,当单线程应用程序难以执行所有计算时,CPU利用率不足。

多线程:

CPU利用率要好得多,所有处理器都有助于进行总计算。 在Java中介绍Fork / Join框架时,我们已经结束了。 请注意,这里我只是做些表面介绍,还存在许多其他功能,它们随时可以帮助我们利用多核CPU。 一个新的时代正在出现,因此开发人员应该熟悉这些概念。 与往常一样,您可以在这里找到为本文生成的源代码。 别忘了分享!

相关片段:

翻译自: https://www.javacodegeeks.com/2011/02/java-forkjoin-parallel-programming.html