JDK源码分析之concurrent包(二) -- 线程池ThreadPoolExecutor

  上一篇咱们简单描述了Executor框架的结构,本篇正式开始并发包中部分源码的解读。java

  咱们知道,目前主流的商用虚拟机在线程的实现上可能会有所差异。但无论如何实现,在开启和关闭线程时必定会耗费不少CPU资源,甚至在线程的挂起和恢复JDK1.6都作了自旋锁的优化。因此,使用线程池来管理和执行多线程任务会大大提升程序执行效率。关于使用线程池的优势这里不作过多说明,咱们直接进入Java5并发包中ThreadPoolExecutor的实现的源码。安全


 

在解读源码前,咱们先来看看建立线程池的通常作法和线程池的几种类别:多线程

1 Executors.newFixedThreadPool(int nThreads); // 建立一个固定线程数的线程池
2 Executors.newScheduledThreadPool(int nThreads); // 建立一个可对线程进行时间调度的线程池
3 Executors.newCachedThreadPool(); // 建立一个可缓冲的无线程数量界限(Integer.MAX_VALUE)的线程池
4 Executors.newSingleThreadExecutor(); // 建立一个可复用的单一线程的线程池

咱们重点来看一、三、4条,在Executors中如何实现的并发

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

能够看到,差异只是ThreadPoolExecutor的构造方法的参数不一样,下面来看看ThreadPoolExecutor的构造方法的参数(按顺序):框架

  • corePoolSize - 池中所保存的线程数,包括空闲线程。
  • maximumPoolSize - 池中容许的最大线程数。
  • keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
  • unit - keepAliveTime 参数的时间单位。
  • workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。
  • threadFactory - 执行程序建立新线程时使用的工厂。
  • handler - 因为超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。

从参数说明中看出,一、三、4中的线程池主要是“核心线程数”和“最大线程数”的差异,而keepAliveTime和workQueue的差异是由“核心线程数”和“最大线程数”是否相等来决定的。那么“核心线程数”和“最大线程数”分别表明什么?带着这个疑问进入execute方法,源码以下:oop

 1 public void execute(Runnable command) {
 2     if (command == null)
 3         throw new NullPointerException();
 4     if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
 5         if (runState == RUNNING && workQueue.offer(command)) {
 6             if (runState != RUNNING || poolSize == 0)
 7                 ensureQueuedTaskHandled(command);
 8         }
 9         else if (!addIfUnderMaximumPoolSize(command))
10             reject(command); // is shutdown or saturated
11     }
12 }

第4行的代码表达一件事:当线程池中当前线程数小于核心线程数时,执行addIfUnderCorePoolSize(command)方法,而且执行成功后再也不执行后面的逻辑。那咱们就来看看这个addIfUnderCorePoolSize(command)方法作了什么:优化

 1 /**
 2  * Creates and starts a new thread running firstTask as its first
 3  * task, only if fewer than corePoolSize threads are running
 4  * and the pool is not shut down.
 5  * @param firstTask the task the new thread should run first (or
 6  * null if none)
 7  * @return true if successful
 8  */
 9 private boolean addIfUnderCorePoolSize(Runnable firstTask) {
10     Thread t = null;
11     final ReentrantLock mainLock = this.mainLock;
12     mainLock.lock();
13     try {
14         if (poolSize < corePoolSize && runState == RUNNING)
15             t = addThread(firstTask);
16     } finally {
17         mainLock.unlock();
18     }
19     if (t == null)
20         return false;
21     t.start();
22     return true;
23 }

 方法注释的主要意思是:当运行线程少于核心线程时,就建立并运行一个新的线程。代码的第15行建立了一个新的线程,第21行运行了这个线程。接下来看看如何建立的这个线程:this

 1 private Thread addThread(Runnable firstTask) {
 2     Worker w = new Worker(firstTask);
 3     Thread t = threadFactory.newThread(w);
 4     if (t != null) {
 5         w.thread = t;
 6         workers.add(w);
 7         int nt = ++poolSize;
 8         if (nt > largestPoolSize)
 9             largestPoolSize = nt;
10     }
11     return t;
12 }

第二行能够看到,线程池中真正执行的线程是由名为Worker的内部类来执行的,关于Worker的主要结构和方法以下:spa

注:addThread方法的注释中强调了要在持有mainLock的锁时才能调用,mainLock锁在线程池的安全并发的实现中担任着很是重要的角色,而且对于firstTask,有一点不一样的逻辑在,因为篇幅有限,本文这里不作重点解读了线程

 1 private final class Worker implements Runnable {
 2 
 3   // others codes
 4 
 5   /**
 6    * Main run loop
 7    */
 8   public void run() {
 9       try {
10           Runnable task = firstTask;
11           firstTask = null;
12           while (task != null || (task = getTask()) != null) {
13               runTask(task);
14               task = null;
15           }
16       } finally {
17           workerDone(this);
18       }
19   }
20 }

