在以前的文章中,已经对ArrayBlockingQueue、LinkedBlockingQueue这两个比较经常使用的阻塞队列作了源码分析,咱们知道其内部都是经过ReentrantLock来保证数据读写的线程安全,经过Condition来完成线程等待和唤醒,只不过ArrayBlockingQueue在读写时使用了一把锁所完成,而LinkedBlockingQueue对于读和写分别使用了两把锁来进行处理,从而达到读写分离的效果。算法
然而,经过锁机制来实现一个线程安全的队列,在并发不是特别高的状况下并非很是合适,由于在大多数状况下都只有几个线程同时访问,而每次执行都须要去加一次锁,从而致使线程进行上下文切换,影响总体性能。所以,JDK还为咱们提供了一个无锁线程安全的队列——ConcurrentLinkedQueue,其底层使用CAS来实现无阻塞的并发控制。本文将该队列的实现机制和源码作一个分析,让咱们共同看看Doug Lea大神是如何巧妙地经过无锁机制来实现一个线程安全的队列。编程
首先让咱们看看JDK文档对该类的描述:安全
ConcurrentLinkedQueue的API描述.png多线程
ConcurrentLinkedQueue是一个基于链表、无界、线程安全的队列。这个队列将元素按照先进先出的顺序进行存储。队列的头节点是在队列中存在时间最久的节点,队列的尾节点是在队列中存在时间最短的节点。新的元素会被插入到队列的尾部,而队列的元素的获取操做则会从队列的头部去获取元素。ConcurrentLinekedQueue适合做为多个线程共享访问的集合。与大多数并发集合的实现相似,该类也不容许添加null元素,该类的实现使用了一个高效的无锁算法,其算法的是基于podc-1996.pdf所改进的。并发
除了上面所描述的基本特性以外,ConcurrentLinkedQueue中还有一些其余的特色:工具
因为是基于链表的实现方式,与其余的并发队列相似,都会在内部定义一个节点类,ConcurrentLinedQueu亦是如此,首先咱们看一下节点的定义:源码分析
//该节点是一个静态内部类,所以其只能做用于该队列内部 private static class Node<E> { /* * 当前节点存储的元素,注意到这里使用volatile关键字来对节点进行 * 修饰,其目的是在并发读的时候保证内存的可见性 */ volatile E item; //当前节点的下一个节点 volatile Node<E> next; /** * 构建新的节点,这里没有使用volatile的方式来对节点的元素值进行设置,而是使用普通的写方式 * 由于对于一个新增的节点,只有在其被成功插入到队列尾部才对外可见,所以在这里没有对数据可见性的强制要求 */ Node(E item) { UNSAFE.putObject(this, itemOffset, item); } /* * 经过Unsafe来完成对当前节点元素的CAS操做 */ boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } /* * 使用普通的方式来设置当前节点的下一个节点 */ void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); } /* * 经过Unsafe来完成对当前节点的下一个节点的CAS操做 */ boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = Node.class; //经过Unsafe来获取一个Node节点的item属性在内存中相对该对象的位置偏移量 itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); //经过Unsafe来获取一个Node节点的next节点属性在内存中相对该对象的位置偏移量 nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
上面节点的定义与以前分析的SynchronousQueue中的内部定义的节点很是相似,这里再也不过多阐述。性能
队列的属性定义this
为了提升快速查找队列中第一个节点和最后一个节点,所以ConcurrentLinkedQueue中分别定义了一个head节点和tail节点来快速定位。spa
/** * 不变性: * - 队列中全部未删除的节点均可以经过head节点的succ方法查找到 * - head节点必定不可能等于null * - (tmp = head).next != tmp,即head的next不能指向本身。 * * 可变性: * - head的item可能为null,也可能不为null * - tail节点可能会滞后于head节点,所以从head节点未必必定能够找到tail节点 * */ private transient volatile Node<E> head; /** * 不变性: * - 节点中的最后一个元素老是能够经过tail的succ方法来获取 * - tail节点不等于null * * 可变性: * - head的item可能为null,也可能不为null * - tail 节点的next可能指向本身,也可能不指向本身 * - tail节点可能会滞后于head节点,所以从head节点未必必定能够找到tail节点 */ private transient volatile Node<E> tail; public ConcurrentLinkedQueue() { head = tail = new Node<E>(null); }
经过上面的描述咱们知道了该队列是经过一个头节点和一个尾节点,而后将中间连接节点之间两两相链接构成一个队列,下面让咱们分析一下ConcurrentLinkedQueue的内部的具体实现。(在这里须要说明一下,因为其节点是彻底地基于Unsafe来完成CAS的操做,若是你对该内容还不是很熟悉的话,能够参考个人深刻分析Java中的原子操做这篇文章,里面对原子操做有一个比较细致的描述。
在分析源码以前,咱们先经过几张图来讲明一下ConcurrentLinkedQueue的入队列的总体过程。在了解完总体的过程后,再结合源码去分析细节会更加容易理解:
队列初始化
队列初始化状态
在队列刚建立时,head和tail同时指向空节点,咱们也称其为dummy
节点。
dummy节点的说明
添加元素a
添加元素a
向队列中添加元素a,此时只是新节点追加到第一个节点的后面,可是tail节点并未发生改变。
添加元素b
添加元素b
向队列中添加元素b,此时新节点与原先的tail节点之间的距离大于1,所以tail节点在这个时候会更新,真正的指向了最后一个节点
添加元素c
添加元素c
因为新节点与tail节点的距离没有大于1,所以此时tail节点一样不会发生更新。
添加元素d
添加元素d
经过上面的图示,咱们能够看到ConcurrentLinkedQueue在入队列过程当中很是明显的一个特色就是tail指针不是实时更新的,即tail节点可能会滞后于队列中真正的最后一个节点,只有当最后的一个节点与tail节点以前的距离大于1时才会更新,而这样设计的目的就是为了减小避免每增长一个节点,tail节点都须要去执行一次CAS操做的状况发生。
public boolean offer(E e) { // 因为元素不容许为null,所以对元素作一个检查 checkNotNull(e); // 生成待插入的节点 final Node<E> newNode = new Node<E>(e); for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; // 当q等于null时,则p就是队列中的最后一个元素 if (q == null) { // 执行cas操做,若是执行成功,则newNode成为队列的最后一个元素,但它未必是tail指向的元素 if (p.casNext(null, newNode)) { // 经过判断p!=t,从而肯定当前新节点与tail指向的节点之间的距离是否大于1 // 若是大于1,则须要更新tail指针 if (p != t) casTail(t, newNode); return true; } } // 若是p==q,则意味着当前p节点已经被从队列中移除(若是单纯从入队列看是看不出来的,后面结合出队列再回头分析) else if (p == q) /* 判断在执行过程当中tail是否发生变化,若是未发生变化,则tail也已经脱落队列 * 由于 p = t = tail,而p已经脱离队列,从而推断出tail也脱离了队列 * 那么此时只能从head开始,从新查找队列的最后一个元素 * 若是tail发生了变化,则直接从当前队列的tail开始查找队列的最后一个元素 */ p = (t != (t = tail)) ? t : head; else /** * 因为p节点的next不为null,而且p节点并未从队列中删除,所以须要继续查找队列的最后一个节点 * 判断执行过程当中tail节点是否发生了变化 * 若是发生了变化,则让p执行当前的tail,不然就让p直接指向它的next节点q */ p = (p != t && t != (t = tail)) ? t : q; } }
可能会有很多朋友对上面t != (t = tail)
的处理感到疑惑,疑惑的缘由可能会以为一个变量本身和本身比较,那不是必定为true嘛,怎么还会出现等于false的可能呢。为了理解这个问题,咱们一块儿看一下下面的这段代码:
public class VarCompareTest { static volatile int b = 2; public static void main(String[] args) { int a = b; int c = a != (a = b) ? 5 : 4; System.out.println(a); System.out.println(c); } }
这段代码的也用到了上面相似的t != (t = tail)
,可是在编译完成后,咱们经过IDEA自己的反编译工具来查看一下对应的Class文件的结果(目前因为对字节码指令不熟悉,所以不从那个角度去解读):
反编译Class文件后的结果
经过上面的代码,咱们能够看到,一开始a的值就等于b的值,其值为2;紧接着a的值首先经过一个局部变量记录,而后再将b的值赋值给a,因为b可能存在多线程修改的可能,此时b的值可能被其余线程改为了3,所以a的值也会变为3,最后再拿2与3进行比较,即var10000 != b
进行比较,就会存在等于false的状况发生了。对于t != (t = tail)
的状况也是如此,相信经过这个例子说明你们应该明白其中的原理了!
在分析出队列源码以前,咱们也结合上面的图来看一下出队列的总体过程:
队列当前状态
这里假设队列是基于上面入队列以后的状态进行的。
移除第1个节点
移除第一个节点
本来指向head的节点,此时的next指向了本身,所以它从队列中真正的移除。存储a的节点,其item置为了null,而且head节点发生变动,真正指向了队列中第一个有效(真正存储数据)的节点。
移除第2个节点
移除第2个节点
此时只是仅仅将节点的item设置为了null,可是head节点没有发生变动,这样作的目的也是为了减小一次CAS操做,它会等到下一次才去变动。
移除第3个节点
移除第3个节点
移除第4个节点
移除第4个节点
看完上图的分析,相信你们对ConcurrentLinkedQueue出队列的操做应该有一个直观的理解了,下面咱们看下源码的具体实现:
public E poll() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; // 若是元素的item不为null,则说明p节点是当前队列中的第一个未被删除的节点 // 此时也说明head指向的节点确实是队列中的第一个元素 // 经过CAS操做,将item设置为null,来标记其已经被删除 if (item != null && p.casItem(item, null)) { // 判断p节点与head指向的节点是不是同一个,若是不是则须要将head节点向前移动 if (p != h) updateHead(h, ((q = p.next) != null) ? q : p); return item; } // 节点的next为null,则说明队列为空,只有一个dummy节点 else if ((q = p.next) == null) { // 尝试将dummy节点p设置为新的head updateHead(h, p); return null; } // 若是p节点被删除,只能从队列的head从头开始再次查找 else if (p == q) // 跳回到最外层的循环,从新执行一次Node<E> h = head, p = h, q;操做 continue restartFromHead; else // 因为head指向的节点其item为null,即head指向的节点不是一个有效节点,所以继续经过head的next继续查找 p = q; } } } final void updateHead(Node<E> h, Node<E> p) { // 判断新设置的节点与原来的head节点是不是同一个,若是不是则将新节点设置为新的head节点, // 而且将原来的head节点的next指向本身。 if (h != p && casHead(h, p)) h.lazySetNext(h); }
通常来讲,head节点都在队列的左边,而tail节点在队列的右边。然而,在ConcurrentLinkedQueue中,可能存在tail节点在左边,而head节点却跑到了右边的状况,这种场景咱们将其称为tail lag behind head
。下面咱们分析一下在什么样的场景下会发生这样的状况,这对于真正理解ConcurrentLinkedQueue很是重要!
添加一个元素
从上图能够看到,在添加完一个元素后,tail节点并有发生改变。此时,假设咱们去获取队列中的元素,队列的结构就变成以下的样子。
取出队列元素
从上面的结构咱们能够看到,此时tail节点滞后于head节点,而且咱们此时经过head节点也没法查找到tail节点,由于该节点已经从队列中移除。当下一次添加元素的时候,就会出现tail节点本身指向本身的状况,此时就须要从新获取到head,将新增的元素追加到head后面。
至此,ConcurrentLinkedQueue的实现咱们已经分析完成了。该类的核心设计就在于CAS的无阻塞以及head/tail节点的延迟更新。尽可能咱们在实际的开发中基本不会去实现一个如此复杂的队列,可是经过分析一个经典无阻塞队列,能够更加好地帮助咱们理解并发编程。若是存在分析不对的地方,还望大神指出。
做者:码农一枚 连接:https://www.jianshu.com/p/32d6526494fd 來源:简书 著做权归做者全部。商业转载请联系做者得到受权,非商业转载请注明出处。