死磕java concurrent包系列(五)基于AQS的条件队列把LinkedBlockingQueue“扒光”

LinkedBlockingQueue的基础

LinkedBlockingQueue是一个基于链表的阻塞队列,实际使用上与ArrayBlockingQueue彻底同样,咱们只须要把以前烤鸡的例子中的Queue对象替换一下便可。若是对于ArrayBlockingQueue不熟悉,能够去看看https://juejin.im/post/5c0f79f3f265da61561f1becjava

LinkedBlockingQueue源码分析

源码在node上注释写明了,它是基于一个“two lock queue”算法实现的,感兴趣的同窗能够参考这篇paper:www.cs.rochester.edu/u/scott/pap… 这篇文章为了提高在多处理器的机器上的更好性能的并发而提出了这个算法,其中心思想是:经过两把锁分别控制并发,入队时:只须要锁Tail Node,出队时,只须要锁Head Node。 回到LinkedBlockingQueue,先看看内部成员变量:node

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -6903933977591709194L;

    /**
     * Linked list node class
     */
    static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
复制代码

每一个添加到LinkedBlockingQueue队列中的数据都将被封装成Node节点(这个node不一样于AQS中的node,它是一个单向链表),其中head和last分别指向队列的头结点和尾结点。与ArrayBlockingQueue不一样的是,LinkedBlockingQueue内部分别使用了takeLock 和 putLock 对并发进行控制,也就是说,添加和删除操做并非互斥操做,能够同时进行,这样也就能够大大提升吞吐量。这里再次强调若是没有给LinkedBlockingQueue指定容量大小,其默认值将是Integer.MAX_VALUE,若是存在添加速度大于删除速度时候,有可能会内存溢出,这点在使用前但愿慎重考虑。至于LinkedBlockingQueue的实现原理图与ArrayBlockingQueue是相似的,除了对添加和移除方法使用单独的锁控制外,二者都使用了不一样的Condition条件对象做为等待队列,用于挂起take线程和put线程。 总结以下图: 算法

image.png

LinkedBlockingQueue的阻塞添加

一样的,添加的方法主要有:add offer 和put。咱们先看看非阻塞添加的add和offer方法,这两个方法的区别一样是添加失败时,add方法是抛异常,offer方法是返回false数组

public boolean add(E e) {
     if (offer(e))
         return true;
     else
         throw new IllegalStateException("Queue full");
}
复制代码
public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        //由于存在并发操做移出和入队互不冲突,与arrayBlockingQueue不一样,count被声明为Atomic
        final AtomicInteger count = this.count;
        //队列满了直接返回
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            //由于存在并发问题,加锁以后再次判断一下队列有没有满
            if (count.get() < capacity) {
                //入队
                enqueue(node);
                //容量+1返回旧值
                c = count.getAndIncrement();
                //由于在入队时可能同时有出队的线程同时把元素移除,因此在入队后作一个补偿,
                //若是队列还有空间,那么唤醒一个如归的线程执行添加操做
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        //c==0,只有可能最开始就是一个空队列(注意上面的c返回的是旧值)此时由于恰好添加了一个元素,
        //因此唤醒消费的线程去取移出元素
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }
复制代码
//入队操做
private void enqueue(Node<E> node) {
     //队列尾节点指向新的node节点
     last = last.next = node;
}

//signalNotEmpty方法去唤醒移出元素的线程,为何要先获取锁才能signal呢?不懂的同窗回去看看AQS:
//由于条件队列是基于AQS的锁存在的,用法上必需要这么用,不然会抛出异常
private void signalNotEmpty() {
      final ReentrantLock takeLock = this.takeLock;
      takeLock.lock();
          //唤醒获取并删除元素的线程
          notEmpty.signal();
      } finally {
          takeLock.unlock();
      }
  }

复制代码

这里的Offer()方法作了两件事:bash

  • 第一件事是判断队列是否满,满了就直接释放锁,没满就将节点封装成Node入队,而后加锁后再次判断队列添加完成后是否已满,不满就继续唤醒等到在条件对象notFull上的添加线程。
  • 第二件事是,判断是否须要唤醒等到在notEmpty条件对象上的消费线程。

接下来看看put方法,与offer方法一模一样:并发

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        //锁可被中断
        putLock.lockInterruptibly();
        try {
          //队列满时加入notFull条件队列
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            //队列尚未满时,继续唤醒添加线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        //c==0,只有可能最开始就是一个空队列(注意上面的c返回的是旧值)此时由于恰好添加了一个元素,
        //因此唤醒消费的线程去取移出元素
        if (c == 0)
            signalNotEmpty();
    }

复制代码

这里有几个问题:高并发

问题1:
为何添加完成后是继续唤醒在条件队列notFull上的添加线程而不是像ArrayBlockingQueue那样直接唤醒notEmpty条件对象上的消费线程?源码分析

