文源网络,仅供学习之用,若有侵权请联系删除。
生产者-消费者模式是一个十分经典的多线程并发协做的模式,弄懂生产者-消费者问题可以让咱们对并发编程的理解加深。java
所谓生产者-消费者问题,实际上主要是包含了两类线程,一种是生产者线程用于生产数据,另外一种是消费者线程用于消费数据。面试
为了解耦生产者和消费者的关系,一般会采用共享的数据区域,就像是一个仓库,生产者生产数据以后直接放置在共享数据区中,并不须要关心消费者的行为;而消费者只须要从共享数据区中去获取数据,就再也不须要关心生产者的行为。spring
可是,这个共享数据区域中应该具有这样的线程间并发协做的功能:编程
在实现生产者消费者问题时,能够采用三种方式:设计模式
一、使用Object的wait/notify的消息通知机制;网络
二、使用Lock的Condition的await/signall的消息通知机制;数据结构
三、使用BlockingQueue实现。多线程
本文主要将这三种实现方式进行总结概括。并发
一、预备知识框架
Java 中,能够经过配合调用 Object 对象的 wait() 方法和 notify()方法或 notifyAll() 方法来实现线程间的通讯。在线程中调用 wait() 方法,将阻塞当前线程,直至等到其余线程调用了调用 notify() 方法或 notifyAll() 方法进行通知以后,当前线程才能从wait()方法出返回,继续执行下面的操做。
wait该方法用来将当前线程置入休眠状态,直到接到通知或被中断为止。在调用 wait()以前,线程必需要得到该对象的对象监视器锁,即只能在同步方法或同步块中调用 wait()方法。
调用wait()方法以后,当前线程会释放锁。若是调用wait()方法时,线程并未获取到锁的话,则会抛出IllegalMonitorStateException异常,这是以个RuntimeException。若是再次获取到锁的话,当前线程才能从wait()方法处成功返回。
notify该方法也要在同步方法或同步块中调用,即在调用前,线程也必需要得到该对象的对象级别锁,若是调用 notify()时没有持有适当的锁,也会抛出 IllegalMonitorStateException。
该方法任意从WAITTING状态的线程中挑选一个进行通知,使得调用wait()方法的线程从等待队列移入到同步队列中,等待有机会再一次获取到锁,从而使得调用wait()方法的线程可以从wait()方法处退出。
调用notify后,当前线程不会立刻释放该对象锁,要等到程序退出同步块后,当前线程才会释放锁。
notifyAll该方法与 notify ()方法的工做方式相同,重要的一点差别是:notifyAll 使全部原来在该对象上 wait 的线程通通退出WAITTING状态,使得他们所有从等待队列中移入到同步队列中去,等待下一次可以有机会获取到对象监视器锁。
二、wait/notify消息通知潜在的一些问题
1)notify早期通知
notify 通知的遗漏很容易理解,即 threadA 还没开始 wait 的时候,threadB 已经 notify 了,这样,threadB 通知是没有任何响应的,当 threadB 退出 synchronized 代码块后,threadA 再开始 wait,便会一直阻塞等待,直到被别的线程打断。
好比在下面的示例代码中,就模拟出notify早期通知带来的问题:
public class EarlyNotify { private static String lockObject = ""; public static void main(String[] args) { WaitThread waitThread = new WaitThread(lockObject); NotifyThread notifyThread = new NotifyThread(lockObject); notifyThread.start(); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } waitThread.start(); } static class WaitThread extends Thread { private String lock; public WaitThread(String lock) { this.lock = lock; } @Override public void run() { synchronized (lock) { try { System.out.println(Thread.currentThread().getName() + " 进去代码块"); System.out.println(Thread.currentThread().getName() + " 开始wait"); lock.wait(); System.out.println(Thread.currentThread().getName() + " 结束wait"); } catch (InterruptedException e) { e.printStackTrace(); } } } } static class NotifyThread extends Thread { private String lock; public NotifyThread(String lock) { this.lock = lock; } @Override public void run() { synchronized (lock) { System.out.println(Thread.currentThread().getName() + " 进去代码块"); System.out.println(Thread.currentThread().getName() + " 开始notify"); lock.notify(); System.out.println(Thread.currentThread().getName() + " 结束开始notify"); } } } }
示例中开启了两个线程,一个是WaitThread,另外一个是NotifyThread。NotifyThread会先启动,先调用notify方法。
而后WaitThread线程才启动,调用wait方法,可是因为通知过了,wait方法就没法再获取到相应的通知,所以WaitThread会一直在wait方法出阻塞,这种现象就是通知过早的现象。
针对这种现象,解决方法,通常是添加一个状态标志,让waitThread调用wait方法前先判断状态是否已经改变了没,若是通知早已发出的话,WaitThread就再也不去wait。
对上面的代码进行更正:
public class EarlyNotify { private static String lockObject = ""; private static boolean isWait = true; public static void main(String[] args) { WaitThread waitThread = new WaitThread(lockObject); NotifyThread notifyThread = new NotifyThread(lockObject); notifyThread.start(); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } waitThread.start(); } static class WaitThread extends Thread { private String lock; public WaitThread(String lock) { this.lock = lock; } @Override public void run() { synchronized (lock) { try { while (isWait) { System.out.println(Thread.currentThread().getName() + " 进去代码块"); System.out.println(Thread.currentThread().getName() + " 开始wait"); lock.wait(); System.out.println(Thread.currentThread().getName() + " 结束wait"); } } catch (InterruptedException e) { e.printStackTrace(); } } } } static class NotifyThread extends Thread { private String lock; public NotifyThread(String lock) { this.lock = lock; } @Override public void run() { synchronized (lock) { System.out.println(Thread.currentThread().getName() + " 进去代码块"); System.out.println(Thread.currentThread().getName() + " 开始notify"); lock.notifyAll(); isWait = false; System.out.println(Thread.currentThread().getName() + " 结束开始notify"); } } } }
这段代码只是增长了一个isWait状态变量,NotifyThread调用notify方法后会对状态变量进行更新,在WaitThread中调用wait方法以前会先对状态变量进行判断,在该示例中,调用notify后将状态变量isWait改变为false,所以,在WaitThread中while对isWait判断后就不会执行wait方法,从而避免了Notify过早通知形成遗漏的状况。
总结:
在使用线程的等待/通知机制时,通常都要配合一个 boolean 变量值(或者其余可以判断真假的条件),在 notify 以前改变该 boolean 变量的值,让 wait 返回后可以退出 while 循环(通常都要在 wait 方法外围加一层 while 循环,以防止早期通知),或在通知被遗漏后,不会被阻塞在 wait 方法处。这样便保证了程序的正确性。
2)等待wait的条件发生变化
若是线程在等待时接受到了通知,可是以后等待的条件发生了变化,并无再次对等待条件进行判断,也会致使程序出现错误。
下面用一个例子来讲明这种状况
public class ConditionChange { private static List<String> lockObject = new ArrayList(); public static void main(String[] args) { Consumer consumer1 = new Consumer(lockObject); Consumer consumer2 = new Consumer(lockObject); Productor productor = new Productor(lockObject); consumer1.start(); consumer2.start(); productor.start(); } static class Consumer extends Thread { private List<String> lock; public Consumer(List lock) { this.lock = lock; } @Override public void run() { synchronized (lock) { try { //这里使用if的话,就会存在wait条件变化形成程序错误的问题 if (lock.isEmpty()) { System.out.println(Thread.currentThread().getName() + " list为空"); System.out.println(Thread.currentThread().getName() + " 调用wait方法"); lock.wait(); System.out.println(Thread.currentThread().getName() + " wait方法结束"); } String element = lock.remove(0); System.out.println(Thread.currentThread().getName() + " 取出第一个元素为:" + element); } catch (InterruptedException e) { e.printStackTrace(); } } } } static class Productor extends Thread { private List<String> lock; public Productor(List lock) { this.lock = lock; } @Override public void run() { synchronized (lock) { System.out.println(Thread.currentThread().getName() + " 开始添加元素"); lock.add(Thread.currentThread().getName()); lock.notifyAll(); } } } }
会报异常:
Exception in thread "Thread-1" Thread-0 list为空 Thread-0 调用wait方法 Thread-1 list为空 Thread-1 调用wait方法 Thread-2 开始添加元素 Thread-1 wait方法结束 java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
异常缘由分析:
在这个例子中一共开启了3个线程,Consumer1,Consumer2以及Productor。首先Consumer1调用了wait方法后,线程处于了WAITTING状态,而且将对象锁释放出来。所以,Consumer2可以获取对象锁,从而进入到同步代块中,当执行到wait方法时,一样的也会释放对象锁。
所以,productor可以获取到对象锁,进入到同步代码块中,向list中插入数据后,经过notifyAll方法通知处于WAITING状态的Consumer1和Consumer2线程。
consumer1获得对象锁后,从wait方法出退出,删除了一个元素让List为空,方法执行结束,退出同步块,释放掉对象锁。这个时候Consumer2获取到对象锁后,从wait方法退出,继续往下执行,这个时候Consumer2再执行lock.remove(0);就会出错,由于List因为Consumer1删除一个元素以后已经为空了。
解决方案:
经过上面的分析,能够看出Consumer2报异常是由于线程从wait方法退出以后没有再次对wait条件进行判断,所以,此时的wait条件已经发生了变化。解决办法就是,在wait退出以后再对条件进行判断便可。
public class ConditionChange { private static List<String> lockObject = new ArrayList(); public static void main(String[] args) { Consumer consumer1 = new Consumer(lockObject); Consumer consumer2 = new Consumer(lockObject); Productor productor = new Productor(lockObject); consumer1.start(); consumer2.start(); productor.start(); } static class Consumer extends Thread { private List<String> lock; public Consumer(List lock) { this.lock = lock; } @Override public void run() { synchronized (lock) { try { //这里使用if的话,就会存在wait条件变化形成程序错误的问题 while (lock.isEmpty()) { System.out.println(Thread.currentThread().getName() + " list为空"); System.out.println(Thread.currentThread().getName() + " 调用wait方法"); lock.wait(); System.out.println(Thread.currentThread().getName() + " wait方法结束"); } String element = lock.remove(0); System.out.println(Thread.currentThread().getName() + " 取出第一个元素为:" + element); } catch (InterruptedException e) { e.printStackTrace(); } } } } static class Productor extends Thread { private List<String> lock; public Productor(List lock) { this.lock = lock; } @Override public void run() { synchronized (lock) { System.out.println(Thread.currentThread().getName() + " 开始添加元素"); lock.add(Thread.currentThread().getName()); lock.notifyAll(); } } } }
上面的代码与以前的代码仅仅只是将 wait 外围的 if 语句改成 while 循环便可,这样当 list 为空时,线程便会继续等待,而不会继续去执行删除 list 中元素的代码。
总结:
在使用线程的等待/通知机制时,通常都要在 while 循环中调用 wait()方法,所以xuy配合使用一个 boolean 变量(或其余能判断真假的条件,如本文中的 list.isEmpty()),知足 while 循环的条件时,进入 while 循环,执行 wait()方法,不知足 while 循环的条件时,跳出循环,执行后面的代码。
三、“假死”状态
现象:若是是多消费者和多生产者状况,若是使用notify方法可能会出现“假死”的状况,即唤醒的是同类线程。
缘由分析:假设当前多个生产者线程会调用wait方法阻塞等待,当其中的生产者线程获取到对象锁以后使用notify通知处于WAITTING状态的线程,若是唤醒的仍然是生产者线程,就会形成全部的生产者线程都处于等待状态。
解决办法:将notify方法替换成notifyAll方法,若是使用的是lock的话,就将signal方法替换成signalAll方法。
总结
在Object提供的消息通知机制应该遵循以下这些条件:
永远在while循环中对条件进行判断而不是if语句中进行wait条件的判断;
使用NotifyAll而不是使用notify。
基本的使用范式以下:
// The standard idiom for calling the wait method in Java synchronized (sharedObject) { while (condition) { sharedObject.wait(); // (Releases lock, and reacquires on wakeup) } // do action based upon condition e.g. take or put into queue }
wait/notifyAll实现生产者-消费者
利用wait/notifyAll实现生产者和消费者代码以下:
public class ProductorConsumer { public static void main(String[] args) { LinkedList linkedList = new LinkedList(); ExecutorService service = Executors.newFixedThreadPool(15); for (int i = 0; i < 5; i++) { service.submit(new Productor(linkedList, 8)); } for (int i = 0; i < 10; i++) { service.submit(new Consumer(linkedList)); } } static class Productor implements Runnable { private List<Integer> list; private int maxLength; public Productor(List list, int maxLength) { this.list = list; this.maxLength = maxLength; } @Override public void run() { while (true) { synchronized (list) { try { while (list.size() == maxLength) { System.out.println("生产者" + Thread.currentThread().getName() + " list以达到最大容量,进行wait"); list.wait(); System.out.println("生产者" + Thread.currentThread().getName() + " 退出wait"); } Random random = new Random(); int i = random.nextInt(); System.out.println("生产者" + Thread.currentThread().getName() + " 生产数据" + i); list.add(i); list.notifyAll(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } static class Consumer implements Runnable { private List<Integer> list; public Consumer(List list) { this.list = list; } @Override public void run() { while (true) { synchronized (list) { try { while (list.isEmpty()) { System.out.println("消费者" + Thread.currentThread().getName() + " list为空,进行wait"); list.wait(); System.out.println("消费者" + Thread.currentThread().getName() + " 退出wait"); } Integer element = list.remove(0); System.out.println("消费者" + Thread.currentThread().getName() + " 消费数据:" + element); list.notifyAll(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } }
输出结果:
生产者pool-1-thread-1 生产数据-232820990 生产者pool-1-thread-1 生产数据1432164130 生产者pool-1-thread-1 生产数据1057090222 生产者pool-1-thread-1 生产数据1201395916 生产者pool-1-thread-1 生产数据482766516 生产者pool-1-thread-1 list以达到最大容量,进行wait 消费者pool-1-thread-15 退出wait 消费者pool-1-thread-15 消费数据:1237535349 消费者pool-1-thread-15 消费数据:-1617438932 消费者pool-1-thread-15 消费数据:-535396055 消费者pool-1-thread-15 消费数据:-232820990 消费者pool-1-thread-15 消费数据:1432164130 消费者pool-1-thread-15 消费数据:1057090222 消费者pool-1-thread-15 消费数据:1201395916 消费者pool-1-thread-15 消费数据:482766516 消费者pool-1-thread-15 list为空,进行wait 生产者pool-1-thread-5 退出wait 生产者pool-1-thread-5 生产数据1442969724 生产者pool-1-thread-5 生产数据1177554422 生产者pool-1-thread-5 生产数据-133137235 生产者pool-1-thread-5 生产数据324882560 生产者pool-1-thread-5 生产数据2065211573 生产者pool-1-thread-5 生产数据253569900 生产者pool-1-thread-5 生产数据571277922 生产者pool-1-thread-5 生产数据1622323863 生产者pool-1-thread-5 list以达到最大容量,进行wait 消费者pool-1-thread-10 退出wait
参照Object的wait和notify/notifyAll方法,Condition也提供了一样的方法:
一、针对wait方法
void await() throws InterruptedException:当前线程进入等待状态,若是其余线程调用condition的signal或者signalAll方法而且当前线程获取Lock从await方法返回,若是在等待状态中被中断会抛出被中断异常;
long awaitNanos(long nanosTimeout):当前线程进入等待状态直到被通知,中断或者超时;
boolean await(long time, TimeUnit unit)throws InterruptedException:同第二种,支持自定义时间单位
boolean awaitUntil(Date deadline) throws InterruptedException:当前线程进入等待状态直到被通知,中断或者到了某个时间
二、针对notify方法
void signal():唤醒一个等待在condition上的线程,将该线程从等待队列中转移到同步队列中,若是在同步队列中可以竞争到Lock则能够从等待方法中返回。
void signalAll():与1的区别在于可以唤醒全部等待在condition上的线程
也就是说wait--->await,notify---->Signal。
若是采用lock中Conditon的消息通知原理来实现生产者-消费者问题,原理同使用wait/notifyAll同样。直接上代码:
public class ProductorConsumer { private static ReentrantLock lock = new ReentrantLock(); private static Condition full = lock.newCondition(); private static Condition empty = lock.newCondition(); public static void main(String[] args) { LinkedList linkedList = new LinkedList(); ExecutorService service = Executors.newFixedThreadPool(15); for (int i = 0; i < 5; i++) { service.submit(new Productor(linkedList, 8, lock)); } for (int i = 0; i < 10; i++) { service.submit(new Consumer(linkedList, lock)); } } static class Productor implements Runnable { private List<Integer> list; private int maxLength; private Lock lock; public Productor(List list, int maxLength, Lock lock) { this.list = list; this.maxLength = maxLength; this.lock = lock; } @Override public void run() { while (true) { lock.lock(); try { while (list.size() == maxLength) { System.out.println("生产者" + Thread.currentThread().getName() + " list以达到最大容量,进行wait"); full.await(); System.out.println("生产者" + Thread.currentThread().getName() + " 退出wait"); } Random random = new Random(); int i = random.nextInt(); System.out.println("生产者" + Thread.currentThread().getName() + " 生产数据" + i); list.add(i); empty.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } } static class Consumer implements Runnable { private List<Integer> list; private Lock lock; public Consumer(List list, Lock lock) { this.list = list; this.lock = lock; } @Override public void run() { while (true) { lock.lock(); try { while (list.isEmpty()) { System.out.println("消费者" + Thread.currentThread().getName() + " list为空,进行wait"); empty.await(); System.out.println("消费者" + Thread.currentThread().getName() + " 退出wait"); } Integer element = list.remove(0); System.out.println("消费者" + Thread.currentThread().getName() + " 消费数据:" + element); full.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } } }
输出结果:
消费者pool-1-thread-9 消费数据:1146627506 消费者pool-1-thread-9 消费数据:1508001019 消费者pool-1-thread-9 消费数据:-600080565 消费者pool-1-thread-9 消费数据:-1000305429 消费者pool-1-thread-9 消费数据:-1270658620 消费者pool-1-thread-9 消费数据:1961046169 消费者pool-1-thread-9 消费数据:-307680655 消费者pool-1-thread-9 list为空,进行wait 消费者pool-1-thread-13 退出wait 消费者pool-1-thread-13 list为空,进行wait 消费者pool-1-thread-10 退出wait 生产者pool-1-thread-5 退出wait 生产者pool-1-thread-5 生产数据-892558288 生产者pool-1-thread-5 生产数据-1917220008 生产者pool-1-thread-5 生产数据2146351766 生产者pool-1-thread-5 生产数据452445380 生产者pool-1-thread-5 生产数据1695168334 生产者pool-1-thread-5 生产数据1979746693 生产者pool-1-thread-5 生产数据-1905436249 生产者pool-1-thread-5 生产数据-101410137 生产者pool-1-thread-5 list以达到最大容量,进行wait 生产者pool-1-thread-1 退出wait 生产者pool-1-thread-1 list以达到最大容量,进行wait 生产者pool-1-thread-4 退出wait 生产者pool-1-thread-4 list以达到最大容量,进行wait 生产者pool-1-thread-2 退出wait 生产者pool-1-thread-2 list以达到最大容量,进行wait 生产者pool-1-thread-3 退出wait 生产者pool-1-thread-3 list以达到最大容量,进行wait 消费者pool-1-thread-9 退出wait 消费者pool-1-thread-9 消费数据:-892558288
因为BlockingQueue内部实现就附加了两个阻塞操做。
即当队列已满时,阻塞向队列中插入数据的线程,直至队列中未满;当队列为空时,阻塞从队列中获取数据的线程,直至队列非空时为止。
能够利用BlockingQueue实现生产者-消费者为题,阻塞队列彻底能够充当共享数据区域,就能够很好的完成生产者和消费者线程之间的协做。
public class ProductorConsumer { private static LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(); public static void main(String[] args) { ExecutorService service = Executors.newFixedThreadPool(15); for (int i = 0; i < 5; i++) { service.submit(new Productor(queue)); } for (int i = 0; i < 10; i++) { service.submit(new Consumer(queue)); } } static class Productor implements Runnable { private BlockingQueue queue; public Productor(BlockingQueue queue) { this.queue = queue; } @Override public void run() { try { while (true) { Random random = new Random(); int i = random.nextInt(); System.out.println("生产者" + Thread.currentThread().getName() + "生产数据" + i); queue.put(i); } } catch (InterruptedException e) { e.printStackTrace(); } } } static class Consumer implements Runnable { private BlockingQueue queue; public Consumer(BlockingQueue queue) { this.queue = queue; } @Override public void run() { try { while (true) { Integer element = (Integer) queue.take(); System.out.println("消费者" + Thread.currentThread().getName() + "正在消费数据" + element); } } catch (InterruptedException e) { e.printStackTrace(); } } } }
输出结果:
消费者pool-1-thread-7正在消费数据1520577501 生产者pool-1-thread-4生产数据-127809610 消费者pool-1-thread-8正在消费数据504316513 生产者pool-1-thread-2生产数据1994678907 消费者pool-1-thread-11正在消费数据1967302829 生产者pool-1-thread-1生产数据369331507 消费者pool-1-thread-9正在消费数据1994678907 生产者pool-1-thread-2生产数据-919544017 消费者pool-1-thread-12正在消费数据-127809610 生产者pool-1-thread-4生产数据1475197572 消费者pool-1-thread-14正在消费数据-893487914 生产者pool-1-thread-3生产数据906921688 消费者pool-1-thread-6正在消费数据-1292015016 生产者pool-1-thread-5生产数据-652105379 生产者pool-1-thread-5生产数据-1622505717 生产者pool-1-thread-3生产数据-1350268764 消费者pool-1-thread-7正在消费数据906921688 生产者pool-1-thread-4生产数据2091628867 消费者pool-1-thread-13正在消费数据1475197572 消费者pool-1-thread-15正在消费数据-919544017 生产者pool-1-thread-2生产数据564860122 生产者pool-1-thread-2生产数据822954707 消费者pool-1-thread-14正在消费数据564860122 消费者pool-1-thread-10正在消费数据369331507 生产者pool-1-thread-1生产数据-245820912 消费者pool-1-thread-6正在消费数据822954707 生产者pool-1-thread-2生产数据1724595968 生产者pool-1-thread-2生产数据-1151855115 消费者pool-1-thread-12正在消费数据2091628867 生产者pool-1-thread-4生产数据-1774364499 生产者pool-1-thread-4生产数据2006106757 消费者pool-1-thread-14正在消费数据-1774364499 生产者pool-1-thread-3生产数据-1070853639 消费者pool-1-thread-9正在消费数据-1350268764 消费者pool-1-thread-11正在消费数据-1622505717 生产者pool-1-thread-5生产数据355412953
能够看出,使用BlockingQueue来实现生产者-消费者很简洁,这正是利用了BlockingQueue插入和获取数据附加阻塞操做的特性。
关于生产者-消费者实现的三种方式,到这里就所有总结出来,但愿对你有帮助,同时若有错误欢迎指正。
我将面试题和答案都整理成了PDF文档,还有一套学习资料,涵盖Java虚拟机、spring框架、Java线程、数据结构、设计模式等等,但不只限于此。关注公众号【java圈子】获取资料,还有优质文章每日送达。