DelayQueue 是一个支持延时获取元素的无界阻塞队列。队列使用 PriorityQueue 来实现。 队列中的元素必须实现 Delayed 接口,在建立元素时能够指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。java
DelayQueue 也是一种比较特殊的阻塞队列,从类声明也能够看出,DelayQueue 中的全部元素必须实现 Delayed 接口。DelayQueue 队列的元素必须实现 Delayed 接口。segmentfault
// 此接口的实现必须定义一个 compareTo 方法,该方法提供与此接口的 getDelay 方法一致的排序。 public interface Delayed extends Comparable<Delayed> { // 返回与此对象相关的剩余有效时间,以给定的时间单位表示 long getDelay(TimeUnit unit); }
能够看到,Delayed 接口除了自身的 getDelay 方法外,还实现了 Comparable 接口。getDelay 方法用于返回对象的剩余有效时间,实现 Comparable 接口则是为了可以比较两个对象,以便排序。设计模式
也就是说,若是一个类实现了 Delayed 接口,当建立该类的对象并添加到 DelayQueue 中后,只有当该对象的 getDalay 方法返回的剩余时间 ≤0 时才会出队。缓存
另外,因为 DelayQueue 内部委托了 PriorityQueue 对象来实现全部方法,因此能以堆的结构维护元素顺序,这样剩余时间最小的元素就在堆顶,每次出队其实就是删除剩余时间 ≤0 的最小元素。多线程
DelayQueue 的特色简要归纳以下:框架
DelayQueue 很是有用,能够将 DelayQueue 运用在如下应用场景。ide
咱们能够参考 ScheduledThreadPoolExecutor#ScheduledFutureTask 类的实现。源码分析
// 模仿网吧上网场景 public class DelayQueueTest extends Thread { DelayQueue queue = new DelayQueue(); SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss"); public static void main(String[] args) { DelayQueueTest wangba = new DelayQueueTest(); wangba.start(); wangba.shangji("A", 5); wangba.shangji("B", 2); wangba.shangji("C", 4); } public void shangji(String name, int money) { WangMing wm = new WangMing(name, System.currentTimeMillis() + money * 1000l); queue.add(wm); System.out.println(name + "开始上网,时间:" + format.format(new Date()) + ",预计下机时间为:" + format.format(new Date(wm.getEndTime()))); } public void xiaji(WangMing wm) { System.out.println(wm.getName() + "下机,时间:" + format.format(new Date(wm.getEndTime()))); } public void run() { while (true) { try { WangMing wm = (WangMing) queue.take(); xiaji(wm); } catch (InterruptedException e) { } } } } // 网民,必须实现 Delayed 接口 class WangMing implements Delayed { private String name; private long endTime; private TimeUnit timeUnit = TimeUnit.SECONDS; @Override public long getDelay(TimeUnit unit) { return endTime - System.currentTimeMillis(); } @Override public int compareTo(Delayed o) { WangMing wm = (WangMing) o; return this.getDelay(timeUnit) - wm.getDelay(timeUnit) > 0 ? 1 : (this.getDelay(timeUnit) - wm.getDelay(timeUnit) < 0 ? -1 : 0); } }
程序执行结果:性能
A开始上网,时间:2017-12-07 09:37:52,预计下机时间为:2017-12-07 09:37:57 B开始上网,时间:2017-12-07 09:37:52,预计下机时间为:2017-12-07 09:37:54 C开始上网,时间:2017-12-07 09:37:52,预计下机时间为:2017-12-07 09:37:56 B下机,时间:2017-12-07 09:37:54 C下机,时间:2017-12-07 09:37:56 A下机,时间:2017-12-07 09:37:57
介绍完了 DelayQueued 的基本使用,读者应该对该阻塞队列的功能有了基本了解,接下来咱们看下 Doug Lea 是如何实现 DelayQueued 的。this
private final transient ReentrantLock lock = new ReentrantLock(); private final Condition available = lock.newCondition(); // PriorityQueue 维护队列 private final PriorityQueue<E> q = new PriorityQueue<E>(); private Thread leader = null;
上述比较特殊的是 leader 字段,咱们以前已经说过,DelayQueue 每次只会出队一个过时的元素,若是队首元素没有过时,就会阻塞出队线程,让线程在 available 这个条件队列上无限等待。
为了提高性能,DelayQueue 并不会让全部出队线程都无限等待,而是用 leader 保存了第一个尝试出队的线程,该线程的等待时间是队首元素的剩余有效期。这样,一旦 leader 线程被唤醒(此时队首元素也失效了),就能够出队成功,而后唤醒一个其它在 available 条件队列上等待的线程。以后,会重复上一步,新唤醒的线程可能取代成为新的 leader 线程。这样,就避免了无效的等待,提高了性能。这实际上是一种名为 Leader-Follower pattern
的多线程设计模式。
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); // 调用 PriorityQueue#offer 方法 if (q.peek() == e) { // 若是入队元素在队首, 则唤醒一个出队线程 leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
须要注意的是当首次入队元素时,须要唤醒一个出队线程,由于此时可能已有出队线程在空队列上等待了,若是不唤醒,会致使出队线程永远没法执行。
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); // 1. 没有元素或元素还在有效期内则直接返回 null if (first == null || first.getDelay(NANOSECONDS) > 0) return null; // 2. 元素已经失效直接取出来一个 else return q.poll(); } finally { lock.unlock(); } }
不阻塞直接 poll 时很简单,再来看一下阻塞式获取元素 take 方法。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); // 1. 集合为空时全部的线程都处于无限等待的状态。 // 只要有元素将其中一个线程转为 leader 状态 if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); // 2. 元素已通过期,直接取出返回 if (delay <= 0) return q.poll(); first = null; // don't retain ref while waiting // 3. 已经在其它线程设置为 leader,无限期等着 if (leader != null) available.await(); // 4. 将 leader 设置为当前线程,阻塞当前线程(限时等待剩余有效时间) else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { // 4.1 尝试获取过时的元素,从新竞争 if (leader == thisThread) leader = null; } } } } } finally { // 5. 队列中有元素则唤醒其它无限等待的线程 // leader 线程是限期等待,每次 leader 线程获取元素出队,若是队列中有元素 // 就要唤醒一个无限等待的线程,将其设置为限期等待,也就是总有一个等待线程是 leader 状态 if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
采用 take 阻塞式出队时,这里要思考下集合中元素时全部的等待线程永远进行 wait 状态不被唤醒,也就是说即便元素过时了也没法正常出队?
首先,在每次入队 offer 时,若是是第一个元素就会调用 vailable.signal() 唤醒一个等待的线程。
其次,take 方法自旋结束后若是 leader == null && q.peek() != null,须要唤醒一个等待中的出队线程。
leader == null && q.peek() != null 的含义就是——没有 leader 线程但队列中存在元素。咱们以前说了,leader 线程做用之一就是用来唤醒其它无限等待的线程,因此必需要有这个判断。
固然,若是集合中没有元素了,全部的等待线程都处理无限等待的状态。
参考:
天天用心记录一点点。内容也许不重要,但习惯很重要!