java并发:非阻塞队列之ConcurrentLinkedQueue

初始ConcurrentLinkedQueue

ConcurrentLinkedQueue 是线程安全的无界非阻塞队列,其底层使用单向链表实现,对于入队和出队操做使用 CAS 来实现线程安全。node

其类图以下:算法

从类图能够发现其与其它阻塞队列的一个明显区别是,ConcurrentLinkedQueue没有实现BlockingQueue接口,因此ConcurrentLinkedQueue没有提供具备阻塞性质的put、take等方法。安全

 

其构造函数以下:app

    /**
     * Creates a {@code ConcurrentLinkedQueue} that is initially empty.
     */
    public ConcurrentLinkedQueue() {
        head = tail = new Node<E>();
    }

    /**
     * Creates a {@code ConcurrentLinkedQueue}
     * initially containing the elements of the given collection,
     * added in traversal order of the collection's iterator.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public ConcurrentLinkedQueue(Collection<? extends E> c) {
        Node<E> h = null, t = null;
        for (E e : c) {
            Node<E> newNode = new Node<E>(Objects.requireNonNull(e));
            if (h == null)
                h = t = newNode;
            else
                t.appendRelaxed(t = newNode);
        }
        if (h == null)
            h = t = new Node<E>();
        head = h;
        tail = t;
    }

解读:函数

ConcurrentLinkedQueue维护有两个 volatile 类型的 Node 节点分别用来存放队列的首、尾节点。ui

建立队列时头、尾节点指向一个 item 为 null 的哨兵节点。this

 

Node的定义以下:spa

    static final class Node<E> {
        volatile E item;
        volatile Node<E> next;

        /**
         * Constructs a node holding item.  Uses relaxed write because
         * item can only be seen after piggy-backing publication via CAS.
         */
        Node(E item) {
            ITEM.set(this, item);
        }

        /** Constructs a dead dummy node. */
        Node() {}

        void appendRelaxed(Node<E> next) {
            // assert next != null;
            // assert this.next == null;
            NEXT.set(this, next);
        }

        boolean casItem(E cmp, E val) {
            // assert item == cmp || item == null;
            // assert cmp != null;
            // assert val == null;
            return ITEM.compareAndSet(this, cmp, val);
        }
    }

解读:线程

在 Node 节点内部维护了一个使用 volatile 修饰的变量 item,用来存放节点的值;rest

next 用来存放链表的下一个节点,从而连接为一个单向无界链表。

 

添加元素的方法以下:

    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never return {@code false}.
     *
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        final Node<E> newNode = new Node<E>(Objects.requireNonNull(e));

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                if (NEXT.compareAndSet(p, null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (p != t) // hop two nodes at a time; failure is OK
                        TAIL.weakCompareAndSet(this, t, newNode);
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

解读:

offer操做是在队列末尾添加一个元素,若是传递的参数是 null,则抛出 NullPointerException 异常;不然因为 ConcurrentLinkedQueue 是无界队列,该方法将一直会返回 true(因为使用 CAS 无阻塞算法,该方法不会阻塞调用线程)。

队列一开始为空时的状态以下图:

 

想象一下,当单个或者多个线程操做队列可能发生的状况以理解上述代码中的各个分支。

 

小结:

offer操做中的关键步骤是经过 CAS 操做来控制某个时间只有一个线程能够追加元素到队列末尾。

CAS 竞争失败的线程会经过循环一次次尝试,直到 CAS 成功才会返回。

这里经过使用无限循环不断进行 CAS 尝试来代替阻塞算法挂起调用线程;相比阻塞算法,这是使用 CPU资源换取阻塞所带来的开销。

 

移除元素的方法以下:

    public E poll() {
        restartFromHead: for (;;) {
            for (Node<E> h = head, p = h, q;; p = q) {
                final E item;
                if ((item = p.item) != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
            }
        }
    }

解读:

poll 操做是在队列头部获取并移除一个元素,若是队列为空则返回 null。

 

Note:

ConcurrentLinkedQueue 须要遍历链表来获取 size,而不是使用原子变量(使用原子变量保存队列元素个数须要保证入队、出队操做是原子性操做)。

因为使用非阻塞 CAS 算法,没有加锁,因此在计算 size 时有可能进行了 offer、poll 或者 remove 操做,致使计算的元素个数不精确,因此在井发状况下 size 函数不是颇有用。