生产者-消费者算是并发编程中常见的问题。依靠缓冲区咱们能够实现生产者与消费者之间的解耦。生产者只管往缓冲区里面放东西,消费者只管往缓冲区里面拿东西。这样咱们避免生产者想要交付数据给消费者,但消费者此时还没法接受数据这样的状况发生。java
这个问题其实就是线程间的通信,因此要注意的是不能同时读写。生产者在缓冲区满的时候不生产,等待;消费者在缓冲区为空的时候不消费,等待。比较经典的作法是wait
和notify
。编程
生产者线程执行15次set操做安全
public class Producer implements Runnable{ private Channel channel; public Producer(Channel channel) { this.channel = channel; } @Override public void run() { for(int i=0;i<15;i++){ channel.set(Thread.currentThread().getName()+" "+i); } } }
消费者线程执行10次get操做数据结构
public class Consumer implements Runnable { private Channel channel; public Consumer(Channel channel) { this.channel = channel; } @Override public void run() { for(int i=0;i<10;i++){ System.out.println("Consumer "+Thread.currentThread().getName()+" get "+channel.get()); } } }
如今定义Channel类,并建立两个生产者线程和三个消费者线程并发
public class Channel { private List<String> buffer=new ArrayList<>(); private final int MAX_SIZE=10; public synchronized String get(){ while (buffer.size()==0){//不要用if,醒来了也要再次判断 try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } String str=buffer.remove(0); notifyAll(); return str; } public synchronized void set(String str){ while (buffer.size()==MAX_SIZE){ try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } buffer.add(str); notifyAll(); } public static void main(String[] args) { Channel channel=new Channel(); Producer producer=new Producer(channel); Consumer consumer=new Consumer(channel); for(int i=0;i<2;i++){ new Thread(producer).start(); } for (int i=0;i<3;i++){ new Thread(consumer).start(); } } }
使用notifyAll而不是notify的缘由是,notify有可能出现屡次唤醒同类的状况,形成“假死”。咱们可使用Condition来实现更精确的唤醒。ide
将上面代码中的Channel类修改一下便可this
public class Channel { private List<String> buffer=new ArrayList<>(); private final int MAX_SIZE=10; private Lock lock=new ReentrantLock(); private Condition producer=lock.newCondition(); private Condition consumer=lock.newCondition(); public String get(){ String str=null; try { lock.lock(); while (buffer.size()==0){ consumer.await(); } str=buffer.remove(0); producer.signalAll(); }catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } return str; } public void set(String str){ try { lock.lock(); while (buffer.size()==MAX_SIZE){ producer.await(); } buffer.add(str); consumer.signalAll(); }catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } } }
当同步的花销很是大时,咱们能够采用双缓冲区的办法。双缓冲的一个好处就在于:由于生产者和消费者各自拥有一个缓冲区,因此他们不会同时对同一个缓冲区进行操做,那么咱们就不须要为读写操做加锁,用空间换了时间。在Java中能够经过Exchanger来交换两个线程之间的数据结构。线程
public class Producer implements Runnable{ private List<String> buffer; private Exchanger<List<String>> exchanger; public Producer(List<String> buffer, Exchanger<List<String>> exchanger){ this.buffer=buffer; this.exchanger=exchanger; } @Override public void run() { for(int i=0;i<10;i++){ for (int j=0;j<10;j++) buffer.add("Thrad "+Thread.currentThread().getName()+" : "+i+" "+j); try { buffer=exchanger.exchange(buffer); } catch (InterruptedException e) { e.printStackTrace(); } } } } public class Consumer implements Runnable { private Exchanger<List<String>> exchanger; private List<String> buffer; public Consumer(List<String> buffer,Exchanger<List<String>> exchanger) { this.exchanger = exchanger; this.buffer = buffer; } @Override public void run() { for(int i=0;i<10;i++){ try { buffer=exchanger.exchange(buffer); } catch (InterruptedException e) { e.printStackTrace(); } for(int j=0;j<10;j++){ String message=buffer.get(0); System.out.println(message); buffer.remove(0); } } } } public class Main { public static void main(String[] args) { List<String> buffer1=new ArrayList<>(); List<String> buffer2=new ArrayList<>(); Exchanger<List<String>> exchanger=new Exchanger<>(); Producer producer=new Producer(buffer1,exchanger); Consumer consumer=new Consumer(buffer2,exchanger); Thread t1=new Thread(producer); Thread t2=new Thread(consumer); t1.start(); t2.start(); } }
咱们可使用更为方便安全的阻塞式集合来实现生产消费者模型。code
这类集合具备的特色是:当集合已满或者是为空的时候,被调用的方法不会当即执行,该方法将被阻塞,直到能够成功执行为止。rem
public class Channel { private BlockingQueue<String> blockingQueue=new ArrayBlockingQueue<>(10); public String get(){ String str=null; try { str=blockingQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } return str; } public void set(String str){ try { blockingQueue.put(str); } catch (InterruptedException e) { e.printStackTrace(); } } }
此次的Channel类是否是比以前的简洁了许多,有了BlockingQueue咱们就不用再去写wait和notify了。