Java并发编程----ArrayBlockingQueue源码分析

1、什么是阻塞队列?

刚一听到阻塞队列,就以为它很是地高大上,很是地难!其实否则!为何?由于当你有一点基本的数据结构基础再看阻塞队列的定义以后你就会发现就那么回事。好了,言归正传,队列?无非就是一种具备先进先出(FIFO)特性的数据结构嘛!其最基本的操做是入队出队java

那上面是阻塞队列呢?咱们来看下关于它的一番定义:算法

阻塞队列(BlockingQueue)是一个支持两个附加操做的一种特殊队列。这两个附加的操做是:编程

  • 在队列为空时,获取元素的线程会等待队列变为非空。
  • 当队列满时,存储元素的线程会等待队列可用。

阻塞队列常常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素,所谓容器,就是咱们以前文章讲到的临界区,是为了将生产者和消费者进行解耦而加入的。数组

那么咱们就要开始问了,他的基本操做是怎样的呢?怎么实现队列的阻塞呢?下面是Java中阻塞队列支持的相关操做:安全

方法 抛出异常 返回特殊值 一直阻塞 超时退出
入队方法 add(e) offer(e) put(e) offer(e,time,unit)
出队方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() 不可用 不可用

从上表咱们能够看出put()take()方法当队列满或为空的状况下会一直阻塞,阻塞队列会提供对这两个操做的支持。bash

接下来我将列出 JDK 中对阻塞队列的相关实现,并见到那挑选其中的某个实现进行源码分析。数据结构

JDK中阻塞队列有如下实现:多线程

ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
DelayQueue:一个使用优先级队列实现的无界阻塞队列。
SynchronousQueue:一个不存储元素的阻塞队列。
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
复制代码

对了,小编说说我对有界和无界的理解,我也不清楚对仍是不对,不对的话麻烦你评论告诉小编,灰常感谢!并发

从实现方面讲:框架

有界 : 指的是实现里头持有的资源(数组)是有大小的,即容量是有限的

无界 : 指的是持有一个无界的链表

从访问方式看:

无界 : 指的是不拒绝某些线程的访问

有界 : 指的是拒绝某些线程的访问

2、JDK阻塞队列简单分析

ArrayBlockingQueue.java

对于阻塞队列的学习,咱们要时常在脑子里模拟并发对其操做的场景。让咱们先来看看ArrayBlockingQueue中声明的相关成员变量:

/** 存放队列元素的数组 */
    final Object[] items;

    /** 下一次调用 take, poll, peek 或者 remove 时元素的下标,队头指针 */
    int takeIndex;

    /** 下一次调用 put, offer, 或者 add 方法时元素的下标,队尾指针*/
    int putIndex;

    /** 队列元素的大小,至关于ArrayList中的size 
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.经典的双条件算法
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** 判断是否为空的条件变量,用来表示队列空与非空的状态 */
    private final Condition notEmpty;

    /** 判断是否满了条件变量 用来表示队列满或没满的状态 */
    private final Condition notFull;

    /**
     * 当前活动迭代器的共享状态,或者若是已知不存在,则返回null。 容许队列操做更新迭代器状态。
     * Shared state for currently active iterators, or null if there
     * are known not to be any.  Allows queue operations to update
     * iterator state.
     */
    //在迭代器和它们的队列之间共享数据,容许在删除元素时修改队列以更新迭代器
    transient Itrs itrs = null;
复制代码

对于存储数据元素的字段items,为何声明为Object[],而不声明为E[],能够查看小编另一篇文章Java 集合 ArrayList 源代码分析(带着问题看源码)

从上面能够看出,ArrayBlockingQueue拥有一个存储元素的数组items及其相关的出队入队指针及队列容量大小count,这些都是最基本的属性。再往下看能够看出队列中使用了经典的双条件算法,即拥有两个条件变量Condition类型的变量,Condition是JDK提供的在基本同步方法notify()、wait()、notifyAll()的基础上进行优化的工具类,它提供了代替wait(),notify()等方法的相应版本await()、signal()方法。通常来讲,Condition的使用通常结合一个锁来实现,ArrayBlockingQueue中使用了可重入锁ReentrantLock,即经典的一锁双条件

若是还不理解,想一想咱们在生产者和消费者文中代码里写的,在调用notify(),wait()等方法时必须先在synchnorized{}同步块下得到锁,道理是同样的,你调用await()、signal()的时候也须要进行lock()得到锁)

并且ConditionReentrantLock都是不可变的,final修饰,多线程安全啦!

知道了其成员变量,咱们再来看看其相应的构造方法:

