并发队列之ConcurrentLinkedQueue

  原本想着直接说线程池的,不过在说线程池以前,咱们必需要知道并发安全队列;由于通常状况下线程池中的线程数量是必定的,确定不会超过某个阈值,那么当任务太多了的时候,咱们必须把多余的任务保存到并发安全队列中,当线程池中的线程空闲下来了,就会到并发安全队列中拿任务;算法

  那么什么是并发安全队列呢?其实能够简单看做是一个链表,而后咱们先办法去存取节点;总的来讲,并发安全队列分为两种,一种是阻塞的,一种是非阻塞的,前者是用锁来实现的,后者用CAS实现的;安全

 

一.简单介绍ConcurrentLinkedQueue并发

  这个队列用法没什么好说的,就相似LinkedList的用法,怎么对一个链表继续增删改查,很少说,咱们就说一下其中几个关键的方法;this

  首先,这个队列是一个线程安全的无界非阻塞队列,其实就是一个单向链表,无界的意思就是没有限制最大长度,非阻塞表示用CAS实现入队和出队操做,咱们打开这个类就能够知道,有一个内部类Node,其中重要的属性以下所示:spa

//用于存放节点的值
volatile E item;
//指向下一个节点
volatile Node<E> next;
//这里也是用的是UNSAFE类,前面说过了,这个类直接提供CAS操做
private static final sun.misc.Unsafe UNSAFE;
//item字段的偏移量
private static final long itemOffset;
//next的偏移量
private static final long nextOffset;

 

 

  而后ConcurrentLinkedQueue中几个重要的属性,好像也没什么重要的,就保存了头节点和尾节点,注意,默认状况下头节点和尾节点都是哨兵节点,也就是一个存null的Node节点线程

//存放链表的头节点
private transient volatile Node<E> head;
//存放链表的尾节点
private transient volatile Node<E> tail;
//UNSAFE对象
private static final sun.misc.Unsafe UNSAFE;
//head字段的偏移量
private static final long headOffset;
//tail字段偏移量
private static final long tailOffset;

 

 

 

 

  下面咱们直接看一些重要方法吧!慢慢分析其中的算法才是关键的3d

 

二.offer方法指针

  这个方法的做用就是在队列末端添加一个节点,若是传递的参数是null,就抛出空指针异常,不然因为该队列是无界队列,该方法会一直返回true,并且该方法使用CAS算法实现的,因此不会阻塞线程;rest

