非阻塞队列ConcurrentLinkedQueue与CAS算法应用分析

ConcurrentLinkedQueue是无阻塞队列的一种实现, 依赖与CAS算法实现。node

入队offer

  1. if(q==null)当前是尾节点 -> CAS赋值tail.next = newNode, 成功就跳出循环
  2. elseif(p == q)尾节点被移除 -> 从tail或head从新日后找
  3. else不是尾节点 -> 往next找

规则定义:

当一个节点的next指向自身时, 表示节点已经被移除, 注释中还会强调这一点。算法

完整代码(JDK8):

/**
 * Inserts the specified element at the tail of this queue.
 * As the queue is unbounded, this method will never return {@code false}.
 *
 * @return {@code true} (as specified by {@link Queue#offer})
 * @throws NullPointerException if the specified element is null
 */
/*
* 变量说明:
*   成员变量: 
*       head: 首节点
*       tail: 尾节点, 不必定指向末尾, 两次入队才更新一次
*   局部变量
*         t= tail; //保存循环开始时, 当时的tail值
*         p= t; // 每次查找的起始位置, 可能指向首节点head或者临时尾节点t
*         q= p.next; // 每次循环下一个节点
*        newNode= new Node; // 新节点
*
*
* 重要概念:
*     当p = p.next时, 表示节点已经被移除
*/
public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) {    // 状况1:  p是尾节点
            // p is last node
            // p是尾节点就直接将新节点放入末尾
            if (p.casNext(null, newNode)) {
                // Successful CAS is the linearization point
                // for e to become an element of this queue,
                // and for newNode to become "live".
                if (p != t) // hop two nodes at a time // 一次跳两个节点, 即插入两次, tail更新一次
                    casTail(t, newNode);  // Failure is OK. // 失败也无妨, 说明被别的线程更新了
                return true;  // 退出循环
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q)  // 状况2:  p节点被删除了
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet.
            // 保存的节点p、t都已经失效了,这时须要从新检索,从新检索的起始位置有两种状况
            //     1.1. 若是tail==t,表示tail也是失效的, 那么从head开始找
            //     1.2. 不然tail就是被其余线程更新了, 能够又试着从tail找
            p = (t != (t = tail)) ? t : head;
        else             // 状况3:   沿着p往下找
            // Check for tail updates after two hops.
            // 这段简单看做p = q就好理解了,  这么写是为了提升效率:
            //     1. 状况二中p可能指向了head(因为tail节点失效致使的)
            //     2. 如今tail可能被其余线程更新,也许从新指向了队尾
            //     3. 若是是, 尝试则从队尾开始找, 以减小迭代次数
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

这两段代码看了好久, 特别记录下:

  1. 状况2中的p = (t != (t = tail)) ? t : head;
    (t != (t = tail))能够分三步来看缓存

    1.1. 首先取出t
      1.2. 将tail赋值给t
      1.3. 将先前取出的t与更新后的t比较
  2. 状况3中p = (p != t && t != (t = tail)) ? t : q;并发

    首先: p != t: 这种状况只有可能发生在执行了状况2后
    现状: 这时p指向head或者中间的元素, t指向一个被删除了的节点
    那么若是tail被其余线程更新了, 咱们能够将t从新指向tail, p指向t, 就像刚进循环同样, 从尾节点开始检索。
    这样比从head日后找更有效率

出队poll

规则定义:

补充一项, item==null,也表示节点已经被删除(参考remove方法)。高并发

/**
* updateHead
*   
*/
public E poll() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;

            if (item != null && p.casItem(item, null)) {
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

/**
 * Tries to CAS head to p. If successful, repoint old head to itself
 * as sentinel for succ(), below.
 */
final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        h.lazySetNext(h);
}

出队设值操做:

先更新head, 再将旧head的next指向本身this

Note:

CAS算法实现依靠Unsafe.compareAndSwapObject实现

UNSAFE.compareAndSwapObject(对象, 字段偏移量, 当前值, 新值)线程

能够为对象中的某个字段实现CAS操做

lazySet依赖UNSAFE.putOrderedObject实现

UNSAFE.putOrderedObject(对象, 字段偏移量, 新值)rest

这个只能用在 volatile字段上 我的理解: volatile的设值会致使本地缓存失效, 那么须要从新从主存读取, 使用这个方法可使寄存器缓存依旧有效, 没必要急于从主存取值。 使用目的: 移除节点时, 须要更新节点的next指向自身, 但如今next指向的数据实际是有效的; 高并发状况下,若是offser方法已经缓存了这个next值, 直接设置next会致使缓存行失效, CPU须要从新读取next; 而使用putOrderedObject可让offser从这个next继续检索
相关文章
相关标签/搜索