线程池整理

通常在生产环境中,咱们都不会直接new一个Thread,而后再去start(),由于这么作会不断频繁的建立线程,销毁线程,过大的线程会耗尽CPU和内存资源,大量的垃圾回收,也会给GC带来压力,延长GC停顿时间.java

一、固定大小线程池算法

public class ThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(1000);
            }catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        MyTask task = new MyTask();
        ExecutorService es = Executors.newFixedThreadPool(5);
        for (int i = 0;i < 10;i++) {
            es.submit(task);
        }
        es.shutdown();
    }
}

运行结果:并发

1539134496389:Thread ID:11
1539134496389:Thread ID:12
1539134496389:Thread ID:13
1539134496389:Thread ID:14
1539134496389:Thread ID:15
1539134497390:Thread ID:14
1539134497390:Thread ID:12
1539134497390:Thread ID:15
1539134497390:Thread ID:13
1539134497390:Thread ID:11框架

结果解读:运行结果并非一次刷出来的,而是刷出了5个,中间会停顿1秒,再刷出5个,说明,并行处理是5个线程执行一次,而后再并行处理5个。ide

将Executors.newFixedThreadPool改为Executors.newCachedThreadPool()性能

public class ThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(1000);
            }catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        MyTask task = new MyTask();
        ExecutorService es = Executors.newCachedThreadPool();
        for (int i = 0;i < 10;i++) {
            es.submit(task);
        }
        es.shutdown();
    }
}

结果相同,可是是同时并行处理的,中间没有停顿,说明newCachedThreadPool()是根据须要来分配线程数的。this

二、计划任务线程

newScheduledThreadPool()有两个方法来调用线程对象,scheduleAtFixedRate()跟scheduleWithFixedDelay().他们之间的差异就是scheduleAtFixedRate()总共只占用调度时间,而scheduleWithFixedDelay()占用的是线程执行时间加调度时间.但若是scheduleAtFixedRate()的线程执行时间大于调度时间,也不会出现重复调度(即一个线程尚未执行完,另一个线程会启动),而是一个线程执行完,另外一个线程立刻启动.3d

public class ScheduledExecutorServiceDemo {
    public static void main(String[] args) {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
        ses.scheduleAtFixedRate(new Runnable() {
            public void run() {
                try {
                    long start = System.currentTimeMillis();
                    Thread.sleep(2000);
                    System.out.println((System.currentTimeMillis() -start) +":" + Thread.currentThread().getName());
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },0,2, TimeUnit.SECONDS);
    }
}

运行结果(部分截取)对象

2001:pool-1-thread-1
2000:pool-1-thread-1
2000:pool-1-thread-2
2000:pool-1-thread-1
2001:pool-1-thread-3
2000:pool-1-thread-2

结果解读:尽管有时间调度,他们依然是不一样的线程来运行的,每显示一条中间停顿2秒(线程运行时间也是2秒)

public class ScheduledExecutorServiceDemo {
    public static void main(String[] args) {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
        ses.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                try {
                    long start = System.currentTimeMillis();
                    Thread.sleep(2000);
                    System.out.println((System.currentTimeMillis() -start) +":" + Thread.currentThread().getName());
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },0,2, TimeUnit.SECONDS);
    }
}

运行结果与以前相同,可是每显示一条的时间间隔为4秒(线程运行时间依然为2秒),其中2秒为调度时间,2秒为运行时间.

三、核心线程池的内部实现。

其实不管是Executors工厂的哪一种实现,都是调用了同一个类ThreadPoolExecutor,使用了不一样的构造参数罢了.不一样的构造参数能够产生不一样种类的线程池,所以咱们也能够自定义线程池.

JDK实现

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

拒绝策略

当线程池任务数量超过系统实际承载能力时,能够启用拒绝策略。

直接中断策略

public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(10),
                Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
//                new RejectedExecutionHandler() {
//                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//                        System.out.println(r.toString() + " is discard");
//                    }
//                });
        for (int i = 0;i < Integer.MAX_VALUE;i++) {
            es.submit(task);
            Thread.sleep(10);
        }
    }
}

ThreadPoolExecutor()的最后一个参数为中断策略,上面的new ThreadPoolExecutor.AbortPolicy()为直接中断!

参数说明:

第一个参数corePoolSize:指定了线程池中的线程数量.

第二个参数maximumPoolSize:指定了线程池中的最大线程数量.

