Java并发包源码学习系列:阻塞队列实现之DelayQueue源码解析

系列传送门:java

DelayQueue概述

DelayQueue是一个支持延时获取元素的无界阻塞队列,使用PriorityQueue来存储元素。编程

队中的元素必须实现Delayed接口【Delay接口又继承了Comparable,须要实现compareTo方法】,每一个元素都须要指明过时时间,经过getDelay(unit)获取元素剩余时间【剩余时间 = 到期时间 - 当前时间】,每次向优先队列中添加元素时根据compareTo方法做为排序规则。缓存

当从队列获取元素时,只有过时的元素才会出队列。并发

使用场景: 缓存系统设计、定时任务调度等。app

类图及重要字段

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
    // 独占锁实现同步
    private final transient ReentrantLock lock = new ReentrantLock();
    // 优先队列存放数据
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * 基于Leader-Follower模式的变体,用于尽可能减小没必要要的线程等待
     */
    private Thread leader = null;

    /**
     * 与lock对应的条件变量
     */
    private final Condition available = lock.newCondition();    
}
  1. 使用ReentrantLock独占锁实现线程同步,使用Condition实现等待通知机制。
  2. 基于Leader-Follower模式的变体,减小没必要要的线程等待。
  3. 内部使用PriorityQueue优先级队列存储元素,且队列中元素必须实现Delayed接口。

Delayed接口

队中的元素必须实现Delayed接口【Delay接口又继承了Comparable,须要实现compareTo方法】,每一个元素都须要指明过时时间,经过getDelay(unit)获取元素剩余时间【剩余时间 = 到期时间 - 当前时间】。dom

每次向优先队列中添加元素时根据compareTo方法做为排序规则,固然咱们约定一下,默认q.peek()出来的就是最早过时的元素。ide

public interface Delayed extends Comparable<Delayed> {
    // 返回剩余时间
    long getDelay(TimeUnit unit);
}

public interface Comparable<T> {
	// 定义比较方法
    public int compareTo(T o);
}

Delayed元素案例

学习了Delayed接口以后,咱们看一个实际的案例,加深印象,源于:《Java并发编程之美》。工具

static class DelayedElement implements Delayed {

        private final long delayTime; // 延迟时间
        private final long expire; // 到期时间
        private final String taskName; // 任务名称

        public DelayedElement (long delayTime, String taskName) {
            this.delayTime = delayTime;
            this.taskName = taskName;
            expire = now() + delayTime;
        }

        final long now () {
            return System.currentTimeMillis();
        }

