LinkedTransferQueue(LTQ) 相比 BlockingQueue 更进一步,生产者会一直阻塞直到所添加到队列的元素被某一个消费者所消费(不只仅是添加到队列里就完事)。新添加的 transfer 方法用来实现这种约束。顾名思义,阻塞就是发生在元素从一个线程 transfer 到另外一个线程的过程当中,它有效地实现了元素在线程之间的传递(以创建 Java 内存模型中的 happens-before 关系的方式)。Doug Lea 说从功能角度来说,LinkedTransferQueue 其实是 ConcurrentLinkedQueue、SynchronousQueue(公平模式)和 LinkedBlockingQueue 的超集。并且 LinkedTransferQueue 更好用,由于它不只仅综合了这几个类的功能,同时也提供了更高效的实现。html
推荐一篇 LinkedTransferQueue 的介绍:http://ifeve.com/java-transfer-queue/java
LinkedTransferQueue 实现了 TransferQueue 接口,下面就主要介绍一下这个接口。 TransferQueue 继承了 BlockingQueue(BlockingQueue 又继承了 Queue)并扩展了一些新方法。BlockingQueue(和Queue)是 JDK5 中加入的接口,它是指这样的一个队列:当生产者向队列添加元素但队列已满时,生产者会被阻塞;当消费者从队列移除元素但队列为空时,消费者会被阻塞。node
TransferQueue 则更进一步,生产者会一直阻塞直到所添加到队列的元素被某一个消费者所消费(不只仅是添加到队列里就完事)。新添加的 transfer 方法用来实现这种约束。顾名思义,阻塞就是发生在元素从一个线程 transfer 到另外一个线程的过程当中,它有效地实现了元素在线程之间的传递(以创建 Java 内存模型中的 happens-before 关系的方式)。算法
TransferQueue 还包括了其余的一些方法:两个 tryTransfer 方法,一个是非阻塞的,另外一个带有 timeout 参数设置超时时间的。还有两个辅助方法 hasWaitingConsumer() 和 getWaitingConsumerCount()。缓存
当我第一次看到 LinkedTransferQueue 时,首先想到了已有的实现类 SynchronousQueue。SynchronousQueue 的队列长度为 0,特别是对于两个线程之间传递元素这种用例。数据结构
LinkedTransferQueue 相比 SynchronousQueue 用处更广、更好用,由于你能够决定是使用 BlockingQueue 的方法(译者注:例如put方法)仍是确保一次传递完成(译者注:即transfer方法)。在队列中已有元素的状况下,调用 transfer 方法,能够确保队列中被传递元素以前的全部元素都能被处理。app
LinkedTransferQueue 的性能分别是 SynchronousQueue 的3倍(非公平模式)和14倍(公平模式)。由于像 ThreadPoolExecutor 这样的类在任务传递时都是使用 SynchronousQueue,因此使用 LinkedTransferQueue 来代替 SynchronousQueue 也会使得 ThreadPoolExecutor 获得相应的性能提高。less
下面你能够参考这往篇文章实现一个本身的 LinkedTransferQueue:http://ifeve.com/customizing-concurrency-classes-11-2/#more-7388dom
LTQ 内部采用的是一种很是不一样的队列,即松弛型双重队列(Dual Queues with Slack):http://ifeve.com/buglinkedtransferqueue-bug/#more-11117异步
强烈建议你们读一下 Doug Lea 的 java doc 文档,对 LTQ 的数据结构有很清楚的说明。
/** * A FIFO dual queue may be implemented using a variation of the * Michael & Scott (M&S) lock-free queue algorithm * (http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf). * It maintains two pointer fields, "head", pointing to a * (matched) node that in turn points to the first actual * (unmatched) queue node (or null if empty); and "tail" that * points to the last node on the queue (or again null if * empty). For example, here is a possible queue with four data * elements: * * head tail * | | * v v * M -> U -> U -> U -> U * * M(matched) U(unmatched) */
翻译:FIFO 双队列可使用 Michael & Scott(M&S)无锁队列算法的变体实现。它维护两个指针字段: head 指向第一个不匹配节点(M)的前驱节点(若是为空则为空);tail 指向队列中的最后一个节点(若是为空则为空)。
双重是指有两种类型相互对立的节点(Node.isData==false或true),而且我理解的每种节点都有三种状态:
/** * 在更新head/tail和查找中寻求平衡,大多数场景1~3比较合适。 * 本质上:是增长对 volatile 变量读操做来减小了对 volatile 变量的写操做 * 而对 volatile 变量的写操做开销要远远大于读操做,所以使用Slack能增长效率 * * We introduce here an approach that lies between the extremes of * never versus always updating queue (head and tail) pointers. * This offers a tradeoff between sometimes requiring extra * traversal steps to locate the first and/or last unmatched * nodes, versus the reduced overhead and contention of fewer * updates to queue pointers. For example, a possible snapshot of * a queue is: * * head tail * | | * v v * M -> M -> U -> U -> U -> U * * The best value for this "slack" (the targeted maximum distance * between the value of "head" and the first unmatched node, and * similarly for "tail") is an empirical matter. We have found * that using very small constants in the range of 1-3 work best * over a range of platforms. Larger values introduce increasing * costs of cache misses and risks of long traversal chains, while * smaller values increase CAS contention and overhead. */
为了节省 CAS 操做的开销,LTQ 引入了“松弛度”的概念:在节点被匹配(被删除)以后,不会当即更新 head/tail,而是当 head/tail 节点和最近一个未匹配的节点之间的距离超过一个“松弛阀值”以后才会更新(在 LTQ 中,这个值为 2)。这个“松弛阀值”通常为1-3,若是太大会下降缓存命中率,而且会增长遍历链的长度;过小会增长 CAS 的开销。另外在 ConcurrentLinkedQueue 也有相应的应用:hops 设计意图
已匹配节点的 next 引用会指向自身。若是 GC 延迟回收,已删除节点链会积累的很长,此时垃圾收集会耗费高昂的代价,而且全部刚匹配的节点也不会被回收。为了不这种状况,咱们在 CAS 向后推动 head 时,会把已匹配的 head 的"next"引用指向自身(即“自连接节点”),这样就限制了链接已删除节点的长度(咱们也采起相似的方法,清除在其余节点字段中可能的垃圾保留值)。若是在遍历时遇到一个自连接节点,那就代表当前线程已经滞后于另一个更新 head 的线程,此时就须要从新获取 head 来遍历。
因此,在 LTQ 中,数据在某个线程的“某一时刻”可能存在下面这种形式:
static final class QNode { volatile Object item; // 节点包含的数据,非空表示生产者,空者是消费者 final boolean isData; // 表示该节点由生产者建立仍是由消费者建立,生产者true,消费者false volatile Thread waiter; // 等待在该节点上的线程。to control park/unpark volatile QNode next; // 指向队列中的下一个节点 }
Node 节点自己就是一个原子性操做,对节点的属性 item、waiter、next 都是原子性操做。
transient volatile Node head; private transient volatile Node tail; // 马上、异步、同步、超时返回 private static final int NOW = 0; // for untimed poll, tryTransfer private static final int ASYNC = 1; // for offer, put, add private static final int SYNC = 2; // for transfer, take private static final int TIMED = 3; // for timed poll, tryTransfer
LinkedTransferQueue 主要方法介绍:
public LinkedTransferQueue() { }
LinkedTransferQueue 初始化时什么也没作,也就是说 head=tail=null。
/** * @param e the item or null for take * @param haveData true if this is a put, else a take * @param how NOW, ASYNC, SYNC, or TIMED * @param nanos timeout in nanosecs, used only if mode is TIMED */ private E xfer(E e, boolean haveData, int how, long nanos) { if (haveData && (e == null)) throw new NullPointerException(); Node s = null; // the node to append, if needed retry: for (;;) { // restart on append race // 1. 尝试匹配 for (Node h = head, p = h; p != null;) { // find & match first node boolean isData = p.isData; Object item = p.item; // 1.2 p节点还未匹配则尝试进行匹配,为何不调用 !p.isMatched() ???? if (item != p && (item != null) == isData) { // unmatched // 1.3 两个节点的模式同样,则直接跳出循环,尝试入队 if (isData == haveData) // can't match break; // 1.4 p匹配成功 if (p.casItem(item, e)) { // match for (Node q = p; q != h;) { // 1.5 p已经匹配,直接将n设置为头节点。h -> p -> n Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry // 1.6 有其它线程更新了头节点,再次判断 slack<2。 // h -> q 若是 q.isMatched() 则能够将 q.next 设置为头节点 if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } LockSupport.unpark(p.waiter); // 唤醒等待的线程后直接返回 return LinkedTransferQueue.<E>cast(item); } } // 1.7 p==p.next 则说明p已经出队,失效了。须要从新从头节点开始匹配 Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist } // 2. 到了这一步,只有未匹配上一种状况。根据how判断节点是否要入队并等待其它线程匹配 if (how != NOW) { // No matches available if (s == null) s = new Node(e, haveData); // 2.1 节点尝试入队,入队失败继续尝试 Node pred = tryAppend(s, haveData); if (pred == null) continue retry; // lost race vs opposite mode // 2.2 等待其它线程匹配成功后唤醒当前线程 if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); } return e; // not waiting } }
说明:xfer 大体能够分三部分:
若是在上述操做中没有找到匹配节点,则根据参数 how 作不一样的处理:
NOW(poll, tryTransfer)
:当即返回。SYNC(transfer, take)
:经过 tryAppend 方法插入一个新的节点 s(item=e,isData = haveData)到队列尾,而后自旋或阻塞当前线程直到节点被匹配或者取消返回。ASYNC(offer, put, add)
:经过 tryAppend 方法插入一个新的节点 s(item=e,isData = haveData)到队列尾,异步直接返回。TIMED(timed poll, tryTransfer)
:经过 tryAppend 方法插入一个新的节点 s(item=e,isData = haveData)到队列尾,而后自旋或阻塞当前线程直到节点被匹配或者取消或等待超时返回。// 1. NOW(poll, tryTransfer) public boolean tryTransfer(E e) { return xfer(e, true, NOW, 0) == null; } // 2. SYNC(transfer, take) public void transfer(E e) throws InterruptedException { if (xfer(e, true, SYNC, 0) != null) { Thread.interrupted(); // failure possible only due to interrupt throw new InterruptedException(); } } // 3. ASYNC(offer, put, add) public boolean offer(E e) { xfer(e, true, ASYNC, 0); return true; } // 4. TIMED(timed poll, tryTransfer) public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null) return true; if (!Thread.interrupted()) return false; throw new InterruptedException(); }
这里能够看到若是使用异步(ASYNC)的方式时线程不会阻塞,如 offer 时同一线程的数据节点也能够入队,也就是存储的数据长度再也不是 0,这也是和 SynchronousQueue 一个很大的不一样点。因此 Doug Lea 说从功能角度来说,LinkedTransferQueue 其实是 ConcurrentLinkedQueue、SynchronousQueue(公平模式)和 LinkedBlockingQueue 的超集。
private Node tryAppend(Node s, boolean haveData) { for (Node t = tail, p = t;;) { // move p to last node and append Node n, u; // temps for reads of next & tail // 1. 节点初始化 if (p == null && (p = head) == null) { if (casHead(null, s)) return s; // initialize } // 2. 节点s不能追加到p节点后。①p和s的模式不一样且②p还未匹配 else if (p.cannotPrecede(haveData)) return null; // lost race vs opposite mode // 3. p 不是尾节点 else if ((n = p.next) != null) // not last; keep traversing // t -> p 时 tail 改变则须要从新定位到尾节点 // p节点已经出队则须要从 head 开始从新遍历 p = p != t && t != (u = tail) ? (t = u) : // stale tail (p != n) ? n : null; // restart if off list // 4. 有其它线程添加节点时继续自旋,直到成功 else if (!p.casNext(null, s)) p = p.next; // re-read on CAS failure // 5. 终于添加到队列中。尝试更新尾节点 else { // 若是 p!=t 则队列状况以下,须要更新尾节点: t -> p -> s if (p != t) { // update if slack now >= 2 // 5.1 其它线程已经更新 tail,从新进行下面三个条件的判断 // 5.2 t.next.next!=null 则须要从新更新 tail。至于s!=t则是此时t没有踢出队列 while ((tail != t || !casTail(t, s)) && (t = tail) != null && (s = t.next) != null && // advance and retry (s = s.next) != null && s != t); } return p; } } }
说明:添加给定节点 s 到队列尾并返回 s 的前继节点;失败时(与其余不一样模式线程竞争失败)返回 null,此时 s 的前继节点 p 的模式和 s 不一样且 p 还没有被匹配,如 s 为请求节点,p 为数据节点且未匹配则不能将 s 追加到 p 后面。
/** * Spins/yields/blocks 直到s节点matched或canceled * * @param s the waiting node * @param pred s的前驱节点,若是没有前驱节点则为s本身 * @param e s节点的原始值 * @param timed true时限时等待,false时无限等待 * @param nanos timeout in nanosecs, used only if timed is true * @return matched item, or e if unmatched on interrupt or timeout */ private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) { final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); int spins = -1; // initialized after first item and cancel checks ThreadLocalRandom randomYields = null; // bound if needed for (;;) { Object item = s.item; // 1. item已经被修改,说明匹配成功。返回匹配后的值 if (item != e) { // matched // assert item != s; s.forgetContents(); // avoid garbage return LinkedTransferQueue.<E>cast(item); } // 2. 超时,返回匹配前的值 if ((w.isInterrupted() || (timed && nanos <= 0)) && s.casItem(e, s)) { // cancel unsplice(pred, s); return e; } // 3. 设置自旋次数 if (spins < 0) { // establish spins at/near front if ((spins = spinsFor(pred, s.isData)) > 0) randomYields = ThreadLocalRandom.current(); // 4. 自旋,有很小的几率调用 yeild } else if (spins > 0) { // spin --spins; if (randomYields.nextInt(CHAINED_SPINS) == 0) Thread.yield(); // occasionally yield // 5. 设置等待线程,让其它线程唤醒 } else if (s.waiter == null) { s.waiter = w; // request unpark then recheck // 6. 阻塞直至其它线程唤醒,继续循环直到匹配成功或超时退出 } else if (timed) { nanos = deadline - System.nanoTime(); if (nanos > 0L) LockSupport.parkNanos(this, nanos); } else { LockSupport.park(this); } } }
说明:当前操做为同步操做时,会调用 awaitMatch 方法阻塞等待匹配,成功返回匹配后节点 item,超时则返回匹配前节点的 item 值 e。在等待期间若是线程被中断或等待超时,则取消匹配,并调用 unsplice 方法解除节点 s 和其前继节点的连接。
// 计算自旋次数 FRONT_SPINS=1<<7,CHAINED_SPINS=1<<6 private static int spinsFor(Node pred, boolean haveData) { if (MP && pred != null) { if (pred.isData != haveData) // phase change return FRONT_SPINS + CHAINED_SPINS; if (pred.isMatched()) // probably at front return FRONT_SPINS; if (pred.waiter == null) // pred apparently spinning return CHAINED_SPINS; } return 0; // 单核CPU时不自旋 }
// 统计数据节点个数 public int size() { return countOfMode(true); } // 统计请求节点个数 public int getWaitingConsumerCount() { return countOfMode(false); } private int countOfMode(boolean data) { int count = 0; for (Node p = head; p != null; ) { if (!p.isMatched()) { // p未匹配且属性指定的data模式则 ++count if (p.isData != data) return 0; if (++count == Integer.MAX_VALUE) // saturated break; } Node n = p.next; if (n != p) // 下一个节点 p = n; else { // p节点失效则计数器归0,从新从 head 遍历 count = 0; p = head; } } return count; }
// 查找是否包含指定的数据节点 item=o public boolean contains(Object o) { if (o == null) return false; for (Node p = head; p != null; p = succ(p)) { Object item = p.item; if (p.isData) { if (item != null && item != p && o.equals(item)) return true; } else if (item == null) // 有请求节点了,不用再匹配 break; } return false; } // 后继节点,若是节点失效,直接从 head 开始 final Node succ(Node p) { Node next = p.next; return (p == next) ? head : next; }
// 是否有请求节点 public boolean hasWaitingConsumer() { return firstOfMode(false) != null; } // 查找第一个 isData 模式的未匹配节点 private Node firstOfMode(boolean isData) { for (Node p = head; p != null; p = succ(p)) { if (!p.isMatched()) return (p.isData == isData) ? p : null; } return null; }
参考:
天天用心记录一点点。内容也许不重要,但习惯很重要!