本文首发于一世流云专栏: https://segmentfault.com/blog...
DelayQueue
是JDK1.5时,随着J.U.C包一块儿引入的一种阻塞队列,它实现了BlockingQueue接口,底层基于已有的PriorityBlockingQueue实现:segmentfault
DelayQueue也是一种比较特殊的阻塞队列,从类声明也能够看出,DelayQueue中的全部元素必须实现Delayed
接口:设计模式
/** * 一种混合风格的接口,用来标记那些应该在给定延迟时间以后执行的对象。 * <p> * 此接口的实现必须定义一个 compareTo 方法,该方法提供与此接口的 getDelay 方法一致的排序。 */ public interface Delayed extends Comparable<Delayed> { /** * 返回与此对象相关的剩余有效时间,以给定的时间单位表示. */ long getDelay(TimeUnit unit); }
能够看到,Delayed接口除了自身的getDelay
方法外,还实现了Comparable接口。getDelay方法用于返回对象的剩余有效时间,实现Comparable接口则是为了可以比较两个对象,以便排序。缓存
也就是说,若是一个类实现了Delayed接口,当建立该类的对象并添加到DelayQueue中后,只有当该对象的getDalay方法返回的剩余时间≤0时才会出队。网络
另外,因为DelayQueue内部委托了PriorityBlockingQueue对象来实现全部方法,因此能以堆的结构维护元素顺序,这样剩余时间最小的元素就在堆顶,每次出队其实就是删除剩余时间≤0的最小元素。多线程
DelayQueue的特色简要归纳以下:框架
为了便于理解DelayQueue的功能,咱们先来看一个使用DelayQueue的示例。dom
第一节说了,队列元素必须实现Delayed接口,咱们先来定义一个Data类,做为队列元素:异步
public class Data implements Delayed { private static final AtomicLong atomic = new AtomicLong(0); private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss-n"); // 数据的失效时间点 private final long time; // 序号 private final long seqno; /** * @param deadline 数据失效时间点 */ public Data(long deadline) { this.time = deadline; this.seqno = atomic.getAndIncrement(); } /** * 返回剩余有效时间 * * @param unit 时间单位 */ @Override public long getDelay(TimeUnit unit) { return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS); } /** * 比较两个Delayed对象的大小, 比较顺序以下: * 1. 若是是对象自己, 返回0; * 2. 比较失效时间点, 先失效的返回-1,后失效的返回1; * 3. 比较元素序号, 序号小的返回-1, 不然返回1. * 4. 非Data类型元素, 比较剩余有效时间, 剩余有效时间小的返回-1,大的返回1,相同返回0 */ @Override public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof Data) { Data x = (Data) other; // 优先比较失效时间 long diff = this.time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (this.seqno < x.seqno) // 剩余时间相同则比较序号 return -1; else return 1; } // 通常不会执行到此处,除非元素不是Data类型 long diff = this.getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; } @Override public String toString() { return "Data{" + "time=" + time + ", seqno=" + seqno + "}, isValid=" + isValid(); } private boolean isValid() { return this.getDelay(TimeUnit.NANOSECONDS) > 0; } }
关于队列元素Data类,须要注意如下几点:ide
time
字段保存失效时间点)的纳秒形式(构造时指定,好比当前时间+60s);seqno
字段表示元素序号,每一个元素惟一,仅用于失效时间点一致的元素之间的比较。getDelay
方法返回元素的剩余有效时间,能够根据入参的TimeUnit选择时间的表示形式(秒、微妙、纳秒等),通常选择纳秒以提升精度;compareTo
方法用于比较两个元素的大小,以便在队列中排序。因为DelayQueue基于优先级队列实现,因此内部是“堆”的形式,咱们定义的规则是先失效的元素将先出队,因此先失效元素应该在堆顶,即compareTo方法返回结果<0的元素优先出队;仍是以“生产者-消费者”模式来做为DelayQueued的示例:性能
生产者
public class Producer implements Runnable { private final DelayQueue<Data> queue; public Producer(DelayQueue<Data> queue) { this.queue = queue; } @Override public void run() { while (true) { long currentTime = System.nanoTime(); long validTime = ThreadLocalRandom.current().nextLong(1000000000L, 7000000000L); Data data = new Data(currentTime + validTime); queue.put(data); System.out.println(Thread.currentThread().getName() + ": put " + data); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
消费者
public class Consumer implements Runnable { private final DelayQueue<Data> queue; public Consumer(DelayQueue<Data> queue) { this.queue = queue; } @Override public void run() { while (true) { try { Data data = queue.take(); System.out.println(Thread.currentThread().getName() + ": take " + data); Thread.yield(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
调用
public class Main { public static void main(String[] args) { DelayQueue<Data> queue = new DelayQueue<>(); Thread c1 = new Thread(new Consumer(queue), "consumer-1"); Thread p1 = new Thread(new Producer(queue), "producer-1"); c1.start(); p1.start(); } }
执行结果:
producer-1: put Data{time=73262562161592, seqno=0}, isValid=true
producer-1: put Data{time=73262787192726, seqno=1}, isValid=true
producer-1: put Data{time=73265591291171, seqno=2}, isValid=true
producer-1: put Data{time=73266850330909, seqno=3}, isValid=trueconsumer-1: take Data{time=73262562161592, seqno=0}, isValid=false
consumer-1: take Data{time=73262787192726, seqno=1}, isValid=false
producer-1: put Data{time=73267928737184, seqno=4}, isValid=true
producer-1: put Data{time=73265083111776, seqno=5}, isValid=true
producer-1: put Data{time=73268729942809, seqno=6}, isValid=trueconsumer-1: take Data{time=73265083111776, seqno=5}, isValid=false
上面示例中,咱们建立了一个生产者,一个消费者,生产者不断得入队元素,每一个元素都会有个截止有效期;消费者不断得从队列者获取元素。从输出能够看出,消费者每次获取到的元素都是有效期最小的,且都是已经失效了的。(由于DelayQueue每次出队只会删除有效期最小且已通过期的元素)
介绍完了DelayQueued的基本使用,读者应该对该阻塞队列的功能有了基本了解,接下来咱们看下Doug Lea是如何实现DelayQueued的。
DelayQueued提供了两种构造器,都很是简单:
/** * 默认构造器. */ public DelayQueue() { }
/** * 从已有集合构造队列. */ public DelayQueue(Collection<? extends E> c) { this.addAll(c); }
能够看到,内部的PriorityQueue并不是在构造时建立,而是对象建立时生成:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { private final transient ReentrantLock lock = new ReentrantLock(); private final PriorityQueue<E> q = new PriorityQueue<E>(); /** * leader线程是首个尝试出队元素(队列不为空)但被阻塞的线程. * 该线程会限时等待(队首元素的剩余有效时间),用于唤醒其它等待线程 */ private Thread leader = null; /** * 出队线程条件队列, 当有多个线程, 会在此条件队列上等待. */ private final Condition available = lock.newCondition(); //... }
上述比较特殊的是leader
字段,咱们以前已经说过,DelayQueue每次只会出队一个过时的元素,若是队首元素没有过时,就会阻塞出队线程,让线程在available
这个条件队列上无限等待。
为了提高性能,DelayQueue并不会让全部出队线程都无限等待,而是用leader
保存了第一个尝试出队的线程,该线程的等待时间是队首元素的剩余有效期。这样,一旦leader线程被唤醒(此时队首元素也失效了),就能够出队成功,而后唤醒一个其它在available
条件队列上等待的线程。以后,会重复上一步,新唤醒的线程可能取代成为新的leader线程。这样,就避免了无效的等待,提高了性能。这实际上是一种名为“Leader-Follower pattern”的多线程设计模式。
put方法没有什么特别,因为是无界队列,因此也不会阻塞线程。
/** * 入队一个指定元素e. * 因为是无界队列, 因此该方法并不会阻塞线程. */ public void put(E e) { offer(e); } public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); // 调用PriorityQueue的offer方法 if (q.peek() == e) { // 若是入队元素在队首, 则唤醒一个出队线程 leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
须要注意的是当首次入队元素时,须要唤醒一个出队线程,由于此时可能已有出队线程在空队列上等待了,若是不唤醒,会致使出队线程永远没法执行。
if (q.peek() == e) { // 若是入队元素在队首, 则唤醒一个出队线程 leader = null; available.signal(); }
整个take方法在一个自旋中完成,其实就分为两种状况:
1.队列为空
这种状况直接阻塞出队线程。(在available条件队列等待)
2.队列非空
队列非空时,还要看队首元素的状态(有效期),若是队首元素过时了,那直接出队就好了;若是队首元素未过时,就要看当前线程是不是第一个到达的出队线程(即判断leader
是否为空),若是不是,就无限等待,若是是,则限时等待。
/** * 队首出队元素. * 若是队首元素(堆顶)未到期或队列为空, 则阻塞线程. */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (; ; ) { E first = q.peek(); // 读取队首元素 if (first == null) // CASE1: 队列为空, 直接阻塞 available.await(); else { // CASE2: 队列非空 long delay = first.getDelay(NANOSECONDS); if (delay <= 0) // CASE2.0: 队首元素已过时 return q.poll(); // 执行到此处说明队列非空, 且队首元素未过时 first = null; if (leader != null) // CASE2.1: 已存在leader线程 available.await(); // 无限期阻塞当前线程 else { // CASE2.2: 不存在leader线程 Thread thisThread = Thread.currentThread(); leader = thisThread; // 将当前线程置为leader线程 try { available.awaitNanos(delay); // 阻塞当前线程(限时等待剩余有效时间) } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) // 不存在leader线程, 则唤醒一个其它出队线程 available.signal(); lock.unlock(); } }
须要注意,自旋结束后若是leader == null && q.peek() != null
,须要唤醒一个等待中的出队线程。
leader == null && q.peek() != null
的含义就是——没有leader
线程但队列中存在元素。咱们以前说了,leader线程做用之一就是用来唤醒其它无限等待的线程,因此必需要有这个判断。
DelayQueue是阻塞队列中很是有用的一种队列,常常被用于缓存或定时任务等的设计。
考虑一种使用场景:
异步通知的重试,在不少系统中,当用户完成服务调用后,系统有时须要将结果异步通知到用户的某个URI。因为网络等缘由,不少时候会通知失败,这个时候就须要一种重试机制。
这时能够用DelayQueue保存通知失败的请求,失效时间能够根据已通知的次数来设定(好比:2s、5s、10s、20s),这样每次从队列中take获取的就是剩余时间最短的请求,若是已重复通知次数超过必定阈值,则能够把消息抛弃。
后面,咱们在讲J.U.C之executors框架的时候,还会再次看到DelayQueue的身影。JUC线程池框架中的ScheduledThreadPoolExecutor.DelayedWorkQueue
就是一种延时阻塞队列。