ThreadPoolExecutor源码详解

我以前一篇文章谈到了ThreadPoolExecutor的做用(http://my.oschina.net/xionghui/blog/494004),这篇文章介绍下它的原理,并根据原理分析下它的实现源码。java

咱们先来查看一下ThreadPoolExecutor API,看看它能实现什么功能,而后看看它是怎么实现这些功能的。数据库

ThreadPoolExecutor API

ThreadPoolExecutor API比较长,这里列出几个关键点:安全

  • 核心和最大池大小:若是运行的线程少于 corePoolSize,则建立新线程来处理请求(即一个Runnable实例),即便其它线程是空闲的。若是运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才建立新线程。函数

  • 保持活动时间:若是池中当前有多于 corePoolSize 的线程,则这些多出的线程在空闲时间超过 keepAliveTime 时将会终止。ui

  • 排队:若是运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列BlockingQueue,而不添加新的线程。this

  • 被拒绝的任务:当 Executor 已经关闭,或者队列已满且线程数量达到maximumPoolSize时(即线程池饱和了)请求将被拒绝。spa

好,ThreadPoolExecutor实现的功能确实不少,我们来屡屡ThreadPoolExecutor 的执行过程:.net

  1. 若是运行的线程少于 corePoolSize,ThreadPoolExecutor 会始终首选建立新的线程来处理请求;注意,这时即便有空闲线程也不会重复使用(这和数据库链接池有很大差异)。线程

  2. 若是运行的线程等于或多于 corePoolSize,则 ThreadPoolExecutor 会将请求加入队列BlockingQueue,而不添加新的线程(这和数据库链接池也不同)rest

  3. 若是没法将请求加入队列(好比队列已满),则建立新的线程来处理请求;可是若是建立的线程数超出 maximumPoolSize,在这种状况下,请求将被拒绝。

到这儿你们应该了解了线程池的大概执行过程,下面经过源码来介绍下ThreadPoolExecutor是如何实现这些过程和功能的。在理解源码前我们先来考虑几个问题:

  1. 线程池里的线程如何重复利用?好比一个线程执行完请求,怎么控制不退出。

  2. 线程池空闲时线程池里的线程数量会不会降到0?

  3. 线程池如何保持活动时间?线程能够设置一段时间内闲置就会退出(经过keepAliveTime 设置)。

  4. 线程池的阻塞队列有什么用?

  5. 请求数量太多如何处理过多的请求?

ThreadPoolExecutor源码

首先看下线程池的执行过程:

execute(Runnable command)是ThreadPoolExecutor的核心处理方法,用于处理Runnable 请求。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
}

poolSize为线程池内启动的线程数量,当线程池的poolSize小于核心池corePoolSize时,会去执行addIfUnderCorePoolSize(command),addIfUnderCorePoolSize(Runnable firstTask)会建立一个新线程来处理请求:

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < corePoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;
    }

能够看到,首先加锁(默认是非公平锁)已保证线程安全,而后会进行double check,状态合法则建立新线程。建立新线程处理任务是经过addThread(Runnable firstTask)方法来完成:

 private Thread addThread(Runnable firstTask) {
        Worker w = new Worker(firstTask);
        Thread t = threadFactory.newThread(w);
        if (t != null) {
            w.thread = t;
            workers.add(w);
            int nt = ++poolSize;
            if (nt > largestPoolSize)
                largestPoolSize = nt;
        }
        return t;
    }

能够看到建立线程时使用了内部类Worker封装了请求Runnable,Worker也是一个Runnable,它封装了firstTask请求,做用后面再介绍。

这里先介绍下经过threadFactory建立新线程的过程:threadFactory是能够自定义的(经过ThreadPoolExecutor 的构造函数传入),默认会使用DefaultThreadFactory,再来看看DefaultThreadFactory是如何建立新线程的:

public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }

代码很明朗,建立了一个线程,设置为非守护线程并设置优先级为5。其中group和namePrefix是在DefaultThreadFactory的构造函数中定义的:

group = (s != null)? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";


如今回到addIfUnderCorePoolSize(Runnable firstTask)方法,建立完线程后会直接start,而后就会调用Work的run()方法,这里介绍下Work的做用:

public void run() {
            try {
                Runnable task = firstTask;
                firstTask = null;
                while (task != null || (task = getTask()) != null) {
                    runTask(task);
                    task = null;
                }
            } finally {
                workerDone(this);
            }
        }

其中firstTask就是execute(Runnable command)方法传入的请求,能够看到,若是firstTask不为空,则直接执行,不然会经过getTask()从阻塞队列中获取等待的任务;到这里能够解答第一个问题了:线程池里的线程如何重复利用?一个线程会执行多个请求(即Runnable),当执行完一个请求后会经过getTask()去获取新的请求来执行(是从阻塞队列中获取,后面会介绍)。下面看看getTask()方法

Runnable getTask() {
        for (;;) {
            try {
                int state = runState;
                if (state > SHUTDOWN)
                    return null;
                Runnable r;
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll();
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                else
                    r = workQueue.take();
                if (r != null)
                    return r;
                if (workerCanExit()) {
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();
                    return null;
                }
                // Else retry
            } catch (InterruptedException ie) {
                // On interruption, re-check runState
            }
        }
    }

