//从这可知,LinkedBlockingQueue是个单链表 static class Node<E> { E item; //数据 Node<E> next; //后继结点 Node(E x) { item = x; } }
2.LinkedBlockingQueue的继承体系java
LinkedBlockingQueue的继承关系以下图所示,由继承关系可知LinkedBlockingQueue与ArrayBlockingQueue实现的功能是相同的,只在存储数据结构上不一样。其父类及实现的接口在以前的学习中都已分析过,这里不在多说。node
3.重要的属性以及构造方法数组
在LinkedBlockingQueue中保证线程并发安全所使用的的方式与ArrayBlockingQueue类似,都是经过使用重入锁ReentrantLock来实现的,不一样的是ArrayBlockingQueue不论出队仍是入队使用的都是同一把锁,所以ArrayBlockingQueue在实际使用使是不能出入队并发执行的,而LinkedBlockingQueue在这边方面则不一样,LinkedBlockingQueue的出入队是各一把锁,分别控制出入队操做。安全
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //阻塞队列的容量,不能超过int类型的最大值 private final int capacity; //队列中元素的数量,是个原子计数类型 private final AtomicInteger count = new AtomicInteger(); //底层链表的头结点,即队首 transient Node<E> head; //底层链表的尾结点,即队尾 private transient Node<E> last; //重入锁,用于出队操做 private final ReentrantLock takeLock = new ReentrantLock(); //队列容许出队条件 private final Condition notEmpty = takeLock.newCondition(); //重入锁,用于入地操做 private final ReentrantLock putLock = new ReentrantLock(); //队列容许入队条件 private final Condition notFull = putLock.newCondition(); //默认构造方法,队列容量为最大值 public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } //指定容量的构造方法 public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); //初始化链表 } //拥有初始数据的阻塞队列 public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } } }
4.入队过程数据结构
与ArrayBlockingQueue相同,LinkedBlockingQueue中的入队方法也是put、add和offer三种,不过相比ArrayBlockingQueue中方法,它们更简单一些。并发
//父类AbstractQueue中的添加方法,新增失败就抛异常 public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } //入队方法,容量不足就放弃入队 public boolean offer(E e) { //从这能够知道,LinkedBlockingQueue中也是不容许null元素的 if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; //获取元素计数器 //判断队列是否已满,已满不容许在添加元素到队列中 if (count.get() == capacity) return false; int c = -1; //用于记录队列中原有元素的数量 Node<E> node = new Node<E>(e); //新建e元素接地 final ReentrantLock putLock = this.putLock; putLock.lock(); //入队锁 try { //获取锁后再次判断队列容量是否已满,已满放弃入队 if (count.get() < capacity) { enqueue(node); //真正执行入队的方法 c = count.getAndIncrement(); //元素计数+1,而且将原来元素的个数返回 //判断入队后,队列是否还有剩余容量,如有就唤醒其余某个入队操做线程 if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); //释放锁 } //判断队列原来是否是空队列,即未新增元素以前是否是为空 //若为空,那么出队操做此时应该都在等待状态,须要唤醒某个出队操做的线程 if (c == 0) signalNotEmpty(); //唤醒一个出队操做线程 return c >= 0; } //向队列中添加队尾元素 private void enqueue(Node<E> node) { last = last.next = node; //直接添加元素到链表尾结点 } //唤醒出队操做的线程 private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; //获取出队锁 takeLock.lock(); try { notEmpty.signal(); //随机唤醒某个出队操做线程 } finally { takeLock.unlock(); } } //在必定时间内执行入队操做,若超过指定时间仍没法入队,那就放弃入队,可被中断 public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { //从这能够知道,LinkedBlockingQueue中也是不容许null元素的 if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); //换算超时时间单位 int c = -1; final ReentrantLock putLock = this.putLock; //获取入队锁 final AtomicInteger count = this.count; //获取计数器 putLock.lockInterruptibly(); //可被中断的加锁 try { //判断队列容量是否已满,使用循环是为了防止虚假唤醒 //队列如果已满,则等待必定时间在尝试入队 while (count.get() == capacity) { if (nanos <= 0) //判断等待时间是否还有剩余 return false; //超时,返回入队失败 nanos = notFull.awaitNanos(nanos); } enqueue(new Node<E>(e)); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; } //向队列中添加元素,若队列容量不足,则等待直到队列有空间后继续添加 public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; //获取入队锁 final AtomicInteger count = this.count; //计数器 putLock.lockInterruptibly(); try { //判断队列是否已满,队列已满则,入队操做线程进入等待状态 //唤醒后要再次判断队列是否有空间,防止虚假唤醒 while (count.get() == capacity) { //队列已满,线程进入条件队列等待 notFull.await(); } enqueue(node); //入队 c = count.getAndIncrement(); //获取原数量,并增长队列中的元素数量 if (c + 1 < capacity) //到此,说明队列中至少有一个元素,那么就能进行出队操做 //所以能够唤醒一个等待出队的线程执行出队操做 notFull.signal(); } finally { putLock.unlock(); } /判断队列原来是否是空队列,即未新增元素以前是否是为空 //若为空,那么出队操做此时应该都在等待状态,须要唤醒某个出队操做的线程 if (c == 0) signalNotEmpty(); }
5.出队过程学习
在LinkedBlockingQueue中的出队方法也是只有两种poll和take,一个不等待,一个等待。this
//移除并返回队首元素,若队列为空,则返回null public E poll() { final AtomicInteger count = this.count; //获取队列元素个数 if (count.get() == 0) //判断是否是空队列,空队列直接返回null return null; E x = null; //用于记录移除出队的结点 int c = -1; //用于记录队列原来结点数量 final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { //判断队列是否为空,不为空才能移除并获取队首结点 if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); //记录旧计数,并结点更新计数 //出队后,队列仍不为空,那么久能够继续唤醒一个出队操做的线程 if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } //在必定时间内尝试出队操做,若超时仍未成功,则返回null public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x = null; int c = -1; long nanos = unit.toNanos(timeout); final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { if (nanos <= 0) //判断是否超时 return null; nanos = notEmpty.awaitNanos(nanos); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } //移除并返回队首元素,若队列为空,则等待 public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { //判断队列是不是空队列,如果空队列那么当前出队操做进入等待状态 while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); //判断队列是否为空,队列不空,则能够继续唤醒其余的出队操做线程 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } //判断未进行本次出队操做前队列是否已满,队列如果已满,说明全部的 //入队操做要么处于等待状态,要么不能成功,而如今至少队列执行过一次 //出队操做,此时队列必然还有容量能够执行入队操做,所以能够唤醒任意一个 //执行入队操做的线程 if (c == capacity) signalNotFull(); return x; }
6.peek方法spa
//获取但不移除队首元素 public E peek() { //队列中没有元素的话,就返回null if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node<E> first = head.next; //head.next结点就是队首元素对应的结点 //判断队首是否为null,为null说明队列是空队列 if (first == null) return null; else return first.item; } finally { takeLock.unlock(); } }
7.remove方法线程
//遍历队列查找o元素对应的结点并将其从队列中移除 public boolean remove(Object o) { if (o == null) return false; fullyLock(); //获取出入队锁,防止并发问题 try { //遍历队列对应的链表,查找要移除的元素 for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { //判断结点是不是要删除的结点,如果,那就要将该结点从 //链表中移除 if (o.equals(p.item)) { unlink(p, trail); return true; } } return false; } finally { fullyUnlock(); //将出入队锁都释放 } } //删除队列中元素时,要对其余的出入队操做进行同步 //所以删除操做要将出队锁和入队锁都获取到 void fullyLock() { putLock.lock(); takeLock.lock(); } //将结点p从链表中移除 void unlink(Node<E> p, Node<E> trail) { p.item = null; trail.next = p.next; //trail的后继结点变为p的后继结点 //如果p是尾结点,那么将last变为trail(trail是p的前驱结点) if (last == p) last = trail; //移除一个结点,那么结点计数要-1,而且能够唤醒入队操做的线程了 if (count.getAndDecrement() == capacity) notFull.signal(); }
8.size的统计
public int size() { return count.get(); //队列中的元素个数直接返回计数器值 }
2、LinkedBlockingDueue并发容器
1.LinkedBlockingDueue的底层实现
LinkedBlockingDueue能够看作是LinkedBlockingQueue的升级版,LinkedBlockingQueue能作的LinkedBlockingDueue也能作,不能作的LinkedBlockingDueue还能作,其底层数据结构也是链表,不过与LinkedBlockingQueue的单链表不一样,LinkedBlockingDueue是双向链表,而且还能够作堆栈使用。
结点的定义以下:
static final class Node<E> { //存储数据 E item; //前驱结点 Node<E> prev; //后继结点 Node<E> next; Node(E x) { item = x; } }
2.LinkedBlockingDueue的继承关系
LinkedBlockingDueue的继承关系以下所示,相比LinkedBlockingQueue多实现了一个双端队列的接口。
接下来看看BlockingDeque中定义了哪些方法:
public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> { //双端队列中若还有容量,将元素添加到队首,不然抛出异常 void addFirst(E e); //双端队列中若还有容量,将元素添加到队尾,不然抛出异常 void addLast(E e); //双端队列中若还有容量,将元素添加到队首,不然返回false boolean offerFirst(E e); //双端队列中若还有容量,将元素添加到队尾,不然返回false boolean offerLast(E e); //双端队列中若还有容量,当即将元素添加到队首,不然等待容量有空闲在添加 void putFirst(E e) throws InterruptedException; //双端队列中若还有容量,当即将元素添加到队尾,不然等待容量有空闲在添加 void putLast(E e) throws InterruptedException; //在必定时间内尝试将元素添加到队首,若到指定时间还没添加成功,则返回false boolean offerFirst(E e, long timeout, TimeUnit unit) throws InterruptedException; //在必定时间内尝试将元素添加到队尾,若到指定时间还没添加成功,则返回false boolean offerLast(E e, long timeout, TimeUnit unit) throws InterruptedException; //若队列不空,当即获取并移除队首元素,若队列已空则等待到队列中有元素在执行 E takeFirst() throws InterruptedException; //若队列不空,当即获取并移除队尾元素,若队列已空则等待到队列中有元素在执行 E takeLast() throws InterruptedException; //在必定时间内尝试获取并移除队首元素,若到达指定时间队列中仍没有元素,直接放弃尝试,返回null E pollFirst(long timeout, TimeUnit unit) throws InterruptedException; //在必定时间内尝试获取并移除队尾元素,若到达指定时间队列中仍没有元素,直接放弃尝试,返回null E pollLast(long timeout, TimeUnit unit) throws InterruptedException; //移除队列中第一次出现的o元素 boolean removeFirstOccurrence(Object o); //移除队列中最后一个出现的o元素 boolean removeLastOccurrence(Object o); //双端队列中若还有容量,将元素添加到队尾,不然抛出异常 //该方法等同于addLast boolean add(E e); //双端队列中若还有容量,将元素添加到队尾,不然返回false //该方法等同于offerLast boolean offer(E e); //若队列不满,则当即向队尾添加元素,不然等待队列有空间后在添加 void put(E e) throws InterruptedException; //在必定时间内尝试将元素添加到队尾,若到指定时间还没添加成功,则返回false boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; //获取并移除队首,若队列为空,则抛异常 E remove(); //获取并移除队首,若队列为空,则返回null E poll(); //获取并移除队首,若队列为空,则等待队列不为空在执行 E take() throws InterruptedException; //在必定时间内尝试获取并移除队尾元素,若到达指定时间队列中仍没有元素,直接放弃尝试,返回null E poll(long timeout, TimeUnit unit) throws InterruptedException; //获取但不移除队首元素,若队列为空,那么抛异常 E element(); //获取但不移除队首元素,若队列为空,那么返回null E peek(); //删除队里中第一次出现的o元素 boolean remove(Object o); //判断队列中是否含有o元素 public boolean contains(Object o); //队列中元素的数量 public int size(); //获取队列的迭代器 Iterator<E> iterator(); //向队列中压入一个元素,即向队首添加一个元素,若队列没有 //容量,则抛出异常,等同于addFirst void push(E e); }
3.重要属性及构造方法
LinkedBlockingDueue也是个容量可选(最大为Integer.MAX_VALUE)的阻塞队列,且线程安全。与LinkedBlockingQueue类似,其线程安全也是经过ReentrantLock来实现的,不过略微不一样的似,LinkedBlockingDueue的底层只有一个重入锁,而LinkedBlockingQueue则有两个。
public class LinkedBlockingDeque<E> extends AbstractQueue<E> //队列的头结点 transient Node<E> first; //队尾结点 transient Node<E> last; //队列中的结点计数 private transient int count; //队列容量 private final int capacity; //重入锁 final ReentrantLock lock = new ReentrantLock(); //队列容许出队条件 private final Condition notEmpty = lock.newCondition(); //队列容许入队条件 private final Condition notFull = lock.newCondition(); //使用默认容量的队列 public LinkedBlockingDeque() { this(Integer.MAX_VALUE); } //指定容量的队列 public LinkedBlockingDeque(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; } //带有初始元素的队列 public LinkedBlockingDeque(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock lock = this.lock; lock.lock(); // Never contended, but necessary for visibility try { for (E e : c) { if (e == null) throw new NullPointerException(); if (!linkLast(new Node<E>(e))) throw new IllegalStateException("Deque full"); } } finally { lock.unlock(); } } }
4.入队过程
由对BlockingDeque的分析可知,LinkedBlockingDueue中存在着大量的入队方法,这里就不一一分析了,由于实现基本都差很少,只挑选个别来看看。
//addFirst方法的本质其实仍是调用offerFirst //向队首新增元素,若队列容量不足,则抛异常 public void addFirst(E e) { if (!offerFirst(e)) throw new IllegalStateException("Deque full"); } //向队首新增元素,若队列容量不足,则返回false public boolean offerFirst(E e) { //队列中不容许null元素存在 if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); //新建对应结点 final ReentrantLock lock = this.lock; lock.lock(); //加锁 try { return linkFirst(node); //真正执行添加队首结点的方法 } finally { lock.unlock(); } } //向队首新增结点 private boolean linkFirst(Node<E> node) { //判断队列是否已满 if (count >= capacity) return false; //队列已满直接返回失败 Node<E> f = first; //获取队首结点 node.next = f; //将原队首结点设为新增结点的后继结点 first = node; //将新增结点设为队首结点 //判断原队列中是否为空队列 if (last == null) last = node; //原队列若为空队列,那么此时队首队尾都是同一个结点 else f.prev = node; //设置原来的队首结点的前驱结点为新增结点 ++count; //队列中的结点数量+1 notEmpty.signal(); //唤醒执行出队操做的线程 return true; } //向队尾新增元素,若队列容量不足,则抛异常 public void addLast(E e) { if (!offerLast(e)) throw new IllegalStateException("Deque full"); } //向队尾新增元素,若队列容量不足,则返回false public boolean offerLast(E e) { if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); final ReentrantLock lock = this.lock; lock.lock(); try { return linkLast(node); //真正实现添加队尾结点的方法 } finally { lock.unlock(); } } //向队尾新增结点 private boolean linkLast(Node<E> node) { //判断队列是否已满 if (count >= capacity) return false; Node<E> l = last; //获取当前队尾结点 node.prev = l; //将新增结点的前驱设为l last = node; //新增节点设为队尾 //判断队列本来是否为空 //若为空,则新增结点既是队首也是队尾 if (first == null) first = node; else l.next = node; //将l节点的后继设为新增结点 ++count; //计数+1 notEmpty.signal(); //唤醒执行出队操做的线程 return true; } //向队首新增结点,若队列已满,则等待 public void putFirst(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); final ReentrantLock lock = this.lock; lock.lock(); try { while (!linkFirst(node)) //若新增队首结点失败,则线程进入等待状态 notFull.await(); } finally { lock.unlock(); } } //向队尾新增结点,若队列已满,则等待 public void putLast(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); final ReentrantLock lock = this.lock; lock.lock(); try { while (!linkLast(node)) //若新增队尾结点失败,则线程进入等待状态 notFull.await(); } finally { lock.unlock(); } }
5.出队过程
同入队同样,出队的方法也不少,这里也只选个别来分析:
//获取并移除队首元素,若队列为空,则返回null public E pollFirst() { final ReentrantLock lock = this.lock; lock.lock(); try { return unlinkFirst(); //将队首结点从队列中移除 } finally { lock.unlock(); } } //移除队首节点的方法 private E unlinkFirst() { //获取队首结点 Node<E> f = first; //判断队列是否为空队列,空队列直接返回null if (f == null) return null; //获取队首结点的后继结点,要做为新的队首结点 Node<E> n = f.next; E item = f.item; //获取队首节点的数据,用做返回值 f.item = null; //清空队首结点,方便GC回收 f.next = f; //队首出队的后继设为本身 first = n; //设置新的队首为n //判断队列是否还有结点 if (n == null) last = null; //队列如果空了,那么队尾也设为null else n.prev = null; //新队首的前驱设为null --count; //队列中的结点计数-1 //唤醒执行入队操做的线程,队列刚执行一次出队操做,必然有剩余空间 //所以能够执行入队操做 notFull.signal(); return item; } //获取并移除队尾元素,若队列为空,则返回null public E pollLast() { final ReentrantLock lock = this.lock; lock.lock(); try { return unlinkLast(); //将队尾结点从队列中移除 } finally { lock.unlock(); } } //移除队尾结点的方法 private E unlinkLast() { //获取队尾引用 Node<E> l = last; //判断队列是不是空队列,空队列直接返回null if (l == null) return null; Node<E> p = l.prev; //获取队尾的前驱结点,用做新的队尾结点 E item = l.item; l.item = null; l.prev = l; //队尾结点出队后的前驱设为自身,方便GC回收 last = p; //设置新队尾 //判断队尾出队后队列中是否仍是有结点,即队列是否成了空队列 if (p == null) first = null; //空队列的队首也是null else p.next = null; //新队尾的后继设为null --count; //结点计数-1 notFull.signal(); //唤醒入队操做的线程 return item; } //将队首移除并返回,若队列已空则等待 public E takeFirst() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { E x; //判断移除队首结点是否成功,失败则等待 while ( (x = unlinkFirst()) == null) notEmpty.await(); return x; } finally { lock.unlock(); } } //将队尾移除并返回,若队列已空则等待 public E takeLast() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { E x; //判断移除队尾结点是否成功,失败则等待 while ( (x = unlinkLast()) == null) notEmpty.await(); return x; } finally { lock.unlock(); } }
6.总结
LinkedBlockingDueue中其余的方法就不一一分析了,都比较简单。
与LinkedBlockingQueue对比,LinkedBlockingDueue的线程安全以及阻塞等待的实现基本没有区别,两个阻塞队列基本能够通用(LinkedBlockingDueue用做栈时除外)。两个队列基本上只有两点不一样:一个是底层数据结构的细微区别,LinkedBlockingQueue是单向链表,而LinkedBlockingDueue则是双向链表;另外一个是重入锁的使用有些区别,LinkedBlockingDueue不论出入队都使用的是同一个锁对象,而LinkedBlockingQueue的出入队锁是分开的。