ConcurrentLinkedQueue
并发安全的链表队列,主要适用于多线程环境中;底层数据结构为链表,因为队列自己频繁的出队和进队,那么这个线程安全是如何保障html
从命名能够基本推测底层数据结构应该是链表,结合源码看下具体的链表节点java
private static class Node<E> { volatile E item; volatile Node<E> next; Node(E item) { UNSAFE.putObject(this, itemOffset, item); } boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); } boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try { // jdk内用于保障原子操做的辅助类 UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Node.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } } // 链表头,注意 volatile 声明 private transient volatile Node<E> head; // 链表尾,注意 volatile 声明 private transient volatile Node<E> tail;
从定义能够须要注意如下几点node
volatile
,禁止指令重排和修改对其余线程及时可见,保障线程安全的基本前提之一sun.misc.Unsafe UNSAFE
JDK内部大量使用的一个辅助类,用于保障基本的cas操做(其原理尚没有研究,后续在并发篇中详细探究下)按照常见的线程安全保障机制,通常处理方案是对进队和出队操做进行加锁,保障同一时刻只能有一个线程对队列进行写操做安全
然而队列不一样于Map,List, 出队和进队是比较频繁的操做,即队列会出现频繁的修改,若是加锁,性能无异会受到严重的影响数据结构
所以线程安全保障不是经过加锁来实现的多线程
offer
经过源码分析,不加锁如何实现线程安全并发
public boolean offer(E e) { // 队列中不能塞null checkNotNull(e); final Node<E> newNode = new Node<E>(e); // 死循环,确保入队必定成功 for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; if (q == null) { // p 为最后一个节点,尝试入队 if (p.casNext(null, newNode)) { // 将tail的next指向newNode // 入队成功 if (p != t) // hop two nodes at a time casTail(t, newNode); // Failure is OK. return true; } // Lost CAS race to another thread; re-read next } else if (p == q) // 多线程环境下,若此时另外一个线程执行了出队操做,且此时p出队了 // 那么在poll方法中的updateHead方法会将head指向当前的q,而把p.next指向本身,即:p.next == p // 这个时候就会形成tail在head的前面,须要从新设置p // 若是tail已经改变,将p指向tail,但这个时候tail依然可能在head前面 // 若是tail没有改变,直接将p指向head p = (t != (t = tail)) ? t : head; else // tail已经不是最后一个节点,将p指向最后一个节点 p = (p != t && t != (t = tail)) ? t : q; } }
上面的实现虽然很短,在单线程环境下很好理解,就是获取队列尾,而后将队列尾的next指向新的节点,并更新tail便可 (即代码中if条件命中的逻辑),源码分析
涉及到多线程进行并发的出队进队时,逻辑就没这么简单了,下面进行多线程的场景区分,辅助理解性能
case1: 另外一个线程也进行入队操做,且优先完成入队操做this
下面图解进行示意
case2: 另外一个线程执行出队操做,且节点p正好出队了
下面图解示意
updateHead
逻辑,可能出现 p.next = p
的场景q==p
, 而此时指向的tail节点不会为null(由于指向的是内存中原队列的Node节点,真实存在),所以进入 else if
逻辑poll
public E poll() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; if (item != null && p.casItem(item, null)) { // Successful CAS is the linearization point // for item to be removed from this queue. if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } else if ((q = p.next) == null) { updateHead(h, p); return null; } else if (p == q) continue restartFromHead; else p = q; } } }
出队操做,原理和入队操做差很少,都是经过非锁机制实现,经过CAS确保出队和入队自己的原子性;而为了保证多线程的并发修改安全,在死循环中进行了各类场景的兼容
单独拿出size
方法,由于与常见的容器不一样,ConcurrentLinkedQueue
的size()
方法是非并发安全,且每次都会进行扫描整个链表,结果以下
public int size() { int count = 0; for (Node<E> p = first(); p != null; p = succ(p)) if (p.item != null) // Collection.size() spec says to max out if (++count == Integer.MAX_VALUE) break; return count; }
size()
方法非线程安全,且时间复杂度为 O(n)
isEmpty()
进行替代