JAVA concurrency -- 阻塞队列ArrayBlockingQueue源码详解

概述

ArrayBlockingQueue顾名思义,使用数组实现的阻塞队列。今天咱们就来详细讲述下他的代码实现数组

阻塞队列

什么是阻塞队列?缓存

阻塞队列是一种特殊的队列,使用场景为并发环境下。在某种状况下(当线程没法获取锁的时候)线程会被挂起而且在队列中等待,若是条件具有(锁被释放)那么就会唤醒挂起的线程。并发

通俗点来说的话,阻塞队列相似于理发店的等待区,当没有理发师空闲的时候,客人会在等待区等待,一旦有了空闲,就会有人自动递补。app

类的继承关系

ArrayBlockingQueue继承了抽象队列,而且实现了阻塞队列,所以它具有队列的全部基本特性。ide

基本实现原理

ArrayBlockingQueue的实现是基于ReentrantLock以及AQS内部实现的锁机制以及Condition机制。 ArrayBlockingQueue内部声明了两个Condition变量,一个叫notEmpty,一个叫notFull,当有数据加入队列时尝试唤醒notEmpty,当有数据移除队列时则唤醒notFull,从而实现一个相似于生产者消费者模型的机制。函数

源码分析

类成员变量

    // 队列的存储对象数组
    final Object[] items;    // 下一个取出的序号
    int takeIndex;    // 下一个放入队列的序号
    int putIndex;    // 队列中的元素数目
    int count;    // 锁以及用来控制队列的两个条件变量
    final ReentrantLock lock;    private final Condition notEmpty;    private final Condition notFull;    transient Itrs itrs = null;

