并发编程之并发队列

1、并发队列

在并发队列上JDK提供了两套实现,java

一个是以ConcurrentLinkedQueue为表明的高性能队列非阻塞,数组

一个是以BlockingQueue接口为表明的阻塞队列,不管哪一种都继承自Queue。安全

一、阻塞队列与非阻塞队

阻塞队列与普通队列的区别在于:并发

阻塞队列:ide

  • 当队列是空的时,从队列中获取元素的操做将会被阻塞,试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其余的线程往空的队列插入新的元素;
  • 当队列是满时,往队列里添加元素的操做会被阻塞。试图往已满的阻塞队列中添加新元素的线程一样也会被阻塞,直到其余的线程使队列从新变得空闲起来,如从队列中移除一个或者多个元素,或者彻底清空队列.

二、ConcurrentLinkedQeque

ConcurrentLinkedQueue : 是一个适用于高并发场景下的队列,经过无锁的方式,实现了高并发状态下的高性能,一般ConcurrentLinkedQueue性能好于BlockingQueue.它是一个基于连接节点的无界线程安全队列。该队列的元素遵循先进先出的原则。头是最早加入的,尾是最近加入的,该队列不容许null元素。高并发

// 非阻塞式队列,无界队列
ConcurrentLinkedDeque q = new ConcurrentLinkedDeque();
    q.offer("张三");
    q.offer("李四");
    q.offer("王五");
    //从头获取元素,删除该元素
    System.out.println(q.poll());
    //从头获取元素,不刪除该元素
    System.out.println(q.peek());
    //获取总长度
    System.out.println(q.size());复制代码

三、BlockingQueue

阻塞队列(BlockingQueue)是一个支持两个附加操做的队列。这两个附加的操做是:性能

  • 在队列为空时,获取元素的线程会等待队列变为非空。
  • 当队列满时,存储元素的线程会等待队列可用。

在Java中,BlockingQueue的接口位于java.util.concurrent 包中(在Java5版本开始提供),由上面介绍的阻塞队列的特性可知,阻塞队列是线程安全的。this

1)、ArrayBlockingQueue

ArrayBlockingQueue是一个有边界的阻塞队列,它的内部实现是一个数组。有边界的意思是它的容量是有限的,咱们必须在其初始化的时候指定它的容量大小,容量大小一旦指定就不可改变。spa

ArrayBlockingQueue是以先进先出的方式存储数据,最新插入的对象是尾部,最新移出的对象是头部。下面是一个初始化和使用ArrayBlockingQueue的例子:线程

<String> arrays = new ArrayBlockingQueue<String>(3);
    arrays.offer("张三");
     arrays.offer("李四");
    arrays.offer("王五");
    arrays.offer("666", 3, TimeUnit.SECONDS); // 队列满了,阻塞3秒后向下执行
    System.out.println(arrays.poll()); // 张三
    System.out.println(arrays.poll()); // 李四
    System.out.println(arrays.poll()); // 王五
    System.out.println(arrays.poll(3, TimeUnit.SECONDS)); //队列为空,阻塞3秒后结束复制代码

2)、LinkedBlockingQueue

LinkedBlockingQueue阻塞队列大小的配置是可选的,若是咱们初始化时指定一个大小,它就是有边界的,若是不指定,它就是无边界的。说是无边界,实际上是采用了默认大小为Integer.MAX_VALUE的容量 。它的内部实现是一个链表。

和ArrayBlockingQueue同样,LinkedBlockingQueue 也是以先进先出的方式存储数据,最新插入的对象是尾部,最新移出的对象是头部。下面是一个初始化和使LinkedBlockingQueue的例子:

LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(3);
linkedBlockingQueue.add("张三");
linkedBlockingQueue.add("李四");
linkedBlockingQueue.add("李四");
System.out.println(linkedBlockingQueue.size()); // 3复制代码

3)、PriorityBlockingQueue(有界,快满时自动扩容,看似无界)

PriorityBlockingQueue是一个没有边界的队列,它的排序规则和 java.util.PriorityQueue同样。须要注意,PriorityBlockingQueue中容许插入null对象。

全部插入PriorityBlockingQueue的对象必须实现 java.lang.Comparable接口,队列优先级的排序规则就是按照咱们对这个接口的实现来定义的。

另外,咱们能够从PriorityBlockingQueue得到一个迭代器Iterator,但这个迭代器并不保证按照优先级顺序进行迭代。

4)、SynchronousQueue

SynchronousQueue队列内部仅容许容纳一个元素。当一个线程插入一个元素后会被阻塞,除非这个元素被另外一个线程消费。

5)、使用BlockingQueue模拟生产者与消费者

class ProducerThread implements Runnable {
    private BlockingQueue<String> blockingQueue;
    private AtomicInteger count = new AtomicInteger();
    private volatile boolean FLAG = true;

    public ProducerThread(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "生产者开始启动....");
        while (FLAG) {
            String data = count.incrementAndGet() + "";
            try {
                boolean offer = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
                if (offer) {
                    System.out.println(Thread.currentThread().getName() + ",生产队列" + data + "成功..");
                } else {
                    System.out.println(Thread.currentThread().getName() + ",生产队列" + data + "失败..");
                }
                Thread.sleep(1000);
            } catch (Exception e) {

            }
        }
        System.out.println(Thread.currentThread().getName() + ",生产者线程中止...");
    }

    public void stop() {
        this.FLAG = false;
    }

}

class ConsumerThread implements Runnable {
    private volatile boolean FLAG = true;
    private BlockingQueue<String> blockingQueue;

    public ConsumerThread(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "消费者开始启动....");
        while (FLAG) {
            try {
                String data = blockingQueue.poll(2, TimeUnit.SECONDS);
                if (data == null || data == "") {
                    FLAG = false;
                    System.out.println("消费者超过2秒时间未获取到消息.");
                    return;
                }
                System.out.println("消费者获取到队列信息成功,data:" + data);

            } catch (Exception e) {
                // TODO: handle exception
            }
        }
    }

}

public class Test0008 {

    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(3);
        ProducerThread producerThread = new ProducerThread(blockingQueue);
        ConsumerThread consumerThread = new ConsumerThread(blockingQueue);
        Thread t1 = new Thread(producerThread);
        Thread t2 = new Thread(consumerThread);
        t1.start();
        t2.start();
        //10秒后 中止线程..
        try {
            Thread.sleep(10*1000);
            producerThread.stop();
        } catch (Exception e) {
            // TODO: handle exception
        }
    }

}
复制代码

  1. ArrayDeque, (数组双端队列) 
  2. PriorityQueue, (优先级队列) 
  3. ConcurrentLinkedQueue, (基于链表的并发队列) 
  4. DelayQueue, (延期阻塞队列)(阻塞队列实现了BlockingQueue接口) 
  5. ArrayBlockingQueue, 经常使用(基于数组的并发阻塞队列) 
  6. LinkedBlockingQueue, 经常使用(基于链表的FIFO阻塞队列) 
  7. LinkedBlockingDeque, (基于链表的FIFO双端阻塞队列) 
  8. PriorityBlockingQueue,经常使用 (带优先级的无界阻塞队列,) 
  9. SynchronousQueue经常使用 (并发同步阻塞队列)

本文由博客一文多发平台 OpenWrite 发布!

相关文章
相关标签/搜索