Java 多线程中的任务分解机制-ForkJoinPool,以及CompletableFuture

ForkJoinPool的优点在于,能够充分利用多cpu,多核cpu的优点,把一个任务拆分红多个“小任务”,把多个“小任务”放到多个处理器核心上并行执行;当多个“小任务”执行完成以后,再将这些执行结果合并起来便可。html

Java7 提供了ForkJoinPool来支持将一个任务拆分红多个“小任务”并行计算,再把多个“小任务”的结果合并成总的计算结果。java

ForkJoinPool是ExecutorService的实现类,所以是一种特殊的线程池。算法

使用方法:建立了ForkJoinPool实例以后,就能够调用ForkJoinPool的submit(ForkJoinTask<T> task) 或invoke(ForkJoinTask<T> task)方法来执行指定任务了。编程

其中ForkJoinTask表明一个能够并行、合并的任务。ForkJoinTask是一个抽象类,它还有两个抽象子类:RecusiveAction和RecusiveTask其中RecusiveTask表明有返回值的任务,而RecusiveAction表明没有返回值的任务设计模式

Code:多线程

RecusiveAction实现方法:dom

package com.qhong.thread.ForkJoinPoolDemo; import java.util.Random; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.concurrent.TimeUnit; public class ForkJoinPoolDemo extends RecursiveAction { private static final long serialVersionUID = 1L; //定义一个分解任务的阈值——50,即一个任务最多承担50个工做量
    private int THRESHOLD=50; //任务量
    private int task_Num=0; ForkJoinPoolDemo(int Num){ this.task_Num=Num; } public static void main (String[] args) throws Exception { //建立一个支持分解任务的线程池ForkJoinPool
        ForkJoinPool pool=new ForkJoinPool(); ForkJoinPoolDemo task=new ForkJoinPoolDemo(120); pool.submit(task); pool.awaitTermination(20, TimeUnit.SECONDS);//等待20s,观察结果
 pool.shutdown(); } /** * @author qhong * @param * @return * @date 2018/4/18 17:13 * @description 实现recursiveAction中抽象方法 */ @Override protected void compute() { if(task_Num<=THRESHOLD){ System.out.println(Thread.currentThread().getName()+"承担了"+task_Num+"份工做"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }else{ //随机解成两个任务
            Random m=new Random(); int x=m.nextInt(50); ForkJoinPoolDemo left=new ForkJoinPoolDemo(x); ForkJoinPoolDemo right=new ForkJoinPoolDemo(task_Num-x); left.fork(); right.fork(); } } }

Output:异步

ForkJoinPool-1-worker-1承担了6份工做 ForkJoinPool-1-worker-2承担了2份工做 ForkJoinPool-1-worker-3承担了30份工做 ForkJoinPool-1-worker-0承担了9份工做 ForkJoinPool-1-worker-1承担了46份工做 ForkJoinPool-1-worker-2承担了17份工做 ForkJoinPool-1-worker-0承担了0份工做 ForkJoinPool-1-worker-3承担了10份工做

RecusiveTask的具体实现:ide

package com.qhong.thread.ForkJoinPoolDemo; import java.util.Arrays; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; import java.util.stream.LongStream; /** * @author qhong * @date 2018/4/18 16:14 * @description **/
public class ForkJoinCalculator implements Calculator { private ForkJoinPool pool; public ForkJoinCalculator() { // 也可使用公用的 ForkJoinPool: // pool = ForkJoinPool.commonPool()
        pool = new ForkJoinPool(); } public static void main(String[] args) { ForkJoinCalculator forkJoinCalculator=new ForkJoinCalculator(); long[] numbers=LongStream.range(1,20).toArray(); System.out.println(Arrays.toString(numbers)); long result=forkJoinCalculator.sumUp(numbers); System.out.println("result:"+result); } private static class SumTask extends RecursiveTask<Long> { private long[] numbers; private int from; private int to; public SumTask(long[] numbers, int from, int to) { this.numbers = numbers; this.from = from; this.to = to; } @Override protected Long compute() { // 当须要计算的数字小于6时,直接计算结果
            if (to - from < 4) { long total = 0; for (int i = from; i <= to; i++) { total += numbers[i]; } System.out.println(String.format("currentThread:%s,total:%s,from:%s,to:%s",Thread.currentThread().getName(),total,from,to)); return total; // 不然,把任务一分为二,递归计算
            } else { int middle = (from + to) / 2; SumTask taskLeft = new SumTask(numbers, from, middle); SumTask taskRight = new SumTask(numbers, middle+1, to); taskLeft.fork(); taskRight.fork(); return taskLeft.join() + taskRight.join(); } } } @Override public long sumUp(long[] numbers) { return pool.invoke(new SumTask(numbers, 0, numbers.length-1)); } }

Output:函数式编程

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19] currentThread:ForkJoinPool-1-worker-2,total:6,from:0,to:2 currentThread:ForkJoinPool-1-worker-3,total:36,from:10,to:12 currentThread:ForkJoinPool-1-worker-2,total:9,from:3,to:4 currentThread:ForkJoinPool-1-worker-3,total:29,from:13,to:14 currentThread:ForkJoinPool-1-worker-2,total:21,from:5,to:7 currentThread:ForkJoinPool-1-worker-3,total:70,from:15,to:18 currentThread:ForkJoinPool-1-worker-2,total:19,from:8,to:9 result:190

