ReentrantLock的加锁过程

 

ReentrantLock的加锁过程

ReentrantLock是一个可重入的排他锁,他的加锁过程是经过cas操做完成的。ReentrantLock有三个内部类,分别是Sync , FairSync,NofairSync。其中后二者都继承了前者。经过他的内部类能够大概了解到ReentrantLock支持公平锁与非公平锁。java

废话很少说,先看一下ReentrantLock的非公平锁加锁的流程图。node

 

为了便于后面加锁流程的理解,先对AbstractQueuedSynchronizer的几个重要的属性进行简单说明:git

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    // . .
    // . .
    // . .省略
    static final class Node {
    
        // . .
        // . .
        // . .省略
        // 该属性记录当前节点的状态
        volatile int waitStatus;

        //指向前置节点
        volatile Node prev;

        // 指向后继节点
        volatile Node next;

        // 指向该节点所表明的线程
        volatile Thread thread;

        // . .
        // . .
        // . .省略
    

    }

    // 等待队列的头结点
    private transient volatile Node head;

    // 等待队列的尾结点
    private transient volatile Node tail;

    // 同步状态 ,多个线程进行锁竞争时,其实就是经过
    // cas操做将 state 的值从预期值变为想要更新的值
    // cas操做就是先比较在更新(即在更新以前先获取state的值,
    // 在真正进行更新的时候先将以前获取到state的值与如今
    // 的state的值进行比较,若是相等则更新不然就更新失败)。
    private volatile int state;

        // . .
        // . .
        // . .省略
    
}

Sync继承了AbstractQueuedSynchronizer类,加锁时会用到上面的属性因此加以说明。若是想看更详细的说明能够到并发

https://gitee.com/eatingbarbecue/jdk-translation 下载。这些是根据本身的理解简单翻译的(只翻译了这部分,后续会慢慢翻译若是你有幸看到这边文章又对其感兴趣能够加入进来,欢迎大神莅临指导)。jvm

下面按照流程图结合代码来讲明:高并发

/**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         *
         * 执行lock()方法。首先当即经过cas操做去修改state的值,尝试由0变为1,
         * 若是失败则正常执行acquire(1)
         */
        final void lock() {
            // 由于是非公平锁,因此无论等待队列里面是否有等待的线程
            // 直接经过cas操做 尝试将 state的值从0 更新为 1
            // 这里这个 1 还表示获取锁的次数。调用lock()方法
            // 是线程第一次竞争锁,当获取成功,state的值为1,
            // 若该线程在没有释放锁的状况下又调用了lock()方法,则改state的
            // 的值变为2 以此类推。这也说明ReentrantLock是可重入锁。
            // 这里compareAndSetState方法里面调用的是unsafe的
            // compareAndSwapInt方法。
            if (compareAndSetState(0, 1))
                // 若修改为功 这将持有锁的线程 置为当前线程
                setExclusiveOwnerThread(Thread.currentThread());
            else
                // 否者执行 acquire方法。注意咋公平锁里是没有上面if的操做的
                // 直接执行acquire方法。
                acquire(1);
        }

 

/**
     * Atomically sets synchronization state to the given updated
     * value if the current state value equals the expected value.
     * This operation has memory semantics of a {@code volatile} read
     * and write.
     *
     * 经过原子操做试图把state的值从预期值(expect)改为更新值(update)。
     * 该操做只有在state值得等于预期值的状况下才能成功。
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that the actual
     *         value was not equal to the expected value.
     */
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        // 实际是调用的unsafe的compareAndSwapInt方法。改方法有四个参数,
        // 其中第一个参数是当前锁对象,第二个参数为要修改锁对象的哪一个属性值
        // 第三个与第四个参数分别为预期值和要更新成的值。后两个参数比较好理解
        // 这里简单说一下前两个参数的含义,stateOffset是state相对于锁对象的
        // 起始指针的偏移量。
        // ——————————————
        // | this       | 0x00000
        // | stateOffset| 0x00010
        // |            |
        // |            |
        // ———————————————
        // 该方法是获取state的偏移量的值。
        // stateOffset = unsafe.objectFieldOffset
        //                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
        // 这里的stateOffset是在类加载的时候初始化的
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
// 注意改方法是AbstractOwnableSynchronizer类实现的
    // 将锁的持有线程进行赋值
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

根据代码能够看到非公平锁在上锁时会先直接竞争锁,若是竞争成功则方法执行结束 ,进入同步代码块(即调用lock方法的线程的后面的代码)。也就是流程途中最上面的部分:学习

