生产者消费者模式是程序设计中很是常见的一种设计模式,被普遍运用在解耦、消息队列等场景。java
使用生产者消费者模式一般须要在二者之间增长一个阻塞队列做为媒介,有了媒介以后就至关于有了一个缓冲,平衡了二者的能力。编程
总体如上图所示,最上面是阻塞队列,右侧的 1 是生产者线程,生产者在生产数据后将数据存放在阻塞队列中,左侧的 2 是消费者线程,消费者获取阻塞队列中的数据。设计模式
而中间的 3 和 4 分别表明生产者消费者之间互相通讯的过程,由于不管阻塞队列是满仍是空均可能会产生阻塞,阻塞以后就须要在合适的时机去唤醒被阻塞的线程。多线程
那么何时阻塞线程须要被唤醒呢?有两种状况。编程语言
第一种状况是当消费者看到阻塞队列为空时,开始进入等待,这时生产者一旦往队列中放入数据,就会通知全部的消费者,唤醒阻塞的消费者线程。ide
另外一种状况是若是生产者发现队列已经满了,也会被阻塞,而一旦消费者获取数据以后就至关于队列空了一个位置,这时消费者就会通知全部正在阻塞的生产者进行生产。this
import java.util.concurrent.ArrayBlockingQueue; /** * 使用阻塞队列实现一个生产者与消费者模型 * * @author xiandongxie */ public class ProducerAndConsumer { private static ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(10); public static void main(String[] args) throws InterruptedException { Producer producer = new Producer(); Consumer consumer = new Consumer(); Thread producer1 = new Thread(producer, "producer-1"); Thread producer2 = new Thread(producer, "producer-2"); Thread consumer1 = new Thread(consumer, "consumer-2"); Thread consumer2 = new Thread(consumer, "consumer-2"); producer1.start(); producer2.start(); consumer1.start(); consumer2.start(); Thread.sleep(5); producer1.interrupt(); Thread.sleep(5); producer2.interrupt(); Thread.sleep(5); consumer1.interrupt(); consumer2.interrupt(); } static class Producer implements Runnable { @Override public void run() { int count = 0; while (true && !Thread.currentThread().isInterrupted()) { count++; String message = Thread.currentThread().getName() + " message=" + count; try { queue.put(message); } catch (InterruptedException e) { Thread.currentThread().interrupt(); e.printStackTrace(); } } } } static class Consumer implements Runnable { @Override public void run() { while (true && !Thread.currentThread().isInterrupted()) { try { String take = queue.take(); System.out.println(Thread.currentThread().getName() + ",消费信息:" + take); } catch (InterruptedException e) { Thread.currentThread().interrupt(); e.printStackTrace(); } } } } }
import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * 采用 Condition 自定义阻塞队列实现消费者与生产者 * * @author xiandongxie */ public class MyBlockingQueueForCondition<E> { private Queue<E> queue; private int max = 16; private ReentrantLock lock = new ReentrantLock(); // 没有空,则消费者能够消费,标记 消费者 private Condition notEmpty = lock.newCondition(); // 没有满,则生产者能够生产,标记 生产者 private Condition notFull = lock.newCondition(); public MyBlockingQueueForCondition(int size) { this.max = size; queue = new LinkedList(); } public void put(E o) throws InterruptedException { lock.lock(); try { while (queue.size() == max) { // 若是满了,阻塞生产者线程,释放 Lock notFull.await(); } queue.add(o); // 有数据了,通知等待的消费者,并唤醒 notEmpty.signalAll(); } finally { lock.unlock(); } } public E take() throws InterruptedException { lock.lock(); try { while (queue.size() == 0) { // 若是为空,阻塞消费者线程 notEmpty.await(); } E item = queue.remove(); // queue 未满,唤醒生产者 notFull.signalAll(); return item; } finally { lock.unlock(); } } }
import java.util.LinkedList; /** * 采用 wait,notify 实现阻塞队列 * * @author xiandongxie */ public class MyBlockingQueue<E> { private int maxSize; private LinkedList<E> storage; public MyBlockingQueue(int maxSize) { this.maxSize = maxSize; storage = new LinkedList<>(); } public synchronized void put(E e) throws InterruptedException { try { while (storage.size() == maxSize) { // 满了 wait(); } storage.add(e); } finally { notifyAll(); } } public synchronized E take() throws InterruptedException { try { while (storage.size() == 0) { // 没有数据 wait(); } return storage.remove(); } finally { notifyAll(); } } }