阻塞队列之六:LinkedBlockingDeque

1、LinkedBlockingDeque简介

  java6增长了两种容器类型,Deque和BlockingDeque,它们分别对Queue和BlockingQueue进行了扩展。
  Deque是一个双端队列,deque(双端队列) 是 "Double Ended Queue" 的缩写。所以,双端队列是一个你能够从任意一端插入或者抽取元素的队列。实现了在队列头和队列尾的高效插入和移除。
  BlockingDeque 类是一个双端队列,在不可以插入元素时,它将阻塞住试图插入元素的线程;在不可以抽取元素时,它将阻塞住试图抽取的线程。
  正如阻塞队列使用与生产者-消费者模式,双端队列一样适用于另外一种相关模式,即工做密取。在生产者-消费者设计中,全部消费者有一个共享的工做队列,而在工做密取设计中,每一个消费者都有各自的双端队列。若是一个消费者完成了本身双端队列中的所有工做,那么它能够从其它消费者双端队列末尾秘密地获取工做。密取工做模式比传统的生产者-消费者模式具备更高的可伸缩性,这是由于工做者线程不会在单个共享的任务队列上发生竞争。在大多数时候,它们都只是访问本身的双端队列,从而极大地减小了竞争。当工做者线程须要访问另外一个队列时,它会从队列的尾部而不是头部获取工做,所以进一步下降了队列上的竞争程度。html


LinkedBlockingDeque是双向链表实现的双向并发阻塞队列。该阻塞队列同时支持FIFO和FILO两种操做方式,便可以从队列的头和尾同时操做(插入/删除);而且,该阻塞队列是支持线程安全。
此外,LinkedBlockingDeque仍是可选容量的(防止过分膨胀),便可以指定队列的容量。若是不指定,默认容量大小等于Integer.MAX_VALUE。java

 

BlockingDeque 的使用

在线程既是一个队列的生产者又是这个队列的消费者的时候可使用到 BlockingDeque。若是生产者线程须要在队列的两端均可以插入数据,消费者线程须要在队列的两端均可以移除数据,这个时候也可使用 BlockingDeque。BlockingDeque 图解:node

一个 BlockingDeque - 线程在双端队列的两端均可以插入和提取元素。
一个线程生产元素,并把它们插入到队列的任意一端。若是双端队列已满,插入线程将被阻塞,直到一个移除线程从该队列中移出了一个元素。若是双端队列为空,移除线程将被阻塞,直到一个插入线程向该队列插入了一个新元素。git

BlockingDeque 的方法

BlockingDeque 具备 4 组不一样的方法用于插入、移除以及对双端队列中的元素进行检查。若是请求的操做不能获得当即执行的话,每一个方法的表现也不一样。这些方法以下:数组

  抛异常 特定值 阻塞 超时
插入 addFirst(o) offerFirst(o) putFirst(o) offerFirst(o, timeout, timeunit)
移除 removeFirst(o) pollFirst(o) takeFirst(o) pollFirst(timeout, timeunit)
检查 getFirst(o) peekFirst(o)    

 

  抛异常 特定值 阻塞 超时
插入 addLast(o) offerLast(o) putLast(o) offerLast(o, timeout, timeunit)
移除 removeLast(o) pollLast(o) takeLast(o) pollLast(timeout, timeunit)
检查 getLast(o) peekLast(o)    


四组不一样的行为方式解释:安全

  1. 抛异常:若是试图的操做没法当即执行,抛一个异常。
  2. 特定值:若是试图的操做没法当即执行,返回一个特定的值(经常是 true / false)。
  3. 阻塞:若是试图的操做没法当即执行,该方法调用将会发生阻塞,直到可以执行。
  4. 超时:若是试图的操做没法当即执行,该方法调用将会发生阻塞,直到可以执行,但等待时间不会超过给定值。返回一个特定值以告知该操做是否成功(典型的是 true / false)。

BlockingDeque 继承自 BlockingQueue

