所谓生产者消费者模式,即N个线程进行生产,同时N个线程进行消费,两种角色经过内存缓冲区进行通讯
图片来源https://www.cnblogs.com/chent...html
下面咱们经过四种方式,来实现生产者消费者模式。java
首先是最原始的synchronized
方式api
定义库存类(即图中缓存区)缓存
class Stock { private String name; // 标记库存是否有内容 private boolean hasComputer = false; public synchronized void putOne(String name) { // 若库存中已有内容,则生产线程阻塞等待 while (hasComputer) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } this.name = name; System.out.println("生产者...生产了 " + name); // 更新标记 this.hasComputer = true; // 这里用notify的话,假设p0执行完毕,此时c0,c1都在wait, 同时唤醒另外一个provider:p1, // p1判断标记后休眠,形成全部线程都wait的局面,即死锁; // 所以使用notifyAll解决死锁问题 this.notifyAll(); } public synchronized void takeOne() { // 若库存中没有内容,则消费线程阻塞等待生产完毕后继续 while (!hasComputer) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("消费者...消费了 " + name); this.hasComputer = false; this.notifyAll(); } }
定义生产者和消费者(为了节省空间和方便阅读,这里将生产者和消费者定义成了匿名内部类)多线程
public static void main(String[] args) { // 用于通讯的库存类 Stock computer = new Stock(); // 定义两个生产者和两个消费者 Thread p1 = new Thread(new Runnable() { @Override public void run() { while (true) { computer.putOne("Dell"); } } }); Thread p2 = new Thread(new Runnable() { @Override public void run() { while (true) { computer.putOne("Mac"); } } }); Thread c1 = new Thread(new Runnable() { @Override public void run() { while (true) { computer.takeOne(); } } }); Thread c2 = new Thread(new Runnable() { @Override public void run() { while (true) { computer.takeOne(); } } }); p1.start(); p2.start(); c1.start(); c2.start(); }
运行结果图oracle
第二种方式:Lock
ide
Jdk1.5以后加入了Lock接口,一个lock对象能够有多个Condition类
,Condition类负责对lock对象进行wait,notify,notifyall
操做ui
定义库存类this
class LockStock { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); // 加入库存概念,可批量生产和消费 // 定义最大库存为10 final String[] stock = new String[10]; // 写入标记、读取标记、已有商品数量 int putptr, takeptr, count; public void put(String computer) { // lock代替synchronized lock.lock(); try { // 若库存已满则生产者线程阻塞 while (count == stock.length) notFull.await(); // 库存中加入商品 stock[putptr] = computer; // 库存已满,指针置零,方便下次从新写入 if (++putptr == stock.length) putptr = 0; ++count; System.out.println(computer + " 正在生产数据: -- 库存剩余:" + count); notEmpty.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } public String take(String consumerName) { lock.lock(); try { while (count == 0) notEmpty.await(); // 从库存中获取商品 String computer = stock[takeptr]; if (++takeptr == stock.length) takeptr = 0; --count; System.out.println(consumerName + " 正在消费数据:" + computer + " -- 库存剩余:" + count); notFull.signal(); return computer; } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } // 无逻辑做用,放慢速度 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return ""; } }
以上部分代码摘自java7 API中Condition接口的官方示例spa
接着仍是定义生产者和消费者
public static void main(String[] args) { LockStock computer = new LockStock(); Thread p1 = new Thread(new Runnable() { @Override public void run() { while (true) { computer.put("Dell"); } } }); Thread p2 = new Thread(new Runnable() { @Override public void run() { while (true) { computer.put("Mac"); } } }); Thread c1 = new Thread(new Runnable() { @Override public void run() { while (true) { computer.take("zhangsan"); } } }); Thread c2 = new Thread(new Runnable() { @Override public void run() { while (true) { computer.take("李四"); } } }); // 两个生产者两个消费者同时运行 p1.start(); p2.start(); c1.start(); c2.start(); }
运行结果图:
第三种方式:Semaphore
首先依旧是库存类:
class Stock { List<String> stock = new LinkedList(); // 互斥量,控制共享数据的互斥访问 private Semaphore mutex = new Semaphore(1); // canProduceCount能够生产的总数量。 经过生产者调用acquire,减小permit数目 private Semaphore canProduceCount = new Semaphore(10); // canConsumerCount能够消费的数量。经过生产者调用release,增长permit数目 private Semaphore canConsumerCount = new Semaphore(0); public void put(String computer) { try { // 可生产数量 -1 canProduceCount.acquire(); mutex.acquire(); // 生产一台电脑 stock.add(computer); System.out.println(computer + " 正在生产数据" + " -- 库存剩余:" + stock.size()); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 释放互斥锁 mutex.release(); // 释放canConsumerCount,增长能够消费的数量 canConsumerCount.release(); } // 无逻辑做用,放慢速度 try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } public void get(String consumerName) { try { // 可消费数量 -1 canConsumerCount.acquire(); mutex.acquire(); // 从库存消费一台电脑 String removedVal = stock.remove(0); System.out.println(consumerName + " 正在消费数据:" + removedVal + " -- 库存剩余:" + stock.size()); } catch (InterruptedException e) { e.printStackTrace(); } finally { mutex.release(); // 消费后释放canProduceCount,增长能够生产的数量 canProduceCount.release(); } } }
仍是生产消费者:
public class SemaphoreTest { public static void main(String[] args) { // 用于多线程操做的库存变量 final Stock stock = new Stock(); // 定义两个生产者和两个消费者 Thread dellProducer = new Thread(new Runnable() { @Override public void run() { while (true) { stock.put("Del"); } } }); Thread macProducer = new Thread(new Runnable() { @Override public void run() { while (true) { stock.put("Mac"); } } }); Thread consumer1 = new Thread(new Runnable() { @Override public void run() { while (true) { stock.get("zhangsan"); } } }); Thread consumer2 = new Thread(new Runnable() { @Override public void run() { while (true) { stock.get("李四"); } } }); dellProducer.start(); macProducer.start(); consumer1.start(); consumer2.start(); } }
运行结果图:
第四种方式:BlockingQueue
BlockingQueue的put和take底层实现其实也是使用了第二种方式中的ReentrantLock
+Condition
,而且帮咱们实现了库存队列,方便简洁
一、定义生产者
class Producer implements Runnable { // 库存队列 private BlockingQueue<String> stock; // 生产/消费延迟 private int timeOut; private String name; public Producer(BlockingQueue<String> stock, int timeout, String name) { this.stock = stock; this.timeOut = timeout; this.name = name; } @Override public void run() { while (true) { try { stock.put(name); System.out.println(name + " 正在生产数据" + " -- 库存剩余:" + stock.size()); TimeUnit.MILLISECONDS.sleep(timeOut); } catch (InterruptedException e) { e.printStackTrace(); } } } }
二、定义消费者
class Consumer implements Runnable { // 库存队列 private BlockingQueue<String> stock; private String consumerName; public Consumer(BlockingQueue<String> stock, String name) { this.stock = stock; this.consumerName = name; } @Override public void run() { while (true) { try { // 从库存消费一台电脑 String takeName = stock.take(); System.out.println(consumerName + " 正在消费数据:" + takeName + " -- 库存剩余:" + stock.size()); } catch (InterruptedException e) { e.printStackTrace(); } } } }
三、定义库存并运行
public static void main(String[] args) { // 定义最大库存为10 BlockingQueue<String> stock = new ArrayBlockingQueue<>(10); Thread p1 = new Thread(new Producer(stock, 500, "Mac")); Thread p2 = new Thread(new Producer(stock, 500, "Dell")); Thread c1 = new Thread(new Consumer(stock,"zhangsan")); Thread c2 = new Thread(new Consumer(stock, "李四")); p1.start(); p2.start(); c1.start(); c2.start(); }
运行结果图:
感谢阅读~欢迎指正和补充~~~