在Java面试中,线程池相关知识,虽不能说是必问提,但出现的频次也是很是高的。同时又鉴于公众号“程序新视界”的读者后台留言让写一篇关于Java线程池的文章,因而就有本篇内容,本篇将基于Java线程池的原理、实现以及相关源码进行讲解等。java
线程池是一种多线程处理形式,处理过程当中将任务提交到线程池,任务的执行交由线程池来管理。面试
为了充分利用CPU多核资源,应用都会采用多线程并行/并发计算,最大限度的利用多核提高应用程序性能。spring
试想一下,若是每一个请求都执行一遍建立线程、执行任务、销毁线程,那么对服务器资源将是一种浪费。在高并发的状况下,甚至会耗尽服务器资源。apache
线程池的主要做用有两个:不一样请求之间重复利用线程,无需频繁的建立和销毁线程,下降系统开销和控制线程数量上限,避免建立过多的线程耗尽进程内存空间,同时减小线程上下文切换次数。数组
在JDK5版本中增长了内置线程池实现ThreadPoolExecutor,同时提供了Executors来建立不一样类型的线程池。Executors中提供了如下常见的线程池建立方法:缓存
虽然在JDK中提供Executors类来支持以上类型的线程池建立,但一般状况下不建议开发人员直接使用(见《阿里巴巴java开发规范》)。服务器
线程池不容许使用Executors去建立,而是经过ThreadPoolExecutor的方式,这样的处理方式让写的同窗更加明确线程池的运行规则,规避资源耗尽的风险。微信
Executors部分方法的弊端:多线程
同时,阿里巴巴java开发规范中推荐了3种线程池建立方式。并发
方式一,引入commons-lang3包。
//org.apache.commons.lang3.concurrent.BasicThreadFactory ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());
方式二,引入com.google.guava包。
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("demo-pool-%d").build(); //Common Thread Pool ExecutorService pool = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); pool.execute(()-> System.out.println(Thread.currentThread().getName())); pool.shutdown();//gracefully shutdown
方式三,spring配置线程池方式:自定义线程工厂bean须要实现ThreadFactory,可参考该接口的其它默认实现类,使用方式直接注入bean,调用execute(Runnable task)方法便可。
<bean id="userThreadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value="10" /> <property name="maxPoolSize" value="100" /> <property name="queueCapacity" value="2000" /> <property name="threadFactory" value= threadFactory /> <property name="rejectedExecutionHandler"> <ref local="rejectedExecutionHandler" /> </property> </bean> // in code userThreadPool.execute(thread);
除了以上推荐的建立线程池的方法,还能够经过ThreadPoolExecutor的构造方法,直接建立线程池。本质上来说,以上方法最终也是建立了ThreadPoolExecutor对象,而后堆积进行包装处理。
ThreadPoolExecutor提供了多个构造方法,咱们最终都调用的构造方法来进行说明。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { // 省略代码 }
核心参数做用解析以下:
构造方法的中最后的参数RejectedExecutionHandler用于指定线程池的拒绝策略。当请求任务不断的过来,而系统此时又处理不过来的时候,咱们就须要采起对应的策略是拒绝服务。
默认有四种类型:
固然,除了默认的4种策略以外,还能够根据业务需求自定义拒绝策略。经过实现RejectedExecutionHandler接口,在建立ThreadPoolExecutor对象时做为参数传入便可。
在spring-integration-core中便自定义了CallerBlocksPolicy,相关代码以下:
public class CallerBlocksPolicy implements RejectedExecutionHandler { private static final Log logger = LogFactory.getLog(CallerBlocksPolicy.class); private final long maxWait; public CallerBlocksPolicy(long maxWait) { this.maxWait = maxWait; } public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (!executor.isShutdown()) { try { BlockingQueue<Runnable> queue = executor.getQueue(); if (logger.isDebugEnabled()) { logger.debug("Attempting to queue task execution for " + this.maxWait + " milliseconds"); } if (!queue.offer(r, this.maxWait, TimeUnit.MILLISECONDS)) { throw new RejectedExecutionException("Max wait time expired to queue task"); } else { if (logger.isDebugEnabled()) { logger.debug("Task execution queued"); } } } catch (InterruptedException var4) { Thread.currentThread().interrupt(); throw new RejectedExecutionException("Interrupted", var4); } } else { throw new RejectedExecutionException("Executor has been shut down"); } } }
建立完成ThreadPoolExecutor以后,当向线程池提交任务时,一般使用execute方法。execute方法的执行流程图以下:
下面看一下JDK8中ThreadPoolExecutor中execute方法的源代码实现:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // 线程池自己的状态跟worker数量使用同一个变量ctl来维护 int c = ctl.get(); // 经过位运算得出固然线程池中的worker数量与构造参数corePoolSize进行比较 if (workerCountOf(c) < corePoolSize) { // 若是小于corePoolSize,则直接新增一个worker,并把固然用户提交的任务command做为参数,若是成功则返回。 if (addWorker(command, true)) return; // 若是失败,则获取最新的线程池数据 c = ctl.get(); } // 若是线程池仍在运行,则把任务放到阻塞队列中等待执行。 if (isRunning(c) && workQueue.offer(command)) { // 这里的recheck思路是为了处理并发问题 int recheck = ctl.get(); // 当任务成功放入队列时,若是recheck发现线程池已经再也不运行了则从队列中把任务删除 if (! isRunning(recheck) && remove(command)) //删除成功之后,会调用构造参数传入的拒绝策略。 reject(command); // 若是worker的数量为0(此时队列中可能有任务没有执行),则新建一个worker(因为此时新建woker的目的是执行队列中堆积的任务, // 所以入参没有执行任务,详细逻辑后面会详细分析addWorker方法)。 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 若是前面的新增woker,放入队列都失败,则会继续新增worker,此时线程池的状态是woker数量达到corePoolSize,阻塞队列任务已满 // 只能基于maximumPoolSize参数新建woker else if (!addWorker(command, false)) // 若是基于maximumPoolSize新建woker失败,此时是线程池中线程数已达到上限,队列已满,则调用构造参数中传入的拒绝策略 reject(command); }
下面再看在上述代码中调用的addWorker方法的源代码实现及解析:
private boolean addWorker(Runnable firstTask, boolean core) { // 这里有一段基于CAS+死循环实现的关于线程池状态,线程数量的校验与更新逻辑就先忽略了,重点看主流程。 //... boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 把指定任务做为参数新建一个worker线程 w = new Worker(firstTask); // 这里是重点w.thread是经过线程池构造函数参数threadFactory生成的woker对象 // 也就是说这个变量t就是表明woker线程。绝对不是用户提交的线程任务firstTask。 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 加锁以后仍旧是判断线程池状态等一些校验逻辑。 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); // 把新建的woker线程放入集合保存,这里使用的是HashSet workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 而后启动woker线程 // 该变量t表明woker线程,会调用woker的run方法 t.start(); workerStarted = true; } } } finally { if (! workerStarted) // 若是woker启动失败,则进行一些善后工做,好比说修改当前woker数量等 addWorkerFailed(w); } return workerStarted; }
addWorker方法主要作的工做就是新建一个Woker线程,加入到woker集合中。在上述方法中会调用到Worker类的run方法,并最终执行了runWorker方法。
// Woker类实现了Runnable接口 public void run() { runWorker(this); } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); // task就是Woker构造函数入参指定的任务,即用户提交的任务 Runnable task = w.firstTask; w.firstTask = null; w.unlock(); boolean completedAbruptly = true; try { //通常状况下,task都不会为空(特殊状况上面注释中也说明了),所以会直接进入循环体中 //这里getTask方法是要重点说明的,它的实现跟咱们构造参数设置存活时间有关 //咱们都知道构造参数设置的时间表明了线程池中的线程,即woker线程的存活时间,若是到期则回收woker线程,这个逻辑的实现就在getTask中。 //来不及执行的任务,线程池会放入一个阻塞队列,getTask方法就是去阻塞队列中取任务,用户设置的存活时间,就是 //从这个阻塞队列中取任务等待的最大时间,若是getTask返回null,意思就是woker等待了指定时间仍然没有 //取到任务,此时就会跳过循环体,进入woker线程的销毁逻辑。 while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //该方法是个空的实现,若是有须要用户能够本身继承该类进行实现 beforeExecute(wt, task); Throwable thrown = null; try { //真正的任务执行逻辑 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //该方法是个空的实现,若是有须要用户能够本身继承该类进行实现 afterExecute(task, thrown); } } finally { //这里设为null,也就是循环体再执行的时候会调用getTask方法 task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { //当指定任务执行完成,阻塞队列中也取不到可执行任务时,会进入这里,作一些善后工做,好比在corePoolSize跟maximumPoolSize之间的woker会进行回收 processWorkerExit(w, completedAbruptly); } }
woker线程的执行流程就是首先执行初始化时分配给的任务,执行完成之后会尝试从阻塞队列中获取可执行的任务,若是指定时间内仍然没有任务能够执行,则进入销毁逻辑。这里只会回收corePoolSize与maximumPoolSize直接的那部分woker。
执行任务除了可使用execute方法还可使用submit方法。它们的主要区别是:execute适用于不须要关注返回值的场景,submit方法适用于须要关注返回值的场景。
当执行任务时发生异常,那么该怎么处理呢?首先看当Thread线程异常如何处理。
在任务中经过try...catch是能够捕获异常并进行处理的,以下代码:
Thread t = new Thread(() -> { try { System.out.println(1 / 0); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } }); t.start();
若是不少线程任务默认的异常处理机制都是相同的,能够经过Thread类的UncaughtExceptionHandler来设置线程默认的异常处理机制。
实现UncaughtExceptionHandler接口,并调用Thread#setUncaughtExceptionHandler(UncaughtExceptionHandler)方法。若是想设置为全局默认异常处理机制,则可调用Thread#setDefaultUncaughtExceptionHandler(UncaughtExceptionHandler)方法。
ThreadGroup默认提供了异常处理机制以下:
public void uncaughtException(Thread t, Throwable e) { if (parent != null) { parent.uncaughtException(t, e); } else { Thread.UncaughtExceptionHandler ueh = Thread.getDefaultUncaughtExceptionHandler(); if (ueh != null) { ueh.uncaughtException(t, e); } else if (!(e instanceof ThreadDeath)) { System.err.print("Exception in thread \"" + t.getName() + "\" "); e.printStackTrace(System.err); } } }
ThreadPoolExecutor的异常处理机制与Thread是同样的。同时,ThreadPoolExecutor提供了uncaughtExceptionHandler方法来设置异常处理。以下示例:
public class ThreadPool { public static void main(String[] args) { ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("demo-pool-%d") .setUncaughtExceptionHandler(new LogUncaughtExceptionHandler()) .build(); ExecutorService pool = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); pool.execute(() -> { throw new RuntimeException("测试异常"); }); pool.shutdown(); } static class LogUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { @Override public void uncaughtException(Thread t, Throwable e) { System.out.println("打印LogUncaughtExceptionHandler中得到的异常信息:" + e.getMessage()); } } }
但须要注意的是使用UncaughtExceptionHandler的方法只适用于execute方法执行的任务,而对submit方法是无效。submit执行的任务,能够经过返回的Future对象的get方法接收抛出的异常,再进行处理。这也算是execute方法与submit方法的差异之一。
线程池有如下工做队列:
关闭线程池能够调用shutdownNow和shutdown两个方法来实现。
shutdownNow:对正在执行的任务所有发出interrupt(),中止执行,对还未开始执行的任务所有取消,而且返回还没开始的任务列表。
shutdown:当咱们调用shutdown后,线程池将再也不接受新的任务,但也不会去强制终止已经提交或者正在执行中的任务。
参考文章:
https://www.jianshu.com/p/5df6e38e4362
https://juejin.im/post/5d1882b1f265da1ba84aa676
原文连接:《面试题-关于Java线程池一篇文章就够了》