深刻分析java线程池的理解

                           深刻分析java线程池的理解

1.概述

Java1.5后引入的Executor框架的最大优势是把任务的提交和执行解耦,只需把Task描述清楚,而后提交便可。至于这个Task是怎么被执行的,被谁执行的,何时执行的,就所有交给线程池管理。java

2.先上案例

MyTask.javaweb

package com.qu.webservice;

public class MyTask implements Runnable{
    private int taskNum;
    
    public MyTask(int taskNum){
    	this.taskNum = taskNum;
    }
	@Override
	public void run() {
		// TODO Auto-generated method stub
		System.out.println("正在执行的任务"+taskNum);
		try {
			Thread.sleep(4000);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		System.out.println("任务"+taskNum+"执行完毕!");
	}

}

main函数:数组

package com.qu.webservice;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestExecutor {

	public static void main(String[] args) {
		// TODO Auto-generated method stub
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
        		5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5));
        for(int i= 0;i<5;i++){
        	MyTask task = new MyTask(i);
        	executor.execute(task);
        	System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+
                    executor.getQueue().size()+",已执行完毕的任务数目:"+executor.getCompletedTaskCount());
        }
        executor.shutdown();
	}

}

当咱们任务的个数为5时,运行后效果以下:缓存

当咱们任务的个数为10时,运行后效果以下:框架

 从执行结果能够看出,当线程池中线程的数目大于5时,便将任务放入任务缓存队列里面,当任务缓存队列满了以后,便建立新的线程。若是上面程序中,将for循环中改为执行15个任务,此时线程池中线程数为10,已经达到了线程池容许的最大线程数,效果图以下:ide

若是上面程序中,将for循环中改为执行16个任务时,此时就会抛出拒绝异常,效果图以下:函数

可能有人会问,为何会出现这样的状况呢?今天我就带着你们从源码理解线程池的工做原理,来搞清楚咱们如今的疑问。工具

3.Executor框架成员

线程池实现框架中包含了一堆实现类,它们之间的关系以下,只有了解了各个类之间的关系,才能方便咱们更好的理解线程池的实现,来张图给你们看看。ui

从图中能够看到Executor、ExecutorService、ScheduledExecutorService定义线程池接口,ThreadPoolExecutor和ScheduledThreadPoolExecutor是线程池的实现,前者是一个普通的线程池,后者一个按期调度的线程池,Executors是辅助工具,用以帮助咱们快速定义线程池。this

3.1首先看一下ThreadPoolExecutor这个类

 java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,所以若是要透彻地了解Java中的线程池,必须先了解这个类。下面咱们来看一下ThreadPoolExecutor类的具体实现源码。

 在ThreadPoolExecutor类中提供了四个构造方法:

public class ThreadPoolExecutor extends AbstractExecutorService {
    .....
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
    ...
}

  从上面的代码能够得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,经过观察每一个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工做。

构造参数的选择对于不一样的场景显得极为重要,下面解释下一下构造器中各个参数的含义:

  • corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有很是大的关系。默认状况下,线程池建立后,线程池中的个数是为0,只有来了任务以后,才会去建立线程来执行任务。若是当线程池中的线程数大于corePoolSize时,在提交到线程池的任务就会被存放到缓存任务队列当中。除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就能够看出,是预建立线程的意思,即在没有任务到来以前就建立corePoolSize个线程或者一个线程。
  • maximumPoolSize:线程池 中最大线程数,这个参数也是一个很是重要的参数,它表示在线程池中容许最多能建立多少个线程。
  • keepAliveTime:表示线程没有执行任务时可以存活的时间,只有当线程池中的线程数大于corePoolSize,keepAliveTime这个参数才会起做用。也就是说当线程池中的线程数大于corePoolSize时,此时有一个空闲的线程空闲时间为keepAliveTime,则这个线程就会被终止。直到线程池中的线程数不超过corePoolSize。可是若是调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起做用,直到线程池中的线程数为0;
  • unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小时
TimeUnit.MINUTES;           //分钟
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //纳秒
  • workQueue:一个阻塞队列,用来存取等待执行的任务,这个参数的选择也极为重要,阻塞队列有以下几种:
ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO原则对元素进行排序。
LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序元素,吞吐量一般要高于ArrayBlockingQuene。
SynchronousQuene:一个不存储元素的阻塞队列,每一个插入操做必须等到另外一个线程调用移除操做,不然插入操做一直处于阻塞状态,吞吐量一般要高于LinkedBlockingQuene。
priorityBlockingQuene:具备优先级的无界阻塞队列。

 ArrayBlockingQueue和PriorityBlockingQueue使用较少,通常使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。

  • threadFactory:建立线程的工厂。能够经过自定义线程工厂给每一个线程设置有意义的名称。如guava提供的ThreadFactoryBuilder。
