关于 ReentrantLock 中锁 lock() 和解锁 unlock() 的底层原理浅析

关于 ReentrantLock 中锁 lock() 和解锁 unlock() 的底层原理浅析node

以下代码,当咱们在使用 ReentrantLock 进行加锁和解锁时,底层究竟是如何帮助咱们进行控制的啦?多线程

    static Lock lock = new ReentrantLock();

    public static void main(String[] args) {

        // 使用两个线程模拟多线程执行并发
        new Thread(() -> doBusiness(), "Thread-1").start();
        new Thread(() -> doBusiness(), "Thread-2").start();
    }

    private static void doBusiness() {
        try {
            lock.lock();
            System.out.println("须要加锁的业务处理代码,防止并发异常");
        } finally {
            lock.unlock();
        }
    }

带着这样的疑问,咱们前后跟进 lock()和unlock() 源码一探究竟并发

说明:app

  一、在进行查看 ReentrantLock 进行 lock() 加锁和 unlock() 解锁源码时,须要知道 LockSupport 类、了解自旋锁以及链表相关知识。ui

  二、在分析过程当中,假设第一个线程获取到锁的时候执行代码须要很长时间才释放锁,及在第二个第三个线程来获取锁的时候,第一个线程并无执行完成,没有释放锁资源。this

  三、在分析过程当中,咱们假设第一个线程就是最早进来获取锁的线程,那么第二个第三个线程也是依次进入的,不会存在第三个线程先于第二个线程(即第三个线程若是先于第二个线程发生,那么第三个线程就是咱们下面描述的第二个线程)spa

 

1、 lock() 方法线程

一、查看lock()方法源码翻译

    public void lock() {
        sync.lock();
    }

  从上面能够看出 ReentrantLock 的 lock() 方法调用的是 sync 这个对象的 lock() 方法,而 Sync 就是一个实现了抽象类AQS(AbstractQueuedSynchronizer) 抽象队列同步器的一个子类,继续跟进代码(说明:ReentrantLock 分为公平锁和非公平锁,若是无参构造器建立锁默认是非公平锁,咱们按照非公平锁的代码来说解)code

  1.1 关于Sync子类的源码

abstract static class Sync extends AbstractQueuedSynchronizer { 
    // 此处省略具体实现AbstractQueuedSynchronizer 类的多个方法
}

  这里须要说明的是 AbstractQueuedSynchronizer 抽象队列同步器底层是一个经过Node实现的双向链表,该抽象同步器有三个属性 head 头节点tail 尾节点 和 state 状态值。

    属性1:head——注释英文翻译:等待队列的头部,懒加载,用于初始化,当调用 setHead() 方法的时候会对 head 进行修改。注:若是 head 节点存在,则 head 节点的 waitStatus 状态值用于保证其不变成 CANCELLED(取消,值为1) 状态

    /**
     * Head of the wait queue, lazily initialized.  Except for
     * initialization, it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;

    属性2: tail——tail节点是等待队列的尾部,懒加载,在调用 enq() 方法添加一个新的 node 到等待队列的时候会修改 tail 节点。

    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;

    属性3:state——用于同步的状态码。若是 state 该值为0,则表示没有其余线程获取到锁,若是该值大于1则表示已经被某线程获取到了锁,该值能够是二、三、4,用该值来处理重入锁(递归锁)的逻辑。

    /**
     * The synchronization state.
     */
    private volatile int state;

  1.2 上面 Sync 类使用 Node来做为双向队列的具体保存值和状态的载体,Node 的具体结构以下

