thread_fork/join并发框架1

一.线程并发控制  Thread、Executor、ForkJoin和Actor
  1.裸线程
      Runnable接口,调用.start()开始,没有现成的API来结束线程,你须要本身来实现,经过相似boolean类型的标记来通信算法

     使用Runnable对象,而后建立对应的Thread对象来控制程序中这些线程的建立、执行以及线程的状态多线程

  2.ExecutorFrameworkExecutor和ExecutorService接口:执行器框架将任务的建立和执行进行了分离,经过这个框架只须要实现Runnable接口的对象和使用Executor对象,而后将Runnable对象发送给执行器,执行器再负责这些任务锁须要的线程,包括线程的建立管理已经线程的结束并发

  public interface Executor {
    void execute(Runnable command);
  }
  ExecutorService管理executor的生命周期,以及CompletionService会抽象掉更多细节,做为已完成任务的队列
  Executors.newFixedThreadPool(4)
   3.经过并行流,使用ForkJoinPool (FJP) 框架

     Fork/Join 分解/合并框架 dom

用来解决分治技术,将问题拆分红若干小任务,在一个任务中,先检查要解决任务的问题的大小,决定是否再拆分任务。若是不须要拆分任务了,就在当前任务中解决问题。而后根据须要返回任务的结果。异步

ForkJoinPool类看做一个特殊的Executor执行器类型,这个框架包括如下两个操做:
 分解(Fork)操做:当须要将一个任务拆分红更小的多个任务时,在框架中执行这些任务
 合并(Join)操做: 当一个主任务等待其建立的多个子任务的完成执行。
Fork/Join 和 ExecutorFramework 主要的区别在于 工做窃取算法(Work-StealingAlgroithms),使用Join操做让一个主任务等待它所建立的子任务的完成,执行这个任务的线程称之为 工做者线程(worker Thread),工做者线程会寻找其余任未被执行的任务,而后开始执行。 什么是工做窃取算法?:就是指某个线程从其余队列里窃取任务来执行。从而提高了性能。分布式

为了达到以上的效果,Fork/Join框架有如下几点限制:
a. 任务只能使用fork() 和 join() 操做当同步机制。若是使用其余的同步机制,工做线程就不能执行其余的任务,固然这些任务是在同步操做里时。好比在Fork/Join框架中将一个任务休眠,正在执行这个任务的工做者线程在休眠期内不能执行另外一个任务。
b. 任务不能执行I/O操做,好比文件数据的读取与写入
c. 任务不能抛出非运行时异常(Checke Exception),必须在代码中处理这些异常ide

  4.actor模型中  Akka Actors
      actor模型中,一切都看作是一个actor。一个actor是一个计算实体,它能够从其余actor那里接收消息。在应答消息时,它能够给其余actor发送消息,或者建立新的actor并与之交互,或者只改变本身的内部状态。
      这是一个很是强大的概念。生命周期和消息传递由你的框架来管理,你只须要指定计算单元是什么就能够了。另外,actor模型强调避免全局状态,这会带来不少便利。你能够应用监督策略,例如免费重试,更简单的分布式系统设计,错误容忍度等等。性能

二.简单应用举例测试

   转自  http://blog.csdn.net/mr_zhuqiang/article/details/48300229

   Fork/Join框架的核心是由ForkJoinPool和ForkJoinTask组成

  ForkJoinPool : 这个类实现了 ExecutorServic接口和工做窃取算法。它管理工做者线程,并提供任务的状态信息,以及任务的执行信息
  ForkJoinTask :是一个将在ForkJoinPool中执行的任务的基类
  Fork/Join框架提供了在一个任务里执行fork和join操做的机制和控制任务状态的方法,一般,为了实现Fork/Join任务,须要实现如下两个类之一的子类。

    RecursiveAction :用于任务没有返回结果的场景,一个ForkJoinTask任务类,递归无结果的任务类。相似callable同样的线程任务, 

    RecursiveTask :  用于任务有返回结果的场景

  下示例描述了,批量修改不少商品的价格,使用Fork/Join线程池 和 RecursiveAction(ForkJoinTask)来实现 递归的分配任务执行   

    public static void main(String[] args) throws InterruptedException {
        // 生成商品数据
        List<Product> list = new ArrayList<Product>();
        for (int i = 0; i < 40; i++) {
            Product p = new Product("苹果" + i, 10);
            list.add(p);
        }
        ///////////////////////////////////////////////
        ForkJoinPool fjp = new ForkJoinPool();

        Task task = new Task(list, 0, list.size(), 19);
        fjp.execute(task);
        // fjp.shutdown(); //关闭线程池
        // fjp.awaitTermination(1, TimeUnit.MINUTES);
        //等待超时。结合shutdown来让任务一完成就继续执行下面的代码

        // 使用循环的方式来查看任务的信息
        do {
            System.out.printf("活跃线程:%s,这一个参数  %s,并行执行的最大数量:%s\n", 
                    fjp.getActiveThreadCount(), 
                    fjp.getStealCount(),
                    fjp.getParallelism());

        } while (!task.isDone()); // 若是任务还未完成,则继续获取信息
        // 若是这个任务完成没有抛出异常并无取消。
        if (task.isCompletedNormally()) { 
            System.out.println("main:任务完成");
        }
        
        System.out.println("main:------------------------------  打印任务结果");
        for (Product product : list) {
            int price = product.getPrice();
            String name = product.getName();
            if (price != 19) { // 结果不是所指望的。就打印出来
                System.out.println(name + "," + price);
            }
        }
        System.out.println("main:------------------------------  打印任务结束");
    }
}

class Product {
    private String name;
    private int price;

    public Product(String name, int price) {
        this.name = name;
        this.price = price;
    }

    public String getName() {
        return name;
    }
    ......
}

class Task extends RecursiveAction {
    private List<Product> list; // 全部任务
    private int start;          // 处理任务的开始索引
    private int end;            // 处理任务的结束索引
    private int price;          // 更改的价格

    public Task(List<Product> list, int start, int end, int price) {
        this.list = list;
        this.start = start;
        this.end = end;
        this.price = price;
    }

    @Override
    protected void compute() {
        if (end - start <= 10) { // 每一个task 只能处理10条数据。
            System.out.printf("起始:start:%s,end:%s\n", start, end);
            update();
        } else { // 多余的数据,则须要分给更多的任务
            int middle = (end + start) / 2; // 由于是索引。因此须要开始和结尾相加,而后除以2 就能获得
                                            // 两个索引之间的数值
            Task task1 = new Task(list, start, middle, 19);
            Task task2 = new Task(list, middle, end, 19);
            
            System.out.printf("分析:middle:%s,start:%s,end:%s\n", middle, start, end); // 方便推算
            // 这里把任务分红了2半递归执行
 invokeAll(task1, task2); 
        }
    }

    // 根据给定的起始索引和结束索引更新结果
    private void update() {
        for (int i = start; i < end; i++) {
            Product product = list.get(i);
            product.setPrice(price);
 
            System.out.printf("%s,修改了价格,索引:%s,%s,%s\n",
             Thread.currentThread().getName(),
              i,product.getName(),product.getPrice() );
        }
    }
}

结果分析:
   起始信息:咱们有40个商品,每一个任务处理10个商品。恰好4个工做线程处理。
  分析信息:去中间索引,这个分析在商品数量不能被2整除的时候颇有用,在不能被2整除的状况下,该示例任然会尽量的均衡分配任务的数量
工做原理
  invokeAll方法来执行一个主任务锁建立的多个子任务,这个是一个同步的调用,主任务将等待子任务的完成,而后继续执行(有多是结束),当这个主任务等待它的子任务时,执行这个主任务的工做者线程接收另外一个等待执行的任务并开始执行(并行),正由于有了这个行为,因此说Fork/Join框架提供了一种比Runnable和Callable对象更加高效的任务管理机制。
  ForkJoinTask类的invokeAll方法是执行器框架ExecutorFramework和Fork/Join框架之间的主要差别之一。在执行器框架中。
  在执行器框架中:全部的任务必须发送给执行器
  在Fork/Join框架:线程池中包含了待执行方法的任务,任务的控制也是在线程池中进行的,咱们在task类中使用了invokeAll方法,task类继承了RecursiveAction,而RecursiveAction类则继承了ForkJoinTask.

.经常使用方法

1.fork join get

   fork()方法容许ForkJoinTask任务异步执行,也容许一个新的ForkJoinTask从存在的ForkJoinTask中被启动。
   join()方法容许一个ForkJoinTask等待另外一个ForkJoinTask执行完成。

   fork()只会让ForkJoinPool调度一个新的任务,而不会建立子虚拟机。

   RecursiveTask.join() : 也是用来获取任务的合并结果

   RecursiveTask.get(long timeout,TimeUnit unit) : 该方法,是给定一个指定的超时时间,若是超时尚未返回结果则返回null 

   invokeAll(task1,task2): 是一个同步的方法,任务会被挂起,等待子任务发送到线程池中而且直到完成


2.RecursiveAction 和RecursiveTask

   RecursiveAction的实例表明执行没有返回结果。
   RecursiveTask会有返回值。下面例子 返回值

public class ForkJoin2Test {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        //生成随机矩阵
        final int rows = 10000; // 矩阵行数
        final int cols = 10000; // 矩阵列数
        final int number = 5;   // 查找的数字
        long start = System.currentTimeMillis();
        MatrixMock mock = new MatrixMock(rows, cols, number); // 生成矩阵对象
        long end = System.currentTimeMillis();
        System.out.println("建立矩阵花费时间:" + (end - start));

        //执行任务
        ForkJoinPool pool = new ForkJoinPool();
        Task2 task = new Task2(mock, 0, rows, 5);

        start = System.currentTimeMillis();
        pool.execute(task);
        pool.shutdown();
        //
        pool.awaitTermination(1, TimeUnit.MILLISECONDS);
        System.out.println("线程搜索的结果是:" + task.get());

        end = System.currentTimeMillis();
        System.out.println("线程搜索时间是:" + (end - start));

        start = System.currentTimeMillis();
        int temp = 0;
        for (int i = 0; i < rows; i++) {
            int[] rs = mock.getRow(i);
            for (int row : rs) {
                if (5 == row) {
                    temp++;
                }
            }
        }
        end = System.currentTimeMillis();
        System.out.println("单线程搜索结果是:" + temp);
        System.out.println("单线程搜索时间是:" + (end - start));
    }
}

// 任务类。查找数字出现的次数
class Task2 extends RecursiveTask<Integer> {
    private static final long serialVersionUID = 1L;
    private MatrixMock mock;
    private int start; // 查询起始行索引
    private int end; // 查询结束行索引
    private int num; // 要查找的数字

    public Task2(MatrixMock mock, int start, int end, int num) {
        this.mock = mock;
        this.start = start;
        this.end = end;
        this.num = num;
    }

    @Override
    protected Integer compute() {
        int result = 0;

        if (end - start < 100) { // 每一个任务最多负责5行数据
            result = this.search();
            // 适合矩阵小的时候 查看对比结果
            // System.out.printf("%s,搜索起始行是:%s-%s,搜索结果是:%s\n",Thread.currentThread().getName(),start,end,result);
        } else { // 不然则拆分红两个子任务
            int mid = (end + start) / 2;
            Task2 task1 = new Task2(mock, start, mid, num);
            Task2 task2 = new Task2(mock, mid, end, num);
            invokeAll(task1, task2);
            try {
                result = task1.get() + task2.get(); // 两个结果相加,要想到 该框架的特性就是 递归
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        return result;
    }

    // 计算当前任务分配的行数
    private int search() {
        int result = 0;
        for (int i = start; i < end; i++) {
            int[] rows = mock.getRow(i);
            for (int row : rows) {
                if (num == row) {
                    result++;
                }
            }
        }
        return result;
    }
}

// 随机矩阵
class MatrixMock {
    private int[][] data;

    public MatrixMock(int size, int cols, int number) {
        data = new int[size][cols];
        Random random = new Random();

        int counter = 0;
        // 用随机数为矩阵赋值。每生成一个字,就用它跟要查找的数字比较,进行比较。若是一致,就用计数器加1
        for (int i = 0; i < size; i++) {
            for (int j = 0; j < cols; j++) {
                data[i][j] = random.nextInt(10);
                if (data[i][j] == number) {
                    counter++;
                }
            }
        }
        // 用来验证多线程查找的正确性
        System.out.printf("在矩阵中找到了数字:%d,%d次\n", number, counter);
        // 测试的时候,能够放开此代码,能打印出 矩阵分布图。固然须要矩阵10 * 10 比较小的收,控制台才能装得下
        // for (int i = 0; i < data.length; i++) {
        // for (int j = 0; j < data[i].length; j++) {
        // System.out.printf(data[i][j] + " | ");
        // }
        // System.out.println("");
        // }
    }

    public int[] getRow(int row) {
        if (row >= 0 && row < data.length) {
            return data[row];
        }
        return null;
    }
}
相关文章
相关标签/搜索