原文出处:http://cmsblogs.com/ 『chenssy』java
DelayQueue是一个支持延时获取元素的无界阻塞队列。里面的元素所有都是“可延期”的元素,列头的元素是最早“到期”的元素,若是队列里面没有元素到期,是不能从列头获取元素的,哪怕有元素也不行。也就是说只有在延迟期到时才可以从队列中取元素。缓存
DelayQueue主要用于两个方面:并发
DelayQueue实现的关键主要有以下几个:优化
ReentrantLock、Condition这两个对象就不须要阐述了,他是实现整个BlockingQueue的核心。PriorityQueue是一个支持优先级线程排序的队列(参考【死磕Java并发】-----J.U.C之阻塞队列:PriorityBlockingQueue),leader后面阐述。这里咱们先来了解Delay,他是实现延时操做的关键。this
Delayed接口是用来标记那些应该在给定延迟时间以后执行的对象,它定义了一个long getDelay(TimeUnit unit)方法,该方法返回与此对象相关的的剩余时间。同时实现该接口的对象必须定义一个compareTo 方法,该方法提供与此接口的 getDelay 方法一致的排序。线程
public interface Delayed extends Comparable<Delayed> { long getDelay(TimeUnit unit); }
如何使用该接口呢?上面说的很是清楚了,实现该接口的getDelay()方法,同时定义compareTo()方法便可。code
先看DelayQueue的定义:对象
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { /** 可重入锁 */ private final transient ReentrantLock lock = new ReentrantLock(); /** 支持优先级的BlockingQueue */ private final PriorityQueue<E> q = new PriorityQueue<E>(); /** 用于优化阻塞 */ private Thread leader = null; /** Condition */ private final Condition available = lock.newCondition(); /** * 省略不少代码 */ }
看了DelayQueue的内部结构就对上面几个关键点一目了然了,可是这里有一点须要注意,DelayQueue的元素都必须继承Delayed接口。同时也能够从这里初步理清楚DelayQueue内部实现的机制了:以支持优先级无界队列的PriorityQueue做为一个容器,容器里面的元素都应该实现Delayed接口,在每次往优先级队列中添加元素时以元素的过时时间做为排序条件,最早过时的元素放在优先级最高。blog
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { // 向 PriorityQueue中插入元素 q.offer(e); // 若是当前元素的对首元素(优先级最高),leader设置为空,唤醒全部等待线程 if (q.peek() == e) { leader = null; available.signal(); } // 无界队列,永远返回true return true; } finally { lock.unlock(); } }
offer(E e)就是往PriorityQueue中添加元素,具体能够参考(【死磕Java并发】-----J.U.C之阻塞队列:PriorityBlockingQueue)。整个过程仍是比较简单,可是在判断当前元素是否为对首元素,若是是的话则设置leader=null,这是很是关键的一个步骤,后面阐述。排序
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { // 对首元素 E first = q.peek(); // 对首为空,阻塞,等待off()操做唤醒 if (first == null) available.await(); else { // 获取对首元素的超时时间 long delay = first.getDelay(NANOSECONDS); // <=0 表示已过时,出对,return if (delay <= 0) return q.poll(); first = null; // don't retain ref while waiting // leader != null 证实有其余线程在操做,阻塞 if (leader != null) available.await(); else { // 不然将leader 设置为当前线程,独占 Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 超时阻塞 available.awaitNanos(delay); } finally { // 释放leader if (leader == thisThread) leader = null; } } } } } finally { // 唤醒阻塞线程 if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
首先是获取对首元素,若是对首元素的延时时间 delay <= 0 ,则能够出对了,直接return便可。不然设置first = null,这里设置为null的主要目的是为了不内存泄漏。若是 leader != null 则表示当前有线程占用,则阻塞,不然设置leader为当前线程,而后调用awaitNanos()方法超时等待。
first = null
这里为何若是不设置first = null,则会引发内存泄漏呢?线程A到达,列首元素没有到期,设置leader = 线程A,这是线程B来了由于leader != null,则会阻塞,线程C同样。假如线程阻塞完毕了,获取列首元素成功,出列。这个时候列首元素应该会被回收掉,可是问题是它还被线程B、线程C持有着,因此不会回收,这里只有两个线程,若是有线程D、线程E...呢?这样会无限期的不能回收,就会形成内存泄漏。
这个入队、出对过程和其余的阻塞队列没有很大区别,无非是在出对的时候增长了一个到期时间的判断。同时经过leader来减小没必要要阻塞。
转载,请保留原文地址,谢谢 ~