//队列末端添加一个节点
public boolean offer(E e) {
    //若是e为空,那么抛出空指针异常
    checkNotNull(e);
    //将传进来的元素封装成一个节点,Node的构造器中调用UNSAFE.putObject(this, itemOffset, item)把e赋值给节点中的item
    final Node<E> newNode = new Node<E>(e);

    //[1] //这里的for循环从最后的节点开始
    for (Node<E> t = tail, p = t;;) {
      Node<E> q = p.next;
      //[2]若是q为null,说明p就是最后的节点了
        if (q == null) {
            //[3]CAS更新:若是p节点的下一个节点是null,就把写个节点更新为newNode
            if (p.casNext(null, newNode)) {
                //[4]CAS成功,可是这时p==t,因此不会进入到这里的if里面,直接返回true
                //那么何时会走到这里面来呢?实际上是要有另一个线程也在调用offer方法的时候,会进入到这里面来
                if (p != t) 
                    casTail(t, newNode);  
                return true;
            }
        }
        else if (p == q) //[5]
            
            p = (t != (t = tail)) ? t : head;
        else //[6]
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

 

  

  上面执行到[3]的时候,因为头节点和尾节点默认都是指向哨兵节点的,因为这个时候p的下一个节点为null,因此当前线程A执行CAS会成功,下图所示;code

 

 

  若是此时还有一个线程B也来尝试[3]中CAS,因为此时p节点的下一个节点不是null了,因而线程B会跳到[1]出进行第二次循环,而后会到[6]中,因为p和t此时是相等的,因此这里是false,即p=q,下图所示:

 

 

  而后线程B又会跳到[1]处进行第三次循环,因为执行了Node<E> q = p.next,因此此时q指向最后的null,就到了[3]处进行CAS,此次是能够成功的,成功以后以下图所示:

 

 

   

  这个时候由于p!=t,因此能够进入到[4],这里又会进行一个CAS:若是tail和t指向的节点同样,那么就将tail指向新添加的节点,如图所示,这个时候线程B也就执行完了;

 

   

  其实还有[5]没有走到,这个是在poll操做以后才执行的,咱们先跳过,等说完poll方法以后再回头看看;另外说一下,add方法其实就是调用的是offer方法,就很少说了;

 

 

三.poll方法

  这个方法是获取头部的这个节点,若是队列为空则返回null;

public E poll() {
    //这里其实就是一个goto的标记,用于跳出for循环
    restartFromHead:
    //[1]
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;
            //[2]若是当前节点中存的值不为空,则CAS设置为null
            if (item != null && p.casItem(item, null)) {
                //[3]CAS成功就更新头节点的位置
                if (p != h) 
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            //[4]当前队列为空,就返回null
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            //[5]当前节点和下一个节点同样,说明节点自引用,则从新找头节点
            else if (p == q)
                continue restartFromHead;
            //[6]
            else
                p = q;
        }
    }
}

final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        h.lazySetNext(h);
}

  

  分为几种状况,第一种状况是线程A调用poll方法的时候,发现队列是空的,即头节点和尾节点都指向哨兵节点,就会直接到[4],返回null

  第二种状况,线程A执行到了[4],此时有一个线程却调用offer方法添加了一个节点,下图所示,那么此时线程A就不会走[4]了,[5]也不知足,因而会到[6]这里来,而后线程A又会跳到[1]处进行循环,此时p指向的节点中item不为null,因此会到[2]中;

 

   

  到了[2]中将p指向的节点中item用CAS设置为null,而后就到了[3],下面第一个图,因为p!=h,q=null,因此最后调用的是updateHead(h,p),这方法:若是头节点和h指向的是同样的,就将头节点指向p,咱们还能看到updateHead方法中h.lazySetNext(h)表示h的下一个节点指向本身,下面图二

 

   到了这里还没完,还记不记得offer方法中有一个地方的代码没有执行的啊!就是这种状况,尾节点本身引用本身,咱们再调用offer会怎么样呢?

  回到offer方法,先会到[1],而后q指向本身这个哨兵节点(注意,此时虽然p指向的节点中存的是null,可是p!=null},因而再到[5],此时的图以下左图所示;此时因为t==tail,因此p=head;

 

   再在offer方法循环一次,此时q指向null,下面左图所示,而后就能够进入[2]中进行CAS,CAS成功,由于此时p!=t,因此还要进行CAS将tail指向新节点,下面右图所示,可让GC回收那个垃圾!

妈耶,这里比较绕!哈哈哈哈哈哈哈哈哈哈哈

 

 

 

四.peek方法

  这个方法的做用就是获取队列头部的元素,只获取不移除,注意这个方法和上面的poll方法的区别啊!

public E peek() {
    //[1]goto标志
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            //[2]
            E item = p.item;
            //[3]
            if (item != null || (q = p.next) == null) {
                updateHead(h, p);
                return item;
            }
            //[4]
            else if (p == q)
                continue restartFromHead;
            else//[5]
                p = q;
        }
    }
}

  final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        h.lazySetNext(h);
  }

 

 

  

  若是队列中为空的时候,走到[3]的时候,就会以下图所示,因为h==p,因此updateHead方法啥也不作,而后返回就返回item为null

 

 

  若是队列不为空,那么以下左图所示,此时进入循环内不知足条件,会到[5]这里,将p指向q,而后再进行一次循环到[3],将q指向p的后一个节点,下面右图所示;

 

 

  而后调用updateHead方法,用CAS将头节点指向p这里,而后将h本身指向本身,下图所示,最后返回item

 

 

五.总结

  其实还有几个方法没说,可是感受比较容易就不浪费篇幅了,有兴趣的能够看看:size方法用于计算队列中节点的数量,但是因为没有加锁,在并发的条件下不许确;remove方法删除某个节点,其实就是遍历而后用equals方法比较item是否是同样,只不过若是存在多个符合条件的节点只删除第一个,而后返回true,不然返回false;contains方法判断队列中是否包含指定item的节点,也就是遍历,很容易;

  最麻烦的就是offer方法和poll方法,offer方法是在队列的最后面添加节点,而poll是获取头节点,而且删除第一个真正的队列节点(注意,节点分为两种,一种是哨兵节点,一种是真正的存了数据的节点啊),还简单的说了一下poll方法和peek方法的区别,后者只是获取,而不删除啊!用下面这个图帮助记忆一下;

相关文章
相关标签/搜索