前面学习了基于数组的非阻塞双端队列
ArrayDeque
,其内部维护一个数组和指向队列头和队列尾索引的两个成员变量;本篇则探究下基于数组的阻塞队列是什么样的数据结构,又有什么特性,相较于ArrayDeque
又有什么异同;而后就是使用场景了java
先看内部成员变量定义, 和 ArrayDequeue
相比,差异不大,一个数组,两个索引;此外多了一个锁和两个断定条件数组
/** The queued items */ final Object[] items; /** items index for next take, poll, peek or remove */ int takeIndex; /** items index for next put, offer, or add */ int putIndex; /** Number of elements in the queue */ int count; /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull;
注意安全
count
直接表示队列的元素个数(注意DelayQueue是经过遍从来获取队列长度,且并发修改会有问题,那么这个是如何保证并发的?)数据结构以下图数据结构
并发
分析阻塞原理以前,先经过注释解释下ArrayBlockingQueue
的使用场景源码分析
通用的进队方法以下,是非阻塞的方式,当数组满时,直接返回false,为保证并发安全,进队操做是加锁实现学习
public boolean offer(E e) { // 非空校验 checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); // 进队加锁 try { if (count == items.length) // 队列满,则直接返回false return false; else { enqueue(e); return true; } } finally { lock.unlock(); } } // 直接将元素塞入数组 private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
阻塞方式的进队实现以下ui
public void put(E e) throws InterruptedException { checkNotNull(e); // 非空判断 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); // 获取锁 try { while (count == items.length) { // 一直阻塞,知道队列非满时,被唤醒 notFull.await(); } enqueue(e); // 进队 } finally { lock.unlock(); } } public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { // 阻塞,知道队列不满 // 或者超时时间已过,返回false if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(e); return true; } finally { lock.unlock(); } }
源码分析,阻塞入队的逻辑比较清晰,小结一下this
offer(e)
put(e)
或 offer(e, timeout, unit)
notEmpty.signal()
唤起被阻塞的出队线程非阻塞出队方法以下url
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; // 直接将队头扔出去,并置空数组中该位置 // 并移动队列头到下一位 @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) // 保障在遍历时,能够进行出队操做 itrs.elementDequeued(); notFull.signal(); return x; }
阻塞的实现,逻辑比较清晰,首先竞争锁,判断是否为空,是阻塞直到非空;不然弹出队列头元素
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); } }
小结
poll()
方法take()
或 poll(long,TimeUnit)
方法建立线程池时,一般会用到 ArrayBlockingQueue
或者LinkedBlockingQueue
,如
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(2));
延迟队列也是并发安全,ArrayBlockingQueue
相比较 DelayQueue
应用场景的区别主要在
getDelay()
返回值小于0)基于数组阻塞队列ArrayBlockingQueue
take()
或 poll(long, TimeUnit)
poll()
offer(E, long, TimeUnit)
或put(E)
offer(E)
add(E)