本文参考群主的博客http://cmsblogs.com/?p=2269java
Java 并发 API 提供了一种容许2个并发任务间相互交换数据的同步应用。更具体的说,Exchanger 类容许在2个线程间定义同步点,当2个线程到达这个点,他们相互交换数据类型,使用第一个线程的数据类型变成第二个的,而后第二个线程的数据类型变成第一个的。node
package com; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Exchanger; class Producer implements Runnable { // 要被相互交换的数据类型。 private List<String> buffer; // 用来同步 producer和consumer private final Exchanger<List<String>> exchanger; public Producer(List<String> buffer, Exchanger<List<String>> exchanger) { this.buffer = buffer; this.exchanger = exchanger; } public void run() { // 实现10次交换 for (int i = 0; i < 10; i++) { buffer.add("第" + i + "次生产者的数据" + i); try { // 调用exchange方法来与consumer交换数据 System.out.println("第" + i + "次生产者在等待....."); buffer = exchanger.exchange(buffer); System.out.println("第" + i + "次生产者交换后的数据:" + buffer.get(i)); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Consumer implements Runnable { // 用来相互交换 private List<String> buffer; // 用来同步 producer和consumer private final Exchanger<List<String>> exchanger; public Consumer(List<String> buffer, Exchanger<List<String>> exchanger) { this.buffer = buffer; this.exchanger = exchanger; } public void run() { // 实现10次交换 for (int i = 0; i < 10; i++) { buffer.add("第" + i + "次消费者的数据" + i); try { // 调用exchange方法来与consumer交换数据 System.out.println("第" + i + "次消费者在等待....."); buffer = exchanger.exchange(buffer); System.out.println("第" + i + "次消费者交换后的数据:" + buffer.get(i)); } catch (InterruptedException e) { e.printStackTrace(); } } } }
//主类 import java.util.ArrayList; import java.util.List; import java.util.concurrent.Exchanger; public class Core { public static void main(String[] args) { // 建立2个buffers,分别给producer和consumer使用 List<String> buffer1 = new ArrayList<String>(); List<String> buffer2 = new ArrayList<String>(); // 建立Exchanger对象,用来同步producer和consumer Exchanger<List<String>> exchanger = new Exchanger<List<String>>(); // 建立Producer对象和Consumer对象 Producer producer = new Producer(buffer1, exchanger); Consumer consumer = new Consumer(buffer2, exchanger); // 建立线程来执行producer和consumer并开始线程 Thread threadProducer = new Thread(producer); Thread threadConsumer = new Thread(consumer); threadProducer.start(); threadConsumer.start(); } }
在Exchanger中,若是一个线程已经到达了exchanger节点时,对于它的伙伴节点的状况有三种:算法
Exchanger算法的核心是经过一个可交换数据的slot,以及一个能够带有数据item的参与者。数组
for (;;) { if (slot is empty) { // offer place item in a Node; if (can CAS slot from empty to node) { wait for release; return matching item in node; } } else if (can CAS slot from node to empty) { // release get the item in node; set matching item in node; release waiting thread; } // else retry on CAS failure }
Exchanger中定义了以下几个重要的成员变量:并发
private final Participant participant; private volatile Node[] arena; private volatile Node slot;
participant的做用是为每一个线程保留惟一的一个Node节点。app
slot为单个槽,arena为数组槽。他们都是Node类型。在这里可能会感受到疑惑,slot做为Exchanger交换数据的场景,应该只须要一个就能够了啊?为什么还多了一个Participant 和数组类型的arena呢?一个slot交换场所原则上来讲应该是能够的,但实际状况却不是如此,多个参与者使用同一个交换场所时,会存在严重伸缩性问题。既然单个交换场所存在问题,那么咱们就安排多个,也就是数组arena。经过数组arena来安排不一样的线程使用不一样的slot来下降竞争问题,而且能够保证最终必定会成对交换数据。可是Exchanger不是一来就会生成arena数组来下降竞争,只有当产生竞争是才会生成arena数组。那么怎么将Node与当前线程绑定呢?Participant ,Participant 的做用就是为每一个线程保留惟一的一个Node节点,它继承ThreadLocal,同时在Node节点中记录在arena中的下标index。ide
Node定义以下:this
@sun.misc.Contended static final class Node { int index; // arena的下标; int bound; // 上一次记录的Exchanger.bound int collides; // 在当前bound下CAS失败的次数 int hash; // 伪随机数,用于自旋; Object item; // 这个线程的当前项,也就是须要交换的数据; volatile Object match; // 作releasing操做的线程传递的项; volatile Thread parked; //挂起时设置线程值,其余状况下为null; }
exchange(V x):等待另外一个线程到达此交换点(除非当前线程被中断),而后将给定的对象传送给该线程,并接收该线程的对象。spa
public V exchange(V x) throws InterruptedException { Object v; Object item = (x == null) ? NULL_ITEM : x; // translate null args if ((arena != null || (v = slotExchange(item, false, 0L)) == null) && ((Thread.interrupted() || // disambiguates null return (v = arenaExchange(item, false, 0L)) == null))) throw new InterruptedException(); return (v == NULL_ITEM) ? null : (V)v; }
这个方法比较好理解:arena为数组槽,若是为null,则执行slotExchange()方法,不然判断线程是否中断,若是中断值抛出InterruptedException异常,没有中断则执行arenaExchange()方法。整套逻辑就是:若是slotExchange(Object item, boolean timed, long ns)方法执行失败了就执行arenaExchange(Object item, boolean timed, long ns)方法,最后返回结果V。线程
NULL_ITEM 为一个空节点,其实就是一个Object对象而已,slotExchange()为单个slot交换。
private final Object slotExchange(Object item, boolean timed, long ns) { // 获取当前线程的节点 p Node p = participant.get(); // 当前线程 Thread t = Thread.currentThread(); // 线程中断,直接返回 if (t.isInterrupted()) return null; // 自旋 for (Node q;;) { //slot != null if ((q = slot) != null) { //尝试CAS替换 if (U.compareAndSwapObject(this, SLOT, q, null)) { Object v = q.item; // 当前线程的项,也就是交换的数据 q.match = item; // 作releasing操做的线程传递的项 Thread w = q.parked; // 挂起时设置线程值 // 挂起线程不为null,线程挂起 if (w != null) U.unpark(w); return v; } //若是失败了,则建立arena //bound 则是上次Exchanger.bound if (NCPU > 1 && bound == 0 && U.compareAndSwapInt(this, BOUND, 0, SEQ)) arena = new Node[(FULL + 2) << ASHIFT]; } //若是arena != null,直接返回,进入arenaExchange逻辑处理 else if (arena != null) return null; else { p.item = item; if (U.compareAndSwapObject(this, SLOT, null, p)) break; p.item = null; } } /* * 等待 release * 进入spin+block模式 */ int h = p.hash; long end = timed ? System.nanoTime() + ns : 0L; int spins = (NCPU > 1) ? SPINS : 1; Object v; while ((v = p.match) == null) { if (spins > 0) { h ^= h << 1; h ^= h >>> 3; h ^= h << 10; if (h == 0) h = SPINS | (int)t.getId(); else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0) Thread.yield(); } else if (slot != p) spins = SPINS; else if (!t.isInterrupted() && arena == null && (!timed || (ns = end - System.nanoTime()) > 0L)) { U.putObject(t, BLOCKER, this); p.parked = t; if (slot == p) U.park(false, ns); p.parked = null; U.putObject(t, BLOCKER, null); } else if (U.compareAndSwapObject(this, SLOT, p, null)) { v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null; break; } } U.putOrderedObject(p, MATCH, null); p.item = null; p.hash = h; return v; }
程序首先经过participant获取当前线程节点Node。检测是否中断,若是中断return null,等待后续抛出InterruptedException异常。
若是slot不为null,则进行slot消除,成功直接返回数据V,不然失败,则建立arena消除数组。
若是slot为null,但arena不为null,则返回null,进入arenaExchange逻辑。
若是slot为null,且arena也为null,则尝试占领该slot,失败重试,成功则跳出循环进入spin+block(自旋+阻塞)模式。
在自旋+阻塞模式中,首先取得结束时间和自旋次数。若是match(作releasing操做的线程传递的项)为null,其首先尝试spins+随机次自旋(改自旋使用当前节点中的hash,并改变之)和退让。当自旋数为0后,假如slot发生了改变(slot != p)则重置自旋数并重试。不然假如:当前未中断&arena为null&(当前不是限时版本或者限时版本+当前时间未结束):阻塞或者限时阻塞。假如:当前中断或者arena不为null或者当前为限时版本+时间已经结束:不限时版本:置v为null;限时版本:若是时间结束以及未中断则TIMED_OUT;不然给出null(缘由是探测到arena非空或者当前线程中断)。
match不为空时跳出循环。
private final Object arenaExchange(Object item, boolean timed, long ns) { Node[] a = arena; Node p = participant.get(); for (int i = p.index;;) { // access slot at i int b, m, c; long j; // j is raw array offset Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE); if (q != null && U.compareAndSwapObject(a, j, q, null)) { Object v = q.item; // release q.match = item; Thread w = q.parked; if (w != null) U.unpark(w); return v; } else if (i <= (m = (b = bound) & MMASK) && q == null) { p.item = item; // offer if (U.compareAndSwapObject(a, j, null, p)) { long end = (timed && m == 0) ? System.nanoTime() + ns : 0L; Thread t = Thread.currentThread(); // wait for (int h = p.hash, spins = SPINS;;) { Object v = p.match; if (v != null) { U.putOrderedObject(p, MATCH, null); p.item = null; // clear for next use p.hash = h; return v; } else if (spins > 0) { h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift if (h == 0) // initialize hash h = SPINS | (int)t.getId(); else if (h < 0 && // approx 50% true (--spins & ((SPINS >>> 1) - 1)) == 0) Thread.yield(); // two yields per wait } else if (U.getObjectVolatile(a, j) != p) spins = SPINS; // releaser hasn't set match yet else if (!t.isInterrupted() && m == 0 && (!timed || (ns = end - System.nanoTime()) > 0L)) { U.putObject(t, BLOCKER, this); // emulate LockSupport p.parked = t; // minimize window if (U.getObjectVolatile(a, j) == p) U.park(false, ns); p.parked = null; U.putObject(t, BLOCKER, null); } else if (U.getObjectVolatile(a, j) == p && U.compareAndSwapObject(a, j, p, null)) { if (m != 0) // try to shrink U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1); p.item = null; p.hash = h; i = p.index >>>= 1; // descend if (Thread.interrupted()) return null; if (timed && m == 0 && ns <= 0L) return TIMED_OUT; break; // expired; restart } } } else p.item = null; // clear offer } else { if (p.bound != b) { // stale; reset p.bound = b; p.collides = 0; i = (i != m || m == 0) ? m : m - 1; } else if ((c = p.collides) < m || m == FULL || !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) { p.collides = c + 1; i = (i == 0) ? m : i - 1; // cyclically traverse } else i = m + 1; // grow p.index = i; } } }
首先经过participant取得当前节点Node,而后根据当前节点Node的index去取arena中相对应的节点node。前面提到过arena能够确保不一样的slot在arena中是不会相冲突的,那么是怎么保证的呢?
arena = new Node[(FULL + 2) << ASHIFT];
取得arena中的node节点后,若是定位的节点q 不为空,且CAS操做成功,则交换数据,返回交换的数据,唤醒等待的线程。
若是q等于null且下标在bound & MMASK范围以内,则尝试占领该位置,若是成功,则采用自旋 + 阻塞的方式进行等待交换数据。
若是下标不在bound & MMASK范围以内获取因为q不为null可是竞争失败的时候:消除p。加入bound 不等于当前节点的bond(b != p.bound),则更新p.bound = b,collides = 0 ,i = m或者m – 1。若是冲突的次数不到m 获取m 已经为最大值或者修改当前bound的值失败,则经过增长一次collides以及循环递减下标i的值;不然更新当前bound的值成功:咱们令i为m+1即为此时最大的下标。最后更新当前index的值。
Exchanger使用、原理都比较好理解,可是这个源码看起来真心有点儿复杂,是真心难看懂,可是这种交换的思路Doug Lea在后续博文中还会提到,例如SynchronousQueue、LinkedTransferQueue。
其实就是”我”和”你”(可能有多个”我”,多个”你”)在一个叫Slot的地方作交易(一手交钱,一手交货),过程分如下步骤: