JDK源码分析-DelayQueue

概述

DelayQueue 也是一种队列,它内部的元素有“延迟”,也就是当从队列中获取元素时,若是它的延迟时间未到,则没法取出。

DelayQueue 的类签名和承结构以下:安全

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>    implements BlockingQueue<E> {}app


下面分析其代码实现。

代码分析

相关接口

DelayQueue 中的元素要实现 Delayed 接口,该接口定义以下:ide

public interface Delayed extends Comparable<Delayed> {    /**     * 以给定的时间单位,返回该对象的剩余延迟     * 若为零或者负数表示延时已通过去     */    long getDelay(TimeUnit unit);}源码分析

Delayed 接口继承自 Comparable 接口,而它自己只定义了一个 getDelay 方法,该方法的做用是获取对象的剩余延迟时间。

Comparable 接口也只有一个 compareTo 方法:flex

public interface Comparable<T> {    public int compareTo(T o);}ui

这里再也不详述。

构造器

DelayQueue 有两个构造器,以下:this

// 无参构造器public DelayQueue() {}
// 指定集合的构造器public DelayQueue(Collection<? extends E> c) {    // 该方法最后是经过 add 方法实现的,后文进行分析    this.addAll(c);}
spa


成员变量


// 锁,用于保证线程安全private final transient ReentrantLock lock = new ReentrantLock();
// 优先队列,实际存储元素的地方private final PriorityQueue<E> q = new PriorityQueue<E>();
// 线程等待的标识private Thread leader = null;
// 触发条件,表示是否能够从队列中读取元素private final Condition available = lock.newCondition();线程

关于优先队列可参考前文「JDK源码分析-PriorityQueue」的分析。

入队方法

DelayQueue 也是一个队列,它的入队方法有:add(E), offer(E), put(E) 等,它们的定义以下:3d

public boolean add(E e) {    return offer(e);}
public void put(E e) {    offer(e);}
public boolean offer(E e, long timeout, TimeUnit unit) {    return offer(e);}

这几个方法都是经过 offer(E) 方法实现的,它的代码以下:

