本文首发于一世流云专栏: https://segmentfault.com/blog...
SynchronousQueue
是JDK1.5时,随着J.U.C包一块儿引入的一种阻塞队列,它实现了BlockingQueue接口,底层基于栈和队列实现:html
没有看错,SynchronousQueue的底层实现包含两种数据结构——栈和队列。这是一种很是特殊的阻塞队列,它的特色简要归纳以下:java
注意:上述的特色1,和咱们以前介绍的Exchanger其实很是类似,能够类比Exchanger的功能来理解。
以前提到,SynchronousQueue根据公平/非公平访问策略的不一样,内部使用了两种不一样的数据结构:栈和队列。咱们先来看下对象的构造,SynchronousQueue只有2种构造器:node
/** * 默认构造器. * 默认使用非公平策略. */ public SynchronousQueue() { this(false); }
/** * 指定策略的构造器. */ public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }
能够看到,对于公平策略,内部构造了一个TransferQueue对象,而非公平策略则是构造了TransferStack对象。这两个类都继承了内部类Transferer,SynchronousQueue中的全部方法,其实都是委托调用了TransferQueue/TransferStack的方法:算法
public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** * tranferer对象, 构造时根据策略类型肯定. */ private transient volatile Transferer<E> transferer; /** * Shared internal API for dual stacks and queues. */ abstract static class Transferer<E> { /** * Performs a put or take. * * @param e 非null表示 生产者 -> 消费者; * null表示, 消费者 -> 生产者. * @return 非null表示传递的数据; null表示传递失败(超时或中断). */ abstract E transfer(E e, boolean timed, long nanos); } /** * Dual stack(双栈结构). * 非公平策略时使用. */ static final class TransferStack<E> extends Transferer<E> { // ... } /** * Dual Queue(双端队列). * 公平策略时使用. */ static final class TransferQueue<E> extends Transferer<E> { // ... } // ... }
非公平策略由TransferStack类实现,既然TransferStack是栈,那就有结点。TransferStack内部定义了名为SNode的结点:segmentfault
static final class SNode { volatile SNode next; volatile SNode match; // 与当前结点配对的结点 volatile Thread waiter; // 当前结点对应的线程 Object item; // 实际数据或null int mode; // 结点类型 SNode(Object item) { this.item = item; } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long matchOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = SNode.class; matchOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("match")); nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } // ... }
上述SNode结点的定义中有个mode
字段,表示结点的类型。TransferStack一共定义了三种结点类型,任何线程对TransferStack的操做都会建立下述三种类型的某种结点:性能优化
static final class TransferStack<E> extends Transferer<E> { /** * 未配对的消费者 */ static final int REQUEST = 0; /** * 未配对的生产者 */ static final int DATA = 1; /** * 配对成功的消费者/生产者 */ static final int FULFILLING = 2; volatile SNode head; // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = TransferStack.class; headOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("head")); } catch (Exception e) { throw new Error(e); } } // ... }
SynchronousQueue的入队操做调用了put方法:数据结构
/** * 入队指定元素e. * 若是没有另外一个线程进行出队操做, 则阻塞该入队线程. */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); if (transferer.transfer(e, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); } }
SynchronousQueue的出队操做调用了take方法:并发
/** * 出队一个元素. * 若是没有另外一个线程进行出队操做, 则阻塞该入队线程. */ public E take() throws InterruptedException { E e = transferer.transfer(null, false, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); }
能够看到,SynchronousQueue同样不支持null元素,实际的入队/出队操做都是委托给了transfer方法,该方法返回null表示出/入队失败(一般是线程被中断或超时):框架
/** * 入队/出队一个元素. */ E transfer(E e, boolean timed, long nanos) { SNode s = null; // s表示新建立的结点 // 入参e==null, 说明当前是出队线程(消费者), 不然是入队线程(生产者) // 入队线程建立一个DATA结点, 出队线程建立一个REQUEST结点 int mode = (e == null) ? REQUEST : DATA; for (; ; ) { // 自旋 SNode h = head; if (h == null || h.mode == mode) { // CASE1: 栈为空 或 栈顶结点类型与当前mode相同 if (timed && nanos <= 0) { // case1.1: 限时等待的状况 if (h != null && h.isCancelled()) casHead(h, h.next); else return null; } else if (casHead(h, s = snode(s, e, h, mode))) { // case1.2 将当前结点压入栈 SNode m = awaitFulfill(s, timed, nanos); // 阻塞当前调用线程 if (m == s) { // 阻塞过程当中被中断 clean(s); return null; } // 此时m为配对结点 if ((h = head) != null && h.next == s) casHead(h, s.next); // 入队线程null, 出队线程返回配对结点的值 return (E) ((mode == REQUEST) ? m.item : s.item); } // 执行到此处说明入栈失败(多个线程同时入栈致使CAS操做head失败),则进入下一次自旋继续执行 } else if (!isFulfilling(h.mode)) { // CASE2: 栈顶结点还未配对成功 if (h.isCancelled()) // case2.1: 元素取消状况(因中断或超时)的处理 casHead(h, h.next); else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) { // case2.2: 将当前结点压入栈中 for (; ; ) { SNode m = s.next; // s.next指向原栈顶结点(也就是与当前结点匹配的结点) if (m == null) { // m==null说明被其它线程抢先匹配了, 则跳出循环, 从新下一次自旋 casHead(s, null); s = null; break; } SNode mn = m.next; if (m.tryMatch(s)) { // 进行结点匹配 casHead(s, mn); // 匹配成功, 将匹配的两个结点所有弹出栈 return (E) ((mode == REQUEST) ? m.item : s.item); // 返回匹配值 } else // 匹配失败 s.casNext(m, mn); // 移除原待匹配结点 } } } else { // CASE3: 其它线程正在匹配 SNode m = h.next; if (m == null) // 栈顶的next==null, 则直接弹出, 从新进入下一次自旋 casHead(h, null); else { // 尝试和其它线程竞争匹配 SNode mn = m.next; if (m.tryMatch(h)) casHead(h, mn); // 匹配成功 else h.casNext(m, mn); // 匹配失败(被其它线程抢先匹配成功了) } } } }
整个transfer方法考虑了限时等待的状况,且入队/出队其实都是调用了同一个方法,其主干逻辑就是在一个自旋中完成如下三种状况之一的操做,直到成功,或者被中断或超时取消:性能
为了便于理解,咱们来看下面这个调用示例(假设不考虑限时等待的状况),假设一共有三个线程ThreadA、ThreadB、ThreadC:
①初始栈结构
初始栈为空,head
为栈顶指针,始终指向栈顶结点:
②ThreadA(生产者)执行入队操做
因为此时栈为空,因此ThreadA会进入CASE1,建立一个类型为DATA
的结点:
if (h == null || h.mode == mode) { // CASE1: 栈为空 或 栈顶结点类型与当前mode相同 if (timed && nanos <= 0) { // case1.1: 限时等待的状况 if (h != null && h.isCancelled()) casHead(h, h.next); else return null; } else if (casHead(h, s = snode(s, e, h, mode))) { // case1.2 将当前结点压入栈 SNode m = awaitFulfill(s, timed, nanos); // 阻塞当前调用线程 if (m == s) { // 阻塞过程当中被中断 clean(s); return null; } // 此时m为配对结点 if ((h = head) != null && h.next == s) casHead(h, s.next); // 入队线程null, 出队线程返回配对结点的值 return (E) ((mode == REQUEST) ? m.item : s.item); } // 执行到此处说明入栈失败(多个线程同时入栈致使CAS操做head失败),则进入下一次自旋继续执行 }
CASE1分支中,将结点压入栈后,会调用awaitFulfill
方法,该方法会阻塞调用线程:
/** * 阻塞当前调用线程, 并将线程信息记录在s.waiter字段上. * * @param s 等待的结点 * @return 返回配对的结点 或 当前结点(说明线程被中断了) */ SNode awaitFulfill(SNode s, boolean timed, long nanos) { final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); // 性能优化操做(计算自旋次数) int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (; ; ) { if (w.isInterrupted()) s.tryCancel(); /** * s.match保存当前结点的匹配结点. * s.match==null说明尚未匹配结点 * s.match==s说明当前结点s对应的线程被中断了 */ SNode m = s.match; if (m != null) return m; if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(); continue; } } if (spins > 0) spins = shouldSpin(s) ? (spins - 1) : 0; else if (s.waiter == null) // 尚未匹配结点, 则保存当前线程 s.waiter = w; // s.waiter保存当前阻塞线程 else if (!timed) LockSupport.park(this); // 阻塞当前线程 else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
此时栈结构以下,结点的waiter
字段保存着建立该结点的线程ThreadA,ThreadA等待着被配对消费者线程唤醒:
③ThreadB(生产者)执行入队操做
此时栈顶结点的类型和ThreadB建立的结点相同(都是DATA
类型的结点),因此依然走CASE1分支,直接将结点压入栈:
④ThreadC(消费者)执行出队操做
此时栈顶结点的类型和ThreadC建立的结点匹配(栈顶DATA
类型,ThreadC建立的是REQUEST
类型),因此走CASE2分支,该分支会将匹配的两个结点弹出栈:
else if (!isFulfilling(h.mode)) { // CASE2: 栈顶结点还未配对成功 if (h.isCancelled()) // case2.1: 元素取消状况(因中断或超时)的处理 casHead(h, h.next); else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) { // case2.2: 将当前结点压入栈中 for (; ; ) { SNode m = s.next; // s.next指向原栈顶结点(也就是与当前结点匹配的结点) if (m == null) { // m==null说明被其它线程抢先匹配了, 则跳出循环, 从新下一次自旋 casHead(s, null); s = null; break; } SNode mn = m.next; if (m.tryMatch(s)) { // 进行结点匹配 casHead(s, mn); // 匹配成功, 将匹配的两个结点所有弹出栈 return (E) ((mode == REQUEST) ? m.item : s.item); // 返回匹配值 } else // 匹配失败 s.casNext(m, mn); // 移除原待匹配结点 } } }
上述isFulfilling方法就是判断结点是否匹配:
/** * 判断m是否已经配对成功. */ static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
ThreadC建立结点并压入栈后,栈的结构以下:
此时,ThreadC会调用tryMatch方法进行匹配,该方法的主要做用有两点:
match
字段置为与当前配对的结点(如上图中,结点m是待配对结点,最终m.math == s
)/** * 尝试将当前结点和s结点配对. */ boolean tryMatch(SNode s) { if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { Thread w = waiter; if (w != null) { // 唤醒当前结点对应的线程 waiter = null; LockSupport.unpark(w); } return true; } return match == s; // 配对成功返回true }
匹配完成后,会将匹配的两个结点弹出栈,并返回匹配值:
if (m.tryMatch(s)) { // 进行结点匹配 casHead(s, mn); // 匹配成功, 将匹配的两个结点所有弹出栈 return (E) ((mode == REQUEST) ? m.item : s.item); // 返回匹配值 }
最终,ThreadC拿到了等待配对结点中的数据并返回,此时栈的结构以下:
注意: CASE2分支中ThreadC建立的结点的mode值并非REQUEST,其mode值为FULFILLING | mode
,FULFILLING | mode
的主要做用就是给栈顶结点置一个标识(二进制为11或10), 表示当前有线程正在对栈顶匹配,这时若是有其它线程进入自旋(并发状况),则CASE2必定失败,由于isFulfilling
的结果必然为true,因此会进入 CASE3分支——跳过栈顶结点进行匹配。
casHead(h, s = snode(s, e, h, FULFILLING | mode))
⑤ThreadB(生产者)唤醒后继续执行
ThreadB被唤醒后,会从原阻塞处继续执行,并进入下一次自旋,在下一次自旋中,因为结点的match
字段已经有了匹配结点,因此直接返回配对结点:
/** * 阻塞当前调用线程, 并将线程信息记录在s.waiter字段上. * * @param s 等待的结点 * @return 返回配对的结点 或 当前结点(说明线程被中断了) */ SNode awaitFulfill(SNode s, boolean timed, long nanos) { final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); // 性能优化操做(计算自旋次数) int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (; ; ) { if (w.isInterrupted()) s.tryCancel(); /** * s.match保存当前结点的匹配结点. * s.match==null说明尚未匹配结点 * s.match==s说明当前结点s对应的线程被中断了 */ SNode m = s.match; if (m != null) return m; if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(); continue; } } if (spins > 0) spins = shouldSpin(s) ? (spins - 1) : 0; else if (s.waiter == null) // 尚未匹配结点, 则保存当前线程 s.waiter = w; // s.waiter保存当前阻塞线程 else if (!timed) LockSupport.park(this); // 阻塞当前线程 else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
最终,在下面分支中返回:
else if (casHead(h, s = snode(s, e, h, mode))) { // case1.2 将当前结点压入栈 SNode m = awaitFulfill(s, timed, nanos); // 阻塞当前调用线程 if (m == s) { // 阻塞过程当中被中断 clean(s); return null; } // 此时m为配对结点 if ((h = head) != null && h.next == s) casHead(h, s.next); // 入队线程null, 出队线程返回配对结点的值 return (E) ((mode == REQUEST) ? m.item : s.item); }
注意:对于 入队线程(生产者),返回的是它入队时携带的 原有元素值。
SynchronousQueue的公平策略由TransferQueue类实现,TransferQueue内部定义了名为QNode
的结点,一个head
队首指针,一个tail
队尾指针:
/** * Dual Queue(双端队列). * 公平策略时使用. */ static final class TransferQueue<E> extends Transferer<E> { /** * Head of queue */ transient volatile QNode head; /** * Tail of queue */ transient volatile QNode tail; /** * Reference to a cancelled node that might not yet have been * unlinked from queue because it was the last inserted node * when it was cancelled. */ transient volatile QNode cleanMe; /** * 队列结点定义. */ static final class QNode { volatile QNode next; // next node in queue volatile Object item; // CAS'ed to or from null volatile Thread waiter; // to control park/unpark final boolean isData; // ... } // ... }
关于TransferQueue的transfer方法就再也不赘述了,其思路和TransferStack大体相同,总之就是入队/出队必须一一匹配,不然任意一方就会加入队列并等待匹配线程唤醒。读者能够自行阅读TransferQueued的源码。
TransferQueue主要用于线程之间的数据交换,因为采用无锁算法,其性能通常比单纯的其它阻塞队列要高。它的最大特色时不存储实际元素,而是在内部经过栈或队列结构保存阻塞线程。后面咱们讲JUC线程池框架的时候,还会再次看到它的身影。