并发Lock之ReentrantLock实现原理

咱们在以前介绍了并发编程的锁机制:synchronized和lock,lock接口的重要实现类是可重入锁ReentrantLock。而上一篇并发Lock之AQS(AbstractQueuedSynchronizer)详解介绍了AQS,谈到ReentrantLock,不得不谈抽象类AbstractQueuedSynchronizer(AQS)。AQS定义了一套多线程访问共享资源的同步器框架,ReentrantLock的实现依赖于该同步器。本文在介绍过AQS,结合其具体的实现类ReentrantLock分析实现原理。java

ReentrantLock类图

ReentrantLock
ReentrantLock类图

ReentrantLock实现了Lock接口,内部有三个内部类,Sync、NonfairSync、FairSync,Sync是一个抽象类型,它继承AbstractQueuedSynchronizer,这个AbstractQueuedSynchronizer是一个模板类,它实现了许多和锁相关的功能,并提供了钩子方法供用户实现,好比tryAcquire,tryRelease等。Sync实现了AbstractQueuedSynchronizer的tryRelease方法。NonfairSync和FairSync两个类继承自Sync,实现了lock方法,公平抢占和非公平抢占针对tryAcquire有不一样的实现。本文重点介绍ReentrantLock默认的实现,即非公平锁的获取锁和释放锁的实现。node

非公平锁的lock方法

lock方法

  • 在初始化ReentrantLock的时候,若是咱们不传参数,那么默认使用非公平锁,也就是NonfairSync。
public ReentrantLock() {   
  sync = new NonfairSync();  
} 
复制代码
  • 当咱们调用ReentrantLock的lock方法的时候,其实是调用了NonfairSync的lock方法,这个方法先用CAS操做,去尝试抢占该锁。若是成功,就把当前线程设置在这个锁上,表示抢占成功。若是失败,则调用acquire模板方法,等待抢占。代码以下:
static final class NonfairSync extends Sync {
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
复制代码
  • 上面调用acquire(1)实际上使用的是AbstractQueuedSynchronizer的acquire方法,它是一套锁抢占的模板,整体原理是先去尝试获取锁,若是没有获取成功,就在CLH队列中增长一个当前线程的节点,表示等待抢占。而后进入CLH队列的抢占模式,进入的时候也会去执行一次获取锁的操做,若是仍是获取不到,就调用LockSupport.park将当前线程挂起。那么当前线程何时会被唤醒呢?当持有锁的那个线程调用unlock的时候,会将CLH队列的头节点的下一个节点上的线程唤醒,调用的是LockSupport.unpark方法。acquire代码比较简单,具体以下:
public final void acquire(int arg) {
	if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
	    selfInterrupt();
}
复制代码
  • acquire方法内部先使用tryAcquire这个钩子方法去尝试再次获取锁,这个方法在NonfairSync这个类中其实就是使用了nonfairTryAcquire,具体实现原理是先比较当前锁的状态是不是0,若是是0,则尝试去原子抢占这个锁(设置状态为1,而后把当前线程设置成独占线程),若是当前锁的状态不是0,就去比较当前线程和占用锁的线程是否是一个线程,若是是,会去增长状态变量的值,从这里看出可重入锁之因此可重入,就是同一个线程能够反复使用它占用的锁。若是以上两种状况都不经过,则返回失败false。代码以下:
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
复制代码
  • tryAcquire一旦返回false,就会则进入acquireQueued流程,也就是基于CLH队列的抢占模式

首先,在CLH锁队列尾部增长一个等待节点,这个节点保存了当前线程,经过调用addWaiter实现,这里须要考虑初始化的状况,在第一个等待节点进入的时候,须要初始化一个头节点而后把当前节点加入到尾部,后续则直接在尾部加入节点就好了。编程

private Node addWaiter(Node mode) {
	// 初始化一个节点,这个节点保存当前线程
    Node node = new Node(Thread.currentThread(), mode);
    // 当CLH队列不为空的视乎,直接在队列尾部插入一个节点
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
	// 当CLH队列为空的时候,调用enq方法初始化队列
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // 初始化节点,头尾都指向一个空节点
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {// 考虑并发初始化
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
复制代码
  • 将节点增长到CLH队列后,进入acquireQueued方法。

首先,外层是一个无限for循环,若是当前节点是头节点的下个节点,而且经过tryAcquire获取到了锁,说明头节点已经释放了锁,当前线程是被头节点那个线程唤醒的,这时候就能够将当前节点设置成头节点,而且将failed标记设置成false,而后返回。至于上一个节点,它的next变量被设置为null,在下次GC的时候会清理掉。安全

若是本次循环没有获取到锁,就进入线程挂起阶段,也就是shouldParkAfterFailedAcquire这个方法。微信

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
复制代码
  • 若是尝试获取锁失败,就会进入shouldParkAfterFailedAcquire方法,会判断当前线程是否挂起,若是前一个节点已是SIGNAL状态,则当前线程须要挂起。若是前一个节点是取消状态,则须要将取消节点从队列移除。若是前一个节点状态是其余状态,则尝试设置成SIGNAL状态,并返回不须要挂起,从而进行第二次抢占。完成上面的过后进入挂起阶段。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        //
        return true;
    if (ws > 0) {
        //
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
复制代码
  • 当进入挂起阶段,会进入parkAndCheckInterrupt方法,则会调用LockSupport.park(this)将当前线程挂起。
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

复制代码

非公平锁的unlock方法

  • 调用unlock方法,实际上是直接调用AbstractQueuedSynchronizer的release操做。
public void unlock() {
	sync.release(1);
}
复制代码
  • 进入release方法,内部先尝试tryRelease操做,主要是去除锁的独占线程,而后将状态减一,这里减一主要是考虑到可重入锁可能自身会屡次占用锁,只有当状态变成0,才表示彻底释放了锁。
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
复制代码
  • 一旦tryRelease成功,则将CHL队列的头节点的状态设置为0,而后唤醒下一个非取消的节点线程。
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
          free = true;
          setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
 }
复制代码
  • 一旦下一个节点的线程被唤醒,被唤醒的线程就会进入acquireQueued代码流程中,去获取锁。
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null) 
       LockSupport.unpark(s.thread);
}
复制代码

tryLock

在ReetrantLock的tryLock(long timeout, TimeUnit unit)提供了超时获取锁的功能。它的语义是在指定的时间内若是获取到锁就返回true,获取不到则返回false。这种机制避免了线程无限期的等待锁释放。多线程

public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
   return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
复制代码

具体看一下内部类里面的方法tryAcquireNanos并发

public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
   if (Thread.interrupted())
       throw new InterruptedException();
   return tryAcquire(arg) ||
       doAcquireNanos(arg, nanosTimeout);
}
复制代码

若是线程被中断了,那么直接抛出InterruptedException。若是未中断,先尝试获取锁,获取成功就直接返回,获取失败则进入doAcquireNanos。tryAcquire咱们已经看过,这里重点看一下doAcquireNanos作了什么。框架

private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    // 起始时间
    long lastTime = System.nanoTime();
    // 线程入队
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        // 又是自旋!
        for (;;) {
            // 获取前驱节点
            final Node p = node.predecessor();
            // 若是前驱是头节点而且占用锁成功,则将当前节点变成头结点
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            // 若是已经超时,返回false
            if (nanosTimeout <= 0)
                return false;
            // 超时时间未到,且须要挂起
            if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                // 阻塞当前线程直到超时时间到期
                LockSupport.parkNanos(this, nanosTimeout);
            long now = System.nanoTime();
            // 更新nanosTimeout
            nanosTimeout -= now - lastTime;
            lastTime = now;
            if (Thread.interrupted())
                //相应中断
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
复制代码

doAcquireNanos的流程简述为:线程先入等待队列,而后开始自旋,尝试获取锁,获取成功就返回,失败则在队列里找一个安全点把本身挂起直到超时时间过时。这里为何还须要循环呢?由于当前线程节点的前驱状态可能不是SIGNAL,那么在当前这一轮循环中线程不会被挂起,而后更新超时时间,开始新一轮的尝试。ui

总结

ReentrantLock是可重入的锁,其内部使用的就是独占模式的AQS。公平锁和非公平锁不一样之处在于,公平锁在获取锁的时候,不会先去检查state状态,而是直接执行aqcuire(1)。公平锁多了hasQueuePredecessors这个方法,这个方法用于判断CHL队列中是否有节点,对于公平锁,若是CHL队列有节点,则新进入竞争的线程必定要在CHL上排队,而非公平锁则是无视CHL队列中的节点,直接进行竞争抢占,这就有可能致使CHL队列上的节点永远获取不到锁,这就是非公平锁之因此不公平的缘由,这里再也不赘述。this

订阅最新文章,欢迎关注个人公众号

微信公众号

参考

  1. Java中可重入锁ReentrantLock原理剖析
  2. ReentrantLock实现原理
相关文章
相关标签/搜索