在IT技术面试过程当中,咱们常常会遇到生产者消费者问题(Producer-consumer problem), 这是多线程并发协做问题的经典案例。场景中包含三个对象,生产者(Producer),消费者(Consumer)以及一个固定大小的缓冲区(Buffer)。生产者的主要做用是不断生成数据放到缓冲区,消费者则从缓冲区不断消耗数据。该问题的关键是如何线程安全的操做共享数据块,保证生产者线程和消费者线程能够正确的更新数据块,主要考虑 1. 生产者不会在缓冲区满时加入数据. 2. 消费者应当中止在缓冲区时消耗数据. 3. 在同一时间应当只容许一个生产者或者消费者访问共享缓冲区(这一点是对于互斥操做访问共享区块的要求)。java
解决问题以上问题一般有信号量,wait & notify, 管道或者阻塞队列等几种思路。本文以Java语言为例一一进行举例讲解。面试
信号量(Semaphore)也称信号灯,是用来控制资源被同时访问的个数,好比控制访问数据库最大链接数的数量,线程经过acquire()得到链接许可,完成数据操做后,经过release()释放许可。对于生产者消费者问题来讲,为了知足线程安全操做的要求,同一时间咱们只容许一个线程访问共享数据区,所以须要一个大小为1的信号量mutex来控制互斥操做。注意到咱们还定义了notFull 和 notEmpty 信号量,notFull用于标识当前可用区块的空间大小,当notFull size 大于0时代表"not full", producer 能够继续生产,等于0时表示空间已满,没法继续生产;一样,对于notEmpty信号量来讲,大于0时代表 "not empty", consumer能够继续消耗,等于0 时代表没有产品,没法继续消耗。notFull初始size 为5 (5个available空间可供生产),notEmpty初始为0(没有产品可供消耗)。数据库
/*** 数据仓储class,全部的producer和consumer共享这个class对象 **/ static class DataWareHouse { //共享数据区 private final Queue<String> data = new LinkedList(); //非满锁 private final Semaphore notFull; //非空锁 private final Semaphore notEmpty; //互斥锁 private final Semaphore mutex; public DataWareHouse(int capacity) { this.notFull = new Semaphore(capacity); this.notEmpty = new Semaphore(0); mutex = new Semaphore(1); } public void offer(String x) throws InterruptedException { notFull.acquire(); //producer获取信号,notFull信号量减一 mutex.acquire(); //当前进程得到信号,mutex信号量减1,其余线程被阻塞操做共享区块data data.add(x); mutex.release(); //mutex信号量+1, 其余线程能够继续信号操做共享区块data notEmpty.release(); //成功生产数据,notEmpty信号量加1 } public String poll() throws InterruptedException { notEmpty.acquire(); //notEmpty信号减一 mutex.acquire(); String result = data.poll(); mutex.release(); notFull.release(); //成功消耗数据, notFull信号量加1 return result; } } /**Producer线程**/ static class Producer implements Runnable { private final DataWareHouse dataWareHouse; public Producer(final DataWareHouse dataWareHouse) { this.dataWareHouse = dataWareHouse; } @Override public void run() { while (true) { try { Thread.sleep(100); //生产的速度慢于消耗的速率 String s = UUID.randomUUID().toString(); System.out.println("put data " + s); dataWareHouse.offer(s); } catch (InterruptedException e) { e.printStackTrace(); } } } } /**Consumer线程**/ static class Consumer implements Runnable { private final DataWareHouse dataWareHouse; public Consumer(final DataWareHouse dataWareHouse) { this.dataWareHouse = dataWareHouse; } @Override public void run() { while (true) { while (true) { try { System.out.println("get data " + dataWareHouse.poll()); } catch (InterruptedException e) { e.printStackTrace(); } } } } } //测试代码 public static void main(String[] args) { final DataWareHouse dataWareHouse = new DataWareHouse(5); //三个producer 持续生产 for (int i = 0; i < 3; i++) { Thread t = new Thread(new Producer(dataWareHouse)); t.start(); } //三个consumer 持续消耗 for (int i = 0; i < 3; i++) { Thread t = new Thread(new Consumer(dataWareHouse)); t.start(); } }
Java Object对象类中包含三个final methods来容许线程之间进行通讯,告知资源的状态。它们分别是wait(), notify(), 和notifyAll()。安全
wait(): 顾名思义告诉当前线程释放锁,陷入休眠状态(waiting状态),等待资源。wait 方法自己是一个native method,它在Java中的使用语法以下所示:多线程
synchronized(lockObject ) { while( ! condition ) { lockObject.wait(); } //take the action here; }
notify(): 用于唤醒waiting状态的线程, 同时释放锁,被唤醒的线程能够从新得到锁访问资源。它的基本语法 以下并发
synchronized(lockObject) { //establish_the_condition; lockObject.notify(); //any additional code if needed }
notifyAll(): 不一样于notify(),它用于唤醒全部处于waiting状态的线程。语法以下:dom
synchronized(lockObject) { establish_the_condition; lockObject.notifyAll(); }
说完了这三个方法,来看下如何使用wait & notify(All) 来解决咱们的问题。新的DataWareHouse 类以下所示:ide
//producer类和consumer共享对象 static class DataWareHouse { //共享数据区 private final Queue<String> data = new LinkedList(); private int capacity; private int size = 0; public DataWareHouse(int capacity) { this.capacity = capacity; } public synchronized void offer(String x) throws InterruptedException { while (size == capacity) { //当buffer满时,producer进入waiting 状态 this.wait(); //使用this对象来加锁 } data.add(x); size++; notifyAll(); //当buffer 有数据时,唤醒全部等待的consumer线程 } public synchronized String poll() throws InterruptedException { while (size == 0) {//当buffer为空时,consumer 进入等待状态 this.wait(); } String result = data.poll(); size--; notifyAll(); //当数据被消耗,空间被释放,通知全部等待的producer。 return result; } }
Note: 在方法上使用synchronized 等价于在方法体内使用synchronized(this),二者都是使用this对象做为锁。工具
生产者和消费者类,以及测试代码和 信号量相同,不作重复列举了。测试
管道Pipe是实现进程或者线程(线程之间一般经过共享内存实现通信,而进程则经过scoket,管道,消息队列等技术)之间通讯经常使用方式,它链接输入流和输出流,基于生产者- 消费者模式构建的一种技术。具体实现能够经过建立一个管道输入流对象和管道输出流对象,而后将输入流和输出流就行连接,生产者经过往管道中写入数据,而消费者在管道数据流中读取数据,经过这种方式就实现了线程之间的互相通信。
具体实现代码以下所示
public class PipeSolution { static class DataWareHouse implements Closeable { private final PipedInputStream pis; private final PipedOutputStream pos; public DataWareHouse() throws IOException { pis = new PipedInputStream(); pos = new PipedOutputStream(); pis.connect(pos); //链接管道 } //向管道中写入数据 public void offer(int val) throws IOException { pos.write(val); pos.flush(); } //从管道中取数据. public int poll() throws IOException { //当管道中没有数据,方法阻塞 return pis.read(); } //关闭管道 @Override public void close() throws IOException { if (pis != null) { pis.close(); } if (pos != null) { pos.close(); } } } //consumer类 static class Consumer implements Runnable { private final DataWareHouse dataWareHouse; Consumer(DataWareHouse dataWareHouse) { this.dataWareHouse = dataWareHouse; } @Override public void run() { try { //消费者不断从管道中读取数据 while (true) { int num = dataWareHouse.poll(); System.out.println("get data +" + num); } } catch (IOException e) { throw new RuntimeException(e); } } } static class Producer implements Runnable { private final DataWareHouse dataWareHouse; private final Random random = new Random(); Producer(DataWareHouse dataWareHouse) { this.dataWareHouse = dataWareHouse; } @Override public void run() { try { //生产者不断向管道中写入数据 while (true) { int num = random.nextInt(256); dataWareHouse.offer(num); System.out.println("put data +" + num); Thread.sleep(1000); } } catch (Exception e) { throw new RuntimeException(e); } } public static void main(String[] args) throws IOException { DataWareHouse dataWareHouse = new DataWareHouse(); new Thread(new Producer(dataWareHouse)).start(); new Thread(new Consumer(dataWareHouse)).start(); } }
阻塞队列(BlockingQueue),具备1. 当队列满了的时候阻塞入队列操做 2. 当队列空了的时候阻塞出队列操做 3. 线程安全 的特性,于是阻塞队列一般被视为实现生产消费者模式最便捷的工具,其中DataWareHouse类实现代码以下:
static class DataWareHouse { //共享数据区 private final BlockingQueue<String> blockingQueue; public DataWareHouse(int capacity) { this.blockingQueue = new ArrayBlockingQueue<>(capacity); } public void offer(String x) { blockingQueue.offer(x); } public String poll() { return blockingQueue.poll(); } }
生产者和消费者类,以及测试代码和 信号量 相同,在此不作重复列举了。
生产者消费者问题是面试中常常会遇到的题目,本文总结了几种常见的实现方式,面试过程当中一般没必要要向面试官描述过多实现细节,说出每种实现方式的特色便可。但愿能给你们带来帮助。