连接java
引言:数组
使用位置:自定义线程池时,线程池构造器中有个参数就是阻塞队列。阻塞队列也经常使用于生产者和消费者的场景缓存
阻塞队列经常使用于生产者和消费者的场景,生产者是往在容器队列里添加数据的线程,而消费者则从从容器队列中获取数据的线程,该容器队列就称之为阻塞队列多线程
放入数据:并发
获取数据:ide
抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue
full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。函数
返回特殊值:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,若是没有则返回null高并发
一直阻塞:当阻塞队列满时,若是生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。性能
超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,若是超过必定的时间,生产者线程就会退出this
总结
1.add,offer,put三种添加线程到队列的方法只在队列满的时候有区别,add为抛异常,offer返回boolean值,put直到添加成功为止。
JDK7提供了7个阻塞队列,分别是:
用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。
默认状况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的全部生产者线程或消费者线程,当队列可用时,能够按照阻塞的前后顺序访问队列,即先阻塞的生产者线程,能够先往队列里插入元素,先阻塞的消费者线程,能够先从队列里获取元素。
一般状况下为了保证公平性会下降吞吐量。咱们可使用如下代码建立一个公平的阻塞队列:
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);
基于链表的阻塞队列,同ArrayListBlockingQueue相似,此队列按照先进先出(FIFO)的原则对元素进行排序,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者当即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue能够经过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于一样的原理。而LinkedBlockingQueue之因此可以高效的处理并发数据,还由于其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的状况下生产者和消费者能够并行地操做队列中的数据,以此来提升整个队列的并发性能。
做为开发者,咱们须要注意的是,若是构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个相似无限大小的容量(Integer.MAX_VALUE),这样的话,若是生产者的速度一旦大于消费者的速度,也许尚未等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。
ArrayBlockingQueue和LinkedBlockingQueue是两个最普通也是最经常使用的阻塞队列,通常状况下,在处理多线程间的生产者消费者问题,使用这两个类足以。
是一个支持优先级的无界队列。默认状况下元素采起天然顺序升序排列。能够自定义实现compareTo()方法来指定元素进行排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。须要注意哦的是不能保证同优先级元素的顺序。
是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在建立元素时能够指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。咱们能够将DelayQueue运用在如下应用场景:
是一个不存储元素的阻塞队列。每个put操做必须等待一个take操做,不然不能继续添加元素。SynchronousQueue能够当作是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列自己并不存储任何元素,很是适合于传递性场景,好比在一个线程中使用的数据,传递给另一个线程使用,SynchronousQueue的吞吐量高于LinkedBlockingQueue 和 ArrayBlockingQueue。
是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其余阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。
transfer方法。若是当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法能够把生产者传入的元素马上transfer(传输)给消费者。若是没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。transfer方法的关键代码以下:
Node pred = tryAppend(s, haveData); return awaitMatch(s, pred, e, (how == TIMED), nanos);
第一行代码是试图把存放当前元素的s节点做为tail节点。第二行代码是让CPU自旋等待消费者消费元素。由于自旋会消耗CPU,因此自旋必定的次数后使用Thread.yield()方法来暂停当前正在执行的线程,并执行其余线程。
tryTransfer方法。则是用来试探下生产者传入的元素是否能直接传给消费者。若是没有消费者等待接收元素,则返回false。和transfer方法的区别是tryTransfer方法不管消费者是否接收,方法当即返回。而transfer方法是必须等到消费者消费了才返回。
对于带有时间限制的tryTransfer(E e, long timeout, TimeUnit unit)方法,则是试图把生产者传入的元素直接传给消费者,可是若是没有消费者消费该元素则等待指定的时间再返回,若是超时还没消费元素,则返回false,若是在超时时间内消费了元素,则返回true。
是一个由链表结构组成的双向阻塞队列。所谓双向队列指的你能够从队列的两端插入和移出元素。双端队列由于多了一个操做队列的入口,在多线程同时入队时,也就减小了一半的竞争。相比其余的阻塞队列,LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方法,以First单词结尾的方法,表示插入,获取(peek)或移除双端队列的第一个元素。以Last单词结尾的方法,表示插入,获取或移除双端队列的最后一个元素。另外插入方法add等同于addLast,移除方法remove等效于removeFirst。可是take方法却等同于takeFirst,不知道是否是Jdk的bug,使用时仍是用带有First和Last后缀的方法更清楚。
在初始化LinkedBlockingDeque时能够设置容量防止其过渡膨胀。另外双向阻塞队列能够运用在“工做窃取”模式中。
以ArrayBlockingQueue为例,咱们先来看看代码:
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -817911632652898426L; /** The queued items */ final Object[] items; /** items index for next take, poll, peek or remove */ int takeIndex; /** items index for next put, offer, or add */ int putIndex; /** Number of elements in the queue */ int count; final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; ...省略 }
从上面代码能够看出ArrayBlockingQueue是维护一个Object类型的数组,takeIndex和putIndex分别表示队首元素和队尾元素的下标,count表示队列中元素的个数,lock则是一个可重入锁,notEmpty和notFull是等待条件。接下来咱们看看关键方法put:
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
从put方法的实现能够看出,它先获取了锁,而且获取的是可中断锁,而后判断当前元素个数是否等于数组的长度,若是相等,则调用notFull.await()进行等待,当被其余线程唤醒时,经过enqueue(e)方法插入元素,最后解锁。
/** * Inserts element at current put position, advances, and signals. * Call only when holding lock. */ private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
插入成功后,经过notEmpty唤醒正在等待取元素的线程。再来看看take方法:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
跟put方法实现相似,put方法等待的是notFull信号,而take方法等待的是notEmpty信号。在take方法中,若是能够取元素,则经过dequeue方法取得元素,下面是dequeue方法的实现:
private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
除了线程池的实现使用阻塞队列以外,咱们能够在生产者-消费者模式来使用阻塞队列,
首先使用Object.wait()、Object.notify()和非阻塞队列实现生产者-消费者模式:
public class Test { private int queueSize = 10; private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize); public static void main(String[] args) { Test test = new Test(); Producer producer = test.new Producer(); Consumer consumer = test.new Consumer(); producer.start(); consumer.start(); } class Consumer extends Thread{ @Override public void run() { while(true){ synchronized (queue) { while(queue.size() == 0){ try { System.out.println("队列空,等待数据"); queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); queue.notify(); } } queue.poll(); //每次移走队首元素 queue.notify(); } } } } class Producer extends Thread{ @Override public void run() { while(true){ synchronized (queue) { while(queue.size() == queueSize){ try { System.out.println("队列满,等待有空余空间"); queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); queue.notify(); } } queue.offer(1); //每次插入一个元素 queue.notify(); } } } } }
下面是使用阻塞队列实现的生产者-消费者模式:
public class Test { private int queueSize = 10; private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize); public static void main(String[] args) { Test test = new Test(); Producer producer = test.new Producer(); Consumer consumer = test.new Consumer(); producer.start(); consumer.start(); } class Consumer extends Thread{ @Override public void run() { while(true){ try { queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Producer extends Thread{ @Override public void run() { while(true){ try { queue.put(1); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
很显然使用阻塞队列实现不须要单独考虑同步和线程间通讯的问题,实现起来很简单。