简单聊一聊ThreadPoolExecutor

本文首发于www.eumji025.comjava

简介

线程池的诞生于JDK1.5,主要的目的是解决咱们在使用线程的时候一般都是重复的建立和销毁,为了让线程可以获得复用,避免咱们重复的建立和销毁,提升咱们的效率,下降内存的开销。没错又是Doug Lea大神又搞出了线程池这一强力工具。数据库

咱们最熟悉的线程池使用案例应该就是数据库链接池,以及咱们任务调度都是会使用线程池的。设计模式

Executors用来建立和管理咱们具体的ThreadPoolExecutor,这里使用了典型的设计模式 - 工厂模式。ThreadPoolExecutor是真正线程池,继承了AbstractExecutorService类,Java集合类和并发类都大量的使用了抽象类实现部分通用的功能。此处的AbstractExecutorService就实现了ExecutorService部分接口功能。最关键的execute方法交给子类去实现。和集合类的套路基本上是如出一辙。并发

看一下Executors的具体实现。ide

public static ExecutorService newFixedThreadPool(int var0) {
    return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory var0) {
    return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var0));
}
public static ScheduledExecutorService newScheduledThreadPool(int var0) {
    return new ScheduledThreadPoolExecutor(var0);
}
复制代码

随便列举了其中的几个例子,这里具体描述一下构造函数的几个参数做用。函数

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {
复制代码

corePoolSize => 指代默认建立的线程数。工具

maximumPoolSize => 建立线程的最大数量。测试

keepAliveTime => 线程存活时间this

unit => 存活的时间,应该都很熟悉,包含日,时,分,秒等spa

workQueue => 存放线程的阻塞队列

threadFactory => 建立线程的工厂,默认为DefaultThreadFactory,主要是重写ThreadFactory接口的newThread的方法。

handler => 拒绝策略,主要是指工做任务超过了workQueue的大小后,该执行哪一种策略进行处理。主要有一下几种:

1.AbortPolicy => 默认的策略,直接抛出异常

2.DiscardPolicy => 放弃被拒绝的任务,其实就是啥也不干

3.DiscardOldestPolicy => 放弃最老的任务,也就是立刻要执行的任务

4.CallerRunsPolicy => 直接执行被放弃的任务,我的不喜欢,赤裸裸的插队(并且根本就没有拒绝)

上面简单的介绍了线程池的各个参数,如今就看一下到底能够生成哪些线程池。

fixedThreadPool

fixedThreadPool => 固定大小线程池,一旦建立,数量就不会再改变,若是任务超过线程的数量,就会进入等待的队列,使用的LinkedBlockingQueue就能够认为是无界的队列了由于capacity等于Integer.MAX_VALUE

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
复制代码

咱们简单的测试一下就能够发现其中的功能

static class MyThread extends Thread{
    @Override
    public void run() {
      try {
        Thread.sleep(500L);
        System.out.println(Thread.currentThread().getName() +" running !!!");
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
}
static void fixedThreadPoolTest(){
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    for (int i = 0; i < 20; i++) {
        executorService.submit(new MyThread());
    }
}
复制代码

执行这个test方法的时候,会发现只会有5种线程名称被打印。说明没有没有得到线程的任务就等待,并且是复用的。后续的例子都将使用MyThread作测试。

cachedThreadPool

newCachedThreadPool => 大小不固定,为达到最大值时能够动态生成线程,默认使用的是SynchronousQueue队列,是一种同步队列,指只能存放一个元素,添加了必须被消费了才能再添加。

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
复制代码

下面简单的使用一个例子进行说明。

static void cachedThreadPoolTest() throws InterruptedException {
    ExecutorService executorService = Executors.newCachedThreadPool();

    for (int i = 0; i < 5; i++) {
      executorService.submit(new MyThread());
    }
    Thread.sleep(1000L);
    for (int i = 0; i < 20; i++) {
      executorService.submit(new MyThread());
    }
}
复制代码

上面测试的例子将复用前五个线程,并再新建15个线程,结果就不展现了。

singleThreadExecutor

singleThreadExecutor => 大小固定的且只有一个线程的线程池,能够理解为一个元素的fixedThreadPool。

public static ExecutorService newSingleThreadExecutor() {
     return new FinalizableDelegatedExecutorService
       (new ThreadPoolExecutor(1, 1,
                               0L, TimeUnit.MILLISECONDS,
                               new LinkedBlockingQueue<Runnable>()));
 }
复制代码

就不进行测试代码的展现了,由于和fixedThreadPool的道理相同,只不过只有一个线程。

scheduledThreadPool

scheduledThreadPool => 是一种大小不固定的定时任务线程池。使用的DelayedWorkQueue延时队列进行任务记录。DelayedWorkQueue是ScheduledThreadPoolExecutor的内部类

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
复制代码

下面演示一个简单的例子演示如何。须要注意的是延时任务调用的方法会有点不一样。

static void singleThreadScheduledTest(){
    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    for (int i = 0; i < 2; i++) {
      //延迟5秒执行,只执行一次
      executorService.schedule(new MyThread(),1,TimeUnit.SECONDS);
      //延迟5秒 执行5个周期执行
      //executorService.scheduleAtFixedRate(new MyThread(),5,3, TimeUnit.SECONDS);
    }
}
复制代码

上面测试里的1表明延时1秒执行,且只执行一次,若是想周期执行,可调用下面注释的方法scheduleAtFixedRate方法,表示第一次延时5秒执行,后面的是以3秒为一个周期的执行。

须要注意的是:

1.不会自动出现中止,除非发生异常或者手动的取消掉。

2.假如执行的周期比线程的执行时间短,则会以延时的任务的执行时间长度为准。

singleThreadScheduledExecutor

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
      (new ScheduledThreadPoolExecutor(1));
}
复制代码

能够看出来singleThreadScheduledExecutor和ScheduledThreadPoolExecutor仍是有必定的区别的,singleThreadScheduledExecutor是单独的一个实现类,不过本文不作具体分析。

方法解读

上面大概的介绍了线程池中的几种线程池,接下来咱们将介绍一下如何其中究竟是如何实现的。咱们只说一下常规的线程池的执行逻辑。

从上面的代码咱们能够看到,各类线程池的不一样主要体如今线程的数量范围和使用的workQueue不一样。最终都会调用submit方法,首先看一下线程池中几种不一样参数的submit方法。

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
}
public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
}
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
复制代码

