阅读 JDK 源码:读写锁 ReentrantReadWriteLock

在 JUC 包中,共享锁包括 CountDownLatch、CyclicBarrier、Semaphore、ReentrantReadWriteLock、JDK1.8 新增的 StampedLock 等。java

ReentrantReadWriteLock 中定义了两个锁:共享锁 readLock 和独占锁 writeLock。
共享锁 readLock 用于读操做,能同时被多个线程获取;独占锁 writeLock 用于写入操做,只能被一个线程持有。
读锁、写锁均具备公平模式、非公平模式两种获取锁的方式。c#

因为 ReentrantReadWriteLock 是基于 AQS(AbstractQueuedSynchronizer)框架实现的锁工具,在阅读 ReentrantReadWriteLock 源码以前,须要先了解 AQS 中的独占模式、共享模式、Condition 机制的实现原理,相关内容可翻阅个人前几篇文章,文末已给出连接。segmentfault

本文基于 jdk1.8.0_91

@[toc]缓存

1. 继承体系

继承体系
ReentrantReadWriteLock 具备三个重要的内部类:ReadLock、WriteLock、Sync并发

  • ReentrantReadWriteLock 自己实现了 ReadWriteLock 接口。
  • ReadLock、WriteLock 实现了 Lock 接口。
  • Sync 继承了 AQS 抽象类,具备 FairSync 和 NonfairSync 两个子类。

1.1 ReadWriteLock

读写锁是一种特殊的锁,它把对共享资源的访问分为读访问和写访问,多个线程能够同时对共享资源进行读访问,可是同一时间只能有一个线程对共享资源进行写访问,使用读写锁能够极大地提升吞吐量。app

读与写之间是否互斥框架

能够看到,读写锁除了读读不互斥,读写、写读、写写都是互斥的。ide

经常使用业务场景:读多写少,好比服务缓存等。高并发

public interface ReadWriteLock {
    /**
     * Returns the lock used for reading.
     *
     * @return the lock used for reading
     */
    Lock readLock();

    /**
     * Returns the lock used for writing.
     *
     * @return the lock used for writing
     */
    Lock writeLock();
}

ReentrantReadWriteLock 是接口 ReadWriteLock 的实现。工具

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
    private static final long serialVersionUID = -6992448646407690164L;
    /** Inner class providing readlock */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** Inner class providing writelock */
    private final ReentrantReadWriteLock.WriteLock writerLock;
    
    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
}

1.2 Lock

ReadLock、WriteLock 均实现了接口 Lock。

/**
 * The lock returned by method {@link ReentrantReadWriteLock#readLock}.
 */
public static class ReadLock implements Lock, java.io.Serializable {...}

/**
 * The lock returned by method {@link ReentrantReadWriteLock#writeLock}.
 */
public static class WriteLock implements Lock, java.io.Serializable {...}

1.3 AQS

ReentrantReadWriteLock 具备内部类 Sync。跟 ReentrantLock 同样,Sync 继承自 AQS。
ReentrantReadWriteLock 中的 ReadLock、WriteLock 对 Lock 的实现都是经过 Sync 来作到的。

Lock 接口                           ReadLock 实现                                           WriteLock 实现

lock()                              sync.acquireShared(1)                                  sync.acquire(1)
lockInterruptibly()                 sync.acquireSharedInterruptibly(1)                     sync.acquireInterruptibly(1)
tryLock()                           sync.tryReadLock()                                     sync.tryWriteLock()
tryLock(long time, TimeUnit unit)   sync.tryAcquireSharedNanos(1, unit.toNanos(timeout))   sync.tryAcquireNanos(1, unit.toNanos(timeout))
unlock()                            sync.releaseShared(1)                                  sync.release(1)
newCondition()                      UnsupportedOperationException                          sync.newCondition()

能够看到,ReadLock 是共享锁,WriteLock 是独占锁。ReadLock 不支持使用 Condition。
AQS 中提供了一系列的模板方法,在 Sync 中均获得实现。

java.util.concurrent.locks.AbstractQueuedSynchronizer

// 独占获取(资源数)
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

// 独占释放(资源数)
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

// 共享获取(资源数)
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

// 共享获取(资源数)
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

// 是否排它状态
protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}

1.4 Sync

Sync 实现了 AQS 中所有的模板方法,所以能够同时支持独占锁、共享锁两种实现。
此外,还提供了两个抽象方法,用于制定公平性策略,强制子类实现。

  • readerShouldBlock:当前线程请求【读锁】是否须要阻塞,返回 true 说明须要等待,不然能够当即尝试获取。
  • writerShouldBlock:当前线程请求【写锁】是否须要阻塞,返回 true 说明须要等待,不然能够当即尝试获取。
abstract static class Sync extends AbstractQueuedSynchronizer {
    /**
     * Returns true if the current thread, when trying to acquire
     * the read lock, and otherwise eligible to do so, should block
     * because of policy for overtaking other waiting threads.
     */
    abstract boolean readerShouldBlock();

    /**
     * Returns true if the current thread, when trying to acquire
     * the write lock, and otherwise eligible to do so, should block
     * because of policy for overtaking other waiting threads.
     */
    abstract boolean writerShouldBlock();
}

1.4 FailSync/NonfairSync

Sync 具备两个子类 FailSync 和 NonfairSync。对应的是 ReentrantReadWriteLock 的两种实现:公平锁(fair lock)、非公平锁(non-fair lock)。

这两个子类只须要实现 Sync 中的 readerShouldBlock、writerShouldBlock 方法。
公平锁和非公平锁的区别,主要体如今获取读写锁的机制不一样:

  • 公平模式:同步队列中存在比当前线程等待【读、写】锁的时间还长的节点,则当前线程获取【读、写】锁时,均须要阻塞。
  • 非公平模式:当前线程能够当即获取【写】锁;当前线程获取【读】锁时,若是同步队列中存在等待【写】锁时间很长的节点,则当前线程须要阻塞(重入读除外)。

若是当前线程没法获取锁,会抛出异常,或进入同步队列进行排队等待。

1.4.1 公平锁

公平模式:无论获取读锁仍是写锁,都要严格按照前后顺序排队获取。

java.util.concurrent.locks.ReentrantReadWriteLock.FairSync

/**
 * Fair version of Sync
 */
static final class FairSync extends Sync {
    private static final long serialVersionUID = -2274990926593161451L;
    final boolean writerShouldBlock() {
        return hasQueuedPredecessors(); // 若是同步队列中等待时间最长的节点不是当前线程,返回true;不然返回false
    }
    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }
}

java.util.concurrent.locks.AbstractQueuedSynchronizer#hasQueuedPredecessors

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        // 头节点的下一个节点,不是当前线程的节点,说明当前线程等待锁时间不是最长的
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

1.4.2 非公平锁

非公平模式:获取写锁能够当即获取,无需排队;获取读锁以前,若判断等待时间最长的是写线程,则读线程需让步(重入读除外),进入阻塞。

java.util.concurrent.locks.ReentrantReadWriteLock.NonfairSync

/**
 * Nonfair version of Sync
 */
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -8159625535654395037L;
    final boolean writerShouldBlock() {
        return false; // writers can always barge // 获取写锁无需阻塞
    }
    final boolean readerShouldBlock() {
        /* As a heuristic to avoid indefinite writer starvation,    
         * block if the thread that momentarily appears to be head  
         * of queue, if one exists, is a waiting writer.  This is
         * only a probabilistic effect since a new reader will not 
         * block if there is a waiting writer behind other enabled
         * readers that have not yet drained from the queue.
         */
        // 为了不写饥饿,若是同步队列中等待时间最长的节点是互斥节点,则获取读锁须要阻塞,返回true。
        return apparentlyFirstQueuedIsExclusive(); 
    }
}

java.util.concurrent.locks.AbstractQueuedSynchronizer#apparentlyFirstQueuedIsExclusive

final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    return (h = head) != null && // 头节点h不为空
        (s = h.next)  != null && // 存在等待中的节点s
        !s.isShared()         && // 节点s不是共享模式,即互斥
        s.thread != null;        // 节点s不是无效节点
}

2. 构造方法

构造 ReentrantReadWriteLock 的时候,会依据公平或非公平模式实例化 Sync,再使用 Sync 来构造 ReadLock、WriteLock 实例。

java.util.concurrent.locks.ReentrantReadWriteLock

/** Performs all synchronization mechanics */
final Sync sync;

/**
 * Creates a new {@code ReentrantReadWriteLock} with
 * default (nonfair) ordering properties.
 */
public ReentrantReadWriteLock() {
    this(false);
}

/**
 * Creates a new {@code ReentrantReadWriteLock} with
 * the given fairness policy.
 *
 * @param fair {@code true} if this lock should use a fair ordering policy
 */
public ReentrantReadWriteLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}

默认使用非公平锁。
将 Sync 实例传递给 ReadLock、WriteLock 实例。

java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock

public static class ReadLock implements Lock, java.io.Serializable {
    private final Sync sync;

    protected ReadLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }
}

java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock

public static class WriteLock implements Lock, java.io.Serializable {
    private final Sync sync;

    protected WriteLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }
}

3. 资源 state

3.1 资源定义

