首先是老规矩,推荐一下个人企鹅交流群:java
有兴趣交流springboot进行快速开发的同窗能够加一下下面的企鹅群。spring
普通线程编程
当多个资源须要开启线程来处理的时候,咱们怎么办?是否一直在重复下面的流程:数组
create -> run -> destroy
复制代码
咱们知道计算机的每次运行都是须要大量的资源消耗,5个线程的操做可能没有影响,5w个呢? 五万次建立和销毁才有仅仅五万次的执行吗?执行任务可能花费了大量的时间来处理这些建立和销毁。缓存
线程池安全
eg:超市结帐:收营员服务组,单个收营员,收银工做,等待被收银的人群springboot
在Executors中,jdk提供了一下相关的线程池,以下:bash
静态方法 | 建立的线程池类型 | 返回值的实际实现 |
---|---|---|
newFixedThreadPool(int) | 固定线程池 | ThreadPoolExecutor |
newWorkStealingPool() | 处理器核心数的并行线程池 | ForkJoinPool |
newSingleThreadExecutor() | 一个线程的单独线程池 | FinalizableDelegatedExecutorService |
newCachedThreadPool() | 缓存线程池 | ThreadPoolExecutor |
newSingleThreadScheduledExecutor() | 单独线程定时线程池 | DelegatedScheduledExecutorService |
newScheduledThreadPool(int) | 定时线程池 | ScheduledThreadPoolExecutor |
newSingleThreadExecutor() 一个线程的线程池this
为何这里我要拿一个线程的线程池来讲明呢?其实咱们把简单的搞定复杂的也是演变过来的。先上码:spa
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
复制代码
咱们能够看到上面方法的返回值都是ExecutorService,但实际上实例化的是FinalizableDelegatedExecutorService,咱们进去看看源码,以下:
static class FinalizableDelegatedExecutorService extends DelegatedExecutorService {
//构造方法
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
//对象销毁的时候调用
protected void finalize() {
super.shutdown();
}
}
复制代码
上面的代码咱们能够明显的看到FinalizableDelegatedExecutorService仅仅是对DelegatedExecutorService的封装,惟一实现的就是在对象销毁的时候将ExecutorService结束。
到这里咱们就应该返回来分析DelegatedExecutorService,以及上面的方法中的具体代码。
咱们看看默认的单线程线程池的实现,以下:
new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
//此处的代码实现了一个ExecutorService,分别有几个参数?何解?
//
public class ThreadPoolExecutor extends AbstractExecutorService {
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
}
//咱们能够看到几个参数的字面意思分别是:
//corePoolSize 核心线程数量,包括空闲线程
//maximumPoolSize 最大线程数量
//keepAliveTime 保持活跃时间(参照后续源码,这里应该是:当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间)
//unit keepAliveTime 参数的时间单位
//workQueue 执行前用于保持任务的队列。此队列仅保持由 execute方法提交的 Runnable任务
//Executors.defaultThreadFactory() 默认线程工厂
//defaultHandler 超出线程和任务队列的任务的处理程序,实现为:new AbortPolicy(),固然这里默认是没有处理的,须要咱们手动实现
//这里,咱们接着看默认的线程工厂,毕竟线程池核心是须要线程来执行任务,因此此处先看线程来源。
static class DefaultThreadFactory implements ThreadFactory {
//池数量,指定原子操做
private static final AtomicInteger poolNumber = new AtomicInteger(1);
//线程组
private final ThreadGroup group;
//线程数量,指定原子操做
private final AtomicInteger threadNumber = new AtomicInteger(1);
//线程名称前缀
private final String namePrefix;
DefaultThreadFactory() {
//获取系统安全管理器
SecurityManager s = System.getSecurityManager();
//建立线程组,由是否获取系统安全管理器决定
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
//构造线程名称
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
//建立线程
public Thread newThread(Runnable r) {
//将线程组、Runnable接口(线程实际执行代码块)、线程名、线程所须要的堆栈大小为0
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
//若是为守护线程,取消守护状态,必须在线程执行前调用这个setDaemon方法
if (t.isDaemon())
t.setDaemon(false);
//默认任务优先级,值为5
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
//上面的默认线程工厂,提供给了咱们一个非守护线程的线程,由原子操做保证线程惟一,任务优先级默认(最低1,最高10,默认5,此处优先级为5)
复制代码
看了上面这些咱们能够总结一下:单线程线程池,默认只有一个线程和一个线程池,等待新任务时间为0,添加了原子操做来绑定线程。
是否是到这里就完了? 固然没有,咱们如今须要看看更加具体的ThreadPoolExecutor,才能更加深刻明白线程池。
public class ThreadPoolExecutor extends AbstractExecutorService {
/** *全部的构造方法均指向这里,因此咱们看一下这个就足够 */
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
//参数检查,说明线程池不能线程=0,也不能最大线程数量不大于0切最大线程数量不能少于核心线程数量,等待任务最长时间不能小于0
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
//等待任务队列、线程工厂、超任务队列的处理程序
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
//上面的判断,能够看作是一种防护式编程,全部的问题预先处理,后续无需考虑相似问题
//构造线程池相关设定阈值
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
}
//到了这里其实咱们没必要先追究具体的实现,仍是先看看AbstractExecutorService吧。
//抽象的执行服务
public abstract class AbstractExecutorService implements ExecutorService {
//执行方法
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
//获取任务数量
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
//任务集合
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
//执行完成服务
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
try {
// 记录异常
ExecutionException ee = null;
//超时时间线
final long deadline = timed ? System.nanoTime() + nanos : 0L;
//使用迭代器获取任务
Iterator<? extends Callable<T>> it = tasks.iterator();
// 肯定开始一项任务
futures.add(ecs.submit(it.next()));
//任务数量减小
--ntasks;
//正在执行任务标志
int active = 1;
//循环执行任务
for (;;) {
//获取任务队列中第一个任务
Future<T> f = ecs.poll();
//任务为空,若是还有任务则执行任务(任务数量减1,提交任务到执行队列,正在执行任务数量+1)
//正在执行任务数为0,说明任务执行完毕,中断任务循环
//如有超时检查,则执行超时检查机制
//上述状况都不知足,则取出任务队列头,并将其从队列移除
if (f == null) {
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
else if (active == 0)
break;
else if (timed) {
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
else
f = ecs.take();
}
//任务不为空
if (f != null) {
//正在执行标志-1
--active;
try {
//返回执行结果
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
//取消全部任务
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
//执行方法
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
//和上面相似,这里也是建立任务队列
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
//迭代进行任务执行
try {
//建立任务,并添加到任务队列
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
//设置超时时间标记
final long deadline = System.nanoTime() + nanos;
final int size = futures.size();
//在执行器没有多少多并行性的状况下,交替执行时间检查和调用。
for (int i = 0; i < size; i++) {
execute((Runnable)futures.get(i));
nanos = deadline - System.nanoTime();
//任务超时,返回任务队列
if (nanos <= 0L)
return futures;
}
//遍历任务并返回任务执行结果
for (int i = 0; i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
//超时
if (nanos <= 0L)
return futures;
try {
//给定执行时间等待任务完成并返回结果
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
nanos = deadline - System.nanoTime();
}
}
done = true;
return futures;
} finally {
//未完成则取消执行
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
/** *建立任务队列 */
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
/** * 提交任务到执行队列 */
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
}
复制代码
经过上面的代码咱们已经基本了解了一个线程池是如何建立任务队列并执行任务的,因此在这里咱们只须要关注一些关键的ThreadPoolExecutor的方法就能了解线程池是如何工做的,而且对应的几种模式的线程池均可以推导出来。
首先在此次看源码以前咱们要胡乱思索一番,整理一下线程池的执行大概流程:
咱们前面简单的说过几个ThreadPoolExecutor的主要参数,咱们下面再仔细总结一下:
public class ThreadPoolExecutor extends AbstractExecutorService {
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
}
复制代码
核心线程:线程池新建线程的时候,若是当前线程总数小于corePoolSize,则新建的是核心线程,若是超过corePoolSize,则新建的线程不是核心线程。核心线程默认状况下会一直存活在线程池中,即便这个核心线程啥也不干(闲置状态)。若是指定ThreadPoolExecutor的allowCoreThreadTimeOut这个属性为true,那么核心线程若是不干活(闲置状态)的话,超过必定时间(时长下面参数决定),就会被销毁掉。
线程总数 = 核心线程数 + 非核心线程数。
一个非核心线程,若是不干活(闲置状态)的时长超过这个参数所设定的时长,就会被销毁掉,若是设置allowCoreThreadTimeOut = true,则会做用于核心线程。
TimeUnit是一个枚举类型,其包括: NANOSECONDS : 1微毫秒 = 1微秒 / 1000 MICROSECONDS : 1微秒 = 1毫秒 / 1000 MILLISECONDS : 1毫秒 = 1秒 /1000 SECONDS : 秒 MINUTES : 分 HOURS : 小时 DAYS : 天
当全部的核心线程都在干活时,新添加的任务会被添加到这个队列中等待处理,若是队列满了,则新建非核心线程执行任务。
threadFactory:建立线程的方式。
handler:异常处理程序。
既然已经知道了任务执行,那么任务是怎么排队的呢?
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/* * 1. 若是运行的线程少于corepoolSize大小,新任务会直接开启新的线程执行。 * 对addWorker的调用原子性地检查运行状态和workerCount,从而经过返回false防止假警报,假警报会在不该该的状况下添加线程。 * * 2. 若是一个任务成功的加入队列,咱们须要再次检查是否须要开启新的线程来执行。 * 可能缘由有:已有任务执行完毕,或者线程池已经被结束。 * * * 3. 若是不能对任务进行排队,则尝试添加一个新任务线程。 * 若是它失败了,咱们知道咱们已经关闭或饱和了因此拒绝这个任务。 */
//运行状态标签
int c = ctl.get();
//正在执行的线程数 < 核心线程数 ,则当即执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//再次检查是否运行且没超过任务队列容量
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//须要执行的任务不在运行状态且不在等待队列,则将这个任务异常抛出
if (! isRunning(recheck) && remove(command))
reject(command);
//运行任务数量为空,则将核心线程外的线程任务置空
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//超过核心线程数且其余线程任务也添加失败,抛出异常
else if (!addWorker(command, false))
reject(command);
}
复制代码
看到这里咱们已经模模糊糊的明白了任务排队执行,全部的任务队列都是同样的排队执行,那么咱们任务队列又有哪些呢?
LinkedBlockingQueue:线性阻塞队列。接收到任务,若是没超过corePoolSize,则建立新线程执行,不然进入阻塞队列等待
ArrayBlockingQueue:数组阻塞队列。数组特诊是长度固定,也就是这个队列长度固定。接收到新任务,若是没超过corePoolSize,则建立新线程执行,若是超过,则建立新线程(线程总数<maximumPoolSize)执行。若是新任务既不能在队列中等待,又不能执行,抛出异常。
SynchronousQueue:同步队列。 既然是同步队列,说明新任务来了就执行。也就是核心线程数量无限大。
DelayQueue:延迟队列,听名字也知道任务要延迟执行,这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务。
也就是说到了这里,咱们基本已经分析了线程池的几个核心:jdk自带线程池种类、线程池内的线程工厂(用于生产线程)、线程池任务执行、线程池任务排队、线程池队列类型。咱们总结一张图,能够结束本篇文章,固然其余类型的线程池具体实现,请自行查看源码。
思考:在Java开发中还有哪些相似的东西是这种操做的呢?