ConcurrentLinkedQueue源码分析

相关文章

Lock锁源码分析java

java阻塞队列BlockingQueuenode

HashMap源码分析-jdk1.6和jdk1.8的区别算法

LinkedList源码分析安全

ArrayList源码分析源码分析

前言

ConcurrentLinkedQueue是一个线程安全的队列,它采用的是 CAS 算法来进行实现,也就是说它是非阻塞的;队列中的元素按照 FIFO(先进先出)的原则对元素进行排列,此外,它是一个无界队列;添加元素的时候,在链表的尾部进行添加,获取元素的时候,从链表的头部获取。它内部采用的单向链表的形式来表示,链表的节点是定义在ConcurrentLinkedQueue的一个内部类。性能

类图

ConcurrentLinkedQueue 的类图以下所示:this

能够看到 ConcurrentLinkedQueue 实现了 Queue 接口和实现了继承了 AbstractQueue 类,而 Itr 和 Node则是它的一个内部类;spa

Queue 接口只是定义了一些队列的公共方法,以下:.net

public interface Queue<E> extends Collection<E> {
    // 添加元素
    boolean add(E e);
    // 添加元素
    boolean offer(E e);
    // 删除元素
    E remove();
    // 删除并返回第一个元素,若是队列为空,则返回 null 
    E poll();
   // 返回第一个元素,若是不存在,则抛出NoSuchElementException异常
    E element();
    // 返回第一个元素,但不删除,若是队列为空,则返回 null 
    E peek();
}

AbstractQueue 类也继承了 Queue接口,提供了某些方法的实现,以下所示:线程

public abstract class AbstractQueue<E> extends AbstractCollection<E> implements Queue<E> {

    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

    public E remove() {
        E x = poll();
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    }
...............................
}

源码

接下来看看 ConcurrentLinkedQueue 的一个实现过程:

首先看一下队列中链表节点的定义,链表中的节点使用一个 Node 内部类来表示:

private static class Node<E> {
        // 节点中的元素
        volatile E item;
        // 下一个节点,没有上一个节点,表示它是一个单向链表的形式
        volatile Node<E> next;
        // 构造一个节点
        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }
        // 使用 CAS 的方式设置节点的元素
        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }
        // 设置下一个节点
        void lazySetNext(Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }
        // 采用 CAS 的方式设置下一个节点
        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }
      //  Unsafe 类的一些初始化
}

能够看到 Node 类的定义比较简单,值得注意的地方是 E item 元素和 Node next 节点都使用了 volatile 来修饰,这说明了元素或某个节点被一个线程修改了以后,其余的线程是立马看到修改后的值的。

接下来看一下 ConcurrentLinkedQueue 类中的属性和方法:

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {

    // 头节点,
    private transient volatile Node<E> head;
    // 尾节点,尾节点不必定是链表的最后一个节点
    private transient volatile Node<E> tail;
    // 构造
    public ConcurrentLinkedQueue() {
        head = tail = new Node<E>(null);
    }
......................
}

以上能够看到,头节点 head 和尾节点 tail 都被 volatile 修饰,节点被一个线程修改了以后,是会把修改的最新的值刷新到主内存中去,当其余线程去读取该值的时候,会中主内存中获取最新的值,也就是一个线程修改了以后,对其余线程是当即可见的。

当使用空的构造其是实例化一个对象的时候,会建立一个节点,节点的值为 null(添加的时候,是不能为null的),并把头节点和尾节点都指向该节点,以下所示:

添加元素

以后向链表中添加元素,添加元素的时候,是在链表的尾部进行添加,添加元素有两个方法 add() 和 offer(),add() 会调用 offer() 进行添加,这两个方法永远都会返回 true,因此不要使用 true | false 来判断是否添加成功;

public boolean add(E e) {
        return offer(e);
    }