new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
  • rejectedExecutionHandler:饱和策略,当阻塞队列满了且没有空闲的工做线程,说明线程池处于饱和状态,那么必须采起一种策略处理提交的新任务。这个策略在默认状况下是AbortPolicy,表示没法处理新任务时抛出异常。不过,线程池提供了4种策略:
一、AbortPolicy:直接抛出异常。
二、CallerRunsPolicy:只用调用者所在的线程来运行任务。
三、DiscardOldestPolicy:丢弃阻塞队列中最近的一个任务,并执行当前任务。
四、DiscardPolicy:直接丢弃。
固然,也能够根据应用场景来实现RejectedExecutionHandler接口自定义饱和策略,如记录日志或持久化存储不能处理的任务。

从上面给出的ThreadPoolExecutor类的代码能够知道,ThreadPoolExecutor继承了AbstractExecutorService,咱们来看一下AbstractExecutorService的实现:

3.2AbstractExecutorService

咱们来看一下AbstractExecutorService这个抽象类的实现:

public abstract class AbstractExecutorService implements ExecutorService {
 
     
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
    public Future<?> submit(Runnable task) {};
    public <T> Future<T> submit(Runnable task, T result) { };
    public <T> Future<T> submit(Callable<T> task) { };
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                            boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
    };
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
    };
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
    };
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
    };
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
    };
}

 这是一个抽象类,主要定义了3个方法:invokeAll(), invokeAny(), submit()等方法,并无作其余的。

AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。

3.2ExecutorService

咱们来看一下ExecutorService这个接口的实现:

public interface ExecutorService extends Executor {
 
    void shutdown();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
 
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

这个接口也是定义了一些方法,具体的方法在这儿就不依依介绍了。

而ExecutorService又是继承了Executor接口,咱们看一下Executor接口的实现:

3.3Executor

public interface Executor {
    void execute(Runnable command);
}

 到这里,你们应该明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系了。

总结一下:

Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思能够理解,就是用来执行传进去的任务的;

而后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的全部方法;而后ThreadPoolExecutor继承了类AbstractExecutorService。

在ThreadPoolExecutor类中有几个很是重要的方法:

execute()
submit()
shutdown()
shutdownNow()

execute()方法其实是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,经过这个方法能够向线程池提交一个任务,交由线程池去执行。

submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并无对其进行重写,这个方法也是用来向线程池提交任务的,可是它和execute()方法不一样,它可以返回任务执行的结果,去看submit()方法的实现,会发现它实际上仍是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。

shutdown()和shutdownNow()是用来关闭线程池的。

还有不少其余的方法:好比:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法

4.深刻剖析线程池实现的原理

下面我将从如下几个方法具体的详解线程池实现的原理:

1.线程池状态

 在ThreadPoolExecutor中定义了一个volatile变量,另外定义了几个static final变量表示线程池的各个状态:

volatile int runState;
static final int RUNNING    = 0;
static final int SHUTDOWN   = 1;
static final int STOP       = 2;
static final int TERMINATED = 3;

runState表示当前线程池的状态,它是一个volatile变量用来保证线程之间的可见性;

下面的几个static final变量表示runState可能的几个取值。

  • 当建立线程池后,初始时,线程池处于RUNNING状态;
  • 若是调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不可以接受新的任务,它会等待全部任务执行完毕;
  • 若是调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,而且会去尝试终止正在执行的任务;
  • 当线程池处于SHUTDOWN或STOP状态,而且全部工做线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。

2.任务的执行

在了解将任务提交给线程池到任务执行完毕整个过程以前,咱们先来看一下ThreadPoolExecutor类中其余的一些比较重要成员变量:

private final BlockingQueue<Runnable> workQueue;              //任务缓存队列,用来存放等待执行的任务
private final ReentrantLock mainLock = new ReentrantLock();   //线程池的主要状态锁,对线程池状态(好比线程池大小
                                                              //、runState等)的改变都要使用这个锁
private final HashSet<Worker> workers = new HashSet<Worker>();  //用来存放工做集
 
private volatile long  keepAliveTime;    //线程存货时间   
private volatile boolean allowCoreThreadTimeOut;   //是否容许为核心线程设置存活时间
private volatile int   corePoolSize;     //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
private volatile int   maximumPoolSize;   //线程池最大能容忍的线程数
 
private volatile int   poolSize;       //线程池中当前的线程数
 
private volatile RejectedExecutionHandler handler; //任务拒绝策略
 
private volatile ThreadFactory threadFactory;   //线程工厂,用来建立线程
 
private int largestPoolSize;   //用来记录线程池中曾经出现过的最大线程数
 
private long completedTaskCount;   //用来记录已经执行完毕的任务个数

在ThreadPoolExecutor类中,最核心的任务提交方法是execute()方法,虽然经过submit也能够提交任务,可是实际上submit方法里面最终调用的仍是execute()方法,因此咱们只须要研究execute()方法的实现原理便可:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}

1.判断提交的任务command是否为null,如果null,则抛出空指针异常;接着是这句,这句要好好理解一下:

if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))

因为是或条件运算符,因此先计算前半部分的值,若是线程池中当前线程数不小于核心池大小,那么就会直接进入下面的if语句块了。若是线程池中当前线程数小于核心池大小,则接着执行后半部分,也就是执行

addIfUnderCorePoolSize(command)方法。

2.若是执行完addIfUnderCorePoolSize这个方法返回false,则继续执行下面的if语句块,不然整个方法就直接执行完毕了。若是执行完addIfUnderCorePoolSize这个方法返回false,而后接着判断:

if (runState == RUNNING && workQueue.offer(command))

若是当前线程池处于RUNNING状态,则将任务放入任务缓存队列;若是当前线程池不处于RUNNING状态或者任务放入缓存队列失败,则执行:addIfUnderMaximumPoolSize(command)

3.若是执行addIfUnderMaximumPoolSize方法失败,则执行reject()方法进行任务拒绝处理。

回到前面:if (runState == RUNNING && workQueue.offer(command

这句的执行,若是说当前线程池处于RUNNING状态且将任务放入任务缓存队列成功,则继续进行判断:
if (runState != RUNNING || poolSize == 0)

这句判断是为了防止在将此任务添加进任务缓存队列的同时其余线程忽然调用shutdown或者shutdownNow方法关闭了线程池的一种应急措施。若是是这样就执行:

ensureQueuedTaskHandled(command)进行应急处理,从名字能够看出是保证 添加到任务缓存队列中的任务获得处理。

从上面分析源码能够看出主要是2个方法:addIfUnderCorePoolSize和addIfUnderMaximumPoolSize

咱们先看addIfUnderCorePoolSize()这个方法:

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (poolSize < corePoolSize && runState == RUNNING)
            t = addThread(firstTask);        //建立线程去执行firstTask任务   
        } finally {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    t.start();
    return true;
}

这个是addIfUnderCorePoolSize方法的具体实现,从名字能够看出它的意图就是当低于核心吃大小时执行的方法。下面看其具体实现,首先获取到锁,由于这地方涉及到线程池状态的变化,先经过if语句判断当前线程池中的线程数目是否小于核心池大小,有朋友也许会有疑问:前面在execute()方法中不是已经判断过了吗,只有线程池当前线程数目小于核心池大小才会执行addIfUnderCorePoolSize方法的,为什么这地方还要继续判断?缘由很简单,前面的判断过程当中并无加锁,所以可能在execute方法判断的时候poolSize小于corePoolSize,而判断完以后,在其余线程中又向线程池提交了任务,就可能致使poolSize不小于corePoolSize了,因此须要在这个地方继续判断。而后接着判断线程池的状态是否为RUNNING,缘由也很简单,由于有可能在其余线程中调用了shutdown或者shutdownNow方法。而后就是执行

t = addThread(firstTask);

这个方法也很是关键,传进去的参数为提交的任务,返回值为Thread类型。而后接着在下面判断t是否为空,为空则代表建立线程失败(即poolSize>=corePoolSize或者runState不等于RUNNING),不然调用t.start()方法启动线程。

咱们来看一下addThread方法的实现:

private Thread addThread(Runnable firstTask) {
    Worker w = new Worker(firstTask);
    Thread t = threadFactory.newThread(w);  //建立一个线程,执行任务   
    if (t != null) {
        w.thread = t;            //将建立的线程的引用赋值为w的成员变量       
        workers.add(w);
        int nt = ++poolSize;     //当前线程数加1       
        if (nt > largestPoolSize)
            largestPoolSize = nt;
    }
    return t;
}

在addThread方法中,首先用提交的任务建立了一个Worker对象,而后调用线程工厂threadFactory建立了一个新的线程t,而后将线程t的引用赋值给了Worker对象的成员变量thread,接着经过workers.add(w)将Worker对象添加到工做集当中。

下面咱们看一下Worker类的实现:

private final class Worker implements Runnable {
    private final ReentrantLock runLock = new ReentrantLock();
    private Runnable firstTask;
    volatile long completedTasks;
    Thread thread;
    Worker(Runnable firstTask) {
        this.firstTask = firstTask;
    }
    boolean isActive() {
        return runLock.isLocked();
    }
    void interruptIfIdle() {
        final ReentrantLock runLock = this.runLock;
        if (runLock.tryLock()) {
            try {
        if (thread != Thread.currentThread())
        thread.interrupt();
            } finally {
                runLock.unlock();
            }
        }
    }
    void interruptNow() {
        thread.interrupt();
    }
 
    private void runTask(Runnable task) {
        final ReentrantLock runLock = this.runLock;
        runLock.lock();
        try {
            if (runState < STOP &&
                Thread.interrupted() &&
                runState >= STOP)
            boolean ran = false;
            beforeExecute(thread, task);   //beforeExecute方法是ThreadPoolExecutor类的一个方法,没有具体实现,用户能够根据
            //本身须要重载这个方法和后面的afterExecute方法来进行一些统计信息,好比某个任务的执行时间等           
            try {
                task.run();
                ran = true;
                afterExecute(task, null);
                ++completedTasks;
            } catch (RuntimeException ex) {
                if (!ran)
                    afterExecute(task, ex);
                throw ex;
            }
        } finally {
            runLock.unlock();
        }
    }
 
    public void run() {
        try {
            Runnable task = firstTask;
            firstTask = null;
            while (task != null || (task = getTask()) != null) {
                runTask(task);
                task = null;
            }
        } finally {
            workerDone(this);   //当任务队列中没有任务时,进行清理工做       
        }
    }
}

 它实际上实现了Runnable接口,所以上面的Thread t = threadFactory.newThread(w);效果跟下面这句的效果基本同样:

Thread t = new Thread(w);

至关于传进去了一个Runnable任务,在线程t中执行这个Runnable。

既然Worker实现了Runnable接口,那么天然最核心的方法即是run()方法了:

public void run() {
    try {
        Runnable task = firstTask;
        firstTask = null;
        while (task != null || (task = getTask()) != null) {
            runTask(task);
            task = null;
        }
    } finally {
        workerDone(this);
    }
}

 从run方法的实现能够看出,它首先执行的是经过构造器传进来的任务firstTask,在调用runTask()执行完firstTask以后,在while循环里面不断经过getTask()去取新的任务来执行,那么去哪里取呢?天然是从任务缓存队列里面去取,getTask是ThreadPoolExecutor类中的方法,并非Worker类中的方法,下面是getTask方法的实现:

Runnable getTask() {
    for (;;) {
        try {
            int state = runState;
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            if (state == SHUTDOWN)  // Help drain queue
                r = workQueue.poll();
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //若是线程数大于核心池大小或者容许为核心池线程设置空闲时间,
                //则经过poll取任务,若等待必定的时间取不到任务,则返回null
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            else
                r = workQueue.take();
            if (r != null)
                return r;
            if (workerCanExit()) {    //若是没取到任务,即r为null,则判断当前的worker是否能够退出
                if (runState >= SHUTDOWN) // Wake up others
                    interruptIdleWorkers();   //中断处于空闲状态的worker
                return null;
            }
            // Else retry
        } catch (InterruptedException ie) {
            // On interruption, re-check runState
        }
    }
}

在getTask中,先判断当前线程池状态,若是runState大于SHUTDOWN(即为STOP或者TERMINATED),则直接返回null。

若是runState为SHUTDOWN或者RUNNING,则从任务缓存队列取任务。

若是当前线程池的线程数大于核心池大小corePoolSize或者容许为核心池中的线程设置空闲存活时间,则调用poll(time,timeUnit)来取任务,这个方法会等待必定的时间,若是取不到任务就返回null。

而后判断取到的任务r是否为null,为null则经过调用workerCanExit()方法来判断当前worker是否能够退出,咱们看一下workerCanExit()的实现:

private boolean workerCanExit() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    boolean canExit;
    //若是runState大于等于STOP,或者任务缓存队列为空了
    //或者  容许为核心池线程设置空闲存活时间而且线程池中的线程数目大于1
    try {
        canExit = runState >= STOP ||
            workQueue.isEmpty() ||
            (allowCoreThreadTimeOut &&
             poolSize > Math.max(1, corePoolSize));
    } finally {
        mainLock.unlock();
    }
    return canExit;
}

也就是说若是线程池处于STOP状态、或者任务队列已为空或者容许为核心池线程设置空闲存活时间而且线程数大于1时,容许worker退出。若是容许worker退出,则调用interruptIdleWorkers()中断处于空闲状态的worker,咱们看一下interruptIdleWorkers()的实现:

void interruptIdleWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)  //实际上调用的是worker的interruptIfIdle()方法
            w.interruptIfIdle();
    } finally {
        mainLock.unlock();
    }
}

 从实现能够看出,它实际上调用的是worker的interruptIfIdle()方法,在worker的interruptIfIdle()方法中:

void interruptIfIdle() {
    final ReentrantLock runLock = this.runLock;
    if (runLock.tryLock()) {    //注意这里,是调用tryLock()来获取锁的,由于若是当前worker正在执行任务,锁已经被获取了,是没法获取到锁的
                                //若是成功获取了锁,说明当前worker处于空闲状态
        try {
    if (thread != Thread.currentThread())  
    thread.interrupt();
        } finally {
            runLock.unlock();
        }
    }
}

这里有一个很是巧妙的设计方式,假如咱们来设计线程池,可能会有一个任务分派线程,当发现有线程空闲时,就从任务缓存队列中取一个任务交给空闲线程执行。可是在这里,并无采用这样的方式,由于这样会要额外地对任务分派线程进行管理,无形地会增长难度和复杂度,这里直接让执行完任务的线程去任务缓存队列里面取任务来执行。