submit方法支持多种类型的任务,最终都会包装成RunnableFuture的task。这里体现了一个重要的设计模式 - 适配器模式,下面看一下详细代码

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}
public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}
复制代码

其实目的很简单就是把Runnable最终包装成Callable。

在介绍具体的方法以前,首先咱们看一下线程池中几种状态,由于后续会使用到。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));	//最小的负数
    private static final int COUNT_BITS = Integer.SIZE - 3;
    public static final int SIZE = 32;
	private static final int CAPACITY   = (1 << COUNT_BITS) - 1;//29个1

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS; //11100000000000000000000000000000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;//0
    private static final int STOP       =  1 << COUNT_BITS;//100000000000000000000000000000
    private static final int TIDYING    =  2 << COUNT_BITS;//1000000000000000000000000000000
    private static final int TERMINATED =  3 << COUNT_BITS;//1100000000000000000000000000000

    // c & 29个0 其实就是获取高三位
    private static int runStateOf(int c) { return c & ~CAPACITY; }
    private static int workerCountOf(int c) { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

复制代码

这是一个很是经典的设计,咱们能够看出,咱们的低29位都是用来记录任务的。高3位表示状态

RUNNING => 高3位值是111。 此状态表示能够接受新任务

SHUTDOWN => 高3位值是000。 此状态不能接受新任务,但会继续已有的任务

STOP => 高3位值是001。 此状态的线程不接受也不处理任务,而且会中断处理中的任务

TIDYING => 高3位值是010。 线程池全部的任务都已经终止,而且worker数量为0,即将运行terminated方法

TERMINATED => 高3位值是011。在TIDYING状态上,已经运行了terminated方法,线程池彻底的中止。

上面的这些数字很重要,必定要记住。

接着上面的讲,最终不论是哪一种submit的方法,都会交给execute方法去执行真正的逻辑。

public void execute(Runnable command) {
    if (command == null) throw new NullPointerException();
    int c = ctl.get();
   //若是尚未超过线程池的线程容量,直接分配线程执行
    if (workerCountOf(c) < corePoolSize) {
      if (addWorker(command, true)) return;
      c = ctl.get();
    }
   //不然判断线程池的状态,若是是正在运行状态,加入到workQueue等待执行
    if (isRunning(c) && workQueue.offer(command)) {
      int recheck = ctl.get();
      //再次校验状态,若是线程池不在运行状态,移除任务,并执行拒绝策略。
      if (! isRunning(recheck) && remove(command))
        reject(command);
      //若是没有正在运行的线程,添加一个线程
      else if (workerCountOf(recheck) == 0)
        addWorker(null, false); //直接开启一个线程,由于任务已经在workQueue上了
    }
  //若是workQueue添加失败,尝试直接起一个worker,用于coreSize和MaxSize不等的状况
    else if (!addWorker(command, false))
      reject(command);
}
复制代码

简单的概述一下上述代码所作的事情:

1.若是当前的活动的线程小于设置的线程数,则直接启动新线程执行任务,不然

2.若是线程池是处于运行状态,且线程数为corePoolSize,且workQueue没满,把任务加入到等待队列中,若是执行成功,再次检查线程的运行状态,到第三步,不然到第四步

3.再次校验状态,若是没有处于运行的状态,把添加的任务剔除。

4.线程池若是不处于运行状态,或者workQueue已经满了,workQueue满了,还能够再次尝试执行分配一个线程(用于corePoolSize不等于maximumPoolSize的状况下),若是仍是失败,说明线程池已经到极限了和或者是已经关闭了线程池。

接下来须要看一下第一步中的addWorker方法

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 线程的状态,running状态的没法进入
           //须要注意其中不能添加一个woker的条件。SHUTDOWN状态下且不一样时知足firstTask为null,workQueue为空的条件
            if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))
                return false;
            //思考可以到达这里的条件
            for (;;) {
               //线程的数量
                int wc = workerCountOf(c);
               //超出了最大容量,或者线程超过规定的数量,失败
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
               //数量+1,跳出循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get(); 
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
           //添加一个worker 里面有玄机,后面介绍
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    //再次检查线程池的状态
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // 若是线程已经在运行中,
                            throw new IllegalThreadStateException();
                      //添加一个worker记录
                        workers.add(w);
                        int s = workers.size();
                       //增长最大数量 ,默认为0
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
              // 添加成功则启动任务
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
复制代码

addWorker方法看起来比较长,其实作的事情很是简单。

1.判断线程池的状态,若是不是正常状态(rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))则添加失败,不然进行第二步

