使用过ReentrantLock
的盆友应该也知道Condition
的存在。先讲解下它存在的意义:就是仿照实现Object
类的wait
signal
signallAll
等函数功能的。java
这里引伸一个面试常问到的问题:wait
会释放锁,sleep
不会。node
Condition
的一般使用场景是这样的: 生产者消费者模型,假设生产者只有在生产队列为空时才进行生产,则代码相似以下:Condition emptyCondition = ReentrantLock.newCondition();
Runnable consumer = new Runnable() {
public void run() {
if(queue.isEmpty()) {
emptyCondition.signal(); // emptyObj.notify();
} else {
consumer.consume();
}
}
}
Runnable provider = new Runnable() {
public void run() {
emptyCondition.wait(); // emptyObj.wait();
providerInstance.produce();
}
}
复制代码
因此咱们能够知道Condition
设计的意义了。下面咱们来说解下其实现原理。面试
还记得在AQS:JAVA经典之锁实现算法(一)提到的锁实现的Sync Queue
吗? Condition
的实现是相似的原理: 每一个AQS
里有x(视你newCondition
几回)个Condition Queue
,它的结点类也是AQS
内部类Node
。Node
里有一个nextWaiter
,指向下一个在同一Condition Queue
里的Node
。 结构以下图: 算法
condition.wait
必定是在成功lock
的线程里调用才有效,否则不符合逻辑,同时也会抛出IlleagleMornitorException
。Sync Queue
的队首,当调用condition.wait
时,该线程会释放锁(即将AQS
的state
置为0),同时唤醒后继结点,后继结点在acquire
的循环里会成功获取锁,而后将本身所在结点置为队首,而后开始本身线程本身的业务代码。 这个过程看下图:
condition
的signal
后,在Condition Queue
中的Node
会从Condition Queue
中出队,进入Sync Queue
队列,开始它的锁竞争的过程。 过程看下图:因此,这里能够看出来,即便是被signal
了,被signal
的线程也不是直接就开始跑,而是再次进入Sync Queue
开始竞争锁而已。这里的这个逻辑,跟Object.wait Object.signal
也是彻底同样的。bash
咱们先看一段运用到condition
的代码案例: 假设生成者在生产队列queue
为空时emptyCondition.signal
才进行生产操做ide
ReentrantLock locker = new ReentrantLock();
Condition emptyCondition = locker.newCondition();
Runnable consumer = new Runnable() {
public void run() {
locker.lock();
if (queue.isEmpty()) {
emptyCondition.signal();
} else {
...
}
locker.unlock();
}
};
Runnable producer = new Runnable() {
public void run() {
locker.lock();
emptyCondition.wait();
// 开始生产
...
locker.unlock();
}
}
复制代码
咱们从消费者一步一步走,拟定以下这样一套线程切换逻辑:函数
producer#lock
consumer#lock
producer#await
consumer#signal
consumer#unlock
producer#unlock
(先从Sync Queue Condition Queue
图解讲一遍,而后对应图解,对着代码撸一遍)ui
producer#lock
生产者直接获取锁成功,入队Sync Queue
,位队首 this
consumer#lock
消费者竞争锁失败,进入Sync Queue
等待获取锁 spa
producer#await
生产者进入等待,释放锁,出Sync Queue
,进入Condition Queue
,等待emptyCondition
来唤醒。
consumer#signal
消费者唤起生产者,生产者consumer
的node
自Condition Queue
转移到Sync Queue
开始竞争锁。
consumer.unlock
consumer
释放锁后,consumer
的node
从Sync Queue
出队,释放state
,唤醒后继结点provider#node
,provider
抢占到锁。
provider#unlock
这里就没有啥好说的了。
固然,我为了讲解过程,像在锁被第一次成功获取的时候,逻辑上虽然并非直接进入
Sync Queue
我也给讲解成直接进入Sync Queue
了,这是为了缩减边边角角的小逻辑,讲清楚主线逻辑。你们看明白主逻辑,而后再本身去撸一遍,就融会贯通了。
provider.lock
final void lock() {
// 这就直接获取锁成功了,没有else的逻辑了
if (compareAndSetState(0, 1))
// 这个方法是AQS类用来设置拥有锁的线程实例
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
复制代码
consumer#lock
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
// consumer.lock就要走这里了,由于上面的compareAndSetState
// 返回false
else
acquire(1);
}
复制代码
protected final boolean compareAndSetState(int expect, int update) {
// 楼下这个是CAS原理进行值修改,CAS就对比乐观锁来,
// 这里想要修改this这个对象的state字段,若是state是expect
// 则修改至update,返回true;不然false。咱们知道provider.lock
// 已经将state 改成非0值了,因此这里确定失败啦
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
复制代码
provider#await
先简单看下Condition
类对象结构
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
}
...
复制代码
一个Condition
对象就是一条链队,头尾结点在Condition
的内部字段指定firstWaiter lastWaiter
。
看await
方法
public final void await() throws InterruptedException {
// 由于await是响应中断的等待,这里就是检验下,
// 一般而言,凡是throws InterruptedException的,
// 开头基本都是这句
if (Thread.interrupted())
throw new InterruptedException();
// 这里是向condition queue中插入一个node,并返回之,
// 插入了这个node,就表明当前线程在condition queue
// 中开始等待了
Node node = addConditionWaiter();
// 这个是AQS释放锁方法,加个fully,就是用来将屡次
// 获取锁一次性都释放掉,而后将锁获取次数返回,
// 留着后面signal后成功获取锁的时候,还要加锁一样的
// 次数。
// !!!同时注意,这里唤醒了后继结点!后集结点就继续开始
// 竞争锁,就是在acquire那个自旋方法里,记得吗
// 不记得去看看文章(一)
int savedState = fullyRelease(node);
// 记录当前线程中断的标记
int interruptMode = 0;
// 判断当前的node是否已经转移到sync queue里了。
// 转移了,说明这个node已经开始竞争锁了,不用再等待
// 唤醒了,没转,继续自旋
while (!isOnSyncQueue(node)) {
// 这里把当前线程给挂起了
LockSupport.park(this);
// 这里的方法checkxxx就是用来检查waiting自旋期间,线程有没有
// interrupt掉。由于await方法是响应线程中断的。
// 若interrupt了,则在checkxxx方法里,会将node转移到
// sync Queue中,去竞争,不要担忧,由于同时
// 会设置interruptMode,在最后会根据其值抛Interrupted
// 异常。。
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
// 那何时就结束上面的自旋呢?一个是当前的线程被
// signal了,那node就被transfer到sync queue了,while
// 就不知足了。再一个就是线程中断了,在while循环体里给break掉了
}
// 跳出来后,紧接着去竞争锁,知道成功为止。&& 后面这个THROW_IE,标识
// 要抛出异常,不是的话,就是REINTERRPUT,表明保证线程的中断标记不被
// 重置便可。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 这儿是在condition queue里有多个waiter的时候才起做用,主要用来将
// CANCEL的结点从链队中剔除掉
// 具体你们本身看吧。如今忽略这
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 这儿就是处理interruptMode中断标记字段的逻辑
// 在reportxxx中,interruptMode为THROW_IE,则抛出
// 异常,不是,则保证线程的中断field不被重置为“未中断”便可
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
复制代码
consumer#signal
consumer
在调用emptyCondition.signal
的时候,会影响到emptyCondition
的condition queue
中的等待线程,这里 具体指上面的provider#await方法。
public final void signal() {
// 先判断下,lock锁是否是在调用signal方法的当前线程手里
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 取到condition queue里的第一个waiter node,这里也就是
// consumer,由于它第一个await进入condition queue了
Node first = firstWaiter;
// 这里去进行了具体的signal操做,具体会作先把waiter node的waitStatus
// 从CONDITION状态改成入Sync Queue的正常状态值0
// 而后修改Sync Queue 的Head Tail等,让其入队成功
// 最后再从其前驱结点的状态值上确保当前结点可以被唤起便可。
// 这里是由于这个waitStatus值对后继结点的行为是有影响的,像SIGNAL指
// 的是在结点释放后,要去唤醒后继结点
//
if (first != null)
doSignal(first);
}
复制代码
consumer#unlock
unlock
具体调用的 AQS
的release()
方法
public void unlock() {
sync.release(1);
}
// AQS.release
public final boolean release(int arg) {
// tryRelease,这里由NonFairSync实现,具体就是经过
// CAS去修改state值,并判断是否成功释放锁
if (tryRelease(arg)) {
// 成功释放了,则在waitStatus 不是初始状态时,去唤醒后继,
// 这个 != 0 来作判断的缘由,就要综合全部状况,
// 像FailSync NonFairSync \ Exclusive \ Share
// 等全部状况来看这里的waitSTatus都会处于什么状态。
// 全撸一遍的话,会发现这里的 != 0可以涵盖以上全部状况。
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
复制代码
provider#unlock
这里就同理上面了。
整体来看两个 queue
的转换仍是挺清楚的。只要记住,无论什么状况(中断与否),都是要从condition queue
转移到sync queue
的。具体你们仍是要本身去想一种线程切换场景,去走走看。 行文匆匆, 欢迎指正。