JUC阻塞队列之DelayQueue源码分析

DelayQueue是一个支持延时获取元素的无界阻塞队列。而且队列中的元素必须实现Delayed接口。在建立元素时能够指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中获取到元素。DelayQueue的应用范围很是广阔,如能够用它来保存缓存中元素的有效期,也可用它来实现定时任务。缓存

Delayed接口

在分析DelayQueue源码以前,咱们先来看看Delayd接口,其源码定义以下:bash

public interface Delayed extends Comparable < Delayed > {

    /**
     * 指定返回对象的延时时间
     * @param  unit [时间单位]
     * @return      [延时的剩余,0或者-1表示延时已通过期]
     */
    long getDelay(TimeUnit unit);
}
复制代码

咱们看到,Delayed接口继承了Comparable接口,即实现Delayed接口的对象必须实现**getDelay(TimeUnit unit)方法和compareTo(T o)方法。这里compareTo(T o)**方法能够用来实现元素的排序,能够将延时时间长的放到队列的末尾。并发

DelayQueue构造函数

上面分析了Delayed接口,接下来咱们分析DelayQueue的构造函数。DelayQueue提供了2种构造函数,一个是无参构造函数,一个是给定集合为参数的构造函数。其源码以下:函数

/**
 * 构建一个空的DelayQueue
 */
public DelayQueue() {}

/**
 * 给定集合c为参数的构造函数
 * 将集合c中的元素所有放入到DelayQueue中
 */
public DelayQueue(Collection < ? extends E > c) {
    this.addAll(c);
}
复制代码

addAll方法是AbstractQueue抽象类中的方法,其源码以下:ui

public boolean addAll(Collection < ? extends E > c) {
    // 参数检测
    if (c == null)
        throw new NullPointerException();
    if (c == this)
        throw new IllegalArgumentException();
    boolean modified = false;
    //遍历集合c中的元素
    for (E e: c)
        // 调用DelayQueue中的add方法
        if (add(e))
            modified = true;
    return modified;
}
复制代码

从上面的源码中,咱们能够看到,AbstractQueue抽象类中addAll方法实际是调用DelayQueue类中的add方法来实现的。this

DelayQueue 入列操做

DelayQueue提供了4中入列操做,分别是:spa

  • **add(E e):**阻塞的将制定元素添加到延时队列中去,由于队列是无界的所以此方法永不阻塞。
  • **offer(E e):**阻塞的将制定元素添加到延时队列中去,由于队列是无界的所以此方法永不阻塞。
  • **put(E e):**阻塞的将制定元素添加到延时队列中去,由于队列是无界的所以此方法永不阻塞。
  • **offer(E e, long timeout, TimeUnit unit):**阻塞的将制定元素添加到延时队列中去,由于队列是无界的所以此方法永不阻塞。

这里你们可能会奇怪,为何这些入列方法的解释都是同样的?这个问题先等下回答,咱们先来看看这几个入列方法的源码定义:线程

public boolean add(E e) {
    return offer(e);
}

public boolean offer(E e) {
    //获取可重入锁
    final ReentrantLock lock = this.lock;
    //加锁
    lock.lock();
    try {
        //调用PriorityQueue中的offer方法
        q.offer(e);
        //调用PriorityQueue中的peek方法
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        //释放锁
        lock.unlock();
    }
}

public void put(E e) {
    offer(e);
}

public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e);
}
复制代码

这里咱们从源码中能够看到,**add(E e)**方法、**put(E e)方法和offer(E e,long timeout,TimeUnit unit)方法都是调用offer(E e)方法来实现的,这也是为何这几个方法的解释都是同样的缘由。其中offer(E e)方法的核心又是调用了PriorityQueue中的offer(E e)**方法,PriorityQueue和PriorityBlockingQueue都是以二叉堆的无界队列,只不过PriorityQueue不是阻塞的而PriorityBlockingQueue是阻塞的。code

DelayQueue出列操做

DelayQueue提供了3中出列操做方法,它们分别是:对象

  • **poll():**检索并删除此队列的开头,若是此队列没有延迟延迟的元素,则返回null
  • **take():**检索并除去此队列的头,若有必要,请等待直到该队列上具备过时延迟的元素可用。
  • **poll(long timeout, TimeUnit unit):**检索并删除此队列的头,若有必要,请等待直到该队列上具备过时延迟的元素可用,或者或指定的等待时间到期。

下面咱们来一个一个分析出列操做的原来。

poll():

poll操做的源码定义以下:

public E poll() {
   //获取可重入锁
    final ReentrantLock lock = this.lock;
    //加锁
    lock.lock();
    try {
       //获取队列中的第一个元素
        E first = q.peek();
        //若果元素为null,或者头元素还未过时,则返回false
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
           //调用PriorityQueue中的出列方法
            return q.poll();
    } finally {
        lock.unlock();
    }
}
复制代码

该方法与PriorityQueue的poll方法惟一的区别就是多了**if (first == null || first.getDelay(NANOSECONDS) > 0)**这个条件判断,该条件是表示若是队列中没有元素或者队列中的元素未过时,则返回null。

take

take操做源码定义以下:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    //加锁
    lock.lockInterruptibly();
    try {
    	//西循环
        for (;;) {
        	//查看队列头元素
            E first = q.peek();
            //若是队列头元素为null,则表示队列中没有数据,线程进入等待队列
            if (first == null)
                available.await();
            else {
            	// 获取first元素剩余的延时时间
                long delay = first.getDelay(NANOSECONDS);
                //若果剩余延时时间<=0 表示元素已通过期,能够从队列中获取元素
                if (delay <= 0)
                	//直接返回头部元素
                    return q.poll();
                //若果剩余延时时间>0,表示元素还未过时,则将first置为null,防止内存溢出
                first = null; // don't retain ref while waiting //若是leader不为null,则直接进入等待队列中等待 if (leader != null) available.await(); else { //若果leader为null,则把当前线程赋值给leader,并超时等待delay纳秒 Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) //唤醒线程 available.signal(); lock.unlock(); } } 复制代码

take操做比poll操做稍微要复杂些,可是逻辑仍是相对比较简单。只是在获取元素的时候先检查元素的剩余延时时间,若是剩余延时时间<=0,则直接返回队列头元素。若是剩余延时时间>0,则判断leader是否为null,若果leader不为null,则表示已经有线程在等待获取队列的头部元素,所以直接进入等待队列中等待。若果leader为null,则表示这是第一个获取头部元素的线程,把当前线程赋值给leader,而后超时等待剩余延时时间。在take操做中须要注意的一点是fist=null,由于若是first不置为null的话会引发内存溢出的异常,这是由于在并发的时候,每一个线程都会持有一份first,所以first不会被释放,若果线程数过多,就会致使内存溢出的异常。

poll(long timeout, TimeUnit unit)

超时等待获取队列元素的源码以下:

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null) {
                if (nanos <= 0)
                    return null;
                else
                    nanos = available.awaitNanos(nanos);
            } else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                if (nanos <= 0)
                    return null;
                first = null; // don't retain ref while waiting if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } } 复制代码

这个出列操做的逻辑和take出列操做的逻辑几乎同样,惟一不一样的在于take是无时间限制等待,而改操做是超时等待。

总结

DelayQueue的入列和出列操做逻辑相对比较简单,就是在获取元素的时候,判断元素是否已通过期,若果过时就能够直接获取,没有过时的话poll操做是直接返回null,take操做是进入等待队列中等待。

相关文章
相关标签/搜索