前一篇文章Java中实现多线程关键词整理中介绍了Java中建立多线程的各类办法,里面提到了线程池,这里对Java中的线程池作一个总结。html
为了更好地控制多线程,JDK提供了一套Executor框架,帮助开发人员有效的进行线程控制,其本质就是一个线程池。其中ThreadPoolExecutor是线程池中最核心的一个类,后面提到的四种线程池都是基于ThreadPoolExecutor实现的。java
ThreadPoolExecutor提供了四个构造方法,咱们看下最重要的一个构造函数:spring
public class ThreadPoolExecutor extends AbstractExecutorService { public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler); }
函数的参数含义以下:编程
线程池执行的过程:数组
ThreadPoolExecutor的继承关系:
缓存
ThreadPoolExecutor中的队列:服务器
ThreadPoolExecutor内部应用了任务缓存队列,即workQueue,它用来存放等待执行的任务。多线程
workQueue的类型为BlockingQueue
任务拒绝策略:框架
当线程池的任务缓存队列已满而且线程池中的线程数目达到maximumPoolSize,若是还有任务到来就会采起任务拒绝策略,一般有如下四种策略:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,可是不抛出异常。 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,而后从新尝试执行任务(重复此过程) ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
扩展线程池(记录任务执行日志):
在默认的ThreadPoolExecutor实现中,提供了空的beforeExecutor和afterExecutor的实现,在实际应用中能够对其进行扩展来实现对线程池运行状态的追踪,输出一些有用的调试信息,以帮助系统故障诊断,这对于多线程程序错误排查是颇有帮助的。
ThreadPoolExecutor例子:
import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; public class ThreadPool { private int corePoolSize = 1; // 线程池维护线程的最少数量 private int maximumPoolSize = 10;// 线程池维护线程的最大数量 private long keepAliveTime = 3; // 线程池维护线程所容许的空闲时间 private TimeUnit unit = TimeUnit.SECONDS;// 线程池维护线程所容许的空闲时间的单位 private BlockingQueue<Runnable> workQueue; // 线程池所使用的缓冲队列 private RejectedExecutionHandler handler; // 线程池对拒绝任务的处理策略 private static AtomicLong along = new AtomicLong(0); public void run() throws InterruptedException { ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.DiscardOldestPolicy()) { // 线程执行以前运行 @Override protected void beforeExecute(Thread t, Runnable r) { System.out.println("...............beforeExecute"); } // 线程执行以后运行 @Override protected void afterExecute(Runnable r, Throwable t) { System.out.println("...............afterExecute"); } // 整个线程池中止以后 protected void terminated() { System.out.println("...............thread stop"); } }; for (int i = 1; i <= 10; i++) { pool.execute(new ThreadPoolTask(i, along)); } for (int i = 1; i <= 10; i++) { pool.execute(new ThreadPoolTask(-i, along)); } pool.shutdown(); Thread.sleep(25000); System.out.println(along.get()); } public static void main(String[] args) { try { new ThreadPool().run(); } catch (InterruptedException e) { e.printStackTrace(); } } } class ThreadPoolTask implements Runnable { private int i = 0; private AtomicLong along; ThreadPoolTask(int i, AtomicLong along) { this.i = i; this.along = along; } @Override public void run() { try { // 模拟业务逻辑 Thread.sleep(1000); along.addAndGet(i); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " " + i); } }
咱们能够利用这个特性实如今线程池中打印出异常堆栈信息(正常是不会打印出来的),这里就不演示了。
Executors 提供了一系列工厂方法用于创先线程池,返回的线程池都实现了 ExecutorService 接口。
// 建立固定数目线程的线程池。 public static ExecutorService newFixedThreadPool(int nThreads) // 建立一个可缓存的线程池,调用execute将重用之前构造的线程(若是线程可用)。 // 若是现有线程没有可用的,则建立一个新线 程并添加到池中。 // 终止并从缓存中移除那些已有 60 秒钟未被使用的线程。 public static ExecutorService newCachedThreadPool() // 建立一个单线程化的Executor。 public static ExecutorService newSingleThreadExecutor() // 建立一个支持定时及周期性的任务执行的线程池,多数状况下可用来替代Timer类。 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
这四种方法都是用的 Executors 中的 ThreadFactory 创建的线程。
newCachedThreadPool()
newFixedThreadPool(int)
newScheduledThreadPool(int)
SingleThreadExecutor()
通常来讲,CachedTheadPool 在程序执行过程当中一般会建立与所需数量相同的线程,而后在它回收旧线程时中止建立新线程,所以它是合理的 Executor 的首选,只有当这种方式会引起问题时(好比须要大量长时间面向链接的线程时),才须要考虑用 FixedThreadPool。
----《Thinking in Java》第四版
以上引用自极客学院,总结的太精彩了。
Spring的TaskExecutor接口等同于java.util.concurrent.Executor接口。 实际上,它存在的主要缘由是为了在使用线程池的时候,将对Java 5的依赖抽象出来。 这个接口只有一个方法execute(Runnable task),它根据线程池的语义和配置,来接受一个执行任务。最初建立TaskExecutor是为了在须要时给其余Spring组件提供一个线程池的抽象。例如ApplicationEventMulticaster组件、JMS的 AbstractMessageListenerContainer和对Quartz的整合都使用了TaskExecutor抽象来提供线程池。 固然,若是你的bean须要线程池行为,你也可使用这个抽象层。
介绍下使用比较多的ThreadPoolTaskExecutor 类,这个实现只能在Java 5以上环境使用(如今应该没有低于1.5的老环境了吧~),它暴露的bean properties能够用来配置一个java.util.concurrent.ThreadPoolExecutor,把它包装到一个TaskExecutor中。
spring中ThreadPoolTaskExecutor最经常使用方式就是作为BEAN注入到容器中,其暴露的各个属性实际上是ThreadPoolExecutor的属性,并且这体现了DI容器的优点:
<bean id="threadPoolTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value="2"/> <property name="keepAliveSeconds" value="200"/> <property name="maxPoolSize" value="10"/> <property name="queueCapacity" value="60"/> </bean>
线程池的理想大小取决于被提交任务的类型以及所部署系统的特性。在代码中不会固定线程池的大小,而应该经过某种配置机制来来提供,或者根据Runtime.getRuntime().availableProcessors()来动态计算。
若是一台服务器上只部署这一个应用而且只有一个线程池(N为CPU总核数):
线程等待时间所占比例越高,须要越多线程。线程CPU时间所占比例越高,须要越少线程。
【黄金公式】最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1)* CPU数目
一个实际的计算过程(慕课网):
假设值
计算
threadcount = tasks/(1/taskcost) =taskstaskcout = (500~1000)0.1 = 50~100 个线程。corePoolSize设置应该大于50
根据8020原则,若是80%的每秒任务数小于800,那么corePoolSize设置为80便可
计算可得 queueCapacity = 80/0.1*1 = 80。意思是队列里的线程能够等待1s,超过了的须要新开线程来执行
切记不能设置为Integer.MAX_VALUE,这样队列会很大,线程数只会保持在corePoolSize大小,当任务陡增时,不能新开线程来执行,响应时间会随之陡增。
计算可得 maxPoolSize = (1000-80)/10 = 92
(最大任务数-队列容量)/每一个线程每秒处理能力 = 最大线程数
参考博文:
参考书籍:
《Java并发编程实战》
《Java高并发程序设计》