咱们再看addIfUnderMaximumPoolSize方法的实现,这个方法的实现思想和addIfUnderCorePoolSize方法的实现思想很是类似,惟一的区别在于addIfUnderMaximumPoolSize方法是在线程池中的线程数达到了核心池大小而且往任务队列中添加任务失败的状况下执行的:

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (poolSize < maximumPoolSize && runState == RUNNING)
            t = addThread(firstTask);
    } finally {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    t.start();
    return true;
}

 看到没有,其实它和addIfUnderCorePoolSize方法的实现基本如出一辙,只是if语句判断条件中的poolSize < maximumPoolSize不一样而已。到这里,大部分朋友应该对任务提交给线程池以后到被执行的整个过程有了一个基本的了解,下面总结一下:

  • 若是当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会建立一个线程去执行这个任务;
  • 若是当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(通常来讲是任务缓存队列已满),则会尝试建立新的线程去执行这个任务;
  • 若是当前线程池中的线程数目达到maximumPoolSize,则会采起任务拒绝策略进行处理;
  • 若是线程池中的线程数量大于 corePoolSize时,若是某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;若是容许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。

3.线程池中的线程初始化

默认状况下,建立线程池以后,线程池中是没有线程的,须要提交任务以后才会建立线程。

  在实际中若是须要线程池建立以后当即建立线程,能够经过如下两个方法办到:

  • prestartCoreThread():初始化一个核心线程;
  • prestartAllCoreThreads():初始化全部核心线程

下面是这2个方法的实现:

public boolean prestartCoreThread() {
    return addIfUnderCorePoolSize(null); //注意传进去的参数是null
}
 
public int prestartAllCoreThreads() {
    int n = 0;
    while (addIfUnderCorePoolSize(null))//注意传进去的参数是null
        ++n;
    return n;
}

注意上面传进去的参数是null,根据第2前面的分析可知若是传进去的参数为null,则最后执行线程会阻塞在getTask方法中的

r = workQueue.take();

即等待任务队列中有任务。

4.任务缓存队列及排队策略

在前面咱们屡次提到了任务缓存队列,即workQueue,它用来存放等待执行的任务。

  workQueue的类型为BlockingQueue<Runnable>,一般能够取下面三种类型:

  1)ArrayBlockingQueue:基于数组的先进先出队列,此队列建立时必须指定大小;

  2)LinkedBlockingQueue:基于链表的先进先出队列,若是建立时没有指定此队列大小,则默认为Integer.MAX_VALUE;

  3)synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。

5.任务拒绝策略

当线程池的任务缓存队列已满而且线程池中的线程数目达到maximumPoolSize,若是还有任务到来就会采起任务拒绝策略,一般有如下四种策略:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,可是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,而后从新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

6.线程池的关闭

ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:

  • shutdown():不会当即终止线程池,而是要等全部任务缓存队列中的任务都执行完后才终止,但不再会接受新的任务
  • shutdownNow():当即终止线程池,并尝试打断正在执行的任务,而且清空任务缓存队列,返回还没有执行的任务

7.线程池容量的动态调整

ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

  • setCorePoolSize:设置核心池大小
  • setMaximumPoolSize:设置线程池最大能建立的线程数目大小

  当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能当即建立新的线程来执行任务。

相关文章
相关标签/搜索