在讨论基于阻塞队列的生产者消费者模式以前咱们先搞清楚到底什么是生产者-消费者模式(producer-consumer模式)?html
好比有两个进程A和B,它们共享一个固定大小的缓冲区,A进程产生数据放入缓冲区,B进程从缓冲区中取出数据进行计算,那么这里其实就是一个生产者和消费者的模式,A至关于生产者,B至关于消费者java
在多线程开发中,若是生产者生产数据的速度很快,而消费者消费数据的速度很慢,那么生产者就必须等待消费者消费完了数据才可以继续生产数据,由于生产那么多也没有地方放啊;同理若是消费者的速度大于生产者那么消费者就会常常处理等待状态,因此为了达到生产者和消费者生产数据和消费数据之间的平衡,那么就须要一个缓冲区用来存储生产者生产的数据,因此就引入了生产者-消费者模式git
简单来讲这里的缓冲区的做用就是为了平衡生产者和消费者的处理能力,起到一个数据缓存的做用,同时也达到了一个解耦的做用github
生产者-消费者模式通常用于将生产数据的一方和消费数据的一方分割开来,将生产数据与消费数据的过程解耦开来web
Excutor任务执行框架:redis
消息中间件activeMQ:编程
任务的处理时间比较长的状况下:segmentfault
首先咱们从最简单的开始,假设只有一个生产者线程执行put操做,向缓冲区中添加数据,同时也只有一个消费者线程从缓冲区中取出数据windows
UML实体关系图,从UML类图中能够看出,咱们的producer和consumer类都持有一个对container对象的引用,这样的设计模式实际上在不少设计模式都有用到,好比咱们的装饰者模式等等,它们共同的目的都是为了达到解耦和复用的效果设计模式
在实现生产者-消费者模式以前咱们须要搞清两个问题:
1)容器中数据状态的一致性:当一个consumer执行了take()方法以后,此时容器为空,可是还没来得及更新容器的size,那么另一个consumer来了以后觉得size不等于0,那么继续执行take(),从而形成了了状态的不一致性
2)为了保证当容器里面没有数据的时候,消费者不会继续take,此时消费者释放锁,处于阻塞状态;而且一旦生产者添加了一条数据以后,此时从新唤醒消费者,消费者从新获取到容器的锁,继续执行take();
当容器里面满的时候,生产者也不会继续put, 此时生产者释放锁,处于阻塞状态;一旦消费者take了一条数据,此时应该唤醒生产者从新获取到容器的锁,继续put
因此对于该容器的任何访问都须要进行同步,也就是说在获取容器的数据以前,须要先获取到容器的锁。
而这里对于容器状态的同步能够参考以下几种方法:
要构建一个生产者消费者模式,那么首先就须要构建一个固定大小的缓冲区,而且该缓冲区具备可阻塞的put方法和take方法
接下来咱们采用第一种方法来实现该模型:使用Object的wait() / notify()方法实现生产者-消费者模型
ps:采用wait()/notify()方法的缺点是不能实现单生产者单消费者模式,由于要是用notify()就必须使用同步代码块
package test1; import java.util.LinkedList; public class Container { LinkedList<Integer> list = new LinkedList<Integer>(); int capacity = 10; public void put(int value){ while (true){ try { //sleep不能放在同步代码块里面,由于sleep不会释放锁, // 当前线程会一直占有produce线程,直到达到容量,调用wait()方法主动释放锁 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (this){ //当容器满的时候,producer处于等待状态 while (list.size() == capacity){ System.out.println("container is full,waiting ...."); try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //没有满,则继续produce System.out.println("producer--"+ Thread.currentThread().getName()+"--put:" + value); list.add(value++); //唤醒其余全部处于wait()的线程,包括消费者和生产者 notifyAll(); } } } public Integer take(){ Integer val = 0; while (true){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (this){ //若是容器中没有数据,consumer处于等待状态 while (list.size() == 0){ System.out.println("container is empty,waiting ..."); try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //若是有数据,继续consume val = list.removeFirst(); System.out.println("consumer--"+ Thread.currentThread().getName()+"--take:" + val); //唤醒其余全部处于wait()的线程,包括消费者和生产者 //notify必须放在同步代码块里面 notifyAll(); } } } }
ps:
这里须要注意的是sleep()不能放在synchronized代码块里面,由于咱们知道sleep()执行以后是不会释放锁的,也就是说当前线程仍然持有对container对象的互斥锁,这个时候当前线程继续判断list.size是否等于capacity,不等于就继续put,而后又sleep一会,而后又继续,直到当list.size == capacity,这个时候终于进入wait()方法,咱们知道wait()方法会释放锁,这个时候其余线程才有机会获取到container的互斥锁,
package test1; import test1.Container; import java.util.Random; public class Producer implements Runnable{ private Container container; public Producer(Container container) { this.container = container; } @Override public void run() { container.put(new Random().nextInt(100)); } }
package test1; import java.util.Random; public class Consumer implements Runnable{ private Container container; public Consumer(Container container) { this.container = container; } @Override public void run() { Integer val = container.take(); } }
package test1; import test1.Consumer; import test1.Container; import test1.Producer; public class Main { public static void main(String[] args){ Container container = new Container(); Thread producer1 = new Thread(new Producer(container)); Thread producer2 = new Thread(new Producer(container)); Thread producer3 = new Thread(new Producer(container)); Thread producer4 = new Thread(new Producer(container)); producer1.start(); producer2.start(); producer3.start(); producer4.start(); Thread consumer1 = new Thread(new Consumer(container)); Thread consumer2 = new Thread(new Consumer(container)); Thread consumer3 = new Thread(new Consumer(container)); Thread consumer4 = new Thread(new Consumer(container)); Thread consumer5 = new Thread(new Consumer(container)); Thread consumer6 = new Thread(new Consumer(container)); consumer1.start(); consumer2.start(); consumer3.start(); consumer4.start(); consumer5.start(); consumer6.start(); } }
运行结果
producer--Thread-1--put:80 producer--Thread-2--put:19 producer--Thread-3--put:8 producer--Thread-0--put:74 consumer--Thread-8--take:80 consumer--Thread-4--take:19 consumer--Thread-6--take:8 consumer--Thread-9--take:74 container is empty,waiting ... container is empty,waiting ... producer--Thread-2--put:20 consumer--Thread-7--take:20 container is empty,waiting ... producer--Thread-3--put:9 producer--Thread-1--put:81 producer--Thread-0--put:75 consumer--Thread-5--take:9 consumer--Thread-6--take:81 consumer--Thread-8--take:75 container is empty,waiting ... container is empty,waiting ... container is empty,waiting ...
生产者消费者模型中的共享资源是一个固定大小的缓冲区,该模式须要当缓冲区满的时候,生产者再也不生产数据,直到消费者消费了一个数据以后,才继续生产;同理当缓冲区空的时候,消费者再也不消费数据,直到生产者生产了一个数据以后,才继续消费
若是要经过信号量来解决这个问题:关键在于找到可以跟踪缓冲区的size大小变化,并根据缓冲区的数量变化来控制消费者和生产者线程之间的协做和运行
那么很容易很够想到用两个信号量:empytyCount和fullCount分别来表示缓冲区满或者空的状态,进而可以更加容易控制消费者和生产者到底何时处于阻塞状态,何时处于运行状态
同时为了使得程序更加具备健壮性,咱们还添加一个二进制信号量useQueue,确保队列的状态的完整性不受损害。例如当两个生产者同时向空队列添加数据时,从而破坏了队列内部的状态,使得其余计数信号量或者返回的缓冲区的size大小不具备一致性。(固然这里也可使用mutex来代替二进制信号量)
produce: P(emptyCount)//信号量emptyCount减一 P(useQueue)//二值信号量useQueue减一,变为0(其余线程不能进入缓冲区,阻塞状态) putItemIntoQueue(item)//执行put操做 V(useQueue)//二值信号量useQueue加一,变为1(其余线程能够进入缓冲区) V(fullCount)//信号量fullCount加一
consume: P(fullCount)//fullCount -= 1 P(useQueue)//useQueue -= 1(useQueue = 0) item ← getItemFromQueue() V(useQueue)//useQueue += 1 (useQueue = 1) V(emptyCount)//emptyCount += 1
ps: 这里的两个PV操做是否能够颠倒
首先生产者获取到信号量emptyCount,执行P(emptyCount),确保emptyCount不等于0,也就是还有空间添加数据,从而才可以进入临界区container
而后执行put操做,执行put操做以前须要为缓冲区加把锁,防止在put的过程当中,其余线程对缓冲区进行修改,因此这个时候须要获取另一个信号量useQueue
相反,若是先执行了 P(useQueue),而且此时的emptyCount = 0,那么生产者就会一直阻塞,直到消费者消费了一个数据;可是此时消费者又没法获取到互斥信号量useQueue,也会一直阻塞,因此就造成了一个死锁
因此这两个p操做是不能交换顺序的,信号量emptyCount是useQueue的基础和前提条件
此时若是生产者已经执行完put操做,那么能够先释放互斥信号量,再执行 V(fullCount);或者先执行 V(fullCount)再释放互斥信号量都没有关系。不会对其余的生产者消费者的状态产生影响;可是最好的仍是先释放互斥锁,再执行V(fullCount),这样能够保证当容器满的时候,消费者可以及时的获取到互斥锁
Container
package test3; import java.util.LinkedList; import java.util.List; import java.util.concurrent.Semaphore; public class Container { Semaphore fullCount = new Semaphore(0); Semaphore emptyCount = new Semaphore(10); Semaphore isUse = new Semaphore(1); List list = new LinkedList<Integer>(); public void put(Integer val){ try { emptyCount.acquire(); isUse.acquire(); list.add(val); System.out.println("producer--"+ Thread.currentThread().getName()+"--put:" + val+"===size:"+list.size()); } catch (InterruptedException e) { e.printStackTrace(); }finally { isUse.release(); fullCount.release(); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } public Integer get(){ Integer val1 = 0; try { fullCount.acquire(); isUse.acquire(); val1 = (Integer) list.remove(0); System.out.println("consumer--"+ Thread.currentThread().getName()+"--take:" + val1+"===size:"+list.size()); } catch (InterruptedException e) { e.printStackTrace(); }finally { isUse.release(); emptyCount.release(); } return val1; } }
生产者
package test3; import java.util.Random; public class Producer implements Runnable{ private Container container; public Producer(Container container) { this.container = container; } @Override public void run() { while (true){ container.put(new Random().nextInt(100)); } } }
消费者
package test3; public class Consumer implements Runnable{ private Container container; public Consumer(Container container) { this.container = container; } @Override public void run() { while (true){ Integer val = container.get(); } } }
测试
package test3; public class Test { public static void main(String[] args){ Container container = new Container(); Thread producer1 = new Thread(new Producer(container)); Thread producer2 = new Thread(new Producer(container)); Thread producer3 = new Thread(new Producer(container)); Thread consumer1 = new Thread(new Consumer(container)); Thread consumer2 = new Thread(new Consumer(container)); Thread consumer3 = new Thread(new Consumer(container)); Thread consumer4 = new Thread(new Consumer(container)); producer1.start(); producer2.start(); producer3.start(); consumer1.start(); consumer2.start(); consumer3.start(); consumer4.start(); } }
producer--Thread-0--put:74===size:1 producer--Thread-4--put:16===size:2 producer--Thread-2--put:51===size:3 producer--Thread-1--put:77===size:4 producer--Thread-3--put:93===size:5 consumer--Thread-6--take:74===size:4 consumer--Thread-6--take:16===size:3 consumer--Thread-6--take:51===size:2 consumer--Thread-6--take:77===size:1 consumer--Thread-5--take:93===size:0 producer--Thread-4--put:19===size:1 producer--Thread-3--put:68===size:2 producer--Thread-0--put:72===size:3 consumer--Thread-6--take:19===size:2 consumer--Thread-6--take:68===size:1 consumer--Thread-5--take:72===size:0 producer--Thread-1--put:82===size:1 producer--Thread-2--put:32===size:2 consumer--Thread-5--take:82===size:1
因为这里的缓冲区由BlockingQueue容器代替,那么这里咱们就不须要从新建立一个容器类了,直接建立生产者类和消费者类,而且一样的都须要拥有一个容器类BlockingQueue的实例应用
package test; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; public class Producer implements Runnable{ private ArrayBlockingQueue<Integer> queue ; public Producer(ArrayBlockingQueue<Integer> queue) { this.queue = queue; } @Override public void run() { Random random = new Random(); while (true){ try { Thread.sleep(100); if(queue.size() == 10) System.out.println("================the queue is full,the producer thread is waiting.................."); int item = random.nextInt(100); queue.put(item); System.out.println("producer:" + Thread.currentThread().getName() + " produce:" + item+";the size of the queue:" + queue.size()); } catch (InterruptedException e) { e.printStackTrace(); } } } }
package test; import java.util.concurrent.ArrayBlockingQueue; public class Consumer implements Runnable { private ArrayBlockingQueue<Integer> queue; public Consumer(ArrayBlockingQueue<Integer> queue) { this.queue = queue; } @Override public void run() { while (true){ try { Thread.sleep(100); if(queue.size() == 0) System.out.println("=============the queue is empty,the consumer thread is waiting................"); Integer item = queue.take(); System.out.println("consumer:" + Thread.currentThread().getName() + " consume:" + item+";the size of the queue:" + queue.size()); } catch (InterruptedException e) { e.printStackTrace(); } } } }
package test; import java.util.concurrent.ArrayBlockingQueue; public class Test { public static void main(String[] args){ ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10); Thread producer1 = new Thread(new Producer(queue)); Thread producer2 = new Thread(new Producer(queue)); Thread producer3 = new Thread(new Producer(queue)); Thread producer4 = new Thread(new Producer(queue)); Thread producer5 = new Thread(new Producer(queue)); producer1.start(); producer2.start(); producer3.start(); producer4.start(); producer5.start(); Thread consumer1 = new Thread(new Consumer(queue)); Thread consumer2 = new Thread(new Consumer(queue)); consumer1.start(); consumer2.start(); try { producer1.join(); producer2.join(); producer3.join(); producer4.join(); producer5.join(); consumer1.join(); consumer2.join(); } catch (InterruptedException e) { e.printStackTrace(); } } }
=============the queue is empty,the consumer thread is waiting................ consumer:Thread-5 consume:64;the size of the queue:0 producer:Thread-3 produce:64;the size of the queue:1 consumer:Thread-6 consume:87;the size of the queue:0 producer:Thread-1 produce:1;the size of the queue:3 producer:Thread-4 produce:87;the size of the queue:2 producer:Thread-2 produce:71;the size of the queue:2 producer:Thread-0 produce:76;the size of the queue:1 consumer:Thread-6 consume:71;the size of the queue:2 producer:Thread-1 produce:26;the size of the queue:6 producer:Thread-3 produce:6;the size of the queue:6 producer:Thread-0 produce:76;the size of the queue:5 producer:Thread-2 produce:37;the size of the queue:6
在用Lock和Condition的await()/signal()方法实现生产者消费者以前,咱们先来了解一下Lock和synchronized都是基于锁有哪些区别,以及Condition的await()/signal()方法和Object的wait()/notify()方法都是等待和唤醒又有哪些区别
锁机制 | Lock | synchronized |
---|---|---|
所属层次 | java.util.concurrent package中的一个接口 | 是一个关键字,JVM内置的语言实现 |
释放锁与加锁 | 经过lock()/unlock()进行手动释放与加锁 | 不须要,进入synchronized同步代码块就自动获取锁,退出同步代码块自动释放锁 |
设置超时时间 | trylock(timeout) | 没有超时时间,线程会一直阻塞,直到获取锁 |
公平机制 | 设置true,为公平锁,等待时间最长的先获取 | 没有 |
阻塞线程列表 | 能够查看正处于等待状态的线程列表 | 不能够 |
遇到异常时释放 | 当遇到异常时在finally中执行unlock() | 遇到异常时释放锁 |
底层实现 | 乐观锁方式(cas),每次不加锁而是假设没有冲突而去完成某项操做 | CPU悲观锁机制,即线程得到的是独占锁,只能依靠阻塞来等待线程释放锁 |
具体唤醒某一个线程 | ReentrantLock里面的Condition应用,可以控制signal哪一个线程 | 不能控制具体notify哪一个线程,notifyall()唤醒全部线程 |
灵活性 | 比synchronized更加灵活 | 不是那么灵活 |
响应中断 | 等待的线程能够响应中断 | 不能响应中断 |
应用场景 | 资源竞争激烈的状况下,是synchronized的几十倍 | 资源竞争不激烈时,优于Lock |
方法 | Condition | Object |
---|---|---|
阻塞等待 | await() | wait() |
唤醒其余线程 | signal() | notify()/notifyall() |
使用的锁 | 互斥锁/共享锁,如Lock | 同步锁:如synchronized |
一个锁对应 | 能够建立多个condition | 对应一个Object |
唤醒指定的线程 | 明确的指定线程 | 只能经过notifyAll唤醒全部线程;或者notify()随机唤醒 |
该实现方式相比较synchronized于object的wait()/notify()方法具备更加的灵活性,能够唤醒具体的消费者线程或者生产者线程,达到当缓冲区满的时候,唤醒消费者线程,此时生产者线程都将被阻塞,而不是向notifyall()那样唤醒全部的线程。
package test8; import java.util.LinkedList; import java.util.List; import java.util.Vector; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Container{ private final Lock lock = new ReentrantLock(); //表示生产者线程 private final Condition notFull = lock.newCondition(); //表示消费者线程 private final Condition notEmpty = lock.newCondition(); private int capacity; private List<Integer> list = new LinkedList<>(); public Container(int capacity) { this.capacity = capacity; } public Integer take(){ lock.lock(); try { while (list.size() == 0) try { System.out.println("the list is empty........"); notEmpty.await();//阻塞消费者线程 } catch (InterruptedException e) { e.printStackTrace(); } Integer val = list.remove(0); System.out.println("consumer--"+ Thread.currentThread().getName()+"--take:" + val+"===size:"+list.size()); notFull.signalAll();//唤醒全部生产者线程 return val; }finally { lock.unlock(); } } public void put(Integer val){ lock.lock(); try { while (list.size() == capacity){ try { System.out.println("the list is full........"); notFull.await();//阻塞生产者线程 } catch (InterruptedException e) { e.printStackTrace(); } } list.add(val); System.out.println("producer--"+ Thread.currentThread().getName()+"--put:" + val+"===size:"+ list.size()); notEmpty.signalAll();//唤醒全部消费者线程 }finally { lock.unlock(); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
package test8; import java.util.Random; import java.util.TreeMap; import java.util.concurrent.locks.Condition; public class Producer implements Runnable { private Container container; public Producer(Container container) { this.container = container; } @Override public void run() { while (true){ container.put(new Random().nextInt(100)); } } }
package test8; public class Consumer implements Runnable { private Container container; public Consumer(Container container) { this.container = container; } @Override public void run() { while (true){ Integer val = container.take(); } } }
package test8; public class Test { public static void main(String[] args){ Container container = new Container(5); Thread producer1 = new Thread(new Producer(container)); Thread producer2 = new Thread(new Producer(container)); Thread producer3 = new Thread(new Producer(container)); Thread producer4 = new Thread(new Producer(container)); Thread producer5 = new Thread(new Producer(container)); Thread consumer1 = new Thread(new Consumer(container)); Thread consumer2 = new Thread(new Consumer(container)); producer1.start(); producer2.start(); producer3.start(); producer4.start(); producer5.start(); consumer1.start(); consumer2.start(); } }
the list is empty........ producer--Thread-3--put:77===size:1 consumer--Thread-6--take:77===size:0 the list is empty........ producer--Thread-4--put:55===size:1 producer--Thread-0--put:62===size:2 producer--Thread-1--put:90===size:3 producer--Thread-2--put:57===size:4 consumer--Thread-5--take:55===size:3 consumer--Thread-5--take:62===size:2 consumer--Thread-5--take:90===size:1 consumer--Thread-5--take:57===size:0 the list is empty........ the list is empty........ producer--Thread-0--put:10===size:1 producer--Thread-1--put:21===size:2 producer--Thread-3--put:3===size:3 producer--Thread-4--put:75===size:4 producer--Thread-2--put:94===size:5 consumer--Thread-5--take:10===size:4
对于单生产者单消费者,只用保证缓冲区满的时候,生产者不会继续向缓冲区放数据,缓冲区空的时候,消费者不会继续从缓冲区取数据,而不存在同时有两个生产者使用缓冲区资源,形成数据不一致的状态。
因此对于单生产者单消费者,若是采用信号量模型来实现的话,那么只须要两个信号量:empytyCount和fullCount分别来表示缓冲区满或者空的状态,进而可以更加容易控制消费者和生产者到底何时处于阻塞状态,何时处于运行状态; 而不须要使用互斥信号量了
produce: P(emptyCount)//信号量emptyCount减一 putItemIntoQueue(item)//执行put操做 V(fullCount)//信号量fullCount加一
consume: P(fullCount)//fullCount -= 1 item ← getItemFromQueue() V(emptyCount)//emptyCount += 1
实现
缓冲区容器类
package test9; import java.time.temporal.ValueRange; import java.util.LinkedList; import java.util.List; import java.util.concurrent.Semaphore; public class Container_spsc { Semaphore emptyCount = new Semaphore(10); Semaphore fullCount = new Semaphore(0); List<Integer> list = new LinkedList<Integer>(); public void put(int val){ try { emptyCount.acquire(); list.add(val); System.out.println("producer--"+ Thread.currentThread().getName()+"--put:" + val+"===size:"+list.size()); } catch (InterruptedException e) { e.printStackTrace(); }finally { fullCount.release(); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } public Integer take(){ Integer val = 0; try { fullCount.acquire(); val = list.remove(0); System.out.println("consumer--"+ Thread.currentThread().getName()+"--take:" + val+"===size:"+list.size()); } catch (InterruptedException e) { e.printStackTrace(); }finally { emptyCount.release(); } return val; } }
生产者
package test9; import test8.Container; import java.util.Random; public class Producer implements Runnable { private Container_spsc container; public Producer(Container_spsc container) { this.container = container; } @Override public void run() { while (true){ container.put(new Random().nextInt(100)); } } }
消费者类
package test9; import test8.Container; public class Consumer implements Runnable { private Container_spsc container; public Consumer(Container_spsc container) { this.container = container; } @Override public void run() { while (true){ Integer take = container.take(); } } }
测试
package test9; public class Test { public static void main(String[] args){ Container_spsc container = new Container_spsc(); Thread producer = new Thread(new Producer(container)); Thread consumer = new Thread(new Consumer(container)); producer.start(); consumer.start(); } }
producer--Thread-0--put:62===size:1 consumer--Thread-1--take:62===size:0 producer--Thread-0--put:40===size:1 consumer--Thread-1--take:40===size:0 producer--Thread-0--put:86===size:1 consumer--Thread-1--take:86===size:0 producer--Thread-0--put:15===size:1 consumer--Thread-1--take:15===size:0 producer--Thread-0--put:83===size:1 consumer--Thread-1--take:83===size:0 producer--Thread-0--put:13===size:1 consumer--Thread-1--take:13===size:0
对于多生产者单消费者来讲,多生产者之间具备互斥关系,因此这里须要一个互斥锁来实现缓冲区的互斥访问,那么具体的实现方式就是在单生产者单消费者的基础之上,加一个互斥信号量useQueue
若是采用信号量来实现的话能够以下:
produce: P(emptyCount)//信号量emptyCount减一 P(useQueue)//二值信号量useQueue减一,变为0(其余线程不能进入缓冲区,阻塞状态) putItemIntoQueue(item)//执行put操做 V(useQueue)//二值信号量useQueue加一,变为1(其余线程能够进入缓冲区) V(fullCount)//信号量fullCount加一
consume: P(fullCount)//fullCount -= 1 item ← getItemFromQueue() V(emptyCount)//emptyCount += 1
具体的实现和单生产者单消费者差很少,只不过在生产者类里面多加了一个互斥信号量useQueue
对于单生产者多消费者同多生产者多消费者
produce: P(emptyCount)//信号量emptyCount减一 putItemIntoQueue(item)//执行put操做 V(fullCount)//信号量fullCount加一
consume: P(fullCount)//fullCount -= 1 P(useQueue)//二值信号量useQueue减一,变为0(其余线程不能进入缓冲区,阻塞状态) item ← getItemFromQueue() V(useQueue)//二值信号量useQueue加一,变为1(其余线程能够进入缓冲区) V(emptyCount)//emptyCount += 1
具体的实现和单生产者单消费者差很少,只不过在消费者类里面多加了一个互斥信号量useQueue
对于多生产者多消费者问题,是一个同步+互斥问题,不只须要生产者和消费者之间的同步协做,还须要实现对缓冲区资源的互斥访问;这个能够参考前面对生产者消费者4种实现方式
采用信号量
produce: P(emptyCount)//信号量emptyCount减一 P(useQueue)//二值信号量useQueue减一,变为0(其余线程不能进入缓冲区,阻塞状态) putItemIntoQueue(item)//执行put操做 V(useQueue)//二值信号量useQueue加一,变为1(其余线程能够进入缓冲区) V(fullCount)//信号量fullCount加一
consume: P(fullCount)//fullCount -= 1 P(useQueue)//二值信号量useQueue减一,变为0(其余线程不能进入缓冲区,阻塞状态) item ← getItemFromQueue() V(useQueue)//二值信号量useQueue加一,变为1(其余线程能够进入缓冲区) V(emptyCount)//emptyCount += 1
用一个缓冲区,生产者和消费者须要先获取到缓冲区的锁才能进行put和take操做,每一次put和take都须要获取一次锁,这须要大量的同步与互斥操做,十分损耗性能。
因此若是采用双缓冲区的话,一个缓冲区bufferA用于生产者执行put操做,一个缓冲区bufferB用于消费者执行take操做;生产者线程和消费者线程在使用各自的缓冲区以前都须要先获取到缓冲区对应的锁,才能进行操做;
生产者和消费者各自使用本身独立的缓冲区,那么就不存在同一个缓冲区被put的同时进行take操做
因此一旦生产者和消费者一旦获取到了对应缓冲区的锁,那么每一次执行put/take操做时就不用再次从新获取锁了,从而减小了不少获取锁、释放锁的性能开销
若是bufferA被put满了,那么生产者释放bufferA的锁,并等待消费者释放bufferB的锁;当bufferB被take空了,消费者释放bufferB的锁,此时生产者获取到bufferB的锁,对bufferB进行put;消费者获取到bufferA的锁,对bufferA进行take,那么就完成了一次缓冲区的切换
双缓冲区的状态
bufferA和bufferB都处于工做状态,一个读一个写
假设bufferA已经满了,那么生产者就会释放bufferA的锁,尝试获取bufferB,而此时bufferB还在执行take操做,消费者还没释放bufferB的锁,那么生产者进入等待状态
当bufferB为空,那么此时消费者释放bufferB的锁,尝试获取bufferA的锁,此时消费者被唤醒,从新尝试获取bufferB的锁
若是操做完当前的缓冲区以后,先获取另一个缓冲区的锁,再释放当前缓冲区的锁,就会发生死锁问题。若是bufferA和bufferB的线程同时尝试获取对方的锁,那么就会一直循环等待下去
因为双缓冲区是为了不每次读写的时候不用进行同步与互斥操做,因此对于一些原本就是线程安全的类例如arrayblockingqueue就不适合做为双缓冲区,由于他们内部已经实现了每次读写操做的时候进行加锁和释放
应用场景:
多个缓冲区构成一个缓冲池,一样须要两个同步信号量emtpyCount和fullCount,还有一个互斥信号量useQueue,同时还须要两个变量指示哪些是空缓冲区哪些是有数据的缓冲区,多缓冲区和双缓冲区同样,一样是以空间换时间,减小单个读写操做的同步与互斥操做,对于同一个缓冲区而言,不可能同时会put和take
讨论为何要引入环形缓冲区,其实也就是在讨论队列缓冲区有什么弊端,而环形缓冲区是如何解决这种弊端的=
那么咱们先认识一下什么是环形缓冲区
队列缓冲区
了解了如何使用Java经过简单的synchronized与object的wait()/notify()、Lock与Condition的await()/signal()方法、BlockingQueue、信号量semaphore四种方法来实现生产者消费者模型,之后有机会咱们在研究研究Linux和windows分别又是如何实现生产者消费者模型的
《Java并发编程实践》
实现生产者消费者模式的四种方式(Synchronized、Lock、Semaphore、BlockingQueue)
https://blog.csdn.net/luohuac...
https://www.geeksforgeeks.org...
https://www.geeksforgeeks.org...
https://blog.csdn.net/chencha...
https://www.cnblogs.com/Wante...
https://blog.csdn.net/u012403...
https://www.geeksforgeeks.org...
https://blog.csdn.net/woailuo...
https://blog.csdn.net/liuxiao...
https://program-think.blogspo...
https://zhuanlan.zhihu.com/p/...
欢迎关注个人公众号:小秋的博客,天天进步一点点