【Java并发编程】—–“J.U.C”:ArrayBlockingQueue

前言

本文的主要详细分析ArrayBlockingQueue的实现原理,因为该并发集合其底层是使用了java.util.ReentrantLock和java.util.Condition来完成并发控制的,咱们能够经过JDK的源代码更好的学习这些并发控制类的使用,同时该类也是全部并发集合中最简单的一个,分析该类的源码也是为以后分析其余并发集合作好基础。java

1.Queue接口和BlockingQueue接口回顾

1.1 Queue接口回顾

在Queue接口中,除了继承Collection接口中定义的方法外,它还分别额外地定义插入、删除、查询这3个操做,其中每个操做都以两种不一样的形式存在,每一种形式都对应着一个方法。数组

方法说明:安全

操做 抛出异常 返回特殊值
Insert add(e) offer(e)
Remove remove() poll()
Examine element() peek()
  1. add方法在将一个元素插入到队列的尾部时,若是出现队列已经满了,那么就会抛出IllegalStateException,而使用offer方法时,若是队列满了,则添加失败,返回false,但并不会引起异常。
  2. remove方法是获取队列的头部元素而且删除,若是当队列为空时,那么就会抛出NoSuchElementException。而poll在队列为空时,则返回一个null。
  3. element方法是从队列中获取到队列的第一个元素,但不会删除,可是若是队列为空时,那么它就会抛出NoSuchElementException。peek方法与之相似,只是不会抛出异常,而是返回false。

后面咱们在分析ArrayBlockingQueue的方法时,主要也是围绕着这几个方法来进行分析。并发

1.2 BlockingQueue接口回顾

BlockingQueue是JDK1.5出现的接口,它在原来的Queue接口基础上提供了更多的额外功能:当获取队列中的头部元素时,若是队列为空,那么它将会使执行线程处于等待状态;当添加一个元素到队列的尾部时,若是队列已经满了,那么它一样会使执行的线程处于等待状态。工具

前面咱们在说Queue接口时提到过,它针对于相同的操做提供了2种不一样的形式,而BlockingQueue更夸张,针对于相同的操做提供了4种不一样的形式。性能

该四种形式分别为:学习

  • 抛出异常
  • 返回一个特殊值(多是null或者是false,取决于具体的操做)
  • 阻塞当前执行直到其能够继续
  • 当线程被挂起后,等待最大的时间,若是一旦超时,即便该操做依旧没法继续执行,线程也不会再继续等待下去。

对应的方法说明:this

操做 抛出异常 返回特殊值 阻塞 超时
Insert add(e) offer(e) put(e) offer(e, time, unit)
Remove remove() poll() take() poll(time, unit)
Examine element() peek()

BlockingQueue虽然比起Queue在操做上提供了更多的支持,可是它在使用的使用也应该以下的几点:spa

  1. BlockingQueue中是不容许添加null的,该接受在声明的时候就要求全部的实现类在接收到一个null的时候,都应该抛出NullPointerException。
  1. BlockingQueue是线程安全的,所以它的全部和队列相关的方法都具备原子性。可是对于那么从Collection接口中继承而来的批量操做方法,好比addAll(Collection e)等方法,BlockingQueue的实现一般没有保证其具备原子性,所以咱们在使用的BlockingQueue,应该尽量地不去使用这些方法。
  2. BlockingQueue主要应用于生产者与消费者的模型中,其元素的添加和获取都是极具规律性的。可是对于remove(Object o)这样的方法,虽然BlockingQueue能够保证元素正确的删除,可是这样的操做会很是响应性能,所以咱们在没有特殊的状况下,也应该避免使用这类方法。

2. ArrayBlockingQueue深刻分析

有了上面的铺垫,下面咱们就能够真正开始分析ArrayBlockingQueue了。在分析以前,首先让咱们看看API对其的描述。
注意:这里使用的JDK版本为1.7,不一样的JDK版本在实现上存在不一样线程

ArrayBlockingQueueAPI说明.png

首先让咱们看下ArrayBlockingQueue的核心组成:

