接上文:java线程池的原理学习java
ThreadPoolExecutor
,线程池类,继承自 AbstractExecutorService
segmentfault
public class ThreadPoolExecutor extends AbstractExecutorService
ThreadPoolExecutor
提供了四种构造方法实现(这里只介绍一种):并发
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
有必要对每一个参数解释一下:工具
corePoolSize
- 池中所保存的线程数,包括空闲线程。学习
maximumPoolSize
- 池中容许的最大线程数。this
keepAliveTime
- 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。线程
unit
- keepAliveTime
参数的时间单位。日志
workQueue
- 执行前用于保持任务的队列。此队列仅保持由 execute
方法提交的 Runnable
任务。code
threadFactory
- 执行程序建立新线程时使用的工厂。orm
handler
- 因为超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。
为了便于跨大量上下文使用,此类提供了不少可调整的参数和扩展钩子 (hook)。jdk文档中建议在一般状况下,使用 Executors
提供的工厂方法配置,也就是提供好了的线程池。若非要手动配置,须要遵循如下规则:
ThreadPoolExecutor
将根据corePoolSize
和maximumPoolSize
设置的边界自动调整池大小。当新任务在方法execute(java.lang.Runnable)
中提交时:
1.运行的线程少于corePoolSize
,则建立新线程来处理请求,即便其余辅助线程是空闲的。
2:运行的线程多于corePoolSize
而少于maximumPoolSize
,则把任务放进队列,由空闲线程从队列中取任务,仅当队列满时才建立新线程。
3:若是设置的corePoolSize
和maximumPoolSize
相同,则建立了固定大小的线程池。
4:若是将maximumPoolSize
设置为基本的无界值(如Integer.MAX_VALUE
),则容许池适应任意数量的并发任务。
还要注意如下两点:
在大多数状况下,核心和最大池大小仅基于构造器来设置,不过也可使用 setCorePoolSize(int)
和 setMaximumPoolSize(int)
进行动态更改。
当池中的线程数大于 corePoolSize
的时候,多余的线程会等待 keepAliveTime
长的时间,若是无请求可处理就自行销毁。
使用
ThreadFactory
建立新线程。若是没有另外说明,则在同一个 ThreadGroup 中一概使用Executors.defaultThreadFactory()
建立线程,而且这些线程具备相同的NORM_PRIORITY
优先级和非守护进程状态。经过提供不一样的ThreadFactory
,能够改变线程的名称、线程组、优先级、守护进程状态,等等。若是从newThread
返回 null 时ThreadFactory
未能建立线程,则执行程序将继续运行,但不能执行任何任务。
ThreadFactory
是线程工厂,它是一个接口:
public interface ThreadFactory { Thread newThread(Runnable r); }
ThreadPoolExecutor
中的 threadFactory
是由 Executors
工具类提供的:
public static ThreadFactory defaultThreadFactory() { return new DefaultThreadFactory(); }
static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { ////建立的线程以“pool-N-thread-M”命名,N是该工厂的序号,M是线程号 Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); //设为非后台线程 if (t.isDaemon()) t.setDaemon(false); //优先级为normal if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
DefaultThreadFactory
是一个静态内部类
前面说到,当线程池中运行的线程等于或多于 corePoolSize
,则 Executor
始终首选将请求加入队列,而不添加新的线程,将任务加入队列有三种策略(具体参见jdk文档)。
两种状况下,新提交的任务将会被拒绝:
当 Executor
已经关闭
Executor
将有限边界用于最大线程和工做队列容量,且已经饱和
被拒绝的任务, execute
方法都将调用其 RejectedExecutionHandler
的 RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor)
方法。下面提供了四种预约义的处理程序策略:
在默认的
ThreadPoolExecutor.AbortPolicy
中,处理程序遭到拒绝将抛出运行时RejectedExecutionException
。
在ThreadPoolExecutor.CallerRunsPolicy
中,线程调用运行该任务的execute
自己。此策略提供简单的反馈控制机制,可以减缓新任务的提交速度。
在ThreadPoolExecutor.DiscardPolicy
中,不能执行的任务将被删除。
在ThreadPoolExecutor.DiscardOldestPolicy
中,若是执行程序还没有关闭,则位于工做队列头部的任务将被删除,而后重试执行程序(若是再次失败,则重复此过程)。
定义和使用其余种类的 RejectedExecutionHandler
类也是可能的,但这样作须要很是当心,尤为是当策略仅用于特定容量或排队策略时。
此类提供两个 protected
可重写的 钩子方法:
protected void beforeExecute(Thread t, Runnable r) { } protected void afterExecute(Runnable r, Throwable t) { }
这两种方法分别在执行 每一个任务
以前和以后调用。它们可用于操纵执行环境;注意这里是每一个任务,即每次运行新任务时都会执行一遍。例如,从新初始化 ThreadLocal
、搜集统计信息或添加日志条目。此外,还能够重写方法 terminated()
来执行 Executor
彻底终止后须要完成的全部特殊处理。
若是钩子 (hook) 或回调方法抛出异常,则内部辅助线程将依次失败并忽然终止。
jdk文档中提供了一个能够暂停和恢复的线程池例子:
class PausableThreadPoolExecutor extends ThreadPoolExecutor { private boolean isPaused; private ReentrantLock pauseLock = new ReentrantLock(); private Condition unpaused = pauseLock.newCondition(); public PausableThreadPoolExecutor(...) { super(...); } protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); pauseLock.lock(); try { while (isPaused) unpaused.await(); } catch(InterruptedException ie) { t.interrupt(); } finally { pauseLock.unlock(); } } public void pause() { pauseLock.lock(); try { isPaused = true; } finally { pauseLock.unlock(); } } public void resume() { pauseLock.lock(); try { isPaused = false; unpaused.signalAll(); } finally { pauseLock.unlock(); } } }