ConcurrentLinkedQueue源码分析

​ 在并发编程中,咱们可能常常须要用到线程安全的队列,JDK提供了两种模式的队列:阻塞队列和非阻塞队列。阻塞队列使用锁实现,非阻塞队列使用CAS实现。ConcurrentLinkedQueue是一个基于链表实现的无界线程安全队列,对于。下面看看JDK是如何使用非阻塞的方式来实现线程安全队列ConcurrentLinkedQueue的。java

成员属性

​ ConcurrentLinkedQueue由head和tail节点组成,节点与节点之间经过next链接,从而来组成一个链表结构的队列。node

private transient volatile Node<E> head;
private transient volatile Node<E> tail;
复制代码

Node类

​ Node有两个属性item和指向下一个节点的next,item和next都被声明成volatile类型,使用CAS来保证更新的线程安全。编程

private static class Node<E> {
    volatile E item;
    volatile Node<E> next;

    Node(E item) {
        UNSAFE.putObject(this, itemOffset, item);
    }
	//更改Node中的数据域item 
    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }
    //更改Node中的指针域next
    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }
    //更改Node中的指针域next
    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
复制代码

构造方法

​ 默认的无参构造,head和tail默认状况下指向item为null的Node哨兵结点。元素入队时被加入队尾,出队时候从队列头部获取一个元素。安全

public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}
复制代码

offer方法

​ 在读源码并按照其执行流程分析以前,先给个结论:tail不必定指向对象真正的尾节点,后面咱们分析以后会发现这个特色。多线程

private static void checkNotNull(Object v) {
    if (v == null)
        throw new NullPointerException();
}
public boolean offer(E e) {
    //(1)若是e为null会抛出空指针异常
    checkNotNull(e);
    //(2)建立一个新的Node结点,Node的构造函数中会调用Unsafe类的putObject方法
    final Node<E> newNode = new Node<E>(e);
    //(3)从尾节点插入新的结点
    for (Node<E> t = tail, p = t;;) {
        //q为尾节点的next结点,可是在多线程中,若是有别的线程修改了tail结点那么在本线程中能够看到p!=null(后
        //面的CAS就是这样作的)
        Node<E> q = p.next;
        //(4)若是q为null,说明如今p是尾节点,那么能够执行添加
        if (q == null) {
            //(5)这里使用cas设置p结点的next结点为newNode
            //(传入null,比较p的next是否为null,为null则将next设置为newNode)
            if (p.casNext(null, newNode)) {
                //(6)下面是更新tail结点的代码
                //在CAS执行成功以后,p(原链表的tail)结点的next已是newNode,这里就设置tail结点为newNode
                if (p != t) // hop two nodes at a time
                    // 若是p不等于t,说明有其它线程先一步更新tail
                    // 也就不会走到q==null这个分支了
                    // p取到的多是t后面的值
                    // 把tail原子更新为新节点
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
        }
        //若是被移除了
        else if (p == q)
            //(7)多线程操做的时候,可能会有别的线程使用poll方法移除元素后可能会把head的next变成head,因此这里须要找到新的head:这里请参考后面的poll方法的讲解图示进行理解
            p = (t != (t = tail)) ? t : head;
        else
            // (8)查询尾节点
            p = (p != t && t != (t = tail)) ? t : q;
    }
}
复制代码

​ 上面是offer方法的实现以及注释,这里咱们分为单线程执行和多线程执行两种状况,按照上面的源码实现一步步分析整个的流程。先讨论单线程执行的过程并发

单线程执行

​ 在单线程环境下执行,那么就直接按照方法实现一步步执行判断便可,下面经过适当的图示来讲明这个过程框架

  1. 首先当一个线程调用offer方法的时候,在代码(1)处进行非空检查,为null抛出异常,不为null执行(2)函数

  2. 代码(2)Node<E> newNode = new Node<E>(e)使用item做为构造函数的参数,建立一个新的结点性能

  3. 代码(3)for (Node<E> t = tail, p = t;;)从队列尾部开始自旋循环,保证从队列尾部添加新的结点测试

  4. 得到tailnext结点(q),此时的队列状况以下图所示(默认构造方法中将head和tail都指向的是一个item为null的结点)。此时的q指向的是null

  1. 代码(4)if (q == null)处执行判断q==null为true

  2. 代码(5)if (p.casNext(null, newNode))处执行的是将p的next结以CAS方式更新为咱们建立的newNode。(其中CAS会判断p的next是否为null,为null才更新为newNode

  3. 此时的p==t,因此不会执行更新tail的代码块(6)casTail(t, newNode),而是从offer方法退出。这时候队列状况以下所示

  1. 那么这一个线程执行完,但是tail尚未改变呢:实际上第二次进行offer的时候,会发现p=tail,p.next!=null,就会执行代码(8)p = (p != t && t != (t = tail)) ? t : q,简单分析一下:

    • p != t:p为tail,t为tail,因此为false
    • t != (t = tail):显然也是false
  2. 因此结果就是p=q,而后进行下一次循环,以后判断的p.next就是null,因此能够CAS成功,也由于p!=t,因此会更新tail结点。

​ 因此上面给的结论在这里就体现了,即tail并不老是指向队列的尾节点,那么实际上也能够换种方式让tail指向尾节点,即以下这样实现

if (e == null)
    throw new NullPointerException();
Node<E> n = new Node<E>(e);
for (;;) {
    Node<E> t = tail;
    if (t.casNext(null, n) && casTail(t, n)) {
        return true;
    }
}
复制代码

​ 可是若是大量的入队操做,那么每次都须要以CAS方式更新tail指向的结点,当数据量很大的时候对性能的影响是很大的。因此最终实现上,是以减小CAS操做来提升大数量的入队操做的性能:每间隔1次(tail指向和真正的尾节点之间差1)进行CAS操做更新tail指向尾节点(可是距离越长带来的负面效果就是每次入队时定位尾节点的时间就越长,由于循环体须要多循环一次来定位出尾节点(将指向真正的尾节点,而后添加newNode))。其实在前面分析成员属性时候也知道了,tail是被volatile修饰的,而CAS方式本质上仍是对于volatile变量的读写操做,而volatile的写操做开销大于读操做的,因此Concurrent Linked Queue的是线上是经过增长对于volatile变量的读操做次数从而相对的减小对其写操做。下面是单线程执行offer方法的时候tail指向的变化简图示意

多线程执行

​ 上面演示的单个线程的执行,那么当在多线程环境下执行的话会发生什么状况,这里假设两个线程并发的执行.

状况1

这里分析的其实就是假设多个线程都会执行到CAS更新p.next结点的代码,咱们下面看一下,假设threadA调用offer(item1),threadB调用offer(item2)都执行到p.casNext(null, newNode)位置处

  • CAS操做的原子性,假设threadA先执行了上面那行代码,并成功更新了p.next为newNode
  • 这时候threadB天然在进行CAS比较的时候就会失败了(p.next!=null),因此会进行下一次循环从新获取tail结点而后尝试更新

这时候的队列状况以下

  • threadB得到tail结点以后,发现其q!=nullq=p.next,p=tail

  • 继续判断p==q也是false,因此执行代码(8)

  • 分析一下p = (p != t && t != (t = tail)) ? t : q这个代码

    1. p != t:p为tail,t为tail,因此为false
    2. t != (t = tail):显然也是false
    3. 因此上面三目运算的结果就是p=q,以下图所示结果

  • 而后再次执行循环,这时候p.next就是null了,因此能够执行代码(5)p.casNext(null,newNode)。这个时候CAS判断获得p.next == null,因此能够设置p.next=Node(item2)

  • CAS成功后,判断p!=t(如上图所示),因此就能够设置tail为Node(item2)了。而后从offer退出,这个时候队列状况为

​ 能够看出,状况1中假设两个线程初始时候都拿到的是p=tail,p.next=null,那么都会执行CAS尝试添加newNode,可是只有一个线程可以在第一次循环的时候添加成功而后返回true(可是这时候的tail尚未变化,相似单线程总结那块的tail和真正的尾节点差1或0),因此另外一个线程会在第二次循环中从新尝试,这个时候就会改变p的指向,即p = (p != t && t != (t = tail)) ? t : q代码处。而后再第三次循环中才能真正CAS添加成功(固然咱们这里分析的是假想的两个线程状况,实际多线程环境确定更复杂,可是逻辑仍是差很少的)

状况2

​ 这里分析的是主要是代码p = (p != t && t != (t = tail)) ? t : q的另外一种状况,即p=t的状况,仍是先分析一下这行,假设如今

  • p != t为true,
  • t != (t = tail) : 也为true(左边的t是再循环开始的时候得到的指向tail的信息,括号中从新得到tail并赋值给t,这个时候有可能别的线程已经更改了 volatile修饰的tail了)

​ 那么结果就是p 从新指向队列的尾节点tail了,下面假想一种这样的状况

​ 实际上这种是利用volatile的可见性快速将一个要添加元素的线程找到当前队列的尾节点,避免多余的循环。 如图,假设threadA此时读取了变量tail,threadB恰好在这个时候添加若干Node后,此时会修改tail指针,那么这个时候线程A再次执行t=tail时t会指向另一个节点,因此threadA先后两次读取的变量t指向的节点不相同,即t != (t = tail)为true,而且因为t指向节点的变化p != t也为true,此时该行代码的执行结果为p和t最新的t指针指向了同一个节点,而且此时t也是队列真正的尾节点。那么,如今已经定位到队列真正的队尾节点,就能够执行offer操做了。

状况3

​ 上面咱们讨论的都是多线程去添加元素的操做,那么当既有线程offer也有线程调用poll方法的时候呢,这里就要调用offer方法中的代码块(7)了。由于尚未说到poll方法,因此这里的代码就先不作解释,下面讲poll方法在多线程中的执行的时候,会拿offer-poll-offer这种状况进行说明,那么offer方法就可能执行这几行代码了。

else if (p == q)
    //(7)多线程操做的时候,可能会有别的线程使用poll方法移除元素后可能会把head的next变成head,因此这里须要找到新的head
    p = (t != (t = tail)) ? t : head;
复制代码

add方法

public boolean add(E e) {
    return offer(e);//这里仍是调用的offer方法,上面说到了,这里就不说明了
}
复制代码

poll方法

​ poll方法是在队列头部获取并移除一个元素,若是队列为空就返回null,下面先看下poll方法的源码,而后仍是分别分析单线程和多线程下的执行

public E poll() {
    //标记
    restartFromHead:
    for (;;) {//自旋循环
        for (Node<E> h = head, p = h, q;;) {
            //(1)保存当前结点的item
            E item = p.item;
            //(2)若是当前结点的值不为null,那就将其变为null
            if (item != null && p.casItem(item, null)) {
                //(3)CAS成功以后会标记当前结点,并从链表中移除
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            //(4)若是队列为空会返回null
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            //(5)若是当前结点被自引用了,从新找寻新的队列头节点
            else if (p == q)
                continue restartFromHead;
            else
                p = q; //进行下一次循环,改变p的指向位置
        }
    }
}
final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        h.lazySetNext(h);
}
复制代码

上面咱们已经看了poll方法的源码,下面咱们就按照这个方法的实现经过图示的方式来理解一下。

单线程执行

​ poll操做是从队头获取元素,因此:

  • 从head结点开始循环,首先for (Node<E> h = head, p = h, q;;)得到当前队列的头节点,固然若是队列一开始就为空的时候,就以下所示

​ 因为head结点是做为哨兵结点存在的,因此会执行到代码(4)else if ((q = p.next) == null),由于队列为空,因此直接执行updateHead(h, p),而updateHead方法中判断的h=p,因此直接返回null。

  • 上面是队列为空的状况 ,那么当队列不为空的时候呢,假设如今队列状况以下所示

  • 因此在代码(4)else if ((q = p.next) == null)处的判断结果是false,

  • 因此执行下一个判断else if (p == q),判断结果仍是false

  • 最后执行p=q,完了以后下一次循环队列状态为

  • 在新的一次循环中,能够判断获得item!=null,因此使用CAS方式将item设置为null,(这是单线程状况下的测试)因此继续执行if(p!=h),判断结果为true。因此执行if中的内容:updateHead(h, ((q = p.next) != null) ? q : p),什么意思呢?以下所示,因此咱们这里的结果就是q=null,因此传入的参数为p(p指向的位置如上图所示)

    //updateHead方法的参数(Node h,Node p)
    q = p.next;
    if(null != q) {
    	//第二个参数就是q
    } else {
        //第二个参数就是p
    }
    复制代码

    而后执行updateHead方法,这里咱们须要再看一下该方法的细节

    final void updateHead(Node<E> h, Node<E> p) {
        //若是h!=p,就以CAS的方式将head结点设置为p
        if (h != p && casHead(h, p))
            //这里是将h结点的next结点设置为本身(h)
            h.lazySetNext(h);
    }
    //Node类中的方法
    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }
    复制代码

    那么执行完这些以后,队列中状态是什么样呢,以下图所示。执行完毕就返回被移除的元素怒item1

多线程执行offer、poll

​ 上面分析了单线程下,调用poll方法的执行流程。其实刚刚再将offer方法的时候还有一个坑没有解决。以下描述的状况

  • 假设原有队列中有一个元素item1

  • 假设在thread1调用offer方法的时候,别的线程恰好调用poll方法将head结点移除了,按照上面的分析,poll方法调用后队列的状况以下

  • (这里回忆一下offer的执行流程)因此在thread1继续执行的时候,执行的for (Node<E> t = tail, p = t;;)以后得到tail指向的位置如上图所示,可是这个tail指向的结点的next指针指向的位置仍是本身。因此Node<E> q = p.next执行以后q=tail=p。因此在offer方法中就会执行如下判断

    else if (p == q)
        //(7)多线程操做的时候,可能会有别的线程使用poll方法移除元素后可能会把head的next变成head,因此这里须要找到新的head
        p = (t != (t = tail)) ? t : head;
    复制代码

    仍是简单分析一下p = (t != (t = tail)) ? t : head这句,以下所示。简单分析以后就能得出,p指向了poll方法调用完毕后的新的head结点(如上图所示的head结点),而后调用offer的线程就能正常的添加结点了,具体流程仍是和上面讲到的同样。(那这个tail又在何时被指向队尾结点呢,实际上在调用offer方法添加完元素以后p.casNext(null, newNode),就会判断得出p != t,那完了以后就会更新tail指向的位置了)

    //在最开始时候得到的t=tail
    t=tail; //for循环中赋值t
    //...offer的其余代码
    if(t != (t = tail)) { //这里仍是同样:tail为volatile修饰,因此从新读取tail变量
        p = t; //这里表示tail结点不变(按照上图poll执行完后的状况,tail指向位置没有变化,因此p不会被赋值为t)
    } else {
        p = head; //注意这时候的head已经指向的新的首结点
    }
    复制代码

多线程执行poll、poll

​ 分析这么多,咱们发现跟offer方法留坑同样,poll还有一处代码尚未分析,因此下面仍是经过图示进行分析,先看下这个代码框架。

//标记
restartFromHead:
for (;;) {//自旋循环
    for (Node<E> h = head, p = h, q;;) {
        //...other code
        //这是自旋循环体中的一个判断
        else if (p == q)
            continue restartFromHead;
    }
}
复制代码

​ 仍是假设如今两个线程去执行poll方法,

  • 初始状况下的队列状态为

  • 假设threadA执行poll方法,并成功的执行if (item != null && p.casItem(item, null))这块,将item1设置为了null,以下图所示。

  • 可是threadA尚未执行updateHead方法,这个时候threadB执行poll以后,p指向了上图中的head,以下所示

  • 以后threadA执行updateHead方法更新了head的指向,并将原head的next结点指向本身.那么线程B执行q=p.next,天然获得的就是p==q的结果了,因此这个时候就须要跳到外层循环从新获取最新的head结点,而后继续执行

poll方法总结

​ poll方法在移除头部元素的时候,使用CAS操做将头节点的item设置为了null,而后经过冲洗设置头节点head的指向位置来达到删除队列元素的效果。这个时候原来的头部哨兵结点就是一个孤立的结点了,会被回收掉。固然,若是线程执行poll方法的时候发现head结点被修改(上面说的这种状况),就须要跳转到最外层循环从新获取新的结点。

peek方法

​ 获取队列头部的第一个元素但不删除,若是队列为空则返回null。下面是该方法的实现

public E peek() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;
            if (item != null || (q = p.next) == null) {
                updateHead(h, p);
                return item;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}
复制代码

​ 须要注意的是,第一次调用peek方法的时候会删除哨兵结点,并让队列中的head结点指向队列中的第一个元素或者null.

size方法

​ 计算当前队列元素个数,可是由于使用的是CAS的方式在并发环境下可能由于别的线程删除或者增长元素致使计算结果不许确。

public int size() {
    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p))
        if (p.item != null)
            // Collection.size() spec says to max out
            if (++count == Integer.MAX_VALUE)
                break;
    return count;
}
//找到队列中的第一个元素(head指向的item为null的结点不算(就是哨兵结点)),
//没有则返回null
Node<E> first() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            boolean hasItem = (p.item != null);
            if (hasItem || (q = p.next) == null) {
                updateHead(h, p);
                return hasItem ? p : null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}
复制代码

remove方法

​ 传入的参数为要删除的元素,若是队列中存在该元素就删除找到的第一个,而后返回true,不然返回false

public boolean remove(Object o) {
    if (o != null) { //若是传入参数为null,直接返回false
        Node<E> next, pred = null;
        for (Node<E> p = first(); p != null; pred = p, p = next) {
            boolean removed = false;
            E item = p.item;
            //找到相等的就使用cas设置为null,只有一个线程操做成功
            //别的循环查找是否又别的匹配的obj
            if (item != null) {
                if (!o.equals(item)) {
                    //获取next元素
                    next = succ(p);
                    continue;
                }
                removed = p.casItem(item, null);
            }

            next = succ(p);
            if (pred != null && next != null) // unlink
                pred.casNext(p, next);
            if (removed)
                return true;
        }
    }
    return false;
}
复制代码

参考自《Java并发编程的艺术》

相关文章
相关标签/搜索