一文弄懂java中的Queue家族

java中Queue家族简介java

简介

java中Collection集合有三你们族List,Set和Queue。固然Map也算是一种集合类,但Map并不继承Collection接口。安全

List,Set在咱们的工做中会常常使用,一般用来存储结果数据,而Queue因为它的特殊性,一般用在生产者消费者模式中。数据结构

如今很火的消息中间件好比:Rabbit MQ等都是Queue这种数据结构的展开。多线程

今天这篇文章将带你们进入Queue家族。app

Queue接口

先看下Queue的继承关系和其中定义的方法:ide

Queue继承自Collection,Collection继承自Iterable。spa

Queue有三类主要的方法,咱们用个表格来看一下他们的区别:线程

方法类型 方法名称 方法名称 区别
Insert add offer 两个方法都表示向Queue中添加某个元素,不一样之处在于添加失败的状况,add只会返回true,若是添加失败,会抛出异常。offer在添加失败的时候会返回false。因此对那些有固定长度的Queue,优先使用offer方法。
Remove remove poll 若是Queue是空的状况下,remove会抛出异常,而poll会返回null。
Examine element peek 获取Queue头部的元素,但不从Queue中删除。二者的区别仍是在于Queue为空的状况下,element会抛出异常,而peek返回null。
注意,由于对poll和peek来讲null是有特殊含义的,因此通常来讲Queue中禁止插入null,可是在实现中仍是有一些类容许插入null好比LinkedList。

尽管如此,咱们在使用中仍是要避免插入null元素。code

Queue的分类

通常来讲Queue能够分为BlockingQueue,Deque和TransferQueue三种。中间件

BlockingQueue

BlockingQueue是Queue的一种实现,它提供了两种额外的功能:

  1. 当当前Queue是空的时候,从BlockingQueue中获取元素的操做会被阻塞。
  2. 当当前Queue达到最大容量的时候,插入BlockingQueue的操做会被阻塞。

BlockingQueue的操做能够分为下面四类:

操做类型 Throws exception Special value Blocks Times out
Insert add(e) offer(e) put(e) offer(e, time, unit)
Remove remove() poll() take() poll(time, unit)
Examine element() peek() not applicable not applicable

第一类是会抛出异常的操做,当遇到插入失败,队列为空的时候抛出异常。

第二类是不会抛出异常的操做。

第三类是会Block的操做。当Queue为空或者达到最大容量的时候。

第四类是time out的操做,在给定的时间里会Block,超时会直接返回。

BlockingQueue是线程安全的Queue,能够在生产者消费者模式的多线程中使用,以下所示:

class Producer implements Runnable {
   private final BlockingQueue queue;
   Producer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while (true) { queue.put(produce()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   Object produce() { ... }
 }

 class Consumer implements Runnable {
   private final BlockingQueue queue;
   Consumer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while (true) { consume(queue.take()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   void consume(Object x) { ... }
 }

 class Setup {
   void main() {
     BlockingQueue q = new SomeQueueImplementation();
     Producer p = new Producer(q);
     Consumer c1 = new Consumer(q);
     Consumer c2 = new Consumer(q);
     new Thread(p).start();
     new Thread(c1).start();
     new Thread(c2).start();
   }
 }

最后,在一个线程中向BlockQueue中插入元素以前的操做happens-before另一个线程中从BlockQueue中删除或者获取的操做。

Deque

Deque是Queue的子类,它表明double ended queue,也就是说能够从Queue的头部或者尾部插入和删除元素。

一样的,咱们也能够将Deque的方法用下面的表格来表示,Deque的方法能够分为对头部的操做和对尾部的操做:

方法类型 Throws exception Special value Throws exception Special value
Insert addFirst(e) offerFirst(e) addLast(e) offerLast(e)
Remove removeFirst() pollFirst() removeLast() pollLast()
Examine getFirst() peekFirst() getLast() peekLast()

和Queue的方法描述基本一致,这里就很少讲了。

当Deque以 FIFO (First-In-First-Out)的方法处理元素的时候,Deque就至关于一个Queue。

当Deque以LIFO (Last-In-First-Out)的方式处理元素的时候,Deque就至关于一个Stack。

TransferQueue

TransferQueue继承自BlockingQueue,为何叫Transfer呢?由于TransferQueue提供了一个transfer的方法,生产者能够调用这个transfer方法,从而等待消费者调用take或者poll方法从Queue中拿取数据。

还提供了非阻塞和timeout版本的tryTransfer方法以供使用。

咱们举个TransferQueue实现的生产者消费者的问题。

先定义一个生产者:

@Slf4j
@Data
@AllArgsConstructor
class Producer implements Runnable {
    private TransferQueue<String> transferQueue;

    private String name;

    private Integer messageCount;

    public static final AtomicInteger messageProduced = new AtomicInteger();

    @Override
    public void run() {
        for (int i = 0; i < messageCount; i++) {
            try {
                boolean added = transferQueue.tryTransfer( "第"+i+"个", 2000, TimeUnit.MILLISECONDS);
                log.info("transfered {} 是否成功: {}","第"+i+"个",added);
                if(added){
                    messageProduced.incrementAndGet();
                }
            } catch (InterruptedException e) {
                log.error(e.getMessage(),e);
            }
        }
        log.info("total transfered {}",messageProduced.get());
    }
}

在生产者的run方法中,咱们调用了tryTransfer方法,等待2秒钟,若是没成功则直接返回。

再定义一个消费者:

@Slf4j
@Data
@AllArgsConstructor
public class Consumer implements Runnable {

    private TransferQueue<String> transferQueue;

    private String name;

    private int messageCount;

    public static final AtomicInteger messageConsumed = new AtomicInteger();

    @Override
    public void run() {
        for (int i = 0; i < messageCount; i++) {
            try {
                String element = transferQueue.take();
                log.info("take {}",element );
                messageConsumed.incrementAndGet();
                Thread.sleep(500);
            } catch (InterruptedException e) {
                log.error(e.getMessage(),e);
            }
        }
        log.info("total consumed {}",messageConsumed.get());
    }

}

在run方法中,调用了transferQueue.take方法来取消息。

下面先看一下一个生产者,零个消费者的状况:

@Test
    public void testOneProduceZeroConsumer() throws InterruptedException {

        TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
        ExecutorService exService = Executors.newFixedThreadPool(10);
        Producer producer = new Producer(transferQueue, "ProducerOne", 5);

        exService.execute(producer);

        exService.awaitTermination(50000, TimeUnit.MILLISECONDS);
        exService.shutdown();
    }

输出结果:

[pool-1-thread-1] INFO com.flydean.Producer - transfered 第0个 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第1个 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第2个 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第3个 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第4个 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - total transfered 0

能够看到,由于没有消费者,因此消息并无发送成功。

再看下一个有消费者的状况:

@Test
    public void testOneProduceOneConsumer() throws InterruptedException {

        TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
        ExecutorService exService = Executors.newFixedThreadPool(10);
        Producer producer = new Producer(transferQueue, "ProducerOne", 2);
        Consumer consumer = new Consumer(transferQueue, "ConsumerOne", 2);

        exService.execute(producer);
        exService.execute(consumer);

        exService.awaitTermination(50000, TimeUnit.MILLISECONDS);
        exService.shutdown();
    }

输出结果:

[pool-1-thread-2] INFO com.flydean.Consumer - take 第0个
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第0个 是否成功: true
[pool-1-thread-2] INFO com.flydean.Consumer - take 第1个
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第1个 是否成功: true
[pool-1-thread-1] INFO com.flydean.Producer - total transfered 2
[pool-1-thread-2] INFO com.flydean.Consumer - total consumed 2

能够看到Producer和Consumer是一个一个来生产和消费的。

总结

本文介绍了Queue接口和它的三大分类,这三大分类又有很是多的实现类,咱们将会在后面的文章中再详细介绍。

欢迎关注个人公众号:程序那些事,更多精彩等着您!
更多内容请访问 www.flydean.com
相关文章
相关标签/搜索