9、生产者与消费者模式

生产者消费者模式

  • 生产者消费者模式是程序设计中很是常见的一种设计模式,被普遍运用在解耦、消息队列等场景。java

  • 使用生产者消费者模式一般须要在二者之间增长一个阻塞队列做为媒介,有了媒介以后就至关于有了一个缓冲,平衡了二者的能力。编程

  • 总体如上图所示,最上面是阻塞队列,右侧的 1 是生产者线程,生产者在生产数据后将数据存放在阻塞队列中,左侧的 2 是消费者线程,消费者获取阻塞队列中的数据。设计模式

  • 而中间的 3 和 4 分别表明生产者消费者之间互相通讯的过程,由于不管阻塞队列是满仍是空均可能会产生阻塞,阻塞以后就须要在合适的时机去唤醒被阻塞的线程。多线程

  • 那么何时阻塞线程须要被唤醒呢?有两种状况。编程语言

  • 第一种状况是当消费者看到阻塞队列为空时,开始进入等待,这时生产者一旦往队列中放入数据,就会通知全部的消费者,唤醒阻塞的消费者线程。ide

  • 另外一种状况是若是生产者发现队列已经满了,也会被阻塞,而一旦消费者获取数据以后就至关于队列空了一个位置,这时消费者就会通知全部正在阻塞的生产者进行生产。this

使用 BlockingQueue 实现生产者消费者模式

import java.util.concurrent.ArrayBlockingQueue;

/**
 * 使用阻塞队列实现一个生产者与消费者模型
 *
 * @author xiandongxie
 */
public class ProducerAndConsumer {

    private static ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

    public static void main(String[] args) throws InterruptedException {
        Producer producer = new Producer();
        Consumer consumer = new Consumer();
        Thread producer1 = new Thread(producer, "producer-1");
        Thread producer2 = new Thread(producer, "producer-2");
        Thread consumer1 = new Thread(consumer, "consumer-2");
        Thread consumer2 = new Thread(consumer, "consumer-2");

        producer1.start();
        producer2.start();
        consumer1.start();
        consumer2.start();

        Thread.sleep(5);
        producer1.interrupt();
        Thread.sleep(5);
        producer2.interrupt();

        Thread.sleep(5);
        consumer1.interrupt();
        consumer2.interrupt();

    }

    static class Producer implements Runnable {
        @Override
        public void run() {
            int count = 0;
            while (true && !Thread.currentThread().isInterrupted()) {
                count++;
                String message = Thread.currentThread().getName() + " message=" + count;
                try {
                    queue.put(message);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            }
        }
    }

    static class Consumer implements Runnable {

        @Override
        public void run() {
            while (true && !Thread.currentThread().isInterrupted()) {
                try {
                    String take = queue.take();
                    System.out.println(Thread.currentThread().getName() + ",消费信息:" + take);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            }
        }
    }
}

使用 Condition 实现生产者消费者模式

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 采用 Condition 自定义阻塞队列实现消费者与生产者
 *
 * @author xiandongxie
 */
public class MyBlockingQueueForCondition<E> {

    private Queue<E> queue;
    private int max = 16;
    private ReentrantLock lock = new ReentrantLock();
    // 没有空,则消费者能够消费,标记 消费者
    private Condition notEmpty = lock.newCondition();
    // 没有满,则生产者能够生产,标记 生产者
    private Condition notFull = lock.newCondition();

    public MyBlockingQueueForCondition(int size) {
        this.max = size;
        queue = new LinkedList();
    }

    public void put(E o) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == max) {
                // 若是满了,阻塞生产者线程,释放 Lock
                notFull.await();
            }
            queue.add(o);
            // 有数据了,通知等待的消费者,并唤醒
            notEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == 0) {
                // 若是为空,阻塞消费者线程
                notEmpty.await();
            }
            E item = queue.remove();
            // queue 未满,唤醒生产者
            notFull.signalAll();
            return item;
        } finally {
            lock.unlock();
        }
    }

}
  • 这里须要注意,在 take() 方法中使用 while( queue.size() == 0 ) 检查队列状态,而不能用 if( queue.size() == 0 )。
  • 由于生产者消费者每每是多线程的,假设有两个消费者,第一个消费者线程获取数据时,发现队列为空,便进入等待状态;
  • 由于第一个线程在等待时会释放 Lock 锁,因此第二个消费者能够进入并执行 if( queue.size() == 0 ),也发现队列为空,因而第二个线程也进入等待;
  • 而此时,若是生产者生产了一个数据,便会唤醒两个消费者线程,而两个线程中只有一个线程能够拿到锁,并执行 queue.remove 操做,另一个线程由于没有拿到锁而卡在被唤醒的地方,而第一个线程执行完操做后会在 finally 中经过 unlock 解锁,而此时第二个线程即可以拿到被第一个线程释放的锁,继续执行操做,也会去调用 queue.remove 操做,然而这个时候队列已经为空了,因此会抛出 NoSuchElementException 异常,这不符合逻辑。
  • 而若是用 while 作检查,当第一个消费者被唤醒获得锁并移除数据以后,第二个线程在执行 remove 前仍会进行 while 检查,发现此时依然知足 queue.size() == 0 的条件,就会继续执行 await 方法,避免了获取的数据为 null 或抛出异常的状况。
  • 多线程的代码大部分都用 while 而不用 if,无论线程在哪被切换中止了,while 的话,线程上次切换判断结果对下次切换判断没有影响,可是if的话,若线程切换前,条件成立过了,可是该线程再次拿到 cpu 使用权的时候,其实条件已经不成立了,因此不该该执行。(本质缘由:就是原子性问题,CPU 严重的原子性是针对 CPU 指令的,而不是针对高级编程语言的语句的)。

使用 wait/notify 实现生产者消费者模式

import java.util.LinkedList;

/**
 * 采用 wait,notify 实现阻塞队列
 *
 * @author xiandongxie
 */
public class MyBlockingQueue<E> {
    private int maxSize;
    private LinkedList<E> storage;

    public MyBlockingQueue(int maxSize) {
        this.maxSize = maxSize;
        storage = new LinkedList<>();
    }

    public synchronized void put(E e) throws InterruptedException {
        try {
            while (storage.size() == maxSize) {
                // 满了
                wait();
            }
            storage.add(e);
        } finally {
            notifyAll();
        }

    }

    public synchronized E take() throws InterruptedException {
        try {
            while (storage.size() == 0) {
                // 没有数据
                wait();
            }
            return storage.remove();
        } finally {
            notifyAll();
        }
    }

}
相关文章
相关标签/搜索