前言 java
=====spring
谈到 Java 的线程池最熟悉的莫过于 ExecutorService 接口了,jdk1.5 新增的 java.util.concurrent 包下的这个 api,大大的简化了多线程代码的开发。而不论你用 FixedThreadPool 仍是 CachedThreadPool 其背后实现都是ThreadPoolExecutor。数据库
ThreadPoolExecutor 是一个典型的缓存池化设计的产物,由于池子有大小,当池子体积不够承载时,就涉及到拒绝策略。JDK 中已经预设了 4 种线程池拒绝策略,下面结合场景详细聊聊这些策略的使用场景,以及咱们还能扩展哪些拒绝策略。api
池话设计应该不是一个新名词。咱们常见的如 Java 线程池、JDBC 链接池、Redis 链接池等就是这类设计的表明实现。这种设计会初始预设资源,解决的问题就是抵消每次获取资源的消耗,如建立线程的开销,获取远程链接的开销等。缓存
就比如你去食堂打饭,打饭的大妈会先把饭盛好几份放那里,你来了就直接拿着饭盒加菜便可,不用再临时又盛饭又打菜,效率就高了。多线程
除了初始化资源,池化设计还包括以下这些特征:池子的初始值、池子的活跃值、池子的最大值等,这些特征能够直接映射到 Java 线程池和数据库链接池的成员属性中。并发
和数据源链接池不同,线程池除了初始大小和池子最大值,还多了一个阻塞队列来缓冲。数据源链接池通常请求的链接数超过链接池的最大值的时候就会触发拒绝策略,策略通常是阻塞等待设置的时间或者直接抛异常。而线程池的触发时机以下图:运维
如图,想要了解线程池何时触发拒绝粗略,须要明确上面三个参数的具体含义,是这三个参数整体协调的结果,而不是简单的超过最大线程数就会触发线程拒绝粗略,当提交的任务数大于 corePoolSize 时,会优先放到队列缓冲区,只有填满了缓冲区后,才会判断当前运行的任务是否大于 maxPoolSize,小于时会新建线程处理。大于时就触发了拒绝策略,总结就是:当前提交任务数大于(maxPoolSize + queueCapacity)时就会触发线程池的拒绝策略了。ide
在分析 JDK 自带的线程池拒绝策略前,先看下 JDK 定义的 拒绝策略接口,以下:性能
1public interface RejectedExecutionHandler { 2 void rejectedExecution(Runnable r, ThreadPoolExecutor executor); 3}
接口定义很明确,当触发拒绝策略时,线程池会调用你设置的具体的策略,将当前提交的任务以及线程池实例自己传递给你处理,具体做何处理,不一样场景会有不一样的考虑,下面看 JDK 为咱们内置了哪些实现:
1 public static class CallerRunsPolicy implements RejectedExecutionHandler { 2 3 public CallerRunsPolicy() { } 4 5 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 6 if (!e.isShutdown()) { 7 r.run(); 8 } 9 } 10 }
功能:当触发拒绝策略时,只要线程池没有关闭,就由提交任务的当前线程处理。
使用场景:通常在不容许失败的、对性能要求不高、并发量较小的场景下使用,由于线程池通常状况下不会关闭,也就是提交的任务必定会被运行,可是因为是调用者线程本身执行的,当屡次提交任务时,就会阻塞后续任务执行,性能和效率天然就慢了。
1 public static class AbortPolicy implements RejectedExecutionHandler { 2 3 public AbortPolicy() { } 4 5 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 6 throw new RejectedExecutionException("Task " + r.toString() + 7 " rejected from " + 8 e.toString()); 9 } 10 }
功能:当触发拒绝策略时,直接抛出拒绝执行的异常,停止策略的意思也就是打断当前执行流程
使用场景:这个就没有特殊的场景了,可是一点要正确处理抛出的异常。ThreadPoolExecutor 中默认的策略就是AbortPolicy,ExecutorService 接口的系列 ThreadPoolExecutor 由于都没有显示的设置拒绝策略,因此默认的都是这个。可是请注意,ExecutorService 中的线程池实例队列都是无界的,也就是说把内存撑爆了都不会触发拒绝策略。当本身自定义线程池实例时,使用这个策略必定要处理好触发策略时抛的异常,由于他会打断当前的执行流程。
1 public static class DiscardPolicy implements RejectedExecutionHandler { 2 3 public DiscardPolicy() { } 4 5 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 6 } 7 }
功能:直接静悄悄的丢弃这个任务,不触发任何动做
使用场景:若是你提交的任务可有可无,你就可使用它 。由于它就是个空实现,会悄无声息的吞噬你的的任务。因此这个策略基本上不用了
1 public static class DiscardOldestPolicy implements RejectedExecutionHandler { 2 3 public DiscardOldestPolicy() { } 4 5 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 6 if (!e.isShutdown()) { 7 e.getQueue().poll(); 8 e.execute(r); 9 } 10 } 11 }
功能:若是线程池未关闭,就弹出队列头部的元素,而后尝试执行
使用场景:这个策略仍是会丢弃任务,丢弃时也是毫无声息,可是特色是丢弃的是老的未执行的任务,并且是待执行优先级较高的任务。基于这个特性,我能想到的场景就是,发布消息,和修改消息,当消息发布出去后,还未执行,此时更新的消息又来了,这个时候未执行的消息的版本比如今提交的消息版本要低就能够被丢弃了。由于队列中还有可能存在消息版本更低的消息会排队执行,因此在真正处理消息的时候必定要作好消息的版本比较
1public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy { 2 3 protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class); 4 5 private final String threadName; 6 7 private final URL url; 8 9 private static volatile long lastPrintTime = 0; 10 11 private static Semaphore guard = new Semaphore(1); 12 13 public AbortPolicyWithReport(String threadName, URL url) { 14 this.threadName = threadName; 15 this.url = url; 16 } 17 18 @Override 19 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 20 String msg = String.format("Thread pool is EXHAUSTED!" + 21 " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," + 22 " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!", 23 threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(), 24 e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(), 25 url.getProtocol(), url.getIp(), url.getPort()); 26 logger.warn(msg); 27 dumpJStack(); 28 throw new RejectedExecutionException(msg); 29 } 30 31 private void dumpJStack() { 32 //省略实现 33 } 34}
能够看到,当dubbo的工做线程触发了线程拒绝后,主要作了三个事情,原则就是尽可能让使用者清楚触发线程拒绝策略的真实缘由
1 private static final class NewThreadRunsPolicy implements RejectedExecutionHandler { 2 NewThreadRunsPolicy() { 3 super(); 4 } 5 6 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 7 try { 8 final Thread t = new Thread(r, "Temporary task executor"); 9 t.start(); 10 } catch (Throwable e) { 11 throw new RejectedExecutionException( 12 "Failed to start a new thread", e); 13 } 14 } 15 }
Netty 中的实现很像 JDK 中的 CallerRunsPolicy,舍不得丢弃任务。不一样的是,CallerRunsPolicy 是直接在调用者线程执行的任务。而 Netty是新建了一个线程来处理的。因此,Netty的实现相较于调用者执行策略的使用面就能够扩展到支持高效率高性能的场景了。可是也要注意一点,Netty的实现里,在建立线程时未作任何的判断约束,也就是说只要系统还有资源就会建立新的线程来处理,直到new不出新的线程了,才会抛建立线程失败的异常
1 new RejectedExecutionHandler() { 2 @Override 3 public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) { 4 try { 5 executor.getQueue().offer(r, 60, TimeUnit.SECONDS); 6 } catch (InterruptedException e) { 7 throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker"); 8 } 9 10 throw new RejectedExecutionException("Timed Out while attempting to enqueue Task."); 11 } 12 });
activeMq中的策略属于最大努力执行任务型,当触发拒绝策略时,在尝试一分钟的时间从新将任务塞进任务队列,当一分钟超时还没成功时,就抛出异常
1public class RejectedExecutionHandlerChain implements RejectedExecutionHandler { 2 private final RejectedExecutionHandler[] handlerChain; 3 4 public static RejectedExecutionHandler build(List<RejectedExecutionHandler> chain) { 5 Objects.requireNonNull(chain, "handlerChain must not be null"); 6 RejectedExecutionHandler[] handlerChain = chain.toArray(new RejectedExecutionHandler[0]); 7 return new RejectedExecutionHandlerChain(handlerChain); 8 } 9 10 private RejectedExecutionHandlerChain(RejectedExecutionHandler[] handlerChain) { 11 this.handlerChain = Objects.requireNonNull(handlerChain, "handlerChain must not be null"); 12 } 13 14 @Override 15 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 16 for (RejectedExecutionHandler rejectedExecutionHandler : handlerChain) { 17 rejectedExecutionHandler.rejectedExecution(r, executor); 18 } 19 } 20}
pinpoint的拒绝策略实现颇有特色,和其余的实现都不一样。他定义了一个拒绝策略链,包装了一个拒绝策略列表,当触发拒绝策略时,会将策略链中的rejectedExecution依次执行一遍。
前文从线程池设计思想,以及线程池触发拒绝策略的时机引出java线程池拒绝策略接口的定义。并辅以JDK内置4种以及四个第三方开源软件的拒绝策略定义描述了线程池拒绝策略实现的各类思路和使用场景。但愿阅读此文后能让你对java线程池拒绝策略有更加深入的认识,可以根据不一样的使用场景更加灵活的应用。