上一篇从总体上介绍了Executor
接口,从上一篇咱们知道了Executor
框架的最顶层实现是ThreadPoolExecutor
类,Executors
工厂类中提供的newScheduledThreadPool
、newFixedThreadPool
、newCachedThreadPool
方法其实也只是ThreadPoolExecutor
的构造函数参数不一样而已。经过传入不一样的参数,就能够构造出适用于不一样应用场景下的线程池,那么它的底层原理是怎样实现的呢,这篇就来介绍下ThreadPoolExecutor
线程池的运行过程。java
既然要讲运行过程,那么首先要了解下线程池的状态分为哪些?spring
volatile int runState; static final int RUNNING = 0; static final int SHUTDOWN = 1; static final int STOP = 2; static final int TERMINATED = 3;
ThreadPoolExecutor
代码中定义了上面几个变量:定义了一个volatile变量runState,以及其余几个表示状态的常量。
runState
:初始状态,表示当前线程池的运行状态,它的值就是上面的那4个常量值之一缓存
RUNNING
:线程池接受新任务并执行队列任务中...框架
SHUTDOWN
:再也不接受新任务,可是会继续执行等待队列Queued中的任务。当调用了shutdown()方法,会从 RUNNING -> SHUTDOWN函数
STOP
:再也不接受新任务,同时也不执行等待队列Queued中的任务,而且会尝试终止正在执行中的任务。当调用了shutdownNow()方法, 会从(RUNNING or SHUTDOWN) -> STOP学习
TERMINATED
:线程池中全部线程已经中止运行,其余行为同 STOP状态。this
在讲解运行过程前,咱们先看下ThreadPoolExecutor
中的几个比较重要的成员变量:线程
private final BlockingQueue<Runnable> workQueue; //任务缓存队列,用来保存等待中的任务,等待worker线程空闲时执行任务 private final ReentrantLock mainLock = new ReentrantLock(); //更新 poolSize, corePoolSize,maximumPoolSize, runState, and workers set 时须要持有这个锁 private final HashSet<Worker> workers = new HashSet<Worker>(); //用来保存工做中的执行线程 private volatile long keepAliveTime; //超过corePoolSize外的线程空闲存活之间 private volatile boolean allowCoreThreadTimeOut; //是否对corePoolSize内的线程设置空闲存活时间 private volatile int corePoolSize; //核心线程数 private volatile int maximumPoolSize; //最大线程数(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列) private volatile int poolSize; //线程池中的当前线程数 private volatile RejectedExecutionHandler handler; //任务拒绝策略 private volatile ThreadFactory threadFactory; //线程工厂,用来新建线程 private int largestPoolSize; //记录线程池中出现过的最大线程数大小 private long completedTaskCount; //已经执行完的线程数
这边重点解释下 corePoolSize
、maximumPoolSize
、workQueue
两个变量,这两个变量涉及到线程池中建立线程个数的一个策略。
corePoolSize
: 这个变量咱们能够理解为线程池的核心大小,举个例子来讲明(corePoolSize假设等于10,maximumPoolSize等于20):设计
先看下前一篇文章中的一个例子:代理
ExecutorService executor = Executors.newFixedThreadPool(3); IntStream.range(0, 6).forEach(i -> executor.execute(() -> { String threadName = Thread.currentThread().getName(); System.out.println("finished: " + threadName); }));
上面代码就是新建6个任务,而后扔到线程池中运行,输出线程名称,直到运行完毕。其中最核心的方法就是execute()
方法,虽然submit()
也能够执行任务,但它底层也是调用execute()
方法,因此懂了execute()
的实现原理便可:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { //1. if (runState == RUNNING && workQueue.offer(command)) { //2. if (runState != RUNNING || poolSize == 0) //3. ensureQueuedTaskHandled(command); //4. } else if (!addIfUnderMaximumPoolSize(command)) //5. reject(command); // is shutdown or saturated //6 } }
上面的代码看起来逻辑有点复杂,咱们一个一个看,首先看上面1位置处:
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
是一个或表达式,它分红两部分
addIfUnderCorePoolSize(command)
,这个方法是当线程数小于核心线程数时,用来新建线程执行任务(由于线程数小于corePoolSize时,直接新建线程来运行任务,无论当前线程池里有没有空闲的线程)。若是新建失败,那么进入if语句块,成功了那么execute方法就执行结束了,由于线程已经新建成功了,任务已经开始在线程池中运行。进入if语句块后,看上面代码2.if (runState == RUNNING && workQueue.offer(command))
if (!addIfUnderMaximumPoolSize(command))
,判断新任务用新线程执行是否成功(注:这里的新线程就是咱们上面讲的 “借来的工人” maximumPoolSize)继续进到代码块3 的if语句块if (runState != RUNNING || poolSize == 0)
, 由于新任务加入到等待队列中了,这句判断是为了防止在将此任务添加进任务缓存队列的同时其余线程忽然调用shutdown或者shutdownNow方法关闭了线程池的一种应急措施。若是是的话,应急处理加入的新任务 ensureQueuedTaskHandled(command)
。
咱们看下两个关键方法的实现:
##### 1.addIfUnderCorePoolSize
private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } return t != null; }
首先获取锁,由于涉及到线程池状态的变化。而后再次判断 if (poolSize < corePoolSize && runState == RUNNING)
,在execute()方法中咱们已经判断过一次,这边再次判断是为了防止其余线程又新增了新线程或者调用了shutdown、shutdownNow方法,这边起到了双重检查的一个效果。若是为true
的话,进行t = addThread(firstTask)
新增线程执行任务。addThread方法里面比较简单,就是经过线程工厂建立线程thread,而后封装到Worker对象中,加入到 workers队列中,并执行线程,能够把Worker对象当作是拥有一个线程的对象。
private Thread addThread(Runnable firstTask) { Worker w = new Worker(firstTask); Thread t = threadFactory.newThread(w); boolean workerStarted = false; if (t != null) { w.thread = t; workers.add(w); int nt = ++poolSize; if (nt > largestPoolSize) largestPoolSize = nt; try { t.start(); workerStarted = true; } } return t; }
这里在介绍下Worker对象, 它实现了Runnable接口,你把它当成Runnable的一个代理类便可,最终也是执行它的run方法。只要注意一下Worker中的beforeExecute
和afterExecute
方法,这两个方法在ThreadPoolExecutor中没有具体实现,用户能够重写这个方法和后面的afterExecute方法来进行一些统计信息,好比某个任务的执行时间等,而afterExecute方法还有一个Throwable t
参数,用户能够用来记录一些异常信息,由于新线程中的异常时捕获不到的,须要在afterExecute中记录。
看起来这个是否是和spring 切面有点像,能够看到 知识都是相通的。
看一下它的run方法:
public void run() { try { hasRun = true; Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { //1 runTask(task); task = null; } } finally { workerDone(this); } }
注意代码块1,能够看到这边在循环获取任务,并执行,直到任务所有执行完毕。除了第一个任务,其余任务都是经过getTask()
方法去取,这个方法是ThreadPoolExecutor中的一个方法。咱们猜一下,整个类中只有任务缓存队列中保存了任务,应该就是去缓存队列中取了。
Runnable getTask() { for (;;) { try { int state = runState; if (state > SHUTDOWN) return null; Runnable r; if (state == SHUTDOWN) // Help drain queue r = workQueue.poll(); //取任务 else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //若是线程数大于核心池大小或者容许为核心池线程设置空闲时间, //则经过poll取任务,若等待必定的时间取不到任务,则返回null r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else r = workQueue.take(); if (r != null) return r; if (workerCanExit()) { //若是没取到任务,即r为null,则判断当前的worker是否能够退出 if (runState >= SHUTDOWN) // Wake up others interruptIdleWorkers(); //中断处于空闲状态的worker return null; } // Else retry } catch (InterruptedException ie) { // On interruption, re-check runState } } }
这里有一个很是巧妙的设计方式,假如咱们来设计线程池,可能会有一个任务分派线程,当发现有线程空闲时,就从任务缓存队列中取一个任务交给 空闲线程执行。可是在这里,并无采用这样的方式,由于这样会要额外地对任务分派线程进行管理,无形地会增长难度和复杂度,这里直接让执行完任务的线程Worker去任务缓存队列里面取任务来执行,由于每个Worker里面都包含了一个线程thread。
这个方法的实现思想和 addIfUnderCorePoolSize方法的实现思想很是类似,惟一的区别在于addIfUnderMaximumPoolSize方法是在线程 池中的线程数达到了核心池大小而且往任务队列中添加任务失败的状况下执行的:
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < maximumPoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } return t != null; }
到这里,大部分朋友应该对任务提交给线程池以后到被执行的整个过程有了一个基本的了解,下面总结一下:
这篇写完了,后面会介绍一下任务缓存队列的种类已经缓存的策略以及任务拒绝策略等。若是文章有什么问题,欢迎你们指正,你们互相沟通,互相学习。