BlockingDeque 接口继承自 BlockingQueue 接口。这就意味着你能够像使用一个 BlockingQueue 那样使用 BlockingDeque。若是你这么干的话,各类插入方法将会把新元素添加到双端队列的尾端,而移除方法将会把双端队列的首端的元素移除。正如 BlockingQueue 接口的插入和移除方法同样。
如下是 BlockingDeque 对 BlockingQueue 接口的方法的具体内部实现:数据结构

 

 

BlockingQueue BlockingDeque
add() addLast()
offer() x 2 offerLast() x 2
put() putLast()
   
remove() removeFirst()
poll() x 2 pollFirst()
take() takeFirst()
   
element() getFirst()
peek() peekFirst()

 

2、LinkedBlockingDeque源码分析

2.一、LinkedBlockingDeque的lock

LinkedBlockingDeque的原理就是使用一个可重入锁和这个锁生成的两个条件对象进行并发控制(classic two-condition algorithm)。LinkedBlockingDeque是一个带有长度的阻塞队列,初始化的时候能够指定队列长度(若是不指定就是Integer.MAX_VALUE),且指定长度以后不容许进行修改。多线程

    /** Main lock guarding all access */
    final ReentrantLock lock = new ReentrantLock();

    /** Condition for waiting takes */
    private final Condition notEmpty = lock.newCondition();

    /** Condition for waiting puts */
    private final Condition notFull = lock.newCondition();

2.二、数据结构

双向链表并发

/** 双向链表节点 */  
static final class Node<E> {  
    /** 
     * 元素值 
     */  
    E item;  
  
    /** 
     * 节点前驱 
     * 1.指向前驱;2.指向this,说明前驱是尾节点,看unlinklast;3.指向null说明没有前驱 
     */  
    Node<E> prev;  
  
    /** 
     * 节点后继 
     * 1.指向后继;2.指向this,说明后继是头结点,看unlinkfirst;3.指向null说明没有后继 
     */  
    Node<E> next;  
  
    Node(E x) {  
        item = x;  
    }  
}  

 

2.三、成员变量

    //first是双向链表的表头
    transient Node<E> first;

    //last是双向链表的表尾
    transient Node<E> last;

    //count是LinkedBlockingDeque的实际大小,即双向链表中当前节点个数
    private transient int count;

    //是LinkedBlockingDeque的容量,它是在建立LinkedBlockingDeque时指定的
    private final int capacity;

 

2.四、构造函数

    public LinkedBlockingDeque() {
        this(Integer.MAX_VALUE);
    }
    
    public LinkedBlockingDeque(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
    }

    public LinkedBlockingDeque(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock lock = this.lock;
        lock.lock(); // Never contended, but necessary for visibility
        try {
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (!linkLast(new Node<E>(e)))
                    throw new IllegalStateException("Deque full");
            }
        } finally {
            lock.unlock();
        }
    }

 

2.五、入队

addFirst,addLast分别调用offerFirst,offerLast,而offerFirst,offerLast和putFirst,putLast都是调用了linkFirst和linkLast。框架

    public void addFirst(E e) {
        if (!offerFirst(e))
            throw new IllegalStateException("Deque full");
    }
    
    public void addLast(E e) {
        if (!offerLast(e))
            throw new IllegalStateException("Deque full");
    }
    
    public boolean offerFirst(E e) {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return linkFirst(node);
        } finally {
            lock.unlock();
        }
    }
    
    public boolean offerLast(E e) {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return linkLast(node);
        } finally {
            lock.unlock();
        }
    }
    
    public void putFirst(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            while (!linkFirst(node))
                notFull.await();
        } finally {
            lock.unlock();
        }
    }
    
    public void putLast(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            while (!linkLast(node))
                notFull.await();
        } finally {
            lock.unlock();
        }
    }

linkFirst和linkLast

/** 
 * 设置node为链表头节点,链表满时为false 
 */  
