系列传送门:java
SynchronousQueue是一个不存储元素的阻塞队列,每一个插入的操做必须等待另外一个线程进行相应的删除操做,反之亦然,所以这里的Synchronous指的是读线程和写线程须要同步,一个读线程匹配一个写线程。node
你不能在该队列中使用peek方法,由于peek是只读取不移除,不符合该队列特性,该队列不存储任何元素,数据必须从某个写线程交给某个读线程,而不是在队列中等待倍消费,很是适合传递性场景。面试
SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。编程
该类还支持可供选择的公平性策略,默认采用非公平策略,当队列可用时,阻塞的线程均可以争夺访问队列的资格。并发
public class TestSync { public static void main (String[] args) { SynchronousQueue<Integer> queue = new SynchronousQueue<>(true); Producer producer = new Producer(queue); Customer customer = new Customer(queue); producer.start(); customer.start(); } } class Producer extends Thread{ SynchronousQueue<Integer> queue; Producer(SynchronousQueue<Integer> queue){ this.queue = queue; } @SneakyThrows @Override public void run () { while(true){ int product = new Random().nextInt(500); System.out.println("生产产品, id : " + product); System.out.println("等待3s后给消费者消费..."); TimeUnit.SECONDS.sleep(3); queue.put(product); TimeUnit.MILLISECONDS.sleep(100); } } } class Customer extends Thread{ SynchronousQueue<Integer> queue; Customer(SynchronousQueue<Integer> queue){ this.queue = queue; } @SneakyThrows @Override public void run () { while(true){ Integer product = queue.take(); System.out.println("消费产品, id : " + product); System.out.println(); } } } // 打印结果 生产产品, id : 194 等待3s后给消费者消费... 消费产品, id : 194 生产产品, id : 140 等待3s后给消费者消费... 消费产品, id : 140 生产产品, id : 40 等待3s后给消费者消费... 消费产品, id : 40
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // put方法 : e是生产者传递给消费者的元素 if (transferer.transfer(e, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); } }
public E take() throws InterruptedException { // take方法: 表示消费者等待生产者提供元素 E e = transferer.transfer(null, false, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); }
put方法和take方法都调用了transferer的transfer方法,他们的区别在哪呢?咱们能够发现:app
这一点必须明确,transfer是根据这一点来判断读or写线程,接着决定是否匹配等,直接来看下Transfer类吧。dom
public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private transient volatile Transferer<E> transferer; }
SynchronousQueue内部维护了volatile修饰的Transferer变量,它的核心操做都将委托给transferer。ide
abstract static class Transferer<E> { /** * Performs a put or take. */ abstract E transfer(E e, boolean timed, long nanos); }
Transferer类中定义了抽象方法transfer,该方法用于转移元素,是最最核心的方法,咱们先大概了解一下定义:工具
// 默认使用非公平策略 public SynchronousQueue() { this(false); } /** * 指定公平策略, */ public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }
能够发现,在构造SynchronousQueue的时候,能够传入fair参数指定公平策略,有下面两种选择:oop
他俩即是Transfer类的实现,SynchronousQueue相关操做也都是基于这俩类的,咱们接下来将会重点分析这俩的实现。
static final class TransferQueue<E> extends Transferer<E> { static final class QNode{...} transient volatile QNode head; transient volatile QNode tail; transient volatile QNode cleanMe; TransferQueue() { QNode h = new QNode(null, false); // 初始化虚拟头节点 head = h; tail = h; }
QNode定义了队列中存放的节点:
static final class QNode { volatile QNode next; // next域 volatile Object item; // 存放数据,用CAS设置 volatile Thread waiter; // 标记在该节点上等待的线程是哪一个 final boolean isData; // isData == true表示写线程节点 QNode(Object item, boolean isData) { this.item = item; this.isData = isData; } // ...省略一系列CAS方法 }
E transfer(E e, boolean timed, long nanos) { QNode s = null; // constructed/reused as needed // 判断当前节点的模式 boolean isData = (e != null); // 循环 for (;;) { QNode t = tail; QNode h = head; if (t == null || h == null) // saw uninitialized value continue; // spin // 队列为空 或 当前节点和队列尾节点类型相同,则将节点入队 if (h == t || t.isData == isData) { // empty or same-mode QNode tn = t.next; // 说明有其余节点入队,致使读到的tail不一致,continue if (t != tail) // inconsistent read continue; // 有其余节点入队,可是tail是一致的,尝试将tn设置为尾节点,continue if (tn != null) { // lagging tail advanceTail(t, tn); // 若是tail为t,设置为tn continue; } // timed == true 而且超时了, 直接返回null if (timed && nanos <= 0) // can't wait return null; // 构建一个新节点 if (s == null) s = new QNode(e, isData); // 将当前节点插入到tail以后,如不成功,则continue if (!t.casNext(null, s)) // failed to link in continue; // 将当前节点设置为新的tail advanceTail(t, s); // swing tail and wait // 这个方法下面会分析:自旋或阻塞线程,直到知足s.item != e Object x = awaitFulfill(s, e, timed, nanos); // x == s 表示节点被取消、中断或超时 if (x == s) { // wait was cancelled clean(t, s); return null; } // isOffList用于判断节点是否已经出队 next == this if (!s.isOffList()) { // not already unlinked // 尝试将s节点设置为head advanceHead(t, s); // unlink if head if (x != null) // and forget fields s.item = s; s.waiter = null; } return (x != null) ? (E)x : e; // 队列不为空 且节点类型不一样,一个读一个写,就能够匹配了 } else { // complementary-mode // 队头节点 QNode m = h.next; // node to fulfill // 这里若是其余线程对队列进行了操做,就从新再来 if (t != tail || m == null || h != head) continue; // inconsistent read // 下面是出队的代码 Object x = m.item; //isData == (x != null) 判断isData的类型是否和队头节点类型相同 // x == m 表示m被取消了 // !m.casItem(x, e))表示将e设置为m的item失败 if (isData == (x != null) || // m already fulfilled x == m || // m cancelled !m.casItem(x, e)) { // lost CAS // 上面三种状况,任意一种发生,都进行h的出队操做,m变成head,而后重试 advanceHead(h, m); // dequeue and retry continue; } // 匹配成功,将m变为head,虚拟节点 advanceHead(h, m); // successfully fulfilled // 唤醒在m上等待的线程 LockSupport.unpark(m.waiter); // 获得数据 return (x != null) ? (E)x : e; } } }
这个方法将会进行自旋或者阻塞,直到知足某些条件。
//Spins/blocks until node s is fulfilled. Object awaitFulfill(QNode s, E e, boolean timed, long nanos) { /* Same idea as TransferStack.awaitFulfill */ final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); // 计算须要自旋的次数 // 若是刚好 s 正好是第一个加入的节点,则会自旋一段时间,避免阻塞,提升效率 // 由于其余状况是会涉及到 park挂起线程的 int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { // w为当前线程,若是被中断了,则取消该节点 if (w.isInterrupted()) s.tryCancel(e); Object x = s.item; // 知足这个条件,才会退出循环,也是惟一的出口 // 若是 线程一、被阻塞,接着唤醒或者二、中断了,x != e 就会成立 if (x != e) return x; // 若是设置了timed,须要判断一下是否超时 if (timed) { nanos = deadline - System.nanoTime(); // 若是超时,取消该节点,continue,下一次在 x!=e时退出循环 if (nanos <= 0L) { s.tryCancel(e); continue; } } // 每次减小自旋次数 if (spins > 0) --spins; // 次数用完了,设置一下s的等待线程为当前线程 else if (s.waiter == null) s.waiter = w; // 没有超时设置的阻塞 else if (!timed) LockSupport.park(this); // 剩余时间小于spinForTimeoutThreshold的时候,自旋性能的效率更高 else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
这边总结一下一些注意点:
取消操做其实就是将节点的item设置为this,
void tryCancel(Object cmp) { UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this); } boolean isCancelled() { return item == this; }
也就是说,若是一旦执行了tryCancel操做【中断,取消,超时】,退出awaitFulfill以后,必定知足:
// x == s 表示节点被取消、中断或超时 if (x == s) { // wait was cancelled clean(t, s); return null; }
会执行clean方法清理s节点:
void clean(QNode pred, QNode s) { s.waiter = null; // 清除thread引用 /* * 不管什么时候,队列中的最后一个节点都没法删除,所以使用cleanMe保存它的前驱 */ while (pred.next == s) { QNode h = head; QNode hn = h.next; // Absorb cancelled first node as head // 队头被取消的状况,出队 if (hn != null && hn.isCancelled()) { advanceHead(h, hn); continue; } QNode t = tail; // Ensure consistent read for tail if (t == h) // 队列此时为空,就退出了 return; QNode tn = t.next; if (t != tail) // 队尾并发改变了 continue; // tn一直定位到为null if (tn != null) { advanceTail(t, tn); continue; } // 这里 s!= t 表示没有到要删除的元素不是最后一个, // 那么直接将pred.next = s.next就能够了 if (s != t) { // If not tail, try to unsplice QNode sn = s.next; if (sn == s || pred.casNext(s, sn)) // 删除完毕,退出 return; } // 走到这里,说明须要删除的s节点是队尾节点,须要使用cleanMe QNode dp = cleanMe; if (dp != null) { // Try unlinking previous cancelled node // d这里指的就是 要删除的节点 QNode d = dp.next; QNode dn; if (d == null || // d is gone or d == dp || // d is off list or !d.isCancelled() || // d not cancelled or (d != t && // d not tail and (dn = d.next) != null && // has successor dn != d && // that is on list dp.casNext(d, dn))) // d unspliced casCleanMe(dp, null); // 清除cleanMe if (dp == pred) return; // s is already saved node // 该分支将dp定位到 pred的位置【第一次应该都会走到这】 } else if (casCleanMe(null, pred)) return; // Postpone cleaning s } }
注意:不管什么时候, 最后插入的节点不能被删除,由于直接删除会存在并发风险,当节点s是最后一个节点时, 将s.pred保存为cleamMe节点,下次再进行清除操做。
transfer就是在一个循环中,不断地去作下面这些事情:
注意:不管是上面哪一种状况,都会不断检测是否有其余线程在进行操做,若是有的话,会帮助其余线程执行入队出队操做。
TransferStack就大体过一下吧:
static final class TransferStack<E> extends Transferer<E> { // 表示一个未匹配的消费者 static final int REQUEST = 0; // 表明一个未匹配的生产者 static final int DATA = 1; // 表示匹配另外一个生产者或消费者 static final int FULFILLING = 2; // 头节点 volatile SNode head; // SNode节点定义 static final class SNode {...}
static final class SNode { volatile SNode next; // next node in stack volatile SNode match; // the node matched to this volatile Thread waiter; // to control park/unpark Object item; // data; or null for REQUESTs int mode; // Note: item and mode fields don't need to be volatile // since they are always written before, and read after, // other volatile/atomic operations. SNode(Object item) { this.item = item; } }
E transfer(E e, boolean timed, long nanos) { SNode s = null; // constructed/reused as needed int mode = (e == null) ? REQUEST : DATA; // e为null表示读,非null表示写 for (;;) { SNode h = head; // 若是栈为空,或者节点模式和头节点模式相同, 将节点压入栈 if (h == null || h.mode == mode) { // empty or same-mode // 处理超时 if (timed && nanos <= 0) { // can't wait if (h != null && h.isCancelled()) // 头节点弹出 casHead(h, h.next); // pop cancelled node else return null; //未超时状况,生成snode节点,尝试将s设置为头节点 } else if (casHead(h, s = snode(s, e, h, mode))) { // 自旋,等待线程匹配 SNode m = awaitFulfill(s, timed, nanos); // 表示节点被取消、或中断、或超时 if (m == s) { // wait was cancelled // 清理节点 clean(s); return null; } if ((h = head) != null && h.next == s) casHead(h, s.next); // help s's fulfiller // 若是是请求数据,则返回匹配的item, 不然返回s的item return (E) ((mode == REQUEST) ? m.item : s.item); } // 栈不为空, 且模式不相等,说明是一对匹配的节点 // 尝试用节点s 去知足 h, 这里判断 (m & FULFILLING) == 0会走这个分支 } else if (!isFulfilling(h.mode)) { // try to fulfill // h已经被取消了 if (h.isCancelled()) // already cancelled casHead(h, h.next); // pop and retry // 将当前节点 标记为FULFILLING, 并设置为head else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { for (;;) { // loop until matched or waiters disappear // 这里m是头节点 SNode m = s.next; // m is s's match // 说明被其余线程抢走了,从新设置head if (m == null) { // all waiters are gone casHead(s, null); // pop fulfill node s = null; // use new node next time break; // restart main loop } // 获得与m匹配的节点 SNode mn = m.next; // 尝试去匹配,匹配成功会唤醒等待的线程 if (m.tryMatch(s)) { // 匹配成功,两个都弹出 casHead(s, mn); // pop both s and m // 返回数据节点的值 m.item return (E) ((mode == REQUEST) ? m.item : s.item); } else // lost match s.casNext(m, mn); // help unlink } } // 走到这,表示有其余线程在进行配对(m & FULFILLING) != 0 // 帮助进行匹配,接着执行出栈操做 } else { // help a fulfiller SNode m = h.next; // m is h's match if (m == null) // waiter is gone casHead(h, null); // pop fulfilling node else { SNode mn = m.next; if (m.tryMatch(h)) // help match casHead(h, mn); // pop both h and m else // lost match h.casNext(m, mn); // help unlink } } } }
transfer方法其实就是在一个循环中持续地去作下面三件事情:
FULFILLING
标记,将当前线程压入栈顶,和栈中的节点进行匹配,匹配成功,出栈这两个节点。isFulfilling(h.mode)
,则帮助它进行匹配并出栈,再执行后续操做。SynchronousQueue是一个不存储元素的阻塞队列,每一个插入的操做必须等待另外一个线程进行相应的删除操做,反之亦然,所以这里的Synchronous指的是读线程和写线程须要同步,一个读线程匹配一个写线程。
该类还支持可供选择的公平性策略,针对不一样的公平性策略有两种不一样的Transfer实现,TransferQueue实现公平模式和TransferStack实现非公平模式。
take和put操做都调用了transfer核心方法,根据传入的参数e是否为null来对应处理。
最后:Synchronous好抽象啊,好难懂,有不少地方画了图也是很难理解,若有不足,望评论区指教。
《Java并发编程的艺术》
《Java并发编程之美》