上篇文章咱们分析了AQS中的同步队列和条件队列,而ArrayBlockingQueue和LinkedBlockingQueue正是基于AQS实现的,若是对AQS和ReentrantLock的条件队列不熟悉的话,建议去看https://juejin.im/post/5c053e546fb9a049fc034924,它与咱们平时接触的LinkedList和ArrayList相比,最大的特色就是:java
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
void put(E e) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
boolean remove(Object o);
}
//除了上述方法还有继承自Queue接口的方法
//获取但不移除此队列的头元素,没有则跑异常NoSuchElementException
E element();
//获取但不移除此队列的头;若是此队列为空,则返回 null。
E peek();
//获取并移除此队列的头,若是此队列为空,则返回 null。
E poll();
复制代码
总结一下:node
这就是阻塞队列基本的增删查方法,接下来咱们看一下如何使用它。 #ArrayBlockingQueue阻塞队列的使用方法 再次回到上一篇文章的场景,基于生产者-消费者,生产者产生烤鸡,消费者消费烤鸡,若是使用ArrayBlockingQueue来实现,会比直接经过condition队列实现简单一些:spring
package com.springsingleton.demo.Chicken;
import java.util.concurrent.ArrayBlockingQueue;
public class ArrayBlockingQueueTest {
//定义吃鸡队列,队列大小是1
private ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(1);
@SuppressWarnings("unchecked")
private void product() {
Chicken chicken = new Chicken();
try {
arrayBlockingQueue.put(chicken);
System.out.println(Thread.currentThread().getName()+" has produced a Chicken");
}catch (InterruptedException e){
System.out.println(e.getMessage());
}
}
private void consume(){
try {
//每次消费前先睡一秒钟
Thread.sleep(1000);
arrayBlockingQueue.take();
System.out.println(Thread.currentThread().getName()+" has eaten a Chicken");
}catch (InterruptedException e){
System.out.println(e.getMessage());
}
}
public static void main(String args[]){
ArrayBlockingQueueTest arrayBlockingQueueTest = new ArrayBlockingQueueTest();
new Thread( ()->{
while (true){
Thread.currentThread().setName("生产者一号");
arrayBlockingQueueTest.product();
}
}
).start();
new Thread( ()->{
while (true){
Thread.currentThread().setName("生产者二号");
arrayBlockingQueueTest.product();
}
}
).start();
new Thread( ()->{
while (true){
Thread.currentThread().setName("吃鸡者一号");
arrayBlockingQueueTest.consume();
}
}
).start();
new Thread( ()->{
while (true){
Thread.currentThread().setName("吃鸡者二号");
arrayBlockingQueueTest.consume();
}
}
).start();
}
}
复制代码
输出以下: 数组
//默认非公平阻塞队列
ArrayBlockingQueue queue = new ArrayBlockingQueue(666);
//公平阻塞队列
ArrayBlockingQueue queue1 = new ArrayBlockingQueue(666,true);
//构造方法源码
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
复制代码
经过构造方法发现:它的内部经过一个ReentrantLock和两个条件队列构成,既然是ReentrantLock,那么就有公平和非公平之分了,不懂ReetrantLock的去看上一篇文章:juejin.im/post/5c021b… ArrayBlockingQueue中的元素存在公平访问与非公平访问的区别,对于公平访问队列,被阻塞的线程能够按照阻塞的前后顺序访问队列,即先阻塞的线程先访问队列。而非公平队列,当队列可用时,阻塞的线程将进入争夺访问资源的竞争中,也就是说谁先抢到谁就执行,没有固定的前后顺序。安全
ArrayBlockingQueue的内部是经过一个可重入锁ReentrantLock和两个Condition条件对象来实现阻塞,这里先看看其内部成员变量bash
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** 存储数据的数组 */
final Object[] items;
/**获取数据的索引,主要用于take,poll,peek,remove方法 */
int takeIndex;
/**添加数据的索引,主要用于 put, offer, or add 方法*/
int putIndex;
/** 队列元素的个数 */
int count;
/** 控制并不是访问的锁 */
final ReentrantLock lock;
/**notEmpty条件对象,用于通知take方法队列已有元素,可执行获取操做 */
private final Condition notEmpty;
/**notFull条件对象,用于通知put方法队列未满,可执行添加操做 */
private final Condition notFull;
}
复制代码
ArrayBlockingQueue内部确实是经过数组对象items来存储全部的数据,ArrayBlockingQueue经过一个ReentrantLock来同时控制添加线程与移除线程的并发访问,这点与LinkedBlockingQueue区别很大(稍后会分析)。 notEmpty条件队列则是用于存放等待或唤醒调用take方法的线程,告诉他们队列已有元素,能够执行获取操做。 同理notFull条件对象是用于等待或唤醒调用put方法的线程,告诉它们,队列未满,能够执行添加元素的操做。 takeIndex表明的是下一个方法(take,poll,peek,remove)被调用时获取数组元素的索引,putIndex则表明下一个方法(put, offer, or add)被调用时元素添加到数组中的索引。图示以下并发
咱们先来看看非阻塞的状况,也就是以前总结过得add和offer方法,都是非阻塞的添加到队列,只是一个失败返回fase,另外一个会抛异常:源码分析
//add方法实现,内部间接调用了offer(e)
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
//offer方法
public boolean offer(E e) {
//非空校验
checkNotNull(e);
final ReentrantLock lock = this.lock;
//加锁
lock.lock();
try {
//判断队列是否满
if (count == items.length)
return false;
else {
//入队
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
//入队操做
private void enqueue(E x) {
//获取当前存放数据的数组
final Object[] items = this.items;
//经过putIndex索引对数组进行赋值
items[putIndex] = x;
//索引自增,若是已经是最后一个位置,从新设置 putIndex = 0;
if (++putIndex == items.length)
putIndex = 0;
count++;//队列中元素数量加1
//唤醒调用take()方法的线程,执行元素获取操做。
notEmpty.signal();
}
复制代码
源码很简单:其中须要注意的是enqueue(E x)方法,这个方法内部经过putIndex索引直接将元素添加到数组items中,这里可能会疑惑的是当putIndex索引大小等于数组长度时,须要将putIndex从新设置为0,这是由于当前队列执行元素获取时老是从队列头部获取,而添加元素从中从队列尾部获取因此当队列索引(从0开始)与数组长度相等时,下次咱们就须要从数组头部开始添加了,以下图演示 :post
假设队列总共长度length为5,putindex指向的是最后一个空的array:下标为4 ui
此时元素1被移出:takeindex指向元素2
此时元素5被加入队列:下标为4的putindex自增后刚好等于队列长度5,那么下一次只能从队列头开始添加元素:
接下来咱们看看阻塞添加方法put:
//put方法,阻塞时可中断
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//该方法可中断
try {
//当队列元素个数与数组长度相等时,没法添加元素
while (count == items.length)
//将当前调用线程挂起,添加到notFull条件队列中等待唤醒
notFull.await();
enqueue(e);//若是队列没有满直接添加
} finally {
lock.unlock();
}
}
复制代码
put方法是一个阻塞的方法,若是队列元素已满,那么当前线程将会被notFull条件队列挂起加到条件队列中,直到队列有元素被移出才会唤醒执行添加操做。但若是队列没有满,那么就直接调用enqueue(e)方法将元素加入到数组队列中。
三个添加方法即put,offer,add,其中offer,add在正常状况下都是无阻塞的添加,而put方法是阻塞添加。这就是阻塞队列的添加过程。说白了就是当队列满时经过条件对象Condtion来阻塞当前调用put方法的线程,直到线程又再次被唤醒执行。 为了方便理解,总得来讲put方法的执行存在如下两种状况:
接下来有5个线程经过put方法阻塞入队,他们所有被阻塞,而线程被包装为Node队列存在条件队列中:
此时元素1被移出了,那么会调用notfull.signal方法,唤醒条件队列的WaitNode,waitNode唤醒后,会调用enqueue()方法入队:
一样的,咱们先看非阻塞的移出,poll和remove。 其中:poll(),获取并删除队列头元素,队列没有数据就返回null,内部经过dequeue()方法删除头元素
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//判断队列是否为null
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
//移除队列头元素并返回
private E dequeue() {
//拿到当前数组的数据
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//获取要删除的对象
E x = (E) items[takeIndex];
将数组中takeIndex索引位置设置为null
items[takeIndex] = null;
//takeIndex索引加1并判断是否与数组长度相等,
//若是相等说明已到尽头,恢复为0
if (++takeIndex == items.length)
takeIndex = 0;
count--;//队列个数减1
if (itrs != null)
//同时更新迭代器中的元素数据
itrs.elementDequeued();
//移出了元素说明队列有空位,唤醒notFull条件对象添加线程,执行添加操做
notFull.signal();
return x;
}
复制代码
总结就是加锁以后获取要删除的对象(注意,这里的lock和添加时候的lock是同一个lock,意味着同一时间只能添加或者删除,不能并发执行),以后将数组的takeindex进行处理,并在有空位以后唤醒添加队列的线程执行添加操做,接下来看remove方法:
public boolean remove(Object o) {
if (o == null) return false;
//获取数组数据
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();//加锁
try {
//若是此时队列不为null,这里是为了防止并发状况
if (count > 0) {
//获取下一个要添加元素时的索引
final int putIndex = this.putIndex;
//获取当前要被删除元素的索引
int i = takeIndex;
//执行循环查找要删除的元素
do {
//找到要删除的元素
if (o.equals(items[i])) {
removeAt(i);//执行删除
return true;//删除成功返回true
}
//当前删除索引执行加1后判断是否与数组长度相等
//若为true,说明索引已到数组尽头,将i设置为0
if (++i == items.length)
i = 0;
} while (i != putIndex);//继承查找
}
return false;
} finally {
lock.unlock();
}
}
//根据索引删除元素,其实是把删除索引以后的元素往前移动一个位置
void removeAt(final int removeIndex) {
final Object[] items = this.items;
//先判断要删除的元素是否为当前队列头元素
if (removeIndex == takeIndex) {
//若是是就简单了:直接删除
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;//队列元素减1
if (itrs != null)
itrs.elementDequeued();//更新迭代器中的数据
} else {
//若是要删除的元素不在队列头部,
//那么只需循环迭代把删除元素后面的全部元素往前移动一个位置
//获取下一个要被添加的元素的索引,做为循环判断结束条件
final int putIndex = this.putIndex;
//执行循环
for (int i = removeIndex;;) {
//获取要删除节点索引的下一个索引
int next = i + 1;
//判断是否已为数组长度,若是是从数组头部(索引为0)开始找
if (next == items.length)
next = 0;
//若是查找的索引不等于要添加元素的索引,说明元素能够再移动
if (next != putIndex) {
items[i] = items[next];//把后一个元素前移覆盖要删除的元
i = next;
} else {
//在removeIndex索引以后的元素都往前移动完毕后清空最后一个元素
items[i] = null;
this.putIndex = i;
break;//结束循环
}
}
count--;//队列元素减1
if (itrs != null)
itrs.removedAt(removeIndex);//更新迭代器数据
}
notFull.signal();//唤醒添加线程
}
复制代码
remove(Object o)方法的删除过程相对复杂些,由于该方法并非直接从队列头部删除元素,而是删除指定的位置。 首先线程先获取锁,再一步判断队列count>0,这点是保证并发状况下删除操做安全执行。接着获取下一个要添加源的索引putIndex以及takeIndex索引 ,做为后续循环的结束判断,由于只要putIndex与takeIndex不相等就说明队列没有结束。而后经过while循环找到要删除的元素索引,执行removeAt(i)方法删除,在removeAt(i)方法中实际上作了两件事:
接着看take()方法,是一个阻塞方法,直接获取队列头元素并删除。
//从队列头部删除,队列没有元素就阻塞,可中断
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//中断
try {
//若是队列没有元素
while (count == 0)
//执行阻塞操做
notEmpty.await();
return dequeue();//若是队列有元素执行删除操做
} finally {
lock.unlock();
}
}
复制代码
take方法其实很简单,有就删除没有就阻塞,注意这个阻塞是能够中断的,若是队列没有数据那么就加入notEmpty条件队列等待(有数据就直接取走,dequeue方法以前分析过了),若是有新的put线程添加了数据,那么put操做将会唤醒take线程,执行take操做。图示以下 假设队列全空时:
ArrayBlockingQueue内部经过一把锁ReentrantLock和两个AQS条件队列实现了阻塞的入队和删除: