Java并发编程中级篇(七):并发任务间交换数据

Java API提供了一个同步辅助类Exchanger。它容许你在线程执行过程当中在线程之间交换数据。它的机制是在线程中设置通步点,当两个线程都到达同步点之时,它们交换数据结构,所以第一个线程的数据结构进入到第二个线程中,同时第二个线程的数据结构进入到第一个线程中。java

Exchange应用局限于两个线程,因此咱们能够使用Exchange来模拟只有一个生产者和一个消费者的生产者-消费者问题。缓存

建立一个生产者线程类Producer,声明一个List<String>做为缓冲区,一个Exchange<List<String>>用来和消费者交换数据。线程的执行方法中,循环10次,每次向缓冲区中加入10个记录并调用Exchange.exchange(),方法来和消费者交换数据。最后打印缓冲区记录数和当前循环次数。数据结构

public class Producer implements Runnable {
    private List<String> buffer;
    private Exchanger<List<String>> exchanger;

    public Producer(List<String> buffer, Exchanger<List<String>> exchanger) {
        this.buffer = buffer;
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        int cycle = 1;
        for (int i = 0; i < 10; i++) {
            System.out.printf("Producer: Cycle %d\n", cycle);
            for (int j = 0; j < 10; j++) {
                String message = "Event" + (i * 10 + j);
                System.out.printf("Producer: %s\n", message);
                buffer.add(message);
            }
            try {
                buffer = exchanger.exchange(buffer);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.printf("Producer: %d\n", buffer.size());
            cycle++;
        }
    }
}

建立一个消费者线程类,一样声明一个数据缓冲区List<String>和一个Exchange用于和生产者交换数据。执行10次循环,每次循环使用exchanger.exchange()方法等待和生产者交换数据,交换数据成功后打印数据缓冲区的内容。ide

public class Consumer implements Runnable{
    private List<String> buffer;
    private Exchanger<List<String>> exchanger;

