读写锁实现逻辑相对比较复杂,可是倒是一个常用到的功能,但愿将我对
ReentrantReadWriteLock
的源码的理解记录下来,能够对你们有帮助html
在理解ReentrantReadWriteLock
时须要具有一些基本的知识java
以前有写过一篇《深刻浅出AQS源码解析》关于AQS的文章,对AQS原理不了解的同窗能够先看一下缓存
ReentrantLock
的实现原理能够参考《深刻浅出ReentrantLock源码解析》函数
对于资源的访问就两种形式:要么是读操做,要么是写操做。读写锁是将被锁保护的临界资源的读操做和写操做分开,容许同时有多个线程同时对临界资源进行读操做,任意时刻只容许一个线程对资源进行写操做。简单的说,对与读操做采用的是共享锁,对于写操做采用的是排他锁。优化
ReentrantReadWriteLock
是用state
字段来表示读写锁重复获取资源的次数,高16位用来标记读锁的同步状态,低16位用来标记写锁的同步状态ui
// 划分的边界线,用16位来划分 static final int SHARED_SHIFT = 16; // 读锁的基本单位,也就是读锁加1或者减1的基本单位(1左移16位后的值) static final int SHARED_UNIT = (1 << SHARED_SHIFT); // 读写锁的最大值(在计算读锁的时候须要先右移16位) static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; // 写锁的掩码,state值与掩码作与运算后获得写锁的真实值 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 获取资源被读锁占用的次数 static int sharedCount(int c){ return c >>> SHARED_SHIFT; } // 获取资源被写锁占用的次数 static int exclusiveCount(int c){ return c & EXCLUSIVE_MASK; }
在统计读锁被每一个线程持有的次数时,ReentrantReadWriteLock
采用的是HoldCounter
来实现的,具体以下:线程
// 持有读锁的线程重入的次数 static final class HoldCounter { // 重入的次数 int count = 0; // 持有读锁线程的线程id final long tid = getThreadId(Thread.currentThread()); } /** * 采用ThreadLocal机制,作到线程之间的隔离 */ static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } /** * 线程持有可重入读锁的次数 */ private transient ThreadLocalHoldCounter readHolds; /** * 缓存最后一个成功获取读锁的线程的重入次数,有两方面的好处: * 一、避免了经过访问ThreadLocal来获取读锁的信息,这个优化的前提是 * 假设多数状况下,一个获取读锁的线程,使用完之后就会释放读锁, * 也就是说最后获取读锁的线程和最早释放读锁的线程大多数状况下是同一个线程 * 二、由于ThreadLocal中的key是一个弱引用类型,当有一个变量持有HoldCounter对象时, * ThreadLocalHolderCounter中最后一个获取锁的线程信息不会被GC回收掉 */ private transient HoldCounter cachedHoldCounter; /** * 第一个获取读锁的线程,有两方面的考虑: * 一、记录将共享数量从0变成1的线程 * 二、对于无竞争的读锁来讲进行线程重入次数数据的追踪的成本是比较低的 */ private transient Thread firstReader = null; /** * 第一个获取读锁线程的重入次数,能够参考firstReader的解析 */ private transient int firstReaderHoldCount;
ReentrantReadWriteLock
一共有5个内部类,具体以下:设计
Sync
:公平锁和非公平锁的抽象类NonfairSync
:非公平锁的具体实现FairSync
:公平锁的具体实现ReadLock
:读锁的具体实现WriteLock
:写锁的具体实现咱们从读锁ReadLock
和写锁WriteLock
的源码开始分析,而后顺着这个思路将整个ReentrantReadWriteLock
中全部的核心源码(全部的包括内部类)进行分析。code
public static class ReadLock implements Lock, java.io.Serializable { private final Sync sync; /** * 经过ReentrantReadWriteLock中的公平锁或非公平锁来初始化sync变量 */ protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; } /** * 阻塞的方式获取锁,由于读锁是共享锁,因此调用acquireShared方法 */ public void lock() { sync.acquireShared(1); } /** * 可中断且阻塞的方式获取锁 */ public void lockInterruptibly() throws InterruptedException { sync.acquireSharedInterruptibly(1); } /** * 超时尝试获取锁,非阻塞的方式 */ public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } /** * 尝试获取写锁,非阻塞的方式 */ public boolean tryLock() { return sync.tryReadLock(); } /** * 释放锁 */ public void unlock() { sync.releaseShared(1); } }
接下来,咱们重点看一下在公平锁和非公平锁下Sync.acquireShared
、Sync.releaseShared
和Sync.tryLock
这3个方法的实现(acquireSharedInterruptibly
和tryAcquireSharedNanos
是AQS中的方法,这里就不在讨论了,具体能够参考《深刻浅出AQS源码解析》),其中Sync.acquireShared
中核心调用的方法是Sync.tryAcquireShared
,Sync. releaseShared
中核心调用的方法是Sync.tryReleaseShared
,Sync.tryLock
中核心调用的方法是Sync.tryReadLock
,因此咱们重点分析Sync.tryAcquireShared
方法、Sync.tryReleaseShared
方法和sync.tryReadLock
方法htm
protected final int tryAcquireShared(int unused) { /** * 以共享锁的方式尝试获取读锁,步骤以下: * 一、若是资源已经被写锁获取了,直接返回失败 * 二、若是读锁不须要等待(公平锁和非公平锁的具体实现有区别)、 * 而且读锁未超过上限、同时设置读锁的state值成功,则返回成功 * 三、若是步骤2失败了,须要进入fullTryAcquireShared函数再次尝试获取读锁 */ Thread current = Thread.currentThread(); int c = getState(); /** * 资源已经被写锁独占,直接返回false */ if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); /** * 一、读锁不须要等待 * 二、读锁未超过上限 * 三、设置读锁的state值成功 * 则返回成功 */ if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { // 记录第一个获取读锁的线程信息 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { // 第一个获取读锁的线程再次获取锁(重入) firstReaderHoldCount++; } else { // 修改获取锁的线程的重入的次数 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != LockSupport.getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } /** * 若是CAS失败再次获取读锁 */ return fullTryAcquireShared(current); }
接下来看一下fullTryAcquireShared方法:
final int fullTryAcquireShared(Thread current) { /** * 调用该方法的线程都是但愿获取读锁的线程,有3种状况: * 一、在尝试经过CAS操做修改state时因为有多个竞争读锁的线程致使CAS操做失败 * 二、须要排队等待获取读锁的线程(公平锁) * 三、超过读锁限制的最大申请次数的线程 */ HoldCounter rh = null; for (;;) { // 无限循环获取锁 int c = getState(); // 已经被写线程获取锁了,直接返回 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; // 须要被block的读线程(公平锁) } else if (readerShouldBlock()) { // 若是时当前线程 if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { // 清理当前线程中重入次数为0的数据 if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } // 当前线程获取锁失败 if (rh.count == 0) return -1; } } // 判断是否超过读锁的最大值 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 修改读锁的state值 if (compareAndSetState(c, c + SHARED_UNIT)) { // 最新获取到读锁的线程设置相关的信息 if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; // 当前线程重复获取锁(重入) } else { // 在readHolds中记录获取锁的线程的信息 if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
tryReleaseShared
方法的实现逻辑比较简单,咱们直接看代码中的注释
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); /** * 若是当前线程是第一个获取读锁的线程,有两种状况: * 一、若是持有锁的次数为1,直接释放成功 * 二、若是持有锁的次数大于1,说明有重入的状况,须要次数减1 */ if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { /** * 若是当前线程不是第一个获取读锁的线程 * 须要更新线程持有锁的重入次数 * 若是次数小于等于0说明有异常,由于只有当前线程才会出现持有锁的重入次数等于0或者1 */ HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } // 修改state的值 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // 若是是最后一个释放读锁的线程nextc为0,不然不是 return nextc == 0; } }
tryReadLock
的代码比较简单,就直接在将解析过程在注释中描述
final boolean tryReadLock() { Thread current = Thread.currentThread(); for (;;) { // 无限循环获取读锁 int c = getState(); // 当前线程不是读线程,直接返回失败 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return false; int r = sharedCount(c); // 读锁的总重入次数是否超过最大次数限制 if (r == MAX_COUNT) throw new Error("Maximum lock count exceeded"); /** * 经过CAS操做设置state的值,若是成功表示尝试获取读锁成功,须要作如下几件事情: * 一、若是是第一获取读锁要记录第一个获取读锁的线程信息 * 二、若是是当前获取锁的线程和第一次获取锁的线程相同,须要更新第一获取线程的重入次数 * 三、更新获取读锁线程相关的信息 */ if (compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return true; } } }
public static class WriteLock implements Lock, java.io.Serializable { private final Sync sync; /** * 经过ReentrantReadWriteLock中的公平锁或非公平锁来初始化sync变量 */ protected WriteLock(ReentrantReadWriteLock lock) { sync = lock.sync; } /** * 阻塞的方式获取写锁 */ public void lock() { sync.acquire(1); } /** * 中断的方式获取写锁 */ public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } /** * 尝试获取写锁 */ public boolean tryLock( ) { return sync.tryWriteLock(); } /** * 超时尝试获取写锁 */ public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } /** * 释放写锁 */ public void unlock() { sync.release(1); } }
接下来,咱们重点看一下在公平锁和非公平锁下Sync.tryAcquire
、Sync.tryRelease
和Sync.tryWriteLock
这几个核心方法是如何实现写锁的功能
Sync.tryAcquire
方法的逻辑比较简单,就直接在代码中注释,具体以下:
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); // 读写锁的次数 int c = getState(); // 写锁的次数 int w = exclusiveCount(c); /* * 若是读写锁的次数不为0,说明锁可能有如下3中状况: * 一、所有是读线程占用资源 * 2. 所有是写线程占用资源 * 3. 读写线程都占用了资源(锁降级:持有写锁的线程能够去持有读锁),可是读写线程都是同一个线程 */ if (c != 0) { // 写线程不占用资源,第一个获取锁的线程也不是当前线程,直接获取失败 if (w == 0 || current != getExclusiveOwnerThread()) return false; // 检查获取写锁的线程是否超过了最大的重入次数 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 修改state的状态,之因此没有用CAS操做来修改,是由于写线程只有一个,是独占的 setState(c + acquires); return true; } /* * 写线程是第一个竞争锁资源的线程 * 若是写线程须要等待(公平锁的状况),或者 * 写线程的state设置失败,直接返回false */ if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; // 设置当前线程为owner setExclusiveOwnerThread(current); return true; }
Sync.tryRelease
方便的代码很简单,直接看代码中的注释
protected final boolean tryRelease(int releases) { // 若是释放锁的线程不持有锁,返回失败 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 获取写锁的重入的次数 int nextc = getState() - releases; // 若是次数为0,须要释放锁的owner boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); // 释放锁的owner setState(nextc); return free; }
Sync.tryWriteLock
这个方法也比较简单,就直接上代码了
final boolean tryWriteLock() { Thread current = Thread.currentThread(); int c = getState(); // 读锁或者写锁已经被线程持有 if (c != 0) { int w = exclusiveCount(c); // 写锁第一次获取锁或者当前线程不是第一次获取写锁的线程(也就是否是owner),直接失败 if (w == 0 || current != getExclusiveOwnerThread()) return false; // 超出写锁的最大次数,直接失败 if (w == MAX_COUNT) throw new Error("Maximum lock count exceeded"); } // 竞争写锁的线程修改state, // 若是成功将本身设置成锁的owner, // 若是失败直接返回 if (!compareAndSetState(c, c + 1)) return false; setExclusiveOwnerThread(current); // 设置当前线程持有锁 return true; }