Java同步数据结构之ConcurrentLinkedQueue

前言

前面介绍的Queue都是经过Lock锁实现的阻塞队列,今天介绍一种非阻塞队列ConcurrentLinkedQueue,所谓非阻塞,其实就是经过CAS代替加锁来实现的高效的非阻塞队列。当许多线程共享对公共集合的访问时,ConcurrentLinkedQueue是一个合适的选择。与大多数其余并发集合实现同样,该类不容许使用空元素。node

ConcurrentLinkedQueue是一个基于链表的无界线程安全的先进先出队列。虽然前面介绍的队列也有基于链表的实现,例如LinkedBlockingQueue以及SynchronousQueue的公平队列实现,可是ConcurrentLinkedQueue的链表实现与它们有本质的差异,LinkedBlockingQueue的链表实现存在老是指向第一个节点的虚拟head节点,以及始终指向队列最后一个节点的tail节点,可是ConcurrentLinkedQueue的head与tail则更加灵活多变,ConcurrentLinkedQueue有以下一些基本约束特性数组

1.CAS入队的最后一个节点的next指向为null。
2.队列中的全部未删除节点是那些item不为null,而且都能从head节点访问到的节点,由于删除节点是经过CAS将其item引用置为null。迭代器会跳过那些item为null的节点。因此若是队列是空的,那么全部item固然都必须为空。
3.head并不老是指向队列的第一个元素,tail也并不老是指向队列的最后一个节点。安全

针对ConcurrentLinkedQueue的headtail节点,有以下一些特性:多线程

  不变性 可变性
head

1.全部未删除的节点均可以从head节点经过succ()方法访问到并发

2.head不会为nullapp

3.head节点的next不会指向自身异步

1.head的item可能为null,也可能不为nullide

2.容许tail滞后于head,即容许从head经过succ()不能访问到tail。高并发

tail

1.最后一个节点老是能够从tail经过succ()方法访问到优化

2.tail不会为null

1.tail的item可能为null,也可能不为null

2.容许tail滞后于head,即容许从head经过succ()不能访问到tail。

3.tail节点的next能够指向自身,也能够不指向自身。

源码解析

进行源码解析以前,先看看ConcurrentLinkedQueue定义的内部节点类Node:

 1 private static class Node<E> {  2     volatile E item;  3     volatile Node<E> next;  4 
 5     /**
 6  * Constructs a new node. Uses relaxed write because item can  7  * only be seen after publication via casNext.  8      */
 9  Node(E item) { 10         UNSAFE.putObject(this, itemOffset, item); 11  } 12 
13     boolean casItem(E cmp, E val) { 14         return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); 15  } 16 
17     void lazySetNext(Node<E> val) { 18         UNSAFE.putOrderedObject(this, nextOffset, val); 19  } 20 
21     boolean casNext(Node<E> cmp, Node<E> val) { 22         return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); 23  } 24 
25     // Unsafe mechanics
26 
27     private static final sun.misc.Unsafe UNSAFE; 28     private static final long itemOffset; 29     private static final long nextOffset; 30 
31     static { 32         try { 33             UNSAFE = sun.misc.Unsafe.getUnsafe(); 34             Class<?> k = Node.class; 35             itemOffset = UNSAFE.objectFieldOffset 36                 (k.getDeclaredField("item")); 37             nextOffset = UNSAFE.objectFieldOffset 38                 (k.getDeclaredField("next")); 39         } catch (Exception e) { 40             throw new Error(e); 41  } 42  } 43 }
Node节点内部类

Node节点内部类很简单,只包含节点数据item以及指向下一个节点的next引用,和其它一些辅助方法,这里不表,接下来看构造方法吧。

构造方法

 1 private transient volatile Node<E> head;  2 
 3 private transient volatile Node<E> tail;  4 
 5 /**
 6  * 构造一个空队列  7  */
 8 public ConcurrentLinkedQueue() {  9     head = tail = new Node<E>(null); 10 } 11 
12 /**
13  * 构造一个包含给定集合元素的ConcurrentLinkedQueue实例 14  */
15 public ConcurrentLinkedQueue(Collection<? extends E> c) { 16     Node<E> h = null, t = null; 17     for (E e : c) { //遍历
18  checkNotNull(e); 19         Node<E> newNode = new Node<E>(e); 20         if (h == null) 21             h = t = newNode; 22         else { 23             //这里使用延迟赋值是由于后面对head、tail的写是volatile写因此能够保证可见性 24             //延迟赋值采用putOrderedObject方法只关注不被重排序便可。
25  t.lazySetNext(newNode); 26             t = newNode; 27  } 28  } 29     if (h == null) 30         h = t = new Node<E>(null); 31     head = h; 32     tail = t; 33 }
View Code

构造方法也很简单,当构造空队列的实例时,其head于tail都指向同一个item为null的虚拟节点;当以给定集合构造实例时,head指向第一个元素,tail指向最后一个元素。

入队offer

ConcurrentLinkedQueue的入队操做add、offer都是经过offer方法实现的,它将指定的元素插入到队列的最后一个元素后面,下面看看其源码:

 1 /**
 2  * 将指定的元素插入到此队列的末尾。  3  * 由于队列是无界的,因此这个方法永远不会返回false。  4  *  5  * 若是指定的元素为null,抛出NullPointerException  6  */
 7 public boolean offer(E e) {  8  checkNotNull(e);  9     final Node<E> newNode = new Node<E>(e); 10 
11     for (Node<E> t = tail, p = t;;) {//死循环,直到成功
12         Node<E> q = p.next; 13         //第一个if块
14         if (q == null) { //p是最后一个节点,尝试加入到队列最后一个元素后面
15             if (p.casNext(null, newNode)) { //CAS 尝试将新节点挂载到最后一个节点的next 16                 // Successful CAS is the linearization point 17                 // for e to become an element of this queue, 18                 // and for newNode to become "live".
19                 if (p != t) //最后一个节点距离tail大于1个节点时,更新tail指向最后一个节点
20                     casTail(t, newNode);  // Failure is OK. 为何容许失败。由于tail可能被其它操做抢先设置好了
21                 return true; 22             } //CAS失败,从新循环尝试。 23             // Lost CAS race to another thread; re-read next
24         }//第二个if块
25         else if (p == q) //到这里说明p节点已经被移除了。 26             // We have fallen off list. If tail is unchanged, it 27             // will also be off-list, in which case we need to 28             // jump to head, from which all live nodes are always 29             // reachable. Else the new tail is a better bet. 30             //tail节点已经被更新就取新的tail,不然取head,从新循环寻找最后一个节点尝试。
31             p = (t != (t = tail)) ? t : head; 32         else //第三个if块 33             //到这里说明tail并非指向的队列中最后一个元素。 34             
35             //这时候若是tail已经被其它操做更新了则取最新的tail,不然取p的next节点继续循环尝试。 36             // Check for tail updates after two hops.
37             p = (p != t && t != (t = tail)) ? t : q; 38  } 39 }
View Code

由于队列的无界的,因此操做在一个死循环中尝试,直到成功返回true。入队操做主要分三种状况(三个if块):

1. 找到了队列的最后一个元素(其next指向null),尝试将新节点加到其next,成功返回失败继续循环尝试。

2. 当前节点被移除了,须要从新定位起点进行循环找到最后一个元素,定位到哪儿取决于操做期间tail是否改变,tail改变则重新tail开始,不然只能从head开始。

3. 当前节点不是最后一个元素也没有被移除,这时候若是tail改变了则重新tail开始,不然直接从当前节点的next节点开始继续循环查找最后一个元素。

值得注意的是,在将新元素节点插入到最后一个节点以后,若是当前tail指向的节点距离队列最后一个节点超过一个节点时须要更新tail使其指向队列最后一个节点,不然不对tail作更新。因而可知,tail要么指向队列最后一个节点,要么指向队列的倒数第二个节点。若是发现有节点next指向自身则会认为是被移除的节点,从而摒弃它从新定位起点寻找最后一个节点。

下面以入队操做的图例来讲明这个队列的变化过程,首先假设队列最开始是一个空队列,那么其head、tail都指向一个item为null的虚拟节点,以下所示:

offer("A")

按照入队的逻辑分析过程为:tail指向的虚拟节点p的next为null,即q==p.next==null,因此直接执行第一个if块:p.casNext(null, newNode)将新节点挂载到虚拟节点p的next,成功以后因为tail指向的虚拟节点p==t,因此不会执行casTail(t, newNode)来更新tail指向刚才入队的节点,结束返回true,这时候队列的状态以下:

如上图所示,在入队第一个元素以后,head和tail亦然指向的当初的虚拟节点,虚拟节点指向刚才入队的节点A,这时候节点A就是队列的最后一个节点,tail并无指向它。

offer("B")

接着咱们再入队一个元素B,按照入队逻辑分析过程为:tail指向的虚拟节点p的next为A不为空,因此第一个if块不成立,而且q=p.next=A !=p,因此第二个if块也不成立,进入第三个if块执行p = (p != t && t != (t = tail)) ? t : q,假设此时tail并无被其它线程更新,因此三目运算的条件不成立,p=q即p==A,进入第二轮循环。第二轮循环此时p指向节点A,其next为null,因此第一个if块成立,将新节点B插入A节点的next,这时候由于p指向节点A,t即tail依然指向虚拟节点1.因此p != t 执行casTail(t, newNode)将更新tail执行新入队的节点B,结束返回true,这时候队列的状态以下:

如上图所示,在节点B入队以后,head依然指向当初的虚拟节点,但tail节点被更新指向到了最后一个节点,因此ConcurrentLinkedQueue会保证tail节点与队列最后一个节点直接的距离不超过一个节点,当超过期会更新tail指向最后一个节点,依次类推,加入咱们再执行offer("C"),那么队列将变成这样:

tail的这种并非每次都须要更新的策略能够提升程序的并发度,尽可能减小没必要要的CAS操做,可是程序的复杂度不言而喻变得更复杂难懂。

出队poll()

 1 public E poll() {  2  restartFromHead:  3     for (;;) {  4         for (Node<E> h = head, p = h, q;;) {  5             E item = p.item;  6             
 7             //第一个if块,p.item 不为null,则将item 设置为null
 8             if (item != null && p.casItem(item, null)) {  9                 //出队的p节点不是head,若是p后面还有节点更新head指向p.next,不然指向p,原head指向自身
10                 if (p != h) // hop two nodes at a time
11                     updateHead(h, ((q = p.next) != null) ? q : p); 12                 return item; 13             }//第二个if块,到这里说明p节点item为null或者已经被其它线程抢先拿走(casItem失败), 14             // p.next == null 说明被其它线程抢先拿走的p是队列中最后一个有效节点,如今队列已经空了
15             else if ((q = p.next) == null) { 16                 updateHead(h, p); //更新head指向p,将原来的head的next指向自身
17                 return null; 18             }//第三个if块,到这里说明p已经被其它线程抢先拿走了,须要从新开始循环
19             else if (p == q) 20                 continue restartFromHead; 21             else//第四个if块,到这里说明p是一个虚拟节点,而且队列不为空,继续看它的下一个节点q
22                 p = q; 23  } 24  } 25 }
View Code

入队逻辑主要在tail节点处作处理,而出队固然是从头部出队,因此在head节点处作处理,出队的逻辑也是由包含4个if块的死循环构成:

1. 从head处出发发现第一个item不为空的节点,则CAS更新item为null,若是出队的节点不是指向head就须要将head指向新的节点(队列不为空指向下一个节点,为空指向出队的节点),原head的next指向自身。

2. 发现队列为空了,更新head指向最后一个无效节点(item,next都为null),原head的next指向自身。

3. 节点抢先被其它线程拿走了,则从新从head处开始寻找第一个有效节点(item不为空)

4. 若是当前节点是一个虚拟节点(好比第一次)而且队列不为空,那么指向下一个节点继续尝试。

因为队列实际是以一个节点实例包装实际数据存储在队列中的,出队时只须要拿到实际的节点数据而不关心其依附的节点,因此出队时其依附的节点并不必定会被移除,而仅仅是将其item置空了,下面以实际的操做来观察队列的变化。

