LinkedTransferQueue 是一个由链表结构组成的无界阻塞 TransferQueue 队列。
接口 TransferQueue 和实现类 LinkedTransferQueue 从 Java 7 开始加入 J.U.C 之中。java
java.util.concurrent.LinkedTransferQueuenode
public class LinkedTransferQueue<E> extends AbstractQueue<E> implements TransferQueue<E>, java.io.Serializable
LinkedTransferQueue 的数据结构为链表,是一个松弛的双重队列(Dual Queues with Slack)。算法
双重队列指的是链表中的节点存在两种模式:数据节点(提供数据)、请求节点(请求数据)。segmentfault
对于 TransferQueue#transfer:
线程入队非数据节点时,若是没有匹配到数据节点则阻塞,直到其余线程提供数据节点与之匹配。
线程入队数据节点时,若是没有匹配到非数据节点则阻塞,直到其余线程提供非数据节点与之匹配。安全
item == null
与 isData 的值相反。与 SynchronousQueue.TransferQueue.QNode 的定义是同样的。
节点的匹配状态由 item 属性来控制:
对于数据节点,在匹配的时候,把该节点的 item 域从非空数据 CAS 设置为空;对于非数据节点,则相反。数据结构
static final class Node { final boolean isData; // 是不是数据节点 volatile Object item; // isData为true时才初始化,匹配时CAS修改该字段。使用Object而不是泛型E,容许将item指向自身 volatile Node next; volatile Thread waiter; // null until waiting // CAS methods for fields final boolean casNext(Node cmp, Node val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } final boolean casItem(Object cmp, Object val) { // assert cmp == null || cmp.getClass() != Node.class; return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } /** * Constructs a new node. Uses relaxed write because item can * only be seen after publication via casNext. */ Node(Object item, boolean isData) { UNSAFE.putObject(this, itemOffset, item); // relaxed write this.isData = isData; } final void forgetNext() { UNSAFE.putObject(this, nextOffset, this); } // 节点被取消或被匹配以后会调用:设置item自链接,waiter为null final void forgetContents() { UNSAFE.putObject(this, itemOffset, this); UNSAFE.putObject(this, waiterOffset, null); } // 是否已匹配(已取消或已匹配:item自链接;已匹配:item == null 与 isData 的值相反) final boolean isMatched() { Object x = item; return (x == this) || ((x == null) == isData); } // 是不是一个未匹配的请求节点(!isData 为请求节点,item 为空说明未被修改,而一旦被匹配或取消则会修改 item) final boolean isUnmatchedRequest() { return !isData && item == null; } // 若是给定节点不能链接在当前节点后则返回true final boolean cannotPrecede(boolean haveData) { boolean d = isData; Object x; // 当前节点未匹配,且数据模式与给定节点相反,则返回true return d != haveData && (x = item) != this && (x != null) == d; } /** * Tries to artificially match a data node -- used by remove. */ final boolean tryMatchData() { // assert isData; Object x = item; if (x != null && x != this && casItem(x, null)) { LockSupport.unpark(waiter); return true; } return false; } private static final long serialVersionUID = -3375979862319811754L; // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; private static final long waiterOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Node.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); waiterOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("waiter")); } catch (Exception e) { throw new Error(e); } } }
// 队列头节点,第一次添加节点以前为空 transient volatile Node head; // 队列尾节点,第一次添加节点以前为空 private transient volatile Node tail;
LinkedTransferQueue 队列的松弛体如今:
采用无锁算法来维持链表的 head 和 tail 节点的位置,head 和 tail 节点并不严格指向链表的头尾节点。app
好比,一个包含 4 个有效节点的队列结构可能呈现为如下形式:
head 节点指向一个已匹配(matched)节点,该节点又指向队列中第一个未匹配(unmatched)节点。
tail 节点指向队列中最后一个节点。less
head tail | | v v M -> U -> U -> U -> U
因为队列中的节点须要维护其匹配状态,而一旦节点被匹配了,其匹配状态不会再改变。
所以,能够在链表头部存放零个或多个已经被匹配的前置节点,在链表尾部存放零个或多个还没有匹配的后置节点。
因为前置和后置节点都容许为零,意味着 LinkedTransferQueue 并不使用 dummy node 做为头节点。dom
head tail | | v v M -> M -> U -> U -> U -> U
好处是:每次入队出队操做,不会当即更新 head/tail,而是当 head/tail 节点和最近一个未匹配的节点之间的距离超过一个“松弛阀值”以后才会更新,能够节省 CAS 操做的开销。异步
LinkedTransferQueue 与 ConcurrentLinkedQueue 同样采用了松弛的队列结构。
默认构造函数为空,当第一次加入元素时才初始化 head/tail 节点。
/** * Creates an initially empty {@code LinkedTransferQueue}. */ public LinkedTransferQueue() { } /** * Creates a {@code LinkedTransferQueue} * initially containing the elements of the given collection, * added in traversal order of the collection's iterator. * * @param c the collection of elements to initially contain * @throws NullPointerException if the specified collection or any * of its elements are null */ public LinkedTransferQueue(Collection<? extends E> c) { this(); addAll(c); }
与之相比,ConcurrentLinkedQueue 和 SynchronousQueue.TransferQueue 初始化时都会构造空节点(dummy node)。
// java.util.concurrent.ConcurrentLinkedQueue#ConcurrentLinkedQueue() public ConcurrentLinkedQueue() { head = tail = new Node<E>(null); } // java.util.concurrent.SynchronousQueue.TransferQueue#TransferQueue TransferQueue() { QNode h = new QNode(null, false); head = h; tail = h; }
LinkedTransferQueue 中定义了传递数据的 4 种方式:
/* * Possible values for "how" argument in xfer method. */ private static final int NOW = 0; // for untimed poll, tryTransfer // 当即返回 private static final int ASYNC = 1; // for offer, put, add // 异步,不会阻塞 private static final int SYNC = 2; // for transfer, take // 同步,阻塞直到匹配 private static final int TIMED = 3; // for timed poll, tryTransfer // 超时,阻塞直到超时
传递数据的方法定义:
java.util.concurrent.LinkedTransferQueue#xfer
/** * Implements all queuing methods. See above for explanation. * * @param e the item or null for take // 存入、取出、移交的数据元素 * @param haveData true if this is a put, else a take // 是否具备数据 * @param how NOW, ASYNC, SYNC, or TIMED // 4 种模式 * @param nanos timeout in nanosecs, used only if mode is TIMED // 超时时间 * @return an item if matched, else e * @throws NullPointerException if haveData mode but e is null */ private E xfer(E e, boolean haveData, int how, long nanos)
LinkedTransferQueue 因为继承了 BlockingQueue,遵循方法约定:
抛出异常 特殊值 阻塞 超时 插入 add(e) offer(e) put(e) offer(e, time, unit) 移除 remove() poll() take() poll(time, unit) 检查 element() peek() 不可用 不可用
此外,TransferQueue 新增了如下方法:
// 尝试移交元素,当即返回 boolean tryTransfer(E e); // 尝试移交元素,阻塞直到成功、超时或中断 boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException; // 移交元素,阻塞直到成功或中断 void transfer(E e) throws InterruptedException; // 判断是否有消费者 boolean hasWaitingConsumer(); // 查看消费者的数量 int getWaitingConsumerCount();
底层都是经过调用 LinkedTransferQueue#xfer 来实现。
入队: add(e) xfer(e, true, ASYNC, 0) offer(e) xfer(e, true, ASYNC, 0) put(e) xfer(e, true, ASYNC, 0) offer(e, time, unit) xfer(e, true, ASYNC, 0) 出队: remove() xfer(null, false, NOW, 0) poll() xfer(null, false, NOW, 0) take() xfer(null, false, SYNC, 0) poll(time, unit) xfer(null, false, TIMED, unit.toNanos(timeout)) 移交元素: tryTransfer(e) xfer(e, true, NOW, 0) tryTransfer(e, time, unit) xfer(e, true, TIMED, unit.toNanos(timeout) transfer(e) xfer(e, true, SYNC, 0)
因为队列是无界的,入队方法(add/put/offer)既不会抛异常,也不会阻塞或超时。
/** * Implements all queuing methods. See above for explanation. * * @param e the item or null for take // 存入、取出、移交的数据元素 * @param haveData true if this is a put, else a take // 是否具备数据 * @param how NOW, ASYNC, SYNC, or TIMED // 4 种模式 * @param nanos timeout in nanosecs, used only if mode is TIMED // 超时时间 * @return an item if matched, else e * @throws NullPointerException if haveData mode but e is null */ private E xfer(E e, boolean haveData, int how, long nanos) { if (haveData && (e == null)) throw new NullPointerException(); Node s = null; // the node to append, if needed retry: for (;;) { // restart on append race for (Node h = head, p = h; p != null;) { // find & match first node // 从头节点开始遍历,初始时h和p都指向头节点 boolean isData = p.isData; Object item = p.item; if (item != p && (item != null) == isData) { // unmatched // 节点p还没有匹配过:item不是p,item是否有值与isData相符 if (isData == haveData) // can't match // 节点p没法匹配:节点p与入参节点类型相同。此时需跳出本层循环,尝试入队 break; if (p.casItem(item, e)) { // match // 节点p匹配成功:item域的值从item变动为e for (Node q = p; q != h;) { // 若q != h,说明当前匹配的节点p不是头节点,而是位于头节点以后。 // 说明队列头部具备多于一个的已匹配节点,须要设置新的头节点,把已匹配的节点出队 // 循环以节点p为起始,一直日后遍历已匹配的节点 Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { // 若是节点h是头节点,而q是已匹配节点,分为两种状况: // 1. 若q.next为空,则将q设为新的头节点; // 2. 若q.next不为空,则将q.next设为新的头节点(注意此时q还在队列中,但不可达) h.forgetNext(); // 旧的头节点h出队(若h以前还有节点,则h自链接表明着以h为尾节点的旧链表将被回收) break; } // advance and retry // 进入这里,h不是头节点,说明其余线程修改过head,须要取最新的head做进一步判断 // 1. 若是head为空,或者head.next为空,或者head.next未匹配,则跳出再也不遍历head.next了 // 2. 虽然取得最新head,可是head.next是已匹配节点,须要从head.next开始遍历,从新设置head if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } LockSupport.unpark(p.waiter); // 唤醒p中等待的线程 return LinkedTransferQueue.<E>cast(item); } } // 来到这里,说明节点p是已匹配节点,没法与入参节点匹配,须要继续遍历p的下一个节点 Node n = p.next; // 若 p != n 则继续遍历下一个节点;若 p == n 说明p已经出队,这种状况是其余线程修改了head致使的,须要取新的head从新开始遍历 p = (p != n) ? n : (h = head); // Use head if p offlist } if (how != NOW) { // No matches available // 来到这里,说明没有匹配成功,则按照4种模式的规则入队。 if (s == null) s = new Node(e, haveData); Node pred = tryAppend(s, haveData); // 尝试入队 if (pred == null) continue retry; // lost race vs opposite mode // 入队失败,重试 if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); // 阻塞等待,直到匹配或超时 } return e; // not waiting } }
基本思想:
代码流程:
匹配开始时,节点 p 和 h 均处于 head 的位置,假设此时链表的快照以下:
head/p/h | v M -> M -> U -> U -> U -> U
使用 p = p.next 遍历链表,当节点 p 匹配成功时,此时队列以下(U2M 表示节点从未匹配变成已匹配):
h p | | v v M -> M -> U2M -> U -> U -> U
因为 p != h,此时须要从新设置 head,分为两种状况:
状况一
若是在此期间没有其余线程修改过 head,说明当前状况以下:
head/h p | | v v M -> M -> M -> U -> U -> U
经过 casHead 将 p.next 设为新的 head,并对旧的头节点 h 执行 forgetNext 设为自链接,从原链表中断开。
h p head | | | v v v M -> M M -> U -> U -> U
因为节点 p 是不可达的,会被 GC 回收,最终已匹配的节点都会从链表中清除。
head | v U -> U -> U
状况二
若是在此期间其余线程修改过 head,那么 casHead 以前须要先获取最新的 head,再判断是否进一步重设 head。
若是获取最新的 head 以后,head.next 为已匹配节点,须要从 head.next 开始从新遍历再一次设置 head。
在 xfer 方法中,若是在队列头部匹配失败,则会按照 4 中规则从队列尾部入队:
java.util.concurrent.LinkedTransferQueue#xfer
if (how != NOW) { // No matches available if (s == null) s = new Node(e, haveData); Node pred = tryAppend(s, haveData); // 尝试入队 if (pred == null) // 若入队失败,从新从头部匹配 continue retry; // lost race vs opposite mode if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); }
节点 s 尝试从队列尾部入队的方法:
java.util.concurrent.LinkedTransferQueue#tryAppend
/** * Tries to append node s as tail. // 将节点s做为尾节点入队 * * @param s the node to append * @param haveData true if appending in data mode * @return null on failure due to losing race with append in * different mode, else s's predecessor, or s itself if no * predecessor */ private Node tryAppend(Node s, boolean haveData) { for (Node t = tail, p = t;;) { // move p to last node and append Node n, u; // temps for reads of next & tail if (p == null && (p = head) == null) { if (casHead(null, s)) // 队列为空,将s做为头节点。注意,这里插入第一个元素的时候tail指针并无指向s return s; // initialize } else if (p.cannotPrecede(haveData)) return null; // lost race vs opposite mode // 节点p以后没法链接节点,返回null(p与s匹配,不须要将s入队) else if ((n = p.next) != null) // not last; keep traversing // 节点p不是尾节点(由于tail并不严格指向尾节点),需继续遍历p.next // 若是节点p与t不等,且t不是tail节点(说明其余线程修改了tail,没必要遍历p.next了),则取新的tail赋值给p和t,从新从tail节点开始遍历 // 不然(尝试遍历p.next):1. 若是p与p.next不等,从p.next继续遍历;2. 若是p与p.next相等,则设p为空(说明队列为空,后续会将s做为头节点) p = p != t && t != (u = tail) ? (t = u) : // stale tail (p != n) ? n : null; // restart if off list else if (!p.casNext(null, s))// 进入这里,说明p是尾节点。若CAS失败,说明其余线程在p后加了节点,需继续遍历p.next p = p.next; // re-read on CAS failure else { // 进入这里,说明p是尾节点,且CAS将s设为p.next成功。 if (p != t) { // update if slack now >= 2 // p != t 说明松弛度大于等于2,须要从新设置tail节点 while ((tail != t || !casTail(t, s)) && (t = tail) != null && (s = t.next) != null && // advance and retry (s = s.next) != null && s != t); } return p; } } }
代码流程:
当节点 s 入队成功以后,须要在队列之中自旋、等待被其余线程匹配。
java.util.concurrent.LinkedTransferQueue#awaitMatch
/** * Spins/yields/blocks until node s is matched or caller gives up. * * @param s the waiting node // 当前节点,处于等待之中 * @param pred the predecessor of s, or s itself if it has no // 当前节点的上一个节点,若为 s 说明没有上一个节点,若为 null 则是未知的(做为预留) * predecessor, or null if unknown (the null case does not occur * in any current calls but may in possible future extensions) * @param e the comparison value for checking match // 当前节点的数据域值 * @param timed if true, wait only until timeout elapses // 是否等待超时 * @param nanos timeout in nanosecs, used only if timed is true // 超时时间 * @return matched item, or e if unmatched on interrupt or timeout */ private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) { final long deadline = timed ? System.nanoTime() + nanos : 0L; // 到期时间 Thread w = Thread.currentThread(); // 当前线程,即操做节点s的线程 int spins = -1; // initialized after first item and cancel checks // 自旋次数 ThreadLocalRandom randomYields = null; // bound if needed // 随机数,随机让一些自旋的线程让出CPU for (;;) { Object item = s.item; if (item != e) { // matched // 节点s的数据域item先后不相等,说明节点s已经被其余线程匹配了 // assert item != s; s.forgetContents(); // avoid garbage // s设置item自链接,waiter为null,这里没有使s=s.next,s仍挂在链表上 return LinkedTransferQueue.<E>cast(item); } if ((w.isInterrupted() || (timed && nanos <= 0)) && // 当前线程被取消,或者已超时 s.casItem(e, s)) { // cancel // 须要把节点s取消掉,设置item自链接 unsplice(pred, s); // 把s跟它的上一个节点pred断开链接 return e; } if (spins < 0) { // establish spins at/near front // 初始化spins if ((spins = spinsFor(pred, s.isData)) > 0) randomYields = ThreadLocalRandom.current(); } else if (spins > 0) { // spin // 自减,随机让出CPU --spins; if (randomYields.nextInt(CHAINED_SPINS) == 0) Thread.yield(); // occasionally yield // 让出CPU时间片,若又争夺到了CPU时间片则继续执行,不然等待直到再次得到CPU时间片(由其余线程让出) } else if (s.waiter == null) { // 当 spin == 0 时才会执行这里及后面的逻辑 s.waiter = w; // request unpark then recheck // 自旋结束了,设置s.waiter } else if (timed) { // 若是有超时,计算超时时间,并阻塞必定时间 nanos = deadline - System.nanoTime(); if (nanos > 0L) LockSupport.parkNanos(this, nanos); } else { // 不是超时的,直接阻塞,等待被唤醒 LockSupport.park(this); } } }
在等待匹配的自旋过程当中,执行如下逻辑:
等待匹配的过程当中,首先进行自旋,等自旋次数归零后,再判断是否进入阻塞,是由于线程的阻塞和唤醒操做涉及到系统内核态与用户态之间的切换,比较耗时。
在 LinkedTransferQueue#awaitMatch 中,节点在队列之中自旋和阻塞以前,首先须要获取自旋的次数。
if (spins < 0) { if ((spins = spinsFor(pred, s.isData)) > 0) randomYields = ThreadLocalRandom.current(); }
与自旋次数相关的属性:
/** True if on multiprocessor */ private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1; /** * The number of times to spin (with randomly interspersed calls * to Thread.yield) on multiprocessor before blocking when a node * is apparently the first waiter in the queue. See above for * explanation. Must be a power of two. The value is empirically * derived -- it works pretty well across a variety of processors, * numbers of CPUs, and OSes. */ // 当一个节点是队列中的第一个waiter时,在多处理器上进行自旋的次数(随机穿插调用thread.yield) private static final int FRONT_SPINS = 1 << 7; /** * The number of times to spin before blocking when a node is * preceded by another node that is apparently spinning. Also * serves as an increment to FRONT_SPINS on phase changes, and as * base average frequency for yielding during spins. Must be a * power of two. */ // 当前继节点正在处理,当前节点在阻塞以前的自旋次数。 private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
与自旋次数相关的方法:
java.util.concurrent.LinkedTransferQueue#spinsFor
/** * Returns spin/yield value for a node with given predecessor and * data mode. See above for explanation. */ private static int spinsFor(Node pred, boolean haveData) { if (MP && pred != null) { if (pred.isData != haveData) // phase change return FRONT_SPINS + CHAINED_SPINS; if (pred.isMatched()) // probably at front return FRONT_SPINS; if (pred.waiter == null) // pred apparently spinning return CHAINED_SPINS; } return 0; }
须要注意的是,当前节点 s 在队列之中开始自旋以前,它的上一个的节点 pred 可能也正处于自旋之中。
初始化节点 s 自旋次数的流程:
对于自旋次数多少的设计,个人理解是,节点被匹配的可能性越高,则自旋次数越多;被匹配的可能性越低,则自旋次数越少,尽早进入阻塞以避免 CPU 空转。
而位于队列头部的节点是越有可能被匹配的,须要自旋最屡次。
在 LinkedTransferQueue#awaitMatch 中,节点在队列之中等待匹配时,若是检测到线程中断或已超时,须要取消匹配并将节点从链表中断开。
if ((w.isInterrupted() || (timed && nanos <= 0)) && s.casItem(e, s)) { unsplice(pred, s); return e; }
此外,LinkedTransferQueue 中移除节点也会调用到 LinkedTransferQueue#unsplice 方法。
与清除节点相关的属性:
/** The number of apparent failures to unsplice removed nodes */ // 从链表上清除节点失败的次数 private transient volatile int sweepVotes; /** * The maximum number of estimated removal failures (sweepVotes) * to tolerate before sweeping through the queue unlinking * cancelled nodes that were not unlinked upon initial * removal. See above for explanation. The value must be at least * two to avoid useless sweeps when removing trailing nodes. */ // sweepVotes的阀值,达到该阈值才清理无效节点 static final int SWEEP_THRESHOLD = 32;
清除指定节点的方法:
java.util.concurrent.LinkedTransferQueue#unsplice
/** * Unsplices (now or later) the given deleted/cancelled node with * the given predecessor. * * @param pred a node that was at one time known to be the * predecessor of s, or null or s itself if s is/was at head * @param s the node to be unspliced */ // 把s跟它的上一个节点pred断开链接(当即断开,或者隔段时间再断开) final void unsplice(Node pred, Node s) { s.forgetContents(); // forget unneeded fields // s设置item自链接,waiter为null if (pred != null && pred != s && pred.next == s) { // 校验pred是不是s的上一个节点 // s是尾节点,或者 (s不是尾节点,s未出队,pred修改下一个节点为s.next成功,pred已经被匹配) Node n = s.next; if (n == null || (n != s && pred.casNext(s, n) && pred.isMatched())) { // 自旋的目的是:设置新的头节点,把旧的头节点出队 for (;;) { // check if at, or could be, head // pred或s为头节点,或者头节点为空,直接返回,不须要处理sweepVotes Node h = head; if (h == pred || h == s || h == null) return; // at head or list empty // 头节点未被匹配,跳出循环(说明head是准确的,后续须要处理sweepVotes) if (!h.isMatched()) break; // head已经被匹配了,且head.next为空,说明如今队列为空了,直接返回,不须要处理sweepVotes Node hn = h.next; if (hn == null) return; // now empty // 走到这里,说明head已经被匹配了,但head.next不为空。 // 若head不是自链接,尝试将head.next设置为新的头节点 if (hn != h && casHead(h, hn)) h.forgetNext(); // advance head // 旧的头节点设为自链接,表示出队 // 后续继续重新的头节点遍历,把已匹配的节点出队,重设头节点 } // 进入这里,说明 1. s多是尾节点 2. pred、s均不为头节点 3. 队列不为空 if (pred.next != pred && s.next != s) { // recheck if offlist // 再一次校验pred和s是否未出队 for (;;) { // sweep now if enough votes int v = sweepVotes; if (v < SWEEP_THRESHOLD) { // 阈值为32 if (casSweepVotes(v, v + 1)) break; } else if (casSweepVotes(v, 0)) { sweep(); // 达到阀值,进行“大扫除”,清除队列中的无效节点 break; } } } } } }
代码流程:
为何要累计 sweepVotes 进行大扫除,官方的解释:
除了经过节点自链接(self-linking)来方便垃圾回收之外,还须要在链表上解开对无效节点的链接。
通常来讲,若是想要在链表上移除节点s,只须要把 s 的上一个节点的 next 属性改掉便可。
可是用这种方式来让节点 s 不可达,在如下两种场景下是没法保证的:
这样会形成一种后果:当队列头部是一个阻塞的 take 请求节点,该节点以后是大量的超时时间很短的 poll 请求节点,一旦过了超时时间,队列中就会出现大量可达的无效节点。
(把这种状况代入 unsplice,大部分节点均可以经过 pred.casNext 移除,迷惑)
鉴于此,须要在一段时间以后遍历整个链表,把已匹配的节点清理出队。
/** * Unlinks matched (typically cancelled) nodes encountered in a * traversal from head. */ private void sweep() { for (Node p = head, s, n; p != null && (s = p.next) != null; ) { // 初始时,p = head,s = p.next if (!s.isMatched()) // Unmatched nodes are never self-linked // 未匹配的节点不会是自链接的! p = s; // 若是节点s未匹配,则让 p = p.next 继续遍历下一个节点 else if ((n = s.next) == null) // trailing node is pinned // 遍历结束,跳出循环 break; else if (s == n) // stale // s自链接,说明s已出队,当前是在废弃的链表上遍历,须要从新从head开始遍历 // No need to also check for p == s, since that implies s == n p = head; else // 进入这里,说明节点s是已匹配的,不是尾节点,不是自链接。 // 此时将节点s出队,虽然没有使s自链接,s不可达会被回收(注意s.item会不会被回收则取决于item自身是否可达) p.casNext(s, n); } }
size 方法遍历队列中所有的数据节点,并进行计数,最大值为 Integer.MAX_VALUE。
当遍历过程当中其余线程修改了 head,须要取新的 head 从新开始遍历。
size 方法并非一个恒定时长的操做。
java.util.concurrent.LinkedTransferQueue#size
public int size() { return countOfMode(true); }
/** * Traverses and counts unmatched nodes of the given mode. * Used by methods size and getWaitingConsumerCount. */ private int countOfMode(boolean data) { int count = 0; for (Node p = head; p != null; ) { if (!p.isMatched()) { if (p.isData != data) return 0; if (++count == Integer.MAX_VALUE) // saturated break; } Node n = p.next; if (n != p) // 节点p未出队,继续遍历p.next p = n; else { // 节点p已出队,取新的head从新开始遍历 count = 0; p = head; } } return count; }