多线程-阻塞队列

1.阻塞队列

阻塞队列

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素

 

 

阻塞队列提供了四种处理方法:

方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() 不可用 不可用

异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException("Queue full")异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。

  • 返回特殊值:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null

  • 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。

  • 超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。

成员方法

队列 有界性 数据结构
ArrayBlockingQueue bounded(有界) 加锁 arrayList
LinkedBlockingQueue optionally-bounded 加锁 linkedList
PriorityBlockingQueue unbounded 加锁 heap
DelayQueue unbounded 加锁 heap
SynchronousQueue bounded 加锁
LinkedTransferQueue unbounded 加锁 heap
LinkedBlockingDeque unbounded 无锁 heap

下面分别简单介绍一下:

  • ArrayBlockingQueue:是一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁。【注:每一个线程在获取锁的时候可能都会排队等待,如果在等待时间上,先获取锁的线程的请求一定先被满足,那么这个锁就是公平的。反之,这个锁就是不公平的。公平的获取锁,也就是当前等待时间最长的线程先获取锁】

  • LinkedBlockingQueue:一个由链表结构组成的有界队列,此队列的长度为Integer.MAX_VALUE。此队列按照先进先出的顺序进行排序。

  • PriorityBlockingQueue: 一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证同优先级元素的顺序。

  • DelayQueue: 一个实现PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。(DelayQueue可以运用在以下应用场景:1.缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。2.定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。)

  • SynchronousQueue: 一个不存储元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。支持公平锁和非公平锁。SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。

  • LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列,相当于其它队列,LinkedTransferQueue队列多了transfer和tryTransfer方法。

  • LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。

ArrayBlockingQueue

package demo.queue;
​
import java.util.concurrent.ArrayBlockingQueue;
​
public class BlockingQueue1 {
​
    public static void main(String[] args) {
​
        ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(3);//给定初始容量
        // add/remove 抛出异常
        arrayBlockingQueue.add("a");
        arrayBlockingQueue.add("b");
        arrayBlockingQueue.add("c");
​
        //容量为3 当我们加入第四个值时则会抛出异常
        arrayBlockingQueue.add("d");
​
    }
​
}

输出:

 

package demo.queue;
​
import java.util.concurrent.ArrayBlockingQueue;
​
public class BlockingQueue1 {
​
    public static void main(String[] args) {
​
        ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(3);//给定初始容量
        // add/remove 抛出异常
        arrayBlockingQueue.add("a");
        arrayBlockingQueue.add("b");
        arrayBlockingQueue.add("c");
​
        //容量为3 当我们加入第四个值时则会抛出异常
        //arrayBlockingQueue.add("d");
​
        //remove
        System.out.println(arrayBlockingQueue.remove());
        System.out.println(arrayBlockingQueue.remove());
        System.out.println(arrayBlockingQueue.remove());
        //当队列没有元素时抛出异常
        System.out.println(arrayBlockingQueue.remove());
​
    }
​
}

输出:

 

add方法和offer方法最终调用的是enqueue(E x)方法 看源码可知

SynchronousQueue

SynchronousQueue , 只有一个容量!

每一个put操作,就需要有一个 take操作!

package demo.queue;
​
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
​
public class BlockingQueue2 {
​
    public static void main(String[] args) {
​
        SynchronousQueue<String> arrayBlockingQueue = new SynchronousQueue<>();
​
        // A 存
        new Thread(()->{
            try {
                System.out.println(Thread.currentThread().getName() + "put a");
                arrayBlockingQueue.put("a");
                System.out.println(Thread.currentThread().getName() + "put b");
                arrayBlockingQueue.put("b");
                System.out.println(Thread.currentThread().getName() + "put c");
                arrayBlockingQueue.put("c");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"A:").start();
​
        // B 取
        new Thread(()->{
            try {
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + arrayBlockingQueue.take());
​
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + arrayBlockingQueue.take());
​
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + arrayBlockingQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"B:").start();
​
​
​
​
    }
​
}

输出: