public SynchronousQueue() { this(false); } public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }
上面是SynchronousQueue的两个构造方法,能够看出SynchronousQueue的底层实际是建立一个转移队列,而且这个队列有公平和非公平两种模式,下面咱们先来看看公平模式的转移队列是怎么实现的。java
2.SynchronousQueue的公平模式node
不管是非公平的TransferStack队列仍是公平的TransferQueue队列,都是对父类Transfer的实现:安全
//Transferer是SynchronousQueue中的一个内部类,定义了一个转移方法 //TransferStack和TransferQueue都要实现该转移方法 abstract static class Transferer<E> { abstract E transfer(E e, boolean timed, long nanos); }
TransferQueue的是一个底层由链表实现的FIFO队列,其结点的定义以下:数据结构
static final class QNode { volatile QNode next; // 结点的后继 volatile Object item; // 结点中存储的数据 volatile Thread waiter; // 等待的线程 //是不是数据,此标识要与判断入队的操做是什么操做 //即判断相邻两次入队的操做是否相同,如果相邻两次入队 //的是不相同的操做(一次put一次take)那么就要进行配对后移除出队。 //不然,将操做入队便可 final boolean isData; //构造方法 QNode(Object item, boolean isData) { this.item = item; this.isData = isData; } //CAS更新后继结点 boolean casNext(QNode cmp, QNode val) { return next == cmp && UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } //CAS更新结点中的数据 boolean casItem(Object cmp, Object val) { return item == cmp && UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } //取消操做,即尝试将item更新为结点自己 void tryCancel(Object cmp) { UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this); } //判断节点是否应该取消,即item是否是结点自己 boolean isCancelled() { return item == this; } //判断当前结点是否已再也不队列之中 boolean isOffList() { return next == this; } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = QNode.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
知道了TransferQueue的底层实现,在看看TransferQueue中重要的属性及构造方法:并发
static final class TransferQueue<E> extends Transferer<E> { //队列的队首结点 transient volatile QNode head; //队列的队尾结点 transient volatile QNode tail; //清除标记 transient volatile QNode cleanMe; TransferQueue() { QNode h = new QNode(null, false); // initialize to dummy node. head = h; tail = h; } private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; private static final long tailOffset; private static final long cleanMeOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = TransferQueue.class; headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head")); tailOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("tail")); cleanMeOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("cleanMe")); } catch (Exception e) { throw new Error(e); } } }
3.公平模式下的take和put操做app
//将指定元素添加到此队列,若有必要则等待另外一个线程接收它 public void put(E e) throws InterruptedException { //判断转移元素是否为null,也就是说同步队列中不能对null元素进行转移 //由于e元素是否为null,是transfer方法中判断入队操做是put仍是take //的一个依据 if (e == null) throw new NullPointerException(); //put方法中传入transfer的e不为null //调用transfer 默认不进行超时等待,若是发生中断则抛出中断异常 if (transferer.transfer(e, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); } } //获取并移除此队列的头,若有必要则等待另外一个线程插入它 public E take() throws InterruptedException { //take方法中传入transfer方法的e为null E e = transferer.transfer(null, false, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); }
由take方法和put方法的源码可知,在公平模式下,这两个方法本质都是对TransferQueue中transfer方法的调用,下面来看看TransferQueue中的transfer方法是如何实现的?oop
E transfer(E e, boolean timed, long nanos) { QNode s = null; //根据e是否为null,标记本次入队的是什么操做 //e为null,isData为false,不然isData为true boolean isData = (e != null); for (;;) { //自旋 QNode t = tail; //获取队尾 QNode h = head; //获取队首 //判断队首或队尾是否为null,实际上h和t不会为null(构造方法中已经初始化过) if (t == null || h == null) // saw uninitialized value continue; // spin //判断是不是空队列,或者入队操做与队尾相同(即与队尾都是put或者take操做) //不管是空队列仍是与队尾相同操做,说明只能将操做入队,而不能进行 //匹配(操做相同,没法配对,配对要操做不一样才行) if (h == t || t.isData == isData) { // empty or same-mode QNode tn = t.next; //获取队尾的后继结点 //判断队尾是否已经改变,便是否有其余线程抢先进行入队或者配对操做 if (t != tail) // inconsistent read continue; //从新进行入队或者配对判断 //判断是否是有新增的与队尾相同的入队操做,如果,更新成新的队尾 if (tn != null) { // lagging tail advanceTail(t, tn); //尝试更新队尾结点,失败也没有关系 continue; } //判断是否超时(timed为是否设置了超时等待操做,nanos为剩余的等待时间) if (timed && nanos <= 0) // can't wait return null; //超时后直接返回null //判断s是否为null,如果则以e为item建立一个结点 if (s == null) s = new QNode(e, isData); //尝试更新队尾的后继结点为s结点,失败的话就循环继续尝试直到成功 if (!t.casNext(null, s)) // failed to link in continue; //在队尾新增了一个后继结点,那么队尾就应该是这个后继结点了 //所以须要将s更新为新的队尾结点 advanceTail(t, s); // swing tail and wait //空旋或者阻塞直到匹配的操做到来 Object x = awaitFulfill(s, e, timed, nanos); //到这一步,说明阻塞的操做已经配对成功或者操做已经被取消了,线程被唤醒了 //判断是配对成功了,仍是操做被取消了 //如果操做被取消,会设置s.item=s if (x == s) { // wait was cancelled clean(t, s); //清除结点s return null; //操做已经被取消,直接返回null } //判断结点s是否还在队列中 //如果还在队列中,且又配对操做成功,说明s结点应该是新的head //而且若s.item不为null还须要将s.item设为s自身,等待线程赋null if (!s.isOffList()) { // not already unlinked advanceHead(t, s); // unlink if head //判断是否 if (x != null) s.item = s; s.waiter = null; } return (x != null) ? (E)x : e; // 入队的操做与以前的队尾的操做不一样,能够进行配对(take配put,或 // put配take) } else { //获取队首的后继结点 QNode m = h.next; //出现t与队尾不一样,m为null,h与队首不一样,说明队列发生了改变 //即队列出现了其余线程抢先执行了入队或者配对的操做 if (t != tail || m == null || h != head) continue; //循环从新来 //获取后继结点的item Object x = m.item; //isData == (x != null)是判断m结点对应的操做与当前操做是否相同 //x == m 则是判断m结点是否被取消 //!m.casItem(x, e) 则是判断尝试更新m结点的item为e是否成功 //以上三个判断有一个为真,那就说明m结点已经不在队列中或是被取消或是匹配过了 if (isData == (x != null) || x == m || !m.casItem(x, e)) { advanceHead(h, m); //旧队首已通过时,更新队首 continue; } //更新head,到此处说明配对操做已经成功,应该将m结点变为head //而h结点则须要移除出列 advanceHead(h, m); // successfully fulfilled LockSupport.unpark(m.waiter); //唤醒m结点对应的线程 // return (x != null) ? (E)x : e; } } } //尝试更新队尾结点 void advanceTail(QNode t, QNode nt) { if (tail == t) UNSAFE.compareAndSwapObject(this, tailOffset, t, nt); } //等待结点被对应的操做匹配 Object awaitFulfill(QNode s, E e, boolean timed, long nanos) { //根据timed标识即超时时间计算截止时间 final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); //湖区当前线程的引用 //计算出自旋时间 int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); //死循环,确保操做能成功 for (;;) { //判断当前线程是否被中断,如果被中断那么取消 if (w.isInterrupted()) s.tryCancel(e); Object x = s.item; //获取结点对应的item对象 //判断结点的item是否仍是对象e //生成s节点的时候,s.item是等于e的,当取消操做(item变为s)或者 //匹配了操做的时候会进行更改 if (x != e) return x; //判断是否设置了超时等待功能 if (timed) { nanos = deadline - System.nanoTime(); //计算剩余的等待时间 if (nanos <= 0L) { //判断是否应超时,若超时直接尝试取消操做 s.tryCancel(e); continue; } } //自旋时间控制 if (spins > 0) --spins; //自旋时间减小 //判断是否须要设置等待线程 else if (s.waiter == null) s.waiter = w; else if (!timed) //没有设置超时等待功能,直接让让线程一直挂起 LockSupport.park(this); //设置了超时等待功能,就判断等待时间是否大于自旋的最大时间 //若大于自旋最大时间那就让线程阻塞一段时间 //不然让线程自旋一段时间 else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } } //尝试取消当前结点对应的操做,即将结点中item值更新成结点自身(this) void tryCancel(Object cmp) { UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this); } //更新队首head指针,并将原队首结点的next指向其自身,方便GC void advanceHead(QNode h, QNode nh) { if (h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh)) h.next = h; // forget old next } //如果某个结点对应的操做被取消,那么这个操做对应的结点须要移除出队 //也就是清理无效结点。 /** * clean方法中若是删除的节点不是尾节点,那么能够直接进行删除, * 若是删除的节点是尾节点,那么用cleanMe标记须要删除的节点的前驱, * 这样在下一轮的clean的过程将会清除打了标记的节点。 */ void clean(QNode pred, QNode s) { s.waiter = null; //操做已经被取消,那么就不会有等待线程了 //判断s是不是pred的后继结点 while (pred.next == s) { // Return early if already unlinked QNode h = head; //队首引用 QNode hn = h.next; //队首的后继结点 //判断hn是否被取消,若被取消,那么更新head if (hn != null && hn.isCancelled()) { advanceHead(h, hn); continue; } QNode t = tail; // 队尾结点 //判断是不是空队列 if (t == h) return; QNode tn = t.next; //队尾的后继 //队尾改变,说明队列被其余线程修改过了 if (t != tail) continue; //t有后继结点,说明队尾该更新了 if (tn != null) { advanceTail(t, tn); continue; } //判断s结点是不是队尾结点 //若不是队尾结点,则只需将s的前驱pred结点的next指向s结点的后继结点sn //即成功将s结点从队列中清除 if (s != t) { // If not tail, try to unsplice QNode sn = s.next; if (sn == s || pred.casNext(s, sn)) return; } //到此处,说明要清除的结点就是队列的队尾,而队尾不能直接删除 QNode dp = cleanMe; //获取标识有要删除的结点的前驱结点 //判断是否有须要删除的结点,dp不为null,即cleanMe存在 //说明队列以前有队尾结点须要删除 if (dp != null) { // Try unlinking previous cancelled node QNode d = dp.next; QNode dn; //当cleanMe处于如下四种情形时,cleanMe失效 //(1)cleanMe的后继而空(cleanMe 标记的是须要删除节点的前驱) //(2)cleanMe的后继等于自身, //(3)须要删除节点的操做没有被取消, //(4)被删除的节点不是尾节点且其后继节点有效 if (d == null || // d is gone or d == dp || // d is off list or !d.isCancelled() || // d not cancelled or (d != t && // d not tail and (dn = d.next) != null && // has successor dn != d && // that is on list dp.casNext(d, dn))) // d unspliced casCleanMe(dp, null); //cleanMe失效,赋值为null if (dp == pred) return; // s is already saved node } else if (casCleanMe(null, pred)) //将队尾的前驱结点标记成cleanMe return; // Postpone cleaning s } } //用于判断结点是否还在队列中,若后继结点是自身,说明已经再也不队列中 //不然还在队列中 boolean isOffList() { return next == this; }
由上面对公平的SynchronousQueue的分析可知,底层是使用队列来实现公平模式的,而且线程安全是经过CAS方式实现的。公平的TransferQueue队列中会将连续相同的操做入队,而不一样的操做则会进行配对,即TransferQueue队列中要么没有存放操做,要么存放都是相同的操做(要么都是take,要么都是put),当有一个与队列中的操做不相同的操做时,队列会自动将队首操做与之进行匹配。大体流程(操做被取消未显示,方便理解)以下图所示:学习
4.SynchronousQueue的非公平模式this
SynchronousQueue的非公平模式是基于栈来实现的,咱们知道栈是后进先出的(LIFO),也就是说这里的非公平模式与ReentrantLock中的非公平模式区别巨大,后来先服务这太不公平了。spa
先来看看非公平模式的具体实现TransferStack的底层数据结构链表中结点的定义:
static final class SNode { volatile SNode next; // 链表的后继结点 volatile SNode match; // 匹配的结点 volatile Thread waiter; // 等待的线程 Object item; // 数据 int mode; //结点的模式 SNode(Object item) { this.item = item; } //CAS方式更新后继结点 boolean casNext(SNode cmp, SNode val) { return cmp == next && UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } //尝试匹配结点,匹配成功就将等待匹配结点对应的线程唤醒继续后续操做 boolean tryMatch(SNode s) { if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { Thread w = waiter; if (w != null) { // waiters need at most one unpark waiter = null; LockSupport.unpark(w); } return true; } return match == s; } //尝试取消结点,将match更新成结点自身即标志着结点已处于取消状态 void tryCancel() { UNSAFE.compareAndSwapObject(this, matchOffset, null, this); } //判断是否被取消 boolean isCancelled() { return match == this; } private static final sun.misc.Unsafe UNSAFE; private static final long matchOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = SNode.class; matchOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("match")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
知道告终点的基本结构,在来看看TransferStack中的重要属性及构造方法:
//TransferStack中没有构造方法,所以只有一个空构造 /** * TransferStack中定义了三个标记:REQUEST表示消费者,DATA表示生产者, * FULFILLING表示操做匹配状态。任何线程对TransferStack的操做都属于 * 上述3种状态中的一种 */ static final class TransferStack<E> extends Transferer<E> { //用于标记结点的类型,表明结点是消费者(对应take操做) static final int REQUEST = 0; //用于标记结点的类型,表明结点是生产者(对应put操做) static final int DATA = 1; //用于标记结点的状态,表明结点正处于匹配状态 static final int FULFILLING = 2; //栈顶结点 volatile SNode head; // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = TransferStack.class; headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head")); } catch (Exception e) { throw new Error(e); } } }
5.非公平模式下的take和put操做
有上文对公平模式的学习,咱们知道take和put操做最终调用的都是transfer方法,只不过公平模式调用的是TransferQueue中的转移方法,非公平模式则是调用TransferStack中的转移方法
/** * transfer的大体过程:将一个操做与栈顶的操做进行配对 * 如果配对不成功(take对应put,或put对应take),那么直接将该操做 * 入栈;如果配对成功,即此时应该将栈顶操做出栈,可是不能直接出栈( * 若此时其余线程进行入栈,那么直接出栈会出问题),而是先将匹配的操做 * 标记成FULFILLING状态(匹配状态)而后入栈;当其余线程检查到这个匹配 * 的过程,就会先帮助配对,在去执行自身的操做 */ @SuppressWarnings("unchecked") E transfer(E e, boolean timed, long nanos) { SNode s = null; // constructed/reused as needed //判断当前操做是何种模式,REQUEST对应take,DATA对应put int mode = (e == null) ? REQUEST : DATA; for (;;) { //死循环 SNode h = head; //获取栈顶 //判断栈是否为空栈,若不为空栈,那么栈顶操做模式与当前操做模式是否相同 //如果栈顶操做与当前操做模式相同,那么就须要入栈该操做 if (h == null || h.mode == mode) { // empty or same-mode //判断是否设置了超时等待,若设置了超时等待,那等待时间是否还有剩余 if (timed && nanos <= 0) { //判断栈顶是否为null,且栈顶是否被取消 //若不为null,且栈顶操做被取消,那么就尝试更新栈顶操做为其后继 if (h != null && h.isCancelled()) casHead(h, h.next); // pop cancelled node else //栈顶为null或者没被取消 return null; //没有设置超时等待,或超时等待时间还未到,则尝试将当前操做入栈 } else if (casHead(h, s = snode(s, e, h, mode))) { //s入栈成功,那么就自旋或挂起等待匹配的操做到来在被唤醒 SNode m = awaitFulfill(s, timed, nanos); //操做自旋或挂起 //到这一步说明操做已经被取消,或者匹配到了对应的操做 //匹配结点m为结点本身自己,说明操做被取消 if (m == s) { // wait was cancelled clean(s); //清除被取消的结点 return null; } //如果栈不是空栈且栈顶的后继是s,说明操做s和其匹配的操做m,都还在栈中 //须要将其移除出栈,即更新栈顶为s的后继 if ((h = head) != null && h.next == s) casHead(h, s.next); // help s's fulfiller //根据操做类型返回对应的数据,最后返回的其实都是put进去的数据 return (E) ((mode == REQUEST) ? m.item : s.item); } //当前的操做与栈顶操做相匹配,进行匹配,将操做的状态更新成FULFILLING并入栈 } else if (!isFulfilling(h.mode)) { // try to fulfill if (h.isCancelled()) // 判断栈顶是否被取消 casHead(h, h.next); // 栈顶被取消更新栈顶 //入栈新结点s,s的后继为h,而且s处于FULFILLING状态(匹配状态) else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { for (;;) { // loop until matched or waiters disappear SNode m = s.next; // s的后继m,由于s处于匹配状态,m多是其配对的结点 //配对结点为null //这里有些疑问,没搞懂什么情形下会出现m为null(按理不该该出现的) if (m == null) { // all waiters are gone casHead(s, null); // pop fulfill node s = null; // use new node next time break; // restart main loop } //m的后继结点 SNode mn = m.next; //判断m和s配对是否成功,配对成功尝试更新栈顶为m的后继 if (m.tryMatch(s)) { casHead(s, mn); // pop both s and m return (E) ((mode == REQUEST) ? m.item : s.item); } else // lost match //没有匹配成功,说明m结点不是s的配对结点,继续向后寻找 s.casNext(m, mn); // help unlink } } //到这说明栈顶处于匹配状态,那么帮助其匹配 } else { // help a fulfiller SNode m = h.next; // m is h's match if (m == null) // waiter is gone casHead(h, null); // pop fulfilling node else { SNode mn = m.next; if (m.tryMatch(h)) // help match casHead(h, mn); // pop both h and m else // lost match h.casNext(m, mn); // help unlink } } } } //将结点对应的线程自旋或挂起以等待匹配的操做到来 SNode awaitFulfill(SNode s, boolean timed, long nanos) { //计算截止时间 final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); //计算自旋时间 int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { if (w.isInterrupted()) s.tryCancel(); //尝试取消操做 SNode m = s.match; //获取匹配的结点 //匹配结点不为null,说明要么匹配到了,要么被取消,均可以结束挂起了 if (m != null) return m; //判断是否设置了超时等待 if (timed) { //计算剩余时间 nanos = deadline - System.nanoTime(); //设置了超时等待,若截止时间已到,那么操做要被取消 if (nanos <= 0L) { s.tryCancel(); continue; } } //自旋时间递减 if (spins > 0) spins = shouldSpin(s) ? (spins-1) : 0; //设置等待线程 else if (s.waiter == null) s.waiter = w; // establish waiter so can park next iter //没有设置超时等待,线程一直挂起,知道被配对操做唤醒 else if (!timed) LockSupport.park(this); //线程挂起 //判断超时等待时间是否大于自旋最大时间 //如果大于,就直接将线程挂起一段时间 //不然只自旋不挂起 else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } } //须要自旋的情形 //当前结点为栈顶 //栈顶为null //栈顶正在匹配中 boolean shouldSpin(SNode s) { SNode h = head; return (h == s || h == null || isFulfilling(h.mode)); } //尝试配对 boolean tryMatch(SNode s) { //尝试将当前结点的配对结点更新为s,更新成功就唤醒当前结对应的线程 if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { Thread w = waiter; if (w != null) { // waiters need at most one unpark waiter = null; LockSupport.unpark(w); } return true; } return match == s; }
经过上面对非公平模式下TransferStack中transfer方法的分析,可知非公平模式实际上能够说是很是的不公平,由于TransferStack是利用栈的后进先出性质来进行配对的,也就说基本上都是后来先服务。其大体过程能够简化成以下图所示: