Lock锁源码分析java
HashMap源码分析-jdk1.6和jdk1.8的区别算法
ArrayList源码分析源码分析
ConcurrentLinkedQueue是一个线程安全的队列,它采用的是 CAS 算法来进行实现,也就是说它是非阻塞的;队列中的元素按照 FIFO(先进先出)的原则对元素进行排列,此外,它是一个无界队列;添加元素的时候,在链表的尾部进行添加,获取元素的时候,从链表的头部获取。它内部采用的单向链表的形式来表示,链表的节点是定义在ConcurrentLinkedQueue的一个内部类。性能
ConcurrentLinkedQueue 的类图以下所示:this
能够看到 ConcurrentLinkedQueue 实现了 Queue 接口和实现了继承了 AbstractQueue 类,而 Itr 和 Node则是它的一个内部类;spa
Queue 接口只是定义了一些队列的公共方法,以下:.net
public interface Queue<E> extends Collection<E> { // 添加元素 boolean add(E e); // 添加元素 boolean offer(E e); // 删除元素 E remove(); // 删除并返回第一个元素,若是队列为空,则返回 null E poll(); // 返回第一个元素,若是不存在,则抛出NoSuchElementException异常 E element(); // 返回第一个元素,但不删除,若是队列为空,则返回 null E peek(); }
AbstractQueue 类也继承了 Queue接口,提供了某些方法的实现,以下所示:线程
public abstract class AbstractQueue<E> extends AbstractCollection<E> implements Queue<E> { public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } public E remove() { E x = poll(); if (x != null) return x; else throw new NoSuchElementException(); } ............................... }
接下来看看 ConcurrentLinkedQueue 的一个实现过程:
首先看一下队列中链表节点的定义,链表中的节点使用一个 Node 内部类来表示:
private static class Node<E> { // 节点中的元素 volatile E item; // 下一个节点,没有上一个节点,表示它是一个单向链表的形式 volatile Node<E> next; // 构造一个节点 Node(E item) { UNSAFE.putObject(this, itemOffset, item); } // 使用 CAS 的方式设置节点的元素 boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } // 设置下一个节点 void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); } // 采用 CAS 的方式设置下一个节点 boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // Unsafe 类的一些初始化 }
能够看到 Node 类的定义比较简单,值得注意的地方是 E item 元素和 Node next 节点都使用了 volatile 来修饰,这说明了元素或某个节点被一个线程修改了以后,其余的线程是立马看到修改后的值的。
接下来看一下 ConcurrentLinkedQueue 类中的属性和方法:
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, java.io.Serializable { // 头节点, private transient volatile Node<E> head; // 尾节点,尾节点不必定是链表的最后一个节点 private transient volatile Node<E> tail; // 构造 public ConcurrentLinkedQueue() { head = tail = new Node<E>(null); } ...................... }
以上能够看到,头节点 head 和尾节点 tail 都被 volatile 修饰,节点被一个线程修改了以后,是会把修改的最新的值刷新到主内存中去,当其余线程去读取该值的时候,会中主内存中获取最新的值,也就是一个线程修改了以后,对其余线程是当即可见的。
当使用空的构造其是实例化一个对象的时候,会建立一个节点,节点的值为 null(添加的时候,是不能为null的),并把头节点和尾节点都指向该节点,以下所示:
以后向链表中添加元素,添加元素的时候,是在链表的尾部进行添加,添加元素有两个方法 add() 和 offer(),add() 会调用 offer() 进行添加,这两个方法永远都会返回 true,因此不要使用 true | false 来判断是否添加成功;
public boolean add(E e) { return offer(e); }
public boolean offer(E e) { // 判空,为空则抛出空指针异常 checkNotNull(e); // 建立要添加的节点 final Node<E> newNode = new Node<E>(e); // 无限循环,入队不成功,则反复入队 // t 表示 tail 节点 // p 表示链表的尾节点,默认等于 tail 节点 for (Node<E> t = tail, p = t;;) { // q 为尾节点的下一个节点 Node<E> q = p.next; // 若是尾节点的下一个节点为空,则表示 p 为尾节点 if (q == null) { // CAS 设置尾节点的下一个节点为新添加的节点,若是设置失败,在再次尝试 if (p.casNext(null, newNode)) { // 若是tail节点有大于等于1个的 next 节点,则更新 tail 节点,将新添加的节点设置为 tail 节点 if (p != t) // 至关于循环两次更新一次 tail 节点 casTail(t, newNode); // 新添加的节点设置为tail节点,容许失败,失败了表示有其余线程成功更新了tail节点 return true; } } else if (p == q) // 只有在尾节点和尾节点的下一个节点为空的状况下成立 p = (t != (t = tail)) ? t : head; else // 把 tail节点设置为为尾节点,再次循环设置下一个节点 p = (p != t && t != (t = tail)) ? t : q; } }
以上方法就是向队列中添加元素的方法,该方法也是该类中比较难理解的方法;
从上面的代码能够看出,入队主要作两件事情:第一是将新添加的节点设置成当前队列尾节点的下一个节点;第二是更新tail节点,若是tail节点的next节点不为空,则将入队节点设置成tail节点,若是tail节点的next节点为空,则将入队节点设置成tail的next节点,因此tail节点不老是尾节点。
接下来经过图的方法来看一下元素入队的一个过程:
初始时队列以下:
1. 添加第一个节点:
for循环:
for (Node<E> t = tail, p = t;;) { Node<E> q = p.next;
接下来进行 q==null 的判断:
if (q == null) { if (p.casNext(null, newNode)) { if (p != t) casTail(t, newNode); return true; } }
此时,q==null, 也就是说尾节点的下一个节点为空,此时把新添加的节点设置为尾节点的下一个节点,有由于,p节点和 t节点是相等的,因此不会更新 tail 节点;设置成功后,链表以下:
2.添加第二个节点:
if (q == null) { .... } else if (p == q) // p = q = null p = (t != (t = tail)) ? t : head; else p = (p != t && t != (t = tail)) ? t : q;
此时, q 不为空,有p和q不想等,因此只会走 else 分支,p != t 为false,t != tail 也为false,全部 把 q 赋给 p ,此时队列以下:
以后,在走到for循环,此时 q == null, 因此,会把新添加的节点设置为 p 的下一个节点:
if (q == null) { if (p.casNext(null, newNode)) { if (p != t) casTail(t, newNode); return true; } }
又由于此时,t 节点仍是指向第一个节点,p 指向第一个节点的next节点,t 和 p 不想等,因此执行 casTail(t, newNode); 新添加的节点设置为tail 节点,设置成功后,链表以下所示:
3.添加第三个元素,添加成功后,链表以下所示:
4.添加第四个节点,添加成功后,链表以下所示:
、
至关于在添加元素时,每循环两次才会更新一次 tail 节点。
为何不让 tail 节点永远为队列的尾节点,若是让 tail 节点永远为队列的尾节点,则实现的代码会更少且逻辑也会更清晰,这是由于,若是让 tail 永远为队列的尾节点,则每次都须要使用循环CAS更新tail节点,若是能减小更新 tail 节点的次数,入队的性能岂不是更高?因此说并非每次入队都须要更新尾节点,只有在tail节点和尾节点不想等的状况下才更新,这样能减小更新此时,提升效率。
下面来看看获取元素的操做,ConcurrentLinkedQueue是一个FIFO的队列,因此获取元素的时候,老是获取到队列的第一个元素;获取元素有两个方法,poll() 和 peek(),poll()方法获取元素的时候,返回链表的第一个元素,并删除,而 peek() 方法获取元素的时候则不删除,下面看一下获取元素的主要代码逻辑:
public E poll() { // 循环跳转,goto语法 restartFromHead: for (;;) { // p 表示要出队的节点,默认为 head节点 for (Node<E> h = head, p = h, q;;) { // 出队的元素 E item = p.item; // 若是出队的元素不为空,则把要出队的元素设置null,不更新head节点;若是出队元素为null或者cas设置失败,则表示有其余线程已经进行修改,则须要重写获取 if (item != null && p.casItem(item, null)) { if (p != h) // 当head元素为空,才会更新head节点,这里循环两次,更新一次head节点 updateHead(h, ((q = p.next) != null) ? q : p); // 更新head节点 return item; } // 队列为空,返回null else if ((q = p.next) == null) { updateHead(h, p); return null; } else if (p == q) continue restartFromHead; // 把 p 的next节点赋值给p else p = q; } } }
下面经过图的方式来查看获取节点的过程:
首先队列以下所示:
以后去获取节点 item0,首先进行标记:
for (Node<E> h = head, p = h, q;;) { E item = p.item; .............................. }
标记以下:
由于此时 p.item 不为空,所示 CAS 设置 p.item 为null,走第一个if语句,又由于此时 h 节点的 item 不为空,即 p 和 h 相等,因此不会更新头节点,直接返回 p 节点中的元素 item0,以下图所示:
if (item != null && p.casItem(item, null)) { // 不会执行 if (p != h) // updateHead(h, ((q = p.next) != null) ? q : p); return item; }
接下来再获取 item1,进行标记,以下所示:
由于 p.item 为空,因此不知足条件 if (item != null && p.casItem(item, null)), 以后执行 else if ((q = p.next) == null),此时,q指向了 item1,
又由于 q 不为空,全部不会更新head节点,以后会执行最后一个else语句:p = q,第一次循环结束,开始第二次循环,又进行标记,此时标记以下:
由于 p.item 不为空,因此走以下代码逻辑,经过 CAS 把 p.item 设置为null,
if (item != null && p.casItem(item, null)) { if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; }
由于此时,head节点元素为空,即 p 和 h 节点不相等,因此会更新头节点,又由于 p.next 即 item2 不为空,因此把 p.next 即 item2 设置为 head节点,设置成功后,队列以下所示:
以后获取item2也是一样的逻辑,获取 items2 后队列以下:
获取 item3 后以下所示:
以上就是从队列中获取元素的主要代码逻辑,从上可知,head节点不必定就是队列的第一个含有元素的节点,也不是每次获取元素后就更新head节点,只有当head中的元素为空的时候才更新head节点,这和添加 offer() 方法中更新tail节点相似,减小 CAS 更新head节点的次数,出队的效率会更高。
ConcurrentLinkedQueue 经过 isEmpty来判断队列是否为空,代码以下:
public boolean isEmpty() { return first() == null; }
Node<E> first() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { boolean hasItem = (p.item != null); if (hasItem || (q = p.next) == null) { updateHead(h, p); return hasItem ? p : null; } else if (p == q) continue restartFromHead; else p = q; } } }
isEmpty 方法会判断链表的第一个元素是否为空来进行判断的。
public int size() { int count = 0; // succ() 获取下一个元素 for (Node<E> p = first(); p != null; p = succ(p)) if (p.item != null) if (++count == Integer.MAX_VALUE) break; return count; }
size()方法会遍历全部的链表来查看有多少个元素。
对于在开发的时候,若是须要判断是否为空,则应该使用 isEmpty 而不该该使用 size() > 0 的方式,由于 size()会变量整个链表,效率较低。
ConcurrentLinkedQueue 类还有其余的一些方法,只有理解了入队和出队的方法 offer() 和 poll() 方法,其余方法就很好理解了。
/** * @ Author:lenovo * @ Date:Created in 下午 7:22 2018/7/17 0017 */ public class ConcurrentLinkedQueueTest { public static void main(String[] args) throws InterruptedException { new ConcurrentLinkedQueueTest().testConcurrentLinkedQueue(); Thread.sleep(5000L); } int num = 0; public ConcurrentLinkedQueue testConcurrentLinkedQueue(){ ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>(); for(int i = 0; i < 100; i++) { new Thread(() -> { num++; queue.offer(num); }).start(); } return queue; } }