public class PriorityQueue<E> extends AbstractQueue<E> implements java.io.Serializable { //队列的默认容量 private static final int DEFAULT_INITIAL_CAPACITY = 11; //底层用于存放数据的数组 transient Object[] queue; // non-private to simplify nested class access //队列中的元素数量计数 private int size = 0; //比较器 private final Comparator<? super E> comparator; //快速失败机制使用的变量 transient int modCount = 0; //建立一个默认容量的队列 public PriorityQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } //建立一个指定容量的队列 public PriorityQueue(int initialCapacity) { this(initialCapacity, null); } //建立一个指定比较器的默认容量队列 public PriorityQueue(Comparator<? super E> comparator) { this(DEFAULT_INITIAL_CAPACITY, comparator); } //建立一个指定比较器且指定容量队列 public PriorityQueue(int initialCapacity, Comparator<? super E> comparator) { //判断指定的容量值是否合法 if (initialCapacity < 1) throw new IllegalArgumentException(); this.queue = new Object[initialCapacity]; //初始化底层数组 this.comparator = comparator; //比较器初始化 } //建立一个带有指定集合中的元素的队列 @SuppressWarnings("unchecked") public PriorityQueue(Collection<? extends E> c) { //判断c是不是有序集合 //如果有序集合,那么就以其比较器做为队列的比较器 if (c instanceof SortedSet<?>) { SortedSet<? extends E> ss = (SortedSet<? extends E>) c; this.comparator = (Comparator<? super E>) ss.comparator(); initElementsFromCollection(ss); } //判断集合是不是优先级队列 //如果的话,直接使用该队列的比较器, else if (c instanceof PriorityQueue<?>) { PriorityQueue<? extends E> pq = (PriorityQueue<? extends E>) c; this.comparator = (Comparator<? super E>) pq.comparator(); initFromPriorityQueue(pq); } else { this.comparator = null; initFromCollection(c); } } //将容器c中的元素添加到优先级队列中 private void initElementsFromCollection(Collection<? extends E> c) { Object[] a = c.toArray(); // If c.toArray incorrectly doesn't return Object[], copy it. if (a.getClass() != Object[].class) a = Arrays.copyOf(a, a.length, Object[].class); int len = a.length; if (len == 1 || this.comparator != null) for (int i = 0; i < len; i++) if (a[i] == null) throw new NullPointerException(); this.queue = a; this.size = a.length; } //将优先级队列c中的元素添加到当前优先级队列中 private void initFromPriorityQueue(PriorityQueue<? extends E> c) { if (c.getClass() == PriorityQueue.class) { this.queue = c.toArray(); this.size = c.size(); } else { initFromCollection(c); } } //将容器c中的元素添加到优先级队列中 private void initFromCollection(Collection<? extends E> c) { initElementsFromCollection(c); heapify(); } //建立包含优先级队列c中元素的队列,且使用同一个比较器 @SuppressWarnings("unchecked") public PriorityQueue(PriorityQueue<? extends E> c) { this.comparator = (Comparator<? super E>) c.comparator(); initFromPriorityQueue(c); } //建立包含排序集合c中元素的优先级队列,且使用同一个比较器 @SuppressWarnings("unchecked") public PriorityQueue(SortedSet<? extends E> c) { this.comparator = (Comparator<? super E>) c.comparator(); initElementsFromCollection(c); } }
PriorityQueue中的入队方法分析:java
//add与offer没有区别 public boolean add(E e) { return offer(e); } public boolean offer(E e) { //队列中不容许有null元素 if (e == null) throw new NullPointerException(); modCount++; //快速失败机制 int i = size; //获取当前队列中元素个数 //判断数组是否须要扩容 if (i >= queue.length) grow(i + 1); size = i + 1; //元素计数+1 //新增元素的插入位置 //若队列本来为空,则直接放到0位置 //若队列本来不为空 if (i == 0) queue[0] = e; else siftUp(i, e); //插入数组 return true; } //扩容 private void grow(int minCapacity) { int oldCapacity = queue.length; //队列旧容量 //扩容机制,队列原容量小于64时,扩容为原来的2倍再加2 //大于64,则扩大1.5倍 int newCapacity = oldCapacity + ((oldCapacity < 64) ? (oldCapacity + 2) : (oldCapacity >> 1)); if (newCapacity - MAX_ARRAY_SIZE > 0) newCapacity = hugeCapacity(minCapacity); queue = Arrays.copyOf(queue, newCapacity); } //上浮 /** * 上浮过程 * 假设已有一个有序堆(升序)以下所示: * 10 * / \ * 20 40 * / \ / * 60 70 90 * 如今要将元素30插入堆中,则有 * 1.将要插入的30先放在二叉堆的末尾 * 2.再将其与父结点进行比较,判断是否要上浮(小于父结点就上浮) * 3.若小于父结点则交换位置,再重复第2步骤继续上浮 * 4.若大于则直接结束上浮 * 10 10 * / \ / \ * 20 40 ——> 20 30 * / \ / \ / \ / \ * 60 70 90 30 60 70 90 40 */ private void siftUp(int k, E x) { //判断队列是天然排序仍是比较器排序 if (comparator != null) siftUpUsingComparator(k, x); //比较器排序 else siftUpComparable(k, x); //天然排序 } //入队操做本质是一个堆排序中的一个上浮的过程 private void siftUpUsingComparator(int k, E x) { //判断索引位置是否大于0,便是否到达堆顶 while (k > 0) { int parent = (k - 1) >>> 1; Object e = queue[parent]; if (comparator.compare(x, (E) e) >= 0) break; queue[k] = e; k = parent; } queue[k] = x; } //另外一个上浮方法,使用的天然排序 private void siftUpComparable(int k, E x) { Comparable<? super E> key = (Comparable<? super E>) x; while (k > 0) { int parent = (k - 1) >>> 1; Object e = queue[parent]; if (key.compareTo((E) e) >= 0) break; queue[k] = e; k = parent; } queue[k] = key; }
PriorityQueue中的出队方法分析:算法
public E poll() { if (size == 0) //判断队列是不是空队列 return null; int s = --size; modCount++; E result = (E) queue[0]; //取出队首元素 E x = (E) queue[s]; //获取队尾元素 queue[s] = null; //队尾赋null //将本来的队尾元素放到堆顶,再对整个堆进行排序整理 //即下沉 if (s != 0) siftDown(0, x); //下沉方法 return result; } //下沉 /** * 下沉过程 * 假设已有一个有序堆(升序)以下所示: * 10 * / \ * 20 30 * / \ / \ * 60 70 90 40 * 如今要将元素10出队,则有 * 1.将要出队的10移除出二叉堆,并将队尾40放到堆顶 * 2.将堆顶元素与两个子结点中较小的元素相比较,选择小的元素做为新的堆顶元素 * 3.重复对堆中前一半结点进行将第2步的比较交换 * 40 20 * / \ / \ * 20 30 ——> 40 30 * / \ / / \ / * 60 70 90 60 70 90 */ private void siftDown(int k, E x) { if (comparator != null) siftDownUsingComparator(k, x); //比较器下沉 else siftDownComparable(k, x); //天然排序下沉 } //使用天然排序下沉 private void siftDownComparable(int k, E x) { Comparable<? super E> key = (Comparable<? super E>)x; int half = size >>> 1; //下沉要对堆中前一半的结点都进行 while (k < half) { int child = (k << 1) + 1; Object c = queue[child]; //获取当前结点的左孩子 int right = child + 1; //右孩子索引 //若存在右孩子,那么左右孩子先比较大小,取小再与父结点比较 if (right < size && ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0) c = queue[child = right]; //父结点与子结点比较 //若父结点小于子结点,则直接结束下沉的过程 //不然,交互位置后继续下沉操做 if (key.compareTo((E) c) <= 0) break; queue[k] = c; k = child; } queue[k] = key; } //使用比较器下沉 @SuppressWarnings("unchecked") private void siftDownUsingComparator(int k, E x) { int half = size >>> 1; while (k < half) { int child = (k << 1) + 1; Object c = queue[child]; int right = child + 1; if (right < size && comparator.compare((E) c, (E) queue[right]) > 0) c = queue[child = right]; if (comparator.compare(x, (E) c) <= 0) break; queue[k] = c; k = child; } queue[k] = x; }
3.DelayQueue的继续体系api
了解了DelayQueue的底层实际是经过PriorityQueue实现,再来看看DelayQueue的继承关系,以下图所示,父类及接口以前的学习中都已分析过,不在赘言。数组
4.Delay接口安全
DelayQueue队列与其余队列最明显的不一样之处,就是它的延时功能,也正由于这个延时特色,DelayQueue中的对象都必需要实现Delay接口,接下来就看看这个Delay接口是干什么的。数据结构
//用来标记那些应该在给定延迟时间以后执行的对象 public interface Delayed extends Comparable<Delayed> { //检查延迟是否结束,该方法返回一个延迟时间,时间到后在检查还有没有 //延迟,若没有延迟执行下一步,若还有延迟,继续等待 long getDelay(TimeUnit unit); }
DelayQueue的使用示例:多线程
/** * 延迟队列的使用示例 * 主线程建立三个延迟任务放到queue中,其余三个线程 * 在任务可用时取出 * Created by bzhang on 2019/4/1. */ public class TestDelayed implements Delayed { private String name; private Date takeTime; //延迟时间 public TestDelayed(String name, Date takeTime) { this.name = name; this.takeTime = takeTime; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Date getTakeTime() { return takeTime; } public void setTakeTime(Date takeTime) { this.takeTime = takeTime; } @Override public long getDelay(TimeUnit unit) { long convert = unit.convert(takeTime.getTime()-System.currentTimeMillis(), TimeUnit.MILLISECONDS); return convert; } @Override public int compareTo(Delayed o) { TestDelayed t = (TestDelayed)o; long l = this.takeTime.getTime() - t.getTakeTime().getTime(); if (l==0){ return 0; } return l > 0 ? 1 : -1; } @Override public String toString() { return "TestDelayed{" + "name='" + name + '\'' + ", takeTime=" + takeTime + '}'; } public static void main(String[] args) { DelayQueue queue = new DelayQueue(); long l = System.currentTimeMillis(); queue.put(new TestDelayed("A",new Date(l+5000))); queue.put(new TestDelayed("B",new Date(l+2000))); queue.put(new TestDelayed("C",new Date(l+7000))); System.out.println(new Date()); int t = 0; for (int i = 0;i < 3;i++){ new Thread(new Runnable() { @Override public void run() { try { System.out.println(Thread.currentThread().getName()+queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } } } //结果 Tue Apr 02 11:03:33 CST 2019 Thread-1TestDelayed{name='B', takeTime=Tue Apr 02 11:03:35 CST 2019} Thread-0TestDelayed{name='A', takeTime=Tue Apr 02 11:03:38 CST 2019} Thread-2TestDelayed{name='C', takeTime=Tue Apr 02 11:03:40 CST 2019}
5.DelayQueue中的重要属性及构造方法并发
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { //重入锁,用于保证并发安全 private final transient ReentrantLock lock = new ReentrantLock(); //底层优先级队列,实际元素都存储与该队列中,底层是数组构成的二叉堆 private final PriorityQueue<E> q = new PriorityQueue<E>(); //下一个等待获取元素的线程,可减小没必要要的等待 private Thread leader = null; //条件控制,表示是否能够从队列中取数据 private final Condition available = lock.newCondition(); public DelayQueue() {} public DelayQueue(Collection<? extends E> c) { this.addAll(c); } }
6.DelayQueue的入队方法ide
//add方法本质就是调用offer方法,将元素新增到队列 public boolean add(E e) { return offer(e); } //同上 public void put(E e) { offer(e); } //延迟队列是无界队列,指定超时时间放入元素没有意义,与直接入队是同样的 public boolean offer(E e, long timeout, TimeUnit unit) { return offer(e); } //向队列中新增元素,元素位置以比较结果(compareTo方法)来肯定 public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); //调用底层优先级队列的offer方法来存储元素 //判断底层优先级队列的队首是不是新增元素 if (q.peek() == e) { leader = null; //唤醒条件等待队列的某一个线程,即说明队列中有元素了, //能够从队列中获取到元素了 available.signal(); } return true; } finally { lock.unlock(); } }
7.DelayQueue的出队方法oop
//返回延迟时间已到的第一个元素,或返回null(没有元素或元素延迟时间都未到) public E poll() { final ReentrantLock lock = this.lock; //重入锁 lock.lock(); //加锁同步 try { E first = q.peek(); //获取优先级队列中的队首元素 //判断队列是否为空,若不为空那么队首延迟时间是否到达,若都不知足 //说明队首元素可用,返回队首 //不然返回null if (first == null || first.getDelay(NANOSECONDS) > 0) return null; else return q.poll(); } finally { lock.unlock(); } } //如有延迟时间已到的元素就当即返回,若无则一直等待 //队列中无元素那么也一直等待 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); //可被中断锁 try { for (;;) { //自旋 E first = q.peek(); //获取队首元素 //若队列为空,直接进入条件队列等待唤醒 //队列不为空,则判断队首的延时是否到达 if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); //获取剩余延迟时间(单位是ns) if (delay <= 0) //没有剩余延迟时间,则将队首元素返回 return q.poll(); first = null; //判断是否已经有其余线程在等待取元素 //如有,那么就让当前线程直接等待 //若没有,那就说明当前只有本线程在等待获取队首元素 if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); //获取当前线程 leader = thisThread; //将单签线程设为等待获取队首的线程 try { //等待队首元素的延迟时间后,在尝试获取队首元素 available.awaitNanos(delay); } finally { //将等待获取的线程设为null,由于当前线程正在获取,所以不该该有leader //即leader为null,说明要么有线程正在执行获取操做,要么没有出队操做在进行 if (leader == thisThread) leader = null; } } } } } finally { //当前线程已经取完元素了,能够唤醒其余线程获取队首元素了 if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } } //指定时间内获取延迟的队首元素,若在指定等待时间内队首延迟时间未到达或队列为空 //就返回null public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); //队列是否为空,若为空队列,那么在指定等待是否到达,若等待时间也已到达 //那就返回null,若未到达等待时间,就继续等待 if (first == null) { if (nanos <= 0) return null; else nanos = available.awaitNanos(nanos); //当前线程进入等待时间nanos纳秒 } else { long delay = first.getDelay(NANOSECONDS); //获取队首元素的延迟时间 //判断延迟时间是否到达,到达就直接将队首元素返回 if (delay <= 0) return q.poll(); //延迟时间未到,但等待时间已经达到,那么就返回null if (nanos <= 0) return null; first = null; // don't retain ref while waiting //延迟时间小于等待时间,说明能够在等待时间内获取到队首元素 //那么就在等待延迟时间到达的时间内,能够再次尝试将队首元素获取返回 //这里仅是再次尝试,由于可能在等待期间内有新的元素入队,且延迟时间最小成为新队首 if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else { //等待时间 > 延迟时间 而且没有其它线程在等待, //那么当前元素成为leader,表示当前线程最先正在等待获取元素 Thread thisThread = Thread.currentThread(); leader = thisThread; try { //让等待时间到达 long timeLeft = available.awaitNanos(delay); //继续等待的时间 nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
8.peek方法
//peek方法仅仅就是为底层的优先级队列的peek方法加上锁 public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return q.peek(); } finally { lock.unlock(); } }
1.PriorityBlockingQueue的底层实现
PriorityBlockingQueue是一个线程安全的无界阻塞队列,能够看对是PriorityQueue的多线程版本,其底层数据结构与PriorityQueue相同,都是数组实现的利用二叉堆结构。前文已经分析过,这里再也不多说
2.PriorityBlockingQueue的继承体系
PriorityBlockingQueue的继承关系以下图所示,均是以前学习过的父类或接口。这里再也不展开。
3.PriorityBlockingQueue中的重要属性及构造方法
public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //未指定队列初始容量时使用的默认容量 private static final int DEFAULT_INITIAL_CAPACITY = 11; //队列虽说是无界的,但实际队列是不能超过Integer.MAX_VALUE - 8这个值的 //如果超过报OOM异常 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //底层存放数据的数组 private transient Object[] queue; //队列中元素的个数,计数器 private transient int size; //用于判断优先级的比较器,若为null则使用天然排序 private transient Comparator<? super E> comparator; //重入锁,保证并发安全 private final ReentrantLock lock; //队列非空条件,用于出队操做 private final Condition notEmpty; //用于队列显示是否处于扩容状态,0表示没有在扩容 //而1表示处于扩容状态,将该值更新成1的线程会进行数组扩容 //其余要进行扩容的线程检查该值发现为1,则直接暂停线程让出CPU private transient volatile int allocationSpinLock; //将队列转换成线程不安全的优先级队列,用于序列化 private PriorityQueue<E> q; //建立一个默认初始容量的队列 public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } //建立一个指定初始容量的队列 public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); } //建立一个指定容量和比较器的队列 public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; } //以集合c为底,建立一个队列 public PriorityBlockingQueue(Collection<? extends E> c) { this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); boolean heapify = true; // true if not known to be in heap order boolean screen = true; // true if must screen for nulls //根据集合c是哪种容器来决定建立怎样的初始队列 if (c instanceof SortedSet<?>) { SortedSet<? extends E> ss = (SortedSet<? extends E>) c; this.comparator = (Comparator<? super E>) ss.comparator(); heapify = false; } else if (c instanceof PriorityBlockingQueue<?>) { PriorityBlockingQueue<? extends E> pq = (PriorityBlockingQueue<? extends E>) c; this.comparator = (Comparator<? super E>) pq.comparator(); screen = false; if (pq.getClass() == PriorityBlockingQueue.class) // exact match heapify = false; } Object[] a = c.toArray(); int n = a.length; // If c.toArray incorrectly doesn't return Object[], copy it. if (a.getClass() != Object[].class) a = Arrays.copyOf(a, n, Object[].class); if (screen && (n == 1 || this.comparator != null)) { for (int i = 0; i < n; ++i) if (a[i] == null) throw new NullPointerException(); } this.queue = a; this.size = n; if (heapify) heapify(); } }
4.入队方法
//PriorityBlockingQueue全部的入队方法,都同样,由于队列是无界队列 //不存在加入队列失败的可能,所以最终都是调用offer方法 public boolean add(E e) { return offer(e); } public void put(E e) { offer(e); // never need to block } public boolean offer(E e, long timeout, TimeUnit unit) { return offer(e); // never need to block } public boolean offer(E e) { //优先级队列中不容许存在null元素,所以null元素没法肯定优先级 if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; //n为当前队列中的元素个数,cap为当前队列的容量 Object[] array; //判断底层数组是否须要扩容 while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; //判断是使用比较器进行上浮操做,仍是使用天然排序进行上浮操做 if (cmp == null) siftUpComparable(n, e, array); //天然排序上浮 else siftUpUsingComparator(n, e, array, cmp); //比较器上浮 size = n + 1; notEmpty.signal(); } finally { lock.unlock(); } return true; } //天然上浮,与PriorityQueue中同样 private static <T> void siftUpComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; while (k > 0) { int parent = (k - 1) >>> 1; //获取父结点索引 Object e = array[parent]; //父结点 //比较插入的值与父结点的大小,若比父结点小,那么交换位置后,在继续比较 //若比父结点大,说明排序正确,无需在继续比较 if (key.compareTo((T) e) >= 0) break; array[k] = e; k = parent; } array[k] = key; } //比较器比较上浮 private static <T> void siftUpUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) { while (k > 0) { int parent = (k - 1) >>> 1; Object e = array[parent]; if (cmp.compare(x, (T) e) >= 0) break; array[k] = e; k = parent; } array[k] = x; } //数组扩容 private void tryGrow(Object[] array, int oldCap) { // 扩容时不须要加锁,由于扩容是经过CAS方式来实现的, //这样不只能够提高效率,而且不影响出队操做 lock.unlock(); Object[] newArray = null; //将allocationSpinLock更新成1的线程进行数组扩容操做,其他要扩容的线程暂停 if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { //扩容规则,容量小于64,扩大2倍+2,容量不小于64,则扩大1.5倍 int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // grow faster if small (oldCap >> 1)); //判断扩大后的容量是否越界 //如果会越界,则扩容规则改成旧容量+1,若仍越界,报OOM异常 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } if (newCap > oldCap && queue == array) newArray = new Object[newCap]; //建立新数组 } finally { allocationSpinLock = 0; //恢复为0,表示没有在扩容状态 } } if (newArray == null) //未竞争到扩容操做的线程暂停 Thread.yield(); lock.lock(); /从新上锁 if (newArray != null && queue == array) { queue = newArray; //将旧数组中的数据转移到新数组中 System.arraycopy(array, 0, newArray, 0, oldCap); } }
5.出队方法
//获取并移除队首元素,若队列为空,返回null public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return dequeue(); //真正出队的方法 } finally { lock.unlock(); } } //真正执行获取并移除队首元素的方法 private E dequeue() { int n = size - 1; //移除队首后队列中的元素个数 ,同时也是队尾元素的索引 //判断队列是否为空队列,空队列直接返回null if (n < 0) return null; else { Object[] array = queue; //获取底层数组引用 E result = (E) array[0]; //获取队首元素 E x = (E) array[n]; //获取队尾元素 array[n] = null; //队尾置为null Comparator<? super E> cmp = comparator; //将原来队列的队尾放到队首位置,而后进行下沉操做(即二叉堆从新排序的操做) if (cmp == null) siftDownComparable(0, x, array, n); //使用天然排序下沉 else siftDownUsingComparator(0, x, array, n, cmp); //使用比较器下沉 size = n; return result; } } //下沉操做与PriorityQueue中相同,这里不在多作分析 //天然排序下沉 private static <T> void siftDownComparable(int k, T x, Object[] array, int n) { if (n > 0) { Comparable<? super T> key = (Comparable<? super T>)x; int half = n >>> 1; // loop while a non-leaf while (k < half) { int child = (k << 1) + 1; // assume left child is least Object c = array[child]; int right = child + 1; if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; if (key.compareTo((T) c) <= 0) break; array[k] = c; k = child; } array[k] = key; } } //比较器下沉 private static <T> void siftDownUsingComparator(int k, T x, Object[] array, int n, Comparator<? super T> cmp) { if (n > 0) { int half = n >>> 1; while (k < half) { int child = (k << 1) + 1; Object c = array[child]; int right = child + 1; if (right < n && cmp.compare((T) c, (T) array[right]) > 0) c = array[child = right]; if (cmp.compare(x, (T) c) <= 0) break; array[k] = c; k = child; } array[k] = x; } } //获取并移除队首元素,若队列已空,则等待 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { //若返回的元素为null,说明队列中没有元素 //那么让当前线程进入条件队列中等待,当前队列有元素时,则 //会唤醒线程,在尝试获取并移除队首 while ( (result = dequeue()) == null) notEmpty.await(); } finally { lock.unlock(); } return result; } //在必定时间内尝试获取并移除队首元素,若在指定时间内未成功, //返回null public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { //尝试获取并移除队首,若失败但超时时间未到,则进入条件等待 //一段时间后在进行尝试,若超时时间已过仍为成功获取并移除队首 //则返回null while ( (result = dequeue()) == null && nanos > 0) nanos = notEmpty.awaitNanos(nanos); } finally { lock.unlock(); } return result; } //获取但不移除队首元素 public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return (size == 0) ? null : (E) queue[0]; } finally { lock.unlock(); } }