ArrayBlockingQueue底层是由数组实现的定长阻塞队列(阻塞表示若是没有原始那么获取元素会阻塞当前线程)html
ArrayBlockingQueue通常用于生产者消费者模型业务(排队机制,先进先出)java
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -817911632652898426L; /** The queued items 存储元素容器*/ final Object[] items; /** items index for next take, poll, peek or remove 使用过的元素 */ int takeIndex; /** items index for next put, offer, or add 添加过的元素 */ int putIndex; /** Number of elements in the queue 当前元素数量 */ int count;
add数组
public boolean add(E e) { return super.add(e); } super.add public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } public boolean offer(E e) { checkNotNull(e);//ArrayBlockingQueue不能存储null对象 final ReentrantLock lock = this.lock;//插入操做线程安全 lock.lock(); try { if (count == items.length)//若是当前count==items.length表示队列已经忙了,不能插入 return false; else { insert(e);//插入元素 return true; } } finally { lock.unlock(); } } private void insert(E x) { items[putIndex] = x;//第一次put为0 putIndex = inc(putIndex);//递增 ++count;//数量递增 notEmpty.signal();//通知获取原始方法能够进行获取 } final int inc(int i) {//若是当前putIndex==items.length那么putIndex从新从零开始 return (++i == items.length) ? 0 : i; } //一样为添加元素,lock.lockInterruptibly若是检测到有Thread.interrupted();会直接抛出异常 public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); insert(e); } finally { lock.unlock(); } }
remove安全
public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) { if (o.equals(items[i])) {//从头部开始遍历元素判断 removeAt(i); return true; } } return false; } finally { lock.unlock(); } } //queue size = 10 putSize = 5 tackSize = 0 //queue 1,2,3,4,5 removeAt 3 step1: removeAt != takeIndex i = nexti = 4 void removeAt(int i) { final Object[] items = this.items; // if removing front item, just advance if (i == takeIndex) { items[takeIndex] = null;//引用设置为空 takeIndex = inc(takeIndex);//takeIndex++ } else { // slide over all others up through putIndex. for (;;) { int nexti = inc(i);//>队列的头部 递增(putIndex一个循环的0-n) if (nexti != putIndex) {//递增后部位putIndex所有向前移动位置 items[i] = items[nexti]; i = nexti; } else { items[i] = null;//元素设置为空 putIndex = i; break; } } } --count;//元素递减 notFull.signal();//通知notFull.awit() }
getoracle
public E poll() {//获取队列头部元素,获取后设置为空 final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : extract();//若是当前队列为空直接返回null,不为空调用extract() } finally { lock.unlock(); } } //获取队列头部元素,获取后设置为空 //take获取原始若是队列为空会进入阻塞状态知道等到有添加元素才会去返回 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly();//lock.lockInterruptibly若是检测到有Thread.interrupted();会直接抛出异常 try { while (count == 0) notEmpty.await();//若是没有元素进入等待状态,等待被唤醒 return extract(); } finally { lock.unlock(); } } //peek查看队列头部元素 public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : itemAt(takeIndex);//若是元素为空直接返回null,不为空条用itemAt(takeIndex) } finally { lock.unlock(); } } private E extract() { final Object[] items = this.items; E x = this.<E>cast(items[takeIndex]);//泛型转换而且得到当前元素 items[takeIndex] = null;//当前元素设置为空 takeIndex = inc(takeIndex);//获取原始递增 --count;//队列元素递减 notFull.signal();//通知notFull.await()能够进行插入元素 return x;//返回当前获取原始 } //获取元素 final E itemAt(int i) { return this.<E>cast(items[i]); }
定长队列,不能进行扩容ide
线程安全this