【Interview】深刻理解ConcurrentLinkedQueue源码

概述

  • ConcurrentLinkedQueue是一个基于连接节点的无边界的线程安全队列,它采用先进先出原则对元素进行排序,插入元素放入队列尾部,出队时从队列头部返回元素,利用CAS方式实现的
  • ConcurrentLinkedQueue的结构由头节点和尾节点组成的,都是使用volatile修饰的。每一个节点由节点元素item和指向下一个节点的next引用组成,组成一张链表结构。
  • ConcurrentLinkedQueue继承自AbstractQueue类,实现Queue接口

经常使用方法

  • boolean add(E e) 将指定元素插入此队列的尾部,当队列满时,抛出异常
  • boolean contains(Object o) 判断队列是否包含次元素
  • boolean isEmpty() 判断队列是否为空
  • boolean offer(E e) 将元素插入队列尾部,当队列满时返回false
  • E peek() 获取队列头部元素但不删除
  • E poll() 获取队列头部元素,并删除
  • boolean remove(Object o) 从队列中移指定元素
  • int size() 返回此队列中的元素数量,须要遍历一遍集合。判断队列是否为空时,不推荐此方法

源码分析

// 头节点
 private transient volatile Node<E> head;
 //尾节点
 private transient volatile Node<E> tail;
 
 public ConcurrentLinkedQueue() {
 		//初始化构造时 头结点等于尾结点
        head = tail = new Node<E>(null);
 }
 //建立一个最初包含给定 collection 元素的 ConcurrentLinkedQueue,按照此 collection 迭代器的遍历顺序来添加元素。
 public ConcurrentLinkedQueue(Collection<? extends E> c) {
        Node<E> h = null, t = null;
        for (E e : c) {
            checkNotNull(e);
            Node<E> newNode = new Node<E>(e);
            if (h == null)
                h = t = newNode;
            else {
                t.lazySetNext(newNode);
                t = newNode;
            }
        }
        if (h == null)
            h = t = new Node<E>(null);
        head = h;
        tail = t;
    }
    
  private static class Node<E> {
    volatile E item;
    volatile Node<E> next;
    //构造一个新节点
   Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
    }
    boolean casItem(E cmp, E val) {
           return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
     }
     void lazySetNext(Node<E> val) {
     UNSAFE.putOrderedObject(this, nextOffset, val);
}
    boolean casNext(Node<E> cmp, Node<E> val) {
    return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
  }
 private static final sun.misc.Unsafe UNSAFE;
 //当前结点的偏移量
 private static final long itemOffset;
 //下一个结点的偏移量
 private static final long nextOffset;
     static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = Node.class;
                itemOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

复制代码

offer入队操做

初始化

  • 初始化操做就是建立一个新结点,而且headtail相等,结点的数据域为空。
    初始化操做
  • 当第一次入队操做时,检查插入的值是否为空,为空则抛空指针,而后用当前的值建立一个新Node结点。而后执行死循环开始入队操做
    死循环入队操做
  • 首先定义了两个指针p和t,都指向tail
  • 而后定义q结点存储p的next指向的结点,此时p的next是为空没有结点的
    q指向p的next
  • 此时q==null 条件成立。执行p.casNext(null, newNode).以cas方式把p的下一个节点指向新建立出来的结点,而后往下执行,p=t 直接返回true。此时初始化构造的结点的next指向第一次入队成功的结点
    第一次入队成功结构
  • 第二次入队操做,首先也是非空检查,而后建立一个新结点。此时死循环入队操做。定义了两个指针p和t,都指向tail。定义q结点存储p的next指向的结点,此时p的next是不为空的,指向了上面建立的结点。因此q==null不成立。执行else操做

第二次入队q==null不成立

  • 此时p也不等于qp!=t不成立,p和t都是指向tail。由于不成立因此让p=q,此时p和q都是指向第二个结点。再次循循环操做。
    image
  • 而后再次p和t,都指向tail。定义q结点存储p的next指向的结点。此时p的next指向仍是空,因此q=null成立。执行p.casNext(null, newNode).以cas方式把pnext指向新建立出来的结点。
    image
  • 此时if (p != t)是成立的 执行casTail(t, newNode); 指望值是t,更新值新建立的结点。因而更新了tail结点移动到最后添加的结点

image
大概的入队流程就是这样重复上述操做。直到入队成功。 tail结点并非每次都是尾结点。因此每次入队都要经过 tail定位尾结点。

public boolean offer(E e) {
    	//检查结点是为null,若是插入null则抛出空指针
        checkNotNull(e);
        //构造一个新结点
        final Node<E> newNode = new Node<E>(e);
        //死循换,一直到入队成功
        for (Node<E> t = tail, p = t;;) {
        	//p表示队列尾结点,默认状况尾巴=结点就是taill结点
			//获取p结点的下一个节点
            Node<E> q = p.next;
            //q为空,说明p就是taill结点
            if (q == null) {
            	//case方式设置p(p=t)节点的next指向当前节点
                if (p.casNext(null, newNode)) {
                    if (p != t) 
                    	//p不等于t更新尾结点,
                        casTail(t, newNode); //失败了也是没事的,由于表示有其余线程成功更新了tail节点
                    return true;
                }
                //其余线程抢先完成入队,须要从新尝试
            }
            //q不为空,p和相等
            else if (p == q)
                p = (t != (t = tail)) ? t : head;
            else
            	// // 在两跳以后检查尾部更新.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }
复制代码

出队操做

public E poll() {
    	//一个标号
        restartFromHead:
        //死循环方
        for (;;) {
        	// 定义p,h两个指针 都指向head
            for (Node<E> h = head, p = h, q;;) {
            	//获取当前p的值
                E item = p.item;
				//值不为空,且以cas方式设置p的item赋值为空。两个条件成立向下执行
                if (item != null && p.casItem(item, null)) {
                	// p和h不相等则更新头结点,不然直接返回值
                    if (p != h) 
                    	//更新头结点,预期值是h,当p的next指向不为空,更新值是q,为空则是p
                        updateHead(h, ((q = p.next) != null) ? q : p);
                        //返回当前p的值
                    return item;
                }
                //若是item为空说明已经被出队了,而后判断q是否null,是空则说明当前队列为空了。可是q = p.next赋值语句已经执行了
                else if ((q = p.next) == null) {
                //更新头结点,预期值h=head,更新p.此时p的item是空,说明已经被出队了
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }
复制代码
  • 出队操做是以死循环的方式直到出队成功。 第一次出队首先执行for (Node<E> h = head, p = h, q;;) 定义两个指针ph都指向head
    image
  • 而后定义一个item存储p(这里就是head)的值,而后判断item是否为空,此时第一次出队时为空的,则执行 else if ((q = p.next) == null) ,此条件不成立,由于head的next有结点。执行 else if (p == q),此时不相等,由于上个操做已经把q赋值为p的next结点了。因此执行最后的else语句 p = q;在次循环执行。
    image
  • 此时p.item不为空条件成立且以cas方式更新p的item为空 p.casItem(item, null)。若是都两个条件都成立,判断 if (p != h)此时不成立的,更新updateHead(h, ((q = p.next) != null) ? q : p); 预期值是h,更新值是q 由于不为空。并返回item,第一次出队成功。

第一次出队成功结构

总结

  • CoucurrentLinkedQueue的结构由头节点和尾节点组成的,都是使用volatile修饰的。每一个节点由节点元素item和指向下一个节点的next引用组成.
  • 入队:先检查插入的值是否为空,若是是空则抛出异常。而后以死循坏的方式执行一直到入队成功,整个过程大概就是把tail结点的next指向新结点,而后更新tail为新结点便可。可是tail结点并非每次都是尾结点。因此每次入队都要经过tail定位尾结点。
  • 出队:出队操做就是从队列里返回一个最先插入的节点元素,并清空该节点对元素的引用。并非每次出队都更新head节点,当head节点有元素时,直接弹出head节点的元素,并以cas方式设置节点的itemnull,不会更新head节点。只有当head节点没有元素值时,出队操做才会更新head节点,这种作法是为了减小cas方式更新head节点的消耗,提供出队的效率
相关文章
相关标签/搜索