Fork-join框架

这是一个JDK7引入的并行框架,它把流程划分红fork(分解)+join(合并)两个步骤(怎么那么像MapReduce?),传统线程池来实现一个并行任务的时候,常常须要花费大量的时间去等待其余线程执行任务的完成,可是fork-join框架使用work stealing技术缓解了这个问题:java

  1. 每一个工做线程都有一个双端队列,当分给每一个任务一个线程去执行的时候,这个任务会放到这个队列的头部;
  2. 当这个任务执行完毕,须要和另一个任务的结果执行合并操做,但是那个任务却没有执行的时候,不会干等,而是把另外一个任务放到队列的头部去,让它尽快执行;
  3. 当工做线程的队列为空,它会尝试从其余线程的队列尾部偷一个任务过来;
  4. 取得的任务能够被进一步分解。
  • ForkJoinPool.class,ForkJoin框架的任务池,ExecutorService的实现类
  • ForkJoinTask.class,Future的子类,框架任务的抽象
  • ForkJoinWorkerThread.class,工做线程
  • RecursiveTask.class,ForkJoinTask的实现类,compute方法有返回值,下文中有例子
  • RecursiveAction.class,ForkJoinTask的实现类,compute方法无返回值,只须要覆写compute方法,对于可继续分解的子任务,调用coInvoke方法完成(参数是RecursiveAction子类对象的可变数组):

  依靠应用程序自己并行拆封任务,若是使用简单的多线程程序的方法,复杂度必然很大。这就须要一个更好的范式或者工具来代程序员处理这类问题。Java 7也意识到了这个问题,才标准库中集成了由Doug Lea开发的Fork/Join并行计算框架。经过使用 Fork/Join 模式,软件开发人员可以方便地利用多核平台的计算能力。尽管尚未作到对软件开发人员彻底透明,Fork/Join 模式已经极大地简化了编写并发程序的琐碎工做。对于符合 Fork/Join 模式的应用,软件开发人员再也不须要处理各类并行相关事务,例如同步、通讯等,以难以调试而闻名的死锁和 data race 等错误也就不会出现,提高了思考问题的层次。你能够把 Fork/Join 模式看做并行版本的 Divide and Conquer 策略,仅仅关注如何划分任务和组合中间结果,将剩下的事情丢给 Fork/Join 框架。可是Fork/Join并行计算框架,并非银弹,并不能解决全部应用程序在超多核心处理器上的并发问题。程序员

    若是一个应用能被分解成多个子任务,而且组合多个子任务的结果就可以得到最终的答案,那么这个应用就适合用 Fork/Join 模式来解决。其原理以下图。
数组

    应用程序开发者须要作的就是拆分任务并组合每一个子任务的中间结果,而不用再考虑线程和锁的问题。多线程

一个简单的例子

咱们首先看一个简单的Fork/Join的任务定义。并发

Java代码 复制代码  收藏代码
  1. public class Calculator extends RecursiveTask<Integer> {   
  2.   
  3.     private static final int THRESHOLD = 100;   
  4.     private int start;   
  5.     private int end;   
  6.   
  7.     public Calculator(int start, int end) {   
  8.         this.start = start;   
  9.         this.end = end;   
  10.     }   
  11.   
  12.     @Override  
  13.     protected Integer compute() {   
  14.         int sum = 0;   
  15.         if((start - end) < THRESHOLD){   
  16.             for(int i = start; i< end;i++){   
  17.                 sum += i;   
  18.             }   
  19.         }else{   
  20.             int middle = (start + end) /2;   
  21.             Calculator left = new Calculator(start, middle);   
  22.             Calculator right = new Calculator(middle + 1, end);   
  23.             left.fork();   
  24.             right.fork();   
  25.   
  26.             sum = left.join() + right.join();   
  27.         }   
  28.         return sum;   
  29.     }   
  30.   
  31. }  
public class Calculator extends RecursiveTask<Integer> {

    private static final int THRESHOLD = 100;
    private int start;
    private int end;

    public Calculator(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        if((start - end) < THRESHOLD){
            for(int i = start; i< end;i++){
                sum += i;
            }
        }else{
            int middle = (start + end) /2;
            Calculator left = new Calculator(start, middle);
            Calculator right = new Calculator(middle + 1, end);
            left.fork();
            right.fork();

            sum = left.join() + right.join();
        }
        return sum;
    }

}

 

    这段代码中,定义了一个累加的任务,在compute方法中,判断当前的计算范围是否小于一个值,若是是则计算,若是没有,就把任务拆分为连个子任务,并合并连个子任务的中间结果。程序递归的完成了任务拆分和计算。框架

    任务定义以后就是执行任务,Fork/Join提供一个和Executor框架 的扩展线程池来执行任务。dom

