首先声明,本文是伪源码分析。主要是基于状态机本身实现一个简化的并发队列,有助于读者掌握并发程序设计的核心——状态机;最后对源码实现略有说起。java
ConcurrentLinkedQueue不支持阻塞,没有BlockingQueue那么易用;但在中等规模的并发场景下,其性能却比BlockingQueue高很多,并且至关稳定。同时,ConcurrentLinkedQueue是学习CAS的经典案例。根据github的code results排名,ConcurrentLinkedQueue(164k)也十分流行,比我想象中的使用量大多了。很是值得一讲。node
对于状态机和并发程序设计的基本理解,能够参考源码|并发一枝花之BlockingQueue,建议第一次接触状态机的同窗速读参考文章以后,再来阅读此文章。git
JDK版本:oracle java 1.8.0_102github
读者能够跳过这部分,后面讲到offer()方法的实现时再回顾。面试
一般认为乐观锁的性能比悲观所更高,特别是在某些复杂的场景。这主要因为悲观锁在加锁的同时,也会把某些不会形成破坏的操做保护起来;而乐观锁的竞争则只发生在最小的并发冲突处,若是用悲观锁来理解,就是“锁的粒度最小”。但乐观锁的设计每每比较复杂,所以,复杂场景下仍是多用悲观锁。安全
首先保证正确性,有必要的话,再去追求性能。bash
乐观锁的实现每每须要硬件的支持,多数处理器都都实现了一个CAS指令,实现“Compare And Swap”的语义(这里的swap是“换入”,也就是set),构成了基本的乐观锁。数据结构
CAS包含3个操做数:并发
当且仅当位置V的值等于A时,CAS才会经过原子方式用新值B来更新位置V的值;不然不会执行任何操做。不管位置V的值是否等于A,都将返回V原有的值。oracle
一个有意思的事实是,“使用CAS控制并发”与“使用乐观锁”并不等价。CAS只是一种手段,既能够实现乐观锁,也能够实现悲观锁。乐观、悲观只是一种并发控制的策略。下文将分别用CAS实现悲观锁和乐观锁? 咱们先不讲JDK提供的实现,用状态机模型来分析一下,看咱们能不能本身实现一版。
状态机模型与是否须要并发无关,一个类无论是不是线程安全的,其状态机模型从类被实现(此时,全部类行为都是肯定的)开始就是肯定的。接口是类行为的一个子集,咱们从接口出发,逐渐构建出简化版ConcurrentLinkedQueue的状态机模型。
ConcurrentLinkedQueue实现了Queue接口:
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
}
复制代码
须要关注的是一对方法:
同时,理想的线程安全队列中,入队和出队之间不该该存在竞争,这样入队的状态机模型和出队的状态机模型能够彻底解耦,互不影响。
对咱们的状态机做出两个假设:
从而,能够先分析入队,再参照分析出队;而后可尝试去掉假设2,看如何完善咱们的实现来保证假设2成立;最后看看真·神Doug Lea如何实现,学习一波。
如今基于假设1和假设2,尝试定义入队模型的状态机。
咱们构造一个简化的场景:存在2个生产者P一、P2,同时触发入队操做。
若是是单线程环境,入队操做将是这样的:
// 准备
newNode.next = null;
curTail = tail;
// 入队前
assert tail == curTail && tail.next == null; // 状态S1
// 开始入队
tail.next = newNode; // 事件E1
// 入队中
assert tail == curTail && tail.next == newNode; // 状态S2
tail = tail.next; // 事件E2
// 结束入队
// 入队后
assert tail == newNode && tail.next == null; // 状态S3,合并到状态S1
复制代码
该过程涉及对两个域的修改:tail.next、tail。则随着操做的进行,队列会经历2种状态:
两个事件分别对应两个状态转换:
是否是很熟悉?由于ConcurrentLinkedQueue也是队列,必然同BlockingQueue类似甚至相同。区别在于如何维护这些状态和状态转换。
依赖CAS,两个状态转换T一、T2均可以实现为原子操做。留给咱们的问题是,如何维护合法的状态转换。
入队过程须要通过两个状态转换,且这两个状态转换必须连续发生。
不严谨。“连续”并非必要的,最后分析源码的时候会看到。不过,咱们暂时使用强一致性的模型。
LinkedBlockingQueue的思路便是如此。这是一种悲观策略——一次开门只放进来一个生产者,彷佛只能像LinkedBlockingQueue那样,用传统的锁putLock实现,实际上,依靠CAS也能实现:
public class ConcurrentLinkedQueue1<E> {
private volatile Node<E> tail;
public ConcurrentLinkedQueue1() {
throw new UnsupportedOperationException("Not implement");
}
public boolean offer(E e) {
Node<E> newNode = new Node<E>(e, new AtomicReference<>(null));
while (true) {
Node<E> curTail = tail;
AtomicReference<Node<E>> curNext = curTail.next;
// 尝试T1:CAS设置tail.next
if (curNext.compareAndSet(null, newNode)) {
// 成功者视为得到独占锁,完成了T1。直接执行T2:设置tail
tail = curNext.get();
return true;
}
// 失败者自旋等待
}
}
private static class Node<E> {
private volatile E item;
private AtomicReference<Node<E>> next;
public Node(E item, AtomicReference<Node<E>> next) {
this.item = item;
this.next = next;
}
}
}
复制代码
再来分析下T一、T2两个状态转换:
思路1是悲观的,认为T一、T2必须都由P1完成,若是P2插入就会“搞破坏”。而思路2则打开大门,欢迎任何“有能力”的生产者完成T2,是典型的乐观策略。
public class ConcurrentLinkedQueue2<E> {
private AtomicReference<Node<E>> tail;
public ConcurrentLinkedQueue2() {
throw new UnsupportedOperationException("Not implement");
}
public boolean offer(E e) {
Node<E> newNode = new Node<E>(e, new AtomicReference<>(null));
while (true) {
Node<E> curTail = tail.get();
AtomicReference<Node<E>> curNext = curTail.next;
// 尝试T1:CAS设置tail.next
if (curNext.compareAndSet(null, newNode)) {
// 成功者完成了T1,队列处于S2,继续尝试T2:CAS设置tail
tail.compareAndSet(curTail, curNext.get());
// 成功表示该生产者P1完成连续完成了T一、T2,队列处于S1
// 失败表示T2已经由生产者P2完成,队列处于S1
return true;
}
// 失败者得知队列处于S2,则尝试T2:CAS设置tail
tail.compareAndSet(curTail, curNext.get());
// 若是成功,队列转换到S1;若是失败,队列表示T2已经由生产者P1完成,队列已经处于S1
// 而后循环,从新尝试T1
}
}
private static class Node<E> {
private volatile E item;
private AtomicReference<Node<E>> next;
public Node(E item, AtomicReference<Node<E>> next) {
this.item = item;
this.next = next;
}
}
}
复制代码
咱们涉及的状态比较少(只有2个状态),继续看看可否减小无效的竞争,好比:
public class ConcurrentLinkedQueue3<E> {
private AtomicReference<Node<E>> tail;
public ConcurrentLinkedQueue3() {
throw new UnsupportedOperationException("Not implement");
}
public boolean offer(E e) {
Node<E> newNode = new Node<E>(e, new AtomicReference<>(null));
while (true) {
Node<E> curTail = tail.get();
AtomicReference<Node<E>> curNext = curTail.next;
// 先检查一下队列状态的状态,tail.next==null表示队列处于状态S1,仅此时才有CAS尝试T1的必要
if (curNext.get() == null) {
// 若是处于S1,尝试T1:CAS设置tail.next
if (curNext.compareAndSet(null, newNode)) {
// 成功者完成了T1,队列处于S2,继续尝试T2:CAS设置tail
tail.compareAndSet(curTail, curNext.get());
// 成功表示该生产者P1完成连续完成了T一、T2,队列处于S1
// 失败表示T2已经由生产者P2完成,队列处于S1
return true;
}
}
// 不然队列处于处于S2,或CAS尝试T1的失败者得知队列处于S2,则尝试T2:CAS设置tail
tail.compareAndSet(curTail, curNext.get());
// 若是成功,队列转换到S1;若是失败,队列表示T2已经由生产者P1完成,队列已经处于S1
// 而后循环,从新尝试T1
}
}
private static class Node<E> {
private volatile E item;
private AtomicReference<Node<E>> next;
public Node(E item, AtomicReference<Node<E>> next) {
this.item = item;
this.next = next;
}
}
}
复制代码
注意,上述实现中,while代码块后都没有返回值。这是被编译器容许的,由于编译器能够分析出,该方法不可能运行到while代码块以后,因此while代码块后的返回值语句也是无效的。
对偶的构造一个简化的场景:存在2个消费者C一、C2,同时触发出队操做。
不须要考虑悲观策略和优化方案,咱们尝试基于思路2的第一种实现撸一版基础的poll()方法。
而后,,,没撸动。想了一下,朴素链表(如LinkedList)中,直接用head表示维护头结点没法区分“已取出item未移动head指针”和“未取出item未移动head指针”(同“已取出item已移动head指针”)两种状态。因此仍是写一写才知道深浅啊,碰巧前两天写了BlockingQueue的分析,dummy node正好派上用场。
队列初始化以下:
dummy = new Node(null, null);
// tail = dummy; // 后面会用到
// head = dummy.next; // dummy.next 表示实际的头结点,但咱们不须要存储它
复制代码
单线程环境的出队过程:
// 准备
curDummy = dummy;
curNext = curDummy.next;
oldItem = curNext.item;
// 出队前
assert dummy == curDummy && dummy.next.item == oldItem; // 状态S1
// 开始出队
dummy.next.item = null; // 事件E1
// 出队中
assert dummy == curDummy && dummy.next.item == null; // 状态S2
dummy = dummy.next; // 事件E2
// 结束出队
// 出队后
assert dummy == curNext && dummy.next.item != null; // 状态S3,合并到状态S1
复制代码
状态:
状态转换:
public class ConcurrentLinkedQueue4<E> {
private AtomicReference<Node<E>> dummy;
public ConcurrentLinkedQueue4() {
dummy = new AtomicReference<>(new Node<>(null, null));
}
public E poll() {
while (true) {
Node<E> curDummy = dummy.get();
Node<E> curNext = curDummy.next;
E oldItem = curNext.item.get();
// 尝试T1:CAS设置dummy.next.item
if (curNext.item.compareAndSet(oldItem, null)) {
// 成功者完成了T1,队列处于S2,继续尝试T2:CAS设置dummy
dummy.compareAndSet(curDummy, curNext);
// 成功表示该消费者C1完成连续完成了T一、T2,队列处于S1
// 失败表示T2已经由消费者C2完成,队列处于S1
return oldItem;
}
// 失败者得知队列处于S2,则尝试T2:CAS设置dummy
dummy.compareAndSet(curDummy, curNext);
// 若是成功,队列转换到S1;若是失败,队列表示T2已经由消费者P1完成,队列已经处于S1
// 而后循环,从新尝试T1
}
}
private static class Node<E> {
private AtomicReference<E> item;
private volatile Node<E> next;
public Node(AtomicReference<E> item, Node<E> next) {
this.item = item;
this.next = next;
}
}
}
复制代码
实际上,前面的讨论有意回避了一个问题——若是入队/出队操做顺序不一样,咱们会构造出不一样的状态机。这至关于同一个类的另外一种实现,不违反前面做出的声明:
状态机模型与是否须要并发无关,一个类无论是不是线程安全的,其状态机模型从类被实现(此时,全部类行为都是肯定的)开始就是肯定的。
继续以出队为例,假设在单线程下,采用这样的顺序出队:
// 准备
curDummy = dummy;
curNext = curDummy.next;
oldItem = curNext.item;
// 出队前
assert dummy == curDummy && dummy.item == null; // 状态S1
// 开始出队
dummmy = dummy.next; // 事件E1
// 出队中
assert dummy == curNext && dummy.item == oldItem; // 状态S2
dummy.item = null; // 事件E2
// 结束出队
// 出队后
assert dummy == curNext && dummy.item == null; // 状态S3,合并到状态S1
复制代码
看起来,这样的操做顺序更容易定义各状态:
状态转换:
实现以下:
public class ConcurrentLinkedQueue5<E> {
private AtomicReference<Node<E>> dummy;
public ConcurrentLinkedQueue5() {
dummy = new AtomicReference<>(new Node<>(null, null));
}
public E poll() {
while (true) {
Node<E> curDummy = dummy.get();
Node<E> curNext = curDummy.next;
E oldItem = curNext.item.get();
// 尝试T1:CAS设置dummmy
if (dummy.compareAndSet(curDummy, curNext)) {
// 成功者完成了T1,队列处于S2,继续尝试T2:CAS设置dummy.item
curDummy.item.compareAndSet(oldItem, null);
// 成功表示该消费者C1完成连续完成了T一、T2,队列处于S1
// 失败表示T2已经由消费者C2完成,队列处于S1
return oldItem;
}
// 失败者得知队列处于S2,则尝试T2:CAS设置dummy.item
curDummy.item.compareAndSet(oldItem, null);
// 若是成功,队列转换到S1;若是失败,队列表示T2已经由消费者P1完成,队列已经处于S1
// 而后循环,从新尝试T1
}
}
private static class Node<E> {
private AtomicReference<E> item;
private volatile Node<E> next;
public Node(AtomicReference<E> item, Node<E> next) {
this.item = item;
this.next = next;
}
}
}
复制代码
实现上面状态机的过程当中,我想出了一个针对出队操做的trick:能够去掉dummy node,用head维护头结点+一步状态转换完成出队。
对啊,我写着写着又撸出来了。。。
去掉了dummy node,那么head.item的初始状态就是非空的,下面是简化的状态机。
单线程出队的操做顺序:
// 准备
curHead = head;
curNext = curHead.next;
oldItem = curHead.item;
// 出队前
assert head == curHead; // 状态S1
// 出队
head = head.next; // 事件E1
// 出队后
assert head == curNext; // 状态S2,合并到状态S1
复制代码
出队只须要尝试head后移,成功者可从旧的头结点curHead中取出item,以后curHead将被废弃;失败者再从新尝试便可。若是在尝试前就获得了item的引用,那么E1发生后,无论成功与否,在curHead上作什么都是无所谓的了,由于事实上没有任何消费者会再去访问它。
这是一个单状态的状态机,则状态:
状态转换:
实现以下:
public class ConcurrentLinkedQueue6<E> {
private AtomicReference<Node<E>> head;
public ConcurrentLinkedQueue6() {
throw new UnsupportedOperationException("Not implement");
}
public E poll() {
while (true) {
Node<E> curHead = head.get();
Node<E> curNext = curHead.next;
// 尝试T1:CAS设置head
if (head.compareAndSet(curHead, curNext)) {
// 成功者完成了T1,队列处于S1
return curHead.item; // 只让成功者取出item
}
// 失败者重试尝试
}
}
private static class Node<E> {
private volatile E item;
private volatile Node<E> next;
public Node(E item, Node<E> next) {
this.item = item;
this.next = next;
}
}
}
复制代码
前面都是基于假设2“入队、出队无竞争”讨论的。如今须要放开假设2,看如何完善已有的实现以保证假设2成立。或者若是不能保证假设2的话,如何解决竞争问题。
根据对LinkedBlockingQueue的分析,咱们得知,若是底层数据结构是朴素链表,那么队列空或长度为1的时候,head、tail都指向同一个节点(或都为null),这时必然存在竞争;dummy node较好的解决了这一问题。ConcurrentLinkedQueue4是基于dummy node的方案,咱们尝试在此基础上修改。
回顾dummy node的使用方法(配合ConcurrentLinkedQueue2和ConcurrentLinkedQueue4作了调整和精简):
下面分状况讨论。
队列空时,队列处于一个特殊的状态,从该状态出发,仅能完成入队相关的状态转换——通俗讲就是队列空时只容许入队操做。这时消除竞争很简单,只容许入队不容许出队便可:
public class ConcurrentLinkedQueue7<E> {
private AtomicReference<Node<E>> dummy;
private AtomicReference<Node<E>> tail;
public ConcurrentLinkedQueue7() {
Node<E> initNode = new Node<E>(
new AtomicReference<E>(null), new AtomicReference<Node<E>>(null));
dummy = new AtomicReference<>(initNode);
tail = new AtomicReference<>(initNode);
// Node<E> head = dummy.get().next.get();
}
public boolean offer(E e) {
Node<E> newNode = new Node<E>(new AtomicReference<>(e), new AtomicReference<>(null));
while (true) {
Node<E> curTail = tail.get();
AtomicReference<Node<E>> curNext = curTail.next;
if (curNext.compareAndSet(null, newNode)) {
tail.compareAndSet(curTail, curNext.get());
return true;
}
tail.compareAndSet(curTail, curNext.get());
}
}
public E poll() {
while (true) {
Node<E> curDummy = dummy.get();
Node<E> curNext = curDummy.next.get();
// 既能够用 dummy.next == null (head) 判空,也能够用 tail.item == null
// 不过鉴于处于poll()方法中,使用 dummy.next 可读性更好
if (curNext == null) {
return null;
}
E oldItem = curNext.item.get();
if (curNext.item.compareAndSet(oldItem, null)) {
dummy.compareAndSet(curDummy, curNext);
return oldItem;
}
dummy.compareAndSet(curDummy, curNext);
}
}
private static class Node<E> {
private AtomicReference<E> item;
private AtomicReference<Node<E>> next;
public Node(AtomicReference<E> item, AtomicReference<Node<E>> next) {
this.item = item;
this.next = next;
}
}
}
复制代码
ConcurrentLinkedQueue7须要原子的操做item和next,所以Node的item、next域都被声明为了AtomicReference。
队列空的时候:offer()方法同ConcurrentLinkedQueue2#offer(),不须要作特殊处理;poll()方法在ConcurrentLinkedQueue4#poll()的基础上,增长了32-34行的队列空检查。须要注意的是,检查必须放在队列转换的过程当中,防止消费者C2第一次尝试时队列非空,但第二次尝试时队列变空(因为C1取出了惟一的元素)的状况。
队列长度等于1时,入队与出队不会同时修改同一节点,这时必定不会发生竞争。分析以下。
假设存在一个生产者P1,一个消费者C1,同时触发入队/出队,队列中只有一个元素,因此只两个节点dummyNode、singleNode则此时:
assert dummy == dummyNode;
assert dummy.next.item == singleNode.item;
assert tail == singleNode;
assert tail.next == singleNode.next;
复制代码
回顾ConcurrentLinkedQueue7的实现:
所以,因为dummy node的引入,队列长度为1时,入队、出队之间天生就不存在竞争。
至此,咱们从最简单的场景触发,基于状态机实现了一个支持高性能offer()、poll()方法的ConcurrentLinkedQueue7。CAS的好处暂且不表,重要的是基于状态机进行并发程序设计的思想。只有抓住其状态机的本质,才能设计出正确、高效的并发类。
若是仍是没有体会到状态机的精妙之处,能够抛开状态机,并本身尝试基于乐观策略实现ConcurrentLinkedQueue。(之因此要基于乐观策略,是由于悲观策略能够认为是乐观策略的是特例,容易让人忽略其状态机的本质)
但愿看到这里,你已经理解了ConcurrentLinkedQueue的状态机本质,由于下面就再也不是本文的重点。
真·神Doug Lea的实现基于一个弱一致性的状态机:容许队列处于多种不一致的状态,经过恰当的选择“不一致的状态”,能作到用户无感;虽然增长了状态机的复杂度,但也进一步提升了性能。
网上分析文章很是多,读者可自行阅读,有必定难度。本文不打算讲解Doug Lea的实现,贴出源码仅供你们膜拜。
经常使用的是默认的空构造函数:
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, java.io.Serializable {
...
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
...
}
复制代码
Doug Lea也使用了dummy node,不过命名为了head。初始化方法同咱们实现的ConcurrentLinkedQueue7。
ConcurrentLinkedQueue7#offer()至关于ConcurrentLinkedQueue#offer()的一个特例。
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
if (p.casNext(null, newNode)) {
if (p != t)
casTail(t, newNode);
return true;
}
}
else if (p == q)
p = (t != (t = tail)) ? t : head;
else
p = (p != t && t != (t = tail)) ? t : q;
}
}
复制代码
具体来说,ConcurrentLinkedQueue容许的多个状态大致是这样的:
状态转换的规则也随之打破——再也不须要连续完成T一、T2,能够连续执行屡次类T1,最后执行一次类T2。
for循环中的几个分支就是在处理这些一致和不一致的状态。咱们前面定义的状态机空间中只容许状态S一、S2,所以是一个子集。增长的这些不一致的状态主要是为了减小CAS次数,进一步提升队列性能,这包含两个重要意义:
增长这些不一致的状态是很危险的,如S3,当队列长度为1的时候,tail与head的位置存在交叉。Doug Lea牛逼之处在于,在保证正确性的前提下,不只经过增长状态提升了性能,还减小了实际的CAS次数。
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null && p.casItem(item, null)) {
if (p != h)
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
复制代码
分析方法相似于offer()。注意下updateHead()。
原本是想分析ConcurrentLinkedQueue源码的,没想到写完状态机就3600多字了,干货却很少。前路漫漫,源码咱下回见。
本文连接:源码|并发一枝花之ConcurrentLinkedQueue【伪】
做者:猴子007
出处:monkeysayhi.github.io
本文基于 知识共享署名-相同方式共享 4.0 国际许可协议发布,欢迎转载,演绎或用于商业目的,可是必须保留本文的署名及连接。