Fork/Join框架的核心是由下列两个类组成的。java
ForkJoinPool:这个类实现了 ExecutorServcie接口和工做窃取算法(work-Stealing Algorithm)。他管理工组者线程,并提供任务的状态信息,以及任务的执行信息。算法
ForkJoinTask:这个类是一个将在ForkJoinPool中执行的任务的基类。app
Fork/Join框架提供了在一个任务里执行fork()和join()操做的机制和控制状态的方法。一般,为了实现Fork/Join任务,须要实现一个如下两个类之一的子类。
框架
RecursiveAction: 用于惹怒没有返回结果的场景。less
RecursiveTask: 用于任务有返回结果的场景。ide
Code:this
package com.packtpub.java7.concurrency.chapter5.recipe01.util; /** * This class stores the data of a Product. It's name and it's price * */ public class Product { /** * Name of the product */ private String name; /** * Price of the product */ private double price; /** * This method returns the name of the product * @return the name of the product */ public String getName() { return name; } /** * This method establish the name of the product * @param name the name of the product */ public void setName(String name) { this.name = name; } /** * This method returns the price of the product * @return the price of the product */ public double getPrice() { return price; } /** * This method establish the price of the product * @param price the price of the product */ public void setPrice(double price) { this.price = price; } }
package com.packtpub.java7.concurrency.chapter5.recipe01.util; import java.util.ArrayList; import java.util.List; /** * This class generates a product list of a determined size. * Each product is initialized with a predefined name and price. * */ public class ProductListGenerator { /** * This method generates the list of products * @param size the size of the product list * @return the generated list of products */ public List<Product> generate (int size) { List<Product> ret=new ArrayList<Product>(); for (int i=0; i<size; i++){ Product product=new Product(); product.setName("Product "+i); product.setPrice(10); ret.add(product); } return ret; } }
package com.packtpub.java7.concurrency.chapter5.recipe01.task; import java.util.List; import java.util.concurrent.RecursiveAction; import com.packtpub.java7.concurrency.chapter5.recipe01.util.Product; /** * This class implements the tasks that are going to update the * price information. If the assigned interval of values is less that 10, it * increases the prices of the assigned products. In other case, it divides * the assigned interval in two, creates two new tasks and execute them * */ public class Task extends RecursiveAction { /** * serial version UID. The ForkJoinTask class implements the serializable interface. */ private static final long serialVersionUID = 1L; /** * List of products */ private List<Product> products; /** * Fist and Last position of the interval assigned to the task */ private int first; private int last; /** * Increment in the price of products this task has to apply */ private double increment; /** * Constructor of the class. Initializes its attributes * @param products list of products * @param first first element of the list assigned to the task * @param last last element of the list assigned to the task * @param increment price increment that this task has to apply */ public Task (List<Product> products, int first, int last, double increment) { this.products=products; this.first=first; this.last=last; this.increment=increment; } /** * Method that implements the job of the task * 若是last 和 first属性值的差别大于10,就建立两个新的Task对象,一个处理前一半的产品,另外一个处理后一半的产品 * 而后调用ForkJoinPool的invokeAll()方法来执行这两个新的任务。 */ @Override protected void compute() { if (last-first<10) { updatePrices(); } else { int middle=(last+first)/2; System.out.printf("Task: Pending tasks: %s\n",getQueuedTaskCount()); Task t1=new Task(products, first,middle+1, increment); Task t2=new Task(products, middle+1,last, increment); invokeAll(t1, t2); } } /** * Method that updates the prices of the assigned products to the task */ private void updatePrices() { for (int i=first; i<last; i++){ Product product=products.get(i); product.setPrice(product.getPrice()*(1+increment)); } } }
package com.packtpub.java7.concurrency.chapter5.recipe01.core; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; import com.packtpub.java7.concurrency.chapter5.recipe01.task.Task; import com.packtpub.java7.concurrency.chapter5.recipe01.util.Product; import com.packtpub.java7.concurrency.chapter5.recipe01.util.ProductListGenerator; /** * Main class of the example. It creates a list of products, a ForkJoinPool and * a task to execute the actualization of products. * */ public class Main { /** * Main method of the example * @param args */ public static void main(String[] args) { // Create a list of products ProductListGenerator generator=new ProductListGenerator(); List<Product> products=generator.generate(10000); // Craete a task Task task=new Task(products, 0, products.size(), 0.20); // Create a ForkJoinPool ForkJoinPool pool=new ForkJoinPool(); // Execute the Task pool.execute(task); // Write information about the pool do { System.out.printf("Main: Thread Count: %d\n",pool.getActiveThreadCount()); System.out.printf("Main: Thread Steal: %d\n",pool.getStealCount()); System.out.printf("Main: Paralelism: %d\n",pool.getParallelism()); try { TimeUnit.MILLISECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } } while (!task.isDone()); // Shutdown the pool pool.shutdown(); // Check if the task has completed normally if (task.isCompletedNormally()){ System.out.printf("Main: The process has completed normally.\n"); } // Expected result: 12. Write products which price is not 12 for (int i=0; i<products.size(); i++){ Product product=products.get(i); if (product.getPrice()!=12) { System.out.printf("Product %s: %f\n",product.getName(),product.getPrice()); } } // End of the program System.out.println("Main: End of the program.\n"); } }