第一次poll()

以上面包含A,B,C三个节点的队列为例,执行第一次poll操做:因为head指向的是一个虚拟节点,其item为null因此第一个if块不成立,队列不为空第二个if也不成立,虚拟节点的next为节点A不为空第三个if块也不成立,因此进入第四个if块p指向虚拟节点的next即节点A,进入下一次循环,节点A的item不为空,执行第一个if块的p.casItem(item, null),假设成功,此时head指向虚拟节点,而p指向的节点A,而且A节点的next为节点B不为空,因此执行updateHead(h, 节点B),因此head会指向节点B,原来的head即虚拟节点1会指向自身,而节点A除了其item被置为null没有任何变化,最终队列以下:

由上图看出,poll拿走A以后,节点A并无被移除而仅仅是其item被置空,而是原head指向的虚拟节点1把next指向了自身从而脱离了队列,新的head指向了节点B.

第二次poll()

若是再次poll:此时head指向节点B,其item不为空,第一个if块成立,执行第一个if块的p.casItem(item, null),假设成功,此时head指向节点B,因此p != h不成立直接返回B,此时队列状态以下:

此时因为head指向的节点B便是被出队的节点数据,因此head并不会被更新,head这种相似tail同样并非每次都更新的策略也同样可以减小CAS的次数提升并发度。

第三次poll()

此时head指向item为null的节点B,因此同第一次poll同样,前三个if块都不成立,第四个if块将p定位到下一个节点C,第二次循环开始,第一个if块知足,执行p.casItem(item, null)将节点C的item置为null,因为此时p指向节点C,head指向节点B,因此p !=h成立,而且节点C的next为null,因此执行updateHead(h, 节点C);最终head指向节点C,节点B指向自身,队列状态以下:

上图证实了ConcurrentLinkedQueue容许tail滞后于head,即容许从head经过succ()不能访问到tail这一特性,以及tail节点的next能够指向自身的特性。

offer("D")

在三次poll以后,队列其实已经空了,而且根据上图head指向C,tail指向的B,若是这时候咱们入队元素D,会怎么样?此时tail指向节点C,节点C的next为null,知足第一个if块,执行p.casNext(null, newNode)将节点D挂载到节点C以后,而且由于tail指向的节点B,因此p!=t执行casTail(t, newNode)将tail指向了节点D.此时队列的状态以下:

此时head指向节点C,tail指向节点D.左边的链表部分已经脱离了队列变得无心义。

offer("E")

若是再次入队一个元素E,此时tail指向节点D,节点D的next为null,因此知足第一个if块,执行p.casNext(null, newNode)将新节点E插入到节点D后面,而且p即节点D == t即tail,因此不会更新tail,直接返回true,最后队列以下:

其它辅助方法

获取但不移除peek()

 1 public E peek() {  2  restartFromHead:  3     for (;;) {  4         for (Node<E> h = head, p = h, q;;) {  5             E item = p.item;  6             //item不为空,或者队列为空
 7             if (item != null || (q = p.next) == null) {  8                 updateHead(h, p); //更新head指向p,原head指向自身
 9                 return item; //返回
10             }//到这里说明当前节点item为空而且队列不为空,若是当前节点p已经被移除,从新循环
11             else if (p == q) 12                 continue restartFromHead; 13             else //到这里说明当前节点是一个item为空的节点,而且队列不为空那么循环其下一个节点
14                 p = q; 15  } 16  } 17 }
View Code

 peek虽然仅仅是获取但不移除节点,可是也会在返回去更新head,以上面offer("A")以后的队列为例,此时执行peek:head指向虚拟节点1,其item为null,其next指向节点A不为null,第一个if条件不成立,假设此时没有其它线程将虚拟节点移除,天然第二个if块也不成立,到第三个if块使p指向节点A,继续下一轮循环,此时p指向的节点A其item不为null,第一个if条件成立,执行updateHead(h, p),将head指向节点A,原来的head指向自身,最后返回“A”,最终队列的状态以下:

