Listing 6-3 运用Exchanger交换缓冲区。
package com.owen.thread.chapter6;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

/**
 * Demonstrates {@link Exchanger} by letting two threads swap buffers.
 *
 * <p>One thread keeps adding items to its current buffer, the other keeps
 * removing items from its current buffer. Whenever the filling thread's
 * buffer is full, or the emptying thread's buffer is empty, that thread
 * offers its buffer to the exchanger and continues with the buffer it
 * receives in return.
 */
public class ExchangerDemo {
    /** Rendezvous point at which the two threads trade buffers. */
    static final Exchanger<DataBuffer> exchanger = new Exchanger<>();

    /** Buffer the filling thread starts with; created with no items. */
    static final DataBuffer initialEmptyBuffer = new DataBuffer();

    /** Buffer the emptying thread starts with; pre-filled with "I"-prefixed items. */
    static final DataBuffer initialFullBuffer = new DataBuffer("I");

    /**
     * Starts the emptying thread and then the filling thread.
     *
     * <p>Both threads loop until they are interrupted; nothing in this class
     * interrupts them, so the program keeps running until it is stopped
     * externally.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        /** Adds generated items to its buffer and trades it away once it is full. */
        class FillingLoop implements Runnable {
            /** Sequence number of the next generated item. */
            int count = 0;

            @Override
            public void run() {
                DataBuffer currentBuffer = initialEmptyBuffer;
                try {
                    while (true) {
                        addToBuffer(currentBuffer);
                        if (currentBuffer.isFull()) {
                            System.out.println("filling thread wants to exchange");
                            // Blocks until the other thread also calls exchange().
                            currentBuffer = exchanger.exchange(currentBuffer);
                            System.out.println("filling thread receives exchange");
                        }
                    }
                } catch (InterruptedException ie) {
                    // Restore the interrupt status so code further up the
                    // stack can still observe that an interrupt happened.
                    Thread.currentThread().interrupt();
                    System.out.println("filling thread interrupted");
                }
            }

            /** Generates the next "NI"-prefixed item, logs it and adds it to {@code buffer}. */
            void addToBuffer(DataBuffer buffer) {
                String item = "NI" + count++;
                System.out.println("Adding: " + item);
                buffer.add(item);
            }
        }

        /** Removes items from its buffer and trades it away once it is empty. */
        class EmptyingLoop implements Runnable {
            @Override
            public void run() {
                DataBuffer currentBuffer = initialFullBuffer;
                try {
                    while (true) {
                        takeFromBuffer(currentBuffer);
                        if (currentBuffer.isEmpty()) {
                            System.out.println("emptying thread wants to " + "exchange");
                            // Blocks until the other thread also calls exchange().
                            currentBuffer = exchanger.exchange(currentBuffer);
                            System.out.println("emptying thread receives " + "exchange");
                        }
                    }
                } catch (InterruptedException ie) {
                    // Restore the interrupt status so code further up the
                    // stack can still observe that an interrupt happened.
                    Thread.currentThread().interrupt();
                    System.out.println("emptying thread interrupted");
                }
            }

            /** Removes the oldest item from {@code buffer} and logs it. */
            void takeFromBuffer(DataBuffer buffer) {
                System.out.println("taking: " + buffer.remove());
            }
        }

        new Thread(new EmptyingLoop()).start();
        new Thread(new FillingLoop()).start();
    }
}

/**
 * A list of strings capped at {@link #MAXITEMS} entries.
 *
 * <p>Every accessor is {@code synchronized} on the buffer instance.
 */
class DataBuffer {
    /** Maximum number of items a buffer holds. */
    private final static int MAXITEMS = 10;

    /** Stored items, oldest first. */
    private final List<String> items = new ArrayList<>();

    /** Creates a buffer with no items. */
    DataBuffer() {
    }

    /**
     * Creates a full buffer holding {@code prefix + 0} through
     * {@code prefix + (MAXITEMS - 1)}, printing each item as it is added.
     *
     * @param prefix text prepended to each item's index
     */
    DataBuffer(String prefix) {
        for (int i = 0; i < MAXITEMS; i++) {
            String item = prefix + i;
            System.out.printf("Adding %s%n", item);
            items.add(item);
        }
    }

    /**
     * Appends {@code s} unless the buffer is already full, in which case the
     * call does nothing.
     *
     * @param s item to append
     */
    synchronized void add(String s) {
        if (!isFull()) {
            items.add(s);
        }
    }

    /**
     * @return {@code true} if the buffer holds no items
     */
    synchronized boolean isEmpty() {
        return items.isEmpty();
    }

    /**
     * @return {@code true} if the buffer holds {@code MAXITEMS} items
     */
    synchronized boolean isFull() {
        // add() never lets the size exceed MAXITEMS; >= is purely defensive.
        return items.size() >= MAXITEMS;
    }

    /**
     * Removes and returns the oldest item.
     *
     * @return the removed item, or {@code null} if the buffer is empty
     */
    synchronized String remove() {
        if (!isEmpty()) {
            return items.remove(0);
        }
        return null;
    }
}
主线程通过静态字段初始化建立一个exchanger和一对缓存对象。随后,实例化EmptyingLoop和FillingLoop这两个局部类,它们各自实现了Runnable接口;把它们分别传给新的Thread实例,再调用start()即可启动线程(这里也可以改用执行器(executor))。每个run()方法都进入一个无限循环,不断向缓存添加元素或从缓存中移除元素。当缓存已满或为空时,线程经由exchanger交换彼此的缓存,而后填充或清空操作继续进行。上面例子的执行结果:
Adding I0
Adding I1
Adding I2
Adding I3
Adding I4
Adding I5
Adding I6
Adding I7
Adding I8
Adding I9
taking: I0
taking: I1
taking: I2
taking: I3
taking: I4
taking: I5
taking: I6
taking: I7
taking: I8
taking: I9
emptying thread wants to exchange
Adding: NI0
Adding: NI1
Adding: NI2
Adding: NI3
Adding: NI4
Adding: NI5
Adding: NI6
Adding: NI7
Adding: NI8
Adding: NI9
filling thread wants to exchange
filling thread receives exchange
emptying thread receives exchange
Adding: NI10
taking: NI0
Adding: NI11
taking: NI1
Adding: NI12