想要了解更多关于Java生产者消费者问题的演变吗?那就看看这篇文章吧,咱们分别用旧方法和新方法来处理这个问题。html
生产者消费者问题是一个典型的多进程同步问题。java
对于大多数人来讲,这个问题多是咱们在学校,执行第一次并行算法所遇到的第一个同步问题。算法
虽然它很简单,但一直是并行计算中的最大挑战 - 多个进程共享一个资源。安全
生产者和消费者两个程序,共享一个大小有限的公共缓冲区。数据结构
假设一个生产者“生产”一份数据并将其存储在缓冲区中,而一个消费者“消费”这份数据,并将这份数据从缓冲区中删除。多线程
再假设如今这两个程序在并发地运行,咱们须要确保当缓冲区的数据已满时,生产者不会放置新数据进来,也要确保当缓冲区的数据为空时,消费者不会试图删除数据缓冲区的数据。并发
为了解决上述的并发问题,生产者和消费者将不得不相互通讯。app
若是缓冲区已满,生产者将处于睡眠状态,直到有通知信息唤醒。ide
在消费者将一些数据从缓冲区删除后,消费者将通知生产者,随后生产者将从新开始填充数据到缓冲区中。性能
若是缓冲区内容为空的化,那么状况是同样的,只不过,消费者会先等待生产者的通知。
但若是这种沟通作得不恰当,在进程彼此等待的位置可能致使程序死锁。
首先来看一个典型的Java方案来解决这个问题。
package ProducerConsumer; import java.util.LinkedList; import java.util.Queue; public class ClassicProducerConsumerExample { public static void main(String[] args) throws InterruptedException { Buffer buffer = new Buffer(2); Thread producerThread = new Thread(new Runnable() { @Override public void run() { try { buffer.produce(); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread consumerThread = new Thread(new Runnable() { @Override public void run() { try { buffer.consume(); } catch (InterruptedException e) { e.printStackTrace(); } } }); producerThread.start(); consumerThread.start(); producerThread.join(); consumerThread.join(); } static class Buffer { private Queue<Integer> list; private int size; public Buffer(int size) { this.list = new LinkedList<>(); this.size = size; } public void produce() throws InterruptedException { int value = 0; while (true) { synchronized (this) { while (list.size() >= size) { // wait for the consumer wait(); } list.add(value); System.out.println("Produced " + value); value++; // notify the consumer notify(); Thread.sleep(1000); } } } public void consume() throws InterruptedException { while (true) { synchronized (this) { while (list.size() == 0) { // wait for the producer wait(); } int value = list.poll(); System.out.println("Consume " + value); // notify the producer notify(); Thread.sleep(1000); } } } } }
这里咱们有生产者和消费者两个线程,它们共享一个公共缓冲区。生产者线程开始产生新的元素并将它们存储在缓冲区。若是缓冲区已满,那么生产者线程进入睡眠状态,直到有通知唤醒。不然,生产者线程将会在缓冲区建立一个新元素而后通知消费者。就像我以前说的,这个过程也适用于消费者。若是缓冲区为空,那么消费者将等待生产者的通知。不然,消费者将从缓冲区删除一个元素并通知生产者。
正如你所看到的,在以前的例子中,生产者和消费者的工做都是管理缓冲区的对象。这些线程仅仅调用了buffer.produce()和buffer.consume()两个方法就搞定了一切。
对于缓冲区是否应该负责建立或者删除元素,一直都是一个有争议的话题,但在我看来,缓冲区不该该作这种事情。固然,这取决于你想要达到的目的,但在这种状况下,缓冲区应该只是负责以线程安全的形式存储合并元素,而不是生产新的元素。
因此,让咱们把生产和消费的逻辑从缓冲对象中进行解耦。
package ProducerConsumer; import java.util.LinkedList; import java.util.Queue; public class ProducerConsumerExample2 { public static void main(String[] args) throws InterruptedException { Buffer buffer = new Buffer(2); Thread producerThread = new Thread(() -> { try { int value = 0; while (true) { buffer.add(value); System.out.println("Produced " + value); value ++; Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }); Thread consumerThread = new Thread(() -> { try { while (true) { int value = buffer.poll(); System.out.println("Consume " + value); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }); producerThread.start(); consumerThread.start(); producerThread.join(); consumerThread.join(); } static class Buffer { private Queue<Integer> list; private int size; public Buffer(int size) { this.list = new LinkedList<>(); this.size = size; } public void add(int value) throws InterruptedException { synchronized (this) { while (list.size() >= size) { wait(); } list.add(value); notify(); } } public int poll() throws InterruptedException { synchronized (this) { while (list.size() == 0) { wait(); } int value = list.poll(); notify(); return value; } } } }
这样好多了,至少如今缓冲区仅仅负责以线程安全的形式来存储和删除元素。
不过,咱们还能够进一步改善。
在前面的例子中,咱们已经建立了一个缓冲区,每当存储一个元素以前,缓冲区将等待是否有可用的一个槽以防止没有足够的存储空间,而且,在合并以前,缓冲区也会等待一个新的元素出现,以确保存储和删除的操做是线程安全的。
可是,Java自己的库已经整合了这些操做。它被称之为BlockingQueue,在这里能够查看它的详细文档。
BlockingQueue是一个以线程安全的形式存入和取出实例的队列。而这就是咱们所须要的。
因此,若是咱们在示例中使用BlockingQueue,咱们就不须要再去实现等待和通知的机制。
接下来,咱们来看看具体的代码。
package ProducerConsumer; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; public class ProducerConsumerWithBlockingQueue { public static void main(String[] args) throws InterruptedException { BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>(2); Thread producerThread = new Thread(() -> { try { int value = 0; while (true) { blockingQueue.put(value); System.out.println("Produced " + value); value++; Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }); Thread consumerThread = new Thread(() -> { try { while (true) { int value = blockingQueue.take(); System.out.println("Consume " + value); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }); producerThread.start(); consumerThread.start(); producerThread.join(); consumerThread.join(); } }
虽然runnables看起来跟以前同样,他们按照以前的方式生产和消费元素。
惟一的区别在于,这里咱们使用blockingQueue代替缓冲区对象。
这儿有不少种类型的BlockingQueue:
×××队列
有界队列
一个×××队列几乎能够无限地增长元素,任何添加操做将不会被阻止。
你能够以这种方式去建立一个×××队列:
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>();
在这种状况下,因为添加操做不会被阻塞,生产者添加新元素时能够不用等待。每次当生产者想要添加一个新元素时,会有一个队列先存储它。可是,这里面也存在一个异常须要捕获。若是消费者删除元素的速度比生产者添加新的元素要慢,那么内存将被填满,咱们将可能获得一个OutOfMemory异常。
与之相反的则是有界队列,存在一个固定大小。你能够这样去建立它:
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(10);
二者最主要的区别在于,使用有界队列的状况下,若是队列内存已满,而生产者仍然试图往里面塞元素,那么队列将会被阻塞(具体阻塞方式取决于添加元素的方法)直到有足够的空间腾出来。
往blocking queue里面添加元素一共有如下四种方式:
add() - 若是插入成功返回true,不然抛出IllegalStateException
put() - 往队列中插入元素,并在有必要的状况下等待一个可用的槽(slot)
offer() - 若是插入元素成功返回true,不然返回false
offer(E e, long timeout, TimeUnit unit) – 在队列没有满的状况下,或者为了一个可用的slot而等待指定的时间后,往队列中插入一个元素。
因此,若是你使用put()方法插入元素,而队列内存已满的状况下,咱们的生产者就必须等待,直到有可用的slot出现。
以上就是咱们上一个案例的所有,这跟ProducerConsumerExample2的工做原理是同样的。
还有什么地方咱们能够优化的?那首先来分析一下咱们干了什么,咱们实例化了两个线程,一个被叫作生产者,专门往队列里面塞元素,另外一个被叫作消费者,负责从队列里面删元素。
然而,好的软件技术代表,手动地去建立和销毁线程是很差的作法。首先建立线程是一项昂贵的任务,每建立一个线程,意味着要经历一遍下面的步骤:
首先要分配内存给一个线程堆栈
操做系统要建立一个原生线程对应于Java的线程
跟这个线程相关的描述符被添加到JVM内部的数据结构中
首先别误会我,咱们的案例中用了几个线程是没有问题的,而那也是并发工做的方式之一。这里的问题是,咱们是手动地去建立线程,这能够说是一次糟糕的实践。若是咱们手动地建立线程,除了建立过程当中的消耗外,还有另外一个问题,就是咱们没法控制同时有多少个线程在运行。举个例子,若是同时有一百万次请求线上服务,那么每一次请求都会相应的建立一个线程,那么同时会有一百万个线程在后台运行,这将会致使[thread starvation](https://en.wikipedia.org/wiki/Starvation_(computer_science))
因此,咱们须要一种全局管理线程的方式,这就用到了线程池。
线程池将基于咱们选择的策略来处理线程的生命周期。它拥有有限数量的空闲线程,并在须要解决任务时启用它们。经过这种方式,咱们不须要为每个新的请求建立一个新线程,所以,咱们能够避免出现线程饥饿的问题。
Java线程池的实现包括:
一个任务队列
一个工做线程的集合
一个线程工厂
管理线程池状态的元数据
为了同时运行一些任务,你必须把他们先放到任务队列里。而后,当一个线程可用的时候,它将接收一个任务并运行它。可用的线程越多,并行执行的任务就越多。
除了管理线程生命周期,使用线程池还有另外一个好处,当你计划如何分割任务,以便同时执行时,你能想到更多种方式。并行性的单位再也不是线程了,而是任务。你设计一些任务来并发执行,而不是让一些线程经过共享公共的内存块来并发运行。按照功能需求来思考的方式能够帮助咱们避免一些常见的多线程问题,如死锁或数据竞争等。没有什么能够阻止咱们再次深刻这些问题,可是,因为使用了功能范式,咱们没办法命令式地同步并行计算(锁)。这比直接使用线程和共享内存所能碰到的概率要少的多。在咱们的例子中,共享一个阻塞队列不是想要的状况,但我就是想强调这个优点。
说了那么多,接下来咱们看看在案例中如何使用线程池。
package ProducerConsumer; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; public class ProducerConsumerExecutorService { public static void main(String[] args) { BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>(2); ExecutorService executor = Executors.newFixedThreadPool(2); Runnable producerTask = () -> { try { int value = 0; while (true) { blockingQueue.put(value); System.out.println("Produced " + value); value++; Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }; Runnable consumerTask = () -> { try { while (true) { int value = blockingQueue.take(); System.out.println("Consume " + value); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }; executor.execute(producerTask); executor.execute(consumerTask); executor.shutdown(); } }
这里的区别在于,咱们不在手动建立或运行消费者和生产者线程。咱们创建一个线程池,它将收到两个任务,生产者和消费者的任务。生产者和消费者的任务,实际上跟以前例子里面使用的runnable是相同的。如今,执行程序(线程池实现)将接收任务,并安排它的工做线程去执行他们。
在咱们简单的案例下,一切都跟以前同样运行。就像以前的例子,咱们仍然有两个线程,他们仍然要以一样的方式生产和消费元素。虽然咱们并无让性能获得提高,可是代码看起来干净多了。咱们再也不手动建立线程,而只是具体说明咱们想要什么:咱们想要并发执行某些任务。
因此,当你使用一个线程池时。你不须要考虑线程是并发执行的单位,相反的,你把一些任务看做并发执行的就好。以上就是你须要知道的,剩下的由执行程序去处理。执行程序会收到一些任务,而后,它会分配工做线程去处理它们。
首先,咱们看到了一个“传统”的消费者-生产者问题的解决方案。咱们尽可能避免了重复造没有必要的车轮,偏偏相反,咱们重用了已经测试过的解决方案,所以,咱们不是写一个通知等待系统,而是尝试使用Java已经提供的blocking queue,由于Java为咱们提供了一个很是有效的线程池来管理线程生命周期,让咱们能够摆脱手动建立线程。经过这些改进,消费者-生产者问题的解决方案看起来更可靠和更好理解。