此时从head不可访问tail,若是此时继续执行peek,那么head指向节点A,其item不为null,第一个if条件成立,执行updateHead(h, p);因为updateHead里面存在 if (h != p && casHead(h, p))这样的判断,此时h == p因此并不会执行更新head的操做。当队列为空时,peek将返回null。

Node<E> first()

另外一个first方法和peek方法其实逻辑彻底同样,不一样的是first返回的是包装数据的节点,而peek返回的是节点包装的数据,这里就不作分析了。

final Node<E> succ(Node<E> p) 

若是p节点的next指向自身则返回head,不然返回p.next。

public int size()

返回此队列中的元素数量,若是超过了Integer.MAX_VALUE,返回Integer.MAX_VALUE,因为队列的异步性,此方法返回结果并不必定正确,或者说仅仅是一个舜态值。

public boolean contains(Object o)

若是此队列包含指定的元素,则返回true。更正式地说,当且仅当此队列包含至少一个元素e,使得o.equals(e)时返回true。注意比较的是真正的数据而不是节点哦。

public boolean remove(Object o)

从队列中移除一个与指定元素数据相等的实例,即便队列中存在多个相等的也仅仅只移除一个,注意比较的是节点数据。remove方法会使对应节点的item被置为null,而且使该节点的前一个节点的next指向被移除节点的下一个节点,即修改next跳过了这个被置空了item的节点。

public boolean addAll(Collection<? extends E> c)

将指定集合中的全部元素按照指定集合的迭代器返回的顺序追加到此队列的末尾。试图将队列添加到自身会致使IllegalArgumentException即c == this。该方法先把指定集合C中的元素构形成一个链表,最后再把这个链表的head连接到当前ConcurrentLinkedQueue队列的尾部,而后更新tail指向新的尾节点。

public Object[] toArray()

按适当的顺序返回包含此队列中全部非空元素(item不为null)的数组。该方法会分配一个新数组存储队列中每一个节点的item引用,因此调用者能够随意修改返回的数组并不会对原队列产生任何影响。此方法充当数组和集合之间的桥梁API。

public <T> T[] toArray(T[] a)

toArray()方法不一样在于它会尝试将队列中的item数据的类型转换成指定的类型并存储在指定的数组中,若是类型匹配而且指定的数组容量足够的话。不然将按照指定数组的运行时类型和该队列的大小分配一个新数组用于存储队列中item不为null的元素。数组中紧跟在队列末尾的元素将会被设置成null,所以调用者能够经过判断数组中出现的第一个null来判断队列元素已经结束。这种方法容许对输出数组的运行时类型进行精确控制,在某些状况下,还能够用来节省分配成本。toArray(new Object[0])在形式上与toArray()是彻底相同的。

 

该类不能保证批量操做addAll、removeAll、retainAll、containsAll、equals和toArray的原子性。例如,与addAll操做并发操做的迭代器可能只会看到一部分添加的元素。

内存一致性影响:
与其余并发集合同样, 一个线程先将一个对象放入ConcurrentLinkedQueue的动做 happen-before 后来的线程从ConcurrentLinkedQueue中执行访问或者删除该元素的操做。

迭代器