/**
     * 使用给定的容量大小和默认的存取策略(FIFO)初始化一个ArrayBlockingQueue
     */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    /**
     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
     * capacity and the specified access policy.
     * 使用给定的容量大小和给定的存取策略初始化一个ArrayBlockingQueue
     * @param capacity the capacity of this queue
     * @param fair if {@code true} then queue accesses for threads blocked
     *        on insertion or removal, are processed in FIFO order;
     *        if {@code false} the access order is unspecified.(也就是下一步得到锁的还不指定是谁)
     * @throws IllegalArgumentException if {@code capacity < 1}
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        /**
         * 你容量不能为负数吧!
         */
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        //你初始化ArrayBlockingQueue的时候也要初始化你的成员变量吧!一所双条件很重要啊!
        lock = new ReentrantLock(fair);
        //Condition对象由锁来进行建立
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    /**
     * 经过给定的容量大小、存取策略,使用给定的Collection来初始化数据
     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
     * capacity, the specified access policy and initially containing the
     * elements of the given collection,
     * added in traversal order of the collection's iterator. */ public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { /** * 这一步同上 */ this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // 这里只是为了可见性,而不是指相互排斥,想一想你刚建立这个对象,哪有什么互相排斥嘛! try { int i = 0; try { for (E e : c) { /** * 先检查后操做机制,是一种很好的编程规范 */ checkNotNull(e); items[i++] = e; } /** * 通常来讲,c的大小是小于等于capacity的,不然报错了 */ } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } } 复制代码

从其构造器能够看出,咱们能够在初始化时指定一个容量大小,也能够经过传入一个Collection来初始化数据。同时咱们也能够看出,在最后一个构造方法中使用了checkNotNull()方法,其实这是一种颇有用的机制,优秀的框架通常都会这样子去写,好比Spring的Asserts.java,这也是一种断言机制,就是说咱们很肯定程序到达这一步必定是正确的,固然,若是不正确,那么确定抛出异常啦!下面咱们看看各类入队和出队的操做吧!

入队

  • put()
/**
     * 典型的生产者嘛!插入一个元素到尾部,一直等到(阻塞)直到已经满的队列变为非满状态
     */
    public void put(E e) throws InterruptedException {
        /**
         * 先检查后操做,若是是空,就抛出异常
         */
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        /**
         * 得到lock的锁,除非当前线程被中断了,也就是说当前的线程若是被中断,咱们连锁都得不到,还抛出可怕的异常
         */
        lock.lockInterruptibly();
        try {
            /**
             * 若是队列满了,确定得阻塞嘛!难道满了还加?还阻不阻塞了
             */
            while (count == items.length)
            /**
             * 当调用await方法后,当前线程会释放lock锁并进入Condition变量的等待队列,而其余线程调用signal方法后,通知正在Condition变量等待队列的线程从await方法返回,而且在返回前已经得到了锁。
             */
                notFull.await();
            //若是不满,那么就入队
            enqueue(e);
        } finally {
            lock.unlock();//解锁
        }
    }
复制代码

put()方法很显然就是典型生产者消费者模型中的生产者角色。只不过当满了的时候是经过调用await()的方法阻塞当前线程且释放锁,被阻塞的当前线程将进入Condition对象提供的等待队列中去排队,直到有元素从阻塞队列出队时,会调用notFull.signal()唤醒线程。

take()

public E take() throws InterruptedException {
        /**
         * 先加锁
         */
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            /**
             * 若是队列是空,那取个啥子,直接阻塞
             */
            while (count == 0)
                notEmpty.await();
                //出队
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
复制代码

能够看出出队的操做很是地简单粗暴,下面咱们再看看两个经常使用的内部方法:

enqueue()

/**
     * 真正的入队操做,能执行到此方法,说明你已经得到锁了,且当前线程符合生产者消费者模型的要求(即put时未满,take时非空)
     */
    private void enqueue(E x) {
        final Object[] items = this.items;
        //还记得putIndex指的是什么吗?指的就是下一个能够入队的元素下标
        items[putIndex] = x;
        //改变相应的下标,这可能一眼看不懂,须要画图,实际上是一个循环队列来着
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        //通知阻塞的线程前来消费
        notEmpty.signal();
    }
复制代码

dequeue()

/**
     * 真正的出队操做,能执行到此方法,说明你已经得到锁了,且当前线程符合生产者消费者模型的要求(即put时未满,take时非空)
     */
    private E dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        //置为null
        items[takeIndex] = null;
        //改变相应的下标,这可能一眼看不懂,须要画图,实际上是一个循环队列来着
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        //防止出队致使其余线程迭代失败的操做
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }
复制代码

好了,源码分析就到这里啦!

相关文章
相关标签/搜索