在开发程序的过程当中,不少时候咱们会遇到遇到批量执行任务的场景,当各个具体任务之间互相独立并不依赖其余任务的时候,咱们会考虑使用并发的方式,将各个任务分散到不一样的线程中进行执行来提升任务的执行效率。java
咱们会想到为每一个任务都分配一个线程,可是这样的作法存在很大的问题:程序员
一、资源消耗:首先当任务数量庞大的时候,大量线程会占据大量的系统资源,特别是内存,当线程数量大于CPU可用数量时,空闲线程会浪费形成内存的浪费,并加大GC的压力,大量的线程甚至会直接致使程序的内存溢出,并且大量线程在竞争CPU的时候会带来额外的性能开销。若是CPU已经足够忙碌,再多的线程不只不会提升性能,反而会下降性能。web
二、线程生命周期的开销:线程的建立和销毁都是有代价的,线程的建立须要时间、延迟处理的请求、须要JVM和操做系统提供一些辅助操做。若是请求特别庞大,而且任务的执行特别轻量级(好比只是计算1+1),那么对比下来建立和销毁线程代价就太昂贵了。面试
三、稳定性:如资源消耗中所说,若是程序由于大量的线程抛出OutOfMemoryEorror,会致使程序极大的不稳定。编程
既然为每一个任务分配一个线程的作法已经不可行,咱们考虑的代替方法中就必须考虑到,一、线程不能不能无限制建立,数量必须有一个合适的上限。二、线程的建立开销昂贵,那咱们能够考虑重用这些线程。理所固然,池化技术是一项比较容易想到的替代方案(马后炮),线程的池化管理就叫线程池。缓存
ThreadPoolExecutor的关系图简单以下。并发
简单介绍一些Executor、ExecutorService、AbstractExectorService。函数
Executor接口比较简单:性能
1 public interface Executor { 2 void execute(Runnable command); 3 }
该接口只有一个方法,即任务的执行。学习
ExecutorService在Executor接口上,添加了管理生命周期的方法、支持了Callable类型的任务、任务的执行方式。
AbstractExecutorService是一个抽象类,实现了ExecutorService的任务执行方法,添加newTaskFor方法做为钩子对外提供任务的取消通道,可是AbstractExecutorService并无实现生命周期管理相关的方法,而是将生命周期相关的操做丢给了子类。
线程池有多种构造器,参数最完整的构造器以下:
1 public ThreadPoolExecutor(int corePoolSize, 2 int maximumPoolSize, 3 long keepAliveTime, 4 TimeUnit unit, 5 BlockingQueue<Runnable> workQueue, 6 ThreadFactory threadFactory, 7 RejectedExecutionHandler handler) { 8 if (corePoolSize < 0 || 9 maximumPoolSize <= 0 || 10 maximumPoolSize < corePoolSize || 11 keepAliveTime < 0) 12 throw new IllegalArgumentException(); 13 if (workQueue == null || threadFactory == null || handler == null) 14 throw new NullPointerException(); 15 this.corePoolSize = corePoolSize; 16 this.maximumPoolSize = maximumPoolSize; 17 this.workQueue = workQueue; 18 this.keepAliveTime = unit.toNanos(keepAliveTime); 19 this.threadFactory = threadFactory; 20 this.handler = handler; 21 }
corePoolSize:核心线程数量。当对线程池中空闲线程进行回收的时候。假设线程池中线程的数量小于corePoolSize,则不会对线程进行回收。若是线程由于异常缘由退出,若是线程退出后线程池的线程数量小于corePoolSize,则会对线程池添加一个线程。
maximumPoolSize:线程池的最大大小。当线程池中任务已经溢出,若是线程数量已经等于maximunPoolSize,线程池也不会在添加线程。
keepAliveTime:线程的空闲时间。若是线程池的线程数量已经大于corePoolSize,当线程空闲时间超过空闲时间,则该线程会被回收。
unit:线程空闲时间的时间单位。能够选择纳秒、微秒、毫秒、秒、分、小时、天为单位。
workQueue:工做队列。用于存储交付给线程池的任务。能够选择BlockingQueue的实现类来充当线程池的工做队列,newFixThreadExecutor和newSingleThreadExecutor默认采用的是无界的LinkedBlockingQueue来充当工做队列。更为稳妥的方式是选择一种有界的工做队列来存储。例若有界的LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue来充当消息队列,防止由于任务无止境的堆积致使内存溢出。newCachedThreadPool使用的是SynchronousQueue来充当队列,SynchronousQueue不是一个真正的消息队列,而已一个任务在线程正当中的移交机制。通常只有在线程池能够无限大,或者线程池能够拒绝任务的状况下使用SynchronousQueue。
threadFactory:线程工厂。每当线程池须要建立一个线程时,能够经过线程的工厂的new Thread方法来建立线程。能够经过自定义一个ThreadFactory来实现对线程的定制。
handler:拒绝机制。当线程池由于工做池已经饱和,准备拒绝任务时候。会调用RejectedExecutionHandler来拒绝该任务。Jdk提供了几种不一样的RejectedExecutionHandler实现,每种实现都包含不一样的饱和策略:AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy。
ThreadLocalPool的池的大小设置,《Java并发编程实战》书中给了一个推荐的设置值。
Ncpu为CPU的数量,Ucpu为CPU的利用率,W/C为任务的等待时间 / 任务的计算时间。在这种状况下,通常线程池的最优大小:
N=Ncpu*Ucpu*(1+W/C)
线程池建立也可使用Executors来建立:
newFixedThreadPool:建立一个固定长度的线程池,每当提交一个任务就建立一个线程,直到达到最大线程数。若是由于异常致使未预期的异常结束。线程池将补充一个线程。
1 public static ExecutorService newFixedThreadPool(int nThreads) { 2 return new ThreadPoolExecutor(nThreads, nThreads, 3 0L, TimeUnit.MILLISECONDS, 4 new LinkedBlockingQueue<Runnable>()); 5 }
newCacheThreadPool:建立一个可缓存的线程池。该线程池核心线程数为0,最大线程为Integer.max_value。能够理解为该线程池规模没有任何限制。
1 public static ExecutorService newCachedThreadPool() { 2 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 3 60L, TimeUnit.SECONDS, 4 new SynchronousQueue<Runnable>()); 5 }
newScheduledThreadPool:建立一个固定长度的线程池,已延迟或者定时方式来执行任务,相似于Timer。
1 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { 2 return new ScheduledThreadPoolExecutor(corePoolSize); 3 }
newSingleThreadExecutor:建立一个单线程的Executor来执行任务,能确保线程的执行顺序,例如FIFO、LIFO、优先顺序等。
public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); }
newWorkStealingPool:根据给定的并行等级,建立一个拥有足够的线程数目的线程池。
1 public static ExecutorService newWorkStealingPool(int parallelism) { 2 return new ForkJoinPool 3 (parallelism, 4 ForkJoinPool.defaultForkJoinWorkerThreadFactory, 5 null, true); 6 }
ThreadPoolExecutor中有一个ctl变量。ctl是一个32位的二级制数,其中高3位用于表示线程池的状态,低29位表示线程池中的活动线程。
1 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 2 private static final int COUNT_BITS = Integer.SIZE - 3; 3 private static final int CAPACITY = (1 << COUNT_BITS) - 1; 4 5 6 private static final int RUNNING = -1 << COUNT_BITS; 7 private static final int SHUTDOWN = 0 << COUNT_BITS; 8 private static final int STOP = 1 << COUNT_BITS; 9 private static final int TIDYING = 2 << COUNT_BITS; 10 private static final int TERMINATED = 3 << COUNT_BITS;
如上代码所示,线程池有五种状态。RUNNING、SHUTDOWN、STOP、TIDYING、TERMINNATED。幸亏ThreadPoolExecutor的代码上有对应注释,看着这些注释能对ThreadPoolExecutor的状态做用和状态流转能有一个大体的了解。
RUNNING:在线程池建立的时候,线程池默认处于RUNNING状态。当线程池处于RUNNING状态的时候,任务队列能够接受任务,而且能够执行QUEUE中任务。
SHUTDOWN:不接受新任务,可是会继续执行QUEUE中的任务。
STOP:不接受新任务,也不执行QUEUE中的任务。
TIDYING:全部的任务都停止了,没有活动中的线程。当线程池进行该状态时候,会执行钩子方法terminated() 。
如下是各个状态对应的流转图:
上面有说过,ExecutorService在Executor接口上,添加了管理生命周期的方法。在ThreadPoolExecutor中,主要关闭动做有三个shutdown()、shutdownNow()、awaitTermination()。
shutdown()是一个平缓的关闭方式,线程池被调用了shutdown函数若是还有事作就会把状态设为SHUTDOWN,可是不会真的停止。
1 public void shutdown() { 2 final ReentrantLock mainLock = this.mainLock; 3 mainLock.lock(); 4 try { 5 //检查是否有关闭线程的权限 6 checkShutdownAccess(); 7 //检查线程池状态、小于SHUTDOWN的用CAS的方式将线程池状态设置为SHUTDOWN 8 advanceRunState(SHUTDOWN); 9 //打断没事作的线程 10 interruptIdleWorkers() 11 //这个是ScheduledThreadPoolExecutor中用到的不,ThreadPoolExecutor中是个空的 12 onShutdown(); // hook for ScheduledThreadPoolExecutor 13 } finally { 14 mainLock.unlock(); 15 } 16 //尝试停止,若是还有事作就不会停止 17 tryTerminate(); 18 }
shutdownNow()跟shutdown()类似,可是shutdownNow()比起shutdown()更加粗暴。无论线程池中的线程有没有事作,直接把线程打断。而且状态会设置为STOP。状态设置为STOP后也表示无视任务队列里面是否是还有任务。shutdownNow()由于会关闭已经开始执行可是还没有结束的任务,因此使用shutdownNow()的时候若是须要知道每一个任务被放弃时候的状态,就必须拓展任务,记录清楚任务中未成功执行完成的任务。
1 public List<Runnable> shutdownNow() { 2 List<Runnable> tasks; 3 final ReentrantLock mainLock = this.mainLock; 4 mainLock.lock(); 5 try { 6 checkShutdownAccess(); 7 //检查线程池状态、小于SHUTDOWN的用CAS的方式将线程池状态设置为SHUTDOWN 8 advanceRunState(STOP); 9 //强行打断 10 interruptWorkers(); 11 tasks = drainQueue(); 12 } finally { 13 mainLock.unlock(); 14 } 15 tryTerminate(); 16 return tasks; 17 }
awaitTermination(long timeout, TimeUnit unit)方法,用于进行等待,假设传入时间为60s,若是60s以后ThreadPoolExecutor状态变为TERMINATED,则返回ture,若是状态不为TERMINATED,则会返回false。一般调用玩shutdown()后会使用awaitTermination方法进行等待,确认线程池已经停止。
1 public boolean awaitTermination(long timeout, TimeUnit unit) 2 throws InterruptedException { 3 long nanos = unit.toNanos(timeout); 4 final ReentrantLock mainLock = this.mainLock; 5 mainLock.lock(); 6 try { 7 for (;;) { 8 if (runStateAtLeast(ctl.get(), TERMINATED)) 9 return true; 10 if (nanos <= 0) 11 return false; 12 nanos = termination.awaitNanos(nanos); 13 } 14 } finally { 15 mainLock.unlock(); 16 } 17 }
以上几个方法能够对线程池的状态进行操做。线程池还提供了isShutdown(),isTerminating(),isTerminated()对线程池的状态进行查询。
当咱们要将一个任务提交给线程池时,通常调用的线程池的execute(Runnable command)方法。简单分析一下这个方法:
1 public void execute(Runnable command) { 2 if (command == null) 3 throw new NullPointerException(); 4 int c = ctl.get(); 5 //如何活动线程数量小于核心线程数量,则添加线程来处理该任务 6 if (workerCountOf(c) < corePoolSize) { 7 if (addWorker(command, true)) 8 return; 9 c = ctl.get(); 10 } 11 //若是线程池在running状态,而且往任务队列里推送任务成功: 12 if (isRunning(c) && workQueue.offer(command)) { 13 int recheck = ctl.get(); 14 //二次检查线程池已经关闭,任务队列删除任务,并拒绝任务 15 if (! isRunning(recheck) && remove(command)) 16 reject(command); 17 //若是工做线程数为0,由于只有当corePoolSize==0的状况下才能走到这里,则此时添加一个非核心的工做者 18 else if (workerCountOf(recheck) == 0) 19 addWorker(null, false); 20 } 21 //走到这边,表示任务推送失败或者线程池已经关闭,添加工做线程,若是线程池已经关闭会返回false,则拒绝该任务 22 else if (!addWorker(command, false)) 23 reject(command); 24 }
咱们从这里能够看出来,当线程池中的活动线程大于或等于核心线程的时候,线程池是不会立刻建立新的线程来执行任务的。只有线程池在任务队列中推送任务失败(任务队列已经满了)的时候才会建立额外的线程来执行任务。若是线程池已经关闭,或者任务队列和工做者已经满了的时候,线程池会开始拒绝任务。reject(command)会用上面说过的RejectedExecutionHandler来对任务进行拒绝。
这里的Worker是ThreadPoolExecutor的内部类,封装Thread类。它的核心方法也就是run()方法。咱们来看一下Worker的run()方法,run()方法就是runWork()方法封装一下。这里的This值的是Worker本身。
1 public void run() { 2 runWorker(this); 3 }
这个是工做者的工做方法。
1 final void runWorker(ThreadPoolExecutor.Worker w) { 2 Thread wt = Thread.currentThread(); 3 Runnable task = w.firstTask; 4 w.firstTask = null; 5 w.unlock(); // allow interrupts 6 boolean completedAbruptly = true; 7 try { 8 //循环获取任务,getTask()会阻塞的从任务队列里拿任务, 9 while (task != null || (task = getTask()) != null) { 10 w.lock(); 11 //判断线程池和线程的状态,是能够继续执行任务的 12 if ((runStateAtLeast(ctl.get(), STOP) || 13 (Thread.interrupted() && 14 runStateAtLeast(ctl.get(), STOP))) && 15 !wt.isInterrupted()) 16 wt.interrupt(); 17 try { 18 //可拓展接口,任务执行前的动做 19 beforeExecute(wt, task); 20 Throwable thrown = null; 21 try { 22 //任务执行没啥好说 23 task.run(); 24 } catch (RuntimeException x) { 25 thrown = x; throw x; 26 } catch (Error x) { 27 thrown = x; throw x; 28 } catch (Throwable x) { 29 thrown = x; throw new Error(x); 30 } finally { 31 //可拓展接口,任务执行前的动做 32 afterExecute(task, thrown); 33 } 34 } finally { 35 task = null; 36 w.completedTasks++; 37 w.unlock(); 38 } 39 } 40 completedAbruptly = false; 41 } finally { 42 //任务退出循环,根据是异常退出仍是正常退出进行收尾 43 //对工做任务进行回收也在这里 44 processWorkerExit(w, completedAbruptly); 45 } 46 }
从队列中获取任务。
1 private Runnable getTask() { 2 boolean timedOut = false; // Did the last poll() time out? 3 4 for (;;) { 5 int c = ctl.get(); 6 int rs = runStateOf(c); 7 8 // 检查线程池状态和队列是否为空,若是没任务可搞直接返回 9 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { 10 decrementWorkerCount(); 11 return null; 12 } 13 14 int wc = workerCountOf(c); 15 16 //判断线程池是否须要提出线程 17 // timed参数用于判断是否须要根据超时时间回收线程, 18 //若是容许核心线程回收或者线程数已经超过核心线程数,则为ture 19 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; 20 21 //工做者太多或者已经超时则干掉 22 if ((wc > maximumPoolSize || (timed && timedOut)) 23 && (wc > 1 || workQueue.isEmpty())) { 24 if (compareAndDecrementWorkerCount(c)) 25 return null; 26 continue; 27 } 28 29 try { 30 //根据上面的判断,让工做者线程阻塞读取直到被打断或者超时返回 31 Runnable r = timed ? 32 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 33 workQueue.take(); 34 if (r != null) 35 return r; 36 timedOut = true; 37 } catch (InterruptedException retry) { 38 timedOut = false; 39 } 40 } 41 }
线程工做者退出。
1 private void processWorkerExit(ThreadPoolExecutor.Worker w, boolean completedAbruptly) { 2 //若是不是由于异常缘由致使线程退出,则不要进行Worker数量调整 3 if (completedAbruptly) 4 decrementWorkerCount(); 5 6 final ReentrantLock mainLock = this.mainLock; 7 mainLock.lock(); 8 try { 9 completedTaskCount += w.completedTasks; 10 //从工做队列中删除,让JVM能够对Worker进行回收 11 workers.remove(w); 12 } finally { 13 mainLock.unlock(); 14 } 15 //尝试停止线程池 16 tryTerminate(); 17 18 int c = ctl.get(); 19 20 //线程池若是还在跑,线程异常退出,须要补充工做者,就对工做者进行补充。 21 if (runStateLessThan(c, STOP)) { 22 if (!completedAbruptly) { 23 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; 24 if (min == 0 && ! workQueue.isEmpty()) 25 min = 1; 26 if (workerCountOf(c) >= min) 27 return; // replacement not needed 28 } 29 addWorker(null, false); 30 } 31 }
上面的代码我就不细讲, 主要的流程就写了注释在上面。当年第一次据说线程池会回收空闲线程的时候就会好奇这个操做是怎么搞的,上面代码的workqueue.poll()就是关键,当线程能够回收,而且线程阻塞已经超时,则进行线程回收。
写这篇博客的时候,心情比较烦躁。遵从朋友建议恶搞部分标题名,果真心情好不少。之后能够考虑在内容没歧义的前提下,文章部分也这么写。线程池在java中算是比较基础的内容,入行以来面试也被面了很多,可是一直没看过源码,最近看了一下发现确实学习到了很多东西,部分看懂了,部分由于水平不够没看懂的东西,部分看明白后有种还能够这么写的感慨。果真JAVA程序员要多看看JDK源码。