JDK源码学习2-ThreadPoolExecutor学习,源码一篇就够了

写在开篇

刚开始新学一门框架,直接看源码是比较痛苦的,也咨询过一些前辈,怎样看源码,他们说从“入口”看,那么什么是入口呢?我摸索了好久,我认为入口是如下两个java

  1. 初始化的过程。能够是一个类的构造器,也能够是Bean的initMethod,通常这里会作一些变量的赋值,或者起一些服务。
  2. 调用流程。好比本篇咱们讲线程池,2000行代码,从头看会至关吃力,若是是我,我会从execute(task),也就是从提交一个任务的时候开始看。

    这是我总结的两个“入口”,也是我看源码的主线,有更好的经验能够一块儿交流。下面我将从这两条主线来开始源码的分析。编程

1. 初始化

看一下构造器的内容并发

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
      TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory                       
      threadFactory,RejectedExecutionHandler handler) {
      
    if (corePoolSize < 0 ||maximumPoolSize <= 0 || maximumPoolSize < corePoolSize ||
    keepAliveTime < 0) 
        throw new IllegalArgumentException();
        
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
        
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

线程池的初始化须要7个参数,看过上一篇文章的朋友应该对这些参数很熟悉了,在此就不赘述了。框架

2. 提交任务后,线程池在干什么

2.1 提交任务

在上一篇文章(2.5 入队)中,咱们知道了当提交一个新任务,什么状况下会新建线程,何时塞入工做队列,什么状况下会拒绝任务。请务必提早了解基础概念,再看下文的源码说明:ide

首先,咱们要了解下ctl变量表示什么,代码中会大量用到。ctl变量是一个AtomicInteger,就意味着,ctl是原子性,一个int有32位,前3位表示当前线程池的状态(RUNNING,STOP…),后29位表示当前的线程数(具体实现能够百度,这里不是重点),为何要这样作呢,由于状态和线程数是两个变量,而且这两个变量的关系是息息相关的,若是分开赋值,那么将没法保证原子性。若是要保证原子性,就得上锁,这样会大大下降性能。AtomicInteger类型作到了既保证了原子性,也无需上锁,是否是很方便(以前我也不理解为何,直到看到一篇博文,比较赞同这种说法,若是有不一样意见,能够一块儿讨论哈)性能

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException()
        //先获取ctl
        int c = ctl.get();
        //1.若是池的工做线程数小于core size,就新建线程来处理这个任务
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                 return;
                //若是新建线程失败,就更新ctl
             c = ctl.get();
        }
        //2.若是池的工做线程数大于等于core size,而且线程池的状态是RUNNING,
             //而且能够塞进工做队列,就再从新拉一下ctl,作recheck
        //2.1若是线程池的状态不是RUNNING,就从队列中删除这个任务,再拒绝这个任务
        //2.2若是线程池的状态是RUNNING,线程池又没有线程在运行,就新建一个工做线程来处理队列里的任务
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
             if (! isRunning(recheck) && remove(command))
                 reject(command);
             else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
       //3.若是池的工做线程数大于等于core size,但没法塞入工做队列,
            //那么就新建工做线程来处理这个任务,若是无法新建线程,
            //意味着这个线程数SHUT DOWN,或者线程池饱和了,咱们就拒绝这个任务
        else if (!addWorker(command, false))
            reject(command);
}

2.2 新建线程

新建线程主要在addWorker这个方法中完成,若是传入task,表示新建的线程直接来处理这个新task,若是传入的是null,那么这个新建线程就是来处理工做队列里的任务。新建线程主要分为3步:this

1)不停地check线程状态和线程数,而后将线程数+1(修改ctl)。由于以前的全部操做都没有上锁,直到修改线程数以前,任何线程都有机会修改ctl。

2)新建包装类Worker。在此先认为Worker包装了一个Thread,一个处理任务的劳动力,详细的后面再说。而后将worker加入workers。spa

3)启动worker。线程

