http://m.blog.csdn.net/blog/luoyuyou/30256497java
背景:node
一个BlockingQueue的是一个这样的队列,每一个插入操做都必须等待另外一个删除操做,反过来也同样。一个同步队列没有内部容量这个概念。你不能使用peek操做,由于一个元素仅在你试着删除它的时候才可以被取得。你不能插入一个元素(任何方法),直到另外一个线程试着删除它。你不能迭代它们,由于没有东西能够被迭代。queue的头元素head是第一个进入队列中的元素,而且插入线程正在为它等待。若是队列中没有像以上的元素存在,那么调用poll的方法会返回null。对于Collection的其余方法(好比contains),SynchronousQueue表现得像一个空的集合。它不容许null入队。
算法
这个队列相似于CSP和Ada中使用的会合信道。它们适合于切换的设计,好比一个线程中的对象必须同步等待另外一个线程中运行的对象从而传递一些信息/事件/任务。数据结构
这个类支持可选的公平策略从而制订生产者和等待者的等待顺序。默认状况下,这个顺序是没有保证的,使用true能够确保队列是FIFO的。app
这个类以及它的迭代器实现了某些Collection和Iterator中的方法。less
算法:ide
这个算法实现了双重栈和双重队列算法。oop
(LIfo)栈用做非公平模式,(Fifo)队列用于公平模式。这二者的性能类似。Fifo常常能在竞争状况下提供更高的吞吐量,可是Lifo可以在通常应用中维持更高的线程局部性。性能
双重队列(以及类似的栈)在任什么时候刻都是持有“数据”--item经过put操做提供或者“请求”--slo经过take操做提供,或者为空。一个调用试着“知足”(一个请求的调用获得数据或者一个数据的调用匹配了请求)结果是出队了一个模式互补的节点。最有趣的的地方在于任何操做都可以明确当前队列处于哪一种模式,以及表现得好像不须要锁。
this
队列和栈都扩展了抽象的Transferer接口,它定义了惟一一个transfer方法用于put或者take。这些方法统一在一个方法下是由于在双重数据结构中,put和take方法是对称的两种方法,因此几乎全部代码能够被组合。结果transfer方法是比较长的,可是这样相对于把他们分红几乎重复的几部分代码仍是更好的。
这个队列和栈共享了不少类似的概念可是不多的具体细节。为了简单性,他们保持不一样从而在后续能够分开演化。
这个同步队列的算法不一样于之前的算法,包括消除的机制。主要的不一样在于:
/** * Puts or takes an item. */ @SuppressWarnings("unchecked") E transfer(E e, boolean timed, long nanos) { QNode s = null; // constructed/reused as needed boolean isData = (e != null); for (;;) { QNode t = tail; QNode h = head; if (t == null || h == null) // saw uninitialized value continue; // spin if (h == t || t.isData == isData) { // empty or same-mode QNode tn = t.next; if (t != tail) // inconsistent read continue; if (tn != null) { // lagging tail advanceTail(t, tn); continue; } if (timed && nanos <= 0) // can't wait return null; if (s == null) s = new QNode(e, isData); if (!t.casNext(null, s)) // failed to link in continue; advanceTail(t, s); // swing tail and wait Object x = awaitFulfill(s, e, timed, nanos); if (x == s) { // wait was cancelled clean(t, s); return null; } if (!s.isOffList()) { // not already unlinked advanceHead(t, s); // unlink if head if (x != null) // and forget fields s.item = s; s.waiter = null; } return (x != null) ? (E)x : e; } else { // complementary-mode QNode m = h.next; // node to fulfill if (t != tail || m == null || h != head) continue; // inconsistent read Object x = m.item; if (isData == (x != null) || // m already fulfilled x == m || // m cancelled !m.casItem(x, e)) { // lost CAS advanceHead(h, m); // dequeue and retry continue; } advanceHead(h, m); // successfully fulfilled LockSupport.unpark(m.waiter); return (x != null) ? (E)x : e; } } }
这里的基本算法是循环试着执行下列行动之一:
接着咱们来看阻塞操做awaitFulfill:
/** * Spins/blocks until node s is fulfilled. * * @param s the waiting node * @param e the comparison value for checking match * @param timed true if timed wait * @param nanos timeout value * @return matched item, or s if cancelled */ Object awaitFulfill(QNode s, E e, boolean timed, long nanos) { /* Same idea as TransferStack.awaitFulfill */ final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { if (w.isInterrupted()) s.tryCancel(e); Object x = s.item; if (x != e) return x; if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(e); continue; } } if (spins > 0) --spins; else if (s.waiter == null) s.waiter = w; else if (!timed) LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
这里的s是根据调用构造的节点,e为传递的数据(null是请求,不然数据),timed为限时标示,nanos时间量(纳秒):
咱们最后看transfer方法中的清理操做:
void clean(QNode pred, QNode s) { s.waiter = null; // forget thread /* * At any given time, exactly one node on list cannot be * deleted -- the last inserted node. To accommodate this, * if we cannot delete s, we save its predecessor as * "cleanMe", deleting the previously saved version * first. At least one of node s or the node previously * saved can always be deleted, so this always terminates. */ while (pred.next == s) { // Return early if already unlinked QNode h = head; QNode hn = h.next; // Absorb cancelled first node as head if (hn != null && hn.isCancelled()) { advanceHead(h, hn); continue; } QNode t = tail; // Ensure consistent read for tail if (t == h) return; QNode tn = t.next; if (t != tail) continue; if (tn != null) { advanceTail(t, tn); continue; } if (s != t) { // If not tail, try to unsplice QNode sn = s.next; if (sn == s || pred.casNext(s, sn)) return; } QNode dp = cleanMe; if (dp != null) { // Try unlinking previous cancelled node QNode d = dp.next; QNode dn; if (d == null || // d is gone or d == dp || // d is off list or !d.isCancelled() || // d not cancelled or (d != t && // d not tail and (dn = d.next) != null && // has successor dn != d && // that is on list dp.casNext(d, dn))) // d unspliced casCleanMe(dp, null); if (dp == pred) return; // s is already saved node } else if (casCleanMe(null, pred)) return; // Postpone cleaning s } }
在任一时刻,只有一个链表中的节点不能被删除---最后一个插入的节点。为了迁就这个原则,若是咱们没法删除s,那么咱们就保存它的前驱节点为“cleanMe”,首先删除以前保存的版本。因此至少s或者以前保存的节点可以被删除,因此最后老是可以被删除!
详情以下:
if (dp == pred)这一行在我看来是不必的,由于永远不可能出现。
咱们接着分析stack:
stack
/** * Puts or takes an item. */ @SuppressWarnings("unchecked") E transfer(E e, boolean timed, long nanos) { SNode s = null; // constructed/reused as needed int mode = (e == null) ? REQUEST : DATA; for (;;) { SNode h = head; if (h == null || h.mode == mode) { // empty or same-mode if (timed && nanos <= 0) { // can't wait if (h != null && h.isCancelled()) casHead(h, h.next); // pop cancelled node else return null; } else if (casHead(h, s = snode(s, e, h, mode))) { SNode m = awaitFulfill(s, timed, nanos); if (m == s) { // wait was cancelled clean(s); return null; } if ((h = head) != null && h.next == s) casHead(h, s.next); // help s's fulfiller return (E) ((mode == REQUEST) ? m.item : s.item); } } else if (!isFulfilling(h.mode)) { // try to fulfill if (h.isCancelled()) // already cancelled casHead(h, h.next); // pop and retry else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { for (;;) { // loop until matched or waiters disappear SNode m = s.next; // m is s's match if (m == null) { // all waiters are gone casHead(s, null); // pop fulfill node s = null; // use new node next time break; // restart main loop } SNode mn = m.next; if (m.tryMatch(s)) { casHead(s, mn); // pop both s and m return (E) ((mode == REQUEST) ? m.item : s.item); } else // lost match s.casNext(m, mn); // help unlink } } } else { // help a fulfiller SNode m = h.next; // m is h's match if (m == null) // waiter is gone casHead(h, null); // pop fulfilling node else { SNode mn = m.next; if (m.tryMatch(h)) // help match casHead(h, mn); // pop both h and m else // lost match h.casNext(m, mn); // help unlink } } } }
这里的Node实际上拥有三种状态:REQUEST/DATA/FULFILLING,基本算法是循环试着执行下列3种行动之一:
/** * Spins/blocks until node s is matched by a fulfill operation. * * @param s the waiting node * @param timed true if timed wait * @param nanos timeout value * @return matched node, or s if cancelled */ SNode awaitFulfill(SNode s, boolean timed, long nanos) { final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { if (w.isInterrupted()) s.tryCancel(); SNode m = s.match; if (m != null) return m; if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(); continue; } } if (spins > 0) spins = shouldSpin(s) ? (spins-1) : 0; else if (s.waiter == null) s.waiter = w; // establish waiter so can park next iter else if (!timed) LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
这个操做事实上和队列版本中的相似,首先来解释下注释中的内容:当一个节点/线程试着去阻塞,它会在设置waiter域以后至少检查一次状态,而后才会调用parking(阻塞),这样子能够经过waiter从而和它的知足者协做从而确保不会丢失信号。若是当前调用的几点位于栈顶,那么在park以前会首先尝试自旋,这样能够在生产者和消费者很是接近时避免阻塞。可是这个只在多核处理器下才会有用。从代码中的检查状况能够看出,在优先级上,中断状态--->正式返回---->超时。(因此最后一个检查是用来探测超时的)除了非限时的同步队列。{poll/offer}方法不会检查中断以及等待过久,因此对于中断和超时的判断被放置于transfer方法中,这样要好于调用awaitFulfill。详情以下(与队列版本中相似):取得当前的结束时间,当前线程,以及自旋次数。而后进入循环。首先判断是否中断,判断限时版本下的时间流逝,判断自旋,以及根据当前节点s所处的位置来设置自旋次数。设置线程对象(用于唤醒)。最后根据是否限时来阻塞当前线程,限时版本下会根据阈值来判断是否须要阻塞。最后咱们来看处理中断和超时状况下的清理操做clean:
void clean(SNode s) { s.item = null; // forget item s.waiter = null; // forget thread /* * At worst we may need to traverse entire stack to unlink * s. If there are multiple concurrent calls to clean, we * might not see s if another thread has already removed * it. But we can stop when we see any node known to * follow s. We use s.next unless it too is cancelled, in * which case we try the node one past. We don't check any * further because we don't want to doubly traverse just to * find sentinel. */ SNode past = s.next; if (past != null && past.isCancelled()) past = past.next; // Absorb cancelled nodes at head SNode p; while ((p = head) != null && p != past && p.isCancelled()) casHead(p, p.next); // Unsplice embedded nodes while (p != null && p != past) { SNode n = p.next; if (n != null && n.isCancelled()) p.casNext(n, n.next); else p = n; } }