做为一个牛逼的程序员,相信你们确定是接触过多线程的概念的。而且可能会在实际的工做中由于一些业务场景须要使用自定义线程池来执行批量的任务或对线程进行管理。一样,咱们项目中也存在一个两个场景须要使用线程池。而这两个场景分别为:java
一、持续监听某个外部接口的不间断的返回信息,其实就是长连接的阻塞接口,总共12种资源须要监听,因此就意味须要12个不间断的线程执行阻塞任务。程序员
二、RabbitMQ的消费者,由于须要应用启动的时候就执行消息的消费,因此也经过线程池中获取线程执行消费任务。多线程
public class ThreadPoolUtil { private static Logger logger = LoggerFactory.getLogger(ThreadPoolUtil.class); private static volatile ThreadPoolExecutor threadPoolExecutor = null; /** * 建立 * @return */ private static AtomicInteger nextId = new AtomicInteger(0); public static ThreadPoolExecutor createExecutor(){ int corePoolSize = 12; // 核心线程12个 int maxPoolSize = 16; // 最大线程数 16个 int keepAliveSeconds = 60; //闲置存活时间60秒 BlockingQueue<Runnable> queue = new ArrayBlockingQueue(500); // 临时队列500个 RejectedExecutionHandler rejectedExecutionHandler = (r, executor) -> logger.error("队列已经满了{},直接拒绝吧", executor.getTaskCount());
// 同步代码块 synchronized (ThreadPoolUtil.class){ if (threadPoolExecutor != null){ return threadPoolExecutor; } // 建立单例的线程池 threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveSeconds, TimeUnit.SECONDS, queue, r -> { String fileName = Thread.currentThread().getStackTrace()[5].getFileName(); // 获取外部用户层的调用栈信息 String threadName = fileName.substring(0,fileName.indexOf("."))+"-"; // 获取调用栈的名称,做为线程的名称 Thread thread = new Thread(r, threadName+nextId.incrementAndGet()); return thread; }, rejectedExecutionHandler); } return threadPoolExecutor; } }
看看上面的线程池设计,好像是没有啥问题的。若是是放在普通的可终结的任务使用当前线程池,理论上是没有太大问题。可是!咱们的应用恰好这几个任务都是阻塞的。阻塞就意味着线程是没法回收的,其余的任务使用这个线程池以后,就只能先放到队列中,而后一直得不到释放的线程资源执行。最终队列积压,任务被抛弃。ide
由于在初始化的时候,已经将 12 个监听都启动了,而且使用的是当前线程池构造工具。启动完成以后,12个核心线程就一直被阻塞占用。这12个资源的监听仍是比较正常的,而且可以对监听数据进行处理和执行。工具
由于须要MQ消费端启动的时候就能够执行消费,因此在启动的时候,设置了启动配置类中调用上述工具建立线程池,至关于用新的线程执行消息监听的动做。然而MQ却迟迟不见消费的过程,致使消息队列一直积压。而且没法完成正确的数据处理。测试
猜想:没有被消费,应该就是咱们的线程池中没有空闲的线程进行消息监听处理。初始化的时候的消费监听的任务被直接丢弃到了线程池的任务队列中,而这个线程池的任务队列中数数据只有在两种状况下才可能被执行。spa
第一种:线程池中有空闲的线程,能够进行执行线程
第二种:消息队列满了,开启了加大了线程池的线程数以便执行堆积的任务设计
而咱们的这个一步开启MQ消费监听的任务被发送到线程池的时候,由于核心线程数就是 12 ,而咱们前面的资源监听接口已经开启了12个阻塞任务,因此就没有了可用线程。因此被存放到了线程池待执行任务队列中。可怕的是,咱们这个线程池的队列大小为500 ,很显然 1 < 500 ,因此就没法触发线程加大的动做,致使这个队列的任务“被遗忘”。3d
理论支撑:
线程池的核心参数包括: coreSize , maxSize, quauaSize,RejectedExecutionHandler
分别为:核心线程数,最大线程数,可积压的任务数,拒绝策略
当建立线程的时候,首先会先建立核心线程数等量的线程,好比上面就是 12个核心线程, 而当咱们的核心线程都在执行阶段的时候,再次加入的任务就会被存放到任务队列中。当任务不断的增长而且幅度远远大于核心线程的处理速度,致使任务队列存放到最大值,好比上面的500,那么就须要增长线程数,此时就是须要增长线程数到最大值,好比上面的16,然而,增大了以后,发现已然不能处理消化任务的投放数量,这个时候就用不一样的处理策略,好比上面的 rejectedExecutionHandler 就是直接丢弃。
猜想和理论匹配一下的话就是:核心线程是12 ,这12个线程被资源监听的阻塞任务占用没法释放,而开启消费监听的任务被丢到了待执行的任务队列中,此时,任务队列又不知足益处的条件,因此就没有增长新的线程来处理,以致于,这个建立消费监听的任务就“被遗忘”了。
如何进行论证呢?使用以下测试代码
public static void main(String[] args) { ThreadPoolExecutor executor = createExecutor(); // 临界值 分别设置12 16 512 518 for (int i =0; i < $临界值;i++){ int finalI = i; executor.execute(new Runnable() { @Override public void run() { try { System.out.println("当前任务序号为:"finalI +" ,活跃线程数"+ executor.getActiveCount()); Thread.sleep(10000*1000); // 这就看成是持久的任务在执行 } catch (InterruptedException e) { e.printStackTrace(); } } }); } }
测试结果:
临界值为 12, 核心线程恰好够用
临界值为 16 , 虽然任务数大于了核心线程,可是并无新建线程数。因此验证任务被放到了队列中,先使用队列存放,队列满了再开新线程
临界值为 512 任务数量大于 核心线程数 因此新任务放到了队列中,且恰好不会有超出,不触发新的线程建立
临界值为 516 任务数量大于 ( 核心线程数 + 队列大小 ) 全部活跃线程被加到最大
临界值为 518, 任务数量大于 ( 队列大小 + 最大线程数) 全部产生丢弃
出现这个问题以后,咱们直接就增长了核心线程的数量,以保证总体大于在阻塞任务的数量。好比咱们这个就是从新设置为核心线程数量 16 > 12,
同时,咱们将阻塞任务同非阻塞任务所建立的线程池进行隔离,以减小共用线程池形成的 正常任务被遗忘的可能性。
那么在开发中,如何设置i线程池的大小?其实这没有特定的规范,须要结合本身任务的执行时间而考虑,
可是最好提早考虑好,任务是否为阻塞性任务,若是是的话,建议作好线程隔离。
在咱们通常将核心线程设置为 n + 1 (n 为内核数量)
最大线程数量设置 2n + 1 (n 为内核数量)