分析:

根据上面的示例代码,能够看出 fork() 和 join() 是 Fork/Join Framework “魔法”的关键。咱们能够根据函数名假设一下 fork() 和 join() 的做用:

  • fork():开启一个新线程(或是重用线程池内的空闲线程),将任务交给该线程处理。
  • join():等待该任务的处理线程处理完毕,得到返回值。

并非每一个 fork() 都会促成一个新线程被建立,而每一个 join() 也不是必定会形成线程被阻塞。

Fork/Join Framework 的实现算法并非那么“显然”,而是一个更加复杂的算法——这个算法的名字就叫作 work stealing 算法

  • ForkJoinPool 的每一个工做线程都维护着一个工做队列WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务ForkJoinTask)。
  • 每一个工做线程在运行中产生新的任务(一般是由于调用了 fork())时,会放入工做队列的队尾,而且工做线程在处理本身的工做队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。
  • 每一个工做线程在处理本身的工做队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其余工做线程的工做队列),窃取的任务位于其余线程的工做队列的队首,也就是说工做线程在窃取其余工做线程的任务时,使用的是 FIFO 方式。
  • 在遇到 join() 时,若是须要 join 的任务还没有完成,则会先处理其余任务,并等待其完成。
  • 在既没有本身的任务,也没有能够窃取的任务时,进入休眠。

fork() 作的工做只有一件事,既是把任务推入当前工做线程的工做队列里

join() 的工做则复杂得多,也是 join() 可使得线程免于被阻塞的缘由——不像同名的 Thread.join()

  1. 检查调用 join() 的线程是不是 ForkJoinThread 线程。若是不是(例如 main 线程),则阻塞当前线程,等待任务完成。若是是,则不阻塞。
  2. 查看任务的完成状态,若是已经完成,直接返回结果。
  3. 若是任务还没有完成,但处于本身的工做队列内,则完成它。
  4. 若是任务已经被其余的工做线程偷走,则窃取这个小偷的工做队列内的任务(以 FIFO 方式),执行,以期帮助它早日完成欲 join 的任务。
  5. 若是偷走任务的小偷也已经把本身的任务所有作完,正在等待须要 join 的任务时,则找到小偷的小偷,帮助它完成它的任务。
  6. 递归地执行第5步。

所谓work-stealing模式,即每一个工做线程都会有本身的任务队列。当工做线程完成了本身全部的工做后,就会去“偷”别的工做线程的任务。

假如咱们须要作一个比较大的任务,咱们能够把这个任务分割为若干互不依赖的子任务,为了减小线程间的竞争,因而把这些子任务分别放到不一样的队列里,并为每一个队列建立一个单独的线程来执行队列里的任务,线程和队列一一对应,好比A线程负责处理A队列里的任务。可是有的线程会先把本身队列里的任务干完,而其余线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其余线程干活,因而它就去其余线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,因此为了减小窃取任务线程和被窃取任务线程之间的竞争,一般会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

submit

其实除了前面介绍过的每一个工做线程本身拥有的工做队列之外,ForkJoinPool 自身也拥有工做队列,这些工做队列的做用是用来接收由外部线程(非 ForkJoinThread 线程)提交过来的任务,而这些工做队列被称为 submitting queue 。

submit() 和 fork() 其实没有本质区别,只是提交对象变成了 submitting queue 而已(还有一些同步,初始化的操做)。submitting queue 和其余 work queue 同样,是工做线程”窃取“的对象,所以当其中的任务被一个工做线程成功窃取时,就意味着提交的任务真正开始进入执行阶段。

 

ForkJoinPool与ThreadPoolExecutor区别:

1.ForkJoinPool中的每一个线程都会有一个队列,而ThreadPoolExecutor只有一个队列,并根据queue类型不一样,细分出各类线程池

2.ForkJoinPool可以使用数量有限的线程来完成很是多的具备父子关系的任务,ThreadPoolExecutor中根本没有什么父子关系任务

3.ForkJoinPool在使用过程当中,会建立大量的子任务,会进行大量的gc,可是ThreadPoolExecutor不须要,所以单线程(或者任务分配平均)

4.ForkJoinPool在多任务,且任务分配不均是有优点,可是在单线程或者任务分配均匀的状况下,效率没有ThreadPoolExecutor高,毕竟要进行大量gc子任务

 

ForkJoinPool在多线程状况下,可以实现工做窃取(Work Stealing),在该线程池的每一个线程中会维护一个队列来存放须要被执行的任务。当线程自身队列中的任务都执行完毕后,它会从别的线程中拿到未被执行的任务并帮助它执行。

ThreadPoolExecutor由于它其中的线程并不会关注每一个任务之间任务量的差别。当执行任务量最小的任务的线程执行完毕后,它就会处于空闲的状态(Idle),等待任务量最大的任务执行完毕。

所以多任务在多线程中分配不均时,ForkJoinPool效率高。

 

stream中应用ForkJoinPool

Arrays.asList("a1", "a2", "b1", "c2", "c1") .parallelStream() .filter(s -> { System.out.format("filter: %s [%s]\n", s, Thread.currentThread().getName()); return true; }) .map(s -> { System.out.format("map: %s [%s]\n", s, Thread.currentThread().getName()); return s.toUpperCase(); }) .sorted((s1, s2) -> { System.out.format("sort: %s <> %s [%s]\n", s1, s2, Thread.currentThread().getName()); return s1.compareTo(s2); }) .forEach(s -> System.out.format("forEach: %s [%s]\n", s, Thread.currentThread().getName()));

parallelStream让部分Java代码自动地以并行的方式执行

最后:

有一点要注意,就是手动设置ForkJoinPool的线程数量时,实际线程数为设置的线程数+1,由于还有一个main主线程

即便将ForkJoinPool的通用线程池的线程数量设置为1,实际上也会有2个工做线程。所以线程数为1的ForkJoinPool通用线程池和线程数为2的ThreadPoolExecutor是等价的。

与ForkJoinPool对应的是CompletableFuture

Future以及相关使用方法提供了异步执行任务的能力,可是对于结果的获取倒是很不方便,只能经过阻塞或者轮询的方式获得任务的结果。

阻塞的方式显然和咱们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,并且也不能及时地获得计算结果

CompletableFuture就是利用观察者设计模式当计算结果完成及时通知监听者

在Java 8中, 新增长了一个包含50个方法左右的类: CompletableFuture,提供了很是强大的Future的扩展功能,能够帮助咱们简化异步编程的复杂性,提供了函数式编程的能力,能够经过回调的方式处理计算结果,而且提供了转换和组合CompletableFuture的方法。

具体讲解连接:http://colobu.com/2016/02/29/Java-CompletableFuture/

http://colobu.com/2018/03/12/20-Examples-of-Using-Java%E2%80%99s-CompletableFuture/

 

 

http://www.cnblogs.com/lixuwu/p/7979480.html#undefined

http://blog.dyngr.com/blog/2016/09/15/java-forkjoinpool-internals/

https://www.jianshu.com/p/8d7e3cc892cf

https://blog.csdn.net/dm_vincent/article/details/39505977

相关文章
相关标签/搜索