阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操做将会被阻塞,或者当队列是满时,往队列里添加元素的操做会被阻塞java
延迟阻塞队列DelayQueue
的底层是基于优先级队列PriorityQueue
来实现的,所以研究延迟阻塞队列,更多的注意力应集中在如下两点缓存
类的声明以下,要求队列中的元素必须继承 Delayed
安全
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> public interface Delayed extends Comparable<Delayed> { long getDelay(TimeUnit unit); }
这个限定,主要服务于优先级队列的排序要求,根据延迟时间对元素队列中的元素进行排序并发
入队的实现逻辑比较简单,为了保证并发安全,实现中实现加锁机制异步
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
入队的实际是交由优先级队列进行实现,须要注意的是,入队以后,额外的一个操做,若是入队的元素刚好在队列头,执行两个操做ide
leader
赋值为空 (这个是干吗的,为何这么作?)available.signal()
唤醒被阻塞的线程(什么线程被阻塞?)public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); if (first == null || first.getDelay(NANOSECONDS) > 0) return null; else return q.poll(); } finally { lock.unlock(); } }
出队的操做一样加锁,获取队列头的元素,判断延期时间是否结束,是才返回结果,不然返回nullpost
注意,这里有两个疑问性能
getDelay()
方法返回值会变么,由谁来改变呢?虽然上面的出队和入队的逻辑比较简单,可是留下的疑问一点都很多,上面的四个问题应该如何解答?学习
继续看源码,发现还有一个出队的方法, 传入了两个参数表示阻塞的超时时间(即超过这个时间没有返回,则抛一个中端异常)测试
public E poll(long timeout, TimeUnit unit) throws InterruptedException { // 时间转换为纳秒 long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { // 获取队列头 E first = q.peek(); if (first == null) { // 队列为空 if (nanos <= 0) // 延时时间已过,直接返回null return null; else // 当前线程阻塞 nanos (ns),而后再次循环 nanos = available.awaitNanos(nanos); } else { // 队列非空 // 获取队列头元素的延迟时间 long delay = first.getDelay(NANOSECONDS); if (delay <= 0) // 延迟时间小于0,直接返回队列头 return q.poll(); if (nanos <= 0) // 阻塞时间已过,队列头的延迟时间还没到,则返回null return null; first = null; // don't retain ref while waiting if (nanos < delay || leader != null) // 没法获取当前的队列头 //(由于队列头延迟时间大于阻塞时间,即队列头不生效) // 继续阻塞,以指望此时可能新增一个到队列头 nanos = available.awaitNanos(nanos); else { // 能够获取当前队列头 Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 阻塞到队列头生效 long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
分析: 以当前队列为空做为条件
上面代码的流程以下:
继续化重点
getDelay
返回小于0)上面的方法由于加上了一个超时时间(即在指定的时间内依然没法返回时,断掉阻塞),分析起来可能不太顺畅,再看源码,还有一个take方法,逻辑与上面类似,只是砍掉了超时断开阻塞的逻辑
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) // 队列头为空,则阻塞,直到新增一个入队为止 available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) // 若队列头元素已生效,则直接返回 return q.poll(); first = null; // don't retain ref while waiting if (leader != null) // leader 非空时,表示有其余的一个线程在出队阻塞中 // 此时挂住当前线程,等待另外一个线程出队完成 available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 等待队列头元素生效 available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) // 当前线程出队完成,通知其余出队阻塞的线程继续执行 available.signal(); lock.unlock(); } }
经过了以前的烧脑逻辑以后,再看这个就简单不少了
如有另外一个线程已经处于等待队列头生效的阻塞过程当中,则阻塞当前线程,直到另外一个线程完成出队操做
若没有其余线程阻塞在出队过程当中,即当前线程为第一个获取队列头的线程
leader = thisThread
)leader=null
)最后步骤1中被阻塞的线程
所以能够愉快的解答上面的四个问题
添加一个元素到队列头
leader赋值为空 (这个是干吗的,为何这么作?)
leader记录了被阻塞在等待队列头生效的线程 新增一个元素到队列头,表示等待原来队列头生效的阻塞的线程已经失去了阻塞的意义,此时须要获取新的队列头进行返回了
available.signal() 唤醒被阻塞的线程(什么线程被阻塞?)
获取队列头的线程被唤起,主要有两种场景: 1. 以前队列为空,致使被阻塞的线程 2. 以前队列非空,可是队列头没有生效致使被阻塞的线程
普通的出队方法
队列中元素getDelay()方法返回值会变么,由谁来改变呢?
必须得变,不然这个元素一直不生效,将直接致使线程一直阻塞 由队列中的元素实现类来保证,返回值是逐渐变小的
上面的出队没有阻塞,直接返回了null
须要阻塞获取队列头,用 `take`, `poll(long,TimeUnit)`来代替
上面分析的是阻塞队列的实现原理,接下来举一个实例来解析下这个延迟阻塞队列的使用姿式,加深下理解
(简化了在简化以后的,与实际会有一些区别,请勿彻底认定合理)
好比和电商的详情页展现,为了提升应用的性能,咱们将整个页面进行了缓存,当详情页发生修改后,咱们会更新缓存的内容
所以为了保证缓存的内容和实际的内容是一致的,咱们须要一个对帐的任务,当详情页修改后,而且更新缓存完成以后,咱们须要再次对比缓存和实际内容的一致性;
此时一个异步的任务能够这么设计:监听详情页修改的事件,延迟一段时间,而后再对比缓存和实际内容的一致性 (这里延迟一段时间主要是为了保证缓存已经更新完成)
@Data @NoArgsConstructor @AllArgsConstructor public class DetailInfo { private int itemId; private String title; private String desc; private int price; }
UpdateTask
注意其中 getDelay()
的实现逻辑,根据当前时间与预订的延迟生效时间进行比较
@Data @AllArgsConstructor public class UpdateTask implements Delayed { private int itemId; private long delayTime; @Override public long getDelay(TimeUnit unit) { return delayTime - System.currentTimeMillis(); } @Override public int compareTo(Delayed o) { return (int) (getDelay(TimeUnit.MICROSECONDS) - o.getDelay(TimeUnit.MICROSECONDS)); } }
更新事件的监听订阅使用了 Guava的EventBus
来处理,若有疑问能够搜索EventBus的使用姿式
public class DetailManager { // 模拟真实数据存储空间 private Map<Integer, DetailInfo> realMap = new ConcurrentHashMap<>(); // 模拟缓存空间 private Map<String, String> cache = new ConcurrentHashMap<>(); private Gson gson = new Gson(); private String getCacheKey(int itemId) { return "detailInfo_" + itemId; } // eventBus 用于发送更新事件;异步接受更新事件 private AsyncEventBus eventBus; private void init() { DetailInfo detailInfo = new DetailInfo(1, "onw", "第一个测试", 100); DetailInfo detailInfo2 = new DetailInfo(2, "two", "第二个测试", 200); realMap.put(detailInfo.getItemId(), detailInfo); realMap.put(detailInfo2.getItemId(), detailInfo2); cache.put(getCacheKey(detailInfo.getItemId()), gson.toJson(detailInfo)); cache.put(getCacheKey(detailInfo2.getItemId()), gson.toJson(detailInfo2)); eventBus = new AsyncEventBus("Validate-Thread", Executors.newFixedThreadPool(2)); eventBus.register(this); } // 模拟更新商品 public void updateDetail(int itemId) { DetailInfo detailInfo = realMap.get(itemId); long now = System.currentTimeMillis(); detailInfo.setTitle("title_" + itemId + "_" + now); cache.put(getCacheKey(itemId), gson.toJson(detailInfo)); // 发送一个修改的事件 eventBus.post(new UpdateTask(itemId, now + 5000)); System.out.println("[UpdateInfo]>>>ItemId: " + itemId + " updateTime: " + now + " validateTime: " + (now + 5000)); } // 延迟队列 private DelayQueue<UpdateTask> delayQueue = new DelayQueue<>(); /** * 监听修改事件 * @param updateTask */ @Subscribe public void verify(UpdateTask updateTask) { long getTaskTime = System.currentTimeMillis(); delayQueue.put(updateTask); try { UpdateTask task = delayQueue.take(); long processTime = System.currentTimeMillis(); DetailInfo real = realMap.get(task.getItemId()); String cacheObj = cache.get(getCacheKey(task.getItemId())); boolean ans = gson.toJson(real).equals(cacheObj); System.out.println("validate itemId: " + updateTask.getItemId() + " getEventTime: " + getTaskTime + " processTime:" + processTime + " ans: " + ans); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException { DetailManager detailManager = new DetailManager(); detailManager.init(); // 开始修改 detailManager.updateDetail(1); Thread.sleep(20); detailManager.updateDetail(2); Thread.sleep(35000); } }
简单说明主流程
DetailManager
输出结果以下
[UpdateInfo]>>>ItemId: 1 updateTime: 1508677959067 validateTime: 1508677964067 [UpdateInfo]>>>ItemId: 2 updateTime: 1508677959103 validateTime: 1508677964103 Thread[pool-1-thread-1,5,main]>>> validate itemId: 1 getEventTime: 1508677959078 processTime:1508677964067 ans: true Thread[pool-1-thread-2,5,main]>>> validate itemId: 2 getEventTime: 1508677964067 processTime:1508677964103 ans: true
从上面的输出能够得知,实际验证的时间戳和预期的时间错是相同的
延迟阻塞队列DelayQueue,学习下来以后感受很是有意思,首先是加深了使用姿式的了解,其次对其中的阻塞,唤醒机制有了必定了解,涨了锁使用知识的见识(这里面还有一个很是有意思的东西就是 Condition
和 ReentrantLock
的使用,后续线程安全篇的研究能够以此做为应用场景)
简单小结上面的学习内容
PriorityQueue
take()
, poll(long, TimeUnit)
两方法之一Delayed
接口,内部实现的getDelay
方法,要求返回值愈来愈小(若是一直大于0,这个延迟任务就一直没法执行了)GG