并发编程(八)—— Java 并发队列 BlockingQueue 实现之 ArrayBlockingQueue 源码分析

开篇先介绍下 BlockingQueue 这个接口的规则,后面再看其实现。html

阻塞队列概要

阻塞队列与咱们日常接触的普通队列(LinkedList或ArrayList等)的最大不一样点,在于阻塞队列的阻塞添加和阻塞删除方法。java

阻塞添加
所谓的阻塞添加是指当阻塞队列元素已满时,队列会阻塞加入元素的线程,直队列元素不满时才从新唤醒线程执行元素加入操做。数组

阻塞删除
阻塞删除是指在队列元素为空时,删除队列元素的线程将被阻塞,直到队列不为空再执行删除操做(通常都会返回被删除的元素)。缓存

 

因为Java中的阻塞队列接口BlockingQueue继承自Queue接口,所以先来看看阻塞队列接口为咱们提供的主要方法app

 1 public interface BlockingQueue<E> extends Queue<E> {  2 
 3     //将指定的元素插入到此队列的尾部(若是当即可行且不会超过该队列的容量)  4     //在成功时返回 true,若是此队列已满,则抛IllegalStateException。 
 5     boolean add(E e);  6 
 7     //将指定的元素插入到此队列的尾部(若是当即可行且不会超过该队列的容量)  8     // 将指定的元素插入此队列的尾部,若是该队列已满,  9     //则在到达指定的等待时间以前等待可用的空间,该方法可中断 
10     boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; 11 
12     //将指定的元素插入此队列的尾部,若是该队列已满,则一直等到(阻塞)。 
13     void put(E e) throws InterruptedException; 14 
15     //获取并移除此队列的头部,若是没有元素则等待(阻塞), 16     //直到有元素将唤醒等待线程执行该操做 
17     E take() throws InterruptedException; 18 
19     //获取并移除此队列的头部,在指定的等待时间前一直等到获取元素, //超过期间方法将结束
20     E poll(long timeout, TimeUnit unit) throws InterruptedException; 21 
22     //今后队列中移除指定元素的单个实例(若是存在)。 
23     boolean remove(Object o); 24 }

这里咱们把上述操做进行分类ide

插入方法:this

  add(E e) : 添加成功返回true,失败抛IllegalStateException异常
  offer(E e) : 成功返回 true,若是此队列已满,则返回 false。
  put(E e) :将元素插入此队列的尾部,若是该队列已满,则一直阻塞
删除方法:spa

  remove(Object o) :移除指定元素,成功返回true,失败返回false
  poll() : 获取并移除此队列的头元素,若队列为空,则返回 null
  take():获取并移除此队列头元素,若没有元素则一直阻塞。线程

阻塞队列的对元素的增删查操做主要就是上述的三类方法,一般状况下咱们都是经过这3类方法操做阻塞队列,了解完阻塞队列的基本方法后,下面咱们将分析阻塞队列中的两个实现类ArrayBlockingQueue和LinkedBlockingQueue的简单使用和实现原理,其中实现原理是这篇文章重点分析的内容。code

ArrayBlockingQueue

在看源码以前,经过查询API发现对ArrayBlockingQueue特色的简单介绍:

一、一个由数组支持的有界队列,此队列按FIFO(先进先出)原则对元素进行排序。
二、新元素插入到队列的尾部,队列获取操做则是从队列头部开始得到元素
三、这是一个简单的“有界缓存区”,一旦建立,就不能在增长其容量
四、在向已满队列中添加元素会致使操做阻塞,从空队列中提取元素也将致使阻塞
五、此类支持对等待的生产者线程和使用者线程进行排序的可选公平策略。默认状况下,不保证是这种排序的。然而经过将公平性(fairness)设置为true,而构造的队列容许按照FIFO顺序访问线程。公平性一般会下降吞吐量,但也减小了可变性和避免了“不平衡性”。

简单的来讲,ArrayBlockingQueue 是一个用数组实现的有界阻塞队列,其内部按先进先出的原则对元素进行排序,其中put方法和take方法为添加和删除的阻塞方法,下面咱们经过ArrayBlockingQueue队列实现一个生产者消费者的案例,经过该案例简单了解其使用方式

使用示例

Consumer 消费者和 Producer 生产者,经过ArrayBlockingQueue 队列获取和添加元素,其中消费者调用了take()方法获取元素当队列没有元素就阻塞,生产者调用put()方法添加元素,当队列满时就阻塞,经过这种方式便实现生产者消费者模式。比直接使用等待唤醒机制或者Condition条件队列来得更加简单。

 1 package com.zejian.concurrencys.Queue;  2 import java.util.concurrent.ArrayBlockingQueue;  3 import java.util.concurrent.TimeUnit;  4 
 5 /**
 6  * Created by chenhao on 2018/01/07  7  */
 8 public class ArrayBlockingQueueDemo {  9     private final static ArrayBlockingQueue<Apple> queue= new ArrayBlockingQueue<>(1); 10     public static void main(String[] args){ 11         new Thread(new Producer(queue)).start(); 12         new Thread(new Producer(queue)).start(); 13         new Thread(new Consumer(queue)).start(); 14         new Thread(new Consumer(queue)).start(); 15  } 16 } 17 
18  class Apple { 19     public Apple(){ 20  } 21  } 22 
23 /**
24  * 生产者线程 25  */
26 class Producer implements Runnable{ 27     private final ArrayBlockingQueue<Apple> mAbq; 28     Producer(ArrayBlockingQueue<Apple> arrayBlockingQueue){ 29         this.mAbq = arrayBlockingQueue; 30  } 31 
32  @Override 33     public void run() { 34         while (true) { 35  Produce(); 36  } 37  } 38 
39     private void Produce(){ 40         try { 41             Apple apple = new Apple(); 42  mAbq.put(apple); 43             System.out.println("生产:"+apple); 44         } catch (InterruptedException e) { 45  e.printStackTrace(); 46  } 47  } 48 } 49 
50 /**
51  * 消费者线程 52  */
53 class Consumer implements Runnable{ 54 
55     private ArrayBlockingQueue<Apple> mAbq; 56     Consumer(ArrayBlockingQueue<Apple> arrayBlockingQueue){ 57         this.mAbq = arrayBlockingQueue; 58  } 59 
60  @Override 61     public void run() { 62         while (true){ 63             try { 64                 TimeUnit.MILLISECONDS.sleep(1000); 65  comsume(); 66             } catch (InterruptedException e) { 67  e.printStackTrace(); 68  } 69  } 70  } 71 
72     private void comsume() throws InterruptedException { 73         Apple apple = mAbq.take(); 74         System.out.println("消费Apple="+apple); 75  } 76 }

输出:

1 生产:com.zejian.concurrencys.Queue.Apple@109967f 2 消费Apple=com.zejian.concurrencys.Queue.Apple@109967f 3 生产:com.zejian.concurrencys.Queue.Apple@269a77 4 生产:com.zejian.concurrencys.Queue.Apple@1ce746e 5 消费Apple=com.zejian.concurrencys.Queue.Apple@269a77 6 消费Apple=com.zejian.concurrencys.Queue.Apple@1ce746e 7 ........

源码剖析

ArrayBlockingQueue内部的阻塞队列是经过重入锁ReenterLock和Condition条件队列实现的,因此ArrayBlockingQueue中的元素存在公平访问与非公平访问的区别,对于公平访问队列,被阻塞的线程能够按照阻塞的前后顺序访问队列,即先阻塞的线程先访问队列。而非公平队列,当队列可用时,阻塞的线程将进入争夺访问资源的竞争中,也就是说谁先抢到谁就执行,没有固定的前后顺序。建立公平与非公平阻塞队列代码以下:

 1 //默认非公平阻塞队列
 2 ArrayBlockingQueue queue = new ArrayBlockingQueue(2);  3 //公平阻塞队列
 4 ArrayBlockingQueue queue1 = new ArrayBlockingQueue(2,true);  5 
 6 //构造方法源码
 7 public ArrayBlockingQueue(int capacity) {  8      this(capacity, false);  9  } 10 
