Java中的阻塞队列_BlockingQueue数组
阻塞队列(BlockingQueue)是一个支持两个附加操做的队列。这两个附加的操做是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列经常使用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。缓存
阻塞队列提供了四种处理方法:数据结构
一个 BlockingQueue 多是有界的,若是在插入的时候,发现队列满了,那么 put 操做将会阻塞。一般,在这里咱们说的无界队列也不是说真正的无界,而是它的容量是 Integer.MAX_VALUE(21亿多)。多线程
JDK7提供了7个阻塞队列。分别是:less
ArrayBlockingQueue是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认状况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的全部生产者线程或消费者线程,当队列可用时,能够按照阻塞的前后顺序访问队列,即先阻塞的生产者线程,能够先往队列里插入元素,先阻塞的消费者线程,能够先从队列里获取元素。一般状况下为了保证公平性会下降吞吐量。咱们可使用如下代码建立一个公平的阻塞队列:ui
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);
访问者的公平性是使用可重入锁实现的,代码以下:this
/** * Creates an {@code ArrayBlockingQueue} with the given (fixed) * capacity and the specified access policy. * * @param capacity the capacity of this queue * @param fair if {@code true} then queue accesses for threads blocked * on insertion or removal, are processed in FIFO order; * if {@code false} the access order is unspecified. * @throws IllegalArgumentException if {@code capacity < 1} */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
底层基于单向链表实现的阻塞队列,能够当作无界队列也能够当作有界队列来使用。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。线程
/** * Creates a {@code LinkedBlockingQueue} with a capacity of * {@link Integer#MAX_VALUE}. */ public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }
也能够设置队列的长度设计
/** * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity. * * @param capacity the capacity of this queue * @throws IllegalArgumentException if {@code capacity} is not greater * than zero */ public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
PriorityBlockingQueue是一个支持优先级的无界队列,内部数据结构使用数组实现。默认状况下元素采起天然顺序排列,也能够经过比较器comparator来指定元素的排序规则。元素按照升序排列。但它会在初始化的时候指定一个初始化的长度DEFAULT_INITIAL_CAPACITY,code
/** * Creates a {@code PriorityBlockingQueue} with the default * initial capacity (11) that orders its elements according to * their {@linkplain Comparable natural ordering}. */ public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); }
这个初始化的长度会动态调整的,动态调整的逻辑以下面这段代码,
/** * Tries to grow array to accommodate at least one more element * (but normally expand by about 50%), giving up (allowing retry) * on contention (which we expect to be rare). Call only while * holding lock. * * @param array the heap array * @param oldCap the length of the array */ private void tryGrow(Object[] array, int oldCap) { lock.unlock(); // must release and then re-acquire main lock Object[] newArray = null; if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // grow faster if small (oldCap >> 1)); if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally { allocationSpinLock = 0; } } if (newArray == null) // back off if another thread is allocating Thread.yield(); lock.lock(); if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); } }
DelayQueue是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在建立元素时能够指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。咱们能够将DelayQueue运用在如下应用场景:
SynchronousQueue是一个不存储元素的阻塞队列。每个put操做必须等待一个take操做,不然不能继续添加元素。SynchronousQueue能够当作是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列自己并不存储任何元素,很是适合于传递性场景,好比在一个线程中使用的数据,传递给另一个线程使用,SynchronousQueue的吞吐量高于LinkedBlockingQueue 和 ArrayBlockingQueue。
LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其余阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。 transfer方法,若是当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法能够把生产者传入的元素马上transfer(传输)给消费者。若是没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。 transfer方法的关键代码以下:
Node pred = tryAppend(s, haveData); return awaitMatch(s, pred, e, (how == TIMED), nanos);
第一行代码是试图把存放当前元素的s节点做为tail节点。第二行代码是让CPU自旋等待消费者消费元素。由于自旋会消耗CPU,因此自旋必定的次数后使用Thread.yield()方法来暂停当前正在执行的线程,并执行其余线程。
tryTransfer方法。则是用来试探下生产者传入的元素是否能直接传给消费者。若是没有消费者等待接收元素,则返回false。和transfer方法的区别是tryTransfer方法不管消费者是否接收,方法当即返回。而transfer方法是必须等到消费者消费了才返回。
对于带有时间限制的tryTransfer(E e, long timeout, TimeUnit unit)方法,则是试图把生产者传入的元素直接传给消费者,可是若是没有消费者消费该元素则等待指定的时间再返回,若是超时还没消费元素,则返回false,若是在超时时间内消费了元素,则返回true。
LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的你能够从队列的两端插入和移出元素。双端队列由于多了一个操做队列的入口,在多线程同时入队时,也就减小了一半的竞争。相比其余的阻塞队列,LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方法,以First单词结尾的方法,表示插入,获取(peek)或移除双端队列的第一个元素。以Last单词结尾的方法,表示插入,获取或移除双端队列的最后一个元素。另外插入方法add等同于addLast,移除方法remove等效于removeFirst。可是take方法却等同于takeFirst,不知道是否是Jdk的bug,使用时仍是用带有First和Last后缀的方法更清楚。在初始化LinkedBlockingDeque时能够设置容量防止其过渡膨胀。另外双向阻塞队列能够运用在“工做窃取”模式中。
/** * Creates a {@code LinkedBlockingDeque} with a capacity of * {@link Integer#MAX_VALUE}. */ public LinkedBlockingDeque() { this(Integer.MAX_VALUE); } /** * Creates a {@code LinkedBlockingDeque} with the given (fixed) capacity. * * @param capacity the capacity of this deque * @throws IllegalArgumentException if {@code capacity} is less than 1 */ public LinkedBlockingDeque(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; }
==========END==========