做为阻塞队列的一员,DelayQueue(延迟队列)因为其特殊含义而使用在特定的场景之中,主要在于Delay这个词上,那么其内部是如何实现的呢?今天一块儿经过DelayQueue的源码来看一看其是如何完成Delay操做的java
JDK版本号:1.8.0_171
DelayQueue内部经过优先级队列PriorityQueue来实现队列元素的排序操做,以前已经介绍过PriorityBlockingQueue的源码实现,二者比较相似,可自行回顾下,既然用到了优先级队列,则须要保证其队列元素的可比较性,以及延迟队列的特性(可计算延迟时间,经过延迟时间进行比较排序),故这里其中的队列元素须要实现Delayed接口,DelayQueue主要就在于理解这两部份内容数组
下面示例代码部分已经显示了DelayQueue的用法,从名字命名上也能理解出其含义,延迟队列,主要在于延迟消费,如何实现呢?这里就须要用到Delayed接口,后面会进行说明,在使用时须要实现Delayed接口和compareTo接口安全
本身能够先试试运行结果,理解看看,能够看下调用poll和take的结果。若是用过rocketmq,能够类比其中的延迟消息队列,等到规定的时间再进行消费,只不过mq中的实现要比这复杂多线程
public class TestDelayQueue { public static void main(String[] args) throws InterruptedException { DelayQueue<DelayItem> delayQueue = new DelayQueue(); // 20s后 delayQueue.add(new DelayItem(20, "aaaaaa")); // 10秒后 delayQueue.add(new DelayItem(10, "bbbbbb")); // 30秒后 delayQueue.add(new DelayItem(30, "cccccc")); while (0 < delayQueue.size()) { Thread.sleep(1000); DelayItem d = delayQueue.poll(); // DelayItem d = delayQueue.take(); System.out.println(null != d ? d.getItem() : "null"); } } static class DelayItem implements Delayed { private long delayTime; private String item; public DelayItem(long delayTime, String item) { super(); // 当前时间 LocalDateTime localDateTime = LocalDateTime.now(); this.delayTime = localDateTime.getSecond() + delayTime; this.item = item; } @Override public long getDelay(TimeUnit unit) { LocalDateTime localDateTime = LocalDateTime.now(); return unit.convert(delayTime - localDateTime.getSecond(), TimeUnit.SECONDS); } @Override public int compareTo(Delayed o) { return this.getDelay(TimeUnit.SECONDS) - o.getDelay(TimeUnit.SECONDS) < 0 ? -1 : 1; } public String getItem() { return item; } } }
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E>
首先要说明的是Delayed接口,类定义部分也已经明确指出其使用(E extends Delayed),咱们在操做时放入DelayQueue队列元素必须实现这个接口,实现其中的getDelay方法和compareTo方法,在使用示例代码部分我也说明了这两个方法的做用ide
public interface Delayed extends Comparable<Delayed> { /** * Returns the remaining delay associated with this object, in the * given time unit. * * @param unit the time unit * @return the remaining delay; zero or negative values indicate * that the delay has already elapsed */ long getDelay(TimeUnit unit); }
其中使用了PriorityQueue来完成有序出队操做,与以前讲解过的PriorityBlockingQueue相似,有些许不一样,可自行参考源码部分,也能够去看我以前的一篇专门讲解PriorityBlockingQueue源码的文章,主要异同在于PriorityQueue是非线程安全的,而PriorityBlockingQueue是线程安全的,内部排序机制使用的都是堆排序this
若是你了解过PriorityQueue或PriorityBlockingQueue则在这里使用这个类是很容易理解源码实现人员的目的的,建议先去了解其实现,要不直接看这个源码比较有难度spa
因为须要实现延迟队列,使用PriorityQueue根据时间排序(自行实现具体细节,例如上边示例根据时间来排序),经过Delayed接口限制使用DelayQueue的场景线程
/** * 可重入锁ReentrantLock */ private final transient ReentrantLock lock = new ReentrantLock(); /** * 内部使用PriorityQueue来完成DelayQueue的操做 */ private final PriorityQueue<E> q = new PriorityQueue<E>(); /** * leader线程 * 指定了用于等待队列元素出队的线程 * 若是非空,则这个线程能够阻塞等待一段时间(时间经过计算得到),其余线程则无限等待 * 避免其余线程没必要要的等待 * 这个线程等待一段时间而后出队操做,其余线程则无限等待, * 若是等待过程当中入队了过时时间更短的元素(优先级队列堆顶元素变化),则会重置leader为null,并会唤醒等待的线程去争抢leader来获取执行出队的权利 */ private Thread leader = null; /** * Condition对象完成线程等待和唤醒任务 */ private final Condition available = lock.newCondition();
构造方法比较简单,无参构造没有进行任何操做,有参构造方法直接传入对应类型的集合,循环add放入队列code
public DelayQueue() {} public DelayQueue(Collection<? extends E> c) { this.addAll(c); }
入队操做,先得到lock,以后经过优先级队列的offer方法完成入队,同时判断是否要重置leader对象
/** * Inserts the specified element into this delay queue. * * @param e the element to add * @return {@code true} * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); // 此节点为当前优先级队列堆顶节点,即0的索引位置 // 即这次添加的节点即为下次要获取的堆顶节点(出队节点) // 若是非堆顶节点则表示堆顶节点未变化则不要重置leader if (q.peek() == e) { // leader线程置空,让出队线程争抢leader优先执行权 leader = null; // 唤醒阻塞的线程 available.signal(); } return true; } finally { lock.unlock(); } }
出队操做,先得到lock,以后经过优先级队列的poll方法完成出队,固然须要判断堆顶元素是否已到期。等待超时方法较为复杂,需耐心理解
/** * Retrieves and removes the head of this queue, or returns {@code null} * if this queue has no elements with an expired delay. * * @return the head of this queue, or {@code null} if this * queue has no elements with an expired delay */ public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { // 堆顶元素 E first = q.peek(); // 堆为空或者堆顶元素延迟时间还未到期则返回null,不然经过poll出队 if (first == null || first.getDelay(NANOSECONDS) > 0) return null; else return q.poll(); } finally { lock.unlock(); } } /** * Retrieves and removes the head of this queue, waiting if necessary * until an element with an expired delay is available on this queue, * or the specified wait time expires. * * @return the head of this queue, or {@code null} if the * specified waiting time elapses before an element with * an expired delay becomes available * @throws InterruptedException {@inheritDoc} */ public E poll(long timeout, TimeUnit unit) throws InterruptedException { // 时间转成纳秒 long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { // 堆顶元素 E first = q.peek(); // 空表示队列为空 if (first == null) { // 等待时间小于等于0则直接返回null,不然就阻塞等待nanos时间 if (nanos <= 0) return null; else // 若是中途被唤醒则更新nanos,剩余等待时间 nanos = available.awaitNanos(nanos); } else { // 堆顶元素的延迟时间 long delay = first.getDelay(NANOSECONDS); // 延迟时间到期直接出队操做 if (delay <= 0) return q.poll(); // 延迟时间未到期直接返回null if (nanos <= 0) return null; // 延迟时间未到期同时设置了超时时间进入下面进行处理 // 处于等待状态不要引用first first = null; // don't retain ref while waiting // 超时时间小于延迟时间或者leader非空阻塞等待nanos // 超时时间小于延迟时间则当前线程最多等待nanos超时时间便可 // leader非空则代表其余线程已经得到优先执行权,最多等待nanos超时时间便可 // 在等待中有可能被唤醒再此循环执行 if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else { // 超时时间大于延迟时间同时leader线程为空进入下面处理 Thread thisThread = Thread.currentThread(); // 先设置leader线程获取执行权 leader = thisThread; try { // 阻塞等待delay便可出队操做 // 万一等待过程当中被唤醒则经过剩余等待时间循环判断处理 // 有可能在等待中入队了延迟时间更短的元素,此时需释放leader从新争抢优先执行权 long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { // 释放leader执行权,从新争抢leader if (leader == thisThread) leader = null; } } } } } finally { // leader空且队列非空则唤醒其余阻塞的线程 if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
出队操做,先得到lock,再经过判断最终执行poll完成出队操做,和poll的超时等待方法相似
/** * Retrieves and removes the head of this queue, waiting if necessary * until an element with an expired delay is available on this queue. * * @return the head of this queue * @throws InterruptedException {@inheritDoc} */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) // 队列无数据则阻塞等待 available.await(); else { // 获取其堆顶元素延迟时间 long delay = first.getDelay(NANOSECONDS); // 延迟时间已到期,能够进行出队操做了 if (delay <= 0) return q.poll(); // 延迟时间还未到期 // 置空first,等待时间内去掉引用 first = null; // don't retain ref while waiting // leader线程非空,表示其余线程已经获取优先执行权,阻塞等待 if (leader != null) available.await(); else { // leader为空则指向当前线程,表示当前线程得到执行权 Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 阻塞等待delay秒以后继续 // 也有可能新入队元素(堆顶元素变化时)被唤醒需从新获取leader执行权 available.awaitNanos(delay); } finally { // leader置空,释放优先执行权 if (leader == thisThread) leader = null; } } } } } finally { // leader空且队列非空则唤醒其余阻塞的线程 if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
转移队列操做,内部是先经过peek方法先获取队列堆顶元素,判断其是否已到期,如到期则添加元素到新队列中,同时对原队列出队操做,固然,只转移已经到期的全部元素
/** * Returns first element only if it is expired. * Used only by drainTo. Call only when holding lock. * * 命名上彻底能了解其含义 * 获取队列中的堆顶元素,延迟时间还未到期则返回null * 被drainTo所使用,参照下面方法 * */ private E peekExpired() { // assert lock.isHeldByCurrentThread(); E first = q.peek(); return (first == null || first.getDelay(NANOSECONDS) > 0) ? null : first; } public int drainTo(Collection<? super E> c) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); final ReentrantLock lock = this.lock; lock.lock(); try { int n = 0; // 转移已过时的元素到新队列中 for (E e; (e = peekExpired()) != null;) { c.add(e); // In this order, in case add() throws. q.poll(); ++n; } return n; } finally { lock.unlock(); } } public int drainTo(Collection<? super E> c, int maxElements) { // 判空 if (c == null) throw new NullPointerException(); // 非本对象 if (c == this) throw new IllegalArgumentException(); // 长度判断 if (maxElements <= 0) return 0; final ReentrantLock lock = this.lock; lock.lock(); try { int n = 0; // 转移已过时的元素到新队列中,最多转移maxElements个元素 for (E e; n < maxElements && (e = peekExpired()) != null;) { c.add(e); // In this order, in case add() throws. q.poll(); ++n; } return n; } finally { lock.unlock(); } }
其余方法如peek,size,clear,toArray,remove等都是经过优先级队列PriorityQueue来实现的,只是每次操做时须要先得到可重入锁保证线程安全
迭代器的实现不是很复杂,迭代器复制了队列中的全部元素,须要注意的是,迭代器中的remove方法会经过removeEQ方法直接删除原PriorityQueue队列中的元素,不是删除拷贝的数据元素
/** * * 本质上调用PriorityQueue.toArray * 将PriorityQueue的底层数组拷贝做为迭代器的array * 故这里保存了全部的元素,不只仅是已过时的元素 */ public Iterator<E> iterator() { return new Itr(toArray()); } /** * Snapshot iterator that works off copy of underlying q array. */ private class Itr implements Iterator<E> { // 保存PriorityQueue的数组 final Object[] array; // Array of all elements // 下次next返回的元素索引 int cursor; // index of next element to return // 上次返回的return元素索引 int lastRet; // index of last element, or -1 if no such Itr(Object[] array) { lastRet = -1; this.array = array; } public boolean hasNext() { return cursor < array.length; } @SuppressWarnings("unchecked") public E next() { if (cursor >= array.length) throw new NoSuchElementException(); lastRet = cursor; return (E)array[cursor++]; } // 删除元素,须要注意,会直接把原队列中的元素删除 public void remove() { if (lastRet < 0) throw new IllegalStateException(); removeEQ(array[lastRet]); lastRet = -1; } }
DelayQueue做为一个特殊的阻塞队列,主要在于Delay特性上,内部经过优先级阻塞队列和Delayed接口实现延迟的操做,若是以前已经了解了优先级队列,则很是容易理解其源码实现逻辑,复杂点的部分也就在于在多线程环境下入队一个新的更短的元素时内部作的处理,经过争抢leader来肯定优先出队的那个线程,作不一样的处理,比较有意思,能够参考文章多理解理解,不算过于复杂
以上内容若有问题欢迎指出,笔者验证后将及时修正,谢谢