随着业务的发展,单线程已经远远不能知足,随即就有多线程的出现。多线程虽然能解决单线程解决不了的事情,可是它也会给你带来额外的问题。好比成千上万甚至上百万的线程时候,你系统就会出现响应延迟、卡机、甚至直接卡死的状况。为何会出现这样的缘由呢?由于为每一个请求建立一个新线程的开销很大:在建立和销毁线程上花费的时间和消耗的系统资源要比花在处理实际的用户请求的时间和资源更多。html
除了建立和销毁线程的开销以外,活动的线程也消耗系统资源。在一个 JVM里建立太多的线程可能会致使系统因为过分消耗内存而用完内存或“切换过分”。因此为了防止资源不足,服务器应用程序须要一些办法来限制任何给定时刻处理的请求数目。而线程池为线程生命周期开销问题和资源不足问题提供了解决方案。java
一、下降资源消耗,防止资源不足。合理配置线程池中的线程大小,防止请求线程猛增;另外经过重复利用已建立的线程下降线程建立和销毁形成的消耗。
二、提升响应速度。线程池能够经过对多个任务重用线程,在请求到达时线程已经存在(若是有空闲线程时),因此无心中也消除了线程建立所带来的延迟。这样,就能够当即为请求服务,使应用程序响应更快。
三、提升线程的可管理性。使用线程池能够统一分配、调优和监控线程。数据库
上面知道了线程池的做用,那么线程池它是如何工做的呢?其使用核心类是哪个呢?因此要作到合理利用线程池,必须对其实现原理了如指掌。编程
java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此必须了解这个类的用法及其内部原理,下面咱们来看下ThreadPoolExecutor类的具体源码解析。 缓存
3.1 继承关系服务器
经过类的继承关系能够得知哪些方法源于哪里(具体请看代码),下面直接给出类的继承结构的图:多线程
3.2 构造方法 并发
在ThreadPoolExecutor类中提供了四个构造方法:框架
1 // 五个参数的构造函数 2 public class ThreadPoolExecutor extends AbstractExecutorService { 3 public ThreadPoolExecutor(int corePoolSize, 4 int maximumPoolSize, 5 long keepAliveTime, 6 TimeUnit unit, 7 BlockingQueue<Runnable> workQueue) { 8 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 9 Executors.defaultThreadFactory(), defaultHandler); 10 } 11 // 六个参数的构造函数-1 12 public ThreadPoolExecutor(int corePoolSize, 13 int maximumPoolSize, 14 long keepAliveTime, 15 TimeUnit unit, 16 BlockingQueue<Runnable> workQueue, 17 ThreadFactory threadFactory) { 18 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 19 threadFactory, defaultHandler); 20 } 21 22 //六个参数的构造函数 -2 23 public ThreadPoolExecutor(int corePoolSize, 24 int maximumPoolSize, 25 long keepAliveTime, 26 TimeUnit unit, 27 BlockingQueue<Runnable> workQueue, 28 RejectedExecutionHandler handler) { 29 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 30 Executors.defaultThreadFactory(), handler); 31 } 32 // 七个参数的构造函数 33 public ThreadPoolExecutor(int corePoolSize, 34 int maximumPoolSize, 35 long keepAliveTime, 36 TimeUnit unit, 37 BlockingQueue<Runnable> workQueue, 38 ThreadFactory threadFactory, 39 RejectedExecutionHandler handler) { 40 if (corePoolSize < 0 || 41 maximumPoolSize <= 0 || 42 maximumPoolSize < corePoolSize || 43 keepAliveTime < 0) 44 throw new IllegalArgumentException(); 45 if (workQueue == null || threadFactory == null || handler == null) 46 throw new NullPointerException(); 47 this.corePoolSize = corePoolSize; 48 this.maximumPoolSize = maximumPoolSize; 49 this.workQueue = workQueue; 50 this.keepAliveTime = unit.toNanos(keepAliveTime); 51 this.threadFactory = threadFactory; 52 this.handler = handler; 53 }
从源代码中发现前面三个构造器都是调用的第四个构造器进行的初始化工做,那就以第四个构造函数为例,解释下其中各个参数的含义(留意源码中每一个字段上的注释):ide
2. int maximumPoolSize:线程池容许建立的最大线程数。
3. long keepAliveTime:空闲线程等待超时的时间
4. TimeUnit unit:参数keepAliveTime的时间单位。共有七种单位,以下:
public enum TimeUnit { /** * 纳秒=千分之一微妙 */ NANOSECONDS {...}, /** * 微妙=千分之一毫秒 */ MICROSECONDS {...}, /** * 毫秒 */ MILLISECONDS {...}, /** * 秒 */ SECONDS {...}, /** * 分钟 */ MINUTES {...}, /** * 小时 */ HOURS {...}, /** * 天 */ DAYS {...}; }
5. BlockingQueue<Runnable> workQueue: 任务队列,用于保存等待执行任务的阻塞队列。队列也有好几种详细请看这里,这里就不作解释了。
6. ThreadFactory threadFactory:线程工厂,主要用于建立线程。其中能够指定线程名字(千万别忽略这件小事,有意义的名字能让你快速定位到源码中的线程类)
7. RejectedExecutionHandler handler:饱和策略,当队列和线程池都满了,说明线程处于饱和状态,那么后续进来的任务须要一种策略处理。默认状况下是AbortPolicy:表示没法处理新任务时抛出异常。线程池框架提供了如下4中策略(固然也能够本身自定义策略:经过实现RejectedExecutionHandler接口自定义策略):
3.3 重要参数方法和方法解读
1. 线程池状态
// 初始值 -536870912 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 初始值 29 private static final int COUNT_BITS = Integer.SIZE - 3; // 初始值 536870911 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // RUNNING状态:接受新任务并处理排队任务 private static final int RUNNING = -1 << COUNT_BITS; // SHUTDOWN状态:不接受新任务,但处理排队任务 private static final int SHUTDOWN = 0 << COUNT_BITS; // STOP状态:不接受新任务,不处理排队任务,并中断正在进行的任务 private static final int STOP = 1 << COUNT_BITS; // All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method private static final int TIDYING = 2 << COUNT_BITS; // TERMINATED: terminated() has completed private static final int TERMINATED = 3 << COUNT_BITS; // 获取线程池状态,取前三位 private static int runStateOf(int c) { return c & ~CAPACITY; } // 获取当前正在工做的worker,主要是取后面29位 private static int workerCountOf(int c) { return c & CAPACITY; } // 生成ctl private static int ctlOf(int rs, int wc) { return rs | wc; }
当建立线程池后,初始时,线程池处于RUNNING状态;
RUNNING -> SHUTDOWNN:若是调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不可以接受新的任务,它会等待全部任务执行完毕;
(RUNNING or SHUTDOWN) -> STOP:若是调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,而且会去尝试终止正在执行的任务;
SHUTDOWN -> TIDYING or STOP -> TIDYING :当线程池处于SHUTDOWN或STOP状态,而且全部工做线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。
2. 线程池中的线程初始化
在说corePoolSize参数时有说到初始化线程池的两个方法,其实在默认状况下,建立线程池以后线程池中是没有线程的,须要提交任务以后才会建立线程。因此若是想在建立线程池以后就建立线程的话,能够经过下面两个方法建立:
/** * 单个建立核心线程 */ public boolean prestartCoreThread() { return workerCountOf(ctl.get()) < corePoolSize && addWorker(null, true); } /** * 启动全部核心线程 */ public int prestartAllCoreThreads() { int n = 0; // 添加工做线程 while (addWorker(null, true)) ++n; return n; }
3. 建立线程:addWorker()
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); // 获取运行状态 int rs = runStateOf(c); /** * 若是当前的线程池的状态>SHUTDOWN 那么拒绝Worker的add 若是=SHUTDOWN * 那么此时不能新加入不为null的Task,若是在WorkCount为empty的时候不能加入任何类型的Worker, * 若是不为empty能够加入task为null的Worker,增长消费的Worker */ if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 获取有效线程数,并判断//若是当前的数量超过了CAPACITY,或者超过了corePoolSize和maximumPoolSize(试core而定),则直接返回 int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // //CAS尝试增长线程数,若是失败,证实有竞争,那么从新到retry。 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl // 继续判断当前线程池的运行状态 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } /** * 新建任务 */ Worker w = new Worker(firstTask); Thread t = w.thread; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); int rs = runStateOf(c); /** * rs!=SHUTDOWN ||firstTask!=null * * 一样检测当rs>SHUTDOWN时直接拒绝减少Wc,同时Terminate,若是为SHUTDOWN同时firstTask不为null的时候也要Terminate */ if (t == null || (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null))) { decrementWorkerCount(); tryTerminate(); return false; } workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; } finally { mainLock.unlock(); } t.start(); //Stop或线程Interrupt的时候要停止全部的运行的Worker if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted()) t.interrupt(); return true; }
从上面能够看出:
在rs>SHUTDOWN时,拒绝一切线程的增长,由于STOP是会终止全部的线程,同时移除Queue中全部的待执行的线程的,因此也不须要增长first=null的Worker了。
其次,在SHUTDOWN状态时,是不能增长first!=null的Worker的,同时即便first=null,可是此时Queue为Empty也是不容许增长Worker的,SHUTDOWN下增长的Worker主要用于消耗Queue中的任务。
SHUTDOWN状态时,是不容许向workQueue中增长线程的,isRunning(c) && workQueue.offer(command) 每次在offer以前都要作状态检测,也就是线程池状态变为>=SHUTDOWN时不容许新线程进入线程池了。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. * 原注释已经讲的很清楚了,主要分三步进行: */ int c = ctl.get(); // 一、若是线程数小于基本线程数,则建立线程并执行当前任务 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 二、若是任务能够排队,则会从新检查看是否能够启动新的任务仍是拒绝任务 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 三、若是咱们没法排队任务,那么咱们尝试添加一个新线程。 若是失败,咱们知道咱们已关闭或饱和,所以拒绝该任务。 else if (!addWorker(command, false)) reject(command); }
注意:该方法是没有返回值的,若是想获取线程执行后的结果能够调用submit方法(固然它底层也是调用execute()方法)
五、线程池关闭:
ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:
从上线的源码分析后,应该知道线程池处理任务的大概流程了,下面统一梳理下当线程池接到任务时的处理流程:
一、线程池首先会判断核心线程池是否已满(核心线程数是否超过corePoolSize),若没有则建立新的核心线程来处理任务,不然进行第二步;
二、接着会判断阻塞队列是否已满(因此推荐使用有界队列),若是没有满则进入阻塞队列等待执行,不然进行第三步;
三、而后线程池会判断整个线程池是否已满(整个线程数是否超过maximunPoolSize),若没有则建立新线程处理任务,不然交个饱和策略处理新的任务。
一、建立线程或线程池时请指定有意义的线程名称,方便回溯。来源《阿里巴巴 Java开发手册》
二、线程池不容许使用 Executors 去建立,而是经过 ThreadPoolExecutor 的方式,这样 的处理方式让写的同窗更加明确线程池的运行规则,规避资源耗尽的风险。来源《阿里巴巴 Java开发手册》
说明:Executors 返回的线程池对象的弊端以下:
1)FixedThreadPool 和 SingleThreadPool: 容许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而致使 OOM。
2)CachedThreadPool 和 ScheduledThreadPool: 容许的建立线程数量为 Integer.MAX_VALUE,可能会建立大量的线程,从而致使 OOM。
三、合理配置线程池大小,能够从如下几个角度来进行分析:
比方说:对于 CPU 密集型的计算场景,理论上“线程的数量 = CPU核数”是最合适的。
若是是IO密集型任务,参考值能够设置为CPU 核数 * [ 1 +(I/O 耗时 / CPU耗时)]
注意:以上值仅供参考,须要根据具体实际状况(压测)而定。
四、建议使用有界队列
五、合理设置空闲线程等待时间。
若是任务不少且每一个任务执行的时间比较短,则能够调大时间,提升线程利用率。
https://www.cnblogs.com/dolphin0520/p/3932921.html
http://ifeve.com/java-threadpool/
《Java并发编程的艺术》
《阿里巴巴Java开发手册》