Java并发编程笔记——J.U.C之collections框架:ConcurrentLinkedQueue

一:ConcurrentLinkedQueue简介

ConcurrentLinkedQueue是线程安全的无界非阻塞队列,其底层数据结构使用单向链表实现,对于入队和出队操做使用CAS来实现线程安全。java

Doug Lea在实现ConcurrentLinkedQueue时,并无利用锁或底层同步原语,而是彻底基于自旋+CAS的方式实现了该队列。回想一下AQS,AQS内部的CLH等待队列也是利用了这种方式。node

因为是彻底基于无锁算法实现的,因此当出现多个线程同时进行修改队列的操做(好比同时入队),极可能出现CAS修改失败的状况,那么失败的线程会进入下一次自旋,再尝试入队操做,直到成功。算法

因此,在并发量适中的状况下,ConcurrentLinkedQueue通常具备较好的性能。编程

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {

二:下面是ConcurrentLinkedQueue的类图结构

三:segmentfault

三:ConcurrentLinkedQueue原理

队列结构

咱们来看下ConcurrentLinkedQueue的内部结构:安全

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
    implements Queue<E>, java.io.Serializable {
 
    /**
     * 队列头指针
     */
    private transient volatile Node<E> head;
 
    /**
     * 队列尾指针.
     */
    private transient volatile Node<E> tail;
 
    // Unsafe mechanics
     
    private static final sun.misc.Unsafe UNSAFE;
    private static final long headOffset;
    private static final long tailOffset;
     
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ConcurrentLinkedQueue.class;
            headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head"));
            tailOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("tail"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
 
    /**
     * 队列结点定义
     */
    private static class Node<E> {
        volatile E item;        // 元素值
        volatile Node<E> next;  // 后驱指针
 
        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }
 
        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }
 
        void lazySetNext(Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }
 
        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }
 
        // Unsafe mechanics
 
        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;
 
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = Node.class;
                itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
 
    //...
}

能够看到,ConcurrentLinkedQueue内部就是一个简单的单链表结构,每入队一个元素就是插入一个Node类型的结点。数据结构

字段head指向队列头,tail指向队列尾,经过Unsafe来CAS操做字段值以及node对象的字段值。多线程

构造器的定义

ConcurrentLinkedQueue包含两种构造器:并发

//构建一个空队列(head,tail均指向一个占位结点)
public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}
//根据以有集合构造队列
public ConcurrentLinkedQueue(Collection<? extends E> c) {
    Node<E> h = null, t = null;
    for (E e : c) {
        checkNotNull(e);
        Node<E> newNode = new Node<E>(e);
        if (h == null)
            h = t = newNode;
        else {
            t.lazySetNext(newNode);
            t = newNode;
        }
    }
    if (h == null)
        h = t = new Node<E>(null);
    head = h;
    tail = t;
}

咱们不妨先看下空构造器,经过空构造器创建的ConcurrentLinkedQueue对象,其head和tail指针并不是指向null,而是指向一个item值为null的node结点,以下图:函数

入队操做

元素的入队是在队尾插入元素,ConcurrentLinkedQueue的入队代码很是简单,却也很是精妙:

尾添加offer操做是在队列末一个元素,若是传递的参数是null则抛出NPE异常,不然因为ConcurrentLinkedQueue是无界队列,该方法会一直返回true。

另外,因为使用CAS算法,所以该方法不会阻塞挂起调用线程。下面具体看下实现原理。

public boolean add(E e) {//入队一个元素
    return offer(e);
}
//在队尾入队元素e,直到成功
public boolean offer(E e) {
    //e为null,则抛出空指针异常
    checkNotNull(e);    

    //构造node结点,在构造函数内部调用unsafe.putObject
    final Node<E> newNode = new Node<E>(e);    

    //从尾结点进行插入
    for (Node<E> t = tail, p = t;;) {   
        Node<E> q = p.next;

         //CASE1:q==null 说明p是尾结点,则直接插入
        if (q == null) {   
            //使用CAS设置p结点的next结点
            if (p.casNext(null, newNode)) {    
                //CAS成功,则说明新增结点已经放入链表,而后设置当前尾结点(包含head,第1,3,4....个结点为尾结点)
                if (p != t) // hop two nodes at a time    //CAS竞争失败的线程会在下一次自旋中进入该逻辑
                    casTail(t, newNode);  // Failure is OK.    //从新设置队尾指针tail
                return true;
            }    //CAS竞争失败则进入下一次自旋
        }
        else if (p == q)    //CASE2:发生了出队操做
            //多线程操做时,因为poll操做移除元素后,可能会把head变为自引用,也就是head的next变成了head,因此这里须要从新找新的head
            p = (t != (t = tail)) ? t : head;
        else
            //寻找尾结点
            p = (p != t && t != (t = tail)) ? t : q;    //将p从新指向队尾结点
    }
}

咱们来分析下offer方法的实现。单线程的状况下,元素入队比较好理解,直接线性地在队首插入元素便可。咱们假设有两个线程ThreadA和ThreadB同时进行入队地操做。

①ThreadA先单独入队两个元素九、2

此时队列地结构以下:

②ThreadA入队元素“10”,ThreadB入队元素“25”

此时ThreadA和ThreadB若并发执行,咱们看下会发生什么:

一、ThreadA和ThreadB同时进入自旋中的如下代码块:

if (q == null) {
    if (p.casNext(null, newNode)) {    //CASE1:正常状况下,新结点直接插入到队尾
       //CAS竞争插入成功
        if (p != t) // hop two nodes at a time//CAS竞争失败地线程会在下一次自旋中进入该逻辑
            casTail(t, newNode);  // Failure is OK.    //从新设置队尾指针tail
        return true;
    //CAS竞争插入失败则进入下一次自旋
    }

二、ThreadA执行cas操做(p.casNext)成功,插入新结点“10”

ThreadA执行完成后,直接返回true,队列结构以下:

三、ThreadB执行cas操做(p.casNext)失败

因为CAS操做同时修改队尾元素,致使ThreadB操做失败,则ThreadB进入下一次自旋;
在下一次自旋中,进入如下代码块:

else
    // Check for tail updates after two hops.
    p = (p != t && t != (t = tail)) ? t : q;    //将p从新指向队尾结点

上述分支的做用就是让p指针从新定位到队尾结点,此时队列结构以下:

而后ThreadB会继续下一次自旋,并再次进入如下代码块:

if (q == null) {
    // p is last node
    if (p.casNext(null, newNode)) {
        // Successful CAS is the linearization point
        // for e to become an element of this queue,
        // and for newNode to become "live".
        if (p != t) // hop two nodes at a time
            casTail(t, newNode);  // Failure is OK.
        return true;
    }
    // Lost CAS race to another thread; re-read next
}

此时,CAS操做成功,队列结构以下:

因为此时p!=t ,因此会调用casTail方法从新设置队尾指针:

private boolean casTail(Node<E> cmp, Node<E> val) {    //从新设置队尾指针tail
    return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}

这个分支只有在元素入队的同时,针对该元素也发生了“出队”操做才会执行,咱们后面会分析元素的“出队”,理解了“出队”操做再回头来看这个分支就容易理解不少了。

出队操做

队列中元素的“出队”是从队首移除元素,咱们来看下ConcurrentLinkedQueue是如何实现出队的:

//在队首出队元素,直到成功
public E poll() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;

            if (item != null && p.casItem(item, null)) {//CASE2:队首是非哨兵结点(item!=null)
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);//CASE1:队首是一个哨兵结点(item==null)
                return item;
            }
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

仍是经过示例来看,假设初始的队列结构以下:

①ThreadA先单独进行出队操做

因为head所指的是item==null的结点,因此ThreadA会执行如下分支:

else
    p = q;

而后进入下一次自旋,在自旋中执行如下分支,若是CAS操做成功,则移除首个有效元素,并从新设置头指针:

if (item != null && p.casItem(item, null)) {    //CASE2:队首是非哨兵结点
    // Successful CAS is the linearization point
    // for item to be removed from this queue.
    if (p != h) // hop two nodes at a time
        updateHead(h, ((q = p.next) != null) ? q : p);
    return item;
}

此时队列的结构以下:

若是ThreadA的CAS操做失败呢?

CAS操做失败则会进入如下分支,并从新开始自旋:

else if (p == q)
    continue restartFromHead;

最终前面两个null结点会被GC回收,队列结构以下:

②ThreadA继续进行出队操做

ThreadA继续执行“出队”操做,仍是执行如下分支:

if (item != null && p.casItem(item, null)) {
    // Successful CAS is the linearization point
    // for item to be removed from this queue.
    if (p != h) // hop two nodes at a time
        updateHead(h, ((q = p.next) != null) ? q : p);
    return item;
}

可是此时p==h,因此仅将头结点置null,这实际上是一种“懒删除”的策略。

出队元素“2”:

出队元素“10”:

最终队列结果以下:

③ThreadA进行出队,其它线程进行入队

这是最特殊的一种状况,当队列中只剩下一个元素时,若是同时发生出队和入队操做,会致使队列出现下面这种结构:(假设ThreadA进行出队元素“25”,ThreadB进行入队元素“11”)

此时tail.next=tail自身,因此ThreadB在执行入队时,会进入到offer方法的如下分支:

else if (p == q)    //CASE2:发生出队操做
    // We have fallen off list.  If tail is unchanged, it
    // will also be off-list, in which case we need to
    // jump to head, from which all live nodes are always
    // reachable.  Else the new tail is a better bet.
    p = (t != (t = tail)) ? t : head;

3、总结

ConcurrentLinkedQueue使用了自旋+CAS的非阻塞算法来保证线程并发访问时的数据一致性。因为队列自己是一种链表结构,因此虽然算法看起来很简单,但其实须要考虑各类并发的状况,实现复杂度较高,而且ConcurrentLinkedQueue不具有实时的数据一致性,实际运用中,队列通常在生产者-消费者的场景下使用得较多,因此ConcurrentLinkedQueue的使用场景并不如阻塞队列那么多。

另外,关于ConcurrentLinkedQueue还有如下须要注意的几点:

  1. ConcurrentLinkedQueue的迭代器是弱一致性的,这在并发容器中是比较广泛的现象,主要是指在一个线程在遍历队列结点而另外一个线程尝试对某个队列结点进行修改的话不会抛出ConcurrentModificationException,这也就形成在遍历某个还没有被修改的结点时,在next方法返回时能够看到该结点的修改,但在遍历后再对该结点修改时就看不到这种变化。
  2. size方法须要遍历链表,因此在并发状况下,其结果不必定是准确的,只能供参考。

参考书籍

Java并发编程之美

参考连接

http://www.javashuo.com/article/p-trqovday-gk.html

相关文章
相关标签/搜索