线程池ThreadPoolExecutor、Executors参数详解与源代码分析

欢迎探讨,若有错误敬请指正 html

如需转载,请注明出处 http://www.cnblogs.com/nullzx/ java

1. ThreadPoolExecutor数据成员

Private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING,0));

     ctl主要用于存储线程池的工做状态以及池中正在运行的线程数。显然要在一个整型变量存储两个数据,只能将其一分为二。其中高3bit用于存储线程池的状态,低位的29bit用于存储正在运行的线程数。 缓存

     线程池具备如下五种状态,当建立一个线程池时初始化状态为RUNNING 服务器

RUNNING多线程

容许提交并处理任务并发

SHUTDOWN函数

不容许提交新的任务,可是会处理完已提交的任务性能

STOPui

不容许提交新的任务,也不会处理阻塞队列中未执行的任务,并设置正在执行的线程的中断标志位this

TIDYING

全部任务执行完毕,池中工做的线程数为0,等待执行terminated()勾子方法

TERMINATED

terminated()勾子方法执行完毕

      注意,这里说的是线程池的状态而不是池中线程的状态。

      调用线程池的shutdown方法,将线程池由RUNNING(运行状态)转换为SHUTDOWN状态。

      调用线程池的shutdownNow方法,将线程池由RUNNING或SHUTDOWN状态转换为STOP状态。

      SHUTDOWN状态和STOP状态先会转变为TIDYING状态,最终都会变为TERMINATED

Private static int runStateOf(int c)
Private static int workerCountOf(int c)
Private static int ctlOf(int rs,int wc)

      ThreadPoolExecutor同时提供上述三个方法用于池中的线程查看线程池的状态和计算正在运行的线程数。

Private int largestPoolSize;
Private final BlockingQueue<Runnable>workQueue;
Private volatile long keepAliveTime;
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;

      上述数据成员对线程池的性能也有很大的影响,我会将它们放到构造中讲解。

Privatefinal HashSet<Worker> workers= new HashSet<Worker>();
Privatelong completedTaskCount;
Private volatile boolean allowCoreThreadTimeOut;
private int largestPoolSize;

       completedTaskCount表示线程池已完成的任务数。

      allowCoreThreadTimeeOut表示是否容许核心线程在空闲状态下自行销毁。

      largestPoolSize 表示线程池从建立到如今,池中线程的最大数量

private final HashSet<Worker> workers = new HashSet<Worker>();

     workers是个HashSet容器,它存储的是Worker类的对象,Worker是线程池的内部类,它继承了Runnable接口,不严格的状况下,能够将一个Worker对象当作Thread对象,也就是工做的线程。shutdown和shutdownNow方法中会使用workers完成对全部线程的遍历。

Privatefinal ReentrantLock mainLock =new ReentrantLock();
Privatefinal Condition termination = mainLock.newCondition();

      mainLock主要用于同步访问(或者说改变)线程池的状态以及线程池的各项参数,好比completedTaskCount和workers等。

     在awaitTermination方法中,(mianLock的)termination是用于延时的条件队列。

2. 构造函数

