基于链表的无边界阻塞队列,经常使用与线程池建立中做为任务缓冲队列使用java
先看一下内部定义,与 ArrayBlockingQueue
作一下对比,顺带看下这二者的区别及不一样的应用场景node
/** 队列的容量, or Integer.MAX_VALUE if none */ private final int capacity; /** 队列中实际的个数 */ private final AtomicInteger count = new AtomicInteger(); /** * 队列头,但其中没有有效数据,它的下一个才保存实际的数据 * Head of linked list. * Invariant: head.item == null */ transient Node<E> head; /** * 队列尾,其内包含有效的数据 * Invariant: last.next == null */ private transient Node<E> last; /** 出队的锁, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** 进队的锁, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition(); static class Node<E> { // 存放在队列中的数据; 队列头的item为null E item; // 队列中,该节点的下一个节点,队列尾的next为null Node<E> next; Node(E x) { item = x; } }
说明数组
对比下ArrayBlockingQueue
,主要区别为两个地方数据结构
LinkedBlockingQueue
底层为链表;ArrayBlockingQueue
底层为数组(貌似有点多余,命名上就能够看出)LinkedBlockingQueue
出队和入队是两个锁,而ArrayBlockingQueue
是一个锁进行控制;即前者出队和入队能够并发执行;然后者会出现锁的竞争分析阻塞原理以前,先经过注释解释下LinkedBlockingQueue
的使用场景并发
public void put(E e) throws InterruptedException { // 队列中不能存在null if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { // 若队列已满,则等待`notFull(出队后,队列未满时).signal()`唤醒 while (count.get() == capacity) { notFull.await(); } enqueue(node); // 进队 c = count.getAndIncrement(); // 计数+1,并获取队列的实际元素个数 if (c + 1 < capacity) // 若进队后,队列依然没有满,则释放一个信号 (why?) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) // 表示队列从空到有一个数据,唤醒由于队列为空被阻塞的线程 signalNotEmpty(); } private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; } // 唤醒阻塞的出队线程,注意使用姿式,Condition的使用必须放在对应的锁中间,不然会报错 private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
进队逻辑:this
notEmpty.singal()
,唤醒由于队列空被阻塞的出队线程public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { // 若是队列为空,则阻塞,等待入队以后,被唤醒 while (count.get() == 0) { notEmpty.await(); } x = dequeue(); // 进队 c = count.getAndDecrement(); if (c > 1) // 若是队列依然非空,则唤醒其余由于队列为空被阻塞的线程 notEmpty.signal(); } finally { takeLock.unlock();} if (c == capacity) // 原来队列为满的,此时出队一个后,正好非满,唤醒由于队列满被阻塞的线程 signalNotFull(); return x; } // 出队逻辑,实现逻辑是把出队Node节点设置为新的head,释放老的head节点 private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; } // 唤醒阻塞的出队线程,注意使用姿式,Condition的使用必须放在对应的锁中间,不然会报错 private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
出队逻辑.net
查看上面的源码时,还发现一个很是有意思的地方,出队成功以后,会判断若是以前的队列中元素的个数大于1(即出队以后,还有元素),会执行notEmpty.signal();
,唤醒被阻塞的出队线程,为何要这么干?线程
假设一种场景,一个空队列,两个线程(A,B)都执行出队,被阻塞;code
此时线程C执行入队,入队完成,由于队列由空到非空,会唤醒一个被阻塞的出队线程(假设为A); 由于出队和入队是能够并发的,如今在线程A执行`c = count.getAndDecrement();`以前,若线程D又入队成功一个,由于此时队列非空,因此不会调用`signalNotEmpty` 如今若是线程A执行出队以后,获取到的c应该为2,若是不执行`notEmpty.signal();`,就会致使线程B一直被阻塞,显然不符合咱们的预期
除了出队和入队的方法以外,还有几个有意思的方法,如队列中元素以数组形式输出,判断队列是否有元素,这两个操做,都会竞争出队和入队锁,确保在执行这个方法时,队列不会被其余线程修改对象
public boolean contains(Object o) { if (o == null) return false; fullyLock(); try { for (Node<E> p = head.next; p != null; p = p.next) if (o.equals(p.item)) return true; return false; } finally { fullyUnlock(); } } public Object[] toArray() { fullyLock(); try { int size = count.get(); Object[] a = new Object[size]; int k = 0; for (Node<E> p = head.next; p != null; p = p.next) a[k++] = p.item; return a; } finally { fullyUnlock(); } }
链表阻塞队列的经典使用case,基本上用过线程池就会用到这个了,如jdk中自带的
// java.util.concurrent.Executors public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
ArrayBlockingQueue
: 底层存储结构为数组,直接将数据存入数组中
LinkedBlockingQueue
: 底层存储结构为单向链表,会将数据封装到Node对象做为链表的节点,且链表头中不包含实际的元素信息
ArrayBlockingQueue
: 出队入队公用一把锁,即二者没法并发
LinkedBlockingQueue
: 出队和入队各一把锁,所以出队和入队可并发执行
所以在线程池的建立中,通常是使用LinkedBlockingQueue,至少线程在进入等待队列中时,出队和进队不会相互阻塞,可是二者之间有关联
ArrayBlockingQueue
: 是直接将对象插入或移除
LinkedBlockingQueue
: 须要把枚举对象转换为Node<E>进行插入或移除,其中会将出队的Node节点做为新的队列头,返回并置空Node的item元素
ArrayBlockingQueue
: 必须指定队列的容量
LinkedBlockingQueue
: 能够指定队列的容量,不指定时,容量为 Integer.MAX_VALUE