Fork/Join 框架(一):建立Fork/Join

说到Fork/Join 框架 ,得不提起执行器框架(Executor Framework),它将任务的建立和执行进行了分离,经过Executor Framework只须要实现Runnable接口的对象和使用Executor对象,而后将Runnable 对象发送个执行器。执行器在负责运行这些任务所须要的线程,包括线程的建立,线程的管理以及线程的结束。html

java 7 又更近了一步,它包括了ExecutorService 接口的另外一种实现,用来解决特殊类型的问题。就是Fork/Join 框架,有时也称为”分解/合并框架“。java

Fork/Join 框架,是用来解决可以经过分治技术,将问题拆分为小任务的问题。它和执行器框架的只要区别在于工做窃取算法。算法

下面实现简单的例子,咱们实现一项更新产品价格的任务。最初的任务将负责更新列表中的全部的元素的价格。若是一个任务须要更新大于10个元素,将分为两个部分去执行。而后再去更新各自部分的产品价格。框架

一、建立一个产品类,用来存储产品的名称和价格
package five2;
/**
 * 
 * @author qpx
 *
 */
public class Product {
	
	private String name;
	private double price;
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public double getPrice() {
		return price;
	}
	public void setPrice(double price) {
		this.price = price;
	}
	
	

}

  二、建立生成一个随机产品列表的类异步

package five2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * 随机产品列表
 * @author qpx
 *
 */
public class productListGenetator {
	
	public List<Product> genetate(int size){
		
		List<Product> products = new ArrayList<>();
		for(int i = 0 ;i<size;i++){
			Product product = new Product();
			product.setName("Product"+i);
			product.setPrice(10);
			products.add(product);
			
			
		}
		return products;
		
	}
	
	public static void main(String[] args) {
		List<Integer> aaa = new ArrayList<Integer>();
		aaa.add(1);
		aaa.add(0, 2);
		
		System.out.println(Arrays.toString(aaa.toArray()));
	}

}

 三、建立一个Task 的类,继承RecursiveAction 类,这个是主任务类ide

package five2;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

public class Task extends RecursiveAction {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	private List<Product> products;

	private int first;
	private int last;

	private double increment;

	public Task(List<Product> products, int first, int last, double increment) {
		super();
		this.products = products;
		this.first = first;
		this.last = last;
		this.increment = increment;
	}

	@Override
	protected void compute() {
		// TODO Auto-generated method stub‘
		if (this.last - this.first < 10) {

			updateprices();
		} else {
			int middle = (last + first) / 2+1;
			//System.out.println("middle:"+middle);
			System.out.printf(Thread.currentThread().getName()+"  Task:Pending tasks:%s\n", getQueuedTaskCount());
			Task t1 = new Task(products, first, (middle), increment);
			//System.out.println("t1:first:"+first+",last:"+middle);
			Task t2 = new Task(products, middle, last, increment);
			//System.out.println("t2:first:"+middle+",last:"+last);
			invokeAll(t1, t2);

		}

	}

	private void updateprices() {
		// TODO Auto-generated method stub
		for (int i = first; i < last; i++) {
			Product product = products.get(i);
			product.setPrice(product.getPrice() * (1 + increment));
			System.out.println(Thread.currentThread().getName() + "的i值:"    
                    + i);    

		}
	}

	public static void main(String[] args) throws InterruptedException {
			productListGenetator a = new productListGenetator();
			List<Product> products = a.genetate(1000);
			
			Task task = new Task(products,0,products.size(),0.20);
			
			//ForkJoinPool pool = new ForkJoinPool(10);
		        ForkJoinPool pool = new ForkJoinPool();

			
			pool.submit(task);
			
			
			do{
				System.out.printf("Main: 线程数量:%d\n",pool.getActiveThreadCount());
				
				System.out.printf("Main: Thread 窃取数量:%d\n",pool.getStealCount());
				
				System.out.printf("Main: Thread 平行数量:%d\n",pool.getParallelism());
				
				
				TimeUnit.MILLISECONDS.sleep(5);

				
				
			}while(!task.isDone());
			pool.shutdown();
			
			
			if(task.isCompletedNormally()){
				System.out.printf("Main: The process has completed normally.\n");
				
				
			}
			
			for(int i = 0;i<products.size();i++){
				
				Product p = products.get(i);
				if(p.getPrice()!=12){
					
					System.out.printf("Product %s:%f\n",p.getName(),p.getName());
				}
				
				
			}
			
			System.out.printf("Main:End of the Program.\n");
	}
}

 注意:咱们采用了无参的构造方式建立了this

ForkJoinPool pool = new ForkJoinPool(); 他将执行默认的配置,建立一个线程书等于计算机CPU数目的线程池。

   另外ForkJionPool 类还提供了如下方法用于执行任务。spa

   

execute(Runnabletask) 注意的是使用Runnable对象时,ForkJionPool 不会采用工做窃取算法。仅仅实用ForkJoinTask类的时候采用工做窃取算法
invoke(ForkJoinTask<T> list)  execute方法是异步调用的,此方法是同步调用的
相关文章
相关标签/搜索