生产者消费者问题总结

生产者-消费者算是并发编程中常见的问题。依靠缓冲区咱们能够实现生产者与消费者之间的解耦。生产者只管往缓冲区里面放东西,消费者只管往缓冲区里面拿东西。这样咱们避免生产者想要交付数据给消费者,但消费者此时还没法接受数据这样的状况发生。java

wait notify

这个问题其实就是线程间的通信,因此要注意的是不能同时读写。生产者在缓冲区满的时候不生产,等待;消费者在缓冲区为空的时候不消费,等待。比较经典的作法是waitnotify编程

生产者线程执行15次set操做安全

public class Producer implements Runnable{
    private Channel channel;

    public Producer(Channel channel) {
        this.channel = channel;
    }

    @Override
    public void run() {
        for(int i=0;i<15;i++){
            channel.set(Thread.currentThread().getName()+" "+i);
        }
    }
}

消费者线程执行10次get操做数据结构

public class Consumer implements Runnable {
    private Channel channel;

    public Consumer(Channel channel) {
        this.channel = channel;
    }

    @Override
    public void run() {
        for(int i=0;i<10;i++){
            System.out.println("Consumer "+Thread.currentThread().getName()+" get "+channel.get());
        }
    }
}

如今定义Channel类,并建立两个生产者线程和三个消费者线程并发

public class Channel {
    private List<String> buffer=new ArrayList<>();
    private final int MAX_SIZE=10;

    public synchronized String get(){
        while (buffer.size()==0){//不要用if,醒来了也要再次判断
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        String str=buffer.remove(0);
        notifyAll();
        return str;
    }
    public synchronized void set(String str){
        while (buffer.size()==MAX_SIZE){
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        buffer.add(str);
        notifyAll();
    }

    public static void main(String[] args) {
        Channel channel=new Channel();
        Producer producer=new Producer(channel);
        Consumer consumer=new Consumer(channel);
        for(int i=0;i<2;i++){
            new Thread(producer).start();
        }
        for (int i=0;i<3;i++){
            new Thread(consumer).start();
        }
    }
}

使用notifyAll而不是notify的缘由是,notify有可能出现屡次唤醒同类的状况,形成“假死”。咱们可使用Condition来实现更精确的唤醒。ide

Condition

将上面代码中的Channel类修改一下便可this

public class Channel {
    private List<String> buffer=new ArrayList<>();
    private final int MAX_SIZE=10;
    private Lock lock=new ReentrantLock();
    private Condition producer=lock.newCondition();
    private Condition consumer=lock.newCondition();
    
    public String get(){
        String str=null;
        try {
            lock.lock();
            while (buffer.size()==0){
                consumer.await();
            }
            str=buffer.remove(0);
            producer.signalAll();
        }catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
        return str;
    }
    public void set(String str){
        try {
            lock.lock();
            while (buffer.size()==MAX_SIZE){
                producer.await(); 
            }
            buffer.add(str);
            consumer.signalAll();
        }catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
}

双缓冲与Exchanger

当同步的花销很是大时,咱们能够采用双缓冲区的办法。双缓冲的一个好处就在于:由于生产者和消费者各自拥有一个缓冲区,因此他们不会同时对同一个缓冲区进行操做,那么咱们就不须要为读写操做加锁,用空间换了时间。在Java中能够经过Exchanger来交换两个线程之间的数据结构。线程

public class Producer implements Runnable{
    private List<String> buffer;
    private Exchanger<List<String>> exchanger;
    public Producer(List<String> buffer, Exchanger<List<String>> exchanger){
        this.buffer=buffer;
        this.exchanger=exchanger;
    }
    @Override
    public void run() {
        for(int i=0;i<10;i++){
            for (int j=0;j<10;j++)
            buffer.add("Thrad "+Thread.currentThread().getName()+" : "+i+" "+j);
            try {
                buffer=exchanger.exchange(buffer);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}

public class Consumer implements Runnable {
    private Exchanger<List<String>> exchanger;
    private List<String> buffer;

    public Consumer(List<String> buffer,Exchanger<List<String>> exchanger) {
        this.exchanger = exchanger;
        this.buffer = buffer;
    }

    @Override
    public void run() {
        for(int i=0;i<10;i++){
            try {
             buffer=exchanger.exchange(buffer);
            } catch (InterruptedException e) {
             e.printStackTrace();
            }
            for(int j=0;j<10;j++){
                String message=buffer.get(0);
                System.out.println(message);
                buffer.remove(0);
            }
        }
    }
}

public class Main {
    public static void main(String[] args) {
        List<String> buffer1=new ArrayList<>();
        List<String> buffer2=new ArrayList<>();
        Exchanger<List<String>> exchanger=new Exchanger<>();
        Producer producer=new Producer(buffer1,exchanger);
        Consumer consumer=new Consumer(buffer2,exchanger);
        Thread t1=new Thread(producer);
        Thread t2=new Thread(consumer);
        t1.start();
        t2.start();
    }
}

BlockingQueue

咱们可使用更为方便安全的阻塞式集合来实现生产消费者模型。code

这类集合具备的特色是:当集合已满或者是为空的时候,被调用的方法不会当即执行,该方法将被阻塞,直到能够成功执行为止。rem

public class Channel {
    private BlockingQueue<String> blockingQueue=new ArrayBlockingQueue<>(10);
    public String get(){
        String str=null;
        try {
            str=blockingQueue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return str;
    }
    public void set(String str){
        try {
            blockingQueue.put(str);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

此次的Channel类是否是比以前的简洁了许多,有了BlockingQueue咱们就不用再去写wait和notify了。

相关文章
相关标签/搜索