java.util.concurrent.Executor : 负责线程的使用与调度的根接口
|–ExecutorService:Executor的子接口,线程池的主要接口
|–ThreadPoolExecutor:ExecutorService的实现类
|–ScheduledExecutorService:ExecutorService的子接口,负责线程的调度
|–ScheduledThreadPoolExecutor:继承了ThreadPoolExecutor实现了ScheduledExecutorServicejava
其余的建立完线程池后,使用 threadPool.execute(new Runnable())方式执行任务。数组
public static void main(String[] args) { ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); // 表示延迟3秒执行 scheduledThreadPool.schedule(new Runnable() { public void run() { System.out.println("delay 3 seconds"); } }, 3, TimeUnit.SECONDS); } // 表示延迟1秒后每3秒执行一次 scheduledThreadPool.scheduleAtFixedRate(new Runnable() { public void run() { System.out.println("delay 1 seconds, and excute every 3 seconds"); } }, 1, 3, TimeUnit.SECONDS); }
查看Executors源码咱们知道,Executors 类提供了使用了 ThreadPoolExecutor 的简单的 ExecutorService 实现,也就是上面所说的四种Executors线程池,可是 ThreadPoolExecutor 提供的功能远不止于此。
不过在java doc中,并不提倡咱们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来建立线程池
咱们能够在建立 ThreadPoolExecutor 实例时指定活动线程的数量,咱们也能够限制线程池的大小而且建立咱们本身的 RejectedExecutionHandler 实现来处理不能适应工做队列的工做。
下面咱们就先了解一下ThreadPoolExecutor,而后在看个示例代码。缓存
Executors 源码:并发
public class Executors { public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } public static ExecutorService newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); } public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); } public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1, threadFactory)); } public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); } }
java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,所以若是要透彻地了解Java中的线程池,必须先了解这个类。下面咱们来看一下ThreadPoolExecutor类的具体实现源码。ide
在ThreadPoolExecutor类中提供了四个构造方法:ui
public class ThreadPoolExecutor extends AbstractExecutorService { ..... public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler); ... }
上面的代码能够得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,经过观察每一个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工做。this
构造器中各个参数的含义:atom
一、Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思能够理解,就是用来执行传进去的任务的;
二、而后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
三、抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的全部方法;
四、而后ThreadPoolExecutor继承了类AbstractExecutorService。spa
一、execute()方法其实是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,经过这个方法能够向线程池提交一个任务,交由线程池去执行。
二、submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并无对其进行重写,这个方法也是用来向线程池提交任务的,可是它和execute()方法不一样,它可以返回任务执行的结果,去看submit()方法的实现,会发现它实际上仍是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。
三、shutdown()和shutdownNow()是用来关闭线程池的。
四、还有一大波get的方法, 能够获取与线程池相关属性的方法。线程
volatile int runState; // 前线程池的状态,它是一个volatile变量用来保证线程之间的可见性 static final int RUNNING = 0; // 当建立线程池后,初始时,线程池处于RUNNING状态 static final int SHUTDOWN = 1; 若是调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不可以接受新的任务,它会等待全部任务执行完毕 static final int STOP = 2; // 若是调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,而且会去尝试终止正在执行的任务; static final int TERMINATED = 3; // 当线程池处于SHUTDOWN或STOP状态,而且全部工做线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。
private final BlockingQueue<Runnable> workQueue; //任务缓存队列,用来存放等待执行的任务 private final ReentrantLock mainLock = new ReentrantLock(); //线程池的主要状态锁,对线程池状态(好比线程池大小、runState等)的改变都要使用这个锁 private final HashSet<Worker> workers = new HashSet<Worker>(); //用来存放工做集 private volatile long keepAliveTime; //线程存货时间 private volatile boolean allowCoreThreadTimeOut; //是否容许为核心线程设置存活时间 private volatile int corePoolSize; //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列) private volatile int maximumPoolSize; //线程池最大能容忍的线程数, 当线程数大于corePoolSize时,建立新的先线程,可是建立新的线程数 + corePoolSize不能大于maximumPoolSize private volatile int poolSize; //线程池中当前的线程数 private volatile RejectedExecutionHandler handler; //任务拒绝策略 private volatile ThreadFactory threadFactory; //线程工厂,用来建立线程 private int largestPoolSize; //用来记录线程池中曾经出现过的最大线程数 private long completedTaskCount; //用来记录已经执行完毕的任务个数
ThreadPoolExecutor类中,最核心的任务提交方法是execute()方法
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
从代码注释,咱们知道:
addWorker() 添加任务, 建立Worker, Worker 继承 AbstractQueuedSynchronizer 实现 Runnable
addWorker()几个关键步骤:
w = new Worker(firstTask); final Thread t = w.thread; // 从worker取得线程 if (workerAdded) { t.start(); // worker添加成功,执行任务 workerStarted = true; }
默认状况下,建立线程池以后,线程池中是没有线程的,须要提交任务以后才会建立线程。
在实际中若是须要线程池建立以后当即建立线程,能够经过如下两个方法办到:
// 初始化一个核心线程; public boolean prestartCoreThread() { return workerCountOf(ctl.get()) < corePoolSize && addWorker(null, true); } // 初始化全部核心线程 public int prestartAllCoreThreads() { int n = 0; while (addWorker(null, true)) ++n; return n; }
workQueue的类型为BlockingQueue,一般能够取下面三种类型:
1)ArrayBlockingQueue:基于数组的先进先出队列,此队列建立时必须指定大小;
2)LinkedBlockingQueue:基于链表的先进先出队列,若是建立时没有指定此队列大小,则默认为Integer.MAX_VALUE;
3)synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
前面已经讲过, 当线程池的任务缓存队列已满而且线程池中的线程数目达到maximumPoolSize,若是还有任务到来就会采起任务拒绝策略,一般有如下四种策略:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,可是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,而后从新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
shutdown():不会当即终止线程池,而是要等全部任务缓存队列中的任务都执行完后才终止,但不再会接受新的任务
shutdownNow():当即终止线程池,并尝试打断正在执行的任务,而且清空任务缓存队列,返回还没有执行的任务
ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),
setCorePoolSize:设置核心池大小
setMaximumPoolSize:设置线程池最大能建立的线程数目大小
当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能当即建立新的线程来执行任务。
咱们能够在建立 ThreadPoolExecutor 实例时指定活动线程的数量,咱们也能够限制线程池的大小而且建立咱们本身的 RejectedExecutionHandler 实现来处理不能适应工做队列的工做。
public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(r.toString() + " is rejected"); } }
hreadPoolExecutor 提供了一些方法,咱们可使用这些方法来查询 executor 的当前状态,线程池大小,活动线程数量以及任务数量。所以我是用来一个监控线程在特定的时间间隔内打印 executor 信息。
MyMonitorThread.java
public class MyMonitorThread implements Runnable { private ThreadPoolExecutor executor; private int seconds; private boolean run=true; public MyMonitorThread(ThreadPoolExecutor executor, int delay) { this.executor = executor; this.seconds=delay; } public void shutdown(){ this.run=false; } @Override public void run() { while(run){ System.out.println( String.format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s", this.executor.getPoolSize(), this.executor.getCorePoolSize(), this.executor.getActiveCount(), this.executor.getCompletedTaskCount(), this.executor.getTaskCount(), this.executor.isShutdown(), this.executor.isTerminated())); try { Thread.sleep(seconds*1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
这里是使用 ThreadPoolExecutor 的线程池实现例子。
WorkerPool.java
public class WorkerPool { public static void main(String args[]) throws InterruptedException{ //RejectedExecutionHandler implementation RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl(); //Get the ThreadFactory implementation to use ThreadFactory threadFactory = Executors.defaultThreadFactory(); //creating the ThreadPoolExecutor ThreadPoolExecutor executorPool = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), threadFactory, rejectionHandler); //start the monitoring thread MyMonitorThread monitor = new MyMonitorThread(executorPool, 3); Thread monitorThread = new Thread(monitor); monitorThread.start(); //submit work to the thread pool for(int i=0; i<10; i++){ executorPool.execute(new WorkerThread("cmd"+i)); } Thread.sleep(30000); //shut down the pool executorPool.shutdown(); //shut down the monitor thread Thread.sleep(5000); monitor.shutdown(); } }
意在初始化 ThreadPoolExecutor 时,咱们保持初始池大小为 2,最大池大小为 4 而工做队列大小为 2。所以若是已经有四个正在执行的任务而此时分配来更多任务的话,工做队列将仅仅保留他们(新任务)中的两个,其余的将会被 RejectedExecutionHandlerImpl 处理。
遵循两原则: 一、若是是CPU密集型任务,就须要尽可能压榨CPU,参考值能够设为 NCPU+1 二、若是是IO密集型任务,参考值能够设置为2*NCPU 固然,这只是一个参考值,具体的设置还须要根据实际状况进行调整,好比能够先将线程池大小设置为参考值,再观察任务运行状况和系统负载、资源利用率来进行适当调整。