论如何优雅的自定义ThreadPoolExecutor线程池

更好的markDown阅读体验可直接访问个人CSDN博客:https://blog.csdn.net/u012881584/article/details/85221635java

前言

线程池想必你们也都用过,JDK的Executors 也自带一些线程池。可是不知道你们有没有想过,如何才是最优雅的方式去使用过线程池吗? 生产环境要怎么去配置本身的线程池才是合理的呢?
今天周末,恰好有时间来总结一下本身所认为的'优雅', 若有问题欢迎你们指正。面试

线程池使用规则

要使用好线程池,那么必定要遵循几个规则:数据库

  1. 线程个数大小的设置
  2. 线程池相关参数配置
  3. 利用Hook嵌入你的行为
  4. 线程池的关闭

线程池配置相关

线程池大小的设置

这实际上是一个面试的考点,不少面试官会问你线程池coreSize 的大小来考察你对于线程池的理解。
首先针对于这个问题,咱们必需要明确咱们的需求是计算密集型仍是IO密集型,只有了解了这一点,咱们才能更好的去设置线程池的数量进行限制。编程

一、计算密集型:
顾名思义就是应用须要很是多的CPU计算资源,在多核CPU时代,咱们要让每个CPU核心都参与计算,将CPU的性能充分利用起来,这样才算是没有浪费服务器配置,若是在很是好的服务器配置上还运行着单线程程序那将是多么重大的浪费。对于计算密集型的应用,彻底是靠CPU的核数来工做,因此为了让它的优点彻底发挥出来,避免过多的线程上下文切换,比较理想方案是:缓存

线程数 = CPU核数+1,也能够设置成CPU核数*2,但还要看JDK的版本以及CPU配置(服务器的CPU有超线程)。服务器

通常设置CPU * 2便可。网络

二、IO密集型
咱们如今作的开发大部分都是WEB应用,涉及到大量的网络传输,不只如此,与数据库,与缓存间的交互也涉及到IO,一旦发生IO,线程就会处于等待状态,当IO结束,数据准备好后,线程才会继续执行。所以从这里能够发现,对于IO密集型的应用,咱们能够多设置一些线程池中线程的数量,这样就能让在等待IO的这段时间内,线程能够去作其它事,提升并发处理效率。那么这个线程池的数据量是否是能够随便设置呢?固然不是的,请必定要记得,线程上下文切换是有代价的。目前总结了一套公式,对于IO密集型应用:
线程数 = CPU核心数/(1-阻塞系数) 这个阻塞系数通常为0.8~0.9之间,也能够取0.8或者0.9。
套用公式,对于双核CPU来讲,它比较理想的线程数就是20,固然这都不是绝对的,须要根据实际状况以及实际业务来调整:final int poolSize = (int)(cpuCore/(1-0.9))并发

针对于阻塞系数,《Programming Concurrency on the JVM Mastering》即《Java 虚拟机并发编程》中有提到一句话:app

对于阻塞系数,咱们能够先试着猜想,抑或采用一些性能分析工具或java.lang.management API 来肯定线程花在系统/IO操做上的时间与CPU密集任务所耗的时间比值。ide

线程池相关参数配置

说到这一点,咱们只须要谨记一点,必定不要选择没有上限限制的配置项
这也是为何不建议使用Executors 中建立线程的方法。
好比,Executors.newCachedThreadPool的设置与无界队列的设置由于某些不可预期的状况,线程池会出现系统异常,致使线程暴增的状况或者任务队列不断膨胀,内存耗尽致使系统崩溃和异常。 咱们推荐使用自定义线程池来避免该问题,这也是在使用线程池规范的首要原则! 当心无大错,千万别过分自信!
能够看下Executors中四个建立线程池的方法:

//使用无界队列
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

//线程池数量是无限的
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

其余的就再也不列举了,你们能够自行查阅源码。

第二,合理设置线程数量、和线程空闲回收时间,根据具体的任务执行周期和时间去设定,避免频繁的回收和建立,虽然咱们使用线程池的目的是为了提高系统性能和吞吐量,可是也要考虑下系统的稳定性,否则出现不可预期问题会很麻烦!
第三,根据实际场景,选择适用于本身的拒绝策略。进行补偿,不要乱用JDK支持的自动补偿机制!尽可能采用自定义的拒绝策略去进行兜底!
第四,线程池拒绝策略,自定义拒绝策略能够实现RejectedExecutionHandler接口。
JDK自带的拒绝策略以下:
AbortPolicy:直接抛出异常阻止系统正常工做。
CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。
DiscardOldestPolicy:丢弃最老的一个请求,尝试再次提交当前任务。
DiscardPolicy:丢弃没法处理的任务,不给予任何处理。

利用Hook

利用Hook,留下线程池执行轨迹:
ThreadPoolExecutor提供了protected类型能够被覆盖的钩子方法,容许用户在任务执行以前会执行以后作一些事情。咱们能够经过它来实现好比初始化ThreadLocal、收集统计信息、如记录日志等操做。这类Hook如beforeExecute和afterExecute。另外还有一个Hook能够用来在任务被执行完的时候让用户插入逻辑,如rerminated 。
若是hook方法执行失败,则内部的工做线程的执行将会失败或被中断。

咱们可使用beforeExecute和afterExecute来记录线程以前前和后的一些运行状况,也能够直接把运行完成后的状态记录到ELK等日志系统。

关闭线程池

内容当线程池不在被引用而且工做线程数为0的时候,线程池将被终止。咱们也能够调用shutdown来手动终止线程池。若是咱们忘记调用shutdown,为了让线程资源被释放,咱们还可使用keepAliveTime和allowCoreThreadTimeOut来达到目的!
固然,稳妥的方式是使用虚拟机Runtime.getRuntime().addShutdownHook方法,手工去调用线程池的关闭方法!

线程池使用实例

线程池核心代码:

public class AsyncProcessQueue {

 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

 /**
  * Task 包装类<br>
  * 此类型的意义是记录可能会被 Executor 吃掉的异常<br>
  */
 public static class TaskWrapper implements Runnable {
  private static final Logger _LOGGER = LoggerFactory.getLogger(TaskWrapper.class);

  private final Runnable gift;

  public TaskWrapper(final Runnable target) {
   this.gift = target;
  }

  @Override
  public void run() {

   // 捕获异常,避免在 Executor 里面被吞掉了
   if (gift != null) {

    try {
     gift.run();
    } catch (Exception e) {
     _LOGGER.error("Wrapped target execute exception.", e);
    }
   }
  }
 }

 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

 /**
  * 执行指定的任务
  * 
  * @param task
  * @return
  */
 public static boolean execute(final Runnable task) {
  return AsyncProcessor.executeTask(new TaskWrapper(task));
 }
}
public class AsyncProcessor {
 static final Logger LOGGER = LoggerFactory.getLogger(AsyncProcessor.class);

 /**
  * 默认最大并发数<br>
  */
 private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2;

 /**
  * 线程池名称格式
  */
 private static final String THREAD_POOL_NAME = "ExternalConvertProcessPool-%d";

 /**
  * 线程工厂名称
  */
 private static final ThreadFactory FACTORY = new BasicThreadFactory.Builder().namingPattern(THREAD_POOL_NAME)
   .daemon(true).build();

 /**
  * 默认队列大小
  */
 private static final int DEFAULT_SIZE = 500;

 /**
  * 默认线程存活时间
  */
 private static final long DEFAULT_KEEP_ALIVE = 60L;

 /**NewEntryServiceImpl.java:689
  * Executor
  */
 private static ExecutorService executor;

 /**
  * 执行队列
  */
 private static BlockingQueue<Runnable> executeQueue = new ArrayBlockingQueue<>(DEFAULT_SIZE);

 static {
  // 建立 Executor
  // 此处默认最大值改成处理器数量的 4 倍
  try {
   executor = new ThreadPoolExecutor(DEFAULT_MAX_CONCURRENT, DEFAULT_MAX_CONCURRENT * 4, DEFAULT_KEEP_ALIVE,
     TimeUnit.SECONDS, executeQueue, FACTORY);

   // 关闭事件的挂钩
   Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {

    @Override
    public void run() {
     AsyncProcessor.LOGGER.info("AsyncProcessor shutting down.");

     executor.shutdown();

     try {

      // 等待1秒执行关闭
      if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
       AsyncProcessor.LOGGER.error("AsyncProcessor shutdown immediately due to wait timeout.");
       executor.shutdownNow();
      }
     } catch (InterruptedException e) {
      AsyncProcessor.LOGGER.error("AsyncProcessor shutdown interrupted.");
      executor.shutdownNow();
     }

     AsyncProcessor.LOGGER.info("AsyncProcessor shutdown complete.");
    }
   }));
  } catch (Exception e) {
   LOGGER.error("AsyncProcessor init error.", e);
   throw new ExceptionInInitializerError(e);
  }
 }

 /**
  * 此类型没法实例化
  */
 private AsyncProcessor() {
 }

 /**
  * 执行任务,不论是否成功<br>
  * 其实也就是包装之后的 {@link Executer} 方法
  * 
  * @param task
  * @return
  */
 public static boolean executeTask(Runnable task) {

  try {
   executor.execute(task);
  } catch (RejectedExecutionException e) {
   LOGGER.error("Task executing was rejected.", e);
   return false;
  }

  return true;
 }

 /**
  * 提交任务,并能够在稍后获取其执行状况<br>
  * 当提交失败时,会抛出 {@link }
  * 
  * @param task
  * @return
  */
 public static <T> Future<T> submitTask(Callable<T> task) {

  try {
   return executor.submit(task);
  } catch (RejectedExecutionException e) {
   LOGGER.error("Task executing was rejected.", e);
   throw new UnsupportedOperationException("Unable to submit the task, rejected.", e);
  }
 }
}

使用方式:

AsyncProcessQueue.execute(new Runnable() {
          @Override
         public void run() {
               //do something
        }
});

能够根据本身的使用场景灵活变动,我这里并无用到beforeExecute和afterExecute以及拒绝策略。

相关文章
相关标签/搜索