Java线程池ThreadPoolExecutor深度探索及源码解析

咱们的程序里,时常要使用多线程。所以多线程的管理变的尤其重要。ThreadPoolExecutor很好的解决了这一点。本篇文章主要从源码入手,分析ThreadPoolExecutor的原理。算法

###1.标记和构造方法####编程

和不少状态对象同样,ThreadPoolExecutor也经过一个int的头3位来记录线程池的状态,后面20多位来标记工做线程数量。而且提供通用的位运算接口来得到你所须要的数据。多线程

private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

咱们先来看下ThreadPoolExecutor的构造方法,这里彷佛咱们又要老生常谈了,网上已经有不少关于线程池各个参数的介绍了,这里,非墨仍是会再说一遍,这样加深一下你们的印象。oop

public ThreadPoolExecutor(int corePoolSize, //核心线程数量
                              int maximumPoolSize,//最大线程数量
                              long keepAliveTime,//存活时间
                              TimeUnit unit,//存活时间单位
                              BlockingQueue<Runnable> workQueue,//工做队列
                              ThreadFactory threadFactory,//线程构造工厂
                              RejectedExecutionHandler handler//拒绝请求回调
) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

###2.执行流程####post

按照咱们熟知的线程池机制, 1.当请求被post到咱们的线程池中,咱们的线程池会先生成一个核心线程来执行它 2.当核心线程满了的时候,将会把这个请求放入到咱们的工做请求队列workQueue中。 3.若是你提供的队列是一个有界队列的时候,线程池将会判断你的最大线程数是否超过你的核心线程。若是超过核心线程的话,线程池会生成新的线程去执行它。 4.若是这个时候,已经达到了最大线程数,那么线程池将走到拒绝回调 5.若是线程池的最大线程数不大于核心线程数,而且工做队列已满,那么将直接走拒绝回调ui

实际上这个流程已经在ThreadPoolExecutor.execute方法注释中有详细的说明。即使没有说明,咱们也能够从它的代码流程简单看出一些端倪:this

//code ThreadPoolExecutor.execute(Runnable)
        int c = ctl.get();//获取当前运行线程数
        if (workerCountOf(c) < corePoolSize) {//若是当前线程数小于核心线程数
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {//将请求命令加入到工做队列
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))//加入到非核心线程中
            reject(command);//若是非核心线程没有执行,那么将走拒绝请求回调

###3.深刻源码###线程

咱们以execute为入口,深刻分析一下这个线程池的源码。int c = ctl.get()方法咱们暂时不说,后面咱们将会补充,咱们暂且把它理解为得到一个数量,而这个数量c将会传入到workerCountOf方法中。这个方法名称咱们就能知道其用意,就是为了得到当前工做线程数量。code

private static int workerCountOf(int c)  { return c & CAPACITY; }

上文咱们说到,线程池会经过一个int的后几位来记录线程数量,而workerCountOf就是经过位运算来得到当前工做线程数。在得到当前线程数了之后,若是当前线程数小于 corePoolSize的话,将会经过addWorker方法把command加入到工做线程中。addWorker须要提供两个参数,一个是你的command,另一个boolean量是为了标识是不是往core线程中加。对象

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();//得到一个含有状态和数量的值
            int rs = runStateOf(c);//得到当前线程池状态
            ...
           for (;;) {//第二个for
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
   }

这里,经过上面的代码咱们能够清晰的了解ctl变量的存在的目的: 1.首先,当从类型上看clt是一个原子类型,说明它是要支持多线程调用的 2.ctl里面的值须要存储两个信息,一个是线程数量,一个是当前线程池的工做状态。

这时候是否有读者还在纳闷,为何个人线程数小于个人核心线程数,我往个人线程池里加,仍是可能出现加不进去的状况。事实上,“第二个for”循环已经很好的说明了这一点。由于线程池不能保证是同一个线程调用addWorker方法。线程池须要同步事后,才能保证是不是否往核心线程里面加。这就是为何在ThreadPoolExecutor.execute方法里,在判断完核心线程数量以后,若是失败了,还要再取一次当前线程数的缘由。

if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();//再取一次
        }

好的,咱们继续回到"第二个for"。咱们能够看出,线程池在同步方面不只细化了粒度,并且用的是CAS算法。这种算法能够劲量的避免因为sync引发的线程阻塞。

for (;;) {//第二个for
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
               //当线程池数量超过核心线程的时候退出,返回false
                if (compareAndIncrementWorkerCount(c))
                    break retry;//当增长线程成功的时候,跳出循环
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;//状态不一致继续循环
                // else CAS failed due to workerCount change; retry inner loop
            }

因为咱们如今只有一个线程在工做,不存在多线程竞争的状况,所以咱们选择跳出循环的逻辑。跳出循环之后,程序将真正意义上的生成一个Worker线程来执行指令。

//code private boolean addWorker(Runnable firstTask, boolean core)
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);//生成一个worker对象
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();//锁定全局锁
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());//检查线程池状态

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);//将Worker线程归入workers集合对象管理
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;//从新赋值largestPoolSize变量
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;

上面的代码很是简单,线程池将生成一个Worker的线程包装类。不管是是不是核心线程,全部的线程都被归入到workers集合对象进行管理。若是一切流程都正常workerAdded将为true,Worker里的线程将被启动。启动后Worker将执行线程的run方法,而在run方法中,又调用到Worker的runWorker(Worker)方法:

public void run() {
            runWorker(this);
        }

runworker是真正的线程执行流程的代码段:

// code runworker
 try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();//判断当前线程池状态,
                try {
                    beforeExecute(wt, task);//执行前切面
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);//执行后切面
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }

runworker方法中引伸出来两个方法beforeExecute和afterExecute。能够经过继承的方式来监控command的执行。至关于在command.run以前和以后切了两个面,是一种面向方面的编程模式。当Task执行完成以后,因为while循环,将再次执行while的判断条件task = getTask()) != null; getTask方法是可能阻塞的,阻塞的时间是根据你在构造线程池的时候设置的超时时间来决定的。

private Runnable getTask() {
        boolean timedOut = false; //是否判断超时

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            //allowCoreThreadTimeOut变量用于控制是否让核心线程也进行超时判断
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?//经过timed变量来选择使用poll方法仍是take方法
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;//若是poll获取的r为空,标记为超时
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }


>getTask仍是一个循环操做,第一次执行的时候,会经过timed变量来判断是否有超时检查,若是有超时检查的话将调用poll方法。若是poll在规定的时间内并无得到任何的执行对象,返回的r为null,timedOut将被标记为true。这时候,又再次进入循环。这时候,若是你是非核心线程,是扩展线程的话,那么,if ((wc > maximumPoolSize || (timed && timedOut))这个判断为true,程序将返回一个null。
在runWorker方法中,若是getTask返回的对象为null,runWorker将跳出while循环,执行finally语句:
finally {

            processWorkerExit(w, completedAbruptly);         }

>processWorkerExit方法须要传递两个变量,第一个变量是Worker对象,第二个变量是completedAbruptly变量,这个变量是干什么用的呢?由于你的程序跳出可能存在两种状况,一种是正常跳出,一种是异常跳出,若是是异常跳出的话,这个时候你的workercount未必正常的执行decrement操做,所以经过这个变量来标记程序的执行状态。

//code processWorkerExit final ReentrantLock mainLock = this.mainLock;         mainLock.lock();         try {             completedTaskCount += w.completedTasks;             workers.remove(w);         } finally {             mainLock.unlock();         }

mainLock是一个全局锁,主要是为了同步全局的workers变量。上面的代码中,线程池将记录一下task执行数据,而且将worker从workers队列中删除。
这个时候,基本上整个线程池的流程都已经概述完了,固然,咱们还确实一个变量,那就是RejectedExecutionHandler类型变量。这个得回到咱们的execute方法:

if (isRunning(c) && workQueue.offer(command)) {             int recheck = ctl.get();             if (! isRunning(recheck) && remove(command))                 reject(command);             else if (workerCountOf(recheck) == 0)                 addWorker(null, false);         }         else if (!addWorker(command, false))             reject(command);//拒绝请求

>当线程池拒绝请求的时候,将调用reject方法,而reject方法将会回调RejectedExecutionHandler的rejectedExecution方法:

final void reject(Runnable command) {         handler.rejectedExecution(command, this);     }

线程池提供一个默认的拒绝请求回调:

//code ThreadPoolExecutor private static final RejectedExecutionHandler defaultHandler =         new AbortPolicy(); //code AbortPolicy public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {             throw new RejectedExecutionException("Task " + r.toString() +                                                  " rejected from " +                                                  e.toString());         }

也就是采用异常的方式来拒绝请求。


这样,ThreadPoolExecutor的主要源码和结构已经分析完了,固然还有其余的特性和功能须要看官们本身去探索。
                                                                                                        -----非墨
相关文章
相关标签/搜索