突击并发编程JUC系列演示代码地址: https://github.com/mtcarpenter/JavaTutorial前端
什么是阻塞队列
阻塞队列(BlockingQueue)是一个支持两个附加操做的队列。这两个附加的操做支持阻塞的插入和移除方法。java
- 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
- 支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。
阻塞队列经常使用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。git
插入和移除操做的4种处理方式
方法/处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
检查方法 | element() | peek() | 不可用 | 不可用 |
- 抛出异常:当队列满时,若是再往队列里插入元素,会抛出
IllegalStateException
("Queue full")异常。当队列空时,从队列里获取元素会抛出NoSuchElementException
异常。 - 返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。若是是移除方法,则是从队列里取出一个元素,若是没有则返回 null 。
- 一直阻塞:当阻塞队列满时,若是生产者线程往队列里 put 元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,若是消费者线程从队列里 take 元素,队列会阻塞住消费者线程,直到队列不为空。
- 超时退出:当阻塞队列满时,若是生产者线程往队列里插入元素,队列会阻塞生产者线程一段时间,若是超过了指定的时间,生产者线程就会退出。
若是是无界阻塞队列,队列不可能会出现满的状况,因此使用 put 或 offer 方法永远不会被阻塞,并且使用offer方法时,该方法永远返回 true。github
ArrayBlockingQueue
ArrayBlockingQueue
是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。 默认状况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,能够按照阻塞的前后顺序访问队列,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程均可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列。为了保证公平性,一般会下降吞吐量。编程
阻塞式写方法
在ArrayBlockingQueue
中提供了两个阻塞式写方法,分别以下(在该队列中,不管是阻塞式写方法仍是非阻塞式写方法,都不容许写入null)。后端
void put(E e)
:向队列的尾部插入新的数据,当队列已满时调用该方法的线程会进入阻塞,直到有其余线程对该线程执行了中断操做,或者队列中的元素被其余线程消费。boolean offer(E e, long timeout, TimeUnit unit)
:向队列尾部写入新的数据,当队列已满时执行该方法的线程在指定的时间单位内将进入阻塞,直到到了指定的超时时间后,或者在此期间有其余线程对队列数据进行了消费。
put() 方法示例数组
public class ArrayBlockingQueueExample1 { public static void main(String[] args) { ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3); try { queue.put("class 1"); queue.put("class 2"); queue.put("class 3"); // 超过指定得容量当前线程阻塞 queue.put("class 4"); } catch (InterruptedException e) { e.printStackTrace(); } } }
非阻塞式写方法
当队列已满时写入数据,若是不想使得当前线程进入阻塞,那么就可使用非阻塞式的写操做方法。缓存
boolean add(E e)
:向队列尾部写入新的数据,当队列已满时不会进入阻塞,可是该方法会抛出队列已满的异常。boolean offer(E e)
:向队列尾部写入新的数据,当队列已满时不会进入阻塞,而且会当即返回 false。
add() 方法示例多线程
public class ArrayBlockingQueueExample2 { public static void main(String[] args) { ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3); queue.add("class 1"); queue.add("class 2"); queue.add("class 3"); // 超过指定容量 抛出异常 queue.add("class 4"); } } // 抛出异常
阻塞式读方法
E take()
:从队列头部获取数据,而且该数据会从队列头部移除,当队列为空时执行take方法的线程将进入阻塞,直到有其余线程写入新的数据,或者当前线程被执行了中断操做。E poll(long timeout, TimeUnit unit)
:从队列头部获取数据而且该数据会从队列头部移除,若是队列中没有任何元素时则执行该方法,当前线程会阻塞指定的时间,直到在此期间有新的数据写入,或者阻塞的当前线程被其余线程中断,当线程因为超时退出阻塞时,返回值为null。
take() 方法示例并发
public class ArrayBlockingQueueExample3 { public static void main(String[] args) { ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3); queue.add("class 1"); queue.add("class 2"); queue.add("class 3"); try { // 取出对头元素 System.out.println(queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } // 队列大小 System.out.println(queue.size()); } } //class 1 // 2
非阻塞式读方法
E poll()
:从队列头部获取数据而且该数据会从队列头部移除,当队列为空时,该方法不会使得当前线程进入阻塞,而是返回null值。E peek()
:当队列为空时,该方法不会使得当前线程进入阻塞,而是返回null值。
public class ArrayBlockingQueueExample4 { public static void main(String[] args) { ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3); // 队列无元素 直接返回 null System.out.println(queue.poll( )); System.out.println(queue.peek( )); } } // null // null
部分源码
public void put(E e) throws InterruptedException { // 检查元素 checkNotNull(e); final ReentrantLock lock = this.lock; // 获取锁 lock.lockInterruptibly(); try { // 元素满 一直阻塞,队列非满时,被唤醒 while (count == items.length) notFull.await(); // 入队 enqueue(e); } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 获取锁 lock.lockInterruptibly(); try { // 队列为空 等待 while (count == 0) notEmpty.await(); // 出队 return dequeue(); } finally { lock.unlock(); } }
LinkedBlockingQueue
LinkedBlockingQueue
是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE
。此队列按照先进先出的原则对元素进行排序。
PriorityBlockingQueue
PriorityBlockingQueue
是一个支持优先级的无界阻塞队列。默认状况下元素采起天然顺序升序排列。也能够自定义类实现compareTo()
方法来指定元素排序规则,或者初始化PriorityBlockingQueue
时,指定构造参数Comparator 来对元素进行排序。须要注意的是不能保证同优先级元素的顺序。
public class PriorityBlockingQueueExample1 { public static void main(String[] args) { PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue(); queue.offer(1); queue.offer(12); queue.offer(21); queue.offer(6); // 内部排序 System.out.println(queue.poll()); // 1 System.out.println(queue.poll()); // 6 System.out.println(queue.poll()); // 12 System.out.println(queue.poll()); //21 } }
DelayQueue
DelayQueue
是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue
来实现。队列中的元素必须实现Delayed
接口,在建立元素时能够指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。
DelayQueue
很是有用,能够将DelayQueue
运用在如下应用场景。
- 缓存系统的设计:能够用
DelayQueue
保存缓存元素的有效期,使用一个线程循环查询DelayQueue
,一旦能从DelayQueue
中获取元素时,表示缓存有效期到了。 - 定时任务调度:使用
DelayQueue
保存当天将会执行的任务和执行时间,一旦从DelayQueue
中获取到任务就开始执行,好比TimerQueue
就是使用DelayQueue
实现的。
DelayQueue
队列的元素必须实现Delayed
接口。咱们能够参考ScheduledThreadPoolExecutor
里ScheduledFutureTask
类的实现。
public class DelayQueueExample1 { public static void main(String[] args) throws InterruptedException { DelayQueue<DelayedEntry> queue = new DelayQueue<>(); // 延期3秒 处理 queue.put(new DelayedEntry("A", 30000L)); // 延期10 秒处理 queue.add(new DelayedEntry("B", 10000L)); // 延期 20 秒处理 queue.add(new DelayedEntry("C", 20000L)); int size = queue.size(); System.out.println("当前时间是:" + LocalDateTime.now()); // 从延时队列中获取元素, 将输出 A,B,C for (int i = 0; i < size; i++) { System.out.println(queue.take() + " ------ " + LocalDateTime.now()); } } } /** * 继承 Delayed 接口 */ class DelayedEntry implements Delayed { /** * 元素数据内容 */ private final String value; /** * 用于计算失效时间 */ private final long exeTime; DelayedEntry(String value, long exeTime) { this.value = value; this.exeTime = exeTime + System.currentTimeMillis(); } @Override public long getDelay(TimeUnit unit) { return exeTime - System.currentTimeMillis(); } @Override public int compareTo(Delayed o) { DelayedEntry t = (DelayedEntry) o; if (this.exeTime < t.exeTime) { return -1; } else if (this.exeTime > t.exeTime) { return 1; } else { return 0; } } @Override public String toString() { return "DelayedEntry{" + "value=" + value + ", exeTime=" + exeTime + '}'; } } //当前时间是:2020-10-15T16:26:37.167 //DelayedEntry{value=B, exeTime=1602750407104} ------ 2020-10-15T16:26:47.117 // DelayedEntry{value=C, exeTime=1602750417104} ------ 2020-10-15T16:26:57.105 //DelayedEntry{value=A, exeTime=1602750427104} ------ 2020-10-15T16:27:07.104
SynchronousQueue
SynchronousQueue
是一个不存储元素的阻塞队列。每个put操做必须等待一个take操做,不然不能继续添加元素。 它支持公平访问队列。默认状况下线程采用非公平性策略访问队列。使用如下构造方法能够建立公平性访问的SynchronousQueue
,若是设置为true,则等待的线程会采用先进先出的顺序访问队列。
LinkedTransferQueue
LinkedTransferQueue
是一个由链表结构组成的无界阻塞TransferQueue
队列。相对于其余阻塞队列,LinkedTransferQueue
多了tryTransfer
和transfer
方法。
-
transfer方法
若是当前有消费者正在等待接收元素(消费者使用
take()
方法或带时间限制的poll()方法时),transfer
方法能够把生产者传入的元素马上transfer
(传输)给消费者。若是没有消费者在等待接收元素,transfer 方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。transfer
方法的关键代码以下 -
tryTransfer方法
tryTransfer
方法是用来试探生产者传入的元素是否能直接传给消费者。若是没有消费者等待接收元素,则返回false。和transfer
方法的区别是tryTransfer
方法不管消费者是否接收,方法当即返回,而transfer
方法是必须等到消费者消费了才返回。 对于带有时间限制的tryTransfer(E e,long timeout,TimeUnit unit)
方法,试图把生产者传入的元素直接传给消费者,可是若是没有消费者消费该元素则等待指定的时间再返回,若是超时还没消费元素,则返回false,若是在超时时间内消费了元素,则返回 true。
LinkedBlockingDeque
LinkedBlockingDeque
是一个由链表结构组成的双向阻塞队列。所谓双向队列指的是能够从队列的两端插入和移出元素。双向队列由于多了一个操做队列的入口,在多线程同时入队时,也就减小了一半的竞争。相比其余的阻塞队列,LinkedBlockingDeque
多了addFirst
、addLast
、offerFirst
、offerLast
、peekFirst
和peekLast
等方法,以First
单词结尾的方法,表示插入、获取(peek)或移除双端队列的第一个元素。以Last
单词结尾的方法,表示插入、获取或移除双端队列的最后一个元素。另外,插入方法add
等同于addLast
,移除方法remove
等效于removeFirst
。可是take
方法却等同于takeFirst
,不知道是否是JDK
的 bug,使用时仍是用带有First
和Last
后缀的方法更清楚。 在初始化LinkedBlockingDeque
时能够设置容量防止其过分膨胀。另外,双向阻塞队列能够运用在“工做窃取”模式中。
欢迎关注公众号 山间木匠 , 我是小春哥,从事 Java 后端开发,会一点前端、经过持续输出系列技术文章以文会友,若是本文能为您提供帮助,欢迎你们关注、点赞、分享支持,咱们下期再见!<br />