要求:线程资源必须经过线程池
提供,不容许在应用自行显式建立线程; 说明:使用线程池的好处是减小在建立和销毁线程上所花的时间以及系统资源的开销,解决资源不足的问题。若是不使用线程池,有可能形成系统建立大量同类线程而致使消耗内存或者“过分切换”的问题。java
by 《阿里巴巴Java手册》编程
线程池,顾名思义是一个放着线程的池子,这个池子的线程主要是用来执行任务的。当用户提交任务时,线程池会建立线程去执行任务,若任务超过了核心线程数的时候,会在一个任务队列里进行排队等待,这个详细流程,咱们会后面细讲。 任务,一般是一些抽象的且离散的工做单元,咱们会把应用程序的工做分解到多个任务中去执行。通常咱们须要使用多线程执行任务的时候,这些任务最好都是相互独立的,这样有必定的任务边界供程序把控。 多线程,当使用多线程的时候,任务处理过程就能够从主线程中剥离出来,任务能够并行处理,同时处理多个请求。固然了,任务处理代码必须是线程安全的。数组
一共有7个:corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、threadFactory、handler,(5+2,前5个重要)缓存
这边咱们区分两个概念:安全
当前线程总数< corePoolSize
,新建的线程即为核心线程。当前线程总数< corePoolSize
,新建的线程即为核心线程。核心线程默认状况下会一直存活在线程池中,即便这个核心线程不工做(空闲状态),除非ThreadPoolExecutor 的 allowCoreThreadTimeOut
这个属性为 true
,那么核心线程若是空闲状态下,超过必定时间后就被销毁。多线程
线程总数 = 核心线程数 + 非核心线程数并发
keepAliveTime即为空闲线程容许的最大的存活时间。若是一个非核心线程空闲状态的时长超过keepAliveTime了,就会被销毁掉。注意:若是设置allowCoreThreadTimeOut = true
,就变成核心线程超时销毁了。less
TimeUnit 是一个枚举类型,列举以下:性能
单位 | 说明 |
---|---|
NANOSECONDS | 1微毫秒 = 1微秒 / 1000 |
MICROSECONDS | 1微秒 = 1毫秒 / 1000 |
MILLISECONDS | 1毫秒 = 1秒 /1000 |
SECONDS | 秒 |
MINUTES | 分 |
HOURS | 小时 |
DAYS | 天 |
当核心线程都在工做的时候,新提交的任务就会被添加到这个工做阻塞队列中进行排队等待;若是阻塞队列也满了,线程池就新建非核心线程去执行任务。workQueue维护的是等待执行的Runnable对象。 经常使用的 workQueue 类型:(无界队列、有界队列、同步移交队列)ui
适用于很是大的或者无界的线程池,能够避免任务排队
,SynchronousQueue队列接收到任务后,会直接将任务从生产者移交给工做者线程
,这种移交机制高效。它是一种不存储元素的队列,任务不会先放到队列中去等线程来取,而是直接移交给执行的线程。只有当线程池是无界的或能够拒绝任务的时候,SynchronousQueue队列的使用才有意义,maximumPoolSize 通常指定成 Integer.MAX_VALUE,即无限大。要将一个元素放入SynchronousQueue,就须要有另外一个线程在等待接收这个元素。若没有线程在等待,而且线程池的当前线程数小于最大值,则ThreadPoolExecutor就会新建一个线程;不然,根据饱和策略,拒绝任务。newCachedThreadPool
默认使用的就是这种同步移交队列。吞吐量高于LinkedBlockingQueue。链表结构
的阻塞队列,FIFO原则排序
。当任务提交过来,若当前线程数小于corePoolSize核心线程数,则线程池新建核心线程去执行任务;若当前线程数等于corePoolSize核心线程数,则进入工做队列进行等待。LinkedBlockingQueue队列没有最大值限制,只要任务数超过核心线程数,都会被添加到队列中,这就会致使总线程数永远不会超过 corePoolSize
,因此maximumPoolSize 是一个无效设定。newFixedThreadPool
和newSingleThreadPool
默认是使用的是无界LinkedBlockingQueue队列
。吞吐量高于ArrayBlockingQueue。数组结构
的有界
阻塞队列,能够设置队列上限值,FIFO原则排序
。当任务提交时,若当前线程小于corePoolSize核心线程数,则新建核心线程执行任务;若当先线程数等于corePoolSize核心线程数,则进入队列排队等候;若队列的任务数也排满了,则新建非核心线程执行任务;若队列满了且总线程数达到了maximumPoolSize最大线程数,则根据饱和策略进行任务的拒绝。注意: 只有当任务相互独立没有任何依赖的时候,线程池或工做队列设置有界是合理的;若任务之间存在依赖性,须要使用无界的线程池,如newCachedThreadPool,不然有可能会致使死锁问题。
建立线程的方式,这是一个接口,你 new 他的时候须要实现他的 Thread newThread(Runnable r) 方法,通常用不上,
抛出异常专用,当队列和最大线程池都满了以后的饱和策略。
通常流程即为:建立worker线程;添加任务入workQueue队列;worker线程执行任务。
未达到 corePoolSize
,则新建一个线程(核心线程)
执行任务达到了 corePoolSize
,则将任务移入阻塞队列等待
,让空闲线程处理;队列已满
,新建线程(非核心线程)
执行任务总线程数又达到了 maximumPoolSize
,就会按照拒绝策略处理没法执行的任务,好比RejectedExecutionHandler抛出异常。这边,为了你们可以更好的去理解这块的流程,咱们举一个例子。生活中咱们常常会去打一些公司的咨询电话或者是一些特定机构的投诉电话,而那个公司或者机构的客服中心就是一个线程池
,正式员工的客服小姐姐就比如是核心线程
,好比有6个客服小姐姐。 5. 当用户的电话打进到公司的客服中心的时候(提交任务)
; 6. 客服中心会调度客服小姐姐去接听电话(建立线程执行任务)
,若是接听的电话超过了6个,6个客服小姐姐都在接听的工做状态了(核心线程池满了)
,这时客服中心会有一个电话接听等待通道(进入任务队列等待)
,就是咱们常常听到的“您的通话在排队,前面排队n人。” 7. 固然,这个电话接听等待通道也是有上限的,当超过这个上限的时候(任务队列满了)
,客服中心就会当即安排外协员工(非核心线程)
,也就是非正式员工去接听额外的电话(任务队列满了,正式和非正式员工数量>总任务数,线程池建立非核心线程去执行任务)
。 8. 当用户电话数激增,客服中心控制台发现这个时候正式员工和外协员工的总和已经知足不了这些用户电话接入了(总线程池满)
,就开始根据一些公司电话接听规则去拒绝这些电话(按照拒绝策略处理没法执行的任务)
terminated()
后会更新为这个状态。ThreadPoolExecutor
,在java.util.concurrent下。
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */
public ThreadPoolExecutor(int corePoolSize, //核心线程数 int maximumPoolSize, //最大线程数 long keepAliveTime, //空闲线程存活时间 TimeUnit unit, //存活时间单位 BlockingQueue<Runnable> workQueue, //任务的阻塞队列 ThreadFactory threadFactory, //新线程的产生方式 RejectedExecutionHandler handler //拒绝策略) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
复制代码
ThreadPoolExecutor 继承 AbstractExecutorService;AbstractExecutorService 实现 ExecutorService, ExecutorService 继承 Executor
public class ThreadPoolExecutor extends AbstractExecutorService {}
public abstract class AbstractExecutorService implements ExecutorService {}
public interface ExecutorService extends Executor {}
复制代码
1)5参数构造器
// 5参数构造器
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) 复制代码
2)6参数构造器-1
// 6参数构造器-1
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) 复制代码
3)6参数构造器-2
// 6参数构造器-2
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) 复制代码
4)7参数构造器
// 7参数构造器
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 复制代码
//建立固定数目线程的线程池
Executors.newFixedThreadPool(200);
//建立一个无限线程的线程池,无需等待队列,任务提交即执行
Executors.newCachedThreadPool()
//建立有且仅有一个线程的线程池
Executors.newSingleThreadExecutor();
复制代码
newCachedThreadPool将建立一个可缓存的线程,若是当前线程数超过处理任务时,回收空闲线程;当需求增长时,能够添加新线程去处理任务。
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
复制代码
newFixedThreadPool建立一个固定长度的线程池,每次提交一个任务的时候就会建立一个新的线程,直到达到线程池的最大数量限制。
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(int nThreads);
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
复制代码
newScheduledThreadPool建立一个固定长度的线程池,而且以延迟或者定时的方式去执行任务。
ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(int corePoolSize);
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
复制代码
newSingleThreadExecutor顾名思义,是一个单线程的Executor,只建立一个工做线程执行任务,若这个惟一的线程异常故障了,会新建另外一个线程来替代,newSingleThreadExecutor能够保证任务依照在工做队列的排队顺序来串行执行。
ExecutorService singleThreadPool = Executors.newSingleThreadPool();
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
static class FinalizableDelegatedExecutorService extends DelegatedExecutorService {
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
protected void finalize() {
super.shutdown();
}
}
复制代码
ThreadPoolExecutor.execute(Runnable command)方法,便可向线程池内添加一个任务
/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */
//获取当前线程池的状态
int c = ctl.get();
//若当前线程数量小于corePoolSize,则建立一个新的线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//判断当前线程是否处于运行状态,且写入任务阻塞队列是否成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再次获取线程状态进行双重检查;若是线程变成非运行状态,则从阻塞队列移除任务;
if (! isRunning(recheck) && remove(command))
//执行拒绝策略
reject(command);
//若当前线程池为空,则新建一个线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//当前线程为非运行状态而且尝试新建线程,若失败则执行拒绝策略。
else if (!addWorker(command, false))
reject(command);
}
复制代码
1)若当前线程数小于corePoolSize
,则调用addWorker()方法建立线程执行任务。 2)若当前线程不小于corePoolSize
,则将任务添加到workQueue队列,等待空闲线程来执行。 3)若队列里的任务数到达上限
,且当前运行线程小于maximumPoolSize
,任务入workQueue队列失败,新建线程执行任务; 4)若建立线程也失败(队列任务数到达上限
,且当前线程数达到了maximumPoolSize
),对于新加入的任务,就会调用reject()(内部调用handler)拒绝接受任务。
ThreadPoolExecutor的饱和策略能够经过调用setRejectedExecutionHandler
来修改。JDK提供了几种不一样的RejectedExecutionHandler实现,每种实现都包含有不一样的饱和策略:AbortPolicy、CallerRunsPolicy、DiscardPolicy和DiscardOldestPolicy。 拒绝策略以下:
RejectedExecutionHandler rejected = null;
//默认策略,阻塞队列满,则丢任务、抛出异常
rejected = new ThreadPoolExecutor.AbortPolicy();
//阻塞队列满,则丢任务,不抛异常
rejected = new ThreadPoolExecutor.DiscardPolicy();
//删除队列中最旧的任务(最先进入队列的任务),尝试从新提交新的任务
rejected = new ThreadPoolExecutor.DiscardOldestPolicy();
//队列满,不丢任务,不抛异常,若添加到线程池失败,那么主线程会本身去执行该任务
rejected = new ThreadPoolExecutor.CallerRunsPolicy();
复制代码
(1)AbortPolicy、DiscardPolicy和DiscardOldestPolicy AbortPolicy
是默认的饱和策略
,就是停止任务,该策略将抛出RejectedExecutionException。调用者能够捕获这个异常而后去编写代码处理异常。 当新提交的任务没法保存到队列
中等待执行时,DiscardPolicy
会悄悄的抛弃该任务
。 DiscardOldestPolicy
则会抛弃最旧的
(下一个将被执行的任务),而后尝试从新提交新的任务。若是工做队列是那个优先级队列时,搭配DiscardOldestPolicy饱和策略会致使优先级最高的那个任务被抛弃,因此二者不要组合使用。 (2)CallerRunsPolicy CallerRunsPolicy是“调用者运行”策略,实现了一种调节机制 。它不会抛弃任务
,也不会抛出异常
。 而是将任务回退到调用者
。它不会在线程池中
执行任务,而是在一个调用了execute的线程中
执行该任务。在线程满后,新任务将交由调用线程池execute方法的主线程执行,而因为主线程在忙碌,因此不会执行accept方法,从而实现了一种平缓的性能下降。 当工做队列被填满后,没有预约义的饱和策略来阻塞execute(除了抛弃就是停止还有去让调用者去执行)。然而能够经过Semaphore来限制任务的到达率。
参考 《Java并发编程实战》 jdk 1.8 源码包