一,Producer-Consumer模式java
Producer:生产者的意思,指的是生成数据的线程。
Consumer:消费者的意思,指的是使用数据的线程
当生产者和消费者以不一样的线程运行时,二者之间的处理速度差别就会引发问题。好比,消费者想获取数据,但是数据尚未生成。
或者生产者想要交付数据,而消费者的状态还没法接收数据。
Producer-Consumer模式在生产者消费者之间加入了一个桥梁角色。该桥梁角色用于消除线程间处理速度的差别。
在该模式中,生产者和消费者都有多个,当消费者和生产者都只有一个时,称之为Pipe模式数组
二,例子安全
/** * Table类用于表示放置蛋糕的桌子. * 存放蛋糕的容器是数组。因此放置蛋糕是有顺序的,拿取蛋糕也是有顺序的。 */ public class Table { private final String[] buffer; //实际存放蛋糕的容器 private int tail;//下次put的位置 private int head;//下次take的位置 private int count;//buffer中蛋糕的个数 public Table(int count) { this.buffer = new String[count]; this.tail = 0; this.head = 0; this.count = 0; } //放置蛋糕 public synchronized void put(String cake)throws InterruptedException{ System.out.println(Thread.currentThread().getName()+" put "+cake); while (count >= buffer.length){ wait(); } buffer[tail] = cake; tail = (tail+1)%buffer.length; count++; notifyAll(); } //拿取蛋糕 public synchronized String take()throws InterruptedException{ while (count <= 0){ wait(); } String cake = buffer[head]; head = (head+1)%buffer.length; count--; notifyAll(); System.out.println(Thread.currentThread().getName()+" takes "+cake); return cake; } }
public class MakerThread extends Thread { private final Random random; private final Table table; private static int id =0;//蛋糕的流水号 public MakerThread(String name, Table table, long seed) { super(name); this.random = new Random(seed); this.table = table; } @Override public void run() { try { while (true){ Thread.sleep(random.nextInt(1000)); String cake = "[ Cake NO."+nextId() +" by "+getName()+"]"; table.put(cake); } }catch (InterruptedException e){ } } private static synchronized int nextId(){ return id++; } }
public class EaterThread extends Thread { private final Random random; private final Table table; public EaterThread(String name, Table table, long seed) { super(name); this.random = new Random(seed); this.table = table; } @Override public void run() { try { while (true){ String cake = table.take(); Thread.sleep(random.nextInt(1000)); } }catch (InterruptedException e){ } } }
public class Test { public static void main(String[] args) { Table table = new Table(3); new MakerThread("MakerThread-1",table,31415).start(); new MakerThread("MakerThread-2",table,92653).start(); new MakerThread("MakerThread-3",table,58979).start(); new EaterThread("EaterThread-1",table,32384).start(); new EaterThread("EaterThread-2",table,62643).start(); new EaterThread("EaterThread-3",table,38327).start(); } }
三,InterruptedException异常数据结构
1.加了InterruptedException 的方法
java.lang.Object类的wait方法
java.lang.Thread类的sleep方法
java.lang.Thread类的的join方法
2.加了 throws InterruptedException 的方法可能会花费时间,可是能够取消
花费时间体如今:
执行wait方法,须要等待notify/notifyAll方法唤醒,须要花费时间
执行sleep方法,会暂停执行,这也花费时间
执行join方法,会等待指定线程终止,须要时间
能够取消体如今:
假如Alice线程执行了Thread.sleep(100000);咱们能够在别的线程中使用 alice.interrupt();当执行了interrupt后,正在sleep的线程会终止暂停状态,alice线程抛出
InterruptedException异常。这样线程Alice的控制权就会转移到捕获该异常的catch语句块中。
3.interrupt方法只是改变了中断状态
上面调用interrupt后,线程抛出InterruptedException异常,只是由于alice线程调用了线程中断的方法(wait,sleep,join)。
当线程执行普通的逻辑处理时,即便别的线程调用alice.interrupt(); 也不会抛出异常,而是继续执行。只有当线程继续执行到sleep,wait,join等方法的调用时,
才会抛出InterruptedException异常
4,其余线程执行alice.interrupt()时,并不须要获取alice线程实例的锁,不管什么时候,任何线程均可以调用其余线程的interrupt方法
5.notify和interrupt的区别:
??????????dom
四,isInterrupted方法:检查中断状态ide
isInterrupted是Thread类的实例方法,用于检查指定线程的中断状态,
若指定线程处于中断状态:返回true,
未处于中断状态:返回false函数
五,Thread.Interrupted:检查并清除中断状态this
该方法是Thread类的静态方法,用于检查并清除当前线程的中断状态(操做对象是线程自己)
若当前线程处于中断状态,返回true。而且当前线程的中断状态会被清楚。
若当前线程未处于中断状态,返回false。spa
六,java.util.concurrent包中的队列线程
1.BlockingQueue接口:阻塞队列
该接口表示在达到合适的状态以前线程一直阻塞(wait)的队列。实现类:
1).ArrayBlockingQueue类:基于数组的 BlockingQueue
元素个数有限的BlockingQueue
2).LinkedBlockingQueue类:基于链表的BlockingQueue
元素个数没有最大限制的,只要有内存,就能够一直put数据
3).PriorityBlockingQueue类:带有优先级的BlockingQueue
数据的优先级是根据Comparable接口的天然排序,或构造函数的Comparator接口决定的顺序指定的
4).SynchronousQueue类:直接传递的BlockingQueue
该类用于执行由Producer角色到Consumer角色的 直接传递。若是Producer角色先put,在Consumer角色take以前,Producer角色的线程将一直阻塞。
反之同样阻塞
2.ConcurrentLinkedQueue类:元素个数没有最大限制的线程安全队列
ConcurrentLinkedQueue中,内部数据结构是分开的,线程之间互不影响,因此就不须要进行互斥处理
3.使用ArrayBlockingQueue替代上面实例程序中的Table类
代码:
/** * 使用阻塞队列完成table的功能 */ public class BlockQueueTable extends ArrayBlockingQueue<String>{ public BlockQueueTable(int capacity) { super(capacity); } public void put(String cake) throws InterruptedException { System.out.println(Thread.currentThread().getName()+" puts "+cake); super.put(cake); } public String take() throws InterruptedException { String cake = super.take(); System.out.println(Thread.currentThread().getName()+" takes "+cake); return cake; } }
七,java.util.concurrent.Exchanger类
该类用来交换两个线程的数据,只有当两个线程都准备好了,才会进行交换。否则其中一个线程会一直等待另外一个线程
public class ExchangerTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final Exchanger exchanger = new Exchanger(); service.execute(new Runnable() { @Override public void run() { try { String data1 = "aaa"; System.out.println("线程"+Thread.currentThread().getName()+ "正在准备把 "+data1+"换出去"); Thread.sleep(new Random().nextInt(3000)); String data2 = (String) exchanger.exchange(data1); System.out.println("线程"+Thread.currentThread().getName()+ "换回的数据为"+data2); }catch (InterruptedException e){ } } }); service.execute(new Runnable() { @Override public void run() { try { String data1 = "bbb"; System.out.println("线程"+Thread.currentThread().getName()+ "正在准备把 "+data1+"换出去"); Thread.sleep(new Random().nextInt(3000)); String data2 = (String) exchanger.exchange(data1); System.out.println("线程"+Thread.currentThread().getName()+ "换回的数据为"+data2); }catch (InterruptedException e){ } } }); } }
运行结果:
线程pool-1-thread-1正在准备把 aaa换出去线程pool-1-thread-2正在准备把 bbb换出去线程pool-1-thread-2换回的数据为aaa线程pool-1-thread-1换回的数据为bbb