/** 底层维护队列元素的数组 */
    final Object[] items;

    /**  当读取元素时数组的下标(这里称为读下标) */
    int takeIndex;

    /** 添加元素时数组的下标 (这里称为写小标)*/
    int putIndex;

    /** 队列中的元素个数 */
    int count;

    /**用于并发控制的工具类**/
    final ReentrantLock lock;

    /** 控制take操做时是否让线程等待 */
    private final Condition notEmpty;

    /** 控制put操做时是否让线程等待 */
    private final Condition notFull;

take方法分析(369-379行):

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        /*
            尝试获取锁,若是此时锁被其余线程锁占用,那么当前线程就处于Waiting的状态。           
            注意:当方法是支持线程中断响应的若是其余线程此时中断当前线程,
            那么当前线程就会抛出InterruptedException 
         */
        lock.lockInterruptibly();
        try {
            /*
          若是此时队列中的元素个数为0,那么就让当前线程wait,而且释放锁。
          注意:这里使用了while进行重复检查,是为了防止当前线程可能因为  其余未知的缘由被唤醒。
          (一般这种状况被称为"spurious wakeup")
            */    
        while (count == 0)
                notEmpty.await();
            //若是队列不为空,则从队列的头部取元素
            return extract();
        } finally {
             //完成锁的释放
            lock.unlock();
        }
    }

extract方法分析(163-171):

/*
      根据takeIndex来获取当前的元素,而后通知其余等待的线程。
      Call only when holding lock.(只有当前线程已经持有了锁以后,它才能调用该方法)
     */
    private E extract() {
        final Object[] items = this.items;

        //根据takeIndex获取元素,由于元素是一个Object类型的数组,所以它经过cast方法将其转换成泛型。
        E x = this.<E>cast(items[takeIndex]);

        //将当前位置的元素设置为null
        items[takeIndex] = null;

        //而且将takeIndex++,注意:这里由于已经使用了锁,所以inc方法中没有使用到原子操做
        takeIndex = inc(takeIndex);
      
        //将队列中的总的元素减1
        --count;
        //唤醒其余等待的线程
        notFull.signal();
        return x;
    }

put方法分析(318-239)

public void put(E e) throws InterruptedException {
        //首先检查元素是否为空,不然抛出NullPointerException
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        //进行锁的抢占
        lock.lockInterruptibly();
        try {
            /*当队列的长度等于数组的长度,此时说明队列已经满了,这里一样
              使用了while来方式当前线程被"伪唤醒"。*/
            while (count == items.length)
                //则让当前线程处于等待状态
                notFull.await();
            //一旦获取到锁而且队列还未满时,则执行insert操做。
            insert(e);
        } finally {
            //完成锁的释放
            lock.unlock();
        }
    }

      //检查元素是否为空
     private static void checkNotNull(Object v) {
        if (v == null)
            throw new NullPointerException();
      }

     //该方法的逻辑很是简单
    private void insert(E x) {
        //将当前元素设置到putIndex位置   
        items[putIndex] = x;
        //让putIndex++
        putIndex = inc(putIndex);
        //将队列的大小加1
        ++count;
        //唤醒其余正在处于等待状态的线程
        notEmpty.signal();
    }

注:ArrayBlockingQueue实际上是一个循环队列
咱们使用一个图来简单说明一下:

黄色表示数组中有元素

1-1.png

 

当再一次执行put的时候,其结果为:

1-2.png

此时放入的元素会从头开始置,咱们经过其incr方法更加清晰的看出其底层的操做:

/**
     * Circularly increment i.
     */
    final int inc(int i) {
        //当takeIndex的值等于数组的长度时,就会从新置为0,这个一个循环递增的过程
        return (++i == items.length) ? 0 : i;
    }

至此,ArrayBlockingQueue的核心部分就分析完了,其他的队列操做基本上都是换汤不换药的,此处再也不一一列举。

做者:码农一枚 连接:https://www.jianshu.com/p/9a652250e0d1 來源:简书 著做权归做者全部。商业转载请联系做者得到受权,非商业转载请注明出处。

相关文章
相关标签/搜索