ThreadPoolExecutor

1、概述

2、参数列表

参数名 做用
corePoolSize 核心线程池大小
maximumPoolSize 最大线程池大小
keepAliveTime 线程池中超过corePoolSize数目的空闲线程最大存活时间;能够allowCoreThreadTimeOut(true)使得核心线程有效时间
TimeUnit keepAliveTime时间单位
workQueue 阻塞任务队列
threadFactory 新建线程工厂
RejectedExecutionHandler 当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理

重点讲解:java

其中比较容易让人误解的是:corePoolSize,maximumPoolSize,workQueue之间关系。 
    1.当线程池小于corePoolSize时,新提交任务将建立一个新线程执行任务,即便此时线程池中存在空闲线程。 
    2.当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行 
    3.当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会建立新线程执行任务 
    4.当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理 
    5.当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程(只会关闭超过corePoolSize的数) 
    6.当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭ide

线程管理机制图示:spa

3、Executors配置方案

一、构造一个固定线程数目的线程池,配置的corePoolSize与maximumPoolSize大小相同,同时使用了一个无界LinkedBlockingQueue存放阻塞任务,所以多余的任务将存在再阻塞队列,不会由RejectedExecutionHandler处理。线程

public static ExecutorService newFixedThreadPool(int nThreads) {  
        return new ThreadPoolExecutor(nThreads, nThreads,  
                                      0L, TimeUnit.MILLISECONDS,  
                                      new LinkedBlockingQueue<Runnable>());  
    }

二、构造一个缓冲功能的线程池,配置corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,keepAliveTime=60s,以及一个无容量的阻塞队列 SynchronousQueue,所以任务提交以后,将会建立新的线程执行;线程空闲超过60s将会销毁。debug

public static ExecutorService newCachedThreadPool() {  
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
                                      60L, TimeUnit.SECONDS,  
                                      new SynchronousQueue<Runnable>());  
    }

三、构造一个只支持一个线程的线程池,配置corePoolSize=maximumPoolSize=1,无界阻塞队列LinkedBlockingQueue;保证任务由一个线程串行执行。code

public static ExecutorService newSingleThreadExecutor() {  
        return new FinalizableDelegatedExecutorService  
            (new ThreadPoolExecutor(1, 1,  
                                    0L, TimeUnit.MILLISECONDS,  
                                    new LinkedBlockingQueue<Runnable>()));  
    }

四、 构造有定时功能的线程池,配置corePoolSize,无界延迟阻塞队列DelayedWorkQueue;有意思的是:maximumPoolSize=Integer.MAX_VALUE,因为DelayedWorkQueue是无界队列,因此这个值是没有意义的。队列

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {  
        return new ScheduledThreadPoolExecutor(corePoolSize);  
    }  
  
public static ScheduledExecutorService newScheduledThreadPool(  
            int corePoolSize, ThreadFactory threadFactory) {  
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);  
    }  
  
public ScheduledThreadPoolExecutor(int corePoolSize,  
                             ThreadFactory threadFactory) {  
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,  
              new DelayedWorkQueue(), threadFactory);  
    }

4、定制本身的线程池

public class ThreadPoolManager {

	private static final Logger logger = LoggerFactory.getLogger(ThreadPoolManager.class);
    private static ThreadPoolExecutor threadpool = 
new ThreadPoolExecutor(10, 100, 30, TimeUnit.SECONDS, new ArrayBlockingQueue(50),new MessageThreadFactory(),new MessageRejectedExecutionHandler());
     // 是指线程池一启动的时候,就会直接建立核心数线程10个,而后当再有任务进来的时候,在入队到阻塞队列里,若是
     // 队列数超过50个了,才会开始继续建立额外的线程,直到达到最大线程数100个
    static class MessageThreadFactory implements ThreadFactory {
        private AtomicInteger count = new AtomicInteger(0);
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            String threadName = ThreadPoolManager.class.getSimpleName() + count.addAndGet(1);
            logger.debug("新建线程:"+logger);
            t.setName(threadName);
            return t;
        }
    }
    static class MessageRejectedExecutionHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                executor.getQueue().put(r);  // 加了这句,则就是阻塞线程池了
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void submit(Runnable task){
        threadpool.submit(task);
    }

    public static Future<String> submit(Callable<String> task){
        return threadpool.submit(task);
    }
}
相关文章
相关标签/搜索