JDK容器学习之Queue:DelayQueue

延迟阻塞队列 DelayQueue

阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操做将会被阻塞,或者当队列是满时,往队列里添加元素的操做会被阻塞java

延迟阻塞队列DelayQueue的底层是基于优先级队列PriorityQueue来实现的,所以研究延迟阻塞队列,更多的注意力应集中在如下两点缓存

  • 阻塞是如何实现的
  • 应用场景是什么

I. 阻塞队列的实现逻辑

1. 限定

类的声明以下,要求队列中的元素必须继承 Delayed安全

public class DelayQueue<E extends Delayed> 
    extends AbstractQueue<E>
    implements BlockingQueue<E>
    
    
public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

这个限定,主要服务于优先级队列的排序要求,根据延迟时间对元素队列中的元素进行排序并发

2. 入队

入队的实现逻辑比较简单,为了保证并发安全,实现中实现加锁机制异步

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

入队的实际是交由优先级队列进行实现,须要注意的是,入队以后,额外的一个操做,若是入队的元素刚好在队列头,执行两个操做ide

  1. leader赋值为空 (这个是干吗的,为何这么作?)
  2. available.signal() 唤醒被阻塞的线程(什么线程被阻塞?)

3. 出队

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}

出队的操做一样加锁,获取队列头的元素,判断延期时间是否结束,是才返回结果,不然返回nullpost

注意,这里有两个疑问性能

  1. 队列中元素getDelay()方法返回值会变么,由谁来改变呢?
  2. 上面的出队没有阻塞,直接返回了null

虽然上面的出队和入队的逻辑比较简单,可是留下的疑问一点都很多,上面的四个问题应该如何解答?学习

继续看源码,发现还有一个出队的方法, 传入了两个参数表示阻塞的超时时间(即超过这个时间没有返回,则抛一个中端异常)测试

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // 时间转换为纳秒
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            // 获取队列头
            E first = q.peek();
            if (first == null) { // 队列为空
                if (nanos <= 0)
                    // 延时时间已过,直接返回null
                    return null;
                else
                    // 当前线程阻塞 nanos (ns),而后再次循环
                    nanos = available.awaitNanos(nanos);
            } else { // 队列非空
                // 获取队列头元素的延迟时间
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0) // 延迟时间小于0,直接返回队列头
                    return q.poll();
                if (nanos <= 0) 
                // 阻塞时间已过,队列头的延迟时间还没到,则返回null
                    return null;
                first = null; // don't retain ref while waiting
                if (nanos < delay || leader != null)
                // 没法获取当前的队列头
                //(由于队列头延迟时间大于阻塞时间,即队列头不生效)
                // 继续阻塞,以指望此时可能新增一个到队列头
                    nanos = available.awaitNanos(nanos);
                else {
                // 能够获取当前队列头
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                    // 阻塞到队列头生效
                        long timeLeft = available.awaitNanos(delay);
                        nanos -= delay - timeLeft;
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

分析: 以当前队列为空做为条件

上面代码的流程以下:

  1. 阻塞当前方法
  2. 此时若新入队一个元素,根据前面入队方法,此时表示新入队的就是在队列头,会发出一个唤醒的操做
  3. 此时阻塞的线程被唤醒,继续循环,再次获取队列头(此时非空了)
  4. 判断队列头的元素延迟时间是否已过
  • 已过,则弹出队列头,并返回
  • 未过,继续判断阻塞时间是否小于0
    • 是则表示已通过了预期的阻塞时间,直接返回null
    • 若阻塞时间小于队列头的延迟时间(表示能够当前的队列头,不是本方法预期的),则继续阻塞当前线程,以指望此时有新入队的元素可能被再次获取
    • 不然表示当前线程获能够获取如今的队列头,记录下当前线程,并阻塞,等到队列头元素生效

继续化重点

  • 添加元素到队列头会唤起出队的阻塞线程
  • 被唤起以后,出队线程会再次获取队列头元素,判断是否能够返回(即getDelay返回小于0)

上面的方法由于加上了一个超时时间(即在指定的时间内依然没法返回时,断掉阻塞),分析起来可能不太顺畅,再看源码,还有一个take方法,逻辑与上面类似,只是砍掉了超时断开阻塞的逻辑

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)
            // 队列头为空,则阻塞,直到新增一个入队为止
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                // 若队列头元素已生效,则直接返回
                    return q.poll();
                first = null; // don't retain ref while waiting
                if (leader != null)
                // leader 非空时,表示有其余的一个线程在出队阻塞中
                // 此时挂住当前线程,等待另外一个线程出队完成
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                    // 等待队列头元素生效
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
        // 当前线程出队完成,通知其余出队阻塞的线程继续执行
            available.signal();
        lock.unlock();
    }
}

