今天来介绍Java并发编程中最受欢迎的同步类——堪称并发一枝花之BlockingQueue。java
JDK版本:oracle java 1.8.0_102node
继续阅读以前,需确保你对锁和条件队列的使用方法烂熟于心,特别是条件队列,不然你可能没法理解如下源码的精妙之处,甚至基本的正确性。本篇暂不涉及此部份内容,需读者自行准备。git
BlockingQueue继承自Queue,增长了阻塞的入队、出队等特性:github
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
void put(E e) throws InterruptedException;
// can extends from Queue. i don't know why overriding here
boolean offer(E e);
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
// extends from Queue
// E poll();
E poll(long timeout, TimeUnit unit) throws InterruptedException;
int remainingCapacity();
boolean remove(Object o);
public boolean contains(Object o);
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
}复制代码
为了方便讲解,我调整了部分方法的顺序,还增长了注释辅助说明。编程
须要关注的是两对方法:数组
BlockingQueue有不少实现类。根据github的code results排名,最经常使用的是LinkedBlockingQueue(253k)和ArrayBlockingQueue(95k)。LinkedBlockingQueue的性能在大部分状况下优于ArrayBlockingQueue,本文主要介绍LinkedBlockingQueue,文末会简要说起两者的对比。安全
两个阻塞方法相对简单,有助于理解LinkedBlockingQueue的核心思想:在队头和队尾各持有一把锁,入队和出队之间不存在竞争。数据结构
前面在Java实现生产者-消费者模型中按部就班的引出了BlockingQueue#put()和BlockingQueue#take()的实现,能够先去复习一下,了解为何LinkedBlockingQueue要如此设计。如下是更细致的讲解。并发
在队尾入队。putLock和notFull配合完成同步。oracle
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}复制代码
如今触发一个入队操做,分状况讨论。
入队前需获得锁putLock。检查队列非满,无需等待条件notFull,直接入队。入队后,检查队列非满(精确说是入队前“将满”,但不影响理解),随机通知一个生产者条件notFull知足。最后,检查入队前队列非空,则无需通知条件notEmpty。
注意点:
“单次通知”
,目的是减小无效竞争。但这不会产生“信号劫持”的问题,由于只有生产者在等待该条件。入队前需获得锁putLock。检查队列满,则等待条件notFull。条件notFull可能由出队成功触发(必要的),也可能由入队成功触发(也是必要的,避免“信号不足”的问题)。条件notFull知足后,入队。入队后,假设检查队列满(队列非满的状况同case1),则无需通知条件notFull。最后,检查入队前队列非空,则无需通知条件notEmpty。
注意点:
补充signalNotEmpty()、signalNotFull()的实现:
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}复制代码
入队前需获得锁putLock。检查队列空,则无需等待条件notFull,直接入队。入队后,若是队列非满,则同case1;若是队列满,则同case2。最后,假设检查入队前队列空(队列非空的状况同case1),则随机通知一个消费者条件notEmpty知足。
注意点:
“条件通知”
,是一种减小无效通知的措施。由于若是队列非空,则出队操做不会阻塞在条件notEmpty上。另外一方面,虽然已经有生产者完成了入队,但可能有消费者在生产者释放锁putLock后、通知条件notEmpty知足前,使队列变空;不过这没有影响,take()方法的while循环可以在线程竞争到锁以后再次确认。case4是一个特殊状况,分析方法相似于case1,但可能入队与出队之间存在竞争,咱们稍后分析。
在队头入队。takeLock和notEmpty配合完成同步。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}复制代码
依旧是四种case,put()和take()是对偶的,很容易分析,不赘述。
队列长度为1时,到底入队和出队之间存在竞争吗?这取决于LinkedBlockingQueue的底层数据结构。
最简单的是使用朴素链表,能够本身实现,也可使用JDK提供的非线程安全集合类,如LinkedList等。可是,队列长度为1时,朴素链表中的head、tail指向同一个节点,从而入队、出队更新同一个节点时存在竞争。
朴素链表:一个节点保存一个元素,不加任何控制和trick。典型如LinkedList。
增长dummy node可解决该问题(或者叫哨兵节点什么的)。定义Node(item, next),描述以下:
在新的数据结构中,更新操做发生在dummy和tail上,head仅仅做为示意存在,跟随dummy节点更新。队列长度为1时,虽然head、tail仍指向同一个节点,但dummy、tail指向不一样的节点,从而更新dummy和tail时不存在竞争。
源码中的head即为dummy
,first即为head
:
...
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
...
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
...
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
...复制代码
以put()为例,count自增必定要晚于enqueue执行,不然take()方法的while循环检查会失效。
用一个最简单的场景来分析,只有一个生产者线程T1,一个消费者线程T2。
假设目前队列长度0,则事件发生顺序:
很明显,在事件1发生后事件4发生前,虽然count>0,但队列中实际是没有元素的。所以,事件3 dequeue会执行失败(预计抛出NullPointerException)。事件4也就不会发生了。
若是先enqueue再count自增,就不会存在该问题。
仍假设目前队列长度0,则事件发生顺序:
换个方法,用状态机来描述:
状态S1
状态S2
状态S2
状态S1
状态S1
不少读者可能第一次从状态机的角度来理解并发程序设计,因此猴子选择先写出状态迁移序列,若是能理解上述序列,咱们再进行进一步的抽象。实际的状态机定义比下面要严谨的多,不过这里的描述已经足够了。
如今补充定义以下,不考虑入队和出队的区别:
状态S1
状态S2
LinkedBlockingQueue中的同步机制保证了不会有其余线程看到状态S2,即,S1->S2->S1两个状态转换只能由线程T1连续完成,其余线程没法在中间插入状态转换。
在猴子的理解中,并发程序设计的本质是状态机,即维护合法的状态和状态转换。以上是一个极其简单的场景,用状态机举例子就能够描述;然而,复杂场景须要用状态机作数学证实,这使得用状态机描述并发程序设计不太受欢迎(虽然口头描述也不能算严格证实)。不过,理解实现中的各类代码顺序、猛不丁蹦出的trick,这些只是“知其因此然”;经过简单的例子来掌握其状态机本质,才能让咱们了解其如何保证线程安全性,本身也能写出相似的实现,作到“知其然而知其因此然”。后面会继续用状态机分析ConcurrentLinkedQueue的源码,敬请期待。
分析了两个阻塞方法put()、take()后,非阻塞方法就简单了。
以offer为例,poll()同理。假设此时队列非空。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}复制代码
入队前需获得锁putLock。检查队列非满(隐含代表“无需等待条件notFull”),直接入队。入队后,检查队列非满,随机通知一个生产者(包括使用put()方法的生产者,下同)条件notFull知足。最后,检查入队前队列非空,则无需通知条件notEmpty。
能够看到,瞬时版offer()在队列非满时的行为与put()相同。
入队前需获得锁putLock。检查队列满,直接退出try-block。后同case1。
队列满时,offer()与put()的区别就显现出来了。put()经过while循环阻塞,一直等到条件notFull获得知足;而offer()却直接返回。
一个小point:
c在申请锁putLock前被赋值为-1。接下来,若是入队成功,会执行
c = count.getAndIncrement();
一句,则释放锁后,c的值将大于等于0。因而,这里直接用c是否大于等于0来判断是否入队成功。这种实现牺牲了可读性,只换来了无足轻重的性能或代码量的优化。本身在开发时,不要编写这种代码。
同上,以offer()为例。假设此时队列非空。
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}复制代码
该方法同put()很像,12-13行判断nanos超时的状况(吞掉了timeout参数非法的异常状况),因此区别只有14行:将阻塞的notFull.await()
换成非阻塞的超时版notFull.awaitNanos(nanos)
。
awaitNanos()的实现有点意思,这里不表。其实现类中的Javadoc描述很是干练:“Block until signalled, interrupted, or timed out.”,返回值为剩余时间。剩余时间小于等于参数nanos,表示:
nanos首先被初始化为timeout;接下来,消费者线程可能阻塞、收到信号屡次,每次收到信号被唤醒,返回的剩余时间都大于0并小于等于参数nanos,再用剩余时间做为下次等待的参数nanos,直到剩余时间小于等于0。以此实现总时长不超过timeout的超时检测。
其余同put()方法。
12-13行判断nanos参数非法后,直接返回了false。实现有问题,有可能违反接口声明。
根据Javadoc的返回值声明,返回值true表示入队成功,false表示入队失败。但若是传进来的timeout是一个负数,那么5行初始化的nanos也将是一个负数;进而一进入while循环,就在13行返回了false。然而,这是一种参数非法的状况,返回false让人误觉得参数正常,只是入队失败。这违反了接口声明,而且很是难以发现。
应该在函数头部就将参数非法的状况检查出来,相应抛出IllegalArgumentException。
github上LinkedBlockingQueue和ArrayBlockingQueue的使用频率都很高。大部分状况下均可以也建议使用LinkedBlockingQueue,但清楚两者的异同点,方能对症下药,在针对不一样的优化场景选择最合适的方案。
相同点:
不一样点
能够看到,LinkedBlockingQueue总体上是优于ArrayBlockingQueue的。因此,除非某些特殊缘由,不然应优先使用LinkedBlockingQueue。
可能不全,欢迎评论,随时增改。
没有。
本文连接:源码|并发一枝花之BlockingQueue
做者:猴子007
出处:monkeysayhi.github.io
本文基于 知识共享署名-相同方式共享 4.0 国际许可协议发布,欢迎转载,演绎或用于商业目的,可是必须保留本文的署名及连接。