生产者消费者模式是多线程中最为常见的模式:生产者线程(一个或多个)生成面包放进篮子里(集合或数组),同时,消费者线程(一个或多个)从篮子里(集合或数组)取出面包消耗。虽然它们任务不一样,但处理的资源是相同的,这体现的是一种线程间通讯方式。java
本文将先说明单生产者单消费者的状况,以后再说明多生产者多消费者模式的状况。还会分别使用wait()/nofity()/nofityAll()机制、lock()/unlock()机制实现这两种模式。数组
在开始介绍模式以前,先解释下wait()、notify()和notifyAll()方法的用法细节以及改进的lock()/unlock()、await()/signal()/signalAll()的用法。安全
wait()、notify()和notifyAll()分别表示让线程进入睡眠、唤醒睡眠线程以及唤醒全部睡眠的线程。可是,对象是哪一个线程呢?另外,在API文档中描述这三个方法都必须在有效监视器(可理解为持有锁)的前提下使用。这三个方法和锁有什么关系呢?多线程
以同步代码块synchronized(obj){}或同步函数为例,在它们的代码结构中可使用wait()、notify()以及notifyAll(),由于它们都持有锁。ide
对于下面的两个同步代码块来讲,分别使用的是锁obj1和锁obj2,其中线程一、线程2执行的是obj1对应的同步代码,线程三、线程4执行的是obj2对应的同步代码。函数
class MyLock implements Runnable { public int flag = 0; Object obj1 = new Object(); Object obj2 = new Object(); public void run(){ while(true){ if(flag%2=0){ synchronized(obj1){ //线程t1和t2执行此同步任务 //try{obj1.wait();}catch(InterruptedException i){} //obj1.notify() //obj1.notifyAll() } } else { synchronized(obj2){ //线程t3和t4执行此同步任务 //try{obj2.wait();}catch(InterruptedException i){} //obj2.notify() //obj2.notifyAll() } } } } } class Demo { public static void main(String[] args){ MyLock ml = new MyLock(); Thread t1 = new Thread(ml); Thread t2 = new Thread(ml); Thread t3 = new Thread(ml); Thread t4 = new Thread(ml); t1.start(); t2.start(); try{Thread.sleep(1)}catch(InterruptedException i){}; ml.flag++; t3.start(); t4.start(); } }
当t1开始执行到wait()时,它将进入睡眠状态,但却不是通常的睡眠,而是在一个被obj1标识的线程池中睡眠(其实是监视器对应线程池,只不过此时的监视器和锁是绑定在一块儿的)。当t2开始执行,它发现锁obj1被其余线程持有,它将进入睡眠态,此次睡眠是由于锁资源等待而非wait()进入的睡眠。由于t2已经判断过它要申请的是obj1锁,所以它也会进入obj1这个线程池睡眠,而不是普通的睡眠。同理t3和t4,这两个线程会进入obj2线程池睡眠。this
当某个线程执行到notify()时,这个notify()将 随机 唤醒它 所属锁对应线程池 中的 任意一个 线程。例如,obj1.notify()将唤醒obj1线程池中任意一个睡眠的线程(固然,若是没有睡眠线程则什么也不作)。同理notifyAll()则是唤醒所属锁对应线程池中全部睡眠的线程。线程
必需要搞清楚的是"对应锁",由于在调用wait()、notify()和notifyAll()时都必须明确指定锁。例如,obj1.wait()。若是省略了所属锁,则表示的是this这个对象,也就是说,只有在非静态的同步函数中才能省略这三个方法的前缀。设计
简而言之,当使用了同步,就使用了锁,线程也就有了归属,它的全部依据都由所属锁来决定。例如,线程同步时,判断锁是否空闲以决定是否执行后面的代码,亦决定是否去特定的线程池中睡眠,当唤醒时也只会唤醒所属锁对应线程池中的线程。指针
这几个方法在应用上,通常在一次任务中,wait()和notify()/notifyAll()是成对出现且择一执行的。换句话说,就是这一轮原子性同步执行过程当中,要么执行wait()进入睡眠,要么执行notify()唤醒线程池中的睡眠线程。要如何实现择一执行,能够考虑使用标记的方式来做为判断依据。参考后文的例子。
wait()系列的三个方法局限性很大,由于不管是睡眠仍是唤醒的动做,都彻底和锁耦合在一块儿了。例如,锁obj1关联的线程只能唤醒obj1线程池中的线程,而没法唤醒锁obj2关联的线程;再例如,在原来synchronized同步时,锁是在开始同步时隐式地自动获取的,且是在执行完一整个任务后,又隐式地自动释放锁,也就是说获取锁和释放锁的动做没法人为控制。
从JDK 1.5开始,java提供了java.util.concurrent.locks包,这个包中提供了Lock接口、Condition接口和ReadWriteLock接口,前两个接口将锁和监视器方法(睡眠、唤醒操做)解耦了。其中Lock接口只提供锁,经过锁方法newConditon()能够生成一个或多个与该锁关联的监视器,每一个监视器都有本身的睡眠、唤醒方法。也就是说Lock替代了synchronized方法和同步代码块的使用,Condition替代了Object监视器方法的使用。
以下图:
当某线程执行condition1.await()时,该线程将进入condition1监视器对应的线程池睡眠,当执行condition1.signal()时,将随机唤醒condition1线程池中的任意一个线程,当执行condition1.signalAll()时,将唤醒condition1线程池中的全部线程。同理,对于condition2监视器也是同样的。
即便有多个监视器,但只要它们关联的是同一个锁对象,就能够跨监视器操做对方线程。例如condition1中的线程能够执行condition2.signal()来唤醒condition2线程池中的某个线程。
要使用这种锁、监视器的关联方式,参考以下步骤:
import java.util.concurrent.locks.*; Lock l = new ReentrantLock(); Condition con1 = l.newCondition(); condition con2 = l.newCondition(); l.lock(); try{ //包含await()、signal()或signalAll()的代码段... } finally { l.unlock(); //因为代码段可能异常,但unlock()是必须执行的,因此必须使用try,且将unlock()放进finally段
具体用法见后文关于Lock、condition的示例代码。
一个生产者线程,一个消费者线程,生产者每生产一个面包放进盘子里,消费者从盘子里取出面包进行消费。其中生产者判断是否继续生产的依据是盘子里没有面包,而消费者判断是否消费的依据是盘子里有面包。因为这个模式中,盘子一直只放一个面包,所以能够把盘子省略掉,生产者和消费者直接手把手地交递面包便可。
首先须要描述这三个类,一是多线程共同操做的资源(此处即面包),二是生产者,三是消费者。在下面的例子中,我把生产面包和消费面包的方法分别封装到了生产者和消费者类中,若是把它们封装在面包类中则更容易理解。
//描述资源:面包的名称和编号,由编号决定面包的号码 class Bread { public String name; public int count = 1; public boolean flag = false; //该标记为wait()和notify()提供判断标记 } //生产者和消费者前后处理的面包资源是同一个,要确保这一点, //能够按单例模式来设计面包类,也能够将同一个面包对象经过构造方法传递给生产者和消费者,此处使用后一种方式。 //描述生产者 class Producer implements Runnable { private Bread b; //生产者的成员:它要处理的资源 Producer(Bread b){ this.b = b; } //提供生产面包的方法 public void produce(String name){ b.name = name + b.count; b.count++; } public void run(){ while(true){ synchronized(Bread.class){ //使用Bread.class做为锁标识,使得生产者和消费者的同步代码块可使用同一个锁 if(b.flag){ //wait()必须在同步代码块内部,不只由于必须持有锁才能睡眠,并且对锁这个资源的判断会出现混乱 try{Bread.class.wait();}catch(InterruptedException i){} } produce("面包"); System.out.println(Thread.currentThread().getName()+"----生产者------"+b.name); try{Thread.sleep(10);}catch(InterruptedException i){} b.flag = true; //标记的切换也必须在保持同步 Bread.class.notify(); //notify()也必须同步,不然锁都已经释放了,就没法作唤醒动做 //ps:一次同步任务中,wait()和notify()应当只能其中一个执行,不然对方线程会混乱 } } } } //描述消费者 class Consumer implements Runnable { private Bread b; //消费者的成员:它要处理的资源 Consumer(Bread b){ this.b = b; } //提供消费面包的方法 public String consume(){ return b.name; } public void run(){ while(true){ synchronized(Bread.class){ if(!b.flag){ try{Bread.class.wait();}catch(InterruptedException i){} } System.out.println(Thread.currentThread().getName()+"----消费者-------------"+consume()); try{Thread.sleep(10);}catch(InterruptedException i){} b.flag = false; Bread.class.notify(); } } } } public class ProduceConsume_1{ public static void main(String[] args) { //1.建立资源对象 Bread b = new Bread(); //2.建立生产者和消费者对象,将同一个面包对象传递给生产者和消费者 Producer pro = new Producer(b); Consumer con = new Consumer(b); //3.建立线程对象 Thread pro_t = new Thread(pro); Thread con_t = new Thread(con); pro_t.start(); con_t.start(); } }
最后的执行结果应当生产一个、消费一个,如此不断循环。以下:
Thread-0----生产者------面包1 Thread-1----消费者-------------面包1 Thread-0----生产者------面包2 Thread-1----消费者-------------面包2 Thread-0----生产者------面包3 Thread-1----消费者-------------面包3 Thread-0----生产者------面包4 Thread-1----消费者-------------面包4 Thread-0----生产者------面包5 Thread-1----消费者-------------面包5 Thread-0----生产者------面包6 Thread-1----消费者-------------面包6
代码以下:
import java.util.concurrent.locks.*; class Bread { public String name; public int count = 1; public boolean flag = false; //为生产者和消费者提供同一个锁对象以及同一个Condition对象 public static Lock lock = new ReentrantLock(); public static Condition condition = lock.newCondition(); } class Producer implements Runnable { private Bread b; Producer(Bread b){ this.b = b; } public void produce(String name){ b.name = name + b.count; b.count++; } public void run(){ while(true){ //使用Bread.lock来锁住资源 Bread.lock.lock(); try{ if(b.flag){ try{Bread.condition.await();}catch(InterruptedException i){} } produce("面包"); System.out.println(Thread.currentThread().getName()+"----生产者------"+b.name); try{Thread.sleep(10);}catch(InterruptedException i){} b.flag = true; Bread.condition.signal(); } finally { Bread.lock.unlock(); } } } } class Consumer implements Runnable { private Bread b; Consumer(Bread b){ this.b = b; } public String consume(){ return b.name; } public void run(){ while(true){ //使用Bread.lock来锁住资源 Bread.lock.lock(); try{ if(!b.flag){ try{Bread.condition.await();}catch(InterruptedException i){} } System.out.println(Thread.currentThread().getName()+"----消费者-------------"+consume()); try{Thread.sleep(10);}catch(InterruptedException i){} b.flag = false; Bread.condition.signal(); } finally { Bread.lock.unlock(); } } } } public class ProduceConsume_1{ public static void main(String[] args) { //1.建立资源对象 Bread b = new Bread(); //2.建立生产者和消费者对象,将同一个面包对象传递给生产者和消费者 Producer pro = new Producer(b); Consumer con = new Consumer(b); //3.建立线程对象 Thread pro_t = new Thread(pro); Thread con_t = new Thread(con); pro_t.start(); con_t.start(); } }
这里先说明多生产者多消费者,但同一个时刻最多只能有一个面包的模式,这个模式在实际中多是不理想的,但为了引出后面真实的多生产多消费模式,我以为有必要在这里解释这种模式,而且分析这种模式以及如何从单生产单消费的代码演变而来。
以下图:
从单生产单消费到多生产多消费,由于多线程安全问题和死锁问题,因此有两个方面的问题须要考虑:
对于某一方来讲,如何让多线程达到和单线程一样的生产或消费能力?也就是说,如何让多线程看上去就是单线程。多线程和单线程最大的区别在于多线程安全问题,所以,只要保证多线程执行的任务可以同步便可。
第1个问题考虑的是某一方多线程的问题,第2个问题考虑的是两方如何能和谐配合完成生产消费问题。也就是如何保证生产方和消费方一方活动的同时另外一方睡眠。只需在某一方执行完同步任务时,唤醒另外一方便可。
其实从单线程到多线程,就两个问题须要考虑:不一样步和死锁。(1)当生产方和消费方都出现了多线程,能够将生产方的多线程当作一个线程总体、消费方的多线程也当作一个总体,这解决的是线程安全问题。(2)再将生产方总体和消费方总体两方结合起来当作多线程,来解决死锁问题,而java中解决死锁的方式就是唤醒对方或唤醒全部。
问题是如何保证某一方的多线程之间同步?以多线程执行单消费方的代码为例进行分析。
while(true){ synchronized(Bread.class){ if(!b.flag){ try{Bread.class.wait();}catch(InterruptedException i){} } System.out.println(Thread.currentThread().getName()+"----消费者-------------"+consume()); try{Thread.sleep(10);}catch(InterruptedException i){} b.flag = false; Bread.class.notify(); } }
假设消费线程1消费完一个面包后唤醒了消费线程2,并继续循环,判断if(!flag),它将wait,因而锁被释放。假设CPU正好选中了消费线程2,那么消费线程2也将进入wait。当生产方生产了一个面包后,假设唤醒了消费线程1,它将从wait语句处继续向下消费刚生产完的面包,假设正好再次唤醒了消费线程2,当消费线程2被CPU选中后,消费线程2也将从wait语句处向下消费,消费的也是刚才生产的面包,问题再此出现了,连续唤醒的消费线程1和2消费的是同一个面包,也就是说面包被重复消费了。这又是多线程不一样步问题。
说了一大段,其实将视线放大后分析就很简单了,只要某一方的2个或多个线程都由于判断b.flag而wait,那么这两个或多个线程有可能会被连续唤醒而继续向下生产或消费。这形成了多线程不一样步问题。
不安全的问题就出在同一方的多个线程在连续唤醒后继续向下生产或消费。这是if语句引发的,若是可以让wait的线程在唤醒后还回头判断b.flag是否为true,就能让其决定是否继续wait仍是向下生产或消费。
能够将if语句替换为while语句来知足要求。这样一来,不管某一方的多个线程是否被连续唤醒,它们都将回头判断b.flag。
while(true){ synchronized(Bread.class){ while(!b.flag){ try{Bread.class.wait();}catch(InterruptedException i){} } System.out.println(Thread.currentThread().getName()+"----消费者-------------"+consume()); try{Thread.sleep(10);}catch(InterruptedException i){} b.flag = false; Bread.class.notify(); } }
解决了第一个多线程安全的问题,但会出现死锁问题。这很容易分析,将生产方看做一个总体,将消费方也看做一个总体,当生产方线程都wait了(生产方的线程被连续唤醒时会出现该方线程所有wait),消费方也都wait了,死锁就出现了。其实放大了看,将生产方、消费方分别看做一个线程,这两个线程组成多线程,当某一方wait后没法唤醒另外一方,另外一方也必定会wait,因而就死锁了。
对于双方死锁的问题,只要保证能唤醒对方,而非本方连续唤醒就能解决。使用notifyAll()或signalAll()便可,也能够经过signal()唤醒对方线程解决,见下面的第二段代码。
根据上面的分析,将单生产、单消费模式的代码改进一下,就能够变为多生产多消费单面包模式。、
//代码段1 class Bread { public String name; public int count = 1; public boolean flag = false; } //描述生产者 class Producer implements Runnable { private Bread b; Producer(Bread b){ this.b = b; } public void produce(String name){ b.name = name + b.count; b.count++; } public void run(){ while(true){ synchronized(Bread.class){ while(b.flag){ try{Bread.class.wait();}catch(InterruptedException i){} } produce("面包"); System.out.println(Thread.currentThread().getName()+"----生产者------"+b.name); try{Thread.sleep(10);}catch(InterruptedException i){} b.flag = true; Bread.class.notifyAll(); } } } } //描述消费者 class Consumer implements Runnable { private Bread b; Consumer(Bread b){ this.b = b; } public String consume(){ return b.name; } public void run(){ while(true){ synchronized(Bread.class){ while(!b.flag){ try{Bread.class.wait();}catch(InterruptedException i){} } System.out.println(Thread.currentThread().getName()+"----消费者-------------"+consume()); try{Thread.sleep(10);}catch(InterruptedException i){} b.flag = false; Bread.class.notifyAll(); } } } } public class ProduceConsume_5 { public static void main(String[] args) { //1.建立资源对象 Bread b = new Bread(); //2.建立生产者和消费者对象 Producer pro = new Producer(b); Consumer con = new Consumer(b); //3.建立线程对象 Thread pro_t1 = new Thread(pro); //生产线程1 Thread pro_t2 = new Thread(pro); //生产线程2 Thread con_t1 = new Thread(con); //消费线程1 Thread con_t2 = new Thread(con); //消费线程2 pro_t1.start(); pro_t2.start(); con_t1.start(); con_t2.start(); } }
如下是采用Lock和Conditon重构后的代码,使用的是signal()唤醒对方线程的方法。
//代码段2 import java.util.concurrent.locks.*; class Bread { public String name; public int count = 1; public boolean flag = false; public static Lock lock = new ReentrantLock(); public static Condition pro_con = lock.newCondition(); public static Condition con_con = lock.newCondition(); } //描述生产者 class Producer implements Runnable { private Bread b; Producer(Bread b){ this.b = b; } public void produce(String name){ b.name = name + b.count; b.count++; } public void run(){ while(true){ Bread.lock.lock(); try{ while(b.flag){ try{Bread.pro_con.await();}catch(InterruptedException i){} } produce("面包"); System.out.println(Thread.currentThread().getName()+"----生产者------"+b.name); try{Thread.sleep(10);}catch(InterruptedException i){} b.flag = true; Bread.con_con.signal(); //唤醒的是consumer线程 } finally { Bread.lock.unlock(); } } } } //描述消费者 class Consumer implements Runnable { private Bread b; Consumer(Bread b){ this.b = b; } public String consume(){ return b.name; } public void run(){ while(true){ Bread.lock.lock(); try{ while(!b.flag){ try{Bread.con_con.await();}catch(InterruptedException i){} } System.out.println(Thread.currentThread().getName()+"----消费者-------------"+consume()); try{Thread.sleep(10);}catch(InterruptedException i){} b.flag = false; Bread.pro_con.signal(); //唤醒的是producer线程 } finally { Bread.lock.unlock(); } } } } public class ProduceConsume_6 { public static void main(String[] args) { //1.建立资源对象 Bread b = new Bread(); //2.建立生产者和消费者对象 Producer pro = new Producer(b); Consumer con = new Consumer(b); //3.建立线程对象 Thread pro_t1 = new Thread(pro); Thread pro_t2 = new Thread(pro); Thread con_t1 = new Thread(con); Thread con_t2 = new Thread(con); pro_t1.start(); pro_t2.start(); con_t1.start(); con_t2.start(); } }
关于多生产、多消费问题作个总结:
(1).解决某一方多线程不一样步的方案是使用while(flag)来判断是否wait;
(2).解决双方死锁问题的方案是唤醒对方,可使用notifyAll(),signalAll()或对方监视器的signal()方法。
有多个生产者线程,多个消费者线程,生产者将生产的面包放进篮子(集合或数组)里,消费者从篮子里取出面包。生产者判断继续生产的依据是篮子已经满了,消费者判断继续消费的依据是篮子是否空了。此外,当消费者取出面包后,对应的位置又空了,生产者能够回头从篮子的起始位置继续生产,这能够经过重置篮子的指针来实现。
在这个模式里,除了描述生产者、消费者、面包,还须要描述篮子这个容器。假设使用数组做为容器,生产者每生产一个,生产指针向后移位,消费者每消费一个,消费指针向后移位。
代码以下:可参考API-->Condition类中给出的示例代码
import java.util.concurrent.locks.*; class Basket { private Bread[] arr; //the size of basket Basket(int size){ arr = new Bread[size]; } //the pointer of in and out private int in_ptr,out_ptr; //how many breads left in basket private int left; private Lock lock = new ReentrantLock(); private Condition full = lock.newCondition(); private Condition empty = lock.newCondition(); //bread into basket public void in(){ lock.lock(); try{ while(left == arr.length){ try{full.await();} catch (InterruptedException i) {i.printStackTrace();} } arr[in_ptr] = new Bread("MianBao",Producer.num++); System.out.println("Put the bread: "+arr[in_ptr].getName()+"------into basket["+in_ptr+"]"); left++; if(++in_ptr == arr.length){in_ptr = 0;} empty.signal(); } finally { lock.unlock(); } } //bread out from basket public Bread out(){ lock.lock(); try{ while(left == 0){ try{empty.await();} catch (InterruptedException i) {i.printStackTrace();} } Bread out_bread = arr[out_ptr]; System.out.println("Get the bread: "+out_bread.getName()+"-----------from basket["+out_ptr+"]"); left--; if(++out_ptr == arr.length){out_ptr = 0;} full.signal(); return out_bread; } finally { lock.unlock(); } } } class Bread { private String name; Bread(String name,int num){ this.name = name + num; } public String getName(){ return this.name; } } class Producer implements Runnable { private Basket basket; public static int num = 1; //the first number for Bread's name Producer(Basket b){ this.basket = b; } public void run(){ while(true) { basket.in(); try{Thread.sleep(10);}catch(InterruptedException i){} } } } class Consumer implements Runnable { private Basket basket; private Bread i_get; Consumer(Basket b){ this.basket = b; } public void run(){ while(true){ i_get = basket.out(); try{Thread.sleep(10);}catch(InterruptedException i){} } } } public class ProduceConsume_7 { public static void main(String[] args) { Basket b = new Basket(20); // the basket size = 20 Producer pro = new Producer(b); Consumer con = new Consumer(b); Thread pro_t1 = new Thread(pro); Thread pro_t2 = new Thread(pro); Thread con_t1 = new Thread(con); Thread con_t2 = new Thread(con); Thread con_t3 = new Thread(con); pro_t1.start(); pro_t2.start(); con_t1.start(); con_t2.start(); con_t3.start(); } }
这里涉及了消费者、生产者、面包和篮子,其中面包和篮子是多线程共同操做的资源,生产者线程生产面包放进篮子,消费者线程从篮子中取出面包。理想的代码是将生产任务和消费任务都封装在资源类中,由于面包是篮子容器的元素,因此不适合封装到面包类中,并且封装到篮子中,能更方便地操做容器。
注意,必定要将全部涉及资源操做的代码都放进锁的内部,不然会产生多线程不一样步问题。例如,在Producer类中定义了生产面包的方法produce(),而后将其做为放进篮子的方法basket.in()的参数,即basket.in(producer()),这是错误的行为,由于produce()是在锁的外部执行后才传递给in()方法的。