挑重点介绍:当poolSize小于或等于corePoolSize时,会经过workQueue.take()一直等待,直到workQueue添加新的Runnable,到这里能够解答第二个问题了:线程池空闲时线程池里的线程数量会不会降到0?答案是若是线程池里的线程数量小于或等于核心线程数(corePoolSize)则不会退出任何线程。

然而当poolSize大于corePoolSize时或经过workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)等待keepAliveTime(ns),这里能够解答第三个问题了:线程池如何保持活动时间?答案是经过阻塞队列workQueue控制。

这里须要注意下,当poolSize大于corePoolSize时且在keepAliveTime内没有得到新的请求,则会判断当前线程是否须要退出,经过workerCanExit()来判断:

private boolean workerCanExit() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        boolean canExit;
        try {
            canExit = runState >= STOP ||
                workQueue.isEmpty() ||
                (allowCoreThreadTimeOut &&
                 poolSize > Math.max(1, corePoolSize));
        } finally {
            mainLock.unlock();
        }
        return canExit;
    }

从上面能够看到线程退出的条件为:运行状态大于STOP,或者阻塞队列为空,或者当前线程数大于核心线程数。达到条件则返回false,此时getTask()会返回空,而后Work的run()方法里面的while循环则会退出,线程此时会退出并销毁。注意,退出前会执行workerDone(this)进行一些清理操做:

void workerDone(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
            if (--poolSize == 0)
                tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }


介绍完了Work的处理过程我们再回到execute(Runnable command)方法,前面已经贴出源码了,这里再贴一份:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
}

前面讲到当线程池的poolSize小于核心池corePoolSize时会去建立新线程来执行请求,而后若是poolSize超过了corePoolSize则会直接把请求Runnable添加进阻塞队列workQueue里,这里有两种状况:

1. 若是添加成功,则直接返回。前面介绍过,线程池的线程会执行完本身的请求后会从阻塞队列workQueue中取请求来执行。

2.若是添加失败(好比队列满了),则会经过addIfUnderMaximumPoolSize(command)建立新的线程来处理请求。

到这里能够解答第四个问题了:线程池的阻塞队列有什么用?阻塞队列有两个做用:第一是为了控制线程存活,经过workQueue的take和pull实现;第二是为了存放Runnable对象,以便线程池里空闲的线程处理。

下面继续介绍addIfUnderMaximumPoolSize(command)方法:

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < maximumPoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;
    }

该方法和addIfUnderCorePoolSize(Runnable firstTask)方法相似,大体流程是若是线程池内建立的线程数小于最大线程数maximumPoolSize则建立新线程执行请求,不然返回false。

若是返回false,表示请求数量不能再被处理,此时会调用reject(command)来处理请求:

void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }

能够看到,处理过程很简单,就直接调用handler来处理请求;这里的handler能够自定义(一样是经过构造函数传入),handler默认是使用AbortPolicy:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException();
        }

AbortPolicy处理也是很简单粗暴,直接抛出非受查异常RejectedExecutionException。

到这里也能够解答第五个问题了:请求数量太多如何处理过多的请求?答案是经过handler处理的。

ThreadPoolExecutor 钩子

ThreadPoolExecutor设置确实十分精巧(做者就是大名鼎鼎的Doug Lea),上面介绍了它的一些实现细节;下面再来谈谈它的一些钩子。

默认状况下,线程池的线程只是在新任务到达时才建立和启动的;若是但愿预先启动线程,能够使用方法 prestartCoreThread() 或 prestartAllCoreThreads() 。

prestartCoreThread()会建立并启动一个线程,prestartAllCoreThreads()会启动因此的corePoolSize个线程

public boolean prestartCoreThread() {
        return addIfUnderCorePoolSize(null);
    }
    
    public int prestartAllCoreThreads() {
        int n = 0;
        while (addIfUnderCorePoolSize(null))
            ++n;
        return n;
    }
    
    private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < corePoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;
    }

另外还有两个经常使用的钩子方法:beforeExecute(java.lang.Thread, java.lang.Runnable) 和 afterExecute(java.lang.Runnable, java.lang.Throwable)。

protected void beforeExecute(Thread t, Runnable r) { }

protected void afterExecute(Runnable r, Throwable t) { }

ThreadPoolExecutor内他们的默认实现为空方法。咱们能够扩展它们,它们会在执行请求先后调用:

private void runTask(Runnable task) {
            final ReentrantLock runLock = this.runLock;
            runLock.lock();
            try {
                if (runState < STOP &&
                    Thread.interrupted() &&
                    runState >= STOP)
                    thread.interrupt();
                boolean ran = false;
                beforeExecute(thread, task);
                try {
                    task.run();
                    ran = true;
                    afterExecute(task, null);
                    ++completedTasks;
                } catch (RuntimeException ex) {
                    if (!ran)
                        afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                runLock.unlock();
            }
        }

正确使用ThreadPoolExecutor

咱们一般使用 Executors 工厂方法来配置ThreadPoolExecutor,下面摘自ThreadPoolExecutor API:

相关文章
相关标签/搜索