第三个参数KeepAliveTime:当线程池线程数量超过了corePoolSize时,多余的空闲线程的存活时间.即超过corePoolSize的空闲线程,在多长时间内会被销毁.

第四个参数unit:keepAliveTime的单位.

第五个参数workQueue:任务队列,被提交但还没有被执行的任务.

1,直接提交的队列:SynchronousQueue,无容量,每个插入操做都要等待一个删除操做,提交的任务不会被真实保存,老是将新任务提交给线程执行.若是没有空闲进程,则尝试建立新的进程.若是进程数量达到最大,则执行拒绝策略.

2,有界的任务队列:ArrayBlockingQueue,必须带一个容量参数,表示该队列的最大容量.当线程池的实际线程数小于corePoolSize,会优先建立新的线程,若大于corePoolSize,则会将新任务加入到等待队列.若等待队列满的时候,没法加入,则在总线程数不大于maximumPoolSize的前提下,建立新的进程执行任务.若大于maximumPoolSize,执行拒绝策略.

3,无界的任务队列:LinkedBlockingQueue,除非系统资源耗尽,不存在任务入队失败的状况.当线程池的实际线程数小于corePoolSize,会优先建立新的线程,若大于corePoolSize,则会将新任务加入到等待队列,若任务的建立和处理的速度差别很大,无界队列会保持快速增加,直到耗尽系统内存.

4,优先任务队列:PriorityBlockingQueue,能够控制任务执行的前后顺序.是一个特殊的无界队列.不管是ArrayBlockingQueue仍是LinkedBlockingQueue都是按照先进先出算法处理任务的,而PriorityBlockingQueue则能够根据任务自身的优先级顺序前后执行,老是确保高优先级的任务先执行.

第六个参数threadFactory:线程工厂,用于建立线程,通常用默认的便可.

第七个参数handler:拒绝策略,当任务太多,来不及处理,如何拒绝任务.

运行结果

1539153799420:Thread ID:11
1539153799430:Thread ID:12
1539153799440:Thread ID:13
1539153799450:Thread ID:14
1539153799460:Thread ID:15
1539153799520:Thread ID:11
1539153799530:Thread ID:12
1539153799540:Thread ID:13
1539153799550:Thread ID:14
1539153799560:Thread ID:15
1539153799621:Thread ID:11
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@45ee12a7 rejected from java.util.concurrent.ThreadPoolExecutor@330bedb4[Running, pool size = 5, active threads = 5, queued tasks = 9, completed tasks = 6]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
    at com.guanjian.RejectThreadPoolDemo.main(RejectThreadPoolDemo.java:31)

结果解读:因为并发线程数量太大,Integer.MAX_VALUE,咱们线程池的最大线程数只有5个,而无界任务队列LinkedBlockingQueue<Runnable>只有10个,没法知足快速的线程数量增加,拒绝策略发挥做用,抛出异常,阻止系统正常工做.

public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(10),
                Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
//                new RejectedExecutionHandler() {
//                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//                        System.out.println(r.toString() + " is discard");
//                    }
//                });
        for (int i = 0;i < Integer.MAX_VALUE;i++) {
            es.submit(task);
            Thread.sleep(10);
        }
    }
}

new ThreadPoolExecutor.CallerRunsPolicy()只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务,但性能极有可能会急剧降低.

public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(10),
                Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy());
//                new RejectedExecutionHandler() {
//                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//                        System.out.println(r.toString() + " is discard");
//                    }
//                });
        for (int i = 0;i < Integer.MAX_VALUE;i++) {
            es.submit(task);
            Thread.sleep(10);
        }
    }
}

ThreadPoolExecutor.DiscardOldestPolicy()该策略将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务.

public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(10),
                Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
//                new RejectedExecutionHandler() {
//                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//                        System.out.println(r.toString() + " is discard");
//                    }
//                });
        for (int i = 0;i < Integer.MAX_VALUE;i++) {
            es.submit(task);
            Thread.sleep(10);
        }
    }
}

ThreadPoolExecutor.DiscardPolicy()丢弃没法处理的任务,不予任何处理.

自定义拒绝策略

public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(10),
                Executors.defaultThreadFactory(),
                new RejectedExecutionHandler() {
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        System.out.println(r.toString() + " is discard");
                    }
                });
        for (int i = 0;i < Integer.MAX_VALUE;i++) {
            es.submit(task);
            Thread.sleep(10);
        }
    }
}

运行结果:

1539159379178:Thread ID:11
1539159379187:Thread ID:12
1539159379197:Thread ID:13
1539159379207:Thread ID:14
1539159379217:Thread ID:15
1539159379279:Thread ID:11
1539159379288:Thread ID:12
1539159379301:Thread ID:13
1539159379308:Thread ID:14
1539159379318:Thread ID:15
1539159379379:Thread ID:11
1539159379388:Thread ID:12
1539159379401:Thread ID:13
java.util.concurrent.FutureTask@45ee12a7 is discard
1539159379408:Thread ID:14
1539159379418:Thread ID:15
java.util.concurrent.FutureTask@330bedb4 is discard
java.util.concurrent.FutureTask@2503dbd3 is discard
java.util.concurrent.FutureTask@4b67cf4d is discard
java.util.concurrent.FutureTask@7ea987ac is discard

这里只是比ThreadPoolExecutor.DiscardPolicy()多了打印出丢弃的任务.

自定义线程建立

public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactory() {
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        t.setDaemon(true);
                        System.out.println("create " + t);
                        return t;
                    }
                });
        for (int i = 0; i < 5; i++) {
            es.submit(task);
        }
        Thread.sleep(2000);
    }
}

就是能够本身定义线程,如守护线程等等

运行结果:

create Thread[Thread-0,5,main]
create Thread[Thread-1,5,main]
create Thread[Thread-2,5,main]
create Thread[Thread-3,5,main]
create Thread[Thread-4,5,main]
1539159694414:Thread ID:11
1539159694414:Thread ID:12
1539159694414:Thread ID:13
1539159694414:Thread ID:14
1539159694414:Thread ID:15

扩展线程池

线程池能够扩展出线程执行前,执行后,终止的后续处理

public class ExtThreadPool {
    public static class MyTask implements Runnable {
        public String name;
        public MyTask(String name) {
            this.name = name;
        }
        public void run() {
            System.out.println("正在执行" + ":Thread ID" + Thread.currentThread().getId() + ",Task Name=" + name);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("准备执行:" + ((MyTask)r).name);
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("执行完成:" + ((MyTask)r).name);
            }

            @Override
            protected void terminated() {
                System.out.println("线程池退出!");
            }
        };
        for (int i = 0;i < 5;i++) {
            MyTask task = new MyTask("TASK-GEYM-" + i);
            es.execute(task);
            Thread.sleep(10);
        }
        es.shutdown();
    }
}

运行结果:

准备执行:TASK-GEYM-0
正在执行:Thread ID11,Task Name=TASK-GEYM-0
准备执行:TASK-GEYM-1
正在执行:Thread ID12,Task Name=TASK-GEYM-1
准备执行:TASK-GEYM-2
正在执行:Thread ID13,Task Name=TASK-GEYM-2
准备执行:TASK-GEYM-3
正在执行:Thread ID14,Task Name=TASK-GEYM-3
准备执行:TASK-GEYM-4
正在执行:Thread ID15,Task Name=TASK-GEYM-4
执行完成:TASK-GEYM-0
执行完成:TASK-GEYM-1
执行完成:TASK-GEYM-2
执行完成:TASK-GEYM-3
执行完成:TASK-GEYM-4
线程池退出!

在线程池中寻找堆栈

有时候线程执行时会出现Bug,抛出异常,若是使用submit()来提交线程时,不会打印异常信息,而使用execute()来执行线程时能够打印异常信息.

public class DivTask implements Runnable {
    int a,b;
    public DivTask(int a,int b) {
        this.a = a;
        this.b = b;
    }
    public void run() {
        double re = a / b;
        System.out.println(re);
    }

    public static void main(String[] args) {
//        ExecutorService pools = new TraceThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.MILLISECONDS,
//                new SynchronousQueue<Runnable>());
        ExecutorService pools = new ThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        for (int i = 0;i < 5;i++) {
            pools.submit(new DivTask(100,i));
        }
    }
}

这段代码中,5个并发线程会有一个线程有除0错误

运行结果:

100.0
50.0
33.0
25.0

结果没有任何提示,异常抛出.

public class DivTask implements Runnable {
    int a,b;
    public DivTask(int a,int b) {
        this.a = a;
        this.b = b;
    }
    public void run() {
        double re = a / b;
        System.out.println(re);
    }

    public static void main(String[] args) {
//        ExecutorService pools = new TraceThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.MILLISECONDS,
//                new SynchronousQueue<Runnable>());
        ExecutorService pools = new ThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        for (int i = 0;i < 5;i++) {
            pools.execute(new DivTask(100,i));
        }
    }
}

