阻塞队列的实现前面已经讲解完毕,今天咱们继续了解源码中非阻塞队列的实现,接下来就看一看ConcurrentLinkedQueue非阻塞队列是怎么完成操做的java
JDK版本号:1.8.0_171
ConcurrentLinkedQueue是一个基于链表实现的无界的线程安全的FIFO非阻塞队列。最大的不一样之处在于非阻塞特性,以前讲解的阻塞队列都会经过各类方式进行阻塞操做,在ConcurrentLinkedQueue中经过CAS操做来完成非阻塞操做。其中head和tail的更新相似以前在LinkedTransferQueue中讲解的slack(松弛度)机制,只有在slack阈值大于等于2时才会进行更新,尽可能减小CAS的操做次数,固然,这样的操做也提升了代码实现的复杂度node
从关系图上咱们也能够看到ConcurrentLinkedQueue没有去实现BlockingQueue接口安全
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, java.io.Serializable
为了了解其内部实现的操做,能够看下面的过程图理解其内部结点入队出队的转换过程多线程
除了CAS须要使用的常量,就只剩下head和tail两个引用结点,在其注释部分能够看到做者的说明,这里解释下:函数
head结点:this
tail结点(tail的next为null):spa
因为head结点和tail结点不是实时更新,达到松弛度阈值才进行更新,有可能致使head结点在tail结点以后的现象线程
/** * A node from which the first live (non-deleted) node (if any) * can be reached in O(1) time. * Invariants: * - all live nodes are reachable from head via succ() * - head != null * - (tmp = head).next != tmp || tmp != head * Non-invariants: * - head.item may or may not be null. * - it is permitted for tail to lag behind head, that is, for tail * to not be reachable from head! */ private transient volatile Node<E> head; /** * A node from which the last node on list (that is, the unique * node with node.next == null) can be reached in O(1) time. * Invariants: * - the last node is always reachable from tail via succ() * - tail != null * Non-invariants: * - tail.item may or may not be null. * - it is permitted for tail to lag behind head, that is, for tail * to not be reachable from head! * - tail.next may or may not be self-pointing to tail. */ private transient volatile Node<E> tail; // CAS操做 private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; private static final long tailOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = ConcurrentLinkedQueue.class; headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head")); tailOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("tail")); } catch (Exception e) { throw new Error(e); } }
Node实现比较简单,没复杂的部分,主要是经过CAS操做进行更新变量指针
private static class Node<E> { volatile E item; volatile Node<E> next; /** * Constructs a new node. Uses relaxed write because item can * only be seen after publication via casNext. */ Node(E item) { UNSAFE.putObject(this, itemOffset, item); } boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); } boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Node.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
无参构造方法建立了空结点同时头尾结点指向这个空结点,集合参数构造时循环添加结点,比较简单,主要须要理解默认无参构造函数建立时发生的变化rest
public ConcurrentLinkedQueue() { head = tail = new Node<E>(null); } public ConcurrentLinkedQueue(Collection<? extends E> c) { Node<E> h = null, t = null; for (E e : c) { checkNotNull(e); Node<E> newNode = new Node<E>(e); if (h == null) h = t = newNode; else { t.lazySetNext(newNode); t = newNode; } } if (h == null) h = t = new Node<E>(null); head = h; tail = t; }
h != p的前提条件下尝试更新head指向到p,成功则尝试更新原head结点指向到本身,表示结点离队
/** * Tries to CAS head to p. If successful, repoint old head to itself * as sentinel for succ(), below. */ final void updateHead(Node<E> h, Node<E> p) { if (h != p && casHead(h, p)) h.lazySetNext(h); }
获取p结点的后继结点,当next指向本身时,结点自己可能已经处于离队状态,此时返回head结点
/** * Returns the successor of p, or the head node if p.next has been * linked to self, which will only be true if traversing with a * stale pointer that is now off the list. */ final Node<E> succ(Node<E> p) { Node<E> next = p.next; return (p == next) ? head : next; }
入队操做核心方法,入队必成功,返回为true,表示入队会一直尝试操做直到成功,循环尝试中主要分为3种状况:
public boolean offer(E e) { // 判空 checkNotNull(e); final Node<E> newNode = new Node<E>(e); // 循环直到成功 for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; // p此时是最后一个结点 if (q == null) { // 则开始尝试更新p的next指向新插入的结点 // p的next未更新成功说明next被其余结点抢先更新了,从新循环判断尝试 if (p.casNext(null, newNode)) { // tail指向结点后已经添加了1个结点以上时才更新tail结点指向 // 即slack >= 2时才尝试更新 if (p != t) // hop two nodes at a time // 失败可能被其余线程更新了 casTail(t, newNode); // Failure is OK. return true; } // Lost CAS race to another thread; re-read next } // p非最后一个结点,同时p = p.next则表示p自己已经离队,须要更新p else if (p == q) // tail结点已经被更新则更新tail不然从head结点开始寻找最后一个结点 p = (t != (t = tail)) ? t : head; else // p非最后一个结点,同时p未离队删除,若是tail被其余线程更新了则p更新成新的tail,不然p更新成p.next继续循环 p = (p != t && t != (t = tail)) ? t : q; } }
出队操做核心方法,一直尝试直到成功,循环尝试中主要分为4种状况:
public E poll() { restartFromHead: // 循环尝试直到成功 for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; // item非null则尝试更新成null(表示结点已出队) if (item != null && p.casItem(item, null)) { // 出队结点非以前的头结点,旧头结点h距离实际的head结点已经大于1则更新head if (p != h) // hop two nodes at a time // 出队结点后无结点则尝试更新head结点为出队结点自己(item = null),有结点则更新到出队结点后的那个结点 updateHead(h, ((q = p.next) != null) ? q : p); return item; } // item为空或item已被其余线程获取 // p结点自己为最后一个结点,则尝试更新head并更新原h结点指向本身,返回null else if ((q = p.next) == null) { updateHead(h, p); return null; } // p非最后一个结点,p == p.next 则表示p结点已经离队,则跳转restartFromHead从头从新循环判断 else if (p == q) continue restartFromHead; // p非最后一个结点,p也未离队,则更新p指向p的下一个结点,循环判断 else p = q; } } }
和poll方法相似,主要在于不会对结点进行出队操做,仅仅是获取头结点的item值,固然中间也可能帮助更新下head指向
public E peek() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; if (item != null || (q = p.next) == null) { updateHead(h, p); return item; } else if (p == q) continue restartFromHead; else p = q; } } }
和poll方法相似,poll返回的是item这里返回的是结点,若是是null结点(item == null),这里判断下直接返回null
Node<E> first() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { boolean hasItem = (p.item != null); if (hasItem || (q = p.next) == null) { updateHead(h, p); return hasItem ? p : null; } else if (p == q) continue restartFromHead; else p = q; } } }
从队列中删除元素,经过item是否为null判断结点是否已经离队,是则继续后继判断,casItem(item, null)成功则表示移除结点成功,失败则表示被其余线程出队操做了,则继续后继判断
public boolean remove(Object o) { if (o != null) { Node<E> next, pred = null; for (Node<E> p = first(); p != null; pred = p, p = next) { boolean removed = false; E item = p.item; // item判断(非离队结点),不知足则继续判断后继结点 if (item != null) { if (!o.equals(item)) { next = succ(p); continue; } // 找到匹配结点则尝试更新item为null,表示当前结点已经离队 removed = p.casItem(item, null); } // 后继结点,到这说明匹配的结点已经删除了(别的线程删除或者当前线程删除) next = succ(p); // 取消匹配结点的关联 if (pred != null && next != null) // unlink pred.casNext(p, next); // 假如是当前线程删除的结点则返回,不然继续判断后继直到匹配或没有匹配结点才返回 if (removed) return true; } } return false; }
将集合c中的元素添加到队列中,添加到原队列尾部相似于上面的offer方法
public boolean addAll(Collection<? extends E> c) { if (c == this) // As historically specified in AbstractQueue#addAll throw new IllegalArgumentException(); // 定义两个指针结点指向集合c的头尾 // 先将c改形成Node链表 Node<E> beginningOfTheEnd = null, last = null; for (E e : c) { checkNotNull(e); Node<E> newNode = new Node<E>(e); if (beginningOfTheEnd == null) beginningOfTheEnd = last = newNode; else { last.lazySetNext(newNode); last = newNode; } } if (beginningOfTheEnd == null) return false; for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; // p为队列最后一个结点 if (q == null) { // 将队列与上面新建立的链表链接起来,更新失败再循环继续 if (p.casNext(null, beginningOfTheEnd)) { // tail更新失败从新尝试 if (!casTail(t, last)) { t = tail; if (last.next == null) casTail(t, last); } return true; } } // p非最后一个结点且已经离队 else if (p == q) // tail结点已经被更新则更新为tail不然从head结点开始寻找最后一个结点 p = (t != (t = tail)) ? t : head; else // p非最后一个结点,同时p未离队删除,若是tail被其余线程更新了则p更新成新的tail,不然p更新成p.next继续循环 p = (p != t && t != (t = tail)) ? t : q; } }
迭代器和以前队列讲解的迭代器类似,源码不是很复杂,同时remove方法这里是将item置为null,先后结点关联关系并不会操做,防止多线程遍历出现问题
构造方法中执行了advance()方法,提早设置好下次next执行时的结点nextNode,以及其item引用,hasNext判断nextNode便可,保证了迭代器的弱一致性,一旦hasNext返回true,那么调用next必定会获得相对应的item,即便该结点item已经被置为null
public Iterator<E> iterator() { return new Itr(); } private class Itr implements Iterator<E> { /** * next返回的Node */ private Node<E> nextNode; /** * 保存next的item,防止hasNext为true后结点被删除再调用next获取不到值的状况 */ private E nextItem; /** * 最近一次调用next返回的结点,若是经过调用remove删除了此元素,则重置为null,避免删除了不应删除的元素 */ private Node<E> lastRet; /** * 构造的时候就先保存了第一次调用next返回的Node */ Itr() { advance(); } /** * Moves to next valid node and returns item to return for * next(), or null if no such. */ private E advance() { lastRet = nextNode; E x = nextItem; Node<E> pred, p; if (nextNode == null) { p = first(); pred = null; } else { pred = nextNode; p = succ(nextNode); } for (;;) { if (p == null) { nextNode = null; nextItem = null; return x; } E item = p.item; if (item != null) { nextNode = p; nextItem = item; return x; } else { // 跳过null结点 Node<E> next = succ(p); if (pred != null && next != null) pred.casNext(p, next); p = next; } } } public boolean hasNext() { return nextNode != null; } public E next() { if (nextNode == null) throw new NoSuchElementException(); return advance(); } public void remove() { Node<E> l = lastRet; if (l == null) throw new IllegalStateException(); // rely on a future traversal to relink. l.item = null; lastRet = null; } }
ConcurrentLinkedQueue是一个基于链表实现的无界的线程安全的FIFO非阻塞队列,总体源码上最主要的部分在于两点:
结合图解很容易理清其实现以及操做流程,相比较于以前的LinkedTransferQueue源码算是简单了不少
以上内容若有问题欢迎指出,笔者验证后将及时修正,谢谢