java多线程系列(九)---ArrayBlockingQueue源码分析

java多线程系列(九)---ArrayBlockingQueue源码分析

目录

成员变量

  • 数组大小
final Object[] items;
  • 下一个进队元素的下标
/** items index for next take, poll, peek or remove */
    int takeIndex;
  • 下一个出队元素的下标
/** items index for next put, offer, or add */
    int putIndex;
  • 队列中元素的数目
/** Number of elements in the queue */
    int count;
  • 出队和入队须要的锁
/*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;
  • 出队条件
/** Condition for waiting takes */
    private final Condition notEmpty;
  • 入队条件
/** Condition for waiting puts */
    private final Condition notFull;

构造方法

  • 配置容量,先建立是否公平公平访问
public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
  • 从源码能够看到,建立一个object数组,而后建立一个公平或非公平锁,而后建立出队条件和入队条件

offer方法

public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
  • 首先检查是否为null
  • 而后lock锁住
  • 若是当前数目count已经为初始的时候容量,这时候本身返回false
  • 不然的话执行enqueue方法
private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }
  • 将新的元素添加到数组的下一个进队的位置
  • 而后notEmpty出队条件唤醒,这个时候能够进行出队
  • 执行enqueue后而后释放锁

指定时间的offer方法

public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }
  • 方法大体和前面的一致,不一样的时候是当队列满的时候,会等待一段时间,此时入队条件等待一段时间,一段时间后继续进入循环进行判断队列还满
  • 当队列不满的时候执行enqueue

add方法

  • 调用父类的add方法
public boolean add(E e) {
        return super.add(e);
    }
  • 父类AbstractQueue的add方法
public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
  • 执行offer方法,这个时候能够对比上面直接调用offer,offer方法若是入队失败会直接返回false,而add方法会抛出异常

put方法

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
  • 和限定时间的offer方法不一样,当队列满的时候,会一直等待,直到有人唤醒

poll方法

public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }
  • 首先执行lock方法锁定
  • 若是当前队中无元素,那么返回null,不然执行dequeue方法
private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }
  • 根据出队下标取出元素,而后将该位置置为null
  • 将出队下标加一,若是出队下标等于了数组的大小,出队下标置为0
  • 队中元素数量减一
  • notFull唤醒,此时能够唤醒入队阻塞的线程

指定时间的poll方法

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
  • 与offer的指定时间和没有指定时间相似,poll指定时间的方法和没有指定时间的poll思路大体是同样的
  • 当此时队列为空的,为等待一段时间,而后自动唤醒,继续进入循环,直到队列中有元素,而后执行dequeue方法

take方法

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
  • 和前面指定时间的poll方法不一样,当队中为空的时候,会一直等待,直到被唤醒

总结

  • 入队
方法 特色
offer(E e) 队列满的时候,返回false
offer(E e, long timeout, TimeUnit unit) 队列满的时候,等待一段时间,释放锁,一段时间后,进入就绪状态
add(E e) 队列满的时候,直接抛出异常
put(E e) 队列慢的时候,线程阻塞,直到被唤醒
  • 出队
方法 特色
poll() 队列为空的时候,直接返回null
poll(long timeout, TimeUnit unit) 队列为空的时候,等待一段时间,释放锁,一段时候后,进入就绪状态
take() 队列为空的时候,一直等待,释放锁,直到被唤醒
  • 整体的设计思路,经过一个数组来模拟一个数组,出队和入队都是同步的,也就是同一时间只能有一个入队或者出队操做,而后在入队的时候,若是队列已满的话,根据方法的不一样有不一样的策略,能够直接返回或者抛出异常,也能够阻塞一段时间,等会在尝试入队,或者直接阻塞,直到有人唤醒。而出队的时候,若是为空能够直接返回,也能够等待一段时间而后再次尝试,也能够阻塞,直到有人唤醒

我以为分享是一种精神,分享是个人乐趣所在,不是说我以为我讲得必定是对的,我讲得可能不少是不对的,可是我但愿我讲的东西是我人生的体验和思考,是给不少人反思,也许给你一秒钟、半秒钟,哪怕说一句话有点道理,引起本身心里的感触,这就是我最大的价值。(这是我喜欢的一句话,也是我写博客的初衷)

做者:jiajun 出处: http://www.cnblogs.com/-new/
本文版权归做者和博客园共有,欢迎转载,但未经做者赞成必须保留此段声明,且在文章页面明显位置给出原文链接,不然保留追究法律责任的权利。若是以为还有帮助的话,能够点一下右下角的【推荐】,但愿可以持续的为你们带来好的技术文章!想跟我一块儿进步么?那就【关注】我吧。html

相关文章
相关标签/搜索