都用于取队列的头结点,poll会删除头结点,peek不会删除头结点。java
增长有三种方式,前提:队列满 | 方式 | put | add | offer | |--------|--------|--------|--------| |特色|一直阻塞|抛异常|返回false|node
删除有三种方式,前提:队列为空 | 方式 | remove | poll | take | |--------|--------|--------|--------| |特色|NoSuchElementException|返回false|阻塞|数组
//链表节点内部类 static class Node<E> { //节点元素 E item; Node<E> next; Node(E x) { item = x; } } //容量界限,若是未设定,则为Integer最大值 private final int capacity; //当前元素个数 private final AtomicInteger count = new AtomicInteger(); //链表的头:head.item == null transient Node<E> head; //链表的尾:last.next == null private transient Node<E> last; //take,poll等获取锁 private final ReentrantLock takeLock = new ReentrantLock(); //等待任务的等待队列 private final Condition notEmpty = takeLock.newCondition(); //put,offer等插入锁 private final ReentrantLock putLock = new ReentrantLock(); //等待插入的等待队列 private final Condition notFull = putLock.newCondition();
signalNotEmpty()方法,在插入线程发现队列为空时调用,告知获取线程须要等待。 signalNotFull()方法,在获取线程发现队列已满时调用,告知插入线程须要等待。安全
//表示等待take。put/offer调用,不然一般不会锁定takeLock。 private void signalNotEmpty() { //获取takeLock final ReentrantLock takeLock = this.takeLock; //锁定takeLock takeLock.lock(); try { //唤醒take线程等待队列 notEmpty.signal(); } finally { //释放锁 takeLock.unlock(); } } //表示等待put,take/poll 调用 private void signalNotFull() { //获取putLock final ReentrantLock putLock = this.putLock; //锁定putLock putLock.lock(); try { //唤醒插入线程等待队列 notFull.signal(); } finally { //释放锁 putLock.unlock(); } }
enqueue()方法只能在持有 putLock 锁下执行,dequeue()在持有 takeLock 锁下执行。并发
//在队列尾部插入 private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; //last.next指向当前node //尾指针后移 last = last.next = node; } //移除队列头 private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; //保存头指针 Node<E> h = head; //获取当前链表第一个元素 Node<E> first = h.next; //头指针的next指向本身 h.next = h; // help GC //头指针指向第一个元素 head = first; //获取第一个元素的值 E x = first.item; //将第一个元素的值置空 first.item = null; //返回第一个元素的值 return x; }
在须要对两把锁同时加锁时,把加锁的顺序与释放的顺序封装成方法,确保全部地方都是一致的。并且获取锁时都是不响应中断的,一直获取直到加锁成功,这就避免了第一把锁加锁成功,而第二把锁加锁失败致使锁不释放的风险。ide
//锁定putLock和takeLock void fullyLock() { putLock.lock(); takeLock.lock(); } //与fullyLock的加锁顺序相反,先解锁takeLock,再解锁putLock void fullyUnlock() { takeLock.unlock(); putLock.unlock(); }
简单介绍一下LinkedBlockingQueue中API的源码,如构造方法,新增,获取,删除,drainTo。函数
LinkedBlockingQueue有三个构造方法,其中无参构造尽可能少用,由于容量为Integer的最大值,操做不当会出现内存溢出。this
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { //参数校验 if (capacity <= 0) throw new IllegalArgumentException(); //设置容量 this.capacity = capacity; //首尾节点指向一个空节点 last = head = new Node<E>(null); } public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); //获取putLock final ReentrantLock putLock = this.putLock; //锁定 putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
将给定的元素设置到队列中,若是设置成功返回true, 不然返回false。 e的值不能为空,不然抛出空指针异常。线程
//若是能够在不超过队列容量的状况下当即插入指定的元素到队列的尾部,成功后返回true,若是队列已满,返回false。当使用容量受限的队列时,此方法一般比方法BlockingQueue#add更可取,后者只能经过抛出异常才能插入元素。 public boolean offer(E e) { //非空判断 if (e == null) throw new NullPointerException(); //计数器 final AtomicInteger count = this.count; //若是队列已满,直接返回插入失败 if (count.get() == capacity) return false; int c = -1; //新建节点 Node<E> node = new Node<E>(e); //获取插入锁 final ReentrantLock putLock = this.putLock; //锁定 putLock.lock(); try { //若是队列未满 if (count.get() < capacity) { //插入队列 enqueue(node); //计数 c = count.getAndIncrement(); //还有空余空间 if (c + 1 < capacity) //唤醒插入线程 notFull.signal(); } } finally { //解锁 putLock.unlock(); } //若是队列为空 if (c == 0) //通知获取线程阻塞 signalNotEmpty(); //返回成功或者插入失败 return c >= 0; }
将元素设置到队列中,若是队列中没有多余的空间,该方法会一直阻塞,直到队列中有多余的空间。指针
public void put(E e) throws InterruptedException { //不能够插入空元素 if (e == null) throw new NullPointerException(); //全部put/take/etc中的约定都是预先设置本地var //除非设置,不然保持计数为负数表示失败。 int c = -1; //新建节点 Node<E> node = new Node<E>(e); //获取putLock final ReentrantLock putLock = this.putLock; //获取计数器 final AtomicInteger count = this.count; //可中断加锁,即在锁获取过程当中不处理中断状态,而是直接抛出中断异常,由上层调用者处理中断。 putLock.lockInterruptibly(); try { /* * 注意count在wait守卫线程中使用,即便它没有被锁保护。 * 这是由于count只能在此时减小(全部其余put都被锁定关闭), * 若是它从容量更改,咱们(或其余一些等待put)将收到信号。 * 相似地,count在其余等待守卫线程中的全部其余用途也是如此。 */ //只要当前队列已满 while (count.get() == capacity) { //通知插入线程等待 notFull.await(); } //插入队列 enqueue(node); //数量加1 c = count.getAndIncrement(); //若是队列增长1个元素还未满 if (c + 1 < capacity) //唤醒插入进程 notFull.signal(); } finally { //解锁 putLock.unlock(); } //若是队列中没有元素了 if (c == 0) //通知获取线程等待 signalNotEmpty(); }
非阻塞的获取队列中的第一个元素,不出队列。
public E peek() { //队列为空,直接返回 if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { //获取第一个元素,非哨兵 Node<E> first = head.next; //元素为空,返回null if (first == null) return null; else //返回第一个元素值 return first.item; } finally { takeLock.unlock(); } }
非阻塞的获取队列中的值,未获取到返回null。
public E poll() { final AtomicInteger count = this.count; //队列为空,直接返回 if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { //队列非空,获取队列中元素 if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
从队列中移除指定的值。将两把锁都锁定。
public boolean remove(Object o) { //不支持null if (o == null) return false; //锁定两个锁 fullyLock(); try { //迭代队列 for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { //经过equals方法匹配待删除元素 if (o.equals(p.item)) { //移除p节点 unlink(p, trail); //成功 return true; } } //失败 return false; } finally { //解锁 fullyUnlock(); } } // 将内部节点p与前一个跟踪断开链接 void unlink(Node<E> p, Node<E> trail) { // assert isFullyLocked(); // p.next is not changed, to allow iterators that are // traversing p to maintain their weak-consistency guarantee. //p节点内容置空 p.item = null; //trail节点的next指向p的next trail.next = p.next; //若是p是队尾 if (last == p) //trail变为队尾 last = trail; //若是队列已满 if (count.getAndDecrement() == capacity) //通知插入线程阻塞 notFull.signal(); }
清空队列。
//原子性地从队列中删除全部元素。此调用返回后,队列将为空。 public void clear() { //锁定 fullyLock(); try { //清空数据,帮助垃圾回收 for (Node<E> p, h = head; (p = h.next) != null; h = p) { h.next = h; p.item = null; } head = last; // assert head.item == null && head.next == null; //若是容量为0 if (count.getAndSet(0) == capacity) //唤醒插入线程 notFull.signal(); } finally { //解锁 fullyUnlock(); } }
将队列中值,所有移除,并发设置到给定的集合中。
public int drainTo(Collection<? super E> c, int maxElements) { //各类判断 if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; boolean signalNotFull = false; //锁 final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { //获取要转移的数量 int n = Math.min(maxElements, count.get()); // count.get provides visibility to first n Nodes Node<E> h = head; int i = 0; try { //组装集合 while (i < n) { Node<E> p = h.next; c.add(p.item); p.item = null; h.next = h; h = p; ++i; } return n; } finally { // Restore invariants even if c.add() threw if (i > 0) { // assert h.item == null; head = h; signalNotFull = (count.getAndAdd(-i) == capacity); } } } finally { takeLock.unlock(); if (signalNotFull) signalNotFull(); } }