一.类图说明缓存
Executor接口类,执行Runnable接口execute。函数
ExecutorService接口类继承Executor接口,包含提交执行Runnable和Callable接口submit以及shutdown,invokeAll接口。this
ScheduledExecutorService接口类继承ExecutorService接口,主要包含计划执行接口schedule,scheduleAtFixedRate以及scheduleWithFixedDelay。spa
虚类AbstractExecutorService,实现ExecutorService接口。实现ExecutorService相关接口。线程
ThreadPoolExecutor类,继承虚类AbstractExecutorService。实现初始化线程池数量、策略并执行线程。code
ScheduledThreadPoolExecutor类,继承ThreadPoolExecutor类并实现ScheduledExecutorService接口。实现计划执行相关接口。对象
执行类,定义ThreadPoolExecutor和ScheduledThreadPoolExecutor类,并使用相关concurrent类方法。继承
二.简述下从定义线程池,到其底层执行的简单流程接口
ExecutorService executorService = Executors.newFixedThreadPool(3); executorService.submit(new NewTask());
简单定义了如上两句代码,JDK自动建立3个固定大小的线程的线程池,submit实现了Runnable接口的NewTask对象之后,JDK自动启动线程并执行NewTask对象的run方法。流程是如何的呢?队列
1.Executors的newFixedThreadPool方法new了一个ThreadPoolExecutor对象,且new了一个LinkedBlockingQueue对象。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
2.调用ThreadPoolExecutor的构造函数,其中ThreadFactory默认使用defaultThreadFactory,defaultHandler为AbortPolicy。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
3.初始化之后,调用AbstractExecutorService类的submit方法,执行Runnable对象。
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
4.RunnableAdapter实现了Callable接口,FutureTask类包含callable类成员对象。
FutureTask实现RunnableFuture接口,RunnableFuture继承Runnable接口。因此newTaskFor(task, null)方法返回一个FutureTask对象。
public class FutureTask<V> implements RunnableFuture<V> public interface RunnableFuture<V> extends Runnable, Future<V> RunnableFuture<Void> ftask = newTaskFor(task, null); protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable } public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); } static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
5.关键的调用ThreadPoolExecutor类execute方法,管理线程池并执行Runnable任务。主要逻辑以下:
1)若是线程数量小于corePoolSize,新来一个任务,建立一个新线程。
2)若是线程数量等于corePoolSize,则将任务缓存到workQueue中。
3)若是线程数量等于corePoolSize而且workQueue队列已满,则继续建立线程直到等于maximumPoolSize。
4)若是线程数量等于maximumPoolSize,且workQueue队列已满,则根据defaultHandler策略执行相应措施。默认是AbortPolicy,抛出一个运行时异常RejectedExecutionException。另外还有3种策略:CallerRunsPolicy->若是线程池没有shutdown,则直接调用Runnable的run方法执行。若是线程池shutdown,则直接丢弃;DiscardPolicy->直接丢弃,没有任何异常;DiscardOldestPolicy->丢弃最老的任务,并调用线程池的execute方法执行,若是线程池没有shutdown。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }