等待返回任务的结果的多步骤的处理场景, 批量并发执行任务,总耗时是单个步骤耗时最长的那个,提供总体的执行效率,程序员
最终一致性,异步执行任务,无需等待,快速返回数据库
通常状况下咱们是经过ThreadPoolExecutor来构造咱们的线程池对象的。缓存
* 阿里巴巴的开发规范文档是禁止直接使用Executors静态工厂类来建立线程池的,缘由是 服务器
【强制】线程池不容许使用 Executors 去建立,而是经过 ThreadPoolExecutor 的方式,这样
的处理方式让写的同窗更加明确线程池的运行规则,规避资源耗尽的风险。
说明: Executors 返回的线程池对象的弊端以下:
(1) FixedThreadPool 和 SingleThreadPool :
容许的请求队列长度为 Integer.MAX_VALUE ,可能会堆积大量的请求,从而致使 OOM 。
(2) CachedThreadPool 和 ScheduledThreadPool :
容许的建立线程数量为 Integer.MAX_VALUE ,可能会建立大量的线程,从而致使 OOM 。并发
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { }
参数说明:异步
- corePoolSize:核心线程数,线程池最低的线程数
- maximumPoolSize:容许的最大的线程数
- keepAliveTime:当前线程数超过corePoolSize的时候,空闲线程保留的时间
- unit: keepAliveTime线程保留的时间的单位
- workQueue: 任务缓冲区
- threadFactory: 线程的构造工厂
- handler: 线程池饱含时候的处理策略
Java经过Executors提供四种线程池,分别为:async
- newCachedThreadPool建立一个可缓存线程池,若是线程池长度超过处理须要,可灵活回收空闲线程,若无可回收,则新建线程。
- newFixedThreadPool 建立一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
- newScheduledThreadPool 建立一个定长线程池,支持定时及周期性任务执行。
- newSingleThreadExecutor 建立一个单线程化的线程池,它只会用惟一的工做线程来执行任务,保证全部任务按照指定顺序(FIFO, LIFO, 优先级)执行。
public static ExecutorService newCachedThreadPool(){ return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.MILLISECONDS,new SynchronousQueue<Runnable>()); }
它是一个能够无限扩大的线程池;优化
- 它比较适合处理执行时间比较小的任务;
corePoolSize为0,maximumPoolSize为无限大,意味着线程数量能够无限大;ui
keepAliveTime为60S,意味着线程空闲时间超过60S就会被杀死;线程
采用SynchronousQueue装等待的任务,这个阻塞队列没有存储空间,这意味着只要有请求到来,就必需要找到一条工做线程处理他,若是当前没有空闲的线程,那么就会再建立一条新的线程。
public static ExecutorService newFixedThreadPool(int nThreads){ return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); }
- 它是一种固定大小的线程池;corePoolSize和maximunPoolSize都为用户设定的线程数量nThreads;
- keepAliveTime为0,意味着一旦有多余的空闲线程,就会被当即中止掉;但这里keepAliveTime无效;
- 阻塞队列采用了LinkedBlockingQueue,它是一个无界队列;因为阻塞队列是一个无界队列,所以永远不可能拒绝任务;
- 因为采用了无界队列,实际线程数量将永远维持在nThreads,所以maximumPoolSize和keepAliveTime将无效。
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
- 定时任务的使用
public static ExecutorService newSingleThreadExecutor(){ return new ThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); }
- 它只会建立一条工做线程处理任务;
- 采用的阻塞队列为LinkedBlockingQueue;
线程池 | 特色 | 建议使用场景 |
---|---|---|
newCachedThreadPool | 一、线程数无上限 二、空闲线程存活60s 三、阻塞队列 |
一、任务执行时间短 二、任务要求响应时间短 |
newFixedThreadPool | 一、线程数固定 二、无界队列 |
一、任务比较平缓 二、控制最大的线程数 |
newScheduledThreadPool | 核心线程数量固定、非核心线程数量无限制(闲置时立刻回收) | 执行定时 / 周期性 任务 |
newSingleThreadExecutor | 只有一个核心线程(保证全部任务按照指定顺序在一个线程中执行,不须要处理线程同步的问题) | 不适合并发但可能引发IO阻塞性及影响UI线程响应的操做,如数据库操做,文件操做等 |
现象 | 缘由 |
---|---|
整个系统影响缓慢,大部分504 | 一、为设置最大的线程数,任务积压过多,线程数用尽 |
oom | 一、队列无界或者size设置过大 |
使用线程池对效率并无明显的提高 | 一、线程池的参数设置太小,线程数太小或者队列太小,或者是服务器的cpu核数过低 |
- 线程池中线程数和队列的类型及长度对线程会形成很大的影响,并且会争夺系统稀有资源,线程数。设置不当,或是没有最大的利用系统资源,提升系统的总体运行效率,或是致使整个系统的故障。典型的场景是线程数被占满,其余的请求无响应。或是任务积压过多,直接oom
- 方便的排查线程中的故障以及优化线程池的使用
另起一个定时单线程数的线程池newSingleThreadScheduledExecutor
调用scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)定时执行监控任务;
定时任务内 经过ThreadPoolExecutor对象获取监控的对象信息,好比t线程池须要执行的任务数、线程池在运行过程当中已完成的任务数、曾经建立过的最大线程数、线程池里的线程数量、线程池里活跃的线程数量、当前排队线程数
根据预设的日志或报警策略,进行规则控制
定义线程池并启动监控
/** * 定义线程池的队列的长度 */ private final Integer queueSize = 1000; /** * 定义一个定长的线程池 */ private ExecutorService executorService; @PostConstruct private void initExecutorService() { log.info( "executorService init with param: threadcount:{} ,queuesize:{}", systemConfig.getThreadCount(), systemConfig.getThreadQueueSize()); executorService = new ThreadPoolExecutor( systemConfig.getThreadCount(), systemConfig.getThreadCount(), 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(systemConfig.getThreadQueueSize()), new BasicThreadFactory.Builder() .namingPattern("async-sign-thread-%d") .build(), (r, executor) -> log.error("the async executor pool is full!!")); /** 启动线程池的监控 */ ThreadPoolMonitoring threadPoolMonitoring = new ThreadPoolMonitoring(); threadPoolMonitoring.init(); }
线程池的监控
/** * 功能说明:线程池监控 * * @params * @return <br> * 修改历史<br> * [2019年06月14日 10:20:10 10:20] 建立方法by fengqingyang */ public class ThreadPoolMonitoring { /** 用于周期性监控线程池的运行状态 */ private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( new BasicThreadFactory.Builder() .namingPattern("async thread executor monitor") .build()); /** * 功能说明:自动运行监控 * * @return <br> * 修改历史<br> * [2019年06月14日 10:26:51 10:26] 建立方法by fengqingyang * @params */ public void init() { scheduledExecutorService.scheduleAtFixedRate( () -> { try { ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService; /** 线程池须要执行的任务数 */ long taskCount = threadPoolExecutor.getTaskCount(); /** 线程池在运行过程当中已完成的任务数 */ long completedTaskCount = threadPoolExecutor.getCompletedTaskCount(); /** 曾经建立过的最大线程数 */ long largestPoolSize = threadPoolExecutor.getLargestPoolSize(); /** 线程池里的线程数量 */ long poolSize = threadPoolExecutor.getPoolSize(); /** 线程池里活跃的线程数量 */ long activeCount = threadPoolExecutor.getActiveCount(); /** 当前排队线程数 */ int queueSize = threadPoolExecutor.getQueue().size(); log.info( "async-executor monitor. taskCount:{}, completedTaskCount:{}, largestPoolSize:{}, poolSize:{}, activeCount:{},queueSize:{}", taskCount, completedTaskCount, largestPoolSize, poolSize, activeCount, queueSize); /** 超过阀值的80%报警 */ if (activeCount >= systemConfig.getThreadCount() * 0.8) { log.error( "async-executor monitor. taskCount:{}, completedTaskCount:{}, largestPoolSize:{}, poolSize:{}, activeCount:{},queueSize:{}", taskCount, completedTaskCount, largestPoolSize, poolSize, activeCount, queueSize); ; } } catch (Exception ex) { log.error("ThreadPoolMonitoring service error,{}", ex.getMessage()); } }, 0, 30, TimeUnit.SECONDS); } }
- 线程数要合理设置,通常建议值是核数的2倍。
- 线程池队列的类型和长度要根据业特性合理设置
- 不一样的业务须要线程池隔离,避免相互影响
- 未每一个线程池增长特有的命名规范以及关键的日志,方便出问题排查和优化
更多精彩,敬请关注, 程序员导航网 https://chenzhuofan.top