生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要做用是生成必定量的数据放到缓冲区中,而后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
.java
要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。一样,也可让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据以后,再唤醒消费者。一般采用进程间通讯的方法解决该问题。若是解决方法不够完善,则容易出现死锁的状况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒本身。该问题也能被推广到多个生产者和消费者的情形。
多线程
因为前两点缘由,所以须要保持线程间的同步,即一个线程消费(或生产)完,其余线程才能进行竞争CPU,得到消费(或生产)的机会。对于这一点,可使用条件变量进行线程间的同步:生产者线程在product以前,须要wait直至获取本身所需的信号量以后,才会进行product的操做;一样,对于消费者线程,在consume以前须要wait直到没有线程在访问共享区(缓冲区),再进行consume的操做,以后再解锁并唤醒其余可用阻塞线程。
性能
在访问共享区资源时,为避免多个线程同时访问资源形成混乱,须要对共享资源加锁,从而保证某一时刻只有一个线程在访问共享资源。测试
/**资源类**/ class Data { private int number = 0; /** * 判断等待、业务、通知 */ //+1 public synchronized void increment() throws InterruptedException { if (number != 0) { // 等待 this.wait(); } number++; System.out.println(Thread.currentThread().getName() + "=>" + number); // 通知其余线程,我+1完毕了 this.notifyAll(); } //-1 public synchronized void decrement() throws InterruptedException { if (number == 0) { //等待 this.wait(); } number--; System.out.println(Thread.currentThread().getName() + "=>" + number); // 通知其余线程,我-1完毕了 this.notifyAll(); } }
package com.xgp.pc; /** * 线程之间的通讯问题:生产者和消费者问题! 等待唤醒 通知 * 线程交替问题 A B 操做同一个变量 num = 0 * A num+1 * B num-1 * @author 薛国鹏 */ @SuppressWarnings("all") public class A { public static void main(String[] args) { Data data = new Data(); new Thread(() -> {for(int i = 0;i < 10;i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } },"A").start(); new Thread(() -> {for(int i = 0;i < 10;i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"B").start(); } }
A=>1 B=>0 A=>1 B=>0 A=>1 B=>0 A=>1 B=>0 A=>1 B=>0 A=>1 B=>0 A=>1 B=>0 A=>1 B=>0 A=>1 B=>0 A=>1 B=>0 进程完成,退出码 0
package com.xgp.pc; /** * 线程之间的通讯问题:生产者和消费者问题! 等待唤醒 通知 * 线程交替问题 A B 操做同一个变量 num = 0 * A num+1 * B num-1 * @author 薛国鹏 */ @SuppressWarnings("all") public class A { public static void main(String[] args) { Data data = new Data(); new Thread(() -> {for(int i = 0;i < 10;i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } },"A").start(); new Thread(() -> {for(int i = 0;i < 10;i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"B").start(); new Thread(() -> {for(int i = 0;i < 10;i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } },"C").start(); new Thread(() -> {for(int i = 0;i < 10;i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"D").start(); } }
A=>1 B=>0 A=>1 B=>0 C=>1 A=>2 D=>1 D=>0 A=>1 C=>2 B=>1 B=>0 C=>1 A=>2 D=>1 D=>0 A=>1 C=>2 B=>1 B=>0 C=>1 A=>2 D=>1 D=>0 A=>1 C=>2 B=>1 B=>0 C=>1 A=>2 D=>1 D=>0 A=>1 C=>2 B=>1 B=>0 C=>1 D=>0 C=>1 D=>0 进程完成,退出码 0
/**资源类**/ class Data { private int number = 0; /** * 判断等待、业务、通知 */ //+1 public synchronized void increment() throws InterruptedException { while (number != 0) { // 等待 this.wait(); } number++; System.out.println(Thread.currentThread().getName() + "=>" + number); // 通知其余线程,我+1完毕了 this.notifyAll(); } //-1 public synchronized void decrement() throws InterruptedException { while (number == 0) { //等待 this.wait(); } number--; System.out.println(Thread.currentThread().getName() + "=>" + number); // 通知其余线程,我-1完毕了 this.notifyAll(); } }
A=>1 B=>0 A=>1 B=>0 C=>1 B=>0 A=>1 D=>0 C=>1 B=>0 A=>1 D=>0 C=>1 B=>0 A=>1 D=>0 C=>1 B=>0 A=>1 D=>0 C=>1 B=>0 A=>1 D=>0 C=>1 B=>0 A=>1 D=>0 C=>1 B=>0 A=>1 D=>0 C=>1 B=>0 A=>1 D=>0 C=>1 D=>0 C=>1 D=>0 进程完成,退出码 0
package com.xgp.pc; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @SuppressWarnings("all") public class B { public static void main(String[] args) { Data2 data = new Data2(); new Thread(() -> {for(int i = 0;i < 10;i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } },"A").start(); new Thread(() -> {for(int i = 0;i < 10;i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"B").start(); new Thread(() -> {for(int i = 0;i < 10;i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } },"C").start(); new Thread(() -> {for(int i = 0;i < 10;i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"D").start(); } } /**资源类**/ class Data2 { private int number = 0; /** * 判断等待、业务、通知 */ Lock lock = new ReentrantLock(); //锁监视器(取代了对象监视器的使用) Condition condition = lock.newCondition(); //+1 public void increment() throws InterruptedException { lock.lock(); try { while (number != 0) { // 等待 condition.await(); //等待 } number++; System.out.println(Thread.currentThread().getName() + "=>" + number); // 通知其余线程,我+1完毕了 condition.signalAll(); }catch (Exception e) { e.printStackTrace(); }finally { lock.unlock(); } } //-1 public void decrement() throws InterruptedException { lock.lock(); try { while (number == 0) { //等待 condition.await(); } number--; System.out.println(Thread.currentThread().getName() + "=>" + number); // 通知其余线程,我-1完毕了 condition.signalAll(); }catch (Exception e) { e.printStackTrace(); }finally { lock.unlock(); } } }
A=>1 B=>0 A=>1 B=>0 A=>1 B=>0 A=>1 B=>0 A=>1 B=>0 A=>1 B=>0 A=>1 B=>0 A=>1 B=>0 A=>1 B=>0 A=>1 B=>0 C=>1 D=>0 C=>1 D=>0 C=>1 D=>0 C=>1 D=>0 C=>1 D=>0 C=>1 D=>0 C=>1 D=>0 C=>1 D=>0 C=>1 D=>0 C=>1 D=>0 进程完成,退出码 0
package com.xgp.pc; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @SuppressWarnings("all") public class C { public static void main(String[] args) { Data3 data = new Data3(); new Thread(() -> {for(int i = 0;i < 10;i++) { try { data.printA(); } catch (Exception e) { e.printStackTrace(); } } },"A").start(); new Thread(() -> {for(int i = 0;i < 10;i++) { try { data.printB(); } catch (Exception e) { e.printStackTrace(); } } },"B").start(); new Thread(() -> {for(int i = 0;i < 10;i++) { try { data.printC(); } catch (Exception e) { e.printStackTrace(); } } },"C").start(); } } class Data3 { private int number = 1; private Lock lock = new ReentrantLock(); //锁监视器(取代了对象监视器的使用) private Condition condition1 = lock.newCondition(); private Condition condition2 = lock.newCondition(); private Condition condition3 = lock.newCondition(); public void printA() { lock.lock(); try { while (number != 1) { condition1.await(); } System.out.println(Thread.currentThread().getName()); //唤醒指定的B number = 2; condition2.signal(); }catch (Exception e) { e.printStackTrace(); }finally { lock.unlock(); } } public void printB() { lock.lock(); try { while (number != 2) { condition2.await(); } System.out.println(Thread.currentThread().getName()); //唤醒指定的B number = 3; condition3.signal(); }catch (Exception e) { e.printStackTrace(); }finally { lock.unlock(); } } public void printC() { lock.lock(); try { while (number != 3) { condition3.await(); } System.out.println(Thread.currentThread().getName()); //唤醒指定的B number = 1; condition1.signal(); }catch (Exception e) { e.printStackTrace(); }finally { lock.unlock(); } } }
A B C A B C A B C A B C A B C A B C A B C A B C A B C A B C 进程完成,退出码 0