在并发编程中,有时候须要使用线程安全的队列。若是要实现一个线程安全的队列有两种方式:一种是使用阻塞算法,另外一种是使用非阻塞算法。使用阻塞算法的队列能够用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不一样的锁)等方式来实现。非阻塞的实现方
式则可使用循环 CAS 的方式来实现。本节让咱们一块儿来研究一下 Doug Lea 是如何使用非阻塞的方式来实现线程安全队列 ConcurrentLinkedQueue 的,相信从大师身上咱们能学到很多并发编程的技巧。html
ConcurrentLinkedQueue
先进先出(FIFO)单向队列ConcurrentLinkedDeque
双向队列下面以 ConcurrentLinkedQueue 为例看使用非阻塞算法(CAS) 保证线程安全。java
ConcurrentLinkedQueue 由 head 节点和 tail 节点组成,每一个节点(Node)由节点元素(item)和
指向下一个节点(next)的引用组成,节点与节点之间就是经过这个 next 关联起来,从而组成一
张链表结构的队列。默认状况下 head 节点存储的元素为空,tail 节点等于 head 节点。head、tail 以及 Node.item、Node.next 都是 volatile 修辞。node
private transient volatile Node<E> head; private transient volatile Node<E> tail; private static class Node<E> { volatile E item; volatile Node<E> next; }
默认状况下 head、tail 都是空节点。算法
public ConcurrentLinkedQueue() { head = tail = new Node<E>(null); }
获取一个节点的后继节点编程
// 遇到哨兵节点,从 head 开始遍历 final Node<E> succ(Node<E> p) { Node<E> next = p.next; return (p == next) ? head : next; }
入队列就是将入队节点添加到队列的尾部。为了方便理解入队时队列的变化,以及 head 节点和 tail 节点的变化,每添加一个节点我就作了一个队列的快照图(注意这是单线程入队状况)。安全
第一步添加元素 e1。队列更新 head 节点的 next 节点为元素 e1 节点。又由于 tail 节点默认状况下等于 head 节点,因此它们的 next 节点都指向元素 e1 节点。
第二步添加元素 e2。队列首先设置元素 e1 节点的 next 节点为元素 e2 节点,而后更新 tail 节点指向元素 e2 节点。
第三步添加元素 e3,设置 tail 节点的next节点为元素 e3 节点。
第四步添加元素 e4,设置元素 e3 的 next 节点为元素 e4 节点,而后将 tail 节点指向元素 e4 节点。数据结构
经过 debug 入队过程并观察 head 节点和 tail 节点的变化,发现入队主要作两件事情,第一是将入队节点设置成当前队列尾节点的下一个节点。第二是更新 tail 节点,若是 tail 节点的 next 节点不为空,则将入队节点设置成 tail 节点,若是 tail 节点的 next 节点为空,则将入队节点设置成 tail 的 next 节点,因此 tail 节点不老是尾节点,理解这一点对于咱们研究源码会很是有帮助。并发
上面的分析让咱们从单线程入队的角度来理解入队过程,可是多个线程同时进行入队状况就变得更加复杂,由于可能会出现其余线程插队的状况。若是有一个线程正在入队,那么它必须先获取尾节点,而后设置尾节点的下一个节点为入队节点,但这时可能有另一个线程插队了,那么队列的尾节点就会发生变化,这时当前线程要暂停入队操做,而后从新获取尾节点。让咱们再经过源码来详细分析下它是如何使用CAS算法来入队的。源码分析
public boolean offer(E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e); for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; // 1. p is last node if (q == null) { // 1.1 经过自旋保证节点必定添加到数据链中 if (p.casNext(null, newNode)) { // 1.2 p表明当前结点,当前节点不是尾节点时更新 // 也就是说tail不必定是尾节点,尾节点为tail或tail.next // 更新失败了也不要紧,由于失败了表示有其余线程成功更新了tail节点 if (p != t) // hop two nodes at a time casTail(t, newNode); // Failure is OK. return true; } // Lost CAS race to another thread; re-read next } // 2. 遇到哨兵节点,从 head 开始遍历 // 可是若是 tail 被修改,则使用 tail(由于可能被修改正确了) else if (p == q) p = (t != (t = tail)) ? t : head; // 3. 尾节点只多是tail或tail.next。若是tail发生变化则直接从tail开始遍历 else // Check for tail updates after two hops. // 其实我认为这里一直取p.next节点遍历最终能够遍历到尾节点,能够没必要取从新tail // 可能从新取tail会遍历更快 p = (p != t && t != (t = tail)) ? t : q; } }
上面分析咱们知道真正的尾节点可能 tail 或 tail.next,doug lea 写的代码和逻辑仍是稍微有点复杂。那么可不可让 tail 永远指向尾节点呢?代码以下spa
public boolean offer(E e) { Node n = new Node(e); for (;;) { Node</e><e> t = tail; if (t.casNext(null, n) && casTail(t, n)) { return true; } } }
让 tail 节点永远做为队列的尾节点,这样实现代码量很是少,并且逻辑很是清楚和易懂。可是这么作有个缺点就是每次都须要使用循环 CAS 更新 tail 节点,若是能减小 CAS 更新 tail 节点的次数,就能提升入队的效率。
因此 doug lea 使用 hops 变量(JDK1.8没有直接使用hops,但逻辑没有改变)来控制并减小 tail 节点的更新频率,并非每次节点入队后都将 tail 节点更新成尾节点,而是当 tail 节点和尾节点的距离大于等于常量 HOPS 的值(默认等于1)时才更新 tail 节点,tail 和尾节点的距离越长使用 CAS 更新 tail 节点的次数就会越少,可是距离越长带来的负面效果就是每次入队时定位尾节点的时间就越长,由于循环体须要多循环一次来定位出尾节点,可是这样仍然能提升入队的效率,由于从本质上来看它经过增长对 volatile 变量的读操做来减小了对 volatile 变量的写操做,而对 volatile 变量的写操做开销要远远大于读操做,因此入队效率会有所提高。
// JDK1.7 代码直接使用 hops 来控制 public boolean offer(E e) { if (e == null) throw new NullPointerException(); Node<E> n = new Node<E>(e); retry: for (;;) { Node<E> t = tail; Node<E> p = t; for (int hops = 0; ; hops++) { // 得到p节点的下一个节点。 Node<E> next = succ(p); // next节点不为空,说明p不是尾节点,须要更新p后在将它指向next节点 if (next != null) { if (hops > HOPS && t != tail) continue retry; p = next; } else if (p.casNext(null, n)) { if (hops >= HOPS) casTail(t, n); // 更新tail节点,容许失败 return true; } else { p = succ(p); } } } }
出队列的就是从队列里返回一个节点元素,并清空该节点对元素的引用。让咱们经过每一个节点出队的快照来观察下head节点的变化。
出队的代码和入队差很少,也有 hop 的概念。出队了完成了两件事:一是将节点的 item 设置为 null;二是更新头节点并将头节点的 next 指向本身,也就是哨兵节点。
public E poll() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; // 1. 出队后 p.item 必定为 null if (item != null && p.casItem(item, null)) { if (p != h) // hop two nodes at a time // 更新头节点并将头节点的 next 指向本身。成为哨兵节点,等 GC 回收 // 一样容许失败,说明其它的线程更新了头节点 updateHead(h, ((q = p.next) != null) ? q : p); return item; // 2. 遍历到尾节点了,没有元素了 } else if ((q = p.next) == null) { updateHead(h, p); return null; // 3. 出现哨兵节点,说明有其它线程poll后更新了head,须要从新从head开始遍历 } else if (p == q) continue restartFromHead; // 4. 继续遍历 else p = q; } } }
// 能够看到 size 是一个很耗时的方法 public int size() { int count = 0; for (Node<E> p = first(); p != null; p = succ(p)) if (p.item != null) if (++count == Integer.MAX_VALUE) break; return count; }
public boolean isEmpty() { return first() == null; }
若是只判断集合中是否存在元素请使用 isEmpty
// 从 head 开始遍历找到第一个 item!=null 的元素 Node<E> first() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { boolean hasItem = (p.item != null); // 要么找到了 item!=null 的元素,要么遍历完整个链表 if (hasItem || (q = p.next) == null) { updateHead(h, p); return hasItem ? p : null; } else if (p == q) continue restartFromHead; else p = q; } } }
// 遇到哨兵节点,从 head 开始遍历 final Node<E> succ(Node<E> p) { Node<E> next = p.next; return (p == next) ? head : next; }
参考:
天天用心记录一点点。内容也许不重要,但习惯很重要!