ArrayBlockQueue源码解析

清明节和朋友去被抖音带火的一个餐厅,下午两点钟取晚上的号,前面已经有十几桌了,四点半餐厅开始正式营业,等轮到咱们已经近八点了。餐厅分为几个区域,只有最火的区域(在小船上)须要排号,其余区域基本上是随到随吃的,最冷清的区域几乎都没什么人。菜的价格异常的贵,味道也并很差。最后送出两张图: 数组

image.png
image.png

好了,进入今天的正题,今天要讲的是ArrayBlockQueue,ArrayBlockQueue是JUC提供的线程安全的有界的阻塞队列,一看到Array,第一反应:这货确定和数组有关,既然是数组,那天然是有界的了,咱们先来看看ArrayBlockQueue的基本使用方法,而后再看看ArrayBlockQueue的源码。安全

ArrayBlockQueue基本使用

public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<Integer> arrayBlockingQueue=new ArrayBlockingQueue(5);
        arrayBlockingQueue.offer(10);
        arrayBlockingQueue.offer(50);
        arrayBlockingQueue.add(20);
        arrayBlockingQueue.add(60);
        System.out.println(arrayBlockingQueue);

        System.out.println(arrayBlockingQueue.poll());
        System.out.println(arrayBlockingQueue);

        System.out.println(arrayBlockingQueue.take());
        System.out.println(arrayBlockingQueue);

        System.out.println(arrayBlockingQueue.peek());
        System.out.println(arrayBlockingQueue);
    }
复制代码

运行结果: bash

image.png

  1. 建立了一个长度为5的ArrayBlockQueue。
  2. 用offer方法,向ArrayBlockQueue添加了两个元素,分别是10,50。
  3. 用put方法,向ArrayBlockQueue添加了两个元素,分别是20,60。
  4. 打印出ArrayBlockQueue,结果是10,50,20,60。
  5. 用poll方法,弹出ArrayBlockQueue第一个元素,而且打印出来:10。
  6. 打印出ArrayBlockQueue,结果是50,20,60。
  7. 用take方法,弹出ArrayBlockQueue第一个元素,而且打印出来:50。
  8. 打印出ArrayBlockQueue,结果是20,60。
  9. 用peek方法,弹出ArrayBlockQueue第一个元素,而且打印出来:20。
  10. 打印出ArrayBlockQueue,结果是20,60。

代码比较简单,可是你确定会有疑问性能

  • offer/add(在上面的代码中没有演示)/put都是往队列里面添加元素,区别是什么?
  • poll/take/peek都是弹出队列的元素,区别是什么?
  • 底层代码是如何保证线程安全的?
  • 数据保存在哪里?

要解决上面几个疑问,最好的办法固然是看下源码,经过亲自阅读源码所产生的印象远远要比看视频,看博客,死记硬背最后的结论要深入的多。就算真的忘记了,只要再看看源码,瞬间能够回忆起来。ui

ArrayBlockQueue源码解析

构造方法

ArrayBlockQueue提供了三个构造方法,以下图所示: this

image.png

ArrayBlockingQueue(int capacity)
public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
复制代码

这是最经常使用的构造方法,传入capacity,capacity是容量的意思,也就是ArrayBlockingQueue的最大长度,方法内部直接调用了第二个构造方法,传入的第二个参数为false。spa

ArrayBlockingQueue(int capacity, boolean fair)
public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
复制代码

这个构造方法接受两个参数,分别是capacity和fair,fair是boolean类型的,表明是公平锁,仍是非公平锁,能够看出若是咱们用第一个构造方法来建立ArrayBlockingQueue的话,采用的是非公平锁,由于公平锁会损失必定的性能,在没有充足的理由的状况下,是没有必要采用公平锁的。线程

方法内部作了几件事情:3d

  1. 建立Object类型的数组,容量为capacity,而且赋值给当前类对象的items。
  2. 建立排他锁。
  3. 建立条件变量notEmpty 。
  4. 建立条件变量notFull。

至于排他锁和两个条件变量是作什么用的,看到后面就明白了。code

ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c)
public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        //调用第二个构造方法,方法内部就是初始化数组,排他锁,两个条件变量
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // 开启排他锁
        try {
            int i = 0;
            try {
                // 循环传入的集合,把集合中的元素赋值给items数组,其中i会自增
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;//把i赋值给count 
            //若是i==capacity,也就是到了最大容量,把0赋值给putIndex,不然把i赋值给putIndex
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();//释放排他锁
        }
    }
复制代码
  1. 调用第二个构造方法,方法内部就是初始化数组items,排他锁lock,以及两个条件变量。
  2. 开启排他锁。
  3. 循环传入的集合,将集合中的元素赋值给items数组,其中i会自增。
  4. 把i赋值给count。
  5. 若是i==capacity,说明到了最大的容量,就把0赋值给putIndex,不然把i赋值给putIndex。
  6. 在finally中释放排他锁。

看到这里,咱们应该明白这个构造方法的做用是什么了,就是把传入的集合做为ArrayBlockingQueuede初始化数据,可是咱们又会有一个新的疑问:count,putIndex 是作什么用的。

offer(E e)

public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();//开启排他锁
        try {
            if (count == items.length)//若是count==items.length,返回false
                return false;
            else {
                enqueue(e);//入队
                return true;//返回true
            }
        } finally {
            lock.unlock();//释放锁
        }
    }
复制代码
  1. 开启排他锁。
  2. 若是count==items.length,也就是到了最大的容量,返回false。
  3. 若是count<items.length,执行入队方法,而且返回true。
  4. 释放排他锁。

看到这里,咱们应该能够明白了,ArrayBlockQueue是如何保证线程安全的,仍是利用了ReentrantLock排他锁,count就是用来保存数组的当前大小的。咱们再来看看enqueue方法。

private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }
复制代码

这方法比较简单,在代码里面就不写注释了,作了以下的操做:

  1. 把x赋值给items[putIndex] 。
  2. 将putIndex进行自增,若是自增后的值 == items.length,把0赋值给putIndex 。
  3. 执行count++操做。
  4. 调用条件变量notEmpty的signal方法,说明在某个地方,一定调用了notEmpty的await方法,这里就是唤醒由于调用notEmpty的await方法而被阻塞的线程。

这里就解答了一个疑问:putIndex是作什么的,就是入队元素的下标。

add(E e)

public boolean add(E e) {
        return super.add(e);
    }
复制代码
public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
复制代码

这个方法内部最终仍是调用的offer方法。

put(E e)

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();//开启响应中断的排他锁
        try {
            while (count == items.length)//若是队列满了,调用notFull的await
                notFull.await();
            enqueue(e);//入队
        } finally {
            lock.unlock();//释放排他锁
        }
    }
复制代码
  1. 开启响应中断的排他锁,若是在获取锁的过程当中,当前的线程被中断,会抛出异常。
  2. 若是队列满了,调用notFull的await方法,说明在某个地方,一定调用了notFull的signal方法来唤醒当前线程,这里用while循环是为了防止虚假唤醒。
  3. 执行入队操做。
  4. 释放排他锁。

能够看到put方法和 offer/add方法的区别了:

  • offer/add:若是队列满了,直接返回false。
  • put:若是队列满了,当前线程被阻塞,等待唤醒。

poll()

public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }
复制代码
  1. 开启排他锁。
  2. 若是count==0,直接返回null,不然执行dequeue出队操做。
  3. 释放排他锁。

咱们来看dequeue方法:

private E dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];//得到元素的值
        items[takeIndex] = null;//把null赋值给items[takeIndex] 
        if (++takeIndex == items.length)//若是takeIndex自增后的值== items.length,就把0赋值给takeIndex
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();//唤醒由于调用notFull的await方法而被阻塞的线程
        return x;
    }
复制代码
  1. 获取元素的值,takeIndex保存的是出队的下标。
  2. 把null赋值给items[takeIndex],也就是清空被弹出的元素。
  3. 若是takeIndex自增后的值== items.length,就把0赋值给takeIndex。
  4. count--。
  5. 唤醒由于调用notFull的await方法而被阻塞的线程。

这里调用了notFull的signal方法来唤醒由于调用notFull的await方法而被阻塞的线程,那到底在哪里调用了notFull的await方法呢,还记不记得在put方法中调用了notFull的await方法,咱们再看看:

while (count == items.length)
                notFull.await();
复制代码

当队列满了,就调用 notFull.await()来等待,在出队操做中,又调用了notFull.signal()来唤醒。

take()

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
复制代码
  1. 开启排他锁。
  2. 若是count==0,表明队列是空的,则调用notEmpty的await方法,用while循环是为了防止虚假唤醒。
  3. 执行出队操做。
  4. 释放排他锁。

这里调用了notEmpty的await方法,那么哪里调用了notEmpty的signal方法呢?在enqueue入队方法里。

咱们能够看到take和poll的区别:

  • take:若是队列为空,会阻塞,直到被唤醒了。
  • poll: 若是队列为空,直接返回null。

peek()

public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return itemAt(takeIndex); 
        } finally {
            lock.unlock();
        }
    }
复制代码
final E itemAt(int i) {
        return (E) items[i];
    }
复制代码
  1. 开启排他锁。
  2. 得到元素。
  3. 释放排他锁。

咱们能够看到peek和poll/take的区别:

  • peek,只是获取元素,不会清空元素。
  • poll/take,获取并清空元素。

size()

public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
复制代码
  1. 开启排他锁。
  2. 返回count。
  3. 释放排他锁。

总结

至此,ArrayBlockQueue的核心源码就分析完毕了,咱们来作一个总结:

  • ArrayBlockQueue有几个比较重要的字段,分别是items,保存的是队列的数据,putIndex保存的是入队的下标,takeIndex保存的是出队的下标,count用来统计队列元素的个数,lock用来保证线程的安全性,notEmpty和notFull两个条件变量实现唤醒和阻塞。
  • offer和add是同样的,其中add方法内部调用的就是offer方法,若是队列满了,直接返回false。
  • put,若是队列满了,会被阻塞。
  • peek,只是弹出元素,不会清空元素。
  • poll,弹出并清空元素,若是队列为空,直接返回null。
  • take,弹出并清空元素,若是队列为空,会被阻塞。
相关文章
相关标签/搜索