死磕 java同步系列之ReentrantLock源码解析(二)——条件锁

问题

(1)条件锁是什么?java

(2)条件锁适用于什么场景?node

(3)条件锁的await()是在其它线程signal()的时候唤醒的吗?git

简介

条件锁,是指在获取锁以后发现当前业务场景本身没法处理,而须要等待某个条件的出现才能够继续处理时使用的一种锁。源码分析

好比,在阻塞队列中,当队列中没有元素的时候是没法弹出一个元素的,这时候就须要阻塞在条件notEmpty上,等待其它线程往里面放入一个元素后,唤醒这个条件notEmpty,当前线程才能够继续去作“弹出一个元素”的行为。学习

注意,这里的条件,必须是在获取锁以后去等待,对应到ReentrantLock的条件锁,就是获取锁以后才能调用condition.await()方法。ui

在java中,条件锁的实现都在AQS的ConditionObject类中,ConditionObject实现了Condition接口,下面咱们经过一个例子来进入到条件锁的学习中。this

使用示例

public class ReentrantLockTest {
    public static void main(String[] args) throws InterruptedException {
        // 声明一个重入锁
        ReentrantLock lock = new ReentrantLock();
        // 声明一个条件锁
        Condition condition = lock.newCondition();

        new Thread(()->{
            try {
                lock.lock();  // 1
                try {
                    System.out.println("before await");  // 2
                    // 等待条件
                    condition.await();  // 3
                    System.out.println("after await");  // 10
                } finally {
                    lock.unlock();  // 11
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        
        // 这里睡1000ms是为了让上面的线程先获取到锁
        Thread.sleep(1000);
        lock.lock();  // 4
        try {
            // 这里睡2000ms表明这个线程执行业务须要的时间
            Thread.sleep(2000);  // 5
            System.out.println("before signal");  // 6
            // 通知条件已成立
            condition.signal();  // 7
            System.out.println("after signal");  // 8
        } finally {
            lock.unlock();  // 9
        }
    }
}
复制代码

上面的代码很简单,一个线程等待条件,另外一个线程通知条件已成立,后面的数字表明代码实际运行的顺序,若是你能把这个顺序看懂基本条件锁掌握得差很少了。spa

源码分析

ConditionObject的主要属性

public class ConditionObject implements Condition, java.io.Serializable {
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;
}
复制代码

能够看到条件锁中也维护了一个队列,为了和AQS的队列区分,我这里称为条件队列,firstWaiter是队列的头节点,lastWaiter是队列的尾节点,它们是干什么的呢?接着看。线程

lock.newCondition()方法

新建一个条件锁。指针

// ReentrantLock.newCondition()
public Condition newCondition() {
    return sync.newCondition();
}
// ReentrantLock.Sync.newCondition()
final ConditionObject newCondition() {
    return new ConditionObject();
}
// AbstractQueuedSynchronizer.ConditionObject.ConditionObject()
public ConditionObject() { }
复制代码

新建一个条件锁最后就是调用的AQS中的ConditionObject类来实例化条件锁。

condition.await()方法

condition.await()方法,代表如今要等待条件的出现。

// AbstractQueuedSynchronizer.ConditionObject.await()
public final void await() throws InterruptedException {
    // 若是线程中断了,抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 添加节点到Condition的队列中,并返回该节点
    Node node = addConditionWaiter();
    // 彻底释放当前线程获取的锁
    // 由于锁是可重入的,因此这里要把获取的锁所有释放
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 是否在同步队列中
    while (!isOnSyncQueue(node)) {
        // 阻塞当前线程
        LockSupport.park(this);
        
        // 上面部分是调用await()时释放本身占有的锁,并阻塞本身等待条件的出现
        // *************************分界线************************* //
        // 下面部分是条件已经出现,尝试去获取锁
        
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    
    // 尝试获取锁,注意第二个参数,这是上一章分析过的方法
    // 若是没获取到会再次阻塞(这个方法这里就不贴出来了,有兴趣的翻翻上一章的内容)
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 清除取消的节点
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 线程中断相关
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
// AbstractQueuedSynchronizer.ConditionObject.addConditionWaiter
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 若是条件队列的尾节点已取消,从头节点开始清除全部已取消的节点
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        // 从新获取尾节点
        t = lastWaiter;
    }
    // 新建一个节点,它的等待状态是CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // 若是尾节点为空,则把新节点赋值给头节点(至关于初始化队列)
    // 不然把新节点赋值给尾节点的nextWaiter指针
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    // 尾节点指向新节点
    lastWaiter = node;
    // 返回新节点
    return node;
}
// AbstractQueuedSynchronizer.fullyRelease
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 获取状态变量的值,重复获取锁,这个值会一直累加
        // 因此这个值也表明着获取锁的次数
        int savedState = getState();
        // 一次性释放全部得到的锁
        if (release(savedState)) {
            failed = false;
            // 返回获取锁的次数
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
// AbstractQueuedSynchronizer.isOnSyncQueue
final boolean isOnSyncQueue(Node node) {
    // 若是等待状态是CONDITION,或者前一个指针为空,返回false
    // 说明尚未移到AQS的队列中
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // 若是next指针有值,说明已经移到AQS的队列中了
    if (node.next != null) // If has successor, it must be on queue
        return true;
    // 从AQS的尾节点开始往前寻找看是否能够找到当前节点,找到了也说明已经在AQS的队列中了
    return findNodeFromTail(node);
}
复制代码

这里有几个难理解的点:

(1)Condition的队列和AQS的队列不彻底同样;

AQS的队列头节点是不存在任何值的,是一个虚节点;

Condition的队列头节点是存储着实实在在的元素值的,是真实节点。
复制代码

(2)各类等待状态(waitStatus)的变化;

首先,在条件队列中,新建节点的初始等待状态是CONDITION(-2);

其次,移到AQS的队列中时等待状态会更改成0(AQS队列节点的初始等待状态为0);

而后,在AQS的队列中若是须要阻塞,会把它上一个节点的等待状态设置为SIGNAL(-1);

最后,无论在Condition队列仍是AQS队列中,已取消的节点的等待状态都会设置为CANCELLED(1);

另外,后面咱们在共享锁的时候还会讲到另一种等待状态叫PROPAGATE(-3)。
复制代码

(3)类似的名称;

AQS中下一个节点是next,上一个节点是prev;

Condition中下一个节点是nextWaiter,没有上一个节点。
复制代码

若是弄明白了这几个点,看懂上面的代码仍是轻松加愉快的,若是没弄明白,彤哥这里指出来了,但愿您回头再看看上面的代码。

下面总结一下await()方法的大体流程:

(1)新建一个节点加入到条件队列中去;

(2)彻底释放当前线程占有的锁;

(3)阻塞当前线程,并等待条件的出现;

(4)条件已出现(此时节点已经移到AQS的队列中),尝试获取锁;

也就是说await()方法内部实际上是先释放锁->等待条件->再次获取锁的过程。

condition.signal()方法

condition.signal()方法通知条件已经出现。

// AbstractQueuedSynchronizer.ConditionObject.signal
public final void signal() {
    // 若是不是当前线程占有着锁,调用这个方法抛出异常
    // 说明signal()也要在获取锁以后执行
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 条件队列的头节点
    Node first = firstWaiter;
    // 若是有等待条件的节点,则通知它条件已成立
    if (first != null)
        doSignal(first);
}
// AbstractQueuedSynchronizer.ConditionObject.doSignal
private void doSignal(Node first) {
    do {
        // 移到条件队列的头节点日后一位
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 至关于把头节点从队列中出队
        first.nextWaiter = null;
        // 转移节点到AQS队列中
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
// AbstractQueuedSynchronizer.transferForSignal
final boolean transferForSignal(Node node) {
    // 把节点的状态更改成0,也就是说即将移到AQS队列中
    // 若是失败了,说明节点已经被改为取消状态了
    // 返回false,经过上面的循环可知会寻找下一个可用节点
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // 调用AQS的入队方法把节点移到AQS的队列中
    // 注意,这里enq()的返回值是node的上一个节点,也就是旧尾节点
    Node p = enq(node);
    // 上一个节点的等待状态
    int ws = p.waitStatus;
    // 若是上一个节点已取消了,或者更新状态为SIGNAL失败(也是说明上一个节点已经取消了)
    // 则直接唤醒当前节点对应的线程
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    // 若是更新上一个节点的等待状态为SIGNAL成功了
    // 则返回true,这时上面的循环不成立了,退出循环,也就是只通知了一个节点
    // 此时当前节点仍是阻塞状态
    // 也就是说调用signal()的时候并不会真正唤醒一个节点
    // 只是把节点从条件队列移到AQS队列中
    return true;
}
复制代码

signal()方法的大体流程为:

(1)从条件队列的头节点开始寻找一个非取消状态的节点;

(2)把它从条件队列移到AQS队列;

(3)且只移动一个节点;

注意,这里调用signal()方法后并不会真正唤醒一个节点,那么,唤醒一个节点是在啥时候呢?

还记得开头例子吗?倒回去再好好看看,signal()方法后,最终会执行lock.unlock()方法,此时才会真正唤醒一个节点,唤醒的这个节点若是曾经是条件节点的话又会继续执行await()方法“分界线”下面的代码。

结束了,仔细体会下^^

若是非要用一个图来表示的话,我想下面这个图能够大体表示一下(这里是用时序图画的,可是实际并不能算做一个真正的时序图哈,了解就好):

ReentrantLock

总结

(1)重入锁是指可重复获取的锁,即一个线程获取锁以后再尝试获取锁时会自动获取锁;

(2)在ReentrantLock中重入锁是经过不断累加state变量的值实现的;

(3)ReentrantLock的释放要跟获取匹配,即获取了几回也要释放几回;

(4)ReentrantLock默认是非公平模式,由于非公平模式效率更高;

(5)条件锁是指为了等待某个条件出现而使用的一种锁;

(6)条件锁比较经典的使用场景就是队列为空时阻塞在条件notEmpty上;

(7)ReentrantLock中的条件锁是经过AQS的ConditionObject内部类实现的;

(8)await()和signal()方法都必须在获取锁以后释放锁以前使用;

(9)await()方法会新建一个节点放到条件队列中,接着彻底释放锁,而后阻塞当前线程并等待条件的出现;

(10)signal()方法会寻找条件队列中第一个可用节点移到AQS队列中;

(11)在调用signal()方法的线程调用unlock()方法才真正唤醒阻塞在条件上的节点(此时节点已经在AQS队列中);

(12)以后该节点会再次尝试获取锁,后面的逻辑与lock()的逻辑基本一致了。

彩蛋

为何java有自带的关键字synchronized了还须要实现一个ReentrantLock呢?

首先,它们都是可重入锁;

其次,它们都默认是非公平模式;

而后,...,呃,咱们下一章继续深刻探讨 ReentrantLock VS synchronized。

推荐阅读

  1. 死磕 java同步系列之ReentrantLock源码解析(一)——公平锁、非公平锁

  2. 死磕 java同步系列之AQS起篇

  3. 死磕 java同步系列之本身动手写一个锁Lock

  4. 死磕 java魔法类之Unsafe解析

  5. 死磕 java同步系列之JMM(Java Memory Model)

  6. 死磕 java同步系列之volatile解析

  7. 死磕 java同步系列之synchronized解析


欢迎关注个人公众号“彤哥读源码”,查看更多源码系列文章, 与彤哥一块儿畅游源码的海洋。

qrcode
相关文章
相关标签/搜索