咱们在介绍AbstractQueuedSynchronizer的时候介绍过,AQS支持独占式同步状态获取/释放、共享式同步状态获取/释放两种模式,对应的典型应用分别是ReentrantLock和Semaphore,AQS还能够混合两种模式使用,读写锁ReentrantReadWriteLock就是如此。java
设想如下情景:咱们在系统中有一个多线程访问的缓存,多个线程均可以对缓存进行读或写操做,可是读操做远远多于写操做,要求写操做要线程安全,且写操做执行完成要求对当前的全部读操做立刻可见。缓存
分析上面的需求:由于有多个线程可能会执行写操做,所以多个线程的写操做必须同步串行执行;而写操做执行完成要求对当前的全部读操做立刻可见,这就意味着当有线程正在读的时候,要阻塞写操做,当正在执行写操做时,要阻塞读操做。一个简单的实现就是将数据直接加上互斥锁,同一时刻不论是读仍是写线程,都只能有一个线程操做数据。可是这样的问题就是若是当前只有N个读线程,没有写线程,这N个读线程也要傻呵呵的排队读,尽管实际上是能够安全并发提升效率的。所以理想的实现是:安全
当有写线程时,则写线程独占同步状态。多线程
当没有写线程时只有读线程时,则多个读线程能够共享同步状态。并发
读写锁就是为了实现这种效果而生。函数
咱们先来看一下读写锁怎么使用,这里咱们基于hashmap(自己线程不安全)作一个多线程并发安全的缓存:源码分析
public class ReadWriteCache { private static Map<String, Object> data = new HashMap<>(); private static ReadWriteLock lock = new ReentrantReadWriteLock(false); private static Lock rlock = lock.readLock(); private static Lock wlock = lock.writeLock(); public static Object get(String key) { rlock.lock(); try { return data.get(key); } finally { rlock.unlock(); } } public static Object put(String key, Object value) { wlock.lock(); try { return data.put(key, value); } finally { wlock.unlock(); } } }
限于篇幅咱们只实现2个方法,get和put。从代码能够看出,咱们先建立一个 ReentrantReadWriteLock 对象,构造函数 false 表明是非公平的(非公平的含义和ReentrantLock相同)。而后经过readLock、writeLock方法分别获取读锁和写锁。在作读操做的时候,也就是get方法,咱们要先获取读锁;在作写操做的时候,即put方法,咱们要先获取写锁。ui
经过以上代码,咱们就构造了一个线程安全的缓存,达到咱们以前说的:写线程独占同步状态,多个读线程能够共享同步状态。this
咱们先来看下 ReentrantReadWriteLock 类的总体结构:spa
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { private final ReentrantReadWriteLock.ReadLock readerLock; /** Inner class providing writelock */ private final ReentrantReadWriteLock.WriteLock writerLock; /** Performs all synchronization mechanics */ final Sync sync; public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; } abstract static class Sync extends AbstractQueuedSynchronizer {}
static final class NonfairSync extends Sync {}
static final class FairSync extends Sync {}
public static class ReadLock implements Lock, java.io.Serializable {}
public static class WriteLock implements Lock, java.io.Serializable {} }
能够看到,在公平锁与非公平锁的实现上,与ReentrantLock同样,也是有一个继承AQS的内部类Sync,而后NonfairSync和FairSync都继承Sync,经过构造函数传入的布尔值决定要构造哪种Sync实例。
读写锁比ReentrantLock多出了两个内部类:ReadLock和WriteLock, 用来定义读锁和写锁,而后在构造函数中,会构造一个读锁和一个写锁实例保存到成员变量 readerLock 和 writerLock。咱们在上面的示例中使用到的 readLock() 和 writeLock() 方法就是返回这两个成员变量保存的锁实例。
咱们在Sync类中能够看到下列代码:
static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); //每次要让共享锁+1,就应该让state加 1<<16 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; //每种锁的最大重入数量 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; /** Returns the number of shared holds represented in count */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** Returns the number of exclusive holds represented in count */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
能够看到主要是几个位移操做,经过上面的总体结构,咱们知道了在读写锁内保存了读锁和写锁的两个实例。以前在ReentrantLock中,咱们知道锁的状态是保存在Sync实例的state字段中的(继承自父类AQS),如今有了读写两把锁,然而能够看到仍是只有一个Sync实例,那么一个Sync实例的state是如何同时保存两把锁的状态的呢?答案就是用了位分隔:
state字段是32位的int,读写锁用state的低16位保存写锁(独占锁)的状态;高16位保存读锁(共享锁)的状态。
所以要获取独占锁当前的重入数量,就是 state & ((1 << 16) -1) (即 exclusiveCount 方法)
要获取共享锁当前的重入数量,就是 state >>> 16 (即 sharedCount 方法)
下面咱们具体看写锁和读锁的实现。
看下WriteLock类中的lock和unlock方法:
public void lock() { sync.acquire(1); } public void unlock() { sync.release(1); }
能够看到就是调用的独占式同步状态的获取与释放,所以真实的实现就是Sync的 tryAcquire和 tryRelease。
看下tryAcquire:
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); //获取独占锁的重入数 if (c != 0) { // 当前state不为0,此时:若是写锁状态为0说明读锁此时被占用返回false;若是写锁状态不为0且写锁没有被当前线程持有返回false if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); //写锁重入数溢出 // Reentrant acquire setState(c + acquires); return true; }
//到这里了说明state为0,尝试直接cas。writerShouldBlock是为了实现公平或非公平策略的 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
逻辑很简单,直接看注释就能理解。
看下tryRelease:
protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //非独占模式直接抛异常 int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); //若是独占模式重入数为0了,说明独占模式被释放 setState(nextc); //无论独占模式是否被释放,更新独占重入数 return free; }
逻辑很简单,直接看注释就能理解。
相似于写锁,读锁的lock和unlock的实际实现对应Sync的 tryAcquireShared 和 tryReleaseShared方法。
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //若是独占模式被占且不是当前线程持有,则获取失败 int r = sharedCount(c);
//若是公平策略没有要求阻塞且重入数没有到达最大值,则直接尝试CAS更新state if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
//更新成功后会在firstReaderHoldCount中或readHolds(ThreadLocal类型的)的本线程副本中记录当前线程重入数(浅蓝色代码),这是为了实现jdk1.6中加入的getReadHoldCount()方法的,这个方法能获取当前线程重入共享锁的次数(state中记录的是多个线程的总重入次数),加入了这个方法让代码复杂了很多,可是其原理仍是很简单的:若是当前只有一个线程的话,还不须要动用ThreadLocal,直接往firstReaderHoldCount这个成员变量里存重入数,当有第二个线程来的时候,就要动用ThreadLocal变量readHolds了,每一个线程拥有本身的副本,用来保存本身的重入数。 if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); //用来处理CAS没成功的状况,逻辑和上面的逻辑是相似的,就是加了无限循环 }
下面这个方法就不用细说了,和上面的处理逻辑相似,加了无限循环用来处理CAS失败的状况。
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. } else if (readerShouldBlock()) { // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread();
//浅蓝色代码也是为了实现jdk1.6中加入的getReadHoldCount()方法,在更新当前线程的重入数。 if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; }
//这里是真正的释放同步状态的逻辑,就是直接同步状态-SHARED_UNIT,而后CAS更新,没啥好说的 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }
经过上面的源码分析,咱们能够发现一个现象:
在线程持有读锁的状况下,该线程不能取得写锁(由于获取写锁的时候,若是发现当前的读锁被占用,就立刻获取失败,无论读锁是否是被当前线程持有)
在线程持有写锁的状况下,该线程能够继续获取读锁(获取读锁时若是发现写锁被占用,只有写锁没有被当前线程占用的状况才会获取失败)
仔细想一想,这个设计是合理的:由于当线程获取读锁的时候,可能有其余线程同时也在持有读锁,所以不能把获取读锁的线程“升级”为写锁;而对于得到写锁的线程,它必定独占了读写锁,所以能够继续让它获取读锁,当它同时获取了写锁和读锁后,还能够先释放写锁继续持有读锁,这样一个写锁就“降级”为了读锁。
综上:
一个线程要想同时持有写锁和读锁,必须先获取写锁再获取读锁;
写锁能够“降级”为读锁;
读锁不能“升级”为写锁。
读写锁仍是很实用的,由于通常场景下,数据的并发操做都是读多于写,在这种状况下,读写锁可以提供比排它锁更好的并发性。
在读写锁的实现方面,原本觉得会比较复杂,结果看完源码的感觉也是快刀切西瓜,看来AQS的设计真的很棒,在AQS的基础上构建的组件实现都很简单。