Java 并发容器和框架--ConcurrentLinkedQueue

简述

若是要实现一个线程安全的队列有两种方式:一种是使用阻塞算法,另外一种是使用非阻塞算法。若是使用阻塞算法的队列能够用一个锁(出队和入队用同一把锁)或两个锁(出队和入队使用不一样的锁)等方式来实现。非阻塞的实现方式可使用循环CAS的方式来实现。node

ConcurrentLinkedQueue 是一个基于连接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当咱们添加一个元素的时,它会返回队列头部的元素。采用了 wait-free(即CAS)来实现。算法

ConcurrentLinkedQueue的结构

ConcurrentLInkedQueue 由head节点和tail节点组成,每一个节点(Node)由节点元素(item)和指向下一个节点(next)的引用组成,节点与节点之间就是经过这个next关联起来,从而组成一张列表结构的队列。安全

入队列

1.入队列的过程

入队列就是将入队列节点添加到队列的尾部。入队主要作两件事:第一是将入队节点设置成当前队列尾节点的下一个节点,第二是更新tail节点,若是tail节点的next节点不为空,则将入队节点设置成tail节点,若是tail节点的next节点为空,则将入队节点设置成tail的next节点,因此tail节点不老是尾节点。(不理解?)---元素会先一次入队,而后更新tail节点的指向。this

public boolean offer(E e){
        if (e == null) throw new NullPointerException();
        //入队前,建立一个入队节点
        Node<E> n = new Node<E>(e);

        retry:
        //死循环,入队不成功反复入队
        for (;;){
            //建立一个指向tail节点的引用
            Node<E> t = tail;
            //p用来表示队列的尾节点,默认状况下等于tail节点。
            Node<E> p = t;
            for (int hops = 0;;hops++){
                //得到p节点的下一个节点。
                Node<E> next = succ(p);
                
                //next 节点不为空,说明p不是尾节点,须要更新p后再将它指向next节点
                if(next != null){
                    //循环两次及其以上,而且当前节点仍是不等于尾节点
                    if (hops >HOPS && t!=tail){
                        continue retry;
                    }
                    p =next;
                }
                //若是p是尾节点,则设置p节点的next节点为入队节点。入队节点是n
                else if(p.casNext(null,n)){
                    /***
                     * 若是tail节点有大于等于1个next ,则将入队节点设置成tail节点,更新失败了也没有关系
                     * 由于失败了表示有其余线程成功更新了tail节点
                     */
                    if(hops >= HOPS){
                        casTail(t,n);//更新tail节点,运行失败
                        return true;
                    }else {
                        p = succ(p);
                    }
                    
                }
                
                
            }
            
        }
    }
//JDK1.8 的处理策略
    public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

2.定位尾节点

tail节点并不老是尾节点,全部每次入队都必须先经过tail节点来找到尾节点。尾节点多是tail节点,也多是tail节点的next节点。线程

final Node<E> succ(Node<E> p){
    Node<E> next = p.getNext();
    //p节点和p的next节点都等于空,表示这个队列刚初始化,正准备添加节点,因此须要返回head节点。
    return (p == next) ? head:next;
    
}

3.设置入队节点为尾节点

p.casNext(null,n) 方法用于将入队节点设置为当前队列尾节点的next节点,若是p是null,表示p是当前队列的尾节点,若是不为null,表示其余线程更新了尾节点,则须要从新获取当前队列的尾节点。

4.HOPS 的设计意图

让tail节点永远做为队列的尾节点,逻辑清晰和易懂。可是每次都须要使用循环CAS更新tail节点。若是能减小CAS更新tail节点的次数,就能提升入队的效率。设计

出队列

出队列就是从队列里返回一个节点元素,并清空该节点对元素的引用。队列出队,并非每次都更新head节点,当head节点里又元素时,直接弹出head节点里的元素,而不会更新head节点。缘由也是为了提升效率,减小CAS的次数。rest

//JDK 1.8 的处理策略

  public E poll() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;

                if (item != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }
相关文章
相关标签/搜索