【并发队列】有界阻塞队列 ArrayBlockingQueue 源码解析

1、 前言

上节介绍了无界链表方式的阻塞队列LinkedBlockingQueue,本节来研究下有界使用数组方式实现的阻塞队列ArrayBlockingQueue。数组

主要实现原理: 经过ReentrantLock、Condition实现。利用Condition的signal、await实现。缓存

在队列满时, 若是在put会触发notFull.await(), 等待take取出操做后经过notFull.signal()唤醒put。并发

在队列空时, 若是在take会触发notEmpty.await(), 等待put放入操做后经过notEmpty.signal()唤醒take。函数

经过记录inputIndex记录下次要存放的位置,循环。this

2、 ArrayBlockingQueue类图结构


如图ArrayBlockingQueue内部有个数组items用来存放队列元素,putindex下标标示入队元素下标,takeIndex是出队下标,count统计队列元素个数,从定义可知道并无使用volatile修饰,这是由于访问这些变量使用都是在锁块内,并不存在可见性问题。另外有个独占锁lock用来对出入队操做加锁,这致使同时只有一个线程能够访问入队出队,另外notEmpty,notFull条件变量用来进行出入队的同步。spa

另外构造函数必须传入队列大小参数,因此为有界队列,默认是Lock为非公平锁。线程

1指针

2code

3队列

4

5

6

7

8

9

10

11

12

public ArrayBlockingQueue(int capacity) {

        this(capacity, false);

  }

 

    public ArrayBlockingQueue(int capacity, boolean fair) {

        if (capacity <= 0)

            throw new IllegalArgumentException();

        this.items = new Object[capacity];

        lock = new ReentrantLock(fair);

        notEmpty = lock.newCondition();

        notFull =  lock.newCondition();

    }

3、offer操做

在队尾插入元素,若是队列满则返回false,否者入队返回true。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

public boolean offer(E e) {

 

    //e为null,则抛出NullPointerException异常

    checkNotNull(e);

 

    //获取独占锁

    final ReentrantLock lock = this.lock;

    lock.lock();

    try {

        //若是队列满则返回false

        if (count == items.length)

            return false;

        else {

            //否者插入元素

            insert(e);

            return true;

        }

    } finally {

        //释放锁

        lock.unlock();

    }

}

 

 

private void insert(E x) {

 

    //元素入队

    items[putIndex] = x;

 

    //计算下一个元素应该存放的下标

    putIndex = inc(putIndex);

    ++count;

    notEmpty.signal();

}

 

//循环队列,计算下标

final int inc(int i) {

    return (++i == items.length) ? 0 : i;

}

这里因为在操做共享变量前加了锁,因此不存在内存不可见问题,加过锁后获取的共享变量都是从主内存获取的,而不是在CPU缓存或者寄存器里面的值,释放锁后修改的共享变量值会刷新会主内存中。

另外这个队列是使用循环数组实现,因此计算下一个元素存放下标时候有些特殊。另外insert后调用 notEmpty.signal();是为了激活调用notEmpty.await()阻塞后放入notEmpty条件队列中的线程。

4、put操做

在队列尾部添加元素,若是队列满则等待队列有空位置插入后返回

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

public void put(E e) throws InterruptedException {

    checkNotNull(e);

    final ReentrantLock lock = this.lock;

 

    //获取可被中断锁

    lock.lockInterruptibly();

    try {

 

        //若是队列满,则把当前线程放入notFull管理的条件队列

        while (count == items.length)

            notFull.await();

 

        //插入元素

        insert(e);

    } finally {

        lock.unlock();

    }

}

须要注意的是若是队列满了那么当前线程会阻塞,知道出队操做调用了notFull.signal方法激活该线程。

代码逻辑很简单,可是这里须要思考一个问题为啥调用lockInterruptibly方法而不是Lock方法。个人理解是由于调用了条件变量的await()方法,而await()方法会在中断标志设置后抛出InterruptedException异常后退出,因此还不如在加锁时候先看中断标志是否是被设置了,若是设置了直接抛出InterruptedException异常,就不用再去获取锁了。而后看了其余并发类里面凡是调用了await的方法获取锁时候都是使用的lockInterruptibly方法而不是Lock也验证了这个想法。

5、poll操做

从队头获取并移除元素,队列为空,则返回null。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

public E poll() {

    final ReentrantLock lock = this.lock;

    lock.lock();

    try {

        //当前队列为空则返回null,否者

        return (count == 0) ? null : extract();

    } finally {

        lock.unlock();

    }

}

 

private E extract() {

    final Object[] items = this.items;

 

    //获取元素值

    E x = this.<E>cast(items[takeIndex]);

 

    //数组中值值为null;

    items[takeIndex] = null;

 

    //队头指针计算,队列元素个数减一

    takeIndex = inc(takeIndex);

    --count;

 

    //发送信号激活notFull条件队列里面的线程

    notFull.signal();

    return x;

}

6、take操做

从队头获取元素,若是队列为空则阻塞直到队列有元素。

1

2

3

4

5

6

7

8

9

10

11

12

13

public E take() throws InterruptedException {

    final ReentrantLock lock = this.lock;

    lock.lockInterruptibly();

    try {

 

        //队列为空,则等待,直到队列有元素

        while (count == 0)

            notEmpty.await();

        return extract();

    } finally {

        lock.unlock();

    }

}

须要注意的是若是队列为空,当前线程会被挂起放到notEmpty的条件队列里面,直到入队操做执行调用notEmpty.signal后当前线程才会被激活,await才会返回。

7、peek操做

返回队列头元素但不移除该元素,队列为空,返回null

1

2

3

4

5

6

7

8

9

10

11

12

13

14

public E peek() {

    final ReentrantLock lock = this.lock;

    lock.lock();

    try {

        //队列为空返回null,否者返回头元素

        return (count == 0) ? null : itemAt(takeIndex);

    } finally {

        lock.unlock();

    }

}

 

final E itemAt(int i) {

    return this.<E>cast(items[i]);

}

8、 size操做

获取队列元素个数,很是精确由于计算size时候加了独占锁,其余线程不能入队或者出队或者删除元素

1

2

3

4

5

6

7

8

9

public int size() {

    final ReentrantLock lock = this.lock;

    lock.lock();

    try {

        return count;

    } finally {

        lock.unlock();

    }

}

9、总结

ArrayBlockingQueue经过使用全局独占锁实现同时只能有一个线程进行入队或者出队操做,这个锁的粒度比较大,有点相似在方法上添加synchronized的意味。其中offer,poll操做经过简单的加锁进行入队出队操做,而put,take则使用了条件变量实现若是队列满则等待,若是队列空则等待,而后分别在出队和入队操做中发送信号激活等待线程实现同步。另外相比LinkedBlockingQueue,ArrayBlockingQueue的size操做的结果是精确的,由于计算前加了全局锁。

相关文章
相关标签/搜索