static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node(); // 共享锁模式(主要用于读写锁中的读锁)
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null; // 排他锁模式(也叫互斥锁)

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1; // Node线程等待取消,再也不参与锁竞争,处于这种状态的Node会被踢出队列,被GC回收
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1; // 代表Node线程须要被唤醒,能够竞争锁
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2; // 表示这个Node线程在条件队列中,由于等待某个条件而被阻塞 
        /** waitStatus value to indicate the next acquireShared should unconditionally propagate */
        static final int PROPAGATE = -3; // 使用在共享模式头Node有可能处于这种状态, 表示锁的下一次获取能够无条件传播

        volatile int waitStatus; // 默认初始状态为0,全部新增node节点的初始状态都是0

        volatile Node prev; // 前驱Node节点

        volatile Node next; // 后继Node节点

        /** The thread that enqueued this node.  Initialized on construction and nulled out after use.*/
        volatile Thread thread; // 当前线程和节点进行绑定,经过构造器初始化Thread,在使用的时候将当前线程替换原有的null值

        // 省略部分代码

  说明:Sync 经过Node节点构建队列,Node节点使用prev和next节点来行程双向队列,使用prev来关联上一个节点,使用next来关联下一个节点,每个node节点和一个thread线程进行绑定,用来表示当前线程在阻塞队列中的具体位置和状态 waitStatus

 

二、上面的 sync.lock() 继续跟进源码(非公平锁):

    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else acquire(1);
    }

说明:上面代码说明,若是 compareAndSetState(0, 1) 为 true ,则执行 setExclusiveOwnerThread(Thread.currentThread()) ,不然执行 acquire(1);

  2.1 compareAndSetState(0, 1) 底层使用unsafe类完成CAS操做,意思就是判断当前state状态是否为0,若是为零则将该值修改成1,并返回true;state不为0,则没法将该值修改成1,返回false。

    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

  2.2 假如第1个线程进来的时候 compareAndSetState(0, 1) 确定执行成功,state 状态会从0变成1,同时返回true,执行 setExclusiveOwnerThread(Thread.currentThread()) 方法:

    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

  setExclusiveOwnerThread(Thread.currentThread()) 表示将当前 Sync 对象和当前线程绑定,意思是代表:当前对内同步器执行的线程为 thread,该 thread 获取了锁正在执行。

  2.3 假如进来的线程为第2个,而且第一个线程还在执行没有释放锁,那么第2个线程就会执行 acquire(1)方法:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

  进入到该方法中发现,须要经过 !tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 两个方法判断是否须要执行 selfInterrupt();

    (1)先执行tryAcquire(arg)这个方法进行判断

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
        /**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
       // 获取state状态,由于第一个线程进来的时候只要尚未执行完就已经将state设置为1了(即:2.1步)
int c = getState();
       // 再次判断以前获取锁的线程是否已经释放锁了
if (c == 0) {
// 若是以前的线程已经释放锁,那么当前线程进来就将状态改成1,而且设置当前占用锁的线程为自身
if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } }
       // 判断当前占用锁的线程是否是就是我自身,若是是我自身,这将State在原值的基础上进行加1,来处理重入锁逻辑
else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires;
          // 判断重入锁次数是否是超过限制,超过限制则直接报错
if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }

    从上面的方法看,若是第二个线程进来,且第一个线程还未释放锁的状况下,该方法 tryAcquire(arg) 直接放回false,那么 !tryAcquire(arg) 就为true,须要判断第二个方法 acquireQueued(addWaiter(Node.EXCLUSIVE), arg),第二个方法先执行addWaiter(Node.EXCLUSIVE),及添加等待线程进入队列

    (2)添加等待线程到同步阻塞队列中

    private Node addWaiter(Node mode) {
     // 将当前线程和node节点进行绑定,设置模式为排他锁模式 Node node
= new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail;// 第二个线程也就是第一次进来该方法的线程,tail确定是null if (pred != null) { // 若是tail尾节点不为空,表示第三、四、5次进来的线程 node.prev = pred; // 那么就将当前进来的线程节点的 prev 节点指向以前的尾节点 if (compareAndSetTail(pred, node)) { // 经过比较并交换,若是当前尾节点在设置过程当中没有被其余线程抢先操做,那么就将当前节点设置为tail尾节点 pred.next = node; // 将之前尾节点的下一个节点指向当前节点(新的尾节点) return node; } } enq(node); // 若是为第二个线程进来,就是上面的 pred != null 成立没有执行,直接执行enq()方法 return node; }
    private Node enq(final Node node) {
        for (;;) { // 一直循环检查,至关于自旋锁
            Node t = tail;
            if (t == null) { // Must initialize
                   // 第二个线程的第一次进来确定先循环进入该方法,这时设置头结点,该头结点通常被称为哨兵节点,而且头和尾都指向该节点
if (compareAndSetHead(new Node())) tail = head; } else {
          // 一、第二个线程在第二次循环时将进入else 方法中,将该节点挂在哨兵节点(头结点)后,而且尾节点指向该节点,而且将该节点返回(该节点有prev信息) node.prev
= t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }

    如上在执行 enq(final Node node) 结束,而且返回添加了第二个线程node节点的时候, addWaiter(Node mode) 方法会继续向上返回

    或者

    若是是添加第三、4个线程直接走 addWaiter(Node mode) 方法中的 if 流程直接添加返回

    都将到2.3 步,执行acquireQueued(final Node node, int arg),再次贴源码

    public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    (3)即下一步就会执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 方法:

    注:上面的流程是将后面的线程加入到了同步阻塞队列中,下面的方法第一个if (p == head && tryAcquire(arg))则是看同步阻塞队列的第一条阻塞线程是否能够获取到锁,若是可以获取到锁就修改相应链表结构,第二个if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()即将发生线程阻塞

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
          // 自旋锁,若是为第二个线程,那么 p 就是 head 哨兵节点
final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) {
            // 上面的 if 代表若是当前线程为同步阻塞队列中的第一个线程,那么就再次试图获取锁 tryAcquire(),若是获取成功,则修改同步阻塞队列 setHead(node); // 将head头结点(哨兵节点)设置为已经获取锁的线程node,并将该node的Theread 设置为空 p.next
= null; // help GC 取消和以前哨兵节点的关联,便于垃圾回收器对以前数据的回收 failed = false; return interrupted; }
          // 若是第二个线程没有获取到锁(同步阻塞队列中的第一个线程),那么就须要执行下面两个方法,注标的方法会让当前未获取到锁的线程阻塞
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
    private void setHead(Node node) {
        // 将哨兵节点日后移,而且将 thread 设置为空,取消和之前哨兵节点的关联,并于垃圾回收器回收
        head = node;
        node.thread = null;
        node.prev = null;
    }

    shouldParkAfterFailedAcquire(p, node)这个方法将哨兵队列的状态设置为待唤醒状态

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
     // 该方法第一次进来 pred为哨兵节点,ws为哨兵节点的初始0状态
     // 该方法第二次进来 pred为哨兵节点,ws为哨兵节点的状态-1状态
     // 该方法第3、四次进来就为链表的倒数第二个节点,ws为倒数第二个节点的状态
int ws = pred.waitStatus;
    
if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */
       // 如上第二次进来的时候哨兵节点的状态就是-1,此时返回true
