无限制建立线程的不足:html
1) 线程生命周期开销高;java
2) 资源消耗大,尤为是内存。若是可运行的线程数量多于可用处理器的数量,那么有些线程将闲置。大量空闲的线程占用许多内存,给垃圾回收器带来压力(频繁 stop the world)。因此,若是已经拥有足够多的线程使全部CPU保持忙碌状态,那么建立再多的线程反而会下降性能。web
3) 稳定性。可建立线程的数量存在必定限制。每一个都会维护两个执行栈,一个用于java代码,另外一个用于原生代码。一般JVM在默认状况下生成一个复合栈,大约为0.5MB。若是无限制地建立线程,破坏了系统对线程的限制,就极可能抛出OutOfMemoryError异常,使得系统处于不稳定状态。编程
public class ExecutorTest { private static final int NUMBERS = 100; private static final Executor EXECUTOR = Executors.newFixedThreadPool(NUMBERS); public void processRequst() throws IOException { ServerSocket serverSocket = new ServerSocket(80); while (true) { final Socket conn = serverSocket.accept(); Runnable task = new Runnable() { @Override public void run() { System.out.println("-----------process request----------"); } }; EXECUTOR.execute(task); } } }
线程池经过调用Executors的静态工厂方法的四种建立方式:缓存
1) newFixedThreadPool。建立一个固定长度的线程池,每当提交一个任务时就建立一个线程,直到达到线程池的最大数量,若是某个线程发生异常而结束那么线程池会补充一个线程。服务器
/** * Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue. At any point, at most * {@code nThreads} threads will be active processing tasks. * If additional tasks are submitted when all threads are active, * they will wait in the queue until a thread is available. * If any thread terminates due to a failure during execution * prior to shutdown, a new one will take its place if needed to * execute subsequent tasks. The threads in the pool will exist * until it is explicitly {@link ExecutorService#shutdown shutdown}. * * @param nThreads the number of threads in the pool * @return the newly created thread pool * @throws IllegalArgumentException if {@code nThreads <= 0} */ public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
2) newCachedThreadPool。建立一个可缓存的线程池。若是线程池的当前规模超过了处理需求时,将回收空闲的线程,而当需求增长时,则会添加新的线程,线程池的规模不存在任何限制。网络
/** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available. These pools will typically improve the performance * of programs that execute many short-lived asynchronous tasks. * Calls to {@code execute} will reuse previously constructed * threads if available. If no existing thread is available, a new * thread will be created and added to the pool. Threads that have * not been used for sixty seconds are terminated and removed from * the cache. Thus, a pool that remains idle for long enough will * not consume any resources. Note that pools with similar * properties but different details (for example, timeout parameters) * may be created using {@link ThreadPoolExecutor} constructors. * * @return the newly created thread pool */ public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
3) newSingleThreadExecutor。这是一个单线程的executor,它建立单个工做者线程来执行任务,若是这个线程异常结束,会建立另外一个线程替代。该方式能确保依照任务在队列中的顺序来串行执行(FIFO,LIFO,优先级)。多线程
/** * Creates an Executor that uses a single worker thread operating * off an unbounded queue. (Note however that if this single * thread terminates due to a failure during execution prior to * shutdown, a new one will take its place if needed to execute * subsequent tasks.) Tasks are guaranteed to execute * sequentially, and no more than one task will be active at any * given time. Unlike the otherwise equivalent * {@code newFixedThreadPool(1)} the returned executor is * guaranteed not to be reconfigurable to use additional threads. * * @return the newly created single-threaded Executor */ public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
4) newScheduledThreadPool。建立一个固定长度的线程池,而且以延迟或定时的方式执行任务并发
/** * Creates a thread pool that can schedule commands to run after a * given delay, or to execute periodically. * @param corePoolSize the number of threads to keep in the pool, * even if they are idle * @return a newly created scheduled thread pool * @throws IllegalArgumentException if {@code corePoolSize < 0} */ public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } /** * Creates a new {@code ScheduledThreadPoolExecutor} with the * given core pool size. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @throws IllegalArgumentException if {@code corePoolSize < 0} */ public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
建立线程池的方法最终都是建立了一个ThreadPoolExecutors实例,该类的构造方法以下app
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
参数说明(摘抄自JDK1.6参考文档)
1) 核心和最大池大小(corePoolSize和maximumPoolSize)
ThreadPoolExecutor 将根据 corePoolSiz和 maximumPoolSize设置的边界自动调整池大小。当新任务在方法 execute(java.lang.Runnable) 中提交时,若是运行的线程少于 corePoolSize,则建立新线程来处理请求,即便其余辅助线程是空闲的。若是运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才建立新线程。若是设置的 corePoolSize 和 maximumPoolSize 相同,则建立了固定大小的线程池。若是将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则容许池适应任意数量的并发任务。在大多数状况下,核心和最大池大小仅基于构造来设置,不过也可使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 进行动态更改。
2) 保持活动时间(keepAliveTime)
若是池中当前有多于 corePoolSize 的线程,则这些多出的线程在空闲时间超过 keepAliveTime 时将会终止。这提供了当池处于非活动状态时减小资源消耗的方法。若是池后来变为活动,则能够建立新的线程。也可使用方法 setKeepAliveTime(long, java.util.concurrent.TimeUnit) 动态地更改此参数。默认状况下,保持活动策略只在有多于 corePoolSizeThreads 的线程时应用。可是只要 keepAliveTime 值非 0,allowCoreThreadTimeOut(boolean) 方法也可将此超时策略应用于核心线程。
TimeUnit为超时时间单位。
3)阻塞队列(BlockingQueue)
全部 BlockingQueue 均可用于传输和保持提交的任务。可使用此队列与池大小进行交互:
排队有三种通用策略:
4) 建立新线程(ThreadFactory)
使用 ThreadFactory 建立新线程。若是没有另外说明,则在同一个 ThreadGroup 中一概使用 Executors.defaultThreadFactory() 建立线程,而且这些线程具备相同的 NORM_PRIORITY 优先级和非守护进程状态。经过提供不一样的 ThreadFactory,能够改变线程的名称、线程组、优先级、守护进程状态,等等。若是从 newThread 返回 null 时 ThreadFactory 未能建立线程,则执行程序将继续运行,但不能执行任何任务。
5) 被拒绝的任务(RejectedExecutionHandler)
当 Executor 已经关闭,而且 Executor 将有限边界用于最大线程和工做队列容量,且已经饱和时,在方法 execute(java.lang.Runnable) 中提交的新任务将被拒绝。在以上两种状况下,execute 方法都将调用其 RejectedExecutionHandler 的 RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。下面提供了四种预约义的处理程序策略:
定义和使用其余种类的 RejectedExecutionHandler 类也是可能的,但这样作须要很是当心,尤为是当策略仅用于特定容量或排队策略时。
咱们知道,JVM只有在全部线程所有终止后才会退出。全部,若是咱们没法正确地关闭Executor,JVM将没法结束。为了解决执行服务的生命周期问题,ExecutorService继承Executor接口,添加了一些用于生命周期管理的方法。ExecutorService的生命周期有三种状态:运行、关闭和已终止。
public interface ExecutorService extends Executor { /** * Initiates an orderly shutdown in which previously submitted * tasks are executed, but no new tasks will be accepted. * Invocation has no additional effect if already shut down. * 执行平缓的关闭过程,再也不接受新的任务,同时等待已经提交的任务执行完成——包括哪些还未开始执行的任务 */ void shutdown(); /** * Attempts to stop all actively executing tasks, halts the * processing of waiting tasks, and returns a list of the tasks * that were awaiting execution. * 尝试取消全部运行中的任务,而且再也不启动队列中还没有开始执行的任务 */ List<Runnable> shutdownNow(); /** * Returns {@code true} if this executor has been shut down. * 查询 ExecutorService 是否已经关闭 */ boolean isShutdown(); /** * Returns {@code true} if all tasks have completed following shut down. * Note that {@code isTerminated} is never {@code true} unless * either {@code shutdown} or {@code shutdownNow} was called first. * * @return {@code true} if all tasks have completed following shut down * * 查询 ExecutorService 是否已经终止 */ boolean isTerminated(); /** * Blocks until all tasks have completed execution after a shutdown * request, or the timeout occurs, or the current thread is * interrupted, whichever happens first. * * 等待ExecutorService进入终止状态 */ boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; /*其余用于提交任务的方法……*/ }
采用future和callable携带任务结果,长时间执行的任务能够先进行计算,在以后经过future取得计算结果。
示例程序
/* * 网页数据加载并行的可能性,假设网页只包含文本和图像两种数据 */ public class CompletionTest { class ImageData{ // 属性.... } // 从序列化的数据中加载图像 public ImageData loadFrom(String source) { return new ImageData(); } /** * 单线程模式 * @param sequence 序列化后的网页数据 */ public void loadWithSingleThread(CharSequence source) { System.out.print("加载文本"); List<ImageData> list = new ArrayList<>(); // 从 source 解析图像数据并加入到 list 中 ..... // loadImage(source) for (ImageData imageData : list) { System.out.println("图像加载完成" + imageData); } } private final ExecutorService executorService = Executors.newFixedThreadPool(10); /* * 单线程加载CPU利用率低,若是程序依赖于长时间的 io(当前从网络加载图像就是)那么将很费时间 * 结合 futureTask 预加载图像 */ public void loadInFuture(CharSequence source) { Callable<List<ImageData>> task = new Callable<List<ImageData>>() { @Override public List<ImageData> call() throws Exception { List<ImageData> result = new ArrayList<>(); /* * 加载图像数据到 result * loadImageFrom source to list..... */ return result; } }; Future<List<ImageData>> future = executorService.submit(task); /* * loadText from source..... */ System.out.println("loading text"); try { List<ImageData> imageDatas = future.get(); for (ImageData imageData : imageDatas) { System.out.println("图像数据" + imageData); } } catch (InterruptedException e) { // 抛出中断异常,从新设置线程的中断状态 Thread.currentThread().interrupt(); // 中断了,结果已不须要 future.cancel(true); } catch (ExecutionException e) { e.printStackTrace(); } } /* * 采用 future 来预加载图像在必定程度上提供了并发性,但在本问题中效率仍比较低,由于咱们采用的是一次性加载完图像 * 再返回,而相对于加载文原本说,图像加载速度要低不少,在本问题中几乎能够说效率与串行差异不大,那怎么改进? * 为每一图片设置一个相应的 future计算任务,而后循环操做,每计算完就直接加载,那样用户看到的页面是一张张加载 * 出来的,这可行,但比较繁琐,咱们能够直接使用CompletionService。 * 结合 completionService 加载图像 */ public void loadWithCompeletionSevice(CharSequence source) { List<String> imageInfo = new ArrayList<>(); // imageInfo = load from source....... CompletionService<ImageData> service = new ExecutorCompletionService<>(executorService); for (String string : imageInfo) { service.submit(new Callable<CompletionTest.ImageData>() { @Override public ImageData call() throws Exception { ImageData imageData = loadFrom(string); // imageDate = loadFromSource(); return imageData; } }); } // loadText(source) System.out.println("loading text"); try { for (int i = 0; i < imageInfo.size(); i++) { // 在得出结果以前阻塞 Future<ImageData> future = service.take(); ImageData imageData = future.get(); // loading image ... System.out.println("loading image" + imageData); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (ExecutionException e) { e.printStackTrace(); } } }
有时候,咱们可能没法在指定的时间内完成某个任务,那么咱们将不须要它的结果,此时咱们能够放弃这个任务。例如,一个web应用程序须要从外部的广告服务器获取广告,当若是该应用程序在2秒内得不到响应,那么将显示一个默认的广告,这样即便没法获取广告信息,也不会下降站点的性能。
程序示例
public class GetAd { class Ad{ /* * 属性.... */ } private final ExecutorService execute = Executors.newFixedThreadPool(100); private final Ad DEFAULT_AD = new Ad(); private final long TIME_LOAD = 2000; public void loadAd() { long endTime = System.currentTimeMillis() + TIME_LOAD; Future<Ad> future = execute.submit(new FetchAdTask()); // loading page text..... Ad ad; try { // leftTime 有可能为负数,但 future.get 方法会把负数视为0 long leftTime = endTime - System.currentTimeMillis(); ad = future.get(leftTime, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); future.cancel(true); ad = DEFAULT_AD; } catch (TimeoutException e) { future.cancel(true); ad = DEFAULT_AD; } catch (ExecutionException e) { ad = DEFAULT_AD; } System.out.println("load complete" + ad); } class FetchAdTask implements Callable<Ad>{ @Override public Ad call() throws Exception { // load ad from ad server return new Ad(); } } }
总结自《java并发编程实战》