在实际的软件开发过程当中,常常会碰到以下场景:某个模块负责产生数据,这些数据由另外一个模块来负责处理(此处的模块是广义的,能够是类、函数、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。 java
单单抽象出生产者和消费者,还够不上是生产者/消费者模式。该模式还须要有一个缓冲区处于生产者和消费者之间,做为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据。多线程
好处:并发
一、解耦 app
假设生产者和消费者分别是两个类。若是让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。未来若是消费者的代码发生变化,可能会影响到生产者。而若是二者都依赖于某个缓冲区,二者之间不直接依赖,耦合也就相应下降了。 ide
二、支持并发函数
使用了生产者/消费者模式以后,生产者和消费者能够是两个独立的并发主体(常见并发类型有进程和线程两种)。生产者把制造出来的数据往缓冲区一丢,就能够再去生产下一个数据。基本上不用依赖消费者的处理速度。其实当初这个模式,主要就是用来处理并发问题的。 this
三、支持忙闲不均
缓冲区还有另外一个好处。若是制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据能够暂时存在缓冲区中。等生产者的制造速度慢下来,消费者再慢慢处理掉。 线程
代码实例:code
/** * 苹果 * @author Administrator */ public class Apple { private int id; private String descFlat; private String color; public Apple(int id, String descFlat, String color){ this.id = id; this.descFlat = descFlat; this.color = color; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getDescFlat() { return descFlat; } public void setDescFlat(String descFlat) { this.descFlat = descFlat; } public String getColor() { return color; } public void setColor(String color) { this.color = color; } @Override public String toString() { return "Apple [id=" + id + ", descFlat=" + descFlat + ", color=" + color + "]"; } }
import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; /** * 定义篮子 - 多线程模拟实现生产者/消费者模型 * @author Administrator */ public class Basket { BlockingQueue<Apple> queue = new LinkedBlockingQueue<Apple>(); // 生产 苹果 // put方法放入一个苹果,若basket满了,等到basket有位置 public void produce(Apple apple){ try { queue.put(apple); } catch (InterruptedException e) { e.printStackTrace(); } } // 消费 苹果 // take方法取出一个苹果,若basket为空,等到basket有苹果为止(获取并移除此队列的头部) public Apple consume(){ try { return queue.take(); } catch (InterruptedException e) { e.printStackTrace(); return null; } } }
/** * 定义生产者 * @author Administrator */ public class Producer implements Runnable{ private int instance; private Basket basket; private boolean flat; public Producer(int instance, Basket basket, boolean flat){ this.instance = instance; this.basket = basket; this.flat = flat; } public void run() { System.out.println("开始初始化生产" + instance + "苹果"); for(int i=0; i<instance; i++){ Apple apple = new Apple(i,"描述"+i,"红色"); basket.produce(apple); } System.out.println("初始化生产了" + instance + "苹果,完毕"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } while(flat){ if(basket.queue.size() < instance){ System.out.println("库存不足,准备生产了" + instance + "苹果"); for(int i=0; i<instance; i++){ Apple apple = new Apple(i,"描述"+i,"红色"); basket.produce(apple); } System.out.println("已经生产了" + instance + "苹果"); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
/** * 消费者 * @author Administrator */ public class Consumer implements Runnable{ private int instance; private Basket basket; private boolean flat; public Consumer(int instance, Basket basket, boolean flat){ this.instance = instance; this.basket = basket; this.flat = flat; } public void run() { while(flat){ System.out.println("篮子中苹果数量:" + basket.queue.size()); if(basket.queue.size() >= instance){ System.out.println("开始消费...."); for(int i=0; i<instance; i++){ Apple apple = basket.consume(); if(apple != null){ System.out.println(apple); } } System.out.println("消费....结束"); }else{ System.out.println("篮子中 苹果不足..."); try { Thread.sleep(1500); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
public class BlockingQueueTest { public static void main(String[] args) { // 定义一个 篮子 Basket basket = new Basket(); int instance = 5; boolean flat = true; // 初始化 生产者 Producer producer = new Producer(instance, basket,flat); new Thread(producer).start(); // 初始化 消费者 Consumer consumer = new Consumer(instance, basket, flat); new Thread(consumer).start(); } }