return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do {
          // 若是ws及倒数第二个节点是取消状态,那么经过双向链表向前找倒数第三个,第四关节点,直到向前找到最近一个状态不是取消的node节点,并把当前节点挂在该节点后 node.prev
= pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 将头结点(哨兵节点)设置成待唤醒状态,第一次进来的时候 有0——>-1 而后继续执行返回false } return false; }
    parkAndCheckInterrupt()这个方法会让当前线程阻塞
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this); // LockSupport.park()会致使当前线程阻塞,直到某个线程调用unpark()方法
        return Thread.interrupted();
    }

    那么在lock()方法执行时,只要第一个线程没有unlock()释放锁,其余全部线程都会加入同步阻塞队列中,该队列中记录了阻塞线程的顺序,在加入同步阻塞队列前有屡次机会能够抢先执行(非公平锁),若是没有被执行到,那么加入同步阻塞队列后,就只有头部节点(哨兵节点)后的阻塞线程有机会获取到锁进行逻辑处理。再次查看该方法: 

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    // if 代表只有头部节点(哨兵节点)后的节点在放入同步阻塞队列前能够获取锁
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 全部线程都被阻塞在这个方法处
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }        

 

2、unlock()方法

一、unlock源码

    public void unlock() {
        sync.release(1);
    }

  一样是调用的同步阻塞队列的方法 sync.release(1),跟进查看源码:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

