生产者消费者模式在咱们平常工做中用得很是多,好比:在模块解耦、消息队列、分布式场景中都很常见。这个模式里有三个角色,他们之间的关系是以下图这样的:前端
从图中 3 和 4 能够知道:不管阻塞队列是满仍是空均可能会产生阻塞,阻塞以后就要在合适的时候去唤醒被阻塞的线程。java
Q1:那何时会唤醒阻塞线程?面试
当消费者判断队列为空时,消费者线程进入等待。这期间生产者一旦往队列中放入数据,就会通知全部的消费者,唤醒阻塞的消费者线程。算法
反之,当生产者判断队列已满,生产者线程进入等待。这期间消费者一旦消费了数据、队列有空位,就会通知全部的生产者,唤醒阻塞的生产者线程。数据库
Q2:为何要用这种模式?编程
看了上面的 Q1,你们发现没有?生产者不用管消费者的动做,消费者也不用管生产者的动做;它两之间就是经过阻塞队列通讯,实现了解耦;阻塞队列的加入,平衡两者能力;生产者只有在队列满或消费者只有在队列空时才会等待,其余时间谁抢到锁谁工做,提升效率。以上就是缘由~设计模式
上篇文章《正确使用 wait、notify/notifyAll》说过,wait 让当前线程等待并释放锁,notify 唤醒任意一个等待同一个锁的线程,notifyAll 则是唤醒全部等待该锁的线程,而后谁抢到锁,谁执行。这就是所谓的等待唤醒机制安全
先来看看用等待唤醒机制如何实现生产者、消费者模式的,首先是阻塞队列:微信
public class MyBlockingQueue { private int maxSize; private LinkedList<Integer> queue; public MyBlockingQueue(int size) { this.maxSize = size; queue = new LinkedList<>(); } public synchronized void put() throws InterruptedException { while (queue.size() == maxSize) { System.out.println("队列已满,生产者: " + Thread.currentThread().getName() +"进入等待"); wait(); } Random random = new Random(); int i = random.nextInt(); System.out.println("队列未满,生产者: " + Thread.currentThread().getName() +"放入数据" + i); // 队列空才去唤醒消费者,其余时间自由竞争锁 if (queue.size() == 0) { notifyAll(); } queue.add(i); } public synchronized void take() throws InterruptedException { while (queue.size() == 0) { System.out.println("队列为空,消费者: " + Thread.currentThread().getName() +"进入等待"); wait(); } // 队列满了才去唤醒生产者,其余时间自由竞争锁 if (queue.size() == maxSize) { notifyAll(); } System.out.println("队列有数据,消费者: " + Thread.currentThread().getName() +"取出数据: " + queue.remove()); } }
主要逻辑在阻塞队列这边:先看 put 方法,while 检查队列是否满?满则进入等待并主动释放锁,不满则生产数据,同时判断放入数据以前队列是否空?空则唤醒消费者(由于队列已有数据,可消费)。数据结构
再看 take 方法,while 检查队列是否空?空则进入等待并主动释放锁,不空则生产数据,同时判断取出数据以前队列是否已满?满则唤醒生产者(由于队列已有空位,可生产)。
为何是 while 不是 if ?
你们可能有个疑问。为何判断队列 size 进入等待状态这里是用 while,不能用 if 吗?就这个 demo 而言,是能够的。由于咱们的生产者和消费者线程都只有一个,可是多线程状况下用 if 就大错特错了。想象如下状况:
假设有两个消费者一个生产者。队列为空,消费者一进入等待状态,释放锁。消费者二抢到锁,进入 if(queue.size == 0) 的判断,也进入等待,释放锁。这时生产者抢到锁生产数据,队列有数据了。反过来唤醒两个消费者。
消费者一抢到锁执行 wait() 后的逻辑,取完数据释放锁。这时消费者二拿到锁,执行 wait() 后的逻辑取数据,可是此时队列的数据已被消费者一取出,没有数据了,这时就会报异常了。
而用 while 为何能够?由于不论是消费者一仍是二抢到锁,循环体的逻辑以前。根据 while 的语法,它会再一次判断条件是否成立,而 if 不会。这就是用 while 不用 if 的缘由。
生产者:
public class Producer implements Runnable { private MyBlockingQueue myBlockingQueue; public Producer(MyBlockingQueue myBlockingQueue) { this.myBlockingQueue = myBlockingQueue; } @Override public void run() { for (int i = 0; i < 100; i++) { try { myBlockingQueue.put(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
消费者:
public class Consumer implements Runnable{ private MyBlockingQueue myBlockingQueue; public Consumer(MyBlockingQueue myBlockingQueue) { this.myBlockingQueue = myBlockingQueue; } @Override public void run() { for (int i = 0; i < 100; i++) { try { myBlockingQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
测试类:
public class MyBlockingQueueTest { public static void main(String[] args) { MyBlockingQueue myBlockingQueue = new MyBlockingQueue(10); Producer producer = new Producer(myBlockingQueue); Consumer consumer = new Consumer(myBlockingQueue); new Thread(producer).start(); new Thread(consumer).start(); } }
Condition 是一个多线程间协调通讯的工具类,它的 await、sign/signAll 方法正好对应Object 的 wait、notify/notifyAll 方法。相比于 Object 的 wait、notify 方法,Condition 的 await、signal 结合的方式实现线程间协做更加安全和高效,因此更推荐这种方式实现线程间协做。关于 Condition 后面章节会继续研究,敬请关注
Object 的 wait、notify 方式须要结合 synchronized 关键字实现等待唤醒机制,一样 Condition 也须要结合 Lock 类-。那么这种方式如何实现生产者、消费者模式?看代码:
public class MyBlockingQueueForCondition { private Queue<Integer> queue; private int max = 10; private ReentrantLock lock = new ReentrantLock(); private Condition notEmpty = lock.newCondition(); private Condition notFull = lock.newCondition(); public MyBlockingQueueForCondition(int size) { this.max = size; queue = new LinkedList(); } public void put(Integer i) throws InterruptedException { // 加锁 lock.lock(); try { // 队列满了,进入等待 while (queue.size() == max) { System.out.println("队列已满,生产者: " + Thread.currentThread().getName() + "进入等待"); notFull.await(); } // 加入数据以前,队列为空?通知消费者,能够消费 if (queue.size() == 0) { notEmpty.signalAll(); } // 不然,继续生产 queue.add(i); } finally { // 最后别忘记释放锁 lock.unlock(); } } public Integer take() throws InterruptedException { // 加锁 lock.lock(); try { // 队列无数据,进入等待 while (queue.size() == 0) { System.out.println("队列为空,消费者: " + Thread.currentThread().getName() + "进入等待"); notEmpty.await(); } // 取出数据以前,队列已满?通知生产者,能够生产 if (queue.size() == max) { notFull.signalAll(); } // 不然,取出 return queue.remove(); } finally { // 最后别忘记释放锁 lock.unlock(); } } }
首先,定义了一个队列以及 ReentrantLock 类型的锁,在这基础上还建立 notFull、notEmpty 两个条件,分别表明未满、不为空的条件。最后定义了 take、put 方法。
take 和 put 逻辑差很少,这里只说 put 。由于消费生产模式确定用于多线程环境,须要保证同步。这里仍是先获取锁,确保同步。以后依然是判断队列是否已满?满了进入等待并释放锁,不满则继续生产,同时判断队列在生产前是否为空,为空才去唤醒消费者。不然不唤醒,由于当队列为空消费者才进入阻塞。
PS:最后是一个很是重要的细节,在 finally 里面释放锁,不然有可能出现异常没法释放锁的状况。
生产者:
public class ProducerForCondition implements Runnable { private MyBlockingQueueForCondition myBlockingQueueForCondition; public ProducerForCondition(MyBlockingQueueForCondition myBlockingQueueForCondition) { this.myBlockingQueueForCondition = myBlockingQueueForCondition; } @Override public void run() { for (int i = 0; i < 100; i++) { try { myBlockingQueueForCondition.put(i); } catch (InterruptedException e) { e.printStackTrace(); } } } }
消费者:
public class ConsumerForCondition implements Runnable{ private MyBlockingQueueForCondition myBlockingQueueForCondition; public ConsumerForCondition(MyBlockingQueueForCondition myBlockingQueueForCondition) { this.myBlockingQueueForCondition = myBlockingQueueForCondition; } @Override public void run() { for (int i = 0; i < 100; i++) { try { System.out.println("消费者取出数据: " + myBlockingQueueForCondition.take()); } catch (InterruptedException e) { e.printStackTrace(); } } } }
测试类:
public class MyBlockingQueueForConditionTest { public static void main(String[] args) { MyBlockingQueueForCondition myBlockingQueueForCondition = new MyBlockingQueueForCondition(10); ProducerForCondition producerForCondition = new ProducerForCondition(myBlockingQueueForCondition); ConsumerForCondition consumerForCondition = new ConsumerForCondition(myBlockingQueueForCondition); new Thread(producerForCondition).start(); new Thread(consumerForCondition).start(); } }
看完前两种方式以后,有些小伙伴可能会说,实现个生产者消费者这么烦么?其实主要代码仍是在阻塞队列,这点 Java 早就为咱们考虑好了,它提供了 BlockingQueue 接口,并有实现类: ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue、等。(关于阻塞队列,狗哥的多线程系列后面也会讲到)
咱们选用最简单的 ArrayBlockingQueue 实现。它的内部也是采起 ReentrantLock 和 Condition 结合的等待唤醒机制。因此,上面的两种方式实际上是为这种方式铺垫。很少比比,上代码:
public class ArrayBlockingQueueTest { public static void main(String[] args) { // 初始化长度为 10 的 ArrayBlockingQueue BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10); // 生产者 Runnable producer = () -> { try { // 放入数据 Random random = new Random(); while (true) { queue.put(random.nextInt()); } } catch (Exception e) { System.out.println("生产数据出错: " + e.getMessage()); } }; // 开启线程生产数据 new Thread(producer).start(); // 消费者 Runnable consumer = () -> { try { // 取出数据 while (true) { System.out.println(queue.take()); } } catch (Exception e) { System.out.println("消费数据出错: " + e.getMessage()); } }; // 开启线程消费数据 new Thread(consumer).start(); } }
建立一个 ArrayBlockingQueue 并给定最大长度为 10,建立生产者和消费者。生产者在 while(true) 里面一直生产,与此同时消费者也是不断取数据,有数据就取出来。
看着是否是很简单?但其实背后 ArrayBlockingQueue 已经为咱们作好了线程间通讯的工做了,好比队列满了就去阻塞生产者线程,队列有空就去唤醒生产者线程等。
看了这几个例子以后,相信你对生产者消费者模式也有所了解。之后面试官让你手写一个阻塞队列,确定也难不倒你。
若是看到这里,喜欢这篇文章的话,请帮点个好看。微信搜索一个优秀的废人,关注后回复电子书送你 100+ 本编程电子书 ,不仅 Java 哦,详情看下图。回复 1024送你一套完整的 java 视频教程。