Java从1.5开始正式提供了并发包,而这个并发包里面除了原子变量,synchronizer,并发容器,另一个很是重要的特性就是线程池.对于线程池的意义,咱们这边再也不多说.html
上图是线程池的主体类图,ThreadPoolExecutor是应用最为普遍的一个线程池实现(我也将在接下来的文字中详细描述我对这个类的理解和执行机制),ScheduledThreadPoolExecutor则在ThreadPoolExecutor上提供了定时执行的等附加功能,这个能够从ScheduledExecutorService接口的定义中看出来.Executors则相似工厂方法,提供了几个很是经常使用的线程池初始化方法.java
ThreadPoolExecutor并发
这个类继承了AbstractExecutorService抽象类,AbstractExecutorService主要的职责有2部分,一部分定义和实现提交任务的方法(3个submit方法的实现),实例化FutureTask而且交给子类执行,另一部分实现invokeAny,invokeAll方法.留给子类的方法为execute方法,也就是Executor接口定义的方法.ide
[java] //实例化一个FutureTask,交给子类的execute方法执行.这种设计可以保证callable和runnable的执行接口方法的一致性(FutureTask包装了这个差异) public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } [/java]
回到ThreadPoolExecutor类.ThreadPoolExecutor须要实现除了咱们刚才说的execute(Runnablecommand)方法外,还得实现ExecutorService接口定义的部分方法.但ThreadPoolExecutor所提供的不光是这些,如下根据个人理解来列一下它所具备的特性
1.execute流程
2.池
3.工做队列
4.饱和拒绝策略
5.线程工厂
6.beforeExecute和afterExecute扩展.net
execute方法的实现有个机制很是重要,当当前线程池线程数量小于corePoolSize,那么生成一个新的worker并把提交的任务置为这个工做线程的头一个执行任务,若是大于corePoolSize,那么会试着将提交的任务塞到workQueue里面供线程池里面的worker稍后执行,并非直接再起一个worker,可是当workQueue也满,而且当前线程池小于maxPoolSize,那么起一个新的worker并将该任务设为该worker执行的第一个任务执行,大于maxPoolSize,workQueue也满负荷,那么调用饱和策略里面的行为.线程
worker线程在执行完一个任务以后并不会马上关闭,而是尝试着去workQueue里面取任务,若是取不到,根据策略关闭或者保持空闲状态.因此submit任务的时候,提交的顺序为核心线程池——工做队列——扩展线程池.设计
池包括核心池,扩展池(2者的线程在同一个hashset中,这里只是为了方便才这么称呼,并非分离的),核心池在池内worker没有用完的状况下,只要有任务提交都会建立新的线程,其表明线程池正常处理任务的能力.扩展池,是在核心线程池用完,而且工做队列也已排满任务的状况下才会开始初始化线程,其表明的是线程池超出正常负载时的解决方案,一旦任务完成,而且试图从workQueue取不到任务,那么会比较当前线程池与核心线程池的大小,大于核心线程池数的worker将被销毁.
[java]htm
Runnable getTask() { for (;;) { try { int state = runState; //>SHUTDOWN就是STOP或者TERMINATED //直接返回 if (state > SHUTDOWN) return null; Runnable r; //若是是SHUTDOWN状态,那么取任务,若是有 //将剩余任务执行完毕,不然就结束了 if (state == SHUTDOWN) // Help drain queue r = workQueue.poll(); //若是不是以上状态的(也就是RUNNING状态的),那么若是当前池大于核心池数量, //或者容许核心线程池取任务超时就能够关闭,那么从任务队列取任务, //若是超出keepAliveTime,那么就返回null了,也就意味着这个worker结束了 else if (poolSize > corePoolSize || allowCoreThreadTimeOut) r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); //若是当前池小于核心池,而且不容许核心线程池取任务超时就关闭,那么take(),直到拿到任务或者被interrupt else r = workQueue.take(); //若是通过以上断定,任务不为空,那么返回任务 if (r != null) return r; //若是取到任务为空,那么断定是否能够退出 if (workerCanExit()) { //若是整个线程池状态变为SHUTDOWN或者TERMINATED,那么将全部worker interrupt (若是正在执行,那继续让其执行) if (runState >= SHUTDOWN) // Wake up others interruptIdleWorkers(); return null; } // Else retry } catch (InterruptedException ie) { // On interruption, re-check runState } } } //worker从workQueue中取不到数据的时候调用此方法,以决定本身是否跳出取任务的无限循环,从而结束此worker的运行 private boolean workerCanExit() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); boolean canExit; try { /**//* *线程池状态为stop或者terminated, *或者任务队列里面任务已经为空, *或者容许线程池线程空闲超时(实现方式是从工做队列拿最多keepAliveTime的任务,超过这个时间就返回null了)而且 *当前线程池大于corePoolSize(>1) *那么容许线程结束 *static final int RUNNING = 0; *static final int SHUTDOWN = 1; *static final int STOP = 2; *static final int TERMINATED = 3; */ canExit = runState >= STOP || workQueue.isEmpty() || (allowCoreThreadTimeOut && poolSize > Math.max(1,corePoolSize)); } finally { mainLock.unlock(); } return canExit; } [/java]
当提交任务是,线程池都已满,而且工做队列也无空闲位置的状况下,ThreadPoolExecutor会执行reject操做,JDK提供了四种reject策略,包括AbortPolicy(直接抛RejectedExceptionException),CallerRunsPolicy(提交任务线程本身执行,固然这时剩余任务也将没法提交),DiscardOldestPolicy(将线程池的workQueue任务队列里面最老的任务剔除,将新任务丢入),DiscardPolicy(无视,忽略此任务,而且当即返回).实例化ThreadPoolExecutor时,若是不指定任何饱和策略,默认将使用AbortPolicy.blog
我的认为这些饱和策略并不十分理想,特别是在应用既要保证快速,又要高可用的状况下,个人想法是可以加入超时等待策略,也就是提交线程时线程池满,可以park住提交任务的线程,一旦有空闲,能在第一时间通知到等待线程.这个实际上和主线程执行类似,可是主线程执行期间即便线程池有大量空闲也不会当即能够提交任务,效率上后者可能会比较低,特别是执行慢速任务.
实例化Worker的时候会调用ThreadFactory的addThread(Runnabler)方法返回一个Thread,这个线程工厂是能够在ThreadPoolExecutor实例化的时候指定的,若是不指定,那么将会使用DefaultThreadFactory,这个也就是提供给使用者命名线程,线程归组,是不是demon等线程相关属性设置的机会.
beforeExecute和afterExecute是提供给使用者扩展的,这两个方法会在workerrunTask以前和run完毕以后分别调用.JDK注释里DougLea(concurrent包做者)展现了beforeExecute一个颇有趣的示例.代码以下.
[java]
class PausableThreadPoolExecutor extends ThreadPoolExecutor { private boolean isPaused; private ReentrantLock pauseLock = new ReentrantLock(); private Condition unpaused = pauseLock.newCondition(); public PausableThreadPoolExecutor() { super(); } protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); pauseLock.lock(); try { while (isPaused) unpaused.await(); } catch (InterruptedException ie) { t.interrupt(); } finally { pauseLock.unlock(); } } public void pause() { pauseLock.lock(); try { isPaused = true; } finally { pauseLock.unlock(); } } public void resume() { pauseLock.lock(); try { isPaused = false; unpaused.signalAll(); } finally { pauseLock.unlock(); } } } [/java]
使用这个线程池,用户能够随时调用pause停止剩余任务执行,固然也可使用resume从新开始执行剩余任务.
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor是一个很实用的类,它的实现核心是基于DelayedWorkQueue.从ScheduledThreadPoolExecutor的继承结构上来看,各位应该可以看出些端倪来,就是ScheduledThreadPoolExecutor将ThreadPoolExecutor中的任务队列设置成了DelayedWorkQueue,这也就是说,线程池Worker从任务队列中取的一个任务,须要等待这个队列中最短超时任务的超时,也就是实现定时的效果.因此ScheduledThreadPoolExecutor所作的工做实际上是比较少的.主要就是实现任务的实例化并加入工做队列,以及支持scheduleAtFixedRate和scheduleAtFixedDelay这种周期性任务执行.
[java]
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,new DelayedWorkQueue(), threadFactory); } [/java]
private void runPeriodic() { boolean ok = ScheduledFutureTask.super.runAndReset(); boolean down = isShutdown(); // Reschedule if not cancelled and not shutdown or policy allows if (ok && (!down ||(getContinueExistingPeriodicTasksAfterShutdownPolicy() && !isStopped()))) { long p = period; if (p > 0) time += p; else time = triggerTime(-p); ScheduledThreadPoolExecutor.super.getQueue().add(this); } // This might have been the final executed delayed // task. Wake up threads to check. else if (down) interruptIdleWorkers(); } [/java]
ExecutorCompletionService
CompletionService定义了线程池执行任务集,能够依次拿到任务执行完毕的Future,ExecutorCompletionService是其实现类,先举个例子,以下代码,这个例子中,须要注意ThreadPoolExecutor核心池必定保证可以让任务提交而且立刻执行,而不是放到等待队列中去,那样次序将会没法控制,CompletionService也将失去效果(其实核心池中的任务完成顺序仍是准确的).
[java]
public static void main(String[] args) throws InterruptedException, ExecutionException{ ThreadPoolExecutor es=new ThreadPoolExecutor(10, 15, 2000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10),new ThreadPoolExecutor.AbortPolicy()); CompletionService<String> cs=new ExecutorCompletionService<String>(es); cs.submit(new Callable<String>() { @Override public String call() throws Exception { Thread.currentThread().sleep(1000); return "i am sleeped 1000 milliseconds"; } }); cs.submit(new Callable<String>() { @Override public String call() throws Exception { Thread.currentThread().sleep(5000); return "i am sleeped 5000 milliseconds"; } }); cs.submit(new Callable<String>() { @Override public String call() throws Exception { Thread.currentThread().sleep(4000); return "i am sleeped 4000 milliseconds"; } }); cs.submit(new Callable<String>() { @Override public String call() throws Exception { Thread.currentThread().sleep(2000); return "i am sleeped 2000 milliseconds"; } }); for(int i=0;i<4;i++){ Future<String> fu=cs.take(); System.out.println(fu.get()); } } [/java]
执行结果:
i am sleeped 1000 milliseconds i am sleeped 2000 milliseconds i am sleeped 4000 milliseconds i am sleeped 5000 milliseconds
从执行结果看来,咱们发现先完成的任务先被拿出来了,直到全部任务被执行完毕,也就是CompletionService的效果达到了.
ExecutorCompletionService并不复杂,关键的一个点就是它的内部类QueueingFuture继承了FutureTask类,而且实现了done()方法,done()方法是在线程池任务执行完毕,最后调用FutureTask的方法(这在JAVALOCK代码浅析(http://www.blogjava.net/BucketLi/archive/2010/09/30/333471.html)一文中对于FutureTask代码解析有提到)
QueueingFuture的done()方法实现是将执行完的任务(FutureTask)丢入全局的完成队列中(completionQueue),那么take是从这个blockingqueue中取元素.也就是任务完成就会有元素,即生产者消费者.
这种实现的思想是将本来在单个FutureTask上的等待转化为在BlockingQueue上的等待,即对所有FutureTask的等待,从而达到哪一个先完成,哪一个就可取执行结果的效果.
[java]
private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); his.task = task; } protected void done() { completionQueue.add(task); } private final Future<V> task; } [/java]
总结:JUC提供的线程池体系核心是在ThreadPoolExecutor,而ScheduledThreadPoolExecutor和ExecutorCompletionService只是对其扩展,这里没有去细讲Executors这个便捷类,这个类提供不少便捷的线程池构建方法.各位使用的时候不妨去看下.