生产者消费者之Java简单实现

为何要使用生产者和消费者模式

  • 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。
  • 在多线程开发当中,若是生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。一样的道理,若是消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题因而引入了生产者和消费者模式。

什么是生产者消费者模式

  • 生产者消费者模式是经过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而经过阻塞队列来进行通信,因此生产者生产完数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力。
  • 这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。

代码实现(多生产者 和 多消费者)

  • QueueBuffer : 实现阻塞队列,将生产者和消费者解耦。它底层是一个数组,构造的时候指定数组的大小。因为实现的时多生产这和多消费者的模型,因此注意一下 put 和 get 中对阻塞条件的描述用的是while循环,这是为了生产者之间或者消费者之间他们的内部竞争所形成的数组越界异常。

clipboard.png

package concurrency;

public class QueueBuffer {
    private final int SIZE;
    private int count = 0;
    private int[] buffer;
    public QueueBuffer(int size){
        this.SIZE = size;
        buffer = new int[SIZE];
    }

    public int getSIZE(){
        return SIZE;
    }

    public synchronized void put(int value){
        while (count == SIZE){ //buffer已经满了 等待get   ,用while使用于多个生产者的状况
            try {
                wait();
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }

        notifyAll(); //说明buffer中有元素 能够取
        buffer[count++] = value;
        System.out.println("Put "+value+" current size = "+count);
    }

    public synchronized int get(){
        while(count == 0){//用while使用于多个消费者的状况。
            try {
                wait();//buffer为空,须要等到put进元素
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
//        notify() 只是去通知其余的线程,可是synchronized 方法里面的代码仍是会执行完毕的。
//        synchronized方法原本就加了锁。代码的执行跟你的notify()也无关,代码的执行是跟你的
//        synchronized绑定一块儿而已。

        notifyAll(); //说明刚刚从buffer中取出了元素 有空位能够加进新的元素
        int result = buffer[--count];
        System.out.println("Get "+result+" current size = "+count);
        return result;
    }
}

class Test{
    public static void main(String[] args){
        QueueBuffer q = new QueueBuffer(10);
        new Producer(q);
        new Producer(q);
        new Producer(q);
        new Consumer(q);
        new Consumer(q);
        new Consumer(q);
        System.out.println("Press Control-C to stop.");
    }
}

Producerjava

package concurrency;

import java.util.Random;

public class Producer implements Runnable {

    Random rand = new Random(47);

    private QueueBuffer q;

    Producer(QueueBuffer q) {
        this.q = q;
        new Thread(this, "Producer").start();
    }

    public void run() {
        while (true) {
            q.put(rand.nextInt(q.getSIZE()));
            Thread.yield();
        }
    }
}

Consumer设计模式

package concurrency;

public class Consumer implements Runnable {
    private QueueBuffer q;

    Consumer(QueueBuffer q) {
        this.q = q;
        new Thread(this, "Consumer").start();
    }

    public void run() {
        while (true){
            q.get();
            Thread.yield();
        }
    }
}

注意事项

  • 调用obj的wait(), notify()方法前,必须得到obj对象的锁,也就是必须写在synchronized(obj) {…} 代码段内。
  • 调用obj.wait()后,线程A就释放了obj的锁,不然线程B没法得到obj锁,也就没法在synchronized(obj) {…} 代码段内唤醒A。
  • 当obj.wait()方法返回后,线程A须要再次得到obj锁,才能继续执行。
  • 若是A1,A2,A3都在obj.wait(),则B调用obj.notify()只能唤醒A1,A2,A3中的一个(具体哪个由JVM决定)。
  • obj.notifyAll()则能所有唤醒A1,A2,A3,可是要继续执行obj.wait()的下一条语句,必须得到obj锁,所以,A1,A2,A3只有一个有机会得到锁继续执行,例如A1,其他的须要等待A1释放obj锁以后才能继续执行。
  • 当B调用obj.notify/notifyAll的时候,B正持有obj锁,所以,A1,A2,A3虽被唤醒,可是仍没法得到obj锁。直到B退出synchronized块,释放obj锁后,A1,A2,A3中的一个才有机会得到锁继续执行。这一点很重要,并非调用 notify 或者 notifyAll 以后立刻释放锁,而是执行完相应的synchronized代码段。
相关文章
相关标签/搜索