BlockingQueue位于JDK5新增的concurrent包中,它很好地解决了多线程中,如何高效安全地“传输”数据的问题。经过这些高效而且线程安全的队列类,为咱们快速搭建高质量的多线程程序带来极大的便利。java
阻塞队列,顾名思义,它首先它是一个队列,在数据结构中,队列是一种线性表。程序员
咱们经过一个共享的队列,可使得数据由队列的一端输入,从另一端输出。经常使用的队列主要有如下两种:
先进先出(FIFO):先插入的队列的元素也最早出队列,相似于排队的功能。从某种程度上来讲这种队列也体现了一种公平性。
后进先出(LIFO):后插入队列的元素最早出队列,这种队列优先处理最近发生的事件,至关于栈。数组
在多线程环境中,经过队列能够很容易地实现数据共享,好比经典的“生产者”和“消费者”模型中,经过队列能够很便利地实现二者之间的数据共享。缓存
假设咱们有若干生产者线程,另外又有若干个消费者线程。若是生产者线程须要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就能够很方便地解决他们之间的数据共享问题。可是,若是生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的状况呢?理想状况下,若是生产者产出数据的速度大于消费者消费的速度,而且当生产出来的数据累积到必定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。安全
在JDK5的concurrent包发布之前,在多线程环境下,程序员只能靠本身去人为地控制这些实现细节,还要兼顾效率和线程安全,这会给咱们的程序带来不小的复杂度。因而,强大的concurrent包横空出世了,而且给咱们带来了强大的BlockingQueue。(注:在多线程领域,所谓阻塞,在某些状况下会挂起线程(即阻塞),一旦条件知足,被挂起的线程又会自动被唤醒)。数据结构
阻塞队列与咱们日常接触的普通队列(LinkedList或ArrayList等)的最大不一样点,在于阻塞队列提供了阻塞添加和阻塞删除的方法。多线程
*阻塞添加 所谓阻塞添加,是指当阻塞队列元素已满时,队列会阻塞加入元素的线程,直到队列元素不满时才从新唤醒线程执行元素加入操做。 *阻塞删除 阻塞删除是指在队列元素为空时,删除队列元素的线程将被阻塞,直到队列不为空时再执行删除操做(通常都会返回被删除的元素)。
做为BlockingQueue的使用者,咱们不再用关心何时须要阻塞线程,何时须要唤醒线程,由于BlockingQueue把这一切都为咱们包办了。并发
BlockingQueue的核心方法:ide
插入方法:函数
删除方法:
检查方法
常见BlockingQueue:
①ArrayBlockingQueue
ArrayBlockingQueue是一个阻塞式的队列,继承自AbstractBlockingQueue,间接的实现了Queue接口和Collection接口。底层以数组的形式保存数据(实际上可看做一个循环数组)。经常使用的操做包括 add ,offer,put,remove,poll,take,peek。 前三者add offer put 是插入的操做。后面四个方法是取出的操做。
能够说,ArrayBlockingQueue 是一个用数组实现的有界阻塞队列,其内部按先进先出的原则对元素进行排序,其中put方法和take方法为添加和删除的阻塞方法。
须要注意的是,ArrayBlockingQueue内部的阻塞队列是经过重入锁ReenterLock和Condition条件队列实现的,因此ArrayBlockingQueue中的元素存在公平访问与非公平访问的区别,对于公平访问队列,被阻塞的线程能够按照阻塞的前后顺序访问队列,即先阻塞的线程先访问队列。而非公平队列,当队列可用时,阻塞的线程将进入争夺访问资源的竞争中,也就是说谁先抢到谁就执行,没有固定的前后顺序。
②LinkedBlockingQueue
LinkedBlockingQueue是底层基于链表实现的阻塞队列,内部维持着一个数据缓冲队列(该队列由链表构成)。当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者当即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue能够经过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于一样的原理。结构图以下:
LinkedBlockingQueue构造的时候若没有指定大小,则默认大小为Integer.MAX_VALUE,固然也能够在构造函数的参数中指定大小。LinkedBlockingQueue不接受null。
LinkedBlockingQueue之因此可以高效的处理并发数据,还由于其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的状况下生产者和消费者能够并行地操做队列中的数据,以此来提升整个队列的并发性能。
LinkedBlockingQueue中维持两把锁,一把锁用于入队,一把锁用于出队,这也就意味着,同一时刻,只能有一个线程执行入队,其他执行入队的线程将会被阻塞;同时,能够有另外一个线程执行出队,其他执行出队的线程将会被阻塞。换句话说,虽然入队和出队两个操做同时均只能有一个线程操做,可是能够一个入队线程和一个出队线程共同执行,也就意味着可能同时有两个线程在操做队列,那么为了维持线程安全,LinkedBlockingQueue使用一个AtomicInterger类型的变量表示当前队列中含有的元素个数,因此能够确保两个线程之间操做底层队列是线程安全的。
LinkedBlockingQueue能够指定容量,内部维持一个队列,因此有一个头节点head和一个尾节点last,内部维持两把锁,一个用于入队,一个用于出队,还有锁关联的Condition对象。重要字段有:
//容量,若是没有指定,该值为Integer.MAX_VALUE; private final int capacity; //当前队列中的元素 private final AtomicInteger count = new AtomicInteger(); //队列头节点,始终知足head.item==null transient Node<E> head; //队列的尾节点,始终知足last.next==null private transient Node<E> last; //用于出队的锁 private final ReentrantLock takeLock = new ReentrantLock(); //当队列为空时,保存执行出队的线程 private final Condition notEmpty = takeLock.newCondition(); //用于入队的锁 private final ReentrantLock putLock = new ReentrantLock(); //当队列满时,保存执行入队的线程 private final Condition notFull = putLock.newCondition();
LinkedBlockingQueue的构造方法有三个:
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null);//last和head在队列为空时都存在,因此队列中至少有一个节点 } public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
从LinkedBlockingQueue的构造方法中能够看出:当调用无参的构造方法时,容量是int的最大值;队列中至少包含一个节点,哪怕队列对外表现为空;LinkedBlockingQueue不支持null元素。
ArrayBlockingQueue和LinkedBlockingQueue是两个最普通、最经常使用的阻塞队列。
LinkedBlockingQueue用一个链表保存元素,其内部有一个Node的内部类,其中有一个成员变量 Node next,这样就造成了一个链表的结构,要获取下一个元素,只要调用next就能够了。而ArrayBlockingQueue则基于数组来保存元素。
LinkedBlockingQueue内部读写(插入获取)各有一个锁,而ArrayBlockingQueue则读写共享一个锁。
③SynchronousQueue
不像ArrayBlockingQueue或LinkedBlockingQueue,SynchronousQueue内部并无数据缓存空间,你不能调用peek()方法来看队列中是否有数据元素,由于数据元素只有当你试着取走的时候才可能存在,不取走而只想偷窥一下是不行的,固然遍历这个队列的操做也是不容许的。队列头元素是第一个排队要插入数据的线程,而不是要交换的数据。数据是在配对的生产者和消费者线程之间直接传递的,并不会将数据缓冲到队列中。能够这样来理解:生产者和消费者互相等待对方,握手,而后一块儿离开。
SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据须要(新任务到来时)建立新的线程,若是有空闲线程则会重复使用,线程空闲了60秒后会被回收。
④LinkedBlockingDeque
LinkedBlockingDeque是一个基于链表的双端阻塞队列。和LinkedBlockingQueue相似,区别在于该类实现了Deque接口,而LinkedBlockingQueue实现了Queue接口。
LinkedBlockingDeque是一个可选容量的阻塞队列,若是没有设置容量,那么容量将是Int的最大值。
LinkedBlockingDeque的底层数据结构是一个双端队列,该队列使用链表实现,如图所示:
LinkedBlockingDeque的重要字段有以下几个:
//队列的头节点 transient Node<E> first; //队列的尾节点 transient Node<E> last; //队列中元素的个数 private transient int count; //队列中元素的最大个数 private final int capacity; //锁 final ReentrantLock lock = new ReentrantLock(); //队列为空时,阻塞take线程的条件队列 private final Condition notEmpty = lock.newCondition(); //队列满时,阻塞put线程的条件队列 private final Condition notFull = lock.newCondition();
从上面的字段,能够看到LinkedBlockingDeque内部只有一把锁以及该锁上关联的两个条件,因此能够推断同一时刻只有一个线程能够在队头或者队尾执行入队或出队操做。能够发现这点和LinkedBlockingQueue不一样,LinkedBlockingQueue能够同时有两个线程在两端执行操做。
因为LinkedBlockingDeque是一个双端队列,因此就能够在队头执行入队和出队操做,也能够在队尾执行入队和出队操做。
LinkedBlockingDeque的构造方法有三个:
public LinkedBlockingDeque() { this(Integer.MAX_VALUE); } public LinkedBlockingDeque(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; } public LinkedBlockingDeque(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock lock = this.lock; lock.lock(); // Never contended, but necessary for visibility try { for (E e : c) { if (e == null) throw new NullPointerException(); if (!linkLast(new Node<E>(e))) throw new IllegalStateException("Deque full"); } } finally { lock.unlock(); } }
能够看到这三个构造方法的结构和LinkedBlockingQueue是相同的。 可是LinkedBlockingQueue是存在一个哨兵节点维持头节点的,而LinkedBlockingDeque中是没有的。
LinkedBlockingDeque和LinkedBlockingQueue的相同点在于: ①基于链表 ②容量可选,不设置的话,就是Int的最大值 LinkedBlockingDeque和LinkedBlockingQueue的不一样点在于: ①双端链表和单链表 ②不存在哨兵节点 ③一把锁+两个条件 LinkedBlockingDeque和ArrayBlockingQueue的相同点在于:使用一把锁+两个条件维持队列的同步。
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();for (int i = 0; i < 10; i++) {final int index = i;singleThreadExecutor.execute(new Runnable() { @Overridepublic void run() {try {System.out.println(index);Thread.sleep(2000);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}});}