生产者消费者问题是研究多线程程序时绕不开的经典问题之一,它描述是有一块缓冲区做为仓库,生产者能够将产品放入仓库,消费者则能够从仓库中取走产品。解决生产者/消费者问题的方法可分为两类:(1)采用某种机制保护生产者和消费者之间的同步;(2)在生产者和消费者之间创建一个管道。第一种方式有较高的效率,而且易于实现,代码的可控制性较好,属于经常使用的模式。第二种管道缓冲区不易控制,被传输数据对象不易于封装等,实用性不强。java
同步问题核心在于:如何保证同一资源被多个线程并发访问时的完整性。经常使用的同步方法是采用信号或加锁机制,保证资源在任意时刻至多被一个线程访问。Java语言在多线程编程上实现了彻底对象化,提供了对同步机制的良好支持。在Java中一共有五种方法支持同步,其中前四个是同步方法,一个是管道方法。算法
wait() / nofity()方法是基类Object的两个方法,也就意味着全部Java类都会拥有这两个方法,这样,咱们就能够为任何对象实现同步机制。数据库
wait()方法:当缓冲区已满/空时,生产者/消费者线程中止本身的执行,放弃锁,使本身处于等等状态,让其余线程执行。编程
notify()方法:当生产者/消费者向缓冲区放入/取出一个产品时,向其余等待的线程发出可执行的通知,同时放弃锁,使本身处于等待状态。数据结构
各起了4个生产者,4个消费者 多线程
package test; /* * 生产者消费者问题(ReentrantLock) * 库存为空时,生产者生产,消费者等待 * 库存尽是,消费者消费,生产者等待 */ public class Hosee { private static Integer count = 0; private final Integer FULL = 10; private static String LOCK = "LOCK"; class Producer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } synchronized (LOCK) { while (count == FULL) { try { LOCK.wait(); } catch (Exception e) { e.printStackTrace(); } } count++; System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有" + count); //每生产一个唤醒消费者消费 LOCK.notifyAll(); } } } } class Consumer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (InterruptedException e1) { e1.printStackTrace(); } synchronized (LOCK) { while (count == 0) { try { LOCK.wait(); } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } } count--; System.out.println(Thread.currentThread().getName() + "消费者消费,目前总共有" + count); //没消费一个唤醒生产者生产 LOCK.notifyAll(); } } } } public static void main(String[] args) throws Exception { Hosee hosee = new Hosee(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); } }
(须要注意的是,用什么加锁就用什么notify和wait,实例中使用的是LOCK)并发
部分打印结果:框架
因为生产者和消费者说明一致,因此最多都是在2左右,当减小一个消费者时,则会加到10。dom
首先,咱们先来看看await()/signal()与wait()/notify()的区别:ide
那么为何有了synchronized还要提出Lock呢?
synchronized并不完美,它有一些功能性的限制 —— 它没法中断一个正在等候得到锁的线程,也没法经过投票获得锁,若是不想等下去,也就无法获得锁。同步还要求锁的释放只能在与得到锁所在的堆栈帧相同的堆栈帧中进行,多数状况下,这没问题(并且与异常处理交互得很好),可是,确实存在一些非块结构的锁定更合适的状况。
java.util.concurrent.lock 中的 Lock 框架是锁定的一个抽象,它容许把锁定的实现做为 Java 类,而不是做为语言的特性来实现(更加面向对象)。这就为 Lock 的多种实现留下了空间,各类实现可能有不一样的调度算法、性能特性或者锁定语义。 ReentrantLock 类实现了 Lock ,它拥有与 synchronized 相同的并发性和内存语义,可是添加了相似锁投票、定时锁等候和可中断锁等候的一些特性。此外,它还提供了在激烈争用状况下更佳的性能。(换句话说,当许多线程都想访问共享资源时,JVM 能够花更少的时候来调度线程,把更多时间用在执行线程上。)
reentrant 锁意味着什么呢?简单来讲,它有一个与锁相关的获取计数器,若是拥有锁的某个线程再次获得锁,那么获取计数器就加1,而后锁须要被释放两次才能得到真正释放(重入锁)。这模仿了 synchronized 的语义;若是线程进入由线程已经拥有的监控器保护的 synchronized 块,就容许线程继续进行,当线程退出第二个(或者后续) synchronized 块的时候,不释放锁,只有线程退出它进入的监控器保护的第一个synchronized 块时,才释放锁。
简单解释下重入锁:
public class Child extends Father implements Runnable{ final static Child child = new Child();//为了保证锁惟一 public static void main(String[] args) { for (int i = 0; i < 50; i++) { new Thread(child).start(); } } public synchronized void doSomething() { System.out.println("1child.doSomething()"); doAnotherThing(); // 调用本身类中其余的synchronized方法 } private synchronized void doAnotherThing() { super.doSomething(); // 调用父类的synchronized方法 System.out.println("3child.doAnotherThing()"); } @Override public void run() { child.doSomething(); } } class Father { public synchronized void doSomething() { System.out.println("2father.doSomething()"); } }
上述代码的锁都是child对象,当执行child.doSomething时,该线程得到child对象的锁,在doSomething方法内执行doAnotherThing时再次请求child对象的锁,由于synchronized是重入锁,因此能够获得该锁,继续在doAnotherThing里执行父类的doSomething方法时第三次请求child对象的锁,同理可获得,若是不是重入锁的话,那这后面这两次请求锁将会被一直阻塞,从而致使死锁。
在查看下面代码示例时,能够看到 Lock 和 synchronized 有一点明显的区别 —— lock 必须在 finally 块中释放。不然,若是受保护的代码将抛出异常,锁就有可能永远得不到释放!这一点区别看起来可能没什么,可是实际上,它极为重要。忘记在 finally 块中释放锁,可能会在程序中留下一个定时炸弹,当有一天炸弹爆炸时,您要花费很大力气才有找到源头在哪。而使用同步,JVM 将确保锁会得到自动释放。
Lock lock = new ReentrantLock(); lock.lock(); try { // update object state } finally { lock.unlock(); }
除此以外,与目前的 synchronized 实现相比,争用下的 ReentrantLock 实现更具可伸缩性。(在将来的 JVM 版本中,synchronized 的争用性能颇有可能会得到提升。)这意味着当许多线程都在争用同一个锁时,使用 ReentrantLock 的整体开支一般要比 synchronized 少得多。
在 Java1.5 中,synchronized 是性能低效的。由于这是一个重量级操做,须要调用操做接口,致使有可能加锁消耗的系统时间比加锁之外的操做还多。相比之下使用 Java 提供的 Lock 对象,性能更高一些。可是到了 Java1.6,发生了变化。synchronized 在语义上很清晰,能够进行不少优化,有适应自旋,锁消除,锁粗化,轻量级锁,偏向锁等等。致使在 Java1.6 上 synchronized 的性能并不比 Lock 差。官方也表示,他们也更支持 synchronized,在将来的版本中还有优化余地。
因此在确实须要一些 synchronized 所没有的特性的时候,好比时间锁等候、可中断锁等候、无块结构锁、多个条件变量或者锁投票使用ReentrantLock。ReentrantLock 还具备可伸缩性的好处,应当在高度争用的状况下使用它,可是请记住,大多数 synchronized 块几乎历来没有出现过争用,因此能够把高度争用放在一边。我建议用 synchronized 开发,直到确实证实 synchronized 不合适,而不要仅仅是假设若是使用 ReentrantLock “性能会更好”。请记住,这些是供高级用户使用的高级工具。(并且,真正的高级用户喜欢选择可以找到的最简单工具,直到他们认为简单的工具不适用为止。)。一如既往,首先要把事情作好,而后再考虑是否是有必要作得更快。
package test; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Hosee { private static Integer count = 0; private final Integer FULL = 10; final Lock lock = new ReentrantLock(); final Condition NotFull = lock.newCondition(); final Condition NotEmpty = lock.newCondition(); class Producer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } lock.lock(); try { while (count == FULL) { try { NotFull.await(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } count++; System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有" + count); NotEmpty.signal(); } finally { lock.unlock(); } } } } class Consumer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (InterruptedException e1) { e1.printStackTrace(); } lock.lock(); try { while (count == 0) { try { NotEmpty.await(); } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } } count--; System.out.println(Thread.currentThread().getName() + "消费者消费,目前总共有" + count); NotFull.signal(); } finally { lock.unlock(); } } } } public static void main(String[] args) throws Exception { Hosee hosee = new Hosee(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); } }
运行结果与第一个相似。上述代码用了两个Condition,其实用一个也是能够的,只不过要signalall()。
put()方法:相似于咱们上面的生产者线程,容量达到最大时,自动阻塞。
take()方法:相似于咱们上面的消费者线程,容量为0时,自动阻塞。
package test; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class Hosee { private static Integer count = 0; final BlockingQueue<Integer> bq = new ArrayBlockingQueue<Integer>(10); class Producer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } try { bq.put(1); count++; System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有" + count); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } class Consumer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (InterruptedException e1) { e1.printStackTrace(); } try { bq.take(); count--; System.out.println(Thread.currentThread().getName() + "消费者消费,目前总共有" + count); } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } } } } public static void main(String[] args) throws Exception { Hosee hosee = new Hosee(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); } }
其实这个BlockingQueue比较难用代码来演示,由于put()与take()方法没法与输出语句保证同步,固然你能够本身去实现 BlockingQueue(BlockingQueue是用await()/signal() 实现的)。因此在输出结果上你会发现不匹配。
例如:当缓冲区已满,生产者在put()操做时,put()内部调用了await()方法,放弃了线程的执行,而后消费者线程执行,调用take()方法,take()内部调用了signal()方法,通知生产者线程能够执行,导致在消费者的println()还没运行的状况下生产者的println()先被执行,因此有了输出不匹配的状况。
对于BlockingQueue你们能够放心使用,这可不是它的问题,只是在它和别的对象之间的同步有问题。
Semaphore 信号量,就是一个容许实现设置好的令牌。也许有1个,也许有10个或更多。
谁拿到令牌(acquire)就能够去执行了,若是没有令牌则须要等待。
执行完毕,必定要归还(release)令牌,不然令牌会被很快用光,别的线程就没法得到令牌而执行下去了。
package test; import java.util.concurrent.Semaphore; public class Hosee { int count = 0; final Semaphore notFull = new Semaphore(10); final Semaphore notEmpty = new Semaphore(0); final Semaphore mutex = new Semaphore(1); class Producer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } try { notFull.acquire();//顺序不能颠倒,不然会形成死锁。 mutex.acquire(); count++; System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有" + count); } catch (Exception e) { e.printStackTrace(); } finally { mutex.release(); notEmpty.release(); } } } } class Consumer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (InterruptedException e1) { e1.printStackTrace(); } try { notEmpty.acquire();//顺序不能颠倒,不然会形成死锁。 mutex.acquire(); count--; System.out.println(Thread.currentThread().getName() + "消费者消费,目前总共有" + count); } catch (Exception e) { e.printStackTrace(); } finally { mutex.release(); notFull.release(); } } } } public static void main(String[] args) throws Exception { Hosee hosee = new Hosee(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); } }
注意notFull.acquire()与mutex.acquire()的位置不能互换,若是先获得互斥锁再发生等待,会形成死锁。
package test; import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream; public class Hosee { final PipedInputStream pis = new PipedInputStream(); final PipedOutputStream pos = new PipedOutputStream(); { try { pis.connect(pos); } catch (IOException e) { e.printStackTrace(); } } class Producer implements Runnable { @Override public void run() { try{ while(true){ int b = (int) (Math.random() * 255); System.out.println("Producer: a byte, the value is " + b); pos.write(b); pos.flush(); } }catch(Exception e){ e.printStackTrace(); }finally{ try{ pos.close(); pis.close(); }catch(IOException e){ System.out.println(e); } } } } class Consumer implements Runnable { @Override public void run() { try{ while(true){ int b = pis.read(); System.out.println("Consumer: a byte, the value is " + String.valueOf(b)); } }catch(Exception e){ e.printStackTrace(); }finally{ try{ pos.close(); pis.close(); }catch(IOException e){ System.out.println(e); } } } } public static void main(String[] args) throws Exception { Hosee hosee = new Hosee(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); } }
与阻塞队列同样,因为read()/write()方法与输出方法不必定同步,输出结果方面会发生不匹配现象,为了使结果更加明显,这里只有1个消费者和1个生产者。
读者—写者问题(Readers-Writers problem)也是一个经典的并发程序设计问题,是常常出现的一种同步问题。计算机系统中的数据(文件、记录)常被多个进程共享,但其中某些进程可能只要求读数据(称为读者Reader);另外一些进程则要求修改数据(称为写者Writer)。就共享数据而言,Reader和Writer是两组并发进程共享一组数据区,要求:
(1)容许多个读者同时执行读操做;
(2)不容许读者、写者同时操做;
(3)不容许多个写者同时操做。
Reader和Writer的同步问题分为读者优先、弱写者优先(公平竞争)和强写者优先三种状况,它们的处理方式不一样。
首先咱们都只考虑公平竞争的状况下,看看Java有哪些方法能够实现读者写者问题
ReentrantReadWriteLock会使用两把锁来解决问题,一个读锁,一个写锁
线程进入读锁的前提条件:
没有其余线程的写锁,
没有写请求或者有写请求,但调用线程和持有锁的线程是同一个
线程进入写锁的前提条件:
没有其余线程的读锁
没有其余线程的写锁
到ReentrantReadWriteLock,首先要作的是与ReentrantLock划清界限。它和后者都是单独的实现,彼此之间没有继承或实现的关系。而后就是总结这个锁机制的特性了:
看下ReentrantReadWriteLock这个类的两个构造函数
public ReentrantReadWriteLock() { this(false); } /** * Creates a new {@code ReentrantReadWriteLock} with * the given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy */ public ReentrantReadWriteLock(boolean fair) { sync = (fair)? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }
fair这个参数表示是不是建立一个公平的读写锁,仍是非公平的读写锁。也就是抢占式仍是非抢占式。
公平和非公平:公平表示获取的锁的顺序是按照线程加锁的顺序来分配获取到锁的线程时最早加锁的线程,是按照FIFO的顺序来分配锁的;非公平表示获取锁的顺序是无需的,后来加锁的线程可能先得到锁,这种状况就致使某些线程可能一直没获取到锁。
公平锁为啥会影响性能,从code上来看看公平锁仅仅是多了一项检查是否在队首会影响性能,如不是,那么又是在什么地方影响的?假如是闯入的线程,会排在队尾并睡觉(parking)等待前任节点唤醒,这样势必会比非公平锁添加不少paking和unparking的操做
通常的应用场景是: 若是有多个读线程,一个写线程,并且写线程在操做的时候须要阻塞读线程,那么此时就须要使用公平锁,要否则可能写线程一直获取不到锁,致使线程饿死。
再简单说下锁降级
重入还容许从写入锁降级为读取锁,其实现方式是:先获取写入锁,而后获取读取锁,最后释放写入锁。可是,从读取锁升级到写入锁是不可能的。
rwl.readLock().lock(); if (!cacheValid) { // Must release read lock before acquiring write lock rwl.readLock().unlock(); rwl.writeLock().lock(); if (!cacheValid) { data = ... cacheValid = true; } rwl.readLock().lock(); rwl.writeLock().unlock(); // 降级:先获取读锁再释放写锁 }
下面咱们用读写锁来实现读者写者问题
import java.util.Random; import java.util.concurrent.locks.ReentrantReadWriteLock; public class ReadWriteLockTest { public static void main(String[] args) { final Queue3 q3 = new Queue3(); for (int i = 0; i < 3; i++) { new Thread() { public void run() { while (true) { q3.get(); } } }.start(); } for (int i = 0; i < 3; i++) { new Thread() { public void run() { while (true) { q3.put(new Random().nextInt(10000)); } } }.start(); } } } class Queue3 { private Object data = null;// 共享数据,只能有一个线程能写该数据,但能够有多个线程同时读该数据。 private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); public void get() { rwl.readLock().lock();// 上读锁,其余线程只能读不能写 System.out.println(Thread.currentThread().getName() + " be ready to read data!"); try { Thread.sleep((long) (Math.random() * 1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "have read data :" + data); rwl.readLock().unlock(); // 释放读锁,最好放在finnaly里面 } public void put(Object data) { rwl.writeLock().lock();// 上写锁,不容许其余线程读也不容许写 System.out.println(Thread.currentThread().getName() + " be ready to write data!"); try { Thread.sleep((long) (Math.random() * 1000)); } catch (InterruptedException e) { e.printStackTrace(); } this.data = data; System.out.println(Thread.currentThread().getName() + " have write data: " + data); rwl.writeLock().unlock();// 释放写锁 } }
运行结果:
Thread-0 be ready to read data! Thread-1 be ready to read data! Thread-2 be ready to read data! Thread-0have read data :null Thread-2have read data :null Thread-1have read data :null Thread-5 be ready to write data! Thread-5 have write data: 6934 Thread-5 be ready to write data! Thread-5 have write data: 8987 Thread-5 be ready to write data! Thread-5 have write data: 8496
在1.4中已经介绍了用信号量来实现生产者消费者问题,如今咱们将用信号量来实现读者写者问题,信号量的相关知识再也不重复,直接看代码
package test; import java.util.Random; import java.util.concurrent.Semaphore; public class ReadWrite { public static void main(String[] args) { final Queue3 q3 = new Queue3(); for (int i = 0; i < 3; i++) { new Thread() { public void run() { while (true) { try { Thread.sleep((long) (Math.random() * 1000)); } catch (InterruptedException e) { e.printStackTrace(); } q3.get(); } } }.start(); } for (int i = 0; i < 3; i++) { new Thread() { public void run() { while (true) { try { Thread.sleep((long) (Math.random() * 1000)); } catch (InterruptedException e) { e.printStackTrace(); } q3.put(new Random().nextInt(10000)); } } }.start(); } } } class Queue3 { private Object data = null;// 共享数据,只能有一个线程能写该数据,但能够有多个线程同时读该数据。 private Semaphore wmutex = new Semaphore(1); private Semaphore rmutex = new Semaphore(2); private int count = 0; public void get() { try { rmutex.acquire(); if (count == 0) wmutex.acquire();// 当第一读进程欲读数据库时,阻止写进程写 count++; System.out.println(Thread.currentThread().getName() + " be ready to read data!"); try { Thread.sleep((long) (Math.random() * 1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "have read data :" + data); count--; if (count == 0) wmutex.release(); rmutex.release(); } catch (Exception e) { e.printStackTrace(); } } public void put(Object data) { try { wmutex.acquire(); System.out.println(Thread.currentThread().getName() + " be ready to write data!"); try { Thread.sleep((long) (Math.random() * 1000)); } catch (InterruptedException e) { e.printStackTrace(); } this.data = data; System.out.println(Thread.currentThread().getName() + " have write data: " + data); } catch (Exception e) { e.printStackTrace(); } finally { wmutex.release(); } } }
单纯使用信号量不能解决读者与写者问题,必须引入计数器count(能够用CountDownLatch代替 )对读进程计数;count与wmutex结合使用,使读读能同时进行,读写排斥。count为0时表示读进程开始,此时写进程阻塞(wmutex被读进程获取),当count不为0时,表示有多个读进程,就不用操做 wmutex了,由于第一个读进程已经得到了wmutex。count表示有多少个读进程在读,每次有一个就+1,读完了-1,当count==0时,表示读进程都结束了。此时wmutex释放,写进程才有机会得到wmutex。为了使读进程不要一直占有 wmutex,最好让读进程sleep一下,让写进程有机会得到wmutex,使效果更明显。
就此用Java实现生产者消费者问题(5种)和读者写者问题(2种)已经阐述完了,欢迎你们讨论以及给出不一样的解决方案。