publicThreadPoolExecutor(intcorePoolSize,
		int maximumPoolSize,
		long keepAliveTime,
		TimeUnit unit,
		BlockingQueue<Runnable> workQueue,
		ThreadFactory threadFactory,
		RejectedExecutionHandler handler)

       线程池的构造函数参数多达7个,如今咱们一一来分析它们对线程池的影响。

       corePoolSize:线程池中核心线程数的最大值

       maximumPoolSize:线程池中能拥有最多线程数

       workQueue:用于缓存任务的阻塞队列

       咱们如今经过向线程池添加新的任务来讲明着三者之间的关系。

     (1)若是没有空闲的线程执行该任务且当前运行的线程数少于corePoolSize,则添加新的线程执行该任务。

     (2)若是没有空闲的线程执行该任务且当前的线程数等于corePoolSize同时阻塞队列未满,则将任务入队列,而不添加新的线程

     (3)若是没有空闲的线程执行该任务且阻塞队列已满同时池中的线程数小于maximumPoolSize,则建立新的线程执行任务。

     (4)若是没有空闲的线程执行该任务且阻塞队列已满同时池中的线程数等于maximumPoolSize,则根据构造函数中的handler指定的策略来拒绝新的任务。

       注意,线程池并无标记哪一个线程是核心线程,哪一个是非核心线程,线程池只关心核心线程的数量。

       通俗解释,若是把线程池比做一个单位的话,corePoolSize就表示正式工,线程就能够表示一个员工。当咱们向单位委派一项工做时,若是单位发现正式工还没招满,单位就会招个正式工来完成这项工做。随着咱们向这个单位委派的工做增多,即便正式工所有满了,工做仍是干不完,那么单位只能按照咱们新委派的工做按前后顺序将它们找个地方搁置起来,这个地方就是workQueue,等正式工完成了手上的工做,就到这里来取新的任务。若是不巧,年底了,各个部门都向这个单位委派任务,致使workQueue已经没有空位置放新的任务,因而单位决定招点临时工吧(临时工:又是我!)。临时工也不是想招多少就找多少,上级部门经过这个单位的maximumPoolSize肯定了你这个单位的人数的最大值,换句话说最多招maximumPoolSize–corePoolSize个临时工。固然,在线程池中,谁是正式工,谁是临时工是没有区别,彻底同工同酬。

        keepAliveTime:表示空闲线程的存活时间。

        TimeUnitunit:表示keepAliveTime的单位。

        为了解释keepAliveTime的做用,咱们在上述状况下作一种假设。假设线程池这个单位已经招了些临时工,但新任务没有继续增长,因此随着每一个员工忙完手头的工做,都来workQueue领取新的任务(看看这个单位的员工多自觉啊)。随着各个员工齐心合力,任务愈来愈少,员工数没变,那么就一定有闲着没事干的员工。这样的话领导不乐意啦,可是又不能轻易fire没事干的员工,由于随时可能有新任务来,因而领导想了个办法,设定了keepAliveTime,当空闲的员工在keepAliveTime这段时间尚未找到事情干,就被辞退啦,毕竟地主家也没有余粮啊!固然辞退到corePoolSize个员工时就再也不辞退了,领导也不想当光杆司令啊!

       handler:表示当workQueue已满,且池中的线程数达到maximumPoolSize时,线程池拒绝添加新任务时采起的策略。

为了解释handler的做用,咱们在上述状况下作另外一种假设。假设线程池这个单位招满临时工,但新任务依然继续增长,线程池从上到下,从里到外真心忙的不可开交,阻塞队列也满了,只好拒绝上级委派下来的任务。怎么拒绝是门艺术,handler通常能够采起如下四种取值。

ThreadPoolExecutor.AbortPolicy()

抛出RejectedExecutionException异常

ThreadPoolExecutor.CallerRunsPolicy()

由向线程池提交任务的线程来执行该任务

ThreadPoolExecutor.DiscardOldestPolicy()

抛弃最旧的任务(最早提交而没有获得执行的任务)

ThreadPoolExecutor.DiscardPolicy()

抛弃当前的任务

     workQueue:它决定了缓存任务的排队策略。对于不一样的应用场景咱们可能会采起不一样的排队策略,这就须要不一样类型的阻塞队列,在线程池中经常使用的阻塞队列有如下2种:

    (1)SynchronousQueue<Runnable>:此队列中不缓存任何一个任务。向线程池提交任务时,若是没有空闲线程来运行任务,则入列操做会阻塞。当有线程来获取任务时,出列操做会唤醒执行入列操做的线程。从这个特性来看,SynchronousQueue是一个无界队列,所以当使用SynchronousQueue做为线程池的阻塞队列时,参数maximumPoolSizes没有任何做用。

    (2)LinkedBlockingQueue<Runnable>:顾名思义是用链表实现的队列,能够是有界的,也能够是无界的,但在Executors中默认使用无界的。

      threadFactory:指定建立线程的工厂

     实际上ThreadPoolExecutor类中还有不少重载的构造函数,下面这个构造函数在Executors中常常用到。

public ThreadPoolExecutor(int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue) {
	this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
	Executors.defaultThreadFactory(), defaultHandler);
}

      注意到上述的构造方法使用Executors中的defaultThreadFactory()线程工厂和ThreadPoolExecutor中的defaultHandler抛弃策略。

      使用defaultThreadFactory建立的线程同属于相同的线程组,具备同为Thread.NORM_PRIORITY的优先级,以及名为"pool-XXX-thread-"的线程名(XXX为建立线程时顺序序号),且建立的线程都是非守护进程。

      defaultHandler缺省抛弃策是ThreadPoolExecutor.AbortPolicy()。

      除了在建立线程池时指定上述参数的值外,还可在线程池建立之后经过以下方法进行设置。

Public void allowCoreThreadTimeOut(boolean value)
Public void setKeepAliveTime(long time,TimeUnit unit)
Public void setMaximumPoolSize(int maximumPoolSize)
Public void setCorePoolSize(int corePoolSize)
Public void setThreadFactory(ThreadFactory threadFactory)
Public void setRejectedExecutionHandler(RejectedExecutionHandler handler)

3. 其它有关涉及池中线程数量的相关方法

public void allowCoreThreadTimeOut(boolean value) 
public int prestartAllCoreThreads()

     默认状况下,当池中有空闲线程,且线程的数量大于corePoolSize时,空闲时间超过keepAliveTime的线程会自行销毁,池中仅仅会保留corePoolSize个线程。若是线程池中调用了allowCoreThreadTimeOut这个方法,则空闲时间超过keepAliveTime的线程所有都会自行销毁,而没必要理会corePoolSize这个参数。

     若是池中的线程数量小于corePoolSize时,调用prestartAllCoreThreads方法,则不管是否有待执行的任务,线程池都会建立新的线程,直到池中线程数量达到corePoolSize。

4. Executors中的线程池的工厂方法

     为了防止使用者错误搭配ThreadPoolExecutor构造函数的各个参数以及更加方便简洁的建立ThreadPoolExecutor对象,JavaSE中又定义了Executors类,Eexcutors类提供了建立经常使用配置线程池的方法。如下是Executors经常使用的三个建立线程池的源代码。

      从源码中能够看出,Executors间接的调用了重载的ThreadPoolExecutor构造函数,并帮助用户根据不一样的应用场景,配置不一样的参数。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

      newCachedThreadPool:使用SynchronousQueue做为阻塞队列,队列无界,线程的空闲时限为60秒。这种类型的线程池很是适用IO密集的服务,由于IO请求具备密集、数量巨大、不持续、服务器端CPU等待IO响应时间长的特色。服务器端为了能提升CPU的使用率就应该为每一个IO请求都建立一个线程,以避免CPU由于等待IO响应而空闲。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

      newFixedThreadPool:需指定核心线程数,核心线程数和最大线程数相同,使用LinkedBlockingQueue 做为阻塞队列,队列无界,线程空闲时间0秒。这种类型的线程池能够适用CPU密集的工做,在这种工做中CPU忙于计算而不多空闲,因为CPU能真正并发的执行的线程数是必定的(好比四核八线程),因此对于那些须要CPU进行大量计算的线程,建立的线程数超过CPU可以真正并发执行的线程数就没有太大的意义。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

      newSingleThreadExecutor:池中只有一个线程工做,阻塞队列无界,它能保证按照任务提交的顺序来执行任务。

5. 任务的提交过程

submit方法源码

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

       submit的实现方法位于抽象类AbstractExecutorService中,而此时execute方法还未实现(而是在AbstractExecutorService的继承类ThreadPoolExecutor中实现)。submit有三种重载方法,这里我选取了两个经常使用的进行分析,能够看出不管哪一个submit方法都最终调用了execute方法。

execute方法源码

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

         因为execute方法中屡次调用addWorker,咱们这里就简要介绍一下它,这个方法的主要做用就是建立一个线程来执行Runnnable对象。

addWorker(Runnable firstTask, boolean core)

       第一个参数firstTask不为null,则建立的线程就会先执行firstTask对象,而后去阻塞队列中取任务,否直接到阻塞队列中获取任务来执行。第二个参数,core参数为真,则用corePoolSize做为池中线程数量的最大值;为假,则以maximumPoolSize做为池中线程数量的最大值。

      简要分析一下execute源码,执行一个Runnable对象时,首先经过workerCountOf(c)获取线程池中线程的数量,若是池中的数量小于corePoolSize就调用addWorker添加一个线程来执行这个任务。不然经过workQueue.offer(command)方法入列。若是入列成功还须要在一次判断池中的线程数,由于咱们建立线程池时可能要求核心线程数量为0,因此咱们必须使用addWorker(null, false)来建立一个临时线程去阻塞队列中获取任务来执行。

       isRunning(c) 的做用是判断线程池是否处于运行状态,若是入列后发现线程池已经关闭,则出列。不须要在入列前判断线程池的状态,由于判断一个线程池工做处于RUNNING状态到执行入列操做这段时间,线程池可能被其它线程关闭了,因此提早判断毫无心义。

addWorker源码

private boolean addWorker(Runnable firstTask, boolean core) {
    //这个两个for循环主要是判断可否增长一个线程,
	//外循环来判断线程池的状态
	//内循环主要是个增长线程数的CAS操做
	retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // 若是是由于线程数的改变致使CAS失败,只须要重复内循环
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);//建立线程
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();//启动线程
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

6. 线程的执行过程

runWorker源码

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

       Thread的run方法实际上调用了Worker类的runWorker方法,而Worker类继承了AQS类,并实现了lock、unlock、trylock方法。可是这些方法不是真正意义上的锁,因此在代码中加锁操做和解锁操做没有成对出现。runWorker方法中获取到任务就“加锁”,完成任务后就“解锁”。也就是说在“加锁”到“解锁”的这段时间内,线程处于忙碌状态,而其它时间段,处于空闲状态。线程池就能够经过trylock方法来肯定这个线程是否空闲。

       getTask方法的主要做用是从阻塞队列中获取任务。

       beforeExecute(wt, task)和afterExecute(task, thrown)是个钩子函数,若是咱们须要在任务执行以前和任务执行之后进行一些操做,那么咱们能够自定义一个继承ThreadPoolExecutor类,并覆盖这两个方法。

getTask源代码

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

     能够看出若是容许线程在keepAliveTime时间内未获取到任务线程就销毁就调用workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),不然会调用workQueue.take()方法(该方法即便获取不到任务就会一直阻塞下去)。而肯定是否使用workQueue.poll方法只有两个条件决定,一个是当前池中的线程是否大于核心线程数量,第二个是是否容许核心线程销毁,二者其一知足就会调用该方法。

7. 线程池的关闭过程

shutdown源码

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

       advanceRunState(SHUTDOWN)的做用是经过CAS操做将线程池的状态更改成SHUTDOWN状态。

       interruptIdleWorkers是对空闲的线程进行中断,它实际上调用了重载带参数的函数interruptIdleWorkers(false)

       onShutdown也是一个钩子函数

interruptIdleWorkers源码

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

      经过workers容器,遍历池中的线程,对每一个线程进行tryLock()操做,若是成功说明线程空闲,则设置其中断标志位。而线程是否响应中断则由任务的编写者决定。

8. 参考文章

[1] http://www.infoq.com/cn/articles/java-threadPool/

[2] http://my.oschina.net/u/1398304/blog/376827?fromerr=limo9iEj

[3] http://www.cnblogs.com/dolphin0520/p/3932921.html

[4] http://cuisuqiang.iteye.com/blog/2019372

[5] http://blog.sina.com.cn/s/blog_5eeabe8b0100v9i5.html

相关文章
相关标签/搜索