线程是并发编程的基础,前面的文章里,咱们的实例基本都是基于线程开发做为实例,而且都是使用的时候就建立一个线程。这种方式比较简单,可是存在一个问题,那就是线程的数量问题。html
假设有一个系统比较复杂,须要的线程数不少,若是都是采用这种方式来建立线程的话,那么就会极大的消耗系统资源。首先是由于线程自己的建立和销毁须要时间,若是每一个小任务都建立一个线程,那么就会大大下降系统的效率。其次是线程自己也是占用内存空间的,大量的线程运行会抢占内存资源,处理不当极可能会内存溢出,这显然不是咱们想看到的。java
那么有什么办法解决呢?有一个好的思路就是对线程进行复用,由于全部的线程并不都是同一时间一块儿运行的,有些线程在某个时刻多是空闲状态,若是这部分空闲线程能有效利用起来,那么就能让线程的运行被充分的利用,这样就不须要建立那么多的线程了。咱们能够把特定数量的线程放在一个容器里,须要使用线程时,从容器里拿出空闲线程使用,线程工做完后不急着关闭,而是退回到线程池等待使用。这样的容器通常被称为线程池。用线程池来管理线程是很是有效的方法,用一张图片能够简单的展现出线程池的管理流程:
编程
Java中也有一套框架来控制管理线程,那就是Executor框架。Executor框架是JDK1.5以后才引入的,位于java.util.cocurrent 包下,能够经过该框架来控制线程的启动、执行和关闭,从而简化并发编程的操做,这是它的核心成员类图:
缓存
Executor:最上层的接口,定义了一个基本方法execute,接受一个Runnable参数,用来替代一般建立或启动线程的方法。多线程
ExecutorService:继承自Executor接口,提供了处理多线程的方法。架构
ScheduledExecutorService:定时调度接口,继承自ExecutorService。并发
AbstractExecutorService:执行框架的抽象类。框架
ThreadPoolExecutor:线程池中最核心的一个类,提供了线程池操做的基本方法。ide
Executors:线程池工厂类,可用于建立一系列有特定功能的线程池。函数
以上Executor框架中的基本成员,其中最核心的的成员无疑就是ThreadPoolExecutor,想了解Java中线程池的运行机制,就必须先了解这个类,而最好的了解方式无疑就是看源码。
打开ThreadPoolExecutor的源码,发现类中提供了四个构造方法
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
能够看出,ThreadPoolExecutor的构造函数中的参数仍是比较多的,而且最核心的是第四个构造函数,其中完成了底层的初始化工做。
下面解释一下构造函数参数的含义:
execute
方法提交的任务。经常使用的有三种队列,SynchronousQueue,LinkedBlockingDeque,ArrayBlockingQueue。threadFactory:线程工厂,用于建立线程。
handler:拒绝策略,当任务太多来不及处理时所采用的处理策略。
看完了构造函数,咱们来看下ThreadPoolExecutor类中几个重要的成员变量:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
ctl:控制线程运行状态的一个字段。同时,根据下面的几个方法runStateOf
,workerCountOf
,ctlOf
能够看出,该字段还包含了两部分的信息:线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),而且使用的是Integar类型,高3位保存runState,低29位保存workerCount。
COUNT_BITS:值为29的常量,在字段CAPACITY
被引用计算。
CAPACITY:表示有效线程数量(workerCount)的上限,大小为 (1<<29) - 1。
下面5个变量表示的是线程的运行状态,分别是:
用一个状态转换图表示大概以下 (图片来源于http://www.javashuo.com/article/p-cholswre-gb.html):
构造函数和基本参数都了解后,接下来就是对类中重要方法的研究了。
execute方法
ThreadPoolExecutor类的核心调度方法是execute(),经过调用这个方法能够向线程池提交一个任务,交由线程池去执行。而ThreadPoolExecutor的工做逻辑也能够藉由这个方法来一步步理清。这是方法的源码:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //获取ctl的值,前面说了,该值记录着runState和workerCount int c = ctl.get(); /* * 调用workerCountOf获得当前活动的线程数; * 当前活动线程数小于corePoolSize,新建一个线程放入线程池中; * addWorker(): 把任务添加到该线程中。 */ if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; //若是上面的添加线程操做失败,从新获取ctl值 c = ctl.get(); } //若是当前线程池是运行状态,而且往工做队列中添加该任务 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); /* * 若是当前线程不是运行状态,把任务从队列中移除 * 调用reject(内部调用handler)拒绝接受任务 */ if (! isRunning(recheck) && remove(command)) reject(command); //获取线程池中的有效线程数,若是为0,则执行addWorker建立一个新线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } /* * 若是执行到这里,有两种状况: * 1. 线程池已经不是RUNNING状态; * 2. 线程池是RUNNING状态,但workerCount >= corePoolSize而且workQueue已满。 * 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize; * 若是失败则拒绝该任务 */ else if (!addWorker(command, false)) reject(command); }
简单归纳一下代码的逻辑,大概是这样:
一、判断当前运行中的线程数是否小于corePoolSize,是的话则调用addWorker建立线程执行任务。
二、不知足1的条件,就把任务放入工做队列workQueue中。
三、若是任务成功加入workQueue,判断线程池是不是运行状态,不是的话先把任务移出工做队列,并调用reject方法,使用拒绝策略拒绝该任务。线程若是是非运行中,调用addWorker建立一个新线程。
四、若是放入workQueue失败 (队列已满),则调用addWorker建立线程执行任务,若是这时建立线程失败 (addWorker传进去的第二个参数值是false,说明这种状况是当前线程数不小于maximumPoolSize),就会调用reject(内部调用handler)拒绝接受任务。
整个执行流程用一张图片表示大体以下:
以上就是execute方法的大概逻辑,接下来看看addWorker的方法实现。
addWorker方法
源码以下:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); /**线程池状态不为SHUTDOWN时 * 判断队列或者任务是否为空,是的话返回false */. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); /* 这里能够看出core参数决定着活动线程数的大小比较对象 * core为true表示与 corePoolSize大小进行比较 * core为false表示与 maximumPoolSize大小进行比较 * 当前活动线程数大于比较对象就返回false */ if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 尝试增长workerCount,若是成功,则跳出第一个for循环 if (compareAndIncrementWorkerCount(c)) break retry; // 若是增长workerCount失败,则从新获取ctl的值 c = ctl.get(); // Re-read ctl // 若是当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //建立一个worker对象w w = new Worker(firstTask); //实例化w的线程t final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // workers是一个HashSet,保存着任务的worker对象 workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //启动线程 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
从代码中能够看出,addWorker方法的主要工做是在线程池中建立一个新的线程并执行,其中firstTask参数指定的是新线程须要执行的第一个任务,core参数决定于活动线程数的比较对象是corePoolSize仍是maximumPoolSize。根据传进来的参数首先对线程池和队列的状态进行判断,知足条件就新建一个Worker对象,并实例化该对象的线程,最后启动线程。
Worker类
根据addWorker源码中的逻辑,咱们能够发现,线程池中的每个线程其实都是对应的Worker对象在维护的,因此咱们有必要对Worker类一探究竟,先看一下类的源码:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
从Worker类的构造函数能够看出,当实例化一个Worker对象时,Worker对象会把传进来的Runnable参数firstTask
赋值给本身的同名属性,而且用线程工厂也就是当前的ThreadFactory来新建一个线程。
同时,由于Worker实现了Runnable接口,因此当Worker类中的线程启动时,调用的实际上是run()方法。run方法中调用的是runWorker
方法,咱们来看下它的具体实现:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); //获取第一个任务 Runnable task = w.firstTask; w.firstTask = null; //容许中断 w.unlock(); // allow interrupts //是否由于异常退出循环的标志,processWorkerExit方法会对该参数作判断 boolean completedAbruptly = true; try { //判断task是否为null,是的话经过getTask()从队列中获取任务 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt /* 这里的判断主要逻辑是这样: * 若是线程池正在中止,那么就确保当前线程是中断状态; * 若是不是的话,就要保证不是中断状态 */ if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //用于记录任务执行前须要作哪些事,属于ThreadPoolExecutor类中的方法, //是空的,须要子类具体实现 beforeExecute(wt, task); Throwable thrown = null; try { //执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
总结一下runWorker方法的运行逻辑:
一、经过while循环不断地经过getTask()方法从队列中获取任务;
二、若是线程池正在中止状态,确保当前的线程是中断状态,不然确保当前线程不中断;
三、调用task的run()方法执行任务,执行完毕后须要置为null;
四、循环调用getTask()取不到任务了,跳出循环,执行processWorkerExit()方法。
过完runWorker()的运行流程,咱们来看下getTask()是怎么实现的。
getTask方法
getTask()方法的做用是从队列中获取任务,下面是该方法的源码:
private Runnable getTask() { //记录上次从队列获取任务是否超时 boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { //将workerCount减1 decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? /* timed变量用于判断线程的操做是否须要进行超时判断 * allowCoreThreadTimeOut无论它,默认是false * wc > corePoolSize,当前线程是若是大于核心线程数corePoolSize */ boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { /* 根据timed变量判断,若是为true,调用workQueue的poll方法获取任务, * 若是在keepAliveTime时间内没有获取到任务,则返回null; * timed为false的话,就调用workQueue的take方法阻塞队列, * 直到队列中有任务可取。 */ Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; //r为null,说明time为true,超时了,把timedOut也设置为true timedOut = true; } catch (InterruptedException retry) { //发生异常,把timedOut也设置为false,从新跑循环 timedOut = false; } } }
getTask的代码看上去比较简单,但其实内有乾坤,咱们来重点分析一下两个if判断的逻辑:
一、当进入getTask方法后,先判断当前线程池状态,若是线程池状态rs >= SHUTDOWN,再进行如下判断:
1)rs 的状态是否大于STOP;2)队列是否为空;
知足以上条件其中之一,就将workerCount减1并返回null,也就是表示队列中再也不有任务。由于线程池的状态值是SHUTDOWN以上时,队列中再也不容许添加新任务,因此上面两个条件知足一个都说明队列中的任务都取完了。
二、进入第二个if判断,这里的逻辑有点绕,但做用比较重要,是为了控制线程池的有效线程数量,咱们来具体解析下代码:
wc > maximumPoolSize
:判断当前线程数是否大于maximumPoolSize,这种状况通常不多发生,除非是maximumPoolSize的大小在该程序执行的同时被进行设置,好比调用ThreadPoolExecutor中的setMaximumPoolSize
方法。
timed && timedOut
:若是为true,表示当前的操做须要进行超时判断,而且上次从队列获取任务已经超时。
wc > 1 || workQueue.isEmpty()
:若是工做线程大于1,或者阻塞队列是空的。
compareAndDecrementWorkerCount
:比较并将线程池中的workerCount减1
在上文中,咱们解析execute方法的逻辑时了解到,若是当前线程池的线程数量超过了corePoolSize且小于maximumPoolSize,而且workQueue已满时,仍然能够增长工做线程。
但调用getTask()取任务的过程当中,若是超时没有获取到任务,也就是timedOut为true的状况,说明workQueue已经为空了,也就说明了当前线程池中不须要那么多线程来执行任务了,能够把多于corePoolSize数量的线程销毁掉,也就是不断的让任务被取出,让线程数量保持在corePoolSize便可,直到getTask方法返回null。
而当getTask方法返回null后,runWorker方法中就会由于取不到任务而执行processWorkerExit()方法。
processWorkerExit方法
processWorkerExit方法的做用主要是对worker对象的移除,下面是方法的源码:
private void processWorkerExit(Worker w, boolean completedAbruptly) { //是异常退出的话,执行程序将workerCount数量减1 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; // 从workers的集合中移除worker对象,也就表示着从线程池中移除了一个工做线程 workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
至此,从executor方法开始的整个运行过程就完毕了,总结一下该流程:
执行executor --> 新建Worker对象,并实例化线程 --> 调用runWorker方法,经过getTask()获取任务,并执行run方法 --> getTask()方法中不断向队列取任务,并将workerCount数量减1,直至返回null --> 调用processWorkerExit清除worker对象。
用一张流程图表示以下所示 (图片来源于http://www.javashuo.com/article/p-cholswre-gb.html):
前面咱们屡次提到了workQueue,这是一个任务队列,用来存放等待执行的任务,它是BlockingQueue类型的对象,而在ThreadPoolExecutor的源码注释中,详细介绍了三种经常使用的Queue类型,分别是:
SynchronousQueue:直接提交的队列。这个队列没有容量,当接收到任务的时候,会直接提交给线程处理,而不保留它。若是没有空闲的线程,就新建一个线程来处理这个任务!若是线程数量达到最大值,就会执行拒绝策略。因此,使用这个类型队列的时候,通常都是将maximumPoolSize通常指定成Integer.MAX_VALUE,避免容易被拒绝。
ArrayBlockingQueue:有界的任务队列。须要给定一个参数来限制队列的长度,接收到任务的时候,若是没有达到corePoolSize的值,则新建线程 (核心线程) 执行任务,若是达到了,则将任务放入等待队列。若是队列已满,则在总线程数不到maximumPoolSize的前提下新建线程执行任务,若大于maximumPoolSize,则执行拒绝策略。
LinkedBlockingQueue:无界的任务队列。该队列没有任务数量的限制,因此任务能够一直入队,知道耗尽系统资源。当接收任务,若是当前线程数小于corePoolSize,则新建线程处理任务;若是当前线程数等于corePoolSize,则进入队列等待。
当线程池的任务队列已满而且线程数目达到maximumPoolSize时,对于新加的任务通常会采起拒绝策略,一般有如下四种策略:
ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow():
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); } public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
代码逻辑就不一一进行解析了,总结一下两个方法的特色就是:
ThreadPoolExecutor的运行机制讲完了,接下来展现一下如何用ThreadPoolExecutor建立线程池实例,具体代码以下:
public static void main(String[] args) { ExecutorService service = new ThreadPoolExecutor(5, 10, 300, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5)); //用lambda表达式编写方法体中的逻辑 Runnable run = () -> { try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + "正在执行"); } catch (InterruptedException e) { e.printStackTrace(); } }; for (int i = 0; i < 10; i++) { service.execute(run); } //这里必定要作关闭 service.shutdown(); }
上面的代码中,咱们建立的ThreadPoolExecutor线程池的核心线程数为5个,因此,当调用线程池执行任务时,同时运行的线程最多也是5个,执行main方法,输出结果以下:
pool-1-thread-3正在执行 pool-1-thread-1正在执行 pool-1-thread-4正在执行 pool-1-thread-5正在执行 pool-1-thread-3正在执行 pool-1-thread-2正在执行 pool-1-thread-1正在执行 pool-1-thread-4正在执行 pool-1-thread-5正在执行
看到出来,线程池确实只有5个线程在工做,也就是真正的实现了线程的复用,说明咱们的ThreadPoolExecutor实例是有效的。