Java代码 复制代码  收藏代码
  1. @Test  
  2. public void run() throws Exception{   
  3.     ForkJoinPool forkJoinPool = new ForkJoinPool();   
  4.     Future<Integer> result = forkJoinPool.submit(new Calculator(0, 10000));   
  5.   
  6.     assertEquals(new Integer(49995000), result.get());   
  7. }  
    @Test
    public void run() throws Exception{
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        Future<Integer> result = forkJoinPool.submit(new Calculator(0, 10000));

        assertEquals(new Integer(49995000), result.get());
    }

 

Fork/Join框架的主要类


RecursiveAction供不须要返回值的任务继续。ide

RecursiveTask经过泛型参数设置计算的返回值类型。工具

ForkJoinPool提供了一系列的submit方法,计算任务。ForkJoinPool默认的线程数经过Runtime.availableProcessors()得到,由于在计算密集型的任务中,得到多于处理性核心数的线程并不能得到更多性能提高。性能

public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
    doSubmit(task);
    return task;
}

sumit方法返回了task自己,ForkJoinTask实现了Future接口,因此能够经过它等待得到结果。

另外一例子

这个例子并行排序数组,不须要返回结果,因此继承了RecursiveAction。

Java代码 复制代码  收藏代码
  1. public class SortTask extends RecursiveAction {   
  2.     final long[] array;   
  3.     final int start;   
  4.     final int end;   
  5.     private int THRESHOLD = 100; //For demo only   
  6.   
  7.     public SortTask(long[] array) {   
  8.         this.array = array;   
  9.         this.start = 0;   
  10.         this.end = array.length - 1;   
  11.     }   
  12.   
  13.     public SortTask(long[] array, int start, int end) {   
  14.         this.array = array;   
  15.         this.start = start;   
  16.         this.end = end;   
  17.     }   
  18.   
  19.     protected void compute() {   
  20.         if (end - start < THRESHOLD)   
  21.             sequentiallySort(array, start, end);   
  22.         else {   
  23.             int pivot = partition(array, start, end);   
  24.             new SortTask(array, start, pivot - 1).fork();   
  25.             new SortTask(array, pivot + 1, end).fork();   
  26.         }   
  27.     }   
  28.   
  29.     private int partition(long[] array, int start, int end) {   
  30.         long x = array[end];   
  31.         int i = start - 1;   
  32.         for (int j = start; j < end; j++) {   
  33.             if (array[j] <= x) {   
  34.                 i++;   
  35.                 swap(array, i, j);   
  36.             }   
  37.         }   
  38.         swap(array, i + 1, end);   
  39.         return i + 1;   
  40.     }   
  41.   
  42.     private void swap(long[] array, int i, int j) {   
  43.         if (i != j) {   
  44.             long temp = array[i];   
  45.             array[i] = array[j];   
  46.             array[j] = temp;   
  47.         }   
  48.     }   
  49.   
  50.     private void sequentiallySort(long[] array, int lo, int hi) {   
  51.         Arrays.sort(array, lo, hi + 1);   
  52.     }   
  53. }  
public class SortTask extends RecursiveAction {
    final long[] array;
    final int start;
    final int end;
    private int THRESHOLD = 100; //For demo only

    public SortTask(long[] array) {
        this.array = array;
        this.start = 0;
        this.end = array.length - 1;
    }

    public SortTask(long[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    protected void compute() {
        if (end - start < THRESHOLD)
            sequentiallySort(array, start, end);
        else {
            int pivot = partition(array, start, end);
            new SortTask(array, start, pivot - 1).fork();
            new SortTask(array, pivot + 1, end).fork();
        }
    }

    private int partition(long[] array, int start, int end) {
        long x = array[end];
        int i = start - 1;
        for (int j = start; j < end; j++) {
            if (array[j] <= x) {
                i++;
                swap(array, i, j);
            }
        }
        swap(array, i + 1, end);
        return i + 1;
    }

    private void swap(long[] array, int i, int j) {
        if (i != j) {
            long temp = array[i];
            array[i] = array[j];
            array[j] = temp;
        }
    }

    private void sequentiallySort(long[] array, int lo, int hi) {
        Arrays.sort(array, lo, hi + 1);
    }
}

 

Java代码 复制代码  收藏代码
  1. @Test  
  2. public void run() throws InterruptedException {   
  3.     ForkJoinPool forkJoinPool = new ForkJoinPool();   
  4.     Random rnd = new Random();   
  5.     long[] array = new long[SIZE];   
  6.     for (int i = 0; i < SIZE; i++) {   
  7.         array[i] = rnd.nextInt();   
  8.     }   
  9.     forkJoinPool.submit(new SortTask(array));   
  10.   
  11.     forkJoinPool.shutdown();   
  12.     forkJoinPool.awaitTermination(1000, TimeUnit.SECONDS);   
  13.   
  14.     for (int i = 1; i < SIZE; i++) {   
  15.         assertTrue(array[i - 1] < array[i]);   
  16.     }   
  17. }  
相关文章
相关标签/搜索