使用 AQS 的属性 state 来表示资源/锁。

/**
 * The synchronization state.
 */
private volatile int state;

ReentrantLock 使用 state 表示当前共享资源是否被其余线程锁占用。若是为 0 则表示未被占用,其余值表示该锁被重入的次数。
ReentrantReadWriteLock 使用 state 的高 16 位表示读状态,也就是获取到读锁的次数;使用低 16 位表示写状态,也就是获取到写锁的次数。

state

/*
 * Read vs write count extraction constants and functions.
 * Lock state is logically divided into two unsigned shorts:
 * The lower one representing the exclusive (writer) lock hold count,
 * and the upper the shared (reader) hold count. 
 */

static final int SHARED_SHIFT   = 16;
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1; // 十进制为:65535
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 二进制为:1111 1111 1111 1111

/** Returns the number of shared holds represented in count  */
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; } // 共享锁(读)的数量,取高16位
/** Returns the number of exclusive holds represented in count  */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } // 互斥锁(写)的数量,取低16位

对于指定的 state 值,使用 sharedCount() 计算获得共享锁(读锁)的数量,使用 exclusiveCount() 计算获得互斥锁(写锁)的数量。
可知,ReentrantReadWriteLock 最多支持 65535 个递归写入锁和 65535 个读取锁。

3.2 资源计数

Sync 中定义了两个内部类:

  • HoldCounter 用于记录单个线程id、该线程持有锁的数量。
  • ThreadLocalHoldCounter 用于记录全部线程id、各自持有锁的数量。
/**
 * A counter for per-thread read hold counts.
 * Maintained as a ThreadLocal; cached in cachedHoldCounter
 */
static final class HoldCounter { // 用来保存线程id、该线程持有共享锁的数量
    int count = 0;
    // Use id, not reference, to avoid garbage retention 
    // 记录线程id,不记录线程引用,防止没法GC
    final long tid = getThreadId(Thread.currentThread());
}

/**
 * ThreadLocal subclass. Easiest to explicitly define for sake
 * of deserialization mechanics.
 */
static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}

/**
 * The number of reentrant read locks held by current thread.
 * Initialized only in constructor and readObject.
 * Removed whenever a thread's read hold count drops to 0.
 */
// 存储全部线程的id,以及各自持有锁的数量
private transient ThreadLocalHoldCounter readHolds;

// 缓存,存储最后一个获取锁的 HoldCounter:线程id,该线程持有锁的数量
private transient HoldCounter cachedHoldCounter;

// 缓存,存储第一个获取锁的线程
private transient Thread firstReader = null;

// 缓存,记录第一个获取锁的线程持有锁的数量
private transient int firstReaderHoldCount;

Sync() {
    readHolds = new ThreadLocalHoldCounter(); // 建立 ThreadLocalHoldCounter 实例
    setState(getState()); // ensures visibility of readHolds
}

4. 读锁

方法摘要:

// 获取读取锁。
void lock() 

// 获取读取锁,除非当前线程被中断。 
void lockInterruptibly() 

// 由于 ReadLocks 不支持条件,因此将抛出 UnsupportedOperationException。
Condition newCondition() 

// 返回标识此锁及其锁状态的字符串。 
String toString() 

// 仅当写入锁在调用期间未被另外一个线程保持时获取读取锁。
boolean tryLock() 

// 若是另外一个线程在给定的等待时间内没有保持写入锁,而且当前线程未被中断,则获取读取锁。 
boolean tryLock(long timeout, TimeUnit unit) 

// 试图释放此锁。 
void unlock()

4.1 ReadLock#lock

获取读锁。

  1. 若是没有其余线程持有写锁,则当前线程获取读取锁并当即返回。
  2. 若是其余线程持有写锁,则当前线程进入阻塞,直到能够获取读锁。

即,跟写锁互斥。

java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#lock

/**
 * Acquires the read lock.
 *
 * <p>Acquires the read lock if the write lock is not held by
 * another thread and returns immediately.
 *
 * <p>If the write lock is held by another thread then
 * the current thread becomes disabled for thread scheduling
 * purposes and lies dormant until the read lock has been acquired.
 */
public void lock() {
    sync.acquireShared(1);
}

4.1.1 AQS#acquireShared

共享模式下获取锁/资源,无视中断。

代码流程:

  1. 获取共享锁/资源,获取失败则进入下一步
  2. 进入同步队列中等待获取锁/资源

java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireShared

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

其中,AQS#tryAcquireShared 返回值:

  • 负数:获取资源失败,准备进入同步队列;
  • 0:获取资源成功,但没有剩余可用资源;
  • 正数:获取资源成功,能够唤醒下一个等待线程;

