阻塞队列(BlockingQueue)是 Java 5 并发新特性中的内容,阻塞队列的接口是 java.util.concurrent.BlockingQueue,它提供了两个附加操做:当队列中为空时,从队列中获取元素的操做将被阻塞;当队列满时,向队列中添加元素的操做将被阻塞。html
阻塞队列经常使用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器。java
阻塞队列提供了四种操做方法:git
JDK7提供了7个阻塞队列。分别是github
下面分别简单介绍一下:数组
ArrayBlockingQueue:是一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁。【注:每个线程在获取锁的时候可能都会排队等待,若是在等待时间上,先获取锁的线程的请求必定先被知足,那么这个锁就是公平的。反之,这个锁就是不公平的。公平的获取锁,也就是当前等待时间最长的线程先获取锁】缓存
LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。队列头部和尾部均可以添加和移除元素,多线程并发时,能够将锁的竞争最多降到一半。安全
Java中线程安全的内置队列还有两个:ConcurrentLinkedQueue和LinkedTransferQueue,它们使用了CAS这种无锁的方式来实现了线程安全的队列。无锁的方式性能好,可是队列是无界的,用在生产系统中,生产者生产速度过快,可能致使内存溢出。有界的阻塞队列ArrayBlockingQueue和LinkedBlockingQueue,为了减小Java的垃圾回收对系统性能的影响,会尽可能选择array/heap格式的数据结构。这样的话就只剩下ArrayBlockingQueue。(先埋个坑在这儿,近来接触到了disruptor,感受妙趣横生。disruptor)数据结构
这里分析下ArrayBlockingQueue的实现原理。多线程
构造方法:并发
ArrayBlockingQueue(int capacity); ArrayBlockingQueue(int capacity, boolean fair); ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)
ArrayBlockingQueue提供了三种构造方法,参数含义以下:
插入元素:
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
从源码能够看出,生产者首先得到锁lock,而后判断队列是否已经满了,若是满了,则等待,直到被唤醒,而后调用enqueue插入元素。
private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null;
final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
以上是enqueue的实现,实现的操做是插入元素到一个环形数组,而后唤醒notEmpty上阻塞的线程。
获取元素:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
从源码能够看出,消费者首先得到锁,而后判断队列是否为空,为空,则等待,直到被唤醒,而后调用dequeue获取元素。
private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null;
final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
以上是dequeue的实现,获取环形数组当前takeIndex的元素,并及时将当前元素置为null,设置下一次takeIndex的值takeIndex++,而后唤醒notFull上阻塞的线程。
还有其余方法offer(E e)
、poll()
、add(E e)
、remove()
、 offer(E e, long timeout, TimeUnit unit)
等的实现,由于经常使用take和put,这些方法就不一一赘述了。
使用阻塞队列实现生产者-消费者模式:
/** * Created by noly on 2017/5/19. */
public class BlockingQueueTest { public static void main (String[] args) { ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10); Consumer consumer = new Consumer(queue); Producer producer = new Producer(queue); producer.start(); consumer.start(); } } class Consumer extends Thread { private ArrayBlockingQueue<Integer> queue; public Consumer(ArrayBlockingQueue<Integer> queue){ this.queue = queue; } @Override public void run() { while(true) { try { Integer i = queue.take(); System.out.println("消费者从队列取出元素:" + i); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Producer extends Thread { private ArrayBlockingQueue<Integer> queue; public Producer(ArrayBlockingQueue<Integer> queue){ this.queue = queue; } @Override public void run() { for (int i = 0; i < 100; i++) { try { queue.put(i); System.out.println("生产者向队列插入元素:" + i); } catch (InterruptedException e) { e.printStackTrace(); } } } }
若是不使用阻塞队列,使用Object.wait()和Object.notify()、非阻塞队列实现生产者-消费者模式,考虑线程间的通信,会很是麻烦。