本文将主要结合源码对 JDK 中的阻塞队列进行分析,并比较其各自的特色;java
说到阻塞队列想到的第一个应用场景可能就是生产者消费者模式了,如图所示;node
根据上图所示,明显在入队和出队的时候,会发生竞争;因此一种很天然的想法就是使用锁,而在 JDK 中也的确是经过锁来实现的;因此 BlockingQueue
的源码其实能够当成锁的应用示例来查看;同时 JDK 也为咱们提供了多种不一样功能的队列:数组
接下来咱们就对最经常使用的 ArrayBlockingQueue
和 LinkedBlockingQueue
进行分析;源码分析
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { final Object[] items; // 容器数组 int takeIndex; // 出队索引 int putIndex; // 入队索引 int count; // 排队个数 final ReentrantLock lock; // 全局锁 private final Condition notEmpty; // 出队条件队列 private final Condition notFull; // 入队条件队列 ... }
ArrayBlockingQueue
的结构如图所示:
this
如图所示,线程
ArrayBlockingQueue
的数组实际上是一个逻辑上的环状结构,在添加、取出数据的时候,并无像 ArrayList
同样发生数组元素的移动(固然除了 removeAt(final int removeIndex)
);takeIndex
和 putIndex
指示读写位置;下面咱们就读写操做,对源码简单分析:指针
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) // 当队列已满的时候放入 putCondition 条件队列 notFull.await(); enqueue(e); // 入队 } finally { lock.unlock(); } }
private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; // 插入队列 if (++putIndex == items.length) putIndex = 0; // 指针走一圈的时候复位 count++; notEmpty.signal(); // 唤醒 takeCondition 条件队列中等待的线程 }
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) // 当队列为空的时候,放入 takeCondition 条件 notEmpty.await(); return dequeue(); // 出队 } finally { lock.unlock(); } }
private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; // 取出元素 items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); // 取出元素后,队列空出一位,因此唤醒 putCondition 中的线程 return x; }
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private final int capacity; // 默认 Integer.MAX_VALUE private final AtomicInteger count = new AtomicInteger(); // 容量 transient Node<E> head; // 头结点 head.item == null private transient Node<E> last; // 尾节点 last.next == null private final ReentrantLock takeLock = new ReentrantLock(); // 出队锁 private final Condition notEmpty = takeLock.newCondition(); // 出队条件 private final ReentrantLock putLock = new ReentrantLock(); // 入队锁 private final Condition notFull = putLock.newCondition(); // 入队条件 static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } } }
LinkedBlockingQueue
的结构如图所示:
code
如图所示,blog
LinkedBlockingQueue
其实就是一个简单的单向链表,其中头部元素的数据为空,尾部元素的 next 为空;下面咱们就读写操做,对源码简单分析:索引
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; // 若是队列已满,直接返回失败 int c = -1; Node<E> node = new Node<E>(e); // 将数据封装为节点 final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); // 入队 c = count.getAndIncrement(); if (c + 1 < capacity) // 若是队列未满,则继续唤醒 putCondition 条件队列 notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) // 若是添加以前的容量为0,说明在出队的时候有竞争,则唤醒 takeCondition signalNotEmpty(); // 由于是两把锁,因此在唤醒 takeCondition的时候,还须要获取 takeLock return c >= 0; }
private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; // 链接节点,并设置尾节点 }
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { // 若是队列为空,则加入 takeCondition 条件队列 notEmpty.await(); } x = dequeue(); // 出队 c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); // 若是队列还有剩余,则继续唤醒 takeCondition 条件队列 } finally { takeLock.unlock(); } if (c == capacity) // 若是取以前队列是满的,说明入队的时候有竞争,则唤醒 putCondition signalNotFull(); // 一样注意是两把锁 return x; }
private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC // 将next引用指向本身,则该节点不可达,在下一次GC的时候回收 head = first; E x = first.item; first.item = null; return x; }
根据以上的讲解,咱们能够逐步分析出一些不一样,以及在不一样场景队列的选择:
因此在这里并不能简单的给出详细的数据,证实哪一个队列更适合什么场景,最好是结合实际使用场景分析;