ConcurrentLinkedQueue的迭代器在建立实例的时候就已经拿到了第一个节点以及节点item数据,每一次执行next的时候又准备好下一次迭代的返回对象,同ArrayBlockingQueue同样,它也有一个lastRet变量用来暂时存储当前迭代的节点,用于在it.next调用完成以后,调用it.remove()时避免删除不该该删除的元素。

 1 public Iterator<E> iterator() {  2     return new Itr();  3 }  4 
 5 private class Itr implements Iterator<E> {  6 
 7     /**
 8  * 下一次调用next时对应的节点  9      */
10     private Node<E> nextNode; 11 
12     /**
13  * nextItem引用item对象,由于一旦咱们声明hasNext()中存在一个元素,咱们必须在下一个next()调用中返回它,即便它在hasNext()过程当中被删除。 14      */
15     private E nextItem; 16 
17     /**
18  * 上一次next返回的item对应的节点,用于remove() 19      */
20     private Node<E> lastRet; 21 
22  Itr() { 23  advance(); 24  } 25 
26     /**
27  * 移动到下一个有效节点并返回调用next()时的返回值,若是没有,则返回null。 28      */
29     private E advance() { 30         lastRet = nextNode; 31         E x = nextItem; 32 
33         Node<E> pred, p; 34         if (nextNode == null) { 35             p = first(); 36             pred = null; 37         } else { 38             pred = nextNode; 39             p = succ(nextNode); 40  } 41 
42         for (;;) { 43             if (p == null) { 44                 nextNode = null; 45                 nextItem = null; 46                 return x; 47  } 48             E item = p.item; 49             if (item != null) { 50                 nextNode = p; 51                 nextItem = item; 52                 return x; 53             } else { 54                 // 跳过item为null的节点
55                 Node<E> next = succ(p); 56                 if (pred != null && next != null) 57                     pred.casNext(p, next);//辅助断开无效节点
58                 p = next; 59  } 60  } 61     }
View Code

 

在建立迭代器实例的时候执行了一次advance()方法,准备好了第一个有效节点nextNode,以及其item引用,hasNext直接判断nextNode不为空便可,保证了迭代器的弱一致性,一旦hasNext返回true,那么调用next必定会获得相对应的item,即便在者之间该节点item已经被置为空了。

 1 public boolean hasNext() {  2     return nextNode != null; //检测的nextNode节点
 3 }  4 
 5 public E next() {  6     if (nextNode == null) throw new NoSuchElementException();  7     return advance(); //仍是调用的advance();
 8 }  9 
10 public void remove() { 11     Node<E> l = lastRet; 12     if (l == null) throw new IllegalStateException(); 13     // rely on a future traversal to relink.
14     l.item = null; //将item置为null
15     lastRet = null; 16 }
View Code

 

next方法仍是调用的advance()方法,remove方法借助了lastRet来将item置为null,因为直接操做的队列中的节点,因此迭代器的remove会真正的将队列中的节点item置为空,从而影响ConcurrentLinkedQueue队列自己。

可拆分迭代器Spliterator

ConcurrentLinkedQueue实现了本身的可拆分迭代器CLQSpliterator,从spliterator方法就能够看到:

public Spliterator<E> spliterator() { return new CLQSpliterator<E>(this); }

可拆分迭代器的 tryAdvance、forEachRemaining、trySplit方法都是非阻塞的,tryAdvance获取第一个item不为空的节点数据作指定的操做,forEachRemaining循环遍历当前迭代器中全部没有被移除的节点数据(item不为空)作指定的操做源码都很简单,就不贴代码了,至于它的拆分方法trySplit,其实和LinkedBlockingQueue的拆分方式是同样的,代码都几乎一致,它不是像ArrayBlockingQueue那样每次分一半,而是第一次只拆一个元素,第二次拆2个,第三次拆三个,依次内推,拆分的次数越多,拆分出的新迭代器分的得元素越多,直到一个很大的数MAX_BATCH(33554432) ,后面的迭代器每次都分到这么多的元素,拆分的实现逻辑很简单,每一次拆分结束都记录下拆分到哪一个元素,下一次拆分从上次结束的位置继续往下拆分,直到没有元素可拆分了返回null。

总结

ConcurrentLinkedQueue不少时候都是与LinkedBlockingQueue相对应的,ConcurrentLinkedQueue使用CAS实现了非阻塞的队列操做,而不是像LinkedBlockingQueue那样的双锁实现,ConcurrentLinkedQueue虽然也有head、tail节点的概念,可是不一样于LinkedBlockingQueue,ConcurrentLinkedQueue的head并非老是指向第一个节点,tail也不必定老是指向最后一个节点,只有当当前指针距离第一个/最后一个节点有两个或更多步时,才将更新head/tail,这种减小CAS次数的设计是一种优化,总的来讲它比起LinkedBlockingQueue来讲,ConcurrentLinkedQueue更多的使用与多线程共享访问同一个集合这种场景。

相关文章
相关标签/搜索