系列传送门:java
LinkedBlockingQueue是由单链表构成的界限可选的阻塞队列,如不指定边界,则为Integer.MAX_VALUE
,所以如不指定边界,通常来讲,插入的时候都会成功。node
LinkedBlockingQueue支持FIFO先进先出的次序对元素进行排序。编程
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -6903933977591709194L; // 单链表节点 static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } } /** 容量,若是不指定就是Integer.MAX_VALUE */ private final int capacity; /** 原子变量,记录元素个数 */ private final AtomicInteger count = new AtomicInteger(); /** * 哨兵头节点,head.next才是队列的第一个元素 */ transient Node<E> head; /** * 指向最后一个元素 */ private transient Node<E> last; /** 用来控制同时只有一个线程能够从队头获取元素 */ private final ReentrantLock takeLock = new ReentrantLock(); /** 条件队列,队列为空时,执行出队take操做的线程将会被置入该条件队列 */ private final Condition notEmpty = takeLock.newCondition(); /** 用来控制同时只有一个线程能够从队尾插入元素 */ private final ReentrantLock putLock = new ReentrantLock(); /** 条件队列,队列满时,执行入队操做put的线程将会被置入该条件队列 */ private final Condition notFull = putLock.newCondition(); }
若是但愿获取一个元素,须要先获取takeLock锁,且notEmpty条件成立。并发
若是但愿插入一个元素,须要先获取putLock锁,且notFull条件成立。工具
使用LinkedBlockingQueue的时候,能够指定容量,也能够使用默认的Integer.MAX_VALUE,几乎就是无界的了,固然,也能够传入集合对象,直接构造。学习
// 若是不指定容量,默认容量为Integer.MAX_VALUE (1 << 30) - 1 public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } // 传入指定的容量 public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; // 初始化last 和 head指针 last = head = new Node<E>(null); } // 传入指定集合对象,容量视为Integer.MAX_VALUE,直接构造queue public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); // 写线程获取putLock final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
队列的操做最核心的部分莫过于入队和出队了,后面分析的方法基本上都基于这两个工具方法。this
LinkedBlockingQueue的出队和入队相对ArrayBlockingQueue来讲就简单不少啦:.net
private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; }
private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; // head向后移一位 E x = first.item; first.item = null; return x; }
队列中的元素其实是从head.first开始的,那么移除队头,其实就是将head指向head.next便可。线程
take操做将会获取当前队列头部元素并移除,若是队列为空则阻塞当前线程直到队列不为空,退出阻塞时返回获取的元素。3d
若是线程在阻塞时被其余线程设置了中断标志,则抛出InterruptedException异常并返回。
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; // 首先要获取takeLock final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { // 若是队列为空, notEmpty不知足,就等着 while (count.get() == 0) { notEmpty.await(); } // 出队 x = dequeue(); // c先赋值为count的值, count 减 1 c = count.getAndDecrement(); // 此次出队后至少还有一个元素,唤醒notEmpty中的读线程 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } // c == capacity 表示在该元素出队以前,队列是满的 if (c == capacity) // 由于在这以前队列是满的,可能会有写线程在等着,这里作个唤醒 signalNotFull(); return x; } // 用于唤醒写线程 private void signalNotFull() { final ReentrantLock putLock = this.putLock; // 获取putLock putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
put操做将向队尾插入元素,若是队列未满则插入,若是队列已满,则阻塞当前线程直到队列不满。
若是线程在阻塞时被其余线程设置了中断标志,则抛出InterruptedException异常并返回。
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // 全部的插入操做 都约定 本地变量c 做为是否失败的标识 int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; // 插入操做获取 putLock putLock.lockInterruptibly(); try { // 队列满,这时notFull条件不知足,await while (count.get() == capacity) { notFull.await(); } enqueue(node); // c先返回count的值 , 原子变量 + 1 , c = count.getAndIncrement(); // 至少还有一个空位能够插入,notFull条件是知足的,唤醒它 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } // c == 0 表示在该元素入队以前,队列是空的 if (c == 0) // 由于在这以前队列是空的,可能会有读线程在等着,这里作个唤醒 signalNotEmpty(); } // 用于唤醒读线程 private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; // 获取takeLock takeLock.lock(); try { // 唤醒 notEmpty.signal(); } finally { takeLock.unlock(); } }
在take阻塞式获取方法的基础上额外增长超时功能,传入一个timeout,获取不到而阻塞的时候,若是时间到了,即便还获取不到,也只能当即返回null。
public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x = null; int c = -1; long nanos = unit.toNanos(timeout); final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { // 这里就是超时机制的逻辑所在 while (count.get() == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
在put阻塞式插入方法的基础上额外增长超时功能,传入一个timeout,获取不到而阻塞的时候,若是时间到了,即便还获取不到,也只能当即返回null。
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(new Node<E>(e)); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; }
offer(E e)是非阻塞的方法,向队尾插入一个元素,若是队列未满,则插入成功并返回true;若是队列已满则丢弃当前元素,并返回false。
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; // 此时队列已满,直接返回false if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node<E>(e); // 插入操做 获取putLock final ReentrantLock putLock = this.putLock; putLock.lock(); try { // 加锁后再校验一次 if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; // 只要不是-1,就表明成功~ }
从队列头部获取并移除第一个元素,若是队列为空则返回null。
public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { // 若是队列不为空,则出队, 并递减计数 if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
瞅一瞅队头的元素是啥,若是队列为空,则返回null。
public E peek() { if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { // 实际上第一个元素是head.next Node<E> first = head.next; if (first == null) return null; else return first.item; } finally { takeLock.unlock(); } }
移除队列中与元素o相等【指的是equals方法断定相同】的元素,移除成功返回true,若是队列为空或没有匹配元素,则返回false。
public boolean remove(Object o) { if (o == null) return false; fullyLock(); try { // trail 和 p 同时向后遍历, 若是p匹配了,就让trail.next = p.next表明移除p for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { if (o.equals(p.item)) { unlink(p, trail); return true; } } return false; } finally { fullyUnlock(); } } // trail为p的前驱, 但愿移除p节点 void unlink(Node<E> p, Node<E> trail) { // assert isFullyLocked(); // p.next is not changed, to allow iterators that are // traversing p to maintain their weak-consistency guarantee. p.item = null; trail.next = p.next;// 移除p // 若是p已是最后一个节点了,就更新一下last if (last == p) last = trail; // 移除一个节点以后,队列从满到未满, 唤醒notFull if (count.getAndDecrement() == capacity) notFull.signal(); } //----- 多个锁 获取和释放的顺序是 相反的 // 同时上锁 void fullyLock() { putLock.lock(); takeLock.lock(); } // 同时解锁 void fullyUnlock() { takeLock.unlock(); putLock.unlock(); }
Integer.MAX_VALUE
,所以如不指定边界,通常来讲,插入的时候都会成功。若是但愿获取一个元素,须要先获取takeLock锁,且notEmpty条件成立。
若是但愿插入一个元素,须要先获取putLock锁,且notFull条件成立。