队列就能够想成是一个数组,从一头进入,一头出去,排队买饭java
BlockingQueue 阻塞队列,排队拥堵,首先它是一个队列,而一个阻塞队列在数据结构中所起的做用大体以下图所示:程序员
线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素数组
当阻塞队列是空时,从队列中获取元素的操做将会被阻塞
安全
当阻塞队列是满时,从队列中添加元素的操做将会被阻塞
数据结构
也就是说 试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其它线程往空的队列插入新的元素多线程
同理,试图往已经满的阻塞队列中添加新元素的线程,直到其它线程往满的队列中移除一个或多个元素,或者彻底清空队列后,使队列从新变得空闲起来,并后续新增架构
去海底捞吃饭,大厅满了,须要进候厅等待,可是这些等待的客户可以对商家带来利润,所以咱们很是欢迎他们阻塞测试
在多线程领域:所谓的阻塞,在某些清空下会挂起线程(即阻塞),一旦条件知足,被挂起的线程又会自动唤醒this
好处是咱们不须要关心何时须要阻塞线程,何时须要唤醒线程,由于这一切BlockingQueue都帮你一手包办了atom
在concurrent包发布之前,在多线程环境下,咱们每一个程序员都必须本身取控制这些细节,尤为还要兼顾效率和线程安全,而这会给咱们的程序带来不小的复杂度。
// 你用过List集合类 // ArrayList集合类熟悉么? // 还用过 CopyOnWriteList 和 BlockingQueue
BlockingQueue阻塞队列是属于一个接口,底下有七个实现类
这里须要掌握的是:ArrayBlockQueue、LinkedBlockingQueue、SynchronousQueue
抛出异常 | 当阻塞队列满时:在往队列中add插入元素会抛出 IIIegalStateException:Queue full 当阻塞队列空时:再往队列中remove移除元素,会抛出NoSuchException |
---|---|
特殊性 | 插入方法,成功true,失败false 移除方法:成功返回出队列元素,队列没有就返回空 |
一直阻塞 | 当阻塞队列满时,生产者继续往队列里put元素,队列会一直阻塞生产线程直到put数据or响应中断退出, 当阻塞队列空时,消费者线程试图从队列里take元素,队列会一直阻塞消费者线程直到队列可用。 |
超时退出 | 当阻塞队列满时,队里会阻塞生产者线程必定时间,超过限时后生产者线程会退出 |
但执行add方法,向已经满的ArrayBlockingQueue中添加元素时候,会抛出异常
// 阻塞队列,须要填入默认值 BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); System.out.println(blockingQueue.add("a")); System.out.println(blockingQueue.add("b")); System.out.println(blockingQueue.add("c")); System.out.println(blockingQueue.add("XXX"));
运行后:
true true true Exception in thread "main" java.lang.IllegalStateException: Queue full at java.util.AbstractQueue.add(AbstractQueue.java:98) at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:312) at com.moxi.interview.study.queue.BlockingQueueDemo.main(BlockingQueueDemo.java:25)
同时若是咱们多取出元素的时候,也会抛出异常,咱们假设只存储了3个值,可是取的时候,取了四次
// 阻塞队列,须要填入默认值 BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); System.out.println(blockingQueue.add("a")); System.out.println(blockingQueue.add("b")); System.out.println(blockingQueue.add("c")); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove());
那么出现异常
true true true a b c Exception in thread "main" java.util.NoSuchElementException at java.util.AbstractQueue.remove(AbstractQueue.java:117) at com.moxi.interview.study.queue.BlockingQueueDemo.main(BlockingQueueDemo.java:30)
咱们使用 offer的方法,添加元素时候,若是阻塞队列满了后,会返回false,否者返回true
同时在取的时候,若是队列已空,那么会返回null
BlockingQueue blockingQueue = new ArrayBlockingQueue(3); System.out.println(blockingQueue.offer("a")); System.out.println(blockingQueue.offer("b")); System.out.println(blockingQueue.offer("c")); System.out.println(blockingQueue.offer("d")); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll());
运行结果
true true true false a b c null
咱们使用 put的方法,添加元素时候,若是阻塞队列满了后,添加消息的线程,会一直阻塞,直到队列元素减小,会被清空,才会唤醒
通常在消息中间件,好比RabbitMQ中会使用到,由于须要保证消息百分百不丢失,所以只有让它阻塞
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); blockingQueue.put("a"); blockingQueue.put("b"); blockingQueue.put("c"); System.out.println("================"); blockingQueue.take(); blockingQueue.take(); blockingQueue.take(); blockingQueue.take();
同时使用take取消息的时候,若是内容不存在的时候,也会被阻塞
offer( ) , poll 加时间
使用offer插入的时候,须要指定时间,若是2秒尚未插入,那么就放弃插入
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("b", 2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("c", 2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("d", 2L, TimeUnit.SECONDS));
同时取的时候也进行判断
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
若是2秒内取不出来,那么就返回null
SynchronousQueue没有容量,与其余BlockingQueue不一样,SynchronousQueue是一个不存储的BlockingQueue,每个put操做必须等待一个take操做,否者不能继续添加元素
下面咱们测试SynchronousQueue添加元素的过程
首先咱们建立了两个线程,一个线程用于生产,一个线程用于消费
生产的线程分别put了 A、B、C这三个字段
BlockingQueue<String> blockingQueue = new SynchronousQueue<>(); new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + "\t put A "); blockingQueue.put("A"); System.out.println(Thread.currentThread().getName() + "\t put B "); blockingQueue.put("B"); System.out.println(Thread.currentThread().getName() + "\t put C "); blockingQueue.put("C"); } catch (InterruptedException e) { e.printStackTrace(); } }, "t1").start();
消费线程使用take,消费阻塞队列中的内容,而且每次消费前,都等待5秒
new Thread(() -> { try { try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } blockingQueue.take(); System.out.println(Thread.currentThread().getName() + "\t take A "); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } blockingQueue.take(); System.out.println(Thread.currentThread().getName() + "\t take B "); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } blockingQueue.take(); System.out.println(Thread.currentThread().getName() + "\t take C "); } catch (InterruptedException e) { e.printStackTrace(); } }, "t2").start();
最后结果输出为:
t1 put A t2 take A 5秒后... t1 put B t2 take B 5秒后... t1 put C t2 take C
咱们从最后的运行结果能够看出,每次t1线程向队列中添加阻塞队列添加元素后,t1输入线程就会等待 t2消费线程,t2消费后,t2处于挂起状态,等待t1在存入,从而周而复始,造成 一存一取的状态
一个初始值为0的变量,两个线程对其交替操做,一个加1,一个减1,来5轮
关于多线程的操做,咱们须要记住下面几句
咱们下面实现一个简单的生产者消费者模式,首先有资源类ShareData
/** * 资源类 */ class ShareData { private int number = 0; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void increment() throws Exception{ // 同步代码块,加锁 lock.lock(); try { // 判断 while(number != 0) { // 等待不能生产 condition.await(); } // 干活 number++; System.out.println(Thread.currentThread().getName() + "\t " + number); // 通知 唤醒 condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void decrement() throws Exception{ // 同步代码块,加锁 lock.lock(); try { // 判断 while(number == 0) { // 等待不能消费 condition.await(); } // 干活 number--; System.out.println(Thread.currentThread().getName() + "\t " + number); // 通知 唤醒 condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }
里面有一个number变量,同时提供了increment 和 decrement的方法,分别让number 加1和减1
可是咱们在进行判断的时候,为了防止出现虚假唤醒机制,不能使用if来进行判断,而应该使用while
// 判断 while(number != 0) { // 等待不能生产 condition.await(); }
不能使用 if判断
// 判断 if(number != 0) { // 等待不能生产 condition.await(); }
完整代码
/** * 生产者消费者 传统版 * 题目:一个初始值为0的变量,两个线程对其交替操做,一个加1,一个减1,来5轮 */ /** * 线程 操做 资源类 * 判断 干活 通知 * 防止虚假唤醒机制 */ /** * 资源类 */ class ShareData { private int number = 0; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void increment() throws Exception{ // 同步代码块,加锁 lock.lock(); try { // 判断 while(number != 0) { // 等待不能生产 condition.await(); } // 干活 number++; System.out.println(Thread.currentThread().getName() + "\t " + number); // 通知 唤醒 condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void decrement() throws Exception{ // 同步代码块,加锁 lock.lock(); try { // 判断 while(number == 0) { // 等待不能消费 condition.await(); } // 干活 number--; System.out.println(Thread.currentThread().getName() + "\t " + number); // 通知 唤醒 condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } } public class ProdConsumerTraditionDemo { public static void main(String[] args) { // 高内聚,低耦合 内聚指的是,一个空调,自身带有调节温度高低的方法 ShareData shareData = new ShareData(); // t1线程,生产 new Thread(() -> { for (int i = 0; i < 5; i++) { try { shareData.increment(); } catch (Exception e) { e.printStackTrace(); } } }, "t1").start(); // t2线程,消费 new Thread(() -> { for (int i = 0; i < 5; i++) { try { shareData.decrement(); } catch (Exception e) { e.printStackTrace(); } } }, "t2").start(); } }
最后运行成功后,咱们一个进行生产,一个进行消费
t1 1 t2 0 t1 1 t2 0 t1 1 t2 0 t1 1 t2 0 t1 1 t2 0
在concurrent包发布之前,在多线程环境下,咱们每一个程序员都必须本身去控制这些细节,尤为还要兼顾效率和线程安全,则这会给咱们的程序带来不小的时间复杂度
如今咱们使用新版的阻塞队列版生产者和消费者,使用:volatile、CAS、atomicInteger、BlockQueue、线程交互、原子引用
/** * 生产者消费者 阻塞队列版 * 使用:volatile、CAS、atomicInteger、BlockQueue、线程交互、原子引用 * */ class MyResource { // 默认开启,进行生产消费 // 这里用到了volatile是为了保持数据的可见性,也就是当TLAG修改时,要立刻通知其它线程进行修改 private volatile boolean FLAG = true; // 使用原子包装类,而不用number++ private AtomicInteger atomicInteger = new AtomicInteger(); // 这里不能为了知足条件,而实例化一个具体的SynchronousBlockingQueue BlockingQueue<String> blockingQueue = null; // 而应该采用依赖注入里面的,构造注入方法传入 public MyResource(BlockingQueue<String> blockingQueue) { this.blockingQueue = blockingQueue; // 查询出传入的class是什么 System.out.println(blockingQueue.getClass().getName()); } /** * 生产 * @throws Exception */ public void myProd() throws Exception{ String data = null; boolean retValue; // 多线程环境的判断,必定要使用while进行,防止出现虚假唤醒 // 当FLAG为true的时候,开始生产 while(FLAG) { data = atomicInteger.incrementAndGet() + ""; // 2秒存入1个data retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS); if(retValue) { System.out.println(Thread.currentThread().getName() + "\t 插入队列:" + data + "成功" ); } else { System.out.println(Thread.currentThread().getName() + "\t 插入队列:" + data + "失败" ); } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + "\t 中止生产,表示FLAG=false,生产介绍"); } /** * 消费 * @throws Exception */ public void myConsumer() throws Exception{ String retValue; // 多线程环境的判断,必定要使用while进行,防止出现虚假唤醒 // 当FLAG为true的时候,开始生产 while(FLAG) { // 2秒存入1个data retValue = blockingQueue.poll(2L, TimeUnit.SECONDS); if(retValue != null && retValue != "") { System.out.println(Thread.currentThread().getName() + "\t 消费队列:" + retValue + "成功" ); } else { FLAG = false; System.out.println(Thread.currentThread().getName() + "\t 消费失败,队列中已为空,退出" ); // 退出消费队列 return; } } } /** * 中止生产的判断 */ public void stop() { this.FLAG = false; } } public class ProdConsumerBlockingQueueDemo { public static void main(String[] args) { // 传入具体的实现类, ArrayBlockingQueue MyResource myResource = new MyResource(new ArrayBlockingQueue<String>(10)); new Thread(() -> { System.out.println(Thread.currentThread().getName() + "\t 生产线程启动"); System.out.println(""); System.out.println(""); try { myResource.myProd(); System.out.println(""); System.out.println(""); } catch (Exception e) { e.printStackTrace(); } }, "prod").start(); new Thread(() -> { System.out.println(Thread.currentThread().getName() + "\t 消费线程启动"); try { myResource.myConsumer(); } catch (Exception e) { e.printStackTrace(); } }, "consumer").start(); // 5秒后,中止生产和消费 try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(""); System.out.println(""); System.out.println("5秒中后,生产和消费线程中止,线程结束"); myResource.stop(); } }
最后运行结果
java.util.concurrent.ArrayBlockingQueue prod 生产线程启动 consumer 消费线程启动 prod 插入队列:1成功 consumer 消费队列:1成功 prod 插入队列:2成功 consumer 消费队列:2成功 prod 插入队列:3成功 consumer 消费队列:3成功 prod 插入队列:4成功 consumer 消费队列:4成功 prod 插入队列:5成功 consumer 消费队列:5成功 5秒中后,生产和消费线程中止,线程结束 prod 中止生产,表示FLAG=false,生产介绍