# 在任务与执行策略之间的隐性耦合
Executror框架能够将任务的提交与任务的执行解耦开。可是虽然Executor框架为制定和修改执行策略提供了很大的灵活性,但并不是全部的任务都能适用全部的执行策略。有些类型的任务须要明确地制定执行策略,其中包括:html
依赖性任务
:大多数行为正确的任务都是独立的:它们不依赖于其余任务的执行时序、执行结果或其余效果。当在线程池中执行独立任务时,能够任意修改线程池大小和配置,这些修改只会对执行性能产生影响。若是提交给线程池的任务须要依赖于其余任务,那么隐含的对执行策略带来了约束,此时必须当心地维持这些执行策略以免产生活跃性问题。使用线程封闭机制的任务
:与线程池相比,单线程的Executor可以对并发性作出更强的承诺。它们能确保任务不会并发的执行。对象能够封闭在任务线程中,使得在该线程执行的任务在访问该对象时不须要同步。这种状况将在任务与执行策略之间造成隐性的耦合:即任务要求其执行所在的Executor是单线程的。若是将Executor从单线程环境改成线程池环境,那么将会失去线程安全。对响应时间敏感
:若是将一个运行时间较长的任务提交到单线程的Executor中,或者将多个运行时间较长的任务提交到一个只包含少许线程的线程池中,那么将会下降由该Executor管理的服务性。使用threadLocal的任务
:ThreadLocal使每一个线程都拥有某个变量的一个私有版本。只要条件容许,Executor能够自由的重用这些线程。若是从任务中抛出一个未受检查的异常,那么将用一个新的工做者线程来替代抛出异常的线程。只有线程本地值的生命周期受限于任务的生命周期时,在线程池中的线程使用ThreadLocal才有意义,而在线程池中的线程中不该该使用ThreadLocal在任务之间传递值。只有当任务都是同类型的而且互相独立时,线程池的性能才能达到最佳。若是运行时间较长和运行时间较短的任务混合在一块儿,除非线程池很大,不然很容易形成拥塞。
在线程池中,若是任务依赖于其余任务,那么可能产生死锁。
在单线程的Executor中,若是一个任务将另外一个任务提交到同一个Executor,而且等待这个被提交任务的结果,那么一般会引起死锁。
若是全部正在执行任务的线程都因为等待其余仍处于工做队列的任务而阻塞,那么会发生一样的问题。这种现象被称为线程饥饿死锁(Thread Starvation Deadlock)
java
public class ThreadDeadLock{ ExecutorService exec = Executors.newSingleThreadExecutor(); public class RenderPageTask implements Callable<String> { public String call throws Exception { Future<String> header,footer; header = exec.submit(new LoadFileTask("header.html")); footer = exec.submit(new LoadFileTask("footer.html")); String page = renderBody(); // 这里将发生死锁:因为当前任务在等待子任务的结果 return header.get() + page + footer.get(); } } }
每当提交了一个有依赖性的Executor任务时,要清晰的知道可能会出现线程“饥饿”死锁,所以须要在代码或配置Executor的配置文件中记录线程池的大小限制或配置限制。
若是任务阻塞时间过长,那么即使不出现死锁,任务的的响应性也不好。执行时间较长可能会形成线程池阻塞,增长执行时间较短任务的服务时间。若是线程数量远小于在稳定状态下执行时间较长任务的数量,那么到最后可能全部的线程都会运行这些执行时间较长的任务,从而影响总体的响应性。
经过限定任务等待资源的时间,不要无限制的等待,来缓解执行时间任务较长任务的影响。平台类库的大多数阻塞方法都提供了限时版本和无限时版本,例如Thread.join,BlockingQueue.put,CountDownLatch.await以及Selector.select等。若是等待超时,那么能够把任务标为失败,而后终止任务或者将任务放回队列以供随后执行。这样不管任务最终是否能执行成功,至少任务能顺利继续执行下去。不过若是线程池中老是充满被阻塞的任务,那么多是线程池的规模太小。node
要想正确的设置线程池的大小,必须分析计算环境、资源预算和任务的特性。在部署的系统中有多少个CPU?多大的内存?任务是计算密集型、I/O密集型仍是两者皆可?它们是否须要像JDBC链接这样的稀缺资源?而且若是它们之间的行为相差很大,那么应该考虑使用多个线程池,从而使每一个线程池能够根据各自的工做负载来调整。算法
计算密集型
的任务,在拥有N个Cpu的系统上,当线程池的大小为N+1时,一般能有最优的利用率:即便当计算密集型的线程偶尔因为页缺失故障或者其余缘由暂停时,这个“额外”的线程也能保证cpu的时钟周期不会被浪费。包含IO操做或者其余阻塞操做
的任务,因为线程并不会一直执行,所以线程池的规模应该更大。若是要正确的设置线程池的大小,你须要估算任务的等待时间和计算时间的比值。CPU周期并非惟一影响线程池大小的资源,还包括内存、文件句柄、套接字句柄和数据库链接等。数据库
ThreadPoolExecutor为一些Executor提供了基本的实现,这些Executor是由Executors中的newCachedThreadPool
、newFixedThreadPool
等工厂方法返回的。ThreadPoolExecutor是一个灵活的,稳定的线程池,且支持各类定制。
若是默认的构造函数不能知足需求,那么能够经过ThreadPoolExecutor的构造函数,而且根据本身的需求来定制。ThreadPoolExecutor定义了不少构造函数。数组
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }
线程池的基本大小(corePoolSize)
、最大大小(maximumPoolSize)
、存活时间
等因素共同负责线程的建立与销毁。安全
基本大小就是线程池的目标大小,即没有执行任务时
线程池的大小。服务器
建立ThreadPoolExecutor的初期,线程并不会当即启动,而是等到有任务提交时才启动,除非调用prestartAllCoreThread)
经过调节线程池的基本大小和存活时间,能够帮助线程池回收空闲线程占有的资源(回收线程时会产生额外的延迟,由于当需求增长时,必须建立新的线程来知足需求)。并发
Integer.MAX_VALUE
,而基本大小设置为0,超时时间设置为1分钟,这样建立出来的线程能够被无限扩展,当需求下降的时候自动收缩。在有限的线程池中限制可并发执行的任务数量(单线程的Executor是一种特例:它们能确保不会有任务并发执行,由于它们经过线程封闭来实现线程安全性。)
若是无限制的建立线程,那么将致使系统的不稳定性,而且经过固定大小的线程池(而不是收到一个请求就建立一个线程)来解决这样的问题。然而这个方案并不完整。在高负载的状况下,应用程序仍可能耗尽资源。若是新请求的到达速率超过了线程池的处理速率,那么新来的请求将累积起来。在线程池中,这些请求会在一个由Executor管理的Runnable队列中等待,而不会像线程那样去竞争CPU资源。经过一个Runnable和一个链表节点来表示一个等待中的任务,固然比用线程来表示开销低不少。可是若是客户提交给服务器请求的速率超过了服务器的处理速度,那么资源仍可能被耗尽。
即便请求到达的速率很稳定,也有可能出现请求突增的状况。尽管队列有足浴缓解任务的突增问题,可是若是任务持续高速的到来,那么最终仍是会抑制请求的到达率以免耗尽内存,甚至在耗尽内存以前,响应性能也随着任务队列的增加而愈来愈糟。框架
ThreadPoolExecutor容许提供一个BlockingQueue
来保存等待执行的任务。基本的任务排列方法有3种:
队列的选择与其余配置参数有关,例如线程池的大小等。
newFixedThreadPool和newSingleThreadExecutor在默认状况下将使用一个无界的LinkedBlockingQueue。若是全部线程都处于忙碌,那么任务将在队列中等待,若是任务快速的到达,超过了cpu处理任务的速度,那么队列将无限制的增长。
更稳妥的策略是使用有界队列,例如ArrayBlockingQueue、有界的LinkedBlockingQueue、PriorityBlockingQueue等。有界队列能够避免资源耗尽。可是带来了一个新问题:当队列填满之后该怎么办?(饱和策略能够解决这个问题)。在使用有界的工做队列时,队列的大小与线程池的大小必须一块儿调节:
对于很是大的或者无界的线程池,能够经过使用SynchronousQueue来避免任务排队,以及直接将任务从生产者移交给工做者线程。SynchronousQueue并非一个真正的队列,而是一种在线程间进行移交的机制。要将一个元素放入SynchronousQueue中,必须有另外一个线程正在等待接收这个元素。若是没有线程正在等待,而且线程池的当前大小小于最大值,那么ThreadPoolExecutor将会建立一个新的线程。不然根据饱和策略,这个任务将被拒绝。
使用直接移交将更加高效,由于任务会直接交给执行它的线程,而不是被首先放入队列里,而后由工做线程从队列中提取任务。只有当线程池是无界的或者是能够拒绝的时候,SynchronousQueue才有实际价值。在newCachedThreadPool中就使用了SynchronousQueue。
当使用像LinkedBlockingQueue
或ArrayBlockingQueue
这样的FIFO队列,任务的执行顺序和它们的到达顺序相同,若是想进一步控制任务的执行顺序,可使用PriorityBlockingQueue
,内容根据天然顺序或者Comparable定义。
对于Executor,newCachedThreadPool工厂方法是一种很好的默认选择,它能提供比固定大小更好的排队性能(因为使用了SynchronousQueue而不是LinkedBlockingQueue),当须要限制当前任务的数量以知足资源管理器需求时,能够选择固定大小的线程池,避免过载问题。
当有界队列被填满后,饱和策略开始发挥做用。
ThreadPoolExecutor的饱和策略能够经过调用setRejectedExecutionHandle
来修改。若是某个任务被提交到已经关闭的Executor时,也会触发饱和策略。
JDK提供了几种不一样的RejectedExecutionHandle
实现,每种实现都包含不一样的策略:
AbortPolicy
:停止策略,是默认的饱和策略。该策略将会抛出未检查的RejectedExecutionException
,调用者能够捕获这个异常,而后根据需求来编写本身的处理代码。DiscardPolicy
:抛弃策略,会悄悄的抛弃该任务。DiscardOldestPolicy
:抛弃最旧的策略,会抛弃下一个将被执行的任务,而后尝试提交当前任务。(若是是优先队列,则会抛弃优先级最高的任务,所以不要将DiscardOldestPolicy和优先队列一块儿使用)CallerRunsPolicy
:调用者运行策略,实现了一种调节机制,既不会抛弃任务,也不会抛出异常,而是将某些任务回退给调用者,从而下降新任务的流量。它不会在线程池的某个线程中执行新提交的任务,而是在一个调用了execute的线程中执行该任务。当线程池中的全部线程都被占用,而且工做队列被填满的时候,下一个任务会在调用execute时在主线执行。因为执行须要必定时间,所以主线至少在一段时间内不能提交任何任务,从而使得工做者线程有时间来处理完正在执行的任务。在这期间,主线程不会调用accept,所以请求将会被保存到TCP层的队列中而不是在应用程序的队列中,若是持续过载,TCP层最终发现它的请求队列被填满,所以一样会开始抛弃请求。从线程池 -> 工做队列 -> 应用程序 -> TCP层,最终到达客户端,这种策略可以实现一种平缓的性能下降。/** * 建立一个固定大小的线程池,同时使用“调用者运行”的饱和策略 */ ThreadPoolExecutor executor = new ThreadPoolExecutor(N_THREADS, N_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(CAPACITY)); executor.setRejectedExecutionHandle(new ThreadPoolExecutor.CallerRunsPolicy);
当工做队列被填满后,没有预约义的饱和策略来阻塞execute。能够经过使用Semaphore信号量来限制任务的到达率。
每当线程池须要建立一个线程时,都是经过线程工厂方法来完成的。默认的线程工厂方法将建立一个新的、非守护的线程。ThreadFactory
接口只定义了一个方法Thread new Thread(Runnable r)
,每当线程池须要建立一个新线程时都会调用这个方法。若是在应用程序中须要利用安全策略来控制对某些特殊代码库的访问权限,那么能够经过Executors中的privilegedThreadFactory
工厂来定制本身的线程工厂。经过这样的方式建立出来的线程,将于privilegedThreadFactory拥有一样的访问权限。若是不使用privilegedThreadFactory
,线程池建立的线程将从在须要更新线程时调用execute或submit的客户端程序中继承访问权限,从而致使一些使人困惑的安全问题。
在调用完成ThreadPoolExecutor的构造函数以后,仍然能够设置大多数传递给它的构造函数的参数。若是Executor是经过Executors中的某个(newSingleThreadExecutor除外)工厂方法建立的,那么能够将结果的类型转化为ThreadPoolExecutor。
ExecutorService exec = Executors.newCachedThreadPool(); if(exec instanceof ThreadPoolExecutor){ ((ThreadPoolExecutor) exec).setCorePool(10); }else { throw new AssertionError("Oops,bad assumpion"); }
在Executors中包含一个unconfigurableExecutorService
工厂方法,该方法能够对ExecutorService进行包装,若是你将ExecutorService暴露给不信任的代码,又不指望其被修改,就能够经过unconfigurableExecutorService来包装它。
ThreadPoolExecutor是可拓展的,它提供了几个能够在子类化中改写的方法:
这几个方法有利于拓展ThreadPoolExecutor的行为。在执行任务的线程池中将调用beforeExecute和afterExecute方法,以便与添加日志,计时。不管是从run中正常返回,仍是抛出一个异常而返回,afterExcute都会被调用,若是beforeExecute抛出一个RuntimeException,那么任务将不被执行,afterExecute也不被调用。
在线程池关闭操做时执行terminated,能够用来释放Executor在其生命周期里分配的各类资源,还能够发送通知,记录日志等。
void processSequentially(List<Element> elements){ for(Element e : elements){ process(e); } } void processInparallet(Executor exec, List<Element> elements){ for(final Element e : elements){ exec.excute(new Runnable(){ public void run(){ process(e); }; }); } }
调用processInparallet比processSequentially能更好的返回,由于一当列表中的任务提交完成,就会当即返回,而不会等待这些任务执行完成。
public<T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results){ for(Node<T> n : nodes){ results.add(n.compute()); sequentialRecursive(n.getChildren(), results); } } public<T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results){ for(final Node<T> n : nodes){ exec.execute(new Runnbale(){ public void run(){ results.add(n.compute()); } }); parallelRecursive(exec, n.getChildren(), results); } }
当parallelRecursive返回的时候,树中的各个节点都已经访问过了(遍历过程还是串行,compute()调用才是并行),而且每一个节点的计算任务也已经放入了Executor的工做队列。
对于并发执行的任务,Executor框架是一种强大且灵活的框架。它提供了大量可调节的选项,例如建立线程和关闭线程的策略,处理队列任务的策略,处理过多任务的策略,而且提供了几个钩子方法来拓展它的行为。固然,其中有些参数不能很好的工做,某些类型的任务须要特定的执行策略,而一些参数组合可能会产生想象以外的结果。