Java线程池(ThreadPool)详解

线程五个状态(生命周期):

线程运行时间

    假设一个服务器完成一项任务所需时间为:T1 建立线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。

       若是:T1 + T3 远大于 T2,则能够采用线程池,以提升服务器性能。
html

线程池技术

  一个线程池包括如下四个基本组成部分java

       一、线程池管理器(ThreadPool):用于建立并管理线程池,包括 建立线程池,销毁线程池,添加新任务;编程

 

                二、工做线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,能够循环的执行任务;数组

                三、任务接口(Task):每一个任务必须实现的接口,以供工做线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工做,任务的执行状态等;缓存

                四、任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。安全

   线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提升服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。
  线程池不只调整T1,T3产生的时间段,并且它还显著减小了建立线程的数目,看一个例子:
  假设一个服务器一天要处理50000个请求,而且每一个请求须要一个单独的线程完成。在线程池中,线程数通常是固定的,因此产生线程总数不会超过线程池中线程的数目,而若是服务器不利用线程池来处理这些请求则线程总数为50000。通常线程池大小是远小于50000。因此利用线程池的服务器程序不会为了建立50000而在处理请求时浪费时间,从而提升效率。服务器

 

java并发编程:线程池的使用并发

    咱们使用线程的时候就去建立一个线程,这样实现起来很是简便,可是就会有一个问题:ide

  若是并发的线程数量不少,而且每一个线程都是执行一个时间很短的任务就结束了,这样频繁建立线程就会大大下降系统的效率,由于频繁建立线程和销毁线程须要时间。post

  那么有没有一种办法使得线程能够复用,就是执行完一个任务,并不被销毁,而是能够继续执行其余的任务?

  在Java中能够经过线程池来达到这样的效果。下面咱们就来详细讲解一下Java的线程池,首先咱们从最核心的ThreadPoolExecutor类中的方法讲起,而后再讲述它的实现原理,接着给出了它的使用示例,最后讨论了一下如何合理配置线程池的大小。

  本文的目录大纲:

  一.Java中的ThreadPoolExecutor类

  二.深刻剖析线程池实现原理

  三.使用示例

  四.如何合理配置线程池的大小 

  如有不正之处请多多谅解,并欢迎批评指正。 

一.Java中的ThreadPoolExecutor类

  java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,所以若是要透彻地了解Java中的线程池,必须先了解这个类。下面咱们来看一下ThreadPoolExecutor类的具体实现源码。

  在ThreadPoolExecutor类中提供了四个构造方法:

public class ThreadPoolExecutor extends AbstractExecutorService {
    .....
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);

   从上面的代码能够得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,经过观察每一个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工做。

   下面解释下一下构造器中各个参数的含义:

  • corePoolSize:核心池的大小,这个参数与后面讲述的线程池的实现原理有很是大的关系。在建立了线程池后,默认状况下,线程池中并无任何线程,而是等待有任务到来才建立线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就能够看出,是预建立线程的意思,即在没有任务到来以前就建立corePoolSize个线程或者一个线程。默认状况下,在建立了线程池后,线程池中的线程数为0,当有任务来以后,就会建立一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中
  • maximumPoolSize:线程池最大线程数,它表示在线程池中最多能建立多少个线程
  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认状况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起做用,直到线程池中的线程数不大于corePoolSize:即当线程池中的线程数大于corePoolSize时,若是一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize;可是若是调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起做用,直到线程池中的线程数为0;
  • unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性,以及TimeUtil的源码展现
TimeUnit.DAYS;               //
TimeUnit.HOURS;             //小时
TimeUnit.MINUTES;           //分钟
TimeUnit.SECONDS;           //
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //纳秒
public enum TimeUnit {
    NANOSECONDS {
        public long toNanos(long d)   { return d; }
        public long toMicros(long d)  { return d/(C1/C0); }
        public long toMillis(long d)  { return d/(C2/C0); }
        public long toSeconds(long d) { return d/(C3/C0); }
        public long toMinutes(long d) { return d/(C4/C0); }
        public long toHours(long d)   { return d/(C5/C0); }
        public long toDays(long d)    { return d/(C6/C0); }
        public long convert(long d, TimeUnit u) { return u.toNanos(d); }
        int excessNanos(long d, long m) { return (int)(d - (m*C2)); }
    },
    MICROSECONDS {
        public long toNanos(long d)   { return x(d, C1/C0, MAX/(C1/C0)); }
        public long toMicros(long d)  { return d; }
        public long toMillis(long d)  { return d/(C2/C1); }
        public long toSeconds(long d) { return d/(C3/C1); }
        public long toMinutes(long d) { return d/(C4/C1); }
        public long toHours(long d)   { return d/(C5/C1); }
        public long toDays(long d)    { return d/(C6/C1); }
        public long convert(long d, TimeUnit u) { return u.toMicros(d); }
        int excessNanos(long d, long m) { return (int)((d*C1) - (m*C2)); }
    },
    MILLISECONDS {
        public long toNanos(long d)   { return x(d, C2/C0, MAX/(C2/C0)); }
        public long toMicros(long d)  { return x(d, C2/C1, MAX/(C2/C1)); }
        public long toMillis(long d)  { return d; }
        public long toSeconds(long d) { return d/(C3/C2); }
        public long toMinutes(long d) { return d/(C4/C2); }
        public long toHours(long d)   { return d/(C5/C2); }
        public long toDays(long d)    { return d/(C6/C2); }
        public long convert(long d, TimeUnit u) { return u.toMillis(d); }
        int excessNanos(long d, long m) { return 0; }
    },
    SECONDS {
        public long toNanos(long d)   { return x(d, C3/C0, MAX/(C3/C0)); }
        public long toMicros(long d)  { return x(d, C3/C1, MAX/(C3/C1)); }
        public long toMillis(long d)  { return x(d, C3/C2, MAX/(C3/C2)); }
        public long toSeconds(long d) { return d; }
        public long toMinutes(long d) { return d/(C4/C3); }
        public long toHours(long d)   { return d/(C5/C3); }
        public long toDays(long d)    { return d/(C6/C3); }
        public long convert(long d, TimeUnit u) { return u.toSeconds(d); }
        int excessNanos(long d, long m) { return 0; }
    },
    MINUTES {
        public long toNanos(long d)   { return x(d, C4/C0, MAX/(C4/C0)); }
        public long toMicros(long d)  { return x(d, C4/C1, MAX/(C4/C1)); }
        public long toMillis(long d)  { return x(d, C4/C2, MAX/(C4/C2)); }
        public long toSeconds(long d) { return x(d, C4/C3, MAX/(C4/C3)); }
        public long toMinutes(long d) { return d; }
        public long toHours(long d)   { return d/(C5/C4); }
        public long toDays(long d)    { return d/(C6/C4); }
        public long convert(long d, TimeUnit u) { return u.toMinutes(d); }
        int excessNanos(long d, long m) { return 0; }
    },
    HOURS {
        public long toNanos(long d)   { return x(d, C5/C0, MAX/(C5/C0)); }
        public long toMicros(long d)  { return x(d, C5/C1, MAX/(C5/C1)); }
        public long toMillis(long d)  { return x(d, C5/C2, MAX/(C5/C2)); }
        public long toSeconds(long d) { return x(d, C5/C3, MAX/(C5/C3)); }
        public long toMinutes(long d) { return x(d, C5/C4, MAX/(C5/C4)); }
        public long toHours(long d)   { return d; }
        public long toDays(long d)    { return d/(C6/C5); }
        public long convert(long d, TimeUnit u) { return u.toHours(d); }
        int excessNanos(long d, long m) { return 0; }
    },
    DAYS {
        public long toNanos(long d)   { return x(d, C6/C0, MAX/(C6/C0)); }
        public long toMicros(long d)  { return x(d, C6/C1, MAX/(C6/C1)); }
        public long toMillis(long d)  { return x(d, C6/C2, MAX/(C6/C2)); }
        public long toSeconds(long d) { return x(d, C6/C3, MAX/(C6/C3)); }
        public long toMinutes(long d) { return x(d, C6/C4, MAX/(C6/C4)); }
        public long toHours(long d)   { return x(d, C6/C5, MAX/(C6/C5)); }
        public long toDays(long d)    { return d; }
        public long convert(long d, TimeUnit u) { return u.toDays(d); }
        int excessNanos(long d, long m) { return 0; }
    };

    static final long C0 = 1L;
    static final long C1 = C0 * 1000L;
    static final long C2 = C1 * 1000L;
    static final long C3 = C2 * 1000L;
    static final long C4 = C3 * 60L;
    static final long C5 = C4 * 60L;
    static final long C6 = C5 * 24L;

    static final long MAX = Long.MAX_VALUE;

    static long x(long d, long m, long over) {
        if (d >  over) return Long.MAX_VALUE;
        if (d < -over) return Long.MIN_VALUE;
        return d * m;
    }
    public long convert(long sourceDuration, TimeUnit sourceUnit) {
        throw new AbstractMethodError();
    }
    public long toNanos(long duration) {
        throw new AbstractMethodError();
    }
    public long toMicros(long duration) {
        throw new AbstractMethodError();
    }
    public long toMillis(long duration) {
        throw new AbstractMethodError();
    }
    public long toSeconds(long duration) {
        throw new AbstractMethodError();
    }
    public long toMinutes(long duration) {
        throw new AbstractMethodError();
    }
    public long toHours(long duration) {
        throw new AbstractMethodError();
    }
    public long toDays(long duration) {
        throw new AbstractMethodError();
    }
    public void timedWait(Object obj, long timeout)
            throws InterruptedException {
        if (timeout > 0) {
            long ms = toMillis(timeout);
            int ns = excessNanos(timeout, ms);
            obj.wait(ms, ns);
        }
    }
    public void timedJoin(Thread thread, long timeout)
            throws InterruptedException {
        if (timeout > 0) {
            long ms = toMillis(timeout);
            int ns = excessNanos(timeout, ms);
            thread.join(ms, ns);
        }
    }
    public void sleep(long timeout) throws InterruptedException {
        if (timeout > 0) {
            long ms = toMillis(timeout);
            int ns = excessNanos(timeout, ms);
            Thread.sleep(ms, ns);
        }
    }

}
  • workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择会对线程池的运行过程产生重大影响,通常来讲,这里的阻塞队列有如下几种选择:
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;

  ArrayBlockingQueue和PriorityBlockingQueue使用较少,通常使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。

  • threadFactory:线程工厂,主要用来建立线程;
  • handler:表示当拒绝处理任务时的策略,有如下四种取值:
ThreadPoolExecutor.AbortPolicy;//丢弃任务并抛出RejectedExecutionException异常。 
ThreadPoolExecutor.DiscardPolicy;//也是丢弃任务,可是不抛出异常。 
ThreadPoolExecutor.DiscardOldestPolicy;//丢弃队列最前面的任务,而后从新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy;//由调用线程处理该任务 

   具体参数的配置与线程池的关系将在下一节讲述。

public class ThreadPoolExecutor extends AbstractExecutorService {
.......
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial parameters and default rejected execution handler.*/
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial parameters and default thread factory.*/
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial parameters.*/
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
.......
}

  从ThreadPoolExecutor类的代码能够知道,ThreadPoolExecutor继承了AbstractExecutorService,咱们来看一下AbstractExecutorService的源码:

public abstract class AbstractExecutorService implements ExecutorService {
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
    public Future<?> submit(Runnable task) {};
    public <T> Future<T> submit(Runnable task, T result) { };
    public <T> Future<T> submit(Callable<T> task) { };
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                            boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
    };
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
    };
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
    };
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
    };
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
    };
}

   AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。

  咱们接着看看ExecutorService接口的源代码:

public interface ExecutorService extends Executor {
    void shutdown();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
 
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

   而ExecutorService又是继承了Executor接口,咱们看一下Executor接口的实现:

public interface Executor {
    void execute(Runnable command);
}

   到这里,咱们就应该明白了ThreadPoolExecutor类、AbstractExecutorService类、ExecutorService接口和Executor接口几个之间的关系了。

  Executor是最顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思能够理解,就是用来执行传进去的任务的;

  ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

  AbstractExecutorService抽象类实现了ExecutorService接口,基本实现了ExecutorService接口中声明的全部方法;

  ThreadPoolExecutor类继承了抽象类AbstractExecutorService

  在ThreadPoolExecutor类中有几个很是重要的方法:   

    public void execute(Runnable command) {...........}
    public void shutdown() {........}
    public List<Runnable> shutdownNow() {..............}

  继承AbstractExecutorService抽象类中的方法

public Future<?> submit(Runnable task) {
  if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

  execute()方法其实是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,经过这个方法能够向线程池提交一个任务,交由线程池去执行。

  submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并无对其进行重写,这个方法也是用来向线程池提交任务的,可是它和execute()方法不一样,它可以返回任务执行的结果,去看submit()方法的实现,会发现它实际上仍是调用的execute()方法,只不过它利用了Future来获取任务执行结果。

  shutdown()和shutdownNow()是用来关闭线程池的。

  还有不少其余的方法:

  好比:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法,有兴趣的朋友能够自行查阅API。

二.剖析线程池实现原理

  以上内容,咱们从宏观上介绍了ThreadPoolExecutor,下面咱们来深刻分析一下线程池的具体实现原理,将从下面几个方面讲解:

  1.线程池状态     2.任务的执行     3.线程池中的线程初始化  4.任务缓存队列及排队策略  

  5.任务拒绝策略  6.线程池的关闭  7.线程池容量的动态调整

1.线程池状态

  在ThreadPoolExecutor中定义了一个volatile变量,另外定义了几个static final变量表示线程池的各个状态:

// runState is stored in the high-order bits
volatile
int runState;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

  runState表示当前线程池的状态,它是一个volatile变量用来保证线程之间的可见性;

  下面的几个static final变量表示runState可能的几个取值。

  当建立线程池后,初始时,线程池处于RUNNING状态;

  若是调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不可以接受新的任务,它会等待全部任务执行完毕;

  若是调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,而且会去尝试终止正在执行的任务;

  当线程池处于SHUTDOWN或STOP状态,而且全部工做线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。

2.任务的执行

  在了解将任务提交给线程池到任务执行完毕整个过程以前,咱们先来看一下ThreadPoolExecutor类中其余的一些比较重要成员变量:

private final BlockingQueue<Runnable> workQueue;              //任务缓存队列,用来存放等待执行的任务
private final ReentrantLock mainLock = new ReentrantLock();   //线程池的主要状态锁,对线程池状态(好比线程池大小
                                                              //、runState等)的改变都要使用这个锁
private final HashSet<Worker> workers = new HashSet<Worker>();  //用来存放工做集
 
private volatile long  keepAliveTime;    //线程存货时间   
private volatile boolean allowCoreThreadTimeOut;   //是否容许为核心线程设置存活时间
private volatile int   corePoolSize;     //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
private volatile int   maximumPoolSize;   //线程池最大能容忍的线程数
 
private volatile int   poolSize;       //线程池中当前的线程数
 
private volatile RejectedExecutionHandler handler; //任务拒绝策略
 
private volatile ThreadFactory threadFactory;   //线程工厂,用来建立线程
 
private int largestPoolSize;   //用来记录线程池中曾经出现过的最大线程数
 
private long completedTaskCount;   //用来记录已经执行完毕的任务个数

   每一个变量都有其各自的做用,这里要重点解释一下corePoolSize、maximumPoolSize、largestPoolSize三个变量。

  corePoolSize在不少地方被翻译成核心池大小,其实个人理解这个就是线程池的大小。举个简单的例子:

  假若有一个工厂,工厂里面有10个工人,每一个工人同时只能作一件任务。

  所以只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人作;

  当10个工人都有任务在作时,若是还来了任务,就把任务进行排队等待;

  若是说新任务数目增加的速度远远大于工人作任务的速度,那么此时工厂主管可能会想补救措施,好比从新招4个临时工人进来;

  而后就将任务也分配给这4个临时工人作;

  若是说着14个工人作任务的速度仍是不够,此时工厂主管可能就要考虑再也不接收新的任务或者抛弃前面的一些任务了。

  当这14个工人当中有人空闲时,而新任务增加的速度又比较缓慢,工厂主管可能就考虑辞掉4个临时工了,只保持原来的10个工人,毕竟请额外的工人是要花钱的。

 

  这个例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。

  也就是说corePoolSize就是线程池大小,maximumPoolSize在我看来是线程池的一种补救措施,即任务量忽然过大时的一种补救措施。

  不过为了方便理解,在本文后面仍是将corePoolSize翻译成核心池大小。

  largestPoolSize只是一个用来起记录做用的变量,用来记录线程池中曾经有过的最大线程数目,跟线程池的容量没有任何关系

 

  下面咱们看一下任务从提交到最终执行完毕经历了哪些过程。

  JDK1.5  execute()方法实现原理

  在ThreadPoolExecutor类中,最核心的任务提交方法是execute()方法,虽然经过submit也能够提交任务,可是实际上submit方法里面最终调用的仍是execute()方法,因此咱们只须要研究execute()方法的实现原理便可:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}

