Java阻塞队列

💛原文地址为http://www.javashuo.com/article/p-ktssqkkn-gq.html,转载请注明出处!html

什么是阻塞队列

原文地址为,转载请注明出处!
阻塞队列是一个支持阻塞的插入和移除的队列。java

  • 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
  • 支持阻塞的移除方法:意思是队列为空时,获取元素(同时移除元素)的线程会被阻塞,等到队列变为非空。

阻塞队列用法

阻塞队列经常使用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里获取元素的线程。数组

当阻塞队列不可用时,会有四种相应的处理方式:this

处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入操做 add(e) offer(e) put(e) offer(e, time, unit)
移除操做 remove() poll() take() poll(time, unit)
获取操做 element() peek() 不可用 不可用
  • 返回特殊值:插入元素时,会返回是否插入成功,成功返回true。若是是移除方法,则是从队列中取出一个元素,没有则返回null。
  • 一直阻塞:当阻塞队列满时,若是生产者线程往队列里面put元素,则生产者线程会被阻塞,知道队列不满或者响应中断退出。当队列为空时,若是消费者线程从队列里take元素。
  • 超时退出:当阻塞队列满时,若是生产者线程往队列里插入元素,队列会阻塞生产者线程一段时间,若是超过了指定时间,生产者线程就会退出。

若是是无界阻塞队列,队列则不会出现满的状况。线程

阻塞队列

  • ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列
  • LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列code

  • PriorityBlockingQueue:一个支持优先级排序无界阻塞队列
  • DelayQueue:一个使用优先级队列实现的无界阻塞队列
  • SynchronousQueue:一个不存储元素的阻塞队列
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列
    • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列

1.ArrayBlockingQueue

此队列按照先进先出(FIFO)的原则对元素进行排序htm

默认状况下不保证线程公平地访问队列(所谓公平是指当队列可用时,先被阻塞的线程先访问队列)blog

为了保证公平性一般会下降吞吐量。排序

公平锁是利用了可重入锁的公平锁来实现。索引

public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

2.ArrayBlockingQueue

此队列按照先进先出(FIFO)的原则对元素进行排序

默认长度为Integer.MAX_VALUE

3.PriorityBlockingQueue

默认状况下元素采起天然顺序升序排列

能够自定义Comparator或者自定义类实现compareTo()方法来指定排序规则

不支持同优先级元素排序

4.DelayQueue

队列使用PriorityQueue来实现,队列中的元素必须实现Delayed接口

只有在延时期满才能从队列中提取元素

阻塞队列原理

若是队列是空的,消费者会一直等待,当生产者添加元素时,消费者是如何知道当前队列有元素的呢?

使用通知模式实现。所谓通知模式,就是当生产者往满的队列添加元素时会阻塞住生产者,当消费者消费了一个队列中的元素后,会通知生产者当前队列可用。

ArrayBlockingQueue为例子

/** items 存放队列中的元素*/
    final Object[] items;

    /** take 操做的索引 */
    int takeIndex;

    /** put 操做的索引 */
    int putIndex;

    /** 队列中元素个数 */
    int count;

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** 控制生产者 takes 操做的 Condition */
    private final Condition notEmpty;

    /** 控制消费者 put 操做的 Condition */
    private final Condition notFull;

put操做

public void put(E e) throws InterruptedException {
        checkNotNull(e); //判断 e == null
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly(); //获取锁,与lock不一样,能够尝试中断阻塞
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

入队操做,入队以后唤醒消费者线程。

private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

notFull.await();中其实调用了park方法,先使用setBlocker保存一下将要阻塞的线程,而后调用本地方法UNSAFE.park(boolean isAbsolute, long time)进行阻塞。

public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }
相关文章
相关标签/搜索