Java中的阻塞队列-ArrayBlockingQueue(一)

最近在看一些java基础的东西,看到了队列这章,打算对复习的一些知识点作一个笔记,也算是对本身思路的一个整理,本章先聊聊java中的阻塞队列java

参考文章:数组

http://ifeve.com/java-blocking-queue/app

https://blog.csdn.net/u014082714/article/details/52215130函数

由上图能够用看出java中的阻塞队列都实现了 BlockingQueue接口,BlockingQueue又继承自Queue性能

一、什么是阻塞队列?

阻塞队列(BlockingQueue)是一个支持两个附加操做的队列。这两个附加的操做是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列经常使用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。this

阻塞队列提供了四种处理方法:spa

  • 抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。
  • 返回特殊值:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,若是没有则返回null
  • 一直阻塞:当阻塞队列满时,若是生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
  • 超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,若是超过必定的时间,生产者线程就会退出。

2.、Java里的阻塞队列

JDK7提供了7个阻塞队列。分别是操作系统

  • ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
  • PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
  • DelayQueue:一个使用优先级队列实现的无界阻塞队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

 ArrayBlockingQueue.net

ArrayBlockingQueue是一个用数组实现有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认状况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的全部生产者线程或消费者线程,当队列可用时,能够按照阻塞的前后顺序访问队列,即先阻塞的生产者线程,能够先往队列里插入元素,先阻塞的消费者线程,能够先从队列里获取元素。一般状况下为了保证公平性会下降吞吐量。咱们能够使用如下代码建立一个公平的阻塞队列线程

ArrayBlockingQueue fairQueue = new  ArrayBlockingQueue(1000,true);

public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
   throw new IllegalArgumentException();
   this.items = new Object[capacity];
   lock = new ReentrantLock(fair);
   notEmpty = lock.newCondition();
   notFull = lock.newCondition();
}

经过源码咱们能够看到,构造器第一个参数是指定有界队列的大小(及数组的大小),第二个参数指定是否使用公平锁,这里能够看到阻塞队列的公平访问队列是经过重入锁来实现的(关于重入锁咱们在别的章节介绍)

下边咱们结合源码对其提供的方法作一个简单分析

关于构造器相关说明

/**
* * 构造函数,设置队列的初始容量 */ public ArrayBlockingQueue(int capacity) { this(capacity, false); } /** * 构造函数。capacity设置数组大小 ,fair设置是否为公平锁 * capacity and the specified access policy. */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair);//是否为公平锁,若是是的话,那么先到的线程先得到锁对象。 //不然,由操做系统调度由哪一个线程得到锁,通常为false,性能会比较高 notEmpty = lock.newCondition(); notFull = lock.newCondition(); } /** *构造函数,带有初始内容的队列 */ public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); //要给数组设置内容,先上锁 try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e;//依次拷贝内容 } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i;//若是putIndex大于数组大小 ,那么从0从新开始 } finally { lock.unlock();//最后必定要释放锁 } }
关于方法的说明

/**
       * 添加一个元素,其实super.add里面调用了offer方法       */       public boolean add(E e) {           return super.add(e);       }    
