系列传送门:java
DelayQueue是一个支持延时获取元素的无界阻塞队列,使用PriorityQueue来存储元素。编程
队中的元素必须实现Delayed
接口【Delay接口又继承了Comparable,须要实现compareTo方法】,每一个元素都须要指明过时时间,经过getDelay(unit)
获取元素剩余时间【剩余时间 = 到期时间 - 当前时间】,每次向优先队列中添加元素时根据compareTo方法做为排序规则。缓存
当从队列获取元素时,只有过时的元素才会出队列。并发
使用场景: 缓存系统设计、定时任务调度等。app
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { // 独占锁实现同步 private final transient ReentrantLock lock = new ReentrantLock(); // 优先队列存放数据 private final PriorityQueue<E> q = new PriorityQueue<E>(); /** * 基于Leader-Follower模式的变体,用于尽可能减小没必要要的线程等待 */ private Thread leader = null; /** * 与lock对应的条件变量 */ private final Condition available = lock.newCondition(); }
队中的元素必须实现Delayed
接口【Delay接口又继承了Comparable,须要实现compareTo方法】,每一个元素都须要指明过时时间,经过getDelay(unit)
获取元素剩余时间【剩余时间 = 到期时间 - 当前时间】。dom
每次向优先队列中添加元素时根据compareTo方法做为排序规则,固然咱们约定一下,默认q.peek()出来的就是最早过时的元素。ide
public interface Delayed extends Comparable<Delayed> { // 返回剩余时间 long getDelay(TimeUnit unit); } public interface Comparable<T> { // 定义比较方法 public int compareTo(T o); }
学习了Delayed接口以后,咱们看一个实际的案例,加深印象,源于:《Java并发编程之美》。工具
static class DelayedElement implements Delayed { private final long delayTime; // 延迟时间 private final long expire; // 到期时间 private final String taskName; // 任务名称 public DelayedElement (long delayTime, String taskName) { this.delayTime = delayTime; this.taskName = taskName; expire = now() + delayTime; } final long now () { return System.currentTimeMillis(); } // 剩余时间 = 到期时间 - 当前时间 @Override public long getDelay (TimeUnit unit) { return unit.convert(expire - now(), TimeUnit.MILLISECONDS); } @Override public int compareTo (Delayed o) { return (int) (getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); } @Override public String toString () { final StringBuilder res = new StringBuilder("DelayedElement [ "); res.append("delay = ").append(delayTime); res.append(", expire = ").append(expire); res.append(", taskName = '").append(taskName).append('\''); res.append(" ] "); return res.toString(); } } public static void main (String[] args) { // 建立delayQueue队列 DelayQueue<DelayedElement> delayQueue = new DelayQueue<>(); // 建立延迟任务 Random random = new Random(); for (int i = 0; i < 10; i++) { DelayedElement element = new DelayedElement(random.nextInt(500), "task: " + i); delayQueue.offer(element); } // 依次取出任务并打印 DelayedElement ele = null; try { for (; ; ) { while ((ele = delayQueue.take()) != null) { System.out.println(ele); } } } catch (InterruptedException ex) { ex.printStackTrace(); } } // 打印结果 DelayedElement [ delay = 2, expire = 1611995426061, taskName = 'task: 4' ] DelayedElement [ delay = 52, expire = 1611995426111, taskName = 'task: 2' ] DelayedElement [ delay = 80, expire = 1611995426139, taskName = 'task: 5' ] DelayedElement [ delay = 132, expire = 1611995426191, taskName = 'task: 0' ] DelayedElement [ delay = 174, expire = 1611995426233, taskName = 'task: 9' ] DelayedElement [ delay = 175, expire = 1611995426234, taskName = 'task: 7' ] DelayedElement [ delay = 326, expire = 1611995426385, taskName = 'task: 3' ] DelayedElement [ delay = 447, expire = 1611995426506, taskName = 'task: 8' ] DelayedElement [ delay = 452, expire = 1611995426511, taskName = 'task: 1' ] DelayedElement [ delay = 486, expire = 1611995426545, taskName = 'task: 6' ]
DelayQueue构造器相比于前几个,就显得很是easy了。学习
public DelayQueue() {} public DelayQueue(Collection<? extends E> c) { this.addAll(c); }
由于DelayQueue是无界队列,不会由于边界问题产生阻塞,所以put操做和offer操做是同样的。ui
public void put(E e) { offer(e); } public boolean offer(E e) { // 获取独占锁 final ReentrantLock lock = this.lock; lock.lock(); try { // 加入优先队列里 q.offer(e); // 判断堆顶元素是否是刚刚插入的元素 // 若是判断为true,说明当前这个元素是将最早过时 if (q.peek() == e) { // 重置leader线程为null leader = null; // 激活available变量条件队列中的一个线程 available.signal(); } return true; } finally { lock.unlock(); } }
take方法将会获取并移除队列里面延迟时间过时的元素 ,若是队列里面没有过时元素则陷入等待。
public E take() throws InterruptedException { // 获取独占锁 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { // 瞅一瞅谁最快过时 E first = q.peek(); // 队列为空,则将当前线程置入available的条件队列中,直到里面有元素 if (first == null) available.await(); else { // 看下还有多久过时 long delay = first.getDelay(NANOSECONDS); // 哇,已通过期了,就移除它并返回 if (delay <= 0) return q.poll(); first = null; // don't retain ref while waiting // leader不为null表示其余线程也在执行take // 则将当前线程置入available的条件队列中 if (leader != null) available.await(); else { // 若是leader为null,则选择当前线程做为leader线程 Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 等待delay时间,时间到以后,会出条件队列,继续竞争锁 available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
若是不设置first = null
,将会引发内存泄露。
- 线程A到达,队首元素没有到期,设置leader = 线程A,而且执行
available.awaitNanos(delay);
等待元素过时。- 这时线程B来了,由于leader != null,则会
available.await();
阻塞,线程C、D、E同理。- 线程A阻塞完毕了,再次循环,获取列首元素成功,出列。
这个时候列首元素应该会被回收掉,可是问题是它还被线程B、线程C持有着,因此不会回收,若是线程增多,且队首元素无限期的不能回收,就会形成内存泄漏。
DelayQueue是一个支持延时获取元素的无界阻塞队列,使用PriorityQueue来存储元素。
队中的元素必须实现Delayed
接口【Delay接口又继承了Comparable,须要实现compareTo方法】,每一个元素都须要指明过时时间,经过getDelay(unit)
获取元素剩余时间【剩余时间 = 到期时间 - 当前时间】,每次向优先队列中添加元素时根据compareTo方法做为排序规则。
基于Leader-Follower模式使用leader变量,减小没必要要的线程等待。
DelayQueue是无界队列,所以插入操做是非阻塞的。可是take操做从队列获取元素时,是阻塞的,阻塞规则为:
available.await()
陷入阻塞。available.await()
陷入阻塞。available.awaitNanos(delay)
。available.signal()
唤醒一个follower线程,接着回到开始那步。《Java并发编程的艺术》
《Java并发编程之美》