通常在生产环境中,咱们都不会直接new一个Thread,而后再去start(),由于这么作会不断频繁的建立线程,销毁线程,过大的线程会耗尽CPU和内存资源,大量的垃圾回收,也会给GC带来压力,延长GC停顿时间.java
一、固定大小线程池算法
public class ThreadPoolDemo { public static class MyTask implements Runnable { public void run() { System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId()); try { Thread.sleep(1000); }catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { MyTask task = new MyTask(); ExecutorService es = Executors.newFixedThreadPool(5); for (int i = 0;i < 10;i++) { es.submit(task); } es.shutdown(); } }
运行结果:并发
1539134496389:Thread ID:11
1539134496389:Thread ID:12
1539134496389:Thread ID:13
1539134496389:Thread ID:14
1539134496389:Thread ID:15
1539134497390:Thread ID:14
1539134497390:Thread ID:12
1539134497390:Thread ID:15
1539134497390:Thread ID:13
1539134497390:Thread ID:11框架
结果解读:运行结果并非一次刷出来的,而是刷出了5个,中间会停顿1秒,再刷出5个,说明,并行处理是5个线程执行一次,而后再并行处理5个。ide
将Executors.newFixedThreadPool改为Executors.newCachedThreadPool()性能
public class ThreadPoolDemo { public static class MyTask implements Runnable { public void run() { System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId()); try { Thread.sleep(1000); }catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { MyTask task = new MyTask(); ExecutorService es = Executors.newCachedThreadPool(); for (int i = 0;i < 10;i++) { es.submit(task); } es.shutdown(); } }
结果相同,可是是同时并行处理的,中间没有停顿,说明newCachedThreadPool()是根据须要来分配线程数的。this
二、计划任务线程
newScheduledThreadPool()有两个方法来调用线程对象,scheduleAtFixedRate()跟scheduleWithFixedDelay().他们之间的差异就是scheduleAtFixedRate()总共只占用调度时间,而scheduleWithFixedDelay()占用的是线程执行时间加调度时间.但若是scheduleAtFixedRate()的线程执行时间大于调度时间,也不会出现重复调度(即一个线程尚未执行完,另一个线程会启动),而是一个线程执行完,另外一个线程立刻启动.3d
public class ScheduledExecutorServiceDemo { public static void main(String[] args) { ScheduledExecutorService ses = Executors.newScheduledThreadPool(10); ses.scheduleAtFixedRate(new Runnable() { public void run() { try { long start = System.currentTimeMillis(); Thread.sleep(2000); System.out.println((System.currentTimeMillis() -start) +":" + Thread.currentThread().getName()); }catch (InterruptedException e) { e.printStackTrace(); } } },0,2, TimeUnit.SECONDS); } }
运行结果(部分截取)对象
2001:pool-1-thread-1
2000:pool-1-thread-1
2000:pool-1-thread-2
2000:pool-1-thread-1
2001:pool-1-thread-3
2000:pool-1-thread-2
结果解读:尽管有时间调度,他们依然是不一样的线程来运行的,每显示一条中间停顿2秒(线程运行时间也是2秒)
public class ScheduledExecutorServiceDemo { public static void main(String[] args) { ScheduledExecutorService ses = Executors.newScheduledThreadPool(10); ses.scheduleWithFixedDelay(new Runnable() { public void run() { try { long start = System.currentTimeMillis(); Thread.sleep(2000); System.out.println((System.currentTimeMillis() -start) +":" + Thread.currentThread().getName()); }catch (InterruptedException e) { e.printStackTrace(); } } },0,2, TimeUnit.SECONDS); } }
运行结果与以前相同,可是每显示一条的时间间隔为4秒(线程运行时间依然为2秒),其中2秒为调度时间,2秒为运行时间.
三、核心线程池的内部实现。
其实不管是Executors工厂的哪一种实现,都是调用了同一个类ThreadPoolExecutor,使用了不一样的构造参数罢了.不一样的构造参数能够产生不一样种类的线程池,所以咱们也能够自定义线程池.
JDK实现
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
拒绝策略
当线程池任务数量超过系统实际承载能力时,能够启用拒绝策略。
直接中断策略
public class RejectThreadPoolDemo { public static class MyTask implements Runnable { public void run() { System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId()); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { MyTask task = new MyTask(); ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); // new RejectedExecutionHandler() { // public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // System.out.println(r.toString() + " is discard"); // } // }); for (int i = 0;i < Integer.MAX_VALUE;i++) { es.submit(task); Thread.sleep(10); } } }
ThreadPoolExecutor()的最后一个参数为中断策略,上面的new ThreadPoolExecutor.AbortPolicy()为直接中断!
参数说明:
第一个参数corePoolSize:指定了线程池中的线程数量.
第二个参数maximumPoolSize:指定了线程池中的最大线程数量.
第三个参数KeepAliveTime:当线程池线程数量超过了corePoolSize时,多余的空闲线程的存活时间.即超过corePoolSize的空闲线程,在多长时间内会被销毁.
第四个参数unit:keepAliveTime的单位.
第五个参数workQueue:任务队列,被提交但还没有被执行的任务.
1,直接提交的队列:SynchronousQueue,无容量,每个插入操做都要等待一个删除操做,提交的任务不会被真实保存,老是将新任务提交给线程执行.若是没有空闲进程,则尝试建立新的进程.若是进程数量达到最大,则执行拒绝策略.
2,有界的任务队列:ArrayBlockingQueue,必须带一个容量参数,表示该队列的最大容量.当线程池的实际线程数小于corePoolSize,会优先建立新的线程,若大于corePoolSize,则会将新任务加入到等待队列.若等待队列满的时候,没法加入,则在总线程数不大于maximumPoolSize的前提下,建立新的进程执行任务.若大于maximumPoolSize,执行拒绝策略.
3,无界的任务队列:LinkedBlockingQueue,除非系统资源耗尽,不存在任务入队失败的状况.当线程池的实际线程数小于corePoolSize,会优先建立新的线程,若大于corePoolSize,则会将新任务加入到等待队列,若任务的建立和处理的速度差别很大,无界队列会保持快速增加,直到耗尽系统内存.
4,优先任务队列:PriorityBlockingQueue,能够控制任务执行的前后顺序.是一个特殊的无界队列.不管是ArrayBlockingQueue仍是LinkedBlockingQueue都是按照先进先出算法处理任务的,而PriorityBlockingQueue则能够根据任务自身的优先级顺序前后执行,老是确保高优先级的任务先执行.
第六个参数threadFactory:线程工厂,用于建立线程,通常用默认的便可.
第七个参数handler:拒绝策略,当任务太多,来不及处理,如何拒绝任务.
运行结果
1539153799420:Thread ID:11
1539153799430:Thread ID:12
1539153799440:Thread ID:13
1539153799450:Thread ID:14
1539153799460:Thread ID:15
1539153799520:Thread ID:11
1539153799530:Thread ID:12
1539153799540:Thread ID:13
1539153799550:Thread ID:14
1539153799560:Thread ID:15
1539153799621:Thread ID:11
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@45ee12a7 rejected from java.util.concurrent.ThreadPoolExecutor@330bedb4[Running, pool size = 5, active threads = 5, queued tasks = 9, completed tasks = 6]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at com.guanjian.RejectThreadPoolDemo.main(RejectThreadPoolDemo.java:31)
结果解读:因为并发线程数量太大,Integer.MAX_VALUE,咱们线程池的最大线程数只有5个,而无界任务队列LinkedBlockingQueue<Runnable>只有10个,没法知足快速的线程数量增加,拒绝策略发挥做用,抛出异常,阻止系统正常工做.
public class RejectThreadPoolDemo { public static class MyTask implements Runnable { public void run() { System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId()); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { MyTask task = new MyTask(); ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); // new RejectedExecutionHandler() { // public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // System.out.println(r.toString() + " is discard"); // } // }); for (int i = 0;i < Integer.MAX_VALUE;i++) { es.submit(task); Thread.sleep(10); } } }
new ThreadPoolExecutor.CallerRunsPolicy()只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务,但性能极有可能会急剧降低.
public class RejectThreadPoolDemo { public static class MyTask implements Runnable { public void run() { System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId()); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { MyTask task = new MyTask(); ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy()); // new RejectedExecutionHandler() { // public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // System.out.println(r.toString() + " is discard"); // } // }); for (int i = 0;i < Integer.MAX_VALUE;i++) { es.submit(task); Thread.sleep(10); } } }
ThreadPoolExecutor.DiscardOldestPolicy()该策略将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务.
public class RejectThreadPoolDemo { public static class MyTask implements Runnable { public void run() { System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId()); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { MyTask task = new MyTask(); ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy()); // new RejectedExecutionHandler() { // public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // System.out.println(r.toString() + " is discard"); // } // }); for (int i = 0;i < Integer.MAX_VALUE;i++) { es.submit(task); Thread.sleep(10); } } }
ThreadPoolExecutor.DiscardPolicy()丢弃没法处理的任务,不予任何处理.
自定义拒绝策略
public class RejectThreadPoolDemo { public static class MyTask implements Runnable { public void run() { System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId()); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { MyTask task = new MyTask(); ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10), Executors.defaultThreadFactory(), new RejectedExecutionHandler() { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(r.toString() + " is discard"); } }); for (int i = 0;i < Integer.MAX_VALUE;i++) { es.submit(task); Thread.sleep(10); } } }
运行结果:
1539159379178:Thread ID:11
1539159379187:Thread ID:12
1539159379197:Thread ID:13
1539159379207:Thread ID:14
1539159379217:Thread ID:15
1539159379279:Thread ID:11
1539159379288:Thread ID:12
1539159379301:Thread ID:13
1539159379308:Thread ID:14
1539159379318:Thread ID:15
1539159379379:Thread ID:11
1539159379388:Thread ID:12
1539159379401:Thread ID:13
java.util.concurrent.FutureTask@45ee12a7 is discard
1539159379408:Thread ID:14
1539159379418:Thread ID:15
java.util.concurrent.FutureTask@330bedb4 is discard
java.util.concurrent.FutureTask@2503dbd3 is discard
java.util.concurrent.FutureTask@4b67cf4d is discard
java.util.concurrent.FutureTask@7ea987ac is discard
这里只是比ThreadPoolExecutor.DiscardPolicy()多了打印出丢弃的任务.
自定义线程建立
public class RejectThreadPoolDemo { public static class MyTask implements Runnable { public void run() { System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId()); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { MyTask task = new MyTask(); ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() { public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); System.out.println("create " + t); return t; } }); for (int i = 0; i < 5; i++) { es.submit(task); } Thread.sleep(2000); } }
就是能够本身定义线程,如守护线程等等
运行结果:
create Thread[Thread-0,5,main]
create Thread[Thread-1,5,main]
create Thread[Thread-2,5,main]
create Thread[Thread-3,5,main]
create Thread[Thread-4,5,main]
1539159694414:Thread ID:11
1539159694414:Thread ID:12
1539159694414:Thread ID:13
1539159694414:Thread ID:14
1539159694414:Thread ID:15
扩展线程池
线程池能够扩展出线程执行前,执行后,终止的后续处理
public class ExtThreadPool { public static class MyTask implements Runnable { public String name; public MyTask(String name) { this.name = name; } public void run() { System.out.println("正在执行" + ":Thread ID" + Thread.currentThread().getId() + ",Task Name=" + name); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) { @Override protected void beforeExecute(Thread t, Runnable r) { System.out.println("准备执行:" + ((MyTask)r).name); } @Override protected void afterExecute(Runnable r, Throwable t) { System.out.println("执行完成:" + ((MyTask)r).name); } @Override protected void terminated() { System.out.println("线程池退出!"); } }; for (int i = 0;i < 5;i++) { MyTask task = new MyTask("TASK-GEYM-" + i); es.execute(task); Thread.sleep(10); } es.shutdown(); } }
运行结果:
准备执行:TASK-GEYM-0
正在执行:Thread ID11,Task Name=TASK-GEYM-0
准备执行:TASK-GEYM-1
正在执行:Thread ID12,Task Name=TASK-GEYM-1
准备执行:TASK-GEYM-2
正在执行:Thread ID13,Task Name=TASK-GEYM-2
准备执行:TASK-GEYM-3
正在执行:Thread ID14,Task Name=TASK-GEYM-3
准备执行:TASK-GEYM-4
正在执行:Thread ID15,Task Name=TASK-GEYM-4
执行完成:TASK-GEYM-0
执行完成:TASK-GEYM-1
执行完成:TASK-GEYM-2
执行完成:TASK-GEYM-3
执行完成:TASK-GEYM-4
线程池退出!
在线程池中寻找堆栈
有时候线程执行时会出现Bug,抛出异常,若是使用submit()来提交线程时,不会打印异常信息,而使用execute()来执行线程时能够打印异常信息.
public class DivTask implements Runnable { int a,b; public DivTask(int a,int b) { this.a = a; this.b = b; } public void run() { double re = a / b; System.out.println(re); } public static void main(String[] args) { // ExecutorService pools = new TraceThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.MILLISECONDS, // new SynchronousQueue<Runnable>()); ExecutorService pools = new ThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); for (int i = 0;i < 5;i++) { pools.submit(new DivTask(100,i)); } } }
这段代码中,5个并发线程会有一个线程有除0错误
运行结果:
100.0
50.0
33.0
25.0
结果没有任何提示,异常抛出.
public class DivTask implements Runnable { int a,b; public DivTask(int a,int b) { this.a = a; this.b = b; } public void run() { double re = a / b; System.out.println(re); } public static void main(String[] args) { // ExecutorService pools = new TraceThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.MILLISECONDS, // new SynchronousQueue<Runnable>()); ExecutorService pools = new ThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); for (int i = 0;i < 5;i++) { pools.execute(new DivTask(100,i)); } } }
运行结果:
100.0
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
50.0
33.0
at com.guanjian.DivTask.run(DivTask.java:18)
25.0
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
有异常抛出
重写跟踪线程池,自定义跟踪
public class TraceThreadPoolExecutor extends ThreadPoolExecutor { public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override public void execute(Runnable task) { super.execute(wrap(task,clientTrace(),Thread.currentThread().getName())); } @Override public Future<?> submit(Runnable task) { return super.submit(wrap(task,clientTrace(),Thread.currentThread().getName())); } private Exception clientTrace() { return new Exception("Client stack trace"); } private Runnable wrap(final Runnable task,final Exception clientStack,String clientThreadName) { return new Runnable() { public void run() { try { task.run(); } catch (Exception e) { clientStack.printStackTrace(); try { throw e; } catch (Exception e1) { e1.printStackTrace(); } } } }; } }
public class DivTask implements Runnable { int a,b; public DivTask(int a,int b) { this.a = a; this.b = b; } public void run() { double re = a / b; System.out.println(re); } public static void main(String[] args) { ExecutorService pools = new TraceThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>()); // ExecutorService pools = new ThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.SECONDS, // new SynchronousQueue<Runnable>()); for (int i = 0;i < 5;i++) { pools.execute(new DivTask(100,i)); } } }
运行结果:
100.0
java.lang.Exception: Client stack trace
50.0
at com.guanjian.TraceThreadPoolExecutor.clientTrace(TraceThreadPoolExecutor.java:27)
33.0
25.0
at com.guanjian.TraceThreadPoolExecutor.execute(TraceThreadPoolExecutor.java:18)
at com.guanjian.DivTask.main(DivTask.java:28)
java.lang.ArithmeticException: / by zero
at com.guanjian.DivTask.run(DivTask.java:18)
at com.guanjian.TraceThreadPoolExecutor$1.run(TraceThreadPoolExecutor.java:34)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
这样咱们就能够知道是在哪里出了错.
四、分而治之,Fork/Join框架
将一个大任务拆分红各类较小规模的任务,进行并行处理,也许按照约定条件拆分的任务仍是大于约定条件就继续拆分.有两种线程类型,一种是有返回值的RecursiveTask<T>,一种是没有返回值的RecursiveAction,他们都继承于ForkJoinTask<>,一个带泛型<T>,一个是<Void>.
/** * Created by Administrator on 2018/10/11. * 能够理解成一个Runnable(线程类) */ public class CountTask extends RecursiveTask<Long> { private static final int THRESHOLD = 10000; private long start; private long end; public CountTask(long start, long end) { this.start = start; this.end = end; } /** * 能够理解成run()方法 * @return */ @Override protected Long compute() { long sum = 0; boolean canCompute = (end - start) < THRESHOLD; //最终计算,全部的最终拆分都是在这里计算 if (canCompute) { for (long i = start;i <= end;i++) { sum += i; } }else { //并行计算的规模,拆分红100个并行计算 long step = (start + end) /100; //建立子任务线程集合 List<CountTask> subTasks = new ArrayList<CountTask>(); //每一个并行子任务的开始值 long pos = start; //并行执行100个分叉线程 for (int i = 0;i < 100;i++) { //每一个并行子任务的结束值 long lastOne = pos + step; if (lastOne > end) { lastOne = end; } //创建一个子任务的线程 CountTask subTask = new CountTask(pos,lastOne); //建立下一个并行子任务的开始值 pos += step + 1; //将当前子任务线程添加到线程集合 subTasks.add(subTask); //执行该线程,实际上是一个递归,判断lastOne-pos是否小于THRESHOLD,小于则真正执行,不然继续分叉100个子线程 subTask.fork(); } for (CountTask t:subTasks) { //阻断每一次分叉前的上一级线程进行等待,并将最终并行的结果进行层层累加 sum += t.join(); } } return sum; } public static void main(String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(); CountTask task = new CountTask(0,200000L); ForkJoinTask<Long> result = forkJoinPool.submit(task); try { long res = result.get(); System.out.println("sum: " + res); }catch (InterruptedException e) { e.printStackTrace(); }catch (ExecutionException e) { e.printStackTrace(); } } }
运行结果:
sum: 20000100000