/**
* 当调用offer方法返回false时,直接抛出异常
*/
     public boolean add(E e) {
       if (offer(e))
           return true;
else
throw new IllegalStateException("Queue full");
}
}
     /**       *加入成功返回true,不然返回false       *        */       public boolean offer(E e) {           checkNotNull(e);           final ReentrantLock lock = this.lock;           lock.lock();//上锁  
        try {               if (count == items.length) //超过数组的容量  
                return false;               else {                   enqueue(e); //放入元素  
                return true;               }           } finally {               lock.unlock();           }       }          /**       * 若是队列已满的话,就会等待       */       public void put(E e) throws InterruptedException {           checkNotNull(e);           final ReentrantLock lock = this.lock;           lock.lockInterruptibly();//和lock()方法的区别是让它在阻塞时也可抛出异常跳出  
        try {               while (count == items.length)                   notFull.await(); //这里就是阻塞了,要注意。若是运行到这里,那么它会释放上面的锁,一直等到notify  
            enqueue(e);           } finally {               lock.unlock();           }       }          /**       * 带有超时时间的插入方法,unit表示是按秒、分、时哪种       */       public boolean offer(E e, long timeout, TimeUnit unit)           throws InterruptedException {              checkNotNull(e);           long nanos = unit.toNanos(timeout);           final ReentrantLock lock = this.lock;           lock.lockInterruptibly();           try {               while (count == items.length) {                   if (nanos <= 0)                       return false;                   nanos = notFull.awaitNanos(nanos);//带有超时等待的阻塞方法  
            }               enqueue(e);//入队  
            return true;           } finally {               lock.unlock();           }       }          //实现的方法,若是当前队列为空,返回null  
    public E poll() {           final ReentrantLock lock = this.lock;           lock.lock();           try {               return (count == 0) ? null : dequeue();           } finally {               lock.unlock();           }       }        //实现的方法,若是当前队列为空,一直阻塞  
    public E take() throws InterruptedException {           final ReentrantLock lock = this.lock;           lock.lockInterruptibly();           try {               while (count == 0)                   notEmpty.await();//队列为空,阻塞方法  
            return dequeue();           } finally {               lock.unlock();           }       }       //带有超时时间的取元素方法,不然返回Null  
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {           long nanos = unit.toNanos(timeout);           final ReentrantLock lock = this.lock;           lock.lockInterruptibly();           try {               while (count == 0) {                   if (nanos <= 0)                       return null;                   nanos = notEmpty.awaitNanos(nanos);//超时等待  
            }               return dequeue();//取得元素  
        } finally {               lock.unlock();           }       }       //只是看一个队列最前面的元素,取出是不删除队列中的原来元素。队列为空时返回null  
    public E peek() {           final ReentrantLock lock = this.lock;           lock.lock();           try {               return itemAt(takeIndex); // 队列为空时返回null  
        } finally {               lock.unlock();           }       }          /**       * 返回队列当前元素个数       *       */       public int size() {           final ReentrantLock lock = this.lock;           lock.lock();           try {               return count;           } finally {               lock.unlock();           }       }          /**       * 返回当前队列再放入多少个元素就满队       */       public int remainingCapacity() {           final ReentrantLock lock = this.lock;           lock.lock();           try {               return items.length - count;           } finally {               lock.unlock();           }       }          /**       *  从队列中删除一个元素的方法。删除成功返回true,不然返回false       */       public boolean remove(Object o) {           if (o == nullreturn false;           final Object[] items = this.items;           final ReentrantLock lock = this.lock;           lock.lock();           try {               if (count > 0) {                   final int putIndex = this.putIndex;                   int i = takeIndex;                   do {                       if (o.equals(items[i])) {                           removeAt(i); //真正删除的方法  
                        return true;                       }                       if (++i == items.length)                           i = 0;                   } while (i != putIndex);//一直不断的循环取出来作判断  
            }               return false;           } finally {               lock.unlock();           }       }          /**       * 是否包含一个元素       */       public boolean contains(Object o) {           if (o == nullreturn false;           final Object[] items = this.items;           final ReentrantLock lock = this.lock;           lock.lock();           try {               if (count > 0) {                   final int putIndex = this.putIndex;                   int i = takeIndex;                   do {                       if (o.equals(items[i]))                           return true;                       if (++i == items.length)                           i = 0;                   } while (i != putIndex);               }               return false;           } finally {               lock.unlock();           }       }          /**       * 清空队列       *       */       public void clear() {           final Object[] items = this.items;           final ReentrantLock lock = this.lock;           lock.lock();           try {               int k = count;               if (k > 0) {                   final int putIndex = this.putIndex;                   int i = takeIndex;                   do {                       items[i] null;                       if (++i == items.length)                           i = 0;                   } while (i != putIndex);                   takeIndex = putIndex;                   count = 0;                   if (itrs != null)                       itrs.queueIsEmpty();                   for (; k > 0 && lock.hasWaiters(notFull); k--)                       notFull.signal();               }           } finally {               lock.unlock();           }       }          /**       * 取出全部元素到集合       */       public int drainTo(Collection<? super E> c) {           return drainTo(c, Integer.MAX_VALUE);       }          /**       * 取出全部元素到集合       */       public int drainTo(Collection<? super E> c, int maxElements) {           checkNotNull(c);           if (c == this)               throw new IllegalArgumentException();           if (maxElements <= 0)               return 0;           final Object[] items = this.items;           final ReentrantLock lock = this.lock;           lock.lock();           try {               int n = Math.min(maxElements, count);               int take = takeIndex;               int i = 0;               try {                   while (i < n) {                       @SuppressWarnings("unchecked")                       E x = (E) items[take];                       c.add(x);                       items[take] null;                       if (++take == items.length)                           take = 0;                       i++;                   }                   return n;               } finally {                   // Restore invariants even if c.add() threw  
                if (i > 0) {                       count -= i;                       takeIndex = take;                       if (itrs != null) {                           if (count == 0)                               itrs.queueIsEmpty();                           else if (i > take)                               itrs.takeIndexWrapped();                       }                       for (; i > 0 && lock.hasWaiters(notFull); i--)                           notFull.signal();                   }               }           } finally {               lock.unlock();           }       }  
相关文章
相关标签/搜索