在 ReentrantReadWriteLock.ReadLock 中,tryAcquireShared 只会返回 1 和 -1.
当读锁获取失败时(其余线程正在写,或者不符合公平性策略)返回 -1,此时才会执行 doAcquireShared 进入排队等待。

注意,当读锁的计数达到最大值,后续再获取读锁会抛出异常 throw new Error("Maximum lock count exceeded"),而不是排队等待。

4.1.2 Sync#tryAcquireShared

代码流程:

  1. 若是其余线程持有写锁,则获取读锁失败。
  2. 不然(没有其余线程获取写锁),若是按照公平性策略能够获取读锁,则 CAS 尝试获取,并记录线程 id 和获取锁的次数。
  3. 上一步获取锁失败,进入 fullTryAcquireShared 在自旋中重试 CAS 获取锁。

公平/非公平规则:

  • 公平的 readerShouldBlock:同步队列中等待时间最长的节点不是当前线程,则当前线程获取读锁须要阻塞。
  • 非公平的 readerShouldBlock:同步队列中等待时间最长的节点是互斥节点,则当前线程获取读锁须要阻塞。

获取锁成功后,须要记录当前线程持有数的数量:

  1. 当前是第一个持有锁的线程,记为 firstReader,且 firstReaderHoldCount 记为 1。
  2. 当前线程是 firstReader,则 firstReaderHoldCount 加 1。
  3. 当前线程不是 firstReader,使用 cachedHoldCounter 或 ThreadLocalHoldCounter 来记录线程 id 和获取锁的次数

java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryAcquireShared

protected final int tryAcquireShared(int unused) {
    /*
     * Walkthrough:
     * 1. If write lock held by another thread, fail.
     * 2. Otherwise, this thread is eligible for     
     *    lock wrt state, so ask if it should block
     *    because of queue policy. If not, try
     *    to grant by CASing state and updating count.
     *    Note that step does not check for reentrant 
     *    acquires, which is postponed to full version
     *    to avoid having to check hold count in
     *    the more typical non-reentrant case.
     * 3. If step 2 fails either because thread
     *    apparently not eligible or CAS fails or count
     *    saturated, chain to version with full retry loop.
     */
    Thread current = Thread.currentThread();
    int c = getState();
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        // 若是其余线程持有写锁,则当前线程进入阻塞(写读互斥)
        // 若是当前线程持有写锁,则容许再次获取读锁(支持锁降级)
        return -1;                            
    int r = sharedCount(c);      // 读锁被获取的次数
    if (!readerShouldBlock() &&  // 公平性策略校验,若不须要阻塞则进入下一步
        r < MAX_COUNT &&         // 读锁数量没有超限制
        compareAndSetState(c, c + SHARED_UNIT)) { // CAS 获取读锁(高16位加1)
        if (r == 0) { // 校验当前线程是不是 firstReader,并累计 firstReaderHoldCount
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {      // 进入这里,说明当前不是首个获取锁的线程,使用 HoldCounter 记录当前线程id和持有锁的数量
            HoldCounter rh = cachedHoldCounter; // 先查缓存 cachedHoldCounter
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get(); // 缓存没有命中,从 ThreadLocal 中获取,并更新缓存
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;     // 获取锁成功,持有读锁数量1
    }
    return fullTryAcquireShared(current); // 一次获取读锁失败后,尝试循环获取
}

4.1.3 Sync#fullTryAcquireShared

在 tryAcquireShared 中经行了一次快速获取读锁,可是 CAS 只能容许一个线程获取锁成功,而读锁是共享的,能够同时容许多个线程获取。所以须要调用 fullTryAcquireShared 执行完整版的获取锁的逻辑。

关注不一样的地方:

  1. tryAcquireShared 只 CAS 获取锁一次,失败了则调用 fullTryAcquireShared,后者会在自旋中不断重试 CAS 获取锁。
  2. tryAcquireShared 得知获取读锁须要阻塞,不会尝试 CAS;而 fullTryAcquireShared 得知获取读锁须要阻塞,会进一步判断当前是不是重入的,非重入时才会让出读锁。

也就是说,重入读的状况下,若是锁是可获取状态,不会让出锁给写线程。

java.util.concurrent.locks.ReentrantReadWriteLock.Sync#fullTryAcquireShared

/**
 * Full version of acquire for reads, that handles CAS misses
 * and reentrant reads not dealt with in tryAcquireShared.
 */
final int fullTryAcquireShared(Thread current) {
    /*
     * This code is in part redundant with that in
     * tryAcquireShared but is simpler overall by not
     * complicating tryAcquireShared with interactions between
     * retries and lazily reading hold counts.
     */
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0) {
            /**
              * 若是是其余线程获取了写锁,那么把当前线程阻塞;
              * 若是是当前线程获取了写锁,不阻塞,不然会形成死锁。
              * 从这里能够看到 ReentrantReadWriteLock 容许锁降级。
              */
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
        } else if (readerShouldBlock()) {
            /**
              * 非公平模式下,进入这里说明,同步队列的头结点的后继有一个竞争写锁的线程。
              * 因此这里有一个锁让步的操做,即让写锁先获取。
              * 1. 若是知足 firstReader == current 或者 rh.count > 0 说明是重入的读。
              *    不须要让步给写线程,不然会致使死锁。
              * 2. 若是 rh.count == 0 就说明,这个线程是第一次获取读锁。
              *    为了防止写饥饿,直接将当前线程放入同步队列排队等待。
              */
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) { // 说明不是第一次读(而是重入的读),不须要让步给写线程
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove(); // 当前线程不持有读锁,移除计数
                    }
                }
                if (rh.count == 0) // 说明是第一次读,须要让步给写线程
                    return -1;     // 返回-1,后续进入同步队列中排队等待锁
            }
        }
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) { // CAS 获取读锁(高16位加1)
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

当前线程若是持有写锁,容许获取读锁,即支持锁降级

为何要支持锁降级呢?主要是为了防止出现死锁。
假设不支持锁降级,当持有写锁的线程须要获取读锁时,只能进入阻塞等待锁变为可读。
可是只有当没有线程持有写锁时,读锁才可以被获取(写读互斥),致使读锁一直没法获取,此时出现死锁。

为何重入读锁时,不须要让出锁呢?一样是为了防止出现死锁。
假设重入读锁须要让给等待写锁的线程,则重入读的线程进入阻塞。
可是只有当没有线程持有读锁时,写锁才可以被获取(读写互斥),致使写锁一直没法被获取,此时出现死锁。

4.2 ReadLock#tryLock

仅当写锁在调用期间未被另外一个线程保持时,获取读锁。

java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#tryLock()

/**
 * Acquires the read lock only if the write lock is not held by
 * another thread at the time of invocation.
 */
public boolean tryLock() {
    return sync.tryReadLock();
}

代码流程:

  1. 若是其余线程持有写锁,则获取读锁失败。
  2. 若是读锁的获取数量超限制,则返回失败。
  3. 采用 CAS 获取锁,获取成功则计数;获取失败则返回,不会进入同步队列。

跟 tryAcquireShared 最大的区别没有调用 readerShouldBlock,所以可能会破坏公平规则,或形成写饥饿。

注意:
即便已将此锁设置为使用公平排序策略,可是调用 tryLock() 仍将当即获取读锁(若是有可用的),无论其余线程当前是否正在等待该读锁。在某些状况下,此“闯入”行为可能颇有用,即便它会打破公平性也如此。若是但愿遵照此锁的公平设置,则使用 tryLock(0, TimeUnit.SECONDS) ,它几乎是等效的(它也检测中断)。

java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryReadLock

/**
 * Performs tryLock for read, enabling barging in both modes.
 * This is identical in effect to tryAcquireShared except for
 * lack of calls to readerShouldBlock.
 */
final boolean tryReadLock() {
    Thread current = Thread.currentThread();
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current) // 其余线程持有写锁
            return false;
        int r = sharedCount(c);
        if (r == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) { // 自旋中 CAS 获取读锁(高16位加1)
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return true;
        }
    }
}

4.3 ReadLock#unLock

释放读锁。

java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#unlock

/**
 * Attempts to release this lock.
 *
 * <p>If the number of readers is now zero then the lock
 * is made available for write lock attempts.
 */
public void unlock() {
    sync.releaseShared(1);
}

4.3.1 AQS#releaseShared

共享模式下释放锁/资源。

  1. 释放共享锁/资源,若释放成功,则进入下一步。
  2. 唤醒队列中的等待节点
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

在 ReentrantReadWriteLock 中,只有当所有读锁都释放以后,state == 0 时才会执行 doReleaseShared,唤醒同步队列中等待着的写线程。

4.3.2 Sync#tryReleaseShared

代码流程:

  1. 对 HoldCounter 中当前线程持有锁的计数进行自减。
  2. 自旋 CAS 更新 state 进行释放锁。
  3. 若 state == 0,说明当前没有线程持有锁,对于等待写入的线程来讲,能够发起锁请求。

能够看到,释放读锁不会与其余行为互斥。

