咱们在关闭线程池的时候会使用shutdown()和shutdownNow(),那么问题来了:java
为了解决这些疑问咱们须要分析java线程池的原理。ide
日常咱们在建立线程池常用的方式以下:oop
ExecutorService executorService = Executors.newFixedThreadPool(5);
看下newFixedThreadPool源码, 其实Executors是个工厂类,内部是new了一个ThreadPoolExecuto:源码分析
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
参数的意义就不介绍了,网上有不少内容,看源码注释也能够明白。ui
线程池中类的继承关系以下:this
将一个Runnable放到线程池执行有两种方式,一个是调用ThreadPoolExecutor#submit,一个是调用ThreadPoolExecutor#execute。其实submit是将Runnable封装成了一个RunnableFuture,而后再调用execute,最终调用的仍是execute,因此咱们这里就只从ThreadPoolExecutor#execute开始分析。.net
ThreadPoolExecutor中有个重要的属性是ctl线程
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 高3位表示状态,低29位表示线程池中线程的多少 private static final int COUNT_BITS = Integer.SIZE - 3; // 32-3 = 29 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 左移29为减1,即最终获得为高3位为0,低29位为1的数字,做为掩码,是二进制运算中经常使用的方法 private static final int RUNNING = -1 << COUNT_BITS; // 高三位111 private static final int SHUTDOWN = 0 << COUNT_BITS; // 高三位000 private static final int STOP = 1 << COUNT_BITS; // 高三位001 private static final int TIDYING = 2 << COUNT_BITS; // 高三位010 private static final int TERMINATED = 3 << COUNT_BITS; // 高三位011 // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } // 保留高3位,即计算线程池状态 private static int workerCountOf(int c) { return c & CAPACITY; } // 保留低29位, 即计算线程数量 private static int ctlOf(int rs, int wc) { return rs | wc; } // 求ctl
ThreadPoolExecutor中使用32位Integer来表示线程池的状态和线程的数量,其中高3位表示状态,低29位表示数量。若是对二进制运行不熟悉能够参考:二进制运算。从上也能够看出线程池有五种状态,咱们关心前3中状态rest
明白了ctl和线程池的状态后咱们来具体看下execute的处理逻辑code
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { // 线程数量小于coresize,那么就调用addWorker if (addWorker(command, true)) // 这里知道,返回true就不往下走了 return; c = ctl.get(); } // 不知足上述条件,即线程数量 >= coreSize,或者addWorker返回fasle,那么走下面的逻辑 if (isRunning(c) && workQueue.offer(command)) { // 能够看到是往blockingqueue中放task int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 若是不知足上述条件,即blockingqueue也放不进去,那么就走下面的逻辑 else if (!addWorker(command, false)) reject(command); }
从上面的代码我们能够看到线程池处理线程的基本思路是: 若是线程数量小于coresize那么就执行task,不然就放到queue中,若是queue也放不下就走下面addWorker,若是也失败了,那么就调用reject策略。固然还涉及一些细节,须要进一步分析。
execute中反复调用的是addWorker
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 计算线程池状态 // Check if queue empty only if necessary. if (rs >= SHUTDOWN && // 先忽略 ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); // 线程数量 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) // 可见若是超过了运行的最大线程数量则返回false return false; if (compareAndIncrementWorkerCount(c)) // 若是成功,线程数量确定加1 break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); // 将task封装成了Worker final Thread t = w.thread; // 来获取worker的thread if (t != null) { mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); int rs = runStateOf(c); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); // 将worker添加到hashset中报存,关闭的时候要使用 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 通过一些检查, 启动了work的thread t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); // 若是线程启动失败,则将线程数减1 } return workerStarted; }
上面的代码看起来比较复杂,可是若是咱们忽略具体的细节,从大体思路上看,其实也比较简单。上面代码的主要思路就是:除了一些状态检查外,首先将线程数量加1,而后将runnable分装成一个worker,去启动worker线程,若是启动失败则再将线程数量减1。返回false的缘由多是线程数量大于容许的数量。因此addWorker调用成功,则会启动一个work线程,且线程池中线程数量加1
woker是线程池中真正的线程实体。线程池中的线程不是自定义的Runnable实现的线程,而是woker线程,worker在run方法里调用了自定义的Runnable的run方法。
Worker继承了AQS,并实现了runnable接口:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); // 这个时候回头看看addWorker中t.start(), 就明白了启动的实际是一个Woker线程,而不是用户定义的Runnable } public void run() { runWorker(this); } }
Worker中firstTask存储了用户定义的Runnable,thread是以他自身为参数的Thread对象。getThreadFactory()默认返回是Executors#DefaultThreadFactory,用来新建线程,并定义了线程名称的前缀等:
static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; // } public Thread newThread(Runnable r) { // 调用后新建一个线程 Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
Worker的run方法调用了runWorker,并将自身做为参数传了进去,下面看看问题的关键:runWorker:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { // 注意这里的while循环,这里很关键。这里注意,若是两个条件都知足了,那么线程就结束了 w.lock(); // 注意worker继承了AQS,至关于本身实现了锁,这个在关闭线程的时候有用 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); // 仅仅是回调了Runnable的run方法 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; // 重点,task执行完后就被置位null w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); // 注意while循环结束后worker线程就结束了 } }
runWorker中有个while循环,while中判断条件为(task != null || (task = getTask()) != null)。假设咱们按照正常的逻辑,即task != null,则会调用task.run方法,执行完run方法后而后在finally中task被置为null;接着又进入while循环判断,此次task == null,因此不符合第一个判断条件,则会继续判断 task == getTask()) != null。咱们来看下getTask作了什么。
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 当调用shutdown()方法的时候,线程状态就为shutdown了; 当调用shutdownow()的时候,线程状态就为stop了 decrementWorkerCount(); return null; } boolean timed; // Are workers subject to culling? for (;;) { // 经过死循环设置状态 int wc = workerCountOf(c); // 设置容许core线程timeout或者线程数量大于coresize,则容许线程超时 timed = allowCoreThreadTimeOut || wc > corePoolSize; // 若是线程数量 <= 最大线程数 且 没有超时和容许超时 则跳出死循环 if (wc <= maximumPoolSize && ! (timedOut && timed)) break; if (compareAndDecrementWorkerCount(c)) return null; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } try { // 这里是关键,若是容许超时则调用poll从queue中取出task,不然就调用take可阻塞的获取task Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) // 获取到task则返回,而后runWorker的while循环就继续执行,并调用task的run方法 return r; timedOut = true; // 不然设置为timeOut,继续循环,可是下次循环会走到if (compareAndDecrementWorkerCount(c)) 处,并返回null。 } catch (InterruptedException retry) { timedOut = false; } } }
忽略掉具体细节,getTask的总体思路是: 从blockqueue中拿去task,若是queue中没有task则分两种状况:
这个时候回头再看下runWorker,若是task != null,那么就会执行task的run方法,执行完后task就会为被置为null,再次进入while循环执行getTask阻塞在这里了。经过这种方式保留住了线程。若是while循环结束了,那么worker线程也就结束了。
分析到这里咱们再来看下addWoker。addWorker能够将第一个参数设置为null。例如ThreadPoolExecutor#prestartAllCoreThreads:
public int prestartAllCoreThreads() { int n = 0; while (addWorker(null, true)) // addWorker第一个参数是null ++n; return n; }
通过前面的分析,咱们知道addWoker用来启动一个worker线程,worker线程调用runWorker来执行,而runWorker中有个while循环,判断条件是(task != null || (task = getTask()) != null)。由于咱们传入的task为null,因此就会判断task = getTask()) != null,而getTask就是去blockqueue中拿去数据,若是没有任务就会阻塞住。这个时候就是一个阻塞的线程在等待task的到来了。因此传入参数为null表示建立一个空的线程,什么都不执行。
已经知道了线程池内部的大概工做状况,咱们再来看下若是全部core线程都建立好了且处于空置状态,这个时候新放入一个线程的执行流程。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { // core线程都建立好了,因此判断条件不知足 if (addWorker(command, true)) return; c = ctl.get(); } // 会走到这里,会经过offer往blockingqueue里放置一个task。这个时候阻塞的core线程会经过blockingqueue的take拿到task执行,相似一个生产者消费者的状况 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 若是blockingqueue添加失败,则建立线程直到maxsize else if (!addWorker(command, false)) reject(command); }
可见,线程和execute经过blockingqueue来通讯,而不是其余方式,execute往blockingqueue中放置task,线程经过take来获取。总体线程池的逻辑以下图
这个时候咱们终于能够来看看shutdown和shutdownNow了
看下shutdown
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); // 重点,将线程状态置为shutdown,这样getTask等workqueue为空后就返回null了 interruptIdleWorkers(); // 重点 onShutdown(); // 什么都没作 } finally { mainLock.unlock(); } tryTerminate(); } private void interruptIdleWorkers() { interruptIdleWorkers(false); } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; // 线程没有中断 且 获取到worker的锁 if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); // 调用interrup,中断线程 } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
总结下就是: shutdown会把它被调用前放到线程池中的task所有执行完。
再来看下shutdownNow
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); // 重点,将线程状态置为stop interruptWorkers(); // 重点 tasks = drainQueue(); // 重点 } finally { mainLock.unlock(); } tryTerminate(); return tasks; } private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { // 没有去获取woker的锁 try { t.interrupt(); } catch (SecurityException ignore) { } } } private List<Runnable> drainQueue() { BlockingQueue<Runnable> q = workQueue; List<Runnable> taskList = new ArrayList<Runnable>(); q.drainTo(taskList); // 将blockingqueue中的task清空 if (!q.isEmpty()) { for (Runnable r : q.toArray(new Runnable[0])) { if (q.remove(r)) taskList.add(r); } } return taskList; }
从上面的代码能够看出:
总结就是shutdownNow比较粗暴,调用他后,他会将全部以前提交的任务都interrupt,且将blockingqueue中的task清空
另外就是不管是shutdown仍是shutdownNow都是调用Thread的interrupt()方法。若是task不响应中断或者忽略中断标记,那么这个线程就不会被终止。例如在run中执行如下逻辑
poolExecutor.execute(new Runnable() { @Override public void run() { while (true) { System.out.println("b"); try { Thread.sleep(1000); } catch (InterruptedException e) { System.out.printf("不处理"); // 忽略中断 } } } });
运行结果是,即便调用了shutdownNow也终止不了线程运行
b 0 不处理b b b b b ....