2.根据条件判断是否还能在添加线程,能够则workCount加1成功跳出循环,执行worker逻辑,不然重试或者结束。

3.第二步成功,配置Worker,并在此检查线程池的状态,若是没有问题,则设置worker相关信息,并启动线程。

而后咱们在来看一下execute方法中的remove方法,我相信remove方法不只仅是从workQueue移除元素,否则也不会单独写个方法。

public boolean remove(Runnable task) {
    boolean removed = workQueue.remove(task);
    tryTerminate(); // In case SHUTDOWN and now empty
    return removed;
}
复制代码

首先从workQueue移除元素,而后尝试关闭线程池。具体逻辑仍是在tryTerminate方法中。

final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            //若是是运行状态,或者已经中止,或者是存于shutdown状态,可是任务没有处理完,都直接结束,也就证实尝试中止失败
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
           //若是还有工做的线程,把worker的中断状态设为true,ONLY_ONE表示只中断一个
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
			//没有工做的线程了真的药中止了
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
              //设置为TIDYING状态
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                      //留给子类扩展的
                        terminated();
                    } finally {
                       //最终设置为TERMINATED状态
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
        }
    }
复制代码

tryTerminate是任何从workerQueue移除任务都会调用的方法,用来判断当前线程池是否已经没有活着的线程了,若是没有了就关闭线程池。

再次回到execute方法中,reject方法就是调用拒绝策略中的rejectedExecution方法,默认的AbortPolicy就是抛个异常仅此而已。

后面也只是一些相同的方法就再也不多介绍了,最重要的仍是条件判断。

shutdown方法

在看一下线程池的shutdown相关的方法。

主要包含三个方法:

1.shutdown方法 => 将线程池的状态设置为shutdown状态

2.shutdownNow方法 => 直接中止线程池。

3.isShutdown方法 => 判断当前线程池的状态是否不是running状态

下面跟随着源码分别看看这几个方法的详情。

public boolean isShutdown() {
  	return ! isRunning(ctl.get());
}
private static boolean isRunning(int c) {
  	return c < SHUTDOWN;
}
复制代码

isShutdown方法仍是老套路,直接判断是否小于SHUTDOWN状态就能够判断是否为Running状态。

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
      checkShutdownAccess(); //检查权限,
      advanceRunState(SHUTDOWN); //设置为SHUTDOWN状态
      interruptIdleWorkers(); //中断等待任务的线程
      onShutdown(); // 空方法,为ScheduledThreadPoolExecutor留的方法
    } finally {
      mainLock.unlock();
    }
    tryTerminate(); //尝试关闭线程池
}
复制代码

shutdown方法是将线程池的状态设置为SHUTDOWN,而且设置线程的中断状态,注意这里的中断只会中断在等待中的线程(没有上锁的线程)。比较简单里面的详情就不展现出来了。

public List<Runnable> shutdownNow() {
     List<Runnable> tasks;
     final ReentrantLock mainLock = this.mainLock;
     mainLock.lock();
     try {
       checkShutdownAccess();
       advanceRunState(STOP); //设置为STOP状态
       interruptWorkers(); //中断全部运行中的且没有被设置中断标志的线程
       tasks = drainQueue(); //获取等待中的任务列表
     } finally {
       mainLock.unlock();
     }
     tryTerminate();
     return tasks;
 }
