Exchanger是在两个任务之间交换对象的栅栏。当两个任务进入栅栏时,它们各自拥有一个对象,当它们离开时,它们都拥有对方的对象。Exchanger的典型应用场景是:一个任务在建立对象,而这些对象的生产代价很高,另外一个任务在消费这些对象。经过这种方式,能够有更多的对象在被建立的同时被消费。
java
为了演示Exchanger类,咱们将建立生产者和消费者任务。ExchangerProducer和ExchangerConsumer使用一个List<Fat>做为要求交换的对象,它们都包含一个用于这个List<Fat>的Exchanger。当你调用Exchanger.exchange()方法时,它将阻塞直至对方任务调用它本身的exchange()方法,那时,这两个exchange()方法将同时完成,而List<Fat>被交换:
ide
import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; class ExchangerProducer implements Runnable { private List<Fat> holder; private Exchanger<List<Fat>> exchanger; public ExchangerProducer(Exchanger<List<Fat>> exchanger, List<Fat> holder) { this.exchanger = exchanger; this.holder = holder; } @Override public void run() { try { while(!Thread.interrupted()) { //填充列表 for (int i = 0;i < ExchangerDemo.size; i++) { holder.add(new Fat()); } //等待交换 holder = exchanger.exchange(holder); } } catch (InterruptedException e) { } System.out.println("Producer stopped."); } } class ExchangerConsumer implements Runnable { private List<Fat> holder; private Exchanger<List<Fat>> exchanger; private volatile Fat value; private static int num = 0; public ExchangerConsumer(Exchanger<List<Fat>> exchanger, List<Fat> holder) { this.exchanger = exchanger; this.holder = holder; } @Override public void run() { try { while(!Thread.interrupted()) { //等待交换 holder = exchanger.exchange(holder); //读取列表并移除元素 for (Fat x : holder) { num++; value = x; //在循环内删除元素,这对于CopyOnWriteArrayList是没有问题的 holder.remove(x); } if (num % 10000 == 0) { System.out.println("Exchanged count=" + num); } } } catch (InterruptedException e) { } System.out.println("Consumer stopped. Final value: " + value); } } public class ExchangerDemo { static int size = 10; static int delay = 5; //秒 public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); List<Fat> producerList = new CopyOnWriteArrayList<>(); List<Fat> consumerList = new CopyOnWriteArrayList<>(); Exchanger<List<Fat>> exchanger = new Exchanger<>(); exec.execute(new ExchangerProducer(exchanger, producerList)); exec.execute(new ExchangerConsumer(exchanger, consumerList)); TimeUnit.SECONDS.sleep(delay); exec.shutdownNow(); } } class Fat { private volatile double d; private static int counter = 1; private final int id = counter++; public Fat() { //执行一段耗时的操做 for (int i = 1; i<10000; i++) { d += (Math.PI + Math.E) / (double)i; } } public void print() {System.out.println(this);} public String toString() {return "Fat id=" + id;} }
执行结果(可能的结果):this
Exchanged count=10000 Exchanged count=20000 Exchanged count=30000 Exchanged count=40000 Exchanged count=50000 Exchanged count=60000 Exchanged count=70000 Exchanged count=80000 Consumer stopped. Final value: Fat id=88300 Producer stopped.
在main()中,建立了用于两个任务的单一的Exchanger,以及两个用于互换的CopyOnWriteArrayList。这个特定的List变体容许列表在被遍历的时候调用remove()方法,而不会抛出ConcurrentModifiedException异常。ExchangerProducer将填充这个List,而后将这个满列表跟ExchangerConsumer的空列表交换。交换以后,ExchangerProducer能够继续的生产Fat对象,而ExchangerConsumer则开始使用满列表中的对象。由于有了Exchanger,填充一个列表和消费另外一个列表便同时发生了。
code