PriorityBlockingQueue 是带优先级的无界阻塞队列,每次出队都返回优先级最高的元素,是二叉树最小堆的实现。 PriorityBlockingQueue 数据结构和 PriorityQueue 一致,而线程安全性使用的是 ReentrantLock。html
// 最大可分配队列容量 Integer.MAX_VALUE - 8,减 8 是由于有的 VM 实如今数组头有些内容 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; // 默认队列容量11,这里不是 HashMap,不须要 hash 取余,所以没必要是 2^n private static final int DEFAULT_INITIAL_CAPACITY = 11; // 数组结构,是二叉树最小堆的实现 private transient Object[] queue; private transient int size;
PriorityBlockingQueue 使用 ReentrantLock 保证数据安全性,数据结构使用的是数组。PriorityBlockingQueue 数组的结构和 PriorityQueue 一致,是基于平衡二叉堆实现,父节点下标是 n,左节点则是 2n + 1,右节点是 2n + 2。queue[0] 永远都是最小的元素。java
public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); } public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; }
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; // 1. 扩容 while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; // 2. 将节点 e 插入数据 array 的第 n 个位置 if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); size = n + 1; notEmpty.signal(); } finally { lock.unlock(); } return true; }
代码很简单,添加元素时 offer 作了两件事:一是判断是否须要扩容(tryGrow),二是将元素 e 插入到数组中(siftUpComparable)。先看一下如何进行扩容的,至于元素添加在 poll 时再一块儿分析。数组
// 集合中元素个数 size>=queue.length 则进行扩容 while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); // tryGrow 最终只有一个线程能扩容成功,其它线程经过 while 自旋检查当前扩容是否完毕 private void tryGrow(Object[] array, int oldCap) { // 1. 释放锁,这样在数组扩容期间其它线程能够正常出队 lock.unlock(); // must release and then re-acquire main lock Object[] newArray = null; // 2. allocationSpinLock 是数组扩容的独占锁 if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { // oldGap<64则扩容新增oldcap+2,否者扩容50%,而且最大为MAX_ARRAY_SIZE int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // grow faster if small (oldCap >> 1)); if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } // 3.1 若是 oldCap=MAX_ARRAY_SIZE 则 newCap 就会变成负数 // 3.2 若是 queue 已经改变,则有其它线程已经完成扩容 ok // 线程1已经完成扩容,线程2执行到这里时 queue=newArray if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally { allocationSpinLock = 0; } } // 4. 线程1 cas 成功后,线程2会进入这个地方,而后线程2让出 cpu // 尽可能让线程1执行下面代码获取锁,可是这得不到确定的保证。 if (newArray == null) // back off if another thread is allocating Thread.yield(); // 5. 从新获取锁,只有一个线程能够最终完成数组的扩容。 // cas 只进行了数组的初始化,即 newArray=new Object[newCap],可能有多个线程都成功了 lock.lock(); // 6. 数组元素拷贝到新数组中,完成扩容。可能有多个线程都初始化了 newArray if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); } }
tryGrow 目的是扩容,这里要思考下为啥在扩容前要先释放锁,而后使用 cas 控制只有一个线程能够扩容成功呢?安全
其实这里不先释放锁也是能够的,也就是在整个扩容期间一直持有锁,可是扩容是须要花时间的,若是扩容的时候还占用锁,那么其余线程在这个时候是不能进行出队和入队操做的,这大大下降了并发性。数据结构
因此在扩容前释放锁,这容许其余出队线程能够进行出队操做,可是因为释放了锁,因此也容许在扩容时候进行入队操做,这就会致使多个线程进行扩容会出现问题,因此这里使用了一个 spinlock 用 cas 控 制只有一个线程能够进行扩容,失败的线程调用 Thread.yield() 让出 cpu,目的意在让扩容线程扩容后优先调用 lock.lock 从新获取锁,可是这得不到必定的保证,有可能调用 Thread.yield() 的线程先获取了锁。若是这时候扩容线程还没扩容完毕,其余线程是经过自旋检查当前扩容是否完毕。并发
那 copy 元素数据到新数组为啥放到获取锁后面那?缘由应该是由于可见性问题,由于 queue 并无被 volatile 修饰。另外有可能在扩容时候进行了出队操做,若是直接拷贝可能看到的数组元素不是最新的。而经过调用 Lock 后,获取的数组则是最新的,而且在释放锁前 数组内容不会变化。oop
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return dequeue(); } finally { lock.unlock(); } } // 出队 private E dequeue() { int n = size - 1; // 1. 没有元素直接返回 null if (n < 0) return null; else { Object[] array = queue; // 2. array[0] 永远都是最小的元素 E result = (E) array[0]; E x = (E) array[n]; array[n] = null; Comparator<? super E> cmp = comparator; // 3. 由于 array[0] 已经出队,如今须要将元素 array[n] 插入到 0 这个位置 if (cmp == null) siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; // 4. 返回 array[0] return result; } }
// 数组结构,是二叉树最小堆的实现,array[0] 永远是优先级最高的元素 private transient Object[] queue; // offer 时将元素 e 插入到节点 n 位置 siftUpComparable(n, e, array); // poll 时将最后一个元素 array[n] 插入到 0 位置 siftDownComparable(0, x, array, n);
(1) siftUpComparable源码分析
// 将元素 x 插入数据 array 的第 k 个位置 private static <T> void siftUpComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; while (k > 0) { int parent = (k - 1) >>> 1; Object e = array[parent]; if (key.compareTo((T) e) >= 0) break; array[k] = e; k = parent; } array[k] = key; }
这个排序看过去有些奇怪,怎么有 parent,而且下标是 (k-1)>>>1 呢?其实这里的操做就反应了优先队列的真正数据结构,其其实是一个二叉树,将二叉树存储在数组之中而已。根节点就是数组的 0 位。下图给出其具体结构:ui
PriorityQueue 是一个彻底二叉树,且不容许出现 null 节点,其父节点都比叶子节点小,这个是堆排序中的小顶堆。二叉树存入数组的方式很简单,就是从上到下,从左到右。彻底二叉树能够和数组中的位置一一对应:this
如今在看 siftUpComparable 代码就轻松多了,实际上就是将要插入的元素 x 和它的父节点作对比,若是比父节点大就一直向上移动。由于比较后元素是在向上移动,因此叫 siftUpComparable
(2) siftDownComparable
// 将元素 x 插入数据 array 的第 k 个位置,n 表示当前数组的最后一个位置 private static <T> void siftDownComparable(int k, T x, Object[] array, int n) { if (n > 0) { Comparable<? super T> key = (Comparable<? super T>)x; int half = n >>> 1; // loop while a non-leaf while (k < half) { int child = (k << 1) + 1; // assume left child is least Object c = array[child]; int right = child + 1; if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; if (key.compareTo((T) c) <= 0) break; array[k] = c; k = child; } array[k] = key; } }
siftDownComparable(0, x, array, n) 将元素 x 和第 0 位的左右子节点进行比较,若是 x 大于这两个子节点则向下移动,小的子节点则上移。这样 array[0] 又变成最小的值了。
参考:
天天用心记录一点点。内容也许不重要,但习惯很重要!