分析1: 先回想一下ArrayBlockingQueue:它内部只有一个锁,在内部完成添加元素操做后直接唤醒消费线程去消费。若是ArrayBlockingQueue在添加元素以后再唤醒添加线程的话,消费的线程就可能一直被block,没法执行。 而为了不这种状况,对于LinkedBlockingQueue来讲,他有两个锁,添加和删除元素不是互斥的,添加的过程当中可能已经删除好几个元素了,因此他在设计上要尽量的去唤醒两个条件队列。 添加线程在队列没有满时本身直接唤醒本身的其余添加线程,若是没有等待的添加线程,直接结束了。若是有就直到队列元素已满才结束挂起。注意消费线程的执行过程也是如此。这也是为何LinkedBlockingQueue的吞吐量要相对大些的缘由。post

问题2: 为何if (c == 0)时才去唤醒消费线程呢性能

分析2: 什么状况下c等于0呢?c值是添加元素前队列的大小,也就是说,以前是空队列,空队列时会有什么状况呢,空队列会阻塞全部的take进程,将其封装到notEmpty的条件队列中。这个时候,c以前是0,如今在执行了enqueue方法后,队列中有元素了,因此他须要当即唤醒阻塞的take进程,不然阻塞的take进程就一直block在队列里,一直沉睡下去。 为何c>0时,就不会唤醒呢?由于take方法和put方法同样,take方法每次take完元素后,若是队列还有值,它会继续唤醒take队列,也就是说他只要没有被await()阻塞,他就会一直不断的唤醒take线程,而不须要再添加的时候再去唤醒,形成没必要要的性能浪费

LinkedBlockingQueue的阻塞移出

相对的,咱们再看看take方法:

public E take() throws InterruptedException {
        E x;
        int c = -1;
        //获取当前队列大小
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();//可中断
        try {
            //若是队列没有数据,当前take线程到条件队列中
            while (count.get() == 0) {
                notEmpty.await();
            }
            //若是存在数据直接删除并返回该数据
            x = dequeue();
            c = count.getAndDecrement();//队列大小减1,返回以前的值
            if (c > 1)
                notEmpty.signal();//还有数据就唤醒后续的消费线程
        } finally {
            takeLock.unlock();
        }
        //知足条件(以前队列是满的,如今刚刚执行dequeue拿出了一个),
        //唤醒条件对象上等待队列中的添加线程
        if (c == capacity)
            signalNotFull();
        return x;
    }

private E dequeue() {
        Node<E> h = head;//获取头结点
        Node<E> first = h.next; //获取头结的下一个节点(要删除的节点)
        h.next = h; // help GC//本身next指向本身,即被删除
        head = first;//更新头结点
        E x = first.item;//获取删除节点的值
        first.item = null;//清空数据,由于first变成头结点是不能带数据的,这样也就删除队列的带数据的第一个节点
        return x;
    }


复制代码

take方法是一个可阻塞可中断的移除方法,主要作了两件事:

  • 若是队列没有数据就挂起当前线程到 notEmpty条件对象的等待队列中一直等待,若是有数据就删除节点并返回数据项,同时唤醒后续消费线程;
  • 尝试唤醒条件对象notFull上等待队列中的添加线程:假设以前队列中满员了,那么新来的put进程将会被阻塞进notFull条件队列,而后await挂起沉睡。这个时候有线程经过take方法拿出了一个元素,若是此时不唤醒notFull条件队列,那么以前满员时队列中的线程就会一直睡死过去

总结

LinkedBlockingQueue的两个队列:

  • notFull条件队列(队列满时阻塞的put线程): await的时机:队列满了 signal的时机:一是put方法放入元素后,若是队列还有空位,会singal线程继续添加;二是若是队列最开始满员,take方法移出了一个元素后,队列还有一个空位时也会唤醒它。

  • notEmpty条件队列(队列空时候阻塞的take线程): await的时机:队列空了 signal的时机:一是take方法移出元素后,若是队列还有空位,会singal线程继续移出;二是若是队列最开始空的,put方法放入了一个元素后,队列还有一个元素时也会唤醒它。

这种算法就是“two lock queue”的设计思想,这也是LinkedBlockingQueue的吞吐量较高的本质缘由

ArrayBlockingQueue和LinkedBlockingQueue的比较总结

经过上述的分析,对于LinkedBlockingQueue和ArrayBlockingQueue的基本使用以及内部实现原理咱们已较为熟悉了,这里咱们就对它们两间的区别来个小结

1.队列大小和构造方法有所不一样,ArrayBlockingQueue是有界的初始化必须指定大小,而LinkedBlockingQueue能够是有界的也能够是无界的(Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界的状况下,可能会形成内存溢出等问题,有坑。

2.数据存储容器不一样,ArrayBlockingQueue采用的是数组做为数据存储容器,而LinkedBlockingQueue采用的则是以Node节点做为链接对象的单向链表。

3.从GC的角度分析:因为ArrayBlockingQueue采用的是数组的存储容器,所以在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内须要高效并发地处理大批量数据的时,对于GC可能存在较大影响。

4.二者的实现队列添加或移除的锁不同,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操做和移除操做采用的同一个ReenterLock锁,而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提升队列的吞吐量,也意味着在高并发的状况下生产者和消费者能够并行地操做队列中的数据,以此来提升整个队列的并发性能。

相关文章
相关标签/搜索