SynchronousQueue是一种特殊的阻塞队列,不一样于LinkedBlockingQueue、ArrayBlockingQueue和PriorityBlockingQueue,其内部没有任何容量,任何的入队操做都须要等待其余线程的出队操做,反之亦然。若是将SynchronousQueue用于生产者/消费者模式,那么至关于生产者和消费者手递手交易,即生产者生产出一个货物,则必须等到消费者过来取货,方可完成交易。
SynchronousQueue有一个fair选项,若是fair为true,称为fair模式,不然就是unfair模式。fair模式使用一个先进先出的队列保存生产者或者消费者线程,unfair模式则使用一个后进先出的栈保存。java
SynchronousQueue经过将入队出队的线程绑定到队列的节点上,并借助LockSupport的park()和unpark()实现等待,先到达的线程A需调用LockSupport的park()方法将当前线程进入阻塞状态,知道另外一个与之匹配的线程B调用LockSupport.unpark(Thread)来唤醒在该节点上等待的线程A。
基本逻辑:node
在深刻分析其实现机制以前,咱们先了解对于SynchronousQueue可执行哪些操做,因为SynchronousQueue的容量为0,因此一些针对集合的操做,如:isEmpty()/size()/clear()/remove(Object)/contains(Object)等操做都是无心义的,一样peek()也老是返回null。因此针对SynchronousQueue只有两类操做:安全
这两类操做内部都是调用Transferer的transfer(Object, boolean, long)方法,经过第一个参数是否为null,来区分是生产者仍是消费者(生产者不为null)。
针对以上状况,咱们将着重分析Transferer的transfer(Object, boolean, long)方法,这里因为两种不一样的公平模式,会存在两个Transferer的派生类:并发
public SynchronousQueue(boolean fair) { transferer = (fair)? new TransferQueue() : new TransferStack(); }
可见fair模式使用TransferQueue,unfair模式使用TransferStack,下面咱们将分别对这两种模式进行着重分析。app
fair模式使用一个FIFO的队列保存线程,TransferQueue的结构以下:oop
/** Dual Queue */ static final class TransferQueue extends Transferer { /** Node class for TransferQueue. */ static final class QNode { volatile QNode next; // next node in queue volatile Object item; // CAS'ed to or from null volatile Thread waiter; // to control park/unpark final boolean isData; QNode(Object item, boolean isData) { this.item = item; this.isData = isData; } ... } /** Head of queue */ transient volatile QNode head; /** Tail of queue */ transient volatile QNode tail; /** * Reference to a cancelled node that might not yet have been * unlinked from queue because it was the last inserted node * when it cancelled. */ transient volatile QNode cleanMe; TransferQueue() { QNode h = new QNode(null, false); // initialize to dummy node. head = h; tail = h; } ... }
以上是TransferQueue的大体结构,能够看到TransferQueue同一个普通的队列,同时存在一个指向队列头部的指针——head,和一个指向队列尾部的指针——tail;cleanMe的存在主要是解决不可清楚队列的尾节点的问题,后面会介绍到;队列的节点经过内部类QNode封装,QNode包含四个变量:this
其余的内容就是一些CAS变量以及操做,下面主要分析TransferQueue的三个重要方法:transfer(Object, boolean, long)、awaitFulfill(QNode, Object, boolean, long)、clean(QNode, QNode)。这三个方法是TransferQueue的核心,入口是transfer(),下面具体看代码。atom
/** * @By Vicky:交换数据,生产者和消费者经过e==null来区分 */ Object transfer(Object e, boolean timed, long nanos) { SNode s = null; // constructed/reused as needed int mode = (e == null)? REQUEST : DATA;// 根据e==null判断生产者仍是消费者,对应不一样的mode值 for (;;) { SNode h = head; // 栈为null或者栈顶元素的模式同当前模式,则进行入栈操做 if (h == null || h.mode == mode) { // empty or same-mode // 不等待,则直接返回null,返回以前顺带清理下被取消的元素 if (timed && nanos <= 0) { // can't wait if (h != null && h.isCancelled()) casHead(h, h.next); // pop cancelled node else return null; } else if (casHead(h, s = snode(s, e, h, mode))) {// 入栈,更新栈顶为新节点 // 等待,返回值m==s,则被取消,需清除 SNode m = awaitFulfill(s, timed, nanos); // m==s说明s被取消了,清除 if (m == s) { // wait was cancelled clean(s); return null; } // 帮忙出栈 if ((h = head) != null && h.next == s) casHead(h, s.next); // help s's fulfiller // 消费者则返回生产者的数据,生产者则返回本身的数据 return mode == REQUEST? m.item : s.item; } } else if (!isFulfilling(h.mode)) { // try to fulfill // 栈顶未开始匹配,则开始匹配 // h被取消,则出栈 if (h.isCancelled()) // already cancelled casHead(h, h.next); // pop and retry // 更新栈顶为新插入的节点,并更新节点的mode为FULFILLING,对应判断是否正在出栈的方法 // 匹配须要先将待匹配的节点入栈,因此不论是匹配仍是不匹配都须要建立一个节点入栈 else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { // 循环直到找到一个能够匹配的节点 for (;;) { // loop until matched or waiters disappear // m即与s匹配的节点 SNode m = s.next; // m is s's match // m==null说明栈s以后无元素了,直接将栈顶设置为null,并从新进行最外层的循环 if (m == null) { // all waiters are gone casHead(s, null); // pop fulfill node s = null; // use new node next time break; // restart main loop } // 将s设置为m的匹配节点,并更新栈顶为m.next,即将s和m同时出栈 SNode mn = m.next; if (m.tryMatch(s)) { casHead(s, mn); // pop both s and m return (mode == REQUEST)? m.item : s.item; } else // lost match // 设置匹配失败,则说明m正准备出栈,帮助出栈 s.casNext(m, mn); // help unlink } } } else { // help a fulfiller // 栈顶已开始匹配,帮助匹配 // 此处的操做逻辑同上面的操做逻辑一致,目的就是帮助上面进行操做,由于此处完成匹配须要分红两步: // a.m.tryMatch(s)和b.casHead(s, mn) // 因此必然会插入其余线程,只要插入的线程也按照这个步骤执行那么就避免了不一致问题 SNode m = h.next; // m is h's match if (m == null) // waiter is gone casHead(h, null); // pop fulfilling node else { SNode mn = m.next; if (m.tryMatch(h)) // help match casHead(h, mn); // pop both h and m else // lost match h.casNext(m, mn); // help unlink } } } }
从上面的代码能够看出TransferQueue.transfer()的总体流程:spa
下面看看具体如何让一个线程进入阻塞。线程
/** *@ By Vicky:等待匹配,该方法会进入阻塞,直到三种状况下才返回: * a.等待被取消了,返回值为s * b.匹配上了,返回另外一个线程传过来的值 * c.线程被打断,会取消,返回值为s */ Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) { // timed==false,则不等待,lastTime==0便可 long lastTime = (timed)? System.nanoTime() : 0; // 当前线程 Thread w = Thread.currentThread(); // 循环次数,原理同自旋锁,若是不是队列的第一个元素则不自旋,由于压根轮不上他,自旋只是浪费CPU // 若是等待的话则自旋的次数少些,不等待就多些 int spins = ((head.next == s) ? (timed? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { if (w.isInterrupted())// 支持打断 s.tryCancel(e); // 若是s的item不等于e,有三种状况: // a.等待被取消了,此时x==s // b.匹配上了,此时x==另外一个线程传过来的值 // c.线程被打断,会取消,此时x==s // 不论是哪一种状况都不要再等待了,返回便可 Object x = s.item; if (x != e) return x; // 等到,直接超时取消 if (timed) { long now = System.nanoTime(); nanos -= now - lastTime; lastTime = now; if (nanos <= 0) { s.tryCancel(e); continue; } } // 自旋,直到spins==0,进入等待 if (spins > 0) --spins; // 设置等待线程 else if (s.waiter == null) s.waiter = w; // 调用LockSupport.park进入等待 else if (!timed) LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
awaitFulfill()主要涉及自旋以及LockSupport.park()两个关键点,自旋可去了解自旋锁的原理。
自旋锁原理:经过空循环则霸占着CPU,避免当前线程进入睡眠,由于睡眠/唤醒是须要进行线程上下文切换的,因此若是线程睡眠的时间很段,那么使用空循环可以避免线程进入睡眠的耗时,从而快速响应。可是因为空循环会浪费CPU,因此也不能一直循环。自旋锁通常适合同步快很小,竞争不是很激烈的场景。
LockSupport.park()可到API文档进行了解。
下面再看看如何清除被取消的节点。
/** *@By Vicky:清除节点被取消的节点 */ void clean(QNode pred, QNode s) { s.waiter = null; // forget thread // 若是pred.next!=s则说明s已经出队了 while (pred.next == s) { // Return early if already unlinked QNode h = head; QNode hn = h.next; // Absorb cancelled first node as head // 从队列头部开始遍历,遇到被取消的节点则将其出队 if (hn != null && hn.isCancelled()) { advanceHead(h, hn); continue; } QNode t = tail; // Ensure consistent read for tail // t==h则队列为null if (t == h) return; QNode tn = t.next; if (t != tail) continue; // 帮助其余线程入队 if (tn != null) { advanceTail(t, tn); continue; } // 只能出队非尾节点 if (s != t) { // If not tail, try to unsplice // 出队方式很简单,将pred.next指向s.next便可 QNode sn = s.next; if (sn == s || pred.casNext(s, sn)) return; } // 若是s是队尾元素,那么就须要cleanMe出场了,若是cleanMe==null,则只需将pred赋值给cleanMe便可, // 赋值cleanMe的意思是等到s不是队尾时再进行清除,毕竟队尾只有一个 // 同时将上次的cleanMe清除掉,正常状况下此时的cleanMe已经不是队尾了,由于当前须要清除的节点是队尾 // (上面说的cleanMe实际上是须要清除的节点的前继节点) QNode dp = cleanMe; if (dp != null) { // Try unlinking previous cancelled node QNode d = dp.next; QNode dn; // d==null说明须要清除的节点已经没了 // d==dp说明dp已经被清除了,那么dp.next也一并被清除了 // 若是d未被取消,说明哪里出错了,将cleanMe清除,不清除这个节点了 // 后面括号将清除cleanMe的next出局,前提是cleanMe.next没有已经被出局 if (d == null || // d is gone or d == dp || // d is off list or !d.isCancelled() || // d not cancelled or (d != t && // d not tail and (dn = d.next) != null && // has successor dn != d && // that is on list dp.casNext(d, dn))) // d unspliced casCleanMe(dp, null); // dp==pred说明cleanMe.next已经其余线程被更新了 if (dp == pred) return; // s is already saved node } else if (casCleanMe(null, pred)) return; // Postpone cleaning s } }
清除节点时有个原则:不能清除队尾节点。因此若是对尾节点须要被清除,则将其保存到cleanMe变量,等待下次进行清除。在清除cleanMe时可能说的有点模糊,由于涉及到太多的并发会出现不少状况,因此if条件太多,致使难以分析所有状况。
以上就是TransferQueue的操做逻辑,下面看看后进先出的TransferStack。
unfair模式使用一个LIFO的队列保存线程,TransferStack的结构以下:
/** Dual stack */ static final class TransferStack extends Transferer { /* Modes for SNodes, ORed together in node fields */ /** Node represents an unfulfilled consumer */ static final int REQUEST = 0;// 消费者请求数据 /** Node represents an unfulfilled producer */ static final int DATA = 1;// 生产者生产数据 /** Node is fulfilling another unfulfilled DATA or REQUEST */ static final int FULFILLING = 2;// 正在匹配中... /** 只须要判断mode的第二位是否==1便可,==1则正在匹配中...*/ static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; } /** Node class for TransferStacks. */ static final class SNode { volatile SNode next; // next node in stack volatile SNode match; // the node matched to this volatile Thread waiter; // to control park/unpark Object item; // data; or null for REQUESTs int mode; // Note: item and mode fields don't need to be volatile // since they are always written before, and read after, // other volatile/atomic operations. SNode(Object item) { this.item = item; } } /** The head (top) of the stack */ volatile SNode head; static SNode snode(SNode s, Object e, SNode next, int mode) { if (s == null) s = new SNode(e); s.mode = mode; s.next = next; return s; } }
TransferStacks比TransferQueue的结构复杂些。使用一个head指向栈顶元素,使用内部类SNode封装栈中的节点信息,SNode包含5个变量:
SNode的5个变量,三个是volatile的,另外两个item和mode没有volatile修饰,代码注释给出的解释是:对这两个变量的写老是发生在volatile/原子操做的以前,读老是发生在volatile/原子操做的以后。
上面提到SNode.mode的三个常量表示栈中节点的状态,f分别为:
其余内部基本同TransferQueue,不一样之处是当匹配到一个节点时并不是是将被匹配的节点出栈,而是将匹配的节点入栈,而后同时将匹配上的两个节点一块儿出栈。下面咱们参照TransferQueue来看看TransferStacks的三个方法:transfer(Object, boolean, long)、awaitFulfill(QNode, Object, boolean, long)、clean(QNode, QNode)。
/** * @By Vicky:交换数据,生产者和消费者经过e==null来区分 */ Object transfer(Object e, boolean timed, long nanos) { SNode s = null; // constructed/reused as needed int mode = (e == null)? REQUEST : DATA;// 根据e==null判断生产者仍是消费者,对应不一样的mode值 for (;;) { SNode h = head; // 栈为null或者栈顶元素的模式同当前模式,则进行入栈操做 if (h == null || h.mode == mode) { // empty or same-mode // 不等待,则直接返回null,返回以前顺带清理下被取消的元素 if (timed && nanos <= 0) { // can't wait if (h != null && h.isCancelled()) casHead(h, h.next); // pop cancelled node else return null; } else if (casHead(h, s = snode(s, e, h, mode))) {// 入栈,更新栈顶为新节点 // 等待,返回值m==s,则被取消,需清除 SNode m = awaitFulfill(s, timed, nanos); // m==s说明s被取消了,清除 if (m == s) { // wait was cancelled clean(s); return null; } // 帮忙出栈 if ((h = head) != null && h.next == s) casHead(h, s.next); // help s's fulfiller // 消费者则返回生产者的数据,生产者则返回本身的数据 return mode == REQUEST? m.item : s.item; } } else if (!isFulfilling(h.mode)) { // try to fulfill // 栈顶未开始匹配,则开始匹配 // h被取消,则出栈 if (h.isCancelled()) // already cancelled casHead(h, h.next); // pop and retry // 更新栈顶为新插入的节点,并更新节点的mode为FULFILLING,对应判断是否正在出栈的方法 // 匹配须要先将待匹配的节点入栈,因此不论是匹配仍是不匹配都须要建立一个节点入栈 else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { // 循环直到找到一个能够匹配的节点 for (;;) { // loop until matched or waiters disappear // m即与s匹配的节点 SNode m = s.next; // m is s's match // m==null说明栈s以后无元素了,直接将栈顶设置为null,并从新进行最外层的循环 if (m == null) { // all waiters are gone casHead(s, null); // pop fulfill node s = null; // use new node next time break; // restart main loop } // 将s设置为m的匹配节点,并更新栈顶为m.next,即将s和m同时出栈 SNode mn = m.next; if (m.tryMatch(s)) { casHead(s, mn); // pop both s and m return (mode == REQUEST)? m.item : s.item; } else // lost match // 设置匹配失败,则说明m正准备出栈,帮助出栈 s.casNext(m, mn); // help unlink } } } else { // help a fulfiller // 栈顶已开始匹配,帮助匹配 // 此处的操做逻辑同上面的操做逻辑一致,目的就是帮助上面进行操做,由于此处完成匹配须要分红两步: // a.m.tryMatch(s)和b.casHead(s, mn) // 因此必然会插入其余线程,只要插入的线程也按照这个步骤执行那么就避免了不一致问题 SNode m = h.next; // m is h's match if (m == null) // waiter is gone casHead(h, null); // pop fulfilling node else { SNode mn = m.next; if (m.tryMatch(h)) // help match casHead(h, mn); // pop both h and m else // lost match h.casNext(m, mn); // help unlink } } } }
从上面的代码能够看出TransferStack.transfer()的总体流程:
下面看看TransferStack是如何让一个线程进入阻塞。
/** *@ By Vicky:等待匹配,逻辑大体同TransferQueue可参考阅读 */ SNode awaitFulfill(SNode s, boolean timed, long nanos) { long lastTime = (timed)? System.nanoTime() : 0; Thread w = Thread.currentThread(); SNode h = head; // 计算自旋的次数,逻辑大体同TransferQueue int spins = (shouldSpin(s)? (timed? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { if (w.isInterrupted()) s.tryCancel(); // 若是s的match不等于null,有三种状况: // a.等待被取消了,此时x==s // b.匹配上了,此时match==另外一个节点 // c.线程被打断,会取消,此时x==s // 不论是哪一种状况都不要再等待了,返回便可 SNode m = s.match; if (m != null) return m; if (timed) { // 等待 long now = System.nanoTime(); nanos -= now - lastTime; lastTime = now; if (nanos <= 0) { s.tryCancel(); continue; } } // 自旋 if (spins > 0) spins = shouldSpin(s)? (spins-1) : 0; // 设置等待线程 else if (s.waiter == null) s.waiter = w; // establish waiter so can park next iter // 等待 else if (!timed) LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
逻辑基本同TransferQueue,不一样之处是经过修改SNode的match变量标示匹配,以及取消。
下面再看看如何清除被取消的节点。
/** * @By Vicky:清除节点 */ void clean(SNode s) { s.item = null; // forget item s.waiter = null; // forget thread // 清除 SNode past = s.next; if (past != null && past.isCancelled()) past = past.next; // Absorb cancelled nodes at head // 从栈顶节点开始清除,一直到遇到未被取消的节点,或者直到s.next SNode p; while ((p = head) != null && p != past && p.isCancelled()) casHead(p, p.next); // Unsplice embedded nodes // 若是p自己未取消(上面的while碰到一个未取消的节点就会退出,但这个节点和past节点之间可能还有取消节点), // 再把p到past之间的取消节点都移除。 while (p != null && p != past) { SNode n = p.next; if (n != null && n.isCancelled()) p.casNext(n, n.next); else p = n; } }
以上即所有的TransferStack的操做逻辑。
看完了TransferQueue和TransferStack的逻辑,SynchronousQueue的逻辑基本清楚了。
SynchronousQueue的应用场景得看具体业务需求,J.U.C下有一个应用案例:Executors.newCachedThreadPool()就是使用SynchronousQueue做为任务队列。