首先是其继承关系以下:java
经过观察上面四种线程池的源码:
缓存
如:newFixedThreadPoolide
- public static ExecutorService newFixedThreadPool(int nThreads) {
- return new ThreadPoolExecutor(nThreads, nThreads,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>());
- }
如:newCachedThreadPool
- public static ExecutorService newCachedThreadPool() {
- return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>());
- }
如:newSingleThreadExecutor
- public static ExecutorService newSingleThreadExecutor() {
- return new FinalizableDelegatedExecutorService
- (new ThreadPoolExecutor(1, 1,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>()));
- }
能够发现,其实它们调用的都是同一个接口ThreadPoolExecutor方法,只不过传入参数不同而已。下面就来看看这个神秘的ThreadPoolExecutor。
首先来看看它的一些基本参数:this
- public class ThreadPoolExecutor extends AbstractExecutorService {
-
-
- volatile int runState;
- static final int RUNNING = 0;
- static final int SHUTDOWN = 1;
- static final int STOP = 2;
- static final int TERMINATED = 3;
-
-
- private final BlockingQueue<Runnable> workQueue;
-
- private final ReentrantLock mainLock = new ReentrantLock();
-
- private final Condition termination = mainLock.newCondition();
-
- private final HashSet<Worker> workers = new HashSet<Worker>();
-
- private volatile long keepAliveTime;
-
- private volatile int corePoolSize;
-
- private volatile int maximumPoolSize;
-
- private volatile int poolSize;
-
- private volatile RejectedExecutionHandler handler;
-
- private volatile ThreadFactory threadFactory;
-
- private int largestPoolSize;
-
- private long completedTaskCount;
-
- ................
- }
初始化线程池大小 有如下四种方法:url
从源码中能够看到其实最终都是调用了如下的方法:spa
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler) {
- if (corePoolSize < 0 ||
- maximumPoolSize <= 0 ||
- maximumPoolSize < corePoolSize ||
- keepAliveTime < 0)
- throw new IllegalArgumentException();
- if (workQueue == null || threadFactory == null || handler == null)
- throw new NullPointerException();
- this.corePoolSize = corePoolSize;
- this.maximumPoolSize = maximumPoolSize;
- this.workQueue = workQueue;
- this.keepAliveTime = unit.toNanos(keepAliveTime);
- this.threadFactory = threadFactory;
- this.handler = handler;
- }
这里很简单,就是设置一下各个参数,并校验参数是否正确,而后抛出对应的异常。
接下来咱们来看看最重要的方法execute,其源码以下:.net
- public void execute(Runnable command) {
- if (command == null)
- throw new NullPointerException();
- if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
- if (runState == RUNNING && workQueue.offer(command)) {
- if (runState != RUNNING || poolSize == 0)
- ensureQueuedTaskHandled(command);
- }
- else if (!addIfUnderMaximumPoolSize(command))
- reject(command);
- }
- }
笔者在上面加了点注释。下面咱们一个一个判断来看
首先判断1线程
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
orm
(1)当poolSize >= corePoolSize 不成立时,代表当前线程数小于核心线程数目,左边返回fasle.接着执行右边判断!addIfUnderCorePoolSize(command)对象
它作了以下操做
- private boolean addIfUnderCorePoolSize(Runnable firstTask) {
- Thread t = null;
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- if (poolSize < corePoolSize && runState == RUNNING)
- t = addThread(firstTask);
- } finally {
- mainLock.unlock();
- }
- return t != null;
- }
发现它又调用addTread
- private Thread addThread(Runnable firstTask) {
- Worker w = new Worker(firstTask);
- Thread t = threadFactory.newThread(w);
- boolean workerStarted = false;
- if (t != null) {
- if (t.isAlive())
- throw new IllegalThreadStateException();
- w.thread = t;
- workers.add(w);
- int nt = ++poolSize;
- if (nt > largestPoolSize)
- largestPoolSize = nt;
- try {
- t.start();
- workerStarted = true;
- }
- finally {
- if (!workerStarted)
- workers.remove(w);
- }
- }
- return t;
- }
其实Work是真实去调用线程方法的地方,它是对Thread类的一个包装,每次Thread类调用其start方法时,就会调用到work的run方法。其代码以下,
- private void runTask(Runnable task) {
- final ReentrantLock runLock = this.runLock;
- runLock.lock();
- try {
- if ((runState >= STOP ||
- (Thread.interrupted() && runState >= STOP)) &&
- hasRun)
- thread.interrupt();
-
- boolean ran = false;
- beforeExecute(thread, task);
- try {
- task.run();
- ran = true;
- afterExecute(task, null);
- ++completedTasks;
- } catch (RuntimeException ex) {
- if (!ran)
- afterExecute(task, ex);
- throw ex;
- }
- } finally {
- runLock.unlock();
- }
- }
-
-
- public void run() {
- try {
- hasRun = true;
- Runnable task = firstTask;
- firstTask = null;
- while (task != null || (task = getTask()) != null) {
- runTask(task);
- task = null;
- }
- } finally {
- workerDone(this);
- }
- }
发现要执行一个线程真的很不容易,若是addIfUnderCorePoolSize返回true,刚代表成功添加一条线程,并调用了其start方法,那么整个调用到此结束。若是返回fasle.那么就进入判断2.
(2)当poolSize >= corePoolSize成立时,整个判断返回true。接着执行判断2
判断2
- if (runState == RUNNING && workQueue.offer(command)) {
若是当前线程池在运行状态,而且将当前线程加入到缓冲队列中。workQueue的offer是一个非阻塞方法。如查缓冲队列满了的话,返回为false.不然返回true;
若是上面两个都 为true,代表线程被成功添加到缓冲队列中,而且当前线程池在运行。进入判断3
判断3
- if (runState != RUNNING || poolSize == 0)
- ensureQueuedTaskHandled(command);
当线程被加入到线程池中,进入判断3.若是这时线程池没有在运行或者运行的线程为为0。那么就调用ensureQueuedTaskHandled,它作的实际上是判断下是否在拒绝这个线程的执行。
- private void ensureQueuedTaskHandled(Runnable command) {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- boolean reject = false;
- Thread t = null;
- try {
- int state = runState;
- if (state != RUNNING && workQueue.remove(command))
- reject = true;
- else if (state < STOP &&
- poolSize < Math.max(corePoolSize, 1) &&
- !workQueue.isEmpty())
- t = addThread(null);
- } finally {
- mainLock.unlock();
- }
- if (reject)
- reject(command);
- }
判断4
- else if (!addIfUnderMaximumPoolSize(command))
- reject(command);
在判断2为false时执行,代表当前线程池没有在运行或者该线程加入缓冲队列中失败,那么就会尝试再启动下该线程,若是仍是失败,那就根据拒绝策略来处理这个线程。其源码以下:
- private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
- Thread t = null;
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- if (poolSize < maximumPoolSize && runState == RUNNING)
- t = addThread(firstTask);
- } finally {
- mainLock.unlock();
- }
- return t != null;
- }
通常调用这个方法是发生在缓冲队列已满了,那么线程池会尝试直接启动该线程。固然,它要保存当前运行的poolSize必定要小于maximumPoolSize。不然,最后。仍是会拒绝这个线程!
以上大概就是整个线程池启动一条线程的总体过程。
总结:
ThreadPoolExecutor中,包含了一个任务缓存队列和若干个执行线程,任务缓存队列是一个大小固定的缓冲区队列,用来缓存待执行的任务,执行线程用来处理待执行的任务。每一个待执行的任务,都必须实现Runnable接口,执行线程调用其run()方法,完成相应任务。
ThreadPoolExecutor对象初始化时,不建立任何执行线程,当有新任务进来时,才会建立执行线程。
构造ThreadPoolExecutor对象时,须要配置该对象的核心线程池大小和最大线程池大小:
当目前执行线程的总数小于核心线程大小时,全部新加入的任务,都在新线程中处理
当目前执行线程的总数大于或等于核心线程时,全部新加入的任务,都放入任务缓存队列中
当目前执行线程的总数大于或等于核心线程,而且缓存队列已满,同时此时线程总数小于线程池的最大大小,那么建立新线程,加入线程池中,协助处理新的任务。
当全部线程都在执行,线程池大小已经达到上限,而且缓存队列已满时,就rejectHandler拒绝新的任务
转自: https://blog.csdn.net/evankaka/article/details/51489322