ConcurrentLinkedQueue是无阻塞队列的一种实现, 依赖与CAS算法实现。node
if(q==null)
当前是尾节点 -> CAS赋值tail.next = newNode, 成功就跳出循环elseif(p == q)
尾节点被移除 -> 从tail或head从新日后找else
不是尾节点 -> 往next找当一个节点的next指向自身时, 表示节点已经被移除, 注释中还会强调这一点。算法
/** * Inserts the specified element at the tail of this queue. * As the queue is unbounded, this method will never return {@code false}. * * @return {@code true} (as specified by {@link Queue#offer}) * @throws NullPointerException if the specified element is null */ /* * 变量说明: * 成员变量: * head: 首节点 * tail: 尾节点, 不必定指向末尾, 两次入队才更新一次 * 局部变量 * t= tail; //保存循环开始时, 当时的tail值 * p= t; // 每次查找的起始位置, 可能指向首节点head或者临时尾节点t * q= p.next; // 每次循环下一个节点 * newNode= new Node; // 新节点 * * * 重要概念: * 当p = p.next时, 表示节点已经被移除 */ public boolean offer(E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e); for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; if (q == null) { // 状况1: p是尾节点 // p is last node // p是尾节点就直接将新节点放入末尾 if (p.casNext(null, newNode)) { // Successful CAS is the linearization point // for e to become an element of this queue, // and for newNode to become "live". if (p != t) // hop two nodes at a time // 一次跳两个节点, 即插入两次, tail更新一次 casTail(t, newNode); // Failure is OK. // 失败也无妨, 说明被别的线程更新了 return true; // 退出循环 } // Lost CAS race to another thread; re-read next } else if (p == q) // 状况2: p节点被删除了 // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. // 保存的节点p、t都已经失效了,这时须要从新检索,从新检索的起始位置有两种状况 // 1.1. 若是tail==t,表示tail也是失效的, 那么从head开始找 // 1.2. 不然tail就是被其余线程更新了, 能够又试着从tail找 p = (t != (t = tail)) ? t : head; else // 状况3: 沿着p往下找 // Check for tail updates after two hops. // 这段简单看做p = q就好理解了, 这么写是为了提升效率: // 1. 状况二中p可能指向了head(因为tail节点失效致使的) // 2. 如今tail可能被其余线程更新,也许从新指向了队尾 // 3. 若是是, 尝试则从队尾开始找, 以减小迭代次数 p = (p != t && t != (t = tail)) ? t : q; } }
状况2中的p = (t != (t = tail)) ? t : head;
(t != (t = tail))
能够分三步来看缓存
1.1. 首先取出t 1.2. 将tail赋值给t 1.3. 将先前取出的t与更新后的t比较
状况3中p = (p != t && t != (t = tail)) ? t : q;
并发
首先: p != t: 这种状况只有可能发生在执行了状况2后
现状: 这时p指向head或者中间的元素, t指向一个被删除了的节点
那么若是tail被其余线程更新了, 咱们能够将t从新指向tail, p指向t, 就像刚进循环同样, 从尾节点开始检索。
这样比从head日后找更有效率
补充一项, item==null,也表示节点已经被删除(参考remove方法)。高并发
/** * updateHead * */ public E poll() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; if (item != null && p.casItem(item, null)) { // Successful CAS is the linearization point // for item to be removed from this queue. if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } else if ((q = p.next) == null) { updateHead(h, p); return null; } else if (p == q) continue restartFromHead; else p = q; } } } /** * Tries to CAS head to p. If successful, repoint old head to itself * as sentinel for succ(), below. */ final void updateHead(Node<E> h, Node<E> p) { if (h != p && casHead(h, p)) h.lazySetNext(h); }
先更新head, 再将旧head的next指向本身this
UNSAFE.compareAndSwapObject(对象, 字段偏移量, 当前值, 新值)
线程
能够为对象中的某个字段实现CAS操做
UNSAFE.putOrderedObject(对象, 字段偏移量, 新值)
rest
这个只能用在 volatile字段上 我的理解: volatile的设值会致使本地缓存失效, 那么须要从新从主存读取, 使用这个方法可使寄存器缓存依旧有效, 没必要急于从主存取值。 使用目的: 移除节点时, 须要更新节点的next指向自身, 但如今next指向的数据实际是有效的; 高并发状况下,若是offser方法已经缓存了这个next值, 直接设置next会致使缓存行失效, CPU须要从新读取next; 而使用putOrderedObject可让offser从这个next继续检索