delayQueue其实就是在每次往优先级队列中添加元素,而后以元素的delay过时值做为排序的因素,以此来达到先过时的元素会拍在队首,每次从队列里取出来都是最早要过时的元素。less
所谓Delayed类型,由于须要比较,因此继承了Comparable接口:优化
public interface Delayed extends Comparable<Delayed> { long getDelay(TimeUnit unit); }
DelayQueue须要排序存储Delayed类型的对象同时具有阻塞功能,可是阻塞的过程伴有延迟等待类型的阻塞,所以不能直接使用BlockingPriorityQueue来实现,而是用非阻塞的版本的PriorityQueue来实现排序存储。ui
private final PriorityQueue<E> q = new PriorityQueue<E>();
所以DelayQueue须要本身实现阻塞的功能(须要一个Condition):this
private final Condition available = lock.newCondition();
代码以下:spa
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return q.poll(); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
整个代码的过程当中并无使用上太难理解的地方,可是有几个比较难以理解他为何这么作的地方线程
你们可能看到在咱们的DelayQueue中有一个Thread类型的元素leader,那么他是作什么的呢,有什么用呢?code
让咱们先看一下元素注解上的doc描述:对象
Thread designated to wait for the element at the head of the queue.
This variant of the Leader-Follower pattern serves to minimize unnecessary timed waiting.
when a thread becomes the leader, it waits only for the next delay to elapse, but other threads await indefinitely.
The leader thread must signal some other thread before returning from take() or poll(...), unless some other thread becomes leader in the interim.
Whenever the head of the queue is replaced with an element with an earlier expiration time, the leader field is invalidated by being reset to null, and some waiting thread, but not necessarily the current leader, is signalled.
So waiting threads must be prepared to acquire and lose leadership while waiting.排序
上面主要的意思就是说用leader来减小没必要要的等待时间,那么这里咱们的DelayQueue是怎么利用leader来作到这一点的呢:继承
这里咱们想象着咱们有个多个消费者线程用take方法去取,内部先加锁,而后每一个线程都去peek第一个节点.
若是leader不为空说明已经有线程在取了,设置当前线程等待
if (leader != null) available.await();
若是为空说明没有其余线程去取这个节点,设置leader并等待delay延时到期,直到poll后结束循环
else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } }
first = null; // don't retain ref while waiting
咱们能够看到doug lea后面写的注释,那么这段代码有什么用呢?
想一想假设如今延迟队列里面有三个对象。