        // 剩余时间 = 到期时间 - 当前时间
        @Override
        public long getDelay (TimeUnit unit) {
            return unit.convert(expire - now(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo (Delayed o) {
            return (int) (getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
        }

        @Override
        public String toString () {
            final StringBuilder res = new StringBuilder("DelayedElement [ ");
            res.append("delay = ").append(delayTime);
            res.append(", expire = ").append(expire);
            res.append(", taskName = '").append(taskName).append('\'');
            res.append(" ] ");
            return res.toString();
        }
    }


    public static void main (String[] args) {
        // 建立delayQueue队列
        DelayQueue<DelayedElement> delayQueue = new DelayQueue<>();

        // 建立延迟任务
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            DelayedElement element = new DelayedElement(random.nextInt(500), "task: " + i);
            delayQueue.offer(element);
        }

        // 依次取出任务并打印
        DelayedElement ele = null;
        try {
            for (; ; ) {
                while ((ele = delayQueue.take()) != null) {
                    System.out.println(ele);
                }
            }
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }
// 打印结果
DelayedElement [ delay = 2, expire = 1611995426061, taskName = 'task: 4' ] 
DelayedElement [ delay = 52, expire = 1611995426111, taskName = 'task: 2' ] 
DelayedElement [ delay = 80, expire = 1611995426139, taskName = 'task: 5' ] 
DelayedElement [ delay = 132, expire = 1611995426191, taskName = 'task: 0' ] 
DelayedElement [ delay = 174, expire = 1611995426233, taskName = 'task: 9' ] 
DelayedElement [ delay = 175, expire = 1611995426234, taskName = 'task: 7' ] 
DelayedElement [ delay = 326, expire = 1611995426385, taskName = 'task: 3' ] 
DelayedElement [ delay = 447, expire = 1611995426506, taskName = 'task: 8' ] 
DelayedElement [ delay = 452, expire = 1611995426511, taskName = 'task: 1' ] 
DelayedElement [ delay = 486, expire = 1611995426545, taskName = 'task: 6' ]
  • 实现了compareTo方法,定义比较规则为越早过时的排在队头。
  • 实现了getDelay方法,计算公式为:剩余时间 = 到期时间 - 当前时间。

构造器

DelayQueue构造器相比于前几个,就显得很是easy了。学习

public DelayQueue() {}

    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

void put(E e)

由于DelayQueue是无界队列,不会由于边界问题产生阻塞,所以put操做和offer操做是同样的。ui

public void put(E e) {
        offer(e);
    }

    public boolean offer(E e) {
        // 获取独占锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 加入优先队列里
            q.offer(e);
            // 判断堆顶元素是否是刚刚插入的元素
            // 若是判断为true,说明当前这个元素是将最早过时
            if (q.peek() == e) {
                // 重置leader线程为null
                leader = null; 
                // 激活available变量条件队列中的一个线程
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

E take()

take方法将会获取并移除队列里面延迟时间过时的元素 ,若是队列里面没有过时元素则陷入等待。

public E take() throws InterruptedException {
        // 获取独占锁
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                // 瞅一瞅谁最快过时
                E first = q.peek();
                // 队列为空,则将当前线程置入available的条件队列中,直到里面有元素
                if (first == null)
                    available.await();
                else {
                    // 看下还有多久过时
                    long delay = first.getDelay(NANOSECONDS);
                    // 哇,已通过期了,就移除它并返回
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    // leader不为null表示其余线程也在执行take
                    // 则将当前线程置入available的条件队列中
                    if (leader != null)
                        available.await();
                    else {
                        // 若是leader为null,则选择当前线程做为leader线程
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // 等待delay时间,时间到以后,会出条件队列,继续竞争锁
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

first = null 有什么用

若是不设置first = null,将会引发内存泄露。

  • 线程A到达,队首元素没有到期,设置leader = 线程A,而且执行available.awaitNanos(delay);等待元素过时。
  • 这时线程B来了,由于leader != null,则会available.await();阻塞,线程C、D、E同理。
  • 线程A阻塞完毕了,再次循环,获取列首元素成功,出列。

这个时候列首元素应该会被回收掉,可是问题是它还被线程B、线程C持有着,因此不会回收,若是线程增多,且队首元素无限期的不能回收,就会形成内存泄漏。

总结

DelayQueue是一个支持延时获取元素无界阻塞队列,使用PriorityQueue来存储元素。

队中的元素必须实现Delayed接口【Delay接口又继承了Comparable,须要实现compareTo方法】,每一个元素都须要指明过时时间,经过getDelay(unit)获取元素剩余时间【剩余时间 = 到期时间 - 当前时间】,每次向优先队列中添加元素时根据compareTo方法做为排序规则。

基于Leader-Follower模式使用leader变量,减小没必要要的线程等待。

DelayQueue是无界队列,所以插入操做是非阻塞的。可是take操做从队列获取元素时,是阻塞的,阻塞规则为:

  • 当一个线程调用队列的take方法,若是队列为空,则将会调用 available.await()陷入阻塞。
  • 若是队列不为空,则查看队列的队首元素是否过时,根据getDelay的返回值是否小于0判断,若是过时则返回该元素。
  • 若是队首元素未过时,则判断当前线程是否为leader线程,若是不是,代表有其余线程在执行take操做,就调用available.await()陷入阻塞。
  • 若是没有其余线程在执行take,就将当前线程设置为leader,并等待队首元素过时,available.awaitNanos(delay)
  • leader线程退出take以后,将会调用available.signal()唤醒一个follower线程,接着回到开始那步。

参考阅读

相关文章
相关标签/搜索