生产者-消费者(producer-consumer)问题,也称做有界缓冲区(bounded-buffer)问题,两个进程共享一个公共的固定大小的缓冲区。
其中一个是生产者,用于将消息放入缓冲区;另一个是消费者,用于从缓冲区中取出消息。
问题出如今当缓冲区已经满了,而此时生产者还想向其中放入一个新的数据项的情形,其解决方法是让生产者此时进行休眠,等待消费者从缓冲区中取走了一个或者多个数据后再去唤醒它。
一样地,当缓冲区已经空了,而消费者还想去取消息,此时也可让消费者进行休眠,等待生产者放入一个或者多个数据时再唤醒它。java
Condition 将 Object 监视器方法(wait、notify 和 notifyAll)分解成大相径庭的对象,以便经过将这些对象与任意 Lock 实现组合使用,为每一个对象提供多个等待 set (wait-set)。缓存
其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。多线程
在Condition中,用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll(),传统线程的通讯方式,Condition均可以实现,这里注意,Condition是被绑定到Lock上的,要建立一个Lock的Condition必须用newCondition()方法。app
这样看来,Condition和传统的线程通讯没什么区别,Condition的强大之处在于它能够为多个线程间创建不一样的Condition,下面引入API中的一段代码,加以说明。ide
代码:spa
class BoundedBuffer { final Lock lock = new ReentrantLock();//锁对象 final Condition notFull = lock.newCondition();//写线程条件 final Condition notEmpty = lock.newCondition();//读线程条件 final Object[] items = new Object[100];//缓存队列 int putptr/*写索引*/, takeptr/*读索引*/, count/*队列中存在的数据个数*/; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length)//若是队列满了 notFull.await();//阻塞写线程 items[putptr] = x;//赋值 if (++putptr == items.length) putptr = 0;//若是写索引写到队列的最后一个位置了,那么置为0 ++count;//个数++ notEmpty.signal();//唤醒读线程 } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0)//若是队列为空 notEmpty.await();//阻塞读线程 Object x = items[takeptr];//取值 if (++takeptr == items.length) takeptr = 0;//若是读索引读到队列的最后一个位置了,那么置为0 --count;//个数-- notFull.signal();//唤醒写线程 return x; } finally { lock.unlock(); } } }
这是一个处于多线程工做环境下的缓存区,缓存区提供了两个方法,put和take,put是存数据,take是取数据,内部有个缓存队列,具体变量和方法说明见代码,.net
这个缓存区类实现的功能:有多个线程往里面存数据和从里面取数据,其缓存队列(先进先出后进后出)能缓存的最大数值是100,多个线程间是互斥的,线程
当缓存队列中存储的值达到100时,将写线程阻塞,并唤醒读线程,当缓存队列中存储的值为0时,将读线程阻塞,并唤醒写线程,这也是ArrayBlockingQueue的内部实现。rest
下面分析一下代码的执行过程:code
1. 一个写线程执行,调用put方法;
2. 判断count是否为100,显然没有100;
3. 继续执行,存入值;
4. 判断当前写入的索引位置++后,是否和100相等,相等将写入索引值变为0,并将count+1;
5. 仅唤醒读线程阻塞队列中的一个;
6. 一个读线程执行,调用take方法;
7. ……
8. 仅唤醒写线程阻塞队列中的一个。
这就是多个Condition的强大之处,假设缓存队列中已经存满,那么阻塞的确定是写线程,唤醒的确定是读线程,相反,阻塞的确定是读线程,唤醒的确定是写线程,那么假设只有一个Condition会有什么效果呢,缓存队列中已经存满,这个Lock不知道唤醒的是读线程仍是写线程了,若是唤醒的是读线程,皆大欢喜,若是唤醒的是写线程,那么线程刚被唤醒,又被阻塞了,这时又去唤醒,这样就浪费了不少时间。
Condition的应用:
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Main { static class NumberWrapper { public int value = 1; } public static void main(String[] args) { //初始化可重入锁 final Lock lock = new ReentrantLock(); //第一个条件当屏幕上输出到3 final Condition reachThreeCondition = lock.newCondition(); //第二个条件当屏幕上输出到6 final Condition reachSixCondition = lock.newCondition(); //NumberWrapper只是为了封装一个数字,一边能够将数字对象共享,并能够设置为final //注意这里不要用Integer, Integer 是不可变对象 final NumberWrapper num = new NumberWrapper(); //初始化A线程 Thread threadA = new Thread(new Runnable() { @Override public void run() { //须要先得到锁 lock.lock(); try { System.out.println("threadA start write"); //A线程先输出前3个数 while (num.value <= 3) { System.out.println(num.value); num.value++; } //输出到3时要signal,告诉B线程能够开始了 reachThreeCondition.signal(); } finally { lock.unlock(); } lock.lock(); try { //等待输出6的条件 reachSixCondition.await(); System.out.println("threadA start write"); //输出剩余数字 while (num.value <= 9) { System.out.println(num.value); num.value++; } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }); Thread threadB = new Thread(new Runnable() { @Override public void run() { try { lock.lock(); while (num.value <= 3) { //等待3输出完毕的信号 reachThreeCondition.await(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } try { lock.lock(); //已经收到信号,开始输出4,5,6 System.out.println("threadB start write"); while (num.value <= 6) { System.out.println(num.value); num.value++; } //4,5,6输出完毕,告诉A线程6输出完了 reachSixCondition.signal(); } finally { lock.unlock(); } } }); //启动两个线程 threadB.start(); threadA.start(); } }
threadA start write 1 2 3 threadB start write 4 5 6 threadA start write 7 8 9
基本思路就是首先要A线程先写1,2,3,这时候B线程应该等待reachThredCondition信号,而当A线程写完3以后就经过signal告诉B线程“我写到3了,该你了”,
这时候A线程要等reachSixCondition信号,同时B线程获得通知,开始写4,5,6,写完4,5,6以后B线程通知A线程reachSixCondition条件成立了,这时候A线程就开始写剩下的7,8,9了。
Java官方提供的例子:
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Main { public static void main(String[] args) { final BoundedBuffer boundedBuffer = new BoundedBuffer(); Thread t1 = new Thread(new Runnable() { @Override public void run() { System.out.println("t1 run"); for (int i=0;i<20;i++) { try { System.out.println("putting.."); boundedBuffer.put(Integer.valueOf(i)); } catch (InterruptedException e) { e.printStackTrace(); } } } }) ; Thread t2 = new Thread(new Runnable() { @Override public void run() { for (int i=0;i<20;i++) { try { Object val = boundedBuffer.take(); System.out.println(val); } catch (InterruptedException e) { e.printStackTrace(); } } } }) ; t1.start(); t2.start(); } /** * BoundedBuffer 是一个定长100的集合,当集合中没有元素时,take方法须要等待,直到有元素时才返回元素 * 当其中的元素数达到最大值时,要等待直到元素被take以后才执行put的操做 * @author yukaizhao * */ static class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { System .out.println("put wait lock"); lock.lock(); System.out.println("put get lock"); try { while (count == items.length) { System.out.println("buffer full, please wait"); notFull.await(); } items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { System.out.println("take wait lock"); lock.lock(); System.out.println("take get lock"); try { while (count == 0) { System.out.println("no elements, please wait"); notEmpty.await(); } Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } } } }
t1 run putting.. take wait lock take get lock no elements, please wait put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock 0 take wait lock take get lock 1 take wait lock take get lock 2 take wait lock take get lock 3 take wait lock take get lock 4 take wait lock take get lock 5 take wait lock take get lock 6 take wait lock take get lock 7 take wait lock take get lock 8 take wait lock take get lock 9 take wait lock take get lock 10 take wait lock take get lock 11 take wait lock take get lock 12 take wait lock take get lock 13 take wait lock take get lock 14 take wait lock take get lock 15 take wait lock take get lock 16 take wait lock take get lock 17 take wait lock take get lock 18 take wait lock take get lock 19
也能够修改t1,t2线程里面的循环,改为大于100,可能会碰到集合已满的状况。
这个示例中BoundedBuffer是一个固定长度的集合,
这个在其put操做时,若是发现长度已经达到最大长度,那么要等待notFull信号才能继续put,若是获得notFull信号会像集合中添加元素,而且put操做会发出notEmpty的信号,
而在其take方法中若是发现集合长度为空,那么会等待notEmpty的信号,接受到notEmpty信号才能继续take,同时若是拿到一个元素,那么会发出notFull的信号。
Condition与Object中的wati,notify,notifyAll区别:
1.Condition中的await()方法至关于Object的wait()方法,Condition中的signal()方法至关于Object的notify()方法,Condition中的signalAll()至关于Object的notifyAll()方法。
不一样的是,Object中的这些方法是和同步锁捆绑使用的;而Condition是须要与互斥锁/共享锁捆绑使用的。
2.Condition它更强大的地方在于:可以更加精细的控制多线程的休眠与唤醒。对于同一个锁,咱们能够建立多个Condition,在不一样的状况下使用不一样的Condition。
例如,假如多线程读/写同一个缓冲区:当向缓冲区中写入数据以后,唤醒"读线程";当从缓冲区读出数据以后,唤醒"写线程";而且当缓冲区满的时候,"写线程"须要等待;当缓冲区为空时,"读线程"须要等待。
若是采用Object类中的wait(),notify(),notifyAll()实现该缓冲区,当向缓冲区写入数据以后须要唤醒"读线程"时,不可能经过notify()或notifyAll()明确的指定唤醒"读线程",而只能经过notifyAll唤醒全部线程(可是notifyAll没法区分唤醒的线程是读线程,仍是写线程)。 可是,经过Condition,就能明确的指定唤醒读线程。
http://outofmemory.cn/java/java.util.concurrent/lock-reentrantlock-condition
http://blog.csdn.net/ghsau/article/details/7481142