11 public ArrayBlockingQueue(int capacity, boolean fair) { 12      if (capacity <= 0) 13          throw new IllegalArgumentException(); 14      this.items = new Object[capacity]; 15      lock = new ReentrantLock(fair); 16      notEmpty = lock.newCondition(); 17      notFull = lock.newCondition(); 18  }

ArrayBlockingQueue的内部是经过一个可重入锁ReentrantLock和两个Condition条件对象来实现阻塞,这里先看看其内部成员变量

 1 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
 2         implements BlockingQueue<E>, java.io.Serializable {  3 
 4     /** 存储数据的数组 */
 5     final Object[] items;  6 
 7     /**获取数据的索引,主要用于take,poll,peek,remove方法 */
 8     int takeIndex;  9 
10     /**添加数据的索引,主要用于 put, offer, or add 方法*/
11     int putIndex; 12 
13     /** 队列元素的个数 */
14     int count; 15 
16 
17     /** 控制并不是访问的锁 */
18     final ReentrantLock lock; 19 
20     /**notEmpty条件对象,用于通知take方法队列已有元素,可执行获取操做 */
21     private final Condition notEmpty; 22 
23     /**notFull条件对象,用于通知put方法队列未满,可执行添加操做 */
24     private final Condition notFull; 25 
26     /**
27  迭代器 28      */
29     transient Itrs itrs = null; 30 
31 }

从成员变量可看出,ArrayBlockingQueue内部确实是经过数组对象items来存储全部的数据,值得注意的是ArrayBlockingQueue经过一个ReentrantLock来同时控制添加线程与移除线程的并不是访问,这点与LinkedBlockingQueue区别很大(稍后会分析)。而对于notEmpty条件对象则是用于存放等待或唤醒调用take方法的线程,告诉他们队列已有元素,能够执行获取操做。同理notFull条件对象是用于等待或唤醒调用put方法的线程,告诉它们,队列未满,能够执行添加元素的操做。takeIndex表明的是下一个方法(take,poll,peek,remove)被调用时获取数组元素的索引,putIndex则表明下一个方法(put, offer, or add)被调用时元素添加到数组中的索引。图示以下

添加

 1 //add方法实现,间接调用了offer(e)
 2 public boolean add(E e) {  3         if (offer(e))  4             return true;  5         else
 6             throw new IllegalStateException("Queue full");  7  }  8 
 9 //offer方法
10 public boolean offer(E e) { 11      checkNotNull(e);//检查元素是否为null
12      final ReentrantLock lock = this.lock; 13      lock.lock();//加锁
14      try { 15          if (count == items.length)//判断队列是否满
16              return false; 17          else { 18              enqueue(e);//添加元素到队列
19              return true; 20  } 21      } finally { 22  lock.unlock(); 23  } 24  } 25 
26 //入队操做
27 private void enqueue(E x) { 28     //获取当前数组
29     final Object[] items = this.items; 30     //经过putIndex索引对数组进行赋值
31     items[putIndex] = x; 32     //索引自增,若是已经是最后一个位置,从新设置 putIndex = 0;
33     if (++putIndex == items.length) 34         putIndex = 0; 35     count++;//队列中元素数量加1 36     //唤醒调用take()方法的线程,执行元素获取操做。
37  notEmpty.signal(); 38 }

