提交一个线程到线程池:java
1.判断核心线程是否都在执行任务,若是有空闲的或者没有被建立的,则建立去执行新的工做线程执行任务,若是核心线程都在执行,则下一步;缓存
2.线程池判断工做队列是否已满,没有满,则提交任务到工做等待队列,若是工做等待队列已满,则下一步;bash
3.判断线程池里的全部工做线程是否都在工做,若是没有,则建立一个新的工做线程执行任务,若是已满,则根据饱和策略处理任务。服务器
饱和执行策略:并发
1.AbortPolicy 拒绝新任务,直接抛出异常异步
2.DiscardPolicy 直接丢弃任务spa
3.DiscardOldestPolicy 丢弃队列中最老的任务 先将阻塞队列中的头元素出队抛弃,再尝试提交任务。线程
4.CallerRunsPolicy 将任务丢给调用线程执行rest
接口继承关系code
Java API对ExecutorService接口的实现有两个,因此这两个便是Java线程池具体实现类:
1. ThreadPoolExecutor 2. ScheduledThreadPoolExecutor
ExecutorService还继承了Executor
接口(注意区分Executor接口和Executors工厂类),这个接口只有一个execute()
方法,最后咱们看一下整个继承树
ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { }
参数详情:
corePoolSize 核心线程池大小,在刚开始建立线程池时,核心线程建立完不会当即启动,有任务提交时才会启动达到核心线程数大小,若是一开始就要建立能够调用prestartAllCoreThreads。
maximumPoolSize 容许的最大线程数量,当核心线程满和阻塞队列满时才会判断最大线程数量,决定是否建立普通线程。
keepAliveTime 线程数大于核心线程数时,多余空闲线程存活时间
unit keepAliveTime的时间单位
workQueue 线程数量超过核心线程数时用于保存任务,包含三种类型的BlockingQueue:有界队列,无界队列,同步移交。
可选择的BlockingQueue:
1.LinkedBlockingQueue 无界队列,遵循先进先出,使用该队列作为阻塞队列时要尤为小心,当任务耗时较长时可能会致使大量新任务在队列中堆积最终致使OOM。阅读代码发现,Executors.newFixedThreadPool 采用就是 LinkedBlockingQueue,当QPS很高,发送数据很大,大量的任务被添加到这个无界LinkedBlockingQueue 中,致使cpu和内存飙升服务器挂掉。
2.ArrayBlockingQueue 有界队列 遵循先进先出,
3.PriorityBlockingQueue 优先级队列,任务的优先级由Comparator决定。
4.SychronousQueue 将任务直接移交给工做线程,不是一个真正的队列,是一种线程移交机制。必须有另外一个线程正在等待接收这个元素。只有在使用无界线程池或者有饱和策略时才建议使用该队列。
threadPoolFactory 建立新线程时使用的工厂类
handler 阻塞队列已满,且线程数达到最大线程后所采起的饱和策略
Java给咱们提供了一个Executors工厂类
,它能够帮助咱们很方便的建立各类类型ExecutorService线程池,Executors一共能够建立下面这四类线程池:
newCachedThreadPool:建立一个可缓存线程池,若是线程池长度超过处理须要,可灵活回收空闲线程,若无可回收,则新建线程。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
初始化时核心线程数为0,若是线程池长度超过处理须要,可当即回收空闲线程,无可回收则新建。
要将一个元素放入SynchronousQueue中,必须有另外一个线程正在等待接收这个元素。所以即使SynchronousQueue一开始为空且大小为1,第一个任务也没法放入其中,由于没有线程在等待从SynchronousQueue中取走元素。所以第一个任务到达时便会建立一个新线程执行该任务。
newFixedThreadPool:建立一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
固定大小线程数,使用无限大的LinkedBlockingQueue存听任务。
newScheduledThreadPool:建立一个定长线程池,支持定时及周期性任务执行。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
DelayedWorkQueue是一个无界阻塞队列,是ScheduledThreadPoolExecutor的静态内部类
newSingleThreadExecutor:建立一个单线程化的线程池,它只会用惟一的工做线程来执行任务,保证全部任务按照指定顺序(FIFO, LIFO, 优先级)执行。
public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); }
首先new了一个线程数目为 1 的ScheduledThreadPoolExecutor,再把该对象传入DelegatedScheduledExecutorService中,看看DelegatedScheduledExecutorService的实现代码:
DelegatedScheduledExecutorService(ScheduledExecutorService executor) { super(executor); e = executor; } #父类: DelegatedExecutorService(ExecutorService executor) { e = executor; }
其实就是使用装饰模式加强了ScheduledExecutorService(1)的功能,不只确保只有一个线程顺序执行任务,也保证线程意外终止后会从新建立一个线程继续执行任务。
Executors只是一个工厂类,它全部的方法返回的都是ThreadPoolExecutor
、ScheduledThreadPoolExecutor
这两个类的实例。
ExecutorService executorService = Executors.newFixedThreadPool(10); executorService.execute(new Runnable() { public void run() { System.out.println("Asynchronous task"); } }); executorService.shutdown();
ExecutorService有以下几个执行方法:
- execute(Runnable) - submit(Runnable) - submit(Callable) - invokeAny(...) - invokeAll(...)
这个方法接收一个Runnable实例,而且异步的执行,请看下面的实例:
ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.execute(new Runnable() { public void run() { System.out.println("Asynchronous task"); } }); executorService.shutdown();
这个方法有个问题,就是没有办法获知task的执行结果。若是咱们想得到task的执行结果,咱们能够传入一个Callable的实例。
submit(Runnable)
和execute(Runnable)
区别是前者能够返回一个Future对象,经过返回的Future对象,咱们能够检查提交的任务是否执行完毕,请看下面执行的例子:
Future future = executorService.submit(new Runnable() { public void run() { System.out.println("Asynchronous task"); } }); future.get(); //returns null if the task has finished correctly.
若是任务执行完成,future.get()
方法会返回一个null。注意,future.get()方法会产生阻塞。
submit(Callable)
和submit(Runnable)
相似,也会返回一个Future对象,可是除此以外,submit(Callable)接收的是一个Callable的实现,Callable接口中的call()
方法有一个返回值,能够返回任务的执行结果,而Runnable接口中的run()
方法是void
的,没有返回值。请看下面实例:
Future future = executorService.submit(new Callable(){ public Object call() throws Exception { System.out.println("Asynchronous Callable"); return "Callable Result"; } }); System.out.println("future.get() = " + future.get());
若是任务执行完成,future.get()方法会返回Callable任务的执行结果。注意,future.get()方法会产生阻塞。
invokeAny(...)
方法接收的是一个Callable的集合,执行这个方法不会返回Future,可是会返回全部Callable任务中其中一个任务的执行结果。这个方法也没法保证返回的是哪一个任务的执行结果,反正是其中的某一个。请看下面实例:
ExecutorService executorService = Executors.newSingleThreadExecutor(); Set<Callable<String>> callables = new HashSet<Callable<String>>(); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 1"; } }); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 2"; } }); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 3"; } }); String result = executorService.invokeAny(callables); System.out.println("result = " + result); executorService.shutdown();
invokeAll(...)
与 invokeAny(...)
相似也是接收一个Callable集合,可是前者执行以后会返回一个Future的List,其中对应着每一个Callable任务执行后的Future对象。状况下面这个实例
ExecutorService executorService = Executors.newSingleThreadExecutor(); Set<Callable<String>> callables = new HashSet<Callable<String>>(); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 1"; } }); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 2"; } }); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 3"; } }); List<Future<String>> futures = executorService.invokeAll(callables); for(Future<String> future : futures){ System.out.println("future.get = " + future.get()); } executorService.shutdown();
当咱们使用完成ExecutorService以后应该关闭它,不然它里面的线程会一直处于运行状态。
举个例子,若是的应用程序是经过main()方法启动的,在这个main()退出以后,若是应用程序中的ExecutorService没有关闭,这个应用将一直运行。之因此会出现这种状况,是由于ExecutorService中运行的线程会阻止JVM关闭。
若是要关闭ExecutorService中执行的线程,咱们能够调用ExecutorService.shutdown()
方法。在调用shutdown()方法以后,ExecutorService不会当即关闭,可是它再也不接收新的任务,直到当前全部线程执行完成才会关闭,全部在shutdown()执行以前提交的任务都会被执行。
若是咱们想当即关闭ExecutorService,咱们能够调用ExecutorService.shutdownNow()
方法。这个动做将跳过全部正在执行的任务和被提交尚未执行的任务。可是它并不对正在执行的任务作任何保证,有可能它们都会中止,也有可能执行完成。
正确结束线程应该在线程内先使用break结束任务。