系列传送门:html
LinkedTransferQueue在JDK1.7版本诞生,是由链表组成的无界TransferQueue,相对于其余阻塞队列,多了tryTransfer和transfer方法。java
TransferQueue:生产者会一直阻塞直到所添加到队列的元素被某一个消费者所消费(不只仅是添加到队列里就完事)。新添加的transfer方法用来实现这种约束。顾名思义,阻塞就是发生在元素从一个线程transfer到另外一个线程的过程当中,它有效地实现了元素在线程之间的传递(以创建Java内存模型中的happens-before关系的方式)。node
http://cs.oswego.edu/pipermail/concurrency-interest/2009-February/005888.html算法
Doug Lea评价TransferQueue是ConcurrentLinkedQueue、SynchronousQueue(在公平模式下)、无界的LinkedBlockingQueue等的超集,功能十分强大,最重要的是,它的实现也更加的高效。编程
总结:基于无锁CAS方式实现的无界FIFO队列。安全
public class LinkedTransferQueue<E> extends AbstractQueue<E> implements TransferQueue<E>, java.io.Serializable { //... }
LinkedTransferQueue不一样于其余的阻塞队列,它实现了TransferQueue接口,这必定是核心所在,咱们直接来看看接口定义的方法规范:并发
// 继承了BlockingQueue接口,并增长若干新方法 public interface TransferQueue<E> extends BlockingQueue<E> { /** * 将元素 传给等待的消费者【若是有的话】, 返回true, 若是不存在,返回false,不入队。 */ boolean tryTransfer(E e); /** * 将元素传递给等待的消费者【若是有的话】, 若是没有,则将e插入队列尾部, * 会一直等待,直到它被消费者接收 */ void transfer(E e) throws InterruptedException; /** * 在transfer的基础上,增长了超时操做,时间到了尚未被消费的话,返回false,并移除元素 */ boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException; /** * 若是存在消费者线程,返回true */ boolean hasWaitingConsumer(); /** * 获得等待获取元素的消费者线程的数量 */ int getWaitingConsumerCount(); }
public class LinkedTransferQueue<E> extends AbstractQueue<E> implements TransferQueue<E>, java.io.Serializable { private static final long serialVersionUID = -3223113410248163686L; /** 是否为多核处理器 */ private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1; /** * 当一个节点目前是队列的第一个waiter时,阻塞前的自旋次数 */ private static final int FRONT_SPINS = 1 << 7; /** * 前驱节点正在处理,当前节点须要自旋的次数 */ private static final int CHAINED_SPINS = FRONT_SPINS >>> 1; /** * */ static final int SWEEP_THRESHOLD = 32; // 队列中的节点 static final class Node {...} // 头节点 transient volatile Node head; /** 尾指针,注意可能不是最后一个节点,初始化为null */ private transient volatile Node tail; /** 删除节点失败的次数 */ private transient volatile int sweepVotes; /* * xfer方法中使用,定义how,解释很清楚了,每一个变量对应不一样的方法 */ private static final int NOW = 0; // for untimed poll, tryTransfer private static final int ASYNC = 1; // for offer, put, add private static final int SYNC = 2; // for transfer, take private static final int TIMED = 3; // for timed poll, tryTransfer
有耐心的同窗其实能够看一下javadoc的介绍,LinkedTransferQueue使用的队列结构实际上是这样的:是
slack dual queue
,他和普通的M&S dual queue
的区别在于,它不会每次操做的时候都更新head或tail,而是保持有针对性的slack懈怠,因此它的结构多是下面这样,tail指针指向的节点未必就是最后一个节点。apphead tail | | v v M -> M -> U -> U -> U -> U
Node节点的结构其实和SynchronousQueue公平模式差不太多,这一次看起来就比较清晰了,这边再总结一下,主要包含几个部分:几个重要字段,以及一些CAS方法。less
static final class Node { final boolean isData; // isData == true表示存数据,不然为获取数据 volatile Object item; // 存数据,item非null, 获取数据,匹配后,item为null volatile Node next; // next域 volatile Thread waiter; // 等待线程 // CAS操做next域 若是next为cmp,则变为val final boolean casNext(Node cmp, Node val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // CAS操做item域,若是item为cmp,变为val final boolean casItem(Object cmp, Object val) { // assert cmp == null || cmp.getClass() != Node.class; return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } // 构造器 Node(Object item, boolean isData) { UNSAFE.putObject(this, itemOffset, item); // relaxed write this.isData = isData; } // 将next指向自身this final void forgetNext() { UNSAFE.putObject(this, nextOffset, this); } // 匹配或取消节点调用 final void forgetContents() { UNSAFE.putObject(this, itemOffset, this); UNSAFE.putObject(this, waiterOffset, null); } /** * 判断节点是否已经匹配,匹配取消也为true */ final boolean isMatched() { Object x = item; return (x == this) || ((x == null) == isData); } /** * 是否为一个未匹配的请求 item为null表示未匹配 */ final boolean isUnmatchedRequest() { return !isData && item == null; } /** * 若是给定的节点不能挂到当前节点后面,则返回true */ final boolean cannotPrecede(boolean haveData) { boolean d = isData; Object x; return d != haveData && (x = item) != this && (x != null) == d; } /** * 尝试去匹配一个数据节点 */ final boolean tryMatchData() { // assert isData; Object x = item; if (x != null && x != this && casItem(x, null)) { LockSupport.unpark(waiter); return true; } return false; } private static final long serialVersionUID = -3375979862319811754L; // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; private static final long waiterOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Node.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); waiterOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("waiter")); } catch (Exception e) { throw new Error(e); } } }
咱们接下来将会介绍LinkedTransferQueue提供的各类操做,他们都会调用一个方法:xfer。dom
这里咱们暂且不谈具体的实现,咱们只须要知道一下这个方法的四个入参分别是什么意思。
/** * xfer方法实现了全部的队列方法 * * @param e take操做传入null, 不然传入具体元素 * @param haveData put操做为true, take操做为false * @param how NOW, ASYNC, SYNC, or TIMED 不一样字段,先从名称上猜想一下他们的大意 * @param nanos 若是是TIMED模式,也就是具备超时机制的方法啦,具体超时的时间 * @return an item if matched, else e 返回匹配的元素,不然返回e * @throws NullPointerException 插入null值抛出空指针异常: haveData==true && e == null */ private E xfer(E e, boolean haveData, int how, long nanos) { // }
接下来咱们将分几类来分别看一下各类操做的定义。
LinkedTransferQueue是无界的,下面三个插入方法不会阻塞,他们都调用了xfer方法,传入元素e,havaData为true,how字段类型都为SYNC
。
public void put(E e) { xfer(e, true, ASYNC, 0); } public boolean offer(E e, long timeout, TimeUnit unit) { xfer(e, true, ASYNC, 0); return true; } public boolean offer(E e) { xfer(e, true, ASYNC, 0); return true; } public boolean add(E e) { xfer(e, true, ASYNC, 0); return true; }
// take public E take() throws InterruptedException { E e = xfer(null, false, SYNC, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); } // timed poll public E poll(long timeout, TimeUnit unit) throws InterruptedException { E e = xfer(null, false, TIMED, unit.toNanos(timeout)); if (e != null || !Thread.interrupted()) return e; throw new InterruptedException(); } // untimed poll public E poll() { return xfer(null, false, NOW, 0); }
一样的,获取元素的方法也都调用了xfer方法,他们都传入null,havaData都为false,可是传入的how字段类型不一样:
public boolean tryTransfer(E e) { return xfer(e, true, NOW, 0) == null; } public void transfer(E e) throws InterruptedException { if (xfer(e, true, SYNC, 0) != null) { Thread.interrupted(); // failure possible only due to interrupt throw new InterruptedException(); } } public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null) return true; if (!Thread.interrupted()) return false; throw new InterruptedException(); }
xfer方法的实现,做者已经在注释中说的十分清楚啦,这边简单看下三个核心步骤,细节部分下面会学习。
一、Try to match an existing node 尝试去匹配一个节点
二、Try to append a new node (method tryAppend) 尝试将一个节点入队,对应tryAppend方法
三、Await match or cancellation (method awaitMatch) 阻塞等待一个节点被匹配或取消,对应awaitMatch方法
这个方法必然是核心方法了,毕竟它能够实现队列中提供的全部操做。
private E xfer(E e, boolean haveData, int how, long nanos) { // 若是 是插入的数据为null, 则NPE if (haveData && (e == null)) throw new NullPointerException(); Node s = null; // the node to append, if needed retry: for (;;) { // restart on append race // 第一次插入数据的时候,不会进入这个循环,由于p == null // 不然进入这个循环,从head首节点开始 for (Node h = head, p = h; p != null;) { // find & match first node boolean isData = p.isData; Object item = p.item; // 找到还未匹配的节点: isData的item应该是为非null, 若是是null代表用过了 if (item != p && (item != null) == isData) { // unmatched // 节点类型和当前类型一致,没法匹配 if (isData == haveData) // can't match break; // 将参数加入到item域, if (p.casItem(item, e)) { // match // 下面这个for循环,是匹配item以后进行的额外操做, // 好比将head更新为当前这个点 for (Node q = p; q != h;) { Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } // 阻塞线程 LockSupport.unpark(p.waiter); // 返回item值 return LinkedTransferQueue.<E>cast(item); } } // 若是节点已经匹配过了,向后 Node n = p.next; // p != n的状况很简单,将p移到n的位置, p==n表示什么呢? // 其实若是p.next == p 说明p节点已经被其余线程处理,那么p就从head开始 p = (p != n) ? n : (h = head); // Use head if p offlist } // 尚未找到能够匹配的点的话,会走到这 // 这里 若是 how 字段传入为 NOW ,便不会走里面的逻辑, // 也就是说untimed poll、 tryTransfer 不须要将元素入队 if (how != NOW) { // No matches available // 这里构造一个节点 if (s == null) s = new Node(e, haveData); // 初始化以后,调用tryAppend入队, 返回前驱节点 Node pred = tryAppend(s, haveData); // pred == null表示竞争失败,返回到retry的地方 if (pred == null) continue retry; // lost race vs opposite mode // 若是是ASYNC会跳过这里,马上返回e,不须要阻塞 if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); } return e; // not waiting } }
核心流程:
直接上图吧:
tryAppend包含入队的逻辑,返回前驱节点。代码充分考虑到并发状况,仍是比较难懂的,若是要看明白,能够在图上画一画节点的变化。
private Node tryAppend(Node s, boolean haveData) { for (Node t = tail, p = t;;) { // move p to last node and append Node n, u; // temps for reads of next & tail // p == null && head == null 表示此时队头还未初始化 if (p == null && (p = head) == null) { // cas设置s为队头 if (casHead(null, s)) return s; // initialize } // 这里检测到异常状况,返回null,以后会continue retry; else if (p.cannotPrecede(haveData)) return null; // lost race vs opposite mode // 这里就是p一直找到tail的位置, else if ((n = p.next) != null) // not last; keep traversing // 这段... 吐槽一下 p = p != t && t != (u = tail) ? (t = u) : // stale tail (p != n) ? n : null; // restart if off list // 尝试将s插到队尾,若是失败,说明其余线程先插了,那么p向后移,重新开始 else if (!p.casNext(null, s)) p = p.next; // re-read on CAS failure else { if (p != t) { // update if slack now >= 2 // 这里会设置s为tail,若是成功的话,就退出循环了, // 若是失败的话,会进行后面的判断,一开始tail其实都是null的 // while ((tail != t || !casTail(t, s)) && (t = tail) != null && (s = t.next) != null && // advance and retry (s = s.next) != null && s != t); } // 返回加入节点的前驱节点 return p; } } }
该方法从当前的tail开始,找到实际的最后一个节点【前面说了,tail可能不是最后一个节点】,并尝试追加一个新的节点【若是head为null,则创建第一个节点】。
成功追加后,若是how为ASYNC,则返回。
注意:仅当它的前面节点都已经匹配或mode相同时,才能够追加节点。若是检测到其余的状况,咱们须要直接返回null,从新启动retry。
awaitMatch方法其实和SynchronousQueue的awaitFulfill逻辑差很少,线程会有三种状况:spins/yield/blocks
,直到node s被匹配或取消。
On multiprocessors, we use front-of-queue spinning: If a node appears to be the first unmatched node in the queue, it spins a bit before blocking.
若是一个节点可能会优先被匹配呢,它会优先选择自旋而不是阻塞,自旋次数到了才阻塞,主要是考虑到阻塞、唤醒须要消耗更多的资源。这边简单过一下:
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) { final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); // 自旋次数 int spins = -1; // initialized after first item and cancel checks // 这里是线程安全的Random ThreadLocalRandom randomYields = null; // bound if needed for (;;) { Object item = s.item; // if (item != e) { // matched // assert item != s; s.forgetContents(); // avoid garbage return LinkedTransferQueue.<E>cast(item); } // 若是中断或超时 ,就cas设置s的item为e if ((w.isInterrupted() || (timed && nanos <= 0)) && s.casItem(e, s)) { // cancel // 断开 unsplice(pred, s); return e; } // 计算自旋次数 if (spins < 0) { // establish spins at/near front if ((spins = spinsFor(pred, s.isData)) > 0) randomYields = ThreadLocalRandom.current(); } else if (spins > 0) { // spin --spins; // 这里做者提示:虽然偶尔执行yield的收益不是很明显 // 但仍限制了 自旋对busy system 的影响 if (randomYields.nextInt(CHAINED_SPINS) == 0) Thread.yield(); // occasionally yield } // 设置一下waiter线程,标记一下谁在等 else if (s.waiter == null) { s.waiter = w; // request unpark then recheck } // 超时阻塞 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos > 0L) LockSupport.parkNanos(this, nanos); } // 自旋完仍是没有匹配,就park住 else { LockSupport.park(this); } } }
最后,来看个简单的案例吧。
/** * @author Summerday */ public class TestTransferQueue { // 无锁算法 无界队列 static TransferQueue<Integer> queue = new LinkedTransferQueue<>(); public static void main (String[] args) { for (int i = 1; i <= 10; i++) { new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + "消费 id - " + queue.take()); System.out.println("---------------------------------------------"); } catch (InterruptedException e) { e.printStackTrace(); } }, "consumer" + i).start(); } Thread producer = new Thread(() -> { while (true) { System.out.println("当前队列中等待的线程" + queue.getWaitingConsumerCount()); // 若是队列中有等待的消费者 if (queue.hasWaitingConsumer()) { int product = new Random().nextInt(500); try { System.out.println(Thread.currentThread().getName() + "生产 id - " + product); queue.tryTransfer(product); TimeUnit.MILLISECONDS.sleep(100); // 等待消费 } catch (InterruptedException e) { e.printStackTrace(); } } } }, "producer"); producer.setDaemon(true); producer.start(); } } // 打印结果: 当前队列中等待的线程10 producer生产 id - 266 consumer1消费 id - 266 --------------------------------------------- 当前队列中等待的线程9 producer生产 id - 189 consumer2消费 id - 189 --------------------------------------------- //... 省略
LinkedTransferQueue在JDK1.7版本诞生,是由链表组成的无界TransferQueue,相对于其余阻塞队列,不只多了tryTransfer和transfer方法,并且性能方面也有巨大的提高。
LinkedTransferQueue使用的队列结构是slack dual queue
,不会每次操做的时候都更新head或tail,而是保持有针对性的slack懈怠。
LinkedTransferQueue的全部队列操做都基于xfer方法,具体状况根据传入的how字段决定:NOW节点不入队,ASYNC节点入队但会当即返回,SYNC节点入队且阻塞,TIMED对应超时机制。
xfer的实现分为三个流程:
最后:具体步骤能够查看上面的解析,若有不足,望评论区指教。