这里的add方法和offer方法实现比较简单,其中须要注意的是enqueue(E x)方法,当putIndex索引大小等于数组长度时,须要将putIndex从新设置为0,由于后面讲到的取值也是从数组中第一个开始依次日后面取,取了以后会将原位置的值设置为null,方便循环put操做,这里要注意并非每次都是取数组中的第一个值,takeIndex也会增长。由于作了添加操做,数组中确定不会空,则 notEmpty条件会唤醒take()方法取值。

ok~,接着看put方法,它是一个阻塞添加的方法:

 1 //put方法,阻塞时可中断
 2  public void put(E e) throws InterruptedException {  3  checkNotNull(e);  4       final ReentrantLock lock = this.lock;  5       lock.lockInterruptibly();//该方法可中断
 6       try {  7           //当队列元素个数与数组长度相等时,没法添加元素
 8           while (count == items.length)  9               //将当前调用线程挂起,添加到notFull条件队列中等待唤醒
10  notFull.await(); 11           enqueue(e);//若是队列没有满直接添加。。
12       } finally { 13  lock.unlock(); 14  } 15   }

put方法是一个阻塞的方法,若是队列元素已满,那么当前线程将会被notFull条件对象挂起加到等待队列中,直到队列有空档才会唤醒执行添加操做。但若是队列没有满,那么就直接调用enqueue(e)方法将元素加入到数组队列中。到此咱们对三个添加方法即put,offer,add都分析完毕,其中offer,add在正常状况下都是无阻塞的添加,而put方法是阻塞添加。

(获取)删除

关于删除先看poll方法,该方法获取并移除此队列的头元素,若队列为空,则返回 null。

 1 public E poll() {  2   final ReentrantLock lock = this.lock;  3  lock.lock();  4    try {  5        //判断队列是否为null,不为null执行dequeue()方法,不然返回null
 6        return (count == 0) ? null : dequeue();  7    } finally {  8  lock.unlock();  9  } 10 } 11  //删除队列头元素并返回
12  private E dequeue() { 13      //拿到当前数组的数据
14      final Object[] items = this.items; 15       @SuppressWarnings("unchecked") 16       //获取要删除的对象
17       E x = (E) items[takeIndex]; 18  将数组中takeIndex索引位置设置为null 19       items[takeIndex] = null; 20       //takeIndex索引加1并判断是否与数组长度相等, 21       //若是相等说明已到尽头,恢复为0
22       if (++takeIndex == items.length) 23           takeIndex = 0; 24       count--;//队列个数减1
25       if (itrs != null) 26           itrs.elementDequeued();//同时更新迭代器中的元素数据 27       //删除了元素说明队列有空位,唤醒notFull条件对象添加线程,执行添加操做
28  notFull.signal(); 29       return x; 30  }

接着看take()方法,是一个阻塞方法,获取队列头元素并删除。

 1 //从队列头部删除,队列没有元素就阻塞,可中断
 2  public E take() throws InterruptedException {  3     final ReentrantLock lock = this.lock;  4       lock.lockInterruptibly();//中断
 5       try {  6           //若是队列没有元素
 7           while (count == 0)  8               //执行阻塞操做
 9  notEmpty.await(); 10           return dequeue();//若是队列有元素执行删除操做
11       } finally { 12  lock.unlock(); 13  } 14  }

take和poll的区别是,队列为空时,poll返回null,take则被挂起阻塞,直到有元素添加进来,take线程被唤醒,而后获取第一个元素并删除。

 

peek方法很是简单,直接返回当前队列的头元素但不删除任何元素。

 1 public E peek() {  2       final ReentrantLock lock = this.lock;  3  lock.lock();  4       try {  5        //直接返回当前队列的头元素,但不删除
 6           return itemAt(takeIndex); // null when queue is empty
 7       } finally {  8  lock.unlock();  9  } 10  } 11 
12 final E itemAt(int i) { 13       return (E) items[i]; 14   }

ok~,到此对于ArrayBlockingQueue的主要方法就分析完了。

原文出处:https://www.cnblogs.com/java-chen-hao/p/10234149.html

相关文章
相关标签/搜索