JDK5之后将工做单元和执行机制分离开来,工做单元包括Runnable和Callable;执行机制由Executor框架提供,管理线程的生命周期,将任务的提交和如何执行进行解耦。Executors是一个快速获得线程池的工具类,相关的类图以下所示:java
Executor接口数组
Executor接口只有一个execute方法,用来替代一般建立或启动线程的方法。框架
public interface Executor { void execute(Runnable command); }
ExecutorService接口异步
ExecutorService接口继承自Executor接口,加入了关闭方法、submit方法和对Callable、Future的支持。ide
ScheduledExecutorService接口工具
ScheduledExecutorService扩展ExecutorService接口并加入了对定时任务的支持。oop
ThreadPoolExecutor继承自AbstractExecutorService,也是实现了ExecutorService接口。ui
4.1 内部状态this
1 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 2 private static final int COUNT_BITS = Integer.SIZE - 3; 3 private static final int CAPACITY = (1 << COUNT_BITS) - 1; 4 5 // runState is stored in the high-order bits 6 private static final int RUNNING = -1 << COUNT_BITS; 7 private static final int SHUTDOWN = 0 << COUNT_BITS; 8 private static final int STOP = 1 << COUNT_BITS; 9 private static final int TIDYING = 2 << COUNT_BITS; 10 private static final int TERMINATED = 3 << COUNT_BITS; 11 12 // Packing and unpacking ctl 13 private static int runStateOf(int c) { return c & ~CAPACITY; } 14 private static int workerCountOf(int c) { return c & CAPACITY; } 15 private static int ctlOf(int rs, int wc) { return rs | wc; }
ctl是对线程池的运行状态(高3位)和线程池中有效线程的数量(低29位)进行控制的一个字段。线程池有五种状态,分别是:atom
4.2 构造方法
构造方法有4个,这里只列出其中最基础的一个。
1 public ThreadPoolExecutor(int corePoolSize, 2 int maximumPoolSize, 3 long keepAliveTime, 4 TimeUnit unit, 5 BlockingQueue<Runnable> workQueue, 6 ThreadFactory threadFactory, 7 RejectedExecutionHandler handler) { 8 if (corePoolSize < 0 || 9 maximumPoolSize <= 0 || 10 maximumPoolSize < corePoolSize || 11 keepAliveTime < 0) 12 throw new IllegalArgumentException(); 13 if (workQueue == null || threadFactory == null || handler == null) 14 throw new NullPointerException(); 15 this.corePoolSize = corePoolSize; 16 this.maximumPoolSize = maximumPoolSize; 17 this.workQueue = workQueue; 18 this.keepAliveTime = unit.toNanos(keepAliveTime); 19 this.threadFactory = threadFactory; 20 this.handler = handler; 21 }
构造方法中参数的含义以下:
4.3 execute方法
ThreadPoolExecutor.execute(task)实现了Executor.execute(task),用来提交任务,不能获取返回值,代码以下:
1 public void execute(Runnable command) { 2 if (command == null) 3 throw new NullPointerException(); 4 /* 5 * Proceed in 3 steps: 6 * 7 * 1. If fewer than corePoolSize threads are running, try to 8 * start a new thread with the given command as its first 9 * task. The call to addWorker atomically checks runState and 10 * workerCount, and so prevents false alarms that would add 11 * threads when it shouldn't, by returning false. 12 * 13 * 2. If a task can be successfully queued, then we still need 14 * to double-check whether we should have added a thread 15 * (because existing ones died since last checking) or that 16 * the pool shut down since entry into this method. So we 17 * recheck state and if necessary roll back the enqueuing if 18 * stopped, or start a new thread if there are none. 19 * 20 * 3. If we cannot queue task, then we try to add a new 21 * thread. If it fails, we know we are shut down or saturated 22 * and so reject the task. 23 */ 24 int c = ctl.get(); 25 /* 26 * workerCountOf方法取出低29位的值,表示当前活动的线程数; 27 * 若是当前活动线程数小于corePoolSize,则新建一个线程放入线程池中; 28 * 并把任务添加到该线程中。 29 */ 30 31 if (workerCountOf(c) < corePoolSize) { 32 /* 33 * addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断仍是maximumPoolSize来判断; 34 * 若是为true,根据corePoolSize来判断; 35 * 若是为false,则根据maximumPoolSize来判断 36 */ 37 if (addWorker(command, true)) 38 return; 39 /* 40 * 若是添加失败,则从新获取ctl值 41 */ 42 c = ctl.get(); 43 } 44 /* 45 * 线程池处于RUNNING状态,把提交的任务成功放入阻塞队列中 46 */ 47 if (isRunning(c) && workQueue.offer(command)) { 48 // 从新获取ctl值 49 int recheck = ctl.get(); 50 // 再次判断线程池的运行状态,若是不是运行状态,因为以前已经把command添加到workQueue中了, 51 // 这时须要移除该command 52 // 执行事后经过handler使用拒绝策略对该任务进行处理,整个方法返回 53 if (! isRunning(recheck) && remove(command)) 54 reject(command); 55 /* 56 * 获取线程池中的有效线程数,若是数量是0,则执行addWorker方法 57 * 这里传入的参数表示: 58 * 1. 第一个参数为null,表示在线程池中建立一个线程,但不去启动; 59 * 2. 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断; 60 * 若是判断workerCount大于0,则直接返回,在workQueue中新增的command会在未来的某个时刻被执行。 61 */ 62 else if (workerCountOf(recheck) == 0) 63 addWorker(null, false); 64 } 65 /* 66 * 若是执行到这里,有两种状况: 67 * 1. 线程池已经不是RUNNING状态; 68 * 2. 线程池是RUNNING状态,但workerCount >= corePoolSize而且workQueue已满。 69 * 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize; 70 * 若是失败则拒绝该任务 71 */ 72 else if (!addWorker(command, false)) 73 reject(command); 74 }
若是线程池状态一直是RUNNING,则执行过程以下:
4.4 addWorker方法
从executor的方法实现能够看出,addWorker主要负责建立新的线程并执行任务。线程池建立新线程执行任务时,须要获取全局锁:
1 private boolean addWorker(Runnable firstTask, boolean core) { 2 retry: 3 for (;;) { 4 int c = ctl.get(); 5 // 获取运行状态 6 int rs = runStateOf(c); 7 /* 8 * 这个if判断 9 * 若是rs >= SHUTDOWN,则表示此时再也不接收新任务; 10 * 接着判断如下3个条件,只要有1个不知足,则返回false: 11 * 1. rs == SHUTDOWN,这时表示关闭状态,再也不接受新提交的任务,但却能够继续处理阻塞队列中已保存的任务 12 * 2. firsTask为空 13 * 3. 阻塞队列不为空 14 * 15 * 首先考虑rs == SHUTDOWN的状况 16 * 这种状况下不会接受新提交的任务,因此在firstTask不为空的时候会返回false; 17 * 而后,若是firstTask为空,而且workQueue也为空,则返回false, 18 * 由于队列中已经没有任务了,不须要再添加线程了 19 */ 20 // Check if queue empty only if necessary. 21 if (rs >= SHUTDOWN && 22 ! (rs == SHUTDOWN && 23 firstTask == null && 24 ! workQueue.isEmpty())) 25 return false; 26 27 for (;;) { 28 // 获取线程数 29 int wc = workerCountOf(c); 30 // 若是wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是29个1),返回false; 31 // 这里的core是addWorker方法的第二个参数,若是为true表示根据corePoolSize来比较, 32 // 若是为false则根据maximumPoolSize来比较。 33 if (wc >= CAPACITY || 34 wc >= (core ? corePoolSize : maximumPoolSize)) 35 return false; 36 // 尝试增长workerCount,若是成功,则跳出第一个for循环 37 if (compareAndIncrementWorkerCount(c)) 38 break retry; 39 // 若是增长workerCount失败,则从新获取ctl的值 40 c = ctl.get(); // Re-read ctl 41 // 若是当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行 42 if (runStateOf(c) != rs) 43 continue retry; 44 // else CAS failed due to workerCount change; retry inner loop 45 } 46 } 47 48 boolean workerStarted = false; 49 boolean workerAdded = false; 50 Worker w = null; 51 try { 52 // 根据firstTask来建立Worker对象 53 w = new Worker(firstTask); 54 // 每个Worker对象都会建立一个线程 55 final Thread t = w.thread; 56 if (t != null) { 57 final ReentrantLock mainLock = this.mainLock; 58 mainLock.lock(); 59 try { 60 // Recheck while holding lock. 61 // Back out on ThreadFactory failure or if 62 // shut down before lock acquired. 63 int rs = runStateOf(ctl.get()); 64 // rs < SHUTDOWN表示是RUNNING状态; 65 // 若是rs是RUNNING状态或者rs是SHUTDOWN状态而且firstTask为null,向线程池中添加线程。 66 // 由于在SHUTDOWN时不会在添加新的任务,但仍是会执行workQueue中的任务 67 if (rs < SHUTDOWN || 68 (rs == SHUTDOWN && firstTask == null)) { 69 if (t.isAlive()) // precheck that t is startable 70 throw new IllegalThreadStateException(); 71 // workers是一个HashSet 72 workers.add(w); 73 int s = workers.size(); 74 // largestPoolSize记录着线程池中出现过的最大线程数量 75 if (s > largestPoolSize) 76 largestPoolSize = s; 77 workerAdded = true; 78 } 79 } finally { 80 mainLock.unlock(); 81 } 82 if (workerAdded) { 83 // 启动线程,执行任务(Worker.thread(firstTask).start()); 84 //启动时会调用Worker类中的run方法,Worker自己实现了Runnable接口,因此一个Worker类型的对象也是一个线程。 85 t.start(); 86 workerStarted = true; 87 } 88 } 89 } finally { 90 if (! workerStarted) 91 addWorkerFailed(w); 92 } 93 return workerStarted; 94 }
4.5 Worker类
线程池中的每个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象。Worker类设计以下:
1 private final class Worker 2 extends AbstractQueuedSynchronizer 3 implements Runnable 4 { 5 /** 6 * This class will never be serialized, but we provide a 7 * serialVersionUID to suppress a javac warning. 8 */ 9 private static final long serialVersionUID = 6138294804551838833L; 10 11 /** Thread this worker is running in. Null if factory fails. */ 12 final Thread thread; 13 /** Initial task to run. Possibly null. */ 14 Runnable firstTask; 15 /** Per-thread task counter */ 16 volatile long completedTasks; 17 18 /** 19 * Creates with given first task and thread from ThreadFactory. 20 * @param firstTask the first task (null if none) 21 */ 22 Worker(Runnable firstTask) { 23 setState(-1); // inhibit interrupts until runWorker 24 this.firstTask = firstTask; 25 this.thread = getThreadFactory().newThread(this); 26 } 27 28 /** Delegates main run loop to outer runWorker */ 29 public void run() { 30 runWorker(this); 31 } 32 33 // Lock methods 34 // 35 // The value 0 represents the unlocked state. 36 // The value 1 represents the locked state. 37 38 protected boolean isHeldExclusively() { 39 return getState() != 0; 40 } 41 42 protected boolean tryAcquire(int unused) { 43 if (compareAndSetState(0, 1)) { 44 setExclusiveOwnerThread(Thread.currentThread()); 45 return true; 46 } 47 return false; 48 } 49 50 protected boolean tryRelease(int unused) { 51 setExclusiveOwnerThread(null); 52 setState(0); 53 return true; 54 } 55 56 public void lock() { acquire(1); } 57 public boolean tryLock() { return tryAcquire(1); } 58 public void unlock() { release(1); } 59 public boolean isLocked() { return isHeldExclusively(); } 60 61 void interruptIfStarted() { 62 Thread t; 63 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { 64 try { 65 t.interrupt(); 66 } catch (SecurityException ignore) { 67 } 68 } 69 } 70 }
4.6 runWorker方法
Worker类中的run方法调用了runWorker方法来执行任务,执行过程以下:
1 final void runWorker(Worker w) { 2 Thread wt = Thread.currentThread(); 3 // 获取第一个任务 4 Runnable task = w.firstTask; 5 w.firstTask = null; 6 // 容许中断 7 w.unlock(); // allow interrupts 8 boolean completedAbruptly = true; 9 try { 10 // 若是task为空,则经过getTask来获取任务 11 while (task != null || (task = getTask()) != null) { 12 w.lock(); 13 // If pool is stopping, ensure thread is interrupted; 14 // if not, ensure thread is not interrupted. This 15 // requires a recheck in second case to deal with 16 // shutdownNow race while clearing interrupt 17 if ((runStateAtLeast(ctl.get(), STOP) || 18 (Thread.interrupted() && 19 runStateAtLeast(ctl.get(), STOP))) && 20 !wt.isInterrupted()) 21 wt.interrupt(); 22 try { 23 beforeExecute(wt, task); 24 Throwable thrown = null; 25 try { 26 task.run(); 27 } catch (RuntimeException x) { 28 thrown = x; throw x; 29 } catch (Error x) { 30 thrown = x; throw x; 31 } catch (Throwable x) { 32 thrown = x; throw new Error(x); 33 } finally { 34 afterExecute(task, thrown); 35 } 36 } finally { 37 task = null; 38 w.completedTasks++; 39 w.unlock(); 40 } 41 } 42 completedAbruptly = false; 43 } finally { 44 processWorkerExit(w, completedAbruptly); 45 } 46 }
4.7 getTask方法
getTask方法用来从阻塞队列中取等待的任务
1 private Runnable getTask() { 2 boolean timedOut = false; // Did the last poll() time out? 3 4 for (;;) { 5 int c = ctl.get(); 6 int rs = runStateOf(c); 7 8 // Check if queue empty only if necessary. 9 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { 10 decrementWorkerCount(); 11 return null; 12 } 13 14 int wc = workerCountOf(c); 15 16 // Are workers subject to culling? 17 // timed变量用于判断是否须要进行超时控制。 18 // allowCoreThreadTimeOut默认是false,也就是核心线程不容许进行超时; 19 // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量; 20 // 对于超过核心线程数量的这些线程,须要进行超时控制 21 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; 22 23 if ((wc > maximumPoolSize || (timed && timedOut)) 24 && (wc > 1 || workQueue.isEmpty())) { 25 if (compareAndDecrementWorkerCount(c)) 26 return null; 27 continue; 28 } 29 30 try { 31 /* 32 * 根据timed来判断,若是为true,则经过阻塞队列的poll方法进行超时控制,若是在keepAliveTime时间内没有获取到任务,则返回null; 33 * 不然经过take方法,若是这时队列为空,则take方法会阻塞直到队列不为空。 34 * 35 */ 36 Runnable r = timed ? 37 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 38 workQueue.take(); 39 if (r != null) 40 return r; 41 timedOut = true; 42 } catch (InterruptedException retry) { 43 timedOut = false; 44 } 45 } 46 }
1 public class Test{ 2 3 public static void main(String[] args) { 4 5 ExecutorService es = Executors.newCachedThreadPool(); 6 Future<String> future = es.submit(new Callable<String>() { 7 @Override 8 public String call() throws Exception { 9 try { 10 TimeUnit.SECONDS.sleep(2); 11 } catch (InterruptedException e) { 12 e.printStackTrace(); 13 } 14 return "future result"; 15 } 16 }); 17 try { 18 String result = future.get(); 19 System.out.println(result); 20 } catch (Exception e) { 21 e.printStackTrace(); 22 } 23 } 24 }
在实际业务场景中,Future和Callable基本是成对出现的,Callable负责产生结果,Future负责获取结果。
5.1 submit方法
AbstractExecutorService.submit()实现了ExecutorService.submit(),能够得到执行完的返回值。而ThreadPoolExecutor是AbstractExecutorService的子类,因此submit方法也是ThreadPoolExecutor的方法。
1 public Future<?> submit(Runnable task) { 2 if (task == null) throw new NullPointerException(); 3 RunnableFuture<Void> ftask = newTaskFor(task, null); 4 execute(ftask); 5 return ftask; 6 } 7 public <T> Future<T> submit(Runnable task, T result) { 8 if (task == null) throw new NullPointerException(); 9 RunnableFuture<T> ftask = newTaskFor(task, result); 10 execute(ftask); 11 return ftask; 12 } 13 public <T> Future<T> submit(Callable<T> task) { 14 if (task == null) throw new NullPointerException(); 15 RunnableFuture<T> ftask = newTaskFor(task); 16 execute(ftask); 17 return ftask; 18 }
经过submit方法提交的Callable或者Runnable任务会被封装成了一个FutureTask对象。经过Executor.execute方法提交FutureTask到线程池中等待被执行,最终执行的是FutureTask的run方法。
5.2 FutureTask对象
类图
内部状态
/** *... * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
内部状态的修改经过sun.misc.Unsafe修改。
get方法
1 public V get() throws InterruptedException, ExecutionException { 2 int s = state; 3 if (s <= COMPLETING) 4 s = awaitDone(false, 0L); 5 return report(s); 6 }
内部经过awaitDone方法对主线程进行阻塞,具体实现以下:
1 /** 2 * Awaits completion or aborts on interrupt or timeout. 3 * 4 * @param timed true if use timed waits 5 * @param nanos time to wait, if timed 6 * @return state upon completion 7 */ 8 private int awaitDone(boolean timed, long nanos) 9 throws InterruptedException { 10 final long deadline = timed ? System.nanoTime() + nanos : 0L; 11 WaitNode q = null; 12 boolean queued = false; 13 for (;;) { 14 if (Thread.interrupted()) { 15 removeWaiter(q); 16 throw new InterruptedException(); 17 } 18 19 int s = state; 20 if (s > COMPLETING) { 21 if (q != null) 22 q.thread = null; 23 return s; 24 } 25 else if (s == COMPLETING) // cannot time out yet 26 Thread.yield(); 27 else if (q == null) 28 q = new WaitNode(); 29 else if (!queued) 30 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, 31 q.next = waiters, q); 32 else if (timed) { 33 nanos = deadline - System.nanoTime(); 34 if (nanos <= 0L) { 35 removeWaiter(q); 36 return state; 37 } 38 LockSupport.parkNanos(this, nanos); 39 } 40 else 41 LockSupport.park(this); 42 } 43 }
run方法
1 public void run() { 2 if (state != NEW || 3 !UNSAFE.compareAndSwapObject(this, runnerOffset, 4 null, Thread.currentThread())) 5 return; 6 try { 7 Callable<V> c = callable; 8 if (c != null && state == NEW) { 9 V result; 10 boolean ran; 11 try { 12 result = c.call(); 13 ran = true; 14 } catch (Throwable ex) { 15 result = null; 16 ran = false; 17 setException(ex); 18 } 19 if (ran) 20 set(result); 21 } 22 } finally { 23 // runner must be non-null until state is settled to 24 // prevent concurrent calls to run() 25 runner = null; 26 // state must be re-read after nulling runner to prevent 27 // leaked interrupts 28 int s = state; 29 if (s >= INTERRUPTING) 30 handlePossibleCancellationInterrupt(s); 31 } 32 }
FutureTask.run方法是在线程池中被执行的,而非主线程
Exectors工厂类提供了线程池的初始化接口,主要有以下几种:
newFixedThreadPool
1 public static ExecutorService newFixedThreadPool(int nThreads) { 2 return new ThreadPoolExecutor(nThreads, nThreads, 3 0L, TimeUnit.MILLISECONDS, 4 new LinkedBlockingQueue<Runnable>()); 5 }
建立一个固定大小、任务队列容量无界(Integer.MAX_VALUE)的线程池,其中corePoolSize =maximumPoolSize=nThreads,阻塞队列为LinkedBlockingQuene。
注意点:
corePoolSize
,这致使了maximumPoolSize
和keepAliveTime
将会是个无用参数 ;newSingleThreadExecutor
1 public static ExecutorService newSingleThreadExecutor() { 2 return new FinalizableDelegatedExecutorService 3 (new ThreadPoolExecutor(1, 1, 4 0L, TimeUnit.MILLISECONDS, 5 new LinkedBlockingQueue<Runnable>())); 6 }
只有一个线程来执行无界任务队列的单一线程池。若是该线程异常结束,会从新建立一个新的线程继续执行任务,惟一的线程能够保证所提交任务的顺序执行。因为使用了无界队列, 因此SingleThreadPool永远不会拒绝,即饱和策略失效。与newFixedThreadPool(1)的区别在于单一线程池的大小不能再改变。
newCachedThreadPool
1 public static ExecutorService newCachedThreadPool() { 2 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 3 60L, TimeUnit.SECONDS, 4 new SynchronousQueue<Runnable>()); 5 }
建立一个大小无界的缓冲线程池。任务队列是一个同步队列。缓冲线程池适用于执行耗时较小的异步任务。池的核心线程数=0 最大线程数=Integer.MAX_VLUE。与前两种稍微不一样的是:
newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
能定时执行任务的线程,池的核心线程数由参数指定。和前面3个线程池基于ThreadPoolExecutor类实现不一样的是,它基于ScheduledThreadPoolExecutor实现。
可使用ThreadPoolExecutor如下方法: