什么是生产者/消费者模型并发
一种重要的模型,基于等待/通知机制。生产者/消费者模型描述的是有一块缓冲区做为仓库,生产者可将产品放入仓库,消费者能够从仓库中取出产品,生产者/消费者模型关注的是如下几个点:函数
生产者/模型做为一种重要的模型,它的优势在于:this
利用wait()/notify()实现生产者/消费者模型spa
既然生产者/消费者模型有一个缓冲区,那么咱们就本身作一个缓冲区,生产者和消费者的通讯都是经过这个缓冲区的。value为""表示缓冲区空,value不为""表示缓冲区满:线程
public class ValueObject { public static String value = ""; }
接下来就是一个生产者了,若是缓冲区满了的,那么就wait(),再也不生产了,等待消费者消费完通知;若是缓冲区是空的,那么就生产数据到缓冲区中code
public class Producer { private Object lock; public Producer(Object lock) { this.lock = lock; } public void setValue() { try { synchronized (lock) { if (!ValueObject.value.equals("")) lock.wait(); String value = System.currentTimeMillis() + "_" + System.nanoTime(); System.out.println("Set的值是:" + value); ValueObject.value = value; lock.notify(); } } catch (InterruptedException e) { e.printStackTrace(); } } }
消费者相似,若是缓冲区是空的,那么就再也不消费,wait()等待,等待生产者生产完通知;若是缓冲区不是空的,那么就去拿数据:blog
public class Customer { private Object lock; public Customer(Object lock) { this.lock = lock; } public void getValue() { try { synchronized (lock) { if (ValueObject.value.equals("")) lock.wait(); System.out.println("Get的值是:" + ValueObject.value); ValueObject.value = ""; lock.notify(); } } catch (InterruptedException e) { e.printStackTrace(); } } }
写个主函数,开两个线程调用Producer里面的getValue()方法和Customer()里面的setValue()方法:get
public static void main(String[] args) { Object lock = new Object(); final Producer producer = new Producer(lock); final Customer customer = new Customer(lock); Runnable producerRunnable = new Runnable() { public void run() { while (true) { producer.setValue(); } } }; Runnable customerRunnable = new Runnable() { public void run() { while (true) { customer.getValue(); } } }; Thread producerThread = new Thread(producerRunnable); Thread CustomerThread = new Thread(customerRunnable); producerThread.start(); CustomerThread.start(); }
看一下运行结果:产品
...
Set的值是:1444025677743_162366875965845
Get的值是:1444025677743_162366875965845
Set的值是:1444025677743_162366875983541
Get的值是:1444025677743_162366875983541
Set的值是:1444025677743_162366876004776
Get的值是:1444025677743_162366876004776
...
生产数据和消费数据必定是成对出现的,生产一个消费一个,满了不生产,空了不消费,生产者不能无限生产,消费者也不能无限消费,符合生产者/消费者模型。生产者速度快,就不占用CPU时间片,等着消费者消费完通知它继续生产,这块时间片能够用来给其余线程用。it
利用await()/signal()实现生产者和消费者模型
同样,先定义一个缓冲区:
public class ValueObject { public static String value = ""; }
换种写法,生产和消费方法放在一个类里面:
public class ThreadDomain41 extends ReentrantLock { private Condition condition = newCondition(); public void set() { try { lock(); while (!"".equals(ValueObject.value)) condition.await(); ValueObject.value = "123"; System.out.println(Thread.currentThread().getName() + "生产了value, value的当前值是" + ValueObject.value); condition.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { unlock(); } } public void get() { try { lock(); while ("".equals(ValueObject.value)) condition.await(); ValueObject.value = ""; System.out.println(Thread.currentThread().getName() + "消费了value, value的当前值是" + ValueObject.value); condition.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { unlock(); } } }
一样的,开两个线程,一个线程调用set()方法生产,另外一个线程调用get()方法消费:
public static void main(String[] args) { final ThreadDomain41 td = new ThreadDomain41(); Runnable producerRunnable = new Runnable() { public void run() { for (int i = 0; i < Integer.MAX_VALUE; i++) td.set(); } }; Runnable customerRunnable = new Runnable() { public void run() { for (int i = 0; i < Integer.MAX_VALUE; i++) td.get(); } }; Thread ProducerThread = new Thread(producerRunnable); ProducerThread.setName("Producer"); Thread ConsumerThread = new Thread(customerRunnable); ConsumerThread.setName("Consumer"); ProducerThread.start(); ConsumerThread.start(); }
看一下运行结果:
...
Producer生产了value, value的当前值是123
Consumer消费了value, value的当前值是
Producer生产了value, value的当前值是123
Consumer消费了value, value的当前值是
Producer生产了value, value的当前值是123
Consumer消费了value, value的当前值是
...
和wait()/notify()机制的实现效果同样,一样符合生产者/消费者模型
当心假死
生产者/消费者模型最终达到的目的是平衡生产者和消费者的处理能力,达到这个目的的过程当中,并不要求只有一个生产者和一个消费者。能够多个生产者对应多个消费者,能够一个生产者对应一个消费者,能够多个生产者对应一个消费者。
假死就发生在上面三种场景下。理论分析就能说明问题,因此就不写代码了。代码要写也很简单,上面的两个例子随便修改一个,开一个生产者线程/多个消费者线程、开多个生产者线程/消费者线程、开多个生产者线程/多个消费者线程均可以。假死指的是所有线程都进入了WAITING状态,那么程序就再也不执行任何业务功能了,整个项目呈现停滞状态。
比方说有生产者A和生产者B,缓冲区因为空了,消费者处于WAITING。生产者B处于WAITING,生产者A被消费者通知生产,生产者A生产出来的产品本应该通知消费者,结果通知了生产者B,生产者B被唤醒,发现缓冲区满了,因而继续WAITING。至此,两个生产者线程处于WAITING,消费者处于WAITING,系统假死。
上面的分析能够看出,假死出现的缘由是由于notify的是同类,因此非单生产者/单消费者的场景,能够采起两种方法解决这个问题:
一、synchronized用notifyAll()唤醒全部线程、ReentrantLock用signalAll()唤醒全部线程
二、用ReentrantLock定义两个Condition,一个表示生产者的Condition,一个表示消费者的Condition,唤醒的时候调用相应的Condition的signal()方法就能够了