ForkJoinPool线程池

1.  拆分线程池的使用场景是什么?java

答: 是对一组连续的数据进行耗时操做,例如 一个 大小为 1000万 的集合 进行操做。json

       例子: 对1000万个数据进行排序,那么会将这个任务分割成两个500万的排序任务和一个针对这两组500万数据的合并任务。以此类推,对于500万的数据也会作出一样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,中止这样的分割处理。好比,当元素的数量小于10时,会中止分割,转而使用插入排序对它们进行排序。数组

 

2.   如何使用 ForkJoinPool?
 答: (1)客户端中使用:构建一个任务,将任务推到 线程池中 ide

          (2)构建任务:this

                      2.1 三变量google

                          数组数据spa

                          begin;   //数组数据中开始下标线程

                          end   //数组数据中结束3d

                  2.2   compare中 执行,拆分任务(关键是拆分数据),合并code

    例子(该代码未运行): 

public class RecursiveActionTest extends RecursiveTask<Integer> {
    private static final long serialVersionUID = -3611254198265061729L;
    //阀值(是数组的大小/线程数算出来的)
    public  final int threshold = 0;
    //这儿应该有一个数组
    private int[] bigNum={1,2,3,.....1000000000000};
    private int start;
    private int end;
    private int threadNum=1;

    public RecursiveActionTest(int start, int end) {
        this.start = start;
        this.end = end;
    }
    
   //设置线程数并设置阀值
    public void init(int threadNum){
        this.threadNum=threadNum;
        threshold = end/threadNum;
    }

    //compute 是一个递归任务
    @Override
    protected Integer compute() {
        int sum = 0;

        //若是任务足够小就计算任务
        boolean canCompute = (end - start) <= threshold;
        if (canCompute) {    //执行加法任务(任务为 对 数组 start位的数据到end位,求和)
            for (int i = start; i <= end; i++) {
                // 每一个任务为:sum+i
                sum += bigNum[i];
            }
        } else {
            // 若是任务大于阈值,就分裂成10子任务计算,step:每一份的任务数
            int step = (start + end) /threadNum ;
            ArrayList<RecursiveActionTest> subTaskList = new ArrayList<RecursiveActionTest>();
            int pos = start;
            //设置每一个小任务的始起始值和终止值

            for (int i = 0; i < threadNum; i++) {
                int lastOne = pos + step;
                if (lastOne > end) lastOne = end;
                //pos和 lostOne其实对应数组中下标
                RecursiveActionTest subTask = new RecursiveActionTest(pos, lastOne);
                pos += step + 1;
                subTaskList.add(subTask);
                //把子任务推向线程池
                subTask.fork();
            }
            // 等待全部子任务完成,并计算值
            for (RecursiveActionTest task : subTaskList) {
                sum += task.join();
            }
        }

      return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkjoinPool = new ForkJoinPool();

        //生成一个计算任务,计算1+2+3+4
        RecursiveActionTest task = new RecursiveActionTest(1, 1000000000000);

        //执行一个任务
        Future<Integer> result = forkjoinPool.submit(task);

        try {
            System.out.println(result.get());
        } catch (Exception e) {
            System.out.println(e);
        }
    }

}

            

                      

 

 

3. 用 forkJoinPool 将 100 万个 User对象 放进一个 JsonArray,用个人电脑双核 用时 5.6 S,用单线程,一个For循环,用时 6.6 S

@Test
    public void test2(){
       JsonArray jsonArray= new JsonArray();
        long start = System.currentTimeMillis();
        System.out.println(start);
        for (int i = 0; i <1000000 ; i++) {
            User user = new User();
            user.setId(i);
            user.setAge("232"+i);
            user.setName("张三");
            JsonObject jObj=new JsonObject();
            jObj.addProperty("id", i);
            jObj.addProperty("age", 20);
            jObj.addProperty("name", "张三");

            jsonArray.add(jObj);
        }
        long middel = System.currentTimeMillis();

        //将 JsonArray中数据转换成集合
        List<User> users = JsonUtils.jsonToList(jsonArray.toString(), User.class);
        long end = System.currentTimeMillis();
        System.out.println("将对象一个一个转换成JsonArray:"+(middel-start));


        System.out.println("JsonArray转换成List"+(end-middel));
 }

 

 

package com.xuecheng.manage_cms;

import com.google.gson.JsonArray;
import com.google.gson.JsonObject;

import java.util.ArrayList;
import java.util.concurrent.ForkJoinPool;

import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class ForkJoinListTest extends RecursiveTask<JsonArray> {
    private static final long serialVersionUID = -3611254198265061729L;
    //阀值
    public static  int threshold=0 ;
    private int start;
    private int end;
    //线程数
    private int threadNum = 4;
    public ForkJoinListTest(int start, int end) {
        this.start = start;
        this.end = end;


    }

    /**
     * 初始化阀值   只在主线程中调用
     */
    public void init(){

        threshold = end/threadNum;
    }


    @Override
    protected JsonArray compute() {
        JsonArray bigJsonArray= new JsonArray();

        //若是任务足够小就计算任务
        boolean canCompute = (end - start) <= threshold;
        if (canCompute) {    //执行加法任务
           // JsonArray jsonArray= new JsonArray();
            for (int i = start; i <= end; i++) {
                User user = new User();
                user.setId(i);
                user.setAge("232"+i);
                user.setName("张三");
                JsonObject jObj=new JsonObject();
                jObj.addProperty("id", i);
                jObj.addProperty("age", 20);
                jObj.addProperty("name", "张三");

                bigJsonArray.add(jObj);
            }
        } else {
            // 若是任务大于阈值,就分裂成10子任务计算,step:每一份的任务数
            int step = (start + end) / threadNum;
            ArrayList<ForkJoinListTest> subTaskList = new ArrayList<ForkJoinListTest>();
            int pos = start;
            //设置每一个小任务的始起始值和终止值

            for (int i = 0; i < threadNum; i++) {
                int lastOne = pos + step;
                if (lastOne > end) lastOne = end;

                ForkJoinListTest subTask = new ForkJoinListTest(pos, lastOne);
                pos += step + 1;
                subTaskList.add(subTask);
                //把子任务推向线程池
                subTask.fork();
            }
            // 等待全部子任务完成,并计算值

            for (ForkJoinListTest task : subTaskList) {

                bigJsonArray.addAll(task.join());
            }
        }

        return bigJsonArray;
    }


    public static void main(String[] args) {
        ForkJoinPool forkjoinPool = new ForkJoinPool();

        //生成一个计算任务,计算1+2+3+4
        ForkJoinListTest task = new ForkJoinListTest(1, 1000000);
        //须要初始化阀值
        task.init();
        long begain = System.currentTimeMillis();
        //执行一个任务
        Future<JsonArray> result = forkjoinPool.submit(task);

        try {
            JsonArray jsonElements = result.get();
            long end = System.currentTimeMillis();
           // System.out.println(jsonElements);
            System.out.println("耗时:"+(end-begain));
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}

 

相关文章
相关标签/搜索