我在前段时间写了一篇关于AQS源码解析的文章AbstractQueuedSynchronizer超详细原理解析 ,在文章里边我说JUC
包中的大部分多线程相关的类都和AQS
相关,今天咱们就学习一下依赖于AQS
来实现的阻塞队列BlockingQueue
的实现原理。本文中的源码未加说明即来自于以ArrayBlockingQueue
。java
相信大多数同窗在学习线程池时会了解阻塞队列的概念,熟记各类类型的阻塞队列对线程池初始化的影响。当从阻塞队列获取元素可是队列为空时,当前线程会阻塞直到另外一个线程向阻塞队列中添加一个元素;相似的,当向一个阻塞队列加入元素时,若是队列已经满了,当前线程也会阻塞直到另一个线程从队列中读取一个元素。阻塞队列通常都是先进先出的,用来实现生产者和消费者模式。当发生上述两种状况时,阻塞队列有四种不一样的处理方式,这四种方式分别为抛出异常,返回特殊值(null或在是false),阻塞当前线程直到执行结束,最后一种是只阻塞固定时间,到时后还没法执行成功就放弃操做。这些方法都总结在下边这种表中了。node
咱们就只分析put
和take
方法。数组
咱们都知道,使用同步队列能够很轻松的实现生产者-消费者模式,其实,同步队列就是按照生产者-消费者的模式来实现的,咱们能够将put
函数看做生产者的操做,take
是消费者的操做。bash
咱们首先看一下ArrayListBlock
的构造函数。它初始化了put
和take
函数中使用到的关键成员变量,分别是ReentrantLock
和Condition
。多线程
public ArrayBlockingQueue(int capacity, boolean fair) {
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
复制代码
ReentrantLock是AQS
的子类,其newCondition
函数返回的Condition
接口实例是定义在AQS类内部的ConditionObject
实现类。它能够直接调用AQS
相关的函数。并发
put
函数会在队列末尾添加元素,若是队列已经满了,没法添加元素的话,就一直阻塞等待到能够加入为止。函数的源码以下所示。函数
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //先得到锁
try {
while (count == items.length)
//若是队列满了,就NotFull这个Condition对象上进行等待
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
//这里能够注意的是ArrayBlockingList实际上使用Array实现了一个环形数组,
//当putIndex达到最大时,就返回到起点,继续插入,
//固然,若是此时0位置的元素尚未被取走,
//下次put时,就会由于cout == item.length未被阻塞。
if (++putIndex == items.length)
putIndex = 0;
count++;
//由于插入了元素,通知等待notEmpty事件的线程。
notEmpty.signal();
}
复制代码
咱们会发现put函数使用了wait/notify的机制。与通常生产者-消费者的实现方式不一样,同步队列使用ReentrantLock
和Condition
相结合的先得到锁,再等待的机制;而不是Synchronized
和Object.wait
的机制。这里的区别咱们下一节再详细讲解。 看完了生产者相关的put
函数,咱们再来看一下消费者调用的take
函数。take
函数在队列为空时会被阻塞,一直到阻塞队列加入了新的元素。学习
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
//若是队列为空,那么在notEmpty对象上等待,
//当put函数调用时,会调用notEmpty的notify进行通知。
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
E x = (E) items[takeIndex];
items[takeIndex] = null; //取出takeIndex位置的元素
if (++takeIndex == items.length)
//若是到了尾部,将指针从新调整到头部
takeIndex = 0;
count--;
....
//通知notFull对象上等待的线程
notFull.signal();
return x;
}
复制代码
咱们发现ArrayBlockingList
并无使用Object.wait
,而是使用的Condition.await
,这是为何呢?其中又有哪些缘由呢? Condition
对象能够提供和Object
的wait
和notify
同样的行为,可是后者必须先获取synchronized
这个内置的monitor锁,才能调用;而Condition
则必须先获取ReentrantLock
。这两种方式在阻塞等待时都会将相应的锁释放掉,可是Condition
的等待能够中断,这是两者惟一的区别。ui
咱们先来看一下Condition
的wait
函数,wait
函数的流程大体以下图所示。this
wait
函数主要有三个步骤。一是调用addConditionWaiter
函数,在condition wait queue队列中添加一个节点,表明当前线程在等待一个消息。而后调用fullyRelease
函数,将持有的锁释放掉,调用的是AQS的函数,不清楚的同窗能够查看本篇开头的介绍的文章。最后一直调用isOnSyncQueue
函数判断节点是否被转移到sync queue
队列上,也就是AQS中等待获取锁的队列。若是没有,则进入阻塞状态,若是已经在队列上,则调用acquireQueued
函数从新获取锁。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//在condition wait队列上添加新的节点
Node node = addConditionWaiter();
//释放当前持有的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
//因为node在以前是添加到condition wait queue上的,如今判断这个node
//是否被添加到Sync的得到锁的等待队列上,Sync就是AQS的子类
//node在condition queue上说明还在等待事件的notify,
//notify函数会将condition queue 上的node转化到Sync的队列上。
while (!isOnSyncQueue(node)) {
//node尚未被添加到Sync Queue上,说明还在等待事件通知
//因此调用park函数来中止线程执行
LockSupport.park(this);
//判断是否被中断,线程从park函数返回有两种状况,一种是
//其余线程调用了unpark,另一种是线程被中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//代码执行到这里,已经有其余线程调用notify函数,或则被中断,该线程能够继续执行,可是必须先
//再次得到调用await函数时的锁.acquireQueued函数在AQS文章中作了介绍.
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
 ....
}
final int fullyRelease(Node node) {
//AQS的方法,当前已经在锁中了,因此直接操做
boolean failed = true;
try {
int savedState = getState();
//获取state当前的值,而后保存,以待之后恢复
// release函数是AQS的函数,不清楚的同窗请看开头介绍的文章。
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
private int checkInterruptWhileWaiting(Node node) {
//中断可能发生在两个阶段中,一是在等待signa时,另一个是在得到signal以后
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
final boolean transferAfterCancelledWait(Node node) {
//这里要和下边的transferForSignal对应着看,这是线程中断进入的逻辑.那边是signal的逻辑
//两边可能有并发冲突,可是成功的一方必须调用enq来进入acquire lock queue中.
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
//若是失败了,说明transferForSignal那边成功了,等待node 进入acquire lock queue
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
复制代码
signal
函数将condition wait queue
队列中队首的线程节点转移等待获取锁的sync queue
队列中。这样的话,wait
函数中调用isOnSyncQueue
函数就会返回true,致使wait
函数进入最后一步从新获取锁的状态。
咱们这里来详细解析一下condition wait queue
和sync queue
两个队列的设计原理。condition wait queue
是等待消息的队列,由于阻塞队列为空而进入阻塞状态的take
函数操做就是在等待阻塞队列不为空的消息。而sync queue
队列则是等待获取锁的队列,take函数得到了消息,就能够运行了,可是它还必须等待获取锁以后才能真正进行运行状态。
signal
函数的示意图以下所示。
signal
函数其实就作了一件事情,就是不断尝试调用transferForSignal
函数,将condition wait queue
队首的一个节点转移到sync queue
队列中,直到转移成功。由于一次转移成功,就表明这个消息被成功通知到了等待消息的节点。
public final void signal() {
if (!isHeldExclusively())
//若是当前线程没有得到锁,抛出异常
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
//将Condition wait queue中的第一个node转移到acquire lock queue中.
doSignal(first);
}
private void doSignal(Node first) {
do {
   //因为生产者的signal在有消费者等待的状况下,必需要通知
//一个消费者,因此这里有一个循环,直到队列为空
//把first 这个node从condition queue中删除掉
//condition queue的头指针指向node的后继节点,若是node后续节点为null,那么也将尾指针也置为null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
//transferForSignal将node转而添加到Sync的acquire lock 队列
}
final boolean transferForSignal(Node node) {
//若是设置失败,说明该node已经被取消了,因此返回false,让doSignal继续向下通知其余未被取消的node
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//将node添加到acquire lock queue中.
Node p = enq(node);
int ws = p.waitStatus;
//须要注意的是这里的node进行了转化
//ws>0表明canceled的含义因此直接unpark线程
//若是compareAndSetWaitStatus失败,因此直接unpark,让线程继续执行await中的
//进行isOnSyncQueue判断的while循环,而后进入acquireQueue函数.
//这里失败的缘由多是Lock其余线程释放掉了锁,同步设置p的waitStatus
//若是compareAndSetWaitStatus成功了呢?那么该node就一直在acquire lock queue中
//等待锁被释放掉再次抢夺锁,而后再unpark
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
复制代码
后边一篇文章主要讲解如何本身使用AQS
来建立符合本身业务需求的锁,请你们继续关注个人文章啦.一块儿进步偶。