Java面试中,线程池也算是一个高频的问题,其实就JDK源码来看线程池这一块的实现代码应该算是写的清晰易懂的,经过这篇文章,咱们就来盘点一下线程池的知识点。java
本文基于JDK1.8源码进行分析面试
首先看下线程池构造函数:并发
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { //忽略赋值与校验逻辑 }
构造参数比较多,一个一个说下:函数
当咱们向线程池提交任务时,一般使用execute方法,接下来就先从该方法开始分析。
在分析execute代码以前,须要先说明下,咱们都知道线程池是维护了一批线程来处理用户提交的任务,达到线程复用的目的,线程池维护的这批线程被封装成了Worker。this
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //JDK8的源码中,线程池自己的状态跟worker数量使用同一个变量ctl来维护 int c = ctl.get(); //经过位运算得出固然线程池中的worker数量与构造参数corePoolSize进行比较 if (workerCountOf(c) < corePoolSize) { //若是小于corePoolSize,则直接新增一个worker,并把固然用户提交的任务command做为参数,若是成功则返回。 if (addWorker(command, true)) return; //若是失败,则获取最新的线程池数据 c = ctl.get(); } //若是线程池仍在运行,则把任务放到阻塞队列中等待执行。 if (isRunning(c) && workQueue.offer(command)) { //这里的recheck思路是为了处理并发问题 int recheck = ctl.get(); //当任务成功放入队列时,若是recheck发现线程池已经再也不运行了则从队列中把任务删除 if (! isRunning(recheck) && remove(command)) //删除成功之后,会调用构造参数传入的拒绝策略。 reject(command); //若是worker的数量为0(此时队列中可能有任务没有执行),则新建一个worker(因为此时新建woker的目的是执行队列中堆积的任务, //所以入参没有执行任务,详细逻辑后面会详细分析addWorker方法)。 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //若是前面的新增woker,放入队列都失败,则会继续新增worker,此时线程池的状态是woker数量达到corePoolSize,阻塞队列任务已满 //只能基于maximumPoolSize参数新建woker else if (!addWorker(command, false)) //若是基于maximumPoolSize新建woker失败,此时是线程池中线程数已达到上限,队列已满,则调用构造参数中传入的拒绝策略 reject(command); }
源码里我增长了不少注释,须要多读几遍才能彻底理解,总结一下用户向线程池提交任务之后,线程池的执行逻辑:spa
从execute的源码能够看出addWorker方法是重中之重,立刻来看下它的实现。
addWorker方法:线程
private boolean addWorker(Runnable firstTask, boolean core) { //这里有一段基于CAS+死循环实现的关于线程池状态,线程数量的校验与更新逻辑就先忽略了,重点看主流程。 //... boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //把指定任务做为参数新建一个worker线程 w = new Worker(firstTask); //这里是重点,咋一看,必定觉得w.thread就是咱们传入的firstTask //实际上是经过线程池构造函数参数threadFactory生成的woker对象 //也就是说这个变量t就是表明woker线程。绝对不是用户提交的线程任务firstTask!!! final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //加锁以后仍旧是判断线程池状态等一些校验逻辑。 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); //把新建的woker线程放入集合保存,这里使用的是HashSet workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //而后启动woker线程 //这里再强调一遍上面说的逻辑,该变量t表明woker线程,也就是会调用woker的run方法 t.start(); workerStarted = true; } } } finally { if (! workerStarted) //若是woker启动失败,则进行一些善后工做,好比说修改当前woker数量等等 addWorkerFailed(w); } return workerStarted; }
addWorker方法主要作的工做就是新建一个Woker线程,加入到woker集合中,而后启动该线程,那么接下来的重点就是Woker类的run方法了。code
worker执行方法:对象
//Woker类实现了Runnable接口 public void run() { runWorker(this); } //最终woker执行逻辑走到了这里 final void runWorker(Worker w) { Thread wt = Thread.currentThread(); //task就是Woker构造函数入参指定的任务,即用户提交的任务 Runnable task = w.firstTask; w.firstTask = null; w.unlock(); boolean completedAbruptly = true; try { //通常状况下,task都不会为空(特殊状况上面注释中也说明了),所以会直接进入循环体中 //这里getTask方法是要重点说明的,它的实现跟咱们构造参数设置存活时间有关 //咱们都知道构造参数设置的时间表明了线程池中的线程,即woker线程的存活时间,若是到期则回收woker线程,这个逻辑的实现就在getTask中。 //来不及执行的任务,线程池会放入一个阻塞队列,getTask方法就是去阻塞队列中取任务,用户设置的存活时间,就是 //从这个阻塞队列中取任务等待的最大时间,若是getTask返回null,意思就是woker等待了指定时间仍然没有 //取到任务,此时就会跳过循环体,进入woker线程的销毁逻辑。 while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //该方法是个空的实现,若是有须要用户能够本身继承该类进行实现 beforeExecute(wt, task); Throwable thrown = null; try { //真正的任务执行逻辑 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //该方法是个空的实现,若是有须要用户能够本身继承该类进行实现 afterExecute(task, thrown); } } finally { //这里设为null,也就是循环体再执行的时候会调用getTask方法 task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { //当指定任务执行完成,阻塞队列中也取不到可执行任务时,会进入这里,作一些善后工做,好比在corePoolSize跟maximumPoolSize之间的woker会进行回收 processWorkerExit(w, completedAbruptly); } }
woker线程的执行流程就是首先执行初始化时分配给的任务,执行完成之后会尝试从阻塞队列中获取可执行的任务,若是指定时间内仍然没有任务能够执行,则进入销毁逻辑。
注:这里只会回收corePoolSize与maximumPoolSize直接的那部分woker继承
理解了整个线程池的运行原理之后,再来看下JDK默认提供的线程池类型就会一目了然了:
public static ExecutorService newFixedThreadPool(int nThreads) { //corePoolSize跟maximumPoolSize值同样,同时传入一个无界阻塞队列 //根据上面分析的woker回收逻辑,该线程池的线程会维持在指定线程数,不会进行回收 return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
public static ExecutorService newSingleThreadExecutor() { //线程池中只有一个线程进行任务执行,其余的都放入阻塞队列 //外面包装的FinalizableDelegatedExecutorService类实现了finalize方法,在JVM垃圾回收的时候会关闭线程池 return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
public static ExecutorService newCachedThreadPool() { //这个线程池corePoolSize为0,maximumPoolSize为Integer.MAX_VALUE,意思也就是说来一个任务就建立一个woker,回收时间是60s return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
最后再说说初始化线程池时线程数的选择:
上述只是一个基本思想,若是真的须要精确的控制,仍是须要上线之后观察线程池中线程数量跟队列的状况来定。