Java并发之BlockingQueue

  1、Queue编程

        Queue是队列接口是 Collection的子接口。除了基本的 Collection操做外,队列还提供其余的插入、提取和检查操做。每一个方法都存在两种形式:一种抛出异常(操做失败时),另外一种返回一个特殊值(null 或 false,具体取决于操做)。插入操做的后一种形式是用于专门为有容量限制的 Queue 实现设计的;在大多数实现中,插入操做不会失败。缓存

 

  抛出异常 返回特殊值
插入 add(e) offer(e)
移除 remove(e) poll(e)
检查 element() peek()

 

        队列一般(但并不是必定)以 FIFO(先进先出)的方式排序各个元素。不过优先级队列和 LIFO 队列(或堆栈)例外,前者根据提供的比较器或元素的天然顺序对元素进行排序,后者按 LIFO(后进先出)的方式对元素进行排序。不管使用哪一种排序方式,队列的头 都是调用 remove() 或 poll() 所移除的元素。在 FIFO 队列中,全部的新元素都插入队列的末尾。其余种类的队列可能使用不一样的元素放置规则。每一个 Queue 实现必须指定其顺序属性。 安全

        若是可能,offer 方法可插入一个元素,失败则返回 false。这与 Collection.add 方法不一样,该方法只能经过抛出未经检查的异常使添加元素失败。offer 方法设计用于正常的失败状况,而不是出现异常的状况,例如在容量固定(有界)的队列中。 并发

        remove() 和 poll() 方法可移除和返回队列的头。到底从队列中移除哪一个元素是队列排序策略的功能,而该策略在各类实现中是不一样的。remove() 和 poll() 方法仅在队列为空时其行为有所不一样:remove() 方法抛出一个异常,而 poll() 方法则返回 null。 学习

        element() 和 peek() 获取但不移除队列的头,element与 peek 惟一的不一样在于:此队列为空时将抛出一个异常。this

        Queue 接口并未定义阻塞队列的方法,而这在并发编程中是很常见的。BlockingQueue 接口则定义了那些等待元素出现或等待队列中有可用空间的方法,这些方法扩展了此接口。 spa

        Queue 实现一般不容许插入 null 元素,尽管某些实现(如 LinkedList)并不由止插入 null。即便在容许 null 的实现中,也不该该将 null 插入到 Queue 中,由于 null 也用做 poll 方法的一个特殊返回值,代表队列不包含元素。 线程

        Queue 实现一般未定义 equals 和 hashCode 方法的基于元素的版本,而是从 Object 类继承了基于身份的版本,由于对于具备相同元素但有不一样排序属性的队列而言,基于元素的相等性并不是老是定义良好的。 设计

        Queue 做为队列能够实现一个按固定顺序访问其内部元素的结构,与 LinkedList等实现不一样,Queue并不能获取指定位置的元素。code

        在 ThreadPoolExecutor类中建立线程池时使用的是 BlockingQueue。BlockingQueue是 Queue的子接口,BlockingQueue的实现类有不少:ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue。

        Deque与 Queue不一样在于,Deque是一个双端队列,支持在两端插入和移除元素。名称 deque 是“double ended queue(双端队列)”的缩写,一般读为“deck”。大多数 Deque 实现对于它们可以包含的元素数没有固定限制,但此接口既支持有容量限制的双端队列,也支持没有固定大小限制的双端队列。 Deque不是咱们要学习的重点,下面就不提了。

        咱们要用到的实现为 ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue,DelayedWorkQueue.其中 DelayedWorkQueue是 ScheduledThreadPoolExecutor的内部类实现。

        顶层接口为 Queue,而后是 Queue的抽象实现类 AbstractQueue和子接口 BlockingQueue。图中四个类均继承于 AbstractQueue并实现 BlockingQueue接口,DelayedWorkQueue一样实现了 BlockingQueue,但DelayedWorkQueue继承自 AbstractCollection。

        如下是Queue的源代码:

Java代码   收藏代码
  1. public interface Queue<E> extends Collection<E> {  
  2.     /** 
  3.      * 将指定的元素插入此队列(若是当即可行且不会违反容量限制),在成功时返回 true,若是当前没有可用的空间,则抛出IllegalStateException 
  4.      */  
  5.     boolean add(E e);  
  6.   
  7.     /** 
  8.      * 将指定的元素插入此队列(若是当即可行且不会违反容量限制),当使用有容量限制的队列时,此方法一般要优于add(E),后者可能没法插入元素,而只是抛出一个异常 
  9.      */  
  10.     boolean offer(E e);  
  11.   
  12.     /** 
  13.      * 获取并移除此队列的头。此方法与 poll 惟一的不一样在于:此队列为空时将抛出一个异常 
  14.      */  
  15.     E remove();  
  16.   
  17.     /** 
  18.      * 获取并移除此队列的头,若是此队列为空,则返回 null 
  19.      */  
  20.     E poll();  
  21.   
  22.     /** 
  23.      * 获取,可是不移除此队列的头。此方法与 peek 惟一的不一样在于:此队列为空时将抛出一个异常 
  24.      */  
  25.     E element();  
  26.   
  27.     /** 
  28.      * 获取但不移除此队列的头;若是此队列为空,则返回 null 
  29.      */  
  30.     E peek();  
  31. }  

 

        2、AbstractQueue

        AbstractQueue提供某些 Queue 操做的主要实现。此类中的实现适用于基本实现不 容许包含 null 元素时。add、remove 和 element 方法分别基于 offer、poll 和 peek 方法,可是它们经过抛出异常而不是返回 false 或 null 来指示失败。 

        扩展此类的 Queue 实现至少必须定义一个不容许插入 null 元素的 Queue.offer(E) 方法,该方法以及 Queue.peek()、Queue.poll()、Collection.size() 和 Collection.iterator() 都支持 Iterator.remove() 方法。一般还要重写其余方法。若是没法知足这些要求,那么能够转而考虑为 AbstractCollection 建立子类。 

        如下是 AbstractQueue的源代码:

Java代码   收藏代码
  1. public abstract class AbstractQueue<E> extends AbstractCollection<E> implements Queue<E> {  
  2.   
  3.     /** 
  4.      * 子类使用的构造方法 
  5.      */  
  6.     protected AbstractQueue() {  
  7.     }  
  8.   
  9.     /** 
  10.      * 将指定的元素插入到此队列中(若是当即可行且不会违反容量限制),在成功时返回 true,若是当前没有可用空间,则抛出 IllegalStateException。 
  11.      */  
  12.     public boolean add(E e) {  
  13.         if (offer(e))  
  14.             return true;  
  15.         else  
  16.             throw new IllegalStateException("Queue full");  
  17.     }  
  18.   
  19.     /** 
  20.      * 获取并移除此队列的头。此方法与 poll 惟一的不一样在于:此队列为空时将抛出一个异常。 
  21.      * 除非队列为空,不然此实现返回 poll 的结果。  
  22.      */  
  23.     public E remove() {  
  24.         E x = poll();  
  25.         if (x != null)  
  26.             return x;  
  27.         else  
  28.             throw new NoSuchElementException();  
  29.     }  
  30.   
  31.     /** 
  32.      * 获取但不移除此队列的头。此方法与 peek 惟一的不一样在于:此队列为空时将抛出一个异常。 
  33.      * 除非队列为空,不然此实现返回 peek 的结果。 
  34.      */  
  35.     public E element() {  
  36.         E x = peek();  
  37.         if (x != null)  
  38.             return x;  
  39.         else  
  40.             throw new NoSuchElementException();  
  41.     }  
  42.   
  43.     /** 
  44.      * 移除此队列中的全部元素。此调用返回后,队列将为空。  
  45.      * 此实现重复调用 poll,直到它返回 null 为止。  
  46.      */  
  47.     public void clear() {  
  48.         while (poll() != null)  
  49.             ;  
  50.     }  
  51.   
  52.     /** 
  53.      * 将指定 collection 中的全部元素都添加到此队列中。 
  54.      * 若是试图将某一队列 addAll 到该队列自己中,则会致使 IllegalArgumentException。 
  55.      * 此外,若是正在进行此操做时修改指定的 collection,则此操做的行为是不肯定的。 
  56.      * 此实如今指定的 collection 上进行迭代,并依次将迭代器返回的每个元素添加到此队列中。 
  57.      * 在试图添加某一元素(尤为是 null 元素)时若是遇到了运行时异常,则可能致使在抛出相关异常时只成功地添加了某些元素。 
  58.      */  
  59.     public boolean addAll(Collection<? extends E> c) {  
  60.         if (c == null)  
  61.             throw new NullPointerException();  
  62.         if (c == this)  
  63.             throw new IllegalArgumentException();  
  64.         boolean modified = false;  
  65.         Iterator<? extends E> e = c.iterator();  
  66.         while (e.hasNext()) {  
  67.             if (add(e.next()))  
  68.                 modified = true;  
  69.         }  
  70.         return modified;  
  71.     }  
  72. }  

 

        3、BlockingQueue

        阻塞队列(BlockingQueue)是一个支持两个附加操做的队列。这两个附加的操做是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用,从而产生阻塞。阻塞队列经常使用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的缓存容器,而消费者也只从容器里拿元素。

        BlockingQueue 的方法以四种形式出现,这四种形式的处理方式不一样:第一种是抛出一个异常,第二种是返回一个特殊值(null 或 false,具体取决于操做),第三种是在操做能够成功前,无限期地阻塞当前线程,第四种是在放弃前只在给定的最大时间限制内阻塞。下表中总结了这些方法:

 

     抛出异常      返回特殊值       阻塞    超时
   插入    add(e) offer(e) put(e) offer(e, time, unit)
   移除    remove() poll() take() poll(time, unit)
   检查    element() peek() - -

 

        • 抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException("Queue full")异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。 

        • 返回特殊值:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,若是没有则返回null 

        • 阻塞:当阻塞队列满时,若是生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。 

        • 超时:当阻塞队列满时,队列会阻塞生产者线程一段时间,若是超过必定的时间,生产者线程就会退出。 

        BlockingQueue 不接受 null 元素。试图 add、put 或 offer 一个 null 元素时,某些实现会抛出 NullPointerException。null 被用做指示 poll 操做失败的警惕值。 

        BlockingQueue 能够是限定容量的。它在任意给定时间均可以有一个 remainingCapacity,超出此容量,便没法无阻塞地 put 附加元素。没有任何内部容量约束的 BlockingQueue 老是报告 Integer.MAX_VALUE 的剩余容量。 

        BlockingQueue 实现主要用于生产者-消费者队列,但它另外还支持 Collection 接口。所以,举例来讲,使用 remove(x) 从队列中移除任意一个元素是有可能的。然而,这种操做一般不 会有效执行,只能有计划地偶尔使用,好比在取消排队信息时。 

        题外话:所谓生产者消费者模式,这里简单介绍一下。好比咱们在餐厅吃饭,咱们就是消费者,餐厅的厨师就是生产者,而餐厅的服务员就是一个缓冲环节。当生产者制做好菜品(生产产品),交由服务员(缓冲区),由服务员将菜品送至顾客(消费者)品用。


       

    生产者-消费者模式最重要的做用就是解耦,利用缓冲区将二者分离。若是每个厨师作完了菜都须要亲自送到顾客桌上,那么这样就是将厨师与顾客绑定到了一块儿。加入中间缓冲环节,也就是服务员,将送菜的任务交由服务员(缓冲区)去处理,这样生产者与消费者就能够各自作本身的事情了。在此模式下二者间支持并发操做,由于饭店的厨师确定不止一个,顾客也是如此。再有就是支持二者间不一样步,由于二者间的数量与效率是不一样步的,这就会致使生产与消费的速度不一样。

         BlockingQueue 实现是线程安全的。全部排队方法均可以使用内部锁或其余形式的并发控制来自动达到它们的目的。然而,大量的 Collection 操做(addAll、containsAll、retainAll 和 removeAll)没有 必要自动执行,除非在实现中特别说明。所以,举例来讲,在只添加了 c 中的一些元素后,addAll(c) 有可能失败(抛出一个异常)。 

        BlockingQueue 实质上不 支持使用任何一种“close”或“shutdown”操做来指示再也不添加任何项。这种功能的需求和使用有依赖于实现的倾向。例如,一种经常使用的策略是:对于生产者,插入特殊的 end-of-stream 或 poison 对象,并根据使用者获取这些对象的时间来对它们进行解释。 

       注意,BlockingQueue 能够安全地与多个生产者和多个使用者一块儿使用。 

       如下是基于典型的生产者-消费者场景的一个用例:

Java代码   收藏代码
  1. class Producer implements Runnable {  
  2.     private final BlockingQueue queue;  
  3.     private int i;  
  4.   
  5.     Producer(BlockingQueue q) {  
  6.         queue = q;  
  7.     }  
  8.   
  9.     public void run() {  
  10.         try {  
  11.             while (true) {  
  12.                 queue.put(produce());// 将产品放入缓冲队列  
  13.             }  
  14.         } catch (InterruptedException e) {  
  15.             e.printStackTrace();  
  16.         }  
  17.     }  
  18.   
  19.     int produce() {  
  20.         return i++;// 生产产品  
  21.     }  
  22. }  
  23.   
  24. class Consumer implements Runnable {  
  25.     private final BlockingQueue queue;  
  26.   
  27.     Consumer(BlockingQueue q) {  
  28.         queue = q;  
  29.     }  
  30.   
  31.     public void run() {  
  32.         try {  
  33.             while (true) {  
  34.                 consume(queue.take());// 从缓冲队列取出产品  
  35.             }  
  36.         } catch (InterruptedException e) {  
  37.             e.printStackTrace();  
  38.         }  
  39.     }  
  40.   
  41.     void consume(Object x) {  
  42.         System.out.println("消费:"+x);// 消费产品  
  43.     }  
  44. }  
  45.   
  46. public class Runner {  
  47.     public static void main(String[] args) {  
  48.         BlockingQueue q = new LinkedBlockingQueue<Integer>(10);// 或其余实现  
  49.         Producer p = new Producer(q);  
  50.         Consumer c1 = new Consumer(q);  
  51.         Consumer c2 = new Consumer(q);  
  52.         new Thread(p).start();  
  53.         new Thread(c1).start();  
  54.         new Thread(c2).start();  
  55.     }  
  56. }  
  57. //结果:  
  58. ...  
  59. 消费:160607  
  60. 消费:160608  
  61. 消费:160609  
  62. 消费:160610  
  63. 消费:160611  
  64. ...  

        当生产者与消费者线程启动后,首先生产者会不断往队列中添加产品,一旦队列填满则生产中止,而后消费者从队列中取出产品使用,显然过程当中使用了相似于 wait与 notify的流程,后面会详细分析。

        如下是 BlockingQueue的源代码:

Java代码   收藏代码
  1. public interface BlockingQueue<E> extends Queue<E> {  
  2.     /** 
  3.      * 将指定元素插入此队列中(若是当即可行且不会违反容量限制),成功时返回 true, 
  4.      * 若是当前没有可用的空间,则抛出 IllegalStateException。 
  5.      * 当使用有容量限制的队列时,一般首选 offer 
  6.      */  
  7.     boolean add(E e);  
  8.   
  9.     /** 
  10.      * 将指定元素插入此队列中(若是当即可行且不会违反容量限制),成功时返回 true, 
  11.      * 若是当前没有可用的空间,则返回 false。 
  12.      * 当使用有容量限制的队列时,此方法一般要优于 add(E),后者可能没法插入元素,而只是抛出一个异常 
  13.      */  
  14.     boolean offer(E e);  
  15.   
  16.     /** 
  17.      * 将指定元素插入此队列中,将等待可用的空间(即阻塞) 
  18.      */  
  19.     void put(E e) throws InterruptedException;  
  20.   
  21.     /** 
  22.      * 将指定元素插入此队列中,在到达指定的等待时间前等待可用的空间 
  23.      */  
  24.     boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;  
  25.   
  26.     /** 
  27.      * 获取并移除此队列的头部,在元素变得可用以前一直等待 
  28.      */  
  29.     E take() throws InterruptedException;  
  30.   
  31.     /** 
  32.      * 获取并移除此队列的头部,在指定的等待时间前等待可用的元素 
  33.      */  
  34.     E poll(long timeout, TimeUnit unit) throws InterruptedException;  
  35.   
  36.     /** 
  37.      * 返回在无阻塞的理想状况下(不存在内存或资源约束)此队列能接受的附加元素数量; 
  38.      * 若是没有内部限制,则返回 Integer.MAX_VALUE 
  39.      */  
  40.     int remainingCapacity();  
  41.   
  42.     /** 
  43.      * 今后队列中移除指定元素的单个实例(若是存在)。 
  44.      * 更确切地讲,若是此队列包含一个或多个知足 o.equals(e) 的元素 e,则移除该元素。 
  45.      * 若是此队列包含指定元素(或者此队列因为调用而发生更改),则返回 true 
  46.      */  
  47.     boolean remove(Object o);  
  48.   
  49.     /** 
  50.      * 若是此队列包含指定元素,则返回 true。更确切地讲,当且仅当此队列至少包含一个知足 o.equals(e) 的元素 e时,返回 true 
  51.      */  
  52.     public boolean contains(Object o);  
  53.   
  54.     /** 
  55.      * 移除此队列中全部可用的元素,并将它们添加到给定 collection 中。此操做可能比反复轮询此队列更有效。 
  56.      * 在试图向 collection c 中添加元素没有成功时,可能致使在抛出相关异常时, 
  57.      * 元素会同时在两个 collection 中出现,或者在其中一个 collection中出现,也可能在两个 collection 中都不出现。 
  58.      * 若是试图将一个队列放入自身队列中,则会致使 IllegalArgumentException 异常。 
  59.      * 此外,若是正在进行此操做时修改指定的 collection,则此操做行为是不肯定的。 
  60.      */  
  61.     int drainTo(Collection<? super E> c);  
  62.   
  63.     /** 
  64.      * 最多今后队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中。 
  65.      * 在试图向 collection c中添加元素没有成功时,可能致使在抛出相关异常时, 
  66.      * 元素会同时在两个 collection 中出现,或者在其中一个 collection中出现,也可能在两个 collection 中都不出现。 
  67.      * 若是试图将一个队列放入自身队列中,则会致使 IllegalArgumentException 异常。 
  68.      * 此外,若是正在进行此操做时修改指定的 collection,则此操做行为是不肯定的。 
  69.      */  
  70.     int drainTo(Collection<? super E> c, int maxElements);  
  71. }  

        后续几篇介绍阻塞队列的相关实现。

相关文章
相关标签/搜索