源码学习第一篇(线程池)

你们面试过程当中确定被问道过线程池。为何要使用线程池呢?由于在系统中频繁建立线程会形成很大的CPU消耗。并且用完的线程要等待GC回收也会形成消耗。面试

下面咱们就来学习下最经常使用的线程池 ThreadPoolExecutor,缓存

首先先来看看它的构造方法:安全

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

参数说明:并发

corePoolSize:核心线程数,线程池维持的正常活跃(保障不会线程超时)的最小线程数,不容许为0,除非设置容许线程超时等待(若是提交的任务数量大于这个参数时,提交的任务将被放入缓存队列)。
maximumPoolSize:最大线程数,即为线程池能够容纳的最大线程数。
keepAliveTime: 空闲的线程存活时间。当存在超过corePoolSize或设置了容许核心线程空闲超时的时候,线程将在等待这么长时间后自动销毁(默认以纳秒为单位)。
unit: 空闲的线程存活时间的单位。
workQueue: 等待队列,注意这个队列是阻塞的,当队列中没有任务时线程获取任务将被阻塞,直到有任务时候被唤醒。
threadFactory: 线程工厂,工厂模式你们都懂的,若是有须要自定义线程,实现ThreadFactory接口从newThread方法返回你自定义的线程类就行了。
handler: 拒绝策略处理器,在线程池没法接收新的工做时候会调用该处理器的拒绝方法拒绝掉一些任务。oop

线程池拒绝策略处理接口:RejectedExecutionHandler
实现类:
直接拒绝策略(不会报错):DiscardPolicy
直接拒绝策略(将抛出一个RejectedExecutionException错误):AbortPolicy
让放入的线程本身运行任务:CallerRunsPolicy
抛弃等待时间最长的任务:DiscardOldestPolicy学习

看完了这些参数后是否是仍是一头雾水?哈哈,其实我也是这样。要了解线程池,先要了解它的状态。ui

    private static final int COUNT_BITS = Integer.SIZE - 3;
    //CAPACITY 值为 1 << 29 = 0010 0000 0000 0000 0000 0000 0000 0000
    // (1 << 29) -1 = 0001 1111 1111 1111 1111 1111 1111 1111
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    //线程池状态
    // -1 << 29 = 1110 0000 0000 0000 0000 0000 0000 0000
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

线程池状态:
RUNNING:取30位到32位111表明运行中,能够接收新的任务而且会处理等待队列中的任务。
SHUTDOWN:取30位到32位000表明关闭中,不中断正在执行的任务,可是会中断等待中的线程。而后等待队列中剩余的任务处理完就关闭。
STOP: 取30位到32位001表明中止,不接受新任务也不处理等待队列中的任务。而且会中断正在运行的任务。
TIDYING: 清取30位到32位010表明理中,最后的扫尾工做,关闭中到结束的中间状态,全部任务已经完成,工做线程数workerCount为0,线程池将执行结束方法。
TERMINATED: 取30位到32位011表明结束,结束方法完成,线程池结束。
线程池状态转换:
RUNNING -> SHUTDOWN: 调用了shutdown()方法以后,线程池会开始拒绝接收新的任务,从运行中转变为关闭中的状态。通常是在线程池完成工做须要销毁时调用。
RUNNING -> STOP: 调用了shutdownNow()方法以后,线程池会拒绝接受任务并中断因此正在执行的任务,从运行状态转入中止状态。
SHUTDOWN -> STOP: 调用了shutdownNow()方法以后,线程池会拒绝接受任务并中断因此正在执行的任务,从关闭中转为中止状态。
SHUTDOWN -> TIDYING: 线程池自己处于中止中的状态,队列和线程池都为空的时候。线程进入清理状态。
STOP -> TIDYING: 当线程池为空时,线程池从中止中状态转入清理状态。
TIDYING -> TERMINATED: 当线程清理完成会回调 terminated() 方法,完成后线程正式结束。this

