来源:html
https://blog.biezhi.me/2019/01/simple-blocking-queue.htmljava
Java 并发经常使用的组件中有一种队列叫阻塞队列(BlockingQueue),当队列为空时,获取元素的线程会阻塞等待直到队列有数据;当队列满时,想要存储元素的线程会阻塞等待直到队列有空间。咱们常常会用这种数据结构能够实现生产者、消费者模型。面试
本文会经过两种方式来实现简单的有界阻塞队列,在最后分别测试不一样实现的性能差别。数组
看过 Java 并发相关书籍的同窗应该都见过 Monitor 这个词,有人称为监视器也有人叫它管程,不过都是一个意思:一个同步工具,至关于操做系统中的互斥量(mutex),即值为 1 的信号量。安全
synchronized
关键词的背后就是靠 monitor 实现的,monitor 的重要特色是,同一个时刻,只有一个线程能进入 monitor 中定义的临界区,这使得 monitor 可以达到互斥的效果。但仅仅有互斥的做用是不够的,没法进入 monitor 临界区的线程应该被阻塞,在必要的时候能够被唤醒,因此 Java 提供了 wait
和 notify
、notifyAll
的 API 给咱们使用。数据结构
wait()
: 让当前线程进入等待队列,同时会释放锁,直到被唤醒。notify()
: 从条件队列中随机唤醒一个线程,让它去参与锁竞争notifyAll()
: 唤醒条件队列中的全部线程,让它们去参与锁竞争实现同步的一种方式是使用 synchronized
关键字,还可使用 Lock
接口下的实现来完成,好比 ReentrantLock
,它是一把重入锁(synchronized 也是),基于 AQS 并发框架实现。咱们可使用它来进行加锁和释放锁,若是遇到有条件须要阻塞可使用 Condition
API。并发
Lock#newCondition()
: 建立一个新的条件Condition#await()
: 让当前线程等待Condition#signal()
: 唤醒一个等待线程条件和锁老是息息相关,在没有 Lock 接口的时候你会发现 monitor 机制有一个严重的问题:一把锁只能对应一个条件(也就是只能够作一次 wait),那么在唤醒的时候就可能出现唤醒丢失。举个例子,在两个方法上有不一样的条件会致使阻塞,它们持有一把锁,唤醒时候若是用 notify
只会从条件队列选择一个,使用 notifyAll
会带来大量的 CPU 上下文切换和锁竞争,伪代码以下:框架
1synchronized void foo() { 2 while(CONDITION1){ 3 wait(); 4 } 5 notifyAll(); 6} 7synchronized void bar() { 8 while(CONDITION2){ 9 wait(); 10 } 11 notifyAll(); 12}
咱们经过定义一个 Queue
接口来实现两种队列,该队列是有界队列,使用数组的方式实现,若是你有兴趣也可使用链表或栈来实现这个队列。提供 put
方法添加元素(满了则阻塞),take
方法弹出元素(没有元素则阻塞)。ide
1public interface Queue<E> { 2 3 // 添加新元素,当队列满则阻塞 4 void put(E e) throws InterruptedException; 5 6 // 弹出队头元素,当队列空则阻塞 7 E take() throws InterruptedException; 8 9 // 队列元素个数 10 int size(); 11 12 // 队列是否为空 13 boolean isEmpty(); 14 15}
核心思路:工具
1public class BlockingQueueWithSync<E> implements Queue<E> { 2 3 private E[] array; 4 private int head; // 队头指针 5 private int tail; // 队尾指针 6 7 private volatile int size; // 队列元素个数 8 9 public BlockingQueueWithSync(int capacity) { 10 array = (E[]) new Object[capacity]; 11 } 12 13 @Override 14 public synchronized void put(E e) throws InterruptedException { 15 // 当队列满的时候阻塞 16 while (size == array.length) { 17 this.wait(); 18 } 19 20 array[tail] = e; 21 // 队列装满后索引归零 22 if (++tail == array.length) { 23 tail = 0; 24 } 25 ++size; 26 // 通知其余消费端有数据了 27 this.notifyAll(); 28 } 29 30 @Override 31 public synchronized E take() throws InterruptedException { 32 // 当队列空的时候阻塞 33 while (isEmpty()) { 34 this.wait(); 35 } 36 37 E element = array[head]; 38 // 消费完后从0开始 39 if (++head == array.length) { 40 head = 0; 41 } 42 --size; 43 // 通知其余生产者能够生产了 44 this.notifyAll(); 45 return element; 46 } 47 48 @Override 49 public synchronized boolean isEmpty() { 50 return size == 0; 51 } 52 53 @Override 54 public synchronized int size() { 55 return size; 56 } 57 58}
1public class BlockingQueueWithLock<E> implements Queue<E> { 2 3 private E[] array; 4 private int head; 5 private int tail; 6 7 private volatile int size; 8 9 private Lock lock = new ReentrantLock(); 10 private Condition notFull = lock.newCondition(); 11 private Condition notEmpty = lock.newCondition(); 12 13 public BlockingQueueWithLock(int capacity) { 14 array = (E[]) new Object[capacity]; 15 } 16 17 @Override 18 public void put(E e) throws InterruptedException { 19 lock.lockInterruptibly(); 20 try { 21 // 队列满,阻塞 22 while (size == array.length) { 23 notFull.await(); 24 } 25 array[tail] = e; 26 if (++tail == array.length) { 27 tail = 0; 28 } 29 ++size; 30 notEmpty.signal(); 31 } finally { 32 lock.unlock(); 33 } 34 } 35 36 @Override 37 public E take() throws InterruptedException { 38 lock.lockInterruptibly(); 39 try { 40 // 队列空,阻塞 41 while (isEmpty()) { 42 notEmpty.await(); 43 } 44 E element = array[head]; 45 if (++head == array.length) { 46 head = 0; 47 } 48 --size; 49 // 通知isFull条件队列有元素出去 50 notFull.signal(); 51 return element; 52 } finally { 53 lock.unlock(); 54 } 55 } 56 57 @Override 58 public boolean isEmpty() { 59 lock.lock(); 60 try { 61 return size == 0; 62 } finally { 63 lock.unlock(); 64 } 65 } 66 67 @Override 68 public int size() { 69 lock.lock(); 70 try { 71 return size; 72 } finally { 73 lock.unlock(); 74 } 75 } 76 77}
1public class Benchmark { 2 3 @Test 4 public void testWithMonitor() { 5 Queue<Integer> queue = new BlockingQueueWithSync<>(5); 6 execute(queue); 7 } 8 9 @Test 10 public void testWithCondition() { 11 Queue<Integer> queue = new BlockingQueueWithLock<>(5); 12 execute(queue); 13 } 14 15 private void execute(Queue<Integer> queue) { 16 ExecutorService executorService = Executors.newCachedThreadPool(); 17 for (int i = 1; i <= 1000; i++) { 18 final int finalNum = i; 19 executorService.execute(() -> { 20 try { 21 queue.put(finalNum); 22 Integer take = queue.take(); 23 System.out.println("item: " + take); 24 } catch (InterruptedException e) { 25 e.printStackTrace(); 26 } 27 }); 28 } 29 executorService.shutdown(); 30 } 31 32}
这个测试程序让 2 个队列的可存储的元素数都为 5,开启 1000 个线程进行 put
和 take
操做,运行后查看总耗时。
能够看出,使用 synchronized
的方式性能较差。
● 深刻浅出 CAS
● Spring Boot Admin 2.2.0发布,支持最新Spring Boot/Cloud以外,新增中文展现!
● 你应该知道的 @ConfigurationProperties 注解的使用姿式,这一篇就够了
若有收获,请帮忙转发,您的鼓励是做者最大的动力,谢谢!
本文由博客一文多发平台 OpenWrite 发布!