在前面的一篇文章里我对java threadpool的几种基本应用方法作了个总结。Java的线程池针对不一样应用的场景,主要有固定长度类型、可变长度类型以及定时执行等几种。针对这几种类型的建立,java中有一个专门的Executors类提供了一系列的方法封装了具体的实现。这些功能和用途不同的线程池主要依赖于ThreadPoolExecutor,ScheduledThreadPoolExecutor等几个类。如前面文章讨论所说,这些类和相关类的主要结构以下:java
这里不是对全部类的详细实现作一个分析,而是从现有线程池ThreadPoolExecutor的源代码出发,分析一个线程池应该考虑的要点。从本文总体的方向来讲,主要结合前面文章中提交线程给线程池以后分为返回结果和不返回结果的方式,按照他们执行的脉络来分析当咱们提交一个线程到线程池以后他们内部是如何运行的。顺便也详细理解线程池这种参考实现的内部结构。安全
咱们从最初使用多线程的代码开始,在一些示例代码里,咱们经过Executors.newFixedThreadPool()等方法建立了一个ExecutorService类型的线程池。实际上具体实现对应的是ThreadPoolExecutor等。而后咱们再使用这个对象的execute或者submit方法。前面咱们了解到,一般咱们用execute方法执行一个线程不经过这个方法自己返回执行结果或者咱们不须要利用这个方法来获取结果。而submit方法是须要获得结果的。那么他们二者一个要结果,一个不要结果的是怎么统一块儿来的呢?若是咱们看类AbstractExecutorService的以下代码则就能够理解了:多线程
/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
前面这30多行代码没什么特别的,主要针对不一样类型的方法签名。他们能够接收Runnable, Callable类型的参数。对于须要返回结果的类型,经过专门一个变量来保存结果。整体来讲至关于一个简单的包装。而具体执行的代码仍是要看execute方法。这里须要注意的一点就是newTaskFor(task)方法经过一个包装类将Callable变量包装成一个线程,让它能够运行。ide
既然前面的两种方法都归结于同一个方法execute,那么咱们就来看看它的具体实现:oop
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
这是ThreadPoolExecutor里面的代码。前面这部分的代码看起来比较困难,并且也比较难懂。别急,咱们先把代码放这里。在讨论这些详细实现的思路前,咱们先看看几个要实现线程池须要考虑的点。ui
假定咱们要实现一个线程池,那么有哪些地方是咱们须要认真考虑的呢?从线程池自己的定义来看,它是将一组事先建立好的线程放在一个资源池里,当须要的时候就将该线程分配给具体的任务来执行。那么,这个池子该有多大呢?咱们线程池确定要面临多个线程资源的访问,是否是自己的结构要保证线程安全呢?还有,若是线程池建立好以后咱们后续有若干任务使用了线程资源,当池里面的资源使用完以后咱们该如何安排呢?是给线程池扩容,建立更多的线程资源,仍是增长一个队列,让一些任务先在里面排队呢?在一些极端的状况下,好比说来的任务实在是太多了线程池处理不过来,对于这些任务该怎么处理呢?是丢弃仍是通知给请求方?线程执行的时候会有碰到异常或者错误的状况,这些异常咱们该怎么处理?怎么样保证这些异常的处理不会致使线程池其余任务的正常运行不出错呢?this
总的来讲,前面的这几个问题能够归结为一下几个方面:atom
1. 线程池的结构。spa
2. 线程池的任务分配策略。
3. 线程池的异常和错误处理。
下面,咱们针对这几个问题结合源代码详细的分析一下。
在ThreadPoolExecutor里面有一个AtomicInteger的数值,它用来表示两个信息,一个是当前线程池的状态,还有一个就是当前线程的数目。由于这两部分都是糅合到一个整型数字里头,因此他们的信息访问就比较紧凑和特殊一点:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
这部分的代码看起来有点怪异,其实很好理解。咱们设定的线程池里面最多能够容纳的线程数为(2^29) -1。这也就是为何前面用一个Integer.SIZE - 3做为位数。这样这个整数的0-28位表示的就是线程的数目。而高位的部分,29-31位的地方则表示线程池的状态。这里定义的主要有5种状态,分别对应的值是从-1到3.
他们对应着线程的running, shutdown, stop, tidying, terminated这几个状态。
除了前面的几个部分之外,线程池里还有以下几个成员:
private final BlockingQueue<Runnable> workQueue; private final ReentrantLock mainLock = new ReentrantLock(); /** * Set containing all worker threads in pool. Accessed only when * holding mainLock. */ private final HashSet<Worker> workers = new HashSet<Worker>(); /** * Wait condition to support awaitTermination */ private final Condition termination = mainLock.newCondition(); /** * Tracks largest attained pool size. Accessed only under * mainLock. */ private int largestPoolSize; /** * Counter for completed tasks. Updated only on termination of * worker threads. Accessed only under mainLock. */ private long completedTaskCount; private volatile ThreadFactory threadFactory; /** * Handler called when saturated or shutdown in execute. */ private volatile RejectedExecutionHandler handler; /** * Timeout in nanoseconds for idle threads waiting for work. * Threads use this timeout when there are more than corePoolSize * present or if allowCoreThreadTimeOut. Otherwise they wait * forever for new work. */ private volatile long keepAliveTime; /** * If false (default), core threads stay alive even when idle. * If true, core threads use keepAliveTime to time out waiting * for work. */ private volatile boolean allowCoreThreadTimeOut; /** * Core pool size is the minimum number of workers to keep alive * (and not allow to time out etc) unless allowCoreThreadTimeOut * is set, in which case the minimum is zero. */ private volatile int corePoolSize; /** * Maximum pool size. Note that the actual maximum is internally * bounded by CAPACITY. */ private volatile int maximumPoolSize;
这些部分的内容看起来比较多,实际上他们几个都是在一些方法里常常用到的。
workQueue: 一个BlockingQueue<Runnable>队列,自己的结构能够保证访问的线程安全。至关于一个排队等待队列。当咱们线程池里线程达到corePoolSize的时候,一些须要等待执行的线程就放在这个队列里等待。
workers: 一个HashSet<Worker>的集合。线程池里全部能够当即执行的线程都放在这个集合里。
mainLock: 一个访问workers所须要使用的锁。从前面的workQueue, workers这两个结构咱们能够看到,若是咱们要往线程池里面增长执行任务或者执行完毕一个任务,都要访问到这两个结构。因此大多数状况下为了保证线程安全,就须要使用mainLock这个锁。
corePoolSize: 处于活跃状态的最少worker数目。咱们一个线程池里确定事先建立好了若干个,等来执行任务的时候直接拿去就能够跑了。那么到底要保证最初有多少个呢?就由corePoolSize这个来指定了。
maximumPoolSize:线程池最大的长度。能够设置的一个参数。在咱们当前池里面的线程数到达这个数字的时候就不能再往里面加了。须要注意的是这里是咱们设定的一个池最大范围。在这里能够设定的最大数字是(2^29) -1。
其余还有几个牵涉到的成员好比说RejectedExecutionHandler等,相对都比较简单一点,代码里的注释就已经可以说清楚了。
ok,有了前面这几个基本成员的说明,咱们再看看他们使用的work的结构。既然执行的都是一个Worker的集合。那么在这里Worker的定义是什么样的呢?下面是Worker的定义代码:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() == 1; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } }
这里咱们能够看到Worker自己实现了Runnable接口,因此它能够当成一个线程来执行。而后也继承了AbstractQueuedSynchronizer,也能够实现一些对自己的线程同步访问。这里最重要的几个部分在于它里面定义了一个Thread thread和Runnable firstTask。看到这里,咱们可能会比较奇怪,咱们只是要一个能够执行的线程,这里放一个Thread和一个Runnable的变量作什么呢?在Worker的run方法里,调用的runWorker方法究竟是怎么执行的呢?咱们再来看看runWorker方法:
final void runWorker(Worker w) { Runnable task = w.firstTask; w.firstTask = null; boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); clearInterruptsForTaskRun(); try { beforeExecute(w.thread, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
这部分代码看起来挺多的,里面的beforeExecute,afterExecute的方法在默认的实现里是空的。在一些有特定要求的地方能够经过继承ThreadPoolExecutor提供自定义的实现。和前面的定义结合起来看,看来Worker这里也没干什么别的,就是绕了个圈执行了里面设定的firstTask。
若是咱们仔细看其中的代码,还有一个须要注意的地方就是这里用了一个while循环来执行task,而跳出循环的条件则是要task为null。那么这个getTask是作了什么呢?通常来讲咱们给它一个线程,执行完任务不就完了吗?要它再去拿线程干吗啊?咱们看看它的实现:
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } boolean timed; // Are workers subject to culling? for (;;) { int wc = workerCountOf(c); timed = allowCoreThreadTimeOut || wc > corePoolSize; if (wc <= maximumPoolSize && ! (timedOut && timed)) break; if (compareAndDecrementWorkerCount(c)) return null; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
从代码的注释里能够看到,这个方法要从workQueue里面获取task而后提交执行。也就是说咱们线程池执行完了当前的任务后会主动到这个队列里来取后续等待的任务执行。若是当前线程由于超时、线程池要关闭等状态影响则可能会退出,而若是一切都正常的话,则会从workQueue里面调用poll或take方法取到当前任务。前面一大堆的判断和循环就是判断当前线程池长度是否超过maximumPoolSize以及当前状态是否要关闭了。若是长度超了或者状态不对则不必继续去取任务执行了,须要尽快返回。
有了前面这部分的分析,咱们知道Worker只不过包含了一个指向咱们须要建立的Runnable对象,而后在Worker做为线程执行的时候再来运行这个Runnable里面的线程执行部分。
有了前面那部分的铺垫,咱们再来回过头看线程池拿到一个任务后execute方法的执行。咱们将这些代码拆开来看,这是第一部分:
int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); }
这里获取到当前正在执行的线程数目,若是这些线程的数目少于corePoolSize,则将该线程加入到线程池中。而后返回。
另一部分的代码以下:
if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); }
这里是假定若是前面不能直接加入到线程池Worker集合里,则加入到workQueue队列等待执行。里面的if else判断语句则是检查当前线程池的状态。若是线程池自己的状态是要关闭并清理了,咱们则不能提交线程进去了。这里咱们就要reject他们。因此前面咱们看到的一些线程池拒绝线程执行的机制在这里也获得了验证。
最后面这部分的代码以下:
else if (!addWorker(command, false)) reject(command);
这里对应代码注释里的第3种状况,咱们前面作了两种尝试,一个是将线程加入到workers集合或者workerQueue队列排队。在这两种状况都失败的状况下,咱们尝试加入一个新的线程。若是这种状况下咱们也失败了,则拒绝线程提交执行。
这里几个地方都用到了addWorker方法,而咱们既然是execute方法,确定要让线程执行起来。但是这里没有见到那个地方调用线程的start方法。那么极可能这个线程方法的具体调用就在addWorker方法里。
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } Worker w = new Worker(firstTask); Thread t = w.thread; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); int rs = runStateOf(c); if (t == null || (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null))) { decrementWorkerCount(); tryTerminate(); return false; } workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; } finally { mainLock.unlock(); } t.start(); // It is possible (but unlikely) for a thread to have been // added to workers, but not yet started, during transition to // STOP, which could result in a rare missed interrupt, // because Thread.interrupt is not guaranteed to have any effect // on a non-yet-started Thread (see Thread#interrupt). if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted()) t.interrupt(); return true; }
前面的嵌套for循环主要是用来判断当前线程池的状态是否能够容许继续加线程,同时也判断线程池的长度是否已经超标。固然,既然咱们有一个线程加入了执行,当前运行的数量也要更新。若是没问题,则经过break retry;跳出这两个循环开始后面的正式执行。
在正式执行的时候咱们建立一个Worker对象,并将mainLock加锁。保证后续执行部分是单线程执行的。在进入加锁的部分以后还须要再一次检查一下线程池的状态。这里咱们将当前的线程加入到workers集合。而后咱们经过t.start()方法正式执行线程。在这里一个线程才算是真正的执行起来了。
前面咱们看了一下线程池的执行机制。在默认的线程池实现里,它是经过一个workers集合来保持最核心活跃状态的线程组。当咱们新加入线程执行任务时,则先利用这里的线程。若是这里的被占用满了以后则加入到workQueue这个队列里排队。这里面有一个重要的地方就是要常常检查当前线程池的状态,只有在运行状态的时候才能够往里面加线程,不然提交线程任务则会被拒绝。咱们也要检查线程池的长度,防止提交的执行任务达到了咱们设定的上限。为了保证线程的提交和执行安全,咱们用一个lock来管理对线程集合workers和workerQueue的加锁控制。
在线程池中也有一些扩展点。好比在线程执行的过程当中咱们能够覆写beforeExecute, afterExecute方法来提供本身特定的功能。另外,当线程执行不符合条件要被丢弃或者拒绝的时候,咱们也能够提供一些RejectExecutionHandler的具体实现。在系统的默认实现里已经提供了5种。
总的来讲,对于一个线程池,它最核心的部分是对应一个线程运行集合和一个队列。若是咱们可以保证好他们的状态、大小以及线程安全执行,那么基本上一个线程的雏形就差很少完成了。