复制代码

shutdownNow方法和上面的shutdown方法很类似,只是不一样的是,shutdownNow更完全,直接将线程池的状态设置为STOP,而且会移除有全部的等待中的task,并且这里设置的是全部运行中线程的中断状态。下面看一下drainQueue方法

private List<Runnable> drainQueue() {
     BlockingQueue<Runnable> q = workQueue;
     ArrayList<Runnable> taskList = new ArrayList<Runnable>();
     q.drainTo(taskList);//将workQueue的对象转移到taskList(会清空q里的元素)
     if (!q.isEmpty()) {
       //若是q还有新offer的元素
       for (Runnable r : q.toArray(new Runnable[0])) {
         if (q.remove(r))
           taskList.add(r); //添加到taskList中
       }
     }
     return taskList;
 }
复制代码

主要作的事情就是从workQueue中获取全部的任务放到taskList中,并从workQueue中删除。

补充

前面咱们了解线程是如何执行任务和关闭线程池的方法,可是咱们须要思考这样一个场景,就是当咱们有任务被放在workQueue里的时候,上述的方法并无讲述这样的状况下是如何执行的,这里须要介绍一下其中的逻辑。这时候就能够看一下留在addWorker的玄机了。。。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}
复制代码

能够从构造函数看出,thread对象其实指向的就是当前的worker,因此addWorker方法后面的thread.start就会调用worker.run方法。还有一点值得注意的是Worker继承了AbstractQueuedSynchronizer,下面详细看一下run方法中的实现

public void run() {
  	runWorker(this);
}
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask; //取出worker中的任务
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
      //获取任务,
        while (task != null || (task = getTask()) != null) {
            w.lock();
           //若是线程池中止了,当前 线程没有终端,将当前线程设为中断状态
            if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                wt.interrupt();
            try {
              //空方法,留给子类的
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                   //执行task的逻辑
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
复制代码

从上面的方法能够看出,worker的run方法主要作了几件事。

1.循环获取任务,并执行,发生异常则抛出异常

2.若是没有问题最终关闭worker。

首先看一下getTask方法是如何操做的。

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

      //检查队列是否为空,或者线程池是否处理关闭状态
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
           //递减worker的数量
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // 判断是否已经设置超时
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
       //线程超过了最大数量或者超时,说明不可用了,干掉
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
           //获取任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
           //不然获取任务超时
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
复制代码

上述获取任务的方法仍是比较复杂的,特别是状态的判断,简单的总结一下:

1.若是线程池的状态是STOP或者工做的队列为空,循环去一个一个的减小worker的数量,此处只是减小数量。并无结束里面的worker。

2.若是不知足第一条,开始校验是否设置了超时关闭线程或者说线程数超过了设置的值。这时候判断去判断线程1.是否超过了线程数的最大值或者知足了超时的条件 2.线程数大于1或者已经没有待处理的工做了。知足这些条件就去掉一个worker

3.若是2也没有知足,就尝试获取task,获取到了就返回,不然就设置timeOut为true,说明取task失败了。

上面介绍了如何获取任务和管理worker的getTask方法,下面咱们在看一下任务执行完后的processWorkerExit方法。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
   //若是completedAbruptly为true就减小worker的数量,产生于runWorker发生异常。
    if (completedAbruptly) decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
      //删除这个worker
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    //完成任务了就尝试关闭线程池
    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
           //判断有没有设置超时
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
          //判断还有没有线程能够工做
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
      //执行addworker
        addWorker(null, false);
    }
}
复制代码

上述的方法主要是worker进行退出,主要作的几件事以下

1.判断是否正常的结束,若是不是就要删减worker。

2.记录完成任务的数量并移除worker。

3.尝试关闭线程池,而后判断线程池的状态,若是尚未处于中止的状态,继续判断是否是正常的结束,若是是的话去检查线程池里线程的状态,若是正常就结束,若是不知足最好都添加一个worker。

总结

1.本文首先介绍了线程池的几种类型的线程池,从代码均可以看到其实共用用的同一个构造方法,不一样的只是参数的不用。

2.分析了线程池的几种状态,这里是比较重要的,特别是高三位表示状态,低29位表示线程数。

3.分析了submit和execute方法,经过corePoolSize,maximumPoolSize,workQueue来判断新任务是新启一个线程仍是加入到workQueue中,或是执行拒绝策略。

4.分析了shutdown相关的方法逻辑

5.分析了worker究竟是如何工做的,这里主要是对内部类worker的run及其涉及的方法进行解读。

本篇只是我的看源码的一些观点,若是存在不清晰或者表述错误的观点欢迎你们反馈。

与君共勉!!!

相关文章
相关标签/搜索