private boolean linkFirst(Node<E> node) {  
    // assert lock.isHeldByCurrentThread();  
    if (count >= capacity) //超过容量false  
        return false;  
    Node<E> f = first;  
    node.next = f; //新节点的next指向原first  
    first = node; //设置node为新的first  
    if (last == null) //没有尾节点,就将node设置成尾节点  
        last = node;  
    else  
        f.prev = node; //有尾节点,那就将以前first的pre指向新增node  
    ++count; //累加节点数量  
    notEmpty.signal(); //有新节点入队,通知非空条件队列  
    return true;  
}  
  
/** 
 * 设置node为链表尾节点,链表满时为false 
 */  
private boolean linkLast(Node<E> node) {  
    // assert lock.isHeldByCurrentThread();  
    if (count >= capacity)  
        return false;  
    Node<E> l = last;  
    node.prev = l;  
    last = node;  
    if (first == null) //为null,说明以前队列空吧,那就first也指向node  
        first = node;  
    else  
        l.next = node; //非null,说明以前的last有值,就将以前的last的next指向node  
    ++count;  
    notEmpty.signal();  
    return true;  
}  

2.六、出队

    public E removeFirst() {
        E x = pollFirst();
        if (x == null) throw new NoSuchElementException();
        return x;
    }
    
    public E removeLast() {
        E x = pollLast();
        if (x == null) throw new NoSuchElementException();
        return x;
    }
    
    public E pollFirst() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return unlinkFirst();
        } finally {
            lock.unlock();
        }
    }
    
    public E pollLast() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return unlinkLast();
        } finally {
            lock.unlock();
        }
    }
    
    public E takeFirst() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E x;
            while ( (x = unlinkFirst()) == null)
                notEmpty.await();
            return x;
        } finally {
            lock.unlock();
        }
    }
    
    public E takeLast() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E x;
            while ( (x = unlinkLast()) == null)
                notEmpty.await();
            return x;
        } finally {
            lock.unlock();
        }
    }
    

核心方法

/** 
 * 移除头结点,链表空返回null 
 */  
private E unlinkFirst() {  
    // assert lock.isHeldByCurrentThread();  
    Node<E> f = first;  
    if (f == null)  
        return null; //空返回null  
    Node<E> n = f.next;  
    E item = f.item;  
    f.item = null;  
    f.next = f; // help GC  
    first = n;  
    if (n == null) //说明以前应该只有一个节点,移除头结点后,链表空,如今first和last都指向null了  
        last = null;  
    else  
        n.prev = null; //不然的话,n的pre原来指向以前的first,如今n变为first了,pre指向null  
    --count;  
    notFull.signal(); //通知非满条件队列  
    return item;  
}  
  
/** 
 * 移除尾结点,链表空返回null 
 */  
private E unlinkLast() {  
    // assert lock.isHeldByCurrentThread();  
    Node<E> l = last;  
    if (l == null)  
        return null;  
    Node<E> p = l.prev;  
    E item = l.item;  
    l.item = null;  
    l.prev = l; // help GC  
    last = p;  
    if (p == null)  
        first = null;  
    else  
        p.next = null;  
    --count;  
    notFull.signal();  
    return item;  
}  
  
/** 
 * 移除指定节点:p--》x--》n 
 */  
void unlink(Node<E> x) {  
    // assert lock.isHeldByCurrentThread();  
    Node<E> p = x.prev;  
    Node<E> n = x.next;   
    if (p == null) { //prev为null说明x节点为头结点  
        unlinkFirst();  
    } else if (n == null) {  
        unlinkLast(); //nex为null说明待清除节点为尾节点  
    } else { //不然的话节点处于链表中间  
        p.next = n; //将p和n互链  
        n.prev = p;  
        x.item = null;  
        // 没有断开x节点连接,可能有其余线程在迭代链表  
        --count;  
        notFull.signal();  
    }  
} 

2.七、peek方法

    public E peek() {
        return peekFirst();
    }
    
    public E peekFirst() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (first == null) ? null : first.item;
        } finally {
            lock.unlock();
        }
    }
    //peekLast就不贴了

2.八、size方法

    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

 

