正如上篇文章聊聊 JDK 阻塞队列源码(ReentrantLock实现)所说,队列在咱们现实生活中队列随处可见,最经典的就是去银行办理业务,超市买东西排队等。今天楼主要讲的就是JDK中安全队列的另外一种实现使用CAS算法实现的安全队列。
html
在JDK中的队列都实现了 java.util.Queue 接口,下面就是楼主要说的无锁版本的队列实现:java
队列名字 | 是否加锁 | 数据结构 |
---|---|---|
LinkedTransferQueue | 否 | 链表 |
ConcurrentLinkedQueue | 否 | 链表 |
LinkedTransferQueue 的原理就是经过使用原子变量compare and swap(简称“CAS”)这种不加锁的方式来实现的进行并发控制,LinkedTransferQueue是一个无界的安全队列,其长度能够无限延伸,固然其带来的问题也是显而易见的。
node
add
方法:
算法
public boolean add(E e) { xfer(e, true, ASYNC, 0); return true; }
offer
方法:
安全
public boolean offer(E e) { xfer(e, true, ASYNC, 0); return true; }
poll
方法:
数据结构
public E poll() { return xfer(null, false, NOW, 0); }
take
方法:
并发
public E take() throws InterruptedException { E e = xfer(null, false, SYNC, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); }
从上面代码中能够看出,这些方法最终都指向了 xfer
方法,只不过传入的不一样的参数。app
/** * Implements all queuing methods. See above for explanation. * * @param e the item or null for take * @param haveData true if this is a put, else a take * @param how NOW, ASYNC, SYNC, or TIMED * @param nanos timeout in nanosecs, used only if mode is TIMED * @return an item if matched, else e * @throws NullPointerException if haveData mode but e is null */
从源码的 doc 注释中能够知道
第一个参数,若是是 put 类型,就是实际的值,反之就是 null。
第二个参数,是否包含数据,put 类型就是 true,take 就是 false。
第三个参数,执行类型,有当即返回的NOW,有异步的ASYNC,有阻塞的SYNC, 有带超时的 TIMED。
第四个参数,只有在 TIMED类型才有做用。less
接下来咱们来看看 xfer
究竟是何方神圣异步
private E xfer(E e, boolean haveData, int how, long nanos) { if (haveData && (e == null)) throw new NullPointerException(); Node s = null; // the node to append, if needed retry: for (;;) { // restart on append race // 从 head 开始 for (Node h = head, p = h; p != null;) { // find & match first node // head 的类型。 boolean isData = p.isData; // head 的数据 Object item = p.item; // item != null 有 2 种状况,一是 put 操做, 二是 take 的 itme 被修改了(匹配成功) // (itme != null) == isData 要么表示 p 是一个 put 操做, 要么表示 p 是一个还没匹配成功的 take 操做 if (item != p && (item != null) == isData) { // 若是当前操做和 head 操做相同,就没有匹配上,结束循环,进入下面的 if 块。 if (isData == haveData) // can't match break; // 若是操做不一样,匹配成功, 尝试替换 item 成功, if (p.casItem(item, e)) { // match // 更新 head for (Node q = p; q != h;) { Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } // 唤醒原 head 线程. LockSupport.unpark(p.waiter); return LinkedTransferQueue.<E>cast(item); } } // 找下一个 Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist } // 若是这个操做不是马上就返回的类型 if (how != NOW) { // No matches available // 且是第一次进入这里 if (s == null) // 建立一个 node s = new Node(e, haveData); // 尝试将 node 追加对队列尾部,并返回他的上一个节点。 Node pred = tryAppend(s, haveData); // 若是返回的是 null, 表示不能追加到 tail 节点,由于 tail 节点的模式和当前模式相反. if (pred == null) // 重来 continue retry; // lost race vs opposite mode // 若是不是异步操做(即马上返回结果) if (how != ASYNC) // 阻塞等待匹配值 return awaitMatch(s, pred, e, (how == TIMED), nanos); } return e; // not waiting } }
代码有点长,其实逻辑很简单。就是找到 head
节点,若是 head
节点是匹配的操做,就直接赋值,若是不是,添加到队列中。
注意:队列中永远只有一种类型的操做,要么是 put
类型, 要么是 take
类型.
与 LinkedTransferQueue
同样,ConcurrentLinkedQueue 同样是采用原子变量实现的并发控制,ConcurrentLinkedQueue
是一个基于连接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当咱们添加一个元素的时候,它会添加到队列的尾部,当咱们获取一个元素时,它会返回队列头部的元素。它采用了“wait-free”算法来实现。
add
方法:
public boolean add(E e) { return offer(e); }
offer
方法:
ConcurrentLinkedQueue
是无界的,因此offer
永远返回true,不能经过返回值来判断是否入队成功,
public boolean offer(E e) { // 校验是否为空 checkNotNull(e); //入队前,建立一个入队节点 final Node<E> newNode = new Node<E>(e); //循环CAS直到入队成功。 // 一、根据tail节点定位出尾节点(last node); // 二、将新节点置为尾节点的下一个节点, // 三、更新尾节点casTail。 for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; //判断p是否是尾节点,tail节点不必定是尾节点,判断是否是尾节点的依据是该节点的next是否是null if (q == null) { // p is last node if (p.casNext(null, newNode)) { //设置P节点的下一个节点为新节点,若是p的next为null,说明p是尾节点,casNext返回true; // 若是p的next不为null,说明有其余线程更新过队列的尾节点,casNext返回false。 // Successful CAS is the linearization point // for e to become an element of this queue, // and for newNode to become "live". if (p != t) // hop two nodes at a time casTail(t, newNode); // Failure is OK. return true; } // Lost CAS race to another thread; re-read next } else if (p == q) //p节点是null的head节点恰好被出队,更新head节点时h.lazySetNext(h)把旧的head节点指向本身 // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. p = (t != (t = tail)) ? t : head; else // Check for tail updates after two hops. //判断tail节点有没有被更新,若是没被更新,1)p=q:p指向p.next继续寻找尾节点; //若是被更新了,2)p=t:P赋值为新的tail节点 p = (p != t && t != (t = tail)) ? t : q; } }
poll
方法:
public E poll() { restartFromHead: //两层循环 for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; if (item != null && p.casItem(item, null)) { // Successful CAS is the linearization point // for item to be removed from this queue. if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } //队列为空,更新head节点 else if ((q = p.next) == null) { updateHead(h, p); return null; } else if (p == q) //p节点是null的head节点恰好被出队,更新head节点时h.lazySetNext(h);把旧的head节点指向本身。 //从新从head节点开始 continue restartFromHead; else p = q; //将p执行p的下一个节点 } } } //更新head节点 final void updateHead(Node<E> h, Node<E> p) { //经过CAS将head更新为P if (h != p && casHead(h, p)) h.lazySetNext(h);//把旧的head节点指向本身 } void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); }
remove
方法:
public boolean remove(Object o) { if (o != null) { Node<E> next, pred = null; // 循环CAS直到删除节点 for (Node<E> p = first(); p != null; pred = p, p = next) { boolean removed = false; E item = p.item; if (item != null) { if (!o.equals(item)) { next = succ(p); continue; } // 经过CAS删除节点 removed = p.casItem(item, null); } next = succ(p); if (pred != null && next != null) // unlink pred.casNext(p, next); if (removed) return true; } } return false; }
本文主要介绍了两种CAS算法实现的安全队列,然而稳定性要较高的系统中,为了防止生产者速度过快,致使内存溢出,一般是不建议选择无界队列的。固然楼主水平有限,文章中难免有纰漏,望小伙伴谅解并指出,在技术的道路上一块儿成长。