线程的建立须要开辟虚拟机栈、本地方法栈、程序计数器等线程私有的内存空间,在线程销毁时须要回收这些系统资源。频繁的建立销毁线程会浪费大量资源,使用线程池能够更好的管理和协调线程的工做。缓存
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
…………
}
复制代码
1)构造参数分析:
2)拒绝策略分析: ThreadPoolExecutor中提供了四个RejectedExecutionHandler策略。bash
3)建立线程池的其余方式(不推荐):Executors这个线程池静态工厂能够建立三个线程池的包装对象:ForkJoinPool、ThreadPoolExecutor、ScheduledThreadPoolExecutor。Executors中关于ThreadPoolExecutor的核心方法以下:并发
// SynchronousQueue是不存储元素的阻塞队列,而且maximumPoolSize为Integer.MAX_VALUE
便是无界,当主线程提交任务速度高于CachedThreadPool的处理速度时会不断建立线程,
极端状况下会发生OOM
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
// keepAliveTime为0意味着多余的空闲线程会被马上终止,LinkedBlockingQueue的默认容量
是Integer.MAX_VALUE即无界,极端状况下会发生OOM
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
// LinkedBlockingQueue的默认容量是Integer.MAX_VALUE即无界,极端状况下会发生OOM
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
复制代码
看过了上述方法以后能够发现这三个方法构造出来的线程池都存在OOM的风险。而且不能灵活的配置线程工厂和拒绝策略,因此不推荐使用Executors来建立线程池。ui
4)向线程池提交任务:有两个方法execute()和submit()能够向线程池提交任务。execute()方法用于提交不须要返回值的任务,没法判断任务是否被线程池执行成功。submit()方法用于提交有返回值的任务(Callable)。线程池会返回一个future类型对象,经过future的get()方法能够获取返回值,值得注意的是get()方法会阻塞当前线程直到任务完成。spa
5)关闭线程池:有两个方法shutdown()和shutdownNow()能够关闭线程池。它们的原理是遍历线程池中的工做线程,而后逐个的调用线程的interrupt()方法来中断线程(没法响应中断的线程没法终止)。它们的区别在于shutdownNow()首先将线程池状态设置为STOP,而后尝试中止全部线程;shutdown()是将线程池状态设置为SHOTDOWN,而后中断全部没有正在执行任务的线程。线程
当线程池接收到一个任务以后,执行流程以下图:code
ThreadPoolExecutor执行示意图:cdn
下面是ThreadPoolExecutor中execute()方法的核心代码:对象
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 获取用于返回线程数和线程池状态的integer数值
int c = ctl.get();
// 一、若是工做线程数小于核心线程数,则建立任务并执行
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 二、若是线程池处于RUNNING状态则将任务加入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 三、核心线程池和队列都满了,尝试建立一个新的线程
else if (!addWorker(command, false))
// 四、若是建立失败则执行拒绝策略
reject(command);
}
复制代码
addWorker()主要是建立工做线程 -- 将任务包装成Worker类。在一、3两个步骤中建立线程时须要获取全局锁ReentrantLock避免被干扰,当当前工做线程数大于等于corePoolSize以后几乎全部的execute()都是在执行步骤2。 Worker在执行完任务以后还会循环获取工做队列的任务来执行while (task != null || (task = getTask()) != null)
,getTask()方法中获取阻塞队列中的任务(poll()或take(),若是核心线程会被销毁或者当前线程数大于核心线程数则用poll()超时获取)blog
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
: workQueue.take();
复制代码
线程池的工做原理代码在这里就不具体分析了,下图直观的展现了线程池的工做原理。
想要合理的配置线程池首先须要分析任务特性:CPU密集型任务、IO密集型任务、混合型任务 .
CPU密集型任务:尽可能使用较小的线程池,通常为CPU核心数+1。CPU密集型任务的CPU使用率很高,过多的线程数运行只能增长上下文切换的次数,所以会带来额外的开销。
IO密集型任务:使用稍大的线程池,通常为2*CPU核心数。IO密集型任务CPU使用率并不高,可让CPU在等待IO的时候去处理别的任务,充分利用CPU。
混合型任务:能够将任务分红IO密集型和CPU密集型任务,而后分别用不一样的线程池去处理。只要分完以后两个任务的执行时间相差不大,那么就会比串行执行高效。若是划分以后两个任务执行时间相差甚远,那么最终的时间仍然取决于后执行完的任务,并且还要加上任务拆分与合并的开销。
在线程池的实现中还涉及了不少并发包中的知识好比BlockingQueue、ReentrantLock、Condition等,在这里就暂时不进行介绍了,后续会介绍它们。