3、JDK或开源框架中使用

4、使用示例


java.util.ArrayDeque 类提供了可调整大小的阵列,并实现了Deque接口。如下是关于阵列双端队列的要点:
Java.util.ArrayDeque

  • 数组双端队列没有容量限制,使他们增加为必要支持使用。

  • 它们不是线程安全的;若是没有外部同步。

  • 不支持多线程并发访问。

  • null元素被禁止使用在数组deques。

  • 它们要比堆栈Stack和LinkedList快。

此类及其迭代器实现Collection和Iteratorinterfaces方法可选。

类的声明

如下是java.util.ArrayDeque类的声明:

public class ArrayDeque<E> extends AbstractCollection<E> implements Deque<E>, Cloneable, Serializable

这里<E>表明一个元素,它能够是任何类。例如,若是你正在构建一个整数数组列表,那么初始化可为

ArrayList<Integer> list = new ArrayList<Integer>();

 

 

S.N. 方法 & 描述
1 boolean add(E e) 
此方法将添加指定的元素,在此deque队列的末尾。
2 void addFirst(E e) 
此方法将添加指定的元素,在此deque队列的前面。
3 void addLast(E e) 
此方法将插入指定的元素,在此deque队列的末尾。
4 void clear() 
此方法移除此deque队列的元素。
5 ArrayDeque<E> clone() 
此方法返回此deque队列的副本。
6 boolean contains(Object o) 
若是此deque 队列包含指定的元素,此方法返回true。
7 Iterator<E> descendingIterator() 
此方法返回一个迭代器在此deque队列以逆向顺序的元素。
8 E element() 
此方法检索,可是不移除此deque队列表示的队列的头部。
9 E getFirst()
此方法检索,可是不移除此deque队列的第一个元素。
10 E getLast() 
此方法检索,可是不移除此deque队列的最后一个元素。
11 boolean isEmpty() 
若是此deque队列不包含元素,此方法返回true。
12 Iterator<E> iterator() 
此方法返回一个迭代器在此deque队列的元素。
13 boolean offer(E e)
此方法将指定的元素,在此deque队列的末尾。
14 boolean offerFirst(E e) 
此方法将指定的元素,在此deque队列的前面。
15 boolean offerLast(E e) 
此方法将指定的元素,在此deque队列的末尾。
16 E peek() 
此方法检索,可是不移除此deque队列表示的队列的头部,若是此deque队列为空,则返回null。
17 E peekFirst() 
此方法检索,可是不移除此deque 队列的第一个元素,或者若是此deque 队列为空,则返回null。
18 E peekLast() 
此方法检索,可是不移除此deque队列的最后一个元素,若是此deque队列为空,则返回null。
19 E poll() 
此方法检索并移除此deque队列表示的队列的头部,若是此deque队列为空,则返回null。
20 E pollFirst() 
此方法检索并移除此deque队列的第一个元素,或者若是此deque队列为空,则返回null。
21 E pollLast() 
此方法检索并移除此deque队列的最后一个元素,若是此deque队列为空,则返回null。
22 E pop() 
这种方法的此deque队列所表示的堆栈弹出一个元素。
23 void push(E e) 
这种方法将元素推入此deque队列所表示的堆栈。
24 E remove() 
此方法检索并移除此deque队列表示的队列的头部。
25 boolean remove(Object o) 
此方法今后deque队列中移除指定元素的单个实例。
26 E removeFirst() 
此方法检索并移除此deque队列的第一个元素。
27 boolean removeFirstOccurrence(Object o) 
此方法移除此deque队列的指定元素的第一个匹配。
28 E removeLast() 
此方法检索并移除此deque队列的最后一个元素。
29 boolean removeLastOccurrence(Object o) 
此方法移除此deque队列的指定元素的最后一次出现。
30 int size() 
此方法返回在此deque队列的元素个数。
31 object[] toArray() 这个方法返回一个包含全部在此deque队列在适当的序列中元素的数组。
相关文章
相关标签/搜索