经过了以前的烧脑逻辑以后,再看这个就简单不少了

1. 队列为空,则阻塞,直到有个线程完成入队操做

2. 获取队列头,若队列头已生效,则直接返回

3. 若队列头未生效

  1. 如有另外一个线程已经处于等待队列头生效的阻塞过程当中,则阻塞当前线程,直到另外一个线程完成出队操做

  2. 若没有其余线程阻塞在出队过程当中,即当前线程为第一个获取队列头的线程

    • 标识当前线程处于等待队列头生效的阻塞中(leader = thisThread
    • 阻塞当前线程,等待队列头生效
    • 队列头生效以后,清空标识(leader=null)
    • 再次进入循环,获取队列头并返回
  3. 最后步骤1中被阻塞的线程


所以能够愉快的解答上面的四个问题

添加一个元素到队列头

  1. leader赋值为空 (这个是干吗的,为何这么作?)

    leader记录了被阻塞在等待队列头生效的线程
    新增一个元素到队列头,表示等待原来队列头生效的阻塞的线程已经失去了阻塞的意义,此时须要获取新的队列头进行返回了
  2. available.signal() 唤醒被阻塞的线程(什么线程被阻塞?)

    获取队列头的线程被唤起,主要有两种场景:
    1. 以前队列为空,致使被阻塞的线程
    2. 以前队列非空,可是队列头没有生效致使被阻塞的线程

普通的出队方法

  1. 队列中元素getDelay()方法返回值会变么,由谁来改变呢?

    必须得变,不然这个元素一直不生效,将直接致使线程一直阻塞
    由队列中的元素实现类来保证,返回值是逐渐变小的
  2. 上面的出队没有阻塞,直接返回了null

    须要阻塞获取队列头,用 `take`, `poll(long,TimeUnit)`来代替

II. 应用场景及使用case

上面分析的是阻塞队列的实现原理,接下来举一个实例来解析下这个延迟阻塞队列的使用姿式,加深下理解

1. 一个实例场景

(简化了在简化以后的,与实际会有一些区别,请勿彻底认定合理)

好比和电商的详情页展现,为了提升应用的性能,咱们将整个页面进行了缓存,当详情页发生修改后,咱们会更新缓存的内容

所以为了保证缓存的内容和实际的内容是一致的,咱们须要一个对帐的任务,当详情页修改后,而且更新缓存完成以后,咱们须要再次对比缓存和实际内容的一致性;

此时一个异步的任务能够这么设计:监听详情页修改的事件,延迟一段时间,而后再对比缓存和实际内容的一致性 (这里延迟一段时间主要是为了保证缓存已经更新完成)

2. 实现

详情信息 DetailInfo

@Data
@NoArgsConstructor
@AllArgsConstructor
public class DetailInfo {

    private int itemId;

    private String title;

    private String desc;

    private int price;

}

延迟队列的元素 UpdateTask

注意其中 getDelay() 的实现逻辑,根据当前时间与预订的延迟生效时间进行比较

@Data
@AllArgsConstructor
public class UpdateTask implements Delayed {

    private int itemId;

    private long delayTime;


    @Override
    public long getDelay(TimeUnit unit) {
        return delayTime - System.currentTimeMillis();
    }

    @Override
    public int compareTo(Delayed o) {
        return (int) (getDelay(TimeUnit.MICROSECONDS) - o.getDelay(TimeUnit.MICROSECONDS));
    }
}

主业务逻辑

更新事件的监听订阅使用了 Guava的EventBus来处理,若有疑问能够搜索EventBus的使用姿式

public class DetailManager {
    // 模拟真实数据存储空间
    private Map<Integer, DetailInfo> realMap = new ConcurrentHashMap<>();
    // 模拟缓存空间
    private Map<String, String> cache = new ConcurrentHashMap<>();

    private Gson gson = new Gson();

    private String getCacheKey(int itemId) {
        return "detailInfo_" + itemId;
    }

    // eventBus 用于发送更新事件;异步接受更新事件
    private AsyncEventBus eventBus;
    private void init() {
        DetailInfo detailInfo = new DetailInfo(1, "onw", "第一个测试", 100);
        DetailInfo detailInfo2 = new DetailInfo(2, "two", "第二个测试", 200);

        realMap.put(detailInfo.getItemId(), detailInfo);
        realMap.put(detailInfo2.getItemId(), detailInfo2);

        cache.put(getCacheKey(detailInfo.getItemId()), gson.toJson(detailInfo));
        cache.put(getCacheKey(detailInfo2.getItemId()), gson.toJson(detailInfo2));

        eventBus = new AsyncEventBus("Validate-Thread", 
            Executors.newFixedThreadPool(2));
        eventBus.register(this);
    }

    // 模拟更新商品
    public void updateDetail(int itemId) {
        DetailInfo detailInfo = realMap.get(itemId);
        long now = System.currentTimeMillis();
        detailInfo.setTitle("title_" + itemId + "_" + now);
        cache.put(getCacheKey(itemId), gson.toJson(detailInfo));

        // 发送一个修改的事件
        eventBus.post(new UpdateTask(itemId, now + 5000));
        System.out.println("[UpdateInfo]>>>ItemId: " + itemId + " updateTime: " + now + " validateTime: " + (now + 5000));
    }

    // 延迟队列
    private DelayQueue<UpdateTask> delayQueue = new DelayQueue<>();
    /**
     * 监听修改事件
     * @param updateTask
     */
    @Subscribe
    public void verify(UpdateTask updateTask) {
        long getTaskTime = System.currentTimeMillis();
        delayQueue.put(updateTask);

        try {
            UpdateTask task = delayQueue.take();
            long processTime = System.currentTimeMillis();


            DetailInfo real = realMap.get(task.getItemId());
            String cacheObj = cache.get(getCacheKey(task.getItemId()));
            boolean ans = gson.toJson(real).equals(cacheObj);
            System.out.println("validate itemId: " + updateTask.getItemId() +
                    " getEventTime: " + getTaskTime +
                    " processTime:" + processTime + " ans: " + ans);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DetailManager detailManager = new DetailManager();
        detailManager.init();

        // 开始修改
        detailManager.updateDetail(1);
        Thread.sleep(20);
        detailManager.updateDetail(2);

        Thread.sleep(35000);
    }
}

简单说明主流程

  1. 首先初始化DetailManager
  2. 更新两个商品,在更新的逻辑中实现如下步骤
  • 更新实际的商品内容
  • 更新缓存的内容
  • 发送一条商品更新的消息
  1. 异步监听更新消息任务逻辑
  • 将消息塞入延迟队列
  • 从延迟队列中后去已经生效的消息,而后对帐

输出结果以下

[UpdateInfo]>>>ItemId: 1 updateTime: 1508677959067 validateTime: 1508677964067
[UpdateInfo]>>>ItemId: 2 updateTime: 1508677959103 validateTime: 1508677964103
Thread[pool-1-thread-1,5,main]>>> validate itemId: 1 getEventTime: 1508677959078 processTime:1508677964067 ans: true
Thread[pool-1-thread-2,5,main]>>> validate itemId: 2 getEventTime: 1508677964067 processTime:1508677964103 ans: true

从上面的输出能够得知,实际验证的时间戳和预期的时间错是相同的

III. 小结

延迟阻塞队列DelayQueue,学习下来以后感受很是有意思,首先是加深了使用姿式的了解,其次对其中的阻塞,唤醒机制有了必定了解,涨了锁使用知识的见识(这里面还有一个很是有意思的东西就是 ConditionReentrantLock的使用,后续线程安全篇的研究能够以此做为应用场景)

简单小结上面的学习内容

  1. 队列中更不能有null
  2. 底层使用的是优先级队列 PriorityQueue
  3. 经过锁来实现线程安全
  4. 须要使用阻塞的获取元素时,请使用 take(), poll(long, TimeUnit)两方法之一
  5. 要求延迟阻塞队列的元素实现 Delayed接口,内部实现的getDelay方法,要求返回值愈来愈小(若是一直大于0,这个延迟任务就一直没法执行了)

扫描关注,java分享

QrCodeGG

相关文章
相关标签/搜索