【并发队列】无界阻塞优先级队列 PriorityBlockingQueue源码解析

1、 前言

PriorityBlockingQueue是带优先级的无界阻塞队列,每次出队都返回优先级最高的元素,是二叉树最小堆的实现,研究过数组方式存放最小堆节点的都知道,直接遍历队列元素是无序的。java

2、 PriorityBlockingQueue类图结构

如图PriorityBlockingQueue内部有个数组queue用来存放队列元素,size用来存放队列元素个数,allocationSpinLockOffset是用来在扩容队列时候作cas的,目的是保证只有一个线程能够进行扩容。算法

因为这是一个优先级队列因此有个比较器comparator用来比较元素大小。lock独占锁对象用来控制同时只能有一个线程能够进行入队出队操做。notEmpty条件变量用来实现take方法阻塞模式。这里没有notFull 条件变量是由于这里的put操做是非阻塞的,为啥要设计为非阻塞的是由于这是无界队列。
最后PriorityQueue q用来搞序列化的。数组

以下构造函数,默认队列容量为11,默认比较器为null;并发

private static final int DEFAULT_INITIAL_CAPACITY = 11;
 
 
    public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }
 
    public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }
 
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }

 

 

3、 offer操做

在队列插入一个元素,因为是无界队列,因此一直为成功返回true;框架

public boolean offer(E e) {
 
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
 
    //若是当前元素个数>=队列容量,则扩容(1)
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
 
 
    try {
        Comparator<? super E> cmp = comparator;
 
        //默认比较器为null
        if (cmp == null)(2)
            siftUpComparable(n, e, array);
        else
            //自定义比较器(3)
            siftUpUsingComparator(n, e, array, cmp);
 
        //队列元素增长1,而且激活notEmpty的条件队列里面的一个阻塞线程
        size = n + 1;(9)
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

 

主流程比较简单,下面看看两个主要函数函数

private void tryGrow(Object[] array, int oldCap) {
    lock.unlock(); //must release and then re-acquire main lock
    Object[] newArray = null;
 
    //cas成功则扩容(4)
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            //oldGap<64则扩容新增oldcap+2,否者扩容50%,而且最大为MAX_ARRAY_SIZE
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            allocationSpinLock = 0;
        }
    }
 
    //第一个线程cas成功后,第二个线程会进入这个地方,而后第二个线程让出cpu,尽可能让第一个线程执行下面点获取锁,可是这得不到确定的保证。(5)
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    lock.lock();(6)
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }

tryGrow目的是扩容,这里要思考下为啥在扩容前要先释放锁,而后使用cas控制只有一个线程能够扩容成功。个人理解是为了性能,由于扩容时候是须要花时间的,若是这些操做时候还占用锁那么其余线程在这个时候是不能进行出队操做的,也不能进行入队操做,这大大下降了并发性。oop

因此在扩容前释放锁,这容许其余出队线程能够进行出队操做,可是因为释放了锁,因此也容许在扩容时候进行入队操做,这就会致使多个线程进行扩容会出现问题,因此这里使用了一个spinlock用cas控制只有一个线程能够进行扩容,失败的线程调用Thread.yield()让出cpu,目的意在让扩容线程扩容后优先调用lock.lock从新获取锁,可是这得不到必定的保证,有可能调用Thread.yield()的线程先获取了锁。性能

那copy元素数据到新数组为啥放到获取锁后面那?缘由应该是由于可见性问题,由于queue并无被volatile修饰。另外有可能在扩容时候进行了出队操做,若是直接拷贝可能看到的数组元素不是最新的。而经过调用Lock后,获取的数组则是最新的,而且在释放锁前 数组内容不会变化。ui

具体建堆算法:this

private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
 
    //队列元素个数>0则判断插入位置,否者直接入队(7)
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (key.compareTo((T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = key;(8)
}


第一次offer(2)时候下面用图说话模拟下过程:
假设队列容量为2

执行(1)为false因此执行(2),因为k=n=size=0;因此执行(8)元素入队,然执行(9)size+1;
如今队列状态:

  • 第二次offer(4)时候

执行(1)为false,因此执行(2)因为k=1,因此进入while循环,parent=0;e=2;key=4;key>e因此break;而后把4存到数据下标为1的地方,这时候队列状态为:

  • 第三次offer(6)时候

执行(1)为true,因此调用tryGrow,因为2<64因此newCap=2 + (2+2)=6;而后建立新数组并拷贝,而后调用siftUpComparable;k=2>0进入循环 parent=0;e=2;key=6;key>e因此break;而后把6放入下标为2的地方,如今队列状态:

  • 第四次offer(1)时候

执行(1)为false,因此执行(2)因为k=3,因此进入while循环,parent=0;e=2;key=1; key<e;因此把2复制到数组下标为3的地方,而后k=0退出循环;而后把2存放到下标为0地方,如今状态:

4、 poll操做

在队列头部获取并移除一个元素,若是队列为空,则返回null

 

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return dequeue();
    } finally {
        lock.unlock();
    }
}

