今天继续来说解阻塞队列,一个比较特殊的阻塞队列SynchronousQueue,经过Executors框架提供的线程池cachedThreadPool中咱们能够看到其被使用做为可缓存线程池的队列实现,下面经过源码来了解其内部实现,便于后面帮助咱们更好的使用线程池java
JDK版本号:1.8.0_171
synchronousQueue是一个没有数据缓冲的阻塞队列,生产者线程的插入操做put()必须等待消费者的删除操做take(),反过来也同样。固然,也能够不进行等待直接返回,例如poll和offernode
在使用上很好理解,每次操做都须要找到对应的匹配操做,如A线程经过put插入操做填入值1,若是无其余线程操做则须要阻塞等待一个线程执行take操做A线程才能继续,反过来一样道理,这样看彷佛synchronousQueue是没有队列进行保存数据的,每次操做都在等待其互补操做一块儿执行缓存
这里和其余阻塞队列不一样之处在于,内部类将入队出队操做统一封装成了一个接口实现,内部类数据保存的是每一个操做动做,好比put操做,保存插入的值,并根据标识来判断是入队仍是出队操做,若是是take操做,则值为null,经过标识符能判断出来是出队操做框架
多思考下,咱们须要找到互补的操做必然须要一个公共的区域来判断已经发生的全部操做,内部类就是用来进行这些操做的,SynchronousQueue分为公平策略(FIFO)和非公平策略(LIFO),两种策略分别对应其两个内部类实现,公平策略使用队列结构实现,非公平策略使用栈结构实现ide
因为篇幅过长,本篇先说明SynchronousQueue相关知识和公平策略下的实现类TransferQueue,下篇将说明非公平策略下的实现类TransferStack和其余知识ui
public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable
/** The number of CPUs, for spin control */ // cpu数量,会在自旋控制时使用 static final int NCPUS = Runtime.getRuntime().availableProcessors(); // 自旋次数,指定了超时时间时使用,这个常量配合CAS操做使用,至关于循环次数 // 若是CAS操做失败,则根据这个参数判断继续循环 static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32; // 自旋次数,未指定超时时间时使用 static final int maxUntimedSpins = maxTimedSpins * 16; /** * The number of nanoseconds for which it is faster to spin * rather than to use timed park. A rough estimate suffices. */ // 自旋超时时间阈值,在设置的时间超过这个时间时以这个时间为准,单位,纳秒 static final long spinForTimeoutThreshold = 1000L; // 后进先出队列和先进先出队列 @SuppressWarnings("serial") static class WaitQueue implements java.io.Serializable { } static class LifoWaitQueue extends WaitQueue { private static final long serialVersionUID = -3633113410248163686L; } static class FifoWaitQueue extends WaitQueue { private static final long serialVersionUID = -3623113410248163686L; } // 序列化操做使用 private ReentrantLock qlock; private WaitQueue waitingProducers; private WaitQueue waitingConsumers; /** * The transferer. Set only in constructor, but cannot be declared * as final without further complicating serialization. Since * this is accessed only at most once per public method, there * isn't a noticeable performance penalty for using volatile * instead of final here. */ // 全部的队列操做都经过transferer来执行,统一方法执行 // 初始化时会根据所选的策略实例化对应的内部实现类 private transient volatile Transferer<E> transferer;
从上边也能看出没有设置变量来保存入队出队操做的数据,统一操做方法都放置到了Transferer中this
构造方法很清晰,根据所选的策略实现对应的Transferer内部接口实现类来进行队列操做idea
// 默认非公平策略 public SynchronousQueue() { this(false); } // 可选策略,经过两个内部类TransferQueue和TransferStack来实现公平策略(队列)和非公平策略(栈) public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }
抽象内部类Transferer,transfer方法用来替代put和take操做,每一个参数解释以下:spa
返回值:非空则代表操做成功,返回消费的item或生产的item;空则代表因为超时或中断引发操做失败。调用者能够经过检查Thread.interrupted判断是哪一种缘由线程
/** * Shared internal API for dual stacks and queues. */ abstract static class Transferer<E> { /** * Performs a put or take. * * @param e if non-null, the item to be handed to a consumer; * if null, requests that transfer return an item * offered by producer. * @param timed if this operation should timeout * @param nanos the timeout, in nanoseconds * @return if non-null, the item provided or received; if null, * the operation failed due to timeout or interrupt -- * the caller can distinguish which of these occurred * by checking Thread.interrupted. */ abstract E transfer(E e, boolean timed, long nanos); }
入队操做经过内部类调用transfer,传参含义以下已在上面内部抽象类中说明,入队元素e非空
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); if (transferer.transfer(e, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); } } public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); if (transferer.transfer(e, true, unit.toNanos(timeout)) != null) return true; if (!Thread.interrupted()) // 超时返回false return false; // 线程中断抛错 throw new InterruptedException(); } public boolean offer(E e) { if (e == null) throw new NullPointerException(); return transferer.transfer(e, true, 0) != null; }
出队操做经过内部类调用transfer,入队元素e为null
public E take() throws InterruptedException { E e = transferer.transfer(null, false, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); } public E poll(long timeout, TimeUnit unit) throws InterruptedException { E e = transferer.transfer(null, true, unit.toNanos(timeout)); if (e != null || !Thread.interrupted()) return e; throw new InterruptedException(); } public E poll() { return transferer.transfer(null, true, 0); }
其余方法以空队列为标准进行处理,好比队列长度直接返回0,判空老是返回true,其余方法相似,直接参考源码,比较简单,很少说
上面已经看到了最重要的核心方法在于transferer.transfer方法,那么其具体的实现类中这个方法是如何实现的呢?
先说明公平策略下的实现类TransferQueue
基于Transferer实现公平策略下的实现类TransferQueue,既然是公平策略,则须要先进先出,这里queue也代表其结构特色,内部经过QNode类实现链表的队列形态,经过CAS操做更新链表元素
有两种状态须要注意:
QNode即为队列的链表实现,其中的变量属性isData也能够看出其保存的是每次的操做动做而不只仅是入队的值,入队操做会以QNode保存,出队操做一样会以QNode保存,同时变量都是经过CAS操做更新
static final class QNode { // next指向链表下一个节点 volatile QNode next; // next node in queue // 队列元素的值 volatile Object item; // CAS'ed to or from null // 保存等待的线程 volatile Thread waiter; // to control park/unpark // 是否有数据,队列元素的类型标识,入队时有数据值为true,出队时无数据值为false final boolean isData; QNode(Object item, boolean isData) { this.item = item; this.isData = isData; } // cas操做更新next boolean casNext(QNode cmp, QNode val) { return next == cmp && UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // cas操做更新item boolean casItem(Object cmp, Object val) { return item == cmp && UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } // cas操做取消操做,将当前的QNode的item赋值为当前的QNode void tryCancel(Object cmp) { UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this); } // 判断是否取消成功,紧跟着tryCancel操做后进行判断 boolean isCancelled() { return item == this; } // 判断当前节点是否已处于离队状态,这里看到是将节点next指向本身 boolean isOffList() { return next == this; } // 获取item和next的偏移量,操做CAS使用 // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = QNode.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
队头队尾元素引用设置,须要注意的是cleanMe节点的含义,在具体方法操做中会进行说明
/** Head of queue */ // 队头 transient volatile QNode head; /** Tail of queue */ // 队尾 transient volatile QNode tail; /** * Reference to a cancelled node that might not yet have been * unlinked from queue because it was the last inserted node * when it was cancelled. */ // 标记节点,清理链表尾部节点时,不直接删除尾部节点,而是将尾节点的前驱节点next指向设置为cleanMe // 防止此时向尾部插入节点的线程失败致使出现数据问题 transient volatile QNode cleanMe; // 偏移量获取 private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; private static final long tailOffset; private static final long cleanMeOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = TransferQueue.class; headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head")); tailOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("tail")); cleanMeOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("cleanMe")); } catch (Exception e) { throw new Error(e); } }
头尾节点初始化操做
TransferQueue() { // 初始化一个值为null的QNode,初始化头尾节点 QNode h = new QNode(null, false); // initialize to dummy node. head = h; tail = h; }
CAS更新变量操做
/** * Tries to cas nh as new head; if successful, unlink * old head's next node to avoid garbage retention. */ // 尝试将nh更新为新的队头 void advanceHead(QNode h, QNode nh) { if (h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh)) // 原头节点next指向更新为本身,使得h为离队状态,isOffList方法为true h.next = h; // forget old next } /** * Tries to cas nt as new tail. */ // 尝试更新队尾节点 void advanceTail(QNode t, QNode nt) { if (tail == t) UNSAFE.compareAndSwapObject(this, tailOffset, t, nt); } /** * Tries to CAS cleanMe slot. */ // 尝试更新cleanMe节点 boolean casCleanMe(QNode cmp, QNode val) { return cleanMe == cmp && UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val); }
入队和出队操做,统一使用一个方法,即实现接口中的transfer方法来完成,须要明白的是保存的是每次操做这个动做
/** * Puts or takes an item. */ @SuppressWarnings("unchecked") E transfer(E e, boolean timed, long nanos) { QNode s = null; // constructed/reused as needed // e为null时至关于出队操做isData为false,入队操做为true boolean isData = (e != null); for (;;) { // 获取最新的尾节点和头节点 QNode t = tail; QNode h = head; // 头,尾节点为空,未初始化,则循环spin if (t == null || h == null) // saw uninitialized value continue; // spin // 首尾节点相同则为空队列或尾节点类型和新操做的类型相同,都是入队操做或出队操做 // 为什么只判断尾部,由于若是头节点和尾结点不一样在队列中不可能存在 // 一入队和一出队直接进入else匹配上不会再保存在链表中 if (h == t || t.isData == isData) { // empty or same-mode QNode tn = t.next; // 尾节点已经被其余线程更新修改,则从新循环判断 if (t != tail) // inconsistent read continue; // tn非空,说明其余线程已经添加了节点,尝试更新尾节点,从新循环判断 if (tn != null) { // lagging tail advanceTail(t, tn); continue; } // 设置超时时间而且超时时间小于等于0则直接返回null if (timed && nanos <= 0) // can't wait return null; // s为null则初始化节点s if (s == null) s = new QNode(e, isData); // 尝试将s添加到尾节点的next上,失败则从新循环 if (!t.casNext(null, s)) // failed to link in continue; // 尝试更新尾节点,尾节点此时为s advanceTail(t, s); // swing tail and wait // 经过awaitFulfill方法自旋阻塞找到匹配操做的节点item,这个下面进行说明 Object x = awaitFulfill(s, e, timed, nanos); // 表示当前线程已经中断或者超时,在awaitFulfill超时或者中断时更新s.item指向本身 if (x == s) { // wait was cancelled // 清理节点,取消本次操做 clean(t, s); return null; } // 判断s是否已从队列移除,正常状况下,出队和入队操做匹配上s节点确定是须要被清理掉的 if (!s.isOffList()) { // not already unlinked // 未被从队列清除则尝试更新队头 advanceHead(t, s); // unlink if head // 当前线程为出队操做时,s节点取消操做 if (x != null) // and forget fields s.item = s; // 清除等待线程 s.waiter = null; } return (x != null) ? (E)x : e; // 与上次队列操做非同一类型操做,上次入队,此次为出队,上次出队,此次为入队才会执行 // 匹配操做才会执行下面逻辑 } else { // complementary-mode QNode m = h.next; // node to fulfill // 头节点或尾节点被其余线程更新或者为空队列则循环操做 if (t != tail || m == null || h != head) continue; // inconsistent read // 头节点的下一个节点对应的item Object x = m.item; // 同类型,被取消操做或更新item失败则更新头节点指向从新操做 if (isData == (x != null) || // m already fulfilled 相同类型操做说明m已经被其余线程操做匹配 x == m || // m cancelled 取消操做标识 // CAS更新item为匹配上的操做值,好比当前是出队操做,m为入队操做x为入队的值,那么此时要替换为出队值null // CAS操做失败 !m.casItem(x, e)) { // lost CAS // 删除匹配上的头节点,更新头节点 advanceHead(h, m); // dequeue and retry continue; } // 更新头节点 advanceHead(h, m); // successfully fulfilled // 释放m的等待线程锁使得m操做结束 LockSupport.unpark(m.waiter); return (x != null) ? (E)x : e; } } }
在transfer相同类型操做时被调用,正常状况下(不算超时和中断)阻塞线程直到与之匹配的操做到来再继续执行
/** * Spins/blocks until node s is fulfilled. * * @param s the waiting node * @param e the comparison value for checking match * @param timed true if timed wait * @param nanos timeout value * @return matched item, or s if cancelled */ // 自旋或阻塞直到超时或被唤醒匹配上节点 // 好比此时是入队操做,上次也是入队操做,在未设置超时时,这里可能须要自旋或阻塞等待一个出队操做来唤醒本次入队操做 // 至关于互补匹配上同时继续完成后续操做,出队操做拿走入队操做的值才能完成 入队操做被出队操做获取值才能完成 Object awaitFulfill(QNode s, E e, boolean timed, long nanos) { /* Same idea as TransferStack.awaitFulfill */ // 获取超时时间点 final long deadline = timed ? System.nanoTime() + nanos : 0L; // 当前线程 Thread w = Thread.currentThread(); // 仅在head.next==s时才使用spins(自旋次数),同时判断是否设置了超时 // 非head.next则不走spins,至关于只是在第一次操做入链表时执行自旋spins操做,不是上来就进行阻塞 // 也能明白,在入队和出队操做匹配时 新操做是和头节点匹配,故自旋必定次数而不是直接阻塞来提高执行效率,减小线程切换开销 int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { // 判断当前线程是否中断,外部中断操做,至关于取消本次操做 if (w.isInterrupted()) // 尝试将s节点的item设置为s本身,这样判断的时候就知道这个节点是被取消的 s.tryCancel(e); Object x = s.item; // s的item已经改变,直接返回x // 没改变的状况下即没有匹配的操做,有匹配上的item即x将被改变,取消时如上也会改变,以下超时也会改变 // 故return后还须要要区分出取消和超时的状况 if (x != e) return x; // 线程超时将s节点的item设置为s本身 if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(e); continue; } } // 须要自旋时循环 if (spins > 0) --spins; // 设置s的等待线程 else if (s.waiter == null) s.waiter = w; // 未设置超时,直接阻塞 else if (!timed) LockSupport.park(this); // 设置超时时间阻塞 else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
清理s节点,同时须要关注cleanMe节点,总体处理过程以下:
/** * Gets rid of cancelled node s with original predecessor pred. */ // 中断取消操做将pred节点代替s节点,修改先后节点之间的关联 void clean(QNode pred, QNode s) { // 清理前先将等待线程置空 s.waiter = null; // forget thread // pred与s的先后关系 while (pred.next == s) { // Return early if already unlinked QNode h = head; QNode hn = h.next; // Absorb cancelled first node as head // hn非空且被取消操做,更新头节点为hn if (hn != null && hn.isCancelled()) { advanceHead(h, hn); continue; } // 尾节点 QNode t = tail; // Ensure consistent read for tail // 空队列返回 if (t == h) return; // 尾节点下一个 QNode tn = t.next; // 尾节点已被其余线程更新 if (t != tail) continue; // 非空 更新尾节点 if (tn != null) { advanceTail(t, tn); continue; } // s不是尾节点 if (s != t) { // If not tail, try to unsplice // s的下一个节点 QNode sn = s.next; // 更新pred节点后一个节点为s的下一个节点,至关于删除s在链表中的关系 if (sn == s || pred.casNext(s, sn)) return; } // 执行到这里说明s为尾节点则须要处理cleanMe节点 QNode dp = cleanMe; if (dp != null) { // Try unlinking previous cancelled node // 被清除的节点,从下面else部分代码也能够看出若是为空,传入的节点为清理节点的前一个节点 // 这里表明上次须要清理的cleanMe节点 // 这里d表明真正须要清理的节点即dp.next QNode d = dp.next; QNode dn; if (d == null || // 清除节点为null,至关于已经清理了 d == dp || // dp节点处于离队状态 !d.isCancelled() || // 清除节点被取消 (d != t && // 清除节点非尾节点 (dn = d.next) != null && // 清除节点下一节点非null dn != d && // 清除节点下一节点在队列中 dp.casNext(d, dn))) // 清理d与其余节点的关系 casCleanMe(dp, null); // 清理完毕设置为null // 至关于s为须要清理的节点,上边已经清理过了,不须要再次清理 if (dp == pred) return; // s is already saved node // 更新cleanMe为pred,为下次清理准备 } else if (casCleanMe(null, pred)) return; // Postpone cleaning s } }
TransferQueue的源码操做上面已经说明完毕,为了更好的理解其内部数据转换,举个例子同时画图进行说明方便各位理解:
public class SynchronousQueueTest { public static void main(String[] args) { BlockingQueue<String> sq = new SynchronousQueue<>(true); new Thread(() -> { try { System.out.println(sq.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { sq.put("test"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }
上面代码很简单,公平模式下,一个线程执行take操做,一个线程执行put操做,那么SynchronousQueue内部是如何处理的呢?咱们以图进行说明
1.建立公平策略下的SynchronousQueue
2.一线程执行take操做,以先执行take的线程为例子进行说明,此时另外一线程put操做还未执行,take操做阻塞等待
3.另外一线程执行put操做,唤醒以前阻塞等待的take操做,同时处理完成
以后会进行节点的清理和头尾节点的指向更新,这部分自行读者可自行画图理解
SynchronousQueue是一个无数据缓冲的阻塞队列,在不进行超时和中断的状况下,入队操做需匹配出队操做才能继续执行下去,至关于进行互补操做,同时执行,成双成对完成,在理解这点的基础上,咱们能够看到其拥有如下特色:
非公平策略下的实现类TransferStack和其余知识将放在下篇文章进行说明
以上内容若有问题欢迎指出,笔者验证后将及时修正,谢谢