若是要实现一个线程安全的队列有两种方式:一种是使用阻塞算法,另外一种是使用非阻塞算法。若是使用阻塞算法的队列能够用一个锁(出队和入队用同一把锁)或两个锁(出队和入队使用不一样的锁)等方式来实现。非阻塞的实现方式可使用循环CAS的方式来实现。node
ConcurrentLinkedQueue 是一个基于连接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当咱们添加一个元素的时,它会返回队列头部的元素。采用了 wait-free(即CAS)来实现。算法
ConcurrentLInkedQueue 由head节点和tail节点组成,每一个节点(Node)由节点元素(item)和指向下一个节点(next)的引用组成,节点与节点之间就是经过这个next关联起来,从而组成一张列表结构的队列。安全
入队列就是将入队列节点添加到队列的尾部。入队主要作两件事:第一是将入队节点设置成当前队列尾节点的下一个节点,第二是更新tail节点,若是tail节点的next节点不为空,则将入队节点设置成tail节点,若是tail节点的next节点为空,则将入队节点设置成tail的next节点,因此tail节点不老是尾节点。(不理解?)---元素会先一次入队,而后更新tail节点的指向。this
public boolean offer(E e){ if (e == null) throw new NullPointerException(); //入队前,建立一个入队节点 Node<E> n = new Node<E>(e); retry: //死循环,入队不成功反复入队 for (;;){ //建立一个指向tail节点的引用 Node<E> t = tail; //p用来表示队列的尾节点,默认状况下等于tail节点。 Node<E> p = t; for (int hops = 0;;hops++){ //得到p节点的下一个节点。 Node<E> next = succ(p); //next 节点不为空,说明p不是尾节点,须要更新p后再将它指向next节点 if(next != null){ //循环两次及其以上,而且当前节点仍是不等于尾节点 if (hops >HOPS && t!=tail){ continue retry; } p =next; } //若是p是尾节点,则设置p节点的next节点为入队节点。入队节点是n else if(p.casNext(null,n)){ /*** * 若是tail节点有大于等于1个next ,则将入队节点设置成tail节点,更新失败了也没有关系 * 由于失败了表示有其余线程成功更新了tail节点 */ if(hops >= HOPS){ casTail(t,n);//更新tail节点,运行失败 return true; }else { p = succ(p); } } } } }
//JDK1.8 的处理策略 public boolean offer(E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e); for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; if (q == null) { // p is last node if (p.casNext(null, newNode)) { // Successful CAS is the linearization point // for e to become an element of this queue, // and for newNode to become "live". if (p != t) // hop two nodes at a time casTail(t, newNode); // Failure is OK. return true; } // Lost CAS race to another thread; re-read next } else if (p == q) // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. p = (t != (t = tail)) ? t : head; else // Check for tail updates after two hops. p = (p != t && t != (t = tail)) ? t : q; } }
tail节点并不老是尾节点,全部每次入队都必须先经过tail节点来找到尾节点。尾节点多是tail节点,也多是tail节点的next节点。线程
final Node<E> succ(Node<E> p){ Node<E> next = p.getNext(); //p节点和p的next节点都等于空,表示这个队列刚初始化,正准备添加节点,因此须要返回head节点。 return (p == next) ? head:next; }
p.casNext(null,n) 方法用于将入队节点设置为当前队列尾节点的next节点,若是p是null,表示p是当前队列的尾节点,若是不为null,表示其余线程更新了尾节点,则须要从新获取当前队列的尾节点。
让tail节点永远做为队列的尾节点,逻辑清晰和易懂。可是每次都须要使用循环CAS更新tail节点。若是能减小CAS更新tail节点的次数,就能提升入队的效率。设计
出队列就是从队列里返回一个节点元素,并清空该节点对元素的引用。队列出队,并非每次都更新head节点,当head节点里又元素时,直接弹出head节点里的元素,而不会更新head节点。缘由也是为了提升效率,减小CAS的次数。rest
//JDK 1.8 的处理策略 public E poll() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; if (item != null && p.casItem(item, null)) { // Successful CAS is the linearization point // for item to be removed from this queue. if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } else if ((q = p.next) == null) { updateHead(h, p); return null; } else if (p == q) continue restartFromHead; else p = q; } } }