1.线程池的好处。程序员
线程使应用可以更加充分合理的协调利用cpu 、内存、网络、i/o等系统资源。数据库
线程的建立须要开辟虚拟机栈,本地方法栈、程序计数器等线程私有的内存空间。编程
在线程的销毁时须要回收这些系统资源。频繁的建立和销毁线程会浪费大量的系统资源,增长并发编程的风险。缓存
另外,在服务器负载过大的时候,如何让新的线程等待或者友好的拒绝服务?这些丢失线程自身没法解决的。因此须要经过线程池协调多个线程,并实现相似主次线程隔离、定时执行、周期执行等任务。线程池的做用包括:安全
在了解线程池的基本做用后,咱们学习一下线程池是如何建立线程的。首先从ThreadPoolExecutor构造方法讲起,学习如何定义ThreadFectory和RejectExecutionHandler,并编写一个最简单的线程池示例。而后,经过分析ThreadPoolExecutor的execute和addWorker两个核心方法,学习如何把任务线程加入到线程池中运行。ThreadPoolExecutor的构造方法以下:服务器
1 public ThreadPoolExecutor( 2 int corePoolSize, //第1个参数 3 int maximumPoolSize, //第2个参数 4 long keepAliveTime, //第3个参数 5 TimeUnit unit, //第4个参数 6 BlockingQueue<Runnable> workQueue, //第5个参数 7 ThreadFactory threadFactory, //第6个参数 8 RejectedExecutionHandler handler) { //第7个参数 9 if (corePoolSize < 0 || 10 maximumPoolSize <= 0 || 11 maximumPoolSize < corePoolSize || 12 keepAliveTime < 0) // 第一处 13 throw new IllegalArgumentException(); 14 if (workQueue == null || threadFactory == null || handler == null)//第二处 15 throw new NullPointerException(); 16 this.corePoolSize = corePoolSize; 17 this.maximumPoolSize = maximumPoolSize; 18 this.workQueue = workQueue; 19 this.keepAliveTime = unit.toNanos(keepAliveTime); 20 this.threadFactory = threadFactory; 21 this.handler = handler; 22 }
从代码第2处来看,队列、线程工程、拒绝处理服务都必须有实例对象,但在实际编码中,不多有程序员对着三者进行实例化,而经过Executors这个线程池静态工厂提供默认实现,那么Executors与ThreadPoolExecutor 是什么关系呢?线程池相关的类图网络
ExecutorService接口继承了Executor接口,定义了管理线程任务的方法。ExecutorService 的抽象类AbstractExecutorService 提供了 submit()、invokeAll()等部分方法的实现,可是核心方法Executor.execute() 并无在这里实现。由于全部的任务都在这个方法里执行,不一样的实现会带来不一样的执行策略,这一点在后续的ThreadPoolExecutor解析时,会一步步分析。经过Executor的静态工厂方法能够建立三个线程池的包装对象:ForkJoinPool、ThreadPoolExecutor、ScheduledThreadPoolExecutor。Executors核心方法有5个:并发
1 /** 2 * Creates a work-stealing thread pool using all 3 * {@link Runtime#availableProcessors available processors} 4 * as its target parallelism level. 5 * @return the newly created thread pool 6 * @see #newWorkStealingPool(int) 7 * @since 1.8 8 */ 9 public static ExecutorService newWorkStealingPool() { 10 return new ForkJoinPool 11 (Runtime.getRuntime().availableProcessors(), 12 ForkJoinPool.defaultForkJoinWorkerThreadFactory, 13 null, true); 14 }
1 public static ExecutorService newFixedThreadPool(int nThreads) { 2 return new ThreadPoolExecutor(nThreads, nThreads, 3 0L, TimeUnit.MILLISECONDS, 4 new LinkedBlockingQueue<Runnable>()); 5 }
这里,输入的队列没有指明长度,下面介绍LinkedBlockingQueue的构造方法。ide
1 public LinkedBlockingQueue() { 2 this(Integer.MAX_VALUE); 3 }
使用这样的无界队列,若是瞬间请求很是大,会有OOM的风险。除newWorkStealingPool 外,其余四个建立方式都存在资源耗尽的风险。学习
Executors 中默认的线程工程和拒绝策略过于简单,一般对用户不够友好。线程工厂须要作建立前的准备工做,对线程池建立的线程必须明确标识,就像药品的生产批号同样,为线程自己指定有意思的名称和相应的序列号。拒绝策略应该考虑到业务场景返回相应的提示或者友好的跳转 1 public class UserThreadFactory implements ThreadFactory { 2 private final String namePrefix;
3 private final AtomicInteger nextId = new AtomicInteger(); 4 //定义线程组名称,在使用jstack 来排查问题是,很是有帮助 5 UserThreadFactory(String whatFeatureOfGroup){ 6 namePrefix = "UserThreadFactory's"+whatFeatureOfGroup+"-Worker-"; 7 } 8 9 @Override 10 public Thread newThread(Runnable task){ 11 String name = namePrefix+ nextId.getAndIncrement(); 12 Thread thread = new Thread(null,task,name,0);
//打印threadname 13 return thread; 14 } 15 16 public static void main(String[] args) { 17 UserThreadFactory threadFactory = new UserThreadFactory("你好"); 18 String ss = threadFactory.namePrefix; 19 Task task = new Task(ss); 20 Thread thread = null; 21 for(int i = 0; i < 10; i++) { 22 thread = threadFactory.newThread(task); 23 thread.start(); 24 } 25 26 } 27 } 28 29 class Task implements Runnable{ 30 String poolname; 31 Task(String poolname){ 32 this.poolname = poolname; 33 } 34 private final AtomicLong count = new AtomicLong(); 35 36 @Override 37 public void run(){ 38 System.out.println(poolname+"_running_"+ count.getAndIncrement()); 39 } 40 }
上述示例包括线程工厂和任务执行体的定义,经过newThread 方法快速、统一地建立线程任务,强调线程必定要是欧特定的意义和名称,方便出错时回溯。
下面简单地实现一下RejectedExecutionHandler,实现了接口的rejectExecution方法,打印当前线程池状态,源码以下。
1 public class UserRejectHandler implements RejectedExecutionHandler { 2 3 @Override 4 public void rejectedExecution(Runnable task, ThreadPoolExecutor executor){ 5 System.out.println("task rejected."+ executor.toString()); 6 } 7 }
在ThreadPoolExecutor 中提供了4个公开的内部静态类:
根据以前实现的线程工厂和拒绝策略,线程池的相关代码实现以下:
public class UserThreadPool { public static void main(String[] args) { BlockingQueue queue = new LinkedBlockingQueue(2); UserThreadFactory f1 = new UserThreadFactory("第一机房"); UserThreadFactory f2 = new UserThreadFactory("第二机房"); UserRejectHandler handler = new UserRejectHandler(); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,2,60, TimeUnit.SECONDS,queue,f1,handler); ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(1,2,60, TimeUnit.SECONDS,queue,f2,handler); Runnable task1 = new Task(""); for (int i = 0;i<200;i++){ threadPoolExecutor.execute(task1); threadPoolExecutor2.execute(task1); } } }
当任务被拒绝的时候,拒绝策略会打印出当前线程池的大小以及达到了maximumPoolSize=2 ,且队列已满。