java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryReleaseShared

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) { // 对当前线程持有锁的数量进行自减
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException(); // 当前线程没有持有锁,报错
        }
        --rh.count;
    }
    for (;;) { // 自旋 CAS 更新 state(确保多个线程可以并发释放锁)
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0; // 返回true,说明当前没有线程持有锁,对于等待写入的线程来讲,能够发起锁请求
    }
}

4.4 ReadLock#newCondition

因为 AQS 中的 Condition 实现不支持共享锁,所以 ReentrantReadWriteLock 中的读锁不支持使用 Condition。

为何要设计成共享锁不支持使用 Condition 呢?

  1. 读锁和写锁虽然都是使用 AQS 的 state 属性,可是持有读锁和持有写锁是独立的。
  2. 假设读锁支持 Condition,当前线程获取读锁后,在 Condition 条件上等待时,因为读写互斥,可以解除该阻塞的其余线程没法获取写锁,致使当前线程没法被唤醒。

Java 官方的解释:

Read locks are held independently of write locks, so are not checked or affected. However it is essentially always an error to invoke a condition waiting method when the current thread has also acquired read locks, since other threads that could unblock it will not be able to acquire the write lock.

java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#newCondition

/**
 * Throws {@code UnsupportedOperationException} because
 * {@code ReadLocks} do not support conditions.
 *
 * @throws UnsupportedOperationException always
 */
public Condition newCondition() {
    throw new UnsupportedOperationException();
}

5. 写锁

方法摘要:

// 查询当前线程保持写入锁的数量。 
int getHoldCount() 

// 查询此写入锁是否由当前线程保持。
boolean isHeldByCurrentThread() 

// 获取写入锁。 
void lock() 

// 获取写入锁,除非当前线程被中断。
void lockInterruptibly() 

// 返回一个用来与此 Lock 实例一块儿使用的 Condition 实例。 
Condition newCondition() 

// 返回标识此锁及其锁状态的字符串。 
String toString() 

// 仅当写入锁在调用期间未被另外一个线程保持时获取该锁。
boolean tryLock() 

// 若是另外一个线程在给定的等待时间内没有保持写入锁,而且当前线程未被中断,则获取写入锁。
boolean tryLock(long timeout, TimeUnit unit) 

// 试图释放此锁。 
void unlock()

5.1 WriteLock#lock

获取写锁。

  1. 若是其余线程既没有保持读锁也没有保持写锁,则获取写锁并当即返回,并将写锁保持计数设置为 1。
  2. 若是当前线程已经保持写锁,则保持计数增长 1,该方法当即返回。
  3. 若是写锁被其余线程保持,则当前线程进入阻塞直到能够获取写锁。

即,跟读锁、写锁都互斥。

java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#lock

/**
 * Acquires the write lock.
 *
 * <p>Acquires the write lock if neither the read nor write lock
 * are held by another thread
 * and returns immediately, setting the write lock hold count to
 * one.
 *
 * <p>If the current thread already holds the write lock then the
 * hold count is incremented by one and the method returns
 * immediately.
 *
 * <p>If the lock is held by another thread then the current
 * thread becomes disabled for thread scheduling purposes and
 * lies dormant until the write lock has been acquired, at which
 * time the write lock hold count is set to one.
 */
public void lock() {
    sync.acquire(1);
}

5.1.1 AQS#acquire

AQS 中的 acquire 方法中,只有 tryAcquire 方法须要子类来实现。

代码流程:

  1. 尝试获取锁,失败则进入下一步。
  2. 进入同步队列中等待获取锁。

java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire

public final void acquire(int arg) {
    if (!tryAcquire(arg) && 
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

注意,当写锁的计数达到最大值,后续再获取写锁会抛出异常 throw new Error("Maximum lock count exceeded"),而不是排队等待。

5.1.2 Sync#tryAcquire

代码流程:

  1. 存在线程持有读锁,或者其余线程持有写锁,则当前线程获取写锁失败。
  2. 若是写锁已经饱和,没法再得到锁,返回失败。
  3. 不然,若是是重入获取写锁,或者公平性策略校验经过,则获取写锁,并更新写锁计数。

注意,当前线程持有读锁后,没法再获取写锁,即不支持锁升级

为何不支持锁升级?

  • 假设支持锁升级,持有读锁的线程均升级为写锁,会违反写锁是互斥锁的定义。
  • 假设只支持其中一个读锁进行升级,则会违反读写互斥的规则。

java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryAcquire

protected final boolean tryAcquire(int acquires) {
    /*
     * Walkthrough:
     * 1. If read count nonzero or write count nonzero
     *    and owner is a different thread, fail.
     * 2. If count would saturate, fail. (This can only
     *    happen if count is already nonzero.)
     * 3. Otherwise, this thread is eligible for lock if
     *    it is either a reentrant acquire or
     *    queue policy allows it. If so, update state
     *    and set owner.
     */
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) { // 锁已经被线程持有,须要进一步区分读、写锁,是否当前线程持有
        // (Note: if c != 0 and w == 0 then shared count != 0)
        // 只有读锁(读写互斥。注意当前线程持有读锁以后,也没法获取写锁,不支持锁升级)
        // 或者持有写锁的不是当前线程,则没法获取写锁
        if (w == 0 || current != getExclusiveOwnerThread()) 
            return false;                                   
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire    // 进入这里,说明当前线程已持有写锁,是重入获取
        setState(c + acquires); // 更新 state,无需使用 CAS 
        return true;
    }
    // 进入这里,说明锁未被持有(可是同步队列中可能有线程在等待)
    if (writerShouldBlock() ||  // 校验公平性策略看可否获取写锁
        !compareAndSetState(c, c + acquires)) // 可以获取写锁,则 CAS 获取(低16位加1)
        return false;
    setExclusiveOwnerThread(current); // 获取写锁成功,记录持有写锁的是当前线程
    return true;
}

5.2 WriteLock#tryLock

仅当写锁在调用期间未被另外一个线程保持时,获取写锁。

java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#tryLock()

public boolean tryLock( ) {
    return sync.tryWriteLock();
}

代码流程:

  1. 存在线程持有读锁,或者其余线程持有写锁,则当前线程获取写锁失败。
  2. 若是写锁已经饱和,没法再得到锁,返回失败。
  3. 不然,若是是重入获取写锁,或者没当前没有线程持有锁(可能在同步队列中等待),则 CAS 获取锁,获取成功则计数;获取失败则返回,不会进入同步队列。

跟 Sync#tryAcquire 最大的区别没有调用 writerShouldBlock,所以可能会破坏公平规则。
若是但愿遵照此锁的公平设置,则使用 tryLock(0, TimeUnit.SECONDS) ,它几乎是等效的(它也检测中断)。

因为读锁是共享的,tryReadLock 中须要自旋进行 CAS 获取锁,而 tryWriteLock 中不用进行自旋,只 CAS 获取一次。

java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryWriteLock

/**
 * Performs tryLock for write, enabling barging in both modes.
 * This is identical in effect to tryAcquire except for lack
 * of calls to writerShouldBlock.
 */
final boolean tryWriteLock() { // 仅当写入锁在调用期间未被另外一个线程保持时获取该锁
    Thread current = Thread.currentThread();
    int c = getState();
    if (c != 0) {
        int w = exclusiveCount(c);
        if (w == 0 || current != getExclusiveOwnerThread()) // 只有读锁(读写互斥);或者持有写锁的不是当前线程
            return false;
        if (w == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
    }
    if (!compareAndSetState(c, c + 1))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

5.3 WriteLock#unlock

试图释放此锁。

java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#unlock

public void unlock() {
    sync.release(1);
}

5.3.1 AQS#release

独占模式下释放锁。

  1. 尝试释放锁,若释放成功则进入下一步。
  2. 唤醒同步队列中的后继节点。

java.util.concurrent.locks.AbstractQueuedSynchronizer#release

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

在 ReentrantReadWriteLock 中,只有当所有写锁都释放以后,exclusiveCount == 0 时才会执行 unparkSuccessor,唤醒同步队列中等待着的读、写线程。

5.3.2 Sync#tryRelease

代码流程:

  1. 若是当前线程不是此锁的持有者,则抛出 IllegalMonitorStateException。
  2. 若是当前线程保持此锁,则将保持计数减 1。若是保持计数如今为 0,则释放该锁。

java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryRelease

protected final boolean tryRelease(int releases) { // 释放独占锁-写锁
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;         // 更新 state,无需 CAS
    boolean free = exclusiveCount(nextc) == 0; // 检查锁是否已所有释放
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

5.4 WriteLock#newCondition

返回一个用来与此 Lock 实例一块儿使用的 Condition 实例。

java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#newCondition

public Condition newCondition() {
    return sync.newCondition();
}

在使用内置监视器锁时,返回的 Condition 实例支持与 Object 的监视器方法(wait、notify 和 notifyAll)相同的用法。

  1. 若是当调用任何 Condition 方法时没有保持此写入锁,则抛出 IllegalMonitorStateException。(由于保持读锁是独立于写锁的,因此读锁将不被检查或受影响)
  2. 调用等待条件方法时,会释放写锁,在方法返回以前,会从新获取写锁,并将锁保持计数恢复到调用该方法时的值。
  3. 若是线程在等待时被中断,则等待将终止,并将抛出 InterruptedException,清除线程的已中断状态。
  4. 等待线程按 FIFO 顺序收到信号。
  5. 等待方法返回的线程从新获取锁的顺序,与线程最初获取锁的顺序相同,在默认状况下,未指定此顺序,但对于公平锁,它们更倾向于那些等待时间最长的线程。

6. 示例

JDK 官方示例,展现了如何利用重入来执行升级缓存后的锁降级(为简单起见,省略了异常处理):

class CachedData {
    Object data;
    volatile boolean cacheValid;
    ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

    void processCachedData() {
        rwl.readLock().lock();
        if (!cacheValid) {
            // Must release read lock before acquiring write lock
            rwl.readLock().unlock();
            rwl.writeLock().lock();
            // Recheck state because another thread might have acquired
            // write lock and changed state before we did.
            if (!cacheValid) {
                data = ...
                cacheValid = true;
            }
            // Downgrade by acquiring read lock before releasing write lock
            rwl.readLock().lock();
            // Unlock write, still hold read
            rwl.writeLock().unlock();
        }

        use(data);
        rwl.readLock().unlock();
    }
}

在使用某些种类的 Collection 时,可使用 ReentrantReadWriteLock 来提升并发性。
一般,在预期 collection 很大,读取者线程访问它的次数多于写入者线程,而且 entail 操做的开销高于同步开销时,这很值得一试。
例如,如下是一个使用 TreeMap 的类,预期它很大,而且能被同时访问。

class RWDictionary {
    private final Map<String, Data> m = new TreeMap<String, Data>();
    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private final Lock r = rwl.readLock();
    private final Lock w = rwl.writeLock();

    public Data get(String key) {
        r.lock();
        try { return m.get(key); }
        finally { r.unlock(); }
    }
    public String[] allKeys() {
        r.lock();
        try { return m.keySet().toArray(); }
        finally { r.unlock(); }
    }
    public Data put(String key, Data value) {
        w.lock();
        try { return m.put(key, value); }
        finally { w.unlock(); }
    }
    public void clear() {
        w.lock();
        try { m.clear(); }
        finally { w.unlock(); }
    }
}

7. 总结

7.1 互斥规则

读读不互斥,读写、写读、写写都是互斥的。

7.2 获取顺序

ReentrantReadWriteLock 不会按读取者优先或写入者优先的顺序,来对锁的访问进行排序。可是,它确实支持可选的公平性策略。

非公平模式(默认):
获取写锁能够当即获取,无需排队;获取读锁以前,若判断等待时间最长的是写线程,则非重入的读线程需进入阻塞。
连续竞争的非公平锁可能无限期地推迟一个或多个 reader 或 writer 线程,但吞吐量一般要高于公平锁。

公平模式:
无论获取读锁仍是写锁,都要严格按照前后顺序排队获取。
注意,读锁是共享的,只要同步队列中没有等待的线程,读锁能够被同时获取,一旦同步队列中具备线程在等待,后续的非重入的读线程只能入队等待。
此外,ReentrantReadWriteLock.ReadLock.tryLock() 和 ReentrantReadWriteLock.WriteLock.tryLock() 方法不会遵照此公平设置。

7.3 重入

重入规则能够看做是对公平性策略的一种修正,即便同步队列中存在等待线程时,已持有锁的线程能够重入获取锁,无需让步。
读锁的最大可重入次数是 65535,写锁的最大可重入次数一样是 65535。
超过最大可重入次数,直接抛异常。

7.4 锁降级

重入还容许从写锁降级为读锁,其实现方式是:先获取写锁,而后获取读锁,最后释放写锁、读锁。
可是,从读锁升级到写锁是不可能的。

7.5 Condition 支持

写锁提供了一个 Condition 实现,读锁不支持 Condition。

7.6 通知机制

读锁释放:当锁被多个读线程持有时,只有所有读锁都释放了,才会唤醒同步队列中等待着的节点,此时的等待节点是写线程。

写锁释放:当锁被单个写线程持有时,只有所有写锁都释放了,才会唤醒同步队列中等待着的节点,该节点多是写线程或读线程。

读锁获取:当同步队列中的读线程成功获取锁后,会唤醒队列中的下一个共享节点(读线程),再由下一个共享节点获取锁后唤醒下下个共享节点(见 AQS#setHeadAndPropagate)。


相关阅读:
阅读 JDK 源码:AQS 中的独占模式
阅读 JDK 源码:AQS 中的共享模式
阅读 JDK 源码:AQS 对 Condition 的实现
阅读 JDK 源码:可重入锁 ReentrantLock

做者:Sumkor
连接:https://segmentfault.com/a/11...

相关文章
相关标签/搜索