前几篇文章介绍了 AQS(AbstractQueuedSynchronizer)中的独占模式和对 Condition 的实现,这一篇文章来聊聊基于 AQS 框架实现的锁工具:ReentrantLock。java
ReentrantLock 是一个可重入的互斥锁,也被称为独占锁。node
ReentrantLock 具备两种实现:公平锁(fair lock)、非公平锁(non-fair lock)。c#
本文基于 jdk1.8.0_91
public class ReentrantLock implements Lock, java.io.Serializable
ReentrantLock 是 Lock 接口的实现,方法对照以下:segmentfault
Lock 接口 ReentrantLock 实现 lock() sync.lock() lockInterruptibly() sync.acquireInterruptibly(1) tryLock() sync.nonfairTryAcquire(1) tryLock(long time, TimeUnit unit) sync.tryAcquireNanos(1, unit.toNanos(timeout)) unlock() sync.release(1) newCondition() sync.newCondition()
可知 ReentrantLock 对 Lock 的实现都是调用内部类 Sync 来作的。框架
Sync 继承了 AQS,也就是说 ReentrantLock 的大部分实现都已经由 AQS 完成了。less
ReentrantLock 使用 AQS 中的 state 表示锁是否被其余线程锁占用。若是为 0 则表示未被占用,其余值表示该锁被重入的次数。对于获取锁失败的线程,要么在同步队列中等待(Lock#lock),要么直接返回(Lock#tryLock)。工具
此外,AQS 提供了一系列模板方法,对于互斥锁来讲,须要实现其中的 tryAcquire、tryRelease、isHeldExclusively 方法。ui
java.util.concurrent.locks.AbstractQueuedSynchronizerthis
// 独占获取(资源数) protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } // 独占释放(资源数) protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } // 共享获取(资源数) protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } // 共享获取(资源数) protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } // 是否排它状态 protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); }
Sync 重写了 AQS 的 tryRelease 和 isHeldExclusively 方法,而 AQS 的 tryAcquire 方法交由 Sync 的子类来实现。
Sync 中还具有了一个抽象的 lock 方法,强制子类实现。spa
java.util.concurrent.locks.ReentrantLock.Sync
abstract static class Sync extends AbstractQueuedSynchronizer { /** * Performs {@link Lock#lock}. The main reason for subclassing * is to allow fast path for nonfair version. */ abstract void lock(); protected final boolean tryRelease(int releases) {...} protected final boolean isHeldExclusively() {...} }
Sync 具备两个子类 FailSync 和 NonfairSync,对应的是 ReentrantLock 的两种实现:公平锁(fair lock)、非公平锁(non-fair lock)。
Sync 中包含了一个抽象方法 lock 须要子类来实现,设计该抽象方法的目的是,给非公平模式加锁提供入口。
由于公平锁和非公平锁的区别,主要体如今获取锁的机制不一样:
也就是说,非公平锁只有在当前线程未进入同步队列以前,才能够去争夺锁,一旦进入同步队列,只能按排队顺序等待锁。
/** Synchronizer providing all implementation mechanics */ // 提供全部实现机制的同步器 private final Sync sync; /** * Creates an instance of {@code ReentrantLock}. * This is equivalent to using {@code ReentrantLock(false)}. */ public ReentrantLock() { sync = new NonfairSync(); } /** * Creates an instance of {@code ReentrantLock} with the * given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy */ public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
为何默认建立的是非公平锁?
由于公平模式下,队列头部的线程从阻塞中被唤醒到真正运行,涉及到线程调度和 CPU 上下文的切换,比较耗时,这个过程当中锁处于空闲状态,浪费资源。
而非公平模式下,多个线程之间采用 CAS 来争夺锁,这中间没有时间延迟,可以提升吞吐量。
// 获取锁。 void lock() // 若是当前线程未被中断,则获取锁。 void lockInterruptibly() // 仅在调用时锁未被另外一个线程保持的状况下,才获取该锁。 boolean tryLock() // 若是锁在给定等待时间内没有被另外一个线程保持,且当前线程未被中断,则获取该锁。 boolean tryLock(long timeout, TimeUnit unit) // 试图释放此锁。 void unlock() // 返回用来与此 Lock 实例一块儿使用的 Condition 实例。 Condition newCondition()
获取锁:
java.util.concurrent.locks.ReentrantLock#lock
/** * Acquires the lock. * * <p>Acquires the lock if it is not held by another thread and returns * immediately, setting the lock hold count to one. * * <p>If the current thread already holds the lock then the hold * count is incremented by one and the method returns immediately. * * <p>If the lock is held by another thread then the * current thread becomes disabled for thread scheduling * purposes and lies dormant until the lock has been acquired, * at which time the lock hold count is set to one. */ public void lock() { sync.lock(); }
在 ReentrantLock 中,Sync 的两个子类 FairSync、NonfairSync 各自实现了 Sync#lock 方法。
公平锁
java.util.concurrent.locks.ReentrantLock.FairSync#lock
final void lock() { acquire(1); }
非公平锁
java.util.concurrent.locks.ReentrantLock.NonfairSync#lock
/** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
公平锁的 lock 方法直接调用 AQS#acquire(1);
非公平锁首先经过 CAS 修改 state 值来获取锁,当获取失败时才会调用 AQS#acquire(1) 来获取锁。
AQS 中的 acquire 方法中,只有 tryAcquire 方法须要子类来实现。
java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
其中 tryRelease 方法尝试释放锁,释放锁成功会唤醒在同步队列中等待的线程,详细内容见AQS 独占模式实现原理,这里只介绍 ReentrantLock 对 tryRelease 的实现。
在 ReentrantLock 中,Sync 的两个子类 FairSync、NonfairSync 各自实现了 AQS#tryAcquire 方法。
公平锁
java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire
/** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && // 判断同步队列中是否有等待时间更长的节点 compareAndSetState(0, acquires)) { // 进入这里,说明当前线程等待锁时间最长,则CAS修改state setExclusiveOwnerThread(current); // 将当前线程设置为持有锁的线程 return true; } } else if (current == getExclusiveOwnerThread()) { // 当前线程已持有锁 int nextc = c + acquires; // 重入次数 if (nextc < 0) throw new Error("Maximum lock count exceeded"); // 可重入的最大次数 Integer.MAX_VALUE setState(nextc); return true; } return false; }
代码流程:
非公平锁
java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { // 无论当前线程在同步队列中是否等待最久,都来 CAS 争夺锁 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 若是已得到锁,则重入。逻辑与公平锁一致 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
代码流程:
能够看到,公平锁比非公平锁多执行了 hasQueuedPredecessors 方法,用于判断同步队列中是否具备比当前线程等待锁时间还长的节点。
在 AQS 中的同步队列中,头节点是一个 dummy node,所以等待时间最长的节点是头节点的下一个节点,若该节点存在且不是当前线程,则 hasQueuedPredecessors 返回 true。说明当前节点不是同步队列中等待时间最长的节点,没法获取锁。
java.util.concurrent.locks.AbstractQueuedSynchronizer#hasQueuedPredecessors
public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && // 头节点的下一个节点,不是当前线程的节点,说明当前线程等待锁时间不是最长的 ((s = h.next) == null || s.thread != Thread.currentThread()); }
若是当前线程未被中断,则获取锁。
java.util.concurrent.locks.ReentrantLock#lockInterruptibly
public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }
关注获取锁过程当中,对异常的处理。
java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireInterruptibly
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) // 获取锁以前,当前线程已经中断,则直接抛出中断异常 throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireInterruptibly
/** * Acquires in exclusive interruptible mode. * @param arg the acquire argument */ private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 线程在阻塞等待锁的过程当中,被中断唤醒,则放弃等待锁,直接抛出异常 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
仅在调用时锁未被另外一个线程保持的状况下,才获取该锁。
与非公平锁的 NonfairSync#tryAcquire 方法同样,都是调用了 Sync#nonfairTryAcquire 方法。
注意,Sync#nonfairTryAcquire 方法不存在任何和队列相关的操做。
在锁没有被其余线程持有的状况下,无论使用的是公平锁仍是非公平锁,多个线程均可以调用该方法来 CAS 争夺锁,争夺失败了不用进入同步队列,直接返回。
使用公平锁的状况下,在上一个持有锁的线程释放锁(锁状态 state == 0)以后,同步队列中的线程被唤醒但还没有获取锁以前,此时当前线程执行 tryLock 可能会成功得到锁,这其实是对公平锁的公平性的一种破坏。
在某些状况下,打破公平性的行为多是有用的。若是但愿遵照此锁的公平设置,则使用 tryLock(0, TimeUnit.SECONDS)
,它几乎是等效的(也检测中断)。
java.util.concurrent.locks.ReentrantLock#tryLock()
public boolean tryLock() { return sync.nonfairTryAcquire(1); }
java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { // 无论同步队列中是否具备等待好久的节点,当前线程直接 CAS 争夺锁 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 若是已得到锁,则重入。逻辑与公平锁一致 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
若是锁在给定等待时间内没有被另外一个线程保持,且当前线程未被中断,则获取该锁。
java.util.concurrent.locks.ReentrantLock#tryLock(long, java.util.concurrent.TimeUnit)
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); }
在 JDK 中,AQS 中的 tryAcquireNanos 方法只会被 ReentrantLock#tryLock(time, unit) 和 ReentrantReadWriteLock.WriteLock#tryLock(time, unit) 这两个方法调用到。
说明只有独占模式才能调用。
代码流程:
java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquireNanos
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireNanos
/** * Acquires in exclusive timed mode. * * @param arg the acquire argument * @param nanosTimeout max wait time * @return {@code true} if acquired */ private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) // 直到超时都没有得到锁,则返回false return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); // 能够进入阻塞的状况下,剩余时间大于阈值,则阻塞,不然自旋 if (Thread.interrupted()) throw new InterruptedException(); // 线程在阻塞等待锁的过程当中,被中断唤醒,则放弃等待锁,直接抛出异常 } } finally { if (failed) cancelAcquire(node); } }
尝试释放锁。
java.util.concurrent.locks.ReentrantLock#unlock
/** * Attempts to release this lock. * * <p>If the current thread is the holder of this lock then the hold * count is decremented. If the hold count is now zero then the lock * is released. If the current thread is not the holder of this * lock then {@link IllegalMonitorStateException} is thrown. * * @throws IllegalMonitorStateException if the current thread does not * hold this lock */ public void unlock() { sync.release(1); }
尝试释放锁,若释放成功,且同步队列中具备等待中的节点,则尝试唤醒后继节点。
java.util.concurrent.locks.AbstractQueuedSynchronizer#release
public final boolean release(int arg) { if (tryRelease(arg)) { // 释放锁资源 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); // 唤醒head的后继节点 return true; } return false; }
其中 tryRelease 方法尝试释放锁,释放锁成功会唤醒在同步队列中等待的线程,详细内容见AQS 独占模式实现原理,这里只介绍 ReentrantLock 对 tryRelease 的实现。
在 ReentrantLock 中,不论是公平锁仍是非公平锁,均使用相同的 Sync#tryRelease 方法。
java.util.concurrent.locks.ReentrantLock.Sync#tryRelease
protected final boolean tryRelease(int releases) { int c = getState() - releases; // 计算释放以后的state if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; // 锁已所有释放(获取锁的次数必须等于释放次数),返回true,代表能够唤醒下一个等待线程 setExclusiveOwnerThread(null); // 设置独占锁持有线程为null } setState(c); return free; }
建立 AQS 中的 ConditionObject 对象。是能够与 Lock 一块儿使用的 Condition 接口实例。
java.util.concurrent.locks.ReentrantLock.Sync#newCondition
final ConditionObject newCondition() { return new ConditionObject(); }
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#ConditionObject
public ConditionObject() { }
Condition 实例支持与 Object 的监视器方法(wait、notify 和 notifyAll)相同的用法(await、signal 和 signalAll)。
相关阅读:
阅读 JDK 源码:AQS 中的独占模式
阅读 JDK 源码:AQS 中的共享模式
阅读 JDK 源码:AQS 对 Condition 的实现