public boolean offer(E e) {
        // 判空,为空则抛出空指针异常
        checkNotNull(e);
        // 建立要添加的节点
        final Node<E> newNode = new Node<E>(e);
        
        // 无限循环,入队不成功,则反复入队
        // t 表示 tail 节点
        // p 表示链表的尾节点,默认等于 tail 节点
        for (Node<E> t = tail, p = t;;) {
            // q 为尾节点的下一个节点         
            Node<E> q = p.next;
            // 若是尾节点的下一个节点为空,则表示 p 为尾节点
            if (q == null) {
                // CAS 设置尾节点的下一个节点为新添加的节点,若是设置失败,在再次尝试
                if (p.casNext(null, newNode)) {
                    // 若是tail节点有大于等于1个的 next 节点,则更新 tail 节点,将新添加的节点设置为 tail 节点
                    if (p != t) // 至关于循环两次更新一次 tail 节点
                        casTail(t, newNode);  // 新添加的节点设置为tail节点,容许失败,失败了表示有其余线程成功更新了tail节点
                    return true;
                }
            }
            else if (p == q) // 只有在尾节点和尾节点的下一个节点为空的状况下成立
                p = (t != (t = tail)) ? t : head;
            else 
                // 把 tail节点设置为为尾节点,再次循环设置下一个节点
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

以上方法就是向队列中添加元素的方法,该方法也是该类中比较难理解的方法;

从上面的代码能够看出,入队主要作两件事情:第一是将新添加的节点设置成当前队列尾节点的下一个节点;第二是更新tail节点,若是tail节点的next节点不为空,则将入队节点设置成tail节点,若是tail节点的next节点为空,则将入队节点设置成tail的next节点,因此tail节点不老是尾节点。

接下来经过图的方法来看一下元素入队的一个过程:

初始时队列以下:

1. 添加第一个节点:

for循环:

for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;

接下来进行 q==null 的判断:

if (q == null) {
                if (p.casNext(null, newNode)) {

                    if (p != t) 
                        casTail(t, newNode);  
                    return true;
                }
            }

此时,q==null, 也就是说尾节点的下一个节点为空,此时把新添加的节点设置为尾节点的下一个节点,有由于,p节点和 t节点是相等的,因此不会更新 tail 节点;设置成功后,链表以下:

2.添加第二个节点:

if (q == null) {
             ....
            }
            else if (p == q)
                // p = q = null
                p = (t != (t = tail)) ? t : head;
            else
                p = (p != t && t != (t = tail)) ? t : q;

此时, q 不为空,有p和q不想等,因此只会走 else 分支,p != t 为false,t != tail 也为false,全部 把 q 赋给 p ,此时队列以下:

以后,在走到for循环,此时 q == null, 因此,会把新添加的节点设置为 p 的下一个节点:

if (q == null) {
                if (p.casNext(null, newNode)) {
                    if (p != t) 
                        casTail(t, newNode);  
                    return true;
                }
            }

又由于此时,t 节点仍是指向第一个节点,p 指向第一个节点的next节点,t 和 p 不想等,因此执行 casTail(t, newNode);  新添加的节点设置为tail 节点,设置成功后,链表以下所示:

3.添加第三个元素,添加成功后,链表以下所示:

4.添加第四个节点,添加成功后,链表以下所示:

至关于在添加元素时,每循环两次才会更新一次 tail 节点。

为何不让 tail 节点永远为队列的尾节点,若是让 tail 节点永远为队列的尾节点,则实现的代码会更少且逻辑也会更清晰,这是由于,若是让 tail 永远为队列的尾节点,则每次都须要使用循环CAS更新tail节点,若是能减小更新 tail 节点的次数,入队的性能岂不是更高?因此说并非每次入队都须要更新尾节点,只有在tail节点和尾节点不想等的状况下才更新,这样能减小更新此时,提升效率。

获取元素

下面来看看获取元素的操做,ConcurrentLinkedQueue是一个FIFO的队列,因此获取元素的时候,老是获取到队列的第一个元素;获取元素有两个方法,poll() 和 peek(),poll()方法获取元素的时候,返回链表的第一个元素,并删除,而 peek() 方法获取元素的时候则不删除,下面看一下获取元素的主要代码逻辑:

public E poll() {
        // 循环跳转,goto语法
        restartFromHead:
        for (;;) {
            // p 表示要出队的节点,默认为 head节点
            for (Node<E> h = head, p = h, q;;) {
                // 出队的元素
                E item = p.item;
                // 若是出队的元素不为空,则把要出队的元素设置null,不更新head节点;若是出队元素为null或者cas设置失败,则表示有其余线程已经进行修改,则须要重写获取
                if (item != null && p.casItem(item, null)) {
                    if (p != h) // 当head元素为空,才会更新head节点,这里循环两次,更新一次head节点
                        updateHead(h, ((q = p.next) != null) ? q : p); // 更新head节点
                    return item;
                }
                // 队列为空,返回null
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
                // 把 p 的next节点赋值给p
                else
                    p = q;
            }
        }
    }

下面经过图的方式来查看获取节点的过程:

首先队列以下所示:

以后去获取节点 item0,首先进行标记:

for (Node<E> h = head, p = h, q;;) {
       E item = p.item;
    ..............................
}

标记以下:

由于此时 p.item 不为空,所示 CAS 设置 p.item 为null,走第一个if语句,又由于此时 h 节点的 item 不为空,即 p 和 h 相等,因此不会更新头节点,直接返回 p 节点中的元素 item0,以下图所示:

if (item != null && p.casItem(item, null)) {
                    // 不会执行
                    if (p != h) // 
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }

接下来再获取 item1,进行标记,以下所示:

由于 p.item 为空,因此不知足条件  if (item != null && p.casItem(item, null)), 以后执行  else if ((q = p.next) == null),此时,q指向了 item1,

又由于 q 不为空,全部不会更新head节点,以后会执行最后一个else语句:p = q,第一次循环结束,开始第二次循环,又进行标记,此时标记以下:

由于 p.item 不为空,因此走以下代码逻辑,经过 CAS 把 p.item 设置为null,

if (item != null && p.casItem(item, null)) {
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }

由于此时,head节点元素为空,即 p 和 h 节点不相等,因此会更新头节点,又由于 p.next 即 item2 不为空,因此把 p.next 即 item2 设置为 head节点,设置成功后,队列以下所示:

以后获取item2也是一样的逻辑,获取 items2 后队列以下:

获取 item3 后以下所示:

以上就是从队列中获取元素的主要代码逻辑,从上可知,head节点不必定就是队列的第一个含有元素的节点,也不是每次获取元素后就更新head节点,只有当head中的元素为空的时候才更新head节点,这和添加 offer() 方法中更新tail节点相似,减小 CAS 更新head节点的次数,出队的效率会更高。

isEmpty()方法

ConcurrentLinkedQueue 经过 isEmpty来判断队列是否为空,代码以下:

public boolean isEmpty() {
        return first() == null;
    }
Node<E> first() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                boolean hasItem = (p.item != null);
                if (hasItem || (q = p.next) == null) {
                    updateHead(h, p);
                    return hasItem ? p : null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

isEmpty 方法会判断链表的第一个元素是否为空来进行判断的。

size()方法

public int size() {
        int count = 0;
        // succ() 获取下一个元素
        for (Node<E> p = first(); p != null; p = succ(p))
            if (p.item != null)
                if (++count == Integer.MAX_VALUE)
                    break;
        return count;
    }

size()方法会遍历全部的链表来查看有多少个元素。

对于在开发的时候,若是须要判断是否为空,则应该使用 isEmpty 而不该该使用 size() > 0 的方式,由于 size()会变量整个链表,效率较低。

ConcurrentLinkedQueue 类还有其余的一些方法,只有理解了入队和出队的方法 offer() 和 poll() 方法,其余方法就很好理解了。

例子

/**
 * @ Author:lenovo
 * @ Date:Created in 下午 7:22 2018/7/17 0017
 */
public class ConcurrentLinkedQueueTest {

    public static void main(String[] args) throws InterruptedException {
        new ConcurrentLinkedQueueTest().testConcurrentLinkedQueue();
        Thread.sleep(5000L);
    }

    int num = 0;

    public ConcurrentLinkedQueue testConcurrentLinkedQueue(){
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        for(int i = 0; i < 100; i++) {
            new Thread(() -> {
                num++;
                queue.offer(num);
            }).start();
        }
        return queue;
    }
}
相关文章
相关标签/搜索