   上面的代码可能看起来不是那么容易理解,下面咱们一句一句解释:

  首先,判断提交的任务command是否为null,如果null,则抛出空指针异常;

  接着是这句,这句要好好理解一下:

if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))

   因为是或条件运算符,因此先计算前半部分的值,若是线程池中当前线程数不小于核心池大小,那么就会直接进入下面的if语句块了。

  若是线程池中当前线程数小于核心池大小,则接着执行后半部分,也就是执行

addIfUnderCorePoolSize(command)

  若是执行完addIfUnderCorePoolSize这个方法返回false,则继续执行下面的if语句块,不然整个方法就直接执行完毕了。

  若是执行完addIfUnderCorePoolSize这个方法返回false,而后接着判断:

if (runState == RUNNING && workQueue.offer(command))

   若是当前线程池处于RUNNING状态,则将任务放入任务缓存队列;若是当前线程池不处于RUNNING状态或者任务放入缓存队列失败,则执行:

addIfUnderMaximumPoolSize(command)

  若是执行addIfUnderMaximumPoolSize方法失败,则执行reject()方法进行任务拒绝处理。

  回到前面:

if (runState == RUNNING && workQueue.offer(command))

   这句的执行,若是说当前线程池处于RUNNING状态且将任务放入任务缓存队列成功,则继续进行判断:

if (runState != RUNNING || poolSize == 0)

   这句判断是为了防止在将此任务添加进任务缓存队列的同时其余线程忽然调用shutdown或者shutdownNow方法关闭了线程池的一种应急措施。若是是这样就执行:

ensureQueuedTaskHandled(command)

   进行应急处理,从名字能够看出是保证 添加到任务缓存队列中的任务获得处理。

咱们接着看2个关键方法的实现:addIfUnderCorePoolSize和addIfUnderMaximumPoolSize:

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (poolSize < corePoolSize && runState == RUNNING)
            t = addThread(firstTask);        //建立线程去执行firstTask任务   
        } finally {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    t.start();
    return true;
}

  这个是addIfUnderCorePoolSize方法的具体实现,从名字能够看出它的意图就是当低于核心池大小时执行的方法。下面看其具体实现,首先获取到锁,由于这地方涉及到线程池状态的变化,先经过if语句判断当前线程池中的线程数目是否小于核心池大小,有朋友也许会有疑问:前面在execute()方法中不是已经判断过了吗,只有线程池当前线程数目小于核心池大小才会执行addIfUnderCorePoolSize方法的,为什么这地方还要继续判断?缘由很简单,前面的判断过程当中并无加锁,所以可能在execute方法判断的时候poolSize小于corePoolSize,而判断完以后,在其余线程中又向线程池提交了任务,就可能致使poolSize不小于corePoolSize了,因此须要在这个地方继续判断。而后接着判断线程池的状态是否为RUNNING,缘由也很简单,由于有可能在其余线程中调用了shutdown或者shutdownNow方法。而后就是执行

t = addThread(firstTask);

   这个方法也很是关键,传进去的参数为提交的任务,返回值为Thread类型。而后接着在下面判断t是否为空,为空则代表建立线程失败(即poolSize>=corePoolSize或者runState不等于RUNNING),不然调用t.start()方法启动线程。

  咱们来看一下addThread方法的实现:

private Thread addThread(Runnable firstTask) {
    Worker w = new Worker(firstTask);
    Thread t = threadFactory.newThread(w);  //建立一个线程,执行任务   
    if (t != null) {
        w.thread = t;            //将建立的线程的引用赋值为w的成员变量       
        workers.add(w);
        int nt = ++poolSize;     //当前线程数加1       
        if (nt > largestPoolSize)
            largestPoolSize = nt;
    }
    return t;
}

   在addThread方法中,首先用提交的任务建立了一个Worker对象,而后调用线程工厂threadFactory建立了一个新的线程t,而后将线程t的引用赋值给了Worker对象的成员变量thread,接着经过workers.add(w)将Worker对象添加到工做集当中。

 

JDK1.7版本execute()方法实现略有不一样

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

  下面咱们一句一句理解:

  首先,判断提交的任务command是否为null,如果null,则抛出空指针异常;

  接着是这句,这句要好好理解一下:

if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
  }

   若是线程池中当前线程数<核心池大小,就么就不进入执行如下代码,若是小于核心池大小,线程正在运行,尝试将给定的命令做为首要任务启动一个新线程。通知addworker自动检查runstate和workercount,因此添加线程时,防止误报,它不该该,经过返回false。

官方这么解释

/*If fewer than corePoolSize threads are running, try to start a new thread with the given command as its first task.  The call to addWorker atomic  ally checks runState and workerCount, and so prevents false alarms that would add threads when it shouldn't, by returning false.
*/

  

if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
    }

若是线程池中当前线程数>=核心池大小,就么就不进入执行以上代码,官方解释:

/* If a task can be successfully queued, then we still need
  * to double-check whether we should have added a thread
  * (because existing ones died since last checking) or that
  * the pool shut down since entry into this method. So we
  * recheck state and if necessary roll back the enqueuing if
  * stopped, or start a new thread if there are none.
  */

 

else if (!addWorker(command, false))
            reject(command);

官方解释

/*If we cannot queue task, then we try to add a new
    * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task
    */

 

下面咱们看一下Worker类的源码,在1.5JDK中Worker类只实现了Runnable接口:

public class ThreadPoolExecutor extends AbstractExecutorService {
.........省略部分代码.......
private final class Worker implements Runnable { private final ReentrantLock runLock = new ReentrantLock(); private Runnable firstTask; volatile long completedTasks; Thread thread; Worker(Runnable firstTask) { this.firstTask = firstTask; } boolean isActive() { return runLock.isLocked(); } void interruptIfIdle() { final ReentrantLock runLock = this.runLock; if (runLock.tryLock()) { try { if (thread != Thread.currentThread()) thread.interrupt(); } finally { runLock.unlock(); } } } void interruptNow() { thread.interrupt(); } private void runTask(Runnable task) { final ReentrantLock runLock = this.runLock; runLock.lock(); try { if (runState < STOP && Thread.interrupted() && runState >= STOP) boolean ran = false; beforeExecute(thread, task); //beforeExecute方法是ThreadPoolExecutor类的一个方法,没有具体实现,用户能够根据 //本身须要重载这个方法和后面的afterExecute方法来进行一些统计信息,好比某个任务的执行时间等 try { task.run(); ran = true; afterExecute(task, null); ++completedTasks; } catch (RuntimeException ex) { if (!ran) afterExecute(task, ex); throw ex; } } finally { runLock.unlock(); } } public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); //当任务队列中没有任务时,进行清理工做 } } }
..........省略部分代码........ }

 

   它实际上实现了Runnable接口,所以上面的Thread t = threadFactory.newThread(w);效果跟下面这句的效果基本同样:

Thread t = new Thread(w);

   至关于传进去了一个Runnable任务,在线程t中执行这个Runnable。

  既然Worker实现了Runnable接口,那么天然最核心的方法即是run()方法了:

public void run() {
    try {
        Runnable task = firstTask;
        firstTask = null;
        while (task != null || (task = getTask()) != null) {
            runTask(task);
            task = null;
        }
    } finally {
        workerDone(this);
    }
}

   从run方法的实现能够看出,它首先执行的是经过构造器传进来的任务firstTask,在调用runTask()执行完firstTask以后,在while循环里面不断经过getTask()去取新的任务来执行,那么去哪里取呢?天然是从任务缓存队列里面去取,getTask是ThreadPoolExecutor类中的方法,并非Worker类中的方法,下面是getTask方法的实现:

Runnable getTask() {
    for (;;) {
        try {
            int state = runState;
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            if (state == SHUTDOWN)  // Help drain queue
                r = workQueue.poll();
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //若是线程数大于核心池大小或者容许为核心池线程设置空闲时间,
                //则经过poll取任务,若等待必定的时间取不到任务,则返回null
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            else
                r = workQueue.take();
            if (r != null)
                return r;
            if (workerCanExit()) {    //若是没取到任务,即r为null,则判断当前的worker是否能够退出
                if (runState >= SHUTDOWN) // Wake up others
                    interruptIdleWorkers();   //中断处于空闲状态的worker
                return null;
            }
            // Else retry
        } catch (InterruptedException ie) {
            // On interruption, re-check runState
        }
    }
}

   在getTask中,先判断当前线程池状态,若是runState大于SHUTDOWN(即为STOP或者TERMINATED),则直接返回null。

  若是runState为SHUTDOWN或者RUNNING,则从任务缓存队列取任务。

  若是当前线程池的线程数大于核心池大小corePoolSize或者容许为核心池中的线程设置空闲存活时间,则调用poll(time,timeUnit)来取任务,这个方法会等待必定的时间,若是取不到任务就返回null。

  而后判断取到的任务r是否为null,为null则经过调用workerCanExit()方法来判断当前worker是否能够退出,咱们看一下workerCanExit()的实现:

private boolean workerCanExit() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    boolean canExit;
    //若是runState大于等于STOP,或者任务缓存队列为空了
    //或者  容许为核心池线程设置空闲存活时间而且线程池中的线程数目大于1
    try {
        canExit = runState >= STOP ||
            workQueue.isEmpty() ||
            (allowCoreThreadTimeOut &&
             poolSize > Math.max(1, corePoolSize));
    } finally {
        mainLock.unlock();
    }
    return canExit;
}

   也就是说若是线程池处于STOP状态、或者任务队列已为空或者容许为核心池线程设置空闲存活时间而且线程数大于1时,容许worker退出。若是容许worker退出,则调用interruptIdleWorkers()中断处于空闲状态的worker,咱们看一下interruptIdleWorkers()的实现:

void interruptIdleWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)  //实际上调用的是worker的interruptIfIdle()方法
            w.interruptIfIdle();
    } finally {
        mainLock.unlock();
    }
}

   从实现能够看出,它实际上调用的是worker的interruptIfIdle()方法,在worker的interruptIfIdle()方法中:

void interruptIfIdle() {
    final ReentrantLock runLock = this.runLock;
    if (runLock.tryLock()) {    //注意这里,是调用tryLock()来获取锁的,由于若是当前worker正在执行任务,锁已经被获取了,是没法获取到锁的
                                //若是成功获取了锁,说明当前worker处于空闲状态
        try {
    if (thread != Thread.currentThread())  
    thread.interrupt();
        } finally {
            runLock.unlock();
        }
    }
}

    这里有一个很是巧妙的设计方式,假如咱们来设计线程池,可能会有一个任务分派线程,当发现有线程空闲时,就从任务缓存队列中取一个任务交给空闲线程执行。可是在这里,并无采用这样的方式,由于这样会要额外地对任务分派线程进行管理,无形地会增长难度和复杂度,这里直接让执行完任务的线程去任务缓存队列里面取任务来执行。

   咱们再看addIfUnderMaximumPoolSize方法的实现,这个方法的实现思想和addIfUnderCorePoolSize方法的实现思想很是类似,惟一的区别在于addIfUnderMaximumPoolSize方法是在线程池中的线程数达到了核心池大小而且往任务队列中添加任务失败的状况下执行的:

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (poolSize < maximumPoolSize && runState == RUNNING)
            t = addThread(firstTask);
    } finally {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    t.start();
    return true;
}

   看到没有,其实它和addIfUnderCorePoolSize方法的实现基本如出一辙,只是if语句判断条件中的poolSize < maximumPoolSize不一样而已。

  到这里,大部分朋友应该对任务提交给线程池以后到被执行的整个过程有了一个基本的了解,下面总结一下:

  1)首先,要清楚corePoolSize和maximumPoolSize的含义;

  2)其次,要知道Worker是用来起到什么做用的;

  3)要知道任务提交给线程池以后的处理策略,这里总结一下主要有4点:

  • 若是当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会建立一个线程去执行这个任务;
  • 若是当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(通常来讲是任务缓存队列已满),则会尝试建立新的线程去执行这个任务;
  • 若是当前线程池中的线程数目达到maximumPoolSize,则会采起任务拒绝策略进行处理;
  • 若是线程池中的线程数量大于 corePoolSize时,若是某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;若是容许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。

JDK1.7 Worker做为ThreadPoolExecutor在继承AbstractQueuedSynchronizer类也实现了Runnable接口

 

public class ThreadPoolExecutor extends AbstractExecutorService {
........................
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks;
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); } protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
...................... }

3.线程池中的线程初始化

  默认状况下,建立线程池以后,线程池中是没有线程的,须要提交任务以后才会建立线程。

  在实际中若是须要线程池建立以后当即建立线程,能够经过如下两个方法办到:

  • prestartCoreThread():初始化一个核心线程;
  • prestartAllCoreThreads():初始化全部核心线程

  下面是这2个方法的实现:

public boolean prestartCoreThread() {
    return addIfUnderCorePoolSize(null); //注意传进去的参数是null
}
 
public int prestartAllCoreThreads() {
    int n = 0;
    while (addIfUnderCorePoolSize(null))//注意传进去的参数是null
        ++n;
    return n;
}

   注意上面传进去的参数是null,根据第2小节的分析可知若是传进去的参数为null,则最后执行线程会阻塞在getTask方法中的

r = workQueue.take();

   即等待任务队列中有任务。

4.任务缓存队列及排队策略

  在前面咱们屡次提到了任务缓存队列,即workQueue,它用来存放等待执行的任务。

  workQueue的类型为BlockingQueue<Runnable>,一般能够取下面三种类型:

  • ArrayBlockingQueue:基于数组的先进先出队列,此队列建立时必须指定大小;
  • LinkedBlockingQueue:基于链表的先进先出队列,若是建立时没有指定此队列大小,则默认为Integer.MAX_VALUE;
  • synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。

5.任务拒绝策略

  当线程池的任务缓存队列已满而且线程池中的线程数目达到maximumPoolSize,若是还有任务到来就会采起任务拒绝策略,一般有如下四种策略:

ThreadPoolExecutor.AbortPolicy;//丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy;//也是丢弃任务,可是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy;//丢弃队列最前面的任务,而后从新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy;//由调用线程处理该任务

6.线程池的关闭

  ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:

  • shutdown():不会当即终止线程池,而是要等全部任务缓存队列中的任务都执行完后才终止,但不再会接受新的任务
  • shutdownNow():当即终止线程池,并尝试打断正在执行的任务,而且清空任务缓存队列,返回还没有执行的任务

7.线程池容量的动态调整

  ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

  • setCorePoolSize:设置核心池大小
  • setMaximumPoolSize:设置线程池最大能建立的线程数目大小

  当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能当即建立新的线程来执行任务。

三.使用示例

  前面咱们讨论了关于线程池的实现原理,这一节咱们来看一下它的具体使用:

public class Test {
     public static void main(String[] args) {   
         ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                 new ArrayBlockingQueue<Runnable>(5));
          
         for(int i=0;i<15;i++){
             MyTask myTask = new MyTask(i);
             executor.execute(myTask);
             System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+
             executor.getQueue().size()+",已执行玩别的任务数目:"+executor.getCompletedTaskCount());
         }
         executor.shutdown();
     }
}
 
 
class MyTask implements Runnable {
    private int taskNum;
     
    public MyTask(int num) {
        this.taskNum = num;
    }
     
    @Override
    public void run() {
        System.out.println("正在执行task "+taskNum);
        try {
            Thread.currentThread().sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("task "+taskNum+"执行完毕");
    }
}

  执行结果:

正在执行task 0
线程池中线程数目:1,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
线程池中线程数目:2,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 1
线程池中线程数目:3,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 2
线程池中线程数目:4,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 3
线程池中线程数目:5,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 4
线程池中线程数目:5,队列中等待执行的任务数目:1,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:2,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:3,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:4,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
线程池中线程数目:6,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 10
线程池中线程数目:7,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 11
线程池中线程数目:8,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 12
线程池中线程数目:9,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 14
正在执行task 13
task 0执行完毕
正在执行task 5
task 1执行完毕
正在执行task 6
task 4执行完毕
正在执行task 7
task 2执行完毕
正在执行task 8
task 10执行完毕
正在执行task 9
task 3执行完毕
task 12执行完毕
task 11执行完毕
task 13执行完毕
task 14执行完毕
task 5执行完毕
task 6执行完毕
task 8执行完毕
task 7执行完毕
task 9执行完毕

 

  从执行结果能够看出,当线程池中线程的数目大于5时,便将任务放入任务缓存队列里面,当任务缓存队列满了以后,便建立新的线程。若是上面程序中,将for循环中改为执行20个任务,就会抛出任务拒绝异常了。

  不过在java doc中,并不提倡咱们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来建立线程池:

Executors.newCachedThreadPool();        //建立一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE
Executors.newSingleThreadExecutor();   //建立容量为1的缓冲池
Executors.newFixedThreadPool(int);    //建立固定容量大小的缓冲池

   下面是这三个静态方法的具体实现;

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

  从它们的具体实现来看,它们实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了。

  newFixedThreadPool建立的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;

  newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue;

  newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就建立线程运行,当线程空闲超过60秒,就销毁线程。

  实际中,若是Executors提供的三个静态方法能知足要求,就尽可能使用它提供的三个方法,由于本身去手动配置ThreadPoolExecutor的参数有点麻烦,要根据实际任务的类型和数量来进行配置。

  另外,若是ThreadPoolExecutor达不到要求,能够本身继承ThreadPoolExecutor类进行重写。

四.如何合理配置线程池的大小

  本节来讨论一个比较重要的话题:如何合理配置线程池大小,仅供参考。

  通常须要根据任务的类型来配置线程池大小:

  若是是CPU密集型任务,就须要尽可能压榨CPU,参考值能够设为 NCPU+1

  若是是IO密集型任务,参考值能够设置为2*NCPU

  固然,这只是一个参考值,具体的设置还须要根据实际状况进行调整,好比能够先将线程池大小设置为参考值,再观察任务运行状况和系统负载、资源利用率来进行适当调整。

五.线程池的使用举例

代码实现中并无实现任务接口,而是把Runnable对象加入到线程池管理器(ThreadPool),而后剩下的事情就由线程池管理器(ThreadPool)来完成了

package mine.util.thread;  
  
import java.util.LinkedList;  
import java.util.List;  
  
/** 
 * 线程池类,线程管理器:建立线程,执行任务,销毁线程,获取线程基本信息 
 */  
public final class ThreadPool {  
    // 线程池中默认线程的个数为5  
    private static int worker_num = 5;  
    // 工做线程  
    private WorkThread[] workThrads;  
    // 未处理的任务  
    private static volatile int finished_task = 0;  
    // 任务队列,做为一个缓冲,List线程不安全  
    private List<Runnable> taskQueue = new LinkedList<Runnable>();  
    private static ThreadPool threadPool;  
  
    // 建立具备默认线程个数的线程池  
    private ThreadPool() {  
        this(5);  
    }  
  
    // 建立线程池,worker_num为线程池中工做线程的个数  
    private ThreadPool(int worker_num) {  
        ThreadPool.worker_num = worker_num;  
        workThrads = new WorkThread[worker_num];  
        for (int i = 0; i < worker_num; i++) {  
            workThrads[i] = new WorkThread();  
            workThrads[i].start();// 开启线程池中的线程  
        }  
    }  
  
    // 单态模式,得到一个默认线程个数的线程池  
    public static ThreadPool getThreadPool() {  
        return getThreadPool(ThreadPool.worker_num);  
    }  
  
    // 单态模式,得到一个指定线程个数的线程池,worker_num(>0)为线程池中工做线程的个数  
    // worker_num<=0建立默认的工做线程个数  
    public static ThreadPool getThreadPool(int worker_num1) {  
        if (worker_num1 <= 0)  
            worker_num1 = ThreadPool.worker_num;  
        if (threadPool == null)  
            threadPool = new ThreadPool(worker_num1);  
        return threadPool;  
    }  
  
    // 执行任务,其实只是把任务加入任务队列,何时执行有线程池管理器觉定  
    public void execute(Runnable task) {  
        synchronized (taskQueue) {  
            taskQueue.add(task);  
            taskQueue.notify();  
        }  
    }  
  
    // 批量执行任务,其实只是把任务加入任务队列,何时执行有线程池管理器觉定  
    public void execute(Runnable[] task) {  
        synchronized (taskQueue) {  
            for (Runnable t : task)  
                taskQueue.add(t);  
            taskQueue.notify();  
        }  
    }  
  
    // 批量执行任务,其实只是把任务加入任务队列,何时执行有线程池管理器觉定  
    public void execute(List<Runnable> task) {  
        synchronized (taskQueue) {  
            for (Runnable t : task)  
                taskQueue.add(t);  
            taskQueue.notify();  
        }  
    }  
  
    // 销毁线程池,该方法保证在全部任务都完成的状况下才销毁全部线程,不然等待任务完成才销毁  
    public void destroy() {  
        while (!taskQueue.isEmpty()) {// 若是还有任务没执行完成,就先睡会吧  
            try {  
                Thread.sleep(10);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
        // 工做线程中止工做,且置为null  
        for (int i = 0; i < worker_num; i++) {  
            workThrads[i].stopWorker();  
            workThrads[i] = null;  
        }  
        threadPool=null;  
        taskQueue.clear();// 清空任务队列  
    }  
  
    // 返回工做线程的个数  
    public int getWorkThreadNumber() {  
        return worker_num;  
    }  
  
    // 返回已完成任务的个数,这里的已完成是只出了任务队列的任务个数,可能该任务并无实际执行完成  
    public int getFinishedTasknumber() {  
        return finished_task;  
    }  
  
    // 返回任务队列的长度,即还没处理的任务个数  
    public int getWaitTasknumber() {  
        return taskQueue.size();  
    }  
  
    // 覆盖toString方法,返回线程池信息:工做线程个数和已完成任务个数  
    @Override  
    public String toString() {  
        return "WorkThread number:" + worker_num + "  finished task number:"  
                + finished_task + "  wait task number:" + getWaitTasknumber();  
    }  
  
    /** 
     * 内部类,工做线程 
     */  
    private class WorkThread extends Thread {  
        // 该工做线程是否有效,用于结束该工做线程  
        private boolean isRunning = true;  
  
        /* 
         * 关键所在啊,若是任务队列不空,则取出任务执行,若任务队列空,则等待 
         */  
        @Override  
        public void run() {  
            Runnable r = null;  
            while (isRunning) {// 注意,若线程无效则天然结束run方法,该线程就没用了  
                synchronized (taskQueue) {  
                    while (isRunning && taskQueue.isEmpty()) {// 队列为空  
                        try {  
                            taskQueue.wait(20);  
                        } catch (InterruptedException e) {  
                            e.printStackTrace();  
                        }  
                    }  
                    if (!taskQueue.isEmpty())  
                        r = taskQueue.remove(0);// 取出任务  
                }  
                if (r != null) {  
                    r.run();// 执行任务  
                }  
                finished_task++;  
                r = null;  
            }  
        }  
  
        // 中止工做,让该线程天然执行完run方法,天然结束  
        public void stopWorker() {  
            isRunning = false;  
        }  
    }  
}  

测试代码:

package mine.util.thread;  
  
//测试线程池  
public class TestThreadPool {  
    public static void main(String[] args) {  
        // 建立3个线程的线程池  
        ThreadPool t = ThreadPool.getThreadPool(3);  
        t.execute(new Runnable[] { new Task(), new Task(), new Task() });  
        t.execute(new Runnable[] { new Task(), new Task(), new Task() });  
        System.out.println(t);  
        t.destroy();// 全部线程都执行完成才destory  
        System.out.println(t);  
    }  
  
    // 任务类  
    static class Task implements Runnable {  
        private static volatile int i = 1;  
  
        @Override  
        public void run() {// 执行任务  
            System.out.println("任务 " + (i++) + " 完成");  
        }  
    }  
}  

运行结果

WorkThread number:3  finished task number:0  wait task number:6
任务 1 完成
任务 2 完成
任务 3 完成
任务 4 完成
任务 5 完成
任务 6 完成
WorkThread number:3  finished task number:6  wait task number:0

分析:因为并无任务接口,传入的能够是自定义的任何任务,因此线程池并不能准确的判断该任务是否真正的已经完成(真正完成该任务是这个任务的run方法执行完毕),只能知道该任务已经出了任务队列,正在执行或者已经完成。

JAVA类库中提供的线程池简介:

     java提供的线程池更增强大,相信理解线程池的工做原理,看类库中的线程池就不会感到陌生了。

executors类源码

   public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }
.......还有一些内部类.............

executorService接口继承了executor接口

excutors接口定义了返回executorService对象的静态方法

此文章感谢做者--海子:http://www.cnblogs.com/dolphin0520/p/3932921.html

        --Hsuxu:http://blog.csdn.net/hsuxu/article/details/8985931

相关文章
相关标签/搜索