private boolean addWorker(Runnable firstTask, boolean core) {
    //第一步:        
    retry:
    for (;;) {
        //再次拉取最新的线程池状态
        int c = ctl.get();
        int rs = runStateOf(c);
        //若是线程池的状态是RUNNING,或者虽然是SHUTDOWN的状态,但firstTask是空,工做队列非空
        //(说明这个方法的调用是为了增长工做线程来处理队列任务),那就继续,不然返回false,也就是addWorker失败
        if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))
            return false;
        for (;;) {
            //获取线程数,根据传入的core来和core size/max size作对比,好比在上一步提交任务的时候,
           //当前线程数 < core size,因而以新建线程来处理新任务的模式传参给addWorker(),
           //如今发现线程数变成 >= core size了,固然是不能继续下去了,以此类推。
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //上一步recheck没问题,那么就将线程数 +1,跳出全部循环   
            if (compareAndIncrementWorkerCount(c))
                break retry;
            //线程数+1失败,说明有其余线程改变了这个线程数,须要再次判断当前线程池的状态和线程数    
            c = ctl.get();  // Re-read ctl
            
           //线程状态若是没变,咱们就只重复当前循环check线程数,若是状态也变了,
            //咱们就要跳到retry标志的位置,也就是最外层的循环里,从新check 线程池的状态和线程数。
            if (runStateOf(c) != rs) 
                continue retry; 
        }
    }
    //第二步: 
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null; 
    try {
        //在此先认为Worker包装了一个Thread,一个处理任务的劳动力,详细的后面再说,
        //这里须要注意的是,firstTask和thread没有关系。
        w = new Worker(firstTask);
        final Thread t = w.thread;
        //若是劳动力建立成功
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //如今咱们拿到了mainLock的锁了,由于接下来,咱们要处理workers,在此以前,
                //须要再次确认线程数与状态,以前都没上锁,万一线程池被停了呢?
                int rs = runStateOf(ctl.get());
                //若是状态是RUNNING或者虽然状态是SHUTDOWN,可是没传入新任务.
                if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {
                    //若是线程已经开始跑了(注意,咱们尚未手动启动线程,执行start方法),就抛出异常
                    if (t.isAlive()) // precheck that t is startable  
                        throw new IllegalThreadStateException();
                    //加入到workers中,workers是一个hashset,里面储存了这些干活劳动力
                    //而后更新largestPoolSize,就释放锁啦
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            //第三步:
            //若是一切顺利,线程数改了,劳动力也添加成功了,就来启动线程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //若是线程启动失败,那就回滚以前所作的一些修改操做,并检查,是否是须要中止这个线程池
        if (! workerStarted)
            addWorkerFailed(w);
    }
    //返回新线程的启动结果
    return workerStarted;
}

2.3 处理任务

在上一步,咱们知道了处理任务的劳动力Worker,这里简单介绍一下Worker,详细地不说,这里咱们只了解在整个流程中用到的部分。调试

首先Worker实现了Runnable接口,继承自AbstractQueuedSynchronizer,其中有两个变量Thread threadRunnable firstTask,首先,咱们来看下初始化:

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

firstTask是咱们在一开始就提交的任务,若是咱们addWorker()传入的是null,那么这个firstTask也是null。thread就是线程自己,ThreadFactory可由线程池初始化的时候传入自定义的工厂,若是没有,线程池内部会有一个默认的工厂,默认状况下,会new Thread()

这里引伸一下,自定义的工厂类能够作什么呢?虽然咱们能够借此修改线程优先级或者守护状态,可是不建议你这么作。经常使用的作法,能够经过自定义工厂传入自定义的Thread,这里咱们能够定制线程的行为,好比,修改线程的名字,设置自定义 UncaughtExceptionHandlerLogger写入信息,维护一些统计信息(包括有多少线程被建立和销毁),以及在线程被建立或终止时把调试消息写入日志。

---------------摘自《Java并发编程实战》

如今咱们来看一下Workerrun()方法,为何会调用run()方法呢?上一步启动线程不是使用thread.start()方法吗?咱们来看一下这段注释(来自thread.run()):

/**
* If this thread was constructed using a separate
* <code>Runnable</code> run object, then that
* <code>Runnable</code> object's <code>run</code> method is called;
* otherwise, this method does nothing and returns.
* <p>
* Subclasses of <code>Thread</code> should override this method.
*
* @see     #start()
* @see     #stop()
* @see     #Thread(ThreadGroup, Runnable, String)
*/
@Override
public void run() {
    if (target != null) {
        target.run();
    }
}

也就是,若是Thread对象使用了Runnable作初始化,那么在调用Thread.run()的时候,会调用这个Runnable对象的run方法,还记得在Worker类的构造器中的 this.thread = getThreadFactory().newThread(this);这句吗?所以,当启动工做线程的时候,咱们实际是运行的Worker.run()

Worker.run()方法内部只有一句“runWorker(this)”,因此咱们直接来看runWorker()方法:

这个方法的主要任务,就是不断从队列中领取任务,运行任务。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //若是这个线程在构造的时候没有传入task,而且也没办法经过getTask()拿到新任务,那么这个线程就被回收了
        //getTask()在这个能够认为是从队列中领取任务,具体实现,下文再说
        while (task != null || (task = getTask()) != null) {
            //在运行任务前,咱们须要上锁,除非线程池STOP了,不然不容许打断这个线程
            w.lock();
            // 若是线程池的状态是STOP了,确认当前线程是否被打断,若是没有被打断,就打断这个线程
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() &&          
                                                      runStateAtLeast(ctl.get(), STOP))) && 
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //beforeExecute方法抛出的异常会致使线程回收
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //run
                    task.run();
                    } catch (RuntimeException x) {
                        thrown = x; 
                        throw x;
                    } catch (Error x) {
                        thrown = x; 
                        throw x;
                    } catch (Throwable x) {
                        thrown = x; 
                        throw new Error(x);
                    } finally {
                        //运行中的任务若是抛出异常,虽然咱们会将异常传入Thrown,可是也会致使线程的回收
                        afterExecute(task, thrown);
                    }
            } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

2.4 线程回收

由上面的代码能够看出,一个工做线程的大体工做流程是,首先看这个线程初始化的时候有没有传入task,没有的话,就经过getTask()方法尝试获取task。若是拿到这个task,就会先上锁,而后执行task.run(),若是没拿到,那么这个线程就会经过processWorkerExit()回收掉。

首先,咱们看下getTask()方法,有的人可能以为无非是从队列里面取任务,的确这个方法的主要任务是这个,可是要记住线程池大部分的状况下是不上锁的,因此咱们要常常检查线程池的状态和线程数,还记得在初始化线程池的时候传入的keep-alive time吗?这里就用到了这个参数。

咱们先看一下注释:

getTask()方法是用于经过阻塞或者等待时间来获取任务,至于选择哪一种模式根据配置而定。getTask()方法在如下4种状况下会返回null(也就是这个工做线程会被回收):

1)若是当前线程数大于max size(可能由于setMaximumPoolSize方法改变了线程池的max size)。

2)线程池停下了(STOP)。

3)线程池SHUTDOWN而且队列空了。

4)线程等待超时(设置了allowCoreThreadTimeOut或者线程数 > core size)

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        //检查是否符合第2,3种状况,是就删除线程数,返回null
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        //检查是否符合第4种状况
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        //检查是否符合第1种状况  或者这个线程以前尝试过获取任务而且符合第四种状况。
        //若是当前线程数> 1或者队列已经没有任务了,就删除线程数,返回null
        if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            //若是设置了allowCoreThreadTimeOut或者当前线程数 > core size,就使用poll方法拿任务
            //不然就用take方法阻塞拿任务
            Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
            if (r != null)
                return r;
            //若是poll方法超时,没拿到任务,就将timedOut设置为true
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

接下来,咱们看下,若是一个Worker被回收以前会接受怎样的处理:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    //若是是因异常而被回收的线程,那就没有经历过线程数的-1的处理,这里补充完成
    if (completedAbruptly)
        decrementWorkerCount();
        //接着从workers中删除这个劳动力,和添加同样,须要上锁。而且记录总的完成任务数
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    
    //尝试终止这个线程池,上文提到几种终止线程池的状况,
    //这个方法就是判断此刻线程池的状态是否须要终止,
    //线程被回收有可能意味着要终止线程,因此此处须要判断,详细的后文再说
    tryTerminate();
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        //若是是线程是正常退出
        if (!completedAbruptly) {
            //获取线程池容许的最小线程数
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
                //若是当前线程数不小于规定的最小值,就让这个线程正常退出
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        //若是这个劳动力不是正常退出,或者即便是正常退出,可是当前线程数小于规定的最小值,
        //就补充一个劳动力去处理队列的任务
        addWorker(null, false);
    }
}

总结一下:

  1. 判断线程池状态,若是达到终止条件就终止这个线程池
  2. 无论这个线程是正常退出仍是非正常退出,若是当前线程数小于设定的最小值,就新建一个Worker,不然就正常回收。

2.5 拒绝任务

上文提到,咱们有四种拒绝任务的策略,如今咱们具体来看一看:

final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

reject()方法内部调用的handler.rejectedExecution()方法根据handler的不一样而具体实现也不相同。

​ 1)AbortPolicy:默认策略,直接抛出异常

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
     throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
 }

​ 2)CallerRunsPolicy:调用execute(),尝试将任务丢给线程池处理的线程自己处理任务

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        r.run();
    }
}

​ 3)DiscardOldestPolicy:将队列头部的旧任务丢掉,而后再次尝试将新任务丢入线程池:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        e.getQueue().poll();
        e.execute(r);
    }
}

​ 4)DiscardPolicy:忽略这个任务,空方法。

2.6 终止线程池

前文提到tryTerminate()方法是用于检测线程池的当前状态是否是达到终止条件,若是符合条件,就终止这个线程池。咱们先看下哪些方法调用了tryTerminate()

图片描述

这些方法都有可能致使线程池的终止,如今,咱们来看下tryTerminate方法的具体实现:

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        //检查线程池的状态,若是线程池RUNING,或者是TIDYING,TERMINATED状态
        //或者是SHUTDOWN状态,可是队列非空,就退出
        if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        //若是符合终止的条件,可是线程池还有线程,因而打断其中一个线程,返回
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        //若是符合终止条件,而且线程池已经没有线程池了
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //先将状态改成TIDYING,执行terminated方法(可被改写,默认为空),再将状态改成TERMINATED
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

不管是shutdown()仍是shutdownNow()方法内部都是使用tryTerminate()来终止线程池,因此在此就再也不赘述。

相关文章
相关标签/搜索