二、查看tryRelease()方法:

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                // 若是不是自身锁对象调用unlock()方法的话,就报异常
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                // 若是标志位已经为0,表示重入锁已经所有释放,这将当前获取锁的线程设置为null,以便其余线程进行加锁
                setExclusiveOwnerThread(null);
            }
            // 更新重入锁解锁到达的次数,若是C不为0,表示还有重入锁unlock()没有调用完
            setState(c);
            return free;
        }        

三、若是tryRelease()方法成功执行,表示以前获取锁的线程已经执行完全部须要同步的代码(重入锁也彻底退出),那么就须要唤醒同步阻塞队列中的第一个等待的线程(也是等待最久的线程),执行unparkSuccessor(h)方法:

    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        // 先获取头结点(哨兵节点)的waitStatus状态,若是小于0,则能够获取锁,并将waitStatus的状态设置为0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            // 若是哨兵节点的下一个节点为null,或者状态为1表示已经取消,则依次循环寻找后面节点,直至找到一个waitStatus<0的节点,并将该节点设置为须要获取锁的节点
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            // 将该node节点的线程解锁,容许它去获取锁,而后执行业务逻辑
 LockSupport.unpark(s.thread);
    }

 

3、unlock()方法调用后,会到lock()方法阻塞的地方,完成唤醒工做

一、在上面方法 unparkSuccessor(Node node) 中执行完 LockSupport.unpark(s.thread) 后在同步阻塞队列后的第一个 node 关联的线程将被唤醒,即unlock()方法代码执行完,将会到lock() 源码解析的 2.3 步里,第三次贴该处源码:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

二、在上面放大的红色方法中,以前上面lock()源码讲了当中全部线程都被阻塞了,以下面源码红色标记的地方:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

三、全部未获取到锁的线程都在 parkAndCheckInterrupt() 方法处阻塞着,因此咱们即将唤醒的哨兵节点后的第一个阻塞线程也是在该处阻塞着,在执行完 unlock() 源码步骤第3步unparkSuccessor(Node node)中的方法,则将返回到以前阻塞线程的这个方法 parkAndCheckInterrupt()的这行代码 LockSupport.park(this) 的下一步执行 Thread.interrupted(),由于线程没有被打断,因此返回false,故acquireQueued(final Node node, int arg)方法中继续轮训再次尝试acquireQueued(final Node node, int arg)获取锁,由于第一个线程已经释放锁,因此第二个线程能够获取锁了,并在执行完后返回interrupted为false,表示线程不是被中断的。理,其余线程也在parkAndCheckInterrupt()这个方法中中断着,等待被第二个线程唤醒。

 

总结:

  在第一个 A 线程 lock() 获取到锁后,第一个线程会在底层的同步阻塞队列中设置锁状态 state 为1(若是重入锁屡次获取 state 每次加1),并设置拥有当前锁的线程为自身A线程,其余线程 B/C/D 来获取锁的时候就会比较锁状态是否为0,若是不为0,表示已经被获取了锁,再次比较获取锁的线程是否为自身,若是为自身则对 state 加1(知足重入锁的规则),不然这将当前未获取到锁的线程放入同步阻塞队列中,在放入的过程当中,须要设置 head 哨兵节点和 tail 尾节点,以及相应的 waitStatus 状态,而且在放入过程当中须要设置当前节点以及先关节点的 prev 和 next 节点,从而达到双向队列的效果,存放到阻塞队列后,线程会被阻塞到这样一个方法中 parkAndCheckInterrupt(),等待被唤醒。

  在第一个 A 线程执行完毕,调用 unlock() 解锁后,unlock() 方法会从同步阻塞队列的哨兵节点后的第一个节点获取等待解锁的线程B,并将其解锁,而后就会到B阻塞的方法 parkAndCheckInterrupt() 来继续执行,调用 selfInterrupt()方法,最终调用底层的 c语言的唤醒方法,使得 B 线程完成 lock() 方法获取到锁,而后执行业务逻辑。其余线程以此类推,依次发生阻塞和唤醒。

 

根据源码总结的 lock() 和 unlock() 的原理,欢迎大神批评指正,若有恰好的看法,也欢迎你们相互交流。

相关文章
相关标签/搜索