前几天读LinkedTransferQueue(如下简称ltq)的源码,想加深下对松弛型双重队列的理解,无心中发现了这个问题:),通过仔细检查后确认了这是个bug,存在于JDK1.7.0_40和刚发布的JDK8中,去google和oracle官方彷佛也没有搜索到这个问题。java
重现bug:先来重现下这个bug,因为对并发线程的执行顺序预先不能作任何假设,因此极可能根本就不存在所谓的重现错误的“测试用例”,或者说这个测试用例应该是某种“执行顺序"。因此我一开始的作法是copy了一份ltq的源码,经过某个地方加自旋...可是这种方法毕竟要修改源码,后来我发现直接debug进源码就能够轻易重现bug了。node
LinkedTransferQueue:xfer(E e, boolean haveData, int how, long nanos) if (how != NOW) { // No matches available if (s == null) s = new Node(e, haveData); Node pred = tryAppend(s, haveData); if (pred == null) continue retry; // lost race vs opposite mode if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); } return e; // not waiting
在以上06行Node pred = tryAppend(s, havaData) 断点(我是windows下用eclipse调试);
debug如下代码:git
public static void main(String[] args) { final BlockingQueue<Long> queue = new LinkedTransferQueue<Long>(); Runnable offerTask = new Runnable(){ public void run(){ queue.offer(8L); System.out.println("offerTask thread has gone!"); } }; Runnable takeTask = new Runnable(){ public void run(){ try { System.out.println(Thread.currentThread().getId() + " " +queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } }; Runnable takeTaskInterrupted = new Runnable(){ public void run(){ Thread.currentThread().interrupt(); try { System.out.println(Thread.currentThread().getId() + " " +queue.take()); } catch (InterruptedException e) { System.out.println(e + " "+Thread.currentThread().getId()); } } }; new Thread(offerTask).start(); new Thread(takeTask).start(); new Thread(takeTaskInterrupted).start(); }
执行到断点处以后,在Debug界面里面有Thread-0、Thread-一、Thread-2三个线程分别指代代码中的offerTask、takeTask、takeTaskInterrupted三者。如今执行三步:github
step 1: Resume Thread-1(没有输出,线程Thread-1本身挂起,等待数据)
step 2: Resume Thread-2(看到相似于 java.lang.InterruptedException 15 的输出)
step 3: Resume Thread-0(输出:offerTask thread has gone!)windows
offer线程已经执行完毕,而后咱们的64L呢,明明Thread-1在等待数据,数据丢失了吗?其实不是,只不过take线程如今没法取得offer线程提交的数据了。数据结构
若是你以为上面的数据丢失还不是什么大问题请在上面的示例下添加以下代码(和你CPU核心数相同的代码行:)并发
.............. new Thread(takeTask).start(); new Thread(takeTask).start(); new Thread(takeTask).start(); new Thread(takeTask).start();
把上面的3个step从新按顺序执行一遍,建议先打开任务管理器,接着忽略断点,让接下来这几个线程跑:)
CPU爆满了吧...实际上是被这几个线程占据了,你去掉几行代码,CPU使用率会有相应的调整。
因此这个bug可能会引发数据暂时遗失和CPU爆满, 只不过貌似发生这种状况的几率极低。oracle
缘由:为何会出现这个bug呢,要想了解缘由必须先深刻分析ltq内部所使用的数据结构和并发策略,ltq内部采用的是一种很是不一样的队列,即松弛型双重队列(Dual Queues with Slack)。eclipse
数据结构:函数
松弛的意思是说,它的head和tail节点相较于其余并发列队要求上更放松,构造它的目的是减小CAS操做的次数(相应的会增长next域的引用次数),举个例子:某个瞬间tail指向的节点后面已经有6个节点了(如下图借用源码的注释-_-|||没画过图),而其余并发队列真正的尾节点最多只能是tail的下一个节点。
* head tail
* | |
* v v
* M -> M -> U -> U -> U -> U->U->U->U->U
收缩的方式是大部分状况下不会用tail的next来设置tail节点,而是第一次收缩N个next(N>=2),而后查看可否2个一次来收缩tail。(head相似,而且head改变一次会致使前“head"节点的next域断裂即以下图)
*"prehead" head tail
* | | |
* v v v
* M M-> U -> U -> U -> U->U->U->U->U
双重是指有两种类型相互对立的节点(Node.isData==false || true),而且我理解的每种节点都有三种状态:
1 INIT(节点构造完成,刚进入队列的状态)
2 MATCHED(节点备置为“知足”状态,即入队节点标识的线程成功取得或者传递了数据)
3 CANCELED(节点被置为取消状态,即入队节点标识的线程由于超时或者中断决定放弃等待)
(bug的缘由就是现有代码中将二、3都当作MATCHED处理,后面会看到把3独立出来就修复了这个问题)
并发策略:
既然使用了松弛的双重队列,那么当take、offer等方法被调用时执行的策略也稍微不一样。
就咱们示例中的代码的流程来看,Thread-0、Thread-一、Thread-2几乎同时进入到了xfer的调用,发现队列为空,因此都构造了本身的node但愿入队,因而三者都从tail开始加入本身的node,咱们在这里的顺序是Thread-1->Thread-2->Thread-0,由于想要入队还要和当前的tail节点进行匹配获得“承认”才能尝试入队,队列为空Thread-1理所固然入队成功而且挂起了本身的线程(park)等待相对的调用来唤醒本身(unpark),而后Thread-2发现队列末尾的node和本身是同一类型的,因而经过了测试把本身也加入了队列,因为自己是中断的因此让本身进入MATCHED状态(bug就是这里了,上面说过CANCEL被当作MATCHED状态处理),接着咱们提交数据的Thread-0来了,发现末尾节点的类型虽然对立但倒是MATCHED状态(假如不为MATCHED会有退回再从head来探测一次的机会),因此认为队列已经为空,前面的调用已经被匹配完了,而后把本身的node入队,这样就造成了以下所示的场景:
* Thread-1 Thread-2 Thread-0
* | | |
* v v v
* REQUEST -> MATCHED -> DATA
好了, 如今Thread-3来了,先探测尾部发现Thread-0的node是类型相反的,因而退回从头部开始从新探测,可是又发现Thread-1的node的类型是相同的,因而再次去探测尾部看看可否入队.......结果形成CPU是停不下来的。
修复:
如上面所说,错误的本质在于当尾部的节点是CANCELED(取消)状态时不能做为被匹配完成的MATCHED状态处理,应该让后来者回退到head去从新测试一次因此重点是对源码作出以下修改(修改放在注释中):
static final class Node { final boolean isData; // false if this is a request node volatile Object item; // initially non-null if isData; CASed to match volatile Node next; volatile Thread waiter; // null until waiting /* static final Object CANCEL = new Object(); final void forgetWaiter(){ UNSAFE.putObject(this, waiterOffset, null); } final boolean isCanceled(){ return item == CANCEL; } */
在Node节点代码中加入标识取消的对象CANCEL。
private E xfer(E e, boolean haveData, int how, long nanos) { if (item != p && (item != null) == isData /*&& item!=Node.CANCEL*/) { // unmatched if (isData == haveData) // can't match
在xfer函数中添加对于为状态为取消的判断。
private E xfer(E e, boolean haveData, int how, long nanos) { Node pred = tryAppend(/*s,*/ haveData); ..... } private Node tryAppend(Node s, boolean haveData) { else if (p.cannotPrecede(/*s, */haveData)) else { /* if(p.isCanceled()) p.forgetContents();*/ if (p != t) { // update if slack now >= 2
添加对于前置节点为取消状态时当前节点的入队策略
final boolean cannotPrecede(boolean haveData) { boolean d = isData; Object x; return d != haveData && (x = item) != this && (x != null) == d; } final boolean cannotPrecede(Node node, boolean haveData) { boolean d = isData; if(d != haveData){ Object x = item; if(x != this && (x!=null) == d && x!= Node.CANCEL) return true; if(item == CANCEL){ if(node.next != this){ node.next = this; return true; } this.forgetContents(); } } node.next = null; return false; }
这一步是关键, 当咱们入队时发现前置节点是类型对立而且取消状态时,咱们就须要多一次的回退探测,因此借用了一下next域来标识这个CANCEL节点,下次通过时或者能够确认它能够当作MATCHED处理(它前面没有INIT节点)或者已经有别的节点粘接在它后面,咱们就进而处理那个节点,总之当咱们老是可以获得正确的行为。
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) { if ((w.isInterrupted() || (timed && nanos <= 0)) && s.casItem(e, /*Node.CANCEL*/)) { // cancel unsplice(pred, s); return e; }
这一处关键点把item的值从原来的s自己修改成咱们新增的CANCEL。
额 代码好乱,关于这个bug定位应该没问题,后面的缘由不少方面都没讲,剩下的还有不少处大大小小的修改=_=,整个修改以后的LinkedTransferQueue在github上,你们有兴趣的话能够参考下,已经经过了 JSR166测试套件。