    public Consumer(List<String> buffer, Exchanger<List<String>> exchanger) {
        this.buffer = buffer;
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        int cycle = 1;
        for (int i = 0; i < 10; i++) {
            System.out.printf("Consumer: Cycle %d\n", cycle);
            try {
                buffer = exchanger.exchange(buffer);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.printf("Consumer: Buffer size %d\n", buffer.size());
            for(Iterator<String> iterator = buffer.iterator(); iterator.hasNext();) {
                String message = iterator.next();
                System.out.printf("Consumer: %s\n", message);
                iterator.remove();
            }
            cycle++;
        }
    }
}

建立主方法类,声明两个List<Stting>分别做为生产者和消费者的数据缓冲区,声明一个Exchanger对象用于在生产者线程和消费者线程之间交换数据。而后启动两个线程。this

public class Main {
    public static void main(String[] args) {
        List<String> buffer1 = new ArrayList<String>();
        List<String> buffer2 = new ArrayList<String>();
        Exchanger<List<String>> exchanger = new Exchanger<List<String>>();

        Thread threadP = new Thread(new Producer(buffer1, exchanger));
        Thread threadC = new Thread(new Consumer(buffer2, exchanger));

        threadP.start();
        threadC.start();
    }
}

查看控制台执行结果。执行开始后消费者线程持有一个空缓冲区,并等待和生产者线程在第一个同步点交换数据。生产者则向一个空的缓冲区中写入10条记录,而后再第一个同步点与消费者交换数据。在这个同步点上,消费者得到生产者的10条记录的缓存,生产者得到消费者的空缓存,因此你会看到消费者打印缓存中的10条记录,而生产者交换后的缓存数据量为0.线程

Producer: Cycle 1
Producer: Event0
Producer: Event1
Producer: Event2
Producer: Event3
Producer: Event4
Producer: Event5
Producer: Event6
Producer: Event7
Producer: Event8
Producer: Event9
Consumer: Cycle 1
Producer: 0
Producer: Cycle 2
Producer: Event10
Producer: Event11
Producer: Event12
Producer: Event13
Producer: Event14
Producer: Event15
Producer: Event16
Producer: Event17
Producer: Event18
Producer: Event19
Consumer: Buffer size 10
Consumer: Event0
Consumer: Event1
Consumer: Event2
Consumer: Event3
Consumer: Event4
Consumer: Event5
Consumer: Event6
Consumer: Event7
Consumer: Event8
Consumer: Event9
Consumer: Cycle 2
Consumer: Buffer size 10
Consumer: Event10
Consumer: Event11
Consumer: Event12
Consumer: Event13
Consumer: Event14
Consumer: Event15
Consumer: Event16
Consumer: Event17
Consumer: Event18
Consumer: Event19
Consumer: Cycle 3
Producer: 0
Producer: Cycle 3
Producer: Event20
Producer: Event21
Producer: Event22
Producer: Event23
Producer: Event24
Producer: Event25
Producer: Event26
Producer: Event27
Producer: Event28
Producer: Event29
Producer: 0
Producer: Cycle 4
Producer: Event30
Producer: Event31
Consumer: Buffer size 10
Consumer: Event20
Producer: Event32
Consumer: Event21
Consumer: Event22
Consumer: Event23
Producer: Event33
Consumer: Event24
Consumer: Event25
Consumer: Event26
Consumer: Event27
Consumer: Event28
Consumer: Event29
Consumer: Cycle 4
Producer: Event34
Producer: Event35
Producer: Event36
Producer: Event37
Producer: Event38
Producer: Event39
Producer: 0
Producer: Cycle 5
Producer: Event40
Consumer: Buffer size 10
Consumer: Event30
Consumer: Event31
Consumer: Event32
Consumer: Event33
Producer: Event41
Producer: Event42
Consumer: Event34
Producer: Event43
Consumer: Event35
Producer: Event44
Consumer: Event36
Producer: Event45
Consumer: Event37
Consumer: Event38
Consumer: Event39
Consumer: Cycle 5
Producer: Event46
Producer: Event47
Producer: Event48
Producer: Event49
Producer: 0
Producer: Cycle 6
Consumer: Buffer size 10
Consumer: Event40
Consumer: Event41
Producer: Event50
Consumer: Event42
Consumer: Event43
Consumer: Event44
Producer: Event51
Consumer: Event45
Producer: Event52
Consumer: Event46
Producer: Event53
Consumer: Event47
Producer: Event54
Producer: Event55
Producer: Event56
Producer: Event57
Producer: Event58
Producer: Event59
Consumer: Event48
Consumer: Event49
Consumer: Cycle 6
Consumer: Buffer size 10
Consumer: Event50
Consumer: Event51
Consumer: Event52
Consumer: Event53
Consumer: Event54
Consumer: Event55
Consumer: Event56
Consumer: Event57
Consumer: Event58
Consumer: Event59
Consumer: Cycle 7
Producer: 0
Producer: Cycle 7
Producer: Event60
Producer: Event61
Producer: Event62
Producer: Event63
Producer: Event64
Producer: Event65
Producer: Event66
Producer: Event67
Producer: Event68
Producer: Event69
Producer: 0
Producer: Cycle 8
Producer: Event70
Producer: Event71
Producer: Event72
Producer: Event73
Producer: Event74
Producer: Event75
Producer: Event76
Producer: Event77
Producer: Event78
Producer: Event79
Consumer: Buffer size 10
Consumer: Event60
Consumer: Event61
Consumer: Event62
Consumer: Event63
Consumer: Event64
Consumer: Event65
Consumer: Event66
Consumer: Event67
Consumer: Event68
Consumer: Event69
Consumer: Cycle 8
Consumer: Buffer size 10
Producer: 0
Producer: Cycle 9
Consumer: Event70
Producer: Event80
Consumer: Event71
Consumer: Event72
Producer: Event81
Consumer: Event73
Producer: Event82
Consumer: Event74
Consumer: Event75
Consumer: Event76
Producer: Event83
Consumer: Event77
Producer: Event84
Producer: Event85
Consumer: Event78
Consumer: Event79
Consumer: Cycle 9
Producer: Event86
Producer: Event87
Producer: Event88
Producer: Event89
Producer: 0
Producer: Cycle 10
Consumer: Buffer size 10
Consumer: Event80
Producer: Event90
Producer: Event91
Consumer: Event81
Producer: Event92
Consumer: Event82
Producer: Event93
Consumer: Event83
Producer: Event94
Consumer: Event84
Consumer: Event85
Consumer: Event86
Consumer: Event87
Consumer: Event88
Producer: Event95
Producer: Event96
Producer: Event97
Producer: Event98
Consumer: Event89
Consumer: Cycle 10
Producer: Event99
Producer: 0
Consumer: Buffer size 10
Consumer: Event90
Consumer: Event91
Consumer: Event92
Consumer: Event93
Consumer: Event94
Consumer: Event95
Consumer: Event96
Consumer: Event97
Consumer: Event98
Consumer: Event99
相关文章
相关标签/搜索