java并发编程之并发容器和框架(三)

Java中的阻塞队列

1 什么是阻塞队列

阻塞队列(BlockingQueue)是一种支持两个附加操做的队列.java

  • 当队列满时,队列会阻塞存储元素的线程,直到队列有可用空间
  • 在队列为空时,获取元素的线程会等待队列变为非空

阻塞队列经常使用于生产者和消费者的场景,生产者是向队列里存储元素的线程,消费者是从队列里获取元素的线程.阻塞队列就是生产者存储元素、消费者获取元素的容器.node

在阻塞队列不可用时,这两个附加操做提供了4种处理方式
这里写图片描述数组

  • 抛出异常
    • 当队列满时,若是再往队列里插入元素,会抛出IllegalStateException(“Queuefull”)异常
    • 当队列空时,从队列里获取元素会抛出NoSuchElementException异常
  • 返回特殊值
    • 当往队列插入元素时,会返回元素是否插入成功,成功则返回true
    • 如果移除方法,则是从队列里取出一个元素,若没有则返回null
  • 一直阻塞
    • 当阻塞队列满时,若是生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到队列有可用空间或响应中断退出
    • 当队列空时,若消费者线程从队列里take元素,队列会阻塞住消费者线程,直到队列非空
  • 超时退出
    • 当阻塞队列满时,若生产者线程往队列里插入元素,队列会阻塞生产者线程
      一段时间,若超过指定的时间,生产者线程就会退出

注意 如果无界阻塞队列,队列不会出现满的状况,因此使用put或offer方法永远不会被阻塞,使用offer方法时,永远返回true缓存

BlockingQueue 不接受 null 元素。试图 add、put 或 offer null 元素时,会抛出 NullPointerException。null 被用做指示 poll 操做失败的警惕值(没法经过编译)。 安全

BlockingQueue 实现主要用于生产者-使用者队列,但它另外还支持 Collection 接口。所以,举例来讲,使用 remove(x) 从队列中移除任意一个元素是有可能的。然而,这种操做一般表现并不高效,只能有计划地偶尔使用,好比在取消排队信息时。markdown

BlockingQueue 实现是线程安全的。全部排队方法均可以使用内置锁或其余形式的并发控制来自动达到它们的目的。然而,大量的Collection 操做(addAll、containsAll、retainAll 和 removeAll)没有必要自动执行,除非在实现中特别说明。所以,举例来讲,在只添加 c 中的一些元素后,addAll(c) 有可能失败(抛出一个异常)。并发

BlockingQueue 实质上不支持使用任何一种“close”或“shutdown”操做来指示再也不添加任何项。这种功能的需求和使用有依赖于实现的倾向。例如,一种经常使用的策略是:对于生产者,插入特殊的 end-of-stream 或 poison 对象,并根据使用者获取这些对象的时间来对它们进行解释。 ui

2 Java里的阻塞队列

至JDK8,Java提供了7个阻塞队列
·ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
·LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
·PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
·DelayQueue:一个使用优先级队列实现的无界阻塞队列。
·SynchronousQueue:一个不存储元素的阻塞队列。
·LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
·LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。this

2.1 ArrayBlockingQueue

是一个用数组实现的有界阻塞队列.此队列按FIFO的原则对元素进行排序spa

这是一个典型的“有界缓存区”,固定大小的数组在其中保持生产者插入的元素和消费者获取的元素.一旦建立了这样的缓存区,就不能再增长其容量.试图向已满队列中放入元素会致使操做受阻塞;试图从空队列中提取元素将致使相似阻塞.

此类支持对等待的生产者和消费者线程进行排序的可选的公平策略.默认状况下,不保证这种机制.然而,经过将公平性设置为 true 而构造的队列容许按 FIFO 顺序访问线程。公平性一般会下降吞吐量,但也减小了可变性和避免了“不平衡性”.

所谓公平访问队列是指阻塞的线程,能够按照阻塞的前后顺序访问队列,即先阻塞的线程先访问队列.
非公平性是对先等待的线程是非公平的,当队列有可用空间时,阻塞的线程均可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列.
为保证公平性,一般会下降吞吐量.咱们可使用如下代码建立一个公平的阻塞队列:

ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);

访问者的公平性是使用可重入锁实现的,代码以下:

public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

3 阻塞队列的实现原理

若队列为空,消费者会一直等待,当生产者添加元素时,消费者是如何知道当前队列有元素的呢?让咱们看看JDK是如何实现的。
使用通知模式实现。所谓通知模式,就是当生产者往满的队列里添加元素时会阻塞住生产者,当消费者消费了一个队列中的元素后,会通知生产者当前队列可用。经过查看源码发现ArrayBlockingQueue使用了Condition来实现,代码以下。

private final Condition notFull;
    private final Condition notEmpty;

    public ArrayBlockingQueue(int capacity, boolean fair) {
        // 省略其余代码
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

     private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

当往队列里插入一个元素时,若是队列不可用,那么阻塞生产者主要经过
LockSupport.park(this)来实现。

public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

继续进入源码,发现调用setBlocker先保存一下将要阻塞的线程,而后调用unsafe.park阻塞当前线程。

public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        unsafe.park(false, 0L);
        setBlocker(t, null);
    }

unsafe.park是个native方法,代码以下。

public native void park(boolean isAbsolute, long time);

park这个方法会阻塞当前线程,只有如下4种状况中的一种发生时,该方法才会返回。

  • 与park对应的unpark执行或已经执行时。“已经执行”是指unpark先执行,而后再执行park的状况。
  • 线程被中断时。
  • 等待完time参数指定的毫秒数时。
  • 异常现象发生时,这个异常现象没有任何缘由。
相关文章
相关标签/搜索