最近在看一些java基础的东西,看到了队列这章,打算对复习的一些知识点作一个笔记,也算是对本身思路的一个整理,本章先聊聊java中的阻塞队列java
参考文章:数组
http://ifeve.com/java-blocking-queue/app
https://blog.csdn.net/u014082714/article/details/52215130函数
由上图能够用看出java中的阻塞队列都实现了 BlockingQueue接口,BlockingQueue又继承自Queue性能
阻塞队列(BlockingQueue)是一个支持两个附加操做的队列。这两个附加的操做是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列经常使用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。this
阻塞队列提供了四种处理方法:spa
JDK7提供了7个阻塞队列。分别是操作系统
ArrayBlockingQueue.net
ArrayBlockingQueue是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认状况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的全部生产者线程或消费者线程,当队列可用时,能够按照阻塞的前后顺序访问队列,即先阻塞的生产者线程,能够先往队列里插入元素,先阻塞的消费者线程,能够先从队列里获取元素。一般状况下为了保证公平性会下降吞吐量。咱们能够使用如下代码建立一个公平的阻塞队列:线程
ArrayBlockingQueue fairQueue =
new
ArrayBlockingQueue(
1000
,
true
);
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
经过源码咱们能够看到,构造器第一个参数是指定有界队列的大小(及数组的大小),第二个参数指定是否使用公平锁,这里能够看到阻塞队列的公平访问队列是经过重入锁来实现的(关于重入锁咱们在别的章节介绍)
下边咱们结合源码对其提供的方法作一个简单分析
关于构造器相关说明
/** * * 构造函数,设置队列的初始容量 */ public ArrayBlockingQueue(int capacity) { this(capacity, false); } /** * 构造函数。capacity设置数组大小 ,fair设置是否为公平锁 * capacity and the specified access policy. */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair);//是否为公平锁,若是是的话,那么先到的线程先得到锁对象。 //不然,由操做系统调度由哪一个线程得到锁,通常为false,性能会比较高 notEmpty = lock.newCondition(); notFull = lock.newCondition(); } /** *构造函数,带有初始内容的队列 */ public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); //要给数组设置内容,先上锁 try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e;//依次拷贝内容 } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i;//若是putIndex大于数组大小 ,那么从0从新开始 } finally { lock.unlock();//最后必定要释放锁 } }
关于方法的说明
/** * 添加一个元素,其实super.add里面调用了offer方法 */ public boolean add(E e) { return super.add(e); }
/**
* 当调用offer方法返回false时,直接抛出异常
*/
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
}
/** *加入成功返回true,不然返回false * */ public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock();//上锁 try { if (count == items.length) //超过数组的容量 return false; else { enqueue(e); //放入元素 return true; } } finally { lock.unlock(); } } /** * 若是队列已满的话,就会等待 */ public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly();//和lock()方法的区别是让它在阻塞时也可抛出异常跳出 try { while (count == items.length) notFull.await(); //这里就是阻塞了,要注意。若是运行到这里,那么它会释放上面的锁,一直等到notify enqueue(e); } finally { lock.unlock(); } } /** * 带有超时时间的插入方法,unit表示是按秒、分、时哪种 */ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos);//带有超时等待的阻塞方法 } enqueue(e);//入队 return true; } finally { lock.unlock(); } } //实现的方法,若是当前队列为空,返回null public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } //实现的方法,若是当前队列为空,一直阻塞 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await();//队列为空,阻塞方法 return dequeue(); } finally { lock.unlock(); } } //带有超时时间的取元素方法,不然返回Null public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos);//超时等待 } return dequeue();//取得元素 } finally { lock.unlock(); } } //只是看一个队列最前面的元素,取出是不删除队列中的原来元素。队列为空时返回null public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return itemAt(takeIndex); // 队列为空时返回null } finally { lock.unlock(); } } /** * 返回队列当前元素个数 * */ public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return count; } finally { lock.unlock(); } } /** * 返回当前队列再放入多少个元素就满队 */ public int remainingCapacity() { final ReentrantLock lock = this.lock; lock.lock(); try { return items.length - count; } finally { lock.unlock(); } } /** * 从队列中删除一个元素的方法。删除成功返回true,不然返回false */ public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { if (count > 0) { final int putIndex = this.putIndex; int i = takeIndex; do { if (o.equals(items[i])) { removeAt(i); //真正删除的方法 return true; } if (++i == items.length) i = 0; } while (i != putIndex);//一直不断的循环取出来作判断 } return false; } finally { lock.unlock(); } } /** * 是否包含一个元素 */ public boolean contains(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { if (count > 0) { final int putIndex = this.putIndex; int i = takeIndex; do { if (o.equals(items[i])) return true; if (++i == items.length) i = 0; } while (i != putIndex); } return false; } finally { lock.unlock(); } } /** * 清空队列 * */ public void clear() { final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { int k = count; if (k > 0) { final int putIndex = this.putIndex; int i = takeIndex; do { items[i] = null; if (++i == items.length) i = 0; } while (i != putIndex); takeIndex = putIndex; count = 0; if (itrs != null) itrs.queueIsEmpty(); for (; k > 0 && lock.hasWaiters(notFull); k--) notFull.signal(); } } finally { lock.unlock(); } } /** * 取出全部元素到集合 */ public int drainTo(Collection<? super E> c) { return drainTo(c, Integer.MAX_VALUE); } /** * 取出全部元素到集合 */ public int drainTo(Collection<? super E> c, int maxElements) { checkNotNull(c); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { int n = Math.min(maxElements, count); int take = takeIndex; int i = 0; try { while (i < n) { @SuppressWarnings("unchecked") E x = (E) items[take]; c.add(x); items[take] = null; if (++take == items.length) take = 0; i++; } return n; } finally { // Restore invariants even if c.add() threw if (i > 0) { count -= i; takeIndex = take; if (itrs != null) { if (count == 0) itrs.queueIsEmpty(); else if (i > take) itrs.takeIndexWrapped(); } for (; i > 0 && lock.hasWaiters(notFull); i--) notFull.signal(); } } } finally { lock.unlock(); } }