主要看dequeue

private E dequeue() {
 
    //队列为空,则返回null
    int n = size - 1;
    if (n < 0)
        return null;
    else {
        //获取队头元素(1)
        Object[] array = queue;
        E result = (E) array[0];
 
        //获取对尾元素,并值null(2)
        E x = (E) array[n];
        array[n] = null;
 
        Comparator<? super E> cmp = comparator;
        if (cmp == null)//cmp=null则调用这个,把对尾元素位置插入到0位置,而且调整堆为最小堆(3)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;(4)
        return result;
    }
}

private static <T> void siftDownComparable(int k, T x, Object[] array,
                                            int n) {
     if (n > 0) {
         Comparable<? super T> key = (Comparable<? super T>)x;
         int half = n >>> 1;           // loop while a non-leaf
         while (k < half) {
             int child = (k << 1) + 1; // assume left child is least
             Object c = array[child];(5)
             int right = child + 1;(6)
             if (right < n &&
                 ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)(7)
                 c = array[child = right];
             if (key.compareTo((T) c) <= 0)(8)
                 break;
             array[k] = c;
             k = child;
         }
         array[k] = key;(9)
     }
 }


第一次调用poll()下面用图说话模拟下过程:

首先执行(1) result=1;而后执行(2)x=2;这时候队列状态

而后执行(3)后状态为:

执行(4)后的结果:

下面重点说说siftDownComparable这个屌屌的创建最小堆的算法:

首先说下思想,其中k一开始为0,x为数组里面最后一个元素,因为第0个元素为树根,被出队时候要被搞掉,因此建堆要从它的左右孩子节点找一个最小的值来当树根,子树根被搞掉后,会找子树的左右孩子最小的元素来代替,直到树节点为止,还不明白,不要紧,看图说话:
假如当前队列元素:

那么对于树为:

这时候若是调用了poll();那么result=2;x=11;如今树为:

而后看leftChildVal = 4;rightChildVal = 6; 4<6;因此c=4;也就是获取根节点的左右孩子值小的那一个; 而后看11>4也就是key>c;而后把c放入树根,如今树为:

而后看根的左边孩子4为根的子树咱们要为这个字树找一个根节点

看leftChildVal = 8;rightChildVal = 10; 8<10;因此c=8;也就是获取根节点的左右孩子值小的那一个; 而后看11>8也就是key>c;而后把c放入树根,如今树为:

这时候k=3;half=3因此推出循环,执行(9)后结果为:

这时候队列为:

5、 put操做

内部调用的offer,因为是无界队列,因此不须要阻塞

public void put(E e) {
    offer(e); // never need to block
}


获取队列头元素,若是队列为空则阻塞。

6、 take操做

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
 
        //若是队列为空,则阻塞,把当前线程放入notEmpty的条件队列
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}


7、 size操做

这里是阻塞实现,阻塞后直到入队操做调用notEmpty.signal 才会返回。

获取队列元个数,因为加了独占锁因此返回结果是精确的

public int size() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return size;
    } finally {
        lock.unlock();
    }
}

8、 开源框架中使用

9、总结

PriorityBlockingQueue相似于ArrayBlockingQueue内部使用一个独占锁来控制同时只有一个线程能够进行入队和出队,另外前者只使用了一个notEmpty条件变量而没有notFull这是由于前者是无界队列,当put时候永远不会处于await因此也不须要被唤醒。

PriorityBlockingQueue始终保证出队的元素是优先级最高的元素,而且能够定制优先级的规则,内部经过使用一个二叉树最小堆算法来维护内部数组,这个数组是可扩容的,当当前元素个数>=最大容量时候会经过算法扩容。

值得注意的是为了不在扩容操做时候其余线程不能进行出队操做,实现上使用了先释放锁,而后经过cas保证同时只有一个线程能够扩容成功。

相关文章
相关标签/搜索