JDK容器学习之Queue:ConcurrentLinkedQueue

并发安全的链表队列 ConcurrentLinkedQueue

并发安全的链表队列,主要适用于多线程环境中;底层数据结构为链表,因为队列自己频繁的出队和进队,那么这个线程安全是如何保障html

I. 数据结构

从命名能够基本推测底层数据结构应该是链表,结合源码看下具体的链表节点java

private static class Node<E> {
    volatile E item;
    volatile Node<E> next;

    Node(E item) {
        UNSAFE.putObject(this, itemOffset, item);
    }

    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;

    static {
        try {
            // jdk内用于保障原子操做的辅助类
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

// 链表头,注意 volatile 声明
private transient volatile Node<E> head;

// 链表尾,注意 volatile 声明
private transient volatile Node<E> tail;

从定义能够须要注意如下几点node

  1. Node内部只有next节点,所以底层存储为单向链表
  2. head和tail声明为volatile,禁止指令重排和修改对其余线程及时可见,保障线程安全的基本前提之一
  3. sun.misc.Unsafe UNSAFE JDK内部大量使用的一个辅助类,用于保障基本的cas操做(其原理尚没有研究,后续在并发篇中详细探究下)

II. 线程安全保障

按照常见的线程安全保障机制,通常处理方案是对进队和出队操做进行加锁,保障同一时刻只能有一个线程对队列进行写操做安全

然而队列不一样于Map,List, 出队和进队是比较频繁的操做,即队列会出现频繁的修改,若是加锁,性能无异会受到严重的影响数据结构

所以线程安全保障不是经过加锁来实现的多线程

1. 进队 offer

经过源码分析,不加锁如何实现线程安全并发

public boolean offer(E e) {
    // 队列中不能塞null
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    // 死循环,确保入队必定成功
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) { // p 为最后一个节点,尝试入队
            if (p.casNext(null, newNode)) { // 将tail的next指向newNode
                // 入队成功
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q)
            // 多线程环境下,若此时另外一个线程执行了出队操做,且此时p出队了
            // 那么在poll方法中的updateHead方法会将head指向当前的q,而把p.next指向本身,即:p.next == p
            // 这个时候就会形成tail在head的前面,须要从新设置p
            // 若是tail已经改变,将p指向tail,但这个时候tail依然可能在head前面
            // 若是tail没有改变,直接将p指向head
            p = (t != (t = tail)) ? t : head;
        else
            // tail已经不是最后一个节点,将p指向最后一个节点
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

上面的实现虽然很短,在单线程环境下很好理解,就是获取队列尾,而后将队列尾的next指向新的节点,并更新tail便可 (即代码中if条件命中的逻辑)源码分析

涉及到多线程进行并发的出队进队时,逻辑就没这么简单了,下面进行多线程的场景区分,辅助理解性能

case1: 另外一个线程也进行入队操做,且优先完成入队操做this

下面图解进行示意

  • 因为线程B完成入队,致使q指向新的队尾
  • q != null, q != p
  • 进入else逻辑,执行将p指向最后一个节点,再次循环

conOffer

case2: 另外一个线程执行出队操做,且节点p正好出队了

下面图解示意

  • 完成初始化以后,p,t指向tail
  • 另外一个线程执行出队操做,正好将p出队了,由于出队中会执行updateHead逻辑,可能出现 p.next = p的场景
  • 此时进行q的赋值,获得结果 q==p, 而此时指向的tail节点不会为null(由于指向的是内存中原队列的Node节点,真实存在),所以进入 else if逻辑

conOffer2

2. 出队 poll

public E poll() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;

            if (item != null && p.casItem(item, null)) {
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

出队操做,原理和入队操做差很少,都是经过非锁机制实现,经过CAS确保出队和入队自己的原子性;而为了保证多线程的并发修改安全,在死循环中进行了各类场景的兼容

3. 队列个数获取

单独拿出size方法,由于与常见的容器不一样,ConcurrentLinkedQueuesize()方法是非并发安全,且每次都会进行扫描整个链表,结果以下

public int size() {
    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p))
        if (p.item != null)
            // Collection.size() spec says to max out
            if (++count == Integer.MAX_VALUE)
                break;
    return count;
}

III. 应用场景&小结

  1. 底层存储结构为单向链表
  2. 出队、入队都是非加锁操做,经过CAS保证出队和入队的原子性;为保证并发安全,出队和入队放在死循环中进行,对不一样的并发场景进行兼容
  3. size()方法非线程安全,且时间复杂度为 O(n)
  4. 判断队列是否为空,请用 isEmpty() 进行替代
  5. 由于未加锁,出队入队的性能相对较好,切代码的实现比较优雅;然实际的业务场景中,非大神级人物尽可能不要这么玩,维护成本过高

扫描关注,java分享

QrCode

参考

相关文章
相关标签/搜索