JUC源码分析-集合篇(八)DelayQueue

JUC源码分析-集合篇(八)DelayQueue

DelayQueue 是一个支持延时获取元素的无界阻塞队列。队列使用 PriorityQueue 来实现。 队列中的元素必须实现 Delayed 接口,在建立元素时能够指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。java

1. DelayQueue 使用场景

1.1 DelayQueue 特色

DelayQueue 也是一种比较特殊的阻塞队列,从类声明也能够看出,DelayQueue 中的全部元素必须实现 Delayed 接口。DelayQueue 队列的元素必须实现 Delayed 接口。segmentfault

// 此接口的实现必须定义一个 compareTo 方法,该方法提供与此接口的 getDelay 方法一致的排序。
public interface Delayed extends Comparable<Delayed> {
    // 返回与此对象相关的剩余有效时间,以给定的时间单位表示
    long getDelay(TimeUnit unit);
}

能够看到,Delayed 接口除了自身的 getDelay 方法外,还实现了 Comparable 接口。getDelay 方法用于返回对象的剩余有效时间,实现 Comparable 接口则是为了可以比较两个对象,以便排序。设计模式

也就是说,若是一个类实现了 Delayed 接口,当建立该类的对象并添加到 DelayQueue 中后,只有当该对象的 getDalay 方法返回的剩余时间 ≤0 时才会出队。缓存

另外,因为 DelayQueue 内部委托了 PriorityQueue 对象来实现全部方法,因此能以堆的结构维护元素顺序,这样剩余时间最小的元素就在堆顶,每次出队其实就是删除剩余时间 ≤0 的最小元素。多线程

DelayQueue 的特色简要归纳以下:框架

  • DelayQueue 是无界阻塞队列;
  • 队列中的元素必须实现 Delayed 接口,元素过时后才会从队列中取走;

1.2 DelayQueue 使用场景

DelayQueue 很是有用,能够将 DelayQueue 运用在如下应用场景。ide

  1. 缓存系统的设计:能够用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询 DelayQueue,一旦能从 DelayQueue 中获取元素时,表示缓存有效期到了。
  2. 定时任务调度:使用 DelayQueue 保存当天将会执行的任务和执行时间,一旦从 DelayQueue 中获取到任务就开始执行,好比 javax.swing.TimerQueue 就是使用 DelayQueue 实现的。ScheduledFutureTask

1.3 DelayQueue 示例

咱们能够参考 ScheduledThreadPoolExecutor#ScheduledFutureTask 类的实现。源码分析

// 模仿网吧上网场景
public class DelayQueueTest extends Thread {
    DelayQueue queue =  new DelayQueue();
    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");

    public static void main(String[] args) {
        DelayQueueTest wangba = new DelayQueueTest();
        wangba.start();

        wangba.shangji("A", 5);
        wangba.shangji("B", 2);
        wangba.shangji("C", 4);
    }

    public void shangji(String name, int money) {
        WangMing wm = new WangMing(name, System.currentTimeMillis() + money * 1000l);
        queue.add(wm);
        System.out.println(name + "开始上网,时间:" + format.format(new Date()) +
                ",预计下机时间为:" + format.format(new Date(wm.getEndTime())));
    }

    public void xiaji(WangMing wm) {
        System.out.println(wm.getName() + "下机,时间:" + format.format(new Date(wm.getEndTime())));
    }

    public void run() {
        while (true) {
            try {
                WangMing wm = (WangMing) queue.take();
                xiaji(wm);
            } catch (InterruptedException e) {
            }
        }
    }
}

// 网民,必须实现 Delayed 接口
class WangMing implements Delayed {
    private String name;
    private long endTime;
    private TimeUnit timeUnit = TimeUnit.SECONDS;

    @Override
    public long getDelay(TimeUnit unit) {
        return endTime - System.currentTimeMillis();
    }

    @Override
    public int compareTo(Delayed o) {
        WangMing wm = (WangMing) o;
        return this.getDelay(timeUnit) - wm.getDelay(timeUnit) > 0 ? 1 :
                (this.getDelay(timeUnit) - wm.getDelay(timeUnit) < 0 ? -1 : 0);
    }
}

