BlockingQueue深刻分析

1.BlockingQueue定义的经常使用方法以下java

  抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e,time,unit)
移除 remove() poll() take() poll(time,unit)
检查 element() peek() 不可用 不可用

 

1)add(anObject):把anObject加到BlockingQueue里,即若是BlockingQueue能够容纳,则返回true,不然招聘异常数组

2)offer(anObject):表示若是可能的话,将anObject加到BlockingQueue里,即若是BlockingQueue能够容纳,则返回true,不然返回false.安全

3)put(anObject):把anObject加到BlockingQueue里,若是BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.数据结构

4)poll(time):取走BlockingQueue里排在首位的对象,若不能当即取出,则能够等time参数规定的时间,取不到时返回null并发

5)take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止函数

其中:BlockingQueue 不接受null 元素。试图add、put 或offer 一个null 元素时,某些实现会抛出NullPointerException。null 被用做指示poll 操做失败的警惕值。 性能

 

二、BlockingQueue的几个注意点

【1】BlockingQueue 能够是限定容量的。它在任意给定时间均可以有一个remainingCapacity,超出此容量,便没法无阻塞地put 附加元素。没有任何内部容量约束的BlockingQueue 老是报告Integer.MAX_VALUE 的剩余容量。this

【2】BlockingQueue 实现主要用于生产者-使用者队列,但它另外还支持Collection 接口。所以,举例来讲,使用remove(x) 从队列中移除任意一个元素是有可能的。然而,这种操做一般 会有效执行,只能有计划地偶尔使用,好比在取消排队信息时。spa

【3】BlockingQueue 实现是线程安全的。全部排队方法均可以使用内部锁或其余形式的并发控制来自动达到它们的目的。然而,大量的 Collection 操做(addAll、containsAll、retainAll 和removeAll)没有 必要自动执行,除非在实现中特别说明。所以,举例来讲,在只添加了c 中的一些元素后,addAll(c) 有可能失败(抛出一个异常)。.net

【4】BlockingQueue 实质上不支持使用任何一种“close”或“shutdown”操做来指示再也不添加任何项。这种功能的需求和使用有依赖于实现的倾向。例如,一种经常使用的策略是:对于生产者,插入特殊的end-of-stream 或poison 对象,并根据使用者获取这些对象的时间来对它们进行解释。

 

三、简要概述BlockingQueue经常使用的四个实现类

 

 

1)ArrayBlockingQueue:规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的.

2)LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的

3)PriorityBlockingQueue:相似于LinkedBlockQueue,但其所含对象的排序不是FIFO,而是依据对象的天然排序顺序或者是构造函数的Comparator决定的顺序.

4)SynchronousQueue:特殊的BlockingQueue,对其的操做必须是放和取交替完成的.

    

其中LinkedBlockingQueue和ArrayBlockingQueue比较起来,它们背后所用的数据结构不同,致使LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue,但在线程数量很大时其性能的可预见性低于ArrayBlockingQueue.  

 

下面主要看一下ArrayBlockingQueue的源码:


 

public boolean offer(E e) {      
        if (e == null) throw new NullPointerException();      
        final ReentrantLock lock = this.lock;//每一个对象对应一个显示的锁      
        lock.lock();//请求锁直到得到锁(不能够被interrupte)      
        try {      
            if (count == items.length)//若是队列已经满了      
                return false;      
            else {      
                insert(e);      
                return true;      
            }      
        } finally {      
            lock.unlock();//      
        }      
}      
看insert方法:      
private void insert(E x) {      
        items[putIndex] = x;      
        //增长全局index的值。      
        /*    
        Inc方法体内部:    
        final int inc(int i) {    
        return (++i == items.length)? 0 : i;    
            }    
        这里能够看出ArrayBlockingQueue采用从前到后向内部数组插入的方式插入新元素的。若是插完了,putIndex可能从新变为0(在已经执行了移除操做的前提下,不然在以前的判断中队列为满)    
        */     
        putIndex = inc(putIndex);       
        ++count;      
        notEmpty.signal();//wake up one waiting thread      
}
public void put(E e) throws InterruptedException {      
        if (e == null) throw new NullPointerException();      
        final E[] items = this.items;      
        final ReentrantLock lock = this.lock;      
        lock.lockInterruptibly();//请求锁直到获得锁或者变为interrupted      
        try {      
            try {      
                while (count == items.length)//若是满了,当前线程进入noFull对应的等waiting状态      
                    notFull.await();      
            } catch (InterruptedException ie) {      
                notFull.signal(); // propagate to non-interrupted thread      
                throw ie;      
            }      
            insert(e);      
        } finally {      
            lock.unlock();      
        }      
}
public boolean offer(E e, long timeout, TimeUnit unit)      
        throws InterruptedException {      
     
        if (e == null) throw new NullPointerException();      
    long nanos = unit.toNanos(timeout);      
        final ReentrantLock lock = this.lock;      
        lock.lockInterruptibly();      
        try {      
            for (;;) {      
                if (count != items.length) {      
                    insert(e);      
                    return true;      
                }      
                if (nanos <= 0)      
                    return false;      
                try {      
                //若是没有被 signal/interruptes,须要等待nanos时间才返回      
                    nanos = notFull.awaitNanos(nanos);      
                } catch (InterruptedException ie) {      
                    notFull.signal(); // propagate to non-interrupted thread      
                    throw ie;      
                }      
            }      
        } finally {      
            lock.unlock();      
        }      
    }
public boolean add(E e) {      
    return super.add(e);      
}      
父类:      
public boolean add(E e) {      
        if (offer(e))      
            return true;      
        else     
            throw new IllegalStateException("Queue full");      
    }

该类中有几个实例变量:takeIndex/putIndex/count

用三个数字来维护这个队列中的数据变动:      
    /** items index for next take, poll or remove */     
    private int takeIndex;      
    /** items index for next put, offer, or add. */     
    private int putIndex;      
    /** Number of items in the queue */     
    private int count;
相关文章
相关标签/搜索