若竞争失败则执行acquire(1)方法:ui

/**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * 以独占的方式获取锁,忽略中断。返回成功时至少调用一次 {@link #tryAcquire},
     * 若tryAcquire失败,当前线程入队,会一直调用{@link #tryAcquire}直到成功
     * 获取到锁。该方法一般被{@link Lock#lock}调用。
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     *
     * @param arg 这个参数会传给{@link #tryAcquire}能够根据实际状况来让参数
     *            表达你想要表达的意思。
     */
    public final void acquire(int arg) {

        //当尝试获取锁失败的时候则将当前线程保存到一个节点里以独占的模式添加到等待
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

由上代码可知,acquire方法先tryAcquire ,当tryAcquire失败时才执行入队操做即addWaiter方法。下面贴出tryAcquire方法的代码:this

protected final boolean tryAcquire(int acquires) {
        // 这里非公平锁的tryAcquire方法调用的是父类Sync的
        // 方法(公平锁里的tryAcquire方法并无调用父类的方法
        // 而是本身实现的),
        return nonfairTryAcquire(acquires);
    }

继续看 nonfairTryAcquire方法:.net

/**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         * 
         * 非公平的tryLock实现。
         */
        // 
        final boolean nonfairTryAcquire(int acquires) {
            // 取到当前线程
            final Thread current = Thread.currentThread();
            // 先获取state的值,记录他的初始状态
            int c = getState();
            // 若是 c == 0 则说明在上一步获取state时没有线程竞争到
            // 锁,能够去竞争锁(即经过cas操做将state 由 0 变为 1)
            if (c == 0) {
                // 过cas操做将state 由 0 变为 1 
                // 该操做保证只有一个线程能够操做成功
                if (compareAndSetState(0, acquires)) {
                    // 若是成功 则state的值已经变为 1 
                    // 将当前线程置为拥有锁的线程
                    // 注意这里的代码只有一个线程能够执行到
                    setExclusiveOwnerThread(current);
                    // 返回true 该线程的加锁流程执行完毕
                    return true;
                }
            }
            // 若是 c != 0 而后判断 当前线程是不是拥有锁的线程
            else if (current == getExclusiveOwnerThread()) {
                // 若是是的话 则将 将state的值加上方法的参数(acquires)
                // 这里是处理同一个线程在没有释放锁的状况下屡次获取锁的
                // 过程(即锁的重入),state记录的就是重入的次数。
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                // 这里设置state的值的时候没有用cas操做,由于能执行到这块代码
                // 的只能是是拥有锁的线程,不存在竞争因此没有必要使用cas操做
                setState(nextc);
                // 返回true
                return true;
            }
            // 当state的值不为0并且当前线程也不是拥有锁的线程则返回false
            return false;
        }

从nonfairTryAcquire方法可知 两点:

  • 非公平锁去获取锁的时候是无论等待队列的,这就会出现这种状况,最新竞争的线程颇有可能比这它前面的线程先获取锁。

  • tryAcquire方法里面处理了锁重入的逻辑,state的值记录了重入的次数。

整个tryAcquire的执行流程对应以下图(从执行tryAcquire()开始):

从流程图能够轻易的发现,这个方法没有自旋的状况,要么成功要么失败。

若tryAcquire失败则执行入队操做addWaiter(Node.EXCLUSIVE)(能够看上面acquire方法)。

下面是addWaiter(Node.EXCLUSIVE)代码:

/**
     * Creates and enqueues node for current thread and given mode.
     *
     * 以当前线程和指定的模式建立节点而且入队
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // 先尝试快速将节点插入队尾。若果失败则执行完整的入队方法。
        // 问题 一、为何有这个先尝试入队的操做
        // 这里先尝试将当前node加入队尾,是由于这段代码的执行成功的几率
        // 是很高的,因此不用每次都建立循环,这样jvm不须要建立循环,提升效率
        // 能够学习一下这种变成技巧。
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            // 经过cas操做肯定当前线程是否能够更新队尾的next指针指向当前的node
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 若是上面的代码没有走到return则进入enq方法。
        // 这里须要注意的是 并无return enq(node);
        // 问题 二、为何没有return enq(node);
        // 由于enq(node)方法返回的是队尾的前置节点
        enq(node);
        return node;
    }

这里有两个问题须要注意,上面代码注释里已经写清楚了。下面看enq(node)方法:

/**
     * Inserts node into queue, initializing if necessary. See picture above.
     *
     * 在队列里插入节点,若是队列为空先初始化。
     *
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        // 用死循环来保证当前节点必定能插入到队尾
        for (;;) {
            // 获取当前尾结点
            // 此时可能有其余线程也执行到这一步
            Node t = tail;
            // 若是尾结点为空说明队列尚未初始化 先初始化
            // 此时可能有其余线程也执行到这一步
            if (t == null) { // Must initialize
                // 这里经过cas操做初始化队列的head 
                // 此时可能有其余线程也执行到这一步,因此使用cas
                // 操做进行队列初始化,保证队列只初始化一次。
                // 假如当先线程初始化失败,则会又进入循环最开部分
                // 在判断就不会为空了
                // 还要注意的,并非把当前的node做为head,而是新建立的node。
                if (compareAndSetHead(new Node()))
                    // 这里tail = head tail就不为空
                    tail = head;
            } else {
                // 这里可能会有多个未加入队尾的node指向队列的尾结点,可是不影响
                node.prev = t;
                // 经过cas操做将node加入到队尾,代码compareAndSetTail(t, node)实际的操做是将锁的 队尾偏移量
                // 从原来指向t 变为指向 node 。只执行完这句话 在java层面这个入队操做
                // 还没完,还须要执行 t.next = node; 即下面的代码。由于等待队列是一个双向队列。
                // 
                if (compareAndSetTail(t, node)) {
                    // 这句代码执行玩才算入队操做完成
                    t.next = node;
                    // 注意这里并无返回 node 而是node前置节点,也就是旧的队尾的节点。
                    return t;
                }
            }
        }
    }

对应流程图的部分为:

addWaiter方法会有一个小小的自旋,那就是入队操做。当节点入队成功后会执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg),acquireQueued方法经过循环(自旋)和LockSupport.park()完成线程在lock()方法上的等待,下面看acquieQueued方法的代码:

/**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * 队列里的线程会不停的以独占模式去获取锁(即cas操做返回成功)。
     * condition wait与acquire都会调用这个方法。
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        // 记录当前线程有异常或者被中断,
        // 便于后面finally的处理
        boolean failed = true;
        try {
            // 记录线程中断标志
            boolean interrupted = false;
            // 开始自循环,
            for (;;) {
                // 获取节点的前置节点,当前节点的前置节点为空时
                // 会抛出空指针异常
                final Node p = node.predecessor();
                // 当前置节点为头结点 而且 获取到了锁( 成功修改了锁的 state )
                if (p == head && tryAcquire(arg)) {
                    // 将当前节点设置为头结点
                    // 具体的作法是将 node
                    //        head = node;
                    //        node.thread = null;
                    //        node.prev = null;
                    // 这里当前线程已经出队了,node做为新的head节点已经不保存
                    // 线程的信息了。
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    // 返回中断标志,这里须要注意并无返回true,由于本方法能执行完
                    // 就说明该线程已经成功获取锁。返回中断标志是为了让acquire方法响应
                    // 中断。
                    return interrupted;
                }
                // 若当前节点的前置节点不是head节点,或者竞争锁失败 进入
                // shouldParkAfterFailedAcquire方法,该方法见名知意
                // 获取锁失败时是否要park线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // 当程序正常执行的话,是必定 执行过 failed = false;
            // 在退出循环而后执行 finally 代码块,此时 failed == false
            // cancelAcquire(node)是执行不到的,
            // 当for循环里遇到异常时没有走到 failed = false; 就退出循环才会
            // 执行到cancelAcquire(node).
            if (failed)
                cancelAcquire(node);
        }
    }

在acquireQueued方法里,正常状况下只有当 p == head && tryAcquire(arg) 的时候才能跳出循环。若没有

if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;

这块代码,那么当前节点只能等到他的前辈们一个一个的出队了才能轮到他。在等待的这段时间,当前线程也没有闲着,会一直循环的问cup我是否是能够出队了。这样彷佛也能完成加锁的过程。若是线程少还行,cup可能会有耐心和那个时间,可是若是在高并发的状况下,cup就必须咋百忙中去回答你。对应术语上来讲就是太占用cpu资源了。因此 shouldParkAfterFailedAcquire方法就是处理这种状况的,下面看代码:

/**
     *
     * 检查更新失败acquire的节点的status。若是线程须要阻塞则返回true。acquire
     * 的循环主要又他来控制(好比替换前置节点或者前置节点的waiteStatus)。
     * 要求pred = node.prev(指的是这个方法的参数的关系)。
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 获取前置节点的waitSatue 下面是 waitStatue的几个值所表明的的意思
        //         *   SIGNAL:    -1, 该节点的后继节点能够park()
        //         *
        //         *   CANCELLED:  1 ,该节点已经被取消,若是某个节点的前置节点的waitStatue则跳过
                        
        //         *
        //         *   CONDITION:  -1, 该节点在条件队列里才会出现这个值,通常在等待队列是不会出现的
        //         *
        //         *   PROPAGATE:  暂时还没看
        //         *   0:          不是上述的任bai何一个。
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * 该线程能够执行LockSupport.park()方法。
             * 就是意味着该线程已经知道不应我执行呢,在队列里面等这前面的节点叫你吧
             * 经过LockSupport.unpark()方法唤醒
             * 
             */
            return true;
        if (ws > 0) {
            /*
            * 当ws > 0 时 说明该节点的前置节点已经被取消了可是尚未出队,这个前置节点已经不能叫醒你了(这个前置节点里的线程已经死了
             * 不能执行唤醒操做了),因此该节点须要向前找一个能够叫醒本身的节点,他会一直问向前问直到前面的某个节点
             * 能够叫醒本身。对应waitStatus <= 0;
             * 
             * 
             * 
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 当前置节点的waitStatus 为 0 或者 PROPAGATE 时 说明咱们须要一个信号,
            // 可是还不须要park呢,这时候咱们须要等一轮 看看前置节点是否释放锁,
            // 这时候先把前置节点的waitStatuus设置为Node.SIGNAL 若是没有获取到锁
            // 那么就能够park等着了。
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

shouldParkAfterFailedAcquire作了三件事

  • 若pred.waitStatue = -1 , 则返回true,进入park阻塞状态。

  • 若pred.waitStatue = 1 ,剔除pred,而且向前找到,而且将本身链接到一个非取消的线程节点上 ,返回false

  • 若pred.waitStatue = 0 or pred.waitStatue = -2,则将pred.waitStatue 置为 -1 ,返回false

当shouldParkAfterFailedAcquire返回false时会进入下一次循环,判断 pred == head && tryAcquire(1) ...

当shouldParkAfterFailedAcquire返回true时,调用parkAndCheckInterrupt方法进入阻塞状态:

/**
     * Convenience method to park and then check if interrupted
     *
     * 让当前线程阻塞,返回中断状态
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        
        // 
        LockSupport.park(this);
        return Thread.interrupted();
    }

LockSupport.park(this)的实现以下

public static void park(Object blocker) {
        Thread t = Thread.currentThread();

        //
        // setBlocker的调用了
        // UNSAFE.putObject(t, parkBlockerOffset, arg);
        // 该方法是将当前线程的 parkBlock偏移量置为blocker
        setBlocker(t, blocker);
        // 进入阻塞状态 等待被唤醒 等着其余线程执行 LockSupport.unpark(t)
        // 关于LockSupport方法的说明能够看这篇博客:
        // https://blog.csdn.net/a7980718/article/details/83661613
        UNSAFE.park(false, 0L);
        // 当被唤醒后先把parkBlock偏移量置为null,由于已经不
        // 被阻塞了因此须要移除blocker
        setBlocker(t, null);
    }

      关于LockSupport方法的说明能够看这篇博客:https://blog.csdn.net/a7980718/article/details/83661613

当线程从阻塞状态被唤醒后会返回线程的 中断状态 return Thread.interrupted()。对应流程图的最后部分:

到这里在正常的状况下(没有异常的状况下)整个上锁流程算是走完了,稍微总结一下:从lock.lock()开始:

  1. 经过cas操做获取锁若成功,方法执行结束,不然进入第2步。

  2. 若state == 0 尝试获取锁,成功则方法结束,不然进入第3步,若(state != 0 && 当先线程 == 获取锁的线程) 则 state++ 方法结束,不然进入第3步

  3. 将线程包装成一个node 进入到等待队列

  4. 若 前置节点 == head && tryAcquire(1) == true  方法结束 返回中断状态,不然进入第5步。

  5. 若pred.waitStatue = -1 , 则返回true, 进入park阻塞状态。若pred.waitStatue = 1 ,剔除pred,而且向前找到,而且将本身链接到一个非取消的线程节点上 ,返回false,若pred.waitStatue = 0 or pred.waitStatue = -2,则将pred.waitStatue 置为 -1 ,返回false. 第五步执行完毕后返回第四步。