DelayQueue系列(一):源码分析

本文原发表于简书DelayQueue之源码分析安全

本文将会对DelayQueue作一个简单的介绍,并提供部分源码的分析。bash

DelayQueue的特性基本上由BlockingQueue、PriorityQueue和Delayed的特性来决定的。并发

简而言之,DelayQueue是经过Delayed,使得不一样元素之间能按照剩余的延迟时间进行排序,而后经过PriorityQueue,使得超时的元素能最早被处理,而后利用BlockingQueue,将元素处理的操做阻塞住。源码分析

基本定义以下:ui

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    private Thread leader = null;
    private final Condition available = lock.newCondition();
}
复制代码

ReentrantLock lock = new ReentrantLock(); ReentrantLock是一个可重入的互斥锁,将由最近成功得到锁,而且尚未释放该锁的线程所拥有,当锁被其余线程得到时,调用lock的线程将没法得到锁。 在DelayQueue中,只有一个互斥锁lock。this

PriorityQueue q = new PriorityQueue(); PriorityQueue是一个优先级队列,每次从队列中取出的是具备最高优先权的元素。 在DelayQueue中,由于E继承于Delayed,因此q表示一个按照delayTime排序的优先级队列,用于存放须要延迟执行的元素。spa

Thread leader = null; 这里的leader设计出来是为了minimize unnecessary timed waiting(减小没必要要的等待时间),如何实现的方案会在详细解读中解释。 在DelayQueue中leader表示一个等待从队列中获取消息的线程。线程

Condition available = lock.newCondition(); Condition是lock对象的条件变量,只能和锁lock配合使用,用于控制并发程序访问竞争资源的安全。 一个锁lock能够有多个条件变量condition,每一个条件上能够有多个线程等待,经过调用await()方法,可让线程在该条件下等待。当调用signalAll()方法,又能够唤醒该条件下的等待的线程。 在DelayQueue中lock对象只有一个条件变量available。设计

如下是DelayQueue的主要方法:code

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}
复制代码

一、执行lock.lock(),获取锁。

二、把元素e添加到优先队列q(下称队列q)中。

三、判断队列q的队首元素是否为e。

四、若是e是队首元素的话,即元素e是最近可被执行的元素,意味着延迟队列的执行顺序将被变动。 执行leader = null,不然在执行take时,全部线程就会在if(leader!=null)的判断下进入等待。 执行available.signal(),唤醒其余等待中的线程,从新去循环执行take中的操做1-8。 若是不执行signal,那么在take方法中,只有执行awaitNanos(delay)的线程在等待delay指定的时间后自动唤醒,其余执行await的线程将一直被挂起。 若是没有新的线程去执行take方法,那么等待执行awaitNanos(delay)的线程自动唤醒时,此时等待时间将超过元素e的delayTime,这不符合预期。 即使有新的线程去执行take方法,那以前挂起的线程也将一直在等待,效率很低。

五、在finally块中执行lock.unlock()。 须要注意的是,锁必须在 finally 块中释放。不然,若是代码抛出异常,那么锁就有可能永远得不到释放。若是没有释放锁,那么就会产生死锁的问题。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } } 复制代码

一、执行lock.lockInterruptibly(),获取锁。 lockInterruptibly和lock的区别在于 lock 在锁被其余线程占有,当前线程等待锁期间(下称等待锁期间),只考虑获取锁。只有在获取锁成功后,才会去响应中断。 而lockInterruptibly 在等待锁期间,会优先考虑响应中断,而不是响应锁的获取。若是当前线程被打断(interrupt)则该方法抛出InterruptedException。该方法提供了一种解除死锁的途径。

二、E first = q.peek(),获取队列q的队首元素first(下称first)。

三、若是first为空,则执行avaliable.await()让线程进入等待。实际上就是释放锁,而后挂起线程,等待被唤醒,此时其余线程能够得到锁了。 await()和awaitNanos(nanosTimeout)区别在于 执行awaitNanos(nanosTimeout)的线程比执行await()的线程多一个唤醒条件,超过等待nanosTimeout指定的时间,线程将自动唤醒。线程唤醒时,保证该线程是持有锁的。

四、若是first不为空,则执行first.getDelay(NANOSECONDS)获取first的剩余延迟时间delayTime(下称delayTime)

五、若是first的delayTime<=0,代表该元素已经达到以前设定的延迟时间了,则调用return q.poll(),将first从队列q中的移除而且返回该元素first.

六、若是first的delayTime>0,则将first指向null,释放first的引用,避免内存泄露.

七、若是线程leader(下称leader)不为空的话,则执行avaliable.await()让线程进入等待。leader不为空的话,代表已经有其余线程在获取优先队列q的队首元素了(下称获取队首元素),此时只须要执行avaliable.await()让当前线程进入等待便可。

八、若是leader为空,则执行Thread thisThread = Thread.currentThread();leader = thisThread;将leader指向当前线程,而后执行available.awaitNanos(delay);让线程最长等待delayTime的时间。最后在finally块中,若是leader依然指向前文获取的当前线程thisThread,那么将leader指向null,释放leader引用。 这里leader为空,代表还没有有其余线程在获取队首元素,此时设置leader对象,指向当前线程(下称currentThread)。由于currentThread执行了available.awaitNanos(delay)释放了锁,因此其余线程(下称otherThread)在调用take方法时能获取锁,可是由于leader非空,因此otherThread都会进入7的那步,直接进入等待,而不须要像currentThread那样执行8的一系列操做,达到设计leader线程的初衷。

九、循环执行以上1-8步,直到first非空且first的delayTime<=0,跳出循环。

十、跳出循环后,进入finally块。

十一、若是leader为空且队列q的队首元素非null(q队列中移除了上文的first元素后还有其余元素),此时执行available.signal(),调用signal唤醒其余等待中的线程。

十二、执行lock.unlock(),执行解锁操做。

ok,源码分析就先讲到这里了,下一期我准备讲一下如何将DelayQueue封装成可用的组件,让使用者调用起来更加方便。

相关文章
相关标签/搜索