构造函数

    public ArrayBlockingQueue(int capacity) {        this(capacity, false);
    }    
    // 通用的构造函数,以容量和是否公平锁为参数,余下两个构造函数均调用此函数
    public ArrayBlockingQueue(int capacity, boolean fair) {        if (capacity <= 0)            throw new IllegalArgumentException();        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {        // 调用构造函数
        this(capacity, fair);        // 为阻塞队列初始化数据(此操做须要上锁)
        final ReentrantLock lock = this.lock;
        lock.lock();        try {            int i = 0;            try {                // 将集合中的数据存放到数组中而且进行判空操做
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {                throw new IllegalArgumentException();
            }            // 修改count和putIndex的值
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

这里有一点疑问,这里明明是构造函数,是类初始化的地方,照理来讲不会产生竞争,为何要进行加锁操做呢?此处本来有一句原版的注释 Lock only for visibility, not mutual exclusion 锁是为了可见性而不是互斥。这句话怎么理解呢?咱们仔细观察代码,发现当咱们把集合中的数据所有插入队列中以后,咱们会修改相应的count以及putIndex的数值,可是若是咱们没有加锁,那么在集合插入完成前count以及putIndex没有完成初始化操做的时候若是有其余线程进行了插入等操做的话,会形成数据同步问题从而使得数据不许确,所以这里的锁是必要的。源码分析

队列操做

基础队列操做enqueue和dequeue

    // 队列的插入操做
    private void enqueue(E x) {        // 本地声明一个item数组的引用
        final Object[] items = this.items;        // 将元素放入数组中
        items[putIndex] = x;        // 若是此时已经到了数组的末尾了,将putIndex重置为0
        if (++putIndex == items.length)
            putIndex = 0;        // 元素数目加1
        count++;        // 发出通知告诉全部取数据的线程能够取数据
        notEmpty.signal();
    }    // 队列的移除操做
    private E dequeue() {        final Object[] items = this.items;        @SuppressWarnings("unchecked")        // 找到要移除的数据置空
        E x = (E) items[takeIndex];
        items[takeIndex] = null;        // 若是此时已经到了数组的末尾了,将takeIndex重置为0
        if (++takeIndex == items.length)
            takeIndex = 0;        // 元素数目减1
        count--;        // 迭代器操做,这个以后再说
        if (itrs != null)
            itrs.elementDequeued();        // 发出通知告知插入线程能够工做
        notFull.signal();        return x;
    }

这两个方法是队列操做的基本方法,基本上就是常规的数组数据插入移除,只是有一点很让人困惑 final Object[] items = this.items; 这段代码实现将类成员对象在本地建立了一个引用,而后在本地使用引用进行操做,为何要画蛇添足呢?除此以外,代码中大量用到了这种手法,例如: final ReentrantLock lock = this.lock; 这又是为了什么呢?对此笔者猜想多是和优化相关,由于jdk7中的实现与之不一样,是使用的类变量直接操做。在进行了资料查阅后,笔者找到了一个相对靠谱的解释:郑州市不孕不育医院:http://jbk.39.net/yiyuanfengcai/tsyl_zztjyy/3033/优化

这是ArrayBlockingQueue的做者Doug Lea的习惯,他认为这种书写习惯是对机器更加友好的书写this

固然也有一些大神有一些其余的解释:spa

final自己是不可变的,可是因为反射以及序列化操做的存在,final的不可变性就变得捉摸不定,除此以外一些编译器层面上在final上优化的不够好,致使会在使用到数据的时候反复重载致使缓存失效

但愿你们能够本身认真思考下,而后尝试下,获得本身的结论。

阻塞队列的插入操做

    public boolean offer(E e) {
        checkNotNull(e);        final ReentrantLock lock = this.lock;
        lock.lock();        try {            // 若是阻塞队列已满,那么插入失败
            if (count == items.length)                return false;            else {                // 不然插入成功
                enqueue(e);                return true;
            }
        } finally {
            lock.unlock();
        }
    }    public void put(E e) throws InterruptedException {
        checkNotNull(e);        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();        try {            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }    public boolean add(E e) {        if (offer(e))            return true;        else
            throw new IllegalStateException("Queue full");
    }

阻塞队列插入操做大体就以上几种,这几种的区别在代码中也体现得比较清楚了:

  1. offer返回的是布尔值,插入成功返回true不然(队列已满)返回false

  2. put没有返回值,假如队列是满的,他会一直阻塞直到队列为空的时候执行插入操做

  3. add实际上调用的就是offer,只是他在加入失败后会抛出异常

阻塞队列的移除操做

    public E poll() {        final ReentrantLock lock = this.lock;
        lock.lock();        try {            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }    public E take() throws InterruptedException {        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();        try {            while (count == 0)
                notEmpty.await();            return dequeue();
        } finally {
            lock.unlock();
        }
    }    public E peek() {        final ReentrantLock lock = this.lock;
        lock.lock();        try {            return itemAt(takeIndex);
        } finally {
            lock.unlock();
        }
    }
  1. poll执行成功会返回队列元素,若是队列为空则直接返回null

  2. take执行成功会返回队列元素,可是若是队列为空他不会返回而是等待有数据插入,而后取出

  3. peek则是直接获取队列元素,而且执行后不会将元素从队列中删除

迭代器实现

因为迭代器和内部队列共享数据,再加上阻塞队列的特性,致使为了实现迭代器功能,须要新增一些很复杂的代码实现。

内部声明了两个类来实现迭代器,一个是Itr继承Iterator<E>,一个则是Itrs

Itrs

Itrs是用来管理迭代器的。因为阻塞队列内部可能会有多个迭代器在同时工做,在迭代器内部发生删除或者是一些不常见的操做时可能会产生一些问题,好比他们会丢失本身的数据之类的。因此Itrs内部会维护一个变量用于记录循环的圈数,而且在删除操做removeAt的时候会通知全部的迭代器。

    class Itrs {        // 建立一个Node类做为单向链表(节点是弱引用)来管理迭代器
        private class Node extends WeakReference<Itr> {
            Node next;

            Node(Itr iterator, Node next) {                super(iterator);                this.next = next;
            }
        }        // 循环圈数
        int cycles = 0;        // 链表头
        private Node head;        // 清理相关的变量
        private Node sweeper = null;        private static final int SHORT_SWEEP_PROBES = 4;        private static final int LONG_SWEEP_PROBES = 16;

        Itrs(Itr initial) {
            register(initial);
        }        // 清理无效的迭代器(若是sweeper为空,则从头开始,不然从sweeper记录的节点开始)
        void doSomeSweeping(boolean tryHarder) {
            
        }        // 新增长一个迭代器
        void register(Itr itr) {
            head = new Node(itr, head);
        }        // 当takeIndex为0时调用此方法
        void takeIndexWrapped() {            // cycle数+1,内部实现通知全部迭代器并进行清理(链表遍历)
        }        // 有移除操做的时候调用此方法,并通知全部迭代器进行清理
        void removedAt(int removedIndex) {            // 简单的链表遍历,内部调用Itr的removedAt方法
        }        // 当发现队列为空的时候调用此方法,清理迭代器内的弱引用
        void queueIsEmpty() {
            
        }        // 有元素被取时是调用
        void elementDequeued() {            // 若是数组为空调用queueIsEmpty进行清理
            if (count == 0)
                queueIsEmpty();            // 若是takeIndex为0,调用takeIndexWrapped,来进行循环+1操做
            else if (takeIndex == 0)
                takeIndexWrapped();
        }
    }

Itr

Itrs是管理迭代器的,Itr则是迭代器的具体实现

    private class Itr implements Iterator<E> {        // 游标,用于寻找下一个元素
        private int cursor;        // 下一个元素
        private E nextItem;        // 下一个元素的下标
        private int nextIndex;        // 上一个元素
        private E lastItem;        // 上一个元素的下标
        private int lastRet;        // 上一个take的下标
        private int prevTakeIndex;        // 上一个循环
        private int prevCycles;        // 标记为空
        private static final int NONE = -1;        // 删除标记
        private static final int REMOVED = -2;        // DETACH标记专用于prevTakeIndex
        private static final int DETACHED = -3;

        Itr() {            // 这是构造函数,内部实现主要是初始化为主,
            // 而且在Itrs不为空的时候进行一波清理操做
        }        boolean isDetached() {            return prevTakeIndex < 0;
        }        private int incCursor(int index) {            // 游标+1,并从新计算值(判断是否走完一个循环,是否等于putIndex)
            if (++index == items.length)
                index = 0;            if (index == putIndex)
                index = NONE;            return index;
        }        // 判断给的删除数是不是有效值
        private boolean invalidated(int index, int prevTakeIndex,                                    long dequeues, int length) {
            
        }        // 计算在迭代器的上一次操做后全部的删除(出队)操做
        private void incorporateDequeues() {            // 主要方法为经过当前圈数和以前的圈数以及偏移量计算
            // 真实的删除数,而且和prevTakeIndex以及index的偏移量进行比较
        }        // 进行detach操做并进行清理
        private void detach() {
            
        }        // 判断是否有下一个节点
        public boolean hasNext() {
            
        }        // 没有下一个节点(没有detach的节点将会被执行detach操做)
        private void noNext() {
            
        }        // 找到下个节点
        public E next() {            // 实现不复杂,主要是须要判断节点是不是detach模式
        }        // 删除节点
        public void remove() {
            
        }        // 当队列为空或者后续很难找到下个节点的时候通知迭代器
        void shutdown() {
            
        }        // 辅助计算游标和prevTakeIndex之间的距离
        private int distance(int index, int prevTakeIndex, int length) {
            
        }        // 删除节点
        boolean removedAt(int removedIndex) {
            
        }        // 当takeIndex归0时调用
        boolean takeIndexWrapped() {
            
        }
    }

总结

ArrayBlockingQueue的实现能够说是比较的简单清晰,主要是利用了ReentrantLock内部的Condition,经过设置两个条件来巧妙地完成阻塞队列的实现,只要可以理解这两个条件的工做原理,源码的理解就没有太大的难度。ArrayBlockingQueue较难理解的反而是它内部的迭代器,因为阻塞队列的特性,他的迭代器可能会有丢失当前数据的风险,所以,做者创做的时候加入了许多复杂的方法来保证可靠性,可是在这里因为篇幅限制,以及迭代器在阻塞队列中的地位和重要性并不高,因此简单讲述,若是有兴趣能够本身找一份源码阅读。郑州不孕不育医院:http://mobile.03913882333.com/

相关文章
相关标签/搜索