DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在建立元素时能够指定多久才能够从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。后端
因延迟阻塞队列的特性, 咱们通常将 DelayQueue 做用于如下场景 :缓存
下面咱们以缓存系统的应用,看下 DelayQueue 的使用,代码以下:安全
public class DelayQueueDemo { static class Cache implements Runnable { private boolean stop = false; private Map<String, String> itemMap = new HashMap<>(); private DelayQueue<CacheItem> delayQueue = new DelayQueue<>(); public Cache () { // 开启内部线程检测是否过时 new Thread(this).start(); } /** * 添加缓存 * * @param key * @param value * @param exprieTime 过时时间,单位秒 */ public void put (String key, String value, long exprieTime) { CacheItem cacheItem = new CacheItem(key, exprieTime); // 此处忽略添加剧复 key 的处理 delayQueue.add(cacheItem); itemMap.put(key, value); } public String get (String key) { return itemMap.get(key); } public void shutdown () { stop = true; } @Override public void run() { while (!stop) { CacheItem cacheItem = delayQueue.poll(); if (cacheItem != null) { // 元素过时, 从缓存中移除 itemMap.remove(cacheItem.getKey()); System.out.println("key : " + cacheItem.getKey() + " 过时并移除"); } } System.out.println("cache stop"); } } static class CacheItem implements Delayed { private String key; /** * 过时时间(单位秒) */ private long exprieTime; private long currentTime; public CacheItem(String key, long exprieTime) { this.key = key; this.exprieTime = exprieTime; this.currentTime = System.currentTimeMillis(); } @Override public long getDelay(TimeUnit unit) { // 计算剩余的过时时间 // 大于 0 说明未过时 return exprieTime - TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - currentTime); } @Override public int compareTo(Delayed o) { // 过时时间长的放置在队列尾部 if (this.getDelay(TimeUnit.MICROSECONDS) > o.getDelay(TimeUnit.MICROSECONDS)) { return 1; } // 过时时间短的放置在队列头 if (this.getDelay(TimeUnit.MICROSECONDS) < o.getDelay(TimeUnit.MICROSECONDS)) { return -1; } return 0; } public String getKey() { return key; } } public static void main(String[] args) throws InterruptedException { Cache cache = new Cache(); // 添加缓存元素 cache.put("a", "1", 5); cache.put("b", "2", 4); cache.put("c", "3", 3); while (true) { String a = cache.get("a"); String b = cache.get("b"); String c = cache.get("c"); System.out.println("a : " + a + ", b : " + b + ", c : " + c); // 元素均过时后退出循环 if (StringUtils.isEmpty(a) && StringUtils.isEmpty(b) && StringUtils.isEmpty(c)) { break; } TimeUnit.MILLISECONDS.sleep(1000); } cache.shutdown(); } } 复制代码
执行结果以下:架构
a : 1, b : 2, c : 3 a : 1, b : 2, c : 3 a : 1, b : 2, c : 3 key : c 过时并移除 a : 1, b : 2, c : null key : b 过时并移除 a : 1, b : null, c : null key : a 过时并移除 a : null, b : null, c : null cache stop 复制代码
从执行结果能够看出,因循环内部每次停顿 1 秒,当等待 3 秒后,元素 c 过时并从缓存中清除,等待 4 秒后,元素 b 过时并从缓存中清除,等待 5 秒后,元素 a 过时并从缓存中清除。给你们推荐一个Java后端架构群:698581634 进群免费领取架构资料。ide
重入锁this
private final transient ReentrantLock lock = new ReentrantLock(); 复制代码
用于保证队列操做的线程安全性spa
优先队列线程
private final PriorityQueue<E> q = new PriorityQueue<E>(); 复制代码
存储介质,用于保证延迟低的优先执行3d
leadercode
leader 指向的是第一个从队列获取元素阻塞等待的线程,其做用是减小其余线程没必要要的等待时间。(这个地方我一直没搞明白 怎么就减小其余线程的等待时间了)
condition
private final Condition available = lock.newCondition(); 复制代码
条件对象,当新元素到达,或新线程可能须要成为leader时被通知
下面将主要对队列的入队,出队动做进行分析 :
入队 - offer
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { // 入队 q.offer(e); if (q.peek() == e) { // 若入队的元素位于队列头部,说明当前元素延迟最小 // 将 leader 置空 leader = null; // 唤醒阻塞在等待队列的线程 available.signal(); } return true; } finally { lock.unlock(); } } 复制代码
出队 - poll
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) // 等待 add 唤醒 available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) // 已过时则直接返回队列头节点 return q.poll(); first = null; // don't retain ref while waiting if (leader != null) // 若 leader 不为空 // 说明已经有其余线程调用过 take 操做 // 当前调用线程 follower 挂起等待 available.await(); else { // 若 leader 为空 // 将 leader 指向当前线程 Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 当前调用线程在指定 delay 时间内挂起等待 available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) // leader 处理完以后,唤醒 follower available.signal(); lock.unlock(); } } 复制代码
Leader-follower 模式