Exchanger 是一个用于线程间协做的工具类,Exchanger用于进行线程间的数据交换,它提供一个同步点,在这个同步点,两个线程能够交换彼此的数据。这两个线程经过exchange 方法交换数据,若是第一个线程先执行exchange 方法,它会一直等待第二个线程也执行exchange 方法,当两个线程都到达同步点时,这两个线程就能够交换数据。html
源码:java
package java.util.concurrent; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; public class Exchanger<V> { private static final int ASHIFT = 7; private static final int MMASK = 0xff; private static final int SEQ = MMASK + 1; private static final int NCPU = Runtime.getRuntime().availableProcessors(); static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1; private static final int SPINS = 1 << 10;// 自旋次数 private static final Object NULL_ITEM = new Object();//若是交换的数据为null,则用NULL_ITEM代替 private static final Object TIMED_OUT = new Object(); private final Participant participant;//每一个线程的数据,ThreadLocal 子类 private volatile Node[] arena; private volatile Node slot;// 用于交换数据的槽位 private volatile int bound; @sun.misc.Contended static final class Node { int index; //arena的下标,多个槽位的时候利用 int bound; // 上一次记录的Exchanger.bound; int collides; // 在当前bound下CAS失败的次数; int hash; // 用于自旋; Object item; // 这个线程的当前项,也就是须要交换的数据; volatile Object match; // 交换的数据 volatile Thread parked; // 线程 } static final class Participant extends ThreadLocal<Node> { // 初始值返回Node public Node initialValue() { return new Node(); } } private final Object arenaExchange(Object item, boolean timed, long ns) { // 槽位数组 Node[] a = arena; //表明当前线程的Node Node p = participant.get(); // p.index 初始值为 0 for (int i = p.index; ; ) { // access slot at i int b, m, c; long j; // j is raw array offset //在槽位数组中根据"索引" i 取出数据 j至关因而 "第一个"槽位 Node q = (Node) U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE); // 该位置上有数据(即有线程在这里等待交换数据) if (q != null && U.compareAndSwapObject(a, j, q, null)) { // 进行数据交换,这里和单槽位的交换是同样的 Object v = q.item; // release q.match = item; Thread w = q.parked; if (w != null) U.unpark(w); return v; } // bound 是最大的有效的 位置,和MMASK相与,获得真正的存储数据的索引最大值 else if (i <= (m = (b = bound) & MMASK) && q == null) { // i 在这个范围内,该槽位也为空 //将须要交换的数据 设置给p p.item = item; // offer //设置该槽位数据(在该槽位等待其它线程来交换数据) if (U.compareAndSwapObject(a, j, null, p)) { long end = (timed && m == 0) ? System.nanoTime() + ns : 0L; Thread t = Thread.currentThread(); // wait // 进行必定时间的自旋 for (int h = p.hash, spins = SPINS; ; ) { Object v = p.match; //在自旋的过程当中,有线程来和该线程交换数据 if (v != null) { //交换数据后,清空部分设置,返回交换获得的数据,over U.putOrderedObject(p, MATCH, null); p.item = null; // clear for next use p.hash = h; return v; } else if (spins > 0) { h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift if (h == 0) // initialize hash h = SPINS | (int) t.getId(); else if (h < 0 && // approx 50% true (--spins & ((SPINS >>> 1) - 1)) == 0) Thread.yield(); // two yields per wait } // 交换数据的线程到来,可是尚未设置好match,再稍等一会 else if (U.getObjectVolatile(a, j) != p) spins = SPINS; //符合条件,特别注意m==0 这个说明已经到达area 中最小的存储数据槽位了 //没有其余线程在槽位等待了,全部当前线程须要阻塞在这里 else if (!t.isInterrupted() && m == 0 && (!timed || (ns = end - System.nanoTime()) > 0L)) { U.putObject(t, BLOCKER, this); // emulate LockSupport p.parked = t; // minimize window // 再次检查槽位,看看在阻塞前,有没有线程来交换数据 if (U.getObjectVolatile(a, j) == p) U.park(false, ns); // 挂起 p.parked = null; U.putObject(t, BLOCKER, null); } // 当前这个槽位一直没有线程来交换数据,准备换个槽位试试 else if (U.getObjectVolatile(a, j) == p && U.compareAndSwapObject(a, j, p, null)) { //更新bound if (m != 0) // try to shrink U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1); p.item = null; p.hash = h; // 减少索引值 往"第一个"槽位的方向挪动 i = p.index >>>= 1; // descend // 发送中断,返回null if (Thread.interrupted()) return null; // 超时 if (timed && m == 0 && ns <= 0L) return TIMED_OUT; break; // expired; restart 继续主循环 } } } else //占据槽位失败,先清空item,防止成功交换数据后,p.item还引用着item p.item = null; // clear offer } else { // i 不在有效范围,或者被其它线程抢先了 //更新p.bound if (p.bound != b) { // stale; reset p.bound = b; //新bound ,重置collides p.collides = 0; //i若是达到了最大,那么就递减 i = (i != m || m == 0) ? m : m - 1; } else if ((c = p.collides) < m || m == FULL || !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) { p.collides = c + 1; // 更新冲突 // i=0 那么就从m开始,不然递减i i = (i == 0) ? m : i - 1; // cyclically traverse } else //递增,日后挪动 i = m + 1; // grow // 更新index p.index = i; } } } private final Object slotExchange(Object item, boolean timed, long ns) { // 获得一个初试的Node Node p = participant.get(); // 当前线程 Thread t = Thread.currentThread(); // 若是发生中断,返回null,会重设中断标志位,并无直接抛异常 if (t.isInterrupted()) // preserve interrupt status so caller can recheck return null; for (Node q;;) { // 槽位 solt不为null,则说明已经有线程在这里等待交换数据了 if ((q = slot) != null) { // 重置槽位 if (U.compareAndSwapObject(this, SLOT, q, null)) { //获取交换的数据 Object v = q.item; //等待线程须要的数据 q.match = item; //等待线程 Thread w = q.parked; //唤醒等待的线程 if (w != null) U.unpark(w); return v; // 返回拿到的数据,交换完成 } // create arena on contention, but continue until slot null //存在竞争,其它线程抢先了一步该线程,所以须要采用多槽位模式,这个后面再分析 if (NCPU > 1 && bound == 0 && U.compareAndSwapInt(this, BOUND, 0, SEQ)) arena = new Node[(FULL + 2) << ASHIFT]; }else if (arena != null) //多槽位不为空,须要执行多槽位交换 return null; // caller must reroute to arenaExchange else { //尚未其余线程来占据槽位 p.item = item; // 设置槽位为p(也就是槽位被当前线程占据) if (U.compareAndSwapObject(this, SLOT, null, p)) break; // 退出无限循环 p.item = null; // 若是设置槽位失败,则有可能其余线程抢先了,重置item,从新循环 } } //当前线程占据槽位,等待其它线程来交换数据 int h = p.hash; long end = timed ? System.nanoTime() + ns : 0L; int spins = (NCPU > 1) ? SPINS : 1; Object v; // 直到成功交换到数据 while ((v = p.match) == null) { if (spins > 0) { // 自旋 h ^= h << 1; h ^= h >>> 3; h ^= h << 10; if (h == 0) h = SPINS | (int)t.getId(); else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0) // 主动让出cpu,这样能够提供cpu利用率(反正当前线程也自旋等待,还不如让其它任务占用cpu) Thread.yield(); } else if (slot != p) //其它线程来交换数据了,修改了solt,可是尚未设置match,再稍等一会 spins = SPINS; //须要阻塞等待其它线程来交换数据 //没发生中断,而且是单槽交换,没有设置超时或者超时时间未到 则继续执行 else if (!t.isInterrupted() && arena == null && (!timed || (ns = end - System.nanoTime()) > 0L)) { // cas 设置BLOCKER,能够参考Thread 中的parkBlocker U.putObject(t, BLOCKER, this); // 须要挂起当前线程 p.parked = t; if (slot == p) U.park(false, ns); // 阻塞当前线程 // 被唤醒后 p.parked = null; // 清空 BLOCKER U.putObject(t, BLOCKER, null); } // 不知足前面 else if 条件,交换失败,须要重置solt else if (U.compareAndSwapObject(this, SLOT, p, null)) { v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null; break; } } //清空match U.putOrderedObject(p, MATCH, null); p.item = null; p.hash = h; // 返回交换获得的数据(失败则为null) return v; } public Exchanger() { participant = new Participant(); } public V exchange(V x) throws InterruptedException { Object v; Object item = (x == null) ? NULL_ITEM : x; if ((arena != null || (v = slotExchange(item, false, 0L)) == null) && ((Thread.interrupted() || (v = arenaExchange(item, false, 0L)) == null))) throw new InterruptedException(); return (v == NULL_ITEM) ? null : (V)v; } public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { Object v; Object item = (x == null) ? NULL_ITEM : x; long ns = unit.toNanos(timeout); if ((arena != null || (v = slotExchange(item, true, ns)) == null) && ((Thread.interrupted() || (v = arenaExchange(item, true, ns)) == null))) throw new InterruptedException(); if (v == TIMED_OUT) throw new TimeoutException(); return (v == NULL_ITEM) ? null : (V)v; } private static final sun.misc.Unsafe U; private static final long BOUND; private static final long SLOT; private static final long MATCH; private static final long BLOCKER; private static final int ABASE; static { int s; try { U = sun.misc.Unsafe.getUnsafe(); Class<?> ek = Exchanger.class; Class<?> nk = Node.class; Class<?> ak = Node[].class; Class<?> tk = Thread.class; BOUND = U.objectFieldOffset (ek.getDeclaredField("bound")); SLOT = U.objectFieldOffset (ek.getDeclaredField("slot")); MATCH = U.objectFieldOffset (nk.getDeclaredField("match")); BLOCKER = U.objectFieldOffset (tk.getDeclaredField("parkBlocker")); s = U.arrayIndexScale(ak); ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT); } catch (Exception e) { throw new Error(e); } if ((s & (s-1)) != 0 || s > (1 << ASHIFT)) throw new Error("Unsupported array scale"); } }
类型参数:api
V
- 能够交换的对象类型数组
每一个线程将条目上的某个方法呈现给exchange()
方法,与伙伴线程进行匹配,而且在返回时接收其伙伴的对象。app
用法示例:使用 Exchanger
在线程间交换缓冲区。在须要时,填充缓冲区的线程获取一个新腾空的缓冲区,并将填满的缓冲区传递给腾空缓冲区的线程。
ide
class FillAndEmpty { Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>(); DataBuffer initialEmptyBuffer = ... DataBuffer initialFullBuffer = ... class FillingLoop implements Runnable { public void run() { DataBuffer currentBuffer = initialEmptyBuffer; try { while (currentBuffer != null) { addToBuffer(currentBuffer); if (currentBuffer.isFull()) currentBuffer = exchanger.exchange(currentBuffer); } } catch (InterruptedException ex) { ... handle ... } } } class EmptyingLoop implements Runnable { public void run() { DataBuffer currentBuffer = initialFullBuffer; try { while (currentBuffer != null) { takeFromBuffer(currentBuffer); if (currentBuffer.isEmpty()) currentBuffer = exchanger.exchange(currentBuffer); } } catch (InterruptedException ex) { ... handle ...} } } void start() { new Thread(new FillingLoop()).start(); new Thread(new EmptyingLoop()).start(); } }
Exchanger() 建立一个新的 Exchanger。 |
V |
exchange(V x) 等待另外一个线程到达此交换点(除非当前线程被中断),而后将给定的对象传送给该线程,并接收该线程的对象。 |
V |
exchange(V x, long timeout, TimeUnit unit) 等待另外一个线程到达此交换点(除非当前线程被中断,或者超出了指定的等待时间),而后将给定的对象传送给该线程,同时接收该线程的对象。 |
public Exchanger()
建立一个新的 Exchanger。工具
public V exchange(V x) throws InterruptedException
等待另外一个线程到达此交换点(除非当前线程被 中断),而后将给定的对象传送给该线程,并接收该线程的对象。oop
若是另外一个线程已经在交换点等待,则出于线程调度目的,继续执行此线程,并接收当前线程传入的对象。当前线程当即返回,接收其余线程传递的交换对象。this
若是尚未其余线程在交换点等待,则出于调度目的,禁用当前线程,且在发生如下两种状况之一前,该线程将一直处于休眠状态:atom
中断
当前线程。若是当前线程:
则抛出 InterruptedException
,而且清除当前线程的已中断状态。
参数:
x
- 要交换的对象
返回:
另外一个线程提供的对象
抛出:
InterruptedException
- 若是当前线程在等待时被中断
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
等待另外一个线程到达此交换点(除非当前线程被 中断,或者超出了指定的等待时间),而后将给定的对象传送给该线程,同时接收该线程的对象。
若是另外一个线程已经在交换点上等待,则出于线程调度目的,继续执行此线程,并接收当前线程传入的对象。当前线程当即返回,并接收其余线程传递的交换对象。
若是尚未其余线程在交换点等待,则出于调度目的,禁用当前线程,且在发生如下三种状况之一前,该线程将一直处于休眠状态:
若是当前线程:
则抛出 InterruptedException
,而且清除当前线程的已中断状态。
若是超出指定的等待时间,则抛出 TimeoutException
异常。若是该时间小于等于零,则此方法根本不会等待。
参数:
x
- 要交换的对象
timeout
- 要等待的最长时间
unit
- timeout 参数的时间单位
返回:
其余线程提供的对象
抛出:
InterruptedException
- 若是当前线程在等待时被中断
TimeoutException
- 若是在另外一个线程进入交换点以前已经到达指定的等待时间
线程交换各自拥有的值:
package com.thread; import java.util.concurrent.Exchanger; public class ExchangerDemo extends Thread { private Exchanger<String> exchanger; private String name; public ExchangerDemo(String name, Exchanger<String> exchanger) { this.exchanger = exchanger; this.name = name; } public void run() { try { System.out.println(Thread.currentThread().getName() + ": " + exchanger.exchange(this.name)); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { Exchanger<String> exchanger = new Exchanger<>(); ExchangerDemo exchangerDemo1 = new ExchangerDemo("demo1", exchanger); exchangerDemo1.setName("exchanger1"); ExchangerDemo exchangerDemo2 = new ExchangerDemo("demo2", exchanger); exchangerDemo2.setName("exchanger2"); exchangerDemo1.start(); exchangerDemo2.start(); } }
运行结果:
exchanger1: demo2
exchanger2: demo1
从输出的值能够看到,两个线程的值已经发生了交换。