本文首发于一世流云专栏: https://segmentfault.com/blog...
在开始讲ConcurrentLinkedDeque
以前,咱们先来了解下Deque这种数据结构,咱们知道Queue是一种具备FIFO特色的数据结构,元素只能在队首进行“入队”操做,在队尾进行“出队”操做。java
而Deque(double-ended queue)是一种双端队列,也就是说能够在任意一端进行“入队”,也能够在任意一端进行“出队”:node
Deque的数据结构示意图以下:算法
咱们再来看下JDK中Queue和Deque这两种数据结构的接口定义,看看Deque和Queue相比有哪些加强:segmentfault
Queue的接口很是简单,一共只有三种类型的操做:入队、出队、读取。api
上述方法,能够划分以下:安全
操做类型 | 抛出异常 | 返回特殊值 |
---|---|---|
入队 | add(e) | offer(e) |
出队 | remove() | poll() |
读取 | element() | peek() |
每种操做类型,都给出了两种方法,区别就是其中一种操做在队列的状态不知足某些要求时,会抛出异常;另外一种,则直接返回特殊值(如null)。数据结构
Queue接口的全部方法Deque都具有,只不过队首/队尾均可以进行“出队”和“入队”操做:并发
操做类型 | 抛出异常 | 返回特殊值 |
---|---|---|
队首入队 | addFirst(e) | offerFirst(e) |
队首出队 | removeFirst() | pollFirst() |
队首读取 | getFirst() | peekFirst() |
队尾入队 | addLast(e) | offerLast(e) |
队尾出队 | removeLast() | pollLast() |
队尾读取 | getLast() | peekLast() |
除此以外,Deque还能够看成“栈”来使用,咱们知道“栈”是一种具备“LIFO”特色的数据结构(关于栈,能够参考个人这篇博文:栈),Deque提供了push
、pop
、peek
这三个栈方法,通常实现这三个方法时,能够利用已有方法,即有以下映射关系:oracle
栈方法 | Deque方法 |
---|---|
push | addFirst(e) |
pop | removeFirst() |
peek | peekFirst() |
关于Deque接口的更多细节,读者能够参考Oracle的官方文档:https://docs.oracle.com/javas...工具
ConcurrentLinkedDeque
是JDK1.7时,J.U.C包引入的一个集合工具类。在JDK1.7以前,除了Stack类外,并无其它适合并发环境的“栈”数据结构。ConcurrentLinkedDeque做为双端队列,能够看成“栈”来使用,而且高效地支持并发环境。
ConcurrentLinkedDeque和ConcurrentLinkedQueue同样,采用了无锁算法,底层基于自旋+CAS的方式实现。
咱们先来看下ConcurrentLinkedDeque的内部结构:
public class ConcurrentLinkedDeque<E> extends AbstractCollection<E> implements Deque<E>, java.io.Serializable { /** * 头指针 */ private transient volatile Node<E> head; /** * 尾指针 */ private transient volatile Node<E> tail; private static final Node<Object> PREV_TERMINATOR, NEXT_TERMINATOR; // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; private static final long tailOffset; static { PREV_TERMINATOR = new Node<Object>(); PREV_TERMINATOR.next = PREV_TERMINATOR; NEXT_TERMINATOR = new Node<Object>(); NEXT_TERMINATOR.prev = NEXT_TERMINATOR; try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = ConcurrentLinkedDeque.class; headOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("head")); tailOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("tail")); } catch (Exception e) { throw new Error(e); } } /** * 双链表结点定义 */ static final class Node<E> { volatile Node<E> prev; // 前驱指针 volatile E item; // 结点值 volatile Node<E> next; // 后驱指针 Node() { } Node(E item) { UNSAFE.putObject(this, itemOffset, item); } boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); } boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } void lazySetPrev(Node<E> val) { UNSAFE.putOrderedObject(this, prevOffset, val); } boolean casPrev(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, prevOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long prevOffset; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Node.class; prevOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("prev")); itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } } // ... }
能够看到,ConcurrentLinkedDeque的内部和ConcurrentLinkedQueue相似,不过是一个双链表结构,每入队一个元素就是插入一个Node类型的结点。字段head
指向队列头,tail
指向队列尾,经过Unsafe来CAS操做字段值以及Node对象的字段值。
须要特别注意的是ConcurrentLinkedDeque包含两个特殊字段:PREV_TERMINATOR、NEXT_TERMINATOR。
这两个字段初始时都指向一个值为null的空结点,这两个字段在结点删除时使用,后面会详细介绍:
ConcurrentLinkedDeque包含两种构造器:
/** * 空构造器. */ public ConcurrentLinkedDeque() { head = tail = new Node<E>(null); }
/** * 从已有集合,构造队列 */ public ConcurrentLinkedDeque(Collection<? extends E> c) { Node<E> h = null, t = null; for (E e : c) { checkNotNull(e); Node<E> newNode = new Node<E>(e); if (h == null) h = t = newNode; else { // 在队尾插入元素 t.lazySetNext(newNode); newNode.lazySetPrev(t); t = newNode; } } initHeadTail(h, t); }
咱们重点看下空构造器,经过空构造器创建的ConcurrentLinkedDeque对象,其head和tail指针并不是指向null,而是指向一个item值为null的Node结点——哨兵结点,以下图:
双端队列与普通队列的入队区别是:双端队列既能够在“队尾”插入元素,也能够在“队首”插入元素。ConcurrentLinkedDeque的入队方法有不少:addFirst(e)
、addLast(e)
、offerFirst(e)
、offerLast(e)
:
public void addFirst(E e) { linkFirst(e); } public void addLast(E e) { linkLast(e); } public boolean offerFirst(E e) { linkFirst(e); return true; } public boolean offerLast(E e) { linkLast(e); return true; }
能够看到,队首“入队”其实就是调用了linkFirst(e)
方法,而队尾“入队”是调用了 linkLast(e)方法。咱们先来看下队首“入队”——linkFirst(e):
/** * 在队首插入一个元素. */ private void linkFirst(E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e); // 建立待插入的结点 restartFromHead: for (; ; ) for (Node<E> h = head, p = h, q; ; ) { if ((q = p.prev) != null && (q = (p = q).prev) != null) // Check for head updates every other hop. // If p == q, we are sure to follow head instead. p = (h != (h = head)) ? h : q; else if (p.next == p) // PREV_TERMINATOR continue restartFromHead; else { // p is first node newNode.lazySetNext(p); // CAS piggyback if (p.casPrev(null, newNode)) { // Successful CAS is the linearization point // for e to become an element of this deque, // and for newNode to become "live". if (p != h) // hop two nodes at a time casHead(h, newNode); // Failure is OK. return; } // Lost CAS race to another thread; re-read prev } } }
为了便于理解,咱们以示例来看:假设有两个线程ThreadA和ThreadB同时进行入队操做。
①ThreadA先单独入队一个元素9
此时,ThreadA会执行CASE3分支:
else { // CASE3: p是队首结点 newNode.lazySetNext(p); // “新结点”的next指向队首结点 if (p.casPrev(null, newNode)) { // 队首结点的prev指针指向“新结点” if (p != h) // hop two nodes at a time casHead(h, newNode); // Failure is OK. return; } // 执行到此处说明CAS操做失败,有其它线程也在队首插入元素 }
队列的结构以下:
②ThreadA入队一个元素2,同时ThreadB入队一个元素10
此时,依然执行CASE3分支,咱们假设ThreadA操做成功,ThreadB操做失败:
else { // CASE3: p是队首结点 newNode.lazySetNext(p); // “新结点”的next指向队首结点 if (p.casPrev(null, newNode)) { // 队首结点的prev指针指向“新结点” if (p != h) // hop two nodes at a time casHead(h, newNode); // Failure is OK. return; } // 执行到此处说明CAS操做失败,有其它线程也在队首插入元素 }
ThreadA的CAS操做成功后,会进入如下判断:
if (p != h) // hop two nodes at a time casHead(h, newNode); // Failure is OK.
上述判断的做用就是重置head头指针,能够看到,ConcurrentLinkedDeque实际上是以每次跳2个结点的方式移动指针,这主要考虑到并发环境以这种hop跳的方式能够提高效率。
此时队列的机构以下:
注意,此时ThreadB的p.casPrev(null, newNode)
操做失败了,因此会进入下一次自旋,在下一次自旋中继续进入CASE3。若是ThreadA的casHead操做没有完成,ThreadB就进入了下一次自旋,则会进入分支1,重置指针p指向队首。最终队列结构以下:
在队尾插入元素和队首相似,再也不赘述,读者能够本身阅读源码。
ConcurrentLinkedDeque的出队同样分为队首、队尾两种状况:removeFirst()
、pollFirst()
、removeLast()
、pollLast()
。
public E removeFirst() { return screenNullResult(pollFirst()); } public E removeLast() { return screenNullResult(pollLast()); } public E pollFirst() { for (Node<E> p = first(); p != null; p = succ(p)) { E item = p.item; if (item != null && p.casItem(item, null)) { unlink(p); return item; } } return null; } public E pollLast() { for (Node<E> p = last(); p != null; p = pred(p)) { E item = p.item; if (item != null && p.casItem(item, null)) { unlink(p); return item; } } return null; }
能够看到,两个remove方法其实内部都调用了对应的poll方法,咱们重点看下队尾的“出队”——pollLast方法:
public E pollLast() { for (Node<E> p = last(); p != null; p = pred(p)) { E item = p.item; if (item != null && p.casItem(item, null)) { unlink(p); return item; } } return null; }
last方法用于寻找队尾结点,即知足p.next == null && p.prev != p
的结点:
Node<E> last() { restartFromTail: for (; ; ) for (Node<E> t = tail, p = t, q; ; ) { if ((q = p.next) != null && (q = (p = q).next) != null) // Check for tail updates every other hop. // If p == q, we are sure to follow tail instead. p = (t != (t = tail)) ? t : q; else if (p == t // It is possible that p is NEXT_TERMINATOR, // but if so, the CAS is guaranteed to fail. || casTail(t, p)) return p; else continue restartFromTail; } }
pred方法用于寻找当前结点的前驱结点(若是前驱是自身,则返回队尾结点):
final Node<E> pred(Node<E> p) { Node<E> q = p.prev; return (p == q) ? last() : q; }
unlink方法断开结点的连接:
/** * Unlinks non-null node x. */ void unlink(Node<E> x) { // assert x != null; // assert x.item == null; // assert x != PREV_TERMINATOR; // assert x != NEXT_TERMINATOR; final Node<E> prev = x.prev; final Node<E> next = x.next; if (prev == null) { unlinkFirst(x, next); } else if (next == null) { unlinkLast(x, prev); } else { Node<E> activePred, activeSucc; boolean isFirst, isLast; int hops = 1; // Find active predecessor for (Node<E> p = prev; ; ++hops) { if (p.item != null) { activePred = p; isFirst = false; break; } Node<E> q = p.prev; if (q == null) { if (p.next == p) return; activePred = p; isFirst = true; break; } else if (p == q) return; else p = q; } // Find active successor for (Node<E> p = next; ; ++hops) { if (p.item != null) { activeSucc = p; isLast = false; break; } Node<E> q = p.next; if (q == null) { if (p.prev == p) return; activeSucc = p; isLast = true; break; } else if (p == q) return; else p = q; } // TODO: better HOP heuristics if (hops < HOPS // always squeeze out interior deleted nodes && (isFirst | isLast)) return; // Squeeze out deleted nodes between activePred and // activeSucc, including x. skipDeletedSuccessors(activePred); skipDeletedPredecessors(activeSucc); // Try to gc-unlink, if possible if ((isFirst | isLast) && // Recheck expected state of predecessor and successor (activePred.next == activeSucc) && (activeSucc.prev == activePred) && (isFirst ? activePred.prev == null : activePred.item != null) && (isLast ? activeSucc.next == null : activeSucc.item != null)) { updateHead(); // Ensure x is not reachable from head updateTail(); // Ensure x is not reachable from tail // Finally, actually gc-unlink x.lazySetPrev(isFirst ? prevTerminator() : x); x.lazySetNext(isLast ? nextTerminator() : x); } } }
ConcurrentLinkedDeque相比ConcurrentLinkedQueue,功能更丰富,可是因为底层结构是双链表,且彻底采用CAS+自旋的无锁算法保证线程安全性,因此须要考虑各类并发状况,源码比ConcurrentLinkedQueue更加难懂,留待有精力做进一步分析。
ConcurrentLinkedDeque使用了自旋+CAS的非阻塞算法来保证线程并发访问时的数据一致性。因为队列自己是一种双链表结构,因此虽然算法看起来很简单,但其实须要考虑各类并发的状况,实现复杂度较高,而且ConcurrentLinkedDeque不具有实时的数据一致性,实际运用中,若是须要一种线程安全的栈结构,可使用ConcurrentLinkedDeque。
另外,关于ConcurrentLinkedDeque还有如下须要注意的几点: