池化技术应该是最经常使用的提升程序性能的手段,包括线程池与数据库链接池,常量池等等java
建立与销毁线程是比较耗费时间的,不利于处理Java程序的高并发,所以引入线程池,也就是维护一组可用的线程,若是有任务,就当即将线程池的空闲线程分配给任务,提高性能,若是线程池内全部的线程都是忙状态的话,能够将任务放到任务队列,或者建立一个新的线程并放入线程池,用于处理新的任务git
使用线程池的好处程序员
下降资源消耗。经过重复利用已建立的线程下降线程建立和销毁形成的消耗。github
在《阿里巴巴 Java 开发手册》“并发处理”这一章节,明确指出线程资源必须经过线程池提供,不容许在应用中自行显示建立线程。spring
为何呢?docker
使用线程池的好处是减小在建立和销毁线程上所消耗的时间以及系统资源开销,解决资源不足的问题。若是不使用线程池,有可能会形成系统建立大量同类线程而致使消耗完内存或者“过分切换”的问题。数据库
提升响应速度。当任务到达时,任务能够不须要等待线程建立就能当即执行。编程
提升线程的可管理性。线程是稀缺资源,若是无限制的建立,不只会消耗系统资源,还会下降系统的稳定性,使用线程池能够进行统一的分配,调优和监控。(最原始粗放的服务器实现就是请求绑定一个套接字后就新开一个线程去处理,若是请求量巨大的时候,服务器是确定要崩的,由于缺少对线程资源的管理)数组
线程池监控的方法:缓存
SpringBoot 中的 Actuator
组件
经过ThreadPoolExecutor
的自有接口获取线程池信息
线程池通常用于执行多个不相关联的耗时任务,没有多线程的状况下,任务顺序执行,使用了线程池的话可以让多个不相关联的任务同时执行。
举个项目中实际使用的例子:
实际使用时要注意的通常规则
使用线程池,而不是建立单个线程
使用ThreadPoolExecutor
构造函数而不是Executors
工具类,下文有具体的解释
显式的定义线程池名字,以业务名字做区分,便于定位问题
可使用自定义的ThreadFactory
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/** * 线程工厂,它设置线程名称,有利于咱们定位问题。 */
public final class NamingThreadFactory implements ThreadFactory {
private final AtomicInteger threadNum = new AtomicInteger();
private final ThreadFactory delegate;
private final String name;
/** * 建立一个带名字的线程池生产工厂 */
public NamingThreadFactory(ThreadFactory delegate, String name) {
this.delegate = delegate;
this.name = name; // TODO consider uniquifying this
}
@Override
public Thread newThread(Runnable r) {
Thread t = delegate.newThread(r);
t.setName(name + " [#" + threadNum.incrementAndGet() + "]");
return t;
}
}
复制代码
使用guava的ThreadFactoryBuilder
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(threadNamePrefix + "-%d")
.setDaemon(true).build();
ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory)
复制代码
不一样的业务使用不一样的线程池
有依赖关系的任务在使用同一个线程池在稍高的并发情况下可能会出现一种逻辑上的死锁,大概来讲就是父任务A中调用了子任务B,父任务与子任务共用一个线程池,当父任务占据了所有的核心线程资源,而且子任务仍未执行时,没法退出对核心线程的占用,而与此同时子任务只能堆积在任务队列中,没法得到线程资源,若是又使用了无界队列的话,则会一直堆积直到OOM,具体的参考线程池运用不当的一次线上事故
Executor 框架是 Java5 以后引进的,在 Java 5 以后,经过 Executor 来启动线程比使用Thread
的 start
方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免this
逃逸问题。
补充:this 逃逸是指在构造函数返回以前其余线程就持有该对象的引用. 调用还没有构造彻底的对象的方法可能引起使人疑惑的错误,若是用
volatile
修饰的话应该就能解决这个问题了,不知道Executor框架的出现是如何有助于解决此问题的呢?---不是很清楚
Executor 框架不只包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,Executor 框架让并发编程变得更加简单
实际上在Executor框架中,还有一个线程池ForkJoinPool
可能用的不太多,此类继承AbstractExecutorService
,文章末尾会介绍到
除了说Executor框架,还有一种说法就是JUC框架
,也就是java.util.concurrent
这个包下的全部的多线程相关类的总称
向线程池提交任务有两种方法:
execute
方法
Runnable
的任务,不提供返回值,源码分析见下文(做为线程池的入口必定是要仔细分析的)submit
方法
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
复制代码
ThreadPoolExecutor
没有实现本身的submit方法,而是沿用的父类AbstractExecutorService
的实现
接受Runnable
或Callable
的任务,并提供Future
类型返回值
submit
内部将传入的任务统一封装为RunnableFuture
类型,此类型实现了Runnable
与Future
接口,老缝合怪了~
不一样之处就在于传入Runnable
的任务获得的Future
可能没法获得有效的返回值,而Callable
的任务可以获得返回结果
提交Runnable
任务时也能够指定一个返回结果,做为Future
的返回结果,可是这个结果显然并非任务执行完成的返回值,而是程序员事先传入的值,其做用相似因而一个flag值
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
复制代码
Runnable
任务会被转换为Callable
类型,若是有传入预期的返回值,call
函数中就会原封不动的返回,可是若是没有传入,就是返回null了submit
内部实际上仍然调用了execute
方法
此处补充Callable
与Runnable
的差别:
补充Future接口的做用
isDone
判断任务是否执行完get方法得到
执行结果
shutdown方法
关闭线程池,线程池的状态变为 SHUTDOWN
。线程池再也不接受新任务了,可是队列里的任务得执行完毕。
执行shutdown
方法后,能够执行awaitTermination
方法,则会等待指定的时间让线程池关闭,若在指定时间内关闭则返回true,不然false
shutdown源码分析
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
// 上锁
mainLock.lock();
try {
// 判断调用者是否有权限shutdown线程池
checkShutdownAccess();
// CAS 设置线程池状态为SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断全部空闲线程
interruptIdleWorkers();
// 钩子函数
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
// 解锁
mainLock.unlock();
}
// 尝试终止线程池
tryTerminate();
}
复制代码
shutdownNow
方法 闭线程池,线程的状态变为 STOP
。线程池会终止当前正在运行的任务,并中止处理排队的任务并返回正在等待执行的任务列表。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
// 上锁
mainLock.lock();
try {
// 判断调用者是否有权限shutdown线程池
checkShutdownAccess();
// CAS 设置线程池状态为STOP
advanceRunState(STOP);
// 中断全部线程
interruptWorkers();
// 从队列中获取剩余的未执行的工做列表
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 尝试终止线程池
tryTerminate();
// 返回未执行的任务列表
return tasks;
}
复制代码
interruptWorkers
的解析放到了后文中使用以下两个方法来判断线程池是否彻底关闭
isTerminated
() 当调用 shutdown()
方法后,而且全部提交的任务完成后返回为 true,或者是执行shutdownNow
后,线程池内的线程所有被中断,工做线程数量为0后返回trueisShutdown()
当调用 shutdown()
方法后返回为 true。Runnable
或者 Callable
接口的任务对象。Runnable
/Callable
接口的 对象直接交给 execute
执行: ExecutorService.execute(Runnable command)
)或者也能够把 Runnable
对象或Callable
对象提交给 submit
执行(ExecutorService.submit(Runnable task)
或 ExecutorService.submit(Callable <T> task)
)。ExecutorService.submit(…)
,ExecutorService
将返回一个实现Future
接口的对象(刚刚也提到过了执行 execute()
方法和 submit()
方法的区别,submit()
会返回一个 FutureTask
对象FutureTask.get()
方法来等待任务执行完成。主线程也能够执行 FutureTask.cancel(boolean mayInterruptIfRunning)
来取消此任务的执行。Executors
工具类,能够建立普通的线程池与能够执行定时任务的线程池,可是简单的建立方法意味着封装的程度高,就会致使自由度低,甚至有一些风险固定线程数量的线程池
该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中如有空闲线程,则当即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
// 默认任务队列的长度是Integer.MAX_VALUE
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
复制代码
maximumPoolSize
将是事实上的无效参数,由于不可能存在任务队列满的状况(能够将任务队列视做系统内最大,因此不用设置最大线程数,由于再多的任务也彻底能够缓存在队列中)。因此,经过建立 FixedThreadPool
的源码能够看出建立的 FixedThreadPool
的 corePoolSize
和 maximumPoolSize
被设置为同一个值。
keepAliveTime
将是一个无效参数(由于不会有核心线程以外的其他线程)(固然,若是空闲核心线程被容许超时回收的话,就是有用的了,便是,若是空闲就会当即展开回收)corePoolSize
后,新任务将在无界队列中等待,所以线程池中的线程数不会超过 corePoolSize;因此一旦corePoolSize设置不对的话,将会有大量任务干等着,而且性能也没有彻底发挥仅有一个线程的线程池
能够视为是固定线程数量线程池的特值状况,即nThreads为1的状况
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
// 使用包装类包装过的,用来保证:
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
复制代码
动态分配线程数量的线程池
该方法返回一个可根据实际状况调整线程数量的线程池。线程池的线程数量不肯定,但如有空闲线程能够复用,则会优先使用可复用的线程。若全部线程均在工做,又有新的任务提交,则会建立新的线程处理任务。全部线程在当前任务执行完毕后,将返回线程池进行复用。
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
复制代码
SynchronousQueue.offer(Runnable task)
提交任务到任务队列。若是当前线程池中有闲线程正在执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)
,那么主线程执行 offer 操做与空闲线程执行的 poll
操做配对成功,主线程把任务交给空闲线程执行,execute()
方法执行完成。SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)
。这种状况下,offer方法将返回false,此时 CachedThreadPool
会建立新线程执行任务,execute 方法执行完成;Integer.MAX_VALUE
,这意味着线程池能够不受控的一直接受任务,直到栈空间OOMpublic static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
复制代码
虽然队列使用的是有界队列,可是最大线程数量是Integer.MAX_VALUE
,这意味着线程池能够不受控的一直接受任务,直到栈空间OOM
须要注意的是,尽管ScheduledExecutorService是内部调用了父类ThreadPoolExecutord的构造方法,可是其内部实现的核心入口方法再也不是ThreadPoolExecutor的execute方法,而是ScheduledThreadPoolExecutor中的delayExecute方法
定时任务的实现依赖于延迟队列DelayedWorkQueue
能够发现执行定时任务可使用springboot中的@Scheduled注解,也可使用底层的定时任务线程池,实际上本线程池基本不会用,由于实现定时任务有其余的方案,好比springboot的注解与quartz等等
备注: Quartz 是一个由 java 编写的任务调度库,由 OpenSymphony 组织开源出来。在实际项目开发中使用 Quartz 的仍是居多,比较推荐使用 Quartz。由于 Quartz 理论上可以同时对上万个任务进行调度,拥有丰富的功能特性,包括任务调度、任务持久化、可集群化、插件等等
一次性的延迟任务 schedule
方法
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
复制代码
ScheduledThreadPoolExecutor
重写了execute
与submit
方法,两个方法内部实际上都是简单地调用schedule
方法来实现的以上一次任务开始为基准固定间隔循环执行任务 scheduleAtFixedRate
方法
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
复制代码
以上一次任务结束为基准固定间隔循环执行任务 scheduleWithFixedDelay
方法
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
复制代码
后二者的区别见图
引出两个问题:
上边说过了,定时任务线程池的核心入口就是上边三种类型的任务方法中都有的一个方法--就是delayedExecute
,可是在说这个关键的入口方法以前,不得说下,调用方法前对于提交的任务的包装,包装这一块设计到的类比较多,先用一张类图大体把握
首先包装为ScheduledFutureTask
// 用于包装schedule(Runnable)提交的任务
// result为null,ns是纳秒为单位的,要触发执行任务的系统时间
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
// 包装scheduleWithFixedDelay和scheduleAtFixedRate提交的任务
// result 为null
// ns是纳秒为单位的,下一次要触发执行任务的系统时间
// period是以纳秒为单位的任务循环周期
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
// 包装schedule(Callable)提交的任务
// ns是纳秒为单位的,要触发执行任务的系统时间
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
// 关键的run方法
public void run() {
// 首先判断是否是周期性执行的任务
boolean periodic = isPeriodic();
// 判断当前的线程池可否执行定时任务,若是不能则取消任务
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
// 若是不是周期性任务,也就是一次性的定时任务的话,直接执行提交的任务
ScheduledFutureTask.super.run();
// 若是是周期性执行的任务,首先执行提交的任务,并将任务的状态重置为初始化状态,以备下一次执行
else if (ScheduledFutureTask.super.runAndReset()) {
// 执行完毕后计算下一次执行的时间
setNextRunTime();
// 从新提交当前的任务到延时队列中,用于下一个周期的执行
reExecutePeriodic(outerTask);
}
}
// 计算下一次要执行任务的时间
// time表示下一次执行任务的时间,period是用来计算time的周期时间
private void setNextRunTime() {
long p = period;
if (p > 0)
// scheduleAtFixedRate
// 在第一次执行完任务后,下一次要执行的时间就是彻底按照周期来执行,无论到底何时执行完的(也就是now),以后的每次执行都是如此
time += p;
else
// scheduleWithFixedDelay
// 第一次执行完任务后,下一次要执行的时间是以当前时间为基准计算的,也就是上一次完成任务的时间为基准计算的,以后的每次执行都是如此
time = triggerTime(-p);
}
// 用于在延迟队列中按照下一次触发的顺序进行排序
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
// 触发时间一致的,按照提交的顺序来
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
// 计算从当前时刻到下次执行任务还有多长时间
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
复制代码
scheduleWithFixedDelay
与scheduleAtFixedRate
在实现时的区别就在于这次包装过程当中,前者传入的周期是unit.toNanos(-delay)
然后者是unit.toNanos(perioid)
setNextRunTime
方法中,详见方法注释period
以外还有一个区别就是outerTask
reExecutePeriodic
方法getDelay
方法最主要的应用就是在延时队列的take
poll
这两个获取任务的方法中,起到了控制获取任务的时间的做用
getDelay
方法来控制获取任务的时延--这两个特性是直观上的延迟任务线程池起做用的关键其次包装为RunnableScheduleFuture
protected <V> RunnableScheduledFuture<V> decorateTask( Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}
protected <V> RunnableScheduledFuture<V> decorateTask( Callable<V> callable, RunnableScheduledFuture<V> task) {
return task;
}
复制代码
RunnableScheduledFuture
,可是没有看懂为何要用这样的一个方法类型提高定时任务线程池的入口方法delayedExecute
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 1. 判断线程池是否是shutdown状态,若是是执行拒绝策略
if (isShutdown())
reject(task);
else {
// 2. 首先就是向DelayedWorkQueue中添加任务
super.getQueue().add(task);
// 3. 无论是通常的线程池仍是执行定时任务的线程池,都会在向队列中添加完任务后执行re-check
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 4. 若是经过了recheck,执行此方法
// 确保线程池内有线程运行
ensurePrestart();
}
}
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
// 对于Executors建立的线程池来讲,核心线程数量为0,因此会保证有非核心线程执行
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
复制代码
SHUTDOWN
的话,直接向队列中添加任务,而没有直接让线程去执行任务的场景从addWorker
开始,后续的就是标准的线程池的线程管理与任务获取的流程了,也就是说定时任务线程池与通常线程池的主要区别在于任务调度部分,而链接任务管理与线程管理的通道--延时队列也须要大体了解下
static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
// 任务调度时提交任务的方法就是add方法
public boolean add(Runnable e) {
return offer(e);
}
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
// 按照排序规则,选择合适的位置插入到队列中
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
// 按照排序规则,选择合适的位置插入到队列中
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
// 按照RunnableScheduledFuture的time属性进行排序
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
// getTask中,核心线程取任务(无超时时间)
// 若是当前不能获取,就阻塞等待
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
// 调用getDelay方法获得须要延时等待的时间
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
// getTask中,非核心线程取任务或则核心线程获取任务(容许超时回收)
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null) {
if (nanos <= 0)
return null;
else
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
if (nanos <= 0)
return null;
first = null; // don't retain ref while waiting
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
}
复制代码
DelayedWorkQueue
的内部存储是RunnableScheduledFuture
类型的数组