能够看到,Worker实现了Runnable接口,线程池中执行的线程实际上是Worker的run()方法。而第13行的runTask(task)方法的实现是直接调用了提交到线程池中的Runnable任务的run方法(具体代码请自行查看源码,这里再也不列出,其中还包含一些针对shutdown和shutdownNow的逻辑),还有比较重要的是第12行的getTask()方法,最后来看getTask()的源码:

 1 Runnable getTask() {
 2     for (;;) {
 3         try {
 4             int state = runState;
 5             if (state > SHUTDOWN)
 6                 return null;
 7             Runnable r;
 8             if (state == SHUTDOWN)  // Help drain queue
 9                 r = workQueue.poll();
10             else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
11                 r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
12             else
13                 r = workQueue.take();
14             if (r != null)
15                 return r;
16             if (workerCanExit()) {
17                 if (runState >= SHUTDOWN) // Wake up others
18                     interruptIdleWorkers();
19                 return null;
20             }
21             // Else retry
22         } catch (InterruptedException ie) {
23             // On interruption, re-check runState
24         }
25     }
26 }

以上代码第13行将线程池保持线程不关闭的实现已经展现出来了:由一个死循环不断的从队列中取出提交到线程池中的Runnable任务,而后直接调用其run()方法便可

基于这个原理,咱们就会很容易的看懂其它的一些特性。

让咱们先回头看看关于“核心线程”的源码,回到最开始的execute()的源码:

 1 public void execute(Runnable command) {
 2     if (command == null)
 3         throw new NullPointerException();
 4     if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
 5         if (runState == RUNNING && workQueue.offer(command)) {
 6             if (runState != RUNNING || poolSize == 0)
 7                 ensureQueuedTaskHandled(command);
 8         }
 9         else if (!addIfUnderMaximumPoolSize(command))
10             reject(command); // is shutdown or saturated
11     }
12 }

前面咱们说根据第4行,当线程池中当前线程数小于核心线程数时,执行addIfUnderCorePoolSize(command)方法并再也不执行后面的代码。而当当前线程数大于等于核心线程数时,就会直接执行第5行的workQueue.offer(command),将新任务添加到名为workQueue队列中,也就是死循环中不断取Runnable任务的队列。这里这个workQueue是由构造方法传进来的workQueue队列。经过Executors建立线程池的一、三、4条种类能够看出,核心线程=最大线程的线程池,使用最大容量(Integer.MAX_VALUE)的LinkedBlockingQueue队列,就是说,线程池没法扩展,超出的Runnable任务所有进入阻塞队列中,等待Worker执行完。而核心线程<最大线程的线程池,使用无容量的SynchronousQueue队列,就是说,线程池能够无限扩展,扩展的线程所有新建Worker并执行。但根据getTask()方法的第10行第11行,超出核心线程数的Worker,空闲时只会存活keepAliveTime时间(构造方法的参数)。

OK,到这里,经过源码已经解释了ThreadPoolExecutor线程池主要的特性的实现原理。

上面罗里吧嗦的一大堆主要说明了JDK源码中实现的ThreadPoolExecutor线程池的如下几个主要特性(来自JDK API的描述):

核心线程数与最大线程数的意义:

ThreadPoolExecutor将根据corePoolSize和maximumPoolSize设置的边界自动调整池大小。当新任务在方法 execute(java.lang.Runnable)中提交时,若是运行的线程少于corePoolSize,则建立新线程来处理请求,即便其余辅助线程是空闲的。若是运行的线程多于corePoolSize而少于maximumPoolSize,则仅当队列满时才建立新线程。若是设置的corePoolSize和maximumPoolSize相同,则建立了固定大小的线程池。若是将 maximumPoolSize设置为基本的无界值(如Integer.MAX_VALUE),则容许池适应任意数量的并发任务。

保持活动时间:

若是池中当前有多于 corePoolSize 的线程,则这些多出的线程在空闲时间超过 keepAliveTime 时将会终止

排队:

全部 BlockingQueue 均可用于传输和保持提交的任务。可使用此队列与池大小进行交互:

  • 若是运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。
  • 若是运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。
  • 若是没法将请求加入队列,则建立新的线程,除非建立此线程超出 maximumPoolSize,在这种状况下,任务将被拒绝。 

其它的特性,如终止线程池的几种方式及被拒绝的任务由构造方法传入的handler处理等本文并未给出源码解读,感兴趣的读者可自行查看JDK源码。

另外,关于ThreadPoolExecutor的子类ScheduledThreadPoolExecutor,本文不打算详细介绍了。其核心原理是同样的,只是多了“Schedule”的功能。而这个任务调度的功能是经过构造时传入的DelayQueue来实现的,你们若是感兴趣能够看下DelayQueue的介绍:“Delayed元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素”。“延迟期满”的原理是经过lock包中ReadWriteLock锁获取的Condition的awaitNanos(long nanosTimeout)方法来实现的。

总结

本文经过部分关键处源码的解读,介绍了ThreadPoolExecutor线程池的实现原理。我我的简单总结为两点:

  • 线程池中真正执行的线程是由名为Worker的内部类来执行的
  • 执行的方式是由一个死循环不断的从队列中取出提交到线程池中的Runnable任务,而后直接调用其run()方法

这两点只是作归纳,真正展开来描述,仍是有不少细节的。 

相关文章
相关标签/搜索