说到Fork/Join 框架 ,得不提起执行器框架(Executor Framework),它将任务的建立和执行进行了分离,经过Executor Framework只须要实现Runnable接口的对象和使用Executor对象,而后将Runnable 对象发送个执行器。执行器在负责运行这些任务所须要的线程,包括线程的建立,线程的管理以及线程的结束。html
java 7 又更近了一步,它包括了ExecutorService 接口的另外一种实现,用来解决特殊类型的问题。就是Fork/Join 框架,有时也称为”分解/合并框架“。java
Fork/Join 框架,是用来解决可以经过分治技术,将问题拆分为小任务的问题。它和执行器框架的只要区别在于工做窃取算法。算法
下面实现简单的例子,咱们实现一项更新产品价格的任务。最初的任务将负责更新列表中的全部的元素的价格。若是一个任务须要更新大于10个元素,将分为两个部分去执行。而后再去更新各自部分的产品价格。框架
一、建立一个产品类,用来存储产品的名称和价格 package five2; /** * * @author qpx * */ public class Product { private String name; private double price; public String getName() { return name; } public void setName(String name) { this.name = name; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } }
二、建立生成一个随机产品列表的类异步
package five2; import java.util.ArrayList; import java.util.Arrays; import java.util.List; /** * 随机产品列表 * @author qpx * */ public class productListGenetator { public List<Product> genetate(int size){ List<Product> products = new ArrayList<>(); for(int i = 0 ;i<size;i++){ Product product = new Product(); product.setName("Product"+i); product.setPrice(10); products.add(product); } return products; } public static void main(String[] args) { List<Integer> aaa = new ArrayList<Integer>(); aaa.add(1); aaa.add(0, 2); System.out.println(Arrays.toString(aaa.toArray())); } }
三、建立一个Task 的类,继承RecursiveAction 类,这个是主任务类ide
package five2; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.concurrent.TimeUnit; public class Task extends RecursiveAction { /** * */ private static final long serialVersionUID = 1L; private List<Product> products; private int first; private int last; private double increment; public Task(List<Product> products, int first, int last, double increment) { super(); this.products = products; this.first = first; this.last = last; this.increment = increment; } @Override protected void compute() { // TODO Auto-generated method stub‘ if (this.last - this.first < 10) { updateprices(); } else { int middle = (last + first) / 2+1; //System.out.println("middle:"+middle); System.out.printf(Thread.currentThread().getName()+" Task:Pending tasks:%s\n", getQueuedTaskCount()); Task t1 = new Task(products, first, (middle), increment); //System.out.println("t1:first:"+first+",last:"+middle); Task t2 = new Task(products, middle, last, increment); //System.out.println("t2:first:"+middle+",last:"+last); invokeAll(t1, t2); } } private void updateprices() { // TODO Auto-generated method stub for (int i = first; i < last; i++) { Product product = products.get(i); product.setPrice(product.getPrice() * (1 + increment)); System.out.println(Thread.currentThread().getName() + "的i值:" + i); } } public static void main(String[] args) throws InterruptedException { productListGenetator a = new productListGenetator(); List<Product> products = a.genetate(1000); Task task = new Task(products,0,products.size(),0.20); //ForkJoinPool pool = new ForkJoinPool(10); ForkJoinPool pool = new ForkJoinPool(); pool.submit(task); do{ System.out.printf("Main: 线程数量:%d\n",pool.getActiveThreadCount()); System.out.printf("Main: Thread 窃取数量:%d\n",pool.getStealCount()); System.out.printf("Main: Thread 平行数量:%d\n",pool.getParallelism()); TimeUnit.MILLISECONDS.sleep(5); }while(!task.isDone()); pool.shutdown(); if(task.isCompletedNormally()){ System.out.printf("Main: The process has completed normally.\n"); } for(int i = 0;i<products.size();i++){ Product p = products.get(i); if(p.getPrice()!=12){ System.out.printf("Product %s:%f\n",p.getName(),p.getName()); } } System.out.printf("Main:End of the Program.\n"); } }
注意:咱们采用了无参的构造方式建立了this
ForkJoinPool pool = new ForkJoinPool(); 他将执行默认的配置,建立一个线程书等于计算机CPU数目的线程池。
另外ForkJionPool 类还提供了如下方法用于执行任务。spa
execute(Runnabletask) 注意的是使用Runnable对象时,ForkJionPool 不会采用工做窃取算法。仅仅实用ForkJoinTask类的时候采用工做窃取算法 invoke(ForkJoinTask<T> list) execute方法是异步调用的,此方法是同步调用的