在并发编程中咱们有时候须要使用线程安全的队列。若是咱们要实现一个线程安全的队列有两种实现方式:一种是使用阻塞算法,另外一种是使用非阻塞算法。使用阻塞算法的队列能够用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不一样的锁)等方式来实现,而非阻塞的实现方式则可使用循环CAS的方式来实现,本文让咱们一块儿来研究下Doug Lea是如何使用非阻塞的方式来实现线程安全队列ConcurrentLinkedQueue的,相信从大师身上咱们能学到很多并发编程的技巧。html
ConcurrentLinkedQueue是一个基于连接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当咱们添加一个元素的时候,它会添加到队列的尾部,当咱们获取一个元素时,它会返回队列头部的元素。它采用了“wait-free”算法来实现,该算法在Michael & Scott算法上进行了一些修改, Michael & Scott算法的详细信息能够参见参考资料一。java
咱们经过ConcurrentLinkedQueue的类图来分析一下它的结构。算法
(图1)编程
ConcurrentLinkedQueue由head节点和tair节点组成,每一个节点(Node)由节点元素(item)和指向下一个节点的引用(next)组成,节点与节点之间就是经过这个next关联起来,从而组成一张链表结构的队列。默认状况下head节点存储的元素为空,tair节点等于head节点。安全
private transient volatile Node<E> tail = head;
入队列就是将入队节点添加到队列的尾部。为了方便理解入队时队列的变化,以及head节点和tair节点的变化,每添加一个节点我就作了一个队列的快照图。并发
(图二)spa
经过debug入队过程并观察head节点和tail节点的变化,发现入队主要作两件事情,第一是将入队节点设置成当前队列尾节点的下一个节点。第二是更新tail节点,若是tail节点的next节点不为空,则将入队节点设置成tail节点,若是tail节点的next节点为空,则将入队节点设置成tail的next节点,因此tail节点不老是尾节点,理解这一点对于咱们研究源码会很是有帮助。线程
上面的分析让咱们从单线程入队的角度来理解入队过程,可是多个线程同时进行入队状况就变得更加复杂,由于可能会出现其余线程插队的状况。若是有一个线程正在入队,那么它必须先获取尾节点,而后设置尾节点的下一个节点为入队节点,但这时可能有另一个线程插队了,那么队列的尾节点就会发生变化,这时当前线程要暂停入队操做,而后从新获取尾节点。让咱们再经过源码来详细分析下它是如何使用CAS算法来入队的。debug
public boolean offer(E e) { if (e == null) throw new NullPointerException(); //入队前,建立一个入队节点 Node<E> n = new Node<E>(e); retry: //死循环,入队不成功反复入队。 for (;;) { //建立一个指向tail节点的引用 Node<E> t = tail; //p用来表示队列的尾节点,默认状况下等于tail节点。 Node<E> p = t; for (int hops = 0; ; hops++) { //得到p节点的下一个节点。 Node<E> next = succ(p); //next节点不为空,说明p不是尾节点,须要更新p后在将它指向next节点 if (next != null) { //循环了两次及其以上,而且当前节点仍是不等于尾节点 if (hops > HOPS && t != tail) continue retry; p = next; } //若是p是尾节点,则设置p节点的next节点为入队节点。 else if (p.casNext(null, n)) { //若是tail节点有大于等于1个next节点,则将入队节点设置成tair节点,更新失败了也 不要紧,由于失败了表示有其余线程成功更新了tair节点。 if (hops >= HOPS) casTail(t, n); // 更新tail节点,容许失败 return true; } // p有next节点,表示p的next节点是尾节点,则从新设置p节点 else { p = succ(p); } } } }
从源代码角度来看整个入队过程主要作二件事情。第一是定位出尾节点,第二是使用CAS算法能将入队节点设置成尾节点的next节点,如不成功则重试。设计
第一步定位尾节点。tail节点并不老是尾节点,因此每次入队都必须先经过tail节点来找到尾节点,尾节点可能就是tail节点,也多是tail节点的next节点。代码中循环体中的第一个if就是判断tail是否有next节点,有则表示next节点多是尾节点。获取tail节点的next节点须要注意的是p节点等于p的next节点的状况,只有一种可能就是p节点和p的next节点都等于空,表示这个队列刚初始化,正准备添加第一次节点,因此须要返回head节点。获取p节点的next节点代码以下
final Node<E> succ(Node<E> p) { Node<E> next = p.getNext(); return (p == next) ? head : next; }
第二步设置入队节点为尾节点。p.casNext(null, n)方法用于将入队节点设置为当前队列尾节点的next节点,p若是是null表示p是当前队列的尾节点,若是不为null表示有其余线程更新了尾节点,则须要从新获取当前队列的尾节点。
hops的设计意图。上面分析过对于先进先出的队列入队所要作的事情就是将入队节点设置成尾节点,doug lea写的代码和逻辑仍是稍微有点复杂。那么我用如下方式来实现行不行?
public boolean offer(E e) { if (e == null) throw new NullPointerException(); Node<E> n = new Node<E>(e); for (;;) { Node<E> t = tail; if (t.casNext(null, n) && casTail(t, n)) { return true; } } }
让tail节点永远做为队列的尾节点,这样实现代码量很是少,并且逻辑很是清楚和易懂。可是这么作有个缺点就是每次都须要使用循环CAS更新tail节点。若是能减小CAS更新tail节点的次数,就能提升入队的效率,因此doug lea使用hops变量来控制并减小tail节点的更新频率,并非每次节点入队后都将 tail节点更新成尾节点,而是当 tail节点和尾节点的距离大于等于常量HOPS的值(默认等于1)时才更新tail节点,tail和尾节点的距离越长使用CAS更新tail节点的次数就会越少,可是距离越长带来的负面效果就是每次入队时定位尾节点的时间就越长,由于循环体须要多循环一次来定位出尾节点,可是这样仍然能提升入队的效率,由于从本质上来看它经过增长对volatile变量的读操做来减小了对volatile变量的写操做,而对volatile变量的写操做开销要远远大于读操做,因此入队效率会有所提高。
private static final int HOPS = 1;
还有一点须要注意的是入队方法永远返回true,因此不要经过返回值判断入队是否成功。
出队列的就是从队列里返回一个节点元素,并清空该节点对元素的引用。让咱们经过每一个节点出队的快照来观察下head节点的变化。
从上图可知,并非每次出队时都更新head节点,当head节点里有元素时,直接弹出head节点里的元素,而不会更新head节点。只有当head节点里没有元素时,出队操做才会更新head节点。这种作法也是经过hops变量来减小使用CAS更新head节点的消耗,从而提升出队效率。让咱们再经过源码来深刻分析下出队过程。
public E poll() { Node<E> h = head; // p表示头节点,须要出队的节点 Node<E> p = h; for (int hops = 0;; hops++) { // 获取p节点的元素 E item = p.getItem(); // 若是p节点的元素不为空,使用CAS设置p节点引用的元素为null,若是成功则返回p节点的元素。 if (item != null && p.casItem(item, null)) { if (hops >= HOPS) { //将p节点下一个节点设置成head节点 Node<E> q = p.getNext(); updateHead(h, (q != null) ? q : p); } return item; } // 若是头节点的元素为空或头节点发生了变化,这说明头节点已经被另一个线程修改了。那么获取p节点的下一个节点 Node<> next = succ(p); // 若是p的下一个节点也为空,说明这个队列已经空了 if (next == null) { // 更新头节点。 updateHead(h, p); break; } // 若是下一个元素不为空,则将头节点的下一个节点设置成头节点 p = next; } return null; }
首先获取头节点的元素,而后判断头节点元素是否为空,若是为空,表示另一个线程已经进行了一次出队操做将该节点的元素取走,若是不为空,则使用CAS的方式将头节点的引用设置成null,若是CAS成功,则直接返回头节点的元素,若是不成功,表示另一个线程已经进行了一次出队操做更新了head节点,致使元素发生了变化,须要从新获取头节点。