DelayQueue 的类签名和继承结构以下:安全
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {}
app
DelayQueue 中的元素要实现 Delayed 接口,该接口定义以下:ide
public interface Delayed extends Comparable<Delayed> {
/**
* 以给定的时间单位,返回该对象的剩余延迟
* 若为零或者负数表示延时已通过去
*/
long getDelay(TimeUnit unit);
}
源码分析
Comparable 接口也只有一个 compareTo 方法:flex
public interface Comparable<T> {
public int compareTo(T o);
}
ui
DelayQueue 有两个构造器,以下:this
// 无参构造器
public DelayQueue() {}
// 指定集合的构造器
public DelayQueue(Collection<? extends E> c) {
// 该方法最后是经过 add 方法实现的,后文进行分析
this.addAll(c);
}
spa
// 锁,用于保证线程安全
private final transient ReentrantLock lock = new ReentrantLock();
// 优先队列,实际存储元素的地方
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 线程等待的标识
private Thread leader = null;
// 触发条件,表示是否能够从队列中读取元素
private final Condition available = lock.newCondition();
线程
DelayQueue 也是一个队列,它的入队方法有:add(E), offer(E), put(E) 等,它们的定义以下:3d
public boolean add(E e) {
return offer(e);
}
public void put(E e) {
offer(e);
}
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e);
}
这几个方法都是经过 offer(E) 方法实现的,它的代码以下:
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 入队
q.offer(e);
// 若该元素为队列头部元素,唤醒等待的线程
// (表示能够从队列中读取数据了)
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
有入队天然也有出队,主要方法有:poll(), take(), poll(timeout, unit), 以下:
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取队列头部元素
E first = q.peek();
// 头部元素为空,或者延时未到,则返回空
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
// 不然返回头部元素
else
return q.poll();
} finally {
lock.unlock();
}
}
poll 方法是非阻塞的,即调用以后不管元素是否存在都会当即返回。下面看下阻塞的 take 方法:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 以可中断方式获取锁
lock.lockInterruptibly();
try {
// 无限循环
for (;;) {
// 获取队列头部元素
E first = q.peek();
// 若为空,则等待
if (first == null)
available.await();
// 若不为空
else {
// 获取延迟的纳秒数,若小于等于零(即过时),则获取并删除头部元素
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
// 执行到这里,表示 delay>0,也就是延时未过时
first = null; // don't retain ref while waiting
// leader 不为空表示有其余线程在读取数据,当前线程等待
if (leader != null)
available.await();
else {
// 将当前线程设置为 leader
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待延迟时间过时
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 唤醒该条件下的其余线程
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
take 方法是阻塞操做,当条件不知足时会一直等待。另外一个 poll(timeout, unit) 方法和它有些相似,只不过带有延时,以下:
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
// 以可中断方式获取锁
lock.lockInterruptibly();
try {
// 无限循环
for (;;) {
// 获取队列的头部元素
E first = q.peek();
// 若头部元素为空(即队列为空),当超时时间大于零则等待相应的时间;
// 不然(即超时时间小于等于零)返回空
if (first == null) {
if (nanos <= 0)
return null;
else
nanos = available.awaitNanos(nanos);
} else {
// 执行到这里表示队列头部元素不为空
// 获取剩余延时
long delay = first.getDelay(NANOSECONDS);
// 延时已过时,返回队列头部元素
if (delay <= 0)
return q.poll();
// 延时未过时且等待超时,返回空
if (nanos <= 0)
return null;
first = null; // don't retain ref while waiting
// 延时未过时且等待未超时,且等待超时<延迟时间
// 表示有其余线程在取数据,则当前线程进入等待
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
// 没有其余线程等待,将当前线程设置为 leader,相似于“独占”操做
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);
// 计算剩余延迟时间
nanos -= delay - timeLeft;
} finally {
// 该线程操做完毕,把 leader 置空
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 唤醒 available 条件下的一个其余线程
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
此外还有一个 peek 方法,该方法虽然也能获取队列头部的元素,但与以上出队方法不一样的是,peek 方法只是读取队列头部元素,并不会将其删除:
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 返回队列的头部元素(不删除)
return q.peek();
} finally {
lock.unlock();
}
}
自定义一个实现了 Delayed 接口的 Task 类,并将它的几个对象添加到一个延迟队列中,代码以下:
public class TestDelayedQueue {
public static void main(String[] args) throws Exception {
BlockingQueue<Task> delayQueue = new DelayQueue<>();
long now = System.currentTimeMillis();
delayQueue.put(new Task("c", now + 6000));
delayQueue.put(new Task("d", now + 10000));
delayQueue.put(new Task("a", now + 3000));
delayQueue.put(new Task("b", now + 4000));
while (true) {
System.out.println(delayQueue.take());
TimeUnit.SECONDS.sleep(1);
}
}
private static class Task implements Delayed {
private String taskName;
private long endTime;
public Task(String taskName, long endTime) {
this.taskName = taskName;
this.endTime = endTime;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public String toString() {
return "taskName-->" + taskName;
}
}
}