并发编程之 源码剖析 线程池 实现原理

前言

在上一篇文章中咱们介绍了线程池的使用,那么如今咱们有个疑问:线程池究竟是怎么实现的?毕竟好奇是人类的天性。那咱们今天就来看看吧,扒开 他的源码,一探究竟。java

1. 从 Demo 入手

上图是个最简单的demo,咱们从这个 demo 开始看源码,首先一步一步来看。spring

首先咱们手动建立了线程池,使用了有数量限制的阻塞队列,使用了线程池工厂提供的默认线程工厂,和一个默认的拒绝策略,咱们看看默认的线程工厂是如何建立的?并发

默认的线程工厂从当前线程中获取线程组,设置了默认的线程名字前缀 pool-xxx-thread-xxx,强制设置为非守护线程,强制设置为默认优先级。ui

而后咱们看看ThreadPoolExecutor 的构造方法:this

没有什么特殊的东西,主要是一些判断。spa

好了,那么咱们看看 execute 方法是如何实现的。线程

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // c = -536870911
    int c = ctl.get();
    // 工做线程数量小于核心线程池设定数,则建立线程。
    if (workerCountOf(c) < corePoolSize) {
        // 若是添加成功则直接返回
        if (addWorker(command, true))
            return;
        // 不然再次获取活动线程数量
        c = ctl.get();
    }
    // 若是线程池正在运行,而且添加进队列成功
    if (isRunning(c) && workQueue.offer(command)) {
        // 再次对线程池状态检查, 由于上面 addWorker 过了而且失败了,因此须要检查
        int recheck = ctl.get();
        // 若是状态不是运行状态,且从队列删除该任务成功并尝试中止线程池
        if (! isRunning(recheck) && remove(command))
            // 拒绝任务
            reject(command);
        // 若是当前工做线程数量为0(线程池已关闭),则添加一个 null 到队列中
        else if (workerCountOf(recheck) == 0)
            // 添加个空的任务
            addWorker(null, false);
    }
    // 若是添加队列失败,则建立一个任务线程,若是失败,则拒绝
    else if (!addWorker(command, false))
        // 拒绝
        reject(command);
    }
}
复制代码

首先,空判断。设计

而后判断,若是正在工做的线程小于设置的核心线程,则建立线程并返回,若是正在工做的线程数量大于等于核心线程数量,则试图将任务放入队列,若是失败,则尝试建立一个 maximumPoolSize 的任务。注意,在remove 方法中,该方法已经试图中止线程池的运行。3d

从这段代码中,能够看到,最重要的方法就是 addWorker 和 workQueue.offer(command) 这段代码,一个是建立线程,一个是放入队列。后者就是将任务添加到阻塞队列中。代理

那么咱们就看看 addWorker 方法。

2. addWorker 方法-----建立线程池

private boolean addWorker(Runnable firstTask, boolean core)

该方法很长,楼主说一下这个方法的两个参数,第一个参数为 Runnable 类型,表示线程池中某个线程的第一个任务,第二个参数是若是是 true,则建立 core 核心线程,若是是 false ,则建立 maximumPoolSize 线程。这两个线程的生命周期是不一样的。

楼主截取该方法中最终的代码:

其中,在该方法中,建立一个 Worker 对象,该对象代理了任务对象,咱们看看该类的构造方法:

经过线程工厂建立线程,注意,传递的是 this ,所以,在上面的代码中国,调用了 worker 对象的 thread 属性的 start 方法,实际上就是调用了该类的 run 方法。那么改类的 run 方法是怎么实现的呢?

调用了自身的 runWorker 方法。这个方法很是的重要。

3. Worker.runWorker(Worker w) 方法-------线程池的最核心方法

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted. This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
复制代码

首先说该方法的主要逻辑:

  1. 首先执行 firstTask 的 run 方法。
  2. 而后循环获取阻塞队列中的任务,并调用他们的 run 方法。
  3. 若是线程池中的任务异常,就抛出异常并中止运行线程池。

这个方法能够说就是线程池的核心,在最开始的设定的核心任务数都是直接调用 start 方法启动线程的,启动以后,这个线程并不关闭,而是一直在阻塞队列上等待,若是有任务就执行任务的run 方法,而不是 start 方法,这点很重要。

而该方法中有几个注意的地方就是线程池留给咱们扩展的,在执行任务以前,会执行 beforeExecute 方法,该方法默认为空,咱们能够实现该方法,在任务执行结束后,在 finally 块中有 afterExecute 方法,一样也是空的,咱们能够扩展。

楼主看到这里的代码后,大为赞叹,Doug Lea 能够说神通常的人物。

那么,线程池还有一个方法, submit 是如何实现的呢?其实核心逻辑也是 runWorker 方法,否则楼主也不会说这个方法是线程池的核心。

那咱们看看 submit 方法是如何实现的。

4. submit 方法实现原理。

该方法最终也是走 execute 方法的,所以逻辑基本相同,不一样的是什么呢?咱们看看。咱们看到,第二行代码建立了 一个 RunnableFuture 对象,RunnableFuture 是一个接口,具体的实现是什么呢?咱们看看:

FutureTask

FutureTask 对象,该对象也是一个线程对象:

那咱们就看看该方法的 run 方法。

该方法核心逻辑楼主已经框起来了,其中调用了 call 方法,返回一个返回值,并在set 方法中,将返回值设置在一个变量中,若是是异常,则将异常设置在变量中。咱们看看set方法:

该方法经过CAS将任务状态状态从new变成 COMPLETING,而后,设置 outcome 变量,也就是返回值。最后,调用 finishCompletion 方法,完成一些变量的清理工做。

那么,若是从submit 中得到返回值呢?这要看get方法:

该方法会判断状态,若是状态尚未完成,那么就调用 awaitDone 方法等待,若是完成了,调用 report 返回值结果。

看见了刚刚设置的 outcome 变量,若是状态正常,则直接返回,若是状态为取消,则抛出异常,其他状况也抛出异常。

咱们回到 awaitDone 方法,看看该方法如何等待的。

该方法有一个死循环,直到有一个肯定的状态返回,若是状态大于 COMPLETING ,也就是 成功了,就返回该状态,若是正在进行中,则让出CPU时间片进行等待。若是都不是,则让该线程阻塞等待。在哪里唤醒呢?在 finishCompletion 方法中会唤醒该线程。

该方法循环了等待线程链表的链表,并唤醒链表中的每一个线程。

还有一个须要的注意的地方就是,在任务执行完毕会执行 done 方法,JDK 默认是空的,咱们能够扩展该方法。好比 Spring 的并发包 org.springframework.util.concurrent 就有2个类重写了该方法。

5. 总结

好了,到这里,线程池的基本实现原理咱们知道了,也解开了楼主一直以来的疑惑,能够说,线程池的核心方法就是 runWorker 方法 配合 阻塞队列,当线程启动后,就从队列中取出队列中的任务,执行任务的 run 方法。能够说设计的很是巧妙。而回调线程 callback 也是经过该方法,JDK 封装了 FutureTask 类来执行他们的 call 方法。

good luck!!!!

相关文章
相关标签/搜索