ConcurrentLinkedQueue是JUC中的基于链表的无锁队列实现。本文将解读其源码实现。java
ConcurrentLinkedQueue的实现是以Maged M. Michael和Michael L. Scott的论文Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms为原型进行改造的,不妨阅读此篇论文。node
下面我将论文中的介绍的入队与出队用接近Java语言的形式改写并加上注释。数据结构
enq node = new Node(value, null) loop tail = this.tail next = tail.next # 若是tail已经不是尾节点,重试循环。 if tail == this.tail # 队列处于稳定状态,尝试插入节点。 if next == null # 插入新节点,将尾节点与新节点连接起来。 # 若是成功则退出循环,不然重试。 if CAS(tail.next, next, <node, next.count+1> break # 队列处于中间状态,推动尾节点。 else CAS (this.tail, tail, <next, tail.count+1>) # 将尾节点更新为新插入的节点,失败不要紧,说明其它线程更新了尾节点。 CAS(this.tail, tail, <node, tail.count+1>) deq loop head = this.head tail = this.tail next = head.next # 若是head已经不是头节点,重试循环。 if head == this.head if head == tail # 队列处于稳定状态则出队失败。 if next == null return false # 有其它线程正在入队,推动尾节点。 CAS(this.tail, tail, <next, tail.count+1>) else # 成功将队列头节点CAS到下一个节点则出队成功,退出循环。 if CAS(this.head, head, <next, head.count+1>) break return true
因为Java自带垃圾回收,加上ConcurrentLinkedQueue对节点进行CAS且其内外方法都保证了节点不会复用,因此并不会出现ABA问题,所以节点不须要版本号。oop
正如典型的队列设计,内部的节点用以下的Node类表示优化
/** * 仅展现属性,其他略去。 */ private static class Node<E> { volatile E item; volatile Node<E> next; }
值得一提的是Node中有一个lazySetNext方法this
void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); }
与AtomicReference类同样,使用了UNSAFE.putOrderedObject方法来实现低延迟的写入。这个方法会插入Store-Store内存屏障,也就是保证写操做不会重排。而不会插入普通volatile写会插入的Store-Load屏障。线程
ConcurrentLinkedQueue在构造时会初始化head和tail为一个item为null的节点,做为哨兵节点。设计
private transient volatile Node<E> head; private transient volatile Node<E> tail;
ConcurrentLinkedQueue的源码仍是有些晦涩难懂的,可是doc很是详细,对阅读源码很是有帮助。若是带着从doc中介绍的设计与实现思路去读源码会轻松很多。指针
ConcurrentLinkedQueue是不容许向其插入空的item的,对于删除元素,会将其item给CAS为null,一旦某个元素的item变为null,就意味着它再也不是队列中的有效元素了,而且会将已删除节点的next指针指向自身。
这样能够实现尽量快地从已删除的元素跳事后面删除的元素,回到队列中。rest
ConcurrentLinkedQueue具备如下这些性质:
这里提到了succ方法,那么先睹为快,看一下succ方法吧。
final Node<E> succ(Node<E> p) { Node<E> next = p.next; // 若是next就是自身(表明已经不在队列中),则返回head,不然返回next。 return (p == next) ? head : next; }
由于ConcurrentLinkedQueue中的head和tail均可能会滞后,这实际上是一种避免频繁CAS的优化。固然过分的滞后也是会影响操做效率的,因此在具体实现的时候,会尽量能有机会更新head和tail就去更新它们。
public boolean offer(E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e); for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; // 若是p的next为null,则说明此刻p为队列中最后一个元素。 if (q == null) { /* * cas成功则newNode成功入队,只是此刻tail仍是老的。 * 不然说明由于线程竞争的关系没有成功入队,须要重试。 */ if (p.casNext(null, newNode)) { /* * t是当前线程读到的tail快照,p是上面CAS时队列中最后一个元素。 * 这二者不一致说明该更新tail了。 * 若是CAS失败则说明tail已经被其它线程更新过了,这不要紧。 */ if (p != t) casTail(t, newNode); return true; } } /* * ConcurrentLinkedQueue的一个设计就是对于已经移除的元素, * 会将next置为自己,用于判断当前元素已经出队,接着从head继续遍历(能够看succ方法)。 * * 在整个offer方法的执行过程当中,p必定是等于t或者在t的后面的, * 所以若是p已经不在队列中的话,t也必定不在队列中了。 * * 因此从新读取一次tail到快照t, * 若是t未发生变化,就从head开始继续下去。 * 不然让p重新的t开始继续尝试入队是一个更好的选择(此时新的t极可能在head后面) */ else if (p == q) p = (t != (t = tail)) ? t : head; else /* * 若是p与t相等,则让p继续向后移动一个节点。 * * 若是p和t不相等,则说明已经经历至少两轮循环(仍然没有入队), * 则从新读取一次tail到t,若是t发生了变化,则从t开始再次尝试入队。 */ p = (p != t && t != (t = tail)) ? t : q; } }
public E poll() { restartFromHead: for (;;) { // p初始设置为head。 for (Node<E> h = head, p = h, q;;) { E item = p.item; /* * 成功将item给CAS为null则说明成功移除了元素。 * 这里的item != null判断也是为了尽量避免无心义的CAS。 */ if (item != null && p.casItem(item, null)) { /* * p若是与h不相等,则说明head极可能滞后,指向已不在队列中的元素。 * 若是此时p有后继,则更新head为p.next, * 不然尽管p已经被移除出去了,也只能更新head为p了。 */ if (p != h) updateHead(h, ((q = p.next) != null) ? q : p); return item; } /* * 若是没能成功移除p,且p也没有后继,则说明p为此时队列的最后元素。 * 因此更新head为p并返回null。 * * 注意这里h和p是可能相等的,updateHead会判断h和p是否相等以免无心义CAS。 */ else if ((q = p.next) == null) { updateHead(h, p); return null; } /* * p存在后继,须要检查是否p还在队列中。 * 若是p已经不在队列中(p==q),则从新读一次head到快照h并让p从h开始再尝试移除元素。 * * 由于必定有其它线程已经经过updateHead将head从p给CAS为新的head而且令p节点的next指向p本身, * 这时再一步步日后面走显然不值得,不如从如今的head开始从新来过。 */ else if (p == q) continue restartFromHead; // 继续向后走一个节点尝试移除元素。 else p = q; } } } final void updateHead(Node<E> h, Node<E> p) { if (h != p && casHead(h, p)) h.lazySetNext(h); }
public E peek() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; // 其实这里的if就是将poll中的if前两个分支作了个合并。 if (item != null || (q = p.next) == null) { updateHead(h, p); return item; } else if (p == q) continue restartFromHead; else p = q; } } }
public boolean remove(Object o) { if (o != null) { Node<E> next, pred = null; // p为当前节点,pred为p前驱,next为后继。 for (Node<E> p = first(); p != null; pred = p, p = next) { boolean removed = false; E item = p.item; // item为null表明元素已经无效(认为不在队列中) if (item != null) { // 不是要删除的元素。 if (!o.equals(item)) { next = succ(p); continue; } removed = p.casItem(item, null); } next = succ(p); if (pred != null && next != null) // 前驱与后继连上。 pred.casNext(p, next); if (removed) return true; } } return false; }
/** * size方法效率其实挺差的,是一个O(n)的遍历。 */ public int size() { int count = 0; for (Node<E> p = first(); p != null; p = succ(p)) if (p.item != null) // 最多只返回Integer.MAX_VALUE if (++count == Integer.MAX_VALUE) break; return count; } /** * 这个方法和poll/peek方法差很少,只不过返回的是Node而不是元素。 * * 之因此peek方法没有复用first方法的缘由有2点 * 1. 会增长一次volatile读 * 2. 有可能会由于和poll方法的竞争,致使出现非指望的结果。 * 好比first返回的node非null,里面的item也不是null。 * 可是等到poll方法返回从first方法拿到的node的item的时候,item已经被poll方法CAS为null了。 * 那这个问题只能再peek中增长重试,这未免代价过高了。 * * 这就是first和peek代码没有复用的缘由。 */ Node<E> first() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { boolean hasItem = (p.item != null); if (hasItem || (q = p.next) == null) { updateHead(h, p); return hasItem ? p : null; } else if (p == q) continue restartFromHead; else p = q; } } }