本文在我的技术博客同步发布,详情可用力戳
亦可扫描屏幕右侧二维码关注我的公众号,公众号内有我的联系方式,等你来撩...java
看过我上一篇文章的应该知道(家里条件容许的能够先看看上一篇文章),若是想实现一个生产者消费者模型,咱们能够基于JVM自带的synchronized+wait+notify实现,也能够用JDK里面的ReentrantLock+Condition实现!不过从上篇文章的demo看,实现起来也不是那么容易!由于咱们既要关心何时须要阻塞线程,又要须要关心什么时候唤醒线程。控制的细节太多,一个疏忽可能就致使了一个不易发现的bug,好比上篇文章中的虚假唤醒的例子!那有没有一种咱们不用关心那么多复杂细节就能实现生产者消费者模式的方法呢?本文要讲的阻塞队列就是一种很好的实现!node
在咱们刚开始学数据结构的时候,都接触过一种先进先出(first in first out,简称“FIFO”)的数据结构,叫队列。阻塞队列从名字看也是队列的一种,所以知足队列的特性,而后这个队列是可阻塞的!这个阻塞怎么理解呢?就是当咱们一个线程往阻塞队列里面添加元素的时候,若是队列满了,那这个线程不会直接返回,而是会被阻塞,直到元素添加成功!当咱们一个线程从阻塞队列里面获取元素的时候,若是队列是空的,那这个线程不会直接返回,而是会被阻塞直到元素获取成功。而阻塞以及唤醒的操做都由阻塞队列来管理!数组
咱们先看在java中阻塞队列基本的继承关系图:数据结构
完整的继承关系要比这张图复杂一些,但为了清晰起见图中我只画了主要的类和关系。队列的基接口Queue与咱们开发中常常用到的List、Set是兄弟关系,所以我这里也列出来了方便对比记忆!阻塞队列的基接口是继承自Queue接口的BlockingQueue接口,其余阻塞队列具体实现都继承BlockingQueue接口!并发
咱们先看队列基接口Queue中的方法ide
这个接口一共6个方法,咱们能够分为两组
一、“异常”组函数
一、add(e):将元素放到队列末尾,成功返回true,失败则抛异常。
二、remove():获取并移除队首元素,获取失败则抛异常。
三、element():获取队首元素,不移除,获取失败则抛异常。源码分析
二、“特殊值”组this
一、offer(e):将元素放到队列末尾,成功返回true,失败返回false。
二、poll():获取并返回队首元素,获取失败则返回null。
三、peek():获取队首元素,不移除,获取失败则返回null。idea
“异常”组的3个方法在操做失败的时候会抛异常,所以叫“异常”组!
“特殊值”组3个方法与“异常”组的3个方法是一一对应的,功能都同样,只是在操做失败的时候不会抛异常而是返回一个特殊值,所以叫“特殊值组”。
这两组方法都是在Queue接口中定义的,所以跟阻塞就没有什么关系了。那咱们再看看BlockingQueue接口中的方法
这个接口咱们重点关注标记出来的4个方法,这几个方法咱们也能够分为两组
三、“阻塞”组
一、put(e):将元素放到队列末尾,若是队列满了,则等待。
二、take():获取并移除队首元素,若是队列为空,则等待。
四、“超时”组
一、offer(e,time,unit):将元素放到队列末尾,若是队列满了,则等待,当等待超过指定时间后仍添加元素失败,则返回false,不然返回true。
二、poll(time,unit):获取并返回队首元素,若是队列为空,则等待,当等待超过指定时间后仍获取失败则返回null,不然返回获取到的元素。
这两组方法都是在BlockingQueue接口中定义的,所以都是跟阻塞相关的!
“阻塞”组2个方法在操做不成功的时候会一直阻塞线程,直到可以操做成功,所以叫“阻塞”组!用一个成语形容就是“不见不散”!
“超时”组2个方法与“超时”组的2个方法是一一对应的,功能都同样,只是这2个方法不会一直阻塞,超过了指定的时间还没成功就中止阻塞并返回,所以叫“超时”组!用一个成语形容就是“过期不候”!
这四组方法合在一块儿就有了下面的一张表格:
方法功能 | 异常组 | 特殊值组 | 阻塞组 | 超时组 |
---|---|---|---|---|
元素入队 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
元素出队 | remove() | pool() | take() | poll(time,unit) |
检查元素 | element() | peek() | 无 | 无 |
BlockingQueue的实现类有多个,可是若是每个源码都进行分析那不只很影响篇幅且不必,所以我这里拿三个经常使用的阻塞队列源码进行分析!在源码中jdk的版本为1.8!
咱们先看下ArrayBlockingQueue中的几个属性
/** The queued items 使用数组存储元素 */ final Object[] items; /** items index for next take, poll, peek or remove 下一个出队元素索引 */ int takeIndex; /** items index for next put, offer, or add 下一个入队元素索引 */ int putIndex; /** Number of elements in the queue 队列元素个数 */ int count; /* * ReentrantLock+Condition控制并发 * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull;
1.object类型数组,也意味着ArrayBlockingQueue底层数据结构是数组。
2.ReentrantLock+Condition,若是看过我上一篇文章的应该很熟悉,这是用作来线程同步和线程通讯的。
咱们再看下ArrayBlockingQueue的构造函数。
public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c){ this(capacity, fair); //初始化一个集合到队列 .... }
这三个构造函数都必须传入一个int类型的capacity参数,这个参数也意味着ArrayBlockingQueue是一个有界的阻塞队列!
咱们前面说过队列有经常使用的四组方法,而跟阻塞相关的是“阻塞”组和“超时”组的四个方法!咱们以“阻塞”组的put()和take()方法为例,来窥探一下源码里面的奥秘:
/** * Inserts the specified element at the tail of this queue, waiting * for space to become available if the queue is full. */ public void put(E e) throws InterruptedException { checkNotNull(e); //加锁操做 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //判断队列是否知足入队条件,若是队列已满,则阻塞等待一个“不满”的信号 while (count == items.length) notFull.await(); //知足条件,则进行入队操做 enqueue(e); } finally { lock.unlock(); } } private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; // 下一个入队元素索引超过了数组的长度,则又从0开始。 if (++putIndex == items.length) putIndex = 0; count++; //放入元素后,释放一个“不空”的信号。唤醒等待中的出队线程。 notEmpty.signal(); }
public E take() throws InterruptedException { //加锁操做 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //判断队列是否知足出队条件,若是队列为空,则阻塞等待一个“不空”的信号 while (count == 0) notEmpty.await(); //知足条件,则进行出队操做 return dequeue(); } finally { lock.unlock(); } } private E dequeue() { final Object[] items = this.items; E x = (E) items[takeIndex]; items[takeIndex] = null;//help GC // 下一个出队元素索引超过了数组的长度,则又从0开始。 if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued();//更新迭代器元素数据 //取出元素后,释放一个“不满”的信号。唤醒等待中的入队线程。 notFull.signal(); return x; }
ArrayBlockingQueue的入队出队代码仍是很简单的,当咱们往一个阻塞队列里面添加数据的时候,阻塞队列用一个固定长度的数据存储数据,若是数组的长度达到了最大容量,则添加数据的线程会被阻塞。当咱们从阻塞队列获取数据的时候,若是队列为空,则获取数据的线程会被阻塞!相信代码上的注释已经足够理解这块的代码逻辑了!
咱们先看下LinkedBlockingQueue中的几个属性
/** The capacity bound, or Integer.MAX_VALUE if none 队列容量 */ private final int capacity; /** Current number of elements 队列元素个数 */ private final AtomicInteger count = new AtomicInteger(); /** * 队列头 * Head of linked list. * Invariant: head.item == null */ transient Node<E> head; /** * 队列尾 * Tail of linked list. * Invariant: last.next == null */ private transient Node<E> last; /** Lock held by take, poll, etc 出队操做用到的锁 */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc 入队操做用到的锁 */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
1.Node类型的变量head和last,这是链表常见操做,也意味着LinkedBlockingQueue底层数据结构是链表。
2.与ArrayBlockingQueue不一样的是,这里有两个ReentrantLock对象,put操做个take操做的锁对象是分开的,这样作也是为了提升容器的并发能力。
再看下Node这个内部类
/** * Linked list node class */ static class Node<E> { E item; //指向下一个节点 Node<E> next; Node(E x) { item = x; } }
只有next属性意味着这是一个单向链表!
再看下LinkedBlockingQueue的构造函数
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); ... }
1.当构造函数不传capacity参数的时候,LinkedBlockingQueue就是一个无界阻塞队列(其实也并不是无界,不传默认值就是Integer.MAX_VALUE)。
2.当构造函数传入capacity参数的时候,LinkedBlockingQueue就是一个有界阻塞队列。
咱们依然看看在LinkedBlockingQueue中“阻塞”组的两个方法put()和take()分别怎么实现的
/** * Inserts the specified element at the tail of this queue, waiting if * necessary for space to become available. */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); //存储队列元素数量 int c = -1; //建立新节点 Node<E> node = new Node<E>(e); //获取putLock final ReentrantLock putLock = this.putLock; //队列元素数量 final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { //判断队列是否知足入队条件,若是队列已满,则阻塞等待一个“不满”的信号 while (count.get() == capacity) { notFull.await(); } //入队操做 enqueue(node); //队列元素数量+1,执行完下面这句后,count是入队后的元素数量,而c的值仍是入队前的元素数量。 c = count.getAndIncrement(); //当前入队操做成功后,若是元素数量还小于队列容量,则释放一个“不满”的信号 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } //这里的c前面说了是元素入队前的数量,若是入队前元素数量为0(队列是空的),那可能会有出队线程在等待一个“不空”的信号,因此这里释放一个“不空”的信号。 if (c == 0) signalNotEmpty(); } private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } } public E take() throws InterruptedException { //出队元素 E x; //存储队列元素数量 int c = -1; //队列元素数量 final AtomicInteger count = this.count; //获取takeLock final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { //判断队列是否知足出队条件,若是队列为空,则阻塞等待一个“不空”的信号 while (count.get() == 0) { notEmpty.await(); } //出队操做 x = dequeue(); //队列元素数量-1,执行完下面这句后,count是出队后的元素数量,而c的值仍是出队前的元素数量。 c = count.getAndDecrement(); //当前出队操做成功前队列元素大于1,那当前出队操做成功后队列元素也就大于0,则释放一个“不空”的信号 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } //这里的c前面说了是元素出队前的数量,若是出队前元素数量为总容量(队列是满的),那可能会有入队线程在等待一个“不满”的信号,因此这里释放一个“不满”的信号。 if (c == capacity) signalNotFull(); return x; } private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
这里源码的同步逻辑比ArrayBlockingQueue中要稍微复杂一点,在ArrayBlockingQueue中每次入队都释放一个“不空”的信号,每次出队都释放一个“不满”的信号,而LinkedBlockingQueue则不一样。
元素入队的时候
1.入队后还有空位,则释放一个“不满”的信号。
2.入队时队列为空,则释放一个“不空”的信号。
元素出队的时候
1.出队后队列还有元素,则释放一个“不空”的信号。
2.出队前队列是满的,则释放一个“不满”的信号。
SynchronousQueue从名字看叫“同步队列”,怎么理解呢?虽然他也叫队列,可是他不提供空间存储元素!当一个线程往队列添加元素,须要匹配到有另一个线程从队列取元素,不然线程阻塞!当一个线程从队列获取元素,须要匹配到有另一个线程往队列添加元素,不然线程阻塞!因此这里的同步指的就是入队线程和出队线程须要同步!这里有点相似你妈妈对你说:“今年你再找不到女友,过年你就别回来了!”,因而你第二年就真的没回去过年!由于你是一个获取数据(找女友)的线程,数据没获取到则一直阻塞!
了解了大体概念,咱们再来看看源码!
/** * Creates a {@code SynchronousQueue} with nonfair access policy. */ public SynchronousQueue() { this(false); } /** * Creates a {@code SynchronousQueue} with the specified fairness policy. * * @param fair if true, waiting threads contend in FIFO order for * access; otherwise the order is unspecified. */ public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }
两个构造函数,fair参数指定公平策略,默认为false,所以是非公平模式!先看看put和take方法的实现:
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); if (transferer.transfer(e, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); } } public E take() throws InterruptedException { E e = transferer.transfer(null, false, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); }
put和take方法很相似,都是调用transferer.transfer(...)方法,区别在于第一个参数!put方法在调用时候会参入入队的值,而take方法传入null。
上面说过有公平和非公平策略,今天将重点分析公平模式TransferQueue的源码!从名字能看出来这也是一个队列,咱们先看TransferQueue的重点属性和构造方法:
// 指向队列头部 transient volatile QNode head; // 指向队列尾部 transient volatile QNode tail; TransferQueue() { //初始化一个空 //#1 QNode h = new QNode(null, false); // initialize to dummy node. head = h; tail = h; }
一头一尾,链表的一向操做!构造方法中,建立了一个QNode结点,而且将head和tail都指向这个结点!咱们再看看QNode类的重要属性和构造方法:
volatile QNode next; // 指向队列的下一个节点 volatile Object item; // 节点存储的元素 volatile Thread waiter; // 被阻塞的线程 final boolean isData; // 是不是“数据”结点(入队线程为true,出队线程为false) QNode(Object item, boolean isData) { this.item = item; this.isData = isData; }
咱们再回到上面提到的transferer.transfer(...)方法,也就是TransferQueue中的transfer(...)方法,核心逻辑都在这个方法中体现:
/** * “存”或者“取”一个元素 */ @SuppressWarnings("unchecked") E transfer(E e, boolean timed, long nanos) { QNode s = null; // constructed/reused as needed //当前操做类型,传非null的值则为生产线程,传null则为消费线程。 boolean isData = (e != null); for (;;) { QNode t = tail; QNode h = head; //上面咱们说过在构造方法中就建立了一个QNode结点,而且将head和tail都指向这个结点 //所以这里t、h通常状况下不会为null if (t == null || h == null) // saw uninitialized value continue; // spin //根据SynchronousQueue的特性,不一样类型的操做会配对成功。 //所以在阻塞队列中只会存在一种类型的阻塞节点,要么全是消费线程要么全是生产线程! //因此分三种状况: //1.h == t,这种状况下队列为空,须要将当前节点入队。 //2.t.isData == isData尾部节点的操做类型与当前操做类型 // 一致(尾部节点的操做类型表明着队列中全部节点的操做类型),须要将当前节点入队。 //3.队列不为空且尾部节点的操做类型与当前操做类型不一致, // 须要从队列头部匹配一个节点并返回。 //所以再看下面的代码,会根据上面3种状况走不一样的分支。 if (h == t || t.isData == isData) { // empty or same-mode //进入这个分支就是上面一、2的状况 //获取尾部节点的next指向,正常状况下tn等于null QNode tn = t.next; //下面是判断是否出现并发致使尾节点被更改 if (t != tail) // inconsistent read continue; if (tn != null) { // lagging tail advanceTail(t, tn); continue; } //超时判断 if (timed && nanos <= 0) // can't wait return null; //将当前操做建立为新节点,传入数据值和操做类型。 //#2 if (s == null) s = new QNode(e, isData); //一、将阻塞队列中尾部节点的next指向新节点 //二、将tail属性的指向设置为新节点 //#3 if (!t.casNext(null, s)) // failed to link in continue; advanceTail(t, s); // swing tail and wait //在这个方法内部会进行自旋或者阻塞,直到配对成功。 //建议这里先跳到下面这个方法内部看完逻辑再回来。 Object x = awaitFulfill(s, e, timed, nanos); //只有在线程被中断的状况下会进入这个分支 if (x == s) { // wait was cancelled clean(t, s); return null; } if (!s.isOffList()) { // not already unlinked advanceHead(t, s); // unlink if head if (x != null) // and forget fields s.item = s; s.waiter = null; } //若是为生产线程,则返回入队的值;若是为消费线程,则返回匹配到的生产线程的值。 return (x != null) ? (E)x : e; } else { // complementary-mode //进入这个分支就是上面3的状况 //找到头部节点的next指向 //#4 QNode m = h.next; // node to fulfill if (t != tail || m == null || h != head) continue; // inconsistent read Object x = m.item; //m.casItem(x, e)方法很重要,会将匹配到的节点的item修改成当前操做的值。 //这样awaitFulfill方法的x != e条件才能成立,被匹配的阻塞线程才能返回。 //#5 if (isData == (x != null) || // m already fulfilled x == m || // m cancelled !m.casItem(x, e)) { // lost CAS advanceHead(h, m); // dequeue and retry continue; } //调整head属性的指向,这里建议这里先跳到下面这个方法内部看完逻辑再回来。 advanceHead(h, m); // successfully fulfilled //唤醒匹配到的阻塞线程 LockSupport.unpark(m.waiter); //若是为生产线程,则返回入队的值;若是为消费线程,则返回匹配到的生产线程的值。 return (x != null) ? (E)x : e; } } } Object awaitFulfill(QNode s, E e, boolean timed, long nanos) { /* Same idea as TransferStack.awaitFulfill */ final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); //若是头节点的next指向当前的数据节点,也就是当前数据节点是下一个待匹配的节点,那就自旋等待一下子。 //若是设置了超时时间就少自旋一下子,没有设置超时时间就多自旋一下子。 //能够看看maxTimedSpins和maxUntimedSpins两个属性的值设置,是与cpu数量相关的。 int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { if (w.isInterrupted()) s.tryCancel(e); Object x = s.item; // 第一次进来这里确定是相等的,因此不会进入这个分支。 // 当有其余的线程匹配到当前节点,这里的s.item的值会被更改(前面说到过的m.casItem(x, e)方法),因此方法返回。 if (x != e) return x; if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(e); continue; } } if (spins > 0) --spins; else if (s.waiter == null) s.waiter = w; else if (!timed) //这里线程会阻塞,若是有线程与当前线程匹配,则被唤醒进行下一次循环。 LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } } void advanceHead(QNode h, QNode nh) { //这个方法作了两个操做 //一、将head属性的指向调整为头节点的下一个结点 //二、将原头节点的next指向原头节点自己 //#6 if (h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh)) h.next = h; // forget old next }
不知道看完上面的SynchronousQueue基于公平模式TransferQueue的源码有没有对SynchronousQueue有一个很好的了解!下面我模拟了一个场景,先有一个生产线程进入队列,而后一个消费线程进入队列。结合上面源码我画了几张节点变化的图例以便更好的理解上面整个过程,能够结合上面的源码一块儿看
//建立SynchronousQueue对象 SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>(true); //生产线程 new Thread(new Runnable() { @Override public void run() { try { synchronousQueue.put("VALUE"); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); Thread.sleep(1000); //消费线程 new Thread(new Runnable() { @Override public void run() { try { synchronousQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start();
咱们在建立SynchronousQueue对象时候会执行构造函数,也就是在源码#1处执行完后,会建立一个新的节点node,以下图所示,一头一尾都指向构造函数中建立出来的新节点node!
而后会执行synchronousQueue.put()的逻辑,也就是TransferQueue中的transfer(...)方法逻辑。按照咱们以前的分析,会执行到源码#2处,执行完后新的节点node1会被建立,以下图所示。
接着在代码#3处执行完后,节点图示以下,注意红色箭头指向的调整。
到这里,生产线程会进入awaitFulfill方法自旋后阻塞!等待消费线程的唤醒!
而后执行synchronousQueue.take()的逻辑,也就是TransferQueue中的transfer(...)方法逻辑。按照咱们以前的分析,会执行到源码#4处,执行完后就找到了咱们须要匹配的节点node1,注意红色箭头指向。
执行到#5处的方法会改变匹配到节点的item属性值,注意node1节点item属性的变化,以下图所示。
而后在代码#6处执行完后,节点图示以下,注意红色箭头指向的调整。
最后就是消费线程唤醒生产线程,消费线程返回,生产线程也返回,过程结束!
好了,源码分析就到这里结束了,你看懂了吗?