Java是天生就支持并发的语言,支持并发意味着多线程,线程的频繁建立在高并发及大数据量是很是消耗资源的,由于java提供了线程池。在jdk1.5之前的版本中,线程池的使用是及其简陋的,可是在JDK1.5后,有了很大的改善。JDK1.5以后加入了java.util.concurrent包,java.util.concurrent包的加入给予开发人员开发并发程序以及解决并发问题很大的帮助。这篇文章主要介绍下并发包下的Executor接口,Executor接口虽然做为一个很是旧的接口(JDK1.5 2004年发布),可是不少程序员对于其中的一些原理仍是不熟悉,所以写这篇文章来介绍下Executor接口,同时巩固下本身的知识。若是文章中有出现错误,欢迎你们指出。java
对于数据库链接,咱们常常听到数据库链接池这个概念。由于创建数据库链接时很是耗时的一个操做,其中涉及到网络IO的一些操做。所以就想出把链接经过一个链接池来管理。须要链接的话,就从链接池里取一个。当使用完了,就“关闭”链接,这不是正在乎义上的关闭,只是把链接放回到咱们的池里,供其余人在使用。因此对于线程,也有了线程池这个概念,其中的原理和数据库链接池是差很少的,所以java jdk中也提供了线程池的功能。程序员
线程池的做用:线程池就是限制系统中使用线程的数量以及更好的使用线程数据库
根据系统的运行状况,能够自动或手动设置线程数量,达到运行的最佳效果:配置少了,将影响系统的执行效率,配置多了,又会浪费系统的资源。用线程池配置数量,其余线程排队等候。当一个任务执行完毕后,就从队列中取一个新任务运行,若是没有新任务,那么这个线程将等待。若是来了一个新任务,可是没有空闲线程的话,那么把任务加入到等待队列中。缓存
为何要用线程池:网络
Java里面线程池的顶级接口是Executor,可是严格意义上讲Executor并非一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService。Executors类,提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。多线程
比较重要的几个类:并发
ExecutorService高并发 |
真正的线程池接口。工具 |
ScheduledExecutorService大数据 |
能和Timer/TimerTask相似,解决那些须要任务重复执行的问题。 |
ThreadPoolExecutor |
ExecutorService的默认实现。 |
ScheduledThreadPoolExecutor |
继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现。 |
要配置一个线程池是比较复杂的,尤为是对于线程池的原理不是很清楚的状况下,颇有可能配置的线程池不是较优的,所以在Executors类里面提供了一些静态工厂,生成一些经常使用的线程池。
1. newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
建立一个单线程的线程池。这个线程池只有一个线程在工做,也就是至关于单线程串行执行全部任务。若是这个惟一的线程由于异常结束,那么会有一个新的线程来替代它。此线程池保证全部任务的执行顺序按照任务的提交顺序执行。
2. newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
建立固定大小的线程池。每次提交一个任务就建立一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,在提交新任务,任务将会进入等待队列中等待。若是某个线程由于执行异常而结束,那么线程池会补充一个新线程。
3. newCachedThreadPool
public static ExecutorService newCachedThreadPool() public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
建立一个可缓存的线程池。若是线程池的大小超过了处理任务所须要的线程,
那么就会回收部分空闲(60秒处于等待任务到来)的线程,当任务数增长时,此线程池又能够智能的添加新线程来处理任务。此线程池的最大值是Integer的最大值(2^31-1)。
4. newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)
建立一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
实例:
注:使用了java8的lambda表达式以及stream
1.newSingleThreadExecutor
public class SingleThreadExecutorTest { public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
IntStream.range(0, 5).forEach(i -> executor.execute(() -> { String threadName = Thread.currentThread().getName(); System.out.println("finished: " + threadName); })); try { //close pool executor.shutdown(); executor.awaitTermination(5, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } finally { if (!executor.isTerminated()) { executor.shutdownNow(); } } } }
输出结果:
finished: pool-1-thread-1 finished: pool-1-thread-1 finished: pool-1-thread-1 finished: pool-1-thread-1 finished: pool-1-thread-1
线程名都同样,说明是同一个线程
2.newFixedThreadPool
public class FixedThreadExecutorTest { public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(3); IntStream.range(0, 6).forEach(i -> executor.execute(() -> { try { TimeUnit.SECONDS.sleep(1); String threadName = Thread.currentThread().getName(); System.out.println("finished: " + threadName); } catch (InterruptedException e) { e.printStackTrace(); } })); //close pool 同上... } }
输出结果:
finished: pool-1-thread-2 finished: pool-1-thread-3 finished: pool-1-thread-1 finished: pool-1-thread-1 finished: pool-1-thread-2 finished: pool-1-thread-3
只建立了三个线程
3.newCachedThreadPool
public class CachedThreadExecutorTest { public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); IntStream.range(0, 6).forEach(i -> executor.execute(() -> { try { TimeUnit.SECONDS.sleep(1); String threadName = Thread.currentThread().getName(); System.out.println("finished: " + threadName); } catch (InterruptedException e) { e.printStackTrace(); } }));
//close pool 同上... } }
输出结果:
finished: pool-1-thread-1 finished: pool-1-thread-2 finished: pool-1-thread-3 finished: pool-1-thread-6 finished: pool-1-thread-5 finished: pool-1-thread-4
建立了6个线程
4.newScheduledThreadPool
public class ScheduledThreadExecutorTest { public static void main(String[] args) { ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); executor.scheduleAtFixedRate(() -> System.out.println(System.currentTimeMillis()), 1000, 2000, TimeUnit.MILLISECONDS); //close pool 同上 } }
输出结果:
1457967177735 1457967179736 1457967181735
java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,所以若是要透彻地了解Java中的线程池,必须先了解这个类。下面咱们来看一下ThreadPoolExecutor类的具体实现源码。
在ThreadPoolExecutor类中提供了四个构造方法:
public class ThreadPoolExecutor extends AbstractExecutorService { ..... public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler); ... }
从上面的代码能够得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,经过观察每一个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工做。
ThreadPoolExecutor的完整构造方法的签名是:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
TimeUnit.DAYS; //天 TimeUnit.HOURS; //小时 TimeUnit.MINUTES; //分钟 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //纳秒
ArrayBlockingQueue; //有界队列 LinkedBlockingQueue; //无界队列 SynchronousQueue; //特殊的一个队列,只有存在等待取出的线程时才能加入队列,能够说容量为0,是无界队列
ArrayBlockingQueue和PriorityBlockingQueue使用较少,通常使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,可是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,而后从新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
ThreadPoolExecutor是Executors类的底层实现。
在JDK帮助文档中,有如此一段话:
“强烈建议程序员使用较为方便的 Executors
工厂方法 Executors.newCachedThreadPool()
(无界线程池,能够进行自动线程回收)、Executors.newFixedThreadPool(int)
(固定大小线程池)Executors.newSingleThreadExecutor()
(单个后台线程)
它们均为大多数使用场景预约义了设置。”
下面介绍一下几个类的源码:
ExecutorService newFixedThreadPool (int nThreads):固定大小线程池。
能够看到,corePoolSize和maximumPoolSize的大小是同样的(实际上,后面会介绍,若是使用无界queue的话 maximumPoolSize参数是没有意义的),keepAliveTime和unit的设值代表什么?-就是该实现不想keep alive!最后的BlockingQueue选择了LinkedBlockingQueue,该queue有一个特色,他是无界的。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
ExecutorService newSingleThreadExecutor():单线程
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
ExecutorService newCachedThreadPool():无界线程池,能够进行自动线程回收
这个实现就有意思了。首先是无界的线程池,因此咱们能够发现maximumPoolSize为big big。其次BlockingQueue的选择上使用SynchronousQueue。可能对于该BlockingQueue有些陌生,简单说:该 QUEUE中,每一个插入操做必须等待另外一个线程的对应移除操做。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
从上面给出的ThreadPoolExecutor类的代码能够知道,ThreadPoolExecutor继承了AbstractExecutorService,AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。
public abstract class AbstractExecutorService implements ExecutorService
而ExecutorService又是继承了Executor接口
public interface ExecutorService extends Executor
咱们看一下Executor接口的实现:
public interface Executor { void execute(Runnable command); }
到这里,你们应该明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系了。
Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思能够理解,就是用来执行传进去的任务的;
而后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的全部方法;
而后ThreadPoolExecutor继承了类AbstractExecutorService。
在ThreadPoolExecutor类中有几个很是重要的方法:
public void execute(Runnable command) public <T> Future<T> submit(Callable<T> task) public void shutdown() public List<Runnable> shutdownNow() //返回未执行的任务
execute()方法其实是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,经过这个方法能够向线程池提交一个任务,交由线程池去执行。
submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中 并无对其进行重写,这个方法也是用来向线程池提交任务的,可是它和execute()方法不一样,它可以返回任务执行的结果,去看submit()方法的 实现,会发现它实际上仍是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。
shutdown()和shutdownNow()是用来关闭线程池的。