如今不明白这些状态是干什么用的,能够先放到一边,接下来咱们要看一下线程池的几个比较核心的方法。为了让代码更容易理解,我在上面加入了注释。spa

    // 获取方法状态, 取计数器的30-32位做为线程池的状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //取计数器的后29位做为线程数
    private static int workerCountOf(int c)  { return c & CAPACITY; }


    //线程池执行方法
    public void execute(Runnable command) {
        if (command == null){
            throw new NullPointerException();
        }
        //获取计数器的数值
        int c = ctl.get();

        /*
         * 若是运行的线程数少于corePoolSize核心线程数,就启动一个新的线程来执行该任务。
         * addWorker方法将以原子方式检查runState(运行状态)和workerCount(工做中总线程数),
         * 若是运行状态为关闭或者工做中总线程数等于核心线程数返回false。
         */
        //workerCountOf 取计数器的后29位做为线程数 和核心线程数进行对比
        if (workerCountOf(c) < corePoolSize) {
            //启动一个新的线程来完成工做
            if (addWorker(command, true)){
                return;
            }
            //获取计数器的最新数值
            c = ctl.get();
        }
        /*
         * isRunning 方法是经过原子的计数器获取的数值来判断运行状态(注意若是是运行状态前三位必定是111整个数值是个负数),
         * 若是运行状态是运行状态,就尝试向等待队列末尾插入一个任务。
         */
        if (isRunning(c) && workQueue.offer(command)) {
            //获取计数器的最新数值
            int recheck = ctl.get();
            //再次判断线程池状态,若是这时候线程池进入其余状态,折删除刚刚添加的任务,而且在remove中调用了中断空闲线程的方法
            if (!isRunning(recheck) && remove(command)){
                //调用拒绝策略
                reject(command);
            }
            /*
             * 若是线程池还处于运行中 或 者线程处于关闭中,可是等待队列任务删除失败。
             * 判断工做线程的数量为0的话,就建立一个空闲线程放入池中,让他将队列中的任务执行完后
             * 线程池在转入清理状态
             */
            else if (workerCountOf(recheck) == 0){
                addWorker(null, false);
            }
        }
        /*
         * 若是运行状态为关闭再次尝试调用 addWorker 方法运行这个任务,这时addWorker 
         * 会返回false。这时候执行拒绝策略(reject 方法底层其实调用的拒绝策略的拒绝方法)。
         * 若是线程状态为运行中,可是由于等待队列满了,插入失败,再次调用addWorker 尝试运行
         * 任务,而且该方法参数 core为false若线程数未超过 最大线程数会建立一个新的线程来运行任务
         * addWorker 方法下面有介绍
         */
        else if (!addWorker(command, false)){
            reject(command);
        }
    }

    //将任务加入工做中的方法,线程池的核心方法之一
    //参数 core 表明是否是比较核心线程数的意思。
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            //获取计数器的数值
            int c = ctl.get();
            //取30位-32位判断线程状态
            int rs = runStateOf(c);
            /*
             * 若是线程池处于 中止,清理中,结束 等状态,直接返回false拒绝该任务
             * 若是线程处于 关闭中 且传入的任务不为空 也直接拒绝该任务。
             * 若是传入的任务为空,可是等待队列为空,证实没任务了不要建立新的线程,直接返回结束该方法。
             */
            if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())){
                return false;
            }
            for (;;) {
            
                //获取当前线程数
                int wc = workerCountOf(c);
                //判断线程数是否超过极限容量, 若是没超过 且core为true 在看看是否超过核心线程数
                // 若是 且core为false 则和最大线程数进行比较。 若是超过了就返回false拒绝该任务。
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)){
                    return false;
                }
                //修改原子计数器的线程数 若是修改为功 就不必在无限循环了 直接断开循环跳到最外层不在执行循环
                if (compareAndIncrementWorkerCount(c)){
                    break retry;
                }
                //若是上面的修改没有成功 再次获取原子计数器数值
                c = ctl.get();  // Re-read ctl
                //判断当前的状态十分和以前的同样若是同样继续循环修改 若是不同则结束循环跳到最外层大循环,重新开始
                if (runStateOf(c) != rs){
                    continue retry;
                }
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                //获取主锁,为了保证在并发下添加任务的安全性
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    //获取到锁后 重新获取线程池运行状态
                    int rs = runStateOf(ctl.get());
                    //若是线程处于运行状态 或者 者线程处于关闭中,可是等待队列任务删除失败。且线程池中线程已全被中断
                    if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                        //判断线程是不是活跃的
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //将新建的工做线程加入工做队列
                        workers.add(w);
                        //获取工做线程数量
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        //设置工做线程加入成功
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //若是工做线程加入成功启动新的线程 并将线程状态设置为 true
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

下面顺便提一句,为何阿里规范不让用 Executors.newSingleThreadExecutor(); 这样的方法建立线程池。线程

咱们能够看看他的源码:

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
LinkedBlockingQueue 这是一个阻塞无限队列,也就是说这个队列只要不是OOM了就能够一直往里面放。这样会形成若是线程数到达核心线程数之后仍是处理不过来并不会继续建立线程,而是会一直往队列中塞任务,直到内存溢出。
相关文章
相关标签/搜索