程序执行结果:性能

A开始上网,时间:2017-12-07 09:37:52,预计下机时间为:2017-12-07 09:37:57
B开始上网,时间:2017-12-07 09:37:52,预计下机时间为:2017-12-07 09:37:54
C开始上网,时间:2017-12-07 09:37:52,预计下机时间为:2017-12-07 09:37:56
B下机,时间:2017-12-07 09:37:54
C下机,时间:2017-12-07 09:37:56
A下机,时间:2017-12-07 09:37:57

2. DelayQueue 源码分析

介绍完了 DelayQueued 的基本使用,读者应该对该阻塞队列的功能有了基本了解,接下来咱们看下 Doug Lea 是如何实现 DelayQueued 的。this

2.1 DelayQueue 属性

private final transient ReentrantLock lock = new ReentrantLock();
private final Condition available = lock.newCondition();

// PriorityQueue 维护队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader = null;

上述比较特殊的是 leader 字段,咱们以前已经说过,DelayQueue 每次只会出队一个过时的元素,若是队首元素没有过时,就会阻塞出队线程,让线程在 available 这个条件队列上无限等待。

为了提高性能,DelayQueue 并不会让全部出队线程都无限等待,而是用 leader 保存了第一个尝试出队的线程,该线程的等待时间是队首元素的剩余有效期。这样,一旦 leader 线程被唤醒(此时队首元素也失效了),就能够出队成功,而后唤醒一个其它在 available 条件队列上等待的线程。以后,会重复上一步,新唤醒的线程可能取代成为新的 leader 线程。这样,就避免了无效的等待,提高了性能。这实际上是一种名为 Leader-Follower pattern 的多线程设计模式。

2.2 入队 offer

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);             // 调用 PriorityQueue#offer 方法
        if (q.peek() == e) {    // 若是入队元素在队首, 则唤醒一个出队线程
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

须要注意的是当首次入队元素时,须要唤醒一个出队线程,由于此时可能已有出队线程在空队列上等待了,若是不唤醒,会致使出队线程永远没法执行。

2.3 出队 poll

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        // 1. 没有元素或元素还在有效期内则直接返回 null
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        // 2. 元素已经失效直接取出来一个
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}

不阻塞直接 poll 时很简单,再来看一下阻塞式获取元素 take 方法。

2.4 阻塞式出队 take

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            // 1. 集合为空时全部的线程都处于无限等待的状态。
            //    只要有元素将其中一个线程转为 leader 状态
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                // 2. 元素已通过期,直接取出返回
                if (delay <= 0)
                    return q.poll();
                first = null; // don't retain ref while waiting
                // 3. 已经在其它线程设置为 leader,无限期等着
                if (leader != null)
                    available.await();
                // 4. 将 leader 设置为当前线程,阻塞当前线程(限时等待剩余有效时间)
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        // 4.1 尝试获取过时的元素,从新竞争
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 5. 队列中有元素则唤醒其它无限等待的线程
        //    leader 线程是限期等待,每次 leader 线程获取元素出队,若是队列中有元素
        //    就要唤醒一个无限等待的线程,将其设置为限期等待,也就是总有一个等待线程是 leader 状态
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

采用 take 阻塞式出队时,这里要思考下集合中元素时全部的等待线程永远进行 wait 状态不被唤醒,也就是说即便元素过时了也没法正常出队?

首先,在每次入队 offer 时,若是是第一个元素就会调用 vailable.signal() 唤醒一个等待的线程。
其次,take 方法自旋结束后若是 leader == null && q.peek() != null,须要唤醒一个等待中的出队线程。
leader == null && q.peek() != null 的含义就是——没有 leader 线程但队列中存在元素。咱们以前说了,leader 线程做用之一就是用来唤醒其它无限等待的线程,因此必需要有这个判断。
固然,若是集合中没有元素了,全部的等待线程都处理无限等待的状态。

参考:

  1. J.U.C之collections框架:DelayQueue

天天用心记录一点点。内容也许不重要,但习惯很重要!

相关文章
相关标签/搜索