public boolean offer(E e) {    final ReentrantLock lock = this.lock;    lock.lock();    try {        // 入队        q.offer(e);        // 若该元素为队列头部元素,唤醒等待的线程        // (表示能够从队列中读取数据了)        if (q.peek() == e) {            leader = null;            available.signal();        }        return true;    } finally {        lock.unlock();    }}


出队方法

有入队天然也有出队,主要方法有:poll(), take(), poll(timeout, unit), 以下:

public E poll() {    final ReentrantLock lock = this.lock;    lock.lock();    try {        // 获取队列头部元素        E first = q.peek();        // 头部元素为空,或者延时未到,则返回空        if (first == null || first.getDelay(NANOSECONDS) > 0)            return null;        // 不然返回头部元素        else            return q.poll();    } finally {        lock.unlock();    }}

poll 方法是非阻塞的,即调用以后不管元素是否存在都会当即返回。下面看下阻塞的 take 方法:

public E take() throws InterruptedException {    final ReentrantLock lock = this.lock;    // 以可中断方式获取锁    lock.lockInterruptibly();    try {        // 无限循环        for (;;) {            // 获取队列头部元素            E first = q.peek();            // 若为空,则等待            if (first == null)                available.await();            // 若不为空            else {                // 获取延迟的纳秒数,若小于等于零(即过时),则获取并删除头部元素                long delay = first.getDelay(NANOSECONDS);                if (delay <= 0)                    return q.poll();                // 执行到这里,表示 delay>0,也就是延时未过时                first = null; // don't retain ref while waiting                // leader 不为空表示有其余线程在读取数据,当前线程等待                if (leader != null)                    available.await();                else {                    // 将当前线程设置为 leader                    Thread thisThread = Thread.currentThread();                    leader = thisThread;                    try {                        // 等待延迟时间过时                        available.awaitNanos(delay);                    } finally {                        if (leader == thisThread)                            leader = null;                    }                }            }        }    } finally {        // 唤醒该条件下的其余线程        if (leader == null && q.peek() != null)            available.signal();        lock.unlock();    }}

该方法看起来稍复杂,主要逻辑以下:
1. 获取队列头部元素;
    1.1 若该元素为空(队列为空),则当前线程等待;
    1.2 若该元素不为空,且已通过期,则取出该元素(并移除);
        1.2.1 若未过时,且有其余线程在操做(leader 不为空),当前线程等待;
        1.2.2 若未过时,且没有其余线程操做,则占有“操做权”(将 leader 设置为当前线程),并等待延迟过时。
以上操做循环执行。

take 方法是阻塞操做,当条件不知足时会一直等待。另外一个 poll(timeout, unit) 方法和它有些相似,只不过带有延时,以下:

public E poll(long timeout, TimeUnit unit) throws InterruptedException {        long nanos = unit.toNanos(timeout);        final ReentrantLock lock = this.lock;        // 以可中断方式获取锁        lock.lockInterruptibly();        try {            // 无限循环            for (;;) {                // 获取队列的头部元素                E first = q.peek();                // 若头部元素为空(即队列为空),当超时时间大于零则等待相应的时间;                //   不然(即超时时间小于等于零)返回空                if (first == null) {                    if (nanos <= 0)                        return null;                    else                        nanos = available.awaitNanos(nanos);                } else {                    // 执行到这里表示队列头部元素不为空                    // 获取剩余延时                    long delay = first.getDelay(NANOSECONDS);                    // 延时已过时,返回队列头部元素                    if (delay <= 0)                        return q.poll();                    // 延时未过时且等待超时,返回空                    if (nanos <= 0)                        return null;                    first = null; // don't retain ref while waiting                    // 延时未过时且等待未超时,且等待超时<延迟时间                    // 表示有其余线程在取数据,则当前线程进入等待                    if (nanos < delay || leader != null)                        nanos = available.awaitNanos(nanos);                    else {                        // 没有其余线程等待,将当前线程设置为 leader,相似于“独占”操做                        Thread thisThread = Thread.currentThread();                        leader = thisThread;                        try {                            long timeLeft = available.awaitNanos(delay);                            // 计算剩余延迟时间                            nanos -= delay - timeLeft;                        } finally {                            // 该线程操做完毕,把 leader 置空                            if (leader == thisThread)                                leader = null;                        }                    }                }            }        } finally {            // 唤醒 available 条件下的一个其余线程            if (leader == null && q.peek() != null)                available.signal();            lock.unlock();        }    }

take 和 poll 方法还有一个区别: 当延迟未过时时,take 方法会一直等待,而 poll 方法则会返回空。

此外还有一个 peek 方法,该方法虽然也能获取队列头部的元素,但与以上出队方法不一样的是,peek 方法只是读取队列头部元素,并不会将其删除:

public E peek() {    final ReentrantLock lock = this.lock;    lock.lock();    try {        // 返回队列的头部元素(不删除)        return q.peek();    } finally {        lock.unlock();    }}

以上就是 DelayQueue 的主要方法的代码分析,为便于理解,下面简要举例分析。

用法举例

示例代码:

自定义一个实现了 Delayed 接口的 Task 类,并将它的几个对象添加到一个延迟队列中,代码以下:

public class TestDelayedQueue {    public static void main(String[] args) throws Exception {        BlockingQueue<Task> delayQueue = new DelayQueue<>();        long now = System.currentTimeMillis();        delayQueue.put(new Task("c", now + 6000));                delayQueue.put(new Task("d", now + 10000));        delayQueue.put(new Task("a", now + 3000));        delayQueue.put(new Task("b", now + 4000));                while (true) {            System.out.println(delayQueue.take());            TimeUnit.SECONDS.sleep(1);        }    }
   private static class Task implements Delayed {        private String taskName;        private long endTime;
       public Task(String taskName, long endTime) {            this.taskName = taskName;            this.endTime = endTime;        }
        @Override        public long getDelay(TimeUnit unit) {            return unit.convert(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);        }
        @Override        public int compareTo(Delayed o) {            return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));        }
        @Override        public String toString() {            return "taskName-->" + taskName;        }    }}

结果会以延迟时间的顺序取出各个元素。

小结

1. DelayQueue 是一种队列,同时实现了 BlockingQueue 接口;
2. 它内部的元素有延迟时间的概念,出队时,若延时未到,则没法读取到队列头部的元素;
3. 它是线程安全的。

相关阅读:
JDK源码分析-PriorityQueue
JDK源码分析-BlockingQueue


相关文章
相关标签/搜索