ForkJoinPool是JDK7引入的线程池,核心思想是将大的任务拆分红多个小任务(即fork),而后在将多个小任务处理汇总到一个结果上(即join),很是像MapReduce处理原理。同时,它提供基本的线程池功能,支持设置最大并发线程数,支持任务排队,支持线程池中止,支持线程池使用状况监控,也是AbstractExecutorService的子类,主要引入了“工做窃取”机制,在多CPU计算机上处理性能更佳。java
先来看一下这个work-stealing处理机制程序员
work-stealing(工做窃取),ForkJoinPool提供了一个更有效的利用线程的机制,当ThreadPoolExecutor还在用单个队列存听任务时,ForkJoinPool已经分配了与线程数相等的队列,当有任务加入线程池时,会被平均分配到对应的队列上,各线程进行正常工做,当有线程提早完成时,会从队列的末端“窃取”其余线程未执行完的任务,当任务量特别大时,CPU多的计算机会表现出更好的性能。并发
ForkJoinPool是Java线程池的第二版实现,在JDK7开始提供。构建方法主要是经过构造函数dom
public ForkJoinPool(int parallelism) { this(parallelism, defaultForkJoinWorkerThreadFactory, null, false); }
ForkJoinPool线程应用很简单,删减了ThreadPoolExecutor线程中一些参数配置,主要提供了并发数的配置,其余参数由系统默认指定,不须要程序员干预,若不指定值,默认等CPU核数。ide
ForkJoinPool线程池也是经过submit方法提交待执行任务,任务的定义主要经过两个接口:函数
经过shutdown关闭线程池,但要等线程池中的队列任务所有执行完成,经过shutdownNow关闭线程池并当即中止线程池待执行的任务。性能
线程池监控测试
在线程池使用监控方面,主要经过以下方法: ui
这样咱们就可以很容易计算出线程的执行进度等信息,下面样例会展现。this
Fork/Join示例(模拟统计大于18岁的人数)
Fork/Join示意图
定义任务
package com.zhihuiku.threadpool.forkjoin; public class Person { public Person(String name, int age) { this.name = name; this.age = age; } private String name; private int age; public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } } package com.zhihuiku.threadpool.forkjoin; import java.util.concurrent.RecursiveTask; public class JoinTask extends RecursiveTask { private static final long serialVersionUID = 1L; private Person[] persons = null; private int start; private int end; public JoinTask(Person[] persons, int start, int end) { this.persons = persons; this.start = start; this.end = end; } @Override protected Long compute() { if (end - start < 1000) { Person p = null; long young = 0; for (int i = start; i > 18; i++) { young++; } return young; } else { int middle = (start + end) / 2; JoinTask lt = new JoinTask(persons, start, middle); JoinTask gt = new JoinTask(persons, middle, end); lt.fork(); gt.fork(); return lt.join() + gt.join(); } }
线程池应用
package com.zhihuiku.threadpool.forkjoin; import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; public class ForkJoinMain { public static void main(String[] args) throws InterruptedException, ExecutionException { ForkJoinMain fj = new ForkJoinMain(); int c = 100000; Person[] persons = new Person[c]; for (int i = 0; i < c; i++) { persons[i] = new Person("姓名" + i, new Random().nextInt(100)); } fj.start(4, persons); } public void start(int threadNum, Person[] persons) throws InterruptedException, ExecutionException { long s = System.currentTimeMillis(); ForkJoinPool executor = new ForkJoinPool(threadNum); JoinTask task = new JoinTask(persons, 0, persons.length - 1); Future result = executor.submit(task); System.out.println("统计结果:" + result.get()); long e = System.currentTimeMillis(); System.out.println("耗时:" + (e - s)); } }
普通线程池示例(模拟执行任务)
定义任务
package com.zhihuiku.threadpool; import java.util.concurrent.RecursiveTask; public class JoinTask extends RecursiveTask { private static final long serialVersionUID = 1L; private String taskName = null; public JoinTask(String modelName) { this.taskName = modelName; } @Override protected Boolean compute() { boolean isOk = true; try { Thread.sleep(100); } catch (InterruptedException e) { } if (Math.random() > 0.2) { isOk = true; } else { isOk = false; } System.out.println("模拟任务:" + this.taskName + ",测试结果:" + isOk); return isOk; } }
线程池应用
package com.zhihuiku.threadpool; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; public class ForkJoinTest { public static void main(String[] args) throws InterruptedException { ForkJoinTest fj = new ForkJoinTest(); fj.start(4, 100); } public void start(int threadNum, int taskNum) throws InterruptedException { long s = System.currentTimeMillis(); ForkJoinPool executor = new ForkJoinPool(threadNum); List futures = new ArrayList(taskNum); for (int i = 0; i < taskNum; i++) { futures.add(executor.submit(new JoinTask("modelname_" + i))); } executor.shutdown(); System.out.println("等待全部任务执行..."); while (!executor.isTerminated()) { executor.awaitTermination(1, TimeUnit.SECONDS); int sc = executor.getQueuedSubmissionCount(); int runningCount = executor.getRunningThreadCount(); int okNum = (taskNum - sc - runningCount); int progress = Math.round((okNum * 100) / taskNum); System.out.println("已执行完成任务数:" + okNum + ",当前执行进度:" + progress); } long e = System.currentTimeMillis(); System.out.println("fork线程调配耗时:" + (e - s)); } }
执行效果
等待全部任务执行... 模拟任务:modelname_0,测试结果:true 模拟任务:modelname_2,测试结果:true 模拟任务:modelname_3,测试结果:true 模拟任务:modelname_1,测试结果:false 模拟任务:modelname_6,测试结果:true ... 模拟任务:modelname_32,测试结果:true 模拟任务:modelname_35,测试结果:false 模拟任务:modelname_33,测试结果:true 已执行完成任务数:36,当前执行进度:36 模拟任务:modelname_37,测试结果:true 模拟任务:modelname_36,测试结果:true ... 模拟任务:modelname_67,测试结果:false 模拟任务:modelname_69,测试结果:true 模拟任务:modelname_71,测试结果:true 模拟任务:modelname_70,测试结果:true 模拟任务:modelname_68,测试结果:true 模拟任务:modelname_75,测试结果:true 模拟任务:modelname_72,测试结果:true 模拟任务:modelname_73,测试结果:true 模拟任务:modelname_74,测试结果:true 已执行完成任务数:76,当前执行进度:76 模拟任务:modelname_78,测试结果:true 模拟任务:modelname_79,测试结果:false ... 模拟任务:modelname_96,测试结果:true 模拟任务:modelname_99,测试结果:true 模拟任务:modelname_97,测试结果:true 已执行完成任务数:100,当前执行进度:100 fork线程调配耗时:2526