运行结果:

100.0
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
50.0
33.0
    at com.guanjian.DivTask.run(DivTask.java:18)
25.0
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

有异常抛出

重写跟踪线程池,自定义跟踪

public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
    public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    public void execute(Runnable task) {
        super.execute(wrap(task,clientTrace(),Thread.currentThread().getName()));
    }

    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(wrap(task,clientTrace(),Thread.currentThread().getName()));
    }

    private Exception clientTrace() {
        return new Exception("Client stack trace");
    }

    private Runnable wrap(final Runnable task,final Exception clientStack,String clientThreadName) {
        return new Runnable() {
            public void run() {
                try {
                    task.run();
                } catch (Exception e) {
                    clientStack.printStackTrace();
                    try {
                        throw e;
                    } catch (Exception e1) {
                        e1.printStackTrace();
                    }
                }
            }
        };
    }
}
public class DivTask implements Runnable {
    int a,b;
    public DivTask(int a,int b) {
        this.a = a;
        this.b = b;
    }
    public void run() {
        double re = a / b;
        System.out.println(re);
    }

    public static void main(String[] args) {
        ExecutorService pools = new TraceThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>());
//        ExecutorService pools = new ThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.SECONDS,
//                new SynchronousQueue<Runnable>());
        for (int i = 0;i < 5;i++) {
            pools.execute(new DivTask(100,i));
        }
    }
}

运行结果:

100.0
java.lang.Exception: Client stack trace
50.0
    at com.guanjian.TraceThreadPoolExecutor.clientTrace(TraceThreadPoolExecutor.java:27)
33.0
25.0
    at com.guanjian.TraceThreadPoolExecutor.execute(TraceThreadPoolExecutor.java:18)
    at com.guanjian.DivTask.main(DivTask.java:28)
java.lang.ArithmeticException: / by zero
    at com.guanjian.DivTask.run(DivTask.java:18)
    at com.guanjian.TraceThreadPoolExecutor$1.run(TraceThreadPoolExecutor.java:34)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

这样咱们就能够知道是在哪里出了错.

四、分而治之,Fork/Join框架

将一个大任务拆分红各类较小规模的任务,进行并行处理,也许按照约定条件拆分的任务仍是大于约定条件就继续拆分.有两种线程类型,一种是有返回值的RecursiveTask<T>,一种是没有返回值的RecursiveAction,他们都继承于ForkJoinTask<>,一个带泛型<T>,一个是<Void>.

/**
 * Created by Administrator on 2018/10/11.
 * 能够理解成一个Runnable(线程类)
 */
public class CountTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10000;
    private long start;
    private long end;

    public CountTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    /**
     * 能够理解成run()方法
     * @return
     */
    @Override
    protected Long compute() {
        long sum = 0;
        boolean canCompute = (end - start) < THRESHOLD;
        //最终计算,全部的最终拆分都是在这里计算
        if (canCompute) {
            for (long i = start;i <= end;i++) {
                sum += i;
            }
        }else {
            //并行计算的规模,拆分红100个并行计算
            long step = (start + end) /100;
            //建立子任务线程集合
            List<CountTask> subTasks = new ArrayList<CountTask>();
            //每一个并行子任务的开始值
            long pos = start;
            //并行执行100个分叉线程
            for (int i = 0;i < 100;i++) {
                //每一个并行子任务的结束值
                long lastOne = pos + step;
                if (lastOne > end) {
                    lastOne = end;
                }
                //创建一个子任务的线程
                CountTask subTask = new CountTask(pos,lastOne);
                //建立下一个并行子任务的开始值
                pos += step + 1;
                //将当前子任务线程添加到线程集合
                subTasks.add(subTask);
                //执行该线程,实际上是一个递归,判断lastOne-pos是否小于THRESHOLD,小于则真正执行,不然继续分叉100个子线程
                subTask.fork();
            }
            for (CountTask t:subTasks) {
                //阻断每一次分叉前的上一级线程进行等待,并将最终并行的结果进行层层累加
                sum += t.join();
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        CountTask task = new CountTask(0,200000L);
        ForkJoinTask<Long> result = forkJoinPool.submit(task);
        try {
            long res = result.get();
            System.out.println("sum: " + res);
        }catch (InterruptedException e) {
            e.printStackTrace();
        }catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

运行结果:

sum: 20000100000

相关文章
相关标签/搜索