Fork/Join框架是Java7提供了的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每一个小任务结果后获得大任务结果的框架,这种开发方法也叫分治编程。分治编程能够极大地利用CPU资源,提升任务执行的效率,也是目前与多线程有关的前沿技术。java
分治的原理上面已经介绍了,就是切割大任务成小任务来完成。看起来好像也不难实现啊!为何专门弄一个新的框架呢?算法
咱们先看一下,在不使用 Fork-Join 框架时,使用普通的线程池是怎么实现的。编程
咦,好像一切都很美好。真的吗?别忘了, 每个切割任务的线程(如线程A)都被阻塞了,直到其子任务完成,才能继续往下运行 。若是任务太大了,须要切割屡次,那么就会有多个线程被阻塞,性能将会急速降低。更糟糕的是,若是你的线程池的线程数量是有上限的,很可能会形成池中全部线程被阻塞,线程池没法执行任务。多线程
public class CountTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
//建立一个计算任务,计算 由1加到12
CountTask countTask = new CountTask(1, 12);
Future<Integer> future = forkJoinPool.submit(countTask);
System.out.println("最终的计算结果:" + future.get());
}
}
class CountTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 2;
private int start;
private int end;
public CountTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
boolean canCompute = (end - start) <= THRESHOLD;
//任务已经足够小,能够直接计算,并返回结果
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
System.out.println("执行计算任务,计算 " + start + "到 " + end + "的和 ,结果是:" + sum + " 执行此任务的线程:" + Thread.currentThread().getName());
return sum;
} else { //任务过大,须要切割
System.out.println("任务过大,切割的任务: " + start + "加到 " + end + "的和 执行此任务的线程:" + Thread.currentThread().getName());
int middle = (start + end) / 2;
//切割成两个子任务
CountTask leftTask = new CountTask(start, middle);
CountTask rightTask = new CountTask(middle + 1, end);
//执行子任务
leftTask.fork();
rightTask.fork();
//等待子任务的完成,并获取执行结果
invokeAll(leftTask,rightTask);
// int leftResult = leftTask.join();
// int rightResult = rightTask.join();
//合并子任务
// sum = leftResult + rightResult;
// return sum;
return leftTask.join()+rightTask.join();
}
}
}
复制代码
运行结果:并发
切割的任务:1加到10 执行此任务的线程是 pool-1-thread-1
切割的任务:1加到5 执行此任务的线程是 pool-1-thread-2
切割的任务:6加到10 执行此任务的线程是 pool-1-thread-3
复制代码
池的线程只有三个,当任务分割了三次后,池中的线程也就都被阻塞了,没法再执行任何任务,一直卡着动不了。框架
为了解决这个问题,工做窃取算法呼之欲出异步
针对上面的问题,Fork-Join 框架使用了“工做窃取(work-stealing)”算法。工做窃取(work-stealing)算法是指某个线程从其余队列里窃取任务来执行。在《Java 并发编程的艺术》对工做窃取算法的解释:ide
使用工做窃取算法有什么优点呢?假如咱们须要作一个比较大的任务,咱们能够把这个任务分割为若干互不依赖的子任务,为了减小线程间的竞争,因而把这些子任务分别放到不一样的队列里,并为每一个队列建立一个单独的线程来执行队列里的任务,线程和队列一一对应,好比A线程负责处理A队列里的任务。可是有的线程会先把本身队列里的任务干完,而其余线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其余线程干活,因而它就去其余线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,因此为了减小窃取任务线程和被窃取任务线程之间的竞争,一般会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。性能
Fork-Join 框架中的工做窃取算法的优势能够总结为如下两点:this
Fork/Join有三个核心类:
由于ForkJoinTask比较复杂,抽象方法比较多,平常使用时通常不会继承ForkJoinTask来实现自定义的任务,而是继承ForkJoinTask的两个子类,实现 compute() 方法:
RecursiveTask: 子任务带返回结果时使用
RecursiveAction: 子任务不带返回结果时使用
复制代码
compute 方法的实现模式通常是:
if 任务足够小
直接返回结果
else
分割成N个子任务
依次调用每一个子任务的fork方法执行子任务
依次调用每一个子任务的join方法合并执行结果
复制代码
计算 1+2+....+12 的结果。
使用Fork/Join框架首先要考虑到的是如何分割任务,若是咱们但愿每一个子任务最多执行两个数的相加,那么咱们设置分割的阈值是2,因为是12个数字相加。同时,观察执行任务的线程名称,理解工做窃取算法的实现。
public class CountTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
//建立一个计算任务,计算 由1加到12
CountTask countTask = new CountTask(1, 12);
Future<Integer> future = forkJoinPool.submit(countTask);
System.out.println("最终的计算结果:" + future.get());
}
}
class CountTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 2;
private int start;
private int end;
public CountTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
boolean canCompute = (end - start) <= THRESHOLD;
//任务已经足够小,能够直接计算,并返回结果
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
System.out.println("执行计算任务,计算 " + start + "到 " + end + "的和 ,结果是:" + sum + " 执行此任务的线程:" + Thread.currentThread().getName());
} else { //任务过大,须要切割
System.out.println("任务过大,切割的任务: " + start + "加到 " + end + "的和 执行此任务的线程:" + Thread.currentThread().getName());
int middle = (start + end) / 2;
//切割成两个子任务
CountTask leftTask = new CountTask(start, middle);
CountTask rightTask = new CountTask(middle + 1, end);
//执行子任务
leftTask.fork();
rightTask.fork();
//等待子任务的完成,并获取执行结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
//合并子任务
sum = leftResult + rightResult;
}
return sum;
}
}
复制代码
运行结果:
任务过大,切割的任务: 1加到 12的和 执行此任务的线程:ForkJoinPool-1-worker-1
任务过大,切割的任务: 7加到 12的和 执行此任务的线程:ForkJoinPool-1-worker-3
任务过大,切割的任务: 1加到 6的和 执行此任务的线程:ForkJoinPool-1-worker-2
执行计算任务,计算 7到 9的和 ,结果是:24 执行此任务的线程:ForkJoinPool-1-worker-3
执行计算任务,计算 1到 3的和 ,结果是:6 执行此任务的线程:ForkJoinPool-1-worker-1
执行计算任务,计算 4到 6的和 ,结果是:15 执行此任务的线程:ForkJoinPool-1-worker-1
执行计算任务,计算 10到 12的和 ,结果是:33 执行此任务的线程:ForkJoinPool-1-worker-3
最终的计算结果:78
复制代码
从结果能够看出,提交的计算任务是由线程1执行,线程1进行了第一次切割,切割成两个子任务 “7加到12“ 和 ”1加到6“,并提交这两个子任务。而后这两个任务便被 线程二、线程3 给窃取了。线程1 的内部队列中已经没有任务了,这时候,线程二、线程3 也分别进行了一次任务切割并各自提交了两个子任务,因而线程1也去窃取任务(这里窃取的都是线程2的子任务)。
遍历指定目录(含子目录)找寻指定类型文件
public class FindDirsFiles extends RecursiveAction{
/** * 当前任务须要搜寻的目录 */
private File path;
public FindDirsFiles(File path) {
this.path = path;
}
public static void main(String [] args){
try {
// 用一个 ForkJoinPool 实例调度总任务
ForkJoinPool pool = new ForkJoinPool();
FindDirsFiles task = new FindDirsFiles(new File("D:/"));
//异步调用
pool.execute(task);
System.out.println("Task is Running......");
Thread.sleep(1);
int otherWork = 0;
for(int i=0;i<1000000;i++){
otherWork = otherWork+i;
}
System.out.println("Main Thread done sth......,otherWork=" + otherWork);
//阻塞的方法
task.join();
System.out.println("Task end");
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
protected void compute() {
List<FindDirsFiles> subTasks = new ArrayList<>();
File[] files = path.listFiles();
if(files!=null) {
for(File file:files) {
if(file.isDirectory()) {
subTasks.add(new FindDirsFiles(file));
}else {
//遇到文件,检查
if(file.getAbsolutePath().endsWith("txt")) {
System.out.println("文件:"+file.getAbsolutePath());
}
}
}
if(!subTasks.isEmpty()) {
for (FindDirsFiles subTask : invokeAll(subTasks)) {
//等待子任务执行完成
subTask.join();
}
}
}
}
}
复制代码