阻塞队列之ArrayBlockingQueue(ABQ)JDK1-8实现原理详解

以前有一篇关于关于java对阻塞队列提供了七种实现,若是有兴趣,能够去看一下 写得还算很全: www.jianshu.com/p/9e4eca735…java

今天我就想专门去研究下两个比较基础的ArrayBlockingQueue的源码实现以及实现原理。算法

关于阻塞队列的实现原理:数组

若是队列是空的,消费者会一直等待,当生产者添加元素时,消费者是如何知道当前队列有元素的呢?若是让你来设计阻塞队列你会如何设计,如何让生产者和消费者进行高效率的通讯呢?在jdk中通常是使用通知模式,我更想理解成观察者模式。就是当生产者往满的队列里添加元素时会阻塞住生产者,当消费者消费了一个队列中的元素后,会通知生产者当前队列可用。经过查看JDK源码发现ArrayBlockingQueue使用了Condition来实现。bash

先来看看构造函数有哪些?函数

ArrayBlockingQueue

public ArrayBlockingQueue(int capacity) { 
  this(capacity, false);//默认构造非公平锁的阻塞队列 
} 

public ArrayBlockingQueue(int capacity, boolean fair) { 
&emsp;&emsp;if (capacity <= 0)  
&emsp;&emsp;&emsp;&emsp;throw new IllegalArgumentException(); 
&emsp;&emsp;this.items = new Object[capacity]; 
&emsp;&emsp;lock = new ReentrantLock(fair);//初始化ReentrantLock重入锁,出队入队拥有这同一个锁 
&emsp;&emsp;notEmpty = lock.newCondition;//初始化非空等待队列
&emsp;&emsp;notFull = lock.newCondition;//初始化非满等待队列 
} 
public ArrayBlockingQueue(int capacity, boolean fair, Collecation<? extends E> c) { 
&emsp;&emsp;this(capacity, fair); 
&emsp;&emsp;final ReentrantLock lock = this.lock; 
&emsp;&emsp;lock.lock();//注意在这个地方须要得到锁,由于指定加入一个集合 ,必须等加入集合完成后,才能继续操做
&emsp;&emsp;try { 
&emsp;&emsp;&emsp;&emsp;int i = 0; 
&emsp;&emsp;&emsp;&emsp;try { 
&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;for (E e : c) { 
&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;checkNotNull(e); 
&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;item[i++] = e;//将集合添加进数组构成的队列中 
&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;} 
&emsp;&emsp;&emsp;&emsp;} catch (ArrayIndexOutOfBoundsException ex) { 
&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;throw new IllegalArgumentException(); 
&emsp;&emsp;&emsp;&emsp;} 
&emsp;&emsp;&emsp;&emsp;count = i;//队列中的实际数据数量 
&emsp;&emsp;&emsp;&emsp;putIndex = (i == capacity) ? 0 : i; 
&emsp;&emsp;} finally { 
&emsp;&emsp;&emsp;&emsp;lock.unlock(); 
&emsp;&emsp;} 
}
复制代码

在看一些基础属性:ui

final Object[] items;  //AQS实现是使用对象数组存放数据

    int takeIndex;        //出队列对应的索引

    int putIndex;    //下一个入队列的索引

    int count;    //队列总长度

    final ReentrantLock lock;     //可重入锁,用于入队列和出队列时加锁。

    private final Condition notEmpty; //这里是队列为null的时候,不能执行出队列操做
//这时候须要阻塞,若是有一个入队列操做,完成后,队列不为null了,须要通知唤醒出队列线程。
//这个设计是针对第四种或第三种处理,这个能够理解为消费者。

      //跟上面相反,当入队列为满时,若是是第四种或第三种处理,则先阻塞入队列操做,
//而后若是有一个出队列操做完成后,就唤醒出队列线程,也就是该condition。
    private final Condition notFull;

总结下: 

 notEmpty  能够理解为消费者线程。只有队列里有元素,才唤醒,执行出队列。

notEmpty  能够理解为生产者线程。只有队列里元素不是满的,才唤醒,执行入队列。
复制代码

而后咱们知道,对于阻塞队列,当入队列时,队列满,等别有四种处理,java在ArrayBlockingQueue类中,针对每种处理分别提供给咱们四种方法对应不一样的处理:this

下面分别针对几种入队列队列满的处理分析下:spa

1.add(e) 抛异常线程

public boolean add(E e) {
        return super.add(e);
    }

public boolean add(E e) {
//看这里,调用quque中的offer来判断队列是否满,offer在接口中,未实现,因此仍是在     
//子类ArrayBlockingQueue中重写的。咱们来看offer 
      if (offer(e))   
                 ArrayBlockingQueue
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

public boolean offer(E e) {
//判断,ABQ是不能加入null的
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();  //当须要判断队列长度时而且加入元素时,须要加锁。
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);   //看下面分析
                return true;
            }
        } finally {    //释放锁
            lock.unlock();
        }
    }

 private void enqueue(E x) {    
        final Object[] items = this.items;  
        items[putIndex] = x;   //把x加入数组的下一个索引的位置
        if (++putIndex == items.length)   //若是队列满,则putIndex置空。
            putIndex = 0;
        count++;
        notEmpty.signal();    //唤醒消费者线程消费队列里的元素。   
    }
复制代码

入队列的第一种第二种处理咱们已经看完了,下面看看第三种处理。设计

public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();     //请求相应中断的锁,这里是调用可重入锁重写的acquireInterruptibly(1),方法
        try {
            //若是队列满,则自旋。
            while (count == items.length) {
                //区别: 若是超过指定时间,返回false
                if (nanos <= 0)
                    return false;
   //使当前线程等待直到发出信号或中断,或指定的等待时间过去。
                nanos = notFull.awaitNanos(nanos);  //此线程还持有该锁的时间
            }
            enqueue(e);  //若是队列不满,则执行入队列操做。
            return true;
        } finally {
            lock.unlock();
        }
    }
复制代码

第四种入队列满处理的逻辑 put,上面有看,应该懂了

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();   //队列满则阻塞生产者线程,也能够理解为入队列线程
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
复制代码

而后根据出队列的四种处理,咱们一样有如下的四种处理:

第一种处理:

//AbstractQueue#remove,这也是一个模板方法,定义删除队列元素的算法骨架,队列中元素时返回具体元素,元素为空时抛出异常,具体实现poll由子类实现, 
public E remove() { 
&emsp;&emsp;E x = poll();//poll方法由Queue接口定义 
&emsp;&emsp;if (x != null) 
&emsp;&emsp;&emsp;&emsp;return x; 
&emsp;&emsp;else 
&emsp;&emsp;&emsp;&emsp;throw new NoSuchElementException(); 
}

public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();   
        try {
            return (count == 0) ? null : dequeue();  //dequeue  出队列操做
        } finally {
            lock.unlock();
        }
    }

private E dequeue() {
        final Object[] items = this.items;
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)   //这里是迭代器相关的处理。
            itrs.elementDequeued();
        notFull.signal();     //出队列后,唤醒入队列线程
        return x;
    }
复制代码

第二种处理 poll()//队列不为空时返回队首值并移除;队列为空时返回null。非阻塞当即返回。见上面

第三种处理: poll(time, unit)//设定等待的时间,若是在指定时间内队列还未孔则返回null,不为空则返回队首值

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {  //若是队列尾null,自旋
                if (nanos <= 0)   //判断时间是否超时,ture返回null
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();  //出队列
        } finally {
            lock.unlock();
        }
    }
复制代码

第四种处理 take(e)//队列不为空返回队首值并移除;当队列为空时会阻塞等待,一直等到队列不为空时再返回队首值。

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)   //若是队列为nul,自旋阻塞出队列线程。直到不为null
                notEmpty.await();
            return dequeue();   //出队列
        } finally {
            lock.unlock();
        }
    }
复制代码

总结:

从以上分析,能够看出出队列和入队列操做方法的第四种处理主要是经过条件的通知机制来完成可阻塞式的插入数据和获取数据。在理解ArrayBlockingQueue后再去